From 1fed0146cdaa1e8e65fde7b001ed9bfc83c4c4f2 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 11 May 2026 08:25:24 +0000 Subject: [PATCH] feat: `promptconduit watch [--verbose]` to tail outbound HTTP traffic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New top-level `watch` subcommand that lets users see every request the CLI sends to the platform in real time. Useful for debugging the "events aren't showing up on the dashboard" class of problem without spelunking the debug log. Mechanism: - New internal/outbound package implements an http.RoundTripper that wraps http.DefaultTransport, captures each request+response, and appends one ndjson line to ~/.config/promptconduit/outbound.ndjson (mode 0600). Wired into internal/client.NewClient so both the foreground command and the `hook --send-event` subprocess feed into the same file — no edits to the 15+ send methods individually. - Authorization / Cookie / X-Api-Key / *secret* / *token* / *key* headers redacted to "***" before write. - Bodies capped at 64KB per direction; oversized requests record `req_truncated: true` plus the original size for context. File rotates to outbound.ndjson.1 when it crosses 50MB; one backup kept. - Cross-process concurrency: O_APPEND with a per-process sync.Mutex on every platform; on Unix, lines >4KB take an exclusive flock so they can't tear; smaller appends go out as a single O_APPEND write(2), which local filesystems don't interleave in practice (POSIX's PIPE_BUF atomicity guarantee formally covers pipes, not regular files). - Tail uses stdlib polling (200ms) and recovers across rotation by watching the inode (Unix) or size-shrink (Windows). UX: - `promptconduit watch` — live tail, one summary line per request: `15:30:42 POST /v1/events/raw 3.2KB → 200 (87ms)` with ANSI status color when stdout is a tty. - `promptconduit watch --verbose` — also pretty-prints the JSON body indented under each summary. - `promptconduit watch --lines N` — backfill the last N entries before going live. - Ctrl-C / SIGTERM exits cleanly with no usage banner. 
Tests in internal/outbound/ (22 cases): round-trip via httptest.Server, file permissions 0600, body truncation, rotation under a low threshold, downstream-readable body after capture, network-error recording, tail backfill+live, tail recovery across rotation, header redaction table, render summary (default + verbose + color + error path), truncateBody pure helper, ParseLine round-trip. Out of scope: internal/updater builds its own *http.Client for the GitHub releases check, so update-check traffic bypasses the mirror. Documented in the README — it's predictable and noisy and isn't what `watch` is for. The hook update notifier and upgrade command continue to work normally; they're just invisible to `watch`. https://claude.ai/code/session_019CWBC2E8pQuShejfsKYrvp --- CLAUDE.md | 1 + README.md | 30 +++ cmd/root.go | 6 +- cmd/watch.go | 80 ++++++++ internal/client/api.go | 13 +- internal/outbound/entry.go | 77 +++++++ internal/outbound/inode_unix.go | 23 +++ internal/outbound/inode_windows.go | 10 + internal/outbound/locking_unix.go | 49 +++++ internal/outbound/locking_windows.go | 32 +++ internal/outbound/mirror.go | 200 ++++++++++++++++++ internal/outbound/mirror_test.go | 296 +++++++++++++++++++++++++++ internal/outbound/redact.go | 59 ++++++ internal/outbound/redact_test.go | 70 +++++++ internal/outbound/render.go | 156 ++++++++++++++ internal/outbound/render_test.go | 110 ++++++++++ internal/outbound/rotate.go | 50 +++++ internal/outbound/tail.go | 201 ++++++++++++++++++ 18 files changed, 1459 insertions(+), 4 deletions(-) create mode 100644 cmd/watch.go create mode 100644 internal/outbound/entry.go create mode 100644 internal/outbound/inode_unix.go create mode 100644 internal/outbound/inode_windows.go create mode 100644 internal/outbound/locking_unix.go create mode 100644 internal/outbound/locking_windows.go create mode 100644 internal/outbound/mirror.go create mode 100644 internal/outbound/mirror_test.go create mode 100644 internal/outbound/redact.go create mode 
100644 internal/outbound/redact_test.go create mode 100644 internal/outbound/render.go create mode 100644 internal/outbound/render_test.go create mode 100644 internal/outbound/rotate.go create mode 100644 internal/outbound/tail.go diff --git a/CLAUDE.md b/CLAUDE.md index e9c7c01..dfe5eea 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -30,6 +30,7 @@ cli/ │ ├── correlation/ # W3C trace_id/span_id generation and per-session persistence │ ├── envelope/ # Raw event envelope types │ ├── git/ # Git context extraction +│ ├── outbound/ # http.RoundTripper that mirrors every outbound request to a local ndjson file (drives `promptconduit watch`) │ ├── sync/ # Transcript sync and parsing (Claude Code parser, state management) │ ├── transcript/ # Transcript parsing and attachment extraction │ └── updater/ # GitHub-release version check + self-replace upgrade diff --git a/README.md b/README.md index 555ae52..82721ca 100644 --- a/README.md +++ b/README.md @@ -183,8 +183,38 @@ promptconduit version # Upgrade to the latest release promptconduit upgrade + +# Tail outbound HTTP traffic in real time (great for debugging hooks) +promptconduit watch +promptconduit watch --verbose # also pretty-print request/response bodies +promptconduit watch --lines 20 # backfill the last 20 entries before going live ``` +### Tail outbound traffic + +`promptconduit watch` streams every HTTP request the CLI makes to the +platform — hook envelope sends, transcript syncs, insights queries, +skills traffic — to your terminal as it happens. Useful for answering +"is my hook actually sending anything when Claude Code fires?" without +spelunking the debug log. + +Each request appears as a one-line summary by default: + +``` +15:30:42 POST /v1/events/raw 3.2KB → 200 (87ms) +``` + +Add `--verbose` to also see the pretty-printed JSON body underneath +each summary. + +Implementation note: every request the CLI makes is mirrored to +`~/.config/promptconduit/outbound.ndjson` (mode `0600`). 
Authorization, +Cookie, and similar credential headers are redacted before write; +bodies are capped at 64KB per row and the file rotates to +`outbound.ndjson.1` when it crosses 50MB. Update-check traffic from +`internal/updater` uses a separate HTTP client and is **not** mirrored +— that traffic is predictable and noisy and isn't what `watch` is for. + ### Sync Command The `sync` command uploads historical conversation transcripts to the platform. **This is a manual process** - there is no automatic syncing of transcripts. diff --git a/cmd/root.go b/cmd/root.go index 60377c0..5e832f2 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -55,6 +55,7 @@ func init() { rootCmd.AddCommand(skillsCmd) rootCmd.AddCommand(debugCmd) rootCmd.AddCommand(upgradeCmd) + rootCmd.AddCommand(watchCmd) } var versionCmd = &cobra.Command{ @@ -194,7 +195,10 @@ func skipUpdateCheckFor(cmd *cobra.Command) bool { } name := commandPathRoot(cmd) switch name { - case "hook", "upgrade": + case "hook", "upgrade", "watch": + // hook is per-event and must stay fast; upgrade and watch + // drive their own long-running loops and shouldn't have a + // random update banner interrupt them. return true } return false diff --git a/cmd/watch.go b/cmd/watch.go new file mode 100644 index 0000000..8e80efa --- /dev/null +++ b/cmd/watch.go @@ -0,0 +1,80 @@ +package cmd + +import ( + "fmt" + "os" + "os/signal" + "path/filepath" + "syscall" + + "github.com/promptconduit/cli/internal/client" + "github.com/promptconduit/cli/internal/outbound" + "github.com/spf13/cobra" +) + +var ( + watchVerbose bool + watchLines int +) + +var watchCmd = &cobra.Command{ + Use: "watch", + Short: "Tail outbound HTTP traffic from the CLI in real time", + SilenceUsage: true, + SilenceErrors: true, + Long: `Stream every HTTP request the CLI makes to the platform — hook +envelope sends, transcript syncs, insights queries, skills traffic, the +whole lot — to your terminal as it happens. 
+ +Each request appears as a one-line summary by default: + + 15:30:42 POST /v1/events/raw 3.2KB → 200 (87ms) + +Use --verbose to also print the request body (and response body, when +the server returned one) pretty-printed beneath the summary. Useful for +checking what your hook is actually sending when events aren't showing +up on the dashboard. + +The underlying file lives at ~/.config/promptconduit/outbound.ndjson +(mode 0600). Authorization, Cookie, and similar credential headers are +redacted before write; request bodies are capped at 64KB per row and +the file rotates to outbound.ndjson.1 at 50MB. + +Examples: + promptconduit watch # tail live traffic + promptconduit watch --verbose # include full bodies + promptconduit watch --lines 20 # backfill the last 20 entries`, + RunE: runWatch, +} + +func init() { + watchCmd.Flags().BoolVarP(&watchVerbose, "verbose", "v", false, "include full request/response bodies under each summary line") + watchCmd.Flags().IntVar(&watchLines, "lines", 0, "backfill the last N entries before going live") +} + +func runWatch(cmd *cobra.Command, args []string) error { + path := filepath.Join(client.ConfigDir(), outbound.MirrorFileName) + color := outbound.IsTerminal(os.Stdout) + + ctx, cancel := signal.NotifyContext(cmd.Context(), os.Interrupt, syscall.SIGTERM) + defer cancel() + + fmt.Fprintf(cmd.ErrOrStderr(), "Watching %s — Ctrl-C to stop.\n", path) + + lines := outbound.Tail(ctx, path, watchLines) + for raw := range lines { + entry, err := outbound.ParseLine(raw) + if err != nil { + // Best-effort: show the unparseable raw line so the user + // can still see something went through. + fmt.Fprintln(cmd.OutOrStdout(), string(raw)) + continue + } + fmt.Fprintln(cmd.OutOrStdout(), outbound.RenderSummary(entry, watchVerbose, color)) + } + + // Ctrl-C / SIGTERM is the normal way to leave watch; treat as a + // successful exit rather than an error so cobra doesn't print a + // "Error: context canceled" banner. 
+ return nil +} diff --git a/internal/client/api.go b/internal/client/api.go index 5f9a7ea..dd7abfa 100644 --- a/internal/client/api.go +++ b/internal/client/api.go @@ -10,11 +10,13 @@ import ( "net/http" "os" "os/exec" + "path/filepath" "runtime" "strings" "time" "github.com/promptconduit/cli/internal/envelope" + "github.com/promptconduit/cli/internal/outbound" ) // APIResponse represents a response from the API @@ -33,15 +35,20 @@ type Client struct { version string } -// NewClient creates a new API client +// NewClient creates a new API client. Every outbound HTTP request is +// mirrored to ~/.config/promptconduit/outbound.ndjson so users can run +// `promptconduit watch` to see what the CLI is uploading in real time. func NewClient(config *Config, version string) *Client { + mirror := outbound.New(filepath.Join(ConfigDir(), outbound.MirrorFileName), http.DefaultTransport) return &Client{ config: config, httpClient: &http.Client{ - Timeout: time.Duration(config.TimeoutSeconds) * time.Second, + Timeout: time.Duration(config.TimeoutSeconds) * time.Second, + Transport: mirror, }, longHttpClient: &http.Client{ - Timeout: 600 * time.Second, // 10 min for large transcript sync (chunked complete can be slow) + Timeout: 600 * time.Second, // 10 min for large transcript sync (chunked complete can be slow) + Transport: mirror, }, version: version, } diff --git a/internal/outbound/entry.go b/internal/outbound/entry.go new file mode 100644 index 0000000..74039b4 --- /dev/null +++ b/internal/outbound/entry.go @@ -0,0 +1,77 @@ +// Package outbound mirrors every HTTP request the CLI makes to a local +// ndjson file, so users can run `promptconduit watch` and see what their +// hooks are actually uploading to the platform. +// +// The mirror is a single http.RoundTripper that wraps http.DefaultTransport +// and is installed into every *http.Client constructed by +// internal/client.NewClient. 
Both the foreground command and the +// `hook --send-event` subprocess thus log into the same file. The file +// is owner-only-readable (0600), bodies are capped at 64KB, and the file +// rotates to a .1 backup when it crosses 50MB. +package outbound + +import ( + "encoding/json" + "net/http" + "time" +) + +// MirrorFileName is the basename of the on-disk mirror, written into +// client.ConfigDir(). Exposed so callers (the watch command, the HTTP +// client wiring) agree on a single source of truth. +const MirrorFileName = "outbound.ndjson" + +// Entry is one line in outbound.ndjson — one HTTP request + response. +// +// Field ordering matches the ndjson layout for easy `jq` use. Bodies are +// stored as strings (not raw JSON) so the mirror file remains +// well-formed even when the body bytes are themselves valid JSON +// containing embedded newlines. +type Entry struct { + TS time.Time `json:"ts"` + Method string `json:"method"` + URL string `json:"url"` + ReqHeaders map[string]string `json:"req_headers,omitempty"` + ReqContentType string `json:"req_content_type,omitempty"` + ReqBody string `json:"req_body,omitempty"` + ReqTruncated bool `json:"req_truncated,omitempty"` + ReqOriginalSize int `json:"req_original_size_bytes,omitempty"` + Status int `json:"status,omitempty"` + RespContentType string `json:"resp_content_type,omitempty"` + RespBody string `json:"resp_body,omitempty"` + RespTruncated bool `json:"resp_truncated,omitempty"` + LatencyMs int64 `json:"latency_ms"` + Error string `json:"error,omitempty"` +} + +// MarshalLine produces one ndjson line for the entry (no trailing newline). +// Use AppendLine to write to a file; this is split out so it can be tested +// without disk. +func (e Entry) MarshalLine() ([]byte, error) { + return json.Marshal(e) +} + +// headerMap returns a single-value header map keyed by canonical name. 
+// Multi-value headers are joined with ", "; this is fine for an +// observability surface and avoids the cost of nested slices in ndjson. +func headerMap(h http.Header) map[string]string { + if len(h) == 0 { + return nil + } + out := make(map[string]string, len(h)) + for k, v := range h { + if len(v) == 1 { + out[k] = v[0] + continue + } + joined := "" + for i, s := range v { + if i > 0 { + joined += ", " + } + joined += s + } + out[k] = joined + } + return out +} diff --git a/internal/outbound/inode_unix.go b/internal/outbound/inode_unix.go new file mode 100644 index 0000000..d43251f --- /dev/null +++ b/internal/outbound/inode_unix.go @@ -0,0 +1,23 @@ +//go:build !windows + +package outbound + +import ( + "os" + "syscall" +) + +// inodeOf returns the file's inode number on Unix. Used by Tail to +// detect rotation: when the inode changes under the same path, the +// follower reopens at offset 0. +func inodeOf(path string) (uint64, error) { + info, err := os.Stat(path) + if err != nil { + return 0, err + } + st, ok := info.Sys().(*syscall.Stat_t) + if !ok { + return 0, nil + } + return uint64(st.Ino), nil +} diff --git a/internal/outbound/inode_windows.go b/internal/outbound/inode_windows.go new file mode 100644 index 0000000..8ced12a --- /dev/null +++ b/internal/outbound/inode_windows.go @@ -0,0 +1,10 @@ +//go:build windows + +package outbound + +// inodeOf returns 0 on Windows — there is no portable inode equivalent +// in stdlib. Tail falls back to size-shrink detection for rotation, +// which is adequate for this observability surface. +func inodeOf(path string) (uint64, error) { + return 0, nil +} diff --git a/internal/outbound/locking_unix.go b/internal/outbound/locking_unix.go new file mode 100644 index 0000000..4519aaf --- /dev/null +++ b/internal/outbound/locking_unix.go @@ -0,0 +1,49 @@ +//go:build !windows + +package outbound + +import ( + "os" + "sync" + "syscall" +) + +// inProcessAppendMu serializes appendLine calls from a single process. 
+// Across processes we lean on flock for lines >4KB and on best-effort +// single-write(2) O_APPEND appends for smaller writes. +var inProcessAppendMu sync.Mutex + +// appendLine writes line + "\n" to path. Small single-write(2) O_APPEND +// appends are not interleaved by local filesystems in practice (POSIX's +// PIPE_BUF atomicity formally covers pipes, not regular files). For +// larger lines we take an exclusive flock (LOCK_EX) for the write. +// +// Within a single process, inProcessAppendMu prevents tearing if two +// goroutines append concurrently — important because the parent CLI +// can fire several requests in quick succession. +func appendLine(path string, line []byte) error { + inProcessAppendMu.Lock() + defer inProcessAppendMu.Unlock() + + f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o600) + if err != nil { + return err + } + defer f.Close() + + needFlock := len(line)+1 > 4096 + if needFlock { + if err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX); err != nil { + return err + } + // LOCK_UN is implicit on file close, but be explicit for clarity. + defer syscall.Flock(int(f.Fd()), syscall.LOCK_UN) + } + + // One Write call so the kernel sees the line+newline as a unit. + buf := make([]byte, len(line)+1) + copy(buf, line) + buf[len(line)] = '\n' + _, err = f.Write(buf) + return err +} diff --git a/internal/outbound/locking_windows.go b/internal/outbound/locking_windows.go new file mode 100644 index 0000000..693dd8c --- /dev/null +++ b/internal/outbound/locking_windows.go @@ -0,0 +1,32 @@ +//go:build windows + +package outbound + +import ( + "os" + "sync" +) + +// On Windows we don't have a portable cross-process advisory lock in the +// stdlib, so we serialize within a single process and accept best-effort +// behavior across processes. In practice the only concurrent writers are +// the foreground CLI process and the `hook --send-event` subprocess it +// spawns, which fire on different cadences, so collisions are very rare. 
+var inProcessAppendMu sync.Mutex + +func appendLine(path string, line []byte) error { + inProcessAppendMu.Lock() + defer inProcessAppendMu.Unlock() + + f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o600) + if err != nil { + return err + } + defer f.Close() + + buf := make([]byte, len(line)+1) + copy(buf, line) + buf[len(line)] = '\n' + _, err = f.Write(buf) + return err +} diff --git a/internal/outbound/mirror.go b/internal/outbound/mirror.go new file mode 100644 index 0000000..1adebb5 --- /dev/null +++ b/internal/outbound/mirror.go @@ -0,0 +1,200 @@ +package outbound + +import ( + "bytes" + "errors" + "io" + "net/http" + "os" + "path/filepath" + "time" +) + +// DefaultMaxBodyBytes is the per-direction body cap recorded in the mirror. +// 64KB keeps the file small enough to tail comfortably while still +// preserving the full envelope JSON for typical hook events (a few KB). +// Image-bearing UserPromptSubmit envelopes or large transcript chunks +// will exceed this and be recorded as truncated. +const DefaultMaxBodyBytes = 64 * 1024 // 64 KB + +// Mirror is an http.RoundTripper that records each request/response pair +// to an ndjson file before returning the response to the caller. It +// wraps a base transport — typically http.DefaultTransport. +// +// Mirror is safe for concurrent use. +type Mirror struct { + Path string // destination ndjson file + Base http.RoundTripper // wrapped transport + MaxBodyBytes int // per-direction body cap; 0 ⇒ DefaultMaxBodyBytes + RotateAt int64 // file-size rotation threshold; 0 ⇒ DefaultRotateAt + Now func() time.Time // injectable clock for tests; nil ⇒ time.Now +} + +// New constructs a Mirror writing to path. If base is nil, +// http.DefaultTransport is used. The parent directory is created with +// mode 0700 if missing; the file itself is created on first write with +// mode 0600. 
+func New(path string, base http.RoundTripper) *Mirror { + if base == nil { + base = http.DefaultTransport + } + // Best-effort: the parent dir is usually already created by + // SaveFileConfig, but be defensive in case the user has only ever + // run hooks before running anything else. + _ = os.MkdirAll(filepath.Dir(path), 0o700) + return &Mirror{ + Path: path, + Base: base, + MaxBodyBytes: DefaultMaxBodyBytes, + RotateAt: DefaultRotateAt, + Now: time.Now, + } +} + +// RoundTrip captures the request, forwards it through the base +// transport, captures the response, and appends a single ndjson line +// to the mirror file. Mirror write failures are intentionally +// swallowed: observability must never break the network path. +func (m *Mirror) RoundTrip(req *http.Request) (*http.Response, error) { + start := m.now() + + reqBody, reqOriginal, reqTrunc, err := drainAndReplace(req, m.maxBody()) + if err != nil { + // We failed to read the request body before sending. Forward + // to the base transport with whatever Go has left of the body + // (often nothing) and record the error. + resp, sendErr := m.Base.RoundTrip(req) + m.record(req, reqBody, reqOriginal, reqTrunc, resp, nil, false, 0, time.Since(start), errors.Join(err, sendErr)) + return resp, sendErr + } + + resp, sendErr := m.Base.RoundTrip(req) + + var respBody []byte + var respOriginal int + var respTrunc bool + if resp != nil { + respBody, respOriginal, respTrunc, _ = drainAndReplaceResponse(resp, m.maxBody()) + _ = respOriginal // recorded for completeness; not currently exposed + } + + m.record(req, reqBody, reqOriginal, reqTrunc, resp, respBody, respTrunc, 0, time.Since(start), sendErr) + return resp, sendErr +} + +func (m *Mirror) maxBody() int { + if m.MaxBodyBytes > 0 { + return m.MaxBodyBytes + } + return DefaultMaxBodyBytes +} + +func (m *Mirror) now() time.Time { + if m.Now != nil { + return m.Now() + } + return time.Now() +} + +// record builds an Entry and appends one ndjson line. 
Best-effort: +// errors are dropped so a failed write never corrupts the actual +// network call. We do attempt rotation first. +func (m *Mirror) record(req *http.Request, reqBody []byte, reqOriginal int, reqTrunc bool, resp *http.Response, respBody []byte, respTrunc bool, _ int, latency time.Duration, sendErr error) { + entry := Entry{ + TS: m.now().UTC(), + Method: req.Method, + URL: safeURL(req), + LatencyMs: latency.Milliseconds(), + } + + if reqHeaders := redactHeaders(req.Header); reqHeaders != nil { + entry.ReqHeaders = headerMap(reqHeaders) + entry.ReqContentType = req.Header.Get("Content-Type") + } + if len(reqBody) > 0 { + entry.ReqBody = string(reqBody) + entry.ReqTruncated = reqTrunc + if reqTrunc { + entry.ReqOriginalSize = reqOriginal + } + } + + if resp != nil { + entry.Status = resp.StatusCode + entry.RespContentType = resp.Header.Get("Content-Type") + if len(respBody) > 0 { + entry.RespBody = string(respBody) + entry.RespTruncated = respTrunc + } + } + if sendErr != nil { + entry.Error = sendErr.Error() + } + + rotateAt := m.RotateAt + if rotateAt == 0 { + rotateAt = DefaultRotateAt + } + _ = rotateIfNeeded(m.Path, rotateAt) + + line, err := entry.MarshalLine() + if err != nil { + return + } + _ = appendLine(m.Path, line) +} + +// drainAndReplace reads at most max bytes from req.Body for the mirror, +// then replaces req.Body with a new ReadCloser that yields the full +// original content (read part + remainder). Returns (bytes-for-mirror, +// original-size, truncated?, error). +// +// If the request has no body, returns (nil, 0, false, nil) and leaves +// req.Body untouched. 
+func drainAndReplace(req *http.Request, max int) ([]byte, int, bool, error) { + if req.Body == nil || req.Body == http.NoBody { + return nil, 0, false, nil + } + all, err := io.ReadAll(req.Body) + if err != nil { + return nil, 0, false, err + } + if cerr := req.Body.Close(); cerr != nil { + return nil, 0, false, cerr + } + // Replace the body so the inner transport sees the full bytes. + req.Body = io.NopCloser(bytes.NewReader(all)) + // Reset ContentLength so chunked/length headers stay consistent. + req.ContentLength = int64(len(all)) + clipped, truncated, original := truncateBody(all, max) + return clipped, original, truncated, nil +} + +// drainAndReplaceResponse is the response-side equivalent. The caller +// must still close the returned response body when done; we replace it +// with a NopCloser around a bytes.Reader containing the original +// content so downstream consumers see the same bytes. +func drainAndReplaceResponse(resp *http.Response, max int) ([]byte, int, bool, error) { + if resp.Body == nil || resp.Body == http.NoBody { + return nil, 0, false, nil + } + all, err := io.ReadAll(resp.Body) + if err != nil { + return nil, 0, false, err + } + if cerr := resp.Body.Close(); cerr != nil { + return nil, 0, false, cerr + } + resp.Body = io.NopCloser(bytes.NewReader(all)) + clipped, truncated, original := truncateBody(all, max) + return clipped, original, truncated, nil +} + +func safeURL(req *http.Request) string { + if req.URL == nil { + return "" + } + // Don't redact query strings here — endpoints don't put secrets in + // query strings today. If that changes, redact at this layer. 
+ return req.URL.String() +} diff --git a/internal/outbound/mirror_test.go b/internal/outbound/mirror_test.go new file mode 100644 index 0000000..c57d707 --- /dev/null +++ b/internal/outbound/mirror_test.go @@ -0,0 +1,296 @@ +package outbound + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "testing" + "time" +) + +func TestMirror_RoundTrip(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(200) + _, _ = w.Write([]byte(`{"ok":true}`)) + })) + t.Cleanup(srv.Close) + + dir := t.TempDir() + path := filepath.Join(dir, MirrorFileName) + m := New(path, http.DefaultTransport) + + client := &http.Client{Transport: m} + req, _ := http.NewRequest("POST", srv.URL+"/v1/events/raw", bytes.NewReader([]byte(`{"hook_event_name":"UserPromptSubmit"}`))) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer sk_live_supersecret") + resp, err := client.Do(req) + if err != nil { + t.Fatalf("Do: %v", err) + } + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + if string(body) != `{"ok":true}` { + t.Errorf("downstream caller saw wrong body: %q", body) + } + + // File should exist with one ndjson line. 
+ data, err := os.ReadFile(path) + if err != nil { + t.Fatalf("read mirror: %v", err) + } + if !strings.HasSuffix(string(data), "\n") { + t.Error("mirror line should end with newline") + } + var entry Entry + if err := json.Unmarshal(bytes.TrimSpace(data), &entry); err != nil { + t.Fatalf("ndjson parse: %v", err) + } + if entry.Method != "POST" { + t.Errorf("method = %q, want POST", entry.Method) + } + if !strings.HasSuffix(entry.URL, "/v1/events/raw") { + t.Errorf("url = %q, want suffix /v1/events/raw", entry.URL) + } + if entry.Status != 200 { + t.Errorf("status = %d, want 200", entry.Status) + } + if entry.ReqBody == "" { + t.Error("expected request body to be recorded") + } + if entry.ReqHeaders["Authorization"] != redactedValue { + t.Errorf("authorization not redacted: %q", entry.ReqHeaders["Authorization"]) + } + if strings.Contains(string(data), "sk_live_supersecret") { + t.Error("raw bearer token leaked to mirror file") + } +} + +func TestMirror_FilePermissions(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(204) + })) + t.Cleanup(srv.Close) + + dir := t.TempDir() + path := filepath.Join(dir, MirrorFileName) + m := New(path, http.DefaultTransport) + client := &http.Client{Transport: m} + + req, _ := http.NewRequest("GET", srv.URL, nil) + if _, err := client.Do(req); err != nil { + t.Fatalf("Do: %v", err) + } + + info, err := os.Stat(path) + if err != nil { + t.Fatalf("stat: %v", err) + } + // On Unix the file should be created with mode 0600. On Windows the + // concept doesn't apply the same way, so just assert "not world-rw". 
+ mode := info.Mode().Perm() + if mode&0o077 != 0 { + t.Errorf("file mode %o is world-readable; expected owner-only", mode) + } +} + +func TestMirror_BodyTruncation(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + })) + t.Cleanup(srv.Close) + + dir := t.TempDir() + path := filepath.Join(dir, MirrorFileName) + m := New(path, http.DefaultTransport) + m.MaxBodyBytes = 16 // tiny cap so we can hit truncation easily + + body := strings.Repeat("x", 100) + req, _ := http.NewRequest("POST", srv.URL, strings.NewReader(body)) + if _, err := (&http.Client{Transport: m}).Do(req); err != nil { + t.Fatalf("Do: %v", err) + } + + data, err := os.ReadFile(path) + if err != nil { + t.Fatalf("read: %v", err) + } + var entry Entry + if err := json.Unmarshal(bytes.TrimSpace(data), &entry); err != nil { + t.Fatalf("parse: %v", err) + } + if !entry.ReqTruncated { + t.Error("expected ReqTruncated=true") + } + if entry.ReqOriginalSize != 100 { + t.Errorf("ReqOriginalSize = %d, want 100", entry.ReqOriginalSize) + } + if len(entry.ReqBody) != 16 { + t.Errorf("ReqBody = %d bytes, want 16", len(entry.ReqBody)) + } +} + +func TestMirror_Rotates(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + })) + t.Cleanup(srv.Close) + + dir := t.TempDir() + path := filepath.Join(dir, MirrorFileName) + m := New(path, http.DefaultTransport) + m.RotateAt = 512 // tiny so we rotate quickly + + client := &http.Client{Transport: m} + for i := 0; i < 20; i++ { + body := fmt.Sprintf(`{"i":%d,"pad":"%s"}`, i, strings.Repeat("p", 100)) + req, _ := http.NewRequest("POST", srv.URL, strings.NewReader(body)) + if _, err := client.Do(req); err != nil { + t.Fatalf("Do: %v", err) + } + } + + if _, err := os.Stat(path + ".1"); err != nil { + t.Errorf("expected backup file at %s: %v", path+".1", err) + } +} + +func TestMirror_RequestBodyStillReadableDownstream(t 
*testing.T) { + // The mirror reads the request body to record it; we must replace + // it so the inner transport still sees the bytes. Tighten that + // invariant with an explicit test against an httptest.Server that + // echoes what it received. + var seen []byte + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + seen, _ = io.ReadAll(r.Body) + w.WriteHeader(200) + })) + t.Cleanup(srv.Close) + + dir := t.TempDir() + m := New(filepath.Join(dir, MirrorFileName), http.DefaultTransport) + client := &http.Client{Transport: m} + + want := `{"hook_event_name":"Stop","session_id":"abc"}` + req, _ := http.NewRequest("POST", srv.URL, strings.NewReader(want)) + if _, err := client.Do(req); err != nil { + t.Fatalf("Do: %v", err) + } + if string(seen) != want { + t.Errorf("server received %q, want %q", seen, want) + } +} + +func TestMirror_NetworkErrorRecorded(t *testing.T) { + // Point at a closed server so RoundTrip fails. Mirror should still + // record an entry with the error. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) + srv.Close() // immediately + + dir := t.TempDir() + path := filepath.Join(dir, MirrorFileName) + m := New(path, http.DefaultTransport) + req, _ := http.NewRequest("POST", srv.URL, strings.NewReader(`{}`)) + if _, err := (&http.Client{Transport: m}).Do(req); err == nil { + t.Fatal("expected request to a closed server to fail") + } + + data, err := os.ReadFile(path) + if err != nil { + t.Fatalf("read mirror: %v", err) + } + if !bytes.Contains(data, []byte(`"error":`)) { + t.Errorf("expected error field in entry; got %s", data) + } +} + +func TestTail_BackfillAndLive(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, MirrorFileName) + + // Seed with three lines. 
+ mustAppend := func(line string) { + t.Helper() + if err := appendLine(path, []byte(line)); err != nil { + t.Fatalf("appendLine: %v", err) + } + } + mustAppend(`{"i":1}`) + mustAppend(`{"i":2}`) + mustAppend(`{"i":3}`) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + ch := Tail(ctx, path, 2) + + // Expect the last two backfilled lines. + got1 := waitLine(t, ch) + got2 := waitLine(t, ch) + if string(got1) != `{"i":2}` || string(got2) != `{"i":3}` { + t.Errorf("backfill = %q, %q; want {i:2}, {i:3}", got1, got2) + } + + // Append a new line; it should arrive. + mustAppend(`{"i":4}`) + got3 := waitLine(t, ch) + if string(got3) != `{"i":4}` { + t.Errorf("live = %q; want {i:4}", got3) + } +} + +func TestTail_HandlesRotation(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, MirrorFileName) + mustAppend := func(line string) { + t.Helper() + if err := appendLine(path, []byte(line)); err != nil { + t.Fatalf("appendLine: %v", err) + } + } + + mustAppend(`{"pre":1}`) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + ch := Tail(ctx, path, 1) + + // Backfill: {"pre":1} + if got := waitLine(t, ch); string(got) != `{"pre":1}` { + t.Fatalf("backfill = %q", got) + } + + // Simulate rotation: rename to .1, then create a fresh file with + // new content. + if err := os.Rename(path, path+".1"); err != nil { + t.Fatalf("rotate rename: %v", err) + } + mustAppend(`{"post":1}`) + + // Tail should detect the rotation and yield the new line. 
	got := waitLine(t, ch)
	if string(got) != `{"post":1}` {
		t.Errorf("after rotate = %q; want {post:1}", got)
	}
}

// waitLine receives the next line from ch, failing the test if the
// channel closes or nothing arrives within 2 seconds.
func waitLine(t *testing.T, ch <-chan []byte) []byte {
	t.Helper()
	select {
	case l, ok := <-ch:
		if !ok {
			t.Fatal("tail channel closed unexpectedly")
		}
		return l
	case <-time.After(2 * time.Second):
		t.Fatal("timed out waiting for tail line")
	}
	return nil
}
diff --git a/internal/outbound/redact.go b/internal/outbound/redact.go
new file mode 100644
index 0000000..b5fe351
--- /dev/null
+++ b/internal/outbound/redact.go
@@ -0,0 +1,59 @@
package outbound

import (
	"net/http"
	"strings"
)

// redactedValue is what every auth-bearing header value becomes in the mirror.
const redactedValue = "***"

// alwaysRedact is the hard-coded list of header names whose values must
// never reach the mirror file. Matched case-insensitively against
// http.Header's canonical form.
var alwaysRedact = map[string]struct{}{
	"Authorization":       {},
	"Cookie":              {},
	"Set-Cookie":          {},
	"X-Api-Key":           {},
	"Proxy-Authorization": {},
}

// substrRedact catches anything whose name contains one of these tokens
// (case-insensitive). Belt-and-braces in case the platform grows new
// header names later.
var substrRedact = []string{"token", "secret", "key"}

// redactHeaders returns a copy of h with values that look like
// credentials replaced by "***". Multi-value headers collapse to a
// single redacted entry.
func redactHeaders(h http.Header) http.Header {
	if len(h) == 0 {
		return nil
	}
	out := make(http.Header, len(h))
	for name, values := range h {
		if shouldRedact(name) {
			out[name] = []string{redactedValue}
			continue
		}
		// Copy the slice so the caller can't mutate ours by surprise.
		dup := make([]string, len(values))
		copy(dup, values)
		out[name] = dup
	}
	return out
}

// shouldRedact reports whether a header name must be masked: either an
// exact hit in alwaysRedact after canonicalization, or a
// case-insensitive substring match against substrRedact.
func shouldRedact(name string) bool {
	if _, ok := alwaysRedact[http.CanonicalHeaderKey(name)]; ok {
		return true
	}
	lower := strings.ToLower(name)
	for _, needle := range substrRedact {
		if strings.Contains(lower, needle) {
			return true
		}
	}
	return false
}
diff --git a/internal/outbound/redact_test.go b/internal/outbound/redact_test.go
new file mode 100644
index 0000000..c4095c8
--- /dev/null
+++ b/internal/outbound/redact_test.go
@@ -0,0 +1,70 @@
package outbound

import (
	"net/http"
	"testing"
)

// TestRedactHeaders table-tests both redaction rules (exact name and
// substring) plus pass-through of benign headers.
func TestRedactHeaders(t *testing.T) {
	cases := []struct {
		name string
		in   http.Header
		want map[string]string
	}{
		{
			name: "authorization is redacted",
			in:   http.Header{"Authorization": []string{"Bearer sk_live_1234"}},
			want: map[string]string{"Authorization": redactedValue},
		},
		{
			name: "cookie is redacted",
			in:   http.Header{"Cookie": []string{"session=abc"}},
			want: map[string]string{"Cookie": redactedValue},
		},
		{
			name: "x-api-key is redacted (canonical form)",
			in:   http.Header{"X-Api-Key": []string{"super-secret"}},
			want: map[string]string{"X-Api-Key": redactedValue},
		},
		{
			name: "x-secret-token is redacted (substring rule)",
			in:   http.Header{"X-Secret-Token": []string{"nope"}},
			want: map[string]string{"X-Secret-Token": redactedValue},
		},
		{
			name: "content-type is preserved",
			in:   http.Header{"Content-Type": []string{"application/json"}},
			want: map[string]string{"Content-Type": "application/json"},
		},
		{
			name: "user-agent is preserved",
			in:   http.Header{"User-Agent": []string{"PromptConduit-CLI/0.5.0"}},
			want: map[string]string{"User-Agent": "PromptConduit-CLI/0.5.0"},
		},
	}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			out := redactHeaders(c.in)
			got := headerMap(out)
			for k, v := range c.want {
				if got[k] != v {
					t.Errorf("%s = %q, want %q", k, got[k], v)
				}
			}
		})
	}
}

// TestRedactHeaders_NilSafe: empty/nil input must yield nil, not an
// empty map, so callers can store the result directly in an Entry.
func TestRedactHeaders_NilSafe(t *testing.T) {
	if h := redactHeaders(nil); h != nil {
		t.Errorf("expected nil for empty input; got %v", h)
	}
}

// TestRedactHeaders_DoesNotMutateInput: redaction must operate on a
// copy; the live request headers still need their real values.
func TestRedactHeaders_DoesNotMutateInput(t *testing.T) {
	in := http.Header{"Authorization": []string{"Bearer real-token"}}
	_ = redactHeaders(in)
	if in.Get("Authorization") != "Bearer real-token" {
		t.Errorf("input header was mutated: %q", in.Get("Authorization"))
	}
}
diff --git a/internal/outbound/render.go b/internal/outbound/render.go
new file mode 100644
index 0000000..fa03d68
--- /dev/null
+++ b/internal/outbound/render.go
@@ -0,0 +1,156 @@
package outbound

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/url"
	"os"
	"strings"
)

// RenderSummary formats one mirror entry for the `watch` command.
// When verbose is true, the request body (and response body, if any)
// follow the summary line, pretty-printed and indented. When color is
// true the HTTP status is ANSI-colored — green 2xx, yellow 3xx,
// red 4xx/5xx or transport error (see statusColor; the color test
// pins 404 to red, so client errors are NOT yellow).
+func RenderSummary(e Entry, verbose, color bool) string { + var b strings.Builder + + b.WriteString(e.TS.Local().Format("15:04:05")) + b.WriteString(" ") + b.WriteString(fmt.Sprintf("%-5s", e.Method)) + b.WriteString(" ") + b.WriteString(pathOnly(e.URL)) + b.WriteString(" ") + b.WriteString(humanBytes(len(e.ReqBody))) + if e.ReqTruncated { + b.WriteString("+") + } + + if e.Error != "" { + b.WriteString(" → ") + b.WriteString(colorize("ERR", colorRed, color)) + b.WriteString(" (") + b.WriteString(fmt.Sprintf("%dms", e.LatencyMs)) + b.WriteString(") ") + b.WriteString(e.Error) + } else { + b.WriteString(" → ") + b.WriteString(colorize(fmt.Sprintf("%d", e.Status), statusColor(e.Status), color)) + b.WriteString(" (") + b.WriteString(fmt.Sprintf("%dms", e.LatencyMs)) + b.WriteString(")") + } + + if verbose { + if e.ReqBody != "" { + b.WriteString("\n") + b.WriteString(indent(" ", pretty(e.ReqBody))) + } + if e.RespBody != "" { + b.WriteString("\n -- response --\n") + b.WriteString(indent(" ", pretty(e.RespBody))) + } + } + + return b.String() +} + +// ParseLine deserializes one ndjson line into an Entry. +func ParseLine(line []byte) (Entry, error) { + var e Entry + err := json.Unmarshal(line, &e) + return e, err +} + +// IsTerminal reports whether f is connected to a character device, +// i.e. a real terminal rather than a pipe or file redirect. Used to +// decide whether to emit ANSI color codes. +func IsTerminal(f *os.File) bool { + if f == nil { + return false + } + fi, err := f.Stat() + if err != nil { + return false + } + return (fi.Mode() & os.ModeCharDevice) != 0 +} + +// pathOnly strips scheme/host from a URL for the summary line; full URL +// is still in the underlying entry for users who want it. +func pathOnly(s string) string { + u, err := url.Parse(s) + if err != nil { + return s + } + if u.RawQuery != "" { + return u.Path + "?" 
+ u.RawQuery + } + return u.Path +} + +func humanBytes(n int) string { + switch { + case n < 1024: + return fmt.Sprintf("%dB", n) + case n < 1024*1024: + return fmt.Sprintf("%.1fKB", float64(n)/1024) + default: + return fmt.Sprintf("%.1fMB", float64(n)/(1024*1024)) + } +} + +// pretty re-indents body if it parses as JSON; otherwise returns it +// unchanged. We don't want a malformed body to drop the row entirely. +func pretty(s string) string { + var v interface{} + if err := json.Unmarshal([]byte(s), &v); err != nil { + return s + } + out, err := json.MarshalIndent(v, "", " ") + if err != nil { + return s + } + return string(out) +} + +func indent(prefix, s string) string { + if s == "" { + return s + } + lines := bytes.Split([]byte(s), []byte{'\n'}) + for i, l := range lines { + lines[i] = append([]byte(prefix), l...) + } + return string(bytes.Join(lines, []byte{'\n'})) +} + +const ( + colorReset = "\x1b[0m" + colorRed = "\x1b[31m" + colorYellow = "\x1b[33m" + colorGreen = "\x1b[32m" +) + +func statusColor(status int) string { + switch { + case status >= 500 || status == 0: + return colorRed + case status >= 400: + return colorRed + case status >= 300: + return colorYellow + case status >= 200: + return colorGreen + } + return "" +} + +func colorize(s, code string, enabled bool) string { + if !enabled || code == "" { + return s + } + return code + s + colorReset +} diff --git a/internal/outbound/render_test.go b/internal/outbound/render_test.go new file mode 100644 index 0000000..d7e1cc5 --- /dev/null +++ b/internal/outbound/render_test.go @@ -0,0 +1,110 @@ +package outbound + +import ( + "strings" + "testing" + "time" +) + +func TestRenderSummary_Default(t *testing.T) { + e := Entry{ + TS: time.Date(2026, 5, 11, 15, 30, 42, 0, time.UTC), + Method: "POST", + URL: "https://api.example.com/v1/events/raw", + ReqBody: strings.Repeat("x", 3200), + Status: 200, + LatencyMs: 87, + } + got := RenderSummary(e, false, false) + // Time is rendered in local TZ; just match 
the rest. + for _, want := range []string{"POST", "/v1/events/raw", "3.1KB", "200", "(87ms)"} { + if !strings.Contains(got, want) { + t.Errorf("missing %q in summary: %q", want, got) + } + } + // No ANSI color codes when disabled. + if strings.Contains(got, "\x1b[") { + t.Errorf("color leaked into summary: %q", got) + } +} + +func TestRenderSummary_VerboseIncludesBody(t *testing.T) { + e := Entry{ + TS: time.Now(), + Method: "POST", + URL: "https://example/v1/events/raw", + ReqBody: `{"foo":"bar"}`, + Status: 200, + LatencyMs: 1, + } + got := RenderSummary(e, true, false) + if !strings.Contains(got, `"foo"`) { + t.Errorf("verbose summary missing body: %q", got) + } + // Pretty-printed JSON should be indented with two spaces. + if !strings.Contains(got, ` "foo": "bar"`) { + t.Errorf("expected pretty-printed JSON; got %q", got) + } +} + +func TestRenderSummary_ColorOnStatus(t *testing.T) { + cases := []struct { + status int + want string + }{ + {200, colorGreen}, + {301, colorYellow}, + {404, colorRed}, + {500, colorRed}, + } + for _, c := range cases { + e := Entry{TS: time.Now(), Method: "GET", URL: "/x", Status: c.status} + got := RenderSummary(e, false, true) + if !strings.Contains(got, c.want) { + t.Errorf("status %d: missing color %q in %q", c.status, c.want, got) + } + } +} + +func TestRenderSummary_NetworkError(t *testing.T) { + e := Entry{ + TS: time.Now(), + Method: "POST", + URL: "https://nope", + LatencyMs: 5000, + Error: "dial tcp: connect: connection refused", + } + got := RenderSummary(e, false, false) + if !strings.Contains(got, "ERR") { + t.Errorf("expected ERR marker for transport failure; got %q", got) + } + if !strings.Contains(got, "connection refused") { + t.Errorf("expected error message preserved; got %q", got) + } +} + +func TestTruncateBody(t *testing.T) { + body, trunc, orig := truncateBody([]byte("hello"), 100) + if string(body) != "hello" || trunc || orig != 5 { + t.Errorf("short body should pass through: %q %v %d", body, trunc, orig) + } 
	body, trunc, orig = truncateBody([]byte("hello world"), 5)
	if string(body) != "hello" || !trunc || orig != 11 {
		t.Errorf("long body should truncate: %q %v %d", body, trunc, orig)
	}
	body, trunc, orig = truncateBody([]byte("any"), 0)
	if string(body) != "any" || trunc || orig != 3 {
		t.Errorf("max=0 should disable truncation: %q %v %d", body, trunc, orig)
	}
}

// TestParseLine: a hand-written ndjson line round-trips into the
// expected Entry fields.
func TestParseLine(t *testing.T) {
	line := []byte(`{"ts":"2026-05-11T15:30:42Z","method":"POST","url":"/x","status":204,"latency_ms":7}`)
	e, err := ParseLine(line)
	if err != nil {
		t.Fatalf("ParseLine: %v", err)
	}
	if e.Method != "POST" || e.Status != 204 || e.LatencyMs != 7 {
		t.Errorf("parsed wrong fields: %+v", e)
	}
}
diff --git a/internal/outbound/rotate.go b/internal/outbound/rotate.go
new file mode 100644
index 0000000..ca1ee5f
--- /dev/null
+++ b/internal/outbound/rotate.go
@@ -0,0 +1,50 @@
package outbound

import (
	"errors"
	"os"
)

// DefaultRotateAt is the file-size threshold (in bytes) at which we
// rotate outbound.ndjson to outbound.ndjson.1 and start fresh. One
// backup is kept; older content is overwritten.
const DefaultRotateAt int64 = 50 * 1024 * 1024 // 50 MB

// truncateBody clips b to at most max bytes. A max <= 0 disables
// clipping entirely (the body passes through whole). Returns the
// (possibly shorter) bytes, a flag indicating whether truncation
// happened, and the original length so the consumer can show
// "[truncated, was 10MB]".
//
// NOTE: the truncated slice aliases b's backing array; callers that
// retain it long-term keep all of b alive.
//
// Pure function, kept out of mirror.go so it is easy to test without
// any disk or HTTP plumbing.
func truncateBody(b []byte, max int) ([]byte, bool, int) {
	if max <= 0 || len(b) <= max {
		return b, false, len(b)
	}
	return b[:max], true, len(b)
}

// rotateIfNeeded renames path -> path+".1" when the file is at or above
// max bytes. Missing files and missing prior backups are not errors.
// Errors stat'ing or renaming are returned to the caller; callers
// typically log-and-continue, because rotation failure should not
// silently drop traffic.
+func rotateIfNeeded(path string, max int64) error { + if max <= 0 { + return nil + } + info, err := os.Stat(path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return nil + } + return err + } + if info.Size() < max { + return nil + } + backup := path + ".1" + if err := os.Remove(backup); err != nil && !errors.Is(err, os.ErrNotExist) { + return err + } + return os.Rename(path, backup) +} diff --git a/internal/outbound/tail.go b/internal/outbound/tail.go new file mode 100644 index 0000000..0fbb7a7 --- /dev/null +++ b/internal/outbound/tail.go @@ -0,0 +1,201 @@ +package outbound + +import ( + "bufio" + "bytes" + "context" + "errors" + "io" + "os" + "time" +) + +// TailPollInterval is how often Tail re-checks the file for new bytes +// and possible rotation. Tuned for human eyes — 200ms feels live +// without burning CPU. +const TailPollInterval = 200 * time.Millisecond + +// Tail follows path the way `tail -f` does. It first delivers up to +// backfill lines from the end of the file (0 means none — start at the +// current end), then streams new lines as they appear. Detects file +// rotation (inode change on Unix, size shrink everywhere) and reopens +// the file at offset 0 when it happens. +// +// The returned channel is closed when ctx is canceled or when an +// unrecoverable I/O error occurs. +func Tail(ctx context.Context, path string, backfill int) <-chan []byte { + out := make(chan []byte, 64) + go func() { + defer close(out) + + // Wait for the file to appear if it doesn't yet exist. The + // mirror creates it lazily on first write, so a `watch` + // invocation before any traffic has flowed is normal. + f, err := waitForFile(ctx, path) + if err != nil { + return + } + defer f.Close() + + offset := int64(0) + if backfill > 0 { + off, lines, err := readLastLines(f, backfill) + if err == nil { + for _, l := range lines { + if !sendLine(ctx, out, l) { + return + } + } + offset = off + } + } else { + // Start at end of file. 
+ end, err := f.Seek(0, io.SeekEnd) + if err == nil { + offset = end + } + } + + lastInode, _ := inodeOf(path) + lastSize := offset + + // Polling loop. + reader := bufio.NewReader(f) + ticker := time.NewTicker(TailPollInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + + info, err := os.Stat(path) + if err != nil { + // File may have been rotated and not yet recreated; + // keep polling, don't bail. + continue + } + + currentInode, _ := inodeOf(path) + rotated := (lastInode != 0 && currentInode != 0 && currentInode != lastInode) || info.Size() < lastSize + if rotated { + // Reopen. + _ = f.Close() + f, err = os.Open(path) + if err != nil { + return + } + reader = bufio.NewReader(f) + lastInode = currentInode + lastSize = 0 + offset = 0 + } + + if info.Size() == offset { + continue + } + + // Stream any new bytes line by line. + for { + line, err := reader.ReadBytes('\n') + if len(line) > 0 { + // Drop the trailing newline before sending. + if line[len(line)-1] == '\n' { + line = line[:len(line)-1] + } + if !sendLine(ctx, out, append([]byte(nil), line...)) { + return + } + } + if err != nil { + if errors.Is(err, io.EOF) { + break + } + return + } + } + offset, _ = f.Seek(0, io.SeekCurrent) + lastSize = info.Size() + } + }() + return out +} + +func sendLine(ctx context.Context, out chan<- []byte, line []byte) bool { + select { + case <-ctx.Done(): + return false + case out <- line: + return true + } +} + +// waitForFile blocks until path exists and is openable, or ctx ends. 
+func waitForFile(ctx context.Context, path string) (*os.File, error) { + for { + f, err := os.Open(path) + if err == nil { + return f, nil + } + if !errors.Is(err, os.ErrNotExist) { + return nil, err + } + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(TailPollInterval): + } + } +} + +// readLastLines reads up to n trailing lines from f and returns them in +// order, along with the file offset they end at (which becomes the +// streaming offset for the polling loop). +func readLastLines(f *os.File, n int) (int64, [][]byte, error) { + info, err := f.Stat() + if err != nil { + return 0, nil, err + } + size := info.Size() + if size == 0 { + return 0, nil, nil + } + + const chunkSize = int64(4096) + var accumulated []byte + pos := size + for pos > 0 && bytes.Count(accumulated, []byte{'\n'}) <= n { + readSize := chunkSize + if pos < readSize { + readSize = pos + } + pos -= readSize + buf := make([]byte, readSize) + if _, err := f.ReadAt(buf, pos); err != nil { + return 0, nil, err + } + accumulated = append(buf, accumulated...) + } + + // Split into lines and take the last n. + all := bytes.Split(accumulated, []byte{'\n'}) + // If the file ends with a newline the last element is empty — drop it. + if len(all) > 0 && len(all[len(all)-1]) == 0 { + all = all[:len(all)-1] + } + if len(all) > n { + all = all[len(all)-n:] + } + + // Position the file at end so the streaming loop picks up from there. + end, _ := f.Seek(0, io.SeekEnd) + + // Copy to avoid aliasing into accumulated. + out := make([][]byte, len(all)) + for i, l := range all { + out[i] = append([]byte(nil), l...) + } + return end, out, nil +}