From 98c960e51921c26f3daeca76d23fb6241e9f9ace Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9F=8F=E6=9E=97?= Date: Thu, 21 Jan 2021 15:09:04 +0800 Subject: [PATCH] =?UTF-8?q?add=20=E5=BC=80=E6=BA=90=E6=95=85=E4=BA=8B/link?= =?UTF-8?q?is=E4=B8=8ESQL=E4=B8=AD=E9=97=B4=E4=BB=B6=EF=BC=88=E8=B7=A8?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E6=BA=90=E6=B7=B7=E6=9F=A5=EF=BC=89=E7=BB=93?= =?UTF-8?q?=E5=90=88=E5=AE=9E=E8=B7=B5=E5=88=86=E4=BA=AB.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...\236\350\267\265\345\210\206\344\272\253 " | 309 ++++++++++++++++++ 1 file changed, 309 insertions(+) create mode 100644 "\345\274\200\346\272\220\346\225\205\344\272\213/linkis\344\270\216SQL\344\270\255\351\227\264\344\273\266\357\274\210\350\267\250\346\225\260\346\215\256\346\272\220\346\267\267\346\237\245\357\274\211\347\273\223\345\220\210\345\256\236\350\267\265\345\210\206\344\272\253 " diff --git "a/\345\274\200\346\272\220\346\225\205\344\272\213/linkis\344\270\216SQL\344\270\255\351\227\264\344\273\266\357\274\210\350\267\250\346\225\260\346\215\256\346\272\220\346\267\267\346\237\245\357\274\211\347\273\223\345\220\210\345\256\236\350\267\265\345\210\206\344\272\253 " "b/\345\274\200\346\272\220\346\225\205\344\272\213/linkis\344\270\216SQL\344\270\255\351\227\264\344\273\266\357\274\210\350\267\250\346\225\260\346\215\256\346\272\220\346\267\267\346\237\245\357\274\211\347\273\223\345\220\210\345\256\236\350\267\265\345\210\206\344\272\253 " new file mode 100644 index 0000000..2950caa --- /dev/null +++ "b/\345\274\200\346\272\220\346\225\205\344\272\213/linkis\344\270\216SQL\344\270\255\351\227\264\344\273\266\357\274\210\350\267\250\346\225\260\346\215\256\346\272\220\346\267\267\346\237\245\357\274\211\347\273\223\345\220\210\345\256\236\350\267\265\345\210\206\344\272\253 " @@ -0,0 +1,309 @@ +# 标题:linkis与SQL中间件(跨数据源混查)结合实践分享 + + +## 背景需求 + +-**业务需求:** + +在公司有很多运营、数据分析的童鞋,虽然自有的BI产品功能丰富,有各式各样的定制化分析报表、各种维度的图表。 +但是有时需要临时查询数据,发现数据是分布在不同数据源里的,也有可能数据来自不同业务的不同集群。 +比如查询hive表的数据,但是维度映射数据在mysql里。还有要两个mysql数据库(不同服务器)关联查询等情况, +平时这些需求都需要程序猿大神们写程序实现。 + +现在可以通过Linkis与SQL中间件完美结合满足上述需求。难道不香吗? + + +-**技术迭代:** + +之前使用的Apache Livy作为SQL执行入口,把查询的请求都提交给Livy,但是体验上不尽人意。 +一直想找个替代组件, 后来发现功能强大的linkis。至于Linkis跟Apache Livy的对比,可以查看官方相关文档。 + + +## SQL中间件介绍: + +SQL中间件是基于公司目前开源的XSQL和Quicksql两款SQL中间件,两种都支持跨数据源混查,两个都很优秀, +至于大家选择集成哪个可以根据自身情况决定。因为之前使用过XSQL,所以是在linkis增加了xsql查询引擎。 + +以下分别简单介绍下两款开源组件: + +**XSQL:** + +XSQL是一款低门槛、更稳定的分布式查询引擎。它允许你快速、近实时地查询大量数据。对于一些数据源(例如:Elasticsearch、MongoDB、Druid等),他可以大幅地降低学习曲线,并节省人力成本。除Hive外,每种数据源都支持除子查询外的下推执行优化。用户有时希望将位于不同数据源上的数据关联起来进行查询,但是由于各种数据源是异构的且一些数据源不支持SQL或者支持的SQL语法非常有限,因此传统互联网公司的做法是,将不同的数据同步到统一的存储介质中,再进行OLAP的查询。数据同步的过程中可能面临数据迁移、主从同步、网络带宽等诸多困难和挑战,而且需要浪费大量的人力、物力及时间,无法满足大数据产品当前阶段对于近实时甚至准实时的场景。通过XSQL你将可以避免数据迁移和时间浪费,更加专注于业务本身。XSQL可以通过下推、并行计算、迭代计算等底层支撑技术,对各种数据源的查询加速。 + + +功能特性: +- 内置8种数据源,包括:Hive、Mysql、EleasticSearch、Mongo、Kafka、Hbase、Redis、Druid等。 +- XSQL采用数据源(DataSource)、数据库(Database)、数据表(Table)的三层元信息,为异构数据源提供了统一视图,进而实现了跨数据源的数据关联 +- SQL Everything,将程序与数据源具体版本解耦,程序迁移能力得到加强 +- 对DDL、DML、可下推查询,延迟与Yarn的交互及资源申请,进而提升效率并节省资源。 +- 相比很多开源分布式查询引擎,XSQL替换了Spark SQL,因而只需要一次SQL解析,避免多次解析带来的时延。 +- XSQL允许用户将聚合、过滤、投影等操作下推至数据源计算引擎,相比DataSet API更容易实现毫秒级响应。 +- XSQL借鉴了业内优秀的开源项目,放弃元数据的中心化,因此避免了数据同步、数据不一致,数据延迟等不利因素。XSQL也因此在部署上更加轻量、简便。 +- XSQL对元数据的缓存有两种级别,既能减少对底层数据源的压力,也提升了XSQL的执行效率。 +- XSQL可以按照用户需要,设置元数据白名单来避免缓存多余的元信息,进一步提升执行效率。 +- 可适配到Spark 2.x任意版本,解压即可运行,不需要引入额外依赖。且与原生SparkSQL隔离运行,不影响现有程序运行 + + +![111](https://user-images.githubusercontent.com/3014381/98097358-e0933100-1ec7-11eb-81c8-bf1863422da5.png) + + +**Quicksql:** + +Quicksql是一款跨计算引擎的统一联邦查询中间件,用户可以使用标准SQL语法对各类数据源进行联合分析查询。其目标是构建实时\离线全数据源统一的数据处理范式,屏蔽底层物理存储和计算层,最大化业务处理数据的效率。同时能够提供给开发人员可插拔接口,由开发人员自行对接新数据源。 + +功能特性: +- 支持8种数据源查询:Hive, MySQL, Kylin, Elasticsearch, Oracle, MongoDB, PostgreSQL, GBase-8s; +- 支持Spark、Flink双计算引擎; +- 支持基础CLI命令行查询和JDBC远程连接查询; +- JDBC类型数据源可通过YAML配置快速接入,无需修改代码; +- 提供方言/语法对接模板,支持用户对新数据源的语法自定义; +- 提供元数据采集功能,批量拉取预置元数据; +- 支持落地HDFS,支持可配置的异步响应机制 + + +![p1](https://user-images.githubusercontent.com/3014381/98097392-eb4dc600-1ec7-11eb-92c3-8ea674f3da3d.png) + + +## 执行流程图 + +![333](https://user-images.githubusercontent.com/3014381/98097418-f274d400-1ec7-11eb-82e9-fe38d8f8dd0b.png) + + +## 实践过程 + +参考linkis官方文档《如何快速实现新的底层引擎》、《Spark引擎介绍》,然后在uejs/definedEngines下创建xsql模块进行相关开发。 + +功能点: +- 1、支持按照不同集群加载相关配置 +- 2、支持自定义结果存储路径 +- 3、支持是否开启默认limit 5000限制保护 +- 4、linkis网关上socket支持token user认证。 +- 5、适配公司内部hadoop版本 +- 6、增加XSQL执行引擎 + + +## 实现过程简述: + + +由于公司大数据是有多集群的,为了节省客户端资源,可以复用客户端提交任务到不同集群,这时就需要能够灵活指定不同集群的配置文件。 +目前是将用到的集群配置文件放到linkis-hadoop-conf文件夹下,用于在启动引擎时以及己启动的执行引擎里进入动态加载。 + +``` + +├── client-viewfs.xml +├── core-site-cluster1.xml +├── hbase-site-cluster1.xml +├── hdfs-site.xml +├── hive-default.xml +├── hive-exec-log4j.properties +├── hive-log4j.properties +├── hive-site-cluster1.xml +├── ivysettings.xml +├── mapred-site-cluster1.xml +├── spark-defaults-cluster1.conf +├── xsql-spark-defaults-cluster1.conf +└── yarn-site-cluster1.xml + +``` +在linkis-gateway网关模块,改造让socket支持token user认证。比如在创建socket连接时可以通过传入token相关参数来完成用户认证。 + +``` +ws://gateway.linkis.net:9001/ws/api/entrance/connect?Token-User=xxxx&Token-Code=BML-AUTH + +``` + +``` +{ + //这个地址也需要增加token参数 + "method":"/api/rest_j/v1/entrance/execute?Token-User=xxx&Token-Code=BML-AUTH", + "data":{ + "params": { + "variable":{ + }, + "configuration":{ + "special":{ + + }, + "runtime":{ + + }, + "startup":{ + } + } + }, + "executeApplicationName":"xsql", + "executionCode":"SELECT * FROM abc limit 5;", + "runType":"sql" + } +} + +``` +由于业务实际查询时是需要全量数据,不需要进行limit限制。 +而且是想根据每次请求中参数动态来设置是否需要Limit,而不是通过全局配置统一禁用还是开启。 +业务需要自定义存储结果路径,比如跨集群跨账号存储查询结果。 + +以上是Linkis\ujes\entrance入口模块里进行参数接受处理。 + + + **XSQL执行引擎实现:** + +- 目录结构 + +![222](https://user-images.githubusercontent.com/3014381/98097446-fb65a580-1ec7-11eb-9908-8a7717fcc1f2.png) + + +由于xsql是基于spark实现的。所以xsql执行引擎基本是复用了linkis spark引擎代码。 + + +重点是修改如下: + + +主要涉及到linkis-ujes-xsql-engine 模块相关改动 + +- pom.xml + +``` + + + +2.4.3.xsql-0.6.0, + +``` + +2.4.3.xsql-0.6.0这个版本请根据从开源xsql编译时获取,由于适配了公司内部,所以版本号可能略有不同。 + + +SparkEngineExecutorFactory 类 + +``` +override def createExecutor(options: JMap[String, String]): SparkEngineExecutor = { + + val confFile = Paths.get(configPath, "xsql-spark-defaults-" + clusterName + ".conf").toAbsolutePath.toFile.getAbsolutePath + SparkUtils.getPropertiesFromFile(confFile).filter { case (k, v) => + k.startsWith("spark.") + }.foreach { case (k, v) => + conf.set(k, v) + sys.props.getOrElseUpdate(k, v) + } + +} + +def createSparkSession(outputDir: File, conf: SparkConf, addPythonSupport: Boolean = false): SparkSession = { + + + //val builder = SparkSession.builder.config(conf) + //builder.enableHiveSupport().getOrCreate() + + //划重点:将enableHiveSupport改成enableXSQLSupport() + val builder = SparkSession.builder.config(conf) + builder.enableXSQLSupport().getOrCreate() +} + +``` + +SparkEngineExecutor 类 + +``` +override protected def executeLine(engineExecutorContext: EngineExecutorContext, code: String): ExecuteResponse = Utils.tryFinally { + + //同样要增加加载配置代码段 + val confFile = Paths.get(configPath, "xsql-spark-defaults-" + clusterName + ".conf").toAbsolutePath.toFile.getAbsolutePath + SparkUtils.getPropertiesFromFile(confFile).filter { case (k, v) => + k.startsWith("spark.") + }.foreach { case (k, v) => + sc.getConf.set(k, v) + sys.props.getOrElseUpdate(k, v) + } + +} +``` + +## 如何使用 + +提交参数如下: + +``` +{ + "params":{ + "variable":{ + }, + "configuration":{ + "special":{ + }, + "runtime":{ + "clusterName":"cluster1", + "configPath":"/usr/local/dss/linkis/linkis-hadoop-conf", + "userName":"hadoop", + "wds.linkis.yarnqueue":"hadoop", + //可以传入绝对路径,跨集群写,前提执行账号是对目的路径有写权限 + "resultPath":"hdfs://namenode.hadoop.net:9000/home/hadoop/dwc/lihongwei" + //如果不想linkis进行limit限制,则需要传入"allowNoLimit" : true, + //否则不需要传这个参数,linkis则默认会进行limit 5000限制 + //"allowNoLimit" : true + }, + "startup":{ + "clusterName":"cluster1", + "configPath":"/usr/local/dss/linkis/linkis-hadoop-conf", + "userName":"hadoop", + "wds.linkis.yarnqueue":"hadoop", + //可以传入绝对路径,跨集群写,前提执行账号是对目的路径有写权限 + "resultPath":"hdfs://namenode.hadoop.net:9000/home/hadoop/dwc/lihongwei" + //如果不想linkis进行limit限制,则需要传入"allowNoLimit" : true, + //否则不需要传这个参数,linkis则默认会进行limit 5000限制 + //"allowNoLimit" : true + } + } + }, + "executeApplicationName":"xsql", + "executionCode":" + REMOVE DATASOURCE IF EXISTS mysql_connect_name; + ADD DATASOURCE mysql_connect_name(type='mysql',url='jdbc:mysql://10.22.22.22:3306',user='root',password='123456',pushdown='false',useSSL='false',version='5.7.28'); + REMOVE DATASOURCE IF EXISTS hive_cluster1; + ADD DATASOURCE hive_cluster1(type='hive',metastore.url='thrift://10.222.222.222:9083',user='test',password='test',version='1.2.1'); + SELECT t1.id,t1.name,t1.title,t2.time,t2.url,t2.partner,t2.m2 FROM (SELECT id,name,title,ip FROM mysql_connect_name.database_name.mysql_tables) t1 JOIN + (SELECT m,time,url,partner,ip FROM hive_cluster1.database_name.hive_tablse WHERE day = 20200903) t2 + ON t1.ip=t2.ip order by t2.time;", + "runType":"sql" +} + +``` + +XSQL语法说明: + +删除数据源时请使用这种语法 REMOVE DATASOURCE IF EXISTS 数据源名称; 避免直接REMOVE DATASOURCE 数据源名称, 因为上来就执行删除数据源,会因为找不到数据源来报异常。 + +查询的表名需要增加数据源以及数据库进行限定,要符合三段式表名。比如:hive_cluster1.database_name.table_name + +第一段数据源名称,就是添加数据源语法时自定义的名称hive_cluster1,比如ADD DATASOURCE hive_cluster1(... ...) + +第二段数据库名称,这个需要是真实的数据库,比如database_name + +第三段表名,表要是第二段数据库下真实的表名。 + +更多XSQL使用语法,可以查看官方相关文档。https://qihoo360.github.io/XSQL/tutorial/syntax/ + + +这样就可以实现mysql与hive数据进行关联查询了。 + +## 相关版本 + +hive 1.2.1 + +spark 2.4.3 + +linkis 0.9.3 + +xsql 0.6.0 + +java 1.8+ + +hadoop 2.7.2 + + +## 相关资源 + +https://github.com/WeBankFinTech/Linkis + +https://github.com/Qihoo360/XSQL + +https://github.com/Qihoo360/Quicksql -- Gitee