[FLINK-39889][table] Thread USING CONNECTION clause into CatalogTable and CreateTableOperation#28385
Conversation
Thread the connection identifier from a parsed USING CONNECTION clause in SqlCreateTable through to CatalogTable and the CreateTableOperation. - Add Optional<UnresolvedIdentifier> getConnection() to CatalogTable, a connection() builder method, and the field/accessor in DefaultCatalogTable (equals/hashCode/toString/copy updated). - Delegate getConnection() in ResolvedCatalogTable. - Read SqlCreateTable.getConnection() in AbstractCreateTableConverter and set it on the built CatalogTable. - Serialize/deserialize the connection identifier in CatalogPropertiesUtil under a reserved 'connection.identifier' key, excluded from connector options, so it round-trips and appears in CreateTableOperation summaries. - Add a withConnection() OperationMatcher and tests covering conversion and property serde. Part of FLIP-529.
| final String sql = "create table derivedTable(\n" + " a int\n" + ")"; | ||
| Operation operation = parseAndConvert(sql); | ||
| assertThat(operation) | ||
| .is(new HamcrestCondition<>(isCreateTableOperation(withConnection(null)))); |
There was a problem hiding this comment.
why do we need HamcrestCondition here and can't live without it?
There was a problem hiding this comment.
Removed HamcrestCondition here — the tests now cast to CreateTableOperation and assert on getCatalogTable().getConnection() directly with AssertJ. Also dropped the now-unused withConnection matcher from OperationMatchers.
| "create table derivedTable(\n" | ||
| + " a int\n" | ||
| + ")\n" | ||
| + "USING CONNECTION mycat.mydb.myconn"; | ||
| Operation operation = parseAndConvert(sql); |
There was a problem hiding this comment.
case seems mixed
better to have all keywords in uppercase
There was a problem hiding this comment.
Good catch — uppercased the keywords (CREATE TABLE, INT) in both tests.
|
the PR name seems too generic and says nothing about operation |
…rcase SQL keywords
|
Renamed to |
What is the purpose of the change
Part of FLIP-529: Connections in Flink SQL and TableAPI (parent: FLINK-38259).
Threads the connection identifier from a parsed
USING CONNECTIONclause inSqlCreateTable(added in FLINK-39726) through toCatalogTableand theCreateTableOperation, mirroring how the optionaldistributionfield is wired.Brief change log
Optional<UnresolvedIdentifier> getConnection()toCatalogTablewith aconnection()builder method, and the backing field/accessor inDefaultCatalogTable(equals/hashCode/toString/copy updated; existing constructors kept for compatibility).getConnection()inResolvedCatalogTable.SqlCreateTable.getConnection()inAbstractCreateTableConverterand set it on the builtCatalogTable, soCreateTableOperation#getCatalogTable().getConnection()carries the reference.CatalogPropertiesUtilunder a reservedconnection.identifierkey, excluded from connector options, so it round-trips throughtoProperties/fromPropertiesand appears in operation summaries.Verifying this change
This change added tests and can be verified as follows:
SqlDdlToOperationConverterTest#testCreateTableWithConnection/#testCreateTableWithoutConnectionwith a newOperationMatchers#withConnectionmatcher.ResolvedCatalogTableserde case carrying a connection toCatalogPropertiesUtilTest#testCatalogTableSerde.Does this pull request potentially affect one of the following parts:
@Public(Evolving): yes (CatalogTable,ResolvedCatalogTablegain an optional getter with a default)Documentation
Was generative AI tooling used to co-author this PR?
Yes