Browse Source

将查询出来的数据发送到服务端并成功返回响应

master
OCEAN 2 months ago
parent
commit
fae889244f
7 changed files with 79 additions and 157 deletions
  1. +1
    -97
      tcp_client/config.toml
  2. +72
    -55
      tcp_client/src/main.rs
  3. +6
    -5
      tcp_server/src/main.rs
  4. BIN
      tcp_server/target/debug/deps/tcp_server.exe
  5. BIN
      tcp_server/target/debug/deps/tcp_server.pdb
  6. BIN
      tcp_server/target/debug/tcp_server.exe
  7. BIN
      tcp_server/target/debug/tcp_server.pdb

+ 1
- 97
tcp_client/config.toml View File

@@ -1,6 +1,6 @@
[server]
#host = "10.180.4.88"
host = "10.180.4.88"
host = "127.0.0.1"
port = 9090

[client]
@@ -17,104 +17,8 @@ user = "postgres"
password = "Auseft@2025qwer"

# 要同步的表配置
[[tables]]
name = "hy_record"
query = "SELECT * FROM \"hy_record\""
incremental = false # 是否增量同步
key_field = "UpdateTime" # 增量同步的时间字段

[[tables]]
name = "hy_allot"
query = "SELECT * FROM \"hy_allot\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_cytask"
query = "SELECT * FROM \"hy_cytask\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_fullwatersample"
query = "SELECT * FROM \"hy_fullwatersample\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_informationnorm"
query = "SELECT * FROM \"hy_informationnorm\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_information"
query = "SELECT * FROM \"hy_information\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_itemdetail"
query = "SELECT * FROM \"hy_itemdetail\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_laboratoryinstrument"
query = "SELECT * FROM \"hy_laboratoryinstrument\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_materialanalysis_type"
query = "SELECT * FROM \"hy_materialanalysis_type\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_materialdetail"
query = "SELECT * FROM \"hy_materialdetail\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_instrument"
query = "SELECT * FROM \"hy_instrument\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_norm"
query = "SELECT * FROM \"hy_norm\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_sample_collection_detail"
query = "SELECT * FROM \"hy_sample_collection_detail\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_weight_input"
query = "SELECT * FROM \"hy_weight_input\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_warmhumid"
query = "SELECT * FROM \"hy_warmhumid\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_task"
query = "SELECT * FROM \"hy_task\""
incremental = false
key_field = "UpdateTime"

[[tables]]
name = "hy_spotcheck"
query = "SELECT * FROM \"hy_spotcheck\""
incremental = false
key_field = "UpdateTime"

+ 72
- 55
tcp_client/src/main.rs View File

@@ -74,68 +74,85 @@ async fn main() {
match db.get_table_data(table).await {
Ok(rows) => {
println!("成功获取 {} 条记录", rows.len());
// 将每行数据转换为JSON
// 将每行数据转换为JSON并发送到服务器
for row in rows {
let json_data = db::format_row_as_json(&row);
println!("数据: {}", serde_json::to_string_pretty(&json_data).unwrap());
}
}
Err(e) => println!("获取表 {} 数据失败: {}", table.name, e),
}
}


return;
let mut retry_count = 0;
let server_addr = format!("{}:{}", config.server.host, config.server.port);

// TCP连接部分
while retry_count < config.client.max_retries {
println!("\n尝试连接服务器 {} (第{}次)...", server_addr, retry_count + 1);
match TcpStream::connect(&server_addr) {
Ok(mut stream) => {
println!("成功连接到服务器!");
stream.set_read_timeout(Some(Duration::from_secs(config.client.read_timeout_secs))).unwrap();
stream.set_write_timeout(Some(Duration::from_secs(config.client.write_timeout_secs))).unwrap();

let msg = "HelloOK";
match stream.write(msg.as_bytes()) {
Ok(_) => println!("成功发送消息: {}", msg),
Err(e) => println!("发送消息失败: {}", e),
}

let mut buffer = [0; 1];
match stream.read(&mut buffer) {
Ok(n) => {
if n == 1 {
let bit = buffer[0];
if bit == 0 || bit == 1 {
println!("接收到比特值: {}", bit);
} else {
println!("接收到非预期的值: {} (十六进制: {:02X})", bit, bit);
let msg = serde_json::to_string(&json_data).unwrap();

// 创建TCP连接
let server_addr = format!("{}:{}", config.server.host, config.server.port);
let mut retry_count = 0;

while retry_count < config.client.max_retries {
println!("\n尝试连接服务器 {} (第{}次)...", server_addr, retry_count + 1);
match TcpStream::connect(&server_addr) {
Ok(mut stream) => {
println!("成功连接到服务器!");
if let Err(e) = stream.set_read_timeout(Some(Duration::from_secs(config.client.read_timeout_secs))) {
println!("设置读取超时失败: {}", e);
continue;
}
if let Err(e) = stream.set_write_timeout(Some(Duration::from_secs(config.client.write_timeout_secs))) {
println!("设置写入超时失败: {}", e);
continue;
}

// 发送数据
match stream.write_all(msg.as_bytes()) {
Ok(_) => println!("成功发送消息: {}", msg),
Err(e) => {
println!("发送消息失败: {}", e);
retry_count += 1;
continue;
}
}

// 读取响应
let mut buffer = [0; 1];
match stream.read_exact(&mut buffer) {
Ok(_) => {
let bit = buffer[0];
println!("接收到比特值: {}", bit);
if(bit==255){
println!("接收成功");
break;
}
else if(bit==0){
println!("接收失败");
retry_count += 1;
continue;
}
else{
println!("接收到非预期的值: {} (十六进制: {:02X})", bit, bit);
retry_count += 1;
}
}
Err(e) => {
println!("接收数据失败: {}", e);
retry_count += 1;
}
}
}
Err(e) => {
println!("连接服务器失败: {}", e);
retry_count += 1;
if retry_count < config.client.max_retries {
println!("等待{}秒后重试...", config.client.retry_delay_secs);
thread::sleep(Duration::from_secs(config.client.retry_delay_secs));
}
}
} else {
println!("未接收到数据");
}
}
Err(e) => println!("接收数据失败: {}", e),
}
break;
}
Err(e) => {
println!("连接服务器失败: {}", e);
retry_count += 1;
if retry_count < config.client.max_retries {
println!("等待{}秒后重试...", config.client.retry_delay_secs);
thread::sleep(Duration::from_secs(config.client.retry_delay_secs));

if retry_count >= config.client.max_retries {
println!("达到最大重试次数,跳过当前数据");
}
}
}
Err(e) => println!("获取表 {} 数据失败: {}", table.name, e),
}
}

if retry_count >= config.client.max_retries {
println!("达到最大重试次数,程序退出");
}
}

+ 6
- 5
tcp_server/src/main.rs View File

@@ -4,11 +4,12 @@ use config::Config;

fn check_ok_message(message: &[u8]) -> u8 {
let message_str = String::from_utf8_lossy(message);
if message_str.contains("OK") {
0xFF
} else {
0x00
}
0xFF
// if message_str.contains("OK") {
// 0xFF
// } else {
// 0x00
// }
}

#[tokio::main]


BIN
tcp_server/target/debug/deps/tcp_server.exe View File


BIN
tcp_server/target/debug/deps/tcp_server.pdb View File


BIN
tcp_server/target/debug/tcp_server.exe View File


BIN
tcp_server/target/debug/tcp_server.pdb View File


Loading…
Cancel
Save