From 5c0feb7e69f2223433fa6304d8b988ee463cc39a Mon Sep 17 00:00:00 2001 From: Srikanth Date: Thu, 20 Nov 2025 19:08:35 +0530 Subject: [PATCH 1/2] Refine README content. Updated README to improve clarity and add architecture suggestions with docker. --- README.md | 180 +++++++++++++++++++++++++++++++++++------------------- 1 file changed, 116 insertions(+), 64 deletions(-) diff --git a/README.md b/README.md index 5ec081e..becea43 100644 --- a/README.md +++ b/README.md @@ -1,116 +1,84 @@ # Taskiq + FastAPI -This repository has a code to integrate FastAPI with taskiq easily. +This repository provides code to integrate FastAPI with Taskiq easily. -Taskiq and FastAPI both have dependencies and this library makes it possible to depend on -`fastapi.Request` or `starlette.requests.HTTPConnection` in taskiq tasks. +Taskiq and FastAPI both use dependency injection, but they function differently. This library bridges the gap, allowing you to depend on `fastapi.Request` or `starlette.requests.HTTPConnection` inside your Taskiq tasks. -With this library you can easily re-use your fastapi dependencies in taskiq functions. +With this library, you can easily re-use your FastAPI dependencies (like database pools or config loaders) inside your Taskiq background functions. ## How does it work? -It adds startup functions to broker so it imports your fastapi application -and creates a single worker-wide Request and HTTPConnection objects that you depend on. +### 1. Process Separation +Taskiq tasks usually run in a separate **Worker process**, not inside your FastAPI web server process. (_So a task does not run inside the web server's event loop._) -THIS REQUEST IS NOT RELATED TO THE ACTUAL REQUESTS IN FASTAPI! -This request won't have actual data about the request you were handling while sending task. +### 2. Context Bridging +When the Worker starts, this library initializes your FastAPI application in the background. -## Usage +### 3. 
Dependency Injection +It creates a **dummy Request object** within the Worker. This allows functions that need FastAPI request context (like accessing `app.state`) to work identically in both the Web App and the Background Worker. -Here we have an example of function that is being used by both taskiq's task and -fastapi's handler function. +> **Note:** The injected Request object in a task is **NOT** the original HTTP request from the user. It is a simulated request context solely for accessing application state. -I have a script called `test_script.py` so my app can be found at `test_script:app`. -We use strings to resolve application to bypass circular imports. +--- -Also, as you can see, we use `TaskiqDepends` for Request. That's because -taskiq dependency resolver must know that this type must be injected. FastAPI disallow -Depends for Request type. That's why we use `TaskiqDepends`. +## Usage Example + +### File: `test_script.py` ```python -from fastapi import FastAPI, Request +from contextlib import asynccontextmanager +from fastapi import FastAPI, Request, Depends as FastAPIDepends from pydantic import BaseModel from redis.asyncio import ConnectionPool, Redis -from fastapi import Depends as FastAPIDepends -from taskiq import TaskiqDepends +from taskiq import TaskiqDepends, ZeroMQBroker import taskiq_fastapi -from taskiq import ZeroMQBroker broker = ZeroMQBroker() -app = FastAPI() - +@asynccontextmanager +async def lifespan(app: FastAPI): + print("Creating redis pool") + app.state.redis_pool = ConnectionPool.from_url("redis://localhost") -@app.on_event("startup") -async def app_startup(): - ##################### - # IMPORTANT NOTE # - ##################### - # If you won't check that this is not - # a worker process, you'll - # create an infinite recursion. Because in worker processes - # fastapi startup will be called. 
if not broker.is_worker_process: - print("Starting broker") + print("Starting broker client") await broker.startup() - print("Creating redis pool") - app.state.redis_pool = ConnectionPool.from_url("redis://localhost") + yield -@app.on_event("shutdown") -async def app_shutdown(): - ##################### - # IMPORTANT NOTE # - ##################### - # If you won't check that this is not - # a worker process, you'll - # create an infinite recursion. Because in worker processes - # fastapi startup will be called. if not broker.is_worker_process: - print("Shutting down broker") + print("Shutting down broker client") await broker.shutdown() + print("Stopping redis pool") await app.state.redis_pool.disconnect() +app = FastAPI(lifespan=lifespan) -# Here we call our magic function. taskiq_fastapi.init(broker, "test_script:app") - -# We use TaskiqDepends here, because if we use FastAPIDepends fastapi -# initialization will fail. def get_redis_pool(request: Request = TaskiqDepends()) -> ConnectionPool: return request.app.state.redis_pool - @broker.task async def my_redis_task( key: str, val: str, - # Here we depend using TaskiqDepends. - # Please use TaskiqDepends for all tasks to be resolved correctly. - # Or dependencies won't be injected. 
pool: ConnectionPool = TaskiqDepends(get_redis_pool), ): async with Redis(connection_pool=pool) as redis: await redis.set(key, val) print("Value set.") - class MyVal(BaseModel): key: str val: str - @app.post("/val") async def setval_endpoint(val: MyVal) -> None: - await my_redis_task.kiq( - key=val.key, - val=val.val, - ) + await my_redis_task.kiq(key=val.key, val=val.val) print("Task sent") - @app.get("/val") async def getval_endpoint( key: str, @@ -118,21 +86,105 @@ async def getval_endpoint( ) -> str: async with Redis(connection_pool=pool, decode_responses=True) as redis: return await redis.get(key) - ``` -## Manually update dependency context +--- + +## Key Takeaways + +### `if not broker.is_worker_process` + +- **True for FastAPI server (uvicorn)** → lifespan starts broker client so tasks can be sent. +- **False for Taskiq worker** → worker manages its own broker connection; lifespan still initializes dependencies. + +Prevents double-starting the broker. + +### `TaskiqDepends` vs `FastAPIDepends` + +| Purpose | Use | +|--------|------| +| Inside Taskiq tasks | `TaskiqDepends` | +| Inside HTTP routes | `FastAPIDepends` | + +--- -When using `InMemoryBroker` it may be required to update the dependency context manually. This may also be useful when setting up tests. +## Manual Dependency Context Update (InMemoryBroker) -```py +When using `InMemoryBroker` (often used for unit testing) it may be required to update the dependency context manually, as there is no separate worker process to trigger the initialization. + +```python import taskiq_fastapi from taskiq import InMemoryBroker +from fastapi import FastAPI broker = InMemoryBroker() - app = FastAPI() taskiq_fastapi.init(broker, "test_script:app") taskiq_fastapi.populate_dependency_context(broker, app) ``` + +--- + +## Deployment with Docker (Single Artifact Pattern) + +The best way to deploy this system is to use the Single Artifact pattern. 
You build one Docker image that contains your entire codebase, and you run it with different commands to start the API or the Worker. + +### 1. Dockerfile + +This Dockerfile copies your code and installs dependencies. It does not specify a `CMD` because we will set that in `docker-compose.yml`. + +```dockerfile +FROM python:3.12-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . +``` + +### 2. docker-compose.yml + +Notice how the `api` and `worker` services use the same `build: .` context but run different commands. The NATS service and `NATS_URL` below are shown only as an example; the sample code above uses `ZeroMQBroker`, so adjust the broker service and connection settings to match your actual setup. + +```yaml +version: "3.8" + +services: + api: + build: . + container_name: fastapi_app + command: uvicorn test_script:app --host 0.0.0.0 --port 8000 + ports: + - "8000:8000" + environment: + - NATS_URL=nats://nats:4222 + depends_on: + - nats + + worker: + build: . + container_name: taskiq_worker + command: taskiq worker test_script:broker + environment: + - NATS_URL=nats://nats:4222 + depends_on: + - nats + + nats: + image: nats:latest + ports: + - "4222:4222" +``` + +### Run the stack + +```bash +docker-compose up --build +``` + +You may also run the containers individually using `docker run` or integrate the image into a Kubernetes environment. + +--- From 9b175896e7495d090fcdf4946f1e3f5e8754cb78 Mon Sep 17 00:00:00 2001 From: Srikanth Date: Thu, 20 Nov 2025 19:14:51 +0530 Subject: [PATCH 2/2] Re-add important comments Added back important notes regarding worker processes and TaskiqDepends usage. --- README.md | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index becea43..fcad595 100644 --- a/README.md +++ b/README.md @@ -39,13 +39,23 @@ broker = ZeroMQBroker() async def lifespan(app: FastAPI): print("Creating redis pool") app.state.redis_pool = ConnectionPool.from_url("redis://localhost") - + ##################### + # IMPORTANT NOTE # + ##################### + # If you don't check that this is not + # a worker process, you'll + # create an infinite recursion. 
Because in worker processes + # fastapi startup will be called. if not broker.is_worker_process: print("Starting broker client") await broker.startup() - yield + yield # waits for shutdown + ##################### + # IMPORTANT NOTE # + ##################### + # Same as above if not broker.is_worker_process: print("Shutting down broker client") await broker.shutdown() @@ -55,8 +65,11 @@ async def lifespan(app: FastAPI): app = FastAPI(lifespan=lifespan) +# Here we call our magic function. taskiq_fastapi.init(broker, "test_script:app") +# We use TaskiqDepends here, because if we use FastAPIDepends fastapi +# initialization will fail. def get_redis_pool(request: Request = TaskiqDepends()) -> ConnectionPool: return request.app.state.redis_pool @@ -64,6 +77,9 @@ def get_redis_pool(request: Request = TaskiqDepends()) -> ConnectionPool: async def my_redis_task( key: str, val: str, + # Here we depend using TaskiqDepends. + # Please use TaskiqDepends for all tasks to be resolved correctly. + # Or dependencies won't be injected. pool: ConnectionPool = TaskiqDepends(get_redis_pool), ): async with Redis(connection_pool=pool) as redis: