diff --git a/contrib/bubseek-marimo/README.md b/contrib/bubseek-marimo/README.md index 7991a8b..4e2ee67 100644 --- a/contrib/bubseek-marimo/README.md +++ b/contrib/bubseek-marimo/README.md @@ -29,7 +29,7 @@ Open `http://localhost:2718/` — marimo gallery. Click **dashboard** for chat + - Canonical runtime location: `/insights/*.py` - `dashboard.py` and `index.py` are generated starter notebooks, not hand-maintained repository assets -- `workspace` resolution order: `BUB_MARIMO_WORKSPACE` -> `BUB_WORKSPACE_PATH` -> current working directory +- `workspace` resolution order: `BUB_MARIMO_WORKSPACE` -> current working directory - Packaged plugin installs still write generated notebooks into the active workspace, never into the installed package directory - Format: single `.py` with `@app.cell`, PEP 723 - Gallery notebooks must contain the scanner markers `import marimo` and `marimo.App` @@ -52,4 +52,3 @@ The E2E suite runs with `BUB_RUNTIME_ENABLED=0`, so it does not need a remote mo | `BUB_MARIMO_HOST` | Bind host (default: `127.0.0.1`) | | `BUB_MARIMO_PORT` | Bind port (default: `2718`) | | `BUB_MARIMO_WORKSPACE` | Override the Bub workspace root used for runtime notebook output | -| `BUB_WORKSPACE_PATH` | Default Bub workspace root when `BUB_MARIMO_WORKSPACE` is unset | diff --git a/contrib/bubseek-marimo/src/bubseek_marimo/channel.py b/contrib/bubseek-marimo/src/bubseek_marimo/channel.py index d6a997e..3466300 100644 --- a/contrib/bubseek-marimo/src/bubseek_marimo/channel.py +++ b/contrib/bubseek-marimo/src/bubseek_marimo/channel.py @@ -17,6 +17,7 @@ from loguru import logger from pydantic_settings import BaseSettings, SettingsConfigDict +from bubseek.oceanbase import resolve_tapestore_url from bubseek_marimo.chat_store import MarimoChatStore, TurnConflictError from bubseek_marimo.notebooks import ensure_seed_notebooks @@ -24,71 +25,6 @@ from aiohttp import web as web_types -def _discover_project_root_fallback(start: Path) -> Path | None: - """Walk up from start for a directory containing .env (used when bubseek not installed).""" - for d in [start, *start.parents]: - if (d / ".env").is_file(): - return d - return None - - -def discover_project_root(start: Path | str | None = None) -> Path | None: - """Use bubseek.config when available, otherwise fall back to local discovery.""" - if start is None: - start = Path.cwd() - start = Path(start) - try: - from bubseek.config import discover_project_root as _discover_project_root - except ImportError: - return _discover_project_root_fallback(start) - else: - return _discover_project_root(start) - - -def env_with_workspace_dotenv(workspace: Path | str) -> dict[str, str]: - """Load environment variables from workspace/.env with a local fallback.""" - workspace = Path(workspace) - try: - from bubseek.config import env_with_workspace_dotenv as _env_with_workspace_dotenv - except ImportError: - from dotenv import dotenv_values - - env = dict(os.environ) - env_file = workspace / ".env" - if env_file.is_file(): - for key, value in dotenv_values(env_file).items(): - if isinstance(key, str) and isinstance(value, str): - env[key] = value - return env - else: - return _env_with_workspace_dotenv(workspace) - - -def resolve_tapestore_url(workspace: Path | str | None = None, discover_from: Path | str | None = None) -> str: - """Resolve the tape store URL using bubseek helpers when they are installed.""" - if workspace is not None: - workspace = Path(workspace) - try: - from bubseek.config import resolve_tapestore_url as _resolve_tapestore_url - except ImportError: - if workspace is not None: - url = (env_with_workspace_dotenv(workspace).get("BUB_TAPESTORE_SQLALCHEMY_URL") or "").strip() - else: - start = Path(discover_from).resolve() if discover_from else Path.cwd().resolve() - for d in [start, *start.parents]: - if (d / ".env").is_file(): - url = (env_with_workspace_dotenv(d).get("BUB_TAPESTORE_SQLALCHEMY_URL") or "").strip() - break - else: - url = (os.environ.get("BUB_TAPESTORE_SQLALCHEMY_URL") or "").strip() - else: - url = _resolve_tapestore_url(workspace, discover_from) - - if url: - return url - raise RuntimeError("BUB_TAPESTORE_SQLALCHEMY_URL is required for the marimo channel") - - def _load_web() -> Any: """Load aiohttp.web or raise a clear runtime error.""" try: @@ -148,39 +84,30 @@ def __init__(self, on_receive: MessageHandler) -> None: def _workspace_dir(self) -> Path: if self._config.workspace: return Path(self._config.workspace).resolve() - - workspace = os.environ.get("BUB_WORKSPACE_PATH") - if workspace: - return Path(workspace).resolve() - - for start in (Path.cwd(), Path(__file__).resolve().parent): - root = discover_project_root(start) - if root is not None: - return root return Path.cwd().resolve() def _insights_dir(self) -> Path: return self._workspace_dir() / "insights" def _tapestore_url(self) -> str: - return resolve_tapestore_url(self._workspace_dir()) + if url := resolve_tapestore_url(): + return url + raise RuntimeError("BUB_TAPESTORE_SQLALCHEMY_URL is required for the marimo channel") def _ensure_seed_notebooks(self) -> None: insights_dir = self._insights_dir() workspace = self._workspace_dir() logger.info( - "Marimo workspace={} .env={} exists={}", + "Marimo workspace={}", workspace, - workspace / ".env", - (workspace / ".env").is_file(), ) ensure_seed_notebooks(insights_dir) logger.info("Ensured starter notebooks under {}", insights_dir) def _marimo_env(self) -> dict[str, str]: workspace = self._workspace_dir() - env = env_with_workspace_dotenv(workspace) - env["BUB_WORKSPACE_PATH"] = str(workspace) + env = dict(os.environ) + env["BUB_MARIMO_WORKSPACE"] = str(workspace) env["BUB_MARIMO_PORT"] = str(self._config.port) return env diff --git a/contrib/bubseek-marimo/src/skills/marimo/references/marimo-conventions.md b/contrib/bubseek-marimo/src/skills/marimo/references/marimo-conventions.md index c87b689..c6c8276 100644 --- a/contrib/bubseek-marimo/src/skills/marimo/references/marimo-conventions.md +++ b/contrib/bubseek-marimo/src/skills/marimo/references/marimo-conventions.md @@ -66,7 +66,7 @@ def _(items, mo): ## 7. Embedded data - For **self-contained** notebooks (e.g. Iris, demos), embed the full dataset **inside the first cell** (or in the same cell that uses it). Define any helper (e.g. `_row(...)`) in the same cell so it runs in the same isolated namespace; do not define it at module level and expect it to be visible in the cell. -- When the notebook depends on **environment or config** (e.g. DB URL), read via **bubseek settings** where possible (e.g. `BubSeekSettings().db.resolved_tapestore_url`) so it matches the rest of the app; document required env (e.g. `BUB_TAPESTORE_SQLALCHEMY_URL`) and add dependencies (e.g. `pyobvector`) in the script block when using OceanBase/seekdb. +- When the notebook depends on **environment or config** (e.g. DB URL), read via **bubseek runtime helpers** where possible (e.g. `resolve_tapestore_url()` from `bubseek.oceanbase`) so it matches the rest of the app; document required env (e.g. `BUB_TAPESTORE_SQLALCHEMY_URL`) and add dependencies (e.g. `pyobvector`) in the script block when using OceanBase/seekdb. --- diff --git a/contrib/bubseek-marimo/tests/test_marimo_e2e.py b/contrib/bubseek-marimo/tests/test_marimo_e2e.py index e3da55d..7b58e7e 100644 --- a/contrib/bubseek-marimo/tests/test_marimo_e2e.py +++ b/contrib/bubseek-marimo/tests/test_marimo_e2e.py @@ -40,8 +40,14 @@ async def _noop_handler(*_args, **_kwargs) -> None: return None +class _OceanBaseStubModule(ModuleType): + def resolve_tapestore_url(self, url: str | None = None) -> str: + return (url or os.environ.get("BUB_TAPESTORE_SQLALCHEMY_URL") or "").strip() + + def _stub_bubseek_oceanbase(monkeypatch: pytest.MonkeyPatch) -> None: - monkeypatch.setitem(sys.modules, "bubseek.oceanbase", ModuleType("bubseek.oceanbase")) + module = _OceanBaseStubModule("bubseek.oceanbase") + monkeypatch.setitem(sys.modules, "bubseek.oceanbase", module) def _require_tapestore_url() -> str: @@ -102,15 +108,13 @@ def _assert_notebook_loads(filename: str) -> tuple[int, str]: return status, body -def test_workspace_resolution_priority(monkeypatch, tmp_path) -> None: +def test_workspace_resolution_uses_explicit_marimo_workspace(monkeypatch, tmp_path) -> None: _stub_bubseek_oceanbase(monkeypatch) from bubseek_marimo.channel import MarimoChannel marimo_workspace = tmp_path / "marimo-workspace" - bubb_workspace = tmp_path / "bub-workspace" monkeypatch.setenv("BUB_TAPESTORE_SQLALCHEMY_URL", "mysql+oceanbase://seek:secret@seekdb.example:2881/analytics") monkeypatch.setenv("BUB_MARIMO_WORKSPACE", str(marimo_workspace)) - monkeypatch.setenv("BUB_WORKSPACE_PATH", str(bubb_workspace)) channel = MarimoChannel(_noop_handler) @@ -124,11 +128,7 @@ def test_workspace_resolution_falls_back_to_cwd(monkeypatch, tmp_path) -> None: monkeypatch.setenv("BUB_TAPESTORE_SQLALCHEMY_URL", "mysql+oceanbase://seek:secret@seekdb.example:2881/analytics") monkeypatch.delenv("BUB_MARIMO_WORKSPACE", raising=False) - monkeypatch.delenv("BUB_WORKSPACE_PATH", raising=False) monkeypatch.chdir(tmp_path) - # When no env and cwd has no .env, discover finds repo from channel __file__; force fallback to cwd - monkeypatch.setattr("bubseek_marimo.channel.discover_project_root", lambda start: None) - monkeypatch.setattr("bubseek_marimo.channel._discover_project_root_fallback", lambda start: None) channel = MarimoChannel(_noop_handler) @@ -142,10 +142,7 @@ def test_marimo_channel_requires_explicit_tapestore_url(monkeypatch, tmp_path) - monkeypatch.delenv("BUB_TAPESTORE_SQLALCHEMY_URL", raising=False) monkeypatch.delenv("BUB_MARIMO_WORKSPACE", raising=False) - monkeypatch.delenv("BUB_WORKSPACE_PATH", raising=False) monkeypatch.chdir(tmp_path) - monkeypatch.setattr("bubseek_marimo.channel.discover_project_root", lambda start: None) - monkeypatch.setattr("bubseek_marimo.channel._discover_project_root_fallback", lambda start: None) with pytest.raises(RuntimeError, match="BUB_TAPESTORE_SQLALCHEMY_URL is required"): MarimoChannel(_noop_handler) @@ -165,7 +162,7 @@ def gateway_process(): MARIMO_PORT = _pick_free_port() env["BUB_MARIMO_PORT"] = str(PORT) env["BUB_MARIMO_MARIMO_PORT"] = str(MARIMO_PORT) - env["BUB_WORKSPACE_PATH"] = str(workspace) + env["BUB_MARIMO_WORKSPACE"] = str(workspace) env["BUB_RUNTIME_ENABLED"] = "0" if shutil.which("marimo") is None: pytest.skip("marimo executable is not available in the current environment") diff --git a/contrib/bubseek-schedule/README.md b/contrib/bubseek-schedule/README.md index f81f2f2..6d57a21 100644 --- a/contrib/bubseek-schedule/README.md +++ b/contrib/bubseek-schedule/README.md @@ -47,7 +47,7 @@ dependencies = [ ## Debug: job in chat but not in Marimo kanban / DB -The gateway resolves the job store URL from `BUB_TAPESTORE_SQLALCHEMY_URL` in the workspace `.env` or process environment. Marimo must use the **same** URL. +The gateway resolves the job store URL from `BUB_TAPESTORE_SQLALCHEMY_URL` in the process environment. Marimo must use the **same** URL. From the bubseek repo root: diff --git a/contrib/bubseek-schedule/src/bubseek_schedule/jobstore.py b/contrib/bubseek-schedule/src/bubseek_schedule/jobstore.py index 5bfff01..1ab9bdb 100644 --- a/contrib/bubseek-schedule/src/bubseek_schedule/jobstore.py +++ b/contrib/bubseek-schedule/src/bubseek_schedule/jobstore.py @@ -18,17 +18,17 @@ def _get_jobstore_url() -> str: - """Resolve tapestore URL (workspace .env, BUB_WORKSPACE_PATH, cwd) like the rest of bubseek.""" - from bubseek.config import resolve_tapestore_url + """Resolve tapestore URL from the shared runtime environment.""" + from bubseek.oceanbase import resolve_tapestore_url return resolve_tapestore_url() def _normalize_url(url: str) -> str: """Use mysql+oceanbase for pyobvector dialect when mysql is configured.""" - if "mysql" in url.lower() and "oceanbase" not in url.lower(): - return url.replace("mysql+pymysql", "mysql+oceanbase", 1).replace("mysql://", "mysql+oceanbase://", 1) - return url + from bubseek.oceanbase import normalize_oceanbase_url + + return normalize_oceanbase_url(url) class OceanBaseJobStore(BaseJobStore): diff --git a/docs/configuration.md b/docs/configuration.md index 7aa94c3..91c69ac 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -89,7 +89,6 @@ Visit http://127.0.0.1:2718 after enabling. | `BUB_MAX_STEPS` | Max steps per conversation | | `BUB_MAX_TOKENS` | Max tokens per response | | `BUB_SEARCH_OLLAMA_API_KEY` | For web search tool | -| `BUB_WORKSPACE_PATH` | Workspace directory | ## Add contrib packages diff --git a/insights/schedule_kanban.py b/insights/schedule_kanban.py index 8ba8104..9a5d346 100644 --- a/insights/schedule_kanban.py +++ b/insights/schedule_kanban.py @@ -21,65 +21,26 @@ @app.cell -def _(): # noqa: C901 +def _(): import contextlib - import os import pickle from datetime import UTC, datetime - from pathlib import Path import marimo as mo from sqlalchemy import MetaData, Table, case, create_engine, inspect, select, text _default_seekdb = "mysql+oceanbase://root:@127.0.0.1:2881/bub" - def _resolve_tapestore_like_gateway() -> str: # noqa: C901 - """Same DB as bubseek: env → workspace → repo .env (via __file__) → notebook_dir.""" + def _resolve_tapestore_like_gateway() -> str: + """Use the same tapestore URL as the gateway process.""" try: - from bubseek.config import discover_project_root, resolve_tapestore_url + from bubseek.oceanbase import resolve_tapestore_url except Exception: return _default_seekdb - direct = (os.environ.get("BUB_TAPESTORE_SQLALCHEMY_URL") or "").strip() - if direct: - return direct - - ws = (os.environ.get("BUB_WORKSPACE_PATH") or "").strip() - if ws: - with contextlib.suppress(Exception): - return resolve_tapestore_url(workspace=Path(ws).resolve()) - - file_parent = None - with contextlib.suppress(NameError): - if __file__ and str(__file__).strip(): - file_parent = Path(__file__).resolve().parent - if file_parent is not None: - root = discover_project_root(file_parent) - if root is not None: - with contextlib.suppress(Exception): - return resolve_tapestore_url(workspace=root) - - discover = None with contextlib.suppress(Exception): - nd = getattr(mo, "notebook_dir", None) - if callable(nd) and nd() is not None: - discover = Path(nd()).resolve() - if discover is not None: - with contextlib.suppress(Exception): - u = resolve_tapestore_url(workspace=None, discover_from=discover) - if u: - return u - - if file_parent is not None: - with contextlib.suppress(Exception): - u = resolve_tapestore_url(workspace=None, discover_from=file_parent) - if u: - return u - - with contextlib.suppress(Exception): - u = resolve_tapestore_url() - if u: - return u + if url := resolve_tapestore_url(): + return url return _default_seekdb try: diff --git a/insights/tape_monitor.py b/insights/tape_monitor.py index 4220f45..49cfa19 100644 --- a/insights/tape_monitor.py +++ b/insights/tape_monitor.py @@ -4,7 +4,7 @@ # /// """Tape Monitor — compact seekdb tapestore dashboard (tabs: Summary / Runs / Tokens / More). -Tapestore URL: `bubseek.config.resolve_tapestore_url` · Marimo: `http://localhost:2718/?file=tape_monitor.py` +Tapestore URL: `bubseek.oceanbase.resolve_tapestore_url` · Marimo: `http://localhost:2718/?file=tape_monitor.py` """ import marimo as mo @@ -19,7 +19,6 @@ def _(): import os import re from datetime import UTC, datetime - from pathlib import Path from urllib.parse import urlparse import marimo as mo @@ -30,18 +29,9 @@ def _(): tapestore_url = None try: try: - from bubseek.config import resolve_tapestore_url - - discover = None - with contextlib.suppress(Exception): - nd = getattr(mo, "notebook_dir", None) - if callable(nd) and nd() is not None: - discover = Path(nd()).resolve() - if discover is None: - with contextlib.suppress(NameError): - if __file__ and str(__file__).strip(): - discover = Path(__file__).resolve().parent - tapestore_url = resolve_tapestore_url(workspace=None, discover_from=discover) + from bubseek.oceanbase import resolve_tapestore_url + + tapestore_url = resolve_tapestore_url() except Exception: tapestore_url = (os.environ.get("BUB_TAPESTORE_SQLALCHEMY_URL") or "").strip() or _default_seekdb if not tapestore_url: diff --git a/scripts/create-bub-db.py b/scripts/create-bub-db.py index 723dd57..583e1c4 100644 --- a/scripts/create-bub-db.py +++ b/scripts/create-bub-db.py @@ -4,6 +4,6 @@ from __future__ import annotations if __name__ == "__main__": - from bubseek.database import ensure_database + from bubseek.oceanbase import ensure_database ensure_database() diff --git a/scripts/query_apscheduler_jobs.py b/scripts/query_apscheduler_jobs.py index 66a0465..32058fe 100644 --- a/scripts/query_apscheduler_jobs.py +++ b/scripts/query_apscheduler_jobs.py @@ -7,7 +7,6 @@ uv run python scripts/query_apscheduler_jobs.py uv run python scripts/query_apscheduler_jobs.py --job-id 6718144d - uv run python scripts/query_apscheduler_jobs.py --workspace /path/to/bubseek BUB_TAPESTORE_SQLALCHEMY_URL='mysql+oceanbase://...' uv run python scripts/query_apscheduler_jobs.py """ @@ -17,16 +16,10 @@ import contextlib import os import sys -from pathlib import Path def main() -> int: parser = argparse.ArgumentParser(description="List APScheduler persisted jobs (debug).") - parser.add_argument( - "--workspace", - type=Path, - help="bubseek project root containing .env (default: cwd, with parent walk for .env)", - ) parser.add_argument("--job-id", dest="job_id", help="only print rows whose id contains this substring") parser.add_argument("--url", help="override SQLAlchemy URL") args = parser.parse_args() @@ -38,15 +31,10 @@ def main() -> int: url = os.environ["BUB_TAPESTORE_SQLALCHEMY_URL"].strip() label = "env BUB_TAPESTORE_SQLALCHEMY_URL" else: - from bubseek.config import resolve_tapestore_url - - ws = args.workspace.resolve() if args.workspace else None - if ws is not None: - url = resolve_tapestore_url(workspace=ws) - label = f"resolve_tapestore_url(workspace={ws})" - else: - url = resolve_tapestore_url() - label = "resolve_tapestore_url() from cwd / .env walk" + from bubseek.oceanbase import resolve_tapestore_url + + url = resolve_tapestore_url() + label = "resolve_tapestore_url() from process env" if not url: print("Could not resolve tapestore URL.", file=sys.stderr) diff --git a/src/bubseek/config.py b/src/bubseek/config.py deleted file mode 100644 index 4a9c269..0000000 --- a/src/bubseek/config.py +++ /dev/null @@ -1,156 +0,0 @@ -"""Configuration for Bubseek bootstrap and tape store resolution.""" - -from __future__ import annotations - -import os -from pathlib import Path -from urllib.parse import urlparse - -from pydantic import Field -from pydantic_settings import BaseSettings, SettingsConfigDict - -_SETTINGS_CONFIG = SettingsConfigDict( - env_file=".env", - env_file_encoding="utf-8", - extra="ignore", -) - - -def normalize_oceanbase_url(url: str) -> str: - """Treat MySQL-style seekdb URLs as OceanBase URLs.""" - normalized = url.strip() - lowered = normalized.lower() - if not lowered.startswith("mysql"): - return normalized - if lowered.startswith("mysql+oceanbase://"): - return normalized - if lowered.startswith("mysql+pymysql://"): - return normalized.replace("mysql+pymysql://", "mysql+oceanbase://", 1) - if lowered.startswith("mysql://"): - return normalized.replace("mysql://", "mysql+oceanbase://", 1) - return normalized - - -def discover_project_root(start: Path | str | None = None) -> Path | None: - """Walk up from start (default cwd) until a directory containing .env is found. Use when package runs from .venv and cwd may not be project root.""" - if start is None: - start = Path.cwd() - start = Path(start).resolve() - for d in [start, *start.parents]: - if (d / ".env").is_file(): - return d - return None - - -def _workspace_env_file() -> Path | None: - """Return workspace/.env path when BUB_WORKSPACE_PATH is set (only that file, so kernel cwd does not change which .env is loaded).""" - workspace = os.environ.get("BUB_WORKSPACE_PATH") - if not workspace: - return None - path = Path(workspace).resolve() / ".env" - if not path.is_file(): - return None - return path - - -def _env_values(env_file: Path | None) -> dict[str, str]: - """Merge os.environ with an optional .env file using pydantic-settings compatible precedence.""" - from dotenv import dotenv_values - - env = dict(os.environ) - if env_file is None: - return env - for key, value in dotenv_values(env_file).items(): - if isinstance(key, str) and isinstance(value, str): - env[key] = value - return env - - -def env_with_workspace_dotenv(workspace: Path | str) -> dict[str, str]: - """Merge os.environ with workspace/.env for subprocess env (bub, marimo). Uses python-dotenv like pydantic-settings.""" - return _env_values(Path(workspace).resolve() / ".env") - - -class DatabaseSettings(BaseSettings): - """Database connection settings for tape store (OceanBase/seekdb only).""" - - model_config = _SETTINGS_CONFIG - - bub_home: Path = Field(default=Path.home() / ".bub", validation_alias="BUB_HOME") - tapestore_sqlalchemy_url: str = Field(default="", validation_alias="BUB_TAPESTORE_SQLALCHEMY_URL") - - @property - def resolved_tapestore_url(self) -> str: - """Return the explicit tape store URL.""" - return normalize_oceanbase_url(self.tapestore_sqlalchemy_url) - - @property - def backend_name(self) -> str: - """Return the normalized SQLAlchemy backend name for the resolved URL.""" - scheme = urlparse(self.resolved_tapestore_url).scheme.lower() - return scheme.split("+", 1)[0] - - def mysql_connection_params(self) -> tuple[str, int, str, str, str] | None: - """Return connection params when using a MySQL-compatible backend.""" - if self.backend_name != "mysql": - return None - try: - parsed = urlparse(self.resolved_tapestore_url) - host = parsed.hostname or "" - port = parsed.port or 3306 - user = parsed.username or "" - password = parsed.password or "" - database = parsed.path.strip("/") - except Exception: - return None - if not host or not database: - return None - return host, port, user, password, database - - -class BubSeekSettings(BaseSettings): - """Main bubseek configuration.""" - - model_config = _SETTINGS_CONFIG - - db: DatabaseSettings = Field(default_factory=DatabaseSettings) - - @classmethod - def from_workspace(cls, workspace: Path | str | None = None) -> BubSeekSettings: - """Load settings from workspace .env and apply the same values to nested settings.""" - env_file: Path | None = None - if workspace is not None: - path = Path(workspace).resolve() / ".env" - if path.is_file(): - env_file = path - else: - env_file = _workspace_env_file() - - env = _env_values(env_file) - return cls.model_validate({ - **env, - "db": DatabaseSettings.model_validate(env), - }) - - -def resolve_tapestore_url( - workspace: Path | str | None = None, - discover_from: Path | str | None = None, -) -> str: - """Single source of truth for tapestore URL. - - - If workspace is given: use workspace/.env (BubSeekSettings). - - Else if BUB_WORKSPACE_PATH is set: use that workspace. - - Else walk discover_from (or cwd) and parents for first .env, use that directory as workspace. - - Else use process environment only. - """ - if workspace is not None: - return BubSeekSettings.from_workspace(workspace).db.resolved_tapestore_url - env_workspace = os.environ.get("BUB_WORKSPACE_PATH") - if env_workspace: - return BubSeekSettings.from_workspace(env_workspace).db.resolved_tapestore_url - start = Path(discover_from).resolve() if discover_from else Path.cwd().resolve() - for d in [start, *start.parents]: - if (d / ".env").is_file(): - return BubSeekSettings.from_workspace(d).db.resolved_tapestore_url - return BubSeekSettings.from_workspace(None).db.resolved_tapestore_url diff --git a/src/bubseek/database.py b/src/bubseek/database.py deleted file mode 100644 index db00016..0000000 --- a/src/bubseek/database.py +++ /dev/null @@ -1,93 +0,0 @@ -"""Database bootstrap helpers for seekdb/OceanBase-backed runtimes.""" - -from __future__ import annotations - -import sys -from pathlib import Path - -import typer - -from bubseek.config import BubSeekSettings - -CREATE_DB_HINT = """ -Please create the database manually, for example: - mysql -h{host} -P{port} -u{user} -p -e "CREATE DATABASE `{database}` DEFAULT CHARACTER SET utf8mb4" - -Or run: uv run python scripts/create-bub-db.py -""" - - -def database_exists(host: str, port: int, user: str, password: str, database: str) -> bool: - """Return whether the configured OceanBase or seekdb database already exists.""" - import pymysql - - try: - conn = pymysql.connect( - host=host, - port=port, - user=user, - password=password, - database=database, - charset="utf8mb4", - ) - conn.close() - except pymysql.err.OperationalError as exc: - if exc.args[0] == 1049: - return False - raise - else: - return True - - -def create_database(host: str, port: int, user: str, password: str, database: str) -> bool: - """Create the configured OceanBase or seekdb database when credentials permit.""" - import pymysql - - try: - conn = pymysql.connect( - host=host, - port=port, - user=user, - password=password, - charset="utf8mb4", - ) - with conn.cursor() as cursor: - cursor.execute(f"CREATE DATABASE IF NOT EXISTS `{database}` DEFAULT CHARACTER SET utf8mb4") - conn.close() - except Exception: - return False - else: - return True - - -def ensure_database(workspace: Path | None = None) -> None: - """Pre-flight database creation for MySQL-compatible backends only.""" - settings = BubSeekSettings.from_workspace((workspace or Path.cwd()).resolve()) - params = settings.db.mysql_connection_params() - if params is None: - return - - host, port, user, password, database = params - try: - if database_exists(host, port, user, password, database): - return - except Exception as exc: - typer.echo(f"Cannot connect to {host}:{port}: {exc}", err=True) - typer.echo("Ensure OceanBase/seekdb is running.", err=True) - raise typer.Exit(1) from exc - - hint = CREATE_DB_HINT.format(host=host, port=port, user=user, database=database).strip() - if sys.stdin.isatty() and not typer.confirm( - f"Database {database!r} does not exist. Create it?", - default=False, - ): - typer.echo(hint, err=True) - raise typer.Exit(1) - - if create_database(host, port, user, password, database): - typer.echo(f"Database {database!r} created at {host}:{port}", err=True) - return - - typer.echo(f"Cannot create database {database!r}.", err=True) - typer.echo(hint, err=True) - raise typer.Exit(1) diff --git a/src/bubseek/oceanbase.py b/src/bubseek/oceanbase.py index 33f2ceb..e3ba940 100644 --- a/src/bubseek/oceanbase.py +++ b/src/bubseek/oceanbase.py @@ -1,51 +1,177 @@ -"""Register pyobvector SQLAlchemy dialect for OceanBase/seekdb compatibility.""" +"""OceanBase/seekdb runtime helpers and SQLAlchemy dialect patches.""" from __future__ import annotations +import os +import sys from collections.abc import Callable from typing import Any, cast +from urllib.parse import urlparse import pymysql import pyobvector # noqa: F401 +import typer from bub import hookimpl from pyobvector.schema.dialect import OceanBaseDialect as _OceanBaseDialect from sqlalchemy.dialects import registry +DEFAULT_MYSQL_PORT = 3306 +MYSQL_DATABASE_NOT_FOUND_ERROR = 1049 +MYSQL_SAVEPOINT_NOT_FOUND_ERROR = 1305 +MYSQL_DUPLICATE_INDEX_ERROR = 1061 + +CREATE_DB_HINT = """ +Please create the database manually, for example: + mysql -h{host} -P{port} -u{user} -p -e "CREATE DATABASE `{database}` DEFAULT CHARACTER SET utf8mb4" + +Or run: uv run python scripts/create-bub-db.py +""" + + +def normalize_oceanbase_url(url: str) -> str: + """Treat MySQL-style seekdb URLs as OceanBase URLs.""" + normalized = url.strip() + lowered = normalized.lower() + if not lowered.startswith("mysql"): + return normalized + if lowered.startswith("mysql+oceanbase://"): + return normalized + if lowered.startswith("mysql+pymysql://"): + return normalized.replace("mysql+pymysql://", "mysql+oceanbase://", 1) + if lowered.startswith("mysql://"): + return normalized.replace("mysql://", "mysql+oceanbase://", 1) + return normalized + + +def resolve_tapestore_url(url: str | None = None) -> str: + """Resolve the tapestore URL from an explicit value or the process environment.""" + raw_url = os.environ.get("BUB_TAPESTORE_SQLALCHEMY_URL", "") if url is None else url + return normalize_oceanbase_url(raw_url) + + +def mysql_connection_params( + url: str | None = None, +) -> tuple[str, int, str, str, str] | None: + """Return connection params for MySQL-compatible URLs.""" + resolved_url = resolve_tapestore_url(url) + if not resolved_url: + return None + + parsed = urlparse(resolved_url) + backend_name = parsed.scheme.lower().split("+", 1)[0] + if backend_name != "mysql": + return None + + host = parsed.hostname or "" + database = parsed.path.strip("/") + if not host or not database: + return None + + return ( + host, + parsed.port or DEFAULT_MYSQL_PORT, + parsed.username or "", + parsed.password or "", + database, + ) + + +def database_exists(host: str, port: int, user: str, password: str, database: str) -> bool: + """Return whether the configured OceanBase or seekdb database already exists.""" + try: + connection = pymysql.connect( + host=host, + port=port, + user=user, + password=password, + database=database, + charset="utf8mb4", + ) + connection.close() + except pymysql.err.OperationalError as exc: + if exc.args[0] == MYSQL_DATABASE_NOT_FOUND_ERROR: + return False + raise + return True + + +def create_database(host: str, port: int, user: str, password: str, database: str) -> bool: + """Create the configured OceanBase or seekdb database when credentials permit.""" + try: + connection = pymysql.connect( + host=host, + port=port, + user=user, + password=password, + charset="utf8mb4", + ) + with connection.cursor() as cursor: + cursor.execute(f"CREATE DATABASE IF NOT EXISTS `{database}` DEFAULT CHARACTER SET utf8mb4") + connection.close() + except Exception: + return False + return True + + +def ensure_database(url: str | None = None) -> None: + """Create the configured database on demand for MySQL-compatible backends.""" + params = mysql_connection_params(url) + if params is None: + return + + host, port, user, password, database = params + try: + if database_exists(host, port, user, password, database): + return + except Exception as exc: + typer.echo(f"Cannot connect to {host}:{port}: {exc}", err=True) + typer.echo("Ensure OceanBase/seekdb is running.", err=True) + raise typer.Exit(1) from exc + + hint = CREATE_DB_HINT.format(host=host, port=port, user=user, database=database).strip() + if sys.stdin.isatty() and not typer.confirm( + f"Database {database!r} does not exist. Create it?", + default=False, + ): + typer.echo(hint, err=True) + raise typer.Exit(1) + + if create_database(host, port, user, password, database): + typer.echo(f"Database {database!r} created at {host}:{port}", err=True) + return + + typer.echo(f"Cannot create database {database!r}.", err=True) + typer.echo(hint, err=True) + raise typer.Exit(1) + def _is_savepoint_not_exist(exc: BaseException) -> bool: """Check if exception is MySQL 1305 (savepoint does not exist).""" - if isinstance(exc, pymysql.err.OperationalError) and exc.args and exc.args[0] == 1305: + if isinstance(exc, pymysql.err.OperationalError) and exc.args and exc.args[0] == MYSQL_SAVEPOINT_NOT_FOUND_ERROR: return True - orig = getattr(exc, "orig", None) - if orig is not None and orig is not exc: - return _is_savepoint_not_exist(orig) + original = getattr(exc, "orig", None) + if original is not None and original is not exc: + return _is_savepoint_not_exist(original) return False class OceanBaseDialect(_OceanBaseDialect): - """OceanBase dialect that tolerates missing savepoints. - - OceanBase/seekdb may implicitly release savepoints on errors (e.g. deadlock, - failed DML). When SQLAlchemy later tries RELEASE SAVEPOINT or ROLLBACK TO - SAVEPOINT, it gets (1305, 'savepoint does not exist'). We catch and ignore - that to avoid masking the original error. - """ + """OceanBase dialect that tolerates missing savepoints.""" - # SQLAlchemy only reads this on the concrete dialect class (__dict__), not via MRO. supports_statement_cache = True def do_release_savepoint(self, connection, name: str) -> None: try: super().do_release_savepoint(connection, name) - except Exception as e: - if not _is_savepoint_not_exist(e): + except Exception as exc: + if not _is_savepoint_not_exist(exc): raise def do_rollback_to_savepoint(self, connection, name: str) -> None: try: super().do_rollback_to_savepoint(connection, name) - except Exception as e: - if not _is_savepoint_not_exist(e): + except Exception as exc: + if not _is_savepoint_not_exist(exc): raise @@ -53,44 +179,40 @@ def do_rollback_to_savepoint(self, connection, name: str) -> None: def _patch_tape_store_validate_schema() -> None: - """Tolerate duplicate index (MySQL 1061) in bub_tapestore_sqlalchemy. - - seekdb/OceanBase introspection may not match SQLAlchemy's checkfirst, so - CREATE INDEX is attempted even when the index already exists on the table. - """ + """Tolerate duplicate index creation during tapestore schema validation.""" try: from bub_tapestore_sqlalchemy import store as _store except ImportError: return - _Store = _store.SQLAlchemyTapeStore - _orig = cast(Callable[[Any], None], _Store._validate_schema) + + store_cls = _store.SQLAlchemyTapeStore + original_validate = cast(Callable[[Any], None], store_cls._validate_schema) def _validate_schema_tolerant(self: Any) -> None: try: - _orig(self) - except Exception as e: - _orig_e = getattr(e, "orig", e) - if getattr(_orig_e, "args", (None,))[0] == 1061: + original_validate(self) + except Exception as exc: + original_exc = getattr(exc, "orig", exc) + if getattr(original_exc, "args", (None,))[0] == MYSQL_DUPLICATE_INDEX_ERROR: return - if "Duplicate key name" in str(e): + if "Duplicate key name" in str(exc): return raise - cast(Any, _Store)._validate_schema = _validate_schema_tolerant + cast(Any, store_cls)._validate_schema = _validate_schema_tolerant _patch_tape_store_validate_schema() def register(framework: object) -> object: - """Bub plugin entry point. Registers dialect only.""" + """Bub plugin entry point. Registers the OceanBase dialect plugin.""" return _OceanBaseDialectPlugin() class _OceanBaseDialectPlugin: - """Minimal plugin to satisfy Bub loader. Dialect already registered at import.""" + """Minimal plugin to satisfy the Bub loader.""" @hookimpl def provide_tape_store(self) -> None: - """Skip; let bub_tapestore_sqlalchemy provide the store.""" return None diff --git a/tests/test_bubseek.py b/tests/test_bubseek.py index 2e777d6..e76171e 100644 --- a/tests/test_bubseek.py +++ b/tests/test_bubseek.py @@ -6,8 +6,8 @@ from collections.abc import Iterator from contextlib import contextmanager from pathlib import Path -from types import ModuleType, SimpleNamespace -from typing import Any +from types import ModuleType +from typing import cast import pytest from bub.skills import _read_skill @@ -28,29 +28,33 @@ def imported_bubseek_modules(*module_names: str) -> Iterator[list[ModuleType]]: sys.modules.pop(module_name, None) -def _load_pyproject() -> dict[str, Any]: - return tomllib.loads((REPO_ROOT / "pyproject.toml").read_text(encoding="utf-8")) +def _load_pyproject() -> dict[str, object]: + return _as_dict(tomllib.loads((REPO_ROOT / "pyproject.toml").read_text(encoding="utf-8"))) -def _settings_with_db_params(params: tuple[str, int, str, str, str] | None) -> SimpleNamespace: - db = SimpleNamespace(mysql_connection_params=lambda: params) - return SimpleNamespace(db=db) +def _as_dict(value: object) -> dict[str, object]: + assert isinstance(value, dict) + return cast(dict[str, object], value) def test_distribution_metadata_exposes_bub_plugin_without_console_script() -> None: data = _load_pyproject() - project = data["project"] + project = _as_dict(data["project"]) assert "scripts" not in project - assert project["entry-points"]["bub"] == { - "oceanbase-dialect": "bubseek.oceanbase:register", + assert project["entry-points"] == { + "bub": { + "oceanbase-dialect": "bubseek.oceanbase:register", + }, } def test_pyproject_includes_package_and_builtin_skills_in_wheel() -> None: data = _load_pyproject() - build = data["tool"]["pdm"]["build"] + tool = _as_dict(data["tool"]) + pdm = _as_dict(tool["pdm"]) + build = _as_dict(pdm["build"]) assert build["includes"] == [ "src/bubseek", "src/skills", @@ -70,37 +74,41 @@ def test_bundled_skills_have_valid_frontmatter() -> None: assert "github-repo-cards" in skill_names -def test_resolve_tapestore_url_requires_explicit_url(monkeypatch, tmp_path: Path) -> None: - with imported_bubseek_modules("bubseek.config") as [config_mod]: - monkeypatch.setenv("BUB_HOME", str(tmp_path / "runtime-home")) - monkeypatch.delenv("BUB_WORKSPACE_PATH", raising=False) +def test_resolve_tapestore_url_requires_explicit_url(monkeypatch) -> None: + with imported_bubseek_modules("bubseek.oceanbase") as [oceanbase_mod]: monkeypatch.setenv("BUB_TAPESTORE_SQLALCHEMY_URL", "") - settings = config_mod.DatabaseSettings() - - assert settings.resolved_tapestore_url == "" - assert settings.backend_name == "" - assert settings.mysql_connection_params() is None + assert oceanbase_mod.resolve_tapestore_url() == "" + assert oceanbase_mod.mysql_connection_params() is None -def test_database_settings_extract_mysql_params(monkeypatch) -> None: - with imported_bubseek_modules("bubseek.config") as [config_mod]: +def test_mysql_connection_params_extract_mysql_values(monkeypatch) -> None: + with imported_bubseek_modules("bubseek.oceanbase") as [oceanbase_mod]: monkeypatch.setenv( "BUB_TAPESTORE_SQLALCHEMY_URL", "mysql+pymysql://seek:secret@seekdb.example:2881/analytics", ) - settings = config_mod.DatabaseSettings() + assert oceanbase_mod.resolve_tapestore_url() == "mysql+oceanbase://seek:secret@seekdb.example:2881/analytics" + assert oceanbase_mod.mysql_connection_params() == ( + "seekdb.example", + 2881, + "seek", + "secret", + "analytics", + ) - assert settings.resolved_tapestore_url == "mysql+oceanbase://seek:secret@seekdb.example:2881/analytics" - assert settings.backend_name == "mysql" - assert settings.mysql_connection_params() == ( - "seekdb.example", - 2881, - "seek", - "secret", - "analytics", - ) + +def test_resolve_tapestore_url_prefers_explicit_argument_over_env(monkeypatch) -> None: + with imported_bubseek_modules("bubseek.oceanbase") as [oceanbase_mod]: + monkeypatch.setenv( + "BUB_TAPESTORE_SQLALCHEMY_URL", + "mysql+oceanbase://env:secret@seekdb.example:2881/env_db", + ) + + url = oceanbase_mod.resolve_tapestore_url("mysql://cli:secret@seekdb.example:2881/cli_db") + + assert url == "mysql+oceanbase://cli:secret@seekdb.example:2881/cli_db" def test_oceanbase_registers_mysql_pymysql_alias() -> None: @@ -112,69 +120,9 @@ def test_oceanbase_registers_mysql_pymysql_alias() -> None: assert dialect_cls is oceanbase_mod.OceanBaseDialect -def test_resolve_tapestore_url_reads_workspace_env_file(monkeypatch, tmp_path: Path) -> None: - workspace = tmp_path / "workspace" - workspace.mkdir() - (workspace / ".env").write_text( - "BUB_TAPESTORE_SQLALCHEMY_URL=mysql+oceanbase://workspace:secret@seekdb.example:2881/workspace_db\n", - encoding="utf-8", - ) - - with imported_bubseek_modules("bubseek.config") as [config_mod]: - monkeypatch.delenv("BUB_TAPESTORE_SQLALCHEMY_URL", raising=False) - url = config_mod.resolve_tapestore_url(workspace=workspace) - - assert url == "mysql+oceanbase://workspace:secret@seekdb.example:2881/workspace_db" - - -def test_resolve_tapestore_url_prefers_bub_workspace_path(monkeypatch, tmp_path: Path) -> None: - workspace = tmp_path / "workspace" - workspace.mkdir() - (workspace / ".env").write_text( - "BUB_TAPESTORE_SQLALCHEMY_URL=mysql+oceanbase://workspace:secret@seekdb.example:2881/workspace_db\n", - encoding="utf-8", - ) - - other_root = tmp_path / "other" - nested = other_root / "nested" - nested.mkdir(parents=True) - (other_root / ".env").write_text( - "BUB_TAPESTORE_SQLALCHEMY_URL=mysql+oceanbase://discovered:secret@seekdb.example:2881/discovered_db\n", - encoding="utf-8", - ) - - with imported_bubseek_modules("bubseek.config") as [config_mod]: - monkeypatch.delenv("BUB_TAPESTORE_SQLALCHEMY_URL", raising=False) - monkeypatch.setenv("BUB_WORKSPACE_PATH", str(workspace)) - url = config_mod.resolve_tapestore_url(discover_from=nested) - - assert url == "mysql+oceanbase://workspace:secret@seekdb.example:2881/workspace_db" - - -def test_resolve_tapestore_url_discovers_parent_env(monkeypatch, tmp_path: Path) -> None: - root = tmp_path / "project" - nested = root / "src" / "pkg" - nested.mkdir(parents=True) - (root / ".env").write_text( - "BUB_TAPESTORE_SQLALCHEMY_URL=mysql+oceanbase://discovered:secret@seekdb.example:2881/discovered_db\n", - encoding="utf-8", - ) - - with imported_bubseek_modules("bubseek.config") as [config_mod]: - monkeypatch.delenv("BUB_TAPESTORE_SQLALCHEMY_URL", raising=False) - monkeypatch.delenv("BUB_WORKSPACE_PATH", raising=False) - url = config_mod.resolve_tapestore_url(discover_from=nested) - - assert url == "mysql+oceanbase://discovered:secret@seekdb.example:2881/discovered_db" - - def test_ensure_database_skips_non_mysql_backends(monkeypatch) -> None: - with imported_bubseek_modules("bubseek.database") as [database_mod]: - monkeypatch.setattr( - database_mod.BubSeekSettings, - "from_workspace", - lambda workspace=None: _settings_with_db_params(None), - ) + with imported_bubseek_modules("bubseek.oceanbase") as [oceanbase_mod]: + monkeypatch.setattr(oceanbase_mod, "mysql_connection_params", lambda *_: None) create_called = False exists_called = False @@ -188,47 +136,47 @@ def _database_exists(*args): exists_called = True return True - monkeypatch.setattr(database_mod, "create_database", _create_database) - monkeypatch.setattr(database_mod, "database_exists", _database_exists) + monkeypatch.setattr(oceanbase_mod, "create_database", _create_database) + monkeypatch.setattr(oceanbase_mod, "database_exists", _database_exists) - database_mod.ensure_database() + oceanbase_mod.ensure_database() assert not exists_called assert not create_called def test_ensure_database_returns_when_database_exists(monkeypatch) -> None: - with imported_bubseek_modules("bubseek.database") as [database_mod]: + with imported_bubseek_modules("bubseek.oceanbase") as [oceanbase_mod]: monkeypatch.setattr( - database_mod.BubSeekSettings, - "from_workspace", - lambda workspace=None: _settings_with_db_params(("seekdb.example", 2881, "seek", "secret", "analytics")), + oceanbase_mod, + "mysql_connection_params", + lambda *_: ("seekdb.example", 2881, "seek", "secret", "analytics"), ) create_called = False - monkeypatch.setattr(database_mod, "database_exists", lambda *args: True) + monkeypatch.setattr(oceanbase_mod, "database_exists", lambda *args: True) def _create_database(*args): nonlocal create_called create_called = True return True - monkeypatch.setattr(database_mod, "create_database", _create_database) + monkeypatch.setattr(oceanbase_mod, "create_database", _create_database) - database_mod.ensure_database() + oceanbase_mod.ensure_database() assert not create_called def test_ensure_database_creates_missing_database_without_prompt(monkeypatch) -> None: - with imported_bubseek_modules("bubseek.database") as [database_mod]: + with imported_bubseek_modules("bubseek.oceanbase") as [oceanbase_mod]: monkeypatch.setattr( - database_mod.BubSeekSettings, - "from_workspace", - lambda workspace=None: _settings_with_db_params(("seekdb.example", 2881, "seek", "secret", "analytics")), + oceanbase_mod, + "mysql_connection_params", + lambda *_: ("seekdb.example", 2881, "seek", "secret", "analytics"), ) - monkeypatch.setattr(database_mod, "database_exists", lambda *args: False) - monkeypatch.setattr(database_mod.sys.stdin, "isatty", lambda: False) + monkeypatch.setattr(oceanbase_mod, "database_exists", lambda *args: False) + monkeypatch.setattr(oceanbase_mod.sys.stdin, "isatty", lambda: False) created = False @@ -237,25 +185,25 @@ def _create_database(*args): created = True return True - monkeypatch.setattr(database_mod, "create_database", _create_database) + monkeypatch.setattr(oceanbase_mod, "create_database", _create_database) - database_mod.ensure_database() + oceanbase_mod.ensure_database() assert created def test_ensure_database_respects_tty_decline(monkeypatch) -> None: - with imported_bubseek_modules("bubseek.database") as [database_mod]: + with imported_bubseek_modules("bubseek.oceanbase") as [oceanbase_mod]: monkeypatch.setattr( - database_mod.BubSeekSettings, - "from_workspace", - lambda workspace=None: _settings_with_db_params(("seekdb.example", 2881, "seek", "secret", "analytics")), + oceanbase_mod, + "mysql_connection_params", + lambda *_: ("seekdb.example", 2881, "seek", "secret", "analytics"), ) - monkeypatch.setattr(database_mod, "database_exists", lambda *args: False) - monkeypatch.setattr(database_mod.sys.stdin, "isatty", lambda: True) - monkeypatch.setattr(database_mod.typer, "confirm", lambda *args, **kwargs: False) + monkeypatch.setattr(oceanbase_mod, "database_exists", lambda *args: False) + monkeypatch.setattr(oceanbase_mod.sys.stdin, "isatty", lambda: True) + monkeypatch.setattr(oceanbase_mod.typer, "confirm", lambda *args, **kwargs: False) - with pytest.raises(database_mod.typer.Exit) as exc_info: - database_mod.ensure_database() + with pytest.raises(oceanbase_mod.typer.Exit) as exc_info: + oceanbase_mod.ensure_database() assert exc_info.value.exit_code == 1