From f290ca212ce5f8250fe5e8ea16cc3b4e8028cbac Mon Sep 17 00:00:00 2001 From: lulingliuwang Date: Mon, 26 May 2025 16:31:16 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E6=93=8D=E4=BD=9C=E7=A0=81=E4=BB=A3?= =?UTF-8?q?=E7=A0=81=E4=BF=AE=E6=94=B9,prefix=E5=AD=97=E6=AE=B5=E6=94=B9?= =?UTF-8?q?=E4=B8=BAdst=5Fip?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../up.sql | 12 +++---- src/cores/handlers/router_mgr.rs | 32 +++++++++---------- src/cores/handlers/router_topo.rs | 0 src/db/check_exist.rs | 14 ++++---- src/db/delete.rs | 10 +++--- src/db/insert.rs | 18 +++++------ src/db/update.rs | 10 +++--- src/schema.rs | 12 +++---- 8 files changed, 54 insertions(+), 54 deletions(-) create mode 100644 src/cores/handlers/router_topo.rs diff --git a/migrations/2025-04-01-015135_create_routermgr_table/up.sql b/migrations/2025-04-01-015135_create_routermgr_table/up.sql index 23bc69c..25c886c 100644 --- a/migrations/2025-04-01-015135_create_routermgr_table/up.sql +++ b/migrations/2025-04-01-015135_create_routermgr_table/up.sql @@ -1,27 +1,27 @@ -- Your SQL goes here CREATE TABLE IF NOT EXISTS routermgr ( msg_from TEXT NOT NULL, - prefix TEXT NOT NULL, + dst_ip TEXT NOT NULL, prefixLen INTEGER NOT NULL, nextHop TEXT NOT NULL, intfID INTEGER NOT NULL, - PRIMARY KEY (msg_from, prefix, prefixLen) + PRIMARY KEY (msg_from, dst_ip, prefixLen) ); CREATE TABLE IF NOT EXISTS routermgr_replica1 ( msg_from TEXT NOT NULL, - prefix TEXT NOT NULL, + dst_ip TEXT NOT NULL, prefixLen INTEGER NOT NULL, nextHop TEXT NOT NULL, intfID INTEGER NOT NULL, - PRIMARY KEY (msg_from, prefix, prefixLen) + PRIMARY KEY (msg_from, dst_ip, prefixLen) ); CREATE TABLE IF NOT EXISTS routermgr_replica2 ( msg_from TEXT NOT NULL, - prefix TEXT NOT NULL, + dst_ip TEXT NOT NULL, prefixLen INTEGER NOT NULL, nextHop TEXT NOT NULL, intfID INTEGER NOT NULL, - PRIMARY KEY (msg_from, prefix, prefixLen) + PRIMARY KEY (msg_from, dst_ip, prefixLen) ); \ No newline at end of file diff --git a/src/cores/handlers/router_mgr.rs b/src/cores/handlers/router_mgr.rs index f8d4c63..891cc8b 100644 --- a/src/cores/handlers/router_mgr.rs +++ b/src/cores/handlers/router_mgr.rs @@ -14,7 +14,7 @@ fn map_diesel_err(error: diesel::result::Error) -> ServerError { #[derive(Debug, Serialize, Deserialize)] struct RouterMgrInfoRequest { - prefix: String, + dst_ip: String, #[serde(rename = "prefixLen")] prefix_len: i32, #[serde(rename = "nextHop")] @@ -54,43 +54,43 @@ pub async fn routermgr_action_handler( let router_exists = check_routermgr( &mut db_conn, &msg_from, - &req.prefix, + &req.dst_ip, req.prefix_len ).await.map_err(map_diesel_err)?; - if router_exists { - return Err(ServerError::duplicated("This router status is exist")); + if !router_exists { + return Err(ServerError::duplicated("This router status is not exist")); } - insert_routermgr( + delete_from_routermgr( &mut db_conn, &msg_from, - &req.prefix, + &req.dst_ip, req.prefix_len, - &req.next_hop, - req.intf_id, ).await.map_err(map_diesel_err)?; Ok(serde_json::to_value(req)?) - }, + }, 1 => { let router_exists = check_routermgr( &mut db_conn, &msg_from, - &req.prefix, + &req.dst_ip, req.prefix_len ).await.map_err(map_diesel_err)?; - if !router_exists { - return Err(ServerError::duplicated("This router status is not exist")); + if router_exists { + return Err(ServerError::duplicated("This router status is exist")); } - delete_from_routermgr( + insert_routermgr( &mut db_conn, &msg_from, - &req.prefix, + &req.dst_ip, req.prefix_len, + &req.next_hop, + req.intf_id, ).await.map_err(map_diesel_err)?; Ok(serde_json::to_value(req)?) @@ -100,7 +100,7 @@ pub async fn routermgr_action_handler( let router_exists = check_routermgr( &mut db_conn, &msg_from, - &req.prefix, + &req.dst_ip, req.prefix_len ).await.map_err(map_diesel_err)?; @@ -111,7 +111,7 @@ pub async fn routermgr_action_handler( update_routermgr( &mut db_conn, &msg_from, - &req.prefix, + &req.dst_ip, req.prefix_len, &req.next_hop, req.intf_id, diff --git a/src/cores/handlers/router_topo.rs b/src/cores/handlers/router_topo.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/db/check_exist.rs b/src/db/check_exist.rs index b576ae3..2c94976 100644 --- a/src/db/check_exist.rs +++ b/src/db/check_exist.rs @@ -300,7 +300,7 @@ pub async fn check_users_by_id( pub async fn check_routermgr( conn: &mut DbConnection, msg_from: &str, - prefix: &str, + dst_ip: &str, prefix_len: i32, ) -> QueryResult { use crate::schema::routermgr::dsl as routermgr_dsl; @@ -315,21 +315,21 @@ pub async fn check_routermgr( DbConnection::Pg(pg_conn) => { count = routermgr_dsl::routermgr .filter(routermgr::msg_from.eq(msg_from)) - .filter(routermgr::prefix.eq(prefix)) + .filter(routermgr::dst_ip.eq(dst_ip)) .filter(routermgr::prefixLen.eq(prefix_len)) .select(count_star()) .first::(pg_conn)?; count_replica1 = replica1_dsl::routermgr_replica1 .filter(replica1_dsl::msg_from.eq(msg_from)) - .filter(replica1_dsl::prefix.eq(prefix)) + .filter(replica1_dsl::dst_ip.eq(dst_ip)) .filter(replica1_dsl::prefixLen.eq(prefix_len)) .select(count_star()) .first::(pg_conn)?; count_replica2 = replica2_dsl::routermgr_replica2 .filter(replica2_dsl::msg_from.eq(msg_from)) - .filter(replica2_dsl::prefix.eq(prefix)) + .filter(replica2_dsl::dst_ip.eq(dst_ip)) .filter(replica2_dsl::prefixLen.eq(prefix_len)) .select(count_star()) .first::(pg_conn)?; @@ -337,21 +337,21 @@ pub async fn check_routermgr( DbConnection::Sqlite(sqlite_conn) => { count = routermgr_dsl::routermgr .filter(routermgr::msg_from.eq(msg_from)) - .filter(routermgr::prefix.eq(prefix)) + .filter(routermgr::dst_ip.eq(dst_ip)) .filter(routermgr::prefixLen.eq(prefix_len)) .select(count_star()) .first::(sqlite_conn)?; count_replica1 = replica1_dsl::routermgr_replica1 .filter(replica1_dsl::msg_from.eq(msg_from)) - .filter(replica1_dsl::prefix.eq(prefix)) + .filter(replica1_dsl::dst_ip.eq(dst_ip)) .filter(replica1_dsl::prefixLen.eq(prefix_len)) .select(count_star()) .first::(sqlite_conn)?; count_replica2 = replica2_dsl::routermgr_replica2 .filter(replica2_dsl::msg_from.eq(msg_from)) - .filter(replica2_dsl::prefix.eq(prefix)) + .filter(replica2_dsl::dst_ip.eq(dst_ip)) .filter(replica2_dsl::prefixLen.eq(prefix_len)) .select(count_star()) .first::(sqlite_conn)?; diff --git a/src/db/delete.rs b/src/db/delete.rs index 6adce16..1599372 100644 --- a/src/db/delete.rs +++ b/src/db/delete.rs @@ -138,7 +138,7 @@ pub async fn delete_from_users_by_id( pub async fn delete_from_routermgr( conn: &mut DbConnection, msg_from: &str, - prefix: &str, + dst_ip: &str, prefix_len: i32, ) -> QueryResult { @@ -151,11 +151,11 @@ pub async fn delete_from_routermgr( for &table in &tables { let delete_query = match conn { DbConnection::Pg(_) => format!( - "DELETE FROM {} WHERE msg_from = $1 AND prefix = $2 AND prefixLen = $3", + "DELETE FROM {} WHERE msg_from = $1 AND dst_ip = $2 AND prefixLen = $3", table ), DbConnection::Sqlite(_) => format!( - "DELETE FROM {} WHERE msg_from = ? AND prefix = ? AND prefixLen = ?", + "DELETE FROM {} WHERE msg_from = ? AND dst_ip = ? AND prefixLen = ?", table ), }; @@ -165,14 +165,14 @@ pub async fn delete_from_routermgr( DbConnection::Pg(pg_conn) => { diesel::sql_query(delete_query) .bind::(msg_from) - .bind::(prefix) + .bind::(dst_ip) .bind::(prefix_len) .execute(pg_conn)? } DbConnection::Sqlite(sqlite_conn) => { diesel::sql_query(delete_query) .bind::(msg_from) - .bind::(prefix) + .bind::(dst_ip) .bind::(prefix_len) .execute(sqlite_conn)? } diff --git a/src/db/insert.rs b/src/db/insert.rs index 32af52c..ca70d38 100644 --- a/src/db/insert.rs +++ b/src/db/insert.rs @@ -401,7 +401,7 @@ pub async fn insert_security( fn insert_routermgr_in_transaction_pg( transaction: &mut PgConnection, msg_from: &str, - prefix: &str, + dst_ip: &str, prefix_len: i32, next_hop: &str, intf_id: i32, @@ -412,7 +412,7 @@ fn insert_routermgr_in_transaction_pg( for table_name in table_array { // 使用参数绑定构建插入查询 let insert_users_query = format!( - "INSERT INTO {} (msg_from, prefix, prefixLen, nextHop, intfID) + "INSERT INTO {} (msg_from, dst_ip, prefixLen, nextHop, intfID) VALUES ($1, $2, $3, $4, $5);", table_name ); @@ -420,7 +420,7 @@ fn insert_routermgr_in_transaction_pg( // 执行插入操作 sql_query(insert_users_query) .bind::(msg_from) - .bind::(prefix) + .bind::(dst_ip) .bind::(prefix_len) .bind::(next_hop) .bind::(intf_id) @@ -433,7 +433,7 @@ fn insert_routermgr_in_transaction_pg( fn insert_routermgr_in_transaction_sqlite( transaction: &mut SqliteConnection, msg_from: &str, - prefix: &str, + dst_ip: &str, prefix_len: i32, next_hop: &str, intf_id: i32, @@ -442,14 +442,14 @@ fn insert_routermgr_in_transaction_sqlite( for table_name in table_array { let insert_users_query = format!( - "INSERT OR IGNORE INTO {} (msg_from, prefix, prefixLen, nextHop, intfID) + "INSERT OR IGNORE INTO {} (msg_from, dst_ip, prefixLen, nextHop, intfID) VALUES (?, ?, ?, ?, ?);", table_name ); sql_query(insert_users_query) .bind::(msg_from) - .bind::(prefix) + .bind::(dst_ip) .bind::(prefix_len) .bind::(next_hop) .bind::(intf_id) @@ -462,7 +462,7 @@ fn insert_routermgr_in_transaction_sqlite( pub async fn insert_routermgr( conn: &mut DbConnection, msg_from: &str, - prefix: &str, + dst_ip: &str, prefix_len: i32, next_hop: &str, intf_id: i32, @@ -472,7 +472,7 @@ pub async fn insert_routermgr( insert_routermgr_in_transaction_pg( transaction, msg_from, - prefix, + dst_ip, prefix_len, next_hop, intf_id, @@ -482,7 +482,7 @@ pub async fn insert_routermgr( insert_routermgr_in_transaction_sqlite( transaction, msg_from, - prefix, + dst_ip, prefix_len, next_hop, intf_id, diff --git a/src/db/update.rs b/src/db/update.rs index 88b377a..5583eb4 100644 --- a/src/db/update.rs +++ b/src/db/update.rs @@ -198,7 +198,7 @@ pub async fn update_username_in_users_by_id( pub async fn update_routermgr( conn: &mut DbConnection, msg_from: &str, - prefix: &str, + dst_ip: &str, prefix_len: i32, next_hop: &str, intf_id: i32, @@ -211,11 +211,11 @@ pub async fn update_routermgr( for &table in &tables { let update_query = match conn { DbConnection::Pg(_) => format!( - "UPDATE {} SET nextHop = $1, intfID = $2 WHERE msg_from = $3 AND prefix = $4 AND prefixLen = $5", + "UPDATE {} SET nextHop = $1, intfID = $2 WHERE msg_from = $3 AND dst_ip = $4 AND prefixLen = $5", table ), DbConnection::Sqlite(_) => format!( - "UPDATE {} SET nextHop = ?, intfID = ? WHERE msg_from = ? AND prefix = ? AND prefixLen = ?", + "UPDATE {} SET nextHop = ?, intfID = ? WHERE msg_from = ? AND dst_ip = ? AND prefixLen = ?", table ), }; @@ -227,7 +227,7 @@ pub async fn update_routermgr( .bind::(next_hop) .bind::(intf_id) .bind::(msg_from) - .bind::(prefix) + .bind::(dst_ip) .bind::(prefix_len) .execute(pg_conn)? @@ -237,7 +237,7 @@ pub async fn update_routermgr( .bind::(next_hop) .bind::(intf_id) .bind::(msg_from) - .bind::(prefix) + .bind::(dst_ip) .bind::(prefix_len) .execute(sqlite_conn)? } diff --git a/src/schema.rs b/src/schema.rs index d17c6c6..59b6eff 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -136,9 +136,9 @@ diesel::table! { } diesel::table! { - routermgr (msg_from, prefix, prefixLen) { + routermgr (msg_from, dst_ip, prefixLen) { msg_from -> Text, - prefix -> Text, + dst_ip -> Text, prefixLen -> Integer, nextHop -> Text, intfID -> Integer, @@ -146,9 +146,9 @@ diesel::table! { } diesel::table! { - routermgr_replica1 (msg_from, prefix, prefixLen) { + routermgr_replica1 (msg_from, dst_ip, prefixLen) { msg_from -> Text, - prefix -> Text, + dst_ip -> Text, prefixLen -> Integer, nextHop -> Text, intfID -> Integer, @@ -156,9 +156,9 @@ diesel::table! { } diesel::table! { - routermgr_replica2 (msg_from, prefix, prefixLen) { + routermgr_replica2 (msg_from, dst_ip, prefixLen) { msg_from -> Text, - prefix -> Text, + dst_ip -> Text, prefixLen -> Integer, nextHop -> Text, intfID -> Integer, -- Gitee From 030644776fd1f7045238f189005a65891aa1aae8 Mon Sep 17 00:00:00 2001 From: lulingliuwang Date: Mon, 26 May 2025 16:49:57 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E4=BB=8Eroutermgr=E8=AF=BB=E5=8F=96?= =?UTF-8?q?=E8=B7=AF=E7=94=B1=E7=8A=B6=E6=80=81=E4=BF=A1=E6=81=AF=E5=90=8E?= =?UTF-8?q?=E8=B7=AF=E7=94=B1=E6=8B=93=E6=89=91=E7=94=9F=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cores/handlers/mod.rs | 1 + src/cores/handlers/router_topo.rs | 44 ++++++++++ src/cores/router.rs | 12 +++ src/cores/servers/actix_web/mod.rs | 21 +++++ src/db/get.rs | 136 +++++++++++++++++++++++++++++ 5 files changed, 214 insertions(+) diff --git a/src/cores/handlers/mod.rs b/src/cores/handlers/mod.rs index 6bc76cb..394b1bd 100644 --- a/src/cores/handlers/mod.rs +++ b/src/cores/handlers/mod.rs @@ -4,6 +4,7 @@ pub mod consensus; pub mod datamgr; pub mod network_status; pub mod router_mgr; +pub mod router_topo; pub mod security; pub mod test; pub mod users; diff --git a/src/cores/handlers/router_topo.rs b/src/cores/handlers/router_topo.rs index e69de29..4bff97c 100644 --- a/src/cores/handlers/router_topo.rs +++ b/src/cores/handlers/router_topo.rs @@ -0,0 +1,44 @@ +use std::sync::Arc; +use std::collections::HashMap; +use crate::cores::state::AppState; +use fleetmodv2::api_server::{ServerError, ServerRequest, ServerResult}; +use serde_json::Value; +// 导入路由结构体 +use crate::db::get::{query_all_routermgr, RouterMgrInfoResult}; + + +// 构建拓扑:源 -> (目的, 出端口) +fn build_topology(entries: &Vec) -> HashMap> { + let mut topo = HashMap::new(); + for entry in entries { + topo.entry(entry.msg_from().to_string()) + .or_insert_with(Vec::new) + .push((entry.next_hop().to_string(), entry.intf_id())); + } + topo +} + +// 请求处理:生成拓扑 +pub async fn generate_topology_handler( + app_state: Arc, + _server_request: ServerRequest, +) -> ServerResult { + // 从数据库连接池中获取连接 + let db_conn = app_state.db_pool.get_connection(); + if let Err(e) = db_conn { + log::error!("error getting db conn: {}", e); + return Err(ServerError::internal_error("DB pool error")); + } + let mut db_conn = db_conn.unwrap(); + + // 查询路由管理数据 + let entries = query_all_routermgr(&mut db_conn) + .await + .map_err(|e| ServerError::internal_error(&format!("Query error: {}", e)))?; + + // 直接使用查询结果构建拓扑 + let topology = build_topology(&entries); + + // 返回拓扑结构 + Ok(serde_json::to_value(topology)?) +} \ No newline at end of file diff --git a/src/cores/router.rs b/src/cores/router.rs index 2f9d26d..b1e6137 100644 --- a/src/cores/router.rs +++ b/src/cores/router.rs @@ -42,6 +42,9 @@ pub enum RouterKey { // router_mgr key RouterMgrAction, + // router topo key + GenerateTopology, + // Data mgr router key DataMgrUploadData, DataMgrQueryData, @@ -280,6 +283,15 @@ impl Router { ); } + // router_topo routers + { + add_route!( + router, + RouterKey::GenerateTopology, + handlers::router_topo::generate_topology_handler + ); + } + // data mgr routers { add_route!( diff --git a/src/cores/servers/actix_web/mod.rs b/src/cores/servers/actix_web/mod.rs index 03e5ec4..c1440d9 100644 --- a/src/cores/servers/actix_web/mod.rs +++ b/src/cores/servers/actix_web/mod.rs @@ -99,6 +99,7 @@ macro_rules! init_app { .configure(users::configure_routes) .configure(security::configure_routes) .configure(router_mgr::configure_routes) + .configure(router_topo::configure_routes) .configure(consensus::configure_routes) .configure(network_status::configure_routes) .with_json_spec_at("/api/spec/v2") @@ -675,6 +676,26 @@ pub mod router_mgr { ); } +pub mod router_topo { + use super::*; + + pub fn configure_routes(app: &mut web::ServiceConfig) { + // 配置拓扑生成接口 + app.service(web::scope("/topology").service(generate_topology)); + } + + define_json_response_method!( + body_required + generate_topology, + post, + "/generate", + (), + (), + RouterKey::GenerateTopology, + "RouterTopology" + ); +} + pub mod datamgr { use super::*; use paperclip::actix::Apiv2Schema; diff --git a/src/db/get.rs b/src/db/get.rs index 71ef55f..f810e13 100644 --- a/src/db/get.rs +++ b/src/db/get.rs @@ -930,4 +930,140 @@ fn get_data_from_security_primary_by_image_id( .map(|res| res.map(|data_result| data_result)) } } +} + + +use serde::Deserialize; +use diesel::sql_types::Integer; +#[derive(Debug, QueryableByName, Deserialize, Serialize, Clone, PartialEq, Eq, Hash)] +pub struct RouterMgrInfoResult { + #[diesel(sql_type = Text)] + msg_from: String, + #[diesel(sql_type = Text)] + dst_ip: String, + #[diesel(sql_type = Integer)] + #[serde(rename = "prefixLen")] + prefix_len: i32, + #[diesel(sql_type = Text)] + #[serde(rename = "nextHop")] + next_hop: String, + #[diesel(sql_type = Integer)] + #[serde(rename = "intfID")] + intf_id: i32, + // #[diesel(sql_type = Integer)] + // #[serde(rename = "OpCode")] + // op_code: i32 +} + + + +impl RouterMgrInfoResult { + pub fn msg_from(&self) -> &str { + &self.msg_from + } + + pub fn dst_ip(&self) -> &str { + &self.dst_ip + } + + pub fn prefix_len(&self) -> i32 { + self.prefix_len + } + + pub fn next_hop(&self) -> &str { + &self.next_hop + } + + pub fn intf_id(&self) -> i32 { + self.intf_id + } + + // pub fn op_code(&self) -> i32 { + // self.op_code + // } +} + + +pub async fn query_all_routermgr( + conn: &mut DbConnection, +) -> Result, diesel::result::Error> { + use diesel::prelude::*; + // use diesel::sql_types::{Integer, Text}; + use std::collections::HashMap; + + // 定义要查询的表 + let tables = ["routermgr", "routermgr_replica1", "routermgr_replica2"]; + + // 存储每个表的查询结果 + let mut table_results: HashMap<&str, Vec> = HashMap::new(); + + for &table in &tables { + // 构建 SQL 查询语句 + // let select_query = format!( + // "SELECT msg_from, prefix, prefixLen, nextHop, intfID FROM {}", + // table + // ); + // let select_query = format!( + // "SELECT * FROM {}", + // table + // ); + + // let select_query = format!( + // r#" + // SELECT + // msg_from, + // prefix, + // "prefixLen" AS prefix_len, + // "nextHop" AS next_hop, + // "intfID" AS intf_id, + // "OpCode" AS op_code + // FROM {} + // "#, + // table + // ); + + let select_query = format!( + r#" + SELECT + msg_from, + dst_ip, + "prefixLen" AS prefix_len, + "nextHop" AS next_hop, + "intfID" AS intf_id + FROM {} + "#, + table + ); + + // 执行查询 + let results: Vec = match conn { + DbConnection::Pg(pg_conn) => { + diesel::sql_query(&select_query) + .load::(pg_conn)? + } + DbConnection::Sqlite(sqlite_conn) => { + diesel::sql_query(&select_query) + .load::(sqlite_conn)? + } + }; + + table_results.insert(table, results); + } + + // 统计每条记录在三个表中的出现次数 + let mut record_counts: HashMap = HashMap::new(); + for records in table_results.values() { + for record in records { + *record_counts.entry(record.clone()).or_insert(0) += 1; + } + } + + // 筛选出在至少两个表中出现的记录 + let filtered_results: Vec = record_counts + .into_iter() + .filter(|&(_, count)| count >= 2) + .map(|(record, _)| record) + .collect(); + + Ok(filtered_results) } \ No newline at end of file -- Gitee