Environment
- confluent-kafka versions: 2.8.2, 2.12.2 (both tested and failing)
- Python version: 3.11.14, 3.13.3 (both affected)
- Operating System: macOS, Linux (production)
- fastavro version: 1.10.0
Description
Messages with Avro schemas that contain schema references fail to deserialize with confluent-kafka >= 2.8.2. The same code works correctly with version 2.8.0.
Important context: We consume messages from topics produced by external systems that are not under our control. We cannot change how these messages were originally serialized, and we cannot republish them. The messages were produced with earlier versions of confluent-kafka, and their schemas are registered in Schema Registry with schema references.
Root Cause
Between versions 2.8.0 and 2.8.2, the way schemas with references are parsed changed. The deserializer can no longer resolve schema references when a message was serialized with an expanded schema but Schema Registry holds that schema registered with references.
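For reference, the bytes that the reproduction below builds with `struct.pack('>bI', ...)` follow the Confluent wire format: a zero magic byte, a 4-byte big-endian schema ID, then the Avro binary body. Avro binary encoding carries no schema text, so the deserializer parses whatever writer schema Schema Registry returns for that ID. The stdlib-only sketch below frames the test record by hand; the helper names (`zigzag_varint`, `encode_referencing_record`, `frame`) are illustrative and not part of confluent-kafka, and the schema ID passed in is an arbitrary example value.

```python
import struct

MAGIC_BYTE = 0  # first byte of every Confluent-framed message


def zigzag_varint(n: int) -> bytes:
    """Avro 'long' encoding: zigzag-map the signed value, then emit a base-128 varint."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_referencing_record(record_id: str) -> bytes:
    """Avro binary for {"payload": {"id": record_id}}.

    A record is encoded as its fields in order, so the whole datum is a single
    string: its length as an Avro long, followed by the UTF-8 bytes.
    """
    data = record_id.encode("utf-8")
    return zigzag_varint(len(data)) + data


def frame(schema_id: int, avro_body: bytes) -> bytes:
    """Confluent framing: magic byte, 4-byte big-endian schema ID, Avro body."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_body


message = frame(2, encode_referencing_record("123"))
print(message)  # b'\x00\x00\x00\x00\x02\x06123'
```

The `\x06` byte is the zigzag-encoded string length 3. Nothing in the message distinguishes an expanded writer schema from a referencing one with the same structure.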
Steps to Reproduce
from confluent_kafka.schema_registry import SchemaRegistryClient, Schema, SchemaReference
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField
import struct
import io
import avro.schema
import avro.io
# Set up a mock Schema Registry client
conf = {'url': "mock://"}
client = SchemaRegistryClient.new_client(conf)
# Register ReferencedRecord schema
referenced_schema_str = '{"type":"record","name":"ReferencedRecord","namespace":"example.references","fields":[{"name":"id","type":"string"}]}'
referenced_subject = 'test-ReferencedRecord'
referenced_schema = Schema(referenced_schema_str, schema_type='AVRO')
referenced_schema_id = client.register_schema(referenced_subject, referenced_schema)
# Register ReferencingRecord schema with reference to ReferencedRecord
referencing_schema_str = '{"type":"record","name":"ReferencingRecord","namespace":"example.references","fields":[{"name":"payload","type":"ReferencedRecord"}]}'
referencing_subject = 'test-value'
referencing_schema = Schema(
    referencing_schema_str,
    schema_type='AVRO',
    references=[SchemaReference(name='ReferencedRecord', subject=referenced_subject, version=1)]
)
referencing_schema_id = client.register_schema(referencing_subject, referencing_schema)
# Serialize message with expanded schema (simulating how messages were created by external producers)
expanded_schema_str = '{"type":"record","name":"ReferencingRecord","namespace":"example.references","fields":[{"name":"payload","type":{"type":"record","name":"ReferencedRecord","namespace":"example.references","fields":[{"name":"id","type":"string"}]}}]}'
avro_schema = avro.schema.parse(expanded_schema_str)
writer = avro.io.DatumWriter(avro_schema)
bytes_writer = io.BytesIO()
bytes_writer.write(struct.pack('>bI', 0, referencing_schema_id))
encoder = avro.io.BinaryEncoder(bytes_writer)
writer.write({"payload": {"id": "123"}}, encoder)
message_bytes = bytes_writer.getvalue()
# Try to deserialize (this fails in 2.8.2+)
deserializer = AvroDeserializer(client)
ctx = SerializationContext('test-topic', MessageField.VALUE)
result = deserializer(message_bytes, ctx)  # <- FAILS HERE
Expected Behavior
The message should deserialize successfully, as it does with version 2.8.0:
✓ Deserialized: {'payload': {'id': '123'}}
Actual Behavior (2.8.2, 2.12.2)
TypeError: argument of type 'NoneType' is not iterable
Traceback (most recent call last):
  File "fastavro/_schema.pyx", line 537, in fastavro._schema._parse_schema_with_repo
  File "fastavro/_schema.pyx", line 173, in fastavro._schema.parse_schema
  File "fastavro/_schema.pyx", line 407, in fastavro._schema._parse_schema
  File "fastavro/_schema.pyx", line 475, in fastavro._schema.parse_field
  File "fastavro/_schema.pyx", line 267, in fastavro._schema._parse_schema
fastavro._schema_common.UnknownType: example.references.ReferencedRecord
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/tmp/test_bug.py", line 49, in <module>
    result = deserializer(message_bytes, ctx)
  File "confluent_kafka/schema_registry/avro.py", line 562, in __call__
    writer_schema = self._get_parsed_schema(writer_schema_raw)
  File "confluent_kafka/schema_registry/avro.py", line 612, in _get_parsed_schema
    parsed_schema = parse_schema_with_repo(
  File "confluent_kafka/schema_registry/avro.py", line 631, in parse_schema_with_repo
    return load_schema("$root", repo=repo)
  File "fastavro/_schema.pyx", line 509, in fastavro._schema.load_schema
  File "fastavro/_schema.pyx", line 517, in fastavro._schema._load_schema
  File "fastavro/_schema.pyx", line 541, in fastavro._schema._parse_schema_with_repo
  File "fastavro/_schema.pyx", line 517, in fastavro._schema._load_schema
  File "fastavro/_schema.pyx", line 537, in fastavro._schema._parse_schema_with_repo
  File "fastavro/_schema.pyx", line 173, in fastavro._schema.parse_schema
  File "fastavro/_schema.pyx", line 263, in fastavro._schema._parse_schema
TypeError: argument of type 'NoneType' is not iterable
Impact
We cannot consume messages from topics whose schemas use references with confluent-kafka versions >= 2.8.2.
Additional Context
This regression appears to be related to changes in how parse_schema_with_repo handles schema references. Between 2.8.0 and 2.8.2, the expand parameter seems to have been removed from schema parsing, which causes fastavro to fail when resolving referenced schemas.
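To make "expanded" concrete: expanding a schema with references means inlining each referenced named type at its first use, which turns the registered ReferencingRecord schema into the `expanded_schema_str` used in the reproduction. The sketch below is a hypothetical helper (`expand_references` is not a confluent-kafka or fastavro function); it assumes the referenced schemas are supplied in a dict keyed by fully qualified name, and it handles only records, arrays, maps, unions, and bare type names.

```python
import json
from typing import Any, Dict, Optional, Set

PRIMITIVES = {"null", "boolean", "int", "long", "float", "double", "bytes", "string"}


def qualify(name: str, namespace: Optional[str]) -> str:
    """Avro name resolution: a dotless name inherits the enclosing namespace."""
    return name if "." in name or not namespace else f"{namespace}.{name}"


def expand_references(schema: Any, named: Dict[str, Any],
                      namespace: Optional[str] = None,
                      seen: Optional[Set[str]] = None) -> Any:
    """Hypothetical helper: inline each referenced named type at its first use."""
    seen = set() if seen is None else seen
    if isinstance(schema, str):
        full = qualify(schema, namespace)
        if schema in PRIMITIVES or full in seen or full not in named:
            return schema  # primitive, already defined, or unknown: keep the name
        return expand_references(named[full], named, namespace, seen)
    if isinstance(schema, list):  # union: expand each branch
        return [expand_references(s, named, namespace, seen) for s in schema]
    if not isinstance(schema, dict):
        return schema
    out = dict(schema)
    kind = schema.get("type")
    if kind == "record":
        name = schema["name"]
        ns = name.rsplit(".", 1)[0] if "." in name else schema.get("namespace", namespace)
        seen.add(qualify(name, ns))  # record is now defined; later uses stay names
        out["fields"] = [
            {**field, "type": expand_references(field["type"], named, ns, seen)}
            for field in schema["fields"]
        ]
    elif kind == "array":
        out["items"] = expand_references(schema["items"], named, namespace, seen)
    elif kind == "map":
        out["values"] = expand_references(schema["values"], named, namespace, seen)
    return out


referenced = json.loads('{"type":"record","name":"ReferencedRecord","namespace":"example.references",'
                        '"fields":[{"name":"id","type":"string"}]}')
referencing = json.loads('{"type":"record","name":"ReferencingRecord","namespace":"example.references",'
                         '"fields":[{"name":"payload","type":"ReferencedRecord"}]}')
expanded = expand_references(referencing, {"example.references.ReferencedRecord": referenced})
print(json.dumps(expanded))
```

Only the first use is inlined because the Avro specification allows a named type to be defined once per schema; any later use must refer to it by name.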
The error specifically occurs in the AvroDeserializer._get_parsed_schema() method when it calls parse_schema_with_repo(), which then fails in fastavro when trying to resolve the ReferencedRecord type reference.
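The failure chain described above can be modeled in a few lines. This is a simplified, hypothetical model, not the actual fastavro or confluent-kafka code: it assumes the referenced schemas are held in a dict keyed by the `name` given in `SchemaReference`, and it applies the Avro rule that a dotless type name inside namespace `example.references` resolves to `example.references.ReferencedRecord`. Under those assumptions, a lookup by the fully qualified name misses and returns `None`, and the next membership test on `None` raises the same `TypeError` that ends the traceback.

```python
from typing import Any, Dict, Optional


def qualify(name: str, namespace: Optional[str]) -> str:
    """Avro name resolution: a dotless name inherits the enclosing namespace."""
    return name if "." in name or not namespace else f"{namespace}.{name}"


def resolve_named_type(type_name: str, namespace: Optional[str],
                       repo: Dict[str, Any]) -> Any:
    """Hypothetical model of a repository-backed lookup for a named type."""
    full_name = qualify(type_name, namespace)
    schema = repo.get(full_name)  # None if the repo is keyed differently
    # Treating the result as a dict fails on None with a TypeError such as
    # "argument of type 'NoneType' is not iterable" (wording varies by Python version).
    if "type" not in schema:
        raise ValueError(f"schema for {full_name} has no 'type'")
    return schema


referenced = {"type": "record", "name": "ReferencedRecord", "namespace": "example.references",
              "fields": [{"name": "id", "type": "string"}]}

# Keyed by the short reference name, as in SchemaReference(name='ReferencedRecord', ...)
by_short_name = {"ReferencedRecord": referenced}
# Keyed by the fully qualified name that the lookup actually asks for
by_full_name = {"example.references.ReferencedRecord": referenced}

try:
    resolve_named_type("ReferencedRecord", "example.references", by_short_name)
except TypeError as exc:
    print("short-name repo:", exc)

print("full-name repo:", resolve_named_type("ReferencedRecord", "example.references", by_full_name)["name"])
```

The model only shows how a repository miss surfaces as this particular TypeError; it does not establish why the lookup misses in 2.8.2+.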
Schema pattern affected:
{
  "fields": [{"name": "payload", "type": "ReferencedRecord"}],
  "name": "ReferencingRecord",
  "namespace": "example.references",
  "type": "record"
}
where ReferencedRecord is registered as a separate schema in Schema Registry.
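To check which registered schemas follow this pattern, one option is to list the named types a schema uses without defining them, since those names must come from schema references. The sketch below is a hypothetical triage helper (`external_references` is not a library function); it walks records, enums, fixed types, arrays, maps, and unions, and follows the Avro rule that a named type must be defined before it is used.

```python
import json
from typing import Any, List, Optional, Set

PRIMITIVES = {"null", "boolean", "int", "long", "float", "double", "bytes", "string"}
NAMED_KINDS = {"record", "error", "enum", "fixed"}


def qualify(name: str, namespace: Optional[str]) -> str:
    """Avro name resolution: a dotless name inherits the enclosing namespace."""
    return name if "." in name or not namespace else f"{namespace}.{name}"


def external_references(schema: Any, namespace: Optional[str] = None,
                        defined: Optional[Set[str]] = None,
                        found: Optional[List[str]] = None) -> List[str]:
    """Return fully qualified names used in `schema` before (or without) being defined."""
    defined = set() if defined is None else defined
    found = [] if found is None else found
    if isinstance(schema, str):
        full = qualify(schema, namespace)
        if schema not in PRIMITIVES and full not in defined and full not in found:
            found.append(full)
    elif isinstance(schema, list):  # union: check each branch
        for branch in schema:
            external_references(branch, namespace, defined, found)
    elif isinstance(schema, dict):
        kind = schema.get("type")
        if kind in NAMED_KINDS:
            name = schema["name"]
            ns = name.rsplit(".", 1)[0] if "." in name else schema.get("namespace", namespace)
            defined.add(qualify(name, ns))
            for field in schema.get("fields", []):
                external_references(field["type"], ns, defined, found)
        elif kind == "array":
            external_references(schema["items"], namespace, defined, found)
        elif kind == "map":
            external_references(schema["values"], namespace, defined, found)
        elif isinstance(kind, (str, list, dict)):  # e.g. {"type": "SomeName"}
            external_references(kind, namespace, defined, found)
    return found


affected = json.loads('{"fields": [{"name": "payload", "type": "ReferencedRecord"}], '
                      '"name": "ReferencingRecord", "namespace": "example.references", "type": "record"}')
print(external_references(affected))  # ['example.references.ReferencedRecord']
```

A non-empty result means the schema depends on types it does not define itself, which is the case this report describes; the expanded schema from the reproduction returns an empty list.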