Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
46bc255
Implement a pending status for tasks.
tobiasraabe May 4, 2024
7131a1f
Leftover commit.
tobiasraabe May 7, 2024
b73ed65
Add correct pending status for cf and loky.
tobiasraabe May 7, 2024
90e05a2
Fix tests.
tobiasraabe May 7, 2024
1e2006c
Fix.
tobiasraabe May 7, 2024
89ff770
fix.
tobiasraabe May 7, 2024
ba20e22
Fix types.
tobiasraabe May 7, 2024
53097ce
Merge branch 'main' into pending-status
tobiasraabe May 20, 2024
fde5668
Merge branch 'main' into pending-status
tobiasraabe Jul 27, 2025
1e2c9d2
Finalize pending status.
tobiasraabe Jul 27, 2025
a14289d
Install pytask from github.
tobiasraabe Jul 27, 2025
970b94c
Merge main into pending-status
tobiasraabe Jan 31, 2026
79c1137
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 31, 2026
b993e76
Switch pending-status tracking to a start queue
tobiasraabe Jan 31, 2026
3aa12ff
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 31, 2026
36b0749
Use a simple status queue for threads
tobiasraabe Jan 31, 2026
6183c1b
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 31, 2026
a9439b9
Drop pytask git override now in release
tobiasraabe Jan 31, 2026
a07a356
Keep task submission count tied to worker availability
tobiasraabe Jan 31, 2026
ae30df7
Add demo script for pending/running status
tobiasraabe Jan 31, 2026
a29427b
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 31, 2026
42de1cc
Add debug status logs for demo
tobiasraabe Jan 31, 2026
f8c3fab
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 31, 2026
6812a12
Prefetch tasks for pending status
tobiasraabe Jan 31, 2026
8fd6d9c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 31, 2026
f3abd8a
Fix pre-commit for demo script
tobiasraabe Jan 31, 2026
0fcd62d
Remove demo scripts and fix priority queue
tobiasraabe Jan 31, 2026
74748f2
Pin Sphinx below 9 for docs
tobiasraabe Jan 31, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions docs/source/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and
or processes automatically.
- {pull}`96` handles local paths with remote executors. `PathNode`s are not supported as
dependencies or products (except for return annotations).
- {pull}`99` changes that all tasks that are ready are being scheduled. It improves
interactions with adaptive scaling. {issue}`98` does handle the resulting issues: no
strong adherence to priorities, no pending status.
- {pull}`99` changes that all ready tasks are being scheduled. It improves interactions
with adaptive scaling. {issue}`98` does handle the resulting issues: no strong
adherence to priorities, no pending status.
- {pull}`100` adds project management with rye.
- {pull}`101` adds syncing for local paths as dependencies or products in remote
environments with the same OS.
- {pull}`102` implements a pending status for scheduled but not started tasks.
- {pull}`106` fixes {pull}`99` such that only when there are coiled functions, all ready
tasks are submitted.
- {pull}`107` removes status from `pytask_execute_task_log_start` hook call.
Expand Down
2 changes: 1 addition & 1 deletion docs/source/coiled.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# coiled

```{caution}
Currently, the coiled backend can only be used if your workflow code is organized in a
Currently, the coiled backend can only be used if your workflow code is organized as a
package due to how pytask imports your code and dask serializes task functions
([issue](https://github.com/dask/distributed/issues/8607)).
```
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ docs = [
"matplotlib",
"myst-parser",
"nbsphinx",
"sphinx",
"sphinx<9",
"sphinx-autobuild",
"sphinx-click",
"sphinx-copybutton",
Expand Down
223 changes: 191 additions & 32 deletions src/pytask_parallel/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@

from __future__ import annotations

import os
import queue
import sys
import time
from collections import deque
from contextlib import ExitStack
from multiprocessing import Manager
from typing import TYPE_CHECKING
from typing import Any
from typing import cast
Expand All @@ -17,13 +22,15 @@
from pytask import PTask
from pytask import PythonNode
from pytask import Session
from pytask import TaskExecutionStatus
from pytask import console
from pytask import get_marks
from pytask import hookimpl
from pytask.tree_util import PyTree
from pytask.tree_util import tree_map
from pytask.tree_util import tree_structure

from pytask_parallel.backends import ParallelBackend
from pytask_parallel.backends import WorkerType
from pytask_parallel.backends import registry
from pytask_parallel.typing import CarryOverPath
Expand All @@ -33,7 +40,9 @@
from pytask_parallel.utils import parse_future_result

if TYPE_CHECKING:
from collections.abc import Callable
from concurrent.futures import Future
from multiprocessing.managers import SyncManager

from pytask_parallel.wrappers import WrapperResult

Expand All @@ -53,33 +62,81 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091
__tracebackhide__ = True
reports = session.execution_reports
running_tasks: dict[str, Future[Any]] = {}
running_try_last: set[str] = set()
queued_try_first_tasks: deque[str] = deque()
queued_tasks: deque[str] = deque()
queued_try_last_tasks: deque[str] = deque()
sleeper = _Sleeper()
debug_status = _is_debug_status_enabled()

# Create a shared queue to differentiate between running and pending tasks for
# some parallel backends.
if session.config["parallel_backend"] in (
ParallelBackend.PROCESSES,
ParallelBackend.LOKY,
):
manager_cls: Callable[[], SyncManager] | type[ExitStack] = Manager
start_execution_state = TaskExecutionStatus.PENDING
status_queue_factory = "manager"
elif session.config["parallel_backend"] == ParallelBackend.THREADS:
manager_cls = ExitStack
start_execution_state = TaskExecutionStatus.PENDING
status_queue_factory = "simple"
else:
manager_cls = ExitStack
start_execution_state = TaskExecutionStatus.RUNNING
status_queue_factory = None

# Get the live execution manager from the registry if it exists.
live_execution = session.config["pm"].get_plugin("live_execution")
any_coiled_task = any(is_coiled_function(task) for task in session.tasks)

# The executor can only be created after the collection to give users the
# possibility to inject their own executors.
session.config["_parallel_executor"] = registry.get_parallel_backend(
session.config["parallel_backend"], n_workers=session.config["n_workers"]
)
with session.config["_parallel_executor"], manager_cls() as manager:
if status_queue_factory == "manager":
session.config["_status_queue"] = manager.Queue() # type: ignore[union-attr]
elif status_queue_factory == "simple":
session.config["_status_queue"] = queue.SimpleQueue()

with session.config["_parallel_executor"]:
sleeper = _Sleeper()
if live_execution:
live_execution.initial_status = start_execution_state

i = 0
prefetch_factor = (
2
if session.config["parallel_backend"]
in (
ParallelBackend.PROCESSES,
ParallelBackend.LOKY,
ParallelBackend.THREADS,
)
else 1
)
use_prefetch_queue = prefetch_factor > 1
while session.scheduler.is_active():
try:
newly_collected_reports = []
did_enqueue = False
did_submit = False

# If there is any coiled function, the user probably wants to exploit
# adaptive scaling. Thus, we need to submit all ready tasks.
# Unfortunately, all submitted tasks are shown as running although some
# are pending.
#
# Without coiled functions, we submit as many tasks as there are
# available workers since we cannot reliably detect a pending status.
#
# See #98 for more information.
if any_coiled_task:
n_new_tasks = 10_000
elif use_prefetch_queue:
n_new_tasks = (session.config["n_workers"] * prefetch_factor) - (
len(running_tasks)
+ len(queued_try_first_tasks)
+ len(queued_tasks)
+ len(queued_try_last_tasks)
)
else:
n_new_tasks = session.config["n_workers"] - len(running_tasks)

Expand All @@ -89,31 +146,96 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091
else []
)

for task_name in ready_tasks:
task = session.dag.nodes[task_name]["task"]
session.hook.pytask_execute_task_log_start(
session=session, task=task
)
try:
session.hook.pytask_execute_task_setup(
session=session, task=task
if use_prefetch_queue:
for task_signature in ready_tasks:
task = session.dag.nodes[task_signature]["task"]
if debug_status:
_log_status("PENDING", task_signature)
session.hook.pytask_execute_task_log_start(
session=session,
task=task,
status=start_execution_state,
)
running_tasks[task_name] = session.hook.pytask_execute_task(
session=session, task=task
if get_marks(task, "try_first"):
queued_try_first_tasks.append(task_signature)
elif get_marks(task, "try_last"):
queued_try_last_tasks.append(task_signature)
else:
queued_tasks.append(task_signature)
did_enqueue = True

def _can_run_try_last() -> bool:
return not (
queued_try_first_tasks
or queued_tasks
or (len(running_tasks) > len(running_try_last))
)
sleeper.reset()
except Exception: # noqa: BLE001
report = ExecutionReport.from_task_and_exception(
task, sys.exc_info()

while len(running_tasks) < session.config["n_workers"]:
if queued_try_first_tasks:
task_signature = queued_try_first_tasks.popleft()
elif queued_tasks:
task_signature = queued_tasks.popleft()
elif queued_try_last_tasks and _can_run_try_last():
task_signature = queued_try_last_tasks.popleft()
else:
break
task = session.dag.nodes[task_signature]["task"]
try:
session.hook.pytask_execute_task_setup(
session=session, task=task
)
running_tasks[task_signature] = (
session.hook.pytask_execute_task(
session=session, task=task
)
)
if get_marks(task, "try_last"):
running_try_last.add(task_signature)
sleeper.reset()
did_submit = True
except Exception: # noqa: BLE001
report = ExecutionReport.from_task_and_exception(
task, sys.exc_info()
)
newly_collected_reports.append(report)
session.scheduler.done(task_signature)
else:
for task_signature in ready_tasks:
task = session.dag.nodes[task_signature]["task"]
if debug_status:
_log_status(
"PENDING"
if start_execution_state == TaskExecutionStatus.PENDING
else "RUNNING",
task_signature,
)
session.hook.pytask_execute_task_log_start(
session=session, task=task, status=start_execution_state
)
newly_collected_reports.append(report)
session.scheduler.done(task_name)
try:
session.hook.pytask_execute_task_setup(
session=session, task=task
)
running_tasks[task_signature] = (
session.hook.pytask_execute_task(
session=session, task=task
)
)
sleeper.reset()
did_submit = True
except Exception: # noqa: BLE001
report = ExecutionReport.from_task_and_exception(
task, sys.exc_info()
)
newly_collected_reports.append(report)
session.scheduler.done(task_signature)

if not ready_tasks:
if not ready_tasks and not did_enqueue and not did_submit:
sleeper.increment()

for task_name in list(running_tasks):
future = running_tasks[task_name]
for task_signature in list(running_tasks):
future = running_tasks[task_signature]

if future.done():
wrapper_result = parse_future_result(future)
Expand All @@ -129,17 +251,18 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091
)

if wrapper_result.exc_info is not None:
task = session.dag.nodes[task_name]["task"]
task = session.dag.nodes[task_signature]["task"]
newly_collected_reports.append(
ExecutionReport.from_task_and_exception(
task,
wrapper_result.exc_info, # type: ignore[arg-type]
)
)
running_tasks.pop(task_name)
session.scheduler.done(task_name)
running_tasks.pop(task_signature)
running_try_last.discard(task_signature)
session.scheduler.done(task_signature)
else:
task = session.dag.nodes[task_name]["task"]
task = session.dag.nodes[task_signature]["task"]
_update_carry_over_products(
task, wrapper_result.carry_over_products
)
Expand All @@ -155,9 +278,29 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091
else:
report = ExecutionReport.from_task(task)

running_tasks.pop(task_name)
running_tasks.pop(task_signature)
running_try_last.discard(task_signature)
newly_collected_reports.append(report)
session.scheduler.done(task_name)
session.scheduler.done(task_signature)

# Check if tasks are not pending but running and update the live
# status.
if (
live_execution or debug_status
) and "_status_queue" in session.config:
status_queue = session.config["_status_queue"]
while True:
try:
started_task = status_queue.get(block=False)
except queue.Empty:
break
if started_task in running_tasks:
if live_execution:
live_execution.update_task(
started_task, status=TaskExecutionStatus.RUNNING
)
if debug_status:
_log_status("RUNNING", started_task)

for report in newly_collected_reports:
session.hook.pytask_execute_task_process_report(
Expand Down Expand Up @@ -239,6 +382,7 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]:
kwargs=kwargs,
remote=remote,
session_filterwarnings=session.config["filterwarnings"],
status_queue=session.config.get("_status_queue"),
show_locals=session.config["show_locals"],
task_filterwarnings=get_marks(task, "filterwarnings"),
)
Expand All @@ -248,7 +392,11 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]:
from pytask_parallel.wrappers import wrap_task_in_thread # noqa: PLC0415

return session.config["_parallel_executor"].submit(
wrap_task_in_thread, task=task, remote=False, **kwargs
wrap_task_in_thread,
task=task,
remote=False,
status_queue=session.config.get("_status_queue"),
**kwargs,
)

msg = f"Unknown worker type {worker_type}"
Expand All @@ -261,6 +409,17 @@ def pytask_unconfigure() -> None:
registry.reset()


def _is_debug_status_enabled() -> bool:
"""Return whether to emit debug status updates."""
value = os.environ.get("PYTASK_PARALLEL_DEBUG_STATUS", "")
return value.strip().lower() in {"1", "true", "yes", "on"}


def _log_status(status: str, task_signature: str) -> None:
    """Print a task status transition to the pytask console.

    Used only when debug status logging is enabled; emits a single line in
    the form ``[pytask-parallel] STATUS: task_signature``.
    """
    message = f"[pytask-parallel] {status}: {task_signature}"
    console.print(message)


def _update_carry_over_products(
task: PTask, carry_over_products: PyTree[CarryOverPath | PythonNode | None] | None
) -> None:
Expand Down
Loading