Skip to content
5 changes: 5 additions & 0 deletions be/src/runtime/cdc_client_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,11 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
argv_storage.emplace_back(java_opts);
// OOM safety net (last-wins, user opts cannot disable).
argv_storage.emplace_back("-XX:+ExitOnOutOfMemoryError");
// JDK17 opens for debezium ObjectSizeCalculator reflection.
argv_storage.emplace_back("--add-opens=java.base/java.lang=ALL-UNNAMED");
argv_storage.emplace_back("--add-opens=java.base/java.util=ALL-UNNAMED");
argv_storage.emplace_back("--add-opens=java.base/java.math=ALL-UNNAMED");
argv_storage.emplace_back("--add-opens=java.base/java.nio=ALL-UNNAMED");
argv_storage.emplace_back("-jar");
argv_storage.emplace_back(cdc_jar_path);
argv_storage.emplace_back(cdc_jar_port);
Expand Down
14 changes: 14 additions & 0 deletions fs_brokers/cdc_client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ under the License.
<assertj.version>3.27.7</assertj.version>
<awaitility.version>4.2.1</awaitility.version>
<maven-failsafe-plugin.version>3.2.5</maven-failsafe-plugin.version>
<maven-surefire-plugin.version>3.2.5</maven-surefire-plugin.version>
<!-- JDK17 opens for debezium ObjectSizeCalculator reflection (byte-sized queue).
Mirrors the set CdcClientMgr passes when BE forks cdc-client.jar; needed because
surefire/failsafe run the reader path directly without going through that fork. -->
<test.add.opens>--add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.math=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED</test.add.opens>
</properties>

<dependencies>
Expand Down Expand Up @@ -258,6 +263,14 @@ under the License.
<target>17</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire-plugin.version}</version>
<configuration>
<argLine>${test.add.opens}</argLine>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
Expand All @@ -268,6 +281,7 @@ under the License.
</includes>
<forkCount>1</forkCount>
<reuseForks>true</reuseForks>
<argLine>${test.add.opens}</argLine>
</configuration>
<executions>
<execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.mysql.cj.conf.ConnectionUrl;
import io.debezium.config.CommonConnectorConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -123,9 +124,35 @@ public static ZoneId getPostgresServerTimeZoneFromProps(java.util.Properties pro
return ZoneId.systemDefault();
}

public static final String MAX_QUEUE_BYTES_SYS_PROP = "cdc.max.queue.size.in.bytes";

// Heap-adaptive byte cap for the debezium ChangeEventQueue buffer.
// heap 1G->64MB, 2G->128MB, >=4G->256MB. -D<MAX_QUEUE_BYTES_SYS_PROP> overrides
// (<=0 disables); a malformed override is logged and ignored, falling back to the cap.
private static long resolveMaxQueueSizeInBytes() {
String override = System.getProperty(MAX_QUEUE_BYTES_SYS_PROP);
if (override != null) {
try {
long bytes = Long.parseLong(override.trim());
return bytes <= 0 ? 0 : bytes;
} catch (NumberFormatException e) {
LOG.warn(
"Ignoring invalid -D{}={}, expected an integer byte count; "
+ "falling back to the adaptive cap",
MAX_QUEUE_BYTES_SYS_PROP,
override);
}
}
long target = Runtime.getRuntime().maxMemory() / 16;
return Math.max(64L * 1024 * 1024, Math.min(target, 256L * 1024 * 1024));
Comment thread
JNSimba marked this conversation as resolved.
}

/** Optimized debezium parameters */
public static Properties getDefaultDebeziumProps() {
Properties properties = new Properties();
properties.put(
Comment thread
JNSimba marked this conversation as resolved.
CommonConnectorConfig.MAX_QUEUE_SIZE_IN_BYTES.name(),
String.valueOf(resolveMaxQueueSizeInBytes()));
return properties;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

import org.apache.doris.job.cdc.DataSourceConfigKeys;

import io.debezium.config.CommonConnectorConfig;
import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange;

import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
Expand Down Expand Up @@ -128,6 +130,63 @@ void tableListEmptyWhenNeitherSet() {
assertEquals(0, result.length);
}

// ─── getDefaultDebeziumProps: queue byte cap ──────────────────────────────

private static long queueBytes(Properties props) {
return Long.parseLong(
props.getProperty(CommonConnectorConfig.MAX_QUEUE_SIZE_IN_BYTES.name()));
}

@Test
void defaultQueueBytesWithinClamp() {
long bytes = queueBytes(ConfigUtil.getDefaultDebeziumProps());
assertTrue(bytes >= 64L * 1024 * 1024 && bytes <= 256L * 1024 * 1024,
"expected clamp to [64MB, 256MB] but got " + bytes);
}

@Test
void sysPropOverridesAdaptiveValue() {
String prev = System.getProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP);
try {
System.setProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP, "1048576");
assertEquals(1048576L, queueBytes(ConfigUtil.getDefaultDebeziumProps()));
} finally {
restore(prev);
}
}

@Test
void negativeSysPropDisablesByteBound() {
String prev = System.getProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP);
try {
System.setProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP, "-1");
assertEquals(0L, queueBytes(ConfigUtil.getDefaultDebeziumProps()));
} finally {
restore(prev);
}
}

@Test
void malformedSysPropFallsBackToClamp() {
String prev = System.getProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP);
try {
System.setProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP, "32MB");
long bytes = queueBytes(ConfigUtil.getDefaultDebeziumProps());
assertTrue(bytes >= 64L * 1024 * 1024 && bytes <= 256L * 1024 * 1024,
"malformed override should fall back to [64MB, 256MB] but got " + bytes);
} finally {
restore(prev);
}
}

private static void restore(String prev) {
if (prev == null) {
System.clearProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP);
} else {
System.setProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP, prev);
}
}

// ─── server timezone parsing ──────────────────────────────────────────────

@Test
Expand Down
Loading