数据同步
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

146 lines
4.7KB

  1. use tokio::net::TcpListener;
  2. use tokio::io::{AsyncReadExt, AsyncWriteExt};
  3. use config::Config;
  4. use tokio_postgres::{NoTls, Error as PgError};
  5. use serde::{Deserialize, Serialize};
  6. use std::sync::Arc;
  7. #[derive(Debug, Serialize, Deserialize)]
  8. struct InstrumentInfo {
  9. id: i32,
  10. informationid: i32,
  11. instrumentcode: String,
  12. laboratoryid: i32,
  13. name: String,
  14. remark: String,
  15. specification: String,
  16. }
  17. async fn connect_db(config: &Config) -> Result<tokio_postgres::Client, PgError> {
  18. let host = config.get_string("database.host").unwrap();
  19. let port = config.get_int("database.port").unwrap() as u16;
  20. let dbname = config.get_string("database.name").unwrap();
  21. let user = config.get_string("database.user").unwrap();
  22. let password = config.get_string("database.password").unwrap();
  23. let connection_string = format!(
  24. "host={} port={} dbname={} user={} password={}",
  25. host, port, dbname, user, password
  26. );
  27. let (client, connection) = tokio_postgres::connect(&connection_string, NoTls).await?;
  28. // 在后台运行连接
  29. tokio::spawn(async move {
  30. if let Err(e) = connection.await {
  31. eprintln!("数据库连接错误: {}", e);
  32. }
  33. });
  34. Ok(client)
  35. }
  36. async fn insert_instrument(client: &tokio_postgres::Client, info: &InstrumentInfo) -> Result<(), PgError> {
  37. // 先检查 ID 是否存在
  38. let exists = client
  39. .query_one(
  40. "SELECT EXISTS(SELECT 1 FROM public.hy_instrument WHERE id = $1)",
  41. &[&info.id],
  42. )
  43. .await?
  44. .get::<_, bool>(0);
  45. if exists {
  46. println!("ID {} 已存在,跳过插入", info.id);
  47. return Ok(());
  48. }
  49. // ID 不存在,执行插入
  50. client.execute(
  51. "INSERT INTO public.hy_instrument (id, informationid, instrumentcode, laboratoryid, name, remark, specification)
  52. OVERRIDING SYSTEM VALUE
  53. VALUES ($1, $2, $3, $4, $5, $6, $7)",
  54. &[
  55. &info.id,
  56. &info.informationid,
  57. &info.instrumentcode,
  58. &info.laboratoryid,
  59. &info.name,
  60. &info.remark,
  61. &info.specification,
  62. ],
  63. )
  64. .await?;
  65. println!("成功插入仪器信息: {} (ID: {})", info.instrumentcode, info.id);
  66. Ok(())
  67. }
  68. fn check_ok_message(message: &[u8]) -> u8 {
  69. 0xFF
  70. }
  71. #[tokio::main]
  72. async fn main() -> Result<(), Box<dyn std::error::Error>> {
  73. // 读取配置文件
  74. let settings = Config::builder()
  75. .add_source(config::File::with_name("config"))
  76. .build()?;
  77. // 连接数据库
  78. let client = connect_db(&settings).await?;
  79. let client = Arc::new(client);
  80. println!("数据库连接成功");
  81. let address = settings.get_string("server.address")?;
  82. let port = settings.get_int("server.port")? as u16;
  83. let bind_address = format!("{}:{}", address, port);
  84. let listener = TcpListener::bind(&bind_address).await?;
  85. println!("服务器监听地址: {}", bind_address);
  86. loop {
  87. let (mut socket, addr) = listener.accept().await?;
  88. println!("新客户端连接: {}", addr);
  89. let client = Arc::clone(&client);
  90. tokio::spawn(async move {
  91. let mut buf = [0; 1024 * 64]; // 增加缓冲区大小到64KB
  92. loop {
  93. match socket.read(&mut buf).await {
  94. Ok(0) => {
  95. println!("客户端断开连接: {}", addr);
  96. return;
  97. }
  98. Ok(n) => {
  99. let data = &buf[..n];
  100. match serde_json::from_slice::<InstrumentInfo>(data) {
  101. Ok(info) => {
  102. println!("接收到仪器信息: {:?}", info);
  103. if let Err(e) = insert_instrument(&client, &info).await {
  104. eprintln!("插入数据失败: {}", e);
  105. }
  106. }
  107. Err(e) => {
  108. eprintln!("解析JSON失败: {}", e);
  109. eprintln!("接收到的数据: {}", String::from_utf8_lossy(data));
  110. }
  111. }
  112. let response = check_ok_message(data);
  113. if let Err(e) = socket.write_all(&[response]).await {
  114. println!("发送响应失败: {}", e);
  115. return;
  116. }
  117. }
  118. Err(e) => {
  119. println!("读取数据失败: {}", e);
  120. return;
  121. }
  122. }
  123. }
  124. });
  125. }
  126. }