Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/source/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and
- {pull}`130` switches type checking to ty.
- {pull}`131` updates pre-commit hooks.
- {pull}`132` removes the tox configuration in favor of uv and just.
- {pull}`137` fixes pickling errors in parallel workers when task modules contain
non-picklable globals. Fixes {issue}`136`.

## 0.5.1 - 2025-03-09

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ docs = [
"matplotlib",
"myst-parser",
"nbsphinx",
"sphinx",
"sphinx<9",
"sphinx-autobuild",
"sphinx-click",
"sphinx-copybutton",
Expand Down
86 changes: 0 additions & 86 deletions requirements-dev.lock

This file was deleted.

51 changes: 0 additions & 51 deletions requirements.lock

This file was deleted.

55 changes: 52 additions & 3 deletions src/pytask_parallel/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from __future__ import annotations

import os
import sys
import warnings
from concurrent.futures import Executor
from concurrent.futures import Future
Expand All @@ -19,7 +21,46 @@
if TYPE_CHECKING:
from collections.abc import Callable

__all__ = ["ParallelBackend", "ParallelBackendRegistry", "WorkerType", "registry"]
__all__ = [
"ParallelBackend",
"ParallelBackendRegistry",
"WorkerType",
"registry",
"set_worker_root",
]

_WORKER_ROOT: str | None = None


def set_worker_root(path: os.PathLike[str] | str) -> None:
"""Configure the root path for worker processes.

Spawned workers (notably on Windows) start with a clean interpreter and may not
inherit the parent's import path. We set both ``sys.path`` and ``PYTHONPATH`` so
task modules are importable by reference, which avoids pickling module globals.

"""
root = os.fspath(path)
global _WORKER_ROOT # noqa: PLW0603
_WORKER_ROOT = root
if root not in sys.path:
sys.path.insert(0, root)
# Ensure custom process backends can import task modules by reference.
separator = os.pathsep
current = os.environ.get("PYTHONPATH", "")
parts = [p for p in current.split(separator) if p] if current else []
if root not in parts:
parts.insert(0, root)
os.environ["PYTHONPATH"] = separator.join(parts)


def _configure_worker(root: str | None) -> None:
"""Set cwd and sys.path for worker processes."""
if not root:
return
os.chdir(root)
if root not in sys.path:
sys.path.insert(0, root)


def _deserialize_and_run_with_cloudpickle(fn: bytes, kwargs: bytes) -> Any:
Expand Down Expand Up @@ -75,12 +116,20 @@ def _get_dask_executor(n_workers: int) -> Executor:

def _get_loky_executor(n_workers: int) -> Executor:
    """Get a loky executor.

    The workers are initialized with ``_configure_worker`` and the session root so
    task modules remain importable by reference inside the spawned processes.

    Parameters
    ----------
    n_workers
        Maximum number of worker processes in the reusable pool.

    """
    # NOTE: The old bare ``get_reusable_executor(max_workers=n_workers)`` call was
    # removed; it returned early and made the initializer-configured pool unreachable.
    return get_reusable_executor(
        max_workers=n_workers,
        initializer=_configure_worker,
        initargs=(_WORKER_ROOT,),
    )


def _get_process_pool_executor(n_workers: int) -> Executor:
    """Get a process pool executor.

    The workers are initialized with ``_configure_worker`` and the session root so
    task modules remain importable by reference inside the spawned processes.

    Parameters
    ----------
    n_workers
        Maximum number of worker processes in the pool.

    """
    # NOTE: The old bare ``_CloudpickleProcessPoolExecutor(max_workers=n_workers)``
    # call was removed; it returned early and skipped the worker initializer.
    return _CloudpickleProcessPoolExecutor(
        max_workers=n_workers,
        initializer=_configure_worker,
        initargs=(_WORKER_ROOT,),
    )


def _get_thread_pool_executor(n_workers: int) -> Executor:
Expand Down
9 changes: 7 additions & 2 deletions src/pytask_parallel/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@

from pytask_parallel.backends import WorkerType
from pytask_parallel.backends import registry
from pytask_parallel.backends import set_worker_root
from pytask_parallel.typing import CarryOverPath
from pytask_parallel.typing import is_coiled_function
from pytask_parallel.utils import create_kwargs_for_task
from pytask_parallel.utils import get_module
from pytask_parallel.utils import parse_future_result
from pytask_parallel.utils import should_pickle_module_by_value

if TYPE_CHECKING:
from concurrent.futures import Future
Expand All @@ -57,6 +59,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091

# The executor can only be created after the collection to give users the
# possibility to inject their own executors.
set_worker_root(session.config["root"])
session.config["_parallel_executor"] = registry.get_parallel_backend(
session.config["parallel_backend"], n_workers=session.config["n_workers"]
)
Expand Down Expand Up @@ -208,7 +211,8 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]:
# cloudpickle will pickle it with the function. See cloudpickle#417, pytask#373
# and pytask#374.
task_module = get_module(task.function, getattr(task, "path", None))
cloudpickle.register_pickle_by_value(task_module)
if should_pickle_module_by_value(task_module):
cloudpickle.register_pickle_by_value(task_module)

return cast("Any", wrapper_func).submit(
task=task,
Expand All @@ -230,7 +234,8 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]:
# cloudpickle will pickle it with the function. See cloudpickle#417, pytask#373
# and pytask#374.
task_module = get_module(task.function, getattr(task, "path", None))
cloudpickle.register_pickle_by_value(task_module)
if should_pickle_module_by_value(task_module):
cloudpickle.register_pickle_by_value(task_module)

return session.config["_parallel_executor"].submit(
wrap_task_in_process,
Expand Down
31 changes: 30 additions & 1 deletion src/pytask_parallel/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

from __future__ import annotations

import importlib.util
import inspect
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any

Expand All @@ -20,7 +22,6 @@
if TYPE_CHECKING:
from collections.abc import Callable
from concurrent.futures import Future
from pathlib import Path
from types import ModuleType
from types import TracebackType

Expand All @@ -39,6 +40,7 @@ class CoiledFunction: ...
"create_kwargs_for_task",
"get_module",
"parse_future_result",
"should_pickle_module_by_value",
]


Expand Down Expand Up @@ -150,3 +152,30 @@ def get_module(func: Callable[..., Any], path: Path | None) -> ModuleType:
if path:
return inspect.getmodule(func, path.as_posix()) # type: ignore[return-value]
return inspect.getmodule(func) # type: ignore[return-value]


def should_pickle_module_by_value(module: ModuleType) -> bool:
    """Return whether a module should be pickled by value.

    We only pickle by value when the module is not importable by name in the worker.
    This avoids serializing all module globals, which can fail for non-picklable
    objects (e.g., closed file handles or locks stored at module scope).

    """
    name = getattr(module, "__name__", None)
    file = getattr(module, "__file__", None)

    # Nameless modules, ``__main__``, and modules without a backing file cannot be
    # imported by reference in a worker and must travel by value.
    if file is None or not name or name == "__main__":
        return True

    try:
        spec = importlib.util.find_spec(name)
    except (ImportError, ValueError, AttributeError):
        return True

    if spec is None or spec.origin is None:
        return True

    # Importable by reference only if the name resolves to the very same file.
    try:
        resolves_to_same_file = Path(spec.origin).resolve() == Path(file).resolve()
    except OSError:
        return True
    return not resolves_to_same_file
2 changes: 1 addition & 1 deletion src/pytask_parallel/wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ def _render_traceback_to_string(
traceback = Traceback(exc_info, show_locals=show_locals)
segments = console.render(cast("Any", traceback), options=console_options)
text = "".join(segment.text for segment in segments)
return (*exc_info[:2], text) # ty: ignore[invalid-return-type]
return (*exc_info[:2], text)


def _handle_function_products(
Expand Down
Loading
Loading