Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- **Phase 3 Step 3 -- Real-Time Alerting MVP (Webhook + Slack)**
- Async fail-open alert dispatcher with queue, retry/backoff, and sink delivery metrics
- Alert event emission for `injection_blocked`, `rate_limit_exceeded`, and `scan_error`
- Generic webhook sink with optional Bearer token and Slack Incoming Webhook sink
- Alerting config/env surface: `alerting.*` and `PIF_ALERTING_*`

## [1.2.0] - 2026-03-07

### Added
Expand Down
38 changes: 37 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ PIF addresses this critical gap by providing a **transparent, low-latency detect
- **Health check endpoint** (`/healthz`)
- **Prometheus metrics endpoint** (`/metrics`)
- **Embedded monitoring dashboard + custom rule management** (`/dashboard`, optional)
- **Real-time alerting (Webhook + Slack)** with async fail-open delivery
- **golangci-lint** and race-condition-tested CI

</td>
Expand Down Expand Up @@ -527,6 +528,34 @@ dashboard:
# and dashboard.auth.enabled=true.
# - Built-in rule files remain read-only; dashboard mutates only managed custom rules.

# Real-time alerting (optional)
alerting:
enabled: false
queue_size: 1024
events:
block: true
rate_limit: true
scan_error: true
throttle:
window_seconds: 60 # Aggregate rate-limit and scan-error alerts per client/window
webhook:
enabled: false
url: "" # Generic webhook endpoint
timeout: "3s"
max_retries: 3
backoff_initial_ms: 200
auth_bearer_token: "" # Optional outbound bearer token
slack:
enabled: false
incoming_webhook_url: "" # Slack Incoming Webhook URL
timeout: "3s"
max_retries: 3
backoff_initial_ms: 200

# Note:
# - Alert delivery is async and fail-open: request path is never blocked by sink failures.
# - Initial event scope: block, rate-limit, and scan-error.

# Rule file paths
rules:
paths:
Expand Down Expand Up @@ -562,6 +591,12 @@ PIF_DASHBOARD_AUTH_ENABLED=true
PIF_DASHBOARD_AUTH_USERNAME=ops
PIF_DASHBOARD_AUTH_PASSWORD=change-me
PIF_DASHBOARD_RULE_MANAGEMENT_ENABLED=true
PIF_ALERTING_ENABLED=true
PIF_ALERTING_WEBHOOK_ENABLED=true
PIF_ALERTING_WEBHOOK_URL=https://alerts.example.com/pif
PIF_ALERTING_WEBHOOK_AUTH_BEARER_TOKEN=replace-me
PIF_ALERTING_SLACK_ENABLED=true
PIF_ALERTING_SLACK_INCOMING_WEBHOOK_URL=https://hooks.slack.com/services/T000/B000/XXX
PIF_LOGGING_LEVEL=debug
```

Expand Down Expand Up @@ -704,7 +739,8 @@ Automated quality gates on every push and pull request:

- [x] Web-based read-only dashboard UI for monitoring (MVP)
- [x] Dashboard rule management (write/edit workflows)
- [ ] Real-time alerting (Slack, PagerDuty, webhooks)
- [x] Real-time alerting: Webhook + Slack (MVP)
- [ ] Real-time alerting: PagerDuty sink
- [ ] Multi-tenant support with per-tenant policies
- [ ] Attack replay and forensic analysis tools
- [ ] Community rule marketplace
Expand Down
23 changes: 23 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,29 @@ dashboard:
rule_management:
enabled: false

alerting:
enabled: false
queue_size: 1024
events:
block: true
rate_limit: true
scan_error: true
throttle:
window_seconds: 60
webhook:
enabled: false
url: ""
timeout: "3s"
max_retries: 3
backoff_initial_ms: 200
auth_bearer_token: ""
slack:
enabled: false
incoming_webhook_url: ""
timeout: "3s"
max_retries: 3
backoff_initial_ms: 200

webhook:
listen: ":8443"
tls_cert_file: "/etc/pif/webhook/tls.crt"
Expand Down
58 changes: 58 additions & 0 deletions docs/API_REFERENCE.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,64 @@ Core metric names:
- `pif_injection_detections_total`
- `pif_detection_score`
- `pif_rate_limit_events_total`
- `pif_alert_events_total`
- `pif_alert_sink_deliveries_total`

### Outbound Alerting (Optional)

When `alerting.enabled=true`, PIF emits outbound alerts without exposing new inbound HTTP endpoints.

Initial event types:

- `injection_blocked` (immediate on block action)
- `rate_limit_exceeded` (window-aggregated per client key)
- `scan_error` (window-aggregated per client key)

Delivery model:

- Async queue + worker dispatcher
- Retry with exponential backoff and jitter
- Fail-open behavior (delivery failure never blocks proxy request handling)
- Sink execution order is sequential (`webhook` then `slack` when both are enabled)

Supported sinks:

- Generic webhook (`alerting.webhook.*`)
- Slack Incoming Webhook (`alerting.slack.*`)

Generic webhook sends JSON payloads with the following contract:

```json
{
"event_id": "evt-1741363854757000000-1",
"timestamp": "2026-03-07T12:30:54Z",
"event_type": "injection_blocked",
"action": "block",
"client_key": "203.0.113.10",
"method": "POST",
"path": "/v1/chat/completions",
"target": "https://api.openai.com",
"score": 0.92,
"threshold": 0.50,
"findings_count": 2,
"reason": "blocked_by_policy",
"sample_findings": [
{
"rule_id": "PIF-INJ-001",
"category": "prompt_injection",
"severity": 4,
"match": "ignore all previous instructions"
}
],
"aggregate_count": 1
}
```

Notes:

- `sample_findings` is capped at 3 entries.
- `aggregate_count` is used by aggregated events (`rate_limit_exceeded`, `scan_error`).
- When configured, webhook sink sends `Authorization: Bearer <token>`.

### Embedded Dashboard (Optional)

Expand Down
46 changes: 46 additions & 0 deletions internal/cli/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func runProxy(cmd *cobra.Command, args []string) error {
if err != nil {
return fmt.Errorf("parsing proxy.write_timeout: %w", err)
}
alertingOptions, err := parseAlertingOptions(cfg)
if err != nil {
return err
}

detectorFactory := buildProxyDetectorFactory(cfg, modelPath)
ruleManager, err := proxy.NewRuntimeRuleManager(proxy.RuntimeRuleManagerOptions{
Expand Down Expand Up @@ -146,6 +150,7 @@ func runProxy(cmd *cobra.Command, args []string) error {
},
RuleInventory: ruleSnapshot.RuleSets,
RuleManager: ruleManager,
Alerting: alertingOptions,
}, ruleManager.Detector())
}

Expand Down Expand Up @@ -202,3 +207,44 @@ func buildProxyDetectorFactory(cfg *config.Config, modelPath string) proxy.Detec
return ensemble, nil
}
}

func parseAlertingOptions(cfg *config.Config) (proxy.AlertingOptions, error) {
webhookTimeout, err := time.ParseDuration(cfg.Alerting.Webhook.Timeout)
if err != nil {
return proxy.AlertingOptions{}, fmt.Errorf("parsing alerting.webhook.timeout: %w", err)
}
slackTimeout, err := time.ParseDuration(cfg.Alerting.Slack.Timeout)
if err != nil {
return proxy.AlertingOptions{}, fmt.Errorf("parsing alerting.slack.timeout: %w", err)
}
throttleWindow := time.Duration(cfg.Alerting.Throttle.WindowSeconds) * time.Second
if throttleWindow <= 0 {
throttleWindow = 60 * time.Second
}

return proxy.AlertingOptions{
Enabled: cfg.Alerting.Enabled,
QueueSize: cfg.Alerting.QueueSize,
Events: proxy.AlertingEventOptions{
Block: cfg.Alerting.Events.Block,
RateLimit: cfg.Alerting.Events.RateLimit,
ScanError: cfg.Alerting.Events.ScanError,
},
ThrottleWindow: throttleWindow,
Webhook: proxy.AlertingSinkOptions{
Enabled: cfg.Alerting.Webhook.Enabled,
URL: cfg.Alerting.Webhook.URL,
Timeout: webhookTimeout,
MaxRetries: cfg.Alerting.Webhook.MaxRetries,
BackoffInitial: time.Duration(cfg.Alerting.Webhook.BackoffInitialMs) * time.Millisecond,
AuthBearerToken: cfg.Alerting.Webhook.AuthBearerToken,
},
Slack: proxy.AlertingSinkOptions{
Enabled: cfg.Alerting.Slack.Enabled,
URL: cfg.Alerting.Slack.IncomingWebhookURL,
Timeout: slackTimeout,
MaxRetries: cfg.Alerting.Slack.MaxRetries,
BackoffInitial: time.Duration(cfg.Alerting.Slack.BackoffInitialMs) * time.Millisecond,
},
}, nil
}
57 changes: 57 additions & 0 deletions internal/cli/proxy_runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,63 @@ proxy:
assert.Contains(t, err.Error(), "parsing proxy.write_timeout")
}

func TestParseAlertingOptions(t *testing.T) {
cfg := config.Default()
cfg.Alerting.Enabled = true
cfg.Alerting.QueueSize = 256
cfg.Alerting.Events.Block = true
cfg.Alerting.Events.RateLimit = true
cfg.Alerting.Events.ScanError = false
cfg.Alerting.Throttle.WindowSeconds = 45
cfg.Alerting.Webhook.Enabled = true
cfg.Alerting.Webhook.URL = "https://example.com/hook"
cfg.Alerting.Webhook.Timeout = "5s"
cfg.Alerting.Webhook.MaxRetries = 4
cfg.Alerting.Webhook.BackoffInitialMs = 150
cfg.Alerting.Webhook.AuthBearerToken = "token"
cfg.Alerting.Slack.Enabled = true
cfg.Alerting.Slack.IncomingWebhookURL = "https://hooks.slack.test/abc"
cfg.Alerting.Slack.Timeout = "4s"
cfg.Alerting.Slack.MaxRetries = 2
cfg.Alerting.Slack.BackoffInitialMs = 300

opts, err := parseAlertingOptions(cfg)
require.NoError(t, err)

assert.True(t, opts.Enabled)
assert.Equal(t, 256, opts.QueueSize)
assert.True(t, opts.Events.Block)
assert.True(t, opts.Events.RateLimit)
assert.False(t, opts.Events.ScanError)
assert.Equal(t, 45*time.Second, opts.ThrottleWindow)
assert.True(t, opts.Webhook.Enabled)
assert.Equal(t, "https://example.com/hook", opts.Webhook.URL)
assert.Equal(t, 5*time.Second, opts.Webhook.Timeout)
assert.Equal(t, 4, opts.Webhook.MaxRetries)
assert.Equal(t, 150*time.Millisecond, opts.Webhook.BackoffInitial)
assert.Equal(t, "token", opts.Webhook.AuthBearerToken)
assert.True(t, opts.Slack.Enabled)
assert.Equal(t, "https://hooks.slack.test/abc", opts.Slack.URL)
assert.Equal(t, 4*time.Second, opts.Slack.Timeout)
assert.Equal(t, 2, opts.Slack.MaxRetries)
assert.Equal(t, 300*time.Millisecond, opts.Slack.BackoffInitial)
}

func TestParseAlertingOptions_InvalidTimeout(t *testing.T) {
cfg := config.Default()
cfg.Alerting.Webhook.Timeout = "bad"

_, err := parseAlertingOptions(cfg)
require.Error(t, err)
assert.Contains(t, err.Error(), "parsing alerting.webhook.timeout")

cfg = config.Default()
cfg.Alerting.Slack.Timeout = "bad"
_, err = parseAlertingOptions(cfg)
require.Error(t, err)
assert.Contains(t, err.Error(), "parsing alerting.slack.timeout")
}

func testContext(t *testing.T) context.Context {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
Expand Down
Loading
Loading