Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 35 additions & 8 deletions cmd/core/metering.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (

"github.com/cocoonstack/cocoon/config"
"github.com/cocoonstack/cocoon/metering"
meteringfile "github.com/cocoonstack/cocoon/metering/file"
meteringstderr "github.com/cocoonstack/cocoon/metering/stderr"
)

const (
Expand All @@ -22,16 +24,41 @@ var (
meteringRec metering.Recorder
)

// MeteringRecorder returns a process-wide lifecycle recorder; lazy-init shares one ledger fd across all backends, falls back to NopRecorder on fs error.
// MeteringRecorder returns the process-wide recorder per conf.Metering.Backend; lazy-init shared across callers.
func MeteringRecorder(ctx context.Context, conf *config.Config) metering.Recorder {
meteringOnce.Do(func() {
dir := filepath.Join(conf.RootDir, meteringSubdir)
if err := os.MkdirAll(dir, 0o750); err != nil {
log.WithFunc("core.MeteringRecorder").Warnf(ctx, "mkdir %s: %v; metering disabled", dir, err)
meteringRec = metering.NopRecorder{}
return
}
meteringRec = metering.NewFileRecorder(ctx, filepath.Join(dir, meteringFile))
meteringRec = buildRecorder(ctx, conf)
})
return meteringRec
}

func buildRecorder(ctx context.Context, conf *config.Config) metering.Recorder {
switch conf.Metering.Backend {
case "", "file":
return buildFileRecorder(ctx, conf)
case "nop":
return metering.NopRecorder{}
case "stderr":
return meteringstderr.New()
default:
log.WithFunc("core.MeteringRecorder").Warnf(ctx, "unknown metering backend %q; using nop", conf.Metering.Backend)
return metering.NopRecorder{}
}
}

func buildFileRecorder(ctx context.Context, conf *config.Config) metering.Recorder {
path := conf.Metering.File.Path
if path == "" {
path = filepath.Join(conf.RootDir, meteringSubdir, meteringFile)
}
if err := os.MkdirAll(filepath.Dir(path), 0o750); err != nil {
log.WithFunc("core.buildFileRecorder").Warnf(ctx, "mkdir %s: %v; metering disabled", filepath.Dir(path), err)
return metering.NopRecorder{}
}
r, err := meteringfile.New(path)
if err != nil {
log.WithFunc("core.buildFileRecorder").Warnf(ctx, "open ledger: %v; metering disabled", err)
return metering.NopRecorder{}
}
return r
}
52 changes: 52 additions & 0 deletions cmd/core/metering_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package core

import (
"fmt"
"testing"

"github.com/cocoonstack/cocoon/config"
"github.com/cocoonstack/cocoon/metering"
meteringfile "github.com/cocoonstack/cocoon/metering/file"
meteringstderr "github.com/cocoonstack/cocoon/metering/stderr"
)

func TestBuildRecorderBackends(t *testing.T) {
tests := []struct {
name string
backend config.MeteringBackend
want metering.Recorder
}{
{"empty defaults to file", "", (*meteringfile.Recorder)(nil)},
{"explicit file", config.MeteringFile, (*meteringfile.Recorder)(nil)},
{"nop", config.MeteringNop, metering.NopRecorder{}},
{"stderr", config.MeteringStderr, (*meteringstderr.Recorder)(nil)},
{"unknown falls back to nop", "kafka", metering.NopRecorder{}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
conf := &config.Config{
RootDir: t.TempDir(),
Metering: config.MeteringConfig{Backend: tt.backend},
}
got := buildRecorder(t.Context(), conf)
if gotT, wantT := fmt.Sprintf("%T", got), fmt.Sprintf("%T", tt.want); gotT != wantT {
t.Errorf("backend=%q got %s, want %s", tt.backend, gotT, wantT)
}
})
}
}

func TestBuildFileRecorderCustomPath(t *testing.T) {
dir := t.TempDir()
conf := &config.Config{
RootDir: dir,
Metering: config.MeteringConfig{
Backend: config.MeteringFile,
File: config.FileMeteringConfig{Path: dir + "/custom/ledger.jsonl"},
},
}
r := buildRecorder(t.Context(), conf)
if _, ok := r.(*meteringfile.Recorder); !ok {
t.Fatalf("got %T, want *meteringfile.Recorder", r)
}
}
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type Config struct {
TerminateGracePeriodSeconds int `json:"terminate_grace_period_seconds" mapstructure:"terminate_grace_period_seconds"`
// Log configuration, uses eru core's ServerLogConfig.
Log *coretypes.ServerLogConfig `json:"log" mapstructure:"log"`
// Metering selects the lifecycle-event recorder backend.
Metering MeteringConfig `json:"metering,omitzero" mapstructure:"metering"`
}

// Hypervisor returns the selected hypervisor backend type.
Expand Down Expand Up @@ -93,6 +95,9 @@ func (c *Config) Validate() error {
if _, err := c.DNSServers(); err != nil {
return fmt.Errorf("dns: %w", err)
}
if err := c.Metering.Validate(); err != nil {
return err
}
return nil
}

Expand Down
32 changes: 32 additions & 0 deletions config/metering.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package config

import "fmt"

const (
MeteringFile MeteringBackend = "file"
MeteringNop MeteringBackend = "nop"
MeteringStderr MeteringBackend = "stderr"
)

// MeteringBackend identifies the lifecycle-event recorder backend.
type MeteringBackend string

// MeteringConfig selects the recorder backend; empty Backend defaults to MeteringFile.
type MeteringConfig struct {
Backend MeteringBackend `json:"backend,omitempty" mapstructure:"backend"`
File FileMeteringConfig `json:"file,omitzero" mapstructure:"file"`
}

// FileMeteringConfig parameterizes the file-backend recorder; empty Path resolves to <RootDir>/metering/ledger.jsonl.
type FileMeteringConfig struct {
Path string `json:"path,omitempty" mapstructure:"path"`
}

func (m MeteringConfig) Validate() error {
switch m.Backend {
case "", MeteringFile, MeteringNop, MeteringStderr:
return nil
default:
return fmt.Errorf("metering.backend %q is not one of file|nop|stderr", m.Backend)
}
}
31 changes: 31 additions & 0 deletions config/metering_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package config

import (
"strings"
"testing"
)

func TestMeteringConfigValidate(t *testing.T) {
tests := []struct {
name string
backend MeteringBackend
wantErr bool
}{
{"empty defaults", "", false},
{"file", MeteringFile, false},
{"nop", MeteringNop, false},
{"stderr", MeteringStderr, false},
{"unknown", "kafka", true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := MeteringConfig{Backend: tt.backend}.Validate()
if (err != nil) != tt.wantErr {
t.Errorf("Validate(%q) err = %v, wantErr=%v", tt.backend, err, tt.wantErr)
}
if tt.wantErr && err != nil && !strings.Contains(err.Error(), "metering.backend") {
t.Errorf("err = %v; want containing 'metering.backend'", err)
}
})
}
}
5 changes: 3 additions & 2 deletions hypervisor/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/cocoonstack/cocoon/lock/flock"
"github.com/cocoonstack/cocoon/metering"
meteringcapture "github.com/cocoonstack/cocoon/metering/capture"
storejson "github.com/cocoonstack/cocoon/storage/json"
"github.com/cocoonstack/cocoon/types"
)
Expand Down Expand Up @@ -41,12 +42,12 @@ func (stubBackendConfig) LogDir() string { panic("LogDir: n
func (stubBackendConfig) VMRunDir(string) string { panic("VMRunDir: not implemented in stub") }
func (stubBackendConfig) VMLogDir(string) string { panic("VMLogDir: not implemented in stub") }

func newMeteringTestBackend(t *testing.T) (*Backend, *metering.CaptureRecorder) {
func newMeteringTestBackend(t *testing.T) (*Backend, *meteringcapture.Recorder) {
t.Helper()
dir := t.TempDir()
locker := flock.New(filepath.Join(dir, "index.lock"))
store := storejson.New[VMIndex](filepath.Join(dir, "index.json"), locker)
rec := &metering.CaptureRecorder{}
rec := meteringcapture.New()
return &Backend{
Typ: "test-hv",
Conf: stubBackendConfig{},
Expand Down
34 changes: 0 additions & 34 deletions metering/capture.go

This file was deleted.

39 changes: 39 additions & 0 deletions metering/capture/capture.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Package capture is a test-only metering recorder; Entries and Reset are testing helpers.
package capture

import (
"context"
"sync"

"github.com/cocoonstack/cocoon/metering"
)

var _ metering.Recorder = (*Recorder)(nil)

type Recorder struct {
mu sync.Mutex
entries []metering.Entry
}

func New() *Recorder { return &Recorder{} }

func (r *Recorder) Emit(_ context.Context, e metering.Entry) {
r.mu.Lock()
defer r.mu.Unlock()
r.entries = append(r.entries, e)
}

// Entries returns a snapshot copy; callers may mutate the returned slice.
func (r *Recorder) Entries() []metering.Entry {
r.mu.Lock()
defer r.mu.Unlock()
out := make([]metering.Entry, len(r.entries))
copy(out, r.entries)
return out
}

func (r *Recorder) Reset() {
r.mu.Lock()
defer r.mu.Unlock()
r.entries = nil
}
57 changes: 57 additions & 0 deletions metering/capture/capture_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package capture

import (
"sync"
"testing"

"github.com/cocoonstack/cocoon/metering"
)

func TestRecorderBasic(t *testing.T) {
r := New()
ctx := t.Context()
r.Emit(ctx, metering.Entry{Kind: metering.KindVMComputeStart, VMID: "a"})
r.Emit(ctx, metering.Entry{Kind: metering.KindVMComputeStop, VMID: "a"})
got := r.Entries()
if len(got) != 2 {
t.Fatalf("got %d entries, want 2", len(got))
}
if got[0].Kind != metering.KindVMComputeStart || got[1].Kind != metering.KindVMComputeStop {
t.Errorf("got kinds %v %v", got[0].Kind, got[1].Kind)
}
}

func TestRecorderEntriesIsCopy(t *testing.T) {
r := New()
r.Emit(t.Context(), metering.Entry{VMID: "a"})
got := r.Entries()
got[0].VMID = "tampered"
if again := r.Entries(); again[0].VMID != "a" {
t.Errorf("Entries() must return a copy; got %q after mutation", again[0].VMID)
}
}

func TestRecorderReset(t *testing.T) {
r := New()
r.Emit(t.Context(), metering.Entry{VMID: "a"})
r.Reset()
if got := r.Entries(); len(got) != 0 {
t.Errorf("after Reset got %d entries, want 0", len(got))
}
}

func TestRecorderConcurrent(t *testing.T) {
r := New()
ctx := t.Context()
const N = 200
var wg sync.WaitGroup
for range N {
wg.Go(func() {
r.Emit(ctx, metering.Entry{Kind: metering.KindVMComputeStart, VMID: "vm"})
})
}
wg.Wait()
if got := len(r.Entries()); got != N {
t.Errorf("got %d entries, want %d", got, N)
}
}
47 changes: 0 additions & 47 deletions metering/capture_test.go

This file was deleted.

Loading
Loading