Skip to content

[FLINK-39036][formats] Honor microsecond Avro logical types in RowDataToAvroConverters#28071

Open
daguimu wants to merge 1 commit intoapache:masterfrom
daguimu:fix/avro-timestamp-micros-FLINK-39036
Open

[FLINK-39036][formats] Honor microsecond Avro logical types in RowDataToAvroConverters#28071
daguimu wants to merge 1 commit intoapache:masterfrom
daguimu:fix/avro-timestamp-micros-FLINK-39036

Conversation

@daguimu
Copy link
Copy Markdown

@daguimu daguimu commented Apr 29, 2026

What is the purpose of the change

Fix FLINK-39036: RowDataToAvroConverters always converts TIMESTAMP and TIMESTAMP_WITH_LOCAL_TIME_ZONE columns to milliseconds-since-epoch, ignoring the Avro logical type carried by the target schema. When the schema declares logicalType=timestamp-micros or local-timestamp-micros, readers that respect the logical type interpret that millisecond value as microseconds and shift the timestamp by a factor of 1000.

Brief change log

  • flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java
    • In each TIMESTAMP_WITH_LOCAL_TIME_ZONE and TIMESTAMP_WITHOUT_TIME_ZONE converter (legacy and non-legacy mapping), check whether the supplied schema declares timestamp-micros / local-timestamp-micros. If yes, emit the value in microseconds; otherwise keep the existing milliseconds output.
    • Add private helpers isMicrosLogicalType(Schema) and toEpochMicros(Instant).
  • flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RowDataToAvroConvertersTest.java (new)
    • Three test methods covering the three converter branches that were updated; each asserts the exact micro/milli numeric output for a fixed 2024-01-02T03:04:05.123456 timestamp.

Verifying this change

This change adds tests and is already covered by them:

  • RowDataToAvroConvertersTest#testTimestampWithLocalTimeZoneRespectsMicrosLogicalTypeLocalZonedTimestampType(6) writer emits 1_704_164_645_123_456L for the micros schema and 1_704_164_645_123L for the millis schema.
  • RowDataToAvroConvertersTest#testTimestampWithoutTimeZoneRespectsLocalMicrosLogicalTypeTimestampType(6) writer emits the same numeric values for the local-timestamp-micros / -millis schemas.
  • RowDataToAvroConvertersTest#testLegacyTimestampMappingRespectsMicrosLogicalTypelegacyTimestampMapping=true path still honours timestamp-micros.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no (only a single schema-logical-type check before the existing branch)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature: no (bug fix)
  • If yes, how is the feature documented: not applicable

…aToAvroConverters

RowDataToAvroConverters always converted TIMESTAMP / TIMESTAMP_WITH_LOCAL_TIME_ZONE
columns to milliseconds-since-epoch and ignored the Avro logical type carried by
the target schema. When the destination schema declared
logicalType=timestamp-micros (or local-timestamp-micros), readers that respect
the logical type interpreted the millisecond value as microseconds and shifted
the timestamp by a factor of 1000.

Detect timestamp-micros and local-timestamp-micros at conversion time and emit
the value in microseconds; otherwise keep the existing milliseconds path so
existing schemas using timestamp-millis are unaffected.

Add three RowDataToAvroConvertersTest cases covering:
* TIMESTAMP_WITH_LOCAL_TIME_ZONE under both micros and millis schemas
* TIMESTAMP_WITHOUT_TIME_ZONE under both local-timestamp-micros and -millis schemas
* the legacyTimestampMapping path under timestamp-micros

Closes #FLINK-39036
@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Apr 29, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants