diff --git a/tcp_client/config.toml b/tcp_client/config.toml index 6a22941..1a00a08 100644 --- a/tcp_client/config.toml +++ b/tcp_client/config.toml @@ -5,10 +5,12 @@ host = "10.180.4.88" port = 9090 [client] -max_retries = 2 -retry_delay_secs = 3 -read_timeout_secs = 5 -write_timeout_secs = 5 +# 定时任务配置 +interval_seconds = 60 # 每隔多少秒执行一次 +max_retries = 3 +read_timeout_secs = 10 +write_timeout_secs = 10 +retry_delay_secs = 2 [database] #host = "10.180.4.100" @@ -24,4 +26,3 @@ name = "hy_record" query = "SELECT * FROM hy_record " incremental = false # 是否增量同步 key_field = "UpdateTime" # 增量同步的时间字段 - diff --git a/tcp_client/src/main.rs b/tcp_client/src/main.rs index c24ccaf..dd852b8 100644 --- a/tcp_client/src/main.rs +++ b/tcp_client/src/main.rs @@ -2,8 +2,8 @@ mod db; use std::net::TcpStream; use std::io::{Read, Write}; -use std::thread; use std::time::Duration; +use std::thread; use serde::Deserialize; use config::Config; use db::{Database, DatabaseConfig, TableConfig}; @@ -20,6 +20,7 @@ struct ClientConfig { retry_delay_secs: u64, read_timeout_secs: u64, write_timeout_secs: u64, + interval_seconds: u64, // 定时任务间隔时间 } #[derive(Debug, Deserialize)] @@ -49,110 +50,117 @@ async fn main() { } }; - // 连接数据库 - let mut db = match Database::connect(&config.database).await { - Ok(db) => { - println!("数据库连接成功!"); - db - } - Err(e) => { - println!("数据库连接失败: {}", e); - return; + loop { + // 连接数据库 + let mut db = match Database::connect(&config.database).await { + Ok(db) => { + println!("数据库连接成功!"); + db + } + Err(e) => { + println!("数据库连接失败: {}", e); + thread::sleep(Duration::from_secs(config.client.interval_seconds)); + continue; + } + }; + + // 测试数据库连接 + match db.test_connection().await { + Ok(true) => println!("数据库连接测试成功"), + Ok(false) => println!("数据库连接测试失败"), + Err(e) => println!("数据库测试错误: {}", e), } - }; - // 测试数据库连接 - match db.test_connection().await { - Ok(true) => println!("数据库连接测试成功"), - Ok(false) => println!("数据库连接测试失败"), - Err(e) => println!("数据库测试错误: {}", e), - } + // 获取配置的表数据 + for table in &config.tables { + println!("\n开始获取表 {} 的数据...", table.name); + match db.get_table_data(table).await { + Ok(rows) => { + println!("成功获取 {} 条记录", rows.len()); + // 将每行数据转换为JSON并发送到服务器 + for row in rows { + let json_data = db::format_row_as_json(&row, &table.name); + let msg = serde_json::to_string(&json_data).unwrap(); - // 获取配置的表数据 - for table in &config.tables { - println!("\n开始获取表 {} 的数据...", table.name); - match db.get_table_data(table).await { - Ok(rows) => { - println!("成功获取 {} 条记录", rows.len()); - // 将每行数据转换为JSON并发送到服务器 - for row in rows { - let json_data = db::format_row_as_json(&row, &table.name); - let msg = serde_json::to_string(&json_data).unwrap(); - - // 创建TCP连接 - let server_addr = format!("{}:{}", config.server.host, config.server.port); - let mut retry_count = 0; - - while retry_count < config.client.max_retries { - println!("\n尝试连接服务器 {} (第{}次)...", server_addr, retry_count + 1); - - match TcpStream::connect(&server_addr) { - Ok(mut stream) => { - println!("成功连接到服务器!"); - - if let Err(e) = stream.set_read_timeout(Some(Duration::from_secs(config.client.read_timeout_secs))) { - println!("设置读取超时失败: {}", e); - continue; - } - - if let Err(e) = stream.set_write_timeout(Some(Duration::from_secs(config.client.write_timeout_secs))) { - println!("设置写入超时失败: {}", e); - continue; - } + // 创建TCP连接 + let server_addr = format!("{}:{}", config.server.host, config.server.port); + let mut retry_count = 0; - // 发送数据 - match stream.write_all(msg.as_bytes()) { - Ok(_) => println!("成功发送消息: {}", msg), - Err(e) => { - println!("发送消息失败: {}", e); - retry_count += 1; + while retry_count < config.client.max_retries { + println!("\n尝试连接服务器 {} (第{}次)...", server_addr, retry_count + 1); + + match TcpStream::connect(&server_addr) { + Ok(mut stream) => { + println!("成功连接到服务器!"); + + if let Err(e) = stream.set_read_timeout(Some(Duration::from_secs(config.client.read_timeout_secs))) { + println!("设置读取超时失败: {}", e); + continue; + } + + if let Err(e) = stream.set_write_timeout(Some(Duration::from_secs(config.client.write_timeout_secs))) { + println!("设置写入超时失败: {}", e); continue; } - } - // 读取响应 - let mut buffer = [0; 1]; - match stream.read_exact(&mut buffer) { - Ok(_) => { - let bit = buffer[0]; - // println!("接收到比特值: {}", bit); - if(bit==255){ - println!("接收成功"); - break; - } - else if(bit==0){ - println!("接收失败"); + // 发送数据 + match stream.write_all(msg.as_bytes()) { + Ok(_) => println!("成功发送消息: {}", msg), + Err(e) => { + println!("发送消息失败: {}", e); retry_count += 1; continue; } - else{ - println!("接收到非预期的值: {} (十六进制: {:02X})", bit, bit); + } + + // 读取响应 + let mut buffer = [0; 1]; + match stream.read_exact(&mut buffer) { + Ok(_) => { + let bit = buffer[0]; + // println!("接收到比特值: {}", bit); + if(bit==255){ + println!("接收成功"); + break; + } + else if(bit==0){ + println!("接收失败"); + retry_count += 1; + continue; + } + else{ + println!("接收到非预期的值: {} (十六进制: {:02X})", bit, bit); + retry_count += 1; + } + } + Err(e) => { + println!("接收数据失败: {}", e); retry_count += 1; } } - Err(e) => { - println!("接收数据失败: {}", e); - retry_count += 1; - } } - } - Err(e) => { - println!("连接服务器失败: {}", e); - retry_count += 1; - if retry_count < config.client.max_retries { - println!("等待{}秒后重试...", config.client.retry_delay_secs); - thread::sleep(Duration::from_secs(config.client.retry_delay_secs)); + Err(e) => { + println!("连接服务器失败: {}", e); + retry_count += 1; + if retry_count < config.client.max_retries { + println!("等待{}秒后重试...", config.client.retry_delay_secs); + thread::sleep(Duration::from_secs(config.client.retry_delay_secs)); + } } } } - } - if retry_count >= config.client.max_retries { - println!("达到最大重试次数,跳过当前数据"); + if retry_count >= config.client.max_retries { + println!("达到最大重试次数,跳过当前数据"); + } } } + Err(e) => println!("获取表 {} 数据失败: {}", table.name, e), } - Err(e) => println!("获取表 {} 数据失败: {}", table.name, e), } + + // 等待下一次执行 + println!("等待 {} 秒后执行下一次同步...", config.client.interval_seconds); + thread::sleep(Duration::from_secs(config.client.interval_seconds)); } }