diff --git a/src/apm_cli/commands/_helpers.py b/src/apm_cli/commands/_helpers.py index 9179e48d2..9f21deef9 100644 --- a/src/apm_cli/commands/_helpers.py +++ b/src/apm_cli/commands/_helpers.py @@ -6,7 +6,6 @@ import builtins import os import sys -import tempfile from pathlib import Path import click @@ -21,6 +20,7 @@ APM_YML_FILENAME, GITIGNORE_FILENAME, ) +from ..utils.atomic_io import atomic_write_text as _atomic_write from ..utils.console import _rich_echo, _rich_info, _rich_warning from ..update_policy import get_update_hint_message, is_self_update_enabled from ..version import get_build_sha, get_version @@ -298,21 +298,6 @@ def _check_and_notify_updates(): pass -def _atomic_write(path: Path, data: str) -> None: - """Atomically write text data to path (best-effort).""" - fd, tmp_name = tempfile.mkstemp(prefix="apm-write-", dir=str(path.parent)) - try: - with os.fdopen(fd, "w", encoding="utf-8") as fh: - fh.write(data) - os.replace(tmp_name, path) - except Exception: - try: - os.unlink(tmp_name) - except OSError: - pass - raise - - def _update_gitignore_for_apm_modules(logger=None): """Add apm_modules/ to .gitignore if not already present.""" gitignore_path = Path(GITIGNORE_FILENAME) diff --git a/src/apm_cli/commands/compile/cli.py b/src/apm_cli/commands/compile/cli.py index e1e01ad1b..25a8d2a37 100644 --- a/src/apm_cli/commands/compile/cli.py +++ b/src/apm_cli/commands/compile/cli.py @@ -16,7 +16,6 @@ _rich_panel, ) from .._helpers import ( - _atomic_write, _check_orphaned_packages, _get_console, _rich_blank_line, @@ -577,26 +576,6 @@ def compile( output_path=output_path, ) - # Compute deterministic Build ID (12-char SHA256) over content with placeholder removed - import hashlib - - from ...compilation.constants import BUILD_ID_PLACEHOLDER - - lines = final_content.splitlines() - # Identify placeholder line index - try: - idx = lines.index(BUILD_ID_PLACEHOLDER) - except ValueError: - idx = None - hash_input_lines = [l for i, l in enumerate(lines) if i != idx] - hash_bytes = "\n".join(hash_input_lines).encode("utf-8") - build_id = hashlib.sha256(hash_bytes).hexdigest()[:12] - if idx is not None: - lines[idx] = f"" - final_content = "\n".join(lines) + ( - "\n" if final_content.endswith("\n") else "" - ) - if not dry_run: # Only rewrite when content materially changes (creation, update, missing constitution case) if c_status in ("CREATED", "UPDATED", "MISSING"): @@ -616,7 +595,9 @@ def compile( f"-- run 'apm audit --file {output_path}' to inspect" ) try: - _atomic_write(output_path, final_content) + from ...compilation.output_writer import CompiledOutputWriter + + CompiledOutputWriter().write(output_path, final_content) except OSError as e: logger.error(f"Failed to write final AGENTS.md: {e}") sys.exit(1) diff --git a/src/apm_cli/compilation/agents_compiler.py b/src/apm_cli/compilation/agents_compiler.py index e48deb63c..0d33d9c45 100644 --- a/src/apm_cli/compilation/agents_compiler.py +++ b/src/apm_cli/compilation/agents_compiler.py @@ -541,11 +541,10 @@ def _compile_claude_md(self, config: CompilationConfig, primitives: PrimitiveCol files_written = 0 critical_security_found = False from ..security.gate import WARN_POLICY, SecurityGate + from .output_writer import CompiledOutputWriter + writer = CompiledOutputWriter() for claude_path, content in claude_result.content_map.items(): try: - # Create directory if needed - claude_path.parent.mkdir(parents=True, exist_ok=True) - # Handle constitution injection if enabled final_content = content if config.with_constitution: @@ -559,7 +558,7 @@ def _compile_claude_md(self, config: CompilationConfig, primitives: PrimitiveCol ) except Exception as exc: _logger.debug("Constitution injection failed for %s: %s", claude_path, exc) - + # Defense-in-depth: scan compiled output before writing verdict = SecurityGate.scan_text( final_content, str(claude_path), policy=WARN_POLICY @@ -573,7 +572,7 @@ def _compile_claude_md(self, config: CompilationConfig, primitives: PrimitiveCol f"— run 'apm audit --file {claude_path}' to inspect" ) - claude_path.write_text(final_content, encoding='utf-8') + writer.write(claude_path, final_content) files_written += 1 except OSError as e: all_errors.append(f"Failed to write {claude_path}: {str(e)}") @@ -665,10 +664,11 @@ def _compile_gemini_md(self, config: CompilationConfig, primitives: PrimitiveCol ) files_written = 0 + from .output_writer import CompiledOutputWriter + writer = CompiledOutputWriter() for gemini_path, content in gemini_result.content_map.items(): try: - gemini_path.parent.mkdir(parents=True, exist_ok=True) - gemini_path.write_text(content, encoding="utf-8") + writer.write(gemini_path, content) files_written += 1 except OSError as e: all_errors.append(f"Failed to write {gemini_path}: {str(e)}") @@ -835,14 +835,14 @@ def _generate_template_data(self, primitives: PrimitiveCollection, config: Compi def _write_output_file(self, output_path: str, content: str) -> None: """Write the generated content to the output file. - + Args: output_path (str): Path to write the output. content (str): Content to write. """ + from .output_writer import CompiledOutputWriter try: - with open(output_path, 'w', encoding='utf-8') as f: - f.write(content) + CompiledOutputWriter().write(Path(output_path), content) except OSError as e: self.errors.append(f"Failed to write output file {output_path}: {str(e)}") @@ -892,13 +892,9 @@ def _write_distributed_file(self, agents_path: Path, content: str, config: Compi except Exception as exc: _logger.debug("Constitution injection failed for %s: %s", agents_path, exc) - # Create directory if it doesn't exist - agents_path.parent.mkdir(parents=True, exist_ok=True) - - # Write the file - with open(agents_path, 'w', encoding='utf-8') as f: - f.write(final_content) - + from .output_writer import CompiledOutputWriter + CompiledOutputWriter().write(agents_path, final_content) + except OSError as e: raise OSError(f"Failed to write distributed AGENTS.md file {agents_path}: {str(e)}") diff --git a/src/apm_cli/compilation/build_id.py b/src/apm_cli/compilation/build_id.py new file mode 100644 index 000000000..bceb1ba79 --- /dev/null +++ b/src/apm_cli/compilation/build_id.py @@ -0,0 +1,41 @@ +"""Build ID stabilization for compiled outputs. + +Formatters insert ``BUILD_ID_PLACEHOLDER`` (a sentinel marker line) into +their generated content. Before persisting that content to disk, callers +must replace the placeholder with a deterministic 12-char SHA256 hash so +the file stays byte-stable across rebuilds with identical input. + +The hash is computed over the content with the placeholder line *removed* +so the hash is not self-referential -- it would otherwise change every +time the placeholder string itself changed. + +This module is the single source of truth for that replacement; all +compiled-output write sites must route through ``CompiledOutputWriter`` +(see ``output_writer.py``) which calls this helper. +""" + +import hashlib + +from .constants import BUILD_ID_PLACEHOLDER + + +def stabilize_build_id(content: str) -> str: + """Replace BUILD_ID_PLACEHOLDER with a deterministic 12-char SHA256 hash. + + Idempotent: returns ``content`` unchanged if no placeholder is present. + Preserves a trailing newline if the input had one. + """ + lines = content.splitlines() + try: + idx = lines.index(BUILD_ID_PLACEHOLDER) + except ValueError: + return content + + hash_input_lines = [line for i, line in enumerate(lines) if i != idx] + build_id = hashlib.sha256( + "\n".join(hash_input_lines).encode("utf-8") + ).hexdigest()[:12] + + lines[idx] = f"" + trailing_nl = "\n" if content.endswith("\n") else "" + return "\n".join(lines) + trailing_nl diff --git a/src/apm_cli/compilation/output_writer.py b/src/apm_cli/compilation/output_writer.py new file mode 100644 index 000000000..65693789f --- /dev/null +++ b/src/apm_cli/compilation/output_writer.py @@ -0,0 +1,49 @@ +"""Single chokepoint for persisting compiled outputs. + +All compilation targets (single-file AGENTS.md, distributed AGENTS.md, +CLAUDE.md, GEMINI.md, future targets) MUST route their writes through +``CompiledOutputWriter.write``. The writer guarantees: + +1. ``BUILD_ID_PLACEHOLDER`` is replaced with a deterministic hash + (see ``build_id.stabilize_build_id``). +2. A defensive assertion fails loudly if the placeholder survives + stabilization, so a future code path that bypasses or breaks + stabilization cannot silently emit ``__BUILD_ID__`` to disk. +3. Parent directories are created. +4. The write is atomic (replace-on-rename), so a crash mid-write cannot + corrupt a pre-existing target file. + +Direct ``Path.write_text`` / ``open(...).write`` on compiled output is a +contract violation -- adding new write sites without using this writer +will, by design, miss every cross-cutting concern this writer owns. + +Error contract: + - ``OSError`` from filesystem operations (mkdir, rename) propagates + to callers, which typically log + continue. + - ``RuntimeError`` is raised when the stabilization assertion fails + (i.e. ``BUILD_ID_PLACEHOLDER`` survived ``stabilize_build_id``). + This is a programmer error -- never expected in production -- and + is intentionally NOT caught by callers' ``except OSError`` blocks + so it surfaces as a loud traceback rather than a silent skip. +""" + +from pathlib import Path + +from ..utils.atomic_io import atomic_write_text +from .build_id import stabilize_build_id +from .constants import BUILD_ID_PLACEHOLDER + + +class CompiledOutputWriter: + """Persist compiled output with cross-cutting concerns applied.""" + + def write(self, path: Path, content: str) -> None: + final = stabilize_build_id(content) + if BUILD_ID_PLACEHOLDER in final: + raise RuntimeError( + "build_id stabilization bypassed: " + f"{BUILD_ID_PLACEHOLDER!r} still present after stabilization " + f"(target={path})" + ) + path.parent.mkdir(parents=True, exist_ok=True) + atomic_write_text(path, final) diff --git a/src/apm_cli/utils/atomic_io.py b/src/apm_cli/utils/atomic_io.py new file mode 100644 index 000000000..8633754e0 --- /dev/null +++ b/src/apm_cli/utils/atomic_io.py @@ -0,0 +1,39 @@ +"""Atomic file-write primitive for APM. + +Writes go to a temp file in the same directory as the target, then are +renamed via :func:`os.replace`. A crash mid-write cannot leave a half- +written destination, and on POSIX the rename is atomic with respect to +concurrent readers. + +This is the single canonical implementation; both +``apm_cli.commands._helpers._atomic_write`` (kept as an alias for +backward compatibility with existing tests) and +``apm_cli.compilation.output_writer`` route through here. +""" + +import os +import tempfile +from pathlib import Path + + +def atomic_write_text(path: Path, data: str) -> None: + """Atomically write ``data`` (UTF-8) to ``path``. + + The temp file is created in ``path.parent`` so the eventual + ``os.replace`` is a same-filesystem rename. Caller is responsible + for ensuring the parent directory exists. + + On any failure, the temp file is removed and the original target + file (if any) remains untouched. + """ + fd, tmp_name = tempfile.mkstemp(prefix="apm-atomic-", dir=str(path.parent)) + try: + with os.fdopen(fd, "w", encoding="utf-8") as fh: + fh.write(data) + os.replace(tmp_name, path) + except Exception: + try: + os.unlink(tmp_name) + except OSError: + pass + raise diff --git a/tests/unit/compilation/test_agents_compiler_coverage.py b/tests/unit/compilation/test_agents_compiler_coverage.py index 5af2295c1..32656df36 100644 --- a/tests/unit/compilation/test_agents_compiler_coverage.py +++ b/tests/unit/compilation/test_agents_compiler_coverage.py @@ -319,11 +319,20 @@ def tearDown(self): def test_write_output_file_oserror_adds_error(self): """_write_output_file adds error message when OS error occurs.""" + from unittest.mock import patch + compiler = AgentsCompiler(self.tmp) - bad_path = str(Path(self.tmp) / "nodir" / "deep" / "AGENTS.md") + target = str(Path(self.tmp) / "AGENTS.md") + + # Force the atomic-write rename to fail so we exercise the OSError + # path. Parent directory is now auto-created by CompiledOutputWriter, + # so we cannot rely on a missing-parent failure mode. + with patch( + "apm_cli.utils.atomic_io.os.replace", + side_effect=OSError("simulated rename failure"), + ): + compiler._write_output_file(target, "content") - # Don't create parent directory so the write fails. - compiler._write_output_file(bad_path, "content") self.assertEqual(len(compiler.errors), 1) self.assertIn("Failed to write", compiler.errors[0]) diff --git a/tests/unit/compilation/test_build_id.py b/tests/unit/compilation/test_build_id.py new file mode 100644 index 000000000..bbeca47cf --- /dev/null +++ b/tests/unit/compilation/test_build_id.py @@ -0,0 +1,110 @@ +"""Unit tests for compilation/build_id.py. + +Tests the stabilize_build_id() helper that replaces the BUILD_ID_PLACEHOLDER +line with a deterministic 12-char SHA256 hash computed over the content with +the placeholder line removed (so the hash is not self-referential). +""" + +import re + +import pytest + +from apm_cli.compilation.build_id import stabilize_build_id +from apm_cli.compilation.constants import BUILD_ID_PLACEHOLDER + + +_HASH_LINE_RE = re.compile(r"^$") + + +def test_replaces_placeholder_with_hash_line(): + content = ( + "# AGENTS.md\n" + f"{BUILD_ID_PLACEHOLDER}\n" + "\n" + ) + + result = stabilize_build_id(content) + + assert BUILD_ID_PLACEHOLDER not in result + hash_line = result.splitlines()[1] + assert _HASH_LINE_RE.match(hash_line), f"unexpected line: {hash_line!r}" + + +def test_returns_unchanged_when_no_placeholder(): + content = "# AGENTS.md\n\nbody\n" + + assert stabilize_build_id(content) == content + + +def test_idempotent_after_one_pass(): + content = ( + "# AGENTS.md\n" + f"{BUILD_ID_PLACEHOLDER}\n" + "body\n" + ) + + once = stabilize_build_id(content) + twice = stabilize_build_id(once) + + assert once == twice + + +def test_deterministic_for_same_input(): + content = ( + "# AGENTS.md\n" + f"{BUILD_ID_PLACEHOLDER}\n" + "body\n" + ) + + assert stabilize_build_id(content) == stabilize_build_id(content) + + +def test_different_content_yields_different_hash(): + content_a = f"# A\n{BUILD_ID_PLACEHOLDER}\nbody-a\n" + content_b = f"# B\n{BUILD_ID_PLACEHOLDER}\nbody-b\n" + + hash_a = stabilize_build_id(content_a).splitlines()[1] + hash_b = stabilize_build_id(content_b).splitlines()[1] + + assert hash_a != hash_b + + +def test_hash_excludes_placeholder_line_itself(): + """Hash must be computed over content with the placeholder line removed, + otherwise the hash is self-referential and cannot remain stable across + formatter versions that change the placeholder string. + + Two contents that differ ONLY in the placeholder position must hash + identically when the placeholder is excluded from the hash input. + """ + content_a = f"# A\n{BUILD_ID_PLACEHOLDER}\nbody\n" + content_b = f"{BUILD_ID_PLACEHOLDER}\n# A\nbody\n" + + hash_a = stabilize_build_id(content_a).splitlines() + hash_b = stabilize_build_id(content_b).splitlines() + + hash_a_value = next(line for line in hash_a if line.startswith("") + + +def test_stabilizes_build_id_before_writing(tmp_path: Path): + target = tmp_path / "AGENTS.md" + content = ( + "# AGENTS.md\n" + f"{BUILD_ID_PLACEHOLDER}\n" + "\n" + ) + + CompiledOutputWriter().write(target, content) + + written = target.read_text(encoding="utf-8") + assert BUILD_ID_PLACEHOLDER not in written + assert _HASH_LINE_RE.search(written), written + + +def test_creates_parent_directories(tmp_path: Path): + target = tmp_path / "deep" / "nested" / "AGENTS.md" + content = f"# A\n{BUILD_ID_PLACEHOLDER}\n" + + CompiledOutputWriter().write(target, content) + + assert target.exists() + + +def test_writes_utf8_encoded(tmp_path: Path): + target = tmp_path / "AGENTS.md" + title = "\u4e2d\u6587\u6a19\u984c" + content = f"# {title}\n{BUILD_ID_PLACEHOLDER}\nbody\n" + + CompiledOutputWriter().write(target, content) + + assert target.read_text(encoding="utf-8").startswith(f"# {title}") + + +def test_passes_through_content_without_placeholder(tmp_path: Path): + target = tmp_path / "AGENTS.md" + content = "# A\n\nbody\n" + + CompiledOutputWriter().write(target, content) + + assert target.read_text(encoding="utf-8") == content + + +def test_raises_when_placeholder_unresolvable(tmp_path: Path, monkeypatch): + """Defense-in-depth: if a future code path mutates content so the + placeholder survives stabilization, the writer must refuse to persist. + """ + target = tmp_path / "AGENTS.md" + # Inject a sentinel that simulates a stabilization failure: stub + # ``stabilize_build_id`` to a no-op so the placeholder survives. + import apm_cli.compilation.output_writer as ow + + monkeypatch.setattr(ow, "stabilize_build_id", lambda c: c) + + with pytest.raises(RuntimeError, match="build_id stabilization bypassed"): + CompiledOutputWriter().write(target, f"# A\n{BUILD_ID_PLACEHOLDER}\n") + + assert not target.exists() + + +def test_atomic_write_no_partial_file_on_failure(tmp_path: Path, monkeypatch): + """Failed writes must not leave a half-written file at the target path.""" + target = tmp_path / "AGENTS.md" + target.write_text("PRE-EXISTING\n", encoding="utf-8") + + import apm_cli.utils.atomic_io as atomic_io + + def boom(*args, **kwargs): + raise OSError("disk full") + + monkeypatch.setattr(atomic_io.os, "replace", boom) + + with pytest.raises(OSError): + CompiledOutputWriter().write(target, f"# A\n{BUILD_ID_PLACEHOLDER}\n") + + # Pre-existing file must remain untouched + assert target.read_text(encoding="utf-8") == "PRE-EXISTING\n" diff --git a/tests/unit/test_command_helpers.py b/tests/unit/test_command_helpers.py index 7f26705ca..49fba42d6 100644 --- a/tests/unit/test_command_helpers.py +++ b/tests/unit/test_command_helpers.py @@ -64,8 +64,9 @@ def test_cleans_up_temp_file_on_write_error(self, tmp_path): with patch("os.replace", side_effect=OSError("replace failed")): with pytest.raises(OSError, match="replace failed"): _atomic_write(target, "data") - # No stale temp file should remain in tmp_path - leftover = list(tmp_path.glob("apm-write-*")) + # No stale temp file should remain in tmp_path. Prefix-agnostic so + # the assertion does not silently pass if the temp prefix changes. + leftover = [f for f in tmp_path.iterdir() if f != target] assert leftover == [], f"Temp file not cleaned up: {leftover}"