diff --git a/0001-adopt-huaweimaven.patch b/0001-adopt-huaweimaven.patch index ea4c7d6d502bcd90370524c8fd1e8686a0ea899e..aba2b6d361142e536418c424bc19fc64e51cdd8e 100644 --- a/0001-adopt-huaweimaven.patch +++ b/0001-adopt-huaweimaven.patch @@ -1,20 +1,16 @@ -diff -Naur kafka-2.8.1-src/build.gradle kafka-2.8.1-src-change/build.gradle ---- kafka-2.8.1-src/build.gradle 2021-09-14 21:03:12.000000000 +0800 -+++ kafka-2.8.1-src-change/build.gradle 2022-08-11 22:41:08.381225132 +0800 -@@ -19,11 +19,7 @@ +diff -uNr kafka-3.6.2-src-new/build.gradle kafka-3.6.2-src.n/build.gradle +--- kafka-3.6.2-src-new/build.gradle 2024-05-07 17:12:08.454881115 +0800 ++++ kafka-3.6.2-src.n/build.gradle 2024-05-07 17:13:33.858883040 +0800 +@@ -20,7 +20,7 @@ buildscript { repositories { - mavenCentral() -- jcenter() -- maven { -- url "https://plugins.gradle.org/m2/" -- } + maven{url 'https://repo.huaweicloud.com/repository/maven/'} } - apply from: file('gradle/buildscript.gradle'), to: buildscript apply from: "$rootDir/gradle/dependencies.gradle" -@@ -54,7 +50,7 @@ + +@@ -114,7 +114,7 @@ allprojects { repositories { @@ -22,27 +18,16 @@ diff -Naur kafka-2.8.1-src/build.gradle kafka-2.8.1-src-change/build.gradle + maven{url 'https://repo.huaweicloud.com/repository/maven/'} } - apply plugin: 'idea' -diff -Naur kafka-2.8.1-src/gradle/buildscript.gradle kafka-2.8.1-src-change/gradle/buildscript.gradle ---- kafka-2.8.1-src/gradle/buildscript.gradle 2021-09-14 21:03:12.000000000 +0800 -+++ kafka-2.8.1-src-change/gradle/buildscript.gradle 2022-08-11 23:00:43.143217999 +0800 -@@ -17,7 +17,7 @@ - repositories { - // For license plugin. - maven { -- url 'https://dl.bintray.com/content/netflixoss/external-gradle-plugins/' -+ url 'https://repo.huaweicloud.com/repository/maven/' - } - } - } -diff -Naur kafka-2.8.1-src/gradle/wrapper/gradle-wrapper.properties kafka-2.8.1-src-change/gradle/wrapper/gradle-wrapper.properties ---- kafka-2.8.1-src/gradle/wrapper/gradle-wrapper.properties 2021-09-14 21:03:12.000000000 +0800 -+++ kafka-2.8.1-src-change/gradle/wrapper/gradle-wrapper.properties 2022-08-11 22:41:33.044813236 +0800 -@@ -1,5 +1,6 @@ + dependencyUpdates { +diff -uNr kafka-3.6.2-src-new/gradle/wrapper/gradle-wrapper.properties kafka-3.6.2-src.n/gradle/wrapper/gradle-wrapper.properties +--- kafka-3.6.2-src-new/gradle/wrapper/gradle-wrapper.properties 2024-05-07 17:12:08.341881113 +0800 ++++ kafka-3.6.2-src.n/gradle/wrapper/gradle-wrapper.properties 2024-05-07 17:14:27.977884259 +0800 +@@ -1,7 +1,7 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists --distributionUrl=https\://services.gradle.org/distributions/gradle-6.8.1-all.zip -+#distributionUrl=https\://services.gradle.org/distributions/gradle-6.8.1-all.zip -+distributionUrl=https\://repo.huaweicloud.com/gradle/gradle-6.8.1-all.zip + distributionSha256Sum=7c3ad722e9b0ce8205b91560fd6ce8296ac3eadf065672242fd73c06b8eeb6ee +-distributionUrl=https\://services.gradle.org/distributions/gradle-8.2.1-all.zip ++distributionUrl=https\://repo.huaweicloud.com/gradle/gradle-8.2.1-all.zip + networkTimeout=10000 zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/0002-CVE-2022-41881.patch b/0002-CVE-2022-41881.patch deleted file mode 100644 index ca36e06f358bf5884d4ae24e6ecc187e6bae9b5b..0000000000000000000000000000000000000000 --- a/0002-CVE-2022-41881.patch +++ /dev/null @@ -1,13 +0,0 @@ -diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle -index 8dcf7af2f2..7b7974b5aa 100644 ---- a/gradle/dependencies.gradle -+++ 
b/gradle/dependencies.gradle -@@ -100,7 +100,7 @@ versions += [ - mavenArtifact: "3.8.1", - metrics: "2.2.0", - mockito: "3.6.0", -- netty: "4.1.73.Final", -+ netty: "4.1.86.Final", - owaspDepCheckPlugin: "6.0.3", - powermock: "2.0.9", - reflections: "0.9.12", diff --git a/0003-CVE-2023-34455.patch b/0003-CVE-2023-34455.patch deleted file mode 100644 index fb93b1d3601a88d3228def42b4739f310e81dd5c..0000000000000000000000000000000000000000 --- a/0003-CVE-2023-34455.patch +++ /dev/null @@ -1,33 +0,0 @@ -diff --git a/LICENSE-binary b/LICENSE-binary -index 7d885849c6..8f4b455502 100644 ---- a/LICENSE-binary -+++ b/LICENSE-binary -@@ -252,7 +252,7 @@ scala-library-2.13.5 - scala-logging_2.13-3.9.2 - scala-reflect-2.13.5 - scala-java8-compat_2.13-0.9.1 --snappy-java-1.1.8.1 -+snappy-java-1.1.10.1 - zookeeper-3.5.9 - zookeeper-jute-3.5.9 - -@@ -318,4 +318,4 @@ paranamer-2.8, see: licenses/paranamer-BSD-3-clause - Do What The F*ck You Want To Public License - see: licenses/DWTFYWTPL - --reflections-0.9.12 -\ No newline at end of file -+reflections-0.9.12 -diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle -index 8dcf7af2f2..4565ef664b 100644 ---- a/gradle/dependencies.gradle -+++ b/gradle/dependencies.gradle -@@ -113,7 +113,7 @@ versions += [ - scoveragePlugin: "5.0.0", - shadowPlugin: "6.1.0", - slf4j: "1.7.30", -- snappy: "1.1.8.1", -+ snappy: "1.1.10.1", - spotbugs: "4.1.4", - spotbugsPlugin: "4.6.0", - spotlessPlugin: "5.8.2", diff --git a/0004-CVE-2022-42004.patch b/0004-CVE-2022-42004.patch deleted file mode 100644 index d32918bb283432045ce75c12e68b4dd4d7aae1db..0000000000000000000000000000000000000000 --- a/0004-CVE-2022-42004.patch +++ /dev/null @@ -1,36 +0,0 @@ -diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala -index 2fc55bd7b6..e4611256d4 100755 ---- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala -+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala -@@ -21,7 +21,6 @@ import java.time.{Duration, Instant} - import java.util.Properties - import com.fasterxml.jackson.dataformat.csv.CsvMapper - import com.fasterxml.jackson.module.scala.DefaultScalaModule --import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper - import kafka.utils._ - import kafka.utils.Implicits._ - import org.apache.kafka.clients.admin._ -@@ -146,7 +145,7 @@ object ConsumerGroupCommand extends Logging { - } - // Example: CsvUtils().readerFor[CsvRecordWithoutGroup] - private[admin] case class CsvUtils() { -- val mapper = new CsvMapper with ScalaObjectMapper -+ val mapper = new CsvMapper - mapper.registerModule(DefaultScalaModule) - def readerFor[T <: CsvRecord : ClassTag] = { - val schema = getSchema[T] -diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle -index 8dcf7af2f2..ea0c1af419 100644 ---- a/gradle/dependencies.gradle -+++ b/gradle/dependencies.gradle -@@ -66,8 +66,8 @@ versions += [ - grgit: "4.1.1", - httpclient: "4.5.13", - easymock: "4.2", -- jackson: "2.10.5", -- jacksonDatabind: "2.10.5.1", -+ jackson: "2.13.4", -+ jacksonDatabind: "2.13.4.2", - jacoco: "0.8.5", - javassist: "3.27.0-GA", - jetty: "9.4.48.v20220622", diff --git a/0005-CVE-2016-3189.patch b/0005-CVE-2016-3189.patch deleted file mode 100644 index f02c0509ecf13b5f6a250e150b023313ca40df80..0000000000000000000000000000000000000000 --- a/0005-CVE-2016-3189.patch +++ /dev/null @@ -1,103 +0,0 @@ -diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle -index 8dcf7af2f2..5d56f19325 
100644 ---- a/gradle/dependencies.gradle -+++ b/gradle/dependencies.gradle -@@ -104,7 +104,7 @@ versions += [ - owaspDepCheckPlugin: "6.0.3", - powermock: "2.0.9", - reflections: "0.9.12", -- rocksDB: "5.18.4", -+ rocksDB: "6.19.3", - scalaCollectionCompat: "2.3.0", - scalafmt: "1.5.1", - scalaJava8Compat : "0.9.1", -diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java -index 2e2cb41100..ae08255fb6 100644 ---- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java -+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java -@@ -123,11 +123,12 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends - // (3) skipping because, not done in actual PrepareForBulkLoad() code in https://github.com/facebook/rocksdb/blob/master/options/options.cc - //columnFamilyOptions.setMemTableConfig(new VectorMemTableConfig()); - // (4-5) below: -- dbOptions.setMaxBackgroundFlushes(4); -+ /* dbOptions.setMaxBackgroundFlushes(4); - columnFamilyOptions.setDisableAutoCompactions(true); - columnFamilyOptions.setLevel0FileNumCompactionTrigger(1 << 30); - columnFamilyOptions.setLevel0SlowdownWritesTrigger(1 << 30); -- columnFamilyOptions.setLevel0StopWritesTrigger(1 << 30); -+ columnFamilyOptions.setLevel0StopWritesTrigger(1 << 30);*/ -+ super.prepareForBulkLoad(); - return this; - } - -@@ -185,7 +186,7 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends - } - - @Override -- public Options setComparator(final AbstractComparator> comparator) { -+ public Options setComparator(final AbstractComparator comparator) { - columnFamilyOptions.setComparator(comparator); - return this; - } -@@ -342,6 +343,7 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends - return this; - } - -+ @Deprecated - @Override - public int maxBackgroundCompactions() { - return dbOptions.maxBackgroundCompactions(); -@@ -358,6 +360,7 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends - return dbOptions.statistics(); - } - -+ @Deprecated - @Override - public void setBaseBackgroundCompactions(final int baseBackgroundCompactions) { - dbOptions.setBaseBackgroundCompactions(baseBackgroundCompactions); -@@ -368,6 +371,7 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends - return dbOptions.baseBackgroundCompactions(); - } - -+ @Deprecated - @Override - public Options setMaxBackgroundCompactions(final int maxBackgroundCompactions) { - dbOptions.setMaxBackgroundCompactions(maxBackgroundCompactions); -@@ -375,8 +379,9 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends - } - - @Override -- public void setMaxSubcompactions(final int maxSubcompactions) { -+ public Options setMaxSubcompactions(final int maxSubcompactions) { - dbOptions.setMaxSubcompactions(maxSubcompactions); -+ return this; - } - - @Override -@@ -384,11 +389,13 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends - return dbOptions.maxSubcompactions(); - } - -+ @Deprecated - @Override - public int maxBackgroundFlushes() { - return dbOptions.maxBackgroundFlushes(); - } - -+ @Deprecated - @Override - public Options 
setMaxBackgroundFlushes(final int maxBackgroundFlushes) { - dbOptions.setMaxBackgroundFlushes(maxBackgroundFlushes); -diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java -index e1c5df31d0..5b2b9cc7eb 100644 ---- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java -+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java -@@ -136,7 +136,7 @@ public class RocksDBStore implements KeyValueStore, BatchWritingS - tableConfig.setBlockSize(BLOCK_SIZE); - - filter = new BloomFilter(); -- tableConfig.setFilter(filter); -+ tableConfig.setFilterPolicy(filter); - - userSpecifiedOptions.optimizeFiltersForHits(); - userSpecifiedOptions.setTableFormatConfig(tableConfig); diff --git a/0006-NPE-subscriptionState.patch b/0006-NPE-subscriptionState.patch deleted file mode 100644 index f268025a37e6171ebf4fd9ef0bf4503cf350876d..0000000000000000000000000000000000000000 --- a/0006-NPE-subscriptionState.patch +++ /dev/null @@ -1,51 +0,0 @@ -diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java -index 30491110a3..ce81aa1b95 100644 ---- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java -+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java -@@ -539,10 +539,13 @@ public class SubscriptionState { - - synchronized Long partitionLag(TopicPartition tp, IsolationLevel isolationLevel) { - TopicPartitionState topicPartitionState = assignedState(tp); -- if (isolationLevel == IsolationLevel.READ_COMMITTED) -+ if (topicPartitionState.position == null) { -+ return null; -+ } else if (isolationLevel == IsolationLevel.READ_COMMITTED) { - return topicPartitionState.lastStableOffset == null ? null : topicPartitionState.lastStableOffset - topicPartitionState.position.offset; -- else -+ } else { - return topicPartitionState.highWatermark == null ? 
null : topicPartitionState.highWatermark - topicPartitionState.position.offset; -+ } - } - - synchronized Long partitionLead(TopicPartition tp) { -diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java -index d6e88008b5..d19234fe8a 100644 ---- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java -+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java -@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; - import org.apache.kafka.clients.consumer.OffsetAndMetadata; - import org.apache.kafka.clients.consumer.OffsetResetStrategy; - import org.apache.kafka.clients.consumer.internals.SubscriptionState.LogTruncation; -+import org.apache.kafka.common.IsolationLevel; - import org.apache.kafka.common.Node; - import org.apache.kafka.common.TopicPartition; - import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; -@@ -794,4 +795,18 @@ public class SubscriptionStateTest { - assertFalse(state.isOffsetResetNeeded(tp0)); - } - -+ @Test -+ public void nullPositionLagOnNoPosition() { -+ state.assignFromUser(Collections.singleton(tp0)); -+ -+ assertNull(state.partitionLag(tp0, IsolationLevel.READ_UNCOMMITTED)); -+ assertNull(state.partitionLag(tp0, IsolationLevel.READ_COMMITTED)); -+ -+ state.updateHighWatermark(tp0, 1L); -+ state.updateLastStableOffset(tp0, 1L); -+ -+ assertNull(state.partitionLag(tp0, IsolationLevel.READ_UNCOMMITTED)); -+ assertNull(state.partitionLag(tp0, IsolationLevel.READ_COMMITTED)); -+ } -+ - } diff --git a/0007-fix-payload-incorrectly.patch b/0007-fix-payload-incorrectly.patch deleted file mode 100644 index 86770bb95ba5cd5d986e6f671043707fabebcd15..0000000000000000000000000000000000000000 --- a/0007-fix-payload-incorrectly.patch +++ /dev/null @@ -1,43 +0,0 @@ -diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java -index 12a0fccea8..3c5f63df18 100644 ---- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java -+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java -@@ -112,12 +112,10 @@ public class ProducerPerformance { - - /* setup perf test */ - byte[] payload = null; -- Random random = new Random(0); - if (recordSize != null) { - payload = new byte[recordSize]; -- for (int i = 0; i < payload.length; ++i) -- payload[i] = (byte) (random.nextInt(26) + 65); - } -+ Random random = new Random(0); - ProducerRecord record; - Stats stats = new Stats(numRecords, 5000); - long startMs = System.currentTimeMillis(); -@@ -127,15 +125,20 @@ public class ProducerPerformance { - int currentTransactionSize = 0; - long transactionStartTime = 0; - for (long i = 0; i < numRecords; i++) { -+ if (payloadFilePath != null) { -+ payload = payloadByteList.get(random.nextInt(payloadByteList.size())); -+ } else if (recordSize != null) { -+ for (int j = 0; j < payload.length; ++j) -+ payload[j] = (byte) (random.nextInt(26) + 65); -+ } else { -+ throw new IllegalArgumentException("no payload File Path or record Size provided"); -+ } -+ - if (transactionsEnabled && currentTransactionSize == 0) { - producer.beginTransaction(); - transactionStartTime = System.currentTimeMillis(); - } - -- -- if (payloadFilePath != null) { -- payload = 
payloadByteList.get(random.nextInt(payloadByteList.size())); -- } - record = new ProducerRecord<>(topicName, payload); - - long sendStartMs = System.currentTimeMillis(); diff --git a/0008-Cast-SMT-allow-null.patch b/0008-Cast-SMT-allow-null.patch deleted file mode 100644 index 7d18970759d15352fcae601df21ca8263ac02033..0000000000000000000000000000000000000000 --- a/0008-Cast-SMT-allow-null.patch +++ /dev/null @@ -1,63 +0,0 @@ -diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java -index e872b336e8..7ffd0a90f3 100644 ---- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java -+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java -@@ -116,6 +116,10 @@ public abstract class Cast> implements Transformation - - @Override - public R apply(R record) { -+ if (operatingValue(record) == null) { -+ return record; -+ } -+ - if (operatingSchema(record) == null) { - return applySchemaless(record); - } else { -diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java -index ae90c1956b..d25fd8cf2a 100644 ---- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java -+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java -@@ -88,6 +88,43 @@ public class CastTest { - assertThrows(ConfigException.class, () -> xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8,int32"))); - } - -+ @Test -+ public void castNullValueRecordWithSchema() { -+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64")); -+ SourceRecord original = new SourceRecord(null, null, "topic", 0, -+ Schema.STRING_SCHEMA, "key", Schema.STRING_SCHEMA, null); -+ SourceRecord transformed = xformValue.apply(original); -+ assertEquals(original, transformed); -+ } -+ -+ @Test -+ public void castNullValueRecordSchemaless() { -+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64")); -+ SourceRecord original = new SourceRecord(null, null, "topic", 0, -+ Schema.STRING_SCHEMA, "key", null, null); -+ SourceRecord transformed = xformValue.apply(original); -+ assertEquals(original, transformed); -+ } -+ -+ @Test -+ public void castNullKeyRecordWithSchema() { -+ xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64")); -+ SourceRecord original = new SourceRecord(null, null, "topic", 0, -+ Schema.STRING_SCHEMA, null, Schema.STRING_SCHEMA, "value"); -+ SourceRecord transformed = xformKey.apply(original); -+ assertEquals(original, transformed); -+ } -+ -+ @Test -+ public void castNullKeyRecordSchemaless() { -+ xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64")); -+ SourceRecord original = new SourceRecord(null, null, "topic", 0, -+ null, null, Schema.STRING_SCHEMA, "value"); -+ SourceRecord transformed = xformKey.apply(original); -+ assertEquals(original, transformed); -+ } -+ -+ - @Test - public void castWholeRecordKeyWithSchema() { - xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8")); diff --git a/0009-format-RocksDBConfigSetter.patch b/0009-format-RocksDBConfigSetter.patch deleted file mode 100644 index 1ea90dd752f7e6427973465bcdd128c723a5bc74..0000000000000000000000000000000000000000 --- a/0009-format-RocksDBConfigSetter.patch +++ /dev/null @@ -1,67 +0,0 @@ -diff --git 
a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html -index aa817ed47e..738d3f72ec 100644 ---- a/docs/streams/developer-guide/config-streams.html -+++ b/docs/streams/developer-guide/config-streams.html -@@ -698,34 +698,35 @@ -

The RocksDB configuration. Kafka Streams uses RocksDB as the default storage engine for persistent stores. To change the default - configuration for RocksDB, you can implement RocksDBConfigSetter and provide your custom class via rocksdb.config.setter.

-

Here is an example that adjusts the memory size consumed by RocksDB.

--
    public static class CustomRocksDBConfig implements RocksDBConfigSetter {
--                    // This object should be a member variable so it can be closed in RocksDBConfigSetter#close.
--                    private org.rocksdb.Cache cache = new org.rocksdb.LRUCache(16 * 1024L * 1024L);
-+              
-+
-+public static class CustomRocksDBConfig implements RocksDBConfigSetter {
-+    // This object should be a member variable so it can be closed in RocksDBConfigSetter#close.
-+    private org.rocksdb.Cache cache = new org.rocksdb.LRUCache(16 * 1024L * 1024L);
- 
--                    @Override
--                    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
--                      // See #1 below.
--                      BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
--                      tableConfig.setBlockCache(cache);
--                      // See #2 below.
--                      tableConfig.setBlockSize(16 * 1024L);
--                      // See #3 below.
--                      tableConfig.setCacheIndexAndFilterBlocks(true);
--                      options.setTableFormatConfig(tableConfig);
--                      // See #4 below.
--                      options.setMaxWriteBufferNumber(2);
--                    }
--
--                    @Override
--                    public void close(final String storeName, final Options options) {
--                      // See #5 below.
--                      cache.close();
--                    }
--                    }
--
--                    Properties streamsSettings = new Properties();
--                    streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
--                    
-+ @Override -+ public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) { -+ // See #1 below. -+ BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig(); -+ tableConfig.setBlockCache(cache); -+ // See #2 below. -+ tableConfig.setBlockSize(16 * 1024L); -+ // See #3 below. -+ tableConfig.setCacheIndexAndFilterBlocks(true); -+ options.setTableFormatConfig(tableConfig); -+ // See #4 below. -+ options.setMaxWriteBufferNumber(2); -+ } -+ @Override -+ public void close(final String storeName, final Options options) { -+ // See #5 below. -+ cache.close(); -+ } -+} -+Properties streamsSettings = new Properties(); -+streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class); -+
-+
-
-
-
Notes for example:
diff --git a/0010-not-update-connection.patch b/0010-not-update-connection.patch deleted file mode 100644 index 926c24b389e2b3b478f3662acc0ce3d49861709e..0000000000000000000000000000000000000000 --- a/0010-not-update-connection.patch +++ /dev/null @@ -1,83 +0,0 @@ -diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java -index 1e710fdf6a..33f587f480 100644 ---- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java -+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java -@@ -576,7 +576,7 @@ public class Selector implements Selectable, AutoCloseable { - attemptRead(channel); - } - -- if (channel.hasBytesBuffered()) { -+ if (channel.hasBytesBuffered() && !explicitlyMutedChannels.contains(channel)) { - //this channel has bytes enqueued in intermediary buffers that we could not read - //(possibly because no memory). it may be the case that the underlying socket will - //not come up in the next poll() and so we need to remember this channel for the -@@ -742,6 +742,7 @@ public class Selector implements Selectable, AutoCloseable { - private void mute(KafkaChannel channel) { - channel.mute(); - explicitlyMutedChannels.add(channel); -+ keysWithBufferedRead.remove(channel.selectionKey()); - } - - @Override -@@ -754,6 +755,9 @@ public class Selector implements Selectable, AutoCloseable { - // Remove the channel from explicitlyMutedChannels only if the channel has been actually unmuted. - if (channel.maybeUnmute()) { - explicitlyMutedChannels.remove(channel); -+ if (channel.hasBytesBuffered()) { -+ keysWithBufferedRead.add(channel.selectionKey()); -+ } - } - } - -diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala -index 293614432c..fbc5563392 100644 ---- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala -+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala -@@ -1560,8 +1560,15 @@ class SocketServerTest { - val testableSelector = testableServer.testableSelector - testableSelector.updateMinWakeup(2) - -+ val sleepTimeMs = idleTimeMs / 2 + 1 - val (socket, request) = makeSocketWithBufferedRequests(testableServer, testableSelector, proxyServer) -- time.sleep(idleTimeMs + 1) -+ // advance mock time in increments to verify that muted sockets with buffered data dont have their idle time updated -+ // additional calls to poll() should not update the channel last idle time -+ for (_ <- 0 to 3) { -+ time.sleep(sleepTimeMs) -+ testableSelector.operationCounts.clear() -+ testableSelector.waitForOperations(SelectorOperation.Poll, 1) -+ } - testableServer.waitForChannelClose(request.context.connectionId, locallyClosed = false) - - val otherSocket = sslConnect(testableServer) -@@ -1574,6 +1581,30 @@ class SocketServerTest { - } - } - -+ @Test -+ def testUnmuteChannelWithBufferedReceives(): Unit = { -+ val time = new MockTime() -+ props ++= sslServerProps -+ val testableServer = new TestableSocketServer(time = time) -+ testableServer.startup() -+ val proxyServer = new ProxyServer(testableServer) -+ try { -+ val testableSelector = testableServer.testableSelector -+ val (socket, request) = makeSocketWithBufferedRequests(testableServer, testableSelector, proxyServer) -+ testableSelector.operationCounts.clear() -+ testableSelector.waitForOperations(SelectorOperation.Poll, 1) -+ val keysWithBufferedRead: util.Set[SelectionKey] = JTestUtils.fieldValue(testableSelector, 
classOf[Selector], "keysWithBufferedRead") -+ assertEquals(Set.empty, keysWithBufferedRead.asScala) -+ processRequest(testableServer.dataPlaneRequestChannel, request) -+ // buffered requests should be processed after channel is unmuted -+ receiveRequest(testableServer.dataPlaneRequestChannel) -+ socket.close() -+ } finally { -+ proxyServer.close() -+ shutdownServerAndMetrics(testableServer) -+ } -+ } -+ - /** - * Tests exception handling in [[Processor.processCompletedReceives]]. Exception is - * injected into [[Selector.mute]] which is used to mute the channel when a receive is complete. diff --git a/0011-ConfigEntry.patch b/0011-ConfigEntry.patch deleted file mode 100644 index 2b8367d7443a717d1fd6649b95bd2b453ec0b734..0000000000000000000000000000000000000000 --- a/0011-ConfigEntry.patch +++ /dev/null @@ -1,92 +0,0 @@ -diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java -index 8f058a2d6c..8f3b336911 100644 ---- a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java -+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java -@@ -171,11 +171,13 @@ public class ConfigEntry { - ConfigEntry that = (ConfigEntry) o; - - return this.name.equals(that.name) && -- this.value != null ? this.value.equals(that.value) : that.value == null && -+ Objects.equals(this.value, that.value) && - this.isSensitive == that.isSensitive && - this.isReadOnly == that.isReadOnly && -- this.source == that.source && -- Objects.equals(this.synonyms, that.synonyms); -+ Objects.equals(this.source, that.source) && -+ Objects.equals(this.synonyms, that.synonyms) && -+ Objects.equals(this.type, that.type) && -+ Objects.equals(this.documentation, that.documentation); - } - - @Override -@@ -183,11 +185,13 @@ public class ConfigEntry { - final int prime = 31; - int result = 1; - result = prime * result + name.hashCode(); -- result = prime * result + ((value == null) ? 0 : value.hashCode()); -- result = prime * result + (isSensitive ? 1 : 0); -+ result = prime * result + Objects.hashCode(value); -+ result = prime * result + (isSensitive ? 1 : 0); - result = prime * result + (isReadOnly ? 
1 : 0); -- result = prime * result + source.hashCode(); -- result = prime * result + synonyms.hashCode(); -+ result = prime * result + Objects.hashCode(source); -+ result = prime * result + Objects.hashCode(synonyms); -+ result = prime * result + Objects.hashCode(type); -+ result = prime * result + Objects.hashCode(documentation); - return result; - } - -@@ -204,6 +208,8 @@ public class ConfigEntry { - ", isSensitive=" + isSensitive + - ", isReadOnly=" + isReadOnly + - ", synonyms=" + synonyms + -+ ", type=" + type + -+ ", documentation=" + documentation + - ")"; - } - -diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java -index 4008a54382..59d1150ac3 100644 ---- a/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java -+++ b/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java -@@ -82,4 +82,19 @@ public class ConfigTest { - boolean isReadOnly, List synonyms) { - return new ConfigEntry(name, value, source, isSensitive, isReadOnly, synonyms, ConfigType.UNKNOWN, null); - } -+ -+ @Test -+ public void testHashCodeAndEqualsWithNull() { -+ ConfigEntry ce0 = new ConfigEntry("abc", null, null, false, false, null, null, null); -+ ConfigEntry ce1 = new ConfigEntry("abc", null, null, false, false, null, null, null); -+ assertEquals(ce0, ce1); -+ assertEquals(ce0.hashCode(), ce1.hashCode()); -+ } -+ -+ @Test -+ public void testEquals() { -+ ConfigEntry ce0 = new ConfigEntry("abc", null, ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, null, null, null); -+ ConfigEntry ce1 = new ConfigEntry("abc", null, ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG, false, false, null, null, null); -+ assertNotEquals(ce0, ce1); -+ } - } -diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala -index 8603fbec78..66b7fb1cc9 100644 ---- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala -+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala -@@ -803,7 +803,14 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { - new AlterConfigOp(newConfigEntry("min.insync.replicas", "2"), AlterConfigOp.OpType.SET), - new AlterConfigOp(newConfigEntry("unclean.leader.election.enable", ""), AlterConfigOp.OpType.DELETE) - ) -- assertEquals(expectedConfigOps, alterConfigOps.asScala.toSet) -+ assertEquals(expectedConfigOps.size, alterConfigOps.size) -+ expectedConfigOps.foreach { expectedOp => -+ val actual = alterConfigOps.asScala.find(_.configEntry.name == expectedOp.configEntry.name) -+ assertNotEquals(actual, None) -+ assertEquals(expectedOp.opType, actual.get.opType) -+ assertEquals(expectedOp.configEntry.name, actual.get.configEntry.name) -+ assertEquals(expectedOp.configEntry.value, actual.get.configEntry.value) -+ } - alteredConfigs = true - alterResult - } diff --git a/0012-incorrectly-LeaderElectionCommand.patch b/0012-incorrectly-LeaderElectionCommand.patch deleted file mode 100644 index ede5b5f9c4244e2da4d69deaba363b3883864851..0000000000000000000000000000000000000000 --- a/0012-incorrectly-LeaderElectionCommand.patch +++ /dev/null @@ -1,13 +0,0 @@ -diff --git a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala -index 03737aa532..92edcad003 100644 ---- a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala -+++ b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala -@@ -166,7 +166,7 @@ 
object LeaderElectionCommand extends Logging { - } - - if (noop.nonEmpty) { -- val partitions = succeeded.mkString(", ") -+ val partitions = noop.mkString(", ") - println(s"Valid replica already elected for partitions $partitions") - } - diff --git a/0013-AlterIsr.patch b/0013-AlterIsr.patch deleted file mode 100644 index aef5085631d8f968999860a0a4888f78f4a96d29..0000000000000000000000000000000000000000 --- a/0013-AlterIsr.patch +++ /dev/null @@ -1,223 +0,0 @@ -diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala -index a58f4238ff..88b337311d 100755 ---- a/core/src/main/scala/kafka/cluster/Partition.scala -+++ b/core/src/main/scala/kafka/cluster/Partition.scala -@@ -581,9 +581,6 @@ class Partition(val topicPartition: TopicPartition, - leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset) - zkVersion = partitionState.zkVersion - -- // Clear any pending AlterIsr requests and check replica state -- alterIsrManager.clearPending(topicPartition) -- - // In the case of successive leader elections in a short time period, a follower may have - // entries in its log from a later epoch than any entry in the new leader's log. In order - // to ensure that these followers can truncate to the right offset, we must cache the new -@@ -661,9 +658,6 @@ class Partition(val topicPartition: TopicPartition, - leaderEpochStartOffsetOpt = None - zkVersion = partitionState.zkVersion - -- // Since we might have been a leader previously, still clear any pending AlterIsr requests -- alterIsrManager.clearPending(topicPartition) -- - if (leaderReplicaIdOpt.contains(newLeaderBrokerId) && leaderEpoch == oldLeaderEpoch) { - false - } else { -@@ -1373,13 +1367,15 @@ class Partition(val topicPartition: TopicPartition, - isrState = proposedIsrState - - if (!alterIsrManager.submit(alterIsrItem)) { -- // If the ISR manager did not accept our update, we need to revert back to previous state -+ // If the ISR manager did not accept our update, we need to revert the proposed state. -+ // This can happen if the ISR state was updated by the controller (via LeaderAndIsr in ZK-mode or -+ // ChangePartitionRecord in KRaft mode) but we have an AlterIsr request still in-flight. 
- isrState = oldState - isrChangeListener.markFailed() -- throw new IllegalStateException(s"Failed to enqueue ISR change state $newLeaderAndIsr for partition $topicPartition") -+ warn(s"Failed to enqueue ISR change state $newLeaderAndIsr for partition $topicPartition") -+ } else { -+ debug(s"Enqueued ISR change to state $newLeaderAndIsr after transition to $proposedIsrState") - } -- -- debug(s"Enqueued ISR change to state $newLeaderAndIsr after transition to $proposedIsrState") - } - - /** -diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterIsrManager.scala -index 9ad734f708..1059a3df3e 100644 ---- a/core/src/main/scala/kafka/server/AlterIsrManager.scala -+++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala -@@ -49,8 +49,6 @@ trait AlterIsrManager { - def shutdown(): Unit = {} - - def submit(alterIsrItem: AlterIsrItem): Boolean -- -- def clearPending(topicPartition: TopicPartition): Unit - } - - case class AlterIsrItem(topicPartition: TopicPartition, -@@ -134,9 +132,6 @@ class DefaultAlterIsrManager( - enqueued - } - -- override def clearPending(topicPartition: TopicPartition): Unit = { -- unsentIsrUpdates.remove(topicPartition) -- } - - private[server] def maybePropagateIsrChanges(): Unit = { - // Send all pending items if there is not already a request in-flight. -diff --git a/core/src/main/scala/kafka/server/ZkIsrManager.scala b/core/src/main/scala/kafka/server/ZkIsrManager.scala -index 2d88aac6b4..8dffcdf307 100644 ---- a/core/src/main/scala/kafka/server/ZkIsrManager.scala -+++ b/core/src/main/scala/kafka/server/ZkIsrManager.scala -@@ -55,12 +55,6 @@ class ZkIsrManager(scheduler: Scheduler, time: Time, zkClient: KafkaZkClient) ex - period = isrChangeNotificationConfig.checkIntervalMs, unit = TimeUnit.MILLISECONDS) - } - -- override def clearPending(topicPartition: TopicPartition): Unit = { -- // Since we always immediately process ZK updates and never actually enqueue anything, there is nothing to -- // clear here so this is a no-op. Even if there are changes that have not been propagated, the write to ZK -- // has already happened, so we may as well send the notification to the controller. 
-- } -- - override def submit(alterIsrItem: AlterIsrItem): Boolean = { - debug(s"Writing new ISR ${alterIsrItem.leaderAndIsr.isr} to ZooKeeper with version " + - s"${alterIsrItem.leaderAndIsr.zkVersion} for partition ${alterIsrItem.topicPartition}") -diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala -index 5eedb63ae5..4dbd735753 100644 ---- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala -+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala -@@ -18,10 +18,10 @@ - package kafka.controller - - import java.util.Properties --import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue} -- -+import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingQueue, TimeUnit} - import com.yammer.metrics.core.Timer - import kafka.api.{ApiVersion, KAFKA_2_6_IV0, KAFKA_2_7_IV0, LeaderAndIsr} -+import kafka.controller.KafkaController.AlterIsrCallback - import kafka.metrics.KafkaYammerMetrics - import kafka.server.{KafkaConfig, KafkaServer} - import kafka.utils.{LogCaptureAppender, TestUtils} -@@ -849,6 +849,67 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { - latch.await() - } - -+ @Test -+ def testAlterIsrErrors(): Unit = { -+ servers = makeServers(1) -+ val controllerId = TestUtils.waitUntilControllerElected(zkClient) -+ val tp = new TopicPartition("t", 0) -+ val assignment = Map(tp.partition -> Seq(controllerId)) -+ TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers) -+ val controller = getController().kafkaController -+ var future = captureAlterIsrError(controllerId, controller.brokerEpoch - 1, -+ Map(tp -> LeaderAndIsr(controllerId, List(controllerId)))) -+ var capturedError = future.get(5, TimeUnit.SECONDS) -+ assertEquals(Errors.STALE_BROKER_EPOCH, capturedError) -+ -+ future = captureAlterIsrError(99, controller.brokerEpoch, -+ Map(tp -> LeaderAndIsr(controllerId, List(controllerId)))) -+ capturedError = future.get(5, TimeUnit.SECONDS) -+ assertEquals(Errors.STALE_BROKER_EPOCH, capturedError) -+ -+ val unknownTopicPartition = new TopicPartition("unknown", 99) -+ future = captureAlterIsrPartitionError(controllerId, controller.brokerEpoch, -+ Map(unknownTopicPartition -> LeaderAndIsr(controllerId, List(controllerId))), unknownTopicPartition) -+ capturedError = future.get(5, TimeUnit.SECONDS) -+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, capturedError) -+ -+ future = captureAlterIsrPartitionError(controllerId, controller.brokerEpoch, -+ Map(tp -> LeaderAndIsr(controllerId, 1, List(controllerId), 99)), tp) -+ capturedError = future.get(5, TimeUnit.SECONDS) -+ assertEquals(Errors.INVALID_UPDATE_VERSION, capturedError) -+ } -+ -+ def captureAlterIsrError(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr]): CompletableFuture[Errors] = { -+ val future = new CompletableFuture[Errors]() -+ val controller = getController().kafkaController -+ val callback: AlterIsrCallback = { -+ case Left(_: Map[TopicPartition, Either[Errors, LeaderAndIsr]]) => -+ future.completeExceptionally(new AssertionError(s"Should have seen top-level error")) -+ case Right(error: Errors) => -+ future.complete(error) -+ } -+ controller.eventManager.put(AlterIsrReceived(brokerId, brokerEpoch, isrsToAlter, callback)) -+ future -+ } -+ -+ def captureAlterIsrPartitionError(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr], 
tp: TopicPartition): CompletableFuture[Errors] = { -+ val future = new CompletableFuture[Errors]() -+ val controller = getController().kafkaController -+ val callback: AlterIsrCallback = { -+ case Left(partitionResults: Map[TopicPartition, Either[Errors, LeaderAndIsr]]) => -+ partitionResults.get(tp) match { -+ case Some(Left(error: Errors)) => future.complete(error) -+ case Some(Right(_: LeaderAndIsr)) => future.completeExceptionally(new AssertionError(s"Should have seen an error for $tp in result")) -+ case None => future.completeExceptionally(new AssertionError(s"Should have seen $tp in result")) -+ } -+ case Right(_: Errors) => -+ future.completeExceptionally(new AssertionError(s"Should not seen top-level error")) -+ } -+ controller.eventManager.put(AlterIsrReceived(brokerId, brokerEpoch, isrsToAlter, callback)) -+ future -+ } -+ -+ - @Test - def testTopicIdsAreAdded(): Unit = { - servers = makeServers(1) -diff --git a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala -index 1074fd3157..1c8c81471f 100644 ---- a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala -+++ b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala -@@ -70,8 +70,10 @@ class AlterIsrManagerTest { - @Test - def testOverwriteWithinBatch(): Unit = { - val capture = EasyMock.newCapture[AbstractRequest.Builder[AlterIsrRequest]]() -+ val callbackCapture = EasyMock.newCapture[ControllerRequestCompletionHandler]() -+ - EasyMock.expect(brokerToController.start()) -- EasyMock.expect(brokerToController.sendRequest(EasyMock.capture(capture), EasyMock.anyObject())).once() -+ EasyMock.expect(brokerToController.sendRequest(EasyMock.capture(capture), EasyMock.capture(callbackCapture))).times(2) - EasyMock.replay(brokerToController) - - val scheduler = new MockScheduler(time) -@@ -81,11 +83,21 @@ class AlterIsrManagerTest { - // Only send one ISR update for a given topic+partition - assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0))) - assertFalse(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2), 10), _ => {}, 0))) -+ -+ // Simulate response -+ val alterIsrResp = partitionResponse(tp0, Errors.NONE) -+ val resp = new ClientResponse(null, null, "", 0L, 0L, -+ false, null, null, alterIsrResp) -+ callbackCapture.getValue.onComplete(resp) -+ -+ // Now we can submit this partition again -+ assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1), 10), _ => {}, 0))) - EasyMock.verify(brokerToController) - -+ // Make sure we sent the right request ISR={1} - val request = capture.getValue.build() - assertEquals(request.data().topics().size(), 1) -- assertEquals(request.data().topics().get(0).partitions().get(0).newIsr().size(), 3) -+ assertEquals(request.data().topics().get(0).partitions().get(0).newIsr().size(), 1) - } - - @Test -diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala -index 43df2b97f4..8e52007bc7 100755 ---- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala -+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala -@@ -1106,10 +1106,6 @@ object TestUtils extends Logging { - } - } - -- override def clearPending(topicPartition: TopicPartition): Unit = { -- inFlight.set(false); -- } -- - def completeIsrUpdate(newZkVersion: Int): Unit = { - if (inFlight.compareAndSet(true, false)) { - val item = isrUpdates.head diff --git 
a/0014-override-toString.patch b/0014-override-toString.patch deleted file mode 100644 index 2a6005dbb405b3bcffa72bbea2464e2af1a1f0f3..0000000000000000000000000000000000000000 --- a/0014-override-toString.patch +++ /dev/null @@ -1,16 +0,0 @@ -diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java -index 58a9ce3d5a..daa5a0bf70 100644 ---- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java -+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java -@@ -521,6 +521,11 @@ public class WorkerConfig extends AbstractConfig { - } - } - } -+ -+ @Override -+ public String toString() { -+ return "List of comma-separated URIs, ex: http://localhost:8080,https://localhost:8443."; -+ } - } - - private static class ResponseHttpHeadersValidator implements ConfigDef.Validator { diff --git a/0015-SessionWindows-closed-early.patch b/0015-SessionWindows-closed-early.patch deleted file mode 100644 index dfd92e5922fbca0194444c22274841b31f729714..0000000000000000000000000000000000000000 --- a/0015-SessionWindows-closed-early.patch +++ /dev/null @@ -1,103 +0,0 @@ -diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java -index 5648e8f0a3..24e9e21ad7 100644 ---- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java -+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java -@@ -121,7 +121,7 @@ public class KStreamSessionWindowAggregate implements KStreamAggProce - - final long timestamp = context().timestamp(); - observedStreamTime = Math.max(observedStreamTime, timestamp); -- final long closeTime = observedStreamTime - windows.gracePeriodMs(); -+ final long closeTime = observedStreamTime - windows.gracePeriodMs() - windows.inactivityGap(); - - final List, Agg>> merged = new ArrayList<>(); - final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp); -diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java -index ab2adbfbb1..244ea9f4fe 100644 ---- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java -+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java -@@ -441,9 +441,9 @@ public class KStreamSessionWindowAggregateProcessorTest { - context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null)); - processor.process("OnTime1", "1"); - -- // dummy record to advance stream time = 1 -- context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null)); -- processor.process("dummy", "dummy"); -+ // dummy record to advance stream time = 11, 10 for gap time plus 1 to place outside window -+ context.setRecordContext(new ProcessorRecordContext(11, -2, -3, "topic", new RecordHeaders())); -+ processor.process("dummy", "dummy"); - - try (final LogCaptureAppender appender = - LogCaptureAppender.createAndRegister(KStreamSessionWindowAggregate.class)) { -@@ -455,7 +455,7 @@ public class KStreamSessionWindowAggregateProcessorTest { - assertThat( - 
appender.getMessages(), - hasItem("Skipping record for expired window." + -- " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[1]") -+ " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[11]") - ); - } - -@@ -542,17 +542,17 @@ public class KStreamSessionWindowAggregateProcessorTest { - context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null)); - processor.process("OnTime1", "1"); - -- // dummy record to advance stream time = 1 -- context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null)); -- processor.process("dummy", "dummy"); -+ // dummy record to advance stream time = 11, 10 for gap time plus 1 to place at edge of window -+ context.setRecordContext(new ProcessorRecordContext(11, -2, -3, "topic", new RecordHeaders())); -+ processor.process("dummy", "dummy"); - - // delayed record arrives on time, should not be skipped - context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null)); - processor.process("OnTime2", "1"); - -- // dummy record to advance stream time = 2 -- context.setRecordContext(new ProcessorRecordContext(2, -2, -3, "topic", null)); -- processor.process("dummy", "dummy"); -+ // dummy record to advance stream time = 12, 10 for gap time plus 2 to place outside window -+ context.setRecordContext(new ProcessorRecordContext(12, -2, -3, "topic", new RecordHeaders())); -+ processor.process("dummy", "dummy"); - - // delayed record arrives late - context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null)); -@@ -561,7 +561,7 @@ public class KStreamSessionWindowAggregateProcessorTest { - assertThat( - appender.getMessages(), - hasItem("Skipping record for expired window." + -- " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[2]") -+ " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[12]") - ); - } - -diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java -index 46a8ab8dcf..e0b7957e01 100644 ---- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java -+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java -@@ -581,7 +581,7 @@ public class SuppressScenarioTest { - // arbitrarily disordered records are admitted, because the *window* is not closed until stream-time > window-end + grace - inputTopic.pipeInput("k1", "v1", 1L); - // any record in the same partition advances stream time (note the key is different) -- inputTopic.pipeInput("k2", "v1", 6L); -+ inputTopic.pipeInput("k2", "v1", 11L); - // late event for first window - this should get dropped from all streams, since the first window is now closed. - inputTopic.pipeInput("k1", "v1", 5L); - // just pushing stream time forward to flush the other events through. 
-@@ -594,7 +594,7 @@ public class SuppressScenarioTest { - new KeyValueTimestamp<>("[k1@0/5]", 2L, 5L), - new KeyValueTimestamp<>("[k1@0/5]", null, 5L), - new KeyValueTimestamp<>("[k1@0/5]", 3L, 5L), -- new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L), -+ new KeyValueTimestamp<>("[k2@11/11]", 1L, 11L), - new KeyValueTimestamp<>("[k1@30/30]", 1L, 30L) - ) - ); -@@ -602,7 +602,7 @@ public class SuppressScenarioTest { - drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), - asList( - new KeyValueTimestamp<>("[k1@0/5]", 3L, 5L), -- new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L) -+ new KeyValueTimestamp<>("[k2@11/11]", 1L, 11L) - ) - ); - } diff --git a/0016-non-existent-URL.patch b/0016-non-existent-URL.patch deleted file mode 100644 index 01f3cbdc5185fe66562d1ee08bc798b6ebf18da0..0000000000000000000000000000000000000000 --- a/0016-non-existent-URL.patch +++ /dev/null @@ -1,42 +0,0 @@ -diff --git a/build.gradle b/build.gradle -index 5e8b82237a..0a181019cb 100644 ---- a/build.gradle -+++ b/build.gradle -@@ -2296,4 +2296,37 @@ task aggregatedJavadoc(type: Javadoc, dependsOn: compileJava) { - options.links "https://docs.oracle.com/en/java/javase/${JavaVersion.current().majorVersion}/docs/api/" - else - options.links "https://docs.oracle.com/javase/8/docs/api/" -+ // TODO: remove this snippet once JDK >11 is used or https://bugs.openjdk.java.net/browse/JDK-8215291 is backported to JDK11 -+ // Patch to include `getURLPrefix` from JDK 12 + -+ // NOTICE: This code was copied from original ORACLE search.js file present in JDK 12 and newer -+ final SEARCH_PATCH_MODULE_LESS_AWARE = "\n\n" + -+ "// Fix for moudle-less aware search\n" + -+ "function getURLPrefix(ui) {\n" + -+ " var urlPrefix=\"\";\n" + -+ " var slash = \"/\";\n" + -+ " if (ui.item.category === catModules) {\n" + -+ " return ui.item.l + slash;\n" + -+ " } else if (ui.item.category === catPackages && ui.item.m) {\n" + -+ " return ui.item.m + slash;\n" + -+ " } else if (ui.item.category === catTypes || ui.item.category === catMembers) {\n" + -+ " if (ui.item.m) {\n" + -+ " urlPrefix = ui.item.m + slash;\n" + -+ " } else {\n" + -+ " \$.each(packageSearchIndex, function(index, item) {\n" + -+ " if (item.m && ui.item.p === item.l) {\n" + -+ " urlPrefix = item.m + slash;\n" + -+ " }\n" + -+ " });\n" + -+ " }\n" + -+ " }\n" + -+ " return urlPrefix;\n" + -+ "}" -+ -+ // When all the JavaDoc is generated we proceed to patch the search.js file -+ doLast { -+ def searchJsFile = new File(destinationDir.getAbsolutePath() + '/search.js') -+ // Append the patch to the file. By being defined at a later position, JS will execute that definition instead of -+ // the one provided by default (higher up in the file). 
-+ searchJsFile.append SEARCH_PATCH_MODULE_LESS_AWARE -+ } - } diff --git a/0017-fix-log-clean.patch b/0017-fix-log-clean.patch deleted file mode 100644 index 8d52ef8106aeedcde9356b6c025a2e21251b7eaa..0000000000000000000000000000000000000000 --- a/0017-fix-log-clean.patch +++ /dev/null @@ -1,74 +0,0 @@ -diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala -index 7a8a13c6e7..177b460d38 100644 ---- a/core/src/main/scala/kafka/log/LogCleaner.scala -+++ b/core/src/main/scala/kafka/log/LogCleaner.scala -@@ -840,7 +840,10 @@ private[log] class Cleaner(val id: Int, - logSize + segs.head.size <= maxSize && - indexSize + segs.head.offsetIndex.sizeInBytes <= maxIndexSize && - timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize && -- lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue) { -+ //if first segment size is 0, we don't need to do the index offset range check. -+ //this will avoid empty log left every 2^31 message. -+ (segs.head.size == 0 || -+ lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue)) { - group = segs.head :: group - logSize += segs.head.size - indexSize += segs.head.offsetIndex.sizeInBytes -diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala -index 43bc3b9f28..e5984c4f31 100755 ---- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala -+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala -@@ -1258,6 +1258,53 @@ class LogCleanerTest { - "All but the last group should be the target size.") - } - -+ @Test -+ def testSegmentGroupingWithSparseOffsetsAndEmptySegments(): Unit ={ -+ val cleaner = makeCleaner(Int.MaxValue) -+ val logProps = new Properties() -+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) -+ -+ val k="key".getBytes() -+ val v="val".getBytes() -+ -+ //create 3 segments -+ for(i <- 0 until 3){ -+ log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0) -+ //0 to Int.MaxValue is Int.MaxValue+1 message, -1 will be the last message of i-th segment -+ val records = messageWithOffset(k, v, (i + 1L) * (Int.MaxValue + 1L) -1 ) -+ log.appendAsFollower(records) -+ assertEquals(i + 1, log.numberOfSegments) -+ } -+ -+ //4th active segment, not clean -+ log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0) -+ -+ val totalSegments = 4 -+ //last segment not cleanable -+ val firstUncleanableOffset = log.logEndOffset - 1 -+ val notCleanableSegments = 1 -+ -+ assertEquals(totalSegments, log.numberOfSegments) -+ var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset) -+ //because index file uses 4 byte relative index offset and current segments all none empty, -+ //segments will not group even their size is very small. 
-+ assertEquals(totalSegments - notCleanableSegments, groups.size)
-+ //do clean to clean first 2 segments to empty
-+ cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset))
-+ assertEquals(totalSegments, log.numberOfSegments)
-+ assertEquals(0, log.logSegments.head.size)
-+
-+ //after clean we got 2 empty segment, they will group together this time
-+ groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset)
-+ val noneEmptySegment = 1
-+ assertEquals(noneEmptySegment + 1, groups.size)
-+
-+ //trigger a clean and 2 empty segments should cleaned to 1
-+ cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset))
-+ assertEquals(totalSegments - 1, log.numberOfSegments)
-+ }
-+
-+
- /**
- * Validate the logic for grouping log segments together for cleaning when only a small number of
- * messages are retained, but the range of offsets is greater than Int.MaxValue. A group should not
diff --git a/gradle-6.8.1-all.zip b/gradle-8.2.1-all.zip
similarity index 75%
rename from gradle-6.8.1-all.zip
rename to gradle-8.2.1-all.zip
index 4b31ee74ae948ecc4633d46109a69b5f164b93d8..396a58fd996cd771d5c2592903f77e41070d295e 100644
Binary files a/gradle-6.8.1-all.zip and b/gradle-8.2.1-all.zip differ
diff --git a/kafka-2.8.2-src.tgz b/kafka-3.6.2-src.tgz
similarity index 49%
rename from kafka-2.8.2-src.tgz
rename to kafka-3.6.2-src.tgz
index a850e74faea425a754bbd005138b0a646db5f822..0910e029a901aff40e01f6d934ae32083279dddf 100644
Binary files a/kafka-2.8.2-src.tgz and b/kafka-3.6.2-src.tgz differ
diff --git a/kafka.spec b/kafka.spec
index 1bf6c73d7e4b45ad052ae38880de261219b3b655..f016da4ac781a5554507eaafb297c107d97a16ac 100644
--- a/kafka.spec
+++ b/kafka.spec
@@ -3,32 +3,16 @@
 %define kafka_home /opt/kafka
 Name: kafka
-Version: 2.8.2
-Release: 17
+Version: 3.6.2
+Release: 1
 Summary: A Distributed Streaming Platform.
 License: Apache-2.0
 Source0: https://archive.apache.org/dist/%{name}/%{version}/%{name}-%{version}-src.tgz
-Source1: https://mirrors.huaweicloud.com/gradle/gradle-6.8.1-all.zip
+Source1: https://mirrors.huaweicloud.com/gradle/gradle-8.2.1-all.zip
 Source2: kafka.service
 Source3: gradle-wrapper.jar
 Patch0: 0001-adopt-huaweimaven.patch
-Patch1: 0002-CVE-2022-41881.patch
-Patch2: 0003-CVE-2023-34455.patch
-Patch3: 0004-CVE-2022-42004.patch
-Patch4: 0005-CVE-2016-3189.patch
-Patch5: 0006-NPE-subscriptionState.patch
-Patch6: 0007-fix-payload-incorrectly.patch
-Patch7: 0008-Cast-SMT-allow-null.patch
-Patch8: 0009-format-RocksDBConfigSetter.patch
-Patch9: 0010-not-update-connection.patch
-Patch10: 0011-ConfigEntry.patch
-Patch11: 0012-incorrectly-LeaderElectionCommand.patch
-Patch12: 0013-AlterIsr.patch
-Patch13: 0014-override-toString.patch
-Patch14: 0015-SessionWindows-closed-early.patch
-Patch15: 0016-non-existent-URL.patch
-Patch16: 0017-fix-log-clean.patch
 BuildRequires: systemd java-1.8.0-openjdk-devel
 Provides: kafka = %{version}
@@ -46,12 +30,12 @@
 exit 0
 %prep
 %autosetup -p1 -n %{name}-%{version}-src
-cp -r $RPM_SOURCE_DIR/gradle-6.8.1-all.zip %{_builddir}/kafka-%{version}-src/gradle/wrapper/
+cp -r $RPM_SOURCE_DIR/gradle-8.2.1-all.zip %{_builddir}/kafka-%{version}-src/gradle/wrapper/
 cp -r $RPM_SOURCE_DIR/gradle-wrapper.jar %{_builddir}/kafka-%{version}-src/gradle/wrapper/
 %build
-unzip -q $RPM_SOURCE_DIR/gradle-6.8.1-all.zip
-./gradle-6.8.1/bin/gradle --info
+unzip -q $RPM_SOURCE_DIR/gradle-8.2.1-all.zip
+./gradle-8.2.1/bin/gradle --info
 ./gradlew jar releaseTarGz
@@ -80,6 +64,9 @@
 cp -pr licenses/* $RPM_BUILD_ROOT%{kafka_home}/licenses
 rm -rf %{buildroot}
 %changelog
+* Wed May 08 2024 dongjiao - 3.6.2-1
+- update to 3.6.2
+
 * Fri Dec 08 2023 sundapeng - 2.8.2-17
 - log clean relative index range check of group consider empty log segment to avoid too many empty log segment left
 * Fri Dec 08 2023 sundapeng - 2.8.2-16