-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathindex.py
More file actions
281 lines (225 loc) · 9.43 KB
/
index.py
File metadata and controls
281 lines (225 loc) · 9.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
"""
Smart Inventory Hub - Lambda Function
Hybrid Kro Compositions Blog Sample
This Lambda function provides a REST API for managing product inventory.
It supports CRUD operations with DynamoDB backend.
Endpoints:
- GET /products - List all products (supports filtering)
- POST /products - Create a new product
- GET /products/{id} - Get a specific product
- PUT /products/{id} - Update a product
- DELETE /products/{id} - Delete a product
Environment Variables:
- TABLE_NAME: DynamoDB table name (set by RGD)
"""
import json
import boto3
import os
from decimal import Decimal
from datetime import datetime
import uuid
# Initialize the DynamoDB resource and table handle once, at import time.
# TABLE_NAME is read with os.environ[...], so a missing variable raises
# KeyError during module import rather than on the first request.
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["TABLE_NAME"])
class DecimalEncoder(json.JSONEncoder):
    """JSON encoder that can serialize ``decimal.Decimal`` values.

    Whole numbers are emitted as JSON integers (``5``, not ``5.0``) so that
    counts such as ``stock`` round-trip as integers; everything else is
    emitted as a float.
    """

    def default(self, obj):
        """Return a JSON-serializable replacement for *obj*.

        Args:
            obj: A value the base encoder could not serialize.

        Returns:
            ``int`` for finite integral Decimals, ``float`` for any other
            Decimal.

        Raises:
            TypeError: For non-Decimal values (raised by the base class).
        """
        if isinstance(obj, Decimal):
            # is_finite() guards int() against Infinity/NaN, which would
            # raise; those fall through to float() as before.
            if obj.is_finite() and obj == obj.to_integral_value():
                return int(obj)
            return float(obj)
        return super().default(obj)
def get_user_claims(event):
    """Extract user claims from the API Gateway JWT authorizer context.

    The claims are read from
    ``event["requestContext"]["authorizer"]["jwt"]["claims"]``. The token is
    not validated here; per the handler's design, API Gateway's JWT
    authorizer is expected to have done that before invocation.

    Args:
        event: The Lambda event dict.

    Returns:
        A dict with keys ``sub``, ``email`` and ``groups``. Missing claims
        fall back to ``"anonymous"``, ``"unknown"`` and ``""`` respectively.
    """
    # Use `or {}` at every level: a key that is present but null (for
    # example "authorizer": None) must not raise AttributeError.
    request_context = event.get("requestContext") or {}
    authorizer = request_context.get("authorizer") or {}
    jwt_claims = (authorizer.get("jwt") or {}).get("claims") or {}
    return {
        "sub": jwt_claims.get("sub", "anonymous"),
        "email": jwt_claims.get("email", "unknown"),
        "groups": jwt_claims.get("cognito:groups", ""),
    }
def handler(event, context):
    """Lambda entry point for the Smart Inventory Hub API.

    Routes the request to the matching product handler based on the HTTP
    method and path found in ``event["requestContext"]["http"]``. User
    claims are extracted for audit logging and ownership tracking; the JWT
    itself is not validated here (API Gateway's JWT authorizer is expected
    to have done so).

    Supported routes:
        GET    /products
        POST   /products
        GET    /products/{id}
        PUT    /products/{id}
        DELETE /products/{id}

    Args:
        event: The Lambda event dict.
        context: The Lambda context object (unused).

    Returns:
        An API Gateway proxy response dict. Unknown routes yield 404,
        malformed JSON bodies 400, and any other exception 500.
    """
    try:
        # Extract authenticated user info from JWT claims
        user = get_user_claims(event)
        print(f"Request from user: {user['email']} (sub: {user['sub']})")

        # `or {}` keeps a present-but-null context from raising.
        http_info = (event.get("requestContext") or {}).get("http") or {}
        http_method = http_info.get("method", "GET")
        raw_path = http_info.get("path", "/")

        # Remove stage from path if present (e.g., /dev/products -> /products)
        path_parts = raw_path.split("/")
        if len(path_parts) > 1 and path_parts[1] in ("dev", "staging", "prod", "$default"):
            path = "/" + "/".join(path_parts[2:])
        else:
            path = raw_path

        # Collection routes
        if path == "/products":
            if http_method == "GET":
                return handle_list_products(event)
            if http_method == "POST":
                return handle_create_product(event, user)

        # Item routes. The id must be exactly one non-empty path segment:
        # "/products/" or "/products/a/b" is not a product URL and must not
        # reach DynamoDB with an empty or unintended key.
        elif path.startswith("/products/"):
            product_id = path[len("/products/"):]
            if product_id and "/" not in product_id:
                if http_method == "GET":
                    return handle_get_product(product_id)
                if http_method == "PUT":
                    return handle_update_product(product_id, event, user)
                if http_method == "DELETE":
                    return handle_delete_product(product_id, user)

        # Unknown route (or unsupported method on a known path)
        return error_response(404, "Not found", {"path": path, "method": http_method})

    except json.JSONDecodeError:
        return error_response(400, "Invalid JSON in request body")
    except Exception as e:
        print(f"Error: {str(e)}")
        # NOTE(review): the raw exception text is returned to the client;
        # consider dropping it if internal details must not be exposed.
        return error_response(500, "Internal server error", {"message": str(e)})
def handle_list_products(event):
    """Handle GET /products - List products with optional filters.

    Query string parameters:
        category: When set, products are fetched with a query on the
            ``category-index`` index; otherwise the whole table is scanned.
        lowStock: When exactly ``"true"``, only low-stock products are kept.

    A product is low on stock when its stock level (``stock``, falling back
    to ``quantity``, then 0) is at or below its threshold (``minStock``,
    falling back to ``reorderLevel``, then 0). The same rule drives both the
    ``lowStock`` filter and the ``lowStockCount`` summary so the two always
    agree.

    Args:
        event: The Lambda event dict.

    Returns:
        A 200 response whose body holds ``products``, ``count`` and a
        ``summary`` with ``totalValue`` and ``lowStockCount``.
    """
    query_params = event.get("queryStringParameters") or {}
    category = query_params.get("category")
    low_stock = query_params.get("lowStock")

    if category:
        # Query by category using the GSI
        fetch_page = table.query
        request = {
            "IndexName": "category-index",
            "KeyConditionExpression": "category = :cat",
            "ExpressionAttributeValues": {":cat": category},
        }
    else:
        # Scan all products
        fetch_page = table.scan
        request = {}

    # Follow LastEvaluatedKey until the result set is exhausted.
    products = []
    while True:
        response = fetch_page(**request)
        products.extend(response.get("Items", []))
        if "LastEvaluatedKey" not in response:
            break
        request["ExclusiveStartKey"] = response["LastEvaluatedKey"]

    def stock_level(product):
        return product.get("stock", product.get("quantity", 0))

    def is_low_stock(product):
        threshold = product.get("minStock", product.get("reorderLevel", 0))
        return stock_level(product) <= threshold

    # Filter by low stock if requested
    if low_stock == "true":
        products = [p for p in products if is_low_stock(p)]

    # Calculate summary statistics
    total_value = sum(
        p.get("price", Decimal(0)) * stock_level(p) for p in products
    )
    low_stock_count = sum(1 for p in products if is_low_stock(p))

    return success_response({
        "products": products,
        "count": len(products),
        "summary": {
            "totalValue": total_value,
            "lowStockCount": low_stock_count,
        },
    })
def handle_create_product(event, user):
    """Handle POST /products - Create a new product.

    Args:
        event: The Lambda event dict; the JSON request body is read from
            ``event["body"]``.
        user: Claims dict; ``user["email"]`` is stored as ``createdBy`` and
            ``updatedBy``.

    Returns:
        A 201 response containing the stored product, or a 400 response when
        the body is not a JSON object, a required field is missing, or
        ``price``/``stock`` is not numeric.

    Raises:
        json.JSONDecodeError: If the body is not valid JSON; ``handler``
            turns this into a 400 response.
    """
    # A missing or null body counts as an empty object. Floats are parsed
    # straight to Decimal because the boto3 DynamoDB resource rejects
    # Python floats anywhere in the item.
    body = json.loads(event.get("body") or "{}", parse_float=Decimal)
    if not isinstance(body, dict):
        return error_response(400, "Request body must be a JSON object")

    # Validate required fields
    required_fields = ["sku", "name", "category", "price", "stock"]
    missing_fields = [f for f in required_fields if f not in body]
    if missing_fields:
        return error_response(400, f"Missing required fields: {', '.join(missing_fields)}")

    # Convert price to Decimal and stock to int; bad input is a client
    # error, not a server error.
    try:
        price = Decimal(str(body["price"]))
        stock = int(body["stock"])
    except (ArithmeticError, TypeError, ValueError):
        return error_response(400, "Fields 'price' and 'stock' must be numeric")
    if not price.is_finite():
        return error_response(400, "Fields 'price' and 'stock' must be numeric")
    body["price"] = price
    body["stock"] = stock

    # Generate ID (also when the client sent an empty or null one) and add
    # metadata. One timestamp is used so createdAt equals updatedAt.
    now = datetime.utcnow().isoformat()
    body["id"] = body.get("id") or str(uuid.uuid4())
    body["createdAt"] = now
    body["updatedAt"] = now
    body["createdBy"] = user["email"]
    body["updatedBy"] = user["email"]
    body["minStock"] = body.get("minStock", 5)

    # Store product
    table.put_item(Item=body)

    return success_response({
        "message": "Product created successfully",
        "product": body,
    }, 201)
def handle_get_product(product_id):
    """Handle GET /products/{id} - Look up a single product by its id.

    Args:
        product_id: Value of the ``id`` key to fetch.

    Returns:
        A 200 response wrapping the item under ``product``, or a 404
        response when the lookup yields no item.
    """
    item = table.get_item(Key={"id": product_id}).get("Item")
    if not item:
        return error_response(404, "Product not found")
    return success_response({"product": item})
def handle_update_product(product_id, event, user):
    """Handle PUT /products/{id} - Replace an existing product.

    The stored item is replaced by the request body; only ``createdAt`` and
    ``createdBy`` are carried over from the existing item.

    Args:
        product_id: The ``id`` of the product to replace.
        event: The Lambda event dict; the JSON request body is read from
            ``event["body"]``.
        user: Claims dict; ``user["email"]`` is stored as ``updatedBy``.

    Returns:
        A 200 response with the stored product, a 404 response when the
        product does not exist, or a 400 response when the body is not a
        JSON object or ``price``/``stock`` is not numeric.

    Raises:
        json.JSONDecodeError: If the body is not valid JSON; ``handler``
            turns this into a 400 response.
    """
    # A missing or null body counts as an empty object. Floats are parsed
    # straight to Decimal because the boto3 DynamoDB resource rejects
    # Python floats anywhere in the item.
    body = json.loads(event.get("body") or "{}", parse_float=Decimal)
    if not isinstance(body, dict):
        return error_response(400, "Request body must be a JSON object")

    # Get existing product
    response = table.get_item(Key={"id": product_id})
    if "Item" not in response:
        return error_response(404, "Product not found")
    existing = response["Item"]

    # Convert price to Decimal and stock to int when supplied; bad input is
    # a client error, not a server error.
    try:
        if "price" in body:
            price = Decimal(str(body["price"]))
            if not price.is_finite():
                raise ValueError("price must be finite")
            body["price"] = price
        if "stock" in body:
            body["stock"] = int(body["stock"])
    except (ArithmeticError, TypeError, ValueError):
        return error_response(400, "Fields 'price' and 'stock' must be numeric")

    # Update product metadata
    now = datetime.utcnow().isoformat()
    body["id"] = product_id
    body["updatedAt"] = now
    body["updatedBy"] = user["email"]
    body["createdAt"] = existing.get("createdAt", now)
    body["createdBy"] = existing.get("createdBy", "unknown")

    table.put_item(Item=body)

    return success_response({
        "message": "Product updated successfully",
        "product": body,
    })
def handle_delete_product(product_id, user):
    """Handle DELETE /products/{id} - Delete a product.

    The delete is unconditional: no existence check is made, so the same
    success response is returned whether or not the item was present.

    Args:
        product_id: The ``id`` of the product to delete.
        user: Claims dict; ``user["email"]`` is written to the audit log.

    Returns:
        A 200 response with a confirmation message.
    """
    table.delete_item(Key={"id": product_id})
    # Log only after the call returned, so a failed delete does not leave a
    # misleading "deleted" audit line.
    print(f"Product {product_id} deleted by {user['email']}")
    return success_response({"message": "Product deleted successfully"})
def success_response(data, status_code=200):
    """Build an API Gateway proxy response for a successful request.

    Args:
        data: Payload to serialize as the JSON body; Decimal values are
            handled by ``DecimalEncoder``.
        status_code: HTTP status code to report (defaults to 200).

    Returns:
        A dict with ``statusCode``, JSON/CORS ``headers`` and the serialized
        ``body``.
    """
    headers = {
        "Content-Type": "application/json",
        "Access-Control-Allow-Origin": "*",
    }
    payload = json.dumps(data, cls=DecimalEncoder)
    return {"statusCode": status_code, "headers": headers, "body": payload}
def error_response(status_code, message, details=None):
    """Build an API Gateway proxy response for a failed request.

    Args:
        status_code: HTTP status code to report.
        message: Human-readable error text, stored under ``error``.
        details: Optional dict of extra fields merged into the body.

    Returns:
        A dict with ``statusCode``, ``headers`` and a JSON ``body``.
    """
    body = {"error": message}
    if details:
        body.update(details)
    return {
        "statusCode": status_code,
        "headers": {
            "Content-Type": "application/json",
            # Same CORS header as success responses; without it a browser
            # client cannot read the error body of a cross-origin call.
            "Access-Control-Allow-Origin": "*",
        },
        "body": json.dumps(body),
    }