Ver código fonte

调整优化

晋江
OCEAN 2 meses atrás
pai
commit
c928645a7a
5 arquivos alterados com 281 adições e 248 exclusões
  1. +83
    -2
      tcp_client/Cargo.lock
  2. +3
    -1
      tcp_client/Cargo.toml
  3. +12
    -13
      tcp_client/config.toml
  4. +69
    -140
      tcp_client/src/db.rs
  5. +114
    -92
      tcp_client/src/main.rs

+ 83
- 2
tcp_client/Cargo.lock Ver arquivo

@@ -200,6 +200,56 @@ dependencies = [
"typenum",
]

[[package]]
name = "diesel"
version = "2.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff236accb9a5069572099f0b350a92e9560e8e63a9b8d546162f4a5e03026bb2"
dependencies = [
"bitflags 2.9.0",
"byteorder",
"chrono",
"diesel_derives",
"itoa",
"pq-sys",
"serde_json",
]

[[package]]
name = "diesel-async"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acada1517534c92d3f382217b485db8a8638f111b0e3f2a2a8e26165050f77be"
dependencies = [
"async-trait",
"diesel",
"futures-util",
"scoped-futures",
"tokio",
"tokio-postgres",
]

[[package]]
name = "diesel_derives"
version = "2.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14701062d6bed917b5c7103bdffaee1e4609279e240488ad24e7bd979ca6866c"
dependencies = [
"diesel_table_macro_syntax",
"proc-macro2",
"quote",
"syn",
]

[[package]]
name = "diesel_table_macro_syntax"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc5557efc453706fed5e4fa85006fe9817c224c3f480a34c7e5959fd700921c5"
dependencies = [
"syn",
]

[[package]]
name = "digest"
version = "0.10.7"
@@ -640,7 +690,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613283563cd90e1dfc3518d548caee47e0e725455ed619881f5cf21f36de4b48"
dependencies = [
"bytes",
"chrono",
"fallible-iterator",
"postgres-protocol",
]
@@ -654,6 +703,15 @@ dependencies = [
"zerocopy",
]

[[package]]
name = "pq-sys"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31c0052426df997c0cbd30789eb44ca097e3541717a7b8fa36b1c464ee7edebd"
dependencies = [
"vcpkg",
]

[[package]]
name = "proc-macro2"
version = "1.0.95"
@@ -755,6 +813,15 @@ version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f"

[[package]]
name = "scoped-futures"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b24aae2d0636530f359e9d5ef0c04669d11c5e756699b27a6a6d845d8329091"
dependencies = [
"pin-project-lite",
]

[[package]]
name = "scopeguard"
version = "1.2.0"
@@ -884,10 +951,12 @@ version = "0.1.0"
dependencies = [
"chrono",
"config",
"diesel",
"diesel-async",
"serde",
"serde_json",
"tokio",
"tokio-postgres",
"urlencoding",
]

[[package]]
@@ -1041,6 +1110,18 @@ version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0"

[[package]]
name = "urlencoding"
version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da"

[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"

[[package]]
name = "version_check"
version = "0.9.5"


+ 3
- 1
tcp_client/Cargo.toml Ver arquivo

@@ -5,8 +5,10 @@ edition = "2021"

[dependencies]
tokio = { version = "1.28", features = ["full"] }
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
diesel = { version = "2.1.0", features = ["postgres", "chrono", "serde_json"] }
diesel-async = { version = "0.4.1", features = ["postgres"] }
config = "0.13"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
chrono = { version = "0.4", features = ["serde"] }
urlencoding = "2.1"

+ 12
- 13
tcp_client/config.toml Ver arquivo

@@ -1,19 +1,15 @@
[server]
host = "10.180.4.88"
#host = "127.0.0.1"

#host = "10.180.4.88"
host = "127.0.0.1"
port = 9090

[client]
# 定时任务配置
interval_seconds = 60 # 每隔多少秒执行一次
max_retries = 3
read_timeout_secs = 10
write_timeout_secs = 10
retry_delay_secs = 2
retry_delay_secs =2
read_timeout_secs = 5
write_timeout_secs = 5

[database]
#host = "10.180.4.100"
host = "192.168.0.100"
port = 5432
name = "Auseft_RL_Web"
@@ -21,8 +17,11 @@ user = "postgres"
password = "Auseft@2025qwer"

# 要同步的表配置



[[tables]]
name = "hy_record"
query = "SELECT * FROM hy_record "
incremental = false # 是否增量同步
key_field = "UpdateTime" # 增量同步的时间字段
name = "hy_instrument"
query = "SELECT * FROM \"hy_instrument\""
incremental = false
key_field = "UpdateTime"

+ 69
- 140
tcp_client/src/db.rs Ver arquivo

@@ -1,7 +1,15 @@
use tokio_postgres::{NoTls, Error, Row};
use serde::Deserialize;
use diesel::prelude::*;
use diesel_async::{AsyncPgConnection, RunQueryDsl, AsyncConnection};
use diesel::sql_types::*;

#[derive(QueryableByName)]
struct TestResult {
#[diesel(sql_type = Integer)]
pub result: i32,
}
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use chrono::{DateTime, Utc, NaiveDateTime};
use chrono::{DateTime, Utc};

#[derive(Debug, Deserialize, Clone)]
pub struct DatabaseConfig {
@@ -12,6 +20,19 @@ pub struct DatabaseConfig {
pub password: String,
}

impl DatabaseConfig {
pub fn connection_string(&self) -> String {
format!(
"postgres://{}:{}@{}:{}/{}",
encode(&self.user),
encode(&self.password),
self.host,
self.port,
self.name
)
}
}

#[derive(Debug, Deserialize, Clone)]
pub struct TableConfig {
pub name: String,
@@ -20,168 +41,76 @@ pub struct TableConfig {
pub key_field: String,
}

impl DatabaseConfig {
pub fn connection_string(&self) -> String {
format!(
"host={} port={} dbname={} user={} password={}",
self.host, self.port, self.name, self.user, self.password
)
}
}
use urlencoding::encode;

pub struct Database {
client: tokio_postgres::Client,
conn: AsyncPgConnection,
last_sync_times: HashMap<String, DateTime<Utc>>,
}

#[derive(QueryableByName, Debug, Serialize)]
pub struct DynamicRow {
#[diesel(sql_type = Text)]
pub table_name: String,
#[diesel(sql_type = Text)]
pub data: String,
}

impl Database {
pub async fn connect(config: &DatabaseConfig) -> Result<Self, Error> {
let connection_string = config.connection_string();
let (client, connection) = tokio_postgres::connect(&connection_string, NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("数据库连接错误: {}", e);
}
});
pub async fn connect(config: &DatabaseConfig) -> Result<Self, diesel::result::Error> {
let conn = AsyncPgConnection::establish(&config.connection_string()).await.map_err(|e| diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::Unknown,
Box::new(format!("Failed to establish connection: {}", e))
))?;

Ok(Self {
client,
conn,
last_sync_times: HashMap::new(),
})
}

pub async fn execute_table_query(&self, table: &TableConfig, last_sync: Option<DateTime<Utc>>) -> Result<Vec<Row>, Error> {
let query = &table.query;
let rows = if table.incremental {
let last_sync = last_sync.unwrap_or_else(|| DateTime::<Utc>::from_naive_utc_and_offset(NaiveDateTime::default(), Utc));
self.client.query(query, &[&last_sync]).await?
pub async fn execute_table_query(&mut self, table: &TableConfig, last_sync: Option<DateTime<Utc>>) -> Result<Vec<DynamicRow>, diesel::result::Error> {
let query = if table.incremental {
let last_sync = last_sync.unwrap_or_else(|| DateTime::<Utc>::default());
format!("SELECT '{}' as table_name, row_to_json(t)::text as data FROM ({}) t WHERE {} > '{}'", table.name, table.query, table.key_field, last_sync.to_rfc3339())
} else {
self.client.query(query, &[]).await?
format!("SELECT '{}' as table_name, row_to_json(t)::text as data FROM ({}) t", table.name, table.query)
};

let rows = diesel::sql_query(query)
.load::<DynamicRow>(&mut self.conn)
.await?;
Ok(rows)
}

pub async fn get_table_data(&mut self, table: &TableConfig) -> Result<Vec<Row>, Error> {
let last_sync = self.last_sync_times.get(&table.name).cloned();
pub async fn get_table_data(&mut self, table: &TableConfig) -> Result<Vec<DynamicRow>, diesel::result::Error> {
let last_sync = if table.incremental {
self.last_sync_times.get(&table.name).cloned()
} else {
None
};

let rows = self.execute_table_query(table, last_sync).await?;
if table.incremental && !rows.is_empty() {
// 找到key_field对应的列索引
if let Some(first_row) = rows.first() {
if let Some(col_idx) = first_row.columns().iter().position(|col| col.name() == table.key_field) {
if let Some(max_time) = rows.iter()
.filter_map(|row| row.try_get::<_, DateTime<Utc>>(col_idx).ok())
.max() {
self.last_sync_times.insert(table.name.clone(), max_time);
if !rows.is_empty() {
// 解析 JSON 字符串
for row in &rows {
if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(&row.data) {
if let Some(time_str) = json_value.get(&table.key_field).and_then(|v| v.as_str()) {
if let Ok(time) = DateTime::parse_from_rfc3339(time_str) {
self.last_sync_times.insert(table.name.clone(), time.with_timezone(&Utc));
}
}
}
}
}
Ok(rows)
}

pub async fn test_connection(&self) -> Result<bool, Error> {
let rows = self.client.query("SELECT 1", &[]).await?;
Ok(!rows.is_empty())
}
}

pub fn format_row_as_json(row: &Row, table_name: &str) -> serde_json::Value {
let mut map = serde_json::Map::new();
map.insert("table_name".to_string(), serde_json::Value::String(table_name.to_string()));
for (i, column) in row.columns().iter().enumerate() {
let name = column.name();
let type_name = column.type_().name();
// println!("字段 {} 的类型是: {}", name, type_name);
let value = match type_name {
"int2" => match row.try_get::<_, Option<i16>>(i) {
Ok(Some(val)) => {
// println!("成功读取 int2: {} = {}", name, val);
serde_json::Value::Number(serde_json::Number::from(val))
},
_ => serde_json::Value::Null
},
"int4" => match row.try_get::<_, Option<i32>>(i) {
Ok(Some(val)) => {
// println!("成功读取 int4: {} = {}", name, val);
serde_json::Value::Number(serde_json::Number::from(val))
},
_ => serde_json::Value::Null
},
"int8" => match row.try_get::<_, Option<i64>>(i) {
Ok(Some(val)) => {
// println!("成功读取 int8: {} = {}", name, val);
serde_json::Value::Number(serde_json::Number::from(val))
},
_ => serde_json::Value::Null
},
"numeric" => match row.try_get::<_, Option<f64>>(i) {
Ok(Some(val)) => {
// println!("成功读取 numeric: {} = {}", name, val);
match serde_json::Number::from_f64(val) {
Some(n) => serde_json::Value::Number(n),
None => serde_json::Value::String(val.to_string())
}
},
_ => serde_json::Value::Null
},
"float4" => match row.try_get::<_, Option<f32>>(i) {
Ok(Some(val)) => {
// println!("成功读取 float4: {} = {}", name, val);
match serde_json::Number::from_f64(val as f64) {
Some(n) => serde_json::Value::Number(n),
None => serde_json::Value::String(val.to_string())
}
},
_ => serde_json::Value::Null
},
"float8" => match row.try_get::<_, Option<f64>>(i) {
Ok(Some(val)) => {
// println!("成功读取 float8: {} = {}", name, val);
match serde_json::Number::from_f64(val) {
Some(n) => serde_json::Value::Number(n),
None => serde_json::Value::String(val.to_string())
}
},
_ => serde_json::Value::Null
},
"text" | "varchar" => match row.try_get::<_, Option<String>>(i) {
Ok(Some(val)) => {
// println!("成功读取字符串: {} = {}", name, val);
serde_json::Value::String(val)
},
_ => serde_json::Value::Null
},
"bool" => match row.try_get::<_, Option<bool>>(i) {
Ok(Some(val)) => {
// println!("成功读取布尔值: {} = {}", name, val);
serde_json::Value::String(if val { "1".to_string() } else { "0".to_string() })
},
_ => serde_json::Value::Null
},
"timestamp" | "timestamptz" | "date" => {
// println!("处理时间字段: {}", name);
match row.try_get::<_, Option<NaiveDateTime>>(i) {
Ok(Some(dt)) => {
let formatted = dt.format("%Y-%m-%d %H:%M:%S").to_string();
// println!("成功读取本地时间: {} = {}", name, formatted);
serde_json::Value::String(formatted)
},
_ => serde_json::Value::Null
}
},
_ => {
println!("未知类型字段: {} (类型: {})", name, type_name);
serde_json::Value::Null
}
};
map.insert(name.to_string(), value);
pub async fn test_connection(&mut self) -> Result<bool, diesel::result::Error> {
let result: Vec<TestResult> = diesel::sql_query("SELECT 1::integer as result")
.load(&mut self.conn)
.await?;
Ok(!result.is_empty() && result[0].result == 1)
}
serde_json::Value::Object(map)
}

+ 114
- 92
tcp_client/src/main.rs Ver arquivo

@@ -2,8 +2,8 @@ mod db;

use std::net::TcpStream;
use std::io::{Read, Write};
use std::time::Duration;
use std::thread;
use std::time::Duration;
use serde::Deserialize;
use config::Config;
use db::{Database, DatabaseConfig, TableConfig};
@@ -20,7 +20,6 @@ struct ClientConfig {
retry_delay_secs: u64,
read_timeout_secs: u64,
write_timeout_secs: u64,
interval_seconds: u64, // 定时任务间隔时间
}

#[derive(Debug, Deserialize)]
@@ -50,117 +49,140 @@ async fn main() {
}
};

loop {
// 连接数据库
let mut db = match Database::connect(&config.database).await {
Ok(db) => {
println!("数据库连接成功!");
db
}
Err(e) => {
println!("数据库连接失败: {}", e);
thread::sleep(Duration::from_secs(config.client.interval_seconds));
continue;
}
};

// 测试数据库连接
match db.test_connection().await {
Ok(true) => println!("数据库连接测试成功"),
Ok(false) => println!("数据库连接测试失败"),
Err(e) => println!("数据库测试错误: {}", e),
println!("配置信息:");
println!("数据库主机: {}", config.database.host);
println!("数据库端口: {}", config.database.port);
println!("数据库名称: {}", config.database.name);
println!("数据库用户: {}", config.database.user);
println!("要同步的表数量: {}", config.tables.len());

// 建立数据库连接
let mut db = match Database::connect(&config.database).await {
Ok(db) => {
println!("数据库连接成功!");
db
}
Err(e) => {
println!("数据库连接失败: {}", e);
println!("请检查数据库配置和网络连接:");
println!("1. 确认数据库服务器是否运行");
println!("2. 确认数据库服务器的IP地址是否正确");
println!("3. 确认数据库服务器的端口是否正确");
println!("4. 确认数据库用户和密码是否正确");
println!("5. 确认网络连接是否正常");
return;
}
};

// 获取配置的表数据
for table in &config.tables {
println!("\n开始获取表 {} 的数据...", table.name);
match db.get_table_data(table).await {
Ok(rows) => {
println!("成功获取 {} 条记录", rows.len());
// 将每行数据转换为JSON并发送到服务器
for row in rows {
let json_data = db::format_row_as_json(&row, &table.name);
let msg = serde_json::to_string(&json_data).unwrap();

// 创建TCP连接
let server_addr = format!("{}:{}", config.server.host, config.server.port);
let mut retry_count = 0;

while retry_count < config.client.max_retries {
println!("\n尝试连接服务器 {} (第{}次)...", server_addr, retry_count + 1);
match TcpStream::connect(&server_addr) {
Ok(mut stream) => {
println!("成功连接到服务器!");
if let Err(e) = stream.set_read_timeout(Some(Duration::from_secs(config.client.read_timeout_secs))) {
println!("设置读取超时失败: {}", e);
continue;
}
if let Err(e) = stream.set_write_timeout(Some(Duration::from_secs(config.client.write_timeout_secs))) {
println!("设置写入超时失败: {}", e);
continue;
}
// 测试数据库连接
match db.test_connection().await {
Ok(true) => println!("数据库连接测试成功"),
Ok(false) => println!("数据库连接测试失败"),
Err(e) => println!("数据库测试错误: {}", e),
}

// 发送数据
match stream.write_all(msg.as_bytes()) {
Ok(_) => println!("成功发送消息: {}", msg),
Err(e) => {
println!("发送消息失败: {}", e);
retry_count += 1;
continue;
}
// 获取配置的表数据
for table in &config.tables {
println!("\n开始获取表 {} 的数据...", table.name);
match db.get_table_data(table).await {
Ok(rows) => {
println!("从数据库获取到 {} 条记录", rows.len());
let mut success_count = 0;
let mut fail_count = 0;

// 将每行数据发送到服务器
for (index, row) in rows.iter().enumerate() {
let msg = match serde_json::to_string(&row) {
Ok(msg) => msg,
Err(e) => {
println!("序列化数据失败 (第{}条记录): {}", index + 1, e);
fail_count += 1;
continue;
}
};

// 创建TCP连接
let server_addr = format!("{}:{}", config.server.host, config.server.port);
let mut retry_count = 0;
let mut success = false;

while retry_count < config.client.max_retries {
if retry_count > 0 {
println!("正在重试第 {} 条记录 (第{}次)...", index + 1, retry_count + 1);
thread::sleep(Duration::from_secs(config.client.retry_delay_secs));
}
match TcpStream::connect(&server_addr) {
Ok(mut stream) => {
// 设置超时
if let Err(e) = stream.set_read_timeout(Some(Duration::from_secs(config.client.read_timeout_secs))) {
println!("设置读取超时失败: {}", e);
retry_count += 1;
continue;
}
if let Err(e) = stream.set_write_timeout(Some(Duration::from_secs(config.client.write_timeout_secs))) {
println!("设置写入超时失败: {}", e);
retry_count += 1;
continue;
}

// 发送数据
match stream.write_all(msg.as_bytes()) {
Ok(_) => println!("成功发送消息: {}", msg),
Err(e) => {
println!("发送数据失败 (第{}条记录): {}", index + 1, e);
retry_count += 1;
continue;
}
}

// 读取响应
let mut buffer = [0; 1];
match stream.read_exact(&mut buffer) {
Ok(_) => {
let bit = buffer[0];
// println!("接收到比特值: {}", bit);
if(bit==255){
println!("接收成功");
// 读取响应
let mut buffer = [0; 1];
match stream.read_exact(&mut buffer) {
Ok(_) => {
match buffer[0] {
255 => {
success = true;
break;
}
else if(bit==0){
println!("接收失败");
0 => {
println!("服务器拒绝接收数据 (第{}条记录)", index + 1);
retry_count += 1;
continue;
}
else{
println!("接收到非预期的值: {} (十六进制: {:02X})", bit, bit);
_ => {
println!("收到非预期响应: {} (第{}条记录)", buffer[0], index + 1);
retry_count += 1;
}
}
Err(e) => {
println!("接收数据失败: {}", e);
retry_count += 1;
}
}
}
Err(e) => {
println!("连接服务器失败: {}", e);
retry_count += 1;
if retry_count < config.client.max_retries {
println!("等待{}秒后重试...", config.client.retry_delay_secs);
thread::sleep(Duration::from_secs(config.client.retry_delay_secs));
Err(e) => {
println!("读取服务器响应失败 (第{}条记录): {}", index + 1, e);
retry_count += 1;
}
}
}
Err(e) => {
println!("连接服务器失败 (第{}条记录): {}", index + 1, e);
retry_count += 1;
}
}
}

if retry_count >= config.client.max_retries {
println!("达到最大重试次数,跳过当前数据");
}
if success {
success_count += 1;
} else {
println!("达到最大重试次数,跳过第{}条记录", index + 1);
fail_count += 1;
}
}
Err(e) => println!("获取表 {} 数据失败: {}", table.name, e),

println!("\n表 {} 同步完成:", table.name);
println!("总记录数: {}", rows.len());
println!("成功: {} 条", success_count);
println!("失败: {} 条", fail_count);
}
Err(e) => println!("获取表 {} 数据失败: {}", table.name, e),
}

// 等待下一次执行
println!("等待 {} 秒后执行下一次同步...", config.client.interval_seconds);
thread::sleep(Duration::from_secs(config.client.interval_seconds));
}
}

Carregando…
Cancelar
Salvar