From d3689754aa2283e9022ffb0973d6f09f09d2145c Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Thu, 16 Jan 2025 18:45:54 -0300 Subject: [PATCH 01/22] Check for subclasses of Parallelizable --- hamilton/htypes.py | 5 +++-- hamilton/node.py | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/hamilton/htypes.py b/hamilton/htypes.py index c2ac13314..5a83a31ba 100644 --- a/hamilton/htypes.py +++ b/hamilton/htypes.py @@ -68,7 +68,7 @@ def custom_subclass_check(requested_type: Type, param_type: Type): has_generic = True # TODO -- consider moving into a graph adapter or elsewhere -- this is perhaps a little too # low-level - if has_generic and requested_origin_type in (Parallelizable,): + if has_generic and is_parallelizable(requested_origin_type): (requested_type_arg,) = _get_args(requested_type) return custom_subclass_check(requested_type_arg, param_type) if has_generic and param_origin_type == Collect: @@ -297,11 +297,12 @@ class Parallelizable(Iterable[ParallelizableElement], Protocol[ParallelizableEle pass +def is_parallelizable(type:Type) -> bool: + return type == Parallelizable or Parallelizable in type.__bases__ def is_parallelizable_type(type_: Type) -> bool: return _get_origin(type_) == Parallelizable - class Collect(Iterable[CollectElement], Protocol[CollectElement]): """Marks a function node parameter as collectable. diff --git a/hamilton/node.py b/hamilton/node.py index 5755b0ed7..97002c1f6 100644 --- a/hamilton/node.py +++ b/hamilton/node.py @@ -6,7 +6,7 @@ import typing_inspect -from hamilton.htypes import Collect, Parallelizable +from hamilton.htypes import Collect, Parallelizable, is_parallelizable """ Module that contains the primitive components of the graph. @@ -285,7 +285,7 @@ def from_fn(fn: Callable, name: str = None) -> "Node": node_source = NodeType.STANDARD # TODO - extract this into a function + clean up! if typing_inspect.is_generic_type(return_type): - if typing_inspect.get_origin(return_type) == Parallelizable: + if is_parallelizable(typing_inspect.get_origin(return_type)): node_source = NodeType.EXPAND for hint in typing.get_type_hints(fn, **type_hint_kwargs).values(): if typing_inspect.is_generic_type(hint): From 47d2cf3e84b3151551e95973a7650ec7710775a5 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Thu, 16 Jan 2025 18:48:03 -0300 Subject: [PATCH 02/22] Specific ParallelizableList for annotation --- hamilton/htypes.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/hamilton/htypes.py b/hamilton/htypes.py index 5a83a31ba..83194d9b7 100644 --- a/hamilton/htypes.py +++ b/hamilton/htypes.py @@ -1,7 +1,7 @@ import inspect import sys import typing -from typing import Any, Iterable, Optional, Protocol, Tuple, Type, TypeVar, Union +from typing import Any, Iterable, Optional, Protocol, Tuple, Type, TypeVar, Union, Generic import typing_inspect @@ -297,6 +297,10 @@ class Parallelizable(Iterable[ParallelizableElement], Protocol[ParallelizableEle pass +class ParallelizableList(list[ParallelizableElement], Parallelizable, Generic[ParallelizableElement]): + pass + + def is_parallelizable(type:Type) -> bool: return type == Parallelizable or Parallelizable in type.__bases__ From 55f061f66132e86966b6a15502e8d26fb51719e2 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Thu, 16 Jan 2025 19:22:25 -0300 Subject: [PATCH 03/22] Test for ParallelizableList --- tests/execution/test_executors.py | 12 +++++ .../dynamic_parallelism/parallel_list.py | 51 +++++++++++++++++++ 2 files changed, 63 insertions(+) create mode 100644 tests/resources/dynamic_parallelism/parallel_list.py diff --git a/tests/execution/test_executors.py b/tests/execution/test_executors.py index cced38d2d..a67558458 100644 --- a/tests/execution/test_executors.py +++ b/tests/execution/test_executors.py @@ -30,6 +30,7 @@ parallel_complex, parallel_delayed, parallel_linear_basic, + parallel_list, ) ADAPTER = base.DefaultAdapter() @@ -347,3 +348,14 @@ def test_parallel_end_to_end_with_empty_list(): parallel_collect_multiple_arguments._reset_counter() res = dr.execute(["final"], overrides={"number_of_steps": 0}) assert res["final"] == parallel_linear_basic._calc(0) + + +def test_parallel_list(): + dr = ( + driver.Builder() + .with_modules(parallel_list) + .enable_dynamic_execution(allow_experimental_mode=True) + .build() + ) + result = dr.execute(["final"]) + assert result["final"] == parallel_linear_basic._calc() diff --git a/tests/resources/dynamic_parallelism/parallel_list.py b/tests/resources/dynamic_parallelism/parallel_list.py new file mode 100644 index 000000000..e8b048485 --- /dev/null +++ b/tests/resources/dynamic_parallelism/parallel_list.py @@ -0,0 +1,51 @@ +from hamilton.htypes import Collect, ParallelizableList + + +# input +def number_of_steps() -> int: + return 6 + + +# expand +def steps(number_of_steps: int) -> ParallelizableList[int]: + return list(range(number_of_steps)) + + +# process +def step_squared(steps: int) -> int: + return steps**2 + + +# process +def step_cubed(steps: int) -> int: + return steps**3 + + +def step_squared_plus_step_cubed(step_squared: int, step_cubed: int) -> int: + return step_squared + step_cubed + + +# join +def sum_step_squared_plus_step_cubed(step_squared_plus_step_cubed: Collect[int]) -> int: + out = 0 + for step in step_squared_plus_step_cubed: + out += step + return out + + +# final +def final(sum_step_squared_plus_step_cubed: int) -> int: + return sum_step_squared_plus_step_cubed + + +def _calc(number_of_steps: int = number_of_steps()) -> int: + steps_ = steps(number_of_steps) + to_sum = [] + for step_ in steps_: + step_squared_ = step_squared(step_) + step_cubed_ = step_cubed(step_) + step_squared_plus_step_cubed_ = step_squared_plus_step_cubed(step_squared_, step_cubed_) + to_sum.append(step_squared_plus_step_cubed_) + sum_step_squared_plus_step_cubed_ = sum_step_squared_plus_step_cubed(to_sum) + final_ = final(sum_step_squared_plus_step_cubed_) + return final_ From e1f1267b9e7e4a81a66ddf84ea14aeb66745d9c8 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Thu, 16 Jan 2025 19:31:44 -0300 Subject: [PATCH 04/22] Documentation Documentation of ParallelizableList and is_parallelizable --- hamilton/htypes.py | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/hamilton/htypes.py b/hamilton/htypes.py index 83194d9b7..9bcc46270 100644 --- a/hamilton/htypes.py +++ b/hamilton/htypes.py @@ -1,7 +1,7 @@ import inspect import sys import typing -from typing import Any, Iterable, Optional, Protocol, Tuple, Type, TypeVar, Union, Generic +from typing import Any, Generic, Iterable, Optional, Protocol, Tuple, Type, TypeVar, Union import typing_inspect @@ -297,16 +297,34 @@ class Parallelizable(Iterable[ParallelizableElement], Protocol[ParallelizableEle pass -class ParallelizableList(list[ParallelizableElement], Parallelizable, Generic[ParallelizableElement]): + +class ParallelizableList( + list[ParallelizableElement], Parallelizable, Generic[ParallelizableElement] +): + """ + Marks the output of a function node as parallelizable and also as a list. + + It has the same usage as "Parallelizable", but for returns that are specifically + lists, for correct functioning of linters and other tools. + """ + pass -def is_parallelizable(type:Type) -> bool: +def is_parallelizable(type: Type) -> bool: + """ + Checks if a type is parallelizable. + + :param type: Type to check. + :return: True if the type is parallelizable, False otherwise. + """ return type == Parallelizable or Parallelizable in type.__bases__ + def is_parallelizable_type(type_: Type) -> bool: return _get_origin(type_) == Parallelizable + class Collect(Iterable[CollectElement], Protocol[CollectElement]): """Marks a function node parameter as collectable. From 252664de8dad3d971965b3987cd70f58bbac2c06 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Thu, 16 Jan 2025 19:39:16 -0300 Subject: [PATCH 05/22] Pre-commit fixes on node.py --- hamilton/node.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hamilton/node.py b/hamilton/node.py index 97002c1f6..bc16ca8b7 100644 --- a/hamilton/node.py +++ b/hamilton/node.py @@ -6,7 +6,7 @@ import typing_inspect -from hamilton.htypes import Collect, Parallelizable, is_parallelizable +from hamilton.htypes import Collect, is_parallelizable """ Module that contains the primitive components of the graph. From 0a7c0f7eb5ffbec77f2f65d1fe3bb5dbf23cc184 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Thu, 16 Jan 2025 20:00:53 -0300 Subject: [PATCH 06/22] Fix Parallelizable checking It was throwing exception when checking against None --- hamilton/htypes.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/hamilton/htypes.py b/hamilton/htypes.py index 9bcc46270..387f19610 100644 --- a/hamilton/htypes.py +++ b/hamilton/htypes.py @@ -318,6 +318,9 @@ def is_parallelizable(type: Type) -> bool: :param type: Type to check. :return: True if the type is parallelizable, False otherwise. """ + if type is None: + return False + return type == Parallelizable or Parallelizable in type.__bases__ From 31538947be4a8717e9b674ab5632a9f25b92d75e Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Thu, 16 Jan 2025 20:01:36 -0300 Subject: [PATCH 07/22] Fix ParallelizableList on 3.8 Python 3.8 does not support list[], changed to typing.List[] --- hamilton/htypes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hamilton/htypes.py b/hamilton/htypes.py index 387f19610..afc720d2a 100644 --- a/hamilton/htypes.py +++ b/hamilton/htypes.py @@ -1,7 +1,7 @@ import inspect import sys import typing -from typing import Any, Generic, Iterable, Optional, Protocol, Tuple, Type, TypeVar, Union +from typing import Any, Generic, Iterable, List, Optional, Protocol, Tuple, Type, TypeVar, Union import typing_inspect @@ -299,7 +299,7 @@ class Parallelizable(Iterable[ParallelizableElement], Protocol[ParallelizableEle class ParallelizableList( - list[ParallelizableElement], Parallelizable, Generic[ParallelizableElement] + List[ParallelizableElement], Parallelizable, Generic[ParallelizableElement] ): """ Marks the output of a function node as parallelizable and also as a list. From b6e4298d7e0307154838043f4ba222c2a4194aeb Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Mon, 27 Jan 2025 13:16:43 -0300 Subject: [PATCH 08/22] Fix validte_examples in Windows Windows use backslashes and the generated path for the URLs was been generated with mixed "/" and "\" --- examples/validate_examples.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/validate_examples.py b/examples/validate_examples.py index d85dcbbf7..e7eabfa0e 100644 --- a/examples/validate_examples.py +++ b/examples/validate_examples.py @@ -15,13 +15,13 @@ def _create_github_badge(path: pathlib.Path) -> str: - github_url = f"https://github.com/dagworks-inc/hamilton/blob/main/{path}" + github_url = f"https://github.com/dagworks-inc/hamilton/blob/main/{path.as_posix()}" github_badge = f"[![GitHub badge](https://img.shields.io/badge/github-view_source-2b3137?logo=github)]({github_url})" return github_badge def _create_colab_badge(path: pathlib.Path) -> str: - colab_url = f"https://colab.research.google.com/github/dagworks-inc/hamilton/blob/main/{path}" + colab_url = f"https://colab.research.google.com/github/dagworks-inc/hamilton/blob/main/{path.as_posix()}" colab_badge = ( f"[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)]({colab_url})" ) From 5a39b886333599c57e8d8a47cbb1ffb0b4360d5c Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Mon, 27 Jan 2025 13:16:54 -0300 Subject: [PATCH 09/22] Parallelizable subclass example --- .../parallelizable_subclass/README.md | 11 + .../parallelizable_subclass/functions.py | 15 + .../parallelizable_subclass/notebook.ipynb | 299 ++++++++++++++++++ .../parallelizable_list.py | 16 + 4 files changed, 341 insertions(+) create mode 100644 examples/parallelism/parallelizable_subclass/README.md create mode 100644 examples/parallelism/parallelizable_subclass/functions.py create mode 100644 examples/parallelism/parallelizable_subclass/notebook.ipynb create mode 100644 examples/parallelism/parallelizable_subclass/parallelizable_list.py diff --git a/examples/parallelism/parallelizable_subclass/README.md b/examples/parallelism/parallelizable_subclass/README.md new file mode 100644 index 000000000..69ffa9ae3 --- /dev/null +++ b/examples/parallelism/parallelizable_subclass/README.md @@ -0,0 +1,11 @@ +# Parallelizable Subclass + +## Overview + +When annotating a function with `Parallelizable`, it is not possible to specify in the annotation what the type returned by the function will actually be, and these are not identified by a linter or other tools as static type checking. Especially for functions that can be used with or without Hamilton, this can be a problem. + +To solve this problem, it is possible to create subclasses of the `Parallelizable` classes, as demonstrated in this example. + +## Running + +The `notebook.ipynb` exemplifies how to use a `Parallelizable` subclass. diff --git a/examples/parallelism/parallelizable_subclass/functions.py b/examples/parallelism/parallelizable_subclass/functions.py new file mode 100644 index 000000000..a3cae5992 --- /dev/null +++ b/examples/parallelism/parallelizable_subclass/functions.py @@ -0,0 +1,15 @@ +from parallelizable_list import ParallelizableList + +from hamilton.htypes import Collect + + +def hello_list() -> ParallelizableList[str]: + return ["h", "e", "l", "l", "o", " ", "l", "i", "s", "t"] + + +def uppercase(hello_list: str) -> str: + return hello_list.upper() + + +def hello_uppercase(uppercase: Collect[str]) -> str: + return "".join(uppercase) diff --git a/examples/parallelism/parallelizable_subclass/notebook.ipynb b/examples/parallelism/parallelizable_subclass/notebook.ipynb new file mode 100644 index 000000000..b070a38f6 --- /dev/null +++ b/examples/parallelism/parallelizable_subclass/notebook.ipynb @@ -0,0 +1,299 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "#Install Hamilton if not avaiable\n", + "\n", + "try:\n", + " import hamilton\n", + "except ModuleNotFoundError:\n", + " %pip install sf-hamilton" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Parallelism: Paralellizable Subclass [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/dagworks-inc/hamilton/blob/main/examples/parallelism/parallelizable_subclass/notebook.ipynb) [![GitHub badge](https://img.shields.io/badge/github-view_source-2b3137?logo=github)](https://github.com/dagworks-inc/hamilton/blob/main/examples/parallelism/parallelizable_subclass/notebook.ipynb)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "When annotating a function with `Parallelizable`, it is not possible to specify in the annotation what the type returned by the function will actually be, and these are not identified by a linter or other tools as static type checking. Especially for functions that can be used with or without Hamilton, this can be a problem.\n", + "\n", + "To solve this problem, it is possible to create subclasses of the `Parallelizable` classes, as demonstrated in this example." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We start by importing Hamilton and the created example functions:" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "from hamilton import driver\n", + "\n", + "import functions" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Creating a driver and displaing all the module functions, we can see the `hello_list` function, that returns a `ParallelizableList`. This is a example `Parallelizable` subclass created for annotate functions that returns `list`. Is important to note that all `Parallelizable` subclasses must return a `Iterable` subclass, as for example list.\n", + "\n", + "The `ParallelizableList` implementation can be found in the [\"parallelizable_list.py\" file](https://github.com/dagworks-inc/hamilton/blob/main/examples/parallelism/parallelizable_subclass/parallelizable_list.py)." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "cluster__legend\n", + "\n", + "Legend\n", + "\n", + "\n", + "\n", + "hello_uppercase\n", + "\n", + "\n", + "hello_uppercase\n", + "str\n", + "\n", + "\n", + "\n", + "uppercase\n", + "\n", + "uppercase\n", + "str\n", + "\n", + "\n", + "\n", + "uppercase->hello_uppercase\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "hello_list\n", + "\n", + "\n", + "hello_list\n", + "ParallelizableList\n", + "\n", + "\n", + "\n", + "hello_list->uppercase\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "function\n", + "\n", + "function\n", + "\n", + "\n", + "\n", + "expand\n", + "\n", + "\n", + "expand\n", + "\n", + "\n", + "\n", + "collect\n", + "\n", + "\n", + "collect\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "dr = (\n", + " driver.Builder()\n", + " .with_modules(functions)\n", + " .enable_dynamic_execution(allow_experimental_mode=True)\n", + " .build()\n", + " )\n", + "\n", + "dr.display_all_functions()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "In this simple example, the created flow generates a list with \"hello list\" letters, converts each letter to uppercase in parallel, and then joins the letters together:" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'hello_uppercase': 'HELLO LIST'}" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "dr.execute([\"hello_uppercase\"])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Focusing attention on the function that was annotated with ParallelizableList, running it manually we can see that it actually returns a list:" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['h', 'e', 'l', 'l', 'o', ' ', 'l', 'i', 's', 't']" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "functions.hello_list()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Checking the annotation, we can see the return annotation as \"ParallelizableList[str]\":" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'return': parallelizable_list.ParallelizableList[str]}" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "functions.hello_list.__annotations__" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "And, the key point of using subtypes of `Parallelizable`, it is considered a list instance:" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "issubclass(functions.hello_list.__annotations__[\"return\"], list)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This means that when using a linter or static type checking, it will correctly identify the return type as a list instance." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.8" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/examples/parallelism/parallelizable_subclass/parallelizable_list.py b/examples/parallelism/parallelizable_subclass/parallelizable_list.py new file mode 100644 index 000000000..21d9f9d0c --- /dev/null +++ b/examples/parallelism/parallelizable_subclass/parallelizable_list.py @@ -0,0 +1,16 @@ +from typing import Generic, List + +from hamilton.htypes import Parallelizable, ParallelizableElement + + +class ParallelizableList( + List[ParallelizableElement], Parallelizable, Generic[ParallelizableElement] +): + """ + Marks the output of a function node as parallelizable and also as a list. + + It has the same usage as "Parallelizable", but for returns that are specifically + lists, for correct functioning of linters and other tools. + """ + + pass From 97b9826edbd03ea135fda7a812e7e4c844c09b53 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Mon, 27 Jan 2025 13:17:07 -0300 Subject: [PATCH 10/22] Revert "Test for ParallelizableList" This reverts commit ec92486ceecdd1f73441450fbfa9da7e86443d6e. --- tests/execution/test_executors.py | 12 ----- .../dynamic_parallelism/parallel_list.py | 51 ------------------- 2 files changed, 63 deletions(-) delete mode 100644 tests/resources/dynamic_parallelism/parallel_list.py diff --git a/tests/execution/test_executors.py b/tests/execution/test_executors.py index a67558458..cced38d2d 100644 --- a/tests/execution/test_executors.py +++ b/tests/execution/test_executors.py @@ -30,7 +30,6 @@ parallel_complex, parallel_delayed, parallel_linear_basic, - parallel_list, ) ADAPTER = base.DefaultAdapter() @@ -348,14 +347,3 @@ def test_parallel_end_to_end_with_empty_list(): parallel_collect_multiple_arguments._reset_counter() res = dr.execute(["final"], overrides={"number_of_steps": 0}) assert res["final"] == parallel_linear_basic._calc(0) - - -def test_parallel_list(): - dr = ( - driver.Builder() - .with_modules(parallel_list) - .enable_dynamic_execution(allow_experimental_mode=True) - .build() - ) - result = dr.execute(["final"]) - assert result["final"] == parallel_linear_basic._calc() diff --git a/tests/resources/dynamic_parallelism/parallel_list.py b/tests/resources/dynamic_parallelism/parallel_list.py deleted file mode 100644 index e8b048485..000000000 --- a/tests/resources/dynamic_parallelism/parallel_list.py +++ /dev/null @@ -1,51 +0,0 @@ -from hamilton.htypes import Collect, ParallelizableList - - -# input -def number_of_steps() -> int: - return 6 - - -# expand -def steps(number_of_steps: int) -> ParallelizableList[int]: - return list(range(number_of_steps)) - - -# process -def step_squared(steps: int) -> int: - return steps**2 - - -# process -def step_cubed(steps: int) -> int: - return steps**3 - - -def step_squared_plus_step_cubed(step_squared: int, step_cubed: int) -> int: - return step_squared + step_cubed - - -# join -def sum_step_squared_plus_step_cubed(step_squared_plus_step_cubed: Collect[int]) -> int: - out = 0 - for step in step_squared_plus_step_cubed: - out += step - return out - - -# final -def final(sum_step_squared_plus_step_cubed: int) -> int: - return sum_step_squared_plus_step_cubed - - -def _calc(number_of_steps: int = number_of_steps()) -> int: - steps_ = steps(number_of_steps) - to_sum = [] - for step_ in steps_: - step_squared_ = step_squared(step_) - step_cubed_ = step_cubed(step_) - step_squared_plus_step_cubed_ = step_squared_plus_step_cubed(step_squared_, step_cubed_) - to_sum.append(step_squared_plus_step_cubed_) - sum_step_squared_plus_step_cubed_ = sum_step_squared_plus_step_cubed(to_sum) - final_ = final(sum_step_squared_plus_step_cubed_) - return final_ From ca091f4405ba30646c2843ca7accbd0808865bfc Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Mon, 27 Jan 2025 13:19:21 -0300 Subject: [PATCH 11/22] Conflict from commit revert fix --- hamilton/htypes.py | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/hamilton/htypes.py b/hamilton/htypes.py index afc720d2a..e6d52cc9f 100644 --- a/hamilton/htypes.py +++ b/hamilton/htypes.py @@ -1,7 +1,7 @@ import inspect import sys import typing -from typing import Any, Generic, Iterable, List, Optional, Protocol, Tuple, Type, TypeVar, Union +from typing import Any, Iterable, Optional, Protocol, Tuple, Type, TypeVar, Union import typing_inspect @@ -298,19 +298,6 @@ class Parallelizable(Iterable[ParallelizableElement], Protocol[ParallelizableEle pass -class ParallelizableList( - List[ParallelizableElement], Parallelizable, Generic[ParallelizableElement] -): - """ - Marks the output of a function node as parallelizable and also as a list. - - It has the same usage as "Parallelizable", but for returns that are specifically - lists, for correct functioning of linters and other tools. - """ - - pass - - def is_parallelizable(type: Type) -> bool: """ Checks if a type is parallelizable. From 6b65aa25d5e9dc7394cb19d233211dcb11c0aad6 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Mon, 27 Jan 2025 13:32:07 -0300 Subject: [PATCH 12/22] Added Parallelizable Subclass info to docs --- docs/concepts/parallel-task.rst | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/concepts/parallel-task.rst b/docs/concepts/parallel-task.rst index b3898b0e9..78deb613c 100644 --- a/docs/concepts/parallel-task.rst +++ b/docs/concepts/parallel-task.rst @@ -266,3 +266,10 @@ to set what should be determined to be collected at DAG construction time: # Then in the driver building pass in the configuration: .with_config(_config) + +Parallelizable Subclassing +============= + +When annotating a function with `Parallelizable`, it is not possible to specify in the annotation what the type returned by the function will actually be, and these are not identified by a linter or other tools as static type checking. Especially for functions that can be used with or without Hamilton, this can be a problem. + +To solve this problem, it is possible to create subclasses of the `Parallelizable` classes. The ["Parallelizable Subclass" example](https://github.com/dagworks-inc/hamilton/blob/main/examples/parallelism/parallelizable_subclass) showcases how to do that. From f915e1becfd29a9e51c7920ac082a7b8093139c7 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Mon, 27 Jan 2025 13:41:56 -0300 Subject: [PATCH 13/22] Fix documentation title --- docs/concepts/parallel-task.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/concepts/parallel-task.rst b/docs/concepts/parallel-task.rst index 78deb613c..a85cff6c2 100644 --- a/docs/concepts/parallel-task.rst +++ b/docs/concepts/parallel-task.rst @@ -268,7 +268,7 @@ to set what should be determined to be collected at DAG construction time: .with_config(_config) Parallelizable Subclassing -============= +========================== When annotating a function with `Parallelizable`, it is not possible to specify in the annotation what the type returned by the function will actually be, and these are not identified by a linter or other tools as static type checking. Especially for functions that can be used with or without Hamilton, this can be a problem. From 9168c912609abc5f51f6bc0005def947e59e32d0 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Thu, 16 Jan 2025 18:45:54 -0300 Subject: [PATCH 14/22] Check for subclasses of Parallelizable --- hamilton/htypes.py | 3 ++- hamilton/node.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/hamilton/htypes.py b/hamilton/htypes.py index e6d52cc9f..9188ccc7c 100644 --- a/hamilton/htypes.py +++ b/hamilton/htypes.py @@ -297,6 +297,8 @@ class Parallelizable(Iterable[ParallelizableElement], Protocol[ParallelizableEle pass +def is_parallelizable(type:Type) -> bool: + return type == Parallelizable or Parallelizable in type.__bases__ def is_parallelizable(type: Type) -> bool: """ @@ -314,7 +316,6 @@ def is_parallelizable(type: Type) -> bool: def is_parallelizable_type(type_: Type) -> bool: return _get_origin(type_) == Parallelizable - class Collect(Iterable[CollectElement], Protocol[CollectElement]): """Marks a function node parameter as collectable. diff --git a/hamilton/node.py b/hamilton/node.py index bc16ca8b7..97002c1f6 100644 --- a/hamilton/node.py +++ b/hamilton/node.py @@ -6,7 +6,7 @@ import typing_inspect -from hamilton.htypes import Collect, is_parallelizable +from hamilton.htypes import Collect, Parallelizable, is_parallelizable """ Module that contains the primitive components of the graph. From 12c838fcf6913ca42b6003ad68761f8ca5a8fe9d Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Thu, 16 Jan 2025 18:48:03 -0300 Subject: [PATCH 15/22] Specific ParallelizableList for annotation --- hamilton/htypes.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/hamilton/htypes.py b/hamilton/htypes.py index 9188ccc7c..7148c32d7 100644 --- a/hamilton/htypes.py +++ b/hamilton/htypes.py @@ -1,7 +1,7 @@ import inspect import sys import typing -from typing import Any, Iterable, Optional, Protocol, Tuple, Type, TypeVar, Union +from typing import Any, Iterable, Optional, Protocol, Tuple, Type, TypeVar, Union, Generic import typing_inspect @@ -297,6 +297,10 @@ class Parallelizable(Iterable[ParallelizableElement], Protocol[ParallelizableEle pass +class ParallelizableList(list[ParallelizableElement], Parallelizable, Generic[ParallelizableElement]): + pass + + def is_parallelizable(type:Type) -> bool: return type == Parallelizable or Parallelizable in type.__bases__ From b195758290aec321c1ebeee7f3c3c29b606e83c7 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Thu, 16 Jan 2025 19:22:25 -0300 Subject: [PATCH 16/22] Test for ParallelizableList --- tests/execution/test_executors.py | 12 +++++ .../dynamic_parallelism/parallel_list.py | 51 +++++++++++++++++++ 2 files changed, 63 insertions(+) create mode 100644 tests/resources/dynamic_parallelism/parallel_list.py diff --git a/tests/execution/test_executors.py b/tests/execution/test_executors.py index cced38d2d..a67558458 100644 --- a/tests/execution/test_executors.py +++ b/tests/execution/test_executors.py @@ -30,6 +30,7 @@ parallel_complex, parallel_delayed, parallel_linear_basic, + parallel_list, ) ADAPTER = base.DefaultAdapter() @@ -347,3 +348,14 @@ def test_parallel_end_to_end_with_empty_list(): parallel_collect_multiple_arguments._reset_counter() res = dr.execute(["final"], overrides={"number_of_steps": 0}) assert res["final"] == parallel_linear_basic._calc(0) + + +def test_parallel_list(): + dr = ( + driver.Builder() + .with_modules(parallel_list) + .enable_dynamic_execution(allow_experimental_mode=True) + .build() + ) + result = dr.execute(["final"]) + assert result["final"] == parallel_linear_basic._calc() diff --git a/tests/resources/dynamic_parallelism/parallel_list.py b/tests/resources/dynamic_parallelism/parallel_list.py new file mode 100644 index 000000000..e8b048485 --- /dev/null +++ b/tests/resources/dynamic_parallelism/parallel_list.py @@ -0,0 +1,51 @@ +from hamilton.htypes import Collect, ParallelizableList + + +# input +def number_of_steps() -> int: + return 6 + + +# expand +def steps(number_of_steps: int) -> ParallelizableList[int]: + return list(range(number_of_steps)) + + +# process +def step_squared(steps: int) -> int: + return steps**2 + + +# process +def step_cubed(steps: int) -> int: + return steps**3 + + +def step_squared_plus_step_cubed(step_squared: int, step_cubed: int) -> int: + return step_squared + step_cubed + + +# join +def sum_step_squared_plus_step_cubed(step_squared_plus_step_cubed: Collect[int]) -> int: + out = 0 + for step in step_squared_plus_step_cubed: + out += step + return out + + +# final +def final(sum_step_squared_plus_step_cubed: int) -> int: + return sum_step_squared_plus_step_cubed + + +def _calc(number_of_steps: int = number_of_steps()) -> int: + steps_ = steps(number_of_steps) + to_sum = [] + for step_ in steps_: + step_squared_ = step_squared(step_) + step_cubed_ = step_cubed(step_) + step_squared_plus_step_cubed_ = step_squared_plus_step_cubed(step_squared_, step_cubed_) + to_sum.append(step_squared_plus_step_cubed_) + sum_step_squared_plus_step_cubed_ = sum_step_squared_plus_step_cubed(to_sum) + final_ = final(sum_step_squared_plus_step_cubed_) + return final_ From 7939d7a51e994ecde9852fe391cec47e12452527 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Thu, 16 Jan 2025 19:31:44 -0300 Subject: [PATCH 17/22] Documentation Documentation of ParallelizableList and is_parallelizable --- hamilton/htypes.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/hamilton/htypes.py b/hamilton/htypes.py index 7148c32d7..eba8ddca5 100644 --- a/hamilton/htypes.py +++ b/hamilton/htypes.py @@ -1,7 +1,8 @@ import inspect import sys import typing -from typing import Any, Iterable, Optional, Protocol, Tuple, Type, TypeVar, Union, Generic +from typing import ( + Any, Generic, Iterable, Optional, Protocol, Tuple, Type, TypeVar, Union) import typing_inspect @@ -297,12 +298,19 @@ class Parallelizable(Iterable[ParallelizableElement], Protocol[ParallelizableEle pass -class ParallelizableList(list[ParallelizableElement], Parallelizable, Generic[ParallelizableElement]): - pass +class ParallelizableList( + list[ParallelizableElement], Parallelizable, Generic[ParallelizableElement] +): + """ + Marks the output of a function node as parallelizable and also as a list. + + It has the same usage as "Parallelizable", but for returns that are specifically + lists, for correct functioning of linters and other tools. + """ + + pass -def is_parallelizable(type:Type) -> bool: - return type == Parallelizable or Parallelizable in type.__bases__ def is_parallelizable(type: Type) -> bool: """ @@ -320,6 +328,7 @@ def is_parallelizable(type: Type) -> bool: def is_parallelizable_type(type_: Type) -> bool: return _get_origin(type_) == Parallelizable + class Collect(Iterable[CollectElement], Protocol[CollectElement]): """Marks a function node parameter as collectable. From f422559be36852cbea4e817e1ff7842d0ff5f4aa Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Thu, 16 Jan 2025 19:39:16 -0300 Subject: [PATCH 18/22] Pre-commit fixes on node.py --- hamilton/node.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hamilton/node.py b/hamilton/node.py index 97002c1f6..bc16ca8b7 100644 --- a/hamilton/node.py +++ b/hamilton/node.py @@ -6,7 +6,7 @@ import typing_inspect -from hamilton.htypes import Collect, Parallelizable, is_parallelizable +from hamilton.htypes import Collect, is_parallelizable """ Module that contains the primitive components of the graph. From d8f4a99c26597e8b77c595bb79cbcc70a9781742 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Thu, 16 Jan 2025 20:01:36 -0300 Subject: [PATCH 19/22] Fix ParallelizableList on 3.8 Python 3.8 does not support list[], changed to typing.List[] --- hamilton/htypes.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/hamilton/htypes.py b/hamilton/htypes.py index eba8ddca5..8599d639d 100644 --- a/hamilton/htypes.py +++ b/hamilton/htypes.py @@ -2,7 +2,8 @@ import sys import typing from typing import ( - Any, Generic, Iterable, Optional, Protocol, Tuple, Type, TypeVar, Union) + Any, Generic, Iterable, List, Optional, Protocol, Tuple, Type, TypeVar, + Union) import typing_inspect @@ -300,7 +301,7 @@ class Parallelizable(Iterable[ParallelizableElement], Protocol[ParallelizableEle class ParallelizableList( - list[ParallelizableElement], Parallelizable, Generic[ParallelizableElement] + List[ParallelizableElement], Parallelizable, Generic[ParallelizableElement] ): """ Marks the output of a function node as parallelizable and also as a list. From 783d0b9b836f51039a4c9a361e63717a8abd5f6f Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Mon, 27 Jan 2025 13:17:07 -0300 Subject: [PATCH 20/22] Revert "Test for ParallelizableList" This reverts commit ec92486ceecdd1f73441450fbfa9da7e86443d6e. --- tests/execution/test_executors.py | 12 ----- .../dynamic_parallelism/parallel_list.py | 51 ------------------- 2 files changed, 63 deletions(-) delete mode 100644 tests/resources/dynamic_parallelism/parallel_list.py diff --git a/tests/execution/test_executors.py b/tests/execution/test_executors.py index a67558458..cced38d2d 100644 --- a/tests/execution/test_executors.py +++ b/tests/execution/test_executors.py @@ -30,7 +30,6 @@ parallel_complex, parallel_delayed, parallel_linear_basic, - parallel_list, ) ADAPTER = base.DefaultAdapter() @@ -348,14 +347,3 @@ def test_parallel_end_to_end_with_empty_list(): parallel_collect_multiple_arguments._reset_counter() res = dr.execute(["final"], overrides={"number_of_steps": 0}) assert res["final"] == parallel_linear_basic._calc(0) - - -def test_parallel_list(): - dr = ( - driver.Builder() - .with_modules(parallel_list) - .enable_dynamic_execution(allow_experimental_mode=True) - .build() - ) - result = dr.execute(["final"]) - assert result["final"] == parallel_linear_basic._calc() diff --git a/tests/resources/dynamic_parallelism/parallel_list.py b/tests/resources/dynamic_parallelism/parallel_list.py deleted file mode 100644 index e8b048485..000000000 --- a/tests/resources/dynamic_parallelism/parallel_list.py +++ /dev/null @@ -1,51 +0,0 @@ -from hamilton.htypes import Collect, ParallelizableList - - -# input -def number_of_steps() -> int: - return 6 - - -# expand -def steps(number_of_steps: int) -> ParallelizableList[int]: - return list(range(number_of_steps)) - - -# process -def step_squared(steps: int) -> int: - return steps**2 - - -# process -def step_cubed(steps: int) -> int: - return steps**3 - - -def step_squared_plus_step_cubed(step_squared: int, step_cubed: int) -> int: - return step_squared + step_cubed - - -# join -def sum_step_squared_plus_step_cubed(step_squared_plus_step_cubed: Collect[int]) -> int: - out = 0 - for step in step_squared_plus_step_cubed: - out += step - return out - - -# final -def final(sum_step_squared_plus_step_cubed: int) -> int: - return sum_step_squared_plus_step_cubed - - -def _calc(number_of_steps: int = number_of_steps()) -> int: - steps_ = steps(number_of_steps) - to_sum = [] - for step_ in steps_: - step_squared_ = step_squared(step_) - step_cubed_ = step_cubed(step_) - step_squared_plus_step_cubed_ = step_squared_plus_step_cubed(step_squared_, step_cubed_) - to_sum.append(step_squared_plus_step_cubed_) - sum_step_squared_plus_step_cubed_ = sum_step_squared_plus_step_cubed(to_sum) - final_ = final(sum_step_squared_plus_step_cubed_) - return final_ From 225e91b0bb1241f371c68150333b4f032fe03b0a Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Mon, 27 Jan 2025 13:19:21 -0300 Subject: [PATCH 21/22] Conflict from commit revert fix --- hamilton/htypes.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/hamilton/htypes.py b/hamilton/htypes.py index 8599d639d..95b9f41f1 100644 --- a/hamilton/htypes.py +++ b/hamilton/htypes.py @@ -300,19 +300,6 @@ class Parallelizable(Iterable[ParallelizableElement], Protocol[ParallelizableEle pass -class ParallelizableList( - List[ParallelizableElement], Parallelizable, Generic[ParallelizableElement] -): - """ - Marks the output of a function node as parallelizable and also as a list. - - It has the same usage as "Parallelizable", but for returns that are specifically - lists, for correct functioning of linters and other tools. - """ - - pass - - def is_parallelizable(type: Type) -> bool: """ Checks if a type is parallelizable. From 7f3899dc3a2e3fb4992df1fc246a510e0629fef6 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Mon, 27 Jan 2025 13:32:07 -0300 Subject: [PATCH 22/22] Added Parallelizable Subclass info to docs --- hamilton/htypes.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hamilton/htypes.py b/hamilton/htypes.py index 95b9f41f1..623cdea18 100644 --- a/hamilton/htypes.py +++ b/hamilton/htypes.py @@ -2,8 +2,7 @@ import sys import typing from typing import ( - Any, Generic, Iterable, List, Optional, Protocol, Tuple, Type, TypeVar, - Union) + Any, Iterable, Optional, Protocol, Tuple, Type, TypeVar, Union) import typing_inspect