diff --git a/build.sbt b/build.sbt index a5ff3ab481..dc9d64507a 100644 --- a/build.sbt +++ b/build.sbt @@ -1,3 +1,4 @@ +import Versions.zioHttp import com.softwaremill.Publish.{ossPublishSettings, updateDocs} import com.softwaremill.SbtSoftwareMillBrowserTestJS._ import com.softwaremill.SbtSoftwareMillCommon.commonSmlBuildSettings @@ -192,6 +193,7 @@ lazy val rawAllAggregates = core.projectRefs ++ opentelemetryTracing.projectRefs ++ otel4sMetrics.projectRefs ++ otel4sTracing.projectRefs ++ + zioOpenTelemetry.projectRefs ++ json4s.projectRefs ++ playJson.projectRefs ++ play29Json.projectRefs ++ @@ -1179,6 +1181,24 @@ lazy val otel4sMetrics: ProjectMatrix = (projectMatrix in file("metrics/otel4s-m .jvmPlatform(scalaVersions = scala2_13And3Versions, settings = commonJvmSettings) .dependsOn(serverCore % CompileAndTest, catsEffect % Test) +lazy val zioOpenTelemetry: ProjectMatrix = (projectMatrix in file("observability/zio-opentelemetry")) + .dependsOn(zio, zioHttpServer, opentelemetryMetrics) + .settings(commonSettings) + .settings( + name := "tapir-zio-opentelemetry", + libraryDependencies ++= Seq( + "dev.zio" %% "zio-opentelemetry" % Versions.zioOpenTelemetry, + "dev.zio" %% "zio-test" % Versions.zio % Test, + "dev.zio" %% "zio-test-sbt" % Versions.zio % Test, + "io.opentelemetry" % "opentelemetry-sdk" % Versions.openTelemetry, + "io.opentelemetry" % "opentelemetry-exporter-otlp" % Versions.openTelemetry, + "io.opentelemetry" % "opentelemetry-exporter-logging-otlp" % Versions.openTelemetry, + "io.opentelemetry" % "opentelemetry-sdk-testing" % Versions.openTelemetry % Test + ) + ) + .jvmPlatform(scalaVersions = scala2And3Versions, settings = commonJvmSettings) + .dependsOn(serverCore % CompileAndTest) + // docs lazy val apispecDocs: ProjectMatrix = (projectMatrix in file("docs/apispec-docs")) @@ -2321,6 +2341,7 @@ lazy val examples: ProjectMatrix = (projectMatrix in file("examples")) "io.opentelemetry" % "opentelemetry-sdk-extension-autoconfigure" % Versions.openTelemetry, "com.github.plokhotnyuk.jsoniter-scala" %%% "jsoniter-scala-macros" % Versions.jsoniter, "org.typelevel" %% "otel4s-oteljava" % Versions.otel4s, + "dev.zio" %% "zio-logging-slf4j2" % Versions.zioLogging, scalaTest.value, logback ), @@ -2361,7 +2382,8 @@ lazy val examples: ProjectMatrix = (projectMatrix in file("examples")) vertxServer, zioHttpServer, zioJson, - zioMetrics + zioMetrics, + zioOpenTelemetry ) //TODO this should be invoked by compilation process, see #https://github.com/scalameta/mdoc/issues/355 diff --git a/doc/server/observability.md b/doc/server/observability.md index 42037d9c98..ea42b21548 100644 --- a/doc/server/observability.md +++ b/doc/server/observability.md @@ -522,4 +522,29 @@ might still serve the request. If a default response (e.g. a `404 Not Found`) should be produced, this should be enabled using the [reject interceptor](errors.md). Such a setup assumes that there are no other routes in the server, after the Tapir -server interpreter is invoked. \ No newline at end of file +server interpreter is invoked. + +## ZIO OpenTelemetry + +ZIO OpenTelemetry integration is provided by the `otel4z` module, which uses the otel4s library under the hood. It provides both logging, tracing and metrics capabilities, as well as a runtime telemetry service for ZIO applications. + + +Add the following dependency: + +```scala +"com.softwaremill.sttp.tapir" %% "tapir-otel4z" % "@VERSION@" +``` + +The `otel4z` module provides integration with the [ZIO OpenTelemetry](https://zio.dev/zio-opentelemetry/) library, which is built on top of the [OpenTelemetry](https://opentelemetry.io/) allowing you to create traces and metrics for your tapir endpoints using a purely functional API. + +This module provides the following layers helpers: +- `otel4zLogging` - a layer that provides the OpenTelemetry logging interceptor, which logs incoming requests and other operations. +- `otel4zMetrics` - a layer that provides the OpenTelemetry metrics interceptor, which records metrics for incoming requests and other operations. +- `otel4zTracing` - a layer that provides the OpenTelemetry tracing interceptor, which creates spans for incoming requests and other operations. + +All of these layers require an OpenTelemetry instance to be provided, but this layer to works with Zio runtime metrics must be provided during the application startup (aka bootstrap). + +The ZIOpenTelemetry trait provide this bootstrap layer, which is used to create the OpenTelemetry instance and provide it to the application. + +Full example of using the `otel4z` module can be found in the [ZIO OpenTelemetry example](https://tapir.softwaremill.com/en/latest/observability/ZIOpenTelemetryExample.scala) + diff --git a/examples/src/main/scala/sttp/tapir/examples/observability/ZIOpenTelemetryDefaultExample.skip b/examples/src/main/scala/sttp/tapir/examples/observability/ZIOpenTelemetryDefaultExample.skip new file mode 100644 index 0000000000..7d4e76745d --- /dev/null +++ b/examples/src/main/scala/sttp/tapir/examples/observability/ZIOpenTelemetryDefaultExample.skip @@ -0,0 +1,115 @@ +// {cat=Hello, World!; effects=ZIO; server=ZIO HTTP; json=zio; docs=Swagger UI}: ZIO OpenTelemetry tracing example + +//> using option -Xkind-projector +//> using dep com.softwaremill.sttp.tapir::tapir-core::1.13.19 +//> using dep com.softwaremill.sttp.tapir::tapir-json-circe:1.13.19 +//> using dep com.softwaremill.sttp.tapir::tapir-zio-http-server:1.13.19 +//> using dep com.softwaremill.sttp.tapir::tapir-swagger-ui-bundle:1.13.19 +//> using dep com.softwaremill.sttp.tapir::tapir-zio:1.13.19 +//> using dep com.softwaremill.sttp.tapir::tapir-zio-opentelemetry:1.13.19 +//> using dep io.opentelemetry.semconv:opentelemetry-semconv:1.41.0 +//> using dep io.opentelemetry:opentelemetry-sdk:1.62.0 +//> using dep io.opentelemetry:opentelemetry-exporter-otlp:1.62.0 +//> using dep io.opentelemetry:opentelemetry-exporter-logging-otlp:1.62.0 +//> using dep dev.zio::zio-logging:2.5.3 +//> using dep dev.zio::zio-logging-slf4j2:2.5.3 +//> using dep dev.zio::zio-opentelemetry-zio-logging:3.1.17 +//> using dep ch.qos.logback:logback-classic:1.5.32 + +package sttp.tapir.examples.observability + +import io.opentelemetry.api +import io.opentelemetry.api.common.Attributes + +import sttp.tapir.server.interceptor.cors.CORSInterceptor +import sttp.tapir.server.ziopentelemetry.* +import sttp.tapir.server.ziohttp.* +import sttp.tapir.ztapir.* + +import zio.* +import zio.http.* +import zio.logging.backend.SLF4J +import zio.telemetry.opentelemetry.metrics.Meter +import zio.telemetry.opentelemetry.tracing.Tracing +import sttp.tapir.server.ServerEndpoint + +/** This example demonstrates how to use ZIO with Tapir and OpenTelemetry for tracing. It sets up a simple HTTP server with a single + * endpoint that returns "Hello, World!" and includes tracing for incoming requests. + * + * To enable tracing, we use the ZIOpenTelemetry trait, which provides a Tracing service. + * + * To effectively produce traces, you need to set the OTEL_EXPORTER_OTLP_ENDPOINT environment variable to the address of your + * OpenTelemetry. + */ +object ZIOpenTelemetryDefaultExample extends ZIOtelAppDefault("zio-observability-default", Some("1.0.0"), Some("dev")) { + override def consoleLogLayer: ZLayer[Any, Nothing, Unit] = Runtime.removeDefaultLoggers >>> SLF4J.slf4j + override def extraAttributes: Attributes = Attributes.builder().put("stack", "zio").build() + + val program = for + _ <- Console.printLine("Starting server on http://localhost:8080") + + given api.OpenTelemetry <- ZIO.service[api.OpenTelemetry] + + given Tracing <- ZIO.service[Tracing] + + _ <- ZIO.service[Meter] + + endpoints = ZIOHttpApiDefault.endpoints + + httpApp = ZioHttpInterpreter(serverOptions).toHttp( + endpoints + ) + _ <- Server + .serve(httpApp) + .provide( + Server.default + ) + yield () + + override def run = + program + + /** The server options for the ZIOpenTelemetry trait. + * + * This is the server options that will be used to run the ZIO application, hence provided by bootstrap. It includes the OpenTelemetry + * instance and the ContextStorage. + */ + private def serverOptions(implicit + otel: api.OpenTelemetry, + tracing: Tracing + ): ZioHttpServerOptions[Any] = ZioHttpServerOptions.customiseInterceptors + .prependInterceptor( + ZIOpenTelemetryTracing(tracing) + ) + .appendInterceptor( + CORSInterceptor.default + ) + .appendInterceptor(otel4zMetricsInterceptor()) + .serverLog( + ZioHttpServerOptions.defaultServerLog[Any] + ) + .options + + class ZIOHttpApiDefault(using tracing: Tracing) { + + import tracing.aspects.* + + def helloEndpoint: ServerEndpoint[Any, Task] = sttp.tapir.endpoint.get + .in("hello") + .out(stringBody) + .zServerLogic(_ => + ZIO.logInfo("Handling /hello request") *> + ZIO.succeed("Hello, World!") @@ span("hello-logic") + ) + + } + object ZIOHttpApiDefault { + + def endpoints(using tracing: Tracing): List[ServerEndpoint[Any, Task]] = + + val api = new ZIOHttpApiDefault(using tracing) + + List(api.helloEndpoint) + } + +} diff --git a/examples/src/main/scala/sttp/tapir/examples/observability/ZIOpenTelemetryExample.skip b/examples/src/main/scala/sttp/tapir/examples/observability/ZIOpenTelemetryExample.skip new file mode 100644 index 0000000000..d5d55c6840 --- /dev/null +++ b/examples/src/main/scala/sttp/tapir/examples/observability/ZIOpenTelemetryExample.skip @@ -0,0 +1,229 @@ +// {cat=Hello, World!; effects=ZIO; server=ZIO HTTP; json=zio; docs=Swagger UI}: ZIO OpenTelemetry tracing example + +//> using option -Xkind-projector +//> using dep com.softwaremill.sttp.tapir::tapir-core::1.13.19 +//> using dep com.softwaremill.sttp.tapir::tapir-json-circe:1.13.19 +//> using dep com.softwaremill.sttp.tapir::tapir-zio-http-server:1.13.19 +//> using dep com.softwaremill.sttp.tapir::tapir-swagger-ui-bundle:1.13.19 +//> using dep com.softwaremill.sttp.tapir::tapir-zio:1.13.19 +//> using dep com.softwaremill.sttp.tapir::tapir-zio-opentelemetry:1.13.19 +//> using dep io.opentelemetry.semconv:opentelemetry-semconv:1.41.0 +//> using dep io.opentelemetry:opentelemetry-sdk:1.62.0 +//> using dep io.opentelemetry:opentelemetry-exporter-otlp:1.62.0 +//> using dep io.opentelemetry:opentelemetry-exporter-logging-otlp:1.62.0 +//> using dep dev.zio::zio-logging:2.5.3 +//> using dep dev.zio::zio-logging-slf4j2:2.5.3 +//> using dep dev.zio::zio-opentelemetry-zio-logging:3.1.17 +//> using dep ch.qos.logback:logback-classic:1.5.32 + +package sttp.tapir.examples.observability + +import io.opentelemetry.api +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.sdk.OpenTelemetrySdk + +import sttp.tapir.server.interceptor.cors.CORSInterceptor +import sttp.tapir.server.ServerEndpoint +import sttp.tapir.server.ziopentelemetry.* +import sttp.tapir.server.ziohttp.* +import sttp.tapir.ztapir.* + +import zio.* +import zio.http.* +import zio.logging.backend.SLF4J +import zio.telemetry.opentelemetry.metrics.Meter +import zio.telemetry.opentelemetry.tracing.Tracing +import zio.telemetry.opentelemetry.context.ContextStorage +import zio.telemetry.opentelemetry.OpenTelemetry + +/** This example demonstrates how to use ZIO with Tapir and OpenTelemetry for tracing. It sets up a simple HTTP server with a single + * endpoint that returns "Hello, World!" and includes tracing for incoming requests. + * + * To enable tracing, we use the ZIOpenTelemetry trait, which provides a Tracing service. + * + * To effectively produce traces, you need to set the OTEL_EXPORTER_OTLP_ENDPOINT environment variable to the address of your + * OpenTelemetry. + */ +object ZIOpenTelemetryExample + extends ZIOtelApp("zio-observability-custom", Some("1.0.0"), Some("dev")) + { + override def consoleLogLayer: ZLayer[Any, Nothing, Unit] = Runtime.removeDefaultLoggers >>> SLF4J.slf4j + override def extraAttributes: Attributes = Attributes.builder().put("stack", "zio").build() + + val program = for + _ <- Console.printLine("Starting server on http://localhost:8080") + + given api.OpenTelemetry <- ZIO.service[api.OpenTelemetry] + + given Tracing <- ZIO.service[Tracing] + + endpoints = ZIOHttpApi.endpoints + + httpApp = ZioHttpInterpreter(serverOptions).toHttp( + endpoints + ) + _ <- Server + .serve(httpApp) + .provide( + Server.default + ) + yield () + + override def run = + program.provideSome[Environment]( + // This layers provides sample custom metric, which will be visible in the OpenTelemetry collector and can be used to verify that the metrics are working. + TickCounter.tickRefLayer, + TickCounter.tickCounterLayer, + // This layer provides the OpenTelemetry Metrics service. + // Can be used to create custom metrics. + // Note this will be different Meter instance than the one used by the ZIO runtime or Tapir. + otel4zMetrics(resourceName), + // This layer provides the OpenTelemetry Tracing service, + // which is used to create spans for incoming requests and other operations. + otel4zTracing(resourceName) + + + ) + + /** The server options for the ZIOpenTelemetry trait. + * + * This is the server options that will be used to run the ZIO application, hence provided by bootstrap. It includes the OpenTelemetry + * instance and the ContextStorage. + */ + private def serverOptions(implicit + otel: api.OpenTelemetry, + tracing: Tracing + ): ZioHttpServerOptions[Any] = ZioHttpServerOptions.customiseInterceptors + .prependInterceptor( + ZIOpenTelemetryTracing(tracing) + ) + .appendInterceptor( + CORSInterceptor.default + ) + .appendInterceptor(otel4zMetricsInterceptor()) + .serverLog( + ZioHttpServerOptions.defaultServerLog[Any] + ) + .options +} +class ZIOHttpApi(using tracing: Tracing) { + + import tracing.aspects.* + + def helloEndpoint: ServerEndpoint[Any, Task] = sttp.tapir.endpoint.get + .in("hello") + .out(stringBody) + .zServerLogic(_ => + ZIO.logInfo("Handling /hello request") *> + ZIO.succeed("Hello, World!") @@ span("hello-logic") + ) + + } + object ZIOHttpApi { + + def endpoints(using tracing: Tracing): List[ServerEndpoint[Any, Task]] = + + val api = new ZIOHttpApi(using tracing) + + List(api.helloEndpoint) + } + + +/** A simple counter that increments every second and is exposed as an OpenTelemetry metric. + * + * This is used to demonstrate how to create custom metrics + */ +object TickCounter { + val tickRefLayer: ULayer[Ref[Long]] = + ZLayer( + for { + ref <- Ref.make(0L) + _ <- ref + .update(_ + 1) + .repeat[Any, Long](Schedule.spaced(1.second)) + .forkDaemon + } yield ref + ) + + // Records the number of seconds elapsed since the application startup + val tickCounterLayer: RLayer[Meter & Ref[Long], Unit] = + ZLayer.scoped( + for { + meter <- ZIO.service[Meter] + ref <- ZIO.service[Ref[Long]] + // Initialize observable counter instrument + _ <- meter.observableCounter("tick_counter") { om => + for { + tick <- ref.get + _ <- om.record(tick) + } yield () + } + } yield () + ) +} + +/* + +To provide runtime metrics, you can use the OpenTelemetry Runtime Telemetry module, which is available as a separate dependency. +It provides a RuntimeTelemetry class that can be instantiated with an OpenTelemetry instance and will automatically collect and export runtime metrics. + +"io.opentelemetry.instrumentation" % "opentelemetry-runtime-telemetry-java17" % + +object RuntimeMetrics { + def otel4zRuntimeTelemetry = ZLayer.fromZIO( + for { + openTelemetry <- ZIO.service[io.opentelemetry.api.OpenTelemetry] + _ <- ZIO.fromAutoCloseable(ZIO.succeed(RuntimeTelemetry.create(openTelemetry))) + } yield () + ) +} + */ + + + + + +/** ZIOtelApp is a trait that provides a ZIO layer for OpenTelemetry. + * @param name + */ +abstract class ZIOtelApp(val resourceName: String, val version: Option[String] = None, val environment: Option[String] = None) + extends ZIOtel with Logging + with Metrics + with Traces { + + /** The environment for the ZIOpenTelemetry trait. + * + * This is the environment that will be used to run the ZIO application, hence provided by bootstrap. + * + * It includes: + * - the OpenTelemetry instance. + * - the ContextStorage instance. + */ + override type Environment = api.OpenTelemetry & ContextStorage + + /** The tag for the ZIOpenTelemetry trait. */ + def environmentTag: Tag[Environment] = + Tag[Environment] + + /** The console log layer for the ZIOpenTelemetry trait. + * + * Default implementation uses the default ZIO console logger, which logs to stdout. You can override this to use a different logger, + * e.g. SLF4J, Logback, etc. To use SLF4J, you can use the following layer: + * {{{ + * def consoleLogLayer: ZLayer[Any, Nothing, Unit] = Runtime.removeDefaultLoggers >>> SLF4J.slf4j + * }}} + */ + def consoleLogLayer: ZLayer[Any, Nothing, Unit] = Runtime.removeDefaultLoggers >>> SLF4J.slf4j + + + /** The bootstrap layer for the ZIOpenTelemetry trait. + * + * This is the layer that will be used to bootstrap the ZIO application. It includes the OpenTelemetry layer, the Tracing layer, and the + * Meter layer. + */ + override def bootstrap: ZLayer[ZIOAppArgs, Any, Environment] = + consoleLogLayer >>> + OpenTelemetry.contextZIO >+> (otelProviders >>> + openTelemetryLive) >+> (otel4zLogging ++ zioMetrics ++ otel4zMetrics(resourceName) ++ otel4zTracing(resourceName)) + +} diff --git a/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/Logger.scala b/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/Logger.scala new file mode 100644 index 0000000000..f55d89c44b --- /dev/null +++ b/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/Logger.scala @@ -0,0 +1,110 @@ +package sttp.tapir.server.ziopentelemetry + +import zio._ +import io.opentelemetry.api +import io.opentelemetry.sdk.logs.SdkLoggerProvider +import zio.telemetry.opentelemetry.context.ContextStorage +import zio.telemetry.opentelemetry.OpenTelemetry +import io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporter +import io.opentelemetry.sdk.logs.`export`.SimpleLogRecordProcessor +import io.opentelemetry.sdk.resources.Resource + +/** + * A trait that provides a logger provider for OpenTelemetry, + * which logs in OTLP gRPC format with [[LoggerProvider]] + */ +trait Logging { + this: ZIOtel => + + /** The log level to use for the OpenTelemetry logger provider. + * + * Uses the `OTEL_LOG_LEVEL` environment variable to determine the log level. + * + * By default, this is set to `INFO`. You can override this to change the log level, e.g. to `DEBUG` for more verbose logging. + */ + def logLevel = sys.env.getOrElse("OTEL_LOG_LEVEL", "INFO") match { + case "DEBUG" => LogLevel.Debug + case "INFO" => LogLevel.Info + case "WARN" => LogLevel.Warning + case "ERROR" => LogLevel.Error + case "TRACE" => LogLevel.Trace + case "ALL" => LogLevel.All + case _ => LogLevel.Info + } + + /** + * The OTLP endpoint to use for logging. + * + * Uses the `OTEL_EXPORTER_OTLP_LOGS_ENDPOINT` environment variable to determine the endpoint. + * If not set, it will use the `OTEL_EXPORTER_OTLP_ENDPOINT` environment variable. + * If neither is set, it will return `None`. + * + * @return + */ + def logEndpoint: ZIO[Any, Nothing, Option[String]] = OtlpEndpoint("OTEL_EXPORTER_OTLP_LOGS_ENDPOINT") match { + case None => + ZIO.logInfo( + "No OTLP logs endpoint configured, skipping OpenTelemetry logging setup. To enable it, set either OTEL_EXPORTER_OTLP_LOGS_ENDPOINT or OTEL_EXPORTER_OTLP_ENDPOINT environment variable." + ) *> ZIO.succeed(None) + + case Some(endpoint) => + ZIO.some(endpoint) + } + + /** Provides a logger provider for OpenTelemetry, which logs in OTLP gRPC format with [[LoggerProvider]] + */ + override def logProvider: URIO[Scope, Option[SdkLoggerProvider]] = for { + + endpointOption <- logEndpoint + + endpoint <- endpointOption match { + case Some(endpoint) => provider(endpoint).map(Some(_)) + case None => ZIO.none + } + + } yield endpoint + + + private def provider(endpoint: String): URIO[Scope, SdkLoggerProvider] = + for { + _ <- ZIO.logInfo(s"Configuring OpenTelemetry logging to $endpoint") + logRecordExporter <- + ZIO.fromAutoCloseable( + ZIO.succeed( + OtlpGrpcLogRecordExporter + .builder() + .setEndpoint(endpoint) + .build() + ) + ) + logRecordProcessor <- + ZIO.fromAutoCloseable( + ZIO.succeed(SimpleLogRecordProcessor.create(logRecordExporter)) + ) + loggerProvider <- + ZIO.fromAutoCloseable( + ZIO.succeed( + SdkLoggerProvider + .builder() + .setResource( + Resource.create( + attributes + ) + ) + .addLogRecordProcessor(logRecordProcessor) + .build() + ) + ) + } yield loggerProvider + + + + /** A OpenTelemetry logging layer. + * + * @return + */ + override def otel4zLogging: URLayer[api.OpenTelemetry with ContextStorage, Unit] = OpenTelemetry.logging( + instrumentationScopeName = resourceName, + logLevel = logLevel + ) +} diff --git a/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/Metrics.scala b/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/Metrics.scala new file mode 100644 index 0000000000..616fc2f3aa --- /dev/null +++ b/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/Metrics.scala @@ -0,0 +1,145 @@ +package sttp.tapir.server.ziopentelemetry + +import zio._ +import io.opentelemetry.api +import io.opentelemetry.sdk.metrics.SdkMeterProvider + +import zio.telemetry.opentelemetry.OpenTelemetry +import zio.telemetry.opentelemetry.context.ContextStorage +import io.opentelemetry.sdk.metrics.`export`.PeriodicMetricReader +import io.opentelemetry.sdk.resources.Resource +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter +import io.opentelemetry.sdk.metrics.`export`.AggregationTemporalitySelector +import sttp.tapir.server.interceptor.metrics.MetricsRequestInterceptor +import sttp.tapir.server.metrics.opentelemetry.OpenTelemetryMetrics + +import zio.telemetry.opentelemetry.metrics.Meter +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder + +trait Metrics { + this: ZIOtel => + + + def collectZioMetrics: Boolean = false + /** Provides a meter provider for OpenTelemetry, which exports metrics in OTLP gRPC format with [[MeterProvider]] + */ + + def metricExporter(endpoint: String): OtlpGrpcMetricExporter = { + + /** + * Creates a new [[OtlpGrpcMetricExporterBuilder]] instance. + * + * If `OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE` environment variable is set to "DELTA" (case insensitive), + * the exporter will be DELTA. + * + * @return + */ + def otlpGrpcMetricExporterBuilder: OtlpGrpcMetricExporterBuilder = OtlpGrpcMetricExporter + .builder() + sys.env + .get("OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE") + .filter(v => v.toUpperCase().equals("DELTA")) + .foreach(_ => otlpGrpcMetricExporterBuilder.setAggregationTemporalitySelector(AggregationTemporalitySelector.deltaPreferred())) + + otlpGrpcMetricExporterBuilder + .setEndpoint(endpoint) + .build() + } + + + def meterEndpoint: ZIO[Any, Nothing, Option[String]] = OtlpEndpoint("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT") match { + case None => + ZIO.logInfo( + "No OTLP metrics endpoint configured, skipping OpenTelemetry metrics setup. To enable it, set either OTEL_EXPORTER_OTLP_METRICS_ENDPOINT or OTEL_EXPORTER_OTLP_ENDPOINT environment variable." + ) *> ZIO.succeed(None) + + case Some(endpoint) => + ZIO.some(endpoint) + } + /** Provides a meter provider for OpenTelemetry, which logs in OTLP Json format as gRPC if either of the following environment variables + * is set: + * - `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT` + * - `OTEL_EXPORTER_OTLP_ENDPOINT` + * If `OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE` environment variable is set to "DELTA" (case insensitive), the exporter will be + * configured to prefer delta temporality for metrics, otherwise it will use the default cumulative temporality. + */ + override def meterProvider: URIO[Scope, Option[SdkMeterProvider]] = + for { + endpointOption <- meterEndpoint + endpoint <- endpointOption match { + case Some(endpoint) => provider(endpoint).map(Some(_)) + case None => ZIO.none + } + } yield endpoint + + def provider(endpoint: String): URIO[Scope, SdkMeterProvider] = + + for { + _ <- ZIO.logInfo(s"Configuring OpenTelemetry metrics to $endpoint") + metricExporter <- ZIO.fromAutoCloseable( + ZIO.succeed(metricExporter(endpoint)) + ) + metricReader <- + ZIO.fromAutoCloseable( + ZIO.succeed( + PeriodicMetricReader + .builder(metricExporter) + .setInterval(5.second) + .build() + ) + ) + meterProvider <- + ZIO.fromAutoCloseable( + ZIO.succeed( + SdkMeterProvider + .builder() + .registerMetricReader(metricReader) + .setResource( + Resource.create( + attributes + ) + ) + .build() + ) + ) + } yield meterProvider + + + override def zioMetrics: URLayer[io.opentelemetry.api.OpenTelemetry & ContextStorage, Unit] = + if (collectZioMetrics) OpenTelemetry.metrics("zio") >>> OpenTelemetry.zioMetrics + else ZLayer.unit + + /** A OpenTelemetry metrics layer, with configurable instrumentation scope name, version and schema url. + * + * @param instrumentationScopeName + * @param instrumentationVersion + * @param schemaUrl + * @param logAnnotated + * @return + */ + override def otel4zMetrics( + instrumentationScopeName: String, + + ): URLayer[io.opentelemetry.api.OpenTelemetry & ContextStorage, Meter] = OpenTelemetry.metrics( + instrumentationScopeName = instrumentationScopeName, + ) + + /** A OpenTelemetry metrics interceptor for tapir, with configurable instrumentation scope name. + * + * It uses the OpenTelemetry instance from the environment, which is provided by the [[ZIOpenTelemetry]] trait bootstrap layer. + * + * @param instrumentationScopeName + * @param otel + * @return + */ + def otel4zMetricsInterceptor( + instrumentationScopeName: String = "tapir" + )(implicit otel: api.OpenTelemetry): MetricsRequestInterceptor[Task] = { + val meter: api.metrics.Meter = otel.meterBuilder(instrumentationScopeName).build() + + val metrics = OpenTelemetryMetrics.default[Task](meter) + + metrics.metricsInterceptor() + } + +} diff --git a/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/OtelEndpoint.scala b/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/OtelEndpoint.scala new file mode 100644 index 0000000000..c2b3861b24 --- /dev/null +++ b/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/OtelEndpoint.scala @@ -0,0 +1,22 @@ +package sttp.tapir.server.ziopentelemetry + +object OtlpEndpoint { + + /** OTLP gRPC endpoint to export telemetry data to. + * + * It can be set via: + * + * - environment variable provided as `envVar` + * - environment variable "OTEL_EXPORTER_OTLP_ENDPOINT" + * - defaults to "http://localhost:4317" + * + * See https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#otel_exporter_otlp_endpoint. + * + * @param envVar + * @return + */ + def apply(envVar: String): Option[String] = + sys.env + .get(envVar) + .orElse(sys.env.get("OTEL_EXPORTER_OTLP_ENDPOINT")) +} \ No newline at end of file diff --git a/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/Traces.scala b/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/Traces.scala new file mode 100644 index 0000000000..f41034c277 --- /dev/null +++ b/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/Traces.scala @@ -0,0 +1,75 @@ +package sttp.tapir.server.ziopentelemetry + +import zio._ +import io.opentelemetry.sdk.trace.SdkTracerProvider +import zio.telemetry.opentelemetry.OpenTelemetry +import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter +import io.opentelemetry.sdk.trace.`export`.SimpleSpanProcessor +import io.opentelemetry.sdk.resources.Resource +import zio.telemetry.opentelemetry.context.ContextStorage +import zio.telemetry.opentelemetry.tracing.Tracing + +trait Traces { + this: ZIOtel => + + + def traceEndpoint: ZIO[Any, Nothing, Option[String]] = OtlpEndpoint("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT") match { + case None => + ZIO.logInfo( + "No OTLP traces endpoint configured, skipping OpenTelemetry tracing setup. To enable it, set either OTEL_EXPORTER_OTLP_TRACES_ENDPOINT or OTEL_EXPORTER_OTLP_ENDPOINT environment variable." + ) *> ZIO.succeed(None) + + case Some(endpoint) => + ZIO.some(endpoint) + } + /** Provides a tracer provider for OpenTelemetry, which logs in OTLP Json format as gRPC if either of the following environment variables + * is set: + * - `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` + * - `OTEL_EXPORTER_OTLP_ENDPOINT` + */ + override def tracerProvider: URIO[Scope, Option[SdkTracerProvider]] = for { + + endpointOption <- traceEndpoint + + endpoint <- endpointOption match { + case Some(endpoint) => provider(endpoint).map(Some(_)) + case None => ZIO.none + } + + } yield endpoint + + private def provider(endpoint: String): URIO[Scope, SdkTracerProvider] = + + for { + _ <- ZIO.logInfo(s"Configuring OpenTelemetry tracing to $endpoint") + spanExporter <- ZIO.fromAutoCloseable( + ZIO.succeed(OtlpGrpcSpanExporter.builder().setEndpoint(endpoint).build()) + ) + spanProcessor <- ZIO.fromAutoCloseable( + ZIO.succeed(SimpleSpanProcessor.create(spanExporter)) + ) + tracerProvider <- + ZIO.fromAutoCloseable( + ZIO.succeed( + SdkTracerProvider + .builder() + .setResource( + Resource.create( + attributes + ) + ) + .addSpanProcessor(spanProcessor) + .build() + ) + ) + } yield tracerProvider + + + + + override def otel4zTracing( + instrumentationScopeName: String + ): ZLayer[io.opentelemetry.api.OpenTelemetry & ContextStorage, Nothing, Tracing] = OpenTelemetry.tracing( + instrumentationScopeName = instrumentationScopeName + ) +} \ No newline at end of file diff --git a/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/ZIOpenTelemetryTracing.scala b/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/ZIOpenTelemetryTracing.scala new file mode 100644 index 0000000000..2b5f057133 --- /dev/null +++ b/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/ZIOpenTelemetryTracing.scala @@ -0,0 +1,221 @@ +package sttp.tapir.server.ziopentelemetry + +import zio._ + +import collection.mutable.{Map => MutableMap} + +import sttp.monad.MonadError +import sttp.model.{StatusCode => SttpStatusCode} +import sttp.tapir.AnyEndpoint +import sttp.tapir.model.ServerRequest +import sttp.tapir.server.ServerEndpoint +import sttp.tapir.server.interceptor.RequestResult.{Failure, Response} +import sttp.tapir.server.interceptor._ +import sttp.tapir.server.interpreter.BodyListener +import sttp.tapir.server.model.ServerResponse + +import io.opentelemetry.api.trace.Span + +import zio.telemetry.opentelemetry.tracing.Tracing +import io.opentelemetry.api.trace.SpanKind + +import io.opentelemetry.api.trace.StatusCode +import io.opentelemetry.api.common.Attributes +import zio.telemetry.opentelemetry.context.IncomingContextCarrier + +/** Interceptor which traces requests using ZIO OpenTelemetry. + * + * Span names and attributes are calculated using the provided [[ZIOpenTelemetryTracingConfig]]. + * + * To use, customize the interceptors of the server interpreter you are using, and prepend this interceptor, so that it runs as early as + * possible, e.g.: + * + * {{{ + * def serverOptions(using + * tracing: Tracing + * ): ZioHttpServerOptions[Any] = + * ZioHttpServerOptions.customiseInterceptors + * .prependInterceptor( + * ZIOpenTelemetryTracing(tracing) + * ) + * .appendInterceptor( + * CORSInterceptor.default + * ) + * .serverLog( + * ZioHttpServerOptions.defaultServerLog + * ) + * .options + * }}} + */ + +class ZIOpenTelemetryTracing( + tracing: Tracing, + config: ZIOpenTelemetryTracingConfig +) extends RequestInterceptor[Task] { + + import config._ + + private def extractCarrier(request: ServerRequest) = { + val headers = request.headers + val carrier = MutableMap.empty[String, String] + headers.foreach(h => carrier.put(h.name, h.value)) + IncomingContextCarrier.default(carrier) + } + + override def apply[R, B]( + responder: Responder[Task, B], + requestHandler: EndpointInterceptor[Task] => RequestHandler[Task, R, B] + ): RequestHandler[Task, R, B] = + + new RequestHandler[Task, R, B] { + override def apply( + request: ServerRequest, + endpoints: List[ServerEndpoint[R, Task]] + )(implicit monad: MonadError[Task]): Task[RequestResult[B]] = tracing + .extractSpanUnsafe( + config.propagator, + extractCarrier(request), + request.showShort, + spanKind = SpanKind.SERVER, + attributes = config.requestAttributes(request) + ) + .flatMap { case (span, finalize) => + handleRequest(span, request, endpoints) + .tapError(th => spanError(span)(Right(th))) + .ensuring(finalize) + } + + /** Handle the request, setting span attributes and status based on the result. + * + * @param span + * @param request + * @param endpoints + * @param monad + * @return + */ + def handleRequest( + span: Span, + request: ServerRequest, + endpoints: List[ServerEndpoint[R, Task]] + )(implicit monad: MonadError[Task]) = + for { + requestResult <- requestHandler( + knownEndpointInterceptor(request, span) + )(request, endpoints) + _ <- requestResult match { + case Response(response, _) => + setSpanAttributes( + span, + responseAttributes(request, response) + ) *> ZIO.when(response.isServerError)( + spanError(span)(Left(response.code)) + ) + case Failure(_) => + // ignore, request not handled + ZIO.unit + } + } yield requestResult + + /** Interceptor which sets span name and attributes based on the matched endpoint. + * + * @param request + * @param span + * @return + */ + def knownEndpointInterceptor( + request: ServerRequest, + span: Span + ) = + new EndpointInterceptor[Task] { + def apply[B2]( + responder: Responder[Task, B2], + endpointHandler: EndpointHandler[Task, B2] + ): EndpointHandler[Task, B2] = new EndpointHandler[Task, B2] { + def onDecodeFailure( + ctx: DecodeFailureContext + )(implicit + monad: MonadError[Task], + bodyListener: BodyListener[Task, B2] + ): Task[Option[ServerResponse[B2]]] = + endpointHandler.onDecodeFailure(ctx).flatMap { + case result @ Some(_) => + knownEndpoint(ctx.endpoint).map(_ => result) + case None => monad.unit(None) + } + + def onDecodeSuccess[A, U, I]( + ctx: DecodeSuccessContext[Task, A, U, I] + )(implicit + monad: MonadError[Task], + bodyListener: BodyListener[Task, B2] + ): Task[ServerResponse[B2]] = + knownEndpoint(ctx.endpoint).flatMap(_ => endpointHandler.onDecodeSuccess(ctx)) + + def onSecurityFailure[A]( + ctx: SecurityFailureContext[Task, A] + )(implicit + monad: MonadError[Task], + bodyListener: BodyListener[Task, B2] + ): Task[ServerResponse[B2]] = + knownEndpoint(ctx.endpoint).flatMap(_ => endpointHandler.onSecurityFailure(ctx)) + + def knownEndpoint( + e: AnyEndpoint + ): Task[Unit] = { + val (name, attributes) = + spanNameFromEndpointAndAttributes(request, e) + ZIO.succeed { + span + .updateName(name) + span.setAllAttributes(attributes) + }.unit + } + } + } + + /** Set span status and attributes for errors, both exceptions and error status. + */ + private def spanError( + span: Span + )(error: Either[SttpStatusCode, Throwable]): Task[Unit] = + ZIO.succeed { + span.setStatus(StatusCode.ERROR) + span.setAllAttributes(errorAttributes(error)) + }.unit + + private def setSpanAttributes( + span: Span, + attributes: Attributes + ): Task[Unit] = + ZIO.succeed(span.setAllAttributes(attributes)).unit + + } +} + +object ZIOpenTelemetryTracing { + + /** Create a new ZIOpenTelemetryTracing interceptor with the provided Tracing and default configuration. + * + * @param tracing + * @return + */ + def apply( + tracing: Tracing + ): ZIOpenTelemetryTracing = + new ZIOpenTelemetryTracing( + tracing, + ZIOpenTelemetryTracingConfig() + ) + + /** Create a new ZIOpenTelemetryTracing interceptor with the provided Tracing and configuration. + */ + def apply( + tracing: Tracing, + config: ZIOpenTelemetryTracingConfig + ): ZIOpenTelemetryTracing = + new ZIOpenTelemetryTracing( + tracing, + config + ) + +} diff --git a/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/ZIOpenTelemetryTracingConfig.scala b/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/ZIOpenTelemetryTracingConfig.scala new file mode 100644 index 0000000000..0c5b539ab4 --- /dev/null +++ b/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/ZIOpenTelemetryTracingConfig.scala @@ -0,0 +1,131 @@ +package sttp.tapir.server.ziopentelemetry + +import sttp.model.headers.{Forwarded, Host} +import sttp.model.{HeaderNames, StatusCode} +import sttp.tapir.AnyEndpoint +import sttp.tapir.model.ServerRequest +import sttp.tapir.server.model.ServerResponse + +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.semconv.HttpAttributes +import io.opentelemetry.semconv.UrlAttributes +import io.opentelemetry.semconv.ServerAttributes +import io.opentelemetry.semconv.ErrorAttributes +import scala.annotation.nowarn +import zio.telemetry.opentelemetry.tracing.propagation.TraceContextPropagator + +/** Configuration for OpenTelemetry Otel4z tracing of server requests, used by [[ZIOpenTelemetry]]. Use the apply method to override only + * some of the configuration options, while using the defaults for the rest. + * + * The default values follow OpenTelemetry semantic conventions, as described in [their + * documentation](https://opentelemetry.io/docs/specs/semconv/http/http-spans/#name). + * @param propagator + * The propagator to use for extracting and injecting trace context. + * @param spanName + * Calculates the name of the span, given an incoming request. + * @param requestAttributes + * Calculates the attributes of the span, given an incoming request. + * @param spanNameFromEndpointAndAttributes + * Calculates an updated name of the span and additional attributes, once (and if) an endpoint is determined to handle the request. By + * default, the span name includes the request's method and the route, which is created by rendering the endpoint's path template. + * @param responseAttributes + * Calculates additional attributes of the span, given a response that will be sent back. + * @param errorAttributes + * Calculates additional attributes of the span, given an error that occurred while processing the request (an exception); although + * usually, exceptions are translated into 5xx responses earlier in the interceptor chain. + */ +case class ZIOpenTelemetryTracingConfig( + propagator: TraceContextPropagator, + spanName: ServerRequest => String, + requestAttributes: ServerRequest => Attributes, + spanNameFromEndpointAndAttributes: (ServerRequest, AnyEndpoint) => ( + String, + Attributes + ), + responseAttributes: (ServerRequest, ServerResponse[?]) => Attributes, + errorAttributes: Either[StatusCode, Throwable] => Attributes +) + +object ZIOpenTelemetryTracingConfig { + def apply( + propagator: TraceContextPropagator = TraceContextPropagator.default, + spanName: ServerRequest => String = Defaults.spanName, + requestAttributes: ServerRequest => Attributes = Defaults.requestAttributes, + spanNameFromEndpointAndAttributes: (ServerRequest, AnyEndpoint) => ( + String, + Attributes + ) = Defaults.spanNameFromEndpointAndAttributes, + responseAttributes: (ServerRequest, ServerResponse[?]) => Attributes = Defaults.responseAttributes, + errorAttributes: Either[StatusCode, Throwable] => Attributes = Defaults.errorAttributes + ): ZIOpenTelemetryTracingConfig = + new ZIOpenTelemetryTracingConfig( + propagator, + spanName, + requestAttributes, + spanNameFromEndpointAndAttributes, + responseAttributes, + errorAttributes + ) + + /** @see + * https://opentelemetry.io/docs/specs/semconv/http/http-spans/#name + * @see + * https://opentelemetry.io/docs/specs/semconv/http/http-spans/#http-server + */ + object Defaults { + def spanNameFromEndpointAndAttributes( + request: ServerRequest, + endpoint: AnyEndpoint + ): (String, Attributes) = { + val route = endpoint.showPathTemplate(showQueryParam = None) + val name = s"${request.method.method} $route" + (name, Attributes.of(HttpAttributes.HTTP_ROUTE, route)) + } + + def requestAttributes(request: ServerRequest): Attributes = { + val hostHeader: String = request + .header(HeaderNames.Forwarded) + .flatMap(f => Forwarded.parse(f).toOption.flatMap(_.headOption).flatMap(_.host)) + .orElse(request.header(HeaderNames.XForwardedHost)) + .orElse(request.header(":authority")) + .orElse(request.header(HeaderNames.Host)) + .getOrElse("unknown") + + val (host, _) = Host.parseHostAndPort(hostHeader) + + Attributes.of( + HttpAttributes.HTTP_REQUEST_METHOD, + request.method.method, + UrlAttributes.URL_PATH, + request.uri.pathToString, + UrlAttributes.URL_SCHEME, + request.uri.scheme.getOrElse("http"), + ServerAttributes.SERVER_ADDRESS, + host + ) + + } + + def spanName(request: ServerRequest): String = s"${request.method.method}" + + @nowarn + def responseAttributes( + request: ServerRequest, + response: ServerResponse[_] + ): Attributes = + Attributes.of( + HttpAttributes.HTTP_RESPONSE_STATUS_CODE, + java.lang.Long.valueOf(response.code.code.toLong) + ) + + def errorAttributes(error: Either[StatusCode, Throwable]): Attributes = + error match { + case Left(statusCode) => + // see footnote for error.type + Attributes.of(ErrorAttributes.ERROR_TYPE, statusCode.code.toString) + case Right(exception) => + val errorType = exception.getClass.getSimpleName + Attributes.of(ErrorAttributes.ERROR_TYPE, errorType) + } + } +} diff --git a/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/ZIOtel.scala b/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/ZIOtel.scala new file mode 100644 index 0000000000..e27f432ba8 --- /dev/null +++ b/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/ZIOtel.scala @@ -0,0 +1,115 @@ +package sttp.tapir.server.ziopentelemetry + +import zio._ +import io.opentelemetry.api +import io.opentelemetry.sdk.logs.SdkLoggerProvider +import zio.telemetry.opentelemetry.context.ContextStorage +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.semconv.ServiceAttributes +import io.opentelemetry.semconv.DeploymentAttributes +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.trace.SdkTracerProvider +import io.opentelemetry.sdk.OpenTelemetrySdk +import zio.telemetry.opentelemetry.metrics.Meter +import zio.telemetry.opentelemetry.tracing.Tracing + + +/** OtelProviders is a case class that holds the OpenTelemetry providers for tracing, metrics and logging. + * + * It is used to build the OpenTelemetry + * + * @param tracerProvider + * @param meterProvider + * @param loggerProvider + */ +case class OtelProviders( + tracerProvider: Option[SdkTracerProvider], + meterProvider: Option[SdkMeterProvider], + loggerProvider: Option[SdkLoggerProvider] +) { + + def build(): OpenTelemetrySdk = { + val builder = OpenTelemetrySdk + .builder() + tracerProvider.foreach(builder.setTracerProvider) + meterProvider.foreach(builder.setMeterProvider) + loggerProvider.foreach(builder.setLoggerProvider) + builder.build() + } + +} + +trait ZIOtel extends ZIOApp{ + + type Environment <: api.OpenTelemetry with ContextStorage + + def resourceName: String + + def version: Option[String] + + def environment: Option[String] + + /** The OpenTelemetry logging layer for the ZIOpenTelemetry trait. + * + * By default, no logging layer is provided. When mixing in the [[Logging]] trait, this layer is provided, and will be used by bootstrap. + */ + protected def otel4zLogging: URLayer[api.OpenTelemetry with ContextStorage, Unit] = ZLayer.unit + + protected def zioMetrics: URLayer[api.OpenTelemetry with ContextStorage, Unit] = ZLayer.unit + + /** The OpenTelemetry [[SdkLoggerProvider]] for the ZIOpenTelemetry trait. + * + * By default, no logger provider is provided. You can override this to provide a logger provider, e.g. to export logs in OTLP gRPC + * format to collector. + * + * Or mixing in the [[Logging]] trait, which provides a logger provider that exports logs in OTLP gRPC format to collector. + */ + protected def logProvider: URIO[Scope, Option[SdkLoggerProvider]] = ZIO.none + + protected def meterProvider: URIO[Scope, Option[SdkMeterProvider]] = ZIO.none + + protected def tracerProvider: URIO[Scope, Option[SdkTracerProvider]] = ZIO.none + + /** Extra attributes to be added to the resource. */ + def extraAttributes: Attributes = Attributes.empty + + /** The attributes of the resource, advertised to the OpenTelemetry collector. */ + def attributes: Attributes = { + val builder = Attributes + .builder() + .put(ServiceAttributes.SERVICE_NAME, resourceName) + version.foreach(v => builder.put(ServiceAttributes.SERVICE_VERSION, v)) + environment.foreach(e => builder.put(DeploymentAttributes.DEPLOYMENT_ENVIRONMENT_NAME, e)) + builder.putAll(extraAttributes).build() + } + + /** The OpenTelemetry providers for the ZIOpenTelemetry trait. + * + * This is the layer that will be used to provide the OpenTelemetry providers to the ZIO application. It includes the OpenTelemetry + * logger provider, the OpenTelemetry meter provider, and the OpenTelemetry tracer provider. + */ + final def otelProviders: ULayer[OtelProviders] = ZLayer.scoped[Any](for { + logger <- logProvider + meter <- meterProvider + tracer <- tracerProvider + } yield OtelProviders(tracer, meter, logger)) + + + def otel4zMetrics( + instrumentationScopeName: String, + ): URLayer[io.opentelemetry.api.OpenTelemetry & ContextStorage, Meter] = ZLayer.succeed(noop.NoopMeter(instrumentationScopeName)) + + def otel4zTracing( + instrumentationScopeName: String + ): ZLayer[io.opentelemetry.api.OpenTelemetry & ContextStorage, Nothing, Tracing] = ZLayer.succeed(noop.NoopTracing(instrumentationScopeName)) + + def openTelemetryLive: ZLayer[OtelProviders, Nothing, OpenTelemetrySdk] = ZLayer.scoped[OtelProviders]( + for { + otelProviders <- ZIO.service[OtelProviders] + openTelemetry <- ZIO.fromAutoCloseable( + ZIO.succeed(otelProviders.build()) + ) + + } yield openTelemetry + ) +} diff --git a/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/ZIOtelAppDefault.scala b/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/ZIOtelAppDefault.scala new file mode 100644 index 0000000000..dcf2b60c0c --- /dev/null +++ b/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/ZIOtelAppDefault.scala @@ -0,0 +1,56 @@ +package sttp.tapir.server.ziopentelemetry + + +import io.opentelemetry.api +import zio._ +import zio.telemetry.opentelemetry.context.ContextStorage +import zio.telemetry.opentelemetry.metrics.Meter +import zio.telemetry.opentelemetry.tracing.Tracing +import zio.telemetry.opentelemetry.OpenTelemetry + + +/** ZIOtelAppDefault provides a default implementation of the ZIOtel trait. + * + * With logging, metrics and tracing enabled. + */ +abstract class ZIOtelAppDefault(val resourceName: String, val version: Option[String] = None, val environment: Option[String] = None) + extends ZIOtel with Logging + with Metrics + with Traces { + + /** The environment for the ZIOpenTelemetry trait. + * + * This is the environment that will be used to run the ZIO application, hence provided by bootstrap. + * + * It includes: + * - the OpenTelemetry instance. + * - the ContextStorage instance. + */ + override type Environment = api.OpenTelemetry with ContextStorage with Meter with Tracing + + /** The tag for the ZIOpenTelemetry trait. */ + def environmentTag: Tag[Environment] = + Tag[Environment] + + /** The console log layer for the ZIOpenTelemetry trait. + * + * Default implementation uses the default ZIO console logger, which logs to stdout. You can override this to use a different logger, + * e.g. SLF4J, Logback, etc. To use SLF4J, you can use the following layer: + * {{{ + * def consoleLogLayer: ZLayer[Any, Nothing, Unit] = Runtime.removeDefaultLoggers >>> SLF4J.slf4j + * }}} + */ + def consoleLogLayer: ZLayer[Any, Nothing, Unit] = ZLayer.unit + + + /** The bootstrap layer for the ZIOpenTelemetry trait. + * + * This is the layer that will be used to bootstrap the ZIO application. It includes the OpenTelemetry layer, the Tracing layer, and the + * Meter layer. + */ + override def bootstrap: ZLayer[ZIOAppArgs, Any, Environment] = + consoleLogLayer >>> + OpenTelemetry.contextZIO >+> (otelProviders >>> + openTelemetryLive) >+> (otel4zLogging ++ zioMetrics ++ otel4zMetrics(resourceName) ++ otel4zTracing(resourceName)) + +} diff --git a/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/noop/NoopMeter.scala b/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/noop/NoopMeter.scala new file mode 100644 index 0000000000..5675db4001 --- /dev/null +++ b/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/noop/NoopMeter.scala @@ -0,0 +1,42 @@ +package sttp.tapir.server.ziopentelemetry.noop + +import zio._ +import zio.telemetry.opentelemetry.metrics._ +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.context.Context + +object NoopMeter { + + def apply(instrumentationScopeName: String): Meter = new Meter{ + + override def toString(): String = instrumentationScopeName + + override def counter( + name: String, + unit: Option[String] = None, + description: Option[String] = None + )(implicit trace: Trace): UIO[Counter[Long]] = ZIO.succeed(new Counter[Long]{ + def add(value: Long, attributes: Attributes)(implicit trace: Trace): UIO[Unit] = ZIO.unit + def inc(attributes: Attributes)(implicit trace: Trace): UIO[Unit] = ZIO.unit + def record0(value: Long, attributes: Attributes, context: Context): Unit = () + }) + + def histogram(name: String, unit: Option[String], description: Option[String], boundaries: Option[Chunk[Double]])(implicit trace: Trace): UIO[Histogram[Double]] = ZIO.succeed(new Histogram[Double]{ + def record(value: Double, attributes: Attributes)(implicit trace: Trace): UIO[Unit] = ZIO.unit + def record0(value: Double, attributes: Attributes, context: Context): Unit = () + }) + + def observableCounter(name: String, unit: Option[String], description: Option[String])(callback: ObservableMeasurement[Long] => Task[Unit])(implicit trace: Trace): RIO[Scope, Unit] = ZIO.unit + + def observableGauge(name: String, unit: Option[String], description: Option[String])(callback: ObservableMeasurement[Double] => Task[Unit])(implicit trace: Trace): RIO[Scope, Unit] = ZIO.unit + + def observableUpDownCounter(name: String, unit: Option[String], description: Option[String])(callback: ObservableMeasurement[Long] => Task[Unit])(implicit trace: Trace): RIO[Scope, Unit] = ZIO.unit + + def upDownCounter(name: String, unit: Option[String], description: Option[String])(implicit trace: Trace): UIO[UpDownCounter[Long]] = ZIO.succeed(new UpDownCounter[Long]{ + def add(value: Long, attributes: Attributes)(implicit trace: Trace): UIO[Unit] = ZIO.unit + def inc(attributes: Attributes)(implicit trace: Trace): UIO[Unit] = ZIO.unit + def dec(attributes: Attributes)(implicit trace: Trace): UIO[Unit] = ZIO.unit + def record0(value: Long, attributes: Attributes, context: Context): Unit = () + }) + } +} diff --git a/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/noop/NoopTracing.scala b/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/noop/NoopTracing.scala new file mode 100644 index 0000000000..4f8941b6db --- /dev/null +++ b/observability/zio-opentelemetry/src/main/scala/sttp/tapir/server/ziopentelemetry/noop/NoopTracing.scala @@ -0,0 +1,86 @@ +package sttp.tapir.server.ziopentelemetry.noop + +import zio.telemetry.opentelemetry.tracing.Tracing +import zio._ +import io.opentelemetry.api.common.Attributes +import zio.telemetry.opentelemetry.context.IncomingContextCarrier +import io.opentelemetry.api.trace.SpanContext +import io.opentelemetry.api.trace.SpanKind +import zio.telemetry.opentelemetry.tracing.StatusMapper +import zio.telemetry.opentelemetry.tracing.propagation.TraceContextPropagator +import io.opentelemetry.api.trace.Span +import io.opentelemetry.context.Context +import zio.telemetry.opentelemetry.context.OutgoingContextCarrier +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import zio.telemetry.opentelemetry.common.Attribute +import io.opentelemetry.api.common.AttributeKey + +object NoopTracing { + def apply(instrumentationScopeName: String): Tracing = new Tracing { + + override def toString(): String = instrumentationScopeName + + def addEvent(name: String)(implicit trace: Trace): UIO[Unit] = ZIO.unit + + def addEventWithAttributes(name: String, attributes: Attributes)(implicit trace: Trace): UIO[Unit] = ZIO.unit + + def extractSpan[C, R, E, E1 <: E, A, A1 <: A](propagator: TraceContextPropagator, carrier: IncomingContextCarrier[C], spanName: String, spanKind: SpanKind, attributes: Attributes, statusMapper: StatusMapper[E, A], links: Seq[SpanContext])(zio: => ZIO[R, E1, A1])(implicit trace: Trace): ZIO[R, E1, A1] = + zio + + def extractSpanUnsafe[C](propagator: TraceContextPropagator, carrier: IncomingContextCarrier[C], spanName: String, spanKind: SpanKind, attributes: Attributes, links: Seq[SpanContext])(implicit trace: Trace): UIO[(Span, UIO[Any])] = + ZIO.succeed((Span.getInvalid(), ZIO.unit)) + + def getCurrentContextUnsafe(implicit trace: Trace): UIO[Context] = + ZIO.succeed(Context.current()) + + def getCurrentSpanUnsafe(implicit trace: Trace): UIO[Span] = + ZIO.succeed(Span.current()) + + def inSpan[R, E, E1 <: E, A, A1 <: A](span: Span, spanName: String, spanKind: SpanKind, attributes: Attributes, statusMapper: StatusMapper[E, A], links: Seq[SpanContext])(zio: => ZIO[R, E1, A1])(implicit trace: Trace): ZIO[R, E1, A1] = + zio + + def getCurrentSpanContextUnsafe(implicit trace: Trace): UIO[SpanContext] = ZIO.succeed(Span.current().getSpanContext()) + + def root[R, E, E1 <: E, A, A1 <: A](spanName: String, spanKind: SpanKind, attributes: Attributes, statusMapper: StatusMapper[E, A], links: Seq[SpanContext])(zio: => ZIO[R, E1, A1])(implicit trace: Trace): ZIO[R, E1, A1] = + zio + + def span[R, E, E1 <: E, A, A1 <: A](spanName: String, spanKind: SpanKind, attributes: Attributes, statusMapper: StatusMapper[E, A], links: Seq[SpanContext])(zio: => ZIO[R, E1, A1])(implicit trace: Trace): ZIO[R, E1, A1] = + zio + + def scopedEffect[A](effect: => A)(implicit trace: Trace): Task[A] = ZIO.attempt(effect) + + def injectSpan[C](propagator: TraceContextPropagator, carrier: OutgoingContextCarrier[C])(implicit trace: Trace): UIO[Unit] = ZIO.unit + + def scopedEffectFromFuture[A](make: ExecutionContext => Future[A])(implicit trace: Trace): Task[A] = ZIO.fromFuture(make) + + def scopedEffectTotal[A](effect: => A)(implicit trace: Trace): UIO[A] = ZIO.succeed(effect) + + def spanScoped(spanName: String, spanKind: SpanKind, attributes: Attributes, statusMapper: StatusMapper[Any, Unit], links: Seq[SpanContext])(implicit trace: Trace): ZIO[Scope, Nothing, Unit] = ZIO.unit + + def spanUnsafe(spanName: String, spanKind: SpanKind, attributes: Attributes, links: Seq[SpanContext])(implicit trace: Trace): UIO[(Span, UIO[Any])] = + ZIO.succeed((Span.getInvalid(), ZIO.unit)) + + def setAttribute[T](attribute: Attribute[T])(implicit trace: Trace): UIO[Unit] = ZIO.unit + + def setAttribute(name: String, value: String)(implicit trace: Trace): UIO[Unit] = ZIO.unit + + def setAttribute(name: String, value: Boolean)(implicit trace: Trace): UIO[Unit] = ZIO.unit + + def setAttribute(name: String, value: Long)(implicit trace: Trace): UIO[Unit] = ZIO.unit + + def setAttribute(name: String, value: Double)(implicit trace: Trace): UIO[Unit] = ZIO.unit + + def setAttribute(name: String, values: Seq[String])(implicit trace: Trace): UIO[Unit] = ZIO.unit + + def setAttribute[T](key: AttributeKey[T], value: T)(implicit trace: Trace): UIO[Unit] = ZIO.unit + + def setAttribute(name: String, values: Seq[Boolean])(implicit i1: DummyImplicit, trace: Trace): UIO[Unit] = ZIO.unit + + def setAttribute(name: String, values: Seq[Long])(implicit i1: DummyImplicit, i2: DummyImplicit, trace: Trace): UIO[Unit] = ZIO.unit + + def setAttribute(name: String, values: Seq[Double])(implicit i1: DummyImplicit, i2: DummyImplicit, i3: DummyImplicit, trace: Trace): UIO[Unit] = ZIO.unit + + } + +} diff --git a/observability/zio-opentelemetry/src/test/scala/sttp/tapir/server/ziopentelemetry/ZIOpenTelemetryTracingSpec.scala b/observability/zio-opentelemetry/src/test/scala/sttp/tapir/server/ziopentelemetry/ZIOpenTelemetryTracingSpec.scala new file mode 100644 index 0000000000..67e52110e4 --- /dev/null +++ b/observability/zio-opentelemetry/src/test/scala/sttp/tapir/server/ziopentelemetry/ZIOpenTelemetryTracingSpec.scala @@ -0,0 +1,666 @@ +package sttp.tapir.server.ziopentelemetry + +import scala.jdk.CollectionConverters._ +import scala.util.{Success, Try} + +import sttp.capabilities.Streams +import sttp.model.{Header, HeaderNames, Method, Uri} +import sttp.model.Uri._ +import sttp.model.headers.Forwarded +import sttp.monad.MonadError +import sttp.tapir._ +import sttp.tapir.TestUtil.serverRequestFromUri +import sttp.tapir.capabilities.NoStreams +import sttp.tapir.model.ServerRequest +import sttp.tapir.server.ServerEndpoint +import sttp.tapir.server.interpreter._ +import sttp.tapir.server.ziopentelemetry.ZIOpenTelemetryTracing +import sttp.tapir.server.TestUtil.StringToResponseBody + +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.api.trace.{SpanKind, Tracer, StatusCode => OtelStatusCode} +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter +import io.opentelemetry.sdk.trace.`export`.SimpleSpanProcessor +import io.opentelemetry.sdk.trace.SdkTracerProvider +import io.opentelemetry.sdk.trace.data.SpanData +import io.opentelemetry.semconv.HttpAttributes +import io.opentelemetry.semconv.UrlAttributes +import io.opentelemetry.semconv.ServerAttributes +import io.opentelemetry.semconv.ErrorAttributes + +import zio._ +import zio.telemetry.opentelemetry.context.ContextStorage +import zio.telemetry.opentelemetry.tracing.Tracing +import zio.test._ +import zio.test.Assertion._ + +import sttp.tapir.ztapir.RIOMonadError +import zio.telemetry.opentelemetry.OpenTelemetry +import sttp.tapir.server.ziopentelemetry.ZIOpenTelemetryTracingConfig + +object ZIOpenTelemetryTracingSpec extends ZIOSpecDefault { + + implicit val bodyListener: BodyListener[Task, String] = new BodyListener[Task, String] { + override def onComplete(body: String)(cb: Try[Unit] => Task[Unit]): Task[String] = cb(Success(())).map(_ => body) + } + + implicit val ioErr: MonadError[Task] = new RIOMonadError + + val inMemoryTracer: UIO[(InMemorySpanExporter, Tracer)] = for { + spanExporter <- ZIO.succeed(InMemorySpanExporter.create()) + spanProcessor <- ZIO.succeed(SimpleSpanProcessor.create(spanExporter)) + tracerProvider <- ZIO.succeed(SdkTracerProvider.builder().addSpanProcessor(spanProcessor).build()) + tracer = tracerProvider.get("TracingTest") + } yield (spanExporter, tracer) + + val inMemoryTracerLayer: ULayer[InMemorySpanExporter with Tracer] = + ZLayer.fromZIOEnvironment(inMemoryTracer.map { case (inMemorySpanExporter, tracer) => + ZEnvironment(inMemorySpanExporter).add(tracer) + }) + + def tracingMockLayer( + logAnnotated: Boolean = false + ): URLayer[ContextStorage, Tracing with InMemorySpanExporter with Tracer] = + inMemoryTracerLayer >>> (Tracing.live(logAnnotated) ++ inMemoryTracerLayer) + + /** Helper: create an interpreter with the given interceptor and endpoints, run a request, return finished spans. */ + private def runRequest( + endpoints: List[ServerEndpoint[Any, Task]], + request: ServerRequest, + config: ZIOpenTelemetryTracingConfig = ZIOpenTelemetryTracingConfig() + ): ZIO[Tracing with InMemorySpanExporter, Throwable, java.util.List[SpanData]] = + for { + tracing <- ZIO.service[Tracing] + exported <- ZIO.service[InMemorySpanExporter] + _ <- ZIO.succeed(exported.reset()) + interpreter = new ServerInterpreter[Any, Task, String, NoStreams]( + _ => endpoints, + ZIOTestRequestBody, + StringToResponseBody, + List(ZIOpenTelemetryTracing(tracing, config)), + _ => ZIO.succeed(()) + ) + _ <- interpreter(request) + spans = exported.getFinishedSpanItems() + } yield spans + + /** Helper: run request and return single span. */ + private def runRequestSingleSpan( + endpoints: List[ServerEndpoint[Any, Task]], + request: ServerRequest, + config: ZIOpenTelemetryTracingConfig = ZIOpenTelemetryTracingConfig() + ): ZIO[Tracing with InMemorySpanExporter, Throwable, SpanData] = + runRequest(endpoints, request, config).map(_.get(0)) + + // Tests are provided with layers at the suite level via .provide() + + // ─── Span Creation & Naming ──────────────────────────────────────────────── + + private val spanNamingSuite = suite("Span Creation & Naming")( + test("report a simple trace") { + val ep = endpoint + .in("person") + .in(query[String]("name")) + .out(stringBody) + .errorOut(stringBody) + .serverLogic[Task](_ => ZIO.succeed(Right("hello"))) + + val request = serverRequestFromUri(uri"http://example.com/person?name=Adam") + for { + span <- runRequestSingleSpan(List(ep), request) + } yield { + assert(span.getName)(equalTo("GET /person")) && + assert(span.getKind)(equalTo(SpanKind.SERVER)) && + assert(span.getAttributes.size())(equalTo(6)) + } + }, + test("use path template with path parameters as span name") { + val ep = endpoint + .in("person" / path[String]("name") / path[String]("surname") / "info") + .out(stringBody) + .errorOut(stringBody) + .serverLogic[Task](_ => ZIO.succeed(Right("hello"))) + + val request = serverRequestFromUri(uri"http://example.com/person/Adam/Smith/info") + for { + span <- runRequestSingleSpan(List(ep), request) + } yield { + assert(span.getName)(equalTo("GET /person/{name}/{surname}/info")) && + assert(span.getAttributes.get(HttpAttributes.HTTP_ROUTE))(equalTo("/person/{name}/{surname}/info")) + } + }, + test("use POST method in span name") { + val ep = endpoint.post + .in("person") + .in(query[String]("name")) + .out(stringBody) + .errorOut(stringBody) + .serverLogic[Task](_ => ZIO.succeed(Right("created"))) + + val request = serverRequestFromUri(uri"http://example.com/person?name=Adam", _method = Method.POST) + for { + span <- runRequestSingleSpan(List(ep), request) + } yield { + assert(span.getName)(equalTo("POST /person")) && + assert(span.getAttributes.get(HttpAttributes.HTTP_REQUEST_METHOD))(equalTo("POST")) + } + }, + test("use complex path template with mixed segments") { + val ep = endpoint + .in("api" / "v2" / "users" / path[Int]("id") / "profile") + .out(stringBody) + .errorOut(stringBody) + .serverLogic[Task](_ => ZIO.succeed(Right("profile"))) + + val request = serverRequestFromUri(uri"http://example.com/api/v2/users/123/profile") + for { + span <- runRequestSingleSpan(List(ep), request) + } yield { + assert(span.getName)(equalTo("GET /api/v2/users/{id}/profile")) + } + } + ) + + // ─── Request Attributes ──────────────────────────────────────────────────── + + private val requestAttributesSuite = suite("Request Attributes")( + test("default request attributes include method, path, scheme, host") { + val ep = endpoint + .in("person") + .in(query[String]("name")) + .out(stringBody) + .errorOut(stringBody) + .serverLogic[Task](_ => ZIO.succeed(Right("hello"))) + + val request = serverRequestFromUri( + uri"http://example.com/person?name=Adam", + _headers = List(Header(HeaderNames.Host, "example.com")) + ) + for { + span <- runRequestSingleSpan(List(ep), request) + } yield { + val attrs = span.getAttributes + assert(attrs.get(HttpAttributes.HTTP_REQUEST_METHOD))(equalTo("GET")) && + assert(attrs.get(UrlAttributes.URL_PATH))(equalTo("/person")) && + assert(attrs.get(UrlAttributes.URL_SCHEME))(equalTo("http")) && + assert(attrs.get(ServerAttributes.SERVER_ADDRESS))(equalTo("example.com")) + } + }, + test("extract host from Forwarded header") { + val ep = endpoint + .in("person") + .in(query[String]("name")) + .out(stringBody) + .errorOut(stringBody) + .serverLogic[Task](_ => ZIO.succeed(Right("hello"))) + + val request = serverRequestFromUri( + uri"http://example.com/person?name=Adam", + _headers = List(Header(HeaderNames.Forwarded, Forwarded(None, None, Some("softwaremill.com"), None).toString)) + ) + for { + span <- runRequestSingleSpan(List(ep), request) + } yield { + assert(span.getAttributes.get(ServerAttributes.SERVER_ADDRESS))(equalTo("softwaremill.com")) + } + }, + test("extract host from X-Forwarded-Host header") { + val ep = endpoint + .in("hello") + .out(stringBody) + .errorOut(stringBody) + .serverLogic[Task](_ => ZIO.succeed(Right("hello"))) + + val request = serverRequestFromUri( + uri"http://example.com/hello", + _headers = List(Header(HeaderNames.XForwardedHost, "proxy.example.com")) + ) + for { + span <- runRequestSingleSpan(List(ep), request) + } yield { + assert(span.getAttributes.get(ServerAttributes.SERVER_ADDRESS))(equalTo("proxy.example.com")) + } + }, + test("extract host from :authority pseudo-header") { + val ep = endpoint + .in("hello") + .out(stringBody) + .errorOut(stringBody) + .serverLogic[Task](_ => ZIO.succeed(Right("hello"))) + + val request = serverRequestFromUri( + uri"http://example.com/hello", + _headers = List(Header(":authority", "authority.example.com")) + ) + for { + span <- runRequestSingleSpan(List(ep), request) + } yield { + assert(span.getAttributes.get(ServerAttributes.SERVER_ADDRESS))(equalTo("authority.example.com")) + } + }, + test("extract host from Host header") { + val ep = endpoint + .in("hello") + .out(stringBody) + .errorOut(stringBody) + .serverLogic[Task](_ => ZIO.succeed(Right("hello"))) + + val request = serverRequestFromUri( + uri"http://example.com/hello", + _headers = List(Header(HeaderNames.Host, "host.example.com")) + ) + for { + span <- runRequestSingleSpan(List(ep), request) + } yield { + assert(span.getAttributes.get(ServerAttributes.SERVER_ADDRESS))(equalTo("host.example.com")) + } + }, + test("fallback to 'unknown' when no host headers present") { + val ep = endpoint + .in("hello") + .out(stringBody) + .errorOut(stringBody) + .serverLogic[Task](_ => ZIO.succeed(Right("hello"))) + + // serverRequestFromUri with empty headers and a URI without explicit Host header + val request = serverRequestFromUri(uri"http://example.com/hello") + for { + span <- runRequestSingleSpan(List(ep), request) + } yield { + // When there's no Host/Forwarded/X-Forwarded-Host/:authority header, defaults to "unknown" + assert(span.getAttributes.get(ServerAttributes.SERVER_ADDRESS))(equalTo("unknown")) + } + }, + test("detect HTTPS scheme") { + val ep = endpoint + .in("secure") + .out(stringBody) + .errorOut(stringBody) + .serverLogic[Task](_ => ZIO.succeed(Right("secure"))) + + val request = serverRequestFromUri(uri"https://example.com/secure") + for { + span <- runRequestSingleSpan(List(ep), request) + } yield { + assert(span.getAttributes.get(UrlAttributes.URL_SCHEME))(equalTo("https")) + } + } + ) + + // ─── Response Attributes ─────────────────────────────────────────────────── + + private val responseAttributesSuite = suite("Response Attributes")( + test("200 OK response sets status code attribute") { + val ep = endpoint + .in("person") + .out(stringBody) + .errorOut(stringBody) + .serverLogic[Task](_ => ZIO.succeed(Right("hello"))) + + val request = serverRequestFromUri(uri"http://example.com/person") + for { + span <- runRequestSingleSpan(List(ep), request) + } yield { + assert(span.getAttributes.get(HttpAttributes.HTTP_RESPONSE_STATUS_CODE))(equalTo(java.lang.Long.valueOf(200L))) + } + }, + test("404 error response sets status code attribute") { + val ep = endpoint + .in("person") + .out(stringBody) + .errorOut(statusCode(sttp.model.StatusCode.NotFound)) + .serverLogic[Task](_ => ZIO.succeed(Left(()))) + + val request = serverRequestFromUri(uri"http://example.com/person") + for { + span <- runRequestSingleSpan(List(ep), request) + } yield { + assert(span.getAttributes.get(HttpAttributes.HTTP_RESPONSE_STATUS_CODE))(equalTo(java.lang.Long.valueOf(404L))) + } + } + ) + + // ─── Error Handling ──────────────────────────────────────────────────────── + + private val errorHandlingSuite = suite("Error Handling")( + test("5xx server error sets span error status") { + val ep = endpoint + .in("fail") + .out(stringBody) + .errorOut(statusCode(sttp.model.StatusCode.InternalServerError)) + .serverLogic[Task](_ => ZIO.succeed(Left(()))) + + val request = serverRequestFromUri(uri"http://example.com/fail") + for { + span <- runRequestSingleSpan(List(ep), request) + } yield { + assert(span.getStatus.getStatusCode)(equalTo(OtelStatusCode.ERROR)) && + assert(span.getAttributes.get(ErrorAttributes.ERROR_TYPE))(equalTo("500")) + } + }, + test("exception in endpoint logic sets span error status") { + val ep = endpoint + .in("crash") + .out(stringBody) + .errorOut(stringBody) + .serverLogic[Task](_ => ZIO.fail(new RuntimeException("boom"))) + + val request = serverRequestFromUri(uri"http://example.com/crash") + for { + _ <- runRequest(List(ep), request).either + exported <- ZIO.service[InMemorySpanExporter] + finishedSpans = exported.getFinishedSpanItems() + } yield { + // The span should be recorded even if there was an exception + assertTrue(finishedSpans.size() >= 1) && + assert(finishedSpans.get(0).getStatus.getStatusCode)(equalTo(OtelStatusCode.ERROR)) && + assert(finishedSpans.get(0).getAttributes.get(ErrorAttributes.ERROR_TYPE))(equalTo("RuntimeException")) + } + }, + test("4xx client error does NOT set span error status") { + val ep = endpoint + .in("client-error") + .out(stringBody) + .errorOut(statusCode(sttp.model.StatusCode.BadRequest)) + .serverLogic[Task](_ => ZIO.succeed(Left(()))) + + val request = serverRequestFromUri(uri"http://example.com/client-error") + for { + span <- runRequestSingleSpan(List(ep), request) + } yield { + // 4xx is not a server error, so span status should NOT be ERROR + assert(span.getStatus.getStatusCode)(not(equalTo(OtelStatusCode.ERROR))) && + assert(span.getAttributes.get(HttpAttributes.HTTP_RESPONSE_STATUS_CODE))(equalTo(java.lang.Long.valueOf(400L))) + } + } + ) + + // ─── Context Propagation ─────────────────────────────────────────────────── + + private val contextPropagationSuite = suite("Context Propagation")( + test("extract trace context from traceparent header") { + val ep = endpoint + .in("hello") + .out(stringBody) + .errorOut(stringBody) + .serverLogic[Task](_ => ZIO.succeed(Right("hello"))) + + val request = serverRequestFromUri( + uri"http://example.com/hello", + _headers = List(Header("traceparent", "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")) + ) + for { + span <- runRequestSingleSpan(List(ep), request) + } yield { + assert(span.getSpanContext.getTraceId)(equalTo("4bf92f3577b34da6a3ce929d0e0e4736")) && + assert(span.getParentSpanContext.getSpanId)(equalTo("00f067aa0ba902b7")) && + assertTrue(span.getParentSpanContext.isRemote) + } + }, + test("create fresh trace when no traceparent header is present") { + val ep = endpoint + .in("hello") + .out(stringBody) + .errorOut(stringBody) + .serverLogic[Task](_ => ZIO.succeed(Right("hello"))) + + val request = serverRequestFromUri(uri"http://example.com/hello") + for { + span <- runRequestSingleSpan(List(ep), request) + } yield { + // Should have a valid trace ID but no valid parent span context + assertTrue(span.getSpanContext.isValid) && + assertTrue(!span.getParentSpanContext.isValid) + } + }, + test("malformed traceparent starts a new trace gracefully") { + val ep = endpoint + .in("hello") + .out(stringBody) + .errorOut(stringBody) + .serverLogic[Task](_ => ZIO.succeed(Right("hello"))) + + val request = serverRequestFromUri( + uri"http://example.com/hello", + _headers = List(Header("traceparent", "invalid-traceparent-value")) + ) + for { + span <- runRequestSingleSpan(List(ep), request) + } yield { + // Should gracefully start a new trace + assertTrue(span.getSpanContext.isValid) && + assertTrue(!span.getParentSpanContext.isValid) + } + } + ) + + // ─── Endpoint Matching Behavior ──────────────────────────────────────────── + + private val endpointMatchingSuite = suite("Endpoint Matching Behavior")( + test("unmatched request does not set error status") { + val ep = endpoint + .in("person") + .out(stringBody) + .errorOut(stringBody) + .serverLogic[Task](_ => ZIO.succeed(Right("hello"))) + + // Request to a path that doesn't match any endpoint + val request = serverRequestFromUri(uri"http://example.com/unknown-path") + for { + spans <- runRequest(List(ep), request) + } yield { + assertTrue(spans.size() == 1) && + assert(spans.get(0).getStatus.getStatusCode)(not(equalTo(OtelStatusCode.ERROR))) + } + }, + test("decode failure on matched endpoint - span is created with initial name") { + val ep = endpoint + .in("person") + .in(query[Int]("age")) // expects Int, will fail with String + .out(stringBody) + .errorOut(stringBody) + .serverLogic[Task](_ => ZIO.succeed(Right("hello"))) + + // Send "notanint" for the Int query parameter - decode will fail + val request = serverRequestFromUri(uri"http://example.com/person?age=notanint") + for { + spans <- runRequest(List(ep), request) + } yield { + // A span is still created even when decode fails; + // without a DecodeFailureHandler interceptor producing a response, + // the endpoint is not confirmed, so span retains the initial request-based name. + assertTrue(spans.size() >= 1) && + assertTrue(spans.get(0).getKind == SpanKind.SERVER) && + assert(spans.get(0).getStatus.getStatusCode)(not(equalTo(OtelStatusCode.ERROR))) + } + }, + test("security failure on matched endpoint updates span name") { + val ep = endpoint + .securityIn("secure") + .securityIn(header[String]("X-Auth-Token")) + .in("data") + .out(stringBody) + .errorOut(stringBody) + .serverSecurityLogic[String, Task](token => + if (token == "valid") ZIO.succeed(Right("principal")) + else ZIO.succeed(Left("unauthorized")) + ) + .serverLogic(_ => _ => ZIO.succeed(Right("data"))) + + val request = serverRequestFromUri( + uri"http://example.com/secure/data", + _headers = List(Header("X-Auth-Token", "invalid")) + ) + for { + spans <- runRequest(List(ep), request) + } yield { + assertTrue(spans.size() >= 1) && + assert(spans.get(0).getName)(equalTo("GET /secure/data")) + } + } + ) + + // ─── Configuration Customization ────────────────────────────────────────── + + private val configCustomizationSuite = suite("Configuration Customization")( + test("custom spanNameFromEndpointAndAttributes overrides default naming") { + val customConfig = ZIOpenTelemetryTracingConfig( + spanNameFromEndpointAndAttributes = (request, _) => (s"custom-${request.method.method}", Attributes.empty()) + ) + + val ep = endpoint + .in("person") + .out(stringBody) + .errorOut(stringBody) + .serverLogic[Task](_ => ZIO.succeed(Right("hello"))) + + val request = serverRequestFromUri(uri"http://example.com/person") + for { + span <- runRequestSingleSpan(List(ep), request, customConfig) + } yield { + assert(span.getName)(equalTo("custom-GET")) + } + }, + test("custom requestAttributes adds custom attributes to span") { + val customConfig = ZIOpenTelemetryTracingConfig( + requestAttributes = request => + Attributes.builder() + .put(HttpAttributes.HTTP_REQUEST_METHOD, request.method.method) + .put("custom.attribute", "custom-value") + .build() + ) + + val ep = endpoint + .in("person") + .out(stringBody) + .errorOut(stringBody) + .serverLogic[Task](_ => ZIO.succeed(Right("hello"))) + + val request = serverRequestFromUri(uri"http://example.com/person") + for { + span <- runRequestSingleSpan(List(ep), request, customConfig) + } yield { + import io.opentelemetry.api.common.AttributeKey + val customAttr = span.getAttributes.get(AttributeKey.stringKey("custom.attribute")) + assert(customAttr)(equalTo("custom-value")) + } + }, + test("custom responseAttributes adds extra response attributes") { + import io.opentelemetry.api.common.AttributeKey + val customConfig = ZIOpenTelemetryTracingConfig( + responseAttributes = (_, response) => + Attributes.of( + HttpAttributes.HTTP_RESPONSE_STATUS_CODE, java.lang.Long.valueOf(response.code.code.toLong), + AttributeKey.stringKey("custom.response"), "response-value" + ) + ) + + val ep = endpoint + .in("person") + .out(stringBody) + .errorOut(stringBody) + .serverLogic[Task](_ => ZIO.succeed(Right("hello"))) + + val request = serverRequestFromUri(uri"http://example.com/person") + for { + span <- runRequestSingleSpan(List(ep), request, customConfig) + } yield { + import io.opentelemetry.api.common.AttributeKey + val customAttr = span.getAttributes.get(AttributeKey.stringKey("custom.response")) + assert(customAttr)(equalTo("response-value")) + } + }, + test("custom errorAttributes for server error") { + val customConfig = ZIOpenTelemetryTracingConfig( + errorAttributes = { + case Left(statusCode) => + Attributes.builder() + .put(ErrorAttributes.ERROR_TYPE, s"custom-${statusCode.code}") + .build() + case Right(exception) => + Attributes.builder() + .put(ErrorAttributes.ERROR_TYPE, s"custom-${exception.getClass.getSimpleName}") + .build() + } + ) + + val ep = endpoint + .in("fail") + .out(stringBody) + .errorOut(statusCode(sttp.model.StatusCode.InternalServerError)) + .serverLogic[Task](_ => ZIO.succeed(Left(()))) + + val request = serverRequestFromUri(uri"http://example.com/fail") + for { + span <- runRequestSingleSpan(List(ep), request, customConfig) + } yield { + assert(span.getAttributes.get(ErrorAttributes.ERROR_TYPE))(equalTo("custom-500")) + } + } + ) + + // ─── Concurrency ────────────────────────────────────────────────────────── + + private val concurrencySuite = suite("Concurrency")( + test("concurrent requests produce isolated spans with distinct trace IDs") { + val ep = endpoint + .in("person") + .in(query[String]("name")) + .out(stringBody) + .errorOut(stringBody) + .serverLogic[Task](name => ZIO.succeed(Right(s"hello $name"))) + + for { + tracing <- ZIO.service[Tracing] + exported <- ZIO.service[InMemorySpanExporter] + _ <- ZIO.succeed(exported.reset()) + interpreter = new ServerInterpreter[Any, Task, String, NoStreams]( + _ => List(ep), + ZIOTestRequestBody, + StringToResponseBody, + List(ZIOpenTelemetryTracing(tracing)), + _ => ZIO.succeed(()) + ) + names = (1 to 20).map(i => s"user$i").toList + _ <- ZIO.foreachPar(names) { name => + val request = serverRequestFromUri(Uri.unsafeParse(s"http://example.com/person?name=$name")) + interpreter(request) + } + spans = exported.getFinishedSpanItems().asScala.toList + traceIds = spans.map(_.getSpanContext.getTraceId).toSet + } yield { + // Each request should produce its own span + assertTrue(spans.size == 20) && + // All trace IDs should be distinct (each is an independent trace) + assertTrue(traceIds.size == 20) && + // All spans should be SERVER spans + assertTrue(spans.forall(_.getKind == SpanKind.SERVER)) && + // All spans should have the matched endpoint name + assertTrue(spans.forall(_.getName == "GET /person")) + } + } + ) + + // ─── Main Spec ───────────────────────────────────────────────────────────── + + def spec: Spec[Any, Throwable] = + suite("zio opentelemetry tapir interceptor")( + spanNamingSuite, + requestAttributesSuite, + responseAttributesSuite, + errorHandlingSuite, + contextPropagationSuite, + endpointMatchingSuite, + configCustomizationSuite, + concurrencySuite + ).provide( + OpenTelemetry.contextZIO, + tracingMockLayer(false) + ) @@ TestAspect.sequential +} + +object ZIOTestRequestBody extends RequestBody[Task, NoStreams] { + override def toRaw[R](serverRequest: ServerRequest, bodyType: RawBodyType[R], maxBytes: Option[Long]): Task[RawValue[R]] = ??? + override val streams: Streams[NoStreams] = NoStreams + override def toStream(serverRequest: ServerRequest, maxBytes: Option[Long]): streams.BinaryStream = ??? +} diff --git a/project/Versions.scala b/project/Versions.scala index d061fd7916..4e937a3e46 100644 --- a/project/Versions.scala +++ b/project/Versions.scala @@ -43,6 +43,8 @@ object Versions { val zioInteropCats = "23.1.0.13" val zioInteropReactiveStreams = "2.0.2" val zioJson = "0.7.44" + val zioLogging = "2.5.2" + val zioOpenTelemetry = "3.1.15" val playClient = "3.0.12" val playServer = "3.0.10" val play29Client = "2.2.16" @@ -64,7 +66,7 @@ object Versions { val decline = "2.6.2" val quicklens = "1.9.12" val openTelemetry = "1.62.0" - val openTelemetrySemconvVersion = "1.41.1" + val openTelemetrySemconvVersion = "1.41.0" val mockServer = "5.15.0" val dogstatsdClient = "4.4.5" val nettyAll = "4.2.14.Final"