diff --git a/config.json b/config.json index 3c35469..e7b4b47 100644 --- a/config.json +++ b/config.json @@ -4,11 +4,11 @@ "baud_rate": 9600, "mqtt": { "client_id": "weight_reader", - "host": "112.33.111.160", + "host": "10.180.4.100", "port": 1883, - "username": "auseft", - "password": "1q2w3E**", + "username": "admin", + "password": "Auseft@2025", "keep_alive_secs": 60, "topic_prefix": "weight/data" } -} +} \ No newline at end of file diff --git a/config.json.orig b/config.json.orig new file mode 100644 index 0000000..96c67c2 --- /dev/null +++ b/config.json.orig @@ -0,0 +1,19 @@ +{ + "scale_type": "001", + "serial_port": "/dev/ttyS1", + "baud_rate": 9600, + "mqtt": { + "client_id": "weight_reader", + "host": "10.180.4.100", + "port": 1883, +<<<<<<< HEAD + "username": "auseft", + "password": "1q2w3E**", +======= + "username": "admin", + "password": "Auseft@2025", +>>>>>>> c2a749df78e5f78ad42f2fefc374d641c881253b + "keep_alive_secs": 60, + "topic_prefix": "weight/data" + } +} \ No newline at end of file diff --git a/install.sh b/install.sh deleted file mode 100644 index 3376127..0000000 --- a/install.sh +++ /dev/null @@ -1,122 +0,0 @@ -#!/bin/bash - -# 确保脚本以root权限运行 -if [ "$EUID" -ne 0 ]; then - echo "请使用root权限运行此脚本" - exit 1 -fi - -SERVICE_NAME="weight-reader" -INSTALL_DIR="/opt/weight-reader" -SERVICE_FILE="/etc/systemd/system/weight-reader.service" - -# 创建安装目录 -create_install_dir() { - echo "创建安装目录..." - mkdir -p "$INSTALL_DIR" - chmod 755 "$INSTALL_DIR" -} - -# 复制文件 -copy_files() { - echo "复制文件..." - cp weight_reader "$INSTALL_DIR/" - cp config.json "$INSTALL_DIR/" - cp weight-reader.service "$SERVICE_FILE" - chmod 755 "$INSTALL_DIR/weight_reader" - chmod 644 "$INSTALL_DIR/config.json" - chmod 644 "$SERVICE_FILE" -} - -# 安装服务 -install_service() { - echo "安装服务..." - systemctl daemon-reload - systemctl enable $SERVICE_NAME - systemctl start $SERVICE_NAME - echo "服务已安装并启动" -} - -# 卸载服务 -uninstall_service() { - echo "停止并卸载服务..." - systemctl stop $SERVICE_NAME - systemctl disable $SERVICE_NAME - rm -f "$SERVICE_FILE" - rm -rf "$INSTALL_DIR" - systemctl daemon-reload - echo "服务已卸载" -} - -# 启动服务 -start_service() { - echo "启动服务..." - systemctl start $SERVICE_NAME - echo "服务已启动" -} - -# 停止服务 -stop_service() { - echo "停止服务..." - systemctl stop $SERVICE_NAME - echo "服务已停止" -} - -# 重启服务 -restart_service() { - echo "重启服务..." - systemctl restart $SERVICE_NAME - echo "服务已重启" -} - -# 查看服务状态 -status_service() { - systemctl status $SERVICE_NAME -} - -# 显示使用帮助 -show_help() { - echo "使用方法: $0 [命令]" - echo "命令:" - echo " install - 安装并启动服务" - echo " uninstall - 停止并卸载服务" - echo " start - 启动服务" - echo " stop - 停止服务" - echo " restart - 重启服务" - echo " status - 查看服务状态" - echo " help - 显示此帮助信息" -} - -# 主程序 -case "$1" in - "install") - create_install_dir - copy_files - install_service - ;; - "uninstall") - uninstall_service - ;; - "start") - start_service - ;; - "stop") - stop_service - ;; - "restart") - restart_service - ;; - "status") - status_service - ;; - "help"|"") - show_help - ;; - *) - echo "未知命令: $1" - show_help - exit 1 - ;; -esac - -exit 0 diff --git a/server-cert.pem b/server-cert.pem new file mode 100644 index 0000000..e69de29 diff --git a/server-cert.pemopenssl b/server-cert.pemopenssl new file mode 100644 index 0000000..e69de29 diff --git a/src/main.rs b/src/main.rs index 02064e5..49fb6b9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,7 @@ use serialport; use actix_web::{web, App, HttpServer, HttpResponse}; use actix_cors::Cors; use std::fs; +use std::sync::{Arc, Mutex}; #[derive(Serialize, Deserialize)] struct WeightData { @@ -26,7 +27,7 @@ struct ScaleTypeResponse { scale_type: String, } -#[derive(Deserialize)] +#[derive(Deserialize, Clone)] struct MqttConfig { client_id: String, host: String, @@ -75,13 +76,21 @@ async fn get_mac() -> HttpResponse { let response = MacResponse { mac_address: mac, }; + // 在新线程中运行MQTT和串口服 + let _mqtt_handle = tokio::spawn(async { + if let Err(e) = run_mqtt_and_serial().await { + eprintln!("MQTT/串口服务错误: {}", e); + } + }); + + HttpResponse::Ok().json(response) } // 新增的HTTP处理函数 async fn get_scale() -> HttpResponse { let scale_type = get_scale_type(); - println!("HTTP请求:获取天平类型 = {}", scale_type); + println!("HTTP请求:获取天平类= {}", scale_type); let response = ScaleTypeResponse { scale_type, }; @@ -92,79 +101,93 @@ async fn get_scale() -> HttpResponse { async fn main() -> Result<()> { println!("程序启动..."); - // 启动HTTP服务器 - println!("正在启动HTTP服务器..."); + // 启动HTTP服务 + println!("正在启动HTTP服务.."); - // 创建HTTP服务器 + // 创建HTTP服务 let server = HttpServer::new(|| { // 配置CORS let cors = Cors::default() - .allow_any_origin() // 允许所有来源 + .allow_any_origin() // 允许所有来 .allow_any_method() // 允许所有HTTP方法 .allow_any_header() // 允许所有请求头 .max_age(3600); // 设置预检请求的缓存时间(秒) App::new() - .wrap(cors) // 添加CORS中间件 + .wrap(cors) // 添加CORS中间 .route("/mac", web::get().to(get_mac)) - .route("/scale", web::get().to(get_scale)) // 新增的路由 + .route("/scale", web::get().to(get_scale)) // 新增的路 }) .bind(("0.0.0.0", 8080))? .run(); println!("HTTP服务器已启动,监听在 http://127.0.0.1:8080"); - // 在新线程中运行MQTT和串口服务 - let mqtt_handle = tokio::spawn(async { - if let Err(e) = run_mqtt_and_serial().await { - eprintln!("MQTT/串口服务错误: {}", e); - } - }); - - // 等待HTTP服务器结束 + + // 等待HTTP服务器结 server.await?; Ok(()) } -// MQTT和串口处理函数 +// MQTT和串口处理函 async fn run_mqtt_and_serial() -> Result<()> { // 读取配置 let config = read_config()?; - // 创建 MQTT 客户端 - let mut mqttopts = MqttOptions::new( - &config.mqtt.client_id, - &config.mqtt.host, - config.mqtt.port - ); - mqttopts.set_keep_alive(Duration::from_secs(60)); // 增加keep-alive时间到60秒 - mqttopts.set_credentials(&config.mqtt.username, &config.mqtt.password); - mqttopts.set_clean_session(true); - mqttopts.set_transport(Transport::Tcp); - mqttopts.set_max_packet_size(100 * 1024, 100 * 1024); // 设置发送和接收的最大包大小 - let (mut client, mut connection) = Client::new(mqttopts, 10); - - // 在单独的线程中处理 MQTT 连接 + // 检查串口是否被占用 + println!("正在检查串{} 是否可用...", config.serial_port); + match serialport::new(&config.serial_port, config.baud_rate) + .timeout(Duration::from_millis(10)) + .open() { + Err(_e) => { + eprintln!("串口 {} 已被占用", config.serial_port); + return Ok(()); + }, + Ok(_) => {} + } + + let mqtt_config = config.mqtt.clone(); + + // 创建MQTT连接函数 + let create_mqtt_client = move || -> (Client, rumqttc::Connection) { + let mut mqttopts = MqttOptions::new( + &mqtt_config.client_id, + &mqtt_config.host, + mqtt_config.port + ); + mqttopts.set_keep_alive(Duration::from_secs(60)); // 增加keep-alive时间0 + mqttopts.set_credentials(&mqtt_config.username, &mqtt_config.password); + + // 初始化MQTT客户 + let (client, mut connection) = create_mqtt_client(); + let client = Arc::new(Mutex::new(client)); + let client_clone = client.clone(); + + // 在单独的线程中处理MQTT连接 thread::spawn(move || { - loop { // 添加无限循环来保持重连 + loop { // 添加无限循环来保持重 for notification in connection.iter() { match notification { Ok(_) => {}, Err(e) => { eprintln!("MQTT连接错误: {:?}", e); - // 短暂等待后继续尝试重连 + // 短暂等待后继续尝试重 thread::sleep(Duration::from_secs(5)); + let (new_client, new_connection) = create_mqtt_client(); + *client_clone.lock().unwrap() = new_client; + connection = new_connection; break; } } } - eprintln!("MQTT连接断开,5秒后尝试重连..."); + eprintln!("MQTT连接断开秒后尝试重连..."); thread::sleep(Duration::from_secs(5)); + } } }); - // 打开配置的串口 + // 打开配置的串 println!("正在尝试打开串口 {}...", config.serial_port); let mut port = serialport::new(&config.serial_port, config.baud_rate) .timeout(Duration::from_millis(1000)) @@ -172,9 +195,9 @@ async fn run_mqtt_and_serial() -> Result<()> { .with_context(|| format!("无法打开串口 {}", config.serial_port))?; println!("成功打开串口 {}!", config.serial_port); - println!("正在读取数据... (按 Ctrl+C 退出)"); + println!("正在读取数据... (Ctrl+C 退"); - // 增加缓冲区大小并使用String来累积数据 + // 增加缓冲区大小并使用String来累积数 let mut serial_buf: Vec = vec![0; 1024]; let mut accumulated_data = String::new(); @@ -182,13 +205,10 @@ async fn run_mqtt_and_serial() -> Result<()> { match port.read(serial_buf.as_mut_slice()) { Ok(t) => { if t > 0 { - // 将新数据添加到累积的字符串中 if let Ok(data) = String::from_utf8(serial_buf[..t].to_vec()) { accumulated_data.push_str(&data); - // 检查是否有完整的数据行(以换行符或回车符结束) if accumulated_data.contains('\n') || accumulated_data.contains('\r') { - // 处理累积的数据 let lines: Vec<&str> = accumulated_data .split(|c| c == '\n' || c == '\r') .filter(|s| !s.is_empty()) @@ -199,7 +219,6 @@ async fn run_mqtt_and_serial() -> Result<()> { if !trimmed_data.is_empty() { println!("收到完整数据: {}", trimmed_data); - // 创建权重数据结构 let weight_data = WeightData { weight: trimmed_data.to_string(), timestamp: std::time::SystemTime::now() @@ -208,31 +227,42 @@ async fn run_mqtt_and_serial() -> Result<()> { .as_secs(), }; - // 序列化数据 let json_data = serde_json::to_string(&weight_data)?; println!("当前JSON数据: {}", json_data); - // 获取MAC地址 let mac_address = get_mac_address(); println!("MAC地址: {}", mac_address); - // 发布到 MQTT,主题中包含MAC地址 - let topic = format!("{}/{}", config.mqtt.topic_prefix, mac_address); - if let Err(e) = client.publish(topic, QoS::AtLeastOnce, false, json_data) { - eprintln!("MQTT发布错误: {:?}", e); - } else { - println!("成功发送数据到MQTT"); + + let topic = format!("{}/{}", mqtt_config.topic_prefix, mac_address); + let mut retry_count = 0; + let max_retries = 3; + + // 添加重试机制 + while retry_count < max_retries { + let mut client_guard = client.lock().unwrap(); + match client_guard.publish(&topic, QoS::AtLeastOnce, false, json_data.clone()) { + Ok(_) => { + println!("成功发送数据到MQTT"); + break; + } + Err(e) => { + eprintln!("MQTT发布错误 (尝试 {}/{}): {:?}", retry_count + 1, max_retries, e); + retry_count += 1; + if retry_count < max_retries { + thread::sleep(Duration::from_secs(1)); + } + } + } } } } - // 清空累积的数据 accumulated_data.clear(); } } } } Err(ref e) if e.kind() == io::ErrorKind::TimedOut => { - // Timeout is not an error, just continue continue; } Err(e) => { diff --git a/src/main.rs.orig b/src/main.rs.orig new file mode 100644 index 0000000..02064e5 --- /dev/null +++ b/src/main.rs.orig @@ -0,0 +1,246 @@ +use anyhow::{Context, Result}; +use rumqttc::{Client, MqttOptions, QoS, Transport}; +use serde::{Serialize, Deserialize}; +use serde_json; +use std::io::{self, Read}; +use std::time::Duration; +use std::thread; +use serialport; +use actix_web::{web, App, HttpServer, HttpResponse}; +use actix_cors::Cors; +use std::fs; + +#[derive(Serialize, Deserialize)] +struct WeightData { + weight: String, + timestamp: u64, +} + +#[derive(Serialize)] +struct MacResponse { + mac_address: String, +} + +#[derive(Serialize)] +struct ScaleTypeResponse { + scale_type: String, +} + +#[derive(Deserialize)] +struct MqttConfig { + client_id: String, + host: String, + port: u16, + username: String, + password: String, + keep_alive_secs: u64, + topic_prefix: String, +} + +#[derive(Deserialize)] +struct Config { + scale_type: String, + serial_port: String, + baud_rate: u32, + mqtt: MqttConfig, +} + +fn read_config() -> Result { + let config_path = "config.json"; + let content = fs::read_to_string(config_path) + .with_context(|| format!("无法读取配置文件 {}", config_path))?; + let config: Config = serde_json::from_str(&content) + .with_context(|| format!("配置文件格式错误 {}", config_path))?; + Ok(config) +} + +fn get_mac_address() -> String { + match read_config() { + Ok(config) => config.scale_type, + Err(_) => String::from("Unknown") + } +} + +fn get_scale_type() -> String { + match read_config() { + Ok(config) => config.scale_type, + Err(_) => String::from("Unknown") + } +} + +// HTTP处理函数 +async fn get_mac() -> HttpResponse { + let mac = get_mac_address(); + println!("HTTP请求:获取MAC地址 = {}", mac); + let response = MacResponse { + mac_address: mac, + }; + HttpResponse::Ok().json(response) +} + +// 新增的HTTP处理函数 +async fn get_scale() -> HttpResponse { + let scale_type = get_scale_type(); + println!("HTTP请求:获取天平类型 = {}", scale_type); + let response = ScaleTypeResponse { + scale_type, + }; + HttpResponse::Ok().json(response) +} + +#[actix_web::main] +async fn main() -> Result<()> { + println!("程序启动..."); + + // 启动HTTP服务器 + println!("正在启动HTTP服务器..."); + + // 创建HTTP服务器 + let server = HttpServer::new(|| { + // 配置CORS + let cors = Cors::default() + .allow_any_origin() // 允许所有来源 + .allow_any_method() // 允许所有HTTP方法 + .allow_any_header() // 允许所有请求头 + .max_age(3600); // 设置预检请求的缓存时间(秒) + + App::new() + .wrap(cors) // 添加CORS中间件 + .route("/mac", web::get().to(get_mac)) + .route("/scale", web::get().to(get_scale)) // 新增的路由 + }) + .bind(("0.0.0.0", 8080))? + .run(); + + println!("HTTP服务器已启动,监听在 http://127.0.0.1:8080"); + + // 在新线程中运行MQTT和串口服务 + let mqtt_handle = tokio::spawn(async { + if let Err(e) = run_mqtt_and_serial().await { + eprintln!("MQTT/串口服务错误: {}", e); + } + }); + + // 等待HTTP服务器结束 + server.await?; + + Ok(()) +} + +// MQTT和串口处理函数 +async fn run_mqtt_and_serial() -> Result<()> { + // 读取配置 + let config = read_config()?; + + // 创建 MQTT 客户端 + let mut mqttopts = MqttOptions::new( + &config.mqtt.client_id, + &config.mqtt.host, + config.mqtt.port + ); + mqttopts.set_keep_alive(Duration::from_secs(60)); // 增加keep-alive时间到60秒 + mqttopts.set_credentials(&config.mqtt.username, &config.mqtt.password); + mqttopts.set_clean_session(true); + mqttopts.set_transport(Transport::Tcp); + mqttopts.set_max_packet_size(100 * 1024, 100 * 1024); // 设置发送和接收的最大包大小 + let (mut client, mut connection) = Client::new(mqttopts, 10); + + // 在单独的线程中处理 MQTT 连接 + thread::spawn(move || { + loop { // 添加无限循环来保持重连 + for notification in connection.iter() { + match notification { + Ok(_) => {}, + Err(e) => { + eprintln!("MQTT连接错误: {:?}", e); + // 短暂等待后继续尝试重连 + thread::sleep(Duration::from_secs(5)); + break; + } + } + } + eprintln!("MQTT连接断开,5秒后尝试重连..."); + thread::sleep(Duration::from_secs(5)); + } + }); + + // 打开配置的串口 + println!("正在尝试打开串口 {}...", config.serial_port); + let mut port = serialport::new(&config.serial_port, config.baud_rate) + .timeout(Duration::from_millis(1000)) + .open() + .with_context(|| format!("无法打开串口 {}", config.serial_port))?; + + println!("成功打开串口 {}!", config.serial_port); + println!("正在读取数据... (按 Ctrl+C 退出)"); + + // 增加缓冲区大小并使用String来累积数据 + let mut serial_buf: Vec = vec![0; 1024]; + let mut accumulated_data = String::new(); + + loop { + match port.read(serial_buf.as_mut_slice()) { + Ok(t) => { + if t > 0 { + // 将新数据添加到累积的字符串中 + if let Ok(data) = String::from_utf8(serial_buf[..t].to_vec()) { + accumulated_data.push_str(&data); + + // 检查是否有完整的数据行(以换行符或回车符结束) + if accumulated_data.contains('\n') || accumulated_data.contains('\r') { + // 处理累积的数据 + let lines: Vec<&str> = accumulated_data + .split(|c| c == '\n' || c == '\r') + .filter(|s| !s.is_empty()) + .collect(); + + for line in lines { + let trimmed_data = line.trim(); + if !trimmed_data.is_empty() { + println!("收到完整数据: {}", trimmed_data); + + // 创建权重数据结构 + let weight_data = WeightData { + weight: trimmed_data.to_string(), + timestamp: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(), + }; + + // 序列化数据 + let json_data = serde_json::to_string(&weight_data)?; + println!("当前JSON数据: {}", json_data); + + // 获取MAC地址 + let mac_address = get_mac_address(); + println!("MAC地址: {}", mac_address); + // 发布到 MQTT,主题中包含MAC地址 + let topic = format!("{}/{}", config.mqtt.topic_prefix, mac_address); + if let Err(e) = client.publish(topic, QoS::AtLeastOnce, false, json_data) { + eprintln!("MQTT发布错误: {:?}", e); + } else { + println!("成功发送数据到MQTT"); + } + } + } + + // 清空累积的数据 + accumulated_data.clear(); + } + } + } + } + Err(ref e) if e.kind() == io::ErrorKind::TimedOut => { + // Timeout is not an error, just continue + continue; + } + Err(e) => { + eprintln!("发生错误: {}", e); + break; + } + } + } + + Ok(()) +} diff --git a/weight-reader.service b/weight-reader.service deleted file mode 100644 index 9a8f0c0..0000000 --- a/weight-reader.service +++ /dev/null @@ -1,14 +0,0 @@ -[Unit] -Description=Weight Reader Service -After=network.target - -[Service] -Type=simple -User=root -WorkingDirectory=/opt/weight-reader -ExecStart=/opt/weight-reader/weight_reader -Restart=always -RestartSec=3 - -[Install] -WantedBy=multi-user.target