From 0f469b0a73dac84717261af90517d11cb8ddc58b Mon Sep 17 00:00:00 2001 From: OCEAN <1010331798@qq.com> Date: Tue, 1 Apr 2025 17:30:50 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96mqtt?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.json | 2 +- src/main.rs | 25 +++++++++++++++++-------- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/config.json b/config.json index 4cf2c92..3c35469 100644 --- a/config.json +++ b/config.json @@ -8,7 +8,7 @@ "port": 1883, "username": "auseft", "password": "1q2w3E**", - "keep_alive_secs": 5, + "keep_alive_secs": 60, "topic_prefix": "weight/data" } } diff --git a/src/main.rs b/src/main.rs index 66a496c..02064e5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,5 @@ use anyhow::{Context, Result}; -use rumqttc::{Client, MqttOptions, QoS}; +use rumqttc::{Client, MqttOptions, QoS, Transport}; use serde::{Serialize, Deserialize}; use serde_json; use std::io::{self, Read}; @@ -138,20 +138,29 @@ async fn run_mqtt_and_serial() -> Result<()> { &config.mqtt.host, config.mqtt.port ); - mqttopts.set_keep_alive(Duration::from_secs(config.mqtt.keep_alive_secs)); + mqttopts.set_keep_alive(Duration::from_secs(60)); // 增加keep-alive时间到60秒 mqttopts.set_credentials(&config.mqtt.username, &config.mqtt.password); + mqttopts.set_clean_session(true); + mqttopts.set_transport(Transport::Tcp); + mqttopts.set_max_packet_size(100 * 1024, 100 * 1024); // 设置发送和接收的最大包大小 let (mut client, mut connection) = Client::new(mqttopts, 10); // 在单独的线程中处理 MQTT 连接 thread::spawn(move || { - for notification in connection.iter() { - match notification { - Ok(_) => {} - Err(e) => { - eprintln!("MQTT连接错误: {:?}", e); - break; + loop { // 添加无限循环来保持重连 + for notification in connection.iter() { + match notification { + Ok(_) => {}, + Err(e) => { + eprintln!("MQTT连接错误: {:?}", e); + // 短暂等待后继续尝试重连 + thread::sleep(Duration::from_secs(5)); + break; + } } } + eprintln!("MQTT连接断开,5秒后尝试重连..."); + thread::sleep(Duration::from_secs(5)); } });