diff --git a/migrations/2025-05-12-152521_create_payload_data_table/down.sql b/migrations/2025-05-12-152521_create_payload_data_table/down.sql new file mode 100644 index 0000000000000000000000000000000000000000..637f098ab9f194e4091295dd52fb60a33911ef9b --- /dev/null +++ b/migrations/2025-05-12-152521_create_payload_data_table/down.sql @@ -0,0 +1,4 @@ +-- This file should undo anything in `up.sql` +DROP TABLE IF EXISTS payload_data; +DROP TABLE IF EXISTS payload_data_replica1; +DROP TABLE IF EXISTS payload_data_replica2; \ No newline at end of file diff --git a/migrations/2025-05-12-152521_create_payload_data_table/up.sql b/migrations/2025-05-12-152521_create_payload_data_table/up.sql new file mode 100644 index 0000000000000000000000000000000000000000..51cc856c6c3c00d3fe4203eb71cb76721aca7582 --- /dev/null +++ b/migrations/2025-05-12-152521_create_payload_data_table/up.sql @@ -0,0 +1,21 @@ +-- Your SQL goes here +CREATE TABLE IF NOT EXISTS payload_data ( + cluster_id TEXT NOT NULL, + kind TEXT NOT NULL, + data TEXT NOT NULL, + created_time VARCHAR(256) +); + +CREATE TABLE IF NOT EXISTS payload_data_replica1 ( + cluster_id TEXT NOT NULL, + kind TEXT NOT NULL, + data TEXT NOT NULL, + created_time VARCHAR(256) +); + +CREATE TABLE IF NOT EXISTS payload_data_replica2 ( + cluster_id TEXT NOT NULL, + kind TEXT NOT NULL, + data TEXT NOT NULL, + created_time VARCHAR(256) +); \ No newline at end of file diff --git a/src/cores/handlers/mod.rs b/src/cores/handlers/mod.rs index 394b1bd2ce6136bb578bc958eebf4e5d31f3f7cf..43f480e81da29bf6f2d55c5033b2dacbe00ad5bc 100644 --- a/src/cores/handlers/mod.rs +++ b/src/cores/handlers/mod.rs @@ -8,3 +8,4 @@ pub mod router_topo; pub mod security; pub mod test; pub mod users; +pub mod payload; \ No newline at end of file diff --git a/src/cores/handlers/payload.rs b/src/cores/handlers/payload.rs new file mode 100644 index 0000000000000000000000000000000000000000..9b4e77ad9d80afcc529fb7169d70f8429f3ad31b --- /dev/null +++ b/src/cores/handlers/payload.rs @@ -0,0 +1,133 @@ +use std::sync::Arc; +use fleetmodv2::api_server::{ServerError, ServerRequest, ServerResult}; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use crate::cores::state::AppState; +use crate::db::delete::delete_from_payload; +use crate::db::get::get_data_from_payload; +use crate::db::insert::insert_payload; + +fn map_diesel_err(error: diesel::result::Error) -> ServerError { + ServerError::internal_error(error.to_string().as_str()) +} + +pub async fn insert_payload_handler( + app_state: Arc, + server_request: ServerRequest, +) -> ServerResult { + + let payload_kind = server_request.params.get("payload_kind").unwrap(); + + let kind = match payload_kind.as_str() { + "orbit-attitude" => "OrbitAttitude", + "perception" => "PerceptionPayload", + "routing" => "RoutingPayload", + "laser" => "LaserPayload", + "remote-sensing" => "RemoteSensingPayload", + "distribution" => "DistributionPayload", + _ => return Err(ServerError::bad_request("Unknown payload kind")), + }; + + let db_conn = app_state.db_pool.get_connection(); + let cluster_id = app_state.cluster_id.clone(); + + if let Err(e) = db_conn { + log::error!("error getting db conn: {}", e); + return Err(ServerError::internal_error("DB pool error")); + } + + let mut db_conn = db_conn.unwrap(); + + let body = server_request.body.as_ref().unwrap(); + + let data_str = std::str::from_utf8(body).map_err(|_| { + ServerError::bad_request("Request body is not valid UTF-8") + })?; + + + insert_payload( + &mut db_conn, + &cluster_id.as_str(), + kind, + data_str, + ).await.map_err(map_diesel_err)?; + + Ok(serde_json::to_value(data_str)?) +} + +pub async fn get_payload_handler( + app_state: Arc, + server_request: ServerRequest, +) -> ServerResult { + + let payload_kind = server_request.params.get("payload_kind").unwrap(); + + let kind = match payload_kind.as_str() { + "orbit-attitude" => "OrbitAttitude", + "perception" => "PerceptionPayload", + "routing" => "RoutingPayload", + "laser" => "LaserPayload", + "remote-sensing" => "RemoteSensingPayload", + "distribution" => "DistributionPayload", + _ => return Err(ServerError::bad_request("Unknown payload kind")), + }; + + let cluster_id = server_request.params.get("cluster_id").unwrap(); + + let db_conn = app_state.db_pool.get_connection(); + + if let Err(e) = db_conn { + log::error!("error getting db conn: {}", e); + return Err(ServerError::internal_error("DB pool error")); + } + + let mut db_conn = db_conn.unwrap(); + + let res = get_data_from_payload( + &mut db_conn, + cluster_id, + kind, + ).await.map_err(map_diesel_err)?; + + Ok(serde_json::to_value(res)?) +} + +pub async fn delete_payload_handler( + app_state: Arc, + server_request: ServerRequest, +) -> ServerResult { + + let payload_kind = server_request.params.get("payload_kind").unwrap(); + + let kind = match payload_kind.as_str() { + "orbit-attitude" => "OrbitAttitude", + "perception" => "PerceptionPayload", + "routing" => "RoutingPayload", + "laser" => "LaserPayload", + "remote-sensing" => "RemoteSensingPayload", + "distribution" => "DistributionPayload", + _ => return Err(ServerError::bad_request("Unknown payload kind")), + }; + + let cluster_id = server_request.params.get("cluster_id").unwrap(); + + let db_conn = app_state.db_pool.get_connection(); + + if let Err(e) = db_conn { + log::error!("error getting db conn: {}", e); + return Err(ServerError::internal_error("DB pool error")); + } + + let mut db_conn = db_conn.unwrap(); + + delete_from_payload( + &mut db_conn, + cluster_id, + kind, + ).await.map_err(map_diesel_err)?; + + Ok(json!({ + "cluster_id": cluster_id, + "kind": kind, + })) +} \ No newline at end of file diff --git a/src/cores/router.rs b/src/cores/router.rs index b1e61370c10a7c8fca74e5c5635b0d7c6bf2384c..3c95573a566289a330b9f1bafe0ecfa4895edbf5 100644 --- a/src/cores/router.rs +++ b/src/cores/router.rs @@ -73,6 +73,11 @@ pub enum RouterKey { ConsensusRaftChangeMembership, ConsensusRaftMetrics, + // Payload Mgr Router Key + InsertPayload, + GetPayload, + DeletePayload, + // Network Status Router Keys NetworkInterfaceRecord, NetworkInterfaceList, @@ -435,6 +440,24 @@ impl Router { ); } + { + add_route!( + router, + RouterKey::InsertPayload, + handlers::payload::insert_payload_handler + ); + add_route!( + router, + RouterKey::GetPayload, + handlers::payload::get_payload_handler + ); + add_route!( + router, + RouterKey::DeletePayload, + handlers::payload::delete_payload_handler + ); + } + // network status routers { add_route!( diff --git a/src/cores/servers/actix_web/mod.rs b/src/cores/servers/actix_web/mod.rs index c1440d9ad1e968bad7b3b9e04c11aa32b1863711..6d90ff9f62daabb5bb440914043d081f85047efd 100644 --- a/src/cores/servers/actix_web/mod.rs +++ b/src/cores/servers/actix_web/mod.rs @@ -84,6 +84,21 @@ macro_rules! init_app { name: "NetworkStatus".to_string(), description: Some("Network Status routes".to_string()), external_docs: None, + }, + Tag { + name: "Payload".to_string(), + description: Some("Payload Mgr routes".to_string()), + external_docs: None, + }, + Tag { + name: "Security".to_string(), + description: Some("Security routes".to_string()), + external_docs: None, + }, + Tag { + name: "Users".to_string(), + description: Some("Users routes".to_string()), + external_docs: None, } ]; App::new() @@ -101,6 +116,7 @@ macro_rules! init_app { .configure(router_mgr::configure_routes) .configure(router_topo::configure_routes) .configure(consensus::configure_routes) + .configure(payload::configure_routes) .configure(network_status::configure_routes) .with_json_spec_at("/api/spec/v2") .with_swagger_ui_at("/api/doc") @@ -1011,6 +1027,64 @@ pub mod consensus { ); } +pub mod payload { + use super::*; + + #[derive(Serialize, Deserialize, Apiv2Schema)] + pub struct PayloadKindPath { + pub payload_kind: String, + } + + #[derive(Serialize, Deserialize, Apiv2Schema)] + pub struct PayloadKindPathWithClusterId { + pub payload_kind: String, + pub cluster_id: String, + } + + pub fn configure_routes(app: &mut web::ServiceConfig) { + app.service( + web::scope("/payload") + .service(insert_payload) + .service(get_payload) + .service(delete_payload) + ); + } + + define_json_response_method!( + body_required + insert_payload, + post, + "/{payload_kind}", + PayloadKindPath, + (), + RouterKey::InsertPayload, + "Payload" + ); + + define_json_response_method!( + body_unrequired + get_payload, + get, + "/{payload_kind}/{cluster_id}", + PayloadKindPathWithClusterId, + (), + RouterKey::GetPayload, + "Payload" + ); + + define_json_response_method!( + body_unrequired + delete_payload, + delete, + "/{payload_kind}/{cluster_id}", + PayloadKindPathWithClusterId, + (), + RouterKey::DeletePayload, + "Payload" + ); + +} + pub mod network_status { use super::*; pub fn configure_routes(app: &mut web::ServiceConfig) { diff --git a/src/db/delete.rs b/src/db/delete.rs index 15993726b73ee08d5e0d849d7b6e5036ec74560b..81a8b2667d009dd9a88225eaa17e977499ea170c 100644 --- a/src/db/delete.rs +++ b/src/db/delete.rs @@ -181,6 +181,53 @@ pub async fn delete_from_routermgr( total_rows_affected += rows_affected; } + // 如果至少有两个表进行了删除,则返回 true + Ok(total_rows_affected > 1) +} + +pub async fn delete_from_payload( + conn: &mut DbConnection, + cluster_id: &str, + kind: &str, +) -> QueryResult { + + // 表名列表 + let tables = ["payload_data", "payload_data_replica1", "payload_data_replica2"]; + + // 遍历每个表,执行删除操作 + let mut total_rows_affected = 0; + + for &table in &tables { + let delete_query = match conn { + DbConnection::Pg(_) => format!( + "DELETE FROM {} WHERE cluster_id = $1 AND kind = $2", + table + ), + DbConnection::Sqlite(_) => format!( + "DELETE FROM {} WHERE cluster_id = ? AND kind = ?", + table + ), + }; + + // 执行删除 + let rows_affected = match conn { + DbConnection::Pg(pg_conn) => { + diesel::sql_query(delete_query) + .bind::(cluster_id) + .bind::(kind) + .execute(pg_conn)? + } + DbConnection::Sqlite(sqlite_conn) => { + diesel::sql_query(delete_query) + .bind::(cluster_id) + .bind::(kind) + .execute(sqlite_conn)? + } + }; + + total_rows_affected += rows_affected; + } + // 如果至少有两个表进行了删除,则返回 true Ok(total_rows_affected > 1) } \ No newline at end of file diff --git a/src/db/get.rs b/src/db/get.rs index f810e134d968031dbf17b19af2185b73bfe3bff3..3f26c8e760720a830e11bcded9a1f5caeb0fb854 100644 --- a/src/db/get.rs +++ b/src/db/get.rs @@ -990,7 +990,7 @@ pub async fn query_all_routermgr( use diesel::prelude::*; // use diesel::sql_types::{Integer, Text}; use std::collections::HashMap; - + // 定义要查询的表 let tables = ["routermgr", "routermgr_replica1", "routermgr_replica2"]; @@ -1010,13 +1010,13 @@ pub async fn query_all_routermgr( // let select_query = format!( // r#" - // SELECT - // msg_from, - // prefix, - // "prefixLen" AS prefix_len, - // "nextHop" AS next_hop, - // "intfID" AS intf_id, - // "OpCode" AS op_code + // SELECT + // msg_from, + // prefix, + // "prefixLen" AS prefix_len, + // "nextHop" AS next_hop, + // "intfID" AS intf_id, + // "OpCode" AS op_code // FROM {} // "#, // table @@ -1024,11 +1024,11 @@ pub async fn query_all_routermgr( let select_query = format!( r#" - SELECT - msg_from, - dst_ip, - "prefixLen" AS prefix_len, - "nextHop" AS next_hop, + SELECT + msg_from, + dst_ip, + "prefixLen" AS prefix_len, + "nextHop" AS next_hop, "intfID" AS intf_id FROM {} "#, @@ -1066,4 +1066,99 @@ pub async fn query_all_routermgr( .collect(); Ok(filtered_results) +} + +#[derive(Debug, QueryableByName, Serialize, Clone, PartialEq, Eq, Hash)] +pub struct PayloadInfoResult { + #[diesel(sql_type = Text)] + cluster_id: String, + #[diesel(sql_type = Text)] + kind: String, + #[diesel(sql_type = Text)] + data: String, + #[diesel(sql_type = Text)] + created_time: String, +} + +pub async fn get_data_from_payload( + conn: &mut DbConnection, + cluster_id: &str, + kind: &str, +) -> QueryResult> { + let tables = ["payload_data", "payload_data_replica1", "payload_data_replica2"]; + let mut count_map: HashMap = HashMap::new(); + + for &table in &tables { + let select_query = match conn { + DbConnection::Pg(_) => format!( + "SELECT cluster_id, kind, data, created_time FROM {} WHERE cluster_id = $1 AND kind = $2", + table + ), + DbConnection::Sqlite(_) => format!( + "SELECT cluster_id, kind, data, created_time FROM {} WHERE cluster_id = ? AND kind = ?", + table + ), + }; + + let results = match conn { + DbConnection::Pg(pg_conn) => { + diesel::sql_query(&select_query) + .bind::(cluster_id) + .bind::(kind) + .get_results::(pg_conn)? + } + DbConnection::Sqlite(sqlite_conn) => { + diesel::sql_query(&select_query) + .bind::(cluster_id) + .bind::(kind) + .get_results::(sqlite_conn)? + } + }; + + for record in results { + *count_map.entry(record).or_insert(0) += 1; + } + } + + let majority_records: Vec = count_map + .into_iter() + .filter_map(|(data, count)| if count >= 2 { Some(data) } else { None }) + .collect(); + + if majority_records.is_empty() { + // 回退主表所有记录 + get_all_from_payload_primary(conn, cluster_id, kind) + } else { + Ok(majority_records) + } +} + +fn get_all_from_payload_primary( + conn: &mut DbConnection, + cluster_id: &str, + kind: &str, +) -> QueryResult> { + let query = match conn { + DbConnection::Pg(_) => { + "SELECT cluster_id, kind, data, created_time FROM payload_data WHERE cluster_id = $1 AND kind = $2".to_string() + } + DbConnection::Sqlite(_) => { + "SELECT cluster_id, kind, data, created_time FROM payload_data WHERE cluster_id = ? AND kind = ?".to_string() + } + }; + + match conn { + DbConnection::Pg(pg_conn) => { + diesel::sql_query(query) + .bind::(cluster_id) + .bind::(kind) + .get_results::(pg_conn) + } + DbConnection::Sqlite(sqlite_conn) => { + diesel::sql_query(query) + .bind::(cluster_id) + .bind::(kind) + .get_results::(sqlite_conn) + } + } } \ No newline at end of file diff --git a/src/db/insert.rs b/src/db/insert.rs index ca70d3860b98e9fc6fa02246a16a1602ab75f3a9..b74988d58cd52eb9bbd24e7ebd8980059e8cec71 100644 --- a/src/db/insert.rs +++ b/src/db/insert.rs @@ -491,4 +491,87 @@ pub async fn insert_routermgr( } .expect("unknow conn in insert_routermgr"); Ok(()) +} + +fn insert_payload_in_transaction_pg( + transaction: &mut PgConnection, + cluster_id: &str, + kind: &str, + data: &str, +) -> QueryResult<()> { + // 表列表 + let table_array: [&str; 3] = ["payload_data", "payload_data_replica1", "payload_data_replica2"]; + + for table_name in table_array { + // 使用参数绑定构建插入查询 + let insert_users_query = format!( + "INSERT INTO {} (cluster_id, kind, data, created_time) + VALUES ($1, $2, $3, $4);", + table_name + ); + + // 执行插入操作 + sql_query(insert_users_query) + .bind::(cluster_id) + .bind::(kind) + .bind::(data) + .bind::(Utc::now().naive_utc().to_string()) + .execute(transaction)?; + } + + Ok(()) +} + +fn insert_payload_in_transaction_sqlite( + transaction: &mut SqliteConnection, + cluster_id: &str, + kind: &str, + data: &str, +) -> QueryResult<()> { + let table_array: [&str; 3] = ["payload_data", "payload_data_replica1", "payload_data_replica2"]; + + for table_name in table_array { + let insert_users_query = format!( + "INSERT OR IGNORE INTO {} (cluster_id, kind, data, created_time) + VALUES (?, ?, ?, ?);", + table_name + ); + + sql_query(insert_users_query) + .bind::(cluster_id) + .bind::(kind) + .bind::(data) + .bind::(Utc::now().naive_utc().to_string()) + .execute(transaction)?; + } + Ok(()) +} + +// 在 payload 表中插入新记录 +pub async fn insert_payload( + conn: &mut DbConnection, + cluster_id: &str, + kind: &str, + data: &str, +) -> QueryResult<()> { + match conn { + DbConnection::Pg(pg_conn) => pg_conn.transaction(|transaction| { + insert_payload_in_transaction_pg( + transaction, + cluster_id, + kind, + data, + ) + }), + DbConnection::Sqlite(sqlite_conn) => sqlite_conn.transaction(|transaction| { + insert_payload_in_transaction_sqlite( + transaction, + cluster_id, + kind, + data, + ) + }), + } + .expect("unknow conn in insert_payload"); + Ok(()) } \ No newline at end of file diff --git a/src/schema.rs b/src/schema.rs index 59b6eff156fe4c1952a3d4330ae1ab6a6b32ccb4..0f94c5f99069852f7345d6482307626c3a8d1efd 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -96,6 +96,36 @@ diesel::table! { } } +diesel::table! { + payload_data (rowid) { + rowid -> Integer, + cluster_id -> Text, + kind -> Text, + data -> Text, + created_time -> Nullable, + } +} + +diesel::table! { + payload_data_replica1 (rowid) { + rowid -> Integer, + cluster_id -> Text, + kind -> Text, + data -> Text, + created_time -> Nullable, + } +} + +diesel::table! { + payload_data_replica2 (rowid) { + rowid -> Integer, + cluster_id -> Text, + kind -> Text, + data -> Text, + created_time -> Nullable, + } +} + diesel::table! { network_interfaces (id) { id -> Nullable, @@ -244,6 +274,9 @@ diesel::allow_tables_to_appear_in_same_query!( metadata, metadata_replica1, metadata_replica2, + payload_data, + payload_data_replica1, + payload_data_replica2, network_interfaces, network_interfaces_replica1, network_interfaces_replica2,