Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 1 addition & 16 deletions src/apm_cli/commands/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import builtins
import os
import sys
import tempfile
from pathlib import Path

import click
Expand All @@ -21,6 +20,7 @@
APM_YML_FILENAME,
GITIGNORE_FILENAME,
)
from ..utils.atomic_io import atomic_write_text as _atomic_write
from ..utils.console import _rich_echo, _rich_info, _rich_warning
from ..update_policy import get_update_hint_message, is_self_update_enabled
from ..version import get_build_sha, get_version
Expand Down Expand Up @@ -298,21 +298,6 @@ def _check_and_notify_updates():
pass


def _atomic_write(path: Path, data: str) -> None:
"""Atomically write text data to path (best-effort)."""
fd, tmp_name = tempfile.mkstemp(prefix="apm-write-", dir=str(path.parent))
try:
with os.fdopen(fd, "w", encoding="utf-8") as fh:
fh.write(data)
os.replace(tmp_name, path)
except Exception:
try:
os.unlink(tmp_name)
except OSError:
pass
raise


def _update_gitignore_for_apm_modules(logger=None):
"""Add apm_modules/ to .gitignore if not already present."""
gitignore_path = Path(GITIGNORE_FILENAME)
Expand Down
25 changes: 3 additions & 22 deletions src/apm_cli/commands/compile/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
_rich_panel,
)
from .._helpers import (
_atomic_write,
_check_orphaned_packages,
_get_console,
_rich_blank_line,
Expand Down Expand Up @@ -577,26 +576,6 @@ def compile(
output_path=output_path,
)

# Compute deterministic Build ID (12-char SHA256) over content with placeholder removed
import hashlib

from ...compilation.constants import BUILD_ID_PLACEHOLDER

lines = final_content.splitlines()
# Identify placeholder line index
try:
idx = lines.index(BUILD_ID_PLACEHOLDER)
except ValueError:
idx = None
hash_input_lines = [l for i, l in enumerate(lines) if i != idx]
hash_bytes = "\n".join(hash_input_lines).encode("utf-8")
build_id = hashlib.sha256(hash_bytes).hexdigest()[:12]
if idx is not None:
lines[idx] = f"<!-- Build ID: {build_id} -->"
final_content = "\n".join(lines) + (
"\n" if final_content.endswith("\n") else ""
)

if not dry_run:
# Only rewrite when content materially changes (creation, update, missing constitution case)
if c_status in ("CREATED", "UPDATED", "MISSING"):
Expand All @@ -616,7 +595,9 @@ def compile(
f"-- run 'apm audit --file {output_path}' to inspect"
)
try:
_atomic_write(output_path, final_content)
from ...compilation.output_writer import CompiledOutputWriter

CompiledOutputWriter().write(output_path, final_content)
except OSError as e:
logger.error(f"Failed to write final AGENTS.md: {e}")
Comment thread
edenfunf marked this conversation as resolved.
sys.exit(1)
Expand Down
30 changes: 13 additions & 17 deletions src/apm_cli/compilation/agents_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,11 +541,10 @@ def _compile_claude_md(self, config: CompilationConfig, primitives: PrimitiveCol
files_written = 0
critical_security_found = False
from ..security.gate import WARN_POLICY, SecurityGate
from .output_writer import CompiledOutputWriter
writer = CompiledOutputWriter()
for claude_path, content in claude_result.content_map.items():
try:
# Create directory if needed
claude_path.parent.mkdir(parents=True, exist_ok=True)

# Handle constitution injection if enabled
final_content = content
if config.with_constitution:
Expand All @@ -559,7 +558,7 @@ def _compile_claude_md(self, config: CompilationConfig, primitives: PrimitiveCol
)
except Exception as exc:
_logger.debug("Constitution injection failed for %s: %s", claude_path, exc)

# Defense-in-depth: scan compiled output before writing
verdict = SecurityGate.scan_text(
final_content, str(claude_path), policy=WARN_POLICY
Expand All @@ -573,7 +572,7 @@ def _compile_claude_md(self, config: CompilationConfig, primitives: PrimitiveCol
f"— run 'apm audit --file {claude_path}' to inspect"
)

claude_path.write_text(final_content, encoding='utf-8')
writer.write(claude_path, final_content)
files_written += 1
except OSError as e:
all_errors.append(f"Failed to write {claude_path}: {str(e)}")
Expand Down Expand Up @@ -665,10 +664,11 @@ def _compile_gemini_md(self, config: CompilationConfig, primitives: PrimitiveCol
)

files_written = 0
from .output_writer import CompiledOutputWriter
writer = CompiledOutputWriter()
for gemini_path, content in gemini_result.content_map.items():
try:
gemini_path.parent.mkdir(parents=True, exist_ok=True)
gemini_path.write_text(content, encoding="utf-8")
writer.write(gemini_path, content)
files_written += 1
except OSError as e:
all_errors.append(f"Failed to write {gemini_path}: {str(e)}")
Expand Down Expand Up @@ -835,14 +835,14 @@ def _generate_template_data(self, primitives: PrimitiveCollection, config: Compi

def _write_output_file(self, output_path: str, content: str) -> None:
"""Write the generated content to the output file.

Args:
output_path (str): Path to write the output.
content (str): Content to write.
"""
from .output_writer import CompiledOutputWriter
try:
with open(output_path, 'w', encoding='utf-8') as f:
f.write(content)
CompiledOutputWriter().write(Path(output_path), content)
except OSError as e:
self.errors.append(f"Failed to write output file {output_path}: {str(e)}")

Expand Down Expand Up @@ -892,13 +892,9 @@ def _write_distributed_file(self, agents_path: Path, content: str, config: Compi
except Exception as exc:
_logger.debug("Constitution injection failed for %s: %s", agents_path, exc)

# Create directory if it doesn't exist
agents_path.parent.mkdir(parents=True, exist_ok=True)

# Write the file
with open(agents_path, 'w', encoding='utf-8') as f:
f.write(final_content)

from .output_writer import CompiledOutputWriter
CompiledOutputWriter().write(agents_path, final_content)

except OSError as e:
raise OSError(f"Failed to write distributed AGENTS.md file {agents_path}: {str(e)}")

Expand Down
41 changes: 41 additions & 0 deletions src/apm_cli/compilation/build_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Build ID stabilization for compiled outputs.

Formatters insert ``BUILD_ID_PLACEHOLDER`` (a sentinel marker line) into
their generated content. Before persisting that content to disk, callers
must replace the placeholder with a deterministic 12-char SHA256 hash so
the file stays byte-stable across rebuilds with identical input.

The hash is computed over the content with the placeholder line *removed*
so the hash is not self-referential -- it would otherwise change every
time the placeholder string itself changed.

This module is the single source of truth for that replacement; all
compiled-output write sites must route through ``CompiledOutputWriter``
(see ``output_writer.py``) which calls this helper.
"""

import hashlib

from .constants import BUILD_ID_PLACEHOLDER


def stabilize_build_id(content: str) -> str:
"""Replace BUILD_ID_PLACEHOLDER with a deterministic 12-char SHA256 hash.

Idempotent: returns ``content`` unchanged if no placeholder is present.
Preserves a trailing newline if the input had one.
"""
lines = content.splitlines()
try:
idx = lines.index(BUILD_ID_PLACEHOLDER)
except ValueError:
return content

hash_input_lines = [line for i, line in enumerate(lines) if i != idx]
build_id = hashlib.sha256(
"\n".join(hash_input_lines).encode("utf-8")
).hexdigest()[:12]

lines[idx] = f"<!-- Build ID: {build_id} -->"
trailing_nl = "\n" if content.endswith("\n") else ""
return "\n".join(lines) + trailing_nl
49 changes: 49 additions & 0 deletions src/apm_cli/compilation/output_writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""Single chokepoint for persisting compiled outputs.

All compilation targets (single-file AGENTS.md, distributed AGENTS.md,
CLAUDE.md, GEMINI.md, future targets) MUST route their writes through
``CompiledOutputWriter.write``. The writer guarantees:

1. ``BUILD_ID_PLACEHOLDER`` is replaced with a deterministic hash
(see ``build_id.stabilize_build_id``).
2. A defensive assertion fails loudly if the placeholder survives
stabilization, so a future code path that bypasses or breaks
stabilization cannot silently emit ``__BUILD_ID__`` to disk.
3. Parent directories are created.
4. The write is atomic (replace-on-rename), so a crash mid-write cannot
corrupt a pre-existing target file.

Direct ``Path.write_text`` / ``open(...).write`` on compiled output is a
contract violation -- adding new write sites without using this writer
will, by design, miss every cross-cutting concern this writer owns.

Error contract:
- ``OSError`` from filesystem operations (mkdir, rename) propagates
to callers, which typically log + continue.
- ``RuntimeError`` is raised when the stabilization assertion fails
(i.e. ``BUILD_ID_PLACEHOLDER`` survived ``stabilize_build_id``).
This is a programmer error -- never expected in production -- and
is intentionally NOT caught by callers' ``except OSError`` blocks
so it surfaces as a loud traceback rather than a silent skip.
"""

from pathlib import Path

from ..utils.atomic_io import atomic_write_text
from .build_id import stabilize_build_id
from .constants import BUILD_ID_PLACEHOLDER


class CompiledOutputWriter:
"""Persist compiled output with cross-cutting concerns applied."""

def write(self, path: Path, content: str) -> None:
final = stabilize_build_id(content)
if BUILD_ID_PLACEHOLDER in final:
raise RuntimeError(
"build_id stabilization bypassed: "
f"{BUILD_ID_PLACEHOLDER!r} still present after stabilization "
f"(target={path})"
)
path.parent.mkdir(parents=True, exist_ok=True)
atomic_write_text(path, final)
39 changes: 39 additions & 0 deletions src/apm_cli/utils/atomic_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Atomic file-write primitive for APM.

Writes go to a temp file in the same directory as the target, then are
renamed via :func:`os.replace`. A crash mid-write cannot leave a half-
written destination, and on POSIX the rename is atomic with respect to
concurrent readers.

This is the single canonical implementation; both
``apm_cli.commands._helpers._atomic_write`` (kept as an alias for
backward compatibility with existing tests) and
``apm_cli.compilation.output_writer`` route through here.
"""

import os
import tempfile
from pathlib import Path


def atomic_write_text(path: Path, data: str) -> None:
"""Atomically write ``data`` (UTF-8) to ``path``.

The temp file is created in ``path.parent`` so the eventual
``os.replace`` is a same-filesystem rename. Caller is responsible
for ensuring the parent directory exists.

On any failure, the temp file is removed and the original target
file (if any) remains untouched.
"""
fd, tmp_name = tempfile.mkstemp(prefix="apm-atomic-", dir=str(path.parent))
try:
with os.fdopen(fd, "w", encoding="utf-8") as fh:
Comment thread
edenfunf marked this conversation as resolved.
fh.write(data)
os.replace(tmp_name, path)
except Exception:
try:
os.unlink(tmp_name)
except OSError:
pass
raise
15 changes: 12 additions & 3 deletions tests/unit/compilation/test_agents_compiler_coverage.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,20 @@ def tearDown(self):

def test_write_output_file_oserror_adds_error(self):
"""_write_output_file adds error message when OS error occurs."""
from unittest.mock import patch

compiler = AgentsCompiler(self.tmp)
bad_path = str(Path(self.tmp) / "nodir" / "deep" / "AGENTS.md")
target = str(Path(self.tmp) / "AGENTS.md")

# Force the atomic-write rename to fail so we exercise the OSError
# path. Parent directory is now auto-created by CompiledOutputWriter,
# so we cannot rely on a missing-parent failure mode.
with patch(
"apm_cli.utils.atomic_io.os.replace",
side_effect=OSError("simulated rename failure"),
):
compiler._write_output_file(target, "content")

# Don't create parent directory so the write fails.
compiler._write_output_file(bad_path, "content")
self.assertEqual(len(compiler.errors), 1)
self.assertIn("Failed to write", compiler.errors[0])

Expand Down
Loading
Loading