From 5a01e21ba385278e928dab351dde66fffc133611 Mon Sep 17 00:00:00 2001
From: sundapeng
Date: Fri, 8 Dec 2023 03:04:22 +0000
Subject: [PATCH] FetchSessionCache may cause starvation for partitions when
 FetchResponse is full

When a FetchResponse is full, IncrementalFetchContext still moves every
partition selected for the response to the end of the session's partition
map, even though such a partition may have contributed no records because
the response had already reached its size limit. Those partitions keep
getting deprioritized and can be starved. Only move a partition to the end
of the session's partition map when it actually returned records.

---
 0011-fix-FetchSessionCache-error.log | 140 +++++++++++++++++++++++++++
 kafka.spec                           |   5 +-
 2 files changed, 144 insertions(+), 1 deletion(-)
 create mode 100644 0011-fix-FetchSessionCache-error.log

diff --git a/0011-fix-FetchSessionCache-error.log b/0011-fix-FetchSessionCache-error.log
new file mode 100644
index 0000000..6f1de31
--- /dev/null
+++ b/0011-fix-FetchSessionCache-error.log
@@ -0,0 +1,140 @@
+diff --git a/core/src/main/scala/kafka/server/FetchSession.scala b/core/src/main/scala/kafka/server/FetchSession.scala
+index c6280f384d..5e1a342bc7 100644
+--- a/core/src/main/scala/kafka/server/FetchSession.scala
++++ b/core/src/main/scala/kafka/server/FetchSession.scala
+@@ -428,7 +428,7 @@ class IncrementalFetchContext(private val time: Time,
+         val mustRespond = cachedPart.maybeUpdateResponseData(respData, updateFetchContextAndRemoveUnselected)
+         if (mustRespond) {
+           nextElement = element
+-          if (updateFetchContextAndRemoveUnselected) {
++          if (updateFetchContextAndRemoveUnselected && FetchResponse.recordsSize(respData) > 0) {
+             session.partitionMap.remove(cachedPart)
+             session.partitionMap.mustAdd(cachedPart)
+           }
+diff --git a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
+index e00395a981..e6f8ce214a 100755
+--- a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
++++ b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
+@@ -22,12 +22,16 @@ import kafka.utils.MockTime
+ import org.apache.kafka.common.TopicPartition
+ import org.apache.kafka.common.message.FetchResponseData
+ import org.apache.kafka.common.protocol.Errors
++import org.apache.kafka.common.record.CompressionType
++import org.apache.kafka.common.record.MemoryRecords
++import org.apache.kafka.common.record.SimpleRecord
+ import org.apache.kafka.common.record.Records
+ import org.apache.kafka.common.requests.FetchMetadata.{FINAL_EPOCH, INVALID_SESSION_ID}
+ import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, FetchMetadata => JFetchMetadata}
+ import org.apache.kafka.common.utils.Utils
+ import org.junit.jupiter.api.Assertions._
+ import org.junit.jupiter.api.{Test, Timeout}
++import scala.collection.mutable.ArrayBuffer
+ 
+ @Timeout(120)
+ class FetchSessionTest {
+@@ -256,7 +260,7 @@ class FetchSessionTest {
+ 
+     // Verify that SESSIONLESS requests get a SessionlessFetchContext
+     val context = fetchManager.newContext(JFetchMetadata.LEGACY,
+-      new util.HashMap[TopicPartition, FetchRequest.PartitionData](), EMPTY_PART_LIST, true)
++        new util.HashMap[TopicPartition, FetchRequest.PartitionData](), EMPTY_PART_LIST, true)
+     assertEquals(classOf[SessionlessFetchContext], context.getClass)
+ 
+     // Create a new fetch session with a FULL fetch request
+@@ -695,4 +699,96 @@ class FetchSessionTest {
+     assertEquals(resp1.sessionId, resp4.sessionId)
+     assertEquals(Utils.mkSet(tp1, tp2), resp4.responseData.keySet)
+   }
++
++  @Test
++  def testDeprioritizesPartitionsWithRecordsOnly(): Unit = {
++    val time = new MockTime()
++    val cache = new FetchSessionCache(10, 1000)
++    val fetchManager = new FetchManager(time, cache)
++    val tp1 = new TopicPartition("foo", 1)
++    val tp2 = new TopicPartition("bar", 2)
++    val tp3 = new TopicPartition("zar", 3)
++
++    val reqData = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
++    reqData.put(tp1, new FetchRequest.PartitionData(100, 0, 1000, Optional.of(5), Optional.of(4)))
++    reqData.put(tp2, new FetchRequest.PartitionData(100, 0, 1000, Optional.of(5), Optional.of(4)))
++    reqData.put(tp3, new FetchRequest.PartitionData(100, 0, 1000, Optional.of(5), Optional.of(4)))
++
++    // Full fetch context returns all partitions in the response
++    val context1 = fetchManager.newContext(JFetchMetadata.INITIAL, reqData, EMPTY_PART_LIST, isFollower = false)
++    assertEquals(classOf[FullFetchContext], context1.getClass)
++
++    val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
++    respData1.put(tp1, new FetchResponseData.PartitionData()
++      .setPartitionIndex(tp1.partition)
++      .setHighWatermark(50)
++      .setLastStableOffset(50)
++      .setLogStartOffset(0))
++    respData1.put(tp2, new FetchResponseData.PartitionData()
++      .setPartitionIndex(tp2.partition)
++      .setHighWatermark(50)
++      .setLastStableOffset(50)
++      .setLogStartOffset(0))
++    respData1.put(tp3, new FetchResponseData.PartitionData()
++      .setPartitionIndex(tp3.partition)
++      .setHighWatermark(50)
++      .setLastStableOffset(50)
++      .setLogStartOffset(0))
++
++    val resp1 = context1.updateAndGenerateResponseData(respData1)
++    assertEquals(Errors.NONE, resp1.error)
++    assertNotEquals(INVALID_SESSION_ID, resp1.sessionId)
++    assertEquals(Utils.mkSet(tp1, tp2, tp3), resp1.responseData.keySet())
++
++    // Incremental fetch context returns partitions with changes but only deprioritizes
++    // the partitions with records
++    val context2 = fetchManager.newContext(new JFetchMetadata(resp1.sessionId, 1), reqData, EMPTY_PART_LIST, isFollower = false)
++    assertEquals(classOf[IncrementalFetchContext], context2.getClass)
++
++    // Partitions are ordered in the session as per last response
++    assertPartitionsOrder(context2, Seq(tp1, tp2, tp3))
++
++    // Response is empty
++    val respData2 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
++    val resp2 = context2.updateAndGenerateResponseData(respData2)
++    assertEquals(Errors.NONE, resp2.error)
++    assertEquals(resp1.sessionId, resp2.sessionId)
++    assertEquals(Collections.emptySet(), resp2.responseData.keySet)
++
++    // All partitions with changes should be returned.
++    val respData3 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
++    respData3.put(tp1, new FetchResponseData.PartitionData()
++      .setPartitionIndex(tp1.partition)
++      .setHighWatermark(60)
++      .setLastStableOffset(50)
++      .setLogStartOffset(0))
++    respData3.put(tp2, new FetchResponseData.PartitionData()
++      .setPartitionIndex(tp2.partition)
++      .setHighWatermark(60)
++      .setLastStableOffset(50)
++      .setLogStartOffset(0)
++      .setRecords(MemoryRecords.withRecords(CompressionType.NONE,
++        new SimpleRecord(100, null))))
++    respData3.put(tp3, new FetchResponseData.PartitionData()
++      .setPartitionIndex(tp3.partition)
++      .setHighWatermark(50)
++      .setLastStableOffset(50)
++      .setLogStartOffset(0))
++    val resp3 = context2.updateAndGenerateResponseData(respData3)
++    assertEquals(Errors.NONE, resp3.error)
++    assertEquals(resp1.sessionId, resp3.sessionId)
++    assertEquals(Utils.mkSet(tp1, tp2), resp3.responseData.keySet)
++
++    // Only the partitions that returned records in the last response
++    // were deprioritized
++    assertPartitionsOrder(context2, Seq(tp1, tp3, tp2))
++  }
++
++  private def assertPartitionsOrder(context: FetchContext, partitions: Seq[TopicPartition]): Unit = {
++    val partitionsInContext = ArrayBuffer.empty[TopicPartition]
++    context.foreachPartition { (tp, _) =>
++      partitionsInContext += tp
++    }
++    assertEquals(partitions, partitionsInContext.toSeq)
++  }
+ }
+
+
diff --git a/kafka.spec b/kafka.spec
index a779f28..e186964 100644
--- a/kafka.spec
+++ b/kafka.spec
@@ -4,7 +4,7 @@
 
 Name: kafka
 Version: 2.8.2
-Release: 10
+Release: 11
 Summary: A Distributed Streaming Platform.
 License: Apache-2.0
 
@@ -22,6 +22,7 @@
 Patch6: 0007-fix-payload-incorrectly.patch
 Patch7: 0008-Cast-SMT-allow-null.patch
 Patch8: 0009-format-RocksDBConfigSetter.patch
 Patch9: 0010-not-update-connection.patch
+Patch10: 0011-fix-FetchSessionCache-error.log
 BuildRequires: systemd java-1.8.0-openjdk-devel
 Provides: kafka = %{version}
@@ -73,6 +74,8 @@
 cp -pr licenses/* $RPM_BUILD_ROOT%{kafka_home}/licenses
 rm -rf %{buildroot}
 %changelog
+* Fri Dec 08 2023 sundapeng - 2.8.2-11
+- FetchSessionCache may cause starvation for partitions when FetchResponse is full
 * Fri Dec 08 2023 sundapeng - 2.8.2-10
 - Don't update connection idle time for muted connections
 * Fri Dec 08 2023 sundapeng - 2.8.2-9
-- 
Gitee