Преглед на файлове

Merge branch 'master' of http://112.33.111.155:3000/yanh/RustWeightService

# Conflicts:
#	config.json
#	src/main.rs
dev
OCEAN преди 3 месеца
родител
ревизия
e9f7b6762a
променени са 8 файла, в които са добавени 350 реда и са изтрити 191 реда
  1. +4
    -4
      config.json
  2. +19
    -0
      config.json.orig
  3. +0
    -122
      install.sh
  4. +0
    -0
      server-cert.pem
  5. +0
    -0
      server-cert.pemopenssl
  6. +81
    -51
      src/main.rs
  7. +246
    -0
      src/main.rs.orig
  8. +0
    -14
      weight-reader.service

+ 4
- 4
config.json Целия файл

@@ -4,11 +4,11 @@
"baud_rate": 9600,
"mqtt": {
"client_id": "weight_reader",
"host": "112.33.111.160",
"host": "10.180.4.100",
"port": 1883,
"username": "auseft",
"password": "1q2w3E**",
"username": "admin",
"password": "Auseft@2025",
"keep_alive_secs": 60,
"topic_prefix": "weight/data"
}
}
}

+ 19
- 0
config.json.orig Целия файл

@@ -0,0 +1,19 @@
{
"scale_type": "001",
"serial_port": "/dev/ttyS1",
"baud_rate": 9600,
"mqtt": {
"client_id": "weight_reader",
"host": "10.180.4.100",
"port": 1883,
<<<<<<< HEAD
"username": "auseft",
"password": "1q2w3E**",
=======
"username": "admin",
"password": "Auseft@2025",
>>>>>>> c2a749df78e5f78ad42f2fefc374d641c881253b
"keep_alive_secs": 60,
"topic_prefix": "weight/data"
}
}

+ 0
- 122
install.sh Целия файл

@@ -1,122 +0,0 @@
#!/bin/bash

# 确保脚本以root权限运行
if [ "$EUID" -ne 0 ]; then
echo "请使用root权限运行此脚本"
exit 1
fi

SERVICE_NAME="weight-reader"
INSTALL_DIR="/opt/weight-reader"
SERVICE_FILE="/etc/systemd/system/weight-reader.service"

# 创建安装目录
create_install_dir() {
echo "创建安装目录..."
mkdir -p "$INSTALL_DIR"
chmod 755 "$INSTALL_DIR"
}

# 复制文件
copy_files() {
echo "复制文件..."
cp weight_reader "$INSTALL_DIR/"
cp config.json "$INSTALL_DIR/"
cp weight-reader.service "$SERVICE_FILE"
chmod 755 "$INSTALL_DIR/weight_reader"
chmod 644 "$INSTALL_DIR/config.json"
chmod 644 "$SERVICE_FILE"
}

# 安装服务
install_service() {
echo "安装服务..."
systemctl daemon-reload
systemctl enable $SERVICE_NAME
systemctl start $SERVICE_NAME
echo "服务已安装并启动"
}

# 卸载服务
uninstall_service() {
echo "停止并卸载服务..."
systemctl stop $SERVICE_NAME
systemctl disable $SERVICE_NAME
rm -f "$SERVICE_FILE"
rm -rf "$INSTALL_DIR"
systemctl daemon-reload
echo "服务已卸载"
}

# 启动服务
start_service() {
echo "启动服务..."
systemctl start $SERVICE_NAME
echo "服务已启动"
}

# 停止服务
stop_service() {
echo "停止服务..."
systemctl stop $SERVICE_NAME
echo "服务已停止"
}

# 重启服务
restart_service() {
echo "重启服务..."
systemctl restart $SERVICE_NAME
echo "服务已重启"
}

# 查看服务状态
status_service() {
systemctl status $SERVICE_NAME
}

# 显示使用帮助
show_help() {
echo "使用方法: $0 [命令]"
echo "命令:"
echo " install - 安装并启动服务"
echo " uninstall - 停止并卸载服务"
echo " start - 启动服务"
echo " stop - 停止服务"
echo " restart - 重启服务"
echo " status - 查看服务状态"
echo " help - 显示此帮助信息"
}

# 主程序
case "$1" in
"install")
create_install_dir
copy_files
install_service
;;
"uninstall")
uninstall_service
;;
"start")
start_service
;;
"stop")
stop_service
;;
"restart")
restart_service
;;
"status")
status_service
;;
"help"|"")
show_help
;;
*)
echo "未知命令: $1"
show_help
exit 1
;;
esac

exit 0

+ 0
- 0
server-cert.pem Целия файл


+ 0
- 0
server-cert.pemopenssl Целия файл


+ 81
- 51
src/main.rs Целия файл

@@ -9,6 +9,7 @@ use serialport;
use actix_web::{web, App, HttpServer, HttpResponse};
use actix_cors::Cors;
use std::fs;
use std::sync::{Arc, Mutex};

#[derive(Serialize, Deserialize)]
struct WeightData {
@@ -26,7 +27,7 @@ struct ScaleTypeResponse {
scale_type: String,
}

#[derive(Deserialize)]
#[derive(Deserialize, Clone)]
struct MqttConfig {
client_id: String,
host: String,
@@ -75,13 +76,21 @@ async fn get_mac() -> HttpResponse {
let response = MacResponse {
mac_address: mac,
};
// �冽鰵蝥輻�銝剛�銵愢QTT�䔶葡����
let _mqtt_handle = tokio::spawn(async {
if let Err(e) = run_mqtt_and_serial().await {
eprintln!("MQTT/銝脣藁�滚𦛚�躰秤: {}", e);
}
});
HttpResponse::Ok().json(response)
}

// �啣���TTP憭���賣㺭
async fn get_scale() -> HttpResponse {
let scale_type = get_scale_type();
println!("HTTP霂瑟�嚗朞繮�硋予撟喟掩�� = {}", scale_type);
println!("HTTP霂瑟�嚗朞繮�硋予撟喟掩�= {}", scale_type);
let response = ScaleTypeResponse {
scale_type,
};
@@ -92,79 +101,93 @@ async fn get_scale() -> HttpResponse {
async fn main() -> Result<()> {
println!("蝔见��臬𢆡...");
// �臬𢆡HTTP�滚𦛚�
println!("甇�銁�臬𢆡HTTP�滚𦛚��...");
// �臬𢆡HTTP�滚𦛚�
println!("甇�銁�臬𢆡HTTP�滚𦛚�..");
// �𥕦遣HTTP�滚𦛚�
// �𥕦遣HTTP�滚𦛚�
let server = HttpServer::new(|| {
// �滨蔭CORS
let cors = Cors::default()
.allow_any_origin() // ��捂���㗇䔉皞
.allow_any_origin() // ��捂���㗇䔉皞
.allow_any_method() // ��捂���鵎TTP�寞�
.allow_any_header() // ��捂���㕑窈瘙�仍
.max_age(3600); // 霈曄蔭憸��霂瑟����摮䀹𧒄�湛�蝘𡜐�
App::new()
.wrap(cors) // 瘛餃�CORS銝剝𡢿隞
.wrap(cors) // 瘛餃�CORS銝剝𡢿隞
.route("/mac", web::get().to(get_mac))
.route("/scale", web::get().to(get_scale)) // �啣���楝�
.route("/scale", web::get().to(get_scale)) // �啣���楝�
})
.bind(("0.0.0.0", 8080))?
.run();
println!("HTTP�滚𦛚�典歇�臬𢆡嚗𣬚��砍銁 http://127.0.0.1:8080");
// �冽鰵蝥輻�銝剛�銵愢QTT�䔶葡�����
let mqtt_handle = tokio::spawn(async {
if let Err(e) = run_mqtt_and_serial().await {
eprintln!("MQTT/銝脣藁�滚𦛚�躰秤: {}", e);
}
});
// 蝑匧�HTTP�滚𦛚�函���
// 蝑匧�HTTP�滚𦛚�函��
server.await?;
Ok(())
}

// MQTT�䔶葡�����遆�
// MQTT�䔶葡�����遆�
async fn run_mqtt_and_serial() -> Result<()> {
// 霂餃��滨蔭
let config = read_config()?;
// �𥕦遣 MQTT 摰X�蝡�
let mut mqttopts = MqttOptions::new(
&config.mqtt.client_id,
&config.mqtt.host,
config.mqtt.port
);
mqttopts.set_keep_alive(Duration::from_secs(60)); // 憓𧼮�keep-alive�園𡢿��60蝘�
mqttopts.set_credentials(&config.mqtt.username, &config.mqtt.password);
mqttopts.set_clean_session(true);
mqttopts.set_transport(Transport::Tcp);
mqttopts.set_max_packet_size(100 * 1024, 100 * 1024); // 霈曄蔭�煾����交𤣰���憭批�憭批�
let (mut client, mut connection) = Client::new(mqttopts, 10);

// �典��祉�蝥輻�銝剖��� MQTT 餈墧𦻖
// 璉��乩葡��糓�西◤�删鍂
println!("甇�銁璉��乩葡�{} �臬炏�舐鍂...", config.serial_port);
match serialport::new(&config.serial_port, config.baud_rate)
.timeout(Duration::from_millis(10))
.open() {
Err(_e) => {
eprintln!("銝脣藁 {} 撌脰◤�删鍂", config.serial_port);
return Ok(());
},
Ok(_) => {}
}
let mqtt_config = config.mqtt.clone();
// �𥕦遣MQTT餈墧𦻖�賣㺭
let create_mqtt_client = move || -> (Client, rumqttc::Connection) {
let mut mqttopts = MqttOptions::new(
&mqtt_config.client_id,
&mqtt_config.host,
mqtt_config.port
);
mqttopts.set_keep_alive(Duration::from_secs(60)); // 憓𧼮�keep-alive�園𡢿�0蝘
mqttopts.set_credentials(&mqtt_config.username, &mqtt_config.password);

// �嘥��䤼QTT摰X�蝡
let (client, mut connection) = create_mqtt_client();
let client = Arc::new(Mutex::new(client));
let client_clone = client.clone();

// �典��祉�蝥輻�銝剖���QTT餈墧𦻖
thread::spawn(move || {
loop { // 瘛餃��𣳇�敺芰㴓�乩����餈�
loop { // 瘛餃��𣳇�敺芰㴓�乩����餈
for notification in connection.iter() {
match notification {
Ok(_) => {},
Err(e) => {
eprintln!("MQTT餈墧𦻖�躰秤: {:?}", e);
// �剜�蝑匧��𡒊誧蝏剖�霂閖�餈�
// �剜�蝑匧��𡒊誧蝏剖�霂閖�餈
thread::sleep(Duration::from_secs(5));
let (new_client, new_connection) = create_mqtt_client();
*client_clone.lock().unwrap() = new_client;
connection = new_connection;
break;
}
}
}
eprintln!("MQTT餈墧𦻖�剖�嚗�5蝘鍦�撠肽��滩�...");
eprintln!("MQTT餈墧𦻖�剖�嚗蝘鍦�撠肽��滩�...");
thread::sleep(Duration::from_secs(5));
}
}
});

// �枏��滨蔭��葡�
// �枏��滨蔭��葡�
println!("甇�銁撠肽��枏�銝脣藁 {}...", config.serial_port);
let mut port = serialport::new(&config.serial_port, config.baud_rate)
.timeout(Duration::from_millis(1000))
@@ -172,9 +195,9 @@ async fn run_mqtt_and_serial() -> Result<()> {
.with_context(|| format!("�䭾��枏�銝脣藁 {}", config.serial_port))?;

println!("�𣂼��枏�銝脣藁 {}!", config.serial_port);
println!("甇�銁霂餃��唳旿... (�Ctrl+C ����)");
println!("甇�銁霂餃��唳旿... (�Ctrl+C ���");

// 憓𧼮�蝻枏��箏之撠誩僎雿輻鍂String�亦敞蝘舀㺭�
// 憓𧼮�蝻枏��箏之撠誩僎雿輻鍂String�亦敞蝘舀㺭�
let mut serial_buf: Vec<u8> = vec![0; 1024];
let mut accumulated_data = String::new();
@@ -182,13 +205,10 @@ async fn run_mqtt_and_serial() -> Result<()> {
match port.read(serial_buf.as_mut_slice()) {
Ok(t) => {
if t > 0 {
// 撠�鰵�唳旿瘛餃��啁敞蝘舐�摮㛖泵銝脖葉
if let Ok(data) = String::from_utf8(serial_buf[..t].to_vec()) {
accumulated_data.push_str(&data);
// 璉��交糓�行�摰峕㟲��㺭�株�嚗�誑�Z�蝚行��噼膠蝚衣����
if accumulated_data.contains('\n') || accumulated_data.contains('\r') {
// 憭��蝝舐妖��㺭��
let lines: Vec<&str> = accumulated_data
.split(|c| c == '\n' || c == '\r')
.filter(|s| !s.is_empty())
@@ -199,7 +219,6 @@ async fn run_mqtt_and_serial() -> Result<()> {
if !trimmed_data.is_empty() {
println!("�嗅�摰峕㟲�唳旿: {}", trimmed_data);
// �𥕦遣����唳旿蝏𤘪�
let weight_data = WeightData {
weight: trimmed_data.to_string(),
timestamp: std::time::SystemTime::now()
@@ -208,31 +227,42 @@ async fn run_mqtt_and_serial() -> Result<()> {
.as_secs(),
};

// 摨誩��𡝗㺭��
let json_data = serde_json::to_string(&weight_data)?;
println!("敶枏�JSON�唳旿: {}", json_data);
// �瑕�MAC�啣�
let mac_address = get_mac_address();
println!("MAC�啣�: {}", mac_address);
// �穃��� MQTT嚗䔶蜓憸䀝葉��鉄MAC�啣�
let topic = format!("{}/{}", config.mqtt.topic_prefix, mac_address);
if let Err(e) = client.publish(topic, QoS::AtLeastOnce, false, json_data) {
eprintln!("MQTT�穃��躰秤: {:?}", e);
} else {
println!("�𣂼��煾��㺭�桀�MQTT");
let topic = format!("{}/{}", mqtt_config.topic_prefix, mac_address);
let mut retry_count = 0;
let max_retries = 3;
// 瘛餃��滩��箏�
while retry_count < max_retries {
let mut client_guard = client.lock().unwrap();
match client_guard.publish(&topic, QoS::AtLeastOnce, false, json_data.clone()) {
Ok(_) => {
println!("�𣂼��煾��㺭�桀�MQTT");
break;
}
Err(e) => {
eprintln!("MQTT�穃��躰秤 (撠肽� {}/{}): {:?}", retry_count + 1, max_retries, e);
retry_count += 1;
if retry_count < max_retries {
thread::sleep(Duration::from_secs(1));
}
}
}
}
}
}
// 皜�征蝝舐妖��㺭��
accumulated_data.clear();
}
}
}
}
Err(ref e) if e.kind() == io::ErrorKind::TimedOut => {
// Timeout is not an error, just continue
continue;
}
Err(e) => {


+ 246
- 0
src/main.rs.orig Целия файл

@@ -0,0 +1,246 @@
use anyhow::{Context, Result};
use rumqttc::{Client, MqttOptions, QoS, Transport};
use serde::{Serialize, Deserialize};
use serde_json;
use std::io::{self, Read};
use std::time::Duration;
use std::thread;
use serialport;
use actix_web::{web, App, HttpServer, HttpResponse};
use actix_cors::Cors;
use std::fs;

#[derive(Serialize, Deserialize)]
struct WeightData {
weight: String,
timestamp: u64,
}

#[derive(Serialize)]
struct MacResponse {
mac_address: String,
}

#[derive(Serialize)]
struct ScaleTypeResponse {
scale_type: String,
}

#[derive(Deserialize)]
struct MqttConfig {
client_id: String,
host: String,
port: u16,
username: String,
password: String,
keep_alive_secs: u64,
topic_prefix: String,
}

#[derive(Deserialize)]
struct Config {
scale_type: String,
serial_port: String,
baud_rate: u32,
mqtt: MqttConfig,
}

fn read_config() -> Result<Config> {
let config_path = "config.json";
let content = fs::read_to_string(config_path)
.with_context(|| format!("无法读取配置文件 {}", config_path))?;
let config: Config = serde_json::from_str(&content)
.with_context(|| format!("配置文件格式错误 {}", config_path))?;
Ok(config)
}

fn get_mac_address() -> String {
match read_config() {
Ok(config) => config.scale_type,
Err(_) => String::from("Unknown")
}
}

fn get_scale_type() -> String {
match read_config() {
Ok(config) => config.scale_type,
Err(_) => String::from("Unknown")
}
}

// HTTP处理函数
async fn get_mac() -> HttpResponse {
let mac = get_mac_address();
println!("HTTP请求:获取MAC地址 = {}", mac);
let response = MacResponse {
mac_address: mac,
};
HttpResponse::Ok().json(response)
}

// 新增的HTTP处理函数
async fn get_scale() -> HttpResponse {
let scale_type = get_scale_type();
println!("HTTP请求:获取天平类型 = {}", scale_type);
let response = ScaleTypeResponse {
scale_type,
};
HttpResponse::Ok().json(response)
}

#[actix_web::main]
async fn main() -> Result<()> {
println!("程序启动...");
// 启动HTTP服务器
println!("正在启动HTTP服务器...");
// 创建HTTP服务器
let server = HttpServer::new(|| {
// 配置CORS
let cors = Cors::default()
.allow_any_origin() // 允许所有来源
.allow_any_method() // 允许所有HTTP方法
.allow_any_header() // 允许所有请求头
.max_age(3600); // 设置预检请求的缓存时间(秒)
App::new()
.wrap(cors) // 添加CORS中间件
.route("/mac", web::get().to(get_mac))
.route("/scale", web::get().to(get_scale)) // 新增的路由
})
.bind(("0.0.0.0", 8080))?
.run();
println!("HTTP服务器已启动,监听在 http://127.0.0.1:8080");
// 在新线程中运行MQTT和串口服务
let mqtt_handle = tokio::spawn(async {
if let Err(e) = run_mqtt_and_serial().await {
eprintln!("MQTT/串口服务错误: {}", e);
}
});
// 等待HTTP服务器结束
server.await?;
Ok(())
}

// MQTT和串口处理函数
async fn run_mqtt_and_serial() -> Result<()> {
// 读取配置
let config = read_config()?;
// 创建 MQTT 客户端
let mut mqttopts = MqttOptions::new(
&config.mqtt.client_id,
&config.mqtt.host,
config.mqtt.port
);
mqttopts.set_keep_alive(Duration::from_secs(60)); // 增加keep-alive时间到60秒
mqttopts.set_credentials(&config.mqtt.username, &config.mqtt.password);
mqttopts.set_clean_session(true);
mqttopts.set_transport(Transport::Tcp);
mqttopts.set_max_packet_size(100 * 1024, 100 * 1024); // 设置发送和接收的最大包大小
let (mut client, mut connection) = Client::new(mqttopts, 10);

// 在单独的线程中处理 MQTT 连接
thread::spawn(move || {
loop { // 添加无限循环来保持重连
for notification in connection.iter() {
match notification {
Ok(_) => {},
Err(e) => {
eprintln!("MQTT连接错误: {:?}", e);
// 短暂等待后继续尝试重连
thread::sleep(Duration::from_secs(5));
break;
}
}
}
eprintln!("MQTT连接断开,5秒后尝试重连...");
thread::sleep(Duration::from_secs(5));
}
});

// 打开配置的串口
println!("正在尝试打开串口 {}...", config.serial_port);
let mut port = serialport::new(&config.serial_port, config.baud_rate)
.timeout(Duration::from_millis(1000))
.open()
.with_context(|| format!("无法打开串口 {}", config.serial_port))?;

println!("成功打开串口 {}!", config.serial_port);
println!("正在读取数据... (按 Ctrl+C 退出)");

// 增加缓冲区大小并使用String来累积数据
let mut serial_buf: Vec<u8> = vec![0; 1024];
let mut accumulated_data = String::new();
loop {
match port.read(serial_buf.as_mut_slice()) {
Ok(t) => {
if t > 0 {
// 将新数据添加到累积的字符串中
if let Ok(data) = String::from_utf8(serial_buf[..t].to_vec()) {
accumulated_data.push_str(&data);
// 检查是否有完整的数据行(以换行符或回车符结束)
if accumulated_data.contains('\n') || accumulated_data.contains('\r') {
// 处理累积的数据
let lines: Vec<&str> = accumulated_data
.split(|c| c == '\n' || c == '\r')
.filter(|s| !s.is_empty())
.collect();
for line in lines {
let trimmed_data = line.trim();
if !trimmed_data.is_empty() {
println!("收到完整数据: {}", trimmed_data);
// 创建权重数据结构
let weight_data = WeightData {
weight: trimmed_data.to_string(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
};

// 序列化数据
let json_data = serde_json::to_string(&weight_data)?;
println!("当前JSON数据: {}", json_data);
// 获取MAC地址
let mac_address = get_mac_address();
println!("MAC地址: {}", mac_address);
// 发布到 MQTT,主题中包含MAC地址
let topic = format!("{}/{}", config.mqtt.topic_prefix, mac_address);
if let Err(e) = client.publish(topic, QoS::AtLeastOnce, false, json_data) {
eprintln!("MQTT发布错误: {:?}", e);
} else {
println!("成功发送数据到MQTT");
}
}
}
// 清空累积的数据
accumulated_data.clear();
}
}
}
}
Err(ref e) if e.kind() == io::ErrorKind::TimedOut => {
// Timeout is not an error, just continue
continue;
}
Err(e) => {
eprintln!("发生错误: {}", e);
break;
}
}
}

Ok(())
}

+ 0
- 14
weight-reader.service Целия файл

@@ -1,14 +0,0 @@
[Unit]
Description=Weight Reader Service
After=network.target

[Service]
Type=simple
User=root
WorkingDirectory=/opt/weight-reader
ExecStart=/opt/weight-reader/weight_reader
Restart=always
RestartSec=3

[Install]
WantedBy=multi-user.target

Loading…
Отказ
Запис