From 3d66a87708c41bf781d63ddb367aa9306233626b Mon Sep 17 00:00:00 2001 From: OCEAN <1010331798@qq.com> Date: Tue, 1 Apr 2025 16:41:35 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96mqtt?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.json | 2 +- src/main.rs | 82 +++++++++++++++++++++++++++++++++-------------------- 2 files changed, 52 insertions(+), 32 deletions(-) diff --git a/config.json b/config.json index 60fcd65..e7b4b47 100644 --- a/config.json +++ b/config.json @@ -8,7 +8,7 @@ "port": 1883, "username": "admin", "password": "Auseft@2025", - "keep_alive_secs": 5, + "keep_alive_secs": 60, "topic_prefix": "weight/data" } } \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index eae5e06..6253fcd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -146,24 +146,38 @@ async fn run_mqtt_and_serial() -> Result<()> { Ok(_) => {} } - // 创建 MQTT 客户端 - let mut mqttopts = MqttOptions::new( - &config.mqtt.client_id, - &config.mqtt.host, - config.mqtt.port - ); - mqttopts.set_keep_alive(Duration::from_secs(config.mqtt.keep_alive_secs)); - mqttopts.set_credentials(&config.mqtt.username, &config.mqtt.password); - let (mut client, mut connection) = Client::new(mqttopts, 10); - - // 在单独的线程中处理 MQTT 连接 + // 创建MQTT连接函数 + let create_mqtt_client = || -> (Client, rumqttc::Connection) { + let mut mqttopts = MqttOptions::new( + &config.mqtt.client_id, + &config.mqtt.host, + config.mqtt.port + ); + mqttopts.set_keep_alive(Duration::from_secs(60)); // 增加保活时间到60秒 + mqttopts.set_credentials(&config.mqtt.username, &config.mqtt.password); + mqttopts.set_clean_session(true); + Client::new(mqttopts, 10) + }; + + // 初始化MQTT客户端 + let (mut client, mut connection) = create_mqtt_client(); + + // 在单独的线程中处理MQTT连接 + let mqtt_config = config.mqtt.clone(); thread::spawn(move || { - for notification in connection.iter() { - match notification { - Ok(_) => {} - Err(e) => { - eprintln!("MQTT连接错误: {:?}", e); - break; + loop { + for notification in connection.iter() { + match notification { + Ok(_) => {} + Err(e) => { + eprintln!("MQTT连接错误: {:?}, 尝试重新连接...", e); + thread::sleep(Duration::from_secs(5)); // 等待5秒后重试 + // 重新创建连接 + let (new_client, new_connection) = create_mqtt_client(); + client = new_client; + connection = new_connection; + break; + } } } } @@ -187,13 +201,10 @@ async fn run_mqtt_and_serial() -> Result<()> { match port.read(serial_buf.as_mut_slice()) { Ok(t) => { if t > 0 { - // 将新数据添加到累积的字符串中 if let Ok(data) = String::from_utf8(serial_buf[..t].to_vec()) { accumulated_data.push_str(&data); - // 检查是否有完整的数据行(以换行符或回车符结束) if accumulated_data.contains('\n') || accumulated_data.contains('\r') { - // 处理累积的数据 let lines: Vec<&str> = accumulated_data .split(|c| c == '\n' || c == '\r') .filter(|s| !s.is_empty()) @@ -204,7 +215,6 @@ async fn run_mqtt_and_serial() -> Result<()> { if !trimmed_data.is_empty() { println!("收到完整数据: {}", trimmed_data); - // 创建权重数据结构 let weight_data = WeightData { weight: trimmed_data.to_string(), timestamp: std::time::SystemTime::now() @@ -213,31 +223,41 @@ async fn run_mqtt_and_serial() -> Result<()> { .as_secs(), }; - // 序列化数据 let json_data = serde_json::to_string(&weight_data)?; println!("当前JSON数据: {}", json_data); - // 获取MAC地址 let mac_address = get_mac_address(); println!("MAC地址: {}", mac_address); - // 发布到 MQTT,主题中包含MAC地址 - let topic = format!("{}/{}", config.mqtt.topic_prefix, mac_address); - if let Err(e) = client.publish(topic, QoS::AtLeastOnce, false, json_data) { - eprintln!("MQTT发布错误: {:?}", e); - } else { - println!("成功发送数据到MQTT"); + + let topic = format!("{}/{}", mqtt_config.topic_prefix, mac_address); + let mut retry_count = 0; + let max_retries = 3; + + // 添加重试机制 + while retry_count < max_retries { + match client.publish(&topic, QoS::AtLeastOnce, false, json_data.clone()) { + Ok(_) => { + println!("成功发送数据到MQTT"); + break; + } + Err(e) => { + eprintln!("MQTT发布错误 (尝试 {}/{}): {:?}", retry_count + 1, max_retries, e); + retry_count += 1; + if retry_count < max_retries { + thread::sleep(Duration::from_secs(1)); + } + } + } } } } - // 清空累积的数据 accumulated_data.clear(); } } } } Err(ref e) if e.kind() == io::ErrorKind::TimedOut => { - // Timeout is not an error, just continue continue; } Err(e) => {