From 566e1542e8477d52228d79a007d057bf804b9c20 Mon Sep 17 00:00:00 2001 From: OCEAN <1010331798@qq.com> Date: Sun, 20 Apr 2025 15:27:14 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8C=87=E5=AE=9A=E6=95=B0=E6=8D=AE=E8=A1=A8?= =?UTF-8?q?=E5=90=8C=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tcp_client/Cargo.lock | 153 +++++++++++++++++++++++++++++++++++++++ tcp_client/Cargo.toml | 4 +- tcp_client/config.toml | 112 ++++++++++++++++++++++++++++- tcp_client/src/db.rs | 158 +++++++++++++++++++++++++++++++---------- tcp_client/src/main.rs | 46 ++++++++++-- 5 files changed, 430 insertions(+), 43 deletions(-) diff --git a/tcp_client/Cargo.lock b/tcp_client/Cargo.lock index bb3d33e..1523505 100644 --- a/tcp_client/Cargo.lock +++ b/tcp_client/Cargo.lock @@ -28,6 +28,21 @@ dependencies = [ "version_check", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "async-trait" version = "0.1.88" @@ -111,12 +126,36 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" +[[package]] +name = "cc" +version = "1.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e3a13707ac958681c13b39b458c073d0d9bc8a22cb1b2f4c8e55eb72c13f362" +dependencies = [ + "shlex", +] + [[package]] name = "cfg-if" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a7964611d71df112cb1730f2ee67324fcf4d0fc6606acbbe9bfe06df124637c" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "serde", + "wasm-bindgen", + "windows-link", +] + [[package]] name = "config" version = "0.13.4" @@ -136,6 +175,12 @@ dependencies = [ "yaml-rust", ] +[[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + [[package]] name = "cpufeatures" version = "0.2.17" @@ -289,6 +334,30 @@ dependencies = [ "digest", ] +[[package]] +name = "iana-time-zone" +version = "0.1.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0c919e5debc312ad217002b8048a17b7d83f80703865bbfcfebb0458b0b27d8" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "log", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "itoa" version = "1.0.15" @@ -402,6 +471,15 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + [[package]] name = "object" version = "0.36.7" @@ -562,6 +640,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613283563cd90e1dfc3518d548caee47e0e725455ed619881f5cf21f36de4b48" dependencies = [ "bytes", + "chrono", "fallible-iterator", "postgres-protocol", ] @@ -664,6 +743,12 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustversion" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eded382c5f5f786b989652c49544c4877d9f015cc22e145a5ea8ea66c2921cd2" + [[package]] name = "ryu" version = "1.0.20" @@ -719,6 +804,12 @@ dependencies = [ "digest", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook-registry" version = "1.4.2" @@ -791,8 +882,10 @@ dependencies = [ name = "tcp_client" version = "0.1.0" dependencies = [ + "chrono", "config", "serde", + "serde_json", "tokio", "tokio-postgres", ] @@ -983,6 +1076,7 @@ checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5" dependencies = [ "cfg-if", "once_cell", + "rustversion", "wasm-bindgen-macro", ] @@ -1053,6 +1147,65 @@ dependencies = [ "web-sys", ] +[[package]] +name = "windows-core" +version = "0.61.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4763c1de310c86d75a878046489e2e5ba02c649d185f21c67d4cf8a56d098980" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link", + "windows-result", + "windows-strings", +] + +[[package]] +name = "windows-implement" +version = "0.60.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-interface" +version = "0.59.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-link" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38" + +[[package]] +name = "windows-result" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c64fd11a4fd95df68efcfee5f44a294fe71b8bc6a91993e2791938abcc712252" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-strings" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a2ba9642430ee452d5a7aa78d72907ebe8cfda358e8cb7918a2050581322f97" +dependencies = [ + "windows-link", +] + [[package]] name = "windows-sys" version = "0.52.0" diff --git a/tcp_client/Cargo.toml b/tcp_client/Cargo.toml index 2a1db69..a155f31 100644 --- a/tcp_client/Cargo.toml +++ b/tcp_client/Cargo.toml @@ -5,6 +5,8 @@ edition = "2021" [dependencies] tokio = { version = "1.28", features = ["full"] } -tokio-postgres = "0.7" +tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] } config = "0.13" serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +chrono = { version = "0.4", features = ["serde"] } diff --git a/tcp_client/config.toml b/tcp_client/config.toml index 49e1502..deaf531 100644 --- a/tcp_client/config.toml +++ b/tcp_client/config.toml @@ -1,6 +1,6 @@ [server] #host = "10.180.4.88" -host = "127.0.0.1" +host = "10.180.4.88" port = 9090 [client] @@ -8,3 +8,113 @@ max_retries = 3 retry_delay_secs = 3 read_timeout_secs = 5 write_timeout_secs = 5 + +[database] +host = "10.180.4.100" +port = 5432 +name = "Auseft_RL_Web" +user = "postgres" +password = "Auseft@2025qwer" + +# 要同步的表配置 +[[tables]] +name = "hy_record" +query = "SELECT * FROM \"hy_record\"" +incremental = false # 是否增量同步 +key_field = "UpdateTime" # 增量同步的时间字段 + +[[tables]] +name = "hy_allot" +query = "SELECT * FROM \"hy_allot\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_cytask" +query = "SELECT * FROM \"hy_cytask\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_fullwatersample" +query = "SELECT * FROM \"hy_fullwatersample\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_informationnorm" +query = "SELECT * FROM \"hy_informationnorm\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_information" +query = "SELECT * FROM \"hy_information\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_itemdetail" +query = "SELECT * FROM \"hy_itemdetail\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_laboratoryinstrument" +query = "SELECT * FROM \"hy_laboratoryinstrument\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_materialanalysis_type" +query = "SELECT * FROM \"hy_materialanalysis_type\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_materialdetail" +query = "SELECT * FROM \"hy_materialdetail\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_instrument" +query = "SELECT * FROM \"hy_instrument\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_norm" +query = "SELECT * FROM \"hy_norm\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_sample_collection_detail" +query = "SELECT * FROM \"hy_sample_collection_detail\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_weight_input" +query = "SELECT * FROM \"hy_weight_input\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_warmhumid" +query = "SELECT * FROM \"hy_warmhumid\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_task" +query = "SELECT * FROM \"hy_task\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_spotcheck" +query = "SELECT * FROM \"hy_spotcheck\"" +incremental = false +key_field = "UpdateTime" diff --git a/tcp_client/src/db.rs b/tcp_client/src/db.rs index 9b9e22b..a94d5a4 100644 --- a/tcp_client/src/db.rs +++ b/tcp_client/src/db.rs @@ -1,52 +1,136 @@ -use tokio_postgres::{NoTls, Error}; - -pub struct DbConfig { - host: String, - port: u16, - dbname: String, - user: String, - password: String, +use tokio_postgres::{NoTls, Error, Row}; +use serde::Deserialize; +use std::collections::HashMap; +use chrono::{DateTime, Utc, NaiveDateTime}; + +#[derive(Debug, Deserialize, Clone)] +pub struct DatabaseConfig { + pub host: String, + pub port: u16, + pub name: String, + pub user: String, + pub password: String, } -impl DbConfig { - pub fn new() -> Self { - Self { - host: String::from("10.180.4.100"), - port: 5432, - dbname: String::from("Auseft_RL_Web"), - user: String::from("postgres"), - password: String::from("Auseft@2025qwer"), - } - } +#[derive(Debug, Deserialize, Clone)] +pub struct TableConfig { + pub name: String, + pub query: String, + pub incremental: bool, + pub key_field: String, +} +impl DatabaseConfig { pub fn connection_string(&self) -> String { format!( "host={} port={} dbname={} user={} password={}", - self.host, self.port, self.dbname, self.user, self.password + self.host, self.port, self.name, self.user, self.password ) } } -pub async fn connect_db() -> Result { - let config = DbConfig::new(); - let connection_string = config.connection_string(); - - let (client, connection) = tokio_postgres::connect(&connection_string, NoTls).await?; - - // The connection object performs the actual communication with the database, - // so spawn it off to run on its own - tokio::spawn(async move { - if let Err(e) = connection.await { - eprintln!("Connection error: {}", e); +pub struct Database { + client: tokio_postgres::Client, + last_sync_times: HashMap>, +} + +impl Database { + pub async fn connect(config: &DatabaseConfig) -> Result { + let connection_string = config.connection_string(); + let (client, connection) = tokio_postgres::connect(&connection_string, NoTls).await?; + + tokio::spawn(async move { + if let Err(e) = connection.await { + eprintln!("数据库连接错误: {}", e); + } + }); + + Ok(Self { + client, + last_sync_times: HashMap::new(), + }) + } + + pub async fn execute_table_query(&self, table: &TableConfig, last_sync: Option>) -> Result, Error> { + let query = &table.query; + let rows = if table.incremental { + let last_sync = last_sync.unwrap_or_else(|| DateTime::::from_naive_utc_and_offset(NaiveDateTime::default(), Utc)); + self.client.query(query, &[&last_sync]).await? + } else { + self.client.query(query, &[]).await? + }; + + Ok(rows) + } + + pub async fn get_table_data(&mut self, table: &TableConfig) -> Result, Error> { + let last_sync = self.last_sync_times.get(&table.name).cloned(); + let rows = self.execute_table_query(table, last_sync).await?; + + if table.incremental && !rows.is_empty() { + // 找到key_field对应的列索引 + if let Some(first_row) = rows.first() { + if let Some(col_idx) = first_row.columns().iter().position(|col| col.name() == table.key_field) { + if let Some(max_time) = rows.iter() + .filter_map(|row| row.try_get::<_, DateTime>(col_idx).ok()) + .max() { + self.last_sync_times.insert(table.name.clone(), max_time); + } + } + } } - }); + + Ok(rows) + } - Ok(client) + pub async fn test_connection(&self) -> Result { + let rows = self.client.query("SELECT 1", &[]).await?; + Ok(!rows.is_empty()) + } } -// Example function to test the connection -pub async fn test_connection() -> Result { - let client = connect_db().await?; - let rows = client.query("SELECT 1", &[]).await?; - Ok(!rows.is_empty()) +pub fn format_row_as_json(row: &Row) -> serde_json::Value { + let mut map = serde_json::Map::new(); + + for (i, column) in row.columns().iter().enumerate() { + let name = column.name(); + let value = match column.type_().name() { + "int4" => serde_json::Value::Number(serde_json::Number::from( + row.try_get::<_, i32>(i).unwrap_or_default() + )), + "int8" => serde_json::Value::Number(serde_json::Number::from( + row.try_get::<_, i64>(i).unwrap_or_default() + )), + "float4" => { + let val: Option = row.try_get(i).ok(); + match val.and_then(|v| serde_json::Number::from_f64(v as f64)) { + Some(n) => serde_json::Value::Number(n), + None => serde_json::Value::Null, + } + }, + "float8" => { + let val: Option = row.try_get(i).ok(); + match val.and_then(|v| serde_json::Number::from_f64(v)) { + Some(n) => serde_json::Value::Number(n), + None => serde_json::Value::Null, + } + }, + "text" | "varchar" => serde_json::Value::String( + row.try_get::<_, String>(i).unwrap_or_default() + ), + "bool" => serde_json::Value::Bool( + row.try_get::<_, bool>(i).unwrap_or_default() + ), + "timestamptz" => { + match row.try_get::<_, DateTime>(i) { + Ok(dt) => serde_json::Value::String(dt.to_rfc3339()), + Err(_) => serde_json::Value::Null, + } + }, + _ => serde_json::Value::Null, + }; + map.insert(name.to_string(), value); + } + + serde_json::Value::Object(map) } diff --git a/tcp_client/src/main.rs b/tcp_client/src/main.rs index 683c8a7..4ef552f 100644 --- a/tcp_client/src/main.rs +++ b/tcp_client/src/main.rs @@ -6,6 +6,7 @@ use std::thread; use std::time::Duration; use serde::Deserialize; use config::Config; +use db::{Database, DatabaseConfig, TableConfig}; #[derive(Debug, Deserialize)] struct ServerConfig { @@ -25,6 +26,8 @@ struct ClientConfig { struct Settings { server: ServerConfig, client: ClientConfig, + database: DatabaseConfig, + tables: Vec, } fn load_config() -> Result { @@ -46,28 +49,63 @@ async fn main() { } }; + // 连接数据库 + let mut db = match Database::connect(&config.database).await { + Ok(db) => { + println!("数据库连接成功!"); + db + } + Err(e) => { + println!("数据库连接失败: {}", e); + return; + } + }; + + // 测试数据库连接 + match db.test_connection().await { + Ok(true) => println!("数据库连接测试成功"), + Ok(false) => println!("数据库连接测试失败"), + Err(e) => println!("数据库测试错误: {}", e), + } + + // 获取配置的表数据 + for table in &config.tables { + println!("\n开始获取表 {} 的数据...", table.name); + match db.get_table_data(table).await { + Ok(rows) => { + println!("成功获取 {} 条记录", rows.len()); + // 将每行数据转换为JSON + for row in rows { + let json_data = db::format_row_as_json(&row); + println!("数据: {}", serde_json::to_string_pretty(&json_data).unwrap()); + } + } + Err(e) => println!("获取表 {} 数据失败: {}", table.name, e), + } + } + + + return; let mut retry_count = 0; let server_addr = format!("{}:{}", config.server.host, config.server.port); + // TCP连接部分 while retry_count < config.client.max_retries { - println!("尝试连接服务器 {} (第{}次)...", server_addr, retry_count + 1); + println!("\n尝试连接服务器 {} (第{}次)...", server_addr, retry_count + 1); match TcpStream::connect(&server_addr) { Ok(mut stream) => { println!("成功连接到服务器!"); - // 设置读写超时 stream.set_read_timeout(Some(Duration::from_secs(config.client.read_timeout_secs))).unwrap(); stream.set_write_timeout(Some(Duration::from_secs(config.client.write_timeout_secs))).unwrap(); - // 发送数据 let msg = "HelloOK"; match stream.write(msg.as_bytes()) { Ok(_) => println!("成功发送消息: {}", msg), Err(e) => println!("发送消息失败: {}", e), } - // 接收数据 let mut buffer = [0; 1]; match stream.read(&mut buffer) { Ok(n) => {