Browse Source

优化mqtt

dev
OCEAN 3 months ago
parent
commit
0f469b0a73
2 changed files with 18 additions and 9 deletions
  1. +1
    -1
      config.json
  2. +17
    -8
      src/main.rs

+ 1
- 1
config.json View File

@@ -8,7 +8,7 @@
"port": 1883,
"username": "auseft",
"password": "1q2w3E**",
"keep_alive_secs": 5,
"keep_alive_secs": 60,
"topic_prefix": "weight/data"
}
}

+ 17
- 8
src/main.rs View File

@@ -1,5 +1,5 @@
use anyhow::{Context, Result};
use rumqttc::{Client, MqttOptions, QoS};
use rumqttc::{Client, MqttOptions, QoS, Transport};
use serde::{Serialize, Deserialize};
use serde_json;
use std::io::{self, Read};
@@ -138,20 +138,29 @@ async fn run_mqtt_and_serial() -> Result<()> {
&config.mqtt.host,
config.mqtt.port
);
mqttopts.set_keep_alive(Duration::from_secs(config.mqtt.keep_alive_secs));
mqttopts.set_keep_alive(Duration::from_secs(60)); // 增加keep-alive时间到60秒
mqttopts.set_credentials(&config.mqtt.username, &config.mqtt.password);
mqttopts.set_clean_session(true);
mqttopts.set_transport(Transport::Tcp);
mqttopts.set_max_packet_size(100 * 1024, 100 * 1024); // 设置发送和接收的最大包大小
let (mut client, mut connection) = Client::new(mqttopts, 10);

// 在单独的线程中处理 MQTT 连接
thread::spawn(move || {
for notification in connection.iter() {
match notification {
Ok(_) => {}
Err(e) => {
eprintln!("MQTT连接错误: {:?}", e);
break;
loop { // 添加无限循环来保持重连
for notification in connection.iter() {
match notification {
Ok(_) => {},
Err(e) => {
eprintln!("MQTT连接错误: {:?}", e);
// 短暂等待后继续尝试重连
thread::sleep(Duration::from_secs(5));
break;
}
}
}
eprintln!("MQTT连接断开,5秒后尝试重连...");
thread::sleep(Duration::from_secs(5));
}
});



Loading…
Cancel
Save