From b95af807e518d46c943deec53ce754a863c7cf10 Mon Sep 17 00:00:00 2001 From: CMGS Date: Sat, 23 May 2026 02:20:16 +0800 Subject: [PATCH 1/4] refactor(metering): split into subpackages (file/stderr/capture) + pluggable backend - metering/{file,stderr,capture}/ subpackages mirror hypervisor/snapshot pattern: interface lives in main metering package, backends are subpackage Recorder types exposing a New ctor. - config.MeteringConfig selects the backend ("file" default, "nop", "stderr"); empty config keeps v0.4.x behavior. - cmd/core/metering.go switches over conf.Metering.Backend instead of hard-coding NewFileRecorder. - stderr/Recorder is a new debug backend; capture/Recorder keeps the Entries/Reset test helpers (package doc flags it as testing-only). - file/Recorder.New no longer takes ctx (unused) and surfaces open errors so the cmd/core factory owns the NopRecorder fallback. --- cmd/core/metering.go | 43 +++++++++++++++++---- cmd/core/metering_test.go | 52 +++++++++++++++++++++++++ config/config.go | 2 + config/metering.go | 12 ++++++ hypervisor/state_test.go | 5 ++- metering/capture.go | 34 ----------------- metering/capture/capture.go | 39 +++++++++++++++++++ metering/capture/capture_test.go | 57 ++++++++++++++++++++++++++++ metering/capture_test.go | 47 ----------------------- metering/file.go | 41 -------------------- metering/file/file.go | 44 +++++++++++++++++++++ metering/{ => file}/file_test.go | 51 ++++++++++++++----------- metering/metering.go | 2 +- metering/stderr/stderr.go | 33 ++++++++++++++++ metering/stderr/stderr_test.go | 47 +++++++++++++++++++++++ snapshot/localfile/gc_test.go | 5 ++- snapshot/localfile/localfile_test.go | 7 ++-- 17 files changed, 361 insertions(+), 160 deletions(-) create mode 100644 cmd/core/metering_test.go create mode 100644 config/metering.go delete mode 100644 metering/capture.go create mode 100644 metering/capture/capture.go create mode 100644 metering/capture/capture_test.go delete mode 100644 metering/capture_test.go delete mode 100644 metering/file.go create mode 100644 metering/file/file.go rename metering/{ => file}/file_test.go (53%) create mode 100644 metering/stderr/stderr.go create mode 100644 metering/stderr/stderr_test.go diff --git a/cmd/core/metering.go b/cmd/core/metering.go index fbe108c6..4e25579c 100644 --- a/cmd/core/metering.go +++ b/cmd/core/metering.go @@ -10,6 +10,8 @@ import ( "github.com/cocoonstack/cocoon/config" "github.com/cocoonstack/cocoon/metering" + meteringfile "github.com/cocoonstack/cocoon/metering/file" + meteringstderr "github.com/cocoonstack/cocoon/metering/stderr" ) const ( @@ -22,16 +24,41 @@ var ( meteringRec metering.Recorder ) -// MeteringRecorder returns a process-wide lifecycle recorder; lazy-init shares one ledger fd across all backends, falls back to NopRecorder on fs error. +// MeteringRecorder returns the process-wide recorder per conf.Metering.Backend; lazy-init, falls back to NopRecorder on init failure. func MeteringRecorder(ctx context.Context, conf *config.Config) metering.Recorder { meteringOnce.Do(func() { - dir := filepath.Join(conf.RootDir, meteringSubdir) - if err := os.MkdirAll(dir, 0o750); err != nil { - log.WithFunc("core.MeteringRecorder").Warnf(ctx, "mkdir %s: %v; metering disabled", dir, err) - meteringRec = metering.NopRecorder{} - return - } - meteringRec = metering.NewFileRecorder(ctx, filepath.Join(dir, meteringFile)) + meteringRec = buildRecorder(ctx, conf) }) return meteringRec } + +func buildRecorder(ctx context.Context, conf *config.Config) metering.Recorder { + switch conf.Metering.Backend { + case "", "file": + return buildFileRecorder(ctx, conf) + case "nop": + return metering.NopRecorder{} + case "stderr": + return meteringstderr.New() + default: + log.WithFunc("core.MeteringRecorder").Warnf(ctx, "unknown metering backend %q; using nop", conf.Metering.Backend) + return metering.NopRecorder{} + } +} + +func buildFileRecorder(ctx context.Context, conf *config.Config) metering.Recorder { + path := conf.Metering.File.Path + if path == "" { + path = filepath.Join(conf.RootDir, meteringSubdir, meteringFile) + } + if err := os.MkdirAll(filepath.Dir(path), 0o750); err != nil { + log.WithFunc("core.buildFileRecorder").Warnf(ctx, "mkdir %s: %v; metering disabled", filepath.Dir(path), err) + return metering.NopRecorder{} + } + r, err := meteringfile.New(path) + if err != nil { + log.WithFunc("core.buildFileRecorder").Warnf(ctx, "open ledger: %v; metering disabled", err) + return metering.NopRecorder{} + } + return r +} diff --git a/cmd/core/metering_test.go b/cmd/core/metering_test.go new file mode 100644 index 00000000..13c9007f --- /dev/null +++ b/cmd/core/metering_test.go @@ -0,0 +1,52 @@ +package core + +import ( + "fmt" + "testing" + + "github.com/cocoonstack/cocoon/config" + "github.com/cocoonstack/cocoon/metering" + meteringfile "github.com/cocoonstack/cocoon/metering/file" + meteringstderr "github.com/cocoonstack/cocoon/metering/stderr" +) + +func TestBuildRecorderBackends(t *testing.T) { + tests := []struct { + name string + backend string + want metering.Recorder + }{ + {"empty defaults to file", "", (*meteringfile.Recorder)(nil)}, + {"explicit file", "file", (*meteringfile.Recorder)(nil)}, + {"nop", "nop", metering.NopRecorder{}}, + {"stderr", "stderr", (*meteringstderr.Recorder)(nil)}, + {"unknown falls back to nop", "kafka", metering.NopRecorder{}}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + conf := &config.Config{ + RootDir: t.TempDir(), + Metering: config.MeteringConfig{Backend: tt.backend}, + } + got := buildRecorder(t.Context(), conf) + if gotT, wantT := fmt.Sprintf("%T", got), fmt.Sprintf("%T", tt.want); gotT != wantT { + t.Errorf("backend=%q got %s, want %s", tt.backend, gotT, wantT) + } + }) + } +} + +func TestBuildFileRecorderCustomPath(t *testing.T) { + dir := t.TempDir() + conf := &config.Config{ + RootDir: dir, + Metering: config.MeteringConfig{ + Backend: "file", + File: config.FileMeteringConfig{Path: dir + "/custom/ledger.jsonl"}, + }, + } + r := buildRecorder(t.Context(), conf) + if _, ok := r.(*meteringfile.Recorder); !ok { + t.Fatalf("got %T, want *meteringfile.Recorder", r) + } +} diff --git a/config/config.go b/config/config.go index 079085ff..b2416b2e 100644 --- a/config/config.go +++ b/config/config.go @@ -57,6 +57,8 @@ type Config struct { TerminateGracePeriodSeconds int `json:"terminate_grace_period_seconds" mapstructure:"terminate_grace_period_seconds"` // Log configuration, uses eru core's ServerLogConfig. Log *coretypes.ServerLogConfig `json:"log" mapstructure:"log"` + // Metering selects the lifecycle-event recorder backend; zero value keeps v0.4.x file behavior. + Metering MeteringConfig `json:"metering,omitzero" mapstructure:"metering"` } // Hypervisor returns the selected hypervisor backend type. diff --git a/config/metering.go b/config/metering.go new file mode 100644 index 00000000..b6caf476 --- /dev/null +++ b/config/metering.go @@ -0,0 +1,12 @@ +package config + +// MeteringConfig selects the recorder backend; empty Backend defaults to "file" so v0.4.x configs keep working. +type MeteringConfig struct { + Backend string `json:"backend,omitempty" mapstructure:"backend"` + File FileMeteringConfig `json:"file,omitzero" mapstructure:"file"` +} + +// FileMeteringConfig parameterizes the file-backend recorder; empty Path resolves to /metering/ledger.jsonl. +type FileMeteringConfig struct { + Path string `json:"path,omitempty" mapstructure:"path"` +} diff --git a/hypervisor/state_test.go b/hypervisor/state_test.go index d358bc0d..c8b5dbc2 100644 --- a/hypervisor/state_test.go +++ b/hypervisor/state_test.go @@ -9,6 +9,7 @@ import ( "github.com/cocoonstack/cocoon/lock/flock" "github.com/cocoonstack/cocoon/metering" + meteringcapture "github.com/cocoonstack/cocoon/metering/capture" storejson "github.com/cocoonstack/cocoon/storage/json" "github.com/cocoonstack/cocoon/types" ) @@ -41,12 +42,12 @@ func (stubBackendConfig) LogDir() string { panic("LogDir: n func (stubBackendConfig) VMRunDir(string) string { panic("VMRunDir: not implemented in stub") } func (stubBackendConfig) VMLogDir(string) string { panic("VMLogDir: not implemented in stub") } -func newMeteringTestBackend(t *testing.T) (*Backend, *metering.CaptureRecorder) { +func newMeteringTestBackend(t *testing.T) (*Backend, *meteringcapture.Recorder) { t.Helper() dir := t.TempDir() locker := flock.New(filepath.Join(dir, "index.lock")) store := storejson.New[VMIndex](filepath.Join(dir, "index.json"), locker) - rec := &metering.CaptureRecorder{} + rec := meteringcapture.New() return &Backend{ Typ: "test-hv", Conf: stubBackendConfig{}, diff --git a/metering/capture.go b/metering/capture.go deleted file mode 100644 index 0a06b614..00000000 --- a/metering/capture.go +++ /dev/null @@ -1,34 +0,0 @@ -package metering - -import ( - "context" - "sync" -) - -// CaptureRecorder accumulates entries in memory; intended for tests that assert emit sequences. -type CaptureRecorder struct { - mu sync.Mutex - entries []Entry -} - -func (r *CaptureRecorder) Emit(_ context.Context, e Entry) { - r.mu.Lock() - defer r.mu.Unlock() - r.entries = append(r.entries, e) -} - -// Entries returns a snapshot copy so callers can mutate freely. -func (r *CaptureRecorder) Entries() []Entry { - r.mu.Lock() - defer r.mu.Unlock() - out := make([]Entry, len(r.entries)) - copy(out, r.entries) - return out -} - -// Reset drops buffered entries. -func (r *CaptureRecorder) Reset() { - r.mu.Lock() - defer r.mu.Unlock() - r.entries = nil -} diff --git a/metering/capture/capture.go b/metering/capture/capture.go new file mode 100644 index 00000000..12d278ae --- /dev/null +++ b/metering/capture/capture.go @@ -0,0 +1,39 @@ +// Package capture is a metering recorder for tests; production code should not import it. Entries and Reset are testing-only helpers exposed on top of the Recorder contract. +package capture + +import ( + "context" + "sync" + + "github.com/cocoonstack/cocoon/metering" +) + +var _ metering.Recorder = (*Recorder)(nil) + +type Recorder struct { + mu sync.Mutex + entries []metering.Entry +} + +func New() *Recorder { return &Recorder{} } + +func (r *Recorder) Emit(_ context.Context, e metering.Entry) { + r.mu.Lock() + defer r.mu.Unlock() + r.entries = append(r.entries, e) +} + +// Entries returns a snapshot copy; callers may mutate the returned slice. +func (r *Recorder) Entries() []metering.Entry { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]metering.Entry, len(r.entries)) + copy(out, r.entries) + return out +} + +func (r *Recorder) Reset() { + r.mu.Lock() + defer r.mu.Unlock() + r.entries = nil +} diff --git a/metering/capture/capture_test.go b/metering/capture/capture_test.go new file mode 100644 index 00000000..ab69508a --- /dev/null +++ b/metering/capture/capture_test.go @@ -0,0 +1,57 @@ +package capture + +import ( + "sync" + "testing" + + "github.com/cocoonstack/cocoon/metering" +) + +func TestRecorderBasic(t *testing.T) { + r := New() + ctx := t.Context() + r.Emit(ctx, metering.Entry{Kind: metering.KindVMComputeStart, VMID: "a"}) + r.Emit(ctx, metering.Entry{Kind: metering.KindVMComputeStop, VMID: "a"}) + got := r.Entries() + if len(got) != 2 { + t.Fatalf("got %d entries, want 2", len(got)) + } + if got[0].Kind != metering.KindVMComputeStart || got[1].Kind != metering.KindVMComputeStop { + t.Errorf("got kinds %v %v", got[0].Kind, got[1].Kind) + } +} + +func TestRecorderEntriesIsCopy(t *testing.T) { + r := New() + r.Emit(t.Context(), metering.Entry{VMID: "a"}) + got := r.Entries() + got[0].VMID = "tampered" + if again := r.Entries(); again[0].VMID != "a" { + t.Errorf("Entries() must return a copy; got %q after mutation", again[0].VMID) + } +} + +func TestRecorderReset(t *testing.T) { + r := New() + r.Emit(t.Context(), metering.Entry{VMID: "a"}) + r.Reset() + if got := r.Entries(); len(got) != 0 { + t.Errorf("after Reset got %d entries, want 0", len(got)) + } +} + +func TestRecorderConcurrent(t *testing.T) { + r := New() + ctx := t.Context() + const N = 200 + var wg sync.WaitGroup + for range N { + wg.Go(func() { + r.Emit(ctx, metering.Entry{Kind: metering.KindVMComputeStart, VMID: "vm"}) + }) + } + wg.Wait() + if got := len(r.Entries()); got != N { + t.Errorf("got %d entries, want %d", got, N) + } +} diff --git a/metering/capture_test.go b/metering/capture_test.go deleted file mode 100644 index 489f1af1..00000000 --- a/metering/capture_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package metering - -import ( - "sync" - "testing" -) - -func TestCaptureRecorderBasic(t *testing.T) { - var r CaptureRecorder - ctx := t.Context() - r.Emit(ctx, Entry{Kind: KindVMComputeStart, VMID: "a"}) - r.Emit(ctx, Entry{Kind: KindVMComputeStop, VMID: "a"}) - got := r.Entries() - if len(got) != 2 { - t.Fatalf("got %d entries, want 2", len(got)) - } - if got[0].Kind != KindVMComputeStart || got[1].Kind != KindVMComputeStop { - t.Errorf("got kinds %v %v", got[0].Kind, got[1].Kind) - } -} - -func TestCaptureRecorderEntriesIsCopy(t *testing.T) { - // Mutating the returned slice must not affect subsequent reads. - var r CaptureRecorder - r.Emit(t.Context(), Entry{VMID: "a"}) - got := r.Entries() - got[0].VMID = "tampered" - if again := r.Entries(); again[0].VMID != "a" { - t.Errorf("Entries() must return a copy; got %q after mutation", again[0].VMID) - } -} - -func TestCaptureRecorderConcurrent(t *testing.T) { - var r CaptureRecorder - ctx := t.Context() - const N = 200 - var wg sync.WaitGroup - for range N { - wg.Go(func() { - r.Emit(ctx, Entry{Kind: KindVMComputeStart, VMID: "vm"}) - }) - } - wg.Wait() - if got := len(r.Entries()); got != N { - t.Errorf("got %d entries, want %d", got, N) - } -} diff --git a/metering/file.go b/metering/file.go deleted file mode 100644 index 028cf3e9..00000000 --- a/metering/file.go +++ /dev/null @@ -1,41 +0,0 @@ -package metering - -import ( - "context" - "encoding/json" - "os" - "sync" - - "github.com/projecteru2/core/log" -) - -// FileRecorder appends JSON-encoded entries one per line under sync.Mutex; cross-process atomicity comes from O_APPEND. -type FileRecorder struct { - mu sync.Mutex - f *os.File -} - -// NewFileRecorder opens path append-only; returns NopRecorder on failure. -func NewFileRecorder(ctx context.Context, path string) Recorder { - f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600) //nolint:gosec // internal runtime path - if err != nil { - log.WithFunc("metering.NewFileRecorder").Warnf(ctx, "open %s: %v; metering disabled", path, err) - return NopRecorder{} - } - return &FileRecorder{f: f} -} - -// Emit logs and swallows write errors to never block callers. -func (r *FileRecorder) Emit(ctx context.Context, e Entry) { - data, err := json.Marshal(e) - if err != nil { - log.WithFunc("metering.FileRecorder.Emit").Warnf(ctx, "marshal entry: %v", err) - return - } - data = append(data, '\n') - r.mu.Lock() - defer r.mu.Unlock() - if _, err := r.f.Write(data); err != nil { - log.WithFunc("metering.FileRecorder.Emit").Warnf(ctx, "write entry: %v", err) - } -} diff --git a/metering/file/file.go b/metering/file/file.go new file mode 100644 index 00000000..dcc8aa42 --- /dev/null +++ b/metering/file/file.go @@ -0,0 +1,44 @@ +package file + +import ( + "context" + "encoding/json" + "fmt" + "os" + "sync" + + "github.com/projecteru2/core/log" + + "github.com/cocoonstack/cocoon/metering" +) + +var _ metering.Recorder = (*Recorder)(nil) + +// Recorder appends JSON-encoded entries one per line under sync.Mutex; cross-process atomicity comes from O_APPEND. +type Recorder struct { + mu sync.Mutex + f *os.File +} + +func New(path string) (*Recorder, error) { + f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600) //nolint:gosec + if err != nil { + return nil, fmt.Errorf("open ledger %s: %w", path, err) + } + return &Recorder{f: f}, nil +} + +// Emit logs and swallows write errors to never block callers. +func (r *Recorder) Emit(ctx context.Context, e metering.Entry) { + data, err := json.Marshal(e) + if err != nil { + log.WithFunc("metering/file.Recorder.Emit").Warnf(ctx, "marshal entry: %v", err) + return + } + data = append(data, '\n') + r.mu.Lock() + defer r.mu.Unlock() + if _, err := r.f.Write(data); err != nil { + log.WithFunc("metering/file.Recorder.Emit").Warnf(ctx, "write entry: %v", err) + } +} diff --git a/metering/file_test.go b/metering/file/file_test.go similarity index 53% rename from metering/file_test.go rename to metering/file/file_test.go index 74ef2b28..5dc911ad 100644 --- a/metering/file_test.go +++ b/metering/file/file_test.go @@ -1,4 +1,4 @@ -package metering +package file import ( "bufio" @@ -8,32 +8,37 @@ import ( "sync" "testing" "time" + + "github.com/cocoonstack/cocoon/metering" ) -func TestFileRecorderRoundTrip(t *testing.T) { +func TestRecorderRoundTrip(t *testing.T) { ctx := t.Context() path := filepath.Join(t.TempDir(), "ledger.jsonl") - r := NewFileRecorder(ctx, path) + r, err := New(path) + if err != nil { + t.Fatalf("New: %v", err) + } now := time.Now().UTC().Truncate(time.Microsecond) - r.Emit(ctx, Entry{ + r.Emit(ctx, metering.Entry{ EmittedAt: now, - Kind: KindVMComputeStart, + Kind: metering.KindVMComputeStart, VMID: "vm1", - Reason: ReasonBoot, - Shape: Shape{CPU: 4, MemBytes: 1 << 30}, + Reason: metering.ReasonBoot, + Shape: metering.Shape{CPU: 4, MemBytes: 1 << 30}, }) - r.Emit(ctx, Entry{ + r.Emit(ctx, metering.Entry{ EmittedAt: now.Add(time.Second), - Kind: KindVMComputeStop, + Kind: metering.KindVMComputeStop, VMID: "vm1", - Reason: ReasonStopUser, + Reason: metering.ReasonStopUser, }) got := readEntries(t, path) if len(got) != 2 { t.Fatalf("got %d entries, want 2", len(got)) } - if got[0].Kind != KindVMComputeStart || got[1].Kind != KindVMComputeStop { + if got[0].Kind != metering.KindVMComputeStart || got[1].Kind != metering.KindVMComputeStop { t.Errorf("got kinds %v %v", got[0].Kind, got[1].Kind) } if got[0].Shape.CPU != 4 { @@ -41,15 +46,18 @@ func TestFileRecorderRoundTrip(t *testing.T) { } } -func TestFileRecorderConcurrent(t *testing.T) { +func TestRecorderConcurrent(t *testing.T) { ctx := t.Context() path := filepath.Join(t.TempDir(), "ledger.jsonl") - r := NewFileRecorder(ctx, path) + r, err := New(path) + if err != nil { + t.Fatalf("New: %v", err) + } const N = 200 var wg sync.WaitGroup for i := range N { wg.Go(func() { - r.Emit(ctx, Entry{Kind: KindVMComputeStart, VMID: "vm", Shape: Shape{CPU: i}}) + r.Emit(ctx, metering.Entry{Kind: metering.KindVMComputeStart, VMID: "vm", Shape: metering.Shape{CPU: i}}) }) } wg.Wait() @@ -60,15 +68,14 @@ func TestFileRecorderConcurrent(t *testing.T) { } } -func TestNewFileRecorderFallback(t *testing.T) { - // Parent dir doesn't exist → OpenFile fails → fallback to NopRecorder. - r := NewFileRecorder(t.Context(), filepath.Join(t.TempDir(), "missing-subdir", "ledger.jsonl")) - if _, ok := r.(NopRecorder); !ok { - t.Errorf("got %T, want NopRecorder", r) +func TestNewMissingParentDirErrors(t *testing.T) { + _, err := New(filepath.Join(t.TempDir(), "missing-subdir", "ledger.jsonl")) + if err == nil { + t.Error("expected error opening ledger in missing dir; got nil") } } -func readEntries(t *testing.T, path string) []Entry { +func readEntries(t *testing.T, path string) []metering.Entry { t.Helper() f, err := os.Open(path) //nolint:gosec // test-controlled path if err != nil { @@ -76,10 +83,10 @@ func readEntries(t *testing.T, path string) []Entry { } defer f.Close() //nolint:errcheck - var out []Entry + var out []metering.Entry sc := bufio.NewScanner(f) for sc.Scan() { - var e Entry + var e metering.Entry if err := json.Unmarshal(sc.Bytes(), &e); err != nil { t.Fatalf("unmarshal %q: %v", sc.Text(), err) } diff --git a/metering/metering.go b/metering/metering.go index 23612d75..a2b27e21 100644 --- a/metering/metering.go +++ b/metering/metering.go @@ -1,4 +1,4 @@ -// Package metering emits append-only VM/snapshot lifecycle endpoints; tenant attribution lives upstream. +// Package metering emits append-only VM/snapshot lifecycle endpoints; tenant attribution lives upstream. Recorder is the contract; backends live in subpackages (file, stderr, capture). package metering import ( diff --git a/metering/stderr/stderr.go b/metering/stderr/stderr.go new file mode 100644 index 00000000..e48757a9 --- /dev/null +++ b/metering/stderr/stderr.go @@ -0,0 +1,33 @@ +package stderr + +import ( + "context" + "encoding/json" + "fmt" + "io" + "os" + "sync" + + "github.com/cocoonstack/cocoon/metering" +) + +var _ metering.Recorder = (*Recorder)(nil) + +// Recorder writes one JSON entry per line to a writer (os.Stderr by default); for dev / debug only. +type Recorder struct { + mu sync.Mutex + out io.Writer +} + +func New() *Recorder { return &Recorder{out: os.Stderr} } + +// Emit best-effort writes; marshal errors silently dropped (stderr is fire-and-forget). +func (r *Recorder) Emit(_ context.Context, e metering.Entry) { + data, err := json.Marshal(e) + if err != nil { + return + } + r.mu.Lock() + defer r.mu.Unlock() + fmt.Fprintln(r.out, string(data)) //nolint:errcheck // fire-and-forget; stderr write failures are not actionable +} diff --git a/metering/stderr/stderr_test.go b/metering/stderr/stderr_test.go new file mode 100644 index 00000000..b201a29a --- /dev/null +++ b/metering/stderr/stderr_test.go @@ -0,0 +1,47 @@ +package stderr + +import ( + "bytes" + "encoding/json" + "strings" + "sync" + "testing" + + "github.com/cocoonstack/cocoon/metering" +) + +func TestRecorderEmitsJSONLine(t *testing.T) { + var buf bytes.Buffer + r := &Recorder{out: &buf} + r.Emit(t.Context(), metering.Entry{ + Kind: metering.KindVMComputeStart, + VMID: "vm1", + Hypervisor: "test", + Shape: metering.Shape{CPU: 2}, + }) + line := strings.TrimRight(buf.String(), "\n") + var got metering.Entry + if err := json.Unmarshal([]byte(line), &got); err != nil { + t.Fatalf("emitted line is not valid JSON %q: %v", line, err) + } + if got.Kind != metering.KindVMComputeStart || got.VMID != "vm1" { + t.Errorf("got %+v", got) + } +} + +func TestRecorderConcurrent(t *testing.T) { + var buf bytes.Buffer + r := &Recorder{out: &buf} + const N = 100 + var wg sync.WaitGroup + for i := range N { + wg.Go(func() { + r.Emit(t.Context(), metering.Entry{Kind: metering.KindVMComputeStart, Shape: metering.Shape{CPU: i}}) + }) + } + wg.Wait() + lines := strings.Split(strings.TrimRight(buf.String(), "\n"), "\n") + if len(lines) != N { + t.Errorf("got %d lines, want %d", len(lines), N) + } +} diff --git a/snapshot/localfile/gc_test.go b/snapshot/localfile/gc_test.go index f81da45e..069542e0 100644 --- a/snapshot/localfile/gc_test.go +++ b/snapshot/localfile/gc_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/cocoonstack/cocoon/metering" + meteringcapture "github.com/cocoonstack/cocoon/metering/capture" "github.com/cocoonstack/cocoon/snapshot" "github.com/cocoonstack/cocoon/types" ) @@ -367,7 +368,7 @@ func TestGCModule_EvictRealRecordEmitsSnapStorageStop(t *testing.T) { } } - rec := &metering.CaptureRecorder{} + rec := meteringcapture.New() mod := gcModule(lf.conf, lf.store, lf.locker, EvictionPolicy{Enabled: true}, rec) snap, err := mod.ReadDB(ctx) if err != nil { @@ -424,7 +425,7 @@ func TestGCModule_OrphanAndStalePendingDoNotEmit(t *testing.T) { t.Fatal(err) } - rec := &metering.CaptureRecorder{} + rec := meteringcapture.New() mod := gcModule(lf.conf, lf.store, lf.locker, EvictionPolicy{}, rec) snap, err := mod.ReadDB(ctx) if err != nil { diff --git a/snapshot/localfile/localfile_test.go b/snapshot/localfile/localfile_test.go index f2ea4c58..6adc48c7 100644 --- a/snapshot/localfile/localfile_test.go +++ b/snapshot/localfile/localfile_test.go @@ -15,6 +15,7 @@ import ( "github.com/cocoonstack/cocoon/config" "github.com/cocoonstack/cocoon/metering" + meteringcapture "github.com/cocoonstack/cocoon/metering/capture" "github.com/cocoonstack/cocoon/snapshot" "github.com/cocoonstack/cocoon/types" "github.com/cocoonstack/cocoon/utils" @@ -90,7 +91,7 @@ func TestNew_NilConfig(t *testing.T) { // Create func TestCreateAndDeleteEmitMetering(t *testing.T) { - rec := &metering.CaptureRecorder{} + rec := meteringcapture.New() lf := newTestLFWithRecorder(t, rec) ctx := t.Context() @@ -132,7 +133,7 @@ func TestDeleteOneIdempotentDoesNotEmitTwice(t *testing.T) { // phantom snap.storage.stop with an empty Hypervisor field. We exercise this // by calling deleteOne twice on the same id (idempotent), simulating the // loser running its loop body after the winner already committed. - rec := &metering.CaptureRecorder{} + rec := meteringcapture.New() lf := newTestLFWithRecorder(t, rec) ctx := t.Context() @@ -176,7 +177,7 @@ func kinds(entries []metering.Entry) []metering.Kind { } func TestImportEmitsSnapStorageStart(t *testing.T) { - rec := &metering.CaptureRecorder{} + rec := meteringcapture.New() lf := newTestLFWithRecorder(t, rec) ctx := t.Context() From 89dead661323f4d445241706d7b0abc09434c6b3 Mon Sep 17 00:00:00 2001 From: CMGS Date: Sat, 23 May 2026 02:32:16 +0800 Subject: [PATCH 2/4] refactor(metering): apply /code review feedback - config.MeteringBackend typed enum + constants (MeteringFile/Nop/Stderr) mirror the HypervisorType pattern; cmd/core switch now uses typed cases. - MeteringConfig.Validate() rejects unknown backends at startup (hooked into config.Config.Validate) instead of warn-and-nop at first emit. - stderr.Recorder.Emit uses Write with appended newline (matches file.Recorder.Emit shape, drops fmt.Fprintln + nolint:errcheck). - Tighten godocs: stderr.Emit doc removed (impl is obvious), capture package doc collapsed to one sentence, MeteringRecorder godoc trimmed to the lazy-init contract. --- cmd/core/metering.go | 2 +- cmd/core/metering_test.go | 10 +++++----- config/config.go | 3 +++ config/metering.go | 24 ++++++++++++++++++++++-- config/metering_test.go | 31 +++++++++++++++++++++++++++++++ metering/capture/capture.go | 2 +- metering/stderr/stderr.go | 7 +++---- 7 files changed, 66 insertions(+), 13 deletions(-) create mode 100644 config/metering_test.go diff --git a/cmd/core/metering.go b/cmd/core/metering.go index 4e25579c..86e038be 100644 --- a/cmd/core/metering.go +++ b/cmd/core/metering.go @@ -24,7 +24,7 @@ var ( meteringRec metering.Recorder ) -// MeteringRecorder returns the process-wide recorder per conf.Metering.Backend; lazy-init, falls back to NopRecorder on init failure. +// MeteringRecorder returns the process-wide recorder per conf.Metering.Backend; lazy-init shared across callers. func MeteringRecorder(ctx context.Context, conf *config.Config) metering.Recorder { meteringOnce.Do(func() { meteringRec = buildRecorder(ctx, conf) diff --git a/cmd/core/metering_test.go b/cmd/core/metering_test.go index 13c9007f..2297ada7 100644 --- a/cmd/core/metering_test.go +++ b/cmd/core/metering_test.go @@ -13,13 +13,13 @@ import ( func TestBuildRecorderBackends(t *testing.T) { tests := []struct { name string - backend string + backend config.MeteringBackend want metering.Recorder }{ {"empty defaults to file", "", (*meteringfile.Recorder)(nil)}, - {"explicit file", "file", (*meteringfile.Recorder)(nil)}, - {"nop", "nop", metering.NopRecorder{}}, - {"stderr", "stderr", (*meteringstderr.Recorder)(nil)}, + {"explicit file", config.MeteringFile, (*meteringfile.Recorder)(nil)}, + {"nop", config.MeteringNop, metering.NopRecorder{}}, + {"stderr", config.MeteringStderr, (*meteringstderr.Recorder)(nil)}, {"unknown falls back to nop", "kafka", metering.NopRecorder{}}, } for _, tt := range tests { @@ -41,7 +41,7 @@ func TestBuildFileRecorderCustomPath(t *testing.T) { conf := &config.Config{ RootDir: dir, Metering: config.MeteringConfig{ - Backend: "file", + Backend: config.MeteringFile, File: config.FileMeteringConfig{Path: dir + "/custom/ledger.jsonl"}, }, } diff --git a/config/config.go b/config/config.go index b2416b2e..5d532d3f 100644 --- a/config/config.go +++ b/config/config.go @@ -95,6 +95,9 @@ func (c *Config) Validate() error { if _, err := c.DNSServers(); err != nil { return fmt.Errorf("dns: %w", err) } + if err := c.Metering.Validate(); err != nil { + return err + } return nil } diff --git a/config/metering.go b/config/metering.go index b6caf476..c2dda553 100644 --- a/config/metering.go +++ b/config/metering.go @@ -1,8 +1,19 @@ package config -// MeteringConfig selects the recorder backend; empty Backend defaults to "file" so v0.4.x configs keep working. +import "fmt" + +const ( + MeteringFile MeteringBackend = "file" + MeteringNop MeteringBackend = "nop" + MeteringStderr MeteringBackend = "stderr" +) + +// MeteringBackend identifies the lifecycle-event recorder backend. +type MeteringBackend string + +// MeteringConfig selects the recorder backend; empty Backend defaults to MeteringFile so v0.4.x configs keep working. type MeteringConfig struct { - Backend string `json:"backend,omitempty" mapstructure:"backend"` + Backend MeteringBackend `json:"backend,omitempty" mapstructure:"backend"` File FileMeteringConfig `json:"file,omitzero" mapstructure:"file"` } @@ -10,3 +21,12 @@ type MeteringConfig struct { type FileMeteringConfig struct { Path string `json:"path,omitempty" mapstructure:"path"` } + +func (m MeteringConfig) Validate() error { + switch m.Backend { + case "", MeteringFile, MeteringNop, MeteringStderr: + return nil + default: + return fmt.Errorf("metering.backend %q is not one of file|nop|stderr", m.Backend) + } +} diff --git a/config/metering_test.go b/config/metering_test.go new file mode 100644 index 00000000..8e15df2e --- /dev/null +++ b/config/metering_test.go @@ -0,0 +1,31 @@ +package config + +import ( + "strings" + "testing" +) + +func TestMeteringConfigValidate(t *testing.T) { + tests := []struct { + name string + backend MeteringBackend + wantErr bool + }{ + {"empty defaults", "", false}, + {"file", MeteringFile, false}, + {"nop", MeteringNop, false}, + {"stderr", MeteringStderr, false}, + {"unknown", "kafka", true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := MeteringConfig{Backend: tt.backend}.Validate() + if (err != nil) != tt.wantErr { + t.Errorf("Validate(%q) err = %v, wantErr=%v", tt.backend, err, tt.wantErr) + } + if tt.wantErr && err != nil && !strings.Contains(err.Error(), "metering.backend") { + t.Errorf("err = %v; want containing 'metering.backend'", err) + } + }) + } +} diff --git a/metering/capture/capture.go b/metering/capture/capture.go index 12d278ae..81dfe1a6 100644 --- a/metering/capture/capture.go +++ b/metering/capture/capture.go @@ -1,4 +1,4 @@ -// Package capture is a metering recorder for tests; production code should not import it. Entries and Reset are testing-only helpers exposed on top of the Recorder contract. +// Package capture is a test-only metering recorder; Entries and Reset are testing helpers. package capture import ( diff --git a/metering/stderr/stderr.go b/metering/stderr/stderr.go index e48757a9..8a4edb3c 100644 --- a/metering/stderr/stderr.go +++ b/metering/stderr/stderr.go @@ -3,7 +3,6 @@ package stderr import ( "context" "encoding/json" - "fmt" "io" "os" "sync" @@ -13,7 +12,7 @@ import ( var _ metering.Recorder = (*Recorder)(nil) -// Recorder writes one JSON entry per line to a writer (os.Stderr by default); for dev / debug only. +// Recorder writes one JSON entry per line; dev/debug only. type Recorder struct { mu sync.Mutex out io.Writer @@ -21,13 +20,13 @@ type Recorder struct { func New() *Recorder { return &Recorder{out: os.Stderr} } -// Emit best-effort writes; marshal errors silently dropped (stderr is fire-and-forget). func (r *Recorder) Emit(_ context.Context, e metering.Entry) { data, err := json.Marshal(e) if err != nil { return } + data = append(data, '\n') r.mu.Lock() defer r.mu.Unlock() - fmt.Fprintln(r.out, string(data)) //nolint:errcheck // fire-and-forget; stderr write failures are not actionable + _, _ = r.out.Write(data) } From 4ed98d2e588fcfcb976b53da8ad71f4ec0d55c1d Mon Sep 17 00:00:00 2001 From: CMGS Date: Sat, 23 May 2026 02:41:41 +0800 Subject: [PATCH 3/4] refactor(metering): centralize JSONL wire format on Entry.WriteTo MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Entry now implements io.WriterTo; file and stderr recorders share the same encoding path instead of each calling json.Marshal + appending \n. Wire-format changes (encoder swap, framing, compression) are now one diff in metering.go rather than scattered across backends. file.Emit collapses two log paths (marshal-error vs write-error) into one — the wrapped error already carries the cause, and downstream cares only that the entry failed. --- metering/file/file.go | 11 ++--------- metering/metering.go | 15 +++++++++++++++ metering/metering_test.go | 37 +++++++++++++++++++++++++++++++++++++ metering/stderr/stderr.go | 8 +------- 4 files changed, 55 insertions(+), 16 deletions(-) diff --git a/metering/file/file.go b/metering/file/file.go index dcc8aa42..fc6eff4d 100644 --- a/metering/file/file.go +++ b/metering/file/file.go @@ -2,7 +2,6 @@ package file import ( "context" - "encoding/json" "fmt" "os" "sync" @@ -30,15 +29,9 @@ func New(path string) (*Recorder, error) { // Emit logs and swallows write errors to never block callers. func (r *Recorder) Emit(ctx context.Context, e metering.Entry) { - data, err := json.Marshal(e) - if err != nil { - log.WithFunc("metering/file.Recorder.Emit").Warnf(ctx, "marshal entry: %v", err) - return - } - data = append(data, '\n') r.mu.Lock() defer r.mu.Unlock() - if _, err := r.f.Write(data); err != nil { - log.WithFunc("metering/file.Recorder.Emit").Warnf(ctx, "write entry: %v", err) + if _, err := e.WriteTo(r.f); err != nil { + log.WithFunc("metering/file.Recorder.Emit").Warnf(ctx, "emit entry: %v", err) } } diff --git a/metering/metering.go b/metering/metering.go index a2b27e21..a5ac6bcd 100644 --- a/metering/metering.go +++ b/metering/metering.go @@ -3,9 +3,13 @@ package metering import ( "context" + "encoding/json" + "io" "time" ) +var _ io.WriterTo = Entry{} + // Kind identifies a lifecycle endpoint; downstream pairs *.start with *.stop by id. type Kind string @@ -50,6 +54,17 @@ type Entry struct { EmittedAt time.Time `json:"emitted_at"` } +// WriteTo encodes e as a JSON object followed by '\n' — the JSONL wire format consumed by downstream tools (BigQuery, log shippers). +func (e Entry) WriteTo(w io.Writer) (int64, error) { + data, err := json.Marshal(e) + if err != nil { + return 0, err + } + data = append(data, '\n') + n, err := w.Write(data) + return int64(n), err +} + // Recorder accepts lifecycle entries; implementations must be safe for concurrent use. type Recorder interface { Emit(context.Context, Entry) diff --git a/metering/metering_test.go b/metering/metering_test.go index be34c77b..4fad3556 100644 --- a/metering/metering_test.go +++ b/metering/metering_test.go @@ -1,7 +1,9 @@ package metering import ( + "bytes" "encoding/json" + "errors" "testing" "time" ) @@ -53,3 +55,38 @@ func TestNopRecorder(t *testing.T) { r.Emit(t.Context(), Entry{Kind: KindVMComputeStart, VMID: "x"}) // no panic, no state — only assertion is "does not crash" } + +func TestEntryWriteToProducesJSONLine(t *testing.T) { + e := Entry{Kind: KindVMComputeStart, VMID: "vm1", Shape: Shape{CPU: 2}} + var buf bytes.Buffer + n, err := e.WriteTo(&buf) + if err != nil { + t.Fatalf("WriteTo: %v", err) + } + if int(n) != buf.Len() { + t.Errorf("n=%d, want %d", n, buf.Len()) + } + if buf.Bytes()[buf.Len()-1] != '\n' { + t.Errorf("missing trailing newline; got %q", buf.String()) + } + var got Entry + if err := json.Unmarshal(buf.Bytes()[:buf.Len()-1], &got); err != nil { + t.Fatalf("unmarshal %q: %v", buf.String(), err) + } + if got.Kind != e.Kind || got.VMID != e.VMID || got.Shape.CPU != e.Shape.CPU { + t.Errorf("got %+v, want %+v", got, e) + } +} + +func TestEntryWriteToPropagatesWriterError(t *testing.T) { + sentinel := errors.New("boom") + e := Entry{Kind: KindVMComputeStart, VMID: "vm1"} + _, err := e.WriteTo(errWriter{err: sentinel}) + if !errors.Is(err, sentinel) { + t.Errorf("got err %v, want sentinel %v", err, sentinel) + } +} + +type errWriter struct{ err error } + +func (w errWriter) Write([]byte) (int, error) { return 0, w.err } diff --git a/metering/stderr/stderr.go b/metering/stderr/stderr.go index 8a4edb3c..f09b9d48 100644 --- a/metering/stderr/stderr.go +++ b/metering/stderr/stderr.go @@ -2,7 +2,6 @@ package stderr import ( "context" - "encoding/json" "io" "os" "sync" @@ -21,12 +20,7 @@ type Recorder struct { func New() *Recorder { return &Recorder{out: os.Stderr} } func (r *Recorder) Emit(_ context.Context, e metering.Entry) { - data, err := json.Marshal(e) - if err != nil { - return - } - data = append(data, '\n') r.mu.Lock() defer r.mu.Unlock() - _, _ = r.out.Write(data) + _, _ = e.WriteTo(r.out) } From a3af4dff99d0e9cf7c17edb2ff3732c4b40afe0e Mon Sep 17 00:00:00 2001 From: CMGS Date: Sat, 23 May 2026 02:44:47 +0800 Subject: [PATCH 4/4] refactor(metering): trim godocs - WriteTo: "writes one JSONL record" (was: wire-format + downstream tools). - file.Recorder / file.Emit: drop the JSON-encoding narrative now that it lives on Entry.WriteTo; keep only the file-specific WHY (O_APPEND atomicity, swallow rationale). - stderr.Recorder: routes to os.Stderr instead of restating the wire format. - config: drop v0.4.x backstory; the default-behavior sentence carries the same information. - metering_test: drop the comment that just paraphrased the test name. --- config/config.go | 2 +- config/metering.go | 2 +- metering/file/file.go | 4 ++-- metering/metering.go | 2 +- metering/metering_test.go | 1 - metering/stderr/stderr.go | 2 +- 6 files changed, 6 insertions(+), 7 deletions(-) diff --git a/config/config.go b/config/config.go index 5d532d3f..e921632c 100644 --- a/config/config.go +++ b/config/config.go @@ -57,7 +57,7 @@ type Config struct { TerminateGracePeriodSeconds int `json:"terminate_grace_period_seconds" mapstructure:"terminate_grace_period_seconds"` // Log configuration, uses eru core's ServerLogConfig. Log *coretypes.ServerLogConfig `json:"log" mapstructure:"log"` - // Metering selects the lifecycle-event recorder backend; zero value keeps v0.4.x file behavior. + // Metering selects the lifecycle-event recorder backend. Metering MeteringConfig `json:"metering,omitzero" mapstructure:"metering"` } diff --git a/config/metering.go b/config/metering.go index c2dda553..ffb63cbc 100644 --- a/config/metering.go +++ b/config/metering.go @@ -11,7 +11,7 @@ const ( // MeteringBackend identifies the lifecycle-event recorder backend. type MeteringBackend string -// MeteringConfig selects the recorder backend; empty Backend defaults to MeteringFile so v0.4.x configs keep working. +// MeteringConfig selects the recorder backend; empty Backend defaults to MeteringFile. type MeteringConfig struct { Backend MeteringBackend `json:"backend,omitempty" mapstructure:"backend"` File FileMeteringConfig `json:"file,omitzero" mapstructure:"file"` diff --git a/metering/file/file.go b/metering/file/file.go index fc6eff4d..82699bb0 100644 --- a/metering/file/file.go +++ b/metering/file/file.go @@ -13,7 +13,7 @@ import ( var _ metering.Recorder = (*Recorder)(nil) -// Recorder appends JSON-encoded entries one per line under sync.Mutex; cross-process atomicity comes from O_APPEND. +// Recorder appends entries under sync.Mutex; O_APPEND gives cross-process atomicity. type Recorder struct { mu sync.Mutex f *os.File @@ -27,7 +27,7 @@ func New(path string) (*Recorder, error) { return &Recorder{f: f}, nil } -// Emit logs and swallows write errors to never block callers. +// Emit swallows write errors so callers never block. func (r *Recorder) Emit(ctx context.Context, e metering.Entry) { r.mu.Lock() defer r.mu.Unlock() diff --git a/metering/metering.go b/metering/metering.go index a5ac6bcd..3447a184 100644 --- a/metering/metering.go +++ b/metering/metering.go @@ -54,7 +54,7 @@ type Entry struct { EmittedAt time.Time `json:"emitted_at"` } -// WriteTo encodes e as a JSON object followed by '\n' — the JSONL wire format consumed by downstream tools (BigQuery, log shippers). +// WriteTo writes one JSONL record. func (e Entry) WriteTo(w io.Writer) (int64, error) { data, err := json.Marshal(e) if err != nil { diff --git a/metering/metering_test.go b/metering/metering_test.go index 4fad3556..1a4607b6 100644 --- a/metering/metering_test.go +++ b/metering/metering_test.go @@ -53,7 +53,6 @@ func TestKindWireFormat(t *testing.T) { func TestNopRecorder(t *testing.T) { var r NopRecorder r.Emit(t.Context(), Entry{Kind: KindVMComputeStart, VMID: "x"}) - // no panic, no state — only assertion is "does not crash" } func TestEntryWriteToProducesJSONLine(t *testing.T) { diff --git a/metering/stderr/stderr.go b/metering/stderr/stderr.go index f09b9d48..cc56989c 100644 --- a/metering/stderr/stderr.go +++ b/metering/stderr/stderr.go @@ -11,7 +11,7 @@ import ( var _ metering.Recorder = (*Recorder)(nil) -// Recorder writes one JSON entry per line; dev/debug only. +// Recorder writes entries to os.Stderr; dev/debug only. type Recorder struct { mu sync.Mutex out io.Writer