From 4c37c278b50397b377a42c102af31b1e3d734c09 Mon Sep 17 00:00:00 2001 From: Paulo Lacerda Date: Sun, 14 Jun 2026 12:55:29 -0300 Subject: [PATCH] feat(doctor): detect missing OpenAI data-plane RBAC on Foundry resource Adds a new `security.missing_openai_data_plane_rbac` check to `agentops doctor` that resolves the signed-in principal from the access token `oid` claim and lists role assignments at the Foundry account scope using `azure-mgmt-authorization`. When none of *Cognitive Services OpenAI User*, *Cognitive Services OpenAI Contributor* or *Cognitive Services Contributor* is present (directly or inherited), Doctor surfaces a WARNING with an actionable `az role assignment create` command pre-populated with the principal object id and Foundry account scope. The check is read-only and skips silently when the SDK, principal or scope cannot be resolved, matching the existing Doctor fail-open pattern. Closes #228 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- CHANGELOG.md | 13 + pyproject.toml | 1 + src/agentops/agent/analyzer.py | 4 + .../agent/checks/_rbac_authorization.py | 118 ++++++ .../agent/checks/rbac_openai_data_plane.py | 235 ++++++++++++ ...est_agent_checks_rbac_openai_data_plane.py | 335 ++++++++++++++++++ 6 files changed, 706 insertions(+) create mode 100644 src/agentops/agent/checks/_rbac_authorization.py create mode 100644 src/agentops/agent/checks/rbac_openai_data_plane.py create mode 100644 tests/unit/test_agent_checks_rbac_openai_data_plane.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 4702848c..1820608d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,19 @@ This format follows [Keep a Changelog](https://keepachangelog.com/) and adheres ## [Unreleased] +### Added +- **`agentops doctor` now detects missing OpenAI data-plane RBAC on the Foundry + resource.** A new `security.missing_openai_data_plane_rbac` check resolves the + signed-in principal (via the `oid` claim of the access token used by + `DefaultAzureCredential`) and lists role assignments at the Foundry account + scope using `azure-mgmt-authorization`. When none of *Cognitive Services + OpenAI User*, *Cognitive Services OpenAI Contributor* or *Cognitive Services + Contributor* is present (directly or inherited), Doctor surfaces an + actionable WARNING that includes the exact `az role assignment create` + command for *Cognitive Services OpenAI User* scoped to the Foundry account. + The check is read-only and skips silently when the SDK, principal or scope + cannot be resolved. ([#228](https://github.com/Azure/agentops/issues/228)) + ### Changed - **`agentops-pr` workflow templates now auto-detect a committed baseline.** Both the GitHub Actions (`.github/workflows/agentops-pr.yml`) and Azure diff --git a/pyproject.toml b/pyproject.toml index 6974c5a3..d53a67ef 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,7 @@ agent = [ "azure-identity>=1.17,<2.0", "azure-mgmt-cognitiveservices>=13.5,<14.0", "azure-mgmt-monitor>=6.0,<7.0", + "azure-mgmt-authorization>=4.0,<5.0", ] [project.scripts] diff --git a/src/agentops/agent/analyzer.py b/src/agentops/agent/analyzer.py index 4b0c2f0d..b1ae3082 100644 --- a/src/agentops/agent/analyzer.py +++ b/src/agentops/agent/analyzer.py @@ -16,6 +16,9 @@ from agentops.agent.checks.opex_workspace import run_opex_workspace_check from agentops.agent.checks.opex import run_opex_check from agentops.agent.checks.posture import run_posture_check +from agentops.agent.checks.rbac_openai_data_plane import ( + run_rbac_openai_data_plane_check, +) from agentops.agent.checks.regression import run_regression_check from agentops.agent.checks.release_readiness import run_release_readiness_check from agentops.agent.checks.safety import run_safety_check @@ -145,6 +148,7 @@ def analyze( findings.extend(run_errors_check(monitor, foundry, config.checks.errors)) findings.extend(run_safety_check(history, config.checks.safety, monitor, foundry)) findings.extend(run_posture_check(resources, posture_config)) + findings.extend(run_rbac_openai_data_plane_check(resources)) findings.extend(run_opex_workspace_check(workspace)) findings.extend(run_governance_check(workspace)) findings.extend(run_observability_check(workspace)) diff --git a/src/agentops/agent/checks/_rbac_authorization.py b/src/agentops/agent/checks/_rbac_authorization.py new file mode 100644 index 00000000..77a250fb --- /dev/null +++ b/src/agentops/agent/checks/_rbac_authorization.py @@ -0,0 +1,118 @@ +"""Lazy Azure SDK glue for the ``rbac_openai_data_plane`` Doctor check. + +Kept in a private module so the parent check can attempt the lazy +import in a single place and stay silent when ``azure-identity`` / +``azure-mgmt-authorization`` are not installed. All errors that should +make the check skip are normalised into :class:`AuthorizationCheckError`. +""" + +from __future__ import annotations + +import logging +from typing import List + +log = logging.getLogger(__name__) + + +class AuthorizationCheckError(RuntimeError): + """Raised when the RBAC check cannot run for an environmental reason.""" + + +def resolve_signed_in_principal_object_id() -> str: + """Return the ``oid`` claim of the shared Azure credential's access token. + + Raises :class:`AuthorizationCheckError` when the credential chain cannot + return a token, or when the token does not expose an ``oid`` claim. + """ + try: + from agentops.agent.sources._credentials import ( + format_source_error, + get_shared_credential, + ) + except ImportError as exc: # pragma: no cover - shipped together + raise AuthorizationCheckError( + f"shared credential factory unavailable: {exc}" + ) from exc + + try: + credential = get_shared_credential(process_timeout=30) + token = credential.get_token("https://management.azure.com/.default") + except Exception as exc: # noqa: BLE001 - normalised to skip-error + raise AuthorizationCheckError(format_source_error(exc)) from exc + + from agentops.agent.checks.rbac_openai_data_plane import decode_oid_from_jwt + + oid = decode_oid_from_jwt(getattr(token, "token", "") or "") + if not oid: + raise AuthorizationCheckError( + "access token did not include an 'oid' claim; cannot identify " + "the signed-in principal" + ) + return oid + + +def list_principal_role_definition_ids( + *, + subscription_id: str, + scope: str, + principal_object_id: str, +) -> List[str]: + """List role definition GUIDs assigned to the principal at/above scope. + + Uses ``RoleAssignmentsOperations.list_for_scope`` with the + ``atScopeAndAbove() and assignedTo('')`` filter so management-plane + inheritance (subscription, resource group, account) is honoured. + """ + try: + from azure.mgmt.authorization import AuthorizationManagementClient + except ImportError as exc: + raise AuthorizationCheckError( + "azure-mgmt-authorization not installed; install " + "`agentops-accelerator[agent]` (or add the package directly) to " + "enable the OpenAI data-plane RBAC check" + ) from exc + + try: + from agentops.agent.sources._credentials import ( + format_source_error, + get_shared_credential, + ) + except ImportError as exc: # pragma: no cover - shipped together + raise AuthorizationCheckError( + f"shared credential factory unavailable: {exc}" + ) from exc + + try: + credential = get_shared_credential(process_timeout=30) + client = AuthorizationManagementClient( + credential=credential, + subscription_id=subscription_id, + ) + except Exception as exc: # noqa: BLE001 + raise AuthorizationCheckError(format_source_error(exc)) from exc + + try: + assignments = list( + client.role_assignments.list_for_scope( + scope=scope, + filter=( + f"atScopeAndAbove() and assignedTo('{principal_object_id}')" + ), + ) + ) + except Exception as exc: # noqa: BLE001 + raise AuthorizationCheckError(format_source_error(exc)) from exc + + role_definition_ids: List[str] = [] + for assignment in assignments: + rd_id = ( + getattr(assignment, "role_definition_id", None) + or getattr(getattr(assignment, "properties", None), "role_definition_id", None) + ) + if not rd_id: + continue + # role_definition_id is a full ARM id ending in `/`. + guid = rd_id.rstrip("/").rsplit("/", 1)[-1] + if guid: + role_definition_ids.append(guid) + return role_definition_ids diff --git a/src/agentops/agent/checks/rbac_openai_data_plane.py b/src/agentops/agent/checks/rbac_openai_data_plane.py new file mode 100644 index 00000000..dbe7158b --- /dev/null +++ b/src/agentops/agent/checks/rbac_openai_data_plane.py @@ -0,0 +1,235 @@ +"""Check: signed-in principal has Cognitive Services OpenAI User RBAC. + +Cloud eval graders and any other data-plane Azure OpenAI call run by +``agentops eval run`` need the **Cognitive Services OpenAI User** role +(or another role granting the +``Microsoft.CognitiveServices/accounts/OpenAI/*/action`` data action) on +the AI Services account that backs the Foundry project. Without it the +runtime raises ``PermissionDenied`` / ``AuthenticationError`` mid-run +even after Azure CLI authentication looks fine. + +Today the tutorial and the ``agentops-eval`` skill document this, and +the CLI surfaces a clearer warning since v0.3.6, but Doctor was silent +on the missing assignment. This check fills the gap by inspecting the +existing :class:`AzureResourcesPayload` (resource group, account name, +account resource id) and querying ``azure-mgmt-authorization`` for role +assignments at and above the account scope. + +The check is **read-only**: it never grants or modifies RBAC. It stays +silent when: + +- the Azure resources source is disabled or returned a non-``ok`` status; +- ``azure-mgmt-authorization`` is not installed; +- the signed-in principal cannot be identified (no ``oid`` claim); +- the role assignments listing fails (permissions / network / etc.). + +Issue: https://github.com/Azure/agentops/issues/228 +""" + +from __future__ import annotations + +import base64 +import binascii +import json +import logging +from typing import Any, List, Optional, Sequence + +from agentops.agent.findings import Category, Finding, Severity +from agentops.agent.sources.azure_resources import AzureResourcesPayload + +log = logging.getLogger(__name__) + +SOURCE_NAME = "azure_resources" + +# https://learn.microsoft.com/azure/role-based-access-control/built-in-roles/ai-machine-learning#cognitive-services-openai-user +COGNITIVE_SERVICES_OPENAI_USER_ROLE_ID: str = ( + "5e0bd9bd-7b93-4f28-af87-19fc36ad61bd" +) + +# Roles that *also* grant the OpenAI data action and therefore satisfy the +# same need as Cognitive Services OpenAI User. Listed by definition GUID +# so renames in display name do not break detection. +_OPENAI_DATA_PLANE_ROLE_IDS: frozenset[str] = frozenset( + { + # Cognitive Services OpenAI User + "5e0bd9bd-7b93-4f28-af87-19fc36ad61bd", + # Cognitive Services OpenAI Contributor + "a001fd3d-188f-4b5d-821b-7da978bf7442", + # Cognitive Services Contributor (broader admin role; superset of data + # action via control plane). + "25fbc0a9-bd7c-42a3-aa1a-3b75d497ee68", + } +) + +_FINDING_ID = "security.missing_openai_data_plane_rbac" + + +def run_rbac_openai_data_plane_check( + resources: Optional[AzureResourcesPayload], +) -> List[Finding]: + """Return findings if the signed-in principal lacks OpenAI data-plane RBAC.""" + if resources is None: + return [] + diag = resources.diagnostics or {} + if diag.get("status") != "ok": + return [] + target = diag.get("target") + account_name = diag.get("account") or ( + resources.account.name if resources.account else None + ) + resource_group = diag.get("resource_group") + subscription_id = _subscription_id_from_target(target) + if not target or not account_name or not subscription_id: + return [] + + try: # lazy import — keeps `agentops` CLI light + from ._rbac_authorization import ( + AuthorizationCheckError, + list_principal_role_definition_ids, + resolve_signed_in_principal_object_id, + ) + except ImportError as exc: # pragma: no cover - test env always installs it + log.debug("rbac_openai_data_plane: helper unavailable: %s", exc) + return [] + + try: + principal_object_id = resolve_signed_in_principal_object_id() + except AuthorizationCheckError as exc: + log.info( + "rbac_openai_data_plane: skipped (cannot resolve principal): %s", exc + ) + return [] + + try: + role_definition_ids = list_principal_role_definition_ids( + subscription_id=subscription_id, + scope=target, + principal_object_id=principal_object_id, + ) + except AuthorizationCheckError as exc: + log.info( + "rbac_openai_data_plane: skipped (role assignments listing failed): %s", + exc, + ) + return [] + + granted_role_ids = {rid.lower() for rid in role_definition_ids} + if granted_role_ids & {rid.lower() for rid in _OPENAI_DATA_PLANE_ROLE_IDS}: + return [] + + return [ + _missing_rbac_finding( + account_name=account_name, + resource_group=resource_group, + subscription_id=subscription_id, + scope=target, + principal_object_id=principal_object_id, + granted_role_ids=sorted(granted_role_ids), + ) + ] + + +def _subscription_id_from_target(target: Optional[str]) -> Optional[str]: + """Extract the subscription GUID from an ARM resource id.""" + if not target: + return None + parts = target.strip("/").split("/") + try: + idx = parts.index("subscriptions") + except ValueError: + return None + if idx + 1 >= len(parts): + return None + return parts[idx + 1] + + +def _missing_rbac_finding( + *, + account_name: str, + resource_group: Optional[str], + subscription_id: str, + scope: str, + principal_object_id: str, + granted_role_ids: Sequence[str], +) -> Finding: + rg_clause = ( + f" (resource group `{resource_group}`)" if resource_group else "" + ) + az_command = ( + "az role assignment create " + f"--assignee {principal_object_id} " + '--role "Cognitive Services OpenAI User" ' + f"--scope {scope}" + ) + return Finding( + id=_FINDING_ID, + severity=Severity.WARNING, + category=Category.SECURITY, + title=( + "Signed-in principal is missing Cognitive Services OpenAI User on " + f"`{account_name}`" + ), + summary=( + "The Doctor signed-in principal does not hold the **Cognitive " + "Services OpenAI User** role (or any role with the OpenAI " + "data-plane action) at or above the AI Services account " + f"`{account_name}`{rg_clause}. Cloud eval graders and other " + "data-plane Azure OpenAI calls will fail with `PermissionDenied` " + "/ `AuthenticationError` until this assignment exists and Entra " + "ID propagates it (typically a few minutes)." + ), + recommendation=( + "Grant the role at the AI Services account scope (preferred) or " + "the resource group scope, then wait for propagation before " + f"re-running `agentops eval run`. Suggested command:\n\n" + f"```bash\n{az_command}\n```\n\n" + "If your team scopes this role at the subscription or resource " + "group level instead, grant it there. Doctor will detect any " + "role at or above the account scope. If you intentionally use a " + "different role granting " + "`Microsoft.CognitiveServices/accounts/OpenAI/*/action`, add its " + "definition GUID to " + "`agentops/agent/checks/rbac_openai_data_plane.py:" + "_OPENAI_DATA_PLANE_ROLE_IDS`." + ), + source=SOURCE_NAME, + evidence={ + "account": account_name, + "resource_group": resource_group, + "subscription_id": subscription_id, + "scope": scope, + "principal_object_id": principal_object_id, + "granted_role_definition_ids": list(granted_role_ids), + "required_role": "Cognitive Services OpenAI User", + "required_role_id": COGNITIVE_SERVICES_OPENAI_USER_ROLE_ID, + "remediation_command": az_command, + }, + ) + + +# Re-exported only so callers and tests can decode an Entra ID access token's +# ``oid`` claim without depending on a dedicated JWT library. Kept here (not +# in the helper module) because the helper module lazy-imports azure SDKs. + + +def decode_oid_from_jwt(token: str) -> Optional[str]: + """Return the ``oid`` (object id) claim of an Entra access token. + + Returns ``None`` when the token is malformed, when the payload cannot + be base64-decoded, or when the JSON has no ``oid`` field. The token's + signature is **not** verified — Doctor consumes it strictly as a + self-attested identifier already produced by the calling Azure SDK. + """ + if not token or token.count(".") < 2: + return None + payload = token.split(".")[1] + padding = "=" * (-len(payload) % 4) + try: + raw = base64.urlsafe_b64decode(payload + padding) + claims: Any = json.loads(raw.decode("utf-8")) + except (binascii.Error, UnicodeDecodeError, ValueError): + return None + if not isinstance(claims, dict): + return None + oid = claims.get("oid") + return oid if isinstance(oid, str) and oid else None diff --git a/tests/unit/test_agent_checks_rbac_openai_data_plane.py b/tests/unit/test_agent_checks_rbac_openai_data_plane.py new file mode 100644 index 00000000..2df70288 --- /dev/null +++ b/tests/unit/test_agent_checks_rbac_openai_data_plane.py @@ -0,0 +1,335 @@ +"""Unit tests for ``agentops.agent.checks.rbac_openai_data_plane``. + +The check must: + +- Stay silent when the Azure resources source skipped or did not finish. +- Decode the signed-in principal's ``oid`` claim from the access token + exposed by the shared credential factory. +- Detect that the principal already holds **Cognitive Services OpenAI + User** (or another role granting the OpenAI data action) at the AI + Services account scope or above. +- Emit a single :class:`Finding` with the ``az role assignment create`` + remediation when the role is missing. + +All Azure SDK calls are mocked so the test never hits Azure. +""" + +from __future__ import annotations + +import base64 +import json +from typing import Any, List +from unittest.mock import MagicMock, patch + +import pytest + +from agentops.agent.checks import rbac_openai_data_plane as check +from agentops.agent.checks._rbac_authorization import AuthorizationCheckError +from agentops.agent.findings import Category, Severity +from agentops.agent.sources.azure_resources import ( + AzureResourcesPayload, + CognitiveAccountSnapshot, +) + + +_SUBSCRIPTION_ID = "11111111-1111-1111-1111-111111111111" +_PRINCIPAL_OID = "22222222-2222-2222-2222-222222222222" +_ACCOUNT_TARGET = ( + f"/subscriptions/{_SUBSCRIPTION_ID}/resourceGroups/rg-x/providers/" + "Microsoft.CognitiveServices/accounts/ai-account" +) + + +def _payload(*, status: str = "ok") -> AzureResourcesPayload: + return AzureResourcesPayload( + account=CognitiveAccountSnapshot(name="ai-account"), + diagnostics={ + "status": status, + "target": _ACCOUNT_TARGET, + "account": "ai-account", + "resource_group": "rg-x", + }, + ) + + +def _fake_jwt(claims: dict[str, Any]) -> str: + """Return a token ``header.payload.signature`` that decodes to ``claims``.""" + header = base64.urlsafe_b64encode(b'{"alg":"none"}').rstrip(b"=").decode() + payload = base64.urlsafe_b64encode( + json.dumps(claims).encode("utf-8") + ).rstrip(b"=").decode() + return f"{header}.{payload}.sig" + + +# --------------------------------------------------------------------------- +# decode_oid_from_jwt +# --------------------------------------------------------------------------- + + +def test_decode_oid_returns_oid_claim() -> None: + token = _fake_jwt({"oid": _PRINCIPAL_OID, "aud": "x"}) + assert check.decode_oid_from_jwt(token) == _PRINCIPAL_OID + + +def test_decode_oid_returns_none_for_malformed_token() -> None: + assert check.decode_oid_from_jwt("not-a-token") is None + assert check.decode_oid_from_jwt("") is None + assert check.decode_oid_from_jwt("a.b") is None + + +def test_decode_oid_returns_none_when_payload_invalid_json() -> None: + token = "header." + base64.urlsafe_b64encode(b"not-json").rstrip(b"=").decode() + ".sig" + assert check.decode_oid_from_jwt(token) is None + + +def test_decode_oid_returns_none_when_oid_absent() -> None: + token = _fake_jwt({"aud": "x", "tid": "tenant"}) + assert check.decode_oid_from_jwt(token) is None + + +# --------------------------------------------------------------------------- +# run_rbac_openai_data_plane_check -- skip paths +# --------------------------------------------------------------------------- + + +def test_skips_when_resources_is_none() -> None: + assert check.run_rbac_openai_data_plane_check(None) == [] + + +def test_skips_when_status_not_ok() -> None: + assert check.run_rbac_openai_data_plane_check(_payload(status="skipped")) == [] + + +def test_skips_when_target_missing() -> None: + payload = AzureResourcesPayload( + account=CognitiveAccountSnapshot(name="ai-account"), + diagnostics={"status": "ok", "account": "ai-account"}, + ) + assert check.run_rbac_openai_data_plane_check(payload) == [] + + +def test_skips_when_principal_cannot_be_resolved() -> None: + with patch.object( + check, + "run_rbac_openai_data_plane_check", + wraps=check.run_rbac_openai_data_plane_check, + ): + with patch( + "agentops.agent.checks._rbac_authorization." + "resolve_signed_in_principal_object_id", + side_effect=AuthorizationCheckError("no oid"), + ): + assert check.run_rbac_openai_data_plane_check(_payload()) == [] + + +def test_skips_when_role_listing_fails() -> None: + with patch( + "agentops.agent.checks._rbac_authorization." + "resolve_signed_in_principal_object_id", + return_value=_PRINCIPAL_OID, + ), patch( + "agentops.agent.checks._rbac_authorization." + "list_principal_role_definition_ids", + side_effect=AuthorizationCheckError("boom"), + ): + assert check.run_rbac_openai_data_plane_check(_payload()) == [] + + +# --------------------------------------------------------------------------- +# run_rbac_openai_data_plane_check -- happy + finding paths +# --------------------------------------------------------------------------- + + +def test_no_finding_when_principal_has_openai_user_role() -> None: + with patch( + "agentops.agent.checks._rbac_authorization." + "resolve_signed_in_principal_object_id", + return_value=_PRINCIPAL_OID, + ), patch( + "agentops.agent.checks._rbac_authorization." + "list_principal_role_definition_ids", + return_value=[check.COGNITIVE_SERVICES_OPENAI_USER_ROLE_ID], + ): + assert check.run_rbac_openai_data_plane_check(_payload()) == [] + + +def test_no_finding_when_principal_has_openai_contributor_role() -> None: + with patch( + "agentops.agent.checks._rbac_authorization." + "resolve_signed_in_principal_object_id", + return_value=_PRINCIPAL_OID, + ), patch( + "agentops.agent.checks._rbac_authorization." + "list_principal_role_definition_ids", + # Cognitive Services OpenAI Contributor (a001fd3d-...) also grants + # the data action. + return_value=["a001fd3d-188f-4b5d-821b-7da978bf7442"], + ): + assert check.run_rbac_openai_data_plane_check(_payload()) == [] + + +def test_emits_finding_when_role_missing() -> None: + with patch( + "agentops.agent.checks._rbac_authorization." + "resolve_signed_in_principal_object_id", + return_value=_PRINCIPAL_OID, + ), patch( + "agentops.agent.checks._rbac_authorization." + "list_principal_role_definition_ids", + # Reader only -- not enough. + return_value=["acdd72a7-3385-48ef-bd42-f606fba81ae7"], + ): + findings = check.run_rbac_openai_data_plane_check(_payload()) + + assert len(findings) == 1 + finding = findings[0] + assert finding.id == "security.missing_openai_data_plane_rbac" + assert finding.severity is Severity.WARNING + assert finding.category is Category.SECURITY + assert finding.source == "azure_resources" + # Title flags the affected account by name. + assert "ai-account" in finding.title + # Recommendation contains the actionable az command with the right scope, + # role, and principal oid. + assert "az role assignment create" in finding.recommendation + assert _PRINCIPAL_OID in finding.recommendation + assert "Cognitive Services OpenAI User" in finding.recommendation + assert _ACCOUNT_TARGET in finding.recommendation + # Evidence captures everything the Doctor report needs to render the row. + ev = finding.evidence + assert ev["account"] == "ai-account" + assert ev["resource_group"] == "rg-x" + assert ev["subscription_id"] == _SUBSCRIPTION_ID + assert ev["scope"] == _ACCOUNT_TARGET + assert ev["principal_object_id"] == _PRINCIPAL_OID + assert ev["required_role"] == "Cognitive Services OpenAI User" + assert ev["required_role_id"] == check.COGNITIVE_SERVICES_OPENAI_USER_ROLE_ID + assert ev["granted_role_definition_ids"] == [ + "acdd72a7-3385-48ef-bd42-f606fba81ae7" + ] + assert "az role assignment create" in ev["remediation_command"] + + +def test_subscription_id_extracted_from_target() -> None: + assert check._subscription_id_from_target(_ACCOUNT_TARGET) == _SUBSCRIPTION_ID + assert check._subscription_id_from_target(None) is None + assert check._subscription_id_from_target("not-an-arm-id") is None + + +# --------------------------------------------------------------------------- +# Helper module: resolve_signed_in_principal_object_id +# --------------------------------------------------------------------------- + + +def test_resolve_principal_returns_oid_from_token() -> None: + from agentops.agent.checks import _rbac_authorization as helper + + token = MagicMock() + token.token = _fake_jwt({"oid": _PRINCIPAL_OID}) + cred = MagicMock() + cred.get_token.return_value = token + + with patch( + "agentops.agent.sources._credentials.get_shared_credential", + return_value=cred, + ): + assert helper.resolve_signed_in_principal_object_id() == _PRINCIPAL_OID + cred.get_token.assert_called_once_with( + "https://management.azure.com/.default" + ) + + +def test_resolve_principal_raises_when_token_lacks_oid() -> None: + from agentops.agent.checks import _rbac_authorization as helper + + token = MagicMock() + token.token = _fake_jwt({"aud": "x"}) # no oid claim + cred = MagicMock() + cred.get_token.return_value = token + + with patch( + "agentops.agent.sources._credentials.get_shared_credential", + return_value=cred, + ): + with pytest.raises(AuthorizationCheckError, match="oid"): + helper.resolve_signed_in_principal_object_id() + + +def test_resolve_principal_raises_on_credential_failure() -> None: + from agentops.agent.checks import _rbac_authorization as helper + + cred = MagicMock() + cred.get_token.side_effect = RuntimeError("network down") + + with patch( + "agentops.agent.sources._credentials.get_shared_credential", + return_value=cred, + ): + with pytest.raises(AuthorizationCheckError): + helper.resolve_signed_in_principal_object_id() + + +# --------------------------------------------------------------------------- +# Helper module: list_principal_role_definition_ids +# --------------------------------------------------------------------------- + + +def test_list_role_definition_ids_extracts_guid_suffix() -> None: + from agentops.agent.checks import _rbac_authorization as helper + + a1 = MagicMock(role_definition_id=( + f"/subscriptions/{_SUBSCRIPTION_ID}/providers/Microsoft.Authorization/" + "roleDefinitions/" + check.COGNITIVE_SERVICES_OPENAI_USER_ROLE_ID + )) + a2 = MagicMock(role_definition_id=( + f"/subscriptions/{_SUBSCRIPTION_ID}/providers/Microsoft.Authorization/" + "roleDefinitions/acdd72a7-3385-48ef-bd42-f606fba81ae7" + )) + role_assignments = MagicMock() + role_assignments.list_for_scope.return_value = [a1, a2] + client = MagicMock() + client.role_assignments = role_assignments + + with patch( + "agentops.agent.sources._credentials.get_shared_credential", + return_value=MagicMock(), + ), patch( + "azure.mgmt.authorization.AuthorizationManagementClient", + return_value=client, + ): + ids: List[str] = helper.list_principal_role_definition_ids( + subscription_id=_SUBSCRIPTION_ID, + scope=_ACCOUNT_TARGET, + principal_object_id=_PRINCIPAL_OID, + ) + + assert ids == [ + check.COGNITIVE_SERVICES_OPENAI_USER_ROLE_ID, + "acdd72a7-3385-48ef-bd42-f606fba81ae7", + ] + role_assignments.list_for_scope.assert_called_once() + kwargs = role_assignments.list_for_scope.call_args.kwargs + assert kwargs["scope"] == _ACCOUNT_TARGET + assert _PRINCIPAL_OID in kwargs["filter"] + assert "atScopeAndAbove" in kwargs["filter"] + + +def test_list_role_definition_ids_raises_when_sdk_missing() -> None: + """Simulate ``azure-mgmt-authorization`` not installed.""" + from agentops.agent.checks import _rbac_authorization as helper + import sys + + saved = sys.modules.pop("azure.mgmt.authorization", None) + sys.modules["azure.mgmt.authorization"] = None # type: ignore[assignment] + try: + with pytest.raises(AuthorizationCheckError, match="azure-mgmt-authorization"): + helper.list_principal_role_definition_ids( + subscription_id=_SUBSCRIPTION_ID, + scope=_ACCOUNT_TARGET, + principal_object_id=_PRINCIPAL_OID, + ) + finally: + if saved is not None: + sys.modules["azure.mgmt.authorization"] = saved + else: + sys.modules.pop("azure.mgmt.authorization", None)