diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorSchemaValidationEnforcedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorSchemaValidationEnforcedTest.java index 2dd94bb1cd103..cc5a957793c62 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorSchemaValidationEnforcedTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorSchemaValidationEnforcedTest.java @@ -24,7 +24,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Schema; @@ -79,7 +78,7 @@ public void testReplicationWithAvroSchemaWithSchemaValidationEnforced() throws E admin2.schemas().createSchema(topicName, myClassSchema.getSchemaInfo()); // consume from the remote cluster (r2) - Consumer consumer2 = client2.newConsumer(myClassSchema) + org.apache.pulsar.client.api.Consumer consumer2 = client2.newConsumer(myClassSchema) .topic(topicName).subscriptionName("sub").subscribe(); // produce to local cluster (r1) @@ -100,4 +99,25 @@ public void testReplicationWithAvroSchemaWithSchemaValidationEnforced() throws E }); } + @Test(timeOut = 30000) + public void testReplicationReconnectWithSchemaValidationEnforced() throws Exception { + Schema myClassSchema = Schema.AVRO(MyClass.class); + final String topicName = + BrokerTestUtil.newUniqueName("persistent://" + sourceClusterAlwaysSchemaCompatibleNamespace + "/tp_"); + // With the creation of the topic, replication will be initiated. + // The schema that the internal producer of replication holes will be "Auto_producer -> bytes". + admin1.topics().createNonPartitionedTopic(topicName); + waitReplicatorStarted(topicName); + // Registers new scheme on the remote-side. + admin2.schemas().createSchema(topicName, myClassSchema.getSchemaInfo()); + // After a reconnection, the schema that the internal producer of replication holes should be + // "Auto_producer -> myClassSchema". + ServerCnx serverCnx2 = (ServerCnx) pulsar2.getBrokerService().getTopic(topicName, false).get().get() + .getProducers().values().iterator().next().getCnx(); + serverCnx2.ctx().channel().close(); + // Verify: the producer reconnected successfully. + Thread.sleep(2000); + waitReplicatorStarted(topicName); + } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 8b7fec005b820..89f1c81caddf0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -86,6 +86,7 @@ import org.apache.pulsar.client.impl.metrics.LatencyHistogram; import org.apache.pulsar.client.impl.metrics.Unit; import org.apache.pulsar.client.impl.metrics.UpDownCounter; +import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema; import org.apache.pulsar.client.impl.schema.JSONSchema; import org.apache.pulsar.client.impl.schema.SchemaUtils; import org.apache.pulsar.client.impl.transaction.TransactionImpl; @@ -2021,6 +2022,16 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { return null; } + if (cause instanceof PulsarClientException.IncompatibleSchemaException + && schema instanceof AutoProduceBytesSchema autoProduceBytesSchema + && !autoProduceBytesSchema.hasUserProvidedSchema()) { + client.reloadSchemaForAutoProduceProducer(topic, autoProduceBytesSchema) + .whenComplete((__, throwable) -> { + future.completeExceptionally(cause); + }); + return null; + } + if (cause instanceof TimeoutException) { // Creating the producer has timed out. We need to ensure the broker closes the producer // in case it was indeed created, otherwise it might prevent new create producer operation, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index f15dc8284ab27..c7a43a912a5de 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -421,26 +421,29 @@ public CompletableFuture> createProducerAsync(ProducerConfigurat if (autoProduceBytesSchema.hasUserProvidedSchema()) { return createProducerAsync(topic, conf, schema, interceptors); } - return lookup.getSchema(TopicName.get(conf.getTopicName())) - .thenCompose(schemaInfoOptional -> { - if (schemaInfoOptional.isPresent()) { - SchemaInfo schemaInfo = schemaInfoOptional.get(); - if (schemaInfo.getType() == SchemaType.PROTOBUF) { - autoProduceBytesSchema.setSchema(new GenericAvroSchema(schemaInfo)); - } else { - autoProduceBytesSchema.setSchema(Schema.getSchema(schemaInfo)); - } - } else { - autoProduceBytesSchema.setSchema(Schema.BYTES); - } - return createProducerAsync(topic, conf, schema, interceptors); - }); + return reloadSchemaForAutoProduceProducer(topic, autoProduceBytesSchema) + .thenCompose(schemaInfoOptional -> createProducerAsync(topic, conf, schema, interceptors)); } else { return createProducerAsync(topic, conf, schema, interceptors); } } + public CompletableFuture reloadSchemaForAutoProduceProducer(String topic, AutoProduceBytesSchema autoSchema) { + return lookup.getSchema(TopicName.get(topic)).thenAccept(schemaInfoOptional -> { + if (schemaInfoOptional.isPresent()) { + SchemaInfo schemaInfo = schemaInfoOptional.get(); + if (schemaInfo.getType() == SchemaType.PROTOBUF) { + autoSchema.setSchema(new GenericAvroSchema(schemaInfo)); + } else { + autoSchema.setSchema(Schema.getSchema(schemaInfo)); + } + } else { + autoSchema.setSchema(Schema.BYTES); + } + }); + } + private CompletableFuture checkPartitions(String topic, boolean forceNoPartitioned, @Nullable String producerNameForLog) { CompletableFuture checkPartitions = new CompletableFuture<>();