diff --git a/0006-NPE-subscriptionState.patch b/0006-NPE-subscriptionState.patch new file mode 100644 index 0000000000000000000000000000000000000000..f268025a37e6171ebf4fd9ef0bf4503cf350876d --- /dev/null +++ b/0006-NPE-subscriptionState.patch @@ -0,0 +1,51 @@ +diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +index 30491110a3..ce81aa1b95 100644 +--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java ++++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +@@ -539,10 +539,13 @@ public class SubscriptionState { + + synchronized Long partitionLag(TopicPartition tp, IsolationLevel isolationLevel) { + TopicPartitionState topicPartitionState = assignedState(tp); +- if (isolationLevel == IsolationLevel.READ_COMMITTED) ++ if (topicPartitionState.position == null) { ++ return null; ++ } else if (isolationLevel == IsolationLevel.READ_COMMITTED) { + return topicPartitionState.lastStableOffset == null ? null : topicPartitionState.lastStableOffset - topicPartitionState.position.offset; +- else ++ } else { + return topicPartitionState.highWatermark == null ? null : topicPartitionState.highWatermark - topicPartitionState.position.offset; ++ } + } + + synchronized Long partitionLead(TopicPartition tp) { +diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +index d6e88008b5..d19234fe8a 100644 +--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java ++++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; + import org.apache.kafka.clients.consumer.OffsetAndMetadata; + import org.apache.kafka.clients.consumer.OffsetResetStrategy; + import org.apache.kafka.clients.consumer.internals.SubscriptionState.LogTruncation; ++import org.apache.kafka.common.IsolationLevel; + import org.apache.kafka.common.Node; + import org.apache.kafka.common.TopicPartition; + import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +@@ -794,4 +795,18 @@ public class SubscriptionStateTest { + assertFalse(state.isOffsetResetNeeded(tp0)); + } + ++ @Test ++ public void nullPositionLagOnNoPosition() { ++ state.assignFromUser(Collections.singleton(tp0)); ++ ++ assertNull(state.partitionLag(tp0, IsolationLevel.READ_UNCOMMITTED)); ++ assertNull(state.partitionLag(tp0, IsolationLevel.READ_COMMITTED)); ++ ++ state.updateHighWatermark(tp0, 1L); ++ state.updateLastStableOffset(tp0, 1L); ++ ++ assertNull(state.partitionLag(tp0, IsolationLevel.READ_UNCOMMITTED)); ++ assertNull(state.partitionLag(tp0, IsolationLevel.READ_COMMITTED)); ++ } ++ + } diff --git a/kafka.spec b/kafka.spec index 821e462db72a5507f07eeed4948070db05296ce6..465871d5daaeca1616f115f23061b23beeff7b8f 100644 --- a/kafka.spec +++ b/kafka.spec @@ -4,7 +4,7 @@ Name: kafka Version: 2.8.2 -Release: 5 +Release: 6 Summary: A Distributed Streaming Platform. License: Apache-2.0 @@ -17,6 +17,7 @@ Patch1: 0002-CVE-2022-41881.patch Patch2: 0003-CVE-2023-34455.patch Patch3: 0004-CVE-2022-42004.patch Patch4: 0005-CVE-2016-3189.patch +Patch5: 0006-NPE-subscriptionState.patch BuildRequires: systemd java-1.8.0-openjdk-devel Provides: kafka = %{version} @@ -68,6 +69,9 @@ cp -pr licenses/* $RPM_BUILD_ROOT%{kafka_home}/licenses rm -rf %{buildroot} %changelog +* Mon Nov 27 2023 sundapeng - 2.8.2-6 +- fix NPE in subscriptionState.patch + * Fri Nov 03 2023 sundapeng - 2.8.2-5 -fix CVE-2016-3189