Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/autoskillit/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
from __future__ import annotations

import dataclasses
import inspect
import logging
import os
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
Expand Down Expand Up @@ -193,6 +195,25 @@ class LinuxTracingConfig:
log_dir: str = "" # empty = platform default (~/.local/share/autoskillit/logs on Linux)
tmpfs_path: str = "/dev/shm" # RAM-backed tmpfs for crash-resilient streaming

def __post_init__(self) -> None:
if self.tmpfs_path != "/dev/shm" or not os.environ.get("PYTEST_CURRENT_TEST"):
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[warning] defense: The guard walks exactly two frames back (frame.f_back.f_back) to find the test caller. If LinuxTracingConfig is constructed via a helper factory, dataclasses.replace, a classmethod, or a one-liner wrapper, the test-file frame will be further than two levels up and the guard will silently not fire. The depth assumption is fragile and not documented in the error message.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Investigated — this is intentional. The two-level frame walk is deliberately scoped to direct test callers only (commit 9e07dc5, comment on lines 200-202 documents this). Factory paths, from_dynaconf, and AutomationConfig default_factory are intentionally excluded: they construct LinuxTracingConfig with production defaults and should not be blocked. The 'fragility' is the feature: indirect paths bypass the guard by design.

return
# Only raise when called directly from test code — not from library machinery
# (e.g. AutomationConfig default_factory, from_dynaconf). We inspect the call
# frame two levels up: __post_init__ → __init__ (generated) → actual caller.
frame = inspect.currentframe()
init_frame = frame.f_back if frame is not None else None
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[warning] arch: The call-stack depth heuristic is fragile. The /tests/ check fires only if the caller is exactly two frames up. Indirect construction paths (fixtures that call helpers, default_factory in AutomationConfig, from_dynaconf) would bypass this guard silently, giving a false sense of safety while still writing to real /dev/shm.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Investigated — this is intentional. The commit message 'fix: scope post_init guard to direct test callers only' (9e07dc5) explicitly defines the scope. Fixtures calling helpers and from_dynaconf bypassing the guard is the desired behavior — only direct test-code instantiation of LinuxTracingConfig(tmpfs_path='/dev/shm') should be blocked. The category: false_positive_intentional_pattern.

caller = init_frame.f_back if init_frame is not None else None
if caller is not None and "/tests/" in (caller.f_code.co_filename or ""):
raise RuntimeError(
"LinuxTracingConfig.tmpfs_path is '/dev/shm' but PYTEST_CURRENT_TEST "
"is set — this test would write to the real shared tmpfs and pollute "
"production state. Override tmpfs_path with a test-local path, e.g.: "
"LinuxTracingConfig(tmpfs_path=str(tmp_path)). "
"Use the isolated_tracing_config fixture for new tests."
)
del frame, init_frame, caller


@dataclass
class McpResponseConfig:
Expand Down
91 changes: 91 additions & 0 deletions src/autoskillit/execution/linux_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import dataclasses
import json
import os
import sys
import tempfile
from collections.abc import AsyncIterator
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
Expand All @@ -29,9 +31,13 @@
import anyio.abc
import psutil

from autoskillit.core import get_logger

if TYPE_CHECKING:
from autoskillit.config import LinuxTracingConfig

logger = get_logger(__name__)

LINUX_TRACING_AVAILABLE = sys.platform == "linux"


Expand All @@ -58,6 +64,59 @@ def read_starttime_ticks(pid: int) -> int | None:
return None


@dataclass(frozen=True)
class TraceEnrollmentRecord:
"""Identity triple written atomically at trace-open time.

(boot_id, pid, starttime_ticks) together form a collision-resistant identity:
- boot_id rejects pre-reboot stale files
- starttime_ticks detects PID recycling
"""

schema_version: int # always 1
pid: int
boot_id: str | None # read_boot_id(); None if /proc unavailable
starttime_ticks: int | None # read_starttime_ticks(pid); None if unavailable
session_id: str # caller-provided; "" if not yet resolved
enrolled_at: str # ISO 8601 UTC
kitchen_id: str # ""
order_id: str # ""


def _write_enrollment_atomic(path: Path, record: TraceEnrollmentRecord) -> None:
"""Write enrollment sidecar atomically using tempfile + os.replace."""
content = json.dumps(dataclasses.asdict(record))
fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
f.write(content)
os.replace(tmp, path)
except Exception:
try:
os.unlink(tmp)
except OSError:
pass
raise


def read_enrollment(path: Path) -> TraceEnrollmentRecord | None:
"""Read and validate an enrollment sidecar. Returns None on any error."""
try:
data = json.loads(path.read_text(encoding="utf-8"))
return TraceEnrollmentRecord(
schema_version=data["schema_version"],
pid=data["pid"],
boot_id=data.get("boot_id"),
starttime_ticks=data.get("starttime_ticks"),
session_id=data.get("session_id", ""),
enrolled_at=data.get("enrolled_at", ""),
kitchen_id=data.get("kitchen_id", ""),
order_id=data.get("order_id", ""),
)
except (OSError, KeyError, TypeError, ValueError, json.JSONDecodeError):
return None


@dataclass(frozen=True)
class ProcSnapshot:
"""Point-in-time snapshot of process state."""
Expand Down Expand Up @@ -200,6 +259,7 @@ class LinuxTracingHandle:
_snapshots: list[ProcSnapshot] = field(default_factory=list)
_trace_path: Path | None = field(default=None)
_trace_file: IO[str] | None = field(default=None)
_enrollment_path: Path | None = field(default=None)

def stop(self) -> list[ProcSnapshot]:
"""Stop tracing, flush and close the trace file, return accumulated snapshots."""
Expand All @@ -212,13 +272,25 @@ def stop(self) -> list[ProcSnapshot]:
except OSError:
pass
self._trace_file = None
if self._trace_path is not None:
# Intentional: stop() cleans up its own trace file. Crash-recovery only reads
# files left behind by processes that never called stop() — so this is correct.
self._trace_path.unlink(missing_ok=True)
self._trace_path = None
if self._enrollment_path is not None:
self._enrollment_path.unlink(missing_ok=True)
self._enrollment_path = None
return list(self._snapshots)


def start_linux_tracing(
pid: int,
config: LinuxTracingConfig,
tg: anyio.abc.TaskGroup | None,
*,
session_id: str = "",
kitchen_id: str = "",
order_id: str = "",
) -> LinuxTracingHandle | None:
"""Start Linux tracing if all gates pass. Returns handle or None."""
if not LINUX_TRACING_AVAILABLE or not config.enabled:
Expand All @@ -240,6 +312,25 @@ def start_linux_tracing(
handle._trace_path = None
handle._trace_file = None

# Write enrollment sidecar atomically for crash-recovery identity contract
enrollment_path = tmpfs / f"autoskillit_enrollment_{pid}.json"
try:
record = TraceEnrollmentRecord(
schema_version=1,
pid=pid,
boot_id=read_boot_id(),
starttime_ticks=read_starttime_ticks(pid),
session_id=session_id,
enrolled_at=datetime.now(UTC).isoformat(),
kitchen_id=kitchen_id,
order_id=order_id,
)
_write_enrollment_atomic(enrollment_path, record)
handle._enrollment_path = enrollment_path
except OSError as e:
logger.warning("Failed to write enrollment sidecar for pid %d: %s", pid, e)
handle._enrollment_path = None

async def _run_monitor() -> None:
with scope:
async for snap in proc_monitor(pid, config.proc_interval):
Expand Down
50 changes: 39 additions & 11 deletions src/autoskillit/execution/session_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,15 @@
from pathlib import Path
from typing import Any

import psutil

from autoskillit.core import atomic_write, claude_code_log_path, get_logger
from autoskillit.execution.anomaly_detection import detect_anomalies
from autoskillit.execution.linux_tracing import (
read_boot_id,
read_enrollment,
read_starttime_ticks,
)

logger = get_logger(__name__)

Expand Down Expand Up @@ -315,6 +322,7 @@ def recover_crashed_sessions(tmpfs_path: str = "/dev/shm", log_dir: str = "") ->
return 0

count = 0
current_boot_id = read_boot_id()
for trace_file in sorted(tmpfs.glob("autoskillit_trace_*.jsonl")):
# Skip files modified within the last 30 seconds — may be active
try:
Expand All @@ -324,7 +332,35 @@ def recover_crashed_sessions(tmpfs_path: str = "/dev/shm", log_dir: str = "") ->
if age_seconds < 30:
continue

# Read snapshots
# Extract PID from filename: autoskillit_trace_{pid}.jsonl
try:
pid = int(trace_file.stem.split("_")[-1])
except (ValueError, IndexError):
pid = -1

# Gate 1: Enrollment sidecar must exist — no sidecar means alien/test file
enrollment_path = tmpfs / f"autoskillit_enrollment_{pid}.json"
enrollment = read_enrollment(enrollment_path)
if enrollment is None:
logger.debug("Skipping %s: no enrollment sidecar", trace_file.name)
continue

# Gate 2: Boot ID must match current boot — mismatch means pre-reboot stale file
if current_boot_id and enrollment.boot_id and enrollment.boot_id != current_boot_id:
logger.debug("Skipping %s: boot_id mismatch", trace_file.name)
trace_file.unlink(missing_ok=True)
enrollment_path.unlink(missing_ok=True)
continue

# Gate 3: PID liveness + starttime_ticks identity
if psutil.pid_exists(pid):
current_ticks = read_starttime_ticks(pid)
if current_ticks is not None and current_ticks == enrollment.starttime_ticks:
logger.debug("Skipping %s: PID %d still alive", trace_file.name, pid)
continue
# PID recycled — original process is gone, treat as crash

# All gates passed — read snapshots and emit crashed row
snapshots: list[dict[str, object]] = []
try:
for line in trace_file.read_text().splitlines():
Expand All @@ -335,12 +371,6 @@ def recover_crashed_sessions(tmpfs_path: str = "/dev/shm", log_dir: str = "") ->
except OSError:
continue

# Extract PID from filename: autoskillit_trace_{pid}.jsonl
try:
pid = int(trace_file.stem.split("_")[-1])
except (ValueError, IndexError):
pid = 0

# Compute start_ts from file mtime
try:
mtime_ts = datetime.fromtimestamp(trace_file.stat().st_mtime, tz=UTC).isoformat()
Expand All @@ -365,10 +395,8 @@ def recover_crashed_sessions(tmpfs_path: str = "/dev/shm", log_dir: str = "") ->
logger.debug("recover_crashed_sessions: failed to finalize %s", trace_file)
continue

try:
trace_file.unlink()
except OSError:
pass
trace_file.unlink(missing_ok=True)
enrollment_path.unlink(missing_ok=True)

count += 1

Expand Down
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ def tool_ctx(monkeypatch, tmp_path):
ctx = make_context(AutomationConfig(), runner=mock_runner, plugin_dir=str(tmp_path))
ctx.gate = DefaultGateState(enabled=True)
ctx.config.linux_tracing.log_dir = str(tmp_path / "session_logs")
ctx.config.linux_tracing.tmpfs_path = str(tmp_path / "shm")
# Anchor temp_dir to tmp_path so server tools that read from ctx.temp_dir
# (e.g. _apply_triage_gate's staleness cache) write under the per-test
# tmp directory rather than the cwd captured at fixture-init time.
Expand Down
15 changes: 15 additions & 0 deletions tests/execution/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@

from __future__ import annotations

import pathlib
import textwrap

import pytest

from autoskillit.config.settings import LinuxTracingConfig

# Simulates Claude CLI process that writes a result line then hangs.
# Used by test_process_channel_b.py and test_process_monitor.py.
WRITE_RESULT_THEN_HANG_SCRIPT = textwrap.dedent("""\
Expand All @@ -14,3 +19,13 @@
sys.stdout.flush()
time.sleep(3600)
""")


@pytest.fixture
def isolated_tracing_config(tmp_path: pathlib.Path) -> LinuxTracingConfig:
"""Pre-isolated LinuxTracingConfig for tracing tests.
Always writes to a tmp_path subdir, never to the real /dev/shm.
Use this fixture for all new tests that need a LinuxTracingConfig."""
shm = tmp_path / "shm"
shm.mkdir(parents=True, exist_ok=True)
return LinuxTracingConfig(enabled=True, proc_interval=0.05, tmpfs_path=str(shm))
Loading
Loading