Skip to content
Open
11 changes: 11 additions & 0 deletions greenwave_monitor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,17 @@ if(BUILD_TESTING)
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}
)

# Add parameter-based topic configuration tests (in test/parameters/)
# Automatically discover and add all test_*.py files in the parameters directory
file(GLOB PARAM_TEST_FILES "${CMAKE_SOURCE_DIR}/test/parameters/test_*.py")
foreach(TEST_FILE ${PARAM_TEST_FILES})
get_filename_component(TEST_NAME ${TEST_FILE} NAME_WE)
ament_add_pytest_test(${TEST_NAME} test/parameters/${TEST_NAME}.py
TIMEOUT 120
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}
)
endforeach()

# Add gtests
ament_add_gtest(test_message_diagnostics test/test_message_diagnostics.cpp
TIMEOUT 60
Expand Down
19 changes: 10 additions & 9 deletions greenwave_monitor/examples/example.launch.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -13,7 +13,6 @@
# limitations under the License.

from launch import LaunchDescription
from launch.actions import LogInfo
from launch_ros.actions import Node


Expand Down Expand Up @@ -52,11 +51,13 @@ def generate_launch_description():
name='greenwave_monitor',
output='log',
parameters=[
{'topics': ['/imu_topic', '/image_topic', '/string_topic']}
],
),
LogInfo(
msg='Run `ros2 run r2s_gw r2s_gw` in another terminal to see the demo output '
'with the r2s dashboard.'
),
{
'topics': {
'/imu_topic': {'expected_frequency': 100.0, 'tolerance': 5.0},
'/image_topic': {'expected_frequency': 30.0, 'tolerance': 5.0},
'/string_topic': {'expected_frequency': 1000.0, 'tolerance': 5.0}
},
}
]
)
])
132 changes: 125 additions & 7 deletions greenwave_monitor/greenwave_monitor/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python3

# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -17,13 +17,22 @@
#
# SPDX-License-Identifier: Apache-2.0

from abc import ABC
import math
import time
from typing import List, Optional, Tuple
import unittest

from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus
from greenwave_monitor.ui_adaptor import (
FREQ_SUFFIX,
TOL_SUFFIX,
TOPIC_PARAM_PREFIX,
)
from greenwave_monitor_interfaces.srv import ManageTopic, SetExpectedFrequency
import launch_ros
from rcl_interfaces.msg import Parameter, ParameterType, ParameterValue
from rcl_interfaces.srv import GetParameters, SetParameters
import rclpy
from rclpy.node import Node

Expand All @@ -47,6 +56,75 @@
MONITOR_NODE_NAMESPACE = 'test_namespace'


def make_freq_param(topic: str) -> str:
"""Build frequency parameter name for a topic."""
return f'{TOPIC_PARAM_PREFIX}{topic}{FREQ_SUFFIX}'


def make_tol_param(topic: str) -> str:
"""Build tolerance parameter name for a topic."""
return f'{TOPIC_PARAM_PREFIX}{topic}{TOL_SUFFIX}'


def set_parameter(test_node: Node, param_name: str, value: float,
node_name: str = MONITOR_NODE_NAME,
timeout_sec: float = 10.0) -> bool:
"""Set a parameter on the monitor node using rclpy service client."""
full_node_name = f'/{MONITOR_NODE_NAMESPACE}/{node_name}'
service_name = f'{full_node_name}/set_parameters'

client = test_node.create_client(SetParameters, service_name)
if not client.wait_for_service(timeout_sec=5.0):
return False

param = Parameter()
param.name = param_name
param.value = ParameterValue()
param.value.type = ParameterType.PARAMETER_DOUBLE
param.value.double_value = float(value)

request = SetParameters.Request()
request.parameters = [param]

future = client.call_async(request)
rclpy.spin_until_future_complete(test_node, future, timeout_sec=timeout_sec)

test_node.destroy_client(client)

if future.result() is None:
return False
return all(r.successful for r in future.result().results)


def get_parameter(test_node: Node, param_name: str,
node_name: str = MONITOR_NODE_NAME) -> Tuple[bool, Optional[float]]:
"""Get a parameter from the monitor node using rclpy service client."""
full_node_name = f'/{MONITOR_NODE_NAMESPACE}/{node_name}'
service_name = f'{full_node_name}/get_parameters'

client = test_node.create_client(GetParameters, service_name)
if not client.wait_for_service(timeout_sec=5.0):
return False, None

request = GetParameters.Request()
request.names = [param_name]

future = client.call_async(request)
rclpy.spin_until_future_complete(test_node, future, timeout_sec=5.0)

test_node.destroy_client(client)

if future.result() is None or not future.result().values:
return False, None

param_value = future.result().values[0]
if param_value.type == ParameterType.PARAMETER_DOUBLE:
return True, param_value.double_value
elif param_value.type == ParameterType.PARAMETER_INTEGER:
return True, float(param_value.integer_value)
return False, None


def create_minimal_publisher(
topic: str, frequency_hz: float, message_type: str, id_suffix: str = ''):
"""Create a minimal publisher node with the given parameters."""
Expand All @@ -65,19 +143,33 @@ def create_minimal_publisher(

def create_monitor_node(namespace: str = MONITOR_NODE_NAMESPACE,
node_name: str = MONITOR_NODE_NAME,
topics: List[str] = None):
topics: List[str] = None,
topic_configs: dict = None):
"""Create a greenwave_monitor node for testing."""
if topics is None:
topics = ['/test_topic']
params = {}

# Only add topics param if explicitly provided or no topic_configs
if topics is not None:
if not topics:
topics = ['']
params['topics'] = topics
elif not topic_configs:
params['topics'] = ['/test_topic']

if topic_configs:
for topic, config in topic_configs.items():
if 'expected_frequency' in config:
params[f'{TOPIC_PARAM_PREFIX}{topic}{FREQ_SUFFIX}'] = float(
config['expected_frequency'])
if 'tolerance' in config:
params[f'{TOPIC_PARAM_PREFIX}{topic}{TOL_SUFFIX}'] = float(config['tolerance'])

return launch_ros.actions.Node(
package='greenwave_monitor',
executable='greenwave_monitor',
name=node_name,
namespace=namespace,
parameters=[{
'topics': topics
}],
parameters=[params],
output='screen'
)

Expand Down Expand Up @@ -277,3 +369,29 @@ def create_service_clients(node: Node, namespace: str = MONITOR_NODE_NAMESPACE,
)

return manage_topic_client, set_frequency_client


class RosNodeTestCase(unittest.TestCase, ABC):
"""
Abstract base class for ROS 2 launch tests that need a test node.
Subclasses must define the TEST_NODE_NAME class attribute to specify
the unique name for the test node.
"""

TEST_NODE_NAME: str = None

@classmethod
def setUpClass(cls):
"""Initialize ROS 2 and create test node."""
if cls.TEST_NODE_NAME is None:
raise ValueError(
f'{cls.__name__} must define TEST_NODE_NAME class attribute')
rclpy.init()
cls.test_node = Node(cls.TEST_NODE_NAME, namespace=MONITOR_NODE_NAMESPACE)

@classmethod
def tearDownClass(cls):
"""Clean up ROS 2."""
cls.test_node.destroy_node()
rclpy.shutdown()
Loading