diff --git a/stream-core/src/main/java/org/dromara/streamquery/stream/core/stream/Steam.java b/stream-core/src/main/java/org/dromara/streamquery/stream/core/stream/Steam.java index b9b75b74fe984b94bc30f095f8f6ce86a6fff079..1fa4800e65909476341d780463a86ea9b9d90884 100644 --- a/stream-core/src/main/java/org/dromara/streamquery/stream/core/stream/Steam.java +++ b/stream-core/src/main/java/org/dromara/streamquery/stream/core/stream/Steam.java @@ -444,6 +444,7 @@ public class Steam extends AbstractStreamWrapper> * @param mapper 操作,返回流 * @return 返回叠加拆分操作后的流 */ + @Override public Steam mapMulti(BiConsumer> mapper) { Objects.requireNonNull(mapper); return flatMap( @@ -479,7 +480,7 @@ public class Steam extends AbstractStreamWrapper> })) .parallel(); } else { - Set exists = new HashSet<>(); + Set exists = new LinkedHashSet<>(); return of(stream.filter(e -> exists.add(keyExtractor.apply(e)))); } } @@ -1069,4 +1070,85 @@ public class Steam extends AbstractStreamWrapper> public Steam onClose(Runnable closeHandler) { return super.onClose(closeHandler); } + + /** + * 将流分组为指定大小的子流,最后一组可能小于指定大小 与split方法类似,但这个方法保证每组大小相等(最后一组除外) + * + * @param size 每组大小 + * @return 分组后的流 + */ + public Steam> chunk(int size) { + if (size <= 0) { + throw new IllegalArgumentException("Size must be greater than 0"); + } + + return Steam.of( + () -> + new Iterator<>() { + private final Iterator iterator = iterator(); + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public List next() { + List chunk = new ArrayList<>(size); + for (int i = 0; i < size && iterator.hasNext(); i++) { + chunk.add(iterator.next()); + } + return chunk; + } + }); + } + + /** + * 将流转换为无限循环流 当到达流的末尾时,重新从开始处开始 用户可以使用 limit() 来限制元素数量 + * + * @return 无限循环的流 + */ + public Steam cycle() { + List list = toList(); + if (list.isEmpty()) { + return empty(); + } + + return Steam.iterate(0, i -> true, i -> i + 1).map(i -> list.get(i % list.size())); + } + + /** + * 对流中的元素进行滑动窗口操作 + * + * @param windowSize 窗口大小 + * @param step 步长 + * @return 滑动窗口流 + */ + public Steam> sliding(int windowSize, int step) { + if (windowSize <= 0 || step <= 0) { + throw new IllegalArgumentException("Window size and step must be greater than 0"); + } + + List source = toList(); + return Steam.of( + () -> + new Iterator>() { + private int index = 0; + + @Override + public boolean hasNext() { + return index + windowSize <= source.size(); + } + + @Override + public List next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + List window = source.subList(index, index + windowSize); + index += step; + return window; + } + }); + } } diff --git a/stream-core/src/test/java/org/dromara/streamquery/stream/core/stream/SteamTest.java b/stream-core/src/test/java/org/dromara/streamquery/stream/core/stream/SteamTest.java index 38f3654d1a535542b36fbc1b1c353ded89ea4edb..6cde89d1dff6f067b432b243f9506bacd37ef69f 100644 --- a/stream-core/src/test/java/org/dromara/streamquery/stream/core/stream/SteamTest.java +++ b/stream-core/src/test/java/org/dromara/streamquery/stream/core/stream/SteamTest.java @@ -426,4 +426,95 @@ class SteamTest { List list = asList(0, 1, 2); Assertions.assertEquals(asList(1, 2, 3), Steam.of(list).map(i -> i + 1).log().toList()); } + + @Test + void testChunk() { + List list = Arrays.asList(1, 2, 3, 4, 5); + + // 测试正常分组 + List> chunks = Steam.of(list).chunk(2).toList(); + Assertions.assertEquals( + Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4), Arrays.asList(5)), chunks); + + // 测试size为1的情况 + chunks = Steam.of(list).chunk(1).toList(); + Assertions.assertEquals( + Arrays.asList( + Arrays.asList(1), + Arrays.asList(2), + Arrays.asList(3), + Arrays.asList(4), + Arrays.asList(5)), + chunks); + + // 测试size大于列表长度的情况 + chunks = Steam.of(list).chunk(10).toList(); + Assertions.assertEquals(Arrays.asList(Arrays.asList(1, 2, 3, 4, 5)), chunks); + + // 测试非法参数 + Assertions.assertThrows(IllegalArgumentException.class, () -> Steam.of(list).chunk(0).toList()); + } + + @Test + public void testCycle() { + // 测试普通循环 + List result = Steam.of(1, 2, 3).cycle().limit(8).toList(); + Assertions.assertEquals(Arrays.asList(1, 2, 3, 1, 2, 3, 1, 2), result); + + // 测试空流 + List emptyResult = Steam.empty().cycle().limit(5).toList(); + Assertions.assertTrue(emptyResult.isEmpty()); + + // 测试单个元素 + List singleResult = Steam.of("a").cycle().limit(3).toList(); + Assertions.assertEquals(Arrays.asList("a", "a", "a"), singleResult); + + // 测试不限制长度时是否能正常获取前几个元素 + List unlimitedResult = + Steam.of(1, 2) + .cycle() + .peek( + i -> { + // 确保流确实在运行,但不会无限运行 + Assertions.assertTrue(i <= 2); + }) + .limit(4) + .toList(); + Assertions.assertEquals(Arrays.asList(1, 2, 1, 2), unlimitedResult); + } + + @Test + void testSliding() { + List list = Arrays.asList(1, 2, 3, 4, 5); + + // 测试正常滑动窗口 + List> windows = Steam.of(list).sliding(3, 1).toList(); + Assertions.assertEquals( + Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(2, 3, 4), Arrays.asList(3, 4, 5)), + windows); + + // 测试步长为2的滑动窗口 + windows = Steam.of(list).sliding(2, 2).toList(); + Assertions.assertEquals(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4)), windows); + + // 测试非法参数 + Assertions.assertThrows( + IllegalArgumentException.class, () -> Steam.of(list).sliding(0, 1).toList()); + Assertions.assertThrows( + IllegalArgumentException.class, () -> Steam.of(list).sliding(1, 0).toList()); + } + + @Test + void testDistinctOptimized() { + List list = Arrays.asList(1, 2, 2, 3, 3, 3, 1); + + // 测试顺序流去重 + List result = Steam.of(list).distinct(String::valueOf).toList(); + Assertions.assertEquals(Arrays.asList(1, 2, 3), result); + + // 测试并行流去重 + result = Steam.of(list).parallel().distinct(String::valueOf).toList(); + Assertions.assertEquals(3, result.size()); + Assertions.assertTrue(result.containsAll(Arrays.asList(1, 2, 3))); + } }