From ecae86a3df01f4da1186d3f592fdee3445dde416 Mon Sep 17 00:00:00 2001 From: Seth Nickell Date: Thu, 9 Apr 2026 23:44:23 -1000 Subject: [PATCH] feat: add send-message python plugin Signed-off-by: Seth Nickell --- .../checklist.python.md | 86 +++ .../plan.python.md | 119 ++++ .../plugins/send-message-python/Dockerfile | 10 + .../send-message-python/manifests/crds.yaml | 81 +++ .../send-message-python/manifests/rbac.yaml | 22 + .../plugins/send-message-python/plugin.yaml | 34 + .../send-message-python/requirements.txt | 1 + .../send_message_plugin/__init__.py | 28 + .../send_message_plugin/__main__.py | 9 + .../send_message_plugin/http.py | 55 ++ .../send_message_plugin/kubernetes.py | 103 ++++ .../send_message_plugin/models.py | 56 ++ .../send_message_plugin/payloads.py | 111 ++++ .../send_message_plugin/service.py | 172 ++++++ .../send_message_plugin/slack.py | 45 ++ .../send_message_plugin/util.py | 20 + .../send-message-python/smoke/README.md | 20 + .../plugins/send-message-python/smoke/lib.py | 398 ++++++++++++ .../send-message-python/smoke/smoke_test.py | 64 ++ .../send-message-python/tests/test_server.py | 580 ++++++++++++++++++ extended/tests/e2e_stepplugins.sh | 104 +++- 21 files changed, 2115 insertions(+), 3 deletions(-) create mode 100644 extended/docs/proposals/0004-send-message-step-plugin/checklist.python.md create mode 100644 extended/docs/proposals/0004-send-message-step-plugin/plan.python.md create mode 100644 extended/plugins/send-message-python/Dockerfile create mode 100644 extended/plugins/send-message-python/manifests/crds.yaml create mode 100644 extended/plugins/send-message-python/manifests/rbac.yaml create mode 100644 extended/plugins/send-message-python/plugin.yaml create mode 100644 extended/plugins/send-message-python/requirements.txt create mode 100644 extended/plugins/send-message-python/send_message_plugin/__init__.py create mode 100644 extended/plugins/send-message-python/send_message_plugin/__main__.py create mode 100644 extended/plugins/send-message-python/send_message_plugin/http.py create mode 100644 extended/plugins/send-message-python/send_message_plugin/kubernetes.py create mode 100644 extended/plugins/send-message-python/send_message_plugin/models.py create mode 100644 extended/plugins/send-message-python/send_message_plugin/payloads.py create mode 100644 extended/plugins/send-message-python/send_message_plugin/service.py create mode 100644 extended/plugins/send-message-python/send_message_plugin/slack.py create mode 100644 extended/plugins/send-message-python/send_message_plugin/util.py create mode 100644 extended/plugins/send-message-python/smoke/README.md create mode 100644 extended/plugins/send-message-python/smoke/lib.py create mode 100644 extended/plugins/send-message-python/smoke/smoke_test.py create mode 100644 extended/plugins/send-message-python/tests/test_server.py diff --git a/extended/docs/proposals/0004-send-message-step-plugin/checklist.python.md b/extended/docs/proposals/0004-send-message-step-plugin/checklist.python.md new file mode 100644 index 0000000000..a7e91da74c --- /dev/null +++ b/extended/docs/proposals/0004-send-message-step-plugin/checklist.python.md @@ -0,0 +1,86 @@ +# 0004 Python Checklist + +## Baseline + +- [x] Plugin work lives under `extended/plugins/send-message-python/`. +- [x] The subtree stands alone. +- [x] No committed Slack credential appears anywhere. +- [x] The plugin owns Kubernetes reads and Slack calls. + +## Phase 0: pin the contract + +- [x] Re-read `proposal.md`. +- [x] Re-read `spec.md`. +- [x] Re-read the `send-message` slice in proposal `0002`. +- [x] Keep scope to Slack only. + +## Phase 1: create the tiny repo + +- [x] Create `extended/plugins/send-message-python/`. +- [x] Add runtime source. +- [x] Add image build. +- [x] Add `plugin.yaml`. +- [x] Add CRD manifests. +- [x] Add RBAC manifests. +- [x] Add smoke assets. + +## Phase 2: implement the runtime + +- [x] Implement `POST /api/v1/step.execute`. +- [x] Enforce bearer auth from `/var/run/kargo/token`. +- [x] Read `MessageChannel`. +- [x] Read `ClusterMessageChannel`. +- [x] Read referenced `Secret`. +- [x] Send plaintext Slack payloads. +- [x] Send encoded Slack payloads. +- [x] Return `slack.threadTS`. + +## Phase 3: test the contract + +- [x] Add auth tests. +- [x] Add channel lookup tests. +- [x] Add Secret lookup tests. +- [x] Add plaintext payload tests. +- [x] Add encoded payload tests. +- [x] Add XML decode tests. +- [x] Add Slack failure tests. + +## Phase 4: smoke + +- [x] Add plugin-owned `smoke/smoke_test.py`. +- [x] Keep smoke orchestration in Python, not shell. +- [x] Build the image. +- [x] Load it into kind. +- [x] Install CRDs and RBAC. +- [x] Install StepPlugin `ConfigMap`. +- [x] Create local-only test Secret. +- [x] Create test `MessageChannel`. +- [x] Run a `Stage` with `uses: send-message`. +- [x] Assert `Succeeded`. +- [x] Assert non-empty `slack.threadTS`. + +## Phase 5: mandatory radical simplification pass 1 + +- [x] Ask "can I make this look easier by deleting a dependency?" +- [x] Ask "can I merge files without making the contract harder to read?" +- [x] Ask "am I using a framework just because it is familiar?" +- [x] Delete anything that fails those checks. + +## Phase 6: mandatory radical simplification pass 2 + +- [x] Re-run tests and smoke from a green tree. +- [x] Ask "can I make this radically simpler?" +- [x] Remove abstractions that only serve style. +- [x] Remove helpers that only save a few lines. +- [x] Stop only when the repo still reads like a small third-party plugin. + +Refactor pass notes: +- Split the runtime into small package files for: + - app flow + - HTTP entrypoint + - Kubernetes and Slack clients + - payload decoding and shaping +- Split smoke support out of the entrypoint into `smoke/lib.py`. +- Re-ran unit tests after refactor. +- Re-ran `py_compile` across all Python files after refactor. +- Re-ran isolated kind smoke through `extended/tests/e2e_stepplugins.sh`. diff --git a/extended/docs/proposals/0004-send-message-step-plugin/plan.python.md b/extended/docs/proposals/0004-send-message-step-plugin/plan.python.md new file mode 100644 index 0000000000..ef7d77c44e --- /dev/null +++ b/extended/docs/proposals/0004-send-message-step-plugin/plan.python.md @@ -0,0 +1,119 @@ +# 0004 Python Plan + +## Goal + +- Show the same `send-message` plugin contract in a form that looks easy to + write and easy to own. +- Keep all plugin-owned code under `extended/plugins/send-message-python/`. +- Keep the subtree standalone enough to behave like its own Git repo. +- Match the same Slack-only contract as `spec.md`. + +## Design Rule + +- Prefer directness over framework taste. +- Prefer a tiny dependency set over "proper" stacks. +- Prefer raw Kubernetes API reads over a large client library when that keeps + the repo smaller and clearer. +- Prefer the first-party Slack Python SDK only if it reduces code materially. +- If a dependency does not make the plugin look simpler to a third party, + delete it. + +## Runtime Shape + +- Runtime: + - Python +- Suggested layout: + - `extended/plugins/send-message-python/` + - `server.py` + - `smoke/smoke_test.py` + - `requirements.txt` + - `Dockerfile` + - `plugin.yaml` + - `manifests/` + - `smoke/` +- Suggested server shape: + - one small HTTP server + - one request parser + - one Kubernetes reader + - one Slack sender + +## Minimal Dependency Target + +- Acceptable: + - `slack_sdk` + - `PyYAML` +- Strong preference: + - stdlib HTTP server + - stdlib `json` + - stdlib `xml.etree.ElementTree` + - direct HTTPS to Kubernetes +- Avoid: + - large web frameworks + - large Kubernetes client stacks + - background workers + - async machinery unless it removes code + +## Behavior + +- Implement `POST /api/v1/step.execute`. +- Enforce bearer auth from `/var/run/kargo/token`. +- Read `MessageChannel` and `ClusterMessageChannel` directly from Kubernetes. +- Read referenced `Secret` directly from Kubernetes. +- Send Slack messages from the plugin. +- Support: + - plaintext + - `json` + - `yaml` + - `xml` +- Match the response contract in `spec.md`. + +## Tests + +- Keep tests inside the subtree. +- Prefer a small unit-test suite over a heavy harness. +- Cover: + - auth + - channel lookup + - Secret lookup + - plaintext payload shaping + - encoded payload shaping + - XML decode shape + - Slack error handling + +## Smoke + +- Own `smoke/smoke_test.py` inside the subtree. +- Assume Kargo already exists. +- Use Python for the smoke orchestration too. +- Build image, install manifests, create Secret and channel, run a Stage, + assert `Succeeded`, assert non-empty `slack.threadTS`. + +## Mandatory Simplify Passes + +- Simplify pass 1, before full smoke: + - ask "can I delete a dependency and still keep this clearer?" + - ask "can I collapse this into fewer files without hiding the contract?" +- Simplify pass 2, after green: + - ask "can I make this radically simpler?" + - remove any helper, abstraction, or library that does not make the plugin + look easier to write + +## Current Implementation Notes + +- Runtime now lives in a small package: + - `extended/plugins/send-message-python/send_message_plugin/` +- Smoke orchestration lives in: + - `extended/plugins/send-message-python/smoke/smoke_test.py` + - `extended/plugins/send-message-python/smoke/lib.py` +- Keep the dependency set to: + - stdlib + - `PyYAML` +- Do not add: + - Slack SDK + - Kubernetes Python client + - web framework +- Local validation currently proves: + - unit tests pass + - Python sources compile + - Docker image builds + - isolated kind smoke passes through `extended/tests/e2e_stepplugins.sh` diff --git a/extended/plugins/send-message-python/Dockerfile b/extended/plugins/send-message-python/Dockerfile new file mode 100644 index 0000000000..5ea5562800 --- /dev/null +++ b/extended/plugins/send-message-python/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.12-alpine + +WORKDIR /app + +COPY requirements.txt ./ +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +ENTRYPOINT ["python", "-m", "send_message_plugin"] diff --git a/extended/plugins/send-message-python/manifests/crds.yaml b/extended/plugins/send-message-python/manifests/crds.yaml new file mode 100644 index 0000000000..008f9931f8 --- /dev/null +++ b/extended/plugins/send-message-python/manifests/crds.yaml @@ -0,0 +1,81 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: messagechannels.ee.kargo.akuity.io +spec: + group: ee.kargo.akuity.io + names: + kind: MessageChannel + plural: messagechannels + singular: messagechannel + listKind: MessageChannelList + scope: Namespaced + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + properties: + spec: + type: object + properties: + secretRef: + type: object + properties: + name: + type: string + required: + - name + slack: + type: object + properties: + channelID: + type: string + required: + - channelID + required: + - secretRef + - slack +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: clustermessagechannels.ee.kargo.akuity.io +spec: + group: ee.kargo.akuity.io + names: + kind: ClusterMessageChannel + plural: clustermessagechannels + singular: clustermessagechannel + listKind: ClusterMessageChannelList + scope: Cluster + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + properties: + spec: + type: object + properties: + secretRef: + type: object + properties: + name: + type: string + required: + - name + slack: + type: object + properties: + channelID: + type: string + required: + - channelID + required: + - secretRef + - slack diff --git a/extended/plugins/send-message-python/manifests/rbac.yaml b/extended/plugins/send-message-python/manifests/rbac.yaml new file mode 100644 index 0000000000..df10af6b7f --- /dev/null +++ b/extended/plugins/send-message-python/manifests/rbac.yaml @@ -0,0 +1,22 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: send-message-step-plugin-reader +rules: +- apiGroups: + - ee.kargo.akuity.io + resources: + - messagechannels + - clustermessagechannels + verbs: + - get + - list + - watch +- apiGroups: + - "" + resources: + - secrets + verbs: + - get + - list + - watch diff --git a/extended/plugins/send-message-python/plugin.yaml b/extended/plugins/send-message-python/plugin.yaml new file mode 100644 index 0000000000..e2e26546e6 --- /dev/null +++ b/extended/plugins/send-message-python/plugin.yaml @@ -0,0 +1,34 @@ +apiVersion: kargo-extended.code.org/v1alpha1 +kind: StepPlugin +metadata: + name: send-message + namespace: kargo-system-resources +spec: + sidecar: + automountServiceAccountToken: true + container: + name: send-message-step-plugin + image: send-message-step-plugin-python:dev + imagePullPolicy: IfNotPresent + env: + - name: SYSTEM_RESOURCES_NAMESPACE + value: kargo-system-resources + ports: + - containerPort: 9765 + securityContext: + runAsNonRoot: true + runAsUser: 65532 + allowPrivilegeEscalation: false + readOnlyRootFilesystem: true + capabilities: + drop: + - ALL + resources: + requests: + cpu: 50m + memory: 64Mi + limits: + cpu: 250m + memory: 128Mi + steps: + - kind: send-message diff --git a/extended/plugins/send-message-python/requirements.txt b/extended/plugins/send-message-python/requirements.txt new file mode 100644 index 0000000000..f62ce0c56d --- /dev/null +++ b/extended/plugins/send-message-python/requirements.txt @@ -0,0 +1 @@ +PyYAML==6.0.3 diff --git a/extended/plugins/send-message-python/send_message_plugin/__init__.py b/extended/plugins/send-message-python/send_message_plugin/__init__.py new file mode 100644 index 0000000000..7665e3cb95 --- /dev/null +++ b/extended/plugins/send-message-python/send_message_plugin/__init__.py @@ -0,0 +1,28 @@ +from .service import PluginServer +from .http import serve +from .models import ( + AUTH_HEADER, + AUTH_TOKEN_PATH, + BEARER_PREFIX, + ChannelResource, + KubernetesClient, + RequestError, + SlackClient, + STEP_EXECUTE_PATH, +) +from .payloads import build_slack_payload, decode_xml_slack_payload + +__all__ = [ + "AUTH_HEADER", + "AUTH_TOKEN_PATH", + "BEARER_PREFIX", + "ChannelResource", + "KubernetesClient", + "PluginServer", + "RequestError", + "STEP_EXECUTE_PATH", + "SlackClient", + "build_slack_payload", + "decode_xml_slack_payload", + "serve", +] diff --git a/extended/plugins/send-message-python/send_message_plugin/__main__.py b/extended/plugins/send-message-python/send_message_plugin/__main__.py new file mode 100644 index 0000000000..50df33a385 --- /dev/null +++ b/extended/plugins/send-message-python/send_message_plugin/__main__.py @@ -0,0 +1,9 @@ +from .http import serve + + +def main() -> None: + serve() + + +if __name__ == "__main__": + main() diff --git a/extended/plugins/send-message-python/send_message_plugin/http.py b/extended/plugins/send-message-python/send_message_plugin/http.py new file mode 100644 index 0000000000..c360fa55b9 --- /dev/null +++ b/extended/plugins/send-message-python/send_message_plugin/http.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +import json +import os +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from typing import Any + +from .service import PluginServer + + +class RequestHandler(BaseHTTPRequestHandler): + plugin_server: PluginServer + + def do_POST(self) -> None: # noqa: N802 + self._handle() + + def do_GET(self) -> None: # noqa: N802 + self._handle() + + def log_message(self, format: str, *args: Any) -> None: + return + + def _handle(self) -> None: + content_length = int(self.headers.get("Content-Length", "0")) + body = self.rfile.read(content_length) if content_length else b"" + status_code, response = self.plugin_server.handle( + self.command, + self.path, + {key: value for key, value in self.headers.items()}, + body, + ) + self.send_response(status_code) + if response is None: + self.end_headers() + return + data = json.dumps(response).encode("utf-8") + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(data))) + self.end_headers() + self.wfile.write(data) + + +def make_handler(plugin_server: PluginServer) -> type[RequestHandler]: + class Handler(RequestHandler): + pass + + Handler.plugin_server = plugin_server + return Handler + + +def serve() -> None: + port = int(os.environ.get("PORT", "9765")) + address = os.environ.get("HOST", "0.0.0.0") + server = ThreadingHTTPServer((address, port), make_handler(PluginServer())) + server.serve_forever() diff --git a/extended/plugins/send-message-python/send_message_plugin/kubernetes.py b/extended/plugins/send-message-python/send_message_plugin/kubernetes.py new file mode 100644 index 0000000000..1689221e51 --- /dev/null +++ b/extended/plugins/send-message-python/send_message_plugin/kubernetes.py @@ -0,0 +1,103 @@ +from __future__ import annotations + +import base64 +import json +import os +import ssl +import urllib.error +import urllib.parse +import urllib.request +from http import HTTPStatus +from typing import Any + +from .models import ( + AUTH_HEADER, + BEARER_PREFIX, + DEFAULT_KUBERNETES_URL, + ChannelResource, + SERVICE_ACCOUNT_CA, + SERVICE_ACCOUNT_TOKEN, +) +from .util import as_dict, deep_get, string_value + + +class InClusterKubernetesClient: + def __init__(self) -> None: + self.token = SERVICE_ACCOUNT_TOKEN.read_text(encoding="utf-8").strip() + self.base_url = kubernetes_base_url() + self.ssl_context = ssl.create_default_context(cafile=str(SERVICE_ACCOUNT_CA)) + + def get_message_channel(self, namespace: str, name: str) -> ChannelResource: + response = self._get_json( + "/apis/ee.kargo.akuity.io/v1alpha1/namespaces/" + f"{quote(namespace)}/messagechannels/{quote(name)}" + ) + return ChannelResource( + secret_name=string_value(deep_get(response, "spec", "secretRef", "name")), + slack_channel_id=string_value(deep_get(response, "spec", "slack", "channelID")), + resource_kind="MessageChannel", + resource_name=string_value(deep_get(response, "metadata", "name")), + resource_namespace=string_value(deep_get(response, "metadata", "namespace")), + ) + + def get_cluster_message_channel(self, name: str) -> ChannelResource: + response = self._get_json( + "/apis/ee.kargo.akuity.io/v1alpha1/clustermessagechannels/" + f"{quote(name)}" + ) + return ChannelResource( + secret_name=string_value(deep_get(response, "spec", "secretRef", "name")), + slack_channel_id=string_value(deep_get(response, "spec", "slack", "channelID")), + resource_kind="ClusterMessageChannel", + resource_name=string_value(deep_get(response, "metadata", "name")), + ) + + def get_secret(self, namespace: str, name: str) -> dict[str, str]: + response = self._get_json( + f"/api/v1/namespaces/{quote(namespace)}/secrets/{quote(name)}" + ) + decoded: dict[str, str] = {} + for key, value in as_dict(response.get("data")).items(): + try: + raw = base64.b64decode(string_value(value)) + except Exception as exc: + raise RuntimeError(f'error decoding secret key "{key}": {exc}') from exc + decoded[str(key)] = raw.decode("utf-8") + return decoded + + def _get_json(self, path: str) -> dict[str, Any]: + request = urllib.request.Request( + urllib.parse.urljoin(self.base_url.rstrip("/") + "/", path.lstrip("/")), + method="GET", + headers={AUTH_HEADER: f"{BEARER_PREFIX}{self.token}"}, + ) + try: + with urllib.request.urlopen( + request, + context=self.ssl_context, + timeout=30, + ) as response: + return json.loads(response.read().decode("utf-8")) + except urllib.error.HTTPError as exc: + body = exc.read().decode("utf-8", errors="replace").strip() + if exc.code == HTTPStatus.NOT_FOUND: + raise RuntimeError(f"resource not found at {path}") from exc + raise RuntimeError( + f"Kubernetes API error {exc.code} {exc.reason}: {body}" + ) from exc + + +def kubernetes_base_url() -> str: + host = os.environ.get("KUBERNETES_SERVICE_HOST", "").strip() + if not host: + return DEFAULT_KUBERNETES_URL + port = ( + os.environ.get("KUBERNETES_SERVICE_PORT_HTTPS", "").strip() + or os.environ.get("KUBERNETES_SERVICE_PORT", "").strip() + or "443" + ) + return f"https://{host}:{port}" + + +def quote(value: str) -> str: + return urllib.parse.quote(value, safe="") diff --git a/extended/plugins/send-message-python/send_message_plugin/models.py b/extended/plugins/send-message-python/send_message_plugin/models.py new file mode 100644 index 0000000000..d6d8b8270d --- /dev/null +++ b/extended/plugins/send-message-python/send_message_plugin/models.py @@ -0,0 +1,56 @@ +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Protocol + +STEP_EXECUTE_PATH = "/api/v1/step.execute" +AUTH_HEADER = "Authorization" +BEARER_PREFIX = "Bearer " + +AUTH_TOKEN_PATH = Path("/var/run/kargo/token") +SERVICE_ACCOUNT_DIR = Path("/var/run/secrets/kubernetes.io/serviceaccount") +SERVICE_ACCOUNT_TOKEN = SERVICE_ACCOUNT_DIR / "token" +SERVICE_ACCOUNT_CA = SERVICE_ACCOUNT_DIR / "ca.crt" + +DEFAULT_SYSTEM_NAMESPACE = "kargo-system-resources" +DEFAULT_KUBERNETES_URL = "https://kubernetes.default.svc" +DEFAULT_SLACK_API_BASE_URL = "https://slack.com/api" + + +class RequestError(RuntimeError): + pass + + +@dataclass +class ChannelResource: + secret_name: str + slack_channel_id: str + resource_kind: str + resource_name: str + resource_namespace: str = "" + + +class KubernetesClient(Protocol): + def get_message_channel( + self, + namespace: str, + name: str, + ) -> ChannelResource: + ... + + def get_cluster_message_channel(self, name: str) -> ChannelResource: + ... + + def get_secret(self, namespace: str, name: str) -> dict[str, str]: + ... + + +class SlackClient(Protocol): + def post_message( + self, + api_base_url: str, + token: str, + payload: dict[str, Any], + ) -> dict[str, Any]: + ... diff --git a/extended/plugins/send-message-python/send_message_plugin/payloads.py b/extended/plugins/send-message-python/send_message_plugin/payloads.py new file mode 100644 index 0000000000..8bb132adb6 --- /dev/null +++ b/extended/plugins/send-message-python/send_message_plugin/payloads.py @@ -0,0 +1,111 @@ +from __future__ import annotations + +import json +import xml.etree.ElementTree as ET +from typing import Any + +import yaml + +from .models import ChannelResource, RequestError +from .util import as_dict, string_value + + +def build_slack_payload( + config: dict[str, Any], + channel: ChannelResource, +) -> tuple[dict[str, Any], str]: + encoding_type = string_value(config.get("encodingType")) + slack_config = as_dict(config.get("slack")) + + if not encoding_type: + channel_id = string_value(slack_config.get("channelID")) or channel.slack_channel_id + if not channel_id: + raise RequestError( + f'{channel.resource_kind} "{channel.resource_name}" does not define ' + "spec.slack.channelID and config.slack.channelID is empty" + ) + payload: dict[str, Any] = { + "channel": channel_id, + "text": string_value(config.get("message")), + } + thread_ts = string_value(slack_config.get("threadTS")) + if thread_ts: + payload["thread_ts"] = thread_ts + return payload, thread_ts + + payload = decode_encoded_payload(encoding_type, string_value(config.get("message"))) + if "channel" not in payload: + if not channel.slack_channel_id: + raise RequestError( + f'{channel.resource_kind} "{channel.resource_name}" does not define ' + "spec.slack.channelID" + ) + payload["channel"] = channel.slack_channel_id + return payload, string_value(payload.get("thread_ts")) + + +def decode_encoded_payload( + encoding_type: str, + message: str, +) -> dict[str, Any]: + if encoding_type == "json": + try: + payload = json.loads(message) + except json.JSONDecodeError as exc: + raise RequestError(f"error decoding JSON Slack payload: {exc}") from exc + elif encoding_type == "yaml": + try: + payload = yaml.safe_load(message) + except yaml.YAMLError as exc: + raise RequestError(f"error decoding YAML Slack payload: {exc}") from exc + elif encoding_type == "xml": + payload = decode_xml_slack_payload(message) + else: + raise RequestError(f'unsupported encodingType "{encoding_type}"') + + if not isinstance(payload, dict): + raise RequestError("Slack payload must decode to an object") + return payload + + +def decode_xml_slack_payload(message: str) -> dict[str, Any]: + try: + root = ET.fromstring(message) + except ET.ParseError as exc: + raise RequestError(f"error decoding XML Slack payload: {exc}") from exc + payload = xml_node_to_object(root) + if not isinstance(payload, dict): + raise RequestError("Slack payload must decode to an object") + return payload + + +def xml_node_to_object(node: ET.Element) -> dict[str, Any]: + payload: dict[str, Any] = dict(node.attrib) + for child in list(node): + append_xml_value(payload, child.tag, xml_node_value(child)) + + text = normalized_text(node.text) + if text: + payload["#text" if payload else "text"] = text + return payload + + +def xml_node_value(node: ET.Element) -> Any: + if not node.attrib and not list(node): + return normalized_text(node.text) + return xml_node_to_object(node) + + +def append_xml_value(payload: dict[str, Any], key: str, value: Any) -> None: + if key not in payload: + payload[key] = value + return + existing = payload[key] + if isinstance(existing, list): + existing.append(value) + return + payload[key] = [existing, value] + + +def normalized_text(value: str | None) -> str: + return (value or "").strip() diff --git a/extended/plugins/send-message-python/send_message_plugin/service.py b/extended/plugins/send-message-python/send_message_plugin/service.py new file mode 100644 index 0000000000..3967d14e72 --- /dev/null +++ b/extended/plugins/send-message-python/send_message_plugin/service.py @@ -0,0 +1,172 @@ +from __future__ import annotations + +import json +import os +from pathlib import Path +from typing import Any + +from .kubernetes import InClusterKubernetesClient +from .models import ( + AUTH_HEADER, + AUTH_TOKEN_PATH, + BEARER_PREFIX, + DEFAULT_SLACK_API_BASE_URL, + DEFAULT_SYSTEM_NAMESPACE, + ChannelResource, + KubernetesClient, + RequestError, + SlackClient, +) +from .payloads import build_slack_payload +from .slack import RealSlackClient +from .util import as_dict, string_value + + +class PluginServer: + def __init__( + self, + *, + expected_token_path: Path | None = None, + system_resources_namespace: str | None = None, + slack_api_base_url: str | None = None, + kube_client: KubernetesClient | None = None, + slack_client: SlackClient | None = None, + ) -> None: + self.expected_token_path = expected_token_path or AUTH_TOKEN_PATH + self.system_resources_namespace = ( + system_resources_namespace + or os.environ.get("SYSTEM_RESOURCES_NAMESPACE", "").strip() + or DEFAULT_SYSTEM_NAMESPACE + ) + self.slack_api_base_url = ( + slack_api_base_url + or os.environ.get("SLACK_API_BASE_URL", "").strip() + or DEFAULT_SLACK_API_BASE_URL + ) + self.kube_client = kube_client or InClusterKubernetesClient() + self.slack_client = slack_client or RealSlackClient() + + def handle( + self, + method: str, + path: str, + headers: dict[str, str], + body: bytes, + ) -> tuple[int, dict[str, Any] | None]: + from http import HTTPStatus + from .models import STEP_EXECUTE_PATH + + if path != STEP_EXECUTE_PATH: + return HTTPStatus.NOT_FOUND, None + if method != "POST": + return HTTPStatus.METHOD_NOT_ALLOWED, None + + try: + self.authorize(headers.get(AUTH_HEADER, "")) + except Exception as exc: # pragma: no cover + return HTTPStatus.FORBIDDEN, errored_response(str(exc)) + + try: + request = json.loads(body.decode("utf-8")) + except Exception as exc: + return HTTPStatus.BAD_REQUEST, { + "status": "Errored", + "message": "invalid request body", + "error": str(exc), + "terminal": True, + } + + return HTTPStatus.OK, self.execute(request) + + def execute(self, request: dict[str, Any]) -> dict[str, Any]: + step = as_dict(request.get("step")) + step_kind = string_value(step.get("kind")) + if step_kind != "send-message": + return errored_response(f'unsupported step kind "{step_kind}"') + + try: + config = as_dict(step.get("config")) + channel, secret_namespace = self.lookup_channel(request, config) + token = self.read_slack_token(secret_namespace, channel.secret_name) + payload, output_thread_ts = build_slack_payload(config, channel) + slack_response = self.slack_client.post_message( + self.slack_api_base_url, + token, + payload, + ) + if not slack_response.get("ok"): + raise RuntimeError(f'Slack API error: {slack_response.get("error", "")}') + if not output_thread_ts: + output_thread_ts = string_value(slack_response.get("ts")) + return { + "status": "Succeeded", + "output": { + "slack": { + "threadTS": output_thread_ts, + }, + }, + } + except RequestError as exc: + return errored_response(str(exc)) + except Exception as exc: + return failed_response(str(exc)) + + def lookup_channel( + self, + request: dict[str, Any], + config: dict[str, Any], + ) -> tuple[ChannelResource, str]: + channel_ref = as_dict(config.get("channel")) + channel_kind = string_value(channel_ref.get("kind")) + channel_name = string_value(channel_ref.get("name")) + project = string_value(as_dict(request.get("context")).get("project")) + + if channel_kind == "MessageChannel": + if not project: + raise RequestError("step context project is required for MessageChannel") + return self.kube_client.get_message_channel(project, channel_name), project + + if channel_kind == "ClusterMessageChannel": + return ( + self.kube_client.get_cluster_message_channel(channel_name), + self.system_resources_namespace, + ) + + raise RequestError(f'unsupported channel kind "{channel_kind}"') + + def read_slack_token(self, namespace: str, secret_name: str) -> str: + try: + secret = self.kube_client.get_secret(namespace, secret_name) + except Exception as exc: + raise RuntimeError(f"error getting Slack Secret: {exc}") from exc + token = string_value(secret.get("apiKey")) + if not token: + raise RuntimeError('Slack Secret is missing key "apiKey"') + return token + + def authorize(self, header_value: str) -> None: + expected = self.expected_token_path.read_text(encoding="utf-8").strip() + value = header_value.strip() + if not value.startswith(BEARER_PREFIX): + raise RuntimeError("missing bearer token") + received = value[len(BEARER_PREFIX) :].strip() + if received != expected: + raise RuntimeError("invalid bearer token") + + +def failed_response(message: str) -> dict[str, Any]: + return { + "status": "Failed", + "message": message, + "error": message, + "terminal": True, + } + + +def errored_response(message: str) -> dict[str, Any]: + return { + "status": "Errored", + "message": message, + "error": message, + "terminal": True, + } diff --git a/extended/plugins/send-message-python/send_message_plugin/slack.py b/extended/plugins/send-message-python/send_message_plugin/slack.py new file mode 100644 index 0000000000..2cd3439ea1 --- /dev/null +++ b/extended/plugins/send-message-python/send_message_plugin/slack.py @@ -0,0 +1,45 @@ +from __future__ import annotations + +import json +import urllib.error +import urllib.parse +import urllib.request +from http import HTTPStatus +from typing import Any + +from .models import AUTH_HEADER, BEARER_PREFIX + + +class RealSlackClient: + def post_message( + self, + api_base_url: str, + token: str, + payload: dict[str, Any], + ) -> dict[str, Any]: + request = urllib.request.Request( + urllib.parse.urljoin(api_base_url.rstrip("/") + "/", "chat.postMessage"), + method="POST", + headers={ + AUTH_HEADER: f"{BEARER_PREFIX}{token}", + "Content-Type": "application/json", + }, + data=json.dumps(payload).encode("utf-8"), + ) + try: + with urllib.request.urlopen(request, timeout=30) as response: + result = json.loads(response.read().decode("utf-8")) + if response.status >= HTTPStatus.BAD_REQUEST and not result.get("error"): + result["ok"] = False + result["error"] = response.reason + return result + except urllib.error.HTTPError as exc: + body = exc.read().decode("utf-8", errors="replace") + try: + result = json.loads(body) if body else {} + except json.JSONDecodeError: + result = {} + if "error" not in result: + result["error"] = f"{exc.code} {exc.reason}" + result["ok"] = False + return result diff --git a/extended/plugins/send-message-python/send_message_plugin/util.py b/extended/plugins/send-message-python/send_message_plugin/util.py new file mode 100644 index 0000000000..80a9e8aea6 --- /dev/null +++ b/extended/plugins/send-message-python/send_message_plugin/util.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +from typing import Any + + +def as_dict(value: Any) -> dict[str, Any]: + return value if isinstance(value, dict) else {} + + +def deep_get(value: Any, *keys: str) -> Any: + current = value + for key in keys: + if not isinstance(current, dict): + return None + current = current.get(key) + return current + + +def string_value(value: Any) -> str: + return value if isinstance(value, str) else "" diff --git a/extended/plugins/send-message-python/smoke/README.md b/extended/plugins/send-message-python/smoke/README.md new file mode 100644 index 0000000000..b7dc83eaed --- /dev/null +++ b/extended/plugins/send-message-python/smoke/README.md @@ -0,0 +1,20 @@ +# Local Smoke Notes + +- Primary smoke entrypoint: + - `extended/plugins/send-message-python/smoke/smoke_test.py` +- The script assumes a working Kargo cluster and CLI are already available. +- Required env: + - `SEND_MESSAGE_SMOKE_PROJECT` + - `SEND_MESSAGE_SMOKE_WAREHOUSE` + - `SEND_MESSAGE_SMOKE_FREIGHT_NAME` + - `SEND_MESSAGE_SMOKE_SLACK_API_KEY` + - `SEND_MESSAGE_SMOKE_CHANNEL_ID` +- Optional env: + - `KARGO_BIN` + - `KARGO_FLAGS` + - `SEND_MESSAGE_SMOKE_SYSTEM_RESOURCES_NAMESPACE` + - `KUBECTL_BIN` + - `DOCKER_BIN` + - `KIND_BIN` +- No committed file in this subtree contains a real token value or local token + source. diff --git a/extended/plugins/send-message-python/smoke/lib.py b/extended/plugins/send-message-python/smoke/lib.py new file mode 100644 index 0000000000..645d66e43f --- /dev/null +++ b/extended/plugins/send-message-python/smoke/lib.py @@ -0,0 +1,398 @@ +from __future__ import annotations + +import json +import os +import shlex +import subprocess +import time +from pathlib import Path + + +class SmokeEnv: + def __init__(self) -> None: + self.project = os.environ.get("SEND_MESSAGE_SMOKE_PROJECT", "") + self.warehouse = os.environ.get("SEND_MESSAGE_SMOKE_WAREHOUSE", "") + self.freight_name = os.environ.get("SEND_MESSAGE_SMOKE_FREIGHT_NAME", "") + self.slack_api_key = os.environ.get("SEND_MESSAGE_SMOKE_SLACK_API_KEY", "") + self.channel_id = os.environ.get("SEND_MESSAGE_SMOKE_CHANNEL_ID", "") + self.system_namespace = os.environ.get( + "SEND_MESSAGE_SMOKE_SYSTEM_RESOURCES_NAMESPACE", + "kargo-system-resources", + ) + self.secret_name = os.environ.get( + "SEND_MESSAGE_SMOKE_SECRET_NAME", + "send-message-slack-token", + ) + self.channel_name = os.environ.get( + "SEND_MESSAGE_SMOKE_CHANNEL_NAME", + "send-message-smoke", + ) + self.cluster_role_name = "send-message-step-plugin-reader" + self.kargo_bin = os.environ.get("KARGO_BIN", "kargo") + self.kubectl_bin = os.environ.get("KUBECTL_BIN", "kubectl") + self.docker_bin = os.environ.get("DOCKER_BIN", "docker") + self.kind_bin = os.environ.get("KIND_BIN", "kind") + self.kargo_flags = shlex.split(os.environ.get("KARGO_FLAGS", "")) + + def require(self, *names: str) -> None: + for name in names: + if not os.environ.get(name): + fail(f"{name} is required") + + def run(self, *args: str, cwd: Path | None = None) -> None: + log("INFO", " ".join(args)) + subprocess.run(args, cwd=str(cwd) if cwd else None, check=True) + + def capture(self, *args: str, cwd: Path | None = None) -> str: + completed = subprocess.run( + args, + cwd=str(cwd) if cwd else None, + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + return completed.stdout + + +class SmokeResources: + def __init__(self, env: SmokeEnv, plugin_dir: Path, tmp_dir: Path) -> None: + self.env = env + self.plugin_dir = plugin_dir + self.tmp_dir = tmp_dir + self.rendered_plugin_dir = tmp_dir / "plugin" + self.configmap_path: Path | None = None + self.cluster_role_binding_name = f"send-message-step-plugin-reader-{env.project}" + self.stage_name = f"smsg-stepplugin-{int(time.time())}" + self.promotion_name = f"{self.stage_name}-promotion" + self.message_text = ( + f"send-message PYTHON stepplugin smoke {self.stage_name}" + ) + + def write_plugin_dir(self, image_tag: str) -> None: + self.rendered_plugin_dir.mkdir(parents=True, exist_ok=True) + rendered = (self.plugin_dir / "plugin.yaml").read_text(encoding="utf-8") + rendered = rendered.replace( + "namespace: kargo-system-resources", + f"namespace: {self.env.system_namespace}", + ) + rendered = rendered.replace( + "image: send-message-step-plugin-python:dev", + f"image: {image_tag}", + ) + rendered = rendered.replace( + "value: kargo-system-resources", + f"value: {self.env.system_namespace}", + ) + (self.rendered_plugin_dir / "plugin.yaml").write_text(rendered, encoding="utf-8") + + def build_image(self, image_tag: str) -> None: + self.env.run(self.env.docker_bin, "build", "-t", image_tag, str(self.plugin_dir)) + + def load_image(self, kind_name: str, image_tag: str) -> None: + self.env.run( + self.env.kind_bin, + "load", + "docker-image", + "--name", + kind_name, + image_tag, + ) + + def build_configmap(self) -> None: + self.env.run( + self.env.kargo_bin, + "step-plugin", + "build", + ".", + cwd=self.rendered_plugin_dir, + ) + matches = sorted(self.rendered_plugin_dir.glob("*-configmap.yaml")) + if len(matches) != 1: + fail( + "expected exactly one rendered StepPlugin ConfigMap, got " + f"{[path.name for path in matches]}" + ) + self.configmap_path = matches[0] + + def install_crds(self) -> None: + self.apply_file(self.plugin_dir / "manifests" / "crds.yaml") + + def install_rbac(self) -> None: + self.apply_file(self.plugin_dir / "manifests" / "rbac.yaml") + + def bind_cluster_role(self) -> None: + binding_path = self.tmp_dir / "clusterrolebinding.yaml" + binding_path.write_text( + "\n".join( + [ + "apiVersion: rbac.authorization.k8s.io/v1", + "kind: ClusterRoleBinding", + "metadata:", + f" name: {self.cluster_role_binding_name}", + "subjects:", + "- kind: ServiceAccount", + " name: default", + f" namespace: {self.env.project}", + "roleRef:", + " apiGroup: rbac.authorization.k8s.io", + " kind: ClusterRole", + f" name: {self.env.cluster_role_name}", + "", + ] + ), + encoding="utf-8", + ) + self.apply_file(binding_path) + + def install_configmap(self) -> None: + if self.configmap_path is None: + fail("StepPlugin ConfigMap was not built") + self.apply_file(self.configmap_path) + + def create_secret(self) -> None: + path = self.tmp_dir / "secret.yaml" + path.write_text( + "\n".join( + [ + "apiVersion: v1", + "kind: Secret", + "metadata:", + f" name: {self.env.secret_name}", + f" namespace: {self.env.project}", + "type: Opaque", + "stringData:", + f" apiKey: {self.env.slack_api_key}", + "", + ] + ), + encoding="utf-8", + ) + self.apply_file(path) + + def create_channel(self) -> None: + path = self.tmp_dir / "messagechannel.yaml" + path.write_text( + "\n".join( + [ + "apiVersion: ee.kargo.akuity.io/v1alpha1", + "kind: MessageChannel", + "metadata:", + f" name: {self.env.channel_name}", + f" namespace: {self.env.project}", + "spec:", + " secretRef:", + f" name: {self.env.secret_name}", + " slack:", + f" channelID: {self.env.channel_id}", + "", + ] + ), + encoding="utf-8", + ) + self.apply_file(path) + + def create_stage(self) -> None: + path = self.tmp_dir / "stage.yaml" + path.write_text( + "\n".join( + [ + "apiVersion: kargo.akuity.io/v1alpha1", + "kind: Stage", + "metadata:", + f" name: {self.stage_name}", + f" namespace: {self.env.project}", + "spec:", + " requestedFreight:", + " - origin:", + " kind: Warehouse", + f" name: {self.env.warehouse}", + " sources:", + " direct: true", + " promotionTemplate:", + " spec:", + " steps:", + " - uses: send-message", + " config:", + " channel:", + " kind: MessageChannel", + f" name: {self.env.channel_name}", + f' message: "{self.message_text}"', + "", + ] + ), + encoding="utf-8", + ) + self.env.run(self.env.kargo_bin, "apply", "-f", str(path), *self.env.kargo_flags) + + def approve_freight(self) -> None: + self.env.run( + self.env.kargo_bin, + "approve", + f"--project={self.env.project}", + f"--freight={self.env.freight_name}", + f"--stage={self.stage_name}", + *self.env.kargo_flags, + ) + + def promote_freight(self) -> None: + path = self.tmp_dir / "promotion.yaml" + path.write_text( + "\n".join( + [ + "apiVersion: kargo.akuity.io/v1alpha1", + "kind: Promotion", + "metadata:", + f" name: {self.promotion_name}", + f" namespace: {self.env.project}", + "spec:", + f" stage: {self.stage_name}", + f" freight: {self.env.freight_name}", + " steps:", + " - uses: send-message", + " config:", + " channel:", + " kind: MessageChannel", + f" name: {self.env.channel_name}", + f' message: "{self.message_text}"', + "", + ] + ), + encoding="utf-8", + ) + self.apply_file(path) + + def wait_for_success(self) -> None: + deadline = time.time() + 180 + last_phase = "" + last_promotion_name = "" + last_thread_ts = "" + + while time.time() < deadline: + stdout = self.env.capture( + self.env.kubectl_bin, + "get", + "promotion.kargo.akuity.io", + "-n", + self.env.project, + "-o", + "json", + ) + promotion_name, phase, thread_ts = find_latest_promotion(stdout, self.stage_name) + if promotion_name: + last_promotion_name = promotion_name + last_phase = phase + last_thread_ts = thread_ts + + if phase == "Succeeded": + if not thread_ts: + fail("send-message smoke did not produce slack.threadTS output") + return + if phase in {"Failed", "Errored", "Aborted"}: + fail( + "send-message smoke promotion reached terminal phase " + f"{phase} ({promotion_name})" + ) + time.sleep(2) + + fail( + "send-message smoke promotion did not succeed in time" + f" (last promotion={last_promotion_name}, last phase={last_phase}, " + f"last threadTS={last_thread_ts})" + ) + + def cleanup(self) -> None: + delete( + self.env.kubectl_bin, + "promotion.kargo.akuity.io", + self.promotion_name, + "-n", + self.env.project, + ) + delete(self.env.kubectl_bin, "stage.kargo.akuity.io", self.stage_name, "-n", self.env.project) + delete( + self.env.kubectl_bin, + "messagechannel.ee.kargo.akuity.io", + self.env.channel_name, + "-n", + self.env.project, + ) + delete(self.env.kubectl_bin, "secret", self.env.secret_name, "-n", self.env.project) + if self.configmap_path is not None: + delete( + self.env.kubectl_bin, + "configmap", + self.configmap_path.stem.removesuffix("-configmap"), + "-n", + self.env.system_namespace, + ) + delete(self.env.kubectl_bin, "clusterrolebinding", self.cluster_role_binding_name) + delete(self.env.kubectl_bin, "clusterrole", self.env.cluster_role_name) + subprocess.run( + [ + self.env.kubectl_bin, + "delete", + "-f", + str(self.plugin_dir / "manifests" / "crds.yaml"), + "--ignore-not-found", + ], + check=False, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + def apply_file(self, path: Path) -> None: + self.env.run(self.env.kubectl_bin, "apply", "-f", str(path)) + + +def delete(command: str, resource: str, name: str, *extra: str) -> None: + subprocess.run( + [command, "delete", resource, name, *extra, "--ignore-not-found"], + check=False, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + +def find_latest_promotion(payload: str, stage_name: str) -> tuple[str, str, str]: + data = json.loads(payload) + items = [ + item + for item in data.get("items", []) + if item.get("spec", {}).get("stage") == stage_name + ] + if not items: + return "", "", "" + items.sort(key=lambda item: item.get("metadata", {}).get("creationTimestamp", "")) + promotion = items[-1] + metadata = promotion.get("metadata", {}) + status = promotion.get("status", {}) + thread_ts = ( + deep_get(status, "stepExecutionMetadata", 0, "output", "slack", "threadTS") + or deep_get(status, "state", "step-1", "slack", "threadTS") + or "" + ) + return metadata.get("name", ""), status.get("phase", ""), thread_ts + + +def deep_get(value: object, *path: object) -> str: + current = value + for key in path: + if isinstance(key, int): + if not isinstance(current, list) or key >= len(current): + return "" + current = current[key] + continue + if not isinstance(current, dict): + return "" + current = current.get(key) + return current if isinstance(current, str) else "" + + +def log(level: str, message: str) -> None: + print(f"[{level}] {message}") + + +def fail(message: str) -> None: + log("FAIL", message) + raise SystemExit(1) diff --git a/extended/plugins/send-message-python/smoke/smoke_test.py b/extended/plugins/send-message-python/smoke/smoke_test.py new file mode 100644 index 0000000000..bb1c9b5ef5 --- /dev/null +++ b/extended/plugins/send-message-python/smoke/smoke_test.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import shutil +import sys +import tempfile +import time +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parents[1])) + +from smoke.lib import SmokeEnv, SmokeResources, fail, log + + +def main() -> int: + plugin_dir = Path(__file__).resolve().parents[1] + env = SmokeEnv() + env.require( + "SEND_MESSAGE_SMOKE_PROJECT", + "SEND_MESSAGE_SMOKE_WAREHOUSE", + "SEND_MESSAGE_SMOKE_FREIGHT_NAME", + "SEND_MESSAGE_SMOKE_SLACK_API_KEY", + "SEND_MESSAGE_SMOKE_CHANNEL_ID", + ) + + current_context = env.capture( + env.kubectl_bin, + "config", + "current-context", + ).strip() + if not current_context.startswith("kind-"): + fail(f"send-message smoke requires a kind context, got: {current_context}") + + kind_name = current_context.removeprefix("kind-") + image_tag = f"send-message-step-plugin-python:e2e-{int(time.time())}" + tmp_dir = Path(tempfile.mkdtemp(prefix="send-message-python-smoke-")) + + resources = SmokeResources(env, plugin_dir, tmp_dir) + try: + resources.write_plugin_dir(image_tag) + resources.build_image(image_tag) + resources.load_image(kind_name, image_tag) + resources.build_configmap() + resources.install_crds() + resources.install_rbac() + resources.bind_cluster_role() + resources.install_configmap() + resources.create_secret() + resources.create_channel() + resources.create_stage() + resources.approve_freight() + resources.promote_freight() + resources.wait_for_success() + finally: + resources.cleanup() + shutil.rmtree(tmp_dir, ignore_errors=True) + + log("PASS", "send-message StepPlugin smoke promotion finished successfully") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/extended/plugins/send-message-python/tests/test_server.py b/extended/plugins/send-message-python/tests/test_server.py new file mode 100644 index 0000000000..2e6709651b --- /dev/null +++ b/extended/plugins/send-message-python/tests/test_server.py @@ -0,0 +1,580 @@ +from __future__ import annotations + +import json +import sys +import tempfile +import unittest +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parents[1])) + +import send_message_plugin as server + + +class FakeKubernetesClient: + def __init__( + self, + *, + message_channels: dict[tuple[str, str], server.ChannelResource] | None = None, + cluster_message_channels: dict[str, server.ChannelResource] | None = None, + secrets: dict[tuple[str, str], dict[str, str]] | None = None, + ) -> None: + self.message_channels = message_channels or {} + self.cluster_message_channels = cluster_message_channels or {} + self.secrets = secrets or {} + + def get_message_channel(self, namespace: str, name: str) -> server.ChannelResource: + try: + return self.message_channels[(namespace, name)] + except KeyError as exc: + raise RuntimeError("message channel not found") from exc + + def get_cluster_message_channel(self, name: str) -> server.ChannelResource: + try: + return self.cluster_message_channels[name] + except KeyError as exc: + raise RuntimeError("cluster message channel not found") from exc + + def get_secret(self, namespace: str, name: str) -> dict[str, str]: + try: + return self.secrets[(namespace, name)] + except KeyError as exc: + raise RuntimeError("secret not found") from exc + + +class FakeSlackClient: + def __init__( + self, + *, + response: dict[str, object] | None = None, + error: Exception | None = None, + ) -> None: + self.response = response or {"ok": True, "ts": "0"} + self.error = error + self.last_payload: dict[str, object] | None = None + + def post_message( + self, + api_base_url: str, + token: str, + payload: dict[str, object], + ) -> dict[str, object]: + del api_base_url + del token + self.last_payload = payload + if self.error is not None: + raise self.error + return self.response + + +class PluginServerTest(unittest.TestCase): + def setUp(self) -> None: + self.temp_dir = tempfile.TemporaryDirectory() + self.token_path = Path(self.temp_dir.name) / "token" + self.token_path.write_text("expected-token", encoding="utf-8") + + def tearDown(self) -> None: + self.temp_dir.cleanup() + + def new_server( + self, + *, + kube_client: server.KubernetesClient | None = None, + slack_client: server.SlackClient | None = None, + ) -> server.PluginServer: + return server.PluginServer( + expected_token_path=self.token_path, + system_resources_namespace="kargo-system-resources", + kube_client=kube_client or FakeKubernetesClient(), + slack_client=slack_client or FakeSlackClient(), + ) + + def test_handler_rejects_missing_bearer_token(self) -> None: + plugin = self.new_server() + + status_code, response = plugin.handle( + "POST", + server.STEP_EXECUTE_PATH, + {}, + json.dumps(minimal_request()).encode("utf-8"), + ) + + self.assertEqual(403, status_code) + self.assertEqual("Errored", response["status"]) + + def test_handler_rejects_invalid_bearer_token(self) -> None: + plugin = self.new_server() + + status_code, response = plugin.handle( + "POST", + server.STEP_EXECUTE_PATH, + {server.AUTH_HEADER: f"{server.BEARER_PREFIX}wrong-token"}, + json.dumps(minimal_request()).encode("utf-8"), + ) + + self.assertEqual(403, status_code) + self.assertEqual("Errored", response["status"]) + + def test_handler_rejects_invalid_request_body(self) -> None: + plugin = self.new_server() + + status_code, response = plugin.handle( + "POST", + server.STEP_EXECUTE_PATH, + {server.AUTH_HEADER: f"{server.BEARER_PREFIX}expected-token"}, + b"{not-json", + ) + + self.assertEqual(400, status_code) + self.assertEqual("Errored", response["status"]) + self.assertEqual("invalid request body", response["message"]) + + def test_execute_uses_namespaced_message_channel(self) -> None: + kube_client = FakeKubernetesClient( + message_channels={ + ("demo", "send"): server.ChannelResource( + secret_name="slack-token", + slack_channel_id="C123", + resource_kind="MessageChannel", + resource_name="send", + resource_namespace="demo", + ), + }, + secrets={ + ("demo", "slack-token"): {"apiKey": "xoxb-demo"}, + }, + ) + slack_client = FakeSlackClient(response={"ok": True, "ts": "1712345678.000100"}) + plugin = self.new_server(kube_client=kube_client, slack_client=slack_client) + + response = execute_request( + plugin, + { + "context": {"project": "demo"}, + "step": minimal_request()["step"], + }, + ) + + self.assertEqual("Succeeded", response["status"]) + self.assertEqual("1712345678.000100", response["output"]["slack"]["threadTS"]) + self.assertEqual( + { + "channel": "C123", + "text": "hello from plugin", + }, + slack_client.last_payload, + ) + + def test_execute_uses_cluster_message_channel(self) -> None: + kube_client = FakeKubernetesClient( + cluster_message_channels={ + "send": server.ChannelResource( + secret_name="slack-token", + slack_channel_id="C777", + resource_kind="ClusterMessageChannel", + resource_name="send", + ), + }, + secrets={ + ("kargo-system-resources", "slack-token"): {"apiKey": "xoxb-demo"}, + }, + ) + slack_client = FakeSlackClient(response={"ok": True, "ts": "1712345678.000200"}) + plugin = self.new_server(kube_client=kube_client, slack_client=slack_client) + + response = execute_request( + plugin, + { + "context": {"project": "demo"}, + "step": { + "kind": "send-message", + "config": { + "channel": { + "kind": "ClusterMessageChannel", + "name": "send", + }, + "message": "hello from plugin", + }, + }, + }, + ) + + self.assertEqual("Succeeded", response["status"]) + self.assertEqual( + { + "channel": "C777", + "text": "hello from plugin", + }, + slack_client.last_payload, + ) + + def test_execute_honors_plaintext_slack_overrides(self) -> None: + plugin = self.new_server( + kube_client=FakeKubernetesClient( + message_channels={ + ("demo", "send"): server.ChannelResource( + secret_name="slack-token", + slack_channel_id="C123", + resource_kind="MessageChannel", + resource_name="send", + resource_namespace="demo", + ), + }, + secrets={ + ("demo", "slack-token"): {"apiKey": "xoxb-demo"}, + }, + ), + slack_client=FakeSlackClient(response={"ok": True, "ts": "1712345678.000300"}), + ) + + request = minimal_request() + request["context"] = {"project": "demo"} + request["step"]["config"]["slack"] = { + "channelID": "C999", + "threadTS": "1700000000.000001", + } + + response = execute_request(plugin, request) + + self.assertEqual("Succeeded", response["status"]) + self.assertEqual( + { + "channel": "C999", + "text": "hello from plugin", + "thread_ts": "1700000000.000001", + }, + plugin.slack_client.last_payload, + ) + self.assertEqual( + "1700000000.000001", + response["output"]["slack"]["threadTS"], + ) + + def test_execute_supports_json_encoding(self) -> None: + plugin = self.new_server( + kube_client=FakeKubernetesClient( + message_channels={ + ("demo", "send"): server.ChannelResource( + secret_name="slack-token", + slack_channel_id="C123", + resource_kind="MessageChannel", + resource_name="send", + resource_namespace="demo", + ), + }, + secrets={ + ("demo", "slack-token"): {"apiKey": "xoxb-demo"}, + }, + ), + slack_client=FakeSlackClient(response={"ok": True, "ts": "1712345678.000400"}), + ) + + request = minimal_request() + request["context"] = {"project": "demo"} + request["step"]["config"]["encodingType"] = "json" + request["step"]["config"]["message"] = ( + '{"text":"rich","blocks":[{"type":"section"}]}' + ) + + response = execute_request(plugin, request) + + self.assertEqual("Succeeded", response["status"]) + self.assertEqual( + { + "channel": "C123", + "text": "rich", + "blocks": [{"type": "section"}], + }, + plugin.slack_client.last_payload, + ) + + def test_execute_supports_yaml_encoding(self) -> None: + plugin = self.new_server( + kube_client=FakeKubernetesClient( + message_channels={ + ("demo", "send"): server.ChannelResource( + secret_name="slack-token", + slack_channel_id="C123", + resource_kind="MessageChannel", + resource_name="send", + resource_namespace="demo", + ), + }, + secrets={ + ("demo", "slack-token"): {"apiKey": "xoxb-demo"}, + }, + ), + slack_client=FakeSlackClient(response={"ok": True, "ts": "1712345678.000410"}), + ) + + request = minimal_request() + request["context"] = {"project": "demo"} + request["step"]["config"]["encodingType"] = "yaml" + request["step"]["config"]["message"] = ( + "text: rich\nblocks:\n- type: section\n" + ) + + response = execute_request(plugin, request) + + self.assertEqual("Succeeded", response["status"]) + self.assertEqual( + { + "channel": "C123", + "text": "rich", + "blocks": [{"type": "section"}], + }, + plugin.slack_client.last_payload, + ) + + def test_execute_returns_errored_for_bad_yaml(self) -> None: + plugin = self.new_server( + kube_client=FakeKubernetesClient( + message_channels={ + ("demo", "send"): server.ChannelResource( + secret_name="slack-token", + slack_channel_id="C123", + resource_kind="MessageChannel", + resource_name="send", + resource_namespace="demo", + ), + }, + secrets={ + ("demo", "slack-token"): {"apiKey": "xoxb-demo"}, + }, + ), + slack_client=FakeSlackClient(), + ) + + request = minimal_request() + request["context"] = {"project": "demo"} + request["step"]["config"]["encodingType"] = "yaml" + request["step"]["config"]["message"] = "text: [oops" + + response = execute_request(plugin, request) + + self.assertEqual("Errored", response["status"]) + self.assertIn("error decoding YAML Slack payload", response["error"]) + + def test_execute_supports_xml_encoding(self) -> None: + plugin = self.new_server( + kube_client=FakeKubernetesClient( + message_channels={ + ("demo", "send"): server.ChannelResource( + secret_name="slack-token", + slack_channel_id="C123", + resource_kind="MessageChannel", + resource_name="send", + resource_namespace="demo", + ), + }, + secrets={ + ("demo", "slack-token"): {"apiKey": "xoxb-demo"}, + }, + ), + slack_client=FakeSlackClient(response={"ok": True, "ts": "1712345678.000500"}), + ) + + request = minimal_request() + request["context"] = {"project": "demo"} + request["step"]["config"]["encodingType"] = "xml" + request["step"]["config"]["message"] = """ + + rich + 1700000000.000003 + + """ + + response = execute_request(plugin, request) + + self.assertEqual("Succeeded", response["status"]) + self.assertEqual( + { + "channel": "C123", + "text": "rich", + "thread_ts": "1700000000.000003", + }, + plugin.slack_client.last_payload, + ) + self.assertEqual( + "1700000000.000003", + response["output"]["slack"]["threadTS"], + ) + + def test_execute_encoded_payload_ignores_slack_overrides(self) -> None: + plugin = self.new_server( + kube_client=FakeKubernetesClient( + message_channels={ + ("demo", "send"): server.ChannelResource( + secret_name="slack-token", + slack_channel_id="C123", + resource_kind="MessageChannel", + resource_name="send", + resource_namespace="demo", + ), + }, + secrets={ + ("demo", "slack-token"): {"apiKey": "xoxb-demo"}, + }, + ), + slack_client=FakeSlackClient(response={"ok": True, "ts": "1712345678.000450"}), + ) + + request = minimal_request() + request["context"] = {"project": "demo"} + request["step"]["config"]["encodingType"] = "json" + request["step"]["config"]["message"] = ( + '{"text":"rich","thread_ts":"1700000000.000002"}' + ) + request["step"]["config"]["slack"] = { + "channelID": "C999", + "threadTS": "1700000000.000001", + } + + response = execute_request(plugin, request) + + self.assertEqual("Succeeded", response["status"]) + self.assertEqual( + { + "channel": "C123", + "text": "rich", + "thread_ts": "1700000000.000002", + }, + plugin.slack_client.last_payload, + ) + self.assertEqual( + "1700000000.000002", + response["output"]["slack"]["threadTS"], + ) + + def test_execute_fails_when_secret_missing(self) -> None: + plugin = self.new_server( + kube_client=FakeKubernetesClient( + message_channels={ + ("demo", "send"): server.ChannelResource( + secret_name="slack-token", + slack_channel_id="C123", + resource_kind="MessageChannel", + resource_name="send", + resource_namespace="demo", + ), + }, + ), + slack_client=FakeSlackClient(), + ) + + response = execute_request( + plugin, + { + "context": {"project": "demo"}, + "step": minimal_request()["step"], + }, + ) + + self.assertEqual("Failed", response["status"]) + self.assertIn("secret not found", response["error"]) + + def test_execute_fails_when_slack_errors(self) -> None: + plugin = self.new_server( + kube_client=FakeKubernetesClient( + message_channels={ + ("demo", "send"): server.ChannelResource( + secret_name="slack-token", + slack_channel_id="C123", + resource_kind="MessageChannel", + resource_name="send", + resource_namespace="demo", + ), + }, + secrets={ + ("demo", "slack-token"): {"apiKey": "xoxb-demo"}, + }, + ), + slack_client=FakeSlackClient( + response={"ok": False, "error": "channel_not_found"} + ), + ) + + response = execute_request( + plugin, + { + "context": {"project": "demo"}, + "step": minimal_request()["step"], + }, + ) + + self.assertEqual("Failed", response["status"]) + self.assertIn("channel_not_found", response["error"]) + + def test_xml_decode_shape(self) -> None: + payload = server.decode_xml_slack_payload( + """ + + rich + + section + + mrkdwn + *hello* + + + + divider + + + """ + ) + + self.assertEqual( + { + "icon_emoji": ":wave:", + "text": "rich", + "blocks": [ + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": "*hello*", + }, + }, + { + "type": "divider", + }, + ], + }, + payload, + ) + + +def execute_request( + plugin: server.PluginServer, + request: dict[str, object], +) -> dict[str, object]: + status_code, response = plugin.handle( + "POST", + server.STEP_EXECUTE_PATH, + {server.AUTH_HEADER: f"{server.BEARER_PREFIX}expected-token"}, + json.dumps(request).encode("utf-8"), + ) + if status_code != 200: + raise AssertionError(f"unexpected status code {status_code}: {response}") + assert response is not None + return response + + +def minimal_request() -> dict[str, object]: + return { + "step": { + "kind": "send-message", + "config": { + "channel": { + "kind": "MessageChannel", + "name": "send", + }, + "message": "hello from plugin", + }, + }, + } + + +if __name__ == "__main__": + unittest.main() diff --git a/extended/tests/e2e_stepplugins.sh b/extended/tests/e2e_stepplugins.sh index 075f9a67f3..5b14413fb6 100644 --- a/extended/tests/e2e_stepplugins.sh +++ b/extended/tests/e2e_stepplugins.sh @@ -4,6 +4,37 @@ STEPPLUGIN_TEST_STAGE="" STEPPLUGIN_CONFIGMAP_NAME="mkdir-step-plugin" STEPPLUGIN_PLUGIN_DIR="" +create_stepplugin_smoke_promotion() { + local project="$1" + local freight_name="$2" + local stage_name="$3" + local promotion_name="stepplugin-smoke-promotion-$(date +%s)" + local promotion_path="/tmp/${promotion_name}.yaml" + + cat > "$promotion_path" <