From 73216c3512ba2995f2cdfdf08646a49a6dd449e4 Mon Sep 17 00:00:00 2001 From: TommyLike Date: Fri, 5 May 2023 22:09:22 +0800 Subject: [PATCH] Fix database sync issue --- src/application/datakey.rs | 23 +++++++++++++++++++++-- src/domain/clusterkey/entity.rs | 1 + src/infra/encryption/engine.rs | 12 ++++++++++-- src/presentation/server/data_server.rs | 4 +++- src/util/signer_container.rs | 5 +++++ 5 files changed, 40 insertions(+), 5 deletions(-) diff --git a/src/application/datakey.rs b/src/application/datakey.rs index 1a1552b..4565312 100644 --- a/src/application/datakey.rs +++ b/src/application/datakey.rs @@ -19,9 +19,13 @@ use crate::domain::sign_service::SignBackend; use crate::util::error::{Result}; use async_trait::async_trait; use crate::domain::datakey::entity::{DataKey, KeyState}; +use std::sync::{Arc, atomic::AtomicBool}; + +use tokio::time::{Duration, sleep}; use crate::util::signer_container::DataKeyContainer; use std::collections::HashMap; +use std::sync::atomic::Ordering; #[async_trait] pub trait KeyService: Send + Sync{ @@ -34,13 +38,16 @@ pub trait KeyService: Send + Sync{ async fn enable(&self, id: i32) -> Result<()>; async fn disable(&self, id: i32) -> Result<()>; async fn sign(&self, key_type: String, key_name: String, options: &HashMap, data: Vec) ->Result>; + + //method below used for maintenance + fn start_loop(&self, signal: Arc) -> Result<()>; } pub struct DBKeyService where - R: DatakeyRepository + Clone, + R: DatakeyRepository + Clone + 'static, S: SignBackend + ?Sized { repository: R, @@ -50,7 +57,7 @@ where impl DBKeyService where - R: DatakeyRepository + Clone, + R: DatakeyRepository + Clone + 'static, S: SignBackend + ?Sized { pub fn new(repository: R, sign_service: Box) -> Self { @@ -111,4 +118,16 @@ where self.sign_service.sign( &self.container.get_data_key(key_type, key_name).await?, data, options.clone()).await } + + fn start_loop(&self, signal: Arc) -> Result<()> { + let container = self.container.clone(); + tokio::spawn(async move { + while !signal.load(Ordering::Relaxed) { + debug!("start to clear the container keys"); + sleep(Duration::from_secs(60)).await; + container.clear_keys().await; + } + }); + Ok(()) + } } diff --git a/src/domain/clusterkey/entity.rs b/src/domain/clusterkey/entity.rs index 74ae3f0..df36ed7 100644 --- a/src/domain/clusterkey/entity.rs +++ b/src/domain/clusterkey/entity.rs @@ -70,6 +70,7 @@ impl ClusterKey { } } +#[derive(Clone)] pub struct SecClusterKey { pub id: i32, pub data: SecVec, diff --git a/src/infra/encryption/engine.rs b/src/infra/encryption/engine.rs index f3939e3..1d2acd5 100644 --- a/src/infra/encryption/engine.rs +++ b/src/infra/encryption/engine.rs @@ -23,6 +23,8 @@ use crate::util::key; use async_trait::async_trait; use config::Value; use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::RwLock; use crate::domain::kms_provider::KMSProvider; @@ -40,6 +42,7 @@ where encryptor: Box, keep_in_days: i64, latest_cluster_key: Box, + cluster_key_container: Arc>> // cluster key id -> cluster key } /// considering we have rotated cluster key for safety concern @@ -71,7 +74,7 @@ where .parse()?, latest_cluster_key: Box::::default(), kms_provider, - + cluster_key_container: Arc::new(RwLock::new(HashMap::new())) }) } fn append_cluster_key_hex(&self, data: &mut Vec) -> Vec { @@ -87,7 +90,12 @@ where async fn get_used_sec_cluster_key(&self, data: &[u8]) -> Result { //convert the cluster back and obtain from database, hard code here. let cluster_id: i32 = (data[0] as i32) * 256 + data[1] as i32; - SecClusterKey::load( self.cluster_repository.get_by_id(cluster_id).await?, &self.kms_provider).await + if let Some(cluster_key) = self.cluster_key_container.read().await.get(&cluster_id) { + return Ok((*cluster_key).clone()) + } + let cluster_key = SecClusterKey::load( self.cluster_repository.get_by_id(cluster_id).await?, &self.kms_provider).await?; + self.cluster_key_container.write().await.insert(cluster_id, cluster_key.clone()); + Ok(cluster_key) } } diff --git a/src/presentation/server/data_server.rs b/src/presentation/server/data_server.rs index 0f6c753..ddad7f3 100644 --- a/src/presentation/server/data_server.rs +++ b/src/presentation/server/data_server.rs @@ -26,7 +26,7 @@ use tonic::{ Identity, Server, ServerTlsConfig, }, }; -use crate::application::datakey::DBKeyService; +use crate::application::datakey::{DBKeyService, KeyService}; use crate::infra::database::model::datakey::repository; use crate::infra::database::pool::{create_pool, get_db_pool}; @@ -109,6 +109,8 @@ impl DataServer { let data_repository = repository::DataKeyRepository::new( get_db_pool()?); let key_service = DBKeyService::new(data_repository, sign_backend); + + key_service.start_loop(self.signal.clone())?; if let Some(identity) = self.server_identity.clone() { server .tls_config(ServerTlsConfig::new().identity(identity).client_ca_root(self.ca_cert.clone().unwrap()))? diff --git a/src/util/signer_container.rs b/src/util/signer_container.rs index 0e8adf5..589bc9b 100644 --- a/src/util/signer_container.rs +++ b/src/util/signer_container.rs @@ -21,6 +21,7 @@ use crate::util::error::Result; use crate::domain::datakey::repository::Repository; use crate::domain::datakey::entity::DataKey; +#[derive(Clone)] pub struct DataKeyContainer where R: Repository @@ -53,4 +54,8 @@ where fn get_identity(&self, key_type: &str, key_name: &str) -> String { format!("{}-{}",key_type, key_name) } + + pub async fn clear_keys(&self) { + self.containers.write().await.clear(); + } } \ No newline at end of file -- Gitee