[improve][test] Migrate pulsar-perf to the V5 client API#25887
Merged
Conversation
PulsarClientBuilderV5#tlsPolicy was a stub that only set useTls=true and dropped every field on the TlsPolicy on the floor. Wire the policy through: trustCertsFilePath, keyFilePath, certificateFilePath, allowInsecureConnection, enableHostnameVerification all land on the underlying ClientConfigurationData. This unblocks any V5 caller that needs custom TLS — most immediately the pulsar-perf migration where users pass --trust-cert-file and expect the broker connection to honor it. Added two unit tests covering field propagation and the TlsPolicy.ofInsecure() shortcut.
So that pulsar-perf produce / consume / read / transaction work transparently against both regular and scalable topics — V5 detects the topic kind via topic:// vs persistent:// lookup and routes accordingly. All four perf commands swap the v4 client API for the V5 surface: - PerformanceProducer: V5 Producer / AsyncProducer.async().send(), BatchingPolicy / ChunkingPolicy / CompressionPolicy / ProducerAccessMode, ProducerEncryptionPolicy backed by PemFileKeyProvider for --encryption-key-name + -value-file, V5 client-level TransactionPolicy + Transaction.async().commit(). - PerformanceConsumer: V5 QueueConsumer (all subscription types fold into QueueConsumer; Exclusive / Failover trigger a runtime warning). V5 has no MessageListener so each consumer gets a dedicated poll thread driving receive(Duration), invoking the same handler the v4 listener did. - PerformanceReader → CheckpointConsumer. Same poll-thread pattern as PerformanceConsumer. --start-message-id only accepts earliest / latest (V5 Checkpoint has no lid:eid form). - PerformanceTransaction: combined producer+consumer over V5 Transaction. Per-message ack is synchronous (V5 acknowledge is sync void). PerfClientUtils gets a createV5ClientBuilderFromArguments() factory mirroring the v4 helper. The TLS predicate only enables TLS when the user genuinely wants it (URL is pulsar+ssl, a trust cert path is supplied, or a flag is explicitly TRUE) — never on Boolean.FALSE from default-value resolution. The unconditional non-null check caused TLS to flip on against plaintext brokers. V5 features without a direct v4 equivalent are logged as no-ops or documented warnings: --max-outstanding(-across-partitions), --stats-interval-seconds, --max-lookup-request, --ssl-factory-plugin*, --busy-wait, --auto-scaled-receiver-queue-size, --batch-index-ack, --pool-messages, chunked-message knobs, --receiver-queue-size-across-partitions, --replicated, --use-tls. Per-transaction --txn-timeout moves to client-level TransactionPolicy.
…erBuilderImpl) CI build failed in PerformanceProducerTest.java:179 because the test cast createProducerBuilder()'s return to v4 ProducerBuilderImpl and inspected getConf().isBatchingEnabled(). The V5 ProducerBuilder is opaque (no public conf accessor), so that white-box assertion cannot survive the V5 migration. Regression intent — "disableBatching=true must propagate to the configured builder" — is now covered by V5's BatchingPolicy.ofDisabled() tests and the end-to-end perf workflow tests in this file.
dao-jun
approved these changes
May 29, 2026
Two V5 client-side bugs that the pulsar-perf migration tests exposed: 1. PulsarClientBuilderV5#authentication(plugin, params) set the plugin class name + params strings on the conf but never instantiated the Authentication and called conf.setAuthentication(...). The v4 PulsarClientImpl reads the Authentication instance via conf.getAuthentication() at connect time, so the client connected with no credentials and the broker rejected the handshake (caught by Oauth2PerformanceTransactionTest). Fix: instantiate via AuthenticationFactory.create and attach to the conf — matches what v4 ClientBuilder does in the same overload. Bad plugin class names are wrapped as V5 PulsarClientException rather than leaking the v4 exception type. New unit tests cover both paths. 2. QueueConsumerBuilder was missing replicateSubscriptionState. The underlying v4 ConsumerConfigurationData supports it and StreamConsumerBuilder already exposes it; QueueConsumerBuilder was the odd one out. Same one-line wiring through to the v4 conf. Needed by PerformanceTransactionTest.testTxnPerf which asserts on subscription.isReplicated().
Three CI-uncovered fixes on the perf side:
1. CmdBase: enable picocli case-insensitive enum parsing. V5 enums
(SubscriptionInitialPosition.EARLIEST, ProducerAccessMode.SHARED,
...) are uppercase, while users have been passing the v4 spellings
("Earliest", "Shared") on the perf CLI for years. Picocli is
case-sensitive by default and would silently fail to parse the
v4 spellings, exiting the perf command before any logs were
emitted. Caught by PerformanceTransactionTest.testConsumeTxnMessage.
2. PerformanceTransaction: collect per-txn send futures and await
them before committing. V5 transaction-aware sends are queued
onto an internal dispatch chain, so the v4-side txn-coordinator
registration of the send can race the commit() if commit fires
before the chain drains. Symptom:
InvalidTxnStatusException: ... unexpected state : COMMITTING,
expect OPEN. This is also the semantically correct ordering
(commit only after sends land). Caught by
Oauth2PerformanceTransactionTest.testTransactionPerf.
3. Wire --replicated through to the V5 QueueConsumerBuilder now
that it exposes replicateSubscriptionState (preceding commit).
Drop the "ignored" warning. Needed by
PerformanceTransactionTest.testTxnPerf.
…roducer Addresses review feedback: PerformanceProducer committed the transaction purely on the local send counter (numMessageSend == numMessagesPerTransaction), but messageBuilder.send() is asynchronous and V5 dispatches it through the producer dispatch chain. The commit could race ahead of the txn-coordinator registration of the sends, which the broker rejects with InvalidTxnStatusException (COMMITTING, expect OPEN). Collect the in-flight transaction's send futures and await them before committing — the same fix already applied to PerformanceTransaction. The send chain swallows per-send failures, so the join never throws on a send error.
void-ptr974
approved these changes
Jun 2, 2026
Contributor
void-ptr974
left a comment
There was a problem hiding this comment.
Thanks for the update. This makes V5 more practical to try and benchmark, especially with scalable topics.
Looking forward to the broader V5 client work.
LGTM.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
PIP-475 added regular-to-scalable topic migration on the broker and the V5 client SDK. The V5 client transparently routes against both regular and scalable topics. Pulsar's perf CLI (
pulsar-perf produce / consume / read / transaction) still used the v4 client API and therefore could not benchmark scalable topics at all.This PR migrates all four
pulsar-perfcommands to the V5 client API so the same binary now works against regular and scalable topics out of the box.A small V5 client-side fix is bundled in because it is required for the perf migration:
PulsarClientBuilderV5#tlsPolicywas a stub that dropped every TlsPolicy field on the floor (only settinguseTls=true). The perf commands take--trust-cert-fileand the related flags and expect them to land, so the policy is now wired through toClientConfigurationDataend-to-end.Modifications
pulsar-client-v5PulsarClientBuilderV5#tlsPolicynow wirestrustCertsFilePath,keyFilePath,certificateFilePath,allowInsecureConnection,enableHostnameVerificationthrough to the underlying v4 conf. Two new unit tests inPulsarClientBuilderV5Testcover field propagation and theTlsPolicy.ofInsecure()shortcut.pulsar-testclient— depends onpulsar-client-v5/pulsar-client-api-v5.PerfClientUtilsgetscreateV5ClientBuilderFromArguments()mirroring the v4 helper. Important detail: the TLS predicate only enables TLS when the user genuinely wants it (URL ispulsar+ssl://, a trust cert path is supplied, or a flag is explicitly TRUE). Earlier wiring that treated non-null Booleans as intent caused TLS to flip on against plaintext brokers because picocli / default-value resolution coerces those flags toBoolean.FALSEeven when not passed.PerformanceProducer: V5Producer/AsyncProducer.async().send().BatchingPolicy/ChunkingPolicy/CompressionPolicy/ProducerAccessMode.ProducerEncryptionPolicybacked byPemFileKeyProviderfor--encryption-key-name+--encryption-key-value-file. V5 client-levelTransactionPolicy+Transaction.async().commit().PerformanceConsumer: V5QueueConsumer. All subscription types fold intoQueueConsumersemantics (work distribution);Exclusive/Failovertrigger a runtime warning. V5 has noMessageListener, so each consumer gets one dedicated poll thread drivingreceive(Duration)and invoking the same handler the v4 listener did. V5acknowledge(...)is synchronous void — wrapped in try / catch with the existing failure counters.PerformanceReader→ V5CheckpointConsumer. Same poll-thread emulation asPerformanceConsumerreplacesReaderListener.--start-message-idnow only acceptsearliest/latest— V5Checkpointhas nolid:eidfactory and rejecting that form explicitly is cleaner than silently falling back toearliest.PerformanceTransaction: combined producer+consumer over V5Transaction. V5 puts.transaction(txn)on the message builder rather thanproducer.newMessage(txn). Same semantics, different ergonomics.V5 features without a direct v4 equivalent
The following CLI flags survive for back-compat but are logged as no-ops at start-up:
--max-outstanding,--max-outstanding-across-partitions,--stats-interval-seconds,--max-lookup-request,--ssl-factory-plugin*,--busy-wait,--auto-scaled-receiver-queue-size,--batch-index-ack,--pool-messages, chunked-message knobs (--max_chunked_msg,--expire_time_incomplete_chunked_messages,--auto_ack_chunk_q_full),--receiver-queue-size-across-partitions,--replicated,--use-tls.Per-transaction
--txn-timeoutmoves to client-levelTransactionPolicy.Verifying this change
PulsarClientBuilderV5Testcover the TlsPolicy wiring.This change is already covered by existing tests, so it does not need additional tests.
Does this pull request potentially affect one of the following parts:
pulsar-perfsubcommands. CLI flag UX is preserved end-to-end; behavior for flags listed above is logged as a no-op.