Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/automation/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from .engine import (
AutomationDefinition,
AutomationEngine,
EngineNotStartedError,
RunResult,
RunStatus,
TriggerEvent,
Expand All @@ -48,6 +49,10 @@
GeneratorVariable,
VariableRegistry,
)
from .guardrails import (
DEFAULT_TEXT_EXTENSIONS,
TextFileGuardrail,
)

__all__ = [
# Engine
Expand All @@ -56,6 +61,7 @@
"TriggerEvent",
"RunResult",
"RunStatus",
"EngineNotStartedError",
"default_engine",
"trigger",
# Components
Expand All @@ -67,4 +73,7 @@
# Variables
"GeneratorVariable",
"VariableRegistry",
# Guardrails
"DEFAULT_TEXT_EXTENSIONS",
"TextFileGuardrail",
]
Binary file modified src/automation/__pycache__/__init__.cpython-312.pyc
Binary file not shown.
Binary file modified src/automation/__pycache__/components.cpython-312.pyc
Binary file not shown.
Binary file modified src/automation/__pycache__/engine.cpython-312.pyc
Binary file not shown.
Binary file modified src/automation/__pycache__/variables.cpython-312.pyc
Binary file not shown.
60 changes: 60 additions & 0 deletions src/automation/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ class RunStatus(Enum):
SKIPPED = "skipped"


class EngineNotStartedError(RuntimeError):
"""Raised when :meth:`AutomationEngine.trigger` is called before :meth:`start`."""
Comment on lines +59 to +60


@dataclass
class TriggerEvent:
"""Represents an event that can trigger an automation run."""
Expand Down Expand Up @@ -171,11 +175,54 @@ def __init__(self) -> None:

self._automations: Dict[str, AutomationDefinition] = {}
self._history: List[RunResult] = []
self._running: bool = False

# Middleware hooks: callables invoked before/after each run
self._before_run: List[Callable[[RunResult, TriggerEvent], None]] = []
self._after_run: List[Callable[[RunResult], None]] = []

# ------------------------------------------------------------------
# Lifecycle: start / stop
# ------------------------------------------------------------------

def start(self) -> "AutomationEngine":
"""
Start the engine, allowing automation runs.

Must be called before :meth:`trigger` / :meth:`trigger_type`.
Calling :meth:`start` on an already-running engine is a no-op.
Returns *self* for chaining.
"""
if not self._running:
self._running = True
logger.info("AutomationEngine started")
return self
Comment on lines +196 to +199

def stop(self) -> "AutomationEngine":
"""
Stop the engine and tear down all registered components.

After :meth:`stop` returns, :meth:`trigger` / :meth:`trigger_type`
will raise :class:`EngineNotStartedError` until :meth:`start` is
called again. All component teardown hooks are invoked; any
Comment on lines +203 to +207
exceptions they raise are logged and suppressed.
Returns *self* for chaining.
"""
if self._running:
self._running = False
for name in list(self.components.names()):
try:
self.components.deregister(name)
except Exception:
logger.exception("teardown of component %r raised an exception", name)
logger.info("AutomationEngine stopped")
return self

@property
def is_running(self) -> bool:
"""``True`` while the engine is between :meth:`start` and :meth:`stop`."""
return self._running

# ------------------------------------------------------------------
# Component & variable shortcuts
# ------------------------------------------------------------------
Expand Down Expand Up @@ -247,9 +294,19 @@ def trigger(self, event: TriggerEvent) -> List[RunResult]:
"""
Dispatch *event* to all matching, enabled automations.

Raises:
EngineNotStartedError: if the engine has not been started via
:meth:`start`.

Returns the list of :class:`RunResult` objects produced (one per
matching automation).
"""
if not self._running:
raise EngineNotStartedError(
"AutomationEngine must be started before triggering events. "
"Call engine.start() first."
)

results: List[RunResult] = []

for automation in self._automations.values():
Expand Down Expand Up @@ -391,7 +448,10 @@ def __repr__(self) -> str:
# ---------------------------------------------------------------------------

#: Default engine instance – use this for simple single-engine setups.
#: Pre-started for convenience; create a fresh :class:`AutomationEngine` and
#: call :meth:`~AutomationEngine.start` explicitly for production use.
default_engine = AutomationEngine()
default_engine.start()


def trigger(event_type: str, payload: Any = None, **metadata: Any) -> List[RunResult]:
Expand Down
132 changes: 132 additions & 0 deletions src/automation/guardrails.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""
Built-in guardrail components for the serverless automation engine.

Guardrails are :class:`~automation.components.Component` subclasses that act as
safety gates in a pipeline. They validate their input and return an error
output (without raising) when the input violates a policy, letting the engine
record the failure and invoke after-run hooks as normal.

Included guardrails
-------------------
* :class:`TextFileGuardrail` – validates payloads that reference text-based
file paths (any extension in :data:`DEFAULT_TEXT_EXTENSIONS`, including
``.txt``, ``.log``, ``.csv``, etc.), blocking path-traversal attacks, null
bytes, absolute paths, and disallowed file extensions.
"""

from __future__ import annotations

import os
from typing import FrozenSet

from .components import Component, ComponentInput, ComponentOutput

# ---------------------------------------------------------------------------
# Allowed text-file extensions
# ---------------------------------------------------------------------------

#: Extensions that are treated as significant text files and subjected to
#: guardrail checks when a payload string ends with one of these suffixes.
#: The set is intentionally conservative; extend via subclassing or by
#: passing *allowed_extensions* to :class:`TextFileGuardrail`.
DEFAULT_TEXT_EXTENSIONS: FrozenSet[str] = frozenset(
{".txt", ".text", ".log", ".csv", ".tsv", ".md", ".rst", ".ini", ".cfg"}
)


# ---------------------------------------------------------------------------
# TextFileGuardrail
# ---------------------------------------------------------------------------

class TextFileGuardrail(Component):
"""
Safety guardrail for text-file path payloads.

When this component is included as a step in an automation pipeline it
inspects the current ``payload`` (expected to be a file path string) and
rejects it if any of the following conditions are true:

* The payload is not a string.
* The file extension is not in *allowed_extensions*.
* The path contains ``..`` (directory traversal).
* The path contains a null byte.
* The path is an absolute path (configurable via *allow_absolute*).

On success the payload is passed through unchanged so that subsequent
pipeline steps can continue processing it.

Args:
allowed_extensions: Set of lowercase file extensions (including the
leading dot) that are permitted. Defaults to
:data:`DEFAULT_TEXT_EXTENSIONS`.
allow_absolute: When ``False`` (default) absolute paths are rejected.

Example::

engine = AutomationEngine()
engine.start()
engine.register_fn("read_file", lambda inp: open(inp.payload).read())
engine.components.register("guard", TextFileGuardrail())

engine.define(AutomationDefinition(
name="safe_read",
triggers=["file.read"],
steps=["guard", "read_file"],
))

results = engine.trigger_type("file.read", payload="notes.txt")
"""

name = "text_file_guardrail"
description = (
"Validates text-file path payloads: rejects path traversal, null bytes, "
"absolute paths, and disallowed extensions."
)

def __init__(
self,
allowed_extensions: FrozenSet[str] = DEFAULT_TEXT_EXTENSIONS,
allow_absolute: bool = False,
) -> None:
self._allowed_extensions = frozenset(ext.lower() for ext in allowed_extensions)
self._allow_absolute = allow_absolute

def validate(self, input_: ComponentInput) -> tuple[bool, str]:
payload = input_.payload

if not isinstance(payload, str):
return False, (
f"TextFileGuardrail: payload must be a string file path, "
f"got {type(payload).__name__!r}"
)

if "\x00" in payload:
return False, "TextFileGuardrail: payload contains a null byte"

if not self._allow_absolute and os.path.isabs(payload):
return False, (
"TextFileGuardrail: absolute paths are not permitted; "
f"got {payload!r}"
)

# Normalise separators then check for traversal components
normalised = payload.replace("\\", "/")
parts = normalised.split("/")
if ".." in parts:
return False, (
"TextFileGuardrail: path traversal ('..') detected in "
f"{payload!r}"
)

_, ext = os.path.splitext(payload)
if ext.lower() not in self._allowed_extensions:
return False, (
f"TextFileGuardrail: file extension {ext!r} is not in the "
f"allowed set {sorted(self._allowed_extensions)}"
)

return True, ""

def execute(self, input_: ComponentInput) -> ComponentOutput:
"""Pass the validated payload through unchanged."""
return self._ok(input_.payload)
Loading
Loading