@@ -64,7 +64,7 @@ dependencies = [ | |||||
"mime", | "mime", | ||||
"percent-encoding", | "percent-encoding", | ||||
"pin-project-lite", | "pin-project-lite", | ||||
"rand", | |||||
"rand 0.9.0", | |||||
"sha1", | "sha1", | ||||
"smallvec", | "smallvec", | ||||
"tokio", | "tokio", | ||||
@@ -326,6 +326,12 @@ version = "3.17.0" | |||||
source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf" | checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf" | ||||
[[package]] | |||||
name = "byteorder" | |||||
version = "1.5.0" | |||||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||||
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" | |||||
[[package]] | [[package]] | ||||
name = "bytes" | name = "bytes" | ||||
version = "1.10.1" | version = "1.10.1" | ||||
@@ -429,6 +435,12 @@ dependencies = [ | |||||
"typenum", | "typenum", | ||||
] | ] | ||||
[[package]] | |||||
name = "data-encoding" | |||||
version = "2.8.0" | |||||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||||
checksum = "575f75dfd25738df5b91b8e43e14d44bda14637a58fae779fd2b064f8bf3e010" | |||||
[[package]] | [[package]] | ||||
name = "deranged" | name = "deranged" | ||||
version = "0.4.1" | version = "0.4.1" | ||||
@@ -534,17 +546,6 @@ dependencies = [ | |||||
"miniz_oxide", | "miniz_oxide", | ||||
] | ] | ||||
[[package]] | |||||
name = "flume" | |||||
version = "0.11.1" | |||||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||||
checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" | |||||
dependencies = [ | |||||
"futures-core", | |||||
"futures-sink", | |||||
"spin", | |||||
] | |||||
[[package]] | [[package]] | ||||
name = "fnv" | name = "fnv" | ||||
version = "1.0.7" | version = "1.0.7" | ||||
@@ -602,6 +603,17 @@ version = "0.3.31" | |||||
source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" | checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" | ||||
[[package]] | |||||
name = "futures-macro" | |||||
version = "0.3.31" | |||||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||||
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" | |||||
dependencies = [ | |||||
"proc-macro2", | |||||
"quote", | |||||
"syn", | |||||
] | |||||
[[package]] | [[package]] | ||||
name = "futures-sink" | name = "futures-sink" | ||||
version = "0.3.31" | version = "0.3.31" | ||||
@@ -622,6 +634,8 @@ checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" | |||||
dependencies = [ | dependencies = [ | ||||
"futures-core", | "futures-core", | ||||
"futures-io", | "futures-io", | ||||
"futures-macro", | |||||
"futures-sink", | |||||
"futures-task", | "futures-task", | ||||
"memchr", | "memchr", | ||||
"pin-project-lite", | "pin-project-lite", | ||||
@@ -1258,17 +1272,38 @@ version = "5.2.0" | |||||
source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" | checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" | ||||
[[package]] | |||||
name = "rand" | |||||
version = "0.8.5" | |||||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||||
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" | |||||
dependencies = [ | |||||
"libc", | |||||
"rand_chacha 0.3.1", | |||||
"rand_core 0.6.4", | |||||
] | |||||
[[package]] | [[package]] | ||||
name = "rand" | name = "rand" | ||||
version = "0.9.0" | version = "0.9.0" | ||||
source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" | checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" | ||||
dependencies = [ | dependencies = [ | ||||
"rand_chacha", | |||||
"rand_core", | |||||
"rand_chacha 0.9.0", | |||||
"rand_core 0.9.3", | |||||
"zerocopy", | "zerocopy", | ||||
] | ] | ||||
[[package]] | |||||
name = "rand_chacha" | |||||
version = "0.3.1" | |||||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||||
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" | |||||
dependencies = [ | |||||
"ppv-lite86", | |||||
"rand_core 0.6.4", | |||||
] | |||||
[[package]] | [[package]] | ||||
name = "rand_chacha" | name = "rand_chacha" | ||||
version = "0.9.0" | version = "0.9.0" | ||||
@@ -1276,7 +1311,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" | |||||
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" | checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" | ||||
dependencies = [ | dependencies = [ | ||||
"ppv-lite86", | "ppv-lite86", | ||||
"rand_core", | |||||
"rand_core 0.9.3", | |||||
] | |||||
[[package]] | |||||
name = "rand_core" | |||||
version = "0.6.4" | |||||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||||
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" | |||||
dependencies = [ | |||||
"getrandom 0.2.15", | |||||
] | ] | ||||
[[package]] | [[package]] | ||||
@@ -1372,38 +1416,6 @@ dependencies = [ | |||||
"winreg", | "winreg", | ||||
] | ] | ||||
[[package]] | |||||
name = "ring" | |||||
version = "0.17.14" | |||||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||||
checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7" | |||||
dependencies = [ | |||||
"cc", | |||||
"cfg-if", | |||||
"getrandom 0.2.15", | |||||
"libc", | |||||
"untrusted", | |||||
"windows-sys 0.52.0", | |||||
] | |||||
[[package]] | |||||
name = "rumqttc" | |||||
version = "0.23.0" | |||||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||||
checksum = "8d8941c6791801b667d52bfe9ff4fc7c968d4f3f9ae8ae7abdaaa1c966feafc8" | |||||
dependencies = [ | |||||
"bytes", | |||||
"flume", | |||||
"futures-util", | |||||
"log", | |||||
"rustls-native-certs", | |||||
"rustls-pemfile", | |||||
"rustls-webpki", | |||||
"thiserror", | |||||
"tokio", | |||||
"tokio-rustls", | |||||
] | |||||
[[package]] | [[package]] | ||||
name = "rustc-demangle" | name = "rustc-demangle" | ||||
version = "0.1.24" | version = "0.1.24" | ||||
@@ -1432,30 +1444,6 @@ dependencies = [ | |||||
"windows-sys 0.59.0", | "windows-sys 0.59.0", | ||||
] | ] | ||||
[[package]] | |||||
name = "rustls" | |||||
version = "0.21.12" | |||||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||||
checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" | |||||
dependencies = [ | |||||
"log", | |||||
"ring", | |||||
"rustls-webpki", | |||||
"sct", | |||||
] | |||||
[[package]] | |||||
name = "rustls-native-certs" | |||||
version = "0.6.3" | |||||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||||
checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" | |||||
dependencies = [ | |||||
"openssl-probe", | |||||
"rustls-pemfile", | |||||
"schannel", | |||||
"security-framework", | |||||
] | |||||
[[package]] | [[package]] | ||||
name = "rustls-pemfile" | name = "rustls-pemfile" | ||||
version = "1.0.4" | version = "1.0.4" | ||||
@@ -1465,16 +1453,6 @@ dependencies = [ | |||||
"base64 0.21.7", | "base64 0.21.7", | ||||
] | ] | ||||
[[package]] | |||||
name = "rustls-webpki" | |||||
version = "0.101.7" | |||||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||||
checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" | |||||
dependencies = [ | |||||
"ring", | |||||
"untrusted", | |||||
] | |||||
[[package]] | [[package]] | ||||
name = "rustversion" | name = "rustversion" | ||||
version = "1.0.20" | version = "1.0.20" | ||||
@@ -1502,16 +1480,6 @@ version = "1.2.0" | |||||
source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" | checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" | ||||
[[package]] | |||||
name = "sct" | |||||
version = "0.7.1" | |||||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||||
checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" | |||||
dependencies = [ | |||||
"ring", | |||||
"untrusted", | |||||
] | |||||
[[package]] | [[package]] | ||||
name = "security-framework" | name = "security-framework" | ||||
version = "2.11.1" | version = "2.11.1" | ||||
@@ -1655,15 +1623,6 @@ dependencies = [ | |||||
"windows-sys 0.52.0", | "windows-sys 0.52.0", | ||||
] | ] | ||||
[[package]] | |||||
name = "spin" | |||||
version = "0.9.8" | |||||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||||
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" | |||||
dependencies = [ | |||||
"lock_api", | |||||
] | |||||
[[package]] | [[package]] | ||||
name = "stable_deref_trait" | name = "stable_deref_trait" | ||||
version = "1.2.0" | version = "1.2.0" | ||||
@@ -1833,13 +1792,15 @@ dependencies = [ | |||||
] | ] | ||||
[[package]] | [[package]] | ||||
name = "tokio-rustls" | |||||
version = "0.24.1" | |||||
name = "tokio-tungstenite" | |||||
version = "0.20.1" | |||||
source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" | |||||
checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" | |||||
dependencies = [ | dependencies = [ | ||||
"rustls", | |||||
"futures-util", | |||||
"log", | |||||
"tokio", | "tokio", | ||||
"tungstenite", | |||||
] | ] | ||||
[[package]] | [[package]] | ||||
@@ -1899,6 +1860,25 @@ version = "0.2.5" | |||||
source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" | checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" | ||||
[[package]] | |||||
name = "tungstenite" | |||||
version = "0.20.1" | |||||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||||
checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9" | |||||
dependencies = [ | |||||
"byteorder", | |||||
"bytes", | |||||
"data-encoding", | |||||
"http", | |||||
"httparse", | |||||
"log", | |||||
"rand 0.8.5", | |||||
"sha1", | |||||
"thiserror", | |||||
"url", | |||||
"utf-8", | |||||
] | |||||
[[package]] | [[package]] | ||||
name = "typenum" | name = "typenum" | ||||
version = "1.18.0" | version = "1.18.0" | ||||
@@ -1926,12 +1906,6 @@ version = "0.2.6" | |||||
source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" | checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" | ||||
[[package]] | |||||
name = "untrusted" | |||||
version = "0.9.0" | |||||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||||
checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" | |||||
[[package]] | [[package]] | ||||
name = "url" | name = "url" | ||||
version = "2.5.4" | version = "2.5.4" | ||||
@@ -1943,6 +1917,12 @@ dependencies = [ | |||||
"percent-encoding", | "percent-encoding", | ||||
] | ] | ||||
[[package]] | |||||
name = "utf-8" | |||||
version = "0.7.6" | |||||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||||
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" | |||||
[[package]] | [[package]] | ||||
name = "utf16_iter" | name = "utf16_iter" | ||||
version = "1.0.5" | version = "1.0.5" | ||||
@@ -2079,12 +2059,13 @@ dependencies = [ | |||||
"actix-cors", | "actix-cors", | ||||
"actix-web", | "actix-web", | ||||
"anyhow", | "anyhow", | ||||
"futures-util", | |||||
"reqwest", | "reqwest", | ||||
"rumqttc", | |||||
"serde", | "serde", | ||||
"serde_json", | "serde_json", | ||||
"serialport", | "serialport", | ||||
"tokio", | "tokio", | ||||
"tokio-tungstenite", | |||||
"winapi", | "winapi", | ||||
] | ] | ||||
@@ -6,13 +6,14 @@ edition = "2021" | |||||
[dependencies] | [dependencies] | ||||
serialport = "4.2.2" | serialport = "4.2.2" | ||||
anyhow = "1.0" | anyhow = "1.0" | ||||
rumqttc = "0.23" | |||||
serde = { version = "1.0", features = ["derive"] } | serde = { version = "1.0", features = ["derive"] } | ||||
serde_json = "1.0" | serde_json = "1.0" | ||||
actix-web = "4.4" | actix-web = "4.4" | ||||
actix-cors = "0.6" | actix-cors = "0.6" | ||||
tokio = { version = "1.0", features = ["full"] } | tokio = { version = "1.0", features = ["full"] } | ||||
reqwest = { version = "0.11", features = ["blocking", "json"] } | reqwest = { version = "0.11", features = ["blocking", "json"] } | ||||
tokio-tungstenite = "0.20" | |||||
futures-util = "0.3" | |||||
[profile.release] | [profile.release] | ||||
opt-level = 3 | opt-level = 3 | ||||
@@ -1,14 +1,5 @@ | |||||
{ | { | ||||
"scale_type": "001", | "scale_type": "001", | ||||
"serial_port": "com1", | "serial_port": "com1", | ||||
"baud_rate": 9600, | |||||
"mqtt": { | |||||
"client_id": "weight_reader", | |||||
"host": "192.168.0.100", | |||||
"port": 1883, | |||||
"username": "admin", | |||||
"password": "Auseft@2025", | |||||
"keep_alive_secs": 60, | |||||
"topic_prefix": "weight/data" | |||||
} | |||||
"baud_rate": 9600 | |||||
} | } |
@@ -1,14 +1,16 @@ | |||||
use anyhow::{Context, Result}; | use anyhow::{Context, Result}; | ||||
use rumqttc::{Client, MqttOptions, QoS, Transport}; | |||||
use serde::{Serialize, Deserialize}; | use serde::{Serialize, Deserialize}; | ||||
use serde_json; | use serde_json; | ||||
use std::io::{self, Read}; | use std::io::{self, Read}; | ||||
use std::time::Duration; | use std::time::Duration; | ||||
use std::thread; | |||||
use serialport; | use serialport; | ||||
use actix_web::{web, App, HttpServer, HttpResponse}; | use actix_web::{web, App, HttpServer, HttpResponse}; | ||||
use actix_cors::Cors; | use actix_cors::Cors; | ||||
use std::fs; | use std::fs; | ||||
use futures_util::{SinkExt, StreamExt}; | |||||
use tokio::net::TcpListener; | |||||
use tokio_tungstenite::accept_async; | |||||
use tokio::sync::broadcast; | |||||
#[derive(Serialize, Deserialize)] | #[derive(Serialize, Deserialize)] | ||||
struct WeightData { | struct WeightData { | ||||
@@ -26,23 +28,11 @@ struct ScaleTypeResponse { | |||||
scale_type: String, | scale_type: String, | ||||
} | } | ||||
#[derive(Deserialize)] | |||||
struct MqttConfig { | |||||
client_id: String, | |||||
host: String, | |||||
port: u16, | |||||
username: String, | |||||
password: String, | |||||
keep_alive_secs: u64, | |||||
topic_prefix: String, | |||||
} | |||||
#[derive(Deserialize)] | #[derive(Deserialize)] | ||||
struct Config { | struct Config { | ||||
scale_type: String, | scale_type: String, | ||||
serial_port: String, | serial_port: String, | ||||
baud_rate: u32, | baud_rate: u32, | ||||
mqtt: MqttConfig, | |||||
} | } | ||||
fn read_config() -> Result<Config> { | fn read_config() -> Result<Config> { | ||||
@@ -88,8 +78,111 @@ async fn get_scale() -> HttpResponse { | |||||
HttpResponse::Ok().json(response) | HttpResponse::Ok().json(response) | ||||
} | } | ||||
// WebSocket测试客户端函数 | |||||
async fn test_websocket_client() -> Result<()> { | |||||
println!("开始测试WebSocket客户端..."); | |||||
// 连接到WebSocket服务器 | |||||
println!("正在连接到WebSocket服务器..."); | |||||
let (ws_stream, _) = tokio_tungstenite::connect_async("ws://127.0.0.1:8081") | |||||
.await | |||||
.context("无法连接到WebSocket服务器")?; | |||||
println!("已成功连接到WebSocket服务器"); | |||||
let (_, mut read) = ws_stream.split(); | |||||
// 持续接收消息 | |||||
println!("等待接收数据..."); | |||||
while let Some(message) = read.next().await { | |||||
match message { | |||||
Ok(msg) => { | |||||
if let Ok(text) = msg.into_text() { | |||||
println!("收到原始数据: {}", text); | |||||
// 尝试解析JSON数据 | |||||
match serde_json::from_str::<WeightData>(&text) { | |||||
Ok(weight_data) => { | |||||
println!("收到重量数据: {} (时间戳: {})", weight_data.weight, weight_data.timestamp); | |||||
} | |||||
Err(e) => { | |||||
println!("JSON解析错误: {}", e); | |||||
println!("原始数据: {}", text); | |||||
} | |||||
} | |||||
} | |||||
} | |||||
Err(e) => { | |||||
eprintln!("WebSocket接收错误: {}", e); | |||||
break; | |||||
} | |||||
} | |||||
} | |||||
Ok(()) | |||||
} | |||||
// 模拟串口数据发送 | |||||
async fn simulate_serial_data() -> Result<()> { | |||||
println!("开始模拟串口数据发送..."); | |||||
// 创建一个TCP连接到WebSocket服务器 | |||||
println!("正在连接到WebSocket服务器..."); | |||||
let (ws_stream, _) = tokio_tungstenite::connect_async("ws://127.0.0.1:8081") | |||||
.await | |||||
.context("无法连接到WebSocket服务器")?; | |||||
println!("已成功连接到WebSocket服务器"); | |||||
let (mut write, _) = ws_stream.split(); | |||||
// 模拟每秒发送一个随机重量数据 | |||||
let mut interval = tokio::time::interval(Duration::from_secs(1)); | |||||
let mut counter = 0; | |||||
println!("开始发送模拟数据 (按Ctrl+C退出)..."); | |||||
loop { | |||||
interval.tick().await; | |||||
counter += 1; | |||||
// 生成一个模拟的重量值(10.5 - 20.5之间的随机数) | |||||
let weight = format!("{:.1}", 10.5 + (counter % 10) as f32); | |||||
// 创建数据结构 | |||||
let weight_data = WeightData { | |||||
weight: weight.clone(), | |||||
timestamp: std::time::SystemTime::now() | |||||
.duration_since(std::time::UNIX_EPOCH) | |||||
.unwrap() | |||||
.as_secs(), | |||||
}; | |||||
// 序列化并发送数据 | |||||
if let Ok(json_data) = serde_json::to_string(&weight_data) { | |||||
println!("发送模拟数据: {} (JSON: {})", weight, json_data); | |||||
if let Err(e) = write.send(tokio_tungstenite::tungstenite::Message::Text(json_data)).await { | |||||
eprintln!("发送数据错误: {}", e); | |||||
break; | |||||
} | |||||
println!("数据发送成功"); | |||||
} | |||||
// 等待一小段时间 | |||||
tokio::time::sleep(Duration::from_secs(1)).await; | |||||
} | |||||
Ok(()) | |||||
} | |||||
#[actix_web::main] | #[actix_web::main] | ||||
async fn main() -> Result<()> { | async fn main() -> Result<()> { | ||||
// 检查命令行参数 | |||||
let args: Vec<String> = std::env::args().collect(); | |||||
if args.len() > 1 { | |||||
match args[1].as_str() { | |||||
"--test-client" => return test_websocket_client().await, | |||||
"--simulate" => return simulate_serial_data().await, | |||||
_ => {} | |||||
} | |||||
} | |||||
println!("程序启动..."); | println!("程序启动..."); | ||||
// 启动HTTP服务器 | // 启动HTTP服务器 | ||||
@@ -114,10 +207,10 @@ async fn main() -> Result<()> { | |||||
println!("HTTP服务器已启动,监听在 http://127.0.0.1:8080"); | println!("HTTP服务器已启动,监听在 http://127.0.0.1:8080"); | ||||
// 在新线程中运行MQTT和串口服务 | |||||
let mqtt_handle = tokio::spawn(async { | |||||
if let Err(e) = run_mqtt_and_serial().await { | |||||
eprintln!("MQTT/串口服务错误: {}", e); | |||||
// 在新线程中运行WebSocket和串口服务 | |||||
let _websocket_handle = tokio::spawn(async { | |||||
if let Err(e) = run_websocket_and_serial().await { | |||||
eprintln!("WebSocket/串口服务错误: {}", e); | |||||
} | } | ||||
}); | }); | ||||
@@ -127,119 +220,154 @@ async fn main() -> Result<()> { | |||||
Ok(()) | Ok(()) | ||||
} | } | ||||
// MQTT和串口处理函数 | |||||
async fn run_mqtt_and_serial() -> Result<()> { | |||||
// WebSocket和串口处理函数 | |||||
async fn run_websocket_and_serial() -> Result<()> { | |||||
// 读取配置 | // 读取配置 | ||||
let config = read_config()?; | let config = read_config()?; | ||||
// 创建 MQTT 客户端 | |||||
let mut mqttopts = MqttOptions::new( | |||||
&config.mqtt.client_id, | |||||
&config.mqtt.host, | |||||
config.mqtt.port | |||||
); | |||||
mqttopts.set_keep_alive(Duration::from_secs(60)); // 增加keep-alive时间到60秒 | |||||
mqttopts.set_credentials(&config.mqtt.username, &config.mqtt.password); | |||||
mqttopts.set_clean_session(true); | |||||
mqttopts.set_transport(Transport::Tcp); | |||||
mqttopts.set_max_packet_size(100 * 1024, 100 * 1024); // 设置发送和接收的最大包大小 | |||||
let (mut client, mut connection) = Client::new(mqttopts, 10); | |||||
// 在单独的线程中处理 MQTT 连接 | |||||
thread::spawn(move || { | |||||
loop { // 添加无限循环来保持重连 | |||||
for notification in connection.iter() { | |||||
match notification { | |||||
Ok(_) => {}, | |||||
Err(e) => { | |||||
eprintln!("MQTT连接错误: {:?}", e); | |||||
// 短暂等待后继续尝试重连 | |||||
thread::sleep(Duration::from_secs(5)); | |||||
break; | |||||
// 创建一个广播通道,用于向所有WebSocket客户端发送数据 | |||||
let (tx, _rx) = broadcast::channel::<String>(100); | |||||
// 启动WebSocket服务器 | |||||
let listener = TcpListener::bind("127.0.0.1:8081").await?; | |||||
println!("WebSocket服务器已启动,监听在 ws://127.0.0.1:8081"); | |||||
// 在单独的任务中处理串口数据 | |||||
let tx_serial = tx.clone(); | |||||
let _serial_handle = tokio::spawn(async move { | |||||
// 打开配置的串口 | |||||
println!("正在尝试打开串口 {}...", config.serial_port); | |||||
let mut port = serialport::new(&config.serial_port, config.baud_rate) | |||||
.timeout(Duration::from_millis(1000)) | |||||
.open() | |||||
.with_context(|| format!("无法打开串口 {}", config.serial_port))?; | |||||
println!("成功打开串口 {}!", config.serial_port); | |||||
println!("正在读取数据... (按 Ctrl+C 退出)"); | |||||
// 增加缓冲区大小并使用String来累积数据 | |||||
let mut serial_buf: Vec<u8> = vec![0; 1024]; | |||||
let mut accumulated_data = String::new(); | |||||
loop { | |||||
match port.read(serial_buf.as_mut_slice()) { | |||||
Ok(t) => { | |||||
if t > 0 { | |||||
// 将新数据添加到累积的字符串中 | |||||
if let Ok(data) = String::from_utf8(serial_buf[..t].to_vec()) { | |||||
accumulated_data.push_str(&data); | |||||
// 检查是否有完整的数据行(以换行符或回车符结束) | |||||
if accumulated_data.contains('\n') || accumulated_data.contains('\r') { | |||||
// 处理累积的数据 | |||||
let lines: Vec<&str> = accumulated_data | |||||
.split(|c| c == '\n' || c == '\r') | |||||
.filter(|s| !s.is_empty()) | |||||
.collect(); | |||||
for line in lines { | |||||
let trimmed_data = line.trim(); | |||||
if !trimmed_data.is_empty() { | |||||
println!("收到串口数据: {}", trimmed_data); | |||||
// 创建带时间戳的数据结构 | |||||
let weight_data = WeightData { | |||||
weight: trimmed_data.to_string(), | |||||
timestamp: std::time::SystemTime::now() | |||||
.duration_since(std::time::UNIX_EPOCH) | |||||
.unwrap() | |||||
.as_secs(), | |||||
}; | |||||
// 序列化数据 | |||||
if let Ok(json_data) = serde_json::to_string(&weight_data) { | |||||
println!("发送串口数据到WebSocket: {}", json_data); | |||||
// 发送数据到所有WebSocket客户端 | |||||
if let Err(e) = tx_serial.send(json_data) { | |||||
eprintln!("发送数据到广播通道失败: {}", e); | |||||
} | |||||
} | |||||
} | |||||
} | |||||
// 清除已处理的数据 | |||||
accumulated_data.clear(); | |||||
} | |||||
} | |||||
} | } | ||||
} | } | ||||
Err(ref e) if e.kind() == io::ErrorKind::TimedOut => { | |||||
// 超时是正常的,继续尝试读取 | |||||
continue; | |||||
} | |||||
Err(e) => { | |||||
eprintln!("串口读取错误: {}", e); | |||||
break; | |||||
} | |||||
} | } | ||||
eprintln!("MQTT连接断开,5秒后尝试重连..."); | |||||
thread::sleep(Duration::from_secs(5)); | |||||
} | } | ||||
Ok::<_, anyhow::Error>(()) | |||||
}); | }); | ||||
// 打开配置的串口 | |||||
println!("正在尝试打开串口 {}...", config.serial_port); | |||||
let mut port = serialport::new(&config.serial_port, config.baud_rate) | |||||
.timeout(Duration::from_millis(1000)) | |||||
.open() | |||||
.with_context(|| format!("无法打开串口 {}", config.serial_port))?; | |||||
// 处理WebSocket连接 | |||||
while let Ok((stream, addr)) = listener.accept().await { | |||||
println!("新的WebSocket连接: {}", addr); | |||||
let ws_stream = accept_async(stream).await.expect("Failed to accept WebSocket"); | |||||
let tx = tx.clone(); | |||||
// 为每个WebSocket连接创建一个新任务 | |||||
tokio::spawn(async move { | |||||
println!("开始处理WebSocket连接: {}", addr); | |||||
let (mut ws_sender, mut ws_receiver) = ws_stream.split(); | |||||
let mut rx = tx.subscribe(); | |||||
println!("成功打开串口 {}!", config.serial_port); | |||||
println!("正在读取数据... (按 Ctrl+C 退出)"); | |||||
// 创建两个任务:一个用于接收消息,一个用于发送消息 | |||||
let mut send_task = tokio::spawn(async move { | |||||
println!("[{}] 等待接收广播数据...", addr); | |||||
while let Ok(msg) = rx.recv().await { | |||||
println!("[{}] 准备发送数据: {}", addr, msg); | |||||
if let Err(e) = ws_sender.send(tokio_tungstenite::tungstenite::Message::Text(msg.clone())).await { | |||||
eprintln!("[{}] 发送WebSocket消息失败: {}", addr, e); | |||||
break; | |||||
} | |||||
println!("[{}] 成功发送数据", addr); | |||||
} | |||||
}); | |||||
// 增加缓冲区大小并使用String来累积数据 | |||||
let mut serial_buf: Vec<u8> = vec![0; 1024]; | |||||
let mut accumulated_data = String::new(); | |||||
loop { | |||||
match port.read(serial_buf.as_mut_slice()) { | |||||
Ok(t) => { | |||||
if t > 0 { | |||||
// 将新数据添加到累积的字符串中 | |||||
if let Ok(data) = String::from_utf8(serial_buf[..t].to_vec()) { | |||||
accumulated_data.push_str(&data); | |||||
// 检查是否有完整的数据行(以换行符或回车符结束) | |||||
if accumulated_data.contains('\n') || accumulated_data.contains('\r') { | |||||
// 处理累积的数据 | |||||
let lines: Vec<&str> = accumulated_data | |||||
.split(|c| c == '\n' || c == '\r') | |||||
.filter(|s| !s.is_empty()) | |||||
.collect(); | |||||
for line in lines { | |||||
let trimmed_data = line.trim(); | |||||
if !trimmed_data.is_empty() { | |||||
println!("收到完整数据: {}", trimmed_data); | |||||
// 创建权重数据结构 | |||||
let weight_data = WeightData { | |||||
weight: trimmed_data.to_string(), | |||||
timestamp: std::time::SystemTime::now() | |||||
.duration_since(std::time::UNIX_EPOCH) | |||||
.unwrap() | |||||
.as_secs(), | |||||
}; | |||||
// 序列化数据 | |||||
let json_data = serde_json::to_string(&weight_data)?; | |||||
println!("当前JSON数据: {}", json_data); | |||||
// 获取MAC地址 | |||||
let mac_address = get_mac_address(); | |||||
println!("MAC地址: {}", mac_address); | |||||
// 发布到 MQTT,主题中包含MAC地址 | |||||
let topic = format!("{}/{}", config.mqtt.topic_prefix, mac_address); | |||||
if let Err(e) = client.publish(topic, QoS::AtLeastOnce, false, json_data) { | |||||
eprintln!("MQTT发布错误: {:?}", e); | |||||
} else { | |||||
println!("成功发送数据到MQTT"); | |||||
} | |||||
let tx_clone = tx.clone(); | |||||
let mut recv_task = tokio::spawn(async move { | |||||
println!("[{}] 开始监听WebSocket消息...", addr); | |||||
while let Some(result) = ws_receiver.next().await { | |||||
match result { | |||||
Ok(msg) => { | |||||
if let Ok(text) = msg.into_text() { | |||||
println!("[{}] 收到WebSocket消息: {}", addr, text); | |||||
// 将收到的消息广播给所有客户端 | |||||
if let Err(e) = tx_clone.send(text.clone()) { | |||||
eprintln!("[{}] 广播消息失败: {}", addr, e); | |||||
} | } | ||||
} | } | ||||
// 清空累积的数据 | |||||
accumulated_data.clear(); | |||||
} | |||||
Err(e) => { | |||||
eprintln!("[{}] WebSocket接收错误: {}", addr, e); | |||||
break; | |||||
} | } | ||||
} | } | ||||
} | } | ||||
}); | |||||
// 等待任意一个任务结束 | |||||
tokio::select! { | |||||
_ = (&mut send_task) => { | |||||
println!("[{}] 发送任务结束", addr); | |||||
} | |||||
_ = (&mut recv_task) => { | |||||
println!("[{}] 接收任务结束", addr); | |||||
} | |||||
} | } | ||||
Err(ref e) if e.kind() == io::ErrorKind::TimedOut => { | |||||
// Timeout is not an error, just continue | |||||
continue; | |||||
} | |||||
Err(e) => { | |||||
eprintln!("发生错误: {}", e); | |||||
break; | |||||
} | |||||
} | |||||
println!("[{}] WebSocket连接结束", addr); | |||||
}); | |||||
} | } | ||||
Ok(()) | Ok(()) | ||||