From a7cf5c75e5497dfb3e7768939ae054c8942a1dba Mon Sep 17 00:00:00 2001 From: Jonathan George Date: Fri, 10 Jan 2025 12:14:35 +0000 Subject: [PATCH 1/6] Add OpenTelemetry API package and tracing helpers. --- poetry.lock | 108 +++++++++++++++++++++++- pyproject.toml | 1 + src/corvus_python/monitoring/tracing.py | 76 +++++++++++++++++ 3 files changed, 184 insertions(+), 1 deletion(-) create mode 100644 src/corvus_python/monitoring/tracing.py diff --git a/poetry.lock b/poetry.lock index d98f66b..414a85b 100644 --- a/poetry.lock +++ b/poetry.lock @@ -699,6 +699,23 @@ files = [ importlib-metadata = ">=1.0.0" pyspark = ">=3.3.0,<3.4.0" +[[package]] +name = "deprecated" +version = "1.2.15" +description = "Python @deprecated decorator to deprecate old python classes, functions or methods." +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,>=2.7" +files = [ + {file = "Deprecated-1.2.15-py2.py3-none-any.whl", hash = "sha256:353bc4a8ac4bfc96800ddab349d89c25dec1079f65fd53acdcc1e0b975b21320"}, + {file = "deprecated-1.2.15.tar.gz", hash = "sha256:683e561a90de76239796e6b6feac66b99030d2dd3fcf61ef996330f14bbb9b0d"}, +] + +[package.dependencies] +wrapt = ">=1.10,<2" + +[package.extras] +dev = ["PyTest", "PyTest-Cov", "bump2version (<1)", "jinja2 (>=3.0.3,<3.1.0)", "setuptools", "sphinx (<2)", "tox"] + [[package]] name = "docxcompose" version = "1.4.0" @@ -1897,6 +1914,21 @@ files = [ {file = "numpy-2.1.3.tar.gz", hash = "sha256:aa08e04e08aaf974d4458def539dece0d28146d866a39da5639596f4921fd761"}, ] +[[package]] +name = "opentelemetry-api" +version = "1.29.0" +description = "OpenTelemetry Python API" +optional = false +python-versions = ">=3.8" +files = [ + {file = "opentelemetry_api-1.29.0-py3-none-any.whl", hash = "sha256:5fcd94c4141cc49c736271f3e1efb777bebe9cc535759c54c936cca4f1b312b8"}, + {file = "opentelemetry_api-1.29.0.tar.gz", hash = "sha256:d04a6cf78aad09614f52964ecb38021e248f5714dc32c2e0d8fd99517b4d69cf"}, +] + +[package.dependencies] +deprecated = 
">=1.2.6" +importlib-metadata = ">=6.0,<=8.5.0" + [[package]] name = "overrides" version = "7.7.0" @@ -3113,6 +3145,80 @@ files = [ {file = "widgetsnbextension-4.0.13.tar.gz", hash = "sha256:ffcb67bc9febd10234a362795f643927f4e0c05d9342c727b65d2384f8feacb6"}, ] +[[package]] +name = "wrapt" +version = "1.17.0" +description = "Module for decorators, wrappers and monkey patching." +optional = false +python-versions = ">=3.8" +files = [ + {file = "wrapt-1.17.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:2a0c23b8319848426f305f9cb0c98a6e32ee68a36264f45948ccf8e7d2b941f8"}, + {file = "wrapt-1.17.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b1ca5f060e205f72bec57faae5bd817a1560fcfc4af03f414b08fa29106b7e2d"}, + {file = "wrapt-1.17.0-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e185ec6060e301a7e5f8461c86fb3640a7beb1a0f0208ffde7a65ec4074931df"}, + {file = "wrapt-1.17.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bb90765dd91aed05b53cd7a87bd7f5c188fcd95960914bae0d32c5e7f899719d"}, + {file = "wrapt-1.17.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:879591c2b5ab0a7184258274c42a126b74a2c3d5a329df16d69f9cee07bba6ea"}, + {file = "wrapt-1.17.0-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:fce6fee67c318fdfb7f285c29a82d84782ae2579c0e1b385b7f36c6e8074fffb"}, + {file = "wrapt-1.17.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:0698d3a86f68abc894d537887b9bbf84d29bcfbc759e23f4644be27acf6da301"}, + {file = "wrapt-1.17.0-cp310-cp310-win32.whl", hash = "sha256:69d093792dc34a9c4c8a70e4973a3361c7a7578e9cd86961b2bbf38ca71e4e22"}, + {file = "wrapt-1.17.0-cp310-cp310-win_amd64.whl", hash = "sha256:f28b29dc158ca5d6ac396c8e0a2ef45c4e97bb7e65522bfc04c989e6fe814575"}, + {file = "wrapt-1.17.0-cp311-cp311-macosx_11_0_arm64.whl", hash = 
"sha256:74bf625b1b4caaa7bad51d9003f8b07a468a704e0644a700e936c357c17dd45a"}, + {file = "wrapt-1.17.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0f2a28eb35cf99d5f5bd12f5dd44a0f41d206db226535b37b0c60e9da162c3ed"}, + {file = "wrapt-1.17.0-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:81b1289e99cf4bad07c23393ab447e5e96db0ab50974a280f7954b071d41b489"}, + {file = "wrapt-1.17.0-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9f2939cd4a2a52ca32bc0b359015718472d7f6de870760342e7ba295be9ebaf9"}, + {file = "wrapt-1.17.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:6a9653131bda68a1f029c52157fd81e11f07d485df55410401f745007bd6d339"}, + {file = "wrapt-1.17.0-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:4e4b4385363de9052dac1a67bfb535c376f3d19c238b5f36bddc95efae15e12d"}, + {file = "wrapt-1.17.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:bdf62d25234290db1837875d4dceb2151e4ea7f9fff2ed41c0fde23ed542eb5b"}, + {file = "wrapt-1.17.0-cp311-cp311-win32.whl", hash = "sha256:5d8fd17635b262448ab8f99230fe4dac991af1dabdbb92f7a70a6afac8a7e346"}, + {file = "wrapt-1.17.0-cp311-cp311-win_amd64.whl", hash = "sha256:92a3d214d5e53cb1db8b015f30d544bc9d3f7179a05feb8f16df713cecc2620a"}, + {file = "wrapt-1.17.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:89fc28495896097622c3fc238915c79365dd0ede02f9a82ce436b13bd0ab7569"}, + {file = "wrapt-1.17.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:875d240fdbdbe9e11f9831901fb8719da0bd4e6131f83aa9f69b96d18fae7504"}, + {file = "wrapt-1.17.0-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e5ed16d95fd142e9c72b6c10b06514ad30e846a0d0917ab406186541fe68b451"}, + {file = "wrapt-1.17.0-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = 
"sha256:18b956061b8db634120b58f668592a772e87e2e78bc1f6a906cfcaa0cc7991c1"}, + {file = "wrapt-1.17.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:daba396199399ccabafbfc509037ac635a6bc18510ad1add8fd16d4739cdd106"}, + {file = "wrapt-1.17.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:4d63f4d446e10ad19ed01188d6c1e1bb134cde8c18b0aa2acfd973d41fcc5ada"}, + {file = "wrapt-1.17.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:8a5e7cc39a45fc430af1aefc4d77ee6bad72c5bcdb1322cfde852c15192b8bd4"}, + {file = "wrapt-1.17.0-cp312-cp312-win32.whl", hash = "sha256:0a0a1a1ec28b641f2a3a2c35cbe86c00051c04fffcfcc577ffcdd707df3f8635"}, + {file = "wrapt-1.17.0-cp312-cp312-win_amd64.whl", hash = "sha256:3c34f6896a01b84bab196f7119770fd8466c8ae3dfa73c59c0bb281e7b588ce7"}, + {file = "wrapt-1.17.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:714c12485aa52efbc0fc0ade1e9ab3a70343db82627f90f2ecbc898fdf0bb181"}, + {file = "wrapt-1.17.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:da427d311782324a376cacb47c1a4adc43f99fd9d996ffc1b3e8529c4074d393"}, + {file = "wrapt-1.17.0-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ba1739fb38441a27a676f4de4123d3e858e494fac05868b7a281c0a383c098f4"}, + {file = "wrapt-1.17.0-cp313-cp313-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e711fc1acc7468463bc084d1b68561e40d1eaa135d8c509a65dd534403d83d7b"}, + {file = "wrapt-1.17.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:140ea00c87fafc42739bd74a94a5a9003f8e72c27c47cd4f61d8e05e6dec8721"}, + {file = "wrapt-1.17.0-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:73a96fd11d2b2e77d623a7f26e004cc31f131a365add1ce1ce9a19e55a1eef90"}, + {file = "wrapt-1.17.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:0b48554952f0f387984da81ccfa73b62e52817a4386d070c75e4db7d43a28c4a"}, + {file = "wrapt-1.17.0-cp313-cp313-win32.whl", hash = 
"sha256:498fec8da10e3e62edd1e7368f4b24aa362ac0ad931e678332d1b209aec93045"}, + {file = "wrapt-1.17.0-cp313-cp313-win_amd64.whl", hash = "sha256:fd136bb85f4568fffca995bd3c8d52080b1e5b225dbf1c2b17b66b4c5fa02838"}, + {file = "wrapt-1.17.0-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:17fcf043d0b4724858f25b8826c36e08f9fb2e475410bece0ec44a22d533da9b"}, + {file = "wrapt-1.17.0-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e4a557d97f12813dc5e18dad9fa765ae44ddd56a672bb5de4825527c847d6379"}, + {file = "wrapt-1.17.0-cp313-cp313t-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0229b247b0fc7dee0d36176cbb79dbaf2a9eb7ecc50ec3121f40ef443155fb1d"}, + {file = "wrapt-1.17.0-cp313-cp313t-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8425cfce27b8b20c9b89d77fb50e368d8306a90bf2b6eef2cdf5cd5083adf83f"}, + {file = "wrapt-1.17.0-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:9c900108df470060174108012de06d45f514aa4ec21a191e7ab42988ff42a86c"}, + {file = "wrapt-1.17.0-cp313-cp313t-musllinux_1_2_i686.whl", hash = "sha256:4e547b447073fc0dbfcbff15154c1be8823d10dab4ad401bdb1575e3fdedff1b"}, + {file = "wrapt-1.17.0-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:914f66f3b6fc7b915d46c1cc424bc2441841083de01b90f9e81109c9759e43ab"}, + {file = "wrapt-1.17.0-cp313-cp313t-win32.whl", hash = "sha256:a4192b45dff127c7d69b3bdfb4d3e47b64179a0b9900b6351859f3001397dabf"}, + {file = "wrapt-1.17.0-cp313-cp313t-win_amd64.whl", hash = "sha256:4f643df3d4419ea3f856c5c3f40fec1d65ea2e89ec812c83f7767c8730f9827a"}, + {file = "wrapt-1.17.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:69c40d4655e078ede067a7095544bcec5a963566e17503e75a3a3e0fe2803b13"}, + {file = "wrapt-1.17.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2f495b6754358979379f84534f8dd7a43ff8cff2558dcdea4a148a6e713a758f"}, + {file = 
"wrapt-1.17.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:baa7ef4e0886a6f482e00d1d5bcd37c201b383f1d314643dfb0367169f94f04c"}, + {file = "wrapt-1.17.0-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a8fc931382e56627ec4acb01e09ce66e5c03c384ca52606111cee50d931a342d"}, + {file = "wrapt-1.17.0-cp38-cp38-musllinux_1_2_aarch64.whl", hash = "sha256:8f8909cdb9f1b237786c09a810e24ee5e15ef17019f7cecb207ce205b9b5fcce"}, + {file = "wrapt-1.17.0-cp38-cp38-musllinux_1_2_i686.whl", hash = "sha256:ad47b095f0bdc5585bced35bd088cbfe4177236c7df9984b3cc46b391cc60627"}, + {file = "wrapt-1.17.0-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:948a9bd0fb2c5120457b07e59c8d7210cbc8703243225dbd78f4dfc13c8d2d1f"}, + {file = "wrapt-1.17.0-cp38-cp38-win32.whl", hash = "sha256:5ae271862b2142f4bc687bdbfcc942e2473a89999a54231aa1c2c676e28f29ea"}, + {file = "wrapt-1.17.0-cp38-cp38-win_amd64.whl", hash = "sha256:f335579a1b485c834849e9075191c9898e0731af45705c2ebf70e0cd5d58beed"}, + {file = "wrapt-1.17.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:d751300b94e35b6016d4b1e7d0e7bbc3b5e1751e2405ef908316c2a9024008a1"}, + {file = "wrapt-1.17.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7264cbb4a18dc4acfd73b63e4bcfec9c9802614572025bdd44d0721983fc1d9c"}, + {file = "wrapt-1.17.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:33539c6f5b96cf0b1105a0ff4cf5db9332e773bb521cc804a90e58dc49b10578"}, + {file = "wrapt-1.17.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c30970bdee1cad6a8da2044febd824ef6dc4cc0b19e39af3085c763fdec7de33"}, + {file = "wrapt-1.17.0-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:bc7f729a72b16ee21795a943f85c6244971724819819a41ddbaeb691b2dd85ad"}, + {file = "wrapt-1.17.0-cp39-cp39-musllinux_1_2_i686.whl", 
hash = "sha256:6ff02a91c4fc9b6a94e1c9c20f62ea06a7e375f42fe57587f004d1078ac86ca9"}, + {file = "wrapt-1.17.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:2dfb7cff84e72e7bf975b06b4989477873dcf160b2fd89959c629535df53d4e0"}, + {file = "wrapt-1.17.0-cp39-cp39-win32.whl", hash = "sha256:2399408ac33ffd5b200480ee858baa58d77dd30e0dd0cab6a8a9547135f30a88"}, + {file = "wrapt-1.17.0-cp39-cp39-win_amd64.whl", hash = "sha256:4f763a29ee6a20c529496a20a7bcb16a73de27f5da6a843249c7047daf135977"}, + {file = "wrapt-1.17.0-py3-none-any.whl", hash = "sha256:d2c63b93548eda58abf5188e505ffed0229bf675f7c3090f8e36ad55b8cbc371"}, + {file = "wrapt-1.17.0.tar.gz", hash = "sha256:16187aa2317c731170a88ef35e8937ae0f533c402872c1ee5e6d079fcf320801"}, +] + [[package]] name = "zipp" version = "3.21.0" @@ -3135,4 +3241,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.0" python-versions = "~3.10" -content-hash = "05c42897239ce6c5efc5ad03fff9f64272412510859c88063abde65afbc872da" +content-hash = "8faf09facd975d4a85935e7e4c718a09f293772f275afe52f803b520f6d58621" diff --git a/pyproject.toml b/pyproject.toml index 71a8af5..bebab12 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,7 @@ azure-identity = "^1.15.0" docxtpl = "^0.16.7" pyodbc = "^5.1.0" dummy-notebookutils = "^1.5.1" +opentelemetry-api = "^1.29.0" [tool.poetry.group.dev.dependencies] jupyter = "~1.0.0" diff --git a/src/corvus_python/monitoring/tracing.py b/src/corvus_python/monitoring/tracing.py new file mode 100644 index 0000000..e5eb558 --- /dev/null +++ b/src/corvus_python/monitoring/tracing.py @@ -0,0 +1,76 @@ +from opentelemetry import trace +from functools import wraps + + +def start_as_current_span_with_method_name(tracer: trace.Tracer): + """ + Function decorator which starts a new span with the full name of the method (i.e. class_name.method_name for + methods within classes, or just method_name for standalone functions) as the span name. 
The span is then set as + the current span for the duration of the method call and can be accessed using trace.get_current_span(). + + Args: + tracer (trace.Tracer): The tracer to use for starting the span. Create a tracer for the source file using + trace.get_tracer(__name__) and pass it to this decorator. + """ + + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + with tracer.start_as_current_span(name=func.__qualname__): + return func(*args, **kwargs) + + return wrapper + + return decorator + + +def all_methods_start_new_current_span_with_method_name(tracer: trace.Tracer): + """ + Class decorator which applies start_as_current_span_with_method_name to all methods within the class. + + Args: + tracer (trace.Tracer): The tracer to use for starting the span. Create a tracer for the source file using + trace.get_tracer(__name__) and pass it to this decorator. + """ + + decorator = start_as_current_span_with_method_name(tracer) + + def decorate(cls): + for attr in cls.__dict__: + item = getattr(cls, attr) + if callable(item): + setattr(cls, attr, decorator(item)) + + return cls + + return decorate + + +def add_kwargs_to_span(span: trace.Span, keys: list[str], source_kwargs: dict[str, any]): + """ + Adds the specified keys from the source_kwargs dictionary to the span as attributes. + + Args: + span (trace.Span): The span to add the attributes to. + keys (list[str]): The keys from the source_kwargs to add to the span. These are manually specified to avoid + adding sensitive information to the span. + source_kwargs (dict[str, any]): The dictionary to get the values from. This is typically the kwargs + dictionary of the method being traced. + """ + for key in keys: + if key in source_kwargs: + span.set_attribute(key, str(source_kwargs[key])) + + +def add_kwargs_to_current_span(keys: list[str], source_kwargs: dict[str, any]): + """ + Adds the specified keys from the source_kwargs dictionary to the current span as attributes. 
+ + Args: + keys (list[str]): The keys from the source_kwargs to add to the span. These are manually specified to avoid + adding sensitive information to the span. + source_kwargs (dict[str, any]): The dictionary to get the values from. This is typically the kwargs + dictionary of the method being traced. + """ + span = trace.get_current_span() + add_kwargs_to_span(span, keys, source_kwargs) From df261d2478b5632665af61fe0ed5440836a88b53 Mon Sep 17 00:00:00 2001 From: Jonathan George Date: Fri, 10 Jan 2025 17:01:34 +0000 Subject: [PATCH 2/6] Apply instrumentation --- src/corvus_python/monitoring/tracing.py | 27 ++++++++++++++++--- .../session_utils/local_spark_session.py | 5 ++++ .../spark_utils/local_spark_utils.py | 7 +++++ src/corvus_python/synapse/synapse_utils.py | 5 ++++ .../word_document_generator.py | 5 ++++ 5 files changed, 46 insertions(+), 3 deletions(-) diff --git a/src/corvus_python/monitoring/tracing.py b/src/corvus_python/monitoring/tracing.py index e5eb558..c65ad74 100644 --- a/src/corvus_python/monitoring/tracing.py +++ b/src/corvus_python/monitoring/tracing.py @@ -46,6 +46,28 @@ def decorate(cls): return decorate +def add_attributes_to_span(span: trace.Span, **kwargs: dict[str, any]): + """ + Adds the specified key-value pairs to the specified span as attributes. + + Args: + **kwargs: The key-value pairs to add to the span as attributes. + """ + if span is not None: + kwargs_as_strings = {k: str(v) for k, v in kwargs.items()} + span.set_attributes(kwargs_as_strings) + + +def add_attributes_to_current_span(**kwargs: dict[str, any]): + """ + Adds the specified key-value pairs to the current span as attributes. + + Args: + **kwargs: The key-value pairs to add to the span as attributes. + """ + add_attributes_to_span(trace.get_current_span(), **kwargs) + + def add_kwargs_to_span(span: trace.Span, keys: list[str], source_kwargs: dict[str, any]): """ Adds the specified keys from the source_kwargs dictionary to the span as attributes. 
@@ -57,9 +79,8 @@ def add_kwargs_to_span(span: trace.Span, keys: list[str], source_kwargs: dict[st source_kwargs (dict[str, any]): The dictionary to get the values from. This is typically the kwargs dictionary of the method being traced. """ - for key in keys: - if key in source_kwargs: - span.set_attribute(key, str(source_kwargs[key])) + kwargs_to_add = {key: source_kwargs[key] for key in keys if key in source_kwargs} + add_attributes_to_span(span, **kwargs_to_add) def add_kwargs_to_current_span(keys: list[str], source_kwargs: dict[str, any]): diff --git a/src/corvus_python/pyspark/utilities/session_utils/local_spark_session.py b/src/corvus_python/pyspark/utilities/session_utils/local_spark_session.py index 9068ad5..8b39238 100644 --- a/src/corvus_python/pyspark/utilities/session_utils/local_spark_session.py +++ b/src/corvus_python/pyspark/utilities/session_utils/local_spark_session.py @@ -5,6 +5,10 @@ from delta import configure_spark_with_delta_pip import os from ...storage import StorageConfiguration, LocalFileSystemStorageConfiguration +from ....monitoring.tracing import all_methods_start_new_current_span_with_method_name +from opentelemetry import trace + +tracer = trace.get_tracer(__name__) CWD = os.path.join(os.getcwd()) @@ -47,6 +51,7 @@ class LocalSparkSessionConfig(): @dataclass +@all_methods_start_new_current_span_with_method_name(tracer) class LocalSparkSession(): """Class to represent a Local Spark session. Attributes: diff --git a/src/corvus_python/pyspark/utilities/spark_utils/local_spark_utils.py b/src/corvus_python/pyspark/utilities/spark_utils/local_spark_utils.py index 8ef4e2e..94f9204 100644 --- a/src/corvus_python/pyspark/utilities/spark_utils/local_spark_utils.py +++ b/src/corvus_python/pyspark/utilities/spark_utils/local_spark_utils.py @@ -1,6 +1,10 @@ """Copyright (c) Endjin Limited. 
All rights reserved.""" from corvus_python.auth import get_az_cli_token +from ....monitoring.tracing import all_methods_start_new_current_span_with_method_name +from opentelemetry import trace + +tracer = trace.get_tracer(__name__) class LSRLinkedServiceFailure(Exception): @@ -46,6 +50,7 @@ def __init__(self, secret_name: str): super().__init__(self.message) +@all_methods_start_new_current_span_with_method_name(tracer) class LocalCredentialUtils(): """Class which mirrors elements of the mssparkutils.credentials API. Intentionally not a full representation - additional methods will be added to it as and when the need arises. @@ -110,6 +115,7 @@ def getToken(self, audience: str) -> str: return get_az_cli_token(scope, tenant_id=tenant_id) +@all_methods_start_new_current_span_with_method_name(tracer) class LocalEnvUtils(): """Class which mirrors elements of the mssparkutils.env API. Intentionally not a full representation - additional methods will be added to it as and when the need arises. @@ -133,6 +139,7 @@ def getWorkspaceName(self) -> str: return self.config.get("getWorkspaceName") +@all_methods_start_new_current_span_with_method_name(tracer) class LocalSparkUtils(): """Class which mirrors elements of the mssparkutils API. Intentionally not a full representation - additional sub-classes will be added to it as and when the need arises. 
diff --git a/src/corvus_python/synapse/synapse_utils.py b/src/corvus_python/synapse/synapse_utils.py index dbfc2b5..bbe4dfd 100644 --- a/src/corvus_python/synapse/synapse_utils.py +++ b/src/corvus_python/synapse/synapse_utils.py @@ -4,6 +4,10 @@ import time from corvus_python.pyspark.utilities.spark_utils.spark_utils import get_spark_utils +from ..monitoring.tracing import all_methods_start_new_current_span_with_method_name +from opentelemetry import trace + +tracer = trace.get_tracer(__name__) class SynapsePipelineError(Exception): @@ -31,6 +35,7 @@ class SynapsePipelineStatus: CANCELLED: str = 'Cancelled' +@all_methods_start_new_current_span_with_method_name(tracer) class SynapseUtilities: """ A utility class for interacting with Azure Synapse Analytics. diff --git a/src/corvus_python/word_document_generator/word_document_generator.py b/src/corvus_python/word_document_generator/word_document_generator.py index e29bd1d..ab9b712 100644 --- a/src/corvus_python/word_document_generator/word_document_generator.py +++ b/src/corvus_python/word_document_generator/word_document_generator.py @@ -4,8 +4,13 @@ from typing import IO, Dict from docxtpl import DocxTemplate, InlineImage from docx.shared import Mm +from ..monitoring.tracing import all_methods_start_new_current_span_with_method_name +from opentelemetry import trace +tracer = trace.get_tracer(__name__) + +@all_methods_start_new_current_span_with_method_name(tracer) class WordDocumentGenerator: """A class for generating Word documents from templates.""" From 632afdd27e96322ea9f1b0fb40445f08da6f658c Mon Sep 17 00:00:00 2001 From: Jonathan George Date: Fri, 10 Jan 2025 17:34:49 +0000 Subject: [PATCH 3/6] Added information on the tracing code to README --- src/corvus_python/README.md | 80 +++++++++++++++++++++++++ src/corvus_python/monitoring/tracing.py | 5 ++ 2 files changed, 85 insertions(+) diff --git a/src/corvus_python/README.md b/src/corvus_python/README.md index c9777e9..a46d22d 100644 --- 
a/src/corvus_python/README.md +++ b/src/corvus_python/README.md @@ -4,3 +4,83 @@ This package is an interim home for data-related & python-based utility function * Improving the inner dev loop when working with Azure Synapse Notebooks * Encapsulating opionated data engineering design concepts (e.g. Medallion Data Lake Architecture) + +## Tracing + +Helper methods to support OpenTelemetry-based tracing are provided in `corvus_python.monitoring.tracing`. These can be applied to code, but will only have any effect if suitable OpenTelemetry configuration is performed in the consuming package. + +There are two decorators and four helper methods: + +Decorators: +- `start_as_current_span_with_method_name` - applied to a function, this will start a span named for the function that's created when the function is called and lasts for the duration of the function execution. +- `all_methods_start_new_current_span_with_method_name` - applies the `start_as_current_span_with_method_name` decorator to all callable members of the class. + +Helpers: +- `add_attributes_to_span` adds the specified keyword arguments to the given span as attributes. +- `add_attributes_to_current_span` is similar to `add_attributes_to_span`, but uses the current span rather than requiring it to be passed in. +- `add_kwargs_to_span` adds the specified keys from the kwargs of a function to the given span. +- `add_kwargs_to_current_span` is similar to `add_kwargs_to_span`, but uses the current span rather than requiring it to be passed in. + +### Configuration for Application Insights + +To use these decorators and methods, you must configure Open Telemetry correctly in your code. To configure traces to be sent to Application Insights, take the following steps: + +1. Add the `azure-monitor-opentelemetry` package to your project. Assuming your package is being deployed to a Spark cluster, you will also need to deploy this package on the cluster. +2. 
Use the `configure_azure_monitor` function to set up Open Telemetry: + +```python +configure_azure_monitor( + connection_string="[Your connection string here]", + resource=Resource.create({ + "service.name": "[Your service name here]", + "service.instance.id": "[Your service instance Id here]" + })) +``` + +Store your connection string in an Azure Key Vault and retrieve it using `mssparkutils.credentials.getSecretWithLS` or equivalent. + +### Writing log messages to App Insights + +If your code is already using standard Python logging via the `logging` module, the `configure_azure_monitor` method will ensure that these messages are written to App Insights as traces of type `TRACE`, with their parent being the current span at the point they were logged. However, you should prefer writing useful data as attributes of your spans rather than in log messages; this will make it simpler to query them using the Kusto query language in Azure Monitor. + +### Recommendations when running in Synapse + +These recommendations are intended for the scenario when you are deploying packages into a Spark cluster to be executed from a Synapse notebook, although there will be equivalent implementations for other platforms. + +#### Add the Azure Monitor configuration to the notebook which calls your code + +Adding the configuration to the notebook rather than inside your package means that it will be possible to apply different Open Telemetry configuration elsewhere - for example, in test runs - or leave it out completely if required to effectively disable tracing. + +#### Service name and service instance Id + +Use your package name as the service name. This will be accessible in App Insights as the `Role name` in every span that is sent. If applying a filter in the transaction search screen, it is referred to as `Cloud role name`. + +Use a suitable correlation Id for the service instance Id. This will be accessible in App Insights as the `Role instance` in every span that is sent. 
If applying a filter in the transaction search screen, it is referred to as `Cloud role instance`. + +When code is executed from a Synapse notebook, it's likely that it will be being invoked from a pipeline. In this case you can set `service.instance.id` to the Pipeline run Id to provide an easy link between pipeline runs in the Synapse monitor. This can either be passed into the notebook as a parameter, or accessed in the notebook using `mssparkutils.runtime.context["pipelinejobid"]`. + +#### Span types + +The supplied decorators create spans of kind `SpanKind.INTERNAL` - the default. This is intended to indicate that the span represents an internal operation within an application, as opposed to an operations with remote parents or children. This will result in a trace of type `DEPENDENCY` in App Insights (also referred to as `InProc`). + +Consider wrapping calls to your package with a manually created span of kind `SpanKind.REQUEST`. This will result in a trace of type `REQUEST` in App Insights, and will make it easier to locate invocations of your package by searching for spans of type `REQUEST` using your correlation Id. + +If your package has a small number of entrypoints, you can create the Request span inside your package. Alternatively, you can create the Request span in the notebook which uses your package. The code looks like this: + +```python +from opentelemetry import trace + +tracer = trace.get_tracer(__name__) + +def my_entrypoint(): + + with tracer.start_as_current_span( + "my_entrypoint_name", + kind=trace.SpanKind.SERVER) as span: + + # Your code here. +``` + +#### Sensitive data + +Never send sensitive data to App Insights. The need to avoid this is why the helper methods do not add all of a function's arguments to span attributes, and why the helper method `add_kwargs_to_span` requires you to explicitly specify the keys to add. If sensitive data is accidentally logged into App Insights, it should be considered a data breach. 
\ No newline at end of file diff --git a/src/corvus_python/monitoring/tracing.py b/src/corvus_python/monitoring/tracing.py index c65ad74..7ee846c 100644 --- a/src/corvus_python/monitoring/tracing.py +++ b/src/corvus_python/monitoring/tracing.py @@ -50,6 +50,11 @@ def add_attributes_to_span(span: trace.Span, **kwargs: dict[str, any]): """ Adds the specified key-value pairs to the specified span as attributes. + For example, calling: + add_attributes_to_span(span, key1="value1", key2="value2") + is equivalent to calling: + span.set_attributes({"key1": "value1", "key2": "value2"}) + Args: **kwargs: The key-value pairs to add to the span as attributes. """ From edfc1a19d0116252a69c3632ee0a0feb61a8a06d Mon Sep 17 00:00:00 2001 From: Jonathan George Date: Fri, 10 Jan 2025 17:47:59 +0000 Subject: [PATCH 4/6] README tweak --- src/corvus_python/README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/corvus_python/README.md b/src/corvus_python/README.md index a46d22d..b92bd3f 100644 --- a/src/corvus_python/README.md +++ b/src/corvus_python/README.md @@ -29,6 +29,9 @@ To use these decorators and methods, you must configure Open Telemetry correctly 2. 
Use the `configure_azure_monitor` function to set up Open Telemetry: ```python +from azure.monitor.opentelemetry import configure_azure_monitor +from opentelemetry.sdk.resources import Resource + configure_azure_monitor( connection_string="[Your connection string here]", resource=Resource.create({ From 98cc17d8b77349cab8a207a2b1d4c5c89a1baed1 Mon Sep 17 00:00:00 2001 From: Jonathan George Date: Tue, 14 Jan 2025 16:54:45 +0000 Subject: [PATCH 5/6] Readme tweak --- src/corvus_python/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/corvus_python/README.md b/src/corvus_python/README.md index b92bd3f..1f3e988 100644 --- a/src/corvus_python/README.md +++ b/src/corvus_python/README.md @@ -66,9 +66,9 @@ When code is executed from a Synapse notebook, it's likely that it will be being The supplied decorators create spans of kind `SpanKind.INTERNAL` - the default. This is intended to indicate that the span represents an internal operation within an application, as opposed to an operations with remote parents or children. This will result in a trace of type `DEPENDENCY` in App Insights (also referred to as `InProc`). -Consider wrapping calls to your package with a manually created span of kind `SpanKind.REQUEST`. This will result in a trace of type `REQUEST` in App Insights, and will make it easier to locate invocations of your package by searching for spans of type `REQUEST` using your correlation Id. +Consider wrapping calls to your package with a manually created span of kind `SpanKind.SERVER`. This will result in a trace of type `REQUEST` in App Insights, and will make it easier to locate invocations of your package by searching for traces of type `REQUEST` using your correlation Id. -If your package has a small number of entrypoints, you can create the Request span inside your package. Alternatively, you can create the Request span in the notebook which uses your package. 
The code looks like this: +If your package has a small number of entrypoints, you can create the `SpanKind.SERVER` span inside your package. Alternatively, you can create it in the notebook which uses your package. The code looks like this: ```python from opentelemetry import trace From 2606bd9e87340bf4094be9dcce36f6bb93749607 Mon Sep 17 00:00:00 2001 From: Mike Evans-Larah Date: Mon, 30 Mar 2026 13:03:38 +0000 Subject: [PATCH 6/6] Fix import --- src/corvus_python/synapse/synapse_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/corvus_python/synapse/synapse_utils.py b/src/corvus_python/synapse/synapse_utils.py index b742318..47cb8de 100644 --- a/src/corvus_python/synapse/synapse_utils.py +++ b/src/corvus_python/synapse/synapse_utils.py @@ -3,7 +3,7 @@ from datetime import datetime, timedelta import time -from ..monitoring.tracing import all_methods_start_new_current_span_with_method_name +from corvus_python.monitoring.tracing import all_methods_start_new_current_span_with_method_name from opentelemetry import trace tracer = trace.get_tracer(__name__)