From 4aad37066cb917d79f3be705e40d9e0325182882 Mon Sep 17 00:00:00 2001 From: Paul Smith Date: Wed, 17 Jun 2026 13:11:36 +0100 Subject: [PATCH 01/11] Add a configure_tracing function to set up a trace provider --- pixl_core/src/core/tracing.py | 44 +++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 pixl_core/src/core/tracing.py diff --git a/pixl_core/src/core/tracing.py b/pixl_core/src/core/tracing.py new file mode 100644 index 00000000..f091000f --- /dev/null +++ b/pixl_core/src/core/tracing.py @@ -0,0 +1,44 @@ +# Copyright (c) University College London Hospitals NHS Foundation Trust +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Configure OpenTelemetry tracing for services not wrapped by opentelemetry-instrument.""" + +from __future__ import annotations + +import atexit +import os + +from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor + +__all__ = ["configure_tracing"] + + +def configure_tracing() -> None: + """ + Set up an OTLP span exporter when OTEL_EXPORTER_OTLP_ENDPOINT is set. + + When the endpoint is not set, tracing and spans are no-ops. 
+ """ + if not os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT"): + return + + exporter = OTLPSpanExporter() + processor = BatchSpanProcessor(exporter) + provider = TracerProvider(resource=Resource.create()) + provider.add_span_processor(processor) + trace.set_tracer_provider(provider) + atexit.register(provider.shutdown) From 78ee16370555a04a31b8fa12d22dcd1833cf747a Mon Sep 17 00:00:00 2001 From: Paul Smith Date: Wed, 17 Jun 2026 15:31:18 +0100 Subject: [PATCH 02/11] Remove unused OTEL_EXPORTER_OTLP_HEADERS env var --- docker-compose.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/docker-compose.yml b/docker-compose.yml index ff07a7d6..b606fad7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -61,7 +61,6 @@ x-azure-keyvault: &azure-keyvault x-otel-common: &otel-common OTEL_EXPORTER_OTLP_ENDPOINT: ${OTEL_EXPORTER_OTLP_ENDPOINT:-} - OTEL_EXPORTER_OTLP_HEADERS: ${OTEL_EXPORTER_OTLP_HEADERS:-} OTEL_RESOURCE_ATTRIBUTES: "service.namespace=pixl" OTEL_EXPORTER_OTLP_PROTOCOL: grpc OTEL_LOGS_EXPORTER: none # we define our own loguru sink for exporting logs From d4237b7e12b2a0f832d9fcecaaa734ebf8b95b1f Mon Sep 17 00:00:00 2001 From: Paul Smith Date: Wed, 17 Jun 2026 15:32:57 +0100 Subject: [PATCH 03/11] Add a instrument_pika_producer to producer.py to instrument and enrich the producer --- pixl_core/src/core/patient_queue/producer.py | 93 ++++++++++++++------ 1 file changed, 68 insertions(+), 25 deletions(-) diff --git a/pixl_core/src/core/patient_queue/producer.py b/pixl_core/src/core/patient_queue/producer.py index 0218c0d2..b9fc3a18 100644 --- a/pixl_core/src/core/patient_queue/producer.py +++ b/pixl_core/src/core/patient_queue/producer.py @@ -17,16 +17,49 @@ from typing import TYPE_CHECKING +from opentelemetry.instrumentation.pika import PikaInstrumentor from pika import BasicProperties, DeliveryMode +from core.patient_queue.message import deserialise + from ._base import PixlBlockingInterface if TYPE_CHECKING: + from opentelemetry.trace import Span + from 
core.patient_queue.message import Message from loguru import logger +def _enrich_pika_span( + span: Span, + body: bytes, + properties: BasicProperties, # noqa: ARG001 +) -> None: + """Enrich the span PikaInstrumentor creates for a published message with its identifiers.""" + msg = deserialise(body) + span.set_attributes( + { + "project_name": msg.project_name, + "mrn": msg.mrn, + "accession_number": msg.accession_number, + "study_uid": msg.study_uid, + } + ) + + +def instrument_pika_producer() -> None: + """ + Instrument pika so each published message gets its own producer span, enriched with the + message's identifiers. + + Call once at application start-up. + If tracing is not configured, the spans are no-ops. + """ + PikaInstrumentor().instrument(publish_hook=_enrich_pika_span) + + class PixlProducer(PixlBlockingInterface): """Generic publisher for RabbitMQ""" @@ -36,32 +69,42 @@ def publish(self, messages: list[Message], priority: int) -> None: :param messages: list of messages to be sent to queue :param priority: priority of the messages, from 1 (lowest) to 5 (highest) """ - logger.info("Publishing {} messages to queue: {}", len(messages), self.queue_name) - if len(messages) > 0: - for msg in messages: - serialised_msg = msg.serialise() - self._channel.basic_publish( - exchange="", - routing_key=self.queue_name, - body=serialised_msg, - properties=BasicProperties( - delivery_mode=DeliveryMode.Persistent, - priority=priority, - ), - ) - logger.bind( - project_name=msg.project_name, - mrn=msg.mrn, - accession_number=msg.accession_number, - study_uid=msg.study_uid, - ).debug( - "Message {} published to queue {} with priority {}", - msg, - self.queue_name, - priority, - ) - else: + if len(messages) == 0: logger.warning("List of messages is empty so nothing will be published to queue.") + return + + logger.info("Publishing {} messages to queue: {}", len(messages), self.queue_name) + for msg in messages: + self._publish_message(msg, priority) + + def 
_publish_message(self, message: Message, priority: int) -> None: + """ + Publish a single serialised message to a queue. + :param message: message to be sent to queue + :param priority: priority of the message, from 1 (lowest) to 5 (highest) + """ + serialised_msg = message.serialise() + self._channel.basic_publish( + exchange="", + routing_key=self.queue_name, + body=serialised_msg, + properties=BasicProperties( + delivery_mode=DeliveryMode.Persistent, + priority=priority, + ), + ) + + logger.bind( + project_name=message.project_name, + mrn=message.mrn, + accession_number=message.accession_number, + study_uid=message.study_uid, + ).debug( + "Message {} published to queue {} with priority {}", + message, + self.queue_name, + priority, + ) def clear_queue(self) -> None: """ From d0c01dd0584562e7d36923c671372713137ae299 Mon Sep 17 00:00:00 2001 From: Paul Smith Date: Wed, 17 Jun 2026 15:33:39 +0100 Subject: [PATCH 04/11] Instrument the cli Add a function to set the relevant otel env vars before calling configure_logging and configure_tracing --- cli/src/pixl_cli/main.py | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/cli/src/pixl_cli/main.py b/cli/src/pixl_cli/main.py index 259a1b12..c2ecef27 100644 --- a/cli/src/pixl_cli/main.py +++ b/cli/src/pixl_cli/main.py @@ -24,7 +24,8 @@ import requests from core.exports import ParquetExport from core.logging import configure_logging -from core.patient_queue.producer import PixlProducer +from core.patient_queue.producer import PixlProducer, instrument_pika_producer +from core.tracing import configure_tracing from decouple import RepositoryEnv, UndefinedValueError from loguru import logger @@ -51,12 +52,33 @@ os.environ["NO_PROXY"] = os.environ["no_proxy"] = "localhost" +def _configure_telemetry_env_vars() -> None: + """ + Set the OTel environment variables needed by the CLI. 
+ + OTel is configured via environment variables, but the CLI gets its config from a .env in + the current working directory. + + Load the config and set the relevant environment variables. + """ + endpoint = config("OTEL_EXPORTER_OTLP_ENDPOINT", default="") + if not endpoint: + return + + os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = endpoint + os.environ["OTEL_SERVICE_NAME"] = "pixl-cli" + os.environ["OTEL_RESOURCE_ATTRIBUTES"] = "service.namespace=pixl" + + @click.group() @click.option("--debug/--no-debug", default=False) def cli(*, debug: bool) -> None: """PIXL command line interface""" logging_level = "DEBUG" if debug else "INFO" + _configure_telemetry_env_vars() configure_logging(level=logging_level) + configure_tracing() + instrument_pika_producer() cli.add_command(dc) From 09075ae93d871e90ad785f24639f0292076115cb Mon Sep 17 00:00:00 2001 From: Paul Smith Date: Wed, 17 Jun 2026 15:34:34 +0100 Subject: [PATCH 05/11] Instrument orthanc raw --- orthanc/orthanc-raw/plugin/pixl.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/orthanc/orthanc-raw/plugin/pixl.py b/orthanc/orthanc-raw/plugin/pixl.py index cbc716fd..101b1b6a 100644 --- a/orthanc/orthanc-raw/plugin/pixl.py +++ b/orthanc/orthanc-raw/plugin/pixl.py @@ -24,8 +24,10 @@ from typing import TYPE_CHECKING from core.logging import configure_logging +from core.tracing import configure_tracing from decouple import config from loguru import logger +from opentelemetry import trace from pixl_dcmd.tagrecording import record_dicom_headers import orthanc @@ -36,6 +38,11 @@ # Set up logging as main entry point logging_level = config("LOG_LEVEL", default="INFO") configure_logging(level=logging_level) + +# Set up tracing to correlate traces and logs +configure_tracing() +tracer = trace.get_tracer("pixl.orthanc_raw") + logger.warning("Running logging at level {}", logging_level) @@ -48,7 +55,8 @@ def OnHeartBeat(output, uri, **request): # noqa: ARG001 def ReceivedInstanceCallback(receivedDicom: 
bytes, origin: str) -> Any: # noqa: ARG001 """Optionally record headers from the received DICOM instance.""" if should_record_headers(): - record_dicom_headers(receivedDicom) + with tracer.start_as_current_span(name="record_dicom_headers"): + record_dicom_headers(receivedDicom) return orthanc.ReceivedInstanceAction.KEEP_AS_IS, None From 4048d7cf8fa14c29616d6f1a2e0d66e3bb10f905 Mon Sep 17 00:00:00 2001 From: Paul Smith Date: Wed, 17 Jun 2026 15:34:59 +0100 Subject: [PATCH 06/11] Instrument orthanc anon --- orthanc/orthanc-anon/plugin/pixl.py | 47 +++++++++++++++++++++++++---- 1 file changed, 41 insertions(+), 6 deletions(-) diff --git a/orthanc/orthanc-anon/plugin/pixl.py b/orthanc/orthanc-anon/plugin/pixl.py index eca53427..f8a83590 100644 --- a/orthanc/orthanc-anon/plugin/pixl.py +++ b/orthanc/orthanc-anon/plugin/pixl.py @@ -37,8 +37,14 @@ from core.exceptions import PixlDiscardError, PixlSkipInstanceError from core.logging import configure_logging from core.project_config.pixl_config_model import load_project_config +from core.tracing import configure_tracing from decouple import config from loguru import logger +from opentelemetry import trace +from opentelemetry.instrumentation.requests import RequestsInstrumentor +from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor +from opentelemetry.propagate import extract +from pixl_dcmd._database import engine as pixl_db_engine from pixl_dcmd.dicom_helpers import get_study_info from pixl_dcmd.main import ( anonymise_dicom_and_update_db, @@ -54,6 +60,7 @@ from typing import Any from core.project_config.pixl_config_model import PixlConfig + from opentelemetry.context import Context from pixl_dcmd.dicom_helpers import StudyInfo ORTHANC_USERNAME = config("ORTHANC_USERNAME") @@ -69,6 +76,15 @@ # Set up logging as main entry point logging_level = config("LOG_LEVEL", default="INFO") configure_logging(level=logging_level) + +# Set up tracing to correlate logs and traces. 
+# pixl_dcmd creates its SQLAlchemy engine at import time, so the engine must be +# passed explicitly to the instrumentor +configure_tracing() +SQLAlchemyInstrumentor().instrument(engine=pixl_db_engine) +RequestsInstrumentor().instrument() +tracer = trace.get_tracer("pixl.orthanc_anon") + logger.warning("Running logging at level {}", logging_level) # Set up a thread pool executor for non-blocking calls to Orthanc @@ -220,8 +236,18 @@ def ImportStudiesFromRaw(output, uri, **request): # noqa: ARG001 series_to_keep = payload["SeriesInstanceUIDs"] project_name = payload["ProjectName"] + # Extract the trace context injected into the request headers by the caller, and pass it to + # the thread pool job so the import continues the same trace + headers = {key.lower(): value for key, value in request.get("headers", {}).items()} + parent_context = extract(headers) + executor.submit( - _import_studies_from_raw, study_resource_ids, study_uids, project_name, series_to_keep + _import_studies_from_raw, + study_resource_ids, + study_uids, + project_name, + series_to_keep, + parent_context, ) response = json.dumps({"Message": "Ok"}) @@ -233,6 +259,7 @@ def _import_studies_from_raw( study_uids: list[str], project_name: str, series_to_keep: list[str], + parent_context: Context | None = None, ) -> None: """ Import studies from Orthanc Raw. @@ -240,6 +267,7 @@ def _import_studies_from_raw( Args: study_resource_ids: Resource IDs of the study in Orthanc Raw project_name: Name of the project + parent_context: Trace context extracted from the incoming request, to continue the trace - Pull studies from Orthanc Raw based on its resource ID - Iterate over instances and anonymise them @@ -247,7 +275,11 @@ def _import_studies_from_raw( - Notify the PIXL export-api to send the studies to the relevant endpoint for the project """ - with logger.contextualize(project_name=project_name): + # Continue the trace from the incoming request and bind the project to every log within it. 
+ with ( + tracer.start_as_current_span(name="import_studies_from_raw", context=parent_context), + logger.contextualize(project_name=project_name), + ): anonymised_study_uids = [] for study_resource_id, study_uid in zip(study_resource_ids, study_uids, strict=False): @@ -294,10 +326,13 @@ def _anonymise_study_and_upload( zipped_study_bytes = get_study_zip_archive_from_raw(resource_id=study_resource_id) study_info = _get_study_info_from_first_file(zipped_study_bytes) - with logger.contextualize( - mrn=study_info.mrn, - accession_number=study_info.accession_number, - study_uid=study_info.study_uid, + with ( + tracer.start_as_current_span(name="anonymise_study"), + logger.contextualize( + mrn=study_info.mrn, + accession_number=study_info.accession_number, + study_uid=study_info.study_uid, + ), ): logger.info("Processing project '{}', {}", project_name, study_info) From 9ee48d9819f5180e48ff521c60f48bb102b1b958 Mon Sep 17 00:00:00 2001 From: Paul Smith Date: Wed, 17 Jun 2026 15:35:53 +0100 Subject: [PATCH 07/11] Use logging fixtures from the conftest --- pixl_core/tests/conftest.py | 31 +++++++++++++++++++++ pixl_core/tests/test_logging.py | 48 +++++++-------------------------- 2 files changed, 41 insertions(+), 38 deletions(-) diff --git a/pixl_core/tests/conftest.py b/pixl_core/tests/conftest.py index b5675297..ca1e5df0 100644 --- a/pixl_core/tests/conftest.py +++ b/pixl_core/tests/conftest.py @@ -22,12 +22,20 @@ import pytest import requests +from loguru import logger +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk._logs.export import ( + InMemoryLogRecordExporter, + SimpleLogRecordProcessor, +) +from opentelemetry.sdk.resources import Resource from pydicom.uid import generate_uid from pytest_pixl.helpers import run_subprocess from sqlalchemy import Engine, create_engine from sqlalchemy.orm import Session, sessionmaker from core.db.models import Base, Extract, Image +from core.logging import OTelSink from core.patient_queue.message import 
Message if TYPE_CHECKING: @@ -226,3 +234,26 @@ def mock_message() -> Message: "Dec 7 2023 2:08PM", "%b %d %Y %I:%M%p" ).replace(tzinfo=datetime.UTC), ) + + +@pytest.fixture +def log_exporter() -> InMemoryLogRecordExporter: + """In-memory exporter capturing the OTel log records the sink emits.""" + return InMemoryLogRecordExporter() + + +@pytest.fixture +def otel_logger( + monkeypatch: pytest.MonkeyPatch, + log_exporter: InMemoryLogRecordExporter, +) -> Generator[None]: + """Configure an OTelSink using the in-memory exporter.""" + processor = SimpleLogRecordProcessor(log_exporter) + provider = LoggerProvider(resource=Resource.create({"service.name": "test"})) + provider.add_log_record_processor(processor) + monkeypatch.setattr(OTelSink, "_build_provider", lambda _: provider) + + # Set catch=False so loguru doesn't swallow exceptions raised in the sink + handler_id = logger.add(OTelSink(), level="TRACE", catch=False) + yield + logger.remove(handler_id) diff --git a/pixl_core/tests/test_logging.py b/pixl_core/tests/test_logging.py index 79a5774d..5a0293e3 100644 --- a/pixl_core/tests/test_logging.py +++ b/pixl_core/tests/test_logging.py @@ -20,39 +20,11 @@ import pytest from loguru import logger from opentelemetry._logs import SeverityNumber -from opentelemetry.sdk._logs import LoggerProvider -from opentelemetry.sdk._logs.export import ( - InMemoryLogRecordExporter, - SimpleLogRecordProcessor, -) -from opentelemetry.sdk.resources import Resource -from core.logging import OTelSink, configure_logging +from core.logging import configure_logging if TYPE_CHECKING: - from collections.abc import Generator - - -@pytest.fixture -def exporter() -> InMemoryLogRecordExporter: - """In-memory exporter capturing the OTel log records the sink emits.""" - return InMemoryLogRecordExporter() - - -@pytest.fixture -def otel_logger( - exporter: InMemoryLogRecordExporter, - monkeypatch: pytest.MonkeyPatch, -) -> Generator[None]: - """Configure an OTelSink using the in-memory exporter.""" - 
provider = LoggerProvider(resource=Resource.create({"service.name": "test"})) - provider.add_log_record_processor(SimpleLogRecordProcessor(exporter)) - monkeypatch.setattr(OTelSink, "_build_provider", lambda _: provider) - - # Set catch=False so loguru doesn't swallow exceptions raised in the sink - handler_id = logger.add(OTelSink(), level="TRACE", catch=False) - yield - logger.remove(handler_id) + from opentelemetry.sdk._logs.export import InMemoryLogRecordExporter def test_configure_logging_creates_otel_sink() -> None: @@ -62,23 +34,23 @@ def test_configure_logging_creates_otel_sink() -> None: @pytest.mark.usefixtures("otel_logger") -def test_otel_sink_logs_messages(exporter: InMemoryLogRecordExporter) -> None: +def test_otel_sink_logs_messages(log_exporter: InMemoryLogRecordExporter) -> None: """Test that loguru records are sent to the OTel exporter.""" logger.info("A test message") - record = exporter.get_finished_logs()[0].log_record + record = log_exporter.get_finished_logs()[0].log_record assert record.body == "A test message" @pytest.mark.usefixtures("otel_logger") -def test_bound_fields_become_attributes(exporter: InMemoryLogRecordExporter) -> None: +def test_bound_fields_become_attributes(log_exporter: InMemoryLogRecordExporter) -> None: """Test that bound fields are exported as top-level OTel log attributes.""" logger.bind( project_name="test-project", study_uid="1.2.3", ).info("Processing study.") - record = exporter.get_finished_logs()[0].log_record + record = log_exporter.get_finished_logs()[0].log_record attributes = dict(record.attributes) assert record.body == "Processing study." 
@@ -90,13 +62,13 @@ def test_bound_fields_become_attributes(exporter: InMemoryLogRecordExporter) -> @pytest.mark.usefixtures("otel_logger") -def test_severity_mapping(exporter: InMemoryLogRecordExporter) -> None: +def test_severity_mapping(log_exporter: InMemoryLogRecordExporter) -> None: """Test loguru levels map correctly to the configured OTel severity name and number.""" logger.trace("Trace message.") logger.info("This is informative.") logger.success("Well done!") - records = [data.log_record for data in exporter.get_finished_logs()] + records = [data.log_record for data in log_exporter.get_finished_logs()] assert [(r.severity_text, r.severity_number) for r in records] == [ ("TRACE", SeverityNumber.TRACE), ("INFO", SeverityNumber.INFO), @@ -105,7 +77,7 @@ def test_severity_mapping(exporter: InMemoryLogRecordExporter) -> None: @pytest.mark.usefixtures("otel_logger") -def test_exception_is_captured(exporter: InMemoryLogRecordExporter) -> None: +def test_exception_is_captured(log_exporter: InMemoryLogRecordExporter) -> None: """ Test that exception type, message and attribute are recorded when calling logger.exception. 
@@ -120,7 +92,7 @@ def _bad_function() -> None: except ValueError: logger.exception("failed") - record = exporter.get_finished_logs()[0].log_record + record = log_exporter.get_finished_logs()[0].log_record attributes = dict(record.attributes) assert record.severity_text == "ERROR" From 41fd1be87b252c757b69bdc7f8a3e8f88ecd1595 Mon Sep 17 00:00:00 2001 From: Paul Smith Date: Wed, 17 Jun 2026 15:38:30 +0100 Subject: [PATCH 08/11] Add tests for configure_tracing --- pixl_core/tests/test_tracing.py | 85 +++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 pixl_core/tests/test_tracing.py diff --git a/pixl_core/tests/test_tracing.py b/pixl_core/tests/test_tracing.py new file mode 100644 index 00000000..ceb4b0fd --- /dev/null +++ b/pixl_core/tests/test_tracing.py @@ -0,0 +1,85 @@ +# Copyright (c) University College London Hospitals NHS Foundation Trust +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Tests for OpenTelemetry tracing setup and log/trace correlation.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest +from loguru import logger +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + +from core.tracing import configure_tracing + +if TYPE_CHECKING: + from opentelemetry.sdk._logs.export import InMemoryLogRecordExporter + from opentelemetry.trace import Tracer + + +@pytest.fixture +def otel_tracer() -> Tracer: + """A tracer backed by the in-memory span exporter (local, not the global provider).""" + exporter = InMemorySpanExporter() + processor = SimpleSpanProcessor(exporter) + provider = TracerProvider() + provider.add_span_processor(processor) + return provider.get_tracer("test") + + +@pytest.mark.parametrize( + ("endpoint", "is_tracer"), + [ + ("", False), + ("http://localhost:4317", True), + ], +) +def test_configure_tracing_endpoint( + monkeypatch: pytest.MonkeyPatch, + endpoint: str, + is_tracer: type | None, +) -> None: + """Test tracing is configured only if OTEL_EXPORTER_OTLP_ENDPOINT is set.""" + monkeypatch.setenv( + name="OTEL_EXPORTER_OTLP_ENDPOINT", + value=endpoint, + ) + assert (configure_tracing() is not None) == is_tracer + + +@pytest.mark.usefixtures("otel_logger") +def test_log_outside_span_has_no_trace_context(log_exporter: InMemoryLogRecordExporter) -> None: + """Test logs emitted with no active span have no trace context.""" + logger.info("no span here") + + record = log_exporter.get_finished_logs()[0].log_record + assert not record.trace_id + assert not record.span_id + + +@pytest.mark.usefixtures("otel_logger") +def test_log_is_correlated_with_active_span( + otel_tracer: Tracer, + log_exporter: InMemoryLogRecordExporter, +) -> None: + """Test logs emitted within a span carry that span's trace_id and span_id.""" + with 
otel_tracer.start_as_current_span("test_span") as span: + logger.info("inside the span") + span_context = span.get_span_context() + + record = log_exporter.get_finished_logs()[0].log_record + assert record.trace_id == span_context.trace_id + assert record.span_id == span_context.span_id From 23f426c3fd51b9dc1b1e75466453e5ae32d54f77 Mon Sep 17 00:00:00 2001 From: Paul Smith Date: Wed, 17 Jun 2026 17:01:43 +0100 Subject: [PATCH 09/11] Manually create a span before publishing the message This way the span belongs to the CLI rather than to the queue --- cli/src/pixl_cli/main.py | 5 ++- pixl_core/src/core/patient_queue/producer.py | 46 +++++--------------- 2 files changed, 14 insertions(+), 37 deletions(-) diff --git a/cli/src/pixl_cli/main.py b/cli/src/pixl_cli/main.py index c2ecef27..ec758c2d 100644 --- a/cli/src/pixl_cli/main.py +++ b/cli/src/pixl_cli/main.py @@ -24,10 +24,11 @@ import requests from core.exports import ParquetExport from core.logging import configure_logging -from core.patient_queue.producer import PixlProducer, instrument_pika_producer +from core.patient_queue.producer import PixlProducer from core.tracing import configure_tracing from decouple import RepositoryEnv, UndefinedValueError from loguru import logger +from opentelemetry.instrumentation.pika import PikaInstrumentor from pixl_cli._config import ( HOST_EXPORT_ROOT_DIR, @@ -78,7 +79,7 @@ def cli(*, debug: bool) -> None: _configure_telemetry_env_vars() configure_logging(level=logging_level) configure_tracing() - instrument_pika_producer() + PikaInstrumentor().instrument() cli.add_command(dc) diff --git a/pixl_core/src/core/patient_queue/producer.py b/pixl_core/src/core/patient_queue/producer.py index b9fc3a18..78bf21c4 100644 --- a/pixl_core/src/core/patient_queue/producer.py +++ b/pixl_core/src/core/patient_queue/producer.py @@ -17,47 +17,16 @@ from typing import TYPE_CHECKING -from opentelemetry.instrumentation.pika import PikaInstrumentor +from loguru import logger +from opentelemetry 
import trace from pika import BasicProperties, DeliveryMode -from core.patient_queue.message import deserialise - from ._base import PixlBlockingInterface if TYPE_CHECKING: - from opentelemetry.trace import Span - from core.patient_queue.message import Message -from loguru import logger - - -def _enrich_pika_span( - span: Span, - body: bytes, - properties: BasicProperties, # noqa: ARG001 -) -> None: - """Enrich the span PikaInstrumentor creates for a published message with its identifiers.""" - msg = deserialise(body) - span.set_attributes( - { - "project_name": msg.project_name, - "mrn": msg.mrn, - "accession_number": msg.accession_number, - "study_uid": msg.study_uid, - } - ) - - -def instrument_pika_producer() -> None: - """ - Instrument pika so each published message gets its own producer span, enriched with the - message's identifiers. - - Call once at application start-up. - If tracing is not configured, the spans are no-ops. - """ - PikaInstrumentor().instrument(publish_hook=_enrich_pika_span) +tracer = trace.get_tracer("pixl_core.patient_queue.producer") class PixlProducer(PixlBlockingInterface): @@ -75,7 +44,14 @@ def publish(self, messages: list[Message], priority: int) -> None: logger.info("Publishing {} messages to queue: {}", len(messages), self.queue_name) for msg in messages: - self._publish_message(msg, priority) + attributes = { + "project_name": msg.project_name, + "mrn": msg.mrn, + "accession_number": msg.accession_number, + "study_uid": msg.study_uid, + } + with tracer.start_as_current_span("publish_message", attributes=attributes): + self._publish_message(msg, priority) def _publish_message(self, message: Message, priority: int) -> None: """ From bea0c56102d916f2a7f30cf994cc289d879ffc95 Mon Sep 17 00:00:00 2001 From: Paul Smith Date: Thu, 18 Jun 2026 14:27:02 +0100 Subject: [PATCH 10/11] Remove test of configure_tracing as we're no longer returning a bool --- pixl_core/tests/test_tracing.py | 22 ---------------------- 1 file changed, 22 
deletions(-) diff --git a/pixl_core/tests/test_tracing.py b/pixl_core/tests/test_tracing.py index ceb4b0fd..9e5b1013 100644 --- a/pixl_core/tests/test_tracing.py +++ b/pixl_core/tests/test_tracing.py @@ -23,8 +23,6 @@ from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter -from core.tracing import configure_tracing - if TYPE_CHECKING: from opentelemetry.sdk._logs.export import InMemoryLogRecordExporter from opentelemetry.trace import Tracer @@ -40,26 +38,6 @@ def otel_tracer() -> Tracer: return provider.get_tracer("test") -@pytest.mark.parametrize( - ("endpoint", "is_tracer"), - [ - ("", False), - ("http://localhost:4317", True), - ], -) -def test_configure_tracing_endpoint( - monkeypatch: pytest.MonkeyPatch, - endpoint: str, - is_tracer: type | None, -) -> None: - """Test tracing is configured only if OTEL_EXPORTER_OTLP_ENDPOINT is set.""" - monkeypatch.setenv( - name="OTEL_EXPORTER_OTLP_ENDPOINT", - value=endpoint, - ) - assert (configure_tracing() is not None) == is_tracer - - @pytest.mark.usefixtures("otel_logger") def test_log_outside_span_has_no_trace_context(log_exporter: InMemoryLogRecordExporter) -> None: """Test logs emitted with no active span have no trace context.""" From fe104ace08253f0094f1015392fc13207227276b Mon Sep 17 00:00:00 2001 From: Paul Smith Date: Mon, 29 Jun 2026 13:55:57 +0100 Subject: [PATCH 11/11] Add OTEL_SDK_DISABLED environment variable to disable sending telemetry --- .env.sample | 5 +++-- docker-compose.yml | 1 + test/.env | 1 + 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/.env.sample b/.env.sample index f76700a4..20ec9860 100644 --- a/.env.sample +++ b/.env.sample @@ -118,6 +118,7 @@ HASHER_API_AZ_KEY_VAULT_NAME= # ENV is used by Docker compose to separate runtime environments {dev|test|prod}. ENV= +# set to `false` to enable exporting telemetry +OTEL_SDK_DISABLED=true # gRPC endpoint of the OTel collector, e.g. 
http://localhost:4317 -# leave empty to disable sending telemetry -OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 +OTEL_EXPORTER_OTLP_ENDPOINT= diff --git a/docker-compose.yml b/docker-compose.yml index b1457cbc..f3f6b440 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -60,6 +60,7 @@ x-azure-keyvault: &azure-keyvault AZURE_KEY_VAULT_NAME: ${EXPORT_AZ_KEY_VAULT_NAME} x-otel-common: &otel-common + OTEL_SDK_DISABLED: ${OTEL_SDK_DISABLED:-true} OTEL_EXPORTER_OTLP_ENDPOINT: ${OTEL_EXPORTER_OTLP_ENDPOINT:-} OTEL_EXPORTER_OTLP_HEADERS: ${OTEL_EXPORTER_OTLP_HEADERS:-} OTEL_RESOURCE_ATTRIBUTES: "service.namespace=pixl" diff --git a/test/.env b/test/.env index 47754bfb..0210b332 100644 --- a/test/.env +++ b/test/.env @@ -6,6 +6,7 @@ PIXL_QUERY_TIMEOUT=20 CLI_RETRY_SECONDS=90 PIXL_MAX_MESSAGES_IN_FLIGHT=5 TZ=Europe/London +OTEL_SDK_DISABLED=false OTEL_EXPORTER_OTLP_ENDPOINT=http://lgtm:4317 # PIXL PostgreSQL instance