# portable_parallel **Repository Path**: yikh/portable_parallel ## Basic Information - **Project Name**: portable_parallel - **Description**: 这是一个rust实现的易于使用的并行计算库,目的就是即开即用的进行各种数据的并行计算处理任务。 - **Primary Language**: Rust - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2024-06-19 - **Last Updated**: 2024-06-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # portable_parallel #### 介绍 这是一个rust实现的易于使用的并行计算库,目的就是即开即用的进行各种数据的并行计算处理任务。 #### 软件架构 始于一个非常简单的现实需求,当我同时有大量异步io任务与大量cpu密集处理任务的时候,把他们都揉在一起是不合适的, 这时候如果有一个即开即用的并行库可供使用就好了,项目还处于比较早期的关键技术调研验证阶段。 ##### 现阶段我的需求: 1)数据并行切割处理 2)数据处理任务带有优先级,并且高优先级的永远优先执行 3)数据处理结果可以汇总,并且保持正确的顺序 ##### 初步的想法分这么几个大的部分: 1) 数据的并行切割 2) 切割后的数据以及相关处理共同形成并行任务列表 3) 并行线程池,对并行任务调度执行 4) 并行数据处理结果汇总 5) 一次数据并行提交为一个整体,只要开始处理其中的一部分,就会把这次的处理任务全部做完,然后按序收集结果 ### 性能测试结果 目前纯验证Task/Job调度能力,10个并行线程,180%cpu消耗,大概550W/s 的Job调度能力 ### 其他思考的点 目前数据并行处理结果collect,采用的是unsafe(RefCell>)的方式,每个并行线程把结果直接填入对应index的位置, 这样也保证了对一个Task处理结果的有序性,同时没有并发消耗。 一个Task的Job是否完全处理完成,所有并行线程是依靠一个AtomicUsize来同步的,跟交错进行的处理Job相比,损耗暂可不计。 关于处理结果,目前采用的是Box的方式进行处理结果返回,由于要同时兼容不同的处理参数以及不同的返回结果,此处还不是泛型的方式。 #### ---------------------------------------------------------------- #### 重点学习并行库[rayon](https://github.com/rayon-rs/rayon)的核心机制 ##### WorkerThread之JobRef注册与调度机制 struct WorkerThread{ worker: Worker, //半内部任务队列 fifo: JobFifo, //纯内部fifo任务队列 registry: Arc, //共享的JobRef注册管理结构体 } struct Registry{ sleep: Sleep, //WorkerThread基于CondVar的睡眠与唤醒管理 injected_jobs: Injector, //全局公共的任务队列 } 通过上面两个结构体,再结合tokio的基于窃取的任务调度机制,基本上对rayon的Job任务调度机制能猜个大概了: 每个WorkerThread都有自己的纯内部任务队列,仅供自己调度,也是fifo的 每个WorkerThread都有一个半内部任务队列,供自己以及其他空闲WorkerThread调度或者窃取 全局的injected任务队列,供所有WorkerThread竞争调度 为防止饿死,所有的WorkerThread都需要在固定节拍之后去看一眼全局injected任务队列 任务是可重入的吗?任务会在这几个队列之间跳转吗? 下面通过细读代码来一一证实或者修正 spawn一个JobRef的逻辑 1. 如果是WorkerThread自身调用spawn,则把JobRef放入WorkerThread的Worker任务队列(push操作,WorkerThread半内部任务队列),并且看情况进行WorkerThread线程唤醒 2. 如果是非WorkerThread自身调用spawn,则把JobRef放入Registry的injected_jobs任务队列(inject操作,公共任务队列,任意空闲WorkerThread都可以来进行任务窃取),并且看情况进行WorkerThread线程唤醒 ##### WorkThread工作模式 unsafe fn wait_until_cold(&self, latch: &CoreLatch) { let abort_guard = unwind::AbortIfPanic; let mut idle_state = self.registry.sleep.start_looking(self.index, latch); while !latch.probe() { if let Some(job) = self .take_local_job() .or_else(|| self.steal()) .or_else(|| self.registry.pop_injected_job(self.index)) { self.registry.sleep.work_found(idle_state); self.execute(job); idle_state = self.registry.sleep.start_looking(self.index, latch); } else { self.registry .sleep .no_work_found(&mut idle_state, latch, || self.registry.has_injected_job()) } } self.registry.sleep.work_found(idle_state); self.log(|| ThreadSawLatchSet { worker: self.index, latch_addr: latch.addr(), }); mem::forget(abort_guard); // successful execution, do not abort } 1. 优先从本地worker半内部任务队列里面取任务,否则从其他WorkerThread的半任务队列里面取任务,最后尝试从全局的injected队列里面取任务 2. 如果有成功取到任务,则execute(job) 3. 从其他WorkerThread的半任务队列里面取任务,采用了一种快速伪随机数的方式来选取一个WorkerThread进行任务窃取 ##### 从其他WorkerThread进行任务窃取的代码设计 pub struct Stealer { /// A reference to the inner representation of the queue. inner: Arc>>, /// The flavor of the queue. flavor: Flavor, } struct Inner { /// The front index. front: AtomicIsize, /// The back index. back: AtomicIsize, /// The underlying buffer. buffer: CachePadded>>, } 真正进行窃取动作的代码 pub fn steal(&self) -> Steal { let f = self.inner.front.load(Ordering::Acquire); if epoch::is_pinned() { atomic::fence(Ordering::SeqCst); } let guard = &epoch::pin(); let b = self.inner.back.load(Ordering::Acquire); if b.wrapping_sub(f) <= 0 { return Steal::Empty; } let buffer = self.inner.buffer.load(Ordering::Acquire, guard); let task = unsafe { buffer.deref().read(f) }; if self.inner.buffer.load(Ordering::Acquire, guard) != buffer || self .inner .front .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) .is_err() { return Steal::Retry; } // Return the stolen task. Steal::Success(unsafe { task.assume_init() }) } 基本上就是利用一个AtomicIsize进行CAS操作来窃取任务,跟tokio的窃取机制稍微有点不一样,tokio基于CAS自己取和窃取别人的,操作细节步骤上有细微差异。