From f86cd7db4359a36c4c4db317ab512eabfb1b2dca Mon Sep 17 00:00:00 2001 From: sundapeng Date: Tue, 28 Nov 2023 05:33:35 +0000 Subject: [PATCH] Fix NPE in SubscriptionState --- 0006-NPE-subscriptionState.patch | 51 ++++++++++++++++++++++++++++++++ kafka.spec | 6 +++- 2 files changed, 56 insertions(+), 1 deletion(-) create mode 100644 0006-NPE-subscriptionState.patch diff --git a/0006-NPE-subscriptionState.patch b/0006-NPE-subscriptionState.patch new file mode 100644 index 0000000..f268025 --- /dev/null +++ b/0006-NPE-subscriptionState.patch @@ -0,0 +1,51 @@ +diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +index 30491110a3..ce81aa1b95 100644 +--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java ++++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +@@ -539,10 +539,13 @@ public class SubscriptionState { + + synchronized Long partitionLag(TopicPartition tp, IsolationLevel isolationLevel) { + TopicPartitionState topicPartitionState = assignedState(tp); +- if (isolationLevel == IsolationLevel.READ_COMMITTED) ++ if (topicPartitionState.position == null) { ++ return null; ++ } else if (isolationLevel == IsolationLevel.READ_COMMITTED) { + return topicPartitionState.lastStableOffset == null ? null : topicPartitionState.lastStableOffset - topicPartitionState.position.offset; +- else ++ } else { + return topicPartitionState.highWatermark == null ? null : topicPartitionState.highWatermark - topicPartitionState.position.offset; ++ } + } + + synchronized Long partitionLead(TopicPartition tp) { +diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +index d6e88008b5..d19234fe8a 100644 +--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java ++++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; + import org.apache.kafka.clients.consumer.OffsetAndMetadata; + import org.apache.kafka.clients.consumer.OffsetResetStrategy; + import org.apache.kafka.clients.consumer.internals.SubscriptionState.LogTruncation; ++import org.apache.kafka.common.IsolationLevel; + import org.apache.kafka.common.Node; + import org.apache.kafka.common.TopicPartition; + import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +@@ -794,4 +795,18 @@ public class SubscriptionStateTest { + assertFalse(state.isOffsetResetNeeded(tp0)); + } + ++ @Test ++ public void nullPositionLagOnNoPosition() { ++ state.assignFromUser(Collections.singleton(tp0)); ++ ++ assertNull(state.partitionLag(tp0, IsolationLevel.READ_UNCOMMITTED)); ++ assertNull(state.partitionLag(tp0, IsolationLevel.READ_COMMITTED)); ++ ++ state.updateHighWatermark(tp0, 1L); ++ state.updateLastStableOffset(tp0, 1L); ++ ++ assertNull(state.partitionLag(tp0, IsolationLevel.READ_UNCOMMITTED)); ++ assertNull(state.partitionLag(tp0, IsolationLevel.READ_COMMITTED)); ++ } ++ + } diff --git a/kafka.spec b/kafka.spec index 821e462..465871d 100644 --- a/kafka.spec +++ b/kafka.spec @@ -4,7 +4,7 @@ Name: kafka Version: 2.8.2 -Release: 5 +Release: 6 Summary: A Distributed Streaming Platform. License: Apache-2.0 @@ -17,6 +17,7 @@ Patch1: 0002-CVE-2022-41881.patch Patch2: 0003-CVE-2023-34455.patch Patch3: 0004-CVE-2022-42004.patch Patch4: 0005-CVE-2016-3189.patch +Patch5: 0006-NPE-subscriptionState.patch BuildRequires: systemd java-1.8.0-openjdk-devel Provides: kafka = %{version} @@ -68,6 +69,9 @@ cp -pr licenses/* $RPM_BUILD_ROOT%{kafka_home}/licenses rm -rf %{buildroot} %changelog +* Mon Nov 27 2023 sundapeng - 2.8.2-6 +- fix NPE in subscriptionState.patch + * Fri Nov 03 2023 sundapeng - 2.8.2-5 -fix CVE-2016-3189 -- Gitee