From b027f6ce5e3452aadc5c706835b53657688f2c89 Mon Sep 17 00:00:00 2001 From: TommyLike Date: Fri, 19 May 2023 13:27:00 +0800 Subject: [PATCH] Support private keys on data-server --- config/client.toml | 1 + proto/signatrust.proto | 1 + src/application/datakey.rs | 2 +- src/application/user.rs | 35 ++++++++++++++- src/client/cmd/add.rs | 4 +- src/client/worker/signer.rs | 5 ++- src/infra/kms/huaweicloud.rs | 2 +- src/presentation/handler/data/sign_handler.rs | 45 +++++++++++++++---- src/presentation/server/data_server.rs | 11 ++++- 9 files changed, 90 insertions(+), 16 deletions(-) diff --git a/config/client.toml b/config/client.toml index 55e84ec..d017e48 100644 --- a/config/client.toml +++ b/config/client.toml @@ -3,6 +3,7 @@ worker_threads = 8 buffer_size = 20480 # consider the memory consumption if number bumped since all binaries will be stored in memory max_concurrency = 100 +token = "" [server] domain_name = "signatrust.test.osinfra.cn" tls_cert = "/Users/tommylike/Work/codes/rust-projects/signatrust/.data/certs/client/server.crt" diff --git a/proto/signatrust.proto b/proto/signatrust.proto index 3e84864..723df87 100644 --- a/proto/signatrust.proto +++ b/proto/signatrust.proto @@ -13,6 +13,7 @@ message SignStreamRequest { string key_type = 2; string key_id = 3; map options = 4; + string token = 5; } message SignStreamResponse { diff --git a/src/application/datakey.rs b/src/application/datakey.rs index 811f6cf..0d78f34 100644 --- a/src/application/datakey.rs +++ b/src/application/datakey.rs @@ -163,7 +163,7 @@ where tokio::spawn(async move { while !signal.load(Ordering::Relaxed) { debug!("start to clear the container keys"); - sleep(Duration::from_secs(60)).await; + sleep(Duration::from_secs(120)).await; container.clear_keys().await; } }); diff --git a/src/application/user.rs b/src/application/user.rs index 34c00f0..4973431 100644 --- a/src/application/user.rs +++ b/src/application/user.rs @@ -14,6 +14,7 @@ * */ +use std::collections::HashMap; use crate::domain::user::entity::User; use crate::domain::token::entity::Token; use crate::domain::user::repository::Repository as UserRepository; @@ -22,6 +23,8 @@ use crate::util::error::{Result, Error}; use async_trait::async_trait; use std::sync::Arc; use std::sync::RwLock; +use std::sync::atomic::{AtomicBool, Ordering}; +use tokio::sync::RwLock as AsyncRwLock; use chrono::Utc; use serde::{Deserialize}; use config::Config; @@ -34,6 +37,7 @@ use openidconnect::{ }; use openidconnect::{JsonWebKeySet, ClientId, AuthUrl, UserInfoUrl, TokenUrl, RedirectUrl, ClientSecret, IssuerUrl}; use url::Url; +use tokio::time::{Duration, sleep}; use crate::presentation::handler::control::model::token::dto::{CreateTokenDTO}; use crate::util::key::{generate_api_token}; @@ -48,6 +52,9 @@ pub trait UserService: Send + Sync{ async fn generate_token(&self, u: &UserIdentity, token: CreateTokenDTO) -> Result; async fn get_login_url(&self) -> Result; async fn validate_user(&self, code: &str) -> Result; + async fn validate_token_and_email(&self, email: &str, token: &str) -> Result; + //method below used for maintenance + fn start_loop(&self, signal: Arc) -> Result<()>; } #[derive(Deserialize, Debug)] @@ -78,7 +85,8 @@ where user_repository: R, token_repository: T, oidc_config: OIDCConfig, - client: CoreClient + client: CoreClient, + tokens: Arc>>, } impl DBUserService @@ -110,7 +118,8 @@ impl DBUserService user_repository, token_repository, oidc_config, - client + client, + tokens: Arc::new(AsyncRwLock::new(HashMap::new())) }) } @@ -219,4 +228,26 @@ where } } } + + async fn validate_token_and_email(&self, email: &str, token: &str) -> Result { + if let Some(e) = self.tokens.read().await.get(token) { + return Ok(e == email) + } + let tk = self.get_valid_token(token).await?; + let user = self.user_repository.get_by_id(tk.user_id).await?; + self.tokens.write().await.insert(token.to_string(), user.email.clone()); + Ok(email == user.email) + } + + fn start_loop(&self, signal: Arc) -> Result<()> { + let tokens = self.tokens.clone(); + tokio::spawn(async move { + while !signal.load(Ordering::Relaxed) { + debug!("start to clear the container tokens"); + sleep(Duration::from_secs(120)).await; + tokens.write().await.clear(); + } + }); + Ok(()) + } } diff --git a/src/client/cmd/add.rs b/src/client/cmd/add.rs index 02c697b..e34cd90 100644 --- a/src/client/cmd/add.rs +++ b/src/client/cmd/add.rs @@ -85,6 +85,7 @@ pub struct CommandAddHandler { detached: bool, max_concurrency: usize, sign_type: SignType, + token: String, } impl CommandAddHandler { @@ -162,6 +163,7 @@ impl SignCommand for CommandAddHandler { detached: command.detached, max_concurrency: config.read()?.get_string("max_concurrency")?.parse()?, sign_type: command.sign_type, + token: config.read()?.get_string("token").unwrap_or("".to_string()), }) } @@ -197,7 +199,7 @@ impl SignCommand for CommandAddHandler { runtime.block_on(async { let channel = ChannelFactory::new( &lb_config).await.unwrap().get_channel().unwrap(); - let mut signer = RemoteSigner::new(channel, self.buffer_size); + let mut signer = RemoteSigner::new(channel, self.buffer_size, self.token.clone()); //split file let send_handlers = files.into_iter().map(|file|{ let task_split_s = split_s.clone(); diff --git a/src/client/worker/signer.rs b/src/client/worker/signer.rs index 815e8fe..8c6afaf 100644 --- a/src/client/worker/signer.rs +++ b/src/client/worker/signer.rs @@ -34,15 +34,17 @@ use std::io::{Cursor, Read}; pub struct RemoteSigner { client: SignatrustClient, buffer_size: usize, + token: String } impl RemoteSigner { - pub fn new(channel: Channel, buffer_size: usize) -> Self { + pub fn new(channel: Channel, buffer_size: usize, token: String) -> Self { Self { client: SignatrustClient::new(channel), buffer_size, + token, } } } @@ -66,6 +68,7 @@ impl SignHandler for RemoteSigner { options: item.sign_options.borrow().clone(), key_type: format!("{}", item.key_type), key_id: item.key_id.clone(), + token: self.token.clone() }); } let result = self.client.sign_stream( diff --git a/src/infra/kms/huaweicloud.rs b/src/infra/kms/huaweicloud.rs index 97e2310..1cd8184 100644 --- a/src/infra/kms/huaweicloud.rs +++ b/src/infra/kms/huaweicloud.rs @@ -415,7 +415,7 @@ mod test { //create kms client let kms_client = HuaweiCloudKMS::new(&config).expect("create huaweicloud client should be successful"); let request_url = format!("{}/kms/fake_endpoint", url); - let result = kms_client.do_request(&request_url, &fake_request).await.expect_err("always failed to invoke request"); + let _result = kms_client.do_request(&request_url, &fake_request).await.expect_err("always failed to invoke request"); //auth and request should be invoked twice. mock_auth.expect_at_least(2).assert(); diff --git a/src/presentation/handler/data/sign_handler.rs b/src/presentation/handler/data/sign_handler.rs index d543c4c..39ebb93 100644 --- a/src/presentation/handler/data/sign_handler.rs +++ b/src/presentation/handler/data/sign_handler.rs @@ -27,29 +27,48 @@ use signatrust::{ }; use tonic::{Request, Response, Status, Streaming}; use crate::application::datakey::KeyService; +use crate::application::user::UserService; +use crate::util::error::Error; +use crate::util::error::Result as SignatrustResult; -pub struct SignHandler +pub struct SignHandler where K: KeyService + 'static, + U: UserService + 'static, { key_service: K, + user_service: U, } -impl SignHandler +impl SignHandler where K: KeyService + 'static, + U: UserService + 'static, { - pub fn new(key_service: K) -> Self { + pub fn new(key_service: K, user_service: U) -> Self { SignHandler { - key_service + key_service, + user_service } } + + async fn validate_private_key_token(&self, token: &str, name: &str) -> SignatrustResult<()> { + let names: Vec<_> = name.split(':').collect(); + if names.len() <= 1 { + return Ok(()) + } + if token.is_empty() || !self.user_service.validate_token_and_email(names[0], token).await? { + return Err(Error::AuthError("user token and email unmatched".to_string())) + } + Ok(()) + } } #[tonic::async_trait] -impl Signatrust for SignHandler +impl Signatrust for SignHandler where K: KeyService + 'static, + U: UserService + 'static, { async fn sign_stream( &self, @@ -59,6 +78,7 @@ where let mut data: Vec = vec![]; let mut key_name: String = "".to_string(); let mut key_type: String = "".to_string(); + let mut token: String = "".to_string(); let mut options: HashMap = HashMap::new(); while let Some(content) = binaries.next().await { let mut inner_result = content.unwrap(); @@ -66,8 +86,16 @@ where key_name = inner_result.key_id; key_type = inner_result.key_type; options = inner_result.options; + token = inner_result.token; } debug!("begin to sign key_type :{} key_name: {}", key_type, key_name); + //perform token validation on private keys + if let Err(err) = self.validate_private_key_token(&token, &key_name).await { + return Ok(Response::new(SignStreamResponse { + signature: vec![], + error: err.to_string(), + })) + } match self.key_service.sign(key_type, key_name, &options, data).await { Ok(content) => { Ok(Response::new(SignStreamResponse { @@ -85,10 +113,11 @@ where } } -pub fn get_grpc_handler(key_service: K) -> SignatrustServer> +pub fn get_grpc_handler(key_service: K, user_service: U) -> SignatrustServer> where - K: KeyService + 'static + K: KeyService + 'static, + U: UserService + 'static { - let app = SignHandler::new(key_service); + let app = SignHandler::new(key_service, user_service); SignatrustServer::new(app) } diff --git a/src/presentation/server/data_server.rs b/src/presentation/server/data_server.rs index ddad7f3..b5d4d1c 100644 --- a/src/presentation/server/data_server.rs +++ b/src/presentation/server/data_server.rs @@ -27,8 +27,11 @@ use tonic::{ }, }; use crate::application::datakey::{DBKeyService, KeyService}; +use crate::application::user::{DBUserService, UserService}; use crate::infra::database::model::datakey::repository; +use crate::infra::database::model::token::repository::TokenRepository; +use crate::infra::database::model::user::repository::UserRepository; use crate::infra::database::pool::{create_pool, get_db_pool}; use crate::infra::sign_backend::factory::SignBackendFactory; @@ -109,17 +112,21 @@ impl DataServer { let data_repository = repository::DataKeyRepository::new( get_db_pool()?); let key_service = DBKeyService::new(data_repository, sign_backend); + let user_repo = UserRepository::new(get_db_pool()?); + let token_repo = TokenRepository::new(get_db_pool()?); + let user_service = DBUserService::new(user_repo, token_repo, self.server_config.clone())?; key_service.start_loop(self.signal.clone())?; + user_service.start_loop(self.signal.clone())?; if let Some(identity) = self.server_identity.clone() { server .tls_config(ServerTlsConfig::new().identity(identity).client_ca_root(self.ca_cert.clone().unwrap()))? - .add_service(get_grpc_handler(key_service)) + .add_service(get_grpc_handler(key_service, user_service)) .serve_with_shutdown(addr, self.shutdown_signal()) .await? } else { server - .add_service(get_grpc_handler(key_service)) + .add_service(get_grpc_handler(key_service, user_service)) .serve_with_shutdown(addr, self.shutdown_signal()) .await? } -- Gitee