From 3eade2e352ee69569276dd00906b1fc6720c40f2 Mon Sep 17 00:00:00 2001 From: devendra-lohar Date: Thu, 4 Dec 2025 10:39:28 +0530 Subject: [PATCH 1/6] fix CognitePusher to handle metrics using upload queue --- cognite/extractorutils/metrics.py | 98 ++++++++++++++++++++----------- 1 file changed, 65 insertions(+), 33 deletions(-) diff --git a/cognite/extractorutils/metrics.py b/cognite/extractorutils/metrics.py index 217f6e1e..2313a11c 100644 --- a/cognite/extractorutils/metrics.py +++ b/cognite/extractorutils/metrics.py @@ -53,14 +53,12 @@ def __init__(self): from prometheus_client.exposition import basic_auth_handler, delete_from_gateway, pushadd_to_gateway from cognite.client import CogniteClient -from cognite.client.data_classes import Asset, Datapoints, DatapointsArray, TimeSeries -from cognite.client.data_classes.data_modeling import NodeId +from cognite.client.data_classes import Asset, TimeSeries from cognite.client.exceptions import CogniteDuplicatedError from cognite.extractorutils.threading import CancellationToken +from cognite.extractorutils.uploader.time_series import DataPointList, TimeSeriesUploadQueue from cognite.extractorutils.util import EitherId -from .util import ensure_time_series - _metrics_singularities = {} @@ -359,70 +357,104 @@ def __init__( self.asset = asset self.external_id_prefix = external_id_prefix self.data_set = data_set + self._asset_id: int | None = None + self._data_set_id: int | None = None self._init_cdf() + self.upload_queue = TimeSeriesUploadQueue( + cdf_client=cdf_client, + create_missing=self._create_missing_timeseries_factory, + data_set_id=self._data_set_id, + cancellation_token=cancellation_token, + ) + self._cdf_project = cdf_client.config.project def _init_cdf(self) -> None: """ - Initialize the CDF tenant with the necessary time series and asset. - """ - time_series: list[TimeSeries] = [] + Initialize the CDF tenant with the necessary asset and dataset. + Timeseries are created automatically by TimeSeriesUploadQueue when datapoints are pushed. + """ if self.asset is not None: - # Ensure that asset exist, and retrieve internal ID + # Ensure that asset exists, and retrieve internal ID asset: Asset | None try: asset = self.cdf_client.assets.create(self.asset) except CogniteDuplicatedError: asset = self.cdf_client.assets.retrieve(external_id=self.asset.external_id) - asset_id = asset.id if asset is not None else None - - else: - asset_id = None + self._asset_id = asset.id if asset is not None else None - data_set_id = None if self.data_set: dataset = self.cdf_client.data_sets.retrieve( id=self.data_set.internal_id, external_id=self.data_set.external_id ) if dataset: - data_set_id = dataset.id + self._data_set_id = dataset.id - for metric in REGISTRY.collect(): - if type(metric) is Metric and metric.type in ["gauge", "counter"]: - external_id = self.external_id_prefix + metric.name + def _create_missing_timeseries_factory(self, external_id: str, datapoints: DataPointList) -> TimeSeries: + """ + Factory function to create missing timeseries. 
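+
+        Passed as the ``create_missing`` callback to TimeSeriesUploadQueue, which
+        calls it for each external ID that has queued datapoints but no time series
+        in CDF yet. This is what lets late-registered metrics (python_gc_*,
+        python_info, ...) get their time series created on first push. Datapoints
+        arrive either as dicts with a "value" key or as (timestamp, value) tuples,
+        so the value type is inferred from whichever shape the first entry has.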
- time_series.append( - TimeSeries( - external_id=external_id, - name=metric.name, - legacy_name=external_id, - description=metric.documentation, - asset_id=asset_id, - data_set_id=data_set_id, - ) - ) + Args: + external_id: External ID of the timeseries to create + datapoints: List of datapoints that triggered the creation + + Returns: + A TimeSeries object + """ + metric_name = external_id[len(self.external_id_prefix) :] - ensure_time_series(self.cdf_client, time_series) + metric_description = "" + for metric in REGISTRY.collect(): + if isinstance(metric, Metric) and metric.name == metric_name: + metric_description = metric.documentation + break + + is_string = ( + isinstance(datapoints[0].get("value"), str) + if isinstance(datapoints[0], dict) + else isinstance(datapoints[0][1], str) + ) + + return TimeSeries( + external_id=external_id, + name=metric_name, + legacy_name=external_id, + description=metric_description, + asset_id=self._asset_id, + data_set_id=self._data_set_id, + is_string=is_string, + ) def _push_to_server(self) -> None: """ - Create datapoints an push them to their respective time series. + Create datapoints and push them to their respective time series using TimeSeriesUploadQueue. + + The queue will automatically create missing timeseries for late-registered metrics. """ timestamp = int(arrow.get().float_timestamp * 1000) - datapoints: list[dict[str, str | int | list[Any] | Datapoints | DatapointsArray | NodeId]] = [] - for metric in REGISTRY.collect(): if isinstance(metric, Metric) and metric.type in ["gauge", "counter"]: if len(metric.samples) == 0: continue external_id = self.external_id_prefix + metric.name - datapoints.append({"externalId": external_id, "datapoints": [(timestamp, metric.samples[0].value)]}) - self.cdf_client.time_series.data.insert_multiple(datapoints) + self.upload_queue.add_to_upload_queue( + external_id=external_id, datapoints=[(timestamp, metric.samples[0].value)] + ) + + self.upload_queue.upload() self.logger.debug("Pushed metrics to CDF tenant '%s'", self._cdf_project) + + def stop(self) -> None: + """ + Stop the push loop and ensure all metrics are uploaded. + """ + self._push_to_server() + self.upload_queue.stop() + self.cancellation_token.cancel() From 0a131e71415f91e047a708fe897af8b346ea44aa Mon Sep 17 00:00:00 2001 From: devendra-lohar Date: Thu, 4 Dec 2025 10:39:46 +0530 Subject: [PATCH 2/6] add/modify testcases --- .../test_metrics_integration.py | 226 ++++++++++++++++++ tests/tests_unit/test_metrics.py | 40 +--- 2 files changed, 236 insertions(+), 30 deletions(-) create mode 100644 tests/tests_integration/test_metrics_integration.py diff --git a/tests/tests_integration/test_metrics_integration.py b/tests/tests_integration/test_metrics_integration.py new file mode 100644 index 00000000..9ca50ca2 --- /dev/null +++ b/tests/tests_integration/test_metrics_integration.py @@ -0,0 +1,226 @@ +""" +Integration tests for CognitePusher with late-registered metrics. + +This test verifies that CognitePusher correctly handles metrics that are registered +after initialization (like python_gc_* and python_info metrics from Prometheus). 
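+
+A minimal sketch of the scenario under test (names here are illustrative only):
+
+    pusher = CognitePusher(cdf_client=client, external_id_prefix="metrics_", push_interval=60)
+    late_gauge = Gauge("my_late_gauge", "Registered after pusher init")  # like python_gc_*
+    late_gauge.set(1.0)
+    pusher._push_to_server()  # should create the missing time series and upload the datapoint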
+""" + +import random +import time +from collections.abc import Generator + +import pytest +from prometheus_client import Counter, Gauge +from prometheus_client.core import REGISTRY + +from cognite.client import CogniteClient +from cognite.extractorutils.metrics import CognitePusher + + +@pytest.fixture +def test_prefix() -> str: + """Generate a unique prefix for this test run to avoid conflicts.""" + test_id = random.randint(0, 2**31) + return f"integration_test_{test_id}_" + + +@pytest.fixture +def cognite_pusher_test( + set_client: CogniteClient, test_prefix: str +) -> Generator[tuple[CogniteClient, str, list[str]], None, None]: + """ + Fixture that sets up and tears down a CognitePusher test. + + Yields: + Tuple of (client, test_prefix, list of created timeseries external_ids) + """ + client = set_client + created_external_ids: list[str] = [] + + yield client, test_prefix, created_external_ids + + if created_external_ids: + try: + client.time_series.delete(external_id=created_external_ids, ignore_unknown_ids=True) + except Exception as e: + print(f"Warning: Failed to cleanup timeseries: {e}") + + +def test_cognite_pusher_with_late_registered_metrics(cognite_pusher_test: tuple[CogniteClient, str, list[str]]) -> None: + """ + Test that CognitePusher handles both early and late-registered metrics. + + This simulates the real-world scenario where: + 1. Some metrics (like extractor-specific metrics) are registered before CognitePusher init + 2. Other metrics (like python_gc_*, python_info) are registered after initialization + 3. All metrics should be uploaded correctly during push + """ + client, test_prefix, created_external_ids = cognite_pusher_test + + early_gauge_name = f"early_gauge_{random.randint(0, 2**31)}" + early_gauge = Gauge(early_gauge_name, "A metric registered before CognitePusher init") + early_gauge.set(42.0) + + early_external_id = test_prefix + early_gauge_name + created_external_ids.append(early_external_id) + + pusher = CognitePusher( + cdf_client=client, + external_id_prefix=test_prefix, + push_interval=60, + ) + + late_gauge_name = f"late_gauge_{random.randint(0, 2**31)}" + late_gauge = Gauge(late_gauge_name, "A metric registered AFTER CognitePusher init (like python_gc)") + late_gauge.set(99.0) + + late_counter_name = f"late_counter_{random.randint(0, 2**31)}" + late_counter = Counter(late_counter_name, "A counter registered AFTER CognitePusher init (like python_info)") + late_counter.inc(5) + + late_gauge_external_id = test_prefix + late_gauge_name + late_counter_external_id = test_prefix + late_counter_name + created_external_ids.append(late_gauge_external_id) + created_external_ids.append(late_counter_external_id) + + # This should create timeseries for ALL metrics (early + late) + pusher._push_to_server() + + time.sleep(2) + + early_ts = client.time_series.retrieve(external_id=early_external_id) + assert early_ts is not None, f"Early metric timeseries {early_external_id} was not created" + assert early_ts.name == early_gauge_name + assert early_ts.description == "A metric registered before CognitePusher init" + + late_gauge_ts = client.time_series.retrieve(external_id=late_gauge_external_id) + assert late_gauge_ts is not None, f"Late gauge timeseries {late_gauge_external_id} was not created" + assert late_gauge_ts.name == late_gauge_name + assert late_gauge_ts.description == "A metric registered AFTER CognitePusher init (like python_gc)" + + late_counter_ts = client.time_series.retrieve(external_id=late_counter_external_id) + assert late_counter_ts is not None, 
f"Late counter timeseries {late_counter_external_id} was not created" + assert late_counter_ts.name == late_counter_name + + early_datapoints = client.time_series.data.retrieve( + external_id=early_external_id, start="1h-ago", end="now", limit=10 + ) + assert len(early_datapoints) > 0, "No datapoints for early metric" + assert early_datapoints.value[0] == pytest.approx(42.0) + + late_gauge_datapoints = client.time_series.data.retrieve( + external_id=late_gauge_external_id, start="1h-ago", end="now", limit=10 + ) + assert len(late_gauge_datapoints) > 0, "No datapoints for late gauge metric" + assert late_gauge_datapoints.value[0] == pytest.approx(99.0) + + late_counter_datapoints = client.time_series.data.retrieve( + external_id=late_counter_external_id, start="1h-ago", end="now", limit=10 + ) + assert len(late_counter_datapoints) > 0, "No datapoints for late counter metric" + assert late_counter_datapoints.value[0] == pytest.approx(5.0) + + pusher.stop() + + REGISTRY.unregister(early_gauge) + REGISTRY.unregister(late_gauge) + REGISTRY.unregister(late_counter) + + +def test_cognite_pusher_stop_uploads_late_metrics(cognite_pusher_test: tuple[CogniteClient, str, list[str]]) -> None: + """ + Test that stop() correctly uploads all metrics including late-registered ones. + + This is the scenario where: + 1. CognitePusher is initialized + 2. Metrics are registered after + 3. stop() is called during shutdown + 4. All metrics (including late ones) should be uploaded + """ + client, test_prefix, created_external_ids = cognite_pusher_test + + pusher = CognitePusher( + cdf_client=client, + external_id_prefix=test_prefix, + push_interval=60, + ) + + late_metric_name = f"shutdown_metric_{random.randint(0, 2**31)}" + late_metric = Gauge(late_metric_name, "A metric registered after init, uploaded during shutdown") + late_metric.set(123.0) + + late_external_id = test_prefix + late_metric_name + created_external_ids.append(late_external_id) + + pusher.stop() + + time.sleep(2) + + late_ts = client.time_series.retrieve(external_id=late_external_id) + assert late_ts is not None, f"Late metric {late_external_id} was not created during shutdown" + + late_datapoints = client.time_series.data.retrieve( + external_id=late_external_id, start="1h-ago", end="now", limit=10 + ) + assert len(late_datapoints) > 0, "No datapoints for late metric after shutdown" + assert late_datapoints.value[0] == pytest.approx(123.0) + + REGISTRY.unregister(late_metric) + + +def test_cognite_pusher_multiple_pushes_with_late_metrics( + cognite_pusher_test: tuple[CogniteClient, str, list[str]], +) -> None: + """ + Test that multiple pushes work correctly with late-registered metrics. + + Scenario: + 1. Push with some metrics + 2. Register new metrics + 3. 
Push again - new metrics should be created and uploaded + """ + client, test_prefix, created_external_ids = cognite_pusher_test + + initial_metric_name = f"initial_{random.randint(0, 2**31)}" + initial_metric = Gauge(initial_metric_name, "Initial metric") + initial_metric.set(10.0) + + initial_external_id = test_prefix + initial_metric_name + created_external_ids.append(initial_external_id) + + pusher = CognitePusher( + cdf_client=client, + external_id_prefix=test_prefix, + push_interval=60, + ) + + pusher._push_to_server() + time.sleep(1) + + initial_ts = client.time_series.retrieve(external_id=initial_external_id) + assert initial_ts is not None + + late_metric_name = f"later_{random.randint(0, 2**31)}" + late_metric = Gauge(late_metric_name, "Late metric added between pushes") + late_metric.set(20.0) + + late_external_id = test_prefix + late_metric_name + created_external_ids.append(late_external_id) + + initial_metric.set(11.0) + pusher._push_to_server() + time.sleep(2) + + late_ts = client.time_series.retrieve(external_id=late_external_id) + assert late_ts is not None, "Late metric was not created on second push" + + late_datapoints = client.time_series.data.retrieve( + external_id=late_external_id, start="1h-ago", end="now", limit=10 + ) + assert len(late_datapoints) > 0 + assert late_datapoints.value[0] == pytest.approx(20.0) + + pusher.stop() + + REGISTRY.unregister(initial_metric) + REGISTRY.unregister(late_metric) diff --git a/tests/tests_unit/test_metrics.py b/tests/tests_unit/test_metrics.py index 362d188e..e962d416 100644 --- a/tests/tests_unit/test_metrics.py +++ b/tests/tests_unit/test_metrics.py @@ -21,8 +21,8 @@ from prometheus_client import Gauge from cognite.client import CogniteClient -from cognite.client.data_classes import Asset, TimeSeries -from cognite.client.exceptions import CogniteDuplicatedError, CogniteNotFoundError +from cognite.client.data_classes import Asset +from cognite.client.exceptions import CogniteDuplicatedError from cognite.extractorutils import metrics from cognite.extractorutils.metrics import CognitePusher, safe_get @@ -121,9 +121,9 @@ def test_clear(altered_metrics: ModuleType) -> None: # CognitePusher test @patch("cognite.client.CogniteClient") def test_init_empty_cdf(MockCogniteClient: Mock) -> None: + """Test that initialization sets up asset but doesn't create timeseries (created on-demand).""" init_gauge() client = MockCogniteClient() - client.time_series.retrieve_multiple = Mock(side_effect=CogniteNotFoundError([{"externalId": "pre_gauge"}])) return_asset = Asset(id=123, external_id="asset", name="asset") new_asset = Asset(external_id="asset", name="asset") @@ -132,26 +132,19 @@ def test_init_empty_cdf(MockCogniteClient: Mock) -> None: pusher = CognitePusher(client, external_id_prefix="pre_", asset=new_asset, push_interval=1) - # Assert time series created - # Hacky assert_called_once_with as the TimeSeries object is not the same obj, just equal content - client.time_series.create.assert_called_once() - print(client.time_series.create.call_args_list) - assert ( - client.time_series.create.call_args_list[0][0][0][0].dump() - == TimeSeries( - external_id="pre_gauge", name="gauge", legacy_name="pre_gauge", description="Test gauge", asset_id=123 - ).dump() - ) - # Assert asset created client.assets.create.assert_called_once_with(new_asset) + # Verify that upload_queue exists + assert hasattr(pusher, "upload_queue") + assert pusher._asset_id == 123 + @patch("cognite.client.CogniteClient") def test_init_existing_asset(MockCogniteClient: Mock) -> 
None: + """Test that initialization retrieves existing asset.""" init_gauge() client = MockCogniteClient() - client.time_series.retrieve_multiple = Mock(side_effect=CogniteNotFoundError([{"externalId": "pre_gauge"}])) return_asset = Asset(id=123, external_id="assetid", name="asset") new_asset = Asset(external_id="assetid", name="asset") @@ -161,23 +154,10 @@ def test_init_existing_asset(MockCogniteClient: Mock) -> None: pusher = CognitePusher(client, external_id_prefix="pre_", asset=new_asset, push_interval=1) - # Assert time series created - # Hacky assert_called_once_with as the TimeSeries object is not the same obj, just equal content - client.time_series.create.assert_called_once() - assert ( - client.time_series.create.call_args_list[0][0][0][0].dump() - == TimeSeries( - external_id="pre_gauge", - name="gauge", - legacy_name="pre_gauge", - description="Test gauge", - asset_id=123, - ).dump() - ) - - # Assert asset created + # Assert asset retrieved client.assets.create.assert_called_once_with(new_asset) client.assets.retrieve.assert_called_once_with(external_id="assetid") + assert pusher._asset_id == 123 @patch("cognite.client.CogniteClient") From ad5bbdb655aa1ebdfe67c12f6159a47c7e1ea000 Mon Sep 17 00:00:00 2001 From: devendra-lohar Date: Thu, 4 Dec 2025 11:01:32 +0530 Subject: [PATCH 3/6] add cleanup for registry --- .../test_metrics_integration.py | 66 +++++++++++++------ 1 file changed, 47 insertions(+), 19 deletions(-) diff --git a/tests/tests_integration/test_metrics_integration.py b/tests/tests_integration/test_metrics_integration.py index 9ca50ca2..a7a91733 100644 --- a/tests/tests_integration/test_metrics_integration.py +++ b/tests/tests_integration/test_metrics_integration.py @@ -5,9 +5,11 @@ after initialization (like python_gc_* and python_info metrics from Prometheus). """ +import logging import random import time -from collections.abc import Generator +from collections.abc import Callable, Generator +from typing import Any import pytest from prometheus_client import Counter, Gauge @@ -16,6 +18,8 @@ from cognite.client import CogniteClient from cognite.extractorutils.metrics import CognitePusher +logger = logging.getLogger(__name__) + @pytest.fixture def test_prefix() -> str: @@ -24,6 +28,28 @@ def test_prefix() -> str: return f"integration_test_{test_id}_" +@pytest.fixture +def metrics_registry() -> Generator[Callable[[Any], Any], None, None]: + """ + Fixture that tracks and cleans up Prometheus metrics. + + Ensures metrics are unregistered even if the test fails. 
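+
+    Usage in a test (the metric registers itself with REGISTRY on construction;
+    this fixture only tracks it for later cleanup):
+
+        gauge = metrics_registry(Gauge("name", "documentation"))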
+ """ + metrics_to_unregister: list[Any] = [] + + def _register(metric: Any) -> Any: + metrics_to_unregister.append(metric) + return metric + + yield _register + + for metric in metrics_to_unregister: + try: + REGISTRY.unregister(metric) + except KeyError: + logger.debug("Metric %s was already unregistered", metric) + + @pytest.fixture def cognite_pusher_test( set_client: CogniteClient, test_prefix: str @@ -43,10 +69,13 @@ def cognite_pusher_test( try: client.time_series.delete(external_id=created_external_ids, ignore_unknown_ids=True) except Exception as e: - print(f"Warning: Failed to cleanup timeseries: {e}") + logger.warning("Failed to cleanup timeseries: %s", e) -def test_cognite_pusher_with_late_registered_metrics(cognite_pusher_test: tuple[CogniteClient, str, list[str]]) -> None: +def test_cognite_pusher_with_late_registered_metrics( + cognite_pusher_test: tuple[CogniteClient, str, list[str]], + metrics_registry: Callable[[Any], Any], +) -> None: """ Test that CognitePusher handles both early and late-registered metrics. @@ -58,7 +87,7 @@ def test_cognite_pusher_with_late_registered_metrics(cognite_pusher_test: tuple[ client, test_prefix, created_external_ids = cognite_pusher_test early_gauge_name = f"early_gauge_{random.randint(0, 2**31)}" - early_gauge = Gauge(early_gauge_name, "A metric registered before CognitePusher init") + early_gauge = metrics_registry(Gauge(early_gauge_name, "A metric registered before CognitePusher init")) early_gauge.set(42.0) early_external_id = test_prefix + early_gauge_name @@ -71,11 +100,15 @@ def test_cognite_pusher_with_late_registered_metrics(cognite_pusher_test: tuple[ ) late_gauge_name = f"late_gauge_{random.randint(0, 2**31)}" - late_gauge = Gauge(late_gauge_name, "A metric registered AFTER CognitePusher init (like python_gc)") + late_gauge = metrics_registry( + Gauge(late_gauge_name, "A metric registered AFTER CognitePusher init (like python_gc)") + ) late_gauge.set(99.0) late_counter_name = f"late_counter_{random.randint(0, 2**31)}" - late_counter = Counter(late_counter_name, "A counter registered AFTER CognitePusher init (like python_info)") + late_counter = metrics_registry( + Counter(late_counter_name, "A counter registered AFTER CognitePusher init (like python_info)") + ) late_counter.inc(5) late_gauge_external_id = test_prefix + late_gauge_name @@ -122,12 +155,11 @@ def test_cognite_pusher_with_late_registered_metrics(cognite_pusher_test: tuple[ pusher.stop() - REGISTRY.unregister(early_gauge) - REGISTRY.unregister(late_gauge) - REGISTRY.unregister(late_counter) - -def test_cognite_pusher_stop_uploads_late_metrics(cognite_pusher_test: tuple[CogniteClient, str, list[str]]) -> None: +def test_cognite_pusher_stop_uploads_late_metrics( + cognite_pusher_test: tuple[CogniteClient, str, list[str]], + metrics_registry: Callable[[Any], Any], +) -> None: """ Test that stop() correctly uploads all metrics including late-registered ones. 
@@ -146,7 +178,7 @@ def test_cognite_pusher_stop_uploads_late_metrics(cognite_pusher_test: tuple[Cog ) late_metric_name = f"shutdown_metric_{random.randint(0, 2**31)}" - late_metric = Gauge(late_metric_name, "A metric registered after init, uploaded during shutdown") + late_metric = metrics_registry(Gauge(late_metric_name, "A metric registered after init, uploaded during shutdown")) late_metric.set(123.0) late_external_id = test_prefix + late_metric_name @@ -165,11 +197,10 @@ def test_cognite_pusher_stop_uploads_late_metrics(cognite_pusher_test: tuple[Cog assert len(late_datapoints) > 0, "No datapoints for late metric after shutdown" assert late_datapoints.value[0] == pytest.approx(123.0) - REGISTRY.unregister(late_metric) - def test_cognite_pusher_multiple_pushes_with_late_metrics( cognite_pusher_test: tuple[CogniteClient, str, list[str]], + metrics_registry: Callable[[Any], Any], ) -> None: """ Test that multiple pushes work correctly with late-registered metrics. @@ -182,7 +213,7 @@ def test_cognite_pusher_multiple_pushes_with_late_metrics( client, test_prefix, created_external_ids = cognite_pusher_test initial_metric_name = f"initial_{random.randint(0, 2**31)}" - initial_metric = Gauge(initial_metric_name, "Initial metric") + initial_metric = metrics_registry(Gauge(initial_metric_name, "Initial metric")) initial_metric.set(10.0) initial_external_id = test_prefix + initial_metric_name @@ -201,7 +232,7 @@ def test_cognite_pusher_multiple_pushes_with_late_metrics( assert initial_ts is not None late_metric_name = f"later_{random.randint(0, 2**31)}" - late_metric = Gauge(late_metric_name, "Late metric added between pushes") + late_metric = metrics_registry(Gauge(late_metric_name, "Late metric added between pushes")) late_metric.set(20.0) late_external_id = test_prefix + late_metric_name @@ -221,6 +252,3 @@ def test_cognite_pusher_multiple_pushes_with_late_metrics( assert late_datapoints.value[0] == pytest.approx(20.0) pusher.stop() - - REGISTRY.unregister(initial_metric) - REGISTRY.unregister(late_metric) From 0793b41af5485c40471e32071cc181f1af2aead7 Mon Sep 17 00:00:00 2001 From: devendra-lohar Date: Thu, 4 Dec 2025 13:12:28 +0530 Subject: [PATCH 4/6] add helper assertion methods and remove check for upload_queue attribute --- .../test_metrics_integration.py | 104 ++++++++++-------- tests/tests_unit/test_metrics.py | 5 +- 2 files changed, 59 insertions(+), 50 deletions(-) diff --git a/tests/tests_integration/test_metrics_integration.py b/tests/tests_integration/test_metrics_integration.py index a7a91733..147861c7 100644 --- a/tests/tests_integration/test_metrics_integration.py +++ b/tests/tests_integration/test_metrics_integration.py @@ -21,6 +21,51 @@ logger = logging.getLogger(__name__) +def assert_timeseries_exists( + client: CogniteClient, + external_id: str, + expected_name: str | None = None, + expected_description: str | None = None, +) -> None: + """ + Assert that a timeseries exists in CDF with the expected properties. 
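+
+    Example (illustrative external ID):
+
+        assert_timeseries_exists(client, "prefix_gauge", expected_name="gauge")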
+ + Args: + client: CogniteClient instance + external_id: External ID of the timeseries + expected_name: Expected name of the timeseries (optional) + expected_description: Expected description of the timeseries (optional) + """ + ts = client.time_series.retrieve(external_id=external_id) + assert ts is not None, f"Timeseries {external_id} was not created" + if expected_name is not None: + assert ts.name == expected_name, f"Expected name '{expected_name}', got '{ts.name}'" + if expected_description is not None: + assert ts.description == expected_description, ( + f"Expected description '{expected_description}', got '{ts.description}'" + ) + + +def assert_datapoint_value( + client: CogniteClient, + external_id: str, + expected_value: float, +) -> None: + """ + Assert that a timeseries has datapoints with the expected value. + + Args: + client: CogniteClient instance + external_id: External ID of the timeseries + expected_value: Expected value of the first datapoint + """ + datapoints = client.time_series.data.retrieve(external_id=external_id, start="1h-ago", end="now", limit=10) + assert len(datapoints) > 0, f"No datapoints found for timeseries {external_id}" + assert datapoints.value[0] == pytest.approx(expected_value), ( + f"Expected value {expected_value}, got {datapoints.value[0]}" + ) + + @pytest.fixture def test_prefix() -> str: """Generate a unique prefix for this test run to avoid conflicts.""" @@ -121,37 +166,17 @@ def test_cognite_pusher_with_late_registered_metrics( time.sleep(2) - early_ts = client.time_series.retrieve(external_id=early_external_id) - assert early_ts is not None, f"Early metric timeseries {early_external_id} was not created" - assert early_ts.name == early_gauge_name - assert early_ts.description == "A metric registered before CognitePusher init" - - late_gauge_ts = client.time_series.retrieve(external_id=late_gauge_external_id) - assert late_gauge_ts is not None, f"Late gauge timeseries {late_gauge_external_id} was not created" - assert late_gauge_ts.name == late_gauge_name - assert late_gauge_ts.description == "A metric registered AFTER CognitePusher init (like python_gc)" - - late_counter_ts = client.time_series.retrieve(external_id=late_counter_external_id) - assert late_counter_ts is not None, f"Late counter timeseries {late_counter_external_id} was not created" - assert late_counter_ts.name == late_counter_name - - early_datapoints = client.time_series.data.retrieve( - external_id=early_external_id, start="1h-ago", end="now", limit=10 + assert_timeseries_exists( + client, early_external_id, early_gauge_name, "A metric registered before CognitePusher init" ) - assert len(early_datapoints) > 0, "No datapoints for early metric" - assert early_datapoints.value[0] == pytest.approx(42.0) - - late_gauge_datapoints = client.time_series.data.retrieve( - external_id=late_gauge_external_id, start="1h-ago", end="now", limit=10 + assert_timeseries_exists( + client, late_gauge_external_id, late_gauge_name, "A metric registered AFTER CognitePusher init (like python_gc)" ) - assert len(late_gauge_datapoints) > 0, "No datapoints for late gauge metric" - assert late_gauge_datapoints.value[0] == pytest.approx(99.0) + assert_timeseries_exists(client, late_counter_external_id, late_counter_name) - late_counter_datapoints = client.time_series.data.retrieve( - external_id=late_counter_external_id, start="1h-ago", end="now", limit=10 - ) - assert len(late_counter_datapoints) > 0, "No datapoints for late counter metric" - assert late_counter_datapoints.value[0] == pytest.approx(5.0) 
+ assert_datapoint_value(client, early_external_id, 42.0) + assert_datapoint_value(client, late_gauge_external_id, 99.0) + assert_datapoint_value(client, late_counter_external_id, 5.0) pusher.stop() @@ -188,14 +213,8 @@ def test_cognite_pusher_stop_uploads_late_metrics( time.sleep(2) - late_ts = client.time_series.retrieve(external_id=late_external_id) - assert late_ts is not None, f"Late metric {late_external_id} was not created during shutdown" - - late_datapoints = client.time_series.data.retrieve( - external_id=late_external_id, start="1h-ago", end="now", limit=10 - ) - assert len(late_datapoints) > 0, "No datapoints for late metric after shutdown" - assert late_datapoints.value[0] == pytest.approx(123.0) + assert_timeseries_exists(client, late_external_id) + assert_datapoint_value(client, late_external_id, 123.0) def test_cognite_pusher_multiple_pushes_with_late_metrics( @@ -228,8 +247,7 @@ def test_cognite_pusher_multiple_pushes_with_late_metrics( pusher._push_to_server() time.sleep(1) - initial_ts = client.time_series.retrieve(external_id=initial_external_id) - assert initial_ts is not None + assert_timeseries_exists(client, initial_external_id) late_metric_name = f"later_{random.randint(0, 2**31)}" late_metric = metrics_registry(Gauge(late_metric_name, "Late metric added between pushes")) @@ -242,13 +260,7 @@ def test_cognite_pusher_multiple_pushes_with_late_metrics( pusher._push_to_server() time.sleep(2) - late_ts = client.time_series.retrieve(external_id=late_external_id) - assert late_ts is not None, "Late metric was not created on second push" - - late_datapoints = client.time_series.data.retrieve( - external_id=late_external_id, start="1h-ago", end="now", limit=10 - ) - assert len(late_datapoints) > 0 - assert late_datapoints.value[0] == pytest.approx(20.0) + assert_timeseries_exists(client, late_external_id) + assert_datapoint_value(client, late_external_id, 20.0) pusher.stop() diff --git a/tests/tests_unit/test_metrics.py b/tests/tests_unit/test_metrics.py index e962d416..01b34ec5 100644 --- a/tests/tests_unit/test_metrics.py +++ b/tests/tests_unit/test_metrics.py @@ -132,11 +132,8 @@ def test_init_empty_cdf(MockCogniteClient: Mock) -> None: pusher = CognitePusher(client, external_id_prefix="pre_", asset=new_asset, push_interval=1) - # Assert asset created + # Assert asset created and asset_id was set client.assets.create.assert_called_once_with(new_asset) - - # Verify that upload_queue exists - assert hasattr(pusher, "upload_queue") assert pusher._asset_id == 123 From 17110f829632ff8c4b4bf51f31a137198168c952 Mon Sep 17 00:00:00 2001 From: devendra-lohar Date: Fri, 5 Dec 2025 18:19:27 +0530 Subject: [PATCH 5/6] fix review comments --- .../test_metrics_integration.py | 46 +++++++++++++++++-- tests/tests_unit/test_metrics.py | 36 ++++++++++++++- 2 files changed, 77 insertions(+), 5 deletions(-) diff --git a/tests/tests_integration/test_metrics_integration.py b/tests/tests_integration/test_metrics_integration.py index 147861c7..5a0ddf00 100644 --- a/tests/tests_integration/test_metrics_integration.py +++ b/tests/tests_integration/test_metrics_integration.py @@ -16,11 +16,47 @@ from prometheus_client.core import REGISTRY from cognite.client import CogniteClient +from cognite.client.exceptions import CogniteNotFoundError from cognite.extractorutils.metrics import CognitePusher logger = logging.getLogger(__name__) +def poll_for_condition(condition: Callable[[], bool], timeout: int = 10, interval: float = 0.5) -> None: + """ + Poll a condition function until it returns True or 
timeout is reached. + + Args: + condition: A callable that returns True when the condition is met + timeout: Maximum time to wait in seconds + interval: Time to wait between checks in seconds + """ + start_time = time.time() + while time.time() - start_time < timeout: + if condition(): + return + time.sleep(interval) + pytest.fail(f"Condition not met within {timeout} seconds.") + + +def timeseries_exist(client: CogniteClient, external_ids: list[str]) -> bool: + """ + Check if all specified timeseries exist in CDF. + + Args: + client: CogniteClient instance + external_ids: List of external IDs to check + + Returns: + True if all timeseries exist, False otherwise + """ + try: + client.time_series.retrieve_multiple(external_ids=external_ids, ignore_unknown_ids=False) + return True + except CogniteNotFoundError: + return False + + def assert_timeseries_exists( client: CogniteClient, external_id: str, @@ -164,7 +200,9 @@ def test_cognite_pusher_with_late_registered_metrics( # This should create timeseries for ALL metrics (early + late) pusher._push_to_server() - time.sleep(2) + poll_for_condition( + lambda: timeseries_exist(client, [early_external_id, late_gauge_external_id, late_counter_external_id]) + ) assert_timeseries_exists( client, early_external_id, early_gauge_name, "A metric registered before CognitePusher init" @@ -211,7 +249,7 @@ def test_cognite_pusher_stop_uploads_late_metrics( pusher.stop() - time.sleep(2) + poll_for_condition(lambda: timeseries_exist(client, [late_external_id])) assert_timeseries_exists(client, late_external_id) assert_datapoint_value(client, late_external_id, 123.0) @@ -245,7 +283,7 @@ def test_cognite_pusher_multiple_pushes_with_late_metrics( ) pusher._push_to_server() - time.sleep(1) + poll_for_condition(lambda: timeseries_exist(client, [initial_external_id])) assert_timeseries_exists(client, initial_external_id) @@ -258,7 +296,7 @@ def test_cognite_pusher_multiple_pushes_with_late_metrics( initial_metric.set(11.0) pusher._push_to_server() - time.sleep(2) + poll_for_condition(lambda: timeseries_exist(client, [late_external_id])) assert_timeseries_exists(client, late_external_id) assert_datapoint_value(client, late_external_id, 20.0) diff --git a/tests/tests_unit/test_metrics.py b/tests/tests_unit/test_metrics.py index 01b34ec5..d94439df 100644 --- a/tests/tests_unit/test_metrics.py +++ b/tests/tests_unit/test_metrics.py @@ -22,7 +22,7 @@ from cognite.client import CogniteClient from cognite.client.data_classes import Asset -from cognite.client.exceptions import CogniteDuplicatedError +from cognite.client.exceptions import CogniteDuplicatedError, CogniteNotFoundError from cognite.extractorutils import metrics from cognite.extractorutils.metrics import CognitePusher, safe_get @@ -198,6 +198,40 @@ def test_push(MockCogniteClient: Mock) -> None: assert ts["datapoints"][0][1] == pytest.approx(5) +@patch("cognite.client.CogniteClient") +def test_push_creates_missing_timeseries(MockCogniteClient: Mock) -> None: + """Test that push logic creates missing time series when enabled.""" + init_gauge() + client: CogniteClient = MockCogniteClient() + + # Create a mock CogniteNotFoundError with not_found and failed attributes + not_found_error = CogniteNotFoundError([{"externalId": "pre_gauge"}]) + not_found_error.not_found = [{"externalId": "pre_gauge"}] + not_found_error.failed = [] + + # Simulate CogniteNotFoundError on first push, then success on retry + client.time_series.data.insert_multiple.side_effect = [ + not_found_error, + None, # Success on retry + ] + + pusher = 
CognitePusher(client, "pre_", push_interval=1) + + GaugeSetUp.gauge.set(5) + pusher._push_to_server() + + # Assert that we tried to create the timeseries + client.time_series.create.assert_called_once() + created_ts_list = client.time_series.create.call_args[0][0] + assert len(created_ts_list) == 1 + assert created_ts_list[0].external_id == "pre_gauge" + assert created_ts_list[0].name == "gauge" + assert created_ts_list[0].description == "Test gauge" + + # Assert that insert_multiple was called twice (initial attempt + retry) + assert client.time_series.data.insert_multiple.call_count == 2 + + # MetricsUtils test @pytest.fixture def init_counter() -> None: From 84071e0edaa1e98f509cfb87ada883a63810a76a Mon Sep 17 00:00:00 2001 From: devendra-lohar Date: Fri, 5 Dec 2025 18:32:30 +0530 Subject: [PATCH 6/6] use uuids for test_id --- .../test_metrics_integration.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/tests_integration/test_metrics_integration.py b/tests/tests_integration/test_metrics_integration.py index 5a0ddf00..3f97967a 100644 --- a/tests/tests_integration/test_metrics_integration.py +++ b/tests/tests_integration/test_metrics_integration.py @@ -6,8 +6,8 @@ """ import logging -import random import time +import uuid from collections.abc import Callable, Generator from typing import Any @@ -105,7 +105,7 @@ def assert_datapoint_value( @pytest.fixture def test_prefix() -> str: """Generate a unique prefix for this test run to avoid conflicts.""" - test_id = random.randint(0, 2**31) + test_id = uuid.uuid4().hex[:8] return f"integration_test_{test_id}_" @@ -167,7 +167,7 @@ def test_cognite_pusher_with_late_registered_metrics( """ client, test_prefix, created_external_ids = cognite_pusher_test - early_gauge_name = f"early_gauge_{random.randint(0, 2**31)}" + early_gauge_name = f"early_gauge_{uuid.uuid4().hex[:8]}" early_gauge = metrics_registry(Gauge(early_gauge_name, "A metric registered before CognitePusher init")) early_gauge.set(42.0) @@ -180,13 +180,13 @@ def test_cognite_pusher_with_late_registered_metrics( push_interval=60, ) - late_gauge_name = f"late_gauge_{random.randint(0, 2**31)}" + late_gauge_name = f"late_gauge_{uuid.uuid4().hex[:8]}" late_gauge = metrics_registry( Gauge(late_gauge_name, "A metric registered AFTER CognitePusher init (like python_gc)") ) late_gauge.set(99.0) - late_counter_name = f"late_counter_{random.randint(0, 2**31)}" + late_counter_name = f"late_counter_{uuid.uuid4().hex[:8]}" late_counter = metrics_registry( Counter(late_counter_name, "A counter registered AFTER CognitePusher init (like python_info)") ) @@ -240,7 +240,7 @@ def test_cognite_pusher_stop_uploads_late_metrics( push_interval=60, ) - late_metric_name = f"shutdown_metric_{random.randint(0, 2**31)}" + late_metric_name = f"shutdown_metric_{uuid.uuid4().hex[:8]}" late_metric = metrics_registry(Gauge(late_metric_name, "A metric registered after init, uploaded during shutdown")) late_metric.set(123.0) @@ -269,7 +269,7 @@ def test_cognite_pusher_multiple_pushes_with_late_metrics( """ client, test_prefix, created_external_ids = cognite_pusher_test - initial_metric_name = f"initial_{random.randint(0, 2**31)}" + initial_metric_name = f"initial_{uuid.uuid4().hex[:8]}" initial_metric = metrics_registry(Gauge(initial_metric_name, "Initial metric")) initial_metric.set(10.0) @@ -287,7 +287,7 @@ def test_cognite_pusher_multiple_pushes_with_late_metrics( assert_timeseries_exists(client, initial_external_id) - late_metric_name = f"later_{random.randint(0, 2**31)}" + 
late_metric_name = f"later_{uuid.uuid4().hex[:8]}" late_metric = metrics_registry(Gauge(late_metric_name, "Late metric added between pushes")) late_metric.set(20.0)