From 0201476b06a79be93951b347a495b51368ccfd50 Mon Sep 17 00:00:00 2001 From: Peep van Puijenbroek Date: Thu, 22 Jan 2026 15:47:30 +0100 Subject: [PATCH 1/5] feat: add Cloudflare Queues destination Add support for publishing events to Cloudflare Queues via their HTTP API. Implements #655 - Add destcfqueues Go provider with Publish, Validate, and ComputeTarget - Add provider metadata (metadata.json and instructions.md) - Update OpenAPI spec with CloudflareQueues schemas - Add user documentation (cloudflare-queues.mdx) - Register provider in default.go - Add validation and publish unit tests - Add SDK test factory and integration tests --- docs/apis/openapi.yaml | 171 +++++++ docs/pages/destinations.mdx | 1 + docs/pages/destinations/cloudflare-queues.mdx | 92 ++++ .../cloudflare_queues/instructions.md | 176 +++++++ .../providers/cloudflare_queues/metadata.json | 33 ++ internal/destregistry/providers/default.go | 7 + .../providers/destcfqueues/destcfqueues.go | 294 ++++++++++++ .../destcfqueues/destcfqueues_publish_test.go | 394 ++++++++++++++++ .../destcfqueues_validate_test.go | 130 ++++++ .../factories/destination.factory.ts | 18 + spec-sdk-tests/package.json | 2 +- .../destinations/cloudflare-queues.test.ts | 440 ++++++++++++++++++ 12 files changed, 1757 insertions(+), 1 deletion(-) create mode 100644 docs/pages/destinations/cloudflare-queues.mdx create mode 100644 internal/destregistry/metadata/providers/cloudflare_queues/instructions.md create mode 100644 internal/destregistry/metadata/providers/cloudflare_queues/metadata.json create mode 100644 internal/destregistry/providers/destcfqueues/destcfqueues.go create mode 100644 internal/destregistry/providers/destcfqueues/destcfqueues_publish_test.go create mode 100644 internal/destregistry/providers/destcfqueues/destcfqueues_validate_test.go create mode 100644 spec-sdk-tests/tests/destinations/cloudflare-queues.test.ts diff --git a/docs/apis/openapi.yaml b/docs/apis/openapi.yaml index 39760b654..6ce27e008 100644 --- a/docs/apis/openapi.yaml +++ b/docs/apis/openapi.yaml @@ -268,6 +268,25 @@ components: type: string description: Optional AWS Session Token (for temporary credentials). example: "AQoDYXdzEPT//////////wEXAMPLE..." + CloudflareQueuesConfig: + type: object + required: [account_id, queue_id] + properties: + account_id: + type: string + description: Cloudflare Account ID + example: "abc123def456789" + queue_id: + type: string + description: Cloudflare Queue ID + example: "my-queue" + CloudflareQueuesCredentials: + type: object + required: [api_token] + properties: + api_token: + type: string + description: Cloudflare API Token with queues:write permission RabbitMQConfig: type: object required: [server_url, exchange] @@ -1094,6 +1113,92 @@ components: credentials: service_account_json: '{"type":"service_account","project_id":"my-project-123",...}' + DestinationCloudflareQueues: + type: object + required: + [ + id, + type, + topics, + config, + credentials, + created_at, + updated_at, + disabled_at, + ] + properties: + id: + type: string + description: Control plane generated ID or user provided ID for the destination. + example: "des_12345" + type: + type: string + description: Type of the destination. + enum: [cloudflare_queues] + example: "cloudflare_queues" + topics: + $ref: "#/components/schemas/Topics" + filter: + $ref: "#/components/schemas/Filter" + disabled_at: + type: string + format: date-time + nullable: true + description: ISO Date when the destination was disabled, or null if enabled. + example: null + created_at: + type: string + format: date-time + description: ISO Date when the destination was created. + example: "2024-01-01T00:00:00Z" + updated_at: + type: string + format: date-time + description: ISO Date when the destination was last updated. + example: "2024-01-01T00:00:00Z" + config: + $ref: "#/components/schemas/CloudflareQueuesConfig" + credentials: + $ref: "#/components/schemas/CloudflareQueuesCredentials" + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } + target: + type: string + description: A human-readable representation of the destination target (Cloudflare Queue ID). Read-only. + readOnly: true + example: "my-queue" + target_url: + type: string + format: url + nullable: true + description: A URL link to the destination target (Cloudflare Dashboard link to the queue). Read-only. + readOnly: true + example: "https://dash.cloudflare.com/abc123def456789/queues/my-queue" + example: + id: "des_cf_queues_123" + type: "cloudflare_queues" + topics: ["*"] + disabled_at: null + created_at: "2024-03-10T14:30:00Z" + updated_at: "2024-03-10T14:30:00Z" + config: + account_id: "abc123def456789" + queue_id: "my-queue" + credentials: + api_token: "cf_token_..." + # Polymorphic Destination Schema (for Responses) Destination: oneOf: @@ -1105,6 +1210,7 @@ components: - $ref: "#/components/schemas/DestinationAzureServiceBus" - $ref: "#/components/schemas/DestinationAWSS3" - $ref: "#/components/schemas/DestinationGCPPubSub" + - $ref: "#/components/schemas/DestinationCloudflareQueues" discriminator: propertyName: type mapping: @@ -1116,6 +1222,7 @@ components: azure_servicebus: "#/components/schemas/DestinationAzureServiceBus" aws_s3: "#/components/schemas/DestinationAWSS3" gcp_pubsub: "#/components/schemas/DestinationGCPPubSub" + cloudflare_queues: "#/components/schemas/DestinationCloudflareQueues" DestinationCreateWebhook: type: object @@ -1391,6 +1498,40 @@ components: nullable: true description: Arbitrary contextual information stored with the destination. example: { "internal-id": "123", "team": "platform" } + DestinationCreateCloudflareQueues: + type: object + required: [type, topics, config, credentials] + properties: + id: + type: string + description: Optional user-provided ID. A UUID will be generated if empty. + example: "user-provided-id" + type: + type: string + description: Type of the destination. Must be 'cloudflare_queues'. + enum: [cloudflare_queues] + topics: + $ref: "#/components/schemas/Topics" + filter: + $ref: "#/components/schemas/Filter" + config: + $ref: "#/components/schemas/CloudflareQueuesConfig" + credentials: + $ref: "#/components/schemas/CloudflareQueuesCredentials" + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } # Polymorphic Destination Creation Schema (for Request Bodies) DestinationCreate: @@ -1403,6 +1544,7 @@ components: - $ref: "#/components/schemas/DestinationCreateAzureServiceBus" - $ref: "#/components/schemas/DestinationCreateAWSS3" - $ref: "#/components/schemas/DestinationCreateGCPPubSub" + - $ref: "#/components/schemas/DestinationCreateCloudflareQueues" discriminator: propertyName: type mapping: @@ -1414,6 +1556,7 @@ components: azure_servicebus: "#/components/schemas/DestinationCreateAzureServiceBus" aws_s3: "#/components/schemas/DestinationCreateAWSS3" gcp_pubsub: "#/components/schemas/DestinationCreateGCPPubSub" + cloudflare_queues: "#/components/schemas/DestinationCreateCloudflareQueues" # Type-Specific Destination Update Schemas (for Request Bodies) WebhookCredentialsUpdate: @@ -1640,6 +1783,32 @@ components: nullable: true description: Arbitrary contextual information stored with the destination. example: { "internal-id": "123", "team": "platform" } + DestinationUpdateCloudflareQueues: + type: object + # Properties duplicated from DestinationUpdateBase + properties: + topics: + $ref: "#/components/schemas/Topics" + filter: + $ref: "#/components/schemas/Filter" + config: + $ref: "#/components/schemas/CloudflareQueuesConfig" # account_id/queue_id required here, but PATCH means optional + credentials: + $ref: "#/components/schemas/CloudflareQueuesCredentials" # api_token required here, but PATCH means optional + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } # Polymorphic Destination Update Schema (for Request Bodies) DestinationUpdate: @@ -1652,6 +1821,7 @@ components: - $ref: "#/components/schemas/DestinationUpdateAzureServiceBus" - $ref: "#/components/schemas/DestinationUpdateAWSS3" - $ref: "#/components/schemas/DestinationUpdateGCPPubSub" + - $ref: "#/components/schemas/DestinationUpdateCloudflareQueues" # Event Schemas PublishRequest: type: object @@ -1908,6 +2078,7 @@ components: - azure_servicebus - aws_s3 - gcp_pubsub + - cloudflare_queues description: Type of destination. example: "webhook" DestinationTypeSchema: diff --git a/docs/pages/destinations.mdx b/docs/pages/destinations.mdx index a902f88ec..41196a0a8 100644 --- a/docs/pages/destinations.mdx +++ b/docs/pages/destinations.mdx @@ -18,6 +18,7 @@ Outpost supports multiple event destination types. Each tenant can have multiple | [Azure Service Bus](/docs/destinations/azure-service-bus) | Send events to Azure Service Bus | | [GCP Pub/Sub](/docs/destinations/gcp-pubsub) | Publish events to Google Cloud Pub/Sub | | [RabbitMQ](/docs/destinations/rabbitmq) | Send events to a RabbitMQ exchange | +| [Cloudflare Queues](/docs/destinations/cloudflare-queues) | Send events to Cloudflare Queues | See the [roadmap](/docs/references/roadmap) for planned destination types. To be eligible as a destination type, it must be asynchronous in nature and not run any business logic. diff --git a/docs/pages/destinations/cloudflare-queues.mdx b/docs/pages/destinations/cloudflare-queues.mdx new file mode 100644 index 000000000..1dab186d7 --- /dev/null +++ b/docs/pages/destinations/cloudflare-queues.mdx @@ -0,0 +1,92 @@ +--- +title: Cloudflare Queues +--- + +Send events to Cloudflare Queues. + +## Configuration + +### Config + +| Field | Type | Required | Description | +| ----- | ---- | -------- | ----------- | +| `config.account_id` | string | Yes | Cloudflare Account ID | +| `config.queue_id` | string | Yes | Queue ID | + +### Credentials + +| Field | Type | Required | Description | +| ----- | ---- | -------- | ----------- | +| `credentials.api_token` | string | Yes | Cloudflare API Token | + +### Example + +```sh +curl --location 'https:///api/v1/tenants//destinations' \ +--header 'Content-Type: application/json' \ +--header 'Authorization: Bearer ' \ +--data '{ + "type": "cloudflare_queues", + "topics": ["orders"], + "config": { + "account_id": "", + "queue_id": "" + }, + "credentials": { + "api_token": "" + } +}' +``` + +## Message Format + +Events are sent to Cloudflare Queues as JSON messages with the following structure: + +```json +{ + "data": , + "metadata": +} +``` + +### Example Message + +If you publish an event: + +```json +{ + "topic": "orders", + "data": { + "order_id": "123", + "status": "created" + }, + "metadata": { + "source": "checkout-service" + } +} +``` + +The message sent to Cloudflare Queues will be: + +```json +{ + "data": { + "order_id": "123", + "status": "created" + }, + "metadata": { + "event-id": "evt_123", + "topic": "orders", + "timestamp": "1704067200", + "source": "checkout-service" + } +} +``` + +The `metadata` field contains system metadata (`event-id`, `topic`, `timestamp`) merged with any custom event metadata. + +## Required Permissions + +The Cloudflare API Token must have the following permission: + +- `queues:write` - Required to send messages to the queue diff --git a/internal/destregistry/metadata/providers/cloudflare_queues/instructions.md b/internal/destregistry/metadata/providers/cloudflare_queues/instructions.md new file mode 100644 index 000000000..0fce38722 --- /dev/null +++ b/internal/destregistry/metadata/providers/cloudflare_queues/instructions.md @@ -0,0 +1,176 @@ +# Cloudflare Queues Configuration Instructions + +[Cloudflare Queues](https://developers.cloudflare.com/queues/) is a global message queue that integrates natively with Cloudflare Workers. It enables you to: + +- Send and receive messages with guaranteed delivery +- Process messages asynchronously using Workers +- Build reliable, distributed architectures +- Scale automatically with no capacity planning + +## Prerequisites + +- **Cloudflare Account**: A Cloudflare account with a Workers Paid plan (required for Queues) +- **Wrangler CLI** (optional): Install via `npm install -g wrangler` for CLI-based setup + +## How to Find Your Account ID + +Your Cloudflare Account ID is required for API access. + +### Via Dashboard + +1. Log in to the [Cloudflare Dashboard](https://dash.cloudflare.com/) +2. Select your account +3. The Account ID is displayed in the URL: `https://dash.cloudflare.com//...` +4. Alternatively, go to **Workers & Pages** > **Overview** and find the Account ID in the right sidebar + +### Via Wrangler CLI + +```bash +# Authenticate with Cloudflare +npx wrangler login + +# List accounts and their IDs +npx wrangler whoami +``` + +## How to Create a Queue + +### Via Dashboard + +1. Log in to the [Cloudflare Dashboard](https://dash.cloudflare.com/) +2. Navigate to **Workers & Pages** > **Queues** +3. Click **Create Queue** +4. Enter a name for your queue +5. Click **Create** +6. Copy the **Queue ID** from the queue details page + +### Via Wrangler CLI + +```bash +# Create a new queue +npx wrangler queues create my-queue + +# List all queues to get the Queue ID +npx wrangler queues list +``` + +The output will show your queue with its ID: + +``` +┌──────────────────────────────────────┬──────────┐ +│ id │ name │ +├──────────────────────────────────────┼──────────┤ +│ 12345678-1234-1234-1234-123456789abc │ my-queue │ +└──────────────────────────────────────┴──────────┘ +``` + +## How to Create an API Token + +You need a Cloudflare API Token with permissions to write to Queues. + +### Via Dashboard + +1. Go to [Cloudflare API Tokens](https://dash.cloudflare.com/profile/api-tokens) +2. Click **Create Token** +3. Select **Create Custom Token** +4. Configure the token: + - **Token name**: e.g., "Outpost Queues Publisher" + - **Permissions**: + - Account > Queues > Edit + - **Account Resources**: + - Include > Your Account (or specific account) +5. Click **Continue to summary** +6. Click **Create Token** +7. Copy the token immediately (it won't be shown again) + +### Permission Details + +The API Token requires the following permission: +- **Account** > **Queues** > **Edit** - This grants `queues:write` access to send messages to queues + +## Configuration + +When configuring your Cloudflare Queues destination, you'll need: + +1. **Account ID**: Your Cloudflare Account ID +2. **Queue ID**: The UUID of your Cloudflare Queue +3. **API Token**: A Cloudflare API Token with Queues write permission + +## Message Format + +When events are sent to Cloudflare Queues, each message contains: + +- **body**: The event payload as a JSON object +- **contentType**: Set to `application/json` + +Messages are sent using the [Cloudflare Queues REST API](https://developers.cloudflare.com/api/operations/queue-send-messages). + +## Testing the Integration + +### Create a Consumer Worker + +To verify messages are being delivered, create a simple consumer Worker: + +```javascript +export default { + async queue(batch, env) { + for (const message of batch.messages) { + console.log('Received message:', JSON.stringify(message.body)); + message.ack(); + } + }, +}; +``` + +Deploy with wrangler.toml: + +```toml +name = "queue-consumer" +main = "src/index.js" + +[[queues.consumers]] +queue = "my-queue" +max_batch_size = 10 +max_batch_timeout = 30 +``` + +### View Queue Metrics + +1. Go to the [Cloudflare Dashboard](https://dash.cloudflare.com/) +2. Navigate to **Workers & Pages** > **Queues** +3. Select your queue +4. View metrics for messages sent, delivered, and acknowledged + +## Troubleshooting + +### Authentication Errors (401) + +- Verify your API Token is correct and hasn't been revoked +- Ensure the token has **Queues > Edit** permission +- Check the token is scoped to the correct account + +### Queue Not Found (404) + +- Verify the Queue ID is correct (it's a UUID, not the queue name) +- Ensure the queue exists in the account associated with your API Token +- Check the Account ID matches where the queue was created + +### Permission Denied (403) + +- Verify your API Token has the **Queues > Edit** permission +- Ensure the token is scoped to the account containing the queue + +### Rate Limiting (429) + +Cloudflare Queues has rate limits. If you encounter rate limiting: +- Implement backoff/retry logic +- Consider batching messages +- Review [Cloudflare Queues limits](https://developers.cloudflare.com/queues/platform/limits/) + +## Additional Resources + +- [Cloudflare Queues Documentation](https://developers.cloudflare.com/queues/) +- [Queues REST API Reference](https://developers.cloudflare.com/api/operations/queue-send-messages) +- [Cloudflare API Tokens](https://developers.cloudflare.com/fundamentals/api/get-started/create-token/) +- [Wrangler CLI Documentation](https://developers.cloudflare.com/workers/wrangler/) +- [Queues Pricing](https://developers.cloudflare.com/queues/platform/pricing/) diff --git a/internal/destregistry/metadata/providers/cloudflare_queues/metadata.json b/internal/destregistry/metadata/providers/cloudflare_queues/metadata.json new file mode 100644 index 000000000..fe4e20ab1 --- /dev/null +++ b/internal/destregistry/metadata/providers/cloudflare_queues/metadata.json @@ -0,0 +1,33 @@ +{ + "type": "cloudflare_queues", + "config_fields": [ + { + "key": "account_id", + "type": "text", + "label": "Account ID", + "description": "Your Cloudflare Account ID", + "required": true + }, + { + "key": "queue_id", + "type": "text", + "label": "Queue ID", + "description": "The ID of your Cloudflare Queue", + "required": true + } + ], + "credential_fields": [ + { + "key": "api_token", + "type": "text", + "label": "API Token", + "description": "Cloudflare API Token with queues:write permission", + "required": true, + "sensitive": true + } + ], + "label": "Cloudflare Queues", + "link": "https://developers.cloudflare.com/queues/", + "description": "Send events to Cloudflare Queues, a message queue that integrates natively with Cloudflare Workers.", + "icon": "" +} diff --git a/internal/destregistry/providers/default.go b/internal/destregistry/providers/default.go index ae19b0dfb..b47a3a95c 100644 --- a/internal/destregistry/providers/default.go +++ b/internal/destregistry/providers/default.go @@ -6,6 +6,7 @@ import ( "github.com/hookdeck/outpost/internal/destregistry/providers/destawss3" "github.com/hookdeck/outpost/internal/destregistry/providers/destawssqs" "github.com/hookdeck/outpost/internal/destregistry/providers/destazureservicebus" + "github.com/hookdeck/outpost/internal/destregistry/providers/destcfqueues" "github.com/hookdeck/outpost/internal/destregistry/providers/destgcppubsub" "github.com/hookdeck/outpost/internal/destregistry/providers/desthookdeck" "github.com/hookdeck/outpost/internal/destregistry/providers/destrabbitmq" @@ -138,5 +139,11 @@ func RegisterDefault(registry destregistry.Registry, opts RegisterDefaultDestina } registry.RegisterProvider("rabbitmq", rabbitmq) + cloudflareQueues, err := destcfqueues.New(loader, basePublisherOpts) + if err != nil { + return err + } + registry.RegisterProvider("cloudflare_queues", cloudflareQueues) + return nil } diff --git a/internal/destregistry/providers/destcfqueues/destcfqueues.go b/internal/destregistry/providers/destcfqueues/destcfqueues.go new file mode 100644 index 000000000..964001ed9 --- /dev/null +++ b/internal/destregistry/providers/destcfqueues/destcfqueues.go @@ -0,0 +1,294 @@ +package destcfqueues + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "github.com/hookdeck/outpost/internal/destregistry" + "github.com/hookdeck/outpost/internal/destregistry/metadata" + "github.com/hookdeck/outpost/internal/models" +) + +const ( + cloudflareAPIBaseURL = "https://api.cloudflare.com/client/v4" + providerType = "cloudflare_queues" +) + +// CloudflareQueuesDestination implements the destregistry.Provider interface for Cloudflare Queues. +type CloudflareQueuesDestination struct { + *destregistry.BaseProvider +} + +// CloudflareQueuesConfig holds the configuration for a Cloudflare Queues destination. +type CloudflareQueuesConfig struct { + AccountID string `json:"account_id" mapstructure:"account_id"` + QueueID string `json:"queue_id" mapstructure:"queue_id"` +} + +// CloudflareQueuesCredentials holds the credentials for authenticating with Cloudflare. +type CloudflareQueuesCredentials struct { + APIToken string `json:"api_token" mapstructure:"api_token"` +} + +var _ destregistry.Provider = (*CloudflareQueuesDestination)(nil) + +// New creates a new CloudflareQueuesDestination provider. +func New(loader metadata.MetadataLoader, basePublisherOpts []destregistry.BasePublisherOption) (*CloudflareQueuesDestination, error) { + base, err := destregistry.NewBaseProvider(loader, providerType, basePublisherOpts...) + if err != nil { + return nil, err + } + + return &CloudflareQueuesDestination{ + BaseProvider: base, + }, nil +} + +// Validate validates the destination configuration. +func (d *CloudflareQueuesDestination) Validate(ctx context.Context, destination *models.Destination) error { + _, _, err := d.resolveMetadata(ctx, destination) + if err != nil { + return err + } + return nil +} + +// CreatePublisher creates a new publisher for the destination. +func (d *CloudflareQueuesDestination) CreatePublisher(ctx context.Context, destination *models.Destination) (destregistry.Publisher, error) { + cfg, creds, err := d.resolveMetadata(ctx, destination) + if err != nil { + return nil, err + } + + httpClient, err := d.BaseProvider.MakeHTTPClient(destregistry.HTTPClientConfig{}) + if err != nil { + return nil, err + } + + return &CloudflareQueuesPublisher{ + BasePublisher: d.BaseProvider.NewPublisher(destregistry.WithDeliveryMetadata(destination.DeliveryMetadata)), + httpClient: httpClient, + accountID: cfg.AccountID, + queueID: cfg.QueueID, + apiToken: creds.APIToken, + }, nil +} + +// ComputeTarget returns the target information for display purposes. +func (d *CloudflareQueuesDestination) ComputeTarget(destination *models.Destination) destregistry.DestinationTarget { + accountID := destination.Config["account_id"] + queueID := destination.Config["queue_id"] + + return destregistry.DestinationTarget{ + Target: queueID, + TargetURL: makeCloudflareQueuesDashboardURL(accountID, queueID), + } +} + +// resolveMetadata validates and resolves the destination configuration and credentials. +func (d *CloudflareQueuesDestination) resolveMetadata(ctx context.Context, destination *models.Destination) (*CloudflareQueuesConfig, *CloudflareQueuesCredentials, error) { + if err := d.BaseProvider.Validate(ctx, destination); err != nil { + return nil, nil, err + } + + return &CloudflareQueuesConfig{ + AccountID: destination.Config["account_id"], + QueueID: destination.Config["queue_id"], + }, &CloudflareQueuesCredentials{ + APIToken: destination.Credentials["api_token"], + }, nil +} + +// makeCloudflareQueuesDashboardURL constructs the Cloudflare dashboard URL for a queue. +func makeCloudflareQueuesDashboardURL(accountID, queueID string) string { + if accountID == "" || queueID == "" { + return "" + } + return fmt.Sprintf("https://dash.cloudflare.com/%s/queues/%s", accountID, queueID) +} + +// CloudflareQueuesPublisher handles publishing events to Cloudflare Queues. +type CloudflareQueuesPublisher struct { + *destregistry.BasePublisher + httpClient *http.Client + accountID string + queueID string + apiToken string +} + +// Close gracefully shuts down the publisher. +func (p *CloudflareQueuesPublisher) Close() error { + p.BasePublisher.StartClose() + return nil +} + +// SetHTTPClient allows setting a custom HTTP client, primarily for testing purposes. +func (p *CloudflareQueuesPublisher) SetHTTPClient(client *http.Client) { + p.httpClient = client +} + +// cloudflareMessage represents a single message in the Cloudflare Queues API request. +type cloudflareMessage struct { + Body interface{} `json:"body"` +} + +// cloudflareMessagesRequest represents the request body for the Cloudflare Queues API. +type cloudflareMessagesRequest struct { + Messages []cloudflareMessage `json:"messages"` +} + +// cloudflareAPIResponse represents the response from the Cloudflare API. +type cloudflareAPIResponse struct { + Success bool `json:"success"` + Errors []cloudflareAPIError `json:"errors"` + Messages []string `json:"messages"` + Result []map[string]interface{} `json:"result"` +} + +// cloudflareAPIError represents an error from the Cloudflare API. +type cloudflareAPIError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +// messageBody represents the body structure sent to Cloudflare Queues. +type messageBody struct { + Data interface{} `json:"data"` + Metadata map[string]string `json:"metadata"` +} + +// Format builds the HTTP request for publishing to Cloudflare Queues. +func (p *CloudflareQueuesPublisher) Format(ctx context.Context, event *models.Event) (*http.Request, error) { + now := time.Now() + metadata := p.BasePublisher.MakeMetadata(event, now) + + // Build the message body with data and metadata + body := messageBody{ + Data: event.Data, + Metadata: metadata, + } + + // Build the request payload + reqPayload := cloudflareMessagesRequest{ + Messages: []cloudflareMessage{ + {Body: body}, + }, + } + + payloadBytes, err := json.Marshal(reqPayload) + if err != nil { + return nil, fmt.Errorf("failed to marshal request payload: %w", err) + } + + // Build the API URL + url := fmt.Sprintf("%s/accounts/%s/queues/%s/messages", cloudflareAPIBaseURL, p.accountID, p.queueID) + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBuffer(payloadBytes)) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", p.apiToken)) + + return req, nil +} + +// Publish sends an event to Cloudflare Queues. +func (p *CloudflareQueuesPublisher) Publish(ctx context.Context, event *models.Event) (*destregistry.Delivery, error) { + if err := p.BasePublisher.StartPublish(); err != nil { + return nil, err + } + defer p.BasePublisher.FinishPublish() + + req, err := p.Format(ctx, event) + if err != nil { + return nil, err + } + + resp, err := p.httpClient.Do(req) + if err != nil { + return &destregistry.Delivery{ + Status: "failed", + Code: "ERR", + Response: map[string]interface{}{ + "error": err.Error(), + }, + }, destregistry.NewErrDestinationPublishAttempt(err, providerType, map[string]interface{}{ + "error": err.Error(), + }) + } + defer resp.Body.Close() + + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + return &destregistry.Delivery{ + Status: "failed", + Code: "ERR", + Response: map[string]interface{}{ + "error": fmt.Sprintf("failed to read response body: %s", err.Error()), + }, + }, destregistry.NewErrDestinationPublishAttempt(err, providerType, map[string]interface{}{ + "error": fmt.Sprintf("failed to read response body: %s", err.Error()), + }) + } + + var apiResponse cloudflareAPIResponse + if err := json.Unmarshal(bodyBytes, &apiResponse); err != nil { + // If we can't parse the response, check status code + if resp.StatusCode >= 400 { + return &destregistry.Delivery{ + Status: "failed", + Code: fmt.Sprintf("%d", resp.StatusCode), + Response: map[string]interface{}{ + "status": resp.StatusCode, + "body": string(bodyBytes), + }, + }, destregistry.NewErrDestinationPublishAttempt( + fmt.Errorf("request failed with status %d", resp.StatusCode), + providerType, + map[string]interface{}{ + "status": resp.StatusCode, + "body": string(bodyBytes), + }) + } + } + + // Check for API-level errors + if !apiResponse.Success || len(apiResponse.Errors) > 0 { + errorMsg := "unknown error" + if len(apiResponse.Errors) > 0 { + errorMsg = apiResponse.Errors[0].Message + } + + return &destregistry.Delivery{ + Status: "failed", + Code: fmt.Sprintf("%d", resp.StatusCode), + Response: map[string]interface{}{ + "status": resp.StatusCode, + "success": apiResponse.Success, + "errors": apiResponse.Errors, + }, + }, destregistry.NewErrDestinationPublishAttempt( + fmt.Errorf("cloudflare API error: %s", errorMsg), + providerType, + map[string]interface{}{ + "status": resp.StatusCode, + "errors": apiResponse.Errors, + }) + } + + return &destregistry.Delivery{ + Status: "success", + Code: "OK", + Response: map[string]interface{}{ + "status": resp.StatusCode, + "result": apiResponse.Result, + }, + }, nil +} diff --git a/internal/destregistry/providers/destcfqueues/destcfqueues_publish_test.go b/internal/destregistry/providers/destcfqueues/destcfqueues_publish_test.go new file mode 100644 index 000000000..a0907ac64 --- /dev/null +++ b/internal/destregistry/providers/destcfqueues/destcfqueues_publish_test.go @@ -0,0 +1,394 @@ +package destcfqueues_test + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/hookdeck/outpost/internal/destregistry/providers/destcfqueues" + "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// cloudflareAPIResponse mirrors the response structure from Cloudflare API +type cloudflareAPIResponse struct { + Success bool `json:"success"` + Errors []cloudflareAPIError `json:"errors"` + Messages []string `json:"messages"` + Result []map[string]interface{} `json:"result"` +} + +type cloudflareAPIError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +// cloudflareMessagesRequest mirrors the request structure for Cloudflare Queues API +type cloudflareMessagesRequest struct { + Messages []cloudflareMessage `json:"messages"` +} + +type cloudflareMessage struct { + Body messageBody `json:"body"` +} + +type messageBody struct { + Data interface{} `json:"data"` + Metadata map[string]string `json:"metadata"` +} + +func TestCloudflareQueuesPublisher_Format(t *testing.T) { + t.Parallel() + + provider, err := destcfqueues.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("cloudflare_queues"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "account_id": "test-account-id", + "queue_id": "test-queue-id", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "api_token": "test-api-token", + }), + ) + + publisher, err := provider.CreatePublisher(context.Background(), &destination) + require.NoError(t, err) + defer publisher.Close() + + event := testutil.EventFactory.Any( + testutil.EventFactory.WithID("evt_123"), + testutil.EventFactory.WithTopic("order.created"), + testutil.EventFactory.WithData(map[string]interface{}{ + "order_id": "test-order-123", + "amount": 99.99, + }), + testutil.EventFactory.WithMetadata(map[string]string{ + "source": "test-service", + }), + ) + + t.Run("should produce correct HTTP request structure", func(t *testing.T) { + t.Parallel() + req, err := publisher.(*destcfqueues.CloudflareQueuesPublisher).Format(context.Background(), &event) + require.NoError(t, err) + + // Verify HTTP method + assert.Equal(t, http.MethodPost, req.Method) + + // Verify URL structure + assert.Equal(t, "https://api.cloudflare.com/client/v4/accounts/test-account-id/queues/test-queue-id/messages", req.URL.String()) + + // Verify Content-Type header + assert.Equal(t, "application/json", req.Header.Get("Content-Type")) + }) + + t.Run("should contain bearer token in Authorization header", func(t *testing.T) { + t.Parallel() + req, err := publisher.(*destcfqueues.CloudflareQueuesPublisher).Format(context.Background(), &event) + require.NoError(t, err) + + authHeader := req.Header.Get("Authorization") + assert.Equal(t, "Bearer test-api-token", authHeader) + }) + + t.Run("should contain event data and metadata in request body", func(t *testing.T) { + t.Parallel() + req, err := publisher.(*destcfqueues.CloudflareQueuesPublisher).Format(context.Background(), &event) + require.NoError(t, err) + + // Read and parse the request body + bodyBytes, err := io.ReadAll(req.Body) + require.NoError(t, err) + + var reqPayload cloudflareMessagesRequest + err = json.Unmarshal(bodyBytes, &reqPayload) + require.NoError(t, err) + + // Verify the message structure + require.Len(t, reqPayload.Messages, 1) + + // Verify event data is in the body + dataMap, ok := reqPayload.Messages[0].Body.Data.(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "test-order-123", dataMap["order_id"]) + assert.Equal(t, 99.99, dataMap["amount"]) + + // Verify metadata is present + metadata := reqPayload.Messages[0].Body.Metadata + assert.Equal(t, "evt_123", metadata["event-id"]) + assert.Equal(t, "order.created", metadata["topic"]) + assert.Equal(t, "test-service", metadata["source"]) + assert.NotEmpty(t, metadata["timestamp"], "timestamp should be present in metadata") + }) +} + +func TestCloudflareQueuesPublisher_Publish_Success(t *testing.T) { + t.Parallel() + + // Create a mock server that simulates successful Cloudflare API response + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Verify the request + assert.Equal(t, http.MethodPost, r.Method) + assert.Equal(t, "application/json", r.Header.Get("Content-Type")) + assert.Equal(t, "Bearer test-api-token", r.Header.Get("Authorization")) + assert.True(t, strings.HasSuffix(r.URL.Path, "/accounts/test-account-id/queues/test-queue-id/messages")) + + // Return success response + response := cloudflareAPIResponse{ + Success: true, + Errors: []cloudflareAPIError{}, + Messages: []string{}, + Result: []map[string]interface{}{ + {"messageId": "msg-123"}, + }, + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + // Create provider with custom HTTP client that routes to test server + provider, err := destcfqueues.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("cloudflare_queues"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "account_id": "test-account-id", + "queue_id": "test-queue-id", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "api_token": "test-api-token", + }), + ) + + publisher, err := provider.CreatePublisher(context.Background(), &destination) + require.NoError(t, err) + defer publisher.Close() + + // Replace the HTTP client with one that routes to our test server + cfPublisher := publisher.(*destcfqueues.CloudflareQueuesPublisher) + cfPublisher.SetHTTPClient(&http.Client{ + Transport: &testTransport{serverURL: server.URL}, + }) + + event := testutil.EventFactory.Any( + testutil.EventFactory.WithID("evt_123"), + testutil.EventFactory.WithTopic("order.created"), + testutil.EventFactory.WithData(map[string]interface{}{ + "order_id": "test-order-123", + }), + ) + + delivery, err := publisher.Publish(context.Background(), &event) + require.NoError(t, err) + assert.Equal(t, "success", delivery.Status) + assert.Equal(t, "OK", delivery.Code) +} + +func TestCloudflareQueuesPublisher_Publish_APIError(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + statusCode int + response cloudflareAPIResponse + expectedStatus string + expectedCode string + }{ + { + name: "401 Unauthorized", + statusCode: http.StatusUnauthorized, + response: cloudflareAPIResponse{ + Success: false, + Errors: []cloudflareAPIError{ + {Code: 10000, Message: "Authentication error"}, + }, + }, + expectedStatus: "failed", + expectedCode: "401", + }, + { + name: "403 Forbidden", + statusCode: http.StatusForbidden, + response: cloudflareAPIResponse{ + Success: false, + Errors: []cloudflareAPIError{ + {Code: 10001, Message: "Access denied"}, + }, + }, + expectedStatus: "failed", + expectedCode: "403", + }, + { + name: "404 Not Found", + statusCode: http.StatusNotFound, + response: cloudflareAPIResponse{ + Success: false, + Errors: []cloudflareAPIError{ + {Code: 10002, Message: "Queue not found"}, + }, + }, + expectedStatus: "failed", + expectedCode: "404", + }, + { + name: "500 Internal Server Error", + statusCode: http.StatusInternalServerError, + response: cloudflareAPIResponse{ + Success: false, + Errors: []cloudflareAPIError{ + {Code: 10003, Message: "Internal error"}, + }, + }, + expectedStatus: "failed", + expectedCode: "500", + }, + { + name: "API success false with errors", + statusCode: http.StatusOK, + response: cloudflareAPIResponse{ + Success: false, + Errors: []cloudflareAPIError{ + {Code: 10004, Message: "Validation error"}, + }, + }, + expectedStatus: "failed", + expectedCode: "200", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(tt.statusCode) + json.NewEncoder(w).Encode(tt.response) + })) + defer server.Close() + + provider, err := destcfqueues.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("cloudflare_queues"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "account_id": "test-account-id", + "queue_id": "test-queue-id", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "api_token": "test-api-token", + }), + ) + + publisher, err := provider.CreatePublisher(context.Background(), &destination) + require.NoError(t, err) + defer publisher.Close() + + cfPublisher := publisher.(*destcfqueues.CloudflareQueuesPublisher) + cfPublisher.SetHTTPClient(&http.Client{ + Transport: &testTransport{serverURL: server.URL}, + }) + + event := testutil.EventFactory.Any( + testutil.EventFactory.WithData(map[string]interface{}{"key": "value"}), + ) + + delivery, err := publisher.Publish(context.Background(), &event) + require.Error(t, err) + require.NotNil(t, delivery, "delivery should not be nil for API errors") + assert.Equal(t, tt.expectedStatus, delivery.Status) + assert.Equal(t, tt.expectedCode, delivery.Code) + }) + } +} + +func TestCloudflareQueuesPublisher_Publish_HTTPSuccess(t *testing.T) { + t.Parallel() + + successCodes := []int{ + http.StatusOK, + http.StatusCreated, + http.StatusAccepted, + } + + for _, statusCode := range successCodes { + t.Run(http.StatusText(statusCode), func(t *testing.T) { + t.Parallel() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + response := cloudflareAPIResponse{ + Success: true, + Errors: []cloudflareAPIError{}, + Messages: []string{}, + Result: []map[string]interface{}{ + {"messageId": "msg-123"}, + }, + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(statusCode) + json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + provider, err := destcfqueues.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("cloudflare_queues"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "account_id": "test-account-id", + "queue_id": "test-queue-id", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "api_token": "test-api-token", + }), + ) + + publisher, err := provider.CreatePublisher(context.Background(), &destination) + require.NoError(t, err) + defer publisher.Close() + + cfPublisher := publisher.(*destcfqueues.CloudflareQueuesPublisher) + cfPublisher.SetHTTPClient(&http.Client{ + Transport: &testTransport{serverURL: server.URL}, + }) + + event := testutil.EventFactory.Any( + testutil.EventFactory.WithData(map[string]interface{}{"key": "value"}), + ) + + delivery, err := publisher.Publish(context.Background(), &event) + require.NoError(t, err) + assert.Equal(t, "success", delivery.Status) + assert.Equal(t, "OK", delivery.Code) + }) + } +} + +// testTransport redirects requests to the test server +type testTransport struct { + serverURL string +} + +func (t *testTransport) RoundTrip(req *http.Request) (*http.Response, error) { + // Replace the URL with test server URL while keeping the path + newURL := t.serverURL + req.URL.Path + newReq, err := http.NewRequestWithContext(req.Context(), req.Method, newURL, req.Body) + if err != nil { + return nil, err + } + newReq.Header = req.Header + return http.DefaultTransport.RoundTrip(newReq) +} diff --git a/internal/destregistry/providers/destcfqueues/destcfqueues_validate_test.go b/internal/destregistry/providers/destcfqueues/destcfqueues_validate_test.go new file mode 100644 index 000000000..f05c5708d --- /dev/null +++ b/internal/destregistry/providers/destcfqueues/destcfqueues_validate_test.go @@ -0,0 +1,130 @@ +package destcfqueues_test + +import ( + "context" + "testing" + + "github.com/hookdeck/outpost/internal/destregistry" + "github.com/hookdeck/outpost/internal/destregistry/providers/destcfqueues" + "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCloudflareQueuesDestination_Validate(t *testing.T) { + t.Parallel() + + validDestination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("cloudflare_queues"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "account_id": "test-account-id", + "queue_id": "test-queue-id", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "api_token": "test-api-token", + }), + ) + + cloudflareQueuesDestination, err := destcfqueues.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + t.Run("should validate valid destination", func(t *testing.T) { + t.Parallel() + assert.NoError(t, cloudflareQueuesDestination.Validate(context.Background(), &validDestination)) + }) + + t.Run("should validate invalid type", func(t *testing.T) { + t.Parallel() + invalidDestination := validDestination + invalidDestination.Type = "invalid" + err := cloudflareQueuesDestination.Validate(context.Background(), &invalidDestination) + var validationErr *destregistry.ErrDestinationValidation + assert.ErrorAs(t, err, &validationErr) + assert.Equal(t, "type", validationErr.Errors[0].Field) + assert.Equal(t, "invalid_type", validationErr.Errors[0].Type) + }) + + t.Run("should validate missing account_id", func(t *testing.T) { + t.Parallel() + invalidDestination := validDestination + invalidDestination.Config = map[string]string{ + "queue_id": "test-queue-id", + } + err := cloudflareQueuesDestination.Validate(context.Background(), &invalidDestination) + var validationErr *destregistry.ErrDestinationValidation + assert.ErrorAs(t, err, &validationErr) + assert.Equal(t, "config.account_id", validationErr.Errors[0].Field) + assert.Equal(t, "required", validationErr.Errors[0].Type) + }) + + t.Run("should validate missing queue_id", func(t *testing.T) { + t.Parallel() + invalidDestination := validDestination + invalidDestination.Config = map[string]string{ + "account_id": "test-account-id", + } + err := cloudflareQueuesDestination.Validate(context.Background(), &invalidDestination) + var validationErr *destregistry.ErrDestinationValidation + assert.ErrorAs(t, err, &validationErr) + assert.Equal(t, "config.queue_id", validationErr.Errors[0].Field) + assert.Equal(t, "required", validationErr.Errors[0].Type) + }) + + t.Run("should validate missing api_token", func(t *testing.T) { + t.Parallel() + invalidDestination := validDestination + invalidDestination.Credentials = map[string]string{} + err := cloudflareQueuesDestination.Validate(context.Background(), &invalidDestination) + var validationErr *destregistry.ErrDestinationValidation + assert.ErrorAs(t, err, &validationErr) + assert.Equal(t, "credentials.api_token", validationErr.Errors[0].Field) + assert.Equal(t, "required", validationErr.Errors[0].Type) + }) +} + +func TestCloudflareQueuesDestination_ComputeTarget(t *testing.T) { + t.Parallel() + + cloudflareQueuesDestination, err := destcfqueues.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + t.Run("should return queue_id as target and dashboard URL", func(t *testing.T) { + t.Parallel() + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("cloudflare_queues"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "account_id": "my-account-123", + "queue_id": "my-queue-456", + }), + ) + target := cloudflareQueuesDestination.ComputeTarget(&destination) + assert.Equal(t, "my-queue-456", target.Target) + assert.Equal(t, "https://dash.cloudflare.com/my-account-123/queues/my-queue-456", target.TargetURL) + }) + + t.Run("should return empty target URL when account_id is missing", func(t *testing.T) { + t.Parallel() + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("cloudflare_queues"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "queue_id": "my-queue-456", + }), + ) + target := cloudflareQueuesDestination.ComputeTarget(&destination) + assert.Equal(t, "my-queue-456", target.Target) + assert.Equal(t, "", target.TargetURL) + }) + + t.Run("should return empty target URL when queue_id is missing", func(t *testing.T) { + t.Parallel() + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("cloudflare_queues"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "account_id": "my-account-123", + }), + ) + target := cloudflareQueuesDestination.ComputeTarget(&destination) + assert.Equal(t, "", target.Target) + assert.Equal(t, "", target.TargetURL) + }) +} diff --git a/spec-sdk-tests/factories/destination.factory.ts b/spec-sdk-tests/factories/destination.factory.ts index 99cdb1674..a2e054c89 100644 --- a/spec-sdk-tests/factories/destination.factory.ts +++ b/spec-sdk-tests/factories/destination.factory.ts @@ -7,6 +7,7 @@ import type { DestinationCreateAzureServiceBus, DestinationCreateAwss3, DestinationCreateGCPPubSub, + DestinationCreateCloudflareQueues, } from '../../sdks/outpost-typescript/dist/commonjs/models/components/index'; export function createWebhookDestination( @@ -143,3 +144,20 @@ export function createGcpPubSubDestination( ...overrides, }; } + +export function createCloudflareQueuesDestination( + overrides?: Partial +): DestinationCreateCloudflareQueues { + return { + type: 'cloudflare_queues', + topics: ['*'], + config: { + accountId: 'abc123def456', + queueId: 'my-queue-id', + }, + credentials: { + apiToken: 'cf-api-token-example', + }, + ...overrides, + }; +} diff --git a/spec-sdk-tests/package.json b/spec-sdk-tests/package.json index 727e43aff..b4f107074 100644 --- a/spec-sdk-tests/package.json +++ b/spec-sdk-tests/package.json @@ -24,7 +24,7 @@ "author": "Outpost Team", "license": "Apache-2.0", "dependencies": { - "@hookdeck/outpost-sdk": "file:../../../sdks/outpost-typescript" + "@hookdeck/outpost-sdk": "file:../sdks/outpost-typescript" }, "devDependencies": { "@stoplight/spectral-cli": "^6.11.0", diff --git a/spec-sdk-tests/tests/destinations/cloudflare-queues.test.ts b/spec-sdk-tests/tests/destinations/cloudflare-queues.test.ts new file mode 100644 index 000000000..2ecc30ead --- /dev/null +++ b/spec-sdk-tests/tests/destinations/cloudflare-queues.test.ts @@ -0,0 +1,440 @@ +import { describe, it, before, after } from 'mocha'; +import { expect } from 'chai'; +import { SdkClient, createSdkClient } from '../../utils/sdk-client'; +import { createCloudflareQueuesDestination } from '../../factories/destination.factory'; +/* eslint-disable no-console */ +/* eslint-disable no-undef */ + +// Get configured test topics from environment (required) +if (!process.env.TEST_TOPICS) { + throw new Error('TEST_TOPICS environment variable is required. Please set it in .env file.'); +} +const TEST_TOPICS = process.env.TEST_TOPICS.split(',').map((t) => t.trim()); + +describe('Cloudflare Queues Destinations - Contract Tests (SDK-based validation)', () => { + let client: SdkClient; + + before(async () => { + client = createSdkClient(); + + // Create tenant if it doesn't exist (idempotent operation) + try { + await client.upsertTenant(); + } catch (error) { + console.warn('Failed to create tenant (may already exist):', error); + } + }); + + after(async () => { + // Cleanup: delete all destinations for the test tenant + try { + const destinations = await client.listDestinations(); + console.log(`Cleaning up ${destinations.length} destinations...`); + + for (const destination of destinations) { + try { + await client.deleteDestination(destination.id); + console.log(`Deleted destination: ${destination.id}`); + } catch (error) { + console.warn(`Failed to delete destination ${destination.id}:`, error); + } + } + + console.log('All destinations cleaned up'); + } catch (error) { + console.warn('Failed to list destinations for cleanup:', error); + } + + // Cleanup: delete the test tenant + try { + await client.deleteTenant(); + console.log('Test tenant deleted'); + } catch (error) { + console.warn('Failed to delete tenant:', error); + } + }); + + describe('POST /api/v1/tenants/{tenant_id}/destinations - Create Cloudflare Queues Destination', () => { + it('should create a Cloudflare Queues destination with valid config', async () => { + const destinationData = createCloudflareQueuesDestination(); + const destination = await client.createDestination(destinationData); + + expect(destination.type).to.equal('cloudflare_queues'); + expect(destination.config.accountId).to.equal(destinationData.config.accountId); + expect(destination.config.queueId).to.equal(destinationData.config.queueId); + }); + + it('should create a Cloudflare Queues destination with array of topics', async () => { + const destinationData = createCloudflareQueuesDestination({ + topics: TEST_TOPICS, + }); + const destination = await client.createDestination(destinationData); + + expect(destination.topics).to.have.lengthOf(TEST_TOPICS.length); + TEST_TOPICS.forEach((topic) => { + expect(destination.topics).to.include(topic); + }); + + // Cleanup + await client.deleteDestination(destination.id); + }); + + it('should create destination with user-provided ID', async () => { + const customId = `custom-cf-queues-${Date.now()}`; + const destinationData = createCloudflareQueuesDestination({ + id: customId, + }); + const destination = await client.createDestination(destinationData); + + expect(destination.id).to.equal(customId); + + // Cleanup + await client.deleteDestination(destination.id); + }); + + it('should reject creation with missing required config field: account_id', async () => { + let errorThrown = false; + try { + await client.createDestination({ + type: 'cloudflare_queues', + topics: '*', + config: { + // Missing accountId + queueId: 'my-queue-id', + }, + credentials: { + apiToken: 'cf-api-token-example', + }, + } as any); + } catch (error: any) { + errorThrown = true; + expect(error).to.exist; + if (error.response) { + expect(error.response.status).to.be.oneOf([400, 422]); + } else { + expect(error.message).to.exist; + } + } + if (!errorThrown) { + expect.fail('Should have thrown an error'); + } + }); + + it('should reject creation with missing required config field: queue_id', async () => { + let errorThrown = false; + try { + await client.createDestination({ + type: 'cloudflare_queues', + topics: '*', + config: { + accountId: 'abc123def456', + // Missing queueId + }, + credentials: { + apiToken: 'cf-api-token-example', + }, + } as any); + } catch (error: any) { + errorThrown = true; + expect(error).to.exist; + if (error.response) { + expect(error.response.status).to.be.oneOf([400, 422]); + } else { + expect(error.message).to.exist; + } + } + if (!errorThrown) { + expect.fail('Should have thrown an error'); + } + }); + + it('should reject creation with missing required credential field: api_token', async () => { + let errorThrown = false; + try { + await client.createDestination({ + type: 'cloudflare_queues', + topics: '*', + config: { + accountId: 'abc123def456', + queueId: 'my-queue-id', + }, + credentials: { + // Missing apiToken + }, + } as any); + } catch (error: any) { + errorThrown = true; + expect(error).to.exist; + if (error.response) { + expect(error.response.status).to.be.oneOf([400, 422]); + } else { + expect(error.message).to.exist; + } + } + if (!errorThrown) { + expect.fail('Should have thrown an error'); + } + }); + + it('should reject creation with missing credentials', async () => { + let errorThrown = false; + try { + await client.createDestination({ + type: 'cloudflare_queues', + topics: '*', + config: { + accountId: 'abc123def456', + queueId: 'my-queue-id', + }, + // Missing credentials + } as any); + } catch (error: any) { + errorThrown = true; + expect(error).to.exist; + if (error.response) { + expect(error.response.status).to.be.oneOf([400, 422]); + } else { + expect(error.message).to.exist; + } + } + if (!errorThrown) { + expect.fail('Should have thrown an error'); + } + }); + + it('should reject creation with missing type field', async () => { + let errorThrown = false; + try { + await client.createDestination({ + topics: '*', + config: { + accountId: 'abc123def456', + queueId: 'my-queue-id', + }, + credentials: { + apiToken: 'cf-api-token-example', + }, + } as any); + } catch (error: any) { + errorThrown = true; + expect(error).to.exist; + if (error.response) { + expect(error.response.status).to.be.oneOf([400, 422]); + } else { + expect(error.message).to.exist; + } + } + if (!errorThrown) { + expect.fail('Should have thrown an error'); + } + }); + + it('should reject creation with empty topics', async () => { + let errorThrown = false; + try { + const destinationData = createCloudflareQueuesDestination({ + topics: [], + }); + await client.createDestination(destinationData); + } catch (error: any) { + errorThrown = true; + expect(error).to.exist; + if (error.response) { + expect(error.response.status).to.be.oneOf([400, 422]); + } else { + expect(error.message).to.exist; + } + } + if (!errorThrown) { + expect.fail('Should have thrown an error'); + } + }); + }); + + describe('GET /api/v1/tenants/{tenant_id}/destinations/{id} - Retrieve Cloudflare Queues Destination', () => { + let destinationId: string; + + before(async () => { + const destinationData = createCloudflareQueuesDestination(); + const destination = await client.createDestination(destinationData); + destinationId = destination.id; + }); + + after(async () => { + try { + await client.deleteDestination(destinationId); + } catch (error) { + console.warn('Failed to cleanup destination:', error); + } + }); + + it('should retrieve an existing Cloudflare Queues destination', async () => { + const destination = await client.getDestination(destinationId); + + expect(destination.id).to.equal(destinationId); + expect(destination.type).to.equal('cloudflare_queues'); + expect(destination.config.accountId).to.exist; + expect(destination.config.queueId).to.exist; + }); + + it('should return 404 for non-existent destination', async () => { + let errorThrown = false; + try { + await client.getDestination('non-existent-id-12345'); + } catch (error: any) { + errorThrown = true; + expect(error).to.exist; + if (error.response) { + expect(error.response.status).to.equal(404); + } else { + expect(error.message).to.exist; + } + } + if (!errorThrown) { + expect.fail('Should have thrown an error'); + } + }); + }); + + describe('GET /api/v1/tenants/{tenant_id}/destinations - List Cloudflare Queues Destinations', () => { + before(async () => { + // Create multiple Cloudflare Queues destinations for listing + await client.createDestination(createCloudflareQueuesDestination()); + await client.createDestination( + createCloudflareQueuesDestination({ + topics: [TEST_TOPICS[0]], + config: { + accountId: 'abc123def456', + queueId: 'my-queue-2', + }, + }) + ); + }); + + it('should list all destinations', async () => { + const destinations = await client.listDestinations(); + + expect(destinations.length).to.be.greaterThan(0); + }); + + it('should filter destinations by type', async () => { + const destinations = await client.listDestinations({ type: 'cloudflare_queues' }); + + destinations.forEach((dest) => { + expect(dest.type).to.equal('cloudflare_queues'); + }); + }); + }); + + describe('PATCH /api/v1/tenants/{tenant_id}/destinations/{id} - Update Cloudflare Queues Destination', () => { + let destinationId: string; + + before(async () => { + const destinationData = createCloudflareQueuesDestination(); + const destination = await client.createDestination(destinationData); + destinationId = destination.id; + }); + + after(async () => { + try { + await client.deleteDestination(destinationId); + } catch (error) { + console.warn('Failed to cleanup destination:', error); + } + }); + + it('should update destination topics', async () => { + const updated = await client.updateDestination(destinationId, { + topics: ['user.created', 'user.updated'], + }); + + expect(updated.id).to.equal(destinationId); + expect(updated.type).to.equal('cloudflare_queues'); + expect(updated.topics).to.include('user.created'); + expect(updated.topics).to.include('user.updated'); + }); + + it('should update destination config', async () => { + const updated = await client.updateDestination(destinationId, { + config: { + accountId: 'updated-account-id', + queueId: 'updated-queue-id', + }, + }); + + expect(updated.id).to.equal(destinationId); + expect(updated.config).to.exist; + if (updated.config) { + expect(updated.config.accountId).to.equal('updated-account-id'); + expect(updated.config.queueId).to.equal('updated-queue-id'); + } + }); + + it('should update destination credentials', async () => { + const updated = await client.updateDestination(destinationId, { + credentials: { + apiToken: 'updated-api-token', + }, + }); + + expect(updated.id).to.equal(destinationId); + }); + + it('should return 404 for updating non-existent destination', async () => { + let errorThrown = false; + try { + await client.updateDestination('non-existent-id-12345', { + topics: ['test'], + }); + } catch (error: any) { + errorThrown = true; + expect(error).to.exist; + if (error.response) { + expect(error.response.status).to.equal(404); + } else { + expect(error.message).to.exist; + } + } + if (!errorThrown) { + expect.fail('Should have thrown an error'); + } + }); + }); + + describe('DELETE /api/v1/tenants/{tenant_id}/destinations/{id} - Delete Cloudflare Queues Destination', () => { + it('should delete an existing destination', async () => { + const destinationData = createCloudflareQueuesDestination(); + const destination = await client.createDestination(destinationData); + + await client.deleteDestination(destination.id); + + // Verify deletion by trying to get the destination + let errorThrown = false; + try { + await client.getDestination(destination.id); + } catch (error: any) { + errorThrown = true; + expect(error).to.exist; + } + expect(errorThrown).to.be.true; + }); + + it('should return 404 for deleting non-existent destination', async () => { + let errorThrown = false; + try { + await client.deleteDestination('non-existent-id-12345'); + } catch (error: any) { + errorThrown = true; + expect(error).to.exist; + if (error.response) { + expect(error.response.status).to.equal(404); + } else { + expect(error.message).to.exist; + } + } + if (!errorThrown) { + expect.fail('Should have thrown an error'); + } + }); + }); +}); From 991855bff1f5ca976db13f8cab490e0881e18007 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Thu, 14 May 2026 01:55:34 +0700 Subject: [PATCH 2/5] fix(destcfqueues): correct CF Queues API payload and response shapes The original implementation posted a {"messages":[{"body":...}]} batch wrapper to the single-message push endpoint, which is the wrong shape for that path, and parsed `result` as []map (the docs show an object). On a real success response the slice/object mismatch would silently fall into the `apiResponse.Success` zero-value path and report every successful publish as failed. Verified against: - developers.cloudflare.com/api/resources/queues/subresources/messages/methods/push/ Changes: - Format(): drop the messages array wrapper; post {"body":{"data":..., "metadata":...}, "content_type":"json"} directly to /accounts/{id}/queues/{id}/messages - cloudflareAPIResponse.Result is now an object matching CF docs (result.metadata.metrics.{backlog_bytes,backlog_count,oldest_message_timestamp_ms}) - cloudflareAPIError gains documentation_url and source.pointer - Publish(): non-2xx always returns failure with as much detail as parseable; 2xx with unparseable body now trusts the status instead of falling through to the success=false branch - Tests rewritten to mock the real CF success shape; added a roundtrip guard that fails loudly if CF's documented shape stops decoding - Added nav.json + overview/concepts mdoc entries for the destination --- docs/content/concepts.mdoc | 1 + docs/content/nav.json | 3 +- docs/content/overview.mdoc | 1 + .../providers/destcfqueues/destcfqueues.go | 149 ++++--- .../destcfqueues/destcfqueues_publish_test.go | 419 +++++++----------- 5 files changed, 235 insertions(+), 338 deletions(-) diff --git a/docs/content/concepts.mdoc b/docs/content/concepts.mdoc index 4d4944a59..ce17f0078 100644 --- a/docs/content/concepts.mdoc +++ b/docs/content/concepts.mdoc @@ -97,6 +97,7 @@ The following destination types are available for your tenants to configure: - [GCP Pub/Sub](/docs/outpost/destinations/gcp-pubsub) - [RabbitMQ (AMQP)](/docs/outpost/destinations/rabbitmq) - [Kafka](/docs/outpost/destinations/kafka) +- [Cloudflare Queues](/docs/outpost/destinations/cloudflare-queues) - Amazon EventBridge (planned) **Hookdeck Outpost** is the same [open-source Outpost](https://github.com/hookdeck/outpost) project, operated on Hookdeck’s infrastructure. We do not maintain a separate hosted fork; what we run tracks the public codebase. diff --git a/docs/content/nav.json b/docs/content/nav.json index b653a7f67..2493fea62 100644 --- a/docs/content/nav.json +++ b/docs/content/nav.json @@ -71,7 +71,8 @@ "title": "Azure Service Bus" }, { "slug": "destinations/rabbitmq", "title": "RabbitMQ" }, - { "slug": "destinations/kafka", "title": "Apache Kafka" } + { "slug": "destinations/kafka", "title": "Apache Kafka" }, + { "slug": "destinations/cloudflare-queues", "title": "Cloudflare Queues" } ] ] }, diff --git a/docs/content/overview.mdoc b/docs/content/overview.mdoc index 2d92d9c8c..c7ebe54e1 100644 --- a/docs/content/overview.mdoc +++ b/docs/content/overview.mdoc @@ -42,6 +42,7 @@ Outpost delivers events to any of the following destination types: - **[GCP Pub/Sub](/docs/outpost/destinations/gcp-pubsub)** — Publish to a Pub/Sub topic - **[RabbitMQ (AMQP)](/docs/outpost/destinations/rabbitmq)** — Send to a remote RabbitMQ exchange - **[Kafka](/docs/outpost/destinations/kafka)** — Send to a remote Kafka topic +- **[Cloudflare Queues](/docs/outpost/destinations/cloudflare-queues)** — Publish to a Cloudflare Queue via the HTTP API If you'd like to see more destination types added, [open an issue or PR](https://github.com/hookdeck/outpost/issues). diff --git a/internal/destregistry/providers/destcfqueues/destcfqueues.go b/internal/destregistry/providers/destcfqueues/destcfqueues.go index 964001ed9..a5f05f8de 100644 --- a/internal/destregistry/providers/destcfqueues/destcfqueues.go +++ b/internal/destregistry/providers/destcfqueues/destcfqueues.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net/http" + "strconv" "time" "github.com/hookdeck/outpost/internal/destregistry" @@ -65,7 +66,7 @@ func (d *CloudflareQueuesDestination) CreatePublisher(ctx context.Context, desti return nil, err } - httpClient, err := d.BaseProvider.MakeHTTPClient(destregistry.HTTPClientConfig{}) + httpClient, err := destregistry.NewHTTPClient(destregistry.HTTPClientConfig{}) if err != nil { return nil, err } @@ -132,52 +133,55 @@ func (p *CloudflareQueuesPublisher) SetHTTPClient(client *http.Client) { p.httpClient = client } -// cloudflareMessage represents a single message in the Cloudflare Queues API request. -type cloudflareMessage struct { - Body interface{} `json:"body"` -} - -// cloudflareMessagesRequest represents the request body for the Cloudflare Queues API. -type cloudflareMessagesRequest struct { - Messages []cloudflareMessage `json:"messages"` +// cloudflareMessageRequest is the body for POST /accounts/{id}/queues/{id}/messages +// (single-message push). See https://developers.cloudflare.com/api/resources/queues/subresources/messages/methods/push/ +type cloudflareMessageRequest struct { + Body messageBody `json:"body"` + ContentType string `json:"content_type"` } // cloudflareAPIResponse represents the response from the Cloudflare API. type cloudflareAPIResponse struct { - Success bool `json:"success"` - Errors []cloudflareAPIError `json:"errors"` - Messages []string `json:"messages"` - Result []map[string]interface{} `json:"result"` + Success bool `json:"success"` + Errors []cloudflareAPIError `json:"errors"` + Messages []string `json:"messages"` + Result *cloudflareResult `json:"result"` +} + +type cloudflareResult struct { + Metadata struct { + Metrics struct { + BacklogBytes int64 `json:"backlog_bytes"` + BacklogCount int64 `json:"backlog_count"` + OldestMessageTimestampMs int64 `json:"oldest_message_timestamp_ms"` + } `json:"metrics"` + } `json:"metadata"` } // cloudflareAPIError represents an error from the Cloudflare API. type cloudflareAPIError struct { - Code int `json:"code"` - Message string `json:"message"` + Code int `json:"code"` + Message string `json:"message"` + DocumentationURL string `json:"documentation_url,omitempty"` + Source *struct { + Pointer string `json:"pointer"` + } `json:"source,omitempty"` } -// messageBody represents the body structure sent to Cloudflare Queues. +// messageBody is the wrapper Outpost places inside the Cloudflare message body. type messageBody struct { - Data interface{} `json:"data"` + Data json.RawMessage `json:"data"` Metadata map[string]string `json:"metadata"` } -// Format builds the HTTP request for publishing to Cloudflare Queues. +// Format builds the HTTP request for publishing a single message to a Cloudflare Queue. func (p *CloudflareQueuesPublisher) Format(ctx context.Context, event *models.Event) (*http.Request, error) { - now := time.Now() - metadata := p.BasePublisher.MakeMetadata(event, now) - - // Build the message body with data and metadata - body := messageBody{ - Data: event.Data, - Metadata: metadata, - } - - // Build the request payload - reqPayload := cloudflareMessagesRequest{ - Messages: []cloudflareMessage{ - {Body: body}, + reqPayload := cloudflareMessageRequest{ + Body: messageBody{ + Data: event.Data, + Metadata: p.BasePublisher.MakeMetadata(event, time.Now()), }, + ContentType: "json", } payloadBytes, err := json.Marshal(reqPayload) @@ -185,7 +189,6 @@ func (p *CloudflareQueuesPublisher) Format(ctx context.Context, event *models.Ev return nil, fmt.Errorf("failed to marshal request payload: %w", err) } - // Build the API URL url := fmt.Sprintf("%s/accounts/%s/queues/%s/messages", cloudflareAPIBaseURL, p.accountID, p.queueID) req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBuffer(payloadBytes)) @@ -194,7 +197,7 @@ func (p *CloudflareQueuesPublisher) Format(ctx context.Context, event *models.Ev } req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", p.apiToken)) + req.Header.Set("Authorization", "Bearer "+p.apiToken) return req, nil } @@ -238,49 +241,56 @@ func (p *CloudflareQueuesPublisher) Publish(ctx context.Context, event *models.E }) } - var apiResponse cloudflareAPIResponse - if err := json.Unmarshal(bodyBytes, &apiResponse); err != nil { - // If we can't parse the response, check status code - if resp.StatusCode >= 400 { - return &destregistry.Delivery{ - Status: "failed", - Code: fmt.Sprintf("%d", resp.StatusCode), - Response: map[string]interface{}{ - "status": resp.StatusCode, - "body": string(bodyBytes), - }, - }, destregistry.NewErrDestinationPublishAttempt( - fmt.Errorf("request failed with status %d", resp.StatusCode), - providerType, - map[string]interface{}{ - "status": resp.StatusCode, - "body": string(bodyBytes), - }) - } - } + statusCode := strconv.Itoa(resp.StatusCode) - // Check for API-level errors - if !apiResponse.Success || len(apiResponse.Errors) > 0 { - errorMsg := "unknown error" - if len(apiResponse.Errors) > 0 { + // Any non-2xx is a failure regardless of body parseability. + if resp.StatusCode >= 400 { + var apiResponse cloudflareAPIResponse + errorMsg := fmt.Sprintf("request failed with status %d", resp.StatusCode) + response := map[string]interface{}{ + "status": resp.StatusCode, + "body": string(bodyBytes), + } + if json.Unmarshal(bodyBytes, &apiResponse) == nil && len(apiResponse.Errors) > 0 { errorMsg = apiResponse.Errors[0].Message + response["errors"] = apiResponse.Errors } - return &destregistry.Delivery{ - Status: "failed", - Code: fmt.Sprintf("%d", resp.StatusCode), - Response: map[string]interface{}{ + Status: "failed", + Code: statusCode, + Response: response, + }, destregistry.NewErrDestinationPublishAttempt( + fmt.Errorf("cloudflare API error: %s", errorMsg), + providerType, + response, + ) + } + + // 2xx. Try to parse — if the body is unparseable but status is OK, + // trust the status. CF returns valid JSON on success; only weird + // proxies would land us here. + var apiResponse cloudflareAPIResponse + if err := json.Unmarshal(bodyBytes, &apiResponse); err == nil { + if !apiResponse.Success { + errorMsg := "cloudflare API reported success=false" + if len(apiResponse.Errors) > 0 { + errorMsg = apiResponse.Errors[0].Message + } + response := map[string]interface{}{ "status": resp.StatusCode, "success": apiResponse.Success, "errors": apiResponse.Errors, - }, - }, destregistry.NewErrDestinationPublishAttempt( - fmt.Errorf("cloudflare API error: %s", errorMsg), - providerType, - map[string]interface{}{ - "status": resp.StatusCode, - "errors": apiResponse.Errors, - }) + } + return &destregistry.Delivery{ + Status: "failed", + Code: statusCode, + Response: response, + }, destregistry.NewErrDestinationPublishAttempt( + fmt.Errorf("cloudflare API error: %s", errorMsg), + providerType, + response, + ) + } } return &destregistry.Delivery{ @@ -288,7 +298,6 @@ func (p *CloudflareQueuesPublisher) Publish(ctx context.Context, event *models.E Code: "OK", Response: map[string]interface{}{ "status": resp.StatusCode, - "result": apiResponse.Result, }, }, nil } diff --git a/internal/destregistry/providers/destcfqueues/destcfqueues_publish_test.go b/internal/destregistry/providers/destcfqueues/destcfqueues_publish_test.go index a0907ac64..8edacd276 100644 --- a/internal/destregistry/providers/destcfqueues/destcfqueues_publish_test.go +++ b/internal/destregistry/providers/destcfqueues/destcfqueues_publish_test.go @@ -15,39 +15,49 @@ import ( "github.com/stretchr/testify/require" ) -// cloudflareAPIResponse mirrors the response structure from Cloudflare API -type cloudflareAPIResponse struct { - Success bool `json:"success"` - Errors []cloudflareAPIError `json:"errors"` - Messages []string `json:"messages"` - Result []map[string]interface{} `json:"result"` -} - -type cloudflareAPIError struct { - Code int `json:"code"` - Message string `json:"message"` -} - -// cloudflareMessagesRequest mirrors the request structure for Cloudflare Queues API -type cloudflareMessagesRequest struct { - Messages []cloudflareMessage `json:"messages"` -} - -type cloudflareMessage struct { - Body messageBody `json:"body"` -} - -type messageBody struct { - Data interface{} `json:"data"` - Metadata map[string]string `json:"metadata"` -} - -func TestCloudflareQueuesPublisher_Format(t *testing.T) { +// successResponseJSON mirrors a real Cloudflare Queues push success response. +// Captured from CF docs; replace with a recorded fixture once we have live creds. +const successResponseJSON = `{ + "success": true, + "result": { + "metadata": { + "metrics": { + "backlog_bytes": 1024, + "backlog_count": 5, + "oldest_message_timestamp_ms": 1710950954154 + } + } + }, + "messages": [], + "errors": [] +}` + +// Roundtrip guard: if CF's documented success shape ever stops decoding cleanly +// into our types, this fails loudly instead of degrading silently to "failed". +func TestCloudflareAPIResponse_DecodesDocumentedShape(t *testing.T) { t.Parallel() + var parsed struct { + Success bool `json:"success"` + Result *struct { + Metadata struct { + Metrics struct { + BacklogBytes int64 `json:"backlog_bytes"` + BacklogCount int64 `json:"backlog_count"` + OldestMessageTimestampMs int64 `json:"oldest_message_timestamp_ms"` + } `json:"metrics"` + } `json:"metadata"` + } `json:"result"` + } + require.NoError(t, json.Unmarshal([]byte(successResponseJSON), &parsed)) + assert.True(t, parsed.Success) + require.NotNil(t, parsed.Result) + assert.Equal(t, int64(1024), parsed.Result.Metadata.Metrics.BacklogBytes) +} +func newPublisher(t *testing.T, serverURL string) *destcfqueues.CloudflareQueuesPublisher { + t.Helper() provider, err := destcfqueues.New(testutil.Registry.MetadataLoader(), nil) require.NoError(t, err) - destination := testutil.DestinationFactory.Any( testutil.DestinationFactory.WithType("cloudflare_queues"), testutil.DestinationFactory.WithConfig(map[string]string{ @@ -58,15 +68,24 @@ func TestCloudflareQueuesPublisher_Format(t *testing.T) { "api_token": "test-api-token", }), ) - publisher, err := provider.CreatePublisher(context.Background(), &destination) require.NoError(t, err) + cfPublisher := publisher.(*destcfqueues.CloudflareQueuesPublisher) + if serverURL != "" { + cfPublisher.SetHTTPClient(&http.Client{Transport: &testTransport{serverURL: serverURL}}) + } + return cfPublisher +} + +func TestCloudflareQueuesPublisher_Format(t *testing.T) { + t.Parallel() + publisher := newPublisher(t, "") defer publisher.Close() event := testutil.EventFactory.Any( testutil.EventFactory.WithID("evt_123"), testutil.EventFactory.WithTopic("order.created"), - testutil.EventFactory.WithData(map[string]interface{}{ + testutil.EventFactory.WithDataMap(map[string]interface{}{ "order_id": "test-order-123", "amount": 99.99, }), @@ -75,315 +94,181 @@ func TestCloudflareQueuesPublisher_Format(t *testing.T) { }), ) - t.Run("should produce correct HTTP request structure", func(t *testing.T) { - t.Parallel() - req, err := publisher.(*destcfqueues.CloudflareQueuesPublisher).Format(context.Background(), &event) - require.NoError(t, err) - - // Verify HTTP method - assert.Equal(t, http.MethodPost, req.Method) - - // Verify URL structure - assert.Equal(t, "https://api.cloudflare.com/client/v4/accounts/test-account-id/queues/test-queue-id/messages", req.URL.String()) - - // Verify Content-Type header - assert.Equal(t, "application/json", req.Header.Get("Content-Type")) - }) - - t.Run("should contain bearer token in Authorization header", func(t *testing.T) { - t.Parallel() - req, err := publisher.(*destcfqueues.CloudflareQueuesPublisher).Format(context.Background(), &event) - require.NoError(t, err) - - authHeader := req.Header.Get("Authorization") - assert.Equal(t, "Bearer test-api-token", authHeader) - }) - - t.Run("should contain event data and metadata in request body", func(t *testing.T) { - t.Parallel() - req, err := publisher.(*destcfqueues.CloudflareQueuesPublisher).Format(context.Background(), &event) - require.NoError(t, err) - - // Read and parse the request body - bodyBytes, err := io.ReadAll(req.Body) - require.NoError(t, err) - - var reqPayload cloudflareMessagesRequest - err = json.Unmarshal(bodyBytes, &reqPayload) - require.NoError(t, err) + req, err := publisher.Format(context.Background(), &event) + require.NoError(t, err) - // Verify the message structure - require.Len(t, reqPayload.Messages, 1) + assert.Equal(t, http.MethodPost, req.Method) + assert.Equal(t, "https://api.cloudflare.com/client/v4/accounts/test-account-id/queues/test-queue-id/messages", req.URL.String()) + assert.Equal(t, "application/json", req.Header.Get("Content-Type")) + assert.Equal(t, "Bearer test-api-token", req.Header.Get("Authorization")) - // Verify event data is in the body - dataMap, ok := reqPayload.Messages[0].Body.Data.(map[string]interface{}) - require.True(t, ok) - assert.Equal(t, "test-order-123", dataMap["order_id"]) - assert.Equal(t, 99.99, dataMap["amount"]) + bodyBytes, err := io.ReadAll(req.Body) + require.NoError(t, err) - // Verify metadata is present - metadata := reqPayload.Messages[0].Body.Metadata - assert.Equal(t, "evt_123", metadata["event-id"]) - assert.Equal(t, "order.created", metadata["topic"]) - assert.Equal(t, "test-service", metadata["source"]) - assert.NotEmpty(t, metadata["timestamp"], "timestamp should be present in metadata") - }) + var payload struct { + Body struct { + Data map[string]interface{} `json:"data"` + Metadata map[string]string `json:"metadata"` + } `json:"body"` + ContentType string `json:"content_type"` + } + require.NoError(t, json.Unmarshal(bodyBytes, &payload)) + + assert.Equal(t, "json", payload.ContentType, "must set content_type=json so CF consumers decode JSON correctly") + assert.Equal(t, "test-order-123", payload.Body.Data["order_id"]) + assert.Equal(t, 99.99, payload.Body.Data["amount"]) + assert.Equal(t, "evt_123", payload.Body.Metadata["event-id"]) + assert.Equal(t, "order.created", payload.Body.Metadata["topic"]) + assert.Equal(t, "test-service", payload.Body.Metadata["source"]) + assert.NotEmpty(t, payload.Body.Metadata["timestamp"]) } func TestCloudflareQueuesPublisher_Publish_Success(t *testing.T) { t.Parallel() - // Create a mock server that simulates successful Cloudflare API response + var receivedBody []byte server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Verify the request assert.Equal(t, http.MethodPost, r.Method) assert.Equal(t, "application/json", r.Header.Get("Content-Type")) assert.Equal(t, "Bearer test-api-token", r.Header.Get("Authorization")) assert.True(t, strings.HasSuffix(r.URL.Path, "/accounts/test-account-id/queues/test-queue-id/messages")) - - // Return success response - response := cloudflareAPIResponse{ - Success: true, - Errors: []cloudflareAPIError{}, - Messages: []string{}, - Result: []map[string]interface{}{ - {"messageId": "msg-123"}, - }, - } + receivedBody, _ = io.ReadAll(r.Body) w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(response) + _, _ = w.Write([]byte(successResponseJSON)) })) defer server.Close() - // Create provider with custom HTTP client that routes to test server - provider, err := destcfqueues.New(testutil.Registry.MetadataLoader(), nil) - require.NoError(t, err) - - destination := testutil.DestinationFactory.Any( - testutil.DestinationFactory.WithType("cloudflare_queues"), - testutil.DestinationFactory.WithConfig(map[string]string{ - "account_id": "test-account-id", - "queue_id": "test-queue-id", - }), - testutil.DestinationFactory.WithCredentials(map[string]string{ - "api_token": "test-api-token", - }), - ) - - publisher, err := provider.CreatePublisher(context.Background(), &destination) - require.NoError(t, err) + publisher := newPublisher(t, server.URL) defer publisher.Close() - // Replace the HTTP client with one that routes to our test server - cfPublisher := publisher.(*destcfqueues.CloudflareQueuesPublisher) - cfPublisher.SetHTTPClient(&http.Client{ - Transport: &testTransport{serverURL: server.URL}, - }) - event := testutil.EventFactory.Any( testutil.EventFactory.WithID("evt_123"), testutil.EventFactory.WithTopic("order.created"), - testutil.EventFactory.WithData(map[string]interface{}{ - "order_id": "test-order-123", - }), + testutil.EventFactory.WithDataMap(map[string]interface{}{"order_id": "test-order-123"}), ) delivery, err := publisher.Publish(context.Background(), &event) require.NoError(t, err) assert.Equal(t, "success", delivery.Status) assert.Equal(t, "OK", delivery.Code) + + // Verify server received the right single-message shape (not a batch wrapper). + var sent map[string]interface{} + require.NoError(t, json.Unmarshal(receivedBody, &sent)) + _, hasMessagesWrapper := sent["messages"] + assert.False(t, hasMessagesWrapper, "must not wrap in a batch 'messages' array — single-message endpoint") + assert.Equal(t, "json", sent["content_type"]) + assert.NotNil(t, sent["body"]) +} + +func TestCloudflareQueuesPublisher_Publish_HTTPSuccess(t *testing.T) { + t.Parallel() + + for _, statusCode := range []int{http.StatusOK, http.StatusCreated, http.StatusAccepted} { + statusCode := statusCode + t.Run(http.StatusText(statusCode), func(t *testing.T) { + t.Parallel() + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(statusCode) + _, _ = w.Write([]byte(successResponseJSON)) + })) + defer server.Close() + + publisher := newPublisher(t, server.URL) + defer publisher.Close() + + event := testutil.EventFactory.Any( + testutil.EventFactory.WithDataMap(map[string]interface{}{"key": "value"}), + ) + delivery, err := publisher.Publish(context.Background(), &event) + require.NoError(t, err) + assert.Equal(t, "success", delivery.Status) + assert.Equal(t, "OK", delivery.Code) + }) + } } func TestCloudflareQueuesPublisher_Publish_APIError(t *testing.T) { t.Parallel() tests := []struct { - name string - statusCode int - response cloudflareAPIResponse - expectedStatus string - expectedCode string + name string + statusCode int + responseBody string + expectedCode string }{ { - name: "401 Unauthorized", - statusCode: http.StatusUnauthorized, - response: cloudflareAPIResponse{ - Success: false, - Errors: []cloudflareAPIError{ - {Code: 10000, Message: "Authentication error"}, - }, - }, - expectedStatus: "failed", - expectedCode: "401", + name: "401 Unauthorized", + statusCode: http.StatusUnauthorized, + responseBody: `{"success":false,"errors":[{"code":10000,"message":"Authentication error","documentation_url":"https://developers.cloudflare.com/api","source":{"pointer":"/"}}],"messages":[],"result":null}`, + expectedCode: "401", + }, + { + name: "403 Forbidden", + statusCode: http.StatusForbidden, + responseBody: `{"success":false,"errors":[{"code":10001,"message":"Access denied"}],"messages":[],"result":null}`, + expectedCode: "403", }, { - name: "403 Forbidden", - statusCode: http.StatusForbidden, - response: cloudflareAPIResponse{ - Success: false, - Errors: []cloudflareAPIError{ - {Code: 10001, Message: "Access denied"}, - }, - }, - expectedStatus: "failed", - expectedCode: "403", + name: "404 Not Found", + statusCode: http.StatusNotFound, + responseBody: `{"success":false,"errors":[{"code":10002,"message":"Queue not found"}],"messages":[],"result":null}`, + expectedCode: "404", }, { - name: "404 Not Found", - statusCode: http.StatusNotFound, - response: cloudflareAPIResponse{ - Success: false, - Errors: []cloudflareAPIError{ - {Code: 10002, Message: "Queue not found"}, - }, - }, - expectedStatus: "failed", - expectedCode: "404", + name: "500 Internal Server Error", + statusCode: http.StatusInternalServerError, + responseBody: `{"success":false,"errors":[{"code":10003,"message":"Internal error"}],"messages":[],"result":null}`, + expectedCode: "500", }, { - name: "500 Internal Server Error", - statusCode: http.StatusInternalServerError, - response: cloudflareAPIResponse{ - Success: false, - Errors: []cloudflareAPIError{ - {Code: 10003, Message: "Internal error"}, - }, - }, - expectedStatus: "failed", - expectedCode: "500", + name: "200 OK but success=false", + statusCode: http.StatusOK, + responseBody: `{"success":false,"errors":[{"code":10004,"message":"Validation error"}],"messages":[],"result":null}`, + expectedCode: "200", }, { - name: "API success false with errors", - statusCode: http.StatusOK, - response: cloudflareAPIResponse{ - Success: false, - Errors: []cloudflareAPIError{ - {Code: 10004, Message: "Validation error"}, - }, - }, - expectedStatus: "failed", - expectedCode: "200", + name: "unparseable error body still returns failure", + statusCode: http.StatusBadGateway, + responseBody: `bad gateway`, + expectedCode: "502", }, } for _, tt := range tests { + tt := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(tt.statusCode) - json.NewEncoder(w).Encode(tt.response) + _, _ = w.Write([]byte(tt.responseBody)) })) defer server.Close() - provider, err := destcfqueues.New(testutil.Registry.MetadataLoader(), nil) - require.NoError(t, err) - - destination := testutil.DestinationFactory.Any( - testutil.DestinationFactory.WithType("cloudflare_queues"), - testutil.DestinationFactory.WithConfig(map[string]string{ - "account_id": "test-account-id", - "queue_id": "test-queue-id", - }), - testutil.DestinationFactory.WithCredentials(map[string]string{ - "api_token": "test-api-token", - }), - ) - - publisher, err := provider.CreatePublisher(context.Background(), &destination) - require.NoError(t, err) + publisher := newPublisher(t, server.URL) defer publisher.Close() - cfPublisher := publisher.(*destcfqueues.CloudflareQueuesPublisher) - cfPublisher.SetHTTPClient(&http.Client{ - Transport: &testTransport{serverURL: server.URL}, - }) - event := testutil.EventFactory.Any( - testutil.EventFactory.WithData(map[string]interface{}{"key": "value"}), + testutil.EventFactory.WithDataMap(map[string]interface{}{"key": "value"}), ) - delivery, err := publisher.Publish(context.Background(), &event) require.Error(t, err) - require.NotNil(t, delivery, "delivery should not be nil for API errors") - assert.Equal(t, tt.expectedStatus, delivery.Status) + require.NotNil(t, delivery) + assert.Equal(t, "failed", delivery.Status) assert.Equal(t, tt.expectedCode, delivery.Code) }) } } -func TestCloudflareQueuesPublisher_Publish_HTTPSuccess(t *testing.T) { - t.Parallel() - - successCodes := []int{ - http.StatusOK, - http.StatusCreated, - http.StatusAccepted, - } - - for _, statusCode := range successCodes { - t.Run(http.StatusText(statusCode), func(t *testing.T) { - t.Parallel() - - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - response := cloudflareAPIResponse{ - Success: true, - Errors: []cloudflareAPIError{}, - Messages: []string{}, - Result: []map[string]interface{}{ - {"messageId": "msg-123"}, - }, - } - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(statusCode) - json.NewEncoder(w).Encode(response) - })) - defer server.Close() - - provider, err := destcfqueues.New(testutil.Registry.MetadataLoader(), nil) - require.NoError(t, err) - - destination := testutil.DestinationFactory.Any( - testutil.DestinationFactory.WithType("cloudflare_queues"), - testutil.DestinationFactory.WithConfig(map[string]string{ - "account_id": "test-account-id", - "queue_id": "test-queue-id", - }), - testutil.DestinationFactory.WithCredentials(map[string]string{ - "api_token": "test-api-token", - }), - ) - - publisher, err := provider.CreatePublisher(context.Background(), &destination) - require.NoError(t, err) - defer publisher.Close() - - cfPublisher := publisher.(*destcfqueues.CloudflareQueuesPublisher) - cfPublisher.SetHTTPClient(&http.Client{ - Transport: &testTransport{serverURL: server.URL}, - }) - - event := testutil.EventFactory.Any( - testutil.EventFactory.WithData(map[string]interface{}{"key": "value"}), - ) - - delivery, err := publisher.Publish(context.Background(), &event) - require.NoError(t, err) - assert.Equal(t, "success", delivery.Status) - assert.Equal(t, "OK", delivery.Code) - }) - } -} - -// testTransport redirects requests to the test server +// testTransport redirects requests to the test server while preserving the path. type testTransport struct { serverURL string } func (t *testTransport) RoundTrip(req *http.Request) (*http.Response, error) { - // Replace the URL with test server URL while keeping the path newURL := t.serverURL + req.URL.Path newReq, err := http.NewRequestWithContext(req.Context(), req.Method, newURL, req.Body) if err != nil { From 0f8f1312a1fcb4c75bc0b31d7e3a096536ac6399 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Thu, 14 May 2026 01:57:54 +0700 Subject: [PATCH 3/5] refactor(destcfqueues): remove test-only setter, fix dashboard URL MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace SetHTTPClient + testTransport URL-rewriting hack with a WithBaseURL functional option on New(). Tests now point baseURL at the httptest.Server directly — no path rewriting required. - Dashboard URL: the per-queue dashboard path uses the queue *name*, not the queue *id* we store in config, so the previous /queues/{queue_id} link 404s. Point at the account's queues list page (/workers/queues) instead until we surface the queue name too. --- .../providers/destcfqueues/destcfqueues.go | 38 +++++++++++++------ .../destcfqueues/destcfqueues_publish_test.go | 26 +++---------- .../destcfqueues_validate_test.go | 2 +- 3 files changed, 34 insertions(+), 32 deletions(-) diff --git a/internal/destregistry/providers/destcfqueues/destcfqueues.go b/internal/destregistry/providers/destcfqueues/destcfqueues.go index a5f05f8de..3729e5157 100644 --- a/internal/destregistry/providers/destcfqueues/destcfqueues.go +++ b/internal/destregistry/providers/destcfqueues/destcfqueues.go @@ -23,6 +23,18 @@ const ( // CloudflareQueuesDestination implements the destregistry.Provider interface for Cloudflare Queues. type CloudflareQueuesDestination struct { *destregistry.BaseProvider + baseURL string +} + +// Option configures a CloudflareQueuesDestination. +type Option func(*CloudflareQueuesDestination) + +// WithBaseURL overrides the Cloudflare API base URL. Intended for tests +// pointing at an httptest.Server; production code should never set this. +func WithBaseURL(url string) Option { + return func(d *CloudflareQueuesDestination) { + d.baseURL = url + } } // CloudflareQueuesConfig holds the configuration for a Cloudflare Queues destination. @@ -39,15 +51,20 @@ type CloudflareQueuesCredentials struct { var _ destregistry.Provider = (*CloudflareQueuesDestination)(nil) // New creates a new CloudflareQueuesDestination provider. -func New(loader metadata.MetadataLoader, basePublisherOpts []destregistry.BasePublisherOption) (*CloudflareQueuesDestination, error) { +func New(loader metadata.MetadataLoader, basePublisherOpts []destregistry.BasePublisherOption, opts ...Option) (*CloudflareQueuesDestination, error) { base, err := destregistry.NewBaseProvider(loader, providerType, basePublisherOpts...) if err != nil { return nil, err } - return &CloudflareQueuesDestination{ + d := &CloudflareQueuesDestination{ BaseProvider: base, - }, nil + baseURL: cloudflareAPIBaseURL, + } + for _, opt := range opts { + opt(d) + } + return d, nil } // Validate validates the destination configuration. @@ -74,6 +91,7 @@ func (d *CloudflareQueuesDestination) CreatePublisher(ctx context.Context, desti return &CloudflareQueuesPublisher{ BasePublisher: d.BaseProvider.NewPublisher(destregistry.WithDeliveryMetadata(destination.DeliveryMetadata)), httpClient: httpClient, + baseURL: d.baseURL, accountID: cfg.AccountID, queueID: cfg.QueueID, apiToken: creds.APIToken, @@ -105,18 +123,21 @@ func (d *CloudflareQueuesDestination) resolveMetadata(ctx context.Context, desti }, nil } -// makeCloudflareQueuesDashboardURL constructs the Cloudflare dashboard URL for a queue. +// makeCloudflareQueuesDashboardURL returns the Cloudflare dashboard queues list +// for the account. CF's per-queue dashboard URL uses the queue *name* (not the +// ID we have in config), so we link to the list page rather than risk a 404. func makeCloudflareQueuesDashboardURL(accountID, queueID string) string { if accountID == "" || queueID == "" { return "" } - return fmt.Sprintf("https://dash.cloudflare.com/%s/queues/%s", accountID, queueID) + return fmt.Sprintf("https://dash.cloudflare.com/%s/workers/queues", accountID) } // CloudflareQueuesPublisher handles publishing events to Cloudflare Queues. type CloudflareQueuesPublisher struct { *destregistry.BasePublisher httpClient *http.Client + baseURL string accountID string queueID string apiToken string @@ -128,11 +149,6 @@ func (p *CloudflareQueuesPublisher) Close() error { return nil } -// SetHTTPClient allows setting a custom HTTP client, primarily for testing purposes. -func (p *CloudflareQueuesPublisher) SetHTTPClient(client *http.Client) { - p.httpClient = client -} - // cloudflareMessageRequest is the body for POST /accounts/{id}/queues/{id}/messages // (single-message push). See https://developers.cloudflare.com/api/resources/queues/subresources/messages/methods/push/ type cloudflareMessageRequest struct { @@ -189,7 +205,7 @@ func (p *CloudflareQueuesPublisher) Format(ctx context.Context, event *models.Ev return nil, fmt.Errorf("failed to marshal request payload: %w", err) } - url := fmt.Sprintf("%s/accounts/%s/queues/%s/messages", cloudflareAPIBaseURL, p.accountID, p.queueID) + url := fmt.Sprintf("%s/accounts/%s/queues/%s/messages", p.baseURL, p.accountID, p.queueID) req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBuffer(payloadBytes)) if err != nil { diff --git a/internal/destregistry/providers/destcfqueues/destcfqueues_publish_test.go b/internal/destregistry/providers/destcfqueues/destcfqueues_publish_test.go index 8edacd276..39c61089a 100644 --- a/internal/destregistry/providers/destcfqueues/destcfqueues_publish_test.go +++ b/internal/destregistry/providers/destcfqueues/destcfqueues_publish_test.go @@ -56,7 +56,11 @@ func TestCloudflareAPIResponse_DecodesDocumentedShape(t *testing.T) { func newPublisher(t *testing.T, serverURL string) *destcfqueues.CloudflareQueuesPublisher { t.Helper() - provider, err := destcfqueues.New(testutil.Registry.MetadataLoader(), nil) + var opts []destcfqueues.Option + if serverURL != "" { + opts = append(opts, destcfqueues.WithBaseURL(serverURL)) + } + provider, err := destcfqueues.New(testutil.Registry.MetadataLoader(), nil, opts...) require.NoError(t, err) destination := testutil.DestinationFactory.Any( testutil.DestinationFactory.WithType("cloudflare_queues"), @@ -70,11 +74,7 @@ func newPublisher(t *testing.T, serverURL string) *destcfqueues.CloudflareQueues ) publisher, err := provider.CreatePublisher(context.Background(), &destination) require.NoError(t, err) - cfPublisher := publisher.(*destcfqueues.CloudflareQueuesPublisher) - if serverURL != "" { - cfPublisher.SetHTTPClient(&http.Client{Transport: &testTransport{serverURL: serverURL}}) - } - return cfPublisher + return publisher.(*destcfqueues.CloudflareQueuesPublisher) } func TestCloudflareQueuesPublisher_Format(t *testing.T) { @@ -263,17 +263,3 @@ func TestCloudflareQueuesPublisher_Publish_APIError(t *testing.T) { } } -// testTransport redirects requests to the test server while preserving the path. -type testTransport struct { - serverURL string -} - -func (t *testTransport) RoundTrip(req *http.Request) (*http.Response, error) { - newURL := t.serverURL + req.URL.Path - newReq, err := http.NewRequestWithContext(req.Context(), req.Method, newURL, req.Body) - if err != nil { - return nil, err - } - newReq.Header = req.Header - return http.DefaultTransport.RoundTrip(newReq) -} diff --git a/internal/destregistry/providers/destcfqueues/destcfqueues_validate_test.go b/internal/destregistry/providers/destcfqueues/destcfqueues_validate_test.go index f05c5708d..32473eb52 100644 --- a/internal/destregistry/providers/destcfqueues/destcfqueues_validate_test.go +++ b/internal/destregistry/providers/destcfqueues/destcfqueues_validate_test.go @@ -99,7 +99,7 @@ func TestCloudflareQueuesDestination_ComputeTarget(t *testing.T) { ) target := cloudflareQueuesDestination.ComputeTarget(&destination) assert.Equal(t, "my-queue-456", target.Target) - assert.Equal(t, "https://dash.cloudflare.com/my-account-123/queues/my-queue-456", target.TargetURL) + assert.Equal(t, "https://dash.cloudflare.com/my-account-123/workers/queues", target.TargetURL) }) t.Run("should return empty target URL when account_id is missing", func(t *testing.T) { From af424149d364c1c9a80e1b353f3a30dd89b218cf Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Thu, 14 May 2026 01:59:22 +0700 Subject: [PATCH 4/5] chore(cloudflare_queues): polish spec examples and instructions - OpenAPI: use realistic 32-char hex examples for account_id/queue_id, align target_url example with the new dashboard-list-page format - metadata.json: clarify that account_id/queue_id are hex IDs, not names - instructions.md: replace the camelCase 'contentType' note with the actual API payload shape (body wrapper + content_type:json) and link to the correct CF docs page --- docs/apis/openapi.yaml | 23 ++++++++++--------- .../cloudflare_queues/instructions.md | 17 ++++++++++---- .../providers/cloudflare_queues/metadata.json | 4 ++-- 3 files changed, 26 insertions(+), 18 deletions(-) diff --git a/docs/apis/openapi.yaml b/docs/apis/openapi.yaml index e2cbc0601..d4dad8ab9 100644 --- a/docs/apis/openapi.yaml +++ b/docs/apis/openapi.yaml @@ -331,19 +331,20 @@ components: properties: account_id: type: string - description: Cloudflare Account ID - example: "abc123def456789" + description: Cloudflare Account ID (32-character hex string). + example: "023e105f4ecef8ad9ca31a8372d0c353" queue_id: type: string - description: Cloudflare Queue ID - example: "my-queue" + description: Cloudflare Queue ID (32-character hex string, not the queue name). + example: "9d7d4cf8a3a14d9aaeb50c3e74e2f4b1" CloudflareQueuesCredentials: type: object required: [api_token] properties: api_token: type: string - description: Cloudflare API Token with queues:write permission + description: Cloudflare API Token with the `queues:write` permission scoped to the target account. + example: "v1.0-1234567890abcdef..." RabbitMQConfig: type: object required: [server_url, exchange] @@ -1373,14 +1374,14 @@ components: type: string description: A human-readable representation of the destination target (Cloudflare Queue ID). Read-only. readOnly: true - example: "my-queue" + example: "9d7d4cf8a3a14d9aaeb50c3e74e2f4b1" target_url: type: string format: url nullable: true - description: A URL link to the destination target (Cloudflare Dashboard link to the queue). Read-only. + description: A URL link to the Cloudflare dashboard queues list for the account. Read-only. readOnly: true - example: "https://dash.cloudflare.com/abc123def456789/queues/my-queue" + example: "https://dash.cloudflare.com/023e105f4ecef8ad9ca31a8372d0c353/workers/queues" example: id: "des_cf_queues_123" type: "cloudflare_queues" @@ -1389,10 +1390,10 @@ components: created_at: "2024-03-10T14:30:00Z" updated_at: "2024-03-10T14:30:00Z" config: - account_id: "abc123def456789" - queue_id: "my-queue" + account_id: "023e105f4ecef8ad9ca31a8372d0c353" + queue_id: "9d7d4cf8a3a14d9aaeb50c3e74e2f4b1" credentials: - api_token: "cf_token_..." + api_token: "v1.0-1234567890abcdef..." # Polymorphic Destination Schema (for Responses) Destination: diff --git a/internal/destregistry/metadata/providers/cloudflare_queues/instructions.md b/internal/destregistry/metadata/providers/cloudflare_queues/instructions.md index 0fce38722..1769583ae 100644 --- a/internal/destregistry/metadata/providers/cloudflare_queues/instructions.md +++ b/internal/destregistry/metadata/providers/cloudflare_queues/instructions.md @@ -98,12 +98,19 @@ When configuring your Cloudflare Queues destination, you'll need: ## Message Format -When events are sent to Cloudflare Queues, each message contains: +Each event is published as a single Cloudflare Queue message with the following request body: -- **body**: The event payload as a JSON object -- **contentType**: Set to `application/json` +```json +{ + "body": { + "data": , + "metadata": + }, + "content_type": "json" +} +``` -Messages are sent using the [Cloudflare Queues REST API](https://developers.cloudflare.com/api/operations/queue-send-messages). +`content_type: "json"` tells Cloudflare to deliver the body as a parsed object to consumer Workers. Messages are sent via [`POST /accounts/{account_id}/queues/{queue_id}/messages`](https://developers.cloudflare.com/api/resources/queues/subresources/messages/methods/push/). ## Testing the Integration @@ -170,7 +177,7 @@ Cloudflare Queues has rate limits. If you encounter rate limiting: ## Additional Resources - [Cloudflare Queues Documentation](https://developers.cloudflare.com/queues/) -- [Queues REST API Reference](https://developers.cloudflare.com/api/operations/queue-send-messages) +- [Queues REST API Reference](https://developers.cloudflare.com/api/resources/queues/subresources/messages/methods/push/) - [Cloudflare API Tokens](https://developers.cloudflare.com/fundamentals/api/get-started/create-token/) - [Wrangler CLI Documentation](https://developers.cloudflare.com/workers/wrangler/) - [Queues Pricing](https://developers.cloudflare.com/queues/platform/pricing/) diff --git a/internal/destregistry/metadata/providers/cloudflare_queues/metadata.json b/internal/destregistry/metadata/providers/cloudflare_queues/metadata.json index fe4e20ab1..1bcf794ed 100644 --- a/internal/destregistry/metadata/providers/cloudflare_queues/metadata.json +++ b/internal/destregistry/metadata/providers/cloudflare_queues/metadata.json @@ -5,14 +5,14 @@ "key": "account_id", "type": "text", "label": "Account ID", - "description": "Your Cloudflare Account ID", + "description": "Your Cloudflare Account ID (32-character hex string, found in the Cloudflare dashboard).", "required": true }, { "key": "queue_id", "type": "text", "label": "Queue ID", - "description": "The ID of your Cloudflare Queue", + "description": "The ID of your Cloudflare Queue (32-character hex string, not the queue name).", "required": true } ], From 8e4a35049b68386e4006a38cd07ddbdace86e317 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Thu, 21 May 2026 20:29:36 +0700 Subject: [PATCH 5/5] chore: drop SDK contract test for Cloudflare Queues The SDK contract tests can't run until SDKs are regenerated from the updated OpenAPI, and Kafka shipped without one (#779). Defer this file + factory entry to the follow-up SDK regen PR where the generated SDK shape will be known. --- .../factories/destination.factory.ts | 17 - .../destinations/cloudflare-queues.test.ts | 440 ------------------ 2 files changed, 457 deletions(-) delete mode 100644 spec-sdk-tests/tests/destinations/cloudflare-queues.test.ts diff --git a/spec-sdk-tests/factories/destination.factory.ts b/spec-sdk-tests/factories/destination.factory.ts index a2e054c89..01bc800f9 100644 --- a/spec-sdk-tests/factories/destination.factory.ts +++ b/spec-sdk-tests/factories/destination.factory.ts @@ -7,7 +7,6 @@ import type { DestinationCreateAzureServiceBus, DestinationCreateAwss3, DestinationCreateGCPPubSub, - DestinationCreateCloudflareQueues, } from '../../sdks/outpost-typescript/dist/commonjs/models/components/index'; export function createWebhookDestination( @@ -145,19 +144,3 @@ export function createGcpPubSubDestination( }; } -export function createCloudflareQueuesDestination( - overrides?: Partial -): DestinationCreateCloudflareQueues { - return { - type: 'cloudflare_queues', - topics: ['*'], - config: { - accountId: 'abc123def456', - queueId: 'my-queue-id', - }, - credentials: { - apiToken: 'cf-api-token-example', - }, - ...overrides, - }; -} diff --git a/spec-sdk-tests/tests/destinations/cloudflare-queues.test.ts b/spec-sdk-tests/tests/destinations/cloudflare-queues.test.ts deleted file mode 100644 index 2ecc30ead..000000000 --- a/spec-sdk-tests/tests/destinations/cloudflare-queues.test.ts +++ /dev/null @@ -1,440 +0,0 @@ -import { describe, it, before, after } from 'mocha'; -import { expect } from 'chai'; -import { SdkClient, createSdkClient } from '../../utils/sdk-client'; -import { createCloudflareQueuesDestination } from '../../factories/destination.factory'; -/* eslint-disable no-console */ -/* eslint-disable no-undef */ - -// Get configured test topics from environment (required) -if (!process.env.TEST_TOPICS) { - throw new Error('TEST_TOPICS environment variable is required. Please set it in .env file.'); -} -const TEST_TOPICS = process.env.TEST_TOPICS.split(',').map((t) => t.trim()); - -describe('Cloudflare Queues Destinations - Contract Tests (SDK-based validation)', () => { - let client: SdkClient; - - before(async () => { - client = createSdkClient(); - - // Create tenant if it doesn't exist (idempotent operation) - try { - await client.upsertTenant(); - } catch (error) { - console.warn('Failed to create tenant (may already exist):', error); - } - }); - - after(async () => { - // Cleanup: delete all destinations for the test tenant - try { - const destinations = await client.listDestinations(); - console.log(`Cleaning up ${destinations.length} destinations...`); - - for (const destination of destinations) { - try { - await client.deleteDestination(destination.id); - console.log(`Deleted destination: ${destination.id}`); - } catch (error) { - console.warn(`Failed to delete destination ${destination.id}:`, error); - } - } - - console.log('All destinations cleaned up'); - } catch (error) { - console.warn('Failed to list destinations for cleanup:', error); - } - - // Cleanup: delete the test tenant - try { - await client.deleteTenant(); - console.log('Test tenant deleted'); - } catch (error) { - console.warn('Failed to delete tenant:', error); - } - }); - - describe('POST /api/v1/tenants/{tenant_id}/destinations - Create Cloudflare Queues Destination', () => { - it('should create a Cloudflare Queues destination with valid config', async () => { - const destinationData = createCloudflareQueuesDestination(); - const destination = await client.createDestination(destinationData); - - expect(destination.type).to.equal('cloudflare_queues'); - expect(destination.config.accountId).to.equal(destinationData.config.accountId); - expect(destination.config.queueId).to.equal(destinationData.config.queueId); - }); - - it('should create a Cloudflare Queues destination with array of topics', async () => { - const destinationData = createCloudflareQueuesDestination({ - topics: TEST_TOPICS, - }); - const destination = await client.createDestination(destinationData); - - expect(destination.topics).to.have.lengthOf(TEST_TOPICS.length); - TEST_TOPICS.forEach((topic) => { - expect(destination.topics).to.include(topic); - }); - - // Cleanup - await client.deleteDestination(destination.id); - }); - - it('should create destination with user-provided ID', async () => { - const customId = `custom-cf-queues-${Date.now()}`; - const destinationData = createCloudflareQueuesDestination({ - id: customId, - }); - const destination = await client.createDestination(destinationData); - - expect(destination.id).to.equal(customId); - - // Cleanup - await client.deleteDestination(destination.id); - }); - - it('should reject creation with missing required config field: account_id', async () => { - let errorThrown = false; - try { - await client.createDestination({ - type: 'cloudflare_queues', - topics: '*', - config: { - // Missing accountId - queueId: 'my-queue-id', - }, - credentials: { - apiToken: 'cf-api-token-example', - }, - } as any); - } catch (error: any) { - errorThrown = true; - expect(error).to.exist; - if (error.response) { - expect(error.response.status).to.be.oneOf([400, 422]); - } else { - expect(error.message).to.exist; - } - } - if (!errorThrown) { - expect.fail('Should have thrown an error'); - } - }); - - it('should reject creation with missing required config field: queue_id', async () => { - let errorThrown = false; - try { - await client.createDestination({ - type: 'cloudflare_queues', - topics: '*', - config: { - accountId: 'abc123def456', - // Missing queueId - }, - credentials: { - apiToken: 'cf-api-token-example', - }, - } as any); - } catch (error: any) { - errorThrown = true; - expect(error).to.exist; - if (error.response) { - expect(error.response.status).to.be.oneOf([400, 422]); - } else { - expect(error.message).to.exist; - } - } - if (!errorThrown) { - expect.fail('Should have thrown an error'); - } - }); - - it('should reject creation with missing required credential field: api_token', async () => { - let errorThrown = false; - try { - await client.createDestination({ - type: 'cloudflare_queues', - topics: '*', - config: { - accountId: 'abc123def456', - queueId: 'my-queue-id', - }, - credentials: { - // Missing apiToken - }, - } as any); - } catch (error: any) { - errorThrown = true; - expect(error).to.exist; - if (error.response) { - expect(error.response.status).to.be.oneOf([400, 422]); - } else { - expect(error.message).to.exist; - } - } - if (!errorThrown) { - expect.fail('Should have thrown an error'); - } - }); - - it('should reject creation with missing credentials', async () => { - let errorThrown = false; - try { - await client.createDestination({ - type: 'cloudflare_queues', - topics: '*', - config: { - accountId: 'abc123def456', - queueId: 'my-queue-id', - }, - // Missing credentials - } as any); - } catch (error: any) { - errorThrown = true; - expect(error).to.exist; - if (error.response) { - expect(error.response.status).to.be.oneOf([400, 422]); - } else { - expect(error.message).to.exist; - } - } - if (!errorThrown) { - expect.fail('Should have thrown an error'); - } - }); - - it('should reject creation with missing type field', async () => { - let errorThrown = false; - try { - await client.createDestination({ - topics: '*', - config: { - accountId: 'abc123def456', - queueId: 'my-queue-id', - }, - credentials: { - apiToken: 'cf-api-token-example', - }, - } as any); - } catch (error: any) { - errorThrown = true; - expect(error).to.exist; - if (error.response) { - expect(error.response.status).to.be.oneOf([400, 422]); - } else { - expect(error.message).to.exist; - } - } - if (!errorThrown) { - expect.fail('Should have thrown an error'); - } - }); - - it('should reject creation with empty topics', async () => { - let errorThrown = false; - try { - const destinationData = createCloudflareQueuesDestination({ - topics: [], - }); - await client.createDestination(destinationData); - } catch (error: any) { - errorThrown = true; - expect(error).to.exist; - if (error.response) { - expect(error.response.status).to.be.oneOf([400, 422]); - } else { - expect(error.message).to.exist; - } - } - if (!errorThrown) { - expect.fail('Should have thrown an error'); - } - }); - }); - - describe('GET /api/v1/tenants/{tenant_id}/destinations/{id} - Retrieve Cloudflare Queues Destination', () => { - let destinationId: string; - - before(async () => { - const destinationData = createCloudflareQueuesDestination(); - const destination = await client.createDestination(destinationData); - destinationId = destination.id; - }); - - after(async () => { - try { - await client.deleteDestination(destinationId); - } catch (error) { - console.warn('Failed to cleanup destination:', error); - } - }); - - it('should retrieve an existing Cloudflare Queues destination', async () => { - const destination = await client.getDestination(destinationId); - - expect(destination.id).to.equal(destinationId); - expect(destination.type).to.equal('cloudflare_queues'); - expect(destination.config.accountId).to.exist; - expect(destination.config.queueId).to.exist; - }); - - it('should return 404 for non-existent destination', async () => { - let errorThrown = false; - try { - await client.getDestination('non-existent-id-12345'); - } catch (error: any) { - errorThrown = true; - expect(error).to.exist; - if (error.response) { - expect(error.response.status).to.equal(404); - } else { - expect(error.message).to.exist; - } - } - if (!errorThrown) { - expect.fail('Should have thrown an error'); - } - }); - }); - - describe('GET /api/v1/tenants/{tenant_id}/destinations - List Cloudflare Queues Destinations', () => { - before(async () => { - // Create multiple Cloudflare Queues destinations for listing - await client.createDestination(createCloudflareQueuesDestination()); - await client.createDestination( - createCloudflareQueuesDestination({ - topics: [TEST_TOPICS[0]], - config: { - accountId: 'abc123def456', - queueId: 'my-queue-2', - }, - }) - ); - }); - - it('should list all destinations', async () => { - const destinations = await client.listDestinations(); - - expect(destinations.length).to.be.greaterThan(0); - }); - - it('should filter destinations by type', async () => { - const destinations = await client.listDestinations({ type: 'cloudflare_queues' }); - - destinations.forEach((dest) => { - expect(dest.type).to.equal('cloudflare_queues'); - }); - }); - }); - - describe('PATCH /api/v1/tenants/{tenant_id}/destinations/{id} - Update Cloudflare Queues Destination', () => { - let destinationId: string; - - before(async () => { - const destinationData = createCloudflareQueuesDestination(); - const destination = await client.createDestination(destinationData); - destinationId = destination.id; - }); - - after(async () => { - try { - await client.deleteDestination(destinationId); - } catch (error) { - console.warn('Failed to cleanup destination:', error); - } - }); - - it('should update destination topics', async () => { - const updated = await client.updateDestination(destinationId, { - topics: ['user.created', 'user.updated'], - }); - - expect(updated.id).to.equal(destinationId); - expect(updated.type).to.equal('cloudflare_queues'); - expect(updated.topics).to.include('user.created'); - expect(updated.topics).to.include('user.updated'); - }); - - it('should update destination config', async () => { - const updated = await client.updateDestination(destinationId, { - config: { - accountId: 'updated-account-id', - queueId: 'updated-queue-id', - }, - }); - - expect(updated.id).to.equal(destinationId); - expect(updated.config).to.exist; - if (updated.config) { - expect(updated.config.accountId).to.equal('updated-account-id'); - expect(updated.config.queueId).to.equal('updated-queue-id'); - } - }); - - it('should update destination credentials', async () => { - const updated = await client.updateDestination(destinationId, { - credentials: { - apiToken: 'updated-api-token', - }, - }); - - expect(updated.id).to.equal(destinationId); - }); - - it('should return 404 for updating non-existent destination', async () => { - let errorThrown = false; - try { - await client.updateDestination('non-existent-id-12345', { - topics: ['test'], - }); - } catch (error: any) { - errorThrown = true; - expect(error).to.exist; - if (error.response) { - expect(error.response.status).to.equal(404); - } else { - expect(error.message).to.exist; - } - } - if (!errorThrown) { - expect.fail('Should have thrown an error'); - } - }); - }); - - describe('DELETE /api/v1/tenants/{tenant_id}/destinations/{id} - Delete Cloudflare Queues Destination', () => { - it('should delete an existing destination', async () => { - const destinationData = createCloudflareQueuesDestination(); - const destination = await client.createDestination(destinationData); - - await client.deleteDestination(destination.id); - - // Verify deletion by trying to get the destination - let errorThrown = false; - try { - await client.getDestination(destination.id); - } catch (error: any) { - errorThrown = true; - expect(error).to.exist; - } - expect(errorThrown).to.be.true; - }); - - it('should return 404 for deleting non-existent destination', async () => { - let errorThrown = false; - try { - await client.deleteDestination('non-existent-id-12345'); - } catch (error: any) { - errorThrown = true; - expect(error).to.exist; - if (error.response) { - expect(error.response.status).to.equal(404); - } else { - expect(error.message).to.exist; - } - } - if (!errorThrown) { - expect.fail('Should have thrown an error'); - } - }); - }); -});