-
Notifications
You must be signed in to change notification settings - Fork 3
[codex] Fix API concurrency by moving sync DB routes off the event loop #652
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
6b75324
8f0f313
b2941d6
e13b3f5
804e054
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -88,7 +88,7 @@ jobs: | |||||||||||||
| run: | | ||||||||||||||
| export MAX_INSTANCES="10" | ||||||||||||||
| export SERVICE_NAME="ocotillo-api-staging" | ||||||||||||||
| export ENTRYPOINT="gunicorn -w 1 -k uvicorn.workers.UvicornWorker main:app" | ||||||||||||||
| export ENTRYPOINT="gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app" | ||||||||||||||
| export MIN_INSTANCES="0" | ||||||||||||||
|
Comment on lines
+91
to
92
|
||||||||||||||
| export ENTRYPOINT="gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app" | |
| export MIN_INSTANCES="0" | |
| export ENTRYPOINT="gunicorn -w 2 -k uvicorn.workers.UvicornWorker main:app" | |
| export MIN_INSTANCES="0" | |
| export DB_POOL_SIZE="2" | |
| export DB_MAX_OVERFLOW="0" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,154 @@ | ||
| name: CD (testing) | ||
|
|
||
| on: | ||
| push: | ||
|
|
||
| permissions: | ||
| contents: write | ||
|
|
||
| jobs: | ||
| staging-deploy: | ||
|
|
||
| runs-on: ubuntu-latest | ||
| environment: staging | ||
|
|
||
| steps: | ||
| - name: Check out source repository | ||
| uses: actions/checkout@v6.0.2 | ||
| with: | ||
| fetch-depth: 0 | ||
|
|
||
| - name: Install uv in container | ||
| uses: astral-sh/setup-uv@v8.0.0 | ||
| with: | ||
| version: "latest" | ||
|
|
||
| - name: Generate requirements.txt | ||
| run: | | ||
| uv export \ | ||
| --format requirements-txt \ | ||
| --no-emit-project \ | ||
| --no-dev \ | ||
| --output-file requirements.txt | ||
|
|
||
| - name: Authenticate to Google Cloud | ||
| uses: 'google-github-actions/auth@v3' | ||
| with: | ||
| credentials_json: ${{ secrets.CLOUD_DEPLOY_SERVICE_ACCOUNT_KEY }} | ||
|
|
||
| - name: Run Alembic migrations on staging database | ||
| env: | ||
| DB_DRIVER: "cloudsql" | ||
| CLOUD_SQL_INSTANCE_NAME: "${{ secrets.CLOUD_SQL_INSTANCE_NAME }}" | ||
| CLOUD_SQL_DATABASE: "${{ vars.CLOUD_SQL_DATABASE }}" | ||
| CLOUD_SQL_USER: "${{ secrets.CLOUD_SQL_USER }}" | ||
| CLOUD_SQL_IAM_AUTH: true | ||
| run: | | ||
| uv run alembic upgrade head | ||
|
|
||
| - name: Refresh materialized views on staging database | ||
| env: | ||
| DB_DRIVER: "cloudsql" | ||
| CLOUD_SQL_INSTANCE_NAME: "${{ secrets.CLOUD_SQL_INSTANCE_NAME }}" | ||
| CLOUD_SQL_DATABASE: "${{ vars.CLOUD_SQL_DATABASE }}" | ||
| CLOUD_SQL_USER: "${{ secrets.CLOUD_SQL_USER }}" | ||
| CLOUD_SQL_IAM_AUTH: true | ||
| run: | | ||
| uv run python -m cli.cli refresh-pygeoapi-materialized-views | ||
|
|
||
| - name: Ensure envsubst is available | ||
| run: | | ||
| if ! command -v envsubst >/dev/null 2>&1; then | ||
| sudo apt-get update | ||
| sudo apt-get install -y gettext-base | ||
| fi | ||
|
|
||
| - name: Render App Engine configs | ||
| env: | ||
| ENVIRONMENT: "staging" | ||
| CLOUD_SQL_INSTANCE_NAME: "${{ secrets.CLOUD_SQL_INSTANCE_NAME }}" | ||
| CLOUD_SQL_DATABASE: "${{ vars.CLOUD_SQL_DATABASE }}" | ||
| CLOUD_SQL_USER: "${{ secrets.CLOUD_SQL_USER }}" | ||
| PYGEOAPI_POSTGRES_DB: "${{ vars.CLOUD_SQL_DATABASE }}" | ||
| PYGEOAPI_POSTGRES_USER: "${{ secrets.PYGEOAPI_POSTGRES_USER }}" | ||
| PYGEOAPI_POSTGRES_HOST: "${{ vars.PYGEOAPI_POSTGRES_HOST || '127.0.0.1' }}" | ||
| PYGEOAPI_POSTGRES_PORT: "${{ vars.PYGEOAPI_POSTGRES_PORT || '5432' }}" | ||
| PYGEOAPI_POSTGRES_PASSWORD: "${{ secrets.PYGEOAPI_POSTGRES_PASSWORD }}" | ||
| PYGEOAPI_SERVER_URL: "${{ vars.PYGEOAPI_SERVER_URL }}" | ||
| CLOUD_SQL_IAM_AUTH: "true" | ||
| GCS_SERVICE_ACCOUNT_KEY: "${{ secrets.GCS_SERVICE_ACCOUNT_KEY }}" | ||
| GCS_BUCKET_NAME: "${{ vars.GCS_BUCKET_NAME }}" | ||
| AUTHENTIK_URL: "${{ vars.AUTHENTIK_URL }}" | ||
| AUTHENTIK_CLIENT_ID: "${{ vars.AUTHENTIK_CLIENT_ID }}" | ||
| AUTHENTIK_AUTHORIZE_URL: "${{ vars.AUTHENTIK_AUTHORIZE_URL }}" | ||
| AUTHENTIK_TOKEN_URL: "${{ vars.AUTHENTIK_TOKEN_URL }}" | ||
| SESSION_SECRET_KEY: "${{ secrets.SESSION_SECRET_KEY }}" | ||
| APITALLY_CLIENT_ID: "${{ vars.APITALLY_CLIENT_ID }}" | ||
| run: | | ||
| export MAX_INSTANCES="10" | ||
| export SERVICE_NAME="ocotillo-api-staging" | ||
| export ENTRYPOINT="gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app" | ||
| export MIN_INSTANCES="0" | ||
| envsubst < .github/app.template.yaml > app.yaml | ||
|
|
||
| - name: Deploy to Google Cloud | ||
| run: | | ||
| gcloud app deploy \ | ||
| app.yaml \ | ||
| --quiet \ | ||
| --project ${{ vars.GCP_PROJECT_ID }} | ||
|
|
||
| - name: Clean up oldest versions | ||
| run: | | ||
| SERVICE="ocotillo-api-testing" | ||
| VERSIONS_JSON="$(gcloud app versions list --service="$SERVICE" --project=${{ vars.GCP_PROJECT_ID }} --format=json --sort-by="version.createTime" 2>/dev/null || printf '[]')" | ||
| export VERSIONS_JSON | ||
| DELETE_VERSION="$(python - <<'PY' | ||
| import json | ||
| import os | ||
|
|
||
| versions = json.loads(os.environ.get("VERSIONS_JSON", "[]") or "[]") | ||
| if len(versions) <= 1: | ||
| print("") | ||
| raise SystemExit(0) | ||
|
|
||
| def traffic_split(version): | ||
| for key in ("traffic_split", "trafficSplit"): | ||
| value = version.get(key) | ||
| if value is not None: | ||
| try: | ||
| return float(value) | ||
| except (TypeError, ValueError): | ||
| return 0.0 | ||
| return 0.0 | ||
|
|
||
| for version in versions: | ||
| if traffic_split(version) == 0.0: | ||
| print(version.get("id", "")) | ||
| break | ||
| else: | ||
| print("") | ||
| PY | ||
| )" | ||
| if [ -n "$DELETE_VERSION" ]; then | ||
| echo "Deleting old non-serving version for $SERVICE: $DELETE_VERSION" | ||
| gcloud app versions delete "$DELETE_VERSION" --service="$SERVICE" --project=${{ vars.GCP_PROJECT_ID }} --quiet | ||
| else | ||
| echo "No old non-serving versions to delete for $SERVICE" | ||
| fi | ||
|
|
||
| - name: Remove rendered configs | ||
| run: | | ||
| rm app.yaml | ||
|
|
||
| # Use PR author's username as git user name | ||
| - name: Set up git user | ||
| run: | | ||
| git config --global user.name "${{ github.actor }}" | ||
| git config --global user.email "${{ github.actor }}@users.noreply.github.com" | ||
|
|
||
| # ":" are not alloed in git tags, so replace with "-" | ||
| - name: Tag commit | ||
| run: | | ||
| git tag -a "staging-deploy-$(date -u +%Y-%m-%d)T$(date -u +%H-%M-%S%z)" -m "staging gcloud deployment: $(date -u +%Y-%m-%d)T$(date -u +%H:%M:%S%z)" | ||
| git push origin --tags |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,119 @@ | ||
| # ADR2: API Concurrency Fix Strategy | ||
|
|
||
| ## Summary | ||
|
|
||
| This document describes a verified FastAPI concurrency issue in the API stack and recommends a two-phase remediation plan for maintainers. | ||
|
|
||
| The API uses synchronous SQLAlchemy sessions backed by `psycopg2`. When those sessions are consumed from `async def` route handlers, blocking database work runs on the event loop thread if the handlers call synchronous ORM helpers directly. The lowest-risk immediate fix is to convert database-bound route handlers that do not perform asynchronous work into plain `def`. The longer-term fix is to introduce a real async SQLAlchemy stack and migrate the affected handlers and helpers incrementally. | ||
|
|
||
| ## Problem | ||
|
|
||
| FastAPI supports synchronous generator dependencies such as `get_db_session()`. The issue is not the dependency shape itself. The issue is that the injected object is a synchronous SQLAlchemy `Session`, and any `async def` route that consumes it while executing synchronous ORM queries directly will block the event loop thread. | ||
|
|
||
| In this configuration, FastAPI runs the `async def` route body on the event loop thread. If that body performs blocking database I/O through the synchronous session, the worker cannot make progress on other requests assigned to that event loop until the database call returns. A slow well query can therefore delay unrelated lightweight requests handled by the same worker. | ||
|
|
||
| This is a concurrency problem, not a correctness problem. The endpoints can still return correct data while reducing throughput and responsiveness under load. | ||
|
|
||
| ## Evidence In This Repo | ||
|
|
||
| - [`db/engine.py`](db/engine.py) creates `database_sessionmaker = sessionmaker(engine, expire_on_commit=False)` and `get_db_session()` yields a regular synchronous `Session`. | ||
| - [`db/engine.py`](db/engine.py) builds `postgresql+psycopg2` engines for both the default PostgreSQL path and the Cloud SQL path, confirming that the active database layer is synchronous. | ||
| - [`core/dependencies.py`](core/dependencies.py) injects that session through `session_dependency`. | ||
| - [`services/well_details_helper.py`](services/well_details_helper.py) performs synchronous ORM operations such as `session.scalars(...).all()` and related query chains. | ||
| - [`api/thing.py`](api/thing.py) contains representative database-backed routes that pass the synchronous session into helper functions such as `get_db_things(...)` and `get_well_details_payload(...)`. | ||
| - [`api/asset.py`](api/asset.py) shows a contrasting safe pattern for non-database blocking work by wrapping synchronous GCS calls in `run_in_threadpool(...)`. | ||
| - The short-term fix described in this ADR converts database-bound routes from `async def` to `def` where they do not need `await`, but the helper/query layer remains synchronous until a real async session stack is introduced. | ||
|
|
||
| ## Short-Term Fix | ||
|
|
||
| The short-term fix is to convert database-bound route handlers from `async def` to `def` when they do not actually perform asynchronous work. | ||
|
|
||
| This lets FastAPI offload the entire route function to a worker thread instead of running its synchronous database calls on the event loop thread. It does not require changing the current database engine, dependency, query helpers, or response schemas. | ||
|
|
||
| ### Short-term implementation guidance | ||
|
|
||
| - Convert any route handler that: | ||
| - receives `session: session_dependency`, | ||
| - performs synchronous ORM work directly or through helpers, and | ||
| - does not require `await` for other operations in the route body. | ||
| - Prioritize the highest-value endpoints first: | ||
| - high-traffic list and detail endpoints, | ||
| - endpoints known to run expensive joins or eager-loads, | ||
| - endpoints that affect warmup or perceived application responsiveness. | ||
| - Keep route behavior unchanged: | ||
| - do not change paths, status codes, payloads, or auth dependencies as part of this phase. | ||
| - Avoid mixed patterns: | ||
| - do not leave a route as `async def` if it still calls synchronous SQLAlchemy code directly. | ||
| - Use `run_in_threadpool(...)` only when a route must remain `async def` for a separate reason, such as mixing in another async operation, and only for isolated blocking helpers rather than as a blanket wrapper for all DB access. | ||
|
|
||
| ### Expected impact | ||
|
|
||
| - Lower risk than a full async migration. | ||
| - No intended HTTP contract changes. | ||
| - Better worker responsiveness because blocking DB work moves off the event loop thread. | ||
|
|
||
| ## Long-Term Fix | ||
|
|
||
| The long-term fix is to add a real async database stack and migrate selected API areas to it incrementally. | ||
|
|
||
| This phase should introduce an explicit async path rather than trying to reuse the current synchronous dependency. Importing async SQLAlchemy primitives is not enough; the repo needs a working async engine, async sessionmaker, async dependency, and async query/helper layer for migrated endpoints. | ||
|
|
||
| ### Long-term target architecture | ||
|
|
||
| - Add an `AsyncEngine` configured for the intended async driver. | ||
| - Add an `async_sessionmaker` that yields `AsyncSession` instances. | ||
| - Add a dedicated async dependency such as `get_async_db_session()` rather than overloading `get_db_session()`. | ||
| - Update migrated handlers and helper functions to use async database access: | ||
| - `await session.execute(...)` | ||
| - `await session.scalars(...)` | ||
| - other `AsyncSession`-compatible patterns as needed | ||
|
|
||
| ### Long-term migration guidance | ||
|
|
||
| - Migrate by subsystem, not all at once. | ||
| - Start with a bounded route/helper cluster where the query patterns are understood. | ||
| - Keep sync and async paths separate during migration to avoid ambiguous dependencies and accidental sync calls from async routes. | ||
| - Treat helper-layer migration as part of the work. Converting route signatures alone is insufficient if the helper functions still expect synchronous sessions. | ||
|
|
||
| ### Non-goals and cautions | ||
|
|
||
| - Do not claim the repo already has a working async DB session path unless one is actually implemented and used. | ||
| - Do not treat “switch everything to async” as a trivial refactor. | ||
| - Do not mix `AsyncSession` route code with synchronous helper/query internals. | ||
|
|
||
| ## Recommended Path | ||
|
|
||
| The recommended order is: | ||
|
|
||
| 1. Convert database-bound `async def` routes that do not use `await` into plain `def`. | ||
| 2. Validate behavior and measure the effect on responsiveness. | ||
| 3. Introduce a dedicated async DB stack. | ||
| 4. Migrate selected route/helper subsystems incrementally to `AsyncSession`. | ||
|
|
||
| This sequence delivers immediate concurrency improvement with limited risk, while preserving a clear path to a full async architecture later. | ||
|
|
||
| ## Acceptance Criteria | ||
|
|
||
| ### Short-term acceptance criteria | ||
|
|
||
| - Targeted API tests continue to pass after `async def` to `def` conversions. | ||
| - HTTP behavior is unchanged: | ||
| - same routes, | ||
| - same auth requirements, | ||
| - same status codes, | ||
| - same payload shapes. | ||
| - Concurrency smoke checks or request-timing instrumentation show that DB-heavy requests no longer block the event loop thread for that worker in the same way they do today. | ||
|
|
||
| ### Long-term acceptance criteria | ||
|
|
||
| - Migrated endpoints pass the existing API test coverage for their subsystem. | ||
| - The async session lifecycle is correct for successful and failing requests. | ||
| - Migrated `async def` routes do not call synchronous session helpers. | ||
| - Before/after measurements are captured for latency and concurrency so the migration can be evaluated against real behavior rather than assumptions. | ||
|
|
||
| ## Defaults And Assumptions | ||
|
|
||
| - This document is written for maintainers and assumes familiarity with FastAPI and SQLAlchemy internals. | ||
| - The document is self-contained and does not require code changes to be useful. | ||
| - The recommended short-term action is intentionally conservative and does not prescribe a file-by-file rollout sequence. | ||
| - The recommended long-term action is a staged migration, not a flag-day rewrite. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bumping Gunicorn to
-w 4increases process count and therefore multiplies DB connection pool usage (each worker has its own SQLAlchemy pool) and overall memory footprint. GivenDB_POOL_SIZE/DB_MAX_OVERFLOWare also being introduced, please verify the resulting worst-case connection count stays within Cloud SQL limits for the chosen instance tier, and consider making worker count/pool settings explicitly coordinated (or configurable per env) to avoid connection storms under load.