From 3fdfc038c9edb5b7a5207e65e900acb9c5abd8a8 Mon Sep 17 00:00:00 2001 From: OCEAN <1010331798@qq.com> Date: Tue, 1 Apr 2025 17:35:51 +0800 Subject: [PATCH] =?UTF-8?q?=E8=BF=98=E5=8E=9F=E6=88=90=20=E4=BC=98?= =?UTF-8?q?=E5=8C=96mqtt?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main.rs | 132 ++++++++++++++++++++-------------------------------- 1 file changed, 51 insertions(+), 81 deletions(-) diff --git a/src/main.rs b/src/main.rs index 49fb6b9..02064e5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,7 +9,6 @@ use serialport; use actix_web::{web, App, HttpServer, HttpResponse}; use actix_cors::Cors; use std::fs; -use std::sync::{Arc, Mutex}; #[derive(Serialize, Deserialize)] struct WeightData { @@ -27,7 +26,7 @@ struct ScaleTypeResponse { scale_type: String, } -#[derive(Deserialize, Clone)] +#[derive(Deserialize)] struct MqttConfig { client_id: String, host: String, @@ -76,21 +75,13 @@ async fn get_mac() -> HttpResponse { let response = MacResponse { mac_address: mac, }; - // 在新线程中运行MQTT和串口服 - let _mqtt_handle = tokio::spawn(async { - if let Err(e) = run_mqtt_and_serial().await { - eprintln!("MQTT/串口服务错误: {}", e); - } - }); - - HttpResponse::Ok().json(response) } // 新增的HTTP处理函数 async fn get_scale() -> HttpResponse { let scale_type = get_scale_type(); - println!("HTTP请求:获取天平类= {}", scale_type); + println!("HTTP请求:获取天平类型 = {}", scale_type); let response = ScaleTypeResponse { scale_type, }; @@ -101,93 +92,79 @@ async fn get_scale() -> HttpResponse { async fn main() -> Result<()> { println!("程序启动..."); - // 启动HTTP服务 - println!("正在启动HTTP服务.."); + // 启动HTTP服务器 + println!("正在启动HTTP服务器..."); - // 创建HTTP服务 + // 创建HTTP服务器 let server = HttpServer::new(|| { // 配置CORS let cors = Cors::default() - .allow_any_origin() // 允许所有来 + .allow_any_origin() // 允许所有来源 .allow_any_method() // 允许所有HTTP方法 .allow_any_header() // 允许所有请求头 .max_age(3600); // 设置预检请求的缓存时间(秒) App::new() - .wrap(cors) // 添加CORS中间 + .wrap(cors) // 添加CORS中间件 .route("/mac", web::get().to(get_mac)) - .route("/scale", web::get().to(get_scale)) // 新增的路 + .route("/scale", web::get().to(get_scale)) // 新增的路由 }) .bind(("0.0.0.0", 8080))? .run(); println!("HTTP服务器已启动,监听在 http://127.0.0.1:8080"); - - // 等待HTTP服务器结 + // 在新线程中运行MQTT和串口服务 + let mqtt_handle = tokio::spawn(async { + if let Err(e) = run_mqtt_and_serial().await { + eprintln!("MQTT/串口服务错误: {}", e); + } + }); + + // 等待HTTP服务器结束 server.await?; Ok(()) } -// MQTT和串口处理函 +// MQTT和串口处理函数 async fn run_mqtt_and_serial() -> Result<()> { // 读取配置 let config = read_config()?; - // 检查串口是否被占用 - println!("正在检查串{} 是否可用...", config.serial_port); - match serialport::new(&config.serial_port, config.baud_rate) - .timeout(Duration::from_millis(10)) - .open() { - Err(_e) => { - eprintln!("串口 {} 已被占用", config.serial_port); - return Ok(()); - }, - Ok(_) => {} - } - - let mqtt_config = config.mqtt.clone(); - - // 创建MQTT连接函数 - let create_mqtt_client = move || -> (Client, rumqttc::Connection) { - let mut mqttopts = MqttOptions::new( - &mqtt_config.client_id, - &mqtt_config.host, - mqtt_config.port - ); - mqttopts.set_keep_alive(Duration::from_secs(60)); // 增加keep-alive时间0 - mqttopts.set_credentials(&mqtt_config.username, &mqtt_config.password); - - // 初始化MQTT客户 - let (client, mut connection) = create_mqtt_client(); - let client = Arc::new(Mutex::new(client)); - let client_clone = client.clone(); - - // 在单独的线程中处理MQTT连接 + // 创建 MQTT 客户端 + let mut mqttopts = MqttOptions::new( + &config.mqtt.client_id, + &config.mqtt.host, + config.mqtt.port + ); + mqttopts.set_keep_alive(Duration::from_secs(60)); // 增加keep-alive时间到60秒 + mqttopts.set_credentials(&config.mqtt.username, &config.mqtt.password); + mqttopts.set_clean_session(true); + mqttopts.set_transport(Transport::Tcp); + mqttopts.set_max_packet_size(100 * 1024, 100 * 1024); // 设置发送和接收的最大包大小 + let (mut client, mut connection) = Client::new(mqttopts, 10); + + // 在单独的线程中处理 MQTT 连接 thread::spawn(move || { - loop { // 添加无限循环来保持重 + loop { // 添加无限循环来保持重连 for notification in connection.iter() { match notification { Ok(_) => {}, Err(e) => { eprintln!("MQTT连接错误: {:?}", e); - // 短暂等待后继续尝试重 + // 短暂等待后继续尝试重连 thread::sleep(Duration::from_secs(5)); - let (new_client, new_connection) = create_mqtt_client(); - *client_clone.lock().unwrap() = new_client; - connection = new_connection; break; } } } - eprintln!("MQTT连接断开秒后尝试重连..."); + eprintln!("MQTT连接断开,5秒后尝试重连..."); thread::sleep(Duration::from_secs(5)); - } } }); - // 打开配置的串 + // 打开配置的串口 println!("正在尝试打开串口 {}...", config.serial_port); let mut port = serialport::new(&config.serial_port, config.baud_rate) .timeout(Duration::from_millis(1000)) @@ -195,9 +172,9 @@ async fn run_mqtt_and_serial() -> Result<()> { .with_context(|| format!("无法打开串口 {}", config.serial_port))?; println!("成功打开串口 {}!", config.serial_port); - println!("正在读取数据... (Ctrl+C 退"); + println!("正在读取数据... (按 Ctrl+C 退出)"); - // 增加缓冲区大小并使用String来累积数 + // 增加缓冲区大小并使用String来累积数据 let mut serial_buf: Vec = vec![0; 1024]; let mut accumulated_data = String::new(); @@ -205,10 +182,13 @@ async fn run_mqtt_and_serial() -> Result<()> { match port.read(serial_buf.as_mut_slice()) { Ok(t) => { if t > 0 { + // 将新数据添加到累积的字符串中 if let Ok(data) = String::from_utf8(serial_buf[..t].to_vec()) { accumulated_data.push_str(&data); + // 检查是否有完整的数据行(以换行符或回车符结束) if accumulated_data.contains('\n') || accumulated_data.contains('\r') { + // 处理累积的数据 let lines: Vec<&str> = accumulated_data .split(|c| c == '\n' || c == '\r') .filter(|s| !s.is_empty()) @@ -219,6 +199,7 @@ async fn run_mqtt_and_serial() -> Result<()> { if !trimmed_data.is_empty() { println!("收到完整数据: {}", trimmed_data); + // 创建权重数据结构 let weight_data = WeightData { weight: trimmed_data.to_string(), timestamp: std::time::SystemTime::now() @@ -227,42 +208,31 @@ async fn run_mqtt_and_serial() -> Result<()> { .as_secs(), }; + // 序列化数据 let json_data = serde_json::to_string(&weight_data)?; println!("当前JSON数据: {}", json_data); + // 获取MAC地址 let mac_address = get_mac_address(); println!("MAC地址: {}", mac_address); - - let topic = format!("{}/{}", mqtt_config.topic_prefix, mac_address); - let mut retry_count = 0; - let max_retries = 3; - - // 添加重试机制 - while retry_count < max_retries { - let mut client_guard = client.lock().unwrap(); - match client_guard.publish(&topic, QoS::AtLeastOnce, false, json_data.clone()) { - Ok(_) => { - println!("成功发送数据到MQTT"); - break; - } - Err(e) => { - eprintln!("MQTT发布错误 (尝试 {}/{}): {:?}", retry_count + 1, max_retries, e); - retry_count += 1; - if retry_count < max_retries { - thread::sleep(Duration::from_secs(1)); - } - } - } + // 发布到 MQTT,主题中包含MAC地址 + let topic = format!("{}/{}", config.mqtt.topic_prefix, mac_address); + if let Err(e) = client.publish(topic, QoS::AtLeastOnce, false, json_data) { + eprintln!("MQTT发布错误: {:?}", e); + } else { + println!("成功发送数据到MQTT"); } } } + // 清空累积的数据 accumulated_data.clear(); } } } } Err(ref e) if e.kind() == io::ErrorKind::TimedOut => { + // Timeout is not an error, just continue continue; } Err(e) => {