diff --git a/.gitignore b/.gitignore index 84cd19c28e..30d1ee50a4 100644 --- a/.gitignore +++ b/.gitignore @@ -66,3 +66,6 @@ tmp/ # Output directory out/ +data/ +logs/ +storage/ \ No newline at end of file diff --git a/cmd/start.go b/cmd/start.go index 691d666e2f..cb8a015ca5 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -3,6 +3,8 @@ package cmd import ( "context" "fmt" + "time" + "github.com/keep-network/keep-core/pkg/tbtcpg" "github.com/keep-network/keep-common/pkg/persistence" @@ -87,6 +89,23 @@ func start(cmd *cobra.Command) error { blockCounter, ) + // Wire performance metrics into network provider if available + var perfMetrics *clientinfo.PerformanceMetrics + if clientInfoRegistry != nil { + perfMetrics = clientinfo.NewPerformanceMetrics(clientInfoRegistry) + // Type assert to libp2p provider to set metrics recorder + // The provider struct is not exported, so we use interface assertion + if setter, ok := netProvider.(interface { + SetMetricsRecorder(recorder interface { + IncrementCounter(name string, value float64) + SetGauge(name string, value float64) + RecordDuration(name string, duration time.Duration) + }) + }); ok { + setter.SetMetricsRecorder(perfMetrics) + } + } + // Initialize beacon and tbtc only for non-bootstrap nodes. // Skip initialization for bootstrap nodes as they are only used for network // discovery. 
@@ -113,6 +132,16 @@ func start(cmd *cobra.Command) error { clientInfoRegistry.RegisterBtcChainInfoSource(btcChain) + if clientInfoRegistry != nil { + rpcHealthChecker := clientinfo.NewRPCHealthChecker( + clientInfoRegistry, + blockCounter, + btcChain, + clientConfig.ClientInfo.RPCHealthCheckInterval, + ) + rpcHealthChecker.Start(ctx) + } + err = beacon.Initialize( ctx, beaconChain, @@ -140,6 +169,7 @@ func start(cmd *cobra.Command) error { proposalGenerator, clientConfig.Tbtc, clientInfoRegistry, + perfMetrics, // Pass the existing performance metrics instance to avoid duplicate registrations ) if err != nil { return fmt.Errorf("error initializing TBTC: [%v]", err) diff --git a/docs/performance-metrics.adoc b/docs/performance-metrics.adoc new file mode 100644 index 0000000000..be5a550746 --- /dev/null +++ b/docs/performance-metrics.adoc @@ -0,0 +1,238 @@ += Performance Metrics + +The Keep Core client exposes performance metrics that can be used to monitor +the health and performance of node operations. These metrics are available +through the `/metrics` endpoint when the client info endpoint is configured. + +== Metrics Endpoint + +Metrics are exposed via HTTP at the `/metrics` endpoint on the port configured +in the `ClientInfo` section of the configuration file (default: `9601`). 
+ +Example: +---- +curl http://localhost:9601/metrics +---- + +== Metric Types + +The client uses three types of metrics: + +* **Counters**: Cumulative counts that only increase (e.g., total operations) +* **Gauges**: Current values that can go up or down (e.g., queue sizes, active operations) +* **Durations**: Time measurements for operations (exposed as average duration and count) + +== Available Metrics + +=== Distributed Key Generation (DKG) Metrics + +==== `performance_dkg_joined_total` +*Type*: Counter +*Description*: Total number of times the node has joined a DKG process +*Labels*: None + +==== `performance_dkg_failed_total` +*Type*: Counter +*Description*: Total number of failed DKG attempts +*Labels*: None + +==== `performance_dkg_duration_seconds` +*Type*: Gauge (average) +*Description*: Average duration of DKG operations in seconds +*Labels*: None + +==== `performance_dkg_duration_seconds_count` +*Type*: Gauge +*Description*: Total number of DKG operations completed +*Labels*: None + +==== `performance_dkg_validation_total` +*Type*: Counter +*Description*: Total number of DKG result validations performed +*Labels*: None + +==== `performance_dkg_challenges_submitted_total` +*Type*: Counter +*Description*: Total number of DKG challenges submitted on-chain +*Labels*: None + +==== `performance_dkg_approvals_submitted_total` +*Type*: Counter +*Description*: Total number of DKG approvals submitted on-chain +*Labels*: None + +=== Signing Operation Metrics + +==== `performance_signing_operations_total` +*Type*: Counter +*Description*: Total number of signing operations attempted +*Labels*: None + +==== `performance_signing_success_total` +*Type*: Counter +*Description*: Total number of successful signing operations +*Labels*: None + +==== `performance_signing_failed_total` +*Type*: Counter +*Description*: Total number of failed signing operations +*Labels*: None + +==== `performance_signing_duration_seconds` +*Type*: Gauge (average) +*Description*: Average 
duration of signing operations in seconds +*Labels*: None + +==== `performance_signing_duration_seconds_count` +*Type*: Gauge +*Description*: Total number of signing operations completed +*Labels*: None + +==== `performance_signing_timeouts_total` +*Type*: Counter +*Description*: Total number of signing operations that timed out +*Labels*: None + +=== Wallet Action Metrics + +==== `performance_wallet_actions_total` +*Type*: Counter +*Description*: Total number of wallet actions dispatched +*Labels*: None + +==== `performance_wallet_action_success_total` +*Type*: Counter +*Description*: Total number of successfully completed wallet actions +*Labels*: None + +==== `performance_wallet_action_failed_total` +*Type*: Counter +*Description*: Total number of failed wallet actions +*Labels*: None + +==== `performance_wallet_action_duration_seconds` +*Type*: Gauge (average) +*Description*: Average duration of wallet actions in seconds +*Labels*: None + +==== `performance_wallet_action_duration_seconds_count` +*Type*: Gauge +*Description*: Total number of wallet actions completed +*Labels*: None + +==== `performance_wallet_heartbeat_failures_total` +*Type*: Counter +*Description*: Total number of heartbeat failures across all wallets +*Labels*: None + +=== Wallet Dispatcher Metrics + +==== `performance_wallet_dispatcher_active_actions` +*Type*: Gauge +*Description*: Current number of wallets with active actions being executed +*Labels*: None +*Note*: This metric helps identify when wallets are busy and cannot accept new actions + +==== `performance_wallet_dispatcher_rejected_total` +*Type*: Counter +*Description*: Total number of wallet actions rejected because the wallet was busy +*Labels*: None +*Note*: High values indicate that wallets are frequently busy and actions may need retry logic + +=== Coordination Metrics + +==== `performance_coordination_windows_detected_total` +*Type*: Counter +*Description*: Total number of coordination windows detected +*Labels*: None + +==== 
`performance_coordination_procedures_executed_total` +*Type*: Counter +*Description*: Total number of coordination procedures executed +*Labels*: None + +==== `performance_coordination_failed_total` +*Type*: Counter +*Description*: Total number of failed coordination procedures +*Labels*: None + +==== `performance_coordination_duration_seconds` +*Type*: Gauge (average) +*Description*: Average duration of coordination procedures in seconds +*Labels*: None + +=== Network Metrics + +==== `performance_incoming_message_queue_size` +*Type*: Gauge +*Description*: Current size of the incoming message queue +*Labels*: `channel` (channel name) +*Note*: Maximum queue size is 4096. Values approaching this limit indicate message processing bottlenecks. + +==== `performance_message_handler_queue_size` +*Type*: Gauge +*Description*: Current size of message handler queues +*Labels*: `channel` (channel name), `handler` (handler ID) +*Note*: Maximum queue size per handler is 512. + +==== `performance_peer_connections_total` +*Type*: Counter +*Description*: Total number of peer connections established +*Labels*: None + +==== `performance_peer_disconnections_total` +*Type*: Counter +*Description*: Total number of peer disconnections +*Labels*: None + +==== `performance_message_broadcast_total` +*Type*: Counter +*Description*: Total number of messages broadcast to the network +*Labels*: None + +==== `performance_message_received_total` +*Type*: Counter +*Description*: Total number of messages received from the network +*Labels*: None + +==== `performance_ping_test_total` +*Type*: Counter +*Description*: Total number of ping tests performed +*Labels*: None + +==== `performance_ping_test_success_total` +*Type*: Counter +*Description*: Total number of successful ping tests +*Labels*: None + +==== `performance_ping_test_failed_total` +*Type*: Counter +*Description*: Total number of failed ping tests +*Labels*: None + +=== Relay Entry Metrics (Beacon Node) + +==== 
`performance_relay_entry_generation_total` +*Type*: Counter +*Description*: Total number of relay entry generation attempts +*Labels*: None + +==== `performance_relay_entry_success_total` +*Type*: Counter +*Description*: Total number of successful relay entries generated +*Labels*: None + +==== `performance_relay_entry_failed_total` +*Type*: Counter +*Description*: Total number of failed relay entry generations +*Labels*: None + +==== `performance_relay_entry_duration_seconds` +*Type*: Gauge (average) +*Description*: Average duration of relay entry generation in seconds +*Labels*: None + +==== `performance_relay_entry_timeout_reported_total` +*Type*: Counter +*Description*: Total number of relay entry timeouts reported on-chain +*Labels*: None \ No newline at end of file diff --git a/pkg/clientinfo/clientinfo.go b/pkg/clientinfo/clientinfo.go index 2f3c6c6f41..7848aa0ec7 100644 --- a/pkg/clientinfo/clientinfo.go +++ b/pkg/clientinfo/clientinfo.go @@ -13,10 +13,11 @@ var logger = log.Logger("keep-clientinfo") // Config stores configuration for the client info. 
type Config struct { - Port int - NetworkMetricsTick time.Duration - EthereumMetricsTick time.Duration - BitcoinMetricsTick time.Duration + Port int + NetworkMetricsTick time.Duration + EthereumMetricsTick time.Duration + BitcoinMetricsTick time.Duration + RPCHealthCheckInterval time.Duration } // Registry wraps keep-common clientinfo registry and exposes additional diff --git a/pkg/clientinfo/metrics.go b/pkg/clientinfo/metrics.go index 85afdf6b67..c80755cab8 100644 --- a/pkg/clientinfo/metrics.go +++ b/pkg/clientinfo/metrics.go @@ -2,6 +2,7 @@ package clientinfo import ( "fmt" + "strings" "time" "github.com/keep-network/keep-common/pkg/clientinfo" @@ -159,7 +160,16 @@ func (r *Registry) observe( ) { observer, err := r.NewMetricGaugeObserver(name, clientinfo.MetricObserverInput(input)) if err != nil { - logger.Warnf("could not create gauge observer [%v]", name) + // Check if the error is due to metric already existing (expected in some cases) + errStr := err.Error() + if strings.Contains(errStr, "already exists") { + // Metric already registered, this is expected if registerAllMetrics is called multiple times + // or if the same metric is registered in multiple places. Log at debug level. + logger.Debugf("metric [%v] already registered, skipping duplicate registration: %v", name, err) + return + } + // For other errors, log as warning + logger.Warnf("could not create gauge observer [%v]: %v", name, err) return } diff --git a/pkg/clientinfo/performance.go b/pkg/clientinfo/performance.go new file mode 100644 index 0000000000..051212db13 --- /dev/null +++ b/pkg/clientinfo/performance.go @@ -0,0 +1,446 @@ +package clientinfo + +import ( + "sync" + "time" +) + +// PerformanceMetricsRecorder provides a simple interface for recording +// performance metrics. It can be nil if metrics are not enabled. 
+type PerformanceMetricsRecorder interface { + // IncrementCounter increments a counter metric + IncrementCounter(name string, value float64) + // RecordDuration records a duration in seconds + RecordDuration(name string, duration time.Duration) + // SetGauge sets a gauge metric value + SetGauge(name string, value float64) + // GetCounterValue returns current counter value + GetCounterValue(name string) float64 + // GetGaugeValue returns current gauge value + GetGaugeValue(name string) float64 +} + +// PerformanceMetrics provides a way to record performance-related metrics +// including operation counts, durations, and queue sizes. +// It implements PerformanceMetricsRecorder interface. +type PerformanceMetrics struct { + registry *Registry + + // Counters track cumulative counts of events + countersMutex sync.RWMutex + counters map[string]*counter + + // Histograms track distributions of values (like durations) + histogramsMutex sync.RWMutex + histograms map[string]*histogram + + // Gauges track current values (like queue sizes) + gaugesMutex sync.RWMutex + gauges map[string]*gauge +} + +// Ensure PerformanceMetrics implements PerformanceMetricsRecorder +var _ PerformanceMetricsRecorder = (*PerformanceMetrics)(nil) + +type counter struct { + value float64 + mutex sync.RWMutex +} + +type histogram struct { + buckets map[float64]float64 // bucket upper bound -> count + mutex sync.RWMutex +} + +type gauge struct { + value float64 + mutex sync.RWMutex +} + +// NewPerformanceMetrics creates a new performance metrics instance. 
+func NewPerformanceMetrics(registry *Registry) *PerformanceMetrics { + pm := &PerformanceMetrics{ + registry: registry, + counters: make(map[string]*counter), + histograms: make(map[string]*histogram), + gauges: make(map[string]*gauge), + } + + // Register all metrics upfront with 0 values so they appear in /metrics endpoint + pm.registerAllMetrics() + + // Register gauge observers for all gauges + go pm.observeGauges() + + return pm +} + +// registerAllMetrics registers all performance metrics with 0 values +// so they appear in the /metrics endpoint even before operations occur. +func (pm *PerformanceMetrics) registerAllMetrics() { + // Register all counter metrics with 0 initial value + counters := []string{ + MetricDKGJoinedTotal, + MetricDKGFailedTotal, + MetricDKGValidationTotal, + MetricDKGChallengesSubmittedTotal, + MetricDKGApprovalsSubmittedTotal, + MetricSigningOperationsTotal, + MetricSigningSuccessTotal, + MetricSigningFailedTotal, + MetricSigningTimeoutsTotal, + MetricWalletActionsTotal, + MetricWalletActionSuccessTotal, + MetricWalletActionFailedTotal, + MetricWalletHeartbeatFailuresTotal, + MetricCoordinationWindowsDetectedTotal, + MetricCoordinationProceduresExecutedTotal, + MetricCoordinationFailedTotal, + MetricPeerConnectionsTotal, + MetricPeerDisconnectionsTotal, + MetricMessageBroadcastTotal, + MetricMessageReceivedTotal, + MetricPingTestsTotal, + MetricPingTestSuccessTotal, + MetricPingTestFailedTotal, + MetricWalletDispatcherRejectedTotal, + MetricRelayEntryGenerationTotal, + MetricRelayEntrySuccessTotal, + MetricRelayEntryFailedTotal, + MetricRelayEntryTimeoutReportedTotal, + } + + // First, initialize all counters in the map + for _, name := range counters { + pm.counters[name] = &counter{value: 0} + } + + // Then, register observers (this prevents concurrent map read/write) + for _, name := range counters { + metricName := name // Capture for closure + pm.registry.ObserveApplicationSource( + "performance", + map[string]Source{ + 
metricName: func() float64 { + pm.countersMutex.RLock() + c, exists := pm.counters[metricName] + pm.countersMutex.RUnlock() + if !exists { + return 0 + } + c.mutex.RLock() + defer c.mutex.RUnlock() + return c.value + }, + }, + ) + } + + // Register all duration/histogram metrics with 0 initial values + // Note: These use the actual metric names as used in the codebase + durationMetrics := []string{ + "dkg_duration_seconds", + "signing_duration_seconds", + "wallet_action_duration_seconds", + "coordination_duration_seconds", + "ping_test_duration_seconds", + "relay_entry_duration_seconds", // For future beacon metrics + } + + // First, initialize all histograms in the map + for _, name := range durationMetrics { + pm.histograms[name] = &histogram{ + buckets: make(map[float64]float64), + } + } + + // Then, register observers (this prevents concurrent map read/write) + for _, name := range durationMetrics { + metricName := name // Capture for closure + // RecordDuration registers metrics as name + "_duration_seconds" and name + "_count" + // So for "dkg_duration_seconds", it registers: + // - "dkg_duration_seconds_duration_seconds" (average duration) + // - "dkg_duration_seconds_count" (count) + pm.registry.ObserveApplicationSource( + "performance", + map[string]Source{ + metricName + "_duration_seconds": func() float64 { + pm.histogramsMutex.RLock() + h, exists := pm.histograms[metricName] + pm.histogramsMutex.RUnlock() + if !exists { + return 0 + } + h.mutex.RLock() + defer h.mutex.RUnlock() + count := h.buckets[-1] + if count == 0 { + return 0 + } + return h.buckets[-2] / count // average + }, + metricName + "_count": func() float64 { + pm.histogramsMutex.RLock() + h, exists := pm.histograms[metricName] + pm.histogramsMutex.RUnlock() + if !exists { + return 0 + } + h.mutex.RLock() + defer h.mutex.RUnlock() + return h.buckets[-1] + }, + }, + ) + } + + // Register all gauge metrics with 0 initial value + gauges := []string{ + MetricWalletDispatcherActiveActions, + 
MetricIncomingMessageQueueSize, + MetricMessageHandlerQueueSize, + MetricSigningAttemptsPerOperation, + } + + // First, initialize all gauges in the map + for _, name := range gauges { + pm.gauges[name] = &gauge{value: 0} + } + + // Then, register observers (this prevents concurrent map read/write) + for _, name := range gauges { + metricName := name // Capture for closure + pm.registry.ObserveApplicationSource( + "performance", + map[string]Source{ + metricName: func() float64 { + pm.gaugesMutex.RLock() + g, exists := pm.gauges[metricName] + pm.gaugesMutex.RUnlock() + if !exists { + return 0 + } + g.mutex.RLock() + defer g.mutex.RUnlock() + return g.value + }, + }, + ) + } + +} + +// IncrementCounter increments a counter metric by the given value. +func (pm *PerformanceMetrics) IncrementCounter(name string, value float64) { + pm.countersMutex.Lock() + c, exists := pm.counters[name] + if !exists { + c = &counter{value: 0} + pm.counters[name] = c + } + pm.countersMutex.Unlock() + + c.mutex.Lock() + c.value += value + c.mutex.Unlock() + + // Update the gauge observer for this counter + pm.registry.ObserveApplicationSource( + "performance", + map[string]Source{ + name: func() float64 { + c.mutex.RLock() + defer c.mutex.RUnlock() + return c.value + }, + }, + ) +} + +// RecordDuration records a duration value in a histogram. +// The duration is recorded in seconds. 
+func (pm *PerformanceMetrics) RecordDuration(name string, duration time.Duration) { + pm.histogramsMutex.Lock() + h, exists := pm.histograms[name] + if !exists { + h = &histogram{ + buckets: make(map[float64]float64), + } + pm.histograms[name] = h + } + pm.histogramsMutex.Unlock() + + seconds := duration.Seconds() + h.mutex.Lock() + // Simple histogram: increment bucket counts + // Buckets: 0.001, 0.01, 0.1, 1, 10, 60, 300, 600 + buckets := []float64{0.001, 0.01, 0.1, 1, 10, 60, 300, 600} + for _, bucket := range buckets { + if seconds <= bucket { + h.buckets[bucket]++ + break + } + } + // Also track total count and sum for average calculation + h.buckets[-1]++ // -1 = count + h.buckets[-2] += seconds // -2 = sum + h.mutex.Unlock() + + // Expose as gauge for now (Prometheus-style histograms would be better) + pm.registry.ObserveApplicationSource( + "performance", + map[string]Source{ + name + "_duration_seconds": func() float64 { + h.mutex.RLock() + defer h.mutex.RUnlock() + count := h.buckets[-1] + if count == 0 { + return 0 + } + return h.buckets[-2] / count // average + }, + name + "_count": func() float64 { + h.mutex.RLock() + defer h.mutex.RUnlock() + return h.buckets[-1] + }, + }, + ) +} + +// SetGauge sets a gauge metric to the given value. +func (pm *PerformanceMetrics) SetGauge(name string, value float64) { + pm.gaugesMutex.Lock() + g, exists := pm.gauges[name] + if !exists { + g = &gauge{value: 0} + pm.gauges[name] = g + } + pm.gaugesMutex.Unlock() + + g.mutex.Lock() + g.value = value + g.mutex.Unlock() + + // Register gauge observer if not already registered + pm.registry.ObserveApplicationSource( + "performance", + map[string]Source{ + name: func() float64 { + g.mutex.RLock() + defer g.mutex.RUnlock() + return g.value + }, + }, + ) +} + +// observeGauges periodically updates gauge observers. +// This is handled automatically by ObserveApplicationSource. 
+func (pm *PerformanceMetrics) observeGauges() { + // Gauges are observed automatically via ObserveApplicationSource + // This function is kept for future use if needed +} + +// NoOpPerformanceMetrics is a no-op implementation of PerformanceMetricsRecorder +// that can be used when metrics are disabled. +type NoOpPerformanceMetrics struct{} + +// IncrementCounter is a no-op. +func (n *NoOpPerformanceMetrics) IncrementCounter(name string, value float64) {} + +// RecordDuration is a no-op. +func (n *NoOpPerformanceMetrics) RecordDuration(name string, duration time.Duration) {} + +// SetGauge is a no-op. +func (n *NoOpPerformanceMetrics) SetGauge(name string, value float64) {} + +// GetCounterValue always returns 0. +func (n *NoOpPerformanceMetrics) GetCounterValue(name string) float64 { return 0 } + +// GetGaugeValue always returns 0. +func (n *NoOpPerformanceMetrics) GetGaugeValue(name string) float64 { return 0 } + +// GetCounterValue returns the current value of a counter. +func (pm *PerformanceMetrics) GetCounterValue(name string) float64 { + pm.countersMutex.RLock() + c, exists := pm.counters[name] + pm.countersMutex.RUnlock() + + if !exists { + return 0 + } + + c.mutex.RLock() + defer c.mutex.RUnlock() + return c.value +} + +// GetGaugeValue returns the current value of a gauge. 
+func (pm *PerformanceMetrics) GetGaugeValue(name string) float64 { + pm.gaugesMutex.RLock() + g, exists := pm.gauges[name] + pm.gaugesMutex.RUnlock() + + if !exists { + return 0 + } + + g.mutex.RLock() + defer g.mutex.RUnlock() + return g.value +} + +// Metric names for performance metrics +const ( + // DKG Metrics + MetricDKGJoinedTotal = "dkg_joined_total" + MetricDKGFailedTotal = "dkg_failed_total" + MetricDKGDurationSeconds = "dkg_duration_seconds" + MetricDKGValidationTotal = "dkg_validation_total" + MetricDKGChallengesSubmittedTotal = "dkg_challenges_submitted_total" + MetricDKGApprovalsSubmittedTotal = "dkg_approvals_submitted_total" + + // Signing Metrics + MetricSigningOperationsTotal = "signing_operations_total" + MetricSigningSuccessTotal = "signing_success_total" + MetricSigningFailedTotal = "signing_failed_total" + MetricSigningDurationSeconds = "signing_duration_seconds" + MetricSigningAttemptsPerOperation = "signing_attempts_per_operation" + MetricSigningTimeoutsTotal = "signing_timeouts_total" + + // Wallet Action Metrics + MetricWalletActionsTotal = "wallet_actions_total" + MetricWalletActionSuccessTotal = "wallet_action_success_total" + MetricWalletActionFailedTotal = "wallet_action_failed_total" + MetricWalletActionDurationSeconds = "wallet_action_duration_seconds" + MetricWalletHeartbeatFailuresTotal = "wallet_heartbeat_failures_total" + + // Coordination Metrics + MetricCoordinationWindowsDetectedTotal = "coordination_windows_detected_total" + MetricCoordinationProceduresExecutedTotal = "coordination_procedures_executed_total" + MetricCoordinationFailedTotal = "coordination_failed_total" + MetricCoordinationDurationSeconds = "coordination_duration_seconds" + + // Network Metrics + MetricIncomingMessageQueueSize = "incoming_message_queue_size" + MetricMessageHandlerQueueSize = "message_handler_queue_size" + MetricPeerConnectionsTotal = "peer_connections_total" + MetricPeerDisconnectionsTotal = "peer_disconnections_total" + 
MetricMessageBroadcastTotal = "message_broadcast_total" + MetricMessageReceivedTotal = "message_received_total" + MetricPingTestsTotal = "ping_test_total" + MetricPingTestSuccessTotal = "ping_test_success_total" + MetricPingTestFailedTotal = "ping_test_failed_total" + + // Wallet Dispatcher Metrics + MetricWalletDispatcherActiveActions = "wallet_dispatcher_active_actions" + MetricWalletDispatcherRejectedTotal = "wallet_dispatcher_rejected_total" + + // Relay Entry Metrics (Beacon) + MetricRelayEntryGenerationTotal = "relay_entry_generation_total" + MetricRelayEntrySuccessTotal = "relay_entry_success_total" + MetricRelayEntryFailedTotal = "relay_entry_failed_total" + MetricRelayEntryDurationSeconds = "relay_entry_duration_seconds" + MetricRelayEntryTimeoutReportedTotal = "relay_entry_timeout_reported_total" +) diff --git a/pkg/clientinfo/rpc_health.go b/pkg/clientinfo/rpc_health.go new file mode 100644 index 0000000000..0832b94c00 --- /dev/null +++ b/pkg/clientinfo/rpc_health.go @@ -0,0 +1,293 @@ +package clientinfo + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/ipfs/go-log" + + "github.com/keep-network/keep-core/pkg/bitcoin" + "github.com/keep-network/keep-core/pkg/chain" +) + +var rpcHealthLogger = log.Logger("keep-rpc-health") + +// RPCHealthChecker performs periodic health checks on Ethereum and Bitcoin RPC endpoints +// by making actual RPC calls (not just ICMP ping) to verify the services are working. 
+type RPCHealthChecker struct { + registry *Registry + + // Ethereum health check + ethBlockCounter chain.BlockCounter + ethLastCheck time.Time + ethLastSuccess time.Time + ethLastError error + ethLastDuration time.Duration // Last successful RPC call duration + ethMutex sync.RWMutex + + // Bitcoin health check + btcChain bitcoin.Chain + btcLastCheck time.Time + btcLastSuccess time.Time + btcLastError error + btcLastDuration time.Duration // Last successful RPC call duration + btcMutex sync.RWMutex + + // Configuration + checkInterval time.Duration +} + +// NewRPCHealthChecker creates a new RPC health checker instance. +func NewRPCHealthChecker( + registry *Registry, + ethBlockCounter chain.BlockCounter, + btcChain bitcoin.Chain, + checkInterval time.Duration, +) *RPCHealthChecker { + if checkInterval == 0 { + checkInterval = 30 * time.Second // Default: check every 30 seconds + } + + return &RPCHealthChecker{ + registry: registry, + ethBlockCounter: ethBlockCounter, + btcChain: btcChain, + checkInterval: checkInterval, + } +} + +// Start begins periodic health checks for both Ethereum and Bitcoin RPC endpoints. +func (r *RPCHealthChecker) Start(ctx context.Context) { + // Perform initial health checks immediately + r.checkEthereumHealth(ctx) + r.checkBitcoinHealth(ctx) + + // Start periodic health checks + go r.runEthereumHealthChecks(ctx) + go r.runBitcoinHealthChecks(ctx) + + // Register metrics observers + r.registerMetrics() +} + +// runEthereumHealthChecks runs periodic Ethereum RPC health checks. +func (r *RPCHealthChecker) runEthereumHealthChecks(ctx context.Context) { + ticker := time.NewTicker(r.checkInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + r.checkEthereumHealth(ctx) + case <-ctx.Done(): + return + } + } +} + +// runBitcoinHealthChecks runs periodic Bitcoin RPC health checks. 
+func (r *RPCHealthChecker) runBitcoinHealthChecks(ctx context.Context) { + ticker := time.NewTicker(r.checkInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + r.checkBitcoinHealth(ctx) + case <-ctx.Done(): + return + } + } +} + +// checkEthereumHealth performs a comprehensive health check on the Ethereum RPC endpoint +// by making actual RPC calls to verify the service is working properly. +// It checks: +// 1. Current block number retrieval +// 2. Block number is reasonable (not stuck at 0 or extremely old) +func (r *RPCHealthChecker) checkEthereumHealth(ctx context.Context) { + if r.ethBlockCounter == nil { + return + } + + startTime := time.Now() + + // First check: Get current block number + currentBlock, err := r.ethBlockCounter.CurrentBlock() + if err != nil { + r.ethMutex.Lock() + r.ethLastCheck = startTime + r.ethLastError = err + r.ethMutex.Unlock() + rpcHealthLogger.Warnf( + "Ethereum RPC health check failed (CurrentBlock): [%v] (duration: %v)", + err, + time.Since(startTime), + ) + return + } + + // Second check: Verify block number is reasonable + // Block number should be > 0 (unless on a very new testnet) + // For mainnet/testnet, block numbers should be in thousands/millions + if currentBlock == 0 { + r.ethMutex.Lock() + r.ethLastCheck = startTime + r.ethLastError = fmt.Errorf("block number is 0, node may not be synced") + r.ethMutex.Unlock() + rpcHealthLogger.Warnf( + "Ethereum RPC health check failed (block number is 0): [%v] (duration: %v)", + r.ethLastError, + time.Since(startTime), + ) + return + } + + duration := time.Since(startTime) + + r.ethMutex.Lock() + r.ethLastCheck = startTime + r.ethLastSuccess = time.Now() + r.ethLastError = nil + r.ethLastDuration = duration + r.ethMutex.Unlock() + + rpcHealthLogger.Debugf( + "Ethereum RPC health check succeeded (block: %d, duration: %v)", + currentBlock, + duration, + ) +} + +// checkBitcoinHealth performs a comprehensive health check on the Bitcoin RPC endpoint +// by making 
actual RPC calls to verify the service is working properly. +// It checks: +// 1. Latest block height retrieval +// 2. Block header retrieval for the latest block (verifies RPC can retrieve block data) +// 3. Block height is reasonable (not 0) +func (r *RPCHealthChecker) checkBitcoinHealth(ctx context.Context) { + if r.btcChain == nil { + return + } + + startTime := time.Now() + + // First check: Get latest block height + latestHeight, err := r.btcChain.GetLatestBlockHeight() + if err != nil { + r.btcMutex.Lock() + r.btcLastCheck = startTime + r.btcLastError = err + r.btcMutex.Unlock() + rpcHealthLogger.Warnf( + "Bitcoin RPC health check failed (GetLatestBlockHeight): [%v] (duration: %v)", + err, + time.Since(startTime), + ) + return + } + + // Second check: Verify block height is reasonable + if latestHeight == 0 { + r.btcMutex.Lock() + r.btcLastCheck = startTime + r.btcLastError = fmt.Errorf("block height is 0, node may not be synced") + r.btcMutex.Unlock() + rpcHealthLogger.Warnf( + "Bitcoin RPC health check failed (block height is 0): [%v] (duration: %v)", + r.btcLastError, + time.Since(startTime), + ) + return + } + + // Third check: Try to get block header for the latest block + // This verifies the RPC can actually retrieve block data, not just return a number + _, err = r.btcChain.GetBlockHeader(latestHeight) + if err != nil { + r.btcMutex.Lock() + r.btcLastCheck = startTime + r.btcLastError = fmt.Errorf("failed to get block header for height %d: %w", latestHeight, err) + r.btcMutex.Unlock() + rpcHealthLogger.Warnf( + "Bitcoin RPC health check failed (GetBlockHeader): [%v] (duration: %v)", + r.btcLastError, + time.Since(startTime), + ) + return + } + + duration := time.Since(startTime) + + r.btcMutex.Lock() + r.btcLastCheck = startTime + r.btcLastSuccess = time.Now() + r.btcLastError = nil + r.btcLastDuration = duration + r.btcMutex.Unlock() + + rpcHealthLogger.Debugf( + "Bitcoin RPC health check succeeded (height: %d, duration: %v)", + latestHeight, + 
duration, + ) +} + +// GetEthereumHealthStatus returns the current Ethereum RPC health status. +func (r *RPCHealthChecker) GetEthereumHealthStatus() (isHealthy bool, lastCheck time.Time, lastSuccess time.Time, lastError error, lastDuration time.Duration) { + r.ethMutex.RLock() + defer r.ethMutex.RUnlock() + + isHealthy = r.ethLastError == nil && !r.ethLastCheck.IsZero() + return isHealthy, r.ethLastCheck, r.ethLastSuccess, r.ethLastError, r.ethLastDuration +} + +// GetBitcoinHealthStatus returns the current Bitcoin RPC health status. +func (r *RPCHealthChecker) GetBitcoinHealthStatus() (isHealthy bool, lastCheck time.Time, lastSuccess time.Time, lastError error, lastDuration time.Duration) { + r.btcMutex.RLock() + defer r.btcMutex.RUnlock() + + isHealthy = r.btcLastError == nil && !r.btcLastCheck.IsZero() + return isHealthy, r.btcLastCheck, r.btcLastSuccess, r.btcLastError, r.btcLastDuration +} + +// registerMetrics registers metrics observers for RPC health status. +func (r *RPCHealthChecker) registerMetrics() { + // Ethereum RPC health status and response time + r.registry.ObserveApplicationSource( + "performance", + map[string]Source{ + "rpc_eth_health_status": func() float64 { + isHealthy, _, _, _, _ := r.GetEthereumHealthStatus() + if isHealthy { + return 1 + } + return 0 + }, + "rpc_eth_response_time_seconds": func() float64 { + _, _, _, _, lastDuration := r.GetEthereumHealthStatus() + return lastDuration.Seconds() + }, + }, + ) + + // Bitcoin RPC health status and response time + r.registry.ObserveApplicationSource( + "performance", + map[string]Source{ + "rpc_btc_health_status": func() float64 { + isHealthy, _, _, _, _ := r.GetBitcoinHealthStatus() + if isHealthy { + return 1 + } + return 0 + }, + "rpc_btc_response_time_seconds": func() float64 { + _, _, _, _, lastDuration := r.GetBitcoinHealthStatus() + return lastDuration.Seconds() + }, + }, + ) +} diff --git a/pkg/maintainer/spv/deposit_sweep.go b/pkg/maintainer/spv/deposit_sweep.go index 
ef85221b90..2b0b8a5f77 100644 --- a/pkg/maintainer/spv/deposit_sweep.go +++ b/pkg/maintainer/spv/deposit_sweep.go @@ -27,6 +27,7 @@ func SubmitDepositSweepProof( btcChain, spvChain, bitcoin.AssembleSpvProof, + getGlobalMetricsRecorder(), ) } @@ -36,8 +37,19 @@ func submitDepositSweepProof( btcChain bitcoin.Chain, spvChain Chain, spvProofAssembler spvProofAssembler, + metricsRecorder interface { + IncrementCounter(name string, value float64) + }, ) error { + // Record proof submission attempt + if metricsRecorder != nil { + metricsRecorder.IncrementCounter("deposit_sweep_proof_submissions_total", 1) + } + if requiredConfirmations == 0 { + if metricsRecorder != nil { + metricsRecorder.IncrementCounter("deposit_sweep_proof_submissions_failed_total", 1) + } return fmt.Errorf( "provided required confirmations count must be greater than 0", ) @@ -49,6 +61,9 @@ func submitDepositSweepProof( btcChain, ) if err != nil { + if metricsRecorder != nil { + metricsRecorder.IncrementCounter("deposit_sweep_proof_submissions_failed_total", 1) + } return fmt.Errorf( "failed to assemble transaction spv proof: [%v]", err, @@ -61,6 +76,9 @@ func submitDepositSweepProof( transaction, ) if err != nil { + if metricsRecorder != nil { + metricsRecorder.IncrementCounter("deposit_sweep_proof_submissions_failed_total", 1) + } return fmt.Errorf( "error while parsing transaction inputs: [%v]", err, @@ -73,12 +91,20 @@ func submitDepositSweepProof( mainUTXO, vault, ); err != nil { + if metricsRecorder != nil { + metricsRecorder.IncrementCounter("deposit_sweep_proof_submissions_failed_total", 1) + } return fmt.Errorf( "failed to submit deposit sweep proof with reimbursement: [%v]", err, ) } + // Record successful proof submission + if metricsRecorder != nil { + metricsRecorder.IncrementCounter("deposit_sweep_proof_submissions_success_total", 1) + } + return nil } diff --git a/pkg/maintainer/spv/deposit_sweep_test.go b/pkg/maintainer/spv/deposit_sweep_test.go index fa9ff0f655..dc61256ccf 100644 --- 
a/pkg/maintainer/spv/deposit_sweep_test.go +++ b/pkg/maintainer/spv/deposit_sweep_test.go @@ -96,6 +96,7 @@ func TestSubmitDepositSweepProof(t *testing.T) { btcChain, spvChain, mockSpvProofAssembler, + getGlobalMetricsRecorder(), ) if err != nil { t.Fatal(err) diff --git a/pkg/maintainer/spv/redemptions.go b/pkg/maintainer/spv/redemptions.go index df8541f07b..094c73643d 100644 --- a/pkg/maintainer/spv/redemptions.go +++ b/pkg/maintainer/spv/redemptions.go @@ -3,10 +3,18 @@ package spv import ( "bytes" "fmt" + "github.com/keep-network/keep-core/pkg/bitcoin" "github.com/keep-network/keep-core/pkg/tbtc" ) +// getGlobalMetricsRecorder returns the global metrics recorder if set. +func getGlobalMetricsRecorder() interface { + IncrementCounter(name string, value float64) +} { + return globalMetricsRecorder +} + // SubmitRedemptionProof prepares redemption proof for the given transaction // and submits it to the on-chain contract. If the number of required // confirmations is `0`, an error is returned. 
@@ -22,6 +30,7 @@ func SubmitRedemptionProof( btcChain, spvChain, bitcoin.AssembleSpvProof, + getGlobalMetricsRecorder(), ) } @@ -31,8 +40,19 @@ func submitRedemptionProof( btcChain bitcoin.Chain, spvChain Chain, spvProofAssembler spvProofAssembler, + metricsRecorder interface { + IncrementCounter(name string, value float64) + }, ) error { + // Record proof submission attempt + if metricsRecorder != nil { + metricsRecorder.IncrementCounter("redemption_proof_submissions_total", 1) + } + if requiredConfirmations == 0 { + if metricsRecorder != nil { + metricsRecorder.IncrementCounter("redemption_proof_submissions_failed_total", 1) + } return fmt.Errorf( "provided required confirmations count must be greater than 0", ) @@ -44,6 +64,9 @@ func submitRedemptionProof( btcChain, ) if err != nil { + if metricsRecorder != nil { + metricsRecorder.IncrementCounter("redemption_proof_submissions_failed_total", 1) + } return fmt.Errorf( "failed to assemble transaction spv proof: [%v]", err, @@ -55,6 +78,9 @@ func submitRedemptionProof( transaction, ) if err != nil { + if metricsRecorder != nil { + metricsRecorder.IncrementCounter("redemption_proof_submissions_failed_total", 1) + } return fmt.Errorf( "error while parsing transaction inputs: [%v]", err, @@ -67,12 +93,20 @@ func submitRedemptionProof( mainUTXO, walletPublicKeyHash, ); err != nil { + if metricsRecorder != nil { + metricsRecorder.IncrementCounter("redemption_proof_submissions_failed_total", 1) + } return fmt.Errorf( "failed to submit redemption proof with reimbursement: [%v]", err, ) } + // Record successful proof submission + if metricsRecorder != nil { + metricsRecorder.IncrementCounter("redemption_proof_submissions_success_total", 1) + } + return nil } diff --git a/pkg/maintainer/spv/redemptions_test.go b/pkg/maintainer/spv/redemptions_test.go index 70a1594c00..4f10a3a208 100644 --- a/pkg/maintainer/spv/redemptions_test.go +++ b/pkg/maintainer/spv/redemptions_test.go @@ -78,6 +78,7 @@ func 
TestSubmitRedemptionProof(t *testing.T) { btcChain, spvChain, mockSpvProofAssembler, + getGlobalMetricsRecorder(), ) if err != nil { t.Fatal(err) diff --git a/pkg/maintainer/spv/spv.go b/pkg/maintainer/spv/spv.go index ef1bef69e2..4fbd3baf49 100644 --- a/pkg/maintainer/spv/spv.go +++ b/pkg/maintainer/spv/spv.go @@ -38,6 +38,20 @@ func Initialize( go spvMaintainer.startControlLoop(ctx) } +// globalMetricsRecorder is a package-level variable to access metrics recorder +// from proof submission functions. +var globalMetricsRecorder interface { + IncrementCounter(name string, value float64) +} + +// SetMetricsRecorder sets the metrics recorder for the SPV maintainer. +// This allows recording metrics for proof submissions. +func SetMetricsRecorder(recorder interface { + IncrementCounter(name string, value float64) +}) { + globalMetricsRecorder = recorder +} + // proofTypes holds the information about proof types supported by the // SPV maintainer. var proofTypes = map[tbtc.WalletActionType]struct { @@ -67,6 +81,11 @@ type spvMaintainer struct { spvChain Chain btcDiffChain btcdiff.Chain btcChain bitcoin.Chain + + // metricsRecorder is optional and used for recording performance metrics + metricsRecorder interface { + IncrementCounter(name string, value float64) + } } func (sm *spvMaintainer) startControlLoop(ctx context.Context) { diff --git a/pkg/net/libp2p/channel.go b/pkg/net/libp2p/channel.go index c91e338b01..c75fd808aa 100644 --- a/pkg/net/libp2p/channel.go +++ b/pkg/net/libp2p/channel.go @@ -6,6 +6,7 @@ import ( "runtime" "sync" "sync/atomic" + "time" "google.golang.org/protobuf/proto" @@ -73,6 +74,13 @@ type channel struct { unmarshalersByType map[string]func() net.TaggedUnmarshaler retransmissionTicker *retransmission.Ticker + + // metricsRecorder is optional and used for recording performance metrics + metricsRecorder interface { + IncrementCounter(name string, value float64) + SetGauge(name string, value float64) + RecordDuration(name string, duration 
time.Duration) + } } type messageHandler struct { @@ -239,7 +247,11 @@ func (c *channel) publish(message *pb.BroadcastNetworkMessage) error { c.publisherMutex.Lock() defer c.publisherMutex.Unlock() - return c.publisher.Publish(context.TODO(), messageBytes) + publishErr := c.publisher.Publish(context.TODO(), messageBytes) + if publishErr == nil && c.metricsRecorder != nil { + c.metricsRecorder.IncrementCounter("message_broadcast_total", 1) + } + return publishErr } func (c *channel) handleMessages(ctx context.Context) { @@ -282,6 +294,9 @@ func (c *channel) incomingMessageWorker(ctx context.Context) { case <-ctx.Done(): return case msg := <-c.incomingMessageQueue: + if c.metricsRecorder != nil { + c.metricsRecorder.IncrementCounter("message_received_total", 1) + } if err := c.processPubsubMessage(msg); err != nil { logger.Error(err) } @@ -424,3 +439,49 @@ func extractPublicKey(peer peer.ID) (*operator.PublicKey, error) { return networkPublicKeyToOperatorPublicKey(publicKey) } + +// setMetricsRecorder sets the metrics recorder for the channel and starts +// periodic queue size monitoring. +func (c *channel) setMetricsRecorder(recorder interface { + IncrementCounter(name string, value float64) + SetGauge(name string, value float64) + RecordDuration(name string, duration time.Duration) +}) { + c.metricsRecorder = recorder + // Start periodic queue size monitoring + if recorder != nil { + go c.monitorQueueSizes(recorder) + } +} + +// monitorQueueSizes periodically records queue sizes as metrics. 
+func (c *channel) monitorQueueSizes(recorder interface { + SetGauge(name string, value float64) +}) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Stop monitoring when channel is closed (we'll use a simple ticker for now) + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + // Record incoming message queue size + queueSize := float64(len(c.incomingMessageQueue)) + recorder.SetGauge("incoming_message_queue_size", queueSize) + + // Record message handler queue sizes + c.messageHandlersMutex.Lock() + for i, handler := range c.messageHandlers { + handlerQueueSize := float64(len(handler.channel)) + recorder.SetGauge("message_handler_queue_size", handlerQueueSize) + _ = i // avoid unused variable + } + c.messageHandlersMutex.Unlock() + case <-ctx.Done(): + return + } + } +} diff --git a/pkg/net/libp2p/channel_manager.go b/pkg/net/libp2p/channel_manager.go index aa9f888a12..1118d22db6 100644 --- a/pkg/net/libp2p/channel_manager.go +++ b/pkg/net/libp2p/channel_manager.go @@ -48,6 +48,13 @@ type channelManager struct { topicsMutex sync.Mutex topics map[string]*pubsub.Topic + + // metricsRecorder is optional and used for recording performance metrics + metricsRecorder interface { + IncrementCounter(name string, value float64) + SetGauge(name string, value float64) + RecordDuration(name string, duration time.Duration) + } } func newChannelManager( @@ -108,11 +115,31 @@ func (cm *channelManager) getChannel(name string) (*channel, error) { } cm.channels[name] = channel + // Wire metrics recorder into channel if available + if cm.metricsRecorder != nil { + channel.setMetricsRecorder(cm.metricsRecorder) + } } return channel, nil } +// setMetricsRecorder sets the metrics recorder for the channel manager +// and wires it into existing channels. 
+func (cm *channelManager) setMetricsRecorder(recorder interface { + IncrementCounter(name string, value float64) + SetGauge(name string, value float64) + RecordDuration(name string, duration time.Duration) +}) { + cm.metricsRecorder = recorder + // Wire metrics into existing channels + cm.channelsMutex.Lock() + defer cm.channelsMutex.Unlock() + for _, channel := range cm.channels { + channel.setMetricsRecorder(recorder) + } +} + func (cm *channelManager) newChannel(name string) (*channel, error) { topic, err := cm.getTopic(name) if err != nil { diff --git a/pkg/net/libp2p/libp2p.go b/pkg/net/libp2p/libp2p.go index 5f56306222..f2e1f27d9c 100644 --- a/pkg/net/libp2p/libp2p.go +++ b/pkg/net/libp2p/libp2p.go @@ -16,6 +16,7 @@ import ( dstore "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" + //lint:ignore SA1019 package deprecated, but we rely on its interface addrutil "github.com/libp2p/go-addr-util" "github.com/libp2p/go-libp2p" @@ -92,6 +93,13 @@ type provider struct { disseminationTime int connectionManager *connectionManager + + // metricsRecorder is optional and used for recording performance metrics + metricsRecorder interface { + IncrementCounter(name string, value float64) + SetGauge(name string, value float64) + RecordDuration(name string, duration time.Duration) + } } func (p *provider) BroadcastChannelFor(name string) (net.BroadcastChannel, error) { @@ -325,7 +333,9 @@ func Connect( return nil, err } - host.Network().Notify(buildNotifiee(host)) + // Build notifiee with metrics recorder (will be set later if available) + notifiee := buildNotifiee(host, nil) + host.Network().Notify(notifiee) broadcastChannelManager, err := newChannelManager(ctx, identity, host, ticker) if err != nil { @@ -376,6 +386,21 @@ func Connect( return provider, nil } +// SetMetricsRecorder sets the metrics recorder for the provider and wires it +// into network components. 
+func (p *provider) SetMetricsRecorder(recorder interface { + IncrementCounter(name string, value float64) + SetGauge(name string, value float64) + RecordDuration(name string, duration time.Duration) +}) { + p.metricsRecorder = recorder + if p.broadcastChannelManager != nil { + p.broadcastChannelManager.setMetricsRecorder(recorder) + } + // Update notifiee with metrics recorder + p.host.Network().Notify(buildNotifiee(p.host, recorder)) +} + func discoverAndListen( ctx context.Context, identity *identity, @@ -533,7 +558,11 @@ func extractMultiAddrFromPeers(peers []string) ([]peer.AddrInfo, error) { return peerInfos, nil } -func buildNotifiee(libp2pHost host.Host) libp2pnet.Notifiee { +func buildNotifiee(libp2pHost host.Host, metricsRecorder interface { + IncrementCounter(name string, value float64) + SetGauge(name string, value float64) + RecordDuration(name string, duration time.Duration) +}) libp2pnet.Notifiee { notifyBundle := &libp2pnet.NotifyBundle{} notifyBundle.ConnectedF = func(_ libp2pnet.Network, connection libp2pnet.Conn) { @@ -546,7 +575,11 @@ func buildNotifiee(libp2pHost host.Host) libp2pnet.Notifiee { logger.Infof("established connection to [%v]", peerMultiaddress) - go executePingTest(libp2pHost, peerID, peerMultiaddress) + if metricsRecorder != nil { + metricsRecorder.IncrementCounter("peer_connections_total", 1) + } + + go executePingTest(libp2pHost, peerID, peerMultiaddress, metricsRecorder) } notifyBundle.DisconnectedF = func(_ libp2pnet.Network, connection libp2pnet.Conn) { logger.Infof( @@ -556,6 +589,10 @@ func buildNotifiee(libp2pHost host.Host) libp2pnet.Notifiee { connection.RemotePeer(), ), ) + + if metricsRecorder != nil { + metricsRecorder.IncrementCounter("peer_disconnections_total", 1) + } } return notifyBundle @@ -565,39 +602,64 @@ func executePingTest( libp2pHost host.Host, peerID peer.ID, peerMultiaddress string, + metricsRecorder interface { + IncrementCounter(name string, value float64) + SetGauge(name string, value float64) + 
RecordDuration(name string, duration time.Duration) + }, ) { logger.Infof("starting ping test for [%v]", peerMultiaddress) + if metricsRecorder != nil { + metricsRecorder.IncrementCounter("ping_test_total", 1) + } + ctx, cancelCtx := context.WithTimeout( context.Background(), pingTestTimeout, ) defer cancelCtx() + startTime := time.Now() resultChan := ping.Ping(ctx, libp2pHost, peerID) select { case result := <-resultChan: + if metricsRecorder != nil { + metricsRecorder.RecordDuration("ping_test_duration_seconds", time.Since(startTime)) + } if result.Error != nil { logger.Warnf( "ping test for [%v] failed: [%v]", peerMultiaddress, result.Error, ) + if metricsRecorder != nil { + metricsRecorder.IncrementCounter("ping_test_failed_total", 1) + } } else if result.Error == nil && result.RTT == 0 { logger.Warnf( "peer test for [%v] failed without clear reason", peerMultiaddress, ) + if metricsRecorder != nil { + metricsRecorder.IncrementCounter("ping_test_failed_total", 1) + } } else { logger.Infof( "ping test for [%v] completed with success (RTT [%v])", peerMultiaddress, result.RTT, ) + if metricsRecorder != nil { + metricsRecorder.IncrementCounter("ping_test_success_total", 1) + } } case <-ctx.Done(): logger.Warnf("ping test for [%v] timed out", peerMultiaddress) + if metricsRecorder != nil { + metricsRecorder.IncrementCounter("ping_test_failed_total", 1) + } } } diff --git a/pkg/tbtc/coordination.go b/pkg/tbtc/coordination.go index 6d06ff9634..b38e08b6fc 100644 --- a/pkg/tbtc/coordination.go +++ b/pkg/tbtc/coordination.go @@ -295,6 +295,13 @@ type coordinationExecutor struct { protocolLatch *generator.ProtocolLatch waitForBlockFn waitForBlockFn + + // metricsRecorder is optional and used for recording performance metrics + metricsRecorder interface { + IncrementCounter(name string, value float64) + SetGauge(name string, value float64) + RecordDuration(name string, duration time.Duration) + } } // newCoordinationExecutor creates a new coordination executor for the @@ 
-363,6 +370,8 @@ func (ce *coordinationExecutor) coordinate( execLogger.Info("starting coordination") + startTime := time.Now() + seed, err := ce.getSeed(window.coordinationBlock) if err != nil { return nil, fmt.Errorf("failed to compute coordination seed: [%v]", err) @@ -411,6 +420,10 @@ func (ce *coordinationExecutor) coordinate( // no point to keep the context active as retransmissions do not // occur anyway. cancelCtx() + if ce.metricsRecorder != nil { + ce.metricsRecorder.IncrementCounter("coordination_failed_total", 1) + ce.metricsRecorder.RecordDuration("coordination_duration_seconds", time.Since(startTime)) + } return nil, fmt.Errorf( "failed to execute leader's routine: [%v]", err, @@ -431,6 +444,10 @@ func (ce *coordinationExecutor) coordinate( append(actionsChecklist, ActionNoop), ) if err != nil { + if ce.metricsRecorder != nil { + ce.metricsRecorder.IncrementCounter("coordination_failed_total", 1) + ce.metricsRecorder.RecordDuration("coordination_duration_seconds", time.Since(startTime)) + } return nil, fmt.Errorf( "failed to execute follower's routine: [%v]", err, @@ -459,9 +476,23 @@ func (ce *coordinationExecutor) coordinate( execLogger.Infof("coordination completed with result: [%s]", result) + // Record successful coordination metrics + if ce.metricsRecorder != nil { + ce.metricsRecorder.RecordDuration("coordination_duration_seconds", time.Since(startTime)) + } + return result, nil } +// setMetricsRecorder sets the metrics recorder for the coordination executor. +func (ce *coordinationExecutor) setMetricsRecorder(recorder interface { + IncrementCounter(name string, value float64) + SetGauge(name string, value float64) + RecordDuration(name string, duration time.Duration) +}) { + ce.metricsRecorder = recorder +} + // getSeed computes the coordination seed for the given coordination window. 
func (ce *coordinationExecutor) getSeed( coordinationBlock uint64, diff --git a/pkg/tbtc/deposit_sweep.go b/pkg/tbtc/deposit_sweep.go index ca29d63b2e..824ce29d28 100644 --- a/pkg/tbtc/deposit_sweep.go +++ b/pkg/tbtc/deposit_sweep.go @@ -87,6 +87,12 @@ type depositSweepAction struct { signingTimeoutSafetyMarginBlocks uint64 broadcastTimeout time.Duration broadcastCheckDelay time.Duration + + // metricsRecorder is optional and used for recording performance metrics + metricsRecorder interface { + IncrementCounter(name string, value float64) + RecordDuration(name string, duration time.Duration) + } } func newDepositSweepAction( @@ -124,6 +130,13 @@ func newDepositSweepAction( } func (dsa *depositSweepAction) execute() error { + executionStartTime := time.Now() + + // Record deposit sweep execution attempt + if dsa.metricsRecorder != nil { + dsa.metricsRecorder.IncrementCounter("deposit_sweep_executions_total", 1) + } + validateProposalLogger := dsa.logger.With( zap.String("step", "validateProposal"), ) @@ -139,6 +152,10 @@ func (dsa *depositSweepAction) execute() error { dsa.btcChain, ) if err != nil { + if dsa.metricsRecorder != nil { + dsa.metricsRecorder.IncrementCounter("deposit_sweep_executions_failed_total", 1) + dsa.metricsRecorder.RecordDuration("deposit_sweep_execution_duration_seconds", time.Since(executionStartTime)) + } return fmt.Errorf("validate proposal step failed: [%v]", err) } @@ -148,6 +165,10 @@ func (dsa *depositSweepAction) execute() error { dsa.btcChain, ) if err != nil { + if dsa.metricsRecorder != nil { + dsa.metricsRecorder.IncrementCounter("deposit_sweep_executions_failed_total", 1) + dsa.metricsRecorder.RecordDuration("deposit_sweep_execution_duration_seconds", time.Since(executionStartTime)) + } return fmt.Errorf( "error while determining wallet's main UTXO: [%v]", err, @@ -161,6 +182,10 @@ func (dsa *depositSweepAction) execute() error { dsa.btcChain, ) if err != nil { + if dsa.metricsRecorder != nil { + 
dsa.metricsRecorder.IncrementCounter("deposit_sweep_executions_failed_total", 1) + dsa.metricsRecorder.RecordDuration("deposit_sweep_execution_duration_seconds", time.Since(executionStartTime)) + } return fmt.Errorf( "error while ensuring wallet state is synced between "+ "BTC and host chain: [%v]", @@ -176,6 +201,10 @@ func (dsa *depositSweepAction) execute() error { dsa.proposal.SweepTxFee.Int64(), ) if err != nil { + if dsa.metricsRecorder != nil { + dsa.metricsRecorder.IncrementCounter("deposit_sweep_executions_failed_total", 1) + dsa.metricsRecorder.RecordDuration("deposit_sweep_execution_duration_seconds", time.Since(executionStartTime)) + } return fmt.Errorf( "error while assembling deposit sweep transaction: [%v]", err, @@ -188,9 +217,14 @@ func (dsa *depositSweepAction) execute() error { // Just in case. This should never happen. if dsa.proposalExpiryBlock < dsa.signingTimeoutSafetyMarginBlocks { + if dsa.metricsRecorder != nil { + dsa.metricsRecorder.IncrementCounter("deposit_sweep_executions_failed_total", 1) + dsa.metricsRecorder.RecordDuration("deposit_sweep_execution_duration_seconds", time.Since(executionStartTime)) + } return fmt.Errorf("invalid proposal expiry block") } + signingStartTime := time.Now() sweepTx, err := dsa.transactionExecutor.signTransaction( signTxLogger, unsignedSweepTx, @@ -198,9 +232,18 @@ func (dsa *depositSweepAction) execute() error { dsa.proposalExpiryBlock-dsa.signingTimeoutSafetyMarginBlocks, ) if err != nil { + if dsa.metricsRecorder != nil { + dsa.metricsRecorder.IncrementCounter("deposit_sweep_executions_failed_total", 1) + dsa.metricsRecorder.RecordDuration("deposit_sweep_execution_duration_seconds", time.Since(executionStartTime)) + } return fmt.Errorf("sign transaction step failed: [%v]", err) } + // Record deposit sweep transaction signing duration + if dsa.metricsRecorder != nil { + dsa.metricsRecorder.RecordDuration("deposit_sweep_tx_signing_duration_seconds", time.Since(signingStartTime)) + } + broadcastTxLogger 
:= dsa.logger.With( zap.String("step", "broadcastTransaction"), zap.String("sweepTxHash", sweepTx.Hash().Hex(bitcoin.ReversedByteOrder)), @@ -213,9 +256,19 @@ func (dsa *depositSweepAction) execute() error { dsa.broadcastCheckDelay, ) if err != nil { + if dsa.metricsRecorder != nil { + dsa.metricsRecorder.IncrementCounter("deposit_sweep_executions_failed_total", 1) + dsa.metricsRecorder.RecordDuration("deposit_sweep_execution_duration_seconds", time.Since(executionStartTime)) + } return fmt.Errorf("broadcast transaction step failed: [%v]", err) } + // Record successful deposit sweep execution + if dsa.metricsRecorder != nil { + dsa.metricsRecorder.IncrementCounter("deposit_sweep_executions_success_total", 1) + dsa.metricsRecorder.RecordDuration("deposit_sweep_execution_duration_seconds", time.Since(executionStartTime)) + } + return nil } @@ -424,6 +477,14 @@ func (dsa *depositSweepAction) actionType() WalletActionType { return ActionDepositSweep } +// setMetricsRecorder sets the metrics recorder for the deposit sweep action. +func (dsa *depositSweepAction) setMetricsRecorder(recorder interface { + IncrementCounter(name string, value float64) + RecordDuration(name string, duration time.Duration) +}) { + dsa.metricsRecorder = recorder +} + // assembleDepositSweepTransaction constructs an unsigned deposit sweep Bitcoin // transaction. 
// diff --git a/pkg/tbtc/dkg.go b/pkg/tbtc/dkg.go index 06f417b32b..7b2189da52 100644 --- a/pkg/tbtc/dkg.go +++ b/pkg/tbtc/dkg.go @@ -4,9 +4,11 @@ import ( "context" "errors" "fmt" - "golang.org/x/exp/maps" "math/big" "sort" + "time" + + "golang.org/x/exp/maps" "go.uber.org/zap" @@ -64,6 +66,13 @@ type dkgExecutor struct { waitForBlockFn waitForBlockFn tecdsaExecutor *dkg.Executor + + // metricsRecorder is optional and used for recording performance metrics + metricsRecorder interface { + IncrementCounter(name string, value float64) + SetGauge(name string, value float64) + RecordDuration(name string, duration time.Duration) + } } // newDkgExecutor creates a new instance of dkgExecutor struct. There should @@ -105,6 +114,15 @@ func newDkgExecutor( } } +// setMetricsRecorder sets the metrics recorder for the DKG executor. +func (de *dkgExecutor) setMetricsRecorder(recorder interface { + IncrementCounter(name string, value float64) + SetGauge(name string, value float64) + RecordDuration(name string, duration time.Duration) +}) { + de.metricsRecorder = recorder +} + // preParamsCount returns the current count of the ECDSA DKG pre-parameters. 
func (de *dkgExecutor) preParamsCount() int { return de.tecdsaExecutor.PreParamsCount() @@ -152,6 +170,10 @@ func (de *dkgExecutor) executeDkgIfEligible( membersCount, ) + if de.metricsRecorder != nil { + de.metricsRecorder.IncrementCounter("dkg_joined_total", float64(membersCount)) + } + de.generateSigningGroup( dkgLogger, seed, @@ -283,6 +305,7 @@ func (de *dkgExecutor) generateSigningGroup( memberIndex := index go func() { + dkgStartTime := time.Now() de.protocolLatch.Lock() defer de.protocolLatch.Unlock() @@ -388,6 +411,10 @@ func (de *dkgExecutor) generateSigningGroup( }, ) if err != nil { + if de.metricsRecorder != nil { + de.metricsRecorder.IncrementCounter("dkg_failed_total", 1) + de.metricsRecorder.RecordDuration("dkg_duration_seconds", time.Since(dkgStartTime)) + } if errors.Is(err, context.Canceled) { dkgLogger.Infof( "[member:%v] DKG is no longer awaiting the result; "+ @@ -420,6 +447,11 @@ func (de *dkgExecutor) generateSigningGroup( dkgLogger.Infof("registered %s", signer) + // Record successful DKG completion + if de.metricsRecorder != nil { + de.metricsRecorder.RecordDuration("dkg_duration_seconds", time.Since(dkgStartTime)) + } + err = de.publishDkgResult( ctx, dkgLogger, @@ -557,6 +589,10 @@ func (de *dkgExecutor) executeDkgValidation( dkgLogger.Infof("starting DKG result validation") + if de.metricsRecorder != nil { + de.metricsRecorder.IncrementCounter("dkg_validation_total", 1) + } + isValid, err := de.chain.IsDKGResultValid(result) if err != nil { dkgLogger.Errorf("cannot validate DKG result: [%v]", err) @@ -586,6 +622,10 @@ func (de *dkgExecutor) executeDkgValidation( return } + if de.metricsRecorder != nil { + de.metricsRecorder.IncrementCounter("dkg_challenges_submitted_total", 1) + } + confirmationBlock := submissionBlock + (i * dkgResultChallengeConfirmationBlocks) @@ -732,6 +772,10 @@ func (de *dkgExecutor) executeDkgValidation( return } + if de.metricsRecorder != nil { + 
de.metricsRecorder.IncrementCounter("dkg_approvals_submitted_total", 1) + } + dkgLogger.Infof("[member:%v] approving DKG result", memberIndex) }(currentMemberIndex) } diff --git a/pkg/tbtc/node.go b/pkg/tbtc/node.go index fc9ba55b10..0fb3b55710 100644 --- a/pkg/tbtc/node.go +++ b/pkg/tbtc/node.go @@ -7,6 +7,7 @@ import ( "fmt" "math/big" "sync" + "time" "github.com/keep-network/keep-common/pkg/chain/ethereum" "github.com/keep-network/keep-core/pkg/bitcoin" @@ -111,6 +112,13 @@ type node struct { // proposalGenerator is the implementation of the coordination proposal // generator used by the node. proposalGenerator CoordinationProposalGenerator + + // performanceMetrics is optional and used for recording performance metrics + performanceMetrics interface { + IncrementCounter(name string, value float64) + SetGauge(name string, value float64) + RecordDuration(name string, duration time.Duration) + } } func newNode( @@ -184,6 +192,34 @@ func newNode( return node, nil } +// setPerformanceMetrics sets the performance metrics recorder for the node +// and wires it into components that support metrics. +func (n *node) setPerformanceMetrics(metrics interface { + IncrementCounter(name string, value float64) + SetGauge(name string, value float64) + RecordDuration(name string, duration time.Duration) +}) { + n.performanceMetrics = metrics + if n.walletDispatcher != nil { + n.walletDispatcher.setMetricsRecorder(metrics) + } + if n.dkgExecutor != nil { + n.dkgExecutor.setMetricsRecorder(metrics) + } + + // Wire redemption metrics to proposal generator if it supports it + // This uses a type assertion to check if proposalGenerator is a *ProposalGenerator + // from the tbtcpg package. We can't import tbtcpg here to avoid circular dependencies, + // so we use an interface check instead. 
+ if pg, ok := n.proposalGenerator.(interface { + SetRedemptionMetricsRecorder(recorder interface { + SetGauge(name string, value float64) + }) + }); ok { + pg.SetRedemptionMetricsRecorder(metrics) + } +} + // operatorAddress returns the node's operator address. func (n *node) operatorAddress() (chain.Address, error) { _, operatorPublicKey, err := n.chain.OperatorKeyPair() @@ -339,6 +375,11 @@ func (n *node) getSigningExecutor( signingAttemptsLimit, ) + // Wire metrics recorder if available + if n.performanceMetrics != nil { + executor.setMetricsRecorder(n.performanceMetrics) + } + n.signingExecutors[executorKey] = executor return executor, true, nil @@ -434,6 +475,11 @@ func (n *node) getCoordinationExecutor( n.waitForBlockHeight, ) + // Wire metrics recorder if available + if n.performanceMetrics != nil { + executor.setMetricsRecorder(n.performanceMetrics) + } + n.coordinationExecutors[executorKey] = executor executorLogger.Infof( @@ -673,6 +719,11 @@ func (n *node) handleDepositSweepProposal( n.waitForBlockHeight, ) + // Wire metrics recorder if available + if n.performanceMetrics != nil { + action.setMetricsRecorder(n.performanceMetrics) + } + err = n.walletDispatcher.dispatch(action) if err != nil { walletActionLogger.Errorf("cannot dispatch wallet action: [%v]", err) @@ -741,6 +792,11 @@ func (n *node) handleRedemptionProposal( n.waitForBlockHeight, ) + // Wire metrics recorder if available + if n.performanceMetrics != nil { + action.setMetricsRecorder(n.performanceMetrics) + } + err = n.walletDispatcher.dispatch(action) if err != nil { walletActionLogger.Errorf("cannot dispatch wallet action: [%v]", err) @@ -934,6 +990,11 @@ func (n *node) runCoordinationLayer( // Prepare a callback function that will be called every time a new // coordination window is detected. 
onWindowFn := func(window *coordinationWindow) { + // Track coordination window detection + if n.performanceMetrics != nil { + n.performanceMetrics.IncrementCounter("coordination_windows_detected_total", 1) + } + // Fetch all wallets controlled by the node. It is important to // get the wallets every time the window is triggered as the // node may have started controlling a new wallet in the meantime. @@ -998,6 +1059,8 @@ func executeCoordinationProcedure( procedureLogger.Infof("starting coordination procedure") + startTime := time.Now() + executor, ok, err := node.getCoordinationExecutor(walletPublicKey) if err != nil { procedureLogger.Errorf("cannot get coordination executor: [%v]", err) @@ -1015,6 +1078,10 @@ func executeCoordinationProcedure( result, err := executor.coordinate(window) if err != nil { procedureLogger.Errorf("coordination procedure failed: [%v]", err) + if node.performanceMetrics != nil { + node.performanceMetrics.IncrementCounter("coordination_failed_total", 1) + node.performanceMetrics.RecordDuration("coordination_duration_seconds", time.Since(startTime)) + } return nil, false } @@ -1023,6 +1090,12 @@ func executeCoordinationProcedure( result, ) + // Record successful coordination procedure + if node.performanceMetrics != nil { + node.performanceMetrics.IncrementCounter("coordination_procedures_executed_total", 1) + node.performanceMetrics.RecordDuration("coordination_duration_seconds", time.Since(startTime)) + } + return result, true } diff --git a/pkg/tbtc/redemption.go b/pkg/tbtc/redemption.go index 218f2db58d..1b08031eb3 100644 --- a/pkg/tbtc/redemption.go +++ b/pkg/tbtc/redemption.go @@ -118,6 +118,12 @@ type redemptionAction struct { feeDistribution redemptionFeeDistributionFn transactionShape RedemptionTransactionShape + + // metricsRecorder is optional and used for recording performance metrics + metricsRecorder interface { + IncrementCounter(name string, value float64) + RecordDuration(name string, duration time.Duration) + } } func 
newRedemptionAction( @@ -158,6 +164,13 @@ func newRedemptionAction( } func (ra *redemptionAction) execute() error { + executionStartTime := time.Now() + + // Record redemption execution attempt + if ra.metricsRecorder != nil { + ra.metricsRecorder.IncrementCounter("redemption_executions_total", 1) + } + validateProposalLogger := ra.logger.With( zap.String("step", "validateProposal"), ) @@ -171,6 +184,10 @@ func (ra *redemptionAction) execute() error { ra.chain, ) if err != nil { + if ra.metricsRecorder != nil { + ra.metricsRecorder.IncrementCounter("redemption_executions_failed_total", 1) + ra.metricsRecorder.RecordDuration("redemption_execution_duration_seconds", time.Since(executionStartTime)) + } return fmt.Errorf("validate proposal step failed: [%v]", err) } @@ -180,6 +197,10 @@ func (ra *redemptionAction) execute() error { ra.btcChain, ) if err != nil { + if ra.metricsRecorder != nil { + ra.metricsRecorder.IncrementCounter("redemption_executions_failed_total", 1) + ra.metricsRecorder.RecordDuration("redemption_execution_duration_seconds", time.Since(executionStartTime)) + } return fmt.Errorf( "error while determining wallet's main UTXO: [%v]", err, @@ -189,6 +210,10 @@ func (ra *redemptionAction) execute() error { // Proposal validation should detect this but let's make a check just // in case. 
if walletMainUtxo == nil { + if ra.metricsRecorder != nil { + ra.metricsRecorder.IncrementCounter("redemption_executions_failed_total", 1) + ra.metricsRecorder.RecordDuration("redemption_execution_duration_seconds", time.Since(executionStartTime)) + } return fmt.Errorf("redeeming wallet has no main UTXO") } @@ -199,6 +224,10 @@ func (ra *redemptionAction) execute() error { ra.btcChain, ) if err != nil { + if ra.metricsRecorder != nil { + ra.metricsRecorder.IncrementCounter("redemption_executions_failed_total", 1) + ra.metricsRecorder.RecordDuration("redemption_execution_duration_seconds", time.Since(executionStartTime)) + } return fmt.Errorf( "error while ensuring wallet state is synced between "+ "BTC and host chain: [%v]", @@ -215,6 +244,10 @@ func (ra *redemptionAction) execute() error { ra.transactionShape, ) if err != nil { + if ra.metricsRecorder != nil { + ra.metricsRecorder.IncrementCounter("redemption_executions_failed_total", 1) + ra.metricsRecorder.RecordDuration("redemption_execution_duration_seconds", time.Since(executionStartTime)) + } return fmt.Errorf( "error while assembling redemption transaction: [%v]", err, @@ -227,9 +260,14 @@ func (ra *redemptionAction) execute() error { // Just in case. This should never happen. 
if ra.proposalExpiryBlock < ra.signingTimeoutSafetyMarginBlocks { + if ra.metricsRecorder != nil { + ra.metricsRecorder.IncrementCounter("redemption_executions_failed_total", 1) + ra.metricsRecorder.RecordDuration("redemption_execution_duration_seconds", time.Since(executionStartTime)) + } return fmt.Errorf("invalid proposal expiry block") } + signingStartTime := time.Now() redemptionTx, err := ra.transactionExecutor.signTransaction( signTxLogger, unsignedRedemptionTx, @@ -237,9 +275,18 @@ func (ra *redemptionAction) execute() error { ra.proposalExpiryBlock-ra.signingTimeoutSafetyMarginBlocks, ) if err != nil { + if ra.metricsRecorder != nil { + ra.metricsRecorder.IncrementCounter("redemption_executions_failed_total", 1) + ra.metricsRecorder.RecordDuration("redemption_execution_duration_seconds", time.Since(executionStartTime)) + } return fmt.Errorf("sign transaction step failed: [%v]", err) } + // Record redemption transaction signing duration + if ra.metricsRecorder != nil { + ra.metricsRecorder.RecordDuration("redemption_tx_signing_duration_seconds", time.Since(signingStartTime)) + } + broadcastTxLogger := ra.logger.With( zap.String("step", "broadcastTransaction"), zap.String("redemptionTxHash", redemptionTx.Hash().Hex(bitcoin.ReversedByteOrder)), @@ -252,12 +299,30 @@ func (ra *redemptionAction) execute() error { ra.broadcastCheckDelay, ) if err != nil { + if ra.metricsRecorder != nil { + ra.metricsRecorder.IncrementCounter("redemption_executions_failed_total", 1) + ra.metricsRecorder.RecordDuration("redemption_execution_duration_seconds", time.Since(executionStartTime)) + } return fmt.Errorf("broadcast transaction step failed: [%v]", err) } + // Record successful redemption execution + if ra.metricsRecorder != nil { + ra.metricsRecorder.IncrementCounter("redemption_executions_success_total", 1) + ra.metricsRecorder.RecordDuration("redemption_execution_duration_seconds", time.Since(executionStartTime)) + } + return nil } +// setMetricsRecorder sets the metrics 
recorder for the redemption action. +func (ra *redemptionAction) setMetricsRecorder(recorder interface { + IncrementCounter(name string, value float64) + RecordDuration(name string, duration time.Duration) +}) { + ra.metricsRecorder = recorder +} + // ValidateRedemptionProposal checks the redemption proposal with on-chain // validation rules. func ValidateRedemptionProposal( diff --git a/pkg/tbtc/signing.go b/pkg/tbtc/signing.go index 370e8df583..580a438354 100644 --- a/pkg/tbtc/signing.go +++ b/pkg/tbtc/signing.go @@ -6,6 +6,7 @@ import ( "math/big" "strings" "sync" + "time" "github.com/keep-network/keep-core/pkg/generator" "github.com/keep-network/keep-core/pkg/net" @@ -58,6 +59,13 @@ type signingExecutor struct { // be made by a single signer for the given message. Once the attempts // limit is hit the signer gives up. signingAttemptsLimit uint + + // metricsRecorder is optional and used for recording performance metrics + metricsRecorder interface { + IncrementCounter(name string, value float64) + SetGauge(name string, value float64) + RecordDuration(name string, duration time.Duration) + } } func newSigningExecutor( @@ -147,6 +155,7 @@ func (se *signingExecutor) signBatch( signature, _, endBlock, err := se.sign(ctx, message, signingStartBlock) if err != nil { + // Error metrics are already recorded in sign() method return nil, err } @@ -180,6 +189,12 @@ func (se *signingExecutor) sign( } defer se.lock.Release(1) + startTime := time.Now() + + if se.metricsRecorder != nil { + se.metricsRecorder.IncrementCounter("signing_operations_total", 1) + } + wallet := se.wallet() walletPublicKeyBytes, err := marshalPublicKey(wallet.publicKey) @@ -333,6 +348,10 @@ func (se *signingExecutor) sign( err, ) + // Note: We don't record failure metrics here because the failure + // is only for one signer. The overall signing operation failure + // is recorded in the select statement below when no outcome is received. 
+ return } @@ -386,8 +405,17 @@ func (se *signingExecutor) sign( // signer, that means all signers failed and have not produced a signature. select { case outcome := <-signingOutcomeChan: + if se.metricsRecorder != nil { + se.metricsRecorder.IncrementCounter("signing_success_total", 1) + se.metricsRecorder.RecordDuration("signing_duration_seconds", time.Since(startTime)) + } return outcome.signature, outcome.activityReport, outcome.endBlock, nil default: + if se.metricsRecorder != nil { + se.metricsRecorder.IncrementCounter("signing_failed_total", 1) + se.metricsRecorder.IncrementCounter("signing_timeouts_total", 1) + se.metricsRecorder.RecordDuration("signing_duration_seconds", time.Since(startTime)) + } return nil, nil, 0, fmt.Errorf("all signers failed") } } @@ -397,3 +425,12 @@ func (se *signingExecutor) wallet() wallet { // first signer. return se.signers[0].wallet } + +// setMetricsRecorder sets the metrics recorder for the signing executor. +func (se *signingExecutor) setMetricsRecorder(recorder interface { + IncrementCounter(name string, value float64) + SetGauge(name string, value float64) + RecordDuration(name string, duration time.Duration) +}) { + se.metricsRecorder = recorder +} diff --git a/pkg/tbtc/tbtc.go b/pkg/tbtc/tbtc.go index 6f6414ba35..ced4d61013 100644 --- a/pkg/tbtc/tbtc.go +++ b/pkg/tbtc/tbtc.go @@ -81,6 +81,7 @@ func Initialize( proposalGenerator CoordinationProposalGenerator, config Config, clientInfo *clientinfo.Registry, + perfMetrics *clientinfo.PerformanceMetrics, ) error { groupParameters := &GroupParameters{ GroupSize: 100, @@ -120,6 +121,14 @@ func Initialize( }, }, ) + + // Use provided performance metrics instance, or create a new one if not provided + // This prevents duplicate metric registrations when perfMetrics is already + // initialized in cmd/start.go for network provider metrics + if perfMetrics == nil { + perfMetrics = clientinfo.NewPerformanceMetrics(clientInfo) + } + node.setPerformanceMetrics(perfMetrics) } err = 
sortition.MonitorPool( diff --git a/pkg/tbtc/wallet.go b/pkg/tbtc/wallet.go index 461c9ba3a5..b241774863 100644 --- a/pkg/tbtc/wallet.go +++ b/pkg/tbtc/wallet.go @@ -127,6 +127,12 @@ type walletDispatcher struct { // given wallet. The mapping key is the uncompressed public key // (with 04 prefix) of the wallet. actions map[string]WalletActionType + // metricsRecorder is optional and used for recording performance metrics + metricsRecorder interface { + IncrementCounter(name string, value float64) + SetGauge(name string, value float64) + RecordDuration(name string, duration time.Duration) + } } func newWalletDispatcher() *walletDispatcher { @@ -135,6 +141,15 @@ func newWalletDispatcher() *walletDispatcher { } } +// setMetricsRecorder sets the metrics recorder for the wallet dispatcher. +func (wd *walletDispatcher) setMetricsRecorder(recorder interface { + IncrementCounter(name string, value float64) + SetGauge(name string, value float64) + RecordDuration(name string, duration time.Duration) +}) { + wd.metricsRecorder = recorder +} + // dispatch sends the given walletAction for execution. If the wallet is // already busy, an errWalletBusy error is returned and the action is ignored. 
func (wd *walletDispatcher) dispatch(action walletAction) error { @@ -154,16 +169,36 @@ func (wd *walletDispatcher) dispatch(action walletAction) error { key := hex.EncodeToString(walletPublicKeyBytes) if _, ok := wd.actions[key]; ok { + if wd.metricsRecorder != nil { + wd.metricsRecorder.IncrementCounter("wallet_dispatcher_rejected_total", 1) + } return errWalletBusy } wd.actions[key] = action.actionType() + // Update metrics. NOTE(review): dispatch already holds actionsMutex + // here (the busy-check above reads wd.actions without locking), so + // re-locking the non-reentrant sync.Mutex would deadlock — read directly. + if wd.metricsRecorder != nil { + activeCount := float64(len(wd.actions)) + wd.metricsRecorder.SetGauge("wallet_dispatcher_active_actions", activeCount) + wd.metricsRecorder.IncrementCounter("wallet_actions_total", 1) + } + go func() { + startTime := time.Now() defer func() { wd.actionsMutex.Lock() delete(wd.actions, key) + activeCount := float64(len(wd.actions)) wd.actionsMutex.Unlock() + + // Update metrics + if wd.metricsRecorder != nil { + wd.metricsRecorder.SetGauge("wallet_dispatcher_active_actions", activeCount) + wd.metricsRecorder.RecordDuration("wallet_action_duration_seconds", time.Since(startTime)) + } }() walletActionLogger.Infof("starting action execution") @@ -174,9 +209,16 @@ func (wd *walletDispatcher) dispatch(action walletAction) error { "action execution terminated with error: [%v]", err, ) + if wd.metricsRecorder != nil { + wd.metricsRecorder.IncrementCounter("wallet_action_failed_total", 1) + } return } + if wd.metricsRecorder != nil { + wd.metricsRecorder.IncrementCounter("wallet_action_success_total", 1) + } + walletActionLogger.Infof("action execution terminated with success") }() diff --git a/pkg/tbtcpg/redemptions.go b/pkg/tbtcpg/redemptions.go index c9c01a5856..981e9a8eb7 100644 --- a/pkg/tbtcpg/redemptions.go +++ b/pkg/tbtcpg/redemptions.go @@ -18,6 +18,11 @@ import ( type RedemptionTask struct { chain Chain btcChain bitcoin.Chain + + // metricsRecorder is optional and used for recording performance metrics + metricsRecorder interface { + SetGauge(name string, 
value float64) + } } func NewRedemptionTask( @@ -30,6 +35,13 @@ func NewRedemptionTask( } } +// setMetricsRecorder sets the metrics recorder for the redemption task. +func (rt *RedemptionTask) setMetricsRecorder(recorder interface { + SetGauge(name string, value float64) +}) { + rt.metricsRecorder = recorder +} + func (rt *RedemptionTask) Run(request *tbtc.CoordinationProposalRequest) ( tbtc.CoordinationProposal, bool, @@ -165,6 +177,11 @@ func (rt *RedemptionTask) FindPendingRedemptions( taskLogger.Infof("found [%d] redemption requests", len(pendingRedemptions)) + // Record pending redemption requests count + if rt.metricsRecorder != nil { + rt.metricsRecorder.SetGauge("redemption_pending_requests_count", float64(len(pendingRedemptions))) + } + result := make([]bitcoin.Script, 0) for _, pendingRedemption := range pendingRedemptions { diff --git a/pkg/tbtcpg/tbtcpg.go b/pkg/tbtcpg/tbtcpg.go index 9e12735fd9..14b29f7c84 100644 --- a/pkg/tbtcpg/tbtcpg.go +++ b/pkg/tbtcpg/tbtcpg.go @@ -31,6 +31,18 @@ type ProposalGenerator struct { tasks []ProposalTask } +// SetRedemptionMetricsRecorder sets the metrics recorder for the redemption task. +// This allows recording redemption-specific metrics. +func (pg *ProposalGenerator) SetRedemptionMetricsRecorder(recorder interface { + SetGauge(name string, value float64) +}) { + for _, task := range pg.tasks { + if redemptionTask, ok := task.(*RedemptionTask); ok { + redemptionTask.setMetricsRecorder(recorder) + } + } +} + // NewProposalGenerator returns a new proposal generator. func NewProposalGenerator( chain Chain,