diff --git a/0018-return-when-AbstractDispatcherSingleActiveConsumer-closed.patch b/0018-return-when-AbstractDispatcherSingleActiveConsumer-closed.patch new file mode 100644 index 0000000000000000000000000000000000000000..957df93b56a83eda7f7f3d8299f03604ffefb7f4 --- /dev/null +++ b/0018-return-when-AbstractDispatcherSingleActiveConsumer-closed.patch @@ -0,0 +1,167 @@ +diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java +index 8cab06be11..17a6d1dbfb 100644 +--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java ++++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java +@@ -140,6 +140,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas + if (IS_CLOSED_UPDATER.get(this) == TRUE) { + log.warn("[{}] Dispatcher is already closed. Closing consumer {}", this.topicName, consumer); + consumer.disconnect(); ++ return; + } + + if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) { +diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java +index 9694584025..1d74d00776 100644 +--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java ++++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java +@@ -47,7 +47,7 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher + protected final Subscription subscription; + + private CompletableFuture closeFuture = null; +- private final String name; ++ protected final String name; + protected final Rate msgDrop; + protected static final AtomicIntegerFieldUpdater + TOTAL_AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater +diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java +index e5e5349651..da7fe56bde 100644 +--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java ++++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java +@@ -39,6 +39,8 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; + import org.apache.pulsar.common.api.proto.KeySharedMeta; + import org.apache.pulsar.common.api.proto.KeySharedMode; + import org.apache.pulsar.common.protocol.Commands; ++import org.slf4j.Logger; ++import org.slf4j.LoggerFactory; + + public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersistentDispatcherMultipleConsumers { + +@@ -84,6 +86,11 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis + + @Override + public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException { ++ if (IS_CLOSED_UPDATER.get(this) == TRUE) { ++ log.warn("[{}] Dispatcher is already closed. Closing consumer {}", name, consumer); ++ consumer.disconnect(); ++ return; ++ } + super.addConsumer(consumer); + try { + selector.addConsumer(consumer); +@@ -168,4 +175,6 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis + public boolean hasSameKeySharedPolicy(KeySharedMeta ksm) { + return (ksm.getKeySharedMode() == this.keySharedMode); + } ++ ++ private static final Logger log = LoggerFactory.getLogger(NonPersistentStickyKeyDispatcherMultipleConsumers.class); + } +diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +index 90db639fde..e5b6f68bdf 100644 +--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java ++++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +@@ -18,6 +18,7 @@ + */ + package org.apache.pulsar.broker.service.persistent; + ++import com.google.common.annotations.VisibleForTesting; + import io.netty.util.concurrent.FastThreadLocal; + import java.util.ArrayList; + import java.util.Collections; +@@ -99,8 +100,18 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi + } + } + ++ @VisibleForTesting ++ public StickyKeyConsumerSelector getSelector() { ++ return selector; ++ } ++ + @Override + public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException { ++ if (IS_CLOSED_UPDATER.get(this) == TRUE) { ++ log.warn("[{}] Dispatcher is already closed. Closing consumer {}", name, consumer); ++ consumer.disconnect(); ++ return; ++ } + super.addConsumer(consumer); + try { + selector.addConsumer(consumer); +diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +index 31e6f5579b..2b58ddfa88 100644 +--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java ++++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +@@ -293,6 +293,7 @@ public class PersistentDispatcherFailoverConsumerTest { + assertEquals(isActive, change.isIsActive()); + } + ++ + @Test + public void testConsumerGroupChangesWithOldNewConsumers() throws Exception { + PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); +diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java +index f319b7ce4a..b21f80ce1f 100644 +--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java ++++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java +@@ -51,6 +51,7 @@ import org.apache.pulsar.broker.service.EntryBatchSizes; + import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector; + import org.apache.pulsar.broker.service.RedeliveryTracker; + import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; ++import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; + import org.apache.pulsar.common.api.proto.MessageMetadata; + import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies; + import org.apache.pulsar.common.protocol.Commands; +@@ -67,6 +68,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumersTest { + private ServiceConfiguration configMock; + + private NonPersistentStickyKeyDispatcherMultipleConsumers nonpersistentDispatcher; ++ private StickyKeyConsumerSelector selector; + + final String topicName = "non-persistent://public/default/testTopic"; + +@@ -93,6 +95,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumersTest { + doReturn(topicPolicies).when(topicMock).getHierarchyTopicPolicies(); + + subscriptionMock = mock(NonPersistentSubscription.class); ++ selector = new HashRangeAutoSplitStickyKeyConsumerSelector(); + + try (MockedStatic rateLimiterMockedStatic = mockStatic(DispatchRateLimiter.class);) { + rateLimiterMockedStatic.when(() -> DispatchRateLimiter.isDispatchRateNeeded( +@@ -102,8 +105,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumersTest { + any(DispatchRateLimiter.Type.class))) + .thenReturn(false); + nonpersistentDispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers( +- topicMock, subscriptionMock, +- new HashRangeAutoSplitStickyKeyConsumerSelector()); ++ topicMock, subscriptionMock, selector); + } + } + +diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java +index 99a66f44ac..587ef122ec 100644 +--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java ++++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java +@@ -37,6 +37,7 @@ import static org.mockito.Mockito.times; + import static org.mockito.Mockito.verify; + import static org.mockito.Mockito.when; + import static org.testng.Assert.assertEquals; ++import static org.testng.Assert.assertTrue; + import static org.testng.Assert.fail; + import io.netty.buffer.ByteBuf; + import io.netty.buffer.Unpooled; +@@ -156,6 +157,16 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest { + } + } + ++ @Test(timeOut = 10000) ++ public void testAddConsumerWhenClosed() throws Exception { ++ persistentDispatcher.close().get(); ++ Consumer consumer = mock(Consumer.class); ++ persistentDispatcher.addConsumer(consumer); ++ verify(consumer, times(1)).disconnect(); ++ assertEquals(0, persistentDispatcher.getConsumers().size()); ++ assertTrue(persistentDispatcher.getSelector().getConsumerKeyHashRanges().isEmpty()); ++ } ++ + @Test + public void testSendMarkerMessage() { + try { diff --git a/pulsar.spec b/pulsar.spec index d8ae4cd95b84974babcf2c09bfde697af864f144..e1f93aeec402768df2c6a5e05b857a504a90d7e3 100644 --- a/pulsar.spec +++ b/pulsar.spec @@ -1,6 +1,6 @@ %define debug_package %{nil} %define pulsar_ver 2.10.4 -%define pkg_ver 17 +%define pkg_ver 18 %define _prefix /opt/pulsar Summary: Cloud-Native, Distributed Messaging and Streaming Name: pulsar @@ -27,6 +27,7 @@ Patch0014: 0014-CVE-2023-32732.patch Patch0015: 0015-fix-no-messages.patch Patch0016: 0016-handle-exception.patch Patch0017: 0017-return-earliest-position.patch +Patch0018: 0018-return-when-AbstractDispatcherSingleActiveConsumer-closed.patch BuildRoot: /root/rpmbuild/BUILDROOT/ BuildRequires: java-1.8.0-openjdk-devel,maven,systemd Requires: java-1.8.0-openjdk,systemd @@ -56,6 +57,7 @@ Pulsar is a distributed pub-sub messaging platform with a very flexible messagin %patch0015 -p1 %patch0016 -p1 %patch0017 -p1 +%patch0018 -p1 %build mvn clean install -Pcore-modules,-main -DskipTests @@ -81,6 +83,8 @@ getent passwd pulsar >/dev/null || useradd -r -g pulsar -d / -s /sbin/nologin pu exit 0 %changelog +* Fri Dec 8 2023 Dapeng Sun - 2.10.4-18 +- Return if AbstractDispatcherSingleActiveConsumer closed * Fri Dec 8 2023 Dapeng Sun - 2.10.4-17 - Fix return the earliest position when query position by timestamp. * Fri Dec 8 2023 Dapeng Sun - 2.10.4-16