Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package at.ac.uibk.dps.cirrina.execution.`object`
import at.ac.uibk.dps.cirrina.csm.Csml.EventChannel
import at.ac.uibk.dps.cirrina.execution.service.ServiceImplementationSelector
import com.codahale.metrics.MetricRegistry
import kotlin.time.measureTime
import kotlin.time.toJavaDuration
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import mu.KotlinLogging
Expand Down Expand Up @@ -45,18 +47,21 @@ class ActionExecutor(
val input = action.input.map { it.evaluate(scope.extent) }

coroutineScope.launch {
runCatching { service.invoke(input) }
.onSuccess { output ->
action.emits.forEach { eventTemplate ->
val emittedEvent = eventTemplate.copy(data = output)
if (emittedEvent.channel == EventChannel.INTERNAL) {
eventHandler.propagateToParent(emittedEvent)
} else {
eventHandler.emit(emittedEvent)
val delta = measureTime {
runCatching { service.invoke(input) }
.onSuccess { output ->
action.emits.forEach { eventTemplate ->
val emittedEvent = eventTemplate.copy(data = output)
if (emittedEvent.channel == EventChannel.INTERNAL) {
eventHandler.propagateToParent(emittedEvent)
} else {
eventHandler.emit(emittedEvent)
}
}
}
}
.onFailure { logger.error(it) { "service invocation failed" } }
.onFailure { logger.error(it) { "service invocation failed" } }
}
metricRegistry.timer("invoke.time").update(delta.toJavaDuration())
}

return emptyList()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import dagger.assisted.AssistedInject
import java.util.concurrent.TimeUnit
import kotlin.properties.Delegates
import kotlin.time.Clock
import kotlin.time.measureTime
import kotlin.time.toJavaDuration
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.onFailure
Expand Down Expand Up @@ -100,6 +102,8 @@ internal constructor(

private val eventTimer: Timer = runtime.metricRegistry.timer("event.latency")

private val processEventTimer: Timer = runtime.metricRegistry.timer("processEvent.time")

init {
val instanceData = Context.from(instance.data).getAll()
val transientContext = Context.from(specification.transient)
Expand Down Expand Up @@ -154,19 +158,26 @@ internal constructor(
}

private fun processEvent(event: Event) {
if (isTerminated()) return
val delta = measureTime {
if (isTerminated()) return

if (
(event.channel == EventChannel.EXTERNAL || event.channel == EventChannel.PERIPHERAL) &&
event.source != name
) {
val now = Clock.System.now()
val nowNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond
val deltaNanos = (nowNanos - event.emittedTime).coerceAtLeast(0L)

eventTimer.update(deltaNanos, TimeUnit.NANOSECONDS)
}

if (event.channel == EventChannel.EXTERNAL && event.source != name) {
val now = Clock.System.now()
val nowNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond
val deltaNanos = (nowNanos - event.emittedTime).coerceAtLeast(0L)
handleEvent(event)?.let { transition -> step(transition) }

eventTimer.update(deltaNanos, TimeUnit.NANOSECONDS)
if (event.channel == EventChannel.INTERNAL) stateMachineEventHandler.propagateToNested(event)
}

handleEvent(event)?.let { transition -> step(transition) }

if (event.channel == EventChannel.INTERNAL) stateMachineEventHandler.propagateToNested(event)
processEventTimer.update(delta.toJavaDuration())
}

private fun handleEvent(event: Event): ActiveTransition? {
Expand Down
Loading