diff --git a/Cargo.lock b/Cargo.lock index 8b4fc19..15d5c34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -64,7 +64,7 @@ dependencies = [ "mime", "percent-encoding", "pin-project-lite", - "rand 0.9.0", + "rand", "sha1", "smallvec", "tokio", @@ -326,12 +326,6 @@ version = "3.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf" -[[package]] -name = "byteorder" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" - [[package]] name = "bytes" version = "1.10.1" @@ -435,12 +429,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "data-encoding" -version = "2.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "575f75dfd25738df5b91b8e43e14d44bda14637a58fae779fd2b064f8bf3e010" - [[package]] name = "deranged" version = "0.4.1" @@ -603,17 +591,6 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" -[[package]] -name = "futures-macro" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "futures-sink" version = "0.3.31" @@ -634,8 +611,6 @@ checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-core", "futures-io", - "futures-macro", - "futures-sink", "futures-task", "memchr", "pin-project-lite", @@ -653,17 +628,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "getrandom" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" -dependencies = [ - "cfg-if", - "libc", - "wasi 0.11.0+wasi-snapshot-preview1", -] - [[package]] name = "getrandom" version = "0.3.2" @@ -1272,38 +1236,17 @@ version = "5.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" -[[package]] -name = "rand" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" -dependencies = [ - "libc", - "rand_chacha 0.3.1", - "rand_core 0.6.4", -] - [[package]] name = "rand" version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" dependencies = [ - "rand_chacha 0.9.0", - "rand_core 0.9.3", + "rand_chacha", + "rand_core", "zerocopy", ] -[[package]] -name = "rand_chacha" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" -dependencies = [ - "ppv-lite86", - "rand_core 0.6.4", -] - [[package]] name = "rand_chacha" version = "0.9.0" @@ -1311,16 +1254,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" dependencies = [ "ppv-lite86", - "rand_core 0.9.3", -] - -[[package]] -name = "rand_core" -version = "0.6.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" -dependencies = [ - "getrandom 0.2.15", + "rand_core", ] [[package]] @@ -1329,7 +1263,7 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" dependencies = [ - "getrandom 0.3.2", + "getrandom", ] [[package]] @@ -1685,7 +1619,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7437ac7763b9b123ccf33c338a5cc1bac6f69b45a136c19bdd8a65e3916435bf" dependencies = [ "fastrand", - "getrandom 0.3.2", + "getrandom", "once_cell", "rustix", "windows-sys 0.59.0", @@ -1791,18 +1725,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-tungstenite" -version = "0.20.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" -dependencies = [ - "futures-util", - "log", - "tokio", - "tungstenite", -] - [[package]] name = "tokio-util" version = "0.7.14" @@ -1860,25 +1782,6 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" -[[package]] -name = "tungstenite" -version = "0.20.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9" -dependencies = [ - "byteorder", - "bytes", - "data-encoding", - "http", - "httparse", - "log", - "rand 0.8.5", - "sha1", - "thiserror", - "url", - "utf-8", -] - [[package]] name = "typenum" version = "1.18.0" @@ -1917,12 +1820,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "utf-8" -version = "0.7.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" - [[package]] name = "utf16_iter" version = "1.0.5" @@ -2059,13 +1956,12 @@ dependencies = [ "actix-cors", "actix-web", "anyhow", - "futures-util", + "once_cell", "reqwest", "serde", "serde_json", "serialport", "tokio", - "tokio-tungstenite", "winapi", ] diff --git a/Cargo.toml b/Cargo.toml index 9aefafb..405eae6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,8 +12,7 @@ actix-web = "4.4" actix-cors = "0.6" tokio = { version = "1.0", features = ["full"] } reqwest = { version = "0.11", features = ["blocking", "json"] } -tokio-tungstenite = "0.20" -futures-util = "0.3" +once_cell = "1.18" [profile.release] opt-level = 3 diff --git a/config.json.orig b/config.json.orig deleted file mode 100644 index 96c67c2..0000000 --- a/config.json.orig +++ /dev/null @@ -1,19 +0,0 @@ -{ - "scale_type": "001", - "serial_port": "/dev/ttyS1", - "baud_rate": 9600, - "mqtt": { - "client_id": "weight_reader", - "host": "10.180.4.100", - "port": 1883, -<<<<<<< HEAD - "username": "auseft", - "password": "1q2w3E**", -======= - "username": "admin", - "password": "Auseft@2025", ->>>>>>> c2a749df78e5f78ad42f2fefc374d641c881253b - "keep_alive_secs": 60, - "topic_prefix": "weight/data" - } -} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index c7bf7cc..17f4075 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,182 +7,138 @@ use serialport; use actix_web::{web, App, HttpServer, HttpResponse}; use actix_cors::Cors; use std::fs; -use futures_util::{SinkExt, StreamExt}; -use tokio::net::TcpListener; -use tokio_tungstenite::accept_async; -use tokio::sync::broadcast; +use std::path::PathBuf; +use std::sync::Mutex; +use once_cell::sync::Lazy; #[derive(Serialize, Deserialize)] struct WeightData { weight: String, timestamp: u64, + is_read: bool, } -#[derive(Serialize)] -struct MacResponse { - mac_address: String, -} - -#[derive(Serialize)] -struct ScaleTypeResponse { - scale_type: String, -} - -#[derive(Deserialize)] +#[derive(Serialize, Deserialize)] struct Config { scale_type: String, serial_port: String, baud_rate: u32, } +// 读取配置文件 fn read_config() -> Result { - let config_path = "config.json"; - let content = fs::read_to_string(config_path) - .with_context(|| format!("无法读取配置文件 {}", config_path))?; - let config: Config = serde_json::from_str(&content) - .with_context(|| format!("配置文件格式错误 {}", config_path))?; + let config_str = fs::read_to_string("config.json")?; + let config: Config = serde_json::from_str(&config_str)?; Ok(config) } -fn get_mac_address() -> String { - match read_config() { - Ok(config) => config.scale_type, - Err(_) => String::from("Unknown") - } -} - -fn get_scale_type() -> String { - match read_config() { - Ok(config) => config.scale_type, - Err(_) => String::from("Unknown") - } -} +// 使用全局静态变量来存储重量数据 +static WEIGHT_DATA: Lazy> = Lazy::new(|| { + Mutex::new(WeightData { + weight: "0".to_string(), + timestamp: 0, + is_read: false, + }) +}); -// HTTP处理函数 +// 获取MAC地址的HTTP处理函数 async fn get_mac() -> HttpResponse { - let mac = get_mac_address(); - println!("HTTP请求:获取MAC地址 = {}", mac); - let response = MacResponse { - mac_address: mac, - }; + let response = serde_json::json!({ + "mac": "00:11:22:33:44:55" // 这里替换为实际的MAC地址获取逻辑 + }); HttpResponse::Ok().json(response) } -// 新增的HTTP处理函数 +// 获取秤类型的HTTP处理函数 async fn get_scale() -> HttpResponse { - let scale_type = get_scale_type(); - println!("HTTP请求:获取天平类型 = {}", scale_type); - let response = ScaleTypeResponse { - scale_type, + let config = match read_config() { + Ok(config) => config, + Err(_) => return HttpResponse::InternalServerError().finish(), }; + + let response = serde_json::json!({ + "scale_type": config.scale_type + }); HttpResponse::Ok().json(response) } -// WebSocket测试客户端函数 -async fn test_websocket_client() -> Result<()> { - println!("开始测试WebSocket客户端..."); - - // 连接到WebSocket服务器 - println!("正在连接到WebSocket服务器..."); - let (ws_stream, _) = tokio_tungstenite::connect_async("ws://127.0.0.1:8081") - .await - .context("无法连接到WebSocket服务器")?; - println!("已成功连接到WebSocket服务器"); - - let (_, mut read) = ws_stream.split(); - - // 持续接收消息 - println!("等待接收数据..."); - while let Some(message) = read.next().await { - match message { - Ok(msg) => { - if let Ok(text) = msg.into_text() { - println!("收到原始数据: {}", text); - // 尝试解析JSON数据 - match serde_json::from_str::(&text) { - Ok(weight_data) => { - println!("收到重量数据: {} (时间戳: {})", weight_data.weight, weight_data.timestamp); - } - Err(e) => { - println!("JSON解析错误: {}", e); - println!("原始数据: {}", text); - } - } - } - } - Err(e) => { - eprintln!("WebSocket接收错误: {}", e); - break; - } - } +// 获取重量数据的HTTP处理函数 +async fn get_weight() -> HttpResponse { + let mut weight_data = WEIGHT_DATA.lock().unwrap(); + if weight_data.is_read { + // 如果数据已读,返回 null + HttpResponse::Ok().json(serde_json::json!({"weight": null})) + } else { + // 标记为已读并返回数据 + weight_data.is_read = true; + HttpResponse::Ok().json(&*weight_data) } - - Ok(()) } -// 模拟串口数据发送 -async fn simulate_serial_data() -> Result<()> { - println!("开始模拟串口数据发送..."); +// 串口数据处理函数 +async fn run_serial() -> Result<()> { + // 读取配置文件 + let config = read_config()?; - // 创建一个TCP连接到WebSocket服务器 - println!("正在连接到WebSocket服务器..."); - let (ws_stream, _) = tokio_tungstenite::connect_async("ws://127.0.0.1:8081") - .await - .context("无法连接到WebSocket服务器")?; - println!("已成功连接到WebSocket服务器"); - - let (mut write, _) = ws_stream.split(); + // 打开配置的串口 + println!("正在尝试打开串口 {}...", config.serial_port); + let mut port = serialport::new(&config.serial_port, config.baud_rate) + .timeout(Duration::from_millis(1000)) + .open() + .with_context(|| format!("无法打开串口 {}", config.serial_port))?; + + println!("串口打开成功"); - // 模拟每秒发送一个随机重量数据 - let mut interval = tokio::time::interval(Duration::from_secs(1)); - let mut counter = 0; + let mut buf = [0u8; 1024]; + let mut accumulated_data = String::new(); - println!("开始发送模拟数据 (按Ctrl+C退出)..."); loop { - interval.tick().await; - counter += 1; - - // 生成一个模拟的重量值(10.5 - 20.5之间的随机数) - let weight = format!("{:.1}", 10.5 + (counter % 10) as f32); - - // 创建数据结构 - let weight_data = WeightData { - weight: weight.clone(), - timestamp: std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_secs(), - }; - - // 序列化并发送数据 - if let Ok(json_data) = serde_json::to_string(&weight_data) { - println!("发送模拟数据: {} (JSON: {})", weight, json_data); - if let Err(e) = write.send(tokio_tungstenite::tungstenite::Message::Text(json_data)).await { - eprintln!("发送数据错误: {}", e); - break; + match port.read(&mut buf) { + Ok(bytes_read) if bytes_read > 0 => { + // 将读取的数据转换为字符串并添加到累积的数据中 + if let Ok(data) = String::from_utf8(buf[..bytes_read].to_vec()) { + accumulated_data.push_str(&data); + + // 检查是否有完整的行 + if accumulated_data.contains('\n') { + // 处理所有完整的行 + for line in accumulated_data.lines() { + let trimmed_data = line.trim(); + + if !trimmed_data.is_empty() { + println!("收到串口数据: {}", trimmed_data); + + // 更新内存中的数据(直接覆盖) + let mut weight_data = WEIGHT_DATA.lock().unwrap(); + weight_data.weight = trimmed_data.to_string(); + weight_data.timestamp = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + weight_data.is_read = false; // 新数据未读 + } + } + + // 清除已处理的数据 + accumulated_data.clear(); + } + } + } + Err(ref e) if e.kind() == io::ErrorKind::TimedOut => { + // 超时是正常的,继续尝试读取 + continue; } - println!("数据发送成功"); + Err(e) => { + eprintln!("串口读取错误: {}", e); + return Err(e.into()); + } + _ => {} } - - // 等待一小段时间 - tokio::time::sleep(Duration::from_secs(1)).await; } - - Ok(()) } #[actix_web::main] async fn main() -> Result<()> { - // 检查命令行参数 - let args: Vec = std::env::args().collect(); - if args.len() > 1 { - match args[1].as_str() { - "--test-client" => return test_websocket_client().await, - "--simulate" => return simulate_serial_data().await, - _ => {} - } - } - println!("程序启动..."); // 启动HTTP服务器 @@ -200,175 +156,23 @@ async fn main() -> Result<()> { App::new() .wrap(cors) // 添加CORS中间件 .route("/mac", web::get().to(get_mac)) - .route("/scale", web::get().to(get_scale)) // 新增的路由 + .route("/scale", web::get().to(get_scale)) + .route("/weight", web::get().to(get_weight)) }) .bind(("0.0.0.0", 8080))? .run(); println!("HTTP服务器已启动,监听在 http://127.0.0.1:8080"); - // 在新线程中运行WebSocket和串口服务 - let _websocket_handle = tokio::spawn(async { - if let Err(e) = run_websocket_and_serial().await { - eprintln!("WebSocket/串口服务错误: {}", e); + // 在新线程中运行串口服务 + let _serial_handle = tokio::spawn(async { + if let Err(e) = run_serial().await { + eprintln!("串口服务错误: {}", e); } }); // 等待HTTP服务器结束 server.await?; - - Ok(()) -} - -// WebSocket和串口处理函数 -async fn run_websocket_and_serial() -> Result<()> { - // 读取配置 - let config = read_config()?; - - // 创建一个广播通道,用于向所有WebSocket客户端发送数据 - let (tx, _rx) = broadcast::channel::(100); - - // 启动WebSocket服务器 - let listener = TcpListener::bind("127.0.0.1:8081").await?; - println!("WebSocket服务器已启动,监听在 ws://127.0.0.1:8081"); - - // 在单独的任务中处理串口数据 - let tx_serial = tx.clone(); - let _serial_handle = tokio::spawn(async move { - // 打开配置的串口 - println!("正在尝试打开串口 {}...", config.serial_port); - let mut port = serialport::new(&config.serial_port, config.baud_rate) - .timeout(Duration::from_millis(1000)) - .open() - .with_context(|| format!("无法打开串口 {}", config.serial_port))?; - - println!("成功打开串口 {}!", config.serial_port); - println!("正在读取数据... (按 Ctrl+C 退出)"); - - // 增加缓冲区大小并使用String来累积数据 - let mut serial_buf: Vec = vec![0; 1024]; - let mut accumulated_data = String::new(); - - loop { - match port.read(serial_buf.as_mut_slice()) { - Ok(t) => { - if t > 0 { - // 将新数据添加到累积的字符串中 - if let Ok(data) = String::from_utf8(serial_buf[..t].to_vec()) { - accumulated_data.push_str(&data); - - // 检查是否有完整的数据行(以换行符或回车符结束) - if accumulated_data.contains('\n') || accumulated_data.contains('\r') { - // 处理累积的数据 - let lines: Vec<&str> = accumulated_data - .split(|c| c == '\n' || c == '\r') - .filter(|s| !s.is_empty()) - .collect(); - - for line in lines { - let trimmed_data = line.trim(); - if !trimmed_data.is_empty() { - println!("收到串口数据: {}", trimmed_data); - - // 创建带时间戳的数据结构 - let weight_data = WeightData { - weight: trimmed_data.to_string(), - timestamp: std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_secs(), - }; - - // 序列化数据 - if let Ok(json_data) = serde_json::to_string(&weight_data) { - println!("发送串口数据到WebSocket: {}", json_data); - // 发送数据到所有WebSocket客户端 - if let Err(e) = tx_serial.send(json_data) { - eprintln!("发送数据到广播通道失败: {}", e); - } - } - } - } - - // 清除已处理的数据 - accumulated_data.clear(); - } - } - } - } - Err(ref e) if e.kind() == io::ErrorKind::TimedOut => { - // 超时是正常的,继续尝试读取 - continue; - } - Err(e) => { - eprintln!("串口读取错误: {}", e); - break; - } - } - } - Ok::<_, anyhow::Error>(()) - }); - - // 处理WebSocket连接 - while let Ok((stream, addr)) = listener.accept().await { - println!("新的WebSocket连接: {}", addr); - let ws_stream = accept_async(stream).await.expect("Failed to accept WebSocket"); - let tx = tx.clone(); - - // 为每个WebSocket连接创建一个新任务 - tokio::spawn(async move { - println!("开始处理WebSocket连接: {}", addr); - let (mut ws_sender, mut ws_receiver) = ws_stream.split(); - let mut rx = tx.subscribe(); - - // 创建两个任务:一个用于接收消息,一个用于发送消息 - let mut send_task = tokio::spawn(async move { - println!("[{}] 等待接收广播数据...", addr); - while let Ok(msg) = rx.recv().await { - println!("[{}] 准备发送数据: {}", addr, msg); - if let Err(e) = ws_sender.send(tokio_tungstenite::tungstenite::Message::Text(msg.clone())).await { - eprintln!("[{}] 发送WebSocket消息失败: {}", addr, e); - break; - } - println!("[{}] 成功发送数据", addr); - } - }); - - let tx_clone = tx.clone(); - let mut recv_task = tokio::spawn(async move { - println!("[{}] 开始监听WebSocket消息...", addr); - while let Some(result) = ws_receiver.next().await { - match result { - Ok(msg) => { - if let Ok(text) = msg.into_text() { - println!("[{}] 收到WebSocket消息: {}", addr, text); - // 将收到的消息广播给所有客户端 - if let Err(e) = tx_clone.send(text.clone()) { - eprintln!("[{}] 广播消息失败: {}", addr, e); - } - } - } - Err(e) => { - eprintln!("[{}] WebSocket接收错误: {}", addr, e); - break; - } - } - } - }); - - // 等待任意一个任务结束 - tokio::select! { - _ = (&mut send_task) => { - println!("[{}] 发送任务结束", addr); - } - _ = (&mut recv_task) => { - println!("[{}] 接收任务结束", addr); - } - } - - println!("[{}] WebSocket连接结束", addr); - }); - } Ok(()) }