diff --git "a/\345\274\200\346\272\220\346\225\205\344\272\213/\346\210\221\344\270\216[DataSphereStudio]\347\232\204\346\225\205\344\272\213" "b/\345\274\200\346\272\220\346\225\205\344\272\213/\346\210\221\344\270\216[DataSphereStudio]\347\232\204\346\225\205\344\272\213" new file mode 100644 index 0000000000000000000000000000000000000000..44158524b895a4ffb41ae51cf378af8b9c852111 --- /dev/null +++ "b/\345\274\200\346\272\220\346\225\205\344\272\213/\346\210\221\344\270\216[DataSphereStudio]\347\232\204\346\225\205\344\272\213" @@ -0,0 +1,135 @@ +故事起源: +公司的数据分析团队目前使用的是sas软件,界面较落后,需要在每台机器上都安装客户端,单机环境,依赖于机器配置,无法与集群速度相比,缺少高可用高并发的特性。目前公司的数据处理方式,需要先将数据下载到本地,再通过sas编写脚本处理,原始数据、脚本、结果数据会极大占用机器空间。并且sas是国外的成熟商业软件,每年的采购价格不菲。为了节省开支,以及支持办公软件国产化,所以寻找国内优秀的开源软件用以逐步替代sas。 + +解决问题: +使用了dss与linkis组件,以页面化的形式开发脚本,方便快捷,能够使用多种类型脚本直接操作hive数据,不需要数据导出到本地后再处理,对sas节省资源开销,相比较于sas方便的管理HDFS中的文件。目前阶段处于分析人员使用dss开发数据处理脚本,以及使用过程中的问题修复。 + +使用情况: +目前阶段处于linkis适配公司环境,以及修复使用问题,还未新增功能或者新引擎支持,后续如有会分享出来。 +公司的大数据相关环境有专门的团队负责,我们在安装使用的过程中进行了一系列的适配: + +1.大数据环境使用的是CDH 5.16.1,而源码是社区版本,所以根据具体的版本我们进行了重新编译。 + +2.HDFS权限被ACL接管,不与Linux系统权限同步,所以直接用hql脚本查询数据时,遇到了HDFS目录无权访问的情况。经过沟通了解到数据团队规定必须通过jdbc,经过域账号验证后,才可访问hive数据。所以在使用中暂时将hql和sql的脚本隐藏,主要使用jdbc和python脚本来处理数据。 + +3.公司spark版本是2.4.0.cloudera2,修改了后台识别版本的逻辑。 + +4.CDH对版本校验比较严格,所以修改了pyspark.zip包中的content.py文件,将社区版中的分支判断补充进来: + +if allow_insecure_env == "1" or allow_insecure_env.lower() == "true": +warnings.warn( +"You are passing in an insecure Py4j gateway. This " +"presents a security risk, and will be completely forbidden in Spark 3.0") +5.因为数据分析人员较多,所以需要创建多个账号,linkis的使用条件需要在linux机器中有对应的用户,以及linux、hdfs上的工作空间目录,权限设置等,所以我们开发了一个user-init脚本,专门针对新用户使用linkis的初始化操作。并且公司AD域账号全部为小写字母,修改登陆逻辑不强制将用户名转小写。 + +6.Linkis数据库列表的数据来源通过读取hive元数据库方式,为配合权限管控,修改为hiveserver2校验账号密码的方式获取有权限的数据库列表,相应的点击数据库后的查询表操作,以及查询表内容等操作,全部修改为jdbc方式。 + +7.增加方法函数支持python。 + +问题修复: +1.scriptis中左侧数据库操作,从hql转为jdbc方式,并且修改了dbs接口: + +private List getDbsByUser(String userName) throws ErrorException, SQLException { + JdbcSettingResponse jdbcSetting = queryJdbcSetting(userName); + + if(StringUtils.isEmpty(jdbcSetting.url()) || StringUtils.isEmpty(jdbcSetting.userName()) || StringUtils.isEmpty(jdbcSetting.passwd())) + { + throw new ErrorException(9999, "JDBC配置为空,请在控制台中进行JDBC连接设置。"); + } + + HiveDatabaseAndTableRetriever retriever = new HiveDatabaseAndTableRetriever(jdbcSetting.url(), jdbcSetting.userName(), jdbcSetting.passwd()); + return retriever.getDbs(); +} + +private JdbcSettingResponse queryJdbcSetting(String userName) { + JdbcSettingRequest request = new JdbcSettingRequest(userName); + Sender sender = Sender.getSender("cloud-publicservice"); + return (JdbcSettingResponse)sender.ask(request); +} +2.scriptis中打开多个tab页时,最右边标签无法关闭: + +this.tabMove.maxTabLen = Math.floor(this.width / 100) - 1; +3.脚本自定义参数保存失败: + +const params = ismodifyByOldTab ? option.params : this.convertSettingParams(rst.metadata); +4.工作流中的节点脚本执行完成没有耗时显示: + + 中增加:cost-time="script.progress.costTime" +5.工作流中的节点点击历史后,无法返回编辑页。 + +6.脚本开发运行结果全屏后无法退出。 + +7.控制台中jdbc连接设置修改后,立即生效: +原有的缓存时间是120秒,为了避免修改CacheableRPCInterceptor影响其他,所以新建了一个ShortCacheableRPCInterceptor类,缓存时间设置为3秒: + +@Component +class ShortCacheableRPCInterceptor extends RPCInterceptor with Logging{ + + private val guavaCache: Cache[Any, Any] = CacheBuilder.newBuilder().concurrencyLevel(5) + .expireAfterWrite(3000, TimeUnit.MILLISECONDS).initialCapacity(20) //TODO Make parameters(做成参数) + .maximumSize(1000).recordStats().removalListener(new RemovalListener[Any, Any] { + override def onRemoval(removalNotification: RemovalNotification[Any, Any]): Unit = { + debug(s"CacheSender removed key => ${removalNotification.getKey}, value => ${removalNotification.getValue}.") + } + }).asInstanceOf[CacheBuilder[Any, Any]].build() + + override val order: Int = 11 + + override def intercept(interceptorExchange: RPCInterceptorExchange, chain: RPCInterceptorChain): Any = interceptorExchange.getProtocol match { + case cacheable: ShortCacheableProtocol => + guavaCache.get(cacheable.toString, new Callable[Any] { + override def call(): Any = { + val returnMsg = chain.handle(interceptorExchange) + returnMsg match { + case warn: WarnException => + throw warn + case _ => + returnMsg + } + } + }) + case _ => chain.handle(interceptorExchange) + } +} +8.scriptis中左侧数据库进行删表操作后,刷新库时的定位错误问题: + +this.currentAcitved = find(this.tableList, (db) => db.name === this.currentAcitved.dbName || this.currentAcitved.name); +9.用户提出希望有组概念,同组中的用户可以看到各自的脚本文件。现有的scriptis设计是租户隔离,所以我们新增了user_group表,保存用户和组关系,在user-init脚本中新增了添加用户组逻辑,修改了getUserRootPath的接口,增加了一层组目录判断(没有添加组的用户,逻辑与原本保持不变),并挂载到nas盘上,实现了同组用户在windows中可以互相查看脚本的需求。 + +10.scriptis中的脚本名称包含特定字符时,出现没有小图标以及丢失脚本执行按钮的问题,正则表达式判断有误: + +{ rule: /(表详情)|(Table\sdetails)/, executable: false, isCanBeOpen: true }, +{ rule: /(建表向导)|(Table\screation\sguide)/, executable: false, isCanBeOpen: true } +11.python支持方法函数: + +//ConstantVar 新增 +public final static int FUNCTION_PYTHON = 11; +12.python脚本在执行过程中取消后,再次执行时会延续上一次取消前的状态,且找不到系统方法定义 ,修改了pythonEngineExecutor的逻辑: + +//executeLine中增加 +if(!this.isEngineInitialized) { + savedHookCodes.add(code) +} + +if(this.pySession == null) this synchronized { + if(this.pySession == null) { + this.pySession = new PythonSession + for(i <- 0 to savedHookCodes.size() -1){ + info("executeLine code when pySession is null:" + savedHookCodes.get(i)) + executeLine(engineExecutorContext, savedHookCodes.get(i)) + } + } +} +13.jdbc脚本并发操作时会出现持续处于运行中状态的问题: +将全局变量中的connection statement 修改为executeLine方法中的局部变量来解决多线程并发问题,同时将所有的close方法修改到finally中,改动后还未复现。 + +14.jdbc脚本查询结果超过entrance中规定的最大缓存时(一般数据量为5000左右就会超过),查询结果不完整的问题: +executeLine中resultSetWriter没有正常关闭导致生成的dolphin文件不全。 + +期望功能: + +脚本信息打印:当一个脚本中有多段数据处理过程时,只能看到最终执行结果,无法查询每一段的执行情况:耗时,执行生效条数等信息,使用人员分析时较为困难。 +用户管理,要使用dss需要在服务器上创建系统用户,目前是通过我们自己编写的用户初始化bash脚本,创建用户、相关工作目录以及权限设置,希望后续dss版本中增加用户管理功能。 +多数据源管理:可以处理hive以外的其他数据源数据。 +目前版本的linkis脚本引擎启动时间过长,希望可以改善。 +目前遇到了一些脚本持续停留在排队中的情况,重启引擎后才可以正常执行。 \ No newline at end of file