Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,27 @@ Temporary Items
.key
.crt
.csr
.pem
.pem

# Go build
**/bin/
**/vendor/

# Editor
*.swp
*.swo
.idea/
.vscode/
.DS_Store

# Debug
*.log

# Local Terraform variable overrides
**/.terraform.lock.hcl
**/terraform_local.tfvars
**/.terraform*

*.tfstate
*.tfstate.backup
**/__pycache__/
184 changes: 184 additions & 0 deletions api-gateway-function-queue-async/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
# API Gateway Function Queue Async

This project demonstrates an asynchronous messaging pattern using Oracle Cloud Infrastructure (OCI) services:

- **API Gateway**: Receives HTTP requests and routes them to functions
- **OCI Functions**: Serverless functions for processing requests
- **OCI Queue**: Asynchronous message queue for decoupling services

## Architecture

The application follows an asynchronous messaging pattern:

1. HTTP request arrives at API Gateway → `/order` endpoint
2. API Gateway routes to **place-order OCI Function**
3. Function validates and enqueues order to **OCI Queue**
4. Function returns immediately (async response)
5. **process-order Container Instance** continuously polls the queue
6. Container Instance processes messages and inserts into **NoSQL Database**

```
HTTP Request → API Gateway → place-order Function → Queue → process-order Container Instance → NoSQL Table
(Async Processing)
```

## Components

### API Gateway
- Receives HTTP requests at `/order` endpoint
- Routes requests to the `place-order` function

### Functions
- **place-order**: OCI Function that receives order requests via API Gateway and enqueues them to the queue
- **process-order**: Container Instance (not an OCI Function) that continuously polls the OCI Queue and processes orders by inserting them into the NoSQL table

### Queue
- **OrderQueue**: Asynchronous message queue for order processing
- Provides reliable message delivery and processing

## Directory Structure

```
api-gateway-function-queue-async/
├── README.md # Project documentation
├── functions/ # Application components
│ ├── place-order/ # OCI Function for receiving orders
│ └── process-order/ # Container Instance for queue polling
└── terraform/ # Infrastructure as Code
└── modules/ # Reusable Terraform modules
├── apigateway/ # API Gateway module
├── container_repository/ # OCI Container Registry module
├── functions/ # OCI Functions module
├── queue/ # OCI Queue module
└── vcn/ # Virtual Cloud Network module
```

## Prerequisites

Before you begin, ensure you have the following:

- **OCI Account**: Active Oracle Cloud Infrastructure account with appropriate compartment access
- **Terraform**: Version 1.10.0 or higher installed on your local machine
- **Docker**: Docker Desktop or equivalent for building and pushing container images
- **OCI CLI**: Configured with your OCI credentials for authentication
- **Network**: A public subnet in your VPC for the API Gateway and Container Instance
- **IAM Permissions**: Your OCI user must have permissions to:
- Create API Gateways
- Create and manage OCI Functions
- Create and manage OCI Queues
- Create NoSQL tables
- Create Container Instances
- Create Container Registries
- Manage IAM policies and dynamic groups
- **Authentication**: OCI auth token for Docker Container Registry access

## Getting Started

1. **Configure OCI Variables**:

Update `terraform/terraform.tfvars` with your OCI configuration:
```hcl
region = "us-ashburn-1" # Your OCI region
compartment_ocid = "ocid1.compartment.oc1..." # Your compartment OCID
subnet_ocid = "ocid1.subnet.oc1..." # Your public subnet OCID
tenancy_ocid = "ocid1.tenancy.oc1..." # Your tenancy OCID
queue_name = "OrderQueue"
post_order_container_repository_name = "queue_async_repo"
process_order_container_repository_name = "queue_async_process_repo"
application_display_name = "queue_async_app"
nosql_table_name = "orders"
```

2. **Deploy Infrastructure**:
```bash
cd terraform
terraform init
terraform apply -var-file=terraform.tfvars
```

**Note**: After successful deployment, note the OCIR repository addresses from the Terraform outputs:
- `post_order_repository_path`: Container registry path for place-order function
- `process_order_repository_path`: Container registry path for process-order function

These will be used in the next step for building and pushing Docker images.

3. **Build and Deploy Place-Order Image**:
```bash
cd functions/place-order
docker build -t place-order:1 .
docker tag place-order:1 <post_order_repository_path>:1
docker push <post_order_repository_path>:1
```

4. **Build and Deploy Process-Order Image**:
```bash
cd ../process-order
docker build -t process-order:1 .
docker tag process-order:1 <process_order_repository_path>:1
docker push <process_order_repository_path>:1
```

5. **Update Terraform Variables with Image Tags**:

Update `terraform/terraform.tfvars` to add the function image URIs you just pushed:
```hcl
# Add these to your existing terraform.tfvars
functions = {
"place-order" = {
source_image = "<post_order_repository_path>:1"
path = "place-order"
config = {}
}
}
queue_poller_image = "<process_order_repository_path>:1"
```

6. **Re-apply Terraform Configuration**:

Apply the updated configuration with the new image tags:
```bash
cd terraform
terraform apply -var-file=terraform.tfvars
```

This will deploy the OCI Function and Container Instance with the container images you just built and pushed.

7. **Get API Gateway Endpoint**:

After the Terraform apply completes, retrieve the API Gateway endpoint URL:
```bash
terraform output api_gateway_endpoint
```

Save this URL for testing the API in the next step.

8. **Test the API**:
```bash
curl -X POST https://<api_gateway_endpoint>/order \
-H "Content-Type: application/json" \
-d '{
"data": {
"order_id": "ORD-001",
"customer_id": "CUST123",
"amount": 99.99
}
}'
```

9. **Verify Records in NoSQL Table**:

After submitting a few orders, verify that the records have been processed and inserted into the NoSQL table:
```bash
# Query the NoSQL table for all orders
oci nosql query execute --statement "SELECT * FROM order_info" --compartment-id <compartment_ocid> --region <region>
```

You should see the orders you submitted through the API in the results.

## Benefits

- **Scalability**: Queue decouples request handling from processing
- **Reliability**: Messages are persisted and can be retried
- **Performance**: API responds immediately while processing happens asynchronously
- **Monitoring**: Queue provides visibility into message processing
16 changes: 16 additions & 0 deletions api-gateway-function-queue-async/functions/place-order/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM fnproject/python:3.11-dev as build-stage
WORKDIR /function
ADD requirements.txt /function/

RUN pip3 install --target /python/ --no-cache --no-cache-dir -r requirements.txt &&\
rm -fr ~/.cache/pip /tmp* requirements.txt func.yaml Dockerfile .venv &&\
chmod -R o+r /python
ADD . /function/
RUN rm -fr /function/.pip_cache
FROM fnproject/python:3.11
WORKDIR /function
COPY --from=build-stage /python /python
COPY --from=build-stage /function /function
RUN chmod -R o+r /function
ENV PYTHONPATH=/function:/python
ENTRYPOINT ["/python/bin/fdk", "/function/func.py", "handler"]
103 changes: 103 additions & 0 deletions api-gateway-function-queue-async/functions/place-order/func.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import io
import json
import oci
import logging
import base64
from io import BytesIO

from fdk import response

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def handler(ctx, data: str = None):
"""
OCI Function handler to post a message to an OCI Queue using Resource Principal.

Args:
ctx: Function context
data: Input data (JSON string)

Returns:
JSON response indicating success or failure
"""
try:
# Initialize Resource Principal signer
signer = oci.auth.signers.get_resource_principals_signer()

# Get region from function configuration or default to us-ashburn-1
region = ctx.Config().get("OCI_REGION", "us-ashburn-1")

# Set endpoint explicitly
endpoint = f"https://cell-1.queue.messaging.{region}.oci.oraclecloud.com"

# Initialize Queue client with the correct endpoint
config = {
"region": ctx.Config().get("QUEUE_OCID")
}

# Initialize Queue client
queue_client = oci.queue.QueueClient(config=config, signer=signer, service_endpoint= endpoint)

# Get queue OCID from function configuration
queue_ocid = ctx.Config().get("QUEUE_OCID")
if not queue_ocid:
raise ValueError("QUEUE_OCID not found in function configuration")

# Handle input data (string or BytesIO)
if data is None:
raise ValueError("No payload provided in POST body")

# If data is BytesIO (e.g., from fn invoke), read and decode it
if isinstance(data, BytesIO):
data = data.read().decode('utf-8')

# Parse the input as JSON
payload = json.loads(data)

# Validate payload structure
if not isinstance(payload, dict) or "data" not in payload:
raise ValueError("Invalid payload format: 'data' key is missing")
if not all(key in payload["data"] for key in ["order_id", "customer_id", "amount"]):
raise ValueError("Invalid payload: 'order_id', 'customer_id', and 'amount' are required in 'data'")

# Convert payload to JSON string and encode as base64
message_content = json.dumps(payload)

# Prepare message
message = {
"content": message_content,
"contentType": "application/json"
}

put_messages_details = oci.queue.models.PutMessagesDetails(
messages=[oci.queue.models.PutMessagesDetailsEntry(content=message["content"])]
)

response = queue_client.put_messages(
queue_id=queue_ocid,
put_messages_details=put_messages_details
)

# Check response
if response.status == 200:
logger.info(f"Successfully posted message to queue {queue_ocid}")
return {
"status": "success",
"messageId": response.data.messages[0].id
}
else:
logger.error(f"Failed to post message: {response.status}")
return {
"status": "error",
"message": f"Failed to post message: {response.status}"
}

except Exception as e:
logger.error(f"Error in handler: {str(e)}")
return {
"status": "error",
"message": str(e)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
schema_version: 20180708
name: my-python-function
version: 0.0.18
runtime: python
build_image: fnproject/python:3.11-dev
run_image: fnproject/python:3.11
entrypoint: /python/bin/fdk /function/func.py handler
memory: 256
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
fdk>=0.1.96
oci
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
FROM python:3.9-slim
WORKDIR /app
COPY queue_poller.py .
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
CMD ["python", "queue_poller.py"]
Loading