From 5b718fc23792eae2eaa19ecaa24a133fdc311100 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=A8=B1=E5=85=83=E8=B1=AA?= <146086744+edenfunf@users.noreply.github.com> Date: Mon, 27 Apr 2026 01:12:11 +0800 Subject: [PATCH 1/2] fix(compile): stabilize __BUILD_ID__ in all compiled outputs apm compile wrote literal '' to disk on every target except the legacy --single-agents path. The substitution that constants.py promises lived only in compile/cli.py:506-524 and was gated by `if config.strategy == "distributed" and not single_agents`. PR #468 (which closed #467) added cross-platform sort determinism but never extended substitution to the distributed/claude/gemini paths; its verification ran on --single-agents only. Affected paths producing literal __BUILD_ID__ on main: - apm compile (default distributed): AGENTS.md + every sub-dir - apm compile -t claude: CLAUDE.md - apm compile -t gemini: GEMINI.md - apm compile --watch (single-file): AGENTS.md - compile_agents_md() public API: AGENTS.md Rather than repeat PR #468's sibling-by-sibling pattern (4 files, all needing the same call site), this introduces a single chokepoint: compilation/output_writer.py::CompiledOutputWriter All five compiled-output write sites route through writer.write(...). The writer extracts stabilize_build_id() (preserved verbatim from compile/cli.py), asserts no placeholder survives stabilization (defense-in-depth: future bypass raises RuntimeError instead of silently emitting __BUILD_ID__), creates parent dirs, and writes atomically. Direct Path.write_text / open(...).write on compiled output is a contract violation by design. The atomic write primitive was duplicated in commands/_helpers. Consolidated to utils/atomic_io.atomic_write_text; commands/_helpers keeps a one-line alias for backward compatibility with existing tests. Closes #957 --- src/apm_cli/commands/_helpers.py | 14 +-- src/apm_cli/commands/compile/cli.py | 25 +--- src/apm_cli/compilation/agents_compiler.py | 30 +++-- src/apm_cli/compilation/build_id.py | 41 +++++++ src/apm_cli/compilation/output_writer.py | 49 ++++++++ src/apm_cli/utils/atomic_io.py | 39 +++++++ .../test_agents_compiler_coverage.py | 15 ++- tests/unit/compilation/test_build_id.py | 110 ++++++++++++++++++ tests/unit/compilation/test_output_writer.py | 95 +++++++++++++++ tests/unit/test_command_helpers.py | 5 +- 10 files changed, 366 insertions(+), 57 deletions(-) create mode 100644 src/apm_cli/compilation/build_id.py create mode 100644 src/apm_cli/compilation/output_writer.py create mode 100644 src/apm_cli/utils/atomic_io.py create mode 100644 tests/unit/compilation/test_build_id.py create mode 100644 tests/unit/compilation/test_output_writer.py diff --git a/src/apm_cli/commands/_helpers.py b/src/apm_cli/commands/_helpers.py index 136cd0ae7..f39f19afe 100644 --- a/src/apm_cli/commands/_helpers.py +++ b/src/apm_cli/commands/_helpers.py @@ -286,19 +286,7 @@ def _check_and_notify_updates(): pass -def _atomic_write(path: Path, data: str) -> None: - """Atomically write text data to path (best-effort).""" - fd, tmp_name = tempfile.mkstemp(prefix="apm-write-", dir=str(path.parent)) - try: - with os.fdopen(fd, "w", encoding="utf-8") as fh: - fh.write(data) - os.replace(tmp_name, path) - except Exception: - try: - os.unlink(tmp_name) - except OSError: - pass - raise +from ..utils.atomic_io import atomic_write_text as _atomic_write def _update_gitignore_for_apm_modules(logger=None): diff --git a/src/apm_cli/commands/compile/cli.py b/src/apm_cli/commands/compile/cli.py index 1b559eef2..775131b44 100644 --- a/src/apm_cli/commands/compile/cli.py +++ b/src/apm_cli/commands/compile/cli.py @@ -16,7 +16,6 @@ _rich_panel, ) from .._helpers import ( - _atomic_write, _check_orphaned_packages, _get_console, _rich_blank_line, @@ -503,26 +502,6 @@ def compile( output_path=output_path, ) - # Compute deterministic Build ID (12-char SHA256) over content with placeholder removed - import hashlib - - from ...compilation.constants import BUILD_ID_PLACEHOLDER - - lines = final_content.splitlines() - # Identify placeholder line index - try: - idx = lines.index(BUILD_ID_PLACEHOLDER) - except ValueError: - idx = None - hash_input_lines = [l for i, l in enumerate(lines) if i != idx] - hash_bytes = "\n".join(hash_input_lines).encode("utf-8") - build_id = hashlib.sha256(hash_bytes).hexdigest()[:12] - if idx is not None: - lines[idx] = f"" - final_content = "\n".join(lines) + ( - "\n" if final_content.endswith("\n") else "" - ) - if not dry_run: # Only rewrite when content materially changes (creation, update, missing constitution case) if c_status in ("CREATED", "UPDATED", "MISSING"): @@ -542,7 +521,9 @@ def compile( f"-- run 'apm audit --file {output_path}' to inspect" ) try: - _atomic_write(output_path, final_content) + from ...compilation.output_writer import CompiledOutputWriter + + CompiledOutputWriter().write(output_path, final_content) except OSError as e: logger.error(f"Failed to write final AGENTS.md: {e}") sys.exit(1) diff --git a/src/apm_cli/compilation/agents_compiler.py b/src/apm_cli/compilation/agents_compiler.py index 7dd7d4338..b1737e943 100644 --- a/src/apm_cli/compilation/agents_compiler.py +++ b/src/apm_cli/compilation/agents_compiler.py @@ -510,11 +510,10 @@ def _compile_claude_md(self, config: CompilationConfig, primitives: PrimitiveCol files_written = 0 critical_security_found = False from ..security.gate import WARN_POLICY, SecurityGate + from .output_writer import CompiledOutputWriter + writer = CompiledOutputWriter() for claude_path, content in claude_result.content_map.items(): try: - # Create directory if needed - claude_path.parent.mkdir(parents=True, exist_ok=True) - # Handle constitution injection if enabled final_content = content if config.with_constitution: @@ -528,7 +527,7 @@ def _compile_claude_md(self, config: CompilationConfig, primitives: PrimitiveCol ) except Exception: pass # Use original content if injection fails - + # Defense-in-depth: scan compiled output before writing verdict = SecurityGate.scan_text( final_content, str(claude_path), policy=WARN_POLICY @@ -542,7 +541,7 @@ def _compile_claude_md(self, config: CompilationConfig, primitives: PrimitiveCol f"— run 'apm audit --file {claude_path}' to inspect" ) - claude_path.write_text(final_content, encoding='utf-8') + writer.write(claude_path, final_content) files_written += 1 except OSError as e: all_errors.append(f"Failed to write {claude_path}: {str(e)}") @@ -634,10 +633,11 @@ def _compile_gemini_md(self, config: CompilationConfig, primitives: PrimitiveCol ) files_written = 0 + from .output_writer import CompiledOutputWriter + writer = CompiledOutputWriter() for gemini_path, content in gemini_result.content_map.items(): try: - gemini_path.parent.mkdir(parents=True, exist_ok=True) - gemini_path.write_text(content, encoding="utf-8") + writer.write(gemini_path, content) files_written += 1 except OSError as e: all_errors.append(f"Failed to write {gemini_path}: {str(e)}") @@ -804,14 +804,14 @@ def _generate_template_data(self, primitives: PrimitiveCollection, config: Compi def _write_output_file(self, output_path: str, content: str) -> None: """Write the generated content to the output file. - + Args: output_path (str): Path to write the output. content (str): Content to write. """ + from .output_writer import CompiledOutputWriter try: - with open(output_path, 'w', encoding='utf-8') as f: - f.write(content) + CompiledOutputWriter().write(Path(output_path), content) except OSError as e: self.errors.append(f"Failed to write output file {output_path}: {str(e)}") @@ -862,13 +862,9 @@ def _write_distributed_file(self, agents_path: Path, content: str, config: Compi # If constitution injection fails, use original content pass - # Create directory if it doesn't exist - agents_path.parent.mkdir(parents=True, exist_ok=True) - - # Write the file - with open(agents_path, 'w', encoding='utf-8') as f: - f.write(final_content) - + from .output_writer import CompiledOutputWriter + CompiledOutputWriter().write(agents_path, final_content) + except OSError as e: raise OSError(f"Failed to write distributed AGENTS.md file {agents_path}: {str(e)}") diff --git a/src/apm_cli/compilation/build_id.py b/src/apm_cli/compilation/build_id.py new file mode 100644 index 000000000..bceb1ba79 --- /dev/null +++ b/src/apm_cli/compilation/build_id.py @@ -0,0 +1,41 @@ +"""Build ID stabilization for compiled outputs. + +Formatters insert ``BUILD_ID_PLACEHOLDER`` (a sentinel marker line) into +their generated content. Before persisting that content to disk, callers +must replace the placeholder with a deterministic 12-char SHA256 hash so +the file stays byte-stable across rebuilds with identical input. + +The hash is computed over the content with the placeholder line *removed* +so the hash is not self-referential -- it would otherwise change every +time the placeholder string itself changed. + +This module is the single source of truth for that replacement; all +compiled-output write sites must route through ``CompiledOutputWriter`` +(see ``output_writer.py``) which calls this helper. +""" + +import hashlib + +from .constants import BUILD_ID_PLACEHOLDER + + +def stabilize_build_id(content: str) -> str: + """Replace BUILD_ID_PLACEHOLDER with a deterministic 12-char SHA256 hash. + + Idempotent: returns ``content`` unchanged if no placeholder is present. + Preserves a trailing newline if the input had one. + """ + lines = content.splitlines() + try: + idx = lines.index(BUILD_ID_PLACEHOLDER) + except ValueError: + return content + + hash_input_lines = [line for i, line in enumerate(lines) if i != idx] + build_id = hashlib.sha256( + "\n".join(hash_input_lines).encode("utf-8") + ).hexdigest()[:12] + + lines[idx] = f"" + trailing_nl = "\n" if content.endswith("\n") else "" + return "\n".join(lines) + trailing_nl diff --git a/src/apm_cli/compilation/output_writer.py b/src/apm_cli/compilation/output_writer.py new file mode 100644 index 000000000..65693789f --- /dev/null +++ b/src/apm_cli/compilation/output_writer.py @@ -0,0 +1,49 @@ +"""Single chokepoint for persisting compiled outputs. + +All compilation targets (single-file AGENTS.md, distributed AGENTS.md, +CLAUDE.md, GEMINI.md, future targets) MUST route their writes through +``CompiledOutputWriter.write``. The writer guarantees: + +1. ``BUILD_ID_PLACEHOLDER`` is replaced with a deterministic hash + (see ``build_id.stabilize_build_id``). +2. A defensive assertion fails loudly if the placeholder survives + stabilization, so a future code path that bypasses or breaks + stabilization cannot silently emit ``__BUILD_ID__`` to disk. +3. Parent directories are created. +4. The write is atomic (replace-on-rename), so a crash mid-write cannot + corrupt a pre-existing target file. + +Direct ``Path.write_text`` / ``open(...).write`` on compiled output is a +contract violation -- adding new write sites without using this writer +will, by design, miss every cross-cutting concern this writer owns. + +Error contract: + - ``OSError`` from filesystem operations (mkdir, rename) propagates + to callers, which typically log + continue. + - ``RuntimeError`` is raised when the stabilization assertion fails + (i.e. ``BUILD_ID_PLACEHOLDER`` survived ``stabilize_build_id``). + This is a programmer error -- never expected in production -- and + is intentionally NOT caught by callers' ``except OSError`` blocks + so it surfaces as a loud traceback rather than a silent skip. +""" + +from pathlib import Path + +from ..utils.atomic_io import atomic_write_text +from .build_id import stabilize_build_id +from .constants import BUILD_ID_PLACEHOLDER + + +class CompiledOutputWriter: + """Persist compiled output with cross-cutting concerns applied.""" + + def write(self, path: Path, content: str) -> None: + final = stabilize_build_id(content) + if BUILD_ID_PLACEHOLDER in final: + raise RuntimeError( + "build_id stabilization bypassed: " + f"{BUILD_ID_PLACEHOLDER!r} still present after stabilization " + f"(target={path})" + ) + path.parent.mkdir(parents=True, exist_ok=True) + atomic_write_text(path, final) diff --git a/src/apm_cli/utils/atomic_io.py b/src/apm_cli/utils/atomic_io.py new file mode 100644 index 000000000..8633754e0 --- /dev/null +++ b/src/apm_cli/utils/atomic_io.py @@ -0,0 +1,39 @@ +"""Atomic file-write primitive for APM. + +Writes go to a temp file in the same directory as the target, then are +renamed via :func:`os.replace`. A crash mid-write cannot leave a half- +written destination, and on POSIX the rename is atomic with respect to +concurrent readers. + +This is the single canonical implementation; both +``apm_cli.commands._helpers._atomic_write`` (kept as an alias for +backward compatibility with existing tests) and +``apm_cli.compilation.output_writer`` route through here. +""" + +import os +import tempfile +from pathlib import Path + + +def atomic_write_text(path: Path, data: str) -> None: + """Atomically write ``data`` (UTF-8) to ``path``. + + The temp file is created in ``path.parent`` so the eventual + ``os.replace`` is a same-filesystem rename. Caller is responsible + for ensuring the parent directory exists. + + On any failure, the temp file is removed and the original target + file (if any) remains untouched. + """ + fd, tmp_name = tempfile.mkstemp(prefix="apm-atomic-", dir=str(path.parent)) + try: + with os.fdopen(fd, "w", encoding="utf-8") as fh: + fh.write(data) + os.replace(tmp_name, path) + except Exception: + try: + os.unlink(tmp_name) + except OSError: + pass + raise diff --git a/tests/unit/compilation/test_agents_compiler_coverage.py b/tests/unit/compilation/test_agents_compiler_coverage.py index 2790a8aaf..afd42710a 100644 --- a/tests/unit/compilation/test_agents_compiler_coverage.py +++ b/tests/unit/compilation/test_agents_compiler_coverage.py @@ -319,11 +319,20 @@ def tearDown(self): def test_write_output_file_oserror_adds_error(self): """_write_output_file adds error message when OS error occurs.""" + from unittest.mock import patch + compiler = AgentsCompiler(self.tmp) - bad_path = str(Path(self.tmp) / "nodir" / "deep" / "AGENTS.md") + target = str(Path(self.tmp) / "AGENTS.md") + + # Force the atomic-write rename to fail so we exercise the OSError + # path. Parent directory is now auto-created by CompiledOutputWriter, + # so we cannot rely on a missing-parent failure mode. + with patch( + "apm_cli.utils.atomic_io.os.replace", + side_effect=OSError("simulated rename failure"), + ): + compiler._write_output_file(target, "content") - # Don't create parent directory so the write fails. - compiler._write_output_file(bad_path, "content") self.assertEqual(len(compiler.errors), 1) self.assertIn("Failed to write", compiler.errors[0]) diff --git a/tests/unit/compilation/test_build_id.py b/tests/unit/compilation/test_build_id.py new file mode 100644 index 000000000..bbeca47cf --- /dev/null +++ b/tests/unit/compilation/test_build_id.py @@ -0,0 +1,110 @@ +"""Unit tests for compilation/build_id.py. + +Tests the stabilize_build_id() helper that replaces the BUILD_ID_PLACEHOLDER +line with a deterministic 12-char SHA256 hash computed over the content with +the placeholder line removed (so the hash is not self-referential). +""" + +import re + +import pytest + +from apm_cli.compilation.build_id import stabilize_build_id +from apm_cli.compilation.constants import BUILD_ID_PLACEHOLDER + + +_HASH_LINE_RE = re.compile(r"^$") + + +def test_replaces_placeholder_with_hash_line(): + content = ( + "# AGENTS.md\n" + f"{BUILD_ID_PLACEHOLDER}\n" + "\n" + ) + + result = stabilize_build_id(content) + + assert BUILD_ID_PLACEHOLDER not in result + hash_line = result.splitlines()[1] + assert _HASH_LINE_RE.match(hash_line), f"unexpected line: {hash_line!r}" + + +def test_returns_unchanged_when_no_placeholder(): + content = "# AGENTS.md\n\nbody\n" + + assert stabilize_build_id(content) == content + + +def test_idempotent_after_one_pass(): + content = ( + "# AGENTS.md\n" + f"{BUILD_ID_PLACEHOLDER}\n" + "body\n" + ) + + once = stabilize_build_id(content) + twice = stabilize_build_id(once) + + assert once == twice + + +def test_deterministic_for_same_input(): + content = ( + "# AGENTS.md\n" + f"{BUILD_ID_PLACEHOLDER}\n" + "body\n" + ) + + assert stabilize_build_id(content) == stabilize_build_id(content) + + +def test_different_content_yields_different_hash(): + content_a = f"# A\n{BUILD_ID_PLACEHOLDER}\nbody-a\n" + content_b = f"# B\n{BUILD_ID_PLACEHOLDER}\nbody-b\n" + + hash_a = stabilize_build_id(content_a).splitlines()[1] + hash_b = stabilize_build_id(content_b).splitlines()[1] + + assert hash_a != hash_b + + +def test_hash_excludes_placeholder_line_itself(): + """Hash must be computed over content with the placeholder line removed, + otherwise the hash is self-referential and cannot remain stable across + formatter versions that change the placeholder string. + + Two contents that differ ONLY in the placeholder position must hash + identically when the placeholder is excluded from the hash input. + """ + content_a = f"# A\n{BUILD_ID_PLACEHOLDER}\nbody\n" + content_b = f"{BUILD_ID_PLACEHOLDER}\n# A\nbody\n" + + hash_a = stabilize_build_id(content_a).splitlines() + hash_b = stabilize_build_id(content_b).splitlines() + + hash_a_value = next(line for line in hash_a if line.startswith("") + + +def test_stabilizes_build_id_before_writing(tmp_path: Path): + target = tmp_path / "AGENTS.md" + content = ( + "# AGENTS.md\n" + f"{BUILD_ID_PLACEHOLDER}\n" + "\n" + ) + + CompiledOutputWriter().write(target, content) + + written = target.read_text(encoding="utf-8") + assert BUILD_ID_PLACEHOLDER not in written + assert _HASH_LINE_RE.search(written), written + + +def test_creates_parent_directories(tmp_path: Path): + target = tmp_path / "deep" / "nested" / "AGENTS.md" + content = f"# A\n{BUILD_ID_PLACEHOLDER}\n" + + CompiledOutputWriter().write(target, content) + + assert target.exists() + + +def test_writes_utf8_encoded(tmp_path: Path): + target = tmp_path / "AGENTS.md" + content = f"# 中文標題\n{BUILD_ID_PLACEHOLDER}\nbody\n" + + CompiledOutputWriter().write(target, content) + + assert target.read_text(encoding="utf-8").startswith("# 中文標題") + + +def test_passes_through_content_without_placeholder(tmp_path: Path): + target = tmp_path / "AGENTS.md" + content = "# A\n\nbody\n" + + CompiledOutputWriter().write(target, content) + + assert target.read_text(encoding="utf-8") == content + + +def test_raises_when_placeholder_unresolvable(tmp_path: Path, monkeypatch): + """Defense-in-depth: if a future code path mutates content so the + placeholder survives stabilization, the writer must refuse to persist. + """ + target = tmp_path / "AGENTS.md" + # Inject a sentinel that simulates a stabilization failure: stub + # ``stabilize_build_id`` to a no-op so the placeholder survives. + import apm_cli.compilation.output_writer as ow + + monkeypatch.setattr(ow, "stabilize_build_id", lambda c: c) + + with pytest.raises(RuntimeError, match="build_id stabilization bypassed"): + CompiledOutputWriter().write(target, f"# A\n{BUILD_ID_PLACEHOLDER}\n") + + assert not target.exists() + + +def test_atomic_write_no_partial_file_on_failure(tmp_path: Path, monkeypatch): + """Failed writes must not leave a half-written file at the target path.""" + target = tmp_path / "AGENTS.md" + target.write_text("PRE-EXISTING\n", encoding="utf-8") + + import apm_cli.utils.atomic_io as atomic_io + + def boom(*args, **kwargs): + raise OSError("disk full") + + monkeypatch.setattr(atomic_io.os, "replace", boom) + + with pytest.raises(OSError): + CompiledOutputWriter().write(target, f"# A\n{BUILD_ID_PLACEHOLDER}\n") + + # Pre-existing file must remain untouched + assert target.read_text(encoding="utf-8") == "PRE-EXISTING\n" diff --git a/tests/unit/test_command_helpers.py b/tests/unit/test_command_helpers.py index 7f26705ca..49fba42d6 100644 --- a/tests/unit/test_command_helpers.py +++ b/tests/unit/test_command_helpers.py @@ -64,8 +64,9 @@ def test_cleans_up_temp_file_on_write_error(self, tmp_path): with patch("os.replace", side_effect=OSError("replace failed")): with pytest.raises(OSError, match="replace failed"): _atomic_write(target, "data") - # No stale temp file should remain in tmp_path - leftover = list(tmp_path.glob("apm-write-*")) + # No stale temp file should remain in tmp_path. Prefix-agnostic so + # the assertion does not silently pass if the temp prefix changes. + leftover = [f for f in tmp_path.iterdir() if f != target] assert leftover == [], f"Temp file not cleaned up: {leftover}" From c59e009b1bb5dfb2abf7f750b3677bca032abece Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=A8=B1=E5=85=83=E8=B1=AA?= <146086744+edenfunf@users.noreply.github.com> Date: Thu, 30 Apr 2026 12:24:48 +0800 Subject: [PATCH 2/2] chore(959): clean up imports and ASCII test data - _helpers.py: move atomic_write_text alias import to top of file (PEP 8 ordering) and drop now-unused tempfile import after the inline _atomic_write body was replaced with the alias. - test_output_writer.py: replace literal CJK title with \u escape sequences so the source file stays within the printable-ASCII policy enforced by .github/instructions/encoding.instructions.md. Runtime characters and the UTF-8 round-trip assertion are unchanged. --- src/apm_cli/commands/_helpers.py | 5 +---- tests/unit/compilation/test_output_writer.py | 5 +++-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/apm_cli/commands/_helpers.py b/src/apm_cli/commands/_helpers.py index 919bea4ea..9f21deef9 100644 --- a/src/apm_cli/commands/_helpers.py +++ b/src/apm_cli/commands/_helpers.py @@ -6,7 +6,6 @@ import builtins import os import sys -import tempfile from pathlib import Path import click @@ -21,6 +20,7 @@ APM_YML_FILENAME, GITIGNORE_FILENAME, ) +from ..utils.atomic_io import atomic_write_text as _atomic_write from ..utils.console import _rich_echo, _rich_info, _rich_warning from ..update_policy import get_update_hint_message, is_self_update_enabled from ..version import get_build_sha, get_version @@ -298,9 +298,6 @@ def _check_and_notify_updates(): pass -from ..utils.atomic_io import atomic_write_text as _atomic_write - - def _update_gitignore_for_apm_modules(logger=None): """Add apm_modules/ to .gitignore if not already present.""" gitignore_path = Path(GITIGNORE_FILENAME) diff --git a/tests/unit/compilation/test_output_writer.py b/tests/unit/compilation/test_output_writer.py index 8d56b2a9c..d10f48e4b 100644 --- a/tests/unit/compilation/test_output_writer.py +++ b/tests/unit/compilation/test_output_writer.py @@ -43,11 +43,12 @@ def test_creates_parent_directories(tmp_path: Path): def test_writes_utf8_encoded(tmp_path: Path): target = tmp_path / "AGENTS.md" - content = f"# 中文標題\n{BUILD_ID_PLACEHOLDER}\nbody\n" + title = "\u4e2d\u6587\u6a19\u984c" + content = f"# {title}\n{BUILD_ID_PLACEHOLDER}\nbody\n" CompiledOutputWriter().write(target, content) - assert target.read_text(encoding="utf-8").startswith("# 中文標題") + assert target.read_text(encoding="utf-8").startswith(f"# {title}") def test_passes_through_content_without_placeholder(tmp_path: Path):