From 94d20ec7e97b8388f86ddc631ba8a7d07d250250 Mon Sep 17 00:00:00 2001 From: sundapeng Date: Fri, 8 Dec 2023 01:53:08 +0000 Subject: [PATCH] Cast SMT should allow null value records to pass through --- 0008-Cast-SMT-allow-null.patch | 63 ++++++++++++++++++++++++++++++++++ kafka.spec | 5 ++- 2 files changed, 67 insertions(+), 1 deletion(-) create mode 100644 0008-Cast-SMT-allow-null.patch diff --git a/0008-Cast-SMT-allow-null.patch b/0008-Cast-SMT-allow-null.patch new file mode 100644 index 0000000..7d18970 --- /dev/null +++ b/0008-Cast-SMT-allow-null.patch @@ -0,0 +1,63 @@ +diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java +index e872b336e8..7ffd0a90f3 100644 +--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java ++++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java +@@ -116,6 +116,10 @@ public abstract class Cast> implements Transformation + + @Override + public R apply(R record) { ++ if (operatingValue(record) == null) { ++ return record; ++ } ++ + if (operatingSchema(record) == null) { + return applySchemaless(record); + } else { +diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java +index ae90c1956b..d25fd8cf2a 100644 +--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java ++++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java +@@ -88,6 +88,43 @@ public class CastTest { + assertThrows(ConfigException.class, () -> xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8,int32"))); + } + ++ @Test ++ public void castNullValueRecordWithSchema() { ++ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64")); ++ SourceRecord original = new SourceRecord(null, null, "topic", 0, ++ Schema.STRING_SCHEMA, "key", Schema.STRING_SCHEMA, null); ++ SourceRecord transformed = xformValue.apply(original); ++ assertEquals(original, transformed); ++ } ++ ++ @Test ++ public void castNullValueRecordSchemaless() { ++ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64")); ++ SourceRecord original = new SourceRecord(null, null, "topic", 0, ++ Schema.STRING_SCHEMA, "key", null, null); ++ SourceRecord transformed = xformValue.apply(original); ++ assertEquals(original, transformed); ++ } ++ ++ @Test ++ public void castNullKeyRecordWithSchema() { ++ xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64")); ++ SourceRecord original = new SourceRecord(null, null, "topic", 0, ++ Schema.STRING_SCHEMA, null, Schema.STRING_SCHEMA, "value"); ++ SourceRecord transformed = xformKey.apply(original); ++ assertEquals(original, transformed); ++ } ++ ++ @Test ++ public void castNullKeyRecordSchemaless() { ++ xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64")); ++ SourceRecord original = new SourceRecord(null, null, "topic", 0, ++ null, null, Schema.STRING_SCHEMA, "value"); ++ SourceRecord transformed = xformKey.apply(original); ++ assertEquals(original, transformed); ++ } ++ ++ + @Test + public void castWholeRecordKeyWithSchema() { + xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8")); diff --git a/kafka.spec b/kafka.spec index 3727cd8..a9a0bcd 100644 --- a/kafka.spec +++ b/kafka.spec @@ -4,7 +4,7 @@ Name: kafka Version: 2.8.2 -Release: 7 +Release: 8 Summary: A Distributed Streaming Platform. License: Apache-2.0 @@ -19,6 +19,7 @@ Patch3: 0004-CVE-2022-42004.patch Patch4: 0005-CVE-2016-3189.patch Patch5: 0006-NPE-subscriptionState.patch Patch6: 0007-fix-payload-incorrectly.patch +Patch7: 0008-Cast-SMT-allow-null.patch BuildRequires: systemd java-1.8.0-openjdk-devel Provides: kafka = %{version} @@ -70,6 +71,8 @@ cp -pr licenses/* $RPM_BUILD_ROOT%{kafka_home}/licenses rm -rf %{buildroot} %changelog +* Fri Dec 08 2023 sundapeng - 2.8.2-8 +- Cast SMT should allow null value records to pass through * Fri Dec 08 2023 sundapeng - 2.8.2-7 - Fix using random payload in ProducerPerformance incorrectly * Mon Nov 27 2023 sundapeng - 2.8.2-6 -- Gitee