From 433e5a86316a9f37273bfd109e6b2a12e4fa998a Mon Sep 17 00:00:00 2001
From: sundapeng
Date: Fri, 8 Dec 2023 07:38:22 +0000
Subject: [PATCH] log clean relative index range check of group consider empty
 log segment to avoid too many empty log segment left

---
 0017-fix-log-clean.patch | 74 ++++++++++++++++++++++++++++++++++++++++
 kafka.spec               |  5 ++-
 2 files changed, 78 insertions(+), 1 deletion(-)
 create mode 100644 0017-fix-log-clean.patch

diff --git a/0017-fix-log-clean.patch b/0017-fix-log-clean.patch
new file mode 100644
index 0000000..8d52ef8
--- /dev/null
+++ b/0017-fix-log-clean.patch
@@ -0,0 +1,74 @@
+diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
+index 7a8a13c6e7..177b460d38 100644
+--- a/core/src/main/scala/kafka/log/LogCleaner.scala
++++ b/core/src/main/scala/kafka/log/LogCleaner.scala
+@@ -840,7 +840,10 @@ private[log] class Cleaner(val id: Int,
+           logSize + segs.head.size <= maxSize &&
+           indexSize + segs.head.offsetIndex.sizeInBytes <= maxIndexSize &&
+           timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize &&
+-          lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue) {
++          //if first segment size is 0, we don't need to do the index offset range check.
++          //this will avoid empty log left every 2^31 message.
++          (segs.head.size == 0 ||
++            lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue)) {
+         group = segs.head :: group
+         logSize += segs.head.size
+         indexSize += segs.head.offsetIndex.sizeInBytes
+diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+index 43bc3b9f28..e5984c4f31 100755
+--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
++++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+@@ -1258,6 +1258,53 @@ class LogCleanerTest {
+       "All but the last group should be the target size.")
+   }
+ 
++  @Test
++  def testSegmentGroupingWithSparseOffsetsAndEmptySegments(): Unit ={
++    val cleaner = makeCleaner(Int.MaxValue)
++    val logProps = new Properties()
++    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
++
++    val k="key".getBytes()
++    val v="val".getBytes()
++
++    //create 3 segments
++    for(i <- 0 until 3){
++      log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0)
++      //0 to Int.MaxValue is Int.MaxValue+1 message, -1 will be the last message of i-th segment
++      val records = messageWithOffset(k, v, (i + 1L) * (Int.MaxValue + 1L) -1 )
++      log.appendAsFollower(records)
++      assertEquals(i + 1, log.numberOfSegments)
++    }
++
++    //4th active segment, not clean
++    log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0)
++
++    val totalSegments = 4
++    //last segment not cleanable
++    val firstUncleanableOffset = log.logEndOffset - 1
++    val notCleanableSegments = 1
++
++    assertEquals(totalSegments, log.numberOfSegments)
++    var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset)
++    //because index file uses 4 byte relative index offset and current segments all none empty,
++    //segments will not group even their size is very small.
++    assertEquals(totalSegments - notCleanableSegments, groups.size)
++    //do clean to clean first 2 segments to empty
++    cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset))
++    assertEquals(totalSegments, log.numberOfSegments)
++    assertEquals(0, log.logSegments.head.size)
++
++    //after clean we got 2 empty segment, they will group together this time
++    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset)
++    val noneEmptySegment = 1
++    assertEquals(noneEmptySegment + 1, groups.size)
++
++    //trigger a clean and 2 empty segments should cleaned to 1
++    cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset))
++    assertEquals(totalSegments - 1, log.numberOfSegments)
++  }
++
++
+   /**
+    * Validate the logic for grouping log segments together for cleaning when only a small number of
+    * messages are retained, but the range of offsets is greater than Int.MaxValue. A group should not
diff --git a/kafka.spec b/kafka.spec
index 5ba4b7d..1bf6c73 100644
--- a/kafka.spec
+++ b/kafka.spec
@@ -4,7 +4,7 @@
 
 Name: kafka
 Version: 2.8.2
-Release: 16
+Release: 17
 Summary: A Distributed Streaming Platform.
 License: Apache-2.0
 
@@ -28,6 +28,7 @@ Patch12: 0013-AlterIsr.patch
 Patch13: 0014-override-toString.patch
 Patch14: 0015-SessionWindows-closed-early.patch
 Patch15: 0016-non-existent-URL.patch
+Patch16: 0017-fix-log-clean.patch
 
 BuildRequires: systemd java-1.8.0-openjdk-devel
 Provides: kafka = %{version}
@@ -79,6 +80,8 @@ cp -pr licenses/* $RPM_BUILD_ROOT%{kafka_home}/licenses
 rm -rf %{buildroot}
 
 %changelog
+* Fri Dec 08 2023 sundapeng - 2.8.2-17
+- log clean relative index range check of group consider empty log segment to avoid too many empty log segment left
 * Fri Dec 08 2023 sundapeng - 2.8.2-16
 - Javadocs search sends you to a non-existent URL
 * Fri Dec 08 2023 sundapeng - 2.8.2-15
-- 
Gitee