ct: Local retention for tiered_v2 (pt3)#30545
Open
Lazin wants to merge 14 commits into
Open
Conversation
Optional kafka::offset cached on ctp_stm_state, produced later by the reconciler. Defaults to nullopt; bumps serde version to 1 while keeping compat_version=0 so older snapshots still load. Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
New replicated command carrying an optional kafka::offset. nullopt clears the hint; Some(offset) sets it. Single command type covers advance, lower, set, and clear. Apply path is a stub returning no-op; the real implementation lands in a follow-up change.
Dispatch the new command in do_apply, store the value in state, and signal the prefix-truncate loop so it can pick up the new target on the next iteration.
Introduce prefix_truncate_target() that returns LRLO when the hint is unset, or the log offset of the hint when set (clamped to LRLO). max_removable_local_log_offset() continues to return LRLO so the storage layer's reclaim path is unaffected. Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
Public entry point used by the reconciler to replicate the new command. Idempotently no-ops when the cached value already matches.
Bytes-since-last-eval counter, last-eval timestamp, last-published value. Used by the upcoming evaluate_local_retention path.
Compute the allowed_local_start_offset target from storage.mode, cleanup.policy, and storage::log::retention_offset against effective local-retention targets. Publish via ctp_stm_api when changed. Idempotent when the value matches last-published. The source class gains two virtuals (compute_local_retention_target, publish_local_retention_target) so partition/ctp_stm-specific logic lives in l0_source while the reconciler orchestrates decision and bookkeeping. fake_source implements the virtuals as test hooks.
Bytes (>= segment_size_bytes), time (60s), and config-shape mismatch triggers. Wired into per-source post-reconcile path and the idle tick.
Local footprint converges near retention.local.target.bytes; flip to cloud and enabling compaction both evict aggressively.
In `tiered_cloud` mode the local log is prefix-truncated only up to `min(LRO, allowed_local_start_offset)`, so it may still cover offsets below LRO. Route those reads to the L0 (local) reader instead of L1 when the local log's start offset is at or below the requested start. In `cloud` mode the local log is prefix-truncated to LRO, so the new check is a no-op and the existing read-routing behavior is preserved. Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
Add storage::log::cloud_gc_offset() getter (implemented on disk_log_impl and failure_injectable_log) so the ctp_stm's truncate loop can observe the value the space manager publishes. The actual fix to ctp_stm::prefix_truncate_target lands in a follow-up so this commit demonstrates the bug: * prefix_truncate_target_respects_cloud_gc_above_hint — set hint=20 and cloud_gc=60, expect target == cloud_gc. FAILS today (returns hint). * prefix_truncate_target_clamps_cloud_gc_to_lrlo — cloud_gc above LRLO must clamp. Passes today by accident; locks the post-fix behavior. * prefix_truncate_target_uses_hint_when_cloud_gc_below — cloud_gc below hint must not relax truncation. Passes today by accident; locks the post-fix behavior. A small test-only accessor on disk_log_impl bypasses the is_cloud_retention_active() gate in set_cloud_gc_offset so the cloud topics fixture (with no ntp_config overrides) can drive the value directly. Generalizing that gate so the space manager can drive cloud_topics partitions in production is a separate concern. Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
In tiered_cloud mode the space manager publishes a cloud_gc offset per log when disk pressure builds. disk_log_impl::do_gc consumes the value for normal tiered storage, but cloud_topics partitions are exempt from that housekeeping and rely on ctp_stm::prefix_truncate_below_lro instead. Read cloud_gc through the new storage::log::cloud_gc_offset() getter and raise the truncate target when it would drive more aggressive eviction than the local-retention hint, capped at LRLO so we never truncate unreconciled data. Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
…ions
The space manager (resource_mgmt/storage.cc) previously filtered out
cloud_topics partitions because they lack a remote_partition handle,
and disk_log_impl::{set_cloud_gc_offset,get_reclaimable_offsets}
explicitly rejected them via the is_cloud_retention_active() gate.
Combined with disk_log_impl::do_gc consuming any pending cloud_gc
unconditionally, the result was that the space manager could not
influence local retention on tiered_cloud partitions at all even
though their local log is exactly what disk pressure should reclaim.
Plumb the path end-to-end:
* Space manager: include partitions where cloud_topic_enabled() is
true in the eviction schedule.
* disk_log_impl::set_cloud_gc_offset and get_reclaimable_offsets:
accept cloud_topics partitions alongside tiered storage ones.
* disk_log_impl::do_gc: skip its cloud_gc consumption (and skip the
reset) when the partition is cloud_topics so the value is left for
the partition's own truncation loop.
* storage::log: add reset_cloud_gc_offset() so consumers can clear
the value after acting on it (mirrors do_gc's clear-after-use
pattern; ctp_stm needs it because it bypasses do_gc).
* ctp_stm::prefix_truncate_below_lro: after a successful
snapshot_and_truncate_log, clear cloud_gc so the next space-mgmt
round publishes a fresh decision rather than re-applying a stale
one.
Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
Adds test_space_manager_reclaim_under_pressure to the local-retention suite. With tight retention_local_target_capacity_bytes and retention_local_strict enabled, the space manager must shrink local footprint below the reconciler-published hint, confirming that max_removable_local_log_offset() is not gated by the hint. Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
Contributor
There was a problem hiding this comment.
Pull request overview
This PR extends the cloud-topics (“ct”) pipeline to support local retention in tiered_cloud mode by having the reconciler compute/publish an allowed_local_start_offset hint, and by wiring space-management driven GC (cloud_gc_offset) through to cloud-topics prefix truncation. It also adds unit + end-to-end coverage around the new behavior.
Changes:
- Add reconciler-side evaluation/publishing of
allowed_local_start_offsetfortiered_cloud+ delete topics and associated bookkeeping/triggering. - Extend cloud-topics
ctp_stmprefix truncation to respectallowed_local_start_offset(hold back) andcloud_gc_offset(evict more aggressively under pressure). - Update space-management and storage surfaces to treat cloud-topics partitions as “cloud-backed” for reclaim decisions; add unit/e2e tests.
Reviewed changes
Copilot reviewed 29 out of 29 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/rptest/tests/tiered_cloud_local_retention_test.py | New ducktape e2e coverage validating local footprint behavior + mode/policy flips + disk pressure reclaim. |
| src/v/storage/log.h | Adds cloud_gc_offset() / reset_cloud_gc_offset() virtual surfaces to storage::log. |
| src/v/storage/disk_log_impl.h | Implements new cloud_gc_offset() surfaces; adds is_cloud_backed(); test accessor friendship. |
| src/v/storage/disk_log_impl.cc | Introduces is_cloud_backed() and uses it to include cloud-topics in reclaimable/GC gating. |
| src/v/resource_mgmt/storage.cc | Includes cloud-topics partitions in eviction-policy reclaim collection (tiered storage + cloud topics). |
| src/v/raft/tests/failure_injectable_log.h | Extends test log wrapper interface for new cloud_gc_offset surfaces. |
| src/v/raft/tests/failure_injectable_log.cc | Delegates new cloud_gc_offset methods to underlying log. |
| src/v/cloud_topics/reconciler/tests/test_utils.h | Adds fake-source hooks for local-retention evaluator tests (compute target, publish capture, shape flags, segment size). |
| src/v/cloud_topics/reconciler/tests/reconciliation_source_test.cc | New unit tests for reconciliation_source local-retention bookkeeping defaults/mutators. |
| src/v/cloud_topics/reconciler/tests/reconciler_test.cc | Adds evaluator + “eval due” predicate tests, and ensures evaluator is invoked from reconcile(). |
| src/v/cloud_topics/reconciler/tests/BUILD | Wires new reconciliation_source gtest target. |
| src/v/cloud_topics/reconciler/reconciliation_source.h | Extends source interface with local-retention compute/publish + shape/segment-size hooks and evaluator bookkeeping state. |
| src/v/cloud_topics/reconciler/reconciliation_source.cc | Implements local-retention hint compute/publish logic for L0 sources. |
| src/v/cloud_topics/reconciler/reconciler.h | Adds local_retention_eval_due and evaluate_local_retention_hint entry points. |
| src/v/cloud_topics/reconciler/reconciler.cc | Runs per-tick evaluation pass; tracks per-source bytes; implements due predicate + evaluator. |
| src/v/cloud_topics/reconciler/BUILD | Adds deps needed for topic config/properties, storage, tristate. |
| src/v/cloud_topics/level_zero/stm/types.h | Adds new ctp_stm_key::set_allowed_local_start_offset. |
| src/v/cloud_topics/level_zero/stm/types.cc | Adds formatter string for the new STM key. |
| src/v/cloud_topics/level_zero/stm/tests/ctp_stm_test.cc | Adds test-only accessor for cloud_gc_offset + many tests for allowed_local_start_offset and prefix-truncate targeting logic. |
| src/v/cloud_topics/level_zero/stm/tests/ctp_stm_state_test.cc | Adds state tests for allowed_local_start_offset defaults/set/get/serde round-trip. |
| src/v/cloud_topics/level_zero/stm/tests/BUILD | Adds serde + storage deps needed by new tests. |
| src/v/cloud_topics/level_zero/stm/ctp_stm.h | Adds apply handler + prefix_truncate_target() API. |
| src/v/cloud_topics/level_zero/stm/ctp_stm.cc | Uses prefix_truncate_target() in background truncation loop; consumes/clears cloud_gc_offset; applies new STM command; implements targeting logic. |
| src/v/cloud_topics/level_zero/stm/ctp_stm_state.h | Bumps serde version; adds allowed_local_start_offset field + accessors. |
| src/v/cloud_topics/level_zero/stm/ctp_stm_state.cc | Implements allowed_local_start_offset accessors. |
| src/v/cloud_topics/level_zero/stm/ctp_stm_commands.h | Adds set_allowed_local_start_offset_cmd. |
| src/v/cloud_topics/level_zero/stm/ctp_stm_api.h | Adds API to replicate allowed_local_start_offset command. |
| src/v/cloud_topics/level_zero/stm/ctp_stm_api.cc | Implements idempotent replication for allowed_local_start_offset. |
| src/v/cloud_topics/frontend/frontend.cc | Adjusts read path to serve from local log below LRO in tiered_cloud mode when safe (non-compacted). |
Comment on lines
+542
to
+548
| // Translate the kafka::offset hint to a log offset. to_log_offset | ||
| // may return a sentinel for offsets outside the translator's known | ||
| // range (e.g. a stale hint from a previous epoch); fall back to the | ||
| // cap in that case rather than feeding garbage into std::min. | ||
| auto hint_log = _raft->log()->to_log_offset(kafka::offset_cast(*hint)); | ||
| if (hint_log != model::offset{} && hint_log != model::offset::min()) { | ||
| target = std::min(cap, hint_log); |
Comment on lines
+57
to
+61
| const bool has_local_limit | ||
| = (!props.retention_local_target_bytes.is_disabled() | ||
| && props.retention_local_target_bytes.has_optional_value()) | ||
| || (!props.retention_local_target_ms.is_disabled() | ||
| && props.retention_local_target_ms.has_optional_value()); |
| lg.warn, | ||
| "{}: failed to publish allowed_local_start_offset: {}", | ||
| src->ntp(), | ||
| res.error()); |
Collaborator
CI test resultstest results on build#84717
|
This was referenced May 20, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This is a last PR of the series.
More tests and integration with the space management.
The first PR: #30543
The previous PR: #30544
Summary
Add a single optional hint,
allowed_local_start_offset, replicated intoctp_stmstate. The reconciler is the sole writer of the hint. On eachtick it inspects the partition's topic config and the data it has reconciled,
computes a retention offset from
retention.local.target.{ms,bytes}exactlylike
disk_log_impl::housekeepingwould, and publishes it (it takes into accounttopic config, cluster level config, and retention_local_strict).
ctp_stm's truncate loop then usesprefix_truncate_target = min(LRLO, log_offset(hint))instead of just LRLO.That is the only consumer of the hint.
For
cloudmode (or compacted topics), the reconciler publishesnulloptandbehaviour is unchanged — aggressive eviction up to LRO.
The housekeeping in
the ctp_stmis already tracking active L0 readers so theraces with the local eviction are not possible.
Local retention rules
storage.modetiered_cloudnulloptSome(offset)tiered_cloudSome(_)tiered_cloudSome(_)nulloptcloudSome(_)nulloptcloudnulloptSpace manager: no changes needed
The space manager already publishes a
cloud_gcoffset on the log when diskpressure rises. In classic tiered,
disk_log_impl::housekeepingreads andclears it. For cloud topics:
ctp_stm::prefix_truncate_below_lroalready consultscloud_gcwhencomputing its truncation target, so the space manager can push the local
footprint below the reconciler's hint under pressure.
cloud_gcis cleared byctp_stm(sincedisk_log_impl::housekeepingdoesn't run).
partitions; the GC mechanism is identical to tiered.
max_removable_local_log_offset()is intentionally unchanged (returnsLRLO), so the reclaim path is not gated by the reconciler's hint.
Net result: the reconciler steers the steady-state local footprint, and the
space manager retains the ability to reclaim further on demand. The retention
logic (how much to keep) is the same as in classic tiered storage; only the
executor differs.
Component interactions
Summary
tiered_clouddisk_log_impl::housekeepingctp_stm::prefix_truncate_below_lrohousekeepingitselfreconciler(publishes hint via STM cmd)cloud_gcon logcloud_gcon log (same)cloud_gc?housekeepingctp_stmretention.local.target.{ms,bytes}→ offsetBackports Required
Release Notes
Features
tiered_cloudstorage mode matches tiered-storage