Selaa lähdekoodia

优化调整

晋江
OCEAN 2 kuukautta sitten
vanhempi
commit
b6c8d97ef6
4 muutettua tiedostoa jossa 105 lisäystä ja 425 poistoa
  1. +7
    -111
      Cargo.lock
  2. +1
    -2
      Cargo.toml
  3. +0
    -19
      config.json.orig
  4. +97
    -293
      src/main.rs

+ 7
- 111
Cargo.lock Näytä tiedosto

@@ -64,7 +64,7 @@ dependencies = [
"mime",
"percent-encoding",
"pin-project-lite",
"rand 0.9.0",
"rand",
"sha1",
"smallvec",
"tokio",
@@ -326,12 +326,6 @@ version = "3.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf"

[[package]]
name = "byteorder"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"

[[package]]
name = "bytes"
version = "1.10.1"
@@ -435,12 +429,6 @@ dependencies = [
"typenum",
]

[[package]]
name = "data-encoding"
version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "575f75dfd25738df5b91b8e43e14d44bda14637a58fae779fd2b064f8bf3e010"

[[package]]
name = "deranged"
version = "0.4.1"
@@ -603,17 +591,6 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"

[[package]]
name = "futures-macro"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

[[package]]
name = "futures-sink"
version = "0.3.31"
@@ -634,8 +611,6 @@ checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
@@ -653,17 +628,6 @@ dependencies = [
"version_check",
]

[[package]]
name = "getrandom"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7"
dependencies = [
"cfg-if",
"libc",
"wasi 0.11.0+wasi-snapshot-preview1",
]

[[package]]
name = "getrandom"
version = "0.3.2"
@@ -1272,38 +1236,17 @@ version = "5.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5"

[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha 0.3.1",
"rand_core 0.6.4",
]

[[package]]
name = "rand"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94"
dependencies = [
"rand_chacha 0.9.0",
"rand_core 0.9.3",
"rand_chacha",
"rand_core",
"zerocopy",
]

[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core 0.6.4",
]

[[package]]
name = "rand_chacha"
version = "0.9.0"
@@ -1311,16 +1254,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
dependencies = [
"ppv-lite86",
"rand_core 0.9.3",
]

[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom 0.2.15",
"rand_core",
]

[[package]]
@@ -1329,7 +1263,7 @@ version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38"
dependencies = [
"getrandom 0.3.2",
"getrandom",
]

[[package]]
@@ -1685,7 +1619,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7437ac7763b9b123ccf33c338a5cc1bac6f69b45a136c19bdd8a65e3916435bf"
dependencies = [
"fastrand",
"getrandom 0.3.2",
"getrandom",
"once_cell",
"rustix",
"windows-sys 0.59.0",
@@ -1791,18 +1725,6 @@ dependencies = [
"tokio",
]

[[package]]
name = "tokio-tungstenite"
version = "0.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c"
dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite",
]

[[package]]
name = "tokio-util"
version = "0.7.14"
@@ -1860,25 +1782,6 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"

[[package]]
name = "tungstenite"
version = "0.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9"
dependencies = [
"byteorder",
"bytes",
"data-encoding",
"http",
"httparse",
"log",
"rand 0.8.5",
"sha1",
"thiserror",
"url",
"utf-8",
]

[[package]]
name = "typenum"
version = "1.18.0"
@@ -1917,12 +1820,6 @@ dependencies = [
"percent-encoding",
]

[[package]]
name = "utf-8"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"

[[package]]
name = "utf16_iter"
version = "1.0.5"
@@ -2059,13 +1956,12 @@ dependencies = [
"actix-cors",
"actix-web",
"anyhow",
"futures-util",
"once_cell",
"reqwest",
"serde",
"serde_json",
"serialport",
"tokio",
"tokio-tungstenite",
"winapi",
]



+ 1
- 2
Cargo.toml Näytä tiedosto

@@ -12,8 +12,7 @@ actix-web = "4.4"
actix-cors = "0.6"
tokio = { version = "1.0", features = ["full"] }
reqwest = { version = "0.11", features = ["blocking", "json"] }
tokio-tungstenite = "0.20"
futures-util = "0.3"
once_cell = "1.18"

[profile.release]
opt-level = 3


+ 0
- 19
config.json.orig Näytä tiedosto

@@ -1,19 +0,0 @@
{
"scale_type": "001",
"serial_port": "/dev/ttyS1",
"baud_rate": 9600,
"mqtt": {
"client_id": "weight_reader",
"host": "10.180.4.100",
"port": 1883,
<<<<<<< HEAD
"username": "auseft",
"password": "1q2w3E**",
=======
"username": "admin",
"password": "Auseft@2025",
>>>>>>> c2a749df78e5f78ad42f2fefc374d641c881253b
"keep_alive_secs": 60,
"topic_prefix": "weight/data"
}
}

+ 97
- 293
src/main.rs Näytä tiedosto

@@ -7,182 +7,138 @@ use serialport;
use actix_web::{web, App, HttpServer, HttpResponse};
use actix_cors::Cors;
use std::fs;
use futures_util::{SinkExt, StreamExt};
use tokio::net::TcpListener;
use tokio_tungstenite::accept_async;
use tokio::sync::broadcast;
use std::path::PathBuf;
use std::sync::Mutex;
use once_cell::sync::Lazy;

#[derive(Serialize, Deserialize)]
struct WeightData {
weight: String,
timestamp: u64,
is_read: bool,
}

#[derive(Serialize)]
struct MacResponse {
mac_address: String,
}

#[derive(Serialize)]
struct ScaleTypeResponse {
scale_type: String,
}

#[derive(Deserialize)]
#[derive(Serialize, Deserialize)]
struct Config {
scale_type: String,
serial_port: String,
baud_rate: u32,
}

// 读取配置文件
fn read_config() -> Result<Config> {
let config_path = "config.json";
let content = fs::read_to_string(config_path)
.with_context(|| format!("无法读取配置文件 {}", config_path))?;
let config: Config = serde_json::from_str(&content)
.with_context(|| format!("配置文件格式错误 {}", config_path))?;
let config_str = fs::read_to_string("config.json")?;
let config: Config = serde_json::from_str(&config_str)?;
Ok(config)
}

fn get_mac_address() -> String {
match read_config() {
Ok(config) => config.scale_type,
Err(_) => String::from("Unknown")
}
}

fn get_scale_type() -> String {
match read_config() {
Ok(config) => config.scale_type,
Err(_) => String::from("Unknown")
}
}
// 使用全局静态变量来存储重量数据
static WEIGHT_DATA: Lazy<Mutex<WeightData>> = Lazy::new(|| {
Mutex::new(WeightData {
weight: "0".to_string(),
timestamp: 0,
is_read: false,
})
});

// HTTP处理函数
// 获取MAC地址的HTTP处理函数
async fn get_mac() -> HttpResponse {
let mac = get_mac_address();
println!("HTTP请求:获取MAC地址 = {}", mac);
let response = MacResponse {
mac_address: mac,
};
let response = serde_json::json!({
"mac": "00:11:22:33:44:55" // 这里替换为实际的MAC地址获取逻辑
});
HttpResponse::Ok().json(response)
}

// 新增的HTTP处理函数
// 获取秤类型的HTTP处理函数
async fn get_scale() -> HttpResponse {
let scale_type = get_scale_type();
println!("HTTP请求:获取天平类型 = {}", scale_type);
let response = ScaleTypeResponse {
scale_type,
let config = match read_config() {
Ok(config) => config,
Err(_) => return HttpResponse::InternalServerError().finish(),
};

let response = serde_json::json!({
"scale_type": config.scale_type
});
HttpResponse::Ok().json(response)
}

// WebSocket测试客户端函数
async fn test_websocket_client() -> Result<()> {
println!("开始测试WebSocket客户端...");
// 连接到WebSocket服务器
println!("正在连接到WebSocket服务器...");
let (ws_stream, _) = tokio_tungstenite::connect_async("ws://127.0.0.1:8081")
.await
.context("无法连接到WebSocket服务器")?;
println!("已成功连接到WebSocket服务器");

let (_, mut read) = ws_stream.split();

// 持续接收消息
println!("等待接收数据...");
while let Some(message) = read.next().await {
match message {
Ok(msg) => {
if let Ok(text) = msg.into_text() {
println!("收到原始数据: {}", text);
// 尝试解析JSON数据
match serde_json::from_str::<WeightData>(&text) {
Ok(weight_data) => {
println!("收到重量数据: {} (时间戳: {})", weight_data.weight, weight_data.timestamp);
}
Err(e) => {
println!("JSON解析错误: {}", e);
println!("原始数据: {}", text);
}
}
}
}
Err(e) => {
eprintln!("WebSocket接收错误: {}", e);
break;
}
}
// 获取重量数据的HTTP处理函数
async fn get_weight() -> HttpResponse {
let mut weight_data = WEIGHT_DATA.lock().unwrap();
if weight_data.is_read {
// 如果数据已读,返回 null
HttpResponse::Ok().json(serde_json::json!({"weight": null}))
} else {
// 标记为已读并返回数据
weight_data.is_read = true;
HttpResponse::Ok().json(&*weight_data)
}

Ok(())
}

// 模拟串口数据发送
async fn simulate_serial_data() -> Result<()> {
println!("开始模拟串口数据发送...");
// 串口数据处理函数
async fn run_serial() -> Result<()> {
// 读取配置文件
let config = read_config()?;
// 创建一个TCP连接到WebSocket服务器
println!("正在连接到WebSocket服务器...");
let (ws_stream, _) = tokio_tungstenite::connect_async("ws://127.0.0.1:8081")
.await
.context("无法连接到WebSocket服务器")?;
println!("已成功连接到WebSocket服务器");
let (mut write, _) = ws_stream.split();
// 打开配置的串口
println!("正在尝试打开串口 {}...", config.serial_port);
let mut port = serialport::new(&config.serial_port, config.baud_rate)
.timeout(Duration::from_millis(1000))
.open()
.with_context(|| format!("无法打开串口 {}", config.serial_port))?;
println!("串口打开成功");
// 模拟每秒发送一个随机重量数据
let mut interval = tokio::time::interval(Duration::from_secs(1));
let mut counter = 0;
let mut buf = [0u8; 1024];
let mut accumulated_data = String::new();
println!("开始发送模拟数据 (按Ctrl+C退出)...");
loop {
interval.tick().await;
counter += 1;
// 生成一个模拟的重量值(10.5 - 20.5之间的随机数)
let weight = format!("{:.1}", 10.5 + (counter % 10) as f32);
// 创建数据结构
let weight_data = WeightData {
weight: weight.clone(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
};

// 序列化并发送数据
if let Ok(json_data) = serde_json::to_string(&weight_data) {
println!("发送模拟数据: {} (JSON: {})", weight, json_data);
if let Err(e) = write.send(tokio_tungstenite::tungstenite::Message::Text(json_data)).await {
eprintln!("发送数据错误: {}", e);
break;
match port.read(&mut buf) {
Ok(bytes_read) if bytes_read > 0 => {
// 将读取的数据转换为字符串并添加到累积的数据中
if let Ok(data) = String::from_utf8(buf[..bytes_read].to_vec()) {
accumulated_data.push_str(&data);
// 检查是否有完整的行
if accumulated_data.contains('\n') {
// 处理所有完整的行
for line in accumulated_data.lines() {
let trimmed_data = line.trim();
if !trimmed_data.is_empty() {
println!("收到串口数据: {}", trimmed_data);
// 更新内存中的数据(直接覆盖)
let mut weight_data = WEIGHT_DATA.lock().unwrap();
weight_data.weight = trimmed_data.to_string();
weight_data.timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
weight_data.is_read = false; // 新数据未读
}
}
// 清除已处理的数据
accumulated_data.clear();
}
}
}
Err(ref e) if e.kind() == io::ErrorKind::TimedOut => {
// 超时是正常的,继续尝试读取
continue;
}
println!("数据发送成功");
Err(e) => {
eprintln!("串口读取错误: {}", e);
return Err(e.into());
}
_ => {}
}

// 等待一小段时间
tokio::time::sleep(Duration::from_secs(1)).await;
}

Ok(())
}

#[actix_web::main]
async fn main() -> Result<()> {
// 检查命令行参数
let args: Vec<String> = std::env::args().collect();
if args.len() > 1 {
match args[1].as_str() {
"--test-client" => return test_websocket_client().await,
"--simulate" => return simulate_serial_data().await,
_ => {}
}
}

println!("程序启动...");
// 启动HTTP服务器
@@ -200,175 +156,23 @@ async fn main() -> Result<()> {
App::new()
.wrap(cors) // 添加CORS中间件
.route("/mac", web::get().to(get_mac))
.route("/scale", web::get().to(get_scale)) // 新增的路由
.route("/scale", web::get().to(get_scale))
.route("/weight", web::get().to(get_weight))
})
.bind(("0.0.0.0", 8080))?
.run();
println!("HTTP服务器已启动,监听在 http://127.0.0.1:8080");
// 在新线程中运行WebSocket和串口服务
let _websocket_handle = tokio::spawn(async {
if let Err(e) = run_websocket_and_serial().await {
eprintln!("WebSocket/串口服务错误: {}", e);
// 在新线程中运行串口服务
let _serial_handle = tokio::spawn(async {
if let Err(e) = run_serial().await {
eprintln!("串口服务错误: {}", e);
}
});
// 等待HTTP服务器结束
server.await?;
Ok(())
}

// WebSocket和串口处理函数
async fn run_websocket_and_serial() -> Result<()> {
// 读取配置
let config = read_config()?;
// 创建一个广播通道,用于向所有WebSocket客户端发送数据
let (tx, _rx) = broadcast::channel::<String>(100);

// 启动WebSocket服务器
let listener = TcpListener::bind("127.0.0.1:8081").await?;
println!("WebSocket服务器已启动,监听在 ws://127.0.0.1:8081");

// 在单独的任务中处理串口数据
let tx_serial = tx.clone();
let _serial_handle = tokio::spawn(async move {
// 打开配置的串口
println!("正在尝试打开串口 {}...", config.serial_port);
let mut port = serialport::new(&config.serial_port, config.baud_rate)
.timeout(Duration::from_millis(1000))
.open()
.with_context(|| format!("无法打开串口 {}", config.serial_port))?;

println!("成功打开串口 {}!", config.serial_port);
println!("正在读取数据... (按 Ctrl+C 退出)");

// 增加缓冲区大小并使用String来累积数据
let mut serial_buf: Vec<u8> = vec![0; 1024];
let mut accumulated_data = String::new();
loop {
match port.read(serial_buf.as_mut_slice()) {
Ok(t) => {
if t > 0 {
// 将新数据添加到累积的字符串中
if let Ok(data) = String::from_utf8(serial_buf[..t].to_vec()) {
accumulated_data.push_str(&data);
// 检查是否有完整的数据行(以换行符或回车符结束)
if accumulated_data.contains('\n') || accumulated_data.contains('\r') {
// 处理累积的数据
let lines: Vec<&str> = accumulated_data
.split(|c| c == '\n' || c == '\r')
.filter(|s| !s.is_empty())
.collect();
for line in lines {
let trimmed_data = line.trim();
if !trimmed_data.is_empty() {
println!("收到串口数据: {}", trimmed_data);
// 创建带时间戳的数据结构
let weight_data = WeightData {
weight: trimmed_data.to_string(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
};

// 序列化数据
if let Ok(json_data) = serde_json::to_string(&weight_data) {
println!("发送串口数据到WebSocket: {}", json_data);
// 发送数据到所有WebSocket客户端
if let Err(e) = tx_serial.send(json_data) {
eprintln!("发送数据到广播通道失败: {}", e);
}
}
}
}
// 清除已处理的数据
accumulated_data.clear();
}
}
}
}
Err(ref e) if e.kind() == io::ErrorKind::TimedOut => {
// 超时是正常的,继续尝试读取
continue;
}
Err(e) => {
eprintln!("串口读取错误: {}", e);
break;
}
}
}
Ok::<_, anyhow::Error>(())
});

// 处理WebSocket连接
while let Ok((stream, addr)) = listener.accept().await {
println!("新的WebSocket连接: {}", addr);
let ws_stream = accept_async(stream).await.expect("Failed to accept WebSocket");
let tx = tx.clone();
// 为每个WebSocket连接创建一个新任务
tokio::spawn(async move {
println!("开始处理WebSocket连接: {}", addr);
let (mut ws_sender, mut ws_receiver) = ws_stream.split();
let mut rx = tx.subscribe();

// 创建两个任务:一个用于接收消息,一个用于发送消息
let mut send_task = tokio::spawn(async move {
println!("[{}] 等待接收广播数据...", addr);
while let Ok(msg) = rx.recv().await {
println!("[{}] 准备发送数据: {}", addr, msg);
if let Err(e) = ws_sender.send(tokio_tungstenite::tungstenite::Message::Text(msg.clone())).await {
eprintln!("[{}] 发送WebSocket消息失败: {}", addr, e);
break;
}
println!("[{}] 成功发送数据", addr);
}
});

let tx_clone = tx.clone();
let mut recv_task = tokio::spawn(async move {
println!("[{}] 开始监听WebSocket消息...", addr);
while let Some(result) = ws_receiver.next().await {
match result {
Ok(msg) => {
if let Ok(text) = msg.into_text() {
println!("[{}] 收到WebSocket消息: {}", addr, text);
// 将收到的消息广播给所有客户端
if let Err(e) = tx_clone.send(text.clone()) {
eprintln!("[{}] 广播消息失败: {}", addr, e);
}
}
}
Err(e) => {
eprintln!("[{}] WebSocket接收错误: {}", addr, e);
break;
}
}
}
});

// 等待任意一个任务结束
tokio::select! {
_ = (&mut send_task) => {
println!("[{}] 发送任务结束", addr);
}
_ = (&mut recv_task) => {
println!("[{}] 接收任务结束", addr);
}
}

println!("[{}] WebSocket连接结束", addr);
});
}

Ok(())
}

Loading…
Peruuta
Tallenna