Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
Expand Down Expand Up @@ -79,7 +78,7 @@ public void testReplicationWithAvroSchemaWithSchemaValidationEnforced() throws E
admin2.schemas().createSchema(topicName, myClassSchema.getSchemaInfo());

// consume from the remote cluster (r2)
Consumer<MyClass> consumer2 = client2.newConsumer(myClassSchema)
org.apache.pulsar.client.api.Consumer<MyClass> consumer2 = client2.newConsumer(myClassSchema)
.topic(topicName).subscriptionName("sub").subscribe();

// produce to local cluster (r1)
Expand All @@ -100,4 +99,25 @@ public void testReplicationWithAvroSchemaWithSchemaValidationEnforced() throws E
});
}

@Test(timeOut = 30000)
public void testReplicationReconnectWithSchemaValidationEnforced() throws Exception {
Schema<MyClass> myClassSchema = Schema.AVRO(MyClass.class);
final String topicName =
BrokerTestUtil.newUniqueName("persistent://" + sourceClusterAlwaysSchemaCompatibleNamespace + "/tp_");
// With the creation of the topic, replication will be initiated.
// The schema that the internal producer of replication holes will be "Auto_producer -> bytes".
admin1.topics().createNonPartitionedTopic(topicName);
waitReplicatorStarted(topicName);
// Registers new scheme on the remote-side.
admin2.schemas().createSchema(topicName, myClassSchema.getSchemaInfo());
// After a reconnection, the schema that the internal producer of replication holes should be
// "Auto_producer -> myClassSchema".
ServerCnx serverCnx2 = (ServerCnx) pulsar2.getBrokerService().getTopic(topicName, false).get().get()
.getProducers().values().iterator().next().getCnx();
serverCnx2.ctx().channel().close();
// Verify: the producer reconnected successfully.
Thread.sleep(2000);
waitReplicatorStarted(topicName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
import org.apache.pulsar.client.impl.metrics.Unit;
import org.apache.pulsar.client.impl.metrics.UpDownCounter;
import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.SchemaUtils;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
Expand Down Expand Up @@ -2021,6 +2022,16 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
return null;
}

if (cause instanceof PulsarClientException.IncompatibleSchemaException
&& schema instanceof AutoProduceBytesSchema autoProduceBytesSchema
&& !autoProduceBytesSchema.hasUserProvidedSchema()) {
client.reloadSchemaForAutoProduceProducer(topic, autoProduceBytesSchema)
.whenComplete((__, throwable) -> {
future.completeExceptionally(cause);
});
return null;
}

if (cause instanceof TimeoutException) {
// Creating the producer has timed out. We need to ensure the broker closes the producer
// in case it was indeed created, otherwise it might prevent new create producer operation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,26 +421,29 @@ public <T> CompletableFuture<Producer<T>> createProducerAsync(ProducerConfigurat
if (autoProduceBytesSchema.hasUserProvidedSchema()) {
return createProducerAsync(topic, conf, schema, interceptors);
}
return lookup.getSchema(TopicName.get(conf.getTopicName()))
.thenCompose(schemaInfoOptional -> {
if (schemaInfoOptional.isPresent()) {
SchemaInfo schemaInfo = schemaInfoOptional.get();
if (schemaInfo.getType() == SchemaType.PROTOBUF) {
autoProduceBytesSchema.setSchema(new GenericAvroSchema(schemaInfo));
} else {
autoProduceBytesSchema.setSchema(Schema.getSchema(schemaInfo));
}
} else {
autoProduceBytesSchema.setSchema(Schema.BYTES);
}
return createProducerAsync(topic, conf, schema, interceptors);
});
return reloadSchemaForAutoProduceProducer(topic, autoProduceBytesSchema)
.thenCompose(schemaInfoOptional -> createProducerAsync(topic, conf, schema, interceptors));
} else {
return createProducerAsync(topic, conf, schema, interceptors);
}

}

public CompletableFuture<Void> reloadSchemaForAutoProduceProducer(String topic, AutoProduceBytesSchema autoSchema) {
return lookup.getSchema(TopicName.get(topic)).thenAccept(schemaInfoOptional -> {
if (schemaInfoOptional.isPresent()) {
SchemaInfo schemaInfo = schemaInfoOptional.get();
if (schemaInfo.getType() == SchemaType.PROTOBUF) {
autoSchema.setSchema(new GenericAvroSchema(schemaInfo));
} else {
autoSchema.setSchema(Schema.getSchema(schemaInfo));
}
} else {
autoSchema.setSchema(Schema.BYTES);
}
});
}

private CompletableFuture<Integer> checkPartitions(String topic, boolean forceNoPartitioned,
@Nullable String producerNameForLog) {
CompletableFuture<Integer> checkPartitions = new CompletableFuture<>();
Expand Down
Loading