From d3689754aa2283e9022ffb0973d6f09f09d2145c Mon Sep 17 00:00:00 2001
From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com>
Date: Thu, 16 Jan 2025 18:45:54 -0300
Subject: [PATCH 01/22] Check for subclasses of Parallelizable
---
hamilton/htypes.py | 5 +++--
hamilton/node.py | 4 ++--
2 files changed, 5 insertions(+), 4 deletions(-)
diff --git a/hamilton/htypes.py b/hamilton/htypes.py
index c2ac13314..5a83a31ba 100644
--- a/hamilton/htypes.py
+++ b/hamilton/htypes.py
@@ -68,7 +68,7 @@ def custom_subclass_check(requested_type: Type, param_type: Type):
has_generic = True
# TODO -- consider moving into a graph adapter or elsewhere -- this is perhaps a little too
# low-level
- if has_generic and requested_origin_type in (Parallelizable,):
+ if has_generic and is_parallelizable(requested_origin_type):
(requested_type_arg,) = _get_args(requested_type)
return custom_subclass_check(requested_type_arg, param_type)
if has_generic and param_origin_type == Collect:
@@ -297,11 +297,12 @@ class Parallelizable(Iterable[ParallelizableElement], Protocol[ParallelizableEle
pass
+def is_parallelizable(type:Type) -> bool:
+ return type == Parallelizable or Parallelizable in type.__bases__
def is_parallelizable_type(type_: Type) -> bool:
return _get_origin(type_) == Parallelizable
-
class Collect(Iterable[CollectElement], Protocol[CollectElement]):
"""Marks a function node parameter as collectable.
diff --git a/hamilton/node.py b/hamilton/node.py
index 5755b0ed7..97002c1f6 100644
--- a/hamilton/node.py
+++ b/hamilton/node.py
@@ -6,7 +6,7 @@
import typing_inspect
-from hamilton.htypes import Collect, Parallelizable
+from hamilton.htypes import Collect, Parallelizable, is_parallelizable
"""
Module that contains the primitive components of the graph.
@@ -285,7 +285,7 @@ def from_fn(fn: Callable, name: str = None) -> "Node":
node_source = NodeType.STANDARD
# TODO - extract this into a function + clean up!
if typing_inspect.is_generic_type(return_type):
- if typing_inspect.get_origin(return_type) == Parallelizable:
+ if is_parallelizable(typing_inspect.get_origin(return_type)):
node_source = NodeType.EXPAND
for hint in typing.get_type_hints(fn, **type_hint_kwargs).values():
if typing_inspect.is_generic_type(hint):
From 47d2cf3e84b3151551e95973a7650ec7710775a5 Mon Sep 17 00:00:00 2001
From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com>
Date: Thu, 16 Jan 2025 18:48:03 -0300
Subject: [PATCH 02/22] Specific ParallelizableList for annotation
---
hamilton/htypes.py | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/hamilton/htypes.py b/hamilton/htypes.py
index 5a83a31ba..83194d9b7 100644
--- a/hamilton/htypes.py
+++ b/hamilton/htypes.py
@@ -1,7 +1,7 @@
import inspect
import sys
import typing
-from typing import Any, Iterable, Optional, Protocol, Tuple, Type, TypeVar, Union
+from typing import Any, Iterable, Optional, Protocol, Tuple, Type, TypeVar, Union, Generic
import typing_inspect
@@ -297,6 +297,10 @@ class Parallelizable(Iterable[ParallelizableElement], Protocol[ParallelizableEle
pass
+class ParallelizableList(list[ParallelizableElement], Parallelizable, Generic[ParallelizableElement]):
+ pass
+
+
def is_parallelizable(type:Type) -> bool:
return type == Parallelizable or Parallelizable in type.__bases__
From 55f061f66132e86966b6a15502e8d26fb51719e2 Mon Sep 17 00:00:00 2001
From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com>
Date: Thu, 16 Jan 2025 19:22:25 -0300
Subject: [PATCH 03/22] Test for ParallelizableList
---
tests/execution/test_executors.py | 12 +++++
.../dynamic_parallelism/parallel_list.py | 51 +++++++++++++++++++
2 files changed, 63 insertions(+)
create mode 100644 tests/resources/dynamic_parallelism/parallel_list.py
diff --git a/tests/execution/test_executors.py b/tests/execution/test_executors.py
index cced38d2d..a67558458 100644
--- a/tests/execution/test_executors.py
+++ b/tests/execution/test_executors.py
@@ -30,6 +30,7 @@
parallel_complex,
parallel_delayed,
parallel_linear_basic,
+ parallel_list,
)
ADAPTER = base.DefaultAdapter()
@@ -347,3 +348,14 @@ def test_parallel_end_to_end_with_empty_list():
parallel_collect_multiple_arguments._reset_counter()
res = dr.execute(["final"], overrides={"number_of_steps": 0})
assert res["final"] == parallel_linear_basic._calc(0)
+
+
+def test_parallel_list():
+ dr = (
+ driver.Builder()
+ .with_modules(parallel_list)
+ .enable_dynamic_execution(allow_experimental_mode=True)
+ .build()
+ )
+ result = dr.execute(["final"])
+ assert result["final"] == parallel_linear_basic._calc()
diff --git a/tests/resources/dynamic_parallelism/parallel_list.py b/tests/resources/dynamic_parallelism/parallel_list.py
new file mode 100644
index 000000000..e8b048485
--- /dev/null
+++ b/tests/resources/dynamic_parallelism/parallel_list.py
@@ -0,0 +1,51 @@
+from hamilton.htypes import Collect, ParallelizableList
+
+
+# input
+def number_of_steps() -> int:
+ return 6
+
+
+# expand
+def steps(number_of_steps: int) -> ParallelizableList[int]:
+ return list(range(number_of_steps))
+
+
+# process
+def step_squared(steps: int) -> int:
+ return steps**2
+
+
+# process
+def step_cubed(steps: int) -> int:
+ return steps**3
+
+
+def step_squared_plus_step_cubed(step_squared: int, step_cubed: int) -> int:
+ return step_squared + step_cubed
+
+
+# join
+def sum_step_squared_plus_step_cubed(step_squared_plus_step_cubed: Collect[int]) -> int:
+ out = 0
+ for step in step_squared_plus_step_cubed:
+ out += step
+ return out
+
+
+# final
+def final(sum_step_squared_plus_step_cubed: int) -> int:
+ return sum_step_squared_plus_step_cubed
+
+
+def _calc(number_of_steps: int = number_of_steps()) -> int:
+ steps_ = steps(number_of_steps)
+ to_sum = []
+ for step_ in steps_:
+ step_squared_ = step_squared(step_)
+ step_cubed_ = step_cubed(step_)
+ step_squared_plus_step_cubed_ = step_squared_plus_step_cubed(step_squared_, step_cubed_)
+ to_sum.append(step_squared_plus_step_cubed_)
+ sum_step_squared_plus_step_cubed_ = sum_step_squared_plus_step_cubed(to_sum)
+ final_ = final(sum_step_squared_plus_step_cubed_)
+ return final_
From e1f1267b9e7e4a81a66ddf84ea14aeb66745d9c8 Mon Sep 17 00:00:00 2001
From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com>
Date: Thu, 16 Jan 2025 19:31:44 -0300
Subject: [PATCH 04/22] Documentation
Documentation of ParallelizableList and is_parallelizable
---
hamilton/htypes.py | 24 +++++++++++++++++++++---
1 file changed, 21 insertions(+), 3 deletions(-)
diff --git a/hamilton/htypes.py b/hamilton/htypes.py
index 83194d9b7..9bcc46270 100644
--- a/hamilton/htypes.py
+++ b/hamilton/htypes.py
@@ -1,7 +1,7 @@
import inspect
import sys
import typing
-from typing import Any, Iterable, Optional, Protocol, Tuple, Type, TypeVar, Union, Generic
+from typing import Any, Generic, Iterable, Optional, Protocol, Tuple, Type, TypeVar, Union
import typing_inspect
@@ -297,16 +297,34 @@ class Parallelizable(Iterable[ParallelizableElement], Protocol[ParallelizableEle
pass
-class ParallelizableList(list[ParallelizableElement], Parallelizable, Generic[ParallelizableElement]):
+
+class ParallelizableList(
+ list[ParallelizableElement], Parallelizable, Generic[ParallelizableElement]
+):
+ """
+ Marks the output of a function node as parallelizable and also as a list.
+
+ It has the same usage as "Parallelizable", but for returns that are specifically
+ lists, for correct functioning of linters and other tools.
+ """
+
pass
-def is_parallelizable(type:Type) -> bool:
+def is_parallelizable(type: Type) -> bool:
+ """
+ Checks if a type is parallelizable.
+
+ :param type: Type to check.
+ :return: True if the type is parallelizable, False otherwise.
+ """
return type == Parallelizable or Parallelizable in type.__bases__
+
def is_parallelizable_type(type_: Type) -> bool:
return _get_origin(type_) == Parallelizable
+
class Collect(Iterable[CollectElement], Protocol[CollectElement]):
"""Marks a function node parameter as collectable.
From 252664de8dad3d971965b3987cd70f58bbac2c06 Mon Sep 17 00:00:00 2001
From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com>
Date: Thu, 16 Jan 2025 19:39:16 -0300
Subject: [PATCH 05/22] Pre-commit fixes on node.py
---
hamilton/node.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/hamilton/node.py b/hamilton/node.py
index 97002c1f6..bc16ca8b7 100644
--- a/hamilton/node.py
+++ b/hamilton/node.py
@@ -6,7 +6,7 @@
import typing_inspect
-from hamilton.htypes import Collect, Parallelizable, is_parallelizable
+from hamilton.htypes import Collect, is_parallelizable
"""
Module that contains the primitive components of the graph.
From 0a7c0f7eb5ffbec77f2f65d1fe3bb5dbf23cc184 Mon Sep 17 00:00:00 2001
From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com>
Date: Thu, 16 Jan 2025 20:00:53 -0300
Subject: [PATCH 06/22] Fix Parallelizable checking
It was throwing exception when checking against None
---
hamilton/htypes.py | 3 +++
1 file changed, 3 insertions(+)
diff --git a/hamilton/htypes.py b/hamilton/htypes.py
index 9bcc46270..387f19610 100644
--- a/hamilton/htypes.py
+++ b/hamilton/htypes.py
@@ -318,6 +318,9 @@ def is_parallelizable(type: Type) -> bool:
:param type: Type to check.
:return: True if the type is parallelizable, False otherwise.
"""
+ if type is None:
+ return False
+
return type == Parallelizable or Parallelizable in type.__bases__
From 31538947be4a8717e9b674ab5632a9f25b92d75e Mon Sep 17 00:00:00 2001
From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com>
Date: Thu, 16 Jan 2025 20:01:36 -0300
Subject: [PATCH 07/22] Fix ParallelizableList on 3.8
Python 3.8 does not support list[], changed to typing.List[]
---
hamilton/htypes.py | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/hamilton/htypes.py b/hamilton/htypes.py
index 387f19610..afc720d2a 100644
--- a/hamilton/htypes.py
+++ b/hamilton/htypes.py
@@ -1,7 +1,7 @@
import inspect
import sys
import typing
-from typing import Any, Generic, Iterable, Optional, Protocol, Tuple, Type, TypeVar, Union
+from typing import Any, Generic, Iterable, List, Optional, Protocol, Tuple, Type, TypeVar, Union
import typing_inspect
@@ -299,7 +299,7 @@ class Parallelizable(Iterable[ParallelizableElement], Protocol[ParallelizableEle
class ParallelizableList(
- list[ParallelizableElement], Parallelizable, Generic[ParallelizableElement]
+ List[ParallelizableElement], Parallelizable, Generic[ParallelizableElement]
):
"""
Marks the output of a function node as parallelizable and also as a list.
From b6e4298d7e0307154838043f4ba222c2a4194aeb Mon Sep 17 00:00:00 2001
From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com>
Date: Mon, 27 Jan 2025 13:16:43 -0300
Subject: [PATCH 08/22] Fix validte_examples in Windows
Windows use backslashes and the generated path for the URLs was been generated with mixed "/" and "\"
---
examples/validate_examples.py | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/examples/validate_examples.py b/examples/validate_examples.py
index d85dcbbf7..e7eabfa0e 100644
--- a/examples/validate_examples.py
+++ b/examples/validate_examples.py
@@ -15,13 +15,13 @@
def _create_github_badge(path: pathlib.Path) -> str:
- github_url = f"https://github.com/dagworks-inc/hamilton/blob/main/{path}"
+ github_url = f"https://github.com/dagworks-inc/hamilton/blob/main/{path.as_posix()}"
github_badge = f"[]({github_url})"
return github_badge
def _create_colab_badge(path: pathlib.Path) -> str:
- colab_url = f"https://colab.research.google.com/github/dagworks-inc/hamilton/blob/main/{path}"
+ colab_url = f"https://colab.research.google.com/github/dagworks-inc/hamilton/blob/main/{path.as_posix()}"
colab_badge = (
f"[]({colab_url})"
)
From 5a39b886333599c57e8d8a47cbb1ffb0b4360d5c Mon Sep 17 00:00:00 2001
From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com>
Date: Mon, 27 Jan 2025 13:16:54 -0300
Subject: [PATCH 09/22] Parallelizable subclass example
---
.../parallelizable_subclass/README.md | 11 +
.../parallelizable_subclass/functions.py | 15 +
.../parallelizable_subclass/notebook.ipynb | 299 ++++++++++++++++++
.../parallelizable_list.py | 16 +
4 files changed, 341 insertions(+)
create mode 100644 examples/parallelism/parallelizable_subclass/README.md
create mode 100644 examples/parallelism/parallelizable_subclass/functions.py
create mode 100644 examples/parallelism/parallelizable_subclass/notebook.ipynb
create mode 100644 examples/parallelism/parallelizable_subclass/parallelizable_list.py
diff --git a/examples/parallelism/parallelizable_subclass/README.md b/examples/parallelism/parallelizable_subclass/README.md
new file mode 100644
index 000000000..69ffa9ae3
--- /dev/null
+++ b/examples/parallelism/parallelizable_subclass/README.md
@@ -0,0 +1,11 @@
+# Parallelizable Subclass
+
+## Overview
+
+When annotating a function with `Parallelizable`, it is not possible to specify in the annotation what the type returned by the function will actually be, and these are not identified by a linter or other tools as static type checking. Especially for functions that can be used with or without Hamilton, this can be a problem.
+
+To solve this problem, it is possible to create subclasses of the `Parallelizable` classes, as demonstrated in this example.
+
+## Running
+
+The `notebook.ipynb` exemplifies how to use a `Parallelizable` subclass.
diff --git a/examples/parallelism/parallelizable_subclass/functions.py b/examples/parallelism/parallelizable_subclass/functions.py
new file mode 100644
index 000000000..a3cae5992
--- /dev/null
+++ b/examples/parallelism/parallelizable_subclass/functions.py
@@ -0,0 +1,15 @@
+from parallelizable_list import ParallelizableList
+
+from hamilton.htypes import Collect
+
+
+def hello_list() -> ParallelizableList[str]:
+ return ["h", "e", "l", "l", "o", " ", "l", "i", "s", "t"]
+
+
+def uppercase(hello_list: str) -> str:
+ return hello_list.upper()
+
+
+def hello_uppercase(uppercase: Collect[str]) -> str:
+ return "".join(uppercase)
diff --git a/examples/parallelism/parallelizable_subclass/notebook.ipynb b/examples/parallelism/parallelizable_subclass/notebook.ipynb
new file mode 100644
index 000000000..b070a38f6
--- /dev/null
+++ b/examples/parallelism/parallelizable_subclass/notebook.ipynb
@@ -0,0 +1,299 @@
+{
+ "cells": [
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "#Install Hamilton if not avaiable\n",
+ "\n",
+ "try:\n",
+ " import hamilton\n",
+ "except ModuleNotFoundError:\n",
+ " %pip install sf-hamilton"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Parallelism: Paralellizable Subclass [](https://colab.research.google.com/github/dagworks-inc/hamilton/blob/main/examples/parallelism/parallelizable_subclass/notebook.ipynb) [](https://github.com/dagworks-inc/hamilton/blob/main/examples/parallelism/parallelizable_subclass/notebook.ipynb)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "When annotating a function with `Parallelizable`, it is not possible to specify in the annotation what the type returned by the function will actually be, and these are not identified by a linter or other tools as static type checking. Especially for functions that can be used with or without Hamilton, this can be a problem.\n",
+ "\n",
+ "To solve this problem, it is possible to create subclasses of the `Parallelizable` classes, as demonstrated in this example."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "We start by importing Hamilton and the created example functions:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from hamilton import driver\n",
+ "\n",
+ "import functions"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Creating a driver and displaing all the module functions, we can see the `hello_list` function, that returns a `ParallelizableList`. This is a example `Parallelizable` subclass created for annotate functions that returns `list`. Is important to note that all `Parallelizable` subclasses must return a `Iterable` subclass, as for example list.\n",
+ "\n",
+ "The `ParallelizableList` implementation can be found in the [\"parallelizable_list.py\" file](https://github.com/dagworks-inc/hamilton/blob/main/examples/parallelism/parallelizable_subclass/parallelizable_list.py)."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "image/svg+xml": [
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n"
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "execution_count": 3,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "dr = (\n",
+ " driver.Builder()\n",
+ " .with_modules(functions)\n",
+ " .enable_dynamic_execution(allow_experimental_mode=True)\n",
+ " .build()\n",
+ " )\n",
+ "\n",
+ "dr.display_all_functions()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "In this simple example, the created flow generates a list with \"hello list\" letters, converts each letter to uppercase in parallel, and then joins the letters together:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "{'hello_uppercase': 'HELLO LIST'}"
+ ]
+ },
+ "execution_count": 4,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "dr.execute([\"hello_uppercase\"])"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Focusing attention on the function that was annotated with ParallelizableList, running it manually we can see that it actually returns a list:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "['h', 'e', 'l', 'l', 'o', ' ', 'l', 'i', 's', 't']"
+ ]
+ },
+ "execution_count": 5,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "functions.hello_list()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Checking the annotation, we can see the return annotation as \"ParallelizableList[str]\":"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "{'return': parallelizable_list.ParallelizableList[str]}"
+ ]
+ },
+ "execution_count": 6,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "functions.hello_list.__annotations__"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "And, the key point of using subtypes of `Parallelizable`, it is considered a list instance:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "True"
+ ]
+ },
+ "execution_count": 7,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "issubclass(functions.hello_list.__annotations__[\"return\"], list)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "This means that when using a linter or static type checking, it will correctly identify the return type as a list instance."
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "venv",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.12.8"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
diff --git a/examples/parallelism/parallelizable_subclass/parallelizable_list.py b/examples/parallelism/parallelizable_subclass/parallelizable_list.py
new file mode 100644
index 000000000..21d9f9d0c
--- /dev/null
+++ b/examples/parallelism/parallelizable_subclass/parallelizable_list.py
@@ -0,0 +1,16 @@
+from typing import Generic, List
+
+from hamilton.htypes import Parallelizable, ParallelizableElement
+
+
+class ParallelizableList(
+ List[ParallelizableElement], Parallelizable, Generic[ParallelizableElement]
+):
+ """
+ Marks the output of a function node as parallelizable and also as a list.
+
+ It has the same usage as "Parallelizable", but for returns that are specifically
+ lists, for correct functioning of linters and other tools.
+ """
+
+ pass
From 97b9826edbd03ea135fda7a812e7e4c844c09b53 Mon Sep 17 00:00:00 2001
From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com>
Date: Mon, 27 Jan 2025 13:17:07 -0300
Subject: [PATCH 10/22] Revert "Test for ParallelizableList"
This reverts commit ec92486ceecdd1f73441450fbfa9da7e86443d6e.
---
tests/execution/test_executors.py | 12 -----
.../dynamic_parallelism/parallel_list.py | 51 -------------------
2 files changed, 63 deletions(-)
delete mode 100644 tests/resources/dynamic_parallelism/parallel_list.py
diff --git a/tests/execution/test_executors.py b/tests/execution/test_executors.py
index a67558458..cced38d2d 100644
--- a/tests/execution/test_executors.py
+++ b/tests/execution/test_executors.py
@@ -30,7 +30,6 @@
parallel_complex,
parallel_delayed,
parallel_linear_basic,
- parallel_list,
)
ADAPTER = base.DefaultAdapter()
@@ -348,14 +347,3 @@ def test_parallel_end_to_end_with_empty_list():
parallel_collect_multiple_arguments._reset_counter()
res = dr.execute(["final"], overrides={"number_of_steps": 0})
assert res["final"] == parallel_linear_basic._calc(0)
-
-
-def test_parallel_list():
- dr = (
- driver.Builder()
- .with_modules(parallel_list)
- .enable_dynamic_execution(allow_experimental_mode=True)
- .build()
- )
- result = dr.execute(["final"])
- assert result["final"] == parallel_linear_basic._calc()
diff --git a/tests/resources/dynamic_parallelism/parallel_list.py b/tests/resources/dynamic_parallelism/parallel_list.py
deleted file mode 100644
index e8b048485..000000000
--- a/tests/resources/dynamic_parallelism/parallel_list.py
+++ /dev/null
@@ -1,51 +0,0 @@
-from hamilton.htypes import Collect, ParallelizableList
-
-
-# input
-def number_of_steps() -> int:
- return 6
-
-
-# expand
-def steps(number_of_steps: int) -> ParallelizableList[int]:
- return list(range(number_of_steps))
-
-
-# process
-def step_squared(steps: int) -> int:
- return steps**2
-
-
-# process
-def step_cubed(steps: int) -> int:
- return steps**3
-
-
-def step_squared_plus_step_cubed(step_squared: int, step_cubed: int) -> int:
- return step_squared + step_cubed
-
-
-# join
-def sum_step_squared_plus_step_cubed(step_squared_plus_step_cubed: Collect[int]) -> int:
- out = 0
- for step in step_squared_plus_step_cubed:
- out += step
- return out
-
-
-# final
-def final(sum_step_squared_plus_step_cubed: int) -> int:
- return sum_step_squared_plus_step_cubed
-
-
-def _calc(number_of_steps: int = number_of_steps()) -> int:
- steps_ = steps(number_of_steps)
- to_sum = []
- for step_ in steps_:
- step_squared_ = step_squared(step_)
- step_cubed_ = step_cubed(step_)
- step_squared_plus_step_cubed_ = step_squared_plus_step_cubed(step_squared_, step_cubed_)
- to_sum.append(step_squared_plus_step_cubed_)
- sum_step_squared_plus_step_cubed_ = sum_step_squared_plus_step_cubed(to_sum)
- final_ = final(sum_step_squared_plus_step_cubed_)
- return final_
From ca091f4405ba30646c2843ca7accbd0808865bfc Mon Sep 17 00:00:00 2001
From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com>
Date: Mon, 27 Jan 2025 13:19:21 -0300
Subject: [PATCH 11/22] Conflict from commit revert fix
---
hamilton/htypes.py | 15 +--------------
1 file changed, 1 insertion(+), 14 deletions(-)
diff --git a/hamilton/htypes.py b/hamilton/htypes.py
index afc720d2a..e6d52cc9f 100644
--- a/hamilton/htypes.py
+++ b/hamilton/htypes.py
@@ -1,7 +1,7 @@
import inspect
import sys
import typing
-from typing import Any, Generic, Iterable, List, Optional, Protocol, Tuple, Type, TypeVar, Union
+from typing import Any, Iterable, Optional, Protocol, Tuple, Type, TypeVar, Union
import typing_inspect
@@ -298,19 +298,6 @@ class Parallelizable(Iterable[ParallelizableElement], Protocol[ParallelizableEle
pass
-class ParallelizableList(
- List[ParallelizableElement], Parallelizable, Generic[ParallelizableElement]
-):
- """
- Marks the output of a function node as parallelizable and also as a list.
-
- It has the same usage as "Parallelizable", but for returns that are specifically
- lists, for correct functioning of linters and other tools.
- """
-
- pass
-
-
def is_parallelizable(type: Type) -> bool:
"""
Checks if a type is parallelizable.
From 6b65aa25d5e9dc7394cb19d233211dcb11c0aad6 Mon Sep 17 00:00:00 2001
From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com>
Date: Mon, 27 Jan 2025 13:32:07 -0300
Subject: [PATCH 12/22] Added Parallelizable Subclass info to docs
---
docs/concepts/parallel-task.rst | 7 +++++++
1 file changed, 7 insertions(+)
diff --git a/docs/concepts/parallel-task.rst b/docs/concepts/parallel-task.rst
index b3898b0e9..78deb613c 100644
--- a/docs/concepts/parallel-task.rst
+++ b/docs/concepts/parallel-task.rst
@@ -266,3 +266,10 @@ to set what should be determined to be collected at DAG construction time:
# Then in the driver building pass in the configuration:
.with_config(_config)
+
+Parallelizable Subclassing
+=============
+
+When annotating a function with `Parallelizable`, it is not possible to specify in the annotation what the type returned by the function will actually be, and these are not identified by a linter or other tools as static type checking. Especially for functions that can be used with or without Hamilton, this can be a problem.
+
+To solve this problem, it is possible to create subclasses of the `Parallelizable` classes. The ["Parallelizable Subclass" example](https://github.com/dagworks-inc/hamilton/blob/main/examples/parallelism/parallelizable_subclass) showcases how to do that.
From f915e1becfd29a9e51c7920ac082a7b8093139c7 Mon Sep 17 00:00:00 2001
From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com>
Date: Mon, 27 Jan 2025 13:41:56 -0300
Subject: [PATCH 13/22] Fix documentation title
---
docs/concepts/parallel-task.rst | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/docs/concepts/parallel-task.rst b/docs/concepts/parallel-task.rst
index 78deb613c..a85cff6c2 100644
--- a/docs/concepts/parallel-task.rst
+++ b/docs/concepts/parallel-task.rst
@@ -268,7 +268,7 @@ to set what should be determined to be collected at DAG construction time:
.with_config(_config)
Parallelizable Subclassing
-=============
+==========================
When annotating a function with `Parallelizable`, it is not possible to specify in the annotation what the type returned by the function will actually be, and these are not identified by a linter or other tools as static type checking. Especially for functions that can be used with or without Hamilton, this can be a problem.
From 9168c912609abc5f51f6bc0005def947e59e32d0 Mon Sep 17 00:00:00 2001
From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com>
Date: Thu, 16 Jan 2025 18:45:54 -0300
Subject: [PATCH 14/22] Check for subclasses of Parallelizable
---
hamilton/htypes.py | 3 ++-
hamilton/node.py | 2 +-
2 files changed, 3 insertions(+), 2 deletions(-)
diff --git a/hamilton/htypes.py b/hamilton/htypes.py
index e6d52cc9f..9188ccc7c 100644
--- a/hamilton/htypes.py
+++ b/hamilton/htypes.py
@@ -297,6 +297,8 @@ class Parallelizable(Iterable[ParallelizableElement], Protocol[ParallelizableEle
pass
+def is_parallelizable(type:Type) -> bool:
+ return type == Parallelizable or Parallelizable in type.__bases__
def is_parallelizable(type: Type) -> bool:
"""
@@ -314,7 +316,6 @@ def is_parallelizable(type: Type) -> bool:
def is_parallelizable_type(type_: Type) -> bool:
return _get_origin(type_) == Parallelizable
-
class Collect(Iterable[CollectElement], Protocol[CollectElement]):
"""Marks a function node parameter as collectable.
diff --git a/hamilton/node.py b/hamilton/node.py
index bc16ca8b7..97002c1f6 100644
--- a/hamilton/node.py
+++ b/hamilton/node.py
@@ -6,7 +6,7 @@
import typing_inspect
-from hamilton.htypes import Collect, is_parallelizable
+from hamilton.htypes import Collect, Parallelizable, is_parallelizable
"""
Module that contains the primitive components of the graph.
From 12c838fcf6913ca42b6003ad68761f8ca5a8fe9d Mon Sep 17 00:00:00 2001
From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com>
Date: Thu, 16 Jan 2025 18:48:03 -0300
Subject: [PATCH 15/22] Specific ParallelizableList for annotation
---
hamilton/htypes.py | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/hamilton/htypes.py b/hamilton/htypes.py
index 9188ccc7c..7148c32d7 100644
--- a/hamilton/htypes.py
+++ b/hamilton/htypes.py
@@ -1,7 +1,7 @@
import inspect
import sys
import typing
-from typing import Any, Iterable, Optional, Protocol, Tuple, Type, TypeVar, Union
+from typing import Any, Iterable, Optional, Protocol, Tuple, Type, TypeVar, Union, Generic
import typing_inspect
@@ -297,6 +297,10 @@ class Parallelizable(Iterable[ParallelizableElement], Protocol[ParallelizableEle
pass
+class ParallelizableList(list[ParallelizableElement], Parallelizable, Generic[ParallelizableElement]):
+ pass
+
+
def is_parallelizable(type:Type) -> bool:
return type == Parallelizable or Parallelizable in type.__bases__
From b195758290aec321c1ebeee7f3c3c29b606e83c7 Mon Sep 17 00:00:00 2001
From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com>
Date: Thu, 16 Jan 2025 19:22:25 -0300
Subject: [PATCH 16/22] Test for ParallelizableList
---
tests/execution/test_executors.py | 12 +++++
.../dynamic_parallelism/parallel_list.py | 51 +++++++++++++++++++
2 files changed, 63 insertions(+)
create mode 100644 tests/resources/dynamic_parallelism/parallel_list.py
diff --git a/tests/execution/test_executors.py b/tests/execution/test_executors.py
index cced38d2d..a67558458 100644
--- a/tests/execution/test_executors.py
+++ b/tests/execution/test_executors.py
@@ -30,6 +30,7 @@
parallel_complex,
parallel_delayed,
parallel_linear_basic,
+ parallel_list,
)
ADAPTER = base.DefaultAdapter()
@@ -347,3 +348,14 @@ def test_parallel_end_to_end_with_empty_list():
parallel_collect_multiple_arguments._reset_counter()
res = dr.execute(["final"], overrides={"number_of_steps": 0})
assert res["final"] == parallel_linear_basic._calc(0)
+
+
+def test_parallel_list():
+ dr = (
+ driver.Builder()
+ .with_modules(parallel_list)
+ .enable_dynamic_execution(allow_experimental_mode=True)
+ .build()
+ )
+ result = dr.execute(["final"])
+ assert result["final"] == parallel_linear_basic._calc()
diff --git a/tests/resources/dynamic_parallelism/parallel_list.py b/tests/resources/dynamic_parallelism/parallel_list.py
new file mode 100644
index 000000000..e8b048485
--- /dev/null
+++ b/tests/resources/dynamic_parallelism/parallel_list.py
@@ -0,0 +1,51 @@
+from hamilton.htypes import Collect, ParallelizableList
+
+
+# input
+def number_of_steps() -> int:
+ return 6
+
+
+# expand
+def steps(number_of_steps: int) -> ParallelizableList[int]:
+ return list(range(number_of_steps))
+
+
+# process
+def step_squared(steps: int) -> int:
+ return steps**2
+
+
+# process
+def step_cubed(steps: int) -> int:
+ return steps**3
+
+
+def step_squared_plus_step_cubed(step_squared: int, step_cubed: int) -> int:
+ return step_squared + step_cubed
+
+
+# join
+def sum_step_squared_plus_step_cubed(step_squared_plus_step_cubed: Collect[int]) -> int:
+ out = 0
+ for step in step_squared_plus_step_cubed:
+ out += step
+ return out
+
+
+# final
+def final(sum_step_squared_plus_step_cubed: int) -> int:
+ return sum_step_squared_plus_step_cubed
+
+
+def _calc(number_of_steps: int = number_of_steps()) -> int:
+ steps_ = steps(number_of_steps)
+ to_sum = []
+ for step_ in steps_:
+ step_squared_ = step_squared(step_)
+ step_cubed_ = step_cubed(step_)
+ step_squared_plus_step_cubed_ = step_squared_plus_step_cubed(step_squared_, step_cubed_)
+ to_sum.append(step_squared_plus_step_cubed_)
+ sum_step_squared_plus_step_cubed_ = sum_step_squared_plus_step_cubed(to_sum)
+ final_ = final(sum_step_squared_plus_step_cubed_)
+ return final_
From 7939d7a51e994ecde9852fe391cec47e12452527 Mon Sep 17 00:00:00 2001
From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com>
Date: Thu, 16 Jan 2025 19:31:44 -0300
Subject: [PATCH 17/22] Documentation
Documentation of ParallelizableList and is_parallelizable
---
hamilton/htypes.py | 19 ++++++++++++++-----
1 file changed, 14 insertions(+), 5 deletions(-)
diff --git a/hamilton/htypes.py b/hamilton/htypes.py
index 7148c32d7..eba8ddca5 100644
--- a/hamilton/htypes.py
+++ b/hamilton/htypes.py
@@ -1,7 +1,8 @@
import inspect
import sys
import typing
-from typing import Any, Iterable, Optional, Protocol, Tuple, Type, TypeVar, Union, Generic
+from typing import (
+ Any, Generic, Iterable, Optional, Protocol, Tuple, Type, TypeVar, Union)
import typing_inspect
@@ -297,12 +298,19 @@ class Parallelizable(Iterable[ParallelizableElement], Protocol[ParallelizableEle
pass
-class ParallelizableList(list[ParallelizableElement], Parallelizable, Generic[ParallelizableElement]):
- pass
+class ParallelizableList(
+ list[ParallelizableElement], Parallelizable, Generic[ParallelizableElement]
+):
+ """
+ Marks the output of a function node as parallelizable and also as a list.
+
+ It has the same usage as "Parallelizable", but for returns that are specifically
+ lists, for correct functioning of linters and other tools.
+ """
+
+ pass
-def is_parallelizable(type:Type) -> bool:
- return type == Parallelizable or Parallelizable in type.__bases__
def is_parallelizable(type: Type) -> bool:
"""
@@ -320,6 +328,7 @@ def is_parallelizable(type: Type) -> bool:
def is_parallelizable_type(type_: Type) -> bool:
return _get_origin(type_) == Parallelizable
+
class Collect(Iterable[CollectElement], Protocol[CollectElement]):
"""Marks a function node parameter as collectable.
From f422559be36852cbea4e817e1ff7842d0ff5f4aa Mon Sep 17 00:00:00 2001
From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com>
Date: Thu, 16 Jan 2025 19:39:16 -0300
Subject: [PATCH 18/22] Pre-commit fixes on node.py
---
hamilton/node.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/hamilton/node.py b/hamilton/node.py
index 97002c1f6..bc16ca8b7 100644
--- a/hamilton/node.py
+++ b/hamilton/node.py
@@ -6,7 +6,7 @@
import typing_inspect
-from hamilton.htypes import Collect, Parallelizable, is_parallelizable
+from hamilton.htypes import Collect, is_parallelizable
"""
Module that contains the primitive components of the graph.
From d8f4a99c26597e8b77c595bb79cbcc70a9781742 Mon Sep 17 00:00:00 2001
From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com>
Date: Thu, 16 Jan 2025 20:01:36 -0300
Subject: [PATCH 19/22] Fix ParallelizableList on 3.8
Python 3.8 does not support list[], changed to typing.List[]
---
hamilton/htypes.py | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/hamilton/htypes.py b/hamilton/htypes.py
index eba8ddca5..8599d639d 100644
--- a/hamilton/htypes.py
+++ b/hamilton/htypes.py
@@ -2,7 +2,8 @@
import sys
import typing
from typing import (
- Any, Generic, Iterable, Optional, Protocol, Tuple, Type, TypeVar, Union)
+ Any, Generic, Iterable, List, Optional, Protocol, Tuple, Type, TypeVar,
+ Union)
import typing_inspect
@@ -300,7 +301,7 @@ class Parallelizable(Iterable[ParallelizableElement], Protocol[ParallelizableEle
class ParallelizableList(
- list[ParallelizableElement], Parallelizable, Generic[ParallelizableElement]
+ List[ParallelizableElement], Parallelizable, Generic[ParallelizableElement]
):
"""
Marks the output of a function node as parallelizable and also as a list.
From 783d0b9b836f51039a4c9a361e63717a8abd5f6f Mon Sep 17 00:00:00 2001
From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com>
Date: Mon, 27 Jan 2025 13:17:07 -0300
Subject: [PATCH 20/22] Revert "Test for ParallelizableList"
This reverts commit ec92486ceecdd1f73441450fbfa9da7e86443d6e.
---
tests/execution/test_executors.py | 12 -----
.../dynamic_parallelism/parallel_list.py | 51 -------------------
2 files changed, 63 deletions(-)
delete mode 100644 tests/resources/dynamic_parallelism/parallel_list.py
diff --git a/tests/execution/test_executors.py b/tests/execution/test_executors.py
index a67558458..cced38d2d 100644
--- a/tests/execution/test_executors.py
+++ b/tests/execution/test_executors.py
@@ -30,7 +30,6 @@
parallel_complex,
parallel_delayed,
parallel_linear_basic,
- parallel_list,
)
ADAPTER = base.DefaultAdapter()
@@ -348,14 +347,3 @@ def test_parallel_end_to_end_with_empty_list():
parallel_collect_multiple_arguments._reset_counter()
res = dr.execute(["final"], overrides={"number_of_steps": 0})
assert res["final"] == parallel_linear_basic._calc(0)
-
-
-def test_parallel_list():
- dr = (
- driver.Builder()
- .with_modules(parallel_list)
- .enable_dynamic_execution(allow_experimental_mode=True)
- .build()
- )
- result = dr.execute(["final"])
- assert result["final"] == parallel_linear_basic._calc()
diff --git a/tests/resources/dynamic_parallelism/parallel_list.py b/tests/resources/dynamic_parallelism/parallel_list.py
deleted file mode 100644
index e8b048485..000000000
--- a/tests/resources/dynamic_parallelism/parallel_list.py
+++ /dev/null
@@ -1,51 +0,0 @@
-from hamilton.htypes import Collect, ParallelizableList
-
-
-# input
-def number_of_steps() -> int:
- return 6
-
-
-# expand
-def steps(number_of_steps: int) -> ParallelizableList[int]:
- return list(range(number_of_steps))
-
-
-# process
-def step_squared(steps: int) -> int:
- return steps**2
-
-
-# process
-def step_cubed(steps: int) -> int:
- return steps**3
-
-
-def step_squared_plus_step_cubed(step_squared: int, step_cubed: int) -> int:
- return step_squared + step_cubed
-
-
-# join
-def sum_step_squared_plus_step_cubed(step_squared_plus_step_cubed: Collect[int]) -> int:
- out = 0
- for step in step_squared_plus_step_cubed:
- out += step
- return out
-
-
-# final
-def final(sum_step_squared_plus_step_cubed: int) -> int:
- return sum_step_squared_plus_step_cubed
-
-
-def _calc(number_of_steps: int = number_of_steps()) -> int:
- steps_ = steps(number_of_steps)
- to_sum = []
- for step_ in steps_:
- step_squared_ = step_squared(step_)
- step_cubed_ = step_cubed(step_)
- step_squared_plus_step_cubed_ = step_squared_plus_step_cubed(step_squared_, step_cubed_)
- to_sum.append(step_squared_plus_step_cubed_)
- sum_step_squared_plus_step_cubed_ = sum_step_squared_plus_step_cubed(to_sum)
- final_ = final(sum_step_squared_plus_step_cubed_)
- return final_
From 225e91b0bb1241f371c68150333b4f032fe03b0a Mon Sep 17 00:00:00 2001
From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com>
Date: Mon, 27 Jan 2025 13:19:21 -0300
Subject: [PATCH 21/22] Conflict from commit revert fix
---
hamilton/htypes.py | 13 -------------
1 file changed, 13 deletions(-)
diff --git a/hamilton/htypes.py b/hamilton/htypes.py
index 8599d639d..95b9f41f1 100644
--- a/hamilton/htypes.py
+++ b/hamilton/htypes.py
@@ -300,19 +300,6 @@ class Parallelizable(Iterable[ParallelizableElement], Protocol[ParallelizableEle
pass
-class ParallelizableList(
- List[ParallelizableElement], Parallelizable, Generic[ParallelizableElement]
-):
- """
- Marks the output of a function node as parallelizable and also as a list.
-
- It has the same usage as "Parallelizable", but for returns that are specifically
- lists, for correct functioning of linters and other tools.
- """
-
- pass
-
-
def is_parallelizable(type: Type) -> bool:
"""
Checks if a type is parallelizable.
From 7f3899dc3a2e3fb4992df1fc246a510e0629fef6 Mon Sep 17 00:00:00 2001
From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com>
Date: Mon, 27 Jan 2025 13:32:07 -0300
Subject: [PATCH 22/22] Added Parallelizable Subclass info to docs
---
hamilton/htypes.py | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/hamilton/htypes.py b/hamilton/htypes.py
index 95b9f41f1..623cdea18 100644
--- a/hamilton/htypes.py
+++ b/hamilton/htypes.py
@@ -2,8 +2,7 @@
import sys
import typing
from typing import (
- Any, Generic, Iterable, List, Optional, Protocol, Tuple, Type, TypeVar,
- Union)
+ Any, Iterable, Optional, Protocol, Tuple, Type, TypeVar, Union)
import typing_inspect