@@ -87,6 +87,19 @@ version = "0.22.1" | |||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||
checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" | |||
[[package]] | |||
name = "bigdecimal" | |||
version = "0.4.8" | |||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||
checksum = "1a22f228ab7a1b23027ccc6c350b72868017af7ea8356fbdf19f8d991c690013" | |||
dependencies = [ | |||
"autocfg", | |||
"libm", | |||
"num-bigint", | |||
"num-integer", | |||
"num-traits", | |||
] | |||
[[package]] | |||
name = "bitflags" | |||
version = "1.3.2" | |||
@@ -200,56 +213,6 @@ dependencies = [ | |||
"typenum", | |||
] | |||
[[package]] | |||
name = "diesel" | |||
version = "2.1.6" | |||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||
checksum = "ff236accb9a5069572099f0b350a92e9560e8e63a9b8d546162f4a5e03026bb2" | |||
dependencies = [ | |||
"bitflags 2.9.0", | |||
"byteorder", | |||
"chrono", | |||
"diesel_derives", | |||
"itoa", | |||
"pq-sys", | |||
"serde_json", | |||
] | |||
[[package]] | |||
name = "diesel-async" | |||
version = "0.4.1" | |||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||
checksum = "acada1517534c92d3f382217b485db8a8638f111b0e3f2a2a8e26165050f77be" | |||
dependencies = [ | |||
"async-trait", | |||
"diesel", | |||
"futures-util", | |||
"scoped-futures", | |||
"tokio", | |||
"tokio-postgres", | |||
] | |||
[[package]] | |||
name = "diesel_derives" | |||
version = "2.1.4" | |||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||
checksum = "14701062d6bed917b5c7103bdffaee1e4609279e240488ad24e7bd979ca6866c" | |||
dependencies = [ | |||
"diesel_table_macro_syntax", | |||
"proc-macro2", | |||
"quote", | |||
"syn", | |||
] | |||
[[package]] | |||
name = "diesel_table_macro_syntax" | |||
version = "0.1.0" | |||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||
checksum = "fc5557efc453706fed5e4fa85006fe9817c224c3f480a34c7e5959fd700921c5" | |||
dependencies = [ | |||
"syn", | |||
] | |||
[[package]] | |||
name = "digest" | |||
version = "0.10.7" | |||
@@ -447,6 +410,12 @@ version = "0.2.172" | |||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||
checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" | |||
[[package]] | |||
name = "libm" | |||
version = "0.2.13" | |||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||
checksum = "c9627da5196e5d8ed0b0495e61e518847578da83483c37288316d9b2e03a7f72" | |||
[[package]] | |||
name = "linked-hash-map" | |||
version = "0.5.6" | |||
@@ -521,6 +490,25 @@ dependencies = [ | |||
"minimal-lexical", | |||
] | |||
[[package]] | |||
name = "num-bigint" | |||
version = "0.4.6" | |||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||
checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" | |||
dependencies = [ | |||
"num-integer", | |||
"num-traits", | |||
] | |||
[[package]] | |||
name = "num-integer" | |||
version = "0.1.46" | |||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||
checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" | |||
dependencies = [ | |||
"num-traits", | |||
] | |||
[[package]] | |||
name = "num-traits" | |||
version = "0.2.19" | |||
@@ -690,6 +678,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" | |||
checksum = "613283563cd90e1dfc3518d548caee47e0e725455ed619881f5cf21f36de4b48" | |||
dependencies = [ | |||
"bytes", | |||
"chrono", | |||
"fallible-iterator", | |||
"postgres-protocol", | |||
] | |||
@@ -703,15 +692,6 @@ dependencies = [ | |||
"zerocopy", | |||
] | |||
[[package]] | |||
name = "pq-sys" | |||
version = "0.4.8" | |||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||
checksum = "31c0052426df997c0cbd30789eb44ca097e3541717a7b8fa36b1c464ee7edebd" | |||
dependencies = [ | |||
"vcpkg", | |||
] | |||
[[package]] | |||
name = "proc-macro2" | |||
version = "1.0.95" | |||
@@ -813,15 +793,6 @@ version = "1.0.20" | |||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||
checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" | |||
[[package]] | |||
name = "scoped-futures" | |||
version = "0.1.4" | |||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||
checksum = "1b24aae2d0636530f359e9d5ef0c04669d11c5e756699b27a6a6d845d8329091" | |||
dependencies = [ | |||
"pin-project-lite", | |||
] | |||
[[package]] | |||
name = "scopeguard" | |||
version = "1.2.0" | |||
@@ -949,14 +920,14 @@ dependencies = [ | |||
name = "tcp_client" | |||
version = "0.1.0" | |||
dependencies = [ | |||
"bigdecimal", | |||
"chrono", | |||
"config", | |||
"diesel", | |||
"diesel-async", | |||
"postgres-types", | |||
"serde", | |||
"serde_json", | |||
"tokio", | |||
"urlencoding", | |||
"tokio-postgres", | |||
] | |||
[[package]] | |||
@@ -1110,18 +1081,6 @@ version = "0.1.3" | |||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||
checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0" | |||
[[package]] | |||
name = "urlencoding" | |||
version = "2.1.3" | |||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||
checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" | |||
[[package]] | |||
name = "vcpkg" | |||
version = "0.2.15" | |||
source = "registry+https://github.com/rust-lang/crates.io-index" | |||
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" | |||
[[package]] | |||
name = "version_check" | |||
version = "0.9.5" | |||
@@ -5,10 +5,10 @@ edition = "2021" | |||
[dependencies] | |||
tokio = { version = "1.28", features = ["full"] } | |||
diesel = { version = "2.1.0", features = ["postgres", "chrono", "serde_json"] } | |||
diesel-async = { version = "0.4.1", features = ["postgres"] } | |||
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] } | |||
config = "0.13" | |||
serde = { version = "1.0", features = ["derive"] } | |||
serde_json = "1.0" | |||
chrono = { version = "0.4", features = ["serde"] } | |||
urlencoding = "2.1" | |||
postgres-types = "0.2" | |||
bigdecimal = "0.4" |
@@ -1,44 +1,47 @@ | |||
[server] | |||
host = "10.180.4.88" | |||
#host = "127.0.0.1" | |||
#host = "10.180.4.88" | |||
host = "127.0.0.1" | |||
port = 9090 | |||
[client] | |||
# 定时任务配置 | |||
interval_seconds = 3600 # 每隔多少秒执行一次 | |||
max_retries = 3 | |||
retry_delay_secs =2 | |||
read_timeout_secs = 5 | |||
write_timeout_secs = 5 | |||
read_timeout_secs = 10 | |||
write_timeout_secs = 10 | |||
retry_delay_secs = 2 | |||
[database] | |||
#host = "192.168.0.100" | |||
host = "10.180.4.100" | |||
#host = "192.168.0.100" | |||
port = 5432 | |||
name = "Auseft_RL_Web" | |||
user = "postgres" | |||
password = "Auseft@2025qwer" | |||
[scheduler] | |||
interval_hours = 1 # 定时任务执行间隔(小时) | |||
# 要同步的表配置 | |||
[[tables]] | |||
name = "hy_record" | |||
query = "SELECT * FROM \"hy_record\" " | |||
incremental = false # 是否增量同步 | |||
key_field = "UpdateTime" # 增量同步的时间字段 | |||
query = "SELECT id, hy_code, type, hy_check, hy_approve, check_time, approve_time, approve_user, check_user, hy_time, hy_values, accept_time, accept_user, mt::text as mt, mad::text as mad, aad::text as aad, ad::text as ad, vad::text as vad, vd::text as vd, var::text as var, vdaf::text as vdaf, fcad::text as fcad, st_ar::text as st_ar, st_ad::text as st_ad, st_d::text as st_d, had::text as had, hd::text as hd, qb_ad::text as qb_ad, qgr_ad::text as qgr_ad, qgr_d::text as qgr_d, qnet_ar_mj_kg::text as qnet_ar_mj_kg, qnet_ar_j_cal::text as qnet_ar_j_cal, v::text as v, aar::text as aar, qnet_ar::text as qnet_ar, qnet_ar1::text as qnet_ar1, crc::text as crc, st_daf::text as st_daf, cad ::text as cad, cd::text as cd, isauto, hy_type, isnormal FROM hy_record where where hy_check=0 and accept_time >= NOW() - INTERVAL '10 days' " | |||
incremental = false | |||
key_field = "UpdateTime" | |||
[[tables]] | |||
name = "hy_itemdetail" | |||
query = "SELECT * FROM \"hy_itemdetail\"" | |||
query = "SELECT * FROM hy_itemdetail where record_id in (select id from hy_record where hy_check=0 and accept_time >= NOW() - INTERVAL '10 days') " | |||
incremental = false | |||
key_field = "UpdateTime" | |||
[[tables]] | |||
name = "hy_norm" | |||
query = "SELECT * FROM \"hy_norm\"" | |||
query = " select id,norm_id,zbvalues::text as zbvalues,itemdetail_id,hy_user,checktime,\"explain\" from hy_norm where itemdetail_id in (SELECT t1.id FROM hy_itemdetail t1 inner join hy_record t2 on t1.record_id=t2.id where t2.hy_check=0 and t2.accept_time >= NOW() - INTERVAL '10 days')" | |||
incremental = false | |||
key_field = "UpdateTime" | |||
[[tables]] | |||
name = "hy_instrument" | |||
query = "SELECT * FROM \"hy_instrument\"" | |||
@@ -51,6 +54,8 @@ query = "SELECT * FROM \"hy_information\"" | |||
incremental = false | |||
key_field = "UpdateTime" | |||
[[tables]] | |||
name = "hy_allot" | |||
query = "SELECT * FROM \"hy_allot\"" | |||
@@ -65,7 +70,7 @@ key_field = "UpdateTime" | |||
[[tables]] | |||
name = "hy_fullwatersample" | |||
query = "SELECT * FROM \"hy_fullwatersample\"" | |||
query = "select id,qs_code,qs_tonnage::text as qs_tonnage,mt::text as mt,remark,onecode,towcode,fx_code,fx_onecode,fx_twocode from hy_fullwatersample" | |||
incremental = false | |||
key_field = "UpdateTime" | |||
@@ -75,6 +80,10 @@ query = "SELECT * FROM \"hy_informationnorm\"" | |||
incremental = false | |||
key_field = "UpdateTime" | |||
[[tables]] | |||
name = "hy_laboratoryinstrument" | |||
query = "SELECT * FROM \"hy_laboratoryinstrument\"" | |||
@@ -93,26 +102,17 @@ query = "SELECT * FROM \"hy_materialdetail\"" | |||
incremental = false | |||
key_field = "UpdateTime" | |||
[[tables]] | |||
name = "hy_weight_input" | |||
query = "SELECT * FROM \"hy_weight_input\"" | |||
incremental = false | |||
key_field = "UpdateTime" | |||
[[tables]] | |||
name = "hy_warmhumid" | |||
query = "SELECT * FROM \"hy_warmhumid\"" | |||
incremental = false | |||
key_field = "UpdateTime" | |||
[[tables]] | |||
name = "hy_task" | |||
query = "SELECT * FROM \"hy_task\"" | |||
name = "hy_warmhumid" | |||
query = "select id,laboratoryid,temperature::text as temperature,humidity::text humidity,begintime,endtime, username from hy_warmhumid" | |||
incremental = false | |||
key_field = "UpdateTime" | |||
[[tables]] | |||
name = "hy_spotcheck" | |||
query = "SELECT * FROM \"hy_spotcheck\"" | |||
incremental = false | |||
key_field = "UpdateTime" |
@@ -1,15 +1,8 @@ | |||
use diesel::prelude::*; | |||
use diesel_async::{AsyncPgConnection, RunQueryDsl, AsyncConnection}; | |||
use diesel::sql_types::*; | |||
#[derive(QueryableByName)] | |||
struct TestResult { | |||
#[diesel(sql_type = Integer)] | |||
pub result: i32, | |||
} | |||
use serde::{Deserialize, Serialize}; | |||
use tokio_postgres::{NoTls, Error, Row}; | |||
use serde::Deserialize; | |||
use std::collections::HashMap; | |||
use chrono::{DateTime, Utc}; | |||
use chrono::{DateTime, Utc, NaiveDateTime}; | |||
use bigdecimal::BigDecimal; | |||
#[derive(Debug, Deserialize, Clone)] | |||
pub struct DatabaseConfig { | |||
@@ -20,19 +13,6 @@ pub struct DatabaseConfig { | |||
pub password: String, | |||
} | |||
impl DatabaseConfig { | |||
pub fn connection_string(&self) -> String { | |||
format!( | |||
"postgres://{}:{}@{}:{}/{}", | |||
encode(&self.user), | |||
encode(&self.password), | |||
self.host, | |||
self.port, | |||
self.name | |||
) | |||
} | |||
} | |||
#[derive(Debug, Deserialize, Clone)] | |||
pub struct TableConfig { | |||
pub name: String, | |||
@@ -41,76 +21,149 @@ pub struct TableConfig { | |||
pub key_field: String, | |||
} | |||
use urlencoding::encode; | |||
impl DatabaseConfig { | |||
pub fn connection_string(&self) -> String { | |||
format!( | |||
"host={} port={} dbname={} user={} password={}", | |||
self.host, self.port, self.name, self.user, self.password | |||
) | |||
} | |||
} | |||
pub struct Database { | |||
conn: AsyncPgConnection, | |||
client: tokio_postgres::Client, | |||
last_sync_times: HashMap<String, DateTime<Utc>>, | |||
} | |||
#[derive(QueryableByName, Debug, Serialize)] | |||
pub struct DynamicRow { | |||
#[diesel(sql_type = Text)] | |||
pub table_name: String, | |||
#[diesel(sql_type = Text)] | |||
pub data: String, | |||
} | |||
impl Database { | |||
pub async fn connect(config: &DatabaseConfig) -> Result<Self, diesel::result::Error> { | |||
let conn = AsyncPgConnection::establish(&config.connection_string()).await.map_err(|e| diesel::result::Error::DatabaseError( | |||
diesel::result::DatabaseErrorKind::Unknown, | |||
Box::new(format!("Failed to establish connection: {}", e)) | |||
))?; | |||
pub async fn connect(config: &DatabaseConfig) -> Result<Self, Error> { | |||
let connection_string = config.connection_string(); | |||
let (client, connection) = tokio_postgres::connect(&connection_string, NoTls).await?; | |||
tokio::spawn(async move { | |||
if let Err(e) = connection.await { | |||
eprintln!("数据库连接错误: {}", e); | |||
} | |||
}); | |||
Ok(Self { | |||
conn, | |||
client, | |||
last_sync_times: HashMap::new(), | |||
}) | |||
} | |||
pub async fn execute_table_query(&mut self, table: &TableConfig, last_sync: Option<DateTime<Utc>>) -> Result<Vec<DynamicRow>, diesel::result::Error> { | |||
let query = if table.incremental { | |||
let last_sync = last_sync.unwrap_or_else(|| DateTime::<Utc>::default()); | |||
format!("SELECT '{}' as table_name, row_to_json(t)::text as data FROM ({}) t WHERE {} > '{}'", table.name, table.query, table.key_field, last_sync.to_rfc3339()) | |||
pub async fn execute_table_query(&self, table: &TableConfig, last_sync: Option<DateTime<Utc>>) -> Result<Vec<Row>, Error> { | |||
let query = &table.query; | |||
let rows = if table.incremental { | |||
let last_sync = last_sync.unwrap_or_else(|| DateTime::<Utc>::from_naive_utc_and_offset(NaiveDateTime::default(), Utc)); | |||
self.client.query(query, &[&last_sync]).await? | |||
} else { | |||
format!("SELECT '{}' as table_name, row_to_json(t)::text as data FROM ({}) t", table.name, table.query) | |||
self.client.query(query, &[]).await? | |||
}; | |||
let rows = diesel::sql_query(query) | |||
.load::<DynamicRow>(&mut self.conn) | |||
.await?; | |||
Ok(rows) | |||
} | |||
pub async fn get_table_data(&mut self, table: &TableConfig) -> Result<Vec<DynamicRow>, diesel::result::Error> { | |||
let last_sync = if table.incremental { | |||
self.last_sync_times.get(&table.name).cloned() | |||
} else { | |||
None | |||
}; | |||
pub async fn get_table_data(&mut self, table: &TableConfig) -> Result<Vec<Row>, Error> { | |||
let last_sync = self.last_sync_times.get(&table.name).cloned(); | |||
let rows = self.execute_table_query(table, last_sync).await?; | |||
if !rows.is_empty() { | |||
// 解析 JSON 字符串 | |||
for row in &rows { | |||
if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(&row.data) { | |||
if let Some(time_str) = json_value.get(&table.key_field).and_then(|v| v.as_str()) { | |||
if let Ok(time) = DateTime::parse_from_rfc3339(time_str) { | |||
self.last_sync_times.insert(table.name.clone(), time.with_timezone(&Utc)); | |||
} | |||
if table.incremental && !rows.is_empty() { | |||
// 找到key_field对应的列索引 | |||
if let Some(first_row) = rows.first() { | |||
if let Some(col_idx) = first_row.columns().iter().position(|col| col.name() == table.key_field) { | |||
if let Some(max_time) = rows.iter() | |||
.filter_map(|row| row.try_get::<_, DateTime<Utc>>(col_idx).ok()) | |||
.max() { | |||
self.last_sync_times.insert(table.name.clone(), max_time); | |||
} | |||
} | |||
} | |||
} | |||
Ok(rows) | |||
} | |||
pub async fn test_connection(&mut self) -> Result<bool, diesel::result::Error> { | |||
let result: Vec<TestResult> = diesel::sql_query("SELECT 1::integer as result") | |||
.load(&mut self.conn) | |||
.await?; | |||
Ok(!result.is_empty() && result[0].result == 1) | |||
pub async fn test_connection(&self) -> Result<bool, Error> { | |||
let rows = self.client.query("SELECT 1", &[]).await?; | |||
Ok(!rows.is_empty()) | |||
} | |||
} | |||
pub fn format_row_as_json(row: &Row, table_name: &str) -> serde_json::Value { | |||
let mut data_map = serde_json::Map::new(); | |||
for (i, column) in row.columns().iter().enumerate() { | |||
let name = column.name(); | |||
let type_name = column.type_().name(); | |||
let value = match type_name { | |||
"int2" => match row.try_get::<_, Option<i16>>(i) { | |||
Ok(Some(val)) => serde_json::Value::Number(serde_json::Number::from(val)), | |||
_ => serde_json::Value::Null | |||
}, | |||
"int4" => match row.try_get::<_, Option<i32>>(i) { | |||
Ok(Some(val)) => serde_json::Value::Number(serde_json::Number::from(val)), | |||
_ => serde_json::Value::Null | |||
}, | |||
"int8" => match row.try_get::<_, Option<i64>>(i) { | |||
Ok(Some(val)) => serde_json::Value::Number(serde_json::Number::from(val)), | |||
_ => serde_json::Value::Null | |||
}, | |||
// numeric 类型用 f64 读取,增加调试输出 | |||
"numeric" => { | |||
match row.try_get::<_, Option<f64>>(i) { | |||
Ok(Some(val)) => match serde_json::Number::from_f64(val) { | |||
Some(n) => serde_json::Value::Number(n), | |||
None => serde_json::Value::String(val.to_string()), | |||
}, | |||
Ok(None) => { | |||
println!("字段 {} 为 NULL", name); | |||
serde_json::Value::Null | |||
}, | |||
Err(e) => { | |||
println!("读取字段 {} 出错: {:?}", name, e); | |||
serde_json::Value::Null | |||
} | |||
} | |||
}, | |||
"float4" => match row.try_get::<_, Option<f32>>(i) { | |||
Ok(Some(val)) => match serde_json::Number::from_f64(val as f64) { | |||
Some(n) => serde_json::Value::Number(n), | |||
None => serde_json::Value::String(val.to_string()) | |||
}, | |||
_ => serde_json::Value::Null | |||
}, | |||
"float8" => match row.try_get::<_, Option<f64>>(i) { | |||
Ok(Some(val)) => match serde_json::Number::from_f64(val) { | |||
Some(n) => serde_json::Value::Number(n), | |||
None => serde_json::Value::String(val.to_string()) | |||
}, | |||
_ => serde_json::Value::Null | |||
}, | |||
"text" | "varchar" => match row.try_get::<_, Option<String>>(i) { | |||
Ok(Some(val)) => serde_json::Value::String(val), | |||
_ => serde_json::Value::Null | |||
}, | |||
"bool" => match row.try_get::<_, Option<bool>>(i) { | |||
Ok(Some(val)) => serde_json::Value::String(if val { "1".to_string() } else { "0".to_string() }), | |||
_ => serde_json::Value::Null | |||
}, | |||
"timestamp" | "timestamptz" | "date" => { | |||
match row.try_get::<_, Option<NaiveDateTime>>(i) { | |||
Ok(Some(dt)) => { | |||
let formatted = dt.format("%Y-%m-%dT%H:%M:%S").to_string(); | |||
serde_json::Value::String(formatted) | |||
}, | |||
_ => serde_json::Value::Null | |||
} | |||
}, | |||
_ => serde_json::Value::Null | |||
}; | |||
data_map.insert(name.to_string(), value); | |||
} | |||
let mut map = serde_json::Map::new(); | |||
map.insert("table_name".to_string(), serde_json::Value::String(table_name.to_string())); | |||
// 将data_map转为字符串 | |||
let data_string = serde_json::to_string(&data_map).unwrap_or("{}".to_string()); | |||
map.insert("data".to_string(), serde_json::Value::String(data_string)); | |||
serde_json::Value::Object(map) | |||
} |
@@ -2,8 +2,8 @@ mod db; | |||
use std::net::TcpStream; | |||
use std::io::{Read, Write}; | |||
use std::thread; | |||
use std::time::Duration; | |||
use std::thread; | |||
use serde::Deserialize; | |||
use config::Config; | |||
use db::{Database, DatabaseConfig, TableConfig}; | |||
@@ -20,11 +20,7 @@ struct ClientConfig { | |||
retry_delay_secs: u64, | |||
read_timeout_secs: u64, | |||
write_timeout_secs: u64, | |||
} | |||
#[derive(Debug, Deserialize)] | |||
struct SchedulerConfig { | |||
interval_hours: u64, | |||
interval_seconds: u64, // 定时任务间隔时间 | |||
} | |||
#[derive(Debug, Deserialize)] | |||
@@ -32,7 +28,6 @@ struct Settings { | |||
server: ServerConfig, | |||
client: ClientConfig, | |||
database: DatabaseConfig, | |||
scheduler: SchedulerConfig, | |||
tables: Vec<TableConfig>, | |||
} | |||
@@ -44,84 +39,6 @@ fn load_config() -> Result<Settings, config::ConfigError> { | |||
settings.try_deserialize() | |||
} | |||
async fn sync_data(db: &mut Database, config: &Settings) { | |||
for table in &config.tables { | |||
println!("\n开始获取表 {} 的数据...", table.name); | |||
match db.get_table_data(table).await { | |||
Ok(rows) => { | |||
println!("从数据库获取到 {} 条记录", rows.len()); | |||
let mut success_count = 0; | |||
let mut fail_count = 0; | |||
// 将每行数据发送到服务器 | |||
for (index, row) in rows.iter().enumerate() { | |||
let msg = match serde_json::to_string(&row) { | |||
Ok(msg) => msg, | |||
Err(e) => { | |||
println!("序列化数据失败 (第{}条记录): {}", index + 1, e); | |||
fail_count += 1; | |||
continue; | |||
} | |||
}; | |||
// 尝试发送数据到服务器 | |||
let mut retry_count = 0; | |||
let server_addr = format!("{}:{}", config.server.host, config.server.port); | |||
while retry_count < config.client.max_retries { | |||
match TcpStream::connect(&server_addr) { | |||
Ok(mut stream) => { | |||
// 设置读写超时 | |||
stream.set_read_timeout(Some(Duration::from_secs(config.client.read_timeout_secs))).unwrap_or_default(); | |||
stream.set_write_timeout(Some(Duration::from_secs(config.client.write_timeout_secs))).unwrap_or_default(); | |||
// 发送数据 | |||
if let Err(e) = stream.write_all(msg.as_bytes()) { | |||
println!("发送数据失败 (第{}条记录): {}", index + 1, e); | |||
retry_count += 1; | |||
thread::sleep(Duration::from_secs(config.client.retry_delay_secs)); | |||
continue; | |||
} | |||
// 等待服务器响应 | |||
let mut response = [0; 1024]; | |||
match stream.read(&mut response) { | |||
Ok(_) => { | |||
success_count += 1; | |||
break; | |||
} | |||
Err(e) => { | |||
println!("读取服务器响应失败 (第{}条记录): {}", index + 1, e); | |||
retry_count += 1; | |||
thread::sleep(Duration::from_secs(config.client.retry_delay_secs)); | |||
continue; | |||
} | |||
} | |||
} | |||
Err(e) => { | |||
println!("连接服务器失败 (第{}条记录): {}", index + 1, e); | |||
retry_count += 1; | |||
thread::sleep(Duration::from_secs(config.client.retry_delay_secs)); | |||
continue; | |||
} | |||
} | |||
} | |||
if retry_count >= config.client.max_retries { | |||
println!("发送数据失败,已达到最大重试次数 (第{}条记录)", index + 1); | |||
fail_count += 1; | |||
} | |||
} | |||
println!("表 {} 同步完成", table.name); | |||
println!("成功: {} 条记录", success_count); | |||
println!("失败: {} 条记录", fail_count); | |||
} | |||
Err(e) => println!("获取表 {} 数据失败: {}", table.name, e), | |||
} | |||
} | |||
} | |||
#[tokio::main] | |||
async fn main() { | |||
// 加载配置 | |||
@@ -133,16 +50,8 @@ async fn main() { | |||
} | |||
}; | |||
println!("配置信息:"); | |||
println!("数据库主机: {}", config.database.host); | |||
println!("数据库端口: {}", config.database.port); | |||
println!("数据库名称: {}", config.database.name); | |||
println!("数据库用户: {}", config.database.user); | |||
println!("要同步的表数量: {}", config.tables.len()); | |||
println!("定时任务间隔: {}小时", config.scheduler.interval_hours); | |||
loop { | |||
// 建立数据库连接 | |||
// 连接数据库 | |||
let mut db = match Database::connect(&config.database).await { | |||
Ok(db) => { | |||
println!("数据库连接成功!"); | |||
@@ -150,23 +59,108 @@ async fn main() { | |||
} | |||
Err(e) => { | |||
println!("数据库连接失败: {}", e); | |||
println!("请检查数据库配置和网络连接:"); | |||
println!("1. 确认数据库服务器是否运行"); | |||
println!("2. 确认数据库服务器的IP地址是否正确"); | |||
println!("3. 确认数据库服务器的端口是否正确"); | |||
println!("4. 确认数据库用户和密码是否正确"); | |||
println!("5. 确认网络连接是否正常"); | |||
// 等待一段时间后重试 | |||
tokio::time::sleep(Duration::from_secs(60)).await; | |||
thread::sleep(Duration::from_secs(config.client.interval_seconds)); | |||
continue; | |||
} | |||
}; | |||
// 执行同步 | |||
sync_data(&mut db, &config).await; | |||
// 测试数据库连接 | |||
match db.test_connection().await { | |||
Ok(true) => println!("数据库连接测试成功"), | |||
Ok(false) => println!("数据库连接测试失败"), | |||
Err(e) => println!("数据库测试错误: {}", e), | |||
} | |||
// 获取配置的表数据 | |||
for table in &config.tables { | |||
println!("\n开始获取表 {} 的数据...", table.name); | |||
match db.get_table_data(table).await { | |||
Ok(rows) => { | |||
println!("成功获取 {} 条记录", rows.len()); | |||
// 将每行数据转换为JSON并发送到服务器 | |||
for row in rows { | |||
let json_data = db::format_row_as_json(&row, &table.name); | |||
let msg = serde_json::to_string(&json_data).unwrap(); | |||
// 创建TCP连接 | |||
let server_addr = format!("{}:{}", config.server.host, config.server.port); | |||
let mut retry_count = 0; | |||
while retry_count < config.client.max_retries { | |||
println!("\n尝试连接服务器 {} (第{}次)...", server_addr, retry_count + 1); | |||
match TcpStream::connect(&server_addr) { | |||
Ok(mut stream) => { | |||
println!("成功连接到服务器!"); | |||
if let Err(e) = stream.set_read_timeout(Some(Duration::from_secs(config.client.read_timeout_secs))) { | |||
println!("设置读取超时失败: {}", e); | |||
continue; | |||
} | |||
if let Err(e) = stream.set_write_timeout(Some(Duration::from_secs(config.client.write_timeout_secs))) { | |||
println!("设置写入超时失败: {}", e); | |||
continue; | |||
} | |||
// 发送数据 | |||
match stream.write_all(msg.as_bytes()) { | |||
Ok(_) => println!("成功发送消息: {}", msg), | |||
Err(e) => { | |||
println!("发送消息失败: {}", e); | |||
retry_count += 1; | |||
continue; | |||
} | |||
} | |||
// 读取响应 | |||
let mut buffer = [0; 1]; | |||
match stream.read_exact(&mut buffer) { | |||
Ok(_) => { | |||
let bit = buffer[0]; | |||
// println!("接收到比特值: {}", bit); | |||
if(bit==255){ | |||
println!("接收成功"); | |||
break; | |||
} | |||
else if(bit==0){ | |||
println!("接收失败"); | |||
retry_count += 1; | |||
continue; | |||
} | |||
else{ | |||
println!("接收到非预期的值: {} (十六进制: {:02X})", bit, bit); | |||
retry_count += 1; | |||
} | |||
} | |||
Err(e) => { | |||
println!("接收数据失败: {}", e); | |||
retry_count += 1; | |||
} | |||
} | |||
} | |||
Err(e) => { | |||
println!("连接服务器失败: {}", e); | |||
retry_count += 1; | |||
if retry_count < config.client.max_retries { | |||
println!("等待{}秒后重试...", config.client.retry_delay_secs); | |||
thread::sleep(Duration::from_secs(config.client.retry_delay_secs)); | |||
} | |||
} | |||
} | |||
} | |||
if retry_count >= config.client.max_retries { | |||
println!("达到最大重试次数,跳过当前数据"); | |||
} | |||
} | |||
} | |||
Err(e) => println!("获取表 {} 数据失败: {}", table.name, e), | |||
} | |||
} | |||
// 等待下一次执行 | |||
println!("\n等待 {} 小时后执行下一次同步...", config.scheduler.interval_hours); | |||
tokio::time::sleep(Duration::from_secs(config.scheduler.interval_hours * 3600)).await; | |||
println!("等待 {} 秒后执行下一次同步...", config.client.interval_seconds); | |||
thread::sleep(Duration::from_secs(config.client.interval_seconds)); | |||
} | |||
} |