From fcd5e4913131c8c6a44a74bc124944c0caef7333 Mon Sep 17 00:00:00 2001 From: Cizaii Date: Thu, 31 Oct 2024 13:32:00 +0800 Subject: [PATCH 1/3] =?UTF-8?q?###=20=E6=96=B0=E5=A2=9E=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. **chunk(int size)** - 将流分组为固定大小的子流 - 与现有的 `split` 方法类似,但保证每组大小相等(最后一组除外) - 支持并行流操作 - 当 size <= 0 时抛出 IllegalArgumentException 2. **nth(long n)** - 获取流中第 n 个元素 - 支持获取指定位置的元素 - 当 n < 0 时返回 Optional.empty() 3. **cycle(int times)** - 将流转换为循环流 - 支持将原始流重复指定次数 - 当 times <= 0 或原始流为空时返回空流 4. **sliding(int windowSize, int step)** - 滑动窗口操作 - 支持指定窗口大小和步长的滑动窗口操作 - 当 windowSize 或 step <= 0 时抛出 IllegalArgumentException --- .../streamquery/stream/core/stream/Steam.java | 105 +++++++++++++++- .../stream/core/stream/SteamTest.java | 119 ++++++++++++++++++ 2 files changed, 223 insertions(+), 1 deletion(-) diff --git a/stream-core/src/main/java/org/dromara/streamquery/stream/core/stream/Steam.java b/stream-core/src/main/java/org/dromara/streamquery/stream/core/stream/Steam.java index b9b75b74..35064d3e 100644 --- a/stream-core/src/main/java/org/dromara/streamquery/stream/core/stream/Steam.java +++ b/stream-core/src/main/java/org/dromara/streamquery/stream/core/stream/Steam.java @@ -444,6 +444,7 @@ public class Steam extends AbstractStreamWrapper> * @param mapper 操作,返回流 * @return 返回叠加拆分操作后的流 */ + @Override public Steam mapMulti(BiConsumer> mapper) { Objects.requireNonNull(mapper); return flatMap( @@ -479,7 +480,7 @@ public class Steam extends AbstractStreamWrapper> })) .parallel(); } else { - Set exists = new HashSet<>(); + Set exists = new LinkedHashSet<>(); return of(stream.filter(e -> exists.add(keyExtractor.apply(e)))); } } @@ -1069,4 +1070,106 @@ public class Steam extends AbstractStreamWrapper> public Steam onClose(Runnable closeHandler) { return super.onClose(closeHandler); } + + /** + * 将流分组为指定大小的子流,最后一组可能小于指定大小 + * 与split方法类似,但这个方法保证每组大小相等(最后一组除外) + * + * @param size 每组大小 + * @return 分组后的流 + */ + public Steam> chunk(int size) { + if (size <= 0) { + throw new IllegalArgumentException("Size must be greater than 0"); + } + + return Steam.of( + () -> new Iterator<>() { + private final Iterator iterator = iterator(); + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public List next() { + List chunk = new ArrayList<>(size); + for (int i = 0; i < size && iterator.hasNext(); i++) { + chunk.add(iterator.next()); + } + return chunk; + } + } + ); + } + + /** + * 获取流中第n个元素 + * + * @param n 位置(从0开始) + * @return 第n个元素 + */ + public Optional nth(long n) { + if (n < 0) { + return Optional.empty(); + } + return skip(n).findFirst(); + } + + /** + * 将流转换为循环流 + * 当到达流的末尾时,重新从开始处开始 + * + * @param times 循环次数 + * @return 循环后的流 + */ + public Steam cycle(int times) { + if (times <= 0) { + return empty(); + } + + List list = toList(); + if (list.isEmpty()) { + return empty(); + } + + return Steam.iterate(0, i -> i < times * list.size(), i -> i + 1) + .map(i -> list.get(i % list.size())); + } + + /** + * 对流中的元素进行滑动窗口操作 + * + * @param windowSize 窗口大小 + * @param step 步长 + * @return 滑动窗口流 + */ + public Steam> sliding(int windowSize, int step) { + if (windowSize <= 0 || step <= 0) { + throw new IllegalArgumentException("Window size and step must be greater than 0"); + } + + List source = toList(); + return Steam.of( + () -> new Iterator>() { + private int index = 0; + + @Override + public boolean hasNext() { + return index + windowSize <= source.size(); + } + + @Override + public List next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + List window = source.subList(index, index + windowSize); + index += step; + return window; + } + } + ); + } } diff --git a/stream-core/src/test/java/org/dromara/streamquery/stream/core/stream/SteamTest.java b/stream-core/src/test/java/org/dromara/streamquery/stream/core/stream/SteamTest.java index 38f3654d..1e9cf46c 100644 --- a/stream-core/src/test/java/org/dromara/streamquery/stream/core/stream/SteamTest.java +++ b/stream-core/src/test/java/org/dromara/streamquery/stream/core/stream/SteamTest.java @@ -426,4 +426,123 @@ class SteamTest { List list = asList(0, 1, 2); Assertions.assertEquals(asList(1, 2, 3), Steam.of(list).map(i -> i + 1).log().toList()); } + + @Test + void testChunk() { + List list = Arrays.asList(1, 2, 3, 4, 5); + + // 测试正常分组 + List> chunks = Steam.of(list).chunk(2).toList(); + Assertions.assertEquals(Arrays.asList( + Arrays.asList(1, 2), + Arrays.asList(3, 4), + Arrays.asList(5) + ), chunks); + + // 测试size为1的情况 + chunks = Steam.of(list).chunk(1).toList(); + Assertions.assertEquals(Arrays.asList( + Arrays.asList(1), + Arrays.asList(2), + Arrays.asList(3), + Arrays.asList(4), + Arrays.asList(5) + ), chunks); + + // 测试size大于列表长度的情况 + chunks = Steam.of(list).chunk(10).toList(); + Assertions.assertEquals(Arrays.asList(Arrays.asList(1, 2, 3, 4, 5)), chunks); + + // 测试非法参数 + Assertions.assertThrows(IllegalArgumentException.class, () -> + Steam.of(list).chunk(0).toList() + ); + } + + @Test + void testNth() { + List list = Arrays.asList(1, 2, 3, 4, 5); + + // 测试正常获取 + Assertions.assertEquals(1, Steam.of(list).nth(0).orElse(null)); + Assertions.assertEquals(3, Steam.of(list).nth(2).orElse(null)); + Assertions.assertEquals(5, Steam.of(list).nth(4).orElse(null)); + + // 测试越界情况 + Assertions.assertFalse(Steam.of(list).nth(5).isPresent()); + Assertions.assertFalse(Steam.of(list).nth(-1).isPresent()); + + // 测试空流 + Assertions.assertFalse(Steam.empty().nth(0).isPresent()); + } + + @Test + void testCycle() { + List list = Arrays.asList(1, 2, 3); + + // 测试正常循环 + List result = Steam.of(list).cycle(2).toList(); + Assertions.assertEquals(Arrays.asList(1, 2, 3, 1, 2, 3), result); + + // 测试循环1次 + result = Steam.of(list).cycle(1).toList(); + Assertions.assertEquals(Arrays.asList(1, 2, 3), result); + + // 测试循环0次 + result = Steam.of(list).cycle(0).toList(); + Assertions.assertEquals(Collections.emptyList(), result); + + // 测试空列表 + result = Steam.empty().cycle(2).toList(); + Assertions.assertEquals(Collections.emptyList(), result); + } + + @Test + void testSliding() { + List list = Arrays.asList(1, 2, 3, 4, 5); + + // 测试正常滑动窗口 + List> windows = Steam.of(list).sliding(3, 1).toList(); + Assertions.assertEquals(Arrays.asList( + Arrays.asList(1, 2, 3), + Arrays.asList(2, 3, 4), + Arrays.asList(3, 4, 5) + ), windows); + + // 测试步长为2的滑动窗口 + windows = Steam.of(list).sliding(2, 2).toList(); + Assertions.assertEquals(Arrays.asList( + Arrays.asList(1, 2), + Arrays.asList(3, 4) + ), windows); + + // 测试非法参数 + Assertions.assertThrows(IllegalArgumentException.class, () -> + Steam.of(list).sliding(0, 1).toList() + ); + Assertions.assertThrows(IllegalArgumentException.class, () -> + Steam.of(list).sliding(1, 0).toList() + ); + } + + + + @Test + void testDistinctOptimized() { + List list = Arrays.asList(1, 2, 2, 3, 3, 3, 1); + + // 测试顺序流去重 + List result = Steam.of(list) + .distinct(String::valueOf) + .toList(); + Assertions.assertEquals(Arrays.asList(1, 2, 3), result); + + // 测试并行流去重 + result = Steam.of(list) + .parallel() + .distinct(String::valueOf) + .toList(); + Assertions.assertEquals(3, result.size()); + Assertions.assertTrue(result.containsAll(Arrays.asList(1, 2, 3))); + } } -- Gitee From d38a18730a3d000bdb598c30f8fcf3047167786d Mon Sep 17 00:00:00 2001 From: Cizaii Date: Thu, 31 Oct 2024 15:56:57 +0800 Subject: [PATCH 2/3] recode --- .../streamquery/stream/core/stream/Steam.java | 95 +++++------- .../stream/core/stream/SteamTest.java | 136 +++++++----------- 2 files changed, 91 insertions(+), 140 deletions(-) diff --git a/stream-core/src/main/java/org/dromara/streamquery/stream/core/stream/Steam.java b/stream-core/src/main/java/org/dromara/streamquery/stream/core/stream/Steam.java index 35064d3e..eb920826 100644 --- a/stream-core/src/main/java/org/dromara/streamquery/stream/core/stream/Steam.java +++ b/stream-core/src/main/java/org/dromara/streamquery/stream/core/stream/Steam.java @@ -212,7 +212,7 @@ public class Steam extends AbstractStreamWrapper> /** * 创建一个惰性拼接流,其元素是第一个流的所有元素,然后是第二个流的所有元素。 如果两个输入流都是有序的,则结果流是有序的,如果任一输入流是并行的,则结果流是并行的。 - * 当结果流关闭时,两个输入流的关闭处理程序都会被调用。 + * 当结果流关闭时,两个输入流的关闭处理程序都���被调用。 * *

从重复串行流进行拼接时可能会导致深度调用链甚至抛出 {@code StackOverflowException} */ @@ -1072,8 +1072,7 @@ public class Steam extends AbstractStreamWrapper> } /** - * 将流分组为指定大小的子流,最后一组可能小于指定大小 - * 与split方法类似,但这个方法保证每组大小相等(最后一组除外) + * 将流分组为指定大小的子流,最后一组可能小于指定大小 与split方法类似,但这个方法保证每组大小相等(最后一组除外) * * @param size 每组大小 * @return 分组后的流 @@ -1082,94 +1081,74 @@ public class Steam extends AbstractStreamWrapper> if (size <= 0) { throw new IllegalArgumentException("Size must be greater than 0"); } - + return Steam.of( - () -> new Iterator<>() { - private final Iterator iterator = iterator(); + () -> + new Iterator<>() { + private final Iterator iterator = iterator(); - @Override - public boolean hasNext() { + @Override + public boolean hasNext() { return iterator.hasNext(); - } + } - @Override - public List next() { + @Override + public List next() { List chunk = new ArrayList<>(size); for (int i = 0; i < size && iterator.hasNext(); i++) { - chunk.add(iterator.next()); + chunk.add(iterator.next()); } return chunk; - } - } - ); - } - - /** - * 获取流中第n个元素 - * - * @param n 位置(从0开始) - * @return 第n个元素 - */ - public Optional nth(long n) { - if (n < 0) { - return Optional.empty(); - } - return skip(n).findFirst(); + } + }); } /** - * 将流转换为循环流 - * 当到达流的末尾时,重新从开始处开始 - * - * @param times 循环次数 - * @return 循环后的流 + * 将流转换为无限循环流 当到达流的末尾时,重新从开始处开始 用户可以使用 limit() 来限制元素数量 + * + * @return 无限循环的流 */ - public Steam cycle(int times) { - if (times <= 0) { - return empty(); - } - + public Steam cycle() { List list = toList(); if (list.isEmpty()) { - return empty(); + return empty(); } - - return Steam.iterate(0, i -> i < times * list.size(), i -> i + 1) - .map(i -> list.get(i % list.size())); + + return Steam.iterate(0, i -> true, i -> i + 1).map(i -> list.get(i % list.size())); } /** * 对流中的元素进行滑动窗口操作 - * + * * @param windowSize 窗口大小 * @param step 步长 * @return 滑动窗口流 */ public Steam> sliding(int windowSize, int step) { if (windowSize <= 0 || step <= 0) { - throw new IllegalArgumentException("Window size and step must be greater than 0"); + throw new IllegalArgumentException("Window size and step must be greater than 0"); } - + List source = toList(); return Steam.of( - () -> new Iterator>() { - private int index = 0; - - @Override - public boolean hasNext() { + () -> + new Iterator>() { + private int index = 0; + + @Override + public boolean hasNext() { return index + windowSize <= source.size(); - } - - @Override - public List next() { + } + + @Override + public List next() { if (!hasNext()) { - throw new NoSuchElementException(); + throw new NoSuchElementException(); } List window = source.subList(index, index + windowSize); index += step; return window; - } - } - ); + } + }); } } diff --git a/stream-core/src/test/java/org/dromara/streamquery/stream/core/stream/SteamTest.java b/stream-core/src/test/java/org/dromara/streamquery/stream/core/stream/SteamTest.java index 1e9cf46c..6cde89d1 100644 --- a/stream-core/src/test/java/org/dromara/streamquery/stream/core/stream/SteamTest.java +++ b/stream-core/src/test/java/org/dromara/streamquery/stream/core/stream/SteamTest.java @@ -430,118 +430,90 @@ class SteamTest { @Test void testChunk() { List list = Arrays.asList(1, 2, 3, 4, 5); - + // 测试正常分组 List> chunks = Steam.of(list).chunk(2).toList(); - Assertions.assertEquals(Arrays.asList( - Arrays.asList(1, 2), - Arrays.asList(3, 4), - Arrays.asList(5) - ), chunks); - + Assertions.assertEquals( + Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4), Arrays.asList(5)), chunks); + // 测试size为1的情况 chunks = Steam.of(list).chunk(1).toList(); - Assertions.assertEquals(Arrays.asList( - Arrays.asList(1), - Arrays.asList(2), - Arrays.asList(3), - Arrays.asList(4), - Arrays.asList(5) - ), chunks); - + Assertions.assertEquals( + Arrays.asList( + Arrays.asList(1), + Arrays.asList(2), + Arrays.asList(3), + Arrays.asList(4), + Arrays.asList(5)), + chunks); + // 测试size大于列表长度的情况 chunks = Steam.of(list).chunk(10).toList(); Assertions.assertEquals(Arrays.asList(Arrays.asList(1, 2, 3, 4, 5)), chunks); - + // 测试非法参数 - Assertions.assertThrows(IllegalArgumentException.class, () -> - Steam.of(list).chunk(0).toList() - ); + Assertions.assertThrows(IllegalArgumentException.class, () -> Steam.of(list).chunk(0).toList()); } @Test - void testNth() { - List list = Arrays.asList(1, 2, 3, 4, 5); - - // 测试正常获取 - Assertions.assertEquals(1, Steam.of(list).nth(0).orElse(null)); - Assertions.assertEquals(3, Steam.of(list).nth(2).orElse(null)); - Assertions.assertEquals(5, Steam.of(list).nth(4).orElse(null)); - - // 测试越界情况 - Assertions.assertFalse(Steam.of(list).nth(5).isPresent()); - Assertions.assertFalse(Steam.of(list).nth(-1).isPresent()); - - // 测试空流 - Assertions.assertFalse(Steam.empty().nth(0).isPresent()); - } + public void testCycle() { + // 测试普通循环 + List result = Steam.of(1, 2, 3).cycle().limit(8).toList(); + Assertions.assertEquals(Arrays.asList(1, 2, 3, 1, 2, 3, 1, 2), result); - @Test - void testCycle() { - List list = Arrays.asList(1, 2, 3); - - // 测试正常循环 - List result = Steam.of(list).cycle(2).toList(); - Assertions.assertEquals(Arrays.asList(1, 2, 3, 1, 2, 3), result); - - // 测试循环1次 - result = Steam.of(list).cycle(1).toList(); - Assertions.assertEquals(Arrays.asList(1, 2, 3), result); - - // 测试循环0次 - result = Steam.of(list).cycle(0).toList(); - Assertions.assertEquals(Collections.emptyList(), result); - - // 测试空列表 - result = Steam.empty().cycle(2).toList(); - Assertions.assertEquals(Collections.emptyList(), result); + // 测试空流 + List emptyResult = Steam.empty().cycle().limit(5).toList(); + Assertions.assertTrue(emptyResult.isEmpty()); + + // 测试单个元素 + List singleResult = Steam.of("a").cycle().limit(3).toList(); + Assertions.assertEquals(Arrays.asList("a", "a", "a"), singleResult); + + // 测试不限制长度时是否能正常获取前几个元素 + List unlimitedResult = + Steam.of(1, 2) + .cycle() + .peek( + i -> { + // 确保流确实在运行,但不会无限运行 + Assertions.assertTrue(i <= 2); + }) + .limit(4) + .toList(); + Assertions.assertEquals(Arrays.asList(1, 2, 1, 2), unlimitedResult); } @Test void testSliding() { List list = Arrays.asList(1, 2, 3, 4, 5); - + // 测试正常滑动窗口 List> windows = Steam.of(list).sliding(3, 1).toList(); - Assertions.assertEquals(Arrays.asList( - Arrays.asList(1, 2, 3), - Arrays.asList(2, 3, 4), - Arrays.asList(3, 4, 5) - ), windows); - + Assertions.assertEquals( + Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(2, 3, 4), Arrays.asList(3, 4, 5)), + windows); + // 测试步长为2的滑动窗口 windows = Steam.of(list).sliding(2, 2).toList(); - Assertions.assertEquals(Arrays.asList( - Arrays.asList(1, 2), - Arrays.asList(3, 4) - ), windows); - + Assertions.assertEquals(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4)), windows); + // 测试非法参数 - Assertions.assertThrows(IllegalArgumentException.class, () -> - Steam.of(list).sliding(0, 1).toList() - ); - Assertions.assertThrows(IllegalArgumentException.class, () -> - Steam.of(list).sliding(1, 0).toList() - ); + Assertions.assertThrows( + IllegalArgumentException.class, () -> Steam.of(list).sliding(0, 1).toList()); + Assertions.assertThrows( + IllegalArgumentException.class, () -> Steam.of(list).sliding(1, 0).toList()); } - - @Test void testDistinctOptimized() { List list = Arrays.asList(1, 2, 2, 3, 3, 3, 1); - + // 测试顺序流去重 - List result = Steam.of(list) - .distinct(String::valueOf) - .toList(); + List result = Steam.of(list).distinct(String::valueOf).toList(); Assertions.assertEquals(Arrays.asList(1, 2, 3), result); - + // 测试并行流去重 - result = Steam.of(list) - .parallel() - .distinct(String::valueOf) - .toList(); + result = Steam.of(list).parallel().distinct(String::valueOf).toList(); Assertions.assertEquals(3, result.size()); Assertions.assertTrue(result.containsAll(Arrays.asList(1, 2, 3))); } -- Gitee From d211b1f5c3d6d8810b450882d94e182a6bf37aa5 Mon Sep 17 00:00:00 2001 From: Cizaii Date: Thu, 31 Oct 2024 15:57:55 +0800 Subject: [PATCH 3/3] recode --- .../java/org/dromara/streamquery/stream/core/stream/Steam.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stream-core/src/main/java/org/dromara/streamquery/stream/core/stream/Steam.java b/stream-core/src/main/java/org/dromara/streamquery/stream/core/stream/Steam.java index eb920826..1fa4800e 100644 --- a/stream-core/src/main/java/org/dromara/streamquery/stream/core/stream/Steam.java +++ b/stream-core/src/main/java/org/dromara/streamquery/stream/core/stream/Steam.java @@ -212,7 +212,7 @@ public class Steam extends AbstractStreamWrapper> /** * 创建一个惰性拼接流,其元素是第一个流的所有元素,然后是第二个流的所有元素。 如果两个输入流都是有序的,则结果流是有序的,如果任一输入流是并行的,则结果流是并行的。 - * 当结果流关闭时,两个输入流的关闭处理程序都���被调用。 + * 当结果流关闭时,两个输入流的关闭处理程序都会被调用。 * *

从重复串行流进行拼接时可能会导致深度调用链甚至抛出 {@code StackOverflowException} */ -- Gitee