Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- `A2A.Plug` task-level authorization hook for `tasks/get`, `tasks/cancel`, and
`tasks/list`

## [0.2.0] - 2026-03-06

### Added
Expand Down
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,25 @@ forward "/a2a", A2A.Plug,

The agent card is served at `GET /.well-known/agent-card.json` by default.

### Task Access Control

`A2A.Plug` accepts an optional `:authorize_task` callback for task-scoped
operations:

```elixir
defmodule MyApp.TaskPolicy do
  # Fail closed: a request without a tenant must never match a task without
  # one (`nil == nil` would otherwise grant access).
  def authorize(_operation, task, %{metadata: metadata}) do
    tenant_id = metadata["tenant_id"]
    is_binary(tenant_id) and tenant_id == task.metadata["tenant_id"]
  end
end

# Router plug options are built at compile time, so reference a named
# function (`{module, function}`) instead of an anonymous `fn`.
forward "/a2a", A2A.Plug,
  agent: MyAgent,
  base_url: "http://localhost:4000/a2a",
  authorize_task: {MyApp.TaskPolicy, :authorize}
```

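The callback runs before `tasks/get` and `tasks/list` return task data and before
`tasks/cancel` mutates a task. Return `true` or `:ok` to allow the operation and
`false` to deny it. Denied `tasks/get` and `tasks/cancel` requests return
`TaskNotFoundError` so callers cannot distinguish nonexistent tasks from tasks
they cannot access; denied tasks are omitted from `tasks/list` results.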

## Calling Remote Agents

`A2A.Client` discovers and communicates with remote A2A agents over HTTP. Requires the `req` optional dependency.
Expand Down Expand Up @@ -222,7 +241,8 @@ Key A2A spec features not yet covered:
- **REST / gRPC transports** — only JSON-RPC is supported
- **Version negotiation** — hardcoded to A2A v0.3
- **Task resubscribe** — reconnecting to active SSE streams
- **Security middleware** — auth plug, agent card signatures, task-level ACL (security scheme data modeling is complete)
- **Security middleware** — agent card signatures and OAuth flows (auth plug,
task ACL hook, and security scheme data modeling are complete)

See [SPEC.md](SPEC.md) for full details and roadmap.

Expand Down
20 changes: 15 additions & 5 deletions SPEC.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,21 @@ flow built into the client.
### Task-Level Access Control

The spec states: "Servers MUST NOT reveal the existence of resources the client
is not authorized to access." Currently any caller can access any task by ID.
is not authorized to access." `A2A.Plug` now supports an `:authorize_task`
callback for task-scoped JSON-RPC operations.

A callback or plug that maps authenticated identity to allowed task IDs /
context IDs. The runtime would check this before returning task data from
`tasks/get` or `tasks/cancel`.
- `tasks/get` and `tasks/cancel` call the callback before returning or mutating a
task. Denied requests return `TaskNotFoundError` so task IDs are not leaked.
- `tasks/list` filters the returned page through the same callback.
- The callback receives `(operation, task, context)` where `operation` is
`:get`, `:cancel`, or `:list`. `context.metadata` contains the resolved Plug
metadata (including `A2A.Plug.Auth` identity under `"a2a.auth"` when that plug
is used) merged with any `metadata` object sent in the JSON-RPC params, and
`context.params` contains the raw JSON-RPC params.

Remaining hardening:

- Move authorization down into task stores that can filter before pagination.
- Add store-specific examples for tenant and user ownership policies.

### Agent Card Signature Verification

Expand All @@ -228,7 +238,7 @@ verification. Not yet implemented. Would require:
1. Agent card signature verification
2. Authenticated extended card endpoint
3. Client-side OAuth 2.0 flows
4. Task-level access control
4. Store-level authorization filters

---

Expand Down
106 changes: 85 additions & 21 deletions lib/a2a/plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ if Code.ensure_loaded?(Plug) do
- `:metadata` — static metadata merged into every JSON-RPC call
(default: `%{}`). Useful for deployment-level metadata like
`%{"env" => "prod"}`. Overridden per-request by `put_metadata/2`.
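- `:authorize_task` — optional authorization callback for task-scoped
operations. Accepts a 3-arity function called as `(operation, task, context)`,
a 2-arity function called as `(operation, task)`, or a `{module, function}`
tuple invoked with `(operation, task, context)`. It runs before returning,
canceling, or listing tasks. Denied `tasks/get` and `tasks/cancel`
requests return `TaskNotFoundError` so task IDs are not leaked.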

## Per-Request Overrides

Expand Down Expand Up @@ -116,7 +120,8 @@ if Code.ensure_loaded?(Plug) do
agent_card_path: Keyword.get(opts, :agent_card_path, [".well-known", "agent-card.json"]),
json_rpc_path: Keyword.get(opts, :json_rpc_path, []),
agent_card_opts: Keyword.get(opts, :agent_card_opts, []),
metadata: Keyword.get(opts, :metadata, %{})
metadata: Keyword.get(opts, :metadata, %{}),
authorize_task: Keyword.get(opts, :authorize_task)
}
end

Expand Down Expand Up @@ -271,35 +276,38 @@ if Code.ensure_loaded?(Plug) do
end

@impl A2A.JSONRPC
# Handles `tasks/get`: looks the task up on the agent process, then runs the
# `:authorize_task` check with operation `:get` before returning it.
#
# Returns `{:ok, task}` when the task exists and is authorized, otherwise
# `{:error, %Error{}}`. Reuses `fetch_task/2` so the lookup is written once.
def handle_get(task_id, params, %{agent: agent, opts: plug_opts}) do
  with {:ok, task} <- fetch_task(agent, task_id) do
    authorize_task(:get, task, params, plug_opts)
  else
    {:error, :not_found} -> {:error, Error.task_not_found()}
  end
end

@impl A2A.JSONRPC
# Handles `tasks/cancel`.
#
# The task is fetched and authorized (operation `:cancel`) before the cancel
# request is sent to the agent, so a denied caller never mutates the task.
# After a successful cancel the task is fetched again and returned.
#
# Every failing step is normalized in `else`, including the re-fetch after
# cancel: if the task is gone by then, the caller still receives
# `Error.task_not_found()` rather than a bare `{:error, :not_found}`.
def handle_cancel(task_id, params, %{agent: agent, opts: plug_opts}) do
  with {:ok, task} <- fetch_task(agent, task_id),
       {:ok, _task} <- authorize_task(:cancel, task, params, plug_opts),
       :ok <- GenServer.call(agent, {:cancel, task_id}),
       {:ok, canceled} <- fetch_task(agent, task_id) do
    {:ok, canceled}
  else
    # Missing before the check, reported missing by cancel, or gone afterwards.
    {:error, :not_found} -> {:error, Error.task_not_found()}
    # Authorization denial, or an explicit error returned by the callback.
    {:error, %Error{} = error} -> {:error, error}
    # Any other reason comes from the agent refusing the cancel.
    {:error, reason} -> {:error, Error.task_not_cancelable(inspect(reason))}
  end
end

@impl A2A.JSONRPC
def handle_list(params, %{agent: agent}) do
def handle_list(params, %{agent: agent, opts: plug_opts}) do
case GenServer.call(agent, {:list_tasks, params}) do
{:ok, result} ->
{:ok, result}
{:ok, authorize_task_list(result, params, plug_opts)}

{:error, :invalid_page_token} ->
{:error, Error.invalid_params("\"pageToken\" is invalid")}
Expand All @@ -311,11 +319,63 @@ if Code.ensure_loaded?(Plug) do

# -- Helpers ---------------------------------------------------------------

# Asks the agent process for a task by id and passes the reply through
# unchanged: `{:ok, task}` or `{:error, :not_found}`.
defp fetch_task(agent, task_id) do
  agent
  |> GenServer.call({:get_task, task_id})
  |> case do
    {:ok, _task} = found -> found
    {:error, :not_found} = missing -> missing
  end
end

# Runs the configured `:authorize_task` callback for a single task.
#
# Returns `{:ok, task}` when access is granted and `{:error, %Error{}}`
# otherwise. With no callback configured every task is allowed.
defp authorize_task(_operation, task, _params, %{authorize_task: nil}), do: {:ok, task}

defp authorize_task(operation, task, params, plug_opts) do
  context = authorization_context(params, plug_opts)

  case call_authorizer(plug_opts.authorize_task, operation, task, context) do
    :ok -> {:ok, task}
    true -> {:ok, task}
    # Fail closed: a falsy value wrapped in `:ok` is not a grant.
    {:ok, denied} when denied in [false, nil] -> {:error, Error.task_not_found()}
    {:ok, _identity} -> {:ok, task}
    # The callback may supply its own error; it is returned to the client as-is.
    {:error, %Error{} = error} -> {:error, error}
    # Anything else is a denial, reported as "not found" so the caller cannot
    # tell an inaccessible task from a nonexistent one.
    _deny -> {:error, Error.task_not_found()}
  end
end

# Filters a `tasks/list` result down to the tasks the caller may see.
#
# Without a callback the result is returned untouched. Otherwise each task on
# the page is checked with operation `:list`, and `total_size` / `page_size`
# are both set to the number of tasks that remain on this page.
defp authorize_task_list(result, _params, %{authorize_task: nil}), do: result

defp authorize_task_list(%{tasks: tasks} = result, params, plug_opts) do
  visible =
    for task <- tasks,
        match?({:ok, ^task}, authorize_task(:list, task, params, plug_opts)),
        do: task

  visible_count = length(visible)

  %{result | tasks: visible, total_size: visible_count, page_size: visible_count}
end

# Invokes the authorization callback in whichever shape it was configured:
# a 3-arity function gets `(operation, task, context)`, a 2-arity function
# gets `(operation, task)`, and a `{module, function}` tuple is applied with
# `(operation, task, context)`.
defp call_authorizer(callback, operation, task, context) when is_function(callback, 3),
  do: apply(callback, [operation, task, context])

defp call_authorizer(callback, operation, task, _context) when is_function(callback, 2),
  do: apply(callback, [operation, task])

defp call_authorizer({mod, fun_name}, operation, task, context),
  do: apply(mod, fun_name, [operation, task, context])

# Builds the context map passed to the `:authorize_task` callback.
#
# `:metadata` combines the `"metadata"` object from the JSON-RPC params with
# the Plug metadata in `plug_opts.metadata`; `:params` is the raw params map.
#
# The params metadata is written by the remote caller, so the Plug metadata
# is merged last: a request cannot shadow server-set keys (for example an
# identity stored by an auth plug) by echoing them in its own `metadata`.
# A non-map `"metadata"` value is ignored rather than crashing the merge.
defp authorization_context(params, plug_opts) do
  client_metadata =
    case params["metadata"] do
      %{} = metadata -> metadata
      _other -> %{}
    end

  %{metadata: Map.merge(client_metadata, plug_opts.metadata), params: params}
end

defp build_call_opts(params, plug_opts) do
# 3-layer metadata merge: init → conn.private → JSON-RPC params
metadata =
plug_opts.metadata
|> merge_unless_nil(params["metadata"])
metadata = request_metadata(params, plug_opts)

[]
|> maybe_put(:task_id, params["id"])
Expand All @@ -326,6 +386,10 @@ if Code.ensure_loaded?(Plug) do
# Merges `override` into `base`, treating a `nil` override as "nothing to add".
defp merge_unless_nil(base, override) do
  case override do
    nil -> base
    extra -> Map.merge(base, extra)
  end
end

# Resolves the metadata for a request: the Plug metadata from `plug_opts`,
# overlaid with the `"metadata"` entry of the JSON-RPC params when present.
defp request_metadata(params, plug_opts) do
  base = plug_opts.metadata
  override = params["metadata"]

  merge_unless_nil(base, override)
end

# Prepends `{key, val}` to the option list unless `val` is `nil`.
defp maybe_put(opts, key, val) do
  case val do
    nil -> opts
    present -> [{key, present} | opts]
  end
end

Expand Down
109 changes: 109 additions & 0 deletions test/a2a/plug_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ defmodule A2A.PlugTest do
}
end

# Authorizer used by the tests: grants access when the request's "user_id"
# metadata equals the task's "owner_id" metadata, for any operation.
defp owner_authorizer do
  fn _operation, task, %{metadata: caller} ->
    owner_id = task.metadata["owner_id"]
    caller["user_id"] == owner_id
  end
end

defp json_body(conn) do
Jason.decode!(conn.resp_body)
end
Expand Down Expand Up @@ -191,6 +197,49 @@ defmodule A2A.PlugTest do
body = json_body(conn)
assert body["error"]["code"] == -32_001
end

test "authorize_task denies access without leaking task existence", %{agent: agent} do
  opts = plug_opts(agent, authorize_task: owner_authorizer())

  # Create a task owned by u-1.
  create_params = Map.put(message_params(), "metadata", %{"owner_id" => "u-1"})
  created = A2A.Plug.call(json_rpc_conn("message/send", create_params), opts)
  task_id = json_body(created)["result"]["task"]["id"]

  # Read it back as u-2: the response must look like a missing task.
  denied =
    "tasks/get"
    |> json_rpc_conn(%{"id" => task_id})
    |> A2A.Plug.put_metadata(%{"user_id" => "u-2"})
    |> A2A.Plug.call(opts)

  response = json_body(denied)
  assert response["error"]["code"] == -32_001
  assert response["error"]["message"] == "Task not found"
end

test "authorize_task allows matching task owner", %{agent: agent} do
  opts = plug_opts(agent, authorize_task: owner_authorizer())

  # Create a task owned by u-1.
  create_params = Map.put(message_params(), "metadata", %{"owner_id" => "u-1"})
  created = A2A.Plug.call(json_rpc_conn("message/send", create_params), opts)
  task_id = json_body(created)["result"]["task"]["id"]

  # The owner can read it back.
  allowed =
    "tasks/get"
    |> json_rpc_conn(%{"id" => task_id})
    |> A2A.Plug.put_metadata(%{"user_id" => "u-1"})
    |> A2A.Plug.call(opts)

  response = json_body(allowed)
  assert response["result"]["id"] == task_id
end
end

# -- tasks/cancel ------------------------------------------------------------
Expand Down Expand Up @@ -228,6 +277,66 @@ defmodule A2A.PlugTest do
body = json_body(conn)
assert body["error"]["code"] == -32_001
end

test "authorize_task denies cancel before mutating task" do
  agent = start_supervised!({A2A.Test.MultiTurnAgent, [name: nil]})
  opts = plug_opts(agent, authorize_task: owner_authorizer())

  # Create a task owned by u-1.
  create_params = Map.put(message_params("order pizza"), "metadata", %{"owner_id" => "u-1"})
  created = A2A.Plug.call(json_rpc_conn("message/send", create_params), opts)
  task_id = json_body(created)["result"]["task"]["id"]

  # u-2 tries to cancel it and is told the task does not exist.
  denied =
    "tasks/cancel"
    |> json_rpc_conn(%{"id" => task_id})
    |> A2A.Plug.put_metadata(%{"user_id" => "u-2"})
    |> A2A.Plug.call(opts)

  assert json_body(denied)["error"]["code"] == -32_001

  # The owner still sees the task in its pre-cancel state.
  fetched =
    "tasks/get"
    |> json_rpc_conn(%{"id" => task_id})
    |> A2A.Plug.put_metadata(%{"user_id" => "u-1"})
    |> A2A.Plug.call(opts)

  assert json_body(fetched)["result"]["status"]["state"] == "TASK_STATE_INPUT_REQUIRED"
end
end

# -- tasks/list --------------------------------------------------------------

describe "tasks/list" do
test "authorize_task filters tasks from list results", %{agent: agent} do
  opts = plug_opts(agent, authorize_task: owner_authorizer())

  # One task per owner; only the first belongs to the listing user.
  task_ids =
    Enum.map(["u-1", "u-2"], fn owner_id ->
      create_params =
        Map.put(message_params("hello #{owner_id}"), "metadata", %{"owner_id" => owner_id})

      created = A2A.Plug.call(json_rpc_conn("message/send", create_params), opts)
      json_body(created)["result"]["task"]["id"]
    end)

  listed =
    "tasks/list"
    |> json_rpc_conn(%{})
    |> A2A.Plug.put_metadata(%{"user_id" => "u-1"})
    |> A2A.Plug.call(opts)

  result = json_body(listed)["result"]
  listed_ids = Enum.map(result["tasks"], & &1["id"])

  assert listed_ids == [List.first(task_ids)]
  assert result["totalSize"] == 1
  assert result["pageSize"] == 1
end
end

# -- Unknown method ----------------------------------------------------------
Expand Down
Loading