feat: implement API concurrency fix strategy by converting async rout…#655
Conversation
…e handlers to sync and enhancing error handling
There was a problem hiding this comment.
Pull request overview
This PR aims to mitigate a FastAPI concurrency issue caused by running synchronous SQLAlchemy ORM work inside async def route handlers, by converting DB-bound routes to synchronous def handlers and hardening some database error handling. It also adjusts deployment workflows to run with more Gunicorn workers and adds an ADR documenting the strategy.
Changes:
- Convert many API endpoints from
async deftodefso synchronous DB work runs off the event loop. - Broaden DB exception handling (e.g., handle
IntegrityErrorin addition toProgrammingError) and make error-message extraction driver-tolerant. - Add an ADR describing the concurrency strategy and add/adjust CD workflows (including worker-count changes).
Reviewed changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/test_nma_chemistry_lineage.py | Updates test comments around NOT NULL violation error surfaces. |
| tests/integration/test_nma_legacy_relationships.py | Updates a legacy test fixture value and adjusts driver-agnostic error comment. |
| api/thing.py | Converts routes to sync def; expands DB error handler and caught exception types. |
| api/sensor.py | Converts CRUD-style sensor endpoints to sync def. |
| api/search.py | Converts search endpoint to sync def. |
| api/sample.py | Converts sample endpoints to sync def; expands DB error handling to include IntegrityError. |
| api/publication.py | Converts publication create endpoint to sync def. |
| api/observation.py | Converts most observation endpoints to sync def (bulk upload remains async). |
| api/ngwmn.py | Converts NGWMN XML endpoints to sync def. |
| api/location.py | Converts location endpoints to sync def. |
| api/lexicon.py | Converts lexicon endpoints to sync def (many already disabled/deprecated). |
| api/group.py | Converts group endpoints to sync def. |
| api/geospatial.py | Converts geospatial endpoints to sync def. |
| api/geochronology.py | Converts geochronology endpoints to sync def. |
| api/contact.py | Converts contact endpoints to sync def; expands DB error handling to include IntegrityError. |
| api/author.py | Converts author publications endpoint to sync def. |
| ADR2.md | Adds ADR documenting the concurrency issue and two-phase remediation plan. |
| .github/workflows/CD_testing.yml | Adds a new CD workflow for testing deploys. |
| .github/workflows/CD_staging.yml | Increases Gunicorn worker count in staging deploy entrypoint. |
| .github/workflows/CD_production.yml | Increases Gunicorn worker count in production deploy entrypoint. |
Comments suppressed due to low confidence (3)
api/thing.py:335
- This module defines
get_thing_id_linksmultiple times (same Python function name for different routes). FastAPI uses the function name to deriveoperationId; duplicate names can lead to OpenAPI collisions/overwrites and client generation issues. Rename these handlers to unique, descriptive names (e.g., list vs get-by-id vs list-by-thing).
@router.get(
"/id-link",
summary="Get all thing links",
)
def get_thing_id_links(
user: viewer_dependency,
session: session_dependency,
filter_: str = Query(alias="filter", default=None),
sort: str = None,
order: str = None,
) -> CustomPage[ThingIdLinkResponse]:
"""
Retrieve all thing links, optionally filtered and sorted.
"""
sql = select(ThingIdLink)
sql = order_sort_filter(sql, ThingIdLink, sort=sort, order=order, filter_=filter_)
return paginate(query=sql, conn=session)
api/thing.py:347
- This route handler reuses the same Python name
get_thing_id_linksas another endpoint above. Even though the decorator registers routes at definition time, duplicate function names can produce duplicateoperationIds in the generated OpenAPI schema. Use a unique function name for this specific route (e.g.,get_thing_id_link_by_id).
@public_route
@router.get("/id-link/{link_id}", summary="Get thing links by link ID")
def get_thing_id_links(
user: viewer_dependency,
link_id: int,
session: session_dependency,
) -> ThingIdLinkResponse:
"""
Retrieve all links for a specific thing by its ID.
"""
return simple_get_by_id(session, ThingIdLink, link_id)
api/thing.py:405
- This endpoint is also named
get_thing_id_links, duplicating other handlers in the same module. Duplicate handler names can cause OpenAPIoperationIdconflicts. Rename to a unique name that reflects the behavior (e.g.,list_thing_id_links_for_thing).
@router.get("/{thing_id}/id-link", summary="Get thing links by thing ID")
def get_thing_id_links(
user: viewer_dependency,
thing_id: int,
session: session_dependency,
) -> CustomPage[ThingIdLinkResponse]:
"""
Retrieve all links for a specific thing by its ID.
"""
thing = simple_get_by_id(session, Thing, thing_id)
sql = select(ThingIdLink).where(ThingIdLink.thing_id == thing.id)
return paginate(query=sql, conn=session)
| export MAX_INSTANCES="10" | ||
| export SERVICE_NAME="ocotillo-api-staging" | ||
| export ENTRYPOINT="gunicorn -w 1 -k uvicorn.workers.UvicornWorker main:app" | ||
| export ENTRYPOINT="gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app" | ||
| export MIN_INSTANCES="0" |
There was a problem hiding this comment.
Bumping Gunicorn workers from 1 to 4 increases process count and typically multiplies DB connections/pool usage (each worker has its own SQLAlchemy engine/pool) and memory. Consider making the worker count configurable via an env var (with a safe default) and verify Cloud SQL connection limits/pool sizing are compatible with the increased concurrency.
| export MAX_INSTANCES="10" | ||
| export SERVICE_NAME="ocotillo-api" | ||
| export ENTRYPOINT="gunicorn -w 1 -k uvicorn.workers.UvicornWorker main:app" | ||
| export ENTRYPOINT="gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app" | ||
| export MIN_INSTANCES="0" |
There was a problem hiding this comment.
Bumping Gunicorn workers from 1 to 4 increases process count and can significantly raise DB connection usage (each worker typically maintains its own pool) and memory footprint. Consider making the worker count configurable and confirm Cloud SQL connection limits/pool settings are aligned with this change before rolling out to production.
| else: | ||
| detail = { | ||
| "loc": ["body"], | ||
| "msg": error_message, | ||
| "type": "value_error", | ||
| "input": {}, | ||
| } |
There was a problem hiding this comment.
database_error_handler falls back to returning the raw DB/driver error string to clients (the else branch). For IntegrityError this often includes SQL fragments and bound parameters, which can leak implementation details and potentially sensitive data. Consider returning a generic message (and logging the full exception server-side) or mapping common constraint types to safe, user-facing messages instead of echoing error_message verbatim.
| else: | ||
| detail = { | ||
| "loc": ["body"], | ||
| "msg": error_message, | ||
| "type": "value_error", | ||
| "input": {}, | ||
| } |
There was a problem hiding this comment.
database_error_handler falls back to returning the raw DB/driver error string to clients. For IntegrityError, the string frequently includes SQL and parameters; consider returning a generic message (and logging the full exception) rather than exposing internal DB error text in the HTTP response.
| else: | ||
| detail = { | ||
| "loc": ["body"], | ||
| "msg": error_message, | ||
| "type": "value_error", | ||
| "input": {}, | ||
| } |
There was a problem hiding this comment.
database_error_handler falls back to returning the raw DB/driver error string to clients. This may leak SQL details/parameters for IntegrityError and is generally unsafe. Prefer a generic message and log the original exception, or map known constraint names to safe, user-facing validation errors.
| @router.get("/age", tags=["geochronology"]) | ||
| async def get_geochronology_age( | ||
| def get_geochronology_age( | ||
| user: viewer_dependency, session: session_dependency, method: str = "arar" | ||
| ): |
There was a problem hiding this comment.
Inside this handler the query result is returned via session.scalar(sql).all(), but Session.scalar() returns a single scalar value (not a result collection), so .all() will fail at runtime. Use session.scalars(sql).all() (or session.execute(sql).scalars().all()) if you intend to return all rows.
|
|
||
| The API uses synchronous SQLAlchemy sessions backed by `psycopg`. When those sessions are consumed from `async def` route handlers, blocking database work runs on the event loop thread if the handlers call synchronous ORM helpers directly. The lowest-risk immediate fix is to convert database-bound route handlers that do not perform asynchronous work into plain `def`. The longer-term fix is to introduce a real async SQLAlchemy stack and migrate the affected handlers and helpers incrementally. | ||
|
|
There was a problem hiding this comment.
ADR2 states the API uses synchronous SQLAlchemy sessions backed by psycopg, but db/engine.py constructs postgresql+pg8000:// URLs for both Cloud SQL and local Postgres. To avoid misleading maintainers, update the ADR to reflect the actual driver(s) in use (or explain when/where psycopg is used).
| git tag -a "testing-deploy-$(date -u +%Y-%m-%d)T$(date -u +%H-%M-%S%z)" -m "testing gcloud deployment: $ | ||
| (date | ||
| -u +%Y-%m-%d)T$(date -u +%H:%M:%S%z)" |
There was a problem hiding this comment.
The tag message construction in this step appears syntactically broken: the -m string contains a literal $ followed by a newline, and the date command is split across lines. This will likely cause the shell to error and prevent tags from being created/pushed. Build the message on a single line (or use a heredoc/variable) so the git tag -m argument is valid.
| git tag -a "testing-deploy-$(date -u +%Y-%m-%d)T$(date -u +%H-%M-%S%z)" -m "testing gcloud deployment: $ | |
| (date | |
| -u +%Y-%m-%d)T$(date -u +%H:%M:%S%z)" | |
| git tag -a "testing-deploy-$(date -u +%Y-%m-%d)T$(date -u +%H-%M-%S%z)" -m "testing gcloud deployment: $(date -u +%Y-%m-%d)T$(date -u +%H:%M:%S%z)" |
| git config --global user.name "${{ github.actor }}" | ||
| git config --global user.email "${{ github.actor }}@users.noreply.github.com" | ||
|
|
||
| # ":" are not alloed in git tags, so replace with "-" |
There was a problem hiding this comment.
Spelling: "alloed" -> "allowed" in this comment.
| # ":" are not alloed in git tags, so replace with "-" | |
| # ":" are not allowed in git tags, so replace with "-" |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 7c053eb64e
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| - name: Run Alembic migrations on staging database | ||
| env: | ||
| DB_DRIVER: "cloudsql" | ||
| CLOUD_SQL_INSTANCE_NAME: "${{ secrets.CLOUD_SQL_INSTANCE_NAME }}" | ||
| CLOUD_SQL_DATABASE: "${{ vars.CLOUD_SQL_DATABASE }}" |
There was a problem hiding this comment.
Isolate testing deploy from staging schema migrations
This workflow runs on every jir* branch push but still executes uv run alembic upgrade head with staging credentials (environment: staging and CLOUD_SQL_* values), so an unmerged feature branch can apply schema changes to the shared staging database. In branches that contain migrations, this can break staging for other users before code review/merge; this job should target an isolated testing database/environment or skip migrations.
Useful? React with 👍 / 👎.
…e handlers to sync and enhancing error handling
Why
This PR addresses the following problem / context:
How
Implementation summary - the following was changed / added / removed:
Notes
Any special considerations, workarounds, or follow-up work to note?