diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 9289b03..2d9cef6 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -20,23 +20,32 @@ maturin develop # compile Rust + install in dev mode
```
### Run Tests
-
+
```bash
# Rust unit tests (no DB needed)
cargo test
-
+
# Python unit tests (no DB needed)
python test.py
-
+
# Integration tests (SQLite)
python test.py --integration
-
+
# All tests
python test.py --all
```
-
+
+### Run Benchmarks
+
+To measure the performance of the query compiler:
+
+```bash
+cd ryx-query && cargo bench
+```
+
### Type Check
+
```bash
mypy ryx/
```
diff --git a/README.md b/README.md
index 25722f0..20e5791 100644
--- a/README.md
+++ b/README.md
@@ -75,10 +75,20 @@ async with ryx.transaction():
| **Backends** | All | All | **PG · MySQL · SQLite** |
| **Migrations** | Built-in | Alembic | **Built-in** |
-## Performance
+## Architecture
+
+
+
+
+
+Your Python queries are compiled to SQL in Rust, executed by sqlx, and decoded back — all without blocking the Python event loop.
+Since v0.1.3, the query engine lives in a standalone crate, `ryx-query`. This decouples the SQL compilation logic from the PyO3 bindings, so the compiler can be tested and benchmarked independently of Python.
+
+## Performance
+
Benchmark of 1 000 rows on SQLite (lower is better):
-
+
| Operation | Ryx ORM | SQLAlchemy ORM | SQLAlchemy Core | Ryx raw |
|-----------|--------:|---------------:|----------------:|--------:|
| **bulk_create** | 0.0074 s | 0.1696 s | 0.0022 s | 0.0011 s |
@@ -86,59 +96,13 @@ Benchmark of 1 000 rows on SQLite (lower is better):
| **bulk_delete** | 0.0005 s | 0.0012 s | 0.0009 s | 0.0004 s |
| **filter + order + limit** | 0.0009 s | 0.0019 s | 0.0008 s | 0.0004 s |
| **aggregate** | 0.0002 s | 0.0015 s | 0.0005 s | 0.0001 s |
-
+
Ryx ORM is **16× faster** than SQLAlchemy ORM on bulk inserts and **2× faster** on deletes — while keeping the same Django-style API. The raw SQL layer (`raw_execute` / `raw_fetch`) gives you near-C speed when you need it.
+
+**Internal compilation speed**: the query compiler handles a simple lookup in **~248 ns** and a complex query tree in **~1 µs**.
+
Run the benchmark yourself:
-```bash
-uv add sqlalchemy[asyncio] aiosqlite
-uv run python examples/13_benchmark_sqlalchemy.py
-```
-
-## Quick Start
-
-```bash
-pip install maturin
-maturin develop # compile Rust + install
-```
-
-```python
-import asyncio, ryx
-from ryx import Model, CharField
-
-class Article(Model):
- title = CharField(max_length=200)
-
-async def main():
- await ryx.setup("sqlite:///app.db")
- await ryx.migrate([Article])
- await Article.objects.create(title="Hello Ryx")
- print(await Article.objects.all())
-
-asyncio.run(main())
-```
-
-## Key Features
-
-- **30+ field types** — from `AutoField` to `JSONField`, with validation built in
-- **Q objects** — complex `AND` / `OR` / `NOT` expressions with nesting
-- **Aggregations** — `Count`, `Sum`, `Avg`, `Min`, `Max` with `GROUP BY` and `HAVING`
-- **Relationships** — `ForeignKey`, `OneToOneField`, `ManyToManyField` with `select_related` / `prefetch_related`
-- **Transactions** — async context managers with nested savepoints
-- **Signals** — `pre_save`, `post_save`, `pre_delete`, `post_delete` and more
-- **Migrations** — autodetect schema changes, generate and apply
-- **Validation** — field-level + model-level, collects all errors before raising
-- **Sync/async bridge** — use from sync or async code seamlessly
-- **CLI** — `python -m ryx migrate`, `makemigrations`, `shell`, `inspectdb`
-
-## Architecture
-
-
-
-
-
-Your Python queries are compiled to SQL in Rust, executed by sqlx, and decoded back — all without blocking the Python event loop.
## Documentation
diff --git a/docs/doc/advanced/index.mdx b/docs/doc/advanced/index.mdx
index 3a33771..23e33d3 100644
--- a/docs/doc/advanced/index.mdx
+++ b/docs/doc/advanced/index.mdx
@@ -15,5 +15,6 @@ Deep-dive topics for production-ready applications.
- **[Caching](./caching)** — Query result caching
- **[Custom Lookups](./custom-lookups)** — Extend the query API
- **[Sync/Async](./sync-async)** — Bridge between sync and async code
+- **[Multi-Database](./multi-db)** — Route queries across multiple databases
- **[Raw SQL](./raw-sql)** — Escape hatch for complex queries
- **[CLI](./cli)** — Command-line management commands
diff --git a/docs/doc/advanced/multi-db.mdx b/docs/doc/advanced/multi-db.mdx
new file mode 100644
index 0000000..046f0c0
--- /dev/null
+++ b/docs/doc/advanced/multi-db.mdx
@@ -0,0 +1,99 @@
+---
+sidebar_position: 11
+title: Multi-Database Support
+description: Learn how to route queries across multiple databases in Ryx.
+---
+
+Ryx supports routing queries across multiple databases, allowing you to separate read and write workloads, split data across different servers, or use a dedicated database for specific models.
+
+## Configuration
+
+To enable multi-database support, pass a dictionary of URLs to `ryx.setup` instead of a single string. Each key in the dictionary serves as an **alias** for that database pool.
+
+```python
+import ryx
+
+# Configure multiple databases
+urls = {
+ "default": "postgresql://user:pass@localhost/main_db",
+ "users": "postgresql://user:pass@localhost/user_db",
+ "logs": "sqlite::memory:",
+}
+
+await ryx.setup(urls)
+```
+
+## Routing Strategies
+
+Ryx resolves which database to use for a query in the following order of priority:
+
+1. **Explicit Routing**: Using `.using(alias)` on a QuerySet.
+2. **Dynamic Router**: Using a configured `BaseRouter`.
+3. **Model Metadata**: Using the `database` option in `Model.Meta`.
+4. **Default**: Falling back to the `'default'` alias.
+
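+The same order is implemented by the queryset's internal `_resolve_db_alias` helper, roughly equivalent to this simplified sketch (attribute names abbreviated for illustration):
+
+```python
+def resolve_alias(qs, operation="read"):
+    # 1. Explicit .using() always wins
+    if qs._using:
+        return qs._using
+    # 2. Ask the configured router, if any
+    router = get_router()
+    if router:
+        pick = router.db_for_read if operation == "read" else router.db_for_write
+        alias = pick(qs.model)
+        if alias:
+            return alias
+    # 3. Model-level default from Meta.database
+    if qs.model._meta.database:
+        return qs.model._meta.database
+    # 4. Global fallback
+    return "default"
+```
+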
+### 1. Explicit Routing
+
+You can force a query to run on a specific database using the `.using()` method. This is useful for one-off queries or manual routing.
+
+```python
+# Read from the 'users' database
+users = await User.objects.using("users").all()
+
+# Write to the 'logs' database
+await Log.objects.using("logs").create(message="System boot")
+```
+
+### 2. Model-Level Routing
+
+You can assign a model to a specific database by default using the `database` option in its `Meta` class.
+
+```python
+class Log(Model):
+ message = CharField()
+
+ class Meta:
+ database = "logs"
+```
+
+Any query on `Log` will now use the `logs` database unless overridden by `.using()`.
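+
+For example, with the `Log` model above:
+
+```python
+# Uses the 'logs' database (from Meta.database)
+await Log.objects.create(message="User signed in")
+
+# An explicit .using() overrides the model default
+recent = await Log.objects.using("default").filter(message__icontains="signed")
+```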
+
+### 3. Dynamic Routing (The Router)
+
+For more complex logic (e.g., routing based on the environment, user, or model type), you can implement a custom router by inheriting from `BaseRouter`.
+
+```python
+from ryx.router import BaseRouter, set_router
+
+class MyProjectRouter(BaseRouter):
+ def db_for_read(self, model, **hints):
+ if model.__name__ == "User":
+ return "users"
+ return None # Fallback to default
+
+ def db_for_write(self, model, **hints):
+ if model.__name__ == "User":
+ return "users"
+ return None
+
+# Activate the router globally
+set_router(MyProjectRouter())
+```
+
+## Multi-Database Transactions
+
+Transactions in Ryx are tied to a specific database connection. To start a transaction on a non-default database, pass the `alias` to the `transaction()` context manager.
+
+```python
+import ryx
+
+async with ryx.transaction(alias="users"):
+ await User.objects.create(name="Alice")
+ await User.objects.create(name="Bob")
+ # If an exception occurs, only changes to 'users' DB are rolled back.
+```
+
+### Nesting and Multiple Databases
+
+- If you start a transaction on a database that already has an active transaction on the current task, Ryx creates a **SAVEPOINT**.
+- If you start a transaction on a *different* database while another is active, Ryx starts a new independent transaction for that database.
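+
+For instance (using the `Log` model above and a hypothetical `Order` model):
+
+```python
+async with ryx.transaction():                  # BEGIN on 'default'
+    await Order.objects.create(total=10)
+
+    async with ryx.transaction():              # same DB -> SAVEPOINT
+        await Order.objects.create(total=20)
+
+    async with ryx.transaction(alias="logs"):  # different DB -> independent BEGIN
+        await Log.objects.create(message="orders created")
+```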
diff --git a/docs/doc/internals/architecture.mdx b/docs/doc/internals/architecture.mdx
index a2d2471..0a8e9c2 100644
--- a/docs/doc/internals/architecture.mdx
+++ b/docs/doc/internals/architecture.mdx
@@ -7,7 +7,7 @@ sidebar_position: 2
Ryx is built in three layers, each with a clear responsibility.
## Layer Diagram
-
+
```
┌──────────────────────────────────────────────────────────┐
│ Python Layer (ryx/) │
@@ -17,8 +17,11 @@ Ryx is built in three layers, each with a clear responsibility.
│ PyO3 Boundary (src/lib.rs) │
│ QueryBuilder · TransactionHandle · Type Bridge · Async │
├──────────────────────────────────────────────────────────┤
-│ Rust Core (src/) │
-│ AST · Q-Trees · SQL Compiler · Executor · Pool · Tx │
+│ Modular Query Engine (ryx-query crate) │
+│ AST · Q-Trees · SQL Compiler · Lookup Registry │
+├──────────────────────────────────────────────────────────┤
+│ Rust Core (src/) │
+│ Executor · Pool · Transaction Logic │
├──────────────────────────────────────────────────────────┤
│ sqlx 0.8.6 + tokio 1.40 │
│ AnyPool · Async Drivers · Transactions │
@@ -27,6 +30,7 @@ Ryx is built in three layers, each with a clear responsibility.
└──────────────────────────────────────────────────────────┘
```
+
## Query Execution Flow
```
diff --git a/docs/doc/internals/query-compiler.mdx b/docs/doc/internals/query-compiler.mdx
index 8905064..2c86595 100644
--- a/docs/doc/internals/query-compiler.mdx
+++ b/docs/doc/internals/query-compiler.mdx
@@ -3,11 +3,13 @@ sidebar_position: 4
---
# Query Compiler
+
+The heart of Ryx — transforms Python query expressions into optimized SQL.
-The heart of Ryx — transforms Python query expressions into optimized SQL.
-
+Since v0.1.3, the compiler resides in the standalone `ryx-query` crate, decoupled from the Python bindings so it can be compiled, tested, and benchmarked independently.
+
## Pipeline
-
+
```
Python QuerySet methods
│
@@ -15,26 +17,27 @@ Python QuerySet methods
QueryNode (Rust AST)
│
▼
-compiler::compile()
+ryx_query::compiler::compile()
│
▼
CompiledQuery { sql: String, values: Vec<SqlValue> }
```
-
+
## AST Types
-
+
### QueryNode
-
+
The root of every query:
-
+
```rust
pub struct QueryNode {
- pub operation: QueryOperation, // Select, Aggregate, Count, Delete, Update, Insert
pub table: String,
- pub columns: Vec,
+ pub backend: Backend, // DB backend for SQL generation
+ pub operation: QueryOperation, // Select, Aggregate, Count, Delete, Update, Insert
pub filters: Vec,
- pub q_tree: Option,
+ pub q_filter: Option,
pub joins: Vec,
+ pub annotations: Vec,
pub group_by: Vec,
pub having: Vec,
pub order_by: Vec,
@@ -43,20 +46,20 @@ pub struct QueryNode {
pub distinct: bool,
}
```
-
+
### QNode — Boolean Expression Tree
-
+
```rust
pub enum QNode {
Leaf { field: String, lookup: String, value: SqlValue, negated: bool },
- And { left: Box<QNode>, right: Box<QNode> },
- Or { left: Box<QNode>, right: Box<QNode> },
- Not { inner: Box<QNode> },
+ And(Vec<QNode>),
+ Or(Vec<QNode>),
+ Not(Box<QNode>),
}
```
-
+
### SqlValue — Type-Safe Values
-
+
```rust
pub enum SqlValue {
Null,
@@ -64,27 +67,25 @@ pub enum SqlValue {
Int(i64),
Float(f64),
Text(String),
- Bytes(Vec<u8>),
- Date(chrono::NaiveDate),
- Time(chrono::NaiveTime),
- DateTime(chrono::NaiveDateTime),
- Json(serde_json::Value),
+ List(Vec<SqlValue>),
}
```
-
+
### JoinClause
-
+
```rust
-pub enum JoinKind { Inner, LeftOuter, RightOuter, FullOuter, Cross }
-
+pub enum JoinKind { Inner, LeftOuter, RightOuter, FullOuter, CrossJoin }
+
pub struct JoinClause {
- pub table: String,
- pub condition: String,
pub kind: JoinKind,
+ pub table: String,
pub alias: Option<String>,
+ pub on_left: String,
+ pub on_right: String,
}
```
+
## Compilation Process
1. **SELECT clause** — `columns` or `*`
diff --git a/docs/docusaurus.config.js b/docs/docusaurus.config.js
index 0bf37a6..f1c2db3 100644
--- a/docs/docusaurus.config.js
+++ b/docs/docusaurus.config.js
@@ -62,7 +62,7 @@ const config = {
},
{
type: 'custom-search-bar',
- position: 'right',
+ position: 'center',
},
{
type: 'custom-github-stats',
diff --git a/docs/docusaurus.config.ts b/docs/docusaurus.config.ts
index 84afc25..bf3879e 100644
--- a/docs/docusaurus.config.ts
+++ b/docs/docusaurus.config.ts
@@ -4,8 +4,8 @@ import type { Config } from '@docusaurus/types';
const config: Config = {
title: 'Ryx ORM',
tagline: 'Django-style Python ORM. Powered by Rust.',
- favicon: 'img/favicon.ico',
- url: 'https://ryx.alldotpy.dev',
+ favicon: 'img/logo.svg',
+ url: 'https://ryx.alldotpy.com',
baseUrl: '/',
organizationName: 'AllDotPy',
projectName: 'Ryx',
diff --git a/docs/sidebars.js b/docs/sidebars.js
index 6541724..e466e9b 100644
--- a/docs/sidebars.js
+++ b/docs/sidebars.js
@@ -70,6 +70,7 @@ const sidebars = {
'advanced/caching',
'advanced/custom-lookups',
'advanced/sync-async',
+ 'advanced/multi-db',
'advanced/raw-sql',
'advanced/cli',
],
diff --git a/ryx-query/src/ast.rs b/ryx-query/src/ast.rs
index 13b00c9..2701b6a 100644
--- a/ryx-query/src/ast.rs
+++ b/ryx-query/src/ast.rs
@@ -253,7 +253,8 @@ pub enum QueryOperation {
#[derive(Debug, Clone)]
pub struct QueryNode {
pub table: String,
- pub backend: Backend, // Database backend for SQL generation
+ pub backend: Backend, // Database backend for SQL generation
+ pub db_alias: Option<String>, // Optional alias for multi-db routing
pub operation: QueryOperation,
// # WHERE
@@ -287,6 +288,7 @@ impl QueryNode {
Self {
table: table.into(),
backend: Backend::PostgreSQL, // default, will be overridden at runtime
+ db_alias: None,
operation: QueryOperation::Select { columns: None },
filters: Vec::new(),
q_filter: None,
@@ -377,4 +379,10 @@ impl QueryNode {
self.backend = backend;
self
}
+
+ #[must_use]
+ pub fn with_db_alias(mut self, alias: String) -> Self {
+ self.db_alias = Some(alias);
+ self
+ }
}
diff --git a/ryx-query/src/compiler/compiler.rs b/ryx-query/src/compiler/compiler.rs
index 98faa35..7ddab47 100644
--- a/ryx-query/src/compiler/compiler.rs
+++ b/ryx-query/src/compiler/compiler.rs
@@ -25,6 +25,7 @@ use super::helpers;
pub struct CompiledQuery {
pub sql: String,
pub values: Vec<SqlValue>,
+ pub db_alias: Option<String>,
}
pub fn compile(node: &QueryNode) -> QueryResult {
@@ -42,7 +43,11 @@ pub fn compile(node: &QueryNode) -> QueryResult {
returning_id,
} => compile_insert(node, cv, *returning_id, &mut values)?,
};
- Ok(CompiledQuery { sql, values })
+ Ok(CompiledQuery {
+ sql,
+ values,
+ db_alias: node.db_alias.clone(),
+ })
}
fn compile_select(
diff --git a/ryx/__init__.py b/ryx/__init__.py
index d4e6ec7..2a02f66 100644
--- a/ryx/__init__.py
+++ b/ryx/__init__.py
@@ -110,7 +110,7 @@
# Setup
async def setup(
- url: str,
+ urls: str | dict, # single URL (backward compatible) or mapping of alias -> URL
*,
max_connections: int = 10,
min_connections: int = 1,
@@ -119,8 +119,13 @@ async def setup(
max_lifetime: int = 1800,
) -> None:
"""Initialize the ryx connection pool. Call once at startup."""
+
+ # Backward compatibility: wrap a single URL as the 'default' alias
+ if isinstance(urls, str):
+ urls = {'default': urls}
+
await _core.setup(
- url,
+ urls,
max_connections=max_connections,
min_connections=min_connections,
connect_timeout=connect_timeout,
@@ -149,8 +154,8 @@ def available_transforms() -> list[str]:
return list(_core.list_transforms())
-def is_connected() -> bool:
- return _core.is_connected()
+def is_connected(db_alias: str = 'default') -> bool:
+ return _core.is_connected(db_alias)
def pool_stats() -> dict:
diff --git a/ryx/models.py b/ryx/models.py
index b5ad841..598aa1b 100644
--- a/ryx/models.py
+++ b/ryx/models.py
@@ -28,7 +28,7 @@ async def after_delete(self) → post-SQL hook
import re
from datetime import datetime
-from typing import Any, Dict, List, Optional, Type
+from typing import Any, Dict, List, Optional
from ryx import ryx_core as _core
from ryx.exceptions import DoesNotExist, MultipleObjectsReturned
@@ -93,6 +93,7 @@ class Options:
Attributes:
table_name : SQL table name.
app_label : Optional namespace prefix.
+ database : Optional database alias (e.g. "logs").
fields : Ordered dict name → Field.
many_to_many : Dict name → ManyToManyField (populated by M2M fields).
pk_field : The primary key Field.
@@ -113,7 +114,9 @@ def __init__(self, meta_class: Optional[type], model_name: str) -> None:
self.table_name = _to_table_name(model_name)
self.app_label: str = getattr(meta_class, "app_label", "")
+ self.database: Optional[str] = getattr(meta_class, "database", None)
self.ordering: List[str] = list(getattr(meta_class, "ordering", []))
+
self.unique_together: List[tuple] = list(
getattr(meta_class, "unique_together", [])
)
@@ -157,8 +160,12 @@ def get_field(self, name: str) -> Field:
class Manager:
"""Default query manager. Proxies to QuerySet."""
- def __init__(self) -> None:
- self._model: Optional[type] = None
+ def __init__(self, alias: Optional[str] = None) -> None:
+ self._model: Optional[type[Model]] = None
+ self._alias = alias
+
def contribute_to_class(self, model: type, name: str) -> None:
self._model = model
@@ -166,7 +173,7 @@ def contribute_to_class(self, model: type, name: str) -> None:
def get_queryset(self):
from ryx.queryset import QuerySet
- return QuerySet(self._model)
+ return QuerySet(self._model, _using=self._alias)
# Proxy shortcuts
def all(self):
@@ -181,8 +188,12 @@ def exclude(self, *q, **kw):
def order_by(self, *f):
return self.get_queryset().order_by(*f)
- def using(self, alias):
- return self.get_queryset() # future: multi-db
+ def using(self, alias: str) -> "Manager":
+ """Return a new Manager bound to the specified database alias."""
+ new_mgr = Manager(alias)
+ new_mgr._model = self._model
+ return new_mgr
def cache(self, **kw):
return self.get_queryset().cache(**kw)
@@ -226,7 +237,22 @@ async def count(self) -> int:
async def create(self, **kw):
"""Create and save a new model instance."""
instance = self._model(**kw)
- await instance.save()
+
+ # Resolve the alias: manager's .using() -> router -> Meta.database
+ from ryx.router import get_router
+
+ router = get_router()
+ alias = self._alias
+ if not alias and router:
+ alias = router.db_for_write(self._model)
+ if not alias:
+ alias = self._model._meta.database
+
+ await instance.save(using=alias)
return instance
async def get_or_create(self, defaults: Optional[dict] = None, **kw):
@@ -502,7 +528,11 @@ async def full_clean(self) -> None:
# Persistence
async def save(
- self, *, validate: bool = True, update_fields: Optional[List[str]] = None
+ self,
+ *,
+ validate: bool = True,
+ update_fields: Optional[List[str]] = None,
+ using: Optional[str] = None,
) -> None:
"""Save the instance to the database.
@@ -513,6 +543,7 @@ async def save(
Args:
validate: Run field validators + clean() before SQL (default: True).
update_fields: If given, only UPDATE these field names (reduces SQL chatter).
+ using: Explicitly specify the database alias to use.
"""
created = self.pk is None
@@ -529,6 +560,17 @@ async def save(
# pre_save signal
await pre_save.send(sender=type(self), instance=self, created=created)
+ # Resolve database alias: using -> Router.db_for_write -> Meta.database -> 'default'
+ from ryx.router import get_router
+
+ router = get_router()
+ alias = using
+ if not alias:
+ if router:
+ alias = router.db_for_write(type(self))
+ if not alias:
+ alias = self._meta.database
+
# SQL execution
# Creation
if created:
@@ -542,6 +584,8 @@ async def save(
(f.column, f.to_db(getattr(self, f.attname))) for f in fields_to_save
]
builder = _core.QueryBuilder(self._meta.table_name)
+ if alias:
+ builder = builder.set_using(alias)
new_id = await builder.execute_insert(values, returning_id=True)
if self._meta.pk_field:
object.__setattr__(self, self._meta.pk_field.attname, new_id)
@@ -566,6 +610,8 @@ async def save(
]
pk_field = self._meta.pk_field
builder = _core.QueryBuilder(self._meta.table_name)
+ if alias:
+ builder = builder.set_using(alias)
builder = builder.add_filter(
pk_field.column, "exact", self.pk, negated=False
)
@@ -591,10 +637,22 @@ async def delete(self) -> None:
await self.before_delete()
await pre_delete.send(sender=type(self), instance=self)
+ # Resolve database alias: Router.db_for_write -> Meta.database -> 'default'
+ from ryx.router import get_router
+
+ router = get_router()
+ alias = None
+ if router:
+ alias = router.db_for_write(type(self))
+ if not alias:
+ alias = self._meta.database
+
from ryx import ryx_core as _core
pk_field = self._meta.pk_field
builder = _core.QueryBuilder(self._meta.table_name)
+ if alias:
+ builder = builder.set_using(alias)
builder = builder.add_filter(pk_field.column, "exact", self.pk, negated=False)
await builder.execute_delete()
diff --git a/ryx/queryset.py b/ryx/queryset.py
index 376fac8..512dc0f 100644
--- a/ryx/queryset.py
+++ b/ryx/queryset.py
@@ -256,6 +256,7 @@ def __init__(
_select_columns: Optional[List[str]] = None,
_annotations: Optional[List[dict]] = None,
_group_by: Optional[List[str]] = None,
+ _using: Optional[str] = None,
) -> None:
self._model = model
@@ -265,6 +266,7 @@ def __init__(
self._select_columns = _select_columns
self._annotations = _annotations or []
self._group_by = _group_by or []
+ self._using = _using
def _clone(self, builder=None, **overrides) -> "QuerySet":
return QuerySet(
@@ -273,6 +275,7 @@ def _clone(self, builder=None, **overrides) -> "QuerySet":
_select_columns=overrides.get("_select_columns", self._select_columns),
_annotations=overrides.get("_annotations", list(self._annotations)),
_group_by=overrides.get("_group_by", list(self._group_by)),
+ _using=overrides.get("_using", self._using),
)
def _validate_filters(self, kwargs: Dict[str, Any]) -> None:
@@ -558,8 +561,12 @@ def stream(
)
def using(self, alias: str) -> "QuerySet":
- """Stub for multi-database routing (planned feature)."""
- return self._clone()
+ """Switch the database used for this query.
+
+ Example::
+ posts = await Post.objects.using("replica").filter(active=True)
+ """
+ return self._clone(_using=alias)
# Evaluation (async)
def cache(
@@ -605,28 +612,78 @@ def cache(
def __await__(self):
return self._execute().__await__()
+ def _resolve_db_alias(self, operation: str = "read") -> str:
+ """
+ Resolve the database alias based on priority:
+ 1. .using(alias)
+ 2. Router.db_for_read/write
+ 3. Model.Meta.database
+ 4. 'default'
+ """
+ # 1. Explicitly set via .using()
+ if self._using:
+ return self._using
+
+ # 2. Dynamic Router
+ from ryx.router import get_router
+
+ router = get_router()
+ if router:
+ if operation == "read":
+ res = router.db_for_read(self._model)
+ else:
+ res = router.db_for_write(self._model)
+ if res:
+ return res
+
+ # 3. Model Meta
+ if self._model._meta.database:
+ return self._model._meta.database
+
+ # 4. Fallback
+ return "default"
+
async def _execute(self) -> list:
- raw_rows = await self._builder.fetch_all()
+ alias = self._resolve_db_alias("read")
+
+ builder = self._builder
+ if alias:
+ builder = builder.set_using(alias)
+
+ raw_rows = await builder.fetch_all()
return [self._model._from_row(row) for row in raw_rows]
async def count(self) -> int:
- return await self._builder.fetch_count()
+ alias = self._resolve_db_alias("read")
+
+ builder = self._builder
+ if alias:
+ builder = builder.set_using(alias)
+
+ return await builder.fetch_count()
async def first(self) -> Optional["Model"]:
- raw = await self._builder.set_limit(1).fetch_first()
- return None if raw is None else self._model._from_row(raw)
+ alias = self._resolve_db_alias("read")
+ builder = self._builder
+ if alias:
+ builder = builder.set_using(alias)
+
+ raw = await builder.set_limit(1).fetch_first()
+ return None if raw is None else self._model._from_row(raw)

async def last(self) -> Optional["Model"]:
# Support explicit ordering from .order_by(...).last().
# If no rows, return None.
results = await self._execute()
return results[-1] if results else None
async def get(self, *q_args: Q, **kwargs: Any) -> "Model":
"""Return exactly one instance. Raises DoesNotExist / MultipleObjectsReturned."""
qs = self.filter(*q_args, **kwargs) if (q_args or kwargs) else self
+
+ alias = qs._resolve_db_alias("read")
+
+ builder = qs._builder
+ if alias:
+ builder = builder.set_using(alias)
+
try:
- raw = await qs._builder.fetch_get()
+ raw = await builder.fetch_get()
except RuntimeError as e:
msg = str(e)
if "No matching" in msg:
@@ -641,16 +698,44 @@ async def get(self, *q_args: Q, **kwargs: Any) -> "Model":
return self._model._from_row(raw)
async def exists(self) -> bool:
- return await self.count() > 0
+ alias = self._resolve_db_alias("read")
+
+ builder = self._builder
+ if alias:
+ builder = builder.set_using(alias)
+
+ return await builder.fetch_count() > 0
async def delete(self) -> int:
"""Bulk delete. Fires pre_bulk_delete / post_bulk_delete signals."""
+ alias = self._resolve_db_alias("write")
+
+ builder = self._builder
+ if alias:
+ builder = builder.set_using(alias)
+
await pre_bulk_delete.send(sender=self._model, queryset=self)
- n = await self._builder.execute_delete()
+ n = await builder.execute_delete()
await post_bulk_delete.send(sender=self._model, queryset=self, deleted_count=n)
return n
async def bulk_delete(self) -> int:
"""Alias for delete()."""
return await self.delete()
@@ -658,8 +743,15 @@ async def bulk_delete(self) -> int:
async def update(self, **kwargs: Any) -> int:
"""Bulk update. Fires pre_update / post_update signals."""
+ # Resolve database alias: .using() -> router -> Meta.database -> 'default'
+ alias = self._resolve_db_alias("write")
+
+ builder = self._builder
+ if alias:
+ builder = builder.set_using(alias)
+
await pre_update.send(sender=self._model, queryset=self, fields=kwargs)
- n = await self._builder.execute_update(list(kwargs.items()))
+ n = await builder.execute_update(list(kwargs.items()))
await post_update.send(
sender=self._model, queryset=self, updated_count=n, fields=kwargs
)
diff --git a/ryx/router.py b/ryx/router.py
new file mode 100644
index 0000000..6fa6451
--- /dev/null
+++ b/ryx/router.py
@@ -0,0 +1,49 @@
+"""
+Ryx ORM — Database Router
+
+A router allows you to automatically route queries to different databases
+based on the model, the operation (read vs write), or other hints.
+"""
+
+from __future__ import annotations
+from typing import Any, Optional, TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from ryx.models import Model
+
+
+class BaseRouter:
+ """
+ Base class for database routers.
+ Override these methods to implement custom routing logic.
+
+ Returning None tells Ryx to fall back to the model's Meta.database
+ or the global 'default' database.
+ """
+
+ def db_for_read(self, model: type[Model], **hints: Any) -> Optional[str]:
+ """Return the alias of the database to use for read operations."""
+ return None
+
+ def db_for_write(self, model: type[Model], **hints: Any) -> Optional[str]:
+ """Return the alias of the database to use for write operations."""
+ return None
+
+ def allow_migrate(self, db: str, app_label: str, model_name: str) -> Optional[bool]:
+ """Return True/False to allow/disallow migrations on a specific DB."""
+ return None
+
+
+# Global router instance
+_router: Optional[BaseRouter] = None
+
+
+def set_router(router: BaseRouter) -> None:
+ """Set the global router for the application."""
+ global _router
+ _router = router
+
+
+def get_router() -> Optional[BaseRouter]:
+ """Retrieve the currently configured router."""
+ return _router
diff --git a/ryx/ryx_core.pyi b/ryx/ryx_core.pyi
index 5387ec5..f9e44f5 100644
--- a/ryx/ryx_core.pyi
+++ b/ryx/ryx_core.pyi
@@ -35,12 +35,11 @@ from typing import Any, Optional
__version__: str
"""Semver version of the compiled Rust core, e.g. ``"0.2.0"``."""
-# ---------------------------------------------------------------------------
+#
# Module-level functions
-# ---------------------------------------------------------------------------
-
+#
async def setup(
- url: str,
+ urls: dict[str, str],
max_connections: int = 10,
min_connections: int = 1,
connect_timeout: int = 30,
@@ -139,7 +138,7 @@ def list_transforms() -> list[str]:
...
-def is_connected() -> bool:
+def is_connected(alias: str = 'default') -> bool:
"""Return ``True`` if ``setup()`` has been called successfully.
Pure in-memory check — no database round-trip.
@@ -354,10 +353,7 @@ class QueryBuilder:
"""
...
- # ------------------------------------------------------------------
# Filter / WHERE
- # ------------------------------------------------------------------
-
def add_filter(
self,
field: str,
@@ -447,10 +443,7 @@ class QueryBuilder:
"""
...
- # ------------------------------------------------------------------
# Aggregation / GROUP BY
- # ------------------------------------------------------------------
-
def add_annotation(
self,
alias: str,
@@ -495,10 +488,7 @@ class QueryBuilder:
"""
...
- # ------------------------------------------------------------------
# JOIN
- # ------------------------------------------------------------------
-
def add_join(
self,
kind: str,
@@ -538,10 +528,7 @@ class QueryBuilder:
"""
...
- # ------------------------------------------------------------------
# Ordering / pagination
- # ------------------------------------------------------------------
-
def add_order_by(self, field: str) -> "QueryBuilder":
"""Append an ``ORDER BY`` term.
@@ -597,10 +584,16 @@ class QueryBuilder:
"""
...
- # ------------------------------------------------------------------
- # Introspection
- # ------------------------------------------------------------------
+ def set_using(self, alias: str) -> "QueryBuilder":
+ """Set the database alias to use for this query.
+
+ Returns
+ -------
+ A new ``QueryBuilder`` with ``db_alias`` set to the given alias.
+ """
+ ...
+ # Introspection
def compiled_sql(self) -> str:
"""Return the compiled SQL string without executing the query.
@@ -622,10 +615,7 @@ class QueryBuilder:
"""
...
- # ------------------------------------------------------------------
# Async execution
- # ------------------------------------------------------------------
-
async def fetch_all(self) -> list[dict[str, Any]]:
"""Execute the current SELECT and return all matching rows.
@@ -799,10 +789,9 @@ class QueryBuilder:
...
-# ---------------------------------------------------------------------------
+#
# TransactionHandle
-# ---------------------------------------------------------------------------
-
+#
class TransactionHandle:
"""A live database transaction, owned by the Rust ``Arc>>``.
diff --git a/ryx/transaction.py b/ryx/transaction.py
index 07d5af2..5c5f08a 100644
--- a/ryx/transaction.py
+++ b/ryx/transaction.py
@@ -73,7 +73,8 @@ class TransactionContext:
value so callers can use explicit ``savepoint()`` / ``rollback_to()``.
"""
- def __init__(self) -> None:
+ def __init__(self, alias: Optional[str] = None) -> None:
+ self._alias = alias
self._handle = None # set in __aenter__
self._savepoint_name: Optional[str] = None
self._outer_token = None # for ContextVar reset
@@ -84,20 +85,23 @@ def __init__(self) -> None:
async def __aenter__(self):
outer = _active_tx.get()
+ # If there is an outer transaction, check if it's for the same database.
+ # If it's for a different database, we treat this as a new outermost
+ # transaction for that specific database.
if outer is not None:
- # Nested transaction → SAVEPOINT
- # We reuse the outer transaction's connection and create a named
- # savepoint. The name is unique per nesting level.
- sp_name = f"_Ryx_sp_{id(self)}"
- self._savepoint_name = sp_name
- await outer.savepoint(sp_name)
- self._handle = outer
- logger.debug("Nested transaction: created savepoint %s", sp_name)
- else:
- # Outermost transaction → BEGIN
- self._handle = await _core.begin_transaction()
- logger.debug("Transaction BEGIN")
-
+ outer_alias = outer.get_alias()
+ if (outer_alias or "default") == (self._alias or "default"):
+ # Nested transaction on same DB → SAVEPOINT
+ sp_name = f"_Ryx_sp_{id(self)}"
+ self._savepoint_name = sp_name
+ await outer.savepoint(sp_name)
+ self._handle = outer
+ logger.debug("Nested transaction: created savepoint %s", sp_name)
+ return self._handle
+
+ # Outermost transaction (or transaction on a different DB) → BEGIN
+ self._handle = await _core.begin_transaction(self._alias)
+ logger.debug("Transaction BEGIN (alias=%s)", self._alias)
self._outer_token = _active_tx.set(self._handle)
self._previous_tx = outer
_core._set_active_transaction(self._handle)
@@ -136,17 +140,20 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
return False
-def transaction() -> TransactionContext:
+def transaction(alias: Optional[str] = None) -> TransactionContext:
"""Return an async context manager for database transactions.
Usage::
- async with Ryx.transaction():
+ async with ryx.transaction():
await Post.objects.create(title="Atomic post")
await Tag.objects.create(name="python")
+ # Transaction on a specific database:
+ async with ryx.transaction(alias="user_db"):
+ await User.objects.create(name="Atomic user")
+
# With explicit handle for savepoints:
- async with Ryx.transaction() as tx:
+ async with ryx.transaction() as tx:
await Order.objects.create(total=99.99)
await tx.savepoint("before_items")
try:
@@ -157,7 +164,6 @@ def transaction() -> TransactionContext:
raise
Nesting::
async with Ryx.transaction(): # BEGIN
...
async with Ryx.transaction(): # SAVEPOINT _Ryx_sp_...
@@ -167,7 +173,7 @@ def transaction() -> TransactionContext:
Returns:
:class:`TransactionContext` — an async context manager.
"""
- return TransactionContext()
+ return TransactionContext(alias)
def get_active_transaction():
@@ -178,7 +184,7 @@ def get_active_transaction():
Example::
- tx = Ryx.get_active_transaction()
+ tx = ryx.get_active_transaction()
if tx:
# we're inside a transaction — the next ORM call auto-enlists
pass
diff --git a/src/executor.rs b/src/executor.rs
index 585f293..f37dbb1 100644
--- a/src/executor.rs
+++ b/src/executor.rs
@@ -87,25 +87,22 @@ pub async fn fetch_all(query: CompiledQuery) -> RyxResult<Vec<DecodedRow>> {
}
return Err(RyxError::Internal("Transaction is no longer active".into()));
}
-
- let pool = pool::get()?;
-
+
+ let pool = pool::get(query.db_alias.as_deref())?;
+
debug!(sql = %query.sql, "Executing SELECT");
-
- // Build the sqlx query and bind all values.
- // We use `sqlx::query()` (the dynamic version) because our SQL is
- // constructed at runtime — we can't use the compile-time `query!` macro.
+
let mut q = sqlx::query(&query.sql);
q = bind_values(q, &query.values);
-
- // Fetch all rows and decode each one into a DecodedRow.
- let rows = q.fetch_all(pool).await.map_err(RyxError::Database)?;
-
+
+ let rows = q.fetch_all(&*pool).await.map_err(RyxError::Database)?;
+
let decoded = rows.iter().map(decode_row).collect();
Ok(decoded)
}
-
+
/// Execute a SELECT COUNT(*) query and return the count.
///
/// # Errors
/// Same as [`fetch_all`].
@@ -118,7 +115,6 @@ pub async fn fetch_count(query: CompiledQuery) -> RyxResult<i64> {
if rows.is_empty() {
return Ok(0);
}
- // COUNT() returns a single column whose name may vary by backend.
if let Some(value) = rows[0].values().next() {
if let Some(i) = value.as_i64() {
return Ok(i);
@@ -133,26 +129,25 @@ pub async fn fetch_count(query: CompiledQuery) -> RyxResult<i64> {
}
return Err(RyxError::Internal("Transaction is no longer active".into()));
}
-
- let pool = pool::get()?;
-
+
+ let pool = pool::get(query.db_alias.as_deref())?;
+
debug!(sql = %query.sql, "Executing COUNT");
-
+
let mut q = sqlx::query(&query.sql);
q = bind_values(q, &query.values);
-
- let row = q.fetch_one(pool).await.map_err(RyxError::Database)?;
-
- // COUNT(*) always returns a single column. We try to get it as i64
- // first (Postgres/SQLite), then fall back to i32 (some MySQL versions).
+
+ let row = q.fetch_one(&*pool).await.map_err(RyxError::Database)?;
+
let count: i64 = row.try_get(0).unwrap_or_else(|_| {
let n: i32 = row.try_get(0).unwrap_or(0);
n as i64
});
-
+
Ok(count)
}
+
/// Execute a SELECT and return at most one row.
///
/// # Errors
@@ -178,16 +173,16 @@ pub async fn fetch_one(query: CompiledQuery) -> RyxResult<DecodedRow> {
Err(RyxError::Internal("Transaction is no longer active".into()))
}
} else {
- let pool = pool::get()?;
-
+ let pool = pool::get(query.db_alias.as_deref())?;
+
let mut q = sqlx::query(&query.sql);
q = bind_values(q, &query.values);
-
+
// Limit to 2 at the executor level (the QueryNode may already have
// LIMIT 1 set by `.first()`, but for `.get()` it doesn't).
// We check the count in Rust rather than adding SQL complexity.
- let rows = q.fetch_all(pool).await.map_err(RyxError::Database)?;
-
+ let rows = q.fetch_all(&*pool).await.map_err(RyxError::Database)?;
+
match rows.len() {
0 => Err(RyxError::DoesNotExist),
1 => Ok(decode_row(&rows[0])),
@@ -196,6 +191,7 @@ pub async fn fetch_one(query: CompiledQuery) -> RyxResult<DecodedRow> {
}
}
+
/// Execute an INSERT, UPDATE, or DELETE query.
///
/// For INSERT queries with `RETURNING` clause, this fetches the returned
@@ -228,37 +224,38 @@ pub async fn execute(query: CompiledQuery) -> RyxResult<MutationResult> {
}
return Err(RyxError::Internal("Transaction is no longer active".into()));
}
-
- let pool = pool::get()?;
-
+
+ let pool = pool::get(query.db_alias.as_deref())?;
+
debug!(sql = %query.sql, "Executing mutation");
-
+
// Check if this is a RETURNING query (e.g. INSERT ... RETURNING id)
if query.sql.to_uppercase().contains("RETURNING") {
let mut q = sqlx::query(&query.sql);
q = bind_values(q, &query.values);
-
- let rows = q.fetch_all(pool).await.map_err(RyxError::Database)?;
-
+
+ let rows = q.fetch_all(&*pool).await.map_err(RyxError::Database)?;
+
+ let last_insert_id = rows.first().and_then(|row| row.try_get::<i64, _>(0).ok());
-
+
return Ok(MutationResult {
rows_affected: rows.len() as u64,
last_insert_id,
});
}
-
+
let mut q = sqlx::query(&query.sql);
q = bind_values(q, &query.values);
-
- let result = q.execute(pool).await.map_err(RyxError::Database)?;
-
+
+ let result = q.execute(&*pool).await.map_err(RyxError::Database)?;
+
Ok(MutationResult {
rows_affected: result.rows_affected(),
last_insert_id: None,
})
}
+
// ###
// Internal helpers
// ###
diff --git a/src/lib.rs b/src/lib.rs
index 0a7e04c..4bbba98 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -28,7 +28,7 @@ use crate::transaction::TransactionHandle;
#[pyfunction]
#[pyo3(signature = (
- url,
+ urls,
max_connections = 10,
min_connections = 1,
connect_timeout = 30,
@@ -37,13 +37,22 @@ use crate::transaction::TransactionHandle;
))]
fn setup<'py>(
py: Python<'py>,
- url: String,
+ urls: Bound<'_, PyAny>,
max_connections: u32,
min_connections: u32,
connect_timeout: u64,
idle_timeout: u64,
max_lifetime: u64,
) -> PyResult<Bound<'py, PyAny>> {
+ let urls_py = urls.cast::<PyDict>()?;
+ let mut database_urls = HashMap::new();
+
+ for (key, value) in urls_py.iter() {
+ let alias = key.cast::<PyString>()?.to_str()?.to_string();
+ let url = value.cast::<PyString>()?.to_str()?.to_string();
+ database_urls.insert(alias, url);
+ }
+
let config = PoolConfig {
max_connections,
min_connections,
@@ -52,7 +61,7 @@ fn setup<'py>(
max_lifetime_secs: max_lifetime,
};
pyo3_async_runtimes::tokio::future_into_py(py, async move {
- pool::initialize(&url, config).await.map_err(PyErr::from)?;
+ pool::initialize(database_urls, config).await.map_err(PyErr::from)?;
Python::attach(|py| Ok(py.None().into_pyobject(py)?.unbind()))
})
}
@@ -79,25 +88,32 @@ fn list_transforms() -> Vec<&'static str> {
#[pyfunction]
-fn is_connected(py: Python<'_>) -> bool {
- pool::is_initialized()
+fn is_connected(_py: Python<'_>, alias: Option<String>) -> bool {
+ // For now we just check if the registry is initialized
+ pool::is_initialized(alias)
}
#[pyfunction]
-fn pool_stats(py: Python<'_>) -> PyResult<Py<PyDict>> {
- let stats = pool::stats().map_err(PyErr::from)?;
+fn pool_stats<'py>(py: Python<'py>, alias: Option<String>) -> PyResult<Bound<'py, PyAny>> {
+ let stats = pool::stats(alias.as_deref()).map_err(PyErr::from)?;
let dict = PyDict::new(py);
dict.set_item("size", stats.size)?;
dict.set_item("idle", stats.idle)?;
- Ok(dict.into())
+ Ok(dict.into_any())
}
#[pyfunction]
-fn raw_fetch<'py>(py: Python<'py>, sql: String) -> PyResult<Bound<'py, PyAny>> {
+#[pyo3(signature = (sql, alias=None))]
+fn raw_fetch<'py>(
+ py: Python<'py>,
+ sql: String,
+ alias: Option<String>,
+) -> PyResult<Bound<'py, PyAny>> {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let compiled = compiler::CompiledQuery {
sql,
values: vec![],
+ db_alias: alias,
};
let rows = executor::fetch_all(compiled).await.map_err(PyErr::from)?;
Python::attach(|py| {
@@ -106,19 +122,27 @@ fn raw_fetch<'py>(py: Python<'py>, sql: String) -> PyResult<Bound<'py, PyAny>> {
})
})
}
-
+
#[pyfunction]
-fn raw_execute<'py>(py: Python<'py>, sql: String) -> PyResult<Bound<'py, PyAny>> {
+#[pyo3(signature = (sql, alias=None))]
+fn raw_execute<'py>(
+ py: Python<'py>,
+ sql: String,
+ alias: Option<String>,
+) -> PyResult<Bound<'py, PyAny>> {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let compiled = compiler::CompiledQuery {
sql,
values: vec![],
+ db_alias: alias,
};
executor::execute(compiled).await.map_err(PyErr::from)?;
Python::attach(|py| Ok(py.None().into_pyobject(py)?.unbind()))
})
}
+
+
// ###
// QueryBuilder
// ###
@@ -134,12 +158,18 @@ impl PyQueryBuilder {
#[new]
fn new(table: String) -> PyResult<Self> {
// Get the backend from the pool at QueryBuilder creation time
- let backend = pool::get_backend().unwrap_or(ryx_query::Backend::PostgreSQL);
+ let backend = pool::get_backend(None).unwrap_or(ryx_query::Backend::PostgreSQL);
Ok(Self {
node: QueryNode::select(table).with_backend(backend),
})
}
+
+ fn set_using(&self, alias: String) -> PyResult<PyQueryBuilder> {
+ Ok(PyQueryBuilder {
+ node: self.node.clone().with_db_alias(alias),
+ })
+ }
fn add_filter(
&self,
@@ -552,6 +582,15 @@ pub struct PyTransactionHandle {
#[pymethods]
impl PyTransactionHandle {
+ fn get_alias(&self) -> PyResult