Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions glidefs/examples/cache_hot_read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
//! Empirical probe for Claim 4: can a bigger explicit L1 + O_DIRECT recover the
//! warm-read latency that buffered I/O gets "for free" from the page cache?
//!
//! Exercises the full `HybridCache::get()` path (L1 memory tier -> L2 SSD tier)
//! under a hot-biased access pattern, so L1 sizing determines the hit source:
//! - buffered + small L1 -> hot reads miss L1, served by page-cached L2 (~tens of µs)
//! - direct + small L1 -> hot reads miss L1, served by media (~hundreds of µs)
//! - direct + large L1 -> hot reads hit L1 (~sub-µs)
//!
//! Usage:
//! cargo run --release --example cache_hot_read -- <buffered|direct> <l1_mb> <hot_mb> <total_mb> <dir>

use std::path::Path;
use std::time::{Duration, Instant};

use bytes::Bytes;
use foyer::{
BlockEngineConfig, DeviceBuilder, EvictionConfig, FsDeviceBuilder, HybridCache,
HybridCacheBuilder, HybridCachePolicy, IoEngineConfig, RecoverMode, S3FifoConfig,
};
use rand::Rng;

use glidefs::block::block_map::{Blake3Hash, blake3_128};

const BLOCK_SIZE: usize = 128 * 1024;

#[tokio::main]
async fn main() {
let a: Vec<String> = std::env::args().collect();
let mode = a.get(1).map(String::as_str).unwrap_or("buffered");
let l1_mb: usize = a.get(2).and_then(|s| s.parse().ok()).unwrap_or(2);
let hot_mb: usize = a.get(3).and_then(|s| s.parse().ok()).unwrap_or(32);
let total_mb: usize = a.get(4).and_then(|s| s.parse().ok()).unwrap_or(256);
let dir = a.get(5).cloned().unwrap_or_else(|| "/tmp/cache_hot".into());
let direct = mode == "direct";

let num_total = total_mb * 1024 * 1024 / BLOCK_SIZE;
let num_hot = (hot_mb * 1024 * 1024 / BLOCK_SIZE).min(num_total);

std::fs::create_dir_all(&dir).unwrap();
let device = FsDeviceBuilder::new(Path::new(&dir))
.with_capacity(num_total * BLOCK_SIZE * 2)
.with_direct(direct)
.build()
.expect("build device");
let cache: HybridCache<Blake3Hash, Bytes> = HybridCacheBuilder::new()
.with_name("cache-hot-read")
.with_policy(HybridCachePolicy::WriteOnInsertion)
.memory(l1_mb * 1024 * 1024)
.with_eviction_config(EvictionConfig::S3Fifo(S3FifoConfig::default()))
.with_weighter(|_k: &Blake3Hash, v: &Bytes| v.len())
.storage()
.with_engine_config(BlockEngineConfig::new(device))
.with_io_engine_config(Box::new(foyer::PsyncIoEngineConfig::new()) as Box<dyn IoEngineConfig>)
.with_recover_mode(RecoverMode::Quiet)
.build()
.await
.expect("build cache");

// Populate, recording hashes (block index -> hash).
let mut hashes = Vec::with_capacity(num_total);
let mut buf = vec![0u8; BLOCK_SIZE];
for i in 0..num_total {
for (j, b) in buf.iter_mut().enumerate() {
*b = (i as u32).wrapping_mul(2654435761).wrapping_add(j as u32) as u8;
}
let h = blake3_128(&buf);
hashes.push(h);
cache.insert(h, Bytes::copy_from_slice(&buf));
if i % 32 == 31 {
cache.storage().wait().await;
}
}
cache.storage().wait().await;

// 90% of accesses hit the hot set (first num_hot blocks), 10% uniform.
let mut rng = rand::thread_rng();
let pick = |rng: &mut rand::rngs::ThreadRng| -> usize {
if rng.gen_bool(0.9) {
rng.gen_range(0..num_hot)
} else {
rng.gen_range(0..num_total)
}
};

// Warm up so L1 converges to the hot set under this pattern.
for _ in 0..(num_total * 3) {
let _ = cache.get(&hashes[pick(&mut rng)]).await.expect("get");
}

// Timed run.
const ITERS: usize = 100_000;
let mut samples = Vec::with_capacity(ITERS);
for _ in 0..ITERS {
let h = hashes[pick(&mut rng)];
let t = Instant::now();
let got = cache.get(&h).await.expect("get");
samples.push(t.elapsed());
assert!(got.is_some(), "miss");
}
samples.sort_unstable();
let median = samples[ITERS / 2];
let p99 = samples[ITERS * 99 / 100];
let mean: Duration = samples.iter().sum::<Duration>() / ITERS as u32;

println!(
"mode={mode:<8} L1={l1_mb:>4}MiB hot={hot_mb}MiB total={total_mb}MiB | \
median={:>8.2?} mean={:>8.2?} p99={:>8.2?}",
median, mean, p99
);
std::fs::remove_dir_all(&dir).ok();
}
131 changes: 131 additions & 0 deletions glidefs/examples/cache_l2_poll.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
//! Empirical probe: do io_uring's O_DIRECT-only knobs (iopoll / sqpoll) help the
//! foyer L2 (SSD) read path, and what do they cost in CPU?
//!
//! Builds an O_DIRECT foyer cache with the uring engine in {plain, iopoll,
//! sqpoll} mode, populates it, then times `storage().load()` (the pure media read
//! path) at a given concurrency -- reporting read latency AND the CPU time the
//! process burned during the timed window (polling modes spin, so watch stime).
//!
//! Usage:
//! cargo run --release --example cache_l2_poll -- <none|iopoll|sqpoll> <conc> <dir>

use std::path::Path;
use std::time::{Duration, Instant};

use bytes::Bytes;
use foyer::{
BlockEngineConfig, DeviceBuilder, EvictionConfig, FsDeviceBuilder, HybridCache,
HybridCacheBuilder, HybridCachePolicy, IoEngineConfig, RecoverMode, S3FifoConfig,
UringIoEngineConfig,
};
use futures::future::join_all;
use rand::Rng;

use glidefs::block::block_map::{Blake3Hash, blake3_128};

const BLOCK_SIZE: usize = 128 * 1024;
const NUM_BLOCKS: usize = 2048; // 256 MiB working set

// CPU time (user+sys) consumed by this process so far, in seconds.
fn cpu_secs() -> (f64, f64) {
let stat = std::fs::read_to_string("/proc/self/stat").unwrap_or_default();
// Fields 14 (utime) and 15 (stime) in clock ticks, after the ")" of comm.
let after = stat.rsplit(')').next().unwrap_or("");
let f: Vec<&str> = after.split_whitespace().collect();
let hz = 100.0; // USER_HZ
let utime = f.get(11).and_then(|s| s.parse::<f64>().ok()).unwrap_or(0.0);
let stime = f.get(12).and_then(|s| s.parse::<f64>().ok()).unwrap_or(0.0);
(utime / hz, stime / hz)
}

#[tokio::main]
async fn main() {
let a: Vec<String> = std::env::args().collect();
let poll = a.get(1).map(String::as_str).unwrap_or("none");
let conc: usize = a.get(2).and_then(|s| s.parse().ok()).unwrap_or(16);
let dir = a.get(3).cloned().unwrap_or_else(|| "/tmp/cache_l2_poll".into());

let mut uring = UringIoEngineConfig::new();
match poll {
"none" => {}
"iopoll" => uring = uring.with_iopoll(true),
"sqpoll" => uring = uring.with_sqpoll(true),
other => panic!("poll must be none|iopoll|sqpoll, got {other}"),
}

std::fs::create_dir_all(&dir).unwrap();
let device = FsDeviceBuilder::new(Path::new(&dir))
.with_capacity(NUM_BLOCKS * BLOCK_SIZE * 2)
.with_direct(true) // iopoll/sqpoll REQUIRE O_DIRECT
.build()
.expect("build O_DIRECT device");

let cache: HybridCache<Blake3Hash, Bytes> = match HybridCacheBuilder::new()
.with_name("cache-l2-poll")
.with_policy(HybridCachePolicy::WriteOnInsertion)
.memory(2 * 1024 * 1024) // tiny L1 -> reads hit L2 media
.with_eviction_config(EvictionConfig::S3Fifo(S3FifoConfig::default()))
.with_weighter(|_k: &Blake3Hash, v: &Bytes| v.len())
.storage()
.with_engine_config(BlockEngineConfig::new(device))
.with_io_engine_config(Box::new(uring) as Box<dyn IoEngineConfig>)
.with_recover_mode(RecoverMode::Quiet)
.build()
.await
{
Ok(c) => c,
Err(e) => {
println!("poll={poll:<7} UNSUPPORTED: {e}");
std::fs::remove_dir_all(&dir).ok();
return;
}
};

// Populate; keep hashes.
let mut hashes = Vec::with_capacity(NUM_BLOCKS);
let mut buf = vec![0u8; BLOCK_SIZE];
for i in 0..NUM_BLOCKS {
for (j, b) in buf.iter_mut().enumerate() {
*b = (i as u32).wrapping_mul(2654435761).wrapping_add(j as u32) as u8;
}
let h = blake3_128(&buf);
hashes.push(h);
cache.insert(h, Bytes::copy_from_slice(&buf));
if i % 32 == 31 {
cache.storage().wait().await;
}
}
cache.storage().wait().await;

// Timed: storage().load() (pure L2/media path), `conc` in flight per unit.
const UNITS: usize = 4000;
let mut rng = rand::thread_rng();
let (u0, s0) = cpu_secs();
let mut samples = Vec::with_capacity(UNITS);
for _ in 0..UNITS {
let keys: Vec<Blake3Hash> = (0..conc).map(|_| hashes[rng.gen_range(0..hashes.len())]).collect();
let t = Instant::now();
let loads = join_all(keys.iter().map(|k| cache.storage().load(k))).await;
samples.push(t.elapsed());
for l in loads {
assert!(matches!(l.expect("load"), foyer::Load::Entry { .. }));
}
}
let (u1, s1) = cpu_secs();
samples.sort_unstable();
let wall: Duration = samples.iter().sum();
let per_read = wall / (UNITS * conc) as u32;
let median_batch = samples[UNITS / 2];
let reads = (UNITS * conc) as f64;
let cpu_user = u1 - u0;
let cpu_sys = s1 - s0;

println!(
"poll={poll:<7} conc={conc:<3} | per_read={per_read:>8.2?} batch_median={median_batch:>8.2?} \
| CPU/read: user={:>5.2}us sys={:>5.2}us (wall={:.1}s)",
cpu_user / reads * 1e6,
cpu_sys / reads * 1e6,
wall.as_secs_f64(),
);
std::fs::remove_dir_all(&dir).ok();
}
136 changes: 136 additions & 0 deletions glidefs/examples/cache_ram.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
//! Empirical probe for the buffered-vs-O_DIRECT RAM-ownership question.
//!
//! Builds a foyer `HybridCache<Blake3Hash, Bytes>` (mirroring the clean-cache
//! config) on a real directory, populates it, then reports the process RSS (which
//! includes foyer's explicit L1 memory tier). It then EXITS leaving the cache
//! files on disk so the page cache they hold survives -- inspect it with `fincore`
//! afterward. Buffered should show ~working-set bytes still in page cache (i.e.
//! double-cached on top of foyer's L1); O_DIRECT should show ~0.
//!
//! Usage:
//! cargo run --release --example cache_ram -- <buffered|direct> <mem_mb> <num_blocks> <dir>
//!
//! Then:
//! fincore --bytes --total <dir>/* ; rm -rf <dir>

use std::path::Path;
use std::time::Instant;

use bytes::Bytes;
use foyer::{
BlockEngineConfig, DeviceBuilder, EvictionConfig, FsDeviceBuilder, HybridCache,
HybridCacheBuilder, HybridCachePolicy, IoEngineConfig, PsyncIoEngineConfig, RecoverMode,
S3FifoConfig,
};

use glidefs::block::block_map::{Blake3Hash, blake3_128};
use rand::Rng;

const BLOCK_SIZE: usize = 128 * 1024;

fn vm_rss_kb() -> u64 {
let status = std::fs::read_to_string("/proc/self/status").unwrap_or_default();
for line in status.lines() {
if let Some(rest) = line.strip_prefix("VmRSS:") {
return rest
.split_whitespace()
.next()
.and_then(|v| v.parse().ok())
.unwrap_or(0);
}
}
0
}

#[tokio::main]
async fn main() {
let args: Vec<String> = std::env::args().collect();
let mode = args.get(1).map(String::as_str).unwrap_or("buffered");
let mem_mb: usize = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(64);
let num_blocks: usize = args.get(3).and_then(|s| s.parse().ok()).unwrap_or(2048);
let dir = args
.get(4)
.cloned()
.unwrap_or_else(|| "/tmp/cache_ram_probe".to_string());
// If > 0, after populating keep reading the L2 tier for this many seconds
// (used as an "active cache" antagonist in the tenant-interference test).
let soak_secs: u64 = args.get(5).and_then(|s| s.parse().ok()).unwrap_or(0);

let direct = match mode {
"buffered" => false,
"direct" => true,
other => panic!("mode must be buffered|direct, got {other}"),
};

let mem_bytes = mem_mb * 1024 * 1024;
let ssd_bytes = (num_blocks * BLOCK_SIZE) * 2; // headroom
let working_set_mb = (num_blocks * BLOCK_SIZE) / (1024 * 1024);

std::fs::create_dir_all(&dir).unwrap();
let device = FsDeviceBuilder::new(Path::new(&dir))
.with_capacity(ssd_bytes)
.with_direct(direct)
.build()
.expect("build device (O_DIRECT not supported here?)");

let cache: HybridCache<Blake3Hash, Bytes> = HybridCacheBuilder::new()
.with_name("cache-ram-probe")
.with_policy(HybridCachePolicy::WriteOnInsertion)
.memory(mem_bytes)
.with_eviction_config(EvictionConfig::S3Fifo(S3FifoConfig::default()))
.with_weighter(|_k: &Blake3Hash, v: &Bytes| v.len())
.storage()
.with_engine_config(BlockEngineConfig::new(device))
.with_io_engine_config(Box::new(PsyncIoEngineConfig::new()) as Box<dyn IoEngineConfig>)
.with_recover_mode(RecoverMode::Quiet)
.build()
.await
.expect("build hybrid cache");

let rss_before = vm_rss_kb();

// Populate in batches, flushing so everything lands on the SSD tier.
let mut hashes = Vec::with_capacity(num_blocks);
let mut data = vec![0u8; BLOCK_SIZE];
for i in 0..num_blocks {
// Cheap deterministic fill; distinct per block so hashes differ.
for (j, b) in data.iter_mut().enumerate() {
*b = (i as u8).wrapping_add(j as u8).wrapping_mul(31);
}
let hash = blake3_128(&data);
hashes.push(hash);
cache.insert(hash, Bytes::copy_from_slice(&data));
if i % 32 == 31 {
cache.storage().wait().await;
}
}
cache.storage().wait().await;

let rss_after = vm_rss_kb();

println!("mode={mode} direct={direct} L1_mem={mem_mb}MiB working_set={working_set_mb}MiB");
println!(
"process RSS: before={} MiB after={} MiB (delta={} MiB)",
rss_before / 1024,
rss_after / 1024,
(rss_after.saturating_sub(rss_before)) / 1024,
);
println!("cache dir: {dir}");
println!("--> now run: fincore --bytes --total {dir}/*");

if soak_secs > 0 {
// Continuously read the L2 tier so its footprint stays hot. In buffered
// mode this keeps ~working-set bytes resident in the page cache (the
// antagonist); in direct mode it touches the device, not the page cache.
println!("SOAKING for {soak_secs}s"); // readiness marker for the harness
let mut rng = rand::thread_rng();
let deadline = Instant::now() + std::time::Duration::from_secs(soak_secs);
while Instant::now() < deadline {
for _ in 0..256 {
let h = hashes[rng.gen_range(0..hashes.len())];
let _ = cache.storage().load(&h).await;
}
}
}
// Exit without dropping caches; files persist for fincore inspection.
}
Loading
Loading