diff --git a/docs/reference/decorators/with_columns.rst b/docs/reference/decorators/with_columns.rst
index 1cc03d892..aba944a21 100644
--- a/docs/reference/decorators/with_columns.rst
+++ b/docs/reference/decorators/with_columns.rst
@@ -2,13 +2,18 @@
 with_columns
 =======================
 
-** Overview **
+
+--------
+Overview
+--------
 
 This is part of the hamilton pyspark integration. To install, run:
 
-`pip install sf-hamilton[pyspark]`
+``pip install sf-hamilton[pyspark]``
 
-**Reference Documentation**
+-----------------------
+Reference Documentation
+-----------------------
 
 .. autoclass:: hamilton.plugins.h_spark.with_columns
    :special-members: __init__
diff --git a/hamilton/plugins/h_spark.py b/hamilton/plugins/h_spark.py
index 56febb872..2f159f7a1 100644
--- a/hamilton/plugins/h_spark.py
+++ b/hamilton/plugins/h_spark.py
@@ -41,7 +41,7 @@ class SparkKoalasGraphAdapter(base.HamiltonGraphAdapter, base.ResultMixin):
     using the \
     `Pandas API on Spark <https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/index.html>`__
 
-    Use `pip install sf-hamilton[spark]` to get the dependencies required to run this.
+    Use ``pip install sf-hamilton[spark]`` to get the dependencies required to run this.
 
     Currently, this class assumes you're running SPARK 3.2+. You'd generally use this if you have an existing spark \
     cluster running in your workplace, and you want to scale to very large data set sizes.
@@ -371,6 +371,8 @@ def _fabricate_spark_function(
     return FunctionType(func_code, {**globals(), **{"partial_fn": partial_fn}}, func_name)
 
 
+# TODO -- change this to have a different implementation based on the dataframe type. This will have
+# to likely be custom to each dataframe type
 def _lambda_udf(df: DataFrame, node_: node.Node, actual_kwargs: Dict[str, Any]) -> DataFrame:
     """Function to create a lambda UDF for a function.
 
@@ -757,7 +759,7 @@ def transform_node(
 
         Note that, at this point, we don't actually know which columns will come from the base
         dataframe, and which will come from the upstream nodes. This is handled in the
-        `with_columns` decorator, so for now, we need to give it enough information to topologically
+        ``with_columns`` decorator, so for now, we need to give it enough information to topologically
         sort/assign dependencies.
 
         :param node_: Node to transform
@@ -948,10 +950,10 @@ def __init__(
         """Initializes a with_columns decorator for spark. This allows you to efficiently run
        groups of map operations on a dataframe, represented as pandas/primitives UDFs. This
         effectively "linearizes" compute -- meaning that a DAG of map operations can be run
-        as a set of `.withColumn` operations on a single dataframe -- ensuring that you don't have
-        to do a complex `extract` then `join` process on spark, which can be inefficient.
+        as a set of ``.withColumn`` operations on a single dataframe -- ensuring that you don't have
+        to do a complex ``extract`` then ``join`` process on spark, which can be inefficient.
 
-        Here's an example of calling it -- if you've seen `@subdag`, you should be familiar with
+        Here's an example of calling it -- if you've seen :py:class:`@subdag <hamilton.function_modifiers.subdag>`, you should be familiar with
         the concepts:
 
         .. code-block:: python
@@ -1080,12 +1082,16 @@ def create_selector_node(
         """
 
         def new_callable(**kwargs) -> DataFrame:
+            # TODO -- change to have a `select` that's generic to the library
+            # Use the registry
             return kwargs[upstream_name].select(*columns)
 
         return node.Node(
             name=node_name,
+            # TODO -- change to have the right dataframe type (from the registry)
             typ=DataFrame,
             callabl=new_callable,
+            # TODO -- change to have the right dataframe type (from the registry)
             input_types={upstream_name: DataFrame},
         )
 
@@ -1107,8 +1113,10 @@ def new_callable(**kwargs) -> DataFrame:
 
         return node.Node(
             name=node_name,
+            # TODO -- change to have the right dataframe type (from the registry)
             typ=DataFrame,
             callabl=new_callable,
+            # TODO -- change to have the right dataframe type (from the registry)
             input_types={upstream_name: DataFrame},
         )
 
@@ -1195,7 +1203,9 @@ def generate_nodes(self, fn: Callable, config: Dict[str, Any]) -> List[node.Node
                 column for column in node_.input_types if column in columns_passed_in_from_dataframe
             }
             # In the case that we are using pyspark UDFs
+            # TODO -- use the right dataframe type to do this correctly
             if require_columns.is_decorated_pyspark_udf(node_):
+                # TODO -- change to use the right "sparkification" function that is dataframe-type-agnostic
                 sparkified = require_columns.sparkify_node(
                     node_,
                     current_dataframe_node,
@@ -1206,6 +1216,7 @@ def generate_nodes(self, fn: Callable, config: Dict[str, Any]) -> List[node.Node
             )
             # otherwise we're using pandas/primitive UDFs
             else:
+                # TODO -- change to use the right "sparkification" function that is dataframe-type-agnostic
                 sparkified = sparkify_node_with_udf(
                     node_,
                     current_dataframe_node,
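
For reviewers who haven't used the decorator this diff documents, here is a minimal usage sketch of what the `with_columns` docstring describes. It is illustrative only: the function names (`a_times_two`, `a_plus_b`, `final_df`) are made up, and the `columns_to_pass`/`select` parameters are assumptions based on the docstring above, not lines in this diff.

```python
# NOTE: hypothetical example -- names and parameters are assumptions, not part of this PR.
import pandas as pd
import pyspark.sql as ps

from hamilton.plugins.h_spark import with_columns


# Map operations written as pandas UDFs over columns of the dataframe.
def a_times_two(a: pd.Series) -> pd.Series:
    return a * 2


def a_plus_b(a: pd.Series, b: pd.Series) -> pd.Series:
    return a + b


@with_columns(
    a_times_two,
    a_plus_b,
    columns_to_pass=["a", "b"],  # columns read from the base dataframe (assumed parameter)
    select=["a_times_two", "a_plus_b"],  # outputs to append (assumed parameter)
)
def final_df(initial_df: ps.DataFrame) -> ps.DataFrame:
    return initial_df
```

Per the docstring, the point of the design is that both map operations compile down to a linear chain of `.withColumn` calls on `initial_df`, rather than an extract-then-join plan. The TODOs added in this diff mark where the pyspark-specific pieces (the `DataFrame` type, `.select`, and the "sparkification" functions) would need to be looked up from a registry to make this dataframe-type-agnostic.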