# scheduler

**Repository Path**: AthenaCrafter/scheduler

## Basic Information

- **Project Name**: scheduler
- **Description**: No description available
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 0
- **Created**: 2025-12-24
- **Last Updated**: 2026-01-14

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

# Scheduler

A lightweight C++ task scheduler that executes workflows of dependent tasks, modeled as a directed acyclic graph (DAG).

## Features

- **DAG-based Task Scheduling**: Automatically executes tasks in the order their dependencies require
- **Memory Pool Management**: Pre-allocated object pools to reduce memory fragmentation
- **Thread Pool**: Concurrent task execution on a fixed set of worker threads
- **Multi-round Pipeline Support**: Submit multiple workflow instances asynchronously
- **Type-safe Context**: Safe data sharing between tasks
- **Task Registration**: Factory pattern for dynamic task creation

## Architecture

```
scheduler/
├── include/scheduler/
│   ├── util/          # Memory pool, thread pool, synchronization primitives
│   └── workflow/      # Scheduler, tasks, context, DAG traversal
├── src/
│   ├── util/          # Utility implementations
│   └── workflow/      # Core scheduler implementations
└── examples/          # Usage examples
```

### Core Components

- **BaseScheduler**: Main scheduler managing DAG execution, the thread pool, and the memory pool
- **BaseTask**: Task base class defining inputs, outputs, and dependencies
- **Context**: Shared data storage between tasks
- **MemoryPool**: Manages object pools by type or key
- **ThreadPool**: Concurrent task execution
- **TaskRegister**: Task factory for dynamic instantiation

## Building

Requirements:

- C++17 or later
- CMake 3.10+
- pthread library

```bash
mkdir build && cd build
cmake ..
make
```

## Usage

### Basic Example

```cpp
#include "scheduler/workflow/base_scheduler.h"
#include "scheduler/workflow/task_register.h"
#include "scheduler/util/define.h"

// Define your task
class MyTask : public scheduler::workflow::BaseTask {
public:
    void InitTask(const scheduler::workflow::TaskConfig& conf) override {}

    void ExecTask() override {
        // Task logic here: borrow a pooled object, fill it, publish it
        auto output = context_->BorrowAllocatedObjectByKey("output_key");
        *output = 42;
        context_->SetValue("output_key", output);
    }
};

int main() {
    // Register task
    REGISTER_TASK(MyTask);

    // Configure scheduler
    scheduler::workflow::SchedulerConfig conf;
    conf.threadpool_thread_cnt = 4;
    conf.threadpool_task_cnt = 10;
    conf.max_parellel_session_cnt = 20;
    conf.objectpool_wait_timeout_ms = 10;

    // Add task configurations
    // (fields in the order listed under TaskConfig below)
    conf.task_confs.push_back({
        "task1",         // name
        "MyTask",        // class_name
        {"input_key"},   // input_data_keys
        {"output_key"},  // output_data_keys
        {"dep_task"}     // task_depends
    });

    // Initialize and run
    auto scheduler = scheduler::workflow::BaseScheduler::Create();
    scheduler->Init(conf);

    // Launch DAG and wait for this session to finish
    auto join_handler = scheduler->LaunchDAG();
    join_handler.Join();

    // Get results
    auto context = scheduler->GetSessionContext(join_handler.GetSessionId()).lock();
    auto result = context->GetValue("output_key");

    scheduler->Stop();
    return 0;
}
```

### Running the Sample

```bash
./build/bin/examples/sample
```

The sample demonstrates a price calculation workflow:

```
MockRequestTask → UserDBTask    ─┐
                                  PriceCounterTask
MockRequestTask → ProductDBTask ─┘
```

## Configuration

### SchedulerConfig

- `threadpool_thread_cnt`: Number of worker threads
- `threadpool_task_cnt`: Task queue capacity
- `max_parellel_session_cnt`: Maximum number of concurrent DAG instances
- `objectpool_wait_timeout_ms`: Object pool wait timeout, in milliseconds

### TaskConfig

- `name`: Task instance name
- `class_name`: Registered task class name
- `input_data_keys`: Input data keys
- `output_data_keys`: Output data keys
- `task_depends`: Names of the tasks this task depends on

## Design Highlights

1. **Memory Efficiency**: Object pools avoid frequent allocations
2. **Thread Safety**: All shared resources are synchronized
3. **Dependency Resolution**: Automatic topological execution
4. **Async Support**: Non-blocking DAG submission with join handlers
5. **Extensibility**: New task types are added by subclassing `BaseTask` and registering the class
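
### Dependency Resolution Sketch

The "automatic topological execution" listed under Design Highlights can be illustrated with a small standalone sketch. It is not the scheduler's actual implementation: `TaskSpec` and `PlanWaves` are hypothetical names introduced here, and the code only plans an execution order (Kahn's algorithm) rather than running anything on a thread pool. It assumes task names are unique and that the two `MockRequestTask` entries in the sample diagram refer to one task feeding both database tasks.

```cpp
#include <cstddef>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a task entry: just the task name and the
// names of the tasks it depends on (compare `name` and `task_depends`).
struct TaskSpec {
    std::string name;
    std::vector<std::string> depends;
};

// Groups tasks into "waves": every task in a wave depends only on tasks
// from earlier waves, so tasks within one wave could run concurrently.
// Throws std::invalid_argument on an unknown dependency or a cycle.
std::vector<std::vector<std::string>> PlanWaves(const std::vector<TaskSpec>& tasks) {
    std::map<std::string, std::size_t> pending;                  // unmet dependency count
    std::map<std::string, std::vector<std::string>> dependents;  // reverse edges

    for (const auto& t : tasks) pending[t.name] = t.depends.size();
    for (const auto& t : tasks) {
        for (const auto& dep : t.depends) {
            if (pending.find(dep) == pending.end())
                throw std::invalid_argument("unknown dependency: " + dep);
            dependents[dep].push_back(t.name);
        }
    }

    // Tasks without dependencies form the first wave.
    std::vector<std::string> ready;
    for (const auto& t : tasks)
        if (t.depends.empty()) ready.push_back(t.name);

    std::vector<std::vector<std::string>> waves;
    std::size_t scheduled = 0;
    while (!ready.empty()) {
        waves.push_back(ready);
        scheduled += ready.size();
        std::vector<std::string> next;
        for (const auto& done : ready) {
            // Completing `done` releases one dependency of each dependent.
            for (const auto& child : dependents[done]) {
                if (--pending[child] == 0) next.push_back(child);
            }
        }
        ready = std::move(next);
    }

    // Any task never scheduled is part of a cycle.
    if (scheduled != tasks.size())
        throw std::invalid_argument("dependency cycle detected");
    return waves;
}
```

For the sample workflow, calling `PlanWaves` with `MockRequestTask` (no dependencies), `UserDBTask` and `ProductDBTask` (each depending on `MockRequestTask`), and `PriceCounterTask` (depending on both database tasks) yields three waves: the request task, then the two database tasks together, then the price counter. A scheduler that dispatches each ready task as soon as its counter reaches zero, instead of waiting for a whole wave, follows the same bookkeeping.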