Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 128 additions & 22 deletions src/internal/pack/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ fn encode_one_object(entry: &Entry, offset: Option<usize>) -> Result<Vec<u8>, Gi
.expect("zlib compress should never failed");
inflate.flush().expect("zlib flush should never failed");
let compressed_data = inflate.finish().expect("zlib compress should never failed");
// self.write_all_and_update(&compressed_data).await;
encoded_data.extend(compressed_data);
Ok(encoded_data)
}
Expand Down Expand Up @@ -348,8 +347,8 @@ impl PackEncoder {
enable_zstdelta: bool,
) -> Result<(), GitError> {
let head = encode_header(self.object_number);
self.send_data(head.clone()).await;
self.inner_hash.update(&head);
self.send_data(head).await;

// Ensure encoding can only be started once
if self.start_encoding {
Expand Down Expand Up @@ -446,14 +445,15 @@ impl PackEncoder {
let blob_res = blob_results?;
let tag_res = tag_results?;

let total_entries = commit_res.len() + tree_res.len() + blob_res.len() + tag_res.len();
let mut all_res = vec![commit_res, tree_res, blob_res, tag_res];

let mut idx_entries = Vec::new();
let mut idx_entries = Vec::with_capacity(total_entries);
for res in &mut all_res {
for data in res {
data.1.offset = self.inner_offset as u64;
self.write_all_and_update(&data.0).await;
idx_entries.push(data.1.clone());
for (encoded_bytes, mut idx_entry) in res.drain(..) {
idx_entry.offset = self.inner_offset as u64;
self.write_owned_and_update(encoded_bytes).await;
idx_entries.push(idx_entry);
}
}

Expand All @@ -462,7 +462,7 @@ impl PackEncoder {
// Hash signature
let hash_result = self.inner_hash.clone().finalize();
self.final_hash = Some(ObjectHash::from_bytes(&hash_result).unwrap());
self.send_data(hash_result.to_vec()).await;
self.send_data(hash_result).await;
Comment on lines 462 to +465

self.drop_sender();
Ok(())
Expand All @@ -482,7 +482,7 @@ impl PackEncoder {
) -> Result<Vec<(Vec<u8>, IndexEntry)>, GitError> {
let mut current_offset = 0usize;
let mut window: VecDeque<(Entry, usize)> = VecDeque::with_capacity(window_size);
let mut res: Vec<(Vec<u8>, IndexEntry)> = Vec::new();
let mut res: Vec<(Vec<u8>, IndexEntry)> = Vec::with_capacity(bucket.len());
//let mut idx_entries: Vec<IndexEntry> = Vec::new();

for entry in bucket.iter_mut() {
Expand Down Expand Up @@ -584,8 +584,9 @@ impl PackEncoder {
if window.len() > window_size {
window.pop_front();
}
res.push((obj_data.clone(), IndexEntry::new(entry, 0)));
current_offset += obj_data.len();
let obj_data_len = obj_data.len();
res.push((obj_data, IndexEntry::new(entry, 0)));
current_offset += obj_data_len;
}
Ok(res)
}
Expand All @@ -602,8 +603,8 @@ impl PackEncoder {
}

let head = encode_header(self.object_number);
self.send_data(head.clone()).await;
self.inner_hash.update(&head);
self.send_data(head).await;

// Ensure encoding can only be started once
if self.start_encoding {
Expand All @@ -612,7 +613,7 @@ impl PackEncoder {
));
}

let mut idx_entries = Vec::new();
let mut idx_entries = Vec::with_capacity(self.object_number);
let batch_size = usize::max(1000, entry_rx.max_capacity() / 10); // A temporary value, not optimized
tracing::info!("encode with batch size: {}", batch_size);
loop {
Expand Down Expand Up @@ -653,10 +654,10 @@ impl PackEncoder {

time_it!("parallel encode: write batch", {
for obj_data in batch_result {
let mut obj_data = obj_data?;
obj_data.1.offset = self.inner_offset as u64;
self.write_all_and_update(&obj_data.0).await;
idx_entries.push(obj_data.1);
let (encoded_bytes, mut idx_entry) = obj_data?;
idx_entry.offset = self.inner_offset as u64;
self.write_owned_and_update(encoded_bytes).await;
idx_entries.push(idx_entry);
}
});
}
Expand All @@ -672,18 +673,18 @@ impl PackEncoder {
// hash signature
let hash_result = self.inner_hash.clone().finalize();
self.final_hash = Some(ObjectHash::from_bytes(&hash_result).unwrap());
self.send_data(hash_result.to_vec()).await;
self.send_data(hash_result).await;
Comment on lines 673 to +676
self.drop_sender();

self.idx_entries = Some(idx_entries);
Ok(())
}

/// Write data to writer and update hash & offset
async fn write_all_and_update(&mut self, data: &[u8]) {
self.inner_hash.update(data);
/// Write owned pack object bytes and update hash and offset without copying the buffer again.
async fn write_owned_and_update(&mut self, data: Vec<u8>) {
self.inner_hash.update(&data);
self.inner_offset += data.len();
self.send_data(data.to_vec()).await;
self.send_data(data).await;
}

async fn generate_idx_file(&mut self) -> Result<(), GitError> {
Expand Down Expand Up @@ -835,6 +836,111 @@ mod tests {
assert!(pack_with_delta.len() <= pack_without_delta_size);
check_format(&pack_with_delta);
}

/// Encoding a bucket of four distinct blobs must return one `(bytes, index entry)` pair per
/// input, in input order, each with non-empty bytes and the hash of the matching entry.
#[test]
fn test_try_as_offset_delta_keeps_one_result_per_input() {
    let _guard = set_hash_kind_for_test(HashKind::Sha1);

    let mut entries: Vec<Entry> = Vec::new();
    for content in [
        "alpha content",
        "beta content",
        "gamma content",
        "delta content",
    ] {
        entries.push(Blob::from_content(content).into());
    }

    // Record the hashes before `entries` is handed over by value to the encoder.
    let mut expected_hashes: Vec<ObjectHash> = Vec::new();
    for entry in entries.iter() {
        expected_hashes.push(entry.hash);
    }

    let outcome = PackEncoder::try_as_offset_delta(entries, 0, false);
    let results = outcome.expect("offset delta encoding should succeed");

    assert_eq!(results.len(), expected_hashes.len());
    for (pair, expected_hash) in results.iter().zip(expected_hashes) {
        let (encoded, idx_entry) = pair;
        assert!(!encoded.is_empty(), "encoded object should not be empty");
        assert_eq!(idx_entry.hash, expected_hash);
    }
}

/// An empty bucket is valid input: encoding must succeed and yield no results.
#[test]
fn test_try_as_offset_delta_accepts_empty_bucket() {
    let _guard = set_hash_kind_for_test(HashKind::Sha1);

    let outcome = PackEncoder::try_as_offset_delta(Vec::new(), 0, false);
    let results = outcome.expect("empty bucket should encode successfully");

    assert!(results.is_empty());
}

#[tokio::test]
async fn test_delta_window_encode_after_copy_optimization_roundtrips() {
let _guard = set_hash_kind_for_test(HashKind::Sha1);
let shared_prefix = "shared-prefix-".repeat(16);
let contents = vec![
format!("{shared_prefix}alpha-tail"),
format!("{shared_prefix}beta-tail"),
format!("{shared_prefix}gamma-tail"),
format!("{shared_prefix}delta-tail"),
];
let (tx, mut rx) = mpsc::channel(16);
let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(16);
let encoder = PackEncoder::new(contents.len(), 4, tx);
encoder.encode_async(entry_rx).await.unwrap();

for content in contents {
Comment on lines +887 to +890
let entry: Entry = Blob::from_content(&content).into();
entry_tx
.send(MetaAttached {
inner: entry,
meta: EntryMeta::new(),
})
.await
.unwrap();
}
drop(entry_tx);

let mut result = Vec::new();
while let Some(chunk) = rx.recv().await {
result.extend(chunk);
}

check_format(&result);
}

#[tokio::test]
async fn test_parallel_encode_after_owned_write_roundtrips() {
let _guard = set_hash_kind_for_test(HashKind::Sha1);
let contents = vec![
"parallel alpha",
"parallel beta",
"parallel gamma",
"parallel delta",
];
let (tx, mut rx) = mpsc::channel(16);
let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(16);
let encoder = PackEncoder::new(contents.len(), 0, tx);
encoder.encode_async(entry_rx).await.unwrap();
Comment on lines +910 to +922

for content in contents {
let entry: Entry = Blob::from_content(content).into();
entry_tx
.send(MetaAttached {
inner: entry,
meta: EntryMeta::new(),
})
.await
.unwrap();
}
drop(entry_tx);

let mut result = Vec::new();
while let Some(chunk) = rx.recv().await {
result.extend(chunk);
}

check_format(&result);
}

#[tokio::test]
async fn test_pack_encoder_sha256() {
let _guard = set_hash_kind_for_test(HashKind::Sha256);
Expand Down
2 changes: 1 addition & 1 deletion src/internal/pack/pack_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl IdxBuilder {

/// Write the fanout table for the index.
async fn write_fanout(&mut self, entries: &mut [IndexEntry]) -> Result<(), GitError> {
entries.sort_by(|a, b| a.hash.cmp(&b.hash));
entries.sort_by_key(|entry| entry.hash);
let mut fanout = [0u32; 256];
for entry in entries.iter() {
fanout[entry.hash.to_data()[0] as usize] += 1;
Expand Down
15 changes: 6 additions & 9 deletions src/internal/pack/test_pack_download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,11 @@ fn release_ref(path: &Path) -> bool {
static DOWNLOAD_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));

/// Download a pack/idx file if not already present, returning the local path.
fn ensure_downloaded(filename: &str) -> PathBuf {
fn ensure_downloaded_locked(filename: &str) -> PathBuf {
let path = download_dir().join(filename);
if path.exists() {
return path;
}
let _lock = DOWNLOAD_LOCK.lock().unwrap_or_else(|e| e.into_inner());
// Double-check after acquiring lock.
if path.exists() {
return path;
}
let url = format!("{BASE_URL}/{filename}");
tracing::info!("Downloading test pack file: {url}");
let mut response = ureq::get(&url)
Expand All @@ -83,6 +78,7 @@ pub struct PackFileGuard {

impl Drop for PackFileGuard {
fn drop(&mut self) {
let _lock = DOWNLOAD_LOCK.lock().unwrap_or_else(|e| e.into_inner());
if release_ref(&self.path) {
let _ = std::fs::remove_file(&self.path);
}
Expand All @@ -92,14 +88,15 @@ impl Drop for PackFileGuard {
/// Download a pack file (and its companion .idx if the file is a .pack),
/// returning `(path, guard)`. The file is deleted when all guards for it are dropped.
pub fn download_pack_file(filename: &str) -> (PathBuf, PackFileGuard) {
let path = ensure_downloaded(filename);
let _lock = DOWNLOAD_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let path = ensure_downloaded_locked(filename);
// Also download the companion file (.pack ↔ .idx).
if filename.ends_with(".pack") {
let idx = filename.replace(".pack", ".idx");
let _ = ensure_downloaded(&idx);
let _ = ensure_downloaded_locked(&idx);
} else if filename.ends_with(".idx") {
let pack = filename.replace(".idx", ".pack");
let _ = ensure_downloaded(&pack);
let _ = ensure_downloaded_locked(&pack);
}
acquire_ref(&path);
let guard = PackFileGuard { path: path.clone() };
Expand Down
20 changes: 10 additions & 10 deletions src/protocol/pack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,16 +235,16 @@ where
.await?;
}
crate::internal::object::tree::TreeItemMode::Blob
| crate::internal::object::tree::TreeItemMode::BlobExecutable => {
if !visited_blobs.contains(&entry_hash) {
visited_blobs.insert(entry_hash.clone());
let blob = self.repo_access.get_blob(&entry_hash).await.map_err(|e| {
ProtocolError::repository_error(format!(
"Failed to get blob {entry_hash}: {e}"
))
})?;
blobs.push(blob);
}
| crate::internal::object::tree::TreeItemMode::BlobExecutable
if !visited_blobs.contains(&entry_hash) =>
{
visited_blobs.insert(entry_hash.clone());
let blob = self.repo_access.get_blob(&entry_hash).await.map_err(|e| {
ProtocolError::repository_error(format!(
"Failed to get blob {entry_hash}: {e}"
))
})?;
blobs.push(blob);
}
_ => {}
}
Expand Down
Loading