ct: Local retention for tiered_v2 mode (pt2) #30544
Open
Lazin wants to merge 6 commits into
Pull request overview
Adds end-to-end and unit test coverage plus implementation plumbing for a reconciler-published allowed_local_start_offset hint, enabling tiered-cloud topics to retain more local data (bounded by local-retention targets) and adjusting read/prefix-truncate behavior accordingly.
Changes:
- Introduces `allowed_local_start_offset` command/state in `ctp_stm`, and uses it to compute a prefix-truncate target distinct from LRLO.
- Adds a per-tick local-retention evaluator in the reconciler that computes/publishes the hint based on effective retention configuration.
- Adds rptest + gtests validating default bookkeeping, evaluator behavior, and STM prefix-truncate targeting semantics.
Reviewed changes
Copilot reviewed 23 out of 23 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| tests/rptest/tests/tiered_cloud_local_retention_test.py | New E2E test validating local footprint behavior in tiered_cloud and hint clearing on mode/policy changes. |
| src/v/cloud_topics/reconciler/tests/test_utils.h | Extends fake_source with hooks to drive local-retention evaluator tests. |
| src/v/cloud_topics/reconciler/tests/reconciliation_source_test.cc | New unit tests for reconciliation source local-retention bookkeeping fields/mutators. |
| src/v/cloud_topics/reconciler/tests/reconciler_test.cc | Adds gtests for evaluator publishing/idempotency and eval-due predicate triggers. |
| src/v/cloud_topics/reconciler/tests/BUILD | Registers the new reconciliation source gtest target. |
| src/v/cloud_topics/reconciler/reconciliation_source.h | Adds source interface for computing/publishing local-retention target + per-source evaluator bookkeeping. |
| src/v/cloud_topics/reconciler/reconciliation_source.cc | Implements local-retention target computation and publishing for l0_source. |
| src/v/cloud_topics/reconciler/reconciler.h | Declares evaluator and “due” predicate entry points on reconciler. |
| src/v/cloud_topics/reconciler/reconciler.cc | Wires evaluator into reconcile tick; increments bytes-since-eval; implements due predicate + evaluator logic. |
| src/v/cloud_topics/reconciler/BUILD | Adds dependencies needed for evaluator/retention computation. |
| src/v/cloud_topics/level_zero/stm/types.h | Adds new STM command key enum value. |
| src/v/cloud_topics/level_zero/stm/types.cc | Adds formatter support for new STM command key. |
| src/v/cloud_topics/level_zero/stm/tests/ctp_stm_test.cc | Adds STM tests for applying/replicating hint and for prefix-truncate target behavior. |
| src/v/cloud_topics/level_zero/stm/tests/ctp_stm_state_test.cc | Adds tests for defaulting/serde round-trip of allowed_local_start_offset. |
| src/v/cloud_topics/level_zero/stm/tests/BUILD | Adds serde deps required by new state serde tests. |
| src/v/cloud_topics/level_zero/stm/ctp_stm.h | Declares apply handler for hint cmd and new prefix_truncate_target() helper. |
| src/v/cloud_topics/level_zero/stm/ctp_stm.cc | Applies new cmd, uses new prefix-truncate target in background loop, and computes target from hint. |
| src/v/cloud_topics/level_zero/stm/ctp_stm_state.h | Bumps serde version and adds allowed_local_start_offset field + accessors. |
| src/v/cloud_topics/level_zero/stm/ctp_stm_state.cc | Implements accessors for allowed_local_start_offset. |
| src/v/cloud_topics/level_zero/stm/ctp_stm_commands.h | Defines set_allowed_local_start_offset_cmd. |
| src/v/cloud_topics/level_zero/stm/ctp_stm_api.h | Adds API method to replicate hint command. |
| src/v/cloud_topics/level_zero/stm/ctp_stm_api.cc | Implements idempotent replication of hint command. |
| src/v/cloud_topics/frontend/frontend.cc | Adjusts read-path decision to prefer local log for tiered-cloud when local covers the requested range (except compacted). |
Comment on lines +46 to +62

    bool tiered_cloud_with_local_limit(const cluster::topic_properties& props) {
        const auto mode = props.storage_mode;
        if (mode != model::redpanda_storage_mode::tiered_cloud) {
            return false;
        }
        const bool compact
          = props.cleanup_policy_bitflags.has_value()
            && ((*props.cleanup_policy_bitflags & model::cleanup_policy_bitflags::compaction) == model::cleanup_policy_bitflags::compaction);
        if (compact) {
            return false;
        }
        const bool has_local_limit
          = (!props.retention_local_target_bytes.is_disabled()
             && props.retention_local_target_bytes.has_optional_value())
            || (!props.retention_local_target_ms.is_disabled()
                && props.retention_local_target_ms.has_optional_value());
        return has_local_limit;

Comment on lines +536 to +542

    // Translate the kafka::offset hint to a log offset. to_log_offset
    // may return a sentinel for offsets outside the translator's known
    // range (e.g. a stale hint from a previous epoch); fall back to the
    // cap in that case rather than feeding garbage into std::min.
    auto hint_log = _raft->log()->to_log_offset(kafka::offset_cast(*hint));
    if (hint_log != model::offset{} && hint_log != model::offset::min()) {
        target = std::min(cap, hint_log);

This was referenced May 20, 2026
Bytes-since-last-eval counter, last-eval timestamp, last-published value. Used by the upcoming evaluate_local_retention path.
Compute the allowed_local_start_offset target from storage.mode, cleanup.policy, and storage::log::retention_offset against effective local-retention targets. Publish via ctp_stm_api when changed. Idempotent when the value matches last-published. The source class gains two virtuals (compute_local_retention_target, publish_local_retention_target) so partition/ctp_stm-specific logic lives in l0_source while the reconciler orchestrates decision and bookkeeping. fake_source implements the virtuals as test hooks.
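The "publish only when changed" behaviour described above can be illustrated with a minimal sketch. Everything here is hypothetical: `source_sketch`, its `publish` method, and the `kafka_offset` alias stand in for the real source class and the STM command replication, whose signatures are not shown in this PR page. The sketch also assumes that an unset `last_published` is equivalent to a published `nullopt`.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical stand-in for kafka::offset.
using kafka_offset = std::int64_t;

// Hypothetical source: records the last published value and counts how
// many times a publish (an STM command replication in the real code)
// was issued.
struct source_sketch {
    std::optional<kafka_offset> last_published;
    int publish_calls{0};

    void publish(std::optional<kafka_offset> target) {
        ++publish_calls;
        last_published = target;
    }
};

// Publishes `computed` only if it differs from the last published value.
// Returns true when a publish was issued.
inline bool evaluate_local_retention(
  source_sketch& src, std::optional<kafka_offset> computed) {
    if (computed == src.last_published) {
        return false; // idempotent: nothing to replicate
    }
    src.publish(computed);
    return true;
}
```

Calling `evaluate_local_retention` twice with the same computed target issues a single publish; a change of target (including a change back to `nullopt`) issues another.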
Bytes (>= segment_size_bytes), time (60s), and config-shape mismatch triggers. Wired into per-source post-reconcile path and the idle tick.
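A rough sketch of the three triggers, for illustration only. The struct and function names are hypothetical, and "config-shape mismatch" is interpreted here as "the presence of a published hint disagrees with what the current topic config calls for", which is an assumption rather than something the commit message spells out.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

using clock_type = std::chrono::steady_clock;

// Hypothetical per-source bookkeeping (bytes since last evaluation,
// last evaluation time, whether a hint is currently published).
struct eval_bookkeeping {
    std::uint64_t bytes_since_eval;
    clock_type::time_point last_eval;
    bool published_has_value;
};

// Hypothetical inputs sampled on each tick.
struct eval_inputs {
    std::uint64_t segment_size_bytes;
    clock_type::time_point now;
    // Assumed meaning: true when the topic is tiered_cloud, not compacted,
    // and has a local retention limit, so a hint is expected.
    bool config_expects_hint;
};

inline bool local_retention_eval_due(
  const eval_bookkeeping& b, const eval_inputs& in) {
    // Bytes trigger: at least one segment's worth of data was reconciled.
    if (b.bytes_since_eval >= in.segment_size_bytes) {
        return true;
    }
    // Time trigger: 60 seconds since the last evaluation.
    if (in.now - b.last_eval >= std::chrono::seconds(60)) {
        return true;
    }
    // Config-shape trigger (assumed interpretation, see lead-in).
    return b.published_has_value != in.config_expects_hint;
}
```

Checking the cheap counters first keeps the predicate inexpensive enough to run on every tick, including the idle tick.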
Local footprint converges near retention.local.target.bytes; flip to cloud and enabling compaction both evict aggressively.
In `tiered_cloud` mode the local log is prefix-truncated only up to `min(LRO, allowed_local_start_offset)`, so it may still cover offsets below LRO. Route those reads to the L0 (local) reader instead of L1 when the local log's start offset is at or below the requested start. In `cloud` mode the local log is prefix-truncated to LRO, so the new check is a no-op and the existing read-routing behavior is preserved. Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
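The routing rule can be sketched as follows. This is an illustration under stated assumptions, not the code in `frontend.cc`: the types are hypothetical, offsets are plain integers, and the pre-existing rule is assumed to be "reads starting at or below LRO go to L1, everything else goes to L0". The compacted-topic exclusion is taken from the review summary.

```cpp
#include <cassert>
#include <cstdint>

enum class storage_mode { cloud, tiered_cloud };
enum class read_path { local_l0, cloud_l1 };

// Hypothetical description of a read and the partition state it sees.
struct read_request {
    storage_mode mode;
    bool compacted;
    std::int64_t requested_start; // first offset the reader wants
    std::int64_t local_start;     // start offset of the local log
    std::int64_t lro;             // last reconciled offset
};

inline read_path choose_read_path(const read_request& r) {
    // New check: in tiered_cloud the local log may still hold offsets
    // below LRO, so prefer it whenever it covers the requested start.
    if (
      r.mode == storage_mode::tiered_cloud && !r.compacted
      && r.local_start <= r.requested_start) {
        return read_path::local_l0;
    }
    // Assumed pre-existing rule: reconciled range is served from L1.
    return r.requested_start <= r.lro ? read_path::cloud_l1
                                      : read_path::local_l0;
}
```

In `cloud` mode the first branch is never taken, so the outcome depends only on the second rule, which matches the "no-op" claim in the commit message.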
Handle offset translation errors in prefix_truncate_target. Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
6d00f61 to c876033
The second part of the PR series. The change includes the retention evaluator for `tiered_cloud` and tests.
The previous part: #30543
The next part: #30545
Summary
Add a single optional hint, `allowed_local_start_offset`, replicated into `ctp_stm` state. The reconciler is the sole writer of the hint. On each tick it inspects the partition's topic config and the data it has reconciled, computes a retention offset from `retention.local.target.{ms,bytes}` exactly like `disk_log_impl::housekeeping` would, and publishes it (it takes into account topic config, cluster level config, and retention_local_strict). `ctp_stm`'s truncate loop then uses `prefix_truncate_target = min(LRLO, log_offset(hint))` instead of just LRLO. That is the only consumer of the hint.

For `cloud` mode (or compacted topics), the reconciler publishes `nullopt` and behaviour is unchanged: aggressive eviction up to LRO.

The housekeeping in `ctp_stm` already tracks active L0 readers, so races with local eviction are not possible.
Local retention rules
`storage.mode` `tiered_cloud` `nullopt` `Some(offset)` `tiered_cloud` `Some(_)` `tiered_cloud` `Some(_)` `nullopt` `cloud` `Some(_)` `nullopt` `cloud` `nullopt`

Space manager: no changes needed
The space manager already publishes a `cloud_gc` offset on the log when disk pressure rises. In classic tiered, `disk_log_impl::housekeeping` reads and clears it. For cloud topics:
- `ctp_stm::prefix_truncate_below_lro` already consults `cloud_gc` when computing its truncation target, so the space manager can push the local footprint below the reconciler's hint under pressure.
- `cloud_gc` is cleared by `ctp_stm` (since `disk_log_impl::housekeeping` doesn't run).
- partitions; the GC mechanism is identical to tiered.
- `max_removable_local_log_offset()` is intentionally unchanged (returns LRLO), so the reclaim path is not gated by the reconciler's hint.
Net result: the reconciler steers the steady-state local footprint, and the
space manager retains the ability to reclaim further on demand. The retention
logic (how much to keep) is the same as in classic tiered storage; only the
executor differs.
Component interactions
Summary
`tiered_cloud`; `disk_log_impl::housekeeping`; `ctp_stm::prefix_truncate_below_lro`; housekeeping itself; reconciler (publishes hint via STM cmd); `cloud_gc` on log; `cloud_gc` on log (same); `cloud_gc`?; housekeeping; `ctp_stm`; `retention.local.target.{ms,bytes}` → offset

Backports Required
Release Notes