Merged
9 changes: 5 additions & 4 deletions .github/workflows/CI.yaml
jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: 1.25.5

- name: Build
run: go build -v ./...

- name: Run tests
run: go test -v ./pkg/consumer/... ./pkg/producer/...

- name: Run all tests
run: go test -v ./...
288 changes: 201 additions & 87 deletions README.md

## Description

Redsumer is a Go library that abstracts Redis Stream consumption. It provides horizontal scalability, adaptive protection against Redis overload when the queue is idle or stalled, priority for new messages when the PEL is not progressing, and a simple contract: the library handles infrastructure, the user handles business logic.

Built on top of [valkey-go](https://github.com/valkey-io/valkey-go).

## Features

- **Adaptive ratio** — interleaves new messages and PEL processing at a configurable ratio; automatically reduces PEL attempt frequency when the PEL is stalled
- **Adaptive backoff** — exponential-style wait when the queue is completely empty, configurable via a slice of durations
- **PEL stall detection** — compares PEL size across full XAUTOCLAIM traversals and adjusts both ratio and wait accordingly
- **Blocking XREADGROUP** — efficient wait for new messages without busy-polling
- **Auto group creation and recreation** — creates the consumer group on startup; recreates it automatically if deleted at runtime
- **Context-aware sleeps** — all waits respect context cancellation for clean shutdown
- **Horizontal scaling** — run multiple instances under the same consumer group; Redis delivers each message to exactly one of them
- **Observability** — `Stats()` exposes current adaptive state indices

## Installation

```bash
go get github.com/enerBit/redsumer/v4
```

## Consumer usage

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/enerBit/redsumer/v4/pkg/client"
	"github.com/enerBit/redsumer/v4/pkg/consumer"
)

func main() {
	ctx := context.Background()

	c := &consumer.Consumer{
		// Redis connection
		Client: &client.ClientArgs{
			Host: "localhost",
			Port: "6379",
		},

		// Stream identity
		StreamName:   "my-stream",
		GroupName:    "my-group",
		ConsumerName: consumer.DefaultConsumerName(), // hostname-pid; must be unique per instance

		// How long to wait for new messages before triggering PEL (ms)
		BlockMs: 2000,

		// XAUTOCLAIM settings
		ClaimMinIdleMs: 30000, // claim messages idle for > 30s
		ClaimBatch:     10,

		// XREADGROUP batch size
		BatchSize: 10,

		// Idle threshold for StillMine check (ms)
		IdleStillMine: 5000,

		// Retries while waiting for the stream to exist (seconds between each)
		Tries: []int{1, 2, 5, 10},

		// Adaptive slices — index 0 is the healthy-state value;
		// index advances when the PEL is stalled, resets when it progresses.
		RatioSlice:   []int{5, 10, 20, 50},   // new messages per PEL batch
		PelWaitSlice: []int{0, 1, 5, 30},     // seconds before each PEL attempt
		BackoffSlice: []int{1, 2, 5, 10, 30}, // seconds when queue is completely empty
	}

	if err := c.InitConsumer(ctx); err != nil {
		log.Fatal(err)
	}

	for {
		messages, err := c.Consume(ctx)
		if err != nil {
			log.Println("consume error:", err)
			continue
		}

		for _, msg := range messages {
			// Optional: verify the message is still assigned to this consumer
			if ok, _ := c.StillMine(ctx, msg.ID); !ok {
				fmt.Println("message reclaimed by another consumer:", msg.ID)
				continue
			}

			fmt.Println("processing:", msg.ID, msg.Values)

			// Acknowledge when processing is complete
			if err := c.AcknowledgeMessage(ctx, msg.ID); err != nil {
				log.Println("ack error:", err)
			}
		}

		// Optional: inspect adaptive state
		s := c.Stats()
		fmt.Printf("ratioIdx=%d pelWaitIdx=%d backoffIdx=%d prevPelSize=%d\n",
			s.RatioIdx, s.PelWaitIdx, s.BackoffIdx, s.PrevPelSize)
	}
}
```

### Graceful shutdown

Pass a cancellable context. All sleeps (backoff, PEL wait) will unblock immediately when the context is cancelled.

```go
package main

import (
	"context"
	"errors"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/enerBit/redsumer/v4/pkg/consumer"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	// cancel on SIGTERM / SIGINT
	go func() {
		sigCh := make(chan os.Signal, 1)
		signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
		<-sigCh
		cancel()
	}()

	c := &consumer.Consumer{
		// TODO: same configuration as in the consumer example above
	}
	if err := c.InitConsumer(ctx); err != nil {
		log.Fatal(err)
	}

	for {
		msgs, err := c.Consume(ctx)
		if err != nil {
			if errors.Is(err, context.Canceled) {
				break
			}
			log.Println(err)
			continue
		}
		_ = msgs
		// handle msgs ...
	}
}
```

## Producer usage

```go
package main

import (
	"context"
	"log"

	"github.com/enerBit/redsumer/v4/pkg/client"
	"github.com/enerBit/redsumer/v4/pkg/producer"
)

func main() {
	ctx := context.Background()

	p := &producer.Producer{
		Client: &client.ClientArgs{
			Host: "localhost",
			Port: "6379",
		},
	}

	if err := p.Client.InitClient(ctx); err != nil {
		log.Fatal(err)
	}

	err := p.Produce(ctx, map[string]string{
		"event": "order.created",
		"id":    "42",
	}, "my-stream")
	if err != nil {
		log.Fatal(err)
	}
}
```

## Configuration reference

| Field | Type | Description |
|---|---|---|
| `StreamName` | `string` | Redis stream name |
| `GroupName` | `string` | Consumer group name |
| `ConsumerName` | `string` | Unique consumer name per instance (use `DefaultConsumerName()`) |
| `BlockMs` | `int64` | XREADGROUP blocking wait (ms) |
| `ClaimMinIdleMs` | `int64` | Minimum idle time before XAUTOCLAIM reclaims a message (ms) |
| `ClaimBatch` | `int64` | Messages per XAUTOCLAIM batch |
| `BatchSize` | `int64` | Messages per XREADGROUP batch |
| `IdleStillMine` | `int64` | Idle threshold for `StillMine` check (ms) |
| `Tries` | `[]int` | Seconds between retries while waiting for stream existence |
| `RatioSlice` | `[]int` | New messages per PEL batch. Advances when PEL stalls. e.g. `[5, 10, 20, 50]` |
| `PelWaitSlice` | `[]int` | Seconds before each PEL attempt. Advances when PEL stalls. e.g. `[0, 1, 5, 30]` |
| `BackoffSlice` | `[]int` | Seconds when queue is completely empty. e.g. `[1, 2, 5, 10, 30]` |

All slice fields must have at least one element. `BatchSize` and `ClaimBatch` must be > 0. Validation runs in `InitConsumer` before any network connection is attempted.
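The documented constraints can be expressed as a small standalone check. The `cfg` struct and `validate` helper below are illustrative stand-ins, not the library's actual API:

```go
package main

import (
	"errors"
	"fmt"
)

// cfg mirrors the subset of Consumer fields that the README says are
// validated; the names here are illustrative, not the library's own.
type cfg struct {
	BatchSize    int64
	ClaimBatch   int64
	RatioSlice   []int
	PelWaitSlice []int
	BackoffSlice []int
}

// validate enforces the documented rules: all slices non-empty,
// BatchSize and ClaimBatch strictly positive.
func validate(c cfg) error {
	if c.BatchSize <= 0 || c.ClaimBatch <= 0 {
		return errors.New("BatchSize and ClaimBatch must be > 0")
	}
	for name, s := range map[string][]int{
		"RatioSlice":   c.RatioSlice,
		"PelWaitSlice": c.PelWaitSlice,
		"BackoffSlice": c.BackoffSlice,
	} {
		if len(s) == 0 {
			return fmt.Errorf("%s must have at least one element", name)
		}
	}
	return nil
}

func main() {
	good := cfg{BatchSize: 10, ClaimBatch: 10,
		RatioSlice: []int{5}, PelWaitSlice: []int{0}, BackoffSlice: []int{1}}
	fmt.Println(validate(good)) // <nil>
	fmt.Println(validate(cfg{BatchSize: 0}) != nil) // true
}
```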

## Adaptive loop behaviour

Each call to `Consume()` represents one iteration. The caller drives the outer `for` loop.

**Phase 1 — new messages (`XREADGROUP BLOCK`)**
Reads up to `BatchSize` new messages, blocking for up to `BlockMs` ms. Accumulates a counter. When the counter reaches `RatioSlice[ratioIdx]`, the PEL phase runs and the counter resets. If no new messages arrive, the PEL phase runs immediately.
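The counter decision above can be sketched as follows; the function and variable names are assumptions, not the library's internals:

```go
package main

import "fmt"

// shouldRunPel models the Phase 1 decision: accumulate the count of
// new messages and trigger the PEL phase either when the ratio
// threshold is reached or when no new messages arrived at all.
func shouldRunPel(counter *int, newMsgs int, ratio int) bool {
	if newMsgs == 0 {
		return true // idle read: go straight to the PEL phase
	}
	*counter += newMsgs
	if *counter >= ratio {
		*counter = 0 // threshold reached: reset and run the PEL phase
		return true
	}
	return false
}

func main() {
	counter := 0
	fmt.Println(shouldRunPel(&counter, 3, 5)) // false: 3 < 5
	fmt.Println(shouldRunPel(&counter, 3, 5)) // true: 6 >= 5, counter resets
	fmt.Println(shouldRunPel(&counter, 0, 5)) // true: no new messages
}
```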

**Phase 2 — PEL (`XAUTOCLAIM`)**
Claims one batch of up to `ClaimBatch` messages that have been idle for at least `ClaimMinIdleMs` ms. A `PelWaitSlice[pelWaitIdx]` second wait is applied before each attempt.

When the cursor wraps back to `0-0` (full PEL traversal complete), the current PEL size is compared with the previous traversal:
- **PEL size changed** → `ratioIdx` and `pelWaitIdx` reset to 0 (PEL is progressing)
- **PEL size unchanged** → both indices advance (PEL is stalled; reduce frequency)
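The reset/advance rule can be sketched like this (illustrative names; the indices are clamped to the last slice element so they never run off the end):

```go
package main

import "fmt"

// adjust models the stall check: after a full XAUTOCLAIM traversal,
// compare PEL sizes and either reset the adaptive indices to the
// healthy state or advance them, clamped at maxIdx.
func adjust(ratioIdx, pelWaitIdx, prevPelSize, curPelSize, maxIdx int) (int, int) {
	if curPelSize != prevPelSize {
		return 0, 0 // PEL is progressing: back to the healthy-state values
	}
	// PEL is stalled: advance both indices, but never past the end
	if ratioIdx < maxIdx {
		ratioIdx++
	}
	if pelWaitIdx < maxIdx {
		pelWaitIdx++
	}
	return ratioIdx, pelWaitIdx
}

func main() {
	r, p := adjust(1, 1, 40, 40, 3) // stalled: both advance
	fmt.Println(r, p)               // 2 2
	r, p = adjust(2, 2, 40, 35, 3)  // progressed: both reset
	fmt.Println(r, p)               // 0 0
}
```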

**Phase 3 — backoff**
Applies only when both phases return empty. Sleeps for `BackoffSlice[backoffIdx]` seconds and advances the index. Resets when messages arrive.

## Horizontal scaling

Run multiple instances under the same `GroupName`; Redis delivers each message to exactly one of them. The `ConsumerName` must be unique per instance; `DefaultConsumerName()` generates `hostname-pid`.

## What the library does NOT do

- No business logic on messages
- No retry decision — unacked messages stay in the PEL and are reclaimed by XAUTOCLAIM after `ClaimMinIdleMs`
- No internal concurrency — sequential batch processing; parallelism is the user's responsibility
- No DLQ or dead-letter stream
- No state persistence between restarts
- No circuit breaker

## Contributing
Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change. Please make sure to update tests as appropriate.

## License

[MIT](https://choosealicense.com/licenses/mit/)
12 changes: 6 additions & 6 deletions go.mod
module github.com/enerBit/redsumer/v4

go 1.25.5

require (
github.com/valkey-io/valkey-go v1.0.73
github.com/valkey-io/valkey-go/mock v1.0.73
go.uber.org/mock v0.6.0
)

require golang.org/x/sys v0.42.0 // indirect