Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions alembic/versions/b2c52ee8ff12_add_ingestion_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""Add ingestion status

Revision ID: b2c52ee8ff12
Revises: 9e9a4a7cd639
Create Date: 2026-05-11 16:16:03.768893

"""

from typing import Sequence, Union

import sqlalchemy as sa

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "b2c52ee8ff12"
down_revision: Union[str, Sequence[str], None] = "9e9a4a7cd639"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
"""Upgrade schema."""
with op.batch_alter_table("simulations", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"ingestion_status",
sa.Enum(
"QUEUED",
"COPYING",
"COPIED",
"VALIDATING",
"VALIDATED",
"COMPLETED",
"COPY_FAILED",
"VALIDATION_FAILED",
name="ingestionstatus",
),
nullable=False,
)
)
batch_op.add_column(
sa.Column("ingestion_version", sa.Integer(), nullable=False)
)


def downgrade() -> None:
"""Downgrade schema."""
with op.batch_alter_table("simulations", schema=None) as batch_op:
batch_op.drop_column("ingestion_version")
batch_op.drop_column("ingestion_status")
77 changes: 77 additions & 0 deletions docs/celery.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Celery async task processing

SimDB uses [Celery](https://docs.celeryproject.org/) to run asynchronous background
tasks such as copying simulation files and completing the ingestion pipeline.

## Overview

When simulations are uploaded via the REST API, the server offloads heavy operations
to Celery workers instead of blocking the HTTP request. Tasks are defined in
`src/simdb/workers/tasks.py`:

- `copy_files_task` — copies input/output files from source locations to the server's
upload folder and updates the simulation's ingestion status.
- `complete_ingestion_task` — marks a simulation as fully ingested.
- `validate_imas_task` — runs validation checks on IMAS data (placeholder).
- `send_email_task` — sends email notifications.

Tasks can be chained in the API endpoint:

```python
copy_files = copy_files_task.si(simulation.uuid, ...)
complete = complete_ingestion_task.si(simulation.uuid)
_ = (copy_files | complete).apply_async()
```

## Configuration

Celery is configured via `app.cfg`:

| Section | Option | Required | Description |
|---------|----------------|----------|--------------------------------------------------|
| celery | broker_url | no | Redis URL for the message broker. Defaults to `redis://localhost:6379/0` |
| celery | result_backend | no | Redis URL for results storage. Defaults to `redis://localhost:6379/0` |

Example:

```ini
[celery]
broker_url = redis://localhost:6379/0
result_backend = redis://localhost:6379/0
```

## Running workers

### Standalone worker

Start a Celery worker using the built-in CLI:

```bash
simdb_celery worker
```

### Worker with beat scheduler

For periodic tasks (e.g. cleanup, reports), run both the worker and beat:

```bash
# Terminal 1: worker
simdb_celery worker

# Terminal 2: beat scheduler
simdb_celery beat
```

### Flower monitoring

[Flower](https://flower.readthedocs.io/) provides a web UI for monitoring Celery
workers and tasks:

```bash
celery -A simdb.workers.celery flower --port=5555
```

## Testing with eager mode

In tests, set `task_always_eager = True` to run tasks synchronously without a
broker.
17 changes: 17 additions & 0 deletions docs/developer_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,23 @@ simdb_server

This will start a server on port 5000. You can test this server is running by opening http://localhost:5000 in a browser.

## Running Celery workers

For development, you typically want to run Celery tasks synchronously. This is
enabled by setting `task_always_eager = True` in tests (see `tests/remote/api/v1.3/test_simulations3.py`).

To run actual background workers during development:

```bash
# Worker
simdb_celery worker

# Beat scheduler (if needed)
simdb_celery beat
```

See the [Celery documentation](celery.md) for full details.

## Linting and formatting

SimDB uses [Ruff](https://docs.astral.sh/ruff/) for both linting and code
Expand Down
6 changes: 6 additions & 0 deletions docs/maintenance_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,12 @@ service nginx restart

You should now be able to check the simdb server is running by going to the http address defined in your nginx site (localhost:80 in the example above).

## Celery background workers

SimDB uses Celery to run asynchronous background tasks such as copying simulation
files. See the [Celery documentation](celery.md) for details on configuration and
running workers.

#### Nginx Request Entity Size

You may need to increase the size of uploaded files that Nginx will accept. For SimDB this should be at least 100MB.
Expand Down
10 changes: 8 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,19 @@ build-docs = [
postgres = [
"psycopg2-binary>=2.8.0",
]
celery = [
"celery>=5.3.0",
"redis>=5.0.0",
]
all = [
"imas-simdb[server, imas-validator, postgres]"
"imas-simdb[server, imas-validator, postgres, celery]",
]

[project.scripts]
simdb = "simdb.cli.simdb:main"
simdb_server = "simdb.remote.wsgi:run"
simdb_worker = "simdb.workers.cli:worker"
simdb_beat = "simdb.workers.cli:beat"

[project.urls]
Homepage = "https://simdb.iter.org/dashboard/"
Expand Down Expand Up @@ -167,5 +173,5 @@ dev = [
"pytest-cov>=5.0.0",
"ruff~=0.15.0",
"ty==0.0.34",
"imas-simdb[server, imas-validator, postgres, auth]"
"imas-simdb[server, imas-validator, postgres, auth, celery]"
]
17 changes: 17 additions & 0 deletions src/simdb/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,23 @@ def get_string_option(
)
return value

def get_int_option(
self, name: str, default: Union[int, None, _NothingSentinel] = NOTHING
) -> int:
"""
Returns the value for the option with the given name from the configuration but
also ensures the resulting value is an integer.

@see get_option
@raise TypeError if the found value was not an integer
"""
value = self.get_option(name, default)
if value is not None and not isinstance(value, int):
raise TypeError(
f"Invalid type of option {name}: expected int, got {type(value)}"
)
return value

def delete_option(self, name: str) -> None:
"""
Delete the option with the given name from the configuration.
Expand Down
35 changes: 19 additions & 16 deletions src/simdb/database/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,6 @@ def __init__(self, db_type: DBMS, scopefunc=None, **kwargs) -> None:
self.engine: sqlalchemy.engine.Engine = create_engine(
"sqlite:///{file}".format(**kwargs)
)
with contextlib.closing(self.engine.connect()) as con:
res: sqlalchemy.engine.ResultProxy = con.execute(
"SELECT name FROM sqlite_master WHERE type = 'table' AND name NOT "
"LIKE 'sqlite_%';"
)
new_db = res.rowcount == -1

elif db_type == Database.DBMS.POSTGRESQL:
if "host" not in kwargs:
Expand All @@ -131,12 +125,6 @@ def __init__(self, db_type: DBMS, scopefunc=None, **kwargs) -> None:
pool_pre_ping=True,
pool_recycle=3600,
)
with contextlib.closing(self.engine.connect()) as con:
res: sqlalchemy.engine.ResultProxy = con.execute(
"SELECT * FROM pg_catalog.pg_tables WHERE schemaname = 'public';"
)
new_db = res.rowcount == 0

elif db_type == Database.DBMS.MSSQL:
if "user" not in kwargs:
raise ValueError("Missing user parameter for MSSQL database")
Expand All @@ -147,12 +135,8 @@ def __init__(self, db_type: DBMS, scopefunc=None, **kwargs) -> None:
self.engine: sqlalchemy.engine.Engine = create_engine(
"mssql+pyodbc://{user}:{password}@{dsnname}".format(**kwargs)
)
new_db = False

else:
raise ValueError("Unknown database type: " + db_type.name)
if new_db:
Base.metadata.create_all(self.engine)
Base.metadata.bind = self.engine
if scopefunc is None:

Expand Down Expand Up @@ -745,3 +729,22 @@ def get_local_db(config: Config) -> Database:
db_file.parent.mkdir(parents=True, exist_ok=True)
database = Database(Database.DBMS.SQLITE, file=db_file)
return database


def get_db(config: Config) -> Database:
db_type = config.get_option("database.type")
if db_type == "postgres":
args = config.get_section("database")
return Database(
Database.DBMS.POSTGRESQL,
**args,
)
elif db_type == "sqlite":
db_dir = appdirs.user_data_dir("simdb")
file = Path(config.get_string_option("database.file", default=None)) or Path(
db_dir, "remote.db"
)
file.parent.mkdir(parents=True, exist_ok=True)
return Database(Database.DBMS.SQLITE, file=file)
else:
raise RuntimeError(f"Unknown database type in configuration: {db_type}.")
8 changes: 8 additions & 0 deletions src/simdb/database/models/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Union

from simdb.enums import IngestionStatus
from simdb.remote.models import (
FileDataList,
MetadataDataList,
Expand Down Expand Up @@ -120,6 +121,13 @@ class Status(Enum):
"Watcher", secondary=simulation_watchers, lazy="dynamic"
)

ingestion_status = Column(
sql_types.Enum(IngestionStatus),
nullable=False,
default=IngestionStatus.COMPLETED,
)
ingestion_version = Column(sql_types.Integer, nullable=False, default=0)

def __init__(
self, manifest: Union[Manifest, None], config: Optional[Config] = None
) -> None:
Expand Down
13 changes: 13 additions & 0 deletions src/simdb/enums.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from enum import Enum


class IngestionStatus(str, Enum):
QUEUED = "queued"
COPYING = "copying"
COPIED = "copied"
VALIDATING = "validating"
VALIDATED = "validated"
COMPLETED = "completed"

COPY_FAILED = "copy_failed"
VALIDATION_FAILED = "validation_failed"
5 changes: 5 additions & 0 deletions src/simdb/imas/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,11 @@ def imas_files(uri: URI) -> List[Path]:
@return: a list of files which contains the IDS data for the backend specified in
the URI
"""

# Early exit for NetCDF files
if uri.scheme == "file" and uri.path and uri.path.suffix == "nc":
return [uri.path]

backend = str(uri.path)
if backend.startswith("/"):
backend = backend[1:]
Expand Down
3 changes: 3 additions & 0 deletions src/simdb/remote/apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from .v1_1 import namespaces as namespaces_v1_1
from .v1_2 import api as api_v1_2
from .v1_2 import namespaces as namespaces_v1_2
from .v1_3 import api as api_v1_3
from .v1_3 import namespaces as namespaces_v1_3


def error(message: str) -> Response:
Expand Down Expand Up @@ -144,3 +146,4 @@ def get(self, user: User):
register(api_v1, "v1", namespaces_v1)
register(api_v1_1, "v1.1", namespaces_v1_1)
register(api_v1_2, "v1.2", namespaces_v1_2)
register(api_v1_3, "v1.3", namespaces_v1_3)
28 changes: 28 additions & 0 deletions src/simdb/remote/apis/v1_3/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from flask_restx import Api

from simdb.remote.apis.files import api as file_ns
from simdb.remote.apis.metadata import api as metadata_ns
from simdb.remote.apis.watchers import api as watcher_ns
from simdb.remote.core.auth import TokenAuthenticator

from .simulations import api as sim_ns

api = Api(
title="SimDB REST API",
version="1.3",
description="SimDB REST API",
authorizations={
"basicAuth": {
"type": "basic",
},
"apiToken": {
"type": "apiKey",
"in": "header",
"name": TokenAuthenticator.TOKEN_HEADER_NAME,
},
},
security=["basicAuth", "apiToken"],
doc="/docs",
)

namespaces = [metadata_ns, watcher_ns, file_ns, sim_ns]
Loading