feat(publishmq): NATS JetStream source with per-account multi-tenancy #910
michaeldoehler wants to merge 7 commits into
Adds a publish-mq-only driver for NATS JetStream. Outpost reads events from one or more pre-provisioned JetStream consumers via a pull-based multiplexed subscription.

Key design points:
- Multi-account: one NATS Account per Outpost tenant. Each account gets its own connection and pull loop; messages are merged into a single Receive channel.
- Account.TenantID overrides the tenant_id field on incoming payloads, so an Account can only ever produce events for its mapped tenant.
- Stream and Consumer are operator-provisioned. Outpost only verifies existence on Init and fails loudly if either is missing.
- Auth via credentials_file (.creds, Operator/JWT-resolver mode).
- ConcurrentSubscription: pull concurrency is bounded by PullMaxMessages per account; upstream consumer skips its own semaphore.
- Publish() is intentionally unimplemented; JetStream is read-only here.
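The tenant override described above could look roughly like the sketch below. It assumes the payload is a JSON object with a top-level `tenant_id` field; `overrideTenantID` is a hypothetical helper name, not the function used in the driver.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// overrideTenantID rewrites the "tenant_id" field of a JSON event payload.
// An empty accountTenantID leaves the payload untouched.
// Hypothetical helper; the driver's actual implementation may differ.
func overrideTenantID(payload []byte, accountTenantID string) ([]byte, error) {
	if accountTenantID == "" {
		return payload, nil
	}
	var event map[string]json.RawMessage
	if err := json.Unmarshal(payload, &event); err != nil {
		return nil, fmt.Errorf("decode event: %w", err)
	}
	if event == nil {
		// A literal JSON null decodes to a nil map; start from an empty object.
		event = map[string]json.RawMessage{}
	}
	id, err := json.Marshal(accountTenantID)
	if err != nil {
		return nil, err
	}
	event["tenant_id"] = id // always wins over whatever the publisher sent
	return json.Marshal(event)
}

func main() {
	in := []byte(`{"tenant_id":"spoofed","topic":"user.created"}`)
	out, err := overrideTenantID(in, "acme")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"tenant_id":"acme","topic":"user.created"}
}
```

Decoding into `map[string]json.RawMessage` keeps every other field byte-for-byte intact, at the cost of re-serializing keys in sorted order.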
Adds dynamic add/remove of NATS Accounts at runtime via a watched
directory. Layout under accounts_dir:
  <account-name>/
    user.creds   NATS .creds (JWT + NKey seed)
    meta.yaml    stream/consumer/tenant_id metadata
The watcher debounces filesystem events (250ms) and triggers a
reconcile against the current connection set. Static accounts from
config.Accounts are preserved across reconciles; only dir-derived
accounts are added or removed.
Refactors NATSQueue internals to keep connections in a map keyed by
account name, with safe add/remove that also starts/stops the
per-account pump when a subscription is active.
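As an illustration of the reconcile rule (static accounts survive, only dir-derived accounts are added or removed), here is a minimal sketch. `planReconcile` and its set-of-names representation are assumptions made for the example; the real code works on connection objects.

```go
package main

import (
	"fmt"
	"sort"
)

// planReconcile compares the accounts currently connected with the accounts
// found in accounts_dir and returns which names to add and which to remove.
// Names listed in static (from config.Accounts) are never removed.
// Hypothetical helper for illustration only.
func planReconcile(current, fromDir, static map[string]bool) (add, remove []string) {
	for name := range fromDir {
		if !current[name] {
			add = append(add, name)
		}
	}
	for name := range current {
		if !fromDir[name] && !static[name] {
			remove = append(remove, name)
		}
	}
	sort.Strings(add) // deterministic order for logs and tests
	sort.Strings(remove)
	return add, remove
}

func main() {
	current := map[string]bool{"acme": true, "globex": true, "legacy": true}
	fromDir := map[string]bool{"globex": true, "initech": true}
	static := map[string]bool{"acme": true}

	add, remove := planReconcile(current, fromDir, static)
	fmt.Println(add, remove) // [initech] [legacy]
}
```

Here `acme` is absent from the directory but stays connected because it is static, while `legacy` is dropped.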
Adds PublishNATSConfig + PublishNATSAccountConfig to the PublishMQ config, plus GetInfraType / GetQueueConfig branches that map them onto the mqs.NATSConfig the driver expects. Static account lists and the watched accounts_dir can be used independently or combined; the queue treats them as additive.
Adds a small NATS publisher to the local dev publish service, matching the existing rabbitmq/aws_sqs/gcp_pubsub helpers. Reads URL/subject/stream/consumer/creds from environment with defaults matching the docker-compose example. declareNATS() creates a work-queue stream + durable consumer so a fresh local NATS server is usable in seconds.
Adds four integration tests covering the NATS driver:
- TestIntegrationMQ_NATS: basic publish + receive + ack via JetStream
- TestIntegrationMQ_NATS_TenantOverride: account.TenantID rewrites the payload's tenant_id field even when payload contains a value
- TestIntegrationMQ_NATS_MultiAccount: two accounts consumed in parallel, each tagged with its own tenant_id
- TestIntegrationMQ_NATS_AccountsDir: directory watcher picks up an account directory created after Init and starts consuming from it within a few seconds

Supporting infrastructure:
- internal/util/testinfra/nats.go: nats:2.10-alpine testcontainer with JetStream enabled
- internal/util/testutil/nats.go: stream/consumer declare + teardown helpers plus a small publish helper for injecting test events
- testinfra.Config.NATSURL + TEST_NATS_URL in .env.test

Drive-by: relax NATSAccountConfig validation so credentials_file is optional (no-auth and token-via-URL deployments are legitimate); the accounts-dir loader only defaults to user.creds when the file actually exists in the account directory.
- docs/content/publishing/publish-from-nats.mdoc: new guide covering message structure, prerequisites, configuration (env + yaml), the accounts-dir layout, and the multi-tenancy / NATS-account pattern.
- .env.example: PUBLISH_NATS_SERVERS / PUBLISH_NATS_ACCOUNTS_DIR.
- .outpost.yaml.example: full publishmq.nats block under publishmq.
- contributing/mq.md: tick NATS in the supported-MQ list and add a section describing scope, configuration, infra ownership, and retry/visibility behavior.
- examples/docker-compose/compose-publish-nats.yml + helper script: single-node JetStream container for local development, paired with the existing publish dev service (method=nats).
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds NATS JetStream as a publish-mq source, including runtime account discovery via a watched accounts directory, integration test coverage, and developer tooling/docs for local setup.
Changes:
- Implement NATS JetStream queue driver with multi-account support and optional accounts_dir hot-add/remove.
- Add test infrastructure utilities (testcontainers + JetStream provisioning helpers) and integration tests for NATS behaviors.
- Extend publishmq config, docs, and examples to support NATS JetStream.
Reviewed changes
Copilot reviewed 19 out of 20 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| internal/util/testutil/nats.go | Test helper to provision/teardown JetStream streams/consumers and publish test messages. |
| internal/util/testinfra/testinfra.go | Adds TEST_NATS_URL support to test infra config. |
| internal/util/testinfra/nats.go | Testcontainers-based NATS JetStream bring-up + per-test queue config generator. |
| internal/mqs/queue_nats_test.go | Integration tests for NATS driver: basic consume/ack, tenant overrides, multi-account, accounts_dir watcher. |
| internal/mqs/queue_nats.go | New NATS JetStream queue driver implementation with per-account connections and multiplexed subscription. |
| internal/mqs/queue.go | Wires NATS into NewQueue selection logic. |
| internal/mqs/nats_accounts.go | Accounts directory loader + fsnotify watcher with debounce. |
| internal/config/publishmq.go | Adds publishmq NATS config schema and mapping to mqs.QueueConfig. |
| go.mod | Adds direct dependencies for fsnotify and nats.go. |
| go.sum | Adds checksums for new NATS dependencies. |
| examples/docker-compose/start-nats-publish.sh | Convenience script to start a local JetStream container for publishing examples. |
| examples/docker-compose/compose-publish-nats.yml | Docker Compose service definition for local NATS JetStream. |
| docs/content/publishing/publish-from-nats.mdoc | End-user documentation for configuring publish-from-NATS JetStream. |
| contributing/mq.md | Documents NATS JetStream support in contributor MQ guide. |
| cmd/publish/publish_nats.go | Adds dev publish/declare helper for NATS JetStream. |
| cmd/publish/publish_handler.go | Routes method=nats publish requests to NATS implementation. |
| cmd/publish/declare_handler.go | Routes method=nats declare requests to NATS implementation. |
| .outpost.yaml.example | Adds example publishmq NATS config block. |
| .env.test | Adds TEST_NATS_URL placeholder. |
| .env.example | Adds example publishmq NATS env vars. |
queue_nats.go (pump):

    jmsg, err := iter.Next()
    if err != nil {
        if errors.Is(err, jetstream.ErrMsgIteratorClosed) {
            return
        }
        continue
    }
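One way to keep a loop like this from busy-spinning on repeated non-fatal errors is a short, cancellable pause before retrying. The sketch below uses stand-ins so it runs without a NATS server: `errIteratorClosed` plays the role of `jetstream.ErrMsgIteratorClosed` and `nextFunc` the role of `iter.Next()`; both names are assumptions for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errIteratorClosed stands in for jetstream.ErrMsgIteratorClosed.
var errIteratorClosed = errors.New("iterator closed")

// nextFunc stands in for iter.Next(): it returns one message or an error.
type nextFunc func() (string, error)

// pump forwards messages until the iterator closes or ctx is cancelled.
// After a non-fatal error it waits for backoff instead of retrying at once.
func pump(ctx context.Context, next nextFunc, backoff time.Duration, out chan<- string) {
	for {
		msg, err := next()
		if err != nil {
			if errors.Is(err, errIteratorClosed) {
				return
			}
			select {
			case <-ctx.Done():
				return
			case <-time.After(backoff): // pause, then retry
			}
			continue
		}
		select {
		case out <- msg:
		case <-ctx.Done():
			return
		}
	}
}

// runPumpDemo scripts one transient error, one message, then closure.
func runPumpDemo() (string, int) {
	calls := 0
	next := func() (string, error) {
		calls++
		switch calls {
		case 1:
			return "", errors.New("transient: consumer unavailable")
		case 2:
			return "event-1", nil
		default:
			return "", errIteratorClosed
		}
	}
	out := make(chan string, 1)
	pump(context.Background(), next, 10*time.Millisecond, out)
	return <-out, calls
}

func main() {
	msg, calls := runPumpDemo()
	fmt.Println(msg, calls) // event-1 3
}
```

Selecting on `ctx.Done()` during the pause keeps shutdown prompt even when the consumer is unavailable for a long time.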
testinfra/nats.go (EnsureNATS):

    cfg.NATSURL = endpoint
    cfg.cleanupFns = append(cfg.cleanupFns, func() {
        if err := container.Terminate(ctx); err != nil {
            log.Printf("failed to terminate nats container: %s", err)
        }
    })
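For a shared URL that parallel tests may both read and initialize, a common pattern is to put the "already set?" check and the write inside the same `sync.Once`. A minimal sketch, with `natsInfra` and `ensure` as hypothetical names and the container start replaced by a plain function:

```go
package main

import (
	"fmt"
	"sync"
)

// natsInfra is a hypothetical stand-in for the shared test-infra config.
type natsInfra struct {
	once   sync.Once
	url    string
	starts int
}

// ensure returns the NATS URL, starting the backing server at most once.
// Both the check and the write happen inside Do, so concurrent callers
// never read url while another goroutine is writing it.
func (n *natsInfra) ensure(start func() string) string {
	n.once.Do(func() {
		if n.url != "" {
			return // preconfigured, e.g. via TEST_NATS_URL
		}
		n.starts++
		n.url = start()
	})
	return n.url
}

// demoEnsure calls ensure from eight goroutines at once.
func demoEnsure() (string, int) {
	infra := &natsInfra{}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			infra.ensure(func() string { return "nats://127.0.0.1:4222" })
		}()
	}
	wg.Wait()
	return infra.url, infra.starts
}

func main() {
	url, starts := demoEnsure()
	fmt.Println(url, starts) // nats://127.0.0.1:4222 1
}
```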
nats_accounts.go (loadAccountsFromDir):

    accountDir := filepath.Join(dir, e.Name())
    metaPath := filepath.Join(accountDir, "meta.yaml")
    if _, err := os.Stat(metaPath); err != nil {
        continue
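A loader can tell "this subdirectory simply has no meta.yaml" apart from a real I/O problem by checking for `fs.ErrNotExist`. The sketch below is one possible shape; `hasMeta` is a hypothetical helper, not the function in nats_accounts.go.

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// hasMeta reports whether accountDir contains a meta.yaml.
// A missing file is not an error; any other Stat failure (permissions,
// transient I/O) is returned to the caller.
func hasMeta(accountDir string) (bool, error) {
	_, err := os.Stat(filepath.Join(accountDir, "meta.yaml"))
	switch {
	case err == nil:
		return true, nil
	case errors.Is(err, fs.ErrNotExist):
		return false, nil
	default:
		return false, fmt.Errorf("stat meta.yaml in %s: %w", accountDir, err)
	}
}

// demoHasMeta checks a temp directory before and after meta.yaml is written.
func demoHasMeta() (bool, bool, error) {
	dir, err := os.MkdirTemp("", "nats-account-")
	if err != nil {
		return false, false, err
	}
	defer os.RemoveAll(dir)

	before, err := hasMeta(dir)
	if err != nil {
		return false, false, err
	}
	meta := filepath.Join(dir, "meta.yaml")
	if err := os.WriteFile(meta, []byte("stream: events\n"), 0o600); err != nil {
		return false, false, err
	}
	after, err := hasMeta(dir)
	return before, after, err
}

func main() {
	before, after, err := demoHasMeta()
	if err != nil {
		panic(err)
	}
	fmt.Println(before, after) // false true
}
```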
nats_accounts.go (watcher):

    defer w.w.Close()

    var (
        timer    *time.Timer
        timerC   <-chan time.Time
        armTimer = func() {
            if timer != nil {
                timer.Stop()
            }
            timer = time.NewTimer(debounceWindow)
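An alternative to allocating a timer per event is a single timer that is stopped, drained, and reset. The following is a sketch under that assumption, with filesystem events replaced by a plain channel; `debounce` and `stopAndDrain` are illustrative names, and the 50ms window is shortened from the 250ms used by the watcher so the demo runs quickly.

```go
package main

import (
	"fmt"
	"time"
)

// stopAndDrain stops t and discards a tick that may already be queued,
// so the next Reset starts from a clean channel.
func stopAndDrain(t *time.Timer) {
	if !t.Stop() {
		select {
		case <-t.C:
		default:
		}
	}
}

// debounce sends one signal on out after events has been quiet for window.
// It returns when events is closed. out should be buffered or actively read.
func debounce(events <-chan struct{}, window time.Duration, out chan<- struct{}) {
	timer := time.NewTimer(window)
	stopAndDrain(timer) // start disarmed
	defer timer.Stop()
	for {
		select {
		case _, ok := <-events:
			if !ok {
				return
			}
			stopAndDrain(timer)
			timer.Reset(window) // every event pushes the deadline out
		case <-timer.C:
			out <- struct{}{}
		}
	}
}

// demoDebounce fires a burst of three events and counts the signals.
func demoDebounce() int {
	events := make(chan struct{})
	out := make(chan struct{}, 3)
	done := make(chan struct{})
	go func() {
		debounce(events, 50*time.Millisecond, out)
		close(done)
	}()
	for i := 0; i < 3; i++ {
		events <- struct{}{}
	}
	time.Sleep(200 * time.Millisecond)
	close(events)
	<-done
	return len(out)
}

func main() {
	fmt.Println("signals:", demoDebounce()) // the burst collapses into one signal
}
```

The non-blocking drain is deliberate: it behaves the same whether or not a stale tick is sitting in the channel.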
testutil/nats.go:

    if err := js.DeleteStream(ctx, acc.Stream); err != nil {
        // Best-effort teardown; ignore "not found" so re-runs don't fail.
        if !strings.Contains(err.Error(), "stream not found") {
            return err
        }
    }
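Matching on a sentinel with `errors.Is` survives wrapping and message changes, which substring matching does not. In this sketch `errStreamNotFound` is a local stand-in for `jetstream.ErrStreamNotFound` so the example runs without the NATS client, and `ignoreNotFound` is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// errStreamNotFound stands in for jetstream.ErrStreamNotFound.
var errStreamNotFound = errors.New("stream not found")

// ignoreNotFound returns nil when err is, or wraps, errStreamNotFound,
// which makes a best-effort teardown safe to re-run.
func ignoreNotFound(err error) error {
	if errors.Is(err, errStreamNotFound) {
		return nil
	}
	return err
}

// demoIgnoreNotFound reports whether a wrapped not-found error is ignored
// and whether an unrelated error is kept.
func demoIgnoreNotFound() (bool, bool) {
	wrapped := fmt.Errorf("delete stream %q: %w", "events", errStreamNotFound)
	other := errors.New("nats: timeout")
	return ignoreNotFound(wrapped) == nil, ignoreNotFound(other) != nil
}

func main() {
	ignored, kept := demoIgnoreNotFound()
	fmt.Println(ignored, kept) // true true
}
```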
testinfra/testinfra.go:

        mockServerURL = "http://" + mockServerURL
    }
    natsURL := v.GetString("TEST_NATS_URL")
    if natsURL != "" && !strings.Contains(natsURL, "nats://") {
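Scheme normalization is usually safer with a prefix check than with a substring check, since `strings.Contains` also matches the scheme anywhere later in the string. A small sketch, with `normalizeNATSURL` as a hypothetical helper name and `nats://` / `tls://` as the accepted schemes:

```go
package main

import (
	"fmt"
	"strings"
)

// normalizeNATSURL prepends nats:// unless raw is empty or already starts
// with a nats:// or tls:// scheme.
func normalizeNATSURL(raw string) string {
	if raw == "" ||
		strings.HasPrefix(raw, "nats://") ||
		strings.HasPrefix(raw, "tls://") {
		return raw
	}
	return "nats://" + raw
}

func main() {
	fmt.Println(normalizeNATSURL("localhost:4222"))        // nats://localhost:4222
	fmt.Println(normalizeNATSURL("tls://nats.local:4222")) // tls://nats.local:4222
	fmt.Println(normalizeNATSURL("nats://nats:4222"))      // nats://nats:4222
}
```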
queue_nats.go (reconcileFromDir):

    if _, alreadyHave := current[acc.Name]; alreadyHave {
        continue
    }
    _ = q.addAccount(context.Background(), acc)
- queue_nats.go pump: add 250ms backoff between non-fatal iter.Next() errors so the loop doesn't busy-spin during transient consumer unavailability (e.g. leadership change, connection blip).
- queue_nats.go reconcileFromDir: log addAccount failures instead of silently dropping them, so operators can spot bad creds / missing streams / unreachable servers when a tenant directory lands.
- nats_accounts.go loadAccountsFromDir: surface non-ENOENT os.Stat errors (permission, transient IO) instead of treating them as 'subdirectory has no meta.yaml'.
- nats_accounts.go watcher: switch to a single reusable timer with Reset/Stop and proper channel drain, eliminating per-event timer allocations and the Stop-without-drain edge case.
- testutil/nats.go: replace strings.Contains substring matching on 'stream not found' with errors.Is(err, jetstream.ErrStreamNotFound).
- testinfra/nats.go EnsureNATS: move the cfg.NATSURL check inside sync.Once.Do to close a data race between concurrent t.Parallel() callers and the container-start write path. Verified with -race.
- testinfra/testinfra.go: use strings.HasPrefix (nats:// and tls://) instead of strings.Contains for the scheme normalization.
Thanks for the review. Pushed
On the remaining point:
Hi @michaeldoehler, thanks for opening the PR, this was quite interesting. My main concern is the complexity around multi-tenancy via multi-account NATS setup. Can you elaborate more on this design decision? Is this a current use case within your system? From Outpost POV, we see the publisher to Outpost as 1 entity (your service), which would then result in 1 publishmq only. I'd love to understand the use case where per-account publish queues are necessary for each tenant?
Adds NATS JetStream as a publish-mq source for Outpost. Outpost reads
events from one or more pre-provisioned JetStream consumers; streams
and consumers are operator-owned. The driver is built for the common
SaaS pattern of one NATS Account per Outpost tenant.
Why
NATS JetStream is a popular event bus in self-hosted Kubernetes / on-prem
setups, and a natural source for Outpost's webhook-fanout role. The driver
slots into the existing PublishMQ interface alongside RabbitMQ, AWS SQS,
GCP Pub/Sub and Azure Service Bus.
Scope is intentionally limited to publish-mq only — Outpost does not run
its internal delivery/log queues on NATS. That keeps the surface area
small and avoids dragging JetStream into Outpost's auto-provisioning
contract.
Design highlights
- Each `PublishNATSAccountConfig` holds its own credentials file and is dialled on its own NATS connection. Pull loops run in parallel and feed into a single `Subscription.Receive` channel.
- When an account sets `tenant_id` (recommended), Outpost overrides the payload's `tenant_id`, so a publisher with creds for Account A can only produce events for the mapped tenant.
- `accounts_dir` is watched via `fsnotify` with a 250ms debounce. New tenant subdirectories trigger a new connection without restarting Outpost; removed dirs drain the connection. Provisioning flow: mint NATS account JWT → push to resolver → drop `meta.yaml` + `.creds` into `accounts_dir` → Outpost picks it up.
- `credentials_file` (.creds) is the primary mode. It is optional: empty means no-auth (valid for trusted-network setups or token-via-URL).
- Streams and consumers must exist before Outpost starts. `Init` verifies both and fails loudly on any missing piece.
- `AckWait` / `MaxDeliver` are configured on the consumer.
- `Publish()` intentionally returns an error; JetStream is read-only from Outpost's side.
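The "parallel pull loops feeding one Receive channel" idea is a standard fan-in. Here is a minimal sketch with the per-account pull loops simulated by goroutines; `event`, `merge`, and `account` are hypothetical names, and real messages would carry ack handles rather than plain strings.

```go
package main

import (
	"fmt"
	"sync"
)

// event pairs a payload with the tenant of the account it was pulled from.
type event struct {
	TenantID string
	Payload  string
}

// merge forwards every per-account channel into one shared channel and
// closes it once all inputs are closed.
func merge(inputs ...<-chan event) <-chan event {
	out := make(chan event)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in <-chan event) {
			defer wg.Done()
			for ev := range in {
				out <- ev
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// account simulates one account's pull loop delivering n events.
func account(tenant string, n int) <-chan event {
	ch := make(chan event)
	go func() {
		defer close(ch)
		for i := 0; i < n; i++ {
			ch <- event{TenantID: tenant, Payload: fmt.Sprintf("event-%d", i)}
		}
	}()
	return ch
}

// demoMerge counts events per tenant after merging two accounts.
func demoMerge() map[string]int {
	counts := map[string]int{}
	for ev := range merge(account("acme", 2), account("globex", 1)) {
		counts[ev.TenantID]++
	}
	return counts
}

func main() {
	fmt.Println(demoMerge()) // map[acme:2 globex:1]
}
```

Ordering across accounts is not guaranteed in this shape; only per-account order is preserved.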
Configuration
```yaml
publishmq:
  nats:
    servers:
      - nats://nats:4222
    accounts_dir: /etc/outpost/nats-accounts
    accounts:
      - name: acme
        credentials_file: /etc/outpost/acme.creds
        stream: events
        consumer: outpost
        tenant_id: acme
```
Env vars: `PUBLISH_NATS_SERVERS` (comma-separated cluster URLs), `PUBLISH_NATS_ACCOUNTS_DIR`.

Accounts-dir layout:
```
/etc/outpost/nats-accounts/
├── acme/
│   ├── user.creds   # optional NATS .creds (JWT + NKey seed)
│   └── meta.yaml    # name/stream/consumer/tenant_id
└── globex/
    ├── user.creds
    └── meta.yaml
```
Full guide added at `docs/content/publishing/publish-from-nats.mdoc`.
Commit walk-through
- `feat(mqs): add NATS JetStream queue driver`: driver skeleton, `NATSConfig` / `NATSAccountConfig`, pull-consumer subscription, tenant override, wired into `mqs.QueueConfig` and `NewQueue`.
- `feat(mqs): NATS accounts directory watcher`: refactors connection storage to a name-keyed map and adds the `fsnotify`-based watcher with dynamic add/remove.
- `feat(config): wire PublishNATSConfig into publishmq`: config surface area plus `GetInfraType` / `GetQueueConfig` branches.
- `feat(cmd/publish): NATS JetStream dev publish helper`: adds `method=nats` to the existing publish dev service.
- `test(mqs): NATS JetStream integration tests + testinfra`: four integration tests (basic, tenant-override, multi-account, watcher), `nats:2.10-alpine` testcontainer, declare/teardown helpers.
- `docs: publish-from-nats guide + env/yaml examples + compose`: user-facing guide, `.env.example`, `.outpost.yaml.example`, `contributing/mq.md` update, and a docker-compose example with `nats:2.10-alpine` + helper script.
Verification
- `go build ./...` + `go vet ./...` clean.
- `go test -run TestIntegrationMQ_NATS -count=1 ./internal/mqs/...` passes locally (testcontainer `nats:2.10-alpine -js`):
  - `TestIntegrationMQ_NATS`: basic publish/receive/ack
  - `TestIntegrationMQ_NATS_TenantOverride`: payload tenant_id is overridden by account config
  - `TestIntegrationMQ_NATS_MultiAccount`: two accounts consumed in parallel, each tagged with its own tenant
  - `TestIntegrationMQ_NATS_AccountsDir`: watcher picks up an account dir created after Init
Dependencies
Promotes `github.com/nats-io/nats.go` from indirect to direct (already present in `go.sum` via transitive deps; no version bump). Promotes `github.com/fsnotify/fsnotify` from indirect to direct.
Out of scope
- Running Outpost's internal delivery/log queues on NATS: a separate decision around JetStream's storage/replication model.
- … `contributing/mq.md`).

Happy to split commits differently or rework any part; the staging is designed to keep each commit independently reviewable.