|
|
@@ -1,53 +1,11 @@ |
|
|
|
use std::error::Error; |
|
|
|
use tokio::net::{TcpListener, TcpStream}; |
|
|
|
use tokio::io::{AsyncReadExt, AsyncWriteExt}; |
|
|
|
use config::Config; |
|
|
|
use tokio_postgres::{NoTls, Error as PgError}; |
|
|
|
use serde::{Deserialize, Serialize}; |
|
|
|
use std::sync::Arc; |
|
|
|
use std::net::SocketAddr; |
|
|
|
|
|
|
|
#[derive(Debug)] |
|
|
|
enum AppError { |
|
|
|
Database(PgError), |
|
|
|
Json(serde_json::Error), |
|
|
|
} |
|
|
|
|
|
|
|
impl From<PgError> for AppError { |
|
|
|
fn from(err: PgError) -> Self { |
|
|
|
AppError::Database(err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
impl From<serde_json::Error> for AppError { |
|
|
|
fn from(err: serde_json::Error) -> Self { |
|
|
|
AppError::Json(err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
impl std::fmt::Display for AppError { |
|
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
|
|
|
match self { |
|
|
|
AppError::Database(e) => write!(f, "数据库错误: {}", e), |
|
|
|
AppError::Json(e) => write!(f, "JSON错误: {}", e), |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
impl std::error::Error for AppError { |
|
|
|
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { |
|
|
|
match self { |
|
|
|
AppError::Database(e) => Some(e), |
|
|
|
AppError::Json(e) => Some(e), |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
#[derive(Debug, Serialize, Deserialize)] |
|
|
|
struct TableType { |
|
|
|
table_name: String, |
|
|
|
data: serde_json::Value, |
|
|
|
} |
|
|
|
use chrono::{DateTime, Utc}; |
|
|
|
use std::error::Error; |
|
|
|
|
|
|
|
#[derive(Debug, Serialize, Deserialize)] |
|
|
|
struct InstrumentInfo { |
|
|
@@ -60,118 +18,60 @@ struct InstrumentInfo { |
|
|
|
specification: String, |
|
|
|
} |
|
|
|
|
|
|
|
async fn insert_data(client: &tokio_postgres::Client, table_type: &TableType) -> Result<(), AppError> { |
|
|
|
// 先检查表是否存在 |
|
|
|
let exists = client |
|
|
|
.query_one( |
|
|
|
"SELECT EXISTS( |
|
|
|
SELECT 1 FROM information_schema.tables |
|
|
|
WHERE table_schema = 'public' |
|
|
|
AND table_name = $1 |
|
|
|
)", |
|
|
|
&[&table_type.table_name], |
|
|
|
) |
|
|
|
.await? |
|
|
|
.get::<_, bool>(0); |
|
|
|
|
|
|
|
if !exists { |
|
|
|
println!("表 {} 不存在", table_type.table_name); |
|
|
|
return Ok(()); |
|
|
|
} |
|
|
|
|
|
|
|
// 获取表的列信息 |
|
|
|
let columns = client |
|
|
|
.query( |
|
|
|
"SELECT column_name, data_type |
|
|
|
FROM information_schema.columns |
|
|
|
WHERE table_schema = 'public' |
|
|
|
AND table_name = $1 |
|
|
|
ORDER BY ordinal_position", |
|
|
|
&[&table_type.table_name], |
|
|
|
) |
|
|
|
.await?; |
|
|
|
|
|
|
|
if columns.is_empty() { |
|
|
|
println!("表 {} 没有列信息", table_type.table_name); |
|
|
|
return Ok(()); |
|
|
|
} |
|
|
|
|
|
|
|
// 检查 ID 是否存在 |
|
|
|
if let Some(id) = table_type.data.get("id") { |
|
|
|
let id_value = id.as_i64().unwrap_or(0); |
|
|
|
let exists = client |
|
|
|
.query_one( |
|
|
|
&format!("SELECT EXISTS(SELECT 1 FROM public.{} WHERE id = $1)", table_type.table_name), |
|
|
|
&[&id_value], |
|
|
|
) |
|
|
|
.await? |
|
|
|
.get::<_, bool>(0); |
|
|
|
|
|
|
|
if exists { |
|
|
|
println!("表 {} 中 ID {} 已存在,跳过插入", table_type.table_name, id); |
|
|
|
return Ok(()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// 构建插入语句 |
|
|
|
let mut column_names = Vec::new(); |
|
|
|
let mut placeholders = Vec::new(); |
|
|
|
let mut param_count = 1; |
|
|
|
let mut query_values: Vec<Box<dyn tokio_postgres::types::ToSql + Sync>> = Vec::new(); |
|
|
|
|
|
|
|
for row in &columns { |
|
|
|
let column_name: String = row.get("column_name"); |
|
|
|
let data_type: String = row.get("data_type"); |
|
|
|
|
|
|
|
if let Some(value) = table_type.data.get(&column_name) { |
|
|
|
column_names.push(column_name); |
|
|
|
placeholders.push(format!("${}", param_count)); |
|
|
|
param_count += 1; |
|
|
|
|
|
|
|
// 根据数据类型转换值 |
|
|
|
match data_type.as_str() { |
|
|
|
"integer" | "bigint" => { |
|
|
|
if let Some(n) = value.as_i64() { |
|
|
|
query_values.push(Box::new(n)); |
|
|
|
} |
|
|
|
} |
|
|
|
"character varying" | "text" => { |
|
|
|
if let Some(s) = value.as_str() { |
|
|
|
query_values.push(Box::new(s.to_string())); |
|
|
|
} |
|
|
|
} |
|
|
|
"boolean" => { |
|
|
|
if let Some(b) = value.as_bool() { |
|
|
|
query_values.push(Box::new(b)); |
|
|
|
} |
|
|
|
} |
|
|
|
"double precision" | "numeric" => { |
|
|
|
if let Some(n) = value.as_f64() { |
|
|
|
query_values.push(Box::new(n)); |
|
|
|
} |
|
|
|
} |
|
|
|
// 可以根据需要添加更多数据类型的处理 |
|
|
|
_ => println!("不支持的数据类型: {}", data_type), |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
let query = format!( |
|
|
|
"INSERT INTO public.{} ({}) OVERRIDING SYSTEM VALUE VALUES ({})", |
|
|
|
table_type.table_name, |
|
|
|
column_names.join(", "), |
|
|
|
placeholders.join(", ") |
|
|
|
); |
|
|
|
|
|
|
|
let values: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = query_values |
|
|
|
.iter() |
|
|
|
.map(|v| v.as_ref()) |
|
|
|
.collect(); |
|
|
|
|
|
|
|
client.execute(&query, &values[..]).await?; |
|
|
|
#[derive(Debug, Deserialize)] |
|
|
|
struct InformationInfo { |
|
|
|
id: i32, |
|
|
|
hy_name: Option<String>, |
|
|
|
flag: Option<i16>, |
|
|
|
laboratory_id: Option<i32>, |
|
|
|
analysistypeid: Option<i32>, |
|
|
|
} |
|
|
|
|
|
|
|
println!("成功插入数据到表 {}", table_type.table_name); |
|
|
|
Ok(()) |
|
|
|
#[derive(Debug, Deserialize)] |
|
|
|
struct RecordInfo { |
|
|
|
id: i32, |
|
|
|
hy_code: Option<String>, |
|
|
|
r#type: Option<String>, |
|
|
|
hy_check: Option<i16>, |
|
|
|
hy_approve: Option<i16>, |
|
|
|
check_time: Option<DateTime<Utc>>, |
|
|
|
approve_time: Option<DateTime<Utc>>, |
|
|
|
approve_user: Option<String>, |
|
|
|
check_user: Option<String>, |
|
|
|
hy_time: Option<DateTime<Utc>>, |
|
|
|
hy_values: Option<String>, |
|
|
|
accept_time: Option<DateTime<Utc>>, |
|
|
|
accept_user: Option<String>, |
|
|
|
mt: Option<f64>, |
|
|
|
mad: Option<f64>, |
|
|
|
aad: Option<f64>, |
|
|
|
ad: Option<f64>, |
|
|
|
vad: Option<f64>, |
|
|
|
vd: Option<f64>, |
|
|
|
var: Option<f64>, |
|
|
|
vdaf: Option<f64>, |
|
|
|
fcad: Option<f64>, |
|
|
|
st_ar: Option<f64>, |
|
|
|
st_ad: Option<f64>, |
|
|
|
st_d: Option<f64>, |
|
|
|
had: Option<f64>, |
|
|
|
hd: Option<f64>, |
|
|
|
qb_ad: Option<f64>, |
|
|
|
qgr_ad: Option<f64>, |
|
|
|
qgr_d: Option<f64>, |
|
|
|
qnet_ar_mj_kg: Option<f64>, |
|
|
|
qnet_ar_j_cal: Option<f64>, |
|
|
|
v: Option<f64>, |
|
|
|
aar: Option<f64>, |
|
|
|
qnet_ar: Option<f64>, |
|
|
|
qnet_ar1: Option<f64>, |
|
|
|
crc: Option<f64>, |
|
|
|
st_daf: Option<f64>, |
|
|
|
cad: Option<f64>, |
|
|
|
cd: Option<f64>, |
|
|
|
isauto: Option<i16>, |
|
|
|
hy_type: Option<String>, |
|
|
|
isnormal: Option<i32>, |
|
|
|
} |
|
|
|
|
|
|
|
async fn connect_db(config: &Config) -> Result<tokio_postgres::Client, PgError> { |
|
|
@@ -189,7 +89,7 @@ async fn connect_db(config: &Config) -> Result<tokio_postgres::Client, PgError> |
|
|
|
let (client, connection) = tokio_postgres::connect(&connection_string, NoTls).await?; |
|
|
|
|
|
|
|
// 在后台运行连接 |
|
|
|
tokio::task::spawn(async move { |
|
|
|
tokio::spawn(async move { |
|
|
|
if let Err(e) = connection.await { |
|
|
|
eprintln!("数据库连接错误: {}", e); |
|
|
|
} |
|
|
@@ -198,17 +98,180 @@ async fn connect_db(config: &Config) -> Result<tokio_postgres::Client, PgError> |
|
|
|
Ok(client) |
|
|
|
} |
|
|
|
|
|
|
|
async fn insert_instrument(client: &tokio_postgres::Client, info: &InstrumentInfo) -> Result<(), AppError> { |
|
|
|
let json_data = serde_json::to_value(info)?; |
|
|
|
let table_type = TableType { |
|
|
|
table_name: "hy_instrument".to_string(), |
|
|
|
data: json_data, |
|
|
|
}; |
|
|
|
insert_data(client, &table_type).await |
|
|
|
async fn insert_instrument(client: &tokio_postgres::Client, info: &InstrumentInfo) -> Result<(), PgError> { |
|
|
|
// 先检查 ID 是否存在 |
|
|
|
let exists = client |
|
|
|
.query_one( |
|
|
|
"SELECT EXISTS(SELECT 1 FROM public.hy_instrument WHERE id = $1)", |
|
|
|
&[&info.id], |
|
|
|
) |
|
|
|
.await? |
|
|
|
.get::<_, bool>(0); |
|
|
|
|
|
|
|
if exists { |
|
|
|
println!("ID {} 已存在,跳过插入", info.id); |
|
|
|
return Ok(()); |
|
|
|
} |
|
|
|
|
|
|
|
// ID 不存在,执行插入 |
|
|
|
client.execute( |
|
|
|
"INSERT INTO public.hy_instrument (id, informationid, instrumentcode, laboratoryid, name, remark, specification) |
|
|
|
OVERRIDING SYSTEM VALUE |
|
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7)", |
|
|
|
&[ |
|
|
|
&info.id, |
|
|
|
&info.informationid, |
|
|
|
&info.instrumentcode, |
|
|
|
&info.laboratoryid, |
|
|
|
&info.name, |
|
|
|
&info.remark, |
|
|
|
&info.specification, |
|
|
|
], |
|
|
|
) |
|
|
|
.await?; |
|
|
|
|
|
|
|
println!("成功插入仪器信息: {} (ID: {})", info.instrumentcode, info.id); |
|
|
|
Ok(()) |
|
|
|
} |
|
|
|
|
|
|
|
async fn insert_information(client: &tokio_postgres::Client, info: &InformationInfo) -> Result<(), PgError> { |
|
|
|
// 先检查 ID 是否存在 |
|
|
|
let exists = client |
|
|
|
.query_one( |
|
|
|
"SELECT EXISTS(SELECT 1 FROM public.hy_information WHERE id = $1)", |
|
|
|
&[&info.id], |
|
|
|
) |
|
|
|
.await? |
|
|
|
.get::<_, bool>(0); |
|
|
|
|
|
|
|
if exists { |
|
|
|
println!("ID {} 已存在,跳过插入", info.id); |
|
|
|
return Ok(()); |
|
|
|
} |
|
|
|
|
|
|
|
// ID 不存在,执行插入 |
|
|
|
client.execute( |
|
|
|
"INSERT INTO public.hy_information (id, hy_name, flag, laboratory_id, analysistypeid) |
|
|
|
OVERRIDING SYSTEM VALUE |
|
|
|
VALUES ($1, $2, $3, $4, $5)", |
|
|
|
&[ |
|
|
|
&info.id, |
|
|
|
&info.hy_name, |
|
|
|
&info.flag, |
|
|
|
&info.laboratory_id, |
|
|
|
&info.analysistypeid, |
|
|
|
], |
|
|
|
) |
|
|
|
.await?; |
|
|
|
|
|
|
|
println!("成功插入信息记录: ID {}", info.id); |
|
|
|
Ok(()) |
|
|
|
} |
|
|
|
|
|
|
|
async fn insert_record(client: &tokio_postgres::Client, info: &RecordInfo) -> Result<(), PgError> { |
|
|
|
// 先检查 ID 是否存在 |
|
|
|
let exists = client |
|
|
|
.query_one( |
|
|
|
"SELECT EXISTS(SELECT 1 FROM public.hy_record WHERE id = $1)", |
|
|
|
&[&info.id], |
|
|
|
) |
|
|
|
.await? |
|
|
|
.get::<_, bool>(0); |
|
|
|
|
|
|
|
if exists { |
|
|
|
println!("ID {} 已存在,跳过插入", info.id); |
|
|
|
return Ok(()); |
|
|
|
} |
|
|
|
|
|
|
|
// ID 不存在,执行插入 |
|
|
|
client.execute( |
|
|
|
"INSERT INTO public.hy_record ( |
|
|
|
id, hy_code, type, hy_check, hy_approve, check_time, approve_time, |
|
|
|
approve_user, check_user, hy_time, hy_values, accept_time, accept_user, |
|
|
|
mt, mad, aad, ad, vad, vd, var, vdaf, fcad, st_ar, st_ad, st_d, |
|
|
|
had, hd, qb_ad, qgr_ad, qgr_d, qnet_ar_mj_kg, qnet_ar_j_cal, v, |
|
|
|
aar, qnet_ar, qnet_ar1, crc, st_daf, cad, cd, isauto, hy_type, isnormal |
|
|
|
) |
|
|
|
OVERRIDING SYSTEM VALUE |
|
|
|
VALUES ( |
|
|
|
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, |
|
|
|
$15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, |
|
|
|
$27, $28, $29, $30, $31, $32, $33, $34, $35, $36, $37, $38, |
|
|
|
$39, $40, $41, $42, $43 |
|
|
|
)", |
|
|
|
&[ |
|
|
|
&info.id, &info.hy_code, &info.r#type, &info.hy_check, &info.hy_approve, |
|
|
|
&info.check_time, &info.approve_time, &info.approve_user, &info.check_user, |
|
|
|
&info.hy_time, &info.hy_values, &info.accept_time, &info.accept_user, |
|
|
|
&info.mt, &info.mad, &info.aad, &info.ad, &info.vad, &info.vd, &info.var, |
|
|
|
&info.vdaf, &info.fcad, &info.st_ar, &info.st_ad, &info.st_d, &info.had, |
|
|
|
&info.hd, &info.qb_ad, &info.qgr_ad, &info.qgr_d, &info.qnet_ar_mj_kg, |
|
|
|
&info.qnet_ar_j_cal, &info.v, &info.aar, &info.qnet_ar, &info.qnet_ar1, |
|
|
|
&info.crc, &info.st_daf, &info.cad, &info.cd, &info.isauto, &info.hy_type, |
|
|
|
&info.isnormal |
|
|
|
], |
|
|
|
) |
|
|
|
.await?; |
|
|
|
|
|
|
|
println!("成功插入记录: ID {}", info.id); |
|
|
|
Ok(()) |
|
|
|
} |
|
|
|
|
|
|
|
fn check_ok_message(_message: &[u8]) -> u8 { |
|
|
|
0xFF |
|
|
|
async fn handle_client(socket: &mut TcpStream, client: &tokio_postgres::Client) -> Result<(), Box<dyn Error>> { |
|
|
|
let mut buf = [0; 1024 * 64]; // 增加缓冲区大小到64KB |
|
|
|
|
|
|
|
loop { |
|
|
|
let n = socket.read(&mut buf).await?; |
|
|
|
if n == 0 { |
|
|
|
break; |
|
|
|
} |
|
|
|
|
|
|
|
let data = String::from_utf8_lossy(&buf[..n]); |
|
|
|
println!("接收到的数据: {}", data); |
|
|
|
|
|
|
|
// 尝试解析为不同的数据类型并处理 |
|
|
|
let result = if let Ok(info) = serde_json::from_str::<InstrumentInfo>(&data) { |
|
|
|
println!("接收到仪器信息: {:?}", info); |
|
|
|
match insert_instrument(client, &info).await { |
|
|
|
Ok(_) => true, |
|
|
|
Err(e) => { |
|
|
|
eprintln!("插入仪器信息失败: {}", e); |
|
|
|
false |
|
|
|
} |
|
|
|
} |
|
|
|
} else if let Ok(info) = serde_json::from_str::<InformationInfo>(&data) { |
|
|
|
println!("接收到分析信息: {:?}", info); |
|
|
|
match insert_information(client, &info).await { |
|
|
|
Ok(_) => true, |
|
|
|
Err(e) => { |
|
|
|
eprintln!("插入分析信息失败: {}", e); |
|
|
|
false |
|
|
|
} |
|
|
|
} |
|
|
|
} else if let Ok(info) = serde_json::from_str::<RecordInfo>(&data) { |
|
|
|
println!("接收到记录信息: {:?}", info); |
|
|
|
match insert_record(client, &info).await { |
|
|
|
Ok(_) => true, |
|
|
|
Err(e) => { |
|
|
|
eprintln!("插入记录信息失败: {}", e); |
|
|
|
false |
|
|
|
} |
|
|
|
} |
|
|
|
} else { |
|
|
|
println!("无法解析JSON数据为任何已知类型"); |
|
|
|
false |
|
|
|
}; |
|
|
|
|
|
|
|
// 发送响应 |
|
|
|
let response = if result { 0xFF } else { 0x00 }; |
|
|
|
if let Err(e) = socket.write_all(&[response]).await { |
|
|
|
eprintln!("发送响应失败: {}", e); |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
println!("客户端断开连接: {}", socket.peer_addr()?); |
|
|
|
Ok(()) |
|
|
|
} |
|
|
|
|
|
|
|
#[tokio::main] |
|
|
@@ -231,66 +294,14 @@ async fn main() -> Result<(), Box<dyn Error>> { |
|
|
|
println!("服务器监听地址: {}", bind_address); |
|
|
|
|
|
|
|
loop { |
|
|
|
let (socket, addr) = listener.accept().await?; |
|
|
|
let (mut socket, addr) = listener.accept().await?; |
|
|
|
println!("新客户端连接: {}", addr); |
|
|
|
let client = Arc::clone(&client); |
|
|
|
|
|
|
|
// 使用 spawn_blocking 来处理连接 |
|
|
|
tokio::task::spawn(async move { |
|
|
|
if let Err(e) = process_connection(socket, addr, client).await { |
|
|
|
eprintln!("处理连接错误: {}", e); |
|
|
|
tokio::spawn(async move { |
|
|
|
if let Err(e) = handle_client(&mut socket, &client).await { |
|
|
|
eprintln!("处理客户端错误: {}", e); |
|
|
|
} |
|
|
|
}); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
async fn process_connection(mut socket: TcpStream, addr: SocketAddr, client: Arc<tokio_postgres::Client>) -> Result<(), Box<dyn Error + Send + Sync>> { |
|
|
|
let mut buf = [0; 1024 * 64]; // 增加缓冲区大小到64KB |
|
|
|
|
|
|
|
loop { |
|
|
|
match socket.read(&mut buf).await { |
|
|
|
Ok(0) => { |
|
|
|
println!("客户端断开连接: {}", addr); |
|
|
|
break; |
|
|
|
} |
|
|
|
Ok(n) => { |
|
|
|
let data = &buf[..n]; |
|
|
|
// 尝试解析为 TableType |
|
|
|
match serde_json::from_slice::<TableType>(data) { |
|
|
|
Ok(table_type) => { |
|
|
|
println!("接收到表 {} 的数据", table_type.table_name); |
|
|
|
if let Err(e) = insert_data(&client, &table_type).await { |
|
|
|
eprintln!("错误: {}", e); |
|
|
|
} |
|
|
|
} |
|
|
|
Err(_) => { |
|
|
|
// 如果不是 TableType 格式,尝试解析为 InstrumentInfo(保持向后兼容) |
|
|
|
match serde_json::from_slice::<InstrumentInfo>(data) { |
|
|
|
Ok(info) => { |
|
|
|
println!("接收到仪器信息: {:?}", info); |
|
|
|
if let Err(e) = insert_instrument(&client, &info).await { |
|
|
|
eprintln!("错误: {}", e); |
|
|
|
} |
|
|
|
} |
|
|
|
Err(e) => { |
|
|
|
eprintln!("解析JSON失败: {}", e); |
|
|
|
eprintln!("接收到的数据: {}", String::from_utf8_lossy(data)); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
let response = check_ok_message(data); |
|
|
|
if let Err(e) = socket.write_all(&[response]).await { |
|
|
|
eprintln!("发送响应失败: {}", e); |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
Err(e) => { |
|
|
|
eprintln!("读取数据失败: {}", e); |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
Ok(()) |
|
|
|
} |