diff --git a/tcp_client/config.toml b/tcp_client/config.toml index aca9148..29be5d1 100644 --- a/tcp_client/config.toml +++ b/tcp_client/config.toml @@ -22,7 +22,7 @@ password = "Auseft@2025qwer" # 要同步的表配置 [[tables]] -name = "erp_shipping_details" -query = "SELECT id, plan_id, vehicle_id, vehicle_number, gross_weight::text as gross_weight, tare_weight::text as tare_weight, net_weight::text as net_weight, card_number, appointment_number, appointment_time, appointment_weight::text as appointment_weight, is_cancel, is_end, driver_phone, remarks, create_time, create_by, update_time, update_by FROM public.erp_shipping_details" +name = "erp_total_shipping_plan" +query = "SELECT id, order_number, start_time, end_time, finish, ship_name, voyage, total_quantity::text as total_quantity, deviation_quantity::text as deviation_quantity, supplier_code, supplier_name, material_code, material_name, shipping_station_code, shipping_station_name, departure_port, departure_time, port_destination, berthing_time, supplier_id, material_id FROM public.erp_total_shipping_plan" incremental = false -key_field = "UpdateTime" +key_field = "UpdateTime" \ No newline at end of file diff --git a/tcp_client/src/db.rs b/tcp_client/src/db.rs index 4aa1822..9894a7c 100644 --- a/tcp_client/src/db.rs +++ b/tcp_client/src/db.rs @@ -1,7 +1,7 @@ use tokio_postgres::{NoTls, Error, Row}; use serde::Deserialize; use std::collections::HashMap; -use chrono::{DateTime, Utc, NaiveDateTime}; +use chrono::{DateTime, Utc, NaiveDateTime, NaiveDate}; use bigdecimal::BigDecimal; #[derive(Debug, Deserialize, Clone)] @@ -147,7 +147,7 @@ pub fn format_row_as_json(row: &Row, table_name: &str) -> serde_json::Value { Ok(Some(val)) => serde_json::Value::String(if val { "1".to_string() } else { "0".to_string() }), _ => serde_json::Value::Null }, - "timestamp" | "timestamptz" | "date" => { + "timestamp" | "timestamptz" => { match row.try_get::<_, Option>(i) { Ok(Some(dt)) => { let formatted = dt.format("%Y-%m-%dT%H:%M:%S").to_string(); @@ -156,6 +156,15 @@ pub fn format_row_as_json(row: &Row, table_name: &str) -> serde_json::Value { _ => serde_json::Value::Null } }, + "date" => { + match row.try_get::<_, Option>(i) { + Ok(Some(dt)) => { + let formatted = dt.format("%Y-%m-%d").to_string(); + serde_json::Value::String(formatted) + }, + _ => serde_json::Value::Null + } + }, _ => serde_json::Value::Null }; data_map.insert(name.to_string(), value); diff --git a/tcp_server/src/main.rs b/tcp_server/src/main.rs index 05e2589..48b5225 100644 --- a/tcp_server/src/main.rs +++ b/tcp_server/src/main.rs @@ -1,8 +1,9 @@ use tokio::net::{TcpListener, TcpStream}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use config::Config; -use tokio_postgres::{NoTls, Error as PgError}; +use tokio_postgres::{Error as PgError, NoTls}; use serde::{Deserialize, Serialize, Deserializer}; +use std::str::FromStr; fn deserialize_string_to_bool<'de, D>(deserializer: D) -> Result where @@ -570,6 +571,37 @@ struct ErpShippingPlan { warehouse_name: Option } +#[derive(Debug, Deserialize)] +struct ErpTotalShippingPlan { + id: i32, + order_number: String, + #[serde(deserialize_with = "deserialize_option_date")] + start_time: Option, + #[serde(deserialize_with = "deserialize_option_date")] + end_time: Option, + finish: Option, + ship_name: Option, + voyage: Option, + #[serde(deserialize_with = "deserialize_option_decimal")] + total_quantity: Option, + #[serde(deserialize_with = "deserialize_option_decimal")] + deviation_quantity: Option, + supplier_code: Option, + supplier_name: Option, + material_code: Option, + material_name: Option, + shipping_station_code: Option, + shipping_station_name: Option, + departure_port: Option, + #[serde(deserialize_with = "deserialize_option_date")] + departure_time: Option, + port_destination: Option, + #[serde(deserialize_with = "deserialize_option_date")] + berthing_time: Option, + supplier_id: Option, + material_id: Option +} + #[derive(Debug, Deserialize)] struct ErpShippingDetails { id: i64, @@ -1004,7 +1036,7 @@ async fn insert_erp_shipping_plan(client: &tokio_postgres::Client, info: &ErpShi // Insert new record, note that id is GENERATED ALWAYS AS IDENTITY client.execute( "INSERT INTO public.erp_shipping_plan ( - cust_id, material_id, plan_type, plan_start_date, + id, cust_id, material_id, plan_type, plan_start_date, plan_end_date, plan_days, plan_quantity, through_put, calorific_value, sulfur_content, transport_vehicles, sample_code, batch_code, update_by, update_time, @@ -1016,7 +1048,7 @@ async fn insert_erp_shipping_plan(client: &tokio_postgres::Client, info: &ErpShi $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30)", &[ - &info.cust_id, &info.material_id, &info.plan_type, + &info.id, &info.cust_id, &info.material_id, &info.plan_type, &info.plan_start_date, &info.plan_end_date, &info.plan_days, &info.plan_quantity, &info.through_put, &info.calorific_value, &info.sulfur_content, &info.transport_vehicles, &info.sample_code, @@ -1031,12 +1063,100 @@ async fn insert_erp_shipping_plan(client: &tokio_postgres::Client, info: &ErpShi Ok(()) } +fn deserialize_option_date<'de, D>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + let s: Option = Option::deserialize(deserializer)?; + match s { + Some(s) if s.is_empty() => Ok(None), + Some(s) => NaiveDate::parse_from_str(&s, "%Y-%m-%d") + .map(Some) + .map_err(serde::de::Error::custom), + None => Ok(None), + } +} + +fn deserialize_option_decimal<'de, D>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + let s: Option = Option::deserialize(deserializer)?; + match s { + Some(s) if s.is_empty() => Ok(None), + Some(s) => Decimal::from_str(&s) + .map(Some) + .map_err(serde::de::Error::custom), + None => Ok(None), + } +} + +async fn insert_erp_total_shipping_plan(client: &tokio_postgres::Client, info: &ErpTotalShippingPlan) -> Result<(), PgError> { + // Check if record exists using order_number + let exists = client + .query_one( + "SELECT COUNT(*) FROM public.erp_total_shipping_plan WHERE id = $1", + &[&info.id] + ) + .await? + .get::<_, i64>(0) > 0; + + if exists { + // Update existing record + client.execute( + "UPDATE public.erp_total_shipping_plan SET + start_time = $1, end_time = $2, finish = $3, ship_name = $4, + voyage = $5, total_quantity = $6, deviation_quantity = $7, + supplier_code = $8, supplier_name = $9, material_code = $10, + material_name = $11, shipping_station_code = $12, + shipping_station_name = $13, departure_port = $14, + departure_time = $15, port_destination = $16, berthing_time = $17, + supplier_id = $18, material_id = $19,order_number = $20 + WHERE id = $21", + &[ + &info.start_time, &info.end_time, &info.finish, &info.ship_name, + &info.voyage, &info.total_quantity, &info.deviation_quantity, + &info.supplier_code, &info.supplier_name, &info.material_code, + &info.material_name, &info.shipping_station_code, + &info.shipping_station_name, &info.departure_port, + &info.departure_time, &info.port_destination, &info.berthing_time, + &info.supplier_id, &info.material_id, &info.order_number, + &info.id + ], + ).await?; + } else { + // Insert new record + client.execute( + "INSERT INTO public.erp_total_shipping_plan (id, + order_number, start_time, end_time, finish, ship_name, + voyage, total_quantity, deviation_quantity, supplier_code, + supplier_name, material_code, material_name, + shipping_station_code, shipping_station_name, departure_port, + departure_time, port_destination, berthing_time, + supplier_id, material_id + ) OVERRIDING SYSTEM VALUE VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, + $13, $14, $15, $16, $17, $18, $19, $20, $21)", + &[ + &info.id, + &info.order_number, &info.start_time, &info.end_time, &info.finish, + &info.ship_name, &info.voyage, &info.total_quantity, + &info.deviation_quantity, &info.supplier_code, &info.supplier_name, + &info.material_code, &info.material_name, &info.shipping_station_code, + &info.shipping_station_name, &info.departure_port, &info.departure_time, + &info.port_destination, &info.berthing_time, &info.supplier_id, + &info.material_id + ], + ).await?; + } + Ok(()) +} + async fn insert_erp_shipping_details(client: &tokio_postgres::Client, info: &ErpShippingDetails) -> Result<(), PgError> { // Check if record exists using plan_id and vehicle_number let exists = client .query_one( - "SELECT COUNT(*) FROM public.erp_shipping_details WHERE plan_id = $1 ", - &[&info.plan_id] + "SELECT COUNT(*) FROM public.erp_shipping_details WHERE id=$1", + &[&info.id] ) .await? .get::<_, i64>(0) > 0; @@ -1050,35 +1170,36 @@ async fn insert_erp_shipping_details(client: &tokio_postgres::Client, info: &Erp appointment_time = $7, appointment_weight = $8, is_cancel = $9, is_end = $10, driver_phone = $11, remarks = $12, create_time = $13, create_by = $14, update_time = $15, - update_by = $16, vehicle_number = $18,plan_id = $17 - WHERE id = $19 ", + update_by = $16,plan_id = $17 AND vehicle_number = $18 + WHERE id = $19", &[ &info.vehicle_id, &info.gross_weight, &info.tare_weight, &info.net_weight, &info.card_number, &info.appointment_number, &info.appointment_time, &info.appointment_weight, &info.is_cancel, &info.is_end, &info.driver_phone, &info.remarks, &info.create_time, &info.create_by, &info.update_time, - &info.update_by, &info.plan_id, &info.vehicle_number,&info.id + &info.update_by, &info.plan_id, &info.vehicle_number, + &info.id ], ).await?; } else { // Insert new record client.execute( - "INSERT INTO public.erp_shipping_details ( + "INSERT INTO public.erp_shipping_details (id, plan_id, vehicle_id, vehicle_number, gross_weight, tare_weight, net_weight, card_number, appointment_number, appointment_time, appointment_weight, is_cancel, is_end, driver_phone, remarks, create_time, create_by, - update_time, update_by,id - ) OVERRIDING SYSTEM VALUE VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, - $13, $14, $15, $16, $17, $18,$19)", + update_time, update_by + ) OVERRIDING SYSTEM VALUE VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, + $13, $14, $15, $16, $17, $18)", &[ - &info.plan_id, &info.vehicle_id, &info.vehicle_number, + &info.id, &info.plan_id, &info.vehicle_id, &info.vehicle_number, &info.gross_weight, &info.tare_weight, &info.net_weight, &info.card_number, &info.appointment_number, &info.appointment_time, &info.appointment_weight, &info.is_cancel, &info.is_end, &info.driver_phone, &info.remarks, &info.create_time, - &info.create_by, &info.update_time, &info.update_by, &info.id + &info.create_by, &info.update_time, &info.update_by ], ).await?; } @@ -1661,7 +1782,7 @@ async fn insert_comm_sp_delivering_sampling(client: &tokio_postgres::Client, inf sampling_weight, sampling_weights, sampling_remark, sampler, sample_time, creator_id, creation_time, last_modifier_id, last_modification_reason, last_modification_time, deleted, deleter_id, deletion_reason, deletion_time - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, + ) OVERRIDING SYSTEM VALUE VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, $34, $35, $36)", &[ @@ -1735,7 +1856,7 @@ async fn insert_comm_bch_delivering_sampling(client: &tokio_postgres::Client, in sampling_weight, sampling_weights, sampling_remark, sampler, sample_time, creator_id, creation_time, last_modifier_id, last_modification_reason, last_modification_time, deleted, deleter_id, deletion_reason - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, + ) OVERRIDING SYSTEM VALUE VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, $34, $35)", &[ @@ -2359,6 +2480,21 @@ async fn handle_client(socket: &mut TcpStream, client: &tokio_postgres::Client) false } }, + "erp_total_shipping_plan" => { + if let Ok(info) = serde_json::from_str::(data_str) { + println!("接收到总发运计划信息: {:?}", info); + match insert_erp_total_shipping_plan(client, &info).await { + Ok(_) => true, + Err(e) => { + eprintln!("插入总发运计划信息失败: {}", e); + false + } + } + } else { + eprintln!("解析总发运计划信息失败"); + false + } + }, _ => { eprintln!("未知的表名: {}", table_name); false diff --git a/tcp_server/target/debug/tcp_server.exe b/tcp_server/target/debug/tcp_server.exe index e0d7520..1eb3874 100644 Binary files a/tcp_server/target/debug/tcp_server.exe and b/tcp_server/target/debug/tcp_server.exe differ diff --git a/tcp_server/target/debug/tcp_server.pdb b/tcp_server/target/debug/tcp_server.pdb index 4a7c74e..9260091 100644 Binary files a/tcp_server/target/debug/tcp_server.pdb and b/tcp_server/target/debug/tcp_server.pdb differ