diff --git a/KubeOS-Rust/Cargo.lock b/KubeOS-Rust/Cargo.lock index 8069b70c853bc3528eaeb30f166567a4cf6af753..08ad969e1521f7947ded4b4c8e7f7f85cd4c6867 100644 --- a/KubeOS-Rust/Cargo.lock +++ b/KubeOS-Rust/Cargo.lock @@ -330,12 +330,6 @@ dependencies = [ "winapi 0.3.9", ] -[[package]] -name = "doc-comment" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" - [[package]] name = "downcast" version = "0.11.0" @@ -622,12 +616,6 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" -[[package]] -name = "heck" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" - [[package]] name = "hermit-abi" version = "0.1.19" @@ -1621,13 +1609,11 @@ dependencies = [ "schemars", "serde", "serde_json", - "snafu", "socket2", "thiserror", "thread_local", "tokio 1.14.0", "tokio-retry", - "tracing", ] [[package]] @@ -2021,28 +2007,6 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" -[[package]] -name = "snafu" -version = "0.7.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4de37ad025c587a29e8f3f5605c00f70b98715ef90b9061a815b9e59e9042d6" -dependencies = [ - "doc-comment", - "snafu-derive", -] - -[[package]] -name = "snafu-derive" -version = "0.7.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "990079665f075b699031e9c08fd3ab99be5029b96f3b78dc0709e8f77e4efebf" -dependencies = [ - "heck", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "socket2" version = "0.4.9" diff --git a/KubeOS-Rust/proxy/Cargo.toml b/KubeOS-Rust/proxy/Cargo.toml index 58cb1f5180571f42e876b9d001ca7e067602e642..a0c483583a0fec8dd638458fc0964b727a789948 100644 --- a/KubeOS-Rust/proxy/Cargo.toml +++ b/KubeOS-Rust/proxy/Cargo.toml @@ -2,6 +2,8 @@ name = "proxy" version = "0.1.0" edition = "2021" +description = "KubeOS os-proxy" +license = "MulanPSL-2.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -15,7 +17,6 @@ serde = { version = "1.0.130", features = ["derive"] } serde_json = "1.0.68" thiserror = "1.0.29" env_logger = "0.9.0" -tracing = "0.1.29" schemars = "=0.8.10" socket2 = "=0.4.9" log = "=0.4.15" @@ -23,7 +24,6 @@ thread_local = "=1.1.4" async-trait = "0.1" regex = "=1.7.3" chrono = { version = "0.4", default-features = false, features = ["std"] } -snafu = "0.7" h2 = "=0.3.16" tokio-retry = "0.3" reqwest = { version = "=0.11.10", default-features = false, features = [ "json" ] } diff --git a/KubeOS-Rust/proxy/src/controller/agentclient.rs b/KubeOS-Rust/proxy/src/controller/agentclient.rs new file mode 100644 index 0000000000000000000000000000000000000000..4d954ee8229ccf8da8fa8e74053724bbf0ac74d0 --- /dev/null +++ b/KubeOS-Rust/proxy/src/controller/agentclient.rs @@ -0,0 +1,173 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. + * KubeOS is licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. 
+ * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR + * PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +use cli::{ + client::Client, + method::{ + callable_method::RpcMethod, configure::ConfigureMethod, + prepare_upgrade::PrepareUpgradeMethod, rollback::RollbackMethod, upgrade::UpgradeMethod, + }, +}; + +use agent_call::AgentCallClient; +use agent_error::Error; +use manager::api::{ + ConfigureRequest, KeyInfo as AgentKeyInfo, Sysconfig as AgentSysconfig, UpgradeRequest, +}; +use std::collections::HashMap; +use std::path::Path; + +pub struct UpgradeInfo { + pub version: String, + pub image_type: String, + pub check_sum: String, + pub container_image: String, +} + +pub struct ConfigInfo { + pub configs: Vec<Sysconfig>, +} + +pub struct Sysconfig { + pub model: String, + pub config_path: String, + pub contents: HashMap<String, KeyInfo>, +} + +pub struct KeyInfo { + pub value: String, + pub operation: String, +} + +pub trait AgentMethod { + fn prepare_upgrade_method( + &self, + upgrade_info: UpgradeInfo, + agent_call: AgentCallClient, + ) -> Result<(), Error>; + fn upgrade_method(&self, agent_call: AgentCallClient) -> Result<(), Error>; + fn rollback_method(&self, agent_call: AgentCallClient) -> Result<(), Error>; + fn configure_method( + &self, + config_info: ConfigInfo, + agent_call: AgentCallClient, + ) -> Result<(), Error>; +} + +pub mod agent_call { + use super::{Client, Error, RpcMethod}; + #[derive(Default)] + pub struct AgentCallClient {} + + impl AgentCallClient { + pub fn call_agent<T: RpcMethod>( + &self, + client: &Client, + method: T, + ) -> Result<(), Error> { + match method.call(client) { + Ok(_resp) => Ok(()), + Err(e) => Err(Error::AgentError { source: e }), + } + } + } +} + +pub struct AgentClient { + pub agent_client: Client, +} + +impl AgentClient { + pub fn new<P: AsRef<Path>>(socket_path: P) -> Self { + AgentClient { + agent_client: Client::new(socket_path), + } + } +} + +impl AgentMethod for AgentClient { + fn prepare_upgrade_method( + &self, + upgrade_info: UpgradeInfo, + agent_call: AgentCallClient, + ) -> Result<(), Error> { + let upgrade_request = UpgradeRequest { + version: upgrade_info.version, + image_type: upgrade_info.image_type, + check_sum: upgrade_info.check_sum, + container_image: upgrade_info.container_image, + }; + match agent_call.call_agent( + &self.agent_client, + PrepareUpgradeMethod::new(upgrade_request), + ) { + Ok(_resp) => Ok(()), + Err(e) => Err(e), + } + } + + fn upgrade_method(&self, agent_call: AgentCallClient) -> Result<(), Error> { + match agent_call.call_agent(&self.agent_client, UpgradeMethod::new()) { + Ok(_resp) => Ok(()), + Err(e) => Err(e), + } + } + + fn rollback_method(&self, agent_call: AgentCallClient) -> Result<(), Error> { + match agent_call.call_agent(&self.agent_client, RollbackMethod::new()) { + Ok(_resp) => Ok(()), + Err(e) => Err(e), + } + } + + fn configure_method( + &self, + config_info: ConfigInfo, + agent_call: AgentCallClient, + ) -> Result<(), Error> { + let mut agent_configs: Vec<AgentSysconfig> = Vec::new(); + for config in config_info.configs { + let mut contents_tmp: HashMap<String, AgentKeyInfo> = HashMap::new(); + for (key, key_info) in config.contents.iter() { + contents_tmp.insert( + key.to_string(), + AgentKeyInfo { + value: key_info.value.clone(), + operation: key_info.operation.clone(), + }, + ); + } + agent_configs.push(AgentSysconfig { + model:
config.model, + config_path: config.config_path, + contents: contents_tmp, + }) + } + let config_request = ConfigureRequest { + configs: agent_configs, + }; + match agent_call.call_agent(&self.agent_client, ConfigureMethod::new(config_request)) { + Ok(_resp) => Ok(()), + Err(e) => Err(e), + } + } +} + +pub mod agent_error { + use thiserror::Error; + + #[derive(Error, Debug)] + pub enum Error { + #[error("{source}")] + AgentError { source: anyhow::Error }, + } +} diff --git a/KubeOS-Rust/proxy/src/controller/controller.rs b/KubeOS-Rust/proxy/src/controller/controller.rs index 6410cce11a228e96076e8f1f92daf0d9147f76a3..da2e031e08baf29e5a84ff5dbd514f191aaf3e53 100644 --- a/KubeOS-Rust/proxy/src/controller/controller.rs +++ b/KubeOS-Rust/proxy/src/controller/controller.rs @@ -10,25 +10,20 @@ * See the Mulan PSL v2 for more details. */ -use super::crd::{Content, OSInstance, OS}; -use super::drain::drain_os; -use super::utils::{check_version, get_config_version, ConfigOperation, ConfigType}; -use super::values::{ - LABEL_UPGRADING, NODE_STATUS_CONFIG, NODE_STATUS_IDLE, OPERATION_TYPE_ROLLBACK, - OPERATION_TYPE_UPGRADE, REQUEUE_ERROR, REQUEUE_NORMAL, -}; use super::{ - apiclient::{ApplyApi, ControllerClient}, - crd::Configs, -}; -use anyhow::Result; -use cli::{ - client::Client as AgentClient, - method::{ - callable_method::RpcMethod, configure::ConfigureMethod, - prepare_upgrade::PrepareUpgradeMethod, rollback::RollbackMethod, upgrade::UpgradeMethod, + agentclient::{ + agent_call::AgentCallClient, AgentMethod, ConfigInfo, KeyInfo, Sysconfig, UpgradeInfo, + }, + apiclient::ApplyApi, + crd::{Configs, Content, OSInstance, OS}, + drain::drain_os, + utils::{check_version, get_config_version, ConfigOperation, ConfigType}, + values::{ + LABEL_UPGRADING, NODE_STATUS_CONFIG, NODE_STATUS_IDLE, OPERATION_TYPE_ROLLBACK, + OPERATION_TYPE_UPGRADE, REQUEUE_ERROR, REQUEUE_NORMAL, }, }; +use anyhow::Result; use k8s_openapi::api::core::v1::Node; use kube::{ api::{Api, PostParams}, @@ -37,14 +32,13 @@ use kube::{ Client, ResourceExt, }; use log::{debug, error, info}; -use manager::api::{ConfigureRequest, KeyInfo, Sysconfig as AgentSysconfig, UpgradeRequest}; use reconciler_error::Error; use std::collections::HashMap; use std::env; pub async fn reconcile( os: OS, - ctx: Context<ProxyController<T>>, + ctx: Context<ProxyController<T, U>>, ) -> Result<ReconcilerAction, Error> { debug!("start reconcile"); let proxy_controller = ctx.get_ref(); @@ -159,7 +153,7 @@ pub async fn reconcile( pub fn error_policy( error: &Error, - _ctx: Context<ProxyController<T>>, + _ctx: Context<ProxyController<T, U>>, ) -> ReconcilerAction { error!("Reconciliation error:{}", error.to_string()); REQUEUE_ERROR @@ -169,14 +163,14 @@ struct ControllerResources { osinstance: OSInstance, node: Node, } -pub struct ProxyController<T: ApplyApi> { +pub struct ProxyController<T: ApplyApi, U: AgentMethod> { k8s_client: Client, controller_client: T, - agent_client: AgentClient, + agent_client: U, } -impl<T: ApplyApi> ProxyController<T> { - pub fn new(k8s_client: Client, controller_client: T, agent_client: AgentClient) -> Self { +impl<T: ApplyApi, U: AgentMethod> ProxyController<T, U> { + pub fn new(k8s_client: Client, controller_client: T, agent_client: U) -> Self { ProxyController { k8s_client, controller_client, @@ -185,7 +179,7 @@ impl ProxyController { } } -impl<T: ApplyApi> ProxyController<T> { +impl<T: ApplyApi, U: AgentMethod> ProxyController<T, U> { async fn check_osi_exisit(&self, namespace: &str, node_name: &str) -> Result<(), Error> { let osi_api: Api<OSInstance> = Api::namespaced(self.k8s_client.clone(), namespace); match osi_api.get(node_name).await { @@ -312,10 +306,13 @@ impl ProxyController { if config_info.need_config { match config_info.configs.and_then(convert_to_agent_config) {
Some(agent_configs) => { - let config_request = ConfigureRequest { - configs: agent_configs, - }; - match ConfigureMethod::new(config_request).call(&self.agent_client) { + let agent_call_client = AgentCallClient::default(); + match self.agent_client.configure_method( + ConfigInfo { + configs: agent_configs, + }, + agent_call_client, + ) { Ok(_resp) => {} Err(e) => { return Err(Error::AgentError { source: e }); @@ -333,16 +330,19 @@ impl ProxyController { async fn upgrade_node(&self, os_cr: &OS, node: &Node) -> Result<(), Error> { debug!("start upgrade node"); - match os_cr.spec.opstype.as_str() { OPERATION_TYPE_UPGRADE => { - let upgrade_request = UpgradeRequest { + let upgrade_info = UpgradeInfo { version: os_cr.spec.osversion.clone(), image_type: os_cr.spec.imagetype.clone(), check_sum: os_cr.spec.checksum.clone(), container_image: os_cr.spec.containerimage.clone(), }; - match PrepareUpgradeMethod::new(upgrade_request).call(&self.agent_client) { + let agent_call_client = AgentCallClient::default(); + match self + .agent_client + .prepare_upgrade_method(upgrade_info, agent_call_client) + { Ok(_resp) => {} Err(e) => { return Err(Error::AgentError { source: e }); @@ -350,7 +350,8 @@ impl ProxyController { } self.evict_node(&node.name(), os_cr.spec.evictpodforce) .await?; - match UpgradeMethod::new().call(&self.agent_client) { + let agent_call_client = AgentCallClient::default(); + match self.agent_client.upgrade_method(agent_call_client) { Ok(_resp) => {} Err(e) => { return Err(Error::AgentError { source: e }); @@ -360,7 +361,8 @@ impl ProxyController { OPERATION_TYPE_ROLLBACK => { self.evict_node(&node.name(), os_cr.spec.evictpodforce) .await?; - match RollbackMethod::new().call(&self.agent_client) { + let agent_call_client = AgentCallClient::default(); + match self.agent_client.rollback_method(agent_call_client) { Ok(_resp) => {} Err(e) => { return Err(Error::AgentError { source: e }); @@ -395,7 +397,6 @@ impl ProxyController { async fn drain_node(&self, node_name: &str, force: bool) -> Result<(), Error> { use crate::controller::drain::error::DrainError::*; match drain_os(&self.k8s_client.clone(), node_name, force).await { - Err(FindTargetPods { source, .. }) => Err(Error::KubeError { source: source }), Err(DeletePodsError { errors, .. }) => Err(Error::DrainNodeError { value: errors.join("; "), }), @@ -404,13 +405,13 @@ } } -fn convert_to_agent_config(configs: Configs) -> Option<Vec<AgentSysconfig>> { - let mut agent_configs: Vec<AgentSysconfig> = Vec::new(); +fn convert_to_agent_config(configs: Configs) -> Option<Vec<Sysconfig>> { + let mut agent_configs: Vec<Sysconfig> = Vec::new(); if let Some(config_list) = configs.configs { for config in config_list.into_iter() { match config.contents.and_then(convert_to_config_hashmap) { Some(contents_tmp) => { - let config_tmp = AgentSysconfig { + let config_tmp = Sysconfig { model: config.model.unwrap_or_default(), config_path: config.configpath.unwrap_or_default(), contents: contents_tmp, @@ -445,6 +446,7 @@ fn convert_to_config_hashmap(contents: Vec) -> Option = match pods_api.list(&lp).await { Ok(pods @ ObjectList { ..
}) => pods, Err(err) => { - return Err(DrainError::FindTargetPods { + return Err(GetPodListsError { source: err, node_name: node_name.to_string(), }); @@ -85,7 +87,7 @@ async fn get_pods_deleted( } } if filterd_err.len() > 0 { - return Err(error::DrainError::DeletePodsError { + return Err(DeletePodsError { errors: filterd_err, }); } @@ -114,16 +116,27 @@ async fn evict_pod( match eviction_result { Ok(_) => { pod.name(); - event!(Level::INFO, "Successfully evicted Pod '{}'", pod.name_any()); + debug!("Successfully evicted Pod '{}'", pod.name_any()); break; } Err(kube::Error::Api(e)) => { let status_code = StatusCode::from_u16(e.code); match status_code { - Ok(StatusCode::TOO_MANY_REQUESTS) => { - event!( - Level::ERROR, - "Too many requests when creating Eviction for Pod '{}': '{}'. This is likely due to respecting a Pod Disruption Budget. Retrying in {:.2}s.", + Ok(StatusCode::FORBIDDEN) => { + return Err(EvictionErrorNoRetry { + source: kube::Error::Api(e.clone()), + pod_name: pod.name_any(), + }); + } + Ok(StatusCode::NOT_FOUND) => { + return Err(EvictionErrorNoRetry { + source: kube::Error::Api(e.clone()), + pod_name: pod.name_any(), + }); + } + Ok(StatusCode::INTERNAL_SERVER_ERROR) => { + error!( + "Evict pod {} reported error: '{}' and will retry in {:.2}s. This error maybe is due to misconfigured PodDisruptionBudgets.", pod.name_any(), e, EVERY_EVICTION_RETRY.as_secs_f64() @@ -131,10 +144,8 @@ async fn evict_pod( sleep(EVERY_EVICTION_RETRY).await; continue; } - Ok(StatusCode::INTERNAL_SERVER_ERROR) => { - event!( - Level::ERROR, - "Error when evicting Pod '{}': '{}'. Check for misconfigured PodDisruptionBudgets. Retrying in {:.2}s.", + Ok(StatusCode::TOO_MANY_REQUESTS) => { + error!("Evict pod {} reported error: '{}' and will retry in {:.2}s. This error maybe is due to PodDisruptionBugets.", pod.name_any(), e, EVERY_EVICTION_RETRY.as_secs_f64() @@ -142,37 +153,24 @@ async fn evict_pod( sleep(EVERY_EVICTION_RETRY).await; continue; } - Ok(StatusCode::NOT_FOUND) => { - return Err(error::EvictionError::NonRetriableEviction { - source: kube::Error::Api(e.clone()), - pod_name: pod.name_any(), - }); - } - Ok(StatusCode::FORBIDDEN) => { - return Err(error::EvictionError::NonRetriableEviction { - source: kube::Error::Api(e.clone()), - pod_name: pod.name_any(), - }); - } Ok(_) => { - event!( - Level::ERROR, - "Error when evicting Pod '{}': '{}'.", + error!( + "Evict pod {} reported error: '{}'.", pod.name_any(), e ); - return Err(error::EvictionError::RetriableEviction { + return Err(EvictionErrorRetry { source: kube::Error::Api(e.clone()), pod_name: pod.name_any(), }); } Err(_) => { - event!( - Level::ERROR, - "Received invalid response code from Kubernetes API: '{}'", + error!( + "Evict pod {} reported error: '{}'.Received invalid response code from Kubernetes API", + pod.name_any(), e ); - return Err(error::EvictionError::RetriableEviction { + return Err(EvictionErrorRetry { source: kube::Error::Api(e.clone()), pod_name: pod.name_any(), }); @@ -180,8 +178,8 @@ async fn evict_pod( } } Err(e) => { - event!(Level::ERROR, "Eviction failed: '{}'. 
Retrying...", e); - return Err(error::EvictionError::RetriableEviction { + error!("Evict pod {} reported error: '{}' and will retry", pod.name_any(),e); + return Err(EvictionErrorRetry { source: e, pod_name: pod.name_any(), }); @@ -198,37 +196,28 @@ async fn wait_for_deletion(k8s_client: &kube::Client, pod: &Pod) -> Result<(), e let start_time = Instant::now(); let pod_api: Api<Pod> = get_pod_api_with_namespace(k8s_client, pod); + let response_error_not_found: u16 = 404; loop { match pod_api.get(&pod.name_any()).await { - Err(kube::Error::Api(e)) if e.code == 404 => { - event!(Level::INFO, "Pod {} deleted.", pod.name_any()); - break; - } - Ok(p) if p.uid() != pod.uid() => { - let name = p - .metadata - .name - .clone() - .or_else(|| p.metadata.generate_name.clone()) - .unwrap_or_default(); - event!(Level::INFO, "Pod {} deleted.", name); + let name = (&p).name_any(); + info!("Pod {} deleted.", name); break; } - Ok(_) => { - event!( - Level::DEBUG, - "Pod '{}' not yet deleted. Waiting {}s.", + info!( + "Pod '{}' is not yet deleted. Waiting {}s.", pod.name_any(), EVERY_DELETION_CHECK.as_secs_f64() ); } - + Err(kube::Error::Api(e)) if e.code == response_error_not_found => { + info!("Pod {} is deleted.", pod.name_any()); + break; + } Err(e) => { - event!( - Level::ERROR, - "Could not determine if Pod '{}' has been deleted: '{}'. Waiting {}s.", + error!( + "Get pod {} reported error: '{}', whether pod is deleted cannot be determined, waiting {}s.", pod.name_any(), e, EVERY_DELETION_CHECK.as_secs_f64() @@ -236,7 +225,7 @@ async fn wait_for_deletion(k8s_client: &kube::Client, pod: &Pod) -> Result<(), e } } if start_time.elapsed() > TIMEOUT { - return Err(error::DrainError::WaitForDeletion { + return Err(WaitDeletionError { pod_name: pod.name_any(), max_wait: TIMEOUT, }); @@ -246,12 +235,14 @@ async fn wait_for_deletion(k8s_client: &kube::Client, pod: &Pod) -> Result<(), e } Ok(()) } + fn get_pod_api_with_namespace(client: &kube::Client, pod: &Pod) -> Api<Pod> { match pod.metadata.namespace.as_ref() { Some(namespace) => Api::namespaced(client.clone(), namespace), None => Api::default_namespaced(client.clone()), } } + trait NameAny { fn name_any(self: &Self) -> String; } @@ -480,7 +471,7 @@ impl PodFilter for CombinedFilter { fn filter(self: &Self, pod: &Pod) -> Box<FilterResult> { let mut filter_res = self.deleted_filter.filter(pod); if !filter_res.result { - event!(Level::INFO, filter_res.desc); + info!("{}", filter_res.desc); return Box::new(FilterResult { result: filter_res.result, desc: filter_res.desc.clone(), }); } filter_res = self.daemon_filter.filter(pod); if !filter_res.result { - event!(Level::INFO, filter_res.desc); + info!("{}", filter_res.desc); return Box::new(FilterResult { result: filter_res.result, desc: filter_res.desc.clone(), }); } filter_res = self.mirror_filter.filter(pod); if !filter_res.result { - event!(Level::INFO, filter_res.desc); + info!("{}", filter_res.desc); return Box::new(FilterResult { result: filter_res.result, desc: filter_res.desc.clone(), }); } filter_res = self.local_storage_filter.filter(pod); if !filter_res.result { - event!(Level::INFO, filter_res.desc); + info!("{}", filter_res.desc); return Box::new(FilterResult { result: filter_res.result, desc: filter_res.desc.clone(), }); } filter_res = self.unreplicated_filter.filter(pod); if !filter_res.result { - event!(Level::INFO,
filter_res.desc); + info!("{}", filter_res.desc); return Box::new(FilterResult { result: filter_res.result, desc: filter_res.desc.clone(), }); } @@ -592,7 +583,7 @@ impl tokio_retry::Condition for ErrorHandleStrategy { match self { Self::TolerateStrategy => false, Self::RetryStrategy => { - if let error::EvictionError::RetriableEviction { .. } = error { + if let error::EvictionError::EvictionErrorRetry { .. } = error { true } else { false @@ -603,46 +594,36 @@ impl tokio_retry::Condition for ErrorHandleStrategy { } pub mod error { - use snafu::Snafu; + use thiserror::Error; use tokio::time::Duration; - #[derive(Debug, Snafu)] - #[snafu(visibility(pub))] + #[derive(Debug, Error)] pub enum DrainError { - #[snafu(display("Unable to find drainable Pods for Node '{}': '{}'", node_name, source))] - FindTargetPods { + #[error("Get node {} pods list error reported: {}", node_name, source)] + GetPodListsError { source: kube::Error, node_name: String, }, - #[snafu( - display( - "Pod '{}' was not deleted in the time allocated ({:.2}s).", - pod_name, - max_wait.as_secs_f64() - ) - )] - WaitForDeletion { + #[error("Pod '{}' was not deleted in the time allocated ({:.2}s).",pod_name,max_wait.as_secs_f64())] + WaitDeletionError { pod_name: String, max_wait: Duration, }, - DeletePodsError { - errors: Vec<String>, - }, + #[error("")] + DeletePodsError { errors: Vec<String> }, } - #[derive(Debug, Snafu)] - #[snafu(visibility(pub))] + #[derive(Debug, Error)] pub enum EvictionError { - #[snafu(display("Unable to evict Pod '{}': '{}'", pod_name, source))] - RetriableEviction { + #[error("Evict Pod {} error: '{}'", pod_name, source)] + EvictionErrorRetry { source: kube::Error, pod_name: String, }, - #[snafu(display("Unable to evict Pod '{}': '{}'", pod_name, source))] - /// A fatal error occurred while attempting to evict a Pod. This will not be retried. - NonRetriableEviction { + #[error("Evict Pod {} error: '{}'", pod_name, source)] + EvictionErrorNoRetry { source: kube::Error, pod_name: String, }, diff --git a/KubeOS-Rust/proxy/src/controller/mod.rs b/KubeOS-Rust/proxy/src/controller/mod.rs index e2e064939630570340fd44af4eb8a3bb064f33ba..090febc6ded4de68755a1fb2948ea7233cf32e73 100644 --- a/KubeOS-Rust/proxy/src/controller/mod.rs +++ b/KubeOS-Rust/proxy/src/controller/mod.rs @@ -10,6 +10,7 @@ * See the Mulan PSL v2 for more details. */ +mod agentclient; mod apiclient; mod controller; mod crd; @@ -17,6 +18,7 @@ mod drain; mod utils; mod values; +pub use agentclient::AgentClient; pub use apiclient::ControllerClient; pub use controller::{error_policy, reconcile, reconciler_error::Error, ProxyController}; pub use crd::OS; diff --git a/KubeOS-Rust/proxy/src/main.rs b/KubeOS-Rust/proxy/src/main.rs index 43610a64ac1422a13229b122d66758045737715a..ef53117dde8ed9a61654b60f5a102e48bea2db67 100644 --- a/KubeOS-Rust/proxy/src/main.rs +++ b/KubeOS-Rust/proxy/src/main.rs @@ -20,8 +20,9 @@ use kube::{ }; use log::{error, info}; mod controller; -use cli::client::Client as AgentClient; -use controller::{error_policy, reconcile, ControllerClient, ProxyController, OS, SOCK_PATH}; +use controller::{ + error_policy, reconcile, AgentClient, ControllerClient, ProxyController, OS, SOCK_PATH, +}; const PROXY_VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION"); #[tokio::main]