Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/copilot_usage/docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Monorepo containing Python CLI utilities that share tooling, CI, and common depe
### Event Processing Pipeline

1. **Discovery** — `discover_sessions()` scans `~/.copilot/session-state/*/events.jsonl`, returns paths sorted by modification time
2. **Parsing** — `_parse_events_from_offset()` reads each line as JSON in binary mode, creates `SessionEvent` objects via Pydantic validation. The production pipeline accesses this through `get_cached_events()`, which caches results and supports incremental byte-offset parsing for append-only file growth. The public `parse_events()` delegates to the same implementation with `include_partial_tail=True` for one-shot full-file reads. Malformed lines are skipped with a warning.
3. **Typed dispatch** — callers use the narrowly-typed `as_*()` accessors (`as_session_start()`, `as_assistant_message()`, etc.) on `SessionEvent` to get a validated payload for each known event type. Unknown event types still validate as `SessionEvent`, but normal processing ignores them unless a caller explicitly validates `data` with `GenericEventData`.
4. **Summarization** — `build_session_summary()` orchestrates four focused helpers:
- `_first_pass()`: single pass over events — extracts session metadata from `session.start`, counts raw events (model calls, user messages, output tokens), collects all shutdown data
Expand Down
61 changes: 22 additions & 39 deletions src/copilot_usage/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,10 @@ def _insert_events_entry(


def _parse_events_from_offset(
events_path: Path, offset: int
events_path: Path,
offset: int,
*,
include_partial_tail: bool = False,
) -> tuple[list[SessionEvent], int]:
"""Parse events from *events_path* starting at byte *offset*.

Expand All @@ -223,6 +226,12 @@ def _parse_events_from_offset(
parsing — the returned *safe_end* does not advance past them so
the caller can retry on the next refresh.

When *include_partial_tail* is ``True`` (used by :func:`parse_events`
for one-shot full parsing), this special case is disabled for a final
non-newline-terminated line that fails JSON decoding: the line is
warned about and skipped, and ``safe_end`` advances past it. Valid
final lines are parsed and returned regardless of this flag.

Returns:
``(new_events, safe_end)`` where *safe_end* is the byte
position after the last fully consumed line. Callers should
Expand Down Expand Up @@ -250,7 +259,7 @@ def _parse_events_from_offset(
except ValidationError as exc:
errors = exc.errors(include_url=False)
if errors and errors[0].get("type") == "json_invalid":
if not raw_line.endswith(b"\n"):
if not raw_line.endswith(b"\n") and not include_partial_tail:
break
logger.warning(
"{}:offset {} — malformed JSON, skipping",
Expand Down Expand Up @@ -644,52 +653,26 @@ def discover_sessions(base_path: Path | None = None) -> list[Path]:
def parse_events(events_path: Path) -> list[SessionEvent]:
    """Parse an ``events.jsonl`` file into a list of :class:`SessionEvent`.

    Delegates to :func:`_parse_events_from_offset` (the same binary-mode
    implementation used by the production cache layer) with
    ``include_partial_tail=True`` to match full-file parsing behavior,
    including the more permissive handling of a malformed partial tail at
    EOF.

    Lines that fail JSON decoding or Pydantic validation are skipped with
    a warning.

    If a ``UnicodeDecodeError`` is raised during parsing (e.g. from a
    Pydantic validator), parsing stops early and the events parsed so
    far are returned as a partial session.

    Raises:
        OSError: If the file cannot be opened or read (e.g., deleted
            between discovery and parsing, or I/O error while streaming).
            ``UnicodeDecodeError`` is caught internally; callers only
            need to handle ``OSError``.
    """
    # Offset 0 == start of file. The returned safe_end is only meaningful
    # for incremental callers (the cache layer), so it is discarded here.
    events, _ = _parse_events_from_offset(events_path, 0, include_partial_tail=True)
    return events


Expand Down
125 changes: 90 additions & 35 deletions tests/copilot_usage/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ def _clear_session_cache() -> None:
def _write_events(path: Path, *lines: str) -> Path:
"""Write event lines to an events.jsonl file and return the path."""
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text("\n".join(lines) + "\n", encoding="utf-8")
path.write_text("\n".join(lines) + "\n", encoding="utf-8", newline="\n")
return path


Expand Down Expand Up @@ -1500,10 +1500,10 @@ def test_validation_error_non_json_invalid_logs_warning(
# error_count() must be passed as positional arg after format string
assert warning_spy.call_args.args[3] == expected_error_count

def test_validation_error_warning_includes_file_and_lineno(
def test_validation_error_warning_includes_file_and_offset(
self, tmp_path: Path
) -> None:
"""Validation-error warning includes the file path and line number."""
"""Validation-error warning includes the file path and byte offset."""
bad_event = json.dumps({"no_type_field": True})
p = tmp_path / "s" / "events.jsonl"
_write_events(p, _START_EVENT, bad_event)
Expand All @@ -1512,10 +1512,12 @@ def test_validation_error_warning_includes_file_and_lineno(
warning_spy.assert_called_once()
args = warning_spy.call_args.args
assert args[1] == p # file path
assert args[2] == 2 # line number (bad event is the second line)
# byte offset of the second line (after _START_EVENT + newline separator)
expected_offset = len(_START_EVENT.encode("utf-8")) + 1
assert args[2] == expected_offset
Comment thread
microsasa marked this conversation as resolved.

def test_multiple_validation_errors_each_warned(self, tmp_path: Path) -> None:
"""Two bad lines emit two separate warnings with correct line numbers."""
"""Two bad lines emit two separate warnings with correct byte offsets."""
bad1 = json.dumps({"no_type_field": True})
bad2 = json.dumps({"also_invalid": 42})
p = tmp_path / "s" / "events.jsonl"
Expand All @@ -1524,15 +1526,20 @@ def test_multiple_validation_errors_each_warned(self, tmp_path: Path) -> None:
events = parse_events(p)
assert len(events) == 1 # only the start event survives
assert warning_spy.call_count == 2
# Each warning should reference its own line number
line_numbers = [call.args[2] for call in warning_spy.call_args_list]
assert line_numbers == [2, 3]
# Each warning should reference its own byte offset
offsets = [call.args[2] for call in warning_spy.call_args_list]
offset1 = len(_START_EVENT.encode("utf-8")) + 1
offset2 = offset1 + len(bad1.encode("utf-8")) + 1
assert offsets == [offset1, offset2]

def test_unicode_decode_error_returns_partial(self, tmp_path: Path) -> None:
"""events.jsonl with invalid UTF-8 bytes returns what was parsed so far.
def test_invalid_utf8_line_skipped_valid_event_returned(
self, tmp_path: Path
) -> None:
"""Invalid UTF-8 bytes on a line are skipped; prior valid events returned.

Due to buffered I/O, the UnicodeDecodeError may fire before any
lines are yielded, so the result is typically an empty list.
In binary mode, invalid UTF-8 bytes cause a JSON parse failure
rather than a ``UnicodeDecodeError``, so the affected line is
skipped and valid lines are still returned.
"""
p = tmp_path / "s" / "events.jsonl"
p.parent.mkdir(parents=True, exist_ok=True)
Expand All @@ -1541,29 +1548,27 @@ def test_unicode_decode_error_returns_partial(self, tmp_path: Path) -> None:
bad_line = b"\xff\xfe invalid utf-8 bytes\n"
p.write_bytes(valid_line + bad_line)
events = parse_events(p)
# Should return what was parsed before the error and not raise.
# The bad line is skipped; the valid start event always survives.
assert isinstance(events, list)
# 0 or 1 events may be returned depending on buffered I/O behavior.
assert len(events) <= 1
if events:
# If any event is returned, it should be the initial session.start.
assert len(events) == 1
assert events[0].type == "session.start"
assert len(events) == 1
assert events[0].type == "session.start"

def test_entirely_invalid_utf8_returns_empty_list(self, tmp_path: Path) -> None:
    """events.jsonl that is entirely invalid UTF-8 bytes returns empty list."""
    p = tmp_path / "s" / "events.jsonl"
    p.parent.mkdir(parents=True, exist_ok=True)
    # In binary mode no UnicodeDecodeError is raised at the I/O layer;
    # the undecodable line simply fails JSON parsing and is skipped,
    # leaving nothing to return.
    p.write_bytes(b"\xff\xfe\x80\x81\x82")
    events = parse_events(p)
    assert events == []

def test_unicode_decode_error_returns_partial_results(self, tmp_path: Path) -> None:
"""Valid events before an invalid UTF-8 sequence are returned.
def test_embedded_invalid_utf8_skips_malformed_line(self, tmp_path: Path) -> None:
"""Embedded invalid UTF-8 bytes produce a malformed-JSON skip, not a crash.

Python's TextIOWrapper reads in buffer-sized chunks, so the valid
content must exceed one buffer to guarantee the first lines are
yielded before the decode error fires on the next chunk.
Because ``parse_events`` delegates to ``_parse_events_from_offset``
(binary mode), raw invalid UTF-8 bytes don't raise
``UnicodeDecodeError`` at the file-I/O layer. Instead the
affected line fails JSON parsing and is skipped; lines on both
sides of the bad bytes still parse successfully.
"""
p = tmp_path / "s" / "events.jsonl"
p.parent.mkdir(parents=True, exist_ok=True)
Expand All @@ -1573,23 +1578,23 @@ def test_unicode_decode_error_returns_partial_results(self, tmp_path: Path) -> N
# First block: repeat valid lines enough to exceed the default read buffer.
first_repeat = (io.DEFAULT_BUFFER_SIZE // len(valid_line)) + 2
first_block = valid_line * first_repeat
# Second block: additional valid lines that should never be returned.
# Second block: additional valid lines after the bad bytes.
second_repeat = 5
second_block = valid_line * second_repeat
total_valid_lines = first_repeat + second_repeat
# Insert invalid UTF-8 bytes between the two valid blocks so the decode
# error occurs in the middle of the file, after some events were yielded.
# Insert invalid UTF-8 bytes between the two valid blocks.
# Because ``first_block`` ends with a newline, these bytes
# prefix the next line, corrupting the first line of
# ``second_block`` so it becomes malformed JSON and is skipped.
invalid_bytes = b"\xff\xfe"
p.write_bytes(first_block + invalid_bytes + second_block)
result = parse_events(p)
# Partial parse: at least the first event must survive.
assert isinstance(result, list)
assert len(result) >= 1
assert result[0].type == EventType.SESSION_START
# Not everything was returned (error cut parsing short in the middle).
assert len(result) < total_valid_lines
# All returned events should be from the first valid block.
assert len(result) <= first_repeat
# Binary mode recovers: all valid lines parse, only the one
# line containing the invalid bytes is skipped.
total_valid_lines = first_repeat + second_repeat
assert len(result) == total_valid_lines - 1


class TestParseEventsModelValidateJson:
Expand Down Expand Up @@ -9765,6 +9770,56 @@ def _boom(
assert safe_end == len(line1.encode("utf-8"))


class TestParseEventsPartialTailParity:
    """Regression: parse_events and get_cached_events agree on partial-tail files."""

    def test_no_trailing_newline_same_events(self, tmp_path: Path) -> None:
        """File whose last line is valid JSON without trailing newline.

        Both ``parse_events`` (one-shot) and ``get_cached_events`` (cache
        layer) must return the same set of events, including the
        partial-tail line.
        """
        p = tmp_path / "sess" / "events.jsonl"
        p.parent.mkdir(parents=True, exist_ok=True)
        line1 = _make_event_line("session.start", "s1") + "\n"
        line2 = _make_event_line("assistant.message", "m1")  # no trailing newline
        p.write_bytes(line1.encode("utf-8") + line2.encode("utf-8"))

        one_shot = parse_events(p)
        cached = list(get_cached_events(p))

        assert len(one_shot) == 2
        assert one_shot[0].type == "session.start"
        assert one_shot[1].type == "assistant.message"
        assert len(cached) == len(one_shot)
        assert [e.type for e in cached] == [e.type for e in one_shot]

    def test_malformed_partial_tail_warned_and_skipped(self, tmp_path: Path) -> None:
        """Malformed JSON final line without trailing newline is skipped with warning.

        When ``include_partial_tail=True``, a non-newline-terminated line
        that fails JSON decoding should be warned about and skipped (not
        trigger the early ``break``). ``safe_end`` must advance past it.
        """
        p = tmp_path / "sess" / "events.jsonl"
        p.parent.mkdir(parents=True, exist_ok=True)
        line1 = _make_event_line("session.start", "s1") + "\n"
        bad_tail = b"NOT-VALID-JSON{{{"  # no trailing newline
        p.write_bytes(line1.encode("utf-8") + bad_tail)

        with patch.object(_parser_module.logger, "warning") as warning_spy:
            events, safe_end = _parse_events_from_offset(
                p, 0, include_partial_tail=True
            )

        assert len(events) == 1
        assert events[0].type == "session.start"
        # safe_end advances over the skipped tail so a later incremental
        # parse does not re-read (and re-warn about) the same bytes.
        assert safe_end == p.stat().st_size
        warning_spy.assert_called_once()
        assert "json" in warning_spy.call_args.args[0].lower()


class TestIncrementalEventsCaching:
"""Tests for incremental cache behaviour in get_cached_events."""

Expand Down
Loading