diff --git a/be/src/runtime/cdc_client_mgr.cpp b/be/src/runtime/cdc_client_mgr.cpp index b37cadc980c920..b60c2c60bb1cbe 100644 --- a/be/src/runtime/cdc_client_mgr.cpp +++ b/be/src/runtime/cdc_client_mgr.cpp @@ -214,6 +214,11 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) { argv_storage.emplace_back(java_opts); // OOM safety net (last-wins, user opts cannot disable). argv_storage.emplace_back("-XX:+ExitOnOutOfMemoryError"); + // JDK17 opens for debezium ObjectSizeCalculator reflection. + argv_storage.emplace_back("--add-opens=java.base/java.lang=ALL-UNNAMED"); + argv_storage.emplace_back("--add-opens=java.base/java.util=ALL-UNNAMED"); + argv_storage.emplace_back("--add-opens=java.base/java.math=ALL-UNNAMED"); + argv_storage.emplace_back("--add-opens=java.base/java.nio=ALL-UNNAMED"); argv_storage.emplace_back("-jar"); argv_storage.emplace_back(cdc_jar_path); argv_storage.emplace_back(cdc_jar_port); diff --git a/fs_brokers/cdc_client/pom.xml b/fs_brokers/cdc_client/pom.xml index a88bddad6830a6..0a0c7355318f7e 100644 --- a/fs_brokers/cdc_client/pom.xml +++ b/fs_brokers/cdc_client/pom.xml @@ -76,6 +76,11 @@ under the License. 3.27.7 4.2.1 3.2.5 + 3.2.5 + + --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.math=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED @@ -258,6 +263,14 @@ under the License. 17 + + org.apache.maven.plugins + maven-surefire-plugin + ${maven-surefire-plugin.version} + + ${test.add.opens} + + org.apache.maven.plugins maven-failsafe-plugin @@ -268,6 +281,7 @@ under the License. 1 true + ${test.add.opens} diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java index a999f532ea9929..cc2fa9da68bbc5 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java @@ -37,6 +37,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.mysql.cj.conf.ConnectionUrl; +import io.debezium.config.CommonConnectorConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,9 +124,35 @@ public static ZoneId getPostgresServerTimeZoneFromProps(java.util.Properties pro return ZoneId.systemDefault(); } + public static final String MAX_QUEUE_BYTES_SYS_PROP = "cdc.max.queue.size.in.bytes"; + + // Heap-adaptive byte cap for the debezium ChangeEventQueue buffer. + // heap 1G->64MB, 2G->128MB, >=4G->256MB. -D overrides + // (<=0 disables); a malformed override is logged and ignored, falling back to the cap. + private static long resolveMaxQueueSizeInBytes() { + String override = System.getProperty(MAX_QUEUE_BYTES_SYS_PROP); + if (override != null) { + try { + long bytes = Long.parseLong(override.trim()); + return bytes <= 0 ? 0 : bytes; + } catch (NumberFormatException e) { + LOG.warn( + "Ignoring invalid -D{}={}, expected an integer byte count; " + + "falling back to the adaptive cap", + MAX_QUEUE_BYTES_SYS_PROP, + override); + } + } + long target = Runtime.getRuntime().maxMemory() / 16; + return Math.max(64L * 1024 * 1024, Math.min(target, 256L * 1024 * 1024)); + } + /** Optimized debezium parameters */ public static Properties getDefaultDebeziumProps() { Properties properties = new Properties(); + properties.put( + CommonConnectorConfig.MAX_QUEUE_SIZE_IN_BYTES.name(), + String.valueOf(resolveMaxQueueSizeInBytes())); return properties; } diff --git a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java index 980f83bdc8d4b0..cf70b6b075c77d 100644 --- a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java +++ b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java @@ -19,11 +19,13 @@ import org.apache.doris.job.cdc.DataSourceConfigKeys; +import io.debezium.config.CommonConnectorConfig; import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange; import java.time.ZoneId; import java.util.HashMap; import java.util.Map; +import java.util.Properties; import java.util.Set; import static org.junit.jupiter.api.Assertions.assertArrayEquals; @@ -128,6 +130,63 @@ void tableListEmptyWhenNeitherSet() { assertEquals(0, result.length); } + // ─── getDefaultDebeziumProps: queue byte cap ────────────────────────────── + + private static long queueBytes(Properties props) { + return Long.parseLong( + props.getProperty(CommonConnectorConfig.MAX_QUEUE_SIZE_IN_BYTES.name())); + } + + @Test + void defaultQueueBytesWithinClamp() { + long bytes = queueBytes(ConfigUtil.getDefaultDebeziumProps()); + assertTrue(bytes >= 64L * 1024 * 1024 && bytes <= 256L * 1024 * 1024, + "expected clamp to [64MB, 256MB] but got " + bytes); + } + + @Test + void sysPropOverridesAdaptiveValue() { + String prev = System.getProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP); + try { + System.setProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP, "1048576"); + assertEquals(1048576L, queueBytes(ConfigUtil.getDefaultDebeziumProps())); + } finally { + restore(prev); + } + } + + @Test + void negativeSysPropDisablesByteBound() { + String prev = System.getProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP); + try { + System.setProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP, "-1"); + assertEquals(0L, queueBytes(ConfigUtil.getDefaultDebeziumProps())); + } finally { + restore(prev); + } + } + + @Test + void malformedSysPropFallsBackToClamp() { + String prev = System.getProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP); + try { + System.setProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP, "32MB"); + long bytes = queueBytes(ConfigUtil.getDefaultDebeziumProps()); + assertTrue(bytes >= 64L * 1024 * 1024 && bytes <= 256L * 1024 * 1024, + "malformed override should fall back to [64MB, 256MB] but got " + bytes); + } finally { + restore(prev); + } + } + + private static void restore(String prev) { + if (prev == null) { + System.clearProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP); + } else { + System.setProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP, prev); + } + } + // ─── server timezone parsing ────────────────────────────────────────────── @Test