From 653ba2efcdb6b695cd445947dac17d0b9a299288 Mon Sep 17 00:00:00 2001 From: wudi Date: Mon, 15 Jun 2026 15:08:59 +0800 Subject: [PATCH 1/6] [fix](cdc) Cap debezium ChangeEventQueue with a heap-adaptive byte limit to avoid OOM --- .../doris/cdcclient/utils/ConfigUtil.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java index a999f532ea9929..1c40a4d84ba472 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java @@ -37,6 +37,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.mysql.cj.conf.ConnectionUrl; +import io.debezium.config.CommonConnectorConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,9 +124,25 @@ public static ZoneId getPostgresServerTimeZoneFromProps(java.util.Properties pro return ZoneId.systemDefault(); } + public static final String MAX_QUEUE_BYTES_SYS_PROP = "cdc.max.queue.size.in.bytes"; + + // Heap-adaptive byte cap for the debezium ChangeEventQueue buffer. + // heap 1G->64MB, 2G->128MB, >=4G->256MB. -D overrides (<=0 disables). + private static long resolveMaxQueueSizeInBytes() { + Long override = Long.getLong(MAX_QUEUE_BYTES_SYS_PROP); + if (override != null) { + return override < 0 ? 0 : override; + } + long target = Runtime.getRuntime().maxMemory() / 16; + return Math.max(64L * 1024 * 1024, Math.min(target, 256L * 1024 * 1024)); + } + /** Optimized debezium parameters */ public static Properties getDefaultDebeziumProps() { Properties properties = new Properties(); + properties.put( + CommonConnectorConfig.MAX_QUEUE_SIZE_IN_BYTES.name(), + String.valueOf(resolveMaxQueueSizeInBytes())); return properties; } From f5d3ece1bed75d6385cfb90696185798260d4a4b Mon Sep 17 00:00:00 2001 From: wudi Date: Mon, 15 Jun 2026 15:13:43 +0800 Subject: [PATCH 2/6] [test](cdc) Cover queue byte cap override and clamp in ConfigUtilTest --- .../doris/cdcclient/utils/ConfigUtilTest.java | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java index 9fd6a61cdce576..bb8d83426b26c4 100644 --- a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java +++ b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java @@ -19,11 +19,13 @@ import org.apache.doris.job.cdc.DataSourceConfigKeys; +import io.debezium.config.CommonConnectorConfig; import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange; import org.junit.jupiter.api.Test; import java.util.HashMap; import java.util.Map; +import java.util.Properties; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -121,4 +123,48 @@ void tableListEmptyWhenNeitherSet() { String[] result = ConfigUtil.getTableList("public", config); assertEquals(0, result.length); } + + // ─── getDefaultDebeziumProps: queue byte cap ────────────────────────────── + + private static long queueBytes(Properties props) { + return Long.parseLong( + props.getProperty(CommonConnectorConfig.MAX_QUEUE_SIZE_IN_BYTES.name())); + } + + @Test + void defaultQueueBytesWithinClamp() { + long bytes = queueBytes(ConfigUtil.getDefaultDebeziumProps()); + assertTrue(bytes >= 64L * 1024 * 1024 && bytes <= 256L * 1024 * 1024, + "expected clamp to [64MB, 256MB] but got " + bytes); + } + + @Test + void sysPropOverridesAdaptiveValue() { + String prev = System.getProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP); + try { + System.setProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP, "1048576"); + assertEquals(1048576L, queueBytes(ConfigUtil.getDefaultDebeziumProps())); + } finally { + restore(prev); + } + } + + @Test + void negativeSysPropDisablesByteBound() { + String prev = System.getProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP); + try { + System.setProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP, "-1"); + assertEquals(0L, queueBytes(ConfigUtil.getDefaultDebeziumProps())); + } finally { + restore(prev); + } + } + + private static void restore(String prev) { + if (prev == null) { + System.clearProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP); + } else { + System.setProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP, prev); + } + } } From c651e46eb21b553d26fed4705fb8a31ff6113215 Mon Sep 17 00:00:00 2001 From: wudi Date: Mon, 15 Jun 2026 15:42:51 +0800 Subject: [PATCH 3/6] [fix](cdc) Parse queue byte override explicitly and warn on malformed value --- .../apache/doris/cdcclient/utils/ConfigUtil.java | 16 +++++++++++++--- .../doris/cdcclient/utils/ConfigUtilTest.java | 13 +++++++++++++ 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java index 1c40a4d84ba472..cc2fa9da68bbc5 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java @@ -127,11 +127,21 @@ public static ZoneId getPostgresServerTimeZoneFromProps(java.util.Properties pro public static final String MAX_QUEUE_BYTES_SYS_PROP = "cdc.max.queue.size.in.bytes"; // Heap-adaptive byte cap for the debezium ChangeEventQueue buffer. - // heap 1G->64MB, 2G->128MB, >=4G->256MB. -D overrides (<=0 disables). + // heap 1G->64MB, 2G->128MB, >=4G->256MB. -D overrides + // (<=0 disables); a malformed override is logged and ignored, falling back to the cap. private static long resolveMaxQueueSizeInBytes() { - Long override = Long.getLong(MAX_QUEUE_BYTES_SYS_PROP); + String override = System.getProperty(MAX_QUEUE_BYTES_SYS_PROP); if (override != null) { - return override < 0 ? 0 : override; + try { + long bytes = Long.parseLong(override.trim()); + return bytes <= 0 ? 0 : bytes; + } catch (NumberFormatException e) { + LOG.warn( + "Ignoring invalid -D{}={}, expected an integer byte count; " + + "falling back to the adaptive cap", + MAX_QUEUE_BYTES_SYS_PROP, + override); + } } long target = Runtime.getRuntime().maxMemory() / 16; return Math.max(64L * 1024 * 1024, Math.min(target, 256L * 1024 * 1024)); diff --git a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java index bb8d83426b26c4..ab30b099745952 100644 --- a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java +++ b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java @@ -160,6 +160,19 @@ void negativeSysPropDisablesByteBound() { } } + @Test + void malformedSysPropFallsBackToClamp() { + String prev = System.getProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP); + try { + System.setProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP, "32MB"); + long bytes = queueBytes(ConfigUtil.getDefaultDebeziumProps()); + assertTrue(bytes >= 64L * 1024 * 1024 && bytes <= 256L * 1024 * 1024, + "malformed override should fall back to [64MB, 256MB] but got " + bytes); + } finally { + restore(prev); + } + } + private static void restore(String prev) { if (prev == null) { System.clearProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP); From d6cb8b636ae07f8b2366da80008eb29cb56cd4f4 Mon Sep 17 00:00:00 2001 From: wudi Date: Tue, 16 Jun 2026 15:05:17 +0800 Subject: [PATCH 4/6] [fix](cdc) Add JDK17 java.base opens for cdc_client to support debezium queue byte cap --- be/src/runtime/cdc_client_mgr.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/be/src/runtime/cdc_client_mgr.cpp b/be/src/runtime/cdc_client_mgr.cpp index b37cadc980c920..6b90a67c77090d 100644 --- a/be/src/runtime/cdc_client_mgr.cpp +++ b/be/src/runtime/cdc_client_mgr.cpp @@ -214,6 +214,10 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) { argv_storage.emplace_back(java_opts); // OOM safety net (last-wins, user opts cannot disable). argv_storage.emplace_back("-XX:+ExitOnOutOfMemoryError"); + // JDK17 opens for debezium ObjectSizeCalculator reflection. + argv_storage.emplace_back("--add-opens=java.base/java.lang=ALL-UNNAMED"); + argv_storage.emplace_back("--add-opens=java.base/java.util=ALL-UNNAMED"); + argv_storage.emplace_back("--add-opens=java.base/java.math=ALL-UNNAMED"); argv_storage.emplace_back("-jar"); argv_storage.emplace_back(cdc_jar_path); argv_storage.emplace_back(cdc_jar_port); From fd45eacd809cc3b6ac565f332b92fc2eaa40ad44 Mon Sep 17 00:00:00 2001 From: wudi Date: Tue, 16 Jun 2026 15:58:04 +0800 Subject: [PATCH 5/6] [fix](cdc) Add java.nio open for cdc_client to cover ByteBuffer columns in ObjectSizeCalculator --- be/src/runtime/cdc_client_mgr.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/be/src/runtime/cdc_client_mgr.cpp b/be/src/runtime/cdc_client_mgr.cpp index 6b90a67c77090d..b60c2c60bb1cbe 100644 --- a/be/src/runtime/cdc_client_mgr.cpp +++ b/be/src/runtime/cdc_client_mgr.cpp @@ -218,6 +218,7 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) { argv_storage.emplace_back("--add-opens=java.base/java.lang=ALL-UNNAMED"); argv_storage.emplace_back("--add-opens=java.base/java.util=ALL-UNNAMED"); argv_storage.emplace_back("--add-opens=java.base/java.math=ALL-UNNAMED"); + argv_storage.emplace_back("--add-opens=java.base/java.nio=ALL-UNNAMED"); argv_storage.emplace_back("-jar"); argv_storage.emplace_back(cdc_jar_path); argv_storage.emplace_back(cdc_jar_port); From 07913707a7bb244035c8f7c2c669ed5d19de28d3 Mon Sep 17 00:00:00 2001 From: wudi Date: Tue, 16 Jun 2026 18:11:36 +0800 Subject: [PATCH 6/6] [fix](cdc) Add JDK17 opens to cdc_client surefire/failsafe so byte-sizing ITCases pass --- fs_brokers/cdc_client/pom.xml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/fs_brokers/cdc_client/pom.xml b/fs_brokers/cdc_client/pom.xml index a88bddad6830a6..0a0c7355318f7e 100644 --- a/fs_brokers/cdc_client/pom.xml +++ b/fs_brokers/cdc_client/pom.xml @@ -76,6 +76,11 @@ under the License. 3.27.7 4.2.1 3.2.5 + 3.2.5 + + --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.math=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED @@ -258,6 +263,14 @@ under the License. 17 + + org.apache.maven.plugins + maven-surefire-plugin + ${maven-surefire-plugin.version} + + ${test.add.opens} + + org.apache.maven.plugins maven-failsafe-plugin @@ -268,6 +281,7 @@ under the License. 1 true + ${test.add.opens}