OCEAN před 2 měsíci
rodič
revize
993bd933d2
3 změnil soubory, kde provedl 36 přidání a 249 odebrání
  1. +34
    -1
      README.md
  2. +2
    -2
      config.json
  3. +0
    -246
      src/main.rs.orig

+ 34
- 1
README.md Zobrazit soubor

@@ -2,4 +2,37 @@

rust称重到mqtt
麒麟 "serial_port": "/dev/ttyS1",
windows "serial_port": "com1",
windows "serial_port": "com1",


{
"scale_type": "001",
"serial_port": "/dev/ttyS1",
"baud_rate": 9600,
"mqtt": {
"client_id": "weight_reader",
"host": "10.180.4.100",
"port": 1883,
"username": "admin",
"password": "Auseft@2025",
"keep_alive_secs": 60,
"topic_prefix": "weight/data"
}
}



{
"scale_type": "001",
"serial_port": "com1",
"baud_rate": 9600,
"mqtt": {
"client_id": "weight_reader",
"host": "192.168.0.100",
"port": 1883,
"username": "admin",
"password": "Auseft@2025",
"keep_alive_secs": 60,
"topic_prefix": "weight/data"
}
}

+ 2
- 2
config.json Zobrazit soubor

@@ -1,10 +1,10 @@
{
"scale_type": "001",
"serial_port": "/dev/ttyS1",
"serial_port": "com1",
"baud_rate": 9600,
"mqtt": {
"client_id": "weight_reader",
"host": "10.180.4.100",
"host": "192.168.0.100",
"port": 1883,
"username": "admin",
"password": "Auseft@2025",


+ 0
- 246
src/main.rs.orig Zobrazit soubor

@@ -1,246 +0,0 @@
use anyhow::{Context, Result};
use rumqttc::{Client, MqttOptions, QoS, Transport};
use serde::{Serialize, Deserialize};
use serde_json;
use std::io::{self, Read};
use std::time::Duration;
use std::thread;
use serialport;
use actix_web::{web, App, HttpServer, HttpResponse};
use actix_cors::Cors;
use std::fs;

#[derive(Serialize, Deserialize)]
struct WeightData {
weight: String,
timestamp: u64,
}

#[derive(Serialize)]
struct MacResponse {
mac_address: String,
}

#[derive(Serialize)]
struct ScaleTypeResponse {
scale_type: String,
}

#[derive(Deserialize)]
struct MqttConfig {
client_id: String,
host: String,
port: u16,
username: String,
password: String,
keep_alive_secs: u64,
topic_prefix: String,
}

#[derive(Deserialize)]
struct Config {
scale_type: String,
serial_port: String,
baud_rate: u32,
mqtt: MqttConfig,
}

fn read_config() -> Result<Config> {
let config_path = "config.json";
let content = fs::read_to_string(config_path)
.with_context(|| format!("无法读取配置文件 {}", config_path))?;
let config: Config = serde_json::from_str(&content)
.with_context(|| format!("配置文件格式错误 {}", config_path))?;
Ok(config)
}

fn get_mac_address() -> String {
match read_config() {
Ok(config) => config.scale_type,
Err(_) => String::from("Unknown")
}
}

fn get_scale_type() -> String {
match read_config() {
Ok(config) => config.scale_type,
Err(_) => String::from("Unknown")
}
}

// HTTP处理函数
async fn get_mac() -> HttpResponse {
let mac = get_mac_address();
println!("HTTP请求:获取MAC地址 = {}", mac);
let response = MacResponse {
mac_address: mac,
};
HttpResponse::Ok().json(response)
}

// 新增的HTTP处理函数
async fn get_scale() -> HttpResponse {
let scale_type = get_scale_type();
println!("HTTP请求:获取天平类型 = {}", scale_type);
let response = ScaleTypeResponse {
scale_type,
};
HttpResponse::Ok().json(response)
}

#[actix_web::main]
async fn main() -> Result<()> {
println!("程序启动...");
// 启动HTTP服务器
println!("正在启动HTTP服务器...");
// 创建HTTP服务器
let server = HttpServer::new(|| {
// 配置CORS
let cors = Cors::default()
.allow_any_origin() // 允许所有来源
.allow_any_method() // 允许所有HTTP方法
.allow_any_header() // 允许所有请求头
.max_age(3600); // 设置预检请求的缓存时间(秒)
App::new()
.wrap(cors) // 添加CORS中间件
.route("/mac", web::get().to(get_mac))
.route("/scale", web::get().to(get_scale)) // 新增的路由
})
.bind(("0.0.0.0", 8080))?
.run();
println!("HTTP服务器已启动,监听在 http://127.0.0.1:8080");
// 在新线程中运行MQTT和串口服务
let mqtt_handle = tokio::spawn(async {
if let Err(e) = run_mqtt_and_serial().await {
eprintln!("MQTT/串口服务错误: {}", e);
}
});
// 等待HTTP服务器结束
server.await?;
Ok(())
}

// MQTT和串口处理函数
async fn run_mqtt_and_serial() -> Result<()> {
// 读取配置
let config = read_config()?;
// 创建 MQTT 客户端
let mut mqttopts = MqttOptions::new(
&config.mqtt.client_id,
&config.mqtt.host,
config.mqtt.port
);
mqttopts.set_keep_alive(Duration::from_secs(60)); // 增加keep-alive时间到60秒
mqttopts.set_credentials(&config.mqtt.username, &config.mqtt.password);
mqttopts.set_clean_session(true);
mqttopts.set_transport(Transport::Tcp);
mqttopts.set_max_packet_size(100 * 1024, 100 * 1024); // 设置发送和接收的最大包大小
let (mut client, mut connection) = Client::new(mqttopts, 10);

// 在单独的线程中处理 MQTT 连接
thread::spawn(move || {
loop { // 添加无限循环来保持重连
for notification in connection.iter() {
match notification {
Ok(_) => {},
Err(e) => {
eprintln!("MQTT连接错误: {:?}", e);
// 短暂等待后继续尝试重连
thread::sleep(Duration::from_secs(5));
break;
}
}
}
eprintln!("MQTT连接断开,5秒后尝试重连...");
thread::sleep(Duration::from_secs(5));
}
});

// 打开配置的串口
println!("正在尝试打开串口 {}...", config.serial_port);
let mut port = serialport::new(&config.serial_port, config.baud_rate)
.timeout(Duration::from_millis(1000))
.open()
.with_context(|| format!("无法打开串口 {}", config.serial_port))?;

println!("成功打开串口 {}!", config.serial_port);
println!("正在读取数据... (按 Ctrl+C 退出)");

// 增加缓冲区大小并使用String来累积数据
let mut serial_buf: Vec<u8> = vec![0; 1024];
let mut accumulated_data = String::new();
loop {
match port.read(serial_buf.as_mut_slice()) {
Ok(t) => {
if t > 0 {
// 将新数据添加到累积的字符串中
if let Ok(data) = String::from_utf8(serial_buf[..t].to_vec()) {
accumulated_data.push_str(&data);
// 检查是否有完整的数据行(以换行符或回车符结束)
if accumulated_data.contains('\n') || accumulated_data.contains('\r') {
// 处理累积的数据
let lines: Vec<&str> = accumulated_data
.split(|c| c == '\n' || c == '\r')
.filter(|s| !s.is_empty())
.collect();
for line in lines {
let trimmed_data = line.trim();
if !trimmed_data.is_empty() {
println!("收到完整数据: {}", trimmed_data);
// 创建权重数据结构
let weight_data = WeightData {
weight: trimmed_data.to_string(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
};

// 序列化数据
let json_data = serde_json::to_string(&weight_data)?;
println!("当前JSON数据: {}", json_data);
// 获取MAC地址
let mac_address = get_mac_address();
println!("MAC地址: {}", mac_address);
// 发布到 MQTT,主题中包含MAC地址
let topic = format!("{}/{}", config.mqtt.topic_prefix, mac_address);
if let Err(e) = client.publish(topic, QoS::AtLeastOnce, false, json_data) {
eprintln!("MQTT发布错误: {:?}", e);
} else {
println!("成功发送数据到MQTT");
}
}
}
// 清空累积的数据
accumulated_data.clear();
}
}
}
}
Err(ref e) if e.kind() == io::ErrorKind::TimedOut => {
// Timeout is not an error, just continue
continue;
}
Err(e) => {
eprintln!("发生错误: {}", e);
break;
}
}
}

Ok(())
}

Načítá se…
Zrušit
Uložit