diff --git a/tests/unit/test_analytics_queries.py b/tests/unit/test_analytics_queries.py new file mode 100644 index 0000000..17967ce --- /dev/null +++ b/tests/unit/test_analytics_queries.py @@ -0,0 +1,198 @@ +"""Unit tests for analytics query user-agent fallback behavior.""" + +from datetime import datetime + +import pandas as pd +import pytest + +import wikidatasearch.services.logger.analytics_queries as analytics_queries +from wikidatasearch.services.logger.analytics_queries import AnalyticsQueryService + + +class _DummyConnection: + """No-op connection context manager for stubbing SQLAlchemy engine.""" + + def __enter__(self): + return object() + + def __exit__(self, exc_type, exc, tb): + return False + + +class _DummyEngine: + """No-op engine that returns a dummy context manager.""" + + def connect(self): + return _DummyConnection() + + +def _stub_read_sql(monkeypatch, df: pd.DataFrame) -> None: + """Patch read_sql and engine to return a predefined DataFrame.""" + + def _fake_read_sql(*_args, **_kwargs): + return df.copy() + + monkeypatch.setattr(analytics_queries, "engine", _DummyEngine()) + monkeypatch.setattr(analytics_queries.pd, "read_sql", _fake_read_sql) + + +def _capture_sql(monkeypatch, df: pd.DataFrame) -> dict[str, str]: + """Patch read_sql and capture the SQL text used by the query.""" + captured: dict[str, str] = {"query": ""} + + def _fake_read_sql(sql, *_args, **_kwargs): + captured["query"] = str(sql) + return df.copy() + + monkeypatch.setattr(analytics_queries, "engine", _DummyEngine()) + monkeypatch.setattr(analytics_queries.pd, "read_sql", _fake_read_sql) + return captured + + +def _assert_vector_routes_and_status_filter(sql_text: str) -> None: + """Assert vector-route queries exclude 400 and 422 statuses.""" + assert f"route IN {AnalyticsQueryService.VECTOR_QUERY_ROUTES_SQL}" in sql_text + assert "status NOT IN (400, 422)" in sql_text + assert "status <> 422" not in sql_text + + +def test_get_total_user_agents_prefers_original_user_agent(monkeypatch): + """Return original user agents when available, otherwise fallback to hash.""" + df = pd.DataFrame( + [ + {"client": "browser", "user_agent_hash": "hash_browser", "user_agent_value": "Mozilla/5.0 X"}, + {"client": "api", "user_agent_hash": "hash_api_a", "user_agent_value": "WikiBot/1.0"}, + {"client": "api", "user_agent_hash": "hash_api_b", "user_agent_value": "hash_api_b"}, + ] + ) + _stub_read_sql(monkeypatch, df) + + out = AnalyticsQueryService.get_total_user_agents( + datetime(2026, 4, 1), + datetime(2026, 4, 23), + include_user_agents=True, + ) + + assert out["browser"] == 1 + assert out["api"] == 2 + assert out["total"] == 3 + assert out["user_agents"] == ["Mozilla/5.0 X", "WikiBot/1.0", "hash_api_b"] + + +def test_get_new_user_agents_prefers_original_user_agent(monkeypatch): + """Return original user agents in new-user-agents when available.""" + df = pd.DataFrame( + [ + {"user_agent_hash": "hash_a", "user_agent_value": "CustomAgent/2.0"}, + {"user_agent_hash": "hash_b", "user_agent_value": "hash_b"}, + ] + ) + _stub_read_sql(monkeypatch, df) + + out = AnalyticsQueryService.get_new_user_agents( + datetime(2026, 4, 1), + datetime(2026, 4, 23), + include_user_agents=True, + ) + + assert out["total"] == 2 + assert out["user_agents"] == ["CustomAgent/2.0", "hash_b"] + + +def test_get_new_user_agents_count_only_uses_total_query(monkeypatch): + """Return only total count when include_user_agents is False.""" + df = pd.DataFrame([{"total": 7}]) + _stub_read_sql(monkeypatch, df) + + out = 
AnalyticsQueryService.get_new_user_agents( + datetime(2026, 4, 1), + datetime(2026, 4, 23), + include_user_agents=False, + ) + + assert out == {"total": 7} + + +def test_get_consistent_user_agents_prefers_original_user_agent(monkeypatch): + """Return original user agents in consistent-user-agents when available.""" + df = pd.DataFrame( + [ + {"user_agent_hash": "hash_a", "user_agent_value": "AgentA/1.2"}, + {"user_agent_hash": "hash_b", "user_agent_value": "hash_b"}, + {"user_agent_hash": "hash_c", "user_agent_value": "AgentC/3.4"}, + ] + ) + _stub_read_sql(monkeypatch, df) + + out = AnalyticsQueryService.get_consistent_user_agents( + datetime(2026, 4, 1), + datetime(2026, 4, 23), + include_user_agents=True, + ) + + assert out["total"] == 3 + assert out["user_agents"] == ["AgentA/1.2", "AgentC/3.4", "hash_b"] + + +def test_get_consistent_user_agents_count_only_uses_total_query(monkeypatch): + """Return only total count when include_user_agents is False.""" + df = pd.DataFrame([{"total": 4}]) + _stub_read_sql(monkeypatch, df) + + out = AnalyticsQueryService.get_consistent_user_agents( + datetime(2026, 4, 1), + datetime(2026, 4, 23), + include_user_agents=False, + ) + + assert out == {"total": 4} + + +@pytest.mark.parametrize( + "call", + [ + lambda: AnalyticsQueryService.get_total_user_agents( + datetime(2026, 4, 1), + datetime(2026, 4, 23), + include_user_agents=True, + ), + lambda: AnalyticsQueryService.get_total_user_agents( + datetime(2026, 4, 1), + datetime(2026, 4, 23), + include_user_agents=False, + ), + lambda: AnalyticsQueryService.get_total_requests( + datetime(2026, 4, 1), + datetime(2026, 4, 23), + ), + lambda: AnalyticsQueryService.get_total_requests_by_lang( + datetime(2026, 4, 1), + datetime(2026, 4, 23), + ), + lambda: AnalyticsQueryService.get_new_user_agents( + datetime(2026, 4, 1), + datetime(2026, 4, 23), + include_user_agents=True, + ), + lambda: AnalyticsQueryService.get_new_user_agents( + datetime(2026, 4, 1), + datetime(2026, 4, 23), + include_user_agents=False, + ), + lambda: AnalyticsQueryService.get_consistent_user_agents( + datetime(2026, 4, 1), + datetime(2026, 4, 23), + include_user_agents=True, + ), + lambda: AnalyticsQueryService.get_consistent_user_agents( + datetime(2026, 4, 1), + datetime(2026, 4, 23), + include_user_agents=False, + ), + ], +) +def test_vector_route_queries_exclude_400_and_422(monkeypatch, call): + """Ensure all vector-route analytics queries exclude 400 and 422.""" + captured = _capture_sql(monkeypatch, pd.DataFrame()) + call() + _assert_vector_routes_and_status_filter(captured["query"]) diff --git a/tests/unit/test_routes.py b/tests/unit/test_routes.py index f374f0f..c8a3f43 100644 --- a/tests/unit/test_routes.py +++ b/tests/unit/test_routes.py @@ -7,10 +7,11 @@ from fastapi import BackgroundTasks, HTTPException -def test_languages_route_returns_split_languages(test_ctx, run_async): +def test_languages_route_returns_split_languages(test_ctx, run_async, make_request): """Validate languages route returns split languages.""" frontend = test_ctx["frontend"] - data = run_async(frontend.languages()) + req = make_request("/languages") + data = run_async(frontend.languages(req)) assert data["vectordb_langs"] == ["en", "fr"] assert "de" in data["other_langs"] assert "ar" in data["other_langs"] diff --git a/wikidatasearch/dependencies.py b/wikidatasearch/dependencies.py index 35abaea..3625f11 100644 --- a/wikidatasearch/dependencies.py +++ b/wikidatasearch/dependencies.py @@ -1,11 +1,14 @@ """Dependencies for the FastAPI application.""" +import 
base64 +import binascii import time from fastapi import FastAPI, HTTPException, Request from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.errors import RateLimitExceeded +from .config import settings from .services.logger import Logger @@ -27,6 +30,32 @@ def user_agent_key(request: Request) -> str: limiter = Limiter(key_func=user_agent_key) +def verify_admin_auth(request: Request) -> str: + """Verify HTTP Basic auth for the admin page.""" + expected = settings.ANALYTICS_API_SECRET + if not expected: + raise HTTPException(status_code=404, detail="Not found") + + authorization = request.headers.get("authorization", "") + if not authorization.startswith("Basic "): + decoded = None + else: + token = authorization[6:].strip() + try: + payload = base64.b64decode(token).decode("utf-8") + except (binascii.Error, UnicodeDecodeError): + payload = "" + decoded = payload.split(":", 1) if ":" in payload else None + + if not decoded or decoded[1] != expected: + raise HTTPException( + status_code=401, + detail="Incorrect admin credentials", + headers={"WWW-Authenticate": "Basic"}, + ) + return decoded[0] or "admin" + + def require_descriptive_user_agent(request: Request) -> None: """Enforce a descriptive User-Agent and block generic HTTP clients.""" ua = request.headers.get("user-agent", "").strip() @@ -37,6 +66,7 @@ def require_descriptive_user_agent(request: Request) -> None: def _logged_rate_limit_exceeded_handler(request: Request, exc: Exception): + """Custom handler for rate limit breaches that logs the event.""" error = str(exc) or "Rate limit exceeded" Logger.add_request(request, 429, time.time(), error=error) return _rate_limit_exceeded_handler(request, exc) diff --git a/wikidatasearch/main.py b/wikidatasearch/main.py index ea53402..4440e52 100644 --- a/wikidatasearch/main.py +++ b/wikidatasearch/main.py @@ -7,9 +7,10 @@ from gradio.routes import mount_gradio_app from .config import settings -from .dependencies import register_rate_limit +from .dependencies import register_rate_limit, verify_admin_auth from .routes import frontend, health, item, property, similarity -from .services.analytics import build_analytics_app +from .routes.admin import analytics_api_router, build_analytics_app +from .services.logger.database import initialize_database app = FastAPI( title="Wikidata Vector Search", @@ -37,6 +38,7 @@ @app.on_event("startup") async def startup_event(): """Initialize the FastAPI cache at startup.""" + initialize_database() FastAPICache.init(InMemoryBackend(), prefix="wikidata-cache") @@ -50,4 +52,11 @@ async def startup_event(): frontend.mount_static(app) if settings.ANALYTICS_API_SECRET: - mount_gradio_app(app, build_analytics_app(), path=f"/admin/{settings.ANALYTICS_API_SECRET}") + app.include_router(analytics_api_router) + mount_gradio_app( + app, + build_analytics_app(), + path="/admin", + auth_dependency=verify_admin_auth, + auth_message="Provide HTTP Basic auth using ANALYTICS_API_SECRET as the password.", + ) diff --git a/wikidatasearch/routes/admin/__init__.py b/wikidatasearch/routes/admin/__init__.py new file mode 100644 index 0000000..83b2d99 --- /dev/null +++ b/wikidatasearch/routes/admin/__init__.py @@ -0,0 +1,6 @@ +"""Admin route modules exposed by the API package.""" + +from .analytics_api import router as analytics_api_router +from .analytics_ui import build_analytics_app + +__all__ = ["analytics_api_router", "build_analytics_app"] diff --git a/wikidatasearch/routes/admin/analytics_api.py b/wikidatasearch/routes/admin/analytics_api.py new file mode 100644
index 0000000..7f274bc --- /dev/null +++ b/wikidatasearch/routes/admin/analytics_api.py @@ -0,0 +1,118 @@ +"""Admin analytics API routes for the FastAPI application.""" + +import traceback +from datetime import datetime + +from fastapi import APIRouter, Depends, HTTPException, Query + +from ...dependencies import verify_admin_auth +from ...services.logger import AnalyticsQueryService + +router = APIRouter( + prefix="/admin/analytics", + tags=["Admin Analytics"], + dependencies=[Depends(verify_admin_auth)], +) + + +@router.get("/page-views") +def page_views_route( + start: datetime = Query(...), + end: datetime = Query(...), +): + """Return total page views between start and end datetimes.""" + try: + start_utc, end_utc = AnalyticsQueryService.normalize_dt_interval(start, end) + return AnalyticsQueryService.get_page_views(start_utc, end_utc) + except Exception: + traceback.print_exc() + raise HTTPException(status_code=500, detail="Internal Server Error") + + +@router.get("/total-user-agents") +def total_user_agents_route( + start: datetime = Query(...), + end: datetime = Query(...), + requests_threshold: int = Query(0, ge=0), + include_user_agents: bool = Query(False), +): + """Return unique user agents between start and end datetimes.""" + try: + start_utc, end_utc = AnalyticsQueryService.normalize_dt_interval(start, end) + return AnalyticsQueryService.get_total_user_agents( + start_utc, + end_utc, + requests_threshold=requests_threshold, + include_user_agents=include_user_agents, + ) + except Exception: + traceback.print_exc() + raise HTTPException(status_code=500, detail="Internal Server Error") + + +@router.get("/total-requests") +def total_requests_route( + start: datetime = Query(...), + end: datetime = Query(...), +): + """Return total requests between start and end datetimes.""" + try: + start_utc, end_utc = AnalyticsQueryService.normalize_dt_interval(start, end) + return AnalyticsQueryService.get_total_requests(start_utc, end_utc) + except Exception: + traceback.print_exc() + raise HTTPException(status_code=500, detail="Internal Server Error") + + +@router.get("/total-requests-by-lang") +def total_requests_by_lang_route( + start: datetime = Query(...), + end: datetime = Query(...), +): + """Return total requests by language between start and end datetimes.""" + try: + start_utc, end_utc = AnalyticsQueryService.normalize_dt_interval(start, end) + return AnalyticsQueryService.get_total_requests_by_lang(start_utc, end_utc) + except Exception: + traceback.print_exc() + raise HTTPException(status_code=500, detail="Internal Server Error") + + +@router.get("/new-user-agents") +def new_user_agents_route( + start: datetime = Query(...), + end: datetime = Query(...), + include_user_agents: bool = Query(False), +): + """Return newly seen user agents between start and end datetimes.""" + try: + start_utc, end_utc = AnalyticsQueryService.normalize_dt_interval(start, end) + return AnalyticsQueryService.get_new_user_agents( + start_utc, + end_utc, + include_user_agents=include_user_agents, + ) + except Exception: + traceback.print_exc() + raise HTTPException(status_code=500, detail="Internal Server Error") + + +@router.get("/consistent-user-agents") +def consistent_user_agents_route( + start: datetime = Query(...), + end: datetime = Query(...), + consistent_days: int = Query(3, ge=1), + include_user_agents: bool = Query(False), +): + """Return consistent user agents between start and end datetimes.""" + try: + start_utc, end_utc = AnalyticsQueryService.normalize_dt_interval(start, end) + return 
AnalyticsQueryService.get_consistent_user_agents( + start_utc, + end_utc, + consistent_days=consistent_days, + include_user_agents=include_user_agents, + ) + except Exception: + traceback.print_exc() + raise HTTPException(status_code=500, detail="Internal Server Error") diff --git a/wikidatasearch/routes/admin/analytics_ui.py b/wikidatasearch/routes/admin/analytics_ui.py new file mode 100644 index 0000000..81ec68d --- /dev/null +++ b/wikidatasearch/routes/admin/analytics_ui.py @@ -0,0 +1,62 @@ +"""Gradio analytics dashboard for the FastAPI application.""" + +from datetime import datetime, timezone + +import gradio as gr +import pandas as pd + +from ...config import settings +from ...services.logger.gradio_utils import run_query + +ROUTES_CHOICES = ["/", "/item/query/", "/property/query/", "/similarity-score/"] +RERANK_CHOICES = ["any", "true", "false", "unset"] +STATUS_CHOICES = ["200", "400", "422", "429", "500"] +LANG_CHOICES = ["all", "translated"] + settings.VECTORDB_LANGS +CLIENT_CHOICES = ["all", "browser", "api"] +PERIOD_CHOICES = ["Hour", "Day", "Week", "Month"] +GROUP_BY_CHOICES = ["None", "route", "user_agent", "status", "rerank", "lang", "client"] + + +def build_analytics_app(): + """Build and return the Gradio analytics dashboard application.""" + now = datetime.now(tz=timezone.utc).replace(microsecond=0) + default_start = now - pd.Timedelta(days=7) + + with gr.Blocks(title="API Analytics") as demo: + gr.Markdown("## API Analytics") + + with gr.Row(): + start_dt = gr.DateTime(label="Start UTC", value=default_start, interactive=True) + end_dt = gr.DateTime(label="End UTC", value=now, interactive=True) + period = gr.Dropdown(choices=PERIOD_CHOICES, value="Day", label="Time bucket") + group_by = gr.Dropdown(choices=GROUP_BY_CHOICES, value="None", label="Group by") + + with gr.Row(): + route_dd = gr.CheckboxGroup(choices=ROUTES_CHOICES, label="Filter routes", value=[]) + status_dd = gr.CheckboxGroup(choices=STATUS_CHOICES, label="Filter status codes", value=[]) + ua_inc = gr.Textbox(label="User agent contains", placeholder="curl, python-requests, chrome") + client_dd = gr.Dropdown(choices=CLIENT_CHOICES, value="all", label="Client type") + + with gr.Row(): + rerank_dd = gr.Dropdown(choices=RERANK_CHOICES, value="any", label="Filter rerank") + langs_dd = gr.Dropdown(choices=LANG_CHOICES, value=[], multiselect=True, label="Filter lang") + + btn = gr.Button("Refresh", variant="primary") + + ts_plot = gr.Plot(label="Requests over time") + bar_plot = gr.Plot(label="Totals") + table = gr.Dataframe(label="Totals table", interactive=False) + table_stats = gr.Markdown("Rows: 0 | Sum requests: 0") + + inputs = [start_dt, end_dt, period, group_by, route_dd, status_dd, ua_inc, client_dd, rerank_dd, langs_dd] + + btn.click(fn=run_query, inputs=inputs, outputs=[ts_plot, bar_plot, table, table_stats], queue=False) + gr.on( + triggers=[x.change for x in inputs], + fn=run_query, + inputs=inputs, + outputs=[ts_plot, bar_plot, table, table_stats], + queue=False, + ) + + return demo diff --git a/wikidatasearch/routes/frontend.py b/wikidatasearch/routes/frontend.py index 89a87db..32eefad 100644 --- a/wikidatasearch/routes/frontend.py +++ b/wikidatasearch/routes/frontend.py @@ -14,8 +14,8 @@ router = APIRouter(include_in_schema=False) -@limiter.limit(settings.RATE_LIMIT) @router.get("/") +@limiter.limit(settings.RATE_LIMIT) async def root(request: Request, background_tasks: BackgroundTasks): """Serve the frontend index page.""" background_tasks.add_task(Logger.add_request, request, 200, time.time()) @@ -29,7
+29,8 @@ def mount_static(app): @router.get("/languages", summary="Supported languages") @cache(expire=settings.CACHE_TTL) -async def languages(): +@limiter.limit(settings.RATE_LIMIT) +async def languages(request: Request): """Return available vector-database and translated language codes.""" vectordb_langs = set(SEARCH.vectordb_langs) other_langs = set(SEARCH.translator.mint_langs) - vectordb_langs @@ -39,8 +40,8 @@ async def languages(): } -@limiter.limit("10/minute") @router.post("/feedback", include_in_schema=False) +@limiter.limit(settings.RATE_LIMIT) async def feedback( request: Request, query: str = Query(..., examples=["testing"]), diff --git a/wikidatasearch/services/__init__.py b/wikidatasearch/services/__init__.py index 22ca8f3..46b6e3d 100644 --- a/wikidatasearch/services/__init__.py +++ b/wikidatasearch/services/__init__.py @@ -2,7 +2,7 @@ from .search import HybridSearch -__all__ = ["HybridSearch", "Logger", "Feedback", "build_analytics_app"] +__all__ = ["HybridSearch", "Logger", "Feedback"] def __getattr__(name: str): @@ -10,8 +10,4 @@ def __getattr__(name: str): from .logger import Feedback, Logger return {"Logger": Logger, "Feedback": Feedback}[name] - if name == "build_analytics_app": - from .analytics import build_analytics_app - - return build_analytics_app raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/wikidatasearch/services/analytics.py b/wikidatasearch/services/analytics.py deleted file mode 100644 index f794c13..0000000 --- a/wikidatasearch/services/analytics.py +++ /dev/null @@ -1,416 +0,0 @@ -"""Analytics service for the FastAPI application.""" - -import json -from dataclasses import dataclass -from datetime import datetime, timezone -from functools import lru_cache -from typing import Any, List, Literal, Optional, Tuple - -import gradio as gr -import pandas as pd -import plotly.express as px -from sqlalchemy import bindparam, text - -from .logger import engine - -# ---------------------------- -# Constants and types -# ---------------------------- - -Period = Literal["Hour", "Day", "Week", "Month"] -GroupBy = Literal["None", "route", "user_agent", "status", "rerank", "lang", "client"] -PERIOD_FREQ = {"Hour": "H", "Day": "D", "Week": "W", "Month": "M"} -PARAM_KEYS = ("rerank", "lang") - - -@dataclass(frozen=True) -class QueryFilters: - """Structured filters for querying the requests data.""" - - start: datetime - end: datetime - routes: List[str] - statuses: List[int] - ua_include: Optional[str] - ua_exclude: Optional[str] - rerank_filter: Literal["any", "true", "false", "unset"] - langs_filter: List[str] - period: Period - group_by: GroupBy - - -# ---------------------------- -# Time helpers -# ---------------------------- - - -def normalize_dt(val: Any) -> datetime: - """Normalize a datetime-like value to a naive UTC `datetime`.""" - if isinstance(val, datetime): - return val.astimezone(timezone.utc).replace(tzinfo=None) - if isinstance(val, (int, float)): - s = float(val) - # allow ns, us, ms heuristics - if s > 1e14: # ns - s /= 1e9 - elif s > 1e11: # us - s /= 1e6 - elif s > 1e10: # ms - s /= 1e3 - return datetime.utcfromtimestamp(s) - dt = pd.to_datetime(val, utc=True, errors="coerce") - if pd.isna(dt): - raise ValueError("Invalid datetime") - return dt.tz_convert(None).to_pydatetime() - - -def _coerce_parameters(value: Any) -> dict: - if isinstance(value, dict): - return value - if isinstance(value, str) and value: - try: - parsed = json.loads(value) - return parsed if isinstance(parsed, dict) else {} - except 
Exception: - return {} - return {} - - -# ---------------------------- -# Data access -# ---------------------------- - - -def _build_sql_and_params( - start: datetime, - end: datetime, - routes: List[str], - statuses: List[int], - ua_include: Optional[str], -) -> Tuple[Any, dict]: - base = """ - SELECT timestamp, route, user_agent, on_browser, status, parameters - FROM requests - WHERE timestamp BETWEEN :start AND :end - """ - - params: dict = {"start": start, "end": end} - clauses: List[str] = [] - - if routes: - clauses.append("route IN :routes") - if statuses: - clauses.append("status IN :statuses") - if ua_include: - clauses.append("LOWER(user_agent) LIKE :ua_inc") - params["ua_inc"] = f"%{ua_include.lower()}%" - - if clauses: - base += " AND " + " AND ".join(clauses) - - stmt = text(base) - if routes: - stmt = stmt.bindparams(bindparam("routes", expanding=True)) - params["routes"] = list(routes) - if statuses: - stmt = stmt.bindparams(bindparam("statuses", expanding=True)) - params["statuses"] = list(statuses) - - return stmt, params - - -def load_requests_df( - start: datetime, - end: datetime, - routes: List[str], - statuses: List[int], - ua_include: Optional[str], - ua_exclude: Optional[str], -) -> pd.DataFrame: - """Load request logs from the database and apply user-agent exclusion filters.""" - stmt, params = _build_sql_and_params(start, end, routes, statuses, ua_include) - with engine.connect() as conn: - df = pd.read_sql(stmt, conn, params=params, parse_dates=["timestamp"]) - - if df.empty: - return df - - df["status"] = df["status"].astype(int) - - # Exclude UA via pandas for simple contains - if ua_exclude: - mask = ~df["user_agent"].fillna("").str.contains(ua_exclude, case=False, na=False) - df = df[mask] - - return df - - -# ---------------------------- -# Transforms -# ---------------------------- - - -def _extract_params_col(s: pd.Series) -> pd.DataFrame: - """Parse JSON in the `parameters` column and extract `rerank` and `lang`. - - Returns a DataFrame with columns ['rerank','lang'] normalized. 
- """ - - def parse_one(x: Any) -> dict: - d = _coerce_parameters(x) - rerank = str(d.get("rerank")).strip().lower() if "rerank" in d else "" - if rerank == "" or rerank not in ("true", "false"): - rerank = "unset" - lang = d.get("lang") - lang_norm = str(lang).strip() if lang not in (None, "") else "all" - return {"rerank": rerank, "lang": lang_norm} - - parsed = s.map(parse_one) - return pd.DataFrame(list(parsed)) - - -def apply_param_filters(df: pd.DataFrame, rerank_filter: str, langs_filter: List[str]) -> pd.DataFrame: - """Apply `rerank` and `lang` filters parsed from each row's parameters payload.""" - if df.empty: - return df - if "parameters" in df.columns: - extracted = _extract_params_col(df["parameters"]) - df = pd.concat([df.reset_index(drop=True), extracted], axis=1) - else: - df["rerank"], df["lang"] = "unset", "all" - - if rerank_filter in ("true", "false", "unset"): - df = df[df["rerank"] == rerank_filter] - if langs_filter: - df = df[df["lang"].isin(langs_filter)] - return df - - -def aggregate_requests(df: pd.DataFrame, period: Period, group_by: GroupBy) -> pd.DataFrame: - """Aggregate requests by time bucket and optional grouping dimension.""" - if df.empty: - return df - freq = PERIOD_FREQ[period] - df = df.copy() - - # Derived grouping: browser vs API based on UA - if group_by == "client": - if "on_browser" in df.columns: - is_browser = df["on_browser"].fillna(False).astype(bool) - else: - ua = df.get("user_agent", pd.Series(index=df.index, dtype=object)).fillna("") - is_browser = ua.str.contains("mozilla", case=False, na=False) - df["client"] = is_browser.map({True: "browser", False: "api"}) - - df = df.set_index("timestamp") - - if group_by == "None": - out = df.groupby(pd.Grouper(freq=freq)).size().reset_index(name="requests") - else: - out = df.groupby([pd.Grouper(freq=freq), group_by]).size().reset_index(name="requests") - - out = out.rename(columns={"timestamp": "bucket"}) - return out - - -# ---------------------------- -# Charts -# ---------------------------- - - -def empty_ts(group_by: GroupBy): - """Build an empty time-series chart placeholder for no-data scenarios.""" - if group_by == "None": - base = pd.DataFrame({"bucket": [], "requests": []}) - return px.line(base, x="bucket", y="requests", title="No data", markers=True) - base = pd.DataFrame({"bucket": [], "requests": [], group_by: []}) - return px.line(base, x="bucket", y="requests", color=group_by, title="No data", markers=True) - - -def empty_bar(group_by: GroupBy): - """Build an empty bar chart placeholder for no-data scenarios.""" - if group_by == "None": - base = pd.DataFrame({"category": [], "requests": []}) - return px.bar(base, x="category", y="requests", title="No data") - base = pd.DataFrame({group_by: [], "requests": []}) - return px.bar(base, x=group_by, y="requests", title="No data") - - -def make_charts(agg: pd.DataFrame, group_by: GroupBy): - """Generate timeseries and totals charts from aggregated request data.""" - if agg.empty: - return empty_ts(group_by), empty_bar(group_by), pd.DataFrame() - - if group_by == "None": - fig_ts = px.line(agg, x="bucket", y="requests", markers=True, title="Requests over time") - totals = agg[["requests"]].sum().to_frame(name="requests") - totals["category"] = "All" - totals = totals[["category", "requests"]] - fig_bar = px.bar(totals, x="category", y="requests", title="Total requests") - else: - fig_ts = px.line( - agg, x="bucket", y="requests", color=group_by, markers=True, title=f"Requests over time by {group_by}" - ) - totals = 
agg.groupby(group_by)["requests"].sum().sort_values(ascending=False).reset_index() - fig_bar = px.bar(totals, x=group_by, y="requests", title=f"Requests by {group_by}") - return fig_ts, fig_bar, totals - - -# ---------------------------- -# Choice helpers with caching -# ---------------------------- - - -@lru_cache(maxsize=1) -def route_choices(limit: int = 500) -> List[str]: - """Return distinct route values for filter controls.""" - q = text(""" - SELECT DISTINCT route AS v - FROM requests - WHERE route IS NOT NULL AND route != '' - ORDER BY 1 - LIMIT :limit - """) - with engine.connect() as conn: - df = pd.read_sql(q, conn, params={"limit": limit}) - return df["v"].dropna().astype(str).tolist() if not df.empty else [] - - -@lru_cache(maxsize=1) -def status_choices(limit: int = 500) -> List[int]: - """Return distinct HTTP status codes for filter controls.""" - q = text(""" - SELECT DISTINCT status AS v - FROM requests - WHERE status IS NOT NULL - ORDER BY 1 - LIMIT :limit - """) - with engine.connect() as conn: - df = pd.read_sql(q, conn, params={"limit": limit}) - return sorted([int(x) for x in df["v"].tolist()]) if not df.empty else [] - - -@lru_cache(maxsize=1) -def lang_choices(sample: int = 2000) -> List[str]: - """Return recently observed `lang` parameter values for filter controls.""" - q = text(""" - SELECT parameters - FROM requests - WHERE parameters IS NOT NULL - ORDER BY timestamp DESC - LIMIT :sample - """) - with engine.connect() as conn: - df = pd.read_sql(q, conn, params={"sample": sample}) - vals: set[str] = set() - for s in df.get("parameters", []): - d = _coerce_parameters(s) - v = d.get("lang") - if v is not None and str(v).strip() != "": - vals.add(str(v)) - return sorted(vals) - - -# ---------------------------- -# Orchestration -# ---------------------------- - - -def run_query(filters: QueryFilters): - """Execute analytics query pipeline and return chart-ready outputs.""" - # Normalize and validate time - s = normalize_dt(filters.start) - e = normalize_dt(filters.end) - if s > e: - s, e = e, s - - df = load_requests_df( - start=s, - end=e, - routes=filters.routes or [], - statuses=filters.statuses or [], - ua_include=filters.ua_include or "", - ua_exclude=filters.ua_exclude or "", - ) - - df = apply_param_filters(df, filters.rerank_filter or "any", filters.langs_filter or []) - agg = aggregate_requests(df, filters.period, filters.group_by) - fig_ts, fig_bar, totals = make_charts(agg, filters.group_by) - return fig_ts, fig_bar, totals - - -# ---------------------------- -# Public Gradio builder -# ---------------------------- - - -def build_analytics_app(): - """Build and return the Gradio analytics dashboard application.""" - now = datetime.now(tz=timezone.utc).replace(microsecond=0) - default_start = now - pd.Timedelta(days=7) - - with gr.Blocks(title="API Analytics") as demo: - gr.Markdown("## API Analytics") - - with gr.Row(): - start_dt = gr.DateTime(label="Start UTC", value=default_start, interactive=True) - end_dt = gr.DateTime(label="End UTC", value=now, interactive=True) - period = gr.Dropdown(choices=list(PERIOD_FREQ.keys()), value="Day", label="Time bucket") - group_by = gr.Dropdown( - choices=["None", "route", "user_agent", "status", "rerank", "lang", "client"], - value="None", - label="Group by", - ) - - with gr.Row(): - route_dd = gr.CheckboxGroup(choices=route_choices(), label="Filter routes", value=[]) - status_dd = gr.CheckboxGroup( - choices=[str(s) for s in status_choices()], - label="Filter status codes", - value=[], - ) - ua_inc = 
gr.Textbox(label="User agent contains", placeholder="curl, python-requests, chrome") - ua_exc = gr.Textbox(label="User agent does NOT contain", placeholder="bot, uptime, healthcheck") - - with gr.Row(): - rerank_dd = gr.Dropdown(choices=["any", "true", "false", "unset"], value="any", label="Filter rerank") - langs_dd = gr.Dropdown(choices=lang_choices(), value=[], multiselect=True, label="Filter lang") - - btn = gr.Button("Refresh", variant="primary") - - ts_plot = gr.Plot(label="Requests over time") - bar_plot = gr.Plot(label="Totals") - table = gr.Dataframe(label="Totals table", interactive=False) - - def _run( - start, end, period_v, group_by_v, routes, statuses, ua_include, ua_exclude, rerank_filter, langs_filter - ): - f = QueryFilters( - start=start, - end=end, - period=period_v, - group_by=group_by_v, - routes=routes or [], - statuses=[int(x) for x in statuses] if statuses else [], - ua_include=ua_include, - ua_exclude=ua_exclude, - rerank_filter=rerank_filter or "any", - langs_filter=langs_filter or [], - ) - return run_query(f) - - inputs = [start_dt, end_dt, period, group_by, route_dd, status_dd, ua_inc, ua_exc, rerank_dd, langs_dd] - - btn.click(fn=_run, inputs=inputs, outputs=[ts_plot, bar_plot, table], queue=False) - - # Live updates on change without queueing - gr.on( - triggers=[x.change for x in inputs], - fn=_run, - inputs=inputs, - outputs=[ts_plot, bar_plot, table], - queue=False, - ) - - return demo diff --git a/wikidatasearch/services/logger/__init__.py b/wikidatasearch/services/logger/__init__.py new file mode 100644 index 0000000..66b1596 --- /dev/null +++ b/wikidatasearch/services/logger/__init__.py @@ -0,0 +1,6 @@ +"""Logger service for analytics and feedback management.""" + +from .analytics_queries import AnalyticsQueryService +from .database import Feedback, Logger, engine + +__all__ = ["AnalyticsQueryService", "Feedback", "Logger", "engine"] diff --git a/wikidatasearch/services/logger/analytics_queries.py b/wikidatasearch/services/logger/analytics_queries.py new file mode 100644 index 0000000..71d95e8 --- /dev/null +++ b/wikidatasearch/services/logger/analytics_queries.py @@ -0,0 +1,444 @@ +"""Analytics query service for request log analysis.""" + +from datetime import datetime, timezone +from typing import Any, Optional + +import pandas as pd +from sqlalchemy import bindparam, text + +from .database import engine + + +class AnalyticsQueryService: + """Database read/query methods used by analytics.""" + + VECTOR_QUERY_ROUTES_SQL = "('/item/query/', '/property/query/', '/similarity-score/')" + + @staticmethod + def _extract_user_agent_values(df: pd.DataFrame) -> list[str]: + """Extract unique user-agent values from query results.""" + if df.empty or "user_agent_value" not in df.columns: + return [] + + values = df["user_agent_value"].fillna("").astype(str).str.strip().replace("", pd.NA).dropna().unique().tolist() + return sorted(values) + + @classmethod + def query_graph_requests( + cls, + start: datetime, + end: datetime, + routes: list[str], + statuses: list[int], + ua_include: Optional[str], + client_filter: str = "all", + rerank_filter: str = "any", + langs_filter: Optional[list[str]] = None, + group_by: Optional[str] = None, + ) -> pd.DataFrame: + """Load request logs for analytics with filtering applied in SQL. + + The selected columns are minimized based on `group_by` and active filters. 
+ """ + rerank_expr = """ + CASE + WHEN LOWER(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(parameters, '$.rerank')), '')) + IN ('true', 'false') + THEN LOWER(JSON_UNQUOTE(JSON_EXTRACT(parameters, '$.rerank'))) + ELSE 'unset' + END + """ + lang_expr = """ + COALESCE( + NULLIF(LOWER(JSON_UNQUOTE(JSON_EXTRACT(parameters, '$.lang'))), ''), + 'all' + ) + """ + + langs = [str(v).strip().lower() for v in (langs_filter or []) if str(v).strip()] + + select_cols: list[str] = ["timestamp"] + if group_by == "route": + select_cols.append("route") + elif group_by == "user_agent": + select_cols.append("user_agent") + elif group_by == "status": + select_cols.append("status") + elif group_by == "client": + select_cols.append("on_browser") + elif group_by == "rerank": + select_cols.append(f"{rerank_expr} AS rerank") + elif group_by == "lang": + select_cols.append(f"{lang_expr} AS lang") + + base = ( + "SELECT " + + ", ".join(select_cols) + + """ + FROM requests + WHERE timestamp BETWEEN :start AND :end + """ + ) + + params: dict = {"start": start, "end": end} + clauses: list[str] = [] + + if routes: + clauses.append("route IN :routes") + if statuses: + clauses.append("status IN :statuses") + if ua_include: + clauses.append("LOWER(COALESCE(user_agent, '')) LIKE :ua_inc") + params["ua_inc"] = f"%{ua_include.lower()}%" + if client_filter == "browser": + clauses.append("COALESCE(on_browser, 0) = 1") + elif client_filter == "api": + clauses.append("COALESCE(on_browser, 0) = 0") + if rerank_filter in ("true", "false", "unset"): + clauses.append(f"{rerank_expr} = :rerank_filter") + params["rerank_filter"] = rerank_filter + if langs: + clauses.append(f"{lang_expr} IN :langs") + params["langs"] = langs + + if clauses: + base += " AND " + " AND ".join(clauses) + + stmt = text(base) + if routes: + stmt = stmt.bindparams(bindparam("routes", expanding=True)) + params["routes"] = list(routes) + if statuses: + stmt = stmt.bindparams(bindparam("statuses", expanding=True)) + params["statuses"] = list(statuses) + if langs: + stmt = stmt.bindparams(bindparam("langs", expanding=True)) + + with engine.connect() as conn: + df = pd.read_sql(stmt, conn, params=params, parse_dates=["timestamp"]) + + if df.empty or "status" not in df.columns: + return df + + df["status"] = df["status"].astype(int) + return df + + @staticmethod + def get_page_views(start: datetime, end: datetime) -> dict[str, int]: + """Return page views for route '/' between start and end, split by client type (browser vs API).""" + q = text( + """ + SELECT + COALESCE(SUM(CASE WHEN COALESCE(on_browser, 0) = 1 THEN 1 ELSE 0 END), 0) AS browser, + COALESCE(SUM(CASE WHEN COALESCE(on_browser, 0) = 0 THEN 1 ELSE 0 END), 0) AS api + FROM requests + WHERE route = '/' + AND timestamp BETWEEN :start AND :end + """ + ) + + with engine.connect() as conn: + df = pd.read_sql(q, conn, params={"start": start, "end": end}) + + row = df.iloc[0] if not df.empty else {} + browser = row.get("browser", 0) + browser = int(browser) if pd.notna(browser) else 0 + api = row.get("api", 0) + api = int(api) if pd.notna(api) else 0 + return {"browser": browser, "api": api, "total": browser + api} + + @staticmethod + def get_total_user_agents( + start: datetime, + end: datetime, + requests_threshold: int = 0, + include_user_agents: bool = False, + ) -> dict[str, Any]: + """Return unique user agents for vector-query routes between start and end.""" + params = { + "start": start, + "end": end, + "requests_threshold": int(requests_threshold), + } + if include_user_agents: + q = text( + f""" + SELECT + CASE + 
WHEN COALESCE(on_browser, 0) = 1 THEN 'browser' + ELSE 'api' + END AS client, + user_agent_hash, + COALESCE(MAX(NULLIF(user_agent, '')), user_agent_hash) AS user_agent_value + FROM requests + WHERE route IN {AnalyticsQueryService.VECTOR_QUERY_ROUTES_SQL} + AND status NOT IN (400, 422) + AND timestamp BETWEEN :start AND :end + AND user_agent_hash IS NOT NULL + AND user_agent_hash != '' + GROUP BY client, user_agent_hash + HAVING :requests_threshold <= 0 OR COUNT(*) > :requests_threshold + ORDER BY user_agent_hash + """ + ) + with engine.connect() as conn: + df = pd.read_sql(q, conn, params=params) + + out: dict[str, Any] = {"browser": 0, "api": 0, "total": 0} + if df.empty: + out["user_agents"] = [] + return out + + counts = df["client"].astype(str).value_counts() + out["browser"] = int(counts.get("browser", 0)) + out["api"] = int(counts.get("api", 0)) + out["total"] = out["browser"] + out["api"] + out["user_agents"] = AnalyticsQueryService._extract_user_agent_values(df) + return out + + q = text( + f""" + SELECT + COALESCE(SUM(CASE WHEN t.client = 'browser' THEN 1 ELSE 0 END), 0) AS browser, + COALESCE(SUM(CASE WHEN t.client = 'api' THEN 1 ELSE 0 END), 0) AS api + FROM ( + SELECT + CASE + WHEN COALESCE(on_browser, 0) = 1 THEN 'browser' + ELSE 'api' + END AS client, + user_agent_hash + FROM requests + WHERE route IN {AnalyticsQueryService.VECTOR_QUERY_ROUTES_SQL} + AND status NOT IN (400, 422) + AND timestamp BETWEEN :start AND :end + AND user_agent_hash IS NOT NULL + AND user_agent_hash != '' + GROUP BY client, user_agent_hash + HAVING :requests_threshold <= 0 OR COUNT(*) > :requests_threshold + ) AS t + """ + ) + with engine.connect() as conn: + df = pd.read_sql(q, conn, params=params) + + row = df.iloc[0] if not df.empty else {} + browser = row.get("browser", 0) + browser = int(browser) if pd.notna(browser) else 0 + api = row.get("api", 0) + api = int(api) if pd.notna(api) else 0 + return {"browser": browser, "api": api, "total": browser + api} + + @staticmethod + def get_total_requests(start: datetime, end: datetime) -> dict[str, int]: + """Return the number of requests for routes that query the vector database between start and end.""" + q = text( + f""" + SELECT + COALESCE(SUM(CASE WHEN COALESCE(on_browser, 0) = 1 THEN 1 ELSE 0 END), 0) AS browser, + COALESCE(SUM(CASE WHEN COALESCE(on_browser, 0) = 0 THEN 1 ELSE 0 END), 0) AS api + FROM requests + WHERE route IN {AnalyticsQueryService.VECTOR_QUERY_ROUTES_SQL} + AND status NOT IN (400, 422) + AND timestamp BETWEEN :start AND :end + """ + ) + + with engine.connect() as conn: + df = pd.read_sql(q, conn, params={"start": start, "end": end}) + + row = df.iloc[0] if not df.empty else {} + browser = row.get("browser", 0) + browser = int(browser) if pd.notna(browser) else 0 + api = row.get("api", 0) + api = int(api) if pd.notna(api) else 0 + return {"browser": browser, "api": api, "total": browser + api} + + @staticmethod + def get_total_requests_by_lang(start: datetime, end: datetime) -> dict[str, int]: + """Return API request counts for vector database query routes, grouped by requested language.""" + q = text( + f""" + SELECT + COALESCE( + NULLIF(JSON_UNQUOTE(JSON_EXTRACT(parameters, '$.lang')), ''), + 'all' + ) AS lang, + COUNT(*) AS requests + FROM requests + WHERE route IN {AnalyticsQueryService.VECTOR_QUERY_ROUTES_SQL} + AND status NOT IN (400, 422) + AND COALESCE(on_browser, 0) = 0 + AND timestamp BETWEEN :start AND :end + GROUP BY lang + ORDER BY requests DESC + """ + ) + + with engine.connect() as conn: + df = pd.read_sql(q, conn, 
params={"start": start, "end": end}) + + out: dict[str, int] = {"total": 0} + if df.empty: + return out + + lang_df = df.assign( + lang=df["lang"].astype(str), + requests=pd.to_numeric(df["requests"], errors="coerce").fillna(0).astype(int), + ) + out.update({row.lang: int(row.requests) for row in lang_df.itertuples(index=False)}) + out["total"] = int(lang_df["requests"].sum()) + return out + + @staticmethod + def get_new_user_agents( + start: datetime, + end: datetime, + include_user_agents: bool = False, + ) -> dict[str, Any]: + """Return new user agents between start and end and optionally list original values.""" + params = {"start": start, "end": end} + if include_user_agents: + q = text( + f""" + SELECT + user_agent_hash, + COALESCE(MAX(NULLIF(user_agent, '')), user_agent_hash) AS user_agent_value + FROM requests + WHERE route IN {AnalyticsQueryService.VECTOR_QUERY_ROUTES_SQL} + AND status NOT IN (400, 422) + AND timestamp <= :end + AND user_agent_hash IS NOT NULL + AND user_agent_hash != '' + GROUP BY user_agent_hash + HAVING + SUM(CASE WHEN timestamp BETWEEN :start AND :end THEN 1 ELSE 0 END) > 0 + AND SUM(CASE WHEN timestamp < :start THEN 1 ELSE 0 END) = 0 + ORDER BY user_agent_hash + """ + ) + with engine.connect() as conn: + df = pd.read_sql(q, conn, params=params) + + out = {"total": len(df.index)} + out["user_agents"] = AnalyticsQueryService._extract_user_agent_values(df) + return out + + q = text( + f""" + SELECT COUNT(*) AS total + FROM ( + SELECT user_agent_hash + FROM requests + WHERE route IN {AnalyticsQueryService.VECTOR_QUERY_ROUTES_SQL} + AND status NOT IN (400, 422) + AND timestamp <= :end + AND user_agent_hash IS NOT NULL + AND user_agent_hash != '' + GROUP BY user_agent_hash + HAVING + SUM(CASE WHEN timestamp BETWEEN :start AND :end THEN 1 ELSE 0 END) > 0 + AND SUM(CASE WHEN timestamp < :start THEN 1 ELSE 0 END) = 0 + ) AS t + """ + ) + with engine.connect() as conn: + df = pd.read_sql(q, conn, params=params) + + row = df.iloc[0] if not df.empty else {} + total = row.get("total", 0) + total = int(total) if pd.notna(total) else 0 + return {"total": total} + + @staticmethod + def get_consistent_user_agents( + start: datetime, + end: datetime, + consistent_days: int = 3, + include_user_agents: bool = False, + ) -> dict[str, Any]: + """Return consistent user agents and optionally list original values.""" + min_days = max(1, int(consistent_days)) + + params = {"start": start, "end": end, "min_days": min_days} + if include_user_agents: + q = text( + f""" + SELECT + user_agent_hash, + COALESCE(MAX(NULLIF(user_agent, '')), user_agent_hash) AS user_agent_value + FROM requests + WHERE route IN {AnalyticsQueryService.VECTOR_QUERY_ROUTES_SQL} + AND status NOT IN (400, 422) + AND timestamp <= :end + AND user_agent_hash IS NOT NULL + AND user_agent_hash != '' + GROUP BY user_agent_hash + HAVING + SUM(CASE WHEN timestamp BETWEEN :start AND :end THEN 1 ELSE 0 END) > 0 + AND COUNT(DISTINCT DATE(timestamp)) >= :min_days + ORDER BY user_agent_hash + """ + ) + with engine.connect() as conn: + df = pd.read_sql(q, conn, params=params) + + out = {"total": len(df.index)} + out["user_agents"] = AnalyticsQueryService._extract_user_agent_values(df) + return out + + q = text( + f""" + SELECT COUNT(*) AS total + FROM ( + SELECT user_agent_hash + FROM requests + WHERE route IN {AnalyticsQueryService.VECTOR_QUERY_ROUTES_SQL} + AND status NOT IN (400, 422) + AND timestamp <= :end + AND user_agent_hash IS NOT NULL + AND user_agent_hash != '' + GROUP BY user_agent_hash + HAVING + SUM(CASE WHEN 
timestamp BETWEEN :start AND :end THEN 1 ELSE 0 END) > 0 + AND COUNT(DISTINCT DATE(timestamp)) >= :min_days + ) AS t + """ + ) + with engine.connect() as conn: + df = pd.read_sql(q, conn, params=params) + + row = df.iloc[0] if not df.empty else {} + total = row.get("total", 0) + total = int(total) if pd.notna(total) else 0 + return {"total": total} + + @staticmethod + def normalize_dt_interval(start: Any, end: Any) -> tuple[datetime, datetime]: + """Normalize and validate start and end datetimes for queries.""" + if isinstance(start, datetime): + start = start.astimezone(timezone.utc).replace(tzinfo=None) + if isinstance(end, datetime): + end = end.astimezone(timezone.utc).replace(tzinfo=None) + + if isinstance(start, (int, float)): + start = datetime.utcfromtimestamp(float(start)) + if isinstance(end, (int, float)): + end = datetime.utcfromtimestamp(float(end)) + + if pd.isna(start): + # lowest time ever + start = datetime(1970, 1, 1) + if pd.isna(end): + # highest time ever + end = datetime(3000, 1, 1) + + start = pd.to_datetime(start, utc=True, errors="coerce") + end = pd.to_datetime(end, utc=True, errors="coerce") + + start = start.tz_convert(None).to_pydatetime() + end = end.tz_convert(None).to_pydatetime() + + if start > end: + return end, start + return start, end diff --git a/wikidatasearch/services/logger.py b/wikidatasearch/services/logger/database.py similarity index 95% rename from wikidatasearch/services/logger.py rename to wikidatasearch/services/logger/database.py index 1a4a83b..af50032 100644 --- a/wikidatasearch/services/logger.py +++ b/wikidatasearch/services/logger/database.py @@ -20,7 +20,7 @@ from sqlalchemy.dialects.mysql import JSON from sqlalchemy.orm import declarative_base, sessionmaker -from ..config import settings +from ...config import settings """ MySQL database setup for storing Wikidata labels in all languages. 
@@ -201,4 +201,11 @@ def add_feedback(query, qid, sentiment, index): traceback.print_exc() -Base.metadata.create_all(bind=engine) +def initialize_database(): + """Create tables if they do not already exist.""" + try: + Base.metadata.create_all(engine) + return True + except Exception as e: + print(f"Error while initializing labels database: {e}") + return False diff --git a/wikidatasearch/services/logger/gradio_utils.py b/wikidatasearch/services/logger/gradio_utils.py new file mode 100644 index 0000000..ef90c42 --- /dev/null +++ b/wikidatasearch/services/logger/gradio_utils.py @@ -0,0 +1,132 @@ +"""Utility functions for Gradio analytics interface.""" + +from typing import Literal + +import pandas as pd +import plotly.express as px + +from .analytics_queries import AnalyticsQueryService + +Period = Literal["Hour", "Day", "Week", "Month"] +GroupBy = Literal["None", "route", "user_agent", "status", "rerank", "lang", "client"] +PERIOD_FREQ = {"Hour": "H", "Day": "D", "Week": "W", "Month": "M"} + + +def aggregate_requests(df: pd.DataFrame, period: Period, group_by: GroupBy) -> pd.DataFrame: + """Aggregate requests by time bucket and optional grouping dimension.""" + if df.empty: + return df + freq = PERIOD_FREQ[period] + df = df.copy() + + if group_by == "client": + if "on_browser" in df.columns: + is_browser = df["on_browser"].fillna(False).astype(bool) + else: + ua = df.get("user_agent", pd.Series(index=df.index, dtype=object)).fillna("") + is_browser = ua.str.contains("mozilla", case=False, na=False) + df["client"] = is_browser.map({True: "browser", False: "api"}) + + df = df.set_index("timestamp") + + if group_by == "None": + out = df.groupby(pd.Grouper(freq=freq)).size().reset_index(name="requests") + else: + out = df.groupby([pd.Grouper(freq=freq), group_by]).size().reset_index(name="requests") + + return out.rename(columns={"timestamp": "bucket"}) + + +def empty_ts(group_by: GroupBy): + """Build an empty time-series chart placeholder for no-data scenarios.""" + if group_by == "None": + base = pd.DataFrame({"bucket": [], "requests": []}) + return px.line(base, x="bucket", y="requests", title="No data", markers=True) + base = pd.DataFrame({"bucket": [], "requests": [], group_by: []}) + return px.line(base, x="bucket", y="requests", color=group_by, title="No data", markers=True) + + +def empty_bar(group_by: GroupBy): + """Build an empty bar chart placeholder for no-data scenarios.""" + if group_by == "None": + base = pd.DataFrame({"category": [], "requests": []}) + return px.bar(base, x="category", y="requests", title="No data") + base = pd.DataFrame({group_by: [], "requests": []}) + return px.bar(base, x=group_by, y="requests", title="No data") + + +def make_charts(agg: pd.DataFrame, group_by: GroupBy): + """Generate timeseries and totals charts from aggregated request data.""" + if agg.empty: + return empty_ts(group_by), empty_bar(group_by), pd.DataFrame() + + if group_by == "None": + fig_ts = px.line(agg, x="bucket", y="requests", markers=True, title="Requests over time") + totals = agg[["requests"]].sum().to_frame(name="requests") + totals["category"] = "All" + totals = totals[["category", "requests"]] + fig_bar = px.bar(totals, x="category", y="requests", title="Total requests") + else: + fig_ts = px.line( + agg, + x="bucket", + y="requests", + color=group_by, + markers=True, + title=f"Requests over time by {group_by}", + ) + totals = agg.groupby(group_by)["requests"].sum().sort_values(ascending=False).reset_index() + fig_bar = px.bar(totals, x=group_by, y="requests", title=f"Requests 
by {group_by}") + return fig_ts, fig_bar, totals + + +def summarize_totals_table(totals: pd.DataFrame) -> str: + """Generate a summary string for the totals table.""" + rows = len(totals.index) + if rows == 0 or "requests" not in totals.columns: + return "Rows: 0 | Sum requests: 0" + + requests_sum = int(pd.to_numeric(totals["requests"], errors="coerce").fillna(0).sum()) + return f"Rows: {rows} | Sum requests: {requests_sum}" + + +def run_query( + start, + end, + period_v, + group_by_v, + routes, + statuses, + ua_include, + client_filter, + rerank_filter, + langs_filter, +): + """Execute analytics query pipeline and return chart-ready outputs.""" + start, end = AnalyticsQueryService.normalize_dt_interval(start, end) + + normalized_routes = routes or [] + normalized_statuses = [int(x) for x in statuses] if statuses else [] + normalized_ua = ua_include or "" + normalized_client = client_filter or "all" + normalized_rerank = rerank_filter or "any" + normalized_langs = langs_filter or [] + normalized_group_by = group_by_v + + df = AnalyticsQueryService.query_graph_requests( + start=start, + end=end, + routes=normalized_routes, + statuses=normalized_statuses, + ua_include=normalized_ua, + client_filter=normalized_client, + rerank_filter=normalized_rerank, + langs_filter=normalized_langs, + group_by=normalized_group_by, + ) + + agg = aggregate_requests(df, period_v, normalized_group_by) + fig_ts, fig_bar, totals = make_charts(agg, normalized_group_by) + + totals_summary = summarize_totals_table(totals) + return fig_ts, fig_bar, totals, totals_summary
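Reviewer note: a minimal client-side sketch (not part of this patch) of the Basic-auth contract that verify_admin_auth enforces on /admin and the /admin/analytics/* routes. The username is arbitrary (it only becomes the returned identity, defaulting to "admin"); the password must equal ANALYTICS_API_SECRET. The base URL and secret below are assumed placeholders, and the requests library is assumed available (any HTTP client works).

import requests

BASE_URL = "http://localhost:8000"  # hypothetical local deployment
SECRET = "change-me"                # must match settings.ANALYTICS_API_SECRET

resp = requests.get(
    f"{BASE_URL}/admin/analytics/page-views",
    # FastAPI parses the `start`/`end` Query(datetime) params from ISO strings
    params={"start": "2026-04-01T00:00:00", "end": "2026-04-23T00:00:00"},
    auth=("admin", SECRET),  # username is not checked; only the password is compared
    timeout=10,
)
resp.raise_for_status()
print(resp.json())  # expected shape: {"browser": ..., "api": ..., "total": ...}

A missing or wrong password yields 401 with a WWW-Authenticate: Basic header; if ANALYTICS_API_SECRET is unset, the routes answer 404 as if they did not exist.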
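The user-agent fallback that the new unit tests pin down comes from the recurring SQL expression COALESCE(MAX(NULLIF(user_agent, '')), user_agent_hash): report the original user-agent string when one was stored, otherwise fall back to its hash. A small pandas sketch of the same rule, on made-up rows (hypothetical data, not from the patch):

import pandas as pd

# Hypothetical rows mirroring the two relevant columns of the `requests` table.
df = pd.DataFrame(
    [
        {"user_agent_hash": "hash_a", "user_agent": "WikiBot/1.0"},
        {"user_agent_hash": "hash_b", "user_agent": ""},  # original UA not stored
    ]
)

# Same fallback as COALESCE(MAX(NULLIF(user_agent, '')), user_agent_hash):
# empty strings become NA, then the hash fills the gap.
df["user_agent_value"] = df["user_agent"].replace("", pd.NA).fillna(df["user_agent_hash"])
print(df["user_agent_value"].tolist())  # ['WikiBot/1.0', 'hash_b']

This is why the tests expect hash_b to appear verbatim in the user_agents lists: for rows whose original user-agent was never recorded, the hash is the only recoverable identifier.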