瀏覽代碼

优化mqtt

dev
OCEAN 3 月之前
父節點
當前提交
3d66a87708
共有 2 個檔案被更改,包括 52 行新增32 行删除
  1. +1
    -1
      config.json
  2. +51
    -31
      src/main.rs

+ 1
- 1
config.json 查看文件

@@ -8,7 +8,7 @@
"port": 1883, "port": 1883,
"username": "admin", "username": "admin",
"password": "Auseft@2025", "password": "Auseft@2025",
"keep_alive_secs": 5,
"keep_alive_secs": 60,
"topic_prefix": "weight/data" "topic_prefix": "weight/data"
} }
} }

+ 51
- 31
src/main.rs 查看文件

@@ -146,24 +146,38 @@ async fn run_mqtt_and_serial() -> Result<()> {
Ok(_) => {} Ok(_) => {}
} }
// 创建 MQTT 客户端
let mut mqttopts = MqttOptions::new(
&config.mqtt.client_id,
&config.mqtt.host,
config.mqtt.port
);
mqttopts.set_keep_alive(Duration::from_secs(config.mqtt.keep_alive_secs));
mqttopts.set_credentials(&config.mqtt.username, &config.mqtt.password);
let (mut client, mut connection) = Client::new(mqttopts, 10);

// 在单独的线程中处理 MQTT 连接
// 创建MQTT连接函数
let create_mqtt_client = || -> (Client, rumqttc::Connection) {
let mut mqttopts = MqttOptions::new(
&config.mqtt.client_id,
&config.mqtt.host,
config.mqtt.port
);
mqttopts.set_keep_alive(Duration::from_secs(60)); // 增加保活时间到60秒
mqttopts.set_credentials(&config.mqtt.username, &config.mqtt.password);
mqttopts.set_clean_session(true);
Client::new(mqttopts, 10)
};

// 初始化MQTT客户端
let (mut client, mut connection) = create_mqtt_client();

// 在单独的线程中处理MQTT连接
let mqtt_config = config.mqtt.clone();
thread::spawn(move || { thread::spawn(move || {
for notification in connection.iter() {
match notification {
Ok(_) => {}
Err(e) => {
eprintln!("MQTT连接错误: {:?}", e);
break;
loop {
for notification in connection.iter() {
match notification {
Ok(_) => {}
Err(e) => {
eprintln!("MQTT连接错误: {:?}, 尝试重新连接...", e);
thread::sleep(Duration::from_secs(5)); // 等待5秒后重试
// 重新创建连接
let (new_client, new_connection) = create_mqtt_client();
client = new_client;
connection = new_connection;
break;
}
} }
} }
} }
@@ -187,13 +201,10 @@ async fn run_mqtt_and_serial() -> Result<()> {
match port.read(serial_buf.as_mut_slice()) { match port.read(serial_buf.as_mut_slice()) {
Ok(t) => { Ok(t) => {
if t > 0 { if t > 0 {
// 将新数据添加到累积的字符串中
if let Ok(data) = String::from_utf8(serial_buf[..t].to_vec()) { if let Ok(data) = String::from_utf8(serial_buf[..t].to_vec()) {
accumulated_data.push_str(&data); accumulated_data.push_str(&data);
// 检查是否有完整的数据行(以换行符或回车符结束)
if accumulated_data.contains('\n') || accumulated_data.contains('\r') { if accumulated_data.contains('\n') || accumulated_data.contains('\r') {
// 处理累积的数据
let lines: Vec<&str> = accumulated_data let lines: Vec<&str> = accumulated_data
.split(|c| c == '\n' || c == '\r') .split(|c| c == '\n' || c == '\r')
.filter(|s| !s.is_empty()) .filter(|s| !s.is_empty())
@@ -204,7 +215,6 @@ async fn run_mqtt_and_serial() -> Result<()> {
if !trimmed_data.is_empty() { if !trimmed_data.is_empty() {
println!("收到完整数据: {}", trimmed_data); println!("收到完整数据: {}", trimmed_data);
// 创建权重数据结构
let weight_data = WeightData { let weight_data = WeightData {
weight: trimmed_data.to_string(), weight: trimmed_data.to_string(),
timestamp: std::time::SystemTime::now() timestamp: std::time::SystemTime::now()
@@ -213,31 +223,41 @@ async fn run_mqtt_and_serial() -> Result<()> {
.as_secs(), .as_secs(),
}; };


// 序列化数据
let json_data = serde_json::to_string(&weight_data)?; let json_data = serde_json::to_string(&weight_data)?;
println!("当前JSON数据: {}", json_data); println!("当前JSON数据: {}", json_data);
// 获取MAC地址
let mac_address = get_mac_address(); let mac_address = get_mac_address();
println!("MAC地址: {}", mac_address); println!("MAC地址: {}", mac_address);
// 发布到 MQTT,主题中包含MAC地址
let topic = format!("{}/{}", config.mqtt.topic_prefix, mac_address);
if let Err(e) = client.publish(topic, QoS::AtLeastOnce, false, json_data) {
eprintln!("MQTT发布错误: {:?}", e);
} else {
println!("成功发送数据到MQTT");
let topic = format!("{}/{}", mqtt_config.topic_prefix, mac_address);
let mut retry_count = 0;
let max_retries = 3;
// 添加重试机制
while retry_count < max_retries {
match client.publish(&topic, QoS::AtLeastOnce, false, json_data.clone()) {
Ok(_) => {
println!("成功发送数据到MQTT");
break;
}
Err(e) => {
eprintln!("MQTT发布错误 (尝试 {}/{}): {:?}", retry_count + 1, max_retries, e);
retry_count += 1;
if retry_count < max_retries {
thread::sleep(Duration::from_secs(1));
}
}
}
} }
} }
} }
// 清空累积的数据
accumulated_data.clear(); accumulated_data.clear();
} }
} }
} }
} }
Err(ref e) if e.kind() == io::ErrorKind::TimedOut => { Err(ref e) if e.kind() == io::ErrorKind::TimedOut => {
// Timeout is not an error, just continue
continue; continue;
} }
Err(e) => { Err(e) => {


Loading…
取消
儲存