From 943b525d7525a87aa714acc12e50468a1db1df64 Mon Sep 17 00:00:00 2001 From: TommyLike Date: Thu, 25 May 2023 14:19:53 +0800 Subject: [PATCH] Support rotate cluster key in control server --- README.md | 2 +- config/server.toml | 2 +- deploy/config.yaml | 2 +- ...101_remove-clusterkey-expire-time.down.sql | 2 + ...90101_remove-clusterkey-expire-time.up.sql | 2 + src/application/datakey.rs | 68 +++++++++++--- src/application/user.rs | 4 +- src/client_entrypoint.rs | 4 - src/control_admin_entrypoint.rs | 5 +- src/control_server_entrypoint.rs | 2 +- src/domain/clusterkey/entity.rs | 14 +-- src/domain/datakey/repository.rs | 1 + src/domain/encryption_engine.rs | 1 + src/domain/sign_service.rs | 1 + src/infra/database/model/clusterkey/dto.rs | 3 - .../database/model/clusterkey/repository.rs | 3 +- .../database/model/datakey/repository.rs | 12 +++ src/infra/encryption/engine.rs | 94 +++++++++++-------- src/infra/sign_backend/memory/backend.rs | 4 + .../handler/control/datakey_handler.rs | 2 +- src/presentation/server/control_server.rs | 8 +- src/presentation/server/data_server.rs | 15 +-- 22 files changed, 165 insertions(+), 86 deletions(-) create mode 100644 migrations/20230525090101_remove-clusterkey-expire-time.down.sql create mode 100644 migrations/20230525090101_remove-clusterkey-expire-time.up.sql diff --git a/README.md b/README.md index 8ad5936..0b74753 100644 --- a/README.md +++ b/README.md @@ -81,7 +81,7 @@ username = "" password = "" domain="" [memory.encryption-engine] -keep_in_days = 180 +rotate_in_days = 90 algorithm = "aes256gsm" ``` diff --git a/config/server.toml b/config/server.toml index bdebca4..e1123ee 100644 --- a/config/server.toml +++ b/config/server.toml @@ -31,7 +31,7 @@ username = "freesky-edward" password = "" domain="freesky-edward" [memory.encryption-engine] -keep_in_days = 180 +rotate_in_days = 90 algorithm = "aes256gsm" [database] connection_url = "mysql://test:test@127.0.0.1:3306/signatrust" diff --git a/deploy/config.yaml b/deploy/config.yaml index 093ff21..32db1a0 100644 --- a/deploy/config.yaml +++ b/deploy/config.yaml @@ -43,7 +43,7 @@ data: [memory.kms-provider] type = "dummy" [memory.encryption-engine] - keep_in_days = 180 + rotate_in_days = 90 algorithm = "aes256gsm" [database] connection_url = "mysql://test:test@signatrust-database.signatrust-local-development.svc.cluster.local:3306/signatrust" diff --git a/migrations/20230525090101_remove-clusterkey-expire-time.down.sql b/migrations/20230525090101_remove-clusterkey-expire-time.down.sql new file mode 100644 index 0000000..192c370 --- /dev/null +++ b/migrations/20230525090101_remove-clusterkey-expire-time.down.sql @@ -0,0 +1,2 @@ +-- Add down migration script here +ALTER TABLE cluster_key ADD expire_at DATETIME DEFAULT CURRENT_TIMESTAMP AFTER `create_at`; \ No newline at end of file diff --git a/migrations/20230525090101_remove-clusterkey-expire-time.up.sql b/migrations/20230525090101_remove-clusterkey-expire-time.up.sql new file mode 100644 index 0000000..efa2564 --- /dev/null +++ b/migrations/20230525090101_remove-clusterkey-expire-time.up.sql @@ -0,0 +1,2 @@ +-- Add up migration script here +ALTER TABLE cluster_key DROP expire_at; \ No newline at end of file diff --git a/src/application/datakey.rs b/src/application/datakey.rs index d044dbf..3e5cbba 100644 --- a/src/application/datakey.rs +++ b/src/application/datakey.rs @@ -23,6 +23,8 @@ use tokio::time::{Duration, self}; use crate::util::signer_container::DataKeyContainer; use std::collections::HashMap; +use std::sync::{Arc}; +use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; use crate::presentation::handler::control::model::user::dto::UserIdentity; @@ -31,7 +33,8 @@ pub trait KeyService: Send + Sync{ async fn create(&self, data: &mut DataKey) -> Result; async fn import(&self, data: &mut DataKey) -> Result; async fn key_name_exists(&self, name: &String) -> Result; - async fn get_all(&self, user: Option, visibility: Visibility) -> Result>; + async fn get_by_visibility(&self, user: Option, visibility: Visibility) -> Result>; + async fn get_all(&self) -> Result>; async fn get_one(&self, user: Option, id: i32) -> Result; async fn request_delete(&self, user: UserIdentity, id: i32) -> Result<()>; async fn cancel_delete(&self, user: UserIdentity, id: i32) -> Result<()>; @@ -41,7 +44,8 @@ pub trait KeyService: Send + Sync{ async fn sign(&self, key_type: String, key_name: String, options: &HashMap, data: Vec) ->Result>; //method below used for maintenance - fn start_loop(&self, cancel_token: CancellationToken) -> Result<()>; + fn start_cache_cleanup_loop(&self, cancel_token: CancellationToken) -> Result<()>; + fn start_key_rotate_loop(&self, cancel_token: CancellationToken) -> Result<()>; } @@ -49,22 +53,22 @@ pub trait KeyService: Send + Sync{ pub struct DBKeyService where R: DatakeyRepository + Clone + 'static, - S: SignBackend + ?Sized + S: SignBackend + ?Sized + 'static { repository: R, - sign_service: Box, + sign_service: Arc>>, container: DataKeyContainer } impl DBKeyService where R: DatakeyRepository + Clone + 'static, - S: SignBackend + ?Sized + S: SignBackend + ?Sized + 'static { pub fn new(repository: R, sign_service: Box) -> Self { Self { repository: repository.clone(), - sign_service, + sign_service: Arc::new(RwLock::new(sign_service)), container: DataKeyContainer::new(repository) } } @@ -84,16 +88,16 @@ impl DBKeyService #[async_trait] impl KeyService for DBKeyService where - R: DatakeyRepository + Clone, - S: SignBackend + ?Sized + R: DatakeyRepository + Clone + 'static, + S: SignBackend + ?Sized + 'static { async fn create(&self, data: &mut DataKey) -> Result { - self.sign_service.generate_keys(data).await?; + self.sign_service.read().await.generate_keys(data).await?; self.repository.create(data.clone()).await } async fn import(&self, data: &mut DataKey) -> Result { - self.sign_service.validate_and_update(data).await?; + self.sign_service.read().await.validate_and_update(data).await?; self.repository.create(data.clone()).await } @@ -104,7 +108,7 @@ where Ok(false) } - async fn get_all(&self, user: Option, visibility: Visibility) -> Result> { + async fn get_by_visibility(&self, user: Option, visibility: Visibility) -> Result> { if visibility == Visibility::Private { if user.is_none() { return Err(Error::UnprivilegedError); @@ -114,6 +118,10 @@ where self.repository.get_public_keys().await } + async fn get_all(&self) -> Result> { + self.repository.get_all_keys().await + } + async fn get_one(&self, user: Option, id: i32) -> Result { self.get_and_check_permission(user, id).await } @@ -145,7 +153,7 @@ where async fn export_one(&self, user: Option, id: i32) -> Result { let mut key = self.get_and_check_permission(user, id).await?; - self.sign_service.decode_public_keys(&mut key).await?; + self.sign_service.read().await.decode_public_keys(&mut key).await?; Ok(key) } @@ -160,11 +168,11 @@ where } async fn sign(&self, key_type: String, key_name: String, options: &HashMap, data: Vec) -> Result> { - self.sign_service.sign( + self.sign_service.read().await.sign( &self.container.get_data_key(key_type, key_name).await?, data, options.clone()).await } - fn start_loop(&self, cancel_token: CancellationToken) -> Result<()> { + fn start_cache_cleanup_loop(&self, cancel_token: CancellationToken) -> Result<()> { let container = self.container.clone(); let mut interval = time::interval(Duration::from_secs(120)); tokio::spawn(async move { @@ -175,7 +183,37 @@ where container.clear_keys().await; } _ = cancel_token.cancelled() => { - info!("cancel token received, will quit datakey refresher"); + info!("cancel token received, will quit datakey clean loop"); + break; + } + } + } + + }); + Ok(()) + } + + fn start_key_rotate_loop(&self, cancel_token: CancellationToken) -> Result<()> { + let sign_service = self.sign_service.clone(); + let mut interval = time::interval(Duration::from_secs(60 * 60 * 2)); + tokio::spawn(async move { + loop { + tokio::select! { + _ = interval.tick() => { + info!("start to rotate the keys"); + match sign_service.write().await.rotate_key().await { + Ok(changed) => { + if changed { + info!("keys has been successfully rotated"); + } + } + Err(e) => { + error!("failed to rotate key: {}", e); + } + } + } + _ = cancel_token.cancelled() => { + info!("cancel token received, will quit key rotate loop"); break; } } diff --git a/src/application/user.rs b/src/application/user.rs index 9ab6d7d..139ee61 100644 --- a/src/application/user.rs +++ b/src/application/user.rs @@ -54,7 +54,7 @@ pub trait UserService: Send + Sync{ async fn validate_user(&self, code: &str) -> Result; async fn validate_token_and_email(&self, email: &str, token: &str) -> Result; //method below used for maintenance - fn start_loop(&self, cancel_token: CancellationToken) -> Result<()>; + fn start_cache_cleanup_loop(&self, cancel_token: CancellationToken) -> Result<()>; } #[derive(Deserialize, Debug)] @@ -239,7 +239,7 @@ where Ok(email == user.email) } - fn start_loop(&self, cancel_token: CancellationToken) -> Result<()> { + fn start_cache_cleanup_loop(&self, cancel_token: CancellationToken) -> Result<()> { let tokens = self.tokens.clone(); let mut interval = time::interval(Duration::from_secs(120)); tokio::spawn(async move { diff --git a/src/client_entrypoint.rs b/src/client_entrypoint.rs index 1db3696..61caab5 100644 --- a/src/client_entrypoint.rs +++ b/src/client_entrypoint.rs @@ -25,11 +25,7 @@ use crate::client::cmd::traits::SignCommand; mod util; mod client; -mod infra; mod domain; -mod application; -mod presentation; - #[macro_use] extern crate log; diff --git a/src/control_admin_entrypoint.rs b/src/control_admin_entrypoint.rs index 2e60769..6e42b6d 100644 --- a/src/control_admin_entrypoint.rs +++ b/src/control_admin_entrypoint.rs @@ -24,13 +24,13 @@ use crate::util::error::{Result}; use crate::util::sign::KeyType; use clap::{Parser, Subcommand}; use clap::{Args}; +use tokio_util::sync::CancellationToken; use crate::domain::datakey::entity::{DataKey}; use crate::domain::user::entity::User; use crate::presentation::handler::control::model::datakey::dto::{CreateDataKeyDTO}; use crate::presentation::handler::control::model::user::dto::UserIdentity; mod util; -mod client; mod infra; mod domain; mod application; @@ -151,7 +151,8 @@ async fn main() -> Result<()> { let path = app.config.unwrap_or(format!("{}/{}", env::current_dir().expect("current dir not found").display(), "config/server.toml")); let server_config = util::config::ServerConfig::new(path); - let control_server = presentation::server::control_server::ControlServer::new(server_config.config).await?; + //cancel token will never been used/canceled here cause it's only used for background threads in control server instance. + let control_server = presentation::server::control_server::ControlServer::new(server_config.config, CancellationToken::new()).await?; //handle commands match app.command { Some(Commands::CreateAdmin(create_admin)) => { diff --git a/src/control_server_entrypoint.rs b/src/control_server_entrypoint.rs index e86eed3..82a77f5 100644 --- a/src/control_server_entrypoint.rs +++ b/src/control_server_entrypoint.rs @@ -84,7 +84,7 @@ async fn main() -> Result<()> { //prepare config and logger env_logger::init(); //control server starts - let control_server = presentation::server::control_server::ControlServer::new(SERVERCONFIG.clone()).await?; + let control_server = presentation::server::control_server::ControlServer::new(SERVERCONFIG.clone(), CANCEL_TOKEN.clone()).await?; control_server.run().await?; Ok(()) } diff --git a/src/domain/clusterkey/entity.rs b/src/domain/clusterkey/entity.rs index df36ed7..62f3590 100644 --- a/src/domain/clusterkey/entity.rs +++ b/src/domain/clusterkey/entity.rs @@ -16,7 +16,7 @@ use crate::util::{error::Result, key}; use secstr::SecVec; -use chrono::{DateTime, Duration, Utc}; +use chrono::{DateTime, Utc}; use std::fmt::{Display, Formatter}; use std::vec::Vec; @@ -29,7 +29,6 @@ pub struct ClusterKey { pub algorithm: String, pub identity: String, pub create_at: DateTime, - pub expire_at: DateTime, } impl Default for ClusterKey { @@ -40,7 +39,6 @@ impl Default for ClusterKey { algorithm: "".to_string(), identity: "".to_string(), create_at: Default::default(), - expire_at: Default::default(), } } } @@ -56,7 +54,7 @@ impl Display for ClusterKey { } impl ClusterKey { - pub fn new(data: Vec, algorithm: String, keep_in_days: i64) -> Result { + pub fn new(data: Vec, algorithm: String) -> Result { let now = Utc::now(); let identity = format!("{}-{}", algorithm, now.format("%d-%m-%Y")); Ok(ClusterKey { @@ -65,7 +63,6 @@ impl ClusterKey { algorithm, identity, create_at: now, - expire_at: now + Duration::days(keep_in_days), }) } } @@ -76,6 +73,7 @@ pub struct SecClusterKey { pub data: SecVec, pub algorithm: String, pub identity: String, + pub create_at: DateTime, } impl Default for SecClusterKey { @@ -86,6 +84,7 @@ impl Default for SecClusterKey { data: SecVec::new(vec![0, 0, 0, 0]), algorithm: "".to_string(), identity: "".to_string(), + create_at: Default::default(), } } } @@ -103,6 +102,7 @@ impl SecClusterKey { )), identity: cluster_key.identity, algorithm: cluster_key.algorithm, + create_at: cluster_key.create_at, }) } } @@ -111,8 +111,8 @@ impl Display for SecClusterKey { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "id: {}, data: ******, algorithm: {}", - self.id, self.algorithm + "id: {}, data: ******, algorithm: {} create_at: {}", + self.id, self.algorithm, self.create_at ) } } diff --git a/src/domain/datakey/repository.rs b/src/domain/datakey/repository.rs index ce6aff3..b060a7c 100644 --- a/src/domain/datakey/repository.rs +++ b/src/domain/datakey/repository.rs @@ -22,6 +22,7 @@ use crate::domain::datakey::entity::{KeyState}; #[async_trait] pub trait Repository: Send + Sync { async fn create(&self, data_key: DataKey) -> Result; + async fn get_all_keys(&self) -> Result>; async fn get_public_keys(&self) -> Result>; async fn get_private_keys(&self, user_id: i32) -> Result>; async fn get_by_id(&self, id: i32) -> Result; diff --git a/src/domain/encryption_engine.rs b/src/domain/encryption_engine.rs index 3d72bb9..1912629 100644 --- a/src/domain/encryption_engine.rs +++ b/src/domain/encryption_engine.rs @@ -20,6 +20,7 @@ use crate::util::error::Result; #[async_trait] pub trait EncryptionEngine: Send + Sync { async fn initialize(&mut self) -> Result<()>; + async fn rotate_key(&mut self) -> Result; async fn encode(&self, content: Vec) -> Result>; async fn decode(&self, content: Vec) -> Result>; } \ No newline at end of file diff --git a/src/domain/sign_service.rs b/src/domain/sign_service.rs index 4271baf..a6c9376 100644 --- a/src/domain/sign_service.rs +++ b/src/domain/sign_service.rs @@ -41,6 +41,7 @@ impl FromStr for SignBackendType { pub trait SignBackend: Send + Sync{ async fn validate_and_update(&self, data_key: &mut DataKey) -> Result<()>; async fn generate_keys(&self, data_key: &mut DataKey) -> Result<()>; + async fn rotate_key(&mut self) -> Result; async fn sign(&self, data_key: &DataKey, content: Vec, options: HashMap) -> Result>; async fn decode_public_keys(&self, data_key: &mut DataKey) -> Result<()>; } diff --git a/src/infra/database/model/clusterkey/dto.rs b/src/infra/database/model/clusterkey/dto.rs index 9db872b..e9a8864 100644 --- a/src/infra/database/model/clusterkey/dto.rs +++ b/src/infra/database/model/clusterkey/dto.rs @@ -27,7 +27,6 @@ pub(super) struct ClusterKeyDTO { pub algorithm: String, pub identity: String, pub create_at: chrono::DateTime, - pub expire_at: chrono::DateTime, } impl From for ClusterKey { @@ -38,7 +37,6 @@ impl From for ClusterKey { algorithm: dto.algorithm, identity: dto.identity, create_at: dto.create_at, - expire_at: dto.expire_at, } } } @@ -51,7 +49,6 @@ impl From for ClusterKeyDTO { algorithm: cluster_key.algorithm, identity: cluster_key.identity, create_at: cluster_key.create_at, - expire_at: cluster_key.expire_at, } } } diff --git a/src/infra/database/model/clusterkey/repository.rs b/src/infra/database/model/clusterkey/repository.rs index 7194f37..e34357e 100644 --- a/src/infra/database/model/clusterkey/repository.rs +++ b/src/infra/database/model/clusterkey/repository.rs @@ -39,12 +39,11 @@ impl ClusterKeyRepository { impl Repository for ClusterKeyRepository { async fn create(&self, cluster_key: ClusterKey) -> Result<()> { let dto = ClusterKeyDTO::from(cluster_key); - let _ : Option = sqlx::query_as("INSERT IGNORE INTO cluster_key(data, algorithm, identity, create_at, expire_at) VALUES (?, ?, ?, ?, ?)") + let _ : Option = sqlx::query_as("INSERT IGNORE INTO cluster_key(data, algorithm, identity, create_at) VALUES (?, ?, ?, ?)") .bind(&dto.data) .bind(&dto.algorithm) .bind(&dto.identity) .bind(dto.create_at) - .bind(dto.expire_at) .fetch_optional(&self.db_pool) .await?; Ok(()) diff --git a/src/infra/database/model/datakey/repository.rs b/src/infra/database/model/datakey/repository.rs index 68b4b88..eda2f72 100644 --- a/src/infra/database/model/datakey/repository.rs +++ b/src/infra/database/model/datakey/repository.rs @@ -94,6 +94,18 @@ impl Repository for DataKeyRepository { } Ok(results) } + + async fn get_all_keys(&self) -> Result> { + let dtos: Vec = sqlx::query_as("SELECT * FROM data_key WHERE key_state != ?") + .bind(KeyState::Deleted.to_string()) + .fetch_all(&self.db_pool) + .await?; + let mut results = vec![]; + for dto in dtos.into_iter() { + results.push(DataKey::try_from(dto)?); + } + Ok(results) + } async fn get_private_keys(&self, user_id: i32) -> Result> { let dtos: Vec = sqlx::query_as("SELECT * FROM data_key WHERE key_state != ? and visibility = ? and user = ?") .bind(KeyState::Deleted.to_string()) diff --git a/src/infra/encryption/engine.rs b/src/infra/encryption/engine.rs index 1d2acd5..af3d692 100644 --- a/src/infra/encryption/engine.rs +++ b/src/infra/encryption/engine.rs @@ -24,12 +24,13 @@ use async_trait::async_trait; use config::Value; use std::collections::HashMap; use std::sync::Arc; +use chrono::{Utc, Duration}; use tokio::sync::RwLock; use crate::domain::kms_provider::KMSProvider; pub const KEY_SIZE: usize = 2; - +pub const DEFAULT_ROTATE_IN_DAYS: i64 = 90; pub struct EncryptionEngineWithClusterKey where C: ClusterKeyRepository, @@ -40,8 +41,8 @@ where cluster_repository: C, kms_provider: Box, encryptor: Box, - keep_in_days: i64, - latest_cluster_key: Box, + rotate_in_days: i64, + latest_cluster_key: Arc>, cluster_key_container: Arc>> // cluster key id -> cluster key } @@ -64,24 +65,28 @@ where encryptor: Box, config: &HashMap, kms_provider: Box) -> Result { + let rotate_in_days = config + .get("rotate_in_days") + .expect("rotate in days should configured") + .to_string() + .parse().unwrap_or(DEFAULT_ROTATE_IN_DAYS); + if rotate_in_days < DEFAULT_ROTATE_IN_DAYS { + return Err(Error::ConfigError(format!("rotate in days should greater than {}", rotate_in_days))); + } Ok(EncryptionEngineWithClusterKey { cluster_repository, encryptor, - keep_in_days: config - .get("keep_in_days") - .expect("encryption engine should configured") - .to_string() - .parse()?, - latest_cluster_key: Box::::default(), + rotate_in_days, + latest_cluster_key: Arc::new(RwLock::new(SecClusterKey::default())), kms_provider, cluster_key_container: Arc::new(RwLock::new(HashMap::new())) }) } - fn append_cluster_key_hex(&self, data: &mut Vec) -> Vec { + async fn append_cluster_key_hex(&self, data: &mut Vec) -> Vec { let mut result = vec![]; result.append(&mut key::decode_hex_string_to_u8(&format!( "{:04X}", - self.latest_cluster_key.id + self.latest_cluster_key.read().await.id, ))); result.append(data); result @@ -97,6 +102,31 @@ where self.cluster_key_container.write().await.insert(cluster_id, cluster_key.clone()); Ok(cluster_key) } + + async fn generate_new_key(&self) -> Result<()> { + //generate new key identified with date time + let cluster_key = ClusterKey::new( + self.kms_provider.encode( + key::encode_u8_to_hex_string(&self.encryptor.generate_key())) + .await?.as_bytes().to_vec(), + self.encryptor.algorithm().to_string(), + )?; + //insert when no records + self.cluster_repository.create(cluster_key).await?; + match self + .cluster_repository + .get_latest(&self.encryptor.algorithm().to_string()) + .await? + { + None => { + return Err(Error::ConfigError( + "can't find latest cluster key from database".to_string(), + )) + } + Some(cluster) => *self.latest_cluster_key.write().await = SecClusterKey::load(cluster, &self.kms_provider).await?, + }; + Ok(()) + } } #[async_trait] @@ -107,46 +137,36 @@ where E: Encryptor + ?Sized { async fn initialize(&mut self) -> Result<()> { - //generate new symmetric keys when there is no db record + //generate new cluster keys only when there is no db record match the date let key = self .cluster_repository .get_latest(&self.encryptor.algorithm().to_string()) .await?; match key { - Some(k) => *self.latest_cluster_key = SecClusterKey::load(k, &self.kms_provider).await?, + Some(k) => *self.latest_cluster_key.write().await = SecClusterKey::load(k, &self.kms_provider).await?, None => { - let cluster_key = ClusterKey::new( - self.kms_provider.encode( - key::encode_u8_to_hex_string(&self.encryptor.generate_key())) - .await?.as_bytes().to_vec(), - self.encryptor.algorithm().to_string(), - self.keep_in_days, - )?; - //insert when no records - self.cluster_repository.create(cluster_key).await?; - match self - .cluster_repository - .get_latest(&self.encryptor.algorithm().to_string()) - .await? - { - None => { - return Err(Error::ConfigError( - "can't find latest cluster key from database".to_string(), - )) - } - Some(cluster) => *self.latest_cluster_key = SecClusterKey::load(cluster, &self.kms_provider).await?, - } + self.generate_new_key().await?; } } - info!("current cluster key is: {}", self.latest_cluster_key); + info!("cluster key is found or generated : {}", self.latest_cluster_key.read().await); Ok(()) } + + async fn rotate_key(&mut self) -> Result { + if Utc::now() < self.latest_cluster_key.read().await.create_at + Duration::days(1) { + return Ok(false); + } + self.generate_new_key().await?; + info!("cluster key is rotated : {}", self.latest_cluster_key.read().await); + Ok(true) + } + async fn encode(&self, content: Vec) -> Result> { //always use latest cluster key to encode data let mut secret = self .encryptor - .encrypt(self.latest_cluster_key.data.unsecure().to_owned(), content)?; - Ok(self.append_cluster_key_hex(&mut secret)) + .encrypt(self.latest_cluster_key.read().await.data.unsecure().to_owned(), content)?; + Ok(self.append_cluster_key_hex(&mut secret).await) } async fn decode(&self, content: Vec) -> Result> { diff --git a/src/infra/sign_backend/memory/backend.rs b/src/infra/sign_backend/memory/backend.rs index f9ea141..7d26116 100644 --- a/src/infra/sign_backend/memory/backend.rs +++ b/src/infra/sign_backend/memory/backend.rs @@ -98,6 +98,10 @@ impl SignBackend for MemorySignBackend { Ok(()) } + async fn rotate_key(&mut self) -> Result { + self.engine.rotate_key().await + } + async fn sign(&self, data_key: &DataKey, content: Vec, options: HashMap) -> Result> { let sec_key = SecDataKey::load(data_key, &self.engine).await?; Signers::load_from_data_key(&data_key.key_type, sec_key)?.sign(content, options) diff --git a/src/presentation/handler/control/datakey_handler.rs b/src/presentation/handler/control/datakey_handler.rs index 848753f..ef1ca0e 100644 --- a/src/presentation/handler/control/datakey_handler.rs +++ b/src/presentation/handler/control/datakey_handler.rs @@ -141,7 +141,7 @@ async fn create_data_key(user: UserIdentity, key_service: web::Data, key_query: web::Query) -> Result { let key_visibility = Visibility::from_str(key_query.visibility.as_str())?; - let keys = key_service.into_inner().get_all(Some(user), key_visibility).await?; + let keys = key_service.into_inner().get_by_visibility(Some(user), key_visibility).await?; let mut results = vec![]; for k in keys { results.push(DataKeyDTO::try_from(k)?) diff --git a/src/presentation/server/control_server.rs b/src/presentation/server/control_server.rs index 5da2f18..be7ba1d 100644 --- a/src/presentation/server/control_server.rs +++ b/src/presentation/server/control_server.rs @@ -34,6 +34,7 @@ use crate::infra::database::model::datakey::repository as datakeyRepository; use crate::infra::database::pool::{create_pool, get_db_pool}; use crate::presentation::handler::control::*; use actix_web::{dev::ServiceRequest}; +use tokio_util::sync::CancellationToken; use crate::util::error::Result; use crate::application::datakey::{DBKeyService, KeyService}; @@ -51,7 +52,7 @@ pub struct ControlServer { server_config: Arc>, user_service: Arc, key_service: Arc, - + cancel_token: CancellationToken, } struct SecurityAddon; @@ -106,7 +107,7 @@ impl Modify for SecurityAddon { struct ControlApiDoc; impl ControlServer { - pub async fn new(server_config: Arc>) -> Result { + pub async fn new(server_config: Arc>, cancel_token: CancellationToken) -> Result { let database = server_config.read()?.get_table("database")?; create_pool(&database).await?; let data_repository = datakeyRepository::DataKeyRepository::new( @@ -130,6 +131,7 @@ impl ControlServer { user_service, key_service, server_config, + cancel_token, }; Ok(server) } @@ -157,6 +159,8 @@ impl ControlServer { let key_service = web::Data::from( self.key_service.clone()); + key_service.start_key_rotate_loop(self.cancel_token.clone())?; + //prepare redis store let store = RedisSessionStore::new(&redis_connection).await?; let limiter = web::Data::new( diff --git a/src/presentation/server/data_server.rs b/src/presentation/server/data_server.rs index 4c0a4dc..9cd84da 100644 --- a/src/presentation/server/data_server.rs +++ b/src/presentation/server/data_server.rs @@ -75,12 +75,13 @@ impl DataServer { info!("tls key and cert not configured, data server tls will be disabled"); return Ok(()); } + let ca_root = self.server_config.read()?.get_string("ca_root").expect("ca_root not configured"); + let tls_cert = self.server_config.read()?.get_string("tls_cert")?; + let tls_key = self.server_config.read()?.get_string("tls_key")?; self.ca_cert = Some( - Certificate::from_pem( - fs::read(self.server_config.read()?.get_string("ca_root")?).await?)); - self.server_identity = Some(Identity::from_pem( - fs::read(self.server_config.read()?.get_string("tls_cert")?).await?, - fs::read(self.server_config.read()?.get_string("tls_key")?).await?)); + Certificate::from_pem(fs::read(ca_root).await?)); + self.server_identity = Some(Identity::from_pem(fs::read(tls_cert).await?, + fs::read(tls_key).await?)); Ok(()) } @@ -119,8 +120,8 @@ impl DataServer { let token_repo = TokenRepository::new(get_db_pool()?); let user_service = DBUserService::new(user_repo, token_repo, self.server_config.clone())?; - key_service.start_loop(self.cancel_token.clone())?; - user_service.start_loop(self.cancel_token.clone())?; + key_service.start_cache_cleanup_loop(self.cancel_token.clone())?; + user_service.start_cache_cleanup_loop(self.cancel_token.clone())?; if let Some(identity) = self.server_identity.clone() { server .tls_config(ServerTlsConfig::new().identity(identity).client_ca_root(self.ca_cert.clone().unwrap()))? -- Gitee