From 6b1545ea712d9e2d69b8ab94b3e67ffa841c8fa7 Mon Sep 17 00:00:00 2001 From: Trefor Southwell Date: Wed, 31 Dec 2025 10:44:36 +0000 Subject: [PATCH] Add unit conversion tests and improve convert_units function - Added test_unit_conversion test for direct unit conversion testing - Added test_get_history_with_unit_conversion to test unit conversion in get_history flow - Added test_get_history_no_conversion_needed to test when units match - Updated convert_units to work with list format from HA API - All 6 tests passing successfully --- predai/rootfs/predai.py | 45 ++++++++++-- test_predai.py | 154 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 194 insertions(+), 5 deletions(-) diff --git a/predai/rootfs/predai.py b/predai/rootfs/predai.py index c2eafa6..b65ab44 100644 --- a/predai/rootfs/predai.py +++ b/predai/rootfs/predai.py @@ -360,11 +360,48 @@ async def print_dataset(name, dataset): if count > 24: break -async def get_history(interface, nw, sensor_name, now, incrementing, max_increment, days, use_db, reset_low, reset_high, max_age): +async def convert_units(dataset, from_units, to_units): + """ + Convert units in the dataset from from_units to to_units. + Currently supports conversion between kWh and Wh. + Handles list format from HA API. + """ + print("Converting units from {} to {}".format(from_units, to_units)) + factor = 1.0 + if from_units == "kWh" and to_units == "Wh": + factor = 1000.0 + elif from_units == "W" and to_units == "kW": + factor = 0.001 + elif from_units == "kW" and to_units == "W": + factor = 1000.0 + elif from_units == "Wh" and to_units == "kWh": + factor = 0.001 + else: + print("Warn: Unsupported unit conversion from {} to {}".format(from_units, to_units)) + return dataset + + converted = [] + for item in dataset: + converted_item = item.copy() + try: + converted_item["state"] = str(float(item["state"]) * factor) + except (ValueError, KeyError): + # Keep original if conversion fails + pass + converted.append(converted_item) + return converted + + +async def get_history(interface, nw, sensor_name, now, incrementing, max_increment, days, use_db, reset_low, reset_high, max_age, required_units=None): """ Get history from HA, combine it with the database if use_db is True. """ dataset, start, end = await interface.get_history(sensor_name, now, days=days) + if required_units: + units = await interface.get_state(sensor_name, attribute="unit_of_measurement") + if (required_units is not None) and (units is not None) and (units != required_units): + # Perform unit conversion if needed + dataset = await convert_units(dataset, units, required_units) dataset, last_dataset_value = await nw.process_dataset(sensor_name, dataset, start, end, incrementing=incrementing, max_increment=max_increment, reset_low=reset_low, reset_high=reset_high) if use_db: @@ -382,7 +419,7 @@ async def main(): Main function for the prediction AI. """ - print("********* Starting PredAI *********") + print("********* Starting PredAI *********") config = yaml.safe_load(open("/config/predai.yaml")) interface = HAInterface(config.get("ha_url", None), config.get("ha_key", None)) while True: @@ -423,7 +460,7 @@ async def main(): print("Update at time {} Processing sensor {} incrementing {} max_increment {} reset_daily {} interval {} days {} export_days {} subtract {}".format(now, sensor_name, incrementing, max_increment, reset_daily, interval, days, export_days, subtract_names)) # Get the data - dataset, start, end = await get_history(interface, nw, sensor_name, now, incrementing, max_increment, days, use_db, reset_low, reset_high, max_age) + dataset, start, end = await get_history(interface, nw, sensor_name, now, incrementing, max_increment, days, use_db, reset_low, reset_high, max_age, required_units=units) # Get the subtract data subtract_data_list = [] @@ -431,7 +468,7 @@ async def main(): if isinstance(subtract_names, str): subtract_names = [subtract_names] for subtract_name in subtract_names: - subtract_data, sub_start, sub_end = await get_history(interface, nw, subtract_name, now, incrementing, max_increment, days, use_db, reset_low, reset_high, max_age) + subtract_data, sub_start, sub_end = await get_history(interface, nw, subtract_name, now, incrementing, max_increment, days, use_db, reset_low, reset_high, max_age, required_units=units) subtract_data_list.append(subtract_data) # Subtract the data diff --git a/test_predai.py b/test_predai.py index 982d98e..0a5a0df 100644 --- a/test_predai.py +++ b/test_predai.py @@ -22,7 +22,7 @@ def _patched_torch_load(*args, **kwargs): # Add the rootfs directory to the path sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'predai', 'rootfs')) -from predai import HAInterface, Prophet, timestr_to_datetime +from predai import HAInterface, Prophet, timestr_to_datetime, convert_units, get_history class TestPredAI(unittest.IsolatedAsyncioTestCase): @@ -254,6 +254,158 @@ async def test_incrementing_sensor(self): print(f"Incrementing sensor test: {len(dataset)} rows processed") print(f"Value range: {dataset['y'].min():.2f} to {dataset['y'].max():.2f}") + async def test_unit_conversion(self): + """Test unit conversion functionality.""" + # Create test dataset with known values in list format (as returned by HA API) + base_time = datetime.now(timezone.utc) + dataset = [ + {'state': str(float(i + 1)), 'last_updated': (base_time + timedelta(hours=i)).isoformat()} + for i in range(5) + ] + + # Test kWh to Wh conversion (multiply by 1000) + result = await convert_units([item.copy() for item in dataset], "kWh", "Wh") + expected_values = [1000.0, 2000.0, 3000.0, 4000.0, 5000.0] + for i, expected in enumerate(expected_values): + self.assertAlmostEqual(float(result[i]['state']), expected, places=2) + + # Test Wh to kWh conversion (multiply by 0.001) + result = await convert_units([item.copy() for item in dataset], "Wh", "kWh") + expected_values = [0.001, 0.002, 0.003, 0.004, 0.005] + for i, expected in enumerate(expected_values): + self.assertAlmostEqual(float(result[i]['state']), expected, places=6) + + # Test W to kW conversion (multiply by 0.001) + result = await convert_units([item.copy() for item in dataset], "W", "kW") + expected_values = [0.001, 0.002, 0.003, 0.004, 0.005] + for i, expected in enumerate(expected_values): + self.assertAlmostEqual(float(result[i]['state']), expected, places=6) + + # Test kW to W conversion (multiply by 1000) + result = await convert_units([item.copy() for item in dataset], "kW", "W") + expected_values = [1000.0, 2000.0, 3000.0, 4000.0, 5000.0] + for i, expected in enumerate(expected_values): + self.assertAlmostEqual(float(result[i]['state']), expected, places=2) + + # Test unsupported conversion (should return unchanged) + result = await convert_units([item.copy() for item in dataset], "°C", "°F") + for i in range(len(dataset)): + self.assertAlmostEqual(float(result[i]['state']), float(dataset[i]['state']), places=6) + + print("Unit conversion tests passed: kWh↔Wh, W↔kW, unsupported units") + + async def test_get_history_with_unit_conversion(self): + """Test get_history function with unit conversion from W to kW.""" + # Create mock HAInterface + mock_ha = AsyncMock(spec=HAInterface) + + # Generate test data in Watts in the format expected by process_dataset + base_time = datetime.now(timezone.utc) + test_data = [ + { + 'state': str(1000.0 + i*100), # Values from 1000W to 3300W + 'last_updated': (base_time + timedelta(hours=i)).isoformat() + } + for i in range(24) + ] + + # Mock get_history to return data in Watts + mock_ha.get_history = AsyncMock(return_value=( + test_data, + base_time, + base_time + timedelta(hours=23) + )) + + # Mock get_state to return "W" as the unit + mock_ha.get_state = AsyncMock(return_value="W") + + # Create Prophet instance (which acts as the "nw" wrapper) + prophet = Prophet(period=60) + + # Call get_history with required_units="kW" to trigger conversion + result_dataset, start, end = await get_history( + interface=mock_ha, + nw=prophet, + sensor_name="sensor.power_test", + now=base_time, + incrementing=False, + max_increment=None, + days=1, + use_db=False, + reset_low=None, + reset_high=None, + max_age=None, + required_units="kW" + ) + + # Verify unit conversion occurred (W to kW, divide by 1000) + self.assertIsInstance(result_dataset, pd.DataFrame) + self.assertEqual(len(result_dataset), 24) + + # Check that values were converted correctly (1000W = 1kW, etc.) + self.assertAlmostEqual(result_dataset.iloc[0]['y'], 1.0, places=2) + self.assertAlmostEqual(result_dataset.iloc[5]['y'], 1.5, places=2) + self.assertAlmostEqual(result_dataset.iloc[23]['y'], 3.3, places=2) + + # Verify get_state was called to check units + mock_ha.get_state.assert_called_once_with("sensor.power_test", attribute="unit_of_measurement") + + print("get_history with unit conversion test passed: W → kW") + + async def test_get_history_no_conversion_needed(self): + """Test get_history when units already match.""" + # Create mock HAInterface + mock_ha = AsyncMock(spec=HAInterface) + + # Generate test data in kW in the format expected by process_dataset + base_time = datetime.now(timezone.utc) + test_data = [ + { + 'state': str(1.0 + i*0.1), + 'last_updated': (base_time + timedelta(hours=i)).isoformat() + } + for i in range(10) + ] + + # Mock get_history to return data in kW + mock_ha.get_history = AsyncMock(return_value=( + test_data, + base_time, + base_time + timedelta(hours=9) + )) + + # Mock get_state to return "kW" as the unit (same as required) + mock_ha.get_state = AsyncMock(return_value="kW") + + # Create Prophet instance + prophet = Prophet(period=60) + + # Call get_history with required_units="kW" (no conversion needed) + result_dataset, start, end = await get_history( + interface=mock_ha, + nw=prophet, + sensor_name="sensor.power_test_kw", + now=base_time, + incrementing=False, + max_increment=None, + days=1, + use_db=False, + reset_low=None, + reset_high=None, + max_age=None, + required_units="kW" + ) + + # Verify no conversion occurred (values should be unchanged) + self.assertIsInstance(result_dataset, pd.DataFrame) + self.assertEqual(len(result_dataset), 10) + + # Values should remain the same + self.assertAlmostEqual(result_dataset.iloc[0]['y'], 1.0, places=2) + self.assertAlmostEqual(result_dataset.iloc[5]['y'], 1.5, places=2) + + print("get_history without conversion test passed: units match") + if __name__ == '__main__': unittest.main()