Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ HASHER_API_AZ_KEY_VAULT_NAME=
# ENV is used by Docker compose to separate runtime environments {dev|test|prod}.
ENV=

# set to `false` to enable exporting telemetry
OTEL_SDK_DISABLED=true
# gRPC endpoint of the OTel collector, e.g. http://localhost:4317
# leave empty to disable sending telemetry
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
OTEL_EXPORTER_OTLP_ENDPOINT=
23 changes: 23 additions & 0 deletions cli/src/pixl_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
from core.exports import ParquetExport
from core.logging import configure_logging
from core.patient_queue.producer import PixlProducer
from core.tracing import configure_tracing
from decouple import RepositoryEnv, UndefinedValueError
from loguru import logger
from opentelemetry.instrumentation.pika import PikaInstrumentor

from pixl_cli._config import (
HOST_EXPORT_ROOT_DIR,
Expand All @@ -51,12 +53,33 @@
os.environ["NO_PROXY"] = os.environ["no_proxy"] = "localhost"


def _configure_telemetry_env_vars() -> None:
    """
    Export the OTel settings the CLI needs as process environment variables.

    The CLI looks up OTEL_EXPORTER_OTLP_ENDPOINT through ``config`` (described in the
    original code as coming from a .env in the current working directory), whereas
    OTel itself is configured via environment variables. This copies the endpoint
    into ``os.environ`` and adds the CLI's service name and resource attributes.

    Nothing is exported when the endpoint is missing or empty.
    """
    otlp_endpoint = config("OTEL_EXPORTER_OTLP_ENDPOINT", default="")
    if otlp_endpoint:
        # Same three variables, set in the same order as before.
        os.environ.update(
            {
                "OTEL_EXPORTER_OTLP_ENDPOINT": otlp_endpoint,
                "OTEL_SERVICE_NAME": "pixl-cli",
                "OTEL_RESOURCE_ATTRIBUTES": "service.namespace=pixl",
            }
        )


@click.group()
@click.option("--debug/--no-debug", default=False)
def cli(*, debug: bool) -> None:
    """PIXL command line interface"""
    # Export the OTel environment variables first: configure_tracing() below only sets up an
    # exporter when OTEL_EXPORTER_OTLP_ENDPOINT is present in the environment.
    _configure_telemetry_env_vars()
    configure_logging(level="DEBUG" if debug else "INFO")
    configure_tracing()
    # Instrument pika (the RabbitMQ client) -- presumably so published messages are traced;
    # confirm against the instrumentor's documentation.
    PikaInstrumentor().instrument()


cli.add_command(dc)
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ x-azure-keyvault: &azure-keyvault
AZURE_KEY_VAULT_NAME: ${EXPORT_AZ_KEY_VAULT_NAME}

x-otel-common: &otel-common
OTEL_SDK_DISABLED: ${OTEL_SDK_DISABLED:-true}
OTEL_EXPORTER_OTLP_ENDPOINT: ${OTEL_EXPORTER_OTLP_ENDPOINT:-}
OTEL_EXPORTER_OTLP_HEADERS: ${OTEL_EXPORTER_OTLP_HEADERS:-}
Comment thread
p-j-smith marked this conversation as resolved.
OTEL_RESOURCE_ATTRIBUTES: "service.namespace=pixl"
OTEL_EXPORTER_OTLP_PROTOCOL: grpc
OTEL_LOGS_EXPORTER: none # we define our own loguru sink for exporting logs
Expand Down
47 changes: 41 additions & 6 deletions orthanc/orthanc-anon/plugin/pixl.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,14 @@
from core.exceptions import PixlDiscardError, PixlSkipInstanceError
from core.logging import configure_logging
from core.project_config.pixl_config_model import load_project_config
from core.tracing import configure_tracing
from decouple import config
from loguru import logger
from opentelemetry import trace
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.propagate import extract
from pixl_dcmd._database import engine as pixl_db_engine
from pixl_dcmd.dicom_helpers import get_study_info
from pixl_dcmd.main import (
anonymise_dicom_and_update_db,
Expand All @@ -54,6 +60,7 @@
from typing import Any

from core.project_config.pixl_config_model import PixlConfig
from opentelemetry.context import Context
from pixl_dcmd.dicom_helpers import StudyInfo

ORTHANC_USERNAME = config("ORTHANC_USERNAME")
Expand All @@ -69,6 +76,15 @@
# Set up logging as main entry point
logging_level = config("LOG_LEVEL", default="INFO")
configure_logging(level=logging_level)

# Set up tracing to correlate logs and traces.
# pixl_dcmd creates its SQLAlchemy engine at import time, so the engine must be
# passed explicitly to the instrumentor
configure_tracing()
SQLAlchemyInstrumentor().instrument(engine=pixl_db_engine)
RequestsInstrumentor().instrument()
tracer = trace.get_tracer("pixl.orthanc_anon")

logger.warning("Running logging at level {}", logging_level)

# Set up a thread pool executor for non-blocking calls to Orthanc
Expand Down Expand Up @@ -220,8 +236,18 @@ def ImportStudiesFromRaw(output, uri, **request): # noqa: ARG001
series_to_keep = payload["SeriesInstanceUIDs"]
project_name = payload["ProjectName"]

# Extract the trace context injected into the request headers by the caller, and pass it to
# the thread pool job so the import continues the same trace
headers = {key.lower(): value for key, value in request.get("headers", {}).items()}
parent_context = extract(headers)

executor.submit(
_import_studies_from_raw, study_resource_ids, study_uids, project_name, series_to_keep
_import_studies_from_raw,
study_resource_ids,
study_uids,
project_name,
series_to_keep,
parent_context,
)

response = json.dumps({"Message": "Ok"})
Expand All @@ -233,21 +259,27 @@ def _import_studies_from_raw(
study_uids: list[str],
project_name: str,
series_to_keep: list[str],
parent_context: Context | None = None,
) -> None:
"""
Import studies from Orthanc Raw.

Args:
study_resource_ids: Resource IDs of the study in Orthanc Raw
project_name: Name of the project
parent_context: Trace context extracted from the incoming request, to continue the trace

- Pull studies from Orthanc Raw based on its resource ID
- Iterate over instances and anonymise them
- Upload the studies to orthanc-anon
- Notify the PIXL export-api to send the studies to the relevant endpoint for the project

"""
with logger.contextualize(project_name=project_name):
# Continue the trace from the incoming request and bind the project to every log within it.
with (
tracer.start_as_current_span(name="import_studies_from_raw", context=parent_context),
logger.contextualize(project_name=project_name),
):
anonymised_study_uids = []

for study_resource_id, study_uid in zip(study_resource_ids, study_uids, strict=False):
Expand Down Expand Up @@ -294,10 +326,13 @@ def _anonymise_study_and_upload(
zipped_study_bytes = get_study_zip_archive_from_raw(resource_id=study_resource_id)

study_info = _get_study_info_from_first_file(zipped_study_bytes)
with logger.contextualize(
mrn=study_info.mrn,
accession_number=study_info.accession_number,
study_uid=study_info.study_uid,
with (
tracer.start_as_current_span(name="anonymise_study"),
logger.contextualize(
mrn=study_info.mrn,
accession_number=study_info.accession_number,
study_uid=study_info.study_uid,
),
):
logger.info("Processing project '{}', {}", project_name, study_info)

Expand Down
10 changes: 9 additions & 1 deletion orthanc/orthanc-raw/plugin/pixl.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
from typing import TYPE_CHECKING

from core.logging import configure_logging
from core.tracing import configure_tracing
from decouple import config
from loguru import logger
from opentelemetry import trace
from pixl_dcmd.tagrecording import record_dicom_headers

import orthanc
Expand All @@ -36,6 +38,11 @@
# Set up logging as main entry point
logging_level = config("LOG_LEVEL", default="INFO")
configure_logging(level=logging_level)

# Set up tracing to correlate traces and logs
configure_tracing()
tracer = trace.get_tracer("pixl.orthanc_raw")

logger.warning("Running logging at level {}", logging_level)


Expand All @@ -48,7 +55,8 @@ def OnHeartBeat(output, uri, **request): # noqa: ARG001
def ReceivedInstanceCallback(receivedDicom: bytes, origin: str) -> Any: # noqa: ARG001
"""Optionally record headers from the received DICOM instance."""
if should_record_headers():
record_dicom_headers(receivedDicom)
with tracer.start_as_current_span(name="record_dicom_headers"):
record_dicom_headers(receivedDicom)
return orthanc.ReceivedInstanceAction.KEEP_AS_IS, None


Expand Down
71 changes: 45 additions & 26 deletions pixl_core/src/core/patient_queue/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@

from typing import TYPE_CHECKING

from loguru import logger
from opentelemetry import trace
from pika import BasicProperties, DeliveryMode

from ._base import PixlBlockingInterface

if TYPE_CHECKING:
from core.patient_queue.message import Message

from loguru import logger
tracer = trace.get_tracer("pixl_core.patient_queue.producer")


class PixlProducer(PixlBlockingInterface):
Expand All @@ -36,32 +38,49 @@ def publish(self, messages: list[Message], priority: int) -> None:
:param messages: list of messages to be sent to queue
:param priority: priority of the messages, from 1 (lowest) to 5 (highest)
"""
logger.info("Publishing {} messages to queue: {}", len(messages), self.queue_name)
if len(messages) > 0:
for msg in messages:
serialised_msg = msg.serialise()
self._channel.basic_publish(
exchange="",
routing_key=self.queue_name,
body=serialised_msg,
properties=BasicProperties(
delivery_mode=DeliveryMode.Persistent,
priority=priority,
),
)
logger.bind(
project_name=msg.project_name,
mrn=msg.mrn,
accession_number=msg.accession_number,
study_uid=msg.study_uid,
).debug(
"Message {} published to queue {} with priority {}",
msg,
self.queue_name,
priority,
)
else:
if len(messages) == 0:
logger.warning("List of messages is empty so nothing will be published to queue.")
return

logger.info("Publishing {} messages to queue: {}", len(messages), self.queue_name)
for msg in messages:
attributes = {
"project_name": msg.project_name,
"mrn": msg.mrn,
"accession_number": msg.accession_number,
"study_uid": msg.study_uid,
}
with tracer.start_as_current_span("publish_message", attributes=attributes):
self._publish_message(msg, priority)

def _publish_message(self, message: Message, priority: int) -> None:
    """
    Serialise one message, publish it to the queue and log the publication.

    :param message: message to be sent to queue
    :param priority: priority of the message, from 1 (lowest) to 5 (highest)
    """
    payload = message.serialise()
    # Persistent delivery mode, with the caller-supplied priority.
    persistent_properties = BasicProperties(
        delivery_mode=DeliveryMode.Persistent,
        priority=priority,
    )
    self._channel.basic_publish(
        exchange="",
        routing_key=self.queue_name,
        body=payload,
        properties=persistent_properties,
    )

    # Bind the identifying fields so they are attached to this log record.
    message_logger = logger.bind(
        project_name=message.project_name,
        mrn=message.mrn,
        accession_number=message.accession_number,
        study_uid=message.study_uid,
    )
    message_logger.debug(
        "Message {} published to queue {} with priority {}",
        message,
        self.queue_name,
        priority,
    )

def clear_queue(self) -> None:
"""
Expand Down
44 changes: 44 additions & 0 deletions pixl_core/src/core/tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright (c) University College London Hospitals NHS Foundation Trust
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Configure OpenTelemetry tracing for services not wrapped by opentelemetry-instrument."""

from __future__ import annotations

import atexit
import os

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

__all__ = ["configure_tracing"]
Comment thread
p-j-smith marked this conversation as resolved.


def configure_tracing() -> None:
    """
    Set up an OTLP span exporter when OTEL_EXPORTER_OTLP_ENDPOINT is set.

    When the endpoint is not set, nothing is configured, so tracing and spans are no-ops.

    Calling this more than once is safe: if an SDK tracer provider is already installed,
    the function returns without building another exporter and span processor.
    """
    if not os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT"):
        return

    # The global tracer provider can only be set once. A repeat call (e.g. the CLI entry point
    # being invoked several times in one process) would otherwise build an exporter and a batch
    # processor for a provider that never gets installed.
    if isinstance(trace.get_tracer_provider(), TracerProvider):
        return

    # The exporter is created without arguments, so its settings come from the OTEL_*
    # environment variables.
    exporter = OTLPSpanExporter()
    processor = BatchSpanProcessor(exporter)
    provider = TracerProvider(resource=Resource.create())
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)
    # Shut the provider down at interpreter exit so buffered spans are flushed.
    atexit.register(provider.shutdown)
31 changes: 31 additions & 0 deletions pixl_core/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,20 @@

import pytest
import requests
from loguru import logger
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import (
InMemoryLogRecordExporter,
SimpleLogRecordProcessor,
)
from opentelemetry.sdk.resources import Resource
from pydicom.uid import generate_uid
from pytest_pixl.helpers import run_subprocess
from sqlalchemy import Engine, create_engine
from sqlalchemy.orm import Session, sessionmaker

from core.db.models import Base, Extract, Image
from core.logging import OTelSink
from core.patient_queue.message import Message

if TYPE_CHECKING:
Expand Down Expand Up @@ -226,3 +234,26 @@ def mock_message() -> Message:
"Dec 7 2023 2:08PM", "%b %d %Y %I:%M%p"
).replace(tzinfo=datetime.UTC),
)


@pytest.fixture
def log_exporter() -> InMemoryLogRecordExporter:
    """Provide a fresh in-memory exporter that holds the OTel log records the sink emits."""
    in_memory_exporter = InMemoryLogRecordExporter()
    return in_memory_exporter


@pytest.fixture
def otel_logger(
    monkeypatch: pytest.MonkeyPatch,
    log_exporter: InMemoryLogRecordExporter,
) -> Generator[None]:
    """
    Attach an OTelSink to loguru that exports to the in-memory exporter.

    The sink's provider is replaced with one wired to ``log_exporter``; the loguru
    handler is removed again once the test has finished.
    """
    record_processor = SimpleLogRecordProcessor(log_exporter)
    test_provider = LoggerProvider(resource=Resource.create({"service.name": "test"}))
    test_provider.add_log_record_processor(record_processor)

    def _use_test_provider(_sink: object) -> LoggerProvider:
        # Stand-in for OTelSink._build_provider: always hand back the test provider.
        return test_provider

    monkeypatch.setattr(OTelSink, "_build_provider", _use_test_provider)

    # catch=False stops loguru from swallowing exceptions raised inside the sink
    sink_handler_id = logger.add(OTelSink(), level="TRACE", catch=False)
    yield
    logger.remove(sink_handler_id)
Loading
Loading