From a97c76a1302188138a3b80ef6265cbbfef35e182 Mon Sep 17 00:00:00 2001 From: liyuanr Date: Tue, 26 Dec 2023 18:31:50 +0800 Subject: [PATCH] proxy:add agentclient to proxy and modify drain Add agentclient to proxy, decouple proxy and agent, modify drain, unify the use of log library, log printing, and variable names.Delete snafu and tracing from Cargo.toml for deleting no use packages. Signed-off-by: liyuanr --- KubeOS-Rust/Cargo.lock | 36 ---- KubeOS-Rust/proxy/Cargo.toml | 4 +- .../proxy/src/controller/agentclient.rs | 173 ++++++++++++++++++ .../proxy/src/controller/controller.rs | 79 ++++---- KubeOS-Rust/proxy/src/controller/drain.rs | 163 ++++++++--------- KubeOS-Rust/proxy/src/controller/mod.rs | 2 + KubeOS-Rust/proxy/src/main.rs | 5 +- 7 files changed, 293 insertions(+), 169 deletions(-) create mode 100644 KubeOS-Rust/proxy/src/controller/agentclient.rs diff --git a/KubeOS-Rust/Cargo.lock b/KubeOS-Rust/Cargo.lock index 8069b70c..08ad969e 100644 --- a/KubeOS-Rust/Cargo.lock +++ b/KubeOS-Rust/Cargo.lock @@ -330,12 +330,6 @@ dependencies = [ "winapi 0.3.9", ] -[[package]] -name = "doc-comment" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" - [[package]] name = "downcast" version = "0.11.0" @@ -622,12 +616,6 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" -[[package]] -name = "heck" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" - [[package]] name = "hermit-abi" version = "0.1.19" @@ -1621,13 +1609,11 @@ dependencies = [ "schemars", "serde", "serde_json", - "snafu", "socket2", "thiserror", "thread_local", "tokio 1.14.0", "tokio-retry", - "tracing", ] [[package]] @@ -2021,28 +2007,6 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" -[[package]] -name = "snafu" -version = "0.7.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4de37ad025c587a29e8f3f5605c00f70b98715ef90b9061a815b9e59e9042d6" -dependencies = [ - "doc-comment", - "snafu-derive", -] - -[[package]] -name = "snafu-derive" -version = "0.7.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "990079665f075b699031e9c08fd3ab99be5029b96f3b78dc0709e8f77e4efebf" -dependencies = [ - "heck", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "socket2" version = "0.4.9" diff --git a/KubeOS-Rust/proxy/Cargo.toml b/KubeOS-Rust/proxy/Cargo.toml index 58cb1f51..a0c48358 100644 --- a/KubeOS-Rust/proxy/Cargo.toml +++ b/KubeOS-Rust/proxy/Cargo.toml @@ -2,6 +2,8 @@ name = "proxy" version = "0.1.0" edition = "2021" +description = "KubeOS os-proxy" +license = "MulanPSL-2.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -15,7 +17,6 @@ serde = { version = "1.0.130", features = ["derive"] } serde_json = "1.0.68" thiserror = "1.0.29" env_logger = "0.9.0" -tracing = "0.1.29" schemars = "=0.8.10" socket2 = "=0.4.9" log = "=0.4.15" @@ -23,7 +24,6 @@ thread_local = "=1.1.4" async-trait = "0.1" regex = "=1.7.3" chrono = { version = "0.4", default-features = false, features = ["std"] } -snafu = "0.7" h2 = "=0.3.16" tokio-retry = "0.3" reqwest = { version = "=0.11.10", default-features = false, features = [ "json" ] } diff --git a/KubeOS-Rust/proxy/src/controller/agentclient.rs b/KubeOS-Rust/proxy/src/controller/agentclient.rs new file mode 100644 index 00000000..4d954ee8 --- /dev/null +++ b/KubeOS-Rust/proxy/src/controller/agentclient.rs @@ -0,0 +1,173 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. + * KubeOS is licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR + * PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +use cli::{ + client::Client, + method::{ + callable_method::RpcMethod, configure::ConfigureMethod, + prepare_upgrade::PrepareUpgradeMethod, rollback::RollbackMethod, upgrade::UpgradeMethod, + }, +}; + +use agent_call::AgentCallClient; +use agent_error::Error; +use manager::api::{ + ConfigureRequest, KeyInfo as AgentKeyInfo, Sysconfig as AgentSysconfig, UpgradeRequest, +}; +use std::collections::HashMap; +use std::path::Path; + +pub struct UpgradeInfo { + pub version: String, + pub image_type: String, + pub check_sum: String, + pub container_image: String, +} + +pub struct ConfigInfo { + pub configs: Vec, +} + +pub struct Sysconfig { + pub model: String, + pub config_path: String, + pub contents: HashMap, +} + +pub struct KeyInfo { + pub value: String, + pub operation: String, +} + +pub trait AgentMethod { + fn prepare_upgrade_method( + &self, + upgrade_info: UpgradeInfo, + agent_call: AgentCallClient, + ) -> Result<(), Error>; + fn upgrade_method(&self, agent_call: AgentCallClient) -> Result<(), Error>; + fn rollback_method(&self, agent_call: AgentCallClient) -> Result<(), Error>; + fn configure_method( + &self, + config_info: ConfigInfo, + agent_call: AgentCallClient, + ) -> Result<(), Error>; +} + +pub mod agent_call { + use super::{Client, Error, RpcMethod}; + #[derive(Default)] + pub struct AgentCallClient {} + + impl AgentCallClient { + pub fn call_agent( + &self, + client: &Client, + method: T, + ) -> Result<(), Error> { + match method.call(client) { + Ok(_resp) => Ok(()), + Err(e) => Err(Error::AgentError { source: e }), + } + } + } +} + +pub struct AgentClient { + pub agent_client: Client, +} + +impl AgentClient { + pub fn new>(socket_path: P) -> Self { + AgentClient { + agent_client: Client::new(socket_path), + } + } +} + +impl AgentMethod for AgentClient { + fn prepare_upgrade_method( + &self, + upgrade_info: UpgradeInfo, + agent_call: AgentCallClient, + ) -> Result<(), Error> { + let upgrade_request = UpgradeRequest { + version: upgrade_info.version, + image_type: upgrade_info.image_type, + check_sum: upgrade_info.check_sum, + container_image: upgrade_info.container_image, + }; + match agent_call.call_agent( + &self.agent_client, + PrepareUpgradeMethod::new(upgrade_request), + ) { + Ok(_resp) => Ok(()), + Err(e) => Err(e), + } + } + + fn upgrade_method(&self, agent_call: AgentCallClient) -> Result<(), Error> { + match agent_call.call_agent(&self.agent_client, UpgradeMethod::new()) { + Ok(_resp) => Ok(()), + Err(e) => Err(e), + } + } + + fn rollback_method(&self, agent_call: AgentCallClient) -> Result<(), Error> { + match agent_call.call_agent(&self.agent_client, RollbackMethod::new()) { + Ok(_resp) => Ok(()), + Err(e) => Err(e), + } + } + + fn configure_method( + &self, + config_info: ConfigInfo, + agent_call: AgentCallClient, + ) -> Result<(), Error> { + let mut agent_configs: Vec = Vec::new(); + for config in config_info.configs { + let mut contents_tmp: HashMap = HashMap::new(); + for (key, key_info) in config.contents.iter() { + contents_tmp.insert( + key.to_string(), + AgentKeyInfo { + value: key_info.value.clone(), + operation: key_info.operation.clone(), + }, + ); + } + agent_configs.push(AgentSysconfig { + model: config.model, + config_path: config.config_path, + contents: contents_tmp, + }) + } + let config_request = ConfigureRequest { + configs: agent_configs, + }; + match agent_call.call_agent(&self.agent_client, ConfigureMethod::new(config_request)) { + Ok(_resp) => Ok(()), + Err(e) => Err(e), + } + } +} + +pub mod agent_error { + use thiserror::Error; + + #[derive(Error, Debug)] + pub enum Error { + #[error("{source}")] + AgentError { source: anyhow::Error }, + } +} diff --git a/KubeOS-Rust/proxy/src/controller/controller.rs b/KubeOS-Rust/proxy/src/controller/controller.rs index 6410cce1..da2e031e 100644 --- a/KubeOS-Rust/proxy/src/controller/controller.rs +++ b/KubeOS-Rust/proxy/src/controller/controller.rs @@ -10,25 +10,20 @@ * See the Mulan PSL v2 for more details. */ -use super::crd::{Content, OSInstance, OS}; -use super::drain::drain_os; -use super::utils::{check_version, get_config_version, ConfigOperation, ConfigType}; -use super::values::{ - LABEL_UPGRADING, NODE_STATUS_CONFIG, NODE_STATUS_IDLE, OPERATION_TYPE_ROLLBACK, - OPERATION_TYPE_UPGRADE, REQUEUE_ERROR, REQUEUE_NORMAL, -}; use super::{ - apiclient::{ApplyApi, ControllerClient}, - crd::Configs, -}; -use anyhow::Result; -use cli::{ - client::Client as AgentClient, - method::{ - callable_method::RpcMethod, configure::ConfigureMethod, - prepare_upgrade::PrepareUpgradeMethod, rollback::RollbackMethod, upgrade::UpgradeMethod, + agentclient::{ + agent_call::AgentCallClient, AgentMethod, ConfigInfo, KeyInfo, Sysconfig, UpgradeInfo, + }, + apiclient::ApplyApi, + crd::{Configs, Content, OSInstance, OS}, + drain::drain_os, + utils::{check_version, get_config_version, ConfigOperation, ConfigType}, + values::{ + LABEL_UPGRADING, NODE_STATUS_CONFIG, NODE_STATUS_IDLE, OPERATION_TYPE_ROLLBACK, + OPERATION_TYPE_UPGRADE, REQUEUE_ERROR, REQUEUE_NORMAL, }, }; +use anyhow::Result; use k8s_openapi::api::core::v1::Node; use kube::{ api::{Api, PostParams}, @@ -37,14 +32,13 @@ use kube::{ Client, ResourceExt, }; use log::{debug, error, info}; -use manager::api::{ConfigureRequest, KeyInfo, Sysconfig as AgentSysconfig, UpgradeRequest}; use reconciler_error::Error; use std::collections::HashMap; use std::env; pub async fn reconcile( os: OS, - ctx: Context>, + ctx: Context>, ) -> Result { debug!("start reconcile"); let proxy_controller = ctx.get_ref(); @@ -159,7 +153,7 @@ pub async fn reconcile( pub fn error_policy( error: &Error, - _ctx: Context>, + _ctx: Context>, ) -> ReconcilerAction { error!("Reconciliation error:{}", error.to_string()); REQUEUE_ERROR @@ -169,14 +163,14 @@ struct ControllerResources { osinstance: OSInstance, node: Node, } -pub struct ProxyController { +pub struct ProxyController { k8s_client: Client, controller_client: T, - agent_client: AgentClient, + agent_client: U, } -impl ProxyController { - pub fn new(k8s_client: Client, controller_client: T, agent_client: AgentClient) -> Self { +impl ProxyController { + pub fn new(k8s_client: Client, controller_client: T, agent_client: U) -> Self { ProxyController { k8s_client, controller_client, @@ -185,7 +179,7 @@ impl ProxyController { } } -impl ProxyController { +impl ProxyController { async fn check_osi_exisit(&self, namespace: &str, node_name: &str) -> Result<(), Error> { let osi_api: Api = Api::namespaced(self.k8s_client.clone(), namespace); match osi_api.get(node_name).await { @@ -312,10 +306,13 @@ impl ProxyController { if config_info.need_config { match config_info.configs.and_then(convert_to_agent_config) { Some(agent_configs) => { - let config_request = ConfigureRequest { - configs: agent_configs, - }; - match ConfigureMethod::new(config_request).call(&self.agent_client) { + let agent_call_client = AgentCallClient::default(); + match self.agent_client.configure_method( + ConfigInfo { + configs: agent_configs, + }, + agent_call_client, + ) { Ok(_resp) => {} Err(e) => { return Err(Error::AgentError { source: e }); @@ -333,16 +330,19 @@ impl ProxyController { async fn upgrade_node(&self, os_cr: &OS, node: &Node) -> Result<(), Error> { debug!("start upgrade node"); - match os_cr.spec.opstype.as_str() { OPERATION_TYPE_UPGRADE => { - let upgrade_request = UpgradeRequest { + let upgrade_info = UpgradeInfo { version: os_cr.spec.osversion.clone(), image_type: os_cr.spec.imagetype.clone(), check_sum: os_cr.spec.checksum.clone(), container_image: os_cr.spec.containerimage.clone(), }; - match PrepareUpgradeMethod::new(upgrade_request).call(&self.agent_client) { + let agent_call_client = AgentCallClient::default(); + match self + .agent_client + .prepare_upgrade_method(upgrade_info, agent_call_client) + { Ok(_resp) => {} Err(e) => { return Err(Error::AgentError { source: e }); @@ -350,7 +350,8 @@ impl ProxyController { } self.evict_node(&node.name(), os_cr.spec.evictpodforce) .await?; - match UpgradeMethod::new().call(&self.agent_client) { + let agent_call_client = AgentCallClient::default(); + match self.agent_client.upgrade_method(agent_call_client) { Ok(_resp) => {} Err(e) => { return Err(Error::AgentError { source: e }); @@ -360,7 +361,8 @@ impl ProxyController { OPERATION_TYPE_ROLLBACK => { self.evict_node(&node.name(), os_cr.spec.evictpodforce) .await?; - match RollbackMethod::new().call(&self.agent_client) { + let agent_call_client = AgentCallClient::default(); + match self.agent_client.rollback_method(agent_call_client) { Ok(_resp) => {} Err(e) => { return Err(Error::AgentError { source: e }); @@ -395,7 +397,6 @@ impl ProxyController { async fn drain_node(&self, node_name: &str, force: bool) -> Result<(), Error> { use crate::controller::drain::error::DrainError::*; match drain_os(&self.k8s_client.clone(), node_name, force).await { - Err(FindTargetPods { source, .. }) => Err(Error::KubeError { source: source }), Err(DeletePodsError { errors, .. }) => Err(Error::DrainNodeError { value: errors.join("; "), }), @@ -404,13 +405,13 @@ impl ProxyController { } } -fn convert_to_agent_config(configs: Configs) -> Option> { - let mut agent_configs: Vec = Vec::new(); +fn convert_to_agent_config(configs: Configs) -> Option> { + let mut agent_configs: Vec = Vec::new(); if let Some(config_list) = configs.configs { for config in config_list.into_iter() { match config.contents.and_then(convert_to_config_hashmap) { Some(contents_tmp) => { - let config_tmp = AgentSysconfig { + let config_tmp = Sysconfig { model: config.model.unwrap_or_default(), config_path: config.configpath.unwrap_or_default(), contents: contents_tmp, @@ -445,6 +446,7 @@ fn convert_to_config_hashmap(contents: Vec) -> Option = match pods_api.list(&lp).await { Ok(pods @ ObjectList { .. }) => pods, Err(err) => { - return Err(DrainError::FindTargetPods { + return Err(GetPodListsError { source: err, node_name: node_name.to_string(), }); @@ -85,7 +87,7 @@ async fn get_pods_deleted( } } if filterd_err.len() > 0 { - return Err(error::DrainError::DeletePodsError { + return Err(DeletePodsError { errors: filterd_err, }); } @@ -114,16 +116,27 @@ async fn evict_pod( match eviction_result { Ok(_) => { pod.name(); - event!(Level::INFO, "Successfully evicted Pod '{}'", pod.name_any()); + debug!("Successfully evicted Pod '{}'", pod.name_any()); break; } Err(kube::Error::Api(e)) => { let status_code = StatusCode::from_u16(e.code); match status_code { - Ok(StatusCode::TOO_MANY_REQUESTS) => { - event!( - Level::ERROR, - "Too many requests when creating Eviction for Pod '{}': '{}'. This is likely due to respecting a Pod Disruption Budget. Retrying in {:.2}s.", + Ok(StatusCode::FORBIDDEN) => { + return Err(EvictionErrorNoRetry { + source: kube::Error::Api(e.clone()), + pod_name: pod.name_any(), + }); + } + Ok(StatusCode::NOT_FOUND) => { + return Err(EvictionErrorNoRetry { + source: kube::Error::Api(e.clone()), + pod_name: pod.name_any(), + }); + } + Ok(StatusCode::INTERNAL_SERVER_ERROR) => { + error!( + "Evict pod {} reported error: '{}' and will retry in {:.2}s. This error maybe is due to misconfigured PodDisruptionBudgets.", pod.name_any(), e, EVERY_EVICTION_RETRY.as_secs_f64() @@ -131,10 +144,8 @@ async fn evict_pod( sleep(EVERY_EVICTION_RETRY).await; continue; } - Ok(StatusCode::INTERNAL_SERVER_ERROR) => { - event!( - Level::ERROR, - "Error when evicting Pod '{}': '{}'. Check for misconfigured PodDisruptionBudgets. Retrying in {:.2}s.", + Ok(StatusCode::TOO_MANY_REQUESTS) => { + error!("Evict pod {} reported error: '{}' and will retry in {:.2}s. This error maybe is due to PodDisruptionBugets.", pod.name_any(), e, EVERY_EVICTION_RETRY.as_secs_f64() @@ -142,37 +153,24 @@ async fn evict_pod( sleep(EVERY_EVICTION_RETRY).await; continue; } - Ok(StatusCode::NOT_FOUND) => { - return Err(error::EvictionError::NonRetriableEviction { - source: kube::Error::Api(e.clone()), - pod_name: pod.name_any(), - }); - } - Ok(StatusCode::FORBIDDEN) => { - return Err(error::EvictionError::NonRetriableEviction { - source: kube::Error::Api(e.clone()), - pod_name: pod.name_any(), - }); - } Ok(_) => { - event!( - Level::ERROR, - "Error when evicting Pod '{}': '{}'.", + error!( + "Evict pod {} reported error: '{}'.", pod.name_any(), e ); - return Err(error::EvictionError::RetriableEviction { + return Err(EvictionErrorRetry { source: kube::Error::Api(e.clone()), pod_name: pod.name_any(), }); } Err(_) => { - event!( - Level::ERROR, - "Received invalid response code from Kubernetes API: '{}'", + error!( + "Evict pod {} reported error: '{}'.Received invalid response code from Kubernetes API", + pod.name_any(), e ); - return Err(error::EvictionError::RetriableEviction { + return Err(EvictionErrorRetry { source: kube::Error::Api(e.clone()), pod_name: pod.name_any(), }); @@ -180,8 +178,8 @@ async fn evict_pod( } } Err(e) => { - event!(Level::ERROR, "Eviction failed: '{}'. Retrying...", e); - return Err(error::EvictionError::RetriableEviction { + error!("Evict pod {} reported error: '{}' and will retry", pod.name_any(),e); + return Err(EvictionErrorRetry { source: e, pod_name: pod.name_any(), }); @@ -198,37 +196,28 @@ async fn wait_for_deletion(k8s_client: &kube::Client, pod: &Pod) -> Result<(), e let start_time = Instant::now(); let pod_api: Api = get_pod_api_with_namespace(k8s_client, pod); + let response_error_not_found: u16 = 404; loop { match pod_api.get(&pod.name_any()).await { - Err(kube::Error::Api(e)) if e.code == 404 => { - event!(Level::INFO, "Pod {} deleted.", pod.name_any()); - break; - } - Ok(p) if p.uid() != pod.uid() => { - let name = p - .metadata - .name - .clone() - .or_else(|| p.metadata.generate_name.clone()) - .unwrap_or_default(); - event!(Level::INFO, "Pod {} deleted.", name); + let name = (&p).name_any(); + info!("Pod {} deleted.", name); break; } - Ok(_) => { - event!( - Level::DEBUG, - "Pod '{}' not yet deleted. Waiting {}s.", + info!( + "Pod '{}' is not yet deleted. Waiting {}s.", pod.name_any(), EVERY_DELETION_CHECK.as_secs_f64() ); } - + Err(kube::Error::Api(e)) if e.code == response_error_not_found => { + info!("Pod {} is deleted.", pod.name_any()); + break; + } Err(e) => { - event!( - Level::ERROR, - "Could not determine if Pod '{}' has been deleted: '{}'. Waiting {}s.", + error!( + "Get pod {} reported error: '{}', whether pod is deleted cannot be determined, waiting {}s.", pod.name_any(), e, EVERY_DELETION_CHECK.as_secs_f64() @@ -236,7 +225,7 @@ async fn wait_for_deletion(k8s_client: &kube::Client, pod: &Pod) -> Result<(), e } } if start_time.elapsed() > TIMEOUT { - return Err(error::DrainError::WaitForDeletion { + return Err(WaitDeletionError { pod_name: pod.name_any(), max_wait: TIMEOUT, }); @@ -246,12 +235,14 @@ async fn wait_for_deletion(k8s_client: &kube::Client, pod: &Pod) -> Result<(), e } Ok(()) } + fn get_pod_api_with_namespace(client: &kube::Client, pod: &Pod) -> Api { match pod.metadata.namespace.as_ref() { Some(namespace) => Api::namespaced(client.clone(), namespace), None => Api::default_namespaced(client.clone()), } } + trait NameAny { fn name_any(self: &Self) -> String; } @@ -480,7 +471,7 @@ impl PodFilter for CombinedFilter { fn filter(self: &Self, pod: &Pod) -> Box { let mut filter_res = self.deleted_filter.filter(pod); if !filter_res.result { - event!(Level::INFO, filter_res.desc); + info!("{}", filter_res.desc); return Box::new(FilterResult { result: filter_res.result, desc: filter_res.desc.clone(), @@ -489,7 +480,7 @@ impl PodFilter for CombinedFilter { } filter_res = self.daemon_filter.filter(pod); if !filter_res.result { - event!(Level::INFO, filter_res.desc); + info!("{}", filter_res.desc); return Box::new(FilterResult { result: filter_res.result, desc: filter_res.desc.clone(), @@ -498,7 +489,7 @@ impl PodFilter for CombinedFilter { } filter_res = self.mirror_filter.filter(pod); if !filter_res.result { - event!(Level::INFO, filter_res.desc); + info!("{}", filter_res.desc); return Box::new(FilterResult { result: filter_res.result, desc: filter_res.desc.clone(), @@ -507,7 +498,7 @@ impl PodFilter for CombinedFilter { } filter_res = self.local_storage_filter.filter(pod); if !filter_res.result { - event!(Level::INFO, filter_res.desc); + info!("{}", filter_res.desc); return Box::new(FilterResult { result: filter_res.result, desc: filter_res.desc.clone(), @@ -516,7 +507,7 @@ impl PodFilter for CombinedFilter { } filter_res = self.unreplicated_filter.filter(pod); if !filter_res.result { - event!(Level::INFO, filter_res.desc); + info!("{}", filter_res.desc); return Box::new(FilterResult { result: filter_res.result, desc: filter_res.desc.clone(), @@ -592,7 +583,7 @@ impl tokio_retry::Condition for ErrorHandleStrategy { match self { Self::TolerateStrategy => false, Self::RetryStrategy => { - if let error::EvictionError::RetriableEviction { .. } = error { + if let error::EvictionError::EvictionErrorRetry { .. } = error { true } else { false @@ -603,46 +594,36 @@ impl tokio_retry::Condition for ErrorHandleStrategy { } pub mod error { - use snafu::Snafu; + use thiserror::Error; use tokio::time::Duration; - #[derive(Debug, Snafu)] - #[snafu(visibility(pub))] + #[derive(Debug, Error)] pub enum DrainError { - #[snafu(display("Unable to find drainable Pods for Node '{}': '{}'", node_name, source))] - FindTargetPods { + #[error("Get node {} pods list error reported: {}", node_name, source)] + GetPodListsError { source: kube::Error, node_name: String, }, - #[snafu( - display( - "Pod '{}' was not deleted in the time allocated ({:.2}s).", - pod_name, - max_wait.as_secs_f64() - ) - )] - WaitForDeletion { + #[error("Pod '{}' was not deleted in the time allocated ({:.2}s).",pod_name,max_wait.as_secs_f64())] + WaitDeletionError { pod_name: String, max_wait: Duration, }, - DeletePodsError { - errors: Vec, - }, + #[error("")] + DeletePodsError { errors: Vec }, } - #[derive(Debug, Snafu)] - #[snafu(visibility(pub))] + #[derive(Debug, Error)] pub enum EvictionError { - #[snafu(display("Unable to evict Pod '{}': '{}'", pod_name, source))] - RetriableEviction { + #[error("Evict Pod {} error: '{}'", pod_name, source)] + EvictionErrorRetry { source: kube::Error, pod_name: String, }, - #[snafu(display("Unable to evict Pod '{}': '{}'", pod_name, source))] - /// A fatal error occurred while attempting to evict a Pod. This will not be retried. - NonRetriableEviction { + #[error("Evict Pod {} error: '{}'", pod_name, source)] + EvictionErrorNoRetry { source: kube::Error, pod_name: String, }, diff --git a/KubeOS-Rust/proxy/src/controller/mod.rs b/KubeOS-Rust/proxy/src/controller/mod.rs index e2e06493..090febc6 100644 --- a/KubeOS-Rust/proxy/src/controller/mod.rs +++ b/KubeOS-Rust/proxy/src/controller/mod.rs @@ -10,6 +10,7 @@ * See the Mulan PSL v2 for more details. */ +mod agentclient; mod apiclient; mod controller; mod crd; @@ -17,6 +18,7 @@ mod drain; mod utils; mod values; +pub use agentclient::AgentClient; pub use apiclient::ControllerClient; pub use controller::{error_policy, reconcile, reconciler_error::Error, ProxyController}; pub use crd::OS; diff --git a/KubeOS-Rust/proxy/src/main.rs b/KubeOS-Rust/proxy/src/main.rs index 43610a64..ef53117d 100644 --- a/KubeOS-Rust/proxy/src/main.rs +++ b/KubeOS-Rust/proxy/src/main.rs @@ -20,8 +20,9 @@ use kube::{ }; use log::{error, info}; mod controller; -use cli::client::Client as AgentClient; -use controller::{error_policy, reconcile, ControllerClient, ProxyController, OS, SOCK_PATH}; +use controller::{ + error_policy, reconcile, AgentClient, ControllerClient, ProxyController, OS, SOCK_PATH, +}; const PROXY_VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION"); #[tokio::main] -- Gitee