diff --git a/chunjun-connectors/chunjun-connector-oceanbase/pom.xml b/chunjun-connectors/chunjun-connector-oceanbase/pom.xml
index 93bd131d78..7ee7b50d02 100644
--- a/chunjun-connectors/chunjun-connector-oceanbase/pom.xml
+++ b/chunjun-connectors/chunjun-connector-oceanbase/pom.xml
@@ -48,6 +48,18 @@
chunjun-connector-jdbc-base
${project.version}
+
+
+ com.dtstack.chunjun
+ chunjun-connector-oracle
+ ${project.version}
+
+
+ com.dtstack.chunjun
+ chunjun-connector-mysql
+ ${project.version}
+
+
diff --git a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/config/OceanBaseConf.java b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/config/OceanBaseConf.java
new file mode 100644
index 0000000000..b355e77f7a
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/config/OceanBaseConf.java
@@ -0,0 +1,16 @@
+package com.dtstack.chunjun.connector.oceanbase.config;
+
+import com.dtstack.chunjun.connector.jdbc.config.JdbcConfig;
+
+public class OceanBaseConf extends JdbcConfig {
+
+ private String oceanBaseMode = OceanBaseMode.MYSQL.name();
+
+ public String getOceanBaseMode() {
+ return oceanBaseMode;
+ }
+
+ public void setOceanBaseMode(String oceanBaseMode) {
+ this.oceanBaseMode = oceanBaseMode;
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/config/OceanBaseMode.java b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/config/OceanBaseMode.java
new file mode 100644
index 0000000000..af80222caf
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/config/OceanBaseMode.java
@@ -0,0 +1,6 @@
+package com.dtstack.chunjun.connector.oceanbase.config;
+
+public enum OceanBaseMode {
+ MYSQL,
+ ORACLE
+}
diff --git a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/converter/OceanbaseOracleSyncConverter.java b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/converter/OceanbaseOracleSyncConverter.java
new file mode 100644
index 0000000000..89cb6707e8
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/converter/OceanbaseOracleSyncConverter.java
@@ -0,0 +1,97 @@
+package com.dtstack.chunjun.connector.oceanbase.converter;
+
+import com.dtstack.chunjun.config.CommonConfig;
+import com.dtstack.chunjun.connector.oracle.converter.BlobType;
+import com.dtstack.chunjun.connector.oracle.converter.ClobType;
+import com.dtstack.chunjun.connector.oracle.converter.ConvertUtil;
+import com.dtstack.chunjun.connector.oracle.converter.OracleSyncConverter;
+import com.dtstack.chunjun.converter.IDeserializationConverter;
+import com.dtstack.chunjun.element.column.BytesColumn;
+import com.dtstack.chunjun.element.column.StringColumn;
+import com.dtstack.chunjun.element.column.TimestampColumn;
+import com.dtstack.chunjun.element.column.ZonedTimestampColumn;
+
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+
+import com.oceanbase.jdbc.Blob;
+import com.oceanbase.jdbc.Clob;
+import com.oceanbase.jdbc.extend.datatype.DataTypeUtilities;
+import com.oceanbase.jdbc.extend.datatype.TIMESTAMPLTZ;
+import com.oceanbase.jdbc.extend.datatype.TIMESTAMPTZ;
+
+import java.sql.Timestamp;
+import java.util.TimeZone;
+
+public class OceanbaseOracleSyncConverter extends OracleSyncConverter {
+
+ public OceanbaseOracleSyncConverter(RowType rowType, CommonConfig commonConfig) {
+ super(rowType, commonConfig);
+ }
+
+ @Override
+ protected IDeserializationConverter createInternalConverter(LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case VARCHAR:
+ if (type instanceof ClobType) {
+ return val -> {
+ Clob clob = (Clob) val;
+ return new StringColumn(ConvertUtil.convertClob(clob));
+ };
+ }
+ return val -> new StringColumn(val.toString());
+ case VARBINARY:
+ return val -> {
+ if (type instanceof BlobType) {
+ Blob blob = (Blob) val;
+ byte[] bytes = blob.getBytes(1, (int) blob.length());
+ return new BytesColumn(bytes);
+ } else {
+ return new BytesColumn((byte[]) val);
+ }
+ };
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ final int precision = ((TimestampType) type).getPrecision();
+ if (precision == 6) {
+ return val -> new TimestampColumn((Timestamp) val, 0); // java.sql.Timestamp
+ }
+ case TIMESTAMP_WITH_TIME_ZONE:
+ if (type instanceof ZonedTimestampType) {
+ final int zonedPrecision = ((ZonedTimestampType) type).getPrecision();
+ return val -> {
+ TIMESTAMPTZ timestamptz = (TIMESTAMPTZ) val;
+ Timestamp timestamp = timestamptz.timestampValue();
+ return new ZonedTimestampColumn(timestamp, zonedPrecision);
+ };
+ }
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ if (type instanceof LocalZonedTimestampType) {
+ final int localPrecision = ((LocalZonedTimestampType) type).getPrecision();
+ return val -> {
+ TIMESTAMPLTZ timestamptz = (TIMESTAMPLTZ) val;
+ // 重写处理12个字节情况,TIMESTAMPLTZ#toTimestamp
+ byte[] bytes = timestamptz.toBytes(); // 获取字节码
+ TimeZone timeZone;
+ if (bytes.length >= 14) {
+ // 字节数组长度足够,尝试提取时区信息
+ String tzStr =
+ DataTypeUtilities.toTimezoneStr(
+ bytes[12], bytes[13], "GMT", true);
+ timeZone = TimeZone.getTimeZone(tzStr);
+ } else {
+ // 字节数组长度不足,使用默认时区
+ timeZone = TimeZone.getDefault();
+ }
+ Timestamp timestamp =
+ new Timestamp(DataTypeUtilities.getOriginTime(bytes, timeZone));
+ timestamp.setNanos(DataTypeUtilities.getNanos(bytes, 7));
+ return new ZonedTimestampColumn(timestamp, timeZone, localPrecision);
+ };
+ }
+ }
+ return super.createInternalConverter(type);
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/converter/OceanbaseRawTypeMapper.java b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/converter/OceanbaseRawTypeMapper.java
index 00f926d312..3d900df1bd 100644
--- a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/converter/OceanbaseRawTypeMapper.java
+++ b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/converter/OceanbaseRawTypeMapper.java
@@ -50,6 +50,7 @@ public static DataType apply(TypeConfig type) {
case "DECIMAL":
case "DECIMAL UNSIGNED":
case "NUMERIC":
+ case "NUMBER":
return DataTypes.DECIMAL(38, 18);
case "DOUBLE":
case "DOUBLE UNSIGNED":
@@ -81,6 +82,7 @@ public static DataType apply(TypeConfig type) {
case "LONGTEXT":
case "ENUM":
case "SET":
+ case "VARCHAR2":
return DataTypes.STRING();
default:
throw new UnsupportedTypeException(type);
diff --git a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/dialect/OceanbaseMysqlModeDialect.java b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/dialect/OceanbaseMysqlModeDialect.java
new file mode 100644
index 0000000000..6fd929939f
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/dialect/OceanbaseMysqlModeDialect.java
@@ -0,0 +1,31 @@
+package com.dtstack.chunjun.connector.oceanbase.dialect;
+
+import com.dtstack.chunjun.connector.mysql.converter.MysqlRawTypeConverter;
+import com.dtstack.chunjun.connector.mysql.dialect.MysqlDialect;
+import com.dtstack.chunjun.converter.RawTypeMapper;
+
+import java.util.Optional;
+
+public class OceanbaseMysqlModeDialect extends MysqlDialect {
+ OceanbaseDialect oceanbaseDialect = new OceanbaseDialect();
+
+ @Override
+ public String dialectName() {
+ return oceanbaseDialect.dialectName();
+ }
+
+ @Override
+ public boolean canHandle(String url) {
+ return oceanbaseDialect.canHandle(url);
+ }
+
+ @Override
+ public RawTypeMapper getRawTypeConverter() {
+ return MysqlRawTypeConverter::apply;
+ }
+
+ @Override
+ public Optional defaultDriverName() {
+ return oceanbaseDialect.defaultDriverName();
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/dialect/OceanbaseOracleModeDialect.java b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/dialect/OceanbaseOracleModeDialect.java
new file mode 100644
index 0000000000..2e68814121
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/dialect/OceanbaseOracleModeDialect.java
@@ -0,0 +1,47 @@
+package com.dtstack.chunjun.connector.oceanbase.dialect;
+
+import com.dtstack.chunjun.config.CommonConfig;
+import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement;
+import com.dtstack.chunjun.connector.oceanbase.converter.OceanbaseOracleSyncConverter;
+import com.dtstack.chunjun.connector.oracle.converter.OracleRawTypeConverter;
+import com.dtstack.chunjun.connector.oracle.dialect.OracleDialect;
+import com.dtstack.chunjun.converter.AbstractRowConverter;
+import com.dtstack.chunjun.converter.RawTypeMapper;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import io.vertx.core.json.JsonArray;
+
+import java.sql.ResultSet;
+import java.util.Optional;
+
+public class OceanbaseOracleModeDialect extends OracleDialect {
+ OceanbaseDialect oceanbaseDialect = new OceanbaseDialect();
+
+ @Override
+ public String dialectName() {
+ return oceanbaseDialect.dialectName();
+ }
+
+ @Override
+ public boolean canHandle(String url) {
+ return oceanbaseDialect.canHandle(url);
+ }
+
+ @Override
+ public RawTypeMapper getRawTypeConverter() {
+ return OracleRawTypeConverter::apply;
+ }
+
+ @Override
+ public Optional defaultDriverName() {
+ return oceanbaseDialect.defaultDriverName();
+ }
+
+ @Override
+ public AbstractRowConverter
+ getColumnConverter(RowType rowType, CommonConfig commonConfig) {
+ return new OceanbaseOracleSyncConverter(rowType, commonConfig);
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/sink/OceanbaseSinkFactory.java b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/sink/OceanbaseSinkFactory.java
index 2e938d2da3..57e83ffff1 100644
--- a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/sink/OceanbaseSinkFactory.java
+++ b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/sink/OceanbaseSinkFactory.java
@@ -18,11 +18,31 @@
package com.dtstack.chunjun.connector.oceanbase.sink;
import com.dtstack.chunjun.config.SyncConfig;
+import com.dtstack.chunjun.connector.jdbc.config.JdbcConfig;
import com.dtstack.chunjun.connector.jdbc.sink.JdbcSinkFactory;
-import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseDialect;
+import com.dtstack.chunjun.connector.oceanbase.config.OceanBaseConf;
+import com.dtstack.chunjun.connector.oceanbase.config.OceanBaseMode;
+import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseMysqlModeDialect;
+import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseOracleModeDialect;
public class OceanbaseSinkFactory extends JdbcSinkFactory {
+
+ private OceanBaseConf oceanBaseConf;
+
public OceanbaseSinkFactory(SyncConfig syncConfig) {
- super(syncConfig, new OceanbaseDialect());
+ super(syncConfig, new OceanbaseMysqlModeDialect());
+ this.oceanBaseConf = (OceanBaseConf) this.jdbcConfig;
+ if (oceanBaseConf != null) {
+ OceanBaseMode mode = OceanBaseMode.valueOf(oceanBaseConf.getOceanBaseMode());
+ // 若是for oracle模式
+ if (mode == OceanBaseMode.ORACLE) {
+ this.jdbcDialect = new OceanbaseOracleModeDialect();
+ }
+ }
+ }
+
+ @Override
+ protected Class extends JdbcConfig> getConfClass() {
+ return OceanBaseConf.class;
}
}
diff --git a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/source/OceanbaseSourceFactory.java b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/source/OceanbaseSourceFactory.java
index e39f779162..cbd7a819e9 100644
--- a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/source/OceanbaseSourceFactory.java
+++ b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/source/OceanbaseSourceFactory.java
@@ -18,20 +18,76 @@
package com.dtstack.chunjun.connector.oceanbase.source;
import com.dtstack.chunjun.config.SyncConfig;
+import com.dtstack.chunjun.connector.jdbc.config.JdbcConfig;
import com.dtstack.chunjun.connector.jdbc.source.JdbcSourceFactory;
-import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseDialect;
+import com.dtstack.chunjun.connector.oceanbase.config.OceanBaseConf;
+import com.dtstack.chunjun.connector.oceanbase.config.OceanBaseMode;
+import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseMysqlModeDialect;
+import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseOracleModeDialect;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.commons.lang3.StringUtils;
+import java.util.Properties;
+
public class OceanbaseSourceFactory extends JdbcSourceFactory {
+ // 默认是Mysql流式拉取
+ private static final int DEFAULT_FETCH_SIZE = Integer.MIN_VALUE;
+ private static final String ORACLE_JDBC_READ_TIMEOUT = "oracle.jdbc.ReadTimeout";
+ private static final String ORACLE_NET_CONNECT_TIMEOUT = "oracle.net.CONNECT_TIMEOUT";
+
+ private OceanBaseConf oceanBaseConf;
+
public OceanbaseSourceFactory(SyncConfig syncConfig, StreamExecutionEnvironment env) {
- super(syncConfig, env, new OceanbaseDialect());
- if (jdbcConfig.isPolling()
- && StringUtils.isEmpty(jdbcConfig.getStartLocation())
- && jdbcConfig.getFetchSize() == 0) {
- jdbcConfig.setFetchSize(1000);
+ super(syncConfig, env, new OceanbaseMysqlModeDialect()); // 默认为mysql方言
+ this.oceanBaseConf = (OceanBaseConf) this.jdbcConfig;
+ if (oceanBaseConf != null) {
+ OceanBaseMode mode = OceanBaseMode.valueOf(oceanBaseConf.getOceanBaseMode());
+ // 若是for oracle模式
+ if (mode == OceanBaseMode.ORACLE) {
+ // 设置for oracle方言
+ this.jdbcDialect = new OceanbaseOracleModeDialect();
+ Properties properties = jdbcConfig.getProperties();
+ if (properties == null) {
+ properties = new Properties();
+ }
+ if (jdbcConfig.getConnectTimeOut() != 0) {
+ // queryTimeOut单位是秒 需要转换成毫秒
+ properties.putIfAbsent(
+ ORACLE_JDBC_READ_TIMEOUT,
+ String.valueOf(jdbcConfig.getQueryTimeOut() * 1000));
+ properties.putIfAbsent(
+ ORACLE_NET_CONNECT_TIMEOUT,
+ String.valueOf(jdbcConfig.getQueryTimeOut() * 3 * 1000));
+ jdbcConfig.setProperties(properties);
+ }
+ } else {
+ // 其他情况:for mysql模式 初始化
+ // 避免result.next阻塞
+ if (jdbcConfig.isPolling()
+ && StringUtils.isEmpty(jdbcConfig.getStartLocation())
+ && jdbcConfig.getFetchSize() == 0) {
+ jdbcConfig.setFetchSize(1000);
+ }
+ }
+ }
+ }
+
+ @Override
+ protected Class extends JdbcConfig> getConfClass() {
+ return OceanBaseConf.class;
+ }
+
+ @Override
+ protected int getDefaultFetchSize() {
+ if (oceanBaseConf != null) {
+ OceanBaseMode mode = OceanBaseMode.valueOf(oceanBaseConf.getOceanBaseMode());
+ // 处理for oracle情况
+ if (mode == OceanBaseMode.ORACLE) {
+ return super.getDefaultFetchSize();
+ }
}
+ return DEFAULT_FETCH_SIZE;
}
}
diff --git a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/table/MysqlDynamicTableFactoryProxy.java b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/table/MysqlDynamicTableFactoryProxy.java
new file mode 100644
index 0000000000..7e1ca3a288
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/table/MysqlDynamicTableFactoryProxy.java
@@ -0,0 +1,12 @@
+package com.dtstack.chunjun.connector.oceanbase.table;
+
+import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect;
+import com.dtstack.chunjun.connector.mysql.table.MysqlDynamicTableFactory;
+import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseMysqlModeDialect;
+
+public class MysqlDynamicTableFactoryProxy extends MysqlDynamicTableFactory {
+ @Override
+ protected JdbcDialect getDialect() {
+ return new OceanbaseMysqlModeDialect();
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/table/OceanbaseDynamicTableFactory.java b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/table/OceanbaseDynamicTableFactory.java
index bbef9a80cc..31eb73f1dc 100644
--- a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/table/OceanbaseDynamicTableFactory.java
+++ b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/table/OceanbaseDynamicTableFactory.java
@@ -17,12 +17,29 @@
*/
package com.dtstack.chunjun.connector.oceanbase.table;
-import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect;
+import com.dtstack.chunjun.connector.jdbc.options.JdbcCommonOptions;
import com.dtstack.chunjun.connector.jdbc.table.JdbcDynamicTableFactory;
-import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseDialect;
+import com.dtstack.chunjun.connector.oceanbase.config.OceanBaseMode;
-public class OceanbaseDynamicTableFactory extends JdbcDynamicTableFactory {
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Set;
+
+public class OceanbaseDynamicTableFactory
+ implements DynamicTableSourceFactory, DynamicTableSinkFactory {
private static final String IDENTIFIER = "oceanbase-x";
+ private JdbcDynamicTableFactory factory;
@Override
public String factoryIdentifier() {
@@ -30,7 +47,84 @@ public String factoryIdentifier() {
}
@Override
- protected JdbcDialect getDialect() {
- return new OceanbaseDialect();
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ try {
+ JdbcDynamicTableFactory dynamicTableSourceProxy = getDynamicTableSourceProxy(context);
+ return dynamicTableSourceProxy.createDynamicTableSource(context);
+ } catch (Exception e) {
+ throw new RuntimeException("Create DynamicTableSource failed ", e);
+ }
+ }
+
+ @Override
+ public Set> requiredOptions() {
+ try {
+ JdbcDynamicTableFactory dynamicTableSourceProxy = getDynamicTableSourceProxy(null);
+ return dynamicTableSourceProxy.requiredOptions();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Set> optionalOptions() {
+ try {
+ JdbcDynamicTableFactory dynamicTableSourceProxy = getDynamicTableSourceProxy(null);
+ return dynamicTableSourceProxy.optionalOptions();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ try {
+ JdbcDynamicTableFactory dynamicTableSourceProxy = getDynamicTableSourceProxy(context);
+ return dynamicTableSourceProxy.createDynamicTableSink(context);
+ } catch (Exception e) {
+ throw new RuntimeException("Create DynamicTableSink failed", e);
+ }
+ }
+
+ private JdbcDynamicTableFactory getDynamicTableSourceProxy(Context context) throws Exception {
+ if (factory == null) {
+ if (context == null) {
+ return new MysqlDynamicTableFactoryProxy();
+ }
+ OceanBaseMode mode;
+ try {
+ mode = getCompatibilityMode(context);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ if (mode == OceanBaseMode.MYSQL) {
+ factory = new MysqlDynamicTableFactoryProxy();
+ } else if (mode == OceanBaseMode.ORACLE) {
+ factory = new OracleDynamicTableFactoryProxy();
+ } else {
+ throw new Exception("Nonsupport such mode " + mode);
+ }
+ }
+ return factory;
+ }
+
+ private OceanBaseMode getCompatibilityMode(DynamicTableFactory.Context context) {
+ Map options = context.getCatalogTable().getOptions();
+ String password = options.get(JdbcCommonOptions.PASSWORD.key());
+ String url = options.get(JdbcCommonOptions.URL.key());
+ String username = options.get(JdbcCommonOptions.USERNAME.key());
+ String mode;
+ try {
+ Class.forName("com.oceanbase.jdbc.Driver");
+
+ Connection connection = DriverManager.getConnection(url, username, password);
+ Statement sm = connection.createStatement();
+ ResultSet rs = sm.executeQuery("SHOW GLOBAL VARIABLES like 'ob_compatibility_mode'");
+ rs.next();
+ mode = rs.getString(2);
+ } catch (ClassNotFoundException | SQLException e) {
+ throw new RuntimeException("Get oceanbase compatibility mode failed", e);
+ }
+ return OceanBaseMode.valueOf(mode);
}
}
diff --git a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/table/OracleDynamicTableFactoryProxy.java b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/table/OracleDynamicTableFactoryProxy.java
new file mode 100644
index 0000000000..1887780b7a
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/table/OracleDynamicTableFactoryProxy.java
@@ -0,0 +1,12 @@
+package com.dtstack.chunjun.connector.oceanbase.table;
+
+import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect;
+import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseOracleModeDialect;
+import com.dtstack.chunjun.connector.oracle.table.OracleDynamicTableFactory;
+
+public class OracleDynamicTableFactoryProxy extends OracleDynamicTableFactory {
+ @Override
+ protected JdbcDialect getDialect() {
+ return new OceanbaseOracleModeDialect();
+ }
+}
diff --git a/chunjun-examples/json/oceanbase/oceanbase_stream.json b/chunjun-examples/json/oceanbase/oceanbase_stream.json
new file mode 100644
index 0000000000..c9dbfc3ee5
--- /dev/null
+++ b/chunjun-examples/json/oceanbase/oceanbase_stream.json
@@ -0,0 +1,54 @@
+{
+ "job":{
+ "content":[
+ {
+ "reader":{
+ "parameter":{
+ "password":"******",
+ "customSql":"",
+ "column":[
+ {
+ "name":"id",
+ "type":"int"
+ },
+ {
+ "name":"name",
+ "type":"varchar2"
+ }
+ ],
+ "connection":[
+ {
+ "jdbcUrl":[
+ "jdbc:oceanbase://127.0.0.1:30691"
+ ],
+ "table":[
+ "test.users"
+ ]
+ }
+ ],
+ "oceanBaseMode" : "MYSQL",
+ "username":"root"
+ },
+ "name":"oceanbasereader"
+ },
+ "writer": {
+ "name": "streamwriter",
+ "parameter": {
+ "print": true
+ }
+ }
+ }
+ ],
+ "setting":{
+ "restore":{
+ },
+ "errorLimit":{},
+ "speed":{
+ "readerChannel":1,
+ "writerChannel":1,
+ "bytes":-1048576,
+ "channel":1
+ }
+ }
+ }
+}
diff --git a/chunjun-examples/json/oceanbase/stream_oceanbase.json b/chunjun-examples/json/oceanbase/stream_oceanbase.json
new file mode 100644
index 0000000000..8a4afc9beb
--- /dev/null
+++ b/chunjun-examples/json/oceanbase/stream_oceanbase.json
@@ -0,0 +1,73 @@
+{
+ "job" : {
+ "content" : [ {
+ "reader": {
+ "name": "streamreader",
+ "parameter": {
+ "column": [
+ {
+ "name": "id",
+ "type": "id"
+ },
+ {
+ "name": "name",
+ "type": "string"
+ }
+ ],
+ "sliceRecordCount": [
+ 1000
+ ]
+ }
+ },
+ "writer" : {
+ "parameter" : {
+ "oceanBaseMode" : "MYSQL",
+ "schema" : "test",
+ "session" : [ ],
+ "column" : [ {
+ "customConverterType" : "INT",
+ "name" : "id",
+ "isPart" : false,
+ "type" : "INT",
+ "key" : "id"
+ }, {
+ "customConverterType" : "VARCHAR",
+ "name" : "name",
+ "isPart" : false,
+ "type" : "VARCHAR",
+ "key" : "name"
+ } ],
+ "executePreSqlNoData" : false,
+ "writeMode" : "insert",
+ "password" : "******",
+ "executePostSqlNoData" : false,
+ "connection" : [ {
+ "schema" : "test",
+ "jdbcUrl" : "jdbc:oceanbase://127.0.0.1:30691",
+ "table" : [ "users" ]
+ } ],
+ "username" : "root"
+ },
+ "name" : "oceanbasewriter"
+ }
+ } ],
+ "setting" : {
+ "restore" : {
+ "maxRowNumForCheckpoint" : 0,
+ "isRestore" : true,
+ "restoreColumnName" : "id",
+ "restoreColumnIndex" : 0
+ },
+ "errorLimit" : {
+ "record" : 10,
+ "percentage" : 100.0
+ },
+ "speed" : {
+ "readerChannel" : 1,
+ "writerChannel" : 1,
+ "bytes" : 0,
+ "channel" : 1
+ }
+ }
+ }
+}