From 46bc25577fd5d6737417b71ee706fbfc22b8ceb6 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sat, 4 May 2024 18:36:26 +0200 Subject: [PATCH 01/25] Implement a pending status for tasks. --- docs/source/changes.md | 7 ++++--- docs/source/coiled.md | 2 +- src/pytask_parallel/execute.py | 37 +++++++++++++++++++++------------- 3 files changed, 28 insertions(+), 18 deletions(-) diff --git a/docs/source/changes.md b/docs/source/changes.md index bcaa1a6..0c2da68 100644 --- a/docs/source/changes.md +++ b/docs/source/changes.md @@ -19,12 +19,13 @@ releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and or processes automatically. - {pull}`96` handles local paths with remote executors. `PathNode`s are not supported as dependencies or products (except for return annotations). -- {pull}`99` changes that all tasks that are ready are being scheduled. It improves - interactions with adaptive scaling. {issue}`98` does handle the resulting issues: no - strong adherence to priorities, no pending status. +- {pull}`99` changes that all ready tasks are being scheduled. It improves interactions + with adaptive scaling. {issue}`98` does handle the resulting issues: no strong + adherence to priorities, no pending status. - {pull}`100` adds project management with rye. - {pull}`101` adds syncing for local paths as dependencies or products in remote environments with the same OS. +- {pull}`102` implements a pending status for scheduled but not started tasks. ## 0.4.1 - 2024-01-12 diff --git a/docs/source/coiled.md b/docs/source/coiled.md index 5463f7b..f53912e 100644 --- a/docs/source/coiled.md +++ b/docs/source/coiled.md @@ -1,7 +1,7 @@ # coiled ```{caution} -Currently, the coiled backend can only be used if your workflow code is organized in a +Currently, the coiled backend can only be used if your workflow code is organized as a package due to how pytask imports your code and dask serializes task functions ([issue](https://github.com/dask/distributed/issues/8607)). ``` diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index 1ee0d71..afb9572 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -16,6 +16,7 @@ from pytask import PTask from pytask import PythonNode from pytask import Session +from pytask import TaskExecutionStatus from pytask import console from pytask import get_marks from pytask import hookimpl @@ -53,6 +54,9 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 reports = session.execution_reports running_tasks: dict[str, Future[Any]] = {} + # Get the live execution manager from the registry if it exists. + live_execution = session.config["pm"].get_plugin("live_execution") + # The executor can only be created after the collection to give users the # possibility to inject their own executors. session.config["_parallel_executor"] = registry.get_parallel_backend( @@ -68,17 +72,17 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 newly_collected_reports = [] ready_tasks = list(session.scheduler.get_ready(10_000)) - for task_name in ready_tasks: - task = session.dag.nodes[task_name]["task"] + for task_signature in ready_tasks: + task = session.dag.nodes[task_signature]["task"] session.hook.pytask_execute_task_log_start( - session=session, task=task + session=session, task=task, status=TaskExecutionStatus.PENDING ) try: session.hook.pytask_execute_task_setup( session=session, task=task ) - running_tasks[task_name] = session.hook.pytask_execute_task( - session=session, task=task + running_tasks[task_signature] = ( + session.hook.pytask_execute_task(session=session, task=task) ) sleeper.reset() except Exception: # noqa: BLE001 @@ -86,13 +90,13 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 task, sys.exc_info() ) newly_collected_reports.append(report) - session.scheduler.done(task_name) + session.scheduler.done(task_signature) if not ready_tasks: sleeper.increment() - for task_name in list(running_tasks): - future = running_tasks[task_name] + for task_signature in list(running_tasks): + future = running_tasks[task_signature] if future.done(): wrapper_result = parse_future_result(future) @@ -108,17 +112,17 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 ) if wrapper_result.exc_info is not None: - task = session.dag.nodes[task_name]["task"] + task = session.dag.nodes[task_signature]["task"] newly_collected_reports.append( ExecutionReport.from_task_and_exception( task, wrapper_result.exc_info, # type: ignore[arg-type] ) ) - running_tasks.pop(task_name) - session.scheduler.done(task_name) + running_tasks.pop(task_signature) + session.scheduler.done(task_signature) else: - task = session.dag.nodes[task_name]["task"] + task = session.dag.nodes[task_signature]["task"] _update_carry_over_products( task, wrapper_result.carry_over_products ) @@ -134,9 +138,14 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 else: report = ExecutionReport.from_task(task) - running_tasks.pop(task_name) + running_tasks.pop(task_signature) newly_collected_reports.append(report) - session.scheduler.done(task_name) + session.scheduler.done(task_signature) + + elif live_execution and future.running(): + live_execution.update_task( + task_signature, status=TaskExecutionStatus.RUNNING + ) for report in newly_collected_reports: session.hook.pytask_execute_task_process_report( From 7131a1f8c44f0449e46fe9d7048b2089c3590aa4 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Tue, 7 May 2024 21:00:42 +0200 Subject: [PATCH 02/25] Leftover commit. --- src/pytask_parallel/execute.py | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index afb9572..292aae2 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -62,7 +62,6 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 session.config["_parallel_executor"] = registry.get_parallel_backend( session.config["parallel_backend"], n_workers=session.config["n_workers"] ) - with session.config["_parallel_executor"]: sleeper = _Sleeper() @@ -142,10 +141,14 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 newly_collected_reports.append(report) session.scheduler.done(task_signature) - elif live_execution and future.running(): - live_execution.update_task( - task_signature, status=TaskExecutionStatus.RUNNING + elif not future.done(): + pass + elif live_execution: + status = _get_status_from_undone_task( + task_signature, future, session.config["_parallel_executor"] ) + if status == TaskExecutionStatus.RUNNING: + live_execution.update_task(task_signature, status=status) for report in newly_collected_reports: session.hook.pytask_execute_task_process_report( @@ -299,3 +302,15 @@ def increment(self) -> None: def sleep(self) -> None: time.sleep(self.timings[self.timing_idx]) + + +def _get_status_from_undone_task( + task_signature: str, future: Future, executor: Any +) -> TaskExecutionStatus: + """Get the status of a task that is undone.""" + if hasattr(future, "_state"): + status = future._state + if status == "RUNNING": + breakpoint() + return TaskExecutionStatus.RUNNING + return TaskExecutionStatus.PENDING From b73ed65382710c719575296571be2d345ca4fa47 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Tue, 7 May 2024 21:32:45 +0200 Subject: [PATCH 03/25] Add correct pending status for cf and loky. --- src/pytask_parallel/execute.py | 44 ++++++++++++++++----------------- src/pytask_parallel/wrappers.py | 23 ++++++++++++++++- 2 files changed, 44 insertions(+), 23 deletions(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index 292aae2..e471c5c 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -2,6 +2,7 @@ from __future__ import annotations +import multiprocessing import sys import time from typing import TYPE_CHECKING @@ -24,6 +25,7 @@ from pytask.tree_util import tree_map from pytask.tree_util import tree_structure +from pytask_parallel.backends import ParallelBackend from pytask_parallel.backends import WorkerType from pytask_parallel.backends import registry from pytask_parallel.typing import CarryOverPath @@ -53,6 +55,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 __tracebackhide__ = True reports = session.execution_reports running_tasks: dict[str, Future[Any]] = {} + sleeper = _Sleeper() # Get the live execution manager from the registry if it exists. live_execution = session.config["pm"].get_plugin("live_execution") @@ -63,7 +66,14 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 session.config["parallel_backend"], n_workers=session.config["n_workers"] ) with session.config["_parallel_executor"]: - sleeper = _Sleeper() + # Create a shared memory object to differentiate between running and pending + # tasks. + if session.config["parallel_backend"] in ( + ParallelBackend.PROCESSES, + ParallelBackend.THREADS, + ParallelBackend.LOKY, + ): + session.config["_shared_memory"] = multiprocessing.Manager().dict() i = 0 while session.scheduler.is_active(): @@ -141,14 +151,11 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 newly_collected_reports.append(report) session.scheduler.done(task_signature) - elif not future.done(): - pass - elif live_execution: - status = _get_status_from_undone_task( - task_signature, future, session.config["_parallel_executor"] - ) - if status == TaskExecutionStatus.RUNNING: - live_execution.update_task(task_signature, status=status) + elif live_execution and "_shared_memory" in session.config: + if task_signature in session.config["_shared_memory"]: + live_execution.update_task( + task_signature, status=TaskExecutionStatus.RUNNING + ) for report in newly_collected_reports: session.hook.pytask_execute_task_process_report( @@ -228,6 +235,7 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]: kwargs=kwargs, remote=remote, session_filterwarnings=session.config["filterwarnings"], + shared_memory=session.config["_shared_memory"], show_locals=session.config["show_locals"], task_filterwarnings=get_marks(task, "filterwarnings"), ) @@ -236,7 +244,11 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]: from pytask_parallel.wrappers import wrap_task_in_thread return session.config["_parallel_executor"].submit( - wrap_task_in_thread, task=task, remote=False, **kwargs + wrap_task_in_thread, + task=task, + remote=False, + shared_memory=session.config["_shared_memory"], + **kwargs, ) msg = f"Unknown worker type {worker_type}" raise ValueError(msg) @@ -302,15 +314,3 @@ def increment(self) -> None: def sleep(self) -> None: time.sleep(self.timings[self.timing_idx]) - - -def _get_status_from_undone_task( - task_signature: str, future: Future, executor: Any -) -> TaskExecutionStatus: - """Get the status of a task that is undone.""" - if hasattr(future, "_state"): - status = future._state - if status == "RUNNING": - breakpoint() - return TaskExecutionStatus.RUNNING - return TaskExecutionStatus.PENDING diff --git a/src/pytask_parallel/wrappers.py b/src/pytask_parallel/wrappers.py index 6157873..ad8c2ed 100644 --- a/src/pytask_parallel/wrappers.py +++ b/src/pytask_parallel/wrappers.py @@ -55,7 +55,9 @@ class WrapperResult: stderr: str -def wrap_task_in_thread(task: PTask, *, remote: bool, **kwargs: Any) -> WrapperResult: +def wrap_task_in_thread( + task: PTask, *, remote: bool, shared_memory: dict[str, bool] | None, **kwargs: Any +) -> WrapperResult: """Mock execution function such that it returns the same as for processes. The function for processes returns ``warning_reports`` and an ``exception``. With @@ -64,6 +66,11 @@ def wrap_task_in_thread(task: PTask, *, remote: bool, **kwargs: Any) -> WrapperR """ __tracebackhide__ = True + + # Add task to shared memory to indicate that it is currently being executed. + if shared_memory is not None: + shared_memory[task.signature] = True + try: out = task.function(**kwargs) except Exception: # noqa: BLE001 @@ -71,6 +78,11 @@ def wrap_task_in_thread(task: PTask, *, remote: bool, **kwargs: Any) -> WrapperR else: _handle_function_products(task, out, remote=remote) exc_info = None # type: ignore[assignment] + + # Remove task from shared memory to indicate that it is no longer being executed. + if shared_memory is not None: + shared_memory.pop(task.signature) + return WrapperResult( carry_over_products=None, # type: ignore[arg-type] warning_reports=[], @@ -87,6 +99,7 @@ def wrap_task_in_process( # noqa: PLR0913 kwargs: dict[str, Any], remote: bool, session_filterwarnings: tuple[str, ...], + shared_memory: dict[str, bool] | None, show_locals: bool, task_filterwarnings: tuple[Mark, ...], ) -> WrapperResult: @@ -99,6 +112,10 @@ def wrap_task_in_process( # noqa: PLR0913 # Hide this function from tracebacks. __tracebackhide__ = True + # Add task to shared memory to indicate that it is currently being executed. + if shared_memory is not None: + shared_memory[task.signature] = True + # Patch set_trace and breakpoint to show a better error message. _patch_set_trace_and_breakpoint() @@ -156,6 +173,10 @@ def wrap_task_in_process( # noqa: PLR0913 captured_stdout_buffer.close() captured_stderr_buffer.close() + # Remove task from shared memory to indicate that it is no longer being executed. + if shared_memory is not None: + shared_memory.pop(task.signature) + return WrapperResult( carry_over_products=products, # type: ignore[arg-type] warning_reports=warning_reports, From 90e05a2d5b1eb67d21f1163f70e357c512f720fe Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Tue, 7 May 2024 21:38:05 +0200 Subject: [PATCH 04/25] Fix tests. --- src/pytask_parallel/execute.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index e471c5c..363b48c 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -235,7 +235,7 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]: kwargs=kwargs, remote=remote, session_filterwarnings=session.config["filterwarnings"], - shared_memory=session.config["_shared_memory"], + shared_memory=session.config.get("_shared_memory"), show_locals=session.config["show_locals"], task_filterwarnings=get_marks(task, "filterwarnings"), ) @@ -247,7 +247,7 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]: wrap_task_in_thread, task=task, remote=False, - shared_memory=session.config["_shared_memory"], + shared_memory=session.config.get("_shared_memory"), **kwargs, ) msg = f"Unknown worker type {worker_type}" From 1e2006c04c194934cded892c86ea17817544c734 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Tue, 7 May 2024 21:47:43 +0200 Subject: [PATCH 05/25] FIx. --- src/pytask_parallel/execute.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index 363b48c..c789b16 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -57,6 +57,18 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 running_tasks: dict[str, Future[Any]] = {} sleeper = _Sleeper() + # Create a shared memory object to differentiate between running and pending + # tasks for some parallel backends. + if session.config["parallel_backend"] in ( + ParallelBackend.PROCESSES, + ParallelBackend.THREADS, + ParallelBackend.LOKY, + ): + session.config["_shared_memory"] = multiprocessing.Manager().dict() + start_execution_state = TaskExecutionStatus.PENDING + else: + start_execution_state = TaskExecutionStatus.RUNNING + # Get the live execution manager from the registry if it exists. live_execution = session.config["pm"].get_plugin("live_execution") @@ -66,15 +78,6 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 session.config["parallel_backend"], n_workers=session.config["n_workers"] ) with session.config["_parallel_executor"]: - # Create a shared memory object to differentiate between running and pending - # tasks. - if session.config["parallel_backend"] in ( - ParallelBackend.PROCESSES, - ParallelBackend.THREADS, - ParallelBackend.LOKY, - ): - session.config["_shared_memory"] = multiprocessing.Manager().dict() - i = 0 while session.scheduler.is_active(): try: @@ -84,7 +87,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 for task_signature in ready_tasks: task = session.dag.nodes[task_signature]["task"] session.hook.pytask_execute_task_log_start( - session=session, task=task, status=TaskExecutionStatus.PENDING + session=session, task=task, status=start_execution_state ) try: session.hook.pytask_execute_task_setup( From 89ff770f05d1889a2a12688a9b1ff6712bfd2065 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Tue, 7 May 2024 23:02:50 +0200 Subject: [PATCH 06/25] fix. --- src/pytask_parallel/execute.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index c789b16..79ba551 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -5,6 +5,7 @@ import multiprocessing import sys import time +from contextlib import ExitStack from typing import TYPE_CHECKING from typing import Any @@ -64,9 +65,10 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 ParallelBackend.THREADS, ParallelBackend.LOKY, ): - session.config["_shared_memory"] = multiprocessing.Manager().dict() + manager_cls = multiprocessing.Manager start_execution_state = TaskExecutionStatus.PENDING else: + manager_cls = ExitStack start_execution_state = TaskExecutionStatus.RUNNING # Get the live execution manager from the registry if it exists. @@ -77,7 +79,14 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 session.config["_parallel_executor"] = registry.get_parallel_backend( session.config["parallel_backend"], n_workers=session.config["n_workers"] ) - with session.config["_parallel_executor"]: + with session.config["_parallel_executor"], manager_cls() as manager: + if session.config["parallel_backend"] in ( + ParallelBackend.PROCESSES, + ParallelBackend.THREADS, + ParallelBackend.LOKY, + ): + session.config["_shared_memory"] = manager.dict() + i = 0 while session.scheduler.is_active(): try: From ba20e229aa49062a4d59f9e383427ca490dc683f Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Tue, 7 May 2024 23:24:53 +0200 Subject: [PATCH 07/25] Fix types. --- src/pytask_parallel/execute.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index 79ba551..135d31e 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -2,12 +2,13 @@ from __future__ import annotations -import multiprocessing import sys import time from contextlib import ExitStack +from multiprocessing import Manager from typing import TYPE_CHECKING from typing import Any +from typing import Callable import cloudpickle from _pytask.node_protocols import PPathNode @@ -37,6 +38,7 @@ if TYPE_CHECKING: from concurrent.futures import Future + from multiprocessing.managers import SyncManager from pytask_parallel.wrappers import WrapperResult @@ -65,7 +67,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 ParallelBackend.THREADS, ParallelBackend.LOKY, ): - manager_cls = multiprocessing.Manager + manager_cls: Callable[[], SyncManager] | type[ExitStack] = Manager start_execution_state = TaskExecutionStatus.PENDING else: manager_cls = ExitStack @@ -85,7 +87,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 ParallelBackend.THREADS, ParallelBackend.LOKY, ): - session.config["_shared_memory"] = manager.dict() + session.config["_shared_memory"] = manager.dict() # type: ignore[union-attr] i = 0 while session.scheduler.is_active(): From 1e2c9d21f32a079479a1929c7b096d56e3ce141d Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sun, 27 Jul 2025 16:22:12 +0200 Subject: [PATCH 08/25] Finalize pending status. --- pyproject.toml | 2 +- src/pytask_parallel/execute.py | 9 ++++++--- src/pytask_parallel/wrappers.py | 4 ++-- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 1b3452d..8c6d984 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,7 +15,7 @@ dependencies = [ "cloudpickle", "loky", "pluggy>=1.0.0", - "pytask>=0.5.2", + "git+https://github.com/pytask-dev/pytask@allow-setting-task-status", "rich", ] dynamic = ["version"] diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index c38e026..cc406eb 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -100,14 +100,15 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 # Unfortunately, all submitted tasks are shown as running although some # are pending. # - # Without coiled functions, we submit as many tasks as there are - # available workers since we cannot reliably detect a pending status. + # For all other backends, at least four more tasks are submitted and + # otherwise 10% more. This is a heuristic to avoid submitting too few + # tasks. # # See #98 for more information. if any_coiled_task: n_new_tasks = 10_000 else: - n_new_tasks = session.config["n_workers"] - len(running_tasks) + n_new_tasks = max(4, int(session.config["n_workers"] * 0.1)) ready_tasks = ( list(session.scheduler.get_ready(n_new_tasks)) @@ -185,6 +186,8 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 newly_collected_reports.append(report) session.scheduler.done(task_signature) + # Check if tasks are not pending but running and update the live + # status. elif live_execution and "_shared_memory" in session.config: if task_signature in session.config["_shared_memory"]: live_execution.update_task( diff --git a/src/pytask_parallel/wrappers.py b/src/pytask_parallel/wrappers.py index e989b2f..689dba2 100644 --- a/src/pytask_parallel/wrappers.py +++ b/src/pytask_parallel/wrappers.py @@ -81,7 +81,7 @@ def wrap_task_in_thread( # Remove task from shared memory to indicate that it is no longer being executed. if shared_memory is not None: - shared_memory.pop(task.signature) + shared_memory.pop(task.signature, None) return WrapperResult( carry_over_products=None, # type: ignore[arg-type] @@ -177,7 +177,7 @@ def wrap_task_in_process( # noqa: PLR0913 # Remove task from shared memory to indicate that it is no longer being executed. if shared_memory is not None: - shared_memory.pop(task.signature) + shared_memory.pop(task.signature, None) return WrapperResult( carry_over_products=products, # type: ignore[arg-type] From a14289d14c1ec8cd79ec5170a91070c3c4d59f6e Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sun, 27 Jul 2025 16:25:15 +0200 Subject: [PATCH 09/25] Install pytask form github. --- pyproject.toml | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 8c6d984..4ea9440 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,7 +15,7 @@ dependencies = [ "cloudpickle", "loky", "pluggy>=1.0.0", - "git+https://github.com/pytask-dev/pytask@allow-setting-task-status", + "pytask>=0.5.2", "rich", ] dynamic = ["version"] @@ -45,14 +45,14 @@ test = [ "nbmake", "pytest>=8.4.0", "pytest-cov>=5.0.0", - {include-group = "coiled"}, - {include-group = "dask"}, + { include-group = "coiled" }, + { include-group = "dask" }, ] typing = [ "pytask-parallel", "ty", - {include-group = "coiled"}, - {include-group = "dask"}, + { include-group = "coiled" }, + { include-group = "dask" }, ] [project.readme] @@ -76,6 +76,9 @@ pytask_parallel = "pytask_parallel.plugin" requires = ["hatchling", "hatch_vcs"] build-backend = "hatchling.build" +[tool.uv.source] +pytask = { git = "https://github.com/pytask-dev/pytask", rev = "allow-setting-task-status" } + [tool.hatch.build.hooks.vcs] version-file = "src/pytask_parallel/_version.py" From 79c1137d783ffb534637bb26c6f707f0bfc7ef44 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 31 Jan 2026 19:13:55 +0000 Subject: [PATCH 10/25] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/pytask_parallel/execute.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index 811d87e..e1220b8 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -8,7 +8,6 @@ from multiprocessing import Manager from typing import TYPE_CHECKING from typing import Any -from typing import Callable from typing import cast import cloudpickle @@ -38,6 +37,7 @@ from pytask_parallel.utils import parse_future_result if TYPE_CHECKING: + from collections.abc import Callable from concurrent.futures import Future from multiprocessing.managers import SyncManager From b993e7633752a26fea81e93d695314b522871027 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sat, 31 Jan 2026 20:23:41 +0100 Subject: [PATCH 11/25] Switch pending-status tracking to a start queue --- src/pytask_parallel/execute.py | 27 +++++++++++++++++---------- src/pytask_parallel/wrappers.py | 28 +++++++++++++--------------- 2 files changed, 30 insertions(+), 25 deletions(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index e1220b8..4104508 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -2,6 +2,7 @@ from __future__ import annotations +import queue import sys import time from contextlib import ExitStack @@ -61,8 +62,8 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 running_tasks: dict[str, Future[Any]] = {} sleeper = _Sleeper() - # Create a shared memory object to differentiate between running and pending - # tasks for some parallel backends. + # Create a shared queue to differentiate between running and pending tasks for + # some parallel backends. if session.config["parallel_backend"] in ( ParallelBackend.PROCESSES, ParallelBackend.THREADS, @@ -89,7 +90,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 ParallelBackend.THREADS, ParallelBackend.LOKY, ): - session.config["_shared_memory"] = manager.dict() # type: ignore[union-attr] + session.config["_status_queue"] = manager.Queue() # type: ignore[union-attr] i = 0 while session.scheduler.is_active(): @@ -187,12 +188,18 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 newly_collected_reports.append(report) session.scheduler.done(task_signature) - # Check if tasks are not pending but running and update the live - # status. - elif live_execution and "_shared_memory" in session.config: - if task_signature in session.config["_shared_memory"]: + # Check if tasks are not pending but running and update the live + # status. + if live_execution and "_status_queue" in session.config: + status_queue = session.config["_status_queue"] + while True: + try: + started_task = status_queue.get(block=False) + except queue.Empty: + break + if started_task in running_tasks: live_execution.update_task( - task_signature, status=TaskExecutionStatus.RUNNING + started_task, status=TaskExecutionStatus.RUNNING ) for report in newly_collected_reports: @@ -275,7 +282,7 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]: kwargs=kwargs, remote=remote, session_filterwarnings=session.config["filterwarnings"], - shared_memory=session.config.get("_shared_memory"), + status_queue=session.config.get("_status_queue"), show_locals=session.config["show_locals"], task_filterwarnings=get_marks(task, "filterwarnings"), ) @@ -288,7 +295,7 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]: wrap_task_in_thread, task=task, remote=False, - shared_memory=session.config.get("_shared_memory"), + status_queue=session.config.get("_status_queue"), **kwargs, ) diff --git a/src/pytask_parallel/wrappers.py b/src/pytask_parallel/wrappers.py index e5f648b..3a9330e 100644 --- a/src/pytask_parallel/wrappers.py +++ b/src/pytask_parallel/wrappers.py @@ -37,6 +37,7 @@ if TYPE_CHECKING: from collections.abc import Callable + from queue import Queue from types import TracebackType from pytask import Mark @@ -58,7 +59,11 @@ class WrapperResult: def wrap_task_in_thread( - task: PTask, *, remote: bool, shared_memory: dict[str, bool] | None, **kwargs: Any + task: PTask, + *, + remote: bool, + status_queue: "Queue[str] | None" = None, + **kwargs: Any, ) -> WrapperResult: """Mock execution function such that it returns the same as for processes. @@ -69,9 +74,9 @@ def wrap_task_in_thread( """ __tracebackhide__ = True - # Add task to shared memory to indicate that it is currently being executed. - if shared_memory is not None: - shared_memory[task.signature] = True + # Add task to the status queue to indicate that it is currently being executed. + if status_queue is not None: + status_queue.put(task.signature) try: out = task.function(**kwargs) @@ -89,9 +94,6 @@ def wrap_task_in_thread( _handle_function_products(task, out, remote=remote) exc_info = None - # Remove task from shared memory to indicate that it is no longer being executed. - if shared_memory is not None: - shared_memory.pop(task.signature, None) return WrapperResult( carry_over_products=None, warning_reports=[], @@ -108,7 +110,7 @@ def wrap_task_in_process( # noqa: PLR0913 kwargs: dict[str, Any], remote: bool, session_filterwarnings: tuple[str, ...], - shared_memory: dict[str, bool] | None, + status_queue: "Queue[str] | None" = None, show_locals: bool, task_filterwarnings: tuple[Mark, ...], ) -> WrapperResult: @@ -121,9 +123,9 @@ def wrap_task_in_process( # noqa: PLR0913 # Hide this function from tracebacks. __tracebackhide__ = True - # Add task to shared memory to indicate that it is currently being executed. - if shared_memory is not None: - shared_memory[task.signature] = True + # Add task to the status queue to indicate that it is currently being executed. + if status_queue is not None: + status_queue.put(task.signature) # Patch set_trace and breakpoint to show a better error message. _patch_set_trace_and_breakpoint() @@ -184,10 +186,6 @@ def wrap_task_in_process( # noqa: PLR0913 captured_stdout_buffer.close() captured_stderr_buffer.close() - # Remove task from shared memory to indicate that it is no longer being executed. - if shared_memory is not None: - shared_memory.pop(task.signature, None) - return WrapperResult( carry_over_products=products, warning_reports=warning_reports, From 3aa12ff611b6a55eee68a59462d8876b8735a655 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 31 Jan 2026 19:22:28 +0000 Subject: [PATCH 12/25] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/pytask_parallel/wrappers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pytask_parallel/wrappers.py b/src/pytask_parallel/wrappers.py index 3a9330e..33afb0a 100644 --- a/src/pytask_parallel/wrappers.py +++ b/src/pytask_parallel/wrappers.py @@ -62,7 +62,7 @@ def wrap_task_in_thread( task: PTask, *, remote: bool, - status_queue: "Queue[str] | None" = None, + status_queue: Queue[str] | None = None, **kwargs: Any, ) -> WrapperResult: """Mock execution function such that it returns the same as for processes. @@ -110,7 +110,7 @@ def wrap_task_in_process( # noqa: PLR0913 kwargs: dict[str, Any], remote: bool, session_filterwarnings: tuple[str, ...], - status_queue: "Queue[str] | None" = None, + status_queue: Queue[str] | None = None, show_locals: bool, task_filterwarnings: tuple[Mark, ...], ) -> WrapperResult: From 36b07493e85e2cdf46c51a56380299bf8412bb5d Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sat, 31 Jan 2026 20:26:57 +0100 Subject: [PATCH 13/25] Use a simple status queue for threads --- src/pytask_parallel/execute.py | 15 +++++++++------ src/pytask_parallel/wrappers.py | 5 +++-- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index 4104508..0a7e174 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -66,14 +66,19 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 # some parallel backends. if session.config["parallel_backend"] in ( ParallelBackend.PROCESSES, - ParallelBackend.THREADS, ParallelBackend.LOKY, ): manager_cls: Callable[[], SyncManager] | type[ExitStack] = Manager start_execution_state = TaskExecutionStatus.PENDING + status_queue_factory = "manager" + elif session.config["parallel_backend"] == ParallelBackend.THREADS: + manager_cls = ExitStack + start_execution_state = TaskExecutionStatus.PENDING + status_queue_factory = "simple" else: manager_cls = ExitStack start_execution_state = TaskExecutionStatus.RUNNING + status_queue_factory = None # Get the live execution manager from the registry if it exists. live_execution = session.config["pm"].get_plugin("live_execution") @@ -85,12 +90,10 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 session.config["parallel_backend"], n_workers=session.config["n_workers"] ) with session.config["_parallel_executor"], manager_cls() as manager: - if session.config["parallel_backend"] in ( - ParallelBackend.PROCESSES, - ParallelBackend.THREADS, - ParallelBackend.LOKY, - ): + if status_queue_factory == "manager": session.config["_status_queue"] = manager.Queue() # type: ignore[union-attr] + elif status_queue_factory == "simple": + session.config["_status_queue"] = queue.SimpleQueue() i = 0 while session.scheduler.is_active(): diff --git a/src/pytask_parallel/wrappers.py b/src/pytask_parallel/wrappers.py index 33afb0a..e4c3488 100644 --- a/src/pytask_parallel/wrappers.py +++ b/src/pytask_parallel/wrappers.py @@ -38,6 +38,7 @@ if TYPE_CHECKING: from collections.abc import Callable from queue import Queue + from queue import SimpleQueue from types import TracebackType from pytask import Mark @@ -62,7 +63,7 @@ def wrap_task_in_thread( task: PTask, *, remote: bool, - status_queue: Queue[str] | None = None, + status_queue: "Queue[str] | SimpleQueue[str] | None" = None, **kwargs: Any, ) -> WrapperResult: """Mock execution function such that it returns the same as for processes. @@ -110,7 +111,7 @@ def wrap_task_in_process( # noqa: PLR0913 kwargs: dict[str, Any], remote: bool, session_filterwarnings: tuple[str, ...], - status_queue: Queue[str] | None = None, + status_queue: "Queue[str] | SimpleQueue[str] | None" = None, show_locals: bool, task_filterwarnings: tuple[Mark, ...], ) -> WrapperResult: From 6183c1be19f038df85d7320c8d90506ca49097e3 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 31 Jan 2026 19:27:33 +0000 Subject: [PATCH 14/25] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/pytask_parallel/wrappers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pytask_parallel/wrappers.py b/src/pytask_parallel/wrappers.py index e4c3488..72be158 100644 --- a/src/pytask_parallel/wrappers.py +++ b/src/pytask_parallel/wrappers.py @@ -63,7 +63,7 @@ def wrap_task_in_thread( task: PTask, *, remote: bool, - status_queue: "Queue[str] | SimpleQueue[str] | None" = None, + status_queue: Queue[str] | SimpleQueue[str] | None = None, **kwargs: Any, ) -> WrapperResult: """Mock execution function such that it returns the same as for processes. @@ -111,7 +111,7 @@ def wrap_task_in_process( # noqa: PLR0913 kwargs: dict[str, Any], remote: bool, session_filterwarnings: tuple[str, ...], - status_queue: "Queue[str] | SimpleQueue[str] | None" = None, + status_queue: Queue[str] | SimpleQueue[str] | None = None, show_locals: bool, task_filterwarnings: tuple[Mark, ...], ) -> WrapperResult: From a9439b935fa167048f722d3ace36ad7272c0ffac Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sat, 31 Jan 2026 20:33:43 +0100 Subject: [PATCH 15/25] Drop pytask git override now in release --- pyproject.toml | 3 --- 1 file changed, 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 7cf48b4..8520904 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,9 +76,6 @@ pytask_parallel = "pytask_parallel.plugin" requires = ["hatchling", "hatch_vcs"] build-backend = "hatchling.build" -[tool.uv.source] -pytask = { git = "https://github.com/pytask-dev/pytask", rev = "allow-setting-task-status" } - [tool.hatch.build.hooks.vcs] version-file = "src/pytask_parallel/_version.py" From a07a356c76e742aa051bea598b18b356291a4f65 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sat, 31 Jan 2026 20:40:41 +0100 Subject: [PATCH 16/25] Keep task submission count tied to worker availability --- src/pytask_parallel/execute.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index 0a7e174..8f9364c 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -105,15 +105,10 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 # Unfortunately, all submitted tasks are shown as running although some # are pending. # - # For all other backends, at least four more tasks are submitted and - # otherwise 10% more. This is a heuristic to avoid submitting too few - # tasks. - # - # See #98 for more information. if any_coiled_task: n_new_tasks = 10_000 else: - n_new_tasks = max(4, int(session.config["n_workers"] * 0.1)) + n_new_tasks = session.config["n_workers"] - len(running_tasks) ready_tasks = ( list(session.scheduler.get_ready(n_new_tasks)) From ae30df7a93cedc523a76c28f25894333d1defe41 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sat, 31 Jan 2026 21:12:22 +0100 Subject: [PATCH 17/25] Add demo script for pending/running status --- scripts/pending_status_demo.py | 86 ++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 scripts/pending_status_demo.py diff --git a/scripts/pending_status_demo.py b/scripts/pending_status_demo.py new file mode 100644 index 0000000..7359bfc --- /dev/null +++ b/scripts/pending_status_demo.py @@ -0,0 +1,86 @@ +"""Demo script for pending/running task status updates.""" + +from __future__ import annotations + +import argparse +import os +import subprocess +import sys +from pathlib import Path + + +def _write_tasks(path: Path, n_tasks: int, sleep_s: float, jitter_s: float) -> None: + task_file = path / "task_status_demo.py" + lines = [ + "from __future__ import annotations", + "", + "from pathlib import Path", + "import time", + "", + "from pytask import task", + "", + f"N_TASKS = {n_tasks}", + f"SLEEP_S = {sleep_s}", + f"JITTER_S = {jitter_s}", + "", + "for i in range(N_TASKS):", + " @task(id=str(i), kwargs={'produces': Path(f'out_{i}.txt')})", + " def task_sleep(produces, i=i):", + " time.sleep(SLEEP_S + (i % 3) * JITTER_S)", + " produces.write_text('done')", + "", + ] + task_file.write_text("\n".join(lines), encoding="utf-8") + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Run a pytask demo to observe pending/running status updates." + ) + parser.add_argument("--n-tasks", type=int, default=30) + parser.add_argument("--sleep", type=float, default=2.0) + parser.add_argument("--jitter", type=float, default=0.5) + parser.add_argument("--workers", type=int, default=4) + parser.add_argument( + "--backend", + choices=["processes", "threads", "loky", "dask", "none"], + default="processes", + ) + parser.add_argument("--entries", type=int, default=30) + parser.add_argument( + "--dir", + type=Path, + default=Path(__file__).with_name("pending_status_demo"), + help="Directory to store the demo task file.", + ) + args = parser.parse_args() + + demo_dir = args.dir.resolve() + demo_dir.mkdir(parents=True, exist_ok=True) + + for path in demo_dir.glob("out_*.txt"): + path.unlink() + + _write_tasks(demo_dir, args.n_tasks, args.sleep, args.jitter) + + cmd = [ + sys.executable, + "-m", + "pytask", + demo_dir.as_posix(), + "--n-workers", + str(args.workers), + "--parallel-backend", + args.backend, + "--n-entries-in-table", + str(args.entries), + ] + print("Running:", " ".join(cmd)) + env = dict(os.environ) + env.setdefault("PYTHONIOENCODING", "utf-8") + env.setdefault("PYTHONUTF8", "1") + return subprocess.call(cmd, env=env) + + +if __name__ == "__main__": + raise SystemExit(main()) From a29427b241b9e322593c3c0400627b73992848e5 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 31 Jan 2026 20:11:07 +0000 Subject: [PATCH 18/25] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- scripts/pending_status_demo.py | 1 - 1 file changed, 1 deletion(-) diff --git a/scripts/pending_status_demo.py b/scripts/pending_status_demo.py index 7359bfc..b4092c4 100644 --- a/scripts/pending_status_demo.py +++ b/scripts/pending_status_demo.py @@ -75,7 +75,6 @@ def main() -> int: "--n-entries-in-table", str(args.entries), ] - print("Running:", " ".join(cmd)) env = dict(os.environ) env.setdefault("PYTHONIOENCODING", "utf-8") env.setdefault("PYTHONUTF8", "1") From 42de1cc1d328f915c398520e1842335ea3c839cb Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sat, 31 Jan 2026 21:21:02 +0100 Subject: [PATCH 19/25] Add debug status logs for demo --- scripts/pending_status_demo.py | 17 +++++++++++++++++ src/pytask_parallel/execute.py | 31 +++++++++++++++++++++++++++---- 2 files changed, 44 insertions(+), 4 deletions(-) diff --git a/scripts/pending_status_demo.py b/scripts/pending_status_demo.py index b4092c4..7168581 100644 --- a/scripts/pending_status_demo.py +++ b/scripts/pending_status_demo.py @@ -47,6 +47,17 @@ def main() -> int: default="processes", ) parser.add_argument("--entries", type=int, default=30) + parser.add_argument( + "--live", + action="store_true", + help="Use the live rich table instead of raw logs.", + ) + parser.add_argument( + "--log-status", + action=argparse.BooleanOptionalAction, + default=True, + help="Emit pending/running status logs from the main process.", + ) parser.add_argument( "--dir", type=Path, @@ -75,9 +86,15 @@ def main() -> int: "--n-entries-in-table", str(args.entries), ] + if not args.live: + cmd.extend(["-s", "-v", "0"]) + sys.stdout.write(f"Running: {' '.join(cmd)}\n") + sys.stdout.flush() env = dict(os.environ) env.setdefault("PYTHONIOENCODING", "utf-8") env.setdefault("PYTHONUTF8", "1") + if args.log_status: + env.setdefault("PYTASK_PARALLEL_DEBUG_STATUS", "1") return subprocess.call(cmd, env=env) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index 8f9364c..056353e 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -2,6 +2,7 @@ from __future__ import annotations +import os import queue import sys import time @@ -61,6 +62,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 reports = session.execution_reports running_tasks: dict[str, Future[Any]] = {} sleeper = _Sleeper() + debug_status = _is_debug_status_enabled() # Create a shared queue to differentiate between running and pending tasks for # some parallel backends. @@ -118,6 +120,13 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 for task_signature in ready_tasks: task = session.dag.nodes[task_signature]["task"] + if debug_status: + _log_status( + "PENDING" + if start_execution_state == TaskExecutionStatus.PENDING + else "RUNNING", + task_signature, + ) session.hook.pytask_execute_task_log_start( session=session, task=task, status=start_execution_state ) @@ -188,7 +197,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 # Check if tasks are not pending but running and update the live # status. - if live_execution and "_status_queue" in session.config: + if (live_execution or debug_status) and "_status_queue" in session.config: status_queue = session.config["_status_queue"] while True: try: @@ -196,9 +205,12 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 except queue.Empty: break if started_task in running_tasks: - live_execution.update_task( - started_task, status=TaskExecutionStatus.RUNNING - ) + if live_execution: + live_execution.update_task( + started_task, status=TaskExecutionStatus.RUNNING + ) + if debug_status: + _log_status("RUNNING", started_task) for report in newly_collected_reports: session.hook.pytask_execute_task_process_report( @@ -307,6 +319,17 @@ def pytask_unconfigure() -> None: registry.reset() +def _is_debug_status_enabled() -> bool: + """Return whether to emit debug status updates.""" + value = os.environ.get("PYTASK_PARALLEL_DEBUG_STATUS", "") + return value.strip().lower() in {"1", "true", "yes", "on"} + + +def _log_status(status: str, task_signature: str) -> None: + """Log a status transition for a task.""" + console.print(f"[pytask-parallel] {status}: {task_signature}") + + def _update_carry_over_products( task: PTask, carry_over_products: PyTree[CarryOverPath | PythonNode | None] | None ) -> None: From f8c3fab972bccbc7b839af1492e7c58bf2a22129 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 31 Jan 2026 20:21:08 +0000 Subject: [PATCH 20/25] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/pytask_parallel/execute.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index 056353e..fbd5b6c 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -197,7 +197,9 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 # Check if tasks are not pending but running and update the live # status. - if (live_execution or debug_status) and "_status_queue" in session.config: + if ( + live_execution or debug_status + ) and "_status_queue" in session.config: status_queue = session.config["_status_queue"] while True: try: From 6812a12b1e5a5f7af8bd4603ac26aca0bb4b0d09 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sat, 31 Jan 2026 21:57:25 +0100 Subject: [PATCH 21/25] Prefetch tasks for pending status --- src/pytask_parallel/execute.py | 134 +++++++++++++++++++++++++++------ 1 file changed, 109 insertions(+), 25 deletions(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index fbd5b6c..482b6f5 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -6,6 +6,7 @@ import queue import sys import time +from collections import deque from contextlib import ExitStack from multiprocessing import Manager from typing import TYPE_CHECKING @@ -61,6 +62,9 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 __tracebackhide__ = True reports = session.execution_reports running_tasks: dict[str, Future[Any]] = {} + running_try_last: set[str] = set() + queued_tasks: deque[str] = deque() + queued_try_last_tasks: deque[str] = deque() sleeper = _Sleeper() debug_status = _is_debug_status_enabled() @@ -97,10 +101,26 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 elif status_queue_factory == "simple": session.config["_status_queue"] = queue.SimpleQueue() + if live_execution: + live_execution.initial_status = start_execution_state + i = 0 + prefetch_factor = ( + 2 + if session.config["parallel_backend"] + in ( + ParallelBackend.PROCESSES, + ParallelBackend.LOKY, + ParallelBackend.THREADS, + ) + else 1 + ) + use_prefetch_queue = prefetch_factor > 1 while session.scheduler.is_active(): try: newly_collected_reports = [] + did_enqueue = False + did_submit = False # If there is any coiled function, the user probably wants to exploit # adaptive scaling. Thus, we need to submit all ready tasks. @@ -110,7 +130,16 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 if any_coiled_task: n_new_tasks = 10_000 else: - n_new_tasks = session.config["n_workers"] - len(running_tasks) + if use_prefetch_queue: + n_new_tasks = ( + session.config["n_workers"] * prefetch_factor + ) - ( + len(running_tasks) + + len(queued_tasks) + + len(queued_try_last_tasks) + ) + else: + n_new_tasks = session.config["n_workers"] - len(running_tasks) ready_tasks = ( list(session.scheduler.get_ready(n_new_tasks)) @@ -118,34 +147,87 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 else [] ) - for task_signature in ready_tasks: - task = session.dag.nodes[task_signature]["task"] - if debug_status: - _log_status( - "PENDING" - if start_execution_state == TaskExecutionStatus.PENDING - else "RUNNING", - task_signature, + if use_prefetch_queue: + for task_signature in ready_tasks: + task = session.dag.nodes[task_signature]["task"] + if debug_status: + _log_status("PENDING", task_signature) + session.hook.pytask_execute_task_log_start( + session=session, + task=task, + status=start_execution_state, ) - session.hook.pytask_execute_task_log_start( - session=session, task=task, status=start_execution_state - ) - try: - session.hook.pytask_execute_task_setup( - session=session, task=task - ) - running_tasks[task_signature] = ( - session.hook.pytask_execute_task(session=session, task=task) + if get_marks(task, "try_last"): + queued_try_last_tasks.append(task_signature) + else: + queued_tasks.append(task_signature) + did_enqueue = True + + def _can_run_try_last() -> bool: + return not ( + queued_tasks + or (len(running_tasks) > len(running_try_last)) ) - sleeper.reset() - except Exception: # noqa: BLE001 - report = ExecutionReport.from_task_and_exception( - task, sys.exc_info() + + while len(running_tasks) < session.config["n_workers"]: + if queued_tasks: + task_signature = queued_tasks.popleft() + elif queued_try_last_tasks and _can_run_try_last(): + task_signature = queued_try_last_tasks.popleft() + else: + break + task = session.dag.nodes[task_signature]["task"] + try: + session.hook.pytask_execute_task_setup( + session=session, task=task + ) + running_tasks[task_signature] = ( + session.hook.pytask_execute_task( + session=session, task=task + ) + ) + if get_marks(task, "try_last"): + running_try_last.add(task_signature) + sleeper.reset() + did_submit = True + except Exception: # noqa: BLE001 + report = ExecutionReport.from_task_and_exception( + task, sys.exc_info() + ) + newly_collected_reports.append(report) + session.scheduler.done(task_signature) + else: + for task_signature in ready_tasks: + task = session.dag.nodes[task_signature]["task"] + if debug_status: + _log_status( + "PENDING" + if start_execution_state == TaskExecutionStatus.PENDING + else "RUNNING", + task_signature, + ) + session.hook.pytask_execute_task_log_start( + session=session, task=task, status=start_execution_state ) - newly_collected_reports.append(report) - session.scheduler.done(task_signature) + try: + session.hook.pytask_execute_task_setup( + session=session, task=task + ) + running_tasks[task_signature] = ( + session.hook.pytask_execute_task( + session=session, task=task + ) + ) + sleeper.reset() + did_submit = True + except Exception: # noqa: BLE001 + report = ExecutionReport.from_task_and_exception( + task, sys.exc_info() + ) + newly_collected_reports.append(report) + session.scheduler.done(task_signature) - if not ready_tasks: + if not ready_tasks and not did_enqueue and not did_submit: sleeper.increment() for task_signature in list(running_tasks): @@ -173,6 +255,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 ) ) running_tasks.pop(task_signature) + running_try_last.discard(task_signature) session.scheduler.done(task_signature) else: task = session.dag.nodes[task_signature]["task"] @@ -192,6 +275,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 report = ExecutionReport.from_task(task) running_tasks.pop(task_signature) + running_try_last.discard(task_signature) newly_collected_reports.append(report) session.scheduler.done(task_signature) From 8fd6d9c47965dca895508f9629ac66c74db85b87 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 31 Jan 2026 20:56:04 +0000 Subject: [PATCH 22/25] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/pytask_parallel/execute.py | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index 482b6f5..6fc5aa6 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -129,17 +129,14 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 # if any_coiled_task: n_new_tasks = 10_000 + elif use_prefetch_queue: + n_new_tasks = (session.config["n_workers"] * prefetch_factor) - ( + len(running_tasks) + + len(queued_tasks) + + len(queued_try_last_tasks) + ) else: - if use_prefetch_queue: - n_new_tasks = ( - session.config["n_workers"] * prefetch_factor - ) - ( - len(running_tasks) - + len(queued_tasks) - + len(queued_try_last_tasks) - ) - else: - n_new_tasks = session.config["n_workers"] - len(running_tasks) + n_new_tasks = session.config["n_workers"] - len(running_tasks) ready_tasks = ( list(session.scheduler.get_ready(n_new_tasks)) @@ -165,8 +162,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 def _can_run_try_last() -> bool: return not ( - queued_tasks - or (len(running_tasks) > len(running_try_last)) + queued_tasks or (len(running_tasks) > len(running_try_last)) ) while len(running_tasks) < session.config["n_workers"]: From f3abd8afd7b8b4168bdf7eb7b5749513a1fc7f67 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sat, 31 Jan 2026 22:03:07 +0100 Subject: [PATCH 23/25] Fix pre-commit for demo script --- scripts/__init__.py | 1 + scripts/pending_status_demo.py | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) create mode 100644 scripts/__init__.py diff --git a/scripts/__init__.py b/scripts/__init__.py new file mode 100644 index 0000000..dba62ab --- /dev/null +++ b/scripts/__init__.py @@ -0,0 +1 @@ +"""Helper scripts for pytask-parallel.""" diff --git a/scripts/pending_status_demo.py b/scripts/pending_status_demo.py index 7168581..c2495ff 100644 --- a/scripts/pending_status_demo.py +++ b/scripts/pending_status_demo.py @@ -34,6 +34,7 @@ def _write_tasks(path: Path, n_tasks: int, sleep_s: float, jitter_s: float) -> N def main() -> int: + """Run a demo pytask session to show pending/running status updates.""" parser = argparse.ArgumentParser( description="Run a pytask demo to observe pending/running status updates." ) @@ -95,7 +96,8 @@ def main() -> int: env.setdefault("PYTHONUTF8", "1") if args.log_status: env.setdefault("PYTASK_PARALLEL_DEBUG_STATUS", "1") - return subprocess.call(cmd, env=env) + # Controlled command built from explicit args; safe for local demo usage. + return subprocess.call(cmd, env=env) # noqa: S603 if __name__ == "__main__": From 0fcd62d1e1c9045385960bb6d10bff7247c2c504 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sat, 31 Jan 2026 23:06:05 +0100 Subject: [PATCH 24/25] Remove demo scripts and fix priority queue --- scripts/__init__.py | 1 - scripts/pending_status_demo.py | 104 --------------------------------- src/pytask_parallel/execute.py | 14 ++++- 3 files changed, 11 insertions(+), 108 deletions(-) delete mode 100644 scripts/__init__.py delete mode 100644 scripts/pending_status_demo.py diff --git a/scripts/__init__.py b/scripts/__init__.py deleted file mode 100644 index dba62ab..0000000 --- a/scripts/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Helper scripts for pytask-parallel.""" diff --git a/scripts/pending_status_demo.py b/scripts/pending_status_demo.py deleted file mode 100644 index c2495ff..0000000 --- a/scripts/pending_status_demo.py +++ /dev/null @@ -1,104 +0,0 @@ -"""Demo script for pending/running task status updates.""" - -from __future__ import annotations - -import argparse -import os -import subprocess -import sys -from pathlib import Path - - -def _write_tasks(path: Path, n_tasks: int, sleep_s: float, jitter_s: float) -> None: - task_file = path / "task_status_demo.py" - lines = [ - "from __future__ import annotations", - "", - "from pathlib import Path", - "import time", - "", - "from pytask import task", - "", - f"N_TASKS = {n_tasks}", - f"SLEEP_S = {sleep_s}", - f"JITTER_S = {jitter_s}", - "", - "for i in range(N_TASKS):", - " @task(id=str(i), kwargs={'produces': Path(f'out_{i}.txt')})", - " def task_sleep(produces, i=i):", - " time.sleep(SLEEP_S + (i % 3) * JITTER_S)", - " produces.write_text('done')", - "", - ] - task_file.write_text("\n".join(lines), encoding="utf-8") - - -def main() -> int: - """Run a demo pytask session to show pending/running status updates.""" - parser = argparse.ArgumentParser( - description="Run a pytask demo to observe pending/running status updates." - ) - parser.add_argument("--n-tasks", type=int, default=30) - parser.add_argument("--sleep", type=float, default=2.0) - parser.add_argument("--jitter", type=float, default=0.5) - parser.add_argument("--workers", type=int, default=4) - parser.add_argument( - "--backend", - choices=["processes", "threads", "loky", "dask", "none"], - default="processes", - ) - parser.add_argument("--entries", type=int, default=30) - parser.add_argument( - "--live", - action="store_true", - help="Use the live rich table instead of raw logs.", - ) - parser.add_argument( - "--log-status", - action=argparse.BooleanOptionalAction, - default=True, - help="Emit pending/running status logs from the main process.", - ) - parser.add_argument( - "--dir", - type=Path, - default=Path(__file__).with_name("pending_status_demo"), - help="Directory to store the demo task file.", - ) - args = parser.parse_args() - - demo_dir = args.dir.resolve() - demo_dir.mkdir(parents=True, exist_ok=True) - - for path in demo_dir.glob("out_*.txt"): - path.unlink() - - _write_tasks(demo_dir, args.n_tasks, args.sleep, args.jitter) - - cmd = [ - sys.executable, - "-m", - "pytask", - demo_dir.as_posix(), - "--n-workers", - str(args.workers), - "--parallel-backend", - args.backend, - "--n-entries-in-table", - str(args.entries), - ] - if not args.live: - cmd.extend(["-s", "-v", "0"]) - sys.stdout.write(f"Running: {' '.join(cmd)}\n") - sys.stdout.flush() - env = dict(os.environ) - env.setdefault("PYTHONIOENCODING", "utf-8") - env.setdefault("PYTHONUTF8", "1") - if args.log_status: - env.setdefault("PYTASK_PARALLEL_DEBUG_STATUS", "1") - # Controlled command built from explicit args; safe for local demo usage. - return subprocess.call(cmd, env=env) # noqa: S603 - - -if __name__ == "__main__": - raise SystemExit(main()) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index 6fc5aa6..60c6c66 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -63,6 +63,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 reports = session.execution_reports running_tasks: dict[str, Future[Any]] = {} running_try_last: set[str] = set() + queued_try_first_tasks: deque[str] = deque() queued_tasks: deque[str] = deque() queued_try_last_tasks: deque[str] = deque() sleeper = _Sleeper() @@ -132,6 +133,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 elif use_prefetch_queue: n_new_tasks = (session.config["n_workers"] * prefetch_factor) - ( len(running_tasks) + + len(queued_try_first_tasks) + len(queued_tasks) + len(queued_try_last_tasks) ) @@ -154,7 +156,9 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 task=task, status=start_execution_state, ) - if get_marks(task, "try_last"): + if get_marks(task, "try_first"): + queued_try_first_tasks.append(task_signature) + elif get_marks(task, "try_last"): queued_try_last_tasks.append(task_signature) else: queued_tasks.append(task_signature) @@ -162,11 +166,15 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 def _can_run_try_last() -> bool: return not ( - queued_tasks or (len(running_tasks) > len(running_try_last)) + queued_try_first_tasks + or queued_tasks + or (len(running_tasks) > len(running_try_last)) ) while len(running_tasks) < session.config["n_workers"]: - if queued_tasks: + if queued_try_first_tasks: + task_signature = queued_try_first_tasks.popleft() + elif queued_tasks: task_signature = queued_tasks.popleft() elif queued_try_last_tasks and _can_run_try_last(): task_signature = queued_try_last_tasks.popleft() From 74748f2c63e2e37cb24b533c7279edb8e61d6d27 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sat, 31 Jan 2026 23:27:39 +0100 Subject: [PATCH 25/25] Pin Sphinx below 9 for docs --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 8520904..d010638 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,7 +33,7 @@ docs = [ "matplotlib", "myst-parser", "nbsphinx", - "sphinx", + "sphinx<9", "sphinx-autobuild", "sphinx-click", "sphinx-copybutton",