Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,107 @@ name = generator.generate_name()
```

See the [Thread Name Generator Guide](./docs/thread-name-generator.md) for custom implementations.

## Security

AgentFlow CLI provides enterprise-grade security features for production deployments.

### Security Features

- ✅ **Authentication** - Built-in JWT and custom authentication backends
- ✅ **Authorization** - Resource-based access control with extensible backends
- ✅ **Request Limits** - DoS protection with configurable size limits (default 10MB)
- ✅ **Error Sanitization** - Production-safe error messages preventing information disclosure
- ✅ **Log Sanitization** - Automatic redaction of sensitive data (tokens, passwords, secrets)
- ✅ **Security Warnings** - Startup validation for insecure configurations
- ✅ **HTTPS Ready** - SSL/TLS support with secure headers

### Production Security Checklist

Before deploying to production, ensure:

```bash
# Required: Set production mode
MODE=production

# Required: Strong JWT secret (32+ characters)
JWT_SECRET_KEY=<generate-with-secrets.token_urlsafe(32)>

# Required: Disable debug mode
IS_DEBUG=false

# Required: Specific CORS origins (not *)
ORIGINS=https://yourdomain.com

# Required: Specific allowed hosts (not *)
ALLOWED_HOST=yourdomain.com

# Recommended: Disable API docs
DOCS_PATH=
REDOCS_PATH=

# Recommended: Configure request size limit
MAX_REQUEST_SIZE=10485760 # 10MB default
```

### Quick Security Setup

**1. Enable JWT Authentication:**
```json
{
"auth": "jwt"
}
```

**2. Implement Authorization:**
```python
# auth/rbac_backend.py
from agentflow_cli.src.app.core.auth.authorization import AuthorizationBackend

class RBACAuthorizationBackend(AuthorizationBackend):
async def authorize(self, user, resource, action, resource_id=None, **context):
role = user.get("role", "viewer")
# Implement your authorization logic
return role == "admin" or (role == "developer" and action == "read")
```

**3. Configure in agentflow.json:**
```json
{
"auth": "jwt",
"authorization": {
"path": "auth.rbac_backend:RBACAuthorizationBackend"
}
}
```

### Security Validation

AgentFlow automatically validates your configuration and warns about security issues:

```
⚠️ SECURITY WARNING: CORS ORIGINS='*' in production.
Set ORIGINS to specific domains.

⚠️ SECURITY WARNING: DEBUG mode enabled in production!
Set IS_DEBUG=false
```

### Comprehensive Security Guide

For detailed security documentation, threat model, best practices, and deployment guidelines, see:

📖 **[SECURITY.md](./SECURITY.md)** - Complete Security Guide

Topics covered:
- Threat model and attack vectors
- Authentication and authorization patterns
- Production deployment checklist
- Docker and Kubernetes security configurations
- Security testing and monitoring
- Incident response procedures
- Vulnerability reporting

## Deployment

See the [Deployment Guide](./docs/deployment.md) for complete deployment instructions.
Expand Down
31 changes: 0 additions & 31 deletions Task.md

This file was deleted.

5 changes: 4 additions & 1 deletion agentflow.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,8 @@
"agent": "graph.react:app",
"thread_name_generator": "graph.thread_name_generator:MyNameGenerator",
"env": ".env",
"auth": null
"auth": {
"path": "graph.custom_auth:CustomAuth",
"method": "custom"
}
}
90 changes: 90 additions & 0 deletions agentflow_cli/src/app/core/auth/authorization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""
Authorization backend system for AgentFlow CLI.

This module provides an authorization interface that developers can implement
to add resource-level access control to their AgentFlow applications.
"""

from abc import ABC, abstractmethod
from typing import Any


class AuthorizationBackend(ABC):
"""
Abstract base class for authorization backends.

Developers should implement this class to define custom authorization logic
for their AgentFlow applications. The authorize method is called before
any resource operation to check if the user has permission.

Example:
class MyAuthorizationBackend(AuthorizationBackend):
async def authorize(self, user, resource, action, resource_id=None, **context):
# Check if user has permission
if user.get("role") == "admin":
return True
# Add custom logic here
return False
"""

@abstractmethod
async def authorize(
self,
user: dict[str, Any],
resource: str,
action: str,
resource_id: str | None = None,
**context: Any,
) -> bool:
"""
Check if user can perform action on resource.

Args:
user: User information dictionary containing at least 'user_id'
resource: Resource type (e.g., 'graph', 'checkpointer', 'store')
action: Action to perform (e.g., 'invoke', 'stream', 'read', 'write', 'delete')
resource_id: Optional specific resource identifier (e.g., thread_id, namespace)
**context: Additional context for authorization decision

Returns:
bool: True if authorized, False otherwise

Raises:
Exception: Can raise exceptions for auth failures or errors
"""


class DefaultAuthorizationBackend(AuthorizationBackend):
"""
Default authorization backend that allows all authenticated users.

This implementation performs basic authentication check (user has user_id)
but allows all operations. Use this as a starting point or for development.

For production use, implement a custom AuthorizationBackend with proper
access control logic based on your application's requirements.
"""

async def authorize(
self,
user: dict[str, Any],
resource: str,
action: str,
resource_id: str | None = None,
**context: Any,
) -> bool:
"""
Allow all authenticated users to perform any action.

Args:
user: User information dictionary
resource: Resource type (not used in default implementation)
action: Action to perform (not used in default implementation)
resource_id: Optional resource identifier (not used in default implementation)
**context: Additional context (not used in default implementation)

Returns:
bool: True if user has 'user_id', False otherwise
"""
# Only check if user is authenticated (has user_id)
return bool(user.get("user_id"))
150 changes: 150 additions & 0 deletions agentflow_cli/src/app/core/auth/permissions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
"""
Unified authentication and authorization dependency for FastAPI endpoints.

This module provides a reusable dependency that combines authentication and
authorization checks, reducing code duplication across routers.
"""

from collections.abc import Callable
from typing import Any

from fastapi import Depends, HTTPException, Request, Response
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from injectq.integrations import InjectAPI

from agentflow_cli.src.app.core import logger
from agentflow_cli.src.app.core.auth.auth_backend import BaseAuth
from agentflow_cli.src.app.core.auth.authorization import AuthorizationBackend
from agentflow_cli.src.app.core.config.graph_config import GraphConfig
from agentflow_cli.src.app.core.utils.log_sanitizer import sanitize_for_logging


class RequirePermission:
"""
FastAPI dependency that combines authentication and authorization.

This class-based dependency verifies user authentication and checks
authorization in a single step, reducing boilerplate code in endpoints.

Usage:
@router.post("/v1/resource")
async def endpoint(
user: dict = Depends(RequirePermission("resource", "action"))
):
# User is authenticated and authorized
pass

Args:
resource: Resource type being accessed (e.g., "graph", "checkpointer", "store")
action: Action being performed (e.g., "invoke", "read", "write", "delete")
extract_resource_id: Optional function to extract resource_id from request
"""

def __init__(
self,
resource: str,
action: str,
extract_resource_id: Callable[[Request], str | None] | None = None,
):
"""
Initialize the permission requirement.

Args:
resource: Resource type (graph, checkpointer, store)
action: Action type (invoke, stream, read, write, delete, etc.)
extract_resource_id: Optional callable that extracts resource_id from request
"""
self.resource = resource
self.action = action
self.extract_resource_id_fn = extract_resource_id

async def __call__(
self,
request: Request,
response: Response,
credential: HTTPAuthorizationCredentials = Depends(
HTTPBearer(auto_error=False),
),
config: GraphConfig = InjectAPI(GraphConfig),
auth_backend: BaseAuth = InjectAPI(BaseAuth),
authz: AuthorizationBackend = InjectAPI(AuthorizationBackend),
) -> dict[str, Any]:
"""
Verify authentication and authorization.

Returns:
dict: User information if authenticated and authorized

Raises:
HTTPException: 403 if authorization fails
"""
# Step 1: Authentication (reusing verify_current_user logic)
user = {}
backend = config.auth_config()
if not backend:
user = {}
elif not auth_backend:
logger.error("Auth backend is not configured")
user = {}
else:
user_result = auth_backend.authenticate(
request,
response,
credential,
)
if user_result and "user_id" not in user_result:
logger.error("Authentication failed: 'user_id' not found in user info")
user = user_result or {}

# Step 2: Extract resource_id if available
resource_id = None
if self.extract_resource_id_fn:
resource_id = self.extract_resource_id_fn(request)
else:
resource_id = self._extract_resource_id_from_path(request)

# Step 3: Authorization
if not await authz.authorize(
user,
self.resource,
self.action,
resource_id=resource_id,
):
logger.warning(
f"Authorization failed for user {user.get('user_id')} "
f"on {self.resource}:{self.action}"
)
raise HTTPException(
status_code=403,
detail=f"Not authorized to {self.action} {self.resource}",
)

# Log successful auth/authz (with sanitized user info)
logger.debug(
f"Auth/Authz success for {self.resource}:{self.action}, "
f"user: {sanitize_for_logging(user)}"
)

return user

def _extract_resource_id_from_path(self, request: Request) -> str | None:
"""
Extract resource ID from request path parameters.

Looks for common patterns like thread_id, memory_id in path params.

Args:
request: FastAPI request object

Returns:
Resource ID as string, or None if not found
"""
# Check path parameters
path_params = request.path_params

# Common resource ID patterns
for param_name in ["thread_id", "memory_id", "namespace"]:
if param_name in path_params:
return str(path_params[param_name])

return None
Loading