diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java index 5b0d41141e01d..faafe24145cd7 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java @@ -35,7 +35,6 @@ import org.apache.flink.util.InstantiationUtil; import java.io.IOException; -import java.util.Arrays; import java.util.stream.IntStream; import static org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask; @@ -205,7 +204,9 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( RowDataSerializerSnapshot oldRowDataSerializerSnapshot = (RowDataSerializerSnapshot) oldSerializerSnapshot; - if (!Arrays.equals(types, oldRowDataSerializerSnapshot.types)) { + // Allow top-level NOT NULL -> NULL widening; reject NULL -> NOT NULL narrowing. + if (!typesAreCompatibleAfterNullabilityWidening( + types, oldRowDataSerializerSnapshot.types)) { return TypeSerializerSchemaCompatibility.incompatible(); } @@ -225,5 +226,32 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( return intermediateResult.getFinalResult(); } + + /** + * Returns true when each new field type equals the old one once top-level nullability is + * ignored (nested nullability must match) and no field narrows from NULL to NOT NULL. 
+ */ + private static boolean typesAreCompatibleAfterNullabilityWidening( + LogicalType[] newTypes, LogicalType[] oldTypes) { + if (newTypes == oldTypes) { + return true; + } + if (newTypes == null || oldTypes == null || newTypes.length != oldTypes.length) { + return false; + } + for (int i = 0; i < newTypes.length; i++) { + LogicalType newType = newTypes[i]; + LogicalType oldType = oldTypes[i]; + // compare nullable copies so only top-level nullability is ignored by equals() + if (!newType.copy(true).equals(oldType.copy(true))) { + return false; + } + // reject narrowing: old type is nullable but new type is NOT NULL + if (oldType.isNullable() && !newType.isNullable()) { + return false; + } + } + return true; + } } } diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializerSnapshotTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializerSnapshotTest.java new file mode 100644 index 0000000000000..4e9e274dd68a6 --- /dev/null +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializerSnapshotTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.typeutils.serializers.python; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.RowDataSerializerSnapshot; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.VarCharType; + +import org.junit.jupiter.api.Test; + +import static org.apache.flink.api.common.typeutils.TypeSerializerConditions.isCompatibleAsIs; +import static org.apache.flink.api.common.typeutils.TypeSerializerConditions.isIncompatible; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for schema compatibility resolution of {@link RowDataSerializerSnapshot}. */ +class RowDataSerializerSnapshotTest { + + @Test + void sameTypesIsCompatibleAsIs() throws Exception { + RowDataSerializer previous = serializerOf(new IntType(false), new VarCharType(false, 100)); + RowDataSerializer current = serializerOf(new IntType(false), new VarCharType(false, 100)); + + assertThat(resolveCompatibility(previous, current)).is(isCompatibleAsIs()); + } + + /** Widening field nullability (NOT NULL -> NULL) should be compatible as-is. 
*/ + @Test + void wideningFieldNullabilityIsCompatible() throws Exception { + RowDataSerializer previous = serializerOf(new IntType(false), new VarCharType(false, 100)); + RowDataSerializer current = serializerOf(new IntType(true), new VarCharType(true, 100)); + + assertThat(resolveCompatibility(previous, current)).is(isCompatibleAsIs()); + } + + /** Narrowing field nullability (NULL -> NOT NULL) should be incompatible. */ + @Test + void narrowingFieldNullabilityIsIncompatible() throws Exception { + RowDataSerializer previous = serializerOf(new IntType(true), new VarCharType(true, 100)); + RowDataSerializer current = serializerOf(new IntType(false), new VarCharType(false, 100)); + + assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); + } + + /** Mixed nullability changes should be incompatible when any field narrows. */ + @Test + void mixedNullabilityChangeIsIncompatibleWhenAnyFieldNarrows() throws Exception { + RowDataSerializer previous = serializerOf(new IntType(false), new VarCharType(true, 100)); + // INT widens (NOT NULL -> NULL) while VARCHAR narrows (NULL -> NOT NULL) + RowDataSerializer current = serializerOf(new IntType(true), new VarCharType(false, 100)); + + assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); + } + + @Test + void differentFieldCountIsIncompatible() throws Exception { + RowDataSerializer previous = serializerOf(new IntType(false)); + RowDataSerializer current = serializerOf(new IntType(false), new VarCharType(false, 100)); + + assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); + } + + @Test + void differentFieldTypeIsIncompatible() throws Exception { + RowDataSerializer previous = serializerOf(new IntType(false)); + RowDataSerializer current = serializerOf(new BigIntType(false)); + + assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); + } + + // ------------------------------------------------------------------------------------------- + // Helpers + // 
------------------------------------------------------------------------------------------- + + private static RowDataSerializer serializerOf(LogicalType... types) { + TypeSerializer[] fieldSerializers = new TypeSerializer[types.length]; + for (int i = 0; i < types.length; i++) { + fieldSerializers[i] = fieldSerializerFor(types[i]); + } + return new RowDataSerializer(types, fieldSerializers); + } + + private static TypeSerializer fieldSerializerFor(LogicalType type) { + switch (type.getTypeRoot()) { + case INTEGER: + return IntSerializer.INSTANCE; + case BIGINT: + return LongSerializer.INSTANCE; + case VARCHAR: + case CHAR: + return StringSerializer.INSTANCE; + default: + throw new IllegalArgumentException("Unsupported type for test: " + type); + } + } + + /** + * Round-trips the previous serializer's snapshot through serialization (mirroring what happens + * on checkpoint restore) and resolves compatibility against the current serializer's snapshot. + */ + private static TypeSerializerSchemaCompatibility resolveCompatibility( + RowDataSerializer previous, RowDataSerializer current) throws Exception { + TypeSerializerSnapshot previousSnapshot = previous.snapshotConfiguration(); + + DataOutputSerializer out = new DataOutputSerializer(64); + previousSnapshot.writeSnapshot(out); + + TypeSerializerSnapshot restoredPreviousSnapshot = new RowDataSerializerSnapshot(); + restoredPreviousSnapshot.readSnapshot( + previousSnapshot.getCurrentVersion(), + new DataInputDeserializer(out.getCopyOfBuffer()), + RowDataSerializerSnapshotTest.class.getClassLoader()); + + return current.snapshotConfiguration().resolveSchemaCompatibility(restoredPreviousSnapshot); + } +} diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerSnapshotTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerSnapshotTest.java new file mode 100644 index 0000000000000..513801ac98a2d --- 
/dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerSnapshotTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.typeutils; + +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer.RowDataSerializerSnapshot; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; + +import org.junit.jupiter.api.Test; + +import static org.apache.flink.api.common.typeutils.TypeSerializerConditions.isCompatibleAsIs; +import static org.apache.flink.api.common.typeutils.TypeSerializerConditions.isIncompatible; +import static 
org.assertj.core.api.Assertions.assertThat; + +/** Tests for schema compatibility resolution of {@link RowDataSerializerSnapshot}. */ +class RowDataSerializerSnapshotTest { + + @Test + void sameTypesIsCompatibleAsIs() throws Exception { + RowDataSerializer previous = + new RowDataSerializer(new IntType(false), new VarCharType(false, 100)); + RowDataSerializer current = + new RowDataSerializer(new IntType(false), new VarCharType(false, 100)); + + assertThat(resolveCompatibility(previous, current)).is(isCompatibleAsIs()); + } + + /** Widening field nullability (NOT NULL -> NULL) should be compatible as-is. */ + @Test + void wideningFieldNullabilityIsCompatible() throws Exception { + RowDataSerializer previous = + new RowDataSerializer(new IntType(false), new VarCharType(false, 100)); + RowDataSerializer current = + new RowDataSerializer(new IntType(true), new VarCharType(true, 100)); + + assertThat(resolveCompatibility(previous, current)).is(isCompatibleAsIs()); + } + + /** Narrowing field nullability (NULL -> NOT NULL) should be incompatible. */ + @Test + void narrowingFieldNullabilityIsIncompatible() throws Exception { + RowDataSerializer previous = + new RowDataSerializer(new IntType(true), new VarCharType(true, 100)); + RowDataSerializer current = + new RowDataSerializer(new IntType(false), new VarCharType(false, 100)); + + assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); + } + + /** Mixed nullability changes should be incompatible when any field narrows. 
*/ + @Test + void mixedNullabilityChangeIsIncompatibleWhenAnyFieldNarrows() throws Exception { + RowDataSerializer previous = + new RowDataSerializer(new IntType(false), new VarCharType(true, 100)); + // INT widens (NOT NULL -> NULL) while VARCHAR narrows (NULL -> NOT NULL) + RowDataSerializer current = + new RowDataSerializer(new IntType(true), new VarCharType(false, 100)); + + assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); + } + + @Test + void differentFieldCountIsIncompatible() throws Exception { + RowDataSerializer previous = new RowDataSerializer(new IntType(false)); + RowDataSerializer current = + new RowDataSerializer(new IntType(false), new VarCharType(false, 100)); + + assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); + } + + @Test + void differentFieldTypeIsIncompatible() throws Exception { + RowDataSerializer previous = new RowDataSerializer(new IntType(false)); + RowDataSerializer current = new RowDataSerializer(new BigIntType(false)); + + assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); + } + + /** + * Nullability changes in nested row fields (even widening) are not supported and should be + * incompatible. 
+ */ + @Test + void nullabilityChangeInNestedRowFieldIsIncompatible() throws Exception { + RowType nestedNotNull = + RowType.of( + new LogicalType[] {new IntType(false), new VarCharType(false, 100)}, + new String[] {"a", "b"}); + RowType nestedNullable = + RowType.of( + new LogicalType[] {new IntType(true), new VarCharType(false, 100)}, + new String[] {"a", "b"}); + + RowDataSerializer previous = new RowDataSerializer(new LogicalType[] {nestedNotNull}); + RowDataSerializer current = new RowDataSerializer(new LogicalType[] {nestedNullable}); + + assertThat(resolveCompatibility(previous, current)).is(isIncompatible()); + } + + // ------------------------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------------------------- + + /** + * Round-trips the previous serializer's snapshot through serialization (mirroring what happens + * on checkpoint restore) and resolves compatibility against the current serializer's snapshot. 
+ */ + private static TypeSerializerSchemaCompatibility resolveCompatibility( + RowDataSerializer previous, RowDataSerializer current) throws Exception { + TypeSerializerSnapshot previousSnapshot = previous.snapshotConfiguration(); + + DataOutputSerializer out = new DataOutputSerializer(64); + previousSnapshot.writeSnapshot(out); + + TypeSerializerSnapshot restoredPreviousSnapshot = new RowDataSerializerSnapshot(); + restoredPreviousSnapshot.readSnapshot( + previousSnapshot.getCurrentVersion(), + new DataInputDeserializer(out.getCopyOfBuffer()), + RowDataSerializerSnapshotTest.class.getClassLoader()); + + return current.snapshotConfiguration().resolveSchemaCompatibility(restoredPreviousSnapshot); + } +} diff --git a/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java index 7a634f67efe04..8d419f15212f4 100644 --- a/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java +++ b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java @@ -344,7 +344,9 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( RowDataSerializerSnapshot oldRowDataSerializerSnapshot = (RowDataSerializerSnapshot) oldSerializerSnapshot; - if (!Arrays.equals(types, oldRowDataSerializerSnapshot.types)) { + // Allow top-level NOT NULL -> NULL widening; reject NULL -> NOT NULL narrowing. 
+ if (!typesAreCompatibleAfterNullabilityWidening( + types, oldRowDataSerializerSnapshot.types)) { return TypeSerializerSchemaCompatibility.incompatible(); } @@ -364,5 +366,32 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( return intermediateResult.getFinalResult(); } + + /** + * Returns true when each new field type equals the old one once top-level nullability is + * ignored (nested nullability must match) and no field narrows from NULL to NOT NULL. + */ + private static boolean typesAreCompatibleAfterNullabilityWidening( + LogicalType[] newTypes, LogicalType[] oldTypes) { + if (newTypes == oldTypes) { + return true; + } + if (newTypes == null || oldTypes == null || newTypes.length != oldTypes.length) { + return false; + } + for (int i = 0; i < newTypes.length; i++) { + LogicalType newType = newTypes[i]; + LogicalType oldType = oldTypes[i]; + // compare nullable copies so only top-level nullability is ignored by equals() + if (!newType.copy(true).equals(oldType.copy(true))) { + return false; + } + // reject narrowing: old type is nullable but new type is NOT NULL + if (oldType.isNullable() && !newType.isNullable()) { + return false; + } + } + return true; + } } }