From 94575f26f5f2accc9d0e655987161cd5ff1f7a14 Mon Sep 17 00:00:00 2001 From: TommyLike Date: Sat, 20 May 2023 18:10:18 +0800 Subject: [PATCH] Use tokio select to handle graceful shutdown event --- Cargo.toml | 5 ++- src/application/datakey.rs | 27 +++++++----- src/application/user.rs | 25 +++++++---- src/control_server_entrypoint.rs | 30 ++++++++++--- src/data_server_entrypoint.rs | 32 ++++++++++---- src/infra/encryption/algorithm/aes.rs | 1 - src/presentation/server/data_server.rs | 25 ++++++----- src/util/config.rs | 60 ++++++++++++++------------ 8 files changed, 130 insertions(+), 75 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7f55a0d..2a5b6bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,12 +25,13 @@ tonic = {version = "0.8.2", features = ["tls", "tls-roots", "transport", "channe prost = "0.11.0" signal-hook = "0.3.14" tokio-stream = "0.1.11" -tokio = {version = "1.21.2", features = ["rt-multi-thread", "fs", "sync"]} +tokio = {version = "1.21.2", features = ["rt-multi-thread", "fs", "sync", "time"]} +tokio-util = "0.7.8" log = "0.4.17" async_once = "0.2.6" async-trait = "0.1.60" md-5 = "0.10.5" -notify = "5.0.0" +notify = { version = "6.0.0", default-features = false, features = ["macos_kqueue"] } env_logger = "0.10.0" anyhow = "1.0.66" thiserror = "1.0.38" diff --git a/src/application/datakey.rs b/src/application/datakey.rs index 0d78f34..3be42db 100644 --- a/src/application/datakey.rs +++ b/src/application/datakey.rs @@ -19,13 +19,11 @@ use crate::domain::sign_service::SignBackend; use crate::util::error::{Error, Result}; use async_trait::async_trait; use crate::domain::datakey::entity::{DataKey, KeyState, Visibility}; -use std::sync::{Arc, atomic::AtomicBool}; - -use tokio::time::{Duration, sleep}; +use tokio::time::{Duration, self}; use crate::util::signer_container::DataKeyContainer; use std::collections::HashMap; -use std::sync::atomic::Ordering; +use tokio_util::sync::CancellationToken; use crate::presentation::handler::control::model::user::dto::UserIdentity; #[async_trait] @@ -42,7 +40,7 @@ pub trait KeyService: Send + Sync{ async fn sign(&self, key_type: String, key_name: String, options: &HashMap, data: Vec) ->Result>; //method below used for maintenance - fn start_loop(&self, signal: Arc) -> Result<()>; + fn start_loop(&self, cancel_token: CancellationToken) -> Result<()>; } @@ -158,14 +156,23 @@ where &self.container.get_data_key(key_type, key_name).await?, data, options.clone()).await } - fn start_loop(&self, signal: Arc) -> Result<()> { + fn start_loop(&self, cancel_token: CancellationToken) -> Result<()> { let container = self.container.clone(); + let mut interval = time::interval(Duration::from_secs(120)); tokio::spawn(async move { - while !signal.load(Ordering::Relaxed) { - debug!("start to clear the container keys"); - sleep(Duration::from_secs(120)).await; - container.clear_keys().await; + loop { + tokio::select! { + _ = interval.tick() => { + info!("start to clear the container keys"); + container.clear_keys().await; + } + _ = cancel_token.cancelled() => { + info!("cancel token received, will quit datakey refresher"); + break; + } + } } + }); Ok(()) } diff --git a/src/application/user.rs b/src/application/user.rs index 4973431..9ab6d7d 100644 --- a/src/application/user.rs +++ b/src/application/user.rs @@ -23,7 +23,6 @@ use crate::util::error::{Result, Error}; use async_trait::async_trait; use std::sync::Arc; use std::sync::RwLock; -use std::sync::atomic::{AtomicBool, Ordering}; use tokio::sync::RwLock as AsyncRwLock; use chrono::Utc; use serde::{Deserialize}; @@ -37,7 +36,8 @@ use openidconnect::{ }; use openidconnect::{JsonWebKeySet, ClientId, AuthUrl, UserInfoUrl, TokenUrl, RedirectUrl, ClientSecret, IssuerUrl}; use url::Url; -use tokio::time::{Duration, sleep}; +use tokio::time::{Duration, self}; +use tokio_util::sync::CancellationToken; use crate::presentation::handler::control::model::token::dto::{CreateTokenDTO}; use crate::util::key::{generate_api_token}; @@ -54,7 +54,7 @@ pub trait UserService: Send + Sync{ async fn validate_user(&self, code: &str) -> Result; async fn validate_token_and_email(&self, email: &str, token: &str) -> Result; //method below used for maintenance - fn start_loop(&self, signal: Arc) -> Result<()>; + fn start_loop(&self, cancel_token: CancellationToken) -> Result<()>; } #[derive(Deserialize, Debug)] @@ -239,14 +239,23 @@ where Ok(email == user.email) } - fn start_loop(&self, signal: Arc) -> Result<()> { + fn start_loop(&self, cancel_token: CancellationToken) -> Result<()> { let tokens = self.tokens.clone(); + let mut interval = time::interval(Duration::from_secs(120)); tokio::spawn(async move { - while !signal.load(Ordering::Relaxed) { - debug!("start to clear the container tokens"); - sleep(Duration::from_secs(120)).await; - tokens.write().await.clear(); + loop { + tokio::select! { + _ = interval.tick() => { + info!("start to clear the container tokens"); + tokens.write().await.clear(); + } + _ = cancel_token.cancelled() => { + info!("cancel token received, will quit user token refresher"); + break; + } + } } + }); Ok(()) } diff --git a/src/control_server_entrypoint.rs b/src/control_server_entrypoint.rs index 5a20a31..e86eed3 100644 --- a/src/control_server_entrypoint.rs +++ b/src/control_server_entrypoint.rs @@ -19,7 +19,12 @@ use crate::util::error::Result; use clap::Parser; use config::Config; use std::env; -use std::sync::{atomic::AtomicBool, Arc, RwLock}; +use std::sync::{Arc, RwLock}; +use tokio_util::sync::CancellationToken; +use tokio::{ + select, + signal::unix::{signal, SignalKind}, +}; mod infra; mod domain; @@ -46,19 +51,30 @@ pub struct App { } lazy_static! { - pub static ref SIGNAL: Arc = { - let signal = Arc::new(AtomicBool::new(false)); + pub static ref CANCEL_TOKEN: CancellationToken = { + let cancel_token = CancellationToken::new(); + let cancel_token_handler = cancel_token.clone(); //setup up signal handler - signal_hook::flag::register(signal_hook::consts::SIGTERM, Arc::clone(&signal)).expect("failed to register sigterm signal"); - signal_hook::flag::register(signal_hook::consts::SIGINT, Arc::clone(&signal)).expect("failed to register sigint signal"); - signal + tokio::spawn(async move { + let mut sigterm = signal(SignalKind::terminate()).unwrap(); + let mut sigint = signal(SignalKind::interrupt()).unwrap(); + loop { + select! { + _ = sigterm.recv() => {}, + _ = sigint.recv() => {}, + }; + info!("received quit signal, canceling all sub tasks"); + cancel_token_handler.cancel(); + } + }); + cancel_token }; pub static ref SERVERCONFIG: Arc> = { let app = App::parse(); let path = app.config.unwrap_or(format!("{}/{}", env::current_dir().expect("current dir not found").display(), "config/server.toml")); let server_config = util::config::ServerConfig::new(path); - server_config.watch(Arc::clone(&SIGNAL)).expect("failed to watch configure file"); + server_config.watch(CANCEL_TOKEN.clone()).expect("failed to watch configure file"); server_config.config }; } diff --git a/src/data_server_entrypoint.rs b/src/data_server_entrypoint.rs index 46865ce..fa1638f 100644 --- a/src/data_server_entrypoint.rs +++ b/src/data_server_entrypoint.rs @@ -19,7 +19,12 @@ use crate::util::error::Result; use clap::Parser; use config::Config; use std::env; -use std::sync::{atomic::AtomicBool, Arc, RwLock}; +use std::sync::{Arc, RwLock}; +use tokio_util::sync::CancellationToken; +use tokio::{ + select, + signal::unix::{signal, SignalKind}, +}; use crate::presentation::server::data_server::DataServer; @@ -48,19 +53,30 @@ pub struct App { } lazy_static! { - pub static ref SIGNAL: Arc = { - let signal = Arc::new(AtomicBool::new(false)); + pub static ref CANCEL_TOKEN: CancellationToken = { + let cancel_token = CancellationToken::new(); + let cancel_token_handler = cancel_token.clone(); //setup up signal handler - signal_hook::flag::register(signal_hook::consts::SIGTERM, Arc::clone(&signal)).expect("failed to register sigterm signal"); - signal_hook::flag::register(signal_hook::consts::SIGINT, Arc::clone(&signal)).expect("failed to register sigint signal"); - signal + tokio::spawn(async move { + let mut sigterm = signal(SignalKind::terminate()).unwrap(); + let mut sigint = signal(SignalKind::interrupt()).unwrap(); + loop { + select! { + _ = sigterm.recv() => {}, + _ = sigint.recv() => {}, + }; + info!("received quit signal, canceling all sub tasks"); + cancel_token_handler.cancel(); + } + }); + cancel_token }; pub static ref SERVERCONFIG: Arc> = { let app = App::parse(); let path = app.config.unwrap_or(format!("{}/{}", env::current_dir().expect("current dir not found").display(), "config/server.toml")); let server_config = util::config::ServerConfig::new(path); - server_config.watch(Arc::clone(&SIGNAL)).expect("failed to watch configure file"); + server_config.watch(CANCEL_TOKEN.clone()).expect("failed to watch configure file"); server_config.config }; } @@ -70,7 +86,7 @@ async fn main() -> Result<()> { //prepare config and logger env_logger::init(); //data server starts - let data_server: DataServer = DataServer::new(SERVERCONFIG.clone(), SIGNAL.clone()).await?; + let data_server: DataServer = DataServer::new(SERVERCONFIG.clone(), CANCEL_TOKEN.clone()).await?; data_server.run().await?; Ok(()) } diff --git a/src/infra/encryption/algorithm/aes.rs b/src/infra/encryption/algorithm/aes.rs index 641669f..b5abefa 100644 --- a/src/infra/encryption/algorithm/aes.rs +++ b/src/infra/encryption/algorithm/aes.rs @@ -22,7 +22,6 @@ use aes_gcm_siv::{ }; use generic_array::GenericArray; use rand::{thread_rng, Rng}; -use crate::util::error; use crate::util::error::Result; pub const NONCE_LENGTH: usize = 12; diff --git a/src/presentation/server/data_server.rs b/src/presentation/server/data_server.rs index b5d4d1c..4c0a4dc 100644 --- a/src/presentation/server/data_server.rs +++ b/src/presentation/server/data_server.rs @@ -15,11 +15,10 @@ */ use std::net::SocketAddr; -use std::sync::{Arc, atomic::AtomicBool, RwLock}; -use std::sync::atomic::Ordering; +use std::sync::{Arc, RwLock}; use config::Config; use tokio::fs; -use tokio::time::{Duration, sleep}; +use tokio_util::sync::CancellationToken; use tonic::{ transport::{ Certificate, @@ -42,18 +41,18 @@ use crate::util::error::Result; pub struct DataServer { server_config: Arc>, - signal: Arc, + cancel_token: CancellationToken, server_identity: Option, ca_cert: Option, } impl DataServer { - pub async fn new(server_config: Arc>, signal: Arc) -> Result { + pub async fn new(server_config: Arc>, cancel_token: CancellationToken) -> Result { let database = server_config.read()?.get_table("database")?; create_pool(&database).await?; let mut server = DataServer { server_config, - signal, + cancel_token, server_identity: None, ca_cert: None, }; @@ -86,10 +85,14 @@ impl DataServer { } async fn shutdown_signal(&self) { - while !self.signal.load(Ordering::Relaxed) { - sleep(Duration::from_secs(1)).await; + loop { + tokio::select! { + _ = self.cancel_token.cancelled() => { + info!("cancel token received, will quit data server"); + break; + } + } } - info!("quit signal received...") } pub async fn run(&self) -> Result<()> { @@ -116,8 +119,8 @@ impl DataServer { let token_repo = TokenRepository::new(get_db_pool()?); let user_service = DBUserService::new(user_repo, token_repo, self.server_config.clone())?; - key_service.start_loop(self.signal.clone())?; - user_service.start_loop(self.signal.clone())?; + key_service.start_loop(self.cancel_token.clone())?; + user_service.start_loop(self.cancel_token.clone())?; if let Some(identity) = self.server_identity.clone() { server .tls_config(ServerTlsConfig::new().identity(identity).client_ca_root(self.ca_cert.clone().unwrap()))? diff --git a/src/util/config.rs b/src/util/config.rs index dd7605e..b703c04 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -16,12 +16,12 @@ use crate::util::error::Result; use config::{Config, File, FileFormat}; -use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher}; +use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher, Error}; use std::path::Path; -use std::sync::mpsc::channel; -use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc, RwLock}; -use std::thread; +use tokio::sync::mpsc; +use std::sync::{Arc, RwLock}; use std::time::Duration; +use tokio_util::sync::CancellationToken; pub struct ServerConfig { pub config: Arc>, @@ -41,38 +41,42 @@ impl ServerConfig { path, } } - pub fn watch(&self, signal: Arc) -> Result<()> { - let (tx, rx) = channel(); + pub fn watch(&self, cancel_token: CancellationToken) -> Result<()> { + let (tx, mut rx) = mpsc::channel(10); let watch_file = self.path.clone(); let config = self.config.clone(); - let mut watcher: RecommendedWatcher = Watcher::new( - tx, + let mut watcher: RecommendedWatcher = RecommendedWatcher::new( + move |result: std::result::Result| { + tx.blocking_send(result).expect("Failed to send event"); + }, notify::Config::default().with_poll_interval(Duration::from_secs(5)), ) .expect("configure file watch failed to setup"); - thread::spawn(move || { - watcher - .watch(Path::new(watch_file.as_str()), RecursiveMode::NonRecursive) - .expect("failed to watch configuration file"); - //TODO: handle signal correctly - while !signal.load(Ordering::Relaxed) { - match rx.recv() { - Ok(Ok(Event { - kind: notify::event::EventKind::Modify(_), - .. - })) => { - info!("server configuration changed ..."); - config - .write() - .unwrap() - .refresh() - .expect("failed to write configuration file"); + watcher + .watch(Path::new(watch_file.as_str()), RecursiveMode::NonRecursive) + .expect("failed to watch configuration file"); + tokio::spawn(async move { + loop { + tokio::select! { + _ = cancel_token.cancelled() => { + info!("cancel token received, will quit configuration watcher"); + break; + } + event = rx.recv() => { + match event { + Some(Ok(Event { + kind: notify::event::EventKind::Modify(_), + .. + })) => { + info!("server configuration changed ..."); + config.write().unwrap().refresh().expect("failed to write configuration file"); + } + Some(Err(e)) => error!("watch error: {:?}", e), + _ => {} + } } - Err(e) => error!("watch error: {:?}", e), - _ => {} } } - info!("signal received, will quit"); }); Ok(()) } -- Gitee