diff --git a/tcp_client/config.toml b/tcp_client/config.toml index e7fad58..8fa73db 100644 --- a/tcp_client/config.toml +++ b/tcp_client/config.toml @@ -4,7 +4,7 @@ host = "127.0.0.1" port = 9090 [client] -max_retries = 3 +max_retries = 2 retry_delay_secs = 3 read_timeout_secs = 5 write_timeout_secs = 5 @@ -17,6 +17,7 @@ user = "postgres" password = "Auseft@2025qwer" # 要同步的表配置 + [[tables]] name = "hy_instrument" query = "SELECT * FROM \"hy_instrument\"" @@ -33,4 +34,91 @@ key_field = "UpdateTime" name = "hy_record" query = "SELECT * FROM \"hy_record\"" incremental = false # 是否增量同步 -key_field = "UpdateTime" # 增量同步的时间字段 \ No newline at end of file +key_field = "UpdateTime" # 增量同步的时间字段 + + + + +[[tables]] +name = "hy_allot" +query = "SELECT * FROM \"hy_allot\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_cytask" +query = "SELECT * FROM \"hy_cytask\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_fullwatersample" +query = "SELECT * FROM \"hy_fullwatersample\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_informationnorm" +query = "SELECT * FROM \"hy_informationnorm\"" +incremental = false +key_field = "UpdateTime" + + + +[[tables]] +name = "hy_itemdetail" +query = "SELECT * FROM \"hy_itemdetail\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_laboratoryinstrument" +query = "SELECT * FROM \"hy_laboratoryinstrument\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_materialanalysis_type" +query = "SELECT * FROM \"hy_materialanalysis_type\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_materialdetail" +query = "SELECT * FROM \"hy_materialdetail\"" +incremental = false +key_field = "UpdateTime" + + +[[tables]] +name = "hy_norm" +query = "SELECT * FROM \"hy_norm\"" +incremental = false +key_field = "UpdateTime" + + +[[tables]] +name = "hy_weight_input" +query = "SELECT * FROM \"hy_weight_input\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_warmhumid" +query = "SELECT * FROM \"hy_warmhumid\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_task" +query = "SELECT * FROM \"hy_task\"" +incremental = false +key_field = "UpdateTime" + +[[tables]] +name = "hy_spotcheck" +query = "SELECT * FROM \"hy_spotcheck\"" +incremental = false +key_field = "UpdateTime" + + diff --git a/tcp_server/src/main.rs b/tcp_server/src/main.rs index c49bdb0..1a0b937 100644 --- a/tcp_server/src/main.rs +++ b/tcp_server/src/main.rs @@ -4,7 +4,7 @@ use config::Config; use tokio_postgres::{NoTls, Error as PgError}; use serde::{Deserialize, Serialize}; use std::sync::Arc; -use chrono::{DateTime, Utc, NaiveDateTime}; +use chrono::{DateTime, Utc, NaiveDateTime, NaiveDate}; use std::error::Error; use rust_decimal::Decimal; @@ -79,6 +79,420 @@ struct RecordInfo { isnormal: Option, } +#[derive(Debug, Deserialize)] +struct HyCyTask { + id: i32, + task_code: Option, + task_type: Option, + task_status: Option, + create_time: Option, + create_user: Option, + remark: Option, +} + +#[derive(Debug, Deserialize)] +struct HyAllot { + id: i32, + userid: Option, + username: Option, + informationid: Option, + allottime: Option, + hy_code: Option, + hy_type: Option, + hy_method: Option, + hy_quest: Option +} + +#[derive(Debug, Deserialize)] +struct HyFullWaterSample { + id: i32, + qs_code: Option, + qs_tonnage: Option, + mt: Option, + remark: Option, + onecode: Option, + towcode: Option, + fx_code: Option, + fx_onecode: Option, + fx_twocode: Option +} + +#[derive(Debug, Deserialize)] +struct HyInformationNorm { + id: i32, + information_id: Option, + hy_id: Option, + norm_name: Option, + flag: Option, + apparatus_id: Option, + need_compute: Option, + formula: Option, + secondformula: Option, + mapping: Option, + input_type: Option, + round: Option, + sort: Option +} + +#[derive(Debug, Deserialize)] +struct HyItemDetail { + id: i32, + record_id: Option, + information_id: Option, + laboratory_id: Option, + number: Option, + cancellation: Option, + detectionuser: Option, + detectiontime: Option, + original_num: Option, + hy_check: Option, + checkuser: Option, + checktime: Option, + oversize: Option +} + +#[derive(Debug, Deserialize)] +struct HyLaboratoryInstrument { + id: i32, + norm_id: Option, + instrument_id: Option +} + +#[derive(Debug, Deserialize)] +struct HyMaterialAnalysisType { + id: i32, + name: Option, + flag: Option, + sort: Option, + createtime: Option, + createuser: Option +} + +#[derive(Debug, Deserialize)] +struct HyMaterialDetail { + id: i32, + name: Option, + flag: Option, + sort: Option, + createtime: Option, + createuser: Option, + analysistypeid: Option, + materialid: Option +} + +#[derive(Debug, Deserialize)] +struct HyNorm { + id: i32, + norm_id: Option, + zbvalues: Option, + itemdetail_id: Option, + hy_user: Option, + checktime: Option, + explain: Option +} + +#[derive(Debug, Deserialize)] +struct HySampleCollectionDetail { + id: i32, + num: i32, + unit_num: i32, + time: Option, + r#type: Option, + sy_method: Option, + sy_time: Option, + one_num: Option, + two_num: Option, + three_num: Option, + sy_starttime: Option, + sy_endtime: Option, + cy_startnum: Option, + cy_endnum: Option, + sy_user: Option, + sy_car_count: Option, + sy_dun_weight: Option, + byz_bag_count: Option, + y_liu: Option, + zy_user: Option, + one_num_createtime: Option, + one_num_user: Option, + one_num_review_status: Option, + one_num_review_user: Option, + one_num_review_time: Option, + two_num_createtime: Option, + two_num_user: Option, + two_num_review_status: Option, + two_num_review_user: Option, + two_num_review_time: Option, + three_num_createtime: Option, + three_num_user: Option, + three_num_review_status: Option, + three_num_review_user: Option, + three_num_review_time: Option, + hy_createtime: Option, + hy_time: Option, + hy_user: Option, + hy_review_user: Option, + hy_review_status: Option, + hy_review_time: Option, + record_sort: Option, + is_print: Option, + print_msg: Option, + pring_user: Option, + hy_approve: Option, + hy_approve_time: Option, + hy_approve_user: Option, + mt: Option, + mad_clp: Option, + mad_my: Option, + mad_hh: Option, + mad: Option, + a_hm: Option, + a_my: Option, + a_hh: Option, + a_hm1: Option, + a_my1: Option, + a_hh1: Option, + aad: Option, + ad: Option, + v_gg: Option, + v_my: Option, + v_hh: Option, + vad: Option, + vdaf: Option, + var_data: Option, + vd: Option, + fcad: Option, + st_ad: Option, + st_d: Option, + st_ar: Option, + had: Option, + hd: Option, + qb_ad: Option, + qgr_ad: Option, + qnet_ar: Option, + qgr_d: Option, + qnet_ar1: Option, + byz_bag_user: Option, + byz_bag_time: Option, + byz_bag_remark: Option, + fc: Option, + fc_reason: Option, + fc_user: Option, + fc_time: Option, + xk_time: Option, + xk_user: Option, + xk_card_num: Option, + allow_sync: Option, + sync: Option, + sync_time: Option, + mc_unit: Option, + mc_review: Option, + msg_produce: Option, + supply_num: Option, + v_data: Option, + cc: Option, + cc_user: Option, + cc_time: Option, + aar: Option, + is_delete: Option, + alarm_remark: Option, + zs: Option, + zs_detail: Option, + zs_sy_num: Option, + kf_qs: Option, + kf_qs_time: Option, + kf_qs_reason: Option, + qs_time: Option, + crc: Option, + dk_hy: Option, + dk_hy_num: Option, + hy_copy: Option, + hy_copy_num: Option, + hy_copy_user: Option, + hy_copy_time: Option, + already_send: Option, + send_time: Option, + st_daf: Option, + st: Option, + two_num_weight: Option +} + +#[derive(Debug, Serialize, Deserialize)] +struct SampleCollectionDetail { + id: i32, + num: i32, + unit_num: i32, + time: Option, + r#type: Option, + sy_method: Option, + sy_time: Option, + one_num: Option, + two_num: Option, + three_num: Option, + sy_starttime: Option, + sy_endtime: Option, + cy_startnum: Option, + cy_endnum: Option, + sy_user: Option, + sy_car_count: Option, + sy_dun_weight: Option, + byz_bag_count: Option, + y_liu: Option, + zy_user: Option, + one_num_createtime: Option, + one_num_user: Option, + one_num_review_status: Option, + one_num_review_user: Option, + one_num_review_time: Option, + two_num_createtime: Option, + two_num_user: Option, + two_num_review_status: Option, + two_num_review_user: Option, + two_num_review_time: Option +} + +#[derive(Debug, Serialize, Deserialize)] +struct SpotCheck { + id: i32, + spotcheck_code: Option, + spotcheck_user: Option, + spotcheck_time: Option, + spotcheck_type: Option, + coal_sample_code: Option, + sample_custodian: Option, + sampling_time: Option, + quality_incoming: Option, + granularity: Option, + spotcheck_compare: Option, + mt: Option, + mad: Option, + aad: Decimal, + ad: Option, + vad: Option, + vdaf: Option, + var: Option, + st_ad: Option, + st_d: Option, + qb_ad: Option, + had: Option, + qnet_ar: Option, + qnet_ar1: Option, + qgr_d: Option, + qgr_ad: Option, + vd: Option, + aar: Option, + st_ar: Option, + hd: Option, + fcad: Option, + crc: Option, + st_daf: Option, + cad: Option, + cd: Option, + isauto: Option, + hy_type: Option, + isnormal: Option, +} + +#[derive(Debug, Deserialize)] +struct HySampleDelivery { + id: i32, + sample_number: Option, + coal_sample: Option, + sample_weight: Option, + sampler_user: Option, + state: Option, + check_weight: Option, + sample_type: Option, + time: Option, + entering_type: Option, + sample_delivery_type: Option, + granularity: Option, + container_weight: Option, + sample_delivery_time: Option, + receive_time: Option, + sample_delivery_user: Option, + receive_user: Option, + notes: Option, + serial_number: Option, + r#type: Option, + receive_number: Option, + samples_number_t: Option, + samples_number_d: Option, + coal_sample_d: Option, + receive_state: Option, + coal_sample_t: Option +} + +#[derive(Debug, Deserialize)] +struct HyTask { + id: i32, + task_name: Option, + task_type: Option, + task_num: Option, + is_auto: Option, + task_time: Option, + state: Option, + create_by: Option, + create_time: Option, + update_by: Option, + update_time: Option +} + +#[derive(Debug, Deserialize)] +struct HyWarmHumid { + id: i32, + laboratoryid: Option, + temperature: Option, + humidity: Option, + begintime: Option, + endtime: Option, + username: Option +} + +#[derive(Debug, Deserialize)] +struct HyWeightInput { + id: i32, + information_id: i32, + information_norm_id: i32 +} + +#[derive(Debug, Deserialize)] +struct HySpotcheck { + id: i32, + spotcheck_code: Option, + spotcheck_user: Option, + spotcheck_time: Option, + spotcheck_type: Option, + coal_sample_code: Option, + sample_custodian: Option, + sampling_time: Option, + quality_incoming: Option, + granularity: Option, + spotcheck_compare: Option, + mt: Option, + mad: Option, + aad: Decimal, + ad: Option, + vad: Option, + vdaf: Option, + var: Option, + st_ad: Option, + st_d: Option, + qb_ad: Option, + had: Option, + qnet_ar: Option, + qnet_ar1: Option, + qgr_d: Option, + qgr_ad: Option, + vd: Option, + aar: Option, + st_ar: Option, + hd: Option, + fcad: Option, + crc: Option, + st_daf: Option +} + async fn connect_db(config: &Config) -> Result { let host = config.get_string("database.host").unwrap(); let port = config.get_int("database.port").unwrap() as u16; @@ -222,93 +636,767 @@ async fn insert_record(client: &tokio_postgres::Client, info: &RecordInfo) -> Re Ok(()) } -async fn handle_client(socket: &mut TcpStream, client: &tokio_postgres::Client) -> Result<(), Box> { - let mut buf = [0; 1024 * 64]; // 增加缓冲区大小到64KB - - loop { - let n = socket.read(&mut buf).await?; - if n == 0 { - break; - } +async fn insert_cytask(client: &tokio_postgres::Client, info: &HyCyTask) -> Result<(), PgError> { + let exists = client + .query_one( + "SELECT EXISTS(SELECT 1 FROM public.hy_cytask WHERE id = $1)", + &[&info.id], + ) + .await? + .get::<_, bool>(0); - let data = String::from_utf8_lossy(&buf[..n]); - println!("接收到的数据: {}", data); + if exists { + println!("ID {} 已存在,跳过插入", info.id); + return Ok(()); + } - // 首先解析为通用JSON以获取table_name - let result = if let Ok(json) = serde_json::from_str::(&data) { - if let Some(table_name) = json.get("table_name").and_then(|v| v.as_str()) { - match table_name { - "hy_instrument" => { - if let Ok(info) = serde_json::from_str::(&data) { - println!("接收到仪器信息: {:?}", info); - match insert_instrument(client, &info).await { - Ok(_) => true, - Err(e) => { - eprintln!("插入仪器信息失败: {}", e); - false - } - } - } else { - eprintln!("解析仪器信息失败"); - false - } - }, - "hy_information" => { - if let Ok(info) = serde_json::from_str::(&data) { - println!("接收到分析信息: {:?}", info); - match insert_information(client, &info).await { - Ok(_) => true, - Err(e) => { - eprintln!("插入分析信息失败: {}", e); - false - } - } - } else { - eprintln!("解析分析信息失败"); - false - } - }, - "hy_record" => { - if let Ok(info) = serde_json::from_str::(&data) { - println!("接收到记录信息: {:?}", info); - match insert_record(client, &info).await { - Ok(_) => true, - Err(e) => { - eprintln!("插入记录信息失败: {}", e); - false - } - } - } else { - eprintln!("解析记录信息失败"); - false - } - }, - _ => { - eprintln!("未知的表名: {}", table_name); - false - } - } - } else { - eprintln!("JSON数据中缺少table_name字段"); - false - } - } else { - eprintln!("无法解析为JSON数据"); - false - }; + client.execute( + "INSERT INTO public.hy_cytask (id, task_code, task_type, task_status, create_time, create_user, remark) + OVERRIDING SYSTEM VALUE + VALUES ($1, $2, $3, $4, $5, $6, $7)", + &[ + &info.id, &info.task_code, &info.task_type, &info.task_status, + &info.create_time, &info.create_user, &info.remark, + ], + ) + .await?; - // 发送响应 - let response = if result { 0xFF } else { 0x00 }; - if let Err(e) = socket.write_all(&[response]).await { - eprintln!("发送响应失败: {}", e); - break; - } + println!("成功插入采样任务: ID {}", info.id); + Ok(()) +} + +async fn insert_allot(client: &tokio_postgres::Client, info: &HyAllot) -> Result<(), PgError> { + let exists = client + .query_one( + "SELECT EXISTS(SELECT 1 FROM public.hy_allot WHERE id = $1)", + &[&info.id], + ) + .await? + .get::<_, bool>(0); + + if exists { + println!("ID {} 已存在,跳过插入", info.id); + return Ok(()); } - println!("客户端断开连接: {}", socket.peer_addr()?); + client.execute( + "INSERT INTO public.hy_allot (id, userid, username, informationid, allottime, hy_code, hy_type, hy_method, hy_quest) + OVERRIDING SYSTEM VALUE + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", + &[ + &info.id, &info.userid, &info.username, &info.informationid, &info.allottime, + &info.hy_code, &info.hy_type, &info.hy_method, &info.hy_quest, + ], + ) + .await?; + + println!("成功插入分配信息: ID {}", info.id); Ok(()) } +async fn insert_sample_collection_detail(client: &tokio_postgres::Client, info: &HySampleCollectionDetail) -> Result<(), PgError> { + let exists = client + .query_one( + "SELECT EXISTS(SELECT 1 FROM public.hy_sample_collection_detail WHERE id = $1)", + &[&info.id], + ) + .await? + .get::<_, bool>(0); + + if exists { + println!("ID {} 已存在,跳过插入", info.id); + return Ok(()); + } + + client.execute( + "INSERT INTO public.hy_sample_collection_detail ( + id, num, unit_num, time, type, sy_method, sy_time, one_num, two_num, three_num, + sy_starttime, sy_endtime, cy_startnum, cy_endnum, sy_user, sy_car_count, + sy_dun_weight, byz_bag_count, y_liu, zy_user, one_num_createtime, one_num_user, + one_num_review_status, one_num_review_user, one_num_review_time, two_num_createtime, + two_num_user, two_num_review_status, two_num_review_user, two_num_review_time, + three_num_createtime, three_num_user, three_num_review_status, three_num_review_user, + three_num_review_time, hy_createtime, hy_time, hy_user, hy_review_user, hy_review_status, + hy_review_time, record_sort, is_print, print_msg, pring_user, hy_approve, + hy_approve_time, hy_approve_user, mt, mad_clp, mad_my, mad_hh, mad, a_hm, + a_my, a_hh, a_hm1, a_my1, a_hh1, aad, ad, v_gg, v_my, v_hh, vad, vdaf, + var_data, vd, fcad, st_ad, st_d, st_ar, had, hd, qb_ad, qgr_ad, qnet_ar, + qgr_d, qnet_ar1, byz_bag_user, byz_bag_time, byz_bag_remark, fc, fc_reason, + fc_user, fc_time, xk_time, xk_user, xk_card_num, allow_sync, sync, sync_time, + mc_unit, mc_review, msg_produce, supply_num, v_data, cc, cc_user, cc_time, + aar, is_delete, alarm_remark, zs, zs_detail, zs_sy_num, kf_qs, kf_qs_time, + kf_qs_reason, qs_time, crc, dk_hy, dk_hy_num, hy_copy, hy_copy_num, + hy_copy_user, hy_copy_time, already_send, send_time, st_daf, st, two_num_weight + ) OVERRIDING SYSTEM VALUE VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, + $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, + $31, $32, $33, $34, $35, $36, $37, $38, $39, $40, $41, $42, $43, $44, $45, + $46, $47, $48, $49, $50, $51, $52, $53, $54, $55, $56, $57, $58, $59, $60, + $61, $62, $63, $64, $65, $66, $67, $68, $69, $70, $71, $72, $73, $74, $75, + $76, $77, $78, $79, $80, $81, $82, $83, $84, $85, $86, $87, $88, $89, $90, + $91, $92, $93, $94, $95, $96, $97, $98, $99, $100, $101, $102, $103, $104, + $105, $106, $107, $108, $109, $110, $111, $112, $113, $114, $115, $116)", + &[ + &info.id, &info.num, &info.unit_num, &info.time, &info.r#type, &info.sy_method, + &info.sy_time, &info.one_num, &info.two_num, &info.three_num, &info.sy_starttime, + &info.sy_endtime, &info.cy_startnum, &info.cy_endnum, &info.sy_user, + &info.sy_car_count, &info.sy_dun_weight, &info.byz_bag_count, &info.y_liu, + &info.zy_user, &info.one_num_createtime, &info.one_num_user, + &info.one_num_review_status, &info.one_num_review_user, &info.one_num_review_time, + &info.two_num_createtime, &info.two_num_user, &info.two_num_review_status, + &info.two_num_review_user, &info.two_num_review_time, &info.three_num_createtime, + &info.three_num_user, &info.three_num_review_status, &info.three_num_review_user, + &info.three_num_review_time, &info.hy_createtime, &info.hy_time, &info.hy_user, + &info.hy_review_user, &info.hy_review_status, &info.hy_review_time, + &info.record_sort, &info.is_print, &info.print_msg, &info.pring_user, + &info.hy_approve, &info.hy_approve_time, &info.hy_approve_user, &info.mt, + &info.mad_clp, &info.mad_my, &info.mad_hh, &info.mad, &info.a_hm, &info.a_my, + &info.a_hh, &info.a_hm1, &info.a_my1, &info.a_hh1, &info.aad, &info.ad, + &info.v_gg, &info.v_my, &info.v_hh, &info.vad, &info.vdaf, &info.var_data, + &info.vd, &info.fcad, &info.st_ad, &info.st_d, &info.st_ar, &info.had, + &info.hd, &info.qb_ad, &info.qgr_ad, &info.qnet_ar, &info.qgr_d, + &info.qnet_ar1, &info.byz_bag_user, &info.byz_bag_time, &info.byz_bag_remark, + &info.fc, &info.fc_reason, &info.fc_user, &info.fc_time, &info.xk_time, + &info.xk_user, &info.xk_card_num, &info.allow_sync, &info.sync, + &info.sync_time, &info.mc_unit, &info.mc_review, &info.msg_produce, + &info.supply_num, &info.v_data, &info.cc, &info.cc_user, &info.cc_time, + &info.aar, &info.is_delete, &info.alarm_remark, &info.zs, &info.zs_detail, + &info.zs_sy_num, &info.kf_qs, &info.kf_qs_time, &info.kf_qs_reason, + &info.qs_time, &info.crc, &info.dk_hy, &info.dk_hy_num, &info.hy_copy, + &info.hy_copy_num, &info.hy_copy_user, &info.hy_copy_time, &info.already_send, + &info.send_time, &info.st_daf, &info.st, &info.two_num_weight + ], + ).await?; + + println!("成功插入样品采集明细: ID {}", info.id); + Ok(()) +} + +async fn insert_spotcheck(client: &tokio_postgres::Client, info: &HySpotcheck) -> Result<(), PgError> { + let exists = client + .query_one( + "SELECT EXISTS(SELECT 1 FROM public.hy_spotcheck WHERE id = $1)", + &[&info.id], + ) + .await? + .get::<_, bool>(0); + + if exists { + println!("ID {} 已存在,跳过插入", info.id); + return Ok(()); + } + + client.execute( + "INSERT INTO public.hy_spotcheck ( + id, spotcheck_code, spotcheck_user, spotcheck_time, spotcheck_type, + coal_sample_code, sample_custodian, sampling_time, quality_incoming, + granularity, spotcheck_compare, mt, mad, aad, ad, vad, vdaf, var, + st_ad, st_d, qb_ad, had, qnet_ar, qnet_ar1, qgr_d, qgr_ad, vd, + aar, st_ar, hd, fcad, crc, st_daf + ) OVERRIDING SYSTEM VALUE VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, + $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, + $28, $29, $30, $31, $32, $33)", + &[ + &info.id, &info.spotcheck_code, &info.spotcheck_user, &info.spotcheck_time, + &info.spotcheck_type, &info.coal_sample_code, &info.sample_custodian, + &info.sampling_time, &info.quality_incoming, &info.granularity, + &info.spotcheck_compare, &info.mt, &info.mad, &info.aad, &info.ad, + &info.vad, &info.vdaf, &info.var, &info.st_ad, &info.st_d, &info.qb_ad, + &info.had, &info.qnet_ar, &info.qnet_ar1, &info.qgr_d, &info.qgr_ad, + &info.vd, &info.aar, &info.st_ar, &info.hd, &info.fcad, &info.crc, + &info.st_daf + ], + ).await?; + + println!("成功插入抽查记录: ID {}", info.id); + Ok(()) +} + +async fn insert_sample_delivery(client: &tokio_postgres::Client, info: &HySampleDelivery) -> Result<(), PgError> { + client.execute( + "INSERT INTO public.hy_sample_delivery ( + id, sample_number, coal_sample, sample_weight, sampler_user, state, + check_weight, sample_type, time, entering_type, sample_delivery_type, + granularity, container_weight, sample_delivery_time, receive_time, + sample_delivery_user, receive_user, notes, serial_number, type, + receive_number, samples_number_t, samples_number_d, coal_sample_d, + receive_state, coal_sample_t + ) OVERRIDING SYSTEM VALUE + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, + $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26)", + &[ + &info.id, &info.sample_number, &info.coal_sample, &info.sample_weight, + &info.sampler_user, &info.state, &info.check_weight, &info.sample_type, + &info.time, &info.entering_type, &info.sample_delivery_type, + &info.granularity, &info.container_weight, &info.sample_delivery_time, + &info.receive_time, &info.sample_delivery_user, &info.receive_user, + &info.notes, &info.serial_number, &info.r#type, &info.receive_number, + &info.samples_number_t, &info.samples_number_d, &info.coal_sample_d, + &info.receive_state, &info.coal_sample_t + ], + ).await?; + Ok(()) +} + +async fn insert_task(client: &tokio_postgres::Client, info: &HyTask) -> Result<(), PgError> { + client.execute( + "INSERT INTO public.hy_task ( + id, task_name, task_type, task_num, is_auto, task_time, state, + create_by, create_time, update_by, update_time + ) OVERRIDING SYSTEM VALUE + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)", + &[ + &info.id, &info.task_name, &info.task_type, &info.task_num, + &info.is_auto, &info.task_time, &info.state, &info.create_by, + &info.create_time, &info.update_by, &info.update_time + ], + ).await?; + Ok(()) +} + +async fn insert_warmhumid(client: &tokio_postgres::Client, info: &HyWarmHumid) -> Result<(), PgError> { + let exists = client + .query_one( + "SELECT EXISTS(SELECT 1 FROM public.hy_warmhumid WHERE id = $1)", + &[&info.id], + ) + .await? + .get::<_, bool>(0); + + if exists { + println!("ID {} 已存在,跳过插入", info.id); + return Ok(()); + } + + client.execute( + "INSERT INTO public.hy_warmhumid (id, laboratoryid, temperature, humidity, begintime, endtime, username) + OVERRIDING SYSTEM VALUE VALUES ($1, $2, $3, $4, $5, $6, $7)", + &[ + &info.id, &info.laboratoryid, &info.temperature, &info.humidity, + &info.begintime, &info.endtime, &info.username + ], + ).await?; + + println!("成功插入温湿度记录: ID {}", info.id); + Ok(()) +} + +async fn insert_weight_input(client: &tokio_postgres::Client, info: &HyWeightInput) -> Result<(), PgError> { + let exists = client + .query_one( + "SELECT EXISTS(SELECT 1 FROM public.hy_weight_input WHERE id = $1)", + &[&info.id], + ) + .await? + .get::<_, bool>(0); + + if exists { + println!("ID {} 已存在,跳过插入", info.id); + return Ok(()); + } + + client.execute( + "INSERT INTO public.hy_weight_input (id, information_id, information_norm_id) OVERRIDING SYSTEM VALUE + VALUES ($1, $2, $3)", + &[&info.id, &info.information_id, &info.information_norm_id], + ).await?; + + println!("成功插入重量输入: ID {}", info.id); + Ok(()) +} + +async fn insert_laboratory_instrument(client: &tokio_postgres::Client, info: &HyLaboratoryInstrument) -> Result<(), PgError> { + let exists = client + .query_one( + "SELECT EXISTS(SELECT 1 FROM public.hy_laboratoryinstrument WHERE id = $1)", + &[&info.id], + ) + .await? + .get::<_, bool>(0); + + if exists { + println!("ID {} 已存在,跳过插入", info.id); + return Ok(()); + } + + client.execute( + "INSERT INTO public.hy_laboratoryinstrument (id, norm_id, instrument_id) OVERRIDING SYSTEM VALUE + VALUES ($1, $2, $3)", + &[&info.id, &info.norm_id, &info.instrument_id], + ).await?; + + println!("成功插入实验室仪器: ID {}", info.id); + Ok(()) +} + +async fn insert_material_analysis_type(client: &tokio_postgres::Client, info: &HyMaterialAnalysisType) -> Result<(), PgError> { + let exists = client + .query_one( + "SELECT EXISTS(SELECT 1 FROM public.hy_materialanalysis_type WHERE id = $1)", + &[&info.id], + ) + .await? + .get::<_, bool>(0); + + if exists { + println!("ID {} 已存在,跳过插入", info.id); + return Ok(()); + } + + client.execute( + "INSERT INTO public.hy_materialanalysis_type (id, name, flag, sort, createtime, createuser) + OVERRIDING SYSTEM VALUE VALUES ($1, $2, $3, $4, $5, $6)", + &[&info.id, &info.name, &info.flag, &info.sort, &info.createtime, &info.createuser], + ).await?; + + println!("成功插入物料分析类型: ID {}", info.id); + Ok(()) +} + +async fn insert_material_detail(client: &tokio_postgres::Client, info: &HyMaterialDetail) -> Result<(), PgError> { + let exists = client + .query_one( + "SELECT EXISTS(SELECT 1 FROM public.hy_materialdetail WHERE id = $1)", + &[&info.id], + ) + .await? + .get::<_, bool>(0); + + if exists { + println!("ID {} 已存在,跳过插入", info.id); + return Ok(()); + } + + client.execute( + "INSERT INTO public.hy_materialdetail (id, name, flag, sort, createtime, createuser, analysistypeid, materialid) + OVERRIDING SYSTEM VALUE + VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", + &[ + &info.id, &info.name, &info.flag, &info.sort, &info.createtime, + &info.createuser, &info.analysistypeid, &info.materialid + ], + ).await?; + + println!("成功插入物料明细: ID {}", info.id); + Ok(()) +} + +async fn insert_norm(client: &tokio_postgres::Client, info: &HyNorm) -> Result<(), PgError> { + let exists = client + .query_one( + "SELECT EXISTS(SELECT 1 FROM public.hy_norm WHERE id = $1)", + &[&info.id], + ) + .await? + .get::<_, bool>(0); + + if exists { + println!("ID {} 已存在,跳过插入", info.id); + return Ok(()); + } + + client.execute( + "INSERT INTO public.hy_norm (id, norm_id, zbvalues, itemdetail_id, hy_user, checktime, explain) + OVERRIDING SYSTEM VALUE + VALUES ($1, $2, $3, $4, $5, $6, $7)", + &[ + &info.id, &info.norm_id, &info.zbvalues, &info.itemdetail_id, + &info.hy_user, &info.checktime, &info.explain + ], + ).await?; + + println!("成功插入化验规范: ID {}", info.id); + Ok(()) +} + +async fn insert_fullwatersample(client: &tokio_postgres::Client, info: &HyFullWaterSample) -> Result<(), PgError> { + let exists = client + .query_one( + "SELECT EXISTS(SELECT 1 FROM public.hy_fullwatersample WHERE id = $1)", + &[&info.id], + ) + .await? + .get::<_, bool>(0); + + if exists { + println!("ID {} 已存在,跳过插入", info.id); + return Ok(()); + } + + client.execute( + "INSERT INTO public.hy_fullwatersample (id, qs_code, qs_tonnage, mt, remark, onecode, towcode, fx_code, fx_onecode, fx_twocode) + OVERRIDING SYSTEM VALUE + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)", + &[ + &info.id, &info.qs_code, &info.qs_tonnage, &info.mt, &info.remark, &info.onecode, &info.towcode, &info.fx_code, &info.fx_onecode, &info.fx_twocode], + ).await?; + + println!("成功插入全水样品: ID {}", info.id); + Ok(()) +} + +async fn insert_informationnorm(client: &tokio_postgres::Client, info: &HyInformationNorm) -> Result<(), PgError> { + let exists = client + .query_one( + "SELECT EXISTS(SELECT 1 FROM public.hy_informationnorm WHERE id = $1)", + &[&info.id], + ) + .await? + .get::<_, bool>(0); + + if exists { + println!("ID {} 已存在,跳过插入", info.id); + return Ok(()); + } + + client.execute( + "INSERT INTO public.hy_informationnorm (id, information_id, hy_id, norm_name, flag, apparatus_id, need_compute, formula, secondformula, mapping, input_type, round, sort) + OVERRIDING SYSTEM VALUE VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)", + &[ + &info.id, &info.information_id, &info.hy_id, &info.norm_name, &info.flag, &info.apparatus_id, + &info.need_compute, &info.formula, &info.secondformula, &info.mapping, &info.input_type, + &info.round, &info.sort + ], + ).await?; + + println!("成功插入化验信息规范: ID {}", info.id); + Ok(()) +} + +async fn insert_itemdetail(client: &tokio_postgres::Client, info: &HyItemDetail) -> Result<(), PgError> { + let exists = client + .query_one( + "SELECT EXISTS(SELECT 1 FROM public.hy_itemdetail WHERE id = $1)", + &[&info.id], + ) + .await? + .get::<_, bool>(0); + + if exists { + println!("ID {} 已存在,跳过插入", info.id); + return Ok(()); + } + + client.execute( + "INSERT INTO public.hy_itemdetail (id, record_id, information_id, laboratory_id, number, cancellation, detectionuser, detectiontime, original_num, hy_check, checkuser, checktime, oversize) + OVERRIDING SYSTEM VALUE VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)", + &[ + &info.id, &info.record_id, &info.information_id, &info.laboratory_id, &info.number, + &info.cancellation, &info.detectionuser, &info.detectiontime, &info.original_num, + &info.hy_check, &info.checkuser, &info.checktime, &info.oversize + ], + ).await?; + + println!("成功插入化验项目明细: ID {}", info.id); + Ok(()) +} + +async fn handle_client(socket: &mut TcpStream, client: &tokio_postgres::Client) -> Result<(), Box> { + let mut buf = [0; 1024 * 64]; + + loop { + let n = socket.read(&mut buf).await?; + if n == 0 { + return Ok(()); + } + + let data = String::from_utf8_lossy(&buf[..n]); + println!("接收到的数据: {}", data); + + let result = if let Ok(json) = serde_json::from_str::(&data) { + if let Some(table_name) = json.get("table_name").and_then(|v| v.as_str()) { + match table_name { + "hy_instrument" => { + if let Ok(info) = serde_json::from_str::(&data) { + println!("接收到仪器信息: {:?}", info); + match insert_instrument(client, &info).await { + Ok(_) => true, + Err(e) => { + eprintln!("插入仪器信息失败: {}", e); + false + } + } + } else { + eprintln!("解析仪器信息失败"); + false + } + }, + "hy_information" => { + if let Ok(info) = serde_json::from_str::(&data) { + println!("接收到分析信息: {:?}", info); + match insert_information(client, &info).await { + Ok(_) => true, + Err(e) => { + eprintln!("插入分析信息失败: {}", e); + false + } + } + } else { + eprintln!("解析分析信息失败"); + false + } + }, + "hy_record" => { + if let Ok(info) = serde_json::from_str::(&data) { + println!("接收到记录信息: {:?}", info); + match insert_record(client, &info).await { + Ok(_) => true, + Err(e) => { + eprintln!("插入记录信息失败: {}", e); + false + } + } + } else { + eprintln!("解析记录信息失败"); + false + } + }, + "hy_cytask" => { + if let Ok(info) = serde_json::from_str::(&data) { + println!("接收到采样任务信息: {:?}", info); + match insert_cytask(client, &info).await { + Ok(_) => true, + Err(e) => { + eprintln!("插入采样任务信息失败: {}", e); + false + } + } + } else { + eprintln!("解析采样任务信息失败"); + false + } + }, + "hy_allot" => { + if let Ok(info) = serde_json::from_str::(&data) { + println!("接收到分配信息: {:?}", info); + match insert_allot(client, &info).await { + Ok(_) => true, + Err(e) => { + eprintln!("插入分配信息失败: {}", e); + false + } + } + } else { + eprintln!("解析分配信息失败"); + false + } + }, + "hy_sample_collection_detail" => { + if let Ok(info) = serde_json::from_str::(&data) { + println!("接收到样品采集明细信息: {:?}", info); + match insert_sample_collection_detail(client, &info).await { + Ok(_) => true, + Err(e) => { + eprintln!("插入样品采集明细信息失败: {}", e); + false + } + } + } else { + eprintln!("解析样品采集明细信息失败"); + false + } + }, + "hy_spotcheck" => { + if let Ok(info) = serde_json::from_str::(&data) { + println!("接收到抽查信息: {:?}", info); + match insert_spotcheck(client, &info).await { + Ok(_) => true, + Err(e) => { + eprintln!("插入抽查信息失败: {}", e); + false + } + } + } else { + eprintln!("解析抽查信息失败"); + false + } + }, + "hy_sample_delivery" => { + if let Ok(info) = serde_json::from_str::(&data) { + println!("接收到样品交付信息: {:?}", info); + match insert_sample_delivery(client, &info).await { + Ok(_) => true, + Err(e) => { + eprintln!("插入样品交付信息失败: {}", e); + false + } + } + } else { + eprintln!("解析样品交付信息失败"); + false + } + }, + "hy_task" => { + if let Ok(info) = serde_json::from_str::(&data) { + println!("接收到任务信息: {:?}", info); + match insert_task(client, &info).await { + Ok(_) => true, + Err(e) => { + eprintln!("插入任务信息失败: {}", e); + false + } + } + } else { + eprintln!("解析任务信息失败"); + false + } + }, + "hy_warmhumid" => { + if let Ok(info) = serde_json::from_str::(&data) { + println!("接收到温湿度信息: {:?}", info); + match insert_warmhumid(client, &info).await { + Ok(_) => true, + Err(e) => { + eprintln!("插入温湿度信息失败: {}", e); + false + } + } + } else { + eprintln!("解析温湿度信息失败"); + false + } + }, + "hy_weight_input" => { + if let Ok(info) = serde_json::from_str::(&data) { + println!("接收到重量输入信息: {:?}", info); + match insert_weight_input(client, &info).await { + Ok(_) => true, + Err(e) => { + eprintln!("插入重量输入信息失败: {}", e); + false + } + } + } else { + eprintln!("解析重量输入信息失败"); + false + } + }, + "hy_laboratoryinstrument" => { + if let Ok(info) = serde_json::from_str::(&data) { + println!("接收到实验室仪器信息: {:?}", info); + match insert_laboratory_instrument(client, &info).await { + Ok(_) => true, + Err(e) => { + eprintln!("插入实验室仪器信息失败: {}", e); + false + } + } + } else { + eprintln!("解析实验室仪器信息失败"); + false + } + }, + "hy_materialanalysis_type" => { + if let Ok(info) = serde_json::from_str::(&data) { + println!("接收到物料分析类型信息: {:?}", info); + match insert_material_analysis_type(client, &info).await { + Ok(_) => true, + Err(e) => { + eprintln!("插入物料分析类型信息失败: {}", e); + false + } + } + } else { + eprintln!("解析物料分析类型信息失败"); + false + } + }, + "hy_materialdetail" => { + if let Ok(info) = serde_json::from_str::(&data) { + println!("接收到物料详细信息: {:?}", info); + match insert_material_detail(client, &info).await { + Ok(_) => true, + Err(e) => { + eprintln!("插入物料详细信息失败: {}", e); + false + } + } + } else { + eprintln!("解析物料详细信息失败"); + false + } + }, + "hy_norm" => { + if let Ok(info) = serde_json::from_str::(&data) { + println!("接收到标准信息: {:?}", info); + match insert_norm(client, &info).await { + Ok(_) => true, + Err(e) => { + eprintln!("插入标准信息失败: {}", e); + false + } + } + } else { + eprintln!("解析标准信息失败"); + false + } + }, + "hy_fullwatersample" => { + if let Ok(info) = serde_json::from_str::(&data) { + println!("接收到全水样品信息: {:?}", info); + match insert_fullwatersample(client, &info).await { + Ok(_) => true, + Err(e) => { + eprintln!("插入全水样品信息失败: {}", e); + false + } + } + } else { + eprintln!("解析全水样品信息失败"); + false + } + }, + "hy_informationnorm" => { + if let Ok(info) = serde_json::from_str::(&data) { + println!("接收到化验信息规范信息: {:?}", info); + match insert_informationnorm(client, &info).await { + Ok(_) => true, + Err(e) => { + eprintln!("插入化验信息规范信息失败: {}", e); + false + } + } + } else { + eprintln!("解析化验信息规范信息失败"); + false + } + }, + "hy_itemdetail" => { + if let Ok(info) = serde_json::from_str::(&data) { + println!("接收到化验项目明细信息: {:?}", info); + match insert_itemdetail(client, &info).await { + Ok(_) => true, + Err(e) => { + eprintln!("插入化验项目明细信息失败: {}", e); + false + } + } + } else { + eprintln!("解析化验项目明细信息失败"); + false + } + }, + _ => { + eprintln!("未知的表名: {}", table_name); + false + } + } + } else { + eprintln!("JSON数据中缺少table_name字段"); + false + } + } else { + eprintln!("无效的JSON数据"); + false + }; + + let response = if result { 0xFF } else { 0x00 }; + if let Err(e) = socket.write_all(&[response]).await { + eprintln!("发送响应失败: {}", e); + return Ok(()); + } + } +} + #[tokio::main] async fn main() -> Result<(), Box> { // 读取配置文件 diff --git a/tcp_server/target/debug/tcp_server.exe b/tcp_server/target/debug/tcp_server.exe index 2395420..b40d960 100644 Binary files a/tcp_server/target/debug/tcp_server.exe and b/tcp_server/target/debug/tcp_server.exe differ diff --git a/tcp_server/target/debug/tcp_server.pdb b/tcp_server/target/debug/tcp_server.pdb index ff860fb..3434842 100644 Binary files a/tcp_server/target/debug/tcp_server.pdb and b/tcp_server/target/debug/tcp_server.pdb differ