|
- use anyhow::{Context, Result};
- use rumqttc::{Client, MqttOptions, QoS};
- use serde::{Serialize, Deserialize};
- use serde_json;
- use std::io::{self, Read};
- use std::time::Duration;
- use std::thread;
- use serialport;
- use actix_web::{web, App, HttpServer, HttpResponse};
- use actix_cors::Cors;
- use std::fs;
-
- #[derive(Serialize, Deserialize)]
- struct WeightData {
- weight: String,
- timestamp: u64,
- }
-
- #[derive(Serialize)]
- struct MacResponse {
- mac_address: String,
- }
-
- #[derive(Serialize)]
- struct ScaleTypeResponse {
- scale_type: String,
- }
-
- #[derive(Deserialize)]
- struct MqttConfig {
- client_id: String,
- host: String,
- port: u16,
- username: String,
- password: String,
- keep_alive_secs: u64,
- topic_prefix: String,
- }
-
- #[derive(Deserialize)]
- struct Config {
- scale_type: String,
- serial_port: String,
- baud_rate: u32,
- mqtt: MqttConfig,
- }
-
- fn read_config() -> Result<Config> {
- let config_path = "config.json";
- let content = fs::read_to_string(config_path)
- .with_context(|| format!("无法读取配置文件 {}", config_path))?;
- let config: Config = serde_json::from_str(&content)
- .with_context(|| format!("配置文件格式错误 {}", config_path))?;
- Ok(config)
- }
-
- fn get_mac_address() -> String {
- match read_config() {
- Ok(config) => config.scale_type,
- Err(_) => String::from("Unknown")
- }
- }
-
- fn get_scale_type() -> String {
- match read_config() {
- Ok(config) => config.scale_type,
- Err(_) => String::from("Unknown")
- }
- }
-
- // HTTP处理函数
- async fn get_mac() -> HttpResponse {
- let mac = get_mac_address();
- println!("HTTP请求:获取MAC地址 = {}", mac);
- let response = MacResponse {
- mac_address: mac,
- };
- // 在新线程中运行MQTT和串口服务
- let mqtt_handle = tokio::spawn(async {
- if let Err(e) = run_mqtt_and_serial().await {
- eprintln!("MQTT/串口服务错误: {}", e);
- }
- });
-
-
- HttpResponse::Ok().json(response)
- }
-
- // 新增的HTTP处理函数
- async fn get_scale() -> HttpResponse {
- let scale_type = get_scale_type();
- println!("HTTP请求:获取天平类型 = {}", scale_type);
- let response = ScaleTypeResponse {
- scale_type,
- };
- HttpResponse::Ok().json(response)
- }
-
- #[actix_web::main]
- async fn main() -> Result<()> {
- println!("程序启动...");
-
- // 启动HTTP服务器
- println!("正在启动HTTP服务器...");
-
- // 创建HTTP服务器
- let server = HttpServer::new(|| {
- // 配置CORS
- let cors = Cors::default()
- .allow_any_origin() // 允许所有来源
- .allow_any_method() // 允许所有HTTP方法
- .allow_any_header() // 允许所有请求头
- .max_age(3600); // 设置预检请求的缓存时间(秒)
-
- App::new()
- .wrap(cors) // 添加CORS中间件
- .route("/mac", web::get().to(get_mac))
- .route("/scale", web::get().to(get_scale)) // 新增的路由
- })
- .bind(("0.0.0.0", 8080))?
- .run();
-
- println!("HTTP服务器已启动,监听在 http://127.0.0.1:8080");
-
-
- // 等待HTTP服务器结束
- server.await?;
-
- Ok(())
- }
-
- // MQTT和串口处理函数
- async fn run_mqtt_and_serial() -> Result<()> {
- // 读取配置
- let config = read_config()?;
-
- // 创建 MQTT 客户端
- let mut mqttopts = MqttOptions::new(
- &config.mqtt.client_id,
- &config.mqtt.host,
- config.mqtt.port
- );
- mqttopts.set_keep_alive(Duration::from_secs(config.mqtt.keep_alive_secs));
- mqttopts.set_credentials(&config.mqtt.username, &config.mqtt.password);
- let (mut client, mut connection) = Client::new(mqttopts, 10);
-
- // 在单独的线程中处理 MQTT 连接
- thread::spawn(move || {
- for notification in connection.iter() {
- match notification {
- Ok(_) => {}
- Err(e) => {
- eprintln!("MQTT连接错误: {:?}", e);
- break;
- }
- }
- }
- });
-
- // 打开配置的串口
- println!("正在尝试打开串口 {}...", config.serial_port);
- let mut port = serialport::new(&config.serial_port, config.baud_rate)
- .timeout(Duration::from_millis(1000))
- .open()
- .with_context(|| format!("无法打开串口 {}", config.serial_port))?;
-
- println!("成功打开串口 {}!", config.serial_port);
- println!("正在读取数据... (按 Ctrl+C 退出)");
-
- // 增加缓冲区大小并使用String来累积数据
- let mut serial_buf: Vec<u8> = vec![0; 1024];
- let mut accumulated_data = String::new();
-
- loop {
- match port.read(serial_buf.as_mut_slice()) {
- Ok(t) => {
- if t > 0 {
- // 将新数据添加到累积的字符串中
- if let Ok(data) = String::from_utf8(serial_buf[..t].to_vec()) {
- accumulated_data.push_str(&data);
-
- // 检查是否有完整的数据行(以换行符或回车符结束)
- if accumulated_data.contains('\n') || accumulated_data.contains('\r') {
- // 处理累积的数据
- let lines: Vec<&str> = accumulated_data
- .split(|c| c == '\n' || c == '\r')
- .filter(|s| !s.is_empty())
- .collect();
-
- for line in lines {
- let trimmed_data = line.trim();
- if !trimmed_data.is_empty() {
- println!("收到完整数据: {}", trimmed_data);
-
- // 创建权重数据结构
- let weight_data = WeightData {
- weight: trimmed_data.to_string(),
- timestamp: std::time::SystemTime::now()
- .duration_since(std::time::UNIX_EPOCH)
- .unwrap()
- .as_secs(),
- };
-
- // 序列化数据
- let json_data = serde_json::to_string(&weight_data)?;
- println!("当前JSON数据: {}", json_data);
-
- // 获取MAC地址
- let mac_address = get_mac_address();
- println!("MAC地址: {}", mac_address);
- // 发布到 MQTT,主题中包含MAC地址
- let topic = format!("{}/{}", config.mqtt.topic_prefix, mac_address);
- if let Err(e) = client.publish(topic, QoS::AtLeastOnce, false, json_data) {
- eprintln!("MQTT发布错误: {:?}", e);
- } else {
- println!("成功发送数据到MQTT");
- }
- }
- }
-
- // 清空累积的数据
- accumulated_data.clear();
- }
- }
- }
- }
- Err(ref e) if e.kind() == io::ErrorKind::TimedOut => {
- // Timeout is not an error, just continue
- continue;
- }
- Err(e) => {
- eprintln!("发生错误: {}", e);
- break;
- }
- }
- }
-
- Ok(())
- }
|