178 changes: 148 additions & 30 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -11,6 +11,7 @@ python = "^3.10"
azure-identity = "^1.15.0"
docxtpl = "^0.16.7"
pyodbc = "^5.1.0"
opentelemetry-api = "^1.29.0"
pyspark = { version = "^3.3.1", optional = true }
delta-spark = { version = ">=2.2.0,<4", optional = true }
azure-communication-email = "^1.0.0"
83 changes: 83 additions & 0 deletions src/corvus_python/README.md
@@ -4,3 +4,86 @@ This package is an interim home for data-related & python-based utility function

* Improving the inner dev loop when working with Azure Synapse Notebooks
* Encapsulating opinionated data engineering design concepts (e.g. Medallion Data Lake Architecture)

## Tracing

Helper methods to support OpenTelemetry-based tracing are provided in `corvus_python.monitoring.tracing`. These can be applied to any code, but only take effect if suitable OpenTelemetry configuration is performed in the consuming package.

There are two decorators and four helper methods:

Decorators:
- `start_as_current_span_with_method_name` - applied to a function, this starts a span named after the function each time the function is called. The span lasts for the duration of the function's execution.
- `all_methods_start_new_current_span_with_method_name` - applied to a class, this applies `start_as_current_span_with_method_name` to all callable members of the class.

Helpers:
- `add_attributes_to_span` adds the specified keyword arguments to the given span as attributes.
- `add_attributes_to_current_span` is similar to `add_attributes_to_span`, but uses the current span rather than requiring it to be passed in.
- `add_kwargs_to_span` adds the specified keys from the kwargs of a function to the given span.
- `add_kwargs_to_current_span` is similar to `add_kwargs_to_span`, but uses the current span rather than requiring it to be passed in.
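
The span naming used by the decorators can be illustrated with a minimal sketch. It uses only the standard library: `RecordingTracer` is a hypothetical stand-in for an OpenTelemetry tracer that simply records span names, and the decorator body is re-stated here in the same shape as the one in `corvus_python.monitoring.tracing` so the snippet runs on its own. The function and class names (`load_table`, `Ingestion`) are illustrative only.

```python
from contextlib import contextmanager
from functools import wraps


class RecordingTracer:
    """Hypothetical stand-in for an OpenTelemetry tracer; it only records span names."""

    def __init__(self):
        self.span_names = []

    @contextmanager
    def start_as_current_span(self, name):
        # A real tracer would create a span and make it current here.
        self.span_names.append(name)
        yield name


def start_as_current_span_with_method_name(tracer):
    # Same shape as the decorator in corvus_python.monitoring.tracing.
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # The span is named after the function's qualified name.
            with tracer.start_as_current_span(name=func.__qualname__):
                return func(*args, **kwargs)

        return wrapper

    return decorator


tracer = RecordingTracer()


@start_as_current_span_with_method_name(tracer)
def load_table(table_name):
    return f"loaded {table_name}"


class Ingestion:
    @start_as_current_span_with_method_name(tracer)
    def run(self):
        # The nested call starts its own span while this one is still open.
        return load_table("customers")


result = Ingestion().run()
print(tracer.span_names)
```

When the snippet is run as a module, methods are recorded as `ClassName.method_name` and standalone functions by their plain name, because the span name comes from `__qualname__`.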

### Configuration for Application Insights

To use these decorators and methods, you must configure OpenTelemetry correctly in your code. To configure traces to be sent to Application Insights, take the following steps:

1. Add the `azure-monitor-opentelemetry` package to your project. Assuming your package is being deployed to a Spark cluster, you will also need to deploy this package on the cluster.
2. Use the `configure_azure_monitor` function to set up OpenTelemetry:

```python
from azure.monitor.opentelemetry import configure_azure_monitor
from opentelemetry.sdk.resources import Resource

configure_azure_monitor(
    connection_string="[Your connection string here]",
    resource=Resource.create({
        "service.name": "[Your service name here]",
        "service.instance.id": "[Your service instance Id here]"
    })
)
```

Store your connection string in an Azure Key Vault and retrieve it using `mssparkutils.credentials.getSecretWithLS` or equivalent.

### Writing log messages to App Insights

If your code already uses standard Python logging via the `logging` module, the `configure_azure_monitor` function will ensure that these messages are written to App Insights as traces of type `TRACE`, with their parent being the span that was current at the point they were logged. However, you should prefer recording useful data as attributes of your spans rather than in log messages; this makes the data simpler to query using the Kusto query language in Azure Monitor.

### Recommendations when running in Synapse

These recommendations are intended for the scenario when you are deploying packages into a Spark cluster to be executed from a Synapse notebook, although there will be equivalent implementations for other platforms.

#### Add the Azure Monitor configuration to the notebook which calls your code

Adding the configuration to the notebook rather than inside your package makes it possible to apply a different OpenTelemetry configuration elsewhere (for example, in test runs), or to leave it out completely, which effectively disables tracing.

#### Service name and service instance Id

Use your package name as the service name. This will be accessible in App Insights as the `Role name` in every span that is sent. If applying a filter in the transaction search screen, it is referred to as `Cloud role name`.

Use a suitable correlation Id for the service instance Id. This will be accessible in App Insights as the `Role instance` in every span that is sent. If applying a filter in the transaction search screen, it is referred to as `Cloud role instance`.

When code is executed from a Synapse notebook, it is likely to be invoked from a pipeline. In this case you can set `service.instance.id` to the pipeline run Id to provide an easy link between your traces and the pipeline runs shown in the Synapse monitor. The run Id can either be passed into the notebook as a parameter, or accessed in the notebook using `mssparkutils.runtime.context["pipelinejobid"]`.
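
As a rough sketch of the naming advice above, the resource attributes could be assembled by a small helper before being passed to `Resource.create`. The helper name `build_resource_attributes` is hypothetical, and two assumptions are made that are not stated in this README: that the runtime context behaves like a dictionary, and that falling back to a random Id is acceptable when no pipeline run Id is present (for example, in an interactive notebook session).

```python
import uuid


def build_resource_attributes(package_name, runtime_context):
    """Hypothetical helper: builds the attribute dict to pass to Resource.create."""
    # Assumption: fall back to a random Id when there is no pipeline run Id,
    # for example when the notebook is run interactively.
    run_id = runtime_context.get("pipelinejobid") or str(uuid.uuid4())
    return {
        "service.name": package_name,
        "service.instance.id": run_id,
    }


# In a Synapse notebook the second argument would be mssparkutils.runtime.context;
# plain dictionaries are used here so the sketch runs anywhere.
attributes = build_resource_attributes("my_package", {"pipelinejobid": "run-123"})
interactive = build_resource_attributes("my_package", {})
print(attributes)
```

Keeping this logic in the notebook, next to the call to `configure_azure_monitor`, fits the recommendation to configure tracing outside the package.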

#### Span types

The supplied decorators create spans of kind `SpanKind.INTERNAL`, which is the default. This indicates that the span represents an internal operation within an application, as opposed to an operation with remote parents or children. It results in a trace of type `DEPENDENCY` in App Insights (also referred to as `InProc`).

Consider wrapping calls to your package with a manually created span of kind `SpanKind.SERVER`. This will result in a trace of type `REQUEST` in App Insights, and will make it easier to locate invocations of your package by searching for traces of type `REQUEST` using your correlation Id.

If your package has a small number of entrypoints, you can create the `SpanKind.SERVER` span inside your package. Alternatively, you can create it in the notebook which uses your package. The code looks like this:

```python
from opentelemetry import trace

tracer = trace.get_tracer(__name__)

def my_entrypoint():

    with tracer.start_as_current_span(
        "my_entrypoint_name",
        kind=trace.SpanKind.SERVER) as span:

        # Your code here.
        pass
```

#### Sensitive data

Never send sensitive data to App Insights. This is why the helper methods do not add all of a function's arguments to span attributes, and why the helper method `add_kwargs_to_span` requires you to explicitly specify the keys to add. If sensitive data is accidentally logged to App Insights, it should be considered a data breach.
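
The allow-list behaviour described above can be sketched in a few lines. `select_attributes` is a hypothetical helper that mirrors the filtering and string conversion performed by `add_kwargs_to_span` and `add_attributes_to_span`, without needing a span; the argument names are illustrative.

```python
def select_attributes(keys, source_kwargs):
    """Hypothetical helper mirroring the filtering step in add_kwargs_to_span."""
    # Only explicitly listed keys are kept; keys missing from the source are skipped.
    selected = {key: source_kwargs[key] for key in keys if key in source_kwargs}
    # Values are converted to strings, as add_attributes_to_span does.
    return {key: str(value) for key, value in selected.items()}


call_kwargs = {"table_name": "customers", "batch_size": 500, "password": "s3cret"}
attributes = select_attributes(["table_name", "batch_size", "run_date"], call_kwargs)
print(attributes)  # {'table_name': 'customers', 'batch_size': '500'}
```

Because `password` is not in the list of keys, it never reaches the span, and the absent `run_date` key is ignored rather than raising an error.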
5 changes: 5 additions & 0 deletions src/corvus_python/monitoring/__init__.py
@@ -0,0 +1,5 @@
from .tracing import all_methods_start_new_current_span_with_method_name

__all__ = [
"all_methods_start_new_current_span_with_method_name",
]
102 changes: 102 additions & 0 deletions src/corvus_python/monitoring/tracing.py
@@ -0,0 +1,102 @@
from opentelemetry import trace
from functools import wraps


def start_as_current_span_with_method_name(tracer: trace.Tracer):
    """
    Function decorator which starts a new span with the full name of the method (i.e. class_name.method_name for
    methods within classes, or just method_name for standalone functions) as the span name. The span is then set as
    the current span for the duration of the method call and can be accessed using trace.get_current_span().

    Args:
        tracer (trace.Tracer): The tracer to use for starting the span. Create a tracer for the source file using
            trace.get_tracer(__name__) and pass it to this decorator.
    """

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            with tracer.start_as_current_span(name=func.__qualname__):
                return func(*args, **kwargs)

        return wrapper

    return decorator


def all_methods_start_new_current_span_with_method_name(tracer: trace.Tracer):
    """
    Class decorator which applies start_as_current_span_with_method_name to all methods within the class.

    Args:
        tracer (trace.Tracer): The tracer to use for starting the span. Create a tracer for the source file using
            trace.get_tracer(__name__) and pass it to this decorator.
    """

    decorator = start_as_current_span_with_method_name(tracer)

    def decorate(cls):
        for attr in cls.__dict__:
            item = getattr(cls, attr)
            if callable(item):
                setattr(cls, attr, decorator(item))

        return cls

    return decorate


def add_attributes_to_span(span: trace.Span, **kwargs: dict[str, any]):
    """
    Adds the specified key-value pairs to the specified span as attributes.

    For example, calling:
        add_attributes_to_span(span, key1="value1", key2="value2")
    is equivalent to calling:
        span.set_attributes({"key1": "value1", "key2": "value2"})

    Args:
        **kwargs: The key-value pairs to add to the span as attributes.
    """
    if span is not None:
        kwargs_as_strings = {k: str(v) for k, v in kwargs.items()}
        span.set_attributes(kwargs_as_strings)


def add_attributes_to_current_span(**kwargs: dict[str, any]):
    """
    Adds the specified key-value pairs to the current span as attributes.

    Args:
        **kwargs: The key-value pairs to add to the span as attributes.
    """
    add_attributes_to_span(trace.get_current_span(), **kwargs)


def add_kwargs_to_span(span: trace.Span, keys: list[str], source_kwargs: dict[str, any]):
    """
    Adds the specified keys from the source_kwargs dictionary to the span as attributes.

    Args:
        span (trace.Span): The span to add the attributes to.
        keys (list[str]): The keys from the source_kwargs to add to the span. These are manually specified to avoid
            adding sensitive information to the span.
        source_kwargs (dict[str, any]): The dictionary to get the values from. This is typically the kwargs
            dictionary of the method being traced.
    """
    kwargs_to_add = {key: source_kwargs[key] for key in keys if key in source_kwargs}
    add_attributes_to_span(span, **kwargs_to_add)


def add_kwargs_to_current_span(keys: list[str], source_kwargs: dict[str, any]):
    """
    Adds the specified keys from the source_kwargs dictionary to the current span as attributes.

    Args:
        keys (list[str]): The keys from the source_kwargs to add to the span. These are manually specified to avoid
            adding sensitive information to the span.
        source_kwargs (dict[str, any]): The dictionary to get the values from. This is typically the kwargs
            dictionary of the method being traced.
    """
    span = trace.get_current_span()
    add_kwargs_to_span(span, keys, source_kwargs)
@@ -5,6 +5,10 @@
from delta import configure_spark_with_delta_pip
import os
from corvus_python.storage import StorageConfiguration, LocalFileSystemStorageConfiguration
from corvus_python.monitoring import all_methods_start_new_current_span_with_method_name
from opentelemetry import trace

tracer = trace.get_tracer(__name__)


CWD = os.path.join(os.getcwd())
@@ -48,6 +52,7 @@ class LocalSparkSessionConfig:


@dataclass
@all_methods_start_new_current_span_with_method_name(tracer)
class LocalSparkSession:
"""Class to represent a Local Spark session.
Attributes:
7 changes: 7 additions & 0 deletions src/corvus_python/spark_utils/local_spark_utils.py
@@ -2,6 +2,10 @@

from typing import Dict, TypedDict, Literal, Optional
from corvus_python.auth import get_az_cli_token
from corvus_python.monitoring import all_methods_start_new_current_span_with_method_name
from opentelemetry import trace

tracer = trace.get_tracer(__name__)


class StaticSecretConfig(TypedDict):
@@ -98,6 +102,7 @@ def __init__(self, secret_name: str):
super().__init__(self.message)


@all_methods_start_new_current_span_with_method_name(tracer)
class LocalCredentialUtils:
"""Class which mirrors elements of the mssparkutils.credentials API. Intentionally not a full representation -
additional methods will be added to it as and when the need arises.
@@ -161,6 +166,7 @@ def getToken(self, audience: str) -> str:
return get_az_cli_token(scope, tenant_id=tenant_id) # type: ignore


@all_methods_start_new_current_span_with_method_name(tracer)
class LocalEnvUtils:
"""Class which mirrors elements of the mssparkutils.env API. Intentionally not a full representation - additional
methods will be added to it as and when the need arises.
@@ -185,6 +191,7 @@ def getWorkspaceName(self) -> str:
return self.config.get("getWorkspaceName")


@all_methods_start_new_current_span_with_method_name(tracer)
class LocalSparkUtils:
"""Class which mirrors elements of the mssparkutils API. Intentionally not a full representation - additional
sub-classes will be added to it as and when the need arises.
54 changes: 28 additions & 26 deletions src/corvus_python/synapse/synapse_utils.py
@@ -3,6 +3,11 @@
from datetime import datetime, timedelta
import time

from corvus_python.monitoring.tracing import all_methods_start_new_current_span_with_method_name
from opentelemetry import trace

tracer = trace.get_tracer(__name__)


class SynapsePipelineError(Exception):
"""
@@ -21,14 +26,15 @@ class SynapsePipelineStatus:
Represents the possible status values for a Synapse pipeline.
"""

QUEUED: str = 'Queued'
IN_PROGRESS: str = 'InProgress'
SUCCEEDED: str = 'Succeeded'
FAILED: str = 'Failed'
CANCELING: str = 'Canceling'
CANCELLED: str = 'Cancelled'
QUEUED: str = "Queued"
IN_PROGRESS: str = "InProgress"
SUCCEEDED: str = "Succeeded"
FAILED: str = "Failed"
CANCELING: str = "Canceling"
CANCELLED: str = "Cancelled"


@all_methods_start_new_current_span_with_method_name(tracer)
class SynapseUtilities:
"""
A utility class for interacting with Azure Synapse Analytics.
@@ -58,23 +64,19 @@ def create_pipeline_run(
"""

url = (
f'{self.workspace_endpoint}/pipelines/'
f'{urllib.parse.quote(pipeline_name)}/createRun?api-version=2020-12-01'
f"{self.workspace_endpoint}/pipelines/"
f"{urllib.parse.quote(pipeline_name)}/createRun?api-version=2020-12-01"
)

headers = {'Authorization': f'Bearer {self.access_token}'}
headers = {"Authorization": f"Bearer {self.access_token}"}

response = requests.post(url, json=pipeline_parameters, headers=headers)

pipeline_run_id = response.json()['runId']
pipeline_run_id = response.json()["runId"]

return pipeline_run_id

def wait_for_pipeline_run(
self,
run_id: str,
timeout_mins: int = 60
) -> str:
def wait_for_pipeline_run(self, run_id: str, timeout_mins: int = 60) -> str:
"""
Waits for a pipeline run to complete in Azure Synapse Analytics.

@@ -87,33 +89,33 @@ def wait_for_pipeline_run(
str: The final status of the pipeline run.
"""

url = f'{self.workspace_endpoint}/pipelineruns/{run_id}?api-version=2020-12-01'
url = f"{self.workspace_endpoint}/pipelineruns/{run_id}?api-version=2020-12-01"

headers = {'Authorization': f'Bearer {self.access_token}'}
headers = {"Authorization": f"Bearer {self.access_token}"}

completed = False
status = None
now = datetime.utcnow()
timeout_at = now + timedelta(minutes=timeout_mins)

while (not completed):
while not completed:
response = requests.get(url, headers=headers)

if response.status_code == 200:
status = response.json()['status']
status = response.json()["status"]

if status in [
SynapsePipelineStatus.SUCCEEDED,
SynapsePipelineStatus.FAILED,
SynapsePipelineStatus.CANCELLED
SynapsePipelineStatus.CANCELLED,
]:
print(f"Status for pipeline run '{run_id}': {status}.")
completed = True
else:
print(f"Status for pipeline run '{run_id}': {status}. Waiting for 10 seconds...")
time.sleep(10)
now = datetime.utcnow()
if (now > timeout_at):
if now > timeout_at:
raise SynapsePipelineError(f"Timed out waiting for pipeline run '{run_id}' to complete.")

else:
@@ -146,13 +148,13 @@ def get_workspace_resource_id(self) -> str:
Returns:
str: The resource ID of the workspace.
"""
url = f'{self.workspace_endpoint}/workspace?api-version=2020-12-01'
url = f"{self.workspace_endpoint}/workspace?api-version=2020-12-01"

headers = {'Authorization': f'Bearer {self.access_token}'}
headers = {"Authorization": f"Bearer {self.access_token}"}

response = requests.get(url, headers=headers)

resource_id = response.json()['id']
resource_id = response.json()["id"]

return resource_id

@@ -166,9 +168,9 @@ def get_linked_service(self, linked_service_name: str) -> str:
Returns:
str: The JSON representation of the linked service.
"""
url = f'{self.workspace_endpoint}/linkedservices/{linked_service_name}?api-version=2020-12-01'
url = f"{self.workspace_endpoint}/linkedservices/{linked_service_name}?api-version=2020-12-01"

headers = {'Authorization': f'Bearer {self.access_token}'}
headers = {"Authorization": f"Bearer {self.access_token}"}

response = requests.get(url, headers=headers)
