diff --git a/Cargo.lock b/Cargo.lock index 4a3d5272b94a120e25d4a7f13c68726d41cb71a2..f509f09ec3dee0bbb9b3d6803d7094a369546c9e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "actix-codec" @@ -940,7 +940,7 @@ dependencies = [ [[package]] name = "fleet_apiserver" -version = "0.3.5" +version = "0.3.6" dependencies = [ "actix-http", "actix-service", diff --git a/Cargo.toml b/Cargo.toml index 66c88d36dccff38a293fddba14df2b7487b75e3f..1e3061b018d99dd817f175371e6844b2e50a29a9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fleet_apiserver" -version = "0.3.5" +version = "0.3.6" license = "Apache-2.0" authors = ["wuheng ", "chenhongyu23@otcaix.iscas.ac.cn"] categories = ["api-bindings"] diff --git a/migrations/2024-11-03-082237_create_metadata_table/down.sql b/migrations/2024-11-03-082237_create_metadata_table/down.sql index 55d1278e1d51874b9c2cdf8decb6d49d70c62bfa..d1214a6c42573405f00fcb75eb8f69541099fed5 100644 --- a/migrations/2024-11-03-082237_create_metadata_table/down.sql +++ b/migrations/2024-11-03-082237_create_metadata_table/down.sql @@ -1,3 +1,7 @@ -- This file should undo anything in `up.sql` DROP TABLE IF EXISTS metadata; -DROP TABLE IF EXISTS kine; \ No newline at end of file +DROP TABLE IF EXISTS metadata_replica1; +DROP TABLE IF EXISTS metadata_replica2; +DROP TABLE IF EXISTS kine; +DROP TABLE IF EXISTS kine_replica1; +DROP TABLE IF EXISTS kine_replica2; \ No newline at end of file diff --git a/migrations/2024-11-03-082237_create_metadata_table/up.sql b/migrations/2024-11-03-082237_create_metadata_table/up.sql index 5515cb39dee951d2b8f6f25ceebe12c2f243d01c..5ae8177f7aab655612a090d257e1c24650c6ff40 100644 --- a/migrations/2024-11-03-082237_create_metadata_table/up.sql +++ b/migrations/2024-11-03-082237_create_metadata_table/up.sql @@ -9,7 +9,49 @@ CREATE TABLE IF NOT EXISTS metadata ( PRIMARY KEY (name, namespace, apigroup) ); +CREATE TABLE IF NOT EXISTS metadata_replica1 ( + name VARCHAR(256), + namespace BOOLEAN, + apigroup VARCHAR(256), + data TEXT, + created_time VARCHAR(256), + updated_time VARCHAR(256), + PRIMARY KEY (name, namespace, apigroup) + ); + +CREATE TABLE IF NOT EXISTS metadata_replica2 ( + name VARCHAR(256), + namespace BOOLEAN, + apigroup VARCHAR(256), + data TEXT, + created_time VARCHAR(256), + updated_time VARCHAR(256), + PRIMARY KEY (name, namespace, apigroup) + ); + CREATE TABLE IF NOT EXISTS kine ( + kind VARCHAR(256), + name VARCHAR(256), + namespace VARCHAR(256), + apigroup VARCHAR(256), + data TEXT, + created_time VARCHAR(256), + updated_time VARCHAR(256), + PRIMARY KEY (kind, name, namespace, apigroup) + ); + +CREATE TABLE IF NOT EXISTS kine_replica1 ( + kind VARCHAR(256), + name VARCHAR(256), + namespace VARCHAR(256), + apigroup VARCHAR(256), + data TEXT, + created_time VARCHAR(256), + updated_time VARCHAR(256), + PRIMARY KEY (kind, name, namespace, apigroup) + ); + +CREATE TABLE IF NOT EXISTS kine_replica2 ( kind VARCHAR(256), name VARCHAR(256), namespace VARCHAR(256), diff --git a/src/cores/db.rs b/src/cores/db.rs index 1a888dd743861ec2806db35ea965cce21d8457ef..6e9fd6adbd82175cc54d2144199bfa68cdb021fc 100644 --- a/src/cores/db.rs +++ b/src/cores/db.rs @@ -6,6 +6,7 @@ use diesel::sqlite::SqliteConnection; use diesel::r2d2::{ConnectionManager, Pool, PooledConnection}; use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; use serde_json::{json, Value}; +use diesel::connection::Connection; pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!(); @@ -97,52 +98,145 @@ fn resource_templates() -> HashMap<&'static str, (bool, Value)> { templates } +fn initialize_metadata_in_transaction_pg( + transaction: &mut PgConnection, + templates: &HashMap<&str, (bool, Value)>, +) -> QueryResult<()> { + for (resource_name, (supports_namespace, template)) in templates.iter() { + let apigroup = template + .get("apiVersion") + .and_then(Value::as_str) + .unwrap_or("v1"); -impl DbPool { - // 内存数据库,用于编写单元测试函数 - pub fn new_in_memory() -> Result { - let manager = ConnectionManager::::new("file::memory:?cache=shared"); - let pool = Pool::builder().build(manager)?; - let mut conn = pool.get()?; - conn.run_pending_migrations(MIGRATIONS).unwrap(); - Self::initialize_metadata(&mut DbPool::Sqlite(pool.clone()).get_connection()?).unwrap(); - Ok(DbPool::Sqlite(pool)) - } + let table_array: [&str; 3] = ["metadata", "metadata_replica1", "metadata_replica2"]; + for table_name in table_array { + let insert_metadata_query = format!( + "INSERT INTO {} (name, namespace, apigroup, data, created_time, updated_time) + VALUES ('{}', {}, '{}', '{}', '{}', '{}') + ON CONFLICT DO NOTHING;", + table_name, + resource_name, + supports_namespace, + apigroup, + template.to_string(), + Utc::now().naive_utc().to_string(), + Utc::now().naive_utc().to_string() + ); - // 初始化所有资源的 metadata 和表结构 - pub fn initialize_metadata(conn: &mut DbConnection) -> QueryResult<()> { - let templates = resource_templates(); + sql_query(insert_metadata_query).execute(transaction)?; + } + } + Ok(()) +} - for (table_name, (supports_namespace, template)) in templates.iter() { +fn initialize_metadata_in_transaction_sqlite( + transaction: &mut SqliteConnection, + templates: &HashMap<&str, (bool, Value)>, +) -> QueryResult<()> { + for (resource_name, (supports_namespace, template)) in templates.iter() { + let apigroup = template + .get("apiVersion") + .and_then(Value::as_str) + .unwrap_or("v1"); - // 从模板中提取 `apiVersion` 字段作为 `apigroup` - let apigroup = template.get("apiVersion") - .and_then(Value::as_str) - .unwrap_or("v1"); + let table_array: [&str; 3] = ["metadata", "metadata_replica1", "metadata_replica2"]; + for table_name in table_array { let insert_metadata_query = format!( - "INSERT OR IGNORE INTO metadata (name, namespace, apigroup, data, created_time, updated_time) + "INSERT OR IGNORE INTO {} (name, namespace, apigroup, data, created_time, updated_time) VALUES ('{}', {}, '{}', '{}', '{}', '{}');", table_name, - *supports_namespace, + resource_name, + supports_namespace, apigroup, template.to_string(), Utc::now().naive_utc().to_string(), Utc::now().naive_utc().to_string() ); - match conn { - DbConnection::Pg(pg_conn) => { - sql_query(insert_metadata_query).execute(pg_conn)?; - }, - DbConnection::Sqlite(sqlite_conn) => { - sql_query(insert_metadata_query).execute(sqlite_conn)?; + sql_query(insert_metadata_query).execute(transaction)?; + } + } + Ok(()) +} + + +impl DbPool { + + // 初始化所有资源的 metadata 表 + pub fn initialize_metadata(conn: &mut DbConnection) -> QueryResult<()> { + let templates = resource_templates(); + + match conn { + DbConnection::Pg(pg_conn) => { + pg_conn.transaction(|transaction| { + initialize_metadata_in_transaction_pg(transaction, &templates) + }) + } + DbConnection::Sqlite(sqlite_conn) => { + sqlite_conn.transaction(|transaction| { + initialize_metadata_in_transaction_sqlite(transaction, &templates) + }) + } + } + } + + + +/* pub fn initialize_metadata(conn: &mut DbConnection) -> QueryResult<()> { + let templates = resource_templates(); + + for (resource_name, (supports_namespace, template)) in templates.iter() { + + // 从模板中提取 `apiVersion` 字段作为 `apigroup` + let apigroup = template.get("apiVersion") + .and_then(Value::as_str) + .unwrap_or("v1"); + + let table_array: [String; 3] = [ + String::from("metadata"), + String::from("metadata_replica1"), + String::from("metadata_replica2"), + ]; + + for table_name in table_array { + let insert_metadata_query = format!( + "INSERT OR IGNORE INTO {} (name, namespace, apigroup, data, created_time, updated_time) + VALUES ('{}', {}, '{}', '{}', '{}', '{}');", + table_name, + resource_name, + *supports_namespace, + apigroup, + template.to_string(), + Utc::now().naive_utc().to_string(), + Utc::now().naive_utc().to_string() + ); + + match conn { + DbConnection::Pg(pg_conn) => { + sql_query(insert_metadata_query).execute(pg_conn)?; + }, + DbConnection::Sqlite(sqlite_conn) => { + sql_query(insert_metadata_query).execute(sqlite_conn)?; + } } } + } Ok(()) + }*/ + + // 内存数据库,用于编写单元测试函数 + pub fn new_in_memory() -> Result { + let manager = ConnectionManager::::new("file::memory:?cache=shared"); + let pool = Pool::builder().build(manager)?; + let mut conn = pool.get()?; + conn.run_pending_migrations(MIGRATIONS).unwrap(); + Self::initialize_metadata(&mut DbPool::Sqlite(pool.clone()).get_connection()?).unwrap(); + Ok(DbPool::Sqlite(pool)) } + pub fn get_connection(&self) -> Result { match self { DbPool::Pg(pool) => pool.get().map(DbConnection::Pg), diff --git a/src/cores/handlers.rs b/src/cores/handlers.rs index a3ec74178d88dd2a5e873477435fb0d75f3ce413..4aca85807939fca3dedb41034df4ba2a33930c9b 100644 --- a/src/cores/handlers.rs +++ b/src/cores/handlers.rs @@ -5,7 +5,7 @@ use std::{time}; use async_trait::async_trait; use actix_web::{HttpResponse, Result, web, Error}; use chrono::Utc; -use diesel::{QueryResult, RunQueryDsl, QueryDsl, ExpressionMethods, OptionalExtension, QueryableByName}; +use diesel::{QueryResult, RunQueryDsl, QueryDsl, ExpressionMethods, OptionalExtension, QueryableByName, PgConnection, sql_query, SqliteConnection, Connection}; use serde_json::{json}; use serde_json::Value; use crate::cores::db::DbConnection; @@ -303,7 +303,6 @@ impl DefaultHandler { } - // 查询 metadata 表中的 plural 是否存在,并检查 namespace 要求是否满足 async fn check_metadata( conn: &mut DbConnection, @@ -312,31 +311,65 @@ async fn check_metadata( requires_namespace: bool, ) -> QueryResult { use diesel::dsl::count_star; - use crate::schema::metadata::dsl::*; + use crate::schema::metadata::dsl as metadata_dsl; + use crate::schema::metadata_replica1::dsl as replica1_dsl; + use crate::schema::metadata_replica2::dsl as replica2_dsl; let count; + let count_replica1; + let count_replica2; match conn { DbConnection::Pg(pg_conn) => { - count = metadata - .filter(name.eq(plural)) - .filter(apigroup.eq(version)) - .filter(namespace.eq(requires_namespace)) + count = metadata_dsl::metadata + .filter(metadata_dsl::name.eq(plural)) + .filter(metadata_dsl::apigroup.eq(version)) + .filter(metadata_dsl::namespace.eq(requires_namespace)) + .select(count_star()) + .first::(pg_conn)?; + + count_replica1 = replica1_dsl::metadata_replica1 + .filter(replica1_dsl::name.eq(plural)) + .filter(replica1_dsl::apigroup.eq(version)) + .filter(replica1_dsl::namespace.eq(requires_namespace)) + .select(count_star()) + .first::(pg_conn)?; + + count_replica2 = replica2_dsl::metadata_replica2 + .filter(replica2_dsl::name.eq(plural)) + .filter(replica2_dsl::apigroup.eq(version)) + .filter(replica2_dsl::namespace.eq(requires_namespace)) .select(count_star()) .first::(pg_conn)?; }, DbConnection::Sqlite(sqlite_conn) => { - count = metadata - .filter(name.eq(plural)) - .filter(apigroup.eq(version)) - .filter(namespace.eq(requires_namespace)) + count = metadata_dsl::metadata + .filter(metadata_dsl::name.eq(plural)) + .filter(metadata_dsl::apigroup.eq(version)) + .filter(metadata_dsl::namespace.eq(requires_namespace)) + .select(count_star()) + .first::(sqlite_conn)?; + + count_replica1 = replica1_dsl::metadata_replica1 + .filter(replica1_dsl::name.eq(plural)) + .filter(replica1_dsl::apigroup.eq(version)) + .filter(replica1_dsl::namespace.eq(requires_namespace)) + .select(count_star()) + .first::(sqlite_conn)?; + + count_replica2 = replica2_dsl::metadata_replica2 + .filter(replica2_dsl::name.eq(plural)) + .filter(replica2_dsl::apigroup.eq(version)) + .filter(replica2_dsl::namespace.eq(requires_namespace)) .select(count_star()) .first::(sqlite_conn)?; } } - Ok(count > 0) + let positive_count = [count, count_replica1, count_replica2].iter().filter(|&&x| x > 0).count(); + Ok(positive_count >= 2) } + // 查询 kine 表中指定的数据是否存在 async fn check_kine( conn: &mut DbConnection, @@ -346,52 +379,186 @@ async fn check_kine( item_namespace: Option<&str>, ) -> QueryResult { use diesel::dsl::count_star; - use crate::schema::kine::dsl::*; + use crate::schema::kine::dsl as kine_dsl; + use crate::schema::kine_replica1::dsl as replica1_dsl; + use crate::schema::kine_replica2::dsl as replica2_dsl; let count; + let count_replica1; + let count_replica2; match conn { DbConnection::Pg(pg_conn) => { if let Some(_) = item_namespace { - count = kine - .filter(kind.eq(item_kind)) - .filter(name.eq(item_name)) - .filter(apigroup.eq(item_version)) - .filter(namespace.eq(item_namespace)) + count = kine_dsl::kine + .filter(kine_dsl::kind.eq(item_kind)) + .filter(kine_dsl::name.eq(item_name)) + .filter(kine_dsl::apigroup.eq(item_version)) + .filter(kine_dsl::namespace.eq(item_namespace)) + .select(count_star()) + .first::(pg_conn)?; + + count_replica1 = replica1_dsl::kine_replica1 + .filter(replica1_dsl::kind.eq(item_kind)) + .filter(replica1_dsl::name.eq(item_name)) + .filter(replica1_dsl::apigroup.eq(item_version)) + .filter(replica1_dsl::namespace.eq(item_namespace)) + .select(count_star()) + .first::(pg_conn)?; + + count_replica2 = replica2_dsl::kine_replica2 + .filter(replica2_dsl::kind.eq(item_kind)) + .filter(replica2_dsl::name.eq(item_name)) + .filter(replica2_dsl::apigroup.eq(item_version)) + .filter(replica2_dsl::namespace.eq(item_namespace)) .select(count_star()) .first::(pg_conn)?; } else { - count = kine - .filter(kind.eq(item_kind)) - .filter(name.eq(item_name)) - .filter(apigroup.eq(item_version)) + count = kine_dsl::kine + .filter(kine_dsl::kind.eq(item_kind)) + .filter(kine_dsl::name.eq(item_name)) + .filter(kine_dsl::apigroup.eq(item_version)) + .select(count_star()) + .first::(pg_conn)?; + + count_replica1 = replica1_dsl::kine_replica1 + .filter(replica1_dsl::kind.eq(item_kind)) + .filter(replica1_dsl::name.eq(item_name)) + .filter(replica1_dsl::apigroup.eq(item_version)) + .select(count_star()) + .first::(pg_conn)?; + + count_replica2 = replica2_dsl::kine_replica2 + .filter(replica2_dsl::kind.eq(item_kind)) + .filter(replica2_dsl::name.eq(item_name)) + .filter(replica2_dsl::apigroup.eq(item_version)) .select(count_star()) .first::(pg_conn)?; } }, DbConnection::Sqlite(sqlite_conn) => { if let Some(_) = item_namespace { - count = kine - .filter(kind.eq(item_kind)) - .filter(name.eq(item_name)) - .filter(apigroup.eq(item_version)) - .filter(namespace.eq(item_namespace)) + count = kine_dsl::kine + .filter(kine_dsl::kind.eq(item_kind)) + .filter(kine_dsl::name.eq(item_name)) + .filter(kine_dsl::apigroup.eq(item_version)) + .filter(kine_dsl::namespace.eq(item_namespace)) + .select(count_star()) + .first::(sqlite_conn)?; + + count_replica1 = replica1_dsl::kine_replica1 + .filter(replica1_dsl::kind.eq(item_kind)) + .filter(replica1_dsl::name.eq(item_name)) + .filter(replica1_dsl::apigroup.eq(item_version)) + .filter(replica1_dsl::namespace.eq(item_namespace)) + .select(count_star()) + .first::(sqlite_conn)?; + + count_replica2 = replica2_dsl::kine_replica2 + .filter(replica2_dsl::kind.eq(item_kind)) + .filter(replica2_dsl::name.eq(item_name)) + .filter(replica2_dsl::apigroup.eq(item_version)) + .filter(replica2_dsl::namespace.eq(item_namespace)) .select(count_star()) .first::(sqlite_conn)?; } else { - count = kine - .filter(kind.eq(item_kind)) - .filter(name.eq(item_name)) - .filter(apigroup.eq(item_version)) + count = kine_dsl::kine + .filter(kine_dsl::kind.eq(item_kind)) + .filter(kine_dsl::name.eq(item_name)) + .filter(kine_dsl::apigroup.eq(item_version)) + .select(count_star()) + .first::(sqlite_conn)?; + + count_replica1 = replica1_dsl::kine_replica1 + .filter(replica1_dsl::kind.eq(item_kind)) + .filter(replica1_dsl::name.eq(item_name)) + .filter(replica1_dsl::apigroup.eq(item_version)) + .select(count_star()) + .first::(sqlite_conn)?; + + count_replica2 = replica2_dsl::kine_replica2 + .filter(replica2_dsl::kind.eq(item_kind)) + .filter(replica2_dsl::name.eq(item_name)) + .filter(replica2_dsl::apigroup.eq(item_version)) .select(count_star()) .first::(sqlite_conn)?; } } } - Ok(count > 0) + let positive_count = [count, count_replica1, count_replica2].iter().filter(|&&x| x > 0).count(); + Ok(positive_count >= 2) +} + +fn insert_metadata_in_transaction_pg( + transaction: &mut PgConnection, + plural: &str, + version: &str, + namespace_required: bool, + json_data: &Value, +) -> QueryResult<()> { + use diesel::sql_types::{Bool}; + + // 表名列表 + let table_array: [&str; 3] = ["metadata", "metadata_replica1", "metadata_replica2"]; + + for table_name in table_array { + // 使用参数绑定构建插入查询 + let insert_metadata_query = format!( + "INSERT INTO {} (name, namespace, apigroup, data, created_time, updated_time) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT DO NOTHING;", + table_name + ); + + // 执行插入操作 + sql_query(insert_metadata_query) + .bind::(plural) // 名称 + .bind::(namespace_required) // 是否需要命名空间 + .bind::(version) // 版本 + .bind::(json_data.to_string()) // JSON 数据 + .bind::(Utc::now().naive_utc().to_string()) // 创建时间 + .bind::(Utc::now().naive_utc().to_string()) // 更新时间 + .execute(transaction)?; + } + + Ok(()) +} + + +fn insert_metadata_in_transaction_sqlite( + transaction: &mut SqliteConnection, + plural: &str, + version: &str, + namespace_required: bool, + json_data: &Value, +) -> QueryResult<()> { + use diesel::sql_types::{Bool}; + + // 表名列表 + let table_array: [&str; 3] = ["metadata", "metadata_replica1", "metadata_replica2"]; + + for table_name in table_array { + // 使用参数绑定构建插入查询 + let insert_metadata_query = format!( + "INSERT OR IGNORE INTO {} (name, namespace, apigroup, data, created_time, updated_time) + VALUES (?, ?, ?, ?, ?, ?);", + table_name + ); + + // 执行插入操作 + sql_query(insert_metadata_query) + .bind::(plural) // 名称 + .bind::(namespace_required) // 是否需要命名空间 + .bind::(version) // 版本 + .bind::(json_data.to_string()) // JSON 数据 + .bind::(Utc::now().naive_utc().to_string()) // 创建时间 + .bind::(Utc::now().naive_utc().to_string()) // 更新时间 + .execute(transaction)?; + } + + Ok(()) } -// 在 metadata 表中插入新记录 async fn insert_metadata( conn: &mut DbConnection, plural: &str, @@ -399,40 +566,92 @@ async fn insert_metadata( namespace_required: bool, json_data: &Value ) -> QueryResult<()> { - use crate::schema::metadata::dsl::*; match conn { DbConnection::Pg(pg_conn) => { - diesel::insert_into(metadata) - .values(( - name.eq(plural), - apigroup.eq(version), - namespace.eq(namespace_required), - data.eq(json_data.to_string()), - created_time.eq(Utc::now().naive_utc().to_string()), - updated_time.eq(Utc::now().naive_utc().to_string()), - )) - .execute(pg_conn)?; - }, + pg_conn.transaction(|transaction| { + insert_metadata_in_transaction_pg(transaction, plural, version, namespace_required, json_data) + }) + } DbConnection::Sqlite(sqlite_conn) => { - diesel::insert_into(metadata) - .values(( - name.eq(plural), - apigroup.eq(version), - namespace.eq(namespace_required), - data.eq(json_data.to_string()), - created_time.eq(Utc::now().naive_utc().to_string()), - updated_time.eq(Utc::now().naive_utc().to_string()), - )) - .execute(sqlite_conn)?; + sqlite_conn.transaction(|transaction| { + insert_metadata_in_transaction_sqlite(transaction, plural, version, namespace_required, json_data) + }) } + }.expect("unknow conn in insert_metadata"); + Ok(()) +} + +fn insert_kine_in_transaction_pg( + transaction: &mut PgConnection, + item_kind: &str, + item_name: &str, + json_data: &Value, + version: &str, + namespace: Option<&str>, +) -> QueryResult<()> { + + // 表列表 + let table_array: [&str; 3] = ["kine", "kine_replica1", "kine_replica2"]; + + for table_name in table_array { + // 使用参数绑定构建插入查询 + let insert_metadata_query = format!( + "INSERT INTO {} (kind, name, namespace, apigroup, data, created_time, updated_time) + VALUES ($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT DO NOTHING;", + table_name + ); + + // 执行插入操作 + sql_query(insert_metadata_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(namespace.unwrap_or("")) // 空字符串作为默认 namespace + .bind::(version) + .bind::(json_data.to_string()) // 将 JSON 数据转换为字符串 + .bind::(Utc::now().naive_utc().to_string()) // 创建时间 + .bind::(Utc::now().naive_utc().to_string()) // 更新时间 + .execute(transaction)?; } + Ok(()) } +fn insert_kine_in_transaction_sqlite( + transaction: &mut SqliteConnection, + item_kind: &str, + item_name: &str, + json_data: &Value, + version: &str, + namespace: Option<&str>, +) -> QueryResult<()> { -// 插入数据到指定的表 -async fn insert_into_table( + let table_array: [&str; 3] = ["kine", "kine_replica1", "kine_replica2"]; + + for table_name in table_array { + let insert_metadata_query = format!( + "INSERT OR IGNORE INTO {} (kind, name, namespace, apigroup, data, created_time, updated_time) + VALUES (?, ?, ?, ?, ?, ?, ?);", + table_name + ); + + sql_query(insert_metadata_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(namespace.unwrap_or("")) // 使用 Nullable 处理空值 + .bind::(version) + .bind::(json_data.to_string()) + .bind::(Utc::now().naive_utc().to_string()) + .bind::(Utc::now().naive_utc().to_string()) + .execute(transaction)?; + } + Ok(()) +} + + +// 在 kine 表中插入新记录 +async fn insert_kine( conn: &mut DbConnection, item_kind: &str, item_name: &str, @@ -442,94 +661,106 @@ async fn insert_into_table( ) -> QueryResult<()> { match conn { DbConnection::Pg(pg_conn) => { - let insert_query = "INSERT INTO kine (kind, name, namespace, apigroup, data, created_time, updated_time) VALUES ($1, $2, $3, $4, $5, $6, $7)".to_string(); - diesel::sql_query(insert_query) - .bind::(item_kind) - .bind::(item_name) - .bind::(namespace.unwrap_or("")) - .bind::(version) - .bind::(json_data.to_string()) // 将 JSON 数据存储为 TEXT - .bind::(Utc::now().naive_utc().to_string()) - .bind::(Utc::now().naive_utc().to_string()) - .execute(pg_conn)?; - }, + pg_conn.transaction(|transaction| { + insert_kine_in_transaction_pg(transaction, item_kind, item_name, json_data, version, namespace) + }) + } DbConnection::Sqlite(sqlite_conn) => { - let insert_query = "INSERT INTO kine (kind, name, namespace, apigroup, data, created_time, updated_time) VALUES (?, ?, ?, ?, ?, ?, ?)".to_string(); - diesel::sql_query(insert_query) - .bind::(item_kind) - .bind::(item_name) - .bind::(namespace.unwrap_or("")) - .bind::(version) - .bind::(json_data.to_string()) // 将 JSON 数据存储为 TEXT - .bind::(Utc::now().naive_utc().to_string()) - .bind::(Utc::now().naive_utc().to_string()) - .execute(sqlite_conn)?; + sqlite_conn.transaction(|transaction| { + insert_kine_in_transaction_sqlite(transaction, item_kind, item_name, json_data, version, namespace) + }) } - } + }.expect("unknow conn in insert_kine"); Ok(()) } -// 从 plural 表中删除特定 name 的记录 -async fn delete_from_table( +// 从 kine 表中删除特定 name 的记录 +async fn delete_from_kine( conn: &mut DbConnection, item_kind: &str, item_name: &str, item_version: &str, item_namespace: Option<&str>, ) -> QueryResult { - // 根据是否存在 namespace 动态构建删除查询 - let delete_query = if let Some(_) = item_namespace { - match conn { - DbConnection::Pg(_) => "DELETE FROM kine WHERE kind = $1 AND name = $2 AND namespace = $3 AND apigroup = $4".to_string(), - DbConnection::Sqlite(_) => "DELETE FROM kine WHERE kind = ? AND name = ? AND namespace = ? AND apigroup = ?".to_string(), - } - } else { - match conn { - DbConnection::Pg(_) => "DELETE FROM kine WHERE kind = $1 AND name = $2 AND apigroup = $3".to_string(), - DbConnection::Sqlite(_) => "DELETE FROM kine WHERE kind = ? AND name = ? AND apigroup = ?".to_string(), - } - }; + use diesel::sql_types::Text; + + // 表名列表 + let tables = ["kine", "kine_replica1", "kine_replica2"]; + + // 遍历每个表,执行删除操作 + let mut total_rows_affected = 0; + + for &table in &tables { + let delete_query = if let Some(_) = item_namespace { + match conn { + DbConnection::Pg(_) => format!( + "DELETE FROM {} WHERE kind = $1 AND name = $2 AND namespace = $3 AND apigroup = $4", + table + ), + DbConnection::Sqlite(_) => format!( + "DELETE FROM {} WHERE kind = ? AND name = ? AND namespace = ? AND apigroup = ?", + table + ), + } + } else { + match conn { + DbConnection::Pg(_) => format!( + "DELETE FROM {} WHERE kind = $1 AND name = $2 AND apigroup = $3", + table + ), + DbConnection::Sqlite(_) => format!( + "DELETE FROM {} WHERE kind = ? AND name = ? AND apigroup = ?", + table + ), + } + }; - // 执行删除操作 - let rows_affected = match conn { - DbConnection::Pg(pg_conn) => { - if let Some(namespace) = item_namespace { - diesel::sql_query(delete_query) - .bind::(item_kind) - .bind::(item_name) - .bind::(namespace) - .bind::(item_version) - .execute(pg_conn)? - } else { - diesel::sql_query(delete_query) - .bind::(item_kind) - .bind::(item_name) - .bind::(item_version) - .execute(pg_conn)? + // 执行删除 + let rows_affected = match conn { + DbConnection::Pg(pg_conn) => { + if let Some(namespace) = item_namespace { + diesel::sql_query(delete_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(namespace) + .bind::(item_version) + .execute(pg_conn)? + } else { + diesel::sql_query(delete_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(item_version) + .execute(pg_conn)? + } } - }, - DbConnection::Sqlite(sqlite_conn) => { - if let Some(namespace) = item_namespace { - diesel::sql_query(delete_query) - .bind::(item_kind) - .bind::(item_name) - .bind::(namespace) - .bind::(item_version) - .execute(sqlite_conn)? - } else { - diesel::sql_query(delete_query) - .bind::(item_kind) - .bind::(item_name) - .bind::(item_version) - .execute(sqlite_conn)? + DbConnection::Sqlite(sqlite_conn) => { + if let Some(namespace) = item_namespace { + diesel::sql_query(delete_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(namespace) + .bind::(item_version) + .execute(sqlite_conn)? + } else { + diesel::sql_query(delete_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(item_version) + .execute(sqlite_conn)? + } } - } - }; - Ok(rows_affected > 0) + }; + + total_rows_affected += rows_affected; + } + + // 如果至少有两个表进行了删除,则返回 true + Ok(total_rows_affected > 1) } -async fn update_data_in_table( + +async fn update_data_in_kine( conn: &mut DbConnection, item_kind: &str, item_name: &str, @@ -537,65 +768,90 @@ async fn update_data_in_table( item_namespace: Option<&str>, json_data: &Value, ) -> QueryResult { + use diesel::sql_types::Text; + use chrono::Utc; + + // 需要更新的表列表 + let tables = ["kine", "kine_replica1", "kine_replica2"]; + let mut total_rows_affected = 0; + + for &table in &tables { + let update_query = if let Some(_) = item_namespace { + match conn { + DbConnection::Pg(_) => format!( + "UPDATE {} SET data = $1, updated_time = $2 WHERE kind = $3 AND name = $4 AND namespace = $5 AND apigroup = $6", + table + ), + DbConnection::Sqlite(_) => format!( + "UPDATE {} SET data = ?, updated_time = ? WHERE kind = ? AND name = ? AND namespace = ? AND apigroup = ?", + table + ), + } + } else { + match conn { + DbConnection::Pg(_) => format!( + "UPDATE {} SET data = $1, updated_time = $2 WHERE kind = $3 AND name = $4 AND apigroup = $5", + table + ), + DbConnection::Sqlite(_) => format!( + "UPDATE {} SET data = ?, updated_time = ? WHERE kind = ? AND name = ? AND apigroup = ?", + table + ), + } + }; - let update_query = if let Some(_) = item_namespace { - match conn { - DbConnection::Pg(_) => "UPDATE kine SET data = $1, updated_time = $2 WHERE kind = $3 AND name = $4 AND namespace = $5 AND apigroup = $6".to_string(), - DbConnection::Sqlite(_) => "UPDATE kine SET data = ?, updated_time = ? WHERE kind = ? AND name = ? AND namespace = ? AND apigroup = ?".to_string(), - } - } else { - match conn { - DbConnection::Pg(_) => "UPDATE kine SET data = $1, updated_time = $2 WHERE kind = $3 AND name = $4 AND apigroup = $5".to_string(), - DbConnection::Sqlite(_) => "UPDATE kine SET data = ?, updated_time = ? WHERE kind = ? AND name = ? AND apigroup = ?".to_string(), - } - }; - - let rows_affected = match conn { - DbConnection::Pg(pg_conn) => { - if let Some(namespace) = item_namespace { - diesel::sql_query(update_query) - .bind::(json_data.to_string()) - .bind::(Utc::now().naive_utc().to_string()) - .bind::(item_kind) - .bind::(item_name) - .bind::(namespace) - .bind::(item_version) - .execute(pg_conn)? - } else { - diesel::sql_query(update_query) - .bind::(json_data.to_string()) - .bind::(Utc::now().naive_utc().to_string()) - .bind::(item_kind) - .bind::(item_name) - .bind::(item_version) - .execute(pg_conn)? + // 执行更新操作 + let rows_affected = match conn { + DbConnection::Pg(pg_conn) => { + if let Some(namespace) = item_namespace { + diesel::sql_query(update_query) + .bind::(json_data.to_string()) + .bind::(Utc::now().naive_utc().to_string()) + .bind::(item_kind) + .bind::(item_name) + .bind::(namespace) + .bind::(item_version) + .execute(pg_conn)? + } else { + diesel::sql_query(update_query) + .bind::(json_data.to_string()) + .bind::(Utc::now().naive_utc().to_string()) + .bind::(item_kind) + .bind::(item_name) + .bind::(item_version) + .execute(pg_conn)? + } } - }, - DbConnection::Sqlite(sqlite_conn) => { - if let Some(namespace) = item_namespace { - diesel::sql_query(update_query) - .bind::(json_data.to_string()) - .bind::(Utc::now().naive_utc().to_string()) - .bind::(item_kind) - .bind::(item_name) - .bind::(namespace) - .bind::(item_version) - .execute(sqlite_conn)? - } else { - diesel::sql_query(update_query) - .bind::(json_data.to_string()) - .bind::(Utc::now().naive_utc().to_string()) - .bind::(item_kind) - .bind::(item_name) - .bind::(item_version) - .execute(sqlite_conn)? + DbConnection::Sqlite(sqlite_conn) => { + if let Some(namespace) = item_namespace { + diesel::sql_query(update_query) + .bind::(json_data.to_string()) + .bind::(Utc::now().naive_utc().to_string()) + .bind::(item_kind) + .bind::(item_name) + .bind::(namespace) + .bind::(item_version) + .execute(sqlite_conn)? + } else { + diesel::sql_query(update_query) + .bind::(json_data.to_string()) + .bind::(Utc::now().naive_utc().to_string()) + .bind::(item_kind) + .bind::(item_name) + .bind::(item_version) + .execute(sqlite_conn)? + } } - } - }; + }; - Ok(rows_affected > 0) + total_rows_affected += rows_affected; + } + + // 如果至少有两个表更新成功,则返回 true + Ok(total_rows_affected > 1) } + // 辅助查询函数,用于获取数据的 `data` 字段 #[derive(QueryableByName)] struct DataResult { @@ -603,14 +859,122 @@ struct DataResult { data: String, } -async fn get_data_from_table( + + +async fn get_data_from_kine( + conn: &mut DbConnection, + item_kind: &str, + item_name: &str, + item_version: &str, + item_namespace: Option<&str>, +) -> QueryResult> { + use diesel::sql_types::Text; + use std::collections::HashMap; + + // 表名列表 + let tables = ["kine", "kine_replica1", "kine_replica2"]; + + // 存储每个表的查询结果 + let mut results = HashMap::new(); + + for &table in &tables { + let select_query = if let Some(_) = item_namespace { + match conn { + DbConnection::Pg(_) => format!( + "SELECT data FROM {} WHERE kind = $1 AND name = $2 AND namespace = $3 AND apigroup = $4", + table + ), + DbConnection::Sqlite(_) => format!( + "SELECT data FROM {} WHERE kind = ? AND name = ? AND namespace = ? AND apigroup = ?", + table + ), + } + } else { + match conn { + DbConnection::Pg(_) => format!( + "SELECT data FROM {} WHERE kind = $1 AND name = $2 AND apigroup = $3", + table + ), + DbConnection::Sqlite(_) => format!( + "SELECT data FROM {} WHERE kind = ? AND name = ? AND apigroup = ?", + table + ), + } + }; + + let data_result = match conn { + DbConnection::Pg(pg_conn) => { + if let Some(namespace) = item_namespace { + diesel::sql_query(select_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(namespace) + .bind::(item_version) + .get_result::(pg_conn) + .optional()? + .map(|res| res.data) + } else { + diesel::sql_query(select_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(item_version) + .get_result::(pg_conn) + .optional()? + .map(|res| res.data) + } + } + DbConnection::Sqlite(sqlite_conn) => { + if let Some(namespace) = item_namespace { + diesel::sql_query(select_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(namespace) + .bind::(item_version) + .get_result::(sqlite_conn) + .optional()? + .map(|res| res.data) + } else { + diesel::sql_query(select_query) + .bind::(item_kind) + .bind::(item_name) + .bind::(item_version) + .get_result::(sqlite_conn) + .optional()? + .map(|res| res.data) + } + } + }; + + if let Some(data) = data_result { + *results.entry(data).or_insert(0) += 1; + } + } + + // 按少数服从多数规则返回数据 + if results.len() == 1 { + // 如果三个表的结果一致,直接返回任意结果 + Ok(results.into_iter().next().map(|(data, _)| data)) + } else if results.values().all(|&count| count == 1) { + // 如果所有表结果不同,直接回退到 kine 表的数据 + get_data_from_kine_primary(conn, item_kind, item_name, item_version, item_namespace) + } else if let Some((data, _)) = results.into_iter().max_by_key(|&(_, count)| count) { + // 如果有多数一致的数据,返回该数据 + Ok(Some(data)) + } else { + // 默认回退到 kine 表的数据 + get_data_from_kine_primary(conn, item_kind, item_name, item_version, item_namespace) + } +} + +/// 获取主表 `kine` 的数据 +fn get_data_from_kine_primary( conn: &mut DbConnection, item_kind: &str, item_name: &str, item_version: &str, item_namespace: Option<&str>, ) -> QueryResult> { - let select_query = if let Some(_) = item_namespace { + let fallback_query = if let Some(_) = item_namespace { match conn { DbConnection::Pg(_) => "SELECT data FROM kine WHERE kind = $1 AND name = $2 AND namespace = $3 AND apigroup = $4".to_string(), DbConnection::Sqlite(_) => "SELECT data FROM kine WHERE kind = ? AND name = ? AND namespace = ? AND apigroup = ?".to_string(), @@ -625,7 +989,7 @@ async fn get_data_from_table( match conn { DbConnection::Pg(pg_conn) => { if let Some(namespace) = item_namespace { - diesel::sql_query(select_query) + diesel::sql_query(fallback_query) .bind::(item_kind) .bind::(item_name) .bind::(namespace) @@ -634,7 +998,7 @@ async fn get_data_from_table( .optional() .map(|res| res.map(|data_result| data_result.data)) } else { - diesel::sql_query(select_query) + diesel::sql_query(fallback_query) .bind::(item_kind) .bind::(item_name) .bind::(item_version) @@ -642,10 +1006,10 @@ async fn get_data_from_table( .optional() .map(|res| res.map(|data_result| data_result.data)) } - }, + } DbConnection::Sqlite(sqlite_conn) => { if let Some(namespace) = item_namespace { - diesel::sql_query(select_query) + diesel::sql_query(fallback_query) .bind::(item_kind) .bind::(item_name) .bind::(namespace) @@ -654,7 +1018,7 @@ async fn get_data_from_table( .optional() .map(|res| res.map(|data_result| data_result.data)) } else { - diesel::sql_query(select_query) + diesel::sql_query(fallback_query) .bind::(item_kind) .bind::(item_name) .bind::(item_version) @@ -667,62 +1031,117 @@ async fn get_data_from_table( } + + // 辅助函数:从指定表中获取所有符合条件的数据的 `data` 字段 -async fn get_all_data_from_table( +async fn get_all_data_from_kine( conn: &mut DbConnection, item_kind: &str, item_version: &str, item_namespace: Option<&str>, ) -> QueryResult> { - let select_query = if let Some(_) = item_namespace { - match conn { - DbConnection::Pg(_) => "SELECT data FROM kine WHERE kind = $1 AND namespace = $2 AND apigroup = $3".to_string(), - DbConnection::Sqlite(_) => "SELECT data FROM kine WHERE kind = ? AND namespace = ? AND apigroup = ?".to_string(), - } - } else { - match conn { - DbConnection::Pg(_) => "SELECT data FROM kine WHERE kind = $1 AND apigroup = $2".to_string(), - DbConnection::Sqlite(_) => "SELECT data FROM kine WHERE kind = ? AND apigroup = ?".to_string(), - } - }; + use diesel::sql_types::Text; + use std::collections::HashMap; + + // 定义需要查询的表 + let tables = ["kine", "kine_replica1", "kine_replica2"]; + + // 存储每个表的查询结果 + let mut table_results: HashMap<&str, Vec> = HashMap::new(); + + // 遍历每个表进行查询 + for &table in &tables { + let select_query = if let Some(_) = item_namespace { + match conn { + DbConnection::Pg(_) => format!( + "SELECT data FROM {} WHERE kind = $1 AND namespace = $2 AND apigroup = $3", + table + ), + DbConnection::Sqlite(_) => format!( + "SELECT data FROM {} WHERE kind = ? AND namespace = ? AND apigroup = ?", + table + ), + } + } else { + match conn { + DbConnection::Pg(_) => format!( + "SELECT data FROM {} WHERE kind = $1 AND apigroup = $2", + table + ), + DbConnection::Sqlite(_) => format!( + "SELECT data FROM {} WHERE kind = ? AND apigroup = ?", + table + ), + } + }; - match conn { - DbConnection::Pg(pg_conn) => { - if let Some(namespace) = item_namespace { - diesel::sql_query(select_query) - .bind::(item_kind) - .bind::(namespace) - .bind::(item_version) - .load::(pg_conn) - .map(|results| results.into_iter().map(|res| res.data).collect()) - } else { - diesel::sql_query(select_query) - .bind::(item_kind) - .bind::(item_version) - .load::(pg_conn) - .map(|results| results.into_iter().map(|res| res.data).collect()) + // 执行查询 + let results: Vec = match conn { + DbConnection::Pg(pg_conn) => { + if let Some(namespace) = item_namespace { + diesel::sql_query(select_query) + .bind::(item_kind) + .bind::(namespace) + .bind::(item_version) + .load::(pg_conn)? + .into_iter() + .map(|res| res.data) + .collect() + } else { + diesel::sql_query(select_query) + .bind::(item_kind) + .bind::(item_version) + .load::(pg_conn)? + .into_iter() + .map(|res| res.data) + .collect() + } } - }, - DbConnection::Sqlite(sqlite_conn) => { - if let Some(namespace) = item_namespace { - diesel::sql_query(select_query) - .bind::(item_kind) - .bind::(namespace) - .bind::(item_version) - .load::(sqlite_conn) - .map(|results| results.into_iter().map(|res| res.data).collect()) - } else { - diesel::sql_query(select_query) - .bind::(item_kind) - .bind::(item_version) - .load::(sqlite_conn) - .map(|results| results.into_iter().map(|res| res.data).collect()) + DbConnection::Sqlite(sqlite_conn) => { + if let Some(namespace) = item_namespace { + diesel::sql_query(select_query) + .bind::(item_kind) + .bind::(namespace) + .bind::(item_version) + .load::(sqlite_conn)? + .into_iter() + .map(|res| res.data) + .collect() + } else { + diesel::sql_query(select_query) + .bind::(item_kind) + .bind::(item_version) + .load::(sqlite_conn)? + .into_iter() + .map(|res| res.data) + .collect() + } } + }; + + table_results.insert(table, results); + } + + // 检查所有表的数据是否一致 + let mut unique_results: HashMap = HashMap::new(); + for results in table_results.values() { + for result in results { + *unique_results.entry(result.clone()).or_insert(0) += 1; } } + // 筛选出出现次数大于等于 2 的数据 + let filtered_results: Vec = unique_results + .into_iter() + .filter(|(_, count)| *count >= 2) // 过滤条件 + .map(|(data, _)| data) // 提取数据部分 + .collect(); + + Ok(filtered_results) } + + // 发送请求并等待响应 async fn send_request( message: Message, @@ -896,7 +1315,7 @@ impl Handler for DefaultHandler { event_manager.send_event(&resource_type, EventType::Create, data.clone()).await; event_manager.send_event(&watchone_resource_type, EventType::Create, data.clone()).await; - insert_into_table(db_connection, &plural, item_name, &data, &version, None) + insert_kine(db_connection, &plural, item_name, &data, &version, None) .await .map_err(ErrorInternalServerError)?; } @@ -989,7 +1408,7 @@ impl Handler for DefaultHandler { event_manager.send_event(&resource_type, EventType::Create, data.clone()).await; event_manager.send_event(&watchone_resource_type, EventType::Create, data.clone()).await; - insert_into_table(db_connection, &plural, item_name, &data, &version, Some(&namespace)) + insert_kine(db_connection, &plural, item_name, &data, &version, Some(&namespace)) .await .map_err(ErrorInternalServerError)?; } @@ -1083,7 +1502,7 @@ impl Handler for DefaultHandler { event_manager.send_event(&resource_type, EventType::Create, data.clone()).await; event_manager.send_event(&watchone_resource_type, EventType::Create, data.clone()).await; - insert_into_table(db_connection, &plural, item_name, &data, &ver, None) + insert_kine(db_connection, &plural, item_name, &data, &ver, None) .await .map_err(ErrorInternalServerError)?; } @@ -1178,7 +1597,7 @@ impl Handler for DefaultHandler { event_manager.send_event(&resource_type, EventType::Create, data.clone()).await; event_manager.send_event(&watchone_resource_type, EventType::Create, data.clone()).await; - insert_into_table(db_connection, &plural, item_name, &data, &ver, Some(&namespace)) + insert_kine(db_connection, &plural, item_name, &data, &ver, Some(&namespace)) .await .map_err(ErrorInternalServerError)?; } @@ -1243,7 +1662,7 @@ impl Handler for DefaultHandler { event_manager.send_event(&watchone_resource_type, EventType::Delete, request_data).await; // 从 plural 表中删除指定的 name - let deleted = delete_from_table(db_connection, &plural, &name, &version, None) + let deleted = delete_from_kine(db_connection, &plural, &name, &version, None) .await .map_err(ErrorInternalServerError)?; @@ -1309,7 +1728,7 @@ impl Handler for DefaultHandler { event_manager.send_event(&watchone_resource_type, EventType::Delete, request_data).await; // 从 plural 表中删除指定的数据 - let deleted = delete_from_table(db_connection, &plural, &name, &version, Some(&namespace)) + let deleted = delete_from_kine(db_connection, &plural, &name, &version, Some(&namespace)) .await .map_err(ErrorInternalServerError)?; @@ -1375,7 +1794,7 @@ impl Handler for DefaultHandler { event_manager.send_event(&resource_type, EventType::Delete, request_data.clone()).await; event_manager.send_event(&watchone_resource_type, EventType::Delete, request_data).await; - let deleted = delete_from_table(db_connection, &plural, &name, &ver, None) + let deleted = delete_from_kine(db_connection, &plural, &name, &ver, None) .await .map_err(ErrorInternalServerError)?; @@ -1441,7 +1860,7 @@ impl Handler for DefaultHandler { event_manager.send_event(&resource_type, EventType::Delete, request_data.clone()).await; event_manager.send_event(&watchone_resource_type, EventType::Delete, request_data).await; - let deleted = delete_from_table(db_connection, &plural, &name, &ver, Some(&namespace)) + let deleted = delete_from_kine(db_connection, &plural, &name, &ver, Some(&namespace)) .await .map_err(ErrorInternalServerError)?; @@ -1510,7 +1929,7 @@ impl Handler for DefaultHandler { event_manager.send_event(&watchone_resource_type, EventType::Update, request_data).await; // 查询 plural 表中是否存在 name 匹配的数据 - let updated = update_data_in_table(db_connection, &plural, &name, &version, None, &data) + let updated = update_data_in_kine(db_connection, &plural, &name, &version, None, &data) .await .map_err(ErrorInternalServerError)?; @@ -1579,7 +1998,7 @@ impl Handler for DefaultHandler { event_manager.send_event(&resource_type, EventType::Update, request_data.clone()).await; event_manager.send_event(&watchone_resource_type, EventType::Update, request_data).await; - let updated = update_data_in_table(db_connection, &plural, &name, &version, Some(&namespace), &data) + let updated = update_data_in_kine(db_connection, &plural, &name, &version, Some(&namespace), &data) .await .map_err(ErrorInternalServerError)?; @@ -1648,7 +2067,7 @@ impl Handler for DefaultHandler { event_manager.send_event(&resource_type, EventType::Update, request_data.clone()).await; event_manager.send_event(&watchone_resource_type, EventType::Update, request_data).await; - let updated = update_data_in_table(db_connection, &plural, &name, &ver, None, &data) + let updated = update_data_in_kine(db_connection, &plural, &name, &ver, None, &data) .await .map_err(ErrorInternalServerError)?; @@ -1717,7 +2136,7 @@ impl Handler for DefaultHandler { event_manager.send_event(&resource_type, EventType::Update, request_data.clone()).await; event_manager.send_event(&watchone_resource_type, EventType::Update, request_data).await; - let updated = update_data_in_table(db_connection, &plural, &name, &ver, Some(&namespace), &data) + let updated = update_data_in_kine(db_connection, &plural, &name, &ver, Some(&namespace), &data) .await .map_err(ErrorInternalServerError)?; @@ -1745,7 +2164,7 @@ impl Handler for DefaultHandler { return Ok(HttpResponse::NotFound().json(json!({ "error": "该 plural 不存在,请检查 plural 版本以及是否需要 namespace" }))); } - if let Some(data) = get_data_from_table(db_connection, &plural, &name, &version, None) + if let Some(data) = get_data_from_kine(db_connection, &plural, &name, &version, None) .await .map_err(ErrorInternalServerError)? { // 将字符串转换为 JSON 格式 @@ -1774,7 +2193,7 @@ impl Handler for DefaultHandler { return Ok(HttpResponse::NotFound().json(json!({ "error": "该 plural 不存在,请检查 plural 版本以及是否需要 namespace" }))); } - if let Some(data) = get_data_from_table(db_connection, &plural, &name, &version, Some(&namespace)) + if let Some(data) = get_data_from_kine(db_connection, &plural, &name, &version, Some(&namespace)) .await .map_err(ErrorInternalServerError)? { // 将字符串转换为 JSON 格式 @@ -1805,7 +2224,7 @@ impl Handler for DefaultHandler { return Ok(HttpResponse::NotFound().json(json!({ "error": "该 plural 不存在,请检查 plural 版本以及是否需要 namespace" }))); } - if let Some(data) = get_data_from_table(db_connection, &plural, &name, &ver, None) + if let Some(data) = get_data_from_kine(db_connection, &plural, &name, &ver, None) .await .map_err(ErrorInternalServerError)? { // 将字符串转换为 JSON 格式 @@ -1836,7 +2255,7 @@ impl Handler for DefaultHandler { return Ok(HttpResponse::NotFound().json(json!({ "error": "该 plural 不存在,请检查 plural 版本以及是否需要 namespace" }))); } - if let Some(data) = get_data_from_table(db_connection, &plural, &name, &ver, Some(&namespace)) + if let Some(data) = get_data_from_kine(db_connection, &plural, &name, &ver, Some(&namespace)) .await .map_err(ErrorInternalServerError)? { // 将字符串转换为 JSON 格式 @@ -1865,7 +2284,7 @@ impl Handler for DefaultHandler { return Ok(HttpResponse::NotFound().json(json!({ "error": "该 plural 不存在,请检查 plural 版本以及是否需要 namespace" }))); } - let data_list = get_all_data_from_table(db_connection, &plural, &version, None) + let data_list = get_all_data_from_kine(db_connection, &plural, &version, None) .await .map_err(ErrorInternalServerError)?; @@ -1895,7 +2314,7 @@ impl Handler for DefaultHandler { return Ok(HttpResponse::NotFound().json(json!({ "error": "该 plural 不存在,请检查 plural 版本以及是否需要 namespace" }))); } - let data_list = get_all_data_from_table(db_connection, &plural, &version, Some(&namespace)) + let data_list = get_all_data_from_kine(db_connection, &plural, &version, Some(&namespace)) .await .map_err(ErrorInternalServerError)?; @@ -1927,7 +2346,7 @@ impl Handler for DefaultHandler { return Ok(HttpResponse::NotFound().json(json!({ "error": "该 plural 不存在,请检查 plural 版本以及是否需要 namespace" }))); } - let data_list = get_all_data_from_table(db_connection, &plural, &ver, None) + let data_list = get_all_data_from_kine(db_connection, &plural, &ver, None) .await .map_err(ErrorInternalServerError)?; @@ -1959,7 +2378,7 @@ impl Handler for DefaultHandler { return Ok(HttpResponse::NotFound().json(json!({ "error": "该 plural 不存在,请检查 plural 版本以及是否需要 namespace" }))); } - let data_list = get_all_data_from_table(db_connection, &plural, &ver, Some(&namespace)) + let data_list = get_all_data_from_kine(db_connection, &plural, &ver, Some(&namespace)) .await .map_err(ErrorInternalServerError)?; diff --git a/src/schema.rs b/src/schema.rs index 687d79b8dfea450dca6315b0417741ddc75c342f..045e81a1d6e3afd4fd58925140a20f9cfbaef963 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -12,6 +12,30 @@ diesel::table! { } } +diesel::table! { + kine_replica1 (kind, name, namespace, apigroup) { + kind -> Nullable, + name -> Nullable, + namespace -> Nullable, + apigroup -> Nullable, + data -> Nullable, + created_time -> Nullable, + updated_time -> Nullable, + } +} + +diesel::table! { + kine_replica2 (kind, name, namespace, apigroup) { + kind -> Nullable, + name -> Nullable, + namespace -> Nullable, + apigroup -> Nullable, + data -> Nullable, + created_time -> Nullable, + updated_time -> Nullable, + } +} + diesel::table! { metadata (name, namespace, apigroup) { name -> Nullable, @@ -23,7 +47,33 @@ diesel::table! { } } +diesel::table! { + metadata_replica1 (name, namespace, apigroup) { + name -> Nullable, + namespace -> Nullable, + apigroup -> Nullable, + data -> Nullable, + created_time -> Nullable, + updated_time -> Nullable, + } +} + +diesel::table! { + metadata_replica2 (name, namespace, apigroup) { + name -> Nullable, + namespace -> Nullable, + apigroup -> Nullable, + data -> Nullable, + created_time -> Nullable, + updated_time -> Nullable, + } +} + diesel::allow_tables_to_appear_in_same_query!( kine, + kine_replica1, + kine_replica2, metadata, + metadata_replica1, + metadata_replica2, );