Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions quickwit/quickwit-compaction/src/compaction_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use quickwit_doc_mapper::DocMapper;
use quickwit_indexing::actors::{
MergeExecutor, MergeSplitDownloader, Packager, Publisher, Uploader, UploaderType,
};
use quickwit_indexing::merge_policy::MergeOperation;
use quickwit_indexing::merge_policy::{MergeOperation, MergeSource};
use quickwit_indexing::{IndexingSplitStore, SplitsUpdateMailbox};
use quickwit_metrics::{counter, gauge, histogram, label_values};
use quickwit_proto::indexing::MergePipelineId;
Expand Down Expand Up @@ -351,7 +351,7 @@ impl CompactionPipeline {
self.pipeline_start = Some(now);
// Kick off the pipeline.
merge_split_downloader_mailbox
.try_send_message(self.merge_operation.clone())
.try_send_message(MergeSource::Operation(self.merge_operation.clone()))
.map_err(|err| {
anyhow::anyhow!("failed to send merge operation to downloader: {err:?}")
})?;
Expand Down
7 changes: 3 additions & 4 deletions quickwit/quickwit-indexing/failpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use quickwit_common::rand::append_random_suffix;
use quickwit_common::split_file;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_indexing::actors::MergeExecutor;
use quickwit_indexing::merge_policy::{MergeOperation, MergeTask};
use quickwit_indexing::merge_policy::{MergeOperation, MergeSource, MergeTask};
use quickwit_indexing::models::MergeScratch;
use quickwit_indexing::{TestSandbox, get_tantivy_directory_from_split_bundle};
use quickwit_metastore::{
Expand Down Expand Up @@ -285,10 +285,9 @@ async fn test_merge_executor_controlled_directory_kill_switch() -> anyhow::Resul
tantivy_dirs.push(get_tantivy_directory_from_split_bundle(&dest_filepath).unwrap());
}
let merge_operation = MergeOperation::new_merge_operation(split_metadatas);
let merge_task = MergeTask::from_merge_operation_for_test(merge_operation.clone());
let merge_task = MergeTask::from_merge_operation_for_test(merge_operation);
let merge_scratch = MergeScratch {
merge_operation,
merge_task: Some(merge_task),
merge_source: MergeSource::Task(merge_task),
merge_scratch_directory,
downloaded_splits_directory,
tantivy_dirs,
Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1398,9 +1398,9 @@ mod tests {
// change whenever IndexingSettings fields are added/removed. Recompute
// by temporarily adding a test that prints
// `indexing_pipeline_params_fingerprint(&index_config, &source_config)`.
const PARAMS_FINGERPRINT_INGEST_API: u64 = 1637744865450232394;
const PARAMS_FINGERPRINT_SOURCE_1: u64 = 1705211905504908791;
const PARAMS_FINGERPRINT_SOURCE_2: u64 = 8706667372658059428;
const PARAMS_FINGERPRINT_INGEST_API: u64 = 7973087274884969148;
const PARAMS_FINGERPRINT_SOURCE_1: u64 = 9420938500552890840;
const PARAMS_FINGERPRINT_SOURCE_2: u64 = 16199199787360162635;

quickwit_common::setup_logging_for_tests();
let transport = ChannelTransport::default();
Expand Down
29 changes: 16 additions & 13 deletions quickwit/quickwit-indexing/src/actors/merge_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use tracing::{debug, error, info, instrument, warn};

use crate::actors::Packager;
use crate::controlled_directory::ControlledDirectory;
use crate::merge_policy::MergeOperationType;
use crate::merge_policy::{MergeOperationType, MergeSource};
use crate::models::{IndexedSplit, IndexedSplitBatch, MergeScratch, PublishLock, SplitAttrs};

#[derive(Clone)]
Expand Down Expand Up @@ -85,20 +85,20 @@ impl Actor for MergeExecutor {
impl Handler<MergeScratch> for MergeExecutor {
type Reply = ();

#[instrument(level = "info", name = "merge_executor", parent = merge_scratch.merge_operation.merge_parent_span.id(), skip_all)]
#[instrument(level = "info", name = "merge_executor", parent = merge_scratch.merge_source.as_operation().merge_parent_span.id(), skip_all)]
async fn handle(
&mut self,
merge_scratch: MergeScratch,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
let start = Instant::now();
let MergeScratch {
merge_operation,
merge_task,
merge_source,
tantivy_dirs,
merge_scratch_directory,
..
} = merge_scratch;
let merge_operation = merge_source.as_operation();
// On nodes running the split compaction architecture, merge pipelines are ephemeral, and we
// need to make sure there aren't too many CPU-bound operations occurring concurrently.
let _cpu_permit = match &self.merge_execution_semaphore {
Expand Down Expand Up @@ -164,15 +164,20 @@ impl Handler<MergeScratch> for MergeExecutor {
operation_type = %merge_operation.operation_type,
"merge-operation-success"
);
let batch_parent_span = merge_operation.merge_parent_span.clone();
let merge_task_opt = match merge_source {
MergeSource::Task(task) => Some(task),
MergeSource::Operation(_) => None,
};
ctx.send_message(
&self.merge_packager_mailbox,
IndexedSplitBatch {
splits: vec![indexed_split],
checkpoint_delta_opt: Default::default(),
publish_lock: PublishLock::default(),
publish_token_opt: None,
batch_parent_span: merge_operation.merge_parent_span.clone(),
merge_task_opt: merge_task,
batch_parent_span,
merge_task_opt,
},
)
.await?;
Expand Down Expand Up @@ -615,7 +620,7 @@ mod tests {
use tantivy::{Document, ReloadPolicy, TantivyDocument};

use super::*;
use crate::merge_policy::{MergeOperation, MergeTask};
use crate::merge_policy::{MergeOperation, MergeSource, MergeTask};
use crate::{TestSandbox, get_tantivy_directory_from_split_bundle, new_split_id};

#[tokio::test]
Expand Down Expand Up @@ -664,10 +669,9 @@ mod tests {
tantivy_dirs.push(get_tantivy_directory_from_split_bundle(&dest_filepath).unwrap())
}
let merge_operation = MergeOperation::new_merge_operation(split_metas);
let merge_task = MergeTask::from_merge_operation_for_test(merge_operation.clone());
let merge_task = MergeTask::from_merge_operation_for_test(merge_operation);
let merge_scratch = MergeScratch {
merge_operation,
merge_task: Some(merge_task),
merge_source: MergeSource::Task(merge_task),
tantivy_dirs,
merge_scratch_directory,
downloaded_splits_directory,
Expand Down Expand Up @@ -810,10 +814,9 @@ mod tests {
.await?;
let tantivy_dir = get_tantivy_directory_from_split_bundle(&dest_filepath).unwrap();
let merge_operation = MergeOperation::new_delete_and_merge_operation(new_split_metadata);
let merge_task = MergeTask::from_merge_operation_for_test(merge_operation.clone());
let merge_task = MergeTask::from_merge_operation_for_test(merge_operation);
let merge_scratch = MergeScratch {
merge_operation,
merge_task: Some(merge_task),
merge_source: MergeSource::Task(merge_task),
tantivy_dirs: vec![tantivy_dir],
merge_scratch_directory,
downloaded_splits_directory,
Expand Down
32 changes: 18 additions & 14 deletions quickwit/quickwit-indexing/src/actors/merge_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ mod tests {

use crate::actors::MergePlanner;
use crate::merge_policy::{
MergePolicy, MergeTask, StableLogMergePolicy, merge_policy_from_settings,
MergePolicy, MergeSource, StableLogMergePolicy, merge_policy_from_settings,
};
use crate::models::NewSplits;

Expand Down Expand Up @@ -481,36 +481,40 @@ mod tests {
};
merge_planner_mailbox.send_message(message).await?;
merge_planner_handle.process_pending_and_observe().await;
let operations = merge_split_downloader_inbox.drain_for_test_typed::<MergeTask>();
let operations = merge_split_downloader_inbox.drain_for_test_typed::<MergeSource>();
assert_eq!(operations.len(), 3);
let mut merge_operations = operations
.into_iter()
.sorted_by_key(|op| (op.splits[0].partition_id, op.splits[0].doc_mapping_uid));
let mut merge_operations = operations.into_iter().sorted_by_key(|op| {
let op = op.as_operation();
(op.splits[0].partition_id, op.splits[0].doc_mapping_uid)
});

let first_merge_operation = merge_operations.next().unwrap();
assert_eq!(first_merge_operation.splits.len(), 4);
assert_eq!(first_merge_operation.as_operation().splits.len(), 4);
assert!(
first_merge_operation
.as_operation()
.splits
.iter()
.all(|split| split.partition_id == 1
&& split.doc_mapping_uid == doc_mapping_uid1)
);

let second_merge_operation = merge_operations.next().unwrap();
assert_eq!(second_merge_operation.splits.len(), 3);
assert_eq!(second_merge_operation.as_operation().splits.len(), 3);
assert!(
second_merge_operation
.as_operation()
.splits
.iter()
.all(|split| split.partition_id == 1
&& split.doc_mapping_uid == doc_mapping_uid2)
);

let third_merge_operation = merge_operations.next().unwrap();
assert_eq!(third_merge_operation.splits.len(), 3);
assert_eq!(third_merge_operation.as_operation().splits.len(), 3);
assert!(
third_merge_operation
.as_operation()
.splits
.iter()
.all(|split| split.partition_id == 2
Expand Down Expand Up @@ -580,7 +584,7 @@ mod tests {
// We wait for the first merge ops. If we sent the Quit message right away, it would have
// been queued before the first `PlanMerge` message.
let merge_task_res = merge_split_downloader_inbox
.recv_typed_message::<MergeTask>()
.recv_typed_message::<MergeSource>()
.await;
assert!(merge_task_res.is_ok());

Expand All @@ -594,15 +598,15 @@ mod tests {

let _ = merge_planner_handle.process_pending_and_observe().await;

let merge_ops = merge_split_downloader_inbox.drain_for_test_typed::<MergeTask>();
let merge_ops = merge_split_downloader_inbox.drain_for_test_typed::<MergeSource>();

assert!(merge_ops.is_empty());

merge_planner_mailbox.send_message(Command::Quit).await?;

let (exit_status, _last_state) = merge_planner_handle.join().await;
assert!(matches!(exit_status, ActorExitStatus::Quit));
let merge_ops = merge_split_downloader_inbox.drain_for_test_typed::<MergeTask>();
let merge_ops = merge_split_downloader_inbox.drain_for_test_typed::<MergeSource>();
assert!(merge_ops.is_empty());
universe.assert_quit().await;
Ok(())
Expand Down Expand Up @@ -672,7 +676,7 @@ mod tests {
merge_planner_mailbox.send_message(Command::Quit).await?;
let (exit_status, _last_state) = merge_planner_handle.join().await;
assert!(matches!(exit_status, ActorExitStatus::Quit));
let merge_tasks = merge_split_downloader_inbox.drain_for_test_typed::<MergeTask>();
let merge_tasks = merge_split_downloader_inbox.drain_for_test_typed::<MergeSource>();

assert!(merge_tasks.is_empty());
universe.assert_quit().await;
Expand Down Expand Up @@ -750,7 +754,7 @@ mod tests {

// Instead, we wait for the first merge ops.
let merge_task_res = merge_split_downloader_inbox
.recv_typed_message::<MergeTask>()
.recv_typed_message::<MergeSource>()
.await;
assert!(merge_task_res.is_ok());

Expand All @@ -759,7 +763,7 @@ mod tests {
let (exit_status, _last_state) = merge_planner_handle.join().await;

assert!(matches!(exit_status, ActorExitStatus::Quit));
let merge_tasks = merge_split_downloader_inbox.drain_for_test_typed::<MergeTask>();
let merge_tasks = merge_split_downloader_inbox.drain_for_test_typed::<MergeSource>();
assert!(merge_tasks.is_empty());

universe.assert_quit().await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tracing::error;
use super::MergeSplitDownloader;
#[cfg(feature = "metrics")]
use super::parquet_pipeline::{ParquetMergeSplitDownloader, ParquetMergeTask};
use crate::merge_policy::{MergeOperation, MergeTask, compute_merge_score};
use crate::merge_policy::{MergeOperation, MergeSource, MergeTask, compute_merge_score};
use crate::metrics::{ONGOING_MERGE_OPERATIONS, PENDING_MERGE_BYTES, PENDING_MERGE_OPERATIONS};

pub struct MergePermit {
Expand Down Expand Up @@ -229,7 +229,7 @@ impl MergeSchedulerService {
self.pending_merge_bytes -= merge_task.merge_operation.total_num_bytes();
PENDING_MERGE_OPERATIONS.set(self.pending_merge_queue.len() as f64);
PENDING_MERGE_BYTES.set(self.pending_merge_bytes as f64);
match split_downloader_mailbox.try_send_message(merge_task) {
match split_downloader_mailbox.try_send_message(MergeSource::Task(merge_task)) {
Ok(_) => {}
Err(quickwit_actors::TrySendError::Full(_)) => {
// The split downloader mailbox has an unbounded queue capacity,
Expand Down
57 changes: 17 additions & 40 deletions quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use tantivy::Directory;
use tracing::{debug, info, instrument};

use super::MergeExecutor;
use crate::merge_policy::{MergeOperation, MergeTask};
use crate::merge_policy::MergeSource;
use crate::models::MergeScratch;
use crate::split_store::IndexingSplitStore;

Expand All @@ -49,48 +49,19 @@ impl Actor for MergeSplitDownloader {
}

#[async_trait]
impl Handler<MergeTask> for MergeSplitDownloader {
impl Handler<MergeSource> for MergeSplitDownloader {
type Reply = ();

#[instrument(
name = "merge_split_downloader",
parent = merge_task.merge_parent_span.id(),
parent = merge_source.as_operation().merge_parent_span.id(),
skip_all,
)]
async fn handle(
&mut self,
merge_task: MergeTask,
merge_source: MergeSource,
ctx: &ActorContext<Self>,
) -> Result<(), quickwit_actors::ActorExitStatus> {
let merge_operation = merge_task.merge_operation.as_ref().clone();
self.download_and_send(merge_operation, ctx).await
}
}

#[async_trait]
impl Handler<MergeOperation> for MergeSplitDownloader {
type Reply = ();

#[instrument(
name = "merge_split_downloader",
parent = merge_operation.merge_parent_span.id(),
skip_all,
)]
async fn handle(
&mut self,
merge_operation: MergeOperation,
ctx: &ActorContext<Self>,
) -> Result<(), quickwit_actors::ActorExitStatus> {
self.download_and_send(merge_operation, ctx).await
}
}

impl MergeSplitDownloader {
async fn download_and_send(
&mut self,
merge_operation: MergeOperation,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
let merge_scratch_directory = temp_dir::Builder::default()
.join("merge")
.tempdir_in(self.scratch_directory.path())
Expand All @@ -102,14 +73,13 @@ impl MergeSplitDownloader {
.map_err(|error| anyhow::anyhow!(error))?;
let tantivy_dirs = self
.download_splits(
merge_operation.splits_as_slice(),
merge_source.as_operation().splits_as_slice(),
downloaded_splits_directory.path(),
ctx,
)
.await?;
let msg = MergeScratch {
merge_operation,
merge_task: None,
merge_source,
merge_scratch_directory,
downloaded_splits_directory,
tantivy_dirs,
Expand Down Expand Up @@ -164,7 +134,7 @@ mod tests {
use quickwit_storage::{PutPayload, RamStorageBuilder, SplitPayloadBuilder};

use super::*;
use crate::merge_policy::MergeOperation;
use crate::merge_policy::{MergeOperation, MergeTask};
use crate::new_split_id;

#[tokio::test]
Expand Down Expand Up @@ -205,7 +175,7 @@ mod tests {
let merge_operation: MergeOperation = MergeOperation::new_merge_operation(splits_to_merge);
let merge_task = MergeTask::from_merge_operation_for_test(merge_operation);
merge_split_downloader_mailbox
.send_message(merge_task)
.send_message(MergeSource::Task(merge_task))
.await?;
merge_split_downloader_handler
.process_pending_and_observe()
Expand All @@ -218,8 +188,15 @@ mod tests {
.unwrap()
.downcast::<MergeScratch>()
.unwrap();
assert_eq!(merge_scratch.merge_operation.splits_as_slice().len(), 10);
for split in merge_scratch.merge_operation.splits_as_slice() {
assert_eq!(
merge_scratch
.merge_source
.as_operation()
.splits_as_slice()
.len(),
10
);
for split in merge_scratch.merge_source.as_operation().splits_as_slice() {
let split_filename = split_file(split.split_id());
let split_filepath = merge_scratch
.downloaded_splits_directory
Expand Down
Loading
Loading