Selaa lähdekoodia

服务端更新以及发送端原数据发送

晋江
OCEAN 2 kuukautta sitten
vanhempi
commit
a6d93365d7
6 muutettua tiedostoa jossa 237 lisäystä ja 195 poistoa
  1. +1
    -101
      tcp_client/config.toml
  2. +98
    -28
      tcp_client/src/db.rs
  3. +2
    -1
      tcp_server/config.toml
  4. +136
    -65
      tcp_server/src/main.rs
  5. BIN
      tcp_server/target/debug/tcp_server.exe
  6. BIN
      tcp_server/target/debug/tcp_server.pdb

+ 1
- 101
tcp_client/config.toml Näytä tiedosto

@@ -18,108 +18,8 @@ user = "postgres"
password = "Auseft@2025qwer"

# 要同步的表配置

[[tables]]
name = "hy_instrument"
query = "SELECT * FROM \"hy_instrument\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_information"
query = "SELECT * FROM \"hy_information\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_record"
query = "SELECT * FROM \"hy_record\""
query = "SELECT * FROM hy_record where hy_code= 'R25041402C'"
incremental = false # 是否增量同步
key_field = "UpdateTime" # 增量同步的时间字段




[[tables]]
name = "hy_allot"
query = "SELECT * FROM \"hy_allot\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_cytask"
query = "SELECT * FROM \"hy_cytask\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_fullwatersample"
query = "SELECT * FROM \"hy_fullwatersample\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_informationnorm"
query = "SELECT * FROM \"hy_informationnorm\""
incremental = false
key_field = "UpdateTime"



[[tables]]
name = "hy_itemdetail"
query = "SELECT * FROM \"hy_itemdetail\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_laboratoryinstrument"
query = "SELECT * FROM \"hy_laboratoryinstrument\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_materialanalysis_type"
query = "SELECT * FROM \"hy_materialanalysis_type\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_materialdetail"
query = "SELECT * FROM \"hy_materialdetail\""
incremental = false
key_field = "UpdateTime"


[[tables]]
name = "hy_norm"
query = "SELECT * FROM \"hy_norm\""
incremental = false
key_field = "UpdateTime"


[[tables]]
name = "hy_weight_input"
query = "SELECT * FROM \"hy_weight_input\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_warmhumid"
query = "SELECT * FROM \"hy_warmhumid\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_task"
query = "SELECT * FROM \"hy_task\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_spotcheck"
query = "SELECT * FROM \"hy_spotcheck\""
incremental = false
key_field = "UpdateTime"



+ 98
- 28
tcp_client/src/db.rs Näytä tiedosto

@@ -97,40 +97,110 @@ pub fn format_row_as_json(row: &Row, table_name: &str) -> serde_json::Value {
for (i, column) in row.columns().iter().enumerate() {
let name = column.name();
let value = match column.type_().name() {
"int4" => serde_json::Value::Number(serde_json::Number::from(
row.try_get::<_, i32>(i).unwrap_or_default()
)),
"int8" => serde_json::Value::Number(serde_json::Number::from(
row.try_get::<_, i64>(i).unwrap_or_default()
)),
"float4" => {
let val: Option<f32> = row.try_get(i).ok();
match val.and_then(|v| serde_json::Number::from_f64(v as f64)) {
Some(n) => serde_json::Value::Number(n),
None => serde_json::Value::Null,
let type_name = column.type_().name();
println!("字段 {} 的类型是: {}", name, type_name);
let value = match type_name {
"int2" | "int4" | "int8" => {
match row.try_get::<_, Option<i64>>(i) {
Ok(Some(val)) => {
println!("成功读取整数: {} = {}", name, val);
serde_json::Value::Number(serde_json::Number::from(val))
},
Ok(None) => {
println!("字段 {} 为 null", name);
serde_json::Value::Null
},
Err(e) => {
println!("读取整数失败 {}: {}", name, e);
// 尝试其他整数类型
match row.try_get::<_, Option<i32>>(i) {
Ok(Some(val)) => {
println!("成功读取 i32: {} = {}", name, val);
serde_json::Value::Number(serde_json::Number::from(val))
},
Ok(None) => {
println!("字段 {} 为 null", name);
serde_json::Value::Null
},
Err(_) => {
match row.try_get::<_, Option<i16>>(i) {
Ok(Some(val)) => {
println!("成功读取 i16: {} = {}", name, val);
serde_json::Value::Number(serde_json::Number::from(val))
},
Ok(None) => {
println!("字段 {} 为 null", name);
serde_json::Value::Null
},
Err(_) => serde_json::Value::Null
}
}
}
}
}
},
"float8" => {
let val: Option<f64> = row.try_get(i).ok();
match val.and_then(|v| serde_json::Number::from_f64(v)) {
Some(n) => serde_json::Value::Number(n),
None => serde_json::Value::Null,
"numeric" | "float4" | "float8" => {
match row.try_get::<_, Option<f64>>(i) {
Ok(Some(val)) => {
println!("成功读取浮点数: {} = {}", name, val);
match serde_json::Number::from_f64(val) {
Some(n) => serde_json::Value::Number(n),
None => serde_json::Value::Null
}
},
Ok(None) => {
println!("字段 {} 为 null", name);
serde_json::Value::Null
},
Err(e) => {
println!("读取浮点数失败 {}: {}", name, e);
serde_json::Value::Null
}
}
},
"text" | "varchar" => serde_json::Value::String(
row.try_get::<_, String>(i).unwrap_or_default()
),
"bool" => serde_json::Value::Bool(
row.try_get::<_, bool>(i).unwrap_or_default()
),
"timestamptz" => {
match row.try_get::<_, DateTime<Utc>>(i) {
Ok(dt) => serde_json::Value::String(dt.to_rfc3339()),
Err(_) => serde_json::Value::Null,
"text" | "varchar" => {
match row.try_get::<_, Option<String>>(i) {
Ok(Some(val)) => {
println!("成功读取字符串: {} = {}", name, val);
serde_json::Value::String(val)
},
_ => {
println!("字段 {} 为 null", name);
serde_json::Value::Null
}
}
},
_ => serde_json::Value::Null,
"bool" => {
match row.try_get::<_, Option<bool>>(i) {
Ok(Some(val)) => {
println!("成功读取布尔值: {} = {}", name, val);
serde_json::Value::String(if val { "1".to_string() } else { "0".to_string() })
},
_ => {
println!("字段 {} 为 null", name);
serde_json::Value::Null
}
}
},
"timestamp" | "timestamptz" | "date" => {
println!("处理时间字段: {}", name);
match row.try_get::<_, Option<NaiveDateTime>>(i) {
Ok(Some(dt)) => {
let formatted = dt.format("%Y-%m-%d %H:%M:%S").to_string();
println!("成功读取本地时间: {} = {}", name, formatted);
serde_json::Value::String(formatted)
},
_ => {
println!("字段 {} 为 null", name);
serde_json::Value::Null
}
}
},
_ => {
println!("未知类型字段: {} (类型: {})", name, type_name);
serde_json::Value::Null
}
};
map.insert(name.to_string(), value);
}


+ 2
- 1
tcp_server/config.toml Näytä tiedosto

@@ -3,7 +3,8 @@ address = "127.0.0.1"
port = 9090

[database]
host = "10.180.4.100"
#host = "10.180.4.100"
host = "192.168.0.100"
port = 5432
name = "Auseft_RL_WEB_3"
user = "postgres"


+ 136
- 65
tcp_server/src/main.rs Näytä tiedosto

@@ -4,7 +4,7 @@ use config::Config;
use tokio_postgres::{NoTls, Error as PgError};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use chrono::{DateTime, Utc, NaiveDateTime, NaiveDate};
use chrono::{NaiveDateTime, NaiveDate};
use std::error::Error;
use rust_decimal::Decimal;

@@ -518,7 +518,6 @@ async fn connect_db(config: &Config) -> Result<tokio_postgres::Client, PgError>
}

async fn insert_instrument(client: &tokio_postgres::Client, info: &InstrumentInfo) -> Result<(), PgError> {
// 先检查 ID 是否存在
let exists = client
.query_one(
"SELECT EXISTS(SELECT 1 FROM public.hy_instrument WHERE id = $1)",
@@ -528,33 +527,35 @@ async fn insert_instrument(client: &tokio_postgres::Client, info: &InstrumentInf
.get::<_, bool>(0);

if exists {
println!("ID {} 已存在,跳过插入", info.id);
// 如果记录存在,执行更新操作
client.execute(
"UPDATE public.hy_instrument
SET laboratoryid = $2, name = $3, instrumentcode = $4, informationid = $5, specification = $6, remark = $7
WHERE id = $1",
&[
&info.id, &info.laboratoryid, &info.name, &info.instrumentcode,
&info.informationid, &info.specification, &info.remark,
],
).await?;
println!("成功更新仪器信息: ID {}", info.id);
return Ok(());
}

// ID 不存在,执行插入
client.execute(
"INSERT INTO public.hy_instrument (id, informationid, instrumentcode, laboratoryid, name, remark, specification)
OVERRIDING SYSTEM VALUE
"INSERT INTO public.hy_instrument (id, laboratoryid, name, instrumentcode, informationid, specification, remark)
VALUES ($1, $2, $3, $4, $5, $6, $7)",
&[
&info.id,
&info.informationid,
&info.instrumentcode,
&info.laboratoryid,
&info.name,
&info.remark,
&info.specification,
&info.id, &info.laboratoryid, &info.name, &info.instrumentcode,
&info.informationid, &info.specification, &info.remark,
],
)
.await?;
).await?;

println!("成功插入仪器信息: {} (ID: {})", info.instrumentcode, info.id);
println!("成功插入仪器信息: ID {}", info.id);
Ok(())
}

async fn insert_information(client: &tokio_postgres::Client, info: &InformationInfo) -> Result<(), PgError> {
// 先检查 ID 是否存在
let exists = client
.query_one(
"SELECT EXISTS(SELECT 1 FROM public.hy_information WHERE id = $1)",
@@ -564,31 +565,35 @@ async fn insert_information(client: &tokio_postgres::Client, info: &InformationI
.get::<_, bool>(0);

if exists {
println!("ID {} 已存在,跳过插入", info.id);
// 如果记录存在,执行更新操作
client.execute(
"UPDATE public.hy_information
SET hy_name = $2, flag = $3, laboratory_id = $4, analysistypeid = $5
WHERE id = $1",
&[
&info.id, &info.hy_name, &info.flag, &info.laboratory_id,
&info.analysistypeid,
],
).await?;
println!("成功更新分析信息: ID {}", info.id);
return Ok(());
}

// ID 不存在,执行插入
client.execute(
"INSERT INTO public.hy_information (id, hy_name, flag, laboratory_id, analysistypeid)
OVERRIDING SYSTEM VALUE
VALUES ($1, $2, $3, $4, $5)",
&[
&info.id,
&info.hy_name,
&info.flag,
&info.laboratory_id,
&info.id, &info.hy_name, &info.flag, &info.laboratory_id,
&info.analysistypeid,
],
)
.await?;
).await?;

println!("成功插入信息记录: ID {}", info.id);
println!("成功插入分析信息: ID {}", info.id);
Ok(())
}

async fn insert_record(client: &tokio_postgres::Client, info: &RecordInfo) -> Result<(), PgError> {
// 先检查 ID 是否存在
let exists = client
.query_one(
"SELECT EXISTS(SELECT 1 FROM public.hy_record WHERE id = $1)",
@@ -598,21 +603,48 @@ async fn insert_record(client: &tokio_postgres::Client, info: &RecordInfo) -> Re
.get::<_, bool>(0);

if exists {
println!("ID {} 已存在,跳过插入", info.id);
// 如果记录存在,执行更新操作
client.execute(
"UPDATE public.hy_record SET
hy_code = $2, type = $3, hy_check = $4, hy_approve = $5, check_time = $6,
approve_time = $7, approve_user = $8, check_user = $9, hy_time = $10,
hy_values = $11, accept_time = $12, accept_user = $13, mt = $14,
mad = $15, aad = $16, ad = $17, vad = $18, vd = $19, var = $20,
vdaf = $21, fcad = $22, st_ar = $23, st_ad = $24, st_d = $25,
had = $26, hd = $27, qb_ad = $28, qgr_ad = $29, qgr_d = $30,
qnet_ar_mj_kg = $31, qnet_ar_j_cal = $32, v = $33, aar = $34,
qnet_ar = $35, qnet_ar1 = $36, crc = $37, st_daf = $38, cad = $39,
cd = $40, isauto = $41, hy_type = $42, isnormal = $43
WHERE id = $1",
&[
&info.id, &info.hy_code, &info.r#type, &info.hy_check, &info.hy_approve,
&info.check_time, &info.approve_time, &info.approve_user, &info.check_user,
&info.hy_time, &info.hy_values, &info.accept_time, &info.accept_user,
&info.mt, &info.mad, &info.aad, &info.ad, &info.vad, &info.vd,
&info.var, &info.vdaf, &info.fcad, &info.st_ar, &info.st_ad,
&info.st_d, &info.had, &info.hd, &info.qb_ad, &info.qgr_ad,
&info.qgr_d, &info.qnet_ar_mj_kg, &info.qnet_ar_j_cal, &info.v,
&info.aar, &info.qnet_ar, &info.qnet_ar1, &info.crc, &info.st_daf,
&info.cad, &info.cd, &info.isauto, &info.hy_type, &info.isnormal
],
).await?;
println!("成功更新记录信息: ID {}", info.id);
return Ok(());
}

// ID 不存在,执行插入
client.execute(
"INSERT INTO public.hy_record (
id, hy_code, type, hy_check, hy_approve, check_time, approve_time,
approve_user, check_user, hy_time, hy_values, accept_time, accept_user,
mt, mad, aad, ad, vad, vd, var, vdaf, fcad, st_ar, st_ad, st_d,
had, hd, qb_ad, qgr_ad, qgr_d, qnet_ar_mj_kg, qnet_ar_j_cal, v,
aar, qnet_ar, qnet_ar1, crc, st_daf, cad, cd, isauto, hy_type, isnormal
)
OVERRIDING SYSTEM VALUE
VALUES (
id, hy_code, type, hy_check, hy_approve, check_time,
approve_time, approve_user, check_user, hy_time,
hy_values, accept_time, accept_user, mt,
mad, aad, ad, vad, vd, var,
vdaf, fcad, st_ar, st_ad, st_d,
had, hd, qb_ad, qgr_ad, qgr_d,
qnet_ar_mj_kg, qnet_ar_j_cal, v, aar,
qnet_ar, qnet_ar1, crc, st_daf, cad,
cd, isauto, hy_type, isnormal
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14,
$15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26,
$27, $28, $29, $30, $31, $32, $33, $34, $35, $36, $37, $38,
@@ -622,17 +654,16 @@ async fn insert_record(client: &tokio_postgres::Client, info: &RecordInfo) -> Re
&info.id, &info.hy_code, &info.r#type, &info.hy_check, &info.hy_approve,
&info.check_time, &info.approve_time, &info.approve_user, &info.check_user,
&info.hy_time, &info.hy_values, &info.accept_time, &info.accept_user,
&info.mt, &info.mad, &info.aad, &info.ad, &info.vad, &info.vd, &info.var,
&info.vdaf, &info.fcad, &info.st_ar, &info.st_ad, &info.st_d, &info.had,
&info.hd, &info.qb_ad, &info.qgr_ad, &info.qgr_d, &info.qnet_ar_mj_kg,
&info.qnet_ar_j_cal, &info.v, &info.aar, &info.qnet_ar, &info.qnet_ar1,
&info.crc, &info.st_daf, &info.cad, &info.cd, &info.isauto, &info.hy_type,
&info.isnormal
&info.mt, &info.mad, &info.aad, &info.ad, &info.vad, &info.vd,
&info.var, &info.vdaf, &info.fcad, &info.st_ar, &info.st_ad,
&info.st_d, &info.had, &info.hd, &info.qb_ad, &info.qgr_ad,
&info.qgr_d, &info.qnet_ar_mj_kg, &info.qnet_ar_j_cal, &info.v,
&info.aar, &info.qnet_ar, &info.qnet_ar1, &info.crc, &info.st_daf,
&info.cad, &info.cd, &info.isauto, &info.hy_type, &info.isnormal
],
)
.await?;
).await?;

println!("成功插入记录: ID {}", info.id);
println!("成功插入记录信息: ID {}", info.id);
Ok(())
}

@@ -646,22 +677,31 @@ async fn insert_cytask(client: &tokio_postgres::Client, info: &HyCyTask) -> Resu
.get::<_, bool>(0);

if exists {
println!("ID {} 已存在,跳过插入", info.id);
// 如果记录存在,执行更新操作
client.execute(
"UPDATE public.hy_cytask
SET task_code = $2, task_type = $3, task_status = $4, create_time = $5, create_user = $6, remark = $7
WHERE id = $1",
&[
&info.id, &info.task_code, &info.task_type, &info.task_status,
&info.create_time, &info.create_user, &info.remark,
],
).await?;
println!("成功更新采样任务信息: ID {}", info.id);
return Ok(());
}

// ID 不存在,执行插入
client.execute(
"INSERT INTO public.hy_cytask (id, task_code, task_type, task_status, create_time, create_user, remark)
OVERRIDING SYSTEM VALUE
VALUES ($1, $2, $3, $4, $5, $6, $7)",
&[
&info.id, &info.task_code, &info.task_type, &info.task_status,
&info.create_time, &info.create_user, &info.remark,
],
)
.await?;
).await?;

println!("成功插入采样任务: ID {}", info.id);
println!("成功插入采样任务信息: ID {}", info.id);
Ok(())
}

@@ -675,20 +715,29 @@ async fn insert_allot(client: &tokio_postgres::Client, info: &HyAllot) -> Result
.get::<_, bool>(0);

if exists {
println!("ID {} 已存在,跳过插入", info.id);
// 如果记录存在,执行更新操作
client.execute(
"UPDATE public.hy_allot
SET userid = $2, username = $3, informationid = $4, allottime = $5, hy_code = $6, hy_type = $7, hy_method = $8, hy_quest = $9
WHERE id = $1",
&[
&info.id, &info.userid, &info.username, &info.informationid, &info.allottime,
&info.hy_code, &info.hy_type, &info.hy_method, &info.hy_quest,
],
).await?;
println!("成功更新分配信息: ID {}", info.id);
return Ok(());
}

// ID 不存在,执行插入
client.execute(
"INSERT INTO public.hy_allot (id, userid, username, informationid, allottime, hy_code, hy_type, hy_method, hy_quest)
OVERRIDING SYSTEM VALUE
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
&[
&info.id, &info.userid, &info.username, &info.informationid, &info.allottime,
&info.hy_code, &info.hy_type, &info.hy_method, &info.hy_quest,
],
)
.await?;
).await?;

println!("成功插入分配信息: ID {}", info.id);
Ok(())
@@ -793,8 +842,8 @@ async fn insert_spotcheck(client: &tokio_postgres::Client, info: &HySpotcheck) -
st_ad, st_d, qb_ad, had, qnet_ar, qnet_ar1, qgr_d, qgr_ad, vd,
aar, st_ar, hd, fcad, crc, st_daf
) OVERRIDING SYSTEM VALUE VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14,
$15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27,
$28, $29, $30, $31, $32, $33)",
$15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26,
$27, $28, $29, $30, $31, $32, $33)",
&[
&info.id, &info.spotcheck_code, &info.spotcheck_user, &info.spotcheck_time,
&info.spotcheck_type, &info.coal_sample_code, &info.sample_custodian,
@@ -969,13 +1018,21 @@ async fn insert_material_analysis_type(client: &tokio_postgres::Client, info: &H
.get::<_, bool>(0);

if exists {
println!("ID {} 已存在,跳过插入", info.id);
// 如果记录存在,执行更新操作
client.execute(
"UPDATE public.hy_materialanalysis_type
SET name = $2, flag = $3, sort = $4, createtime = $5, createuser = $6
WHERE id = $1",
&[&info.id, &info.name, &info.flag, &info.sort, &info.createtime, &info.createuser],
).await?;
println!("成功更新物料分析类型: ID {}", info.id);
return Ok(());
}

// ID 不存在,执行插入
client.execute(
"INSERT INTO public.hy_materialanalysis_type (id, name, flag, sort, createtime, createuser)
OVERRIDING SYSTEM VALUE VALUES ($1, $2, $3, $4, $5, $6)",
VALUES ($1, $2, $3, $4, $5, $6)",
&[&info.id, &info.name, &info.flag, &info.sort, &info.createtime, &info.createuser],
).await?;

@@ -993,13 +1050,23 @@ async fn insert_material_detail(client: &tokio_postgres::Client, info: &HyMateri
.get::<_, bool>(0);

if exists {
println!("ID {} 已存在,跳过插入", info.id);
// 如果记录存在,执行更新操作
client.execute(
"UPDATE public.hy_materialdetail
SET name = $2, flag = $3, sort = $4, createtime = $5, createuser = $6, analysistypeid = $7, materialid = $8
WHERE id = $1",
&[
&info.id, &info.name, &info.flag, &info.sort, &info.createtime,
&info.createuser, &info.analysistypeid, &info.materialid
],
).await?;
println!("成功更新物料明细: ID {}", info.id);
return Ok(());
}

// ID 不存在,执行插入
client.execute(
"INSERT INTO public.hy_materialdetail (id, name, flag, sort, createtime, createuser, analysistypeid, materialid)
OVERRIDING SYSTEM VALUE
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
&[
&info.id, &info.name, &info.flag, &info.sort, &info.createtime,
@@ -1021,18 +1088,22 @@ async fn insert_norm(client: &tokio_postgres::Client, info: &HyNorm) -> Result<(
.get::<_, bool>(0);

if exists {
println!("ID {} 已存在,跳过插入", info.id);
// 如果记录存在,执行更新操作
client.execute(
"UPDATE public.hy_norm
SET norm_id = $2, zbvalues = $3, itemdetail_id = $4, hy_user = $5, checktime = $6, explain = $7
WHERE id = $1",
&[&info.id, &info.norm_id, &info.zbvalues, &info.itemdetail_id, &info.hy_user, &info.checktime, &info.explain],
).await?;
println!("成功更新化验规范: ID {}", info.id);
return Ok(());
}

// ID 不存在,执行插入
client.execute(
"INSERT INTO public.hy_norm (id, norm_id, zbvalues, itemdetail_id, hy_user, checktime, explain)
OVERRIDING SYSTEM VALUE
VALUES ($1, $2, $3, $4, $5, $6, $7)",
&[
&info.id, &info.norm_id, &info.zbvalues, &info.itemdetail_id,
&info.hy_user, &info.checktime, &info.explain
],
&[&info.id, &info.norm_id, &info.zbvalues, &info.itemdetail_id, &info.hy_user, &info.checktime, &info.explain],
).await?;

println!("成功插入化验规范: ID {}", info.id);


BIN
tcp_server/target/debug/tcp_server.exe Näytä tiedosto


BIN
tcp_server/target/debug/tcp_server.pdb Näytä tiedosto


Loading…
Peruuta
Tallenna