diff --git a/docs/apis/openapi.yaml b/docs/apis/openapi.yaml index 0832890e4..d4dad8ab9 100644 --- a/docs/apis/openapi.yaml +++ b/docs/apis/openapi.yaml @@ -325,6 +325,26 @@ components: type: string description: Optional AWS Session Token (for temporary credentials). example: "AQoDYXdzEPT//////////wEXAMPLE..." + CloudflareQueuesConfig: + type: object + required: [account_id, queue_id] + properties: + account_id: + type: string + description: Cloudflare Account ID (32-character hex string). + example: "023e105f4ecef8ad9ca31a8372d0c353" + queue_id: + type: string + description: Cloudflare Queue ID (32-character hex string, not the queue name). + example: "9d7d4cf8a3a14d9aaeb50c3e74e2f4b1" + CloudflareQueuesCredentials: + type: object + required: [api_token] + properties: + api_token: + type: string + description: Cloudflare API Token with the `queues:write` permission scoped to the target account. + example: "v1.0-1234567890abcdef..." RabbitMQConfig: type: object required: [server_url, exchange] @@ -1288,6 +1308,93 @@ components: username: "outpost" password: "secure_password_123" + DestinationCloudflareQueues: + type: object + x-docs-type: "Cloudflare Queues" + required: + [ + id, + type, + topics, + config, + credentials, + created_at, + updated_at, + disabled_at, + ] + properties: + id: + type: string + description: Control plane generated ID or user provided ID for the destination. + example: "des_12345" + type: + type: string + description: Type of the destination. + enum: [cloudflare_queues] + example: "cloudflare_queues" + topics: + $ref: "#/components/schemas/Topics" + filter: + $ref: "#/components/schemas/Filter" + disabled_at: + type: string + format: date-time + nullable: true + description: ISO Date when the destination was disabled, or null if enabled. + example: null + created_at: + type: string + format: date-time + description: ISO Date when the destination was created. + example: "2024-01-01T00:00:00Z" + updated_at: + type: string + format: date-time + description: ISO Date when the destination was last updated. + example: "2024-01-01T00:00:00Z" + config: + $ref: "#/components/schemas/CloudflareQueuesConfig" + credentials: + $ref: "#/components/schemas/CloudflareQueuesCredentials" + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } + target: + type: string + description: A human-readable representation of the destination target (Cloudflare Queue ID). Read-only. + readOnly: true + example: "9d7d4cf8a3a14d9aaeb50c3e74e2f4b1" + target_url: + type: string + format: url + nullable: true + description: A URL link to the Cloudflare dashboard queues list for the account. Read-only. + readOnly: true + example: "https://dash.cloudflare.com/023e105f4ecef8ad9ca31a8372d0c353/workers/queues" + example: + id: "des_cf_queues_123" + type: "cloudflare_queues" + topics: ["*"] + disabled_at: null + created_at: "2024-03-10T14:30:00Z" + updated_at: "2024-03-10T14:30:00Z" + config: + account_id: "023e105f4ecef8ad9ca31a8372d0c353" + queue_id: "9d7d4cf8a3a14d9aaeb50c3e74e2f4b1" + credentials: + api_token: "v1.0-1234567890abcdef..." + # Polymorphic Destination Schema (for Responses) Destination: oneOf: @@ -1300,6 +1407,7 @@ components: - $ref: "#/components/schemas/DestinationAzureServiceBus" - $ref: "#/components/schemas/DestinationGCPPubSub" - $ref: "#/components/schemas/DestinationKafka" + - $ref: "#/components/schemas/DestinationCloudflareQueues" discriminator: propertyName: type mapping: @@ -1312,6 +1420,7 @@ components: aws_s3: "#/components/schemas/DestinationAWSS3" gcp_pubsub: "#/components/schemas/DestinationGCPPubSub" kafka: "#/components/schemas/DestinationKafka" + cloudflare_queues: "#/components/schemas/DestinationCloudflareQueues" DestinationCreateWebhook: type: object @@ -1882,6 +1991,69 @@ components: If set, the destination is created in a disabled state with this timestamp. Must not be in the future. Defaults to null (enabled). example: null + DestinationCreateCloudflareQueues: + type: object + x-docs-type: "Cloudflare Queues" + required: [type, topics, config, credentials] + properties: + id: + type: string + description: Optional user-provided ID. A UUID will be generated if empty. + example: "user-provided-id" + type: + type: string + description: Type of the destination. Must be 'cloudflare_queues'. + enum: [cloudflare_queues] + topics: + $ref: "#/components/schemas/Topics" + filter: + $ref: "#/components/schemas/Filter" + config: + $ref: "#/components/schemas/CloudflareQueuesConfig" + credentials: + $ref: "#/components/schemas/CloudflareQueuesCredentials" + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every attempt. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } + created_at: + type: string + format: date-time + nullable: true + description: >- + Optional override for the creation timestamp. Intended for importing + destinations from another system. Must not be in the future. + **Admin (API key) auth only — sending this with JWT auth returns 403.** + Defaults to the current time when omitted. + example: "2024-02-15T10:00:00Z" + updated_at: + type: string + format: date-time + nullable: true + description: >- + Optional override for the last-updated timestamp. Intended for + importing destinations. Must not be in the future. + **Admin (API key) auth only — sending this with JWT auth returns 403.** + Defaults to created_at when omitted. + example: "2024-02-15T10:00:00Z" + disabled_at: + type: string + format: date-time + nullable: true + description: >- + If set, the destination is created in a disabled state with this + timestamp. Must not be in the future. Defaults to null (enabled). + example: null # Polymorphic Destination Creation Schema (for Request Bodies) DestinationCreate: @@ -1895,6 +2067,7 @@ components: - $ref: "#/components/schemas/DestinationCreateRabbitMQ" - $ref: "#/components/schemas/DestinationCreateGCPPubSub" - $ref: "#/components/schemas/DestinationCreateKafka" + - $ref: "#/components/schemas/DestinationCreateCloudflareQueues" discriminator: propertyName: type mapping: @@ -1907,6 +2080,7 @@ components: aws_s3: "#/components/schemas/DestinationCreateAWSS3" gcp_pubsub: "#/components/schemas/DestinationCreateGCPPubSub" kafka: "#/components/schemas/DestinationCreateKafka" + cloudflare_queues: "#/components/schemas/DestinationCreateCloudflareQueues" # Type-Specific Destination Update Schemas (for Request Bodies) WebhookCredentialsUpdate: @@ -2357,6 +2531,54 @@ components: (must not be in the future) to disable, null to enable, or omit to leave unchanged. example: null + DestinationUpdateCloudflareQueues: + type: object + x-docs-type: "Cloudflare Queues" + # Properties duplicated from DestinationUpdateBase + properties: + topics: + $ref: "#/components/schemas/Topics" + filter: + $ref: "#/components/schemas/Filter" + config: + $ref: "#/components/schemas/CloudflareQueuesConfig" # account_id/queue_id required here, but PATCH means optional + credentials: + $ref: "#/components/schemas/CloudflareQueuesCredentials" # api_token required here, but PATCH means optional + delivery_metadata: + type: object + additionalProperties: + oneOf: + - type: string + - type: "null" + nullable: true + description: >- + Static key-value pairs merged into event metadata on every attempt. + Uses JSON merge-patch semantics (RFC 7396): send keys to add/update, + null values to delete keys, null for entire field to clear all. + Omit or send {} for no change. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + oneOf: + - type: string + - type: "null" + nullable: true + description: >- + Arbitrary contextual information stored with the destination. + Uses JSON merge-patch semantics (RFC 7396): send keys to add/update, + null values to delete keys, null for entire field to clear all. + Omit or send {} for no change. + example: { "internal-id": "123", "team": "platform" } + disabled_at: + type: string + format: date-time + nullable: true + description: >- + Update the disabled state of the destination. Send a timestamp + (must not be in the future) to disable, null to enable, or omit + to leave unchanged. + example: null # Polymorphic Destination Update Schema (for Request Bodies) DestinationUpdate: @@ -2370,6 +2592,7 @@ components: - $ref: "#/components/schemas/DestinationUpdateGCPPubSub" - $ref: "#/components/schemas/DestinationUpdateRabbitMQ" - $ref: "#/components/schemas/DestinationUpdateKafka" + - $ref: "#/components/schemas/DestinationUpdateCloudflareQueues" # Event Schemas PublishRequest: type: object @@ -2648,6 +2871,8 @@ components: - azure_servicebus - aws_s3 - gcp_pubsub + - kafka + - cloudflare_queues description: Type of destination. example: "webhook" DestinationTypeSchema: @@ -4732,7 +4957,7 @@ paths: required: true schema: type: string - enum: [webhook, aws_sqs, rabbitmq, hookdeck, aws_kinesis, azure_servicebus, aws_s3, gcp_pubsub, kafka] + enum: [webhook, aws_sqs, rabbitmq, hookdeck, aws_kinesis, azure_servicebus, aws_s3, gcp_pubsub, kafka, cloudflare_queues] description: The type of the destination. get: tags: [Schemas] diff --git a/docs/content/concepts.mdoc b/docs/content/concepts.mdoc index 4d4944a59..ce17f0078 100644 --- a/docs/content/concepts.mdoc +++ b/docs/content/concepts.mdoc @@ -97,6 +97,7 @@ The following destination types are available for your tenants to configure: - [GCP Pub/Sub](/docs/outpost/destinations/gcp-pubsub) - [RabbitMQ (AMQP)](/docs/outpost/destinations/rabbitmq) - [Kafka](/docs/outpost/destinations/kafka) +- [Cloudflare Queues](/docs/outpost/destinations/cloudflare-queues) - Amazon EventBridge (planned) **Hookdeck Outpost** is the same [open-source Outpost](https://github.com/hookdeck/outpost) project, operated on Hookdeck’s infrastructure. We do not maintain a separate hosted fork; what we run tracks the public codebase. diff --git a/docs/content/destinations/cloudflare-queues.mdoc b/docs/content/destinations/cloudflare-queues.mdoc new file mode 100644 index 000000000..9b7961c57 --- /dev/null +++ b/docs/content/destinations/cloudflare-queues.mdoc @@ -0,0 +1,83 @@ +--- +title: "Cloudflare Queues" +description: "Publish events to a Cloudflare Queue via the Cloudflare HTTP API." +--- + +Send events to a [Cloudflare Queue](https://developers.cloudflare.com/queues/) using the Cloudflare HTTP API. + +## Creating a Cloudflare Queues Destination + +```sh +curl '{% $OUTPOST_API_BASE_URL %}/tenants//destinations' \ +--header 'Content-Type: application/json' \ +--header 'Authorization: Bearer ' \ +--data '{ + "type": "cloudflare_queues", + "topics": ["orders"], + "config": { + "account_id": "", + "queue_id": "" + }, + "credentials": { + "api_token": "" + } +}' +``` + +## Configuration + +### Config + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `config.account_id` | string | Yes | Cloudflare Account ID | +| `config.queue_id` | string | Yes | Cloudflare Queue ID | + +### Credentials + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `credentials.api_token` | string | Yes | Cloudflare API Token with `queues:write` permission | + +## Message Format + +Each event is published as a single Cloudflare Queue message with `content_type: "json"`. The message body has this shape: + +```json +{ + "data": , + "metadata": +} +``` + +`metadata` contains system metadata (`event-id`, `topic`, `timestamp`) merged with any custom event metadata. + +### Example Message + +Publishing this event: + +```json +{ + "topic": "orders", + "data": { "order_id": "123", "status": "created" }, + "metadata": { "source": "checkout-service" } +} +``` + +Results in this Cloudflare Queue message body: + +```json +{ + "data": { "order_id": "123", "status": "created" }, + "metadata": { + "event-id": "evt_123", + "topic": "orders", + "timestamp": "2024-01-01T00:00:00Z", + "source": "checkout-service" + } +} +``` + +## Required Permissions + +The Cloudflare API Token must have the `Workers > Queues > Edit` permission (the `queues:write` scope), scoped to the account that owns the target queue. diff --git a/docs/content/nav.json b/docs/content/nav.json index b653a7f67..2493fea62 100644 --- a/docs/content/nav.json +++ b/docs/content/nav.json @@ -71,7 +71,8 @@ "title": "Azure Service Bus" }, { "slug": "destinations/rabbitmq", "title": "RabbitMQ" }, - { "slug": "destinations/kafka", "title": "Apache Kafka" } + { "slug": "destinations/kafka", "title": "Apache Kafka" }, + { "slug": "destinations/cloudflare-queues", "title": "Cloudflare Queues" } ] ] }, diff --git a/docs/content/overview.mdoc b/docs/content/overview.mdoc index 2d92d9c8c..c7ebe54e1 100644 --- a/docs/content/overview.mdoc +++ b/docs/content/overview.mdoc @@ -42,6 +42,7 @@ Outpost delivers events to any of the following destination types: - **[GCP Pub/Sub](/docs/outpost/destinations/gcp-pubsub)** — Publish to a Pub/Sub topic - **[RabbitMQ (AMQP)](/docs/outpost/destinations/rabbitmq)** — Send to a remote RabbitMQ exchange - **[Kafka](/docs/outpost/destinations/kafka)** — Send to a remote Kafka topic +- **[Cloudflare Queues](/docs/outpost/destinations/cloudflare-queues)** — Publish to a Cloudflare Queue via the HTTP API If you'd like to see more destination types added, [open an issue or PR](https://github.com/hookdeck/outpost/issues). diff --git a/internal/destregistry/metadata/providers/cloudflare_queues/instructions.md b/internal/destregistry/metadata/providers/cloudflare_queues/instructions.md new file mode 100644 index 000000000..1769583ae --- /dev/null +++ b/internal/destregistry/metadata/providers/cloudflare_queues/instructions.md @@ -0,0 +1,183 @@ +# Cloudflare Queues Configuration Instructions + +[Cloudflare Queues](https://developers.cloudflare.com/queues/) is a global message queue that integrates natively with Cloudflare Workers. It enables you to: + +- Send and receive messages with guaranteed delivery +- Process messages asynchronously using Workers +- Build reliable, distributed architectures +- Scale automatically with no capacity planning + +## Prerequisites + +- **Cloudflare Account**: A Cloudflare account with a Workers Paid plan (required for Queues) +- **Wrangler CLI** (optional): Install via `npm install -g wrangler` for CLI-based setup + +## How to Find Your Account ID + +Your Cloudflare Account ID is required for API access. + +### Via Dashboard + +1. Log in to the [Cloudflare Dashboard](https://dash.cloudflare.com/) +2. Select your account +3. The Account ID is displayed in the URL: `https://dash.cloudflare.com//...` +4. Alternatively, go to **Workers & Pages** > **Overview** and find the Account ID in the right sidebar + +### Via Wrangler CLI + +```bash +# Authenticate with Cloudflare +npx wrangler login + +# List accounts and their IDs +npx wrangler whoami +``` + +## How to Create a Queue + +### Via Dashboard + +1. Log in to the [Cloudflare Dashboard](https://dash.cloudflare.com/) +2. Navigate to **Workers & Pages** > **Queues** +3. Click **Create Queue** +4. Enter a name for your queue +5. Click **Create** +6. Copy the **Queue ID** from the queue details page + +### Via Wrangler CLI + +```bash +# Create a new queue +npx wrangler queues create my-queue + +# List all queues to get the Queue ID +npx wrangler queues list +``` + +The output will show your queue with its ID: + +``` +┌──────────────────────────────────────┬──────────┐ +│ id │ name │ +├──────────────────────────────────────┼──────────┤ +│ 12345678-1234-1234-1234-123456789abc │ my-queue │ +└──────────────────────────────────────┴──────────┘ +``` + +## How to Create an API Token + +You need a Cloudflare API Token with permissions to write to Queues. + +### Via Dashboard + +1. Go to [Cloudflare API Tokens](https://dash.cloudflare.com/profile/api-tokens) +2. Click **Create Token** +3. Select **Create Custom Token** +4. Configure the token: + - **Token name**: e.g., "Outpost Queues Publisher" + - **Permissions**: + - Account > Queues > Edit + - **Account Resources**: + - Include > Your Account (or specific account) +5. Click **Continue to summary** +6. Click **Create Token** +7. Copy the token immediately (it won't be shown again) + +### Permission Details + +The API Token requires the following permission: +- **Account** > **Queues** > **Edit** - This grants `queues:write` access to send messages to queues + +## Configuration + +When configuring your Cloudflare Queues destination, you'll need: + +1. **Account ID**: Your Cloudflare Account ID +2. **Queue ID**: The UUID of your Cloudflare Queue +3. **API Token**: A Cloudflare API Token with Queues write permission + +## Message Format + +Each event is published as a single Cloudflare Queue message with the following request body: + +```json +{ + "body": { + "data": , + "metadata": + }, + "content_type": "json" +} +``` + +`content_type: "json"` tells Cloudflare to deliver the body as a parsed object to consumer Workers. Messages are sent via [`POST /accounts/{account_id}/queues/{queue_id}/messages`](https://developers.cloudflare.com/api/resources/queues/subresources/messages/methods/push/). + +## Testing the Integration + +### Create a Consumer Worker + +To verify messages are being delivered, create a simple consumer Worker: + +```javascript +export default { + async queue(batch, env) { + for (const message of batch.messages) { + console.log('Received message:', JSON.stringify(message.body)); + message.ack(); + } + }, +}; +``` + +Deploy with wrangler.toml: + +```toml +name = "queue-consumer" +main = "src/index.js" + +[[queues.consumers]] +queue = "my-queue" +max_batch_size = 10 +max_batch_timeout = 30 +``` + +### View Queue Metrics + +1. Go to the [Cloudflare Dashboard](https://dash.cloudflare.com/) +2. Navigate to **Workers & Pages** > **Queues** +3. Select your queue +4. View metrics for messages sent, delivered, and acknowledged + +## Troubleshooting + +### Authentication Errors (401) + +- Verify your API Token is correct and hasn't been revoked +- Ensure the token has **Queues > Edit** permission +- Check the token is scoped to the correct account + +### Queue Not Found (404) + +- Verify the Queue ID is correct (it's a UUID, not the queue name) +- Ensure the queue exists in the account associated with your API Token +- Check the Account ID matches where the queue was created + +### Permission Denied (403) + +- Verify your API Token has the **Queues > Edit** permission +- Ensure the token is scoped to the account containing the queue + +### Rate Limiting (429) + +Cloudflare Queues has rate limits. If you encounter rate limiting: +- Implement backoff/retry logic +- Consider batching messages +- Review [Cloudflare Queues limits](https://developers.cloudflare.com/queues/platform/limits/) + +## Additional Resources + +- [Cloudflare Queues Documentation](https://developers.cloudflare.com/queues/) +- [Queues REST API Reference](https://developers.cloudflare.com/api/resources/queues/subresources/messages/methods/push/) +- [Cloudflare API Tokens](https://developers.cloudflare.com/fundamentals/api/get-started/create-token/) +- [Wrangler CLI Documentation](https://developers.cloudflare.com/workers/wrangler/) +- [Queues Pricing](https://developers.cloudflare.com/queues/platform/pricing/) diff --git a/internal/destregistry/metadata/providers/cloudflare_queues/metadata.json b/internal/destregistry/metadata/providers/cloudflare_queues/metadata.json new file mode 100644 index 000000000..1bcf794ed --- /dev/null +++ b/internal/destregistry/metadata/providers/cloudflare_queues/metadata.json @@ -0,0 +1,33 @@ +{ + "type": "cloudflare_queues", + "config_fields": [ + { + "key": "account_id", + "type": "text", + "label": "Account ID", + "description": "Your Cloudflare Account ID (32-character hex string, found in the Cloudflare dashboard).", + "required": true + }, + { + "key": "queue_id", + "type": "text", + "label": "Queue ID", + "description": "The ID of your Cloudflare Queue (32-character hex string, not the queue name).", + "required": true + } + ], + "credential_fields": [ + { + "key": "api_token", + "type": "text", + "label": "API Token", + "description": "Cloudflare API Token with queues:write permission", + "required": true, + "sensitive": true + } + ], + "label": "Cloudflare Queues", + "link": "https://developers.cloudflare.com/queues/", + "description": "Send events to Cloudflare Queues, a message queue that integrates natively with Cloudflare Workers.", + "icon": "" +} diff --git a/internal/destregistry/providers/default.go b/internal/destregistry/providers/default.go index bdbe8d81d..c80df7f24 100644 --- a/internal/destregistry/providers/default.go +++ b/internal/destregistry/providers/default.go @@ -6,6 +6,7 @@ import ( "github.com/hookdeck/outpost/internal/destregistry/providers/destawss3" "github.com/hookdeck/outpost/internal/destregistry/providers/destawssqs" "github.com/hookdeck/outpost/internal/destregistry/providers/destazureservicebus" + "github.com/hookdeck/outpost/internal/destregistry/providers/destcfqueues" "github.com/hookdeck/outpost/internal/destregistry/providers/destgcppubsub" "github.com/hookdeck/outpost/internal/destregistry/providers/desthookdeck" "github.com/hookdeck/outpost/internal/destregistry/providers/destkafka" @@ -147,5 +148,11 @@ func RegisterDefault(registry destregistry.Registry, opts RegisterDefaultDestina } registry.RegisterProvider("kafka", kafkaDest) + cloudflareQueues, err := destcfqueues.New(loader, basePublisherOpts) + if err != nil { + return err + } + registry.RegisterProvider("cloudflare_queues", cloudflareQueues) + return nil } diff --git a/internal/destregistry/providers/destcfqueues/destcfqueues.go b/internal/destregistry/providers/destcfqueues/destcfqueues.go new file mode 100644 index 000000000..3729e5157 --- /dev/null +++ b/internal/destregistry/providers/destcfqueues/destcfqueues.go @@ -0,0 +1,319 @@ +package destcfqueues + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strconv" + "time" + + "github.com/hookdeck/outpost/internal/destregistry" + "github.com/hookdeck/outpost/internal/destregistry/metadata" + "github.com/hookdeck/outpost/internal/models" +) + +const ( + cloudflareAPIBaseURL = "https://api.cloudflare.com/client/v4" + providerType = "cloudflare_queues" +) + +// CloudflareQueuesDestination implements the destregistry.Provider interface for Cloudflare Queues. +type CloudflareQueuesDestination struct { + *destregistry.BaseProvider + baseURL string +} + +// Option configures a CloudflareQueuesDestination. +type Option func(*CloudflareQueuesDestination) + +// WithBaseURL overrides the Cloudflare API base URL. Intended for tests +// pointing at an httptest.Server; production code should never set this. +func WithBaseURL(url string) Option { + return func(d *CloudflareQueuesDestination) { + d.baseURL = url + } +} + +// CloudflareQueuesConfig holds the configuration for a Cloudflare Queues destination. +type CloudflareQueuesConfig struct { + AccountID string `json:"account_id" mapstructure:"account_id"` + QueueID string `json:"queue_id" mapstructure:"queue_id"` +} + +// CloudflareQueuesCredentials holds the credentials for authenticating with Cloudflare. +type CloudflareQueuesCredentials struct { + APIToken string `json:"api_token" mapstructure:"api_token"` +} + +var _ destregistry.Provider = (*CloudflareQueuesDestination)(nil) + +// New creates a new CloudflareQueuesDestination provider. +func New(loader metadata.MetadataLoader, basePublisherOpts []destregistry.BasePublisherOption, opts ...Option) (*CloudflareQueuesDestination, error) { + base, err := destregistry.NewBaseProvider(loader, providerType, basePublisherOpts...) + if err != nil { + return nil, err + } + + d := &CloudflareQueuesDestination{ + BaseProvider: base, + baseURL: cloudflareAPIBaseURL, + } + for _, opt := range opts { + opt(d) + } + return d, nil +} + +// Validate validates the destination configuration. +func (d *CloudflareQueuesDestination) Validate(ctx context.Context, destination *models.Destination) error { + _, _, err := d.resolveMetadata(ctx, destination) + if err != nil { + return err + } + return nil +} + +// CreatePublisher creates a new publisher for the destination. +func (d *CloudflareQueuesDestination) CreatePublisher(ctx context.Context, destination *models.Destination) (destregistry.Publisher, error) { + cfg, creds, err := d.resolveMetadata(ctx, destination) + if err != nil { + return nil, err + } + + httpClient, err := destregistry.NewHTTPClient(destregistry.HTTPClientConfig{}) + if err != nil { + return nil, err + } + + return &CloudflareQueuesPublisher{ + BasePublisher: d.BaseProvider.NewPublisher(destregistry.WithDeliveryMetadata(destination.DeliveryMetadata)), + httpClient: httpClient, + baseURL: d.baseURL, + accountID: cfg.AccountID, + queueID: cfg.QueueID, + apiToken: creds.APIToken, + }, nil +} + +// ComputeTarget returns the target information for display purposes. +func (d *CloudflareQueuesDestination) ComputeTarget(destination *models.Destination) destregistry.DestinationTarget { + accountID := destination.Config["account_id"] + queueID := destination.Config["queue_id"] + + return destregistry.DestinationTarget{ + Target: queueID, + TargetURL: makeCloudflareQueuesDashboardURL(accountID, queueID), + } +} + +// resolveMetadata validates and resolves the destination configuration and credentials. +func (d *CloudflareQueuesDestination) resolveMetadata(ctx context.Context, destination *models.Destination) (*CloudflareQueuesConfig, *CloudflareQueuesCredentials, error) { + if err := d.BaseProvider.Validate(ctx, destination); err != nil { + return nil, nil, err + } + + return &CloudflareQueuesConfig{ + AccountID: destination.Config["account_id"], + QueueID: destination.Config["queue_id"], + }, &CloudflareQueuesCredentials{ + APIToken: destination.Credentials["api_token"], + }, nil +} + +// makeCloudflareQueuesDashboardURL returns the Cloudflare dashboard queues list +// for the account. CF's per-queue dashboard URL uses the queue *name* (not the +// ID we have in config), so we link to the list page rather than risk a 404. +func makeCloudflareQueuesDashboardURL(accountID, queueID string) string { + if accountID == "" || queueID == "" { + return "" + } + return fmt.Sprintf("https://dash.cloudflare.com/%s/workers/queues", accountID) +} + +// CloudflareQueuesPublisher handles publishing events to Cloudflare Queues. +type CloudflareQueuesPublisher struct { + *destregistry.BasePublisher + httpClient *http.Client + baseURL string + accountID string + queueID string + apiToken string +} + +// Close gracefully shuts down the publisher. +func (p *CloudflareQueuesPublisher) Close() error { + p.BasePublisher.StartClose() + return nil +} + +// cloudflareMessageRequest is the body for POST /accounts/{id}/queues/{id}/messages +// (single-message push). See https://developers.cloudflare.com/api/resources/queues/subresources/messages/methods/push/ +type cloudflareMessageRequest struct { + Body messageBody `json:"body"` + ContentType string `json:"content_type"` +} + +// cloudflareAPIResponse represents the response from the Cloudflare API. +type cloudflareAPIResponse struct { + Success bool `json:"success"` + Errors []cloudflareAPIError `json:"errors"` + Messages []string `json:"messages"` + Result *cloudflareResult `json:"result"` +} + +type cloudflareResult struct { + Metadata struct { + Metrics struct { + BacklogBytes int64 `json:"backlog_bytes"` + BacklogCount int64 `json:"backlog_count"` + OldestMessageTimestampMs int64 `json:"oldest_message_timestamp_ms"` + } `json:"metrics"` + } `json:"metadata"` +} + +// cloudflareAPIError represents an error from the Cloudflare API. +type cloudflareAPIError struct { + Code int `json:"code"` + Message string `json:"message"` + DocumentationURL string `json:"documentation_url,omitempty"` + Source *struct { + Pointer string `json:"pointer"` + } `json:"source,omitempty"` +} + +// messageBody is the wrapper Outpost places inside the Cloudflare message body. +type messageBody struct { + Data json.RawMessage `json:"data"` + Metadata map[string]string `json:"metadata"` +} + +// Format builds the HTTP request for publishing a single message to a Cloudflare Queue. +func (p *CloudflareQueuesPublisher) Format(ctx context.Context, event *models.Event) (*http.Request, error) { + reqPayload := cloudflareMessageRequest{ + Body: messageBody{ + Data: event.Data, + Metadata: p.BasePublisher.MakeMetadata(event, time.Now()), + }, + ContentType: "json", + } + + payloadBytes, err := json.Marshal(reqPayload) + if err != nil { + return nil, fmt.Errorf("failed to marshal request payload: %w", err) + } + + url := fmt.Sprintf("%s/accounts/%s/queues/%s/messages", p.baseURL, p.accountID, p.queueID) + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBuffer(payloadBytes)) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+p.apiToken) + + return req, nil +} + +// Publish sends an event to Cloudflare Queues. +func (p *CloudflareQueuesPublisher) Publish(ctx context.Context, event *models.Event) (*destregistry.Delivery, error) { + if err := p.BasePublisher.StartPublish(); err != nil { + return nil, err + } + defer p.BasePublisher.FinishPublish() + + req, err := p.Format(ctx, event) + if err != nil { + return nil, err + } + + resp, err := p.httpClient.Do(req) + if err != nil { + return &destregistry.Delivery{ + Status: "failed", + Code: "ERR", + Response: map[string]interface{}{ + "error": err.Error(), + }, + }, destregistry.NewErrDestinationPublishAttempt(err, providerType, map[string]interface{}{ + "error": err.Error(), + }) + } + defer resp.Body.Close() + + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + return &destregistry.Delivery{ + Status: "failed", + Code: "ERR", + Response: map[string]interface{}{ + "error": fmt.Sprintf("failed to read response body: %s", err.Error()), + }, + }, destregistry.NewErrDestinationPublishAttempt(err, providerType, map[string]interface{}{ + "error": fmt.Sprintf("failed to read response body: %s", err.Error()), + }) + } + + statusCode := strconv.Itoa(resp.StatusCode) + + // Any non-2xx is a failure regardless of body parseability. + if resp.StatusCode >= 400 { + var apiResponse cloudflareAPIResponse + errorMsg := fmt.Sprintf("request failed with status %d", resp.StatusCode) + response := map[string]interface{}{ + "status": resp.StatusCode, + "body": string(bodyBytes), + } + if json.Unmarshal(bodyBytes, &apiResponse) == nil && len(apiResponse.Errors) > 0 { + errorMsg = apiResponse.Errors[0].Message + response["errors"] = apiResponse.Errors + } + return &destregistry.Delivery{ + Status: "failed", + Code: statusCode, + Response: response, + }, destregistry.NewErrDestinationPublishAttempt( + fmt.Errorf("cloudflare API error: %s", errorMsg), + providerType, + response, + ) + } + + // 2xx. Try to parse — if the body is unparseable but status is OK, + // trust the status. CF returns valid JSON on success; only weird + // proxies would land us here. + var apiResponse cloudflareAPIResponse + if err := json.Unmarshal(bodyBytes, &apiResponse); err == nil { + if !apiResponse.Success { + errorMsg := "cloudflare API reported success=false" + if len(apiResponse.Errors) > 0 { + errorMsg = apiResponse.Errors[0].Message + } + response := map[string]interface{}{ + "status": resp.StatusCode, + "success": apiResponse.Success, + "errors": apiResponse.Errors, + } + return &destregistry.Delivery{ + Status: "failed", + Code: statusCode, + Response: response, + }, destregistry.NewErrDestinationPublishAttempt( + fmt.Errorf("cloudflare API error: %s", errorMsg), + providerType, + response, + ) + } + } + + return &destregistry.Delivery{ + Status: "success", + Code: "OK", + Response: map[string]interface{}{ + "status": resp.StatusCode, + }, + }, nil +} diff --git a/internal/destregistry/providers/destcfqueues/destcfqueues_publish_test.go b/internal/destregistry/providers/destcfqueues/destcfqueues_publish_test.go new file mode 100644 index 000000000..39c61089a --- /dev/null +++ b/internal/destregistry/providers/destcfqueues/destcfqueues_publish_test.go @@ -0,0 +1,265 @@ +package destcfqueues_test + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/hookdeck/outpost/internal/destregistry/providers/destcfqueues" + "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// successResponseJSON mirrors a real Cloudflare Queues push success response. +// Captured from CF docs; replace with a recorded fixture once we have live creds. +const successResponseJSON = `{ + "success": true, + "result": { + "metadata": { + "metrics": { + "backlog_bytes": 1024, + "backlog_count": 5, + "oldest_message_timestamp_ms": 1710950954154 + } + } + }, + "messages": [], + "errors": [] +}` + +// Roundtrip guard: if CF's documented success shape ever stops decoding cleanly +// into our types, this fails loudly instead of degrading silently to "failed". +func TestCloudflareAPIResponse_DecodesDocumentedShape(t *testing.T) { + t.Parallel() + var parsed struct { + Success bool `json:"success"` + Result *struct { + Metadata struct { + Metrics struct { + BacklogBytes int64 `json:"backlog_bytes"` + BacklogCount int64 `json:"backlog_count"` + OldestMessageTimestampMs int64 `json:"oldest_message_timestamp_ms"` + } `json:"metrics"` + } `json:"metadata"` + } `json:"result"` + } + require.NoError(t, json.Unmarshal([]byte(successResponseJSON), &parsed)) + assert.True(t, parsed.Success) + require.NotNil(t, parsed.Result) + assert.Equal(t, int64(1024), parsed.Result.Metadata.Metrics.BacklogBytes) +} + +func newPublisher(t *testing.T, serverURL string) *destcfqueues.CloudflareQueuesPublisher { + t.Helper() + var opts []destcfqueues.Option + if serverURL != "" { + opts = append(opts, destcfqueues.WithBaseURL(serverURL)) + } + provider, err := destcfqueues.New(testutil.Registry.MetadataLoader(), nil, opts...) + require.NoError(t, err) + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("cloudflare_queues"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "account_id": "test-account-id", + "queue_id": "test-queue-id", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "api_token": "test-api-token", + }), + ) + publisher, err := provider.CreatePublisher(context.Background(), &destination) + require.NoError(t, err) + return publisher.(*destcfqueues.CloudflareQueuesPublisher) +} + +func TestCloudflareQueuesPublisher_Format(t *testing.T) { + t.Parallel() + publisher := newPublisher(t, "") + defer publisher.Close() + + event := testutil.EventFactory.Any( + testutil.EventFactory.WithID("evt_123"), + testutil.EventFactory.WithTopic("order.created"), + testutil.EventFactory.WithDataMap(map[string]interface{}{ + "order_id": "test-order-123", + "amount": 99.99, + }), + testutil.EventFactory.WithMetadata(map[string]string{ + "source": "test-service", + }), + ) + + req, err := publisher.Format(context.Background(), &event) + require.NoError(t, err) + + assert.Equal(t, http.MethodPost, req.Method) + assert.Equal(t, "https://api.cloudflare.com/client/v4/accounts/test-account-id/queues/test-queue-id/messages", req.URL.String()) + assert.Equal(t, "application/json", req.Header.Get("Content-Type")) + assert.Equal(t, "Bearer test-api-token", req.Header.Get("Authorization")) + + bodyBytes, err := io.ReadAll(req.Body) + require.NoError(t, err) + + var payload struct { + Body struct { + Data map[string]interface{} `json:"data"` + Metadata map[string]string `json:"metadata"` + } `json:"body"` + ContentType string `json:"content_type"` + } + require.NoError(t, json.Unmarshal(bodyBytes, &payload)) + + assert.Equal(t, "json", payload.ContentType, "must set content_type=json so CF consumers decode JSON correctly") + assert.Equal(t, "test-order-123", payload.Body.Data["order_id"]) + assert.Equal(t, 99.99, payload.Body.Data["amount"]) + assert.Equal(t, "evt_123", payload.Body.Metadata["event-id"]) + assert.Equal(t, "order.created", payload.Body.Metadata["topic"]) + assert.Equal(t, "test-service", payload.Body.Metadata["source"]) + assert.NotEmpty(t, payload.Body.Metadata["timestamp"]) +} + +func TestCloudflareQueuesPublisher_Publish_Success(t *testing.T) { + t.Parallel() + + var receivedBody []byte + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodPost, r.Method) + assert.Equal(t, "application/json", r.Header.Get("Content-Type")) + assert.Equal(t, "Bearer test-api-token", r.Header.Get("Authorization")) + assert.True(t, strings.HasSuffix(r.URL.Path, "/accounts/test-account-id/queues/test-queue-id/messages")) + receivedBody, _ = io.ReadAll(r.Body) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(successResponseJSON)) + })) + defer server.Close() + + publisher := newPublisher(t, server.URL) + defer publisher.Close() + + event := testutil.EventFactory.Any( + testutil.EventFactory.WithID("evt_123"), + testutil.EventFactory.WithTopic("order.created"), + testutil.EventFactory.WithDataMap(map[string]interface{}{"order_id": "test-order-123"}), + ) + + delivery, err := publisher.Publish(context.Background(), &event) + require.NoError(t, err) + assert.Equal(t, "success", delivery.Status) + assert.Equal(t, "OK", delivery.Code) + + // Verify server received the right single-message shape (not a batch wrapper). + var sent map[string]interface{} + require.NoError(t, json.Unmarshal(receivedBody, &sent)) + _, hasMessagesWrapper := sent["messages"] + assert.False(t, hasMessagesWrapper, "must not wrap in a batch 'messages' array — single-message endpoint") + assert.Equal(t, "json", sent["content_type"]) + assert.NotNil(t, sent["body"]) +} + +func TestCloudflareQueuesPublisher_Publish_HTTPSuccess(t *testing.T) { + t.Parallel() + + for _, statusCode := range []int{http.StatusOK, http.StatusCreated, http.StatusAccepted} { + statusCode := statusCode + t.Run(http.StatusText(statusCode), func(t *testing.T) { + t.Parallel() + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(statusCode) + _, _ = w.Write([]byte(successResponseJSON)) + })) + defer server.Close() + + publisher := newPublisher(t, server.URL) + defer publisher.Close() + + event := testutil.EventFactory.Any( + testutil.EventFactory.WithDataMap(map[string]interface{}{"key": "value"}), + ) + delivery, err := publisher.Publish(context.Background(), &event) + require.NoError(t, err) + assert.Equal(t, "success", delivery.Status) + assert.Equal(t, "OK", delivery.Code) + }) + } +} + +func TestCloudflareQueuesPublisher_Publish_APIError(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + statusCode int + responseBody string + expectedCode string + }{ + { + name: "401 Unauthorized", + statusCode: http.StatusUnauthorized, + responseBody: `{"success":false,"errors":[{"code":10000,"message":"Authentication error","documentation_url":"https://developers.cloudflare.com/api","source":{"pointer":"/"}}],"messages":[],"result":null}`, + expectedCode: "401", + }, + { + name: "403 Forbidden", + statusCode: http.StatusForbidden, + responseBody: `{"success":false,"errors":[{"code":10001,"message":"Access denied"}],"messages":[],"result":null}`, + expectedCode: "403", + }, + { + name: "404 Not Found", + statusCode: http.StatusNotFound, + responseBody: `{"success":false,"errors":[{"code":10002,"message":"Queue not found"}],"messages":[],"result":null}`, + expectedCode: "404", + }, + { + name: "500 Internal Server Error", + statusCode: http.StatusInternalServerError, + responseBody: `{"success":false,"errors":[{"code":10003,"message":"Internal error"}],"messages":[],"result":null}`, + expectedCode: "500", + }, + { + name: "200 OK but success=false", + statusCode: http.StatusOK, + responseBody: `{"success":false,"errors":[{"code":10004,"message":"Validation error"}],"messages":[],"result":null}`, + expectedCode: "200", + }, + { + name: "unparseable error body still returns failure", + statusCode: http.StatusBadGateway, + responseBody: `bad gateway`, + expectedCode: "502", + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(tt.statusCode) + _, _ = w.Write([]byte(tt.responseBody)) + })) + defer server.Close() + + publisher := newPublisher(t, server.URL) + defer publisher.Close() + + event := testutil.EventFactory.Any( + testutil.EventFactory.WithDataMap(map[string]interface{}{"key": "value"}), + ) + delivery, err := publisher.Publish(context.Background(), &event) + require.Error(t, err) + require.NotNil(t, delivery) + assert.Equal(t, "failed", delivery.Status) + assert.Equal(t, tt.expectedCode, delivery.Code) + }) + } +} + diff --git a/internal/destregistry/providers/destcfqueues/destcfqueues_validate_test.go b/internal/destregistry/providers/destcfqueues/destcfqueues_validate_test.go new file mode 100644 index 000000000..32473eb52 --- /dev/null +++ b/internal/destregistry/providers/destcfqueues/destcfqueues_validate_test.go @@ -0,0 +1,130 @@ +package destcfqueues_test + +import ( + "context" + "testing" + + "github.com/hookdeck/outpost/internal/destregistry" + "github.com/hookdeck/outpost/internal/destregistry/providers/destcfqueues" + "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCloudflareQueuesDestination_Validate(t *testing.T) { + t.Parallel() + + validDestination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("cloudflare_queues"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "account_id": "test-account-id", + "queue_id": "test-queue-id", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "api_token": "test-api-token", + }), + ) + + cloudflareQueuesDestination, err := destcfqueues.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + t.Run("should validate valid destination", func(t *testing.T) { + t.Parallel() + assert.NoError(t, cloudflareQueuesDestination.Validate(context.Background(), &validDestination)) + }) + + t.Run("should validate invalid type", func(t *testing.T) { + t.Parallel() + invalidDestination := validDestination + invalidDestination.Type = "invalid" + err := cloudflareQueuesDestination.Validate(context.Background(), &invalidDestination) + var validationErr *destregistry.ErrDestinationValidation + assert.ErrorAs(t, err, &validationErr) + assert.Equal(t, "type", validationErr.Errors[0].Field) + assert.Equal(t, "invalid_type", validationErr.Errors[0].Type) + }) + + t.Run("should validate missing account_id", func(t *testing.T) { + t.Parallel() + invalidDestination := validDestination + invalidDestination.Config = map[string]string{ + "queue_id": "test-queue-id", + } + err := cloudflareQueuesDestination.Validate(context.Background(), &invalidDestination) + var validationErr *destregistry.ErrDestinationValidation + assert.ErrorAs(t, err, &validationErr) + assert.Equal(t, "config.account_id", validationErr.Errors[0].Field) + assert.Equal(t, "required", validationErr.Errors[0].Type) + }) + + t.Run("should validate missing queue_id", func(t *testing.T) { + t.Parallel() + invalidDestination := validDestination + invalidDestination.Config = map[string]string{ + "account_id": "test-account-id", + } + err := cloudflareQueuesDestination.Validate(context.Background(), &invalidDestination) + var validationErr *destregistry.ErrDestinationValidation + assert.ErrorAs(t, err, &validationErr) + assert.Equal(t, "config.queue_id", validationErr.Errors[0].Field) + assert.Equal(t, "required", validationErr.Errors[0].Type) + }) + + t.Run("should validate missing api_token", func(t *testing.T) { + t.Parallel() + invalidDestination := validDestination + invalidDestination.Credentials = map[string]string{} + err := cloudflareQueuesDestination.Validate(context.Background(), &invalidDestination) + var validationErr *destregistry.ErrDestinationValidation + assert.ErrorAs(t, err, &validationErr) + assert.Equal(t, "credentials.api_token", validationErr.Errors[0].Field) + assert.Equal(t, "required", validationErr.Errors[0].Type) + }) +} + +func TestCloudflareQueuesDestination_ComputeTarget(t *testing.T) { + t.Parallel() + + cloudflareQueuesDestination, err := destcfqueues.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + t.Run("should return queue_id as target and dashboard URL", func(t *testing.T) { + t.Parallel() + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("cloudflare_queues"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "account_id": "my-account-123", + "queue_id": "my-queue-456", + }), + ) + target := cloudflareQueuesDestination.ComputeTarget(&destination) + assert.Equal(t, "my-queue-456", target.Target) + assert.Equal(t, "https://dash.cloudflare.com/my-account-123/workers/queues", target.TargetURL) + }) + + t.Run("should return empty target URL when account_id is missing", func(t *testing.T) { + t.Parallel() + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("cloudflare_queues"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "queue_id": "my-queue-456", + }), + ) + target := cloudflareQueuesDestination.ComputeTarget(&destination) + assert.Equal(t, "my-queue-456", target.Target) + assert.Equal(t, "", target.TargetURL) + }) + + t.Run("should return empty target URL when queue_id is missing", func(t *testing.T) { + t.Parallel() + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("cloudflare_queues"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "account_id": "my-account-123", + }), + ) + target := cloudflareQueuesDestination.ComputeTarget(&destination) + assert.Equal(t, "", target.Target) + assert.Equal(t, "", target.TargetURL) + }) +} diff --git a/spec-sdk-tests/factories/destination.factory.ts b/spec-sdk-tests/factories/destination.factory.ts index 99cdb1674..01bc800f9 100644 --- a/spec-sdk-tests/factories/destination.factory.ts +++ b/spec-sdk-tests/factories/destination.factory.ts @@ -143,3 +143,4 @@ export function createGcpPubSubDestination( ...overrides, }; } + diff --git a/spec-sdk-tests/package.json b/spec-sdk-tests/package.json index 01a1cf691..147fe3e41 100644 --- a/spec-sdk-tests/package.json +++ b/spec-sdk-tests/package.json @@ -26,7 +26,7 @@ "author": "Outpost Team", "license": "Apache-2.0", "dependencies": { - "@hookdeck/outpost-sdk": "file:../../../sdks/outpost-typescript" + "@hookdeck/outpost-sdk": "file:../sdks/outpost-typescript" }, "devDependencies": { "@stoplight/spectral-cli": "^6.11.0",