From c928645a7ad8ad54da20099b725077d1d6e07e19 Mon Sep 17 00:00:00 2001 From: OCEAN <1010331798@qq.com> Date: Mon, 21 Apr 2025 18:42:23 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tcp_client/Cargo.lock | 85 ++++++++++++++++- tcp_client/Cargo.toml | 4 +- tcp_client/config.toml | 25 +++-- tcp_client/src/db.rs | 209 ++++++++++++++--------------------------- tcp_client/src/main.rs | 206 ++++++++++++++++++++++------------------ 5 files changed, 281 insertions(+), 248 deletions(-) diff --git a/tcp_client/Cargo.lock b/tcp_client/Cargo.lock index 1523505..3196bf4 100644 --- a/tcp_client/Cargo.lock +++ b/tcp_client/Cargo.lock @@ -200,6 +200,56 @@ dependencies = [ "typenum", ] +[[package]] +name = "diesel" +version = "2.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff236accb9a5069572099f0b350a92e9560e8e63a9b8d546162f4a5e03026bb2" +dependencies = [ + "bitflags 2.9.0", + "byteorder", + "chrono", + "diesel_derives", + "itoa", + "pq-sys", + "serde_json", +] + +[[package]] +name = "diesel-async" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acada1517534c92d3f382217b485db8a8638f111b0e3f2a2a8e26165050f77be" +dependencies = [ + "async-trait", + "diesel", + "futures-util", + "scoped-futures", + "tokio", + "tokio-postgres", +] + +[[package]] +name = "diesel_derives" +version = "2.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14701062d6bed917b5c7103bdffaee1e4609279e240488ad24e7bd979ca6866c" +dependencies = [ + "diesel_table_macro_syntax", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "diesel_table_macro_syntax" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc5557efc453706fed5e4fa85006fe9817c224c3f480a34c7e5959fd700921c5" +dependencies = [ + "syn", +] + [[package]] name = "digest" version = "0.10.7" @@ -640,7 +690,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613283563cd90e1dfc3518d548caee47e0e725455ed619881f5cf21f36de4b48" dependencies = [ "bytes", - "chrono", "fallible-iterator", "postgres-protocol", ] @@ -654,6 +703,15 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "pq-sys" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31c0052426df997c0cbd30789eb44ca097e3541717a7b8fa36b1c464ee7edebd" +dependencies = [ + "vcpkg", +] + [[package]] name = "proc-macro2" version = "1.0.95" @@ -755,6 +813,15 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +[[package]] +name = "scoped-futures" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b24aae2d0636530f359e9d5ef0c04669d11c5e756699b27a6a6d845d8329091" +dependencies = [ + "pin-project-lite", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -884,10 +951,12 @@ version = "0.1.0" dependencies = [ "chrono", "config", + "diesel", + "diesel-async", "serde", "serde_json", "tokio", - "tokio-postgres", + "urlencoding", ] [[package]] @@ -1041,6 +1110,18 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0" +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" diff --git a/tcp_client/Cargo.toml b/tcp_client/Cargo.toml index a155f31..078ee29 100644 --- a/tcp_client/Cargo.toml +++ b/tcp_client/Cargo.toml @@ -5,8 +5,10 @@ edition = "2021" [dependencies] tokio = { version = "1.28", features = ["full"] } -tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] } +diesel = { version = "2.1.0", features = ["postgres", "chrono", "serde_json"] } +diesel-async = { version = "0.4.1", features = ["postgres"] } config = "0.13" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" chrono = { version = "0.4", features = ["serde"] } +urlencoding = "2.1" \ No newline at end of file diff --git a/tcp_client/config.toml b/tcp_client/config.toml index 1a00a08..d1168e0 100644 --- a/tcp_client/config.toml +++ b/tcp_client/config.toml @@ -1,19 +1,15 @@ [server] -host = "10.180.4.88" -#host = "127.0.0.1" - +#host = "10.180.4.88" +host = "127.0.0.1" port = 9090 [client] -# 定时任务配置 -interval_seconds = 60 # 每隔多少秒执行一次 max_retries = 3 -read_timeout_secs = 10 -write_timeout_secs = 10 -retry_delay_secs = 2 +retry_delay_secs =2 +read_timeout_secs = 5 +write_timeout_secs = 5 [database] -#host = "10.180.4.100" host = "192.168.0.100" port = 5432 name = "Auseft_RL_Web" @@ -21,8 +17,11 @@ user = "postgres" password = "Auseft@2025qwer" # 要同步的表配置 + + + [[tables]] -name = "hy_record" -query = "SELECT * FROM hy_record " -incremental = false # 是否增量同步 -key_field = "UpdateTime" # 增量同步的时间字段 +name = "hy_instrument" +query = "SELECT * FROM \"hy_instrument\"" +incremental = false +key_field = "UpdateTime" diff --git a/tcp_client/src/db.rs b/tcp_client/src/db.rs index 59747e0..68deeaf 100644 --- a/tcp_client/src/db.rs +++ b/tcp_client/src/db.rs @@ -1,7 +1,15 @@ -use tokio_postgres::{NoTls, Error, Row}; -use serde::Deserialize; +use diesel::prelude::*; +use diesel_async::{AsyncPgConnection, RunQueryDsl, AsyncConnection}; +use diesel::sql_types::*; + +#[derive(QueryableByName)] +struct TestResult { + #[diesel(sql_type = Integer)] + pub result: i32, +} +use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use chrono::{DateTime, Utc, NaiveDateTime}; +use chrono::{DateTime, Utc}; #[derive(Debug, Deserialize, Clone)] pub struct DatabaseConfig { @@ -12,6 +20,19 @@ pub struct DatabaseConfig { pub password: String, } +impl DatabaseConfig { + pub fn connection_string(&self) -> String { + format!( + "postgres://{}:{}@{}:{}/{}", + encode(&self.user), + encode(&self.password), + self.host, + self.port, + self.name + ) + } +} + #[derive(Debug, Deserialize, Clone)] pub struct TableConfig { pub name: String, @@ -20,168 +41,76 @@ pub struct TableConfig { pub key_field: String, } -impl DatabaseConfig { - pub fn connection_string(&self) -> String { - format!( - "host={} port={} dbname={} user={} password={}", - self.host, self.port, self.name, self.user, self.password - ) - } -} +use urlencoding::encode; pub struct Database { - client: tokio_postgres::Client, + conn: AsyncPgConnection, last_sync_times: HashMap>, } +#[derive(QueryableByName, Debug, Serialize)] +pub struct DynamicRow { + #[diesel(sql_type = Text)] + pub table_name: String, + #[diesel(sql_type = Text)] + pub data: String, +} + impl Database { - pub async fn connect(config: &DatabaseConfig) -> Result { - let connection_string = config.connection_string(); - let (client, connection) = tokio_postgres::connect(&connection_string, NoTls).await?; - - tokio::spawn(async move { - if let Err(e) = connection.await { - eprintln!("数据库连接错误: {}", e); - } - }); + pub async fn connect(config: &DatabaseConfig) -> Result { + let conn = AsyncPgConnection::establish(&config.connection_string()).await.map_err(|e| diesel::result::Error::DatabaseError( + diesel::result::DatabaseErrorKind::Unknown, + Box::new(format!("Failed to establish connection: {}", e)) + ))?; Ok(Self { - client, + conn, last_sync_times: HashMap::new(), }) } - pub async fn execute_table_query(&self, table: &TableConfig, last_sync: Option>) -> Result, Error> { - let query = &table.query; - let rows = if table.incremental { - let last_sync = last_sync.unwrap_or_else(|| DateTime::::from_naive_utc_and_offset(NaiveDateTime::default(), Utc)); - self.client.query(query, &[&last_sync]).await? + pub async fn execute_table_query(&mut self, table: &TableConfig, last_sync: Option>) -> Result, diesel::result::Error> { + let query = if table.incremental { + let last_sync = last_sync.unwrap_or_else(|| DateTime::::default()); + format!("SELECT '{}' as table_name, row_to_json(t)::text as data FROM ({}) t WHERE {} > '{}'", table.name, table.query, table.key_field, last_sync.to_rfc3339()) } else { - self.client.query(query, &[]).await? + format!("SELECT '{}' as table_name, row_to_json(t)::text as data FROM ({}) t", table.name, table.query) }; + + let rows = diesel::sql_query(query) + .load::(&mut self.conn) + .await?; Ok(rows) } - pub async fn get_table_data(&mut self, table: &TableConfig) -> Result, Error> { - let last_sync = self.last_sync_times.get(&table.name).cloned(); + pub async fn get_table_data(&mut self, table: &TableConfig) -> Result, diesel::result::Error> { + let last_sync = if table.incremental { + self.last_sync_times.get(&table.name).cloned() + } else { + None + }; + let rows = self.execute_table_query(table, last_sync).await?; - - if table.incremental && !rows.is_empty() { - // 找到key_field对应的列索引 - if let Some(first_row) = rows.first() { - if let Some(col_idx) = first_row.columns().iter().position(|col| col.name() == table.key_field) { - if let Some(max_time) = rows.iter() - .filter_map(|row| row.try_get::<_, DateTime>(col_idx).ok()) - .max() { - self.last_sync_times.insert(table.name.clone(), max_time); + if !rows.is_empty() { + // 解析 JSON 字符串 + for row in &rows { + if let Ok(json_value) = serde_json::from_str::(&row.data) { + if let Some(time_str) = json_value.get(&table.key_field).and_then(|v| v.as_str()) { + if let Ok(time) = DateTime::parse_from_rfc3339(time_str) { + self.last_sync_times.insert(table.name.clone(), time.with_timezone(&Utc)); + } } } } } - Ok(rows) } - pub async fn test_connection(&self) -> Result { - let rows = self.client.query("SELECT 1", &[]).await?; - Ok(!rows.is_empty()) - } -} - -pub fn format_row_as_json(row: &Row, table_name: &str) -> serde_json::Value { - let mut map = serde_json::Map::new(); - map.insert("table_name".to_string(), serde_json::Value::String(table_name.to_string())); - - for (i, column) in row.columns().iter().enumerate() { - let name = column.name(); - let type_name = column.type_().name(); - // println!("字段 {} 的类型是: {}", name, type_name); - - let value = match type_name { - "int2" => match row.try_get::<_, Option>(i) { - Ok(Some(val)) => { - // println!("成功读取 int2: {} = {}", name, val); - serde_json::Value::Number(serde_json::Number::from(val)) - }, - _ => serde_json::Value::Null - }, - "int4" => match row.try_get::<_, Option>(i) { - Ok(Some(val)) => { - // println!("成功读取 int4: {} = {}", name, val); - serde_json::Value::Number(serde_json::Number::from(val)) - }, - _ => serde_json::Value::Null - }, - "int8" => match row.try_get::<_, Option>(i) { - Ok(Some(val)) => { - // println!("成功读取 int8: {} = {}", name, val); - serde_json::Value::Number(serde_json::Number::from(val)) - }, - _ => serde_json::Value::Null - }, - "numeric" => match row.try_get::<_, Option>(i) { - Ok(Some(val)) => { - // println!("成功读取 numeric: {} = {}", name, val); - match serde_json::Number::from_f64(val) { - Some(n) => serde_json::Value::Number(n), - None => serde_json::Value::String(val.to_string()) - } - }, - _ => serde_json::Value::Null - }, - "float4" => match row.try_get::<_, Option>(i) { - Ok(Some(val)) => { - // println!("成功读取 float4: {} = {}", name, val); - match serde_json::Number::from_f64(val as f64) { - Some(n) => serde_json::Value::Number(n), - None => serde_json::Value::String(val.to_string()) - } - }, - _ => serde_json::Value::Null - }, - "float8" => match row.try_get::<_, Option>(i) { - Ok(Some(val)) => { - // println!("成功读取 float8: {} = {}", name, val); - match serde_json::Number::from_f64(val) { - Some(n) => serde_json::Value::Number(n), - None => serde_json::Value::String(val.to_string()) - } - }, - _ => serde_json::Value::Null - }, - "text" | "varchar" => match row.try_get::<_, Option>(i) { - Ok(Some(val)) => { - // println!("成功读取字符串: {} = {}", name, val); - serde_json::Value::String(val) - }, - _ => serde_json::Value::Null - }, - "bool" => match row.try_get::<_, Option>(i) { - Ok(Some(val)) => { - // println!("成功读取布尔值: {} = {}", name, val); - serde_json::Value::String(if val { "1".to_string() } else { "0".to_string() }) - }, - _ => serde_json::Value::Null - }, - "timestamp" | "timestamptz" | "date" => { - // println!("处理时间字段: {}", name); - match row.try_get::<_, Option>(i) { - Ok(Some(dt)) => { - let formatted = dt.format("%Y-%m-%d %H:%M:%S").to_string(); - // println!("成功读取本地时间: {} = {}", name, formatted); - serde_json::Value::String(formatted) - }, - _ => serde_json::Value::Null - } - }, - _ => { - println!("未知类型字段: {} (类型: {})", name, type_name); - serde_json::Value::Null - } - }; - map.insert(name.to_string(), value); + pub async fn test_connection(&mut self) -> Result { + let result: Vec = diesel::sql_query("SELECT 1::integer as result") + .load(&mut self.conn) + .await?; + Ok(!result.is_empty() && result[0].result == 1) } - - serde_json::Value::Object(map) } diff --git a/tcp_client/src/main.rs b/tcp_client/src/main.rs index dd852b8..0d09bd5 100644 --- a/tcp_client/src/main.rs +++ b/tcp_client/src/main.rs @@ -2,8 +2,8 @@ mod db; use std::net::TcpStream; use std::io::{Read, Write}; -use std::time::Duration; use std::thread; +use std::time::Duration; use serde::Deserialize; use config::Config; use db::{Database, DatabaseConfig, TableConfig}; @@ -20,7 +20,6 @@ struct ClientConfig { retry_delay_secs: u64, read_timeout_secs: u64, write_timeout_secs: u64, - interval_seconds: u64, // 定时任务间隔时间 } #[derive(Debug, Deserialize)] @@ -50,117 +49,140 @@ async fn main() { } }; - loop { - // 连接数据库 - let mut db = match Database::connect(&config.database).await { - Ok(db) => { - println!("数据库连接成功!"); - db - } - Err(e) => { - println!("数据库连接失败: {}", e); - thread::sleep(Duration::from_secs(config.client.interval_seconds)); - continue; - } - }; - - // 测试数据库连接 - match db.test_connection().await { - Ok(true) => println!("数据库连接测试成功"), - Ok(false) => println!("数据库连接测试失败"), - Err(e) => println!("数据库测试错误: {}", e), + println!("配置信息:"); + println!("数据库主机: {}", config.database.host); + println!("数据库端口: {}", config.database.port); + println!("数据库名称: {}", config.database.name); + println!("数据库用户: {}", config.database.user); + println!("要同步的表数量: {}", config.tables.len()); + + // 建立数据库连接 + let mut db = match Database::connect(&config.database).await { + Ok(db) => { + println!("数据库连接成功!"); + db } + Err(e) => { + println!("数据库连接失败: {}", e); + println!("请检查数据库配置和网络连接:"); + println!("1. 确认数据库服务器是否运行"); + println!("2. 确认数据库服务器的IP地址是否正确"); + println!("3. 确认数据库服务器的端口是否正确"); + println!("4. 确认数据库用户和密码是否正确"); + println!("5. 确认网络连接是否正常"); + return; + } + }; - // 获取配置的表数据 - for table in &config.tables { - println!("\n开始获取表 {} 的数据...", table.name); - match db.get_table_data(table).await { - Ok(rows) => { - println!("成功获取 {} 条记录", rows.len()); - // 将每行数据转换为JSON并发送到服务器 - for row in rows { - let json_data = db::format_row_as_json(&row, &table.name); - let msg = serde_json::to_string(&json_data).unwrap(); - - // 创建TCP连接 - let server_addr = format!("{}:{}", config.server.host, config.server.port); - let mut retry_count = 0; - - while retry_count < config.client.max_retries { - println!("\n尝试连接服务器 {} (第{}次)...", server_addr, retry_count + 1); - - match TcpStream::connect(&server_addr) { - Ok(mut stream) => { - println!("成功连接到服务器!"); - - if let Err(e) = stream.set_read_timeout(Some(Duration::from_secs(config.client.read_timeout_secs))) { - println!("设置读取超时失败: {}", e); - continue; - } - - if let Err(e) = stream.set_write_timeout(Some(Duration::from_secs(config.client.write_timeout_secs))) { - println!("设置写入超时失败: {}", e); - continue; - } + // 测试数据库连接 + match db.test_connection().await { + Ok(true) => println!("数据库连接测试成功"), + Ok(false) => println!("数据库连接测试失败"), + Err(e) => println!("数据库测试错误: {}", e), + } - // 发送数据 - match stream.write_all(msg.as_bytes()) { - Ok(_) => println!("成功发送消息: {}", msg), - Err(e) => { - println!("发送消息失败: {}", e); - retry_count += 1; - continue; - } + // 获取配置的表数据 + for table in &config.tables { + println!("\n开始获取表 {} 的数据...", table.name); + match db.get_table_data(table).await { + Ok(rows) => { + println!("从数据库获取到 {} 条记录", rows.len()); + let mut success_count = 0; + let mut fail_count = 0; + + // 将每行数据发送到服务器 + for (index, row) in rows.iter().enumerate() { + let msg = match serde_json::to_string(&row) { + Ok(msg) => msg, + Err(e) => { + println!("序列化数据失败 (第{}条记录): {}", index + 1, e); + fail_count += 1; + continue; + } + }; + + // 创建TCP连接 + let server_addr = format!("{}:{}", config.server.host, config.server.port); + let mut retry_count = 0; + let mut success = false; + + while retry_count < config.client.max_retries { + if retry_count > 0 { + println!("正在重试第 {} 条记录 (第{}次)...", index + 1, retry_count + 1); + thread::sleep(Duration::from_secs(config.client.retry_delay_secs)); + } + + match TcpStream::connect(&server_addr) { + Ok(mut stream) => { + // 设置超时 + if let Err(e) = stream.set_read_timeout(Some(Duration::from_secs(config.client.read_timeout_secs))) { + println!("设置读取超时失败: {}", e); + retry_count += 1; + continue; + } + + if let Err(e) = stream.set_write_timeout(Some(Duration::from_secs(config.client.write_timeout_secs))) { + println!("设置写入超时失败: {}", e); + retry_count += 1; + continue; + } + + // 发送数据 + match stream.write_all(msg.as_bytes()) { + Ok(_) => println!("成功发送消息: {}", msg), + Err(e) => { + println!("发送数据失败 (第{}条记录): {}", index + 1, e); + retry_count += 1; + continue; } + } - // 读取响应 - let mut buffer = [0; 1]; - match stream.read_exact(&mut buffer) { - Ok(_) => { - let bit = buffer[0]; - // println!("接收到比特值: {}", bit); - if(bit==255){ - println!("接收成功"); + // 读取响应 + let mut buffer = [0; 1]; + match stream.read_exact(&mut buffer) { + Ok(_) => { + match buffer[0] { + 255 => { + success = true; break; } - else if(bit==0){ - println!("接收失败"); + 0 => { + println!("服务器拒绝接收数据 (第{}条记录)", index + 1); retry_count += 1; - continue; } - else{ - println!("接收到非预期的值: {} (十六进制: {:02X})", bit, bit); + _ => { + println!("收到非预期响应: {} (第{}条记录)", buffer[0], index + 1); retry_count += 1; } } - Err(e) => { - println!("接收数据失败: {}", e); - retry_count += 1; - } } - } - Err(e) => { - println!("连接服务器失败: {}", e); - retry_count += 1; - if retry_count < config.client.max_retries { - println!("等待{}秒后重试...", config.client.retry_delay_secs); - thread::sleep(Duration::from_secs(config.client.retry_delay_secs)); + Err(e) => { + println!("读取服务器响应失败 (第{}条记录): {}", index + 1, e); + retry_count += 1; } } } + Err(e) => { + println!("连接服务器失败 (第{}条记录): {}", index + 1, e); + retry_count += 1; + } } + } - if retry_count >= config.client.max_retries { - println!("达到最大重试次数,跳过当前数据"); - } + if success { + success_count += 1; + } else { + println!("达到最大重试次数,跳过第{}条记录", index + 1); + fail_count += 1; } } - Err(e) => println!("获取表 {} 数据失败: {}", table.name, e), + + println!("\n表 {} 同步完成:", table.name); + println!("总记录数: {}", rows.len()); + println!("成功: {} 条", success_count); + println!("失败: {} 条", fail_count); } + Err(e) => println!("获取表 {} 数据失败: {}", table.name, e), } - - // 等待下一次执行 - println!("等待 {} 秒后执行下一次同步...", config.client.interval_seconds); - thread::sleep(Duration::from_secs(config.client.interval_seconds)); } }