cognite/extractorutils/metrics.py: 65 additions & 33 deletions
```diff
@@ -53,14 +53,12 @@ def __init__(self):
 from prometheus_client.exposition import basic_auth_handler, delete_from_gateway, pushadd_to_gateway

 from cognite.client import CogniteClient
-from cognite.client.data_classes import Asset, Datapoints, DatapointsArray, TimeSeries
-from cognite.client.data_classes.data_modeling import NodeId
+from cognite.client.data_classes import Asset, TimeSeries
 from cognite.client.exceptions import CogniteDuplicatedError
 from cognite.extractorutils.threading import CancellationToken
+from cognite.extractorutils.uploader.time_series import DataPointList, TimeSeriesUploadQueue
 from cognite.extractorutils.util import EitherId

-from .util import ensure_time_series
-
 _metrics_singularities = {}
@@ -359,70 +357,104 @@ def __init__(
         self.asset = asset
         self.external_id_prefix = external_id_prefix
         self.data_set = data_set
+        self._asset_id: int | None = None
+        self._data_set_id: int | None = None

         self._init_cdf()

+        self.upload_queue = TimeSeriesUploadQueue(
+            cdf_client=cdf_client,
+            create_missing=self._create_missing_timeseries_factory,
+            data_set_id=self._data_set_id,
+            cancellation_token=cancellation_token,
+        )
+
         self._cdf_project = cdf_client.config.project

     def _init_cdf(self) -> None:
         """
-        Initialize the CDF tenant with the necessary time series and asset.
-        """
-        time_series: list[TimeSeries] = []
+        Initialize the CDF tenant with the necessary asset and dataset.
+
+        Timeseries are created automatically by TimeSeriesUploadQueue when datapoints are pushed.
+        """
         if self.asset is not None:
-            # Ensure that asset exist, and retrieve internal ID
+            # Ensure that asset exists, and retrieve internal ID
             asset: Asset | None
             try:
                 asset = self.cdf_client.assets.create(self.asset)
             except CogniteDuplicatedError:
                 asset = self.cdf_client.assets.retrieve(external_id=self.asset.external_id)

-            asset_id = asset.id if asset is not None else None
-
-        else:
-            asset_id = None
+            self._asset_id = asset.id if asset is not None else None

-        data_set_id = None
         if self.data_set:
             dataset = self.cdf_client.data_sets.retrieve(
                 id=self.data_set.internal_id, external_id=self.data_set.external_id
             )
             if dataset:
-                data_set_id = dataset.id
+                self._data_set_id = dataset.id

-        for metric in REGISTRY.collect():
-            if type(metric) is Metric and metric.type in ["gauge", "counter"]:
-                external_id = self.external_id_prefix + metric.name
-
-                time_series.append(
-                    TimeSeries(
-                        external_id=external_id,
-                        name=metric.name,
-                        legacy_name=external_id,
-                        description=metric.documentation,
-                        asset_id=asset_id,
-                        data_set_id=data_set_id,
-                    )
-                )
-
-        ensure_time_series(self.cdf_client, time_series)
+    def _create_missing_timeseries_factory(self, external_id: str, datapoints: DataPointList) -> TimeSeries:
+        """
+        Factory function to create missing timeseries.
+
+        Args:
+            external_id: External ID of the timeseries to create
+            datapoints: List of datapoints that triggered the creation
+
+        Returns:
+            A TimeSeries object
+        """
+        metric_name = external_id[len(self.external_id_prefix) :]
+
+        metric_description = ""
+        for metric in REGISTRY.collect():
+            if isinstance(metric, Metric) and metric.name == metric_name:
+                metric_description = metric.documentation
+                break
+
+        is_string = (
+            isinstance(datapoints[0].get("value"), str)
+            if isinstance(datapoints[0], dict)
+            else isinstance(datapoints[0][1], str)
+        )
+
+        return TimeSeries(
+            external_id=external_id,
+            name=metric_name,
+            legacy_name=external_id,
+            description=metric_description,
+            asset_id=self._asset_id,
+            data_set_id=self._data_set_id,
+            is_string=is_string,
+        )

     def _push_to_server(self) -> None:
         """
-        Create datapoints an push them to their respective time series.
+        Create datapoints and push them to their respective time series using TimeSeriesUploadQueue.
+
+        The queue will automatically create missing timeseries for late-registered metrics.
         """
         timestamp = int(arrow.get().float_timestamp * 1000)

-        datapoints: list[dict[str, str | int | list[Any] | Datapoints | DatapointsArray | NodeId]] = []
-
         for metric in REGISTRY.collect():
             if isinstance(metric, Metric) and metric.type in ["gauge", "counter"]:
                 if len(metric.samples) == 0:
                     continue

                 external_id = self.external_id_prefix + metric.name
-                datapoints.append({"externalId": external_id, "datapoints": [(timestamp, metric.samples[0].value)]})
+                self.upload_queue.add_to_upload_queue(
+                    external_id=external_id, datapoints=[(timestamp, metric.samples[0].value)]
+                )

-        self.cdf_client.time_series.data.insert_multiple(datapoints)
+        self.upload_queue.upload()
         self.logger.debug("Pushed metrics to CDF tenant '%s'", self._cdf_project)
+
+    def stop(self) -> None:
+        """
+        Stop the push loop and ensure all metrics are uploaded.
+        """
+        self._push_to_server()
+        self.upload_queue.stop()
+        self.cancellation_token.cancel()
```
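For readers of this change, here is a minimal standalone sketch of the create-missing-factory pattern the PR wires into `CognitePusher`. It assumes a `CogniteClient` with credentials configured in the environment; the external ID, timestamp, and value are illustrative:

```python
from cognite.client import CogniteClient
from cognite.client.data_classes import TimeSeries
from cognite.extractorutils.uploader.time_series import DataPointList, TimeSeriesUploadQueue

client = CogniteClient()  # assumes credentials are configured in the environment


def create_missing(external_id: str, datapoints: DataPointList) -> TimeSeries:
    # Called by the queue the first time it sees an external ID that does
    # not yet exist in CDF; the returned TimeSeries is created on the fly.
    return TimeSeries(external_id=external_id, name=external_id)


queue = TimeSeriesUploadQueue(cdf_client=client, create_missing=create_missing)
queue.add_to_upload_queue(external_id="prefix_my_metric", datapoints=[(1700000000000, 1.0)])
queue.upload()  # pushes the datapoint, creating "prefix_my_metric" first if needed
queue.stop()
```

This is what lets the pusher drop the up-front `ensure_time_series` call: timeseries for late-registered metrics are created lazily on first push instead of at init.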
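The `is_string` check in `_create_missing_timeseries_factory` handles the two shapes a datapoint in a `DataPointList` can take, as inferred from the diff; a small illustration with hypothetical values:

```python
# A datapoint may arrive as a (timestamp, value) tuple or as a dict with a
# "value" key; the factory inspects the first one to pick the series type.
dp_tuple = (1700000000000, "on")                       # tuple form
dp_dict = {"timestamp": 1700000000000, "value": "on"}  # dict form


def is_string_datapoint(dp) -> bool:
    value = dp.get("value") if isinstance(dp, dict) else dp[1]
    return isinstance(value, str)


assert is_string_datapoint(dp_tuple)
assert is_string_datapoint(dp_dict)
```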
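Finally, `_push_to_server` samples gauges and counters straight from the Prometheus client registry. A self-contained sketch of that collection step (standard `prometheus_client` API; the gauge and its value are hypothetical):

```python
from prometheus_client import CollectorRegistry, Gauge
from prometheus_client.core import Metric

registry = CollectorRegistry()  # fresh registry to keep the example self-contained
rows_gauge = Gauge("my_extractor_rows", "Rows processed by the extractor", registry=registry)
rows_gauge.set(42)

for metric in registry.collect():
    if isinstance(metric, Metric) and metric.type in ["gauge", "counter"] and metric.samples:
        # Each collected metric carries its current samples; the pusher
        # turns samples[0].value into a single CDF datapoint per push.
        print(metric.name, metric.samples[0].value)  # -> my_extractor_rows 42.0
```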