From ce71186fc3fa42d3dd3dcafca65c0514e4d3e1bf Mon Sep 17 00:00:00 2001 From: cheng Date: Thu, 28 May 2026 16:16:48 +0800 Subject: [PATCH 1/4] Optimize pack encode allocation and copy paths --- src/internal/pack/encode.rs | 74 ++++++++++++++++++++++++++++++++++--- 1 file changed, 68 insertions(+), 6 deletions(-) diff --git a/src/internal/pack/encode.rs b/src/internal/pack/encode.rs index 78b5f66a..68c584e0 100644 --- a/src/internal/pack/encode.rs +++ b/src/internal/pack/encode.rs @@ -446,14 +446,15 @@ impl PackEncoder { let blob_res = blob_results?; let tag_res = tag_results?; + let total_entries = commit_res.len() + tree_res.len() + blob_res.len() + tag_res.len(); let mut all_res = vec![commit_res, tree_res, blob_res, tag_res]; - let mut idx_entries = Vec::new(); + let mut idx_entries = Vec::with_capacity(total_entries); for res in &mut all_res { - for data in res { + for mut data in res.drain(..) { data.1.offset = self.inner_offset as u64; self.write_all_and_update(&data.0).await; - idx_entries.push(data.1.clone()); + idx_entries.push(data.1); } } @@ -482,7 +483,7 @@ impl PackEncoder { ) -> Result, IndexEntry)>, GitError> { let mut current_offset = 0usize; let mut window: VecDeque<(Entry, usize)> = VecDeque::with_capacity(window_size); - let mut res: Vec<(Vec, IndexEntry)> = Vec::new(); + let mut res: Vec<(Vec, IndexEntry)> = Vec::with_capacity(bucket.len()); //let mut idx_entries: Vec = Vec::new(); for entry in bucket.iter_mut() { @@ -584,8 +585,9 @@ impl PackEncoder { if window.len() > window_size { window.pop_front(); } - res.push((obj_data.clone(), IndexEntry::new(entry, 0))); - current_offset += obj_data.len(); + let obj_data_len = obj_data.len(); + res.push((obj_data, IndexEntry::new(entry, 0))); + current_offset += obj_data_len; } Ok(res) } @@ -835,6 +837,66 @@ mod tests { assert!(pack_with_delta.len() <= pack_without_delta_size); check_format(&pack_with_delta); } + + #[test] + fn test_try_as_offset_delta_keeps_one_result_per_input() { + let _guard = 
set_hash_kind_for_test(HashKind::Sha1); + let entries: Vec = [ + "alpha content", + "beta content", + "gamma content", + "delta content", + ] + .into_iter() + .map(|content| Blob::from_content(content).into()) + .collect(); + let expected_hashes: Vec = entries.iter().map(|entry| entry.hash).collect(); + + let results = PackEncoder::try_as_offset_delta(entries, 0, false) + .expect("offset delta encoding should succeed"); + + assert_eq!(results.len(), expected_hashes.len()); + for ((encoded, idx_entry), expected_hash) in results.iter().zip(expected_hashes) { + assert!(!encoded.is_empty(), "encoded object should not be empty"); + assert_eq!(idx_entry.hash, expected_hash); + } + } + + #[tokio::test] + async fn test_delta_window_encode_after_copy_optimization_roundtrips() { + let _guard = set_hash_kind_for_test(HashKind::Sha1); + let shared_prefix = "shared-prefix-".repeat(16); + let contents = vec![ + format!("{shared_prefix}alpha-tail"), + format!("{shared_prefix}beta-tail"), + format!("{shared_prefix}gamma-tail"), + format!("{shared_prefix}delta-tail"), + ]; + let (tx, mut rx) = mpsc::channel(16); + let (entry_tx, entry_rx) = mpsc::channel::>(16); + let encoder = PackEncoder::new(contents.len(), 4, tx); + encoder.encode_async(entry_rx).await.unwrap(); + + for content in contents { + let entry: Entry = Blob::from_content(&content).into(); + entry_tx + .send(MetaAttached { + inner: entry, + meta: EntryMeta::new(), + }) + .await + .unwrap(); + } + drop(entry_tx); + + let mut result = Vec::new(); + while let Some(chunk) = rx.recv().await { + result.extend(chunk); + } + + check_format(&result); + } + #[tokio::test] async fn test_pack_encoder_sha256() { let _guard = set_hash_kind_for_test(HashKind::Sha256); From ad496cc22884161f4ef9b240189941f62d5f1dd3 Mon Sep 17 00:00:00 2001 From: cheng Date: Fri, 29 May 2026 18:44:43 +0800 Subject: [PATCH 2/4] test: cover empty pack encode delta bucket --- src/internal/pack/encode.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) 
diff --git a/src/internal/pack/encode.rs b/src/internal/pack/encode.rs index 68c584e0..9633e1ed 100644 --- a/src/internal/pack/encode.rs +++ b/src/internal/pack/encode.rs @@ -862,6 +862,17 @@ mod tests { } } + #[test] + fn test_try_as_offset_delta_accepts_empty_bucket() { + let _guard = set_hash_kind_for_test(HashKind::Sha1); + let entries = Vec::new(); + + let results = PackEncoder::try_as_offset_delta(entries, 0, false) + .expect("empty bucket should encode successfully"); + + assert!(results.is_empty()); + } + #[tokio::test] async fn test_delta_window_encode_after_copy_optimization_roundtrips() { let _guard = set_hash_kind_for_test(HashKind::Sha1); From 670cad1065be02fb284da73e47804eea8b49a568 Mon Sep 17 00:00:00 2001 From: cheng Date: Fri, 29 May 2026 21:37:16 +0800 Subject: [PATCH 3/4] perf: eliminate to_vec() copy in pack encode owned write path Replace write_all_and_update(&[u8]) with write_owned_and_update(Vec<u8>) so encoded object bytes are moved directly into the output channel instead of being copied via to_vec(). This removes the last per-object full-buffer copy in the encode hot path. Also: - Move header and trailer bytes instead of cloning in both inner_encode and parallel_encode - Pre-allocate idx_entries in parallel_encode with Vec::with_capacity - Use tuple destructuring in write loops for clarity New test: test_parallel_encode_after_owned_write_roundtrips covers the parallel (window_size=0) path with owned write semantics.
Co-Authored-By: Claude Opus 4.8 Signed-off-by: cheng --- src/internal/pack/encode.rs | 69 +++++++++++++++++++++++++++---------- 1 file changed, 51 insertions(+), 18 deletions(-) diff --git a/src/internal/pack/encode.rs b/src/internal/pack/encode.rs index 9633e1ed..73da32ed 100644 --- a/src/internal/pack/encode.rs +++ b/src/internal/pack/encode.rs @@ -199,7 +199,6 @@ fn encode_one_object(entry: &Entry, offset: Option) -> Result, Gi .expect("zlib compress should never failed"); inflate.flush().expect("zlib flush should never failed"); let compressed_data = inflate.finish().expect("zlib compress should never failed"); - // self.write_all_and_update(&compressed_data).await; encoded_data.extend(compressed_data); Ok(encoded_data) } @@ -348,8 +347,8 @@ impl PackEncoder { enable_zstdelta: bool, ) -> Result<(), GitError> { let head = encode_header(self.object_number); - self.send_data(head.clone()).await; self.inner_hash.update(&head); + self.send_data(head).await; // ensure only one decode can only invoke once if self.start_encoding { @@ -451,10 +450,10 @@ impl PackEncoder { let mut idx_entries = Vec::with_capacity(total_entries); for res in &mut all_res { - for mut data in res.drain(..) { - data.1.offset = self.inner_offset as u64; - self.write_all_and_update(&data.0).await; - idx_entries.push(data.1); + for (encoded_bytes, mut idx_entry) in res.drain(..) 
{ + idx_entry.offset = self.inner_offset as u64; + self.write_owned_and_update(encoded_bytes).await; + idx_entries.push(idx_entry); } } @@ -463,7 +462,7 @@ impl PackEncoder { // Hash signature let hash_result = self.inner_hash.clone().finalize(); self.final_hash = Some(ObjectHash::from_bytes(&hash_result).unwrap()); - self.send_data(hash_result.to_vec()).await; + self.send_data(hash_result).await; self.drop_sender(); Ok(()) @@ -604,8 +603,8 @@ impl PackEncoder { } let head = encode_header(self.object_number); - self.send_data(head.clone()).await; self.inner_hash.update(&head); + self.send_data(head).await; // ensure only one decode can only invoke once if self.start_encoding { @@ -614,7 +613,7 @@ impl PackEncoder { )); } - let mut idx_entries = Vec::new(); + let mut idx_entries = Vec::with_capacity(self.object_number); let batch_size = usize::max(1000, entry_rx.max_capacity() / 10); // A temporary value, not optimized tracing::info!("encode with batch size: {}", batch_size); loop { @@ -655,10 +654,10 @@ impl PackEncoder { time_it!("parallel encode: write batch", { for obj_data in batch_result { - let mut obj_data = obj_data?; - obj_data.1.offset = self.inner_offset as u64; - self.write_all_and_update(&obj_data.0).await; - idx_entries.push(obj_data.1); + let (encoded_bytes, mut idx_entry) = obj_data?; + idx_entry.offset = self.inner_offset as u64; + self.write_owned_and_update(encoded_bytes).await; + idx_entries.push(idx_entry); } }); } @@ -674,18 +673,18 @@ impl PackEncoder { // hash signature let hash_result = self.inner_hash.clone().finalize(); self.final_hash = Some(ObjectHash::from_bytes(&hash_result).unwrap()); - self.send_data(hash_result.to_vec()).await; + self.send_data(hash_result).await; self.drop_sender(); self.idx_entries = Some(idx_entries); Ok(()) } - /// Write data to writer and update hash & offset - async fn write_all_and_update(&mut self, data: &[u8]) { - self.inner_hash.update(data); + /// Write owned pack object bytes and update hash and offset 
without copying the buffer again. + async fn write_owned_and_update(&mut self, data: Vec) { + self.inner_hash.update(&data); self.inner_offset += data.len(); - self.send_data(data.to_vec()).await; + self.send_data(data).await; } async fn generate_idx_file(&mut self) -> Result<(), GitError> { @@ -908,6 +907,40 @@ mod tests { check_format(&result); } + #[tokio::test] + async fn test_parallel_encode_after_owned_write_roundtrips() { + let _guard = set_hash_kind_for_test(HashKind::Sha1); + let contents = vec![ + "parallel alpha", + "parallel beta", + "parallel gamma", + "parallel delta", + ]; + let (tx, mut rx) = mpsc::channel(16); + let (entry_tx, entry_rx) = mpsc::channel::>(16); + let encoder = PackEncoder::new(contents.len(), 0, tx); + encoder.encode_async(entry_rx).await.unwrap(); + + for content in contents { + let entry: Entry = Blob::from_content(content).into(); + entry_tx + .send(MetaAttached { + inner: entry, + meta: EntryMeta::new(), + }) + .await + .unwrap(); + } + drop(entry_tx); + + let mut result = Vec::new(); + while let Some(chunk) = rx.recv().await { + result.extend(chunk); + } + + check_format(&result); + } + #[tokio::test] async fn test_pack_encoder_sha256() { let _guard = set_hash_kind_for_test(HashKind::Sha256); From a78d704f5c85ce0edc5bc4647d97d09bcc818b87 Mon Sep 17 00:00:00 2001 From: cheng Date: Tue, 2 Jun 2026 10:07:32 +0800 Subject: [PATCH 4/4] ci: fix clippy warnings and pack fixture race --- src/internal/pack/pack_index.rs | 2 +- src/internal/pack/test_pack_download.rs | 15 ++++++--------- src/protocol/pack.rs | 20 ++++++++++---------- 3 files changed, 17 insertions(+), 20 deletions(-) diff --git a/src/internal/pack/pack_index.rs b/src/internal/pack/pack_index.rs index 5fe2694a..a4733321 100644 --- a/src/internal/pack/pack_index.rs +++ b/src/internal/pack/pack_index.rs @@ -83,7 +83,7 @@ impl IdxBuilder { /// Write the fanout table for the index. 
async fn write_fanout(&mut self, entries: &mut [IndexEntry]) -> Result<(), GitError> { - entries.sort_by(|a, b| a.hash.cmp(&b.hash)); + entries.sort_by_key(|entry| entry.hash); let mut fanout = [0u32; 256]; for entry in entries.iter() { fanout[entry.hash.to_data()[0] as usize] += 1; diff --git a/src/internal/pack/test_pack_download.rs b/src/internal/pack/test_pack_download.rs index 047d1524..9d04b2f0 100644 --- a/src/internal/pack/test_pack_download.rs +++ b/src/internal/pack/test_pack_download.rs @@ -49,16 +49,11 @@ fn release_ref(path: &Path) -> bool { static DOWNLOAD_LOCK: LazyLock> = LazyLock::new(|| Mutex::new(())); /// Download a pack/idx file if not already present, returning the local path. -fn ensure_downloaded(filename: &str) -> PathBuf { +fn ensure_downloaded_locked(filename: &str) -> PathBuf { let path = download_dir().join(filename); if path.exists() { return path; } - let _lock = DOWNLOAD_LOCK.lock().unwrap_or_else(|e| e.into_inner()); - // Double-check after acquiring lock. - if path.exists() { - return path; - } let url = format!("{BASE_URL}/{filename}"); tracing::info!("Downloading test pack file: {url}"); let mut response = ureq::get(&url) @@ -83,6 +78,7 @@ pub struct PackFileGuard { impl Drop for PackFileGuard { fn drop(&mut self) { + let _lock = DOWNLOAD_LOCK.lock().unwrap_or_else(|e| e.into_inner()); if release_ref(&self.path) { let _ = std::fs::remove_file(&self.path); } @@ -92,14 +88,15 @@ impl Drop for PackFileGuard { /// Download a pack file (and its companion .idx if the file is a .pack), /// returning `(path, guard)`. The file is deleted when all guards for it are dropped. pub fn download_pack_file(filename: &str) -> (PathBuf, PackFileGuard) { - let path = ensure_downloaded(filename); + let _lock = DOWNLOAD_LOCK.lock().unwrap_or_else(|e| e.into_inner()); + let path = ensure_downloaded_locked(filename); // Also download the companion file (.pack ↔ .idx). 
if filename.ends_with(".pack") { let idx = filename.replace(".pack", ".idx"); - let _ = ensure_downloaded(&idx); + let _ = ensure_downloaded_locked(&idx); } else if filename.ends_with(".idx") { let pack = filename.replace(".idx", ".pack"); - let _ = ensure_downloaded(&pack); + let _ = ensure_downloaded_locked(&pack); } acquire_ref(&path); let guard = PackFileGuard { path: path.clone() }; diff --git a/src/protocol/pack.rs b/src/protocol/pack.rs index d75ff838..686e4595 100644 --- a/src/protocol/pack.rs +++ b/src/protocol/pack.rs @@ -235,16 +235,16 @@ where .await?; } crate::internal::object::tree::TreeItemMode::Blob - | crate::internal::object::tree::TreeItemMode::BlobExecutable => { - if !visited_blobs.contains(&entry_hash) { - visited_blobs.insert(entry_hash.clone()); - let blob = self.repo_access.get_blob(&entry_hash).await.map_err(|e| { - ProtocolError::repository_error(format!( - "Failed to get blob {entry_hash}: {e}" - )) - })?; - blobs.push(blob); - } + | crate::internal::object::tree::TreeItemMode::BlobExecutable + if !visited_blobs.contains(&entry_hash) => + { + visited_blobs.insert(entry_hash.clone()); + let blob = self.repo_access.get_blob(&entry_hash).await.map_err(|e| { + ProtocolError::repository_error(format!( + "Failed to get blob {entry_hash}: {e}" + )) + })?; + blobs.push(blob); } _ => {} }