diff --git a/migrations/2025-04-01-015135_create_routermgr_table/up.sql b/migrations/2025-04-01-015135_create_routermgr_table/up.sql index 23bc69c2270dfce11a98dc75ba835d53c33ecddb..25c886cb5c8ae25586548521a6abc521d519949a 100644 --- a/migrations/2025-04-01-015135_create_routermgr_table/up.sql +++ b/migrations/2025-04-01-015135_create_routermgr_table/up.sql @@ -1,27 +1,27 @@ -- Your SQL goes here CREATE TABLE IF NOT EXISTS routermgr ( msg_from TEXT NOT NULL, - prefix TEXT NOT NULL, + dst_ip TEXT NOT NULL, prefixLen INTEGER NOT NULL, nextHop TEXT NOT NULL, intfID INTEGER NOT NULL, - PRIMARY KEY (msg_from, prefix, prefixLen) + PRIMARY KEY (msg_from, dst_ip, prefixLen) ); CREATE TABLE IF NOT EXISTS routermgr_replica1 ( msg_from TEXT NOT NULL, - prefix TEXT NOT NULL, + dst_ip TEXT NOT NULL, prefixLen INTEGER NOT NULL, nextHop TEXT NOT NULL, intfID INTEGER NOT NULL, - PRIMARY KEY (msg_from, prefix, prefixLen) + PRIMARY KEY (msg_from, dst_ip, prefixLen) ); CREATE TABLE IF NOT EXISTS routermgr_replica2 ( msg_from TEXT NOT NULL, - prefix TEXT NOT NULL, + dst_ip TEXT NOT NULL, prefixLen INTEGER NOT NULL, nextHop TEXT NOT NULL, intfID INTEGER NOT NULL, - PRIMARY KEY (msg_from, prefix, prefixLen) + PRIMARY KEY (msg_from, dst_ip, prefixLen) ); \ No newline at end of file diff --git a/src/cores/handlers/mod.rs b/src/cores/handlers/mod.rs index 6bc76cbab7a4aefa147d1dfb8d85e1659821ef24..394b1bd2ce6136bb578bc958eebf4e5d31f3f7cf 100644 --- a/src/cores/handlers/mod.rs +++ b/src/cores/handlers/mod.rs @@ -4,6 +4,7 @@ pub mod consensus; pub mod datamgr; pub mod network_status; pub mod router_mgr; +pub mod router_topo; pub mod security; pub mod test; pub mod users; diff --git a/src/cores/handlers/router_mgr.rs b/src/cores/handlers/router_mgr.rs index f8d4c639922e023998c519761dce3bfa11710261..891cc8b55f97215370994bee01540120a407b583 100644 --- a/src/cores/handlers/router_mgr.rs +++ b/src/cores/handlers/router_mgr.rs @@ -14,7 +14,7 @@ fn map_diesel_err(error: diesel::result::Error) -> ServerError { #[derive(Debug, Serialize, Deserialize)] struct RouterMgrInfoRequest { - prefix: String, + dst_ip: String, #[serde(rename = "prefixLen")] prefix_len: i32, #[serde(rename = "nextHop")] @@ -54,43 +54,43 @@ pub async fn routermgr_action_handler( let router_exists = check_routermgr( &mut db_conn, &msg_from, - &req.prefix, + &req.dst_ip, req.prefix_len ).await.map_err(map_diesel_err)?; - if router_exists { - return Err(ServerError::duplicated("This router status is exist")); + if !router_exists { + return Err(ServerError::duplicated("This router status is not exist")); } - insert_routermgr( + delete_from_routermgr( &mut db_conn, &msg_from, - &req.prefix, + &req.dst_ip, req.prefix_len, - &req.next_hop, - req.intf_id, ).await.map_err(map_diesel_err)?; Ok(serde_json::to_value(req)?) - }, + }, 1 => { let router_exists = check_routermgr( &mut db_conn, &msg_from, - &req.prefix, + &req.dst_ip, req.prefix_len ).await.map_err(map_diesel_err)?; - if !router_exists { - return Err(ServerError::duplicated("This router status is not exist")); + if router_exists { + return Err(ServerError::duplicated("This router status is exist")); } - delete_from_routermgr( + insert_routermgr( &mut db_conn, &msg_from, - &req.prefix, + &req.dst_ip, req.prefix_len, + &req.next_hop, + req.intf_id, ).await.map_err(map_diesel_err)?; Ok(serde_json::to_value(req)?) @@ -100,7 +100,7 @@ pub async fn routermgr_action_handler( let router_exists = check_routermgr( &mut db_conn, &msg_from, - &req.prefix, + &req.dst_ip, req.prefix_len ).await.map_err(map_diesel_err)?; @@ -111,7 +111,7 @@ pub async fn routermgr_action_handler( update_routermgr( &mut db_conn, &msg_from, - &req.prefix, + &req.dst_ip, req.prefix_len, &req.next_hop, req.intf_id, diff --git a/src/cores/handlers/router_topo.rs b/src/cores/handlers/router_topo.rs new file mode 100644 index 0000000000000000000000000000000000000000..4bff97c30494e2109a71cec09d0c499a9152c069 --- /dev/null +++ b/src/cores/handlers/router_topo.rs @@ -0,0 +1,44 @@ +use std::sync::Arc; +use std::collections::HashMap; +use crate::cores::state::AppState; +use fleetmodv2::api_server::{ServerError, ServerRequest, ServerResult}; +use serde_json::Value; +// 导入路由结构体 +use crate::db::get::{query_all_routermgr, RouterMgrInfoResult}; + + +// 构建拓扑:源 -> (目的, 出端口) +fn build_topology(entries: &Vec) -> HashMap> { + let mut topo = HashMap::new(); + for entry in entries { + topo.entry(entry.msg_from().to_string()) + .or_insert_with(Vec::new) + .push((entry.next_hop().to_string(), entry.intf_id())); + } + topo +} + +// 请求处理:生成拓扑 +pub async fn generate_topology_handler( + app_state: Arc, + _server_request: ServerRequest, +) -> ServerResult { + // 从数据库连接池中获取连接 + let db_conn = app_state.db_pool.get_connection(); + if let Err(e) = db_conn { + log::error!("error getting db conn: {}", e); + return Err(ServerError::internal_error("DB pool error")); + } + let mut db_conn = db_conn.unwrap(); + + // 查询路由管理数据 + let entries = query_all_routermgr(&mut db_conn) + .await + .map_err(|e| ServerError::internal_error(&format!("Query error: {}", e)))?; + + // 直接使用查询结果构建拓扑 + let topology = build_topology(&entries); + + // 返回拓扑结构 + Ok(serde_json::to_value(topology)?) +} \ No newline at end of file diff --git a/src/cores/router.rs b/src/cores/router.rs index 2f9d26ddab5b20df4788eb6f8b81c75788283fe7..b1e61370c10a7c8fca74e5c5635b0d7c6bf2384c 100644 --- a/src/cores/router.rs +++ b/src/cores/router.rs @@ -42,6 +42,9 @@ pub enum RouterKey { // router_mgr key RouterMgrAction, + // router topo key + GenerateTopology, + // Data mgr router key DataMgrUploadData, DataMgrQueryData, @@ -280,6 +283,15 @@ impl Router { ); } + // router_topo routers + { + add_route!( + router, + RouterKey::GenerateTopology, + handlers::router_topo::generate_topology_handler + ); + } + // data mgr routers { add_route!( diff --git a/src/cores/servers/actix_web/mod.rs b/src/cores/servers/actix_web/mod.rs index 03e5ec49aa7bac3bb2d51d3bfcdda6fbd2f54c88..c1440d9ad1e968bad7b3b9e04c11aa32b1863711 100644 --- a/src/cores/servers/actix_web/mod.rs +++ b/src/cores/servers/actix_web/mod.rs @@ -99,6 +99,7 @@ macro_rules! init_app { .configure(users::configure_routes) .configure(security::configure_routes) .configure(router_mgr::configure_routes) + .configure(router_topo::configure_routes) .configure(consensus::configure_routes) .configure(network_status::configure_routes) .with_json_spec_at("/api/spec/v2") @@ -675,6 +676,26 @@ pub mod router_mgr { ); } +pub mod router_topo { + use super::*; + + pub fn configure_routes(app: &mut web::ServiceConfig) { + // 配置拓扑生成接口 + app.service(web::scope("/topology").service(generate_topology)); + } + + define_json_response_method!( + body_required + generate_topology, + post, + "/generate", + (), + (), + RouterKey::GenerateTopology, + "RouterTopology" + ); +} + pub mod datamgr { use super::*; use paperclip::actix::Apiv2Schema; diff --git a/src/db/check_exist.rs b/src/db/check_exist.rs index b576ae3af7ee3571c5d3c7dfecea2ef9e551924b..2c94976b3a80279102fbcdf0d30af03f67599233 100644 --- a/src/db/check_exist.rs +++ b/src/db/check_exist.rs @@ -300,7 +300,7 @@ pub async fn check_users_by_id( pub async fn check_routermgr( conn: &mut DbConnection, msg_from: &str, - prefix: &str, + dst_ip: &str, prefix_len: i32, ) -> QueryResult { use crate::schema::routermgr::dsl as routermgr_dsl; @@ -315,21 +315,21 @@ pub async fn check_routermgr( DbConnection::Pg(pg_conn) => { count = routermgr_dsl::routermgr .filter(routermgr::msg_from.eq(msg_from)) - .filter(routermgr::prefix.eq(prefix)) + .filter(routermgr::dst_ip.eq(dst_ip)) .filter(routermgr::prefixLen.eq(prefix_len)) .select(count_star()) .first::(pg_conn)?; count_replica1 = replica1_dsl::routermgr_replica1 .filter(replica1_dsl::msg_from.eq(msg_from)) - .filter(replica1_dsl::prefix.eq(prefix)) + .filter(replica1_dsl::dst_ip.eq(dst_ip)) .filter(replica1_dsl::prefixLen.eq(prefix_len)) .select(count_star()) .first::(pg_conn)?; count_replica2 = replica2_dsl::routermgr_replica2 .filter(replica2_dsl::msg_from.eq(msg_from)) - .filter(replica2_dsl::prefix.eq(prefix)) + .filter(replica2_dsl::dst_ip.eq(dst_ip)) .filter(replica2_dsl::prefixLen.eq(prefix_len)) .select(count_star()) .first::(pg_conn)?; @@ -337,21 +337,21 @@ pub async fn check_routermgr( DbConnection::Sqlite(sqlite_conn) => { count = routermgr_dsl::routermgr .filter(routermgr::msg_from.eq(msg_from)) - .filter(routermgr::prefix.eq(prefix)) + .filter(routermgr::dst_ip.eq(dst_ip)) .filter(routermgr::prefixLen.eq(prefix_len)) .select(count_star()) .first::(sqlite_conn)?; count_replica1 = replica1_dsl::routermgr_replica1 .filter(replica1_dsl::msg_from.eq(msg_from)) - .filter(replica1_dsl::prefix.eq(prefix)) + .filter(replica1_dsl::dst_ip.eq(dst_ip)) .filter(replica1_dsl::prefixLen.eq(prefix_len)) .select(count_star()) .first::(sqlite_conn)?; count_replica2 = replica2_dsl::routermgr_replica2 .filter(replica2_dsl::msg_from.eq(msg_from)) - .filter(replica2_dsl::prefix.eq(prefix)) + .filter(replica2_dsl::dst_ip.eq(dst_ip)) .filter(replica2_dsl::prefixLen.eq(prefix_len)) .select(count_star()) .first::(sqlite_conn)?; diff --git a/src/db/delete.rs b/src/db/delete.rs index 6adce16bbd46c798a8a935a22df6e8aa3c33234e..15993726b73ee08d5e0d849d7b6e5036ec74560b 100644 --- a/src/db/delete.rs +++ b/src/db/delete.rs @@ -138,7 +138,7 @@ pub async fn delete_from_users_by_id( pub async fn delete_from_routermgr( conn: &mut DbConnection, msg_from: &str, - prefix: &str, + dst_ip: &str, prefix_len: i32, ) -> QueryResult { @@ -151,11 +151,11 @@ pub async fn delete_from_routermgr( for &table in &tables { let delete_query = match conn { DbConnection::Pg(_) => format!( - "DELETE FROM {} WHERE msg_from = $1 AND prefix = $2 AND prefixLen = $3", + "DELETE FROM {} WHERE msg_from = $1 AND dst_ip = $2 AND prefixLen = $3", table ), DbConnection::Sqlite(_) => format!( - "DELETE FROM {} WHERE msg_from = ? AND prefix = ? AND prefixLen = ?", + "DELETE FROM {} WHERE msg_from = ? AND dst_ip = ? AND prefixLen = ?", table ), }; @@ -165,14 +165,14 @@ pub async fn delete_from_routermgr( DbConnection::Pg(pg_conn) => { diesel::sql_query(delete_query) .bind::(msg_from) - .bind::(prefix) + .bind::(dst_ip) .bind::(prefix_len) .execute(pg_conn)? } DbConnection::Sqlite(sqlite_conn) => { diesel::sql_query(delete_query) .bind::(msg_from) - .bind::(prefix) + .bind::(dst_ip) .bind::(prefix_len) .execute(sqlite_conn)? } diff --git a/src/db/get.rs b/src/db/get.rs index 71ef55f051ec67bdab2f70f317607e89f8543d5b..f810e134d968031dbf17b19af2185b73bfe3bff3 100644 --- a/src/db/get.rs +++ b/src/db/get.rs @@ -930,4 +930,140 @@ fn get_data_from_security_primary_by_image_id( .map(|res| res.map(|data_result| data_result)) } } +} + + +use serde::Deserialize; +use diesel::sql_types::Integer; +#[derive(Debug, QueryableByName, Deserialize, Serialize, Clone, PartialEq, Eq, Hash)] +pub struct RouterMgrInfoResult { + #[diesel(sql_type = Text)] + msg_from: String, + #[diesel(sql_type = Text)] + dst_ip: String, + #[diesel(sql_type = Integer)] + #[serde(rename = "prefixLen")] + prefix_len: i32, + #[diesel(sql_type = Text)] + #[serde(rename = "nextHop")] + next_hop: String, + #[diesel(sql_type = Integer)] + #[serde(rename = "intfID")] + intf_id: i32, + // #[diesel(sql_type = Integer)] + // #[serde(rename = "OpCode")] + // op_code: i32 +} + + + +impl RouterMgrInfoResult { + pub fn msg_from(&self) -> &str { + &self.msg_from + } + + pub fn dst_ip(&self) -> &str { + &self.dst_ip + } + + pub fn prefix_len(&self) -> i32 { + self.prefix_len + } + + pub fn next_hop(&self) -> &str { + &self.next_hop + } + + pub fn intf_id(&self) -> i32 { + self.intf_id + } + + // pub fn op_code(&self) -> i32 { + // self.op_code + // } +} + + +pub async fn query_all_routermgr( + conn: &mut DbConnection, +) -> Result, diesel::result::Error> { + use diesel::prelude::*; + // use diesel::sql_types::{Integer, Text}; + use std::collections::HashMap; + + // 定义要查询的表 + let tables = ["routermgr", "routermgr_replica1", "routermgr_replica2"]; + + // 存储每个表的查询结果 + let mut table_results: HashMap<&str, Vec> = HashMap::new(); + + for &table in &tables { + // 构建 SQL 查询语句 + // let select_query = format!( + // "SELECT msg_from, prefix, prefixLen, nextHop, intfID FROM {}", + // table + // ); + // let select_query = format!( + // "SELECT * FROM {}", + // table + // ); + + // let select_query = format!( + // r#" + // SELECT + // msg_from, + // prefix, + // "prefixLen" AS prefix_len, + // "nextHop" AS next_hop, + // "intfID" AS intf_id, + // "OpCode" AS op_code + // FROM {} + // "#, + // table + // ); + + let select_query = format!( + r#" + SELECT + msg_from, + dst_ip, + "prefixLen" AS prefix_len, + "nextHop" AS next_hop, + "intfID" AS intf_id + FROM {} + "#, + table + ); + + // 执行查询 + let results: Vec = match conn { + DbConnection::Pg(pg_conn) => { + diesel::sql_query(&select_query) + .load::(pg_conn)? + } + DbConnection::Sqlite(sqlite_conn) => { + diesel::sql_query(&select_query) + .load::(sqlite_conn)? + } + }; + + table_results.insert(table, results); + } + + // 统计每条记录在三个表中的出现次数 + let mut record_counts: HashMap = HashMap::new(); + for records in table_results.values() { + for record in records { + *record_counts.entry(record.clone()).or_insert(0) += 1; + } + } + + // 筛选出在至少两个表中出现的记录 + let filtered_results: Vec = record_counts + .into_iter() + .filter(|&(_, count)| count >= 2) + .map(|(record, _)| record) + .collect(); + + Ok(filtered_results) } \ No newline at end of file diff --git a/src/db/insert.rs b/src/db/insert.rs index 32af52ca91df72a081b0b44580b3123412ad391e..ca70d3860b98e9fc6fa02246a16a1602ab75f3a9 100644 --- a/src/db/insert.rs +++ b/src/db/insert.rs @@ -401,7 +401,7 @@ pub async fn insert_security( fn insert_routermgr_in_transaction_pg( transaction: &mut PgConnection, msg_from: &str, - prefix: &str, + dst_ip: &str, prefix_len: i32, next_hop: &str, intf_id: i32, @@ -412,7 +412,7 @@ fn insert_routermgr_in_transaction_pg( for table_name in table_array { // 使用参数绑定构建插入查询 let insert_users_query = format!( - "INSERT INTO {} (msg_from, prefix, prefixLen, nextHop, intfID) + "INSERT INTO {} (msg_from, dst_ip, prefixLen, nextHop, intfID) VALUES ($1, $2, $3, $4, $5);", table_name ); @@ -420,7 +420,7 @@ fn insert_routermgr_in_transaction_pg( // 执行插入操作 sql_query(insert_users_query) .bind::(msg_from) - .bind::(prefix) + .bind::(dst_ip) .bind::(prefix_len) .bind::(next_hop) .bind::(intf_id) @@ -433,7 +433,7 @@ fn insert_routermgr_in_transaction_pg( fn insert_routermgr_in_transaction_sqlite( transaction: &mut SqliteConnection, msg_from: &str, - prefix: &str, + dst_ip: &str, prefix_len: i32, next_hop: &str, intf_id: i32, @@ -442,14 +442,14 @@ fn insert_routermgr_in_transaction_sqlite( for table_name in table_array { let insert_users_query = format!( - "INSERT OR IGNORE INTO {} (msg_from, prefix, prefixLen, nextHop, intfID) + "INSERT OR IGNORE INTO {} (msg_from, dst_ip, prefixLen, nextHop, intfID) VALUES (?, ?, ?, ?, ?);", table_name ); sql_query(insert_users_query) .bind::(msg_from) - .bind::(prefix) + .bind::(dst_ip) .bind::(prefix_len) .bind::(next_hop) .bind::(intf_id) @@ -462,7 +462,7 @@ fn insert_routermgr_in_transaction_sqlite( pub async fn insert_routermgr( conn: &mut DbConnection, msg_from: &str, - prefix: &str, + dst_ip: &str, prefix_len: i32, next_hop: &str, intf_id: i32, @@ -472,7 +472,7 @@ pub async fn insert_routermgr( insert_routermgr_in_transaction_pg( transaction, msg_from, - prefix, + dst_ip, prefix_len, next_hop, intf_id, @@ -482,7 +482,7 @@ pub async fn insert_routermgr( insert_routermgr_in_transaction_sqlite( transaction, msg_from, - prefix, + dst_ip, prefix_len, next_hop, intf_id, diff --git a/src/db/update.rs b/src/db/update.rs index 88b377a0549d709375aaddc5994bc065e0579345..5583eb449ff1401912c19f0c3cec83df55dfbf53 100644 --- a/src/db/update.rs +++ b/src/db/update.rs @@ -198,7 +198,7 @@ pub async fn update_username_in_users_by_id( pub async fn update_routermgr( conn: &mut DbConnection, msg_from: &str, - prefix: &str, + dst_ip: &str, prefix_len: i32, next_hop: &str, intf_id: i32, @@ -211,11 +211,11 @@ pub async fn update_routermgr( for &table in &tables { let update_query = match conn { DbConnection::Pg(_) => format!( - "UPDATE {} SET nextHop = $1, intfID = $2 WHERE msg_from = $3 AND prefix = $4 AND prefixLen = $5", + "UPDATE {} SET nextHop = $1, intfID = $2 WHERE msg_from = $3 AND dst_ip = $4 AND prefixLen = $5", table ), DbConnection::Sqlite(_) => format!( - "UPDATE {} SET nextHop = ?, intfID = ? WHERE msg_from = ? AND prefix = ? AND prefixLen = ?", + "UPDATE {} SET nextHop = ?, intfID = ? WHERE msg_from = ? AND dst_ip = ? AND prefixLen = ?", table ), }; @@ -227,7 +227,7 @@ pub async fn update_routermgr( .bind::(next_hop) .bind::(intf_id) .bind::(msg_from) - .bind::(prefix) + .bind::(dst_ip) .bind::(prefix_len) .execute(pg_conn)? @@ -237,7 +237,7 @@ pub async fn update_routermgr( .bind::(next_hop) .bind::(intf_id) .bind::(msg_from) - .bind::(prefix) + .bind::(dst_ip) .bind::(prefix_len) .execute(sqlite_conn)? } diff --git a/src/schema.rs b/src/schema.rs index d17c6c6b49d3c232d58810dd3255cbec0d054e76..59b6eff156fe4c1952a3d4330ae1ab6a6b32ccb4 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -136,9 +136,9 @@ diesel::table! { } diesel::table! { - routermgr (msg_from, prefix, prefixLen) { + routermgr (msg_from, dst_ip, prefixLen) { msg_from -> Text, - prefix -> Text, + dst_ip -> Text, prefixLen -> Integer, nextHop -> Text, intfID -> Integer, @@ -146,9 +146,9 @@ diesel::table! { } diesel::table! { - routermgr_replica1 (msg_from, prefix, prefixLen) { + routermgr_replica1 (msg_from, dst_ip, prefixLen) { msg_from -> Text, - prefix -> Text, + dst_ip -> Text, prefixLen -> Integer, nextHop -> Text, intfID -> Integer, @@ -156,9 +156,9 @@ diesel::table! { } diesel::table! { - routermgr_replica2 (msg_from, prefix, prefixLen) { + routermgr_replica2 (msg_from, dst_ip, prefixLen) { msg_from -> Text, - prefix -> Text, + dst_ip -> Text, prefixLen -> Integer, nextHop -> Text, intfID -> Integer,