Преглед на файлове

指定数据表同步

master
OCEAN преди 2 месеца
родител
ревизия
566e1542e8
променени са 5 файла, в които са добавени 430 реда и са изтрити 43 реда
  1. +153
    -0
      tcp_client/Cargo.lock
  2. +3
    -1
      tcp_client/Cargo.toml
  3. +111
    -1
      tcp_client/config.toml
  4. +121
    -37
      tcp_client/src/db.rs
  5. +42
    -4
      tcp_client/src/main.rs

+ 153
- 0
tcp_client/Cargo.lock Целия файл

@@ -28,6 +28,21 @@ dependencies = [
"version_check",
]

[[package]]
name = "android-tzdata"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"

[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
dependencies = [
"libc",
]

[[package]]
name = "async-trait"
version = "0.1.88"
@@ -111,12 +126,36 @@ version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"

[[package]]
name = "cc"
version = "1.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e3a13707ac958681c13b39b458c073d0d9bc8a22cb1b2f4c8e55eb72c13f362"
dependencies = [
"shlex",
]

[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"

[[package]]
name = "chrono"
version = "0.4.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a7964611d71df112cb1730f2ee67324fcf4d0fc6606acbbe9bfe06df124637c"
dependencies = [
"android-tzdata",
"iana-time-zone",
"js-sys",
"num-traits",
"serde",
"wasm-bindgen",
"windows-link",
]

[[package]]
name = "config"
version = "0.13.4"
@@ -136,6 +175,12 @@ dependencies = [
"yaml-rust",
]

[[package]]
name = "core-foundation-sys"
version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"

[[package]]
name = "cpufeatures"
version = "0.2.17"
@@ -289,6 +334,30 @@ dependencies = [
"digest",
]

[[package]]
name = "iana-time-zone"
version = "0.1.63"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0c919e5debc312ad217002b8048a17b7d83f80703865bbfcfebb0458b0b27d8"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"log",
"wasm-bindgen",
"windows-core",
]

[[package]]
name = "iana-time-zone-haiku"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
dependencies = [
"cc",
]

[[package]]
name = "itoa"
version = "1.0.15"
@@ -402,6 +471,15 @@ dependencies = [
"minimal-lexical",
]

[[package]]
name = "num-traits"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
dependencies = [
"autocfg",
]

[[package]]
name = "object"
version = "0.36.7"
@@ -562,6 +640,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613283563cd90e1dfc3518d548caee47e0e725455ed619881f5cf21f36de4b48"
dependencies = [
"bytes",
"chrono",
"fallible-iterator",
"postgres-protocol",
]
@@ -664,6 +743,12 @@ version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"

[[package]]
name = "rustversion"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eded382c5f5f786b989652c49544c4877d9f015cc22e145a5ea8ea66c2921cd2"

[[package]]
name = "ryu"
version = "1.0.20"
@@ -719,6 +804,12 @@ dependencies = [
"digest",
]

[[package]]
name = "shlex"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"

[[package]]
name = "signal-hook-registry"
version = "1.4.2"
@@ -791,8 +882,10 @@ dependencies = [
name = "tcp_client"
version = "0.1.0"
dependencies = [
"chrono",
"config",
"serde",
"serde_json",
"tokio",
"tokio-postgres",
]
@@ -983,6 +1076,7 @@ checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5"
dependencies = [
"cfg-if",
"once_cell",
"rustversion",
"wasm-bindgen-macro",
]

@@ -1053,6 +1147,65 @@ dependencies = [
"web-sys",
]

[[package]]
name = "windows-core"
version = "0.61.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4763c1de310c86d75a878046489e2e5ba02c649d185f21c67d4cf8a56d098980"
dependencies = [
"windows-implement",
"windows-interface",
"windows-link",
"windows-result",
"windows-strings",
]

[[package]]
name = "windows-implement"
version = "0.60.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

[[package]]
name = "windows-interface"
version = "0.59.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

[[package]]
name = "windows-link"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38"

[[package]]
name = "windows-result"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c64fd11a4fd95df68efcfee5f44a294fe71b8bc6a91993e2791938abcc712252"
dependencies = [
"windows-link",
]

[[package]]
name = "windows-strings"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a2ba9642430ee452d5a7aa78d72907ebe8cfda358e8cb7918a2050581322f97"
dependencies = [
"windows-link",
]

[[package]]
name = "windows-sys"
version = "0.52.0"


+ 3
- 1
tcp_client/Cargo.toml Целия файл

@@ -5,6 +5,8 @@ edition = "2021"

[dependencies]
tokio = { version = "1.28", features = ["full"] }
tokio-postgres = "0.7"
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
config = "0.13"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
chrono = { version = "0.4", features = ["serde"] }

+ 111
- 1
tcp_client/config.toml Целия файл

@@ -1,6 +1,6 @@
[server]
#host = "10.180.4.88"
host = "127.0.0.1"
host = "10.180.4.88"
port = 9090

[client]
@@ -8,3 +8,113 @@ max_retries = 3
retry_delay_secs = 3
read_timeout_secs = 5
write_timeout_secs = 5

[database]
host = "10.180.4.100"
port = 5432
name = "Auseft_RL_Web"
user = "postgres"
password = "Auseft@2025qwer"

# 要同步的表配置
[[tables]]
name = "hy_record"
query = "SELECT * FROM \"hy_record\""
incremental = false # 是否增量同步
key_field = "UpdateTime" # 增量同步的时间字段

[[tables]]
name = "hy_allot"
query = "SELECT * FROM \"hy_allot\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_cytask"
query = "SELECT * FROM \"hy_cytask\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_fullwatersample"
query = "SELECT * FROM \"hy_fullwatersample\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_informationnorm"
query = "SELECT * FROM \"hy_informationnorm\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_information"
query = "SELECT * FROM \"hy_information\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_itemdetail"
query = "SELECT * FROM \"hy_itemdetail\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_laboratoryinstrument"
query = "SELECT * FROM \"hy_laboratoryinstrument\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_materialanalysis_type"
query = "SELECT * FROM \"hy_materialanalysis_type\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_materialdetail"
query = "SELECT * FROM \"hy_materialdetail\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_instrument"
query = "SELECT * FROM \"hy_instrument\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_norm"
query = "SELECT * FROM \"hy_norm\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_sample_collection_detail"
query = "SELECT * FROM \"hy_sample_collection_detail\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_weight_input"
query = "SELECT * FROM \"hy_weight_input\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_warmhumid"
query = "SELECT * FROM \"hy_warmhumid\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_task"
query = "SELECT * FROM \"hy_task\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_spotcheck"
query = "SELECT * FROM \"hy_spotcheck\""
incremental = false
key_field = "UpdateTime"

+ 121
- 37
tcp_client/src/db.rs Целия файл

@@ -1,52 +1,136 @@
use tokio_postgres::{NoTls, Error};

pub struct DbConfig {
host: String,
port: u16,
dbname: String,
user: String,
password: String,
use tokio_postgres::{NoTls, Error, Row};
use serde::Deserialize;
use std::collections::HashMap;
use chrono::{DateTime, Utc, NaiveDateTime};

#[derive(Debug, Deserialize, Clone)]
pub struct DatabaseConfig {
pub host: String,
pub port: u16,
pub name: String,
pub user: String,
pub password: String,
}

impl DbConfig {
pub fn new() -> Self {
Self {
host: String::from("10.180.4.100"),
port: 5432,
dbname: String::from("Auseft_RL_Web"),
user: String::from("postgres"),
password: String::from("Auseft@2025qwer"),
}
}
#[derive(Debug, Deserialize, Clone)]
pub struct TableConfig {
pub name: String,
pub query: String,
pub incremental: bool,
pub key_field: String,
}

impl DatabaseConfig {
pub fn connection_string(&self) -> String {
format!(
"host={} port={} dbname={} user={} password={}",
self.host, self.port, self.dbname, self.user, self.password
self.host, self.port, self.name, self.user, self.password
)
}
}

pub async fn connect_db() -> Result<tokio_postgres::Client, Error> {
let config = DbConfig::new();
let connection_string = config.connection_string();
let (client, connection) = tokio_postgres::connect(&connection_string, NoTls).await?;
// The connection object performs the actual communication with the database,
// so spawn it off to run on its own
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("Connection error: {}", e);
pub struct Database {
client: tokio_postgres::Client,
last_sync_times: HashMap<String, DateTime<Utc>>,
}

impl Database {
pub async fn connect(config: &DatabaseConfig) -> Result<Self, Error> {
let connection_string = config.connection_string();
let (client, connection) = tokio_postgres::connect(&connection_string, NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("数据库连接错误: {}", e);
}
});

Ok(Self {
client,
last_sync_times: HashMap::new(),
})
}

pub async fn execute_table_query(&self, table: &TableConfig, last_sync: Option<DateTime<Utc>>) -> Result<Vec<Row>, Error> {
let query = &table.query;
let rows = if table.incremental {
let last_sync = last_sync.unwrap_or_else(|| DateTime::<Utc>::from_naive_utc_and_offset(NaiveDateTime::default(), Utc));
self.client.query(query, &[&last_sync]).await?
} else {
self.client.query(query, &[]).await?
};
Ok(rows)
}

pub async fn get_table_data(&mut self, table: &TableConfig) -> Result<Vec<Row>, Error> {
let last_sync = self.last_sync_times.get(&table.name).cloned();
let rows = self.execute_table_query(table, last_sync).await?;
if table.incremental && !rows.is_empty() {
// 找到key_field对应的列索引
if let Some(first_row) = rows.first() {
if let Some(col_idx) = first_row.columns().iter().position(|col| col.name() == table.key_field) {
if let Some(max_time) = rows.iter()
.filter_map(|row| row.try_get::<_, DateTime<Utc>>(col_idx).ok())
.max() {
self.last_sync_times.insert(table.name.clone(), max_time);
}
}
}
}
});
Ok(rows)
}

Ok(client)
pub async fn test_connection(&self) -> Result<bool, Error> {
let rows = self.client.query("SELECT 1", &[]).await?;
Ok(!rows.is_empty())
}
}

// Example function to test the connection
pub async fn test_connection() -> Result<bool, Error> {
let client = connect_db().await?;
let rows = client.query("SELECT 1", &[]).await?;
Ok(!rows.is_empty())
pub fn format_row_as_json(row: &Row) -> serde_json::Value {
let mut map = serde_json::Map::new();
for (i, column) in row.columns().iter().enumerate() {
let name = column.name();
let value = match column.type_().name() {
"int4" => serde_json::Value::Number(serde_json::Number::from(
row.try_get::<_, i32>(i).unwrap_or_default()
)),
"int8" => serde_json::Value::Number(serde_json::Number::from(
row.try_get::<_, i64>(i).unwrap_or_default()
)),
"float4" => {
let val: Option<f32> = row.try_get(i).ok();
match val.and_then(|v| serde_json::Number::from_f64(v as f64)) {
Some(n) => serde_json::Value::Number(n),
None => serde_json::Value::Null,
}
},
"float8" => {
let val: Option<f64> = row.try_get(i).ok();
match val.and_then(|v| serde_json::Number::from_f64(v)) {
Some(n) => serde_json::Value::Number(n),
None => serde_json::Value::Null,
}
},
"text" | "varchar" => serde_json::Value::String(
row.try_get::<_, String>(i).unwrap_or_default()
),
"bool" => serde_json::Value::Bool(
row.try_get::<_, bool>(i).unwrap_or_default()
),
"timestamptz" => {
match row.try_get::<_, DateTime<Utc>>(i) {
Ok(dt) => serde_json::Value::String(dt.to_rfc3339()),
Err(_) => serde_json::Value::Null,
}
},
_ => serde_json::Value::Null,
};
map.insert(name.to_string(), value);
}
serde_json::Value::Object(map)
}

+ 42
- 4
tcp_client/src/main.rs Целия файл

@@ -6,6 +6,7 @@ use std::thread;
use std::time::Duration;
use serde::Deserialize;
use config::Config;
use db::{Database, DatabaseConfig, TableConfig};

#[derive(Debug, Deserialize)]
struct ServerConfig {
@@ -25,6 +26,8 @@ struct ClientConfig {
struct Settings {
server: ServerConfig,
client: ClientConfig,
database: DatabaseConfig,
tables: Vec<TableConfig>,
}

fn load_config() -> Result<Settings, config::ConfigError> {
@@ -46,28 +49,63 @@ async fn main() {
}
};

// 连接数据库
let mut db = match Database::connect(&config.database).await {
Ok(db) => {
println!("数据库连接成功!");
db
}
Err(e) => {
println!("数据库连接失败: {}", e);
return;
}
};

// 测试数据库连接
match db.test_connection().await {
Ok(true) => println!("数据库连接测试成功"),
Ok(false) => println!("数据库连接测试失败"),
Err(e) => println!("数据库测试错误: {}", e),
}

// 获取配置的表数据
for table in &config.tables {
println!("\n开始获取表 {} 的数据...", table.name);
match db.get_table_data(table).await {
Ok(rows) => {
println!("成功获取 {} 条记录", rows.len());
// 将每行数据转换为JSON
for row in rows {
let json_data = db::format_row_as_json(&row);
println!("数据: {}", serde_json::to_string_pretty(&json_data).unwrap());
}
}
Err(e) => println!("获取表 {} 数据失败: {}", table.name, e),
}
}


return;
let mut retry_count = 0;
let server_addr = format!("{}:{}", config.server.host, config.server.port);

// TCP连接部分
while retry_count < config.client.max_retries {
println!("尝试连接服务器 {} (第{}次)...", server_addr, retry_count + 1);
println!("\n尝试连接服务器 {} (第{}次)...", server_addr, retry_count + 1);
match TcpStream::connect(&server_addr) {
Ok(mut stream) => {
println!("成功连接到服务器!");
// 设置读写超时
stream.set_read_timeout(Some(Duration::from_secs(config.client.read_timeout_secs))).unwrap();
stream.set_write_timeout(Some(Duration::from_secs(config.client.write_timeout_secs))).unwrap();

// 发送数据
let msg = "HelloOK";
match stream.write(msg.as_bytes()) {
Ok(_) => println!("成功发送消息: {}", msg),
Err(e) => println!("发送消息失败: {}", e),
}

// 接收数据
let mut buffer = [0; 1];
match stream.read(&mut buffer) {
Ok(n) => {


Loading…
Отказ
Запис