Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"revision": 2
"revision": 3
}
16 changes: 9 additions & 7 deletions sdks/python/apache_beam/yaml/readme_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,17 @@ def expand(self, pcoll):
lambda _: 1, sum, 'count')


class _Fakes:
fn = str
class SomeTransform(beam.PTransform):
def __init__(self, *args, **kwargs):
super().__init__()

class SomeTransform(beam.PTransform):
def __init__(*args, **kwargs):
pass
def expand(self, pcoll):
return pcoll

def expand(self, pcoll):
return pcoll

class _Fakes:
fn = str
SomeTransform = SomeTransform


RENDER_DIR = None
Expand Down
174 changes: 96 additions & 78 deletions sdks/python/apache_beam/yaml/yaml_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@
#

"""This module defines the basic MapToFields operation."""
import datetime
import itertools
import json
import re
import threading
Comment thread
derrickaw marked this conversation as resolved.
from collections import abc
from collections.abc import Callable
from collections.abc import Collection
from collections.abc import Iterable
from collections.abc import Mapping
from decimal import Decimal
from typing import Any
from typing import NamedTuple
from typing import Optional
Expand Down Expand Up @@ -53,13 +57,11 @@
from apache_beam.yaml.yaml_errors import maybe_with_exception_handling_transform_fn
from apache_beam.yaml.yaml_provider import dicts_to_rows

# Import js2py package if it exists
# Import quickjs package if it exists
try:
import js2py
from js2py.base import JsObjectWrapper
import quickjs
except ImportError:
js2py = None
JsObjectWrapper = object
quickjs = None

_str_expression_fields = {
'AssignTimestamps': 'timestamp',
Expand Down Expand Up @@ -178,18 +180,52 @@ def _check_mapping_arguments(
raise ValueError(f'{transform_name} cannot specify "name" without "path"')


# js2py's JsObjectWrapper object has a self-referencing __dict__ property
# that cannot be pickled without implementing the __getstate__ and
# __setstate__ methods.
class _CustomJsObjectWrapper(JsObjectWrapper):
def __init__(self, js_obj):
super().__init__(js_obj.__dict__['_obj'])
_THREAD_LOCAL_JS_CACHE = threading.local()

def __getstate__(self):
return self.__dict__.copy()

def __setstate__(self, state):
self.__dict__.update(state)
class _JsFunctionWrapper:
  """Callable that evaluates a JavaScript function through quickjs.

  The instance itself only holds the JavaScript source text and the name of
  the function to invoke. The quickjs context and the Python-side caller are
  built on first use and stored in ``_THREAD_LOCAL_JS_CACHE``, keyed by
  ``(source_code, entrypoint_name)``.
  """
  def __init__(self, source_code, entrypoint_name):
    self.entrypoint_name = entrypoint_name
    self.source_code = source_code

  def _get_fn(self):
    """Returns the cached caller for this source, building it if needed."""
    try:
      functions = _THREAD_LOCAL_JS_CACHE.functions
    except AttributeError:
      # First use of the cache on this thread.
      functions = _THREAD_LOCAL_JS_CACHE.functions = {}

    key = (self.source_code, self.entrypoint_name)
    if key in functions:
      return functions[key]

    js_context = quickjs.Context()
    js_context.eval(self.source_code)
    js_fn = js_context.get(self.entrypoint_name)

    # Values of these types are handed to the JS function unchanged; anything
    # else is round-tripped through JSON into the context.
    passthrough_types = (type(None), str, bool, float, int)

    def to_js(value):
      if isinstance(value, passthrough_types):
        return value
      return js_context.parse_json(json.dumps(value))

    def call_fn(*args, run_gc=True):
      try:
        # Argument conversion stays inside the try so that the finally
        # clause runs even when conversion itself fails.
        outcome = js_fn(*[to_js(a) for a in args])
        if isinstance(outcome, quickjs.Object):
          outcome = json.loads(outcome.json())
        return outcome
      finally:
        if run_gc:
          js_context.gc()

    functions[key] = call_fn
    return call_fn

  def __call__(self, row):
    """Applies the JavaScript function to ``row``.

    Raises:
      RuntimeError: if converting the row, running the function, or
        converting its result raises any exception.
    """
    js_fn = self._get_fn()
    try:
      js_input = py_value_to_js_dict(row)
      return dicts_to_rows(js_fn(js_input))
    except Exception as exn:
      raise RuntimeError(
          f"Error evaluating javascript expression: {exn}") from exn


# TODO(yaml) Improve type inferencing for JS UDF's
Expand All @@ -199,6 +235,15 @@ def py_value_to_js_dict(py_value):
py_value = py_value._asdict()
if isinstance(py_value, dict):
return {key: py_value_to_js_dict(value) for key, value in py_value.items()}
elif isinstance(py_value, bytes):
return py_value.decode('utf-8', errors='replace')
Comment thread
derrickaw marked this conversation as resolved.
elif isinstance(py_value, (datetime.datetime, datetime.date, datetime.time)):
return {'__date__': True, 'value': py_value.isoformat()}
elif isinstance(py_value, Decimal):
# Coerce Decimal to float since JavaScript's standard number type is a
# 64-bit float. Note that this can cause a loss of precision for
# high-precision decimal values.
return float(py_value)
Comment thread
derrickaw marked this conversation as resolved.
elif not isinstance(py_value, str) and isinstance(py_value, abc.Iterable):
return [py_value_to_js_dict(value) for value in list(py_value)]
else:
Expand All @@ -210,80 +255,53 @@ def py_value_to_js_dict(py_value):
def _expand_javascript_mapping_func(
original_fields, expression=None, callable=None, path=None, name=None):

# Check for installed js2py package
if js2py is None:
if quickjs is None:
raise ValueError(
"Javascript mapping functions are not supported on"
" Python 3.12 or later.")

# import remaining js2py objects
from js2py import base
from js2py.constructors import jsdate
from js2py.internals import simplex

js_array_type = (
base.PyJsArray,
base.PyJsArrayBuffer,
base.PyJsInt8Array,
base.PyJsUint8Array,
base.PyJsUint8ClampedArray,
base.PyJsInt16Array,
base.PyJsUint16Array,
base.PyJsInt32Array,
base.PyJsUint32Array,
base.PyJsFloat32Array,
base.PyJsFloat64Array)

def _js_object_to_py_object(obj):
if isinstance(obj, (base.PyJsNumber, base.PyJsString, base.PyJsBoolean)):
return base.to_python(obj)
elif isinstance(obj, js_array_type):
return [_js_object_to_py_object(value) for value in obj.to_list()]
elif isinstance(obj, jsdate.PyJsDate):
return obj.to_utc_dt()
elif isinstance(obj, (base.PyJsNull, base.PyJsUndefined)):
return None
elif isinstance(obj, base.PyJsError):
raise RuntimeError(obj['message'])
elif isinstance(obj, base.PyJsObject):
return {
key: _js_object_to_py_object(value['value'])
for (key, value) in obj.own.items()
}
elif isinstance(obj, base.JsObjectWrapper):
return _js_object_to_py_object(obj._obj)

return obj
"Javascript mapping functions are not supported because the "
"quickjs-ng library is not installed.")

if expression:
source = '\n'.join(['function(__row__) {'] + [
f' {name} = __row__.{name}'
for name in original_fields if name in expression
] + [' return (' + expression + ')'] + ['}'])
js_func = _CustomJsObjectWrapper(js2py.eval_js(source))
source_code = f"""
function udf(__row__) {{
with (__row__) {{
return ({expression});
}}
}}
"""
user_entrypoint = 'udf'

elif callable:
js_func = _CustomJsObjectWrapper(js2py.eval_js(callable))
source_code = f"var __udf__ = ({callable});"
user_entrypoint = '__udf__'

else:
if not path.endswith('.js'):
raise ValueError(f'File "{path}" is not a valid .js file.')
udf_code = FileSystems.open(path).read().decode()
js = js2py.EvalJs()
js.eval(udf_code)
js_func = _CustomJsObjectWrapper(getattr(js, name))

def js_wrapper(row):
row_as_dict = py_value_to_js_dict(row)
try:
js_result = js_func(row_as_dict)
except simplex.JsException as exn:
raise RuntimeError(
f"Error evaluating javascript expression: "
f"{exn.mes['message']}") from exn
return dicts_to_rows(_js_object_to_py_object(js_result))
source_code = udf_code
user_entrypoint = name

source_code += f"""
Comment thread
derrickaw marked this conversation as resolved.
function __convert_dates__(obj) {{
if (obj && typeof obj === 'object') {{
if (obj.__date__) {{
return new Date(obj.value);
}}
for (var key in obj) {{
if (obj.hasOwnProperty(key)) {{
obj[key] = __convert_dates__(obj[key]);
}}
}}
}}
return obj;
}}

function __wrapper__(row) {{
return {user_entrypoint}(__convert_dates__(row));
}}
"""

return js_wrapper
return _JsFunctionWrapper(source_code, '__wrapper__')


def _expand_python_mapping_func(
Expand Down
Loading
Loading