From e042f9293daef94f3fb5502248a1753b3355e76e Mon Sep 17 00:00:00 2001 From: yux <34335406+yuxiqian@users.noreply.github.com> Date: Mon, 20 Apr 2026 10:32:10 +0800 Subject: [PATCH] [FLINK-39418] Fix decimal truncation when precision / scale is over 19 --- .../cdc/common/converter/CommonConverter.java | 15 +- .../converter/InternalObjectConverter.java | 2 +- .../common/pipeline/DecimalPrecisionMode.java | 34 ++ .../cdc/common/pipeline/PipelineOptions.java | 19 + .../InternalObjectConverterTest.java | 2 +- .../composer/flink/FlinkPipelineComposer.java | 3 + .../flink/translator/TransformTranslator.java | 3 + .../flink/FlinkPipelineTransformITCase.java | 90 ++- .../composer/specs/TransformSpecsITCase.java | 11 +- .../src/test/resources/specs/casting.yaml | 120 ++++ .../src/test/resources/specs/decimal.yaml | 181 +++++- .../transform/PostTransformOperator.java | 14 +- .../PostTransformOperatorBuilder.java | 11 +- .../transform/TransformFilterProcessor.java | 14 +- .../TransformProjectionProcessor.java | 7 +- .../runtime/parser/FlinkCdcTypeSystem.java | 56 ++ .../cdc/runtime/parser/JaninoCompiler.java | 24 +- .../cdc/runtime/parser/TransformParser.java | 41 +- .../runtime/parser/TransformParserTest.java | 532 +++++++++++------- 19 files changed, 911 insertions(+), 268 deletions(-) create mode 100644 flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/DecimalPrecisionMode.java create mode 100644 flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/FlinkCdcTypeSystem.java diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/CommonConverter.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/CommonConverter.java index feb0e445935..8260c305d00 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/CommonConverter.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/CommonConverter.java @@ -33,6 +33,7 @@ import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.types.ArrayType; import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DecimalType; import org.apache.flink.cdc.common.types.MapType; import org.apache.flink.cdc.common.types.RowType; import org.apache.flink.cdc.common.types.VariantType; @@ -153,13 +154,21 @@ static StringData convertToStringData(Object obj) { "Cannot convert " + obj + " of type " + obj.getClass() + " to STRING DATA."); } - static DecimalData convertToDecimalData(Object obj) { + static DecimalData convertToDecimalData(Object obj, DecimalType decimalType) { if (obj instanceof DecimalData) { - return (DecimalData) obj; + DecimalData dd = (DecimalData) obj; + // Re-convert to target precision and scale if different + if (dd.precision() == decimalType.getPrecision() + && dd.scale() == decimalType.getScale()) { + return dd; + } + return DecimalData.fromBigDecimal( + dd.toBigDecimal(), decimalType.getPrecision(), decimalType.getScale()); } if (obj instanceof BigDecimal) { BigDecimal bd = (BigDecimal) obj; - return DecimalData.fromBigDecimal(bd, bd.precision(), bd.scale()); + return DecimalData.fromBigDecimal( + bd, decimalType.getPrecision(), decimalType.getScale()); } throw new RuntimeException( "Cannot convert " + obj + " of type " + obj.getClass() + " to DECIMAL DATA."); diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/InternalObjectConverter.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/InternalObjectConverter.java index 7f1cc97bdfe..0dc664e54ba 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/InternalObjectConverter.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/InternalObjectConverter.java @@ -87,7 +87,7 @@ public Function visit(VarBinaryType varBinaryType) { @Override public Function visit(DecimalType decimalType) { - return CommonConverter::convertToDecimalData; + return obj -> CommonConverter.convertToDecimalData(obj, decimalType); } @Override diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/DecimalPrecisionMode.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/DecimalPrecisionMode.java new file mode 100644 index 00000000000..79a733fb1a1 --- /dev/null +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/DecimalPrecisionMode.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.pipeline; + +import org.apache.flink.cdc.common.annotation.PublicEvolving; + +/** + * Maximum precision mode for DECIMAL type in transform expressions. Controls the upper bound of + * numeric precision used by the SQL type system during expression evaluation. + */ +@PublicEvolving +public enum DecimalPrecisionMode { + + /** Limits DECIMAL precision to 19 digits, matching Calcite's default type system behavior. */ + UP_TO_19, + + /** Allows DECIMAL precision up to 38 digits, matching Flink CDC's extended type system. */ + UP_TO_38 +} diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java index 443d6145b74..ebc57101994 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java @@ -136,5 +136,24 @@ public class PipelineOptions { .withDescription( "The timeout time for SchemaOperator to wait downstream SchemaChangeEvent applying finished, the default value is 3 minutes."); + public static final ConfigOption + PIPELINE_TRANSFORM_DECIMAL_PRECISION_MODE = + ConfigOptions.key("transform.decimal.precision.mode") + .enumType(DecimalPrecisionMode.class) + .defaultValue(DecimalPrecisionMode.UP_TO_19) + .withDescription( + Description.builder() + .text( + "Maximum precision mode for DECIMAL type in transform expression evaluation. ") + .linebreak() + .add( + ListElement.list( + text( + "UP_TO_19: Limits DECIMAL precision to 19 digits, matching Calcite's default type system. " + + "This is the default behavior for all versions."), + text( + "UP_TO_38: Allows DECIMAL precision up to 38 digits, matching Flink CDC's extended type system."))) + .build()); + private PipelineOptions() {} } diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/InternalObjectConverterTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/InternalObjectConverterTest.java index 2a4c3e1b341..ee4ff5859d5 100644 --- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/InternalObjectConverterTest.java +++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/InternalObjectConverterTest.java @@ -195,7 +195,7 @@ void testConvertToDecimal() { .hasToString("4.2"); assertThat(convertToInternal(new BigDecimal("-3.1415926"), DataTypes.DECIMAL(20, 10))) .isInstanceOf(DecimalData.class) - .hasToString("-3.1415926"); + .hasToString("-3.1415926000"); assertThat( convertToInternal( diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java index 1854b05032b..241985b7f6a 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java @@ -195,6 +195,9 @@ private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef) stream, pipelineDef.getTransforms(), pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE), + pipelineDef + .getConfig() + .get(PipelineOptions.PIPELINE_TRANSFORM_DECIMAL_PRECISION_MODE), pipelineDef.getUdfs(), pipelineDef.getModels(), dataSource.supportedMetadataColumns(), diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java index 6e296db4d1c..35687087e5d 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java @@ -19,6 +19,7 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.pipeline.DecimalPrecisionMode; import org.apache.flink.cdc.common.source.SupportedMetadataColumn; import org.apache.flink.cdc.composer.definition.ModelDef; import org.apache.flink.cdc.composer.definition.TransformDef; @@ -94,6 +95,7 @@ public DataStream translatePostTransform( DataStream input, List transforms, String timezone, + DecimalPrecisionMode decimalPrecisionMode, List udfFunctions, List models, SupportedMetadataColumn[] supportedMetadataColumns, @@ -117,6 +119,7 @@ public DataStream translatePostTransform( supportedMetadataColumns); } postTransformFunctionBuilder.addTimezone(timezone); + postTransformFunctionBuilder.addDecimalPrecisionMode(decimalPrecisionMode); postTransformFunctionBuilder.addUdfFunctions( udfFunctions.stream().map(this::udfDefToUDFTuple).collect(Collectors.toList())); postTransformFunctionBuilder.addUdfFunctions( diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java index 9a774642e2f..f1b89a94b21 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java @@ -33,6 +33,7 @@ import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.DecimalPrecisionMode; import org.apache.flink.cdc.common.pipeline.PipelineOptions; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.schema.Column; @@ -2179,7 +2180,8 @@ void testTransformWithCommentsAndDefaultExpr() throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, Derrida, 26, extras], after=[], op=DELETE, meta=()}"); } - String[] runNumericCastingWith(String expression) throws Exception { + String[] runNumericCastingWith(DecimalPrecisionMode decimalPrecisionMode, String expression) + throws Exception { try { FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); @@ -2207,6 +2209,9 @@ String[] runNumericCastingWith(String expression) throws Exception { pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); pipelineConfig.set( PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); + pipelineConfig.set( + PipelineOptions.PIPELINE_TRANSFORM_DECIMAL_PRECISION_MODE, + decimalPrecisionMode); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -2253,9 +2258,10 @@ private static String generateCastTo(String type) { .collect(Collectors.joining(", ")); } - @Test - void testNumericCastingsWithTruncation() throws Exception { - assertThat(runNumericCastingWith("*")) + @ParameterizedTest(name = "Decimal mode: {0}") + @EnumSource + void testNumericCastingsWithTruncation(DecimalPrecisionMode mode) throws Exception { + assertThat(runNumericCastingWith(mode, "*")) .containsExactly( "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` TINYINT,`small_c` SMALLINT,`int_c` INT,`bigint_c` BIGINT,`float_c` FLOAT,`double_c` DOUBLE,`decimal_c` DECIMAL(10, 2),`valid_char_c` VARCHAR(17),`invalid_char_c` VARCHAR(17)}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6.7, -8.9, -10.11, -12.13, foo], op=INSERT, meta=()}", @@ -2263,7 +2269,7 @@ void testNumericCastingsWithTruncation() throws Exception { "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6.7, 8.9, 10.11, 12.13, baz], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); - assertThat(runNumericCastingWith(generateCastTo("BOOLEAN"))) + assertThat(runNumericCastingWith(mode, generateCastTo("BOOLEAN"))) .containsExactly( "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` BOOLEAN,`small_c` BOOLEAN,`int_c` BOOLEAN,`bigint_c` BOOLEAN,`float_c` BOOLEAN,`double_c` BOOLEAN,`decimal_c` BOOLEAN,`valid_char_c` BOOLEAN,`invalid_char_c` BOOLEAN}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, true, true, true, true, true, true, true, false, false], op=INSERT, meta=()}", @@ -2271,7 +2277,7 @@ void testNumericCastingsWithTruncation() throws Exception { "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, true, true, true, true, true, true, true, false, false], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); - assertThat(runNumericCastingWith(generateCastTo("TINYINT"))) + assertThat(runNumericCastingWith(mode, generateCastTo("TINYINT"))) .containsExactly( "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` TINYINT,`small_c` TINYINT,`int_c` TINYINT,`bigint_c` TINYINT,`float_c` TINYINT,`double_c` TINYINT,`decimal_c` TINYINT,`valid_char_c` TINYINT,`invalid_char_c` TINYINT}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10, -12, null], op=INSERT, meta=()}", @@ -2279,7 +2285,7 @@ void testNumericCastingsWithTruncation() throws Exception { "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10, 12, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); - assertThat(runNumericCastingWith(generateCastTo("SMALLINT"))) + assertThat(runNumericCastingWith(mode, generateCastTo("SMALLINT"))) .containsExactly( "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` SMALLINT,`small_c` SMALLINT,`int_c` SMALLINT,`bigint_c` SMALLINT,`float_c` SMALLINT,`double_c` SMALLINT,`decimal_c` SMALLINT,`valid_char_c` SMALLINT,`invalid_char_c` SMALLINT}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10, -12, null], op=INSERT, meta=()}", @@ -2287,7 +2293,7 @@ void testNumericCastingsWithTruncation() throws Exception { "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10, 12, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); - assertThat(runNumericCastingWith(generateCastTo("INT"))) + assertThat(runNumericCastingWith(mode, generateCastTo("INT"))) .containsExactly( "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` INT,`small_c` INT,`int_c` INT,`bigint_c` INT,`float_c` INT,`double_c` INT,`decimal_c` INT,`valid_char_c` INT,`invalid_char_c` INT}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10, -12, null], op=INSERT, meta=()}", @@ -2295,7 +2301,7 @@ void testNumericCastingsWithTruncation() throws Exception { "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10, 12, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); - assertThat(runNumericCastingWith(generateCastTo("BIGINT"))) + assertThat(runNumericCastingWith(mode, generateCastTo("BIGINT"))) .containsExactly( "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` BIGINT,`small_c` BIGINT,`int_c` BIGINT,`bigint_c` BIGINT,`float_c` BIGINT,`double_c` BIGINT,`decimal_c` BIGINT,`valid_char_c` BIGINT,`invalid_char_c` BIGINT}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10, -12, null], op=INSERT, meta=()}", @@ -2303,7 +2309,7 @@ void testNumericCastingsWithTruncation() throws Exception { "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10, 12, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); - assertThat(runNumericCastingWith(generateCastTo("FLOAT"))) + assertThat(runNumericCastingWith(mode, generateCastTo("FLOAT"))) .containsExactly( "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` FLOAT,`small_c` FLOAT,`int_c` FLOAT,`bigint_c` FLOAT,`float_c` FLOAT,`double_c` FLOAT,`decimal_c` FLOAT,`valid_char_c` FLOAT,`invalid_char_c` FLOAT}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0, -3.0, -4.0, -5.0, -6.7, -8.9, -10.11, -12.13, null], op=INSERT, meta=()}", @@ -2311,7 +2317,7 @@ void testNumericCastingsWithTruncation() throws Exception { "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0, 3.0, 4.0, 5.0, 6.7, 8.9, 10.11, 12.13, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); - assertThat(runNumericCastingWith(generateCastTo("DOUBLE"))) + assertThat(runNumericCastingWith(mode, generateCastTo("DOUBLE"))) .containsExactly( "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` DOUBLE,`small_c` DOUBLE,`int_c` DOUBLE,`bigint_c` DOUBLE,`float_c` DOUBLE,`double_c` DOUBLE,`decimal_c` DOUBLE,`valid_char_c` DOUBLE,`invalid_char_c` DOUBLE}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0, -3.0, -4.0, -5.0, -6.699999809265137, -8.9, -10.11, -12.13, null], op=INSERT, meta=()}", @@ -2319,7 +2325,7 @@ void testNumericCastingsWithTruncation() throws Exception { "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0, 3.0, 4.0, 5.0, 6.699999809265137, 8.9, 10.11, 12.13, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); - assertThat(runNumericCastingWith(generateCastTo("DECIMAL(1, 0)"))) + assertThat(runNumericCastingWith(mode, generateCastTo("DECIMAL(1, 0)"))) .containsExactly( "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` DECIMAL(1, 0),`small_c` DECIMAL(1, 0),`int_c` DECIMAL(1, 0),`bigint_c` DECIMAL(1, 0),`float_c` DECIMAL(1, 0),`double_c` DECIMAL(1, 0),`decimal_c` DECIMAL(1, 0),`valid_char_c` DECIMAL(1, 0),`invalid_char_c` DECIMAL(1, 0)}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -7, -9, null, null, null], op=INSERT, meta=()}", @@ -2327,7 +2333,7 @@ void testNumericCastingsWithTruncation() throws Exception { "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 7, 9, null, null, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); - assertThat(runNumericCastingWith(generateCastTo("DECIMAL(2, 0)"))) + assertThat(runNumericCastingWith(mode, generateCastTo("DECIMAL(2, 0)"))) .containsExactly( "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` DECIMAL(2, 0),`small_c` DECIMAL(2, 0),`int_c` DECIMAL(2, 0),`bigint_c` DECIMAL(2, 0),`float_c` DECIMAL(2, 0),`double_c` DECIMAL(2, 0),`decimal_c` DECIMAL(2, 0),`valid_char_c` DECIMAL(2, 0),`invalid_char_c` DECIMAL(2, 0)}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -7, -9, -10, -12, null], op=INSERT, meta=()}", @@ -2335,7 +2341,7 @@ void testNumericCastingsWithTruncation() throws Exception { "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 7, 9, 10, 12, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); - assertThat(runNumericCastingWith(generateCastTo("DECIMAL(3, 1)"))) + assertThat(runNumericCastingWith(mode, generateCastTo("DECIMAL(3, 1)"))) .containsExactly( "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` DECIMAL(3, 1),`small_c` DECIMAL(3, 1),`int_c` DECIMAL(3, 1),`bigint_c` DECIMAL(3, 1),`float_c` DECIMAL(3, 1),`double_c` DECIMAL(3, 1),`decimal_c` DECIMAL(3, 1),`valid_char_c` DECIMAL(3, 1),`invalid_char_c` DECIMAL(3, 1)}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0, -3.0, -4.0, -5.0, -6.7, -8.9, -10.1, -12.1, null], op=INSERT, meta=()}", @@ -2343,13 +2349,67 @@ void testNumericCastingsWithTruncation() throws Exception { "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0, 3.0, 4.0, 5.0, 6.7, 8.9, 10.1, 12.1, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); - assertThat(runNumericCastingWith(generateCastTo("DECIMAL(19, 10)"))) + assertThat(runNumericCastingWith(mode, generateCastTo("DECIMAL(19, 10)"))) .containsExactly( "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` DECIMAL(19, 10),`small_c` DECIMAL(19, 10),`int_c` DECIMAL(19, 10),`bigint_c` DECIMAL(19, 10),`float_c` DECIMAL(19, 10),`double_c` DECIMAL(19, 10),`decimal_c` DECIMAL(19, 10),`valid_char_c` DECIMAL(19, 10),`invalid_char_c` DECIMAL(19, 10)}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0000000000, -3.0000000000, -4.0000000000, -5.0000000000, -6.7000000000, -8.9000000000, -10.1100000000, -12.1300000000, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0000000000, 3.0000000000, 4.0000000000, 5.0000000000, 6.7000000000, 8.9000000000, 10.1100000000, 12.1300000000, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + + // Test DECIMAL with maximum precision (38) + + if (mode.equals(DecimalPrecisionMode.UP_TO_38)) { + assertThat(runNumericCastingWith(mode, generateCastTo("DECIMAL(38, 0)"))) + .containsExactly( + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` DECIMAL(38, 0),`small_c` DECIMAL(38, 0),`int_c` DECIMAL(38, 0),`bigint_c` DECIMAL(38, 0),`float_c` DECIMAL(38, 0),`double_c` DECIMAL(38, 0),`decimal_c` DECIMAL(38, 0),`valid_char_c` DECIMAL(38, 0),`invalid_char_c` DECIMAL(38, 0)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -7, -9, -10, -12, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 7, 9, 10, 12, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + + assertThat(runNumericCastingWith(mode, generateCastTo("DECIMAL(38, 10)"))) + .containsExactly( + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` DECIMAL(38, 10),`small_c` DECIMAL(38, 10),`int_c` DECIMAL(38, 10),`bigint_c` DECIMAL(38, 10),`float_c` DECIMAL(38, 10),`double_c` DECIMAL(38, 10),`decimal_c` DECIMAL(38, 10),`valid_char_c` DECIMAL(38, 10),`invalid_char_c` DECIMAL(38, 10)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0000000000, -3.0000000000, -4.0000000000, -5.0000000000, -6.7000000000, -8.9000000000, -10.1100000000, -12.1300000000, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0000000000, 3.0000000000, 4.0000000000, 5.0000000000, 6.7000000000, 8.9000000000, 10.1100000000, 12.1300000000, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + + assertThat(runNumericCastingWith(mode, generateCastTo("DECIMAL(38, 18)"))) + .containsExactly( + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` DECIMAL(38, 18),`small_c` DECIMAL(38, 18),`int_c` DECIMAL(38, 18),`bigint_c` DECIMAL(38, 18),`float_c` DECIMAL(38, 18),`double_c` DECIMAL(38, 18),`decimal_c` DECIMAL(38, 18),`valid_char_c` DECIMAL(38, 18),`invalid_char_c` DECIMAL(38, 18)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.000000000000000000, -3.000000000000000000, -4.000000000000000000, -5.000000000000000000, -6.700000000000000000, -8.900000000000000000, -10.110000000000000000, -12.130000000000000000, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.000000000000000000, 0.000000000000000000, 0.000000000000000000, 0.000000000000000000, 0.000000000000000000, 0.000000000000000000, 0.000000000000000000, 0.000000000000000000, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.000000000000000000, 3.000000000000000000, 4.000000000000000000, 5.000000000000000000, 6.700000000000000000, 8.900000000000000000, 10.110000000000000000, 12.130000000000000000, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + } else if (mode.equals(DecimalPrecisionMode.UP_TO_19)) { + assertThat(runNumericCastingWith(mode, generateCastTo("DECIMAL(38, 0)"))) + .containsExactly( + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` DECIMAL(19, 0),`small_c` DECIMAL(19, 0),`int_c` DECIMAL(19, 0),`bigint_c` DECIMAL(19, 0),`float_c` DECIMAL(19, 0),`double_c` DECIMAL(19, 0),`decimal_c` DECIMAL(19, 0),`valid_char_c` DECIMAL(19, 0),`invalid_char_c` DECIMAL(19, 0)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -7, -9, -10, -12, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 7, 9, 10, 12, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + + assertThat(runNumericCastingWith(mode, generateCastTo("DECIMAL(38, 10)"))) + .containsExactly( + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` DECIMAL(19, 10),`small_c` DECIMAL(19, 10),`int_c` DECIMAL(19, 10),`bigint_c` DECIMAL(19, 10),`float_c` DECIMAL(19, 10),`double_c` DECIMAL(19, 10),`decimal_c` DECIMAL(19, 10),`valid_char_c` DECIMAL(19, 10),`invalid_char_c` DECIMAL(19, 10)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0000000000, -3.0000000000, -4.0000000000, -5.0000000000, -6.7000000000, -8.9000000000, -10.1100000000, -12.1300000000, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0000000000, 3.0000000000, 4.0000000000, 5.0000000000, 6.7000000000, 8.9000000000, 10.1100000000, 12.1300000000, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + + assertThat(runNumericCastingWith(mode, generateCastTo("DECIMAL(38, 18)"))) + .containsExactly( + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` DECIMAL(19, 18),`small_c` DECIMAL(19, 18),`int_c` DECIMAL(19, 18),`bigint_c` DECIMAL(19, 18),`float_c` DECIMAL(19, 18),`double_c` DECIMAL(19, 18),`decimal_c` DECIMAL(19, 18),`valid_char_c` DECIMAL(19, 18),`invalid_char_c` DECIMAL(19, 18)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.000000000000000000, -3.000000000000000000, -4.000000000000000000, -5.000000000000000000, -6.700000000000000000, -8.900000000000000000, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.000000000000000000, 0.000000000000000000, 0.000000000000000000, 0.000000000000000000, 0.000000000000000000, 0.000000000000000000, 0.000000000000000000, 0.000000000000000000, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.000000000000000000, 3.000000000000000000, 4.000000000000000000, 5.000000000000000000, 6.700000000000000000, 8.900000000000000000, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + } else { + Assertions.fail("Unexpected decimal precision mode: " + mode); + } } @Test diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java index 21e7ddc5d1b..22b330ea9e7 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java @@ -32,6 +32,7 @@ import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.DecimalPrecisionMode; import org.apache.flink.cdc.common.pipeline.PipelineOptions; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.schema.Schema; @@ -342,7 +343,6 @@ static Stream loadTestSpecs() throws IOException { private static final ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); - @SuppressWarnings("unchecked") private static Stream loadTestSpec(Path specPath) { List specs = new ArrayList<>(); try { @@ -356,6 +356,11 @@ private static Stream loadTestSpec(Path specPath) { if (specNode.has("time-zone")) { spec.timeZone = asTextOrNull(specNode.get("time-zone")); } + if (specNode.has("decimal-precision-mode")) { + spec.decimalPrecisionMode = + DecimalPrecisionMode.valueOf( + specNode.get("decimal-precision-mode").asText().toUpperCase()); + } if (specNode.has("projection")) { spec.projectionRules = List.of( @@ -399,6 +404,7 @@ static class TestSpec { public String name; public String ignore; public String timeZone = "UTC"; + public DecimalPrecisionMode decimalPrecisionMode = DecimalPrecisionMode.UP_TO_19; public List projectionRules = new ArrayList<>(); public @Nullable String filterRule; public @Nullable String primaryKey; @@ -474,6 +480,9 @@ void runTransformSpecs(String group, String name, TestSpec spec) throws Exceptio pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, spec.timeZone); pipelineConfig.set( PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); + pipelineConfig.set( + PipelineOptions.PIPELINE_TRANSFORM_DECIMAL_PRECISION_MODE, + spec.decimalPrecisionMode); PipelineDef pipelineDef = new PipelineDef( sourceDef, diff --git a/flink-cdc-composer/src/test/resources/specs/casting.yaml b/flink-cdc-composer/src/test/resources/specs/casting.yaml index f5ce0e802a5..c5d07c981cf 100644 --- a/flink-cdc-composer/src/test/resources/specs/casting.yaml +++ b/flink-cdc-composer/src/test/resources/specs/casting.yaml @@ -220,6 +220,126 @@ DataChangeEvent{tableId=foo.bar.baz, before=[-1, null, 0.00000, 1.00000, 2.22000, 333.00000, 44.44000, 5555555.00000, null], after=[], op=DELETE, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, 0.00000, 1.00000, 2.22000, 333.00000, 44.44000, 5555555.00000, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[0, null, 0.00000, 1.00000, 2.22000, 333.00000, 44.44000, 5555555.00000, null], after=[], op=DELETE, meta=()} +- do: Cast To Decimal(38, 0) (Decimal-19 mode) + decimal-precision-mode: up_to_19 + projection: |- + id_ + CAST(null AS DECIMAL(38, 0)) AS comp_0 + CAST(0 AS DECIMAL(38, 0)) AS comp_1 + CAST(1 AS DECIMAL(38, 0)) AS comp_2 + CAST('2.22' AS DECIMAL(38, 0)) AS comp_3 + CAST('333' AS DECIMAL(38, 0)) AS comp_4 + CAST('44.44' AS DECIMAL(38, 0)) AS comp_5 + CAST('5555555' AS DECIMAL(38, 0)) AS comp_6 + CAST('FOOBAR' AS DECIMAL(38, 0)) AS comp_7 + primary-key: id_ + expect: |- + CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_0` DECIMAL(19, 0),`comp_1` DECIMAL(19, 0),`comp_2` DECIMAL(19, 0),`comp_3` DECIMAL(19, 0),`comp_4` DECIMAL(19, 0),`comp_5` DECIMAL(19, 0),`comp_6` DECIMAL(19, 0),`comp_7` DECIMAL(19, 0)}, primaryKeys=id_, options=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, null, 0, 1, 2, 333, 44, 5555555, null], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, null, 0, 1, 2, 333, 44, 5555555, null], after=[-1, null, 0, 1, 2, 333, 44, 5555555, null], op=UPDATE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[-1, null, 0, 1, 2, 333, 44, 5555555, null], after=[], op=DELETE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, 0, 1, 2, 333, 44, 5555555, null], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[0, null, 0, 1, 2, 333, 44, 5555555, null], after=[], op=DELETE, meta=()} +- do: Cast To Decimal(38, 0) (Decimal-38 mode) + decimal-precision-mode: up_to_38 + projection: |- + id_ + CAST(null AS DECIMAL(38, 0)) AS comp_0 + CAST(0 AS DECIMAL(38, 0)) AS comp_1 + CAST(1 AS DECIMAL(38, 0)) AS comp_2 + CAST('2.22' AS DECIMAL(38, 0)) AS comp_3 + CAST('333' AS DECIMAL(38, 0)) AS comp_4 + CAST('44.44' AS DECIMAL(38, 0)) AS comp_5 + CAST('5555555' AS DECIMAL(38, 0)) AS comp_6 + CAST('FOOBAR' AS DECIMAL(38, 0)) AS comp_7 + primary-key: id_ + expect: |- + CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_0` DECIMAL(38, 0),`comp_1` DECIMAL(38, 0),`comp_2` DECIMAL(38, 0),`comp_3` DECIMAL(38, 0),`comp_4` DECIMAL(38, 0),`comp_5` DECIMAL(38, 0),`comp_6` DECIMAL(38, 0),`comp_7` DECIMAL(38, 0)}, primaryKeys=id_, options=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, null, 0, 1, 2, 333, 44, 5555555, null], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, null, 0, 1, 2, 333, 44, 5555555, null], after=[-1, null, 0, 1, 2, 333, 44, 5555555, null], op=UPDATE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[-1, null, 0, 1, 2, 333, 44, 5555555, null], after=[], op=DELETE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, 0, 1, 2, 333, 44, 5555555, null], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[0, null, 0, 1, 2, 333, 44, 5555555, null], after=[], op=DELETE, meta=()} +- do: Cast To Decimal(38, 10) (Decimal-19 mode) + decimal-precision-mode: up_to_19 + projection: |- + id_ + CAST(null AS DECIMAL(38, 10)) AS comp_0 + CAST(0 AS DECIMAL(38, 10)) AS comp_1 + CAST(1 AS DECIMAL(38, 10)) AS comp_2 + CAST('2.22' AS DECIMAL(38, 10)) AS comp_3 + CAST('333' AS DECIMAL(38, 10)) AS comp_4 + CAST('44.44' AS DECIMAL(38, 10)) AS comp_5 + CAST('5555555' AS DECIMAL(38, 10)) AS comp_6 + CAST('FOOBAR' AS DECIMAL(38, 10)) AS comp_7 + primary-key: id_ + expect: |- + CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_0` DECIMAL(19, 10),`comp_1` DECIMAL(19, 10),`comp_2` DECIMAL(19, 10),`comp_3` DECIMAL(19, 10),`comp_4` DECIMAL(19, 10),`comp_5` DECIMAL(19, 10),`comp_6` DECIMAL(19, 10),`comp_7` DECIMAL(19, 10)}, primaryKeys=id_, options=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, null, 0.0000000000, 1.0000000000, 2.2200000000, 333.0000000000, 44.4400000000, 5555555.0000000000, null], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, null, 0.0000000000, 1.0000000000, 2.2200000000, 333.0000000000, 44.4400000000, 5555555.0000000000, null], after=[-1, null, 0.0000000000, 1.0000000000, 2.2200000000, 333.0000000000, 44.4400000000, 5555555.0000000000, null], op=UPDATE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[-1, null, 0.0000000000, 1.0000000000, 2.2200000000, 333.0000000000, 44.4400000000, 5555555.0000000000, null], after=[], op=DELETE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, 0.0000000000, 1.0000000000, 2.2200000000, 333.0000000000, 44.4400000000, 5555555.0000000000, null], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[0, null, 0.0000000000, 1.0000000000, 2.2200000000, 333.0000000000, 44.4400000000, 5555555.0000000000, null], after=[], op=DELETE, meta=()} +- do: Cast To Decimal(38, 10) (Decimal-38 mode) + decimal-precision-mode: up_to_38 + projection: |- + id_ + CAST(null AS DECIMAL(38, 10)) AS comp_0 + CAST(0 AS DECIMAL(38, 10)) AS comp_1 + CAST(1 AS DECIMAL(38, 10)) AS comp_2 + CAST('2.22' AS DECIMAL(38, 10)) AS comp_3 + CAST('333' AS DECIMAL(38, 10)) AS comp_4 + CAST('44.44' AS DECIMAL(38, 10)) AS comp_5 + CAST('5555555' AS DECIMAL(38, 10)) AS comp_6 + CAST('FOOBAR' AS DECIMAL(38, 10)) AS comp_7 + primary-key: id_ + expect: |- + CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_0` DECIMAL(38, 10),`comp_1` DECIMAL(38, 10),`comp_2` DECIMAL(38, 10),`comp_3` DECIMAL(38, 10),`comp_4` DECIMAL(38, 10),`comp_5` DECIMAL(38, 10),`comp_6` DECIMAL(38, 10),`comp_7` DECIMAL(38, 10)}, primaryKeys=id_, options=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, null, 0.0000000000, 1.0000000000, 2.2200000000, 333.0000000000, 44.4400000000, 5555555.0000000000, null], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, null, 0.0000000000, 1.0000000000, 2.2200000000, 333.0000000000, 44.4400000000, 5555555.0000000000, null], after=[-1, null, 0.0000000000, 1.0000000000, 2.2200000000, 333.0000000000, 44.4400000000, 5555555.0000000000, null], op=UPDATE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[-1, null, 0.0000000000, 1.0000000000, 2.2200000000, 333.0000000000, 44.4400000000, 5555555.0000000000, null], after=[], op=DELETE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, 0.0000000000, 1.0000000000, 2.2200000000, 333.0000000000, 44.4400000000, 5555555.0000000000, null], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[0, null, 0.0000000000, 1.0000000000, 2.2200000000, 333.0000000000, 44.4400000000, 5555555.0000000000, null], after=[], op=DELETE, meta=()} +- do: Cast To Decimal(38, 18) (Decimal-19 mode) + decimal-precision-mode: up_to_19 + projection: |- + id_ + CAST(null AS DECIMAL(38, 18)) AS comp_0 + CAST(0 AS DECIMAL(38, 18)) AS comp_1 + CAST(1 AS DECIMAL(38, 18)) AS comp_2 + CAST('2.22' AS DECIMAL(38, 18)) AS comp_3 + CAST('333' AS DECIMAL(38, 18)) AS comp_4 + CAST('44.44' AS DECIMAL(38, 18)) AS comp_5 + CAST('5555555' AS DECIMAL(38, 18)) AS comp_6 + CAST('FOOBAR' AS DECIMAL(38, 18)) AS comp_7 + primary-key: id_ + expect: |- + CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_0` DECIMAL(19, 18),`comp_1` DECIMAL(19, 18),`comp_2` DECIMAL(19, 18),`comp_3` DECIMAL(19, 18),`comp_4` DECIMAL(19, 18),`comp_5` DECIMAL(19, 18),`comp_6` DECIMAL(19, 18),`comp_7` DECIMAL(19, 18)}, primaryKeys=id_, options=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, null, 0.000000000000000000, 1.000000000000000000, 2.220000000000000000, null, null, null, null], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, null, 0.000000000000000000, 1.000000000000000000, 2.220000000000000000, null, null, null, null], after=[-1, null, 0.000000000000000000, 1.000000000000000000, 2.220000000000000000, null, null, null, null], op=UPDATE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[-1, null, 0.000000000000000000, 1.000000000000000000, 2.220000000000000000, null, null, null, null], after=[], op=DELETE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, 0.000000000000000000, 1.000000000000000000, 2.220000000000000000, null, null, null, null], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[0, null, 0.000000000000000000, 1.000000000000000000, 2.220000000000000000, null, null, null, null], after=[], op=DELETE, meta=()} +- do: Cast To Decimal(38, 18) (Decimal-38 mode) + decimal-precision-mode: up_to_38 + projection: |- + id_ + CAST(null AS DECIMAL(38, 18)) AS comp_0 + CAST(0 AS DECIMAL(38, 18)) AS comp_1 + CAST(1 AS DECIMAL(38, 18)) AS comp_2 + CAST('2.22' AS DECIMAL(38, 18)) AS comp_3 + CAST('333' AS DECIMAL(38, 18)) AS comp_4 + CAST('44.44' AS DECIMAL(38, 18)) AS comp_5 + CAST('5555555' AS DECIMAL(38, 18)) AS comp_6 + CAST('FOOBAR' AS DECIMAL(38, 18)) AS comp_7 + primary-key: id_ + expect: |- + CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_0` DECIMAL(38, 18),`comp_1` DECIMAL(38, 18),`comp_2` DECIMAL(38, 18),`comp_3` DECIMAL(38, 18),`comp_4` DECIMAL(38, 18),`comp_5` DECIMAL(38, 18),`comp_6` DECIMAL(38, 18),`comp_7` DECIMAL(38, 18)}, primaryKeys=id_, options=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, null, 0.000000000000000000, 1.000000000000000000, 2.220000000000000000, 333.000000000000000000, 44.440000000000000000, 5555555.000000000000000000, null], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, null, 0.000000000000000000, 1.000000000000000000, 2.220000000000000000, 333.000000000000000000, 44.440000000000000000, 5555555.000000000000000000, null], after=[-1, null, 0.000000000000000000, 1.000000000000000000, 2.220000000000000000, 333.000000000000000000, 44.440000000000000000, 5555555.000000000000000000, null], op=UPDATE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[-1, null, 0.000000000000000000, 1.000000000000000000, 2.220000000000000000, 333.000000000000000000, 44.440000000000000000, 5555555.000000000000000000, null], after=[], op=DELETE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, 0.000000000000000000, 1.000000000000000000, 2.220000000000000000, 333.000000000000000000, 44.440000000000000000, 5555555.000000000000000000, null], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[0, null, 0.000000000000000000, 1.000000000000000000, 2.220000000000000000, 333.000000000000000000, 44.440000000000000000, 5555555.000000000000000000, null], after=[], op=DELETE, meta=()} - do: Cast To Timestamp (UTC) projection: |- id_ diff --git a/flink-cdc-composer/src/test/resources/specs/decimal.yaml b/flink-cdc-composer/src/test/resources/specs/decimal.yaml index f1418dd32c5..74c1230c706 100644 --- a/flink-cdc-composer/src/test/resources/specs/decimal.yaml +++ b/flink-cdc-composer/src/test/resources/specs/decimal.yaml @@ -13,7 +13,8 @@ # limitations under the License. ################################################################################ -- do: Add Op +- do: Add Op (Decimal-19 mode) + decimal-precision-mode: up_to_19 projection: |- id_ decimal_10_0_ + CAST(1 AS DECIMAL(1, 0)) AS comp_1 @@ -25,7 +26,23 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1234567891, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[1, 1234567891, null], after=[-1, -9876543209, null], op=UPDATE, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[-1, -9876543209, null], after=[], op=DELETE, meta=()} -- do: Subtract Op + +- do: Add Op (Decimal-38 mode) + decimal-precision-mode: up_to_38 + projection: |- + id_ + decimal_10_0_ + CAST(1 AS DECIMAL(1, 0)) AS comp_1 + decimal_20_2_ + CAST(1 AS DECIMAL(1, 0)) AS comp_2 + non-null: 'true' + primary-key: id_ + expect: |- + CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_1` DECIMAL(11, 0),`comp_2` DECIMAL(21, 2)}, primaryKeys=id_, options=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1234567891, 123456789012345679.90], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, 1234567891, 123456789012345679.90], after=[-1, -9876543209, -987654321098765431.10], op=UPDATE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[-1, -9876543209, -987654321098765431.10], after=[], op=DELETE, meta=()} + +- do: Subtract Op (Decimal-19 mode) + decimal-precision-mode: up_to_19 projection: |- id_ decimal_10_0_ - CAST(1 AS DECIMAL(1, 0)) AS comp_1 @@ -37,31 +54,83 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1234567889, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[1, 1234567889, null], after=[-1, -9876543211, null], op=UPDATE, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[-1, -9876543211, null], after=[], op=DELETE, meta=()} -- do: Multiply Op + +- do: Subtract Op (Decimal-38 mode) + decimal-precision-mode: up_to_38 projection: |- id_ + decimal_10_0_ - CAST(1 AS DECIMAL(1, 0)) AS comp_1 + decimal_20_2_ - CAST(1 AS DECIMAL(1, 0)) AS comp_2 + primary-key: id_ + non-null: 'true' + expect: |- + CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_1` DECIMAL(11, 0),`comp_2` DECIMAL(21, 2)}, primaryKeys=id_, options=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1234567889, 123456789012345677.90], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, 1234567889, 123456789012345677.90], after=[-1, -9876543211, -987654321098765433.10], op=UPDATE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[-1, -9876543211, -987654321098765433.10], after=[], op=DELETE, meta=()} + +- do: Multiply Op (Decimal-19 mode) + decimal-precision-mode: up_to_19 + projection: |- + id_ + decimal_10_0_, decimal_20_2_ decimal_10_0_ * CAST(2 AS DECIMAL(1, 0)) AS comp_1 decimal_20_2_ * CAST(2 AS DECIMAL(1, 0)) AS comp_2 primary-key: id_ non-null: 'true' expect: |- - CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_1` DECIMAL(11, 0),`comp_2` DECIMAL(19, 2)}, primaryKeys=id_, options=()} - DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 2469135780, null], op=INSERT, meta=()} - DataChangeEvent{tableId=foo.bar.baz, before=[1, 2469135780, null], after=[-1, -19753086420, null], op=UPDATE, meta=()} - DataChangeEvent{tableId=foo.bar.baz, before=[-1, -19753086420, null], after=[], op=DELETE, meta=()} -- do: Divide Op + CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`decimal_10_0_` DECIMAL(10, 0),`decimal_20_2_` DECIMAL(20, 2),`comp_1` DECIMAL(11, 0),`comp_2` DECIMAL(19, 2)}, primaryKeys=id_, options=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1234567890, 123456789012345678.90, 2469135780, null], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, 1234567890, 123456789012345678.90, 2469135780, null], after=[-1, -9876543210, -987654321098765432.10, -19753086420, null], op=UPDATE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[-1, -9876543210, -987654321098765432.10, -19753086420, null], after=[], op=DELETE, meta=()} + +- do: Multiply Op (Decimal-38 mode) + decimal-precision-mode: up_to_38 projection: |- id_ + decimal_10_0_, decimal_20_2_ + decimal_10_0_ * CAST(2 AS DECIMAL(1, 0)) AS comp_1 + decimal_20_2_ * CAST(2 AS DECIMAL(1, 0)) AS comp_2 + primary-key: id_ + non-null: 'true' + expect: |- + CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`decimal_10_0_` DECIMAL(10, 0),`decimal_20_2_` DECIMAL(20, 2),`comp_1` DECIMAL(11, 0),`comp_2` DECIMAL(21, 2)}, primaryKeys=id_, options=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1234567890, 123456789012345678.90, 2469135780, 246913578024691357.80], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, 1234567890, 123456789012345678.90, 2469135780, 246913578024691357.80], after=[-1, -9876543210, -987654321098765432.10, -19753086420, -1975308642197530864.20], op=UPDATE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[-1, -9876543210, -987654321098765432.10, -19753086420, -1975308642197530864.20], after=[], op=DELETE, meta=()} + +- do: Divide Op (Decimal-19 mode) + decimal-precision-mode: up_to_19 + projection: |- + id_ + decimal_10_0_, decimal_20_2_ + decimal_10_0_ / CAST(2 AS DECIMAL(1, 0)) AS comp_1 + decimal_20_2_ / CAST(2 AS DECIMAL(1, 0)) AS comp_2 + primary-key: id_ + non-null: 'true' + expect: |- + CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`decimal_10_0_` DECIMAL(10, 0),`decimal_20_2_` DECIMAL(20, 2),`comp_1` DECIMAL(16, 6),`comp_2` DECIMAL(19, 2)}, primaryKeys=id_, options=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1234567890, 123456789012345678.90, 617283945.000000, 61728394506172839.45], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, 1234567890, 123456789012345678.90, 617283945.000000, 61728394506172839.45], after=[-1, -9876543210, -987654321098765432.10, -4938271605.000000, null], op=UPDATE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[-1, -9876543210, -987654321098765432.10, -4938271605.000000, null], after=[], op=DELETE, meta=()} + +- do: Divide Op (Decimal-38 mode) + decimal-precision-mode: up_to_38 + projection: |- + id_ + decimal_10_0_, decimal_20_2_ decimal_10_0_ / CAST(2 AS DECIMAL(1, 0)) AS comp_1 decimal_20_2_ / CAST(2 AS DECIMAL(1, 0)) AS comp_2 primary-key: id_ non-null: 'true' expect: |- - CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_1` DECIMAL(16, 6),`comp_2` DECIMAL(19, 2)}, primaryKeys=id_, options=()} - DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 617.283945, 61728394506172839.45], op=INSERT, meta=()} - DataChangeEvent{tableId=foo.bar.baz, before=[1, 617.283945, 61728394506172839.45], after=[-1, -4938.271605, null], op=UPDATE, meta=()} - DataChangeEvent{tableId=foo.bar.baz, before=[-1, -4938.271605, null], after=[], op=DELETE, meta=()} -- do: Abs Op + CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`decimal_10_0_` DECIMAL(10, 0),`decimal_20_2_` DECIMAL(20, 2),`comp_1` DECIMAL(16, 6),`comp_2` DECIMAL(24, 6)}, primaryKeys=id_, options=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1234567890, 123456789012345678.90, 617283945.000000, 61728394506172839.450000], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, 1234567890, 123456789012345678.90, 617283945.000000, 61728394506172839.450000], after=[-1, -9876543210, -987654321098765432.10, -4938271605.000000, -493827160549382716.050000], op=UPDATE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[-1, -9876543210, -987654321098765432.10, -4938271605.000000, -493827160549382716.050000], after=[], op=DELETE, meta=()} + +- do: Abs Op (Decimal-19 mode) + decimal-precision-mode: up_to_19 projection: |- id_ ABS(decimal_10_0_) AS comp_1 @@ -74,7 +143,24 @@ DataChangeEvent{tableId=foo.bar.baz, before=[-1, 9876543210, null], after=[], op=DELETE, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()} -- do: Ceil Op + +- do: Abs Op (Decimal-38 mode) + decimal-precision-mode: up_to_38 + projection: |- + id_ + ABS(decimal_10_0_) AS comp_1 + ABS(decimal_20_2_) AS comp_2 + primary-key: id_ + expect: |- + CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_1` DECIMAL(10, 0),`comp_2` DECIMAL(20, 2)}, primaryKeys=id_, options=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1234567890, 123456789012345678.90], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, 1234567890, 123456789012345678.90], after=[-1, 9876543210, 987654321098765432.10], op=UPDATE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[-1, 9876543210, 987654321098765432.10], after=[], op=DELETE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()} + +- do: Ceil Op (Decimal-19 mode) + decimal-precision-mode: up_to_19 projection: |- id_ CEIL(decimal_10_0_) AS comp_1 @@ -87,7 +173,24 @@ DataChangeEvent{tableId=foo.bar.baz, before=[-1, -9876543210, -987654321098765432], after=[], op=DELETE, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()} -- do: Floor Op + +- do: Ceil Op (Decimal-38 mode) + decimal-precision-mode: up_to_38 + projection: |- + id_ + CEIL(decimal_10_0_) AS comp_1 + CEIL(decimal_20_2_) AS comp_2 + primary-key: id_ + expect: |- + CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_1` DECIMAL(10, 0),`comp_2` DECIMAL(20, 0)}, primaryKeys=id_, options=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1234567890, 123456789012345679], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, 1234567890, 123456789012345679], after=[-1, -9876543210, -987654321098765432], op=UPDATE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[-1, -9876543210, -987654321098765432], after=[], op=DELETE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()} + +- do: Floor Op (Decimal-19 mode) + decimal-precision-mode: up_to_19 projection: |- id_ FLOOR(decimal_10_0_) AS comp_1 @@ -100,16 +203,50 @@ DataChangeEvent{tableId=foo.bar.baz, before=[-1, -9876543210, -987654321098765433], after=[], op=DELETE, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()} -- do: Round Op + +- do: Floor Op (Decimal-38 mode) + decimal-precision-mode: up_to_38 projection: |- id_ - ROUND(decimal_10_0_, 1) AS comp_1 - ROUND(decimal_20_2_, 1) AS comp_2 + FLOOR(decimal_10_0_) AS comp_1 + FLOOR(decimal_20_2_) AS comp_2 primary-key: id_ expect: |- - CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_1` DECIMAL(10, 0),`comp_2` DECIMAL(19, 1)}, primaryKeys=id_, options=()} - DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 12345678900, 123456789012345678.9], op=INSERT, meta=()} - DataChangeEvent{tableId=foo.bar.baz, before=[1, 12345678900, 123456789012345678.9], after=[-1, -98765432100, -987654321098765432.1], op=UPDATE, meta=()} - DataChangeEvent{tableId=foo.bar.baz, before=[-1, -98765432100, -987654321098765432.1], after=[], op=DELETE, meta=()} + CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_1` DECIMAL(10, 0),`comp_2` DECIMAL(20, 0)}, primaryKeys=id_, options=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1234567890, 123456789012345678], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, 1234567890, 123456789012345678], after=[-1, -9876543210, -987654321098765433], op=UPDATE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[-1, -9876543210, -987654321098765433], after=[], op=DELETE, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()} + +- do: Round Op (Decimal-19 mode) + decimal-precision-mode: up_to_19 + projection: |- + id_ + decimal_10_0_, decimal_20_2_ + ROUND(decimal_10_0_, 1) AS comp_1 + ROUND(decimal_20_2_, 1) AS comp_2 + primary-key: id_ + expect: |- + CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`decimal_10_0_` DECIMAL(10, 0),`decimal_20_2_` DECIMAL(20, 2),`comp_1` DECIMAL(10, 0),`comp_2` DECIMAL(19, 1)}, primaryKeys=id_, options=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1234567890, 123456789012345678.90, 1234567890, 123456789012345678.9], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, 1234567890, 123456789012345678.90, 1234567890, 123456789012345678.9], after=[-1, -9876543210, -987654321098765432.10, -9876543210, -987654321098765432.1], op=UPDATE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[-1, -9876543210, -987654321098765432.10, -9876543210, -987654321098765432.1], after=[], op=DELETE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null, null, null], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null, null, null], after=[], op=DELETE, meta=()} + +- do: Round Op (Decimal-38 mode) + decimal-precision-mode: up_to_38 + projection: |- + id_ + decimal_10_0_, decimal_20_2_ + ROUND(decimal_10_0_, 1) AS comp_1 + ROUND(decimal_20_2_, 1) AS comp_2 + primary-key: id_ + expect: |- + CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`decimal_10_0_` DECIMAL(10, 0),`decimal_20_2_` DECIMAL(20, 2),`comp_1` DECIMAL(10, 0),`comp_2` DECIMAL(20, 1)}, primaryKeys=id_, options=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1234567890, 123456789012345678.90, 1234567890, 123456789012345678.9], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, 1234567890, 123456789012345678.90, 1234567890, 123456789012345678.9], after=[-1, -9876543210, -987654321098765432.10, -9876543210, -987654321098765432.1], op=UPDATE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[-1, -9876543210, -987654321098765432.10, -9876543210, -987654321098765432.1], after=[], op=DELETE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null, null, null], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null, null, null], after=[], op=DELETE, meta=()} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java index 0a8b63703b9..0bdfadef28c 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java @@ -29,6 +29,7 @@ import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.DecimalPrecisionMode; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.schema.Selectors; import org.apache.flink.cdc.common.udf.UserDefinedFunctionContext; @@ -71,6 +72,7 @@ public class PostTransformOperator extends AbstractStreamOperatorAdapter private static final long serialVersionUID = 1L; private final String timezone; + private final DecimalPrecisionMode decimalPrecisionMode; private final List transformRules; private final Map hasAsteriskMap; private final Map> projectedColumnsMap; @@ -98,8 +100,10 @@ public static PostTransformOperatorBuilder newBuilder() { PostTransformOperator( List transformRules, String timezone, + DecimalPrecisionMode decimalPrecisionMode, List>> udfFunctions) { this.timezone = timezone; + this.decimalPrecisionMode = decimalPrecisionMode; this.transformRules = transformRules; this.hasAsteriskMap = new HashMap<>(); this.projectedColumnsMap = new HashMap<>(); @@ -368,7 +372,8 @@ private Schema transformSchema(Schema preSchema, PostTransformer transformer) { .orElse(null), preSchema.getColumns(), udfDescriptors, - transformer.getSupportedMetadataColumns()); + transformer.getSupportedMetadataColumns(), + decimalPrecisionMode); return preSchema.copy( projectionColumns.stream() .map(ProjectionColumn::getColumn) @@ -445,6 +450,7 @@ private TransformProjectionProcessor getProjectionProcessor( .map(TransformProjection::getProjection) .orElse(null), timezone, + decimalPrecisionMode, udfDescriptors, udfFunctionInstances, postTransformer.getSupportedMetadataColumns())); @@ -460,7 +466,10 @@ private TransformFilterProcessor getFilterProcessor( TableId tableId, PostTransformer postTransformer) { if (!filterProcessors.contains(tableId, postTransformer)) { if (!postTransformer.getFilter().isPresent()) { - filterProcessors.put(tableId, postTransformer, TransformFilterProcessor.ofNoOp()); + filterProcessors.put( + tableId, + postTransformer, + TransformFilterProcessor.ofNoOp(decimalPrecisionMode)); } else { PostTransformChangeInfo changeInfo = postTransformInfoMap.get(tableId); filterProcessors.put( @@ -470,6 +479,7 @@ private TransformFilterProcessor getFilterProcessor( changeInfo, postTransformer.getFilter().orElse(null), timezone, + decimalPrecisionMode, udfDescriptors, udfFunctionInstances, postTransformer.getSupportedMetadataColumns())); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorBuilder.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorBuilder.java index 380d9343a74..748e77865f2 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorBuilder.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorBuilder.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.runtime.operators.transform; import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.cdc.common.pipeline.DecimalPrecisionMode; import org.apache.flink.cdc.common.pipeline.PipelineOptions; import org.apache.flink.cdc.common.source.SupportedMetadataColumn; @@ -32,6 +33,7 @@ public class PostTransformOperatorBuilder { private final List transformRules = new ArrayList<>(); private String timezone; + private DecimalPrecisionMode decimalPrecisionMode = DecimalPrecisionMode.UP_TO_19; private final List>> udfFunctions = new ArrayList<>(); @@ -105,6 +107,12 @@ public PostTransformOperatorBuilder addTimezone(String timezone) { return this; } + public PostTransformOperatorBuilder addDecimalPrecisionMode( + DecimalPrecisionMode decimalPrecisionMode) { + this.decimalPrecisionMode = decimalPrecisionMode; + return this; + } + public PostTransformOperatorBuilder addUdfFunctions( List>> udfFunctions) { this.udfFunctions.addAll(udfFunctions); @@ -112,6 +120,7 @@ public PostTransformOperatorBuilder addUdfFunctions( } public PostTransformOperator build() { - return new PostTransformOperator(transformRules, timezone, udfFunctions); + return new PostTransformOperator( + transformRules, timezone, decimalPrecisionMode, udfFunctions); } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java index 77dc5cfe6ef..a58ae47f339 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java @@ -19,6 +19,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.converter.JavaClassConverter; +import org.apache.flink.cdc.common.pipeline.DecimalPrecisionMode; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.source.SupportedMetadataColumn; import org.apache.flink.cdc.runtime.parser.JaninoCompiler; @@ -45,6 +46,7 @@ public class TransformFilterProcessor { private final PostTransformChangeInfo tableInfo; private final TransformFilter transformFilter; private final String timezone; + private final DecimalPrecisionMode decimalPrecisionMode; private final List udfFunctionInstances; private final Map supportedMetadataColumns; @@ -56,6 +58,7 @@ protected TransformFilterProcessor( PostTransformChangeInfo tableInfo, TransformFilter transformFilter, String timezone, + DecimalPrecisionMode decimalPrecisionMode, List udfDescriptors, List udfFunctionInstances, Map supportedMetadataColumns) { @@ -63,6 +66,7 @@ protected TransformFilterProcessor( this.tableInfo = tableInfo; this.transformFilter = transformFilter; this.timezone = timezone; + this.decimalPrecisionMode = decimalPrecisionMode; this.udfFunctionInstances = udfFunctionInstances; this.supportedMetadataColumns = supportedMetadataColumns; @@ -83,14 +87,16 @@ protected TransformFilterProcessor( } } - public static TransformFilterProcessor ofNoOp() { - return new TransformFilterProcessor(true, null, null, null, null, null, null); + public static TransformFilterProcessor ofNoOp(DecimalPrecisionMode decimalPrecisionMode) { + return new TransformFilterProcessor( + true, null, null, null, decimalPrecisionMode, null, null, null); } public static TransformFilterProcessor of( PostTransformChangeInfo tableInfo, TransformFilter transformFilter, String timezone, + DecimalPrecisionMode decimalPrecisionMode, List udfDescriptors, List udfFunctionInstances, SupportedMetadataColumn[] supportedMetadataColumns) { @@ -104,6 +110,7 @@ public static TransformFilterProcessor of( tableInfo, transformFilter, timezone, + decimalPrecisionMode, udfDescriptors, udfFunctionInstances, supportedMetadataColumnsMap); @@ -228,7 +235,8 @@ private TransformExpressionKey generateTransformExpressionKey( columns, udfDescriptors, supportedMetadataColumns, - transformFilter.getColumnNameMap()); + transformFilter.getColumnNameMap(), + decimalPrecisionMode); return TransformExpressionKey.of( transformFilter.getExpression(), diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java index 231e72875ac..e423822fd9c 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.runtime.operators.transform; +import org.apache.flink.cdc.common.pipeline.DecimalPrecisionMode; import org.apache.flink.cdc.common.source.SupportedMetadataColumn; import org.apache.flink.cdc.common.utils.Preconditions; import org.apache.flink.cdc.runtime.parser.TransformParser; @@ -47,6 +48,7 @@ public class TransformProjectionProcessor { private final PostTransformChangeInfo changeInfo; private final String projectionExpression; private final String timezone; + private final DecimalPrecisionMode decimalPrecisionMode; private final List udfDescriptors; private final List udfFunctionInstances; private final List columnProcessors; @@ -57,12 +59,14 @@ public TransformProjectionProcessor( PostTransformChangeInfo changeInfo, String projectionExpression, String timezone, + DecimalPrecisionMode decimalPrecisionMode, List udfDescriptors, List udfFunctionInstances, SupportedMetadataColumn[] supportedMetadataColumns) { this.changeInfo = changeInfo; this.projectionExpression = projectionExpression; this.timezone = timezone; + this.decimalPrecisionMode = decimalPrecisionMode; this.udfDescriptors = udfDescriptors; this.udfFunctionInstances = udfFunctionInstances; this.supportedMetadataColumns = supportedMetadataColumns; @@ -93,7 +97,8 @@ private List createProjectionColumnProcessors() { projectionExpression, changeInfo.getPreTransformedSchema().getColumns(), udfDescriptors, - supportedMetadataColumns); + supportedMetadataColumns, + decimalPrecisionMode); List columnProcessors = projectionColumns.stream() diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/FlinkCdcTypeSystem.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/FlinkCdcTypeSystem.java new file mode 100644 index 00000000000..8bc0f8aae4e --- /dev/null +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/FlinkCdcTypeSystem.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.parser; + +import org.apache.flink.cdc.common.pipeline.DecimalPrecisionMode; + +import org.apache.calcite.rel.type.RelDataTypeSystemImpl; + +/** A customized version of {@link org.apache.calcite.rel.type.RelDataTypeSystem}. */ +public class FlinkCdcTypeSystem extends RelDataTypeSystemImpl { + + public static final FlinkCdcTypeSystem UP_TO_38 = new FlinkCdcTypeSystem(38); + public static final FlinkCdcTypeSystem UP_TO_19 = new FlinkCdcTypeSystem(19); + + private final int maxPrecision; + + private FlinkCdcTypeSystem(int maxPrecision) { + this.maxPrecision = maxPrecision; + } + + public static FlinkCdcTypeSystem of(DecimalPrecisionMode mode) { + switch (mode) { + case UP_TO_38: + return UP_TO_38; + case UP_TO_19: + return UP_TO_19; + default: + throw new IllegalArgumentException("Unexpected decimal precision mode: " + mode); + } + } + + @Override + public int getMaxNumericPrecision() { + return maxPrecision; + } + + @Override + public int getMaxNumericScale() { + return maxPrecision; + } +} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java index 14c483d7082..3a7bfce1004 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.io.ParseException; import org.apache.flink.cdc.common.annotation.VisibleForTesting; import org.apache.flink.cdc.common.converter.JavaClassConverter; +import org.apache.flink.cdc.common.pipeline.DecimalPrecisionMode; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.source.SupportedMetadataColumn; import org.apache.flink.cdc.common.types.DataType; @@ -341,7 +342,8 @@ private static Java.Rvalue generateBinaryOperation( context.columns, sqlBasicCall, context.udfDescriptors, - context.supportedMetadataColumns); + context.supportedMetadataColumns, + context.decimalPrecisionMode); if (resultType.is(DataTypeRoot.DECIMAL)) { return new Java.MethodInvocation(Location.NOWHERE, null, handler, atoms); } @@ -490,7 +492,8 @@ private static Java.Rvalue generateItemAccessOperation( context.columns, sqlBasicCall, context.udfDescriptors, - context.supportedMetadataColumns); + context.supportedMetadataColumns, + context.decimalPrecisionMode); // Get the Java class for the result type and add a cast // Use getCanonicalName() to correctly handle array types (e.g., byte[] instead of "[B") @@ -678,23 +681,34 @@ public static class Context { // Readable metadata columns public final SupportedMetadataColumn[] supportedMetadataColumns; + // Maximum precision mode for DECIMAL type evaluation + public final DecimalPrecisionMode decimalPrecisionMode; + private Context( List columns, Map columnNameMap, List udfDescriptors, - SupportedMetadataColumn[] supportedMetadataColumns) { + SupportedMetadataColumn[] supportedMetadataColumns, + DecimalPrecisionMode decimalPrecisionMode) { this.columns = columns; this.columnNameMap = columnNameMap; this.udfDescriptors = udfDescriptors; this.supportedMetadataColumns = supportedMetadataColumns; + this.decimalPrecisionMode = decimalPrecisionMode; } public static Context of( List columns, Map columnNameMap, List udfDescriptors, - SupportedMetadataColumn[] supportedMetadataColumns) { - return new Context(columns, columnNameMap, udfDescriptors, supportedMetadataColumns); + SupportedMetadataColumn[] supportedMetadataColumns, + DecimalPrecisionMode decimalPrecisionMode) { + return new Context( + columns, + columnNameMap, + udfDescriptors, + supportedMetadataColumns, + decimalPrecisionMode); } } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java index 5ee7153013c..dc9953c3660 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.runtime.parser; import org.apache.flink.api.common.io.ParseException; +import org.apache.flink.cdc.common.pipeline.DecimalPrecisionMode; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.source.SupportedMetadataColumn; import org.apache.flink.cdc.common.types.DataType; @@ -110,7 +111,8 @@ private static RelNode sqlToRel( List columns, SqlNode sqlNode, List udfDescriptors, - SupportedMetadataColumn[] supportedMetadataColumns) { + SupportedMetadataColumn[] supportedMetadataColumns, + DecimalPrecisionMode decimalPrecisionMode) { List columnsWithMetadata = copyFillMetadataColumn(columns, supportedMetadataColumns); CalciteSchema rootSchema = CalciteSchema.createRootSchema(true); @@ -154,7 +156,8 @@ private static RelNode sqlToRel( throw new RuntimeException("Failed to resolve UDF: " + udf, e); } } - SqlTypeFactoryImpl factory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + RelDataTypeSystem typeSystem = FlinkCdcTypeSystem.of(decimalPrecisionMode); + SqlTypeFactoryImpl factory = new SqlTypeFactoryImpl(typeSystem); CalciteCatalogReader calciteCatalogReader = new CalciteCatalogReader( rootSchema, @@ -275,7 +278,8 @@ public static List generateProjectionColumns( String projectionExpression, List columns, List udfDescriptors, - SupportedMetadataColumn[] supportedMetadataColumns) { + SupportedMetadataColumn[] supportedMetadataColumns, + DecimalPrecisionMode decimalPrecisionMode) { if (isNullOrWhitespaceOnly(projectionExpression)) { return new ArrayList<>(); } @@ -285,7 +289,13 @@ public static List generateProjectionColumns( } expandWildcard(sqlSelect, columns); - RelNode relNode = sqlToRel(columns, sqlSelect, udfDescriptors, supportedMetadataColumns); + RelNode relNode = + sqlToRel( + columns, + sqlSelect, + udfDescriptors, + supportedMetadataColumns, + decimalPrecisionMode); RelDataType[] relDataTypes = relNode.getRowType().getFieldList().stream() .map(RelDataTypeField::getType) @@ -352,7 +362,8 @@ public static List generateProjectionColumns( columns, columnNameMap, udfDescriptors, - supportedMetadataColumns), + supportedMetadataColumns, + decimalPrecisionMode), exprNode), originalColumnNames, columnNameMap); @@ -431,7 +442,8 @@ public static String translateFilterExpressionToJaninoExpression( List columns, List udfDescriptors, SupportedMetadataColumn[] supportedMetadataColumns, - Map columnNameMap) { + Map columnNameMap, + DecimalPrecisionMode decimalPrecisionMode) { if (isNullOrWhitespaceOnly(filterExpression)) { return ""; } @@ -442,7 +454,11 @@ public static String translateFilterExpressionToJaninoExpression( SqlNode where = sqlSelect.getWhere(); return JaninoCompiler.translateSqlNodeToJaninoExpression( JaninoCompiler.Context.of( - columns, columnNameMap, udfDescriptors, supportedMetadataColumns), + columns, + columnNameMap, + udfDescriptors, + supportedMetadataColumns, + decimalPrecisionMode), where); } @@ -611,7 +627,8 @@ public static DataType deduceSubExpressionType( List columns, SqlNode subExpression, List udfDescriptors, - SupportedMetadataColumn[] supportedMetadataColumns) { + SupportedMetadataColumn[] supportedMetadataColumns, + DecimalPrecisionMode decimalPrecisionMode) { SqlSelect sqlSelect = new SqlSelect( SqlParserPos.QUOTED_ZERO, @@ -626,7 +643,13 @@ public static DataType deduceSubExpressionType( null, null, null); - RelNode relNode = sqlToRel(columns, sqlSelect, udfDescriptors, supportedMetadataColumns); + RelNode relNode = + sqlToRel( + columns, + sqlSelect, + udfDescriptors, + supportedMetadataColumns, + decimalPrecisionMode); RelDataType[] relDataTypes = relNode.getRowType().getFieldList().stream() .map(RelDataTypeField::getType) diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java index a522f4f345c..d2b6480172e 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.runtime.parser; import org.apache.flink.api.common.io.ParseException; +import org.apache.flink.cdc.common.pipeline.DecimalPrecisionMode; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.source.SupportedMetadataColumn; @@ -31,7 +32,6 @@ import org.apache.calcite.config.CalciteConnectionConfigImpl; import org.apache.calcite.jdbc.CalciteSchema; import org.apache.calcite.prepare.CalciteCatalogReader; -import org.apache.calcite.rel.type.RelDataTypeSystem; import org.apache.calcite.runtime.CalciteContextException; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlSelect; @@ -40,6 +40,8 @@ import org.apache.calcite.sql.validate.SqlValidatorUtil; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.util.Arrays; import java.util.Collections; @@ -69,8 +71,9 @@ void testCalciteParser() { Assertions.assertThat(parse.getWhere()).hasToString("`uniq_id` > 10 AND `id` IS NOT NULL"); } - @Test - void testTransformCalciteValidate() { + @ParameterizedTest(name = "Decimal mode: {0}") + @EnumSource + void testTransformCalciteValidate(DecimalPrecisionMode mode) { SqlSelect parse = TransformParser.parseSelect( "select SUBSTR(id, 1) as uniq_id, * from tb where id is not null"); @@ -83,7 +86,7 @@ void testTransformCalciteValidate() { TransformSchemaFactory.INSTANCE.create( rootSchema.plus(), "default_schema", operand); rootSchema.add("default_schema", schema); - SqlTypeFactoryImpl factory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + SqlTypeFactoryImpl factory = new SqlTypeFactoryImpl(FlinkCdcTypeSystem.of(mode)); CalciteCatalogReader calciteCatalogReader = new CalciteCatalogReader( rootSchema, @@ -111,8 +114,9 @@ void testTransformCalciteValidate() { + "WHERE `tb`.`id` IS NOT NULL"); } - @Test - void testCalciteRelNode() { + @ParameterizedTest(name = "Decimal mode: {0}") + @EnumSource + void testCalciteRelNode(DecimalPrecisionMode mode) { SqlSelect parse = TransformParser.parseSelect( "select SUBSTR(id, 1) as uniq_id, * from tb where id is not null"); @@ -125,7 +129,7 @@ void testCalciteRelNode() { TransformSchemaFactory.INSTANCE.create( rootSchema.plus(), "default_schema", operand); rootSchema.add("default_schema", schema); - SqlTypeFactoryImpl factory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + SqlTypeFactoryImpl factory = new SqlTypeFactoryImpl(FlinkCdcTypeSystem.of(mode)); CalciteCatalogReader calciteCatalogReader = new CalciteCatalogReader( rootSchema, @@ -170,229 +174,294 @@ void testParseFilterColumnNameList() { .isEqualTo(new String[] {"uniq_id", "id"}); } - @Test - void testTranslateFilterToJaninoExpression() { - testFilterExpression("id is not null", "null != id"); - testFilterExpression("id is null", "null == id"); - testFilterExpression("id = 1 and uid = 2", "valueEquals(id, 1) && valueEquals(uid, 2)"); - testFilterExpression("id = 1 or id = 2", "valueEquals(id, 1) || valueEquals(id, 2)"); - testFilterExpression("not (id = 1)", "!valueEquals(id, 1)"); - testFilterExpression("id = '1'", "valueEquals(id, \"1\")"); - testFilterExpression("id <> '1'", "!valueEquals(id, \"1\")"); - testFilterExpression("d between d1 and d2", "betweenAsymmetric(d, d1, d2)"); - testFilterExpression("d not between d1 and d2", "notBetweenAsymmetric(d, d1, d2)"); - testFilterExpression("d in (d1, d2)", "in(d, d1, d2)"); - testFilterExpression("d not in (d1, d2)", "notIn(d, d1, d2)"); - testFilterExpression("id is false", "false == id"); - testFilterExpression("id is not false", "true == id"); - testFilterExpression("id is true", "true == id"); - testFilterExpression("id is not true", "false == id"); - testFilterExpression("a || b", "concat(a, b)"); - testFilterExpression("CHAR_LENGTH(id)", "charLength(id)"); - testFilterExpression("trim(id)", "trim(\"BOTH\", \" \", id)"); - testFilterExpression( - "REGEXP_REPLACE(id, '[a-zA-Z]', '')", "regexpReplace(id, \"[a-zA-Z]\", \"\")"); - testFilterExpression("upper(id)", "upper(id)"); - testFilterExpression("lower(id)", "lower(id)"); - testFilterExpression("concat(a,b)", "concat(a, b)"); - testFilterExpression("SUBSTR(a,1)", "substr(a, 1)"); - testFilterExpression("id like '^[a-zA-Z]'", "like(id, \"^[a-zA-Z]\")"); - testFilterExpression("id not like '^[a-zA-Z]'", "notLike(id, \"^[a-zA-Z]\")"); - testFilterExpression("abs(2)", "abs(2)"); - testFilterExpression("ceil(2)", "ceil(2)"); - testFilterExpression("ceiling(2)", "ceil(2)"); - testFilterExpression("floor(2)", "floor(2)"); - testFilterExpression("round(2,2)", "round(2, 2)"); - testFilterExpression("uuid()", "uuid()"); - testFilterExpression( - "id = LOCALTIME", "valueEquals(id, localtime(__epoch_time__, __time_zone__))"); + @ParameterizedTest(name = "precision mode: {0}") + @EnumSource + void testTranslateFilterToJaninoExpression(DecimalPrecisionMode mode) { + testFilterExpression(mode, "id is not null", "null != id"); + testFilterExpression(mode, "id is null", "null == id"); + testFilterExpression( + mode, "id = 1 and uid = 2", "valueEquals(id, 1) && valueEquals(uid, 2)"); + testFilterExpression(mode, "id = 1 or id = 2", "valueEquals(id, 1) || valueEquals(id, 2)"); + testFilterExpression(mode, "not (id = 1)", "!valueEquals(id, 1)"); + testFilterExpression(mode, "id = '1'", "valueEquals(id, \"1\")"); + testFilterExpression(mode, "id <> '1'", "!valueEquals(id, \"1\")"); + testFilterExpression(mode, "d between d1 and d2", "betweenAsymmetric(d, d1, d2)"); + testFilterExpression(mode, "d not between d1 and d2", "notBetweenAsymmetric(d, d1, d2)"); + testFilterExpression(mode, "d in (d1, d2)", "in(d, d1, d2)"); + testFilterExpression(mode, "d not in (d1, d2)", "notIn(d, d1, d2)"); + testFilterExpression(mode, "id is false", "false == id"); + testFilterExpression(mode, "id is not false", "true == id"); + testFilterExpression(mode, "id is true", "true == id"); + testFilterExpression(mode, "id is not true", "false == id"); + testFilterExpression(mode, "a || b", "concat(a, b)"); + testFilterExpression(mode, "CHAR_LENGTH(id)", "charLength(id)"); + testFilterExpression(mode, "trim(id)", "trim(\"BOTH\", \" \", id)"); testFilterExpression( + mode, + "REGEXP_REPLACE(id, '[a-zA-Z]', '')", + "regexpReplace(id, \"[a-zA-Z]\", \"\")"); + testFilterExpression(mode, "upper(id)", "upper(id)"); + testFilterExpression(mode, "lower(id)", "lower(id)"); + testFilterExpression(mode, "concat(a,b)", "concat(a, b)"); + testFilterExpression(mode, "SUBSTR(a,1)", "substr(a, 1)"); + testFilterExpression(mode, "id like '^[a-zA-Z]'", "like(id, \"^[a-zA-Z]\")"); + testFilterExpression(mode, "id not like '^[a-zA-Z]'", "notLike(id, \"^[a-zA-Z]\")"); + testFilterExpression(mode, "abs(2)", "abs(2)"); + testFilterExpression(mode, "ceil(2)", "ceil(2)"); + testFilterExpression(mode, "ceiling(2)", "ceil(2)"); + testFilterExpression(mode, "floor(2)", "floor(2)"); + testFilterExpression(mode, "round(2,2)", "round(2, 2)"); + testFilterExpression(mode, "uuid()", "uuid()"); + testFilterExpression( + mode, + "id = LOCALTIME", + "valueEquals(id, localtime(__epoch_time__, __time_zone__))"); + testFilterExpression( + mode, "id = LOCALTIMESTAMP", "valueEquals(id, localtimestamp(__epoch_time__, __time_zone__))"); testFilterExpression( - "id = CURRENT_TIME", "valueEquals(id, currentTime(__epoch_time__, __time_zone__))"); + mode, + "id = CURRENT_TIME", + "valueEquals(id, currentTime(__epoch_time__, __time_zone__))"); testFilterExpression( - "id = CURRENT_DATE", "valueEquals(id, currentDate(__epoch_time__, __time_zone__))"); + mode, + "id = CURRENT_DATE", + "valueEquals(id, currentDate(__epoch_time__, __time_zone__))"); testFilterExpression( - "id = CURRENT_TIMESTAMP", "valueEquals(id, currentTimestamp(__epoch_time__))"); - testFilterExpression("NOW()", "now(__epoch_time__)"); - testFilterExpression("FROM_UNIXTIME(44)", "fromUnixtime(44, __time_zone__)"); + mode, + "id = CURRENT_TIMESTAMP", + "valueEquals(id, currentTimestamp(__epoch_time__))"); + testFilterExpression(mode, "NOW()", "now(__epoch_time__)"); + testFilterExpression(mode, "FROM_UNIXTIME(44)", "fromUnixtime(44, __time_zone__)"); testFilterExpression( + mode, "FROM_UNIXTIME(44, 'yyyy/MM/dd HH:mm:ss')", "fromUnixtime(44, \"yyyy/MM/dd HH:mm:ss\", __time_zone__)"); - testFilterExpression("UNIX_TIMESTAMP()", "unixTimestamp(__epoch_time__, __time_zone__)"); testFilterExpression( + mode, "UNIX_TIMESTAMP()", "unixTimestamp(__epoch_time__, __time_zone__)"); + testFilterExpression( + mode, "UNIX_TIMESTAMP('1970-01-01 08:00:01')", "unixTimestamp(\"1970-01-01 08:00:01\", __epoch_time__, __time_zone__)"); testFilterExpression( + mode, "UNIX_TIMESTAMP('1970-01-01 08:00:01.001 +0800', 'yyyy-MM-dd HH:mm:ss.SSS X')", "unixTimestamp(\"1970-01-01 08:00:01.001 +0800\", \"yyyy-MM-dd HH:mm:ss.SSS X\", __epoch_time__, __time_zone__)"); - testFilterExpression("YEAR(dt)", "year(dt)"); - testFilterExpression("QUARTER(dt)", "quarter(dt)"); - testFilterExpression("MONTH(dt)", "month(dt)"); - testFilterExpression("WEEK(dt)", "week(dt)"); + testFilterExpression(mode, "YEAR(dt)", "year(dt)"); + testFilterExpression(mode, "QUARTER(dt)", "quarter(dt)"); + testFilterExpression(mode, "MONTH(dt)", "month(dt)"); + testFilterExpression(mode, "WEEK(dt)", "week(dt)"); testFilterExpression( - "DATE_FORMAT(dt,'yyyy-MM-dd')", "dateFormat(dt, \"yyyy-MM-dd\", __time_zone__)"); - testFilterExpression("TO_DATE(dt, 'yyyy-MM-dd')", "toDate(dt, \"yyyy-MM-dd\")"); - testFilterExpression("TO_TIMESTAMP(dt)", "toTimestamp(dt, __time_zone__)"); + mode, + "DATE_FORMAT(dt,'yyyy-MM-dd')", + "dateFormat(dt, \"yyyy-MM-dd\", __time_zone__)"); + testFilterExpression(mode, "TO_DATE(dt, 'yyyy-MM-dd')", "toDate(dt, \"yyyy-MM-dd\")"); + testFilterExpression(mode, "TO_TIMESTAMP(dt)", "toTimestamp(dt, __time_zone__)"); testFilterExpression( + mode, "TIMESTAMP_DIFF('SECOND', dt1, dt2)", "timestampDiff(\"SECOND\", dt1, dt2, __time_zone__)"); testFilterExpression( + mode, "timestamp_diff('second', dt1, dt2)", "timestampDiff(\"second\", dt1, dt2, __time_zone__)"); testFilterExpression( + mode, "TIMESTAMP_DIFF('MINUTE', dt1, dt2)", "timestampDiff(\"MINUTE\", dt1, dt2, __time_zone__)"); testFilterExpression( + mode, "timestamp_diff('minute', dt1, dt2)", "timestampDiff(\"minute\", dt1, dt2, __time_zone__)"); testFilterExpression( + mode, "TIMESTAMP_DIFF('HOUR', dt1, dt2)", "timestampDiff(\"HOUR\", dt1, dt2, __time_zone__)"); testFilterExpression( + mode, "timestamp_diff('hour', dt1, dt2)", "timestampDiff(\"hour\", dt1, dt2, __time_zone__)"); testFilterExpression( + mode, "TIMESTAMP_DIFF('DAY', dt1, dt2)", "timestampDiff(\"DAY\", dt1, dt2, __time_zone__)"); testFilterExpression( + mode, "timestamp_diff('day', dt1, dt2)", "timestampDiff(\"day\", dt1, dt2, __time_zone__)"); testFilterExpression( + mode, "TIMESTAMP_DIFF('MONTH', dt1, dt2)", "timestampDiff(\"MONTH\", dt1, dt2, __time_zone__)"); testFilterExpression( + mode, "timestamp_diff('month', dt1, dt2)", "timestampDiff(\"month\", dt1, dt2, __time_zone__)"); testFilterExpression( + mode, "TIMESTAMP_DIFF('YEAR', dt1, dt2)", "timestampDiff(\"YEAR\", dt1, dt2, __time_zone__)"); testFilterExpression( + mode, "timestamp_diff('year', dt1, dt2)", "timestampDiff(\"year\", dt1, dt2, __time_zone__)"); testFilterExpression( + mode, "TIMESTAMPDIFF(SECOND, dt1, dt2)", "timestampdiff(\"SECOND\", dt1, dt2, __time_zone__)"); testFilterExpression( + mode, "timestampdiff(second, dt1, dt2)", "timestampdiff(\"SECOND\", dt1, dt2, __time_zone__)"); testFilterExpression( + mode, "TIMESTAMPDIFF(MINUTE, dt1, dt2)", "timestampdiff(\"MINUTE\", dt1, dt2, __time_zone__)"); testFilterExpression( + mode, "timestampdiff(minute, dt1, dt2)", "timestampdiff(\"MINUTE\", dt1, dt2, __time_zone__)"); testFilterExpression( + mode, "TIMESTAMPDIFF(HOUR, dt1, dt2)", "timestampdiff(\"HOUR\", dt1, dt2, __time_zone__)"); testFilterExpression( + mode, "timestampdiff(hour, dt1, dt2)", "timestampdiff(\"HOUR\", dt1, dt2, __time_zone__)"); testFilterExpression( - "TIMESTAMPDIFF(DAY, dt1, dt2)", "timestampdiff(\"DAY\", dt1, dt2, __time_zone__)"); + mode, + "TIMESTAMPDIFF(DAY, dt1, dt2)", + "timestampdiff(\"DAY\", dt1, dt2, __time_zone__)"); testFilterExpression( - "timestampdiff(day, dt1, dt2)", "timestampdiff(\"DAY\", dt1, dt2, __time_zone__)"); + mode, + "timestampdiff(day, dt1, dt2)", + "timestampdiff(\"DAY\", dt1, dt2, __time_zone__)"); testFilterExpression( + mode, "TIMESTAMPDIFF(MONTH, dt1, dt2)", "timestampdiff(\"MONTH\", dt1, dt2, __time_zone__)"); testFilterExpression( + mode, "timestampdiff(month, dt1, dt2)", "timestampdiff(\"MONTH\", dt1, dt2, __time_zone__)"); testFilterExpression( + mode, "TIMESTAMPDIFF(YEAR, dt1, dt2)", "timestampdiff(\"YEAR\", dt1, dt2, __time_zone__)"); testFilterExpression( + mode, "timestampdiff(year, dt1, dt2)", "timestampdiff(\"YEAR\", dt1, dt2, __time_zone__)"); testFilterExpression( - "TIMESTAMPADD(SECOND, 1, dt)", "timestampadd(\"SECOND\", 1, dt, __time_zone__)"); + mode, + "TIMESTAMPADD(SECOND, 1, dt)", + "timestampadd(\"SECOND\", 1, dt, __time_zone__)"); + testFilterExpression( + mode, + "timestampadd(second, 1, dt)", + "timestampadd(\"SECOND\", 1, dt, __time_zone__)"); testFilterExpression( - "timestampadd(second, 1, dt)", "timestampadd(\"SECOND\", 1, dt, __time_zone__)"); + mode, + "TIMESTAMPADD(MINUTE, 1, dt)", + "timestampadd(\"MINUTE\", 1, dt, __time_zone__)"); testFilterExpression( - "TIMESTAMPADD(MINUTE, 1, dt)", "timestampadd(\"MINUTE\", 1, dt, __time_zone__)"); + mode, + "timestampadd(minute, 1, dt)", + "timestampadd(\"MINUTE\", 1, dt, __time_zone__)"); testFilterExpression( - "timestampadd(minute, 1, dt)", "timestampadd(\"MINUTE\", 1, dt, __time_zone__)"); + mode, "TIMESTAMPADD(HOUR, 1, dt)", "timestampadd(\"HOUR\", 1, dt, __time_zone__)"); testFilterExpression( - "TIMESTAMPADD(HOUR, 1, dt)", "timestampadd(\"HOUR\", 1, dt, __time_zone__)"); + mode, "timestampadd(hour, 1, dt)", "timestampadd(\"HOUR\", 1, dt, __time_zone__)"); testFilterExpression( - "timestampadd(hour, 1, dt)", "timestampadd(\"HOUR\", 1, dt, __time_zone__)"); + mode, "TIMESTAMPADD(DAY, 1, dt)", "timestampadd(\"DAY\", 1, dt, __time_zone__)"); testFilterExpression( - "TIMESTAMPADD(DAY, 1, dt)", "timestampadd(\"DAY\", 1, dt, __time_zone__)"); + mode, "timestampadd(day, 1, dt)", "timestampadd(\"DAY\", 1, dt, __time_zone__)"); testFilterExpression( - "timestampadd(day, 1, dt)", "timestampadd(\"DAY\", 1, dt, __time_zone__)"); + mode, + "TIMESTAMPADD(MONTH, 1, dt)", + "timestampadd(\"MONTH\", 1, dt, __time_zone__)"); testFilterExpression( - "TIMESTAMPADD(MONTH, 1, dt)", "timestampadd(\"MONTH\", 1, dt, __time_zone__)"); + mode, + "timestampadd(month, 1, dt)", + "timestampadd(\"MONTH\", 1, dt, __time_zone__)"); testFilterExpression( - "timestampadd(month, 1, dt)", "timestampadd(\"MONTH\", 1, dt, __time_zone__)"); + mode, "TIMESTAMPADD(YEAR, 1, dt)", "timestampadd(\"YEAR\", 1, dt, __time_zone__)"); testFilterExpression( - "TIMESTAMPADD(YEAR, 1, dt)", "timestampadd(\"YEAR\", 1, dt, __time_zone__)"); + mode, "timestampadd(year, 1, dt)", "timestampadd(\"YEAR\", 1, dt, __time_zone__)"); + testFilterExpression(mode, "IF(a>b,a,b)", "greaterThan(a, b) ? a : b"); + testFilterExpression(mode, "NULLIF(a,b)", "nullif(a, b)"); + testFilterExpression(mode, "COALESCE(a,b,c)", "coalesce(a, b, c)"); + testFilterExpression(mode, "id + 2", "id + 2"); + testFilterExpression(mode, "id - 2", "id - 2"); + testFilterExpression(mode, "id * 2", "id * 2"); + testFilterExpression(mode, "id / 2", "id / 2"); + testFilterExpression(mode, "id % 2", "id % 2"); + testFilterExpression(mode, "a < b", "lessThan(a, b)"); + testFilterExpression(mode, "a <= b", "lessThanOrEqual(a, b)"); + testFilterExpression(mode, "a > b", "greaterThan(a, b)"); + testFilterExpression(mode, "a >= b", "greaterThanOrEqual(a, b)"); + testFilterExpression(mode, "__table_name__ = 'tb'", "valueEquals(__table_name__, \"tb\")"); testFilterExpression( - "timestampadd(year, 1, dt)", "timestampadd(\"YEAR\", 1, dt, __time_zone__)"); - testFilterExpression("IF(a>b,a,b)", "greaterThan(a, b) ? a : b"); - testFilterExpression("NULLIF(a,b)", "nullif(a, b)"); - testFilterExpression("COALESCE(a,b,c)", "coalesce(a, b, c)"); - testFilterExpression("id + 2", "id + 2"); - testFilterExpression("id - 2", "id - 2"); - testFilterExpression("id * 2", "id * 2"); - testFilterExpression("id / 2", "id / 2"); - testFilterExpression("id % 2", "id % 2"); - testFilterExpression("a < b", "lessThan(a, b)"); - testFilterExpression("a <= b", "lessThanOrEqual(a, b)"); - testFilterExpression("a > b", "greaterThan(a, b)"); - testFilterExpression("a >= b", "greaterThanOrEqual(a, b)"); - testFilterExpression("__table_name__ = 'tb'", "valueEquals(__table_name__, \"tb\")"); - testFilterExpression("__schema_name__ = 'tb'", "valueEquals(__schema_name__, \"tb\")"); + mode, "__schema_name__ = 'tb'", "valueEquals(__schema_name__, \"tb\")"); testFilterExpression( - "__namespace_name__ = 'tb'", "valueEquals(__namespace_name__, \"tb\")"); - testFilterExpression("upper(lower(id))", "upper(lower(id))"); + mode, "__namespace_name__ = 'tb'", "valueEquals(__namespace_name__, \"tb\")"); + testFilterExpression(mode, "upper(lower(id))", "upper(lower(id))"); testFilterExpression( + mode, "abs(uniq_id) > 10 and id is not null", "greaterThan(abs(uniq_id), 10) && null != id"); testFilterExpression( + mode, "case id when 1 then 'a' when 2 then 'b' else 'c' end", "(valueEquals(id, 1) ? \"a\" : valueEquals(id, 2) ? \"b\" : \"c\")"); testFilterExpression( + mode, "case when id = 1 then 'a' when id = 2 then 'b' else 'c' end", "(valueEquals(id, 1) ? \"a\" : valueEquals(id, 2) ? \"b\" : \"c\")"); testFilterExpression( + mode, "case id when 1 then 'a' when 2 then 'b' else 'c' end", "(valueEquals(id, 1) ? \"a\" : valueEquals(id, 2) ? \"b\" : \"c\")"); testFilterExpression( + mode, "case when id = 1 then 'a' when id = 2 then 'b' else 'c' end", "(valueEquals(id, 1) ? \"a\" : valueEquals(id, 2) ? \"b\" : \"c\")"); - testFilterExpression("cast(id||'0' as int)", "castToInteger(concat(id, \"0\"))"); - testFilterExpression("cast(1 as string)", "castToString(1)"); - testFilterExpression("cast(1 as boolean)", "castToBoolean(1)"); - testFilterExpression("cast(1 as tinyint)", "castToByte(1)"); - testFilterExpression("cast(1 as smallint)", "castToShort(1)"); - testFilterExpression("cast(1 as bigint)", "castToLong(1)"); - testFilterExpression("cast(1 as float)", "castToFloat(1)"); - testFilterExpression("cast(1 as double)", "castToDouble(1)"); - testFilterExpression("cast(1 as decimal)", "castToBigDecimal(1, 10, 0)"); - testFilterExpression("cast(1 as char)", "castToString(1)"); - testFilterExpression("cast(1 as varchar)", "castToString(1)"); - testFilterExpression("cast(null as int)", "castToInteger(null)"); - testFilterExpression("cast(null as string)", "castToString(null)"); - testFilterExpression("cast(null as boolean)", "castToBoolean(null)"); - testFilterExpression("cast(null as tinyint)", "castToByte(null)"); - testFilterExpression("cast(null as smallint)", "castToShort(null)"); - testFilterExpression("cast(null as bigint)", "castToLong(null)"); - testFilterExpression("cast(null as float)", "castToFloat(null)"); - testFilterExpression("cast(null as double)", "castToDouble(null)"); - testFilterExpression("cast(null as decimal)", "castToBigDecimal(null, 10, 0)"); - testFilterExpression("cast(null as char)", "castToString(null)"); - testFilterExpression("cast(null as varchar)", "castToString(null)"); + testFilterExpression(mode, "cast(id||'0' as int)", "castToInteger(concat(id, \"0\"))"); + testFilterExpression(mode, "cast(1 as string)", "castToString(1)"); + testFilterExpression(mode, "cast(1 as boolean)", "castToBoolean(1)"); + testFilterExpression(mode, "cast(1 as tinyint)", "castToByte(1)"); + testFilterExpression(mode, "cast(1 as smallint)", "castToShort(1)"); + testFilterExpression(mode, "cast(1 as bigint)", "castToLong(1)"); + testFilterExpression(mode, "cast(1 as float)", "castToFloat(1)"); + testFilterExpression(mode, "cast(1 as double)", "castToDouble(1)"); + testFilterExpression(mode, "cast(1 as decimal)", "castToBigDecimal(1, 10, 0)"); + testFilterExpression(mode, "cast(1 as char)", "castToString(1)"); + testFilterExpression(mode, "cast(1 as varchar)", "castToString(1)"); + testFilterExpression(mode, "cast(null as int)", "castToInteger(null)"); + testFilterExpression(mode, "cast(null as string)", "castToString(null)"); + testFilterExpression(mode, "cast(null as boolean)", "castToBoolean(null)"); + testFilterExpression(mode, "cast(null as tinyint)", "castToByte(null)"); + testFilterExpression(mode, "cast(null as smallint)", "castToShort(null)"); + testFilterExpression(mode, "cast(null as bigint)", "castToLong(null)"); + testFilterExpression(mode, "cast(null as float)", "castToFloat(null)"); + testFilterExpression(mode, "cast(null as double)", "castToDouble(null)"); + testFilterExpression(mode, "cast(null as decimal)", "castToBigDecimal(null, 10, 0)"); + testFilterExpression(mode, "cast(null as char)", "castToString(null)"); + testFilterExpression(mode, "cast(null as varchar)", "castToString(null)"); testFilterExpression( + mode, "cast(CURRENT_TIMESTAMP as TIMESTAMP)", "castToTimestamp(currentTimestamp(__epoch_time__), __time_zone__)"); - testFilterExpression("cast(dt as TIMESTAMP)", "castToTimestamp(dt, __time_zone__)"); - testFilterExpression("parse_json(jsonStr)", "parseJson(jsonStr)"); - testFilterExpression("try_parse_json(jsonStr)", "tryParseJson(jsonStr)"); + testFilterExpression(mode, "cast(dt as TIMESTAMP)", "castToTimestamp(dt, __time_zone__)"); + testFilterExpression(mode, "parse_json(jsonStr)", "parseJson(jsonStr)"); + testFilterExpression(mode, "try_parse_json(jsonStr)", "tryParseJson(jsonStr)"); } - @Test - public void testTranslateItemAccessToJaninoExpression() { + @ParameterizedTest(name = "precision mode: {0}") + @EnumSource + public void testTranslateItemAccessToJaninoExpression(DecimalPrecisionMode mode) { // Test collection access functions (ARRAY, MAP) with proper column schema List columns = List.of( @@ -404,21 +473,26 @@ public void testTranslateItemAccessToJaninoExpression() { // Array access: array[index] - index is 1-based (SQL standard) // Result type is String (from ARRAY), so cast is added - testFilterExpressionWithColumns("arr[1]", "(java.lang.String) itemAccess(arr, 1)", columns); - testFilterExpressionWithColumns("arr[2]", "(java.lang.String) itemAccess(arr, 2)", columns); testFilterExpressionWithColumns( - "arr[idx]", "(java.lang.String) itemAccess(arr, idx)", columns); + mode, "arr[1]", "(java.lang.String) itemAccess(arr, 1)", columns); + testFilterExpressionWithColumns( + mode, "arr[2]", "(java.lang.String) itemAccess(arr, 2)", columns); + testFilterExpressionWithColumns( + mode, "arr[idx]", "(java.lang.String) itemAccess(arr, idx)", columns); // Map access: map[key] // Result type is Integer (from MAP), so cast is added testFilterExpressionWithColumns( - "m['key']", "(java.lang.Integer) itemAccess(m, \"key\")", columns); - testFilterExpressionWithColumns("m[k]", "(java.lang.Integer) itemAccess(m, k)", columns); + mode, "m['key']", "(java.lang.Integer) itemAccess(m, \"key\")", columns); + testFilterExpressionWithColumns( + mode, "m[k]", "(java.lang.Integer) itemAccess(m, k)", columns); // Nested access with comparisons testFilterExpressionWithColumns( + mode, "arr[1] = 'value'", "valueEquals((java.lang.String) itemAccess(arr, 1), \"value\")", columns); testFilterExpressionWithColumns( + mode, "m['key'] > 10", "greaterThan((java.lang.Integer) itemAccess(m, \"key\"), 10)", columns); @@ -426,80 +500,86 @@ public void testTranslateItemAccessToJaninoExpression() { List binaryArrayColumns = List.of(Column.physicalColumn("binArr", DataTypes.ARRAY(DataTypes.BINARY(16)))); testFilterExpressionWithColumns( - "binArr[1]", "(byte[]) itemAccess(binArr, 1)", binaryArrayColumns); + mode, "binArr[1]", "(byte[]) itemAccess(binArr, 1)", binaryArrayColumns); // Variant access tests List variantColumns = List.of(Column.physicalColumn("v", DataTypes.VARIANT())); testFilterExpressionWithColumns( + mode, "v['key']", "(org.apache.flink.cdc.common.types.variant.Variant) itemAccess(v, \"key\")", variantColumns); testFilterExpressionWithColumns( + mode, "v[1]", "(org.apache.flink.cdc.common.types.variant.Variant) itemAccess(v, 1)", variantColumns); testFilterExpressionWithColumns( + mode, "v['a']['b']", "(org.apache.flink.cdc.common.types.variant.Variant) itemAccess((org.apache.flink.cdc.common.types.variant.Variant) itemAccess(v, \"a\"), \"b\")", variantColumns); testFilterExpressionWithColumns( + mode, "parse_json('{\"key\": \"value\"}')['key']", "(org.apache.flink.cdc.common.types.variant.Variant) itemAccess(parseJson(\"{\\\"key\\\": \\\"value\\\"}\"), \"key\")", Collections.emptyList()); } - @Test - public void testTranslateFilterToJaninoExpressionError() { + @ParameterizedTest(name = "precision mode: {0}") + @EnumSource + public void testTranslateFilterToJaninoExpressionError(DecimalPrecisionMode mode) { Assertions.assertThatThrownBy( - () -> { - TransformParser.translateFilterExpressionToJaninoExpression( - "TIMESTAMPDIFF(SECONDS, dt1, dt2)", - Collections.emptyList(), - Collections.emptyList(), - new SupportedMetadataColumn[0], - Collections.emptyMap()); - }) + () -> + TransformParser.translateFilterExpressionToJaninoExpression( + "TIMESTAMPDIFF(SECONDS, dt1, dt2)", + Collections.emptyList(), + Collections.emptyList(), + new SupportedMetadataColumn[0], + Collections.emptyMap(), + mode)) .isExactlyInstanceOf(ParseException.class) .hasMessage("Statements can not be parsed."); Assertions.assertThatThrownBy( - () -> { - TransformParser.translateFilterExpressionToJaninoExpression( - "TIMESTAMPDIFF(QUARTER, dt1, dt2)", - Collections.emptyList(), - Collections.emptyList(), - new SupportedMetadataColumn[0], - Collections.emptyMap()); - }) + () -> + TransformParser.translateFilterExpressionToJaninoExpression( + "TIMESTAMPDIFF(QUARTER, dt1, dt2)", + Collections.emptyList(), + Collections.emptyList(), + new SupportedMetadataColumn[0], + Collections.emptyMap(), + mode)) .isExactlyInstanceOf(ParseException.class) .hasMessage( "Unsupported time interval unit in timestamp diff function: \"QUARTER\""); Assertions.assertThatThrownBy( - () -> { - TransformParser.translateFilterExpressionToJaninoExpression( - "TIMESTAMPADD(SECONDS, dt1, dt2)", - Collections.emptyList(), - Collections.emptyList(), - new SupportedMetadataColumn[0], - Collections.emptyMap()); - }) + () -> + TransformParser.translateFilterExpressionToJaninoExpression( + "TIMESTAMPADD(SECONDS, dt1, dt2)", + Collections.emptyList(), + Collections.emptyList(), + new SupportedMetadataColumn[0], + Collections.emptyMap(), + mode)) .isExactlyInstanceOf(ParseException.class) .hasMessage("Statements can not be parsed."); Assertions.assertThatThrownBy( - () -> { - TransformParser.translateFilterExpressionToJaninoExpression( - "TIMESTAMPADD(QUARTER, dt1, dt2)", - Collections.emptyList(), - Collections.emptyList(), - new SupportedMetadataColumn[0], - Collections.emptyMap()); - }) + () -> + TransformParser.translateFilterExpressionToJaninoExpression( + "TIMESTAMPADD(QUARTER, dt1, dt2)", + Collections.emptyList(), + Collections.emptyList(), + new SupportedMetadataColumn[0], + Collections.emptyMap(), + mode)) .isExactlyInstanceOf(ParseException.class) .hasMessage( "Unsupported time interval unit in timestamp add function: \"QUARTER\""); } - @Test - void testGenerateProjectionColumns() { + @ParameterizedTest(name = "precision mode: {0}") + @EnumSource + void testGenerateProjectionColumns(DecimalPrecisionMode mode) { List testColumns = Arrays.asList( Column.physicalColumn("id", DataTypes.INT(), "id"), @@ -518,7 +598,8 @@ void testGenerateProjectionColumns() { "id, upper(name) as name, age + 1 as newage, createTime as newCreateTime, address as newAddress, deposit as deposits, weight / (height * height) as bmi", testColumns, Collections.emptyList(), - new SupportedMetadataColumn[0]); + new SupportedMetadataColumn[0], + mode); List expected = Arrays.asList( @@ -536,7 +617,8 @@ void testGenerateProjectionColumns() { "*, __namespace_name__, __schema_name__, __table_name__, __data_event_type__ AS op_type", testColumns, Collections.emptyList(), - new SupportedMetadataColumn[0]); + new SupportedMetadataColumn[0], + mode); List metadataExpected = Arrays.asList( @@ -563,14 +645,16 @@ void testGenerateProjectionColumns() { "id, 1 + 1", testColumns, Collections.emptyList(), - new SupportedMetadataColumn[0])) + new SupportedMetadataColumn[0], + mode)) .isExactlyInstanceOf(IllegalArgumentException.class) .hasMessage( "Unrecognized projection expression: 1 + 1. Should be AS "); } - @Test - public void testGenerateProjectionColumnsWithPrecision() { + @ParameterizedTest(name = "precision mode: {0}") + @EnumSource + public void testGenerateProjectionColumnsWithPrecision(DecimalPrecisionMode mode) { List testColumns = Arrays.asList( Column.physicalColumn("id", DataTypes.INT(), "id"), @@ -589,7 +673,8 @@ public void testGenerateProjectionColumnsWithPrecision() { "id, UPPER(name) as name2, UPPER(sex) as sex2, COALESCE(address,address) as address2, COALESCE(phone,phone) as phone2, COALESCE(deposit,deposit) as deposit2, COALESCE(birthday,birthday) as birthday2, COALESCE(birthday_ltz,birthday_ltz) as birthday_ltz2, COALESCE(update_time,update_time) as update_time2", testColumns, Collections.emptyList(), - new SupportedMetadataColumn[0]); + new SupportedMetadataColumn[0], + mode); List expected = Arrays.asList( @@ -641,52 +726,59 @@ void testGenerateReferencedColumns() { .isExactlyInstanceOf(ParseException.class); } - @Test - void testTranslateUdfFilterToJaninoExpression() { + @ParameterizedTest(name = "precision mode: {0}") + @EnumSource + void testTranslateUdfFilterToJaninoExpression(DecimalPrecisionMode mode) { testFilterExpressionWithUdf( - "format(upper(id))", "__instanceOfFormatFunctionClass.eval(upper(id))"); + mode, "format(upper(id))", "__instanceOfFormatFunctionClass.eval(upper(id))"); testFilterExpressionWithUdf( - "format(lower(id))", "__instanceOfFormatFunctionClass.eval(lower(id))"); + mode, "format(lower(id))", "__instanceOfFormatFunctionClass.eval(lower(id))"); testFilterExpressionWithUdf( - "format(concat(a,b))", "__instanceOfFormatFunctionClass.eval(concat(a, b))"); + mode, "format(concat(a,b))", "__instanceOfFormatFunctionClass.eval(concat(a, b))"); testFilterExpressionWithUdf( - "format(SUBSTR(a,1))", "__instanceOfFormatFunctionClass.eval(substr(a, 1))"); + mode, "format(SUBSTR(a,1))", "__instanceOfFormatFunctionClass.eval(substr(a, 1))"); testFilterExpressionWithUdf( + mode, "typeof(id like '^[a-zA-Z]')", "__instanceOfTypeOfFunctionClass.eval(like(id, \"^[a-zA-Z]\"))"); testFilterExpressionWithUdf( + mode, "typeof(id not like '^[a-zA-Z]')", "__instanceOfTypeOfFunctionClass.eval(notLike(id, \"^[a-zA-Z]\"))"); testFilterExpressionWithUdf( - "typeof(abs(2))", "__instanceOfTypeOfFunctionClass.eval(abs(2))"); + mode, "typeof(abs(2))", "__instanceOfTypeOfFunctionClass.eval(abs(2))"); testFilterExpressionWithUdf( - "typeof(ceil(2))", "__instanceOfTypeOfFunctionClass.eval(ceil(2))"); + mode, "typeof(ceil(2))", "__instanceOfTypeOfFunctionClass.eval(ceil(2))"); testFilterExpressionWithUdf( - "typeof(ceiling(2))", "__instanceOfTypeOfFunctionClass.eval(ceil(2))"); + mode, "typeof(ceiling(2))", "__instanceOfTypeOfFunctionClass.eval(ceil(2))"); testFilterExpressionWithUdf( - "typeof(floor(2))", "__instanceOfTypeOfFunctionClass.eval(floor(2))"); + mode, "typeof(floor(2))", "__instanceOfTypeOfFunctionClass.eval(floor(2))"); testFilterExpressionWithUdf( - "typeof(round(2,2))", "__instanceOfTypeOfFunctionClass.eval(round(2, 2))"); + mode, "typeof(round(2,2))", "__instanceOfTypeOfFunctionClass.eval(round(2, 2))"); testFilterExpressionWithUdf( - "typeof(id + 2)", "__instanceOfTypeOfFunctionClass.eval(id + 2)"); + mode, "typeof(id + 2)", "__instanceOfTypeOfFunctionClass.eval(id + 2)"); testFilterExpressionWithUdf( - "typeof(id - 2)", "__instanceOfTypeOfFunctionClass.eval(id - 2)"); + mode, "typeof(id - 2)", "__instanceOfTypeOfFunctionClass.eval(id - 2)"); testFilterExpressionWithUdf( - "typeof(id * 2)", "__instanceOfTypeOfFunctionClass.eval(id * 2)"); + mode, "typeof(id * 2)", "__instanceOfTypeOfFunctionClass.eval(id * 2)"); testFilterExpressionWithUdf( - "typeof(id / 2)", "__instanceOfTypeOfFunctionClass.eval(id / 2)"); + mode, "typeof(id / 2)", "__instanceOfTypeOfFunctionClass.eval(id / 2)"); testFilterExpressionWithUdf( - "typeof(id % 2)", "__instanceOfTypeOfFunctionClass.eval(id % 2)"); + mode, "typeof(id % 2)", "__instanceOfTypeOfFunctionClass.eval(id % 2)"); testFilterExpressionWithUdf( + mode, "addone(addone(id)) > 4 OR typeof(id) <> 'bool' AND format('from %s to %s is %s', 'a', 'z', 'lie') <> ''", "greaterThan(__instanceOfAddOneFunctionClass.eval(__instanceOfAddOneFunctionClass.eval(id)), 4) || !valueEquals(__instanceOfTypeOfFunctionClass.eval(id), \"bool\") && !valueEquals(__instanceOfFormatFunctionClass.eval(\"from %s to %s is %s\", \"a\", \"z\", \"lie\"), \"\")"); testFilterExpressionWithUdf( + mode, "ADDONE(ADDONE(id)) > 4 OR TYPEOF(id) <> 'bool' AND FORMAT('from %s to %s is %s', 'a', 'z', 'lie') <> ''", "greaterThan(__instanceOfAddOneFunctionClass.eval(__instanceOfAddOneFunctionClass.eval(id)), 4) || !valueEquals(__instanceOfTypeOfFunctionClass.eval(id), \"bool\") && !valueEquals(__instanceOfFormatFunctionClass.eval(\"from %s to %s is %s\", \"a\", \"z\", \"lie\"), \"\")"); } - @Test - public void testTranslateUdfFilterToJaninoExpressionWithColumnNameMap() { + @ParameterizedTest(name = "precision mode: {0}") + @EnumSource + public void testTranslateUdfFilterToJaninoExpressionWithColumnNameMap( + DecimalPrecisionMode mode) { List columns = List.of( Column.physicalColumn("a", DataTypes.INT()), @@ -699,68 +791,81 @@ public void testTranslateUdfFilterToJaninoExpressionWithColumnNameMap() { columnNameMap.put("a-b", "$2"); testFilterExpressionWithUdf( + mode, "format(upper(a))", "__instanceOfFormatFunctionClass.eval(upper($0))", columns, columnNameMap); testFilterExpressionWithUdf( + mode, "format(lower(b))", "__instanceOfFormatFunctionClass.eval(lower($1))", columns, columnNameMap); testFilterExpressionWithUdf( + mode, "format(concat(a,b))", "__instanceOfFormatFunctionClass.eval(concat($0, $1))", columns, columnNameMap); testFilterExpressionWithUdf( + mode, "format(SUBSTR(`a-b`,1))", "__instanceOfFormatFunctionClass.eval(substr($2, 1))", columns, columnNameMap); testFilterExpressionWithUdf( + mode, "typeof(`a-b` like '^[a-zA-Z]')", "__instanceOfTypeOfFunctionClass.eval(like($2, \"^[a-zA-Z]\"))", columns, columnNameMap); testFilterExpressionWithUdf( + mode, "typeof(`a-b` not like '^[a-zA-Z]')", "__instanceOfTypeOfFunctionClass.eval(notLike($2, \"^[a-zA-Z]\"))", columns, columnNameMap); testFilterExpressionWithUdf( + mode, "typeof(a-b-`a-b`)", "__instanceOfTypeOfFunctionClass.eval($0 - $1 - $2)", columns, columnNameMap); testFilterExpressionWithUdf( + mode, "typeof(a-b-2)", "__instanceOfTypeOfFunctionClass.eval($0 - $1 - 2)", columns, columnNameMap); testFilterExpressionWithUdf( + mode, "addone(addone(`a-b`)) > 4 OR typeof(a-b) <> 'bool' AND format('from %s to %s is %s', 'a', 'z', 'lie') <> ''", "greaterThan(__instanceOfAddOneFunctionClass.eval(__instanceOfAddOneFunctionClass.eval($2)), 4) || !valueEquals(__instanceOfTypeOfFunctionClass.eval($0 - $1), \"bool\") && !valueEquals(__instanceOfFormatFunctionClass.eval(\"from %s to %s is %s\", \"a\", \"z\", \"lie\"), \"\")", columns, columnNameMap); testFilterExpressionWithUdf( + mode, "ADDONE(ADDONE(`a-b`)) > 4 OR TYPEOF(a-b) <> 'bool' AND FORMAT('from %s to %s is %s', 'a', 'z', 'lie') <> ''", "greaterThan(__instanceOfAddOneFunctionClass.eval(__instanceOfAddOneFunctionClass.eval($2)), 4) || !valueEquals(__instanceOfTypeOfFunctionClass.eval($0 - $1), \"bool\") && !valueEquals(__instanceOfFormatFunctionClass.eval(\"from %s to %s is %s\", \"a\", \"z\", \"lie\"), \"\")", columns, columnNameMap); } - @Test - void testLargeNumericalLiterals() { + @ParameterizedTest(name = "precision mode: {0}") + @EnumSource + void testLargeNumericalLiterals(DecimalPrecisionMode mode) { // For literals within [-2147483648, 2147483647] range, plain Integers are OK - testFilterExpression("id > 2147483647", "greaterThan(id, 2147483647)"); - testFilterExpression("id < -2147483648", "lessThan(id, -2147483648)"); + testFilterExpression(mode, "id > 2147483647", "greaterThan(id, 2147483647)"); + testFilterExpression(mode, "id < -2147483648", "lessThan(id, -2147483648)"); // For out-of-range literals, an extra `L` suffix is required - testFilterExpression("id > 2147483648", "greaterThan(id, 2147483648L)"); - testFilterExpression("id > -2147483649", "greaterThan(id, -2147483649L)"); - testFilterExpression("id < 9223372036854775807", "lessThan(id, 9223372036854775807L)"); - testFilterExpression("id > -9223372036854775808", "greaterThan(id, -9223372036854775808L)"); + testFilterExpression(mode, "id > 2147483648", "greaterThan(id, 2147483648L)"); + testFilterExpression(mode, "id > -2147483649", "greaterThan(id, -2147483649L)"); + testFilterExpression( + mode, "id < 9223372036854775807", "lessThan(id, 9223372036854775807L)"); + testFilterExpression( + mode, "id > -9223372036854775808", "greaterThan(id, -9223372036854775808L)"); // But there's still a limit Assertions.assertThatThrownBy( @@ -770,7 +875,8 @@ void testLargeNumericalLiterals() { Collections.emptyList(), Collections.emptyList(), new SupportedMetadataColumn[0], - Collections.emptyMap())) + Collections.emptyMap(), + mode)) .isExactlyInstanceOf(CalciteContextException.class) .hasMessageContaining("Numeric literal '9223372036854775808' out of range"); @@ -781,13 +887,15 @@ void testLargeNumericalLiterals() { Collections.emptyList(), Collections.emptyList(), new SupportedMetadataColumn[0], - Collections.emptyMap())) + Collections.emptyMap(), + mode)) .isExactlyInstanceOf(CalciteContextException.class) .hasMessageContaining("Numeric literal '-9223372036854775809' out of range"); } - @Test - public void testProjectionColumnsWithColumnNameMap() { + @ParameterizedTest(name = "precision mode: {0}") + @EnumSource + public void testProjectionColumnsWithColumnNameMap(DecimalPrecisionMode mode) { List testColumns = Arrays.asList( Column.physicalColumn("a", DataTypes.INT(), "a"), @@ -799,7 +907,8 @@ public void testProjectionColumnsWithColumnNameMap() { "a, b, a-b as c, `a-b`, `a-b` AS d, `a-b`-1 AS e, a-b+`a-b` AS f, `test-meta-col`, `test-meta-col`-a-b AS g", testColumns, Collections.emptyList(), - new SupportedMetadataColumn[] {new TestMetadataColumn()}); + new SupportedMetadataColumn[] {new TestMetadataColumn()}, + mode); List expected = Arrays.asList( @@ -830,8 +939,9 @@ public void testProjectionColumnsWithColumnNameMap() { "piedzimst brīvi" }; - @Test - void testParsingExpressionWithUnicodeLiterals() { + @ParameterizedTest(name = "precision mode: {0}") + @EnumSource + void testParsingExpressionWithUnicodeLiterals(DecimalPrecisionMode mode) { List columns = Arrays.asList( Column.physicalColumn("a", DataTypes.STRING(), "a"), @@ -844,7 +954,8 @@ void testParsingExpressionWithUnicodeLiterals() { .replace("{UNICODE_STRING}", unicodeString), columns, Collections.emptyList(), - new SupportedMetadataColumn[] {})) + new SupportedMetadataColumn[] {}, + mode)) .map(ProjectionColumn::getScriptExpression) .containsExactly( "$0", @@ -855,44 +966,56 @@ void testParsingExpressionWithUnicodeLiterals() { "!valueEquals($0, castToInteger(\"" + unicodeString + "\"))"); testFilterExpression( - "a = '" + unicodeString + "'", "valueEquals(a, \"" + unicodeString + "\")"); + mode, + "a = '" + unicodeString + "'", + "valueEquals(a, \"" + unicodeString + "\")"); testFilterExpression( - "a <> '" + unicodeString + "'", "!valueEquals(a, \"" + unicodeString + "\")"); + mode, + "a <> '" + unicodeString + "'", + "!valueEquals(a, \"" + unicodeString + "\")"); } } private static final List DUMMY_COLUMNS = List.of(Column.physicalColumn("id", DataTypes.INT())); - private void testFilterExpression(String expression, String expressionExpect) { + private void testFilterExpression( + DecimalPrecisionMode mode, String expression, String expressionExpect) { String janinoExpression = TransformParser.translateFilterExpressionToJaninoExpression( expression, DUMMY_COLUMNS, Collections.emptyList(), new SupportedMetadataColumn[0], - Collections.emptyMap()); + Collections.emptyMap(), + mode); Assertions.assertThat(janinoExpression).isEqualTo(expressionExpect); } private void testFilterExpressionWithColumns( - String expression, String expressionExpect, List columns) { + DecimalPrecisionMode mode, + String expression, + String expressionExpect, + List columns) { String janinoExpression = TransformParser.translateFilterExpressionToJaninoExpression( expression, columns, Collections.emptyList(), new SupportedMetadataColumn[0], - Collections.emptyMap()); + Collections.emptyMap(), + mode); Assertions.assertThat(janinoExpression).isEqualTo(expressionExpect); } - private void testFilterExpressionWithUdf(String expression, String expressionExpect) { + private void testFilterExpressionWithUdf( + DecimalPrecisionMode mode, String expression, String expressionExpect) { testFilterExpressionWithUdf( - expression, expressionExpect, DUMMY_COLUMNS, Collections.emptyMap()); + mode, expression, expressionExpect, DUMMY_COLUMNS, Collections.emptyMap()); } private void testFilterExpressionWithUdf( + DecimalPrecisionMode mode, String expression, String expressionExpect, List columns, @@ -912,7 +1035,8 @@ private void testFilterExpressionWithUdf( "typeof", "org.apache.flink.cdc.udf.examples.java.TypeOfFunctionClass")), new SupportedMetadataColumn[0], - columnNameMap); + columnNameMap, + mode); Assertions.assertThat(janinoExpression).isEqualTo(expressionExpect); }