rust称重到mqtt
No puede seleccionar más de 25 temas Los temas deben comenzar con una letra o número, pueden incluir guiones ('-') y pueden tener hasta 35 caracteres de largo.

202 líneas
6.5KB

  1. use anyhow::{Context, Result};
  2. use rumqttc::{Client, MqttOptions, QoS};
  3. use serde::{Serialize, Deserialize};
  4. use serde_json;
  5. use std::io::{self, Read};
  6. use std::time::Duration;
  7. use std::thread;
  8. use serialport;
  9. use std::process::Command;
  10. use actix_web::{web, App, HttpServer, HttpResponse};
  11. use actix_cors::Cors;
  12. #[derive(Serialize, Deserialize)]
  13. struct WeightData {
  14. weight: String,
  15. timestamp: u64,
  16. }
  17. #[derive(Serialize)]
  18. struct MacResponse {
  19. mac_address: String,
  20. }
  21. fn get_mac_address() -> String {
  22. let output = Command::new("getmac")
  23. .arg("/fo")
  24. .arg("csv")
  25. .arg("/nh")
  26. .output()
  27. .expect("Failed to execute getmac command");
  28. let output_str = String::from_utf8_lossy(&output.stdout);
  29. let first_line = output_str.lines().next().unwrap_or("");
  30. let mac = first_line.split(',').next().unwrap_or("").trim_matches('"');
  31. mac.replace('-', ":").to_lowercase()
  32. }
  33. // HTTP处理函数
  34. async fn get_mac() -> HttpResponse {
  35. let mac = get_mac_address();
  36. println!("HTTP请求:获取MAC地址 = {}", mac); // 添加日志
  37. let response = MacResponse {
  38. mac_address: mac,
  39. };
  40. HttpResponse::Ok().json(response)
  41. }
  42. #[actix_web::main]
  43. async fn main() -> Result<()> {
  44. println!("程序启动...");
  45. // 启动HTTP服务器
  46. println!("正在启动HTTP服务器...");
  47. // 创建HTTP服务器
  48. let server = HttpServer::new(|| {
  49. // 配置CORS
  50. let cors = Cors::default()
  51. .allow_any_origin() // 允许所有来源
  52. .allow_any_method() // 允许所有HTTP方法
  53. .allow_any_header() // 允许所有请求头
  54. .max_age(3600); // 设置预检请求的缓存时间(秒)
  55. App::new()
  56. .wrap(cors) // 添加CORS中间件
  57. .route("/mac", web::get().to(get_mac))
  58. })
  59. .bind(("0.0.0.0", 8080))?
  60. .run();
  61. println!("HTTP服务器已启动,监听在 http://127.0.0.1:8080");
  62. // 在新线程中运行MQTT和串口服务
  63. let mqtt_handle = tokio::spawn(async {
  64. if let Err(e) = run_mqtt_and_serial().await {
  65. eprintln!("MQTT/串口服务错误: {}", e);
  66. }
  67. });
  68. // 等待HTTP服务器结束
  69. server.await?;
  70. Ok(())
  71. }
  72. // MQTT和串口处理函数
  73. async fn run_mqtt_and_serial() -> Result<()> {
  74. // 创建 MQTT 客户端
  75. let mut mqttopts = MqttOptions::new("weight_reader", "112.33.111.160", 1883);
  76. mqttopts.set_keep_alive(Duration::from_secs(5));
  77. mqttopts.set_credentials("auseft", "1q2w3E**");
  78. let (mut client, mut connection) = Client::new(mqttopts, 10);
  79. // 在单独的线程中处理 MQTT 连接
  80. thread::spawn(move || {
  81. for notification in connection.iter() {
  82. match notification {
  83. Ok(_) => {
  84. // if(!notification.contains("PingRe")) {
  85. // println!("MQTT事件: {:?}", notification);
  86. // }
  87. }
  88. Err(e) => {
  89. eprintln!("MQTT连接错误: {:?}", e);
  90. break;
  91. }
  92. }
  93. }
  94. });
  95. // List available ports
  96. let ports = serialport::available_ports().context("未找到串口设备!")?;
  97. if ports.is_empty() {
  98. println!("错误:没有找到任何串口设备");
  99. return Ok(());
  100. }
  101. println!("可用的串口设备:");
  102. for (i, port) in ports.iter().enumerate() {
  103. println!("{}: {}", i, port.port_name);
  104. }
  105. // Read port selection from user
  106. let port_idx = loop {
  107. println!("请输入要使用的串口编号 (0-{}):", ports.len() - 1);
  108. let mut input = String::new();
  109. io::stdin().read_line(&mut input)?;
  110. match input.trim().parse::<usize>() {
  111. Ok(idx) if idx < ports.len() => break idx,
  112. Ok(_) => println!("错误:输入的编号超出范围,请重新输入"),
  113. Err(_) => println!("错误:请输入有效的数字"),
  114. }
  115. };
  116. let port_name = &ports[port_idx].port_name;
  117. println!("正在尝试打开串口 {}...", port_name);
  118. // Configure and open the serial port
  119. let mut port = serialport::new(port_name, 9600)
  120. .timeout(Duration::from_millis(1000))
  121. .open()
  122. .with_context(|| format!("无法打开串口 {}", port_name))?;
  123. println!("成功打开串口 {}!", port_name);
  124. println!("正在读取数据... (按 Ctrl+C 退出)");
  125. // Read data continuously
  126. let mut serial_buf: Vec<u8> = vec![0; 1000];
  127. loop {
  128. match port.read(serial_buf.as_mut_slice()) {
  129. Ok(t) => {
  130. let data = String::from_utf8_lossy(&serial_buf[..t]);
  131. let trimmed_data = data.trim();
  132. if !trimmed_data.is_empty() {
  133. println!("收到数据: {}", trimmed_data);
  134. // 创建权重数据结构
  135. let weight_data = WeightData {
  136. weight: trimmed_data.to_string(),
  137. timestamp: std::time::SystemTime::now()
  138. .duration_since(std::time::UNIX_EPOCH)
  139. .unwrap()
  140. .as_secs(),
  141. };
  142. // 开始循环发送数据
  143. loop {
  144. // 序列化数据
  145. let json_data = serde_json::to_string(&weight_data)?;
  146. println!("当前JSON数据: {}", json_data);
  147. // 获取MAC地址
  148. let mac_address = get_mac_address();
  149. println!("MAC地址: {}", mac_address);
  150. // 发布到 MQTT,主题中包含MAC地址
  151. if let Err(e) = client.publish(format!("weight/data/{}", mac_address), QoS::AtLeastOnce, false, json_data) {
  152. eprintln!("MQTT发布错误: {:?}", e);
  153. } else {
  154. println!("成功发送数据到MQTT");
  155. }
  156. // 等待3秒
  157. thread::sleep(Duration::from_secs(3));
  158. }
  159. }
  160. }
  161. Err(ref e) if e.kind() == io::ErrorKind::TimedOut => {
  162. // Timeout is not an error, just continue
  163. continue;
  164. }
  165. Err(e) => {
  166. eprintln!("发生错误: {}", e);
  167. break;
  168. }
  169. }
  170. }
  171. Ok(())
  172. }