diff --git a/CHANGELOG.md b/CHANGELOG.md index e772de4..8358b96 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- `A2A.Plug` task-level authorization hook for `tasks/get`, `tasks/cancel`, and + `tasks/list` + ## [0.2.0] - 2026-03-06 ### Added diff --git a/README.md b/README.md index 090137b..85669a8 100644 --- a/README.md +++ b/README.md @@ -68,6 +68,25 @@ forward "/a2a", A2A.Plug, The agent card is served at `GET /.well-known/agent-card.json` by default. +### Task Access Control + +`A2A.Plug` accepts an optional `:authorize_task` callback for task-scoped +operations: + +```elixir +forward "/a2a", A2A.Plug, + agent: MyAgent, + base_url: "http://localhost:4000/a2a", + authorize_task: fn operation, task, %{metadata: metadata} -> + same_tenant? = metadata["tenant_id"] == task.metadata["tenant_id"] + same_tenant? and operation in [:get, :cancel, :list] + end +``` + +The callback runs before `tasks/get`, `tasks/cancel`, and `tasks/list` responses. +Denied `tasks/get` and `tasks/cancel` requests return `TaskNotFoundError` so +callers cannot distinguish nonexistent tasks from tasks they cannot access. + ## Calling Remote Agents `A2A.Client` discovers and communicates with remote A2A agents over HTTP. Requires the `req` optional dependency. @@ -222,7 +241,8 @@ Key A2A spec features not yet covered: - **REST / gRPC transports** — only JSON-RPC is supported - **Version negotiation** — hardcoded to A2A v0.3 - **Task resubscribe** — reconnecting to active SSE streams -- **Security middleware** — auth plug, agent card signatures, task-level ACL (security scheme data modeling is complete) +- **Security middleware** — agent card signatures and OAuth flows (auth plug, + task ACL hook, and security scheme data modeling are complete) See [SPEC.md](SPEC.md) for full details and roadmap. diff --git a/SPEC.md b/SPEC.md index 4530f21..704c83d 100644 --- a/SPEC.md +++ b/SPEC.md @@ -205,11 +205,21 @@ flow built into the client. ### Task-Level Access Control The spec states: "Servers MUST NOT reveal the existence of resources the client -is not authorized to access." Currently any caller can access any task by ID. +is not authorized to access." `A2A.Plug` now supports an `:authorize_task` +callback for task-scoped JSON-RPC operations. -A callback or plug that maps authenticated identity to allowed task IDs / -context IDs. The runtime would check this before returning task data from -`tasks/get` or `tasks/cancel`. +- `tasks/get` and `tasks/cancel` call the callback before returning or mutating a + task. Denied requests return `TaskNotFoundError` so task IDs are not leaked. +- `tasks/list` filters the returned page through the same callback. +- The callback receives `(operation, task, context)` where `operation` is + `:get`, `:cancel`, or `:list`, and `context.metadata` contains the resolved + Plug metadata, including `A2A.Plug.Auth` identity under `"a2a.auth"` when that + plug is used. + +Remaining hardening: + +- Move authorization down into task stores that can filter before pagination. +- Add store-specific examples for tenant and user ownership policies. ### Agent Card Signature Verification @@ -228,7 +238,7 @@ verification. Not yet implemented. Would require: 1. Agent card signature verification 2. Authenticated extended card endpoint 3. Client-side OAuth 2.0 flows -4. Task-level access control +4. Store-level authorization filters --- diff --git a/lib/a2a/plug.ex b/lib/a2a/plug.ex index 2a48474..0fd3e18 100644 --- a/lib/a2a/plug.ex +++ b/lib/a2a/plug.ex @@ -32,6 +32,10 @@ if Code.ensure_loaded?(Plug) do - `:metadata` — static metadata merged into every JSON-RPC call (default: `%{}`). Useful for deployment-level metadata like `%{"env" => "prod"}`. Overridden per-request by `put_metadata/2`. + - `:authorize_task` — optional authorization callback for task-scoped + operations. Called as `(operation, task, context)` before returning, + canceling, or listing tasks. Denied `tasks/get` and `tasks/cancel` + requests return `TaskNotFoundError` so task IDs are not leaked. ## Per-Request Overrides @@ -116,7 +120,8 @@ if Code.ensure_loaded?(Plug) do agent_card_path: Keyword.get(opts, :agent_card_path, [".well-known", "agent-card.json"]), json_rpc_path: Keyword.get(opts, :json_rpc_path, []), agent_card_opts: Keyword.get(opts, :agent_card_opts, []), - metadata: Keyword.get(opts, :metadata, %{}) + metadata: Keyword.get(opts, :metadata, %{}), + authorize_task: Keyword.get(opts, :authorize_task) } end @@ -271,35 +276,38 @@ if Code.ensure_loaded?(Plug) do end @impl A2A.JSONRPC - def handle_get(task_id, _params, %{agent: agent}) do + def handle_get(task_id, params, %{agent: agent, opts: plug_opts}) do case GenServer.call(agent, {:get_task, task_id}) do - {:ok, task} -> {:ok, task} + {:ok, task} -> authorize_task(:get, task, params, plug_opts) {:error, :not_found} -> {:error, Error.task_not_found()} end end @impl A2A.JSONRPC - def handle_cancel(task_id, _params, %{agent: agent}) do - case GenServer.call(agent, {:cancel, task_id}) do - :ok -> - case GenServer.call(agent, {:get_task, task_id}) do - {:ok, task} -> {:ok, task} - {:error, _} -> {:error, Error.task_not_found()} - end - - {:error, :not_found} -> - {:error, Error.task_not_found()} - - {:error, reason} -> - {:error, Error.task_not_cancelable(inspect(reason))} + def handle_cancel(task_id, params, %{agent: agent, opts: plug_opts}) do + with {:ok, task} <- fetch_task(agent, task_id), + {:ok, _task} <- authorize_task(:cancel, task, params, plug_opts) do + case GenServer.call(agent, {:cancel, task_id}) do + :ok -> + fetch_task(agent, task_id) + + {:error, :not_found} -> + {:error, Error.task_not_found()} + + {:error, reason} -> + {:error, Error.task_not_cancelable(inspect(reason))} + end + else + {:error, :not_found} -> {:error, Error.task_not_found()} + {:error, %Error{} = error} -> {:error, error} end end @impl A2A.JSONRPC - def handle_list(params, %{agent: agent}) do + def handle_list(params, %{agent: agent, opts: plug_opts}) do case GenServer.call(agent, {:list_tasks, params}) do {:ok, result} -> - {:ok, result} + {:ok, authorize_task_list(result, params, plug_opts)} {:error, :invalid_page_token} -> {:error, Error.invalid_params("\"pageToken\" is invalid")} @@ -311,11 +319,63 @@ if Code.ensure_loaded?(Plug) do # -- Helpers --------------------------------------------------------------- + defp fetch_task(agent, task_id) do + case GenServer.call(agent, {:get_task, task_id}) do + {:ok, task} -> {:ok, task} + {:error, :not_found} -> {:error, :not_found} + end + end + + defp authorize_task(_operation, task, _params, %{authorize_task: nil}), do: {:ok, task} + + defp authorize_task(operation, task, params, plug_opts) do + context = authorization_context(params, plug_opts) + + case call_authorizer(plug_opts.authorize_task, operation, task, context) do + :ok -> {:ok, task} + true -> {:ok, task} + {:ok, true} -> {:ok, task} + {:ok, _identity} -> {:ok, task} + {:error, %Error{} = error} -> {:error, error} + _deny -> {:error, Error.task_not_found()} + end + end + + defp authorize_task_list(result, _params, %{authorize_task: nil}), do: result + + defp authorize_task_list(%{tasks: tasks} = result, params, plug_opts) do + authorized = + Enum.filter(tasks, fn task -> + match?({:ok, ^task}, authorize_task(:list, task, params, plug_opts)) + end) + + %{ + result + | tasks: authorized, + total_size: length(authorized), + page_size: length(authorized) + } + end + + defp call_authorizer(fun, operation, task, context) when is_function(fun, 3) do + fun.(operation, task, context) + end + + defp call_authorizer(fun, operation, task, _context) when is_function(fun, 2) do + fun.(operation, task) + end + + defp call_authorizer({module, function}, operation, task, context) do + apply(module, function, [operation, task, context]) + end + + defp authorization_context(params, plug_opts) do + %{metadata: request_metadata(params, plug_opts), params: params} + end + defp build_call_opts(params, plug_opts) do # 3-layer metadata merge: init → conn.private → JSON-RPC params - metadata = - plug_opts.metadata - |> merge_unless_nil(params["metadata"]) + metadata = request_metadata(params, plug_opts) [] |> maybe_put(:task_id, params["id"]) @@ -326,6 +386,10 @@ if Code.ensure_loaded?(Plug) do defp merge_unless_nil(base, nil), do: base defp merge_unless_nil(base, override), do: Map.merge(base, override) + defp request_metadata(params, plug_opts) do + merge_unless_nil(plug_opts.metadata, params["metadata"]) + end + defp maybe_put(opts, _key, nil), do: opts defp maybe_put(opts, key, val), do: [{key, val} | opts] diff --git a/test/a2a/plug_test.exs b/test/a2a/plug_test.exs index 842db78..d2766d1 100644 --- a/test/a2a/plug_test.exs +++ b/test/a2a/plug_test.exs @@ -30,6 +30,12 @@ defmodule A2A.PlugTest do } end + defp owner_authorizer do + fn _operation, task, %{metadata: metadata} -> + metadata["user_id"] == task.metadata["owner_id"] + end + end + defp json_body(conn) do Jason.decode!(conn.resp_body) end @@ -191,6 +197,49 @@ defmodule A2A.PlugTest do body = json_body(conn) assert body["error"]["code"] == -32_001 end + + test "authorize_task denies access without leaking task existence", %{agent: agent} do + opts = plug_opts(agent, authorize_task: owner_authorizer()) + + send_conn = + json_rpc_conn( + "message/send", + Map.put(message_params(), "metadata", %{"owner_id" => "u-1"}) + ) + |> A2A.Plug.call(opts) + + task_id = json_body(send_conn)["result"]["task"]["id"] + + conn = + json_rpc_conn("tasks/get", %{"id" => task_id}) + |> A2A.Plug.put_metadata(%{"user_id" => "u-2"}) + |> A2A.Plug.call(opts) + + body = json_body(conn) + assert body["error"]["code"] == -32_001 + assert body["error"]["message"] == "Task not found" + end + + test "authorize_task allows matching task owner", %{agent: agent} do + opts = plug_opts(agent, authorize_task: owner_authorizer()) + + send_conn = + json_rpc_conn( + "message/send", + Map.put(message_params(), "metadata", %{"owner_id" => "u-1"}) + ) + |> A2A.Plug.call(opts) + + task_id = json_body(send_conn)["result"]["task"]["id"] + + conn = + json_rpc_conn("tasks/get", %{"id" => task_id}) + |> A2A.Plug.put_metadata(%{"user_id" => "u-1"}) + |> A2A.Plug.call(opts) + + body = json_body(conn) + assert body["result"]["id"] == task_id + end end # -- tasks/cancel ------------------------------------------------------------ @@ -228,6 +277,66 @@ defmodule A2A.PlugTest do body = json_body(conn) assert body["error"]["code"] == -32_001 end + + test "authorize_task denies cancel before mutating task" do + agent = start_supervised!({A2A.Test.MultiTurnAgent, [name: nil]}) + opts = plug_opts(agent, authorize_task: owner_authorizer()) + + send_conn = + json_rpc_conn( + "message/send", + message_params("order pizza") |> Map.put("metadata", %{"owner_id" => "u-1"}) + ) + |> A2A.Plug.call(opts) + + task_id = json_body(send_conn)["result"]["task"]["id"] + + denied_conn = + json_rpc_conn("tasks/cancel", %{"id" => task_id}) + |> A2A.Plug.put_metadata(%{"user_id" => "u-2"}) + |> A2A.Plug.call(opts) + + assert json_body(denied_conn)["error"]["code"] == -32_001 + + get_conn = + json_rpc_conn("tasks/get", %{"id" => task_id}) + |> A2A.Plug.put_metadata(%{"user_id" => "u-1"}) + |> A2A.Plug.call(opts) + + assert json_body(get_conn)["result"]["status"]["state"] == "TASK_STATE_INPUT_REQUIRED" + end + end + + # -- tasks/list -------------------------------------------------------------- + + describe "tasks/list" do + test "authorize_task filters tasks from list results", %{agent: agent} do + opts = plug_opts(agent, authorize_task: owner_authorizer()) + + task_ids = + for owner_id <- ["u-1", "u-2"] do + conn = + json_rpc_conn( + "message/send", + message_params("hello #{owner_id}") + |> Map.put("metadata", %{"owner_id" => owner_id}) + ) + |> A2A.Plug.call(opts) + + json_body(conn)["result"]["task"]["id"] + end + + conn = + json_rpc_conn("tasks/list", %{}) + |> A2A.Plug.put_metadata(%{"user_id" => "u-1"}) + |> A2A.Plug.call(opts) + + body = json_body(conn) + + assert Enum.map(body["result"]["tasks"], & &1["id"]) == [List.first(task_ids)] + assert body["result"]["totalSize"] == 1 + assert body["result"]["pageSize"] == 1 + end end # -- Unknown method ----------------------------------------------------------