소스 검색

还原成 优化mqtt

dev
OCEAN 3 달 전
부모
커밋
3fdfc038c9
1개의 변경된 파일51개의 추가작업 그리고 81개의 파일을 삭제
  1. +51
    -81
      src/main.rs

+ 51
- 81
src/main.rs 파일 보기

@@ -9,7 +9,6 @@ use serialport;
use actix_web::{web, App, HttpServer, HttpResponse}; use actix_web::{web, App, HttpServer, HttpResponse};
use actix_cors::Cors; use actix_cors::Cors;
use std::fs; use std::fs;
use std::sync::{Arc, Mutex};


#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct WeightData { struct WeightData {
@@ -27,7 +26,7 @@ struct ScaleTypeResponse {
scale_type: String, scale_type: String,
} }


#[derive(Deserialize, Clone)]
#[derive(Deserialize)]
struct MqttConfig { struct MqttConfig {
client_id: String, client_id: String,
host: String, host: String,
@@ -76,21 +75,13 @@ async fn get_mac() -> HttpResponse {
let response = MacResponse { let response = MacResponse {
mac_address: mac, mac_address: mac,
}; };
// �冽鰵蝥輻�銝剛�銵愢QTT�䔶葡����
let _mqtt_handle = tokio::spawn(async {
if let Err(e) = run_mqtt_and_serial().await {
eprintln!("MQTT/銝脣藁�滚𦛚�躰秤: {}", e);
}
});
HttpResponse::Ok().json(response) HttpResponse::Ok().json(response)
} }


// �啣���TTP憭���賣㺭 // �啣���TTP憭���賣㺭
async fn get_scale() -> HttpResponse { async fn get_scale() -> HttpResponse {
let scale_type = get_scale_type(); let scale_type = get_scale_type();
println!("HTTP霂瑟�嚗朞繮�硋予撟喟掩�= {}", scale_type);
println!("HTTP霂瑟�嚗朞繮�硋予撟喟掩�� = {}", scale_type);
let response = ScaleTypeResponse { let response = ScaleTypeResponse {
scale_type, scale_type,
}; };
@@ -101,93 +92,79 @@ async fn get_scale() -> HttpResponse {
async fn main() -> Result<()> { async fn main() -> Result<()> {
println!("蝔见��臬𢆡..."); println!("蝔见��臬𢆡...");
// �臬𢆡HTTP�滚𦛚�
println!("甇�銁�臬𢆡HTTP�滚𦛚�..");
// �臬𢆡HTTP�滚𦛚�
println!("甇�銁�臬𢆡HTTP�滚𦛚��...");
// �𥕦遣HTTP�滚𦛚�
// �𥕦遣HTTP�滚𦛚�
let server = HttpServer::new(|| { let server = HttpServer::new(|| {
// �滨蔭CORS // �滨蔭CORS
let cors = Cors::default() let cors = Cors::default()
.allow_any_origin() // ��捂���㗇䔉皞
.allow_any_origin() // ��捂���㗇䔉皞
.allow_any_method() // ��捂���鵎TTP�寞� .allow_any_method() // ��捂���鵎TTP�寞�
.allow_any_header() // ��捂���㕑窈瘙�仍 .allow_any_header() // ��捂���㕑窈瘙�仍
.max_age(3600); // 霈曄蔭憸��霂瑟����摮䀹𧒄�湛�蝘𡜐� .max_age(3600); // 霈曄蔭憸��霂瑟����摮䀹𧒄�湛�蝘𡜐�
App::new() App::new()
.wrap(cors) // 瘛餃�CORS銝剝𡢿隞
.wrap(cors) // 瘛餃�CORS銝剝𡢿隞
.route("/mac", web::get().to(get_mac)) .route("/mac", web::get().to(get_mac))
.route("/scale", web::get().to(get_scale)) // �啣���楝�
.route("/scale", web::get().to(get_scale)) // �啣���楝�
}) })
.bind(("0.0.0.0", 8080))? .bind(("0.0.0.0", 8080))?
.run(); .run();
println!("HTTP�滚𦛚�典歇�臬𢆡嚗𣬚��砍銁 http://127.0.0.1:8080"); println!("HTTP�滚𦛚�典歇�臬𢆡嚗𣬚��砍銁 http://127.0.0.1:8080");
// 蝑匧�HTTP�滚𦛚�函��
// �冽鰵蝥輻�銝剛�銵愢QTT�䔶葡�����
let mqtt_handle = tokio::spawn(async {
if let Err(e) = run_mqtt_and_serial().await {
eprintln!("MQTT/銝脣藁�滚𦛚�躰秤: {}", e);
}
});
// 蝑匧�HTTP�滚𦛚�函���
server.await?; server.await?;
Ok(()) Ok(())
} }


// MQTT�䔶葡�����遆�
// MQTT�䔶葡�����遆�
async fn run_mqtt_and_serial() -> Result<()> { async fn run_mqtt_and_serial() -> Result<()> {
// 霂餃��滨蔭 // 霂餃��滨蔭
let config = read_config()?; let config = read_config()?;
// 璉��乩葡��糓�西◤�删鍂
println!("甇�銁璉��乩葡�{} �臬炏�舐鍂...", config.serial_port);
match serialport::new(&config.serial_port, config.baud_rate)
.timeout(Duration::from_millis(10))
.open() {
Err(_e) => {
eprintln!("銝脣藁 {} 撌脰◤�删鍂", config.serial_port);
return Ok(());
},
Ok(_) => {}
}
let mqtt_config = config.mqtt.clone();
// �𥕦遣MQTT餈墧𦻖�賣㺭
let create_mqtt_client = move || -> (Client, rumqttc::Connection) {
let mut mqttopts = MqttOptions::new(
&mqtt_config.client_id,
&mqtt_config.host,
mqtt_config.port
);
mqttopts.set_keep_alive(Duration::from_secs(60)); // 憓𧼮�keep-alive�園𡢿�0蝘
mqttopts.set_credentials(&mqtt_config.username, &mqtt_config.password);

// �嘥��䤼QTT摰X�蝡
let (client, mut connection) = create_mqtt_client();
let client = Arc::new(Mutex::new(client));
let client_clone = client.clone();

// �典��祉�蝥輻�銝剖���QTT餈墧𦻖
// �𥕦遣 MQTT 摰X�蝡�
let mut mqttopts = MqttOptions::new(
&config.mqtt.client_id,
&config.mqtt.host,
config.mqtt.port
);
mqttopts.set_keep_alive(Duration::from_secs(60)); // 憓𧼮�keep-alive�園𡢿��60蝘�
mqttopts.set_credentials(&config.mqtt.username, &config.mqtt.password);
mqttopts.set_clean_session(true);
mqttopts.set_transport(Transport::Tcp);
mqttopts.set_max_packet_size(100 * 1024, 100 * 1024); // 霈曄蔭�煾����交𤣰���憭批�憭批�
let (mut client, mut connection) = Client::new(mqttopts, 10);

// �典��祉�蝥輻�銝剖��� MQTT 餈墧𦻖
thread::spawn(move || { thread::spawn(move || {
loop { // 瘛餃��𣳇�敺芰㴓�乩����餈
loop { // 瘛餃��𣳇�敺芰㴓�乩����餈�
for notification in connection.iter() { for notification in connection.iter() {
match notification { match notification {
Ok(_) => {}, Ok(_) => {},
Err(e) => { Err(e) => {
eprintln!("MQTT餈墧𦻖�躰秤: {:?}", e); eprintln!("MQTT餈墧𦻖�躰秤: {:?}", e);
// �剜�蝑匧��𡒊誧蝏剖�霂閖�餈
// �剜�蝑匧��𡒊誧蝏剖�霂閖�餈�
thread::sleep(Duration::from_secs(5)); thread::sleep(Duration::from_secs(5));
let (new_client, new_connection) = create_mqtt_client();
*client_clone.lock().unwrap() = new_client;
connection = new_connection;
break; break;
} }
} }
} }
eprintln!("MQTT餈墧𦻖�剖�嚗蝘鍦�撠肽��滩�...");
eprintln!("MQTT餈墧𦻖�剖�嚗�5蝘鍦�撠肽��滩�...");
thread::sleep(Duration::from_secs(5)); thread::sleep(Duration::from_secs(5));
}
} }
}); });


// �枏��滨蔭��葡�
// �枏��滨蔭��葡�
println!("甇�銁撠肽��枏�銝脣藁 {}...", config.serial_port); println!("甇�銁撠肽��枏�銝脣藁 {}...", config.serial_port);
let mut port = serialport::new(&config.serial_port, config.baud_rate) let mut port = serialport::new(&config.serial_port, config.baud_rate)
.timeout(Duration::from_millis(1000)) .timeout(Duration::from_millis(1000))
@@ -195,9 +172,9 @@ async fn run_mqtt_and_serial() -> Result<()> {
.with_context(|| format!("�䭾��枏�銝脣藁 {}", config.serial_port))?; .with_context(|| format!("�䭾��枏�銝脣藁 {}", config.serial_port))?;


println!("�𣂼��枏�銝脣藁 {}!", config.serial_port); println!("�𣂼��枏�銝脣藁 {}!", config.serial_port);
println!("甇�銁霂餃��唳旿... (�Ctrl+C ���");
println!("甇�銁霂餃��唳旿... (�Ctrl+C ����)");


// 憓𧼮�蝻枏��箏之撠誩僎雿輻鍂String�亦敞蝘舀㺭�
// 憓𧼮�蝻枏��箏之撠誩僎雿輻鍂String�亦敞蝘舀㺭�
let mut serial_buf: Vec<u8> = vec![0; 1024]; let mut serial_buf: Vec<u8> = vec![0; 1024];
let mut accumulated_data = String::new(); let mut accumulated_data = String::new();
@@ -205,10 +182,13 @@ async fn run_mqtt_and_serial() -> Result<()> {
match port.read(serial_buf.as_mut_slice()) { match port.read(serial_buf.as_mut_slice()) {
Ok(t) => { Ok(t) => {
if t > 0 { if t > 0 {
// 撠�鰵�唳旿瘛餃��啁敞蝘舐�摮㛖泵銝脖葉
if let Ok(data) = String::from_utf8(serial_buf[..t].to_vec()) { if let Ok(data) = String::from_utf8(serial_buf[..t].to_vec()) {
accumulated_data.push_str(&data); accumulated_data.push_str(&data);
// 璉��交糓�行�摰峕㟲��㺭�株�嚗�誑�Z�蝚行��噼膠蝚衣����
if accumulated_data.contains('\n') || accumulated_data.contains('\r') { if accumulated_data.contains('\n') || accumulated_data.contains('\r') {
// 憭��蝝舐妖��㺭��
let lines: Vec<&str> = accumulated_data let lines: Vec<&str> = accumulated_data
.split(|c| c == '\n' || c == '\r') .split(|c| c == '\n' || c == '\r')
.filter(|s| !s.is_empty()) .filter(|s| !s.is_empty())
@@ -219,6 +199,7 @@ async fn run_mqtt_and_serial() -> Result<()> {
if !trimmed_data.is_empty() { if !trimmed_data.is_empty() {
println!("�嗅�摰峕㟲�唳旿: {}", trimmed_data); println!("�嗅�摰峕㟲�唳旿: {}", trimmed_data);
// �𥕦遣����唳旿蝏𤘪�
let weight_data = WeightData { let weight_data = WeightData {
weight: trimmed_data.to_string(), weight: trimmed_data.to_string(),
timestamp: std::time::SystemTime::now() timestamp: std::time::SystemTime::now()
@@ -227,42 +208,31 @@ async fn run_mqtt_and_serial() -> Result<()> {
.as_secs(), .as_secs(),
}; };


// 摨誩��𡝗㺭��
let json_data = serde_json::to_string(&weight_data)?; let json_data = serde_json::to_string(&weight_data)?;
println!("敶枏�JSON�唳旿: {}", json_data); println!("敶枏�JSON�唳旿: {}", json_data);
// �瑕�MAC�啣�
let mac_address = get_mac_address(); let mac_address = get_mac_address();
println!("MAC�啣�: {}", mac_address); println!("MAC�啣�: {}", mac_address);
let topic = format!("{}/{}", mqtt_config.topic_prefix, mac_address);
let mut retry_count = 0;
let max_retries = 3;
// 瘛餃��滩��箏�
while retry_count < max_retries {
let mut client_guard = client.lock().unwrap();
match client_guard.publish(&topic, QoS::AtLeastOnce, false, json_data.clone()) {
Ok(_) => {
println!("�𣂼��煾��㺭�桀�MQTT");
break;
}
Err(e) => {
eprintln!("MQTT�穃��躰秤 (撠肽� {}/{}): {:?}", retry_count + 1, max_retries, e);
retry_count += 1;
if retry_count < max_retries {
thread::sleep(Duration::from_secs(1));
}
}
}
// �穃��� MQTT嚗䔶蜓憸䀝葉��鉄MAC�啣�
let topic = format!("{}/{}", config.mqtt.topic_prefix, mac_address);
if let Err(e) = client.publish(topic, QoS::AtLeastOnce, false, json_data) {
eprintln!("MQTT�穃��躰秤: {:?}", e);
} else {
println!("�𣂼��煾��㺭�桀�MQTT");
} }
} }
} }
// 皜�征蝝舐妖��㺭��
accumulated_data.clear(); accumulated_data.clear();
} }
} }
} }
} }
Err(ref e) if e.kind() == io::ErrorKind::TimedOut => { Err(ref e) if e.kind() == io::ErrorKind::TimedOut => {
// Timeout is not an error, just continue
continue; continue;
} }
Err(e) => { Err(e) => {


불러오는 중...
취소
저장