# dataops **Repository Path**: dlfdd/dataops ## Basic Information - **Project Name**: dataops - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2021-08-03 - **Last Updated**: 2021-09-04 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # DataOps 相关介绍 ## 项目背景 这个项目主要是为了能够使用可视化的方法对数据进行处理。 在使用代码处理一组数据的过程中,代码/整个流程 可以切分为几个步骤。有的步骤只能作为处理此类数据或特定数据的步骤,而有的步骤在其他种类的数据当中也可以使用。此时我们将这种步骤提取出来包装称为一个模块。集成到一个系统当中去。在系统中上传需要处理的数据并通过流程图的方式画出数据处理需要的步骤以及顺序等,便可以可视化的处理一个数据。同时系统也能够对步骤(后称为计算模块)进行扩张。当开发出一个新的模块之后,模块能够在不影响系统正常使用的情况下集成进系统当中。系统则是对模块的调度和数据的维护。 ## 主要实现流程 ​ 在前端绘制流程图并上传数据文件(目前只做了csv)后面则不限制文件类型。后端获取到流程图之后把流程图封装成一个task实体(调度端进行调度的基本单元。task实体包括流程图的基本信息和前端上传csv文件之后,csv文件里面的数据。封装完task之后把task添加到kafka队列里面去。kafka消费端收到task之后便对task内的节点(流程图里面的一个流程称之为一个节点)队列进行遍历,并调用。 ​ 具体调用过程:首先根据节点里面的信息获取到调用的url,获取到url之后把数据文件的内容通过http的方式调用python计算端的计算方法,计算方法获取到数据之后遍历所有的数据内容,找到当前模块需要操作的数据并进行处理,处理完成之后封装为一个数据模块返回。调度端收到返回信息之后拿到计算端返回的信息对调度端存的数据进行更新。更新完成之后再对下一个节点进行调度。直到所有节点调度完成之后把数据保存为一个json文件并通过QQ消息(暂时)的形式返回给用户。 ## 目前使用到的技术以及架构 + 前端: + 技术:Vue2+flowchart(进行流程图绘制) + 主要工作: 进行流程图绘制,并在绘制完成并提交时对流程图进行拓扑排序并上传服务器。上传数据文件到服务器。 + 完成程度: + 完成了流程图的绘制、拓扑排序、数据文件上传。 + ⚠️在流程图的绘制中,流程图节点的编辑。节点处理,节点信息完善等等的一些逻辑暂时没做。(前端就做了个小demo) + 目前需要做的: + 能够修改单个节点的一些数据格式信息,查看单个节点的数据返回信息,单个节点操作数据的一些信息等等。暂时不紧急 + 后台: + 技术:SpringCloud (暂时想用springcloud alibaba)+kafka + 完成工作: + 拿到前端传过来的节点信息和数据信息之后封装为一个task实体,并添加到task队列(mq) + 对task队列里的task实体进行调度,调度完成之后返回结果 + 目前需要做的: + 在运行过程中的异常处理没做。目前只能够跑happy path,出异常了得翻日志。 + 在调度的时候使用的是http的方式,python计算端开了一个web服务进行调用。但是,在实际的处理过程中计算的时间并不是只有几秒,http一直长连接的方式不知道是不是得体? + 目前只支持上传csv格式的数据,后期需要加入数据库,txt,excel等等格式的数据,但是下层的调度方式不能因此改变,所以需要一个adapter对不同来源不同格式的数据进行统一的转换。 + 目前数据传输的形式是一次性把数据全部读取出来,再封装到task传到调度端,调度端再把数据全部发送给单个模块,单个模块只挑选其用得到的进行处理并返回。对这个方式的改进思路是。 + 在计算端集成模块的时候每个模块就申明自己处理的数据的desc(也就是数据的id),调度端根据计算端的desc只发送计算端要处理的数据。计算端也只返回处理之后的数据。 + 数据全部读取到内存再进行传输的方式消耗太多资源,后期能否只传元数据的方式(元数据定义为数据文件的信息,包括路径、大小等等)。但是这种方式又存在很多问题。因为微服务的每一个服务都是使用docker部署的。上传的数据文件在一个容器里面,没办法给其他端共享所以目前又只能够全部读取进行传输。 + ⚠️ springcloud 边看边搞的,maven的依赖管理这一个块并不是很好。整个代码结构那些并不是很好。 必要的话可重构 + ‼️ Kafka现在用的是dockerhub上别人搭建好了的,传数据那些很容易就超过了默认大小,需要调整配置参数等等。 + 为了实现计算模块的热插拔,Java端需要写一个“模块的注册中心”然后每一个模块都变成一个单独的web服务。每个模块直接单独部署到docker,部署成功之后计算端的web服务向注册中心进行注册并报告自己的计算节点的信息(端口、调用地址、计算信息等等的信息),心跳机制的方式保持健康状态的通知。注册中心维护一张计算节点表。前端获取计算节点信息就直接去注册中心获取。这样计算模块的添加就不会重启所有的pythonweb服务。实现热插拔。 + 消息调度可以用kafka消息队列的方式替换掉http请求的方式?(暂时的不成熟的想法) + python计算端: + 技术:fast api ⚠️ 不知道这个框架效果怎么样,没有经过过实际检验。有好的可以换。 + 完成工作: + 添加装饰器和结构的微调,添加模块的时候只需要新建一个文件见在文件夹里面修改即可,不需要改动其他文件。 + 可以简单的集成一些计算模块。 + 目前需要做的: + 不同的计算模块需要不同的数据结构。 + ‼️ 和杨箫集成的过程中发现大多数据都是dataFrame的形式传输的但是直接封装dataFrame返回要报错,所以转换了一下格式。因此调度端需要定义一个结果返回的**标准和规范**。并且格式转换需要封装到装饰器里并完成**adapter**的编写对不同格式的数据进行转换。以便模块之间的数据能够接得上。(一个模块的输出是另一个模块的输入) + 完成上述之后为了实现模块的热插拔需要写心跳机制的一些东西等等。然后简化模块的集成步骤。