diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java index 202cd601bc02e..9a516602d8b2e 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java @@ -29,6 +29,7 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.CollectionUtil; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; @@ -36,6 +37,7 @@ import java.io.Serializable; import java.nio.ByteBuffer; +import java.time.Instant; import java.time.ZoneOffset; import java.util.ArrayList; import java.util.List; @@ -166,7 +168,11 @@ public Object convert(Schema schema, Object object) { @Override public Object convert(Schema schema, Object object) { - return ((TimestampData) object).toInstant().toEpochMilli(); + final TimestampData timestampData = (TimestampData) object; + if (isMicrosLogicalType(schema)) { + return toEpochMicros(timestampData.toInstant()); + } + return timestampData.toInstant().toEpochMilli(); } }; } else { @@ -176,7 +182,14 @@ public Object convert(Schema schema, Object object) { @Override public Object convert(Schema schema, Object object) { - return ((TimestampData) object) + final TimestampData timestampData = (TimestampData) object; + if (isMicrosLogicalType(schema)) { + return toEpochMicros( + timestampData + .toLocalDateTime() + .toInstant(ZoneOffset.UTC)); + } + return timestampData .toLocalDateTime() .toInstant(ZoneOffset.UTC) .toEpochMilli(); @@ -194,7 +207,11 @@ public Object convert(Schema schema, Object object) { @Override public Object convert(Schema schema, Object object) { - return ((TimestampData) object).toInstant().toEpochMilli(); + final TimestampData timestampData = (TimestampData) object; + if (isMicrosLogicalType(schema)) { + return toEpochMicros(timestampData.toInstant()); + } + return timestampData.toInstant().toEpochMilli(); } }; } @@ -353,4 +370,16 @@ public Object convert(Schema schema, Object object) { } }; } + + private static boolean isMicrosLogicalType(Schema schema) { + final org.apache.avro.LogicalType logicalType = schema.getLogicalType(); + return logicalType == LogicalTypes.timestampMicros() + || logicalType == LogicalTypes.localTimestampMicros(); + } + + private static long toEpochMicros(Instant instant) { + return Math.addExact( + Math.multiplyExact(instant.getEpochSecond(), 1_000_000L), + instant.getNano() / 1_000L); + } } diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RowDataToAvroConvertersTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RowDataToAvroConvertersTest.java new file mode 100644 index 0000000000000..a1b2413317ecf --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RowDataToAvroConvertersTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.formats.avro.RowDataToAvroConverters.RowDataToAvroConverter; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.TimestampType; + +import org.apache.avro.Schema; +import org.junit.jupiter.api.Test; + +import java.time.LocalDateTime; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link RowDataToAvroConverters}. */ +class RowDataToAvroConvertersTest { + + private static final String TIMESTAMP_MILLIS_SCHEMA = + "{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}"; + private static final String TIMESTAMP_MICROS_SCHEMA = + "{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}"; + private static final String LOCAL_TIMESTAMP_MILLIS_SCHEMA = + "{\"type\":\"long\",\"logicalType\":\"local-timestamp-millis\"}"; + private static final String LOCAL_TIMESTAMP_MICROS_SCHEMA = + "{\"type\":\"long\",\"logicalType\":\"local-timestamp-micros\"}"; + + @Test + void testTimestampWithLocalTimeZoneRespectsMicrosLogicalType() { + // FLINK-39036: writer must produce micros when the Avro schema declares + // logicalType=timestamp-micros, otherwise downstream readers that respect + // the logical type interpret the millis value as micros and shift the + // timestamp by a factor of 1000. + final RowDataToAvroConverter converter = + RowDataToAvroConverters.createConverter(new LocalZonedTimestampType(6), false); + + final TimestampData timestamp = + TimestampData.fromLocalDateTime( + LocalDateTime.parse("2024-01-02T03:04:05.123456")); + + final long micros = + (long) converter.convert(new Schema.Parser().parse(TIMESTAMP_MICROS_SCHEMA), timestamp); + assertThat(micros).isEqualTo(1_704_164_645_123_456L); + + final long millis = + (long) converter.convert(new Schema.Parser().parse(TIMESTAMP_MILLIS_SCHEMA), timestamp); + assertThat(millis).isEqualTo(1_704_164_645_123L); + } + + @Test + void testTimestampWithoutTimeZoneRespectsLocalMicrosLogicalType() { + // TIMESTAMP_WITHOUT_TIME_ZONE maps to Avro local-timestamp-* logical types + // under the new mapping. The converter must honor local-timestamp-micros + // and emit microseconds instead of milliseconds. + final RowDataToAvroConverter converter = + RowDataToAvroConverters.createConverter(new TimestampType(6), false); + + final TimestampData timestamp = + TimestampData.fromLocalDateTime( + LocalDateTime.parse("2024-01-02T03:04:05.123456")); + + final long micros = + (long) + converter.convert( + new Schema.Parser().parse(LOCAL_TIMESTAMP_MICROS_SCHEMA), timestamp); + assertThat(micros).isEqualTo(1_704_164_645_123_456L); + + final long millis = + (long) + converter.convert( + new Schema.Parser().parse(LOCAL_TIMESTAMP_MILLIS_SCHEMA), timestamp); + assertThat(millis).isEqualTo(1_704_164_645_123L); + } + + @Test + void testLegacyTimestampMappingRespectsMicrosLogicalType() { + // The legacy mapping path also has to honor timestamp-micros when present + // in the Avro schema, since users may serialize against an externally + // provided schema with the micros logical type. + final RowDataToAvroConverter converter = + RowDataToAvroConverters.createConverter(new TimestampType(6), true); + + final TimestampData timestamp = + TimestampData.fromLocalDateTime( + LocalDateTime.parse("2024-01-02T03:04:05.123456")); + + final long micros = + (long) converter.convert(new Schema.Parser().parse(TIMESTAMP_MICROS_SCHEMA), timestamp); + assertThat(micros).isEqualTo(1_704_164_645_123_456L); + } +}