From 5d8c313b2c592bdb1bb84594316bf8316fd4bcf9 Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Fri, 17 Apr 2026 15:36:30 +0800 Subject: [PATCH 1/3] fix(core): retain buffered data in WriteGenerator on write failure WriteGenerator's write() and close() methods called buffer.take() before attempting to write to the underlying writer. If the write failed, the buffered data was lost, making retry impossible without data loss. Fix by cloning the taken buffer before writing, and restoring it on failure. This matches the invariant maintained by other writer implementations (AppendWriter, BlockWriter, MultipartWriter, PositionWriter). Co-Authored-By: Claude Opus 4.6 --- core/core/src/types/context/write.rs | 189 ++++++++++++++++++++++++++- 1 file changed, 182 insertions(+), 7 deletions(-) diff --git a/core/core/src/types/context/write.rs b/core/core/src/types/context/write.rs index 6e5ce5ee2375..a1102f176d4f 100644 --- a/core/core/src/types/context/write.rs +++ b/core/core/src/types/context/write.rs @@ -155,9 +155,15 @@ impl WriteGenerator { if !self.exact { let fill_size = bs.len(); self.buffer.push(bs); - let buf = self.buffer.take().collect(); - self.w.write_dyn(buf).await?; - return Ok(fill_size); + let taken = self.buffer.take(); + let buf = taken.clone().collect(); + match self.w.write_dyn(buf).await { + Ok(()) => return Ok(fill_size), + Err(err) => { + self.buffer = taken; + return Err(err); + } + } } // Condition: @@ -167,8 +173,12 @@ impl WriteGenerator { // Action: // - write existing buffer in chunk_size to make more rooms for writing data. 
if self.buffer.len() >= chunk_size { - let buf = self.buffer.take().collect(); - self.w.write_dyn(buf).await?; + let taken = self.buffer.take(); + let buf = taken.clone().collect(); + if let Err(err) = self.w.write_dyn(buf).await { + self.buffer = taken; + return Err(err); + } } // Condition @@ -190,8 +200,12 @@ impl WriteGenerator { break; } - let buf = self.buffer.take().collect(); - self.w.write_dyn(buf).await?; + let taken = self.buffer.take(); + let buf = taken.clone().collect(); + if let Err(err) = self.w.write_dyn(buf).await { + self.buffer = taken; + return Err(err); + } } self.w.close().await @@ -539,4 +553,165 @@ mod tests { Ok(()) } + + /// A mock writer that fails the first N write calls, then succeeds. + /// Used to test that WriteGenerator retains buffered data on write failure. + struct FailOnceMockWriter { + buf: Arc<Mutex<Vec<u8>>>, + fail_count: Arc<Mutex<usize>>, + } + + impl Write for FailOnceMockWriter { + async fn write(&mut self, bs: Buffer) -> Result<()> { + let mut fail_count = self.fail_count.lock().await; + if *fail_count > 0 { + *fail_count -= 1; + return Err( + Error::new(ErrorKind::Unexpected, "write failed (simulated)").set_temporary(), + ); + } + drop(fail_count); + + let mut buf = self.buf.lock().await; + buf.put(bs); + Ok(()) + } + + async fn close(&mut self) -> Result<Metadata> { + Ok(Metadata::default()) + } + + async fn abort(&mut self) -> Result<()> { + Ok(()) + } + } + + /// Test that in inexact mode, a write failure retains the buffered data + /// so that a subsequent write/close succeeds without data loss.
+ #[tokio::test] + async fn test_inexact_write_failure_retains_buffer() -> Result<()> { + setup(); + + let buf = Arc::new(Mutex::new(vec![])); + let fail_count = Arc::new(Mutex::new(1usize)); + let mut writer = WriteGenerator::new( + Box::new(FailOnceMockWriter { + buf: buf.clone(), + fail_count: fail_count.clone(), + }), + Some(10), + false, // inexact mode + ); + + // Write 5 bytes (buffered, below chunk_size=10) + let data1 = Bytes::from(vec![1u8, 2, 3, 4, 5]); + let n = writer.write(data1.into()).await?; + assert_eq!(n, 5); + + // Write 10 bytes — buffer (5) + new (10) = 15 >= chunk_size, triggers flush. + // First flush will fail. + let data2 = Bytes::from(vec![6u8, 7, 8, 9, 10, 11, 12, 13, 14, 15]); + let err = writer.write(data2.clone().into()).await; + assert!(err.is_err(), "first flush should fail"); + + // The failed write already pushed data2 into the buffer and restored it on failure. + // So the buffer now has 15 bytes (data1 + data2). Just close to flush. + writer.close().await?; + + // Verify no data was lost: all 15 bytes should be present. + let buf = buf.lock().await; + assert_eq!(buf.len(), 15); + assert_eq!( + &*buf, + &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15] + ); + + Ok(()) + } + + /// Test that in exact mode, a write failure retains the buffered data + /// so that a subsequent write/close succeeds without data loss. 
+ #[tokio::test] + async fn test_exact_write_failure_retains_buffer() -> Result<()> { + setup(); + + let buf = Arc::new(Mutex::new(vec![])); + let fail_count = Arc::new(Mutex::new(1usize)); + let mut writer = WriteGenerator::new( + Box::new(FailOnceMockWriter { + buf: buf.clone(), + fail_count: fail_count.clone(), + }), + Some(10), + true, // exact mode + ); + + // Fill buffer to exactly chunk_size + let data1 = Bytes::from(vec![1u8; 10]); + let mut remaining = data1.clone(); + while !remaining.is_empty() { + let n = writer.write(remaining.clone().into()).await?; + remaining.advance(n); + } + + // Write more data — buffer is full (10 bytes), so it flushes first. + // The first flush will fail. + let data2 = Bytes::from(vec![2u8; 5]); + let err = writer.write(data2.clone().into()).await; + assert!(err.is_err(), "first flush should fail"); + + // Retry — now succeeds. The buffer should still have the original 10 bytes. + // On retry, the flush succeeds and clears the buffer, then data2 is buffered. + let mut remaining = data2; + while !remaining.is_empty() { + let n = writer.write(remaining.clone().into()).await?; + remaining.advance(n); + } + + writer.close().await?; + + // All 15 bytes should be present. + let buf = buf.lock().await; + assert_eq!(buf.len(), 15); + assert_eq!(&buf[..10], &[1u8; 10]); + assert_eq!(&buf[10..], &[2u8; 5]); + + Ok(()) + } + + /// Test that close() retains buffered data when the underlying write fails, + /// and succeeds on retry. 
+ #[tokio::test] + async fn test_close_failure_retains_buffer() -> Result<()> { + setup(); + + let buf = Arc::new(Mutex::new(vec![])); + let fail_count = Arc::new(Mutex::new(1usize)); + let mut writer = WriteGenerator::new( + Box::new(FailOnceMockWriter { + buf: buf.clone(), + fail_count: fail_count.clone(), + }), + Some(10), + false, + ); + + // Write 5 bytes (buffered, below chunk_size) + let data = Bytes::from(vec![42u8; 5]); + let n = writer.write(data.into()).await?; + assert_eq!(n, 5); + + // First close attempt fails during flush + let err = writer.close().await; + assert!(err.is_err(), "first close should fail"); + + // Second close attempt succeeds — buffer was retained + writer.close().await?; + + let buf = buf.lock().await; + assert_eq!(buf.len(), 5); + assert_eq!(&*buf, &[42u8; 5]); + + Ok(()) + } } From 1c1053284b30331e34fece8159496e6bfaf763ab Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Fri, 17 Apr 2026 15:42:53 +0800 Subject: [PATCH 2/3] fix: restore only old buffer on inexact write failure to prevent data duplication On write failure in inexact mode, only restore the previously buffered data (not the current bs). The caller will retry with the same bs, so including it in the restored buffer would cause data duplication. Co-Authored-By: Claude Opus 4.6 --- core/core/src/types/context/write.rs | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/core/core/src/types/context/write.rs b/core/core/src/types/context/write.rs index a1102f176d4f..4f8561fac0fc 100644 --- a/core/core/src/types/context/write.rs +++ b/core/core/src/types/context/write.rs @@ -154,13 +154,15 @@ impl WriteGenerator { // - write buffer + bs directly. if !self.exact { let fill_size = bs.len(); + // Save old buffer before pushing bs, so we can restore only the old + // buffer on failure. The caller will retry with the same bs. 
+ let old_buffer = self.buffer.clone(); self.buffer.push(bs); - let taken = self.buffer.take(); - let buf = taken.clone().collect(); + let buf = self.buffer.take().collect(); match self.w.write_dyn(buf).await { Ok(()) => return Ok(fill_size), Err(err) => { - self.buffer = taken; + self.buffer = old_buffer; return Err(err); } } @@ -586,8 +588,8 @@ mod tests { } } - /// Test that in inexact mode, a write failure retains the buffered data - /// so that a subsequent write/close succeeds without data loss. + /// Test that in inexact mode, a write failure retains only the old buffered data + /// (not the current bs), so the caller can safely retry without data duplication. #[tokio::test] async fn test_inexact_write_failure_retains_buffer() -> Result<()> { setup(); @@ -614,11 +616,14 @@ mod tests { let err = writer.write(data2.clone().into()).await; assert!(err.is_err(), "first flush should fail"); - // The failed write already pushed data2 into the buffer and restored it on failure. - // So the buffer now has 15 bytes (data1 + data2). Just close to flush. + // On failure, only old buffer (data1) is retained; data2 is NOT absorbed. + // Caller retries with the same data2 — now the mock writer succeeds. + let n = writer.write(data2.into()).await?; + assert_eq!(n, 10); + writer.close().await?; - // Verify no data was lost: all 15 bytes should be present. + // Verify no data was lost and no data was duplicated: exactly 15 bytes. 
let buf = buf.lock().await; assert_eq!(buf.len(), 15); assert_eq!( From 3231588bbbc9399edf5dce435644773c21921229 Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Sat, 6 Jun 2026 05:10:03 +0800 Subject: [PATCH 3/3] test: add test demonstrating RetryLayer-style retry preserves buffered data --- core/core/src/types/context/write.rs | 62 ++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/core/core/src/types/context/write.rs b/core/core/src/types/context/write.rs index 4f8561fac0fc..8ef79c1ecc2b 100644 --- a/core/core/src/types/context/write.rs +++ b/core/core/src/types/context/write.rs @@ -556,6 +556,68 @@ mod tests { Ok(()) } + /// Simulates the scenario where RetryWrapper exhausts its retry budget: + /// + /// ```text + /// Caller → WriteGenerator → RetryWrapper → InnerWriter + /// ``` + /// + /// RetryWrapper clones the buffer on each retry attempt. If all retries + /// are exhausted, it returns the error to WriteGenerator. WriteGenerator + /// must restore its internal buffer (using old_buffer) so the caller can + /// retry WriteGenerator::write() without losing previously buffered data. + /// + /// 1. Write 5 bytes → buffered internally + /// 2. Write 10 bytes → triggers flush → inner writer fails (simulating + /// RetryWrapper exhaustion) + /// 3. WriteGenerator restores old buffer (5 bytes from step 1) + /// 4. Caller retries WriteGenerator::write(10 bytes) with the SAME data + /// 5. This time flush succeeds → both chunks (5+10=15 bytes) are written + /// + /// Without the fix: step 3 would leave WriteGenerator's buffer empty, + /// and only the 10 bytes from the retry would be written (5 bytes lost). 
+ #[tokio::test] + async fn test_inexact_write_failure_with_retry_style_retry() -> Result<()> { + let buf = Arc::new(Mutex::new(vec![])); + let fail_count = Arc::new(Mutex::new(1usize)); + + let mut writer = WriteGenerator::new( + Box::new(FailOnceMockWriter { + buf: buf.clone(), + fail_count: fail_count.clone(), + }), + Some(10), + false, // inexact mode + ); + + // Step 1: Write 5 bytes (buffered, below chunk_size=10) + let data1 = Bytes::from(vec![1u8; 5]); + let n = writer.write(data1.into()).await?; + assert_eq!(n, 5); + + // Step 2: Write 10 bytes — triggers flush, fails once. + // This simulates RetryWrapper exhausting retries and returning error. + let data2 = Bytes::from(vec![2u8; 10]); + let err = writer.write(data2.clone().into()).await; + assert!(err.is_err(), "first flush should fail"); + + // Step 4: Caller retries with the SAME data2 (step 3 is WriteGenerator's restore). + // Without the fix, data1 (5 bytes from step 1) would be lost. + // With the fix, data1 is preserved and both are written. + let n = writer.write(data2.into()).await?; + assert_eq!(n, 10); + + writer.close().await?; + + // Verify all 15 bytes were written correctly — no data loss + let written = buf.lock().await; + assert_eq!(written.len(), 15); + assert_eq!(&written[..5], &[1u8; 5]); + assert_eq!(&written[5..], &[2u8; 10]); + + Ok(()) + } + /// A mock writer that fails the first N write calls, then succeeds. /// Used to test that WriteGenerator retains buffered data on write failure. struct FailOnceMockWriter {