Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 248 additions & 6 deletions core/core/src/types/context/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,18 @@ impl WriteGenerator<oio::Writer> {
// - write buffer + bs directly.
if !self.exact {
let fill_size = bs.len();
// Save old buffer before pushing bs, so we can restore only the old
// buffer on failure. The caller will retry with the same bs.
let old_buffer = self.buffer.clone();
self.buffer.push(bs);
let buf = self.buffer.take().collect();
self.w.write_dyn(buf).await?;
return Ok(fill_size);
match self.w.write_dyn(buf).await {
Ok(()) => return Ok(fill_size),
Err(err) => {
self.buffer = old_buffer;
return Err(err);
}
}
}

// Condition:
Expand All @@ -167,8 +175,12 @@ impl WriteGenerator<oio::Writer> {
// Action:
// - write existing buffer in chunk_size to make more rooms for writing data.
if self.buffer.len() >= chunk_size {
let buf = self.buffer.take().collect();
self.w.write_dyn(buf).await?;
let taken = self.buffer.take();
let buf = taken.clone().collect();
if let Err(err) = self.w.write_dyn(buf).await {
self.buffer = taken;
return Err(err);
}
}

// Condition
Expand All @@ -190,8 +202,12 @@ impl WriteGenerator<oio::Writer> {
break;
}

let buf = self.buffer.take().collect();
self.w.write_dyn(buf).await?;
let taken = self.buffer.take();
let buf = taken.clone().collect();
if let Err(err) = self.w.write_dyn(buf).await {
self.buffer = taken;
return Err(err);
}
}

self.w.close().await
Expand Down Expand Up @@ -539,4 +555,230 @@ mod tests {

Ok(())
}

/// Simulates the scenario where RetryWrapper exhausts its retry budget:
///
/// ```text
/// Caller → WriteGenerator → RetryWrapper → InnerWriter
/// ```
///
/// RetryWrapper clones the buffer on each retry attempt. If all retries
/// are exhausted, it returns the error to WriteGenerator. WriteGenerator
/// must restore its internal buffer (using old_buffer) so the caller can
/// retry WriteGenerator::write() without losing previously buffered data.
///
/// 1. Write 5 bytes → buffered internally
/// 2. Write 10 bytes → triggers flush → inner writer fails (simulating
/// RetryWrapper exhaustion)
/// 3. WriteGenerator restores old buffer (5 bytes from step 1)
/// 4. Caller retries WriteGenerator::write(10 bytes) with the SAME data
/// 5. This time flush succeeds → both chunks (5+10=15 bytes) are written
///
/// Without the fix: step 3 would leave WriteGenerator's buffer empty,
/// and only the 10 bytes from the retry would be written (5 bytes lost).
#[tokio::test]
async fn test_inexact_write_failure_with_retry_style_retry() -> Result<()> {
let buf = Arc::new(Mutex::new(vec![]));
let fail_count = Arc::new(Mutex::new(1usize));

let mut writer = WriteGenerator::new(
Box::new(FailOnceMockWriter {
buf: buf.clone(),
fail_count: fail_count.clone(),
}),
Some(10),
false, // inexact mode
);

// Step 1: Write 5 bytes (buffered, below chunk_size=10)
let data1 = Bytes::from(vec![1u8; 5]);
let n = writer.write(data1.into()).await?;
assert_eq!(n, 5);

// Step 2: Write 10 bytes — triggers flush, fails once.
// This simulates RetryWrapper exhausting retries and returning error.
let data2 = Bytes::from(vec![2u8; 10]);
let err = writer.write(data2.clone().into()).await;
assert!(err.is_err(), "first flush should fail");

// Step 3: Caller retries with the SAME data2.
// Without the fix, data1 (5 bytes from step 1) would be lost.
// With the fix, data1 is preserved and both are written.
let n = writer.write(data2.into()).await?;
assert_eq!(n, 10);

writer.close().await?;

// Verify all 15 bytes were written correctly — no data loss
let written = buf.lock().await;
assert_eq!(written.len(), 15);
assert_eq!(&written[..5], &[1u8; 5]);
assert_eq!(&written[5..], &[2u8; 10]);

Ok(())
}

/// A mock writer that fails the first N write calls, then succeeds.
/// Used to test that WriteGenerator retains buffered data on write failure.
struct FailOnceMockWriter {
buf: Arc<Mutex<Vec<u8>>>,
fail_count: Arc<Mutex<usize>>,
}

impl Write for FailOnceMockWriter {
async fn write(&mut self, bs: Buffer) -> Result<()> {
let mut fail_count = self.fail_count.lock().await;
if *fail_count > 0 {
*fail_count -= 1;
return Err(
Error::new(ErrorKind::Unexpected, "write failed (simulated)").set_temporary(),
);
}
drop(fail_count);

let mut buf = self.buf.lock().await;
buf.put(bs);
Ok(())
}

async fn close(&mut self) -> Result<Metadata> {
Ok(Metadata::default())
}

async fn abort(&mut self) -> Result<()> {
Ok(())
}
}

/// Test that in inexact mode, a write failure retains only the old buffered data
/// (not the current bs), so the caller can safely retry without data duplication.
#[tokio::test]
async fn test_inexact_write_failure_retains_buffer() -> Result<()> {
setup();

let buf = Arc::new(Mutex::new(vec![]));
let fail_count = Arc::new(Mutex::new(1usize));
let mut writer = WriteGenerator::new(
Box::new(FailOnceMockWriter {
buf: buf.clone(),
fail_count: fail_count.clone(),
}),
Some(10),
false, // inexact mode
);

// Write 5 bytes (buffered, below chunk_size=10)
let data1 = Bytes::from(vec![1u8, 2, 3, 4, 5]);
let n = writer.write(data1.into()).await?;
assert_eq!(n, 5);

// Write 10 bytes — buffer (5) + new (10) = 15 >= chunk_size, triggers flush.
// First flush will fail.
let data2 = Bytes::from(vec![6u8, 7, 8, 9, 10, 11, 12, 13, 14, 15]);
let err = writer.write(data2.clone().into()).await;
assert!(err.is_err(), "first flush should fail");

// On failure, only old buffer (data1) is retained; data2 is NOT absorbed.
// Caller retries with the same data2 — now the mock writer succeeds.
let n = writer.write(data2.into()).await?;
assert_eq!(n, 10);

writer.close().await?;

// Verify no data was lost and no data was duplicated: exactly 15 bytes.
let buf = buf.lock().await;
assert_eq!(buf.len(), 15);
assert_eq!(
&*buf,
&[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
);

Ok(())
}

/// Test that in exact mode, a write failure retains the buffered data
/// so that a subsequent write/close succeeds without data loss.
#[tokio::test]
async fn test_exact_write_failure_retains_buffer() -> Result<()> {
setup();

let buf = Arc::new(Mutex::new(vec![]));
let fail_count = Arc::new(Mutex::new(1usize));
let mut writer = WriteGenerator::new(
Box::new(FailOnceMockWriter {
buf: buf.clone(),
fail_count: fail_count.clone(),
}),
Some(10),
true, // exact mode
);

// Fill buffer to exactly chunk_size
let data1 = Bytes::from(vec![1u8; 10]);
let mut remaining = data1.clone();
while !remaining.is_empty() {
let n = writer.write(remaining.clone().into()).await?;
remaining.advance(n);
}

// Write more data — buffer is full (10 bytes), so it flushes first.
// The first flush will fail.
let data2 = Bytes::from(vec![2u8; 5]);
let err = writer.write(data2.clone().into()).await;
assert!(err.is_err(), "first flush should fail");

// Retry — now succeeds. The buffer should still have the original 10 bytes.
// On retry, the flush succeeds and clears the buffer, then data2 is buffered.
let mut remaining = data2;
while !remaining.is_empty() {
let n = writer.write(remaining.clone().into()).await?;
remaining.advance(n);
}

writer.close().await?;

// All 15 bytes should be present.
let buf = buf.lock().await;
assert_eq!(buf.len(), 15);
assert_eq!(&buf[..10], &[1u8; 10]);
assert_eq!(&buf[10..], &[2u8; 5]);

Ok(())
}

/// Test that close() retains buffered data when the underlying write fails,
/// and succeeds on retry.
#[tokio::test]
async fn test_close_failure_retains_buffer() -> Result<()> {
setup();

let buf = Arc::new(Mutex::new(vec![]));
let fail_count = Arc::new(Mutex::new(1usize));
let mut writer = WriteGenerator::new(
Box::new(FailOnceMockWriter {
buf: buf.clone(),
fail_count: fail_count.clone(),
}),
Some(10),
false,
);

// Write 5 bytes (buffered, below chunk_size)
let data = Bytes::from(vec![42u8; 5]);
let n = writer.write(data.into()).await?;
assert_eq!(n, 5);

// First close attempt fails during flush
let err = writer.close().await;
assert!(err.is_err(), "first close should fail");

// Second close attempt succeeds — buffer was retained
writer.close().await?;

let buf = buf.lock().await;
assert_eq!(buf.len(), 5);
assert_eq!(&*buf, &[42u8; 5]);

Ok(())
}
}
Loading