diff --git a/core/core/src/layers/correctness_check.rs b/core/core/src/layers/correctness_check.rs index 5db06b7d81b3..31c7f5f76b1d 100644 --- a/core/core/src/layers/correctness_check.rs +++ b/core/core/src/layers/correctness_check.rs @@ -240,6 +240,38 @@ impl LayeredAccess for CorrectnessAccessor { "if_match", )); } + if args.source_if_match().is_some() && !capability.copy_with_source_if_match { + return Err(new_unsupported_error( + &self.info, + Operation::Copy, + "source_if_match", + )); + } + if args.source_if_none_match().is_some() && !capability.copy_with_source_if_none_match { + return Err(new_unsupported_error( + &self.info, + Operation::Copy, + "source_if_none_match", + )); + } + if args.source_if_modified_since().is_some() + && !capability.copy_with_source_if_modified_since + { + return Err(new_unsupported_error( + &self.info, + Operation::Copy, + "source_if_modified_since", + )); + } + if args.source_if_unmodified_since().is_some() + && !capability.copy_with_source_if_unmodified_since + { + return Err(new_unsupported_error( + &self.info, + Operation::Copy, + "source_if_unmodified_since", + )); + } if args.source_version().is_some() && !capability.copy_with_source_version { return Err(new_unsupported_error( &self.info, diff --git a/core/core/src/raw/ops.rs b/core/core/src/raw/ops.rs index aa5abc9468c8..7e6c2264e895 100644 --- a/core/core/src/raw/ops.rs +++ b/core/core/src/raw/ops.rs @@ -877,6 +877,10 @@ impl From for (OpWrite, OpWriter) { pub struct OpCopy { if_not_exists: bool, if_match: Option, + source_if_match: Option, + source_if_none_match: Option, + source_if_modified_since: Option, + source_if_unmodified_since: Option, source_version: Option, } @@ -914,6 +918,62 @@ impl OpCopy { self.if_match.as_deref() } + /// Set the source_if_match condition for the operation. + /// + /// When set, the copy operation will only proceed if the source object's + /// ETag matches the given value. + pub fn with_source_if_match(mut self, source_if_match: impl Into) -> Self { + self.source_if_match = Some(source_if_match.into()); + self + } + + /// Get source_if_match condition. + pub fn source_if_match(&self) -> Option<&str> { + self.source_if_match.as_deref() + } + + /// Set the source_if_none_match condition for the operation. + /// + /// When set, the copy operation will only proceed if the source object's + /// ETag does not match the given value. + pub fn with_source_if_none_match(mut self, source_if_none_match: impl Into) -> Self { + self.source_if_none_match = Some(source_if_none_match.into()); + self + } + + /// Get source_if_none_match condition. + pub fn source_if_none_match(&self) -> Option<&str> { + self.source_if_none_match.as_deref() + } + + /// Set the source_if_modified_since condition for the operation. + /// + /// When set, the copy operation will only proceed if the source object has + /// been modified after the given timestamp. + pub fn with_source_if_modified_since(mut self, v: Timestamp) -> Self { + self.source_if_modified_since = Some(v); + self + } + + /// Get source_if_modified_since condition. + pub fn source_if_modified_since(&self) -> Option { + self.source_if_modified_since + } + + /// Set the source_if_unmodified_since condition for the operation. + /// + /// When set, the copy operation will only proceed if the source object has + /// not been modified after the given timestamp. + pub fn with_source_if_unmodified_since(mut self, v: Timestamp) -> Self { + self.source_if_unmodified_since = Some(v); + self + } + + /// Get source_if_unmodified_since condition. + pub fn source_if_unmodified_since(&self) -> Option { + self.source_if_unmodified_since + } + /// Set source version for the operation. /// /// When set, the copy operation will copy from the specified source version. @@ -982,6 +1042,10 @@ impl From for (OpCopy, OpCopier) { OpCopy { if_not_exists: value.if_not_exists, if_match: value.if_match, + source_if_match: value.source_if_match, + source_if_none_match: value.source_if_none_match, + source_if_modified_since: value.source_if_modified_since, + source_if_unmodified_since: value.source_if_unmodified_since, source_version: value.source_version, }, OpCopier { diff --git a/core/core/src/types/capability.rs b/core/core/src/types/capability.rs index ac5148452f2c..3b548a6c6e13 100644 --- a/core/core/src/types/capability.rs +++ b/core/core/src/types/capability.rs @@ -158,6 +158,14 @@ pub struct Capability { pub copy_with_if_not_exists: bool, /// Indicates if conditional copy operations with if-match are supported. pub copy_with_if_match: bool, + /// Indicates if conditional copy operations with source-side if-match are supported. + pub copy_with_source_if_match: bool, + /// Indicates if conditional copy operations with source-side if-none-match are supported. + pub copy_with_source_if_none_match: bool, + /// Indicates if conditional copy operations with source-side if-modified-since are supported. + pub copy_with_source_if_modified_since: bool, + /// Indicates if conditional copy operations with source-side if-unmodified-since are supported. + pub copy_with_source_if_unmodified_since: bool, /// Indicates if copy operations from a specific source version are supported. pub copy_with_source_version: bool, /// Indicates if copy operations can be split into multiple server-side tasks. diff --git a/core/core/src/types/operator/operator_futures.rs b/core/core/src/types/operator/operator_futures.rs index 6392366a45d9..953ea67bbfd8 100644 --- a/core/core/src/types/operator/operator_futures.rs +++ b/core/core/src/types/operator/operator_futures.rs @@ -1446,6 +1446,42 @@ impl>> FutureCopy { self } + /// Sets the condition that copy operation will succeed only if the source + /// object currently has the given ETag. + /// + /// Refer to [`options::CopyOptions::source_if_match`] for more details. + pub fn source_if_match(mut self, etag: &str) -> Self { + self.args.0.source_if_match = Some(etag.to_string()); + self + } + + /// Sets the condition that copy operation will succeed only if the source + /// object's ETag does not match the given value. + /// + /// Refer to [`options::CopyOptions::source_if_none_match`] for more details. + pub fn source_if_none_match(mut self, etag: &str) -> Self { + self.args.0.source_if_none_match = Some(etag.to_string()); + self + } + + /// Sets the condition that copy operation will succeed only if the source + /// object has been modified after the given timestamp. + /// + /// Refer to [`options::CopyOptions::source_if_modified_since`] for more details. + pub fn source_if_modified_since(mut self, v: Timestamp) -> Self { + self.args.0.source_if_modified_since = Some(v); + self + } + + /// Sets the condition that copy operation will succeed only if the source + /// object has not been modified after the given timestamp. + /// + /// Refer to [`options::CopyOptions::source_if_unmodified_since`] for more details. + pub fn source_if_unmodified_since(mut self, v: Timestamp) -> Self { + self.args.0.source_if_unmodified_since = Some(v); + self + } + /// Sets source version for this copy operation. /// /// Refer to [`options::CopyOptions::source_version`] for more details. diff --git a/core/core/src/types/options.rs b/core/core/src/types/options.rs index 1435c37f0b08..b8a2edac9cfc 100644 --- a/core/core/src/types/options.rs +++ b/core/core/src/types/options.rs @@ -560,6 +560,61 @@ pub struct CopyOptions { /// destination object's ETag matches the given value. pub if_match: Option, + /// Sets the condition that copy operation will succeed only if the source + /// object currently has the given ETag. + /// + /// ### Capability + /// + /// Check [`Capability::copy_with_source_if_match`] before using this feature. + /// + /// ### Behavior + /// + /// - If supported, the copy operation will only succeed when the source + /// object's ETag matches the given value. + pub source_if_match: Option, + + /// Sets the condition that copy operation will succeed only if the source + /// object's ETag does not match the given value. + /// + /// ### Capability + /// + /// Check [`Capability::copy_with_source_if_none_match`] before using this + /// feature. + /// + /// ### Behavior + /// + /// - If supported, the copy operation will only succeed when the source + /// object's ETag does not match the given value. + pub source_if_none_match: Option, + + /// Sets the condition that copy operation will succeed only if the source + /// object has been modified after the given timestamp. + /// + /// ### Capability + /// + /// Check [`Capability::copy_with_source_if_modified_since`] before using + /// this feature. + /// + /// ### Behavior + /// + /// - If supported, the copy operation will only succeed when the source + /// object has been modified after the given timestamp. + pub source_if_modified_since: Option, + + /// Sets the condition that copy operation will succeed only if the source + /// object has not been modified after the given timestamp. + /// + /// ### Capability + /// + /// Check [`Capability::copy_with_source_if_unmodified_since`] before using + /// this feature. + /// + /// ### Behavior + /// + /// - If supported, the copy operation will only succeed when the source + /// object has not been modified after the given timestamp. + pub source_if_unmodified_since: Option, + /// Copy from a specific source object version. /// /// ### Capability diff --git a/core/layers/capability-check/src/lib.rs b/core/layers/capability-check/src/lib.rs index 088390d93997..5237bd6aeac7 100644 --- a/core/layers/capability-check/src/lib.rs +++ b/core/layers/capability-check/src/lib.rs @@ -169,6 +169,38 @@ impl LayeredAccess for CapabilityAccessor { "if_match", )); } + if args.source_if_match().is_some() && !capability.copy_with_source_if_match { + return Err(new_unsupported_error( + self.info.as_ref(), + Operation::Copy, + "source_if_match", + )); + } + if args.source_if_none_match().is_some() && !capability.copy_with_source_if_none_match { + return Err(new_unsupported_error( + self.info.as_ref(), + Operation::Copy, + "source_if_none_match", + )); + } + if args.source_if_modified_since().is_some() + && !capability.copy_with_source_if_modified_since + { + return Err(new_unsupported_error( + self.info.as_ref(), + Operation::Copy, + "source_if_modified_since", + )); + } + if args.source_if_unmodified_since().is_some() + && !capability.copy_with_source_if_unmodified_since + { + return Err(new_unsupported_error( + self.info.as_ref(), + Operation::Copy, + "source_if_unmodified_since", + )); + } if args.source_version().is_some() && !capability.copy_with_source_version { return Err(new_unsupported_error( self.info.as_ref(), diff --git a/core/services/s3/src/backend.rs b/core/services/s3/src/backend.rs index dc29f6284fcf..d63ff109f53f 100644 --- a/core/services/s3/src/backend.rs +++ b/core/services/s3/src/backend.rs @@ -983,6 +983,10 @@ impl Builder for S3Builder { copy_can_multi: true, copy_with_if_not_exists: true, copy_with_if_match: true, + copy_with_source_if_match: true, + copy_with_source_if_none_match: true, + copy_with_source_if_modified_since: true, + copy_with_source_if_unmodified_since: true, copy_with_source_version: true, // The min multipart size of S3 is 5 MiB. // diff --git a/core/services/s3/src/core.rs b/core/services/s3/src/core.rs index c08df46b6df6..f078aba69f4c 100644 --- a/core/services/s3/src/core.rs +++ b/core/services/s3/src/core.rs @@ -51,6 +51,10 @@ use opendal_core::*; pub mod constants { pub const X_AMZ_COPY_SOURCE: &str = "x-amz-copy-source"; pub const X_AMZ_COPY_SOURCE_RANGE: &str = "x-amz-copy-source-range"; + pub const X_AMZ_COPY_SOURCE_IF_MATCH: &str = "x-amz-copy-source-if-match"; + pub const X_AMZ_COPY_SOURCE_IF_NONE_MATCH: &str = "x-amz-copy-source-if-none-match"; + pub const X_AMZ_COPY_SOURCE_IF_MODIFIED_SINCE: &str = "x-amz-copy-source-if-modified-since"; + pub const X_AMZ_COPY_SOURCE_IF_UNMODIFIED_SINCE: &str = "x-amz-copy-source-if-unmodified-since"; pub const X_AMZ_SERVER_SIDE_ENCRYPTION: &str = "x-amz-server-side-encryption"; pub const X_AMZ_SERVER_REQUEST_PAYER: (&str, &str) = ("x-amz-request-payer", "requester"); @@ -685,6 +689,27 @@ impl S3Core { if let Some(if_match) = args.if_match() { req = req.header(IF_MATCH, if_match); } + if let Some(source_if_match) = args.source_if_match() { + req = req.header(constants::X_AMZ_COPY_SOURCE_IF_MATCH, source_if_match); + } + if let Some(source_if_none_match) = args.source_if_none_match() { + req = req.header( + constants::X_AMZ_COPY_SOURCE_IF_NONE_MATCH, + source_if_none_match, + ); + } + if let Some(v) = args.source_if_modified_since() { + req = req.header( + constants::X_AMZ_COPY_SOURCE_IF_MODIFIED_SINCE, + v.format_http_date(), + ); + } + if let Some(v) = args.source_if_unmodified_since() { + req = req.header( + constants::X_AMZ_COPY_SOURCE_IF_UNMODIFIED_SINCE, + v.format_http_date(), + ); + } // Set SSE headers. req = self.insert_sse_headers(req, true); @@ -1145,6 +1170,27 @@ impl S3Core { if let Some(if_match) = args.if_match() { req = req.header(IF_MATCH, if_match); } + if let Some(source_if_match) = args.source_if_match() { + req = req.header(constants::X_AMZ_COPY_SOURCE_IF_MATCH, source_if_match); + } + if let Some(source_if_none_match) = args.source_if_none_match() { + req = req.header( + constants::X_AMZ_COPY_SOURCE_IF_NONE_MATCH, + source_if_none_match, + ); + } + if let Some(v) = args.source_if_modified_since() { + req = req.header( + constants::X_AMZ_COPY_SOURCE_IF_MODIFIED_SINCE, + v.format_http_date(), + ); + } + if let Some(v) = args.source_if_unmodified_since() { + req = req.header( + constants::X_AMZ_COPY_SOURCE_IF_UNMODIFIED_SINCE, + v.format_http_date(), + ); + } // Set request payer header if enabled. req = self.insert_request_payer_header(req); diff --git a/core/tests/behavior/async_copy.rs b/core/tests/behavior/async_copy.rs index 8d359c6f07ec..96855448aa0a 100644 --- a/core/tests/behavior/async_copy.rs +++ b/core/tests/behavior/async_copy.rs @@ -65,6 +65,22 @@ pub fn tests(op: &Operator, tests: &mut Vec) { )) } + if cap.read && cap.write && cap.copy && cap.copy_with_source_if_match { + tests.extend(async_trials!( + op, + test_copy_with_source_if_match_match, + test_copy_with_source_if_match_mismatch + )) + } + + if cap.read && cap.write && cap.copy && cap.copy_with_source_if_none_match { + tests.extend(async_trials!( + op, + test_copy_with_source_if_none_match_mismatch, + test_copy_with_source_if_none_match_match + )) + } + if cap.read && cap.write && cap.stat && cap.copy && cap.copy_with_source_version { tests.extend(async_trials!( op, @@ -442,6 +458,134 @@ pub async fn test_copy_with_if_match_mismatch(op: Operator) -> Result<()> { Ok(()) } +/// Copy with source_if_match matching the source ETag should succeed. +pub async fn test_copy_with_source_if_match_match(op: Operator) -> Result<()> { + if !op.info().full_capability().copy_with_source_if_match { + return Ok(()); + } + + let source_path = uuid::Uuid::new_v4().to_string(); + let (source_content, _) = gen_bytes(op.info().full_capability()); + op.write(&source_path, source_content.clone()).await?; + + let Some(etag) = op.stat(&source_path).await?.etag().map(|s| s.to_string()) else { + op.delete(&source_path).await.expect("delete must succeed"); + return Ok(()); + }; + + let target_path = uuid::Uuid::new_v4().to_string(); + + op.copy_with(&source_path, &target_path) + .source_if_match(&etag) + .await?; + + let target_content = op + .read(&target_path) + .await + .expect("read must succeed") + .to_bytes(); + assert_eq!( + sha256_digest(target_content), + sha256_digest(&source_content), + ); + + op.delete(&source_path).await.expect("delete must succeed"); + op.delete(&target_path).await.expect("delete must succeed"); + Ok(()) +} + +/// Copy with source_if_match not matching should fail with ConditionNotMatch. +pub async fn test_copy_with_source_if_match_mismatch(op: Operator) -> Result<()> { + if !op.info().full_capability().copy_with_source_if_match { + return Ok(()); + } + + let source_path = uuid::Uuid::new_v4().to_string(); + let (source_content, _) = gen_bytes(op.info().full_capability()); + op.write(&source_path, source_content.clone()).await?; + + let target_path = uuid::Uuid::new_v4().to_string(); + + let err = op + .copy_with(&source_path, &target_path) + .source_if_match("\"00000000000000000000000000000000\"") + .await + .expect_err("copy must fail"); + assert_eq!(err.kind(), ErrorKind::ConditionNotMatch); + + assert!( + !op.exists(&target_path).await.expect("exists must succeed"), + "target must not be created on mismatch" + ); + + op.delete(&source_path).await.expect("delete must succeed"); + Ok(()) +} + +/// Copy with source_if_none_match not matching the source ETag should succeed. +pub async fn test_copy_with_source_if_none_match_mismatch(op: Operator) -> Result<()> { + if !op.info().full_capability().copy_with_source_if_none_match { + return Ok(()); + } + + let source_path = uuid::Uuid::new_v4().to_string(); + let (source_content, _) = gen_bytes(op.info().full_capability()); + op.write(&source_path, source_content.clone()).await?; + + let target_path = uuid::Uuid::new_v4().to_string(); + + op.copy_with(&source_path, &target_path) + .source_if_none_match("\"00000000000000000000000000000000\"") + .await?; + + let target_content = op + .read(&target_path) + .await + .expect("read must succeed") + .to_bytes(); + assert_eq!( + sha256_digest(target_content), + sha256_digest(&source_content), + ); + + op.delete(&source_path).await.expect("delete must succeed"); + op.delete(&target_path).await.expect("delete must succeed"); + Ok(()) +} + +/// Copy with source_if_none_match matching the source ETag should fail. +pub async fn test_copy_with_source_if_none_match_match(op: Operator) -> Result<()> { + if !op.info().full_capability().copy_with_source_if_none_match { + return Ok(()); + } + + let source_path = uuid::Uuid::new_v4().to_string(); + let (source_content, _) = gen_bytes(op.info().full_capability()); + op.write(&source_path, source_content.clone()).await?; + + let Some(etag) = op.stat(&source_path).await?.etag().map(|s| s.to_string()) else { + op.delete(&source_path).await.expect("delete must succeed"); + return Ok(()); + }; + + let target_path = uuid::Uuid::new_v4().to_string(); + + let err = op + .copy_with(&source_path, &target_path) + .source_if_none_match(&etag) + .await + .expect_err("copy must fail"); + assert_eq!(err.kind(), ErrorKind::ConditionNotMatch); + + assert!( + !op.exists(&target_path).await.expect("exists must succeed"), + "target must not be created on mismatch" + ); + + op.delete(&source_path).await.expect("delete must succeed"); + Ok(()) +} + /// Copy with source_version should copy a specific source version to a new file. pub async fn test_copy_with_source_version_to_new_file(op: Operator) -> Result<()> { if !op.info().full_capability().copy_with_source_version {