From 0f0b5b6721d6b12bb82ae84ba23a8b614de6fa25 Mon Sep 17 00:00:00 2001 From: Jasper Date: Fri, 20 Mar 2026 12:09:27 +0100 Subject: [PATCH 1/4] Adding pipeline telemetry observable --- .../LightTransitionNodeExtensions.cs | 3 +- .../Nodes/LightTransitionNode.cs | 12 ++ .../Nodes/ServiceScopedNode.cs | 12 +- .../Pipeline/LightPipelineFactory.cs | 22 +-- .../Pipeline/ServiceScopedPipeline.cs | 4 + src/CodeCasa.AutomationPipelines/IPipeline.cs | 4 + .../IPipelineNode.cs | 7 +- src/CodeCasa.AutomationPipelines/Pipeline.cs | 160 +++++++----------- .../PipelineLogger.cs | 31 ++++ .../PipelineNode.cs | 23 ++- .../PipelineTelemetry.cs | 11 ++ .../ServiceProviderPipeline.cs | 29 ---- .../Generated/BinarySensorAttributes.cs | 4 +- .../Generated/NumericSensorAttributes.cs | 4 +- 14 files changed, 163 insertions(+), 163 deletions(-) create mode 100644 src/CodeCasa.AutomationPipelines/PipelineLogger.cs create mode 100644 src/CodeCasa.AutomationPipelines/PipelineTelemetry.cs diff --git a/src/CodeCasa.AutomationPipelines.Lights/Extensions/LightTransitionNodeExtensions.cs b/src/CodeCasa.AutomationPipelines.Lights/Extensions/LightTransitionNodeExtensions.cs index 86a67c8..9b3a240 100644 --- a/src/CodeCasa.AutomationPipelines.Lights/Extensions/LightTransitionNodeExtensions.cs +++ b/src/CodeCasa.AutomationPipelines.Lights/Extensions/LightTransitionNodeExtensions.cs @@ -1,5 +1,4 @@ -using System.Reactive; -using System.Reactive.Concurrency; +using System.Reactive.Concurrency; using System.Reactive.Linq; using CodeCasa.AutomationPipelines.Lights.Nodes; using CodeCasa.Lights; diff --git a/src/CodeCasa.AutomationPipelines.Lights/Nodes/LightTransitionNode.cs b/src/CodeCasa.AutomationPipelines.Lights/Nodes/LightTransitionNode.cs index 71492fe..9aaedd0 100644 --- a/src/CodeCasa.AutomationPipelines.Lights/Nodes/LightTransitionNode.cs +++ b/src/CodeCasa.AutomationPipelines.Lights/Nodes/LightTransitionNode.cs @@ -166,5 +166,17 @@ private void SetOutputInternal(LightTransition? output) /// public override string ToString() => GetType().Name; + + /// + public ValueTask DisposeAsync() + { + if (_newOutputSubject.IsDisposed) + { + return ValueTask.CompletedTask; + } + _newOutputSubject.OnCompleted(); + _newOutputSubject.Dispose(); + return ValueTask.CompletedTask; + } } } diff --git a/src/CodeCasa.AutomationPipelines.Lights/Nodes/ServiceScopedNode.cs b/src/CodeCasa.AutomationPipelines.Lights/Nodes/ServiceScopedNode.cs index 1d982ba..6f89aa6 100644 --- a/src/CodeCasa.AutomationPipelines.Lights/Nodes/ServiceScopedNode.cs +++ b/src/CodeCasa.AutomationPipelines.Lights/Nodes/ServiceScopedNode.cs @@ -6,12 +6,6 @@ namespace CodeCasa.AutomationPipelines.Lights.Nodes internal class ServiceScopedNode(IServiceScope serviceScope, IPipelineNode innerNode) : IPipelineNode, IAsyncDisposable { - public async ValueTask DisposeAsync() - { - await serviceScope.DisposeOrDisposeAsync(); - await innerNode.DisposeOrDisposeAsync(); - } - public TState? Input { get => innerNode.Input; @@ -22,5 +16,11 @@ public TState? Input public IObservable OnNewOutput => innerNode.OnNewOutput; public override string? ToString() => $"{innerNode} (scoped)"; + + public async ValueTask DisposeAsync() + { + await serviceScope.DisposeOrDisposeAsync(); + await innerNode.DisposeOrDisposeAsync(); + } } } diff --git a/src/CodeCasa.AutomationPipelines.Lights/Pipeline/LightPipelineFactory.cs b/src/CodeCasa.AutomationPipelines.Lights/Pipeline/LightPipelineFactory.cs index 0d63861..ce29a50 100644 --- a/src/CodeCasa.AutomationPipelines.Lights/Pipeline/LightPipelineFactory.cs +++ b/src/CodeCasa.AutomationPipelines.Lights/Pipeline/LightPipelineFactory.cs @@ -38,7 +38,7 @@ public IAsyncDisposable SetupLightPipeline(TLight light, /// The light to create a pipeline for. /// An action to configure the pipeline behavior. /// A configured pipeline for controlling the specified light. - internal IPipeline CreateLightPipeline(TLight light, Action> pipelineBuilder) where TLight : ILight + public IPipeline CreateLightPipeline(TLight light, Action> pipelineBuilder) where TLight : ILight { return CreateLightPipelines([light], pipelineBuilder)[light.Id]; } @@ -79,22 +79,14 @@ internal Dictionary> CreateLightPipelines kvp.Key, kvp => { var conf = kvp.Value; - IPipeline pipeline; + IPipeline pipeline = new Pipeline( + LightTransition.Off(), + conf.Nodes, + conf.Light.ApplyTransition); if (conf.LoggingEnabled ?? false) { - pipeline = new Pipeline( - $"[{conf.Light.Id}] {conf.LogName}", - LightTransition.Off(), - conf.Nodes, - conf.Light.ApplyTransition, - logger); - } - else - { - pipeline = new Pipeline( - LightTransition.Off(), - conf.Nodes, - conf.Light.ApplyTransition); + var pipelineLogger = new PipelineLogger(logger, $"[{conf.Light.Id}] {conf.LogName}"); + pipeline.Telemetry.Subscribe(t => pipelineLogger.Log(t)); } return (IPipeline)new ServiceScopedPipeline(lightContextScopes[kvp.Key], pipeline); diff --git a/src/CodeCasa.AutomationPipelines.Lights/Pipeline/ServiceScopedPipeline.cs b/src/CodeCasa.AutomationPipelines.Lights/Pipeline/ServiceScopedPipeline.cs index 9b4f227..7c2b0a2 100644 --- a/src/CodeCasa.AutomationPipelines.Lights/Pipeline/ServiceScopedPipeline.cs +++ b/src/CodeCasa.AutomationPipelines.Lights/Pipeline/ServiceScopedPipeline.cs @@ -54,4 +54,8 @@ public IPipeline SetOutputHandler(Action action, bool callActionDi _instance.SetOutputHandler(action, callActionDistinct); return this; } + + public IReadOnlyCollection> Nodes => _instance.Nodes; + + public IObservable> Telemetry => _instance.Telemetry; } \ No newline at end of file diff --git a/src/CodeCasa.AutomationPipelines/IPipeline.cs b/src/CodeCasa.AutomationPipelines/IPipeline.cs index 3e2f28a..282e490 100644 --- a/src/CodeCasa.AutomationPipelines/IPipeline.cs +++ b/src/CodeCasa.AutomationPipelines/IPipeline.cs @@ -25,4 +25,8 @@ public interface IPipeline : IPipelineNode, IAsyncDisposable /// This method can be called at any time during the creation of the pipeline and will be called immediately if the pipeline has already produced an output. /// IPipeline SetOutputHandler(Action action, bool callActionDistinct = true); + + IReadOnlyCollection> Nodes { get; } + + IObservable> Telemetry { get; } } \ No newline at end of file diff --git a/src/CodeCasa.AutomationPipelines/IPipelineNode.cs b/src/CodeCasa.AutomationPipelines/IPipelineNode.cs index 24c4ec2..b4dcef9 100644 --- a/src/CodeCasa.AutomationPipelines/IPipelineNode.cs +++ b/src/CodeCasa.AutomationPipelines/IPipelineNode.cs @@ -1,9 +1,11 @@ -namespace CodeCasa.AutomationPipelines; +using System.Text.Json.Serialization; + +namespace CodeCasa.AutomationPipelines; /// /// Represents a node in a pipeline. /// -public interface IPipelineNode +public interface IPipelineNode : IAsyncDisposable { /// /// Sets the input state of the node. This will trigger the processing of the input. @@ -18,5 +20,6 @@ public interface IPipelineNode /// /// Notifies when a new output is produced by the node. /// + [JsonIgnore] IObservable OnNewOutput { get; } } \ No newline at end of file diff --git a/src/CodeCasa.AutomationPipelines/Pipeline.cs b/src/CodeCasa.AutomationPipelines/Pipeline.cs index 362eadd..04a9cb3 100644 --- a/src/CodeCasa.AutomationPipelines/Pipeline.cs +++ b/src/CodeCasa.AutomationPipelines/Pipeline.cs @@ -1,4 +1,6 @@ -using Microsoft.Extensions.Logging; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Xml.Linq; namespace CodeCasa.AutomationPipelines; @@ -9,7 +11,7 @@ public class Pipeline : PipelineNode, IPipeline { private readonly Lock _lock = new(); private readonly List> _nodes = new(); - private readonly ILogger>? _logger; + private readonly Subject> _telemetrySubject = new(); private bool _callActionDistinct = true; private Action? _action; @@ -27,22 +29,6 @@ public Pipeline() /// Initializes a new pipeline with the specified nodes. /// public Pipeline(IEnumerable> nodes) - : this(null, nodes, null!) - { - } - - /// - /// Initializes a new pipeline with the specified default state, nodes, and output handler. - /// - public Pipeline(TState defaultState, IEnumerable> nodes, Action outputHandlerAction) - : this(null, defaultState, nodes, outputHandlerAction, null!) - { - } - - /// - /// Initializes a new pipeline with the specified nodes. - /// - public Pipeline(params IPipelineNode[] nodes) { foreach (var node in nodes) { @@ -51,9 +37,9 @@ public Pipeline(params IPipelineNode[] nodes) } /// - /// Initializes a new pipeline with the specified default state and nodes. + /// Initializes a new pipeline with the specified default state, nodes, and output handler. /// - public Pipeline(TState defaultState, params IPipelineNode[] nodes) + public Pipeline(TState defaultState, IEnumerable> nodes, Action outputHandlerAction) { foreach (var node in nodes) { @@ -61,24 +47,14 @@ public Pipeline(TState defaultState, params IPipelineNode[] nodes) } SetDefault(defaultState); - } - - /// - /// Initializes a new, empty pipeline with an optional name and logger. - /// - public Pipeline(string? name, ILogger> logger) - { - Name = name; - _logger = logger; + SetOutputHandler(outputHandlerAction); } /// - /// Initializes a new pipeline with the specified nodes and an optional name and logger. + /// Initializes a new pipeline with the specified nodes. /// - public Pipeline(string? name, IEnumerable> nodes, ILogger> logger) + public Pipeline(params IPipelineNode[] nodes) { - Name = name; - _logger = logger; foreach (var node in nodes) { RegisterNode(node); @@ -86,27 +62,23 @@ public Pipeline(string? name, IEnumerable> nodes, ILogger< } /// - /// Initializes a new pipeline with the specified default state, nodes, output handler, and an optional name and logger. + /// Initializes a new pipeline with the specified default state and nodes. /// - public Pipeline(string? name, TState defaultState, IEnumerable> nodes, Action outputHandlerAction, ILogger> logger) + public Pipeline(TState defaultState, params IPipelineNode[] nodes) { - Name = name; - _logger = logger; foreach (var node in nodes) { RegisterNode(node); } SetDefault(defaultState); - SetOutputHandler(outputHandlerAction); } - /// - /// Name of the pipeline (used for logging). - /// - public string? Name { get; set; } + /// + public IReadOnlyCollection> Nodes => _nodes.AsReadOnly(); - private string LogPrefix => Name == null ? "" : $"{Name}: "; + /// + public IObservable> Telemetry => _telemetrySubject.AsObservable(); /// public IPipeline SetDefault(TState state) @@ -126,8 +98,6 @@ public virtual IPipeline RegisterNode() where TNode : IPipelineNo /// public IPipeline RegisterNode(IPipelineNode node) { - _logger?.LogTrace($"{LogPrefix}Registering [Node {_nodes.Count}] ({node})."); - _subscription?.Dispose(); // Dispose old subscription if any. if (_nodes.Any()) @@ -139,13 +109,24 @@ public IPipeline RegisterNode(IPipelineNode node) { lock (_lock) { - _logger?.LogTrace( - $"{LogPrefix}[Node {sourceIndex}] ({previousNode}) passed value [{output?.ToString() ?? "NULL"}] to [Node {destinationIndex}] ({node})."); + _telemetrySubject.OnNext(new PipelineTelemetry( + sourceIndex, + previousNode.ToString(), + destinationIndex, + node.ToString(), + previousNode.Output + )); node.Input = output; } }); - _logger?.LogTrace($"{LogPrefix}Passing [Node {sourceIndex}] ({previousNode}) value [{previousNode.Output?.ToString() ?? "NULL"}] to [Node {destinationIndex}] ({node})."); + _telemetrySubject.OnNext(new PipelineTelemetry( + sourceIndex, + previousNode.ToString(), + destinationIndex, + node.ToString(), + previousNode.Output + )); node.Input = previousNode.Output; } else @@ -164,14 +145,24 @@ public IPipeline RegisterNode(IPipelineNode node) { lock (_lock) { - _logger?.LogTrace( - $"{LogPrefix}[Node {nodeIndex}] ({node}) passed value [{o?.ToString() ?? "NULL"}] to pipeline output."); + _telemetrySubject.OnNext(new PipelineTelemetry( + nodeIndex, + node.ToString(), + null, null, + o + )); + SetOutputAndCallActionWhenApplicable(o); } }); var newOutput = node.Output; - _logger?.LogTrace($"{LogPrefix}[Node {_nodes.Count - 1}] ({node}) registered and passed value [{newOutput?.ToString() ?? "NULL"}] to pipeline output."); + _telemetrySubject.OnNext(new PipelineTelemetry( + _nodes.Count - 1, + node.ToString(), + null, null, + newOutput + )); SetOutputAndCallActionWhenApplicable(newOutput); return this; @@ -180,20 +171,11 @@ public IPipeline RegisterNode(IPipelineNode node) /// public IPipeline SetOutputHandler(Action action, bool callActionDistinct = true) { - _logger?.LogTrace(callActionDistinct - ? $"{LogPrefix}Setting output handler." - : $"{LogPrefix}Setting output handler. Action calls with duplicate values are allowed."); - _callActionDistinct = callActionDistinct; _action = action; if (Output != null) { _action(Output); - _logger?.LogDebug($"{LogPrefix}Action executed with current output [{Output?.ToString() ?? "NULL"}]."); - } - else - { - _logger?.LogTrace($"{LogPrefix}No output value to execute."); } return this; @@ -204,13 +186,20 @@ protected override void InputReceived(TState? state) { if (!_nodes.Any()) { - _logger?.LogTrace($"{LogPrefix}Input set to [{state?.ToString() ?? "NULL"}]. No nodes registered, passing to pipeline output immediately."); + _telemetrySubject.OnNext(new PipelineTelemetry( + null, null, null, null, + Input + )); SetOutputAndCallActionWhenApplicable(Input); return; } var firstNode = _nodes.First(); - _logger?.LogTrace($"{LogPrefix}Input set to [{state?.ToString() ?? "NULL"}]. Passing input to first [Node 0] ({firstNode})."); + _telemetrySubject.OnNext(new PipelineTelemetry( + null, null, + 0, firstNode.ToString(), + Input + )); firstNode.Input = Input; } @@ -219,59 +208,36 @@ private void SetOutputAndCallActionWhenApplicable(TState? output) var outputChanged = !EqualityComparer.Default.Equals(Output, output); Output = output; - if (_action == null) - { - _logger?.LogTrace($"{LogPrefix}No action set to execute."); - return; - } - if (output == null) + if (_action == null || output == null) { - _logger?.LogTrace($"{LogPrefix}No output value to execute."); return; } - if (_callActionDistinct && !outputChanged) { - _logger?.LogTrace($"{LogPrefix}No action executed as output has not changed."); return; } // Note that _action will be called AFTER OnNewOutput. _action.Invoke(output); - _logger?.LogDebug($"{LogPrefix}Action executed with output [{Output?.ToString() ?? "NULL"}]."); } /// - public virtual async ValueTask DisposeAsync() + public override async ValueTask DisposeAsync() { - if (_isDisposed) return; + if (_isDisposed) + { + return; + } _isDisposed = true; + await base.DisposeAsync(); + + _telemetrySubject.OnCompleted(); + _telemetrySubject.Dispose(); + foreach (var node in _nodes) { - switch (node) - { - case IAsyncDisposable asyncDisposable: - try - { - await asyncDisposable.DisposeAsync().ConfigureAwait(false); - } - catch (Exception ex) - { - _logger?.LogError(ex, $"{LogPrefix}Exception when trying to dispose {node}."); - } - break; - case IDisposable disposable: - try - { - disposable.Dispose(); - } - catch (Exception ex) - { - _logger?.LogError(ex, $"{LogPrefix}Exception when trying to dispose {node}."); - } - break; - } + await node.DisposeAsync().ConfigureAwait(false); } } } \ No newline at end of file diff --git a/src/CodeCasa.AutomationPipelines/PipelineLogger.cs b/src/CodeCasa.AutomationPipelines/PipelineLogger.cs new file mode 100644 index 0000000..494b399 --- /dev/null +++ b/src/CodeCasa.AutomationPipelines/PipelineLogger.cs @@ -0,0 +1,31 @@ +using Microsoft.Extensions.Logging; + +namespace CodeCasa.AutomationPipelines +{ + public class PipelineLogger(ILogger>? logger, string? name) + { + public void Log(PipelineTelemetry pipelineTelemetry) + { + if (pipelineTelemetry.SourceNodeIndex == null && pipelineTelemetry.DestinationNodeIndex == null) + { + logger?.LogTrace($"{LogPrefix}Input set to [{pipelineTelemetry.StateValue?.ToString() ?? "NULL"}]. No nodes registered, passing to pipeline output immediately."); + return; + } + + if (pipelineTelemetry.SourceNodeIndex == null) + { + logger?.LogTrace($"{LogPrefix}Input set to [{pipelineTelemetry.StateValue?.ToString() ?? "NULL"}]. Passing input to first [Node {pipelineTelemetry.DestinationNodeIndex}] ({pipelineTelemetry.DestinationNodeName})."); + return; + } + if (pipelineTelemetry.DestinationNodeIndex == null) + { + logger?.LogTrace( + $"{LogPrefix}[Node {pipelineTelemetry.SourceNodeIndex}] ({pipelineTelemetry.SourceNodeName}) passed value [{pipelineTelemetry.StateValue?.ToString() ?? "NULL"}] to pipeline output."); + return; + } + logger?.LogTrace($"{LogPrefix}Passing [Node {pipelineTelemetry.SourceNodeIndex}] ({pipelineTelemetry.SourceNodeName}) value [{pipelineTelemetry.StateValue?.ToString() ?? "NULL"}] to [Node {pipelineTelemetry.DestinationNodeIndex}] ({pipelineTelemetry.DestinationNodeName})."); + } + + private string LogPrefix => name == null ? "" : $"{name}: "; + } +} diff --git a/src/CodeCasa.AutomationPipelines/PipelineNode.cs b/src/CodeCasa.AutomationPipelines/PipelineNode.cs index 1ca5abd..59532e5 100644 --- a/src/CodeCasa.AutomationPipelines/PipelineNode.cs +++ b/src/CodeCasa.AutomationPipelines/PipelineNode.cs @@ -13,7 +13,6 @@ public abstract class PipelineNode : IPipelineNode private TState? _input; private TState? _output; private bool _passThroughNextInput; - private bool _passThrough; /// public IObservable OnNewOutput => _newOutputSubject.AsObservable(); @@ -69,19 +68,19 @@ protected set /// protected bool PassThrough { - get => _passThrough; + get; set { // Always reset _passThroughNextInput when PassThrough is explicitly called. _passThroughNextInput = false; - if (_passThrough == value) + if (field == value) { return; } - - _passThrough = value; - if (_passThrough) + + field = value; + if (field) { SetOutputInternal(_input); } @@ -120,4 +119,16 @@ private void SetOutputInternal(TState? output) /// public override string ToString() => GetType().Name; + + /// + public virtual ValueTask DisposeAsync() + { + if (_newOutputSubject.IsDisposed) + { + return ValueTask.CompletedTask; + } + _newOutputSubject.OnCompleted(); + _newOutputSubject.Dispose(); + return ValueTask.CompletedTask; + } } \ No newline at end of file diff --git a/src/CodeCasa.AutomationPipelines/PipelineTelemetry.cs b/src/CodeCasa.AutomationPipelines/PipelineTelemetry.cs new file mode 100644 index 0000000..a0454b5 --- /dev/null +++ b/src/CodeCasa.AutomationPipelines/PipelineTelemetry.cs @@ -0,0 +1,11 @@ + +namespace CodeCasa.AutomationPipelines +{ + public record PipelineTelemetry( + int? SourceNodeIndex, + string? SourceNodeName, + int? DestinationNodeIndex, + string? DestinationNodeName, + TState? StateValue + ); +} diff --git a/src/CodeCasa.AutomationPipelines/ServiceProviderPipeline.cs b/src/CodeCasa.AutomationPipelines/ServiceProviderPipeline.cs index 220fe78..86801c2 100644 --- a/src/CodeCasa.AutomationPipelines/ServiceProviderPipeline.cs +++ b/src/CodeCasa.AutomationPipelines/ServiceProviderPipeline.cs @@ -1,5 +1,4 @@ using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; namespace CodeCasa.AutomationPipelines; @@ -56,34 +55,6 @@ public ServiceProviderPipeline(IServiceProvider serviceProvider, TState defaultS _serviceProvider = _serviceScope.ServiceProvider; } - /// - public ServiceProviderPipeline(IServiceProvider serviceProvider, string? name, ILogger> logger) : base(name, logger) - { - _serviceScope = serviceProvider.CreateScope(); - _serviceProvider = _serviceScope.ServiceProvider; - } - - /// - public ServiceProviderPipeline(IServiceProvider serviceProvider, string? name, IEnumerable> nodes, ILogger> logger) - : base(name, nodes, logger) - { - _serviceScope = serviceProvider.CreateScope(); - _serviceProvider = _serviceScope.ServiceProvider; - } - - /// - public ServiceProviderPipeline( - IServiceProvider serviceProvider, - string? name, - TState defaultState, - IEnumerable> nodes, - Action outputHandlerAction, ILogger> logger) - : base(name, defaultState, nodes, outputHandlerAction, logger) - { - _serviceScope = serviceProvider.CreateScope(); - _serviceProvider = _serviceScope.ServiceProvider; - } - /// /// Registers a new node in the pipeline. The node will be created using the service provider. /// diff --git a/src/CodeCasa.NetDaemon.Sensors.Composite/Generated/BinarySensorAttributes.cs b/src/CodeCasa.NetDaemon.Sensors.Composite/Generated/BinarySensorAttributes.cs index 04f35d2..6d8201b 100644 --- a/src/CodeCasa.NetDaemon.Sensors.Composite/Generated/BinarySensorAttributes.cs +++ b/src/CodeCasa.NetDaemon.Sensors.Composite/Generated/BinarySensorAttributes.cs @@ -1,6 +1,4 @@ -using System.Text.Json.Serialization; - -namespace CodeCasa.NetDaemon.Sensors.Composite.Generated; +namespace CodeCasa.NetDaemon.Sensors.Composite.Generated; internal partial record BinarySensorAttributes { diff --git a/src/CodeCasa.NetDaemon.Sensors.Composite/Generated/NumericSensorAttributes.cs b/src/CodeCasa.NetDaemon.Sensors.Composite/Generated/NumericSensorAttributes.cs index 4b30b4c..d906c55 100644 --- a/src/CodeCasa.NetDaemon.Sensors.Composite/Generated/NumericSensorAttributes.cs +++ b/src/CodeCasa.NetDaemon.Sensors.Composite/Generated/NumericSensorAttributes.cs @@ -1,6 +1,4 @@ -using System.Text.Json.Serialization; - -namespace CodeCasa.NetDaemon.Sensors.Composite.Generated; +namespace CodeCasa.NetDaemon.Sensors.Composite.Generated; internal partial record NumericSensorAttributes { From d9f4aa63de59c54b689eb4e44ae06369bd14ae69 Mon Sep 17 00:00:00 2001 From: Jasper Date: Fri, 20 Mar 2026 12:09:39 +0100 Subject: [PATCH 2/4] Removing json ignore. --- src/CodeCasa.AutomationPipelines/IPipelineNode.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/CodeCasa.AutomationPipelines/IPipelineNode.cs b/src/CodeCasa.AutomationPipelines/IPipelineNode.cs index b4dcef9..41ff993 100644 --- a/src/CodeCasa.AutomationPipelines/IPipelineNode.cs +++ b/src/CodeCasa.AutomationPipelines/IPipelineNode.cs @@ -20,6 +20,5 @@ public interface IPipelineNode : IAsyncDisposable /// /// Notifies when a new output is produced by the node. /// - [JsonIgnore] IObservable OnNewOutput { get; } } \ No newline at end of file From 069c39dbd05e4fe96d0a09cf5d4ea53db9179b2d Mon Sep 17 00:00:00 2001 From: Jasper Date: Fri, 20 Mar 2026 12:20:09 +0100 Subject: [PATCH 3/4] Moved pipeline logger --- .../Pipeline}/PipelineLogger.cs | 4 ++-- src/CodeCasa.AutomationPipelines/IPipeline.cs | 6 ++++++ src/CodeCasa.AutomationPipelines/IPipelineNode.cs | 3 +-- src/CodeCasa.AutomationPipelines/PipelineTelemetry.cs | 9 +++++++++ 4 files changed, 18 insertions(+), 4 deletions(-) rename src/{CodeCasa.AutomationPipelines => CodeCasa.AutomationPipelines.Lights/Pipeline}/PipelineLogger.cs (91%) diff --git a/src/CodeCasa.AutomationPipelines/PipelineLogger.cs b/src/CodeCasa.AutomationPipelines.Lights/Pipeline/PipelineLogger.cs similarity index 91% rename from src/CodeCasa.AutomationPipelines/PipelineLogger.cs rename to src/CodeCasa.AutomationPipelines.Lights/Pipeline/PipelineLogger.cs index 494b399..a5ee1d6 100644 --- a/src/CodeCasa.AutomationPipelines/PipelineLogger.cs +++ b/src/CodeCasa.AutomationPipelines.Lights/Pipeline/PipelineLogger.cs @@ -1,8 +1,8 @@ using Microsoft.Extensions.Logging; -namespace CodeCasa.AutomationPipelines +namespace CodeCasa.AutomationPipelines.Lights.Pipeline { - public class PipelineLogger(ILogger>? logger, string? name) + internal class PipelineLogger(ILogger>? logger, string? name) { public void Log(PipelineTelemetry pipelineTelemetry) { diff --git a/src/CodeCasa.AutomationPipelines/IPipeline.cs b/src/CodeCasa.AutomationPipelines/IPipeline.cs index 282e490..37922f1 100644 --- a/src/CodeCasa.AutomationPipelines/IPipeline.cs +++ b/src/CodeCasa.AutomationPipelines/IPipeline.cs @@ -26,7 +26,13 @@ public interface IPipeline : IPipelineNode, IAsyncDisposable /// IPipeline SetOutputHandler(Action action, bool callActionDistinct = true); + /// + /// Gets the collection of nodes registered in the pipeline. + /// IReadOnlyCollection> Nodes { get; } + /// + /// Gets an observable stream of telemetry events that occur during pipeline execution. + /// IObservable> Telemetry { get; } } \ No newline at end of file diff --git a/src/CodeCasa.AutomationPipelines/IPipelineNode.cs b/src/CodeCasa.AutomationPipelines/IPipelineNode.cs index 41ff993..2fa2169 100644 --- a/src/CodeCasa.AutomationPipelines/IPipelineNode.cs +++ b/src/CodeCasa.AutomationPipelines/IPipelineNode.cs @@ -1,5 +1,4 @@ -using System.Text.Json.Serialization; - + namespace CodeCasa.AutomationPipelines; /// diff --git a/src/CodeCasa.AutomationPipelines/PipelineTelemetry.cs b/src/CodeCasa.AutomationPipelines/PipelineTelemetry.cs index a0454b5..99c9f27 100644 --- a/src/CodeCasa.AutomationPipelines/PipelineTelemetry.cs +++ b/src/CodeCasa.AutomationPipelines/PipelineTelemetry.cs @@ -1,6 +1,15 @@  namespace CodeCasa.AutomationPipelines { + /// + /// Represents telemetry data for a state transition within a pipeline. + /// + /// The type of state flowing through the pipeline. + /// The index of the source node, or null if the state originates from pipeline input. + /// The name of the source node, or null if the state originates from pipeline input. + /// The index of the destination node, or null if the state is being passed to pipeline output. + /// The name of the destination node, or null if the state is being passed to pipeline output. + /// The state value being transitioned. public record PipelineTelemetry( int? SourceNodeIndex, string? SourceNodeName, From 7d3b6636a1745609373b8050bce63c58d806077b Mon Sep 17 00:00:00 2001 From: Jasper Date: Fri, 20 Mar 2026 12:25:35 +0100 Subject: [PATCH 4/4] Fixed unit test --- .../ReactiveNodeTests.cs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/CodeCasa.AutomationPipelines.Lights.Tests/ReactiveNodeTests.cs b/tests/CodeCasa.AutomationPipelines.Lights.Tests/ReactiveNodeTests.cs index e1ef6ac..cac5d1d 100644 --- a/tests/CodeCasa.AutomationPipelines.Lights.Tests/ReactiveNodeTests.cs +++ b/tests/CodeCasa.AutomationPipelines.Lights.Tests/ReactiveNodeTests.cs @@ -231,6 +231,12 @@ public void Dispose() { _tracker.Disposed(Id); } + + public override ValueTask DisposeAsync() + { + _tracker.Disposed(Id); + return base.DisposeAsync(); + } } public class ContextAwarePipelineNode : PipelineNode