diff --git a/Cargo.toml b/Cargo.toml index e251f16da7f5913050db4b4c104e35d7feba5def..701c1c3373c7ffe8e57619e359af02a6bc622026 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,10 @@ keywords = ["apiserver", "kubernetes", "k8s", "fleet"] repository = "https://gitee.com/iscas-system/apiserver" readme = "README.md" +[[bin]] +name = "event-client-example" +path = "examples/event_client_example/src/main.rs" + [dependencies] #feventbus = "0.3.0" feventbus = { git = "https://gitee.com/iscas-system/eventbus.git" } diff --git a/examples/event_client_example/Cargo.toml b/examples/event_client_example/Cargo.toml new file mode 100644 index 0000000000000000000000000000000000000000..4a4e98117b1ff1bf8e8355128bed651cdd2ae78e --- /dev/null +++ b/examples/event_client_example/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "event_client_example" +version = "0.1.0" +edition = "2021" + +[dependencies] +fleet_apiserver = { path = "../../../apiserver" } diff --git a/examples/event_client_example/src/main.rs b/examples/event_client_example/src/main.rs new file mode 100644 index 0000000000000000000000000000000000000000..77d653254dd9f3695ee94149f9a48e48ebb0e437 --- /dev/null +++ b/examples/event_client_example/src/main.rs @@ -0,0 +1,210 @@ +use std::sync::Arc; +use std::time; +use env_logger::{Builder, Target}; +use feventbus::impls::nats::nats::NatsCli; +use feventbus::traits::controller::EventBus; +use fleetmod::pod::Pod; +use serde_json::{json, Value}; +use fleet_apiserver::APIServerEventClient; +use fleet_apiserver::cores::services::{APIServerResult, APIServerServiceParamsBuilder, APIServerStatusCode}; + +const DATABASE_URL: &str = "sqlite://./test-database.sqlite"; +const ADDRESS: &str = "localhost:8000"; + +fn setup_logger() { + let mut builder = Builder::from_default_env(); + builder.target(Target::Stdout); + builder.init(); +} + +#[tokio::main] +async fn main() { + setup_logger(); + + // 启动apiserver + tokio::spawn(async move { + fleet_apiserver::start_server(DATABASE_URL, ADDRESS).await.unwrap(); + }); + + // 等待服务器启动 todo! 未来添加阻塞等待服务启动完毕的方法 + tokio::time::sleep(time::Duration::from_secs(1)).await; + + // 新建NatsClient + let nats_client = NatsCli::new().await.unwrap(); + // 新建EventClient. 30s表示客户端设置的超时时间 + let event_cli = APIServerEventClient::new(Arc::new(nats_client), Some(time::Duration::from_secs(30))); + + let pod_name = "test-create-pod".to_string(); + let namespace = "ns1".to_string(); + let version = "v1".to_string(); + let plural = "pods".to_string(); + let kind = "Pod".to_string(); + // 创建测试用pod + let pod = mock_pod(pod_name.clone(), Some(namespace.clone())); + + // with_name_params: 相当于url: /api/v1/namespaces/ns1/pods/test-create-pod + let with_name_params = APIServerServiceParamsBuilder::new() + .namespace(namespace.clone()) + .version(version.clone()) + .kind(kind.clone()) // .plural(plural.clone()) 可以传入plural或kind,选一个即可 + .name(pod_name.clone()).build().unwrap(); + + // without_name_params: 相当于url: /api/v1/namespaces/ns1/pods + let without_name_params = APIServerServiceParamsBuilder::new() + .namespace(namespace.clone()) + .version(version.clone()) + .plural(plural.clone()) // .kind(kind.clone()) 可以传入plural或kind,选一个即可 + .build().unwrap(); + + // APIServerServiceParamsBuilder 必须传入的参数有version,plural或kind,其他参数name,namespace,group可选 + + // 在创建前,先删除可能存在的pod + let res: APIServerResult = event_cli.delete(with_name_params.clone()).await; + log::info!("delete before create result: {:?}", res); + + // 创建pod + let res = event_cli + .create_by_resource(pod.clone()) + .await; + // 也可以用下面的方式创建,但是需要传入without_name_params + // let res = event_cli + // .create(without_name_params, pod.clone()) + // .await; + log::info!("create by resource result: {:?}", res); + assert_eq!(pod, res.ok().unwrap()); + + // 测试获取单个pod:因为需要返回单个列表,所以需要指定具体pod名称,因此传入with_name_params + let res = event_cli + .get(with_name_params.clone(), Value::Null) // 第二个参数为Value类型的query,后续可支持分页等查询条件,目前传空即可 + .await; + log::info!("get one pod result: {:?}", res); + assert!(res.is_ok()); + assert_eq!(pod, res.ok().unwrap()); + + // 测试获取pod列表:因为需要返回列表,所以不要指定具体的pod名称,因此传入without_name_params + let res = event_cli + .get(without_name_params.clone(), Value::Null)// 第二个参数为Value类型的query,后续可支持分页等查询条件,目前传空即可 + .await; + log::info!("list pods result {:?}", res); + assert!(res.is_ok()); + let pods_from_list: Vec = res.ok().unwrap(); + let found = pods_from_list.iter().any(|p| p.metadata.name == pod_name); + assert!(found); + let pod_from_list = pods_from_list.iter().find(|p| p.metadata.name == pod_name).unwrap(); + assert_eq!(pod, *pod_from_list); + + // 测试patch功能:因为需要指定具体的pod名称,所以传with_name_params + let res = event_cli + .patch(with_name_params.clone(), json!({"metadata": {"labels": {"app": "my-app-patched", "patch-new-key": "patch-new-value"}}})).await; + log::info!("patch result: {:?}", res); + assert!(res.is_ok()); + let mut pod_after_pad_should_be = pod.clone(); + pod_after_pad_should_be.metadata.labels.as_mut().unwrap().insert("app".to_string(), "my-app-patched".to_string()); + pod_after_pad_should_be.metadata.labels.as_mut().unwrap().insert("patch-new-key".to_string(), "patch-new-value".to_string()); + assert_eq!(pod_after_pad_should_be, res.ok().unwrap()); + + // 测试删除功能:因为需要指定具体的pod名称,所以传with_name_params + let res = event_cli + .delete(with_name_params.clone()) + .await; + log::info!("delete result: {:?}", res); + assert_eq!(pod_after_pad_should_be, res.ok().unwrap()); + + // 测试是否删除成功 + let res: APIServerResult = event_cli + .get(with_name_params.clone(), Value::Null) + .await; + log::info!("get result after deletion: {:?}", res); + assert!(res.is_err()); + assert_eq!(res.err().unwrap().status_code, APIServerStatusCode::NotFound); +} + +fn mock_pod(name: String, namespace: Option) -> Pod { + let pod_yaml = r#" +apiVersion: v1 +kind: Pod +metadata: + name: example-pod + namespace: default + labels: + app: my-app + creationTimestamp: 2024-12-10T08:00:00+08:00 +spec: + nodeName: my-node + hostname: my-hostname + hostAliases: + - ip: "127.0.0.1" + hostnames: + - "my-local-host" + - "another-host" + containers: + - name: example-container + image: example-image:latest + imagePullPolicy: IfNotPresent + command: ["nginx"] + args: ["-g", "daemon off;"] + workingDir: /usr/share/nginx/html + ports: + - name: http + containerPort: 80 + protocol: TCP + - name: https + containerPort: 443 + protocol: TCP + env: + - name: ENV_MODE + value: production + - name: ENV_VERSION + valueFrom: + fieldRef: + fieldPath: metadata.name + resources: + requests: + memory: "128Mi" + cpu: "250m" + limits: + memory: "512Mi" + cpu: "1" + volumeMounts: + - name: config-volume + mountPath: /etc/nginx/conf.d + readOnly: true + - name: data-volume + mountPath: /usr/share/nginx/html + - name: data-host-volume + mountPath: /usr/share/nginx/a.txt + volumeDevices: + - name: device-volume + devicePath: /dev/sdb + securityContext: + runAsUser: 1000 + runAsGroup: 1000 + readOnlyRootFilesystem: true + allowPrivilegeEscalation: true + privileged: true + volumes: + - name: example-volume + configMap: + name: nginx-config + - name: data-volume + emptyDir: {} + - name: device-volume + hostPath: + path: /dev/sdb + type: Directory + - name: device-volume + hostPath: + path: /dev/sdb + type: Directory +status: + phase: Pending + message: begin handle + podIP: 10.42.0.9 + podIPs: + - ip: 10.42.0.9 +"#; + let mut pod: Pod = serde_yaml::from_str(pod_yaml).expect("Failed to parse YAML"); + pod.metadata.name = name.to_string(); + pod.metadata.namespace = namespace.map(|s| s.to_string()); + pod +} \ No newline at end of file diff --git a/src/cores/events.rs b/src/cores/events.rs index 5b06c08558a4854f5e1b2673a27283fe578fe86e..24eb59c786104f9777c55ea74a22f3724f33f58c 100644 --- a/src/cores/events.rs +++ b/src/cores/events.rs @@ -448,6 +448,8 @@ impl APIServerEventClient { where T: Serialize + DeserializeOwned + Clone, { + let mut params = params.clone(); + params.name = None; // 创建资源时在路径参数中不要包含name self.write_event(P2PEventTopic::Create, params, Self::data_to_value(data)?).await } diff --git a/src/cores/services.rs b/src/cores/services.rs index 9e671c57356a5d40170cb53ecdec24c24ccd9a8a..058fdbdfc05841edb91306ff329eaaaef0a88068 100644 --- a/src/cores/services.rs +++ b/src/cores/services.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use std::error::Error; use std::fmt::{Display, Formatter}; use std::sync::{Arc, LazyLock, Mutex}; +use actix_web::http::Method; use fleetmod::FleetResource; #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] @@ -247,6 +248,7 @@ impl APIServerService { data: Value, app_state: Arc, ) -> APIServerResult { + Self::log_request("create_resource", ¶ms); let ServiceCtx { mut db_conn, watch_event_publisher, api_version_str, plural, namespace, .. } = Self::prepare_ctx(params, app_state, Some(false))?; if plural == "crds" { @@ -318,6 +320,7 @@ impl APIServerService { params: APIServerServiceParams, app_state: Arc, ) -> APIServerResult { + Self::log_request("delete_resource", ¶ms); let ServiceCtx { mut db_conn, watch_event_publisher, api_version_str, plural, namespace, name, .. } = Self::prepare_ctx(params, app_state, Some(true))?; let name = name.unwrap(); @@ -364,6 +367,7 @@ impl APIServerService { data: Value, app_state: Arc, ) -> APIServerResult { + Self::log_request("update_resource", ¶ms); let ServiceCtx { mut db_conn, watch_event_publisher, api_version_str, plural, namespace, name, .. } = Self::prepare_ctx(params, app_state, Some(true))?; let name = name.unwrap(); @@ -408,6 +412,7 @@ impl APIServerService { _query: Value, app_state: Arc, ) -> APIServerResult { + Self::log_request("get_resource", ¶ms); let ServiceCtx { mut db_conn, api_version_str, plural, namespace, name, .. } = Self::prepare_ctx(params, app_state, None)?; // 检查 metadata 是否存在 Self::check_metadata_exists(db_conn.get_mut(), plural.as_str(), api_version_str.as_str(), namespace.is_some()) @@ -451,6 +456,7 @@ impl APIServerService { data: Value, app_state: Arc, ) -> APIServerResult { + Self::log_request("patch_resource", ¶ms); let ServiceCtx { mut db_conn, watch_event_publisher, api_version_str, plural, namespace, name, .. } = Self::prepare_ctx(params, app_state, Some(true))?; let name = name.unwrap(); Self::check_metadata_exists(db_conn.get_mut(), plural.as_str(), api_version_str.as_str(), namespace.is_some()) @@ -501,6 +507,10 @@ struct ServiceCtx { } impl APIServerService { + fn log_request(method: &str, params: &APIServerServiceParams) { + log::debug!("APIServerService收到请求,方法:{:?}, 参数为:{:?}", method, params); + } + fn prepare_ctx(params: APIServerServiceParams, app_state: Arc, name_required: Option) -> APIServerResult { // 获取 path 参数 let APIServerServiceParams { group, version, plural_or_kind, namespace, name } = params; diff --git a/src/lib.rs b/src/lib.rs index 5368cf181328fc044a481465152cd1a37bf561c8..4cfa5af4962830f11fb13f6d012797ea343f97cb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,3 +10,4 @@ pub mod schema; pub mod db; pub use cores::{prepare_app_state, start_server}; +pub use cores::events::APIServerEventClient; diff --git a/tests/server_tests.rs b/tests/server_tests.rs index 0e0abce28ee591a430763314e8da596bafd8cf96..de2e5d8f81b70b16b721b082510b2b5f9bff4e54 100644 --- a/tests/server_tests.rs +++ b/tests/server_tests.rs @@ -1,27 +1,18 @@ #[cfg(test)] mod tests { use super::*; - use actix_service::ServiceFactory; - use actix_web::dev::ServiceRequest; use actix_web::{test, web, App, Error, HttpResponse}; use anyhow::Result; - use dotenv::dotenv; use env_logger::{Builder, Target}; - use feventbus::traits::controller::EventBus; use fleet_apiserver::cores::apiserver::{AppState, K8sStylePathParams, K8sStyleRoute}; use fleet_apiserver::cores::events::{APIServerEventClient, P2PEventServer}; - use fleet_apiserver::cores::handlers::{APIServerResponse, Handler}; - use fleet_apiserver::cores::services::{APIServerResult, APIServerServiceParams, APIServerServiceParamsBuilder, APIServerStatusCode}; - use fleet_apiserver::{prepare_app_state, start_server}; + use fleet_apiserver::cores::handlers::APIServerResponse; + use fleet_apiserver::cores::services::{APIServerResult, APIServerServiceParams, APIServerStatusCode}; + use fleet_apiserver::prepare_app_state; use fleetmod::pod::Pod; - use log::log; - use once_cell::sync::Lazy; use serde_json::{json, Value}; use serial_test::serial; - use std::env; - use std::sync::atomic::AtomicBool; use std::sync::Arc; - use std::thread::spawn; use tokio::time::sleep; @@ -161,14 +152,14 @@ mod tests { let res = event_cli .create_by_resource(pod.clone()) .await; - log::info!("create res {:?}", res); + log::info!("event cli create res {:?}", res); assert_eq!(pod, res.ok().unwrap()); // test get one let res = event_cli .get(with_name_params.clone(), Value::Null) .await; - log::info!("getone res {:?}", res); + log::info!("event cli getone res {:?}", res); assert!(res.is_ok()); assert_eq!(pod, res.ok().unwrap()); @@ -176,7 +167,7 @@ mod tests { let res = event_cli .get(without_name_params.clone(), Value::Null) .await; - log::info!("list res {:?}", res); + log::info!("event cli list res {:?}", res); assert!(res.is_ok()); let pods_from_list: Vec = res.ok().unwrap(); let found = pods_from_list.iter().any(|p| p.metadata.name == name); @@ -190,7 +181,7 @@ mod tests { target_pod.metadata.labels.as_mut().unwrap().insert("patch-new-key".to_string(), "patch-new-value".to_string()); let res = event_cli .patch(with_name_params.clone(), json!({"metadata": {"labels": {"app": "my-app-patched", "patch-new-key": "patch-new-value"}}})).await; - log::info!("patch res {:?}", res); + log::info!("event cli patch res {:?}", res); assert!(res.is_ok()); assert_eq!(target_pod, res.ok().unwrap()); @@ -198,12 +189,12 @@ mod tests { let res = event_cli .delete(with_name_params.clone()) .await; - log::info!("delete res {:?}", res); + log::info!("event cli delete res {:?}", res); assert_eq!(target_pod, res.ok().unwrap()); let res: APIServerResult = event_cli .get(with_name_params.clone(), Value::Null) .await; - log::info!("get res {:?}", res); + log::info!("event cli get res {:?}", res); assert!(res.is_err()); assert_eq!(res.err().unwrap().status_code, APIServerStatusCode::NotFound); }