From 4fd26d1c3490bc7bc8517b842d0b17a7dac85696 Mon Sep 17 00:00:00 2001 From: Raezil Date: Sat, 21 Feb 2026 15:12:58 +0100 Subject: [PATCH] refactor: use Data field in Event for type-safe payloads and fix tests/examples --- README.MD | 14 +-- eventstore.go | 9 +- eventstore_test.go | 88 +++++++++---------- examples/drop_oldest/main.go | 2 +- examples/fasthttp/main.go | 10 +-- .../goroutines-subscribe-publisher/main.go | 4 +- examples/handler_timeout/main.go | 2 +- examples/hello_world/main.go | 4 +- examples/middleware/main.go | 12 +-- examples/publisher/main.go | 10 +-- transaction.go | 2 +- transaction_test.go | 8 +- 12 files changed, 83 insertions(+), 82 deletions(-) diff --git a/README.MD b/README.MD index dcded31..93a2b3f 100644 --- a/README.MD +++ b/README.MD @@ -74,14 +74,14 @@ type HouseWasSold struct{} func main() { // Create a dispatcher mapping projections (string or struct) to handlers dispatcher := GoEventBus.Dispatcher{ - "user_created": func(ctx context.Context, args map[string]any) (GoEventBus.Result, error) { - userID := args["id"].(string) + "user_created": func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { + userID := ev.Args["id"].(string) fmt.Println("User created with ID:", userID) return GoEventBus.Result{Message: "handled user_created"}, nil }, - HouseWasSold{}: func(ctx context.Context, args map[string]any) (GoEventBus.Result, error) { - address := args["address"].(string) - price := args["price"].(int) + HouseWasSold{}: func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { + address := ev.Args["address"].(string) + price := ev.Args["price"].(int) fmt.Printf("House sold at %s for $%d\n", address, price) return GoEventBus.Result{Message: "handled HouseWasSold"}, nil }, @@ -140,10 +140,10 @@ func main() { // Create a dispatcher mapping projections to handlers dispatcher := GoEventBus.Dispatcher{ - "user_created": func(ctx context.Context, args map[string]any) (GoEventBus.Result, error) { + "user_created": func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { return GoEventBus.Result{Message: "handled user_created"}, nil }, - "send_email": func(ctx context.Context, args map[string]any) (GoEventBus.Result, error) { + "send_email": func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { log.Println("Hello") return GoEventBus.Result{Message: "handled send_email"}, nil }, diff --git a/eventstore.go b/eventstore.go index ef35567..6e23c6f 100644 --- a/eventstore.go +++ b/eventstore.go @@ -35,7 +35,7 @@ const cacheLine = 64 type pad [cacheLine - unsafe.Sizeof(uint64(0))]byte // HandlerFunc is the signature for event handlers and middleware. -type HandlerFunc func(context.Context, map[string]any) (Result, error) +type HandlerFunc func(context.Context, Event) (Result, error) // Middleware wraps a HandlerFunc, returning a new HandlerFunc. type Middleware func(HandlerFunc) HandlerFunc @@ -52,7 +52,8 @@ type Dispatcher map[interface{}]HandlerFunc type Event struct { ID string Projection interface{} - Args map[string]any + Data any // Type-safe payload (preferred) + Args map[string]any // Legacy payload (deprecated) Ctx context.Context // carried context from Subscribe } @@ -220,7 +221,7 @@ func (es *EventStore) execute(h HandlerFunc, ev Event) { if ctx == nil { ctx = context.Background() } - // override with explicit __ctx if set + // override with explicit __ctx if set in legacy Args if c, ok := ev.Args["__ctx"].(context.Context); ok && c != nil { ctx = c } @@ -232,7 +233,7 @@ func (es *EventStore) execute(h HandlerFunc, ev Event) { for i := len(es.middlewares) - 1; i >= 0; i-- { wrapped = es.middlewares[i](wrapped) } - res, err := wrapped(ctx, ev.Args) + res, err := wrapped(ctx, ev) atomic.AddUint64(&es.processedCount, 1) for _, hook := range es.afterHooks { diff --git a/eventstore_test.go b/eventstore_test.go index 036d253..f1ab461 100644 --- a/eventstore_test.go +++ b/eventstore_test.go @@ -21,11 +21,11 @@ var bg = context.Background() func TestSubscribeAndPublish(t *testing.T) { dispatcher := Dispatcher{} var called1, called2 int32 - dispatcher["evt1"] = func(_ context.Context, args map[string]any) (Result, error) { + dispatcher["evt1"] = func(_ context.Context, ev Event) (Result, error) { atomic.AddInt32(&called1, 1) return Result{Message: "ok1"}, nil } - dispatcher["evt2"] = func(_ context.Context, args map[string]any) (Result, error) { + dispatcher["evt2"] = func(_ context.Context, ev Event) (Result, error) { atomic.AddInt32(&called2, 1) return Result{Message: "ok2"}, nil } @@ -56,7 +56,7 @@ func TestPublishWithMissingHandler(t *testing.T) { func TestPublishMixedExistingAndNonExisting(t *testing.T) { dispatcher := Dispatcher{} var called int32 - dispatcher["evt"] = func(_ context.Context, args map[string]any) (Result, error) { + dispatcher["evt"] = func(_ context.Context, ev Event) (Result, error) { atomic.AddInt32(&called, 1) return Result{}, nil } @@ -74,7 +74,7 @@ func TestPublishMixedExistingAndNonExisting(t *testing.T) { func TestOverflowBehavior(t *testing.T) { dispatcher := Dispatcher{} var count uint64 - dispatcher["evtOverflow"] = func(_ context.Context, args map[string]any) (Result, error) { + dispatcher["evtOverflow"] = func(_ context.Context, ev Event) (Result, error) { atomic.AddUint64(&count, 1) return Result{}, nil } @@ -104,9 +104,9 @@ func TestOverflowReturnError(t *testing.T) { // TestConcurrentSubscribe ensures safety of concurrent subscriptions. func TestConcurrentSubscribe(t *testing.T) { - dispatcher := Dispatcher{"evt": func(_ context.Context, args map[string]any) (Result, error) { return Result{}, nil }} + dispatcher := Dispatcher{"evt": func(_ context.Context, ev Event) (Result, error) { return Result{}, nil }} var count uint64 - dispatcher["evt"] = func(_ context.Context, args map[string]any) (Result, error) { + dispatcher["evt"] = func(_ context.Context, ev Event) (Result, error) { atomic.AddUint64(&count, 1) return Result{}, nil } @@ -131,7 +131,7 @@ func TestConcurrentSubscribe(t *testing.T) { // Benchmarks -------------------------------------------------------------- func BenchmarkSubscribe(b *testing.B) { - dispatcher := Dispatcher{"evt": func(_ context.Context, args map[string]any) (Result, error) { return Result{}, nil }} + dispatcher := Dispatcher{"evt": func(_ context.Context, ev Event) (Result, error) { return Result{}, nil }} es := NewEventStore(&dispatcher, 1<<16, DropOldest) b.ResetTimer() for i := 0; i < b.N; i++ { @@ -140,7 +140,7 @@ func BenchmarkSubscribe(b *testing.B) { } func BenchmarkSubscribeParallel(b *testing.B) { - dispatcher := Dispatcher{"evt": func(_ context.Context, args map[string]any) (Result, error) { return Result{}, nil }} + dispatcher := Dispatcher{"evt": func(_ context.Context, ev Event) (Result, error) { return Result{}, nil }} es := NewEventStore(&dispatcher, 1<<16, DropOldest) b.RunParallel(func(pb *testing.PB) { for pb.Next() { @@ -150,7 +150,7 @@ func BenchmarkSubscribeParallel(b *testing.B) { } func BenchmarkPublish(b *testing.B) { - dispatcher := Dispatcher{"evt": func(_ context.Context, args map[string]any) (Result, error) { return Result{}, nil }} + dispatcher := Dispatcher{"evt": func(_ context.Context, ev Event) (Result, error) { return Result{}, nil }} es := NewEventStore(&dispatcher, 1<<16, DropOldest) for i := 0; i < size; i++ { _ = es.Subscribe(bg, Event{ID: "p", Projection: "evt", Args: nil}) @@ -162,7 +162,7 @@ func BenchmarkPublish(b *testing.B) { } func BenchmarkPublishAfterPrefill(b *testing.B) { - dispatcher := Dispatcher{"evt": func(_ context.Context, args map[string]any) (Result, error) { return Result{}, nil }} + dispatcher := Dispatcher{"evt": func(_ context.Context, ev Event) (Result, error) { return Result{}, nil }} es := NewEventStore(&dispatcher, 1<<16, DropOldest) for i := 0; i < size; i++ { _ = es.Subscribe(bg, Event{ID: "pp", Projection: "evt", Args: nil}) @@ -181,7 +181,7 @@ type LargeStruct struct { } func BenchmarkSubscribeLargePayload(b *testing.B) { - dispatcher := Dispatcher{"evtLarge": func(_ context.Context, args map[string]any) (Result, error) { return Result{}, nil }} + dispatcher := Dispatcher{"evtLarge": func(_ context.Context, ev Event) (Result, error) { return Result{}, nil }} es := NewEventStore(&dispatcher, 1<<16, DropOldest) var payload LargeStruct @@ -192,7 +192,7 @@ func BenchmarkSubscribeLargePayload(b *testing.B) { } func BenchmarkPublishLargePayload(b *testing.B) { - dispatcher := Dispatcher{"evtLarge": func(_ context.Context, args map[string]any) (Result, error) { return Result{}, nil }} + dispatcher := Dispatcher{"evtLarge": func(_ context.Context, ev Event) (Result, error) { return Result{}, nil }} es := NewEventStore(&dispatcher, 1<<16, DropOldest) var payload LargeStruct const largeSize = 100 @@ -209,7 +209,7 @@ func BenchmarkPublishLargePayload(b *testing.B) { func TestExactBufferSizeNoOverflow(t *testing.T) { dispatcher := Dispatcher{} var count uint64 - dispatcher["evtExact"] = func(_ context.Context, args map[string]any) (Result, error) { + dispatcher["evtExact"] = func(_ context.Context, ev Event) (Result, error) { atomic.AddUint64(&count, 1) return Result{}, nil } @@ -226,7 +226,7 @@ func TestExactBufferSizeNoOverflow(t *testing.T) { func TestOverflowThreshold(t *testing.T) { dispatcher := Dispatcher{} var count uint64 - dispatcher["evtThresh"] = func(_ context.Context, args map[string]any) (Result, error) { + dispatcher["evtThresh"] = func(_ context.Context, ev Event) (Result, error) { atomic.AddUint64(&count, 1) return Result{}, nil } @@ -243,7 +243,7 @@ func TestOverflowThreshold(t *testing.T) { func TestPublishIdempotent(t *testing.T) { dispatcher := Dispatcher{} var called int32 - dispatcher["evtOnce"] = func(_ context.Context, args map[string]any) (Result, error) { + dispatcher["evtOnce"] = func(_ context.Context, ev Event) (Result, error) { atomic.AddInt32(&called, 1) return Result{}, nil } @@ -261,7 +261,7 @@ func TestEventStore_AsyncDispatch(t *testing.T) { called := 0 dispatcher := Dispatcher{ - "print": func(_ context.Context, args map[string]any) (Result, error) { + "print": func(_ context.Context, ev Event) (Result, error) { mu.Lock() defer mu.Unlock() called++ @@ -287,7 +287,7 @@ func TestEventStore_AsyncDispatch(t *testing.T) { } func BenchmarkEventStore_Async(b *testing.B) { - dispatcher := Dispatcher{"async": func(_ context.Context, args map[string]any) (Result, error) { return Result{Message: "done"}, nil }} + dispatcher := Dispatcher{"async": func(_ context.Context, ev Event) (Result, error) { return Result{Message: "done"}, nil }} store := NewEventStore(&dispatcher, 1<<16, DropOldest) store.Async = true for i := 0; i < b.N; i++ { @@ -297,7 +297,7 @@ func BenchmarkEventStore_Async(b *testing.B) { } func BenchmarkEventStore_Sync(b *testing.B) { - dispatcher := Dispatcher{"sync": func(_ context.Context, args map[string]any) (Result, error) { return Result{Message: "done"}, nil }} + dispatcher := Dispatcher{"sync": func(_ context.Context, ev Event) (Result, error) { return Result{Message: "done"}, nil }} store := NewEventStore(&dispatcher, 1<<16, DropOldest) store.Async = false for i := 0; i < b.N; i++ { @@ -308,7 +308,7 @@ func BenchmarkEventStore_Sync(b *testing.B) { // FastHTTP benchmarks updated for new API func benchmarkFastHTTP(b *testing.B, async bool) { - dispatcher := Dispatcher{"evt": func(_ context.Context, args map[string]any) (Result, error) { return Result{}, nil }} + dispatcher := Dispatcher{"evt": func(_ context.Context, ev Event) (Result, error) { return Result{}, nil }} es := NewEventStore(&dispatcher, 1<<16, DropOldest) es.Async = async @@ -328,7 +328,7 @@ func BenchmarkFastHTTPSync(b *testing.B) { benchmarkFastHTTP(b, false) } func BenchmarkFastHTTPAsync(b *testing.B) { benchmarkFastHTTP(b, true) } func BenchmarkFastHTTPParallel(b *testing.B) { - dispatcher := Dispatcher{"evt": func(_ context.Context, args map[string]any) (Result, error) { return Result{}, nil }} + dispatcher := Dispatcher{"evt": func(_ context.Context, ev Event) (Result, error) { return Result{}, nil }} es := NewEventStore(&dispatcher, 1<<16, DropOldest) es.Async = true @@ -360,8 +360,8 @@ func TestPublishEmpty(t *testing.T) { func TestArgsPassing(t *testing.T) { dispatcher := Dispatcher{} var received string - dispatcher["echo"] = func(_ context.Context, args map[string]any) (Result, error) { - received = args["foo"].(string) + dispatcher["echo"] = func(_ context.Context, ev Event) (Result, error) { + received = ev.Args["foo"].(string) return Result{}, nil } es := NewEventStore(&dispatcher, 1<<16, DropOldest) @@ -375,13 +375,13 @@ func TestArgsPassing(t *testing.T) { func TestDispatcherSnapshot(t *testing.T) { dispatcher := Dispatcher{} var calledOriginal, calledModified int32 - dispatcher["snap"] = func(_ context.Context, args map[string]any) (Result, error) { + dispatcher["snap"] = func(_ context.Context, ev Event) (Result, error) { atomic.AddInt32(&calledOriginal, 1) return Result{}, nil } es := NewEventStore(&dispatcher, 1<<16, DropOldest) // swap handler - dispatcher["snap"] = func(_ context.Context, args map[string]any) (Result, error) { + dispatcher["snap"] = func(_ context.Context, ev Event) (Result, error) { atomic.AddInt32(&calledModified, 1) return Result{}, nil } @@ -396,7 +396,7 @@ func TestDispatcherSnapshot(t *testing.T) { } func TestEventStore_Metrics(t *testing.T) { - dispatcher := Dispatcher{"metric": func(_ context.Context, args map[string]any) (Result, error) { return Result{}, nil }} + dispatcher := Dispatcher{"metric": func(_ context.Context, ev Event) (Result, error) { return Result{}, nil }} es := NewEventStore(&dispatcher, 1<<16, DropOldest) _ = es.Subscribe(bg, Event{ID: "1", Projection: "metric", Args: nil}) _ = es.Subscribe(bg, Event{ID: "2", Projection: "metric", Args: nil}) @@ -413,8 +413,8 @@ func TestEventStore_Metrics(t *testing.T) { } // helper no‑op handler that increments a counter so we know it was invoked. -func noopHandler(counter *uint64) func(context.Context, map[string]any) (Result, error) { - return func(ctx context.Context, args map[string]any) (Result, error) { +func noopHandler(counter *uint64) func(context.Context, Event) (Result, error) { + return func(ctx context.Context, ev Event) (Result, error) { atomic.AddUint64(counter, 1) return Result{Message: "ok"}, nil } @@ -495,7 +495,7 @@ func TestContextPropagation(t *testing.T) { var received string disp := Dispatcher{ - "ctx": func(c context.Context, _ map[string]any) (Result, error) { + "ctx": func(c context.Context, ev Event) (Result, error) { if v, ok := c.Value(key).(string); ok { received = v } @@ -543,7 +543,7 @@ func TestOverrunPolicyBlockRespectsContext(t *testing.T) { // TestErrorMetrics verifies that handler failures are reflected in Metrics(). func TestErrorMetrics(t *testing.T) { - disp := Dispatcher{"boom": func(_ context.Context, _ map[string]any) (Result, error) { + disp := Dispatcher{"boom": func(_ context.Context, ev Event) (Result, error) { return Result{}, errors.New("boom") }} es := NewEventStore(&disp, 4, DropOldest) @@ -580,7 +580,7 @@ func BenchmarkSubscribeBlockPolicy(b *testing.B) { } func BenchmarkPublishWithErrors(b *testing.B) { - disp := Dispatcher{"err": func(_ context.Context, _ map[string]any) (Result, error) { return Result{}, errors.New("fail") }} + disp := Dispatcher{"err": func(_ context.Context, ev Event) (Result, error) { return Result{}, errors.New("fail") }} es := NewEventStore(&disp, 1<<16, DropOldest) // pre‑populate with events that will all fail @@ -598,8 +598,8 @@ func BenchmarkPublishWithErrors(b *testing.B) { func TestEventStore_SubscribePublish_Sync(t *testing.T) { disp := Dispatcher{} // simple echo handler - disp["echo"] = func(ctx context.Context, args map[string]any) (Result, error) { - return Result{Message: args["msg"].(string)}, nil + disp["echo"] = func(ctx context.Context, ev Event) (Result, error) { + return Result{Message: ev.Args["msg"].(string)}, nil } store := NewEventStore(&disp, 8, DropOldest) e := Event{ID: "1", Projection: "echo", Args: map[string]any{"msg": "hello"}} @@ -616,16 +616,16 @@ func TestEventStore_SubscribePublish_Sync(t *testing.T) { // Test middleware chaining. func TestEventStore_Middleware(t *testing.T) { disp := Dispatcher{} - disp["inc"] = func(ctx context.Context, args map[string]any) (Result, error) { + disp["inc"] = func(ctx context.Context, ev Event) (Result, error) { // return current value return Result{Message: ""}, nil } store := NewEventStore(&disp, 8, DropOldest) // middleware increments counter before handler store.Use(func(next HandlerFunc) HandlerFunc { - return func(ctx context.Context, args map[string]any) (Result, error) { - args["cnt"] = args["cnt"].(int) + 1 - return next(ctx, args) + return func(ctx context.Context, ev Event) (Result, error) { + ev.Args["cnt"] = ev.Args["cnt"].(int) + 1 + return next(ctx, ev) } }) e := Event{ID: "1", Projection: "inc", Args: map[string]any{"cnt": 0}} @@ -643,7 +643,7 @@ func TestEventStore_Middleware(t *testing.T) { func TestEventStore_Hooks(t *testing.T) { disp := Dispatcher{} errorMsg := "handler error" - disp["fail"] = func(ctx context.Context, args map[string]any) (Result, error) { + disp["fail"] = func(ctx context.Context, ev Event) (Result, error) { return Result{}, errors.New(errorMsg) } store := NewEventStore(&disp, 8, DropOldest) @@ -678,7 +678,7 @@ func TestEventStore_Hooks(t *testing.T) { func TestEventStore_SubscribePublishDrainMetrics(t *testing.T) { var processed atomic.Uint64 dispatcher := Dispatcher{ - "testEvent": func(ctx context.Context, args map[string]any) (Result, error) { + "testEvent": func(ctx context.Context, ev Event) (Result, error) { processed.Add(1) return Result{Message: "ok"}, nil }, @@ -725,7 +725,7 @@ func TestEventStore_SubscribePublishDrainMetrics(t *testing.T) { } // noOpHandler is a dummy handler for benchmarks. -func noOpHandler(ctx context.Context, args map[string]any) (Result, error) { +func noOpHandler(ctx context.Context, ev Event) (Result, error) { return Result{}, nil } @@ -776,7 +776,7 @@ func BenchmarkDrainAsync(b *testing.B) { func TestScheduleAfter(t *testing.T) { var called uint32 dispatcher := Dispatcher{ - "foo": func(ctx context.Context, args map[string]any) (Result, error) { + "foo": func(ctx context.Context, ev Event) (Result, error) { atomic.StoreUint32(&called, 1) return Result{}, nil }, @@ -794,7 +794,7 @@ func TestScheduleAfter(t *testing.T) { func TestScheduleAfter_FiresOnce(t *testing.T) { var called uint32 dispatcher := Dispatcher{ - "foo": func(ctx context.Context, args map[string]any) (Result, error) { + "foo": func(ctx context.Context, ev Event) (Result, error) { atomic.StoreUint32(&called, 1) return Result{}, nil }, @@ -812,7 +812,7 @@ func TestScheduleAfter_FiresOnce(t *testing.T) { func TestSchedule_FiresImmediatelyIfPast(t *testing.T) { var called uint32 dispatcher := Dispatcher{ - "bar": func(ctx context.Context, args map[string]any) (Result, error) { + "bar": func(ctx context.Context, ev Event) (Result, error) { atomic.StoreUint32(&called, 1) return Result{}, nil }, @@ -830,7 +830,7 @@ func TestSchedule_FiresImmediatelyIfPast(t *testing.T) { func TestSchedule_FiresAtFutureTime(t *testing.T) { var called uint32 dispatcher := Dispatcher{ - "baz": func(ctx context.Context, args map[string]any) (Result, error) { + "baz": func(ctx context.Context, ev Event) (Result, error) { atomic.StoreUint32(&called, 1) return Result{}, nil }, @@ -853,7 +853,7 @@ func TestSchedule_FiresAtFutureTime(t *testing.T) { func TestSchedule_Cancel(t *testing.T) { var called uint32 dispatcher := Dispatcher{ - "qux": func(ctx context.Context, args map[string]any) (Result, error) { + "qux": func(ctx context.Context, ev Event) (Result, error) { atomic.StoreUint32(&called, 1) return Result{}, nil }, diff --git a/examples/drop_oldest/main.go b/examples/drop_oldest/main.go index b11ec6e..2fb0081 100644 --- a/examples/drop_oldest/main.go +++ b/examples/drop_oldest/main.go @@ -11,7 +11,7 @@ import ( // OverrunPolicy=DropOldest silently discards events. func main() { dispatcher := GoEventBus.Dispatcher{ - "noop": func(ctx context.Context, _ map[string]any) (GoEventBus.Result, error) { + "noop": func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { time.Sleep(80 * time.Millisecond) return GoEventBus.Result{}, nil }, diff --git a/examples/fasthttp/main.go b/examples/fasthttp/main.go index 23155e1..7f4d43a 100644 --- a/examples/fasthttp/main.go +++ b/examples/fasthttp/main.go @@ -16,16 +16,16 @@ import ( func main() { // 1) Setup your dispatcher dispatcher := GoEventBus.Dispatcher{ - "sayHello": func(ctx context.Context, args map[string]any) (GoEventBus.Result, error) { + "sayHello": func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { fmt.Printf("[%s] Hello, %s!\n", time.Now().Format(time.StampMilli), - args["name"], + ev.Args["name"], ) return GoEventBus.Result{}, nil }, - "computeSum": func(ctx context.Context, args map[string]any) (GoEventBus.Result, error) { - a := args["a"].(int) - b := args["b"].(int) + "computeSum": func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { + a := ev.Args["a"].(int) + b := ev.Args["b"].(int) sum := a + b fmt.Printf("[%s] %d + %d = %d\n", time.Now().Format(time.StampMilli), diff --git a/examples/goroutines-subscribe-publisher/main.go b/examples/goroutines-subscribe-publisher/main.go index 144a54a..d84c66d 100644 --- a/examples/goroutines-subscribe-publisher/main.go +++ b/examples/goroutines-subscribe-publisher/main.go @@ -15,10 +15,10 @@ import ( func main() { // 1) Build your dispatcher dispatcher := GoEventBus.Dispatcher{ - "user_created": func(ctx context.Context, args map[string]any) (GoEventBus.Result, error) { + "user_created": func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { select { case <-time.After(500 * time.Millisecond): - fmt.Println("Processed user_created:", args["id"]) + fmt.Println("Processed user_created:", ev.Args["id"]) case <-ctx.Done(): return GoEventBus.Result{}, ctx.Err() } diff --git a/examples/handler_timeout/main.go b/examples/handler_timeout/main.go index f1fd831..467d226 100644 --- a/examples/handler_timeout/main.go +++ b/examples/handler_timeout/main.go @@ -11,7 +11,7 @@ import ( // Handler runs for 150 ms unless its context is cancelled. func main() { dispatcher := GoEventBus.Dispatcher{ - "demo": func(ctx context.Context, _ map[string]any) (GoEventBus.Result, error) { + "demo": func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { select { case <-time.After(150 * time.Millisecond): fmt.Println("handler OK") diff --git a/examples/hello_world/main.go b/examples/hello_world/main.go index 4cfc420..f81c572 100644 --- a/examples/hello_world/main.go +++ b/examples/hello_world/main.go @@ -10,8 +10,8 @@ import ( func main() { // 1) Create a dispatcher mapping projections to handlers dispatcher := GoEventBus.Dispatcher{ - "say_hello": func(ctx context.Context, args map[string]any) (GoEventBus.Result, error) { - name := args["name"].(string) + "say_hello": func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { + name := ev.Args["name"].(string) fmt.Printf("Hello, %s!\n", name) return GoEventBus.Result{Message: "greeted"}, nil }, diff --git a/examples/middleware/main.go b/examples/middleware/main.go index ed1d0ae..516c78f 100644 --- a/examples/middleware/main.go +++ b/examples/middleware/main.go @@ -10,10 +10,10 @@ import ( // Example of a logging middleware that logs before and after handler execution. func loggingMiddleware(next GoEventBus.HandlerFunc) GoEventBus.HandlerFunc { - return func(ctx context.Context, args map[string]any) (GoEventBus.Result, error) { + return func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { start := time.Now() - fmt.Printf("[Middleware] Starting handler, args=%v\n", args) - res, err := next(ctx, args) + fmt.Printf("[Middleware] Starting handler, args=%v\n", ev.Args) + res, err := next(ctx, ev) fmt.Printf("[Middleware] Finished handler in %s, result=%+v, err=%v\n", time.Since(start), res, err) return res, err } @@ -37,13 +37,13 @@ func errorHook(ctx context.Context, ev GoEventBus.Event, err error) { func main() { // 1. Create dispatcher and register handlers disp := GoEventBus.Dispatcher{} - disp["greet"] = func(ctx context.Context, args map[string]any) (GoEventBus.Result, error) { - name := args["name"].(string) + disp["greet"] = func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { + name := ev.Args["name"].(string) msg := fmt.Sprintf("Hello, %s!", name) return GoEventBus.Result{Message: msg}, nil } // A handler that sometimes errors - disp["fail"] = func(ctx context.Context, args map[string]any) (GoEventBus.Result, error) { + disp["fail"] = func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { return GoEventBus.Result{}, fmt.Errorf("intentional error") } diff --git a/examples/publisher/main.go b/examples/publisher/main.go index 44607d2..2058c92 100644 --- a/examples/publisher/main.go +++ b/examples/publisher/main.go @@ -17,14 +17,14 @@ type HouseWasSold struct{} func main() { dispatcher := GoEventBus.Dispatcher{ - "user_created": func(ctx context.Context, args map[string]any) (GoEventBus.Result, error) { - userID := args["id"].(string) + "user_created": func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { + userID := ev.Args["id"].(string) fmt.Println("User created with ID:", userID) return GoEventBus.Result{Message: "handled user_created"}, nil }, - HouseWasSold{}: func(ctx context.Context, args map[string]any) (GoEventBus.Result, error) { - address := args["address"].(string) - price := args["price"].(int) + HouseWasSold{}: func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { + address := ev.Args["address"].(string) + price := ev.Args["price"].(int) fmt.Printf("House sold at %s for $%d\n", address, price) return GoEventBus.Result{Message: "handled HouseWasSold"}, nil }, diff --git a/transaction.go b/transaction.go index 8a94b85..6a4242a 100644 --- a/transaction.go +++ b/transaction.go @@ -67,7 +67,7 @@ func (tx *Transaction) Commit(ctx context.Context) error { } // invoke handler - res, err := wrapped(cctx, ev.Args) + res, err := wrapped(cctx, ev) atomic.AddUint64(&tx.store.processedCount, 1) // after hooks diff --git a/transaction_test.go b/transaction_test.go index 2f2eb9d..ddb8e39 100644 --- a/transaction_test.go +++ b/transaction_test.go @@ -9,7 +9,7 @@ import ( // dummy handler just increments a counter func makeCounterHandler(counter *uint64) HandlerFunc { - return func(ctx context.Context, args map[string]any) (Result, error) { + return func(ctx context.Context, ev Event) (Result, error) { atomic.AddUint64(counter, 1) return Result{Message: "ok"}, nil } @@ -56,7 +56,7 @@ func TestTransaction_PartialFailure(t *testing.T) { // handler that errors on the second event cnt := uint64(0) dispatcher := Dispatcher{ - "x": func(ctx context.Context, args map[string]any) (Result, error) { + "x": func(ctx context.Context, ev Event) (Result, error) { i := atomic.AddUint64(&cnt, 1) if i == 2 { return Result{}, errors.New("boom") @@ -83,7 +83,7 @@ func TestTransaction_PartialFailure(t *testing.T) { func BenchmarkTransaction_SyncCommit(b *testing.B) { dispatcher := Dispatcher{ - "p": func(ctx context.Context, args map[string]any) (Result, error) { + "p": func(ctx context.Context, ev Event) (Result, error) { return Result{Message: "ok"}, nil }, } @@ -106,7 +106,7 @@ func BenchmarkTransaction_SyncCommit(b *testing.B) { func BenchmarkTransaction_AsyncCommit(b *testing.B) { dispatcher := Dispatcher{ - "p": func(ctx context.Context, args map[string]any) (Result, error) { + "p": func(ctx context.Context, ev Event) (Result, error) { return Result{Message: "ok"}, nil }, }