diff --git a/tcp_client/config.toml b/tcp_client/config.toml index d1168e0..361cf28 100644 --- a/tcp_client/config.toml +++ b/tcp_client/config.toml @@ -1,6 +1,6 @@ [server] -#host = "10.180.4.88" -host = "127.0.0.1" +host = "10.180.4.88" +#host = "127.0.0.1" port = 9090 [client] @@ -10,18 +10,109 @@ read_timeout_secs = 5 write_timeout_secs = 5 [database] -host = "192.168.0.100" +#host = "192.168.0.100" +host = "10.180.4.100" port = 5432 name = "Auseft_RL_Web" user = "postgres" password = "Auseft@2025qwer" +[scheduler] +interval_hours = 1 # 定时任务执行间隔(小时) + # 要同步的表配置 +[[tables]] +name = "hy_record" +query = "SELECT * FROM \"hy_record\" " +incremental = false # 是否增量同步 +key_field = "UpdateTime" # 增量同步的时间字段 +[[tables]] +name = "hy_itemdetail" +query = "SELECT * FROM \"hy_itemdetail\"" +incremental = false +key_field = "UpdateTime" +[[tables]] +name = "hy_norm" +query = "SELECT * FROM \"hy_norm\"" +incremental = false +key_field = "UpdateTime" [[tables]] name = "hy_instrument" query = "SELECT * FROM \"hy_instrument\"" incremental = false key_field = "UpdateTime" + +[[tables]] +name = "hy_information" +query = "SELECT * FROM \"hy_information\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_allot" +query = "SELECT * FROM \"hy_allot\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_cytask" +query = "SELECT * FROM \"hy_cytask\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_fullwatersample" +query = "SELECT * FROM \"hy_fullwatersample\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_informationnorm" +query = "SELECT * FROM \"hy_informationnorm\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_laboratoryinstrument" +query = "SELECT * FROM \"hy_laboratoryinstrument\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_materialanalysis_type" +query = "SELECT * FROM \"hy_materialanalysis_type\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_materialdetail" +query = "SELECT * FROM \"hy_materialdetail\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_weight_input" +query = "SELECT * FROM \"hy_weight_input\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_warmhumid" +query = "SELECT * FROM \"hy_warmhumid\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_task" +query = "SELECT * FROM \"hy_task\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_spotcheck" +query = "SELECT * FROM \"hy_spotcheck\"" +incremental = false +key_field = "UpdateTime" \ No newline at end of file diff --git a/tcp_client/src/main.rs b/tcp_client/src/main.rs index 0d09bd5..24dab6d 100644 --- a/tcp_client/src/main.rs +++ b/tcp_client/src/main.rs @@ -22,11 +22,17 @@ struct ClientConfig { write_timeout_secs: u64, } +#[derive(Debug, Deserialize)] +struct SchedulerConfig { + interval_hours: u64, +} + #[derive(Debug, Deserialize)] struct Settings { server: ServerConfig, client: ClientConfig, database: DatabaseConfig, + scheduler: SchedulerConfig, tables: Vec, } @@ -38,50 +44,7 @@ fn load_config() -> Result { settings.try_deserialize() } -#[tokio::main] -async fn main() { - // 加载配置 - let config = match load_config() { - Ok(cfg) => cfg, - Err(e) => { - println!("加载配置文件失败: {}", e); - return; - } - }; - - println!("配置信息:"); - println!("数据库主机: {}", config.database.host); - println!("数据库端口: {}", config.database.port); - println!("数据库名称: {}", config.database.name); - println!("数据库用户: {}", config.database.user); - println!("要同步的表数量: {}", config.tables.len()); - - // 建立数据库连接 - let mut db = match Database::connect(&config.database).await { - Ok(db) => { - println!("数据库连接成功!"); - db - } - Err(e) => { - println!("数据库连接失败: {}", e); - println!("请检查数据库配置和网络连接:"); - println!("1. 确认数据库服务器是否运行"); - println!("2. 确认数据库服务器的IP地址是否正确"); - println!("3. 确认数据库服务器的端口是否正确"); - println!("4. 确认数据库用户和密码是否正确"); - println!("5. 确认网络连接是否正常"); - return; - } - }; - - // 测试数据库连接 - match db.test_connection().await { - Ok(true) => println!("数据库连接测试成功"), - Ok(false) => println!("数据库连接测试失败"), - Err(e) => println!("数据库测试错误: {}", e), - } - - // 获取配置的表数据 +async fn sync_data(db: &mut Database, config: &Settings) { for table in &config.tables { println!("\n开始获取表 {} 的数据...", table.name); match db.get_table_data(table).await { @@ -101,88 +64,109 @@ async fn main() { } }; - // 创建TCP连接 - let server_addr = format!("{}:{}", config.server.host, config.server.port); + // 尝试发送数据到服务器 let mut retry_count = 0; - let mut success = false; - + let server_addr = format!("{}:{}", config.server.host, config.server.port); + while retry_count < config.client.max_retries { - if retry_count > 0 { - println!("正在重试第 {} 条记录 (第{}次)...", index + 1, retry_count + 1); - thread::sleep(Duration::from_secs(config.client.retry_delay_secs)); - } - match TcpStream::connect(&server_addr) { Ok(mut stream) => { - // 设置超时 - if let Err(e) = stream.set_read_timeout(Some(Duration::from_secs(config.client.read_timeout_secs))) { - println!("设置读取超时失败: {}", e); - retry_count += 1; - continue; - } - - if let Err(e) = stream.set_write_timeout(Some(Duration::from_secs(config.client.write_timeout_secs))) { - println!("设置写入超时失败: {}", e); - retry_count += 1; - continue; - } + // 设置读写超时 + stream.set_read_timeout(Some(Duration::from_secs(config.client.read_timeout_secs))).unwrap_or_default(); + stream.set_write_timeout(Some(Duration::from_secs(config.client.write_timeout_secs))).unwrap_or_default(); // 发送数据 - match stream.write_all(msg.as_bytes()) { - Ok(_) => println!("成功发送消息: {}", msg), - Err(e) => { - println!("发送数据失败 (第{}条记录): {}", index + 1, e); - retry_count += 1; - continue; - } + if let Err(e) = stream.write_all(msg.as_bytes()) { + println!("发送数据失败 (第{}条记录): {}", index + 1, e); + retry_count += 1; + thread::sleep(Duration::from_secs(config.client.retry_delay_secs)); + continue; } - // 读取响应 - let mut buffer = [0; 1]; - match stream.read_exact(&mut buffer) { + // 等待服务器响应 + let mut response = [0; 1024]; + match stream.read(&mut response) { Ok(_) => { - match buffer[0] { - 255 => { - success = true; - break; - } - 0 => { - println!("服务器拒绝接收数据 (第{}条记录)", index + 1); - retry_count += 1; - } - _ => { - println!("收到非预期响应: {} (第{}条记录)", buffer[0], index + 1); - retry_count += 1; - } - } + success_count += 1; + break; } Err(e) => { println!("读取服务器响应失败 (第{}条记录): {}", index + 1, e); retry_count += 1; + thread::sleep(Duration::from_secs(config.client.retry_delay_secs)); + continue; } } } Err(e) => { println!("连接服务器失败 (第{}条记录): {}", index + 1, e); retry_count += 1; + thread::sleep(Duration::from_secs(config.client.retry_delay_secs)); + continue; } } } - if success { - success_count += 1; - } else { - println!("达到最大重试次数,跳过第{}条记录", index + 1); + if retry_count >= config.client.max_retries { + println!("发送数据失败,已达到最大重试次数 (第{}条记录)", index + 1); fail_count += 1; } } - println!("\n表 {} 同步完成:", table.name); - println!("总记录数: {}", rows.len()); - println!("成功: {} 条", success_count); - println!("失败: {} 条", fail_count); + println!("表 {} 同步完成", table.name); + println!("成功: {} 条记录", success_count); + println!("失败: {} 条记录", fail_count); } Err(e) => println!("获取表 {} 数据失败: {}", table.name, e), } } } + +#[tokio::main] +async fn main() { + // 加载配置 + let config = match load_config() { + Ok(cfg) => cfg, + Err(e) => { + println!("加载配置文件失败: {}", e); + return; + } + }; + + println!("配置信息:"); + println!("数据库主机: {}", config.database.host); + println!("数据库端口: {}", config.database.port); + println!("数据库名称: {}", config.database.name); + println!("数据库用户: {}", config.database.user); + println!("要同步的表数量: {}", config.tables.len()); + println!("定时任务间隔: {}小时", config.scheduler.interval_hours); + + loop { + // 建立数据库连接 + let mut db = match Database::connect(&config.database).await { + Ok(db) => { + println!("数据库连接成功!"); + db + } + Err(e) => { + println!("数据库连接失败: {}", e); + println!("请检查数据库配置和网络连接:"); + println!("1. 确认数据库服务器是否运行"); + println!("2. 确认数据库服务器的IP地址是否正确"); + println!("3. 确认数据库服务器的端口是否正确"); + println!("4. 确认数据库用户和密码是否正确"); + println!("5. 确认网络连接是否正常"); + // 等待一段时间后重试 + tokio::time::sleep(Duration::from_secs(60)).await; + continue; + } + }; + + // 执行同步 + sync_data(&mut db, &config).await; + + // 等待下一次执行 + println!("\n等待 {} 小时后执行下一次同步...", config.scheduler.interval_hours); + tokio::time::sleep(Duration::from_secs(config.scheduler.interval_hours * 3600)).await; + } +} diff --git a/tcp_server/target/debug/tcp_server.exe b/tcp_server/target/debug/tcp_server.exe index b4be464..3eaccf2 100644 Binary files a/tcp_server/target/debug/tcp_server.exe and b/tcp_server/target/debug/tcp_server.exe differ diff --git a/tcp_server/target/debug/tcp_server.pdb b/tcp_server/target/debug/tcp_server.pdb index b67b66a..1feab59 100644 Binary files a/tcp_server/target/debug/tcp_server.pdb and b/tcp_server/target/debug/tcp_server.pdb differ