# ws-task **Repository Path**: jiahaiyongdeveloper_admin/ws-task ## Basic Information - **Project Name**: ws-task - **Description**: 一个分布式任务分发框架核心包,一个内存型的MQ。将`ws-task-core`引入到自己的application,你就可以将自己的普通application转变成一个分布式任务分发application。`ws-task-core`主要提供了Leader节点自动选举,动态增删节点,task数据分发。提供了两种类型的任务,拉数据模式,推数据模式。数据分发支持轮询,随机,HASH。可配置leader是否参与任务处理,可配置节点是否参与leader选举(见下面示例)。 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 22 - **Created**: 2023-11-08 - **Last Updated**: 2023-11-08 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## A Distributed Task Distribution Framework Core > 一个分布式任务分发框架核心包,一个内存型的MQ。将`ws-task-core`引入到自己的application,你就可以将自己的普通application转变成一个分布式任务分发application。`ws-task-core`主要提供了Leader节点自动选举,动态增删节点,task数据分发。提供了两种类型的任务,拉数据模式,推数据模式。数据分发支持轮询,随机,HASH。可配置leader是否参与任务处理,可配置节点是否参与leader选举(见下面示例)。 > ### 主要功能 * 提供了两种类型的任务,拉数据模式,推数据模式; * 数据分发支持集群模式和广播模式;集群:每一条消息只分配到一个节点;广播:每一条消息分配到所有节点; * 数据分发集群模式支持轮询,随机,HASH; * 支持配置Leader是否参与任务处理; * 支持配置Worker节点的数量; * 支持spring-boot-starter方式,使用`@EnableWsTask`开启功能; * 支持节点可视化,打开`/task-ui.html`即可查看任务分配节点状态; ### 示例 ###### Maven: 在pom.xml中引入jar ~~~xml org.ws.task ws-task-spring-web-ui 0.2.0-SNAPSHOT org.ws.task ws-task-spring-starter 0.2.0-SNAPSHOT ~~~ ###### JAVA:在XXXApplication.java内启动`@EnableWsTask` ~~~java // 开启功能 @EnableWsTask @SpringBootApplication public class ExampleApplication { public static void main(String[] args) { log.info("Example Application start"); SpringApplication.run(ExampleApplication.class, args); } } ~~~ ###### YML:在application.yml中配置参数 ~~~yml # 最简配置 ws-task: zookeeper: namespace: @project.artifactId@ address: 127.0.0.1:2181 netty: serverPort: 8258 connectTimeoutMillis: 3000 ~~~ ##### 任务配置 ~~~java // 拉数据任务配置示例 @Configuration public class PullTaskConfig { // 一个100万的uuid任务文件路径 @Value("${uuid-file-path}") private String uuidFilePath; // 配置拉数据任务 @Bean public PullTask uuidPullTask() throws FileNotFoundException { return TaskBuilder.builder("uuidPullTask") .config() .chunk(10) .fetchTimeout(10_000) .sendModel(SendModel.CLUSTERING) .leaderProcess(false) .pull() .provider(uuidItemProvider()) .builder(); } // Leader端任务数据提供者 @Bean public ItemLeaderProvider uuidItemProvider() throws FileNotFoundException { return new FileLineItemLeaderProvider(uuidFilePath); } } // 推数据任务配置 @Slf4j @Configuration public class PushTaskConfig { // 推数据任务 @Bean public PushTask uuidPushTask() { return TaskBuilder.builder("uuidPushTask") .config() .chunk(10) .fetchTimeout(10_000) .sendModel(SendModel.CLUSTERING) .leaderProcess(false) .push() .processor(uuidItemProcessor()) .builder(); } // worker节点任务处理器Bean @Bean public UuidItemProcessor uuidItemProcessor(){ return new UuidItemProcessor(); } // worker节点任务处理器 public static class UuidItemProcessor implements ItemWorkerProcessor{ private AtomicInteger count = new AtomicInteger(0); @Override public void process(Collection> items) throws ItemProcessException { if(CollectionUtils.isNotEmpty(items)){ int i = count.addAndGet(items.size()); if(i/100 % 10 == 0 || i >= 500000){ log.info("process count:{}",i); } } } public int getCount() { return count.get(); } } } // 在任意一个节点执行推送数据 public void push(){ Collection> items = fileLineItemProvider.getItems(); while (CollectionUtils.isNotEmpty(items)){ uuidPushTask.pushSync(items,3_000); items = fileLineItemProvider.getItems(); } } ~~~ ### 分布式管理 采用集中式设计,集群始终仅存在一个leader,worker通过注册中心查询当前leader并发生心跳链接。使用事件驱动的处理方式,对于不同事件执行相应的操作,能够从各个不健康状态自动调整到健康状态。 #### 启动过程 > 启动过程使用事件驱动,根据事件触发不同的启动操作,并能够对节点和通讯组件自己检测并调整到有效状态,事件类型和操作见下表: | 事件 | 说明 | 触发场景 | 触发操作 | | :--------------------- | -------------- | ------------------------------------------------------------ | ------------------------------------------------------------ | | BECOME_LEADER | 成为Leader | 本节点竞选为Leader后触发事件 | 启动server;更新本地Leader数据; | | CHANGE_LEADER | 变更Leader | 监听到Leader节点数据发生变更时触发。 | 更新client端leader地址;更新本地Leade数据;检查节点状态; | | REMOVE_LEADER | 删除Leader | 监听到Leader节点数据删除时触发。 | 更新client端leade地址为null;更新本地leader状态为GONE; | | NOT_LEADER | 不是Leader | 本节点没有竞选到Leader触发。 | 更新client端leader地址;更新本地Leade数据;启动client客户端; | | NOT_FOUND_LEADER | 没有找到Leader | 暂无 | 检查节点状态; | | CHECK_LEADER | 检查Leader | 客户端创建链接Leader不是有效状态触发;客户端Channel建立连接连续10次超时触发;客户端连续10次无法建立有效Channel触发; | 检查节点状态; | | UPDATE_WORKER | 更新Worker | zk启动后同步所有worker节点数据时触发;监听到worker节点下数据变更时触发; | 更新本地节点数据;检查节点状态; | | REMOVE_WORKER | 删除Worker | 监听到worker节点删除时触发(worker掉线或宕机); | 删除本地节点数据;Leade节点执行任务resize;检查节点状态; | | SERVER_STARTUP | 服务端已启动 | netty server启动完成后触发; | 更新leader元数据port和状态为有效;更新本地leader数据;更新client端leader地址;启动client端; | | CHANNEL_CONNECT | 已连接 | server端监听到有新channel建立时触发; | 更新成功建立channel的worker本地数据和节点元数据port和状态为ACTIVE; | | CHANNEL_CLOSE | 已断开 | server端监听到channel关闭或异常时触发; | 更新已断开channel的worker本地数据和节点元数据状态为GONE; | | CHANNEL_IDLE | 已超时 | server端监听到channel已超时触发; | 更新已超时channel的worker本地数据和节点元数据状态为RECOVERING; | | CLIENT_STARTUP | 客户端已启动 | client与server第一次成功建立channel时触发; | 更新本worker节点元数据port和状态为ACTIVE;启动客户端心跳; | | CLIENT_CHANNEL_CONNECT | 客户端已连接 | client与server建立channel变更时触发(重新建立channel时); | 更新本worker节点元数据port和状态为ACTIVE;启动客户端心跳;检查节点状态; | | CLIENT_CHANNEL_IDLE | 客户端已超时 | client监听到channel已超时触发; | 更新本worker节点元数据port和状态为RECOVERING;启动客户端心跳; | | UPDATE_TASK_CONFIG | 更新任务配置 | 暂无 | 暂无 | | REMOVE_TASK_CONFIG | 删除任务配置 | 暂无 | 暂无 | ##### 一个启动过程的图 > 从日志中可以看到在启动过程中的Event。 ![image-20201029235213728](assets/README/image-20201029235213728.png) ![image-20201029235040217](assets/README/image-20201029235040217.png) #### 竞选领导者 所有节点启动时都会去参与竞选领导者,当竞选成功更新领导者元数据状态为准备,并启动服务端,服务端启动完成后更新为就绪状态。所有的worker都监听领导者元数据,当元数据变更为就绪,作业节点启动客户端或新建与领导者的连接并保持心跳。当领导者掉线或宕机,会执行新的领导者选举,所有的节点更新状态为恢复。新的领导者选举成功后,启动服务端并更新为就绪状态,作业节点监听到变更后,建立新的链接,集群恢复健康。 #### 新增节点 向注册中心注册节点元数据,所有的worker加入集群后自己维持与leader的心跳。leader监听节点元数据新增到管理列表,初期状态为准备,当启动客户端并与leader成功建立连接后更新为就绪状态。 #### 移除节点 领导者监听到节点链接关闭,更新节点状态为丢失,若节点恢复再次建立连接,更新状态为有效。若节点掉线或有重启,通过监听到节点移除,移除本地节点元数据。若节点心跳超时,更新节点状态为恢复,同时节点客户端会检查心跳连接,若连接失效会清除当前连接并重新建立,并更新节点数据。服务端接收到连接后,更新节点状态。节点状态恢复健康。 ### 远程通信 分成两端,服务端和客户端。当竞选成为领导者后会启动服务端等待连接。客户端启动后会保持与服务端的心跳以保证自己状态健康。 服务端和客户端都支持同步,异步,单向请求。 #### 协议设计 请求和响应消息为请求消息和响应消息结构类型,通过类型标示是请求还是响应结构,在编解码时可通过标示位进行识别消息类型。消息主体类型根据不同的场景和请求分别为不同的协议结构,通过编码成二进制流进行传输,同时使用主体类型属性识别类型二进制流进行解码。 通信消息和编码设计为: #### MessageEncoder image-20201025113242375 ##### RequestMessage | Message字段 | 类型 | 说明 | | ----------- | ------ | ------------------------------------------------ | | id | int | 请求ID,响应ID与请求一一对应 | | action | int | 请求Action,根据不同的Action执行不同的处理。 | | body | byte[] | 请求的主体信息,将主体信息的对象编码成byte[]传输 | | bodyClass | Class | 请求主体信息的类型,通过该类型对body进行解码 | ##### ResponseMessage | Message字段 | 类型 | 说明 | | ----------- | ------- | ---------------------------------------------- | | id | int | 直接复制请求ID,通过ID找到该请对应响应的处理。 | | status | int | 响应状态 | | success | boolean | 响应成功标示 | | message | String | 响应成功或失败消息 | | body | byte[] | 响应主体信息,将主体信息的对象编码成byte[]传输 | | bodyClass | Class | 响应主体信息的类型,通过该类型对body进行解码 | ##### RequestAction说明 | Action | Code | 说明 | | ------------- | ---- | ------------------------------- | | PULL_TASK | 0 | Worker拉取任务请求,Leader使用`PullItemProviderHandler`处理。 | | PUSH_TASK | 1 | 1.客户端推送数据到Leader,Leader使用`PushItemLeaderHandler`接收数据并执行分发推送;2.服务端推送数据到Worker,Worker使用`PushItemWorkerHandler`接收数据并执行。 | | SYNC_DATA | 2 | 节点相互同步备份任务数据以防止数据丢失和容错(计划实现)。 | | META_DATA | 3 | 节点相互同步备份元数据,与Zookeeper丢失连接仍可继续(计划实现)。 | | COMMIT_STATUS | 4 | Worker完后任务后提交任务请求,Leader使用`CommitStatusHandler`处理。 | | HEARTBEAT | 5 | Worker发送心跳请求,单向请求Leader无需处理。 | ##### RequestAction对应请求和相同Message的body结构说明 ###### PULL_TASK * 请求Body:`PullItemRequest` | 字段 | 类型 | 说明 | | ---------- | ------- | ---------------------- | | taskName | String | 请求拉取数据的任务名称 | | size | Integer | 拉取的数量 | | nodeId | String | 请求的Worker节点id | | timeMillis | Long | 请求时间戳 | * 响应Body:`PullItemResponse` | 字段 | 类型 | 说明 | | -------- | ---------------------- | -------------------- | | taskName | String | 响应数据任务名称 | | index | Integer | 响应数据开始任务索引 | | items | Collection\ | 响应的任务集合 | * 任务项`TaskItem`结构说明 | 字段 | 类型 | 说明 | | ----------- | ---------- | ------------------------------------------------------------ | | id | long | 任务唯一id,自动生成 | | status | TaskStatus | 任务状态:READY,DISTRIBUTED,PROCESSING,SUCCESS,FAILURE,COMMIT_FAILURE | | failedCount | int | 失败次数 | | data | Object | 任务业务数据 | ###### PUSH_TASK * 请求Body:`PushItemRequest` | 字段 | 类型 | 说明 | | -------- | ---------------------- | ------------------ | | taskName | String | 推送的任务名称 | | items | Collection\ | 推送的任务数据集合 | * 响应Body:`PushItemResponse` | 字段 | 类型 | 说明 | | ------- | ------------------ | ------------------ | | itemIds | Collection\ | 接收到的任务id集合 | | success | Boolean | 是否成功 | | message | String | 失败消息 | ###### COMMIT_STATUS * 请求body:`CommitRequest` | 字段 | 类型 | 说明 | | -------- | ---------------- | ------------------ | | taskName | String | 提交数据的任务名称 | | items | List\ | 提交的任务列表 | | nodeId | String | 请求的Worker节点id | * 响应Body:`CommitResponse` | 字段 | 类型 | 说明 | | ------- | -------------------- | -------------- | | results | List\ | 提交的处理结果 | * 提交结果`CommitResult`结构说明 | 字段 | 类型 | 说明 | | ------- | ------- | -------- | | itemId | Long | 任务ID | | success | Boolean | 是否成功 | | message | String | 失败消息 | ### 元数据说明 > 使用Zookeeper进行节点协同和元数据同步。元数据结构如下 ##### /ws-task/{namespace}/leader-data节点数据 ~~~json // leader-data节点存储的数据,解码后元数据示例(实际存储的为编码后的byte[]) { "active":true, // 是否有效 "address":"127.0.0.1:8258", // 地址 "host":"127.0.0.1", // host "id":"03e6135e-7550-4594-8e2c-fbb99b5d25a9", // 节点ID "leader":true, // 是否为leader "port":8258, // server绑定端口 "role":"LEADER", // 角色 "status":"ACTIVE", // 节点状态 "updateTime":1604232336496 // 更新时间 } ~~~ ##### /ws-task/{namespace}/workers/node/{worker-1}节点数据 ~~~json // {worker-1}节点存储的数据,解码后元数据示例(实际存储的为编码后的byte[]) { "active":true, // 是否有效 "address":"127.0.0.1:1251", // 地址 "host":"127.0.0.1", // host "id":"03e6135e-7550-4594-8e2c-fbb99b5d25a9", // 节点ID "leader":false, // 是否为leader "port":1251, // 与server的通讯端口 "role":"WORKER", // 角色 "status":"ACTIVE", // 节点状态 "updateTime":1604232336627 // 更新时间 } ~~~ ### Example说明 ##### ws-task-example-batch > model路径/ws-task-example/ws-task-example-batch > > 在SpringBatch中使用拉数据模式获取任务数据并进行处理。 > > 若没有任务文件请执行test下的UUIDTest生成。 ###### 如何运行 * 启动本地zookeeper,端口为2181,可编辑`application.yml`中`ws-task.zookeeper.address`修改成指定的地址。 * 运行test目录下`org.ws.task.example.batch.UUIDTest`,其中FILE_MAX_LINE可以修改,最大不要超过5000万(1.9G),否则文件size会超出限制。 * 文件的生成和读取都是采用nio方式,可以随意生成或读取不超出文件size限制的任意大小文件。 * 启动 * 单节点启动:可以直接启动spring-boot:run,默认端口为8888 * 多节点启动 * 进入目录: `cd ws-task-example/ws-task-example-batch`; * 执行`mvn package`,若执行`clean`可能会将测试文件删除导致打包错误; * 进入`ws-task-example-batch/bin`,可以执行startNodeX.bat或startNodeX.sh,运行几个节点。 * 运行 * 直接请求地址如:http://127.0.0.1:8888/run执行拉取任务; * 若存在多个节点,执行Leader的run不会处理任务,执行worker的run会执行拉取并处理; * 若多节点下,只执行一个worker则只会执行分配到该节点的任务,其他节点任务在leader的内存中,一直到其他节点执行拉取处理。 ###### 下图为执行一个100万的uuid数据拉取处理大约10s > 采用一个Leader节点负责从文件中读取任务数据并分发,Leader不处理任务。两个worker节点处理分配的任务。 ![image-20201031174009610](assets/README/image-20201031174009610.png) ![](assets/README/pull-batch.gif) ##### ws-task-example-mq > model路径/ws-task-example/ws-task-example-mq > > 在推数据模式下Leader获取任意节点推送的任务数据并负责分发推送到worker进行处理,类似一个内存型的MQ。 > > 若没有任务文件请执行test下的UUIDTest生成。 ###### 如何运行 * 启动本地zookeeper,端口为2181,可编辑`application.yml`中`ws-task.zookeeper.address`修改成指定的地址。 * 运行test目录下`org.ws.task.example.mq.UUIDTest`,其中FILE_MAX_LINE可以修改,最大不要超过5000万(1.9G),否则文件size会超出限制。 * 文件的生成和读取都是采用nio方式,可以随意生成或读取不超出文件size限制的任意大小文件。 * 启动 * 单节点启动:可以直接启动spring-boot:run,默认端口为8888 * 多节点启动 * 进入目录: `cd ws-task-example/ws-task-example-mq`; * 执行`mvn package`,若执行`clean`可能会将测试文件删除导致打包错误; * 进入`ws-task-example-mq/bin`,可以执行startNodeX.bat或startNodeX.sh,运行几个节点。 * 运行 * 直接请求地址如:http://127.0.0.1:8888/run执行推送并处理任务; * 若存在多个节点,执行任意节点的run都会将任务推送到leader并分发推送到所有分配的worker节点执行任务处理; ###### 下图为执行一个100万的uuid数据推数据处理大约10s > 采用一个Leader节点负责从接收需要分发的数据并进行分发任务数据,Leader不处理任务。两个worker节点轮询分配任务。 ![image-20201031175019857](assets/README/image-20201031175019857.png) ![](assets/README/push-mq-2.gif) **PS:每一个任务数据在执行过程中只有提交成功才算是执行完成,若出现提交超时或其他错误,将再次分发重试,以确保任务执行成功。所以任务至少被执行一次,若执行失败为提交会多次重试,最多重试3次。**