Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 58 additions & 22 deletions hamilton/function_modifiers/expanders.py
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,36 @@ def _validate_extract_fields(fields: dict):
)


async def dict_generator_async(
*args,
fn,
fill_with,
fields,
**kwargs,
):
dict_generated = await fn(*args, **kwargs)
if fill_with is not None:
for field in fields:
if field not in dict_generated:
dict_generated[field] = fill_with
return dict_generated


async def dict_generator(
*args,
fn,
fill_with,
fields,
**kwargs,
):
dict_generated = fn(*args, **kwargs)
if fill_with is not None:
for field in fields:
if field not in dict_generated:
dict_generated[field] = fill_with
return dict_generated


class extract_fields(base.SingleNodeNodeTransformer):
"""Extracts fields from a dictionary of output."""

Expand Down Expand Up @@ -804,29 +834,35 @@ def transform_node(
"""
fn = node_.callable
base_doc = node_.documentation

dict_generator_fn = (
functools.partial(dict_generator, fn=fn, fill_with=self.fill_with, fields=self.fields)
if not (inspect.iscoroutinefunction(fn))
else functools.partial(
dict_generator_async, fn=fn, fill_with=self.fill_with, fields=self.fields
)
)
# if fn is async
if inspect.iscoroutinefunction(fn):

async def dict_generator(*args, **kwargs):
dict_generated = await fn(*args, **kwargs)
if self.fill_with is not None:
for field in self.fields:
if field not in dict_generated:
dict_generated[field] = self.fill_with
return dict_generated

else:

def dict_generator(*args, **kwargs):
dict_generated = fn(*args, **kwargs)
if self.fill_with is not None:
for field in self.fields:
if field not in dict_generated:
dict_generated[field] = self.fill_with
return dict_generated

output_nodes = [node_.copy_with(callabl=dict_generator)]
# if inspect.iscoroutinefunction(fn):
#
# async def dict_generator(*args, **kwargs):
# dict_generated = await fn(*args, **kwargs)
# if self.fill_with is not None:
# for field in self.fields:
# if field not in dict_generated:
# dict_generated[field] = self.fill_with
# return dict_generated
#
# else:
#
# def dict_generator(*args, **kwargs):
# dict_generated = fn(*args, **kwargs)
# if self.fill_with is not None:
# for field in self.fields:
# if field not in dict_generated:
# dict_generated[field] = self.fill_with
# return dict_generated

output_nodes = [node_.copy_with(callabl=dict_generator_fn)]

for field, field_type in self.fields.items():
doc_string = base_doc # default doc string of base function.
Expand Down