From 2e05a5ce397b1ad3b889e200ee278f9efb5276f8 Mon Sep 17 00:00:00 2001 From: liyuanr Date: Tue, 30 Jan 2024 15:04:13 +0800 Subject: [PATCH 1/2] proxy: Add unit tests and delete useless dependencies from Cargo.toml Add the proxy unit test ,modify the AgentCallClient , AgentClient and ProxyController struct design to make it easier to mock the agent invoking. Delete useless dependencies from Cargo.toml and Cargo.lock Signed-off-by: liyuanr --- KubeOS-Rust/Cargo.lock | 14 -- KubeOS-Rust/proxy/Cargo.toml | 3 +- .../proxy/src/controller/agentclient.rs | 74 ++++------ .../proxy/src/controller/apiserver_mock.rs | 137 +++++++++++++++--- .../proxy/src/controller/controller.rs | 121 ++++++++-------- KubeOS-Rust/proxy/src/controller/mod.rs | 2 +- KubeOS-Rust/proxy/src/controller/utils.rs | 12 +- KubeOS-Rust/proxy/src/drain.rs | 24 +-- KubeOS-Rust/proxy/src/main.rs | 9 +- 9 files changed, 232 insertions(+), 164 deletions(-) diff --git a/KubeOS-Rust/Cargo.lock b/KubeOS-Rust/Cargo.lock index c44c152d..2342c7b2 100644 --- a/KubeOS-Rust/Cargo.lock +++ b/KubeOS-Rust/Cargo.lock @@ -1180,18 +1180,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "mockall_double" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dffc15b97456ecc84d2bde8c1df79145e154f45225828c4361f676e1b82acd6" -dependencies = [ - "cfg-if", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "mockito" version = "0.31.1" @@ -1503,7 +1491,6 @@ dependencies = [ "anyhow", "assert-json-diff", "async-trait", - "chrono", "cli", "env_logger", "futures", @@ -1515,7 +1502,6 @@ dependencies = [ "log", "manager", "mockall", - "mockall_double", "regex", "reqwest", "schemars", diff --git a/KubeOS-Rust/proxy/Cargo.toml b/KubeOS-Rust/proxy/Cargo.toml index fe657afa..94e3b3c8 100644 --- a/KubeOS-Rust/proxy/Cargo.toml +++ b/KubeOS-Rust/proxy/Cargo.toml @@ -17,7 +17,6 @@ path = "src/main.rs" [dependencies] anyhow = "1.0.44" async-trait = "0.1" -chrono = { version = "0.4", default-features = false, features = ["std"] } cli = { version = "1.0.5", path = "../cli" } env_logger = "0.9.0" futures = "0.3.17" @@ -45,4 +44,4 @@ http = "0.2.9" hyper = "0.14.25" tower-test = "0.4.0" mockall = { version = "=0.11.3" } -mockall_double = "0.2.1" + diff --git a/KubeOS-Rust/proxy/src/controller/agentclient.rs b/KubeOS-Rust/proxy/src/controller/agentclient.rs index 73489a9b..b833f276 100644 --- a/KubeOS-Rust/proxy/src/controller/agentclient.rs +++ b/KubeOS-Rust/proxy/src/controller/agentclient.rs @@ -22,13 +22,6 @@ use cli::{ }; use manager::api::{CertsInfo, ConfigureRequest, KeyInfo as AgentKeyInfo, Sysconfig as AgentSysconfig, UpgradeRequest}; -#[cfg_attr(test, double)] -use agent_call::AgentCallClient; -#[cfg(test)] -use mockall::automock; -#[cfg(test)] -use mockall_double::double; - pub struct UpgradeInfo { pub version: String, pub image_type: String, @@ -57,45 +50,40 @@ pub struct KeyInfo { pub operation: String, } -#[cfg_attr(test, automock)] pub trait AgentMethod { - fn prepare_upgrade_method(&self, upgrade_info: UpgradeInfo, agent_call: AgentCallClient) -> Result<(), Error>; - fn upgrade_method(&self, agent_call: AgentCallClient) -> Result<(), Error>; - fn rollback_method(&self, agent_call: AgentCallClient) -> Result<(), Error>; - fn configure_method(&self, config_info: ConfigInfo, agent_call: AgentCallClient) -> Result<(), Error>; + fn prepare_upgrade_method(&self, upgrade_info: UpgradeInfo) -> Result<(), Error>; + fn upgrade_method(&self) -> Result<(), Error>; + fn rollback_method(&self) -> Result<(), Error>; + fn configure_method(&self, config_info: ConfigInfo) -> Result<(), Error>; } - -pub mod agent_call { - use super::{Client, Error, RpcMethod}; - #[cfg(test)] - use mockall::automock; - - #[derive(Default)] - pub struct AgentCallClient {} - - #[cfg_attr(test, automock)] - impl AgentCallClient { - pub fn call_agent(&self, client: &Client, method: T) -> Result<(), Error> { - match method.call(client) { - Ok(_resp) => Ok(()), - Err(e) => Err(Error::AgentError { source: e }), - } - } - } +pub trait AgentCall { + fn call_agent(&self, client: &Client, method: T) -> Result<(), Error>; } -pub struct AgentClient { +pub struct AgentClient { pub agent_client: Client, + pub agent_call_client: T, +} + +impl AgentClient { + pub fn new>(socket_path: P, agent_call_client: T) -> Self { + AgentClient { agent_client: Client::new(socket_path), agent_call_client } + } } -impl AgentClient { - pub fn new>(socket_path: P) -> Self { - AgentClient { agent_client: Client::new(socket_path) } +#[derive(Default)] +pub struct AgentCallClient {} +impl AgentCall for AgentCallClient { + fn call_agent(&self, client: &Client, method: T) -> Result<(), Error> { + match method.call(client) { + Ok(_resp) => Ok(()), + Err(e) => Err(Error::AgentError { source: e }), + } } } -impl AgentMethod for AgentClient { - fn prepare_upgrade_method(&self, upgrade_info: UpgradeInfo, agent_call: AgentCallClient) -> Result<(), Error> { +impl AgentMethod for AgentClient { + fn prepare_upgrade_method(&self, upgrade_info: UpgradeInfo) -> Result<(), Error> { let upgrade_request = UpgradeRequest { version: upgrade_info.version, image_type: upgrade_info.image_type, @@ -110,27 +98,27 @@ impl AgentMethod for AgentClient { client_key: upgrade_info.clientkey, }, }; - match agent_call.call_agent(&self.agent_client, PrepareUpgradeMethod::new(upgrade_request)) { + match self.agent_call_client.call_agent(&self.agent_client, PrepareUpgradeMethod::new(upgrade_request)) { Ok(_resp) => Ok(()), Err(e) => Err(e), } } - fn upgrade_method(&self, agent_call: AgentCallClient) -> Result<(), Error> { - match agent_call.call_agent(&self.agent_client, UpgradeMethod::default()) { + fn upgrade_method(&self) -> Result<(), Error> { + match self.agent_call_client.call_agent(&self.agent_client, UpgradeMethod::default()) { Ok(_resp) => Ok(()), Err(e) => Err(e), } } - fn rollback_method(&self, agent_call: AgentCallClient) -> Result<(), Error> { - match agent_call.call_agent(&self.agent_client, RollbackMethod::default()) { + fn rollback_method(&self) -> Result<(), Error> { + match self.agent_call_client.call_agent(&self.agent_client, RollbackMethod::default()) { Ok(_resp) => Ok(()), Err(e) => Err(e), } } - fn configure_method(&self, config_info: ConfigInfo, agent_call: AgentCallClient) -> Result<(), Error> { + fn configure_method(&self, config_info: ConfigInfo) -> Result<(), Error> { let mut agent_configs: Vec = Vec::new(); for config in config_info.configs { let mut contents_tmp: HashMap = HashMap::new(); @@ -147,7 +135,7 @@ impl AgentMethod for AgentClient { }) } let config_request = ConfigureRequest { configs: agent_configs }; - match agent_call.call_agent(&self.agent_client, ConfigureMethod::new(config_request)) { + match self.agent_call_client.call_agent(&self.agent_client, ConfigureMethod::new(config_request)) { Ok(_resp) => Ok(()), Err(e) => Err(e), } diff --git a/KubeOS-Rust/proxy/src/controller/apiserver_mock.rs b/KubeOS-Rust/proxy/src/controller/apiserver_mock.rs index c46d26a4..ef5977c1 100644 --- a/KubeOS-Rust/proxy/src/controller/apiserver_mock.rs +++ b/KubeOS-Rust/proxy/src/controller/apiserver_mock.rs @@ -2,15 +2,20 @@ use self::mock_error::Error; use super::{ agentclient::*, crd::{Configs, OSInstanceStatus}, - values::{NODE_STATUS_CONFIG, NODE_STATUS_UPGRADE}, + values::{NODE_STATUS_CONFIG, NODE_STATUS_UPGRADE, OPERATION_TYPE_ROLLBACK}, }; use crate::controller::{ apiclient::{ApplyApi, ControllerClient}, - crd::{OSInstance, OSInstanceSpec, OSSpec, OS}, + crd::{Config, Content, OSInstance, OSInstanceSpec, OSSpec, OS}, values::{LABEL_OSINSTANCE, LABEL_UPGRADING, NODE_STATUS_IDLE}, ProxyController, }; use anyhow::Result; +use cli::client::Client; +use cli::method::{ + callable_method::RpcMethod, configure::ConfigureMethod, prepare_upgrade::PrepareUpgradeMethod, + rollback::RollbackMethod, upgrade::UpgradeMethod, +}; use http::{Request, Response}; use hyper::{body::to_bytes, Body}; use k8s_openapi::api::core::v1::Pod; @@ -19,7 +24,8 @@ use kube::{ api::ObjectMeta, core::{ListMeta, ObjectList}, }; -use kube::{Client, Resource, ResourceExt}; +use kube::{Client as KubeClient, Resource, ResourceExt}; +use mockall::mock; use std::collections::BTreeMap; type ApiServerHandle = tower_test::mock::Handle, Response>; @@ -34,6 +40,7 @@ pub enum Testcases { ConfigNormal(OSInstance), ConfigVersionMismatchReassign(OSInstance), ConfigVersionMismatchUpdate(OSInstance), + Rollback(OSInstance), } pub async fn timeout_after_5s(handle: tokio::task::JoinHandle<()>) { @@ -59,7 +66,7 @@ impl ApiServerVerifier { .unwrap() .handler_node_get(osi) .await - }, + } Testcases::UpgradeNormal(osi) => { self.handler_osinstance_get_exist(osi.clone()) .await @@ -78,7 +85,7 @@ impl ApiServerVerifier { .unwrap() .handler_node_pod_list_get(osi) .await - }, + } Testcases::UpgradeUpgradeconfigsVersionMismatch(osi) => { self.handler_osinstance_get_exist(osi.clone()) .await @@ -97,7 +104,7 @@ impl ApiServerVerifier { .unwrap() .handler_osinstance_patch_nodestatus_idle(osi) .await - }, + } Testcases::UpgradeOSInstaceNodestatusConfig(osi) => { self.handler_osinstance_get_exist(osi.clone()) .await @@ -107,7 +114,7 @@ impl ApiServerVerifier { .unwrap() .handler_node_get_with_label(osi.clone()) .await - }, + } Testcases::UpgradeOSInstaceNodestatusIdle(osi) => { self.handler_osinstance_get_exist(osi.clone()) .await @@ -123,7 +130,7 @@ impl ApiServerVerifier { .unwrap() .handler_node_uncordon(osi) .await - }, + } Testcases::ConfigNormal(osi) => { self.handler_osinstance_get_exist(osi.clone()) .await @@ -139,7 +146,7 @@ impl ApiServerVerifier { .unwrap() .handler_osinstance_patch_nodestatus_idle(osi) .await - }, + } Testcases::ConfigVersionMismatchReassign(osi) => { self.handler_osinstance_get_exist(osi.clone()) .await @@ -152,7 +159,7 @@ impl ApiServerVerifier { .unwrap() .handler_osinstance_patch_nodestatus_idle(osi) .await - }, + } Testcases::ConfigVersionMismatchUpdate(osi) => { self.handler_osinstance_get_exist(osi.clone()) .await @@ -165,7 +172,26 @@ impl ApiServerVerifier { .unwrap() .handler_osinstance_patch_spec_sysconfig_v2(osi) .await - }, + } + Testcases::Rollback(osi) => { + self.handler_osinstance_get_exist(osi.clone()) + .await + .unwrap() + .handler_osinstance_get_exist(osi.clone()) + .await + .unwrap() + .handler_node_get_with_label(osi.clone()) + .await + .unwrap() + .handler_osinstance_patch_upgradeconfig_v2(osi.clone()) + .await + .unwrap() + .handler_node_cordon(osi.clone()) + .await + .unwrap() + .handler_node_pod_list_get(osi) + .await + } } .expect("Case completed without errors"); }) @@ -437,6 +463,7 @@ impl ApiServerVerifier { pub mod mock_error { use thiserror::Error; + #[derive(Error, Debug)] pub enum Error { #[error("Kubernetes reported error: {source}")] @@ -447,17 +474,27 @@ pub mod mock_error { } } -impl ProxyController { - pub fn test() -> (ProxyController, ApiServerVerifier) { +mock! { + pub AgentCallClient{} + impl AgentCall for AgentCallClient{ + fn call_agent(&self, client:&Client, method: T) -> Result<(), agent_error::Error> { + Ok(()) + } + } + +} +impl ProxyController { + pub fn test() -> (ProxyController, ApiServerVerifier) { let (mock_service, handle) = tower_test::mock::pair::, Response>(); - let mock_k8s_client = Client::new(mock_service, "default"); + let mock_k8s_client = KubeClient::new(mock_service, "default"); let mock_api_client = ControllerClient::new(mock_k8s_client.clone()); - let mut mock_agent_client: MockAgentMethod = MockAgentMethod::new(); - mock_agent_client.expect_rollback_method().returning(|_x| Ok(())); - mock_agent_client.expect_prepare_upgrade_method().returning(|_x, _y| Ok(())); - mock_agent_client.expect_upgrade_method().returning(|_x| Ok(())); - mock_agent_client.expect_configure_method().returning(|_x, _y| Ok(())); - let proxy_controller: ProxyController = + let mut mock_agent_call_client = MockAgentCallClient::new(); + mock_agent_call_client.expect_call_agent::().returning(|_x, _y| Ok(())); + mock_agent_call_client.expect_call_agent::().returning(|_x, _y| Ok(())); + mock_agent_call_client.expect_call_agent::().returning(|_x, _y| Ok(())); + mock_agent_call_client.expect_call_agent::().returning(|_x, _y| Ok(())); + let mock_agent_client = AgentClient::new("test", mock_agent_call_client); + let proxy_controller: ProxyController = ProxyController::new(mock_k8s_client, mock_api_client, mock_agent_client); (proxy_controller, ApiServerVerifier(handle)) } @@ -495,7 +532,7 @@ impl OSInstance { } pub fn set_osi_nodestatus_config(node_name: &str, namespace: &str) -> Self { - // return osinstance with nodestatus = upgrade, upgradeconfig.version=v1, sysconfig.version=v1 + // return osinstance with nodestatus = config, upgradeconfig.version=v1, sysconfig.version=v1 let mut osinstance = OSInstance::set_osi_default(node_name, namespace); osinstance.spec.nodestatus = NODE_STATUS_CONFIG.to_string(); osinstance @@ -512,7 +549,18 @@ impl OSInstance { // return osinstance with nodestatus = upgrade, upgradeconfig.version=v2, sysconfig.version=v1 let mut osinstance = OSInstance::set_osi_default(node_name, namespace); osinstance.spec.nodestatus = NODE_STATUS_UPGRADE.to_string(); - osinstance.spec.upgradeconfigs.as_mut().unwrap().version = Some(String::from("v2")); + osinstance.spec.upgradeconfigs = Some(Configs { + version: Some(String::from("v2")), + configs: Some(vec![Config { + model: Some(String::from("kernel.sysctl.persist")), + configpath: Some(String::from("/persist/persist.conf")), + contents: Some(vec![Content { + key: Some(String::from("kernel.test")), + value: Some(String::from("test")), + operation: Some(String::from("delete")), + }]), + }]), + }); osinstance } @@ -520,7 +568,18 @@ impl OSInstance { // return osinstance with nodestatus = upgrade, upgradeconfig.version=v2, sysconfig.version=v1 let mut osinstance = OSInstance::set_osi_default(node_name, namespace); osinstance.spec.nodestatus = NODE_STATUS_CONFIG.to_string(); - osinstance.spec.sysconfigs.as_mut().unwrap().version = Some(String::from("v2")); + osinstance.spec.sysconfigs = Some(Configs { + version: Some(String::from("v2")), + configs: Some(vec![Config { + model: Some(String::from("kernel.sysctl.persist")), + configpath: Some(String::from("/persist/persist.conf")), + contents: Some(vec![Content { + key: Some(String::from("kernel.test")), + value: Some(String::from("test")), + operation: Some(String::from("delete")), + }]), + }]), + }); osinstance } } @@ -549,7 +608,37 @@ impl OS { pub fn set_os_syscon_v2_opstype_config() -> Self { let mut os = OS::set_os_default(); os.spec.opstype = String::from("config"); - os.spec.sysconfigs = Some(Configs { version: Some(String::from("v2")), configs: None }); + os.spec.sysconfigs = Some(Configs { + version: Some(String::from("v2")), + configs: Some(vec![Config { + model: Some(String::from("kernel.sysctl.persist")), + configpath: Some(String::from("/persist/persist.conf")), + contents: Some(vec![Content { + key: Some(String::from("kernel.test")), + value: Some(String::from("test")), + operation: Some(String::from("delete")), + }]), + }]), + }); + os + } + + pub fn set_os_rollback_osversion_v2_upgradecon_v2() -> Self { + let mut os = OS::set_os_default(); + os.spec.osversion = String::from("KubeOS v2"); + os.spec.opstype = OPERATION_TYPE_ROLLBACK.to_string(); + os.spec.upgradeconfigs = Some(Configs { + version: Some(String::from("v2")), + configs: Some(vec![Config { + model: Some(String::from("kernel.sysctl.persist")), + configpath: Some(String::from("/persist/persist.conf")), + contents: Some(vec![Content { + key: Some(String::from("kernel.test")), + value: Some(String::from("test")), + operation: Some(String::from("delete")), + }]), + }]), + }); os } } diff --git a/KubeOS-Rust/proxy/src/controller/controller.rs b/KubeOS-Rust/proxy/src/controller/controller.rs index b2bb332a..c21f3044 100644 --- a/KubeOS-Rust/proxy/src/controller/controller.rs +++ b/KubeOS-Rust/proxy/src/controller/controller.rs @@ -24,10 +24,8 @@ use kube::{ use log::{debug, error, info}; use reconciler_error::Error; -#[cfg_attr(test, double)] -use super::agentclient::agent_call::AgentCallClient; use super::{ - agentclient::{AgentMethod, ConfigInfo, KeyInfo, Sysconfig, UpgradeInfo}, + agentclient::{AgentCall, AgentClient, AgentMethod, ConfigInfo, KeyInfo, Sysconfig, UpgradeInfo}, apiclient::ApplyApi, crd::{Configs, Content, OSInstance, OS}, utils::{check_version, get_config_version, ConfigOperation, ConfigType}, @@ -36,12 +34,10 @@ use super::{ REQUEUE_ERROR, REQUEUE_NORMAL, }, }; -#[cfg(test)] -use mockall_double::double; -pub async fn reconcile( +pub async fn reconcile( os: OS, - ctx: Context>, + ctx: Context>, ) -> Result { debug!("start reconcile"); let proxy_controller = ctx.get_ref(); @@ -76,7 +72,7 @@ pub async fn reconcile( ) .await?; return Ok(REQUEUE_NORMAL); - }, + } ConfigOperation::UpdateConfig => { debug!("start update config"); osinstance.spec.sysconfigs = os_cr.spec.sysconfigs.clone(); @@ -85,8 +81,8 @@ pub async fn reconcile( .update_osinstance_spec(&osinstance.name(), &namespace, &osinstance.spec) .await?; return Ok(REQUEUE_ERROR); - }, - _ => {}, + } + _ => {} } proxy_controller.set_config(&mut osinstance, ConfigType::SysConfig).await?; proxy_controller @@ -108,8 +104,8 @@ pub async fn reconcile( ) .await?; return Ok(REQUEUE_NORMAL); - }, - _ => {}, + } + _ => {} } if node.labels().contains_key(LABEL_UPGRADING) { if osinstance.spec.nodestatus == NODE_STATUS_IDLE { @@ -133,9 +129,9 @@ pub async fn reconcile( Ok(REQUEUE_NORMAL) } -pub fn error_policy( +pub fn error_policy( error: &Error, - _ctx: Context>, + _ctx: Context>, ) -> ReconcilerAction { error!("Reconciliation error:{}", error.to_string()); REQUEUE_ERROR @@ -145,31 +141,31 @@ struct ControllerResources { osinstance: OSInstance, node: Node, } -pub struct ProxyController { +pub struct ProxyController { k8s_client: Client, controller_client: T, - agent_client: U, + agent_client: AgentClient, } -impl ProxyController { - pub fn new(k8s_client: Client, controller_client: T, agent_client: U) -> Self { +impl ProxyController { + pub fn new(k8s_client: Client, controller_client: T, agent_client: AgentClient) -> Self { ProxyController { k8s_client, controller_client, agent_client } } } -impl ProxyController { +impl ProxyController { async fn check_osi_exisit(&self, namespace: &str, node_name: &str) -> Result<(), Error> { let osi_api: Api = Api::namespaced(self.k8s_client.clone(), namespace); match osi_api.get(node_name).await { Ok(osi) => { debug!("osinstance is exist {:?}", osi.name()); return Ok(()); - }, + } Err(kube::Error::Api(ErrorResponse { reason, .. })) if &reason == "NotFound" => { info!("Create OSInstance {}", node_name); self.controller_client.create_osinstance(node_name, namespace).await?; Ok(()) - }, + } Err(err) => Err(Error::KubeError { source: err }), } } @@ -257,17 +253,16 @@ impl ProxyController { if config_info.need_config { match config_info.configs.and_then(convert_to_agent_config) { Some(agent_configs) => { - let agent_call_client = AgentCallClient::default(); - match self.agent_client.configure_method(ConfigInfo { configs: agent_configs }, agent_call_client) { - Ok(_resp) => {}, + match self.agent_client.configure_method(ConfigInfo { configs: agent_configs }) { + Ok(_resp) => {} Err(e) => { return Err(Error::AgentError { source: e }); - }, + } } - }, + } None => { - info!("config is none, no need to config"); - }, + info!("config is none, No content can be configured."); + } }; self.update_osi_status(osinstance, config_type).await?; } @@ -290,35 +285,34 @@ impl ProxyController { clientcert: os_cr.spec.clientcert.clone().unwrap_or_default(), clientkey: os_cr.spec.clientkey.clone().unwrap_or_default(), }; - let agent_call_client = AgentCallClient::default(); - match self.agent_client.prepare_upgrade_method(upgrade_info, agent_call_client) { - Ok(_resp) => {}, + + match self.agent_client.prepare_upgrade_method(upgrade_info) { + Ok(_resp) => {} Err(e) => { return Err(Error::AgentError { source: e }); - }, + } } self.evict_node(&node.name(), os_cr.spec.evictpodforce).await?; - let agent_call_client = AgentCallClient::default(); - match self.agent_client.upgrade_method(agent_call_client) { - Ok(_resp) => {}, + match self.agent_client.upgrade_method() { + Ok(_resp) => {} Err(e) => { return Err(Error::AgentError { source: e }); - }, + } } - }, + } OPERATION_TYPE_ROLLBACK => { self.evict_node(&node.name(), os_cr.spec.evictpodforce).await?; - let agent_call_client = AgentCallClient::default(); - match self.agent_client.rollback_method(agent_call_client) { - Ok(_resp) => {}, + + match self.agent_client.rollback_method() { + Ok(_resp) => {} Err(e) => { return Err(Error::AgentError { source: e }); - }, + } } - }, + } _ => { return Err(Error::OperationError { value: os_cr.spec.opstype.clone() }); - }, + } } Ok(()) } @@ -329,12 +323,12 @@ impl ProxyController { node_api.cordon(node_name).await?; info!("Cordon node Successfully{}, start drain nodes", node_name); match self.drain_node(node_name, evict_pod_force).await { - Ok(()) => {}, + Ok(()) => {} Err(e) => { node_api.uncordon(node_name).await?; info!("Drain node {} error, uncordon node successfully", node_name); return Err(e); - }, + } } Ok(()) } @@ -360,7 +354,7 @@ fn convert_to_agent_config(configs: Configs) -> Option> { contents: contents_tmp, }; agent_configs.push(config_tmp) - }, + } None => { info!( "model {} which has configpath {} do not has any contents no need to configure", @@ -368,7 +362,7 @@ fn convert_to_agent_config(configs: Configs) -> Option> { config.configpath.unwrap_or_default() ); continue; - }, + } }; } if agent_configs.len() == 0 { @@ -437,16 +431,13 @@ pub mod reconciler_error { #[cfg(test)] mod test { use super::{error_policy, reconcile, Context, OSInstance, ProxyController, OS}; + use crate::controller::apiserver_mock::{timeout_after_5s, MockAgentCallClient, Testcases}; use crate::controller::ControllerClient; - use crate::controller::{ - agentclient::MockAgentMethod, - apiserver_mock::{timeout_after_5s, Testcases}, - }; use std::env; #[tokio::test] async fn test_create_osinstance_with_no_upgrade_or_configuration() { - let (test_proxy_controller, fakeserver) = ProxyController::::test(); + let (test_proxy_controller, fakeserver) = ProxyController::::test(); env::set_var("NODE_NAME", "openeuler"); let os = OS::set_os_default(); let context = Context::new(test_proxy_controller); @@ -457,7 +448,7 @@ mod test { } #[tokio::test] async fn test_upgrade_normal() { - let (test_proxy_controller, fakeserver) = ProxyController::::test(); + let (test_proxy_controller, fakeserver) = ProxyController::::test(); env::set_var("NODE_NAME", "openeuler"); let os = OS::set_os_osversion_v2_upgradecon_v2(); let context = Context::new(test_proxy_controller); @@ -471,7 +462,7 @@ mod test { #[tokio::test] async fn test_diff_osversion_opstype_config() { - let (test_proxy_controller, fakeserver) = ProxyController::::test(); + let (test_proxy_controller, fakeserver) = ProxyController::::test(); env::set_var("NODE_NAME", "openeuler"); let os = OS::set_os_osversion_v2_opstype_config(); let context = Context::new(test_proxy_controller); @@ -488,7 +479,7 @@ mod test { #[tokio::test] async fn test_upgradeconfigs_version_mismatch() { - let (test_proxy_controller, fakeserver) = ProxyController::::test(); + let (test_proxy_controller, fakeserver) = ProxyController::::test(); env::set_var("NODE_NAME", "openeuler"); let os = OS::set_os_osversion_v2_upgradecon_v2(); let context = Context::new(test_proxy_controller); @@ -501,7 +492,7 @@ mod test { #[tokio::test] async fn test_upgrade_nodestatus_idle() { - let (test_proxy_controller, fakeserver) = ProxyController::::test(); + let (test_proxy_controller, fakeserver) = ProxyController::::test(); env::set_var("NODE_NAME", "openeuler"); let os = OS::set_os_osversion_v2_upgradecon_v2(); let context = Context::new(test_proxy_controller); @@ -513,7 +504,7 @@ mod test { #[tokio::test] async fn test_config_normal() { - let (test_proxy_controller, fakeserver) = ProxyController::::test(); + let (test_proxy_controller, fakeserver) = ProxyController::::test(); env::set_var("NODE_NAME", "openeuler"); let os = OS::set_os_syscon_v2_opstype_config(); let context = Context::new(test_proxy_controller); @@ -525,7 +516,7 @@ mod test { #[tokio::test] async fn test_sysconfig_version_mismatch_reassign() { - let (test_proxy_controller, fakeserver) = ProxyController::::test(); + let (test_proxy_controller, fakeserver) = ProxyController::::test(); env::set_var("NODE_NAME", "openeuler"); let os = OS::set_os_syscon_v2_opstype_config(); let context = Context::new(test_proxy_controller); @@ -539,7 +530,7 @@ mod test { #[tokio::test] async fn test_sysconfig_version_mismatch_update() { - let (test_proxy_controller, fakeserver) = ProxyController::::test(); + let (test_proxy_controller, fakeserver) = ProxyController::::test(); env::set_var("NODE_NAME", "openeuler"); let os = OS::set_os_syscon_v2_opstype_config(); let context = Context::new(test_proxy_controller); @@ -550,4 +541,16 @@ mod test { reconcile(os, context.clone()).await.expect("reconciler"); timeout_after_5s(mocksrv).await; } + + #[tokio::test] + async fn test_rollback() { + let (test_proxy_controller, fakeserver) = ProxyController::::test(); + env::set_var("NODE_NAME", "openeuler"); + let os = OS::set_os_rollback_osversion_v2_upgradecon_v2(); + let context = Context::new(test_proxy_controller); + let mocksrv = fakeserver + .run(Testcases::Rollback(OSInstance::set_osi_nodestatus_upgrade_upgradecon_v2("openeuler", "default"))); + reconcile(os, context.clone()).await.expect("reconciler"); + timeout_after_5s(mocksrv).await; + } } diff --git a/KubeOS-Rust/proxy/src/controller/mod.rs b/KubeOS-Rust/proxy/src/controller/mod.rs index 73be45c9..e30c8df7 100644 --- a/KubeOS-Rust/proxy/src/controller/mod.rs +++ b/KubeOS-Rust/proxy/src/controller/mod.rs @@ -19,7 +19,7 @@ mod crd; mod utils; mod values; -pub use agentclient::AgentClient; +pub use agentclient::{AgentCallClient, AgentClient}; pub use apiclient::ControllerClient; pub use controller::{error_policy, reconcile, reconciler_error::Error, ProxyController}; pub use crd::OS; diff --git a/KubeOS-Rust/proxy/src/controller/utils.rs b/KubeOS-Rust/proxy/src/controller/utils.rs index 78502db0..0f568788 100644 --- a/KubeOS-Rust/proxy/src/controller/utils.rs +++ b/KubeOS-Rust/proxy/src/controller/utils.rs @@ -56,7 +56,7 @@ impl ConfigType { ); return ConfigOperation::Reassign; } - }, + } ConfigType::SysConfig => { let os_config_version = get_config_version(os.spec.sysconfigs.as_ref()); let osi_config_version = get_config_version(osinstance.spec.sysconfigs.as_ref()); @@ -78,7 +78,7 @@ impl ConfigType { return ConfigOperation::UpdateConfig; } } - }, + } }; ConfigOperation::DoNothing } @@ -96,7 +96,7 @@ impl ConfigType { status_config_version = get_config_version(None); } configs = osinstance.spec.upgradeconfigs.clone(); - }, + } ConfigType::SysConfig => { spec_config_version = get_config_version(osinstance.spec.sysconfigs.as_ref()); if let Some(osinstance_status) = osinstance.status.as_ref() { @@ -105,7 +105,7 @@ impl ConfigType { status_config_version = get_config_version(None); } configs = osinstance.spec.sysconfigs.clone(); - }, + } } debug!( "=======osinstance soec config version is {},status config version is {}", @@ -127,7 +127,7 @@ impl ConfigType { sysconfigs: None, }) } - }, + } ConfigType::SysConfig => { if let Some(osi_status) = &mut osinstance.status { osi_status.sysconfigs = osinstance.spec.sysconfigs.clone(); @@ -135,7 +135,7 @@ impl ConfigType { osinstance.status = Some(OSInstanceStatus { upgradeconfigs: None, sysconfigs: osinstance.spec.sysconfigs.clone() }) } - }, + } } } } diff --git a/KubeOS-Rust/proxy/src/drain.rs b/KubeOS-Rust/proxy/src/drain.rs index 09cf6625..72836f98 100644 --- a/KubeOS-Rust/proxy/src/drain.rs +++ b/KubeOS-Rust/proxy/src/drain.rs @@ -66,7 +66,7 @@ async fn get_pods_deleted( Ok(pods @ ObjectList { .. }) => pods, Err(err) => { return Err(GetPodListsError { source: err, node_name: node_name.to_string() }); - }, + } }; let mut filterd_pods_list: Vec = Vec::new(); let mut filterd_err: Vec = Vec::new(); @@ -189,14 +189,14 @@ async fn wait_for_deletion(k8s_client: &kube::Client, pod: &Pod) -> Result<(), e let name = (&p).name_any(); info!("Pod {} deleted.", name); break; - }, + } Ok(_) => { info!("Pod '{}' is not yet deleted. Waiting {}s.", pod.name_any(), EVERY_DELETION_CHECK.as_secs_f64()); - }, + } Err(kube::Error::Api(e)) if e.code == response_error_not_found => { info!("Pod {} is deleted.", pod.name_any()); break; - }, + } Err(e) => { error!( "Get pod {} reported error: '{}', whether pod is deleted cannot be determined, waiting {}s.", @@ -204,7 +204,7 @@ async fn wait_for_deletion(k8s_client: &kube::Client, pod: &Pod) -> Result<(), e e, EVERY_DELETION_CHECK.as_secs_f64() ); - }, + } } if start_time.elapsed() > TIMEOUT { return Err(WaitDeletionError { pod_name: pod.name_any(), max_wait: TIMEOUT }); @@ -241,7 +241,7 @@ impl PodFilter for FinishedOrFailedFilter { return match pod.status.as_ref() { Some(PodStatus { phase: Some(phase), .. }) if phase == "Failed" || phase == "Succeeded" => { FilterResult::create_filter_result(true, "", PodDeleteStatus::Okay) - }, + } _ => FilterResult::create_filter_result(false, "", PodDeleteStatus::Okay), }; } @@ -269,7 +269,7 @@ impl PodFilter for DaemonFilter { let description = format!("Cannot drain Pod '{}': Pod is member of a DaemonSet", pod.name_any()); Box::new(FilterResult { result: false, desc: description, status: PodDeleteStatus::Error }) } - }, + } _ => FilterResult::create_filter_result(true, "", PodDeleteStatus::Okay), }; } @@ -287,7 +287,7 @@ impl PodFilter for MirrorFilter { Some(annotations) if annotations.contains_key("kubernetes.io/config.mirror") => { let description = format!("Ignore Pod '{}': Pod is a static Mirror Pod", pod.name_any()); FilterResult::create_filter_result(false, &description.to_string(), PodDeleteStatus::Warning) - }, + } _ => FilterResult::create_filter_result(true, "", PodDeleteStatus::Okay), }; } @@ -312,7 +312,7 @@ impl PodFilter for LocalStorageFilter { let description = format!("Cannot drain Pod '{}': Pod has local Storage", pod.name_any()); Box::new(FilterResult { result: false, desc: description, status: PodDeleteStatus::Error }) } - }, + } _ => FilterResult::create_filter_result(true, "", PodDeleteStatus::Okay), }; } @@ -365,7 +365,7 @@ impl PodFilter for DeletedFilter { && now - Duration::from_secs(time.0.timestamp() as u64) >= self.delete_wait_timeout => { FilterResult::create_filter_result(true, "", PodDeleteStatus::Okay) - }, + } _ => FilterResult::create_filter_result(true, "", PodDeleteStatus::Okay), }; } @@ -471,7 +471,7 @@ impl ErrorHandleStrategy { return match self { Self::TolerateStrategy => { return backoff.take(0); - }, + } Self::RetryStrategy => backoff.take(MAX_RETRIES_TIMES), }; @@ -488,7 +488,7 @@ impl tokio_retry::Condition for ErrorHandleStrategy { } else { false } - }, + } } } } diff --git a/KubeOS-Rust/proxy/src/main.rs b/KubeOS-Rust/proxy/src/main.rs index cd601d0f..ad36b642 100644 --- a/KubeOS-Rust/proxy/src/main.rs +++ b/KubeOS-Rust/proxy/src/main.rs @@ -20,7 +20,9 @@ use kube::{ }; use log::{error, info}; mod controller; -use controller::{error_policy, reconcile, AgentClient, ControllerClient, ProxyController, OS, SOCK_PATH}; +use controller::{ + error_policy, reconcile, AgentCallClient, AgentClient, ControllerClient, ProxyController, OS, SOCK_PATH, +}; const PROXY_VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION"); #[tokio::main] @@ -29,14 +31,15 @@ async fn main() -> Result<()> { let client = Client::try_default().await?; let os: Api = Api::all(client.clone()); let controller_client = ControllerClient::new(client.clone()); - let agent_client = AgentClient::new(SOCK_PATH); + let agent_call_client = AgentCallClient::default(); + let agent_client = AgentClient::new(SOCK_PATH, agent_call_client); let proxy_controller = ProxyController::new(client, controller_client, agent_client); info!("os-proxy version is {}, start renconcile", PROXY_VERSION.unwrap_or("Not Found")); Controller::new(os, ListParams::default()) .run(reconcile, error_policy, Context::new(proxy_controller)) .for_each(|res| async move { match res { - Ok(_o) => {}, + Ok(_o) => {} Err(e) => error!("reconcile failed: {}", e.to_string()), } }) -- Gitee From 1c5fcb965561dd7fb48118ca50952a5323ae93be Mon Sep 17 00:00:00 2001 From: liyuanr Date: Tue, 30 Jan 2024 15:05:20 +0800 Subject: [PATCH 2/2] proxy: fix code review issues 1. Fixed the enumeration naming problem. 2. Resolved the problem of redundant return keywords. 3. Fix unnecessary reference issues. 4. Fix unnecessary matches and replace them with if let. 5. Fix unnecessary copying of bool values. Signed-off-by: liyuanr --- .../proxy/src/controller/controller.rs | 47 +++++++++---------- KubeOS-Rust/proxy/src/controller/utils.rs | 12 ++--- 2 files changed, 28 insertions(+), 31 deletions(-) diff --git a/KubeOS-Rust/proxy/src/controller/controller.rs b/KubeOS-Rust/proxy/src/controller/controller.rs index c21f3044..ad443802 100644 --- a/KubeOS-Rust/proxy/src/controller/controller.rs +++ b/KubeOS-Rust/proxy/src/controller/controller.rs @@ -59,7 +59,7 @@ pub async fn reconcile( .ok_or(Error::MissingSubResource { value: String::from("node.status.node_info") })? .os_image; debug!("os expected osversion is {},actual osversion is {}", os_cr.spec.osversion, node_os_image); - if check_version(&os_cr.spec.osversion, &node_os_image) { + if check_version(&os_cr.spec.osversion, node_os_image) { match ConfigType::SysConfig.check_config_version(&os, &osinstance) { ConfigOperation::Reassign => { debug!("start reassign"); @@ -92,8 +92,7 @@ pub async fn reconcile( if os_cr.spec.opstype == NODE_STATUS_CONFIG { return Err(Error::UpgradeBeforeConfig); } - match ConfigType::UpgradeConfig.check_config_version(&os, &osinstance) { - ConfigOperation::Reassign => { + if let ConfigOperation::Reassign = ConfigType::UpgradeConfig.check_config_version(&os, &osinstance) { debug!("start reassign"); proxy_controller .refresh_node( @@ -104,8 +103,6 @@ pub async fn reconcile( ) .await?; return Ok(REQUEUE_NORMAL); - } - _ => {} } if node.labels().contains_key(LABEL_UPGRADING) { if osinstance.spec.nodestatus == NODE_STATUS_IDLE { @@ -159,14 +156,14 @@ impl ProxyController { match osi_api.get(node_name).await { Ok(osi) => { debug!("osinstance is exist {:?}", osi.name()); - return Ok(()); + Ok(()) } Err(kube::Error::Api(ErrorResponse { reason, .. })) if &reason == "NotFound" => { info!("Create OSInstance {}", node_name); self.controller_client.create_osinstance(node_name, namespace).await?; Ok(()) } - Err(err) => Err(Error::KubeError { source: err }), + Err(err) => Err(Error::KubeClient { source: err }), } } @@ -243,7 +240,7 @@ impl ProxyController { let namespace = &osinstance .namespace() .ok_or(Error::MissingObjectKey { resource: "osinstance".to_string(), value: "namespace".to_string() })?; - self.controller_client.update_osinstance_status(&osinstance.name(), &namespace, &osinstance.status).await?; + self.controller_client.update_osinstance_status(&osinstance.name(), namespace, &osinstance.status).await?; Ok(()) } @@ -256,7 +253,7 @@ impl ProxyController { match self.agent_client.configure_method(ConfigInfo { configs: agent_configs }) { Ok(_resp) => {} Err(e) => { - return Err(Error::AgentError { source: e }); + return Err(Error::Agent { source: e }); } } } @@ -278,9 +275,9 @@ impl ProxyController { image_type: os_cr.spec.imagetype.clone(), check_sum: os_cr.spec.checksum.clone(), container_image: os_cr.spec.containerimage.clone(), - flagsafe: os_cr.spec.flagsafe.clone(), + flagsafe: os_cr.spec.flagsafe, imageurl: os_cr.spec.imageurl.clone(), - mtls: os_cr.spec.mtls.clone(), + mtls: os_cr.spec.mtls, cacert: os_cr.spec.cacert.clone().unwrap_or_default(), clientcert: os_cr.spec.clientcert.clone().unwrap_or_default(), clientkey: os_cr.spec.clientkey.clone().unwrap_or_default(), @@ -289,14 +286,14 @@ impl ProxyController { match self.agent_client.prepare_upgrade_method(upgrade_info) { Ok(_resp) => {} Err(e) => { - return Err(Error::AgentError { source: e }); + return Err(Error::Agent { source: e }); } } self.evict_node(&node.name(), os_cr.spec.evictpodforce).await?; match self.agent_client.upgrade_method() { Ok(_resp) => {} Err(e) => { - return Err(Error::AgentError { source: e }); + return Err(Error::Agent { source: e }); } } } @@ -306,12 +303,12 @@ impl ProxyController { match self.agent_client.rollback_method() { Ok(_resp) => {} Err(e) => { - return Err(Error::AgentError { source: e }); + return Err(Error::Agent { source: e }); } } } _ => { - return Err(Error::OperationError { value: os_cr.spec.opstype.clone() }); + return Err(Error::Operation { value: os_cr.spec.opstype.clone() }); } } Ok(()) @@ -336,7 +333,7 @@ impl ProxyController { async fn drain_node(&self, node_name: &str, force: bool) -> Result<(), Error> { use drain::error::DrainError::*; match drain_os(&self.k8s_client.clone(), node_name, force).await { - Err(DeletePodsError { errors, .. }) => Err(Error::DrainNodeError { value: errors.join("; ") }), + Err(DeletePodsError { errors, .. }) => Err(Error::DrainNode { value: errors.join("; ") }), _ => Ok(()), } } @@ -365,13 +362,13 @@ fn convert_to_agent_config(configs: Configs) -> Option> { } }; } - if agent_configs.len() == 0 { + if agent_configs.is_empty() { info!("no contents in all models, no need to configure"); return None; } return Some(agent_configs); } - return None; + None } fn convert_to_config_hashmap(contents: Vec) -> Option> { @@ -381,7 +378,7 @@ fn convert_to_config_hashmap(contents: Vec) -> Option