[codex] Fix API concurrency by moving sync DB routes off the event loop #652
jirhiker wants to merge 5 commits
…e handlers to synchronous and optimizing database connection pooling
…r sample and thing operations
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: b2941d6eda
    orig = getattr(error, "orig", None)
    if hasattr(orig, "args") and orig.args and isinstance(orig.args[0], dict):
        error_message = orig.args[0].get("M", "")
    else:
        error_message = str(orig or error)
Match contact DB errors without exact full-message equality
The fallback now sets error_message = str(orig or error), but the checks below still rely on exact string equality. With psycopg2/IntegrityError, error strings commonly include DETAIL/extra text, so none of those branches match and detail remains unset, leading to an UnboundLocalError (500) instead of the intended 409 response for invalid contact_type/FK inputs. Use constraint-name substring matching (or a safe default branch) before raising.
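One way to read this suggestion is sketched below. It is a minimal illustration, not the repository's handler: the constraint names and response messages in `CONSTRAINT_MESSAGES` are hypothetical, and `classify_db_error` is an invented helper name. The message extraction mirrors the snippet under review.

```python
# Hypothetical constraint-name fragments mapped to client-facing messages.
# These names are illustrative only; they are not taken from the repository.
CONSTRAINT_MESSAGES = {
    "contact_contact_type_fkey": "Invalid contact_type",
    "contact_thing_id_fkey": "Referenced thing does not exist",
}


def extract_error_message(error: Exception) -> str:
    """Mirror the reviewed snippet: prefer the driver's 'M' field, else str()."""
    orig = getattr(error, "orig", None)
    args = getattr(orig, "args", ())
    if args and isinstance(args[0], dict):
        return args[0].get("M", "")
    return str(orig or error)


def classify_db_error(error: Exception) -> str:
    """Pick a message by constraint-name substring, with a generic fallback."""
    message = extract_error_message(error)
    for fragment, detail in CONSTRAINT_MESSAGES.items():
        if fragment in message:
            return detail
    # Safe default: the caller always receives a value to build a response from.
    return f"Database error: {message}"
```

Because the match is a substring test, trailing `DETAIL:` text appended by the driver no longer prevents a branch from matching.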
Pull request overview
This PR addresses a verified FastAPI concurrency issue caused by synchronous SQLAlchemy Session usage inside async def route handlers by converting DB-backed handlers to synchronous def so FastAPI runs them in its threadpool, and documents the approach in an ADR.
Changes:
- Converted many synchronous DB-backed `async def` API endpoints to `def` to avoid blocking the event loop.
- Introduced `normalize_for_db(...)` and applied it to multiple write/patch helpers to unwrap `Enum` values (and other nested payload structures) before ORM usage.
- Updated DB driver/pooling/deploy settings (psycopg2, pool env vars, Gunicorn worker count) to support the new concurrency posture.
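The effect of the first change can be sketched with the standard library alone. This is an illustration of the principle, not code from the PR: `blocking_query` is a hypothetical stand-in for a synchronous ORM call, and `asyncio.to_thread` stands in for the threadpool FastAPI uses for plain `def` routes.

```python
import asyncio
import time


def blocking_query(seconds: float = 0.2) -> str:
    """Stand-in for a synchronous ORM call; time.sleep blocks the calling thread."""
    time.sleep(seconds)
    return "rows"


async def handler_blocking() -> str:
    # Like an `async def` route calling a sync Session: the event loop stalls.
    return blocking_query()


async def handler_threaded() -> str:
    # Like a plain `def` route: the blocking call runs in a worker thread.
    return await asyncio.to_thread(blocking_query)


async def timed(handler, n: int = 3) -> float:
    """Run n copies of a handler concurrently and return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(handler() for _ in range(n)))
    return time.perf_counter() - start


def compare() -> tuple[float, float]:
    """Return (blocking_elapsed, threaded_elapsed) for three concurrent calls."""
    return asyncio.run(timed(handler_blocking)), asyncio.run(timed(handler_threaded))
```

With three concurrent requests, the blocking variant serializes the sleeps while the threaded variant overlaps them, which is the behavior the route conversion is meant to recover.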
Reviewed changes
Copilot reviewed 28 out of 29 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| uv.lock | Adds psycopg2 to the locked environment alongside existing DB drivers. |
| tests/integration/test_nma_legacy_relationships.py | Adjusts a test fixture value in legacy relationship tests. |
| services/thing_helper.py | Normalizes payloads for DB writes and improves well screen creation validation. |
| services/publication_helper.py | Normalizes publication payloads before DB insert logic. |
| services/payload_helper.py | Adds `normalize_for_db(...)` helper for recursive payload normalization. |
| services/observation_helper.py | Normalizes patch payload values before applying updates. |
| services/crud_helper.py | Normalizes model dump output for add/patch helper paths. |
| services/contact_helper.py | Normalizes contact payloads and adds explicit Thing existence validation. |
| pyproject.toml | Adds `psycopg2>=2.9.11` dependency. |
| db/engine.py | Switches SQLAlchemy URL/Cloud SQL connector usage to psycopg2 and adds pool sizing via env vars. |
| api/thing.py | Broadly converts handlers to `def` and hardens DB error parsing/matching. |
| api/sensor.py | Converts DB-backed handlers to synchronous `def`. |
| api/search.py | Converts handler to synchronous `def`. |
| api/sample.py | Converts handlers to synchronous `def` and hardens DB error parsing. |
| api/publication.py | Converts handler to synchronous `def`. |
| api/observation.py | Converts DB-backed handlers to synchronous `def` (keeps truly async bulk upload). |
| api/ngwmn.py | Converts handlers to synchronous `def`. |
| api/location.py | Converts handlers to synchronous `def`. |
| api/lexicon.py | Converts handlers to synchronous `def`. |
| api/group.py | Converts handlers to synchronous `def`. |
| api/geospatial.py | Converts handlers to synchronous `def`. |
| api/geochronology.py | Converts handlers to synchronous `def`. |
| api/contact.py | Converts handlers to synchronous `def` and adds pre-validation for missing contacts. |
| api/author.py | Converts handler to synchronous `def`. |
| api/asset.py | Converts DB-backed handlers to synchronous `def` and normalizes asset payloads. |
| ADR2.md | Documents the concurrency problem and phased remediation plan. |
| .github/workflows/CD_staging.yml | Increases Gunicorn worker count for staging deploy entrypoint. |
| .github/workflows/CD_production.yml | Increases Gunicorn worker count for production deploy entrypoint. |
| .github/app.template.yaml | Adds DB pool sizing env vars for deployment. |
    @@ -79,7 +80,11 @@ def database_error_handler(payload: CreateAsset, error: ProgrammingError) -> Non
        Handle errors raised by the database when adding or updating a asset.
database_error_handler is typed as taking only ProgrammingError, but it’s called from an except (ProgrammingError, IntegrityError) block below. Please widen the handler’s parameter type (and any internal assumptions) to accept IntegrityError too, so the signature matches actual usage.
- [`db/engine.py`](/Users/jross/Programming/DIG/OcotilloAPI/db/engine.py:186) creates `database_sessionmaker = sessionmaker(engine, expire_on_commit=False)` and [`get_db_session()`](/Users/jross/Programming/DIG/OcotilloAPI/db/engine.py:189) yields a regular synchronous `Session`.
- [`db/engine.py`](/Users/jross/Programming/DIG/OcotilloAPI/db/engine.py:172) builds a `postgresql+psycopg2://...` URL for the default engine, confirming that the active database path is synchronous.
- [`core/dependencies.py`](/Users/jross/Programming/DIG/OcotilloAPI/core/dependencies.py:24) injects that session through `session_dependency`.
- [`api/thing.py`](/Users/jross/Programming/DIG/OcotilloAPI/api/thing.py:153) defines representative `async def` route handlers such as `get_water_wells(...)` that pass the synchronous session into helper functions without awaiting database work.
- [`services/well_details_helper.py`](/Users/jross/Programming/DIG/OcotilloAPI/services/well_details_helper.py:50) performs synchronous ORM operations such as `session.scalars(...).all()` and related query chains.
- [`api/asset.py`](/Users/jross/Programming/DIG/OcotilloAPI/api/asset.py:113) shows a contrasting safe pattern for non-database blocking work by wrapping synchronous GCS calls in `run_in_threadpool(...)`.
- A repo scan of `api/` found 114 `async def` handlers, 111 of which contain no `await`. That is strong evidence that most handlers are currently synchronous in practice even though they are declared async.
The “Evidence In This Repo” section uses absolute local filesystem paths (e.g. /Users/...) which won’t work for other contributors or on GitHub, and some bullets are already stale after this PR (e.g. it references async def handlers and counts that this PR changes to def). Please switch to repo-relative links/references (and adjust the wording to stay true after the route conversions).
- [`db/engine.py`](db/engine.py) creates `database_sessionmaker = sessionmaker(engine, expire_on_commit=False)` and `get_db_session()` yields a regular synchronous `Session`.
- [`db/engine.py`](db/engine.py) builds a `postgresql+psycopg2://...` URL for the default engine, confirming that the active database path is synchronous.
- [`core/dependencies.py`](core/dependencies.py) injects that session through `session_dependency`.
- [`api/thing.py`](api/thing.py) contains representative route handlers that pass the synchronous session into helper functions without awaiting database work in the route body.
- [`services/well_details_helper.py`](services/well_details_helper.py) performs synchronous ORM operations such as `session.scalars(...).all()` and related query chains.
- [`api/asset.py`](api/asset.py) shows a contrasting safe pattern for non-database blocking work by wrapping synchronous GCS calls in `run_in_threadpool(...)`.
- The [`api/`](api/) package contains multiple handlers that use synchronous database access patterns, which is the key risk described here; as routes are converted from `async def` to `def`, this ADR should rely on the file references above rather than hard-coded handler counts.
      export MAX_INSTANCES="10"
      export SERVICE_NAME="ocotillo-api"
    - export ENTRYPOINT="gunicorn -w 1 -k uvicorn.workers.UvicornWorker main:app"
    + export ENTRYPOINT="gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app"
      export MIN_INSTANCES="0"
Bumping Gunicorn to -w 4 increases process count and therefore multiplies DB connection pool usage (each worker has its own SQLAlchemy pool) and overall memory footprint. Given DB_POOL_SIZE/DB_MAX_OVERFLOW are also being introduced, please verify the resulting worst-case connection count stays within Cloud SQL limits for the chosen instance tier, and consider making worker count/pool settings explicitly coordinated (or configurable per env) to avoid connection storms under load.
    export ENTRYPOINT="gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app"
    export MIN_INSTANCES="0"
Bumping Gunicorn to -w 4 increases process count and therefore multiplies DB connection pool usage (each worker has its own SQLAlchemy pool) and overall memory footprint. With DB_POOL_SIZE/DB_MAX_OVERFLOW being introduced, please verify the resulting worst-case connection count stays within Cloud SQL limits for staging as well, and consider coordinating worker/pool sizing explicitly to avoid connection exhaustion under load.
    - export ENTRYPOINT="gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app"
    - export MIN_INSTANCES="0"
    + export ENTRYPOINT="gunicorn -w 2 -k uvicorn.workers.UvicornWorker main:app"
    + export MIN_INSTANCES="0"
    + export DB_POOL_SIZE="2"
    + export DB_MAX_OVERFLOW="0"
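The worst-case connection count the reviewer asks about can be estimated with simple arithmetic. The sketch below assumes one SQLAlchemy engine (and therefore one pool) per Gunicorn worker, and it assumes the `MAX_INSTANCES="10"` value from the production hunk also applies to the environment being sized; `worst_case_connections` is a hypothetical helper, not part of the PR.

```python
def worst_case_connections(
    instances: int, workers: int, pool_size: int, max_overflow: int
) -> int:
    """Upper bound on open DB connections, assuming one pool per worker process."""
    return instances * workers * (pool_size + max_overflow)


# Values from the suggested change above (2 workers, pool 2, overflow 0),
# with 10 instances assumed from MAX_INSTANCES="10".
suggested = worst_case_connections(instances=10, workers=2, pool_size=2, max_overflow=0)

# Illustrative comparison only: 4 workers with SQLAlchemy's QueuePool defaults
# (pool_size=5, max_overflow=10). The PR's actual env-var defaults are not shown.
unbounded = worst_case_connections(instances=10, workers=4, pool_size=5, max_overflow=10)

print(suggested, unbounded)  # 40 600
```

Comparing the result against the Cloud SQL instance's connection limit gives a quick check on whether a worker or pool change is safe.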
    def normalize_for_db(value):
        """
        Recursively convert Python/Pydantic payloads into DB-friendly primitives.

        Dates and datetimes are intentionally preserved as-is. Enum values are
        unwrapped to their underlying primitive values so psycopg2 never sees raw
        Enum objects.
        """
normalize_for_db is now a central piece of write-path behavior (it affects multiple helpers/endpoints). Please add focused unit tests for it (e.g., nested dict/list structures containing Enum values) to lock in behavior and prevent regressions as more payload shapes are added.
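A rough shape for such tests is sketched below. The PR's function body is not shown in this hunk, so the `normalize_for_db` here is a hypothetical stand-in written only from the docstring; the `Status` enum and the test names are likewise illustrative.

```python
from datetime import date, datetime
from enum import Enum


def normalize_for_db(value):
    """Hypothetical stand-in based on the reviewed docstring, not the PR's code."""
    if isinstance(value, Enum):
        return normalize_for_db(value.value)
    if isinstance(value, dict):
        return {key: normalize_for_db(item) for key, item in value.items()}
    if isinstance(value, list):
        return [normalize_for_db(item) for item in value]
    return value  # dates, datetimes, and other primitives pass through unchanged


class Status(Enum):  # illustrative enum, not from the repository
    ACTIVE = "active"


def test_unwraps_nested_enums():
    payload = {"status": Status.ACTIVE, "tags": [Status.ACTIVE, {"s": Status.ACTIVE}]}
    expected = {"status": "active", "tags": ["active", {"s": "active"}]}
    assert normalize_for_db(payload) == expected


def test_preserves_dates_and_datetimes():
    day = date(2024, 1, 2)
    stamp = datetime(2024, 1, 2, 3, 4, 5)
    assert normalize_for_db({"d": day, "t": stamp}) == {"d": day, "t": stamp}
```

In the real suite the tests would import the helper from `services/payload_helper.py` rather than redefine it.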
    def database_error_handler(
        payload: CreateEmail | CreateContact | CreatePhone, error: ProgrammingError
    ) -> None:
database_error_handler is now invoked with IntegrityError too (see the except (ProgrammingError, IntegrityError) blocks below), but the function signature still restricts error to ProgrammingError. Please widen the type annotation (and any related logic) to accept IntegrityError as well to match actual call sites.
    orig = getattr(error, "orig", None)
    if hasattr(orig, "args") and orig.args and isinstance(orig.args[0], dict):
        error_message = orig.args[0].get("M", "")
    else:
        error_message = str(orig or error)

    if (
        error_message
database_error_handler still has no default branch to set detail. If error_message doesn’t match any of the hard-coded cases, detail remains undefined and the later raise PydanticStyleException(..., detail=[detail]) will crash with UnboundLocalError. Add an else fallback that builds a generic detail payload using error_message (as done in other error handlers in this PR).
    orig = getattr(error, "orig", None)
    if hasattr(orig, "args") and orig.args and isinstance(orig.args[0], dict):
        error_message = orig.args[0].get("M", "")
    else:
        error_message = str(orig or error)

    if (
        error_message == 'null value in column "thing_id" of relation '
database_error_handler only sets detail for one specific message, but it always raises PydanticStyleException(detail=[detail]). For any other error message (including many IntegrityError cases now being caught), detail will be undefined and this handler will crash with UnboundLocalError. Add a default else that sets a generic detail payload (e.g., using error_message).
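The failure mode and the requested fix can be reproduced in isolation. In this sketch the function names, the `"known message"` placeholder, and the shape of the `detail` dict are all hypothetical; only the control flow mirrors the handler under review.

```python
def build_detail_unsafe(error_message: str) -> dict:
    """Control flow as reviewed: `detail` is bound on only one branch."""
    if error_message == "known message":  # placeholder for the hard-coded case
        detail = {"msg": "Known failure", "type": "value_error"}
    return detail  # raises UnboundLocalError when no branch matched


def build_detail_safe(error_message: str) -> dict:
    """Same logic with a default branch, so `detail` is always bound."""
    if error_message == "known message":
        detail = {"msg": "Known failure", "type": "value_error"}
    else:
        # Generic fallback built from the raw database message.
        detail = {"msg": error_message, "type": "database_error"}
    return detail
```

Calling `build_detail_unsafe("anything else")` raises `UnboundLocalError`, which is what surfaces as a 500 in the API; the safe variant returns a usable payload for every input.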
…tion and database migration steps
…mend remediation plan
Summary
- … `ADR2.md` documenting the verified FastAPI concurrency issue and the short- and long-term remediation plan
- … `async def` to `def`
- … `await` blocking work unchanged

Why
The API currently injects a synchronous SQLAlchemy `Session` backed by `psycopg2` into many `async def` endpoints. Those handlers execute synchronous ORM work directly, which blocks the event loop thread for the worker handling the request. The short-term fix is to let FastAPI run those handlers in its threadpool by declaring them as synchronous route functions.

What Changed
- … `async def` to `def`
- … `run_in_threadpool(...)`

Impact
…

Validation
- `black` on the touched API modules
- `flake8` on the touched API modules; it reported many pre-existing `E501` line-length issues and a few pre-existing `F811` redefinitions unrelated to this change
- `pytest tests/test_group.py tests/test_sample.py tests/test_publication.py tests/test_query.py tests/test_request_timing.py`
- `tests/test_request_timing.py` passed
- … `group`, `sample`, and `publication` around enum adaptation and author/publication fixture behavior that were not introduced by this route-signature change

Notes
- … `AsyncSession` migration as described in `ADR2.md`