From e013e250ff62dd0e94195a29ebbf7e99d2dc2e80 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Fri, 22 May 2026 14:33:32 -0400 Subject: [PATCH 1/4] Rework merge task for both compactor and indexer merge flows --- .../src/compaction_pipeline.rs | 4 +- quickwit/quickwit-indexing/failpoints/mod.rs | 7 +-- .../src/actors/merge_executor.rs | 29 +++++----- .../src/actors/merge_scheduler_service.rs | 4 +- .../src/actors/merge_split_downloader.rs | 57 ++++++------------- .../quickwit-indexing/src/merge_policy/mod.rs | 22 +++++++ .../src/models/merge_scratch.rs | 8 +-- 7 files changed, 64 insertions(+), 67 deletions(-) diff --git a/quickwit/quickwit-compaction/src/compaction_pipeline.rs b/quickwit/quickwit-compaction/src/compaction_pipeline.rs index 9fc2b261967..d07dbb5ee8b 100644 --- a/quickwit/quickwit-compaction/src/compaction_pipeline.rs +++ b/quickwit/quickwit-compaction/src/compaction_pipeline.rs @@ -25,7 +25,7 @@ use quickwit_doc_mapper::DocMapper; use quickwit_indexing::actors::{ MergeExecutor, MergeSplitDownloader, Packager, Publisher, Uploader, UploaderType, }; -use quickwit_indexing::merge_policy::MergeOperation; +use quickwit_indexing::merge_policy::{MergeOperation, MergeSource}; use quickwit_indexing::{IndexingSplitStore, SplitsUpdateMailbox}; use quickwit_metrics::{counter, gauge, histogram, label_values}; use quickwit_proto::indexing::MergePipelineId; @@ -351,7 +351,7 @@ impl CompactionPipeline { self.pipeline_start = Some(now); // Kick off the pipeline. merge_split_downloader_mailbox - .try_send_message(self.merge_operation.clone()) + .try_send_message(MergeSource::Operation(self.merge_operation.clone())) .map_err(|err| { anyhow::anyhow!("failed to send merge operation to downloader: {err:?}") })?; diff --git a/quickwit/quickwit-indexing/failpoints/mod.rs b/quickwit/quickwit-indexing/failpoints/mod.rs index 25b6522851b..5b4ffe5f0c6 100644 --- a/quickwit/quickwit-indexing/failpoints/mod.rs +++ b/quickwit/quickwit-indexing/failpoints/mod.rs @@ -40,7 +40,7 @@ use quickwit_common::rand::append_random_suffix; use quickwit_common::split_file; use quickwit_common::temp_dir::TempDirectory; use quickwit_indexing::actors::MergeExecutor; -use quickwit_indexing::merge_policy::{MergeOperation, MergeTask}; +use quickwit_indexing::merge_policy::{MergeOperation, MergeSource, MergeTask}; use quickwit_indexing::models::MergeScratch; use quickwit_indexing::{TestSandbox, get_tantivy_directory_from_split_bundle}; use quickwit_metastore::{ @@ -285,10 +285,9 @@ async fn test_merge_executor_controlled_directory_kill_switch() -> anyhow::Resul tantivy_dirs.push(get_tantivy_directory_from_split_bundle(&dest_filepath).unwrap()); } let merge_operation = MergeOperation::new_merge_operation(split_metadatas); - let merge_task = MergeTask::from_merge_operation_for_test(merge_operation.clone()); + let merge_task = MergeTask::from_merge_operation_for_test(merge_operation); let merge_scratch = MergeScratch { - merge_operation, - merge_task: Some(merge_task), + merge_source: MergeSource::Task(merge_task), merge_scratch_directory, downloaded_splits_directory, tantivy_dirs, diff --git a/quickwit/quickwit-indexing/src/actors/merge_executor.rs b/quickwit/quickwit-indexing/src/actors/merge_executor.rs index d51796e9774..9995a7d4263 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_executor.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_executor.rs @@ -47,7 +47,7 @@ use tracing::{debug, error, info, instrument, warn}; use crate::actors::Packager; use crate::controlled_directory::ControlledDirectory; -use crate::merge_policy::MergeOperationType; +use crate::merge_policy::{MergeOperationType, MergeSource}; use crate::models::{IndexedSplit, IndexedSplitBatch, MergeScratch, PublishLock, SplitAttrs}; #[derive(Clone)] @@ -85,7 +85,7 @@ impl Actor for MergeExecutor { impl Handler for MergeExecutor { type Reply = (); - #[instrument(level = "info", name = "merge_executor", parent = merge_scratch.merge_operation.merge_parent_span.id(), skip_all)] + #[instrument(level = "info", name = "merge_executor", parent = merge_scratch.merge_source.as_operation().merge_parent_span.id(), skip_all)] async fn handle( &mut self, merge_scratch: MergeScratch, @@ -93,12 +93,12 @@ impl Handler for MergeExecutor { ) -> Result<(), ActorExitStatus> { let start = Instant::now(); let MergeScratch { - merge_operation, - merge_task, + merge_source, tantivy_dirs, merge_scratch_directory, .. } = merge_scratch; + let merge_operation = merge_source.as_operation(); // On nodes running the split compaction architecture, merge pipelines are ephemeral, and we // need to make sure there aren't too many CPU-bound operations occurring concurrently. let _cpu_permit = match &self.merge_execution_semaphore { @@ -164,6 +164,11 @@ impl Handler for MergeExecutor { operation_type = %merge_operation.operation_type, "merge-operation-success" ); + let batch_parent_span = merge_operation.merge_parent_span.clone(); + let merge_task_opt = match merge_source { + MergeSource::Task(task) => Some(task), + MergeSource::Operation(_) => None, + }; ctx.send_message( &self.merge_packager_mailbox, IndexedSplitBatch { @@ -171,8 +176,8 @@ impl Handler for MergeExecutor { checkpoint_delta_opt: Default::default(), publish_lock: PublishLock::default(), publish_token_opt: None, - batch_parent_span: merge_operation.merge_parent_span.clone(), - merge_task_opt: merge_task, + batch_parent_span, + merge_task_opt, }, ) .await?; @@ -615,7 +620,7 @@ mod tests { use tantivy::{Document, ReloadPolicy, TantivyDocument}; use super::*; - use crate::merge_policy::{MergeOperation, MergeTask}; + use crate::merge_policy::{MergeOperation, MergeSource, MergeTask}; use crate::{TestSandbox, get_tantivy_directory_from_split_bundle, new_split_id}; #[tokio::test] @@ -664,10 +669,9 @@ mod tests { tantivy_dirs.push(get_tantivy_directory_from_split_bundle(&dest_filepath).unwrap()) } let merge_operation = MergeOperation::new_merge_operation(split_metas); - let merge_task = MergeTask::from_merge_operation_for_test(merge_operation.clone()); + let merge_task = MergeTask::from_merge_operation_for_test(merge_operation); let merge_scratch = MergeScratch { - merge_operation, - merge_task: Some(merge_task), + merge_source: MergeSource::Task(merge_task), tantivy_dirs, merge_scratch_directory, downloaded_splits_directory, @@ -810,10 +814,9 @@ mod tests { .await?; let tantivy_dir = get_tantivy_directory_from_split_bundle(&dest_filepath).unwrap(); let merge_operation = MergeOperation::new_delete_and_merge_operation(new_split_metadata); - let merge_task = MergeTask::from_merge_operation_for_test(merge_operation.clone()); + let merge_task = MergeTask::from_merge_operation_for_test(merge_operation); let merge_scratch = MergeScratch { - merge_operation, - merge_task: Some(merge_task), + merge_source: MergeSource::Task(merge_task), tantivy_dirs: vec![tantivy_dir], merge_scratch_directory, downloaded_splits_directory, diff --git a/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs b/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs index dfe832f0c61..8e636b0e551 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs @@ -29,7 +29,7 @@ use tracing::error; use super::MergeSplitDownloader; #[cfg(feature = "metrics")] use super::parquet_pipeline::{ParquetMergeSplitDownloader, ParquetMergeTask}; -use crate::merge_policy::{MergeOperation, MergeTask, compute_merge_score}; +use crate::merge_policy::{MergeOperation, MergeSource, MergeTask, compute_merge_score}; use crate::metrics::{ONGOING_MERGE_OPERATIONS, PENDING_MERGE_BYTES, PENDING_MERGE_OPERATIONS}; pub struct MergePermit { @@ -229,7 +229,7 @@ impl MergeSchedulerService { self.pending_merge_bytes -= merge_task.merge_operation.total_num_bytes(); PENDING_MERGE_OPERATIONS.set(self.pending_merge_queue.len() as f64); PENDING_MERGE_BYTES.set(self.pending_merge_bytes as f64); - match split_downloader_mailbox.try_send_message(merge_task) { + match split_downloader_mailbox.try_send_message(MergeSource::Task(merge_task)) { Ok(_) => {} Err(quickwit_actors::TrySendError::Full(_)) => { // The split downloader mailbox has an unbounded queue capacity, diff --git a/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs b/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs index 1911ecc0363..8a2aa499e7b 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs @@ -23,7 +23,7 @@ use tantivy::Directory; use tracing::{debug, info, instrument}; use super::MergeExecutor; -use crate::merge_policy::{MergeOperation, MergeTask}; +use crate::merge_policy::MergeSource; use crate::models::MergeScratch; use crate::split_store::IndexingSplitStore; @@ -49,48 +49,19 @@ impl Actor for MergeSplitDownloader { } #[async_trait] -impl Handler for MergeSplitDownloader { +impl Handler for MergeSplitDownloader { type Reply = (); #[instrument( name = "merge_split_downloader", - parent = merge_task.merge_parent_span.id(), + parent = merge_source.as_operation().merge_parent_span.id(), skip_all, )] async fn handle( &mut self, - merge_task: MergeTask, + merge_source: MergeSource, ctx: &ActorContext, ) -> Result<(), quickwit_actors::ActorExitStatus> { - let merge_operation = merge_task.merge_operation.as_ref().clone(); - self.download_and_send(merge_operation, ctx).await - } -} - -#[async_trait] -impl Handler for MergeSplitDownloader { - type Reply = (); - - #[instrument( - name = "merge_split_downloader", - parent = merge_operation.merge_parent_span.id(), - skip_all, - )] - async fn handle( - &mut self, - merge_operation: MergeOperation, - ctx: &ActorContext, - ) -> Result<(), quickwit_actors::ActorExitStatus> { - self.download_and_send(merge_operation, ctx).await - } -} - -impl MergeSplitDownloader { - async fn download_and_send( - &mut self, - merge_operation: MergeOperation, - ctx: &ActorContext, - ) -> Result<(), ActorExitStatus> { let merge_scratch_directory = temp_dir::Builder::default() .join("merge") .tempdir_in(self.scratch_directory.path()) @@ -102,14 +73,13 @@ impl MergeSplitDownloader { .map_err(|error| anyhow::anyhow!(error))?; let tantivy_dirs = self .download_splits( - merge_operation.splits_as_slice(), + merge_source.as_operation().splits_as_slice(), downloaded_splits_directory.path(), ctx, ) .await?; let msg = MergeScratch { - merge_operation, - merge_task: None, + merge_source, merge_scratch_directory, downloaded_splits_directory, tantivy_dirs, @@ -164,7 +134,7 @@ mod tests { use quickwit_storage::{PutPayload, RamStorageBuilder, SplitPayloadBuilder}; use super::*; - use crate::merge_policy::MergeOperation; + use crate::merge_policy::{MergeOperation, MergeTask}; use crate::new_split_id; #[tokio::test] @@ -205,7 +175,7 @@ mod tests { let merge_operation: MergeOperation = MergeOperation::new_merge_operation(splits_to_merge); let merge_task = MergeTask::from_merge_operation_for_test(merge_operation); merge_split_downloader_mailbox - .send_message(merge_task) + .send_message(MergeSource::Task(merge_task)) .await?; merge_split_downloader_handler .process_pending_and_observe() @@ -218,8 +188,15 @@ mod tests { .unwrap() .downcast::() .unwrap(); - assert_eq!(merge_scratch.merge_operation.splits_as_slice().len(), 10); - for split in merge_scratch.merge_operation.splits_as_slice() { + assert_eq!( + merge_scratch + .merge_source + .as_operation() + .splits_as_slice() + .len(), + 10 + ); + for split in merge_scratch.merge_source.as_operation().splits_as_slice() { let split_filename = split_file(split.split_id()); let split_filepath = merge_scratch .downloaded_splits_directory diff --git a/quickwit/quickwit-indexing/src/merge_policy/mod.rs b/quickwit/quickwit-indexing/src/merge_policy/mod.rs index 408dc7cd798..dd429fbeba6 100644 --- a/quickwit/quickwit-indexing/src/merge_policy/mod.rs +++ b/quickwit/quickwit-indexing/src/merge_policy/mod.rs @@ -64,6 +64,28 @@ impl MergeTask { } } +/// Carries either a scheduled merge task (old pipeline, with RAII permit + inventory tracking) +/// or a bare operation (compactor pipeline, which manages concurrency and dedup independently). +pub enum MergeSource { + Task(MergeTask), + Operation(MergeOperation), +} + +impl MergeSource { + pub fn as_operation(&self) -> &MergeOperation { + match self { + MergeSource::Task(task) => task, + MergeSource::Operation(op) => op, + } + } +} + +impl fmt::Debug for MergeSource { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.as_operation().fmt(f) + } +} + impl fmt::Debug for MergeTask { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { self.merge_operation.as_ref().fmt(f) diff --git a/quickwit/quickwit-indexing/src/models/merge_scratch.rs b/quickwit/quickwit-indexing/src/models/merge_scratch.rs index a0296f69d2d..a952b24f500 100644 --- a/quickwit/quickwit-indexing/src/models/merge_scratch.rs +++ b/quickwit/quickwit-indexing/src/models/merge_scratch.rs @@ -15,15 +15,11 @@ use quickwit_common::temp_dir::TempDirectory; use tantivy::Directory; -use crate::merge_policy::{MergeOperation, MergeTask}; +use crate::merge_policy::MergeSource; #[derive(Debug)] pub struct MergeScratch { - /// The merge operation data (splits, merge_split_id, operation type). - pub merge_operation: MergeOperation, - // TODO: remove once the old MergePipeline is deleted and the - // DeleteTaskPipeline no longer routes through MergeSchedulerService. - pub merge_task: Option, + pub merge_source: MergeSource, pub merge_scratch_directory: TempDirectory, pub downloaded_splits_directory: TempDirectory, pub tantivy_dirs: Vec>, From 77eb5ec9a1c52cbd73075d85a524b67f9af09db6 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Fri, 22 May 2026 15:01:59 -0400 Subject: [PATCH 2/4] fingerprint update --- quickwit/quickwit-indexing/src/actors/indexing_service.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 8fe95f79cf7..ae2232f45d3 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -1398,9 +1398,9 @@ mod tests { // change whenever IndexingSettings fields are added/removed. Recompute // by temporarily adding a test that prints // `indexing_pipeline_params_fingerprint(&index_config, &source_config)`. - const PARAMS_FINGERPRINT_INGEST_API: u64 = 1637744865450232394; - const PARAMS_FINGERPRINT_SOURCE_1: u64 = 1705211905504908791; - const PARAMS_FINGERPRINT_SOURCE_2: u64 = 8706667372658059428; + const PARAMS_FINGERPRINT_INGEST_API: u64 = 7973087274884969148; + const PARAMS_FINGERPRINT_SOURCE_1: u64 = 9420938500552890840; + const PARAMS_FINGERPRINT_SOURCE_2: u64 = 16199199787360162635; quickwit_common::setup_logging_for_tests(); let transport = ChannelTransport::default(); From ebea0bea5759779b1eedf3cd5322be58c15ab50d Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Fri, 22 May 2026 15:19:11 -0400 Subject: [PATCH 3/4] more test fixes --- .../src/actors/merge_planner.rs | 30 +++++++++++-------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/quickwit/quickwit-indexing/src/actors/merge_planner.rs b/quickwit/quickwit-indexing/src/actors/merge_planner.rs index f807572c252..0ec578a9ade 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_planner.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_planner.rs @@ -376,7 +376,7 @@ mod tests { use crate::actors::MergePlanner; use crate::merge_policy::{ - MergePolicy, MergeTask, StableLogMergePolicy, merge_policy_from_settings, + MergePolicy, MergeSource, StableLogMergePolicy, merge_policy_from_settings, }; use crate::models::NewSplits; @@ -481,16 +481,20 @@ mod tests { }; merge_planner_mailbox.send_message(message).await?; merge_planner_handle.process_pending_and_observe().await; - let operations = merge_split_downloader_inbox.drain_for_test_typed::(); + let operations = merge_split_downloader_inbox.drain_for_test_typed::(); assert_eq!(operations.len(), 3); let mut merge_operations = operations .into_iter() - .sorted_by_key(|op| (op.splits[0].partition_id, op.splits[0].doc_mapping_uid)); + .sorted_by_key(|op| { + let op = op.as_operation(); + (op.splits[0].partition_id, op.splits[0].doc_mapping_uid) + }); let first_merge_operation = merge_operations.next().unwrap(); - assert_eq!(first_merge_operation.splits.len(), 4); + assert_eq!(first_merge_operation.as_operation().splits.len(), 4); assert!( first_merge_operation + .as_operation() .splits .iter() .all(|split| split.partition_id == 1 @@ -498,9 +502,10 @@ mod tests { ); let second_merge_operation = merge_operations.next().unwrap(); - assert_eq!(second_merge_operation.splits.len(), 3); + assert_eq!(second_merge_operation.as_operation().splits.len(), 3); assert!( second_merge_operation + .as_operation() .splits .iter() .all(|split| split.partition_id == 1 @@ -508,9 +513,10 @@ mod tests { ); let third_merge_operation = merge_operations.next().unwrap(); - assert_eq!(third_merge_operation.splits.len(), 3); + assert_eq!(third_merge_operation.as_operation().splits.len(), 3); assert!( third_merge_operation + .as_operation() .splits .iter() .all(|split| split.partition_id == 2 @@ -580,7 +586,7 @@ mod tests { // We wait for the first merge ops. If we sent the Quit message right away, it would have // been queue before first `PlanMerge` message. let merge_task_res = merge_split_downloader_inbox - .recv_typed_message::() + .recv_typed_message::() .await; assert!(merge_task_res.is_ok()); @@ -594,7 +600,7 @@ mod tests { let _ = merge_planner_handle.process_pending_and_observe().await; - let merge_ops = merge_split_downloader_inbox.drain_for_test_typed::(); + let merge_ops = merge_split_downloader_inbox.drain_for_test_typed::(); assert!(merge_ops.is_empty()); @@ -602,7 +608,7 @@ mod tests { let (exit_status, _last_state) = merge_planner_handle.join().await; assert!(matches!(exit_status, ActorExitStatus::Quit)); - let merge_ops = merge_split_downloader_inbox.drain_for_test_typed::(); + let merge_ops = merge_split_downloader_inbox.drain_for_test_typed::(); assert!(merge_ops.is_empty()); universe.assert_quit().await; Ok(()) @@ -672,7 +678,7 @@ mod tests { merge_planner_mailbox.send_message(Command::Quit).await?; let (exit_status, _last_state) = merge_planner_handle.join().await; assert!(matches!(exit_status, ActorExitStatus::Quit)); - let merge_tasks = merge_split_downloader_inbox.drain_for_test_typed::(); + let merge_tasks = merge_split_downloader_inbox.drain_for_test_typed::(); assert!(merge_tasks.is_empty()); universe.assert_quit().await; @@ -750,7 +756,7 @@ mod tests { // Instead, we wait for the first merge ops. let merge_task_res = merge_split_downloader_inbox - .recv_typed_message::() + .recv_typed_message::() .await; assert!(merge_task_res.is_ok()); @@ -759,7 +765,7 @@ mod tests { let (exit_status, _last_state) = merge_planner_handle.join().await; assert!(matches!(exit_status, ActorExitStatus::Quit)); - let merge_tasks = merge_split_downloader_inbox.drain_for_test_typed::(); + let merge_tasks = merge_split_downloader_inbox.drain_for_test_typed::(); assert!(merge_tasks.is_empty()); universe.assert_quit().await; From 07783bfd0c4cbd0b829c582a777a396f0f240e1b Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Fri, 22 May 2026 15:30:41 -0400 Subject: [PATCH 4/4] lints --- quickwit/quickwit-indexing/src/actors/merge_planner.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/quickwit/quickwit-indexing/src/actors/merge_planner.rs b/quickwit/quickwit-indexing/src/actors/merge_planner.rs index 0ec578a9ade..0f2a9475b17 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_planner.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_planner.rs @@ -483,12 +483,10 @@ mod tests { merge_planner_handle.process_pending_and_observe().await; let operations = merge_split_downloader_inbox.drain_for_test_typed::(); assert_eq!(operations.len(), 3); - let mut merge_operations = operations - .into_iter() - .sorted_by_key(|op| { - let op = op.as_operation(); - (op.splits[0].partition_id, op.splits[0].doc_mapping_uid) - }); + let mut merge_operations = operations.into_iter().sorted_by_key(|op| { + let op = op.as_operation(); + (op.splits[0].partition_id, op.splits[0].doc_mapping_uid) + }); let first_merge_operation = merge_operations.next().unwrap(); assert_eq!(first_merge_operation.as_operation().splits.len(), 4);