Browse Source

优化

dev
OCEAN 3 months ago
parent
commit
c2a749df78
1 changed files with 15 additions and 10 deletions
  1. +15
    -10
      src/main.rs

+ 15
- 10
src/main.rs View File

@@ -9,6 +9,7 @@ use serialport;
use actix_web::{web, App, HttpServer, HttpResponse}; use actix_web::{web, App, HttpServer, HttpResponse};
use actix_cors::Cors; use actix_cors::Cors;
use std::fs; use std::fs;
use std::sync::{Arc, Mutex};


#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct WeightData { struct WeightData {
@@ -26,7 +27,7 @@ struct ScaleTypeResponse {
scale_type: String, scale_type: String,
} }


#[derive(Deserialize)]
#[derive(Deserialize, Clone)]
struct MqttConfig { struct MqttConfig {
client_id: String, client_id: String,
host: String, host: String,
@@ -146,24 +147,27 @@ async fn run_mqtt_and_serial() -> Result<()> {
Ok(_) => {} Ok(_) => {}
} }
let mqtt_config = config.mqtt.clone();
// 创建MQTT连接函数 // 创建MQTT连接函数
let create_mqtt_client = || -> (Client, rumqttc::Connection) {
let create_mqtt_client = move || -> (Client, rumqttc::Connection) {
let mut mqttopts = MqttOptions::new( let mut mqttopts = MqttOptions::new(
&config.mqtt.client_id,
&config.mqtt.host,
config.mqtt.port
&mqtt_config.client_id,
&mqtt_config.host,
mqtt_config.port
); );
mqttopts.set_keep_alive(Duration::from_secs(60)); // 增加保活时间到60秒 mqttopts.set_keep_alive(Duration::from_secs(60)); // 增加保活时间到60秒
mqttopts.set_credentials(&config.mqtt.username, &config.mqtt.password);
mqttopts.set_credentials(&mqtt_config.username, &mqtt_config.password);
mqttopts.set_clean_session(true); mqttopts.set_clean_session(true);
Client::new(mqttopts, 10) Client::new(mqttopts, 10)
}; };


// 初始化MQTT客户端 // 初始化MQTT客户端
let (mut client, mut connection) = create_mqtt_client();
let (client, mut connection) = create_mqtt_client();
let client = Arc::new(Mutex::new(client));
let client_clone = client.clone();


// 在单独的线程中处理MQTT连接 // 在单独的线程中处理MQTT连接
let mqtt_config = config.mqtt.clone();
thread::spawn(move || { thread::spawn(move || {
loop { loop {
for notification in connection.iter() { for notification in connection.iter() {
@@ -174,7 +178,7 @@ async fn run_mqtt_and_serial() -> Result<()> {
thread::sleep(Duration::from_secs(5)); // 等待5秒后重试 thread::sleep(Duration::from_secs(5)); // 等待5秒后重试
// 重新创建连接 // 重新创建连接
let (new_client, new_connection) = create_mqtt_client(); let (new_client, new_connection) = create_mqtt_client();
client = new_client;
*client_clone.lock().unwrap() = new_client;
connection = new_connection; connection = new_connection;
break; break;
} }
@@ -235,7 +239,8 @@ async fn run_mqtt_and_serial() -> Result<()> {
// 添加重试机制 // 添加重试机制
while retry_count < max_retries { while retry_count < max_retries {
match client.publish(&topic, QoS::AtLeastOnce, false, json_data.clone()) {
let mut client_guard = client.lock().unwrap();
match client_guard.publish(&topic, QoS::AtLeastOnce, false, json_data.clone()) {
Ok(_) => { Ok(_) => {
println!("成功发送数据到MQTT"); println!("成功发送数据到MQTT");
break; break;


Loading…
Cancel
Save