rust称重到mqtt
Nevar pievienot vairāk kā 25 tēmas Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.

254 rindas
8.3KB

  1. use anyhow::{Context, Result};
  2. use rumqttc::{Client, MqttOptions, QoS};
  3. use serde::{Serialize, Deserialize};
  4. use serde_json;
  5. use std::io::{self, Read};
  6. use std::time::Duration;
  7. use std::thread;
  8. use serialport;
  9. use actix_web::{web, App, HttpServer, HttpResponse};
  10. use actix_cors::Cors;
  11. use std::fs;
  12. #[derive(Serialize, Deserialize)]
  13. struct WeightData {
  14. weight: String,
  15. timestamp: u64,
  16. }
  17. #[derive(Serialize)]
  18. struct MacResponse {
  19. mac_address: String,
  20. }
  21. #[derive(Serialize)]
  22. struct ScaleTypeResponse {
  23. scale_type: String,
  24. }
  25. #[derive(Deserialize)]
  26. struct MqttConfig {
  27. client_id: String,
  28. host: String,
  29. port: u16,
  30. username: String,
  31. password: String,
  32. keep_alive_secs: u64,
  33. topic_prefix: String,
  34. }
  35. #[derive(Deserialize)]
  36. struct Config {
  37. scale_type: String,
  38. serial_port: String,
  39. baud_rate: u32,
  40. mqtt: MqttConfig,
  41. }
  42. fn read_config() -> Result<Config> {
  43. let config_path = "config.json";
  44. let content = fs::read_to_string(config_path)
  45. .with_context(|| format!("无法读取配置文件 {}", config_path))?;
  46. let config: Config = serde_json::from_str(&content)
  47. .with_context(|| format!("配置文件格式错误 {}", config_path))?;
  48. Ok(config)
  49. }
  50. fn get_mac_address() -> String {
  51. match read_config() {
  52. Ok(config) => config.scale_type,
  53. Err(_) => String::from("Unknown")
  54. }
  55. }
  56. fn get_scale_type() -> String {
  57. match read_config() {
  58. Ok(config) => config.scale_type,
  59. Err(_) => String::from("Unknown")
  60. }
  61. }
  62. // HTTP处理函数
  63. async fn get_mac() -> HttpResponse {
  64. let mac = get_mac_address();
  65. println!("HTTP请求:获取MAC地址 = {}", mac);
  66. let response = MacResponse {
  67. mac_address: mac,
  68. };
  69. // 在新线程中运行MQTT和串口服务
  70. let mqtt_handle = tokio::spawn(async {
  71. if let Err(e) = run_mqtt_and_serial().await {
  72. eprintln!("MQTT/串口服务错误: {}", e);
  73. }
  74. });
  75. HttpResponse::Ok().json(response)
  76. }
  77. // 新增的HTTP处理函数
  78. async fn get_scale() -> HttpResponse {
  79. let scale_type = get_scale_type();
  80. println!("HTTP请求:获取天平类型 = {}", scale_type);
  81. let response = ScaleTypeResponse {
  82. scale_type,
  83. };
  84. HttpResponse::Ok().json(response)
  85. }
  86. #[actix_web::main]
  87. async fn main() -> Result<()> {
  88. println!("程序启动...");
  89. // 启动HTTP服务器
  90. println!("正在启动HTTP服务器...");
  91. // 创建HTTP服务器
  92. let server = HttpServer::new(|| {
  93. // 配置CORS
  94. let cors = Cors::default()
  95. .allow_any_origin() // 允许所有来源
  96. .allow_any_method() // 允许所有HTTP方法
  97. .allow_any_header() // 允许所有请求头
  98. .max_age(3600); // 设置预检请求的缓存时间(秒)
  99. App::new()
  100. .wrap(cors) // 添加CORS中间件
  101. .route("/mac", web::get().to(get_mac))
  102. .route("/scale", web::get().to(get_scale)) // 新增的路由
  103. })
  104. .bind(("0.0.0.0", 8080))?
  105. .run();
  106. println!("HTTP服务器已启动,监听在 http://127.0.0.1:8080");
  107. // 等待HTTP服务器结束
  108. server.await?;
  109. Ok(())
  110. }
  111. // MQTT和串口处理函数
  112. async fn run_mqtt_and_serial() -> Result<()> {
  113. // 读取配置
  114. let config = read_config()?;
  115. // 检查串口是否被占用
  116. println!("正在检查串口 {} 是否可用...", config.serial_port);
  117. match serialport::new(&config.serial_port, config.baud_rate)
  118. .timeout(Duration::from_millis(10))
  119. .open() {
  120. Err(e) => {
  121. if e.kind() == std::io::ErrorKind::PermissionDenied {
  122. eprintln!("串口 {} 已被占用", config.serial_port);
  123. return Ok(());
  124. }
  125. },
  126. Ok(_) => {}
  127. }
  128. // 创建 MQTT 客户端
  129. let mut mqttopts = MqttOptions::new(
  130. &config.mqtt.client_id,
  131. &config.mqtt.host,
  132. config.mqtt.port
  133. );
  134. mqttopts.set_keep_alive(Duration::from_secs(config.mqtt.keep_alive_secs));
  135. mqttopts.set_credentials(&config.mqtt.username, &config.mqtt.password);
  136. let (mut client, mut connection) = Client::new(mqttopts, 10);
  137. // 在单独的线程中处理 MQTT 连接
  138. thread::spawn(move || {
  139. for notification in connection.iter() {
  140. match notification {
  141. Ok(_) => {}
  142. Err(e) => {
  143. eprintln!("MQTT连接错误: {:?}", e);
  144. break;
  145. }
  146. }
  147. }
  148. });
  149. // 打开配置的串口
  150. println!("正在尝试打开串口 {}...", config.serial_port);
  151. let mut port = serialport::new(&config.serial_port, config.baud_rate)
  152. .timeout(Duration::from_millis(1000))
  153. .open()
  154. .with_context(|| format!("无法打开串口 {}", config.serial_port))?;
  155. println!("成功打开串口 {}!", config.serial_port);
  156. println!("正在读取数据... (按 Ctrl+C 退出)");
  157. // 增加缓冲区大小并使用String来累积数据
  158. let mut serial_buf: Vec<u8> = vec![0; 1024];
  159. let mut accumulated_data = String::new();
  160. loop {
  161. match port.read(serial_buf.as_mut_slice()) {
  162. Ok(t) => {
  163. if t > 0 {
  164. // 将新数据添加到累积的字符串中
  165. if let Ok(data) = String::from_utf8(serial_buf[..t].to_vec()) {
  166. accumulated_data.push_str(&data);
  167. // 检查是否有完整的数据行(以换行符或回车符结束)
  168. if accumulated_data.contains('\n') || accumulated_data.contains('\r') {
  169. // 处理累积的数据
  170. let lines: Vec<&str> = accumulated_data
  171. .split(|c| c == '\n' || c == '\r')
  172. .filter(|s| !s.is_empty())
  173. .collect();
  174. for line in lines {
  175. let trimmed_data = line.trim();
  176. if !trimmed_data.is_empty() {
  177. println!("收到完整数据: {}", trimmed_data);
  178. // 创建权重数据结构
  179. let weight_data = WeightData {
  180. weight: trimmed_data.to_string(),
  181. timestamp: std::time::SystemTime::now()
  182. .duration_since(std::time::UNIX_EPOCH)
  183. .unwrap()
  184. .as_secs(),
  185. };
  186. // 序列化数据
  187. let json_data = serde_json::to_string(&weight_data)?;
  188. println!("当前JSON数据: {}", json_data);
  189. // 获取MAC地址
  190. let mac_address = get_mac_address();
  191. println!("MAC地址: {}", mac_address);
  192. // 发布到 MQTT,主题中包含MAC地址
  193. let topic = format!("{}/{}", config.mqtt.topic_prefix, mac_address);
  194. if let Err(e) = client.publish(topic, QoS::AtLeastOnce, false, json_data) {
  195. eprintln!("MQTT发布错误: {:?}", e);
  196. } else {
  197. println!("成功发送数据到MQTT");
  198. }
  199. }
  200. }
  201. // 清空累积的数据
  202. accumulated_data.clear();
  203. }
  204. }
  205. }
  206. }
  207. Err(ref e) if e.kind() == io::ErrorKind::TimedOut => {
  208. // Timeout is not an error, just continue
  209. continue;
  210. }
  211. Err(e) => {
  212. eprintln!("发生错误: {}", e);
  213. break;
  214. }
  215. }
  216. }
  217. Ok(())
  218. }