diff --git a/docs/configuration/storage-config.md b/docs/configuration/storage-config.md index 4cd46285394..a96cfa5ad2a 100644 --- a/docs/configuration/storage-config.md +++ b/docs/configuration/storage-config.md @@ -46,17 +46,18 @@ This section contains one configuration subsection per storage provider. If a st ### S3 storage configuration -| Property | Description | Default value | -| --- | --- | --- | -| `flavor` | The optional storage flavor to use. Available flavors are `digital_ocean`, `garage`, `gcs`, and `minio`. | | -| `access_key_id` | The AWS access key ID. | | -| `secret_access_key` | The AWS secret access key. | | -| `region` | The AWS region to send requests to. | `us-east-1` (SDK default) | -| `endpoint` | Custom endpoint for use with S3-compatible providers. | SDK default | -| `force_path_style_access` | Disables [virtual-hosted–style](https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html) requests. Required by some S3-compatible providers (Ceph, MinIO). | `false` | -| `disable_multi_object_delete` | Disables [Multi-Object Delete](https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html) requests. Required by some S3-compatible providers (GCS). | `false` | -| `disable_multipart_upload` | Disables [multipart upload](https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html) of objects. Required by some S3-compatible providers (GCS). | `false` | -| `disable_checksums` | Disables [checksums](https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html) on requests and responses. Required by S3-compatible providers that do not support the additional checksum algorithms enabled by default in recent versions of the AWS SDK (Digital Ocean, Garage, GCS, MinIO). | `false` | +| Property | Description | Default value | +| --- |---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------| +| `flavor` | The optional storage flavor to use. Available flavors are `digital_ocean`, `garage`, `gcs`, and `minio`. | | +| `access_key_id` | The AWS access key ID. | | +| `secret_access_key` | The AWS secret access key. | | +| `region` | The AWS region to send requests to. | `us-east-1` (SDK default) | +| `endpoint` | Custom endpoint for use with S3-compatible providers. | SDK default | +| `force_path_style_access` | Disables [virtual-hosted–style](https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html) requests. Required by some S3-compatible providers (Ceph, MinIO). | `false` | +| `disable_multi_object_delete` | Disables [Multi-Object Delete](https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html) requests. Required by some S3-compatible providers (GCS). | `false` | +| `disable_multipart_upload` | Disables [multipart upload](https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html) of objects. Required by some S3-compatible providers (GCS). | `false` | +| `checksum_algorithm` | Upload [checksum](https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html) algorithm. Allowed values: `crc32c` (computed and validated by the AWS SDK), `md5` (sent client-side via `Content-MD5`; useful for S3-compatible providers that predate `x-amz-checksum-*`), or `disabled`. | `crc32c` | +| `disable_checksums` | **Deprecated.** Previously a boolean that disabled all request/response checksums. Equivalent to setting `checksum_algorithm: disabled`. | `false` | :::warning Hardcoding credentials into configuration files is not secure and strongly discouraged. Prefer the alternative authentication methods that your storage backend may provide. @@ -79,11 +80,11 @@ Storage flavors ensure that Quickwit works correctly with storage providers that *Digital Ocean* -The Digital Ocean flavor (`digital_ocean`) forces path-style access, turns off multi-object delete requests, and disables checksums. +The Digital Ocean flavor (`digital_ocean`) forces path-style access and turns off multi-object delete requests. *Garage flavor* -The Garage flavor (`garage`) overrides the `region` parameter to `garage`, forces path-style access, and disables checksums. +The Garage flavor (`garage`) overrides the `region` parameter to `garage` and forces path-style access. *Google Cloud Storage* @@ -91,7 +92,7 @@ The Google Cloud Storage flavor (`gcs`) turns off multi-object delete requests, *MinIO flavor* -The MinIO flavor (`minio`) overrides the `region` parameter to `minio`, forces path-style access, and disables checksums. +The MinIO flavor (`minio`) overrides the `region` parameter to `minio` and forces path-style access. Example of a storage configuration for Google Cloud Storage in YAML format: diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index b10afbc8b0b..eacaceb1e9a 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -80,8 +80,9 @@ pub use crate::node_config::{ }; use crate::source_config::serialize::{SourceConfigV0_7, SourceConfigV0_8, VersionedSourceConfig}; pub use crate::storage_config::{ - AzureStorageConfig, FileStorageConfig, GoogleCloudStorageConfig, RamStorageConfig, - S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, StorageConfigs, + AzureStorageConfig, ChecksumAlgorithm, FileStorageConfig, GoogleCloudStorageConfig, + RamStorageConfig, S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, + StorageConfigs, }; /// Returns true if the ingest API v2 is enabled. diff --git a/quickwit/quickwit-config/src/storage_config.rs b/quickwit/quickwit-config/src/storage_config.rs index 88795a9228c..e469e1d1a72 100644 --- a/quickwit/quickwit-config/src/storage_config.rs +++ b/quickwit/quickwit-config/src/storage_config.rs @@ -38,6 +38,20 @@ pub enum StorageBackend { S3, } +/// Strategy used to checksum object-storage uploads. +#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum ChecksumAlgorithm { + /// CRC32C, computed and validated by the AWS SDK. Native S3 default. + #[default] + Crc32c, + /// MD5 (Content-MD5 header), computed client-side. Used by S3-compatible + /// implementations that predate the SDK's `x-amz-checksum-*` headers. + Md5, + /// No upload checksum is sent and no response checksum is validated. + Disabled, +} + #[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum StorageBackendFlavor { @@ -330,7 +344,10 @@ pub struct S3StorageConfig { #[serde(default)] pub disable_multipart_upload: bool, #[serde(default)] - pub disable_checksums: bool, + pub checksum_algorithm: ChecksumAlgorithm, + /// Deprecated: applies into `checksum_algorithm: disabled`. + #[serde(default, skip_serializing)] + pub disable_checksums: Option, #[serde(default)] pub disable_stalled_stream_protection_upload: bool, #[serde(default)] @@ -343,25 +360,27 @@ impl S3StorageConfig { Some(StorageBackendFlavor::DigitalOcean) => { self.force_path_style_access = true; self.disable_multi_object_delete = true; - self.disable_checksums = true; } Some(StorageBackendFlavor::Garage) => { self.region = Some("garage".to_string()); self.force_path_style_access = true; - self.disable_checksums = true; } Some(StorageBackendFlavor::Gcs) => { self.disable_multi_object_delete = true; self.disable_multipart_upload = true; - self.disable_checksums = true; + // doesnt support CRC32C via the S3 SDK + self.checksum_algorithm = ChecksumAlgorithm::Disabled; } Some(StorageBackendFlavor::MinIO) => { self.region = Some("minio".to_string()); self.force_path_style_access = true; - self.disable_checksums = true; } _ => {} } + // Legacy: honor `disable_checksums: true` from older configs. + if matches!(self.disable_checksums, Some(true)) { + self.checksum_algorithm = ChecksumAlgorithm::Disabled; + } } pub fn redact(&mut self) { @@ -404,7 +423,7 @@ impl fmt::Debug for S3StorageConfig { &self.disable_multi_object_delete, ) .field("disable_multipart_upload", &self.disable_multipart_upload) - .field("disable_checksums", &self.disable_checksums) + .field("checksum_algorithm", &self.checksum_algorithm) .field( "disable_stalled_stream_protection_upload", &self.disable_stalled_stream_protection_upload, @@ -647,7 +666,7 @@ mod tests { force_path_style_access: true disable_multi_object_delete_requests: true disable_multipart_upload: true - disable_checksums: true + checksum_algorithm: disabled disable_stalled_stream_protection_upload: true disable_stalled_stream_protection_download: true "#; @@ -660,7 +679,7 @@ mod tests { force_path_style_access: true, disable_multi_object_delete: true, disable_multipart_upload: true, - disable_checksums: true, + checksum_algorithm: ChecksumAlgorithm::Disabled, disable_stalled_stream_protection_upload: true, disable_stalled_stream_protection_download: true, ..Default::default() diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index a407ccdaeca..b59177ce442 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -33,7 +33,9 @@ use aws_sdk_s3::operation::delete_objects::DeleteObjectsOutput; use aws_sdk_s3::operation::get_object::{GetObjectError, GetObjectOutput}; use aws_sdk_s3::primitives::{AggregatedBytes, ByteStream}; use aws_sdk_s3::types::builders::ObjectIdentifierBuilder; -use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier}; +use aws_sdk_s3::types::{ + ChecksumAlgorithm, CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier, +}; use base64::prelude::{BASE64_STANDARD, Engine}; use bytes::Bytes; use futures::{StreamExt, stream}; @@ -94,6 +96,7 @@ pub struct S3CompatibleObjectStorage { retry_params: RetryParams, disable_multi_object_delete: bool, disable_multipart_upload: bool, + checksum_strategy: quickwit_config::ChecksumAlgorithm, } impl fmt::Debug for S3CompatibleObjectStorage { @@ -154,7 +157,10 @@ pub async fn create_s3_client(s3_storage_config: &S3StorageConfig) -> S3Client { s3_config.set_stalled_stream_protection(Some(stalled_stream_protection)); s3_config.set_timeout_config(aws_config.timeout_config().cloned()); - if s3_storage_config.disable_checksums { + if matches!( + s3_storage_config.checksum_algorithm, + quickwit_config::ChecksumAlgorithm::Disabled + ) { s3_config.set_request_checksum_calculation(Some(RequestChecksumCalculation::WhenRequired)); s3_config.set_response_checksum_validation(Some(ResponseChecksumValidation::WhenRequired)); } @@ -198,6 +204,7 @@ impl S3CompatibleObjectStorage { retry_params, disable_multi_object_delete, disable_multipart_upload, + checksum_strategy: s3_storage_config.checksum_algorithm, }) } @@ -215,6 +222,7 @@ impl S3CompatibleObjectStorage { retry_params: self.retry_params, disable_multi_object_delete: self.disable_multi_object_delete, disable_multipart_upload: self.disable_multipart_upload, + checksum_strategy: self.checksum_strategy, } } @@ -244,19 +252,17 @@ pub fn parse_s3_uri(uri: &Uri) -> Option<(String, PathBuf)> { Some((bucket, prefix)) } -#[derive(Clone, Debug)] -struct MultipartUploadId(pub String); - -#[derive(Clone, Debug)] -struct Part { - pub part_number: usize, - pub range: Range, - pub md5: md5::Digest, -} - -impl Part { - fn len(&self) -> u64 { - self.range.end - self.range.start +/// Maps a [`ChecksumAlgorithm`] onto the AWS SDK's flexible-checksum algorithm. +/// `Md5` returns `None` because the S3 SDK silently no-ops `ChecksumAlgorithm::Md5`; +/// MD5 is instead sent via the legacy `Content-MD5` header, computed client-side. +fn aws_checksum_algorithm( + strategy: quickwit_config::ChecksumAlgorithm, +) -> Option { + match strategy { + quickwit_config::ChecksumAlgorithm::Crc32c => Some(ChecksumAlgorithm::Crc32C), + quickwit_config::ChecksumAlgorithm::Md5 | quickwit_config::ChecksumAlgorithm::Disabled => { + None + } } } @@ -274,6 +280,24 @@ async fn compute_md5(mut read: T) -> io::Resu } } +#[derive(Clone, Debug)] +struct MultipartUploadId(pub String); + +#[derive(Clone, Debug)] +struct Part { + pub part_number: usize, + pub range: Range, + /// Pre-computed MD5 of the part bytes; only populated when + /// [`ChecksumAlgorithm::Md5`] is in use. + pub md5: Option, +} + +impl Part { + fn len(&self) -> u64 { + self.range.end - self.range.start + } +} + impl S3CompatibleObjectStorage { fn key(&self, relative_path: &Path) -> String { // FIXME: This may not work on Windows. @@ -310,6 +334,7 @@ impl S3CompatibleObjectStorage { .key(key) .body(body) .content_length(len as i64) + .set_checksum_algorithm(aws_checksum_algorithm(self.checksum_strategy)) .send() .await .map_err(|sdk_error| { @@ -322,6 +347,7 @@ impl S3CompatibleObjectStorage { Ok(()) } + #[tracing::instrument(skip_all)] async fn put_single_part<'a>( &'a self, key: &'a str, @@ -343,6 +369,7 @@ impl S3CompatibleObjectStorage { self.s3_client .create_multipart_upload() .bucket(self.bucket.clone()) + .set_checksum_algorithm(aws_checksum_algorithm(self.checksum_strategy)) .key(key) .send() .await @@ -356,32 +383,42 @@ impl S3CompatibleObjectStorage { Ok(MultipartUploadId(upload_id)) } + /// Returns the MD5 of the byte range when the configured strategy is + /// [`ChecksumAlgorithm::Md5`], otherwise `None` (no I/O performed). + async fn maybe_compute_part_md5( + &self, + payload: &dyn crate::PutPayload, + range: Range, + ) -> io::Result> { + if !matches!( + self.checksum_strategy, + quickwit_config::ChecksumAlgorithm::Md5 + ) { + return Ok(None); + } + let read = payload.range_byte_stream(range).await?.into_async_read(); + Ok(Some(compute_md5(read).await?)) + } + async fn create_multipart_requests( &self, - payload: Box, + payload: &dyn crate::PutPayload, len: u64, part_len: u64, ) -> io::Result> { assert!(len > 0); - let multipart_ranges = chunk_range(0..len as usize, part_len as usize) + let multipart_ranges: Vec> = chunk_range(0..len as usize, part_len as usize) .map(into_u64_range) - .collect::>(); - + .collect(); let mut parts = Vec::with_capacity(multipart_ranges.len()); - for (multipart_id, multipart_range) in multipart_ranges.into_iter().enumerate() { - let read = payload - .range_byte_stream(multipart_range.clone()) - .await? - .into_async_read(); - let md5 = compute_md5(read).await?; - - let part = Part { + parts.push(Part { part_number: multipart_id + 1, // parts are 1-indexed + md5: self + .maybe_compute_part_md5(payload, multipart_range.clone()) + .await?, range: multipart_range, - md5, - }; - parts.push(part); + }); } Ok(parts) } @@ -420,6 +457,7 @@ impl S3CompatibleObjectStorage { Ok(delete_requests) } + #[tracing::instrument(level = "debug", skip_all, fields(part_number = part.part_number, num_bytes=part.len()))] async fn upload_part<'a>( &'a self, upload_id: MultipartUploadId, @@ -432,33 +470,39 @@ impl S3CompatibleObjectStorage { .await .map_err(StorageError::from) .map_err(Retry::Permanent)?; - let md5 = BASE64_STANDARD.encode(part.md5.0); crate::metrics::OBJECT_STORAGE_PUT_PARTS.inc(); crate::metrics::OBJECT_STORAGE_UPLOAD_NUM_BYTES.inc_by(part.len()); + let content_md5 = part.md5.map(|digest| BASE64_STANDARD.encode(digest.0)); let upload_part_output = self .s3_client .upload_part() .bucket(self.bucket.clone()) + .set_checksum_algorithm(aws_checksum_algorithm(self.checksum_strategy)) + // None if checksum isnt md5. + .set_content_md5(content_md5) .key(key) .body(byte_stream) .content_length(part.len() as i64) - .content_md5(md5) .part_number(part.part_number as i32) .upload_id(upload_id.0) .send() .await - .map_err(|sdk_error| { - if sdk_error.is_retryable() { - Retry::Transient(StorageError::from(sdk_error)) + .map_err(|s3_err| { + if s3_err.is_retryable() { + Retry::Transient(StorageError::from(s3_err)) } else { - Retry::Permanent(StorageError::from(sdk_error)) + Retry::Permanent(StorageError::from(s3_err)) } })?; + // Only one checksum field is populated by the SDK, matching the algorithm we + // advertised on the upload; the rest stay `None`. let completed_part = CompletedPart::builder() .set_e_tag(upload_part_output.e_tag) + .set_checksum_crc32_c(upload_part_output.checksum_crc32_c) + .set_checksum_md5(upload_part_output.checksum_md5) .part_number(part.part_number as i32) .build(); Ok(completed_part) @@ -473,7 +517,7 @@ impl S3CompatibleObjectStorage { ) -> StorageResult<()> { let upload_id = self.create_multipart_upload(key).await?; let parts = self - .create_multipart_requests(payload.clone(), total_len, part_len) + .create_multipart_requests(payload.as_ref(), total_len, part_len) .await?; let max_concurrent_upload = self.multipart_policy.max_concurrent_uploads(); let completed_parts_res: StorageResult> = @@ -488,7 +532,7 @@ impl S3CompatibleObjectStorage { .collect::>() .await .into_iter() - .map(|res| res.map_err(|error| error.into_inner())) + .map(|res| res.map_err(|e| e.into_inner())) .collect(); match completed_parts_res { Ok(completed_parts) => { @@ -928,7 +972,6 @@ mod tests { let data = (0..1_500_000).map(|el| el as u8).collect::>(); let md5 = compute_md5(data.as_slice()).await?; assert_eq!(md5, md5::compute(data)); - Ok(()) } @@ -994,6 +1037,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + checksum_strategy: quickwit_config::ChecksumAlgorithm::Crc32c, }; assert_eq!( s3_storage.relative_path("indexes/foo"), @@ -1041,6 +1085,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: true, disable_multipart_upload: false, + checksum_strategy: quickwit_config::ChecksumAlgorithm::Crc32c, }; let _ = s3_storage .bulk_delete(&[Path::new("foo"), Path::new("bar")]) @@ -1078,6 +1123,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + checksum_strategy: quickwit_config::ChecksumAlgorithm::Crc32c, }; let _ = s3_storage .bulk_delete(&[Path::new("foo"), Path::new("bar")]) @@ -1160,6 +1206,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + checksum_strategy: quickwit_config::ChecksumAlgorithm::Crc32c, }; let bulk_delete_error = s3_storage .bulk_delete(&[ @@ -1251,6 +1298,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + checksum_strategy: quickwit_config::ChecksumAlgorithm::Crc32c, }; s3_storage .put(Path::new("my-path"), Box::new(vec![1, 2, 3]))