diff --git a/src/copilot_usage/docs/architecture.md b/src/copilot_usage/docs/architecture.md index d495dee0..39b27d80 100644 --- a/src/copilot_usage/docs/architecture.md +++ b/src/copilot_usage/docs/architecture.md @@ -48,7 +48,7 @@ Monorepo containing Python CLI utilities that share tooling, CI, and common depe ### Event Processing Pipeline 1. **Discovery** — `discover_sessions()` scans `~/.copilot/session-state/*/events.jsonl`, returns paths sorted by modification time -2. **Parsing** — `parse_events()` reads each line as JSON, creates `SessionEvent` objects via Pydantic validation. Malformed lines are skipped with a warning. +2. **Parsing** — `_parse_events_from_offset()` reads the file in binary mode, parses each line as JSON, and creates `SessionEvent` objects via Pydantic validation. The production pipeline accesses this through `get_cached_events()`, which caches results and supports incremental byte-offset parsing for append-only file growth. The public `parse_events()` delegates to the same implementation with `include_partial_tail=True` for one-shot full-file reads. Malformed lines are skipped with a warning; the one exception is a malformed final line with no trailing newline, which (unless `include_partial_tail=True`) is left unconsumed so it can be retried on the next refresh. 3. **Typed dispatch** — callers use the narrowly-typed `as_*()` accessors (`as_session_start()`, `as_assistant_message()`, etc.) on `SessionEvent` to get a validated payload for each known event type. Unknown event types still validate as `SessionEvent`, but normal processing ignores them unless a caller explicitly validates `data` with `GenericEventData`. 4. 
**Summarization** — `build_session_summary()` orchestrates four focused helpers: - `_first_pass()`: single pass over events — extracts session metadata from `session.start`, counts raw events (model calls, user messages, output tokens), collects all shutdown data diff --git a/src/copilot_usage/parser.py b/src/copilot_usage/parser.py index 54889a32..9f0729a8 100644 --- a/src/copilot_usage/parser.py +++ b/src/copilot_usage/parser.py @@ -210,7 +210,10 @@ def _insert_events_entry( def _parse_events_from_offset( - events_path: Path, offset: int + events_path: Path, + offset: int, + *, + include_partial_tail: bool = False, ) -> tuple[list[SessionEvent], int]: """Parse events from *events_path* starting at byte *offset*. @@ -223,6 +226,12 @@ def _parse_events_from_offset( parsing — the returned *safe_end* does not advance past them so the caller can retry on the next refresh. + When *include_partial_tail* is ``True`` (used by :func:`parse_events` + for one-shot full parsing), this special case is disabled for a final + non-newline-terminated line that fails JSON decoding: the line is + warned about and skipped, and ``safe_end`` advances past it. Valid + final lines are parsed and returned regardless of this flag. + Returns: ``(new_events, safe_end)`` where *safe_end* is the byte position after the last fully consumed line. Callers should @@ -250,7 +259,7 @@ def _parse_events_from_offset( except ValidationError as exc: errors = exc.errors(include_url=False) if errors and errors[0].get("type") == "json_invalid": - if not raw_line.endswith(b"\n"): + if not raw_line.endswith(b"\n") and not include_partial_tail: break logger.warning( "{}:offset {} — malformed JSON, skipping", @@ -644,52 +653,26 @@ def discover_sessions(base_path: Path | None = None) -> list[Path]: def parse_events(events_path: Path) -> list[SessionEvent]: """Parse an ``events.jsonl`` file into a list of :class:`SessionEvent`. 
- Uses ``SessionEvent.model_validate_json`` (Rust-based parser) for each - line, bypassing the intermediate ``dict`` allocation of - ``json.loads`` + ``model_validate``. + Delegates to :func:`_parse_events_from_offset` (the same binary-mode + implementation used by the production cache layer) with + ``include_partial_tail=True`` to match full-file parsing behavior, + including the more permissive handling of a malformed partial tail at + EOF. Lines that fail JSON decoding or Pydantic validation are skipped with a warning. - If a UTF-8 decode error occurs while reading the file, parsing stops - early and the events parsed so far are returned (a partial session). + If a ``UnicodeDecodeError`` is raised during parsing (e.g. from a + Pydantic validator), parsing stops early and the events parsed so + far are returned as a partial session. Raises: OSError: If the file cannot be opened or read (e.g., deleted between discovery and parsing, or I/O error while streaming). - UnicodeDecodeError is caught internally; callers only need to - handle OSError. + ``UnicodeDecodeError`` is caught internally; callers only + need to handle ``OSError``. 
""" - events: list[SessionEvent] = [] - try: - with events_path.open(encoding="utf-8") as fh: - for lineno, line in enumerate(fh, start=1): - if not line or line.isspace(): - continue - try: - events.append(SessionEvent.model_validate_json(line)) - except ValidationError as exc: - errors = exc.errors(include_url=False) - if errors and errors[0].get("type") == "json_invalid": - logger.warning( - "{}:{} — malformed JSON, skipping", - events_path, - lineno, - ) - else: - logger.warning( - "{}:{} — validation error ({}), skipping", - events_path, - lineno, - exc.error_count(), - ) - except UnicodeDecodeError as exc: - logger.warning( - "{} — UTF-8 decode error while reading; returning {} parsed events so far (partial session): {}", - events_path, - len(events), - exc, - ) + events, _ = _parse_events_from_offset(events_path, 0, include_partial_tail=True) return events diff --git a/tests/copilot_usage/test_parser.py b/tests/copilot_usage/test_parser.py index 6fded77d..d8ae268d 100644 --- a/tests/copilot_usage/test_parser.py +++ b/tests/copilot_usage/test_parser.py @@ -317,7 +317,7 @@ def _clear_session_cache() -> None: def _write_events(path: Path, *lines: str) -> Path: """Write event lines to an events.jsonl file and return the path.""" path.parent.mkdir(parents=True, exist_ok=True) - path.write_text("\n".join(lines) + "\n", encoding="utf-8") + path.write_text("\n".join(lines) + "\n", encoding="utf-8", newline="\n") return path @@ -1500,10 +1500,10 @@ def test_validation_error_non_json_invalid_logs_warning( # error_count() must be passed as positional arg after format string assert warning_spy.call_args.args[3] == expected_error_count - def test_validation_error_warning_includes_file_and_lineno( + def test_validation_error_warning_includes_file_and_offset( self, tmp_path: Path ) -> None: - """Validation-error warning includes the file path and line number.""" + """Validation-error warning includes the file path and byte offset.""" bad_event = 
json.dumps({"no_type_field": True}) p = tmp_path / "s" / "events.jsonl" _write_events(p, _START_EVENT, bad_event) @@ -1512,10 +1512,12 @@ def test_validation_error_warning_includes_file_and_lineno( warning_spy.assert_called_once() args = warning_spy.call_args.args assert args[1] == p # file path - assert args[2] == 2 # line number (bad event is the second line) + # byte offset of the second line (after _START_EVENT + newline separator) + expected_offset = len(_START_EVENT.encode("utf-8")) + 1 + assert args[2] == expected_offset def test_multiple_validation_errors_each_warned(self, tmp_path: Path) -> None: - """Two bad lines emit two separate warnings with correct line numbers.""" + """Two bad lines emit two separate warnings with correct byte offsets.""" bad1 = json.dumps({"no_type_field": True}) bad2 = json.dumps({"also_invalid": 42}) p = tmp_path / "s" / "events.jsonl" @@ -1524,15 +1526,20 @@ def test_multiple_validation_errors_each_warned(self, tmp_path: Path) -> None: events = parse_events(p) assert len(events) == 1 # only the start event survives assert warning_spy.call_count == 2 - # Each warning should reference its own line number - line_numbers = [call.args[2] for call in warning_spy.call_args_list] - assert line_numbers == [2, 3] + # Each warning should reference its own byte offset + offsets = [call.args[2] for call in warning_spy.call_args_list] + offset1 = len(_START_EVENT.encode("utf-8")) + 1 + offset2 = offset1 + len(bad1.encode("utf-8")) + 1 + assert offsets == [offset1, offset2] - def test_unicode_decode_error_returns_partial(self, tmp_path: Path) -> None: - """events.jsonl with invalid UTF-8 bytes returns what was parsed so far. + def test_invalid_utf8_line_skipped_valid_event_returned( + self, tmp_path: Path + ) -> None: + """Invalid UTF-8 bytes on a line are skipped; prior valid events returned. - Due to buffered I/O, the UnicodeDecodeError may fire before any - lines are yielded, so the result is typically an empty list. 
+ In binary mode, invalid UTF-8 bytes cause a JSON parse failure + rather than a ``UnicodeDecodeError``, so the affected line is + skipped and valid lines are still returned. """ p = tmp_path / "s" / "events.jsonl" p.parent.mkdir(parents=True, exist_ok=True) @@ -1541,29 +1548,27 @@ def test_unicode_decode_error_returns_partial(self, tmp_path: Path) -> None: bad_line = b"\xff\xfe invalid utf-8 bytes\n" p.write_bytes(valid_line + bad_line) events = parse_events(p) - # Should return what was parsed before the error and not raise. + # The bad line is skipped; the valid start event always survives. assert isinstance(events, list) - # 0 or 1 events may be returned depending on buffered I/O behavior. - assert len(events) <= 1 - if events: - # If any event is returned, it should be the initial session.start. - assert len(events) == 1 - assert events[0].type == "session.start" + assert len(events) == 1 + assert events[0].type == "session.start" - def test_unicode_decode_error_full_file(self, tmp_path: Path) -> None: - """events.jsonl that is entirely invalid UTF-8 returns empty list.""" + def test_entirely_invalid_utf8_returns_empty_list(self, tmp_path: Path) -> None: + """events.jsonl that is entirely invalid UTF-8 bytes returns empty list.""" p = tmp_path / "s" / "events.jsonl" p.parent.mkdir(parents=True, exist_ok=True) p.write_bytes(b"\xff\xfe\x80\x81\x82") events = parse_events(p) assert events == [] - def test_unicode_decode_error_returns_partial_results(self, tmp_path: Path) -> None: - """Valid events before an invalid UTF-8 sequence are returned. + def test_embedded_invalid_utf8_skips_malformed_line(self, tmp_path: Path) -> None: + """Embedded invalid UTF-8 bytes produce a malformed-JSON skip, not a crash. - Python's TextIOWrapper reads in buffer-sized chunks, so the valid - content must exceed one buffer to guarantee the first lines are - yielded before the decode error fires on the next chunk. 
+ Because ``parse_events`` delegates to ``_parse_events_from_offset`` + (binary mode), raw invalid UTF-8 bytes don't raise + ``UnicodeDecodeError`` at the file-I/O layer. Instead the + affected line fails JSON parsing and is skipped; lines on both + sides of the bad bytes still parse successfully. """ p = tmp_path / "s" / "events.jsonl" p.parent.mkdir(parents=True, exist_ok=True) @@ -1573,23 +1578,23 @@ def test_unicode_decode_error_returns_partial_results(self, tmp_path: Path) -> N # First block: repeat valid lines enough to exceed the default read buffer. first_repeat = (io.DEFAULT_BUFFER_SIZE // len(valid_line)) + 2 first_block = valid_line * first_repeat - # Second block: additional valid lines that should never be returned. + # Second block: additional valid lines after the bad bytes. second_repeat = 5 second_block = valid_line * second_repeat - total_valid_lines = first_repeat + second_repeat - # Insert invalid UTF-8 bytes between the two valid blocks so the decode - # error occurs in the middle of the file, after some events were yielded. + # Insert invalid UTF-8 bytes between the two valid blocks. + # Because ``first_block`` ends with a newline, these bytes + # prefix the next line, corrupting the first line of + # ``second_block`` so it becomes malformed JSON and is skipped. invalid_bytes = b"\xff\xfe" p.write_bytes(first_block + invalid_bytes + second_block) result = parse_events(p) - # Partial parse: at least the first event must survive. assert isinstance(result, list) assert len(result) >= 1 assert result[0].type == EventType.SESSION_START - # Not everything was returned (error cut parsing short in the middle). - assert len(result) < total_valid_lines - # All returned events should be from the first valid block. - assert len(result) <= first_repeat + # Binary mode recovers: all valid lines parse, only the one + # line containing the invalid bytes is skipped. 
+ total_valid_lines = first_repeat + second_repeat + assert len(result) == total_valid_lines - 1 class TestParseEventsModelValidateJson: @@ -9765,6 +9770,56 @@ def _boom( assert safe_end == len(line1.encode("utf-8")) +class TestParseEventsPartialTailParity: + """Regression: parse_events and get_cached_events agree on partial-tail files.""" + + def test_no_trailing_newline_same_events(self, tmp_path: Path) -> None: + """File whose last line is valid JSON without trailing newline. + + Both ``parse_events`` (one-shot) and ``get_cached_events`` (cache + layer) must return the same set of events, including the + partial-tail line. + """ + p = tmp_path / "sess" / "events.jsonl" + p.parent.mkdir(parents=True, exist_ok=True) + line1 = _make_event_line("session.start", "s1") + "\n" + line2 = _make_event_line("assistant.message", "m1") # no trailing newline + p.write_bytes(line1.encode("utf-8") + line2.encode("utf-8")) + + one_shot = parse_events(p) + cached = list(get_cached_events(p)) + + assert len(one_shot) == 2 + assert one_shot[0].type == "session.start" + assert one_shot[1].type == "assistant.message" + assert len(cached) == len(one_shot) + assert [e.type for e in cached] == [e.type for e in one_shot] + + def test_malformed_partial_tail_warned_and_skipped(self, tmp_path: Path) -> None: + """Malformed JSON final line without trailing newline is skipped with warning. + + When ``include_partial_tail=True``, a non-newline-terminated line + that fails JSON decoding should be warned about and skipped (not + trigger the early ``break``). ``safe_end`` must advance past it. 
+ """ + p = tmp_path / "sess" / "events.jsonl" + p.parent.mkdir(parents=True, exist_ok=True) + line1 = _make_event_line("session.start", "s1") + "\n" + bad_tail = b"NOT-VALID-JSON{{{" # no trailing newline + p.write_bytes(line1.encode("utf-8") + bad_tail) + + with patch.object(_parser_module.logger, "warning") as warning_spy: + events, safe_end = _parse_events_from_offset( + p, 0, include_partial_tail=True + ) + + assert len(events) == 1 + assert events[0].type == "session.start" + assert safe_end == p.stat().st_size + warning_spy.assert_called_once() + assert "json" in warning_spy.call_args.args[0].lower() + + class TestIncrementalEventsCaching: """Tests for incremental cache behaviour in get_cached_events."""