瀏覽代碼

增加erp_total_shipping_plan

晋江
OCEAN 1 月之前
父節點
當前提交
4f99888874
共有 5 個文件被更改,包括 166 次插入21 次删除
  1. +3
    -3
      tcp_client/config.toml
  2. +11
    -2
      tcp_client/src/db.rs
  3. +152
    -16
      tcp_server/src/main.rs
  4. 二進制
      tcp_server/target/debug/tcp_server.exe
  5. 二進制
      tcp_server/target/debug/tcp_server.pdb

+ 3
- 3
tcp_client/config.toml 查看文件

@@ -22,7 +22,7 @@ password = "Auseft@2025qwer"
# 要同步的表配置

[[tables]]
name = "erp_shipping_details"
query = "SELECT id, plan_id, vehicle_id, vehicle_number, gross_weight::text as gross_weight, tare_weight::text as tare_weight, net_weight::text as net_weight, card_number, appointment_number, appointment_time, appointment_weight::text as appointment_weight, is_cancel, is_end, driver_phone, remarks, create_time, create_by, update_time, update_by FROM public.erp_shipping_details"
name = "erp_total_shipping_plan"
query = "SELECT id, order_number, start_time, end_time, finish, ship_name, voyage, total_quantity::text as total_quantity, deviation_quantity::text as deviation_quantity, supplier_code, supplier_name, material_code, material_name, shipping_station_code, shipping_station_name, departure_port, departure_time, port_destination, berthing_time, supplier_id, material_id FROM public.erp_total_shipping_plan"
incremental = false
key_field = "UpdateTime"
key_field = "UpdateTime"

+ 11
- 2
tcp_client/src/db.rs 查看文件

@@ -1,7 +1,7 @@
use tokio_postgres::{NoTls, Error, Row};
use serde::Deserialize;
use std::collections::HashMap;
use chrono::{DateTime, Utc, NaiveDateTime};
use chrono::{DateTime, Utc, NaiveDateTime, NaiveDate};
use bigdecimal::BigDecimal;

#[derive(Debug, Deserialize, Clone)]
@@ -147,7 +147,7 @@ pub fn format_row_as_json(row: &Row, table_name: &str) -> serde_json::Value {
Ok(Some(val)) => serde_json::Value::String(if val { "1".to_string() } else { "0".to_string() }),
_ => serde_json::Value::Null
},
"timestamp" | "timestamptz" | "date" => {
"timestamp" | "timestamptz" => {
match row.try_get::<_, Option<NaiveDateTime>>(i) {
Ok(Some(dt)) => {
let formatted = dt.format("%Y-%m-%dT%H:%M:%S").to_string();
@@ -156,6 +156,15 @@ pub fn format_row_as_json(row: &Row, table_name: &str) -> serde_json::Value {
_ => serde_json::Value::Null
}
},
"date" => {
match row.try_get::<_, Option<NaiveDate>>(i) {
Ok(Some(dt)) => {
let formatted = dt.format("%Y-%m-%d").to_string();
serde_json::Value::String(formatted)
},
_ => serde_json::Value::Null
}
},
_ => serde_json::Value::Null
};
data_map.insert(name.to_string(), value);


+ 152
- 16
tcp_server/src/main.rs 查看文件

@@ -1,8 +1,9 @@
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use config::Config;
use tokio_postgres::{NoTls, Error as PgError};
use tokio_postgres::{Error as PgError, NoTls};
use serde::{Deserialize, Serialize, Deserializer};
use std::str::FromStr;

fn deserialize_string_to_bool<'de, D>(deserializer: D) -> Result<bool, D::Error>
where
@@ -570,6 +571,37 @@ struct ErpShippingPlan {
warehouse_name: Option<String>
}

#[derive(Debug, Deserialize)]
struct ErpTotalShippingPlan {
id: i32,
order_number: String,
#[serde(deserialize_with = "deserialize_option_date")]
start_time: Option<NaiveDate>,
#[serde(deserialize_with = "deserialize_option_date")]
end_time: Option<NaiveDate>,
finish: Option<i16>,
ship_name: Option<String>,
voyage: Option<String>,
#[serde(deserialize_with = "deserialize_option_decimal")]
total_quantity: Option<Decimal>,
#[serde(deserialize_with = "deserialize_option_decimal")]
deviation_quantity: Option<Decimal>,
supplier_code: Option<String>,
supplier_name: Option<String>,
material_code: Option<String>,
material_name: Option<String>,
shipping_station_code: Option<String>,
shipping_station_name: Option<String>,
departure_port: Option<String>,
#[serde(deserialize_with = "deserialize_option_date")]
departure_time: Option<NaiveDate>,
port_destination: Option<String>,
#[serde(deserialize_with = "deserialize_option_date")]
berthing_time: Option<NaiveDate>,
supplier_id: Option<i32>,
material_id: Option<i32>
}

#[derive(Debug, Deserialize)]
struct ErpShippingDetails {
id: i64,
@@ -1004,7 +1036,7 @@ async fn insert_erp_shipping_plan(client: &tokio_postgres::Client, info: &ErpShi
// Insert new record, note that id is GENERATED ALWAYS AS IDENTITY
client.execute(
"INSERT INTO public.erp_shipping_plan (
cust_id, material_id, plan_type, plan_start_date,
id, cust_id, material_id, plan_type, plan_start_date,
plan_end_date, plan_days, plan_quantity, through_put,
calorific_value, sulfur_content, transport_vehicles,
sample_code, batch_code, update_by, update_time,
@@ -1016,7 +1048,7 @@ async fn insert_erp_shipping_plan(client: &tokio_postgres::Client, info: &ErpShi
$14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24,
$25, $26, $27, $28, $29, $30)",
&[
&info.cust_id, &info.material_id, &info.plan_type,
&info.id, &info.cust_id, &info.material_id, &info.plan_type,
&info.plan_start_date, &info.plan_end_date, &info.plan_days,
&info.plan_quantity, &info.through_put, &info.calorific_value,
&info.sulfur_content, &info.transport_vehicles, &info.sample_code,
@@ -1031,12 +1063,100 @@ async fn insert_erp_shipping_plan(client: &tokio_postgres::Client, info: &ErpShi
Ok(())
}

fn deserialize_option_date<'de, D>(deserializer: D) -> Result<Option<NaiveDate>, D::Error>
where
D: Deserializer<'de>,
{
let s: Option<String> = Option::deserialize(deserializer)?;
match s {
Some(s) if s.is_empty() => Ok(None),
Some(s) => NaiveDate::parse_from_str(&s, "%Y-%m-%d")
.map(Some)
.map_err(serde::de::Error::custom),
None => Ok(None),
}
}

fn deserialize_option_decimal<'de, D>(deserializer: D) -> Result<Option<Decimal>, D::Error>
where
D: Deserializer<'de>,
{
let s: Option<String> = Option::deserialize(deserializer)?;
match s {
Some(s) if s.is_empty() => Ok(None),
Some(s) => Decimal::from_str(&s)
.map(Some)
.map_err(serde::de::Error::custom),
None => Ok(None),
}
}

async fn insert_erp_total_shipping_plan(client: &tokio_postgres::Client, info: &ErpTotalShippingPlan) -> Result<(), PgError> {
// Check if record exists using order_number
let exists = client
.query_one(
"SELECT COUNT(*) FROM public.erp_total_shipping_plan WHERE id = $1",
&[&info.id]
)
.await?
.get::<_, i64>(0) > 0;

if exists {
// Update existing record
client.execute(
"UPDATE public.erp_total_shipping_plan SET
start_time = $1, end_time = $2, finish = $3, ship_name = $4,
voyage = $5, total_quantity = $6, deviation_quantity = $7,
supplier_code = $8, supplier_name = $9, material_code = $10,
material_name = $11, shipping_station_code = $12,
shipping_station_name = $13, departure_port = $14,
departure_time = $15, port_destination = $16, berthing_time = $17,
supplier_id = $18, material_id = $19,order_number = $20
WHERE id = $21",
&[
&info.start_time, &info.end_time, &info.finish, &info.ship_name,
&info.voyage, &info.total_quantity, &info.deviation_quantity,
&info.supplier_code, &info.supplier_name, &info.material_code,
&info.material_name, &info.shipping_station_code,
&info.shipping_station_name, &info.departure_port,
&info.departure_time, &info.port_destination, &info.berthing_time,
&info.supplier_id, &info.material_id, &info.order_number,
&info.id
],
).await?;
} else {
// Insert new record
client.execute(
"INSERT INTO public.erp_total_shipping_plan (id,
order_number, start_time, end_time, finish, ship_name,
voyage, total_quantity, deviation_quantity, supplier_code,
supplier_name, material_code, material_name,
shipping_station_code, shipping_station_name, departure_port,
departure_time, port_destination, berthing_time,
supplier_id, material_id
) OVERRIDING SYSTEM VALUE VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12,
$13, $14, $15, $16, $17, $18, $19, $20, $21)",
&[
&info.id,
&info.order_number, &info.start_time, &info.end_time, &info.finish,
&info.ship_name, &info.voyage, &info.total_quantity,
&info.deviation_quantity, &info.supplier_code, &info.supplier_name,
&info.material_code, &info.material_name, &info.shipping_station_code,
&info.shipping_station_name, &info.departure_port, &info.departure_time,
&info.port_destination, &info.berthing_time, &info.supplier_id,
&info.material_id
],
).await?;
}
Ok(())
}

async fn insert_erp_shipping_details(client: &tokio_postgres::Client, info: &ErpShippingDetails) -> Result<(), PgError> {
// Check if record exists using plan_id and vehicle_number
let exists = client
.query_one(
"SELECT COUNT(*) FROM public.erp_shipping_details WHERE plan_id = $1 ",
&[&info.plan_id]
"SELECT COUNT(*) FROM public.erp_shipping_details WHERE id=$1",
&[&info.id]
)
.await?
.get::<_, i64>(0) > 0;
@@ -1050,35 +1170,36 @@ async fn insert_erp_shipping_details(client: &tokio_postgres::Client, info: &Erp
appointment_time = $7, appointment_weight = $8, is_cancel = $9,
is_end = $10, driver_phone = $11, remarks = $12,
create_time = $13, create_by = $14, update_time = $15,
update_by = $16, vehicle_number = $18,plan_id = $17
WHERE id = $19 ",
update_by = $16,plan_id = $17 AND vehicle_number = $18
WHERE id = $19",
&[
&info.vehicle_id, &info.gross_weight, &info.tare_weight,
&info.net_weight, &info.card_number, &info.appointment_number,
&info.appointment_time, &info.appointment_weight, &info.is_cancel,
&info.is_end, &info.driver_phone, &info.remarks,
&info.create_time, &info.create_by, &info.update_time,
&info.update_by, &info.plan_id, &info.vehicle_number,&info.id
&info.update_by, &info.plan_id, &info.vehicle_number,
&info.id
],
).await?;
} else {
// Insert new record
client.execute(
"INSERT INTO public.erp_shipping_details (
"INSERT INTO public.erp_shipping_details (id,
plan_id, vehicle_id, vehicle_number, gross_weight,
tare_weight, net_weight, card_number, appointment_number,
appointment_time, appointment_weight, is_cancel, is_end,
driver_phone, remarks, create_time, create_by,
update_time, update_by,id
) OVERRIDING SYSTEM VALUE VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12,
$13, $14, $15, $16, $17, $18,$19)",
update_time, update_by
) OVERRIDING SYSTEM VALUE VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12,
$13, $14, $15, $16, $17, $18)",
&[
&info.plan_id, &info.vehicle_id, &info.vehicle_number,
&info.id, &info.plan_id, &info.vehicle_id, &info.vehicle_number,
&info.gross_weight, &info.tare_weight, &info.net_weight,
&info.card_number, &info.appointment_number, &info.appointment_time,
&info.appointment_weight, &info.is_cancel, &info.is_end,
&info.driver_phone, &info.remarks, &info.create_time,
&info.create_by, &info.update_time, &info.update_by, &info.id
&info.create_by, &info.update_time, &info.update_by
],
).await?;
}
@@ -1661,7 +1782,7 @@ async fn insert_comm_sp_delivering_sampling(client: &tokio_postgres::Client, inf
sampling_weight, sampling_weights, sampling_remark, sampler, sample_time,
creator_id, creation_time, last_modifier_id, last_modification_reason,
last_modification_time, deleted, deleter_id, deletion_reason, deletion_time
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16,
) OVERRIDING SYSTEM VALUE VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16,
$17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30,
$31, $32, $33, $34, $35, $36)",
&[
@@ -1735,7 +1856,7 @@ async fn insert_comm_bch_delivering_sampling(client: &tokio_postgres::Client, in
sampling_weight, sampling_weights, sampling_remark, sampler, sample_time,
creator_id, creation_time, last_modifier_id, last_modification_reason,
last_modification_time, deleted, deleter_id, deletion_reason
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16,
) OVERRIDING SYSTEM VALUE VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16,
$17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30,
$31, $32, $33, $34, $35)",
&[
@@ -2359,6 +2480,21 @@ async fn handle_client(socket: &mut TcpStream, client: &tokio_postgres::Client)
false
}
},
"erp_total_shipping_plan" => {
if let Ok(info) = serde_json::from_str::<ErpTotalShippingPlan>(data_str) {
println!("接收到总发运计划信息: {:?}", info);
match insert_erp_total_shipping_plan(client, &info).await {
Ok(_) => true,
Err(e) => {
eprintln!("插入总发运计划信息失败: {}", e);
false
}
}
} else {
eprintln!("解析总发运计划信息失败");
false
}
},
_ => {
eprintln!("未知的表名: {}", table_name);
false


二進制
tcp_server/target/debug/tcp_server.exe 查看文件


二進制
tcp_server/target/debug/tcp_server.pdb 查看文件


Loading…
取消
儲存