数据同步
Du kannst nicht mehr als 25 Themen auswählen Themen müssen entweder mit einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

173 Zeilen
6.5KB

  1. mod db;
  2. use std::net::TcpStream;
  3. use std::io::{Read, Write};
  4. use std::thread;
  5. use std::time::Duration;
  6. use serde::Deserialize;
  7. use config::Config;
  8. use db::{Database, DatabaseConfig, TableConfig};
  9. #[derive(Debug, Deserialize)]
  10. struct ServerConfig {
  11. host: String,
  12. port: u16,
  13. }
  14. #[derive(Debug, Deserialize)]
  15. struct ClientConfig {
  16. max_retries: u32,
  17. retry_delay_secs: u64,
  18. read_timeout_secs: u64,
  19. write_timeout_secs: u64,
  20. }
  21. #[derive(Debug, Deserialize)]
  22. struct SchedulerConfig {
  23. interval_hours: u64,
  24. }
  25. #[derive(Debug, Deserialize)]
  26. struct Settings {
  27. server: ServerConfig,
  28. client: ClientConfig,
  29. database: DatabaseConfig,
  30. scheduler: SchedulerConfig,
  31. tables: Vec<TableConfig>,
  32. }
  33. fn load_config() -> Result<Settings, config::ConfigError> {
  34. let settings = Config::builder()
  35. .add_source(config::File::with_name("config"))
  36. .build()?;
  37. settings.try_deserialize()
  38. }
  39. async fn sync_data(db: &mut Database, config: &Settings) {
  40. for table in &config.tables {
  41. println!("\n开始获取表 {} 的数据...", table.name);
  42. match db.get_table_data(table).await {
  43. Ok(rows) => {
  44. println!("从数据库获取到 {} 条记录", rows.len());
  45. let mut success_count = 0;
  46. let mut fail_count = 0;
  47. // 将每行数据发送到服务器
  48. for (index, row) in rows.iter().enumerate() {
  49. let msg = match serde_json::to_string(&row) {
  50. Ok(msg) => msg,
  51. Err(e) => {
  52. println!("序列化数据失败 (第{}条记录): {}", index + 1, e);
  53. fail_count += 1;
  54. continue;
  55. }
  56. };
  57. // 尝试发送数据到服务器
  58. let mut retry_count = 0;
  59. let server_addr = format!("{}:{}", config.server.host, config.server.port);
  60. while retry_count < config.client.max_retries {
  61. match TcpStream::connect(&server_addr) {
  62. Ok(mut stream) => {
  63. // 设置读写超时
  64. stream.set_read_timeout(Some(Duration::from_secs(config.client.read_timeout_secs))).unwrap_or_default();
  65. stream.set_write_timeout(Some(Duration::from_secs(config.client.write_timeout_secs))).unwrap_or_default();
  66. // 发送数据
  67. if let Err(e) = stream.write_all(msg.as_bytes()) {
  68. println!("发送数据失败 (第{}条记录): {}", index + 1, e);
  69. retry_count += 1;
  70. thread::sleep(Duration::from_secs(config.client.retry_delay_secs));
  71. continue;
  72. }
  73. // 等待服务器响应
  74. let mut response = [0; 1024];
  75. match stream.read(&mut response) {
  76. Ok(_) => {
  77. success_count += 1;
  78. break;
  79. }
  80. Err(e) => {
  81. println!("读取服务器响应失败 (第{}条记录): {}", index + 1, e);
  82. retry_count += 1;
  83. thread::sleep(Duration::from_secs(config.client.retry_delay_secs));
  84. continue;
  85. }
  86. }
  87. }
  88. Err(e) => {
  89. println!("连接服务器失败 (第{}条记录): {}", index + 1, e);
  90. retry_count += 1;
  91. thread::sleep(Duration::from_secs(config.client.retry_delay_secs));
  92. continue;
  93. }
  94. }
  95. }
  96. if retry_count >= config.client.max_retries {
  97. println!("发送数据失败,已达到最大重试次数 (第{}条记录)", index + 1);
  98. fail_count += 1;
  99. }
  100. }
  101. println!("表 {} 同步完成", table.name);
  102. println!("成功: {} 条记录", success_count);
  103. println!("失败: {} 条记录", fail_count);
  104. }
  105. Err(e) => println!("获取表 {} 数据失败: {}", table.name, e),
  106. }
  107. }
  108. }
  109. #[tokio::main]
  110. async fn main() {
  111. // 加载配置
  112. let config = match load_config() {
  113. Ok(cfg) => cfg,
  114. Err(e) => {
  115. println!("加载配置文件失败: {}", e);
  116. return;
  117. }
  118. };
  119. println!("配置信息:");
  120. println!("数据库主机: {}", config.database.host);
  121. println!("数据库端口: {}", config.database.port);
  122. println!("数据库名称: {}", config.database.name);
  123. println!("数据库用户: {}", config.database.user);
  124. println!("要同步的表数量: {}", config.tables.len());
  125. println!("定时任务间隔: {}小时", config.scheduler.interval_hours);
  126. loop {
  127. // 建立数据库连接
  128. let mut db = match Database::connect(&config.database).await {
  129. Ok(db) => {
  130. println!("数据库连接成功!");
  131. db
  132. }
  133. Err(e) => {
  134. println!("数据库连接失败: {}", e);
  135. println!("请检查数据库配置和网络连接:");
  136. println!("1. 确认数据库服务器是否运行");
  137. println!("2. 确认数据库服务器的IP地址是否正确");
  138. println!("3. 确认数据库服务器的端口是否正确");
  139. println!("4. 确认数据库用户和密码是否正确");
  140. println!("5. 确认网络连接是否正常");
  141. // 等待一段时间后重试
  142. tokio::time::sleep(Duration::from_secs(60)).await;
  143. continue;
  144. }
  145. };
  146. // 执行同步
  147. sync_data(&mut db, &config).await;
  148. // 等待下一次执行
  149. println!("\n等待 {} 小时后执行下一次同步...", config.scheduler.interval_hours);
  150. tokio::time::sleep(Duration::from_secs(config.scheduler.interval_hours * 3600)).await;
  151. }
  152. }