From 20910bdc7db685b5a716551f8f84072b1f1729dd Mon Sep 17 00:00:00 2001 From: sundapeng Date: Fri, 8 Dec 2023 02:35:40 +0000 Subject: [PATCH] Don't update connection idle time for muted connections --- 0010-not-update-connection.patch | 83 ++++++++++++++++++++++++++++++++ kafka.spec | 5 +- 2 files changed, 87 insertions(+), 1 deletion(-) create mode 100644 0010-not-update-connection.patch diff --git a/0010-not-update-connection.patch b/0010-not-update-connection.patch new file mode 100644 index 0000000..926c24b --- /dev/null +++ b/0010-not-update-connection.patch @@ -0,0 +1,83 @@ +diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java +index 1e710fdf6a..33f587f480 100644 +--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java ++++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java +@@ -576,7 +576,7 @@ public class Selector implements Selectable, AutoCloseable { + attemptRead(channel); + } + +- if (channel.hasBytesBuffered()) { ++ if (channel.hasBytesBuffered() && !explicitlyMutedChannels.contains(channel)) { + //this channel has bytes enqueued in intermediary buffers that we could not read + //(possibly because no memory). it may be the case that the underlying socket will + //not come up in the next poll() and so we need to remember this channel for the +@@ -742,6 +742,7 @@ public class Selector implements Selectable, AutoCloseable { + private void mute(KafkaChannel channel) { + channel.mute(); + explicitlyMutedChannels.add(channel); ++ keysWithBufferedRead.remove(channel.selectionKey()); + } + + @Override +@@ -754,6 +755,9 @@ public class Selector implements Selectable, AutoCloseable { + // Remove the channel from explicitlyMutedChannels only if the channel has been actually unmuted. + if (channel.maybeUnmute()) { + explicitlyMutedChannels.remove(channel); ++ if (channel.hasBytesBuffered()) { ++ keysWithBufferedRead.add(channel.selectionKey()); ++ } + } + } + +diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +index 293614432c..fbc5563392 100644 +--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ++++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +@@ -1560,8 +1560,15 @@ class SocketServerTest { + val testableSelector = testableServer.testableSelector + testableSelector.updateMinWakeup(2) + ++ val sleepTimeMs = idleTimeMs / 2 + 1 + val (socket, request) = makeSocketWithBufferedRequests(testableServer, testableSelector, proxyServer) +- time.sleep(idleTimeMs + 1) ++ // advance mock time in increments to verify that muted sockets with buffered data dont have their idle time updated ++ // additional calls to poll() should not update the channel last idle time ++ for (_ <- 0 to 3) { ++ time.sleep(sleepTimeMs) ++ testableSelector.operationCounts.clear() ++ testableSelector.waitForOperations(SelectorOperation.Poll, 1) ++ } + testableServer.waitForChannelClose(request.context.connectionId, locallyClosed = false) + + val otherSocket = sslConnect(testableServer) +@@ -1574,6 +1581,30 @@ class SocketServerTest { + } + } + ++ @Test ++ def testUnmuteChannelWithBufferedReceives(): Unit = { ++ val time = new MockTime() ++ props ++= sslServerProps ++ val testableServer = new TestableSocketServer(time = time) ++ testableServer.startup() ++ val proxyServer = new ProxyServer(testableServer) ++ try { ++ val testableSelector = testableServer.testableSelector ++ val (socket, request) = makeSocketWithBufferedRequests(testableServer, testableSelector, proxyServer) ++ testableSelector.operationCounts.clear() ++ testableSelector.waitForOperations(SelectorOperation.Poll, 1) ++ val keysWithBufferedRead: util.Set[SelectionKey] = JTestUtils.fieldValue(testableSelector, classOf[Selector], "keysWithBufferedRead") ++ assertEquals(Set.empty, keysWithBufferedRead.asScala) ++ processRequest(testableServer.dataPlaneRequestChannel, request) ++ // buffered requests should be processed after channel is unmuted ++ receiveRequest(testableServer.dataPlaneRequestChannel) ++ socket.close() ++ } finally { ++ proxyServer.close() ++ shutdownServerAndMetrics(testableServer) ++ } ++ } ++ + /** + * Tests exception handling in [[Processor.processCompletedReceives]]. Exception is + * injected into [[Selector.mute]] which is used to mute the channel when a receive is complete. diff --git a/kafka.spec b/kafka.spec index 2c039e8..a779f28 100644 --- a/kafka.spec +++ b/kafka.spec @@ -4,7 +4,7 @@ Name: kafka Version: 2.8.2 -Release: 9 +Release: 10 Summary: A Distributed Streaming Platform. License: Apache-2.0 @@ -21,6 +21,7 @@ Patch5: 0006-NPE-subscriptionState.patch Patch6: 0007-fix-payload-incorrectly.patch Patch7: 0008-Cast-SMT-allow-null.patch Patch8: 0009-format-RocksDBConfigSetter.patch +Patch9: 0010-not-update-connection.patch BuildRequires: systemd java-1.8.0-openjdk-devel Provides: kafka = %{version} @@ -72,6 +73,8 @@ cp -pr licenses/* $RPM_BUILD_ROOT%{kafka_home}/licenses rm -rf %{buildroot} %changelog +* Fri Dec 08 2023 sundapeng - 2.8.2-10 +- Don't update connection idle time for muted connections * Fri Dec 08 2023 sundapeng - 2.8.2-9 - Fix the formatting of example RocksDBConfigSetter * Fri Dec 08 2023 sundapeng - 2.8.2-8 -- Gitee