diff --git a/google/auth/credentials.py b/google/auth/credentials.py index 2c67e0443..51c16d873 100644 --- a/google/auth/credentials.py +++ b/google/auth/credentials.py @@ -17,7 +17,9 @@ import abc from enum import Enum +import json import os +import typing from google.auth import _helpers, environment_vars from google.auth import exceptions @@ -26,6 +28,8 @@ from google.auth._refresh_worker import RefreshThreadManager DEFAULT_UNIVERSE_DOMAIN = "googleapis.com" +NO_OP_TRUST_BOUNDARY_LOCATIONS: "typing.Tuple[str]" = () +NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS = "0x0" class Credentials(_BaseCredentials): @@ -178,22 +182,7 @@ def apply(self, headers, token=None): token (Optional[str]): If specified, overrides the current access token. """ - self._apply(headers, token=token) - """Trust boundary value will be a cached value from global lookup. - - The response of trust boundary will be a list of regions and a hex - encoded representation. - - An example of global lookup response: - { - "locations": [ - "us-central1", "us-east1", "europe-west1", "asia-east1" - ] - "encoded_locations": "0xA30" - } - """ - if self._trust_boundary is not None: - headers["x-allowed-locations"] = self._trust_boundary["encoded_locations"] + self._apply(headers, token) if self.quota_project_id: headers["x-goog-user-project"] = self.quota_project_id @@ -299,6 +288,100 @@ def with_universe_domain(self, universe_domain): ) +class CredentialsWithTrustBoundary(Credentials): + """Abstract base for credentials supporting ``with_trust_boundary`` factory""" + + def with_trust_boundary(self, trust_boundary): + """Returns a copy of these credentials with a modified trust boundary. + + Args: + trust_boundary Mapping[str, str]: The trust boundary to use for the + credential. This should be a map with a "locations" key that maps to + a list of GCP regions, and a "encodedLocations" key that maps to a + hex string. + + Returns: + google.auth.credentials.Credentials: A new credentials instance. + """ + raise NotImplementedError("This credential does not support trust boundaries.") + + def apply(self, headers, token=None): + """Apply the token to the authentication header.""" + super().apply(headers, token) + if self._trust_boundary is not None: + headers["x-allowed-locations"] = self._trust_boundary["encodedLocations"] + + def _refresh_trust_boundary(self, request): + """Triggers a refresh of the trust boundary and updates the cache if necessary. + + Args: + request (google.auth.transport.Request): The object used to make + HTTP requests. + + Raises: + google.auth.exceptions.RefreshError: If the trust boundary could + not be refreshed and no cached value is available. + """ + # Do not trigger refresh if credential has a cached no-op trust boundary. + if self._has_no_op_trust_boundary(): + return + new_trust_boundary = {} + try: + new_trust_boundary = self._lookup_trust_boundary(request) + except exceptions.RefreshError as error: + # If the call to the lookup API failed, check if there is a trust boundary + # already cached. If there is, do nothing. If not, then throw the error. + if self._trust_boundary is None: + raise (error) + return + else: + self._trust_boundary = new_trust_boundary + + @abc.abstractmethod + def _lookup_trust_boundary(self, request): + """Calls the trust boundary lookup API to refresh the trust boundary cache. + + Args: + request (google.auth.transport.Request): The object used to make + HTTP requests. + + Returns: + trust_boundary (dict): The trust boundary object returned by the lookup API. + + Raises: + google.auth.exceptions.RefreshError: If the trust boundary could not be + retrieved. + """ + # pylint: disable=missing-raises-doc + # (pylint doesn't recognize that this is abstract) + raise NotImplementedError("_lookup_trust_boundary must be implemented") + + @staticmethod + def _parse_trust_boundary(trust_boundary_string: str): + try: + trust_boundary = json.loads(trust_boundary_string) + if ( + "locations" not in trust_boundary + or "encodedLocations" not in trust_boundary + ): + raise exceptions.MalformedError + return trust_boundary + except Exception: + raise exceptions.MalformedError( + "Cannot parse trust boundary {}".format(trust_boundary_string) + ) + + def _has_no_op_trust_boundary(self): + if ( + self._trust_boundary != None + and self._trust_boundary["locations"] == NO_OP_TRUST_BOUNDARY_LOCATIONS + and self._trust_boundary["encodedLocations"] + == NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS + ): + return True + return False + + class AnonymousCredentials(Credentials): """Credentials that do not provide any authentication information. diff --git a/google/auth/external_account.py b/google/auth/external_account.py index 161e6c50c..737211956 100644 --- a/google/auth/external_account.py +++ b/google/auth/external_account.py @@ -81,6 +81,7 @@ class Credentials( credentials.Scoped, credentials.CredentialsWithQuotaProject, credentials.CredentialsWithTokenUri, + credentials.CredentialsWithTrustBoundary, metaclass=abc.ABCMeta, ): """Base class for all external account credentials. @@ -133,14 +134,14 @@ def __init__( authorization grant. default_scopes (Optional[Sequence[str]]): Default scopes passed by a Google client library. Use 'scopes' for user-defined scopes. - workforce_pool_user_project (Optona[str]): The optional workforce pool user + workforce_pool_user_project (Optonal[str]): The optional workforce pool user project number when the credential corresponds to a workforce pool and not a workload identity pool. The underlying principal must still have serviceusage.services.use IAM permission to use the project for billing/quota. universe_domain (str): The universe domain. The default universe domain is googleapis.com. - trust_boundary (str): String representation of trust boundary meta. + trust_boundary (str): String representation of trust boundary metadata. Raises: google.auth.exceptions.RefreshError: If the generateAccessToken endpoint returned an error. @@ -167,9 +168,9 @@ def __init__( self._default_scopes = default_scopes self._workforce_pool_user_project = workforce_pool_user_project self._trust_boundary = { - "locations": [], - "encoded_locations": "0x0", - } # expose a placeholder trust boundary value. + "locations": credentials.NO_OP_TRUST_BOUNDARY_LOCATIONS, + "encodedLocations": credentials.NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS, + } # Sets a no-op trust boundary value. if self._client_id: self._client_auth = utils.ClientAuthentication( @@ -456,6 +457,12 @@ def refresh(self, request): self.expiry = now + lifetime + def _lookup_trust_boundary(self, request): + """Trust boundary lookup for external account. Currently a no-op because the lookup + endpoint does not support external account lookup. + """ + return + def _make_copy(self): kwargs = self._constructor_args() new_cred = self.__class__(**kwargs) diff --git a/google/auth/impersonated_credentials.py b/google/auth/impersonated_credentials.py index ed7e3f00b..ebbb7a48e 100644 --- a/google/auth/impersonated_credentials.py +++ b/google/auth/impersonated_credentials.py @@ -46,6 +46,9 @@ _DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds _GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token" +_TRUST_BOUNDARY_LOOKUP_ENDPOINT = ( + "https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations" +) def _make_iam_token_request( @@ -111,7 +114,10 @@ def _make_iam_token_request( class Credentials( - credentials.Scoped, credentials.CredentialsWithQuotaProject, credentials.Signing + credentials.Scoped, + credentials.CredentialsWithQuotaProject, + credentials.Signing, + credentials.CredentialsWithTrustBoundary, ): """This module defines impersonated credentials which are essentially impersonated identities. @@ -184,6 +190,7 @@ def __init__( lifetime=_DEFAULT_TOKEN_LIFETIME_SECS, quota_project_id=None, iam_endpoint_override=None, + trust_boundary=None, ): """ Args: @@ -214,6 +221,7 @@ def __init__( subject (Optional[str]): sub field of a JWT. This field should only be set if you wish to impersonate as a user. This feature is useful when using domain wide delegation. + trust_boundary (Mapping[str,str]): A credential trust boundary. """ super(Credentials, self).__init__() @@ -245,6 +253,7 @@ def __init__( self._quota_project_id = quota_project_id self._iam_endpoint_override = iam_endpoint_override self._cred_file_path = None + self._trust_boundary = trust_boundary def _metric_header_for_usage(self): return metrics.CRED_TYPE_SA_IMPERSONATE @@ -252,6 +261,7 @@ def _metric_header_for_usage(self): @_helpers.copy_docstring(credentials.Credentials) def refresh(self, request): self._update_token(request) + self._refresh_trust_boundary(request) def _update_token(self, request): """Updates credentials with a new access_token representing @@ -325,6 +335,19 @@ def _update_token(self, request): iam_endpoint_override=self._iam_endpoint_override, ) + def _lookup_trust_boundary(self, request): + """Trust boundary lookup for service account using endpoint: + iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{service_account_email}/allowedLocations + And we are using a fresh access token as basic auth. + """ + # Skip trust boundary flow for non-gdu universe domain. + if self.universe_domain == credentials.DEFAULT_UNIVERSE_DOMAIN: + return + url = _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( + self.universe_domain, self.service_account_email + ) + return _client.lookup_trust_boundary(request, url, self.token) + def sign_bytes(self, message): from google.auth.transport.requests import AuthorizedSession diff --git a/google/oauth2/_client.py b/google/oauth2/_client.py index 5a9fc3503..d6cc9a53f 100644 --- a/google/oauth2/_client.py +++ b/google/oauth2/_client.py @@ -506,3 +506,145 @@ def refresh_grant( request, token_uri, body, can_retry=can_retry ) return _handle_refresh_grant_response(response_data, refresh_token) + + +def lookup_trust_boundary(request, url, access_token): + """ Implements the global lookup of a credential trust boundary. + For the lookup, we send a request to the global lookup endpoint and then + parse the response. Service account credentials, workload identity + pools and workforce pools implementation may have trust boundaries configured. + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. + url (str): The trust boundary lookup url. + access_token (Optional(str)): The access token needed to make the request + headers (Optional[Mapping[str, str]]): The headers for the request. + kwargs: Additional arguments passed on to the request method. The + kwargs will be passed to `requests.request` method, see: + https://docs.python-requests.org/en/latest/api/#requests.request. + For example, you can use `cert=("cert_pem_path", "key_pem_path")` + to set up client side SSL certificate, and use + `verify="ca_bundle_path"` to set up the CA certificates for sever + side SSL certificate verification. + Returns: + Mapping[str,list|str]: A dictionary containing + "locations" as a list of allowed locations as strings and + "encodedLocations" as a hex string. + e.g: + { + "locations": [ + "us-central1", "us-east1", "europe-west1", "asia-east1" + ], + "encodedLocations": "0xA30" + } + If the credential is not set up with explicit trust boundaries, a trust boundary + of "all" will be returned as a default response. + { + "locations": [], + "encodedLocations": "0x0" + } + Raises: + exceptions.RefreshError: If the response status code is not 200. + exceptions.MalformedError: If the response is not in a valid format. + """ + + response_data = _lookup_trust_boundary_request(request, url, access_token, True) + if "locations" not in response_data or "encodedLocations" not in response_data: + raise exceptions.MalformedError( + "Invalid trust boundary info: {}".format(response_data) + ) + return response_data + + +def _lookup_trust_boundary_request( + request, url, access_token, can_retry=True, **kwargs +): + """Makes a request to the trust boundary lookup endpoint. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. + url (str): The trust boundary lookup url. + access_token (Optional(str)): The access token needed to make the request + can_retry (bool): Enable or disable request retry behavior. Defaults to true. + kwargs: Additional arguments passed on to the request method. The + kwargs will be passed to `requests.request` method, see: + https://docs.python-requests.org/en/latest/api/#requests.request. + For example, you can use `cert=("cert_pem_path", "key_pem_path")` + to set up client side SSL certificate, and use + `verify="ca_bundle_path"` to set up the CA certificates for sever + side SSL certificate verification. + + Returns: + Mapping[str, str]: The JSON-decoded response data. + + Raises: + google.auth.exceptions.RefreshError: If the token endpoint returned + an error. + """ + response_status_ok, response_data, retryable_error = _lookup_trust_boundary_request_no_throw( + request, url, access_token=access_token, can_retry=can_retry, **kwargs + ) + if not response_status_ok: + _handle_error_response(response_data, retryable_error) + return response_data + + +def _lookup_trust_boundary_request_no_throw( + request, url, access_token=None, can_retry=True, **kwargs +): + """Makes a request to the trust boundary lookup endpoint. This + function doesn't throw on response errors. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. + url (str): The trust boundary lookup url. + access_token (Optional(str)): The access token needed to make the request + can_retry (bool): Enable or disable request retry behavior. Defaults to true. + kwargs: Additional arguments passed on to the request method. The + kwargs will be passed to `requests.request` method, see: + https://docs.python-requests.org/en/latest/api/#requests.request. + For example, you can use `cert=("cert_pem_path", "key_pem_path")` + to set up client side SSL certificate, and use + `verify="ca_bundle_path"` to set up the CA certificates for sever + side SSL certificate verification. + + Returns: + Tuple(bool, Mapping[str, str], Optional[bool]): A boolean indicating + if the request is successful, a mapping for the JSON-decoded response + data and in the case of an error a boolean indicating if the error + is retryable. + """ + + headers_to_use = {"Authorization", "Bearer {}".format(access_token)} + + response_data = {} + retryable_error = False + + retries = _exponential_backoff.ExponentialBackoff() + for _ in retries: + response = request(method="GET", url=url, headers=headers_to_use, **kwargs) + response_body = ( + response.data.decode("utf-8") + if hasattr(response.data, "decode") + else response.data + ) + + try: + # response_body should be a JSON + response_data = json.loads(response_body) + except ValueError: + response_data = response_body + + if response.status == http_client.OK: + return True, response_data, None + + retryable_error = _can_retry( + status_code=response.status, response_data=response_data + ) + + if not can_retry or not retryable_error: + return False, response_data, retryable_error + + return False, response_data, retryable_error diff --git a/google/oauth2/service_account.py b/google/oauth2/service_account.py index 3e84194ac..442b1d4b2 100644 --- a/google/oauth2/service_account.py +++ b/google/oauth2/service_account.py @@ -84,6 +84,9 @@ _DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds _GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token" +_TRUST_BOUNDARY_LOOKUP_ENDPOINT = ( + "https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations" +) class Credentials( @@ -91,6 +94,7 @@ class Credentials( credentials.Scoped, credentials.CredentialsWithQuotaProject, credentials.CredentialsWithTokenUri, + credentials.CredentialsWithTrustBoundary, ): """Service account credentials @@ -164,7 +168,7 @@ def __init__( universe_domain (str): The universe domain. The default universe domain is googleapis.com. For default value self signed jwt is used for token refresh. - trust_boundary (str): String representation of trust boundary meta. + trust_boundary (Mapping[str,str]): A credential trust boundary. .. note:: Typically one of the helper constructors :meth:`from_service_account_file` or @@ -194,7 +198,7 @@ def __init__( self._additional_claims = additional_claims else: self._additional_claims = {} - self._trust_boundary = {"locations": [], "encoded_locations": "0x0"} + self._trust_boundary = trust_boundary @classmethod def _from_signer_and_info(cls, signer, info, **kwargs): @@ -294,6 +298,7 @@ def _make_copy(self): additional_claims=self._additional_claims.copy(), always_use_jwt_access=self._always_use_jwt_access, universe_domain=self._universe_domain, + trust_boundary=self._trust_boundary, ) cred._cred_file_path = self._cred_file_path return cred @@ -450,6 +455,7 @@ def refresh(self, request): ) self.token = access_token self.expiry = expiry + self._refresh_trust_boundary(request) def _create_self_signed_jwt(self, audience): """Create a self-signed JWT from the credentials if requirements are met. @@ -491,6 +497,19 @@ def _create_self_signed_jwt(self, audience): self, audience ) + def _lookup_trust_boundary(self, request): + """Trust boundary lookup for service account using endpoint: + iamcredentials.{universe_domain}/v1/projects/-/serviceAccounts/{service_account_email}/allowedLocations + And we are using a fresh access token as basic auth. + """ + # Skip trust boundary flow for non-gdu universe domain. + if self._universe_domain == credentials.DEFAULT_UNIVERSE_DOMAIN: + return + url = _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( + self._universe_domain, self._service_account_email + ) + return _client.lookup_trust_boundary(request, url, self.token) + @_helpers.copy_docstring(credentials.Signing) def sign_bytes(self, message): return self._signer.sign(message) diff --git a/tests/oauth2/test__client.py b/tests/oauth2/test__client.py index 6a085729f..21b6d6deb 100644 --- a/tests/oauth2/test__client.py +++ b/tests/oauth2/test__client.py @@ -630,3 +630,130 @@ def test__token_endpoint_request_no_throw_with_retry(can_retry): assert mock_request.call_count == 3 else: assert mock_request.call_count == 1 + + +def test_lookup_trust_boundary(): + response_data = { + "locations": ["us-central1", "us-east1"], + "encodedLocations": "0x80080000000000", + } + + mock_response = mock.create_autospec(transport.Response, instance=True) + mock_response.status = http_client.OK + mock_response.data = json.dumps(response_data).encode("utf-8") + + mock_request = mock.create_autospec(transport.Request) + mock_request.return_value = mock_response + + response = _client.lookup_trust_boundary(mock_request, mock.Mock(), mock.Mock()) + + assert response["encodedLocations"] == "0x80080000000000" + assert response["locations"] == ["us-central1", "us-east1"] + + +def test_lookup_trust_boundary_no_op_response(): + response_data = {"locations": [], "encodedLocations": "0x0"} + + mock_response = mock.create_autospec(transport.Response, instance=True) + mock_response.status = http_client.OK + mock_response.data = json.dumps(response_data).encode("utf-8") + + mock_request = mock.create_autospec(transport.Request) + mock_request.return_value = mock_response + + response = _client.lookup_trust_boundary(mock_request, mock.Mock(), mock.Mock()) + + assert response["encodedLocations"] == "0x0" + assert response["locations"] == [] + + +def test_lookup_trust_boundary_error(): + mock_response = mock.create_autospec(transport.Response, instance=True) + mock_response.status = http_client.INTERNAL_SERVER_ERROR + mock_response.data = "this is an error message" + + mock_request = mock.create_autospec(transport.Request) + mock_request.return_value = mock_response + + with pytest.raises(exceptions.RefreshError) as excinfo: + _client.lookup_trust_boundary(mock_request, mock.Mock(), mock.Mock()) + assert excinfo.match("this is an error message") + + +def test_lookup_trust_boundary_missing_location(): + response_data = {"bad_field": [], "encodedLocations": "0x0"} + + mock_response = mock.create_autospec(transport.Response, instance=True) + mock_response.status = http_client.OK + mock_response.data = json.dumps(response_data).encode("utf-8") + + mock_request = mock.create_autospec(transport.Request) + mock_request.return_value = mock_response + + with pytest.raises(exceptions.MalformedError) as excinfo: + _client.lookup_trust_boundary(mock_request, mock.Mock(), mock.Mock()) + assert excinfo.match("Invalid trust boundary info") + + +def test_lookup_trust_boundary_missing_encoded_locations(): + response_data = {"locations": [], "bad_field": "0x0"} + + mock_response = mock.create_autospec(transport.Response, instance=True) + mock_response.status = http_client.OK + mock_response.data = json.dumps(response_data).encode("utf-8") + + mock_request = mock.create_autospec(transport.Request) + mock_request.return_value = mock_response + + with pytest.raises(exceptions.MalformedError) as excinfo: + _client.lookup_trust_boundary(mock_request, mock.Mock(), mock.Mock()) + assert excinfo.match("Invalid trust boundary info") + + +def test_lookup_trust_boundary_internal_failure_and_retry_failure_error(): + retryable_error = mock.create_autospec(transport.Response, instance=True) + retryable_error.status = http_client.BAD_REQUEST + retryable_error.data = json.dumps({"error_description": "internal_failure"}).encode( + "utf-8" + ) + + unretryable_error = mock.create_autospec(transport.Response, instance=True) + unretryable_error.status = http_client.BAD_REQUEST + unretryable_error.data = json.dumps({"error_description": "invalid_scope"}).encode( + "utf-8" + ) + + request = mock.create_autospec(transport.Request) + + request.side_effect = [retryable_error, retryable_error, unretryable_error] + + with pytest.raises(exceptions.RefreshError): + _client._lookup_trust_boundary_request( + request, "http://example.com", mock.Mock() + ) + # request should be called three times. Two retryable errors and one + # unretryable error to break the retry loop. + assert request.call_count == 3 + + +def test_lookup_trust_boundary_internal_failure_and_retry_succeeds(): + retryable_error = mock.create_autospec(transport.Response, instance=True) + retryable_error.status = http_client.BAD_REQUEST + retryable_error.data = json.dumps({"error_description": "internal_failure"}).encode( + "utf-8" + ) + + response_data = {"locations": [], "encodedLocations": "0x0"} + response = mock.create_autospec(transport.Response, instance=True) + response.status = http_client.OK + response.data = json.dumps(response_data).encode("utf-8") + + request = mock.create_autospec(transport.Request) + + request.side_effect = [retryable_error, response] + + _ = _client._lookup_trust_boundary_request( + request, "http://example.com", mock.Mock() + ) + + assert request.call_count == 2 diff --git a/tests/oauth2/test_service_account.py b/tests/oauth2/test_service_account.py index 91a7d93e0..5ebdb93db 100644 --- a/tests/oauth2/test_service_account.py +++ b/tests/oauth2/test_service_account.py @@ -25,6 +25,7 @@ from google.auth import iam from google.auth import jwt from google.auth import transport +from google.auth import credentials from google.auth.credentials import DEFAULT_UNIVERSE_DOMAIN from google.oauth2 import service_account @@ -58,14 +59,23 @@ class TestCredentials(object): SERVICE_ACCOUNT_EMAIL = "service-account@example.com" TOKEN_URI = "https://example.com/oauth2/token" + NO_OP_TRUST_BOUNDARY = { + "locations": credentials.NO_OP_TRUST_BOUNDARY_LOCATIONS, + "encodedLocations": credentials.NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS, + } @classmethod - def make_credentials(cls, universe_domain=DEFAULT_UNIVERSE_DOMAIN): + def make_credentials( + cls, + universe_domain=DEFAULT_UNIVERSE_DOMAIN, + trust_boundary=NO_OP_TRUST_BOUNDARY, + ): return service_account.Credentials( SIGNER, cls.SERVICE_ACCOUNT_EMAIL, cls.TOKEN_URI, universe_domain=universe_domain, + trust_boundary=trust_boundary, ) def test_get_cred_info(self): diff --git a/tests/test_credentials.py b/tests/test_credentials.py index e11bcb4e5..7b45a71b3 100644 --- a/tests/test_credentials.py +++ b/tests/test_credentials.py @@ -113,7 +113,7 @@ def test_before_request(): def test_before_request_with_trust_boundary(): DUMMY_BOUNDARY = "0xA30" credentials = CredentialsImpl() - credentials._trust_boundary = {"locations": [], "encoded_locations": DUMMY_BOUNDARY} + credentials._trust_boundary = {"locations": [], "encodedLocations": DUMMY_BOUNDARY} request = "token" headers = {} diff --git a/tests/test_impersonated_credentials.py b/tests/test_impersonated_credentials.py index 8f6b22670..ff3330719 100644 --- a/tests/test_impersonated_credentials.py +++ b/tests/test_impersonated_credentials.py @@ -25,6 +25,7 @@ from google.auth import exceptions from google.auth import impersonated_credentials from google.auth import transport +from google.auth import credentials as auth_credentials from google.auth.impersonated_credentials import Credentials from google.oauth2 import credentials from google.oauth2 import service_account @@ -120,8 +121,12 @@ class TestImpersonatedCredentials(object): # Because Python 2.7: DELEGATES = [] # type: ignore LIFETIME = 3600 + NO_OP_TRUST_BOUNDARY = { + "locations": auth_credentials.NO_OP_TRUST_BOUNDARY_LOCATIONS, + "encodedLocations": auth_credentials.NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS, + } SOURCE_CREDENTIALS = service_account.Credentials( - SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI + SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI, trust_boundary=NO_OP_TRUST_BOUNDARY ) USER_SOURCE_CREDENTIALS = credentials.Credentials(token="ABCDE") IAM_ENDPOINT_OVERRIDE = ( @@ -136,6 +141,7 @@ def make_credentials( target_principal=TARGET_PRINCIPAL, subject=None, iam_endpoint_override=None, + trust_boundary=NO_OP_TRUST_BOUNDARY, ): return Credentials( @@ -146,6 +152,7 @@ def make_credentials( lifetime=lifetime, subject=subject, iam_endpoint_override=iam_endpoint_override, + trust_boundary=trust_boundary, ) def test_get_cred_info(self):