Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ private MaxComputeOptions extractMaxComputeOptions(
String quotaName = factoryConfiguration.get(MaxComputeDataSinkOptions.QUOTA_NAME);
String stsToken = factoryConfiguration.get(MaxComputeDataSinkOptions.STS_TOKEN);
int bucketsNum = factoryConfiguration.get(MaxComputeDataSinkOptions.BUCKETS_NUM);
MaxComputeOptions.SinkOperation sinkOperation =
factoryConfiguration.get(MaxComputeDataSinkOptions.SINK_OPERATION);

String schemaOperatorUid =
pipelineConfiguration.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID);
Expand All @@ -65,6 +67,7 @@ private MaxComputeOptions extractMaxComputeOptions(
.withStsToken(stsToken)
.withBucketsNum(bucketsNum)
.withSchemaOperatorUid(schemaOperatorUid)
.withSinkOperation(sinkOperation)
.build();
}

Expand Down Expand Up @@ -124,6 +127,7 @@ public Set<ConfigOption<?>> optionalOptions() {
optionalOptions.add(MaxComputeDataSinkOptions.FLUSH_CONCURRENT_NUM);
optionalOptions.add(MaxComputeDataSinkOptions.TOTAL_BUFFER_SIZE);
optionalOptions.add(MaxComputeDataSinkOptions.BUCKET_BUFFER_SIZE);
optionalOptions.add(MaxComputeDataSinkOptions.SINK_OPERATION);

return optionalOptions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.ConfigOptions;
import org.apache.flink.cdc.connectors.maxcompute.options.CompressAlgorithm;
import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;

/** Options for MaxCompute Data Sink. */
public class MaxComputeDataSinkOptions {
Expand Down Expand Up @@ -107,4 +108,11 @@ public class MaxComputeDataSinkOptions {
.intType()
.defaultValue(4)
.withDescription("The number of concurrent with flush bucket data.");

public static final ConfigOption<MaxComputeOptions.SinkOperation> SINK_OPERATION =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add documentation for the newly introduced sink.operation option. This should explain:

  • upsert (default): Requires primary keys in schema. Creates a transactional table and supports update/delete semantics.
  • append: Creates a regular table regardless of primary keys. Only supports insert operations, suitable for append-only scenarios.

This helps users understand the configuration impact and choose the appropriate mode for their use case.

ConfigOptions.key("sink.operation")
.enumType(MaxComputeOptions.SinkOperation.class)
.defaultValue(MaxComputeOptions.SinkOperation.UPSERT)
.withDescription(
"The sink operation type, support 'upsert' and 'append', default is 'upsert'.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class MaxComputeOptions implements Serializable {
private final String stsToken;
private final int bucketsNum;
private final String schemaOperatorUid;
private final SinkOperation sinkOperation;

private MaxComputeOptions(Builder builder) {
this.accessId = builder.accessId;
Expand All @@ -48,6 +49,7 @@ private MaxComputeOptions(Builder builder) {
this.bucketsNum = builder.bucketsNum;
this.supportSchema = MaxComputeUtils.supportSchema(this);
this.schemaOperatorUid = builder.schemaOperatorUid;
this.sinkOperation = builder.sinkOperation;
}

public static Builder builder(
Expand Down Expand Up @@ -95,6 +97,10 @@ public String getSchemaOperatorUid() {
return schemaOperatorUid;
}

public SinkOperation getSinkOperation() {
return sinkOperation;
}

/** builder for maxcompute options. */
public static class Builder {

Expand All @@ -107,6 +113,7 @@ public static class Builder {
private String stsToken;
private String schemaOperatorUid;
private int bucketsNum = 16;
private SinkOperation sinkOperation = SinkOperation.UPSERT;

public Builder(String accessId, String accessKey, String endpoint, String project) {
this.accessId = accessId;
Expand Down Expand Up @@ -140,8 +147,46 @@ public Builder withSchemaOperatorUid(String schemaOperatorUid) {
return this;
}

public Builder withSinkOperation(SinkOperation sinkOperation) {
this.sinkOperation = sinkOperation;
return this;
}

public MaxComputeOptions build() {
return new MaxComputeOptions(this);
}
}

/**
 * Sink operation mode for MaxCompute.
 *
 * <ul>
 *   <li>{@link #UPSERT}: write with update/delete semantics (configuration value {@code
 *       "upsert"}).
 *   <li>{@link #APPEND}: insert-only writes (configuration value {@code "append"}).
 * </ul>
 */
public enum SinkOperation {
    APPEND("append"),
    UPSERT("upsert");

    /** Lower-case string form used in configuration values. */
    private final String value;

    SinkOperation(String value) {
        this.value = value;
    }

    /** Returns the configuration string form of this operation. */
    public String getValue() {
        return value;
    }

    @Override
    public String toString() {
        return value;
    }

    /**
     * Parses a configuration string into a {@code SinkOperation}, matching case-insensitively.
     *
     * @param value the configuration value, e.g. {@code "upsert"} or {@code "APPEND"}
     * @return the matching operation
     * @throws IllegalArgumentException if the value matches neither 'upsert' nor 'append'
     */
    public static SinkOperation fromValue(String value) {
        for (SinkOperation op : values()) {
            if (op.value.equalsIgnoreCase(value)) {
                return op;
            }
        }
        throw new IllegalArgumentException(
                "Unknown sink operation: '"
                        + value
                        + "'. Valid values are: 'upsert', 'append'.");
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public static void createTable(MaxComputeOptions options, TableId tableId, Schem
.withHints(unsupportSchemahints)
.ifNotExists()
.debug();
if (!CollectionUtil.isNullOrEmpty(schema.primaryKeys())) {
if (!CollectionUtil.isNullOrEmpty(schema.primaryKeys())
&& options.getSinkOperation() == MaxComputeOptions.SinkOperation.UPSERT) {
tableCreator
.transactionTable()
.withBucketNum(options.getBucketsNum())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ public BatchAppendWriter(

private void initOrReloadSession(SessionIdentifier identifier) {
String partitionSpec = identifier.getPartitionName();
PartitionSpec partitionSpecObj =
partitionSpec != null && !partitionSpec.isEmpty()
? new PartitionSpec(partitionSpec)
: null;
String sessionId = identifier.getSessionId();

try {
Expand All @@ -79,15 +83,15 @@ private void initOrReloadSession(SessionIdentifier identifier) {
identifier.getProject(),
identifier.getSchema(),
identifier.getTable(),
new PartitionSpec(partitionSpec),
partitionSpecObj,
false);
} else {
this.uploadSession =
tunnel.getUploadSession(
identifier.getProject(),
identifier.getSchema(),
identifier.getTable(),
new PartitionSpec(partitionSpec),
partitionSpecObj,
sessionId);
}
this.recordWriter =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ static MaxComputeWriter batchWriter(
MaxComputeWriteOptions writeOptions,
SessionIdentifier sessionIdentifier)
throws IOException {
if (MaxComputeUtils.isTransactionalTable(options, sessionIdentifier)) {
if (MaxComputeUtils.isTransactionalTable(options, sessionIdentifier)
&& options.getSinkOperation() == MaxComputeOptions.SinkOperation.UPSERT) {
return new BatchUpsertWriter(options, writeOptions, sessionIdentifier);
} else {
return new BatchAppendWriter(options, writeOptions, sessionIdentifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ static void destroyContainer() {
public final MaxComputeOptions testOptions =
MaxComputeOptions.builder("ak", "sk", getEndpoint(), "mocked_mc").build();

// Options configured for APPEND mode: the connector then creates a regular
// (non-transactional) table even when the schema declares primary keys, and
// routes writes through the append (insert-only) writer.
public final MaxComputeOptions appendOptions =
MaxComputeOptions.builder("ak", "sk", getEndpoint(), "mocked_mc")
.withSinkOperation(MaxComputeOptions.SinkOperation.APPEND)
.build();

public final Odps odpsInstance = MaxComputeUtils.getOdps(testOptions);

private String getEndpoint() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,25 @@ void testRenameColumn() {
fail(e.getMessage());
}
}

@Test
void testCreateTableInAppendMode() {
try {
String appendTable = "SCHEMA_EVOLUTION_APPEND_TABLE";
SchemaEvolutionUtils.createTable(
appendOptions,
TableId.tableId(appendTable),
Schema.newBuilder()
.physicalColumn("PK", DataTypes.BIGINT())
.physicalColumn("ID1", DataTypes.BIGINT())
.primaryKey("PK")
.build());
// In APPEND mode the table should NOT be created as a transactional table,
// so primary key metadata should be absent even though the schema defines one.
assertThat(odpsInstance.tables().get(appendTable).getPrimaryKey()).isEmpty();
odpsInstance.tables().delete(appendTable, true);
Comment on lines +133 to +147
} catch (Exception e) {
fail(e.getMessage());
}
}
}
Loading