diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeDataSinkFactory.java
index 87d87bea5b8..b70abee0976 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeDataSinkFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeDataSinkFactory.java
@@ -56,6 +56,8 @@ private MaxComputeOptions extractMaxComputeOptions(
         String quotaName = factoryConfiguration.get(MaxComputeDataSinkOptions.QUOTA_NAME);
         String stsToken = factoryConfiguration.get(MaxComputeDataSinkOptions.STS_TOKEN);
         int bucketsNum = factoryConfiguration.get(MaxComputeDataSinkOptions.BUCKETS_NUM);
+        MaxComputeOptions.SinkOperation sinkOperation =
+                factoryConfiguration.get(MaxComputeDataSinkOptions.SINK_OPERATION);
         String schemaOperatorUid =
                 pipelineConfiguration.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID);
 
@@ -65,6 +67,7 @@ private MaxComputeOptions extractMaxComputeOptions(
                 .withStsToken(stsToken)
                 .withBucketsNum(bucketsNum)
                 .withSchemaOperatorUid(schemaOperatorUid)
+                .withSinkOperation(sinkOperation)
                 .build();
     }
 
@@ -124,6 +127,7 @@ public Set<ConfigOption<?>> optionalOptions() {
         optionalOptions.add(MaxComputeDataSinkOptions.FLUSH_CONCURRENT_NUM);
         optionalOptions.add(MaxComputeDataSinkOptions.TOTAL_BUFFER_SIZE);
         optionalOptions.add(MaxComputeDataSinkOptions.BUCKET_BUFFER_SIZE);
+        optionalOptions.add(MaxComputeDataSinkOptions.SINK_OPERATION);
         return optionalOptions;
     }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeDataSinkOptions.java
index e85c949410f..9484828be79 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeDataSinkOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeDataSinkOptions.java
@@ -21,6 +21,7 @@
 import org.apache.flink.cdc.common.configuration.ConfigOption;
 import org.apache.flink.cdc.common.configuration.ConfigOptions;
 import org.apache.flink.cdc.connectors.maxcompute.options.CompressAlgorithm;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
 
 /** Options for MaxCompute Data Sink. */
 public class MaxComputeDataSinkOptions {
@@ -107,4 +108,11 @@ public class MaxComputeDataSinkOptions {
                     .intType()
                     .defaultValue(4)
                     .withDescription("The number of concurrent with flush bucket data.");
+
+    public static final ConfigOption<MaxComputeOptions.SinkOperation> SINK_OPERATION =
+            ConfigOptions.key("sink.operation")
+                    .enumType(MaxComputeOptions.SinkOperation.class)
+                    .defaultValue(MaxComputeOptions.SinkOperation.UPSERT)
+                    .withDescription(
+                            "The sink operation type. Supported values are 'upsert' and 'append'; the default is 'upsert'.");
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/options/MaxComputeOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/options/MaxComputeOptions.java
index 87ebd50391f..e69981694fc 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/options/MaxComputeOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/options/MaxComputeOptions.java
@@ -36,6 +36,7 @@ public class MaxComputeOptions implements Serializable {
     private final String stsToken;
     private final int bucketsNum;
     private final String schemaOperatorUid;
+    private final SinkOperation sinkOperation;
 
     private MaxComputeOptions(Builder builder) {
         this.accessId = builder.accessId;
@@ -48,6 +49,7 @@ private MaxComputeOptions(Builder builder) {
         this.bucketsNum = builder.bucketsNum;
         this.supportSchema = MaxComputeUtils.supportSchema(this);
         this.schemaOperatorUid = builder.schemaOperatorUid;
+        this.sinkOperation = builder.sinkOperation;
     }
 
     public static Builder builder(
@@ -95,6 +97,10 @@ public String getSchemaOperatorUid() {
         return schemaOperatorUid;
     }
 
+    public SinkOperation getSinkOperation() {
+        return sinkOperation;
+    }
+
     /** builder for maxcompute options. */
     public static class Builder {
 
@@ -107,6 +113,7 @@ public static class Builder {
         private String stsToken;
         private String schemaOperatorUid;
         private int bucketsNum = 16;
+        private SinkOperation sinkOperation = SinkOperation.UPSERT;
 
         public Builder(String accessId, String accessKey, String endpoint, String project) {
             this.accessId = accessId;
@@ -140,8 +147,46 @@ public Builder withSchemaOperatorUid(String schemaOperatorUid) {
             return this;
         }
 
+        public Builder withSinkOperation(SinkOperation sinkOperation) {
+            this.sinkOperation = sinkOperation;
+            return this;
+        }
+
         public MaxComputeOptions build() {
             return new MaxComputeOptions(this);
         }
     }
+
+    /** Sink operation mode for MaxCompute: APPEND or UPSERT. */
+    public enum SinkOperation {
+        APPEND("append"),
+        UPSERT("upsert");
+
+        private final String value;
+
+        SinkOperation(String value) {
+            this.value = value;
+        }
+
+        public String getValue() {
+            return value;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        public static SinkOperation fromValue(String value) {
+            for (SinkOperation op : values()) {
+                if (op.value.equalsIgnoreCase(value)) {
+                    return op;
+                }
+            }
+            throw new IllegalArgumentException(
+                    "Unknown sink operation: '"
+                            + value
+                            + "'. Valid values are: 'upsert', 'append'.");
+        }
+    }
 }
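A minimal usage sketch of the pieces added above (SinkOperation.fromValue, the builder hook, and the getter), for review only and not part of the patch; the credential, endpoint, and project strings are placeholders.

    import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
    import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions.SinkOperation;

    public class SinkOperationSketch {
        public static void main(String[] args) {
            // fromValue is case-insensitive and throws on unknown input.
            SinkOperation op = SinkOperation.fromValue("APPEND");

            // The builder defaults to UPSERT when withSinkOperation is never called.
            MaxComputeOptions options =
                    MaxComputeOptions.builder("ak", "sk", "http://odps-endpoint", "test_project")
                            .withSinkOperation(op)
                            .build();

            System.out.println(options.getSinkOperation()); // prints "append" via toString()
        }
    }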
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java
index 573fd989046..8f1d201f24d 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java
@@ -85,7 +85,8 @@ public static void createTable(MaxComputeOptions options, TableId tableId, Schem
                         .withHints(unsupportSchemahints)
                         .ifNotExists()
                         .debug();
-        if (!CollectionUtil.isNullOrEmpty(schema.primaryKeys())) {
+        if (!CollectionUtil.isNullOrEmpty(schema.primaryKeys())
+                && options.getSinkOperation() == MaxComputeOptions.SinkOperation.UPSERT) {
             tableCreator
                     .transactionTable()
                     .withBucketNum(options.getBucketsNum())
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/writer/BatchAppendWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/writer/BatchAppendWriter.java
index 1cf6be4c0a3..1a6e9a51d7d 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/writer/BatchAppendWriter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/writer/BatchAppendWriter.java
@@ -70,6 +70,10 @@ public BatchAppendWriter(
 
     private void initOrReloadSession(SessionIdentifier identifier) {
         String partitionSpec = identifier.getPartitionName();
+        PartitionSpec partitionSpecObj =
+                partitionSpec != null && !partitionSpec.isEmpty()
+                        ? new PartitionSpec(partitionSpec)
+                        : null;
         String sessionId = identifier.getSessionId();
 
         try {
@@ -79,7 +83,7 @@ private void initOrReloadSession(SessionIdentifier identifier) {
                                 identifier.getProject(),
                                 identifier.getSchema(),
                                 identifier.getTable(),
-                                new PartitionSpec(partitionSpec),
+                                partitionSpecObj,
                                 false);
             } else {
                 this.uploadSession =
@@ -87,7 +91,7 @@ private void initOrReloadSession(SessionIdentifier identifier) {
                                 identifier.getProject(),
                                 identifier.getSchema(),
                                 identifier.getTable(),
-                                new PartitionSpec(partitionSpec),
+                                partitionSpecObj,
                                 sessionId);
             }
             this.recordWriter =
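The partition handling above matters because non-partitioned tables carry a null or empty partition name, which the ODPS SDK's PartitionSpec(String) constructor presumably rejects as an invalid spec. A standalone sketch of the same guard; the helper name is illustrative, not part of the patch:

    import com.aliyun.odps.PartitionSpec;

    // Mirrors the guard added in BatchAppendWriter#initOrReloadSession:
    // pass null to the tunnel session rather than parsing an empty spec.
    static PartitionSpec toNullablePartitionSpec(String partitionName) {
        return partitionName != null && !partitionName.isEmpty()
                ? new PartitionSpec(partitionName)
                : null;
    }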
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/writer/MaxComputeWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/writer/MaxComputeWriter.java
index dcac8500b2a..e771c6546a9 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/writer/MaxComputeWriter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/writer/MaxComputeWriter.java
@@ -35,7 +35,8 @@ static MaxComputeWriter batchWriter(
             MaxComputeWriteOptions writeOptions,
             SessionIdentifier sessionIdentifier)
             throws IOException {
-        if (MaxComputeUtils.isTransactionalTable(options, sessionIdentifier)) {
+        if (MaxComputeUtils.isTransactionalTable(options, sessionIdentifier)
+                && options.getSinkOperation() == MaxComputeOptions.SinkOperation.UPSERT) {
             return new BatchUpsertWriter(options, writeOptions, sessionIdentifier);
         } else {
             return new BatchAppendWriter(options, writeOptions, sessionIdentifier);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/test/java/org/apache/flink/cdc/connectors/maxcompute/EmulatorTestBase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/test/java/org/apache/flink/cdc/connectors/maxcompute/EmulatorTestBase.java
index 7d930b829e8..9ac6b1e577f 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/test/java/org/apache/flink/cdc/connectors/maxcompute/EmulatorTestBase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/test/java/org/apache/flink/cdc/connectors/maxcompute/EmulatorTestBase.java
@@ -66,6 +66,11 @@ static void destroyContainer() {
     public final MaxComputeOptions testOptions =
             MaxComputeOptions.builder("ak", "sk", getEndpoint(), "mocked_mc").build();
 
+    public final MaxComputeOptions appendOptions =
+            MaxComputeOptions.builder("ak", "sk", getEndpoint(), "mocked_mc")
+                    .withSinkOperation(MaxComputeOptions.SinkOperation.APPEND)
+                    .build();
+
     public final Odps odpsInstance = MaxComputeUtils.getOdps(testOptions);
 
     private String getEndpoint() {
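Taken together with the SchemaEvolutionUtils change earlier, the writer dispatch in MaxComputeWriter.batchWriter(...) after this patch reduces to the following summary (a restatement for review, not new behavior):

    // Dispatch of MaxComputeWriter.batchWriter(...) after this change:
    //
    //   transactional table? | sink.operation | writer chosen
    //   ---------------------+----------------+-------------------
    //   yes                  | upsert         | BatchUpsertWriter
    //   yes                  | append         | BatchAppendWriter
    //   no                   | upsert         | BatchAppendWriter
    //   no                   | append         | BatchAppendWriter
    boolean upsertPath =
            MaxComputeUtils.isTransactionalTable(options, sessionIdentifier)
                    && options.getSinkOperation() == MaxComputeOptions.SinkOperation.UPSERT;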
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/test/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtilsTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/test/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtilsTest.java
index 3acc70b83da..92e610f133e 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/test/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtilsTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/test/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtilsTest.java
@@ -128,4 +128,25 @@ void testRenameColumn() {
             fail(e.getMessage());
         }
     }
+
+    @Test
+    void testCreateTableInAppendMode() {
+        try {
+            String appendTable = "SCHEMA_EVOLUTION_APPEND_TABLE";
+            SchemaEvolutionUtils.createTable(
+                    appendOptions,
+                    TableId.tableId(appendTable),
+                    Schema.newBuilder()
+                            .physicalColumn("PK", DataTypes.BIGINT())
+                            .physicalColumn("ID1", DataTypes.BIGINT())
+                            .primaryKey("PK")
+                            .build());
+            // In APPEND mode the table should NOT be created as a transactional table,
+            // so primary key metadata should be absent even though the schema defines one.
+            assertThat(odpsInstance.tables().get(appendTable).getPrimaryKey()).isEmpty();
+            odpsInstance.tables().delete(appendTable, true);
+        } catch (Exception e) {
+            fail(e.getMessage());
+        }
+    }
 }
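For completeness, a sketch of how the new option resolves through configuration, as the factory change at the top of this patch does. This assumes flink-cdc's Configuration mirrors Flink's set/get API and that ConfigOption exposes defaultValue(); verify against the actual flink-cdc common classes before relying on it.

    import org.apache.flink.cdc.common.configuration.Configuration;
    import org.apache.flink.cdc.connectors.maxcompute.MaxComputeDataSinkOptions;
    import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;

    public class OptionResolutionSketch {
        public static void main(String[] args) {
            // Assumption: Configuration#set/#get behave like Flink's equivalents.
            Configuration conf = new Configuration();
            conf.set(
                    MaxComputeDataSinkOptions.SINK_OPERATION,
                    MaxComputeOptions.SinkOperation.APPEND);
            System.out.println(conf.get(MaxComputeDataSinkOptions.SINK_OPERATION)); // "append"

            // When the key is absent, the declared default applies.
            System.out.println(MaxComputeDataSinkOptions.SINK_OPERATION.defaultValue()); // "upsert"
        }
    }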