From df3af0916e90508bd354348f3417c2dbbd06fcec Mon Sep 17 00:00:00 2001 From: OCEAN <1010331798@qq.com> Date: Mon, 28 Apr 2025 16:37:00 +0800 Subject: [PATCH] =?UTF-8?q?=E9=BA=92=E9=BA=9F=E6=97=A0=E6=B3=95=E8=BF=90?= =?UTF-8?q?=E8=A1=8C=E5=90=8E=E7=9A=84=E9=87=8D=E5=86=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tcp_client/Cargo.lock | 125 +++++++++---------------- tcp_client/Cargo.toml | 6 +- tcp_client/config.toml | 56 +++++------ tcp_client/src/db.rs | 191 ++++++++++++++++++++++++-------------- tcp_client/src/main.rs | 206 ++++++++++++++++++++--------------------- 5 files changed, 295 insertions(+), 289 deletions(-) diff --git a/tcp_client/Cargo.lock b/tcp_client/Cargo.lock index 3196bf4..9ea5ea7 100644 --- a/tcp_client/Cargo.lock +++ b/tcp_client/Cargo.lock @@ -87,6 +87,19 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "bigdecimal" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a22f228ab7a1b23027ccc6c350b72868017af7ea8356fbdf19f8d991c690013" +dependencies = [ + "autocfg", + "libm", + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -200,56 +213,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "diesel" -version = "2.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff236accb9a5069572099f0b350a92e9560e8e63a9b8d546162f4a5e03026bb2" -dependencies = [ - "bitflags 2.9.0", - "byteorder", - "chrono", - "diesel_derives", - "itoa", - "pq-sys", - "serde_json", -] - -[[package]] -name = "diesel-async" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acada1517534c92d3f382217b485db8a8638f111b0e3f2a2a8e26165050f77be" -dependencies = [ - "async-trait", - "diesel", - "futures-util", - "scoped-futures", - "tokio", - "tokio-postgres", -] - -[[package]] -name = "diesel_derives" -version = "2.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14701062d6bed917b5c7103bdffaee1e4609279e240488ad24e7bd979ca6866c" -dependencies = [ - "diesel_table_macro_syntax", - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "diesel_table_macro_syntax" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc5557efc453706fed5e4fa85006fe9817c224c3f480a34c7e5959fd700921c5" -dependencies = [ - "syn", -] - [[package]] name = "digest" version = "0.10.7" @@ -447,6 +410,12 @@ version = "0.2.172" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" +[[package]] +name = "libm" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9627da5196e5d8ed0b0495e61e518847578da83483c37288316d9b2e03a7f72" + [[package]] name = "linked-hash-map" version = "0.5.6" @@ -521,6 +490,25 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", +] + +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -690,6 +678,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613283563cd90e1dfc3518d548caee47e0e725455ed619881f5cf21f36de4b48" dependencies = [ "bytes", + "chrono", "fallible-iterator", "postgres-protocol", ] @@ -703,15 +692,6 @@ dependencies = [ "zerocopy", ] -[[package]] -name = "pq-sys" -version = "0.4.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31c0052426df997c0cbd30789eb44ca097e3541717a7b8fa36b1c464ee7edebd" -dependencies = [ - "vcpkg", -] - [[package]] name = "proc-macro2" version = "1.0.95" @@ -813,15 +793,6 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" -[[package]] -name = "scoped-futures" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b24aae2d0636530f359e9d5ef0c04669d11c5e756699b27a6a6d845d8329091" -dependencies = [ - "pin-project-lite", -] - [[package]] name = "scopeguard" version = "1.2.0" @@ -949,14 +920,14 @@ dependencies = [ name = "tcp_client" version = "0.1.0" dependencies = [ + "bigdecimal", "chrono", "config", - "diesel", - "diesel-async", + "postgres-types", "serde", "serde_json", "tokio", - "urlencoding", + "tokio-postgres", ] [[package]] @@ -1110,18 +1081,6 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0" -[[package]] -name = "urlencoding" -version = "2.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" - -[[package]] -name = "vcpkg" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" - [[package]] name = "version_check" version = "0.9.5" diff --git a/tcp_client/Cargo.toml b/tcp_client/Cargo.toml index 078ee29..d3e7e50 100644 --- a/tcp_client/Cargo.toml +++ b/tcp_client/Cargo.toml @@ -5,10 +5,10 @@ edition = "2021" [dependencies] tokio = { version = "1.28", features = ["full"] } -diesel = { version = "2.1.0", features = ["postgres", "chrono", "serde_json"] } -diesel-async = { version = "0.4.1", features = ["postgres"] } +tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] } config = "0.13" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" chrono = { version = "0.4", features = ["serde"] } -urlencoding = "2.1" \ No newline at end of file +postgres-types = "0.2" +bigdecimal = "0.4" diff --git a/tcp_client/config.toml b/tcp_client/config.toml index 361cf28..45d1fde 100644 --- a/tcp_client/config.toml +++ b/tcp_client/config.toml @@ -1,44 +1,47 @@ [server] -host = "10.180.4.88" -#host = "127.0.0.1" +#host = "10.180.4.88" +host = "127.0.0.1" + port = 9090 [client] +# 定时任务配置 +interval_seconds = 3600 # 每隔多少秒执行一次 max_retries = 3 -retry_delay_secs =2 -read_timeout_secs = 5 -write_timeout_secs = 5 +read_timeout_secs = 10 +write_timeout_secs = 10 +retry_delay_secs = 2 [database] -#host = "192.168.0.100" host = "10.180.4.100" +#host = "192.168.0.100" port = 5432 name = "Auseft_RL_Web" user = "postgres" password = "Auseft@2025qwer" -[scheduler] -interval_hours = 1 # 定时任务执行间隔(小时) - # 要同步的表配置 + [[tables]] name = "hy_record" -query = "SELECT * FROM \"hy_record\" " -incremental = false # 是否增量同步 -key_field = "UpdateTime" # 增量同步的时间字段 +query = "SELECT id, hy_code, type, hy_check, hy_approve, check_time, approve_time, approve_user, check_user, hy_time, hy_values, accept_time, accept_user, mt::text as mt, mad::text as mad, aad::text as aad, ad::text as ad, vad::text as vad, vd::text as vd, var::text as var, vdaf::text as vdaf, fcad::text as fcad, st_ar::text as st_ar, st_ad::text as st_ad, st_d::text as st_d, had::text as had, hd::text as hd, qb_ad::text as qb_ad, qgr_ad::text as qgr_ad, qgr_d::text as qgr_d, qnet_ar_mj_kg::text as qnet_ar_mj_kg, qnet_ar_j_cal::text as qnet_ar_j_cal, v::text as v, aar::text as aar, qnet_ar::text as qnet_ar, qnet_ar1::text as qnet_ar1, crc::text as crc, st_daf::text as st_daf, cad ::text as cad, cd::text as cd, isauto, hy_type, isnormal FROM hy_record where where hy_check=0 and accept_time >= NOW() - INTERVAL '10 days' " +incremental = false +key_field = "UpdateTime" + [[tables]] name = "hy_itemdetail" -query = "SELECT * FROM \"hy_itemdetail\"" +query = "SELECT * FROM hy_itemdetail where record_id in (select id from hy_record where hy_check=0 and accept_time >= NOW() - INTERVAL '10 days') " incremental = false key_field = "UpdateTime" [[tables]] name = "hy_norm" -query = "SELECT * FROM \"hy_norm\"" +query = " select id,norm_id,zbvalues::text as zbvalues,itemdetail_id,hy_user,checktime,\"explain\" from hy_norm where itemdetail_id in (SELECT t1.id FROM hy_itemdetail t1 inner join hy_record t2 on t1.record_id=t2.id where t2.hy_check=0 and t2.accept_time >= NOW() - INTERVAL '10 days')" incremental = false key_field = "UpdateTime" + [[tables]] name = "hy_instrument" query = "SELECT * FROM \"hy_instrument\"" @@ -51,6 +54,8 @@ query = "SELECT * FROM \"hy_information\"" incremental = false key_field = "UpdateTime" + + [[tables]] name = "hy_allot" query = "SELECT * FROM \"hy_allot\"" @@ -65,7 +70,7 @@ key_field = "UpdateTime" [[tables]] name = "hy_fullwatersample" -query = "SELECT * FROM \"hy_fullwatersample\"" +query = "select id,qs_code,qs_tonnage::text as qs_tonnage,mt::text as mt,remark,onecode,towcode,fx_code,fx_onecode,fx_twocode from hy_fullwatersample" incremental = false key_field = "UpdateTime" @@ -75,6 +80,10 @@ query = "SELECT * FROM \"hy_informationnorm\"" incremental = false key_field = "UpdateTime" + + + + [[tables]] name = "hy_laboratoryinstrument" query = "SELECT * FROM \"hy_laboratoryinstrument\"" @@ -93,26 +102,17 @@ query = "SELECT * FROM \"hy_materialdetail\"" incremental = false key_field = "UpdateTime" + + [[tables]] name = "hy_weight_input" query = "SELECT * FROM \"hy_weight_input\"" incremental = false key_field = "UpdateTime" -[[tables]] -name = "hy_warmhumid" -query = "SELECT * FROM \"hy_warmhumid\"" -incremental = false -key_field = "UpdateTime" [[tables]] -name = "hy_task" -query = "SELECT * FROM \"hy_task\"" +name = "hy_warmhumid" +query = "select id,laboratoryid,temperature::text as temperature,humidity::text humidity,begintime,endtime, username from hy_warmhumid" incremental = false key_field = "UpdateTime" - -[[tables]] -name = "hy_spotcheck" -query = "SELECT * FROM \"hy_spotcheck\"" -incremental = false -key_field = "UpdateTime" \ No newline at end of file diff --git a/tcp_client/src/db.rs b/tcp_client/src/db.rs index 68deeaf..4aa1822 100644 --- a/tcp_client/src/db.rs +++ b/tcp_client/src/db.rs @@ -1,15 +1,8 @@ -use diesel::prelude::*; -use diesel_async::{AsyncPgConnection, RunQueryDsl, AsyncConnection}; -use diesel::sql_types::*; - -#[derive(QueryableByName)] -struct TestResult { - #[diesel(sql_type = Integer)] - pub result: i32, -} -use serde::{Deserialize, Serialize}; +use tokio_postgres::{NoTls, Error, Row}; +use serde::Deserialize; use std::collections::HashMap; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, Utc, NaiveDateTime}; +use bigdecimal::BigDecimal; #[derive(Debug, Deserialize, Clone)] pub struct DatabaseConfig { @@ -20,19 +13,6 @@ pub struct DatabaseConfig { pub password: String, } -impl DatabaseConfig { - pub fn connection_string(&self) -> String { - format!( - "postgres://{}:{}@{}:{}/{}", - encode(&self.user), - encode(&self.password), - self.host, - self.port, - self.name - ) - } -} - #[derive(Debug, Deserialize, Clone)] pub struct TableConfig { pub name: String, @@ -41,76 +21,149 @@ pub struct TableConfig { pub key_field: String, } -use urlencoding::encode; +impl DatabaseConfig { + pub fn connection_string(&self) -> String { + format!( + "host={} port={} dbname={} user={} password={}", + self.host, self.port, self.name, self.user, self.password + ) + } +} pub struct Database { - conn: AsyncPgConnection, + client: tokio_postgres::Client, last_sync_times: HashMap>, } -#[derive(QueryableByName, Debug, Serialize)] -pub struct DynamicRow { - #[diesel(sql_type = Text)] - pub table_name: String, - #[diesel(sql_type = Text)] - pub data: String, -} - impl Database { - pub async fn connect(config: &DatabaseConfig) -> Result { - let conn = AsyncPgConnection::establish(&config.connection_string()).await.map_err(|e| diesel::result::Error::DatabaseError( - diesel::result::DatabaseErrorKind::Unknown, - Box::new(format!("Failed to establish connection: {}", e)) - ))?; + pub async fn connect(config: &DatabaseConfig) -> Result { + let connection_string = config.connection_string(); + let (client, connection) = tokio_postgres::connect(&connection_string, NoTls).await?; + + tokio::spawn(async move { + if let Err(e) = connection.await { + eprintln!("数据库连接错误: {}", e); + } + }); Ok(Self { - conn, + client, last_sync_times: HashMap::new(), }) } - pub async fn execute_table_query(&mut self, table: &TableConfig, last_sync: Option>) -> Result, diesel::result::Error> { - let query = if table.incremental { - let last_sync = last_sync.unwrap_or_else(|| DateTime::::default()); - format!("SELECT '{}' as table_name, row_to_json(t)::text as data FROM ({}) t WHERE {} > '{}'", table.name, table.query, table.key_field, last_sync.to_rfc3339()) + pub async fn execute_table_query(&self, table: &TableConfig, last_sync: Option>) -> Result, Error> { + let query = &table.query; + let rows = if table.incremental { + let last_sync = last_sync.unwrap_or_else(|| DateTime::::from_naive_utc_and_offset(NaiveDateTime::default(), Utc)); + self.client.query(query, &[&last_sync]).await? } else { - format!("SELECT '{}' as table_name, row_to_json(t)::text as data FROM ({}) t", table.name, table.query) + self.client.query(query, &[]).await? }; - - let rows = diesel::sql_query(query) - .load::(&mut self.conn) - .await?; Ok(rows) } - pub async fn get_table_data(&mut self, table: &TableConfig) -> Result, diesel::result::Error> { - let last_sync = if table.incremental { - self.last_sync_times.get(&table.name).cloned() - } else { - None - }; - + pub async fn get_table_data(&mut self, table: &TableConfig) -> Result, Error> { + let last_sync = self.last_sync_times.get(&table.name).cloned(); let rows = self.execute_table_query(table, last_sync).await?; - if !rows.is_empty() { - // 解析 JSON 字符串 - for row in &rows { - if let Ok(json_value) = serde_json::from_str::(&row.data) { - if let Some(time_str) = json_value.get(&table.key_field).and_then(|v| v.as_str()) { - if let Ok(time) = DateTime::parse_from_rfc3339(time_str) { - self.last_sync_times.insert(table.name.clone(), time.with_timezone(&Utc)); - } + + if table.incremental && !rows.is_empty() { + // 找到key_field对应的列索引 + if let Some(first_row) = rows.first() { + if let Some(col_idx) = first_row.columns().iter().position(|col| col.name() == table.key_field) { + if let Some(max_time) = rows.iter() + .filter_map(|row| row.try_get::<_, DateTime>(col_idx).ok()) + .max() { + self.last_sync_times.insert(table.name.clone(), max_time); } } } } + Ok(rows) } - pub async fn test_connection(&mut self) -> Result { - let result: Vec = diesel::sql_query("SELECT 1::integer as result") - .load(&mut self.conn) - .await?; - Ok(!result.is_empty() && result[0].result == 1) + pub async fn test_connection(&self) -> Result { + let rows = self.client.query("SELECT 1", &[]).await?; + Ok(!rows.is_empty()) + } +} + +pub fn format_row_as_json(row: &Row, table_name: &str) -> serde_json::Value { + let mut data_map = serde_json::Map::new(); + for (i, column) in row.columns().iter().enumerate() { + let name = column.name(); + let type_name = column.type_().name(); + let value = match type_name { + "int2" => match row.try_get::<_, Option>(i) { + Ok(Some(val)) => serde_json::Value::Number(serde_json::Number::from(val)), + _ => serde_json::Value::Null + }, + "int4" => match row.try_get::<_, Option>(i) { + Ok(Some(val)) => serde_json::Value::Number(serde_json::Number::from(val)), + _ => serde_json::Value::Null + }, + "int8" => match row.try_get::<_, Option>(i) { + Ok(Some(val)) => serde_json::Value::Number(serde_json::Number::from(val)), + _ => serde_json::Value::Null + }, + // numeric 类型用 f64 读取,增加调试输出 + "numeric" => { + match row.try_get::<_, Option>(i) { + Ok(Some(val)) => match serde_json::Number::from_f64(val) { + Some(n) => serde_json::Value::Number(n), + None => serde_json::Value::String(val.to_string()), + }, + Ok(None) => { + println!("字段 {} 为 NULL", name); + serde_json::Value::Null + }, + Err(e) => { + println!("读取字段 {} 出错: {:?}", name, e); + serde_json::Value::Null + } + } + }, + "float4" => match row.try_get::<_, Option>(i) { + Ok(Some(val)) => match serde_json::Number::from_f64(val as f64) { + Some(n) => serde_json::Value::Number(n), + None => serde_json::Value::String(val.to_string()) + }, + _ => serde_json::Value::Null + }, + "float8" => match row.try_get::<_, Option>(i) { + Ok(Some(val)) => match serde_json::Number::from_f64(val) { + Some(n) => serde_json::Value::Number(n), + None => serde_json::Value::String(val.to_string()) + }, + _ => serde_json::Value::Null + }, + "text" | "varchar" => match row.try_get::<_, Option>(i) { + Ok(Some(val)) => serde_json::Value::String(val), + _ => serde_json::Value::Null + }, + "bool" => match row.try_get::<_, Option>(i) { + Ok(Some(val)) => serde_json::Value::String(if val { "1".to_string() } else { "0".to_string() }), + _ => serde_json::Value::Null + }, + "timestamp" | "timestamptz" | "date" => { + match row.try_get::<_, Option>(i) { + Ok(Some(dt)) => { + let formatted = dt.format("%Y-%m-%dT%H:%M:%S").to_string(); + serde_json::Value::String(formatted) + }, + _ => serde_json::Value::Null + } + }, + _ => serde_json::Value::Null + }; + data_map.insert(name.to_string(), value); } + let mut map = serde_json::Map::new(); + map.insert("table_name".to_string(), serde_json::Value::String(table_name.to_string())); + // 将data_map转为字符串 + let data_string = serde_json::to_string(&data_map).unwrap_or("{}".to_string()); + map.insert("data".to_string(), serde_json::Value::String(data_string)); + serde_json::Value::Object(map) } diff --git a/tcp_client/src/main.rs b/tcp_client/src/main.rs index 24dab6d..dd852b8 100644 --- a/tcp_client/src/main.rs +++ b/tcp_client/src/main.rs @@ -2,8 +2,8 @@ mod db; use std::net::TcpStream; use std::io::{Read, Write}; -use std::thread; use std::time::Duration; +use std::thread; use serde::Deserialize; use config::Config; use db::{Database, DatabaseConfig, TableConfig}; @@ -20,11 +20,7 @@ struct ClientConfig { retry_delay_secs: u64, read_timeout_secs: u64, write_timeout_secs: u64, -} - -#[derive(Debug, Deserialize)] -struct SchedulerConfig { - interval_hours: u64, + interval_seconds: u64, // 定时任务间隔时间 } #[derive(Debug, Deserialize)] @@ -32,7 +28,6 @@ struct Settings { server: ServerConfig, client: ClientConfig, database: DatabaseConfig, - scheduler: SchedulerConfig, tables: Vec, } @@ -44,84 +39,6 @@ fn load_config() -> Result { settings.try_deserialize() } -async fn sync_data(db: &mut Database, config: &Settings) { - for table in &config.tables { - println!("\n开始获取表 {} 的数据...", table.name); - match db.get_table_data(table).await { - Ok(rows) => { - println!("从数据库获取到 {} 条记录", rows.len()); - let mut success_count = 0; - let mut fail_count = 0; - - // 将每行数据发送到服务器 - for (index, row) in rows.iter().enumerate() { - let msg = match serde_json::to_string(&row) { - Ok(msg) => msg, - Err(e) => { - println!("序列化数据失败 (第{}条记录): {}", index + 1, e); - fail_count += 1; - continue; - } - }; - - // 尝试发送数据到服务器 - let mut retry_count = 0; - let server_addr = format!("{}:{}", config.server.host, config.server.port); - - while retry_count < config.client.max_retries { - match TcpStream::connect(&server_addr) { - Ok(mut stream) => { - // 设置读写超时 - stream.set_read_timeout(Some(Duration::from_secs(config.client.read_timeout_secs))).unwrap_or_default(); - stream.set_write_timeout(Some(Duration::from_secs(config.client.write_timeout_secs))).unwrap_or_default(); - - // 发送数据 - if let Err(e) = stream.write_all(msg.as_bytes()) { - println!("发送数据失败 (第{}条记录): {}", index + 1, e); - retry_count += 1; - thread::sleep(Duration::from_secs(config.client.retry_delay_secs)); - continue; - } - - // 等待服务器响应 - let mut response = [0; 1024]; - match stream.read(&mut response) { - Ok(_) => { - success_count += 1; - break; - } - Err(e) => { - println!("读取服务器响应失败 (第{}条记录): {}", index + 1, e); - retry_count += 1; - thread::sleep(Duration::from_secs(config.client.retry_delay_secs)); - continue; - } - } - } - Err(e) => { - println!("连接服务器失败 (第{}条记录): {}", index + 1, e); - retry_count += 1; - thread::sleep(Duration::from_secs(config.client.retry_delay_secs)); - continue; - } - } - } - - if retry_count >= config.client.max_retries { - println!("发送数据失败,已达到最大重试次数 (第{}条记录)", index + 1); - fail_count += 1; - } - } - - println!("表 {} 同步完成", table.name); - println!("成功: {} 条记录", success_count); - println!("失败: {} 条记录", fail_count); - } - Err(e) => println!("获取表 {} 数据失败: {}", table.name, e), - } - } -} - #[tokio::main] async fn main() { // 加载配置 @@ -133,16 +50,8 @@ async fn main() { } }; - println!("配置信息:"); - println!("数据库主机: {}", config.database.host); - println!("数据库端口: {}", config.database.port); - println!("数据库名称: {}", config.database.name); - println!("数据库用户: {}", config.database.user); - println!("要同步的表数量: {}", config.tables.len()); - println!("定时任务间隔: {}小时", config.scheduler.interval_hours); - loop { - // 建立数据库连接 + // 连接数据库 let mut db = match Database::connect(&config.database).await { Ok(db) => { println!("数据库连接成功!"); @@ -150,23 +59,108 @@ async fn main() { } Err(e) => { println!("数据库连接失败: {}", e); - println!("请检查数据库配置和网络连接:"); - println!("1. 确认数据库服务器是否运行"); - println!("2. 确认数据库服务器的IP地址是否正确"); - println!("3. 确认数据库服务器的端口是否正确"); - println!("4. 确认数据库用户和密码是否正确"); - println!("5. 确认网络连接是否正常"); - // 等待一段时间后重试 - tokio::time::sleep(Duration::from_secs(60)).await; + thread::sleep(Duration::from_secs(config.client.interval_seconds)); continue; } }; - // 执行同步 - sync_data(&mut db, &config).await; + // 测试数据库连接 + match db.test_connection().await { + Ok(true) => println!("数据库连接测试成功"), + Ok(false) => println!("数据库连接测试失败"), + Err(e) => println!("数据库测试错误: {}", e), + } + + // 获取配置的表数据 + for table in &config.tables { + println!("\n开始获取表 {} 的数据...", table.name); + match db.get_table_data(table).await { + Ok(rows) => { + println!("成功获取 {} 条记录", rows.len()); + // 将每行数据转换为JSON并发送到服务器 + for row in rows { + let json_data = db::format_row_as_json(&row, &table.name); + let msg = serde_json::to_string(&json_data).unwrap(); + + // 创建TCP连接 + let server_addr = format!("{}:{}", config.server.host, config.server.port); + let mut retry_count = 0; + + while retry_count < config.client.max_retries { + println!("\n尝试连接服务器 {} (第{}次)...", server_addr, retry_count + 1); + + match TcpStream::connect(&server_addr) { + Ok(mut stream) => { + println!("成功连接到服务器!"); + + if let Err(e) = stream.set_read_timeout(Some(Duration::from_secs(config.client.read_timeout_secs))) { + println!("设置读取超时失败: {}", e); + continue; + } + + if let Err(e) = stream.set_write_timeout(Some(Duration::from_secs(config.client.write_timeout_secs))) { + println!("设置写入超时失败: {}", e); + continue; + } + + // 发送数据 + match stream.write_all(msg.as_bytes()) { + Ok(_) => println!("成功发送消息: {}", msg), + Err(e) => { + println!("发送消息失败: {}", e); + retry_count += 1; + continue; + } + } + + // 读取响应 + let mut buffer = [0; 1]; + match stream.read_exact(&mut buffer) { + Ok(_) => { + let bit = buffer[0]; + // println!("接收到比特值: {}", bit); + if(bit==255){ + println!("接收成功"); + break; + } + else if(bit==0){ + println!("接收失败"); + retry_count += 1; + continue; + } + else{ + println!("接收到非预期的值: {} (十六进制: {:02X})", bit, bit); + retry_count += 1; + } + } + Err(e) => { + println!("接收数据失败: {}", e); + retry_count += 1; + } + } + } + Err(e) => { + println!("连接服务器失败: {}", e); + retry_count += 1; + if retry_count < config.client.max_retries { + println!("等待{}秒后重试...", config.client.retry_delay_secs); + thread::sleep(Duration::from_secs(config.client.retry_delay_secs)); + } + } + } + } + + if retry_count >= config.client.max_retries { + println!("达到最大重试次数,跳过当前数据"); + } + } + } + Err(e) => println!("获取表 {} 数据失败: {}", table.name, e), + } + } // 等待下一次执行 - println!("\n等待 {} 小时后执行下一次同步...", config.scheduler.interval_hours); - tokio::time::sleep(Duration::from_secs(config.scheduler.interval_hours * 3600)).await; + println!("等待 {} 秒后执行下一次同步...", config.client.interval_seconds); + thread::sleep(Duration::from_secs(config.client.interval_seconds)); } }