From 47bda5c1f077866e04217ee7286858d1a56a08cb Mon Sep 17 00:00:00 2001 From: Yuichi <913637919@qq.com> Date: Mon, 2 Dec 2024 08:56:00 +0000 Subject: [PATCH 01/10] =?UTF-8?q?1.kine=E5=92=8Cmetadata=E5=9D=87=E5=88=9B?= =?UTF-8?q?=E5=BB=BA=E4=B8=89=E4=BB=BD=E5=89=AF=E6=9C=AC=202.=E5=88=9D?= =?UTF-8?q?=E5=A7=8B=E5=8C=96=E6=93=8D=E4=BD=9C=E8=A6=86=E7=9B=96=E6=89=80?= =?UTF-8?q?=E6=9C=89=E5=89=AF=E6=9C=AC=EF=BC=8C=E5=B9=B6=E4=B8=94=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E5=8E=9F=E5=AD=90=E5=8C=96=E6=93=8D=E4=BD=9C=EF=BC=8C?= =?UTF-8?q?=E4=BB=BB=E6=84=8F=E8=A1=A8=E6=8F=92=E5=85=A5=E5=A4=B1=E8=B4=A5?= =?UTF-8?q?=E5=88=99=E8=BF=9B=E8=A1=8C=E5=9B=9E=E6=BB=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../down.sql | 6 +- .../up.sql | 42 +++++ src/cores/db.rs | 146 ++++++++++++++---- src/schema.rs | 50 ++++++ 4 files changed, 217 insertions(+), 27 deletions(-) diff --git a/migrations/2024-11-03-082237_create_metadata_table/down.sql b/migrations/2024-11-03-082237_create_metadata_table/down.sql index 55d1278..d1214a6 100644 --- a/migrations/2024-11-03-082237_create_metadata_table/down.sql +++ b/migrations/2024-11-03-082237_create_metadata_table/down.sql @@ -1,3 +1,7 @@ -- This file should undo anything in `up.sql` DROP TABLE IF EXISTS metadata; -DROP TABLE IF EXISTS kine; \ No newline at end of file +DROP TABLE IF EXISTS metadata_replica1; +DROP TABLE IF EXISTS metadata_replica2; +DROP TABLE IF EXISTS kine; +DROP TABLE IF EXISTS kine_replica1; +DROP TABLE IF EXISTS kine_replica2; \ No newline at end of file diff --git a/migrations/2024-11-03-082237_create_metadata_table/up.sql b/migrations/2024-11-03-082237_create_metadata_table/up.sql index 5515cb3..5ae8177 100644 --- a/migrations/2024-11-03-082237_create_metadata_table/up.sql +++ b/migrations/2024-11-03-082237_create_metadata_table/up.sql @@ -9,7 +9,49 @@ CREATE TABLE IF NOT EXISTS metadata ( PRIMARY KEY (name, namespace, apigroup) ); +CREATE TABLE IF NOT EXISTS metadata_replica1 ( + name VARCHAR(256), + namespace BOOLEAN, + apigroup VARCHAR(256), + data TEXT, + created_time VARCHAR(256), + updated_time VARCHAR(256), + PRIMARY KEY (name, namespace, apigroup) + ); + +CREATE TABLE IF NOT EXISTS metadata_replica2 ( + name VARCHAR(256), + namespace BOOLEAN, + apigroup VARCHAR(256), + data TEXT, + created_time VARCHAR(256), + updated_time VARCHAR(256), + PRIMARY KEY (name, namespace, apigroup) + ); + CREATE TABLE IF NOT EXISTS kine ( + kind VARCHAR(256), + name VARCHAR(256), + namespace VARCHAR(256), + apigroup VARCHAR(256), + data TEXT, + created_time VARCHAR(256), + updated_time VARCHAR(256), + PRIMARY KEY (kind, name, namespace, apigroup) + ); + +CREATE TABLE IF NOT EXISTS kine_replica1 ( + kind VARCHAR(256), + name VARCHAR(256), + namespace VARCHAR(256), + apigroup VARCHAR(256), + data TEXT, + created_time VARCHAR(256), + updated_time VARCHAR(256), + PRIMARY KEY (kind, name, namespace, apigroup) + ); + +CREATE TABLE IF NOT EXISTS kine_replica2 ( kind VARCHAR(256), name VARCHAR(256), namespace VARCHAR(256), diff --git a/src/cores/db.rs b/src/cores/db.rs index 1a888dd..6e9fd6a 100644 --- a/src/cores/db.rs +++ b/src/cores/db.rs @@ -6,6 +6,7 @@ use diesel::sqlite::SqliteConnection; use diesel::r2d2::{ConnectionManager, Pool, PooledConnection}; use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; use serde_json::{json, Value}; +use diesel::connection::Connection; pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!(); @@ -97,52 +98,145 @@ fn resource_templates() -> HashMap<&'static str, (bool, Value)> { templates } +fn initialize_metadata_in_transaction_pg( + transaction: &mut PgConnection, + templates: &HashMap<&str, (bool, Value)>, +) -> QueryResult<()> { + for (resource_name, (supports_namespace, template)) in templates.iter() { + let apigroup = template + .get("apiVersion") + .and_then(Value::as_str) + .unwrap_or("v1"); -impl DbPool { - // 内存数据库,用于编写单元测试函数 - pub fn new_in_memory() -> Result { - let manager = ConnectionManager::::new("file::memory:?cache=shared"); - let pool = Pool::builder().build(manager)?; - let mut conn = pool.get()?; - conn.run_pending_migrations(MIGRATIONS).unwrap(); - Self::initialize_metadata(&mut DbPool::Sqlite(pool.clone()).get_connection()?).unwrap(); - Ok(DbPool::Sqlite(pool)) - } + let table_array: [&str; 3] = ["metadata", "metadata_replica1", "metadata_replica2"]; + for table_name in table_array { + let insert_metadata_query = format!( + "INSERT INTO {} (name, namespace, apigroup, data, created_time, updated_time) + VALUES ('{}', {}, '{}', '{}', '{}', '{}') + ON CONFLICT DO NOTHING;", + table_name, + resource_name, + supports_namespace, + apigroup, + template.to_string(), + Utc::now().naive_utc().to_string(), + Utc::now().naive_utc().to_string() + ); - // 初始化所有资源的 metadata 和表结构 - pub fn initialize_metadata(conn: &mut DbConnection) -> QueryResult<()> { - let templates = resource_templates(); + sql_query(insert_metadata_query).execute(transaction)?; + } + } + Ok(()) +} - for (table_name, (supports_namespace, template)) in templates.iter() { +fn initialize_metadata_in_transaction_sqlite( + transaction: &mut SqliteConnection, + templates: &HashMap<&str, (bool, Value)>, +) -> QueryResult<()> { + for (resource_name, (supports_namespace, template)) in templates.iter() { + let apigroup = template + .get("apiVersion") + .and_then(Value::as_str) + .unwrap_or("v1"); - // 从模板中提取 `apiVersion` 字段作为 `apigroup` - let apigroup = template.get("apiVersion") - .and_then(Value::as_str) - .unwrap_or("v1"); + let table_array: [&str; 3] = ["metadata", "metadata_replica1", "metadata_replica2"]; + for table_name in table_array { let insert_metadata_query = format!( - "INSERT OR IGNORE INTO metadata (name, namespace, apigroup, data, created_time, updated_time) + "INSERT OR IGNORE INTO {} (name, namespace, apigroup, data, created_time, updated_time) VALUES ('{}', {}, '{}', '{}', '{}', '{}');", table_name, - *supports_namespace, + resource_name, + supports_namespace, apigroup, template.to_string(), Utc::now().naive_utc().to_string(), Utc::now().naive_utc().to_string() ); - match conn { - DbConnection::Pg(pg_conn) => { - sql_query(insert_metadata_query).execute(pg_conn)?; - }, - DbConnection::Sqlite(sqlite_conn) => { - sql_query(insert_metadata_query).execute(sqlite_conn)?; + sql_query(insert_metadata_query).execute(transaction)?; + } + } + Ok(()) +} + + +impl DbPool { + + // 初始化所有资源的 metadata 表 + pub fn initialize_metadata(conn: &mut DbConnection) -> QueryResult<()> { + let templates = resource_templates(); + + match conn { + DbConnection::Pg(pg_conn) => { + pg_conn.transaction(|transaction| { + initialize_metadata_in_transaction_pg(transaction, &templates) + }) + } + DbConnection::Sqlite(sqlite_conn) => { + sqlite_conn.transaction(|transaction| { + initialize_metadata_in_transaction_sqlite(transaction, &templates) + }) + } + } + } + + + +/* pub fn initialize_metadata(conn: &mut DbConnection) -> QueryResult<()> { + let templates = resource_templates(); + + for (resource_name, (supports_namespace, template)) in templates.iter() { + + // 从模板中提取 `apiVersion` 字段作为 `apigroup` + let apigroup = template.get("apiVersion") + .and_then(Value::as_str) + .unwrap_or("v1"); + + let table_array: [String; 3] = [ + String::from("metadata"), + String::from("metadata_replica1"), + String::from("metadata_replica2"), + ]; + + for table_name in table_array { + let insert_metadata_query = format!( + "INSERT OR IGNORE INTO {} (name, namespace, apigroup, data, created_time, updated_time) + VALUES ('{}', {}, '{}', '{}', '{}', '{}');", + table_name, + resource_name, + *supports_namespace, + apigroup, + template.to_string(), + Utc::now().naive_utc().to_string(), + Utc::now().naive_utc().to_string() + ); + + match conn { + DbConnection::Pg(pg_conn) => { + sql_query(insert_metadata_query).execute(pg_conn)?; + }, + DbConnection::Sqlite(sqlite_conn) => { + sql_query(insert_metadata_query).execute(sqlite_conn)?; + } } } + } Ok(()) + }*/ + + // 内存数据库,用于编写单元测试函数 + pub fn new_in_memory() -> Result { + let manager = ConnectionManager::::new("file::memory:?cache=shared"); + let pool = Pool::builder().build(manager)?; + let mut conn = pool.get()?; + conn.run_pending_migrations(MIGRATIONS).unwrap(); + Self::initialize_metadata(&mut DbPool::Sqlite(pool.clone()).get_connection()?).unwrap(); + Ok(DbPool::Sqlite(pool)) } + pub fn get_connection(&self) -> Result { match self { DbPool::Pg(pool) => pool.get().map(DbConnection::Pg), diff --git a/src/schema.rs b/src/schema.rs index 687d79b..045e81a 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -12,6 +12,30 @@ diesel::table! { } } +diesel::table! { + kine_replica1 (kind, name, namespace, apigroup) { + kind -> Nullable, + name -> Nullable, + namespace -> Nullable, + apigroup -> Nullable, + data -> Nullable, + created_time -> Nullable, + updated_time -> Nullable, + } +} + +diesel::table! { + kine_replica2 (kind, name, namespace, apigroup) { + kind -> Nullable, + name -> Nullable, + namespace -> Nullable, + apigroup -> Nullable, + data -> Nullable, + created_time -> Nullable, + updated_time -> Nullable, + } +} + diesel::table! { metadata (name, namespace, apigroup) { name -> Nullable, @@ -23,7 +47,33 @@ diesel::table! { } } +diesel::table! { + metadata_replica1 (name, namespace, apigroup) { + name -> Nullable, + namespace -> Nullable, + apigroup -> Nullable, + data -> Nullable, + created_time -> Nullable, + updated_time -> Nullable, + } +} + +diesel::table! { + metadata_replica2 (name, namespace, apigroup) { + name -> Nullable, + namespace -> Nullable, + apigroup -> Nullable, + data -> Nullable, + created_time -> Nullable, + updated_time -> Nullable, + } +} + diesel::allow_tables_to_appear_in_same_query!( kine, + kine_replica1, + kine_replica2, metadata, + metadata_replica1, + metadata_replica2, ); -- Gitee From 7ec29462781d0f22ab3d6bb8cac07331f8944cac Mon Sep 17 00:00:00 2001 From: Yuichi <913637919@qq.com> Date: Tue, 3 Dec 2024 03:29:12 +0000 Subject: [PATCH 02/10] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E9=83=A8=E5=88=86?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E5=87=BD=E6=95=B0=E5=8F=8A=E6=8F=92=E5=85=A5?= =?UTF-8?q?=E5=87=BD=E6=95=B0=E4=BB=A5=E9=80=82=E9=85=8D=E4=B8=89=E8=A1=A8?= =?UTF-8?q?=E6=83=85=E5=86=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cores/handlers.rs | 348 ++++++++++++++++++++++++++++++++---------- 1 file changed, 264 insertions(+), 84 deletions(-) diff --git a/src/cores/handlers.rs b/src/cores/handlers.rs index a3ec741..5ddf6d8 100644 --- a/src/cores/handlers.rs +++ b/src/cores/handlers.rs @@ -5,7 +5,7 @@ use std::{time}; use async_trait::async_trait; use actix_web::{HttpResponse, Result, web, Error}; use chrono::Utc; -use diesel::{QueryResult, RunQueryDsl, QueryDsl, ExpressionMethods, OptionalExtension, QueryableByName}; +use diesel::{QueryResult, RunQueryDsl, QueryDsl, ExpressionMethods, OptionalExtension, QueryableByName, PgConnection, sql_query, SqliteConnection, Connection}; use serde_json::{json}; use serde_json::Value; use crate::cores::db::DbConnection; @@ -19,6 +19,7 @@ use feventbus::traits::producer::Producer; use serde::{Deserialize, Serialize}; use tokio::sync::broadcast; use actix_web::web::Bytes; +use diesel::expression::BoxableExpression; // 定义全局哈希表来获取model名 static GLOBAL_HASHMAP: LazyLock>> = LazyLock::new(|| { @@ -303,7 +304,6 @@ impl DefaultHandler { } - // 查询 metadata 表中的 plural 是否存在,并检查 namespace 要求是否满足 async fn check_metadata( conn: &mut DbConnection, @@ -312,31 +312,65 @@ async fn check_metadata( requires_namespace: bool, ) -> QueryResult { use diesel::dsl::count_star; - use crate::schema::metadata::dsl::*; + use crate::schema::metadata::dsl as metadata_dsl; + use crate::schema::metadata_replica1::dsl as replica1_dsl; + use crate::schema::metadata_replica2::dsl as replica2_dsl; let count; + let count_replica1; + let count_replica2; match conn { DbConnection::Pg(pg_conn) => { - count = metadata - .filter(name.eq(plural)) - .filter(apigroup.eq(version)) - .filter(namespace.eq(requires_namespace)) + count = metadata_dsl::metadata + .filter(metadata_dsl::name.eq(plural)) + .filter(metadata_dsl::apigroup.eq(version)) + .filter(metadata_dsl::namespace.eq(requires_namespace)) + .select(count_star()) + .first::(pg_conn)?; + + count_replica1 = replica1_dsl::metadata_replica1 + .filter(replica1_dsl::name.eq(plural)) + .filter(replica1_dsl::apigroup.eq(version)) + .filter(replica1_dsl::namespace.eq(requires_namespace)) + .select(count_star()) + .first::(pg_conn)?; + + count_replica2 = replica2_dsl::metadata_replica2 + .filter(replica2_dsl::name.eq(plural)) + .filter(replica2_dsl::apigroup.eq(version)) + .filter(replica2_dsl::namespace.eq(requires_namespace)) .select(count_star()) .first::(pg_conn)?; }, DbConnection::Sqlite(sqlite_conn) => { - count = metadata - .filter(name.eq(plural)) - .filter(apigroup.eq(version)) - .filter(namespace.eq(requires_namespace)) + count = metadata_dsl::metadata + .filter(metadata_dsl::name.eq(plural)) + .filter(metadata_dsl::apigroup.eq(version)) + .filter(metadata_dsl::namespace.eq(requires_namespace)) + .select(count_star()) + .first::(sqlite_conn)?; + + count_replica1 = replica1_dsl::metadata_replica1 + .filter(replica1_dsl::name.eq(plural)) + .filter(replica1_dsl::apigroup.eq(version)) + .filter(replica1_dsl::namespace.eq(requires_namespace)) + .select(count_star()) + .first::(sqlite_conn)?; + + count_replica2 = replica2_dsl::metadata_replica2 + .filter(replica2_dsl::name.eq(plural)) + .filter(replica2_dsl::apigroup.eq(version)) + .filter(replica2_dsl::namespace.eq(requires_namespace)) .select(count_star()) .first::(sqlite_conn)?; } } - Ok(count > 0) + let positive_count = [count, count_replica1, count_replica2].iter().filter(|&&x| x > 0).count(); + Ok(positive_count >= 2) } + // 查询 kine 表中指定的数据是否存在 async fn check_kine( conn: &mut DbConnection, @@ -346,52 +380,170 @@ async fn check_kine( item_namespace: Option<&str>, ) -> QueryResult { use diesel::dsl::count_star; - use crate::schema::kine::dsl::*; + use crate::schema::kine::dsl as kine_dsl; + use crate::schema::kine_replica1::dsl as replica1_dsl; + use crate::schema::kine_replica2::dsl as replica2_dsl; let count; + let count_replica1; + let count_replica2; match conn { DbConnection::Pg(pg_conn) => { if let Some(_) = item_namespace { - count = kine - .filter(kind.eq(item_kind)) - .filter(name.eq(item_name)) - .filter(apigroup.eq(item_version)) - .filter(namespace.eq(item_namespace)) + count = kine_dsl::kine + .filter(kine_dsl::kind.eq(item_kind)) + .filter(kine_dsl::name.eq(item_name)) + .filter(kine_dsl::apigroup.eq(item_version)) + .filter(kine_dsl::namespace.eq(item_namespace)) + .select(count_star()) + .first::(pg_conn)?; + + count_replica1 = replica1_dsl::kine_replica1 + .filter(replica1_dsl::kind.eq(item_kind)) + .filter(replica1_dsl::name.eq(item_name)) + .filter(replica1_dsl::apigroup.eq(item_version)) + .filter(replica1_dsl::namespace.eq(item_namespace)) + .select(count_star()) + .first::(pg_conn)?; + + count_replica2 = replica2_dsl::kine_replica2 + .filter(replica2_dsl::kind.eq(item_kind)) + .filter(replica2_dsl::name.eq(item_name)) + .filter(replica2_dsl::apigroup.eq(item_version)) + .filter(replica2_dsl::namespace.eq(item_namespace)) .select(count_star()) .first::(pg_conn)?; } else { - count = kine - .filter(kind.eq(item_kind)) - .filter(name.eq(item_name)) - .filter(apigroup.eq(item_version)) + count = kine_dsl::kine + .filter(kine_dsl::kind.eq(item_kind)) + .filter(kine_dsl::name.eq(item_name)) + .filter(kine_dsl::apigroup.eq(item_version)) + .select(count_star()) + .first::(pg_conn)?; + + count_replica1 = replica1_dsl::kine_replica1 + .filter(replica1_dsl::kind.eq(item_kind)) + .filter(replica1_dsl::name.eq(item_name)) + .filter(replica1_dsl::apigroup.eq(item_version)) + .select(count_star()) + .first::(pg_conn)?; + + count_replica2 = replica2_dsl::kine_replica2 + .filter(replica2_dsl::kind.eq(item_kind)) + .filter(replica2_dsl::name.eq(item_name)) + .filter(replica2_dsl::apigroup.eq(item_version)) .select(count_star()) .first::(pg_conn)?; } }, DbConnection::Sqlite(sqlite_conn) => { if let Some(_) = item_namespace { - count = kine - .filter(kind.eq(item_kind)) - .filter(name.eq(item_name)) - .filter(apigroup.eq(item_version)) - .filter(namespace.eq(item_namespace)) + count = kine_dsl::kine + .filter(kine_dsl::kind.eq(item_kind)) + .filter(kine_dsl::name.eq(item_name)) + .filter(kine_dsl::apigroup.eq(item_version)) + .filter(kine_dsl::namespace.eq(item_namespace)) + .select(count_star()) + .first::(sqlite_conn)?; + + count_replica1 = replica1_dsl::kine_replica1 + .filter(replica1_dsl::kind.eq(item_kind)) + .filter(replica1_dsl::name.eq(item_name)) + .filter(replica1_dsl::apigroup.eq(item_version)) + .filter(replica1_dsl::namespace.eq(item_namespace)) + .select(count_star()) + .first::(sqlite_conn)?; + + count_replica2 = replica2_dsl::kine_replica2 + .filter(replica2_dsl::kind.eq(item_kind)) + .filter(replica2_dsl::name.eq(item_name)) + .filter(replica2_dsl::apigroup.eq(item_version)) + .filter(replica2_dsl::namespace.eq(item_namespace)) .select(count_star()) .first::(sqlite_conn)?; } else { - count = kine - .filter(kind.eq(item_kind)) - .filter(name.eq(item_name)) - .filter(apigroup.eq(item_version)) + count = kine_dsl::kine + .filter(kine_dsl::kind.eq(item_kind)) + .filter(kine_dsl::name.eq(item_name)) + .filter(kine_dsl::apigroup.eq(item_version)) + .select(count_star()) + .first::(sqlite_conn)?; + + count_replica1 = replica1_dsl::kine_replica1 + .filter(replica1_dsl::kind.eq(item_kind)) + .filter(replica1_dsl::name.eq(item_name)) + .filter(replica1_dsl::apigroup.eq(item_version)) + .select(count_star()) + .first::(sqlite_conn)?; + + count_replica2 = replica2_dsl::kine_replica2 + .filter(replica2_dsl::kind.eq(item_kind)) + .filter(replica2_dsl::name.eq(item_name)) + .filter(replica2_dsl::apigroup.eq(item_version)) .select(count_star()) .first::(sqlite_conn)?; } } } - Ok(count > 0) + let positive_count = [count, count_replica1, count_replica2].iter().filter(|&&x| x > 0).count(); + Ok(positive_count >= 2) } +fn insert_metadata_in_transaction_pg( + transaction: &mut PgConnection, + plural: &str, + version: &str, + namespace_required: bool, + json_data: &Value +) -> QueryResult<()> { + let table_array: [&str; 3] = ["metadata", "metadata_replica1", "metadata_replica2"]; + + for table_name in table_array { + let insert_metadata_query = format!( + "INSERT INTO {} (name, namespace, apigroup, data, created_time, updated_time) + VALUES ('{}', {}, '{}', '{}', '{}', '{}') + ON CONFLICT DO NOTHING;", + table_name, + plural, + namespace_required, + version, + json_data.to_string(), + Utc::now().naive_utc().to_string(), + Utc::now().naive_utc().to_string() + ); + + sql_query(insert_metadata_query).execute(transaction)?; + } + Ok(()) +} + +fn insert_metadata_in_transaction_sqlite( + transaction: &mut SqliteConnection, + plural: &str, + version: &str, + namespace_required: bool, + json_data: &Value +) -> QueryResult<()> { + let table_array: [&str; 3] = ["metadata", "metadata_replica1", "metadata_replica2"]; + + for table_name in table_array { + let insert_metadata_query = format!( + "INSERT OR IGNORE INTO {} (name, namespace, apigroup, data, created_time, updated_time) + VALUES ('{}', {}, '{}', '{}', '{}', '{}');", + table_name, + plural, + namespace_required, + version, + json_data.to_string(), + Utc::now().naive_utc().to_string(), + Utc::now().naive_utc().to_string() + ); + + sql_query(insert_metadata_query).execute(transaction)?; + } + Ok(()) +} -// 在 metadata 表中插入新记录 async fn insert_metadata( conn: &mut DbConnection, plural: &str, @@ -399,40 +551,82 @@ async fn insert_metadata( namespace_required: bool, json_data: &Value ) -> QueryResult<()> { - use crate::schema::metadata::dsl::*; match conn { DbConnection::Pg(pg_conn) => { - diesel::insert_into(metadata) - .values(( - name.eq(plural), - apigroup.eq(version), - namespace.eq(namespace_required), - data.eq(json_data.to_string()), - created_time.eq(Utc::now().naive_utc().to_string()), - updated_time.eq(Utc::now().naive_utc().to_string()), - )) - .execute(pg_conn)?; - }, + pg_conn.transaction(|transaction| { + insert_metadata_in_transaction_pg(transaction, plural, version, namespace_required, json_data) + }) + } DbConnection::Sqlite(sqlite_conn) => { - diesel::insert_into(metadata) - .values(( - name.eq(plural), - apigroup.eq(version), - namespace.eq(namespace_required), - data.eq(json_data.to_string()), - created_time.eq(Utc::now().naive_utc().to_string()), - updated_time.eq(Utc::now().naive_utc().to_string()), - )) - .execute(sqlite_conn)?; + sqlite_conn.transaction(|transaction| { + insert_metadata_in_transaction_sqlite(transaction, plural, version, namespace_required, json_data) + }) } + }.expect("unknow conn in insert_metadata"); + Ok(()) +} + +fn insert_kine_in_transaction_pg( + transaction: &mut PgConnection, + item_kind: &str, + item_name: &str, + json_data: &Value, + version: &str, + namespace: Option<&str>, +) -> QueryResult<()> { + let table_array: [&str; 3] = ["kine", "kine_replica1", "kine_replica2"]; + + for table_name in table_array { + let insert_metadata_query = format!( + "INSERT INTO {} (kind, name, namespace, apigroup, data, created_time, updated_time) + VALUES ('{}', '{}', {}, '{}', '{}', '{}', '{}') + ON CONFLICT DO NOTHING;", + table_name, + item_kind, + item_name, + namespace.unwrap_or(""), + version, + json_data.to_string(), + Utc::now().naive_utc().to_string(), + Utc::now().naive_utc().to_string() + ); + + sql_query(insert_metadata_query).execute(transaction)?; } Ok(()) } +fn insert_kine_in_transaction_sqlite( + transaction: &mut SqliteConnection, + item_kind: &str, + item_name: &str, + json_data: &Value, + version: &str, + namespace: Option<&str>, +) -> QueryResult<()> { + let table_array: [&str; 3] = ["kine", "kine_replica1", "kine_replica2"]; + + for table_name in table_array { + let insert_metadata_query = format!( + "INSERT OR IGNORE INTO {} (kind, name, namespace, apigroup, data, created_time, updated_time) + VALUES ('{}', {}', {}, '{}', '{}', '{}', '{}');", + table_name, + item_kind, + item_name, + namespace.unwrap_or(""), + version, + json_data.to_string(), + Utc::now().naive_utc().to_string(), + Utc::now().naive_utc().to_string() + ); + sql_query(insert_metadata_query).execute(transaction)?; + } + Ok(()) +} -// 插入数据到指定的表 -async fn insert_into_table( +// 在 kine 表中插入新记录 +async fn insert_kine( conn: &mut DbConnection, item_kind: &str, item_name: &str, @@ -442,34 +636,20 @@ async fn insert_into_table( ) -> QueryResult<()> { match conn { DbConnection::Pg(pg_conn) => { - let insert_query = "INSERT INTO kine (kind, name, namespace, apigroup, data, created_time, updated_time) VALUES ($1, $2, $3, $4, $5, $6, $7)".to_string(); - diesel::sql_query(insert_query) - .bind::(item_kind) - .bind::(item_name) - .bind::(namespace.unwrap_or("")) - .bind::(version) - .bind::(json_data.to_string()) // 将 JSON 数据存储为 TEXT - .bind::(Utc::now().naive_utc().to_string()) - .bind::(Utc::now().naive_utc().to_string()) - .execute(pg_conn)?; - }, + pg_conn.transaction(|transaction| { + insert_kine_in_transaction_pg(transaction, item_kind, item_name, json_data, version, namespace) + }) + } DbConnection::Sqlite(sqlite_conn) => { - let insert_query = "INSERT INTO kine (kind, name, namespace, apigroup, data, created_time, updated_time) VALUES (?, ?, ?, ?, ?, ?, ?)".to_string(); - diesel::sql_query(insert_query) - .bind::(item_kind) - .bind::(item_name) - .bind::(namespace.unwrap_or("")) - .bind::(version) - .bind::(json_data.to_string()) // 将 JSON 数据存储为 TEXT - .bind::(Utc::now().naive_utc().to_string()) - .bind::(Utc::now().naive_utc().to_string()) - .execute(sqlite_conn)?; + sqlite_conn.transaction(|transaction| { + insert_kine_in_transaction_sqlite(transaction, item_kind, item_name, json_data, version, namespace) + }) } - } + }.expect("unknow conn in insert_kine"); Ok(()) } -// 从 plural 表中删除特定 name 的记录 +// 从 kine 表中删除特定 name 的记录 async fn delete_from_table( conn: &mut DbConnection, item_kind: &str, @@ -896,7 +1076,7 @@ impl Handler for DefaultHandler { event_manager.send_event(&resource_type, EventType::Create, data.clone()).await; event_manager.send_event(&watchone_resource_type, EventType::Create, data.clone()).await; - insert_into_table(db_connection, &plural, item_name, &data, &version, None) + insert_kine(db_connection, &plural, item_name, &data, &version, None) .await .map_err(ErrorInternalServerError)?; } @@ -989,7 +1169,7 @@ impl Handler for DefaultHandler { event_manager.send_event(&resource_type, EventType::Create, data.clone()).await; event_manager.send_event(&watchone_resource_type, EventType::Create, data.clone()).await; - insert_into_table(db_connection, &plural, item_name, &data, &version, Some(&namespace)) + insert_kine(db_connection, &plural, item_name, &data, &version, Some(&namespace)) .await .map_err(ErrorInternalServerError)?; } @@ -1083,7 +1263,7 @@ impl Handler for DefaultHandler { event_manager.send_event(&resource_type, EventType::Create, data.clone()).await; event_manager.send_event(&watchone_resource_type, EventType::Create, data.clone()).await; - insert_into_table(db_connection, &plural, item_name, &data, &ver, None) + insert_kine(db_connection, &plural, item_name, &data, &ver, None) .await .map_err(ErrorInternalServerError)?; } @@ -1178,7 +1358,7 @@ impl Handler for DefaultHandler { event_manager.send_event(&resource_type, EventType::Create, data.clone()).await; event_manager.send_event(&watchone_resource_type, EventType::Create, data.clone()).await; - insert_into_table(db_connection, &plural, item_name, &data, &ver, Some(&namespace)) + insert_kine(db_connection, &plural, item_name, &data, &ver, Some(&namespace)) .await .map_err(ErrorInternalServerError)?; } -- Gitee From 68b3976ed565fb4217c3f517b2d07eb488b34de9 Mon Sep 17 00:00:00 2001 From: Yuichi <913637919@qq.com> Date: Tue, 3 Dec 2024 14:30:48 +0800 Subject: [PATCH 03/10] =?UTF-8?q?delete=20=E5=92=8C=20update=20=E9=80=82?= =?UTF-8?q?=E9=85=8D=E4=B8=89=E8=A1=A8=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cores/handlers.rs | 270 +++++++++++++++++++++++++----------------- 1 file changed, 160 insertions(+), 110 deletions(-) diff --git a/src/cores/handlers.rs b/src/cores/handlers.rs index 5ddf6d8..8f28901 100644 --- a/src/cores/handlers.rs +++ b/src/cores/handlers.rs @@ -19,7 +19,6 @@ use feventbus::traits::producer::Producer; use serde::{Deserialize, Serialize}; use tokio::sync::broadcast; use actix_web::web::Bytes; -use diesel::expression::BoxableExpression; // 定义全局哈希表来获取model名 static GLOBAL_HASHMAP: LazyLock>> = LazyLock::new(|| { @@ -650,66 +649,92 @@ async fn insert_kine( } // 从 kine 表中删除特定 name 的记录 -async fn delete_from_table( +async fn delete_from_kine( conn: &mut DbConnection, item_kind: &str, item_name: &str, item_version: &str, item_namespace: Option<&str>, ) -> QueryResult { - // 根据是否存在 namespace 动态构建删除查询 - let delete_query = if let Some(_) = item_namespace { - match conn { - DbConnection::Pg(_) => "DELETE FROM kine WHERE kind = $1 AND name = $2 AND namespace = $3 AND apigroup = $4".to_string(), - DbConnection::Sqlite(_) => "DELETE FROM kine WHERE kind = ? AND name = ? AND namespace = ? AND apigroup = ?".to_string(), - } - } else { - match conn { - DbConnection::Pg(_) => "DELETE FROM kine WHERE kind = $1 AND name = $2 AND apigroup = $3".to_string(), - DbConnection::Sqlite(_) => "DELETE FROM kine WHERE kind = ? AND name = ? AND apigroup = ?".to_string(), - } - }; + use diesel::sql_types::Text; + + // 表名列表 + let tables = ["kine", "kine_replica1", "kine_replica2"]; + + // 遍历每个表,执行删除操作 + let mut total_rows_affected = 0; + + for &table in &tables { + let delete_query = if let Some(_) = item_namespace { + match conn { + DbConnection::Pg(_) => format!( + "DELETE FROM {} WHERE kind = $1 AND name = $2 AND namespace = $3 AND apigroup = $4", + table + ), + DbConnection::Sqlite(_) => format!( + "DELETE FROM {} WHERE kind = ? AND name = ? AND namespace = ? AND apigroup = ?", + table + ), + } + } else { + match conn { + DbConnection::Pg(_) => format!( + "DELETE FROM {} WHERE kind = $1 AND name = $2 AND apigroup = $3", + table + ), + DbConnection::Sqlite(_) => format!( + "DELETE FROM {} WHERE kind = ? AND name = ? AND apigroup = ?", + table + ), + } + }; - // 执行删除操作 - let rows_affected = match conn { - DbConnection::Pg(pg_conn) => { - if let Some(namespace) = item_namespace { - diesel::sql_query(delete_query) - .bind::(item_kind) - .bind::(item_name) - .bind::(namespace) - .bind::(item_version) - .execute(pg_conn)? - } else { - diesel::sql_query(delete_query) - .bind::(item_kind) - .bind::(item_name) - .bind::(item_version) - .execute(pg_conn)? + // 执行删除 + let rows_affected = match conn { + DbConnection::Pg(pg_conn) => { + if let Some(namespace) = item_namespace { + diesel::sql_query(delete_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(namespace) + .bind::(item_version) + .execute(pg_conn)? + } else { + diesel::sql_query(delete_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(item_version) + .execute(pg_conn)? + } } - }, - DbConnection::Sqlite(sqlite_conn) => { - if let Some(namespace) = item_namespace { - diesel::sql_query(delete_query) - .bind::(item_kind) - .bind::(item_name) - .bind::(namespace) - .bind::(item_version) - .execute(sqlite_conn)? - } else { - diesel::sql_query(delete_query) - .bind::(item_kind) - .bind::(item_name) - .bind::(item_version) - .execute(sqlite_conn)? + DbConnection::Sqlite(sqlite_conn) => { + if let Some(namespace) = item_namespace { + diesel::sql_query(delete_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(namespace) + .bind::(item_version) + .execute(sqlite_conn)? + } else { + diesel::sql_query(delete_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(item_version) + .execute(sqlite_conn)? + } } - } - }; - Ok(rows_affected > 0) + }; + + total_rows_affected += rows_affected; + } + + // 如果至少有两个表进行了删除,则返回 true + Ok(total_rows_affected > 1) } -async fn update_data_in_table( + +async fn update_data_in_kine( conn: &mut DbConnection, item_kind: &str, item_name: &str, @@ -717,65 +742,90 @@ async fn update_data_in_table( item_namespace: Option<&str>, json_data: &Value, ) -> QueryResult { + use diesel::sql_types::Text; + use chrono::Utc; + + // 需要更新的表列表 + let tables = ["kine", "kine_replica", "kine_replica2"]; + let mut total_rows_affected = 0; + + for &table in &tables { + let update_query = if let Some(_) = item_namespace { + match conn { + DbConnection::Pg(_) => format!( + "UPDATE {} SET data = $1, updated_time = $2 WHERE kind = $3 AND name = $4 AND namespace = $5 AND apigroup = $6", + table + ), + DbConnection::Sqlite(_) => format!( + "UPDATE {} SET data = ?, updated_time = ? WHERE kind = ? AND name = ? AND namespace = ? AND apigroup = ?", + table + ), + } + } else { + match conn { + DbConnection::Pg(_) => format!( + "UPDATE {} SET data = $1, updated_time = $2 WHERE kind = $3 AND name = $4 AND apigroup = $5", + table + ), + DbConnection::Sqlite(_) => format!( + "UPDATE {} SET data = ?, updated_time = ? WHERE kind = ? AND name = ? AND apigroup = ?", + table + ), + } + }; - let update_query = if let Some(_) = item_namespace { - match conn { - DbConnection::Pg(_) => "UPDATE kine SET data = $1, updated_time = $2 WHERE kind = $3 AND name = $4 AND namespace = $5 AND apigroup = $6".to_string(), - DbConnection::Sqlite(_) => "UPDATE kine SET data = ?, updated_time = ? WHERE kind = ? AND name = ? AND namespace = ? AND apigroup = ?".to_string(), - } - } else { - match conn { - DbConnection::Pg(_) => "UPDATE kine SET data = $1, updated_time = $2 WHERE kind = $3 AND name = $4 AND apigroup = $5".to_string(), - DbConnection::Sqlite(_) => "UPDATE kine SET data = ?, updated_time = ? WHERE kind = ? AND name = ? AND apigroup = ?".to_string(), - } - }; - - let rows_affected = match conn { - DbConnection::Pg(pg_conn) => { - if let Some(namespace) = item_namespace { - diesel::sql_query(update_query) - .bind::(json_data.to_string()) - .bind::(Utc::now().naive_utc().to_string()) - .bind::(item_kind) - .bind::(item_name) - .bind::(namespace) - .bind::(item_version) - .execute(pg_conn)? - } else { - diesel::sql_query(update_query) - .bind::(json_data.to_string()) - .bind::(Utc::now().naive_utc().to_string()) - .bind::(item_kind) - .bind::(item_name) - .bind::(item_version) - .execute(pg_conn)? + // 执行更新操作 + let rows_affected = match conn { + DbConnection::Pg(pg_conn) => { + if let Some(namespace) = item_namespace { + diesel::sql_query(update_query) + .bind::(json_data.to_string()) + .bind::(Utc::now().naive_utc().to_string()) + .bind::(item_kind) + .bind::(item_name) + .bind::(namespace) + .bind::(item_version) + .execute(pg_conn)? + } else { + diesel::sql_query(update_query) + .bind::(json_data.to_string()) + .bind::(Utc::now().naive_utc().to_string()) + .bind::(item_kind) + .bind::(item_name) + .bind::(item_version) + .execute(pg_conn)? + } } - }, - DbConnection::Sqlite(sqlite_conn) => { - if let Some(namespace) = item_namespace { - diesel::sql_query(update_query) - .bind::(json_data.to_string()) - .bind::(Utc::now().naive_utc().to_string()) - .bind::(item_kind) - .bind::(item_name) - .bind::(namespace) - .bind::(item_version) - .execute(sqlite_conn)? - } else { - diesel::sql_query(update_query) - .bind::(json_data.to_string()) - .bind::(Utc::now().naive_utc().to_string()) - .bind::(item_kind) - .bind::(item_name) - .bind::(item_version) - .execute(sqlite_conn)? + DbConnection::Sqlite(sqlite_conn) => { + if let Some(namespace) = item_namespace { + diesel::sql_query(update_query) + .bind::(json_data.to_string()) + .bind::(Utc::now().naive_utc().to_string()) + .bind::(item_kind) + .bind::(item_name) + .bind::(namespace) + .bind::(item_version) + .execute(sqlite_conn)? + } else { + diesel::sql_query(update_query) + .bind::(json_data.to_string()) + .bind::(Utc::now().naive_utc().to_string()) + .bind::(item_kind) + .bind::(item_name) + .bind::(item_version) + .execute(sqlite_conn)? + } } - } - }; + }; - Ok(rows_affected > 0) + total_rows_affected += rows_affected; + } + + // 如果至少有两个表更新成功,则返回 true + Ok(total_rows_affected > 1) } + // 辅助查询函数,用于获取数据的 `data` 字段 #[derive(QueryableByName)] struct DataResult { @@ -1423,7 +1473,7 @@ impl Handler for DefaultHandler { event_manager.send_event(&watchone_resource_type, EventType::Delete, request_data).await; // 从 plural 表中删除指定的 name - let deleted = delete_from_table(db_connection, &plural, &name, &version, None) + let deleted = delete_from_kine(db_connection, &plural, &name, &version, None) .await .map_err(ErrorInternalServerError)?; @@ -1489,7 +1539,7 @@ impl Handler for DefaultHandler { event_manager.send_event(&watchone_resource_type, EventType::Delete, request_data).await; // 从 plural 表中删除指定的数据 - let deleted = delete_from_table(db_connection, &plural, &name, &version, Some(&namespace)) + let deleted = delete_from_kine(db_connection, &plural, &name, &version, Some(&namespace)) .await .map_err(ErrorInternalServerError)?; @@ -1555,7 +1605,7 @@ impl Handler for DefaultHandler { event_manager.send_event(&resource_type, EventType::Delete, request_data.clone()).await; event_manager.send_event(&watchone_resource_type, EventType::Delete, request_data).await; - let deleted = delete_from_table(db_connection, &plural, &name, &ver, None) + let deleted = delete_from_kine(db_connection, &plural, &name, &ver, None) .await .map_err(ErrorInternalServerError)?; @@ -1621,7 +1671,7 @@ impl Handler for DefaultHandler { event_manager.send_event(&resource_type, EventType::Delete, request_data.clone()).await; event_manager.send_event(&watchone_resource_type, EventType::Delete, request_data).await; - let deleted = delete_from_table(db_connection, &plural, &name, &ver, Some(&namespace)) + let deleted = delete_from_kine(db_connection, &plural, &name, &ver, Some(&namespace)) .await .map_err(ErrorInternalServerError)?; @@ -1690,7 +1740,7 @@ impl Handler for DefaultHandler { event_manager.send_event(&watchone_resource_type, EventType::Update, request_data).await; // 查询 plural 表中是否存在 name 匹配的数据 - let updated = update_data_in_table(db_connection, &plural, &name, &version, None, &data) + let updated = update_data_in_kine(db_connection, &plural, &name, &version, None, &data) .await .map_err(ErrorInternalServerError)?; @@ -1759,7 +1809,7 @@ impl Handler for DefaultHandler { event_manager.send_event(&resource_type, EventType::Update, request_data.clone()).await; event_manager.send_event(&watchone_resource_type, EventType::Update, request_data).await; - let updated = update_data_in_table(db_connection, &plural, &name, &version, Some(&namespace), &data) + let updated = update_data_in_kine(db_connection, &plural, &name, &version, Some(&namespace), &data) .await .map_err(ErrorInternalServerError)?; @@ -1828,7 +1878,7 @@ impl Handler for DefaultHandler { event_manager.send_event(&resource_type, EventType::Update, request_data.clone()).await; event_manager.send_event(&watchone_resource_type, EventType::Update, request_data).await; - let updated = update_data_in_table(db_connection, &plural, &name, &ver, None, &data) + let updated = update_data_in_kine(db_connection, &plural, &name, &ver, None, &data) .await .map_err(ErrorInternalServerError)?; @@ -1897,7 +1947,7 @@ impl Handler for DefaultHandler { event_manager.send_event(&resource_type, EventType::Update, request_data.clone()).await; event_manager.send_event(&watchone_resource_type, EventType::Update, request_data).await; - let updated = update_data_in_table(db_connection, &plural, &name, &ver, Some(&namespace), &data) + let updated = update_data_in_kine(db_connection, &plural, &name, &ver, Some(&namespace), &data) .await .map_err(ErrorInternalServerError)?; -- Gitee From d2105d0af8f7108815ca5c9125753ead09b1f8c4 Mon Sep 17 00:00:00 2001 From: Yuichi <913637919@qq.com> Date: Tue, 3 Dec 2024 14:48:22 +0800 Subject: [PATCH 04/10] =?UTF-8?q?=E6=B7=BB=E5=8A=A0get=20=E4=BB=A5?= =?UTF-8?q?=E5=8F=8A=20list=E7=BB=93=E6=9E=84=E6=94=AF=E6=8C=81=E4=B8=89?= =?UTF-8?q?=E8=A1=A8=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cores/handlers.rs | 418 ++++++++++++++++++++++++++++++++---------- 1 file changed, 320 insertions(+), 98 deletions(-) diff --git a/src/cores/handlers.rs b/src/cores/handlers.rs index 8f28901..2a161e3 100644 --- a/src/cores/handlers.rs +++ b/src/cores/handlers.rs @@ -833,126 +833,348 @@ struct DataResult { data: String, } -async fn get_data_from_table( + + +async fn get_data_from_kine( conn: &mut DbConnection, item_kind: &str, item_name: &str, item_version: &str, item_namespace: Option<&str>, ) -> QueryResult> { - let select_query = if let Some(_) = item_namespace { - match conn { - DbConnection::Pg(_) => "SELECT data FROM kine WHERE kind = $1 AND name = $2 AND namespace = $3 AND apigroup = $4".to_string(), - DbConnection::Sqlite(_) => "SELECT data FROM kine WHERE kind = ? AND name = ? AND namespace = ? AND apigroup = ?".to_string(), + use diesel::sql_types::Text; + + // 表名列表 + let tables = ["kine", "kine_replica1", "kine_replica2"]; + + // 存储每个表的查询结果 + let mut results = HashMap::new(); + + for &table in &tables { + let select_query = if let Some(_) = item_namespace { + match conn { + DbConnection::Pg(_) => format!( + "SELECT data FROM {} WHERE kind = $1 AND name = $2 AND namespace = $3 AND apigroup = $4", + table + ), + DbConnection::Sqlite(_) => format!( + "SELECT data FROM {} WHERE kind = ? AND name = ? AND namespace = ? AND apigroup = ?", + table + ), + } + } else { + match conn { + DbConnection::Pg(_) => format!( + "SELECT data FROM {} WHERE kind = $1 AND name = $2 AND apigroup = $3", + table + ), + DbConnection::Sqlite(_) => format!( + "SELECT data FROM {} WHERE kind = ? AND name = ? AND apigroup = ?", + table + ), + } + }; + + let data_result = match conn { + DbConnection::Pg(pg_conn) => { + if let Some(namespace) = item_namespace { + diesel::sql_query(select_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(namespace) + .bind::(item_version) + .get_result::(pg_conn) + .optional()? + .map(|res| res.data) + } else { + diesel::sql_query(select_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(item_version) + .get_result::(pg_conn) + .optional()? + .map(|res| res.data) + } + } + DbConnection::Sqlite(sqlite_conn) => { + if let Some(namespace) = item_namespace { + diesel::sql_query(select_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(namespace) + .bind::(item_version) + .get_result::(sqlite_conn) + .optional()? + .map(|res| res.data) + } else { + diesel::sql_query(select_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(item_version) + .get_result::(sqlite_conn) + .optional()? + .map(|res| res.data) + } + } + }; + + if let Some(data) = data_result { + *results.entry(data).or_insert(0) += 1; } + } + + // 按少数服从多数规则返回数据 + if results.len() == 1 { + // 如果三个表的结果一致,直接返回任意结果 + Ok(results.into_iter().next().map(|(data, _)| data)) + } else if let Some((data, _)) = results.into_iter().max_by_key(|&(_, count)| count) { + // 如果有多数一致的数据,返回该数据 + Ok(Some(data)) } else { - match conn { - DbConnection::Pg(_) => "SELECT data FROM kine WHERE kind = $1 AND name = $2 AND apigroup = $3".to_string(), - DbConnection::Sqlite(_) => "SELECT data FROM kine WHERE kind = ? AND name = ? AND apigroup = ?".to_string(), - } - }; + // 如果三表数据均不同,返回 kine 表的数据 + let fallback_query = if let Some(_) = item_namespace { + match conn { + DbConnection::Pg(_) => "SELECT data FROM kine WHERE kind = $1 AND name = $2 AND namespace = $3 AND apigroup = $4".to_string(), + DbConnection::Sqlite(_) => "SELECT data FROM kine WHERE kind = ? AND name = ? AND namespace = ? AND apigroup = ?".to_string(), + } + } else { + match conn { + DbConnection::Pg(_) => "SELECT data FROM kine WHERE kind = $1 AND name = $2 AND apigroup = $3".to_string(), + DbConnection::Sqlite(_) => "SELECT data FROM kine WHERE kind = ? AND name = ? AND apigroup = ?".to_string(), + } + }; - match conn { - DbConnection::Pg(pg_conn) => { - if let Some(namespace) = item_namespace { - diesel::sql_query(select_query) - .bind::(item_kind) - .bind::(item_name) - .bind::(namespace) - .bind::(item_version) - .get_result::(pg_conn) - .optional() - .map(|res| res.map(|data_result| data_result.data)) - } else { - diesel::sql_query(select_query) - .bind::(item_kind) - .bind::(item_name) - .bind::(item_version) - .get_result::(pg_conn) - .optional() - .map(|res| res.map(|data_result| data_result.data)) + match conn { + DbConnection::Pg(pg_conn) => { + if let Some(namespace) = item_namespace { + diesel::sql_query(fallback_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(namespace) + .bind::(item_version) + .get_result::(pg_conn) + .optional() + .map(|res| res.map(|data_result| data_result.data)) + } else { + diesel::sql_query(fallback_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(item_version) + .get_result::(pg_conn) + .optional() + .map(|res| res.map(|data_result| data_result.data)) + } } - }, - DbConnection::Sqlite(sqlite_conn) => { - if let Some(namespace) = item_namespace { - diesel::sql_query(select_query) - .bind::(item_kind) - .bind::(item_name) - .bind::(namespace) - .bind::(item_version) - .get_result::(sqlite_conn) - .optional() - .map(|res| res.map(|data_result| data_result.data)) - } else { - diesel::sql_query(select_query) - .bind::(item_kind) - .bind::(item_name) - .bind::(item_version) - .get_result::(sqlite_conn) - .optional() - .map(|res| res.map(|data_result| data_result.data)) + DbConnection::Sqlite(sqlite_conn) => { + if let Some(namespace) = item_namespace { + diesel::sql_query(fallback_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(namespace) + .bind::(item_version) + .get_result::(sqlite_conn) + .optional() + .map(|res| res.map(|data_result| data_result.data)) + } else { + diesel::sql_query(fallback_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(item_version) + .get_result::(sqlite_conn) + .optional() + .map(|res| res.map(|data_result| data_result.data)) + } } } } } + // 辅助函数:从指定表中获取所有符合条件的数据的 `data` 字段 -async fn get_all_data_from_table( +async fn get_all_data_from_kine( conn: &mut DbConnection, item_kind: &str, item_version: &str, item_namespace: Option<&str>, ) -> QueryResult> { - let select_query = if let Some(_) = item_namespace { - match conn { - DbConnection::Pg(_) => "SELECT data FROM kine WHERE kind = $1 AND namespace = $2 AND apigroup = $3".to_string(), - DbConnection::Sqlite(_) => "SELECT data FROM kine WHERE kind = ? AND namespace = ? AND apigroup = ?".to_string(), - } - } else { - match conn { - DbConnection::Pg(_) => "SELECT data FROM kine WHERE kind = $1 AND apigroup = $2".to_string(), - DbConnection::Sqlite(_) => "SELECT data FROM kine WHERE kind = ? AND apigroup = ?".to_string(), - } - }; + use diesel::sql_types::Text; + use std::collections::HashMap; - match conn { - DbConnection::Pg(pg_conn) => { - if let Some(namespace) = item_namespace { - diesel::sql_query(select_query) - .bind::(item_kind) - .bind::(namespace) - .bind::(item_version) - .load::(pg_conn) - .map(|results| results.into_iter().map(|res| res.data).collect()) - } else { - diesel::sql_query(select_query) - .bind::(item_kind) - .bind::(item_version) - .load::(pg_conn) - .map(|results| results.into_iter().map(|res| res.data).collect()) + // 定义需要查询的表 + let tables = ["kine", "kine_replica1", "kine_replica2"]; + + // 用于统计所有表的查询结果 + let mut all_results: HashMap = HashMap::new(); + + // 遍历每个表进行查询 + for &table in &tables { + let select_query = if let Some(_) = item_namespace { + match conn { + DbConnection::Pg(_) => format!( + "SELECT data FROM {} WHERE kind = $1 AND namespace = $2 AND apigroup = $3", + table + ), + DbConnection::Sqlite(_) => format!( + "SELECT data FROM {} WHERE kind = ? AND namespace = ? AND apigroup = ?", + table + ), } - }, - DbConnection::Sqlite(sqlite_conn) => { - if let Some(namespace) = item_namespace { - diesel::sql_query(select_query) - .bind::(item_kind) - .bind::(namespace) - .bind::(item_version) - .load::(sqlite_conn) - .map(|results| results.into_iter().map(|res| res.data).collect()) - } else { - diesel::sql_query(select_query) - .bind::(item_kind) - .bind::(item_version) - .load::(sqlite_conn) - .map(|results| results.into_iter().map(|res| res.data).collect()) + } else { + match conn { + DbConnection::Pg(_) => format!( + "SELECT data FROM {} WHERE kind = $1 AND apigroup = $2", + table + ), + DbConnection::Sqlite(_) => format!( + "SELECT data FROM {} WHERE kind = ? AND apigroup = ?", + table + ), } + }; + + // 执行查询 + let results: Vec = match conn { + DbConnection::Pg(pg_conn) => { + if let Some(namespace) = item_namespace { + diesel::sql_query(select_query) + .bind::(item_kind) + .bind::(namespace) + .bind::(item_version) + .load::(pg_conn)? + .into_iter() + .map(|res| res.data) + .collect() + } else { + diesel::sql_query(select_query) + .bind::(item_kind) + .bind::(item_version) + .load::(pg_conn)? + .into_iter() + .map(|res| res.data) + .collect() + } + } + DbConnection::Sqlite(sqlite_conn) => { + if let Some(namespace) = item_namespace { + diesel::sql_query(select_query) + .bind::(item_kind) + .bind::(namespace) + .bind::(item_version) + .load::(sqlite_conn)? + .into_iter() + .map(|res| res.data) + .collect() + } else { + diesel::sql_query(select_query) + .bind::(item_kind) + .bind::(item_version) + .load::(sqlite_conn)? + .into_iter() + .map(|res| res.data) + .collect() + } + } + }; + + // 将结果添加到统计中 + for data in results { + *all_results.entry(data).or_insert(0) += 1; + } + } + + // 处理少数服从多数逻辑 + if all_results.len() == 1 { + // 如果所有表结果一致,直接返回 + Ok(all_results.into_iter().map(|(data, _)| data).collect()) + } else { + // 找出出现次数最多的结果 + let max_count = all_results + .iter() + .max_by_key(|&(_, count)| count) + .map(|(_, count)| *count); + + if let Some(max_count) = max_count { + let filtered_results: Vec = all_results + .into_iter() + .filter(|(_, count)| *count == max_count) + .map(|(data, _)| data) + .collect(); + Ok(filtered_results) + } else { + // 如果所有表数据都不一致,返回主表 `kine` 的数据 + let fallback_query = if let Some(_) = item_namespace { + match conn { + DbConnection::Pg(_) => { + "SELECT data FROM kine WHERE kind = $1 AND namespace = $2 AND apigroup = $3" + .to_string() + } + DbConnection::Sqlite(_) => { + "SELECT data FROM kine WHERE kind = ? AND namespace = ? AND apigroup = ?" + .to_string() + } + } + } else { + match conn { + DbConnection::Pg(_) => { + "SELECT data FROM kine WHERE kind = $1 AND apigroup = $2".to_string() + } + DbConnection::Sqlite(_) => { + "SELECT data FROM kine WHERE kind = ? AND apigroup = ?".to_string() + } + } + }; + + let fallback_results = match conn { + DbConnection::Pg(pg_conn) => { + if let Some(namespace) = item_namespace { + diesel::sql_query(fallback_query) + .bind::(item_kind) + .bind::(namespace) + .bind::(item_version) + .load::(pg_conn)? + .into_iter() + .map(|res| res.data) + .collect() + } else { + diesel::sql_query(fallback_query) + .bind::(item_kind) + .bind::(item_version) + .load::(pg_conn)? + .into_iter() + .map(|res| res.data) + .collect() + } + } + DbConnection::Sqlite(sqlite_conn) => { + if let Some(namespace) = item_namespace { + diesel::sql_query(fallback_query) + .bind::(item_kind) + .bind::(namespace) + .bind::(item_version) + .load::(sqlite_conn)? + .into_iter() + .map(|res| res.data) + .collect() + } else { + diesel::sql_query(fallback_query) + .bind::(item_kind) + .bind::(item_version) + .load::(sqlite_conn)? + .into_iter() + .map(|res| res.data) + .collect() + } + } + }; + + Ok(fallback_results) } } } + // 发送请求并等待响应 async fn send_request( message: Message, @@ -1975,7 +2197,7 @@ impl Handler for DefaultHandler { return Ok(HttpResponse::NotFound().json(json!({ "error": "该 plural 不存在,请检查 plural 版本以及是否需要 namespace" }))); } - if let Some(data) = get_data_from_table(db_connection, &plural, &name, &version, None) + if let Some(data) = get_data_from_kine(db_connection, &plural, &name, &version, None) .await .map_err(ErrorInternalServerError)? { // 将字符串转换为 JSON 格式 @@ -2004,7 +2226,7 @@ impl Handler for DefaultHandler { return Ok(HttpResponse::NotFound().json(json!({ "error": "该 plural 不存在,请检查 plural 版本以及是否需要 namespace" }))); } - if let Some(data) = get_data_from_table(db_connection, &plural, &name, &version, Some(&namespace)) + if let Some(data) = get_data_from_kine(db_connection, &plural, &name, &version, Some(&namespace)) .await .map_err(ErrorInternalServerError)? { // 将字符串转换为 JSON 格式 @@ -2035,7 +2257,7 @@ impl Handler for DefaultHandler { return Ok(HttpResponse::NotFound().json(json!({ "error": "该 plural 不存在,请检查 plural 版本以及是否需要 namespace" }))); } - if let Some(data) = get_data_from_table(db_connection, &plural, &name, &ver, None) + if let Some(data) = get_data_from_kine(db_connection, &plural, &name, &ver, None) .await .map_err(ErrorInternalServerError)? { // 将字符串转换为 JSON 格式 @@ -2066,7 +2288,7 @@ impl Handler for DefaultHandler { return Ok(HttpResponse::NotFound().json(json!({ "error": "该 plural 不存在,请检查 plural 版本以及是否需要 namespace" }))); } - if let Some(data) = get_data_from_table(db_connection, &plural, &name, &ver, Some(&namespace)) + if let Some(data) = get_data_from_kine(db_connection, &plural, &name, &ver, Some(&namespace)) .await .map_err(ErrorInternalServerError)? { // 将字符串转换为 JSON 格式 @@ -2095,7 +2317,7 @@ impl Handler for DefaultHandler { return Ok(HttpResponse::NotFound().json(json!({ "error": "该 plural 不存在,请检查 plural 版本以及是否需要 namespace" }))); } - let data_list = get_all_data_from_table(db_connection, &plural, &version, None) + let data_list = get_all_data_from_kine(db_connection, &plural, &version, None) .await .map_err(ErrorInternalServerError)?; @@ -2125,7 +2347,7 @@ impl Handler for DefaultHandler { return Ok(HttpResponse::NotFound().json(json!({ "error": "该 plural 不存在,请检查 plural 版本以及是否需要 namespace" }))); } - let data_list = get_all_data_from_table(db_connection, &plural, &version, Some(&namespace)) + let data_list = get_all_data_from_kine(db_connection, &plural, &version, Some(&namespace)) .await .map_err(ErrorInternalServerError)?; @@ -2157,7 +2379,7 @@ impl Handler for DefaultHandler { return Ok(HttpResponse::NotFound().json(json!({ "error": "该 plural 不存在,请检查 plural 版本以及是否需要 namespace" }))); } - let data_list = get_all_data_from_table(db_connection, &plural, &ver, None) + let data_list = get_all_data_from_kine(db_connection, &plural, &ver, None) .await .map_err(ErrorInternalServerError)?; @@ -2189,7 +2411,7 @@ impl Handler for DefaultHandler { return Ok(HttpResponse::NotFound().json(json!({ "error": "该 plural 不存在,请检查 plural 版本以及是否需要 namespace" }))); } - let data_list = get_all_data_from_table(db_connection, &plural, &ver, Some(&namespace)) + let data_list = get_all_data_from_kine(db_connection, &plural, &ver, Some(&namespace)) .await .map_err(ErrorInternalServerError)?; -- Gitee From 3b9a713b47089136536f3ce824be2e1175f34fc5 Mon Sep 17 00:00:00 2001 From: Yuichi <913637919@qq.com> Date: Tue, 3 Dec 2024 14:49:39 +0800 Subject: [PATCH 05/10] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dkine=E8=A1=A8=E6=8F=92?= =?UTF-8?q?=E5=85=A5=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cores/handlers.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/cores/handlers.rs b/src/cores/handlers.rs index 2a161e3..17118fd 100644 --- a/src/cores/handlers.rs +++ b/src/cores/handlers.rs @@ -500,7 +500,7 @@ fn insert_metadata_in_transaction_pg( for table_name in table_array { let insert_metadata_query = format!( "INSERT INTO {} (name, namespace, apigroup, data, created_time, updated_time) - VALUES ('{}', {}, '{}', '{}', '{}', '{}') + VALUES ('{}', '{}', '{}', '{}', '{}', '{}') ON CONFLICT DO NOTHING;", table_name, plural, @@ -528,7 +528,7 @@ fn insert_metadata_in_transaction_sqlite( for table_name in table_array { let insert_metadata_query = format!( "INSERT OR IGNORE INTO {} (name, namespace, apigroup, data, created_time, updated_time) - VALUES ('{}', {}, '{}', '{}', '{}', '{}');", + VALUES ('{}', '{}', '{}', '{}', '{}', '{}');", table_name, plural, namespace_required, @@ -578,7 +578,7 @@ fn insert_kine_in_transaction_pg( for table_name in table_array { let insert_metadata_query = format!( "INSERT INTO {} (kind, name, namespace, apigroup, data, created_time, updated_time) - VALUES ('{}', '{}', {}, '{}', '{}', '{}', '{}') + VALUES ('{}', '{}', '{}', '{}', '{}', '{}', '{}') ON CONFLICT DO NOTHING;", table_name, item_kind, @@ -608,7 +608,7 @@ fn insert_kine_in_transaction_sqlite( for table_name in table_array { let insert_metadata_query = format!( "INSERT OR IGNORE INTO {} (kind, name, namespace, apigroup, data, created_time, updated_time) - VALUES ('{}', {}', {}, '{}', '{}', '{}', '{}');", + VALUES ('{}', {}', '{}', '{}', '{}', '{}', '{}');", table_name, item_kind, item_name, -- Gitee From 3400a3e73a49ea45054dea98d09ec8c7133b3969 Mon Sep 17 00:00:00 2001 From: Yuichi <913637919@qq.com> Date: Tue, 3 Dec 2024 14:52:23 +0800 Subject: [PATCH 06/10] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dkine=E8=A1=A8=E6=8F=92?= =?UTF-8?q?=E5=85=A5=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cores/handlers.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cores/handlers.rs b/src/cores/handlers.rs index 17118fd..5a7733f 100644 --- a/src/cores/handlers.rs +++ b/src/cores/handlers.rs @@ -608,7 +608,7 @@ fn insert_kine_in_transaction_sqlite( for table_name in table_array { let insert_metadata_query = format!( "INSERT OR IGNORE INTO {} (kind, name, namespace, apigroup, data, created_time, updated_time) - VALUES ('{}', {}', '{}', '{}', '{}', '{}', '{}');", + VALUES ('{}', '{}', '{}', '{}', '{}', '{}', '{}');", table_name, item_kind, item_name, -- Gitee From 98efacf10e4d5a02876de8b56246c92b85575580 Mon Sep 17 00:00:00 2001 From: Yuichi <913637919@qq.com> Date: Tue, 3 Dec 2024 15:05:50 +0800 Subject: [PATCH 07/10] =?UTF-8?q?=E9=87=8D=E6=9E=84=E6=8F=92=E5=85=A5?= =?UTF-8?q?=E5=87=BD=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cores/handlers.rs | 112 ++++++++++++++++++++++++++---------------- 1 file changed, 69 insertions(+), 43 deletions(-) diff --git a/src/cores/handlers.rs b/src/cores/handlers.rs index 5a7733f..b0daa02 100644 --- a/src/cores/handlers.rs +++ b/src/cores/handlers.rs @@ -493,56 +493,72 @@ fn insert_metadata_in_transaction_pg( plural: &str, version: &str, namespace_required: bool, - json_data: &Value + json_data: &Value, ) -> QueryResult<()> { + use diesel::sql_types::{Bool}; + + // 表名列表 let table_array: [&str; 3] = ["metadata", "metadata_replica1", "metadata_replica2"]; for table_name in table_array { + // 使用参数绑定构建插入查询 let insert_metadata_query = format!( "INSERT INTO {} (name, namespace, apigroup, data, created_time, updated_time) - VALUES ('{}', '{}', '{}', '{}', '{}', '{}') - ON CONFLICT DO NOTHING;", - table_name, - plural, - namespace_required, - version, - json_data.to_string(), - Utc::now().naive_utc().to_string(), - Utc::now().naive_utc().to_string() + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT DO NOTHING;", + table_name ); - sql_query(insert_metadata_query).execute(transaction)?; + // 执行插入操作 + sql_query(insert_metadata_query) + .bind::(plural) // 名称 + .bind::(namespace_required) // 是否需要命名空间 + .bind::(version) // 版本 + .bind::(json_data.to_string()) // JSON 数据 + .bind::(Utc::now().naive_utc().to_string()) // 创建时间 + .bind::(Utc::now().naive_utc().to_string()) // 更新时间 + .execute(transaction)?; } + Ok(()) } + fn insert_metadata_in_transaction_sqlite( transaction: &mut SqliteConnection, plural: &str, version: &str, namespace_required: bool, - json_data: &Value + json_data: &Value, ) -> QueryResult<()> { + use diesel::sql_types::{Bool}; + + // 表名列表 let table_array: [&str; 3] = ["metadata", "metadata_replica1", "metadata_replica2"]; for table_name in table_array { + // 使用参数绑定构建插入查询 let insert_metadata_query = format!( "INSERT OR IGNORE INTO {} (name, namespace, apigroup, data, created_time, updated_time) - VALUES ('{}', '{}', '{}', '{}', '{}', '{}');", - table_name, - plural, - namespace_required, - version, - json_data.to_string(), - Utc::now().naive_utc().to_string(), - Utc::now().naive_utc().to_string() + VALUES (?, ?, ?, ?, ?, ?);", + table_name ); - sql_query(insert_metadata_query).execute(transaction)?; + // 执行插入操作 + sql_query(insert_metadata_query) + .bind::(plural) // 名称 + .bind::(namespace_required) // 是否需要命名空间 + .bind::(version) // 版本 + .bind::(json_data.to_string()) // JSON 数据 + .bind::(Utc::now().naive_utc().to_string()) // 创建时间 + .bind::(Utc::now().naive_utc().to_string()) // 更新时间 + .execute(transaction)?; } + Ok(()) } + async fn insert_metadata( conn: &mut DbConnection, plural: &str, @@ -573,28 +589,35 @@ fn insert_kine_in_transaction_pg( version: &str, namespace: Option<&str>, ) -> QueryResult<()> { + + // 表列表 let table_array: [&str; 3] = ["kine", "kine_replica1", "kine_replica2"]; for table_name in table_array { + // 使用参数绑定构建插入查询 let insert_metadata_query = format!( "INSERT INTO {} (kind, name, namespace, apigroup, data, created_time, updated_time) - VALUES ('{}', '{}', '{}', '{}', '{}', '{}', '{}') - ON CONFLICT DO NOTHING;", - table_name, - item_kind, - item_name, - namespace.unwrap_or(""), - version, - json_data.to_string(), - Utc::now().naive_utc().to_string(), - Utc::now().naive_utc().to_string() + VALUES ($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT DO NOTHING;", + table_name ); - sql_query(insert_metadata_query).execute(transaction)?; + // 执行插入操作 + sql_query(insert_metadata_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(namespace.unwrap_or("")) // 空字符串作为默认 namespace + .bind::(version) + .bind::(json_data.to_string()) // 将 JSON 数据转换为字符串 + .bind::(Utc::now().naive_utc().to_string()) // 创建时间 + .bind::(Utc::now().naive_utc().to_string()) // 更新时间 + .execute(transaction)?; } + Ok(()) } + fn insert_kine_in_transaction_sqlite( transaction: &mut SqliteConnection, item_kind: &str, @@ -603,27 +626,30 @@ fn insert_kine_in_transaction_sqlite( version: &str, namespace: Option<&str>, ) -> QueryResult<()> { + let table_array: [&str; 3] = ["kine", "kine_replica1", "kine_replica2"]; for table_name in table_array { let insert_metadata_query = format!( "INSERT OR IGNORE INTO {} (kind, name, namespace, apigroup, data, created_time, updated_time) - VALUES ('{}', '{}', '{}', '{}', '{}', '{}', '{}');", - table_name, - item_kind, - item_name, - namespace.unwrap_or(""), - version, - json_data.to_string(), - Utc::now().naive_utc().to_string(), - Utc::now().naive_utc().to_string() + VALUES (?, ?, ?, ?, ?, ?, ?);", + table_name ); - sql_query(insert_metadata_query).execute(transaction)?; + sql_query(insert_metadata_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(namespace.unwrap_or("")) // 使用 Nullable 处理空值 + .bind::(version) + .bind::(json_data.to_string()) + .bind::(Utc::now().naive_utc().to_string()) + .bind::(Utc::now().naive_utc().to_string()) + .execute(transaction)?; } Ok(()) } + // 在 kine 表中插入新记录 async fn insert_kine( conn: &mut DbConnection, @@ -746,7 +772,7 @@ async fn update_data_in_kine( use chrono::Utc; // 需要更新的表列表 - let tables = ["kine", "kine_replica", "kine_replica2"]; + let tables = ["kine", "kine_replica1", "kine_replica2"]; let mut total_rows_affected = 0; for &table in &tables { -- Gitee From c72f518263316e4c674c28d61dc990c29a2b5bda Mon Sep 17 00:00:00 2001 From: Yuichi <913637919@qq.com> Date: Tue, 3 Dec 2024 15:58:51 +0800 Subject: [PATCH 08/10] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=B8=89=E8=A1=A8?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E6=97=B6=E5=B0=91=E6=95=B0=E6=9C=8D=E4=BB=8E?= =?UTF-8?q?=E5=A4=9A=E6=95=B0=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cores/handlers.rs | 225 ++++++++++++++++-------------------------- 1 file changed, 83 insertions(+), 142 deletions(-) diff --git a/src/cores/handlers.rs b/src/cores/handlers.rs index b0daa02..4aca858 100644 --- a/src/cores/handlers.rs +++ b/src/cores/handlers.rs @@ -869,6 +869,7 @@ async fn get_data_from_kine( item_namespace: Option<&str>, ) -> QueryResult> { use diesel::sql_types::Text; + use std::collections::HashMap; // 表名列表 let tables = ["kine", "kine_replica1", "kine_replica2"]; @@ -953,63 +954,77 @@ async fn get_data_from_kine( if results.len() == 1 { // 如果三个表的结果一致,直接返回任意结果 Ok(results.into_iter().next().map(|(data, _)| data)) + } else if results.values().all(|&count| count == 1) { + // 如果所有表结果不同,直接回退到 kine 表的数据 + get_data_from_kine_primary(conn, item_kind, item_name, item_version, item_namespace) } else if let Some((data, _)) = results.into_iter().max_by_key(|&(_, count)| count) { // 如果有多数一致的数据,返回该数据 Ok(Some(data)) } else { - // 如果三表数据均不同,返回 kine 表的数据 - let fallback_query = if let Some(_) = item_namespace { - match conn { - DbConnection::Pg(_) => "SELECT data FROM kine WHERE kind = $1 AND name = $2 AND namespace = $3 AND apigroup = $4".to_string(), - DbConnection::Sqlite(_) => "SELECT data FROM kine WHERE kind = ? AND name = ? AND namespace = ? AND apigroup = ?".to_string(), - } - } else { - match conn { - DbConnection::Pg(_) => "SELECT data FROM kine WHERE kind = $1 AND name = $2 AND apigroup = $3".to_string(), - DbConnection::Sqlite(_) => "SELECT data FROM kine WHERE kind = ? AND name = ? AND apigroup = ?".to_string(), - } - }; + // 默认回退到 kine 表的数据 + get_data_from_kine_primary(conn, item_kind, item_name, item_version, item_namespace) + } +} +/// 获取主表 `kine` 的数据 +fn get_data_from_kine_primary( + conn: &mut DbConnection, + item_kind: &str, + item_name: &str, + item_version: &str, + item_namespace: Option<&str>, +) -> QueryResult> { + let fallback_query = if let Some(_) = item_namespace { match conn { - DbConnection::Pg(pg_conn) => { - if let Some(namespace) = item_namespace { - diesel::sql_query(fallback_query) - .bind::(item_kind) - .bind::(item_name) - .bind::(namespace) - .bind::(item_version) - .get_result::(pg_conn) - .optional() - .map(|res| res.map(|data_result| data_result.data)) - } else { - diesel::sql_query(fallback_query) - .bind::(item_kind) - .bind::(item_name) - .bind::(item_version) - .get_result::(pg_conn) - .optional() - .map(|res| res.map(|data_result| data_result.data)) - } + DbConnection::Pg(_) => "SELECT data FROM kine WHERE kind = $1 AND name = $2 AND namespace = $3 AND apigroup = $4".to_string(), + DbConnection::Sqlite(_) => "SELECT data FROM kine WHERE kind = ? AND name = ? AND namespace = ? AND apigroup = ?".to_string(), + } + } else { + match conn { + DbConnection::Pg(_) => "SELECT data FROM kine WHERE kind = $1 AND name = $2 AND apigroup = $3".to_string(), + DbConnection::Sqlite(_) => "SELECT data FROM kine WHERE kind = ? AND name = ? AND apigroup = ?".to_string(), + } + }; + + match conn { + DbConnection::Pg(pg_conn) => { + if let Some(namespace) = item_namespace { + diesel::sql_query(fallback_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(namespace) + .bind::(item_version) + .get_result::(pg_conn) + .optional() + .map(|res| res.map(|data_result| data_result.data)) + } else { + diesel::sql_query(fallback_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(item_version) + .get_result::(pg_conn) + .optional() + .map(|res| res.map(|data_result| data_result.data)) } - DbConnection::Sqlite(sqlite_conn) => { - if let Some(namespace) = item_namespace { - diesel::sql_query(fallback_query) - .bind::(item_kind) - .bind::(item_name) - .bind::(namespace) - .bind::(item_version) - .get_result::(sqlite_conn) - .optional() - .map(|res| res.map(|data_result| data_result.data)) - } else { - diesel::sql_query(fallback_query) - .bind::(item_kind) - .bind::(item_name) - .bind::(item_version) - .get_result::(sqlite_conn) - .optional() - .map(|res| res.map(|data_result| data_result.data)) - } + } + DbConnection::Sqlite(sqlite_conn) => { + if let Some(namespace) = item_namespace { + diesel::sql_query(fallback_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(namespace) + .bind::(item_version) + .get_result::(sqlite_conn) + .optional() + .map(|res| res.map(|data_result| data_result.data)) + } else { + diesel::sql_query(fallback_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(item_version) + .get_result::(sqlite_conn) + .optional() + .map(|res| res.map(|data_result| data_result.data)) } } } @@ -1017,6 +1032,7 @@ async fn get_data_from_kine( + // 辅助函数:从指定表中获取所有符合条件的数据的 `data` 字段 async fn get_all_data_from_kine( conn: &mut DbConnection, @@ -1030,8 +1046,8 @@ async fn get_all_data_from_kine( // 定义需要查询的表 let tables = ["kine", "kine_replica1", "kine_replica2"]; - // 用于统计所有表的查询结果 - let mut all_results: HashMap = HashMap::new(); + // 存储每个表的查询结果 + let mut table_results: HashMap<&str, Vec> = HashMap::new(); // 遍历每个表进行查询 for &table in &tables { @@ -1103,104 +1119,29 @@ async fn get_all_data_from_kine( } }; - // 将结果添加到统计中 - for data in results { - *all_results.entry(data).or_insert(0) += 1; - } + table_results.insert(table, results); } - // 处理少数服从多数逻辑 - if all_results.len() == 1 { - // 如果所有表结果一致,直接返回 - Ok(all_results.into_iter().map(|(data, _)| data).collect()) - } else { - // 找出出现次数最多的结果 - let max_count = all_results - .iter() - .max_by_key(|&(_, count)| count) - .map(|(_, count)| *count); - - if let Some(max_count) = max_count { - let filtered_results: Vec = all_results - .into_iter() - .filter(|(_, count)| *count == max_count) - .map(|(data, _)| data) - .collect(); - Ok(filtered_results) - } else { - // 如果所有表数据都不一致,返回主表 `kine` 的数据 - let fallback_query = if let Some(_) = item_namespace { - match conn { - DbConnection::Pg(_) => { - "SELECT data FROM kine WHERE kind = $1 AND namespace = $2 AND apigroup = $3" - .to_string() - } - DbConnection::Sqlite(_) => { - "SELECT data FROM kine WHERE kind = ? AND namespace = ? AND apigroup = ?" - .to_string() - } - } - } else { - match conn { - DbConnection::Pg(_) => { - "SELECT data FROM kine WHERE kind = $1 AND apigroup = $2".to_string() - } - DbConnection::Sqlite(_) => { - "SELECT data FROM kine WHERE kind = ? AND apigroup = ?".to_string() - } - } - }; - - let fallback_results = match conn { - DbConnection::Pg(pg_conn) => { - if let Some(namespace) = item_namespace { - diesel::sql_query(fallback_query) - .bind::(item_kind) - .bind::(namespace) - .bind::(item_version) - .load::(pg_conn)? - .into_iter() - .map(|res| res.data) - .collect() - } else { - diesel::sql_query(fallback_query) - .bind::(item_kind) - .bind::(item_version) - .load::(pg_conn)? - .into_iter() - .map(|res| res.data) - .collect() - } - } - DbConnection::Sqlite(sqlite_conn) => { - if let Some(namespace) = item_namespace { - diesel::sql_query(fallback_query) - .bind::(item_kind) - .bind::(namespace) - .bind::(item_version) - .load::(sqlite_conn)? - .into_iter() - .map(|res| res.data) - .collect() - } else { - diesel::sql_query(fallback_query) - .bind::(item_kind) - .bind::(item_version) - .load::(sqlite_conn)? - .into_iter() - .map(|res| res.data) - .collect() - } - } - }; - - Ok(fallback_results) + // 检查所有表的数据是否一致 + let mut unique_results: HashMap = HashMap::new(); + for results in table_results.values() { + for result in results { + *unique_results.entry(result.clone()).or_insert(0) += 1; } } + // 筛选出出现次数大于等于 2 的数据 + let filtered_results: Vec = unique_results + .into_iter() + .filter(|(_, count)| *count >= 2) // 过滤条件 + .map(|(data, _)| data) // 提取数据部分 + .collect(); + + Ok(filtered_results) } + // 发送请求并等待响应 async fn send_request( message: Message, -- Gitee From a295d648186a5923a3a4f0201dc680744213ae03 Mon Sep 17 00:00:00 2001 From: Yuichi <913637919@qq.com> Date: Wed, 4 Dec 2024 01:34:05 +0000 Subject: [PATCH 09/10] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E7=89=88=E6=9C=AC?= =?UTF-8?q?=E5=8F=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- database.sqlite | Bin 0 -> 61440 bytes 3 files changed, 3 insertions(+), 3 deletions(-) create mode 100644 database.sqlite diff --git a/Cargo.lock b/Cargo.lock index 4a3d527..f509f09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "actix-codec" @@ -940,7 +940,7 @@ dependencies = [ [[package]] name = "fleet_apiserver" -version = "0.3.5" +version = "0.3.6" dependencies = [ "actix-http", "actix-service", diff --git a/Cargo.toml b/Cargo.toml index 66c88d3..1e3061b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fleet_apiserver" -version = "0.3.5" +version = "0.3.6" license = "Apache-2.0" authors = ["wuheng ", "chenhongyu23@otcaix.iscas.ac.cn"] categories = ["api-bindings"] diff --git a/database.sqlite b/database.sqlite new file mode 100644 index 0000000000000000000000000000000000000000..b33cefefb2230930405061eb32ab59e8207c41f0 GIT binary patch literal 61440 zcmeI)%}?8A9Kdm#l)SYer4ovIK(eAqG!m59Nt}dUmeP#T0^LIBT1{1C5|5_K+Y*B| zbsR{0+z1)z zF~2O=%tF&_RLVs|>r0Hy=dzi6R>@~>FJ+a!BISl(5sSzxDGxHa`HwTX8(Kn-On0(t zM%8r4ZZ(YJVP!_6ywR+0HV#>>WLSoh&pym|H(qR-hGmutR{793*=&gV&c&bP7MC-* zN6NkIBjtwO{AtB*ZTEMlannk#Es;)VBIz-AXn9JOL}l~q)>jpA8Bj1bt$HV2=wm6U zeaT>Z=CTuYUrqIjBei;=sIUB6YPFwQ4YaTMJfVpR`Gse1RNMQ)KLC4?Jg5Bb(5;D~ zL$@;yFSkP;PiS^le*U}yx<_{Jxcgtv z0Y@GE8LvCEHtR$<(D8ygkmYNKMu*`Jtf9Jx;`Yu6=K#LtUTZm*jP#j0*V|PV*9lu@vn95_?fkrMxn~mdj;V@`b%B{>bWxJG7B;vT$x^L3J*3iEAmZeLWQr z4>|aoBz{;BKmY**5I_I{1Q0*~0R#|0;LHhpAiE+lEvm&;H5!YiG%c32Q!}bI6OAiT zbxw=TC1QTjFFE+9Bz{;BKmY**5I_I{1Q0*~0R#|0;9LldyCS~+TLO0c-xZjYf>#57 zh$j{V5I_I{1Q0*~0R#|0;9?5w+;e$D-tqSM!a}WHGFw~fv#^-W@u2&)@9`ct!LiBfxVgGjfmX1TTz-kuJ(8fMY1+NhW2 z7w_cWlud^l&H7fkWHuM?yvg0(J|u>FGnR_#sko|j!o3Fqyt{>3I@J!3cqe?5&l3v^ zPwVTgb;Bw?nH}g-FZ)f$?$+0j-!IFoHY#G~n%&1?H2Wxu^_pdfIZjO>_qhOXZrFrI zvuS)I+EK0=8~X?cNFQ&D#R*A|s+#Itn2x4)spN~)@!JXi7mL{d!ncluI@ zDAZF4d;UN1|95^S;SCT#009ILKmY**5I_I{1Q0k`0pkBB>&ey-KmY**5I_I{1Q0;r zVhEh2_$ z1Q0*~0R#|0009ILKmdVrCUA`SzjRfS#;kMZKz9&8009ILKmY**5I_I{1Wti~{M{*N z1WgbEcKqM|`#<;!4g?TD009ILKmY**5I_I{1Q0m00^#8;xBve?@&DmQlp=rt0tg_0 z00IagfB*sr99Mw&|8eC`dOhO*CoRhM5I_I{1Q0*~0R#|0009ILcn<=^|K9^TZ;1c` z2q1s}0tg_000Iag&=ny5e_BQY@&D7ZCuj!(2q1s}0tg_000IagfB*vjLEwn^|3BO? BucQC~ literal 0 HcmV?d00001 -- Gitee From fbae273faa7be23e0d509ba767a41ddac921635e Mon Sep 17 00:00:00 2001 From: Yuichi <913637919@qq.com> Date: Wed, 4 Dec 2024 01:34:34 +0000 Subject: [PATCH 10/10] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E7=89=88=E6=9C=AC?= =?UTF-8?q?=E5=8F=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- database.sqlite | Bin 61440 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 database.sqlite diff --git a/database.sqlite b/database.sqlite deleted file mode 100644 index b33cefefb2230930405061eb32ab59e8207c41f0..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 61440 zcmeI)%}?8A9Kdm#l)SYer4ovIK(eAqG!m59Nt}dUmeP#T0^LIBT1{1C5|5_K+Y*B| zbsR{0+z1)z zF~2O=%tF&_RLVs|>r0Hy=dzi6R>@~>FJ+a!BISl(5sSzxDGxHa`HwTX8(Kn-On0(t zM%8r4ZZ(YJVP!_6ywR+0HV#>>WLSoh&pym|H(qR-hGmutR{793*=&gV&c&bP7MC-* zN6NkIBjtwO{AtB*ZTEMlannk#Es;)VBIz-AXn9JOL}l~q)>jpA8Bj1bt$HV2=wm6U zeaT>Z=CTuYUrqIjBei;=sIUB6YPFwQ4YaTMJfVpR`Gse1RNMQ)KLC4?Jg5Bb(5;D~ zL$@;yFSkP;PiS^le*U}yx<_{Jxcgtv z0Y@GE8LvCEHtR$<(D8ygkmYNKMu*`Jtf9Jx;`Yu6=K#LtUTZm*jP#j0*V|PV*9lu@vn95_?fkrMxn~mdj;V@`b%B{>bWxJG7B;vT$x^L3J*3iEAmZeLWQr z4>|aoBz{;BKmY**5I_I{1Q0*~0R#|0;LHhpAiE+lEvm&;H5!YiG%c32Q!}bI6OAiT zbxw=TC1QTjFFE+9Bz{;BKmY**5I_I{1Q0*~0R#|0;9LldyCS~+TLO0c-xZjYf>#57 zh$j{V5I_I{1Q0*~0R#|0;9?5w+;e$D-tqSM!a}WHGFw~fv#^-W@u2&)@9`ct!LiBfxVgGjfmX1TTz-kuJ(8fMY1+NhW2 z7w_cWlud^l&H7fkWHuM?yvg0(J|u>FGnR_#sko|j!o3Fqyt{>3I@J!3cqe?5&l3v^ zPwVTgb;Bw?nH}g-FZ)f$?$+0j-!IFoHY#G~n%&1?H2Wxu^_pdfIZjO>_qhOXZrFrI zvuS)I+EK0=8~X?cNFQ&D#R*A|s+#Itn2x4)spN~)@!JXi7mL{d!ncluI@ zDAZF4d;UN1|95^S;SCT#009ILKmY**5I_I{1Q0k`0pkBB>&ey-KmY**5I_I{1Q0;r zVhEh2_$ z1Q0*~0R#|0009ILKmdVrCUA`SzjRfS#;kMZKz9&8009ILKmY**5I_I{1Wti~{M{*N z1WgbEcKqM|`#<;!4g?TD009ILKmY**5I_I{1Q0m00^#8;xBve?@&DmQlp=rt0tg_0 z00IagfB*sr99Mw&|8eC`dOhO*CoRhM5I_I{1Q0*~0R#|0009ILcn<=^|K9^TZ;1c` z2q1s}0tg_000Iag&=ny5e_BQY@&D7ZCuj!(2q1s}0tg_000IagfB*vjLEwn^|3BO? BucQC~ -- Gitee