# ebatch

**Repository Path**: easy16/ebatch

## Basic Information

- **Project Name**: ebatch
- **Description**: No description available
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 0
- **Created**: 2021-03-19
- **Last Updated**: 2023-03-04

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

# ebatch

A common batch-processing framework for Java applications.

# A general batch-processing scenario

> Business code often has to process tasks in batches. When a single task is slow (file processing, network calls, long-running queries and so on), running the batch on multiple threads finishes it sooner. To make sure the follow-up steps only run after the whole batch has completed, the code usually has the following structure:

```java
class Driver2 { // ...
    void main() throws InterruptedException {
        CountDownLatch doneSignal = new CountDownLatch(N);
        Executor e = ...

        for (int i = 0; i < N; ++i) // create and start threads
            e.execute(new WorkerRunnable(doneSignal, i));

        doneSignal.await(); // wait for all to finish
    }
}

class WorkerRunnable implements Runnable {
    private final CountDownLatch doneSignal;
    private final int i;

    WorkerRunnable(CountDownLatch doneSignal, int i) {
        this.doneSignal = doneSignal;
        this.i = i;
    }

    public void run() {
        try {
            doWork(i);
            doneSignal.countDown();
        } catch (InterruptedException ex) {} // return;
    }

    void doWork() { ...
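    }
}
```

> This is the generic usage demo that the author of `CountDownLatch` wrote for it, and most back-end batch jobs probably use this structure. To avoid re-implementing the `CountDownLatch` scaffolding above in every business system that needs batch processing, this project builds a general batch-processing framework on Spring's `ThreadPoolTaskExecutor` together with `CountDownLatch`.

> First, define the abstract interface. The batch entry point takes a `List<E>`. The return type `EBatchRes` wraps a `Map<Integer, Pair<E, R>> resMap`: the key is the element's position in the original list, and the `Pair` holds the input and the return value of `IBusiService.doService`.

```java
import java.util.List;
import java.util.concurrent.Callable;

public interface IBatchExecutor {

    /**
     * Synchronous batch processing: returns the results once every partition has finished.
     *
     * @date 2020/8/8 12:49
     * @param batchList     the items to process
     * @param singleService the business logic applied to each item
     * @return the results, keyed by each item's index in batchList
     */
    <E, R> EBatchRes<E, R> syncBatch(List<E> batchList, IBusiService<E, R> singleService) throws InterruptedException;

    /**
     * Synchronous batch processing: returns the results once every partition has finished.
     *
     * @date 2020/8/8 12:49
     */
    EBatchRes syncBatchRunnable(List<Runnable> batchList) throws InterruptedException;

    /**
     * Synchronous batch processing: returns the results once every partition has finished.
     *
     * @date 2020/8/8 12:49
     */
    <R> EBatchRes syncBatchCallable(List<Callable<R>> batchList) throws InterruptedException;

    /**
     * Asynchronous batch processing: an exception does not interrupt the batch, all tasks run to completion.
     *
     * @date 2020/8/8 12:53
     * @param batchList     the items to process
     * @param singleService the business logic applied to each item
     */
    <E, R> void asyncBatch(List<E> batchList, IBusiService<E, R> singleService) throws InterruptedException;

    /**
     * Asynchronous batch processing: an exception does not interrupt the batch, all tasks run to completion.
     *
     * @date 2020/8/8 12:49
     */
    void asyncBatchRunnable(List<Runnable> batchList) throws InterruptedException;

    /**
     * Asynchronous batch processing: an exception does not interrupt the batch, all tasks run to completion.
     *
     * @date 2020/8/8 12:49
     */
    <R> void asyncBatchCallable(List<Callable<R>> batchList) throws InterruptedException;
}
```

> The business-logic interface for batch processing

```java
interface IBusiService<E, R> {
    /**
     * Runs the business logic for one item. Callers of the batch API must implement {@link IBusiService}.
     *
     * @param e the input of the business call
     * @return the result of the call
     */
    R doService(E e);
}
```

> The batch result object

```java
class EBatchRes<E, R> {
    // execution results, keyed by each item's index in the original list
    private Map<Integer, Pair<E, R>> resMap;
    // the recorded exception; once set, the remaining tasks are skipped
    private volatile Throwable exception;

    public EBatchRes() {
    }

    public EBatchRes(Map<Integer, Pair<E, R>> resMap, Throwable exception) {
        this.resMap = resMap;
        this.exception = exception;
    }

    // getters and setters omitted
}
```

> `EBatchExecutor`'s implementation of the core method `IBatchExecutor#syncBatch`

```java
@Override
public <E, R> EBatchRes<E, R> syncBatch(List<E> batchList, IBusiService<E, R> singleService) throws InterruptedException {
    // partition
    List<List<E>> partitions = partition(batchList);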
    // transfer partitions to batch callables
    List<List<IBatchCallable<E, R>>> partitionCall = Transfers.transferBusiService(partitions, singleService);
    // submit partitions
    return submit(partitionCall);
}
```

First, the tasks are partitioned according to the thread pool's core size. The default uses half of the core pool size as the number of partitions, so that a single batch does not saturate the CPU and batches scale better. In practice the partitioning should be chosen to suit the business scenario; this is only a default approach.

```java
/**
 * By default, uses half of the pool's core size as the number of partitions.
 *
 * @date 2020/8/9 13:10
 * @param batchList the items to partition
 * @return the partitions; each inner list holds the items one worker thread will process
 */
private <E> List<List<E>> partition(List<E> batchList) {
    // partition for the batchList
    int coreSize = eBatchThreadPool.getCorePoolSize();
    // twice the per-core share, which gives roughly coreSize / 2 partitions
    int perSize = (batchList.size() / coreSize) << 1;
    // small batches go into a single partition; Lists.partition requires a size > 0
    perSize = (perSize <= 0 ? Math.max(batchList.size(), 1) : perSize);
    List<List<E>> partition = Lists.partition(batchList, perSize);
    logger.info("batch start info : batchListSize【{}】,coreSize【{}】,threadSize【{}】,perThreadSize【{}】",
            batchList.size(), coreSize, partition.size(), perSize);
    return partition;
}
```

Next, the partitioned parameters are converted, according to their type, into the batch interface `IBatchCallable`, ready for submission.

```java
public static <E, R> List<List<IBatchCallable<E, R>>> transferBusiService(List<List<E>> partitions, IBusiService<E, R> singleService) {
    List<List<IBatchCallable<E, R>>> partitionCall = Lists.newArrayListWithCapacity(partitions.size());
    // running index across all partitions: each item's position in the original list
    AtomicInteger index = new AtomicInteger();
    for (List<E> partition : partitions) {
        List<IBatchCallable<E, R>> calls = Lists.newArrayListWithCapacity(partition.size());
        partition.forEach((e) -> {
            calls.add(new EBatchExecutor.AdaptedBusiService<>(singleService, e, index.getAndIncrement()));
        });
        partitionCall.add(calls);
    }
    return partitionCall;
}

// implementation of the adapter

/**
 * Adaptor for busiService
 */
static final class AdaptedBusiService<E, R> implements IBatchCallable<E, R> {
    final IBusiService<E, R> busiService;
    E param;
    Integer index;

    AdaptedBusiService(IBusiService<E, R> busiService, E param, Integer index) {
        if (busiService == null) throw new NullPointerException();
        this.busiService = busiService;
        this.param = param;
        this.index = index;
    }

    @Override
    public E param() {
        return param;
    }

    @Override
    public R call() throws Exception {
        return busiService.doService(param);
    }

    @Override
    public Integer index() {
        return index;
    }
}
```
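To make the default partitioning rule described above concrete, here is a minimal JDK-only sketch of the same arithmetic. The class `PartitionSketch` and its method names are hypothetical, and `subList` stands in for Guava's `Lists.partition`; treat it as an illustration under those assumptions rather than the project's code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the default partitioning rule, using only JDK classes. */
class PartitionSketch {

    /** Chunk size: twice (batchSize / coreSize); small batches fall back to one chunk. */
    static int perSize(int batchSize, int coreSize) {
        int perSize = (batchSize / coreSize) << 1;
        return perSize <= 0 ? Math.max(batchSize, 1) : perSize;
    }

    /** Splits the list into consecutive chunks of at most perSize elements. */
    static <E> List<List<E>> partition(List<E> batchList, int coreSize) {
        int perSize = perSize(batchList.size(), coreSize);
        List<List<E>> partitions = new ArrayList<>();
        for (int from = 0; from < batchList.size(); from += perSize) {
            int to = Math.min(from + perSize, batchList.size());
            // copy the view so each partition is independent of the source list
            partitions.add(new ArrayList<>(batchList.subList(from, to)));
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<Integer> batch = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            batch.add(i);
        }
        // assumed core size of 8: 100 / 8 = 12, doubled to 24 items per partition
        List<List<Integer>> partitions = partition(batch, 8);
        System.out.println(partitions.size() + " partitions, first size " + partitions.get(0).size());
    }
}
```

With 100 items and a core size of 8, the chunk size is 24 and the list splits into five partitions (four of 24 items and one of 4). Because of integer division, the partition count can come out slightly above half the core size.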
Now the core submission logic. Tasks are submitted with `ThreadPoolTaskExecutor#submitListenable`, which returns a listenable `Future` and so makes it easier to handle the results of tasks submitted to the pool.

```java
/**
 * Submits the tasks to the thread pool, one task per partition.
 *
 * @param partition the partitioned tasks
 * @param <E> parameter type
 * @param <R> result type
 * @return the unified result object
 * @throws InterruptedException if the calling thread is interrupted
 */
private <E, R> EBatchRes<E, R> submit(List<List<IBatchCallable<E, R>>> partition) throws InterruptedException {
    final EBatchRes<E, R> res = new EBatchRes<>();
    res.setResMap(Maps.newConcurrentMap());
    List<ListenableFuture<Map<E, R>>> futures = Lists.newArrayList();
    for (final List<IBatchCallable<E, R>> list : partition) {
        futures.add(eBatchThreadPool.submitListenable(new SyncBatchCallable<>(res, list)));
    }
    // handle callback
    handleCallback(res, futures);
    return res;
}

// the core class that processes one partition synchronously

/**
 * Processes one partition synchronously and returns that partition's results.
 *
 * @date 2020/8/9 13:32
 */
private class SyncBatchCallable<E, R> implements Callable<Map<E, R>> {
    private EBatchRes<E, R> res;
    private List<IBatchCallable<E, R>> batchCallables;

    SyncBatchCallable(EBatchRes<E, R> res, List<IBatchCallable<E, R>> batchCallables) {
        this.res = res;
        this.batchCallables = batchCallables;
    }

    /**
     * When a single task throws, all following tasks in the same partition are cancelled.
     * Once an exception has been recorded, the tasks of the other partitions are skipped as well.
     */
    @Override
    public Map<E, R> call() throws Exception {
        Map<E, R> resMap = Maps.newHashMapWithExpectedSize(batchCallables.size());
        for (IBatchCallable<E, R> call : batchCallables) {
            if (null == res.getException()) {
                R exeRes = call.call();
                resMap.put(call.param(), exeRes);
                res.getResMap().put(call.index(), Pair.of(call.param(), exeRes));
            } else {
                if (logger.isDebugEnabled()) {
                    logger.debug("current batch has exception,stop next task");
                }
            }
        }
        return resMap;
    }
}
```

Handling the callbacks for the results coming back from the thread pool

```java
/**
 * Handles the callbacks of the futures.
 *
 * @date 2020/8/9 16:17
 * @param res     the shared batch result
 * @param futures one future per submitted partition
 */
private <E, R> void handleCallback(EBatchRes<E, R> res, List<ListenableFuture<Map<E, R>>> futures) throws InterruptedException {
    // used to wait until all tasks have finished
    final CountDownLatch countDownLatch = new CountDownLatch(futures.size());
    // each partition thread executes its own callback
    for (ListenableFuture<Map<E, R>> f : futures) {
        f.addCallback(new ListenableFutureCallback<Object>() {
            @Override
            public void onFailure(Throwable ex) {
                if (logger.isDebugEnabled()) {
                    logger.debug("batch task execute with exception", ex);
                }
                // keep only the first recorded exception
                if (null == res.getException()) {
                    res.setException(ex);
                }
                countDownLatch.countDown();
            }

            @Override
            public void onSuccess(Object result) {
                countDownLatch.countDown();
            }
        });
    }
    countDownLatch.await();
}
```
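The latch-plus-callback pattern above can also be tried out without Spring. The following is a minimal sketch that swaps `ListenableFuture` for the JDK's `CompletableFuture`; the class `LatchCallbackSketch`, the method `runAll` and the `AtomicReference` used to keep the first failure are assumptions made for illustration and are not part of ebatch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical JDK-only sketch of "count down in every callback, then await". */
class LatchCallbackSketch {

    /** Runs all tasks, waits for every callback, and returns the first failure or null. */
    static Throwable runAll(List<Supplier<String>> tasks, ExecutorService pool) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(tasks.size());
        AtomicReference<Throwable> firstError = new AtomicReference<>();
        for (Supplier<String> task : tasks) {
            CompletableFuture.supplyAsync(task, pool).whenComplete((result, ex) -> {
                if (ex != null) {
                    // supplyAsync reports failures wrapped in CompletionException
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause() : ex;
                    // atomically keep only the first failure
                    firstError.compareAndSet(null, cause);
                }
                // count down on success and on failure, so await() always returns
                latch.countDown();
            });
        }
        latch.await();
        return firstError.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Supplier<String>> tasks = new ArrayList<>();
            tasks.add(() -> "ok");
            tasks.add(() -> { throw new IllegalStateException("boom"); });
            System.out.println("first error: " + runAll(tasks, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

The sketch uses `compareAndSet` so that two partitions failing at the same moment cannot overwrite each other's exception. The check-then-set in `handleCallback` may allow that overwrite, which is harmless if any one failure is enough to report.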