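# lg-elasticsearch

**Repository Path**: sunli1103_admin/lg-elasticsearch

## Basic Information

- **Project Name**: lg-elasticsearch
- **Description**: No description available
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 0
- **Created**: 2020-10-09
- **Last Updated**: 2022-03-04

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

## Assignment Notes

#### Course Introduction

> **Stage 7: Advanced Distributed Search Engines**
>
> Module 2: Elastic Stack, a Platform for Massive Log Analysis
>
> This module covers Elasticsearch data-model analysis, model construction, and algorithm extensions in depth, applies them in a hands-on search exercise on Lagou's dataset of hundreds of millions of records, and finishes with Elasticsearch performance tuning.

#### Assignment Tasks

> 1. Create a `lagou_db` database in MySQL and import the data from `position.sql` into it.
>
> 2. Choose a reasonable way to import the MySQL data into ES (the approach used in class, or any plugin you are familiar with).
>
> 3. Use Spring Boot to search ES by the `positionName` field. If fewer than 5 positions are found, search `positionAdvantage` for positions at companies advertising "美女多、员工福利好" (roughly "many good-looking colleagues, good employee benefits") and use them to top the results up to 5.
>
> Hint:
>
> Send a second search request only when the first one does not satisfy the requirement. If the first search is enough, do not send a second request.
>
> Deliverables:
> 1. A video of the data import process and a video explaining the case
>
> 2. The project source code
>
> SQL file: https://gitee.com/tian_lanlan/java_001/blob/master/%E4%BD%9C%E4%B8%9A%E8%B5%84%E6%96%99/position.sql

#### Software Versions

```
centos-7.7
jdk-11.0.7
mysql-5.7
elasticsearch-7.3.0
elasticsearch-analysis-ik-7.3.0
elasticsearch-head-5.0
node-v10.15.3
phantomjs-2.1.1
```

#### Known Issues

1. Insufficient memory

Because the VM is short on memory, the JVM memory limit has to be lowered. Give the VM at least 2 GB and set the JVM heap to 512 MB.

2. Permission problems

If `chown -R estest /opt/elasticsearch-7.3.0*` does not help, run `chmod -R 777 /opt/elasticsearch-7.3.0*` as root. You may need to repeat this after a reboot or after new log files are created.

3. Reinstalling head dependencies

If head fails to start, install the dependencies from step 3 of the head installation again.

4. Downloads fail

Any package that is installed from the network, such as the head source code, can be downloaded manually if the site is unreachable.

5. Deleting `data`

If the cluster fails to start, try deleting the `data` directory and starting again.

6. Do not start the same node twice

With several shell windows open to start the 3 nodes (all on the same VM), it is easy to run the same command in every window. Make sure each node is started from its own directory:

```shell
/opt/elasticsearch-7.3.0/bin/elasticsearch
/opt/elasticsearch-7.3.0-2/bin/elasticsearch
/opt/elasticsearch-7.3.0-3/bin/elasticsearch
```

and not:

```shell
/opt/elasticsearch-7.3.0/bin/elasticsearch
/opt/elasticsearch-7.3.0/bin/elasticsearch
/opt/elasticsearch-7.3.0/bin/elasticsearch
```

#### Implementation Steps

**1.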
Import the MySQL data**

Create the `lagou_db` database in MySQL and import the data from `position.sql` into it.

**2. Install Elasticsearch**

Elasticsearch is installed as a cluster of 3 nodes.

1. Unpack the installation files

```shell
mkdir -p /opt/java
tar -zxvf jdk-11.0.7-linux-x64.tar.gz -C /opt/java/
tar -zxvf elasticsearch-7.3.0-linux-x86_64.tar.gz -C /opt/
```

2. Configure the JDK environment variables

```shell
vim /etc/profile
```

```shell
export JAVA_HOME=/opt/java/jdk-11.0.7
export PATH=$PATH:$JAVA_HOME/bin
```

Apply the configuration

```shell
source /etc/profile
```

Check the Java version

```shell
java -version
```

3. Adjust the memory settings as needed in `/opt/elasticsearch-7.3.0/config/jvm.options`

```shell
-Xms512m
-Xmx512m
```

4. Copy the installation for the other cluster nodes

```shell
cp -rf /opt/elasticsearch-7.3.0 /opt/elasticsearch-7.3.0-2
cp -rf /opt/elasticsearch-7.3.0 /opt/elasticsearch-7.3.0-3
```

5. Configure each node

`/opt/elasticsearch-7.3.0/config/elasticsearch.yml`

```yml
cluster.name: my-es
node.name: node-1
node.master: true
network.host: 0.0.0.0
http.port: 9200
transport.port: 9300
discovery.seed_hosts: ["127.0.0.1:9300", "127.0.0.1:9301", "127.0.0.1:9302"]
cluster.initial_master_nodes: ["node-1", "node-2", "node-3"]
http.cors.enabled: true
http.cors.allow-origin: "*"
```

`/opt/elasticsearch-7.3.0-2/config/elasticsearch.yml`

```yml
cluster.name: my-es
node.name: node-2
node.master: true
network.host: 0.0.0.0
http.port: 9201
transport.port: 9301
discovery.seed_hosts: ["127.0.0.1:9300", "127.0.0.1:9301", "127.0.0.1:9302"]
cluster.initial_master_nodes: ["node-1", "node-2", "node-3"]
http.cors.enabled: true
http.cors.allow-origin: "*"
```

`/opt/elasticsearch-7.3.0-3/config/elasticsearch.yml`

```yml
cluster.name: my-es
node.name: node-3
node.master: true
network.host: 0.0.0.0
http.port: 9202
transport.port: 9302
discovery.seed_hosts: ["127.0.0.1:9300", "127.0.0.1:9301", "127.0.0.1:9302"]
cluster.initial_master_nodes: ["node-1", "node-2", "node-3"]
http.cors.enabled: true
http.cors.allow-origin: "*"
```
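The three files above differ only in `node.name`, `http.port`, and `transport.port`. To make that pattern explicit, here is a small Java sketch (a hypothetical `NodeConfigs` helper, not part of the original project) that prints all three configurations from one template. It assumes nodes are numbered from 1 and that both port ranges grow by one per node, as in the files above.

```java
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class NodeConfigs {

    /** Builds elasticsearch.yml content for node {@code index} (1-based) of a {@code nodeCount}-node local cluster. */
    public static String config(int index, int nodeCount) {
        // Every node lists all transport ports as seeds and all node names as initial masters.
        String seeds = IntStream.range(0, nodeCount)
                .mapToObj(i -> "\"127.0.0.1:" + (9300 + i) + "\"")
                .collect(Collectors.joining(", "));
        String masters = IntStream.rangeClosed(1, nodeCount)
                .mapToObj(i -> "\"node-" + i + "\"")
                .collect(Collectors.joining(", "));
        return String.join("\n",
                "cluster.name: my-es",
                "node.name: node-" + index,
                "node.master: true",
                "network.host: 0.0.0.0",
                "http.port: " + (9200 + index - 1),        // 9200, 9201, 9202, ...
                "transport.port: " + (9300 + index - 1),   // 9300, 9301, 9302, ...
                "discovery.seed_hosts: [" + seeds + "]",
                "cluster.initial_master_nodes: [" + masters + "]",
                "http.cors.enabled: true",
                "http.cors.allow-origin: \"*\"");
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 3; n++) {
            System.out.println("# node " + n);
            System.out.println(config(n, 3));
            System.out.println();
        }
    }
}
```

Running `main` prints the same settings as the three hand-written files, one block per node.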
6. Add a user

ES refuses to start as root by default, so create a dedicated user for it

```shell
useradd estest
passwd estest
```

Change the owner of the ES directories

```shell
chown -R estest /opt/elasticsearch-7.3.0
chown -R estest /opt/elasticsearch-7.3.0-2
chown -R estest /opt/elasticsearch-7.3.0-3
```

7. Edit `/etc/sysctl.conf` and add

```shell
vm.max_map_count=655360
```

Apply the configuration

```shell
sysctl -p
```

8. Edit `/etc/security/limits.conf` and add

```shell
* soft nofile 65536
* hard nofile 65536
* soft nproc 4096
* hard nproc 4096
```

9. Start ES

```shell
su estest
/opt/elasticsearch-7.3.0/bin/elasticsearch
/opt/elasticsearch-7.3.0-2/bin/elasticsearch
/opt/elasticsearch-7.3.0-3/bin/elasticsearch
```

Each command runs in the foreground, so run each one in its own shell window as `estest` (see item 6 of the notes above), or append `-d` to start it as a daemon.

10. Verify

```shell
curl "http://192.168.0.111:9200/_cat/health?v"
```

(192.168.0.111 is the VM's IP address; opening the URL in a browser works as well.)

![Cluster health output](https://images.gitee.com/uploads/images/2020/1010/171152_92cc2f6a_1712191.png)

**3. Install the head plugin**

1. Install Node.js

If the download fails, fetch the package manually

```shell
wget https://nodejs.org/dist/v10.15.3/node-v10.15.3-linux-x64.tar.xz
tar xf node-v10.15.3-linux-x64.tar.xz
cd node-v10.15.3-linux-x64
./bin/node -v
```

The `bin` directory of the extracted package contains `node`, `npm`, and other commands. Create symbolic links to them with `ln` (the paths assume the archive was extracted in `/root`):

```shell
ln -s /root/node-v10.15.3-linux-x64/bin/npm /usr/local/bin/
ln -s /root/node-v10.15.3-linux-x64/bin/node /usr/local/bin/
```

2. Install and configure PhantomJS

```shell
cd /usr/local
wget https://github.com/Medium/phantomjs/releases/download/v2.1.1/phantomjs-2.1.1-linux-x86_64.tar.bz2
yum install -y bzip2
tar -jxvf phantomjs-2.1.1-linux-x86_64.tar.bz2
```

```shell
vim /etc/profile
```

```shell
export PATH=$PATH:/usr/local/phantomjs-2.1.1-linux-x86_64/bin
```

```shell
source /etc/profile
```
3. Install elasticsearch-head

```shell
cd /opt/
npm install -g grunt-cli
npm install grunt
npm install grunt-contrib-clean
npm install grunt-contrib-concat
npm install grunt-contrib-watch
npm install grunt-contrib-connect
yum -y install git
git clone https://github.com/mobz/elasticsearch-head.git
cd elasticsearch-head
npm install -g cnpm --registry=https://registry.npmmirror.com
```

If git cannot reach GitHub, download the head package manually from https://github.com/mobz/elasticsearch-head

```shell
yum install -y unzip
unzip elasticsearch-head-master.zip -d /opt/
mv /opt/elasticsearch-head-master /opt/elasticsearch-head
```

4. Update the ES configuration (skip this if it was already set above)

`/opt/elasticsearch-7.3.0/config/elasticsearch.yml`

```yml
http.cors.enabled: true
http.cors.allow-origin: "*"
```

5. Start head

```shell
cd /opt/elasticsearch-head
npm run start
```

If it fails to start, install the dependencies from step 3 again.

6. Once head is running, open http://192.168.0.111:9100 and confirm that the cluster and its master/replica layout are displayed correctly.

![Cluster view in head](https://images.gitee.com/uploads/images/2020/1010/170421_cde5b35a_1712191.png)

**4. Install the IK Chinese analyzer**

1. Download the analyzer by running

```shell
/opt/elasticsearch-7.3.0/bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.3.0/elasticsearch-analysis-ik-7.3.0.zip
```

2. When the download finishes, the installer asks `Continue with installation?`. Enter `y` to complete the installation.

3. Restart ES.

4. Install the plugin on the other two nodes as well, using `/opt/elasticsearch-7.3.0-2/bin/elasticsearch-plugin` and `/opt/elasticsearch-7.3.0-3/bin/elasticsearch-plugin`.
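Installing IK only makes the analyzer available; a field uses it once the index mapping references it. The original project does not show its mapping, so the following is a hedged sketch under assumptions: a hypothetical `CreatePositionIndex` class that builds a mapping for the `position` index analyzing `positionName` and `positionAdvantage` with `ik_max_word` at index time and `ik_smart` at search time, then sends it with JDK 11's `java.net.http.HttpClient` to an assumed node at `192.168.0.111:9200`. The index has to be created before running the import, otherwise ES creates it with a default dynamic mapping that does not use IK.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CreatePositionIndex {

    /** Mapping body: IK-analyzed text for the two fields the search queries; other columns use dynamic mapping. */
    public static String mappingJson() {
        String ikText = "{\"type\":\"text\",\"analyzer\":\"ik_max_word\",\"search_analyzer\":\"ik_smart\"}";
        return "{\"mappings\":{\"properties\":{"
                + "\"positionName\":" + ikText + ","
                + "\"positionAdvantage\":" + ikText
                + "}}}";
    }

    public static void main(String[] args) throws Exception {
        // Assumed node address; any node of the cluster accepts the request.
        String base = args.length > 0 ? args[0] : "http://192.168.0.111:9200";
        HttpRequest request = HttpRequest.newBuilder(URI.create(base + "/position"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(mappingJson()))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // ES answers with {"acknowledged":true,...} on success, or an error if the index already exists.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```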
**5. Spring Boot Project**

1. Configuration

`EsConfig.java`

```java
package com.lagou.elasticsearch.config;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class EsConfig {

    @Value("${spring.elasticsearch.rest.uris}")
    private String hostlist;

    @Bean
    public RestHighLevelClient client() {
        // Parse the comma-separated host:port list
        String[] split = hostlist.split(",");
        // One HttpHost per configured ES node
        HttpHost[] httpHostArray = new HttpHost[split.length];
        for (int i = 0; i < split.length; i++) {
            String[] hostAndPort = split[i].trim().split(":");
            httpHostArray[i] = new HttpHost(hostAndPort[0], Integer.parseInt(hostAndPort[1]), "http");
        }
        // Create the RestHighLevelClient; Spring calls its close() method on shutdown
        return new RestHighLevelClient(RestClient.builder(httpHostArray));
    }
}
```

`DBHelper.java`

```java
package com.lagou.elasticsearch.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class DBHelper {
    // lagou_db is the database created in step 1
    public static final String url = "jdbc:mysql://localhost:3306/lagou_db?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai";
    public static final String name = "com.mysql.cj.jdbc.Driver";
    public static final String user = "root";
    public static final String password = "root";

    /** Opens a new connection; the caller is responsible for closing it. */
    public static Connection getConn() throws SQLException {
        try {
            Class.forName(name);
        } catch (ClassNotFoundException e) {
            throw new SQLException("MySQL driver not found: " + name, e);
        }
        return DriverManager.getConnection(url, user, password);
    }
}
```

`application.yml`

```yml
spring:
  jackson:
    time-zone: GMT+8
    date-format: yyyy-MM-dd HH:mm:ss
  devtools:
    restart:
      enabled: true                    # enable hot restart
      additional-paths: src/main/java  # directories that trigger a restart
      exclude: WEB-INF/**
  freemarker:
    cache: false  # disable template caching so changes take effect immediately
  elasticsearch:
    rest:
      uris: 192.168.0.111:9200,192.168.0.111:9201,192.168.0.111:9202
```
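`EsConfig` expects every entry of `uris` to be exactly `host:port`. A slightly more tolerant parser is sketched below in plain Java so it can be tested without a cluster. It is an assumption-based helper, not part of the project: the `EsUris` class and its `Endpoint` type are hypothetical, it accepts an optional `http://` or `https://` prefix, ignores spaces and empty entries, and falls back to ES's default HTTP port 9200. Each `Endpoint` maps directly onto `new HttpHost(host, port, scheme)` inside `client()`.

```java
import java.util.ArrayList;
import java.util.List;

public class EsUris {

    /** One ES endpoint; corresponds to new HttpHost(host, port, scheme). */
    public static final class Endpoint {
        public final String scheme;
        public final String host;
        public final int port;

        Endpoint(String scheme, String host, int port) {
            this.scheme = scheme;
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return scheme + "://" + host + ":" + port;
        }
    }

    /** Parses comma-separated "[scheme://]host[:port]" entries. */
    public static List<Endpoint> parse(String uris) {
        List<Endpoint> result = new ArrayList<>();
        for (String raw : uris.split(",")) {
            String item = raw.trim();
            if (item.isEmpty()) {
                continue; // tolerate stray commas
            }
            String scheme = "http";
            int sep = item.indexOf("://");
            if (sep >= 0) {
                scheme = item.substring(0, sep);
                item = item.substring(sep + 3);
            }
            int colon = item.lastIndexOf(':');
            String host = colon >= 0 ? item.substring(0, colon) : item;
            int port = colon >= 0 ? Integer.parseInt(item.substring(colon + 1)) : 9200;
            result.add(new Endpoint(scheme, host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        parse("192.168.0.111:9200, 192.168.0.111:9201,https://es.example:9443,")
                .forEach(System.out::println);
    }
}
```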
2. Import the data into ES

`PositionController.java`

```java
@RequestMapping("/importAll")
@ResponseBody
public String importAll() {
    try {
        service.importAll();
        return "success";
    } catch (Exception e) {
        e.printStackTrace();
        return "failed: " + e.getMessage();
    }
}
```

`PositionServiceImpl.java`

```java
...
@Autowired
private RestHighLevelClient client;

@Override
public void importAll() {
    // The MySQL table and the ES index share the same name
    writeMysqlDataToES(POSITIOIN_INDEX);
}

/**
 * Streams every row of the MySQL table into ES through a BulkProcessor.
 */
private void writeMysqlDataToES(String tableName) {
    BulkProcessor bulkProcessor = getBulkProcessor(client);
    logger.info("Start handle data: " + tableName);
    String sql = "SELECT * FROM " + tableName;
    int count = 0;
    try (Connection conn = DBHelper.getConn();
         PreparedStatement ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
        // With MySQL Connector/J, Integer.MIN_VALUE streams rows one at a time instead of loading
        // the whole table into memory; a value such as 20 is ignored unless useCursorFetch=true is set.
        ps.setFetchSize(Integer.MIN_VALUE);
        try (ResultSet rs = ps.executeQuery()) {
            ResultSetMetaData colData = rs.getMetaData();
            int columnCount = colData.getColumnCount();
            while (rs.next()) {
                // BulkProcessor accepts a Map as the document source, so each row becomes column -> value
                Map<String, Object> row = new HashMap<>(columnCount * 2);
                for (int i = 1; i <= columnCount; i++) {
                    row.put(colData.getColumnLabel(i), rs.getString(i));
                }
                // No manual batching needed: BulkProcessor sends a bulk request every 5000 actions (see below)
                bulkProcessor.add(new IndexRequest(POSITIOIN_INDEX).source(row));
                if (++count % 10000 == 0) {
                    logger.info("Mysql handle data number: " + count);
                }
            }
        }
        logger.info("-------------------------- Finally insert number total: " + count);
        // Send whatever is still buffered; documents become searchable after the next index refresh (1 s by default)
        bulkProcessor.flush();
    } catch (Exception e) {
        logger.error("Import failed after " + count + " rows: " + e.getMessage());
    } finally {
        try {
            // Wait for in-flight bulk requests to finish before returning
            boolean terminatedFlag = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS);
            logger.info("BulkProcessor closed: " + terminatedFlag);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error(e.getMessage());
        }
    }
}

private BulkProcessor getBulkProcessor(RestHighLevelClient client) {
    BulkProcessor.Listener listener = new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            logger.info("Try to insert data number: " + request.numberOfActions());
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            // A bulk call can succeed as a whole while individual documents fail
            if (response.hasFailures()) {
                logger.error("Bulk " + executionId + " has failures: " + response.buildFailureMessage());
            } else {
                logger.info("************** Success insert data number: " + request.numberOfActions() + ", id: " + executionId);
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            logger.error("Bulk is unsuccessful: " + failure + ", executionId: " + executionId);
        }
    };
    BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
            (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
    return BulkProcessor.builder(bulkConsumer, listener)
            .setBulkActions(5000)                                  // flush every 5000 documents
            .setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB)) // or every 100 MB
            .setConcurrentRequests(10)                             // up to 10 bulk requests in flight
            .setFlushInterval(TimeValue.timeValueSeconds(100L))    // or every 100 s
            // retry requests rejected by a busy cluster up to 3 times, 1 s apart
            .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3))
            .build();
}
...
```
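`BulkProcessor` hides the batching: index requests accumulate until a count or size threshold is reached, and `flush()`/`awaitClose()` push out the final partial batch. The plain-Java sketch below imitates only the count-based part so the behaviour can be checked without a cluster. `Batcher` is a hypothetical class, not an Elasticsearch API, and it flushes synchronously, whereas `BulkProcessor` can have up to `setConcurrentRequests` batches in flight.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class Batcher<T> implements AutoCloseable {
    private final int batchSize;
    private final Consumer<List<T>> sink;
    private final List<T> buffer = new ArrayList<>();

    public Batcher(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be positive");
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Buffers one item and sends a full batch as soon as batchSize items are waiting. */
    public void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Sends the buffered items, if any, as one (possibly partial) batch. */
    public void flush() {
        if (buffer.isEmpty()) return;
        sink.accept(new ArrayList<>(buffer)); // hand over a copy so the buffer can be reused
        buffer.clear();
    }

    /** Like awaitClose(): the last partial batch is not lost when the batcher is closed. */
    @Override
    public void close() {
        flush();
    }

    public static void main(String[] args) {
        List<Integer> sizes = new ArrayList<>();
        try (Batcher<Integer> batcher = new Batcher<>(5000, batch -> sizes.add(batch.size()))) {
            for (int row = 0; row < 12345; row++) {
                batcher.add(row);
            }
        }
        System.out.println(sizes); // [5000, 5000, 2345]
    }
}
```

With 12,345 rows and a batch size of 5000, two full batches are sent while reading and the remaining 2,345 rows go out on `close()`, which is why the import code must not skip the final `flush()`/`awaitClose()`.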
`Position.java`

```java
package com.lagou.elasticsearch.entity;

import lombok.Data;

import java.io.Serializable;

@Data
public class Position implements Serializable {
    private String id;
    private String positionName;
    private String positionAdvantage;
    private String salary;
    private String workYear;
    private String education;
    private String city;
    private String companyName;
    private String publishTime;
}
```

Import result

![Import result](https://images.gitee.com/uploads/images/2020/1018/012629_034f2b14_1712191.png)

3. Query ES from Spring Boot

`PositionController.java`

```java
@GetMapping("/search/{keyword}/{pageNo}/{pageSize}")
@ResponseBody
public List<Map<String, Object>> searchPosition(@PathVariable("keyword") String keyword,
                                                @PathVariable("pageNo") int pageNo,
                                                @PathVariable("pageSize") int pageSize) throws IOException {
    return service.searchPos(keyword, pageNo, pageSize);
}
```

`PositionServiceImpl.java`

```java
...
private static final String POSITIOIN_INDEX = "position";

@Autowired
private RestHighLevelClient client;

// Search positions
@Override
public List<Map<String, Object>> searchPos(String keyword, int pageNo, int pageSize) throws IOException {
    if (pageNo <= 1) {
        pageNo = 1;
    }
    // First request: match on positionName only
    QueryBuilder builder = QueryBuilders.matchQuery("positionName", keyword);
    SearchHit[] hits = search(builder, pageNo, pageSize).getHits().getHits();
    logHits("positionName", hits);

    // Second request only when the first one returned fewer than 5 positions
    if (hits.length < 5) {
        // Only should clauses, so a hit must match positionName OR the wanted positionAdvantage
        builder = QueryBuilders.boolQuery()
                .should(QueryBuilders.matchQuery("positionName", keyword))
                .should(QueryBuilders.matchQuery("positionAdvantage", "美女多、员工福利好"));
        hits = search(builder, pageNo, pageSize).getHits().getHits();
        logHits("positionName + positionAdvantage", hits);
    }

    List<Map<String, Object>> list = new ArrayList<>();
    for (SearchHit hit : hits) {
        list.add(hit.getSourceAsMap());
    }
    return list;
}

// Log each hit's positionName and positionAdvantage for the demo
private void logHits(String label, SearchHit[] hits) {
    logger.info(label + ": " + hits.length + " hits");
    for (SearchHit hit : hits) {
        Map<String, Object> source = hit.getSourceAsMap();
        logger.info(source.get("positionName") + ", " + source.get("positionAdvantage"));
    }
}

private SearchResponse search(QueryBuilder builder, int pageNo, int pageSize) throws IOException {
    SearchRequest searchRequest = new SearchRequest(POSITIOIN_INDEX);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    // Paging: offset = (current page - 1) * page size
    searchSourceBuilder.from((pageNo - 1) * pageSize);
    searchSourceBuilder.size(pageSize);
    searchSourceBuilder.query(builder);
    searchSourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
    // Execute the search
    searchRequest.source(searchSourceBuilder);
    return client.search(searchRequest, RequestOptions.DEFAULT);
}
...
```

#### Test Procedure

1. Use Spring Boot to search ES by the `positionName` field.

![positionName search result](https://images.gitee.com/uploads/images/2020/1018/012728_dcec8e4a_1712191.png)

2. If fewer than 5 positions are found, search `positionAdvantage` for positions at companies advertising "美女多、员工福利好" and use them to top the results up to 5.

Query result

![Supplemented search result](https://images.gitee.com/uploads/images/2020/1018/012311_c5e821c9_1712191.png)

#### Video Walkthrough

![Video walkthrough](reference/md-videos/elasticsearch.mp4)
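The service answers the fallback with one combined `bool`/`should` query, so the second response may rank supplementary positions above some of the original `positionName` matches. An alternative, not what this project does, is to keep the first-round hits in order and append second-round hits only until 5 unique positions are collected. The plain-Java sketch below assumes every hit carries a unique `id` (as in `Position`); `Supplement.topUp` and the two `Supplier` lookups standing in for the ES requests are hypothetical. The second lookup runs only when the first returns too few results, matching the assignment's hint.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class Supplement {

    /**
     * Returns the first-round hits, topped up with unseen second-round hits until {@code target}
     * results exist. The second supplier is called only if the first round falls short.
     */
    public static List<Map<String, Object>> topUp(Supplier<List<Map<String, Object>>> first,
                                                  Supplier<List<Map<String, Object>>> second,
                                                  int target) {
        List<Map<String, Object>> result = new ArrayList<>(first.get());
        if (result.size() >= target) {
            return result; // first request is enough; no second request is sent
        }
        Set<Object> seen = new HashSet<>();
        for (Map<String, Object> hit : result) {
            seen.add(hit.get("id"));
        }
        for (Map<String, Object> hit : second.get()) {
            if (result.size() >= target) break;
            if (seen.add(hit.get("id"))) { // skip positions already returned by the first request
                result.add(hit);
            }
        }
        return result;
    }

    /** Builds a minimal fake hit for the demo below. */
    public static Map<String, Object> hit(String id) {
        Map<String, Object> m = new HashMap<>();
        m.put("id", id);
        return m;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> firstRound = List.of(hit("1"), hit("2"));
        List<Map<String, Object>> secondRound = List.of(hit("2"), hit("3"), hit("4"), hit("5"), hit("6"));
        List<Map<String, Object>> result = topUp(() -> firstRound, () -> secondRound, 5);
        System.out.println(result.stream().map(h -> String.valueOf(h.get("id"))).collect(Collectors.joining(" "))); // 1 2 3 4 5
    }
}
```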