diff --git a/cmd/core/metering.go b/cmd/core/metering.go index fbe108c6..86e038be 100644 --- a/cmd/core/metering.go +++ b/cmd/core/metering.go @@ -10,6 +10,8 @@ import ( "github.com/cocoonstack/cocoon/config" "github.com/cocoonstack/cocoon/metering" + meteringfile "github.com/cocoonstack/cocoon/metering/file" + meteringstderr "github.com/cocoonstack/cocoon/metering/stderr" ) const ( @@ -22,16 +24,41 @@ var ( meteringRec metering.Recorder ) -// MeteringRecorder returns a process-wide lifecycle recorder; lazy-init shares one ledger fd across all backends, falls back to NopRecorder on fs error. +// MeteringRecorder returns the process-wide recorder per conf.Metering.Backend; lazy-init shared across callers. func MeteringRecorder(ctx context.Context, conf *config.Config) metering.Recorder { meteringOnce.Do(func() { - dir := filepath.Join(conf.RootDir, meteringSubdir) - if err := os.MkdirAll(dir, 0o750); err != nil { - log.WithFunc("core.MeteringRecorder").Warnf(ctx, "mkdir %s: %v; metering disabled", dir, err) - meteringRec = metering.NopRecorder{} - return - } - meteringRec = metering.NewFileRecorder(ctx, filepath.Join(dir, meteringFile)) + meteringRec = buildRecorder(ctx, conf) }) return meteringRec } + +func buildRecorder(ctx context.Context, conf *config.Config) metering.Recorder { + switch conf.Metering.Backend { + case "", "file": + return buildFileRecorder(ctx, conf) + case "nop": + return metering.NopRecorder{} + case "stderr": + return meteringstderr.New() + default: + log.WithFunc("core.MeteringRecorder").Warnf(ctx, "unknown metering backend %q; using nop", conf.Metering.Backend) + return metering.NopRecorder{} + } +} + +func buildFileRecorder(ctx context.Context, conf *config.Config) metering.Recorder { + path := conf.Metering.File.Path + if path == "" { + path = filepath.Join(conf.RootDir, meteringSubdir, meteringFile) + } + if err := os.MkdirAll(filepath.Dir(path), 0o750); err != nil { + log.WithFunc("core.buildFileRecorder").Warnf(ctx, "mkdir %s: %v; metering disabled", filepath.Dir(path), err) + return metering.NopRecorder{} + } + r, err := meteringfile.New(path) + if err != nil { + log.WithFunc("core.buildFileRecorder").Warnf(ctx, "open ledger: %v; metering disabled", err) + return metering.NopRecorder{} + } + return r +} diff --git a/cmd/core/metering_test.go b/cmd/core/metering_test.go new file mode 100644 index 00000000..2297ada7 --- /dev/null +++ b/cmd/core/metering_test.go @@ -0,0 +1,52 @@ +package core + +import ( + "fmt" + "testing" + + "github.com/cocoonstack/cocoon/config" + "github.com/cocoonstack/cocoon/metering" + meteringfile "github.com/cocoonstack/cocoon/metering/file" + meteringstderr "github.com/cocoonstack/cocoon/metering/stderr" +) + +func TestBuildRecorderBackends(t *testing.T) { + tests := []struct { + name string + backend config.MeteringBackend + want metering.Recorder + }{ + {"empty defaults to file", "", (*meteringfile.Recorder)(nil)}, + {"explicit file", config.MeteringFile, (*meteringfile.Recorder)(nil)}, + {"nop", config.MeteringNop, metering.NopRecorder{}}, + {"stderr", config.MeteringStderr, (*meteringstderr.Recorder)(nil)}, + {"unknown falls back to nop", "kafka", metering.NopRecorder{}}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + conf := &config.Config{ + RootDir: t.TempDir(), + Metering: config.MeteringConfig{Backend: tt.backend}, + } + got := buildRecorder(t.Context(), conf) + if gotT, wantT := fmt.Sprintf("%T", got), fmt.Sprintf("%T", tt.want); gotT != wantT { + t.Errorf("backend=%q got %s, want %s", tt.backend, gotT, wantT) + } + }) + } +} + +func TestBuildFileRecorderCustomPath(t *testing.T) { + dir := t.TempDir() + conf := &config.Config{ + RootDir: dir, + Metering: config.MeteringConfig{ + Backend: config.MeteringFile, + File: config.FileMeteringConfig{Path: dir + "/custom/ledger.jsonl"}, + }, + } + r := buildRecorder(t.Context(), conf) + if _, ok := r.(*meteringfile.Recorder); !ok { + t.Fatalf("got %T, want *meteringfile.Recorder", r) + } +} diff --git a/config/config.go b/config/config.go index 079085ff..e921632c 100644 --- a/config/config.go +++ b/config/config.go @@ -57,6 +57,8 @@ type Config struct { TerminateGracePeriodSeconds int `json:"terminate_grace_period_seconds" mapstructure:"terminate_grace_period_seconds"` // Log configuration, uses eru core's ServerLogConfig. Log *coretypes.ServerLogConfig `json:"log" mapstructure:"log"` + // Metering selects the lifecycle-event recorder backend. + Metering MeteringConfig `json:"metering,omitzero" mapstructure:"metering"` } // Hypervisor returns the selected hypervisor backend type. @@ -93,6 +95,9 @@ func (c *Config) Validate() error { if _, err := c.DNSServers(); err != nil { return fmt.Errorf("dns: %w", err) } + if err := c.Metering.Validate(); err != nil { + return err + } return nil } diff --git a/config/metering.go b/config/metering.go new file mode 100644 index 00000000..ffb63cbc --- /dev/null +++ b/config/metering.go @@ -0,0 +1,32 @@ +package config + +import "fmt" + +const ( + MeteringFile MeteringBackend = "file" + MeteringNop MeteringBackend = "nop" + MeteringStderr MeteringBackend = "stderr" +) + +// MeteringBackend identifies the lifecycle-event recorder backend. +type MeteringBackend string + +// MeteringConfig selects the recorder backend; empty Backend defaults to MeteringFile. +type MeteringConfig struct { + Backend MeteringBackend `json:"backend,omitempty" mapstructure:"backend"` + File FileMeteringConfig `json:"file,omitzero" mapstructure:"file"` +} + +// FileMeteringConfig parameterizes the file-backend recorder; empty Path resolves to /metering/ledger.jsonl. +type FileMeteringConfig struct { + Path string `json:"path,omitempty" mapstructure:"path"` +} + +func (m MeteringConfig) Validate() error { + switch m.Backend { + case "", MeteringFile, MeteringNop, MeteringStderr: + return nil + default: + return fmt.Errorf("metering.backend %q is not one of file|nop|stderr", m.Backend) + } +} diff --git a/config/metering_test.go b/config/metering_test.go new file mode 100644 index 00000000..8e15df2e --- /dev/null +++ b/config/metering_test.go @@ -0,0 +1,31 @@ +package config + +import ( + "strings" + "testing" +) + +func TestMeteringConfigValidate(t *testing.T) { + tests := []struct { + name string + backend MeteringBackend + wantErr bool + }{ + {"empty defaults", "", false}, + {"file", MeteringFile, false}, + {"nop", MeteringNop, false}, + {"stderr", MeteringStderr, false}, + {"unknown", "kafka", true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := MeteringConfig{Backend: tt.backend}.Validate() + if (err != nil) != tt.wantErr { + t.Errorf("Validate(%q) err = %v, wantErr=%v", tt.backend, err, tt.wantErr) + } + if tt.wantErr && err != nil && !strings.Contains(err.Error(), "metering.backend") { + t.Errorf("err = %v; want containing 'metering.backend'", err) + } + }) + } +} diff --git a/hypervisor/state_test.go b/hypervisor/state_test.go index d358bc0d..c8b5dbc2 100644 --- a/hypervisor/state_test.go +++ b/hypervisor/state_test.go @@ -9,6 +9,7 @@ import ( "github.com/cocoonstack/cocoon/lock/flock" "github.com/cocoonstack/cocoon/metering" + meteringcapture "github.com/cocoonstack/cocoon/metering/capture" storejson "github.com/cocoonstack/cocoon/storage/json" "github.com/cocoonstack/cocoon/types" ) @@ -41,12 +42,12 @@ func (stubBackendConfig) LogDir() string { panic("LogDir: n func (stubBackendConfig) VMRunDir(string) string { panic("VMRunDir: not implemented in stub") } func (stubBackendConfig) VMLogDir(string) string { panic("VMLogDir: not implemented in stub") } -func newMeteringTestBackend(t *testing.T) (*Backend, *metering.CaptureRecorder) { +func newMeteringTestBackend(t *testing.T) (*Backend, *meteringcapture.Recorder) { t.Helper() dir := t.TempDir() locker := flock.New(filepath.Join(dir, "index.lock")) store := storejson.New[VMIndex](filepath.Join(dir, "index.json"), locker) - rec := &metering.CaptureRecorder{} + rec := meteringcapture.New() return &Backend{ Typ: "test-hv", Conf: stubBackendConfig{}, diff --git a/metering/capture.go b/metering/capture.go deleted file mode 100644 index 0a06b614..00000000 --- a/metering/capture.go +++ /dev/null @@ -1,34 +0,0 @@ -package metering - -import ( - "context" - "sync" -) - -// CaptureRecorder accumulates entries in memory; intended for tests that assert emit sequences. -type CaptureRecorder struct { - mu sync.Mutex - entries []Entry -} - -func (r *CaptureRecorder) Emit(_ context.Context, e Entry) { - r.mu.Lock() - defer r.mu.Unlock() - r.entries = append(r.entries, e) -} - -// Entries returns a snapshot copy so callers can mutate freely. -func (r *CaptureRecorder) Entries() []Entry { - r.mu.Lock() - defer r.mu.Unlock() - out := make([]Entry, len(r.entries)) - copy(out, r.entries) - return out -} - -// Reset drops buffered entries. -func (r *CaptureRecorder) Reset() { - r.mu.Lock() - defer r.mu.Unlock() - r.entries = nil -} diff --git a/metering/capture/capture.go b/metering/capture/capture.go new file mode 100644 index 00000000..81dfe1a6 --- /dev/null +++ b/metering/capture/capture.go @@ -0,0 +1,39 @@ +// Package capture is a test-only metering recorder; Entries and Reset are testing helpers. +package capture + +import ( + "context" + "sync" + + "github.com/cocoonstack/cocoon/metering" +) + +var _ metering.Recorder = (*Recorder)(nil) + +type Recorder struct { + mu sync.Mutex + entries []metering.Entry +} + +func New() *Recorder { return &Recorder{} } + +func (r *Recorder) Emit(_ context.Context, e metering.Entry) { + r.mu.Lock() + defer r.mu.Unlock() + r.entries = append(r.entries, e) +} + +// Entries returns a snapshot copy; callers may mutate the returned slice. +func (r *Recorder) Entries() []metering.Entry { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]metering.Entry, len(r.entries)) + copy(out, r.entries) + return out +} + +func (r *Recorder) Reset() { + r.mu.Lock() + defer r.mu.Unlock() + r.entries = nil +} diff --git a/metering/capture/capture_test.go b/metering/capture/capture_test.go new file mode 100644 index 00000000..ab69508a --- /dev/null +++ b/metering/capture/capture_test.go @@ -0,0 +1,57 @@ +package capture + +import ( + "sync" + "testing" + + "github.com/cocoonstack/cocoon/metering" +) + +func TestRecorderBasic(t *testing.T) { + r := New() + ctx := t.Context() + r.Emit(ctx, metering.Entry{Kind: metering.KindVMComputeStart, VMID: "a"}) + r.Emit(ctx, metering.Entry{Kind: metering.KindVMComputeStop, VMID: "a"}) + got := r.Entries() + if len(got) != 2 { + t.Fatalf("got %d entries, want 2", len(got)) + } + if got[0].Kind != metering.KindVMComputeStart || got[1].Kind != metering.KindVMComputeStop { + t.Errorf("got kinds %v %v", got[0].Kind, got[1].Kind) + } +} + +func TestRecorderEntriesIsCopy(t *testing.T) { + r := New() + r.Emit(t.Context(), metering.Entry{VMID: "a"}) + got := r.Entries() + got[0].VMID = "tampered" + if again := r.Entries(); again[0].VMID != "a" { + t.Errorf("Entries() must return a copy; got %q after mutation", again[0].VMID) + } +} + +func TestRecorderReset(t *testing.T) { + r := New() + r.Emit(t.Context(), metering.Entry{VMID: "a"}) + r.Reset() + if got := r.Entries(); len(got) != 0 { + t.Errorf("after Reset got %d entries, want 0", len(got)) + } +} + +func TestRecorderConcurrent(t *testing.T) { + r := New() + ctx := t.Context() + const N = 200 + var wg sync.WaitGroup + for range N { + wg.Go(func() { + r.Emit(ctx, metering.Entry{Kind: metering.KindVMComputeStart, VMID: "vm"}) + }) + } + wg.Wait() + if got := len(r.Entries()); got != N { + t.Errorf("got %d entries, want %d", got, N) + } +} diff --git a/metering/capture_test.go b/metering/capture_test.go deleted file mode 100644 index 489f1af1..00000000 --- a/metering/capture_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package metering - -import ( - "sync" - "testing" -) - -func TestCaptureRecorderBasic(t *testing.T) { - var r CaptureRecorder - ctx := t.Context() - r.Emit(ctx, Entry{Kind: KindVMComputeStart, VMID: "a"}) - r.Emit(ctx, Entry{Kind: KindVMComputeStop, VMID: "a"}) - got := r.Entries() - if len(got) != 2 { - t.Fatalf("got %d entries, want 2", len(got)) - } - if got[0].Kind != KindVMComputeStart || got[1].Kind != KindVMComputeStop { - t.Errorf("got kinds %v %v", got[0].Kind, got[1].Kind) - } -} - -func TestCaptureRecorderEntriesIsCopy(t *testing.T) { - // Mutating the returned slice must not affect subsequent reads. - var r CaptureRecorder - r.Emit(t.Context(), Entry{VMID: "a"}) - got := r.Entries() - got[0].VMID = "tampered" - if again := r.Entries(); again[0].VMID != "a" { - t.Errorf("Entries() must return a copy; got %q after mutation", again[0].VMID) - } -} - -func TestCaptureRecorderConcurrent(t *testing.T) { - var r CaptureRecorder - ctx := t.Context() - const N = 200 - var wg sync.WaitGroup - for range N { - wg.Go(func() { - r.Emit(ctx, Entry{Kind: KindVMComputeStart, VMID: "vm"}) - }) - } - wg.Wait() - if got := len(r.Entries()); got != N { - t.Errorf("got %d entries, want %d", got, N) - } -} diff --git a/metering/file.go b/metering/file.go deleted file mode 100644 index 028cf3e9..00000000 --- a/metering/file.go +++ /dev/null @@ -1,41 +0,0 @@ -package metering - -import ( - "context" - "encoding/json" - "os" - "sync" - - "github.com/projecteru2/core/log" -) - -// FileRecorder appends JSON-encoded entries one per line under sync.Mutex; cross-process atomicity comes from O_APPEND. -type FileRecorder struct { - mu sync.Mutex - f *os.File -} - -// NewFileRecorder opens path append-only; returns NopRecorder on failure. -func NewFileRecorder(ctx context.Context, path string) Recorder { - f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600) //nolint:gosec // internal runtime path - if err != nil { - log.WithFunc("metering.NewFileRecorder").Warnf(ctx, "open %s: %v; metering disabled", path, err) - return NopRecorder{} - } - return &FileRecorder{f: f} -} - -// Emit logs and swallows write errors to never block callers. -func (r *FileRecorder) Emit(ctx context.Context, e Entry) { - data, err := json.Marshal(e) - if err != nil { - log.WithFunc("metering.FileRecorder.Emit").Warnf(ctx, "marshal entry: %v", err) - return - } - data = append(data, '\n') - r.mu.Lock() - defer r.mu.Unlock() - if _, err := r.f.Write(data); err != nil { - log.WithFunc("metering.FileRecorder.Emit").Warnf(ctx, "write entry: %v", err) - } -} diff --git a/metering/file/file.go b/metering/file/file.go new file mode 100644 index 00000000..82699bb0 --- /dev/null +++ b/metering/file/file.go @@ -0,0 +1,37 @@ +package file + +import ( + "context" + "fmt" + "os" + "sync" + + "github.com/projecteru2/core/log" + + "github.com/cocoonstack/cocoon/metering" +) + +var _ metering.Recorder = (*Recorder)(nil) + +// Recorder appends entries under sync.Mutex; O_APPEND gives cross-process atomicity. +type Recorder struct { + mu sync.Mutex + f *os.File +} + +func New(path string) (*Recorder, error) { + f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600) //nolint:gosec + if err != nil { + return nil, fmt.Errorf("open ledger %s: %w", path, err) + } + return &Recorder{f: f}, nil +} + +// Emit swallows write errors so callers never block. +func (r *Recorder) Emit(ctx context.Context, e metering.Entry) { + r.mu.Lock() + defer r.mu.Unlock() + if _, err := e.WriteTo(r.f); err != nil { + log.WithFunc("metering/file.Recorder.Emit").Warnf(ctx, "emit entry: %v", err) + } +} diff --git a/metering/file_test.go b/metering/file/file_test.go similarity index 53% rename from metering/file_test.go rename to metering/file/file_test.go index 74ef2b28..5dc911ad 100644 --- a/metering/file_test.go +++ b/metering/file/file_test.go @@ -1,4 +1,4 @@ -package metering +package file import ( "bufio" @@ -8,32 +8,37 @@ import ( "sync" "testing" "time" + + "github.com/cocoonstack/cocoon/metering" ) -func TestFileRecorderRoundTrip(t *testing.T) { +func TestRecorderRoundTrip(t *testing.T) { ctx := t.Context() path := filepath.Join(t.TempDir(), "ledger.jsonl") - r := NewFileRecorder(ctx, path) + r, err := New(path) + if err != nil { + t.Fatalf("New: %v", err) + } now := time.Now().UTC().Truncate(time.Microsecond) - r.Emit(ctx, Entry{ + r.Emit(ctx, metering.Entry{ EmittedAt: now, - Kind: KindVMComputeStart, + Kind: metering.KindVMComputeStart, VMID: "vm1", - Reason: ReasonBoot, - Shape: Shape{CPU: 4, MemBytes: 1 << 30}, + Reason: metering.ReasonBoot, + Shape: metering.Shape{CPU: 4, MemBytes: 1 << 30}, }) - r.Emit(ctx, Entry{ + r.Emit(ctx, metering.Entry{ EmittedAt: now.Add(time.Second), - Kind: KindVMComputeStop, + Kind: metering.KindVMComputeStop, VMID: "vm1", - Reason: ReasonStopUser, + Reason: metering.ReasonStopUser, }) got := readEntries(t, path) if len(got) != 2 { t.Fatalf("got %d entries, want 2", len(got)) } - if got[0].Kind != KindVMComputeStart || got[1].Kind != KindVMComputeStop { + if got[0].Kind != metering.KindVMComputeStart || got[1].Kind != metering.KindVMComputeStop { t.Errorf("got kinds %v %v", got[0].Kind, got[1].Kind) } if got[0].Shape.CPU != 4 { @@ -41,15 +46,18 @@ func TestFileRecorderRoundTrip(t *testing.T) { } } -func TestFileRecorderConcurrent(t *testing.T) { +func TestRecorderConcurrent(t *testing.T) { ctx := t.Context() path := filepath.Join(t.TempDir(), "ledger.jsonl") - r := NewFileRecorder(ctx, path) + r, err := New(path) + if err != nil { + t.Fatalf("New: %v", err) + } const N = 200 var wg sync.WaitGroup for i := range N { wg.Go(func() { - r.Emit(ctx, Entry{Kind: KindVMComputeStart, VMID: "vm", Shape: Shape{CPU: i}}) + r.Emit(ctx, metering.Entry{Kind: metering.KindVMComputeStart, VMID: "vm", Shape: metering.Shape{CPU: i}}) }) } wg.Wait() @@ -60,15 +68,14 @@ func TestFileRecorderConcurrent(t *testing.T) { } } -func TestNewFileRecorderFallback(t *testing.T) { - // Parent dir doesn't exist → OpenFile fails → fallback to NopRecorder. - r := NewFileRecorder(t.Context(), filepath.Join(t.TempDir(), "missing-subdir", "ledger.jsonl")) - if _, ok := r.(NopRecorder); !ok { - t.Errorf("got %T, want NopRecorder", r) +func TestNewMissingParentDirErrors(t *testing.T) { + _, err := New(filepath.Join(t.TempDir(), "missing-subdir", "ledger.jsonl")) + if err == nil { + t.Error("expected error opening ledger in missing dir; got nil") } } -func readEntries(t *testing.T, path string) []Entry { +func readEntries(t *testing.T, path string) []metering.Entry { t.Helper() f, err := os.Open(path) //nolint:gosec // test-controlled path if err != nil { @@ -76,10 +83,10 @@ func readEntries(t *testing.T, path string) []Entry { } defer f.Close() //nolint:errcheck - var out []Entry + var out []metering.Entry sc := bufio.NewScanner(f) for sc.Scan() { - var e Entry + var e metering.Entry if err := json.Unmarshal(sc.Bytes(), &e); err != nil { t.Fatalf("unmarshal %q: %v", sc.Text(), err) } diff --git a/metering/metering.go b/metering/metering.go index 23612d75..3447a184 100644 --- a/metering/metering.go +++ b/metering/metering.go @@ -1,11 +1,15 @@ -// Package metering emits append-only VM/snapshot lifecycle endpoints; tenant attribution lives upstream. +// Package metering emits append-only VM/snapshot lifecycle endpoints; tenant attribution lives upstream. Recorder is the contract; backends live in subpackages (file, stderr, capture). package metering import ( "context" + "encoding/json" + "io" "time" ) +var _ io.WriterTo = Entry{} + // Kind identifies a lifecycle endpoint; downstream pairs *.start with *.stop by id. type Kind string @@ -50,6 +54,17 @@ type Entry struct { EmittedAt time.Time `json:"emitted_at"` } +// WriteTo writes one JSONL record. +func (e Entry) WriteTo(w io.Writer) (int64, error) { + data, err := json.Marshal(e) + if err != nil { + return 0, err + } + data = append(data, '\n') + n, err := w.Write(data) + return int64(n), err +} + // Recorder accepts lifecycle entries; implementations must be safe for concurrent use. type Recorder interface { Emit(context.Context, Entry) diff --git a/metering/metering_test.go b/metering/metering_test.go index be34c77b..1a4607b6 100644 --- a/metering/metering_test.go +++ b/metering/metering_test.go @@ -1,7 +1,9 @@ package metering import ( + "bytes" "encoding/json" + "errors" "testing" "time" ) @@ -51,5 +53,39 @@ func TestKindWireFormat(t *testing.T) { func TestNopRecorder(t *testing.T) { var r NopRecorder r.Emit(t.Context(), Entry{Kind: KindVMComputeStart, VMID: "x"}) - // no panic, no state — only assertion is "does not crash" } + +func TestEntryWriteToProducesJSONLine(t *testing.T) { + e := Entry{Kind: KindVMComputeStart, VMID: "vm1", Shape: Shape{CPU: 2}} + var buf bytes.Buffer + n, err := e.WriteTo(&buf) + if err != nil { + t.Fatalf("WriteTo: %v", err) + } + if int(n) != buf.Len() { + t.Errorf("n=%d, want %d", n, buf.Len()) + } + if buf.Bytes()[buf.Len()-1] != '\n' { + t.Errorf("missing trailing newline; got %q", buf.String()) + } + var got Entry + if err := json.Unmarshal(buf.Bytes()[:buf.Len()-1], &got); err != nil { + t.Fatalf("unmarshal %q: %v", buf.String(), err) + } + if got.Kind != e.Kind || got.VMID != e.VMID || got.Shape.CPU != e.Shape.CPU { + t.Errorf("got %+v, want %+v", got, e) + } +} + +func TestEntryWriteToPropagatesWriterError(t *testing.T) { + sentinel := errors.New("boom") + e := Entry{Kind: KindVMComputeStart, VMID: "vm1"} + _, err := e.WriteTo(errWriter{err: sentinel}) + if !errors.Is(err, sentinel) { + t.Errorf("got err %v, want sentinel %v", err, sentinel) + } +} + +type errWriter struct{ err error } + +func (w errWriter) Write([]byte) (int, error) { return 0, w.err } diff --git a/metering/stderr/stderr.go b/metering/stderr/stderr.go new file mode 100644 index 00000000..cc56989c --- /dev/null +++ b/metering/stderr/stderr.go @@ -0,0 +1,26 @@ +package stderr + +import ( + "context" + "io" + "os" + "sync" + + "github.com/cocoonstack/cocoon/metering" +) + +var _ metering.Recorder = (*Recorder)(nil) + +// Recorder writes entries to os.Stderr; dev/debug only. +type Recorder struct { + mu sync.Mutex + out io.Writer +} + +func New() *Recorder { return &Recorder{out: os.Stderr} } + +func (r *Recorder) Emit(_ context.Context, e metering.Entry) { + r.mu.Lock() + defer r.mu.Unlock() + _, _ = e.WriteTo(r.out) +} diff --git a/metering/stderr/stderr_test.go b/metering/stderr/stderr_test.go new file mode 100644 index 00000000..b201a29a --- /dev/null +++ b/metering/stderr/stderr_test.go @@ -0,0 +1,47 @@ +package stderr + +import ( + "bytes" + "encoding/json" + "strings" + "sync" + "testing" + + "github.com/cocoonstack/cocoon/metering" +) + +func TestRecorderEmitsJSONLine(t *testing.T) { + var buf bytes.Buffer + r := &Recorder{out: &buf} + r.Emit(t.Context(), metering.Entry{ + Kind: metering.KindVMComputeStart, + VMID: "vm1", + Hypervisor: "test", + Shape: metering.Shape{CPU: 2}, + }) + line := strings.TrimRight(buf.String(), "\n") + var got metering.Entry + if err := json.Unmarshal([]byte(line), &got); err != nil { + t.Fatalf("emitted line is not valid JSON %q: %v", line, err) + } + if got.Kind != metering.KindVMComputeStart || got.VMID != "vm1" { + t.Errorf("got %+v", got) + } +} + +func TestRecorderConcurrent(t *testing.T) { + var buf bytes.Buffer + r := &Recorder{out: &buf} + const N = 100 + var wg sync.WaitGroup + for i := range N { + wg.Go(func() { + r.Emit(t.Context(), metering.Entry{Kind: metering.KindVMComputeStart, Shape: metering.Shape{CPU: i}}) + }) + } + wg.Wait() + lines := strings.Split(strings.TrimRight(buf.String(), "\n"), "\n") + if len(lines) != N { + t.Errorf("got %d lines, want %d", len(lines), N) + } +} diff --git a/snapshot/localfile/gc_test.go b/snapshot/localfile/gc_test.go index f81da45e..069542e0 100644 --- a/snapshot/localfile/gc_test.go +++ b/snapshot/localfile/gc_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/cocoonstack/cocoon/metering" + meteringcapture "github.com/cocoonstack/cocoon/metering/capture" "github.com/cocoonstack/cocoon/snapshot" "github.com/cocoonstack/cocoon/types" ) @@ -367,7 +368,7 @@ func TestGCModule_EvictRealRecordEmitsSnapStorageStop(t *testing.T) { } } - rec := &metering.CaptureRecorder{} + rec := meteringcapture.New() mod := gcModule(lf.conf, lf.store, lf.locker, EvictionPolicy{Enabled: true}, rec) snap, err := mod.ReadDB(ctx) if err != nil { @@ -424,7 +425,7 @@ func TestGCModule_OrphanAndStalePendingDoNotEmit(t *testing.T) { t.Fatal(err) } - rec := &metering.CaptureRecorder{} + rec := meteringcapture.New() mod := gcModule(lf.conf, lf.store, lf.locker, EvictionPolicy{}, rec) snap, err := mod.ReadDB(ctx) if err != nil { diff --git a/snapshot/localfile/localfile_test.go b/snapshot/localfile/localfile_test.go index f2ea4c58..6adc48c7 100644 --- a/snapshot/localfile/localfile_test.go +++ b/snapshot/localfile/localfile_test.go @@ -15,6 +15,7 @@ import ( "github.com/cocoonstack/cocoon/config" "github.com/cocoonstack/cocoon/metering" + meteringcapture "github.com/cocoonstack/cocoon/metering/capture" "github.com/cocoonstack/cocoon/snapshot" "github.com/cocoonstack/cocoon/types" "github.com/cocoonstack/cocoon/utils" @@ -90,7 +91,7 @@ func TestNew_NilConfig(t *testing.T) { // Create func TestCreateAndDeleteEmitMetering(t *testing.T) { - rec := &metering.CaptureRecorder{} + rec := meteringcapture.New() lf := newTestLFWithRecorder(t, rec) ctx := t.Context() @@ -132,7 +133,7 @@ func TestDeleteOneIdempotentDoesNotEmitTwice(t *testing.T) { // phantom snap.storage.stop with an empty Hypervisor field. We exercise this // by calling deleteOne twice on the same id (idempotent), simulating the // loser running its loop body after the winner already committed. - rec := &metering.CaptureRecorder{} + rec := meteringcapture.New() lf := newTestLFWithRecorder(t, rec) ctx := t.Context() @@ -176,7 +177,7 @@ func kinds(entries []metering.Entry) []metering.Kind { } func TestImportEmitsSnapStorageStart(t *testing.T) { - rec := &metering.CaptureRecorder{} + rec := meteringcapture.New() lf := newTestLFWithRecorder(t, rec) ctx := t.Context()