Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions fact-ebpf/src/bpf/file.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@
// clang-format on

__always_inline static bool path_is_monitored(struct bound_path_t* path) {
if (!filter_by_prefix()) {
// no path configured, allow all
return true;
}

// Backup bytes length and restore it before exiting
unsigned int len = path->len;

Expand Down
21 changes: 0 additions & 21 deletions fact-ebpf/src/bpf/maps.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,6 @@ __always_inline static struct helper_t* get_helper() {
return bpf_map_lookup_elem(&helper_map, &zero);
}

/**
* A map with a single entry, determining whether prefix filtering
* should be done based on the `path_prefix` map.
*/
struct {
__uint(type, BPF_MAP_TYPE_ARRAY);
__type(key, __u32);
__type(value, char);
__uint(max_entries, 1);
} filter_by_prefix_map SEC(".maps");

/// Whether we should filter by path prefix or not.
__always_inline static bool filter_by_prefix() {
unsigned int zero = 0;
char* res = bpf_map_lookup_elem(&filter_by_prefix_map, &zero);

// The NULL check is simply here to satisfy some verifiers, the result
// will never actually be NULL.
return res == NULL || *res != 0;
}

struct {
__uint(type, BPF_MAP_TYPE_LPM_TRIE);
__type(key, struct path_prefix_t);
Expand Down
51 changes: 34 additions & 17 deletions fact/src/bpf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::{io, path::PathBuf};

use anyhow::{bail, Context};
use aya::{
maps::{Array, HashMap, LpmTrie, MapData, PerCpuArray, RingBuf},
programs::Program,
maps::{HashMap, LpmTrie, MapData, PerCpuArray, RingBuf},
programs::{lsm::LsmLink, Program},
Btf, Ebpf,
};
use checks::Checks;
Expand Down Expand Up @@ -33,6 +33,8 @@ pub struct Bpf {
paths_config: watch::Receiver<Vec<PathBuf>>,

paths_globset: GlobSet,

links: Vec<LsmLink>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For future reference, if we ever decide to use programs other than lsm, aya provides a Link trait that can be used here to store different types of links.

}

impl Bpf {
Expand Down Expand Up @@ -65,10 +67,11 @@ impl Bpf {
paths,
paths_config,
paths_globset: GlobSet::empty(),
links: Vec::new(),
};

bpf.load_paths()?;
bpf.load_progs(&btf)?;
bpf.load_paths()?;

Ok(bpf)
}
Expand Down Expand Up @@ -116,12 +119,16 @@ impl Bpf {
}

fn load_paths(&mut self) -> anyhow::Result<()> {
let paths_config = self.paths_config.borrow();
let Some(filter_by_prefix) = self.obj.map_mut("filter_by_prefix_map") else {
bail!("filter_by_prefix_map map not found");
};
let mut filter_by_prefix: Array<&mut MapData, c_char> = Array::try_from(filter_by_prefix)?;
filter_by_prefix.set(0, !paths_config.is_empty() as c_char, 0)?;
if self.paths_config.borrow().is_empty() {
self.detach_progs();
self.paths.clear();
self.paths_globset = GlobSet::empty();
return Ok(());
}

if self.links.is_empty() {
self.attach_progs()?;
}

let Some(path_prefix) = self.obj.map_mut("path_prefix") else {
bail!("path_prefix map not found");
Expand All @@ -130,6 +137,7 @@ impl Bpf {
LpmTrie::try_from(path_prefix)?;

// Add the new prefixes
let paths_config = self.paths_config.borrow();
let mut new_paths = Vec::with_capacity(paths_config.len());
let mut builder = GlobSetBuilder::new();
for p in paths_config.iter() {
Expand Down Expand Up @@ -175,16 +183,28 @@ impl Bpf {
Ok(())
}

/// Attaches all BPF programs. If any attach fails, all previously
/// attached programs are automatically detached via drop.
fn attach_progs(&mut self) -> anyhow::Result<()> {
for (_, prog) in self.obj.programs_mut() {
match prog {
Program::Lsm(prog) => prog.attach()?,
self.links = self
.obj
.programs_mut()
.map(|(_, prog)| match prog {
Program::Lsm(prog) => {
let link_id = prog.attach()?;
prog.take_link(link_id)
}
u => unimplemented!("{u:?}"),
};
}
})
.collect::<Result<_, _>>()?;
Ok(())
}

/// Detaches all BPF programs by dropping owned links.
fn detach_progs(&mut self) {
self.links.clear();
}

// Gather events from the ring buffer and print them out.
pub fn start(
mut self,
Expand All @@ -194,9 +214,6 @@ impl Bpf {
info!("Starting BPF worker...");

tokio::spawn(async move {
self.attach_progs()
.context("Failed to attach ebpf programs")?;

let rb = self.take_ringbuffer()?;
let mut fd = AsyncFd::new(rb)?;

Expand Down
23 changes: 14 additions & 9 deletions tests/server.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from concurrent import futures
from collections import deque
import json
from threading import Event
from threading import Event as ThreadingEvent
from time import sleep

import grpc
Expand All @@ -24,7 +24,7 @@ def __init__(self):
sfa_iservice_pb2_grpc.FileActivityService.__init__(self)
self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
self.queue = deque()
self.running = Event()
self.running = ThreadingEvent()
self.executor = futures.ThreadPoolExecutor(max_workers=2)

def Communicate(self, request_iterator, context):
Expand Down Expand Up @@ -81,8 +81,8 @@ def is_running(self):
"""
return self.running.is_set()

def _wait_events(self, events: list[Event], strict: bool):
while self.is_running():
def _wait_events(self, events: list['Event'], strict: bool, cancel: ThreadingEvent):
while self.is_running() and not cancel.is_set():
msg = self.get_next()
if msg is None:
sleep(0.5)
Expand All @@ -99,19 +99,24 @@ def _wait_events(self, events: list[Event], strict: bool):
elif strict:
raise ValueError(json.dumps(diff, indent=4))

def wait_events(self, events: list[Event], strict: bool = True):
def wait_events(self, events: list['Event'], strict: bool = True):
"""
Continuously checks the server for incoming events until the
specified events are found.

Args:
server: The server instance to retrieve events from.
event (list[Event]): The events to search for.
events (list['Event']): The events to search for.
strict (bool): Fail if an unexpected event is detected.

Raises:
TimeoutError: If the required events are not found in 5 seconds.
"""
print('Waiting for events:', *events, sep='\n')
fs = self.executor.submit(self._wait_events, events, strict)
fs.result(timeout=5)
cancel = ThreadingEvent()
fs = self.executor.submit(self._wait_events, events, strict, cancel)
try:
fs.result(timeout=5)
except TimeoutError:
raise
finally:
cancel.set()
72 changes: 69 additions & 3 deletions tests/test_config_hotreload.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def assert_endpoint(endpoint, status_code=200):
assert resp.status_code == status_code


def reload_config(fact, config, file, delay=0.1):
def reload_config(fact, config, file, delay=0.5):
with open(file, 'w') as f:
yaml.dump(config, f)
fact.kill('SIGHUP')
Expand Down Expand Up @@ -137,7 +137,7 @@ def test_paths(fact, fact_config, monitored_dir, ignored_dir, server):

config, config_file = fact_config
config['paths'] = [f'{ignored_dir}/**/*']
reload_config(fact, config, config_file, delay=0.5)
reload_config(fact, config, config_file)

# At this point, the event in the ignored directory should show up
# and the event on the monitored directory should be ignored
Expand All @@ -154,6 +154,72 @@ def test_paths(fact, fact_config, monitored_dir, ignored_dir, server):
server.wait_events([e])


def test_no_paths_then_add(fact, fact_config, monitored_dir, server):
"""
Start with no paths configured, verify no events are produced,
then add paths via hot-reload and verify events appear.
"""
p = Process.from_proc()

# Remove all paths
config, config_file = fact_config
config['paths'] = []
reload_config(fact, config, config_file)

# Write to a file — should NOT produce events
fut = os.path.join(monitored_dir, 'test2.txt')
with open(fut, 'w') as f:
f.write('This should be ignored')
sleep(1)

e = Event(process=p, event_type=EventType.OPEN,
file=fut, host_path=fut)

with pytest.raises(TimeoutError):
server.wait_events([e])

# Add paths back
config['paths'] = [f'{monitored_dir}/**/*']
reload_config(fact, config, config_file)

# Write to a file — should produce events
with open(fut, 'w') as f:
f.write('This should be detected')

server.wait_events([e])


def test_paths_then_remove(fact, fact_config, monitored_dir, server):
"""
Start with paths configured, verify events are produced,
then remove all paths via hot-reload and verify events stop.
"""
p = Process.from_proc()

# Write to a file — should produce events
fut = os.path.join(monitored_dir, 'test2.txt')
with open(fut, 'w') as f:
f.write('This is a test')

e = Event(process=p, event_type=EventType.CREATION,
file=fut, host_path='')

server.wait_events([e])

# Remove all paths
config, config_file = fact_config
config['paths'] = []
reload_config(fact, config, config_file)

# Write to a file — should NOT produce events
with open(fut, 'w') as f:
f.write('This should be ignored')
sleep(1)

with pytest.raises(TimeoutError):
server.wait_events([e])


def test_paths_addition(fact, fact_config, monitored_dir, ignored_dir, server):
p = Process.from_proc()

Expand All @@ -174,7 +240,7 @@ def test_paths_addition(fact, fact_config, monitored_dir, ignored_dir, server):

config, config_file = fact_config
config['paths'] = [f'{monitored_dir}/**/*', f'{ignored_dir}/**/*']
reload_config(fact, config, config_file, delay=0.5)
reload_config(fact, config, config_file)

# At this point, the event in the ignored directory should show up
# alongside the regular event
Expand Down
Loading