From 44cde4c8f96931dedf34d1598b83ad7bf38ae950 Mon Sep 17 00:00:00 2001 From: dujie Date: Fri, 21 Nov 2025 16:40:04 +0800 Subject: [PATCH] [Feature-#1944][OceanBase]support MysqlMode and Oracle Mode --- .../chunjun-connector-oceanbase/pom.xml | 12 ++ .../oceanbase/config/OceanBaseConf.java | 16 +++ .../oceanbase/config/OceanBaseMode.java | 6 + .../OceanbaseOracleSyncConverter.java | 97 ++++++++++++++++ .../converter/OceanbaseRawTypeMapper.java | 2 + .../dialect/OceanbaseMysqlModeDialect.java | 31 ++++++ .../dialect/OceanbaseOracleModeDialect.java | 47 ++++++++ .../oceanbase/sink/OceanbaseSinkFactory.java | 24 +++- .../source/OceanbaseSourceFactory.java | 68 +++++++++++- .../table/MysqlDynamicTableFactoryProxy.java | 12 ++ .../table/OceanbaseDynamicTableFactory.java | 104 +++++++++++++++++- .../table/OracleDynamicTableFactoryProxy.java | 12 ++ .../json/oceanbase/oceanbase_stream.json | 54 +++++++++ .../json/oceanbase/stream_oceanbase.json | 73 ++++++++++++ 14 files changed, 545 insertions(+), 13 deletions(-) create mode 100644 chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/config/OceanBaseConf.java create mode 100644 chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/config/OceanBaseMode.java create mode 100644 chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/converter/OceanbaseOracleSyncConverter.java create mode 100644 chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/dialect/OceanbaseMysqlModeDialect.java create mode 100644 chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/dialect/OceanbaseOracleModeDialect.java create mode 100644 chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/table/MysqlDynamicTableFactoryProxy.java create mode 100644 chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/table/OracleDynamicTableFactoryProxy.java create mode 100644 chunjun-examples/json/oceanbase/oceanbase_stream.json create mode 100644 chunjun-examples/json/oceanbase/stream_oceanbase.json diff --git a/chunjun-connectors/chunjun-connector-oceanbase/pom.xml b/chunjun-connectors/chunjun-connector-oceanbase/pom.xml index 93bd131d78..7ee7b50d02 100644 --- a/chunjun-connectors/chunjun-connector-oceanbase/pom.xml +++ b/chunjun-connectors/chunjun-connector-oceanbase/pom.xml @@ -48,6 +48,18 @@ chunjun-connector-jdbc-base ${project.version} + + + com.dtstack.chunjun + chunjun-connector-oracle + ${project.version} + + + com.dtstack.chunjun + chunjun-connector-mysql + ${project.version} + + diff --git a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/config/OceanBaseConf.java b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/config/OceanBaseConf.java new file mode 100644 index 0000000000..b355e77f7a --- /dev/null +++ b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/config/OceanBaseConf.java @@ -0,0 +1,16 @@ +package com.dtstack.chunjun.connector.oceanbase.config; + +import com.dtstack.chunjun.connector.jdbc.config.JdbcConfig; + +public class OceanBaseConf extends JdbcConfig { + + private String oceanBaseMode = OceanBaseMode.MYSQL.name(); + + public String getOceanBaseMode() { + return oceanBaseMode; + } + + public void setOceanBaseMode(String oceanBaseMode) { + this.oceanBaseMode = oceanBaseMode; + } +} diff --git a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/config/OceanBaseMode.java b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/config/OceanBaseMode.java new file mode 100644 index 0000000000..af80222caf --- /dev/null +++ b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/config/OceanBaseMode.java @@ -0,0 +1,6 @@ +package com.dtstack.chunjun.connector.oceanbase.config; + +public enum OceanBaseMode { + MYSQL, + ORACLE +} diff --git a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/converter/OceanbaseOracleSyncConverter.java b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/converter/OceanbaseOracleSyncConverter.java new file mode 100644 index 0000000000..89cb6707e8 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/converter/OceanbaseOracleSyncConverter.java @@ -0,0 +1,97 @@ +package com.dtstack.chunjun.connector.oceanbase.converter; + +import com.dtstack.chunjun.config.CommonConfig; +import com.dtstack.chunjun.connector.oracle.converter.BlobType; +import com.dtstack.chunjun.connector.oracle.converter.ClobType; +import com.dtstack.chunjun.connector.oracle.converter.ConvertUtil; +import com.dtstack.chunjun.connector.oracle.converter.OracleSyncConverter; +import com.dtstack.chunjun.converter.IDeserializationConverter; +import com.dtstack.chunjun.element.column.BytesColumn; +import com.dtstack.chunjun.element.column.StringColumn; +import com.dtstack.chunjun.element.column.TimestampColumn; +import com.dtstack.chunjun.element.column.ZonedTimestampColumn; + +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.ZonedTimestampType; + +import com.oceanbase.jdbc.Blob; +import com.oceanbase.jdbc.Clob; +import com.oceanbase.jdbc.extend.datatype.DataTypeUtilities; +import com.oceanbase.jdbc.extend.datatype.TIMESTAMPLTZ; +import com.oceanbase.jdbc.extend.datatype.TIMESTAMPTZ; + +import java.sql.Timestamp; +import java.util.TimeZone; + +public class OceanbaseOracleSyncConverter extends OracleSyncConverter { + + public OceanbaseOracleSyncConverter(RowType rowType, CommonConfig commonConfig) { + super(rowType, commonConfig); + } + + @Override + protected IDeserializationConverter createInternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case VARCHAR: + if (type instanceof ClobType) { + return val -> { + Clob clob = (Clob) val; + return new StringColumn(ConvertUtil.convertClob(clob)); + }; + } + return val -> new StringColumn(val.toString()); + case VARBINARY: + return val -> { + if (type instanceof BlobType) { + Blob blob = (Blob) val; + byte[] bytes = blob.getBytes(1, (int) blob.length()); + return new BytesColumn(bytes); + } else { + return new BytesColumn((byte[]) val); + } + }; + case TIMESTAMP_WITHOUT_TIME_ZONE: + final int precision = ((TimestampType) type).getPrecision(); + if (precision == 6) { + return val -> new TimestampColumn((Timestamp) val, 0); // java.sql.Timestamp + } + case TIMESTAMP_WITH_TIME_ZONE: + if (type instanceof ZonedTimestampType) { + final int zonedPrecision = ((ZonedTimestampType) type).getPrecision(); + return val -> { + TIMESTAMPTZ timestamptz = (TIMESTAMPTZ) val; + Timestamp timestamp = timestamptz.timestampValue(); + return new ZonedTimestampColumn(timestamp, zonedPrecision); + }; + } + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + if (type instanceof LocalZonedTimestampType) { + final int localPrecision = ((LocalZonedTimestampType) type).getPrecision(); + return val -> { + TIMESTAMPLTZ timestamptz = (TIMESTAMPLTZ) val; + // 重写处理12个字节情况,TIMESTAMPLTZ#toTimestamp + byte[] bytes = timestamptz.toBytes(); // 获取字节码 + TimeZone timeZone; + if (bytes.length >= 14) { + // 字节数组长度足够,尝试提取时区信息 + String tzStr = + DataTypeUtilities.toTimezoneStr( + bytes[12], bytes[13], "GMT", true); + timeZone = TimeZone.getTimeZone(tzStr); + } else { + // 字节数组长度不足,使用默认时区 + timeZone = TimeZone.getDefault(); + } + Timestamp timestamp = + new Timestamp(DataTypeUtilities.getOriginTime(bytes, timeZone)); + timestamp.setNanos(DataTypeUtilities.getNanos(bytes, 7)); + return new ZonedTimestampColumn(timestamp, timeZone, localPrecision); + }; + } + } + return super.createInternalConverter(type); + } +} diff --git a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/converter/OceanbaseRawTypeMapper.java b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/converter/OceanbaseRawTypeMapper.java index 00f926d312..3d900df1bd 100644 --- a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/converter/OceanbaseRawTypeMapper.java +++ b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/converter/OceanbaseRawTypeMapper.java @@ -50,6 +50,7 @@ public static DataType apply(TypeConfig type) { case "DECIMAL": case "DECIMAL UNSIGNED": case "NUMERIC": + case "NUMBER": return DataTypes.DECIMAL(38, 18); case "DOUBLE": case "DOUBLE UNSIGNED": @@ -81,6 +82,7 @@ public static DataType apply(TypeConfig type) { case "LONGTEXT": case "ENUM": case "SET": + case "VARCHAR2": return DataTypes.STRING(); default: throw new UnsupportedTypeException(type); diff --git a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/dialect/OceanbaseMysqlModeDialect.java b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/dialect/OceanbaseMysqlModeDialect.java new file mode 100644 index 0000000000..6fd929939f --- /dev/null +++ b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/dialect/OceanbaseMysqlModeDialect.java @@ -0,0 +1,31 @@ +package com.dtstack.chunjun.connector.oceanbase.dialect; + +import com.dtstack.chunjun.connector.mysql.converter.MysqlRawTypeConverter; +import com.dtstack.chunjun.connector.mysql.dialect.MysqlDialect; +import com.dtstack.chunjun.converter.RawTypeMapper; + +import java.util.Optional; + +public class OceanbaseMysqlModeDialect extends MysqlDialect { + OceanbaseDialect oceanbaseDialect = new OceanbaseDialect(); + + @Override + public String dialectName() { + return oceanbaseDialect.dialectName(); + } + + @Override + public boolean canHandle(String url) { + return oceanbaseDialect.canHandle(url); + } + + @Override + public RawTypeMapper getRawTypeConverter() { + return MysqlRawTypeConverter::apply; + } + + @Override + public Optional defaultDriverName() { + return oceanbaseDialect.defaultDriverName(); + } +} diff --git a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/dialect/OceanbaseOracleModeDialect.java b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/dialect/OceanbaseOracleModeDialect.java new file mode 100644 index 0000000000..2e68814121 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/dialect/OceanbaseOracleModeDialect.java @@ -0,0 +1,47 @@ +package com.dtstack.chunjun.connector.oceanbase.dialect; + +import com.dtstack.chunjun.config.CommonConfig; +import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement; +import com.dtstack.chunjun.connector.oceanbase.converter.OceanbaseOracleSyncConverter; +import com.dtstack.chunjun.connector.oracle.converter.OracleRawTypeConverter; +import com.dtstack.chunjun.connector.oracle.dialect.OracleDialect; +import com.dtstack.chunjun.converter.AbstractRowConverter; +import com.dtstack.chunjun.converter.RawTypeMapper; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import io.vertx.core.json.JsonArray; + +import java.sql.ResultSet; +import java.util.Optional; + +public class OceanbaseOracleModeDialect extends OracleDialect { + OceanbaseDialect oceanbaseDialect = new OceanbaseDialect(); + + @Override + public String dialectName() { + return oceanbaseDialect.dialectName(); + } + + @Override + public boolean canHandle(String url) { + return oceanbaseDialect.canHandle(url); + } + + @Override + public RawTypeMapper getRawTypeConverter() { + return OracleRawTypeConverter::apply; + } + + @Override + public Optional defaultDriverName() { + return oceanbaseDialect.defaultDriverName(); + } + + @Override + public AbstractRowConverter + getColumnConverter(RowType rowType, CommonConfig commonConfig) { + return new OceanbaseOracleSyncConverter(rowType, commonConfig); + } +} diff --git a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/sink/OceanbaseSinkFactory.java b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/sink/OceanbaseSinkFactory.java index 2e938d2da3..57e83ffff1 100644 --- a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/sink/OceanbaseSinkFactory.java +++ b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/sink/OceanbaseSinkFactory.java @@ -18,11 +18,31 @@ package com.dtstack.chunjun.connector.oceanbase.sink; import com.dtstack.chunjun.config.SyncConfig; +import com.dtstack.chunjun.connector.jdbc.config.JdbcConfig; import com.dtstack.chunjun.connector.jdbc.sink.JdbcSinkFactory; -import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseDialect; +import com.dtstack.chunjun.connector.oceanbase.config.OceanBaseConf; +import com.dtstack.chunjun.connector.oceanbase.config.OceanBaseMode; +import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseMysqlModeDialect; +import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseOracleModeDialect; public class OceanbaseSinkFactory extends JdbcSinkFactory { + + private OceanBaseConf oceanBaseConf; + public OceanbaseSinkFactory(SyncConfig syncConfig) { - super(syncConfig, new OceanbaseDialect()); + super(syncConfig, new OceanbaseMysqlModeDialect()); + this.oceanBaseConf = (OceanBaseConf) this.jdbcConfig; + if (oceanBaseConf != null) { + OceanBaseMode mode = OceanBaseMode.valueOf(oceanBaseConf.getOceanBaseMode()); + // 若是for oracle模式 + if (mode == OceanBaseMode.ORACLE) { + this.jdbcDialect = new OceanbaseOracleModeDialect(); + } + } + } + + @Override + protected Class getConfClass() { + return OceanBaseConf.class; } } diff --git a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/source/OceanbaseSourceFactory.java b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/source/OceanbaseSourceFactory.java index e39f779162..cbd7a819e9 100644 --- a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/source/OceanbaseSourceFactory.java +++ b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/source/OceanbaseSourceFactory.java @@ -18,20 +18,76 @@ package com.dtstack.chunjun.connector.oceanbase.source; import com.dtstack.chunjun.config.SyncConfig; +import com.dtstack.chunjun.connector.jdbc.config.JdbcConfig; import com.dtstack.chunjun.connector.jdbc.source.JdbcSourceFactory; -import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseDialect; +import com.dtstack.chunjun.connector.oceanbase.config.OceanBaseConf; +import com.dtstack.chunjun.connector.oceanbase.config.OceanBaseMode; +import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseMysqlModeDialect; +import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseOracleModeDialect; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.commons.lang3.StringUtils; +import java.util.Properties; + public class OceanbaseSourceFactory extends JdbcSourceFactory { + // 默认是Mysql流式拉取 + private static final int DEFAULT_FETCH_SIZE = Integer.MIN_VALUE; + private static final String ORACLE_JDBC_READ_TIMEOUT = "oracle.jdbc.ReadTimeout"; + private static final String ORACLE_NET_CONNECT_TIMEOUT = "oracle.net.CONNECT_TIMEOUT"; + + private OceanBaseConf oceanBaseConf; + public OceanbaseSourceFactory(SyncConfig syncConfig, StreamExecutionEnvironment env) { - super(syncConfig, env, new OceanbaseDialect()); - if (jdbcConfig.isPolling() - && StringUtils.isEmpty(jdbcConfig.getStartLocation()) - && jdbcConfig.getFetchSize() == 0) { - jdbcConfig.setFetchSize(1000); + super(syncConfig, env, new OceanbaseMysqlModeDialect()); // 默认为mysql方言 + this.oceanBaseConf = (OceanBaseConf) this.jdbcConfig; + if (oceanBaseConf != null) { + OceanBaseMode mode = OceanBaseMode.valueOf(oceanBaseConf.getOceanBaseMode()); + // 若是for oracle模式 + if (mode == OceanBaseMode.ORACLE) { + // 设置for oracle方言 + this.jdbcDialect = new OceanbaseOracleModeDialect(); + Properties properties = jdbcConfig.getProperties(); + if (properties == null) { + properties = new Properties(); + } + if (jdbcConfig.getConnectTimeOut() != 0) { + // queryTimeOut单位是秒 需要转换成毫秒 + properties.putIfAbsent( + ORACLE_JDBC_READ_TIMEOUT, + String.valueOf(jdbcConfig.getQueryTimeOut() * 1000)); + properties.putIfAbsent( + ORACLE_NET_CONNECT_TIMEOUT, + String.valueOf(jdbcConfig.getQueryTimeOut() * 3 * 1000)); + jdbcConfig.setProperties(properties); + } + } else { + // 其他情况:for mysql模式 初始化 + // 避免result.next阻塞 + if (jdbcConfig.isPolling() + && StringUtils.isEmpty(jdbcConfig.getStartLocation()) + && jdbcConfig.getFetchSize() == 0) { + jdbcConfig.setFetchSize(1000); + } + } + } + } + + @Override + protected Class getConfClass() { + return OceanBaseConf.class; + } + + @Override + protected int getDefaultFetchSize() { + if (oceanBaseConf != null) { + OceanBaseMode mode = OceanBaseMode.valueOf(oceanBaseConf.getOceanBaseMode()); + // 处理for oracle情况 + if (mode == OceanBaseMode.ORACLE) { + return super.getDefaultFetchSize(); + } } + return DEFAULT_FETCH_SIZE; } } diff --git a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/table/MysqlDynamicTableFactoryProxy.java b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/table/MysqlDynamicTableFactoryProxy.java new file mode 100644 index 0000000000..7e1ca3a288 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/table/MysqlDynamicTableFactoryProxy.java @@ -0,0 +1,12 @@ +package com.dtstack.chunjun.connector.oceanbase.table; + +import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect; +import com.dtstack.chunjun.connector.mysql.table.MysqlDynamicTableFactory; +import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseMysqlModeDialect; + +public class MysqlDynamicTableFactoryProxy extends MysqlDynamicTableFactory { + @Override + protected JdbcDialect getDialect() { + return new OceanbaseMysqlModeDialect(); + } +} diff --git a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/table/OceanbaseDynamicTableFactory.java b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/table/OceanbaseDynamicTableFactory.java index bbef9a80cc..31eb73f1dc 100644 --- a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/table/OceanbaseDynamicTableFactory.java +++ b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/table/OceanbaseDynamicTableFactory.java @@ -17,12 +17,29 @@ */ package com.dtstack.chunjun.connector.oceanbase.table; -import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect; +import com.dtstack.chunjun.connector.jdbc.options.JdbcCommonOptions; import com.dtstack.chunjun.connector.jdbc.table.JdbcDynamicTableFactory; -import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseDialect; +import com.dtstack.chunjun.connector.oceanbase.config.OceanBaseMode; -public class OceanbaseDynamicTableFactory extends JdbcDynamicTableFactory { +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Map; +import java.util.Set; + +public class OceanbaseDynamicTableFactory + implements DynamicTableSourceFactory, DynamicTableSinkFactory { private static final String IDENTIFIER = "oceanbase-x"; + private JdbcDynamicTableFactory factory; @Override public String factoryIdentifier() { @@ -30,7 +47,84 @@ public String factoryIdentifier() { } @Override - protected JdbcDialect getDialect() { - return new OceanbaseDialect(); + public DynamicTableSource createDynamicTableSource(Context context) { + try { + JdbcDynamicTableFactory dynamicTableSourceProxy = getDynamicTableSourceProxy(context); + return dynamicTableSourceProxy.createDynamicTableSource(context); + } catch (Exception e) { + throw new RuntimeException("Create DynamicTableSource failed ", e); + } + } + + @Override + public Set> requiredOptions() { + try { + JdbcDynamicTableFactory dynamicTableSourceProxy = getDynamicTableSourceProxy(null); + return dynamicTableSourceProxy.requiredOptions(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public Set> optionalOptions() { + try { + JdbcDynamicTableFactory dynamicTableSourceProxy = getDynamicTableSourceProxy(null); + return dynamicTableSourceProxy.optionalOptions(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + try { + JdbcDynamicTableFactory dynamicTableSourceProxy = getDynamicTableSourceProxy(context); + return dynamicTableSourceProxy.createDynamicTableSink(context); + } catch (Exception e) { + throw new RuntimeException("Create DynamicTableSink failed", e); + } + } + + private JdbcDynamicTableFactory getDynamicTableSourceProxy(Context context) throws Exception { + if (factory == null) { + if (context == null) { + return new MysqlDynamicTableFactoryProxy(); + } + OceanBaseMode mode; + try { + mode = getCompatibilityMode(context); + } catch (Exception e) { + throw new RuntimeException(e); + } + if (mode == OceanBaseMode.MYSQL) { + factory = new MysqlDynamicTableFactoryProxy(); + } else if (mode == OceanBaseMode.ORACLE) { + factory = new OracleDynamicTableFactoryProxy(); + } else { + throw new Exception("Nonsupport such mode " + mode); + } + } + return factory; + } + + private OceanBaseMode getCompatibilityMode(DynamicTableFactory.Context context) { + Map options = context.getCatalogTable().getOptions(); + String password = options.get(JdbcCommonOptions.PASSWORD.key()); + String url = options.get(JdbcCommonOptions.URL.key()); + String username = options.get(JdbcCommonOptions.USERNAME.key()); + String mode; + try { + Class.forName("com.oceanbase.jdbc.Driver"); + + Connection connection = DriverManager.getConnection(url, username, password); + Statement sm = connection.createStatement(); + ResultSet rs = sm.executeQuery("SHOW GLOBAL VARIABLES like 'ob_compatibility_mode'"); + rs.next(); + mode = rs.getString(2); + } catch (ClassNotFoundException | SQLException e) { + throw new RuntimeException("Get oceanbase compatibility mode failed", e); + } + return OceanBaseMode.valueOf(mode); } } diff --git a/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/table/OracleDynamicTableFactoryProxy.java b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/table/OracleDynamicTableFactoryProxy.java new file mode 100644 index 0000000000..1887780b7a --- /dev/null +++ b/chunjun-connectors/chunjun-connector-oceanbase/src/main/java/com/dtstack/chunjun/connector/oceanbase/table/OracleDynamicTableFactoryProxy.java @@ -0,0 +1,12 @@ +package com.dtstack.chunjun.connector.oceanbase.table; + +import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect; +import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseOracleModeDialect; +import com.dtstack.chunjun.connector.oracle.table.OracleDynamicTableFactory; + +public class OracleDynamicTableFactoryProxy extends OracleDynamicTableFactory { + @Override + protected JdbcDialect getDialect() { + return new OceanbaseOracleModeDialect(); + } +} diff --git a/chunjun-examples/json/oceanbase/oceanbase_stream.json b/chunjun-examples/json/oceanbase/oceanbase_stream.json new file mode 100644 index 0000000000..c9dbfc3ee5 --- /dev/null +++ b/chunjun-examples/json/oceanbase/oceanbase_stream.json @@ -0,0 +1,54 @@ +{ + "job":{ + "content":[ + { + "reader":{ + "parameter":{ + "password":"******", + "customSql":"", + "column":[ + { + "name":"id", + "type":"int" + }, + { + "name":"name", + "type":"varchar2" + } + ], + "connection":[ + { + "jdbcUrl":[ + "jdbc:oceanbase://127.0.0.1:30691" + ], + "table":[ + "test.users" + ] + } + ], + "oceanBaseMode" : "MYSQL", + "username":"root" + }, + "name":"oceanbasereader" + }, + "writer": { + "name": "streamwriter", + "parameter": { + "print": true + } + } + } + ], + "setting":{ + "restore":{ + }, + "errorLimit":{}, + "speed":{ + "readerChannel":1, + "writerChannel":1, + "bytes":-1048576, + "channel":1 + } + } + } +} diff --git a/chunjun-examples/json/oceanbase/stream_oceanbase.json b/chunjun-examples/json/oceanbase/stream_oceanbase.json new file mode 100644 index 0000000000..8a4afc9beb --- /dev/null +++ b/chunjun-examples/json/oceanbase/stream_oceanbase.json @@ -0,0 +1,73 @@ +{ + "job" : { + "content" : [ { + "reader": { + "name": "streamreader", + "parameter": { + "column": [ + { + "name": "id", + "type": "id" + }, + { + "name": "name", + "type": "string" + } + ], + "sliceRecordCount": [ + 1000 + ] + } + }, + "writer" : { + "parameter" : { + "oceanBaseMode" : "MYSQL", + "schema" : "test", + "session" : [ ], + "column" : [ { + "customConverterType" : "INT", + "name" : "id", + "isPart" : false, + "type" : "INT", + "key" : "id" + }, { + "customConverterType" : "VARCHAR", + "name" : "name", + "isPart" : false, + "type" : "VARCHAR", + "key" : "name" + } ], + "executePreSqlNoData" : false, + "writeMode" : "insert", + "password" : "******", + "executePostSqlNoData" : false, + "connection" : [ { + "schema" : "test", + "jdbcUrl" : "jdbc:oceanbase://127.0.0.1:30691", + "table" : [ "users" ] + } ], + "username" : "root" + }, + "name" : "oceanbasewriter" + } + } ], + "setting" : { + "restore" : { + "maxRowNumForCheckpoint" : 0, + "isRestore" : true, + "restoreColumnName" : "id", + "restoreColumnIndex" : 0 + }, + "errorLimit" : { + "record" : 10, + "percentage" : 100.0 + }, + "speed" : { + "readerChannel" : 1, + "writerChannel" : 1, + "bytes" : 0, + "channel" : 1 + } + } + } +}