Skip to content

[improve][test] Migrate pulsar-perf to the V5 client API#25887

Merged
merlimat merged 6 commits into
apache:masterfrom
merlimat:mmerli/pip-475-perf-v5
Jun 2, 2026
Merged

[improve][test] Migrate pulsar-perf to the V5 client API#25887
merlimat merged 6 commits into
apache:masterfrom
merlimat:mmerli/pip-475-perf-v5

Conversation

@merlimat
Copy link
Copy Markdown
Contributor

Motivation

PIP-475 added regular-to-scalable topic migration on the broker and the V5 client SDK. The V5 client transparently routes against both regular and scalable topics. Pulsar's perf CLI (pulsar-perf produce / consume / read / transaction) still used the v4 client API and therefore could not benchmark scalable topics at all.

This PR migrates all four pulsar-perf commands to the V5 client API so the same binary now works against regular and scalable topics out of the box.

A small V5 client-side fix is bundled in because it is required for the perf migration: PulsarClientBuilderV5#tlsPolicy was a stub that dropped every TlsPolicy field on the floor (only setting useTls=true). The perf commands take --trust-cert-file and the related flags and expect them to land, so the policy is now wired through to ClientConfigurationData end-to-end.

Modifications

pulsar-client-v5

  • PulsarClientBuilderV5#tlsPolicy now wires trustCertsFilePath, keyFilePath, certificateFilePath, allowInsecureConnection, enableHostnameVerification through to the underlying v4 conf. Two new unit tests in PulsarClientBuilderV5Test cover field propagation and the TlsPolicy.ofInsecure() shortcut.

pulsar-testclient — depends on pulsar-client-v5 / pulsar-client-api-v5.

  • PerfClientUtils gets createV5ClientBuilderFromArguments() mirroring the v4 helper. Important detail: the TLS predicate only enables TLS when the user genuinely wants it (URL is pulsar+ssl://, a trust cert path is supplied, or a flag is explicitly TRUE). Earlier wiring that treated non-null Booleans as intent caused TLS to flip on against plaintext brokers because picocli / default-value resolution coerces those flags to Boolean.FALSE even when not passed.

  • PerformanceProducer: V5 Producer / AsyncProducer.async().send(). BatchingPolicy / ChunkingPolicy / CompressionPolicy / ProducerAccessMode. ProducerEncryptionPolicy backed by PemFileKeyProvider for --encryption-key-name + --encryption-key-value-file. V5 client-level TransactionPolicy + Transaction.async().commit().

  • PerformanceConsumer: V5 QueueConsumer. All subscription types fold into QueueConsumer semantics (work distribution); Exclusive / Failover trigger a runtime warning. V5 has no MessageListener, so each consumer gets one dedicated poll thread driving receive(Duration) and invoking the same handler the v4 listener did. V5 acknowledge(...) is synchronous void — wrapped in try / catch with the existing failure counters.

  • PerformanceReader → V5 CheckpointConsumer. Same poll-thread emulation as PerformanceConsumer replaces ReaderListener. --start-message-id now only accepts earliest / latest — V5 Checkpoint has no lid:eid factory and rejecting that form explicitly is cleaner than silently falling back to earliest.

  • PerformanceTransaction: combined producer+consumer over V5 Transaction. V5 puts .transaction(txn) on the message builder rather than producer.newMessage(txn). Same semantics, different ergonomics.

V5 features without a direct v4 equivalent

The following CLI flags survive for back-compat but are logged as no-ops at start-up:

--max-outstanding, --max-outstanding-across-partitions, --stats-interval-seconds, --max-lookup-request, --ssl-factory-plugin*, --busy-wait, --auto-scaled-receiver-queue-size, --batch-index-ack, --pool-messages, chunked-message knobs (--max_chunked_msg, --expire_time_incomplete_chunked_messages, --auto_ack_chunk_q_full), --receiver-queue-size-across-partitions, --replicated, --use-tls.

Per-transaction --txn-timeout moves to client-level TransactionPolicy.

Verifying this change

  • New unit tests in PulsarClientBuilderV5Test cover the TlsPolicy wiring.

This change is already covered by existing tests, so it does not need additional tests.

Does this pull request potentially affect one of the following parts:

  • The public API: yes — the four pulsar-perf subcommands. CLI flag UX is preserved end-to-end; behavior for flags listed above is logged as a no-op.
  • The schema: no
  • The default values of configurations: no
  • The wire protocol: no
  • The rest endpoints: no
  • The admin cli options: no
  • Anything that affects deployment: no

merlimat added 2 commits May 29, 2026 09:34
PulsarClientBuilderV5#tlsPolicy was a stub that only set useTls=true and
dropped every field on the TlsPolicy on the floor. Wire the policy
through: trustCertsFilePath, keyFilePath, certificateFilePath,
allowInsecureConnection, enableHostnameVerification all land on the
underlying ClientConfigurationData.

This unblocks any V5 caller that needs custom TLS — most immediately
the pulsar-perf migration where users pass --trust-cert-file and
expect the broker connection to honor it.

Added two unit tests covering field propagation and the
TlsPolicy.ofInsecure() shortcut.
So that pulsar-perf produce / consume / read / transaction work
transparently against both regular and scalable topics — V5 detects
the topic kind via topic:// vs persistent:// lookup and routes
accordingly.

All four perf commands swap the v4 client API for the V5 surface:

- PerformanceProducer: V5 Producer / AsyncProducer.async().send(),
  BatchingPolicy / ChunkingPolicy / CompressionPolicy /
  ProducerAccessMode, ProducerEncryptionPolicy backed by
  PemFileKeyProvider for --encryption-key-name + -value-file, V5
  client-level TransactionPolicy + Transaction.async().commit().

- PerformanceConsumer: V5 QueueConsumer (all subscription types fold
  into QueueConsumer; Exclusive / Failover trigger a runtime warning).
  V5 has no MessageListener so each consumer gets a dedicated poll
  thread driving receive(Duration), invoking the same handler the v4
  listener did.

- PerformanceReader → CheckpointConsumer. Same poll-thread pattern as
  PerformanceConsumer. --start-message-id only accepts earliest /
  latest (V5 Checkpoint has no lid:eid form).

- PerformanceTransaction: combined producer+consumer over V5
  Transaction. Per-message ack is synchronous (V5 acknowledge is sync
  void).

PerfClientUtils gets a createV5ClientBuilderFromArguments() factory
mirroring the v4 helper. The TLS predicate only enables TLS when the
user genuinely wants it (URL is pulsar+ssl, a trust cert path is
supplied, or a flag is explicitly TRUE) — never on Boolean.FALSE
from default-value resolution. The unconditional non-null check
caused TLS to flip on against plaintext brokers.

V5 features without a direct v4 equivalent are logged as no-ops or
documented warnings: --max-outstanding(-across-partitions),
--stats-interval-seconds, --max-lookup-request, --ssl-factory-plugin*,
--busy-wait, --auto-scaled-receiver-queue-size, --batch-index-ack,
--pool-messages, chunked-message knobs,
--receiver-queue-size-across-partitions, --replicated, --use-tls.
Per-transaction --txn-timeout moves to client-level TransactionPolicy.
Copy link
Copy Markdown
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

…erBuilderImpl)

CI build failed in PerformanceProducerTest.java:179 because the test
cast createProducerBuilder()'s return to v4 ProducerBuilderImpl and
inspected getConf().isBatchingEnabled(). The V5 ProducerBuilder is
opaque (no public conf accessor), so that white-box assertion cannot
survive the V5 migration.

Regression intent — "disableBatching=true must propagate to the
configured builder" — is now covered by V5's BatchingPolicy.ofDisabled()
tests and the end-to-end perf workflow tests in this file.
merlimat added 2 commits May 30, 2026 10:36
Two V5 client-side bugs that the pulsar-perf migration tests exposed:

1. PulsarClientBuilderV5#authentication(plugin, params) set the plugin
   class name + params strings on the conf but never instantiated the
   Authentication and called conf.setAuthentication(...). The v4
   PulsarClientImpl reads the Authentication instance via
   conf.getAuthentication() at connect time, so the client connected
   with no credentials and the broker rejected the handshake (caught
   by Oauth2PerformanceTransactionTest). Fix: instantiate via
   AuthenticationFactory.create and attach to the conf — matches
   what v4 ClientBuilder does in the same overload. Bad plugin class
   names are wrapped as V5 PulsarClientException rather than leaking
   the v4 exception type. New unit tests cover both paths.

2. QueueConsumerBuilder was missing replicateSubscriptionState. The
   underlying v4 ConsumerConfigurationData supports it and
   StreamConsumerBuilder already exposes it; QueueConsumerBuilder
   was the odd one out. Same one-line wiring through to the v4 conf.
   Needed by PerformanceTransactionTest.testTxnPerf which asserts on
   subscription.isReplicated().
Three CI-uncovered fixes on the perf side:

1. CmdBase: enable picocli case-insensitive enum parsing. V5 enums
   (SubscriptionInitialPosition.EARLIEST, ProducerAccessMode.SHARED,
   ...) are uppercase, while users have been passing the v4 spellings
   ("Earliest", "Shared") on the perf CLI for years. Picocli is
   case-sensitive by default and would silently fail to parse the
   v4 spellings, exiting the perf command before any logs were
   emitted. Caught by PerformanceTransactionTest.testConsumeTxnMessage.

2. PerformanceTransaction: collect per-txn send futures and await
   them before committing. V5 transaction-aware sends are queued
   onto an internal dispatch chain, so the v4-side txn-coordinator
   registration of the send can race the commit() if commit fires
   before the chain drains. Symptom:
   InvalidTxnStatusException: ... unexpected state : COMMITTING,
   expect OPEN. This is also the semantically correct ordering
   (commit only after sends land). Caught by
   Oauth2PerformanceTransactionTest.testTransactionPerf.

3. Wire --replicated through to the V5 QueueConsumerBuilder now
   that it exposes replicateSubscriptionState (preceding commit).
   Drop the "ignored" warning. Needed by
   PerformanceTransactionTest.testTxnPerf.
…roducer

Addresses review feedback: PerformanceProducer committed the
transaction purely on the local send counter
(numMessageSend == numMessagesPerTransaction), but messageBuilder.send()
is asynchronous and V5 dispatches it through the producer dispatch
chain. The commit could race ahead of the txn-coordinator registration
of the sends, which the broker rejects with InvalidTxnStatusException
(COMMITTING, expect OPEN).

Collect the in-flight transaction's send futures and await them before
committing — the same fix already applied to PerformanceTransaction.
The send chain swallows per-send failures, so the join never throws on
a send error.
Copy link
Copy Markdown
Contributor

@void-ptr974 void-ptr974 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. This makes V5 more practical to try and benchmark, especially with scalable topics.

Looking forward to the broader V5 client work.

LGTM.

@merlimat merlimat merged commit caa493b into apache:master Jun 2, 2026
43 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants