diff --git a/extended/docs/proposals/0004-send-message-step-plugin/checklist.python.md b/extended/docs/proposals/0004-send-message-step-plugin/checklist.python.md
new file mode 100644
index 0000000000..a7e91da74c
--- /dev/null
+++ b/extended/docs/proposals/0004-send-message-step-plugin/checklist.python.md
@@ -0,0 +1,86 @@
+# 0004 Python Checklist
+
+## Baseline
+
+- [x] Plugin work lives under `extended/plugins/send-message-python/`.
+- [x] The subtree stands alone.
+- [x] No committed Slack credential appears anywhere.
+- [x] The plugin owns Kubernetes reads and Slack calls.
+
+## Phase 0: pin the contract
+
+- [x] Re-read `proposal.md`.
+- [x] Re-read `spec.md`.
+- [x] Re-read the `send-message` slice in proposal `0002`.
+- [x] Keep scope to Slack only.
+
+## Phase 1: create the tiny repo
+
+- [x] Create `extended/plugins/send-message-python/`.
+- [x] Add runtime source.
+- [x] Add image build.
+- [x] Add `plugin.yaml`.
+- [x] Add CRD manifests.
+- [x] Add RBAC manifests.
+- [x] Add smoke assets.
+
+## Phase 2: implement the runtime
+
+- [x] Implement `POST /api/v1/step.execute`.
+- [x] Enforce bearer auth from `/var/run/kargo/token`.
+- [x] Read `MessageChannel`.
+- [x] Read `ClusterMessageChannel`.
+- [x] Read referenced `Secret`.
+- [x] Send plaintext Slack payloads.
+- [x] Send encoded Slack payloads.
+- [x] Return `slack.threadTS`.
+
+## Phase 3: test the contract
+
+- [x] Add auth tests.
+- [x] Add channel lookup tests.
+- [x] Add Secret lookup tests.
+- [x] Add plaintext payload tests.
+- [x] Add encoded payload tests.
+- [x] Add XML decode tests.
+- [x] Add Slack failure tests.
+
+## Phase 4: smoke
+
+- [x] Add plugin-owned `smoke/smoke_test.py`.
+- [x] Keep smoke orchestration in Python, not shell.
+- [x] Build the image.
+- [x] Load it into kind.
+- [x] Install CRDs and RBAC.
+- [x] Install StepPlugin `ConfigMap`.
+- [x] Create local-only test Secret.
+- [x] Create test `MessageChannel`.
+- [x] Run a `Stage` with `uses: send-message`.
+- [x] Assert `Succeeded`.
+- [x] Assert non-empty `slack.threadTS`.
+
+## Phase 5: mandatory radical simplification pass 1
+
+- [x] Ask "can I make this look easier by deleting a dependency?"
+- [x] Ask "can I merge files without making the contract harder to read?"
+- [x] Ask "am I using a framework just because it is familiar?"
+- [x] Delete anything that fails those checks.
+
+## Phase 6: mandatory radical simplification pass 2
+
+- [x] Re-run tests and smoke from a green tree.
+- [x] Ask "can I make this radically simpler?"
+- [x] Remove abstractions that only serve style.
+- [x] Remove helpers that only save a few lines.
+- [x] Stop only when the repo still reads like a small third-party plugin.
+
+Refactor pass notes:
+- Split the runtime into small package files for:
+ - app flow
+ - HTTP entrypoint
+ - Kubernetes and Slack clients
+ - payload decoding and shaping
+- Split smoke support out of the entrypoint into `smoke/lib.py`.
+- Re-ran unit tests after refactor.
+- Re-ran `py_compile` across all Python files after refactor.
+- Re-ran isolated kind smoke through `extended/tests/e2e_stepplugins.sh`.
diff --git a/extended/docs/proposals/0004-send-message-step-plugin/plan.python.md b/extended/docs/proposals/0004-send-message-step-plugin/plan.python.md
new file mode 100644
index 0000000000..ef7d77c44e
--- /dev/null
+++ b/extended/docs/proposals/0004-send-message-step-plugin/plan.python.md
@@ -0,0 +1,119 @@
+# 0004 Python Plan
+
+## Goal
+
+- Show the same `send-message` plugin contract in a form that looks easy to
+ write and easy to own.
+- Keep all plugin-owned code under `extended/plugins/send-message-python/`.
+- Keep the subtree standalone enough to behave like its own Git repo.
+- Match the same Slack-only contract as `spec.md`.
+
+## Design Rule
+
+- Prefer directness over framework taste.
+- Prefer a tiny dependency set over "proper" stacks.
+- Prefer raw Kubernetes API reads over a large client library when that keeps
+ the repo smaller and clearer.
+- Prefer the first-party Slack Python SDK only if it reduces code materially.
+- If a dependency does not make the plugin look simpler to a third party,
+ delete it.
+
+## Runtime Shape
+
+- Runtime:
+ - Python
+- Suggested layout:
+ - `extended/plugins/send-message-python/`
+ - `server.py`
+ - `smoke/smoke_test.py`
+ - `requirements.txt`
+ - `Dockerfile`
+ - `plugin.yaml`
+ - `manifests/`
+ - `smoke/`
+- Suggested server shape:
+ - one small HTTP server
+ - one request parser
+ - one Kubernetes reader
+ - one Slack sender
+
+## Minimal Dependency Target
+
+- Acceptable:
+ - `slack_sdk`
+ - `PyYAML`
+- Strong preference:
+ - stdlib HTTP server
+ - stdlib `json`
+ - stdlib `xml.etree.ElementTree`
+ - direct HTTPS to Kubernetes
+- Avoid:
+ - large web frameworks
+ - large Kubernetes client stacks
+ - background workers
+ - async machinery unless it removes code
+
+## Behavior
+
+- Implement `POST /api/v1/step.execute`.
+- Enforce bearer auth from `/var/run/kargo/token`.
+- Read `MessageChannel` and `ClusterMessageChannel` directly from Kubernetes.
+- Read referenced `Secret` directly from Kubernetes.
+- Send Slack messages from the plugin.
+- Support:
+ - plaintext
+ - `json`
+ - `yaml`
+ - `xml`
+- Match the response contract in `spec.md`.
+
+## Tests
+
+- Keep tests inside the subtree.
+- Prefer a small unit-test suite over a heavy harness.
+- Cover:
+ - auth
+ - channel lookup
+ - Secret lookup
+ - plaintext payload shaping
+ - encoded payload shaping
+ - XML decode shape
+ - Slack error handling
+
+## Smoke
+
+- Own `smoke/smoke_test.py` inside the subtree.
+- Assume Kargo already exists.
+- Use Python for the smoke orchestration too.
+- Build image, install manifests, create Secret and channel, run a Stage,
+ assert `Succeeded`, assert non-empty `slack.threadTS`.
+
+## Mandatory Simplify Passes
+
+- Simplify pass 1, before full smoke:
+ - ask "can I delete a dependency and still keep this clearer?"
+ - ask "can I collapse this into fewer files without hiding the contract?"
+- Simplify pass 2, after green:
+ - ask "can I make this radically simpler?"
+ - remove any helper, abstraction, or library that does not make the plugin
+ look easier to write
+
+## Current Implementation Notes
+
+- Runtime now lives in a small package:
+ - `extended/plugins/send-message-python/send_message_plugin/`
+- Smoke orchestration lives in:
+ - `extended/plugins/send-message-python/smoke/smoke_test.py`
+ - `extended/plugins/send-message-python/smoke/lib.py`
+- Keep the dependency set to:
+ - stdlib
+ - `PyYAML`
+- Do not add:
+ - Slack SDK
+ - Kubernetes Python client
+ - web framework
+- Local validation currently proves:
+ - unit tests pass
+ - Python sources compile
+ - Docker image builds
+ - isolated kind smoke passes through `extended/tests/e2e_stepplugins.sh`
diff --git a/extended/plugins/send-message-python/Dockerfile b/extended/plugins/send-message-python/Dockerfile
new file mode 100644
index 0000000000..5ea5562800
--- /dev/null
+++ b/extended/plugins/send-message-python/Dockerfile
@@ -0,0 +1,10 @@
+FROM python:3.12-alpine
+
+WORKDIR /app
+
+COPY requirements.txt ./
+RUN pip install --no-cache-dir -r requirements.txt
+
+COPY . .
+
+ENTRYPOINT ["python", "-m", "send_message_plugin"]
diff --git a/extended/plugins/send-message-python/manifests/crds.yaml b/extended/plugins/send-message-python/manifests/crds.yaml
new file mode 100644
index 0000000000..008f9931f8
--- /dev/null
+++ b/extended/plugins/send-message-python/manifests/crds.yaml
@@ -0,0 +1,81 @@
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+ name: messagechannels.ee.kargo.akuity.io
+spec:
+ group: ee.kargo.akuity.io
+ names:
+ kind: MessageChannel
+ plural: messagechannels
+ singular: messagechannel
+ listKind: MessageChannelList
+ scope: Namespaced
+ versions:
+ - name: v1alpha1
+ served: true
+ storage: true
+ schema:
+ openAPIV3Schema:
+ type: object
+ properties:
+ spec:
+ type: object
+ properties:
+ secretRef:
+ type: object
+ properties:
+ name:
+ type: string
+ required:
+ - name
+ slack:
+ type: object
+ properties:
+ channelID:
+ type: string
+ required:
+ - channelID
+ required:
+ - secretRef
+ - slack
+---
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+ name: clustermessagechannels.ee.kargo.akuity.io
+spec:
+ group: ee.kargo.akuity.io
+ names:
+ kind: ClusterMessageChannel
+ plural: clustermessagechannels
+ singular: clustermessagechannel
+ listKind: ClusterMessageChannelList
+ scope: Cluster
+ versions:
+ - name: v1alpha1
+ served: true
+ storage: true
+ schema:
+ openAPIV3Schema:
+ type: object
+ properties:
+ spec:
+ type: object
+ properties:
+ secretRef:
+ type: object
+ properties:
+ name:
+ type: string
+ required:
+ - name
+ slack:
+ type: object
+ properties:
+ channelID:
+ type: string
+ required:
+ - channelID
+ required:
+ - secretRef
+ - slack
diff --git a/extended/plugins/send-message-python/manifests/rbac.yaml b/extended/plugins/send-message-python/manifests/rbac.yaml
new file mode 100644
index 0000000000..df10af6b7f
--- /dev/null
+++ b/extended/plugins/send-message-python/manifests/rbac.yaml
@@ -0,0 +1,22 @@
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+ name: send-message-step-plugin-reader
+rules:
+- apiGroups:
+ - ee.kargo.akuity.io
+ resources:
+ - messagechannels
+ - clustermessagechannels
+ verbs:
+ - get
+ - list
+ - watch
+- apiGroups:
+ - ""
+ resources:
+ - secrets
+ verbs:
+ - get
+ - list
+ - watch
diff --git a/extended/plugins/send-message-python/plugin.yaml b/extended/plugins/send-message-python/plugin.yaml
new file mode 100644
index 0000000000..e2e26546e6
--- /dev/null
+++ b/extended/plugins/send-message-python/plugin.yaml
@@ -0,0 +1,34 @@
+apiVersion: kargo-extended.code.org/v1alpha1
+kind: StepPlugin
+metadata:
+ name: send-message
+ namespace: kargo-system-resources
+spec:
+ sidecar:
+ automountServiceAccountToken: true
+ container:
+ name: send-message-step-plugin
+ image: send-message-step-plugin-python:dev
+ imagePullPolicy: IfNotPresent
+ env:
+ - name: SYSTEM_RESOURCES_NAMESPACE
+ value: kargo-system-resources
+ ports:
+ - containerPort: 9765
+ securityContext:
+ runAsNonRoot: true
+ runAsUser: 65532
+ allowPrivilegeEscalation: false
+ readOnlyRootFilesystem: true
+ capabilities:
+ drop:
+ - ALL
+ resources:
+ requests:
+ cpu: 50m
+ memory: 64Mi
+ limits:
+ cpu: 250m
+ memory: 128Mi
+ steps:
+ - kind: send-message
diff --git a/extended/plugins/send-message-python/requirements.txt b/extended/plugins/send-message-python/requirements.txt
new file mode 100644
index 0000000000..f62ce0c56d
--- /dev/null
+++ b/extended/plugins/send-message-python/requirements.txt
@@ -0,0 +1 @@
+PyYAML==6.0.3
diff --git a/extended/plugins/send-message-python/send_message_plugin/__init__.py b/extended/plugins/send-message-python/send_message_plugin/__init__.py
new file mode 100644
index 0000000000..7665e3cb95
--- /dev/null
+++ b/extended/plugins/send-message-python/send_message_plugin/__init__.py
@@ -0,0 +1,28 @@
+from .service import PluginServer
+from .http import serve
+from .models import (
+ AUTH_HEADER,
+ AUTH_TOKEN_PATH,
+ BEARER_PREFIX,
+ ChannelResource,
+ KubernetesClient,
+ RequestError,
+ SlackClient,
+ STEP_EXECUTE_PATH,
+)
+from .payloads import build_slack_payload, decode_xml_slack_payload
+
+__all__ = [
+ "AUTH_HEADER",
+ "AUTH_TOKEN_PATH",
+ "BEARER_PREFIX",
+ "ChannelResource",
+ "KubernetesClient",
+ "PluginServer",
+ "RequestError",
+ "STEP_EXECUTE_PATH",
+ "SlackClient",
+ "build_slack_payload",
+ "decode_xml_slack_payload",
+ "serve",
+]
diff --git a/extended/plugins/send-message-python/send_message_plugin/__main__.py b/extended/plugins/send-message-python/send_message_plugin/__main__.py
new file mode 100644
index 0000000000..50df33a385
--- /dev/null
+++ b/extended/plugins/send-message-python/send_message_plugin/__main__.py
@@ -0,0 +1,9 @@
+from .http import serve
+
+
+def main() -> None:
+ serve()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/extended/plugins/send-message-python/send_message_plugin/http.py b/extended/plugins/send-message-python/send_message_plugin/http.py
new file mode 100644
index 0000000000..c360fa55b9
--- /dev/null
+++ b/extended/plugins/send-message-python/send_message_plugin/http.py
@@ -0,0 +1,55 @@
+from __future__ import annotations
+
+import json
+import os
+from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
+from typing import Any
+
+from .service import PluginServer
+
+
+class RequestHandler(BaseHTTPRequestHandler):
+ plugin_server: PluginServer
+
+ def do_POST(self) -> None: # noqa: N802
+ self._handle()
+
+ def do_GET(self) -> None: # noqa: N802
+ self._handle()
+
+ def log_message(self, format: str, *args: Any) -> None:
+ return
+
+ def _handle(self) -> None:
+ content_length = int(self.headers.get("Content-Length", "0"))
+ body = self.rfile.read(content_length) if content_length else b""
+ status_code, response = self.plugin_server.handle(
+ self.command,
+ self.path,
+ {key: value for key, value in self.headers.items()},
+ body,
+ )
+ self.send_response(status_code)
+ if response is None:
+ self.end_headers()
+ return
+ data = json.dumps(response).encode("utf-8")
+ self.send_header("Content-Type", "application/json")
+ self.send_header("Content-Length", str(len(data)))
+ self.end_headers()
+ self.wfile.write(data)
+
+
+def make_handler(plugin_server: PluginServer) -> type[RequestHandler]:
+ class Handler(RequestHandler):
+ pass
+
+ Handler.plugin_server = plugin_server
+ return Handler
+
+
+def serve() -> None:
+ port = int(os.environ.get("PORT", "9765"))
+ address = os.environ.get("HOST", "0.0.0.0")
+ server = ThreadingHTTPServer((address, port), make_handler(PluginServer()))
+ server.serve_forever()
diff --git a/extended/plugins/send-message-python/send_message_plugin/kubernetes.py b/extended/plugins/send-message-python/send_message_plugin/kubernetes.py
new file mode 100644
index 0000000000..1689221e51
--- /dev/null
+++ b/extended/plugins/send-message-python/send_message_plugin/kubernetes.py
@@ -0,0 +1,103 @@
+from __future__ import annotations
+
+import base64
+import json
+import os
+import ssl
+import urllib.error
+import urllib.parse
+import urllib.request
+from http import HTTPStatus
+from typing import Any
+
+from .models import (
+ AUTH_HEADER,
+ BEARER_PREFIX,
+ DEFAULT_KUBERNETES_URL,
+ ChannelResource,
+ SERVICE_ACCOUNT_CA,
+ SERVICE_ACCOUNT_TOKEN,
+)
+from .util import as_dict, deep_get, string_value
+
+
+class InClusterKubernetesClient:
+ def __init__(self) -> None:
+ self.token = SERVICE_ACCOUNT_TOKEN.read_text(encoding="utf-8").strip()
+ self.base_url = kubernetes_base_url()
+ self.ssl_context = ssl.create_default_context(cafile=str(SERVICE_ACCOUNT_CA))
+
+ def get_message_channel(self, namespace: str, name: str) -> ChannelResource:
+ response = self._get_json(
+ "/apis/ee.kargo.akuity.io/v1alpha1/namespaces/"
+ f"{quote(namespace)}/messagechannels/{quote(name)}"
+ )
+ return ChannelResource(
+ secret_name=string_value(deep_get(response, "spec", "secretRef", "name")),
+ slack_channel_id=string_value(deep_get(response, "spec", "slack", "channelID")),
+ resource_kind="MessageChannel",
+ resource_name=string_value(deep_get(response, "metadata", "name")),
+ resource_namespace=string_value(deep_get(response, "metadata", "namespace")),
+ )
+
+ def get_cluster_message_channel(self, name: str) -> ChannelResource:
+ response = self._get_json(
+ "/apis/ee.kargo.akuity.io/v1alpha1/clustermessagechannels/"
+ f"{quote(name)}"
+ )
+ return ChannelResource(
+ secret_name=string_value(deep_get(response, "spec", "secretRef", "name")),
+ slack_channel_id=string_value(deep_get(response, "spec", "slack", "channelID")),
+ resource_kind="ClusterMessageChannel",
+ resource_name=string_value(deep_get(response, "metadata", "name")),
+ )
+
+ def get_secret(self, namespace: str, name: str) -> dict[str, str]:
+ response = self._get_json(
+ f"/api/v1/namespaces/{quote(namespace)}/secrets/{quote(name)}"
+ )
+ decoded: dict[str, str] = {}
+ for key, value in as_dict(response.get("data")).items():
+ try:
+ raw = base64.b64decode(string_value(value))
+ except Exception as exc:
+ raise RuntimeError(f'error decoding secret key "{key}": {exc}') from exc
+ decoded[str(key)] = raw.decode("utf-8")
+ return decoded
+
+ def _get_json(self, path: str) -> dict[str, Any]:
+ request = urllib.request.Request(
+ urllib.parse.urljoin(self.base_url.rstrip("/") + "/", path.lstrip("/")),
+ method="GET",
+ headers={AUTH_HEADER: f"{BEARER_PREFIX}{self.token}"},
+ )
+ try:
+ with urllib.request.urlopen(
+ request,
+ context=self.ssl_context,
+ timeout=30,
+ ) as response:
+ return json.loads(response.read().decode("utf-8"))
+ except urllib.error.HTTPError as exc:
+ body = exc.read().decode("utf-8", errors="replace").strip()
+ if exc.code == HTTPStatus.NOT_FOUND:
+ raise RuntimeError(f"resource not found at {path}") from exc
+ raise RuntimeError(
+ f"Kubernetes API error {exc.code} {exc.reason}: {body}"
+ ) from exc
+
+
+def kubernetes_base_url() -> str:
+ host = os.environ.get("KUBERNETES_SERVICE_HOST", "").strip()
+ if not host:
+ return DEFAULT_KUBERNETES_URL
+ port = (
+ os.environ.get("KUBERNETES_SERVICE_PORT_HTTPS", "").strip()
+ or os.environ.get("KUBERNETES_SERVICE_PORT", "").strip()
+ or "443"
+ )
+ return f"https://{host}:{port}"
+
+
+def quote(value: str) -> str:
+ return urllib.parse.quote(value, safe="")
diff --git a/extended/plugins/send-message-python/send_message_plugin/models.py b/extended/plugins/send-message-python/send_message_plugin/models.py
new file mode 100644
index 0000000000..d6d8b8270d
--- /dev/null
+++ b/extended/plugins/send-message-python/send_message_plugin/models.py
@@ -0,0 +1,56 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any, Protocol
+
+STEP_EXECUTE_PATH = "/api/v1/step.execute"
+AUTH_HEADER = "Authorization"
+BEARER_PREFIX = "Bearer "
+
+AUTH_TOKEN_PATH = Path("/var/run/kargo/token")
+SERVICE_ACCOUNT_DIR = Path("/var/run/secrets/kubernetes.io/serviceaccount")
+SERVICE_ACCOUNT_TOKEN = SERVICE_ACCOUNT_DIR / "token"
+SERVICE_ACCOUNT_CA = SERVICE_ACCOUNT_DIR / "ca.crt"
+
+DEFAULT_SYSTEM_NAMESPACE = "kargo-system-resources"
+DEFAULT_KUBERNETES_URL = "https://kubernetes.default.svc"
+DEFAULT_SLACK_API_BASE_URL = "https://slack.com/api"
+
+
+class RequestError(RuntimeError):
+ pass
+
+
+@dataclass
+class ChannelResource:
+ secret_name: str
+ slack_channel_id: str
+ resource_kind: str
+ resource_name: str
+ resource_namespace: str = ""
+
+
+class KubernetesClient(Protocol):
+ def get_message_channel(
+ self,
+ namespace: str,
+ name: str,
+ ) -> ChannelResource:
+ ...
+
+ def get_cluster_message_channel(self, name: str) -> ChannelResource:
+ ...
+
+ def get_secret(self, namespace: str, name: str) -> dict[str, str]:
+ ...
+
+
+class SlackClient(Protocol):
+ def post_message(
+ self,
+ api_base_url: str,
+ token: str,
+ payload: dict[str, Any],
+ ) -> dict[str, Any]:
+ ...
diff --git a/extended/plugins/send-message-python/send_message_plugin/payloads.py b/extended/plugins/send-message-python/send_message_plugin/payloads.py
new file mode 100644
index 0000000000..8bb132adb6
--- /dev/null
+++ b/extended/plugins/send-message-python/send_message_plugin/payloads.py
@@ -0,0 +1,111 @@
+from __future__ import annotations
+
+import json
+import xml.etree.ElementTree as ET
+from typing import Any
+
+import yaml
+
+from .models import ChannelResource, RequestError
+from .util import as_dict, string_value
+
+
+def build_slack_payload(
+ config: dict[str, Any],
+ channel: ChannelResource,
+) -> tuple[dict[str, Any], str]:
+ encoding_type = string_value(config.get("encodingType"))
+ slack_config = as_dict(config.get("slack"))
+
+ if not encoding_type:
+ channel_id = string_value(slack_config.get("channelID")) or channel.slack_channel_id
+ if not channel_id:
+ raise RequestError(
+ f'{channel.resource_kind} "{channel.resource_name}" does not define '
+ "spec.slack.channelID and config.slack.channelID is empty"
+ )
+ payload: dict[str, Any] = {
+ "channel": channel_id,
+ "text": string_value(config.get("message")),
+ }
+ thread_ts = string_value(slack_config.get("threadTS"))
+ if thread_ts:
+ payload["thread_ts"] = thread_ts
+ return payload, thread_ts
+
+ payload = decode_encoded_payload(encoding_type, string_value(config.get("message")))
+ if "channel" not in payload:
+ if not channel.slack_channel_id:
+ raise RequestError(
+ f'{channel.resource_kind} "{channel.resource_name}" does not define '
+ "spec.slack.channelID"
+ )
+ payload["channel"] = channel.slack_channel_id
+ return payload, string_value(payload.get("thread_ts"))
+
+
+def decode_encoded_payload(
+ encoding_type: str,
+ message: str,
+) -> dict[str, Any]:
+ if encoding_type == "json":
+ try:
+ payload = json.loads(message)
+ except json.JSONDecodeError as exc:
+ raise RequestError(f"error decoding JSON Slack payload: {exc}") from exc
+ elif encoding_type == "yaml":
+ try:
+ payload = yaml.safe_load(message)
+ except yaml.YAMLError as exc:
+ raise RequestError(f"error decoding YAML Slack payload: {exc}") from exc
+ elif encoding_type == "xml":
+ payload = decode_xml_slack_payload(message)
+ else:
+ raise RequestError(f'unsupported encodingType "{encoding_type}"')
+
+ if not isinstance(payload, dict):
+ raise RequestError("Slack payload must decode to an object")
+ return payload
+
+
+def decode_xml_slack_payload(message: str) -> dict[str, Any]:
+ try:
+ root = ET.fromstring(message)
+ except ET.ParseError as exc:
+ raise RequestError(f"error decoding XML Slack payload: {exc}") from exc
+ payload = xml_node_to_object(root)
+ if not isinstance(payload, dict):
+ raise RequestError("Slack payload must decode to an object")
+ return payload
+
+
+def xml_node_to_object(node: ET.Element) -> dict[str, Any]:
+ payload: dict[str, Any] = dict(node.attrib)
+ for child in list(node):
+ append_xml_value(payload, child.tag, xml_node_value(child))
+
+ text = normalized_text(node.text)
+ if text:
+ payload["#text" if payload else "text"] = text
+ return payload
+
+
+def xml_node_value(node: ET.Element) -> Any:
+ if not node.attrib and not list(node):
+ return normalized_text(node.text)
+ return xml_node_to_object(node)
+
+
+def append_xml_value(payload: dict[str, Any], key: str, value: Any) -> None:
+ if key not in payload:
+ payload[key] = value
+ return
+ existing = payload[key]
+ if isinstance(existing, list):
+ existing.append(value)
+ return
+ payload[key] = [existing, value]
+
+
+def normalized_text(value: str | None) -> str:
+ return (value or "").strip()
diff --git a/extended/plugins/send-message-python/send_message_plugin/service.py b/extended/plugins/send-message-python/send_message_plugin/service.py
new file mode 100644
index 0000000000..3967d14e72
--- /dev/null
+++ b/extended/plugins/send-message-python/send_message_plugin/service.py
@@ -0,0 +1,172 @@
+from __future__ import annotations
+
+import json
+import os
+from pathlib import Path
+from typing import Any
+
+from .kubernetes import InClusterKubernetesClient
+from .models import (
+ AUTH_HEADER,
+ AUTH_TOKEN_PATH,
+ BEARER_PREFIX,
+ DEFAULT_SLACK_API_BASE_URL,
+ DEFAULT_SYSTEM_NAMESPACE,
+ ChannelResource,
+ KubernetesClient,
+ RequestError,
+ SlackClient,
+)
+from .payloads import build_slack_payload
+from .slack import RealSlackClient
+from .util import as_dict, string_value
+
+
+class PluginServer:
+ def __init__(
+ self,
+ *,
+ expected_token_path: Path | None = None,
+ system_resources_namespace: str | None = None,
+ slack_api_base_url: str | None = None,
+ kube_client: KubernetesClient | None = None,
+ slack_client: SlackClient | None = None,
+ ) -> None:
+ self.expected_token_path = expected_token_path or AUTH_TOKEN_PATH
+ self.system_resources_namespace = (
+ system_resources_namespace
+ or os.environ.get("SYSTEM_RESOURCES_NAMESPACE", "").strip()
+ or DEFAULT_SYSTEM_NAMESPACE
+ )
+ self.slack_api_base_url = (
+ slack_api_base_url
+ or os.environ.get("SLACK_API_BASE_URL", "").strip()
+ or DEFAULT_SLACK_API_BASE_URL
+ )
+ self.kube_client = kube_client or InClusterKubernetesClient()
+ self.slack_client = slack_client or RealSlackClient()
+
+ def handle(
+ self,
+ method: str,
+ path: str,
+ headers: dict[str, str],
+ body: bytes,
+ ) -> tuple[int, dict[str, Any] | None]:
+ from http import HTTPStatus
+ from .models import STEP_EXECUTE_PATH
+
+ if path != STEP_EXECUTE_PATH:
+ return HTTPStatus.NOT_FOUND, None
+ if method != "POST":
+ return HTTPStatus.METHOD_NOT_ALLOWED, None
+
+ try:
+ self.authorize(headers.get(AUTH_HEADER, ""))
+ except Exception as exc: # pragma: no cover
+ return HTTPStatus.FORBIDDEN, errored_response(str(exc))
+
+ try:
+ request = json.loads(body.decode("utf-8"))
+ except Exception as exc:
+ return HTTPStatus.BAD_REQUEST, {
+ "status": "Errored",
+ "message": "invalid request body",
+ "error": str(exc),
+ "terminal": True,
+ }
+
+ return HTTPStatus.OK, self.execute(request)
+
+ def execute(self, request: dict[str, Any]) -> dict[str, Any]:
+ step = as_dict(request.get("step"))
+ step_kind = string_value(step.get("kind"))
+ if step_kind != "send-message":
+ return errored_response(f'unsupported step kind "{step_kind}"')
+
+ try:
+ config = as_dict(step.get("config"))
+ channel, secret_namespace = self.lookup_channel(request, config)
+ token = self.read_slack_token(secret_namespace, channel.secret_name)
+ payload, output_thread_ts = build_slack_payload(config, channel)
+ slack_response = self.slack_client.post_message(
+ self.slack_api_base_url,
+ token,
+ payload,
+ )
+ if not slack_response.get("ok"):
+ raise RuntimeError(f'Slack API error: {slack_response.get("error", "")}')
+ if not output_thread_ts:
+ output_thread_ts = string_value(slack_response.get("ts"))
+ return {
+ "status": "Succeeded",
+ "output": {
+ "slack": {
+ "threadTS": output_thread_ts,
+ },
+ },
+ }
+ except RequestError as exc:
+ return errored_response(str(exc))
+ except Exception as exc:
+ return failed_response(str(exc))
+
+ def lookup_channel(
+ self,
+ request: dict[str, Any],
+ config: dict[str, Any],
+ ) -> tuple[ChannelResource, str]:
+ channel_ref = as_dict(config.get("channel"))
+ channel_kind = string_value(channel_ref.get("kind"))
+ channel_name = string_value(channel_ref.get("name"))
+ project = string_value(as_dict(request.get("context")).get("project"))
+
+ if channel_kind == "MessageChannel":
+ if not project:
+ raise RequestError("step context project is required for MessageChannel")
+ return self.kube_client.get_message_channel(project, channel_name), project
+
+ if channel_kind == "ClusterMessageChannel":
+ return (
+ self.kube_client.get_cluster_message_channel(channel_name),
+ self.system_resources_namespace,
+ )
+
+ raise RequestError(f'unsupported channel kind "{channel_kind}"')
+
+ def read_slack_token(self, namespace: str, secret_name: str) -> str:
+ try:
+ secret = self.kube_client.get_secret(namespace, secret_name)
+ except Exception as exc:
+ raise RuntimeError(f"error getting Slack Secret: {exc}") from exc
+ token = string_value(secret.get("apiKey"))
+ if not token:
+ raise RuntimeError('Slack Secret is missing key "apiKey"')
+ return token
+
+ def authorize(self, header_value: str) -> None:
+ expected = self.expected_token_path.read_text(encoding="utf-8").strip()
+ value = header_value.strip()
+ if not value.startswith(BEARER_PREFIX):
+ raise RuntimeError("missing bearer token")
+ received = value[len(BEARER_PREFIX) :].strip()
+ if received != expected:
+ raise RuntimeError("invalid bearer token")
+
+
+def failed_response(message: str) -> dict[str, Any]:
+ return {
+ "status": "Failed",
+ "message": message,
+ "error": message,
+ "terminal": True,
+ }
+
+
+def errored_response(message: str) -> dict[str, Any]:
+ return {
+ "status": "Errored",
+ "message": message,
+ "error": message,
+ "terminal": True,
+ }
diff --git a/extended/plugins/send-message-python/send_message_plugin/slack.py b/extended/plugins/send-message-python/send_message_plugin/slack.py
new file mode 100644
index 0000000000..2cd3439ea1
--- /dev/null
+++ b/extended/plugins/send-message-python/send_message_plugin/slack.py
@@ -0,0 +1,45 @@
+from __future__ import annotations
+
+import json
+import urllib.error
+import urllib.parse
+import urllib.request
+from http import HTTPStatus
+from typing import Any
+
+from .models import AUTH_HEADER, BEARER_PREFIX
+
+
+class RealSlackClient:
+ def post_message(
+ self,
+ api_base_url: str,
+ token: str,
+ payload: dict[str, Any],
+ ) -> dict[str, Any]:
+ request = urllib.request.Request(
+ urllib.parse.urljoin(api_base_url.rstrip("/") + "/", "chat.postMessage"),
+ method="POST",
+ headers={
+ AUTH_HEADER: f"{BEARER_PREFIX}{token}",
+ "Content-Type": "application/json",
+ },
+ data=json.dumps(payload).encode("utf-8"),
+ )
+ try:
+ with urllib.request.urlopen(request, timeout=30) as response:
+ result = json.loads(response.read().decode("utf-8"))
+ if response.status >= HTTPStatus.BAD_REQUEST and not result.get("error"):
+ result["ok"] = False
+ result["error"] = response.reason
+ return result
+ except urllib.error.HTTPError as exc:
+ body = exc.read().decode("utf-8", errors="replace")
+ try:
+ result = json.loads(body) if body else {}
+ except json.JSONDecodeError:
+ result = {}
+ if "error" not in result:
+ result["error"] = f"{exc.code} {exc.reason}"
+ result["ok"] = False
+ return result
diff --git a/extended/plugins/send-message-python/send_message_plugin/util.py b/extended/plugins/send-message-python/send_message_plugin/util.py
new file mode 100644
index 0000000000..80a9e8aea6
--- /dev/null
+++ b/extended/plugins/send-message-python/send_message_plugin/util.py
@@ -0,0 +1,20 @@
+from __future__ import annotations
+
+from typing import Any
+
+
+def as_dict(value: Any) -> dict[str, Any]:
+ return value if isinstance(value, dict) else {}
+
+
+def deep_get(value: Any, *keys: str) -> Any:
+ current = value
+ for key in keys:
+ if not isinstance(current, dict):
+ return None
+ current = current.get(key)
+ return current
+
+
+def string_value(value: Any) -> str:
+ return value if isinstance(value, str) else ""
diff --git a/extended/plugins/send-message-python/smoke/README.md b/extended/plugins/send-message-python/smoke/README.md
new file mode 100644
index 0000000000..b7dc83eaed
--- /dev/null
+++ b/extended/plugins/send-message-python/smoke/README.md
@@ -0,0 +1,20 @@
+# Local Smoke Notes
+
+- Primary smoke entrypoint:
+ - `extended/plugins/send-message-python/smoke/smoke_test.py`
+- The script assumes a working Kargo cluster and CLI are already available.
+- Required env:
+ - `SEND_MESSAGE_SMOKE_PROJECT`
+ - `SEND_MESSAGE_SMOKE_WAREHOUSE`
+ - `SEND_MESSAGE_SMOKE_FREIGHT_NAME`
+ - `SEND_MESSAGE_SMOKE_SLACK_API_KEY`
+ - `SEND_MESSAGE_SMOKE_CHANNEL_ID`
+- Optional env:
+ - `KARGO_BIN`
+ - `KARGO_FLAGS`
+ - `SEND_MESSAGE_SMOKE_SYSTEM_RESOURCES_NAMESPACE`
+ - `KUBECTL_BIN`
+ - `DOCKER_BIN`
+ - `KIND_BIN`
+- No committed file in this subtree contains a real token value or local token
+ source.
diff --git a/extended/plugins/send-message-python/smoke/lib.py b/extended/plugins/send-message-python/smoke/lib.py
new file mode 100644
index 0000000000..645d66e43f
--- /dev/null
+++ b/extended/plugins/send-message-python/smoke/lib.py
@@ -0,0 +1,398 @@
+from __future__ import annotations
+
+import json
+import os
+import shlex
+import subprocess
+import time
+from pathlib import Path
+
+
+class SmokeEnv:
+ def __init__(self) -> None:
+ self.project = os.environ.get("SEND_MESSAGE_SMOKE_PROJECT", "")
+ self.warehouse = os.environ.get("SEND_MESSAGE_SMOKE_WAREHOUSE", "")
+ self.freight_name = os.environ.get("SEND_MESSAGE_SMOKE_FREIGHT_NAME", "")
+ self.slack_api_key = os.environ.get("SEND_MESSAGE_SMOKE_SLACK_API_KEY", "")
+ self.channel_id = os.environ.get("SEND_MESSAGE_SMOKE_CHANNEL_ID", "")
+ self.system_namespace = os.environ.get(
+ "SEND_MESSAGE_SMOKE_SYSTEM_RESOURCES_NAMESPACE",
+ "kargo-system-resources",
+ )
+ self.secret_name = os.environ.get(
+ "SEND_MESSAGE_SMOKE_SECRET_NAME",
+ "send-message-slack-token",
+ )
+ self.channel_name = os.environ.get(
+ "SEND_MESSAGE_SMOKE_CHANNEL_NAME",
+ "send-message-smoke",
+ )
+ self.cluster_role_name = "send-message-step-plugin-reader"
+ self.kargo_bin = os.environ.get("KARGO_BIN", "kargo")
+ self.kubectl_bin = os.environ.get("KUBECTL_BIN", "kubectl")
+ self.docker_bin = os.environ.get("DOCKER_BIN", "docker")
+ self.kind_bin = os.environ.get("KIND_BIN", "kind")
+ self.kargo_flags = shlex.split(os.environ.get("KARGO_FLAGS", ""))
+
+ def require(self, *names: str) -> None:
+ for name in names:
+ if not os.environ.get(name):
+ fail(f"{name} is required")
+
+ def run(self, *args: str, cwd: Path | None = None) -> None:
+ log("INFO", " ".join(args))
+ subprocess.run(args, cwd=str(cwd) if cwd else None, check=True)
+
+ def capture(self, *args: str, cwd: Path | None = None) -> str:
+ completed = subprocess.run(
+ args,
+ cwd=str(cwd) if cwd else None,
+ check=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ text=True,
+ )
+ return completed.stdout
+
+
+class SmokeResources:
+ def __init__(self, env: SmokeEnv, plugin_dir: Path, tmp_dir: Path) -> None:
+ self.env = env
+ self.plugin_dir = plugin_dir
+ self.tmp_dir = tmp_dir
+ self.rendered_plugin_dir = tmp_dir / "plugin"
+ self.configmap_path: Path | None = None
+ self.cluster_role_binding_name = f"send-message-step-plugin-reader-{env.project}"
+ self.stage_name = f"smsg-stepplugin-{int(time.time())}"
+ self.promotion_name = f"{self.stage_name}-promotion"
+ self.message_text = (
+ f"send-message PYTHON stepplugin smoke {self.stage_name}"
+ )
+
+ def write_plugin_dir(self, image_tag: str) -> None:
+ self.rendered_plugin_dir.mkdir(parents=True, exist_ok=True)
+ rendered = (self.plugin_dir / "plugin.yaml").read_text(encoding="utf-8")
+ rendered = rendered.replace(
+ "namespace: kargo-system-resources",
+ f"namespace: {self.env.system_namespace}",
+ )
+ rendered = rendered.replace(
+ "image: send-message-step-plugin-python:dev",
+ f"image: {image_tag}",
+ )
+ rendered = rendered.replace(
+ "value: kargo-system-resources",
+ f"value: {self.env.system_namespace}",
+ )
+ (self.rendered_plugin_dir / "plugin.yaml").write_text(rendered, encoding="utf-8")
+
+ def build_image(self, image_tag: str) -> None:
+ self.env.run(self.env.docker_bin, "build", "-t", image_tag, str(self.plugin_dir))
+
+ def load_image(self, kind_name: str, image_tag: str) -> None:
+ self.env.run(
+ self.env.kind_bin,
+ "load",
+ "docker-image",
+ "--name",
+ kind_name,
+ image_tag,
+ )
+
+ def build_configmap(self) -> None:
+ self.env.run(
+ self.env.kargo_bin,
+ "step-plugin",
+ "build",
+ ".",
+ cwd=self.rendered_plugin_dir,
+ )
+ matches = sorted(self.rendered_plugin_dir.glob("*-configmap.yaml"))
+ if len(matches) != 1:
+ fail(
+ "expected exactly one rendered StepPlugin ConfigMap, got "
+ f"{[path.name for path in matches]}"
+ )
+ self.configmap_path = matches[0]
+
+ def install_crds(self) -> None:
+ self.apply_file(self.plugin_dir / "manifests" / "crds.yaml")
+
+ def install_rbac(self) -> None:
+ self.apply_file(self.plugin_dir / "manifests" / "rbac.yaml")
+
+ def bind_cluster_role(self) -> None:
+ binding_path = self.tmp_dir / "clusterrolebinding.yaml"
+ binding_path.write_text(
+ "\n".join(
+ [
+ "apiVersion: rbac.authorization.k8s.io/v1",
+ "kind: ClusterRoleBinding",
+ "metadata:",
+ f" name: {self.cluster_role_binding_name}",
+ "subjects:",
+ "- kind: ServiceAccount",
+ " name: default",
+ f" namespace: {self.env.project}",
+ "roleRef:",
+ " apiGroup: rbac.authorization.k8s.io",
+ " kind: ClusterRole",
+ f" name: {self.env.cluster_role_name}",
+ "",
+ ]
+ ),
+ encoding="utf-8",
+ )
+ self.apply_file(binding_path)
+
+ def install_configmap(self) -> None:
+ if self.configmap_path is None:
+ fail("StepPlugin ConfigMap was not built")
+ self.apply_file(self.configmap_path)
+
+ def create_secret(self) -> None:
+ path = self.tmp_dir / "secret.yaml"
+ path.write_text(
+ "\n".join(
+ [
+ "apiVersion: v1",
+ "kind: Secret",
+ "metadata:",
+ f" name: {self.env.secret_name}",
+ f" namespace: {self.env.project}",
+ "type: Opaque",
+ "stringData:",
+ f" apiKey: {self.env.slack_api_key}",
+ "",
+ ]
+ ),
+ encoding="utf-8",
+ )
+ self.apply_file(path)
+
+ def create_channel(self) -> None:
+ path = self.tmp_dir / "messagechannel.yaml"
+ path.write_text(
+ "\n".join(
+ [
+ "apiVersion: ee.kargo.akuity.io/v1alpha1",
+ "kind: MessageChannel",
+ "metadata:",
+ f" name: {self.env.channel_name}",
+ f" namespace: {self.env.project}",
+ "spec:",
+ " secretRef:",
+ f" name: {self.env.secret_name}",
+ " slack:",
+ f" channelID: {self.env.channel_id}",
+ "",
+ ]
+ ),
+ encoding="utf-8",
+ )
+ self.apply_file(path)
+
+ def create_stage(self) -> None:
+ path = self.tmp_dir / "stage.yaml"
+ path.write_text(
+ "\n".join(
+ [
+ "apiVersion: kargo.akuity.io/v1alpha1",
+ "kind: Stage",
+ "metadata:",
+ f" name: {self.stage_name}",
+ f" namespace: {self.env.project}",
+ "spec:",
+ " requestedFreight:",
+ " - origin:",
+ " kind: Warehouse",
+ f" name: {self.env.warehouse}",
+ " sources:",
+ " direct: true",
+ " promotionTemplate:",
+ " spec:",
+ " steps:",
+ " - uses: send-message",
+ " config:",
+ " channel:",
+ " kind: MessageChannel",
+ f" name: {self.env.channel_name}",
+ f' message: "{self.message_text}"',
+ "",
+ ]
+ ),
+ encoding="utf-8",
+ )
+ self.env.run(self.env.kargo_bin, "apply", "-f", str(path), *self.env.kargo_flags)
+
+ def approve_freight(self) -> None:
+ self.env.run(
+ self.env.kargo_bin,
+ "approve",
+ f"--project={self.env.project}",
+ f"--freight={self.env.freight_name}",
+ f"--stage={self.stage_name}",
+ *self.env.kargo_flags,
+ )
+
+ def promote_freight(self) -> None:
+ path = self.tmp_dir / "promotion.yaml"
+ path.write_text(
+ "\n".join(
+ [
+ "apiVersion: kargo.akuity.io/v1alpha1",
+ "kind: Promotion",
+ "metadata:",
+ f" name: {self.promotion_name}",
+ f" namespace: {self.env.project}",
+ "spec:",
+ f" stage: {self.stage_name}",
+ f" freight: {self.env.freight_name}",
+ " steps:",
+ " - uses: send-message",
+ " config:",
+ " channel:",
+ " kind: MessageChannel",
+ f" name: {self.env.channel_name}",
+ f' message: "{self.message_text}"',
+ "",
+ ]
+ ),
+ encoding="utf-8",
+ )
+ self.apply_file(path)
+
+ def wait_for_success(self) -> None:
+ deadline = time.time() + 180
+ last_phase = ""
+ last_promotion_name = ""
+ last_thread_ts = ""
+
+ while time.time() < deadline:
+ stdout = self.env.capture(
+ self.env.kubectl_bin,
+ "get",
+ "promotion.kargo.akuity.io",
+ "-n",
+ self.env.project,
+ "-o",
+ "json",
+ )
+ promotion_name, phase, thread_ts = find_latest_promotion(stdout, self.stage_name)
+ if promotion_name:
+ last_promotion_name = promotion_name
+ last_phase = phase
+ last_thread_ts = thread_ts
+
+ if phase == "Succeeded":
+ if not thread_ts:
+ fail("send-message smoke did not produce slack.threadTS output")
+ return
+ if phase in {"Failed", "Errored", "Aborted"}:
+ fail(
+ "send-message smoke promotion reached terminal phase "
+ f"{phase} ({promotion_name})"
+ )
+ time.sleep(2)
+
+ fail(
+ "send-message smoke promotion did not succeed in time"
+ f" (last promotion={last_promotion_name}, last phase={last_phase}, "
+ f"last threadTS={last_thread_ts})"
+ )
+
+ def cleanup(self) -> None:
+ delete(
+ self.env.kubectl_bin,
+ "promotion.kargo.akuity.io",
+ self.promotion_name,
+ "-n",
+ self.env.project,
+ )
+ delete(self.env.kubectl_bin, "stage.kargo.akuity.io", self.stage_name, "-n", self.env.project)
+ delete(
+ self.env.kubectl_bin,
+ "messagechannel.ee.kargo.akuity.io",
+ self.env.channel_name,
+ "-n",
+ self.env.project,
+ )
+ delete(self.env.kubectl_bin, "secret", self.env.secret_name, "-n", self.env.project)
+ if self.configmap_path is not None:
+ delete(
+ self.env.kubectl_bin,
+ "configmap",
+ self.configmap_path.stem.removesuffix("-configmap"),
+ "-n",
+ self.env.system_namespace,
+ )
+ delete(self.env.kubectl_bin, "clusterrolebinding", self.cluster_role_binding_name)
+ delete(self.env.kubectl_bin, "clusterrole", self.env.cluster_role_name)
+ subprocess.run(
+ [
+ self.env.kubectl_bin,
+ "delete",
+ "-f",
+ str(self.plugin_dir / "manifests" / "crds.yaml"),
+ "--ignore-not-found",
+ ],
+ check=False,
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+
+ def apply_file(self, path: Path) -> None:
+ self.env.run(self.env.kubectl_bin, "apply", "-f", str(path))
+
+
+def delete(command: str, resource: str, name: str, *extra: str) -> None:
+ subprocess.run(
+ [command, "delete", resource, name, *extra, "--ignore-not-found"],
+ check=False,
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+
+
+def find_latest_promotion(payload: str, stage_name: str) -> tuple[str, str, str]:
+ data = json.loads(payload)
+ items = [
+ item
+ for item in data.get("items", [])
+ if item.get("spec", {}).get("stage") == stage_name
+ ]
+ if not items:
+ return "", "", ""
+ items.sort(key=lambda item: item.get("metadata", {}).get("creationTimestamp", ""))
+ promotion = items[-1]
+ metadata = promotion.get("metadata", {})
+ status = promotion.get("status", {})
+ thread_ts = (
+ deep_get(status, "stepExecutionMetadata", 0, "output", "slack", "threadTS")
+ or deep_get(status, "state", "step-1", "slack", "threadTS")
+ or ""
+ )
+ return metadata.get("name", ""), status.get("phase", ""), thread_ts
+
+
+def deep_get(value: object, *path: object) -> str:
+ current = value
+ for key in path:
+ if isinstance(key, int):
+ if not isinstance(current, list) or key >= len(current):
+ return ""
+ current = current[key]
+ continue
+ if not isinstance(current, dict):
+ return ""
+ current = current.get(key)
+ return current if isinstance(current, str) else ""
+
+
+def log(level: str, message: str) -> None:
+ print(f"[{level}] {message}")
+
+
+def fail(message: str) -> None:
+ log("FAIL", message)
+ raise SystemExit(1)
diff --git a/extended/plugins/send-message-python/smoke/smoke_test.py b/extended/plugins/send-message-python/smoke/smoke_test.py
new file mode 100644
index 0000000000..bb1c9b5ef5
--- /dev/null
+++ b/extended/plugins/send-message-python/smoke/smoke_test.py
@@ -0,0 +1,64 @@
+#!/usr/bin/env python3
+
+from __future__ import annotations
+
+import shutil
+import sys
+import tempfile
+import time
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
+
+from smoke.lib import SmokeEnv, SmokeResources, fail, log
+
+
+def main() -> int:
+ plugin_dir = Path(__file__).resolve().parents[1]
+ env = SmokeEnv()
+ env.require(
+ "SEND_MESSAGE_SMOKE_PROJECT",
+ "SEND_MESSAGE_SMOKE_WAREHOUSE",
+ "SEND_MESSAGE_SMOKE_FREIGHT_NAME",
+ "SEND_MESSAGE_SMOKE_SLACK_API_KEY",
+ "SEND_MESSAGE_SMOKE_CHANNEL_ID",
+ )
+
+ current_context = env.capture(
+ env.kubectl_bin,
+ "config",
+ "current-context",
+ ).strip()
+ if not current_context.startswith("kind-"):
+ fail(f"send-message smoke requires a kind context, got: {current_context}")
+
+ kind_name = current_context.removeprefix("kind-")
+ image_tag = f"send-message-step-plugin-python:e2e-{int(time.time())}"
+ tmp_dir = Path(tempfile.mkdtemp(prefix="send-message-python-smoke-"))
+
+ resources = SmokeResources(env, plugin_dir, tmp_dir)
+ try:
+ resources.write_plugin_dir(image_tag)
+ resources.build_image(image_tag)
+ resources.load_image(kind_name, image_tag)
+ resources.build_configmap()
+ resources.install_crds()
+ resources.install_rbac()
+ resources.bind_cluster_role()
+ resources.install_configmap()
+ resources.create_secret()
+ resources.create_channel()
+ resources.create_stage()
+ resources.approve_freight()
+ resources.promote_freight()
+ resources.wait_for_success()
+ finally:
+ resources.cleanup()
+ shutil.rmtree(tmp_dir, ignore_errors=True)
+
+ log("PASS", "send-message StepPlugin smoke promotion finished successfully")
+ return 0
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/extended/plugins/send-message-python/tests/test_server.py b/extended/plugins/send-message-python/tests/test_server.py
new file mode 100644
index 0000000000..2e6709651b
--- /dev/null
+++ b/extended/plugins/send-message-python/tests/test_server.py
@@ -0,0 +1,580 @@
+from __future__ import annotations
+
+import json
+import sys
+import tempfile
+import unittest
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
+
+import send_message_plugin as server
+
+
+class FakeKubernetesClient:
+ def __init__(
+ self,
+ *,
+ message_channels: dict[tuple[str, str], server.ChannelResource] | None = None,
+ cluster_message_channels: dict[str, server.ChannelResource] | None = None,
+ secrets: dict[tuple[str, str], dict[str, str]] | None = None,
+ ) -> None:
+ self.message_channels = message_channels or {}
+ self.cluster_message_channels = cluster_message_channels or {}
+ self.secrets = secrets or {}
+
+ def get_message_channel(self, namespace: str, name: str) -> server.ChannelResource:
+ try:
+ return self.message_channels[(namespace, name)]
+ except KeyError as exc:
+ raise RuntimeError("message channel not found") from exc
+
+ def get_cluster_message_channel(self, name: str) -> server.ChannelResource:
+ try:
+ return self.cluster_message_channels[name]
+ except KeyError as exc:
+ raise RuntimeError("cluster message channel not found") from exc
+
+ def get_secret(self, namespace: str, name: str) -> dict[str, str]:
+ try:
+ return self.secrets[(namespace, name)]
+ except KeyError as exc:
+ raise RuntimeError("secret not found") from exc
+
+
+class FakeSlackClient:
+ def __init__(
+ self,
+ *,
+ response: dict[str, object] | None = None,
+ error: Exception | None = None,
+ ) -> None:
+ self.response = response or {"ok": True, "ts": "0"}
+ self.error = error
+ self.last_payload: dict[str, object] | None = None
+
+ def post_message(
+ self,
+ api_base_url: str,
+ token: str,
+ payload: dict[str, object],
+ ) -> dict[str, object]:
+ del api_base_url
+ del token
+ self.last_payload = payload
+ if self.error is not None:
+ raise self.error
+ return self.response
+
+
+class PluginServerTest(unittest.TestCase):
+ def setUp(self) -> None:
+ self.temp_dir = tempfile.TemporaryDirectory()
+ self.token_path = Path(self.temp_dir.name) / "token"
+ self.token_path.write_text("expected-token", encoding="utf-8")
+
+ def tearDown(self) -> None:
+ self.temp_dir.cleanup()
+
+ def new_server(
+ self,
+ *,
+ kube_client: server.KubernetesClient | None = None,
+ slack_client: server.SlackClient | None = None,
+ ) -> server.PluginServer:
+ return server.PluginServer(
+ expected_token_path=self.token_path,
+ system_resources_namespace="kargo-system-resources",
+ kube_client=kube_client or FakeKubernetesClient(),
+ slack_client=slack_client or FakeSlackClient(),
+ )
+
+ def test_handler_rejects_missing_bearer_token(self) -> None:
+ plugin = self.new_server()
+
+ status_code, response = plugin.handle(
+ "POST",
+ server.STEP_EXECUTE_PATH,
+ {},
+ json.dumps(minimal_request()).encode("utf-8"),
+ )
+
+ self.assertEqual(403, status_code)
+ self.assertEqual("Errored", response["status"])
+
+ def test_handler_rejects_invalid_bearer_token(self) -> None:
+ plugin = self.new_server()
+
+ status_code, response = plugin.handle(
+ "POST",
+ server.STEP_EXECUTE_PATH,
+ {server.AUTH_HEADER: f"{server.BEARER_PREFIX}wrong-token"},
+ json.dumps(minimal_request()).encode("utf-8"),
+ )
+
+ self.assertEqual(403, status_code)
+ self.assertEqual("Errored", response["status"])
+
+ def test_handler_rejects_invalid_request_body(self) -> None:
+ plugin = self.new_server()
+
+ status_code, response = plugin.handle(
+ "POST",
+ server.STEP_EXECUTE_PATH,
+ {server.AUTH_HEADER: f"{server.BEARER_PREFIX}expected-token"},
+ b"{not-json",
+ )
+
+ self.assertEqual(400, status_code)
+ self.assertEqual("Errored", response["status"])
+ self.assertEqual("invalid request body", response["message"])
+
+ def test_execute_uses_namespaced_message_channel(self) -> None:
+ kube_client = FakeKubernetesClient(
+ message_channels={
+ ("demo", "send"): server.ChannelResource(
+ secret_name="slack-token",
+ slack_channel_id="C123",
+ resource_kind="MessageChannel",
+ resource_name="send",
+ resource_namespace="demo",
+ ),
+ },
+ secrets={
+ ("demo", "slack-token"): {"apiKey": "xoxb-demo"},
+ },
+ )
+ slack_client = FakeSlackClient(response={"ok": True, "ts": "1712345678.000100"})
+ plugin = self.new_server(kube_client=kube_client, slack_client=slack_client)
+
+ response = execute_request(
+ plugin,
+ {
+ "context": {"project": "demo"},
+ "step": minimal_request()["step"],
+ },
+ )
+
+ self.assertEqual("Succeeded", response["status"])
+ self.assertEqual("1712345678.000100", response["output"]["slack"]["threadTS"])
+ self.assertEqual(
+ {
+ "channel": "C123",
+ "text": "hello from plugin",
+ },
+ slack_client.last_payload,
+ )
+
+ def test_execute_uses_cluster_message_channel(self) -> None:
+ kube_client = FakeKubernetesClient(
+ cluster_message_channels={
+ "send": server.ChannelResource(
+ secret_name="slack-token",
+ slack_channel_id="C777",
+ resource_kind="ClusterMessageChannel",
+ resource_name="send",
+ ),
+ },
+ secrets={
+ ("kargo-system-resources", "slack-token"): {"apiKey": "xoxb-demo"},
+ },
+ )
+ slack_client = FakeSlackClient(response={"ok": True, "ts": "1712345678.000200"})
+ plugin = self.new_server(kube_client=kube_client, slack_client=slack_client)
+
+ response = execute_request(
+ plugin,
+ {
+ "context": {"project": "demo"},
+ "step": {
+ "kind": "send-message",
+ "config": {
+ "channel": {
+ "kind": "ClusterMessageChannel",
+ "name": "send",
+ },
+ "message": "hello from plugin",
+ },
+ },
+ },
+ )
+
+ self.assertEqual("Succeeded", response["status"])
+ self.assertEqual(
+ {
+ "channel": "C777",
+ "text": "hello from plugin",
+ },
+ slack_client.last_payload,
+ )
+
+ def test_execute_honors_plaintext_slack_overrides(self) -> None:
+ plugin = self.new_server(
+ kube_client=FakeKubernetesClient(
+ message_channels={
+ ("demo", "send"): server.ChannelResource(
+ secret_name="slack-token",
+ slack_channel_id="C123",
+ resource_kind="MessageChannel",
+ resource_name="send",
+ resource_namespace="demo",
+ ),
+ },
+ secrets={
+ ("demo", "slack-token"): {"apiKey": "xoxb-demo"},
+ },
+ ),
+ slack_client=FakeSlackClient(response={"ok": True, "ts": "1712345678.000300"}),
+ )
+
+ request = minimal_request()
+ request["context"] = {"project": "demo"}
+ request["step"]["config"]["slack"] = {
+ "channelID": "C999",
+ "threadTS": "1700000000.000001",
+ }
+
+ response = execute_request(plugin, request)
+
+ self.assertEqual("Succeeded", response["status"])
+ self.assertEqual(
+ {
+ "channel": "C999",
+ "text": "hello from plugin",
+ "thread_ts": "1700000000.000001",
+ },
+ plugin.slack_client.last_payload,
+ )
+ self.assertEqual(
+ "1700000000.000001",
+ response["output"]["slack"]["threadTS"],
+ )
+
+ def test_execute_supports_json_encoding(self) -> None:
+ plugin = self.new_server(
+ kube_client=FakeKubernetesClient(
+ message_channels={
+ ("demo", "send"): server.ChannelResource(
+ secret_name="slack-token",
+ slack_channel_id="C123",
+ resource_kind="MessageChannel",
+ resource_name="send",
+ resource_namespace="demo",
+ ),
+ },
+ secrets={
+ ("demo", "slack-token"): {"apiKey": "xoxb-demo"},
+ },
+ ),
+ slack_client=FakeSlackClient(response={"ok": True, "ts": "1712345678.000400"}),
+ )
+
+ request = minimal_request()
+ request["context"] = {"project": "demo"}
+ request["step"]["config"]["encodingType"] = "json"
+ request["step"]["config"]["message"] = (
+ '{"text":"rich","blocks":[{"type":"section"}]}'
+ )
+
+ response = execute_request(plugin, request)
+
+ self.assertEqual("Succeeded", response["status"])
+ self.assertEqual(
+ {
+ "channel": "C123",
+ "text": "rich",
+ "blocks": [{"type": "section"}],
+ },
+ plugin.slack_client.last_payload,
+ )
+
+ def test_execute_supports_yaml_encoding(self) -> None:
+ plugin = self.new_server(
+ kube_client=FakeKubernetesClient(
+ message_channels={
+ ("demo", "send"): server.ChannelResource(
+ secret_name="slack-token",
+ slack_channel_id="C123",
+ resource_kind="MessageChannel",
+ resource_name="send",
+ resource_namespace="demo",
+ ),
+ },
+ secrets={
+ ("demo", "slack-token"): {"apiKey": "xoxb-demo"},
+ },
+ ),
+ slack_client=FakeSlackClient(response={"ok": True, "ts": "1712345678.000410"}),
+ )
+
+ request = minimal_request()
+ request["context"] = {"project": "demo"}
+ request["step"]["config"]["encodingType"] = "yaml"
+ request["step"]["config"]["message"] = (
+ "text: rich\nblocks:\n- type: section\n"
+ )
+
+ response = execute_request(plugin, request)
+
+ self.assertEqual("Succeeded", response["status"])
+ self.assertEqual(
+ {
+ "channel": "C123",
+ "text": "rich",
+ "blocks": [{"type": "section"}],
+ },
+ plugin.slack_client.last_payload,
+ )
+
+ def test_execute_returns_errored_for_bad_yaml(self) -> None:
+ plugin = self.new_server(
+ kube_client=FakeKubernetesClient(
+ message_channels={
+ ("demo", "send"): server.ChannelResource(
+ secret_name="slack-token",
+ slack_channel_id="C123",
+ resource_kind="MessageChannel",
+ resource_name="send",
+ resource_namespace="demo",
+ ),
+ },
+ secrets={
+ ("demo", "slack-token"): {"apiKey": "xoxb-demo"},
+ },
+ ),
+ slack_client=FakeSlackClient(),
+ )
+
+ request = minimal_request()
+ request["context"] = {"project": "demo"}
+ request["step"]["config"]["encodingType"] = "yaml"
+ request["step"]["config"]["message"] = "text: [oops"
+
+ response = execute_request(plugin, request)
+
+ self.assertEqual("Errored", response["status"])
+ self.assertIn("error decoding YAML Slack payload", response["error"])
+
+ def test_execute_supports_xml_encoding(self) -> None:
+ plugin = self.new_server(
+ kube_client=FakeKubernetesClient(
+ message_channels={
+ ("demo", "send"): server.ChannelResource(
+ secret_name="slack-token",
+ slack_channel_id="C123",
+ resource_kind="MessageChannel",
+ resource_name="send",
+ resource_namespace="demo",
+ ),
+ },
+ secrets={
+ ("demo", "slack-token"): {"apiKey": "xoxb-demo"},
+ },
+ ),
+ slack_client=FakeSlackClient(response={"ok": True, "ts": "1712345678.000500"}),
+ )
+
+ request = minimal_request()
+ request["context"] = {"project": "demo"}
+ request["step"]["config"]["encodingType"] = "xml"
+ request["step"]["config"]["message"] = """
+
+ rich
+ 1700000000.000003
+
+ """
+
+ response = execute_request(plugin, request)
+
+ self.assertEqual("Succeeded", response["status"])
+ self.assertEqual(
+ {
+ "channel": "C123",
+ "text": "rich",
+ "thread_ts": "1700000000.000003",
+ },
+ plugin.slack_client.last_payload,
+ )
+ self.assertEqual(
+ "1700000000.000003",
+ response["output"]["slack"]["threadTS"],
+ )
+
+ def test_execute_encoded_payload_ignores_slack_overrides(self) -> None:
+ plugin = self.new_server(
+ kube_client=FakeKubernetesClient(
+ message_channels={
+ ("demo", "send"): server.ChannelResource(
+ secret_name="slack-token",
+ slack_channel_id="C123",
+ resource_kind="MessageChannel",
+ resource_name="send",
+ resource_namespace="demo",
+ ),
+ },
+ secrets={
+ ("demo", "slack-token"): {"apiKey": "xoxb-demo"},
+ },
+ ),
+ slack_client=FakeSlackClient(response={"ok": True, "ts": "1712345678.000450"}),
+ )
+
+ request = minimal_request()
+ request["context"] = {"project": "demo"}
+ request["step"]["config"]["encodingType"] = "json"
+ request["step"]["config"]["message"] = (
+ '{"text":"rich","thread_ts":"1700000000.000002"}'
+ )
+ request["step"]["config"]["slack"] = {
+ "channelID": "C999",
+ "threadTS": "1700000000.000001",
+ }
+
+ response = execute_request(plugin, request)
+
+ self.assertEqual("Succeeded", response["status"])
+ self.assertEqual(
+ {
+ "channel": "C123",
+ "text": "rich",
+ "thread_ts": "1700000000.000002",
+ },
+ plugin.slack_client.last_payload,
+ )
+ self.assertEqual(
+ "1700000000.000002",
+ response["output"]["slack"]["threadTS"],
+ )
+
+ def test_execute_fails_when_secret_missing(self) -> None:
+ plugin = self.new_server(
+ kube_client=FakeKubernetesClient(
+ message_channels={
+ ("demo", "send"): server.ChannelResource(
+ secret_name="slack-token",
+ slack_channel_id="C123",
+ resource_kind="MessageChannel",
+ resource_name="send",
+ resource_namespace="demo",
+ ),
+ },
+ ),
+ slack_client=FakeSlackClient(),
+ )
+
+ response = execute_request(
+ plugin,
+ {
+ "context": {"project": "demo"},
+ "step": minimal_request()["step"],
+ },
+ )
+
+ self.assertEqual("Failed", response["status"])
+ self.assertIn("secret not found", response["error"])
+
+ def test_execute_fails_when_slack_errors(self) -> None:
+ plugin = self.new_server(
+ kube_client=FakeKubernetesClient(
+ message_channels={
+ ("demo", "send"): server.ChannelResource(
+ secret_name="slack-token",
+ slack_channel_id="C123",
+ resource_kind="MessageChannel",
+ resource_name="send",
+ resource_namespace="demo",
+ ),
+ },
+ secrets={
+ ("demo", "slack-token"): {"apiKey": "xoxb-demo"},
+ },
+ ),
+ slack_client=FakeSlackClient(
+ response={"ok": False, "error": "channel_not_found"}
+ ),
+ )
+
+ response = execute_request(
+ plugin,
+ {
+ "context": {"project": "demo"},
+ "step": minimal_request()["step"],
+ },
+ )
+
+ self.assertEqual("Failed", response["status"])
+ self.assertIn("channel_not_found", response["error"])
+
+ def test_xml_decode_shape(self) -> None:
+ payload = server.decode_xml_slack_payload(
+ """
+
+ rich
+
+ section
+
+ mrkdwn
+ *hello*
+
+
+
+ divider
+
+
+ """
+ )
+
+ self.assertEqual(
+ {
+ "icon_emoji": ":wave:",
+ "text": "rich",
+ "blocks": [
+ {
+ "type": "section",
+ "text": {
+ "type": "mrkdwn",
+ "text": "*hello*",
+ },
+ },
+ {
+ "type": "divider",
+ },
+ ],
+ },
+ payload,
+ )
+
+
+def execute_request(
+ plugin: server.PluginServer,
+ request: dict[str, object],
+) -> dict[str, object]:
+ status_code, response = plugin.handle(
+ "POST",
+ server.STEP_EXECUTE_PATH,
+ {server.AUTH_HEADER: f"{server.BEARER_PREFIX}expected-token"},
+ json.dumps(request).encode("utf-8"),
+ )
+ if status_code != 200:
+ raise AssertionError(f"unexpected status code {status_code}: {response}")
+ assert response is not None
+ return response
+
+
+def minimal_request() -> dict[str, object]:
+ return {
+ "step": {
+ "kind": "send-message",
+ "config": {
+ "channel": {
+ "kind": "MessageChannel",
+ "name": "send",
+ },
+ "message": "hello from plugin",
+ },
+ },
+ }
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/extended/tests/e2e_stepplugins.sh b/extended/tests/e2e_stepplugins.sh
index 075f9a67f3..5b14413fb6 100644
--- a/extended/tests/e2e_stepplugins.sh
+++ b/extended/tests/e2e_stepplugins.sh
@@ -4,6 +4,37 @@ STEPPLUGIN_TEST_STAGE=""
STEPPLUGIN_CONFIGMAP_NAME="mkdir-step-plugin"
STEPPLUGIN_PLUGIN_DIR=""
+create_stepplugin_smoke_promotion() {
+ local project="$1"
+ local freight_name="$2"
+ local stage_name="$3"
+ local promotion_name="stepplugin-smoke-promotion-$(date +%s)"
+ local promotion_path="/tmp/${promotion_name}.yaml"
+
+ cat > "$promotion_path" <