Explorar el Código

增加定时执行时间

晋江
OCEAN hace 2 meses
padre
commit
62944d173f
Se han modificado 2 ficheros con 98 adiciones y 89 borrados
  1. +6
    -5
      tcp_client/config.toml
  2. +92
    -84
      tcp_client/src/main.rs

+ 6
- 5
tcp_client/config.toml Ver fichero

@@ -5,10 +5,12 @@ host = "10.180.4.88"
port = 9090

[client]
max_retries = 2
retry_delay_secs = 3
read_timeout_secs = 5
write_timeout_secs = 5
# 定时任务配置
interval_seconds = 60 # 每隔多少秒执行一次
max_retries = 3
read_timeout_secs = 10
write_timeout_secs = 10
retry_delay_secs = 2

[database]
#host = "10.180.4.100"
@@ -24,4 +26,3 @@ name = "hy_record"
query = "SELECT * FROM hy_record "
incremental = false # 是否增量同步
key_field = "UpdateTime" # 增量同步的时间字段


+ 92
- 84
tcp_client/src/main.rs Ver fichero

@@ -2,8 +2,8 @@ mod db;

use std::net::TcpStream;
use std::io::{Read, Write};
use std::thread;
use std::time::Duration;
use std::thread;
use serde::Deserialize;
use config::Config;
use db::{Database, DatabaseConfig, TableConfig};
@@ -20,6 +20,7 @@ struct ClientConfig {
retry_delay_secs: u64,
read_timeout_secs: u64,
write_timeout_secs: u64,
interval_seconds: u64, // 定时任务间隔时间
}

#[derive(Debug, Deserialize)]
@@ -49,110 +50,117 @@ async fn main() {
}
};

// 连接数据库
let mut db = match Database::connect(&config.database).await {
Ok(db) => {
println!("数据库连接成功!");
db
}
Err(e) => {
println!("数据库连接失败: {}", e);
return;
loop {
// 连接数据库
let mut db = match Database::connect(&config.database).await {
Ok(db) => {
println!("数据库连接成功!");
db
}
Err(e) => {
println!("数据库连接失败: {}", e);
thread::sleep(Duration::from_secs(config.client.interval_seconds));
continue;
}
};

// 测试数据库连接
match db.test_connection().await {
Ok(true) => println!("数据库连接测试成功"),
Ok(false) => println!("数据库连接测试失败"),
Err(e) => println!("数据库测试错误: {}", e),
}
};

// 测试数据库连接
match db.test_connection().await {
Ok(true) => println!("数据库连接测试成功"),
Ok(false) => println!("数据库连接测试失败"),
Err(e) => println!("数据库测试错误: {}", e),
}
// 获取配置的表数据
for table in &config.tables {
println!("\n开始获取表 {} 的数据...", table.name);
match db.get_table_data(table).await {
Ok(rows) => {
println!("成功获取 {} 条记录", rows.len());
// 将每行数据转换为JSON并发送到服务器
for row in rows {
let json_data = db::format_row_as_json(&row, &table.name);
let msg = serde_json::to_string(&json_data).unwrap();

// 获取配置的表数据
for table in &config.tables {
println!("\n开始获取表 {} 的数据...", table.name);
match db.get_table_data(table).await {
Ok(rows) => {
println!("成功获取 {} 条记录", rows.len());
// 将每行数据转换为JSON并发送到服务器
for row in rows {
let json_data = db::format_row_as_json(&row, &table.name);
let msg = serde_json::to_string(&json_data).unwrap();

// 创建TCP连接
let server_addr = format!("{}:{}", config.server.host, config.server.port);
let mut retry_count = 0;

while retry_count < config.client.max_retries {
println!("\n尝试连接服务器 {} (第{}次)...", server_addr, retry_count + 1);
match TcpStream::connect(&server_addr) {
Ok(mut stream) => {
println!("成功连接到服务器!");
if let Err(e) = stream.set_read_timeout(Some(Duration::from_secs(config.client.read_timeout_secs))) {
println!("设置读取超时失败: {}", e);
continue;
}
if let Err(e) = stream.set_write_timeout(Some(Duration::from_secs(config.client.write_timeout_secs))) {
println!("设置写入超时失败: {}", e);
continue;
}
// 创建TCP连接
let server_addr = format!("{}:{}", config.server.host, config.server.port);
let mut retry_count = 0;

// 发送数据
match stream.write_all(msg.as_bytes()) {
Ok(_) => println!("成功发送消息: {}", msg),
Err(e) => {
println!("发送消息失败: {}", e);
retry_count += 1;
while retry_count < config.client.max_retries {
println!("\n尝试连接服务器 {} (第{}次)...", server_addr, retry_count + 1);
match TcpStream::connect(&server_addr) {
Ok(mut stream) => {
println!("成功连接到服务器!");
if let Err(e) = stream.set_read_timeout(Some(Duration::from_secs(config.client.read_timeout_secs))) {
println!("设置读取超时失败: {}", e);
continue;
}
if let Err(e) = stream.set_write_timeout(Some(Duration::from_secs(config.client.write_timeout_secs))) {
println!("设置写入超时失败: {}", e);
continue;
}
}

// 读取响应
let mut buffer = [0; 1];
match stream.read_exact(&mut buffer) {
Ok(_) => {
let bit = buffer[0];
// println!("接收到比特值: {}", bit);
if(bit==255){
println!("接收成功");
break;
}
else if(bit==0){
println!("接收失败");
// 发送数据
match stream.write_all(msg.as_bytes()) {
Ok(_) => println!("成功发送消息: {}", msg),
Err(e) => {
println!("发送消息失败: {}", e);
retry_count += 1;
continue;
}
else{
println!("接收到非预期的值: {} (十六进制: {:02X})", bit, bit);
}

// 读取响应
let mut buffer = [0; 1];
match stream.read_exact(&mut buffer) {
Ok(_) => {
let bit = buffer[0];
// println!("接收到比特值: {}", bit);
if(bit==255){
println!("接收成功");
break;
}
else if(bit==0){
println!("接收失败");
retry_count += 1;
continue;
}
else{
println!("接收到非预期的值: {} (十六进制: {:02X})", bit, bit);
retry_count += 1;
}
}
Err(e) => {
println!("接收数据失败: {}", e);
retry_count += 1;
}
}
Err(e) => {
println!("接收数据失败: {}", e);
retry_count += 1;
}
}
}
Err(e) => {
println!("连接服务器失败: {}", e);
retry_count += 1;
if retry_count < config.client.max_retries {
println!("等待{}秒后重试...", config.client.retry_delay_secs);
thread::sleep(Duration::from_secs(config.client.retry_delay_secs));
Err(e) => {
println!("连接服务器失败: {}", e);
retry_count += 1;
if retry_count < config.client.max_retries {
println!("等待{}秒后重试...", config.client.retry_delay_secs);
thread::sleep(Duration::from_secs(config.client.retry_delay_secs));
}
}
}
}
}

if retry_count >= config.client.max_retries {
println!("达到最大重试次数,跳过当前数据");
if retry_count >= config.client.max_retries {
println!("达到最大重试次数,跳过当前数据");
}
}
}
Err(e) => println!("获取表 {} 数据失败: {}", table.name, e),
}
Err(e) => println!("获取表 {} 数据失败: {}", table.name, e),
}

// 等待下一次执行
println!("等待 {} 秒后执行下一次同步...", config.client.interval_seconds);
thread::sleep(Duration::from_secs(config.client.interval_seconds));
}
}

Cargando…
Cancelar
Guardar