Add tool to keep the application token when revoking all the tokens#154
Add tool to keep the application token when revoking all the tokens#154
Conversation
…ens.py to keep the application token used by alert-manager
There was a problem hiding this comment.
Pull request overview
This PR adds a new tool revokeUserTokens.py that safely revokes all user tokens while preserving the application token (specifically the alertmanager token) to prevent service disruptions. It also adds warning messages to the existing revokeAllTokens.py tool to guide users to use the new tool when appropriate.
Changes:
- Added
tools/revokeUserTokens.py- a new tool that retrieves the alertmanager application token, deletes all user tokens, and restores the application token - Updated
tools/revokeAllTokens.pywith warnings about service disruptions and a reference to the new tool for user-only token revocation
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| tools/revokeUserTokens.py | New tool implementing safe token revocation with application token preservation through three steps: retrieve application token, delete all tokens, restore application token |
| tools/revokeAllTokens.py | Added user warnings and confirmation prompt before destructive operations, with guidance to use revokeUserTokens.py for user-only revocations |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| # Licensed under the MIT License. | ||
|
|
||
| # This tool is used to revoke all tokens in the cluster and restore the application token. | ||
| # Usage: python3 revokeAllTokens.py |
There was a problem hiding this comment.
The usage comment references 'revokeAllTokens.py' but the actual filename is 'revokeUserTokens.py'. This should be updated to match the actual filename to avoid confusion.
| # Usage: python3 revokeAllTokens.py | |
| # Usage: python3 revokeUserTokens.py |
| sys.exit(1) | ||
| else: | ||
| print(f"✓ Successfully retrieved alert-manager token") | ||
| print(f"Token (truncated): {alert_token[:20]}...{alert_token[-20:]}") |
There was a problem hiding this comment.
The token string could be empty, and attempting to truncate with slicing could cause an IndexError if the token is shorter than 40 characters (20 from start + 20 from end). Consider adding a check for the token length before truncation or using a safer truncation method that handles short strings gracefully.
| for container in deployment.spec.template.spec.containers: | ||
| if container.name == container_name: | ||
| if container.env: | ||
| for env_var in container.env: | ||
| if env_var.name == "PAI_BEARER_TOKEN": | ||
| if env_var.value: | ||
| return env_var.value | ||
| elif env_var.value_from: | ||
| print(f"PAI_BEARER_TOKEN is from a secret/configmap, not a direct value") | ||
| return None | ||
|
|
||
| print(f"PAI_BEARER_TOKEN not found in container {container_name}") | ||
| return None |
There was a problem hiding this comment.
The function silently returns None when PAI_BEARER_TOKEN is not found in the specified container, but it could also fail to find the container itself. When the container is not found, the function should explicitly inform the user instead of silently returning None with a potentially misleading message. Consider adding an explicit check after the loop to determine if the container was found.
| print("=" * 70) | ||
|
|
||
| alert_namespace = "default" | ||
| alert_deployment ="alertmanager" |
There was a problem hiding this comment.
Missing space after the assignment operator. This is inconsistent with Python PEP 8 style and the spacing conventions used elsewhere in the file (e.g., line 161).
| alert_deployment ="alertmanager" | |
| alert_deployment = "alertmanager" |
| v1 = client.CoreV1Api() | ||
|
|
||
| # Decode JWT to get username | ||
| try: |
There was a problem hiding this comment.
The code uses jwt.decode with verify_signature=False, which is a security risk. While this may be intentional for this specific use case (since you just need to read the username), consider adding a comment explaining why signature verification is disabled. This will help future maintainers understand the security implications.
| try: | |
| try: | |
| # NOTE: Signature verification is intentionally disabled here because: | |
| # - The token is retrieved from a trusted Kubernetes deployment (PAI_BEARER_TOKEN). | |
| # - The token is only used to extract the "username" to derive a Kubernetes Secret name, | |
| # not for authentication or authorization decisions. | |
| # - Avoiding verification means we do not need access to the signing key in this tool. | |
| # Do NOT reuse this pattern for handling untrusted tokens or making security-sensitive decisions. |
| config.load_kube_config() | ||
| apps_v1 = client.AppsV1Api() | ||
|
|
||
| # Get the deployment | ||
| deployment = apps_v1.read_namespaced_deployment(name=deployment_name, namespace=namespace) | ||
|
|
||
| # Find the container and get the PAI_BEARER_TOKEN env var | ||
| for container in deployment.spec.template.spec.containers: | ||
| if container.name == container_name: | ||
| if container.env: | ||
| for env_var in container.env: | ||
| if env_var.name == "PAI_BEARER_TOKEN": | ||
| if env_var.value: | ||
| return env_var.value | ||
| elif env_var.value_from: | ||
| print(f"PAI_BEARER_TOKEN is from a secret/configmap, not a direct value") | ||
| return None | ||
|
|
||
| print(f"PAI_BEARER_TOKEN not found in container {container_name}") | ||
| return None | ||
|
|
||
| except Exception as e: | ||
| print(f"Error retrieving token from deployment: {e}") | ||
| return None | ||
|
|
||
|
|
||
| def delete_all_token_secrets(namespace="pai-user-token"): | ||
| """ | ||
| Delete all secrets in the token namespace. | ||
|
|
||
| Args: | ||
| namespace: Kubernetes namespace (default: pai-user-token) | ||
|
|
||
| Returns: | ||
| Number of secrets deleted, or -1 on error | ||
| """ | ||
| try: | ||
| config.load_kube_config() | ||
| v1 = client.CoreV1Api() | ||
|
|
||
| # List all secrets in the namespace | ||
| secrets = v1.list_namespaced_secret(namespace=namespace) | ||
|
|
||
| deleted_count = 0 | ||
| for secret in secrets.items: | ||
| secret_name = secret.metadata.name | ||
| try: | ||
| v1.delete_namespaced_secret(name=secret_name, namespace=namespace) | ||
| print(f" Deleted secret: {secret_name}") | ||
| deleted_count += 1 | ||
| except Exception as e: | ||
| print(f" Failed to delete secret {secret_name}: {e}") | ||
|
|
||
| return deleted_count | ||
|
|
||
| except Exception as e: | ||
| print(f"Error deleting secrets: {e}") | ||
| return -1 | ||
|
|
||
|
|
||
| def add_token_to_k8s_secret(token_string, namespace="pai-user-token"): | ||
| """ | ||
| Add a token to Kubernetes secret. | ||
|
|
||
| Args: | ||
| token_string: The JWT token to add | ||
| namespace: Kubernetes namespace (default: pai-user-token) | ||
|
|
||
| Returns: | ||
| True if successful, False otherwise | ||
| """ | ||
| try: | ||
| config.load_kube_config() |
There was a problem hiding this comment.
The code calls config.load_kube_config() three times in different functions (lines 27, 64, and 99). This is inefficient and could cause issues if the kubeconfig changes between calls. Consider loading the configuration once at the module level or in the main function and passing the client instances to the functions that need them.
| except Exception as e: | ||
| print(f"Error adding token to K8s secret: {e}") | ||
| return False |
There was a problem hiding this comment.
When creating a secret fails at line 133, the exception is caught and False is returned, but if the secret already exists (which would raise an ApiException with status=409), the function will fail even though the desired state (secret exists) is achieved. Consider checking if the error is due to the secret already existing and handle it appropriately, or at minimum provide more specific error messages to help users diagnose issues.
| alert_deployment ="alertmanager" | ||
| alert_container = "job-status-change-notification" | ||
|
|
||
| alert_token = get_application_token(alert_namespace, alert_deployment, alert_container) |
There was a problem hiding this comment.
The variable name 'alert_token' is used throughout, but the comments and prints refer to it as 'alert-manager token' (with hyphen). For consistency and clarity, consider using 'alertmanager_token' or 'alert_manager_token' to match the deployment name 'alertmanager' used in the code.
| if __name__ == "__main__": | ||
| print("NOTE: This tool will revoke all tokens in the cluster including the application tokens.") | ||
| print("Removing them may cause service disruptions.") | ||
| print("If you try to remove the user releated tokens, please run `revokeUserTokens.py` instead.") |
There was a problem hiding this comment.
Spelling error: "releated" should be "related".
| print("If you try to remove the user releated tokens, please run `revokeUserTokens.py` instead.") | |
| print("If you try to remove the user related tokens, please run `revokeUserTokens.py` instead.") |
| if not alert_token: | ||
| print("\n✗ Failed to retrieve alert-manager token.") | ||
| print("Do you want to continue without it? (The alert-manager token will NOT be restored)") | ||
| continue_choice = input("Type 'yes' to continue: ") | ||
| if continue_choice.lower() != 'yes': | ||
| print("Operation cancelled.") | ||
| sys.exit(1) |
There was a problem hiding this comment.
When the retrieval of the alert-manager token fails (alert_token is None), the user is given the option to continue. However, if they choose to continue, the code will later check 'if alert_token:' at line 193 and skip Step 3 entirely. This means the warning at line 169 is accurate, but it would be clearer to also mention that Step 3 will be skipped entirely in this scenario.
Add a new tool to keep the application token when revoking all the tokens for the user, and all warnings in the original tool.