diff --git a/ruoyi-admin/src/main/resources/application-auth.yml b/ruoyi-admin/src/main/resources/application-auth.yml index 22363227c8410fd05a08de84a46bdeca21657323..0b516c1b32680195a44fa136a8561aa12a552032 100644 --- a/ruoyi-admin/src/main/resources/application-auth.yml +++ b/ruoyi-admin/src/main/resources/application-auth.yml @@ -47,3 +47,9 @@ spring: username: email # 授权码 password: password + +# 本地没有使用mail时,禁用mail健康检查 +management: + health: + mail: + enabled: false diff --git a/ruoyi-admin/src/main/resources/application-plugins.yml b/ruoyi-admin/src/main/resources/application-plugins.yml index a031d7dd33c8bddd0e2ba6d5f564bc35ea47265c..383124a3707ebc9fe29d8c3d0563a18e81f89308 100644 --- a/ruoyi-admin/src/main/resources/application-plugins.yml +++ b/ruoyi-admin/src/main/resources/application-plugins.yml @@ -39,6 +39,53 @@ spring: username: guest password: guest + elasticsearch: + enable: true + uris: + - http://127.0.0.1:9200 + username: elastic + password: 123456 + socket-timeout: 6000 + connect-timeout: 5000 + connection-request-timeout: 5000 + max-retry-times: 3 + max-connect-num: 10 + max-connect-num-per-route: 5 + + kafka: + enable: true + # Kafka 集群地址 + bootstrap-servers: 127.0.0.1:9092 + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + properties: + spring.json.add.type.headers: false + acks: all + retries: 3 + batch-size: 16384 + buffer-memory: 33554432 + consumer: + group-id: defense-group + auto-offset-reset: earliest + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + properties: + spring.json.trusted.packages: "*" + spring.json.use.type.headers: false + enable-auto-commit: false + max-poll-records: 500 + fetch-max-wait: 5000 + # 监听器配置 + listener: + # 批量监听器 + type: batch + # 手动提交确认 + ack-mode: manual_immediate + # 并发消费者数量 + 
concurrency: 3 + template: + default-topic: "alarm-topic" netty: websocket: @@ -46,4 +93,13 @@ netty: bossThreads: 4 workerThreads: 16 port: 8081 - enable: true + enable: false + +# 本地没有使用时,禁用健康检查, 使用时删除 +management: + health: + rabbit: + enabled: false + elasticsearch: + enabled: false + diff --git a/ruoyi-admin/src/main/resources/application.yml b/ruoyi-admin/src/main/resources/application.yml index 6af064ef17129b2572b7fdf3e07b38812ef2ea42..45654ea7d558a1aba0fbb48b853dacf96e37896d 100644 --- a/ruoyi-admin/src/main/resources/application.yml +++ b/ruoyi-admin/src/main/resources/application.yml @@ -86,6 +86,10 @@ spring: restart: # 热部署开关 enabled: true + # kafka和flowable中的有冲突 + autoconfigure: + exclude: + - org.flowable.spring.boot.eventregistry.EventRegistryAutoConfiguration # token配置 token: diff --git a/ruoyi-admin/src/main/resources/logback.xml b/ruoyi-admin/src/main/resources/logback.xml index 9d280b41dcd4584011e7ef8ad14216e246861557..c7cea67adc47c2b8fd1a336e5557700eaebc3a3b 100644 --- a/ruoyi-admin/src/main/resources/logback.xml +++ b/ruoyi-admin/src/main/resources/logback.xml @@ -1,7 +1,7 @@ - + diff --git a/ruoyi-framework/src/main/java/com/ruoyi/framework/config/SecurityConfig.java b/ruoyi-framework/src/main/java/com/ruoyi/framework/config/SecurityConfig.java index 8a2ed5f97ddd491b9cc6137ba9a6d6f39dcef1d3..46240bd77f6f90a258939a7a486d032545b8066f 100644 --- a/ruoyi-framework/src/main/java/com/ruoyi/framework/config/SecurityConfig.java +++ b/ruoyi-framework/src/main/java/com/ruoyi/framework/config/SecurityConfig.java @@ -10,6 +10,8 @@ import org.springframework.security.authentication.dao.DaoAuthenticationProvider import org.springframework.security.config.annotation.method.configuration.EnableMethodSecurity; import org.springframework.security.config.annotation.web.builders.HttpSecurity; import org.springframework.security.config.annotation.web.configuration.WebSecurityCustomizer; +import 
org.springframework.security.config.annotation.web.configurers.AbstractHttpConfigurer; +import org.springframework.security.config.annotation.web.configurers.HeadersConfigurer; import org.springframework.security.config.http.SessionCreationPolicy; import org.springframework.security.core.userdetails.UserDetailsService; import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder; @@ -97,11 +99,11 @@ public class SecurityConfig { SecurityFilterChain filterChain(HttpSecurity httpSecurity) throws Exception { return httpSecurity // CSRF禁用,因为不使用session - .csrf(csrf -> csrf.disable()) + .csrf(AbstractHttpConfigurer::disable) // 禁用HTTP响应标头 .headers((headersCustomizer) -> { - headersCustomizer.cacheControl(cache -> cache.disable()) - .frameOptions(options -> options.sameOrigin()); + headersCustomizer.cacheControl(HeadersConfigurer.CacheControlConfig::disable) + .frameOptions(HeadersConfigurer.FrameOptionsConfig::sameOrigin); }) // 认证失败处理类 .exceptionHandling(exception -> exception.authenticationEntryPoint(unauthorizedHandler)) @@ -113,10 +115,10 @@ public class SecurityConfig { // 对于登录login 注册register 验证码captchaImage 允许匿名访问 requests.requestMatchers("/login", "/register", "/captchaImage").permitAll() // 静态资源,可匿名访问 - .requestMatchers(HttpMethod.GET, "/", "/*.html", "/**/*.html", "/**/*.css", "/**/*.js", + .requestMatchers(HttpMethod.GET, "/", "/**.html", "/**.css", "/**.js", "/profile/**") .permitAll() - .requestMatchers("/swagger-ui.html", "/swagger-resources/**", "/webjars/**", "/*/api-docs", + .requestMatchers("/swagger-ui.html", "/swagger-ui/**", "/swagger-resources/**", "/webjars/**", "/*/api-docs", "/druid/**", "/*/api-docs/**") .permitAll() // 除上面外的所有请求全部需要鉴权认证 @@ -131,6 +133,7 @@ public class SecurityConfig { .addFilterBefore(corsFilter, LogoutFilter.class) .build(); } + /** * 忽略web安全配置 * 主要是忽略websocket的安全配置 diff --git a/ruoyi-geek-plugins/pom.xml b/ruoyi-geek-plugins/pom.xml index 
d16ae280bc926ae243ed321c3f57448c48bb5a6f..f660c4117d3b6b2b84790b067ba5157be067b274 100644 --- a/ruoyi-geek-plugins/pom.xml +++ b/ruoyi-geek-plugins/pom.xml @@ -109,6 +109,18 @@ ${ruoyi.version} + + com.ruoyi.geekxd + ruoyi-elastic + ${ruoyi.version} + + + + com.ruoyi.geekxd + ruoyi-kafka + ${ruoyi.version} + + com.ruoyi.geekxd ruoyi-geek-plugins-starter @@ -129,6 +141,8 @@ ruoyi-atomikos ruoyi-rabbitmq ruoyi-cache-redis + ruoyi-elastic + ruoyi-kafka pom diff --git a/ruoyi-geek-plugins/ruoyi-elastic/pom.xml b/ruoyi-geek-plugins/ruoyi-elastic/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..85d9296252fc6758c3b65d441bfc153a99da23fb --- /dev/null +++ b/ruoyi-geek-plugins/ruoyi-elastic/pom.xml @@ -0,0 +1,48 @@ + + + + ruoyi-geek-plugins + com.ruoyi.geekxd + 3.9.0-G + + 4.0.0 + + ruoyi-elastic + + + elastic系统模块 + + + + + com.ruoyi.geekxd + ruoyi-framework + + + + + org.springframework.boot + spring-boot-starter-data-elasticsearch + + + + co.elastic.clients + elasticsearch-java + 8.18.3 + + + com.fasterxml.jackson.core + jackson-databind + 2.20.1 + + + + org.apache.httpcomponents + httpasyncclient + 4.1.5 + + + + + diff --git a/ruoyi-geek-plugins/ruoyi-elastic/src/main/java/com/ruoyi/elastic/config/ElasticConfig.java b/ruoyi-geek-plugins/ruoyi-elastic/src/main/java/com/ruoyi/elastic/config/ElasticConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..5f1755ba799c856006ef59aef76051c3fb94f4eb --- /dev/null +++ b/ruoyi-geek-plugins/ruoyi-elastic/src/main/java/com/ruoyi/elastic/config/ElasticConfig.java @@ -0,0 +1,100 @@ +package com.ruoyi.elastic.config; + +import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.json.jackson.JacksonJsonpMapper; +import co.elastic.clients.transport.ElasticsearchTransport; +import co.elastic.clients.transport.rest_client.RestClientTransport; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpHost; +import 
org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; +import org.elasticsearch.client.RestClient; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + +@Slf4j +@Data +@Configuration +@EnableAutoConfiguration +@ConditionalOnProperty(prefix = "spring.elasticsearch", name = { "enable" }, havingValue = "true", matchIfMissing = false) +@ConfigurationProperties(prefix = "spring.elasticsearch") +public class ElasticConfig { + + private String[] uris; + + private String username; + + private String password; + + private int connectTimeout; + + private int socketTimeout; + + private int connectionRequestTimeout; + + private int maxRetryTimes; + + private int maxConnectNum; + + private int maxConnectNumPerRoute; + + @Bean + public ElasticsearchClient elasticsearchClient() { + log.info("ElasticSearch初始化开始"); + // 1. 创建低级 REST 客户端 + RestClient restClient = RestClient.builder(createHttpHosts()) + .setHttpClientConfigCallback(this::configureHttpClient) + .setRequestConfigCallback(requestConfigBuilder -> + requestConfigBuilder + .setConnectTimeout(connectTimeout) + .setSocketTimeout(socketTimeout) + .setConnectionRequestTimeout(connectionRequestTimeout) + .setMaxRedirects(maxRetryTimes) + ) + .build(); + + // 2. 使用 Jackson 映射器创建传输层 + ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper()); + + // 3. 
创建并返回 API 客户端 + ElasticsearchClient elasticsearchClient = new ElasticsearchClient(transport); + log.info("ElasticSearch初始化完毕"); + return elasticsearchClient; + } + + /** + * 将配置的URI字符串转换为HttpHost数组 + */ + private HttpHost[] createHttpHosts() { + HttpHost[] hosts = new HttpHost[uris.length]; + for (int i = 0; i < uris.length; i++) { + String uri = uris[i]; + hosts[i] = HttpHost.create(uri); + } + return hosts; + } + + /** + * 配置HTTP客户端,主要用于处理认证 + */ + private HttpAsyncClientBuilder configureHttpClient(HttpAsyncClientBuilder httpClientBuilder) { + if (!username.isEmpty() && !password.isEmpty()) { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials(username, password)); + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider) + .setMaxConnTotal(maxConnectNum) + .setMaxConnPerRoute(maxConnectNumPerRoute); + } + return httpClientBuilder; + } + +} diff --git a/ruoyi-geek-plugins/ruoyi-elastic/src/main/java/com/ruoyi/elastic/controller/TestController.java b/ruoyi-geek-plugins/ruoyi-elastic/src/main/java/com/ruoyi/elastic/controller/TestController.java new file mode 100644 index 0000000000000000000000000000000000000000..1cbfd4107fcda5cd15cd8b1811ad5b6c6ebfa25a --- /dev/null +++ b/ruoyi-geek-plugins/ruoyi-elastic/src/main/java/com/ruoyi/elastic/controller/TestController.java @@ -0,0 +1,650 @@ +package com.ruoyi.elastic.controller; + +import co.elastic.clients.elasticsearch._types.SortOrder; +import com.ruoyi.common.core.controller.BaseController; +import com.ruoyi.common.core.domain.AjaxResult; +import com.ruoyi.common.core.page.PageDomain; +import com.ruoyi.common.core.page.TableSupport; +import com.ruoyi.common.utils.sql.SqlUtil; +import com.ruoyi.elastic.model.param.AggQueryParam; +import com.ruoyi.elastic.model.param.AggregationParam; +import com.ruoyi.elastic.model.param.QueryParam; +import com.ruoyi.elastic.model.param.TestDataParam; 
+import com.ruoyi.elastic.model.vo.ESPageResult; +import com.ruoyi.elastic.model.vo.EsAggResult; +import com.ruoyi.elastic.model.vo.testVO; +import com.ruoyi.elastic.utils.ElasticsearchAggUtil; +import com.ruoyi.elastic.utils.ElasticsearchUtil; +import jakarta.annotation.Resource; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.io.IOException; +import java.util.*; + +@RestController +@RequestMapping("/es") +public class TestController extends BaseController { + + @Resource + public ElasticsearchUtil elasticsearchUtil; + + @Resource + public ElasticsearchAggUtil elasticsearchAggUtil; + + @PostMapping("/bulk") + public AjaxResult bulkInsert(@RequestBody List params) throws IOException { + + Boolean result = elasticsearchUtil.batchInsert("data", params); + return AjaxResult.success(result); + } + + @PostMapping("/index") + public ESPageResult matchQuery() { + PageDomain pageDomain = TableSupport.buildPageRequest(); + Integer pageNum = pageDomain.getPageNum(); + Integer pageSize = pageDomain.getPageSize(); + String orderBy = SqlUtil.escapeOrderBySql(pageDomain.getOrderBy()); + Map map = new HashMap<>(); + // map.put("lable.keyword", "中华人民共和国"); + // map.put("pub_type", "ipv4"); + Map sortMap = new HashMap<>(); + sortMap.put("pub_type.keyword", SortOrder.Desc); + // sortMap.put("id", SortOrder.Desc); + + Map> ranges = new HashMap<>(); + Map filter = new HashMap<>(); + filter.put("gte", "2023-12-16 00:00:00"); + filter.put("lte", "2023-12-31 00:00:00"); + ranges.put("time", filter); + + QueryParam queryParam = new QueryParam(); + queryParam.setIndexName("data_index"); + queryParam.setFieldValueMap(map); + queryParam.setPage(pageNum); + queryParam.setSize(pageSize); + queryParam.setSortFields(null); + queryParam.setQueryType("term"); + 
queryParam.setRanges(ranges); + return elasticsearchUtil.multiQuery(queryParam, testVO.class); + } + + @PostMapping("/agg") + public EsAggResult aggQuery() { + + //EsAggResult pageResult = demonstrateTermsAggregation(); + EsAggResult pageResult1 = demonstrateCompositeAggregation(); + + return pageResult1; + } + + + /** + * 术语聚合示例 - 按字段分组统计 + */ + public EsAggResult demonstrateTermsAggregation() { + System.out.println("=== 术语聚合示例 ==="); + + AggQueryParam param = new AggQueryParam(); + param.setIndexName("alarm"); + + // 设置查询条件 + Map queryConditions = new HashMap<>(); + //queryConditions.put("lable", "apt32"); + param.setQueryConditions(queryConditions); + + // 构建术语聚合 + AggregationParam aggParam = new AggregationParam(); + aggParam.setType("terms"); + aggParam.setField("ip.keyword"); + aggParam.setSize(2); + aggParam.setOrderBy("_count"); // 按文档计数排序 + aggParam.setOrder("desc"); // 降序排列 + + // 设置分页(如果有上一页的after_key) +// Map afterKey = new HashMap<>(); +// afterKey.put("doc_count", FieldValue.of(3)); +// afterKey.put("key", FieldValue.of("email")); +// aggParam.setAfterKey(afterKey); + + Map aggregations = new HashMap<>(); + aggregations.put("agg", aggParam); + param.setAggregations(aggregations); + + // 执行查询 + EsAggResult result = elasticsearchAggUtil.aggQuery(param); + + System.out.println("原始数据总条数: " + result.getTotal()); + System.out.println("聚合后总条数: " + result.getAggTotal()); + System.out.println("聚合结果: " + result.getAggregations()); + + return result; + } + + /** + * 复合聚合示例 - 多字段组合聚合 + */ + public EsAggResult demonstrateCompositeAggregation() { + System.out.println("\n=== 复合聚合示例 ==="); + + AggQueryParam param = new AggQueryParam(); + param.setIndexName("data_index"); + + // 设置查询条件 + Map queryConditions = new HashMap<>(); + //queryConditions.put("pub_type.keyword", "ipv4"); + param.setQueryConditions(queryConditions); + + // 构建复合聚合 + AggregationParam aggParam = new AggregationParam(); + aggParam.setType("composite"); + aggParam.setSize(10); + + Map> ranges = new 
HashMap<>(); + Map filter = new HashMap<>(); + filter.put("gte", "2023-12-16 00:00:00"); + filter.put("lte", "2024-01-31 00:00:00"); + ranges.put("time", filter); + + // 设置复合聚合源(多字段) + Map compositeSources = new HashMap<>(); + compositeSources.put("pub_type.keyword", "terms"); + //compositeSources.put("response_ip.keyword", "terms"); + aggParam.setCompositeSources(compositeSources); + + // 设置排序 + aggParam.setOrderBy("_key"); + aggParam.setOrder("desc"); + + // 设置分页(如果有上一页的after_key) +// Map afterKey = new HashMap<>(); +// afterKey.put("sip.keyword", "192.168.1.57"); +// afterKey.put("dip.keyword", "192.168.5.6"); +// aggParam.setAfterKey(afterKey); + + Map aggregations = new HashMap<>(); + aggregations.put("agg", aggParam); + param.setAggregations(aggregations); + param.setRanges(ranges); + param.setFormat("yyyy-MM-dd HH:mm:ss"); + + EsAggResult result = elasticsearchAggUtil.aggQuery(param); + + long aggregationTotalCount = elasticsearchAggUtil.getAggregationTotalCount(param); + result.setAggTotal(aggregationTotalCount); + + System.out.println("聚合后总条数: " + result.getAggTotal()); + System.out.println("After Key: " + result.getAfterKey()); + System.out.println("聚合计数: " + result.getAggCounts()); + return result; + } + + /** + * 统计聚合示例 - 数值字段统计分析 + */ + public EsAggResult demonstrateStatsAggregation() { + System.out.println("\n=== 统计聚合示例 ==="); + + AggQueryParam param = new AggQueryParam(); + param.setIndexName("products"); + + AggregationParam aggParam = new AggregationParam(); + aggParam.setType("stats"); + aggParam.setField("price"); + + Map aggregations = new HashMap<>(); + aggregations.put("price_stats", aggParam); + param.setAggregations(aggregations); + + EsAggResult result = elasticsearchAggUtil.aggQuery(param); + + System.out.println("价格统计结果: " + result.getAggregations()); + return result; + } + + /** + * 求和聚合示例 - 数值字段求和 + */ + public EsAggResult demonstrateSumAggregation() { + System.out.println("\n=== 求和聚合示例 ==="); + + AggQueryParam param = new 
AggQueryParam(); + param.setIndexName("orders"); + + AggregationParam aggParam = new AggregationParam(); + aggParam.setType("sum"); + aggParam.setField("total_amount"); + + Map aggregations = new HashMap<>(); + aggregations.put("total_sum", aggParam); + param.setAggregations(aggregations); + + EsAggResult result = elasticsearchAggUtil.aggQuery(param); + + System.out.println("订单总金额: " + result.getAggregations()); + return result; + } + + /** + * 平均值聚合示例 - 数值字段平均值 + */ + public EsAggResult demonstrateAvgAggregation() { + System.out.println("\n=== 平均值聚合示例 ==="); + + AggQueryParam param = new AggQueryParam(); + param.setIndexName("products"); + + AggregationParam aggParam = new AggregationParam(); + aggParam.setType("avg"); + aggParam.setField("rating"); + + Map aggregations = new HashMap<>(); + aggregations.put("avg_rating", aggParam); + param.setAggregations(aggregations); + + EsAggResult result = elasticsearchAggUtil.aggQuery(param); + + System.out.println("平均评分: " + result.getAggregations()); + return result; + } + + /** + * 嵌套聚合示例 - 嵌套文档聚合 + */ + public EsAggResult demonstrateNestedAggregation() { + System.out.println("\n=== 嵌套聚合示例 ==="); + + AggQueryParam param = new AggQueryParam(); + param.setIndexName("products"); + + // 主聚合:嵌套聚合 + AggregationParam nestedAgg = new AggregationParam(); + nestedAgg.setType("nested"); + nestedAgg.setPath("variants"); // 嵌套字段路径 + + // 子聚合:在嵌套文档中执行术语聚合 + AggregationParam termsSubAgg = new AggregationParam(); + termsSubAgg.setType("terms"); + termsSubAgg.setField("variants.color.keyword"); + termsSubAgg.setSize(5); + + Map subAggs = new HashMap<>(); + subAggs.put("color_agg", termsSubAgg); + nestedAgg.setSubAggregations(subAggs); + + Map aggregations = new HashMap<>(); + aggregations.put("nested_variants", nestedAgg); + param.setAggregations(aggregations); + + EsAggResult result = elasticsearchAggUtil.aggQuery(param); + + System.out.println("嵌套聚合结果: " + result.getAggregations()); + return result; + } + + /** + * 过滤聚合示例 - 对聚合结果应用过滤 + 
*/ + public EsAggResult demonstrateFilterAggregation() { + System.out.println("\n=== 过滤聚合示例 ==="); + + AggQueryParam param = new AggQueryParam(); + param.setIndexName("products"); + + AggregationParam aggParam = new AggregationParam(); + aggParam.setType("filter"); + + // 设置过滤条件 + Map filterConditions = new HashMap<>(); + filterConditions.put("price", 100); + filterConditions.put("category.keyword", "electronics"); + aggParam.setFilterConditions(filterConditions); + + Map aggregations = new HashMap<>(); + aggregations.put("filtered_agg", aggParam); + param.setAggregations(aggregations); + + EsAggResult result = elasticsearchAggUtil.aggQuery(param); + + System.out.println("过滤聚合结果: " + result.getAggregations()); + return result; + } + + /** + * 多术语聚合示例 - 多字段组合聚合 + */ + public EsAggResult demonstrateMultiTermsAggregation() { + System.out.println("\n=== 多术语聚合示例 ==="); + + AggQueryParam param = new AggQueryParam(); + param.setIndexName("products"); + + AggregationParam aggParam = new AggregationParam(); + aggParam.setType("multi_terms"); + + // 设置多个字段 + List fields = Arrays.asList("category.keyword", "brand.keyword", "status.keyword"); + aggParam.setFields(fields); + aggParam.setSize(50); + + Map aggregations = new HashMap<>(); + aggregations.put("multi_field_agg", aggParam); + param.setAggregations(aggregations); + + EsAggResult result = elasticsearchAggUtil.aggQuery(param); + + System.out.println("多字段聚合总条数: " + result.getAggTotal()); + System.out.println("聚合结果: " + result.getAggregations()); + return result; + } + + /** + * 复杂嵌套聚合示例 - 多层嵌套和子聚合 + */ + public EsAggResult demonstrateComplexNestedAggregation() { + System.out.println("\n=== 复杂嵌套聚合示例 ==="); + + AggQueryParam param = new AggQueryParam(); + param.setIndexName("orders"); + + // 第一层:按日期分组的术语聚合 + AggregationParam dateTermsAgg = new AggregationParam(); + dateTermsAgg.setType("terms"); + dateTermsAgg.setField("order_date.keyword"); + dateTermsAgg.setSize(10); + dateTermsAgg.setOrderBy("_key"); + 
dateTermsAgg.setOrder("desc"); + + // 第二层:按客户分组的子聚合 + AggregationParam customerTermsAgg = new AggregationParam(); + customerTermsAgg.setType("terms"); + customerTermsAgg.setField("customer_id.keyword"); + customerTermsAgg.setSize(5); + + // 第三层:统计子聚合 + AggregationParam statsSubAgg = new AggregationParam(); + statsSubAgg.setType("stats"); + statsSubAgg.setField("total_amount"); + + Map statsSubAggs = new HashMap<>(); + statsSubAggs.put("amount_stats", statsSubAgg); + customerTermsAgg.setSubAggregations(statsSubAggs); + + Map customerSubAggs = new HashMap<>(); + customerSubAggs.put("customer_agg", customerTermsAgg); + dateTermsAgg.setSubAggregations(customerSubAggs); + + Map aggregations = new HashMap<>(); + aggregations.put("date_customer_agg", dateTermsAgg); + param.setAggregations(aggregations); + + EsAggResult result = elasticsearchAggUtil.aggQuery(param); + + System.out.println("复杂嵌套聚合结果: " + result.getAggregations()); + return result; + } + + /** + * 聚合结果过滤示例 - 对聚合结果应用过滤条件 + */ + public EsAggResult demonstrateAggregationFiltering() { + System.out.println("\n=== 聚合结果过滤示例 ==="); + + AggQueryParam param = new AggQueryParam(); + param.setIndexName("products"); + + // 设置聚合结果过滤条件 + Map aggFilterConditions = new HashMap<>(); + aggFilterConditions.put("min_price", 50); + aggFilterConditions.put("max_price", 500); + param.setAggFilterConditions(aggFilterConditions); + + AggregationParam aggParam = new AggregationParam(); + aggParam.setType("terms"); + aggParam.setField("category.keyword"); + aggParam.setSize(10); + + Map aggregations = new HashMap<>(); + aggregations.put("filtered_category_agg", aggParam); + param.setAggregations(aggregations); + + EsAggResult result = elasticsearchAggUtil.aggQuery(param); + + System.out.println("过滤后聚合结果: " + result.getAggregations()); + return result; + } + + /** + * 获取聚合总条数示例 + */ + public EsAggResult demonstrateGetAggregationTotalCount() { + System.out.println("\n=== 获取聚合总条数示例 ==="); + + AggQueryParam param = new AggQueryParam(); + 
param.setIndexName("products"); + + AggregationParam aggParam = new AggregationParam(); + aggParam.setType("composite"); + aggParam.setSize(100); + + Map compositeSources = new HashMap<>(); + compositeSources.put("category.keyword", "terms"); + compositeSources.put("brand.keyword", "terms"); + aggParam.setCompositeSources(compositeSources); + + Map aggregations = new HashMap<>(); + aggregations.put("total_count_agg", aggParam); + param.setAggregations(aggregations); + + // 获取聚合总条数(遍历所有分页) + long totalCount = elasticsearchAggUtil.getAggregationTotalCount(param); + + System.out.println("聚合总条数: " + totalCount); + + // 同时执行普通聚合查询获取当前页数据 + EsAggResult result = elasticsearchAggUtil.aggQuery(param); + System.out.println("当前页条数: " + result.getAggTotal()); + System.out.println("After Key: " + result.getAfterKey()); + return result; + } + + /** + * 综合示例:展示所有聚合类型的使用 + */ + public void demonstrateAllAggregationTypes() { + System.out.println("=== Elasticsearch聚合类型综合示例 ==="); + + // 1. 术语聚合 + demonstrateTermsAggregation(); + + // 2. 复合聚合 + demonstrateCompositeAggregation(); + + // 3. 统计聚合 + demonstrateStatsAggregation(); + + // 4. 求和聚合 + demonstrateSumAggregation(); + + // 5. 平均值聚合 + demonstrateAvgAggregation(); + + // 6. 嵌套聚合 + demonstrateNestedAggregation(); + + // 7. 过滤聚合 + demonstrateFilterAggregation(); + + // 8. 多术语聚合 + demonstrateMultiTermsAggregation(); + + // 9. 复杂嵌套聚合 + demonstrateComplexNestedAggregation(); + + // 10. 聚合结果过滤 + demonstrateAggregationFiltering(); + + // 11. 
获取聚合总条数 + demonstrateGetAggregationTotalCount(); + + System.out.println("=== 所有聚合示例执行完成 ==="); + } + + /** + * 实际业务场景示例 + */ + public void demonstrateBusinessScenarios() { + System.out.println("\n=== 实际业务场景示例 ==="); + + // 场景1:电商产品分析 + demonstrateEcommerceProductAnalysis(); + + // 场景2:订单数据分析 + demonstrateOrderDataAnalysis(); + + // 场景3:用户行为分析 + demonstrateUserBehaviorAnalysis(); + } + + /** + * 电商产品分析场景 + */ + private void demonstrateEcommerceProductAnalysis() { + System.out.println("\n--- 电商产品分析场景 ---"); + + AggQueryParam param = new AggQueryParam(); + param.setIndexName("ecommerce_products"); + + // 查询条件:只分析上架状态的产品 + Map queryConditions = new HashMap<>(); + queryConditions.put("status", "active"); + queryConditions.put("price", 0); // 价格大于0 + param.setQueryConditions(queryConditions); + + // 构建多层聚合分析 + AggregationParam categoryAgg = new AggregationParam(); + categoryAgg.setType("terms"); + categoryAgg.setField("category.keyword"); + categoryAgg.setSize(20); + categoryAgg.setOrderBy("_count"); + categoryAgg.setOrder("desc"); + + // 子聚合1:品牌分布 + AggregationParam brandAgg = new AggregationParam(); + brandAgg.setType("terms"); + brandAgg.setField("brand.keyword"); + brandAgg.setSize(10); + brandAgg.setOrderBy("_count"); + brandAgg.setOrder("desc"); + + // 子聚合2:价格统计 + AggregationParam priceStatsAgg = new AggregationParam(); + priceStatsAgg.setType("stats"); + priceStatsAgg.setField("price"); + + Map priceSubAggs = new HashMap<>(); + priceSubAggs.put("price_stats", priceStatsAgg); + brandAgg.setSubAggregations(priceSubAggs); + + Map brandSubAggs = new HashMap<>(); + brandSubAggs.put("brand_analysis", brandAgg); + categoryAgg.setSubAggregations(brandSubAggs); + + Map aggregations = new HashMap<>(); + aggregations.put("product_analysis", categoryAgg); + param.setAggregations(aggregations); + + EsAggResult result = elasticsearchAggUtil.aggQuery(param); + + System.out.println("产品分类分析完成"); + System.out.println("总产品数: " + result.getTotal()); + System.out.println("分类数量: " + 
result.getAggTotal()); + } + + /** + * 订单数据分析场景 + */ + private void demonstrateOrderDataAnalysis() { + System.out.println("\n--- 订单数据分析场景 ---"); + + AggQueryParam param = new AggQueryParam(); + param.setIndexName("orders"); + + // 查询条件:最近30天的订单 + Map queryConditions = new HashMap<>(); + queryConditions.put("order_date", "2024-01-01"); + queryConditions.put("status", "completed"); + param.setQueryConditions(queryConditions); + + // 复合聚合:多维度分析 + AggregationParam compositeAgg = new AggregationParam(); + compositeAgg.setType("composite"); + compositeAgg.setSize(1000); + + Map sources = new HashMap<>(); + sources.put("customer_region.keyword", "terms"); + sources.put("product_category.keyword", "terms"); + sources.put("order_date", "date_histogram"); + compositeAgg.setCompositeSources(sources); + + compositeAgg.setDateInterval("1d"); + compositeAgg.setFormat("yyyy-MM-dd"); + + Map aggregations = new HashMap<>(); + aggregations.put("order_analysis", compositeAgg); + param.setAggregations(aggregations); + + EsAggResult result = elasticsearchAggUtil.aggQuery(param); + + System.out.println("订单多维分析完成"); + System.out.println("分析维度组合数: " + result.getAggTotal()); + } + + /** + * 用户行为分析场景 + */ + private void demonstrateUserBehaviorAnalysis() { + System.out.println("\n--- 用户行为分析场景 ---"); + + AggQueryParam param = new AggQueryParam(); + param.setIndexName("user_events"); + + // 嵌套聚合分析用户行为 + AggregationParam userSessionAgg = new AggregationParam(); + userSessionAgg.setType("terms"); + userSessionAgg.setField("user_id.keyword"); + userSessionAgg.setSize(100); + + // 子聚合:会话内事件分析 + AggregationParam eventTypeAgg = new AggregationParam(); + eventTypeAgg.setType("terms"); + eventTypeAgg.setField("event_type.keyword"); + eventTypeAgg.setSize(10); + + // 孙子聚合:事件属性分析 + AggregationParam eventPropertyAgg = new AggregationParam(); + eventPropertyAgg.setType("terms"); + eventPropertyAgg.setField("event_property.keyword"); + eventPropertyAgg.setSize(5); + + Map propertySubAggs = new HashMap<>(); 
+ propertySubAggs.put("property_analysis", eventPropertyAgg); + eventTypeAgg.setSubAggregations(propertySubAggs); + + Map eventSubAggs = new HashMap<>(); + eventSubAggs.put("event_analysis", eventTypeAgg); + userSessionAgg.setSubAggregations(eventSubAggs); + + Map aggregations = new HashMap<>(); + aggregations.put("user_behavior_analysis", userSessionAgg); + param.setAggregations(aggregations); + + EsAggResult result = elasticsearchAggUtil.aggQuery(param); + + System.out.println("用户行为分析完成"); + System.out.println("分析用户数: " + result.getAggTotal()); + } + +} diff --git a/ruoyi-geek-plugins/ruoyi-elastic/src/main/java/com/ruoyi/elastic/model/param/AggQueryParam.java b/ruoyi-geek-plugins/ruoyi-elastic/src/main/java/com/ruoyi/elastic/model/param/AggQueryParam.java new file mode 100644 index 0000000000000000000000000000000000000000..06b3d424ea51e7149c4c62bb77acf6dfa33ee215 --- /dev/null +++ b/ruoyi-geek-plugins/ruoyi-elastic/src/main/java/com/ruoyi/elastic/model/param/AggQueryParam.java @@ -0,0 +1,25 @@ +package com.ruoyi.elastic.model.param; + +import lombok.Data; + +import java.util.Map; + +@Data +public class AggQueryParam { + private String indexName; + + // 源数据查询条件 + private Map queryConditions; + + // 聚合配置 + private Map aggregations; + + // 聚合结果过滤条件 + private Map aggFilterConditions; + + // 范围查询参数(新增) + private Map> ranges; + + // 范围查询时间格式 + private String format; +} \ No newline at end of file diff --git a/ruoyi-geek-plugins/ruoyi-elastic/src/main/java/com/ruoyi/elastic/model/param/AggregationParam.java b/ruoyi-geek-plugins/ruoyi-elastic/src/main/java/com/ruoyi/elastic/model/param/AggregationParam.java new file mode 100644 index 0000000000000000000000000000000000000000..d002eb23e43eb06ed11dca59672887c8cfd7b1d3 --- /dev/null +++ b/ruoyi-geek-plugins/ruoyi-elastic/src/main/java/com/ruoyi/elastic/model/param/AggregationParam.java @@ -0,0 +1,237 @@ +package com.ruoyi.elastic.model.param; + +import co.elastic.clients.elasticsearch._types.SortOrder; +import lombok.Data; 
+import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * 聚合查询参数模型 + * 支持多字段聚合、嵌套聚合、复合聚合等高级功能 + */ +@Data +public class AggregationParam { + + /** + * 聚合类型 + * 支持:terms, multi_terms, composite, stats, sum, avg, nested, filter, date_histogram, histogram + */ + private String type; + + /** + * 聚合字段(单字段聚合使用) + */ + private String field; + + /** + * 聚合字段列表(多字段聚合使用) + */ + private List fields; + + /** + * 嵌套聚合路径(nested聚合使用) + */ + private String path; + + /** + * 聚合结果分桶大小 + */ + private Integer size; + + /** + * 排序字段 + */ + private String orderBy; + + /** + * 排序方式:asc, desc + */ + private String order; + + /** + * 过滤条件(filter聚合使用) + */ + private Map filterConditions; + + /** + * 子聚合配置(支持嵌套聚合) + */ + private Map subAggregations; + + /** + * 复合聚合源配置 + * key: 字段名, value: 聚合类型(terms, histogram, date_histogram) + */ + private Map compositeSources; + + private Map fieldOrders = new HashMap<>(); + + /** + * 直方图间隔(histogram聚合使用) + */ + private Double interval; + + /** + * 日期直方图间隔(date_histogram聚合使用) + * 格式:1d, 1h, 30m等 + */ + private String dateInterval; + + /** + * 日期格式(date_histogram聚合使用) + */ + private String format; + + /** + * 复合聚合分页键(composite聚合使用) + */ + private Map afterKey; + + /** + * 默认构造函数 + */ + public AggregationParam() { + this.filterConditions = new HashMap<>(); + this.subAggregations = new HashMap<>(); + this.compositeSources = new HashMap<>(); + } + + /** + * 快速创建术语聚合 + */ + public static AggregationParam terms(String field, Integer size) { + AggregationParam param = new AggregationParam(); + param.setType("terms"); + param.setField(field); + param.setSize(size); + return param; + } + + /** + * 快速创建多字段术语聚合 + */ + public static AggregationParam multiTerms(List fields, Integer size) { + AggregationParam param = new AggregationParam(); + param.setType("multi_terms"); + param.setFields(fields); + param.setSize(size); + return param; + } + + /** + * 快速创建统计聚合 + */ + public static AggregationParam stats(String field) { + AggregationParam 
param = new AggregationParam(); + param.setType("stats"); + param.setField(field); + return param; + } + + /** + * 快速创建求和聚合 + */ + public static AggregationParam sum(String field) { + AggregationParam param = new AggregationParam(); + param.setType("sum"); + param.setField(field); + return param; + } + + /** + * 快速创建平均值聚合 + */ + public static AggregationParam avg(String field) { + AggregationParam param = new AggregationParam(); + param.setType("avg"); + param.setField(field); + return param; + } + + /** + * 快速创建嵌套聚合 + */ + public static AggregationParam nested(String path) { + AggregationParam param = new AggregationParam(); + param.setType("nested"); + param.setPath(path); + return param; + } + + /** + * 快速创建过滤聚合 + */ + public static AggregationParam filter(Map conditions) { + AggregationParam param = new AggregationParam(); + param.setType("filter"); + param.setFilterConditions(conditions); + return param; + } + + /** + * 添加子聚合 + */ + public AggregationParam addSubAggregation(String name, AggregationParam subAgg) { + if (this.subAggregations == null) { + this.subAggregations = new HashMap<>(); + } + this.subAggregations.put(name, subAgg); + return this; + } + + /** + * 添加复合聚合源 + */ + public AggregationParam addCompositeSource(String field, String aggType) { + if (this.compositeSources == null) { + this.compositeSources = new HashMap<>(); + } + this.compositeSources.put(field, aggType); + return this; + } + + /** + * 添加过滤条件 + */ + public AggregationParam addFilterCondition(String field, Object value) { + if (this.filterConditions == null) { + this.filterConditions = new HashMap<>(); + } + this.filterConditions.put(field, value); + return this; + } + + /** + * 设置排序 + */ + public AggregationParam orderBy(String field, String order) { + this.orderBy = field; + this.order = order; + return this; + } + + /** + * 验证参数有效性 + */ + public boolean isValid() { + if (type == null || type.trim().isEmpty()) { + return false; + } + + return switch (type.toLowerCase()) { + case 
"terms" -> field != null && !field.trim().isEmpty(); + case "multi_terms" -> fields != null && !fields.isEmpty(); + case "nested" -> path != null && !path.trim().isEmpty(); + case "composite" -> compositeSources != null && !compositeSources.isEmpty(); + case "stats", "sum", "avg" -> field != null && !field.trim().isEmpty(); + case "filter" -> filterConditions != null && !filterConditions.isEmpty(); + default -> true; + }; + } + + // 新增排序字段设置方法 + public void addFieldOrder(String fieldName, SortOrder order) { + this.fieldOrders.put(fieldName, order); + } +} diff --git a/ruoyi-geek-plugins/ruoyi-elastic/src/main/java/com/ruoyi/elastic/model/param/QueryParam.java b/ruoyi-geek-plugins/ruoyi-elastic/src/main/java/com/ruoyi/elastic/model/param/QueryParam.java new file mode 100644 index 0000000000000000000000000000000000000000..d6a6b496f1bd569f09e35a4ba610d9031656171a --- /dev/null +++ b/ruoyi-geek-plugins/ruoyi-elastic/src/main/java/com/ruoyi/elastic/model/param/QueryParam.java @@ -0,0 +1,25 @@ +package com.ruoyi.elastic.model.param; + +import co.elastic.clients.elasticsearch._types.SortOrder; +import lombok.Data; + +import java.util.Map; + +@Data +public class QueryParam { + private String indexName; + private String field; + private String value; + private Map fieldValueMap; + private Integer page = 1; + private Integer size = 10; + private String operator = "AND"; + private Map sortFields; + private String queryType = "match"; + private Map aggregations; + // 范围查询参数(新增) + private Map> ranges; + + // 范围查询时间格式 + private String format = "yyyy-MM-dd HH:mm:ss"; +} diff --git a/ruoyi-geek-plugins/ruoyi-elastic/src/main/java/com/ruoyi/elastic/model/param/TestDataParam.java b/ruoyi-geek-plugins/ruoyi-elastic/src/main/java/com/ruoyi/elastic/model/param/TestDataParam.java new file mode 100644 index 0000000000000000000000000000000000000000..3047ef46eae41767f40e5d39fda3ae2afef44b85 --- /dev/null +++ 
// ===== file: com/ruoyi/elastic/model/param/TestDataParam.java =====
package com.ruoyi.elastic.model.param;

import lombok.Data;

/**
 * Test data payload used by the elastic plugin demos.
 */
@Data
public class TestDataParam {
    // NOTE(review): snake_case names match the index/JSON mapping; renaming would
    // change the Lombok accessors and break existing callers, so they are kept.
    private String pub_data;
    private String pub_indicator;
    private String source_url;
    private String pub_type;
    private String data_type;
    private String lable;
}

// ===== file: com/ruoyi/elastic/model/vo/ESPageResult.java =====
package com.ruoyi.elastic.model.vo;

import com.ruoyi.common.constant.HttpStatus;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

import java.util.Collections;
import java.util.List;

/**
 * Generic page wrapper for Elasticsearch search results.
 *
 * @param <T> row type
 */
@Data
public class ESPageResult<T> {
    /** 总记录数 */
    @Schema(title = "总记录数")
    private Long total;

    /** 列表数据 */
    @Schema(title = "列表数据")
    private List<T> rows;

    @Schema(title = "当前页码")
    private Integer page;

    @Schema(title = "每页大小")
    private Integer size;

    /** 消息状态码 */
    @Schema(title = "消息状态码")
    private int code;

    /** 消息内容 */
    @Schema(title = "消息内容")
    private String msg;

    /** 总页数 */
    @Schema(title = "总页数")
    private Integer pages;

    @Schema(title = "是否有上一页")
    private Boolean hasPrevious;

    @Schema(title = "是否有下一页")
    private Boolean hasNext;

    /**
     * Builds a page result and derives pages/hasPrevious/hasNext.
     *
     * @param rows  rows of the current page
     * @param total total hit count
     * @param page  1-based page number
     * @param size  page size
     * @param msg   message text
     */
    public ESPageResult(List<T> rows, Long total, Integer page, Integer size, String msg) {
        this.rows = rows;
        this.total = total;
        this.page = page;
        this.size = size;
        // Guard: size == 0 would divide by zero / overflow via Infinity.
        this.pages = (size != null && size > 0) ? (int) Math.ceil((double) total / size) : 0;
        this.hasPrevious = page > 1;
        this.hasNext = page < pages;
        this.code = HttpStatus.SUCCESS;
        this.msg = msg;
    }

    /** Empty page shortcut (total 0, no rows, success status). */
    public static <T> ESPageResult<T> empty(Integer page, Integer size) {
        return new ESPageResult<>(Collections.emptyList(), 0L, page, size, "success");
    }
}

// ===== file: com/ruoyi/elastic/model/vo/EsAggResult.java =====
package com.ruoyi.elastic.model.vo;

import lombok.Data;

import java.util.Map;

/**
 * Result holder for aggregation queries run by {@code ElasticsearchAggUtil}.
 */
@Data
public class EsAggResult {
    /** Parsed aggregation buckets keyed by aggregation name. */
    private Map<String, Object> aggregations;
    /** Composite after_key for requesting the next page of buckets (null when exhausted). */
    private Map<String, Object> afterKey;
    /** Aggregation bucket count (current page). */
    private Long aggTotal;
    /** Matching document count from the search hits. */
    private Long total;
    /** Per-aggregation bucket counts for the current page. */
    private Map<String, Long> aggCounts;
}

// ===== file: com/ruoyi/elastic/model/vo/testVO.java =====
package com.ruoyi.elastic.model.vo;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

/**
 * Spring Data Elasticsearch mapping for the "data_index" index.
 * NOTE(review): class name violates UpperCamelCase, but renaming would break
 * existing references, so it is left as-is.
 */
@Data
@Document(indexName = "data_index")
@JsonIgnoreProperties(ignoreUnknown = true)
public class testVO {

    @Id
    private String id;

    @Field(name = "create_time", type = FieldType.Text)
    private String create_time;

    @Field(name = "pub_time", type = FieldType.Text)
    private String pub_time;

    @Field(name = "task_id", type = FieldType.Text)
    private String task_id;

    @Field(name = "data", type = FieldType.Text)
    private String data;

    @Field(name = "content", type = FieldType.Text)
    private String content;

    @Field(name = "pub_type", type = FieldType.Keyword)
    private String pub_type;

    @Field(name = "lable", type = FieldType.Text)
    private String lable;
}
FieldType.Text) + private String task_id; + + @Field(name = "data", type = FieldType.Text) + private String data; + + @Field(name = "content", type = FieldType.Text) + private String content; + + @Field(name = "pub_type", type = FieldType.Keyword) + private String pub_type; + + @Field(name = "lable", type = FieldType.Text) + private String lable; + +} diff --git a/ruoyi-geek-plugins/ruoyi-elastic/src/main/java/com/ruoyi/elastic/utils/ElasticsearchAggUtil.java b/ruoyi-geek-plugins/ruoyi-elastic/src/main/java/com/ruoyi/elastic/utils/ElasticsearchAggUtil.java new file mode 100644 index 0000000000000000000000000000000000000000..a3df158cec3b3bf53ebb9c180bbcca4c6fda780b --- /dev/null +++ b/ruoyi-geek-plugins/ruoyi-elastic/src/main/java/com/ruoyi/elastic/utils/ElasticsearchAggUtil.java @@ -0,0 +1,739 @@ +package com.ruoyi.elastic.utils; + +import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch._types.FieldValue; +import co.elastic.clients.elasticsearch._types.SortOrder; +import co.elastic.clients.elasticsearch._types.aggregations.*; +import co.elastic.clients.elasticsearch._types.query_dsl.*; +import co.elastic.clients.elasticsearch.core.SearchRequest; +import co.elastic.clients.elasticsearch.core.SearchResponse; +import co.elastic.clients.util.NamedValue; +import com.ruoyi.elastic.model.param.AggregationParam; +import com.ruoyi.elastic.model.param.AggQueryParam; +import com.ruoyi.elastic.model.vo.EsAggResult; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.*; + +/** + * Elasticsearch 8.x 聚合查询工具类 + * 修复composite聚合分页查询aggTotal总数计算错误问题 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class ElasticsearchAggUtil { + + private final ElasticsearchClient esClient; + + /** + * 聚合查询 - 修复composite聚合分页问题 + */ + public EsAggResult aggQuery(AggQueryParam queryParam) { + try { + SearchRequest searchRequest 
= buildAggSearchRequest(queryParam); + SearchResponse response = esClient.search(searchRequest, Object.class); + return parseAggSearchResponse(response, queryParam); + } catch (IOException e) { + log.error("Elasticsearch聚合查询失败", e); + throw new RuntimeException("Elasticsearch聚合查询失败", e); + } + } + + /** + * 获取聚合数据总条数 + */ + public long getAggregationTotalCount(AggQueryParam queryParam) { + try { + if (isCompositeAggregation(queryParam)) { + return getCompositeAggregationTotalCount(queryParam); + } else if (isTermsAggregation(queryParam)) { + return getTermsAggregationTotalCount(queryParam); + } else { + EsAggResult result = aggQuery(queryParam); + return result.getAggTotal(); + } + } catch (IOException e) { + log.error("获取聚合总条数失败", e); + throw new RuntimeException("获取聚合总条数失败", e); + } + } + + /** + * 判断是否为复合聚合 + */ + private boolean isCompositeAggregation(AggQueryParam queryParam) { + if (queryParam.getAggregations() == null || queryParam.getAggregations().isEmpty()) { + return false; + } + + for (AggregationParam aggParam : queryParam.getAggregations().values()) { + if ("composite".equalsIgnoreCase(aggParam.getType()) || + "multi_terms".equalsIgnoreCase(aggParam.getType())) { + return true; + } + } + return false; + } + + /** + * 判断是否为术语聚合 + */ + private boolean isTermsAggregation(AggQueryParam queryParam) { + if (queryParam.getAggregations() == null || queryParam.getAggregations().isEmpty()) { + return false; + } + + for (AggregationParam aggParam : queryParam.getAggregations().values()) { + if ("terms".equalsIgnoreCase(aggParam.getType())) { + return true; + } + } + return false; + } + + /** + * 获取复合聚合的总条数 - 修复分页计算错误 + */ + private long getCompositeAggregationTotalCount(AggQueryParam originalQueryParam) throws IOException { + long totalCount = 0L; + Map afterKey = null; + int maxIterations = 1000; + + // 修复:创建新的查询参数,确保清除afterKey,从第一页开始计算 + AggQueryParam queryParam = cloneQueryParam(originalQueryParam); + + // 清除所有复合聚合的afterKey,确保从第一页开始计算总桶数 + if 
(queryParam.getAggregations() != null) { + for (AggregationParam aggParam : queryParam.getAggregations().values()) { + if ("composite".equalsIgnoreCase(aggParam.getType()) || + "multi_terms".equalsIgnoreCase(aggParam.getType())) { + aggParam.setAfterKey(null); + } + } + } + + for (int i = 0; i < maxIterations; i++) { + if (afterKey != null) { + for (AggregationParam aggParam : queryParam.getAggregations().values()) { + if ("composite".equalsIgnoreCase(aggParam.getType()) || + "multi_terms".equalsIgnoreCase(aggParam.getType())) { + aggParam.setAfterKey(convertObject(afterKey)); + } + } + } + + SearchRequest searchRequest = buildAggSearchRequest(queryParam); + SearchResponse response = esClient.search(searchRequest, Object.class); + + if (response.aggregations() == null || response.aggregations().isEmpty()) { + break; + } + + long currentPageCount = 0; + Map currentAfterKey = null; + + for (Map.Entry entry : response.aggregations().entrySet()) { + Aggregate aggregate = entry.getValue(); + if (aggregate == null) { + continue; + } + if (aggregate.isComposite()) { + CompositeAggregate composite = aggregate.composite(); + currentPageCount = composite.buckets().array().size(); + totalCount += currentPageCount; + + if (composite.afterKey() != null && !composite.afterKey().isEmpty()) { + currentAfterKey = composite.afterKey(); + } + break; // 只处理第一个复合聚合 + } + } + + if (currentAfterKey == null) { + break; + } + + afterKey = currentAfterKey; + + if (currentPageCount == 0) { + break; + } + } + + return totalCount; + } + + /** + * 获取术语聚合的总条数 + */ + private long getTermsAggregationTotalCount(AggQueryParam originalQueryParam) throws IOException { + AggQueryParam queryParam = cloneQueryParam(originalQueryParam); + + if (queryParam.getAggregations() != null) { + for (AggregationParam aggParam : queryParam.getAggregations().values()) { + if ("terms".equalsIgnoreCase(aggParam.getType())) { + aggParam.setSize(10000); + } + } + } + + SearchRequest searchRequest = 
buildAggSearchRequest(queryParam); + SearchResponse response = esClient.search(searchRequest, Object.class); + + if (response.aggregations() == null || response.aggregations().isEmpty()) { + return 0L; + } + + long totalCount = 0L; + for (Map.Entry entry : response.aggregations().entrySet()) { + Aggregate aggregate = entry.getValue(); + if (aggregate == null) { + continue; + } + if (aggregate.isSterms()) { + StringTermsAggregate terms = aggregate.sterms(); + totalCount = terms.buckets().array().size(); + if (terms.sumOtherDocCount() != null && terms.sumOtherDocCount() > 0) { + log.warn("术语聚合有未返回的桶,总桶数可能不准确。sum_other_doc_count: {}", terms.sumOtherDocCount()); + } + } else if (aggregate.isLterms()) { + LongTermsAggregate terms = aggregate.lterms(); + totalCount = terms.buckets().array().size(); + if (terms.sumOtherDocCount() != null && terms.sumOtherDocCount() > 0) { + log.warn("术语聚合有未返回的桶,总桶数可能不准确。sum_other_doc_count: {}", terms.sumOtherDocCount()); + } + } else if (aggregate.isDterms()) { + DoubleTermsAggregate terms = aggregate.dterms(); + totalCount = terms.buckets().array().size(); + if (terms.sumOtherDocCount() != null && terms.sumOtherDocCount() > 0) { + log.warn("术语聚合有未返回的桶,总桶数可能不准确。sum_other_doc_count: {}", terms.sumOtherDocCount()); + } + } + } + + return totalCount; + } + + /** + * 克隆查询参数 + */ + private AggQueryParam cloneQueryParam(AggQueryParam original) { + AggQueryParam clone = new AggQueryParam(); + clone.setIndexName(original.getIndexName()); + clone.setQueryConditions(original.getQueryConditions() != null ? + new HashMap<>(original.getQueryConditions()) : null); + clone.setAggFilterConditions(original.getAggFilterConditions() != null ? + new HashMap<>(original.getAggFilterConditions()) : null); + clone.setRanges(original.getRanges() != null ? 
+ new HashMap<>(original.getRanges()) : null); + + if (original.getAggregations() != null) { + Map aggClone = new HashMap<>(); + for (Map.Entry entry : original.getAggregations().entrySet()) { + AggregationParam originalAgg = entry.getValue(); + if (originalAgg == null) { + continue; + } + AggregationParam clonedAgg = new AggregationParam(); + + clonedAgg.setType(originalAgg.getType()); + clonedAgg.setField(originalAgg.getField()); + clonedAgg.setFields(originalAgg.getFields() != null ? + new ArrayList<>(originalAgg.getFields()) : null); + clonedAgg.setSize(originalAgg.getSize()); + clonedAgg.setOrderBy(originalAgg.getOrderBy()); + clonedAgg.setOrder(originalAgg.getOrder()); + clonedAgg.setPath(originalAgg.getPath()); + clonedAgg.setInterval(originalAgg.getInterval()); + clonedAgg.setDateInterval(originalAgg.getDateInterval()); + clonedAgg.setFormat(originalAgg.getFormat()); + clonedAgg.setAfterKey(originalAgg.getAfterKey() != null ? + new HashMap<>(originalAgg.getAfterKey()) : null); + clonedAgg.setFilterConditions(originalAgg.getFilterConditions() != null ? + new HashMap<>(originalAgg.getFilterConditions()) : null); + clonedAgg.setCompositeSources(originalAgg.getCompositeSources() != null ? 
+ new HashMap<>(originalAgg.getCompositeSources()) : null); + + aggClone.put(entry.getKey(), clonedAgg); + } + clone.setAggregations(aggClone); + } + + return clone; + } + + /** + * 构建聚合搜索请求 + */ + private SearchRequest buildAggSearchRequest(AggQueryParam queryParam) { + SearchRequest.Builder searchBuilder = new SearchRequest.Builder(); + + searchBuilder.index(queryParam.getIndexName()); + + searchBuilder.query(buildQuery(queryParam)); + + searchBuilder.size(0); + + if (queryParam.getAggregations() != null && !queryParam.getAggregations().isEmpty()) { + buildAggregations(searchBuilder, queryParam); + } + + return searchBuilder.build(); + } + + /** + * 构建查询条件 + */ + private Query buildQuery(AggQueryParam queryParam) { + List mustQueries = new ArrayList<>(); + + // 处理普通查询条件 + if (queryParam.getQueryConditions() != null && !queryParam.getQueryConditions().isEmpty()) { + for (Map.Entry entry : queryParam.getQueryConditions().entrySet()) { + mustQueries.add(Query.of(q -> q.term(t -> t + .field(entry.getKey()) + .value(FieldValue.of(entry.getValue().toString())) + ))); + } + } + + // 处理时间范围查询(新增逻辑) + if (queryParam.getRanges() != null && !queryParam.getRanges().isEmpty()) { + for (Map.Entry> entry : queryParam.getRanges().entrySet()) { + String field = entry.getKey(); + Map range = entry.getValue(); + + RangeQuery rangeQuery = RangeQuery.of(r -> r + .date(n -> { + n.field(field).format(queryParam.getFormat()); + + if (range.containsKey("gt") && range.containsKey("gte")) { + throw new IllegalArgumentException("'大于'和'大于等于'只能包含一个"); + } + if (range.containsKey("lt") && range.containsKey("lte")) { + throw new IllegalArgumentException("'小于'和'小于等于'只能包含一个"); + } + + if (range.containsKey("gt")) { + n.gt(range.get("gt")); + } + if (range.containsKey("gte")) { + n.gte(range.get("gte")); + } + if (range.containsKey("lte")) { + n.lte(range.get("lte")); + } + if (range.containsKey("lt")) { + n.lt(range.get("lt")); + } + + return n; + } + ) + ); + + 
mustQueries.add(rangeQuery._toQuery()); + } + } + + return Query.of(q -> q.bool(b -> b.must(mustQueries))); + } + + /** + * 构建聚合条件 + */ + private void buildAggregations(SearchRequest.Builder searchBuilder, AggQueryParam queryParam) { + for (Map.Entry entry : queryParam.getAggregations().entrySet()) { + String aggName = entry.getKey(); + AggregationParam aggParam = entry.getValue(); + if (aggParam == null) { + continue; + } + Aggregation aggregation = buildAggregation(aggParam); + searchBuilder.aggregations(aggName, aggregation); + } + } + + /** + * 构建单个聚合 + */ + private Aggregation buildAggregation(AggregationParam aggParam) { + Aggregation.Builder aggBuilder = new Aggregation.Builder(); + + switch (aggParam.getType().toLowerCase()) { + case "terms" -> aggBuilder.terms(buildTermsAggregation(aggParam)); + case "composite" -> aggBuilder.composite(buildCompositeAggregation(aggParam)); + case "stats" -> aggBuilder.stats(b -> b.field(aggParam.getField())); + case "sum" -> aggBuilder.sum(b -> b.field(aggParam.getField())); + case "avg" -> aggBuilder.avg(b -> b.field(aggParam.getField())); + case "nested" -> aggBuilder.nested(b -> b.path(aggParam.getPath())); + case "multi_terms" -> buildMultiTermsAggregation(aggBuilder, aggParam); + default -> throw new IllegalArgumentException("不支持的聚合类型: " + aggParam.getType()); + } + + if (aggParam.getSubAggregations() != null && !aggParam.getSubAggregations().isEmpty()) { + Map subAggs = new HashMap<>(); + for (Map.Entry subEntry : aggParam.getSubAggregations().entrySet()) { + subAggs.put(subEntry.getKey(), buildAggregation(subEntry.getValue())); + } + aggBuilder.aggregations(subAggs); + } + + return aggBuilder.build(); + } + + /** + * 构建多字段术语聚合 + */ + private void buildMultiTermsAggregation(Aggregation.Builder aggBuilder, AggregationParam aggParam) { + if (aggParam.getFields() != null && !aggParam.getFields().isEmpty()) { + CompositeAggregation.Builder compositeBuilder = new CompositeAggregation.Builder(); + List> sources = new 
ArrayList<>(); + + for (String field : aggParam.getFields()) { + Map sourceMap = new HashMap<>(); + sourceMap.put(field, CompositeAggregationSource.of(b -> b + .terms(t -> t.field(field)) + )); + sources.add(sourceMap); + } + + compositeBuilder.sources(sources); + + if (aggParam.getSize() != null) { + compositeBuilder.size(aggParam.getSize()); + } + + aggBuilder.composite(compositeBuilder.build()); + } + } + + /** + * 构建术语聚合 + */ + private TermsAggregation buildTermsAggregation(AggregationParam aggParam) { + TermsAggregation.Builder termsBuilder = new TermsAggregation.Builder() + .field(aggParam.getField()) + .size(aggParam.getSize() != null ? aggParam.getSize() : 10); + + if (aggParam.getOrderBy() != null && aggParam.getOrder() != null) { + SortOrder sortOrder = "desc".equalsIgnoreCase(aggParam.getOrder()) ? SortOrder.Desc : SortOrder.Asc; + + if ("_count".equals(aggParam.getOrderBy())) { + termsBuilder.order(List.of(NamedValue.of("_count", sortOrder))); + } else if ("_key".equals(aggParam.getOrderBy())) { + termsBuilder.order(List.of(NamedValue.of("_key", sortOrder))); + } else { + termsBuilder.order(List.of(NamedValue.of(aggParam.getOrderBy(), sortOrder))); + } + } + + return termsBuilder.build(); + } + + /** + * 构建复合聚合 + */ + private CompositeAggregation buildCompositeAggregation(AggregationParam aggParam) { + CompositeAggregation.Builder compositeBuilder = new CompositeAggregation.Builder(); + + if (aggParam.getCompositeSources() != null && !aggParam.getCompositeSources().isEmpty()) { + List> sources = new ArrayList<>(); + + for (Map.Entry source : aggParam.getCompositeSources().entrySet()) { + String fieldName = source.getKey(); + String aggType = source.getValue(); + + Map sourceMap = new HashMap<>(); + switch (aggType.toLowerCase()) { + case "terms" -> { + CompositeTermsAggregation.Builder termsBuilder = new CompositeTermsAggregation.Builder() + .field(fieldName); + + if (aggParam.getOrderBy() != null && aggParam.getOrder() != null) { + SortOrder sortOrder 
= "desc".equalsIgnoreCase(aggParam.getOrder()) ? SortOrder.Desc : SortOrder.Asc; + termsBuilder.order(sortOrder); + } + + sourceMap.put(fieldName, CompositeAggregationSource.of(b -> b.terms(termsBuilder.build()))); + } + case "histogram" -> { + CompositeHistogramAggregation.Builder histogramBuilder = new CompositeHistogramAggregation.Builder() + .field(fieldName) + .interval(aggParam.getInterval() != null ? aggParam.getInterval() : 100); + + if (aggParam.getOrderBy() != null && aggParam.getOrder() != null) { + SortOrder sortOrder = "desc".equalsIgnoreCase(aggParam.getOrder()) ? SortOrder.Desc : SortOrder.Asc; + histogramBuilder.order(sortOrder); + } + + sourceMap.put(fieldName, CompositeAggregationSource.of(b -> b.histogram(histogramBuilder.build()))); + } + case "date_histogram" -> { + CompositeDateHistogramAggregation.Builder dateHistogramBuilder = new CompositeDateHistogramAggregation.Builder() + .field(fieldName) + .fixedInterval(fixed -> fixed.time(aggParam.getDateInterval() != null ? aggParam.getDateInterval() : "1d")) + .format(aggParam.getFormat() != null ? aggParam.getFormat() : "yyyy-MM-dd"); + + if (aggParam.getOrderBy() != null && aggParam.getOrder() != null) { + SortOrder sortOrder = "desc".equalsIgnoreCase(aggParam.getOrder()) ? 
SortOrder.Desc : SortOrder.Asc; + dateHistogramBuilder.order(sortOrder); + } + + sourceMap.put(fieldName, CompositeAggregationSource.of(b -> b.dateHistogram(dateHistogramBuilder.build()))); + } + } + sources.add(sourceMap); + } + compositeBuilder.sources(sources); + } + + if (aggParam.getSize() != null) { + compositeBuilder.size(aggParam.getSize()); + } + + if (aggParam.getAfterKey() != null && !aggParam.getAfterKey().isEmpty()) { + compositeBuilder.after(convertFieldValueMap(aggParam.getAfterKey())); + } + + return compositeBuilder.build(); + } + + /** + * 解析聚合搜索结果 + */ + private EsAggResult parseAggSearchResponse(SearchResponse response, AggQueryParam queryParam) { + EsAggResult result = new EsAggResult(); + + long aggTotal = 0L; + Map aggCounts = new HashMap<>(); + + if (response.aggregations() != null) { + Map aggResults = new HashMap<>(); + for (Map.Entry entry : response.aggregations().entrySet()) { + + String aggName = entry.getKey(); + Aggregate aggregate = entry.getValue(); + if (aggregate == null) { + continue; + } + + Long currentAggCount = calculateCurrentPageAggCount(aggregate); + aggCounts.put(aggName, currentAggCount); + + if (aggTotal == 0L && currentAggCount != null && currentAggCount > 0) { + aggTotal = currentAggCount; + } + + aggResults.put(entry.getKey(), parseAggregate(entry.getValue())); + } + result.setAggregations(aggResults); + result.setAggCounts(aggCounts); + } + +// if (isTermsAggregation(queryParam)) { +// try { +// aggTotal = getTermsAggregationTotalCount(queryParam); +// } catch (Exception e) { +// log.error("获取术语聚合总条数失败,使用当前页条数", e); +// } +// } else if (isCompositeAggregation(queryParam)) { +// try { +// aggTotal = getCompositeAggregationTotalCount(queryParam); +// } catch (Exception e) { +// log.error("获取复合聚合总条数失败,使用当前页条数", e); +// } +// } + + if (response.aggregations() != null && queryParam.getAggregations() != null) { + for (String aggName : queryParam.getAggregations().keySet()) { + Aggregate aggregate = 
response.aggregations().get(aggName); + if (aggregate != null && aggregate.isComposite()) { + CompositeAggregate composite = aggregate.composite(); + if (composite.afterKey() != null) { + result.setAfterKey(parseCompositeAfterKey(composite.afterKey())); + } + } + } + } + + result.setTotal(response.hits().total() != null ? response.hits().total().value() : 0L); + result.setAggTotal(aggTotal); + + return result; + } + + /** + * 计算当前页的聚合结果条数 + */ + private Long calculateCurrentPageAggCount(Aggregate aggregate) { + if (aggregate == null) { + return 0L; + } + + if (aggregate.isSterms()) { + StringTermsAggregate terms = aggregate.sterms(); + return (long) terms.buckets().array().size(); + } else if (aggregate.isLterms()) { + LongTermsAggregate terms = aggregate.lterms(); + return (long) terms.buckets().array().size(); + } else if (aggregate.isDterms()) { + DoubleTermsAggregate terms = aggregate.dterms(); + return (long) terms.buckets().array().size(); + } else if (aggregate.isComposite()) { + CompositeAggregate composite = aggregate.composite(); + return (long) composite.buckets().array().size(); + } else if (aggregate.isFilter()) { + return 1L; + } else if (aggregate.isNested()) { + return 1L; + } else if (aggregate.isStats()) { + return 1L; + } + + return 0L; + } + + /** + * 解析聚合结果 + */ + private Object parseAggregate(Aggregate aggregate) { + if (aggregate.isSterms()) { + return parseTermsAggregate(aggregate.sterms()); + } else if (aggregate.isComposite()) { + return parseCompositeAggregate(aggregate.composite()); + } else if (aggregate.isStats()) { + return parseStatsAggregate(aggregate.stats()); + } else if (aggregate.isSum()) { + return aggregate.sum().value(); + } else if (aggregate.isAvg()) { + return aggregate.avg().value(); + } else if (aggregate.isFilter()) { + return parseFilterAggregate(aggregate.filter()); + } + return null; + } + + private List> parseTermsAggregate(StringTermsAggregate terms) { + List> buckets = new ArrayList<>(); + for (StringTermsBucket 
bucket : terms.buckets().array()) { + Map bucketMap = new HashMap<>(); + bucketMap.put("key", bucket.key().stringValue()); + bucketMap.put("docCount", bucket.docCount()); + + if (bucket.aggregations() != null) { + Map subAggs = new HashMap<>(); + for (Map.Entry entry : bucket.aggregations().entrySet()) { + subAggs.put(entry.getKey(), parseAggregate(entry.getValue())); + } + bucketMap.put("subAggregations", subAggs); + } + + buckets.add(bucketMap); + } + return buckets; + } + + private List> parseCompositeAggregate(CompositeAggregate composite) { + List> buckets = new ArrayList<>(); + for (CompositeBucket bucket : composite.buckets().array()) { + Map bucketMap = new HashMap<>(); + + Map keyMap = parseCompositeKey(bucket.key()); + bucketMap.put("key", keyMap); + bucketMap.put("docCount", bucket.docCount()); + + buckets.add(bucketMap); + } + return buckets; + } + + /** + * 解析复合聚合的key + */ + private Map parseCompositeKey(Map key) { + Map result = new HashMap<>(); + for (Map.Entry entry : key.entrySet()) { + FieldValue fieldValue = entry.getValue(); + if (fieldValue.isString()) { + result.put(entry.getKey(), fieldValue.stringValue()); + } else if (fieldValue.isLong()) { + result.put(entry.getKey(), fieldValue.longValue()); + } else if (fieldValue.isDouble()) { + result.put(entry.getKey(), fieldValue.doubleValue()); + } else if (fieldValue.isBoolean()) { + result.put(entry.getKey(), fieldValue.booleanValue()); + } else { + result.put(entry.getKey(), fieldValue.toString()); + } + } + return result; + } + + /** + * 解析复合聚合的after_key + */ + private Map parseCompositeAfterKey(Map afterKey) { + return parseCompositeKey(afterKey); + } + + private Map parseStatsAggregate(StatsAggregate stats) { + Map statsMap = new HashMap<>(); + statsMap.put("count", stats.count()); + statsMap.put("min", stats.min()); + statsMap.put("max", stats.max()); + statsMap.put("avg", stats.avg()); + statsMap.put("sum", stats.sum()); + return statsMap; + } + + private Object 
parseFilterAggregate(FilterAggregate filter) { + Map filterResult = new HashMap<>(); + filterResult.put("docCount", filter.docCount()); + return filterResult; + } + + private Map convertFieldValueMap(Map objectMap) { + HashMap fieldMap = new HashMap<>(); + for (Map.Entry entry : objectMap.entrySet()) { + fieldMap.put(entry.getKey(), convertValue(entry.getValue())); + } + return fieldMap; + } + + private FieldValue convertValue(Object value) { + return FieldValue.of(value); + } + + private Object toJavaObject(FieldValue value) { + if (value == null) { + return null; + } + if (value.isString()) { + return value.stringValue(); + } else if (value.isLong()) { + return value.longValue(); + } else if (value.isDouble()) { + return value.doubleValue(); + } else if (value.isBoolean()) { + return value.booleanValue(); + } else if (value.isAny()) { + return value.anyValue(); + } + + return null; + } + + private Map convertObject(Map objectValue) { + Map result = new HashMap<>(); + for (Map.Entry entry : objectValue.entrySet()) { + result.put(entry.getKey(), toJavaObject(entry.getValue())); + } + return result; + } + +} diff --git a/ruoyi-geek-plugins/ruoyi-elastic/src/main/java/com/ruoyi/elastic/utils/ElasticsearchUtil.java b/ruoyi-geek-plugins/ruoyi-elastic/src/main/java/com/ruoyi/elastic/utils/ElasticsearchUtil.java new file mode 100644 index 0000000000000000000000000000000000000000..383784b67f0b24ad3f6994307536ac5a6f5ec04e --- /dev/null +++ b/ruoyi-geek-plugins/ruoyi-elastic/src/main/java/com/ruoyi/elastic/utils/ElasticsearchUtil.java @@ -0,0 +1,254 @@ +package com.ruoyi.elastic.utils; + +import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch._types.FieldValue; +import co.elastic.clients.elasticsearch._types.Refresh; +import co.elastic.clients.elasticsearch._types.SortOrder; +import co.elastic.clients.elasticsearch._types.query_dsl.*; +import co.elastic.clients.elasticsearch.core.*; +import 
co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem; +import co.elastic.clients.util.ObjectBuilder; +import com.ruoyi.elastic.model.param.QueryParam; +import com.ruoyi.elastic.model.vo.ESPageResult; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.*; + + +@Slf4j +@Component +public class ElasticsearchUtil { + + @Resource + private ElasticsearchClient elasticsearchClient; + + public boolean batchInsert(String indexName, List documents) throws IOException { + BulkRequest.Builder builder = new BulkRequest.Builder(); + + for (T document : documents) { + builder.operations(op -> op + .index(idx -> idx + .index(indexName) + .document(document) + )); + } + + // 不立即刷新 + builder.refresh(Refresh.False); + // 设置等待活动分片数 + builder.waitForActiveShards(w -> w.count(1)); + // 30s 超时 + builder.timeout(t -> t.time("30s")); + // 是否设置详细结果 + builder.requireAlias(false); + + BulkResponse response = elasticsearchClient.bulk(builder.build()); + return handleBulkResponse(response); + } + + private boolean handleBulkResponse(BulkResponse response) { + if (response.errors()) { + log.error("插入数据错误"); + + for (BulkResponseItem item : response.items()) { + if (item.error() != null) { + log.error(item.error().toString()); + } + } + return false; + } else { + log.info("文档插入成功, 处理耗时: {}ms, 处理文档数量: {}", response.took(), response.items().size()); + return true; + } + } + + + /** + * 多参数查询 + * + * @param queryParam 索引名称、字段名称:字段值、页码、单页数据条数、查询方式match/term + * @param clazz 返回结果映射类 + * @return boolean + */ + public ESPageResult multiQuery(QueryParam queryParam, Class clazz) { + try { + SearchRequest searchRequest = buildSearchRequest(queryParam); + SearchResponse response = elasticsearchClient.search(searchRequest, clazz); + return buildPageResult(response, queryParam); + } catch (Exception e) { + log.error(e.getMessage()); + return 
ESPageResult.empty(queryParam.getPage(), queryParam.getSize()); + } + } + + /** + * 构建搜索请求 + */ + private SearchRequest buildSearchRequest(QueryParam queryParam) { + SearchRequest.Builder builder = new SearchRequest.Builder() + .index(queryParam.getIndexName()) + .query(q -> buildBoolQuery(q, queryParam)) + .from((queryParam.getPage() - 1) * queryParam.getSize()) + .size(queryParam.getSize()); + + addSortFields(builder, queryParam); + + return builder.build(); + } + + /** + * 构建布尔查询 + */ + private ObjectBuilder buildBoolQuery(Query.Builder queryBuilder, QueryParam queryParam) { + BoolQuery.Builder boolBuilder = new BoolQuery.Builder(); + + if (queryParam.getFieldValueMap() != null && !queryParam.getFieldValueMap().isEmpty()) { + for (Map.Entry entry : queryParam.getFieldValueMap().entrySet()) { + if (entry.getValue() != null) { + Query query; + if (queryParam.getQueryType().equals("term")) { + query = TermQuery.of(m -> m + .field(entry.getKey()) + .value(FieldValue.of(entry.getValue().toString())) + )._toQuery(); + } else { + query = MatchQuery.of(m -> m + .field(entry.getKey()) + .query(FieldValue.of(entry.getValue().toString())) + )._toQuery(); + } + + // 根据operator参数决定使用must(AND)还是should(OR) + if ("OR".equalsIgnoreCase(queryParam.getOperator())) { + boolBuilder.should(query); + } else { + boolBuilder.must(query); + } + } + } + } + + // 处理时间范围查询(新增逻辑) + if (queryParam.getRanges() != null && !queryParam.getRanges().isEmpty()) { + for (Map.Entry> entry : queryParam.getRanges().entrySet()) { + String field = entry.getKey(); + Map range = entry.getValue(); + + RangeQuery rangeQuery = RangeQuery.of(r -> r + .date(n -> { + n.field(field).format(queryParam.getFormat()); + + if (range.containsKey("gt") && range.containsKey("gte")) { + throw new IllegalArgumentException("'大于'和'大于等于'只能包含一个"); + } + if (range.containsKey("lt") && range.containsKey("lte")) { + throw new IllegalArgumentException("'小于'和'小于等于'只能包含一个"); + } + + if (range.containsKey("gt")) { + 
n.gt(range.get("gt")); + } + if (range.containsKey("gte")) { + n.gte(range.get("gte")); + } + if (range.containsKey("lte")) { + n.lte(range.get("lte")); + } + if (range.containsKey("lt")) { + n.lt(range.get("lt")); + } + + return n; + } + ) + ); + + // 根据operator参数决定使用must(AND)还是should(OR) + if ("OR".equalsIgnoreCase(queryParam.getOperator())) { + boolBuilder.should(rangeQuery._toQuery()); + } else { + boolBuilder.must(rangeQuery._toQuery()); + } + } + } + + return queryBuilder.bool(boolBuilder.build()); + } + + /** + * 添加排序字段 + */ + private void addSortFields(SearchRequest.Builder builder, QueryParam queryParam) { + // 动态添加排序字段, 注意text会进行分词无法直接排序,需要使用keyword + if (queryParam.getSortFields() != null && !queryParam.getSortFields().isEmpty()) { + for (Map.Entry sortEntry : queryParam.getSortFields().entrySet()) { + String fieldName = sortEntry.getKey(); + SortOrder order = sortEntry.getValue(); + builder.sort(sort -> sort.field(f -> f.field(fieldName).order(order))); + } + } else { + // 默认按_score降序 + builder.sort(s -> s.field(f -> f.field("_score").order(SortOrder.Desc))); + } + } + + /** + * 通过反射设置字段 + */ + private void setFieldValue(Object obj, Object value) { + try { + Field field = obj.getClass().getDeclaredField("id"); + field.setAccessible(true); + field.set(obj, value); + } catch (Exception e) { + log.error(e.getMessage()); + } + } + + /** + * 构建分页结果 + */ + private ESPageResult buildPageResult(SearchResponse response, QueryParam queryParam) { + List records = response.hits().hits().stream() + .map(hit -> { + T source = hit.source(); + if (source != null) { + setFieldValue(source, hit.id()); + } + return source; + }) + .filter(Objects::nonNull) + .toList(); + Long total = response.hits().total() != null ? 
response.hits().total().value() : 0L; + return new ESPageResult<>( + records, + total, + queryParam.getPage(), + queryParam.getSize(), + "success" + ); + } + + /** + * 获取总数 + */ + public Long getTotalCount(QueryParam queryParam, Class clazz) { + try { + SearchRequest searchRequest = SearchRequest.of(s -> s + .index(queryParam.getIndexName()) + .query(q -> buildBoolQuery(q, queryParam)) + .size(0) + ); + SearchResponse response = elasticsearchClient.search(searchRequest, clazz); + return response.hits().total() != null ? response.hits().total().value() : 0L; + } catch (Exception e) { + log.error(e.getMessage()); + return 0L; + } + } + +} diff --git a/ruoyi-geek-plugins/ruoyi-kafka/pom.xml b/ruoyi-geek-plugins/ruoyi-kafka/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..1c9dcce259683c05042e80fa2943cf5f7c02ee47 --- /dev/null +++ b/ruoyi-geek-plugins/ruoyi-kafka/pom.xml @@ -0,0 +1,46 @@ + + + + ruoyi-geek-plugins + com.ruoyi.geekxd + 3.9.0-G + + 4.0.0 + + ruoyi-kafka + + + kafka模块 + + + + + com.ruoyi.geekxd + ruoyi-framework + + + + org.springframework.kafka + spring-kafka + + + + com.fasterxml.jackson.core + jackson-databind + + + + org.apache.kafka + kafka-clients + + + + + org.springframework.boot + spring-boot-starter-actuator + + + + + diff --git a/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/config/JacksonConfig.java b/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/config/JacksonConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..3826af389dafd358e07163827914d8895ec2a945 --- /dev/null +++ b/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/config/JacksonConfig.java @@ -0,0 +1,41 @@ +package com.ruoyi.kafka.config; + + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import 
com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.text.SimpleDateFormat; +import java.util.TimeZone; + +@Configuration +public class JacksonConfig { + + @Bean + public ObjectMapper objectMapper() { + ObjectMapper objectMapper = new ObjectMapper(); + + // 序列化配置 + objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); + objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + objectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); + + // 反序列化配置 + objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + objectMapper.enable(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_USING_DEFAULT_VALUE); + + // 日期时间配置 + objectMapper.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")); + objectMapper.setTimeZone(TimeZone.getTimeZone("GMT+8")); + + // Java 8 时间模块 + JavaTimeModule javaTimeModule = new JavaTimeModule(); + objectMapper.registerModule(javaTimeModule); + + return objectMapper; + } +} diff --git a/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/config/KafkaConfig.java b/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/config/KafkaConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..be1d55b7ad846a8aa4f171aa487c9baca02e2507 --- /dev/null +++ b/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/config/KafkaConfig.java @@ -0,0 +1,117 @@ +package com.ruoyi.kafka.config; + + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.Data; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import 
org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.*; +import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.util.backoff.FixedBackOff; + +import java.util.HashMap; +import java.util.Map; + +@Data +@Configuration +@ConditionalOnProperty(prefix = "spring.kafka", name = { "enable" }, havingValue = "true", matchIfMissing = false) +public class KafkaConfig { + + @Value("${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + @Value("${spring.kafka.consumer.group-id}") + private String groupId; + + @Value("${spring.kafka.template.default-topic}") + private String defaultTopic; + + @Bean + public ProducerFactory producerFactory(ObjectMapper objectMapper) { + Map configProps = new HashMap<>(); + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + configProps.put(ProducerConfig.ACKS_CONFIG, "all"); + configProps.put(ProducerConfig.RETRIES_CONFIG, 3); + configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); + configProps.put(ProducerConfig.LINGER_MS_CONFIG, 1000); + configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); + configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); + configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); + configProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); + + // Jackson 序列化配置 + 
configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); + + DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(configProps); + factory.setValueSerializer(new JsonSerializer<>(objectMapper)); + + return factory; + } + + @Bean + public ConsumerFactory consumerFactory(ObjectMapper objectMapper) { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); + props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 5000); + props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); + props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000); + + // Jackson 反序列化配置 + props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); + props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false); + props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Object.class); + + DefaultKafkaConsumerFactory factory = + new DefaultKafkaConsumerFactory<>(props); + factory.setValueDeserializer(new JsonDeserializer<>(objectMapper)); + + return factory; + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + ConsumerFactory consumerFactory, + KafkaTemplate kafkaTemplate) { + + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + factory.setConcurrency(3); + factory.getContainerProperties().setPollTimeout(3000); + + // 配置错误处理 + DeadLetterPublishingRecoverer recover = new DeadLetterPublishingRecoverer(kafkaTemplate); + DefaultErrorHandler 
errorHandler = new DefaultErrorHandler(recover, new FixedBackOff(1000L, 3L)); + errorHandler.addNotRetryableExceptions(IllegalArgumentException.class); + + factory.setCommonErrorHandler(errorHandler); + return factory; + } + + @Bean + public KafkaTemplate kafkaTemplate(ProducerFactory producerFactory) { + KafkaTemplate template = new KafkaTemplate<>(producerFactory); + template.setObservationEnabled(true); + return template; + } + +} diff --git a/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/config/KafkaConfigValidator.java b/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/config/KafkaConfigValidator.java new file mode 100644 index 0000000000000000000000000000000000000000..8e3ea425bc8860381f2da91f1035c5f5f6bdb855 --- /dev/null +++ b/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/config/KafkaConfigValidator.java @@ -0,0 +1,133 @@ +package com.ruoyi.kafka.config; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.env.Environment; +import org.springframework.kafka.core.ConsumerFactory; + +import java.util.Map; + +@Configuration +@ConditionalOnProperty(prefix = "spring.kafka", name = { "enable" }, havingValue = "true", matchIfMissing = false) +public class KafkaConfigValidator implements ApplicationRunner { + + private static final Logger logger = LoggerFactory.getLogger(KafkaConfigValidator.class); + + private final Environment environment; + private final ConsumerFactory consumerFactory; + + @Value("${spring.kafka.bootstrap-servers:未设置}") + private String configuredBootstrapServers; + + public 
KafkaConfigValidator(Environment environment, ConsumerFactory consumerFactory) { + this.environment = environment; + this.consumerFactory = consumerFactory; + } + + @Override + public void run(ApplicationArguments args) throws Exception { + validateKafkaConfiguration(); + } + + public void validateKafkaConfiguration() { + logger.info("=== Kafka配置验证开始 ==="); + + // 1. 检查配置文件中的地址 + printConfigurationSources(); + + // 2. 检查实际ConsumerFactory使用的地址 + printConsumerFactoryConfiguration(); + + // 3. 检查环境变量和系统属性 + printEnvironmentVariables(); + + logger.info("=== Kafka配置验证结束 ==="); + } + + private void printConfigurationSources() { + logger.info("1. 配置文件中的Kafka地址:"); + logger.info(" - spring.kafka.bootstrap-servers: {}", configuredBootstrapServers); + + // 检查其他可能的配置源 + String[] configKeys = { + "spring.kafka.bootstrap-servers", + "kafka.bootstrap-servers", + "bootstrap.servers", + "KAFKA_BOOTSTRAP_SERVERS" + }; + + for (String key : configKeys) { + String value = environment.getProperty(key); + if (value != null) { + logger.info(" - {}: {}", key, value); + } + } + } + + private void printConsumerFactoryConfiguration() { + logger.info("2. ConsumerFactory实际配置:"); + try { + Map configs = consumerFactory.getConfigurationProperties(); + String actualBootstrapServers = (String) configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); + String actualGroupId = (String) configs.get(ConsumerConfig.GROUP_ID_CONFIG); + + logger.info(" - 实际Bootstrap Servers: {}", actualBootstrapServers); + logger.info(" - 实际Group ID: {}", actualGroupId); + logger.info(" - 配置是否匹配: {}", + configuredBootstrapServers.equals(actualBootstrapServers) ? "是" : "否"); + + if (!configuredBootstrapServers.equals(actualBootstrapServers)) { + logger.error(" ⚠️ 配置不匹配! 期望: {}, 实际: {}", + configuredBootstrapServers, actualBootstrapServers); + } + + } catch (Exception e) { + logger.error("获取ConsumerFactory配置失败: {}", e.getMessage()); + } + } + + private void printEnvironmentVariables() { + logger.info("3. 
环境变量和系统属性:"); + + // 检查环境变量 + String[] envVars = { + "KAFKA_BOOTSTRAP_SERVERS", + "SPRING_KAFKA_BOOTSTRAP_SERVERS", + "BOOTSTRAP_SERVERS" + }; + + for (String envVar : envVars) { + String value = System.getenv(envVar); + if (value != null) { + logger.info(" - 环境变量 {}: {}", envVar, value); + } + } + + // 检查系统属性 + String[] systemProps = { + "spring.kafka.bootstrap-servers", + "kafka.bootstrap.servers" + }; + + for (String prop : systemProps) { + String value = System.getProperty(prop); + if (value != null) { + logger.info(" - 系统属性 {}: {}", prop, value); + } + } + } + + /** + * 手动触发配置验证 + */ + public void manualValidate() { + validateKafkaConfiguration(); + } +} + diff --git a/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/controller/KafkaController.java b/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/controller/KafkaController.java new file mode 100644 index 0000000000000000000000000000000000000000..8397fd481a5272203eb6e6a8c92a5ebb14717060 --- /dev/null +++ b/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/controller/KafkaController.java @@ -0,0 +1,35 @@ +package com.ruoyi.kafka.controller; + +import com.ruoyi.common.core.domain.AjaxResult; +import com.ruoyi.kafka.model.param.DemoParam; +import com.ruoyi.kafka.service.KafkaProducerService; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.*; + +import java.util.concurrent.CompletableFuture; + +@Slf4j +@RestController +@RequestMapping("/kafka") +public class KafkaController { + + @Resource + private KafkaProducerService kafkaProducerService; + + public KafkaController(KafkaProducerService kafkaProducerService) { + this.kafkaProducerService = kafkaProducerService; + } + + @PostMapping("/alarm") + public AjaxResult createAlarm(@RequestBody DemoParam param) { + try { + log.info("Creating alarm: {}", param); + CompletableFuture future = kafkaProducerService.sendDemoData(param, "alarm-topic"); + return 
AjaxResult.success(future); + } catch (Exception e) { + return AjaxResult.error(e.getMessage()); + } + } + +} diff --git a/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/model/param/DemoParam.java b/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/model/param/DemoParam.java new file mode 100644 index 0000000000000000000000000000000000000000..fb2ac9afd8b9d84dc7c9ee227cb1923d5a6214cb --- /dev/null +++ b/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/model/param/DemoParam.java @@ -0,0 +1,22 @@ +package com.ruoyi.kafka.model.param; + +import lombok.Data; + + +@Data +public class DemoParam { + private String syslog_version; + private String dev_code; + private Integer log_type; + private Integer sub_type; + private Integer line_id; + private Integer iface_type; + private Integer vlan_id; + private String attack_mac; + private String attack_ip; + private Integer attack_port; + private String response_ip; + private Integer response_port; + private Integer proto; + private String time; +} diff --git a/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/model/param/OrderEvent.java b/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/model/param/OrderEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..9457b7ae9ae2c5ced6d9941d6552141b25b42f42 --- /dev/null +++ b/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/model/param/OrderEvent.java @@ -0,0 +1,29 @@ +package com.ruoyi.kafka.model.param; + +import lombok.Data; + +import java.time.LocalDateTime; + +@Data +public class OrderEvent { + private String orderId; + private String status; + private LocalDateTime timestamp; + private Double amount; + + public OrderEvent() { + } + + public OrderEvent(String orderId, String status, Double amount) { + this.orderId = orderId; + this.status = status; + this.amount = amount; + this.timestamp = LocalDateTime.now(); + } + + @Override + public String toString() { + return 
"OrderEvent{orderId='" + orderId + "', status='" + status + + "', amount=" + amount + ", timestamp=" + timestamp + "}"; + } +} diff --git a/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/model/vo/OrderRequest.java b/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/model/vo/OrderRequest.java new file mode 100644 index 0000000000000000000000000000000000000000..7dde4664c17251673d41e76f31e3460772d82f32 --- /dev/null +++ b/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/model/vo/OrderRequest.java @@ -0,0 +1,13 @@ +package com.ruoyi.kafka.model.vo; + +import lombok.Data; + +import java.math.BigDecimal; + +@Data +public class OrderRequest { + private String customerId; + private BigDecimal amount; + private String productId; + private Integer quantity; +} diff --git a/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/service/KafkaConsumerService.java b/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/service/KafkaConsumerService.java new file mode 100644 index 0000000000000000000000000000000000000000..e58c07f62663adb2e23b615271c914ecd4883472 --- /dev/null +++ b/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/service/KafkaConsumerService.java @@ -0,0 +1,62 @@ +package com.ruoyi.kafka.service; + +import com.alibaba.druid.support.json.JSONUtils; +import com.alibaba.fastjson2.JSON; +import com.ruoyi.kafka.model.param.DemoParam; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Service; + + +@Slf4j +@Service +public class KafkaConsumerService { + + // @KafkaListener(topics = "#{@kafkaConfig.defaultTopic}") + public void listenDemoData(ConsumerRecord record) { + + try { + log.info("Received message from partition: {}, offset: {}", record.partition(), record.offset()); + log.info("Processing data event: {}", 
record.value()); + String jsonString = JSONUtils.toJSONString(record.value()); + DemoParam demoParam = JSON.parseObject(jsonString, DemoParam.class); + // 业务逻辑处理 + processData(demoParam); + + // 手动提交偏移量 + //ack.acknowledge(); + log.info("Message processed successfully"); + + } catch (Exception e) { + log.error("Error processing order event", e); + // 异常会由 CommonErrorHandler 处理,进行重试或发送到死信队列 + throw new RuntimeException("Order processing failed", e); + } + } + + + private void processData(DemoParam param) { + + // 正常的业务处理 + log.info("attack_ip: {}, response_ip: {}", + param.getAttack_ip(), param.getResponse_ip()); + + // 这里可以添加数据库操作、调用其他服务等 + try { + Thread.sleep(10); // 模拟处理时间 + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + // 死信队列监听器 + // @KafkaListener(topics = "alarm-topic.DLT", groupId = "dlt-processing-group") + public void listenDlt(DemoParam param, Acknowledgment ack) { + log.warn("Processing message from DLT: {}", param); + // 死信消息的特殊处理逻辑 + ack.acknowledge(); + } +} + diff --git a/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/service/KafkaProducerService.java b/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/service/KafkaProducerService.java new file mode 100644 index 0000000000000000000000000000000000000000..c83fc4562ca8619e5ff044f6c02328727b0a3f74 --- /dev/null +++ b/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/service/KafkaProducerService.java @@ -0,0 +1,212 @@ +package com.ruoyi.kafka.service; + +import com.ruoyi.kafka.model.param.OrderEvent; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.util.concurrent.CompletableFuture; + +@Slf4j +@Service +public class KafkaProducerService { + + private final 
KafkaTemplate kafkaTemplate; + + public KafkaProducerService(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + /** + * 发送数据(异步方式) + */ + @Async + public CompletableFuture> sendDemoData(Object message, String topic) { + return sendMessage(topic, "default", message); + } + + /** + * 发送通用消息到指定主题 + */ + @Async + public CompletableFuture> sendMessage(String topic, String key, Object message) { + try { + log.info("准备发送消息到主题: {}, Key: {}", topic, key); + + CompletableFuture> future = + kafkaTemplate.send(topic, key, message); + + future.whenComplete((result, ex) -> { + if (ex == null) { + log.info("消息发送成功 - 主题: {}, 分区: {}, 偏移量: {}", + topic, + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); + } else { + log.error("消息发送失败 - 主题: {}, Key: {}, 错误: {}", + topic, key, ex.getMessage()); + // 这里可以添加重试逻辑或告警 + } + }); + + return future; + + } catch (Exception e) { + log.error("发送消息异常 - 主题: {}, Key: {}", topic, key, e); + CompletableFuture> failedFuture = new CompletableFuture<>(); + failedFuture.completeExceptionally(e); + return failedFuture; + } + } + + /** + * 修正的事务方式发送消息 - 使用正确的回调方式 + */ + @Transactional + public void sendMessageInTransaction(String topic, Object message) { + try { + kafkaTemplate.executeInTransaction(operations -> { + // 在事务中发送消息,使用正确的回调方式 + CompletableFuture> future = operations.send(topic, message); + + // 使用 whenComplete 处理回调 + future.whenComplete((result, ex) -> { + if (ex == null) { + log.info("事务消息发送成功 - 主题: {}, 分区: {}, 偏移量: {}", + topic, + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); + } else { + log.error("事务消息发送失败 - 主题: {}, 错误: {}", topic, ex.getMessage()); + } + }); + + return future; + }); + } catch (Exception e) { + log.error("事务消息发送异常", e); + throw new RuntimeException("事务消息发送失败", e); + } + } + + /** + * 事务方式发送消息(带回调参数) + */ + @Transactional + public CompletableFuture> sendMessageInTransactionWithCallback( + String topic, String key, Object message) { + + 
return kafkaTemplate.executeInTransaction(operations -> { + CompletableFuture> future = operations.send(topic, key, message); + + future.whenComplete((result, ex) -> { + if (ex == null) { + log.info("事务回调消息发送成功 - 主题: {}, Key: {}, 分区: {}, 偏移量: {}", + topic, key, + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); + } else { + log.error("事务回调消息发送失败 - 主题: {}, Key: {}, 错误: {}", + topic, key, ex.getMessage()); + } + }); + + return future; + }); + } + + /** + * 发送带回调的消息(非事务) + */ + public void sendMessageWithCallback(String topic, String key, Object message) { + kafkaTemplate.send(topic, key, message) + .whenComplete((result, ex) -> { + if (ex == null) { + log.info("回调消息发送成功 - 主题: {}, 分区: {}, 偏移量: {}", + topic, + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); + } else { + log.error("回调消息发送失败 - 主题: {}, Key: {}", topic, key, ex); + } + }); + } + + /** + * 批量发送消息 + */ + public void sendBatchMessages(String topic, Iterable messages) { + int count = 0; + for (Object message : messages) { + sendMessageWithCallback(topic, "batch-" + count++, message); + } + log.info("批量发送完成,共发送 {} 条消息到主题: {}", count, topic); + } + + /** + * 发送高优先级订单事件 + */ + public void sendHighPriorityOrder(OrderEvent orderEvent) { + // 高优先级订单发送到独立主题 + sendMessage("order-events-priority", orderEvent.getOrderId(), orderEvent) + .thenAccept(result -> { + log.info("高优先级订单发送成功: {}", orderEvent.getOrderId()); + }) + .exceptionally(ex -> { + log.error("高优先级订单发送失败: {}", orderEvent.getOrderId(), ex); + return null; + }); + } + + /** + * 获取Kafka模板状态信息 + */ + public String getProducerMetrics() { + // 实际实现中可以获取和返回Kafka生产者的监控指标 + return "Kafka生产者运行正常"; + } + + /** + * 事务方式发送订单事件 + */ + @Transactional + public void sendOrderEventInTransaction(OrderEvent orderEvent) { + sendMessageInTransaction("order-events", orderEvent); + } + + /** + * 安全的发送消息方法,包含重试机制 + */ + public CompletableFuture> sendMessageSafely( + String topic, String key, Object message, int 
maxRetries) { + + return sendMessageWithRetry(topic, key, message, maxRetries, 0); + } + + private CompletableFuture> sendMessageWithRetry( + String topic, String key, Object message, int maxRetries, int attempt) { + + CompletableFuture> future = kafkaTemplate.send(topic, key, message); + + return future.exceptionallyCompose(ex -> { + if (attempt < maxRetries) { + log.warn("消息发送失败,进行第 {} 次重试 - 主题: {}, Key: {}", + attempt + 1, topic, key); + try { + Thread.sleep(1000L * (attempt + 1)); // 指数退避 + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return CompletableFuture.failedFuture(e); + } + return sendMessageWithRetry(topic, key, message, maxRetries, attempt + 1); + } else { + log.error("消息发送失败,已达到最大重试次数 {} - 主题: {}, Key: {}", + maxRetries, topic, key, ex); + return CompletableFuture.failedFuture(ex); + } + }); + } +} diff --git a/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/utils/JacksonSerializerUtil.java b/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/utils/JacksonSerializerUtil.java new file mode 100644 index 0000000000000000000000000000000000000000..876b7529249c8e8dd7d7fbdb41594b56a2b4794b --- /dev/null +++ b/ruoyi-geek-plugins/ruoyi-kafka/src/main/java/com/ruoyi/kafka/utils/JacksonSerializerUtil.java @@ -0,0 +1,69 @@ +package com.ruoyi.kafka.utils; + + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +@Component +public class JacksonSerializerUtil { + + private static final Logger logger = LoggerFactory.getLogger(JacksonSerializerUtil.class); + + private final ObjectMapper objectMapper; + + public JacksonSerializerUtil(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + public String serialize(Object obj) { + try { + return 
objectMapper.writeValueAsString(obj); + } catch (JsonProcessingException e) { + logger.error("Serialization error for object: {}", obj, e); + throw new RuntimeException("Serialization failed", e); + } + } + + public T deserialize(String json, Class clazz) { + try { + return objectMapper.readValue(json, clazz); + } catch (IOException e) { + logger.error("Deserialization error for JSON: {}", json, e); + throw new RuntimeException("Deserialization failed", e); + } + } + + public T deserialize(String json, TypeReference typeReference) { + try { + return objectMapper.readValue(json, typeReference); + } catch (IOException e) { + logger.error("Deserialization error for JSON: {}", json, e); + throw new RuntimeException("Deserialization failed", e); + } + } + + public String toPrettyJson(Object obj) { + try { + return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(obj); + } catch (JsonProcessingException e) { + logger.error("Pretty JSON serialization error", e); + return serialize(obj); + } + } + + public boolean isValidJson(String json) { + try { + objectMapper.readTree(json); + return true; + } catch (IOException e) { + return false; + } + } +} + diff --git a/ruoyi-geek-plugins/ruoyi-plugins-starter/pom.xml b/ruoyi-geek-plugins/ruoyi-plugins-starter/pom.xml index d30f293d589abd659a344a1c8f13d41b586fea21..a0fa6e117f00cce58fc0cabe35822abd351ccbe9 100644 --- a/ruoyi-geek-plugins/ruoyi-plugins-starter/pom.xml +++ b/ruoyi-geek-plugins/ruoyi-plugins-starter/pom.xml @@ -60,6 +60,16 @@ ruoyi-rabbitmq + + com.ruoyi.geekxd + ruoyi-elastic + + + + com.ruoyi.geekxd + ruoyi-kafka + +