diff --git a/CLAUDE.md b/CLAUDE.md index e9c7c01..dfe5eea 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -30,6 +30,7 @@ cli/ │ ├── correlation/ # W3C trace_id/span_id generation and per-session persistence │ ├── envelope/ # Raw event envelope types │ ├── git/ # Git context extraction +│ ├── outbound/ # http.RoundTripper that mirrors every outbound request to a local ndjson file (drives `promptconduit watch`) │ ├── sync/ # Transcript sync and parsing (Claude Code parser, state management) │ ├── transcript/ # Transcript parsing and attachment extraction │ └── updater/ # GitHub-release version check + self-replace upgrade diff --git a/README.md b/README.md index 555ae52..82721ca 100644 --- a/README.md +++ b/README.md @@ -183,8 +183,38 @@ promptconduit version # Upgrade to the latest release promptconduit upgrade + +# Tail outbound HTTP traffic in real time (great for debugging hooks) +promptconduit watch +promptconduit watch --verbose # also pretty-print request/response bodies +promptconduit watch --lines 20 # backfill the last 20 entries before going live ``` +### Tail outbound traffic + +`promptconduit watch` streams every HTTP request the CLI makes to the +platform — hook envelope sends, transcript syncs, insights queries, +skills traffic — to your terminal as it happens. Useful for answering +"is my hook actually sending anything when Claude Code fires?" without +spelunking the debug log. + +Each request appears as a one-line summary by default: + +``` +15:30:42 POST /v1/events/raw 3.2KB → 200 (87ms) +``` + +Add `--verbose` to also see the pretty-printed JSON body underneath +each summary. + +Implementation note: every request the CLI makes is mirrored to +`~/.config/promptconduit/outbound.ndjson` (mode `0600`). Authorization, +Cookie, and similar credential headers are redacted before write; +bodies are capped at 64KB per row and the file rotates to +`outbound.ndjson.1` when it crosses 50MB. 
Update-check traffic from +`internal/updater` uses a separate HTTP client and is **not** mirrored +— that traffic is predictable and noisy and isn't what `watch` is for. + ### Sync Command The `sync` command uploads historical conversation transcripts to the platform. **This is a manual process** - there is no automatic syncing of transcripts. diff --git a/cmd/root.go b/cmd/root.go index 60377c0..5e832f2 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -55,6 +55,7 @@ func init() { rootCmd.AddCommand(skillsCmd) rootCmd.AddCommand(debugCmd) rootCmd.AddCommand(upgradeCmd) + rootCmd.AddCommand(watchCmd) } var versionCmd = &cobra.Command{ @@ -194,7 +195,10 @@ func skipUpdateCheckFor(cmd *cobra.Command) bool { } name := commandPathRoot(cmd) switch name { - case "hook", "upgrade": + case "hook", "upgrade", "watch": + // hook is per-event and must stay fast; upgrade and watch + // drive their own long-running loops and shouldn't have a + // random update banner interrupt them. return true } return false diff --git a/cmd/watch.go b/cmd/watch.go new file mode 100644 index 0000000..8e80efa --- /dev/null +++ b/cmd/watch.go @@ -0,0 +1,80 @@ +package cmd + +import ( + "fmt" + "os" + "os/signal" + "path/filepath" + "syscall" + + "github.com/promptconduit/cli/internal/client" + "github.com/promptconduit/cli/internal/outbound" + "github.com/spf13/cobra" +) + +var ( + watchVerbose bool + watchLines int +) + +var watchCmd = &cobra.Command{ + Use: "watch", + Short: "Tail outbound HTTP traffic from the CLI in real time", + SilenceUsage: true, + SilenceErrors: true, + Long: `Stream every HTTP request the CLI makes to the platform — hook +envelope sends, transcript syncs, insights queries, skills traffic, the +whole lot — to your terminal as it happens. 
+ +Each request appears as a one-line summary by default: + + 15:30:42 POST /v1/events/raw 3.2KB → 200 (87ms) + +Use --verbose to also print the request body (and response body, when +the server returned one) pretty-printed beneath the summary. Useful for +checking what your hook is actually sending when events aren't showing +up on the dashboard. + +The underlying file lives at ~/.config/promptconduit/outbound.ndjson +(mode 0600). Authorization, Cookie, and similar credential headers are +redacted before write; request bodies are capped at 64KB per row and +the file rotates to outbound.ndjson.1 at 50MB. + +Examples: + promptconduit watch # tail live traffic + promptconduit watch --verbose # include full bodies + promptconduit watch --lines 20 # backfill the last 20 entries`, + RunE: runWatch, +} + +func init() { + watchCmd.Flags().BoolVarP(&watchVerbose, "verbose", "v", false, "include full request/response bodies under each summary line") + watchCmd.Flags().IntVar(&watchLines, "lines", 0, "backfill the last N entries before going live") +} + +func runWatch(cmd *cobra.Command, args []string) error { + path := filepath.Join(client.ConfigDir(), outbound.MirrorFileName) + color := outbound.IsTerminal(os.Stdout) + + ctx, cancel := signal.NotifyContext(cmd.Context(), os.Interrupt, syscall.SIGTERM) + defer cancel() + + fmt.Fprintf(cmd.ErrOrStderr(), "Watching %s — Ctrl-C to stop.\n", path) + + lines := outbound.Tail(ctx, path, watchLines) + for raw := range lines { + entry, err := outbound.ParseLine(raw) + if err != nil { + // Best-effort: show the unparseable raw line so the user + // can still see something went through. + fmt.Fprintln(cmd.OutOrStdout(), string(raw)) + continue + } + fmt.Fprintln(cmd.OutOrStdout(), outbound.RenderSummary(entry, watchVerbose, color)) + } + + // Ctrl-C / SIGTERM is the normal way to leave watch; treat as a + // successful exit rather than an error so cobra doesn't print a + // "Error: context canceled" banner. 
+ return nil +} diff --git a/internal/client/api.go b/internal/client/api.go index 5f9a7ea..dd7abfa 100644 --- a/internal/client/api.go +++ b/internal/client/api.go @@ -10,11 +10,13 @@ import ( "net/http" "os" "os/exec" + "path/filepath" "runtime" "strings" "time" "github.com/promptconduit/cli/internal/envelope" + "github.com/promptconduit/cli/internal/outbound" ) // APIResponse represents a response from the API @@ -33,15 +35,20 @@ type Client struct { version string } -// NewClient creates a new API client +// NewClient creates a new API client. Every outbound HTTP request is +// mirrored to ~/.config/promptconduit/outbound.ndjson so users can run +// `promptconduit watch` to see what the CLI is uploading in real time. func NewClient(config *Config, version string) *Client { + mirror := outbound.New(filepath.Join(ConfigDir(), outbound.MirrorFileName), http.DefaultTransport) return &Client{ config: config, httpClient: &http.Client{ - Timeout: time.Duration(config.TimeoutSeconds) * time.Second, + Timeout: time.Duration(config.TimeoutSeconds) * time.Second, + Transport: mirror, }, longHttpClient: &http.Client{ - Timeout: 600 * time.Second, // 10 min for large transcript sync (chunked complete can be slow) + Timeout: 600 * time.Second, // 10 min for large transcript sync (chunked complete can be slow) + Transport: mirror, }, version: version, } diff --git a/internal/outbound/entry.go b/internal/outbound/entry.go new file mode 100644 index 0000000..74039b4 --- /dev/null +++ b/internal/outbound/entry.go @@ -0,0 +1,77 @@ +// Package outbound mirrors every HTTP request the CLI makes to a local +// ndjson file, so users can run `promptconduit watch` and see what their +// hooks are actually uploading to the platform. +// +// The mirror is a single http.RoundTripper that wraps http.DefaultTransport +// and is installed into every *http.Client constructed by +// internal/client.NewClient. 
Both the foreground command and the +// `hook --send-event` subprocess thus log into the same file. The file +// is owner-only-readable (0600), bodies are capped at 64KB, and the file +// rotates to a .1 backup when it crosses 50MB. +package outbound + +import ( + "encoding/json" + "net/http" + "time" +) + +// MirrorFileName is the basename of the on-disk mirror, written into +// client.ConfigDir(). Exposed so callers (the watch command, the HTTP +// client wiring) agree on a single source of truth. +const MirrorFileName = "outbound.ndjson" + +// Entry is one line in outbound.ndjson — one HTTP request + response. +// +// Field ordering matches the ndjson layout for easy `jq` use. Bodies are +// stored as strings (not raw JSON) so the mirror file remains +// well-formed even when the body bytes are themselves valid JSON +// containing embedded newlines. +type Entry struct { + TS time.Time `json:"ts"` + Method string `json:"method"` + URL string `json:"url"` + ReqHeaders map[string]string `json:"req_headers,omitempty"` + ReqContentType string `json:"req_content_type,omitempty"` + ReqBody string `json:"req_body,omitempty"` + ReqTruncated bool `json:"req_truncated,omitempty"` + ReqOriginalSize int `json:"req_original_size_bytes,omitempty"` + Status int `json:"status,omitempty"` + RespContentType string `json:"resp_content_type,omitempty"` + RespBody string `json:"resp_body,omitempty"` + RespTruncated bool `json:"resp_truncated,omitempty"` + LatencyMs int64 `json:"latency_ms"` + Error string `json:"error,omitempty"` +} + +// MarshalLine produces one ndjson line for the entry (no trailing newline). +// Use AppendLine to write to a file; this is split out so it can be tested +// without disk. +func (e Entry) MarshalLine() ([]byte, error) { + return json.Marshal(e) +} + +// headerMap returns a single-value header map keyed by canonical name. 
+// Multi-value headers are joined with ", "; this is fine for an +// observability surface and avoids the cost of nested slices in ndjson. +func headerMap(h http.Header) map[string]string { + if len(h) == 0 { + return nil + } + out := make(map[string]string, len(h)) + for k, v := range h { + if len(v) == 1 { + out[k] = v[0] + continue + } + joined := "" + for i, s := range v { + if i > 0 { + joined += ", " + } + joined += s + } + out[k] = joined + } + return out +} diff --git a/internal/outbound/inode_unix.go b/internal/outbound/inode_unix.go new file mode 100644 index 0000000..d43251f --- /dev/null +++ b/internal/outbound/inode_unix.go @@ -0,0 +1,23 @@ +//go:build !windows + +package outbound + +import ( + "os" + "syscall" +) + +// inodeOf returns the file's inode number on Unix. Used by Tail to +// detect rotation: when the inode changes under the same path, the +// follower reopens at offset 0. +func inodeOf(path string) (uint64, error) { + info, err := os.Stat(path) + if err != nil { + return 0, err + } + st, ok := info.Sys().(*syscall.Stat_t) + if !ok { + return 0, nil + } + return uint64(st.Ino), nil +} diff --git a/internal/outbound/inode_windows.go b/internal/outbound/inode_windows.go new file mode 100644 index 0000000..8ced12a --- /dev/null +++ b/internal/outbound/inode_windows.go @@ -0,0 +1,10 @@ +//go:build windows + +package outbound + +// inodeOf returns 0 on Windows — there is no portable inode equivalent +// in stdlib. Tail falls back to size-shrink detection for rotation, +// which is adequate for this observability surface. +func inodeOf(path string) (uint64, error) { + return 0, nil +} diff --git a/internal/outbound/locking_unix.go b/internal/outbound/locking_unix.go new file mode 100644 index 0000000..4519aaf --- /dev/null +++ b/internal/outbound/locking_unix.go @@ -0,0 +1,49 @@ +//go:build !windows + +package outbound + +import ( + "os" + "sync" + "syscall" +) + +// inProcessAppendMu serializes appendLine calls from a single process. 
+// Across processes we lean on flock for lines >4KB and on POSIX's +// O_APPEND atomicity guarantee for smaller writes. +var inProcessAppendMu sync.Mutex + +// appendLine writes line + "\n" to path. POSIX guarantees an O_APPEND +// write of <=PIPE_BUF (typically 4KB) is atomic between processes, so +// small lines never interleave. For larger lines we take an exclusive +// file lock (flock LOCK_EX) for the duration of the write. +// +// Within a single process, inProcessAppendMu prevents tearing if two +// goroutines append concurrently — important because the parent CLI +// can fire several requests in quick succession. +func appendLine(path string, line []byte) error { + inProcessAppendMu.Lock() + defer inProcessAppendMu.Unlock() + + f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o600) + if err != nil { + return err + } + defer f.Close() + + needFlock := len(line)+1 > 4096 + if needFlock { + if err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX); err != nil { + return err + } + // LOCK_UN is implicit on file close, but be explicit for clarity. + defer syscall.Flock(int(f.Fd()), syscall.LOCK_UN) + } + + // One Write call so the kernel sees the line+newline as a unit. + buf := make([]byte, len(line)+1) + copy(buf, line) + buf[len(line)] = '\n' + _, err = f.Write(buf) + return err +} diff --git a/internal/outbound/locking_windows.go b/internal/outbound/locking_windows.go new file mode 100644 index 0000000..693dd8c --- /dev/null +++ b/internal/outbound/locking_windows.go @@ -0,0 +1,32 @@ +//go:build windows + +package outbound + +import ( + "os" + "sync" +) + +// On Windows we don't have a portable cross-process advisory lock in the +// stdlib, so we serialize within a single process and accept best-effort +// behavior across processes. In practice the only concurrent writers are +// the foreground CLI process and the `hook --send-event` subprocess it +// spawns, which fire on different cadences, so collisions are very rare. 
+var inProcessAppendMu sync.Mutex + +func appendLine(path string, line []byte) error { + inProcessAppendMu.Lock() + defer inProcessAppendMu.Unlock() + + f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o600) + if err != nil { + return err + } + defer f.Close() + + buf := make([]byte, len(line)+1) + copy(buf, line) + buf[len(line)] = '\n' + _, err = f.Write(buf) + return err +} diff --git a/internal/outbound/mirror.go b/internal/outbound/mirror.go new file mode 100644 index 0000000..1adebb5 --- /dev/null +++ b/internal/outbound/mirror.go @@ -0,0 +1,200 @@ +package outbound + +import ( + "bytes" + "errors" + "io" + "net/http" + "os" + "path/filepath" + "time" +) + +// DefaultMaxBodyBytes is the per-direction body cap recorded in the mirror. +// 64KB keeps the file small enough to tail comfortably while still +// preserving the full envelope JSON for typical hook events (a few KB). +// Image-bearing UserPromptSubmit envelopes or large transcript chunks +// will exceed this and be recorded as truncated. +const DefaultMaxBodyBytes = 64 * 1024 // 64 KB + +// Mirror is an http.RoundTripper that records each request/response pair +// to an ndjson file before returning the response to the caller. It +// wraps a base transport — typically http.DefaultTransport. +// +// Mirror is safe for concurrent use. +type Mirror struct { + Path string // destination ndjson file + Base http.RoundTripper // wrapped transport + MaxBodyBytes int // per-direction body cap; 0 ⇒ DefaultMaxBodyBytes + RotateAt int64 // file-size rotation threshold; 0 ⇒ DefaultRotateAt + Now func() time.Time // injectable clock for tests; nil ⇒ time.Now +} + +// New constructs a Mirror writing to path. If base is nil, +// http.DefaultTransport is used. The parent directory is created with +// mode 0700 if missing; the file itself is created on first write with +// mode 0600. 
+func New(path string, base http.RoundTripper) *Mirror { + if base == nil { + base = http.DefaultTransport + } + // Best-effort: the parent dir is usually already created by + // SaveFileConfig, but be defensive in case the user has only ever + // run hooks before running anything else. + _ = os.MkdirAll(filepath.Dir(path), 0o700) + return &Mirror{ + Path: path, + Base: base, + MaxBodyBytes: DefaultMaxBodyBytes, + RotateAt: DefaultRotateAt, + Now: time.Now, + } +} + +// RoundTrip captures the request, forwards it through the base +// transport, captures the response, and appends a single ndjson line +// to the mirror file. Mirror write failures are intentionally +// swallowed: observability must never break the network path. +func (m *Mirror) RoundTrip(req *http.Request) (*http.Response, error) { + start := m.now() + + reqBody, reqOriginal, reqTrunc, err := drainAndReplace(req, m.maxBody()) + if err != nil { + // We failed to read the request body before sending. Forward + // to the base transport with whatever Go has left of the body + // (often nothing) and record the error. + resp, sendErr := m.Base.RoundTrip(req) + m.record(req, reqBody, reqOriginal, reqTrunc, resp, nil, false, 0, time.Since(start), errors.Join(err, sendErr)) + return resp, sendErr + } + + resp, sendErr := m.Base.RoundTrip(req) + + var respBody []byte + var respOriginal int + var respTrunc bool + if resp != nil { + respBody, respOriginal, respTrunc, _ = drainAndReplaceResponse(resp, m.maxBody()) + _ = respOriginal // recorded for completeness; not currently exposed + } + + m.record(req, reqBody, reqOriginal, reqTrunc, resp, respBody, respTrunc, 0, time.Since(start), sendErr) + return resp, sendErr +} + +func (m *Mirror) maxBody() int { + if m.MaxBodyBytes > 0 { + return m.MaxBodyBytes + } + return DefaultMaxBodyBytes +} + +func (m *Mirror) now() time.Time { + if m.Now != nil { + return m.Now() + } + return time.Now() +} + +// record builds an Entry and appends one ndjson line. 
Best-effort: +// errors are dropped so a failed write never corrupts the actual +// network call. We do attempt rotation first. +func (m *Mirror) record(req *http.Request, reqBody []byte, reqOriginal int, reqTrunc bool, resp *http.Response, respBody []byte, respTrunc bool, _ int, latency time.Duration, sendErr error) { + entry := Entry{ + TS: m.now().UTC(), + Method: req.Method, + URL: safeURL(req), + LatencyMs: latency.Milliseconds(), + } + + if reqHeaders := redactHeaders(req.Header); reqHeaders != nil { + entry.ReqHeaders = headerMap(reqHeaders) + entry.ReqContentType = req.Header.Get("Content-Type") + } + if len(reqBody) > 0 { + entry.ReqBody = string(reqBody) + entry.ReqTruncated = reqTrunc + if reqTrunc { + entry.ReqOriginalSize = reqOriginal + } + } + + if resp != nil { + entry.Status = resp.StatusCode + entry.RespContentType = resp.Header.Get("Content-Type") + if len(respBody) > 0 { + entry.RespBody = string(respBody) + entry.RespTruncated = respTrunc + } + } + if sendErr != nil { + entry.Error = sendErr.Error() + } + + rotateAt := m.RotateAt + if rotateAt == 0 { + rotateAt = DefaultRotateAt + } + _ = rotateIfNeeded(m.Path, rotateAt) + + line, err := entry.MarshalLine() + if err != nil { + return + } + _ = appendLine(m.Path, line) +} + +// drainAndReplace reads at most max bytes from req.Body for the mirror, +// then replaces req.Body with a new ReadCloser that yields the full +// original content (read part + remainder). Returns (bytes-for-mirror, +// original-size, truncated?, error). +// +// If the request has no body, returns (nil, 0, false, nil) and leaves +// req.Body untouched. 
+func drainAndReplace(req *http.Request, max int) ([]byte, int, bool, error) { + if req.Body == nil || req.Body == http.NoBody { + return nil, 0, false, nil + } + all, err := io.ReadAll(req.Body) + if err != nil { + return nil, 0, false, err + } + if cerr := req.Body.Close(); cerr != nil { + return nil, 0, false, cerr + } + // Replace the body so the inner transport sees the full bytes. + req.Body = io.NopCloser(bytes.NewReader(all)) + // Reset ContentLength so chunked/length headers stay consistent. + req.ContentLength = int64(len(all)) + clipped, truncated, original := truncateBody(all, max) + return clipped, original, truncated, nil +} + +// drainAndReplaceResponse is the response-side equivalent. The caller +// must still close the returned response body when done; we replace it +// with a NopCloser around a bytes.Reader containing the original +// content so downstream consumers see the same bytes. +func drainAndReplaceResponse(resp *http.Response, max int) ([]byte, int, bool, error) { + if resp.Body == nil || resp.Body == http.NoBody { + return nil, 0, false, nil + } + all, err := io.ReadAll(resp.Body) + if err != nil { + return nil, 0, false, err + } + if cerr := resp.Body.Close(); cerr != nil { + return nil, 0, false, cerr + } + resp.Body = io.NopCloser(bytes.NewReader(all)) + clipped, truncated, original := truncateBody(all, max) + return clipped, original, truncated, nil +} + +func safeURL(req *http.Request) string { + if req.URL == nil { + return "" + } + // Don't redact query strings here — endpoints don't put secrets in + // query strings today. If that changes, redact at this layer. 
+ return req.URL.String() +} diff --git a/internal/outbound/mirror_test.go b/internal/outbound/mirror_test.go new file mode 100644 index 0000000..c57d707 --- /dev/null +++ b/internal/outbound/mirror_test.go @@ -0,0 +1,296 @@ +package outbound + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "testing" + "time" +) + +func TestMirror_RoundTrip(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(200) + _, _ = w.Write([]byte(`{"ok":true}`)) + })) + t.Cleanup(srv.Close) + + dir := t.TempDir() + path := filepath.Join(dir, MirrorFileName) + m := New(path, http.DefaultTransport) + + client := &http.Client{Transport: m} + req, _ := http.NewRequest("POST", srv.URL+"/v1/events/raw", bytes.NewReader([]byte(`{"hook_event_name":"UserPromptSubmit"}`))) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer sk_live_supersecret") + resp, err := client.Do(req) + if err != nil { + t.Fatalf("Do: %v", err) + } + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + if string(body) != `{"ok":true}` { + t.Errorf("downstream caller saw wrong body: %q", body) + } + + // File should exist with one ndjson line. 
+ data, err := os.ReadFile(path) + if err != nil { + t.Fatalf("read mirror: %v", err) + } + if !strings.HasSuffix(string(data), "\n") { + t.Error("mirror line should end with newline") + } + var entry Entry + if err := json.Unmarshal(bytes.TrimSpace(data), &entry); err != nil { + t.Fatalf("ndjson parse: %v", err) + } + if entry.Method != "POST" { + t.Errorf("method = %q, want POST", entry.Method) + } + if !strings.HasSuffix(entry.URL, "/v1/events/raw") { + t.Errorf("url = %q, want suffix /v1/events/raw", entry.URL) + } + if entry.Status != 200 { + t.Errorf("status = %d, want 200", entry.Status) + } + if entry.ReqBody == "" { + t.Error("expected request body to be recorded") + } + if entry.ReqHeaders["Authorization"] != redactedValue { + t.Errorf("authorization not redacted: %q", entry.ReqHeaders["Authorization"]) + } + if strings.Contains(string(data), "sk_live_supersecret") { + t.Error("raw bearer token leaked to mirror file") + } +} + +func TestMirror_FilePermissions(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(204) + })) + t.Cleanup(srv.Close) + + dir := t.TempDir() + path := filepath.Join(dir, MirrorFileName) + m := New(path, http.DefaultTransport) + client := &http.Client{Transport: m} + + req, _ := http.NewRequest("GET", srv.URL, nil) + if _, err := client.Do(req); err != nil { + t.Fatalf("Do: %v", err) + } + + info, err := os.Stat(path) + if err != nil { + t.Fatalf("stat: %v", err) + } + // On Unix the file should be created with mode 0600. On Windows the + // concept doesn't apply the same way, so just assert "not world-rw". 
+ mode := info.Mode().Perm() + if mode&0o077 != 0 { + t.Errorf("file mode %o is world-readable; expected owner-only", mode) + } +} + +func TestMirror_BodyTruncation(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + })) + t.Cleanup(srv.Close) + + dir := t.TempDir() + path := filepath.Join(dir, MirrorFileName) + m := New(path, http.DefaultTransport) + m.MaxBodyBytes = 16 // tiny cap so we can hit truncation easily + + body := strings.Repeat("x", 100) + req, _ := http.NewRequest("POST", srv.URL, strings.NewReader(body)) + if _, err := (&http.Client{Transport: m}).Do(req); err != nil { + t.Fatalf("Do: %v", err) + } + + data, err := os.ReadFile(path) + if err != nil { + t.Fatalf("read: %v", err) + } + var entry Entry + if err := json.Unmarshal(bytes.TrimSpace(data), &entry); err != nil { + t.Fatalf("parse: %v", err) + } + if !entry.ReqTruncated { + t.Error("expected ReqTruncated=true") + } + if entry.ReqOriginalSize != 100 { + t.Errorf("ReqOriginalSize = %d, want 100", entry.ReqOriginalSize) + } + if len(entry.ReqBody) != 16 { + t.Errorf("ReqBody = %d bytes, want 16", len(entry.ReqBody)) + } +} + +func TestMirror_Rotates(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + })) + t.Cleanup(srv.Close) + + dir := t.TempDir() + path := filepath.Join(dir, MirrorFileName) + m := New(path, http.DefaultTransport) + m.RotateAt = 512 // tiny so we rotate quickly + + client := &http.Client{Transport: m} + for i := 0; i < 20; i++ { + body := fmt.Sprintf(`{"i":%d,"pad":"%s"}`, i, strings.Repeat("p", 100)) + req, _ := http.NewRequest("POST", srv.URL, strings.NewReader(body)) + if _, err := client.Do(req); err != nil { + t.Fatalf("Do: %v", err) + } + } + + if _, err := os.Stat(path + ".1"); err != nil { + t.Errorf("expected backup file at %s: %v", path+".1", err) + } +} + +func TestMirror_RequestBodyStillReadableDownstream(t 
*testing.T) { + // The mirror reads the request body to record it; we must replace + // it so the inner transport still sees the bytes. Tighten that + // invariant with an explicit test against an httptest.Server that + // echoes what it received. + var seen []byte + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + seen, _ = io.ReadAll(r.Body) + w.WriteHeader(200) + })) + t.Cleanup(srv.Close) + + dir := t.TempDir() + m := New(filepath.Join(dir, MirrorFileName), http.DefaultTransport) + client := &http.Client{Transport: m} + + want := `{"hook_event_name":"Stop","session_id":"abc"}` + req, _ := http.NewRequest("POST", srv.URL, strings.NewReader(want)) + if _, err := client.Do(req); err != nil { + t.Fatalf("Do: %v", err) + } + if string(seen) != want { + t.Errorf("server received %q, want %q", seen, want) + } +} + +func TestMirror_NetworkErrorRecorded(t *testing.T) { + // Point at a closed server so RoundTrip fails. Mirror should still + // record an entry with the error. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) + srv.Close() // immediately + + dir := t.TempDir() + path := filepath.Join(dir, MirrorFileName) + m := New(path, http.DefaultTransport) + req, _ := http.NewRequest("POST", srv.URL, strings.NewReader(`{}`)) + if _, err := (&http.Client{Transport: m}).Do(req); err == nil { + t.Fatal("expected request to a closed server to fail") + } + + data, err := os.ReadFile(path) + if err != nil { + t.Fatalf("read mirror: %v", err) + } + if !bytes.Contains(data, []byte(`"error":`)) { + t.Errorf("expected error field in entry; got %s", data) + } +} + +func TestTail_BackfillAndLive(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, MirrorFileName) + + // Seed with three lines. 
+ mustAppend := func(line string) { + t.Helper() + if err := appendLine(path, []byte(line)); err != nil { + t.Fatalf("appendLine: %v", err) + } + } + mustAppend(`{"i":1}`) + mustAppend(`{"i":2}`) + mustAppend(`{"i":3}`) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + ch := Tail(ctx, path, 2) + + // Expect the last two backfilled lines. + got1 := waitLine(t, ch) + got2 := waitLine(t, ch) + if string(got1) != `{"i":2}` || string(got2) != `{"i":3}` { + t.Errorf("backfill = %q, %q; want {i:2}, {i:3}", got1, got2) + } + + // Append a new line; it should arrive. + mustAppend(`{"i":4}`) + got3 := waitLine(t, ch) + if string(got3) != `{"i":4}` { + t.Errorf("live = %q; want {i:4}", got3) + } +} + +func TestTail_HandlesRotation(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, MirrorFileName) + mustAppend := func(line string) { + t.Helper() + if err := appendLine(path, []byte(line)); err != nil { + t.Fatalf("appendLine: %v", err) + } + } + + mustAppend(`{"pre":1}`) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + ch := Tail(ctx, path, 1) + + // Backfill: {"pre":1} + if got := waitLine(t, ch); string(got) != `{"pre":1}` { + t.Fatalf("backfill = %q", got) + } + + // Simulate rotation: rename to .1, then create a fresh file with + // new content. + if err := os.Rename(path, path+".1"); err != nil { + t.Fatalf("rotate rename: %v", err) + } + mustAppend(`{"post":1}`) + + // Tail should detect the rotation and yield the new line. 
+ got := waitLine(t, ch) + if string(got) != `{"post":1}` { + t.Errorf("after rotate = %q; want {post:1}", got) + } +} + +func waitLine(t *testing.T, ch <-chan []byte) []byte { + t.Helper() + select { + case l, ok := <-ch: + if !ok { + t.Fatal("tail channel closed unexpectedly") + } + return l + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for tail line") + } + return nil +} diff --git a/internal/outbound/redact.go b/internal/outbound/redact.go new file mode 100644 index 0000000..b5fe351 --- /dev/null +++ b/internal/outbound/redact.go @@ -0,0 +1,59 @@ +package outbound + +import ( + "net/http" + "strings" +) + +// redactedValue is what every auth-bearing header value becomes in the mirror. +const redactedValue = "***" + +// alwaysRedact is the hard-coded list of header names whose values must +// never reach the mirror file. Matched case-insensitively against +// http.Header's canonical form. +var alwaysRedact = map[string]struct{}{ + "Authorization": {}, + "Cookie": {}, + "Set-Cookie": {}, + "X-Api-Key": {}, + "Proxy-Authorization": {}, +} + +// substrRedact catches anything whose name contains one of these tokens +// (case-insensitive). Belt-and-braces in case the platform grows new +// header names later. +var substrRedact = []string{"token", "secret", "key"} + +// redactHeaders returns a copy of h with values that look like +// credentials replaced by "***". Multi-value headers collapse to a +// single redacted entry. +func redactHeaders(h http.Header) http.Header { + if len(h) == 0 { + return nil + } + out := make(http.Header, len(h)) + for name, values := range h { + if shouldRedact(name) { + out[name] = []string{redactedValue} + continue + } + // Copy the slice so the caller can't mutate ours by surprise. 
+		// Deep-copy the value slice so later mutation of the caller's
+		// http.Header can't reach the mirrored copy (and vice versa).
+		dup := make([]string, len(values))
+		copy(dup, values)
+		out[name] = dup
+	}
+	return out
+}
+
+// shouldRedact reports whether the header called name carries
+// credential material and must have its value replaced before being
+// written to the mirror file. A header is redacted when its canonical
+// form appears in alwaysRedact, or when its lowercased name contains
+// any of the substrRedact needles.
+func shouldRedact(name string) bool {
+	if _, ok := alwaysRedact[http.CanonicalHeaderKey(name)]; ok {
+		return true
+	}
+	lower := strings.ToLower(name)
+	for _, needle := range substrRedact {
+		if strings.Contains(lower, needle) {
+			return true
+		}
+	}
+	return false
+}
diff --git a/internal/outbound/redact_test.go b/internal/outbound/redact_test.go
new file mode 100644
index 0000000..c4095c8
--- /dev/null
+++ b/internal/outbound/redact_test.go
@@ -0,0 +1,70 @@
+package outbound
+
+import (
+	"net/http"
+	"testing"
+)
+
+// TestRedactHeaders table-drives the redaction policy: credential
+// headers come back as redactedValue, everything else is preserved.
+func TestRedactHeaders(t *testing.T) {
+	cases := []struct {
+		name string
+		in   http.Header
+		want map[string]string
+	}{
+		{
+			name: "authorization is redacted",
+			in:   http.Header{"Authorization": []string{"Bearer sk_live_1234"}},
+			want: map[string]string{"Authorization": redactedValue},
+		},
+		{
+			name: "cookie is redacted",
+			in:   http.Header{"Cookie": []string{"session=abc"}},
+			want: map[string]string{"Cookie": redactedValue},
+		},
+		{
+			name: "x-api-key is redacted (canonical form)",
+			in:   http.Header{"X-Api-Key": []string{"super-secret"}},
+			want: map[string]string{"X-Api-Key": redactedValue},
+		},
+		{
+			name: "x-secret-token is redacted (substring rule)",
+			in:   http.Header{"X-Secret-Token": []string{"nope"}},
+			want: map[string]string{"X-Secret-Token": redactedValue},
+		},
+		{
+			name: "content-type is preserved",
+			in:   http.Header{"Content-Type": []string{"application/json"}},
+			want: map[string]string{"Content-Type": "application/json"},
+		},
+		{
+			name: "user-agent is preserved",
+			in:   http.Header{"User-Agent": []string{"PromptConduit-CLI/0.5.0"}},
+			want: map[string]string{"User-Agent": "PromptConduit-CLI/0.5.0"},
+		},
+	}
+	for _, c := range cases {
+		t.Run(c.name, func(t *testing.T) {
+			out := redactHeaders(c.in)
+			got := headerMap(out)
+			for k, v := range c.want {
+				if got[k] != v {
+					t.Errorf("%s = %q, want %q", k, got[k], v)
+				}
+			}
+		})
+	}
+}
+
+// TestRedactHeaders_NilSafe pins the nil-in/nil-out contract.
+func TestRedactHeaders_NilSafe(t *testing.T) {
+	if h := redactHeaders(nil); h != nil {
+		t.Errorf("expected nil for empty input; got %v", h)
+	}
+}
+
+// TestRedactHeaders_DoesNotMutateInput guards against redaction being
+// applied in place on the live request's headers.
+func TestRedactHeaders_DoesNotMutateInput(t *testing.T) {
+	in := http.Header{"Authorization": []string{"Bearer real-token"}}
+	_ = redactHeaders(in)
+	if in.Get("Authorization") != "Bearer real-token" {
+		t.Errorf("input header was mutated: %q", in.Get("Authorization"))
+	}
+}
diff --git a/internal/outbound/render.go b/internal/outbound/render.go
new file mode 100644
index 0000000..fa03d68
--- /dev/null
+++ b/internal/outbound/render.go
@@ -0,0 +1,156 @@
+package outbound
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"net/url"
+	"os"
+	"strings"
+)
+
+// RenderSummary formats one mirror entry for the `watch` command.
+// When verbose is true, the request body (and response body, if any)
+// follow the summary line, pretty-printed and indented. When color is
+// true the HTTP status is ANSI-colored — green 2xx, yellow 3xx,
+// red ≥400 or status 0 / transport error (see statusColor).
+func RenderSummary(e Entry, verbose, color bool) string {
+	var b strings.Builder
+
+	// Summary line: HH:MM:SS METHOD /path SIZE → STATUS (LATENCYms)
+	b.WriteString(e.TS.Local().Format("15:04:05"))
+	b.WriteString(" ")
+	// %-5s pads short methods (GET, PUT) so columns line up.
+	b.WriteString(fmt.Sprintf("%-5s", e.Method))
+	b.WriteString(" ")
+	b.WriteString(pathOnly(e.URL))
+	b.WriteString(" ")
+	b.WriteString(humanBytes(len(e.ReqBody)))
+	if e.ReqTruncated {
+		// Trailing "+" signals the stored body was clipped at the cap.
+		b.WriteString("+")
+	}
+
+	if e.Error != "" {
+		// Transport failure: no status code, show ERR plus the message.
+		b.WriteString(" → ")
+		b.WriteString(colorize("ERR", colorRed, color))
+		b.WriteString(" (")
+		b.WriteString(fmt.Sprintf("%dms", e.LatencyMs))
+		b.WriteString(") ")
+		b.WriteString(e.Error)
+	} else {
+		b.WriteString(" → ")
+		b.WriteString(colorize(fmt.Sprintf("%d", e.Status), statusColor(e.Status), color))
+		b.WriteString(" (")
+		b.WriteString(fmt.Sprintf("%dms", e.LatencyMs))
+		b.WriteString(")")
+	}
+
+	if verbose {
+		if e.ReqBody != "" {
+			b.WriteString("\n")
+			b.WriteString(indent(" ", pretty(e.ReqBody)))
+		}
+		if e.RespBody != "" {
+			b.WriteString("\n -- response --\n")
+			b.WriteString(indent(" ", pretty(e.RespBody)))
+		}
+	}
+
+	return b.String()
+}
+
+// ParseLine deserializes one ndjson line into an Entry.
+func ParseLine(line []byte) (Entry, error) {
+	var e Entry
+	err := json.Unmarshal(line, &e)
+	return e, err
+}
+
+// IsTerminal reports whether f is connected to a character device,
+// i.e. a real terminal rather than a pipe or file redirect. Used to
+// decide whether to emit ANSI color codes.
+func IsTerminal(f *os.File) bool {
+	if f == nil {
+		return false
+	}
+	fi, err := f.Stat()
+	if err != nil {
+		return false
+	}
+	return (fi.Mode() & os.ModeCharDevice) != 0
+}
+
+// pathOnly strips scheme/host from a URL for the summary line; full URL
+// is still in the underlying entry for users who want it. Unparseable
+// input is returned verbatim rather than dropped.
+func pathOnly(s string) string {
+	u, err := url.Parse(s)
+	if err != nil {
+		return s
+	}
+	if u.RawQuery != "" {
+		return u.Path + "?" + u.RawQuery
+	}
+	return u.Path
+}
+
+// humanBytes renders a byte count with a compact unit suffix (B, KB,
+// MB) for the one-line summary.
+func humanBytes(n int) string {
+	switch {
+	case n < 1024:
+		return fmt.Sprintf("%dB", n)
+	case n < 1024*1024:
+		return fmt.Sprintf("%.1fKB", float64(n)/1024)
+	default:
+		return fmt.Sprintf("%.1fMB", float64(n)/(1024*1024))
+	}
+}
+
+// pretty re-indents body if it parses as JSON; otherwise returns it
+// unchanged. We don't want a malformed body to drop the row entirely.
+func pretty(s string) string {
+	var v interface{}
+	if err := json.Unmarshal([]byte(s), &v); err != nil {
+		return s
+	}
+	out, err := json.MarshalIndent(v, "", "  ")
+	if err != nil {
+		return s
+	}
+	return string(out)
+}
+
+// indent prefixes every line of s with prefix. Empty input is returned
+// as-is so callers don't emit a bare prefix line.
+func indent(prefix, s string) string {
+	if s == "" {
+		return s
+	}
+	lines := bytes.Split([]byte(s), []byte{'\n'})
+	for i, l := range lines {
+		lines[i] = append([]byte(prefix), l...)
+	}
+	return string(bytes.Join(lines, []byte{'\n'}))
+}
+
+// ANSI SGR escape sequences used by colorize.
+const (
+	colorReset  = "\x1b[0m"
+	colorRed    = "\x1b[31m"
+	colorYellow = "\x1b[33m"
+	colorGreen  = "\x1b[32m"
+)
+
+// statusColor maps an HTTP status to its summary color: green 2xx,
+// yellow 3xx, red for 4xx/5xx and for 0 (no response, i.e. transport
+// error). Statuses below 200 get no color.
+func statusColor(status int) string {
+	switch {
+	case status >= 500 || status == 0:
+		return colorRed
+	case status >= 400:
+		return colorRed
+	case status >= 300:
+		return colorYellow
+	case status >= 200:
+		return colorGreen
+	}
+	return ""
+}
+
+// colorize wraps s in the given ANSI code when coloring is enabled;
+// otherwise it returns s untouched.
+func colorize(s, code string, enabled bool) string {
+	if !enabled || code == "" {
+		return s
+	}
+	return code + s + colorReset
+}
diff --git a/internal/outbound/render_test.go b/internal/outbound/render_test.go
new file mode 100644
index 0000000..d7e1cc5
--- /dev/null
+++ b/internal/outbound/render_test.go
@@ -0,0 +1,110 @@
+package outbound
+
+import (
+	"strings"
+	"testing"
+	"time"
+)
+
+func TestRenderSummary_Default(t *testing.T) {
+	e := Entry{
+		TS:        time.Date(2026, 5, 11, 15, 30, 42, 0, time.UTC),
+		Method:    "POST",
+		URL:       "https://api.example.com/v1/events/raw",
+		ReqBody:   strings.Repeat("x", 3200),
+		Status:    200,
+		LatencyMs: 87,
+	}
+	got := RenderSummary(e, false, false)
+	// Time is rendered in local TZ; just match the rest.
+	for _, want := range []string{"POST", "/v1/events/raw", "3.1KB", "200", "(87ms)"} {
+		if !strings.Contains(got, want) {
+			t.Errorf("missing %q in summary: %q", want, got)
+		}
+	}
+	// No ANSI color codes when disabled.
+	if strings.Contains(got, "\x1b[") {
+		t.Errorf("color leaked into summary: %q", got)
+	}
+}
+
+func TestRenderSummary_VerboseIncludesBody(t *testing.T) {
+	e := Entry{
+		TS:        time.Now(),
+		Method:    "POST",
+		URL:       "https://example/v1/events/raw",
+		ReqBody:   `{"foo":"bar"}`,
+		Status:    200,
+		LatencyMs: 1,
+	}
+	got := RenderSummary(e, true, false)
+	if !strings.Contains(got, `"foo"`) {
+		t.Errorf("verbose summary missing body: %q", got)
+	}
+	// Pretty-printed JSON should be indented with two spaces.
+	if !strings.Contains(got, ` "foo": "bar"`) {
+		t.Errorf("expected pretty-printed JSON; got %q", got)
+	}
+}
+
+// TestRenderSummary_ColorOnStatus pins the status→color mapping
+// (green 2xx, yellow 3xx, red ≥400).
+func TestRenderSummary_ColorOnStatus(t *testing.T) {
+	cases := []struct {
+		status int
+		want   string
+	}{
+		{200, colorGreen},
+		{301, colorYellow},
+		{404, colorRed},
+		{500, colorRed},
+	}
+	for _, c := range cases {
+		e := Entry{TS: time.Now(), Method: "GET", URL: "/x", Status: c.status}
+		got := RenderSummary(e, false, true)
+		if !strings.Contains(got, c.want) {
+			t.Errorf("status %d: missing color %q in %q", c.status, c.want, got)
+		}
+	}
+}
+
+func TestRenderSummary_NetworkError(t *testing.T) {
+	e := Entry{
+		TS:        time.Now(),
+		Method:    "POST",
+		URL:       "https://nope",
+		LatencyMs: 5000,
+		Error:     "dial tcp: connect: connection refused",
+	}
+	got := RenderSummary(e, false, false)
+	if !strings.Contains(got, "ERR") {
+		t.Errorf("expected ERR marker for transport failure; got %q", got)
+	}
+	if !strings.Contains(got, "connection refused") {
+		t.Errorf("expected error message preserved; got %q", got)
+	}
+}
+
+func TestTruncateBody(t *testing.T) {
+	body, trunc, orig := truncateBody([]byte("hello"), 100)
+	if string(body) != "hello" || trunc || orig != 5 {
+		t.Errorf("short body should pass through: %q %v %d", body, trunc, orig)
+	}
+	body, trunc, orig = truncateBody([]byte("hello world"), 5)
+	if string(body) != "hello" || !trunc || orig != 11 {
+		t.Errorf("long body should truncate: %q %v %d", body, trunc, orig)
+	}
+	body, trunc, orig = truncateBody([]byte("any"), 0)
+	if string(body) != "any" || trunc || orig != 3 {
+		t.Errorf("max=0 should disable truncation: %q %v %d", body, trunc, orig)
+	}
+}
+
+// TestParseLine round-trips one ndjson row into an Entry.
+func TestParseLine(t *testing.T) {
+	line := []byte(`{"ts":"2026-05-11T15:30:42Z","method":"POST","url":"/x","status":204,"latency_ms":7}`)
+	e, err := ParseLine(line)
+	if err != nil {
+		t.Fatalf("ParseLine: %v", err)
+	}
+	if e.Method != "POST" || e.Status != 204 || e.LatencyMs != 7 {
+		t.Errorf("parsed wrong fields: %+v", e)
+	}
+}
diff --git a/internal/outbound/rotate.go b/internal/outbound/rotate.go
new file mode 100644
index 0000000..ca1ee5f
--- /dev/null
+++ b/internal/outbound/rotate.go
@@ -0,0 +1,50 @@
+package outbound
+
+import (
+	"errors"
+	"os"
+)
+
+// DefaultRotateAt is the file-size threshold (in bytes) at which we
+// rotate outbound.ndjson to outbound.ndjson.1 and start fresh. One
+// backup is kept; older content is overwritten.
+const DefaultRotateAt int64 = 50 * 1024 * 1024 // 50 MB
+
+// truncateBody clips b to at most max bytes. Returns the (possibly
+// shorter) bytes, a flag indicating whether truncation happened, and
+// the original length so the consumer can show "[truncated, was 10MB]".
+// A max of zero or below disables truncation entirely.
+//
+// Pure function, kept out of mirror.go so it is easy to test without
+// any disk or HTTP plumbing.
+func truncateBody(b []byte, max int) ([]byte, bool, int) {
+	if max <= 0 || len(b) <= max {
+		return b, false, len(b)
+	}
+	return b[:max], true, len(b)
+}
+
+// rotateIfNeeded renames path -> path+".1" when the file is at or above
+// max bytes. Missing files and missing prior backups are not errors.
+// Errors stat'ing or renaming are returned to the caller; callers
+// typically log-and-continue, because rotation failure should not
+// silently drop traffic.
+func rotateIfNeeded(path string, max int64) error {
+	if max <= 0 {
+		// Rotation disabled.
+		return nil
+	}
+	info, err := os.Stat(path)
+	if err != nil {
+		if errors.Is(err, os.ErrNotExist) {
+			// Nothing written yet — nothing to rotate.
+			return nil
+		}
+		return err
+	}
+	if info.Size() < max {
+		return nil
+	}
+	backup := path + ".1"
+	// Clear the previous backup first (a missing one is fine) before
+	// renaming the live file into its place.
+	if err := os.Remove(backup); err != nil && !errors.Is(err, os.ErrNotExist) {
+		return err
+	}
+	return os.Rename(path, backup)
+}
diff --git a/internal/outbound/tail.go b/internal/outbound/tail.go
new file mode 100644
index 0000000..0fbb7a7
--- /dev/null
+++ b/internal/outbound/tail.go
@@ -0,0 +1,201 @@
+package outbound
+
+import (
+	"bufio"
+	"bytes"
+	"context"
+	"errors"
+	"io"
+	"os"
+	"time"
+)
+
+// TailPollInterval is how often Tail re-checks the file for new bytes
+// and possible rotation. Tuned for human eyes — 200ms feels live
+// without burning CPU.
+const TailPollInterval = 200 * time.Millisecond
+
+// Tail follows path the way `tail -f` does. It first delivers up to
+// backfill lines from the end of the file (0 means none — start at the
+// current end), then streams new lines as they appear. Detects file
+// rotation (inode change on Unix, size shrink everywhere) and reopens
+// the file at offset 0 when it happens.
+//
+// The returned channel is closed when ctx is canceled or when an
+// unrecoverable I/O error occurs.
+func Tail(ctx context.Context, path string, backfill int) <-chan []byte {
+	// Small buffer so a briefly slow consumer doesn't stall the reader.
+	out := make(chan []byte, 64)
+	go func() {
+		defer close(out)
+
+		// Wait for the file to appear if it doesn't yet exist. The
+		// mirror creates it lazily on first write, so a `watch`
+		// invocation before any traffic has flowed is normal.
+		f, err := waitForFile(ctx, path)
+		if err != nil {
+			return
+		}
+		defer f.Close()
+
+		offset := int64(0)
+		if backfill > 0 {
+			// Deliver the trailing lines first; on error we silently
+			// fall through and stream from offset 0.
+			off, lines, err := readLastLines(f, backfill)
+			if err == nil {
+				for _, l := range lines {
+					if !sendLine(ctx, out, l) {
+						return
+					}
+				}
+				offset = off
+			}
+		} else {
+			// Start at end of file.
+			end, err := f.Seek(0, io.SeekEnd)
+			if err == nil {
+				offset = end
+			}
+		}
+
+		lastInode, _ := inodeOf(path)
+		lastSize := offset
+
+		// Polling loop.
+		reader := bufio.NewReader(f)
+		ticker := time.NewTicker(TailPollInterval)
+		defer ticker.Stop()
+
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+			}
+
+			info, err := os.Stat(path)
+			if err != nil {
+				// File may have been rotated and not yet recreated;
+				// keep polling, don't bail.
+				continue
+			}
+
+			// Rotation heuristic: inode changed (Unix), or the file
+			// shrank below what we already read (works everywhere).
+			currentInode, _ := inodeOf(path)
+			rotated := (lastInode != 0 && currentInode != 0 && currentInode != lastInode) || info.Size() < lastSize
+			if rotated {
+				// Reopen.
+				_ = f.Close()
+				f, err = os.Open(path)
+				if err != nil {
+					return
+				}
+				reader = bufio.NewReader(f)
+				lastInode = currentInode
+				lastSize = 0
+				offset = 0
+			}
+
+			if info.Size() == offset {
+				continue
+			}
+
+			// Stream any new bytes line by line.
+			// NOTE(review): a row whose trailing '\n' hasn't landed yet
+			// is emitted here as a partial line; assumes the mirror
+			// writes each ndjson row in one atomic write — confirm.
+			for {
+				line, err := reader.ReadBytes('\n')
+				if len(line) > 0 {
+					// Drop the trailing newline before sending.
+					if line[len(line)-1] == '\n' {
+						line = line[:len(line)-1]
+					}
+					// Copy: bufio may reuse its buffer after this call.
+					if !sendLine(ctx, out, append([]byte(nil), line...)) {
+						return
+					}
+				}
+				if err != nil {
+					if errors.Is(err, io.EOF) {
+						break
+					}
+					return
+				}
+			}
+			offset, _ = f.Seek(0, io.SeekCurrent)
+			lastSize = info.Size()
+		}
+	}()
+	return out
+}
+
+// sendLine delivers line on out unless ctx ends first; the boolean
+// tells the caller whether to keep streaming.
+func sendLine(ctx context.Context, out chan<- []byte, line []byte) bool {
+	select {
+	case <-ctx.Done():
+		return false
+	case out <- line:
+		return true
+	}
+}
+
+// waitForFile blocks until path exists and is openable, or ctx ends.
+func waitForFile(ctx context.Context, path string) (*os.File, error) {
+	for {
+		f, err := os.Open(path)
+		if err == nil {
+			return f, nil
+		}
+		if !errors.Is(err, os.ErrNotExist) {
+			// Anything other than "not there yet" (e.g. permissions)
+			// is fatal to the tail.
+			return nil, err
+		}
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		case <-time.After(TailPollInterval):
+		}
+	}
+}
+
+// readLastLines reads up to n trailing lines from f and returns them in
+// order, along with the file offset they end at (which becomes the
+// streaming offset for the polling loop).
+func readLastLines(f *os.File, n int) (int64, [][]byte, error) {
+	info, err := f.Stat()
+	if err != nil {
+		return 0, nil, err
+	}
+	size := info.Size()
+	if size == 0 {
+		return 0, nil, nil
+	}
+
+	// Read backward in fixed-size chunks until we have seen more
+	// newlines than the n lines we need, or reached the file start.
+	const chunkSize = int64(4096)
+	var accumulated []byte
+	pos := size
+	for pos > 0 && bytes.Count(accumulated, []byte{'\n'}) <= n {
+		readSize := chunkSize
+		if pos < readSize {
+			readSize = pos
+		}
+		pos -= readSize
+		buf := make([]byte, readSize)
+		if _, err := f.ReadAt(buf, pos); err != nil {
+			return 0, nil, err
+		}
+		accumulated = append(buf, accumulated...)
+	}
+
+	// Split into lines and take the last n.
+	all := bytes.Split(accumulated, []byte{'\n'})
+	// If the file ends with a newline the last element is empty — drop it.
+	if len(all) > 0 && len(all[len(all)-1]) == 0 {
+		all = all[:len(all)-1]
+	}
+	if len(all) > n {
+		all = all[len(all)-n:]
+	}
+
+	// Position the file at end so the streaming loop picks up from there.
+	end, _ := f.Seek(0, io.SeekEnd)
+
+	// Copy to avoid aliasing into accumulated.
+	out := make([][]byte, len(all))
+	for i, l := range all {
+		out[i] = append([]byte(nil), l...)
+	}
+	return end, out, nil
+}