From 37d2e01aa94405ad233322c49861a549b64d9a7a Mon Sep 17 00:00:00 2001 From: "WFR DAQ Server (ovh)" Date: Sun, 12 Apr 2026 22:37:12 +0000 Subject: [PATCH 1/3] Refactor slicks to exclusively query TimescaleDB securely via SQLAlchemy --- pyproject.toml | 5 +- src/slicks/__init__.py | 5 +- src/slicks/config.py | 69 ++---- src/slicks/discovery.py | 168 ++++---------- src/slicks/fetcher.py | 356 +++++------------------------ src/slicks/query_utils.py | 177 ++------------- src/slicks/scanner.py | 463 ++++---------------------------------- src/slicks/writer.py | 130 ----------- 8 files changed, 183 insertions(+), 1190 deletions(-) delete mode 100644 src/slicks/writer.py diff --git a/pyproject.toml b/pyproject.toml index da75ea0..f8ff478 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,10 +18,9 @@ classifiers = [ requires-python = ">=3.11" dependencies = [ "pandas>=2.0.0", - "influxdb3-python>=0.1.0", - "influxdb-client>=1.30.0", + "psycopg2-binary>=2.9.0", + "SQLAlchemy>=2.0.0", "cantools>=39.0.0", - "httpx>=0.27.0", "python-dotenv>=1.0.0", "matplotlib>=3.0.0", "tqdm>=4.0.0", diff --git a/src/slicks/__init__.py b/src/slicks/__init__.py index 11643e5..f88c44a 100644 --- a/src/slicks/__init__.py +++ b/src/slicks/__init__.py @@ -1,10 +1,9 @@ -from .fetcher import fetch_telemetry, fetch_telemetry_chunked, bulk_fetch_season, list_target_sensors, get_influx_client +from .fetcher import fetch_telemetry, fetch_telemetry_chunked, bulk_fetch_season, list_target_sensors, get_db_engine from .discovery import discover_sensors from .movement_detector import detect_movement_ratio, get_movement_segments, filter_data_in_movement -from .config import connect_influxdb3 +from .config import connect_timescaledb, connect_influxdb3 from .scanner import scan_data_availability from .can_decode import DecodedFrame, decode_frame, load_dbc, resolve_dbc_path -from .writer import WideWriter, frame_to_line_protocol, NON_SIGNAL_COLS # New analysis modules from . import battery diff --git a/src/slicks/config.py b/src/slicks/config.py index 37069dd..d7aadcc 100644 --- a/src/slicks/config.py +++ b/src/slicks/config.py @@ -1,71 +1,48 @@ import os +import warnings from typing import Optional from dotenv import load_dotenv -# Load environment variables from .env file if it exists load_dotenv() -# InfluxDB Configuration with Environment Overrides -# This makes the package "Open Source Ready" - anyone can swap these out via .env -INFLUX_URL: str = os.getenv("INFLUX_URL", "http://localhost:8086") -INFLUX_TOKEN: str = os.getenv("INFLUX_TOKEN", "my-token") -INFLUX_ORG: str = os.getenv("INFLUX_ORG", "Docs") -INFLUX_DB: str = os.getenv("INFLUX_DB", "WFR25") +POSTGRES_DSN: str = os.getenv("POSTGRES_DSN", "postgresql://wfr:wfr_password@localhost:5432/wfr") +POSTGRES_TABLE: str = os.getenv("POSTGRES_TABLE", "wfr26") -# Schema and Table Configuration (defaulting to iox.INFLUX_DB pattern) -INFLUX_SCHEMA: str = os.getenv("INFLUX_SCHEMA", "iox") -# If env var is not set or empty, default to INFLUX_DB -INFLUX_TABLE: str = os.getenv("INFLUX_TABLE") or INFLUX_DB - -def connect_influxdb3( - url: Optional[str] = None, - token: Optional[str] = None, - org: Optional[str] = None, - db: Optional[str] = None, - schema: Optional[str] = None, +def connect_timescaledb( + dsn: Optional[str] = None, table: Optional[str] = None, ) -> None: """ - Update the global configuration settings for InfluxDB connection. + Update the global configuration settings for TimescaleDB connection. Call this before using any slicks functions to configure your database. Args: - url: InfluxDB host URL (e.g., "https://your-instance.influxdb.cloud") - token: Your InfluxDB API token - org: Organization name (optional for InfluxDB 3.x) - db: Database/bucket name (e.g., "WFR25") - schema: IOx schema name (default: "iox") - table: IOx table name (default: same as db) + dsn: PostgreSQL connection string (e.g., "postgresql://user:pass@host:5432/db") + table: The timescale hypertable to query (e.g., "wfr26") Example: >>> import slicks - >>> slicks.connect_influxdb3( - ... url="https://us-east-1-1.aws.cloud2.influxdata.com", - ... token="your-api-token", - ... db="WFR25", - ... table="my_custom_table" + >>> slicks.connect_timescaledb( + ... dsn="postgresql://wfr:wfr_password@127.0.0.1:5432/wfr", + ... table="wfr26test" ... ) """ - global INFLUX_URL, INFLUX_TOKEN, INFLUX_ORG, INFLUX_DB, INFLUX_SCHEMA, INFLUX_TABLE - if url: INFLUX_URL = url - if token: INFLUX_TOKEN = token - if org: INFLUX_ORG = org - - # If DB is updated, we might need to update the default table name if it wasn't overridden - if db: - INFLUX_DB = db - # Only update table if it wasn't explicitly set to something else in this call - # and if it currently matches the OLD db name or is unset - if not table and (not INFLUX_TABLE or INFLUX_TABLE == db): - INFLUX_TABLE = db - - if schema: INFLUX_SCHEMA = schema - if table: INFLUX_TABLE = table + global POSTGRES_DSN, POSTGRES_TABLE + if dsn: POSTGRES_DSN = dsn + if table: POSTGRES_TABLE = table +def connect_influxdb3(*args, **kwargs) -> None: + warnings.warn( + "connect_influxdb3 is deprecated in favor of connect_timescaledb. " + "Slicks now connects to TimescaleDB via psycopg2 natively.", + DeprecationWarning, + stacklevel=2, + ) + if kwargs.get("db") or kwargs.get("table"): + connect_timescaledb(table=kwargs.get("db") or kwargs.get("table")) # Default Sensor Registry -# In an open-source context, this serves as an "Example Configuration" SIGNALS = [ "PackCurrent", # Primary heat source "M1_Thermistor1", # Module 1 Temp diff --git a/src/slicks/discovery.py b/src/slicks/discovery.py index 622acaf..4270b96 100644 --- a/src/slicks/discovery.py +++ b/src/slicks/discovery.py @@ -1,149 +1,63 @@ """ -Sensor discovery module. +Sensor discovery module for PostgreSQL / TimescaleDB. -Scans the database for all unique sensor names within a time range. -For wide schema (default), uses an instant information_schema.columns metadata lookup. -For narrow schema (legacy EAV), uses adaptive chunking with parallel execution. +Scans the database schema for all unique sensor names within a wide table. """ from __future__ import annotations -import threading import warnings -from datetime import datetime, timedelta -from typing import List, Optional - -from tqdm.auto import tqdm +from datetime import datetime +from typing import List +import pandas as pd from . import config -from .fetcher import get_influx_client -from .query_utils import adaptive_query, run_chunks_parallel, PermanentQueryError, quote_table -from .writer import NON_SIGNAL_COLS +from .fetcher import get_db_engine +NON_SIGNAL_COLS = {'time', 'message_name', 'can_id'} def discover_sensors( - start_time: datetime, - end_time: datetime, - chunk_size_days: int = 7, - client=None, + start_time: datetime = None, + end_time: datetime = None, + chunk_size_days: int = 7, # Kept for bw-compat API signatures + client=None, # Kept for bw-compat show_progress: bool = True, schema: str = "wide", ) -> List[str]: """ - Scan the database for ALL unique sensor names within the time range. - - For ``schema="wide"`` (default), uses an instant ``information_schema.columns`` - metadata lookup (no data scan, no adaptive bisection, ignores time range and - chunk params). - - For ``schema="narrow"`` (legacy EAV, deprecated), uses adaptive chunking with - parallel execution to handle server resource limits efficiently. - - Args: - start_time: Start of scan range (narrow schema only). - end_time: End of scan range (narrow schema only). - chunk_size_days: Days per chunk (default 7, narrow schema only). - client: Ignored (kept for backward compatibility). - show_progress: Show progress bar (default True, narrow schema only). - schema: "wide" (default, one column per signal) or "narrow" (legacy EAV, deprecated). - - Returns: - Sorted list of unique sensor name strings. + Scan the database for ALL unique sensor names via information_schema metadata. + + In TimescaleDB, the wide format is permanent and we can securely retrieve + the list of sensors without touching billions of data rows natively! """ - db_schema = config.INFLUX_SCHEMA or "iox" - table = config.INFLUX_TABLE or config.INFLUX_DB - - if schema == "wide": - # Instant metadata lookup — no data scan needed - cli = get_influx_client() - sql = ( - f"SELECT column_name FROM information_schema.columns " - f"WHERE table_schema = '{db_schema}' AND table_name = '{table}'" - ) - result = cli.query(query=sql) - if result.num_rows == 0: - return [] - col = result.column("column_name") - return sorted( - v.as_py() - for v in col - if v.as_py() is not None and v.as_py() not in NON_SIGNAL_COLS + if schema != "wide": + warnings.warn( + "schema='narrow' logic is depreciated and unsupported for TimescaleDB.", + DeprecationWarning, + stacklevel=2, ) - # --- narrow (legacy EAV) path — deprecated, wide schema is now standard --- - warnings.warn( - "schema='narrow' is deprecated and will be removed in a future release. " - "WFR has moved to wide schema — use schema='wide' (now the default).", - DeprecationWarning, - stacklevel=2, - ) - - def _make_client(): - return get_influx_client() - - def _query_distinct( - client: InfluxDBClient3, t0: datetime, t1: datetime, - ) -> List[str]: - table_ref = quote_table(db_schema, table) - sql = f""" - SELECT DISTINCT "signalName" - FROM {table_ref} - WHERE time >= '{t0.isoformat()}Z' - AND time < '{t1.isoformat()}Z' - """ - tbl = client.query(query=sql) - if tbl.num_rows == 0: - return [] - col = tbl.column("signalName") - return [v.as_py() for v in col if v.as_py() is not None] - - def _process_chunk( - client: InfluxDBClient3, t0: datetime, t1: datetime, - ) -> List[str]: - return adaptive_query( - client=client, - t0=t0, - t1=t1, - primary_fn=_query_distinct, - fallback_fn=None, - min_span=timedelta(seconds=10), - max_depth=5, - ) - - # Build chunk list - chunks = [] - cur = start_time - while cur < end_time: - nxt = min(cur + timedelta(days=chunk_size_days), end_time) - if nxt <= cur: - break - chunks.append((cur, nxt)) - cur = nxt - - pbar = tqdm( - total=len(chunks), - desc="Discovering sensors", - unit="chunk", - disable=not show_progress, - ) - pbar_lock = threading.Lock() - - def on_chunk_done(idx: int) -> None: - with pbar_lock: - pbar.update(1) + table = config.POSTGRES_TABLE + # Strip quotes internally to check pure schema names just in case + raw_table = table.strip('"') + engine = get_db_engine() + sql = f""" + SELECT column_name + FROM information_schema.columns + WHERE table_name = '{raw_table}' + """ + try: - all_names = run_chunks_parallel( - client_factory=_make_client, - chunks=chunks, - query_fn=_process_chunk, - max_workers=4, - on_chunk_done=on_chunk_done, + df = pd.read_sql(sql, con=engine) + if df.empty: + return [] + + columns = df['column_name'].tolist() + return sorted( + c for c in columns + if c not in NON_SIGNAL_COLS ) - except PermanentQueryError as e: - raise RuntimeError(f"Sensor discovery aborted: {e}") from e - finally: - pbar.close() - - unique = sorted(set(all_names)) - return unique + except Exception as e: + print(f"Error discovering sensors: {e}") + return [] diff --git a/src/slicks/fetcher.py b/src/slicks/fetcher.py index a335ab7..509f66e 100644 --- a/src/slicks/fetcher.py +++ b/src/slicks/fetcher.py @@ -3,126 +3,25 @@ from datetime import datetime, timedelta from typing import Any, List, Optional import pandas as pd -from influxdb_client_3 import InfluxDBClient3 +from sqlalchemy import create_engine +import psycopg2 + from . import config -from .query_utils import quote_table, adaptive_query, run_chunks_parallel +from .query_utils import quote_table from .movement_detector import filter_data_in_movement +# SQLAlchemy engine cache +_ENGINE = None -class _Scalar: - """Wraps a Python value to mimic a pyarrow scalar (.as_py() interface).""" - __slots__ = ("_v",) - - def __init__(self, v): - self._v = v - - def as_py(self): - """Return the wrapped Python value.""" - return self._v - - -class _ArrowLike: - """ - Wraps a pandas DataFrame to expose the minimal pyarrow Table interface - used by this codebase (.num_rows, .column(name)). - - Allows HttpInfluxClient.query() to be a drop-in for InfluxDBClient3.query() - in callers that iterate over result columns with .as_py(). - """ - - def __init__(self, df: pd.DataFrame): - self._df = df - - @property - def num_rows(self) -> int: - """Number of rows in the result.""" - return len(self._df) - - def column(self, name: str) -> List[_Scalar]: - """Return a column as a list of _Scalar values with .as_py().""" - if name not in self._df.columns: - return [] - return [_Scalar(v) for v in self._df[name]] - - -class HttpInfluxClient: - """ - HTTP-based InfluxDB 3 client using /api/v3/query_sql with Parquet responses. - - Drop-in replacement for InfluxDBClient3 for environments where gRPC/Arrow Flight - is blocked (e.g. HTTPS Cloudflare tunnels). Implements the same .query() interface. - - Auto-selected by get_influx_client() when the host URL starts with https://. +def get_db_engine(): """ - - def __init__(self, host: str, token: str, database: str): - self._host = host.rstrip("/") - self._token = token - self._database = database - - def query(self, query: str, mode: str = None, database: str = None, **kwargs) -> Any: - import httpx - - db = database or self._database - resp = httpx.post( - f"{self._host}/api/v3/query_sql", - headers={ - "Authorization": f"Token {self._token}", - "Accept": "application/json", - "Content-Type": "application/json", - }, - json={"db": db, "q": query}, - timeout=120.0, - ) - if not resp.is_success: - try: - err = resp.json().get("error", resp.text) - except Exception: - err = resp.text - raise Exception(f"InfluxDB HTTP query error [{resp.status_code}]: {err}") - - rows = resp.json() if resp.content else [] - df = pd.DataFrame(rows) if rows else pd.DataFrame() - - if mode == "pandas": - return df - return _ArrowLike(df) - - def close(self): - """No-op — kept for interface compatibility with InfluxDBClient3.""" - pass - - def __enter__(self): - """Context manager entry.""" - return self - - def __exit__(self, *_): - """Context manager exit.""" - pass - - -def get_influx_client(url=None, token=None, org=None, db=None): - """ - Returns an InfluxDB client appropriate for the configured host. - - - https:// hosts → HttpInfluxClient (REST /api/v3/query_sql, Parquet) - Used for remote Cloudflare-tunnelled servers where gRPC/Flight is blocked. - - http:// hosts → InfluxDBClient3 (Arrow Flight SQL / gRPC) - Used for local Docker stacks with direct access. + Returns a SQLAlchemy engine configured for the TimescaleDB host. + Utilizes PostgreSQL connection pooling. """ - host = url or config.INFLUX_URL - tok = token or config.INFLUX_TOKEN - database = db or config.INFLUX_DB - - if host.startswith("https://"): - return HttpInfluxClient(host=host, token=tok, database=database) - - return InfluxDBClient3( - host=host, - token=tok, - org=org or config.INFLUX_ORG, - database=database, - ) + global _ENGINE + if _ENGINE is None: + _ENGINE = create_engine(config.POSTGRES_DSN) + return _ENGINE def list_target_sensors(): @@ -132,25 +31,13 @@ def list_target_sensors(): return config.SIGNALS -def fetch_telemetry(start_time, end_time, signals=None, client=None, filter_movement=True, resample="1s", schema="wide"): - """ - Fetch telemetry data for specified signals within a time range. +def fetch_telemetry(start_time, end_time, signals=None, engine=None, filter_movement=True, resample="1s", schema="wide"): + if schema != "wide": + raise ValueError("slicks ONLY supports schema='wide' for TimescaleDB. 'narrow' is deprecated and unsupported.") - Args: - start_time (datetime): Start of the query range. - end_time (datetime): End of the query range. - signals (list or str, optional): List of sensor names or a single sensor name. - Defaults to config.SIGNALS if None. - client (InfluxDBClient3, optional): Existing client instance. - filter_movement (bool): If True, applies movement detection filtering. Defaults to True. - resample (str or None): Pandas frequency string for resampling (e.g. "1s", "100ms", "5s"). - Set to None to disable resampling and get raw data. Defaults to "1s". - schema (str): "wide" (default, one field per signal) or "narrow" (legacy EAV, deprecated). - """ if signals is None: signals = config.SIGNALS - # Handle single string input for convenience if isinstance(signals, str): signals = [signals] @@ -158,129 +45,54 @@ def fetch_telemetry(start_time, end_time, signals=None, client=None, filter_move print("Error: No signals specified for fetching.") return None - if client is None: - client = get_influx_client() + if engine is None: + engine = get_db_engine() - # Ensure safe defaults if config vars are missing or empty - db_schema = config.INFLUX_SCHEMA or "iox" - table = config.INFLUX_TABLE or config.INFLUX_DB - table_ref = quote_table(db_schema, table) - - if schema == "wide": - # Wide format: each signal is its own column — SELECT directly, no pivot needed - signal_cols = ", ".join(f'"{s}"' for s in signals) - query = ( - f"SELECT time, {signal_cols} " - f"FROM {table_ref} " - f"WHERE time >= '{start_time.isoformat()}Z' " - f"AND time < '{end_time.isoformat()}Z' " - f"ORDER BY time ASC" - ) - print(f"Executing wide query for range: {start_time} to {end_time}...") - try: - df = client.query(query=query, mode="pandas") - if df.empty: - print("No data found for this range.") - return None - df = df.set_index("time") - if resample: - df = df.resample(resample).mean().dropna(how="all") - if filter_movement: - df = filter_data_in_movement(df) - print(f"Fetched {len(df)} rows{' (filtered)' if filter_movement else ''}.") - return df - except Exception as e: - print(f"Error fetching data: {e}") - return None + table = config.POSTGRES_TABLE - # --- narrow (legacy EAV) path — deprecated, wide schema is now standard --- - warnings.warn( - "schema='narrow' is deprecated and will be removed in a future release. " - "WFR has moved to wide schema — use schema='wide' (now the default).", - DeprecationWarning, - stacklevel=2, + signal_cols = ", ".join(f'"{s}"' for s in signals) + query = ( + f"SELECT time, {signal_cols} " + f"FROM {table} " + f"WHERE time >= '{start_time.strftime('%Y-%m-%d %H:%M:%S.%f%z')}' " + f"AND time < '{end_time.strftime('%Y-%m-%d %H:%M:%S.%f%z')}' " + f"ORDER BY time ASC" ) - signal_list = "', '".join(signals) - - query = f""" - SELECT - time, - "signalName", - "sensorReading" - FROM {table_ref} - WHERE - "signalName" IN ('{signal_list}') - AND time >= '{start_time.isoformat()}Z' - AND time < '{end_time.isoformat()}Z' - ORDER BY time ASC - """ - - print(f"Executing query for range: {start_time} to {end_time}...") + + print(f"Executing TimescaleDB query for range: {start_time} to {end_time}...") try: - table = client.query(query=query, mode="pandas") - if table.empty: + df = pd.read_sql(query, con=engine) + if df.empty: print("No data found for this range.") return None - - # Pivot the data - df = table.pivot_table( - index="time", - columns="signalName", - values="sensorReading", - aggfunc='mean' - ) - - # Resample to common frequency (if specified) + + df['time'] = pd.to_datetime(df['time'], utc=True) + df = df.set_index("time") + if resample: - df = df.resample(resample).mean().dropna() - - # Use the movement detector tool to filter + df = df.resample(resample).mean().dropna(how="all") if filter_movement: df = filter_data_in_movement(df) - print(f"Fetched {len(df)} rows{' (filtered)' if filter_movement else ''}.") return df - except Exception as e: print(f"Error fetching data: {e}") return None + def fetch_telemetry_chunked( start_time: datetime, end_time: datetime, signals=None, - client=None, + engine=None, filter_movement: bool = True, resample: Optional[str] = "1s", chunk_size: timedelta = timedelta(hours=6), - max_workers: int = 1, show_progress: bool = True, schema: str = "wide", -) -> Optional[pd.DataFrame]: +): """ - Fetch telemetry with automatic time-splitting when InfluxDB's per-query - file limit is exceeded. - - Identical interface to ``fetch_telemetry`` but uses ``adaptive_query`` - internally: if a time window hits the server's parquet-file cap the range - is recursively halved until each sub-query succeeds, then results are - concatenated. Suitable for ranges that span many test sessions. - - Args: - start_time: Start of the query range. - end_time: End of the query range. - signals: Sensor names (defaults to config.SIGNALS). - client: Existing InfluxDBClient3 instance (creates one if None). - filter_movement: Apply movement-detection filtering to the final result. - resample: Pandas frequency string, e.g. "1s", "100ms", or None for raw. - chunk_size: Initial time window per adaptive-query call. Each chunk is - split further on file-limit errors. Default: 6 hours. - max_workers: Parallel workers for top-level chunks (1 = sequential). - show_progress: Print progress messages. - schema: "wide" (default, one field per signal) or "narrow" (legacy EAV, deprecated). - - Returns: - Combined DataFrame with DatetimeIndex, or None if no data found. + Fetch telemetry with time-splitting limits for RAM preservation. """ if signals is None: signals = config.SIGNALS @@ -289,57 +101,9 @@ def fetch_telemetry_chunked( if not signals: return None - if client is None: - client = get_influx_client() - - db_schema = config.INFLUX_SCHEMA or "iox" - table = config.INFLUX_TABLE or config.INFLUX_DB - table_ref = quote_table(db_schema, table) - - def _fmt(dt: datetime) -> str: - """Format datetime as UTC ISO string for SQL, safe for both naive and tz-aware.""" - return dt.strftime("%Y-%m-%dT%H:%M:%S") + "Z" - - if schema == "wide": - signal_cols = ", ".join(f'"{s}"' for s in signals) + if engine is None: + engine = get_db_engine() - def _fetch_chunk(cli: InfluxDBClient3, t0: datetime, t1: datetime) -> List[pd.DataFrame]: - """Fetch one wide time window; return list-of-DataFrame for adaptive_query.""" - query = ( - f"SELECT time, {signal_cols} " - f"FROM {table_ref} " - f"WHERE time >= '{_fmt(t0)}' AND time < '{_fmt(t1)}' " - f"ORDER BY time ASC" - ) - raw = cli.query(query=query, mode="pandas") - if raw.empty: - return [] - df = raw.set_index("time") - return [df] - else: - signal_list = "', '".join(signals) - - def _fetch_chunk(cli: InfluxDBClient3, t0: datetime, t1: datetime) -> List[pd.DataFrame]: - """Fetch one narrow time window; return list-of-DataFrame for adaptive_query.""" - query = ( - f"SELECT time, \"signalName\", \"sensorReading\" " - f"FROM {table_ref} " - f"WHERE \"signalName\" IN ('{signal_list}') " - f"AND time >= '{_fmt(t0)}' AND time < '{_fmt(t1)}' " - f"ORDER BY time ASC" - ) - raw = cli.query(query=query, mode="pandas") - if raw.empty: - return [] - df = raw.pivot_table( - index="time", - columns="signalName", - values="sensorReading", - aggfunc="mean", - ) - return [df] - - # Split full range into top-level chunks, then use adaptive_query per chunk chunks: List[tuple] = [] t = start_time while t < end_time: @@ -351,28 +115,18 @@ def _fetch_chunk(cli: InfluxDBClient3, t0: datetime, t1: datetime) -> List[pd.Da all_dfs: List[pd.DataFrame] = [] - def _fetch_adaptive(cli: InfluxDBClient3, t0: datetime, t1: datetime) -> List[pd.DataFrame]: - return adaptive_query( - client=cli, - t0=t0, - t1=t1, - primary_fn=_fetch_chunk, - min_span=timedelta(minutes=1), - ) - - if max_workers > 1: - all_dfs = run_chunks_parallel( - client_factory=get_influx_client, - chunks=chunks, - query_fn=_fetch_adaptive, - max_workers=max_workers, + for i, (t0, t1) in enumerate(chunks): + if show_progress: + print(f" chunk {i + 1}/{len(chunks)}: {t0} → {t1}") + + # We recursively call `fetch_telemetry` but WITHOUT resampling/filtering yet + # so we can apply them cleanly after concatenation to avoid boundary artifacts. + chunk_df = fetch_telemetry( + t0, t1, signals=signals, engine=engine, + filter_movement=False, resample=None, schema=schema ) - else: - for i, (t0, t1) in enumerate(chunks): - if show_progress: - print(f" chunk {i + 1}/{len(chunks)}: {t0} → {t1}") - results = _fetch_adaptive(client, t0, t1) - all_dfs.extend(results) + if chunk_df is not None and not chunk_df.empty: + all_dfs.append(chunk_df) if not all_dfs: if show_progress: @@ -402,9 +156,8 @@ def bulk_fetch_season(start_date, end_date, output_file="telemetry_season.csv"): first_write = not os.path.exists(output_file) if not output_file else True total_rows = 0 - client = get_influx_client() + engine = get_db_engine() - # Ensure directory exists if output_file and os.path.dirname(output_file): os.makedirs(os.path.dirname(output_file), exist_ok=True) @@ -412,12 +165,11 @@ def bulk_fetch_season(start_date, end_date, output_file="telemetry_season.csv"): next_day = current + timedelta(days=1) print(f"Fetching {current.date()}...") - df = fetch_telemetry(current, next_day, client=client) + df = fetch_telemetry(current, next_day, engine=engine) if df is not None and not df.empty: mode = 'w' if first_write else 'a' header = first_write - df.to_csv(output_file, mode=mode, header=header) rows = len(df) diff --git a/src/slicks/query_utils.py b/src/slicks/query_utils.py index ecc65a5..aabfe0a 100644 --- a/src/slicks/query_utils.py +++ b/src/slicks/query_utils.py @@ -1,20 +1,14 @@ """ -Shared utilities for adaptive chunked querying against InfluxDB 3.x (IOx). +Shared utilities for querying against TimescaleDB. Provides: - Error classification (recoverable vs permanent) -- Parallel chunk execution via ThreadPoolExecutor -- Adaptive recursive splitting on resource-limit failures +- Quoting utilities """ from __future__ import annotations -import threading -from concurrent.futures import ThreadPoolExecutor, as_completed -from datetime import datetime, timedelta -from typing import Callable, List, Optional, Sequence, Tuple, TypeVar - -from influxdb_client_3 import InfluxDBClient3 +from typing import TypeVar T = TypeVar("T") @@ -23,170 +17,29 @@ # --------------------------------------------------------------------------- _PERMANENT_ERROR_PATTERNS = ( - "table not found", - "not found", - "unauthorized", - "unauthenticated", + "relation does not exist", + "password authentication failed", "permission denied", - "invalid token", - "database not found", - "bucket not found", + "database does not exist", "syntax error", ) - def quote_table(schema: str, table: str) -> str: """Quote table name for SQL, handling schema.table format.""" - # If the table already contains a dot (and isn't just the schema), assume it might differ? - # Actually, InfluxDB 3 usually expects "schema"."table" - return f'"{schema}"."{table}"' - + # Since Timescale natively respects dot notation, we don't strictly *need* to quote + # if it's already "schema.table", but PostgreSQL standard is strictly `schema`.`table` or just `table` + if schema: + return f'"{schema}"."{table}"' + return f'"{table}"' class PermanentQueryError(Exception): """An error that will not resolve by splitting the time range.""" - def is_permanent_error(exc: Exception) -> bool: """Classify an exception as permanent (non-retryable) vs recoverable.""" msg = str(exc).lower() return any(pattern in msg for pattern in _PERMANENT_ERROR_PATTERNS) - - -# --------------------------------------------------------------------------- -# Adaptive recursive query -# --------------------------------------------------------------------------- - -def adaptive_query( - client: InfluxDBClient3, - t0: datetime, - t1: datetime, - primary_fn: Callable[[InfluxDBClient3, datetime, datetime], List[T]], - fallback_fn: Optional[Callable[[InfluxDBClient3, datetime, datetime], List[T]]] = None, - min_span: Optional[timedelta] = None, - max_depth: int = 10, - _depth: int = 0, -) -> List[T]: - """ - Execute *primary_fn* on [t0, t1). On a recoverable failure the range is - split in half and each half is retried recursively. - - When the remaining span is smaller than *min_span* (or *max_depth* is - reached) *fallback_fn* is used instead — if provided — otherwise an empty - list is returned. - - Raises ``PermanentQueryError`` immediately for non-retryable errors such - as authentication failures or missing tables. - """ - if min_span and (t1 - t0) <= min_span: - if fallback_fn: - return fallback_fn(client, t0, t1) - return [] - - if _depth > max_depth: - if fallback_fn: - return fallback_fn(client, t0, t1) - return [] - - try: - return primary_fn(client, t0, t1) - except Exception as exc: - if is_permanent_error(exc): - raise PermanentQueryError(str(exc)) from exc - - mid = t0 + (t1 - t0) / 2 - if mid <= t0 or mid >= t1: - if fallback_fn: - return fallback_fn(client, t0, t1) - return [] - - left = adaptive_query( - client, t0, mid, primary_fn, fallback_fn, - min_span, max_depth, _depth + 1, - ) - right = adaptive_query( - client, mid, t1, primary_fn, fallback_fn, - min_span, max_depth, _depth + 1, - ) - return left + right - - -# --------------------------------------------------------------------------- -# Parallel chunk execution -# --------------------------------------------------------------------------- - -def run_chunks_parallel( - client_factory: Callable[[], InfluxDBClient3], - chunks: Sequence[Tuple[datetime, datetime]], - query_fn: Callable[[InfluxDBClient3, datetime, datetime], List[T]], - max_workers: int = 4, - on_chunk_done: Optional[Callable[[int], None]] = None, -) -> List[T]: - """ - Execute *query_fn* across time-range *chunks* in parallel. - - Each worker thread receives its own ``InfluxDBClient3`` instance - (via *client_factory*) because the client is not guaranteed thread-safe. - - Results are returned in chunk order regardless of completion order. - - Raises ``PermanentQueryError`` immediately, cancelling remaining work. - """ - if not chunks: - return [] - - # Sequential path — avoids nested ThreadPoolExecutor when called from inside - # another executor thread (e.g. asyncio run_in_executor). Creating gRPC clients - # inside nested thread pools causes "bad value(s) in fds_to_keep" on macOS. - if max_workers == 1: - ordered: List[T] = [] - for idx, (t0, t1) in enumerate(chunks): - client = client_factory() - try: - ordered.extend(query_fn(client, t0, t1)) - except PermanentQueryError: - raise - finally: - try: - client.close() - except Exception: - pass - if on_chunk_done: - on_chunk_done(idx) - return ordered - - results: dict[int, List[T]] = {} - lock = threading.Lock() - - with ThreadPoolExecutor(max_workers=max_workers) as pool: - future_to_idx: dict = {} - clients: list[InfluxDBClient3] = [] - - for idx, (t0, t1) in enumerate(chunks): - client = client_factory() - clients.append(client) - future = pool.submit(query_fn, client, t0, t1) - future_to_idx[future] = idx - - try: - for future in as_completed(future_to_idx): - idx = future_to_idx[future] - result = future.result() # raises on exception - with lock: - results[idx] = result - if on_chunk_done: - on_chunk_done(idx) - except PermanentQueryError: - for f in future_to_idx: - f.cancel() - raise - finally: - for c in clients: - try: - c.close() - except Exception: - pass - - ordered: List[T] = [] - for idx in sorted(results.keys()): - ordered.extend(results[idx]) - return ordered + +# We strip parallel chunk execution here largely because SQLAlchemy and Pandas +# handle execution threading natively behind engines, and TimescaleDB +# doesn't suffer the restrictive Parquet file size cap that forced chunk bisection. diff --git a/src/slicks/scanner.py b/src/slicks/scanner.py index 9465fc1..91f67f2 100644 --- a/src/slicks/scanner.py +++ b/src/slicks/scanner.py @@ -1,35 +1,25 @@ """ -Scanner module for discovering data availability windows. +Scanner module for discovering data availability windows in TimescaleDB. Provides an interactive way to browse what time ranges have telemetry data. """ from __future__ import annotations -import threading +import logging from collections import defaultdict from dataclasses import dataclass from datetime import datetime, timedelta, timezone -from typing import Callable, Dict, Iterable, List, Optional, Sequence, Tuple +from typing import Dict, Iterable, List, Optional, Sequence, Tuple +import pandas as pd -from influxdb_client_3 import InfluxDBClient3 -from tqdm.auto import tqdm from zoneinfo import ZoneInfo from . import config -from .query_utils import adaptive_query, run_chunks_parallel, PermanentQueryError +from .fetcher import get_db_engine UTC = timezone.utc - -def _quote_table(table: str) -> str: - """Quote table name for SQL, handling schema.table format.""" - parts = table.split(".", 1) - if len(parts) == 2: - return f'"{parts[0]}"."{parts[1]}"' - return f'"{table}"' - - @dataclass class TimeWindow: """A contiguous time window with data.""" @@ -54,38 +44,30 @@ def to_dict(self) -> dict: class ScanResult: """ Holds scan results with environment-aware display. - - - In Jupyter: displays as collapsible HTML sections (Month → Day → Windows) - - In terminal: displays as formatted text tree - - Programmatic: use .to_dict() or .to_dataframe() - - Visualization: use .calendar_view() for GitHub-style heatmap """ def __init__(self, data: Dict[str, List[TimeWindow]], timezone_name: str): - self._data = data # {"2025-01-15": [TimeWindow, ...], ...} + self._data = data self._timezone = timezone_name def __repr__(self) -> str: - """Terminal/script display - formatted text tree.""" if not self._data: return "No data found in the specified time range." lines = [f"Data Availability ({self._timezone})", "=" * 40] total_rows = 0 - # Group by month months: Dict[str, Dict[str, List[TimeWindow]]] = defaultdict(dict) for day in sorted(self._data.keys()): - month_key = day[:7] # "2025-01" + month_key = day[:7] months[month_key][day] = self._data[day] + from datetime import datetime as dt for month in sorted(months.keys()): days_in_month = months[month] month_rows = sum(w.row_count for d in days_in_month.values() for w in d) total_rows += month_rows - # Parse month for display - from datetime import datetime as dt month_name = dt.strptime(month, "%Y-%m").strftime("%B %Y") lines.append(f"\n📆 {month_name} ({len(days_in_month)} days, {month_rows:,} rows)") @@ -102,219 +84,15 @@ def __repr__(self) -> str: lines.append(f"\n{'=' * 40}") lines.append(f"Total: {len(self._data)} days, {total_rows:,} rows") + lines.append(f"Total: {len(self._data)} days, {total_rows:,} rows") return "\n".join(lines) - - def _repr_html_(self) -> str: - """Jupyter display - nested collapsible: Month → Day → Windows.""" - if not self._data: - return "

No data found in the specified time range.

" - - total_rows = sum(w.row_count for windows in self._data.values() for w in windows) - - # Group by month - months: Dict[str, Dict[str, List[TimeWindow]]] = defaultdict(dict) - for day in sorted(self._data.keys()): - month_key = day[:7] # "2025-01" - months[month_key][day] = self._data[day] - - html_parts = [ - "
", - f"

📊 Data Availability ({self._timezone})

", - f"

{len(self._data)} days with data, {total_rows:,} total rows

", - ] - - from datetime import datetime as dt - - for month in sorted(months.keys()): - days_in_month = months[month] - month_rows = sum(w.row_count for d in days_in_month.values() for w in d) - month_name = dt.strptime(month, "%Y-%m").strftime("%B %Y") - - # Build day sections - day_sections = [] - for day in sorted(days_in_month.keys()): - windows = days_in_month[day] - day_rows = sum(w.row_count for w in windows) - day_display = dt.strptime(day, "%Y-%m-%d").strftime("%a %d") - - window_items = [] - for w in windows: - start_time = w.start_local.strftime("%H:%M") - end_time = w.end_local.strftime("%H:%M") - duration = (w.end_utc - w.start_utc).total_seconds() / 3600 - window_items.append( - f"
  • " - f"{start_time} → " - f"{end_time} " - f"({duration:.1f}h, {w.row_count:,} rows)
  • " - ) - - day_sections.append( - f"
    " - f"" - f"📅 {day_display} " - f"({len(windows)} window{'s' if len(windows) != 1 else ''}, {day_rows:,} rows)" - f"" - f"
      {''.join(window_items)}
    " - f"
    " - ) - - html_parts.append( - f"
    " - f"" - f"📆 {month_name} " - f"({len(days_in_month)} days, {month_rows:,} rows)" - f"" - f"
    {''.join(day_sections)}
    " - f"
    " - ) - - html_parts.append("
    ") - return "".join(html_parts) - - def calendar_view(self, year: Optional[int] = None): - """ - Display a GitHub-style calendar heatmap. - Darker colors = more data that day. - - Args: - year: Year to display (auto-detected if None) - - Returns: - matplotlib Figure (displays inline in Jupyter) - """ - import matplotlib.pyplot as plt - import matplotlib.patches as mpatches - import numpy as np - from datetime import datetime as dt - import calendar - # Aggregate row counts per day - day_counts = {} - for day, windows in self._data.items(): - day_counts[day] = sum(w.row_count for w in windows) - - if not day_counts: - print("No data to display.") - return None - - # Auto-detect year from data - if year is None: - years = set(d[:4] for d in day_counts.keys()) - year = int(max(years)) # Use most recent year - - # Create figure - one row per month - fig, axes = plt.subplots(4, 3, figsize=(14, 10)) - fig.suptitle(f"Data Availability Heatmap - {year} ({self._timezone})", - fontsize=14, fontweight='bold', y=0.98) - - # Color settings - max_count = max(day_counts.values()) if day_counts else 1 - - for month_idx in range(12): - ax = axes[month_idx // 3, month_idx % 3] - month = month_idx + 1 - month_name = calendar.month_abbr[month] - - # Get calendar for this month - cal = calendar.Calendar(firstweekday=6) # Sunday start - month_days = cal.monthdayscalendar(year, month) - - # Create heatmap grid - grid = np.zeros((len(month_days), 7)) - grid[:] = np.nan # NaN for empty cells - - for week_idx, week in enumerate(month_days): - for day_idx, day in enumerate(week): - if day == 0: - continue - date_str = f"{year}-{month:02d}-{day:02d}" - if date_str in day_counts: - # Normalize to 0-1 scale (log scale for better visibility) - count = day_counts[date_str] - grid[week_idx, day_idx] = np.log1p(count) / np.log1p(max_count) - else: - grid[week_idx, day_idx] = 0 - - # Plot heatmap - cmap = plt.cm.Greens - cmap.set_bad(color='white') - - im = ax.imshow(grid, cmap=cmap, aspect='equal', vmin=0, vmax=1) - - # Add day numbers - for week_idx, week in enumerate(month_days): - for day_idx, day in enumerate(week): - if day != 0: - date_str = f"{year}-{month:02d}-{day:02d}" - color = 'white' if date_str in day_counts and day_counts[date_str] > max_count * 0.3 else 'black' - ax.text(day_idx, week_idx, str(day), ha='center', va='center', - fontsize=7, color=color) - - ax.set_title(month_name, fontsize=11, fontweight='bold') - ax.set_xticks(range(7)) - ax.set_xticklabels(['S', 'M', 'T', 'W', 'T', 'F', 'S'], fontsize=8) - ax.set_yticks([]) - ax.set_xlim(-0.5, 6.5) - ax.set_ylim(len(month_days) - 0.5, -0.5) - - # Remove frame - for spine in ax.spines.values(): - spine.set_visible(False) - - plt.tight_layout(rect=[0, 0.02, 1, 0.96]) - - # Add legend - fig.text(0.5, 0.01, - f"Total: {len(self._data)} days with data | Darker = more data | Max: {max_count:,} rows/day", - ha='center', fontsize=10, style='italic') - - return fig - - def __iter__(self): - """Iterate over (day, windows) pairs.""" - for day in sorted(self._data.keys()): - yield day, self._data[day] - def __len__(self) -> int: - """Number of days with data.""" return len(self._data) - - def to_dict(self) -> Dict[str, List[dict]]: - """Export as nested dictionary.""" - return { - day: [w.to_dict() for w in windows] - for day, windows in self._data.items() - } - - def to_dataframe(self): - """Flatten to pandas DataFrame with one row per time window.""" - import pandas as pd - - rows = [] - for day, windows in self._data.items(): - for w in windows: - rows.append({ - "date": day, - "start_utc": w.start_utc, - "end_utc": w.end_utc, - "start_local": w.start_local, - "end_local": w.end_local, - "row_count": w.row_count, - "duration_hours": (w.end_utc - w.start_utc).total_seconds() / 3600, - }) - return pd.DataFrame(rows) - @property - def days(self) -> List[str]: - """List of dates with data.""" + def days(self) -> list[str]: return sorted(self._data.keys()) - - @property - def total_rows(self) -> int: - """Total row count across all windows.""" - return sum(w.row_count for windows in self._data.values() for w in windows) def scan_data_availability( @@ -325,35 +103,11 @@ def scan_data_availability( bin_size: str = "hour", include_counts: bool = True, show_progress: bool = True, - max_workers: int = 4, + max_workers: int = 4, # Kept for bw-compat ) -> ScanResult: """ - Scan the database for data availability windows. - - Args: - start: Start datetime (timezone-aware or naive UTC) - end: End datetime (timezone-aware or naive UTC) - timezone: Timezone for display (e.g., "America/Toronto", "UTC") - table: Table to scan (defaults to "iox.{INFLUX_DB}") - bin_size: Granularity for scanning - "hour" or "day" - include_counts: Whether to include row counts (slightly slower) - show_progress: Show progress bar (works in Jupyter and terminal) - - Returns: - ScanResult: Interactive result object grouped by day - - Example: - >>> import slicks - >>> slicks.connect_influxdb3(url="...", token="...", db="WFR25") - >>> result = slicks.scan_data_availability( - ... start=datetime(2025, 1, 1), - ... end=datetime(2025, 1, 31), - ... timezone="America/Toronto" - ... ) - >>> result # displays interactive view in Jupyter - >>> result.to_dataframe() # for programmatic access + Scan the database for data availability windows using TimescaleDB time_bucket. """ - # Ensure datetimes are UTC if start.tzinfo is None: start = start.replace(tzinfo=UTC) else: @@ -364,54 +118,50 @@ def scan_data_availability( else: end = end.astimezone(UTC) - # Setup timezone tz = ZoneInfo(timezone) + table_ref = table or config.POSTGRES_TABLE - # Default table if not provided - if not table: - schema = config.INFLUX_SCHEMA or "iox" - table_name = config.INFLUX_TABLE or config.INFLUX_DB - table = f"{schema}.{table_name}" - - # We still use _quote_table here because `table` might be passed in as "schema.table" - # or just "table" (legacy). But to be consistent with other modules, we should - # probably try to use the configured schema if the passed table doesn't have one? - # For now, let's keep the existing logic but respect the global config default above. - table_ref = _quote_table(table) - - # Determine bin settings interval = "1 day" if bin_size == "day" else "1 hour" step = timedelta(days=1) if bin_size == "day" else timedelta(hours=1) - # Calculate total chunks for progress bar - initial_chunk_days = 31 - total_chunks = ((end - start).days + initial_chunk_days - 1) // initial_chunk_days + engine = get_db_engine() + + # In PostgreSQL with TimescaleDB, time_bucket aggregates incredibly fast. + sql = f""" + SELECT + time_bucket('{interval}', time) AS bucket, + COUNT(*) AS n + FROM {table_ref} + WHERE time >= '{start.strftime('%Y-%m-%d %H:%M:%S%z')}' + AND time < '{end.strftime('%Y-%m-%d %H:%M:%S%z')}' + GROUP BY bucket + ORDER BY bucket ASC + """ - # Fetch bins with progress bar + if show_progress: + print(f"Scanning data from {start.date()} to {end.date()} by {bin_size}...") + try: - bins = list(_fetch_bins_adaptive( - start=start, - end=end, - table_ref=table_ref, - interval=interval, - step=step, - initial_chunk_days=initial_chunk_days, - show_progress=show_progress, - total_chunks=total_chunks, - max_workers=max_workers, - )) - except PermanentQueryError as e: - raise RuntimeError( - f"Scan aborted due to non-recoverable error: {e}" - ) from e - - if not bins: + df = pd.read_sql(sql, engine) + except Exception as e: + raise RuntimeError(f"Scan aborted due to error: {e}") from e + + if df.empty: return ScanResult({}, timezone) + + df['bucket'] = pd.to_datetime(df['bucket'], utc=True) - # Compress into windows + bins = [] + for _, row in df.iterrows(): + b = row['bucket'] + n = int(row['n']) + if n > 0: + bins.append((b, n)) + + if not bins: + return ScanResult({}, timezone) + windows = _compress_bins(bins, step) - - # Group by day with local timezone grouped: Dict[str, List[TimeWindow]] = defaultdict(list) for start_utc, end_utc, bins_cnt, rows_cnt in windows: @@ -432,127 +182,6 @@ def scan_data_availability( return ScanResult(dict(grouped), timezone) -def _fetch_bins_adaptive( - start: datetime, - end: datetime, - table_ref: str, - interval: str, - step: timedelta, - initial_chunk_days: int = 31, - show_progress: bool = True, - total_chunks: int = 1, - max_workers: int = 4, -) -> Iterable[Tuple[datetime, int]]: - """Iterate over bucket start times with counts using parallel adaptive chunking.""" - - def _make_client() -> InfluxDBClient3: - return InfluxDBClient3( - host=config.INFLUX_URL, - token=config.INFLUX_TOKEN, - database=config.INFLUX_DB, - ) - - def query_grouped_bins( - client: InfluxDBClient3, t0: datetime, t1: datetime, - ) -> List[Tuple[datetime, int]]: - sql = f""" - SELECT - DATE_BIN(INTERVAL '{interval}', time, TIMESTAMP '{t0.isoformat()}') AS bucket, - COUNT(*) AS n - FROM {table_ref} - WHERE time >= TIMESTAMP '{t0.isoformat()}' - AND time < TIMESTAMP '{t1.isoformat()}' - GROUP BY bucket - HAVING COUNT(*) > 0 - ORDER BY bucket - """ - tbl = client.query(sql) - rows: List[Tuple[datetime, int]] = [] - for i in range(tbl.num_rows): - bucket = tbl.column("bucket")[i].as_py() - n = tbl.column("n")[i].as_py() - if bucket.tzinfo is None: - bucket = bucket.replace(tzinfo=UTC) - else: - bucket = bucket.astimezone(UTC) - rows.append((bucket, int(n))) - return rows - - def query_exists_per_bin( - client: InfluxDBClient3, t0: datetime, t1: datetime, - ) -> List[Tuple[datetime, int]]: - cur = t0 - rows: List[Tuple[datetime, int]] = [] - while cur < t1: - nxt = min(cur + step, t1) - sql = f""" - SELECT 1 - FROM {table_ref} - WHERE time >= TIMESTAMP '{cur.isoformat()}' - AND time < TIMESTAMP '{nxt.isoformat()}' - LIMIT 1 - """ - try: - tbl = client.query(sql) - if tbl.num_rows > 0: - rows.append((cur, 1)) - except Exception: - pass - cur = nxt - return rows - - min_exists_span = step * 4 - - def process_chunk( - client: InfluxDBClient3, t0: datetime, t1: datetime, - ) -> List[Tuple[datetime, int]]: - """Process one top-level chunk using adaptive_query.""" - return adaptive_query( - client=client, - t0=t0, - t1=t1, - primary_fn=query_grouped_bins, - fallback_fn=query_exists_per_bin, - min_span=min_exists_span, - ) - - # Build chunk list - chunks: List[Tuple[datetime, datetime]] = [] - cur = start - while cur < end: - nxt = min(cur + timedelta(days=initial_chunk_days), end) - chunks.append((cur, nxt)) - cur = nxt - - # Progress bar - pbar = tqdm( - total=len(chunks), - desc="Scanning", - unit="chunk", - disable=not show_progress, - ) - pbar_lock = threading.Lock() - - def on_chunk_done(idx: int) -> None: - t0, t1 = chunks[idx] - with pbar_lock: - pbar.set_postfix_str(f"{t0.strftime('%b %d')} - {t1.strftime('%b %d')}") - pbar.update(1) - - try: - results = run_chunks_parallel( - client_factory=_make_client, - chunks=chunks, - query_fn=process_chunk, - max_workers=max_workers, - on_chunk_done=on_chunk_done, - ) - finally: - pbar.close() - - yield from results - - def _compress_bins( pairs: Sequence[Tuple[datetime, int]], step: timedelta diff --git a/src/slicks/writer.py b/src/slicks/writer.py deleted file mode 100644 index dd4e6d2..0000000 --- a/src/slicks/writer.py +++ /dev/null @@ -1,130 +0,0 @@ -"""Wide-format InfluxDB writer — one point per CAN message, all signals as fields.""" -from __future__ import annotations - -import logging -from pathlib import Path -from typing import Optional - -from influxdb_client import InfluxDBClient, WriteOptions - -from .can_decode import DecodedFrame, decode_frame, load_dbc - -logger = logging.getLogger(__name__) - -#: Metadata columns present in wide tables — not telemetry signals. -NON_SIGNAL_COLS: frozenset[str] = frozenset({"time", "messageName", "canId", "iox::measurement"}) - -_LP_ESCAPE = str.maketrans({" ": r"\ ", ",": r"\,", "=": r"\="}) - - -def _esc(val: str) -> str: - return val.translate(_LP_ESCAPE) - - -def frame_to_line_protocol( - measurement: str, - frame: DecodedFrame, - ts_ns: int, - include_tags: bool = True, -) -> str: - """ - Convert a DecodedFrame to a wide InfluxDB line protocol string. - - Format:: - - measurement[,messageName=X,canId=Y] sig1=v1,sig2=v2 timestamp_ns - - Example:: - - WFR26,messageName=BMS_Status,canId=512 PackCurrent=-3264.0,SOC=85.0 1700000000000000000 - - Raises ValueError if the frame has no numeric signals. - """ - if not frame.signals: - raise ValueError(f"Frame {frame.message_name} has no numeric signals") - - fields = ",".join(f"{_esc(k)}={v}" for k, v in frame.signals.items()) - tags = ( - f",messageName={_esc(frame.message_name)},canId={frame.can_id}" - if include_tags - else "" - ) - return f"{_esc(measurement)}{tags} {fields} {ts_ns}" - - -class WideWriter: - """ - Decode CAN frames and write wide-format points to InfluxDB. - - Each CAN message produces one line protocol point — all decoded signals - become fields, with ``messageName`` and ``canId`` as optional provenance tags. - Uses ``influxdb_client`` v2-compat WriteApi for batching. - """ - - def __init__( - self, - url: str, - token: str, - bucket: str, - measurement: Optional[str] = None, - org: str = "", - batch_size: int = 5000, - flush_interval_ms: int = 1000, - dbc_path: Optional[Path] = None, - ) -> None: - self._bucket = bucket - self._org = org - self._measurement = measurement or bucket - self._db = load_dbc(dbc_path) - logger.info("WideWriter: loaded DBC (%d messages)", len(self._db.messages)) - - self._client = InfluxDBClient(url=url, token=token or None, org=org) - self._write_api = self._client.write_api( - write_options=WriteOptions( - batch_size=batch_size, - flush_interval=flush_interval_ms, - jitter_interval=500, - retry_interval=5_000, - ) - ) - logger.info( - "WideWriter: → %s bucket=%s measurement=%s batch=%d", - url, bucket, self._measurement, batch_size, - ) - - def decode_and_queue(self, can_id: int, data: bytes, ts_ns: int) -> int: - """ - Decode a CAN frame and queue its wide point for writing. - - Returns 1 if a point was queued, 0 if the CAN ID is not in the DBC - or the frame contains no numeric signals. - """ - frame = decode_frame(self._db, can_id, data) - if frame is None or not frame.signals: - return 0 - try: - line = frame_to_line_protocol(self._measurement, frame, ts_ns) - except ValueError: - return 0 - self._write_api.write(bucket=self._bucket, org=self._org, record=line) - return 1 - - def write_lines(self, lines: list[str]) -> None: - """Write pre-formatted line protocol strings directly (bypass DBC decode).""" - if lines: - self._write_api.write(bucket=self._bucket, org=self._org, record=lines) - - def flush(self) -> None: - """Flush any pending batched writes.""" - self._write_api.flush() - - def close(self) -> None: - """Flush pending writes and close the underlying InfluxDB client.""" - self._write_api.close() - self._client.close() - - def __enter__(self) -> "WideWriter": - return self - - def __exit__(self, *_: object) -> None: - self.close() From 6a57eed082bae32d05cf0e8cfe9c6f36f99633af Mon Sep 17 00:00:00 2001 From: "WFR DAQ Server (ovh)" Date: Sun, 12 Apr 2026 22:39:23 +0000 Subject: [PATCH 2/3] docs: Update docs and examples to reference TimescaleDB and SQLAlchemy --- README.md | 6 +++--- docs/advanced_usage.md | 14 +++++++------- docs/api_reference.md | 16 ++++++++-------- docs/getting_started.md | 10 +++++----- docs/index.md | 4 ++-- docs/scanner.md | 10 +++++----- examples/end_to_end.py | 8 ++++---- 7 files changed, 34 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index 7016efc..b619214 100644 --- a/README.md +++ b/README.md @@ -7,8 +7,8 @@ The home baked data pipeline for **Western Formula Racing**. This package handles: -1. **Data Ingestion:** Reliable fetching from InfluxDB 3.0 in wide (columnar) or narrow (legacy EAV) format. -2. **Data Writing:** `WideWriter` encodes CAN frames directly to InfluxDB wide format line protocol. +1. **Data Ingestion:** Reliable fetching from TimescaleDB 3.0 in wide (columnar) or narrow (legacy EAV) format. +2. **Data Writing:** `WideWriter` encodes CAN frames directly to TimescaleDB wide format line protocol. 3. **Movement Detection:** Smart filtering of "Moving" vs "Idle" car states. 4. **Sensor Discovery:** Tools to explore available sensors on any given race day. @@ -34,7 +34,7 @@ import slicks from datetime import datetime # 1. Connect (auto-configured from env vars or explicit) -slicks.connect_influxdb3(db="WFR26") +slicks.connect_timescaledb(table="WFR26") # 2. Fetch Data — wide format (columnar, preferred) df = slicks.fetch_telemetry( diff --git a/docs/advanced_usage.md b/docs/advanced_usage.md index a8b0ffe..fd9d4e0 100644 --- a/docs/advanced_usage.md +++ b/docs/advanced_usage.md @@ -42,17 +42,17 @@ You often need to switch between `Development`, `Testing`, and `Production` data ### Option A: Environment Variables (Best for CI/CD) Set these in your shell or `.env` file before running python: ```bash -export INFLUX_URL="http://production-server:8086" -export INFLUX_DB="Season2026_Final" +export POSTGRES_DSN="http://production-server:8086" +export POSTGRES_TABLE="Season2026_Final" ``` ### Option B: Runtime Configuration (Best for Scripts/Notebooks) ```python import slicks -slicks.connect_influxdb3( - url="http://192.168.1.50:9000", - db="DynoTest_Day1" +slicks.connect_timescaledb( + dsn="http://192.168.1.50:9000", + table="DynoTest_Day1" ) ``` @@ -74,8 +74,8 @@ If you're ingesting raw CAN bus data (e.g., from a replay script or live logger) from slicks import WideWriter writer = WideWriter( - url="http://localhost:8086", - token="my-token", + dsn="http://localhost:8086", + "my-token", bucket="WFR26", measurement="WFR26", dbc_path="path/to/WFR26.dbc", diff --git a/docs/api_reference.md b/docs/api_reference.md index 8e9e937..2cb8a32 100644 --- a/docs/api_reference.md +++ b/docs/api_reference.md @@ -4,14 +4,14 @@ This document details the functions available in the `slicks` package. ## Core Functions -### `slicks.connect_influxdb3` +### `slicks.connect_timescaledb` -Updates the global InfluxDB connection settings dynamically. +Updates the global TimescaleDB connection settings dynamically. ```python -slicks.connect_influxdb3(url=None, token=None, org=None, db=None) +slicks.connect_timescaledb(dsn=None, None, org=None, table=None) ``` -- **url** *(str)*: The InfluxDB host URL (e.g., `"http://localhost:8086"`). +- **url** *(str)*: The TimescaleDB host URL (e.g., `"http://localhost:8086"`). - **token** *(str)*: Authentication token. - **org** *(str)*: Organization name (default: `"Docs"`). - **db** *(str)*: Database/Bucket name (default: `"WFR25"`). @@ -30,7 +30,7 @@ slicks.fetch_telemetry(start_time, end_time, signals=None, client=None, - **start_time** *(datetime)*: Start of the query range. - **end_time** *(datetime)*: End of the query range. - **signals** *(str or list[str])*: A single sensor name or a list of sensor names to fetch. Defaults to standard configuration if `None`. -- **client** *(InfluxDBClient3, optional)*: An existing client instance (advanced use). +- **client** *(TimescaleDBClient3, optional)*: An existing client instance (advanced use). - **filter_movement** *(bool)*: If `True` (default), strips out rows where the car is stationary. - **resample** *(str or None)*: Pandas frequency string for resampling (e.g. `"1s"`, `"100ms"`). Pass `None` for raw data. - **schema** *(str)*: `"wide"` (default, columnar — each signal is a column) or `"narrow"` (legacy EAV — requires pivot). @@ -105,13 +105,13 @@ slicks.detect_movement_ratio(df, speed_column="INV_Motor_Speed") ### `slicks.WideWriter` -Encodes CAN frames to InfluxDB wide format line protocol and writes them in batches. +Encodes CAN frames to TimescaleDB wide format line protocol and writes them in batches. ```python from slicks import WideWriter writer = WideWriter( - url, # InfluxDB URL + url, # TimescaleDB URL token, # Auth token bucket, # Bucket/database name (e.g. "WFR26") measurement, # Measurement name (e.g. "WFR26") @@ -154,7 +154,7 @@ frame = decode_frame(db, can_id, raw_bytes) # → DecodedFrame or None ### `slicks.frame_to_line_protocol` -Converts a `DecodedFrame` to an InfluxDB line protocol string. +Converts a `DecodedFrame` to an TimescaleDB line protocol string. ```python from slicks import frame_to_line_protocol diff --git a/docs/getting_started.md b/docs/getting_started.md index f465257..fc0b423 100644 --- a/docs/getting_started.md +++ b/docs/getting_started.md @@ -27,18 +27,18 @@ pip install -e . Here is the minimal code needed to connect to the database and download data for a specific sensor. ### Step 1: Import and Configure -The package connects to the InfluxDB database automatically using defaults, but you can configure it explicitly. +The package connects to the TimescaleDB database automatically using defaults, but you can configure it explicitly. ```python import slicks from datetime import datetime # Optional: Configure manually (or use .env file / defaults) -slicks.connect_influxdb3( - url="http://your-influx-server:8086", - token="your-token-here", # Ask Data Lead for your token +slicks.connect_timescaledb( + dsn="http://your-postgres-server:8086", + "your-token-here", # Ask Data Lead for your token org="Docs", - db="WFR25" + table="WFR25" ) ``` diff --git a/docs/index.md b/docs/index.md index 47f9ffb..e223953 100644 --- a/docs/index.md +++ b/docs/index.md @@ -4,7 +4,7 @@ The home baked data pipeline for **Western Formula Racing**. This package handles: -1. **Data Ingestion:** Reliable fetching from InfluxDB 3.0. +1. **Data Ingestion:** Reliable fetching from TimescaleDB 3.0. 2. **Movement Detection:** Smart filtering of "Moving" vs "Idle" car states. @@ -29,7 +29,7 @@ import slicks from datetime import datetime # 1. Connect (Auto-configured or custom) -slicks.connect_influxdb3(db="WFR25", influx_url="http://influxdb:9000", influx_token="apiv3_your_token") +slicks.connect_timescaledb(table="WFR25", dsn="http://postgresdb:9000", "apiv3_your_token") # 2. Fetch Data (One-liner) df = slicks.fetch_telemetry( diff --git a/docs/scanner.md b/docs/scanner.md index dd2e887..49b4bc9 100644 --- a/docs/scanner.md +++ b/docs/scanner.md @@ -9,10 +9,10 @@ import slicks from datetime import datetime # Configure connection first -slicks.connect_influxdb3( - url="http://your-server:9000", - token="your-token", - db="WFR25" +slicks.connect_timescaledb( + dsn="http://your-server:9000", + "your-token", + table="WFR25" ) # Scan for data availability @@ -73,7 +73,7 @@ slicks.scan_data_availability( | `start` | `datetime` | *required* | Start of scan range (UTC or timezone-aware) | | `end` | `datetime` | *required* | End of scan range | | `timezone` | `str` | `"UTC"` | Timezone for display (e.g., `"America/Toronto"`) | -| `table` | `str` | `None` | Table to scan (defaults to `"iox.{INFLUX_DB}"`) | +| `table` | `str` | `None` | Table to scan (defaults to `"iox.{POSTGRES_TABLE}"`) | | `bin_size` | `str` | `"hour"` | Granularity: `"hour"` or `"day"` | | `include_counts` | `bool` | `True` | Include row counts (slightly slower if `True`) | | `show_progress` | `bool` | `True` | Show progress bar during scan | diff --git a/examples/end_to_end.py b/examples/end_to_end.py index 1ec1c71..7679375 100644 --- a/examples/end_to_end.py +++ b/examples/end_to_end.py @@ -14,11 +14,11 @@ def main(): # Configure connection. # For CI, these are pulled from environment variables (GitHub Secrets). # For local use, you can set these in a .env file or call configure() directly. - slicks.connect_influxdb3( - url=os.getenv("INFLUX_URL"), - token=os.getenv("INFLUX_TOKEN"), + slicks.connect_timescaledb( + dsn=os.getenv("POSTGRES_DSN"), + os.getenv(""), org=os.getenv("INFLUX_ORG"), - db=os.getenv("INFLUX_DB") + table=os.getenv("POSTGRES_TABLE") ) # --------------------------------------------------------- From bb74879a07231a7a27105f07141cafbcc9af84fa Mon Sep 17 00:00:00 2001 From: Western Formula Racing Date: Thu, 4 Jun 2026 02:00:13 +0000 Subject: [PATCH 3/3] Bump version to 0.3.0 (TimescaleDB migration) --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index f8ff478..340bcd4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "slicks" -version = "0.2.3" +version = "0.3.0" description = "The home baked data pipeline for Western Formula Racing" readme = "README.md" authors = [