From eeda17b155ced90c29e2e196a41acbe682c3fc9a Mon Sep 17 00:00:11 2001
From: Ruzel Ibragimov
Date: Wed, 21 Jan 2026 14:00:11 +0000
Subject: [PATCH] add test to calculate overhead

---
 kernel/src/snapshot.rs | 424 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 424 insertions(+)

diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs
index 9f19e93641..24bb206836 100644
--- a/kernel/src/snapshot.rs
+++ b/kernel/src/snapshot.rs
@@ -1751,4 +1751,428 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_snapshot_memory_usage() -> DeltaResult<()> {
+        use memory_profiler::measure_snapshot;
+
+        // Number of versions to create (adjust this to test different scales)
+        const NUM_VERSIONS: u64 = 100_000;
+        // Checkpoint interval - create checkpoint this many versions before the end
+        const CHECKPOINT_INTERVAL: u64 = 200;
+
+        let store = Arc::new(InMemory::new());
+        let url = Url::parse("memory:///memory_test")?;
+        let engine = DefaultEngineBuilder::new(store.clone()).build();
+
+        // Create initial commit with protocol and metadata
+        let commit0 = vec![
+            json!({
+                "commitInfo": {
+                    "timestamp": 1587968586154i64,
+                    "operation": "WRITE",
+                }
+            }),
+            json!({
+                "protocol": {
+                    "minReaderVersion": 1,
+                    "minWriterVersion": 2
+                }
+            }),
+            json!({
+                "metaData": {
+                    "id": "test-table-memory",
+                    "format": {
+                        "provider": "parquet",
+                        "options": {}
+                    },
+                    "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}",
+                    "partitionColumns": [],
+                    "configuration": {},
+                    "createdTime": 1587968585495i64
+                }
+            }),
+            json!({
+                "add": {
+                    "path": "part-00000-1.parquet",
+                    "size": 12345,
+                    "modificationTime": 1587968586154i64,
+                    "dataChange": true,
+                    "stats": "{\"numRecords\":100}"
+                }
+            }),
+        ];
+        commit(store.as_ref(), 0, commit0.clone()).await;
+
+        println!("Creating {} commit files (this may take a while)...", NUM_VERSIONS);
+
+        // Create all commits
+        for version in 1u64..=NUM_VERSIONS {
+            let commit_data = vec![
+                json!({
+                    "commitInfo": {
+                        "timestamp": 1587968586154i64 + version as i64 * 1000,
+                        "operation": "WRITE",
+                    }
+                }),
+                json!({
+                    "add": {
+                        "path": format!("part-{:05}-1.parquet", version),
+                        "size": 12345,
+                        "modificationTime": 1587968586154i64 + version as i64 * 1000,
+                        "dataChange": true,
+                        "stats": "{\"numRecords\":100}"
+                    }
+                }),
+            ];
+            commit(store.as_ref(), version, commit_data).await;
+
+            if version % 10_000 == 0 {
+                println!("  Created {} commits...", version);
+            }
+        }
+
+        // ===== TEST 1: No checkpoint =====
+        println!("\n========================================");
+        println!("TEST 1: Building snapshot WITHOUT checkpoint");
+        println!("========================================");
+        println!("Building snapshot at version {} (no checkpoints)...", NUM_VERSIONS);
+
+        let snapshot_no_checkpoint = Snapshot::builder_for(url.clone())
+            .at_version(NUM_VERSIONS)
+            .build(&engine)?;
+
+        println!("\nMeasuring memory usage...");
+        let stats_no_checkpoint = measure_snapshot(&snapshot_no_checkpoint);
+
+        let total_commits = NUM_VERSIONS + 1;
+        println!("\n=== Snapshot Memory Usage (NO CHECKPOINT) ===");
+        println!("Versions: 0-{} ({} commits)", NUM_VERSIONS, total_commits);
+        println!("Checkpoints: 0");
+        println!("\nMemory Breakdown:");
+        println!("  Stack memory:  {:>10} bytes ({:>8.2} KB)", stats_no_checkpoint.stack_bytes, stats_no_checkpoint.stack_bytes as f64 / 1024.0);
+        println!("  Heap memory:   {:>10} bytes ({:>8.2} KB)", stats_no_checkpoint.heap_bytes, stats_no_checkpoint.heap_bytes as f64 / 1024.0);
+        println!("  Total memory:  {:>10} bytes ({:>8.2} KB, {:.2} MB)",
+            stats_no_checkpoint.total_bytes,
+            stats_no_checkpoint.total_bytes as f64 / 1024.0,
+            stats_no_checkpoint.total_bytes as f64 / (1024.0 * 1024.0)
+        );
+        println!("\nPer-commit average: {:.1} bytes", stats_no_checkpoint.total_bytes as f64 / total_commits as f64);
+
+        // Verify the snapshot has the expected structure
+        assert_eq!(snapshot_no_checkpoint.version(), NUM_VERSIONS);
+        assert_eq!(snapshot_no_checkpoint.log_segment.checkpoint_version, None);
+        assert_eq!(snapshot_no_checkpoint.log_segment.ascending_commit_files.len(), total_commits as usize);
+
+        // ===== TEST 2: With checkpoint =====
+        println!("\n========================================");
+        println!("TEST 2: Creating checkpoint and rebuilding snapshot");
+        println!("========================================");
+
+        let checkpoint_version = NUM_VERSIONS - CHECKPOINT_INTERVAL;
+        println!("Creating checkpoint at version {}...", checkpoint_version);
+
+        // Create checkpoint data (only protocol and metadata, no Add actions for simplicity)
+        let checkpoint_data = vec![
+            json!({
+                "protocol": {
+                    "minReaderVersion": 1,
+                    "minWriterVersion": 2
+                }
+            }),
+            json!({
+                "metaData": {
+                    "id": "test-table-memory",
+                    "format": {
+                        "provider": "parquet",
+                        "options": {}
+                    },
+                    "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}",
+                    "partitionColumns": [],
+                    "configuration": {},
+                    "createdTime": 1587968585495i64
+                }
+            }),
+        ];
+
+        let handler = engine.json_handler();
+        let json_strings: StringArray = checkpoint_data
+            .iter()
+            .map(|json| json.to_string())
+            .collect::<Vec<String>>()
+            .into();
+        let parsed = handler
+            .parse_json(
+                string_array_to_engine_data(json_strings),
+                crate::actions::get_commit_schema().clone(),
+            )
+            .unwrap();
+        let checkpoint_batch = ArrowEngineData::try_from_engine_data(parsed).unwrap();
+        let checkpoint: RecordBatch = checkpoint_batch.into();
+
+        let mut buffer = Vec::new();
+        let mut writer = ArrowWriter::try_new(&mut buffer, checkpoint.schema(), None)?;
+        writer.write(&checkpoint)?;
+        writer.close()?;
+
+        store
+            .put(
+                &delta_path_for_version(checkpoint_version, "checkpoint.parquet"),
+                buffer.into(),
+            )
+            .await
+            .unwrap();
+
+        // Build snapshot at final version (now with checkpoint)
+        println!("Building snapshot at version {} (with checkpoint at {})...", NUM_VERSIONS, checkpoint_version);
+        let snapshot_with_checkpoint = Snapshot::builder_for(url.clone())
+            .at_version(NUM_VERSIONS)
+            .build(&engine)?;
+
+        println!("\nMeasuring memory usage...");
+        let stats_with_checkpoint = measure_snapshot(&snapshot_with_checkpoint);
+
+        println!("\n=== Snapshot Memory Usage (WITH CHECKPOINT) ===");
+        println!("Versions: 0-{} ({} total)", NUM_VERSIONS, total_commits);
+        println!("Checkpoint at: {}", checkpoint_version);
+        println!("Commits in memory: {} (versions {}-{})",
+            snapshot_with_checkpoint.log_segment.ascending_commit_files.len(),
+            checkpoint_version + 1,
+            NUM_VERSIONS
+        );
+        println!("\nMemory Breakdown:");
+        println!("  Stack memory:  {:>10} bytes ({:>8.2} KB)", stats_with_checkpoint.stack_bytes, stats_with_checkpoint.stack_bytes as f64 / 1024.0);
+        println!("  Heap memory:   {:>10} bytes ({:>8.2} KB)", stats_with_checkpoint.heap_bytes, stats_with_checkpoint.heap_bytes as f64 / 1024.0);
+        println!("  Total memory:  {:>10} bytes ({:>8.2} KB, {:.2} MB)",
+            stats_with_checkpoint.total_bytes,
+            stats_with_checkpoint.total_bytes as f64 / 1024.0,
+            stats_with_checkpoint.total_bytes as f64 / (1024.0 * 1024.0)
+        );
+
+        let commits_in_memory = snapshot_with_checkpoint.log_segment.ascending_commit_files.len();
+        println!("\nPer-commit average: {:.1} bytes", stats_with_checkpoint.total_bytes as f64 / commits_in_memory as f64);
+
+        // Verify structure
+        assert_eq!(snapshot_with_checkpoint.version(), NUM_VERSIONS);
+        assert_eq!(snapshot_with_checkpoint.log_segment.checkpoint_version, Some(checkpoint_version));
+        assert_eq!(commits_in_memory, CHECKPOINT_INTERVAL as usize);
+
+        // ===== Comparison =====
+        println!("\n========================================");
+        println!("COMPARISON");
+        println!("========================================");
+        let memory_saved = stats_no_checkpoint.total_bytes as i64 - stats_with_checkpoint.total_bytes as i64;
+        let commits_saved = (total_commits - CHECKPOINT_INTERVAL) as i64;
+        println!("Memory saved by checkpoint: {} bytes ({:.2} KB, {:.2} MB)",
+            memory_saved,
+            memory_saved as f64 / 1024.0,
+            memory_saved as f64 / (1024.0 * 1024.0)
+        );
+        println!("Commits saved from memory: {} commits", commits_saved);
+        println!("Memory reduction: {:.1}%",
+            (memory_saved as f64 / stats_no_checkpoint.total_bytes as f64) * 100.0
+        );
+
+        Ok(())
+    }
+
+    /// Memory profiler for calculating actual heap and stack memory usage
+    #[allow(dead_code, unreachable_pub)]
+    mod memory_profiler {
+        use std::collections::HashMap;
+        use std::mem::size_of;
+        use url::Url;
+        use crate::log_segment::LogSegment;
+        use crate::path::{ParsedLogPath, AsUrl};
+        use crate::table_configuration::TableConfiguration;
+        use crate::Snapshot;
+
+        /// Accumulated stack/heap byte counts for one measured object graph.
+        pub struct MemoryStats {
+            pub stack_bytes: usize,
+            pub heap_bytes: usize,
+            pub total_bytes: usize,
+        }
+
+        impl MemoryStats {
+            fn new() -> Self {
+                Self {
+                    stack_bytes: 0,
+                    heap_bytes: 0,
+                    total_bytes: 0,
+                }
+            }
+
+            // Add a (stack, heap) contribution and keep the total in sync.
+            fn add(&mut self, stack: usize, heap: usize) {
+                self.stack_bytes += stack;
+                self.heap_bytes += heap;
+                self.total_bytes = self.stack_bytes + self.heap_bytes;
+            }
+        }
+
+        // &String (not &str) on purpose: we need `capacity()` to count the
+        // actual heap allocation, not just the used length.
+        fn measure_string(s: &String) -> (usize, usize) {
+            let stack = size_of::<String>();
+            let heap = s.capacity();
+            (stack, heap)
+        }
+
+        fn measure_url(url: &Url) -> (usize, usize) {
+            let stack = size_of::<Url>();
+            // Url stores the full serialization internally as a String
+            let heap = url.as_str().len() + 50; // approximate overhead
+            (stack, heap)
+        }
+
+        // &Vec (not &[T]) on purpose: `capacity()` is needed for the real
+        // heap footprint, including unused reserved slots.
+        fn measure_vec<T>(vec: &Vec<T>, measure_item: impl Fn(&T) -> (usize, usize)) -> (usize, usize) {
+            let stack = size_of::<Vec<T>>();
+            let mut heap = vec.capacity() * size_of::<T>();
+
+            for item in vec {
+                let (_, item_heap) = measure_item(item);
+                heap += item_heap;
+            }
+
+            (stack, heap)
+        }
+
+        // `Any` bounds let us detect String keys/values safely. The previous
+        // version reinterpreted any K/V whose size matched String's via a raw
+        // pointer cast, which is undefined behavior for non-String types.
+        fn measure_hashmap<K: std::any::Any, V: std::any::Any>(map: &HashMap<K, V>) -> (usize, usize) {
+            let stack = size_of::<HashMap<K, V>>();
+            // HashMap has overhead for buckets and node pointers
+            let mut heap = map.capacity() * (size_of::<K>() + size_of::<V>() + 16);
+
+            // For String keys/values, add their heap allocation
+            for (k, v) in map {
+                if let Some(s) = (k as &dyn std::any::Any).downcast_ref::<String>() {
+                    heap += s.capacity();
+                }
+                if let Some(s) = (v as &dyn std::any::Any).downcast_ref::<String>() {
+                    heap += s.capacity();
+                }
+            }
+
+            (stack, heap)
+        }
+
+        fn measure_parsed_log_path(path: &ParsedLogPath) -> (usize, usize) {
+            let mut stats = MemoryStats::new();
+
+            // Stack size of ParsedLogPath struct
+            stats.add(size_of::<ParsedLogPath>(), 0);
+
+            // location: FileMeta (which contains Url)
+            let (url_stack, url_heap) = measure_url(path.location.as_url());
+            stats.add(url_stack, url_heap);
+
+            // filename: String
+            let (fn_stack, fn_heap) = measure_string(&path.filename);
+            stats.add(fn_stack, fn_heap);
+
+            // extension: String
+            let (ext_stack, ext_heap) = measure_string(&path.extension);
+            stats.add(ext_stack, ext_heap);
+
+            // version: u64
+            stats.add(size_of::<u64>(), 0);
+
+            // file_type: LogPathFileType (enum) — approximate; assumes the
+            // enum fits in 16 bytes. TODO confirm against the actual layout.
+            stats.add(16, 0);
+
+            (stats.stack_bytes, stats.heap_bytes)
+        }
+
+        fn measure_log_segment(log_segment: &LogSegment) -> MemoryStats {
+            let mut stats = MemoryStats::new();
+
+            // Base struct size
+            stats.add(size_of::<LogSegment>(), 0);
+
+            // end_version: u64
+            stats.add(size_of::<u64>(), 0);
+
+            // checkpoint_version: Option<u64>
+            stats.add(size_of::<Option<u64>>(), 0);
+
+            // log_root: Url
+            let (url_stack, url_heap) = measure_url(&log_segment.log_root);
+            stats.add(url_stack, url_heap);
+
+            // ascending_commit_files: Vec<ParsedLogPath>
+            let (vec_stack, vec_heap) = measure_vec(&log_segment.ascending_commit_files, measure_parsed_log_path);
+            stats.add(vec_stack, vec_heap);
+
+            // ascending_compaction_files: Vec<ParsedLogPath>
+            let (vec_stack, vec_heap) = measure_vec(&log_segment.ascending_compaction_files, measure_parsed_log_path);
+            stats.add(vec_stack, vec_heap);
+
+            // checkpoint_parts: Vec<ParsedLogPath>
+            let (vec_stack, vec_heap) = measure_vec(&log_segment.checkpoint_parts, measure_parsed_log_path);
+            stats.add(vec_stack, vec_heap);
+
+            // latest_crc_file: Option<ParsedLogPath>
+            if let Some(ref crc) = log_segment.latest_crc_file {
+                let (crc_stack, crc_heap) = measure_parsed_log_path(crc);
+                stats.add(crc_stack, crc_heap);
+            }
+
+            // latest_commit_file: Option<ParsedLogPath>
+            if let Some(ref commit) = log_segment.latest_commit_file {
+                let (commit_stack, commit_heap) = measure_parsed_log_path(commit);
+                stats.add(commit_stack, commit_heap);
+            }
+
+            // checkpoint_schema: Option<Arc<_>> — pointer-sized thanks to the
+            // Arc niche optimization, so count one word.
+            stats.add(size_of::<usize>(), 0);
+
+            // max_published_version: Option<u64>
+            stats.add(size_of::<Option<u64>>(), 0);
+
+            stats
+        }
+
+        fn measure_table_configuration(_config: &TableConfiguration) -> MemoryStats {
+            let mut stats = MemoryStats::new();
+
+            // Base struct size
+            stats.add(size_of::<TableConfiguration>(), 0);
+
+            // This is a rough estimate since we can't easily access private fields
+            // metadata contains: id, name, description, schema_string, partition_columns, configuration
+            // Approximate: ~5-10KB based on typical schemas
+            stats.add(0, 8000);
+
+            // protocol: small, ~100 bytes
+            stats.add(0, 100);
+
+            // schema: Arc — one pointer-sized word
+            stats.add(size_of::<usize>(), 0);
+
+            // table_properties, column_mapping_mode, table_root, version
+            stats.add(0, 500);
+
+            stats
+        }
+
+        /// Estimate the stack + heap footprint of a [`Snapshot`]. The result
+        /// is an approximation: string/url overheads and the table
+        /// configuration are estimated, not measured exactly.
+        pub fn measure_snapshot(snapshot: &Snapshot) -> MemoryStats {
+            let mut stats = MemoryStats::new();
+
+            // Base Snapshot struct
+            stats.add(size_of::<Snapshot>(), 0);
+
+            // log_segment
+            let log_seg_stats = measure_log_segment(&snapshot.log_segment);
+            stats.add(log_seg_stats.stack_bytes, log_seg_stats.heap_bytes);
+
+            // table_configuration
+            let table_config_stats = measure_table_configuration(snapshot.table_configuration());
+            stats.add(table_config_stats.stack_bytes, table_config_stats.heap_bytes);
+
+            stats
+        }
+    }
 }