diff --git a/monorepo-buck2/taurus/Cargo.toml b/monorepo-buck2/taurus/Cargo.toml
new file mode 100644
index 0000000000000000000000000000000000000000..5eb8d0725f4a96ef12d959837f6c08116c692486
--- /dev/null
+++ b/monorepo-buck2/taurus/Cargo.toml
@@ -0,0 +1,23 @@
+[package]
+name = "taurus"
+version = "0.1.0"
+edition = "2021"
+
+[lib]
+name = "taurus"
+path = "src/lib.rs"
+
+[dependencies]
+common = { workspace = true }
+jupiter = { workspace = true }
+callisto = { workspace = true }
+
+axum = { workspace = true }
+async-trait = { workspace = true }
+tokio = { workspace = true, features = ["rt-multi-thread"] }
+tracing = { workspace = true }
+thiserror = { workspace = true }
+serde = { workspace = true }
+serde_json = { workspace = true }
+chrono = { workspace = true }
+crossbeam-channel = "0.5.10"
diff --git a/monorepo-buck2/taurus/README.md b/monorepo-buck2/taurus/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..2e060d070936fb1758ed3fa21025506fdc0b9eb2
--- /dev/null
+++ b/monorepo-buck2/taurus/README.md
@@ -0,0 +1,21 @@
+# Message Queue Module
+
+## Intro
+This module offers mega the ability to send and handle specific events.
+
+After you send an event into the global message queue, it is received by a handler thread, which runs the callback function defined in the `EventBase` trait.
+
+Events are also asynchronously flushed into the database for further investigation.
+
+## New Customized Event
+
+If you want to create a new event type and use it in other modules, do the following:
+
+- Create an event struct that implements the `EventBase` trait.
+- Give your new event a way to enqueue itself into the message queue.
+- Add an enum variant to `EventType`, which is defined in `src/event/mod.rs`.
+- Fill in the missing match arms in `src/event/mod.rs` and `src/queue.rs`.
+- Import and use it.
+
+> Tip:
+> Check `src/event/api_request.rs` for a brief example, or see the sketch below.
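+
+A minimal sketch of the steps above, assuming a hypothetical `PingEvent` and a
+matching `EventType::Ping` variant (neither exists in this crate yet):
+
+```rust
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+
+use crate::event::{EventBase, EventType};
+use crate::queue::get_mq;
+
+// Hypothetical event type, for illustration only.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct PingEvent {
+    pub source: String,
+}
+
+impl std::fmt::Display for PingEvent {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "Ping Event from {}", self.source)
+    }
+}
+
+#[async_trait]
+impl EventBase for PingEvent {
+    // Callback run by the handler thread after the message is dequeued.
+    async fn process(&self) {
+        tracing::info!("Handling: [{}]", &self);
+    }
+}
+
+// Conversions used when messages are flushed to the database.
+impl From<PingEvent> for Value {
+    fn from(value: PingEvent) -> Self {
+        serde_json::to_value(value).unwrap()
+    }
+}
+
+impl TryFrom<Value> for PingEvent {
+    type Error = crate::event::Error;
+
+    fn try_from(value: Value) -> Result<Self, Self::Error> {
+        Ok(serde_json::from_value(value)?)
+    }
+}
+
+impl PingEvent {
+    // Create and enqueue this event.
+    pub fn notify(source: String) {
+        get_mq().send(EventType::Ping(PingEvent { source }));
+    }
+}
+```
+
+Remember to also add the `EventType::Ping` variant and the match arms that persist
+and process it in `src/event/mod.rs`.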
diff --git a/monorepo-buck2/taurus/src/cache.rs b/monorepo-buck2/taurus/src/cache.rs
new file mode 100644
index 0000000000000000000000000000000000000000..cfbb1cf2215bffc9f57485d7dd651b94c887a754
--- /dev/null
+++ b/monorepo-buck2/taurus/src/cache.rs
@@ -0,0 +1,98 @@
+use std::{mem::swap, sync::{atomic::{AtomicBool, AtomicI64}, Arc, Mutex, OnceLock}, time::Duration};
+
+use chrono::Utc;
+
+use crate::{event::Message, queue::{get_mq, MessageQueue}};
+
+const FLUSH_INTERVAL: u64 = 10;
+
+// Lazily initialized static MessageCache instance.
+pub fn get_mcache() -> &'static MessageCache {
+    static MC: OnceLock<MessageCache> = OnceLock::new();
+    MC.get_or_init(|| {
+        let mc = MessageCache::new();
+        mc.start();
+
+        mc
+    })
+}
+
+// Automatically flush the message cache into the database
+// every 10 seconds or 1024 messages.
+pub struct MessageCache {
+    inner: Arc<Mutex<Vec<Message>>>,
+    bound_mq: &'static MessageQueue,
+    last_flush: Arc<AtomicI64>,
+    stop: Arc<AtomicBool>,
+}
+
+impl MessageCache {
+    fn new() -> Self {
+        let now: chrono::DateTime<Utc> = Utc::now();
+
+        MessageCache {
+            inner: Arc::new(Mutex::new(Vec::new())),
+            bound_mq: get_mq(),
+            last_flush: Arc::new(AtomicI64::new(now.timestamp_millis())),
+            stop: Arc::new(AtomicBool::new(false))
+        }
+    }
+
+    fn start(&self) {
+        let stop = self.stop.clone();
+        tokio::spawn(async move {
+            loop {
+                if stop.load(std::sync::atomic::Ordering::Acquire) {
+                    return
+                }
+                tokio::time::sleep(Duration::from_secs(FLUSH_INTERVAL)).await;
+
+                instant_flush().await;
+            }
+        });
+    }
+
+    fn get_cache(&self) -> Vec<Message> {
+        let mut res = Vec::new();
+        let inner = self.inner.clone();
+
+        let mut locked = inner.lock().unwrap();
+        if locked.len() != 0 {
+            swap(locked.as_mut(), &mut res);
+        }
+
+        res
+    }
+
+    pub(crate) async fn add(&self, msg: Message) -> &Self {
+        let inner = self.inner.clone();
+        let should_flush: bool;
+        {
+            let mut locked = inner.lock().unwrap();
+            let l = locked.len();
+            should_flush = l >= 1;
+            locked.push(msg);
+        }
+
+        if should_flush {
+            instant_flush().await
+        }
+
+        self
+    }
+}
+
+pub async fn instant_flush() {
+    use callisto::mq_storage::Model;
+
+    let mc = get_mcache();
+    let st = mc.bound_mq.context.services.mq_storage.clone();
+    let data = mc
+        .get_cache()
+        .into_iter().map(Into::<Model>::into)
+        .collect::<Vec<_>>();
+    st.save_messages(data).await;
+
+    let now = Utc::now();
+    mc.last_flush.to_owned().store(now.timestamp_millis(), std::sync::atomic::Ordering::Relaxed);
+}
diff --git a/monorepo-buck2/taurus/src/event/api_request.rs b/monorepo-buck2/taurus/src/event/api_request.rs
new file mode 100644
index 0000000000000000000000000000000000000000..b2cd77227fcb5365b86b676f63c42c80240596f9
--- /dev/null
+++ b/monorepo-buck2/taurus/src/event/api_request.rs
@@ -0,0 +1,99 @@
+use common::config::Config;
+use serde::{Deserialize, Serialize};
+use async_trait::async_trait;
+
+use crate::{event::EventBase, event::EventType, queue::get_mq};
+
+/// # Api Request Event
+///
+/// This is an example event definition for using the message queue.
+///
+/// Your customized event should implement the `EventBase` trait.
+/// The event can then be wrapped and put into the message queue.
+/// A message `id` and `create_time` will be attached to your
+/// event, which is then wrapped as a `Message`.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ApiRequestEvent {
+    pub api: ApiType,
+    pub config: common::config::Config,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub enum ApiType {
+    // Common Api enum for api_routers
+    CreateFile,
+    LastestCommit,
+    CommitInfo,
+    TreeInfo,
+    Blob,
+    Publish,
+
+    // Merge Api enum for mr_routers
+    MergeRequest,
+    MergeDone,
+    MergeList,
+    MergeDetail,
+    MergeFiles,
+}
+
+impl std::fmt::Display for ApiRequestEvent {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "Api Request Event: {:?}", self.api)
+    }
+}
+
+#[async_trait]
+impl EventBase for ApiRequestEvent {
+    async fn process(&self) {
+        tracing::info!("Handling Api Request event: [{}]", &self);
+    }
+}
+
+impl ApiRequestEvent {
+    // Create and enqueue this event.
+    pub fn notify(api: ApiType, config: &Config) {
+        get_mq().send(EventType::ApiRequest(ApiRequestEvent {
+            api,
+            config: config.clone(),
+        }));
+    }
+}
+
+// For storing the data into the database.
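+// The `From` impl below is what `From<Message> for callisto::mq_storage::Model`
+// (in `src/event/mod.rs`) uses to fill the `content` column; the `TryFrom` impl
+// satisfies the matching `EventBase` bound for the reverse conversion.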
+impl From<ApiRequestEvent> for serde_json::Value {
+    fn from(value: ApiRequestEvent) -> Self {
+        serde_json::to_value(value).unwrap()
+    }
+}
+
+impl TryFrom<serde_json::Value> for ApiRequestEvent {
+    type Error = crate::event::Error;
+
+    fn try_from(value: serde_json::Value) -> Result<Self, Self::Error> {
+        let res: ApiRequestEvent = serde_json::from_value(value)?;
+        Ok(res)
+    }
+
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{ApiRequestEvent, ApiType};
+    use common::config::Config;
+    use serde_json::Value;
+
+    const SER: &str =
+        r#"{"api":"Blob","config":{"base_dir":"","database":{"db_path":"/tmp/.mega/mega.db","db_type":"sqlite","db_url":"postgres://mega:mega@localhost:5432/mega","max_connection":32,"min_connection":16,"sqlx_logging":false},"lfs":{"enable_split":true,"split_size":1073741824},"log":{"level":"info","log_path":"/tmp/.mega/logs","print_std":true},"monorepo":{"import_dir":"/third-part"},"oauth":{"github_client_id":"","github_client_secret":""},"pack":{"channel_message_size":1000000,"clean_cache_after_decode":true,"pack_decode_cache_path":"/tmp/.mega/cache","pack_decode_mem_size":4},"ssh":{"ssh_key_path":"/tmp/.mega/ssh"},"storage":{"big_obj_threshold":1024,"lfs_obj_local_path":"/tmp/.mega/lfs","obs_access_key":"","obs_endpoint":"https://obs.cn-east-3.myhuaweicloud.com","obs_region":"cn-east-3","obs_secret_key":"","raw_obj_local_path":"/tmp/.mega/objects","raw_obj_storage_type":"LOCAL"},"ztm":{"agent":"127.0.0.1:7777","ca":"127.0.0.1:9999","hub":"127.0.0.1:8888"}}}"#;
+
+    #[test]
+    fn test_conversion() {
+        let evt = ApiRequestEvent { api: ApiType::Blob, config: Config::default() };
+
+        // Convert into value
+        let serialized: Value = Value::from(evt);
+        assert_eq!(serialized.to_string().as_str(), SER);
+
+        // Convert from value
+        let _ = ApiRequestEvent::try_from(serialized).unwrap();
+    }
+}
diff --git a/monorepo-buck2/taurus/src/event/github_webhook.rs b/monorepo-buck2/taurus/src/event/github_webhook.rs
new file mode 100644
index 0000000000000000000000000000000000000000..84bbb9fc55dc45b34aac840cc164c3afbe0f62b6
--- /dev/null
+++ b/monorepo-buck2/taurus/src/event/github_webhook.rs
@@ -0,0 +1,68 @@
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use crate::event::{EventBase, EventType};
+use crate::queue::get_mq;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct GithubWebhookEvent {
+    pub _type: WebhookType,
+    pub payload: Value,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub enum WebhookType {
+    PullRequest,
+    Issues,
+    Unknown(String),
+}
+
+impl From<&str> for WebhookType {
+    fn from(value: &str) -> Self {
+        match value {
+            "pull_request" => WebhookType::PullRequest,
+            "issues" => WebhookType::Issues,
+            _ => WebhookType::Unknown(value.to_string()),
+        }
+    }
+}
+
+impl std::fmt::Display for GithubWebhookEvent {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "GitHub Webhook Event: {:?}", self._type)
+    }
+}
+
+#[async_trait]
+impl EventBase for GithubWebhookEvent {
+    async fn process(&self) {
+        tracing::info!("Processing: [{}]", &self);
+        tracing::info!("Payload: {:#?}", &self.payload);
+    }
+}
+
+impl GithubWebhookEvent {
+    // Create and enqueue this event.
+    pub fn notify(_type: WebhookType, payload: Value) {
+        get_mq().send(EventType::GithubWebhook(GithubWebhookEvent {
+            _type,
+            payload,
+        }));
+    }
+}
+
+// For storing the data into the database.
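+// Note: the storage conversion in `src/event/mod.rs` (`From<Message> for
+// callisto::mq_storage::Model`) does not yet have a `GithubWebhook` arm, so
+// these messages are currently persisted with the "Unknown" category and a
+// null content.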
+impl From<GithubWebhookEvent> for Value {
+    fn from(value: GithubWebhookEvent) -> Self {
+        serde_json::to_value(value).unwrap()
+    }
+}
+
+impl TryFrom<Value> for GithubWebhookEvent {
+    type Error = crate::event::Error;
+
+    fn try_from(value: Value) -> Result<Self, Self::Error> {
+        let res: GithubWebhookEvent = serde_json::from_value(value)?;
+        Ok(res)
+    }
+}
\ No newline at end of file
diff --git a/monorepo-buck2/taurus/src/event/mod.rs b/monorepo-buck2/taurus/src/event/mod.rs
new file mode 100644
index 0000000000000000000000000000000000000000..8cb3cf688b7276b6c78306d0eed1a6a77f690b6a
--- /dev/null
+++ b/monorepo-buck2/taurus/src/event/mod.rs
@@ -0,0 +1,126 @@
+use std::fmt::Display;
+
+use api_request::ApiRequestEvent;
+
+use async_trait::async_trait;
+use chrono::{DateTime, Utc};
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use thiserror::Error;
+use github_webhook::GithubWebhookEvent;
+
+pub mod api_request;
+pub mod github_webhook;
+
+#[allow(clippy::large_enum_variant)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum EventType {
+    ApiRequest(ApiRequestEvent),
+    GithubWebhook(GithubWebhookEvent),
+
+    // Reserved
+    ErrorEvent,
+}
+
+#[derive(Debug, Clone)]
+pub struct Message {
+    pub(crate) id: i64,
+    pub(crate) create_time: DateTime<Utc>,
+    pub(crate) evt: EventType,
+}
+
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error("Error converting from database")]
+    MismatchedData(#[from] serde_json::error::Error),
+}
+
+#[async_trait]
+pub trait EventBase:
+    Send + Sync + std::fmt::Display + Into<Value> + TryFrom<Value>
+{
+    // Defines the callback function for this event.
+    async fn process(&self);
+}
+
+impl Display for EventType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{:?}", self)
+    }
+}
+
+impl EventType {
+    pub(crate) async fn process(&self) {
+        match self {
+            // We can't easily add a trait bound for the enum members,
+            // so you have to manually add the process logic for your event here.
+            EventType::ApiRequest(evt) => evt.process().await,
+            // EventType::SomeOtherEvent(xxx) => xxx.process().await,
+
+            EventType::GithubWebhook(evt) => evt.process().await,
+
+            // This won't happen unless events failed to load from the database,
+            // which is caused by an event conversion error.
+            // You should recheck your conversion code logic.
+            EventType::ErrorEvent => panic!("Got error event"),
+        }
+    }
+}
+
+impl Display for Message {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "ID: {}, Created at: {}",
+            self.id, self.create_time
+        )
+    }
+}
+
+impl From<Message> for callisto::mq_storage::Model {
+    fn from(val: Message) -> Self {
+        use callisto::mq_storage::Model;
+
+        let category = match val.evt {
+            EventType::ApiRequest(_) => "ApiRequestEvent".into(),
+
+            #[allow(unreachable_patterns)]
+            _ => "Unknown".into(),
+        };
+
+        let content: Value = match val.evt {
+            EventType::ApiRequest(evt) => evt.into(),
+
+            #[allow(unreachable_patterns)]
+            _ => Value::Null,
+        };
+
+        Model {
+            id: val.id,
+            category,
+            create_time: val.create_time.naive_utc(),
+            content: Some(content.to_string()),
+        }
+    }
+}
+
+impl From<callisto::mq_storage::Model> for Message {
+    fn from(value: callisto::mq_storage::Model) -> Self {
+        let id = value.id;
+        let create_time = value.create_time.and_utc();
+        let evt = match value.category.as_str() {
+            "ApiRequestEvent" => {
+                if let Some(s) = value.content {
+                    let evt = serde_json::from_str(&s).unwrap();
+                    EventType::ApiRequest(evt)
+                } else {
+                    EventType::ErrorEvent
+                }
+            },
+
+            _ => EventType::ErrorEvent
+        };
+
+        Self { id, create_time, evt }
+    }
+}
diff --git a/monorepo-buck2/taurus/src/init.rs b/monorepo-buck2/taurus/src/init.rs
new file mode 100644
index 0000000000000000000000000000000000000000..9d7b3cf0a2bcd625156f6892d36a422f85622855
--- /dev/null
+++ b/monorepo-buck2/taurus/src/init.rs
@@ -0,0 +1,16 @@
+use common::config::Config;
+use jupiter::context::Context;
+use crate::queue::{MessageQueue, MQ};
+
+pub async fn init_mq(config: &Config) {
+    let ctx = Context::new(config.clone()).await;
+    let seq = match ctx.services.mq_storage.get_latest_message().await {
+        Some(model) => model.id + 1,
+        None => 1,
+    };
+
+    let mq = MessageQueue::new(12, seq, ctx);
+    mq.start();
+
+    MQ.set(mq).unwrap();
+}
diff --git a/monorepo-buck2/taurus/src/lib.rs b/monorepo-buck2/taurus/src/lib.rs
new file mode 100644
index 0000000000000000000000000000000000000000..55f636af016b2767598517607ab291328f3d8e97
--- /dev/null
+++ b/monorepo-buck2/taurus/src/lib.rs
@@ -0,0 +1,4 @@
+pub mod init;
+pub mod event;
+pub mod queue;
+pub mod cache;
diff --git a/monorepo-buck2/taurus/src/queue.rs b/monorepo-buck2/taurus/src/queue.rs
new file mode 100644
index 0000000000000000000000000000000000000000..2e4149479e322272b2dfd78bf9f70dcee3c3dd70
--- /dev/null
+++ b/monorepo-buck2/taurus/src/queue.rs
@@ -0,0 +1,90 @@
+use std::fmt::Debug;
+use std::sync::atomic::AtomicI64;
+use std::sync::{Arc, OnceLock};
+
+use chrono::Utc;
+use crossbeam_channel::{unbounded, Sender};
+use crossbeam_channel::Receiver;
+use jupiter::context::Context;
+use tokio::runtime::{Builder, Runtime};
+
+use crate::cache::get_mcache;
+use crate::event::{Message, EventType};
+
+// Lazily initialized static MessageQueue instance.
+pub(crate) static MQ: OnceLock<MessageQueue> = OnceLock::new();
+pub fn get_mq() -> &'static MessageQueue {
+    MQ.get().unwrap()
+}
+
+pub struct MessageQueue {
+    sender: Sender<Message>,
+    receiver: Receiver<Message>,
+    // sem: Arc<Semaphore>,
+    runtime: Arc<Runtime>,
+    cur_id: Arc<AtomicI64>,
+    pub(crate) context: Context,
+}
+
+unsafe impl Send for MessageQueue {}
+unsafe impl Sync for MessageQueue {}
+
+impl Debug for MessageQueue {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        // Just ignore the context field.
+        f.debug_struct("MessageQueue").field("sender", &self.sender).field("receiver", &self.receiver).field("runtime", &self.runtime).finish()
+    }
+}
+
+impl MessageQueue {
+    // Should be a singleton.
+    pub(crate) fn new(n_workers: usize, seq: i64, ctx: Context) -> Self {
+        let (s, r) = unbounded::<Message>();
+        let rt = Builder::new_multi_thread()
+            .worker_threads(n_workers)
+            .build()
+            .unwrap();
+
+        MessageQueue {
+            sender: s.to_owned(),
+            receiver: r.to_owned(),
+            // sem: Arc::new(Semaphore::new(n_workers)),
+            runtime: Arc::new(rt),
+            cur_id: Arc::new(AtomicI64::new(seq)),
+            context: ctx,
+        }
+    }
+
+    pub(crate) fn start(&self) {
+        let receiver = self.receiver.clone();
+        // let sem = self.sem.clone();
+        let rt = self.runtime.clone();
+
+        tokio::spawn(async move {
+            let mc = get_mcache();
+            loop {
+                match receiver.recv() {
+                    Ok(msg) => {
+                        let stored = msg.clone();
+                        mc.add(stored).await;
+                        rt.spawn(async move {
+                            msg.evt.process().await;
+                        });
+                    },
+                    Err(e) => {
+                        // Should not error here.
+                        panic!("Event Loop Panic: {e}");
+                    }
+                }
+            }
+        });
+    }
+
+    pub(crate) fn send(&self, evt: EventType) {
+        let _ = self.sender.send(Message {
+            id: self.cur_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
+            create_time: Utc::now(),
+            evt
+        });
+    }
+}
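+
+// Usage sketch (illustrative; nothing in this file calls it): after
+// `crate::init::init_mq(&config).await` has built the queue and set `MQ`,
+// other modules enqueue events through helpers such as
+// `ApiRequestEvent::notify(ApiType::Blob, &config)`; the loop in `start`
+// above then caches each message for the next database flush and runs its
+// `process` callback on the worker runtime.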