From d8872eb8e255538a0e81ddf41ce28de1c7dcff0a Mon Sep 17 00:00:00 2001 From: sundapeng Date: Fri, 8 Dec 2023 06:51:04 +0000 Subject: [PATCH] SessionWindows are closed too early --- 0015-SessionWindows-closed-early.patch | 103 +++++++++++++++++++++++++ kafka.spec | 5 +- 2 files changed, 107 insertions(+), 1 deletion(-) create mode 100644 0015-SessionWindows-closed-early.patch diff --git a/0015-SessionWindows-closed-early.patch b/0015-SessionWindows-closed-early.patch new file mode 100644 index 0000000..dfd92e5 --- /dev/null +++ b/0015-SessionWindows-closed-early.patch @@ -0,0 +1,103 @@ +diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java +index 5648e8f0a3..24e9e21ad7 100644 +--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java ++++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java +@@ -121,7 +121,7 @@ public class KStreamSessionWindowAggregate implements KStreamAggProce + + final long timestamp = context().timestamp(); + observedStreamTime = Math.max(observedStreamTime, timestamp); +- final long closeTime = observedStreamTime - windows.gracePeriodMs(); ++ final long closeTime = observedStreamTime - windows.gracePeriodMs() - windows.inactivityGap(); + + final List, Agg>> merged = new ArrayList<>(); + final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp); +diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +index ab2adbfbb1..244ea9f4fe 100644 +--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java ++++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +@@ -441,9 +441,9 @@ public class KStreamSessionWindowAggregateProcessorTest { + context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null)); + processor.process("OnTime1", "1"); + +- // dummy record to advance stream time = 1 +- context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null)); +- processor.process("dummy", "dummy"); ++ // dummy record to advance stream time = 11, 10 for gap time plus 1 to place outside window ++ context.setRecordContext(new ProcessorRecordContext(11, -2, -3, "topic", new RecordHeaders())); ++ processor.process("dummy", "dummy"); + + try (final LogCaptureAppender appender = + LogCaptureAppender.createAndRegister(KStreamSessionWindowAggregate.class)) { +@@ -455,7 +455,7 @@ public class KStreamSessionWindowAggregateProcessorTest { + assertThat( + appender.getMessages(), + hasItem("Skipping record for expired window." + +- " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[1]") ++ " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[11]") + ); + } + +@@ -542,17 +542,17 @@ public class KStreamSessionWindowAggregateProcessorTest { + context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null)); + processor.process("OnTime1", "1"); + +- // dummy record to advance stream time = 1 +- context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null)); +- processor.process("dummy", "dummy"); ++ // dummy record to advance stream time = 11, 10 for gap time plus 1 to place at edge of window ++ context.setRecordContext(new ProcessorRecordContext(11, -2, -3, "topic", new RecordHeaders())); ++ processor.process("dummy", "dummy"); + + // delayed record arrives on time, should not be skipped + context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null)); + processor.process("OnTime2", "1"); + +- // dummy record to advance stream time = 2 +- context.setRecordContext(new ProcessorRecordContext(2, -2, -3, "topic", null)); +- processor.process("dummy", "dummy"); ++ // dummy record to advance stream time = 12, 10 for gap time plus 2 to place outside window ++ context.setRecordContext(new ProcessorRecordContext(12, -2, -3, "topic", new RecordHeaders())); ++ processor.process("dummy", "dummy"); + + // delayed record arrives late + context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null)); +@@ -561,7 +561,7 @@ public class KStreamSessionWindowAggregateProcessorTest { + assertThat( + appender.getMessages(), + hasItem("Skipping record for expired window." + +- " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[2]") ++ " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[12]") + ); + } + +diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java +index 46a8ab8dcf..e0b7957e01 100644 +--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java ++++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java +@@ -581,7 +581,7 @@ public class SuppressScenarioTest { + // arbitrarily disordered records are admitted, because the *window* is not closed until stream-time > window-end + grace + inputTopic.pipeInput("k1", "v1", 1L); + // any record in the same partition advances stream time (note the key is different) +- inputTopic.pipeInput("k2", "v1", 6L); ++ inputTopic.pipeInput("k2", "v1", 11L); + // late event for first window - this should get dropped from all streams, since the first window is now closed. + inputTopic.pipeInput("k1", "v1", 5L); + // just pushing stream time forward to flush the other events through. +@@ -594,7 +594,7 @@ public class SuppressScenarioTest { + new KeyValueTimestamp<>("[k1@0/5]", 2L, 5L), + new KeyValueTimestamp<>("[k1@0/5]", null, 5L), + new KeyValueTimestamp<>("[k1@0/5]", 3L, 5L), +- new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L), ++ new KeyValueTimestamp<>("[k2@11/11]", 1L, 11L), + new KeyValueTimestamp<>("[k1@30/30]", 1L, 30L) + ) + ); +@@ -602,7 +602,7 @@ public class SuppressScenarioTest { + drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), + asList( + new KeyValueTimestamp<>("[k1@0/5]", 3L, 5L), +- new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L) ++ new KeyValueTimestamp<>("[k2@11/11]", 1L, 11L) + ) + ); + } diff --git a/kafka.spec b/kafka.spec index 4a78e60..99f6359 100644 --- a/kafka.spec +++ b/kafka.spec @@ -4,7 +4,7 @@ Name: kafka Version: 2.8.2 -Release: 14 +Release: 15 Summary: A Distributed Streaming Platform. License: Apache-2.0 @@ -26,6 +26,7 @@ Patch10: 0011-ConfigEntry.patch Patch11: 0012-incorrectly-LeaderElectionCommand.patch Patch12: 0013-AlterIsr.patch Patch13: 0014-override-toString.patch +Patch14: 0015-SessionWindows-closed-early.patch BuildRequires: systemd java-1.8.0-openjdk-devel Provides: kafka = %{version} @@ -77,6 +78,8 @@ cp -pr licenses/* $RPM_BUILD_ROOT%{kafka_home}/licenses rm -rf %{buildroot} %changelog +* Fri Dec 08 2023 sundapeng - 2.8.2-15 +- SessionWindows are closed too early * Fri Dec 08 2023 sundapeng - 2.8.2-14 - override toString method to show correct value in doc * Fri Dec 08 2023 sundapeng - 2.8.2-13 -- Gitee