diff --git a/be/src/runtime/cdc_client_mgr.cpp b/be/src/runtime/cdc_client_mgr.cpp
index b37cadc980c920..b60c2c60bb1cbe 100644
--- a/be/src/runtime/cdc_client_mgr.cpp
+++ b/be/src/runtime/cdc_client_mgr.cpp
@@ -214,6 +214,11 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
argv_storage.emplace_back(java_opts);
// OOM safety net (last-wins, user opts cannot disable).
argv_storage.emplace_back("-XX:+ExitOnOutOfMemoryError");
+ // JDK17 opens for debezium ObjectSizeCalculator reflection.
+ argv_storage.emplace_back("--add-opens=java.base/java.lang=ALL-UNNAMED");
+ argv_storage.emplace_back("--add-opens=java.base/java.util=ALL-UNNAMED");
+ argv_storage.emplace_back("--add-opens=java.base/java.math=ALL-UNNAMED");
+ argv_storage.emplace_back("--add-opens=java.base/java.nio=ALL-UNNAMED");
argv_storage.emplace_back("-jar");
argv_storage.emplace_back(cdc_jar_path);
argv_storage.emplace_back(cdc_jar_port);
diff --git a/fs_brokers/cdc_client/pom.xml b/fs_brokers/cdc_client/pom.xml
index a88bddad6830a6..0a0c7355318f7e 100644
--- a/fs_brokers/cdc_client/pom.xml
+++ b/fs_brokers/cdc_client/pom.xml
@@ -76,6 +76,11 @@ under the License.
3.27.7
4.2.1
3.2.5
+ 3.2.5
+
+ --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.math=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED
@@ -258,6 +263,14 @@ under the License.
17
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ ${maven-surefire-plugin.version}
+
+ ${test.add.opens}
+
+
org.apache.maven.plugins
maven-failsafe-plugin
@@ -268,6 +281,7 @@ under the License.
1
true
+ ${test.add.opens}
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
index a999f532ea9929..cc2fa9da68bbc5 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
@@ -37,6 +37,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.mysql.cj.conf.ConnectionUrl;
+import io.debezium.config.CommonConnectorConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -123,9 +124,35 @@ public static ZoneId getPostgresServerTimeZoneFromProps(java.util.Properties pro
return ZoneId.systemDefault();
}
+ public static final String MAX_QUEUE_BYTES_SYS_PROP = "cdc.max.queue.size.in.bytes";
+
+ // Heap-adaptive byte cap for the debezium ChangeEventQueue buffer.
+ // heap 1G->64MB, 2G->128MB, >=4G->256MB. -D overrides
+ // (<=0 disables); a malformed override is logged and ignored, falling back to the cap.
+ private static long resolveMaxQueueSizeInBytes() {
+ String override = System.getProperty(MAX_QUEUE_BYTES_SYS_PROP);
+ if (override != null) {
+ try {
+ long bytes = Long.parseLong(override.trim());
+ return bytes <= 0 ? 0 : bytes;
+ } catch (NumberFormatException e) {
+ LOG.warn(
+ "Ignoring invalid -D{}={}, expected an integer byte count; "
+ + "falling back to the adaptive cap",
+ MAX_QUEUE_BYTES_SYS_PROP,
+ override);
+ }
+ }
+ long target = Runtime.getRuntime().maxMemory() / 16;
+ return Math.max(64L * 1024 * 1024, Math.min(target, 256L * 1024 * 1024));
+ }
+
/** Optimized debezium parameters */
public static Properties getDefaultDebeziumProps() {
Properties properties = new Properties();
+ properties.put(
+ CommonConnectorConfig.MAX_QUEUE_SIZE_IN_BYTES.name(),
+ String.valueOf(resolveMaxQueueSizeInBytes()));
return properties;
}
diff --git a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
index 980f83bdc8d4b0..cf70b6b075c77d 100644
--- a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
+++ b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
@@ -19,11 +19,13 @@
import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import io.debezium.config.CommonConnectorConfig;
import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -128,6 +130,63 @@ void tableListEmptyWhenNeitherSet() {
assertEquals(0, result.length);
}
+ // ─── getDefaultDebeziumProps: queue byte cap ──────────────────────────────
+
+ private static long queueBytes(Properties props) {
+ return Long.parseLong(
+ props.getProperty(CommonConnectorConfig.MAX_QUEUE_SIZE_IN_BYTES.name()));
+ }
+
+ @Test
+ void defaultQueueBytesWithinClamp() {
+ long bytes = queueBytes(ConfigUtil.getDefaultDebeziumProps());
+ assertTrue(bytes >= 64L * 1024 * 1024 && bytes <= 256L * 1024 * 1024,
+ "expected clamp to [64MB, 256MB] but got " + bytes);
+ }
+
+ @Test
+ void sysPropOverridesAdaptiveValue() {
+ String prev = System.getProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP);
+ try {
+ System.setProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP, "1048576");
+ assertEquals(1048576L, queueBytes(ConfigUtil.getDefaultDebeziumProps()));
+ } finally {
+ restore(prev);
+ }
+ }
+
+ @Test
+ void negativeSysPropDisablesByteBound() {
+ String prev = System.getProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP);
+ try {
+ System.setProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP, "-1");
+ assertEquals(0L, queueBytes(ConfigUtil.getDefaultDebeziumProps()));
+ } finally {
+ restore(prev);
+ }
+ }
+
+ @Test
+ void malformedSysPropFallsBackToClamp() {
+ String prev = System.getProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP);
+ try {
+ System.setProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP, "32MB");
+ long bytes = queueBytes(ConfigUtil.getDefaultDebeziumProps());
+ assertTrue(bytes >= 64L * 1024 * 1024 && bytes <= 256L * 1024 * 1024,
+ "malformed override should fall back to [64MB, 256MB] but got " + bytes);
+ } finally {
+ restore(prev);
+ }
+ }
+
+ private static void restore(String prev) {
+ if (prev == null) {
+ System.clearProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP);
+ } else {
+ System.setProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP, prev);
+ }
+ }
+
// ─── server timezone parsing ──────────────────────────────────────────────
@Test