Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 39 additions & 32 deletions crates/forge_services/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,9 @@ impl<F: 'static + WorkspaceIndexRepository + FileReaderInfra, D: FileDiscovery +
}
}

// Pass 2: upload files — each file's content is read on-demand
// immediately before upload so only one batch occupies memory at a time.
let mut upload_stream = self.upload_files(sync_paths.upload);
// Pass 2: upload files — files are grouped into batches of `batch_size`
// and each batch is sent in a single HTTP request, sequentially.
let mut upload_stream = Box::pin(self.upload_files(sync_paths.upload));

// Process uploads as they complete, updating progress incrementally
while let Some(result) = upload_stream.next().await {
Expand Down Expand Up @@ -262,13 +262,15 @@ impl<F: 'static + WorkspaceIndexRepository + FileReaderInfra, D: FileDiscovery +
Ok(files_to_delete.len())
}

/// Uploads files in parallel, reading their content on-demand to keep
/// memory bounded to a single batch at a time.
/// Uploads files in batches, sending one HTTP request per batch of
/// `batch_size` files.
///
/// Each path is read from disk immediately before its upload, so the peak
/// memory footprint is `batch_size × avg_file_size` rather than the size
/// of the entire upload set. The caller is responsible for processing the
/// stream and tracking progress.
/// Files within each batch are read from disk, collected into a single
/// [`forge_domain::FileUpload`] payload, and uploaded in one request.
/// Batches are processed sequentially — only one HTTP request is in-flight
/// at a time — which keeps both memory usage and server concurrency
/// bounded. The stream yields the number of files successfully uploaded
/// per batch.
fn upload_files(
&self,
paths: Vec<PathBuf>,
Expand All @@ -279,37 +281,42 @@ impl<F: 'static + WorkspaceIndexRepository + FileReaderInfra, D: FileDiscovery +
let infra = self.infra.clone();
let batch_size = self.batch_size;

futures::stream::iter(paths)
.map(move |file_path| {
let user_id = user_id.clone();
let workspace_id = workspace_id.clone();
let token = token.clone();
let infra = infra.clone();
async move {
futures::stream::iter(paths).chunks(batch_size).then(move |batch| {
let user_id = user_id.clone();
let workspace_id = workspace_id.clone();
let token = token.clone();
let infra = infra.clone();
async move {
let mut files = Vec::with_capacity(batch.len());
for file_path in &batch {
info!(workspace_id = %workspace_id, path = %file_path.display(), "File sync started");
// Read content on-demand — keeps only one batch in memory at a time
let content = infra
.read_utf8(&file_path)
.read_utf8(file_path)
.await
.with_context(|| {
format!("Failed to read file '{}' for upload", file_path.display())
})?;
let path_str = file_path.to_string_lossy().into_owned();
let file = forge_domain::FileRead::new(path_str, content);
let upload = forge_domain::CodeBase::new(
user_id.clone(),
workspace_id.clone(),
vec![file],
);
infra
.upload_files(&upload, &token)
.await
.context("Failed to upload files")?;
files.push(forge_domain::FileRead::new(
file_path.to_string_lossy().into_owned(),
content,
));
}
let count = files.len();
let upload = forge_domain::CodeBase::new(
user_id.clone(),
workspace_id.clone(),
files,
);
infra
.upload_files(&upload, &token)
.await
.context("Failed to upload files")?;
for file_path in &batch {
info!(workspace_id = %workspace_id, path = %file_path.display(), "File sync completed");
Ok::<_, anyhow::Error>(1)
}
})
.buffer_unordered(batch_size)
Ok::<_, anyhow::Error>(count)
}
})
}

/// Discovers workspace files and streams their hashes without retaining
Expand Down
Loading