From e0603ba37808ff6fb4910874205fe36295a04a30 Mon Sep 17 00:00:00 2001 From: limeng <77954309@qq.com> Date: Thu, 21 Jan 2021 17:11:02 +0800 Subject: [PATCH] =?UTF-8?q?add=20=E5=BC=80=E6=BA=90=E6=95=85=E4=BA=8B/DSS+?= =?UTF-8?q?Linkis=E5=9C=A8=E7=9F=A5=E5=9B=A0=E6=99=BA=E6=85=A7=E4=BD=BF?= =?UTF-8?q?=E7=94=A8=E6=83=85=E5=86=B5.md.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...77\347\224\250\346\203\205\345\206\265.md" | 235 ++++++++++++++++++ 1 file changed, 235 insertions(+) create mode 100644 "\345\274\200\346\272\220\346\225\205\344\272\213/DSS+Linkis\345\234\250\347\237\245\345\233\240\346\231\272\346\205\247\344\275\277\347\224\250\346\203\205\345\206\265.md" diff --git "a/\345\274\200\346\272\220\346\225\205\344\272\213/DSS+Linkis\345\234\250\347\237\245\345\233\240\346\231\272\346\205\247\344\275\277\347\224\250\346\203\205\345\206\265.md" "b/\345\274\200\346\272\220\346\225\205\344\272\213/DSS+Linkis\345\234\250\347\237\245\345\233\240\346\231\272\346\205\247\344\275\277\347\224\250\346\203\205\345\206\265.md" new file mode 100644 index 0000000..15485e6 --- /dev/null +++ "b/\345\274\200\346\272\220\346\225\205\344\272\213/DSS+Linkis\345\234\250\347\237\245\345\233\240\346\231\272\346\205\247\344\275\277\347\224\250\346\203\205\345\206\265.md" @@ -0,0 +1,235 @@ +## 一.应用场景 + +首先感谢社区各位大佬的指点,学习到很多。 + +知因智慧是一家toB金融公司,里面需要大量的ETL过程,原先用Shell脚本连接各种Hql,Spark等等,XXL- Job调度,可能一个模块就被一个大的脚本包含住了,耦合性特别强,调度这块也有问题,无法监控中间的报错,2019下半年时看到社区开源组件,一直研究怎么跟公司整合。 + +希望借助社区的力量,结合公司实际情况,打通公司级数据中台的流程,目前数据建设主要集中在元数据管理,数据仓库ETL流程,数据质量,任务调度这几个方面。 + +## 二. 解决的问题 + +>基于LDAP服务 + +基于LDAP管理用户,代理服务模块修改,以组为单位共用账户,公司的整个数据开发人员不多,基于这种方式可以支撑下去。 + +```java +object LDAPUtils extends Logging { + val url = CommonVars("wds.linkis.ldap.proxy.url", "").getValue + val baseDN = CommonVars("wds.linkis.ldap.proxy.baseDN", "").getValue + def login(userID: String, password: String): Unit = { + if(userID.isEmpty) throw new NamingException("userID is null") + val env = new Hashtable[String, String]() + val bindDN = "uid="+userID+"," + val bindPassword = password + env.put(Context.SECURITY_AUTHENTICATION, "simple") + env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory") + env.put(Context.PROVIDER_URL, url) + env.put(Context.SECURITY_PRINCIPAL, bindDN+baseDN) + env.put(Context.SECURITY_CREDENTIALS, bindPassword) + new InitialLdapContext(env, null) + info(s"user $userID login success.") + } +} +``` +>放开权限 + +因为资源有限,有些权限管理很繁琐,放开一些权限: + +filesystem.path 存储日志,脚本的权限,统一根据nfs挂载,把目录权限统一根据用户随意修改。 + +linkis-metadata元数据管理,把Hive相关权限放开(HiveMetaDao.xml): + +```xml + + +``` + +>依赖兼容问题 + +因为环境是CDH-5.16.2 编译部署DSS和Linkis是根据原生的版本号,大部分服务都没有问题,但是有的服务有些问题,因为CDH会把组件重新编译,有的指令会改变。 + +原先发生过Hive 和 Spark应该支持的函数,到Scriptis上运行脚本,不支持,这是因为得把两个服务相关的Hive 和 Spark jar 都变成后缀带有CDH的。 + +![图片](https://uploader.shimo.im/f/QsJF5yXp0f7Oi2GW.png!thumbnail?fileGuid=vKDH9hgWXR9Tg98J) + +>tez的支持 + +Linkis Hive引擎对tez的支持: + +* tez相关jar放到linkis-ujes-hive-enginemanager/lib +* linkis.properties配置Hive配置文件目录,hive-site.xml配置文件中 +```xml + +    tez.lib.uris +    hdfs:///apps/tez/tez-0.8.5.tar.gz +    +    hive.tez.container.size +    10240 +``` +>Shell定义变量 + +![图片](https://uploader.shimo.im/f/MnljxtnMAnXdjCUb.png!thumbnail?fileGuid=vKDH9hgWXR9Tg98J) + +自定义变量的支持: + +CustomVariableUtils 工具类中,Shell关枚举都要添加上。 + +```plain +/** + * @Classname ShellScriptCompaction + * @Description TODO + * @Date 2020/8/19 18:22 + * @Created by limeng + */ +class ShellScriptCompaction private extends CommonScriptCompaction{ + override def prefixConf: String = "#conf@set" + override def prefix: String = "#@set" + override def belongTo(suffix: String): Boolean ={ + suffix match { + case "sh"=>true + case _=>false + } + } +} +object ShellScriptCompaction{ + val shellScriptCompaction:ShellScriptCompaction=new ShellScriptCompaction + def apply(): CommonScriptCompaction = shellScriptCompaction +} +``` +ScriptFsWriter Shell相关 def listCompactions(): Array[Compaction] = Array(PYScriptCompaction(),QLScriptCompaction(),ScalaScriptCompaction(),ShellScriptCompaction()) +WorkspaceUtil 工具类正则有问题,无法修改名称,中间有.符号去除 + +```plain +public static void fileAndDirNameSpecialCharCheck(String path) throws WorkSpaceException { + String name = new File(path).getName(); + LOGGER.info(path); + String specialRegEx = "[ _`~!@#$%^&*()+=|{}':;',\\[\\]<>/?~!@#¥%……&*()——+|{}【】‘;:”“’。,、?]|\n|\r|\t"; + Pattern specialPattern = Pattern.compile(specialRegEx); + if(specialPattern.matcher(name).find()){ + throw new WorkSpaceException("the path exist special char"); + } +} +``` +>使用 eventReceiver节点异常(eventchecker组件)#247 +```scala +EventCheckerNodeExecution.scala + Utils.tryFinally { + + resultSetWriter.addMetaData(null) + + resultSetWriter.addRecord(new LineRecord(action.saveKeyAndValue)) + + }(IOUtils.closeQuietly(resultSetWriter)) + + }(Utils.tryQuietly(resultSetWriter.close())) + + } + response.setIsSucceed(true) + + }else{ + +............................................. +AppJointEntranceJob.scala +override def run(): Unit = { + if(!isScheduled) return + info(s"$getId starts to run") + getLogListener.foreach(_.onLogUpdate(this, LogUtils.generateInfo(s"$getId starts to execute."))) + startTime = System.currentTimeMillis + getExecutor match { + case appjointEntranceEngine:AppJointEntranceEngine => appjointEntranceEngine.setJob(this) + appjointEntranceEngine.setInstance(Sender.getThisInstance) + } + Utils.tryAndErrorMsg(transition(Running))(s"transition $getId from Scheduler to Running failed.") +``` +>调试服务 + +把有问题的服务,bin目录下启动脚本,远程debug打开 + +![图片](https://uploader.shimo.im/f/U1YucfKzVlEgQNcZ.png!thumbnail?fileGuid=vKDH9hgWXR9Tg98J) + +因为平台Cookie的原因,直接用接口发送请求,有的无法调试: + +```java +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; +import org.springframework.web.client.RestTemplate; +/** + * @Classname HttpUtil + * @Description TODO + * @Date 2020/10/30 14:49 + * @Created by limeng + */ +public class HttpUtil { + public static RestTemplate getRestClient(){ + CloseableHttpClient build = HttpClientBuilder.create().useSystemProperties().build(); + return new RestTemplate(new HttpComponentsClientHttpRequestFactory(build)); + } +} +import com.linkis.web.utils.HttpUtil; +import net.sf.json.JSONObject; +import org.springframework.http.ResponseEntity; +import org.springframework.web.client.RestTemplate; +/** + * @Classname LinkisMain + * @Description TODO + * @Date 2020/10/30 14:54 + * @Created by limeng + * 测试类 EntranceRestfulTest + */ +public class LinkisMain { + public static void main(String[] args) { + RestTemplate restClient = HttpUtil.getRestClient(); + JSONObject postData = new JSONObject(); + postData.put("password","hdfs"); + postData.put("userName","hdfs"); + String loginUrl = "http://192.168.200.116:8088/api/rest_j/v1/user/login"; + ResponseEntity jsonResponseEntity = restClient.postForEntity(loginUrl, postData, JSONObject.class); + System.out.println("状态码:"+jsonResponseEntity.getStatusCodeValue()); + JSONObject body = jsonResponseEntity.getBody(); + System.out.println("body :" + body.toString()); + } +} +``` + +>调度服务 + +邮箱警告的配置调整 + +/plugins/alerter/WebankIMS/conf plugin.properties 设置本地依赖(alerter.class=azkaban.utils.Emailer) + +## 三.最佳实践 + +我以公司标签库为例,讲述下操作流程。 + +对企业数据进行挖掘和分析,建立标签特征体系,创建个性化的多层级标签,并在此基础上进行细分和精准营销场景应用,有利于对企业/集团进行深入经营,充分挖掘企业/集团潜力,提升企业/集团价值。以此为目标的标签库的构建,将有利于了解和深耕企业/集团,可更好地助力于企业金融服务。 + +![图片](https://uploader.shimo.im/f/b0d7VwOMKTsLGp9I.png!thumbnail?fileGuid=vKDH9hgWXR9Tg98J) + +![图片](https://uploader.shimo.im/f/WZnvuiTkgJv7qsTd.png!thumbnail?fileGuid=vKDH9hgWXR9Tg98J) + +这是执行流程图,中间调度过程,Hql,Spark目前运行在DSS平台上。 + +![图片](https://uploader.shimo.im/f/mCLiMk1ClF1eT0co.png!thumbnail?fileGuid=vKDH9hgWXR9Tg98J) + +其中一个企业经营特别的跑批流程。 + +>软件版本 + +CDH-5.16.2 + +Hadoop-2.6 + +Hive-1.1 + +Spark-2.2 + +kafka_2.11-1.0.1 \ No newline at end of file -- Gitee