From 983ac0e373f3605ad767c0796499d35ab5392b11 Mon Sep 17 00:00:00 2001 From: AlexZangerle Date: Tue, 7 Apr 2026 10:57:51 +0200 Subject: [PATCH] feat: adding invoke and event processing timers --- .../cirrina/execution/object/ActionCommand.kt | 25 +++++++++------- .../cirrina/execution/object/StateMachine.kt | 29 +++++++++++++------ 2 files changed, 35 insertions(+), 19 deletions(-) diff --git a/src/main/kotlin/at/ac/uibk/dps/cirrina/execution/object/ActionCommand.kt b/src/main/kotlin/at/ac/uibk/dps/cirrina/execution/object/ActionCommand.kt index d02f2c3c..0a115cff 100644 --- a/src/main/kotlin/at/ac/uibk/dps/cirrina/execution/object/ActionCommand.kt +++ b/src/main/kotlin/at/ac/uibk/dps/cirrina/execution/object/ActionCommand.kt @@ -3,6 +3,8 @@ package at.ac.uibk.dps.cirrina.execution.`object` import at.ac.uibk.dps.cirrina.csm.Csml.EventChannel import at.ac.uibk.dps.cirrina.execution.service.ServiceImplementationSelector import com.codahale.metrics.MetricRegistry +import kotlin.time.measureTime +import kotlin.time.toJavaDuration import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.launch import mu.KotlinLogging @@ -45,18 +47,21 @@ class ActionExecutor( val input = action.input.map { it.evaluate(scope.extent) } coroutineScope.launch { - runCatching { service.invoke(input) } - .onSuccess { output -> - action.emits.forEach { eventTemplate -> - val emittedEvent = eventTemplate.copy(data = output) - if (emittedEvent.channel == EventChannel.INTERNAL) { - eventHandler.propagateToParent(emittedEvent) - } else { - eventHandler.emit(emittedEvent) + val delta = measureTime { + runCatching { service.invoke(input) } + .onSuccess { output -> + action.emits.forEach { eventTemplate -> + val emittedEvent = eventTemplate.copy(data = output) + if (emittedEvent.channel == EventChannel.INTERNAL) { + eventHandler.propagateToParent(emittedEvent) + } else { + eventHandler.emit(emittedEvent) + } } } - } - .onFailure { logger.error(it) { "service invocation failed" } } + .onFailure { logger.error(it) { "service invocation failed" } } + } + metricRegistry.timer("invoke.time").update(delta.toJavaDuration()) } return emptyList() diff --git a/src/main/kotlin/at/ac/uibk/dps/cirrina/execution/object/StateMachine.kt b/src/main/kotlin/at/ac/uibk/dps/cirrina/execution/object/StateMachine.kt index bb5b438d..280a2296 100644 --- a/src/main/kotlin/at/ac/uibk/dps/cirrina/execution/object/StateMachine.kt +++ b/src/main/kotlin/at/ac/uibk/dps/cirrina/execution/object/StateMachine.kt @@ -13,6 +13,8 @@ import dagger.assisted.AssistedInject import java.util.concurrent.TimeUnit import kotlin.properties.Delegates import kotlin.time.Clock +import kotlin.time.measureTime +import kotlin.time.toJavaDuration import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.onFailure @@ -100,6 +102,8 @@ internal constructor( private val eventTimer: Timer = runtime.metricRegistry.timer("event.latency") + private val processEventTimer: Timer = runtime.metricRegistry.timer("processEvent.time") + init { val instanceData = Context.from(instance.data).getAll() val transientContext = Context.from(specification.transient) @@ -154,19 +158,26 @@ internal constructor( } private fun processEvent(event: Event) { - if (isTerminated()) return + val delta = measureTime { + if (isTerminated()) return + + if ( + (event.channel == EventChannel.EXTERNAL || event.channel == EventChannel.PERIPHERAL) && + event.source != name + ) { + val now = Clock.System.now() + val nowNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond + val deltaNanos = (nowNanos - event.emittedTime).coerceAtLeast(0L) + + eventTimer.update(deltaNanos, TimeUnit.NANOSECONDS) + } - if (event.channel == EventChannel.EXTERNAL && event.source != name) { - val now = Clock.System.now() - val nowNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond - val deltaNanos = (nowNanos - event.emittedTime).coerceAtLeast(0L) + handleEvent(event)?.let { transition -> step(transition) } - eventTimer.update(deltaNanos, TimeUnit.NANOSECONDS) + if (event.channel == EventChannel.INTERNAL) stateMachineEventHandler.propagateToNested(event) } - handleEvent(event)?.let { transition -> step(transition) } - - if (event.channel == EventChannel.INTERNAL) stateMachineEventHandler.propagateToNested(event) + processEventTimer.update(delta.toJavaDuration()) } private fun handleEvent(event: Event): ActiveTransition? {