Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ releases are available on [PyPI](https://pypi.org/project/pytask) and

## Unreleased

- Nothing yet.
- {pull}`766` moves runtime profiling persistence from SQLite to a JSON snapshot plus
append-only journal in `.pytask/`, keeping runtime data resilient to crashes and
compacted on normal build exits.

## 0.5.8 - 2025-12-30

Expand Down
2 changes: 1 addition & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ test-cov *FLAGS:

# Run type checking
typing:
uv run --group typing --group test --isolated ty check src/ tests/
uv run --group typing --group test --isolated ty check

# Run linting
lint:
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies = [
"attrs>=21.3.0",
"click>=8.1.8,!=8.2.0",
"click-default-group>=1.2.4",
"msgspec>=0.18.6",
"networkx>=2.4.0",
"optree>=0.9.0",
"packaging>=23.0.0",
Expand Down Expand Up @@ -178,6 +179,7 @@ include = [
unused-ignore-comment = "ignore"

[tool.ty.src]
include = ["src", "tests"]
exclude = ["src/_pytask/_hashlib.py"]

[tool.ty.terminal]
Expand Down
55 changes: 55 additions & 0 deletions src/_pytask/journal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Helpers for append-only JSONL journals."""

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING
from typing import Generic
from typing import TypeVar

import msgspec

if TYPE_CHECKING:
from pathlib import Path

T = TypeVar("T")


@dataclass(frozen=True)
class JsonlJournal(Generic[T]):
    """Append-only JSONL journal with best-effort crash recovery.

    Each entry occupies one JSON-encoded line. A torn trailing write (e.g.
    from a crash mid-append) is handled on read by truncating the file back
    to the last fully decodable line.
    """

    # Location of the journal file on disk.
    path: Path
    # msgspec-decodable type of every journal entry.
    type_: type[T]

    def append(self, payload: msgspec.Struct) -> None:
        """Serialize *payload* and append it as a single line."""
        encoded = msgspec.json.encode(payload) + b"\n"
        with self.path.open("ab") as handle:
            handle.write(encoded)

    def read(self) -> list[T]:
        """Return all decodable entries.

        On the first undecodable line the file is truncated at that line's
        start and the entries collected so far are returned, so later
        appends continue from a clean state.
        """
        if not self.path.exists():
            return []

        raw = self.path.read_bytes()
        decoded: list[T] = []
        consumed = 0
        for chunk in raw.splitlines(keepends=True):
            payload = chunk.strip()
            if payload:
                try:
                    decoded.append(msgspec.json.decode(payload, type=self.type_))
                except msgspec.DecodeError:
                    # Drop the corrupt tail; keep the valid prefix.
                    with self.path.open("rb+") as handle:
                        handle.truncate(consumed)
                    return decoded
            consumed += len(chunk)
        return decoded

    def delete(self) -> None:
        """Remove the journal file if present."""
        if self.path.exists():
            self.path.unlink()
100 changes: 45 additions & 55 deletions src/_pytask/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,18 @@
import sys
import time
from contextlib import suppress
from dataclasses import dataclass
from typing import TYPE_CHECKING
from typing import Any

import click
from rich.table import Table
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column

from _pytask.click import ColoredCommand
from _pytask.click import EnumChoice
from _pytask.console import console
from _pytask.console import format_task_name
from _pytask.dag import create_dag
from _pytask.database_utils import BaseTable
from _pytask.database_utils import DatabaseSession
from _pytask.exceptions import CollectionError
from _pytask.exceptions import ConfigurationError
from _pytask.node_protocols import PPathNode
Expand All @@ -31,6 +28,7 @@
from _pytask.outcomes import TaskOutcome
from _pytask.pluginmanager import hookimpl
from _pytask.pluginmanager import storage
from _pytask.runtime_store import RuntimeState
from _pytask.session import Session
from _pytask.traceback import Traceback

Expand All @@ -48,16 +46,6 @@ class _ExportFormats(enum.Enum):
CSV = "csv"


class Runtime(BaseTable):
"""Record of runtimes of tasks."""

__tablename__ = "runtime"

task: Mapped[str] = mapped_column(primary_key=True)
date: Mapped[float]
duration: Mapped[float]


@hookimpl(tryfirst=True)
def pytask_extend_command_line_interface(cli: click.Group) -> None:
"""Extend the command line interface."""
Expand All @@ -67,8 +55,10 @@ def pytask_extend_command_line_interface(cli: click.Group) -> None:
@hookimpl
def pytask_post_parse(config: dict[str, Any]) -> None:
"""Register the export option."""
runtime_state = RuntimeState.from_root(config["root"])
config["pm"].register(ProfilePlugin(runtime_state))
config["pm"].register(DurationNameSpace(runtime_state))
config["pm"].register(ExportNameSpace)
config["pm"].register(DurationNameSpace)
config["pm"].register(FileSizeNameSpace)


Expand All @@ -82,27 +72,50 @@ def pytask_execute_task(task: PTask) -> Generator[None, None, None]:
return result


@hookimpl
def pytask_execute_task_process_report(report: ExecutionReport) -> None:
"""Store runtime of successfully finishing tasks in database."""
task = report.task
duration = task.attributes.get("duration")
if report.outcome == TaskOutcome.SUCCESS and duration is not None:
_create_or_update_runtime(task.signature, *duration)
@dataclass
class ProfilePlugin:
    """Collect and persist runtime profiling data."""

    # Shared runtime store; updated per finished task, flushed at shutdown.
    runtime_state: RuntimeState

    @hookimpl
    def pytask_execute_task_process_report(
        self, session: Session, report: ExecutionReport
    ) -> None:
        """Store runtime of successfully finishing tasks."""
        _ = session  # required by the hook signature, unused here
        task = report.task
        # ``duration`` is a (start, end) pair — it is unpacked into
        # ``RuntimeState.update_task(task, start, end)`` below.
        duration = task.attributes.get("duration")
        if report.outcome == TaskOutcome.SUCCESS and duration is not None:
            self.runtime_state.update_task(task, *duration)

    @hookimpl
    def pytask_unconfigure(self, session: Session) -> None:
        """Flush runtime information on normal build exits."""
        # Only compact after a real build run.
        if session.config.get("command") != "build":
            return
        # NOTE(review): dry-run/--explain presumably execute no tasks, so
        # there is nothing new to persist — confirm against the runner.
        if session.config.get("dry_run") or session.config.get("explain"):
            return
        self.runtime_state.flush()

def _create_or_update_runtime(task_signature: str, start: float, end: float) -> None:
"""Create or update a runtime entry."""
with DatabaseSession() as session:
runtime = session.get(Runtime, task_signature)

if not runtime:
session.add(Runtime(task=task_signature, date=start, duration=end - start))
else:
for attr, val in (("date", start), ("duration", end - start)):
setattr(runtime, attr, val)
@dataclass
class DurationNameSpace:
    """A namespace for adding durations to the profile."""

    # Shared runtime store providing per-task durations. Declared as a
    # dataclass field (like ProfilePlugin) instead of a hand-written
    # ``__init__``, which would silently override the generated one and
    # defeat the ``@dataclass`` decorator.
    runtime_state: RuntimeState

    @hookimpl
    def pytask_profile_add_info_on_task(
        self, session: Session, tasks: list[PTask], profile: dict[str, dict[str, Any]]
    ) -> None:
        """Add the runtime for tasks to the profile."""
        _ = session  # required by the hook signature, unused here
        for task in tasks:
            duration = self.runtime_state.get_duration(task)
            if duration is not None:
                profile[task.name]["Duration (in s)"] = round(duration, 2)


@click.command(cls=ColoredCommand)
Expand Down Expand Up @@ -183,29 +196,6 @@ def _print_profile_table(
console.print("No information is stored on the collected tasks.")


class DurationNameSpace:
"""A namespace for adding durations to the profile."""

@staticmethod
@hookimpl
def pytask_profile_add_info_on_task(
tasks: list[PTask], profile: dict[str, dict[str, Any]]
) -> None:
"""Add the runtime for tasks to the profile."""
runtimes = _collect_runtimes(tasks)
for name, duration in runtimes.items():
profile[name]["Duration (in s)"] = round(duration, 2)


def _collect_runtimes(tasks: list[PTask]) -> dict[str, float]:
"""Collect runtimes."""
with DatabaseSession() as session:
runtimes = [session.get(Runtime, task.signature) for task in tasks]
return {
task.name: r.duration for task, r in zip(tasks, runtimes, strict=False) if r
}


class FileSizeNameSpace:
"""A namespace for adding the total file size of products to a task."""

Expand Down
138 changes: 138 additions & 0 deletions src/_pytask/runtime_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""Runtime storage with an append-only journal."""

from __future__ import annotations

from dataclasses import dataclass
from dataclasses import field
from typing import TYPE_CHECKING

import msgspec

from _pytask.journal import JsonlJournal

if TYPE_CHECKING:
from pathlib import Path

from _pytask.node_protocols import PTask


class _RuntimeEntry(msgspec.Struct):
    """One task's recorded runtime in the snapshot."""

    # Task identifier — the task's name (see ``RuntimeState.update_task``).
    id: str
    # Start timestamp of the recorded run (``start`` passed to
    # ``RuntimeState.update_task``).
    date: float
    # Wall-clock duration in seconds (``end - start``).
    duration: float


class _RuntimeFile(msgspec.Struct, forbid_unknown_fields=False):
    """Schema of the ``runtimes.json`` snapshot; unknown keys are tolerated."""

    # All recorded task runtimes. The singular field name mirrors the JSON
    # key ``"task"`` emitted by msgspec.
    task: list[_RuntimeEntry] = msgspec.field(default_factory=list)


class _RuntimeJournalEntry(msgspec.Struct, forbid_unknown_fields=False):
    """One line of the runtime journal.

    Currently identical in shape to ``_RuntimeEntry`` but kept as a separate
    type — presumably so the journal and snapshot schemas can evolve
    independently.
    """

    # Task identifier (task name).
    id: str
    # Start timestamp of the run.
    date: float
    # Duration of the run in seconds.
    duration: float


def _read_runtimes(path: Path) -> _RuntimeFile | None:
    """Load the runtime snapshot, discarding it when corrupt.

    Returns ``None`` when the file is absent or undecodable; a corrupt
    snapshot is deleted so the caller starts from a clean slate.
    """
    if not path.exists():
        return None
    raw = path.read_bytes()
    try:
        return msgspec.json.decode(raw, type=_RuntimeFile)
    except msgspec.DecodeError:
        path.unlink()
        return None


def _write_runtimes(path: Path, runtimes: _RuntimeFile) -> None:
    """Atomically replace the snapshot via a temporary sibling file."""
    staging = path.with_suffix(f"{path.suffix}.tmp")
    staging.write_bytes(msgspec.json.encode(runtimes))
    # Path.replace has os.replace semantics, so readers never observe a
    # partially written snapshot.
    staging.replace(path)


def _read_journal(
    journal: JsonlJournal[_RuntimeJournalEntry],
) -> list[_RuntimeJournalEntry]:
    """Read all recoverable entries from the runtime journal.

    Thin wrapper over ``JsonlJournal.read``, which truncates the journal at
    the first corrupt line and returns the valid prefix.
    """
    return journal.read()


def _apply_journal(
    runtimes: _RuntimeFile, entries: list[_RuntimeJournalEntry]
) -> _RuntimeFile:
    """Overlay journal entries onto the snapshot; the last write per id wins.

    Returns *runtimes* unchanged when the journal is empty, otherwise a new
    ``_RuntimeFile`` with journal entries merged in.
    """
    if not entries:
        return runtimes
    merged = {record.id: record for record in runtimes.task}
    merged.update(
        {
            record.id: _RuntimeEntry(
                id=record.id, date=record.date, duration=record.duration
            )
            for record in entries
        }
    )
    return _RuntimeFile(task=list(merged.values()))


@dataclass
class RuntimeState:
    """In-memory view of task runtimes backed by a snapshot plus journal.

    Loading merges the last compacted snapshot (``runtimes.json``) with any
    entries appended to the journal since. ``flush`` compacts the merged
    state back into the snapshot and deletes the journal.
    """

    # Location of the JSON snapshot file.
    path: Path
    # Merged snapshot + journal content.
    runtimes: _RuntimeFile
    # Append-only journal receiving one entry per finished task.
    journal: JsonlJournal[_RuntimeJournalEntry]
    # Fast lookup from task id to its runtime entry.
    _index: dict[str, _RuntimeEntry] = field(init=False, default_factory=dict)
    # True when the in-memory state differs from the snapshot on disk.
    _dirty: bool = field(init=False, default=False)

    def __post_init__(self) -> None:
        self._rebuild_index()

    @classmethod
    def from_root(cls, root: Path) -> RuntimeState:
        """Load runtime state stored under ``root / ".pytask"``.

        Creates the storage directory if it is missing so that later journal
        appends and flushes cannot fail with ``FileNotFoundError`` on a
        fresh project.
        """
        path = root / ".pytask" / "runtimes.json"
        path.parent.mkdir(parents=True, exist_ok=True)
        journal = JsonlJournal(
            path=path.with_suffix(".journal"), type_=_RuntimeJournalEntry
        )
        snapshot = _read_runtimes(path)
        journal_entries = _read_journal(journal)
        runtimes = _apply_journal(
            snapshot if snapshot is not None else _RuntimeFile(task=[]),
            journal_entries,
        )
        state = cls(path=path, runtimes=runtimes, journal=journal)
        # Surviving journal entries mean the snapshot on disk is stale, so
        # the next flush must compact.
        if journal_entries:
            state._dirty = True
        return state

    def _rebuild_index(self) -> None:
        """Recompute the id -> entry lookup from ``runtimes``."""
        self._index = {entry.id: entry for entry in self.runtimes.task}

    def update_task(self, task: PTask, start: float, end: float) -> None:
        """Record a finished task's runtime and journal it immediately."""
        # NOTE(review): keyed by ``task.name`` whereas the previous storage
        # keyed by ``task.signature`` — confirm names are stable enough.
        entry = _RuntimeEntry(id=task.name, date=start, duration=end - start)
        self._index[entry.id] = entry
        # Keep the public ``runtimes`` view in sync with the index. The
        # index itself is already up to date, so no rebuild is needed.
        self.runtimes = _RuntimeFile(task=list(self._index.values()))
        self.journal.append(
            _RuntimeJournalEntry(id=entry.id, date=entry.date, duration=entry.duration)
        )
        self._dirty = True

    def get_duration(self, task: PTask) -> float | None:
        """Return the last recorded duration for *task*, or ``None``."""
        entry = self._index.get(task.name)
        return None if entry is None else entry.duration

    def flush(self) -> None:
        """Compact in-memory state into the snapshot and clear the journal."""
        if not self._dirty:
            return
        _write_runtimes(self.path, self.runtimes)
        self.journal.delete()
        self._dirty = False
Loading