3 changes: 3 additions & 0 deletions cognite/extractorutils/metrics.py
@@ -458,3 +458,6 @@ def stop(self) -> None:
self._push_to_server()
self.upload_queue.stop()
self.cancellation_token.cancel()


MetricsType = TypeVar("MetricsType", bound=BaseMetrics)
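
The new MetricsType type variable is bound to BaseMetrics, so APIs such as FullConfig.metrics_class and Extractor._load_metrics accept only BaseMetrics subclasses while preserving the concrete type of the singleton returned by safe_get. A minimal sketch of a compatible subclass (class, metric, and version names are illustrative, not part of this PR); note it takes no constructor arguments, matching how _load_metrics calls safe_get(metrics_class):

from prometheus_client import Counter

from cognite.extractorutils.metrics import BaseMetrics, safe_get


class MyMetrics(BaseMetrics):
    def __init__(self) -> None:
        # BaseMetrics sets up the standard extractor info metrics
        super().__init__(extractor_name="my-extractor", extractor_version="1.0.0")
        self.rows_written = Counter("my_extractor_rows_written", "Rows written to CDF")


# safe_get returns a process-wide singleton, so repeated calls reuse the same
# registered Prometheus collectors instead of registering them twice.
metrics = safe_get(MyMetrics)
metrics.rows_written.inc()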
37 changes: 28 additions & 9 deletions cognite/extractorutils/unstable/core/base.py
@@ -22,6 +22,11 @@ class MyConfig(ExtractorConfig):
another_parameter: int
schedule: ScheduleConfig

class MyMetrics(BaseMetrics):
def __init__(self, extractor_name: str, extractor_version: str):
super().__init__(extractor_name, extractor_version)
self.custom_counter = Counter("custom_counter", "A custom counter")

class MyExtractor(Extractor[MyConfig]):
NAME = "My Extractor"
EXTERNAL_ID = "my-extractor"
@@ -30,6 +35,9 @@ class MyExtractor(Extractor[MyConfig]):

CONFIG_TYPE = MyConfig

# Override metrics type annotation for IDE support
metrics: MyMetrics

def __init_tasks__(self) -> None:
self.add_task(
ScheduledTask(
@@ -42,6 +50,8 @@ def __init_tasks__(self) -> None:

def my_task_function(self, task_context: TaskContext) -> None:
task_context.logger.info("Running my task")
# IDE will now autocomplete custom_counter
self.metrics.custom_counter.inc()
"""

import logging
@@ -59,7 +69,7 @@ def my_task_function(self, task_context: TaskContext) -> None:
from typing_extensions import Self, assert_never

from cognite.extractorutils._inner_util import _resolve_log_level
from cognite.extractorutils.metrics import BaseMetrics
from cognite.extractorutils.metrics import BaseMetrics, MetricsType, safe_get
from cognite.extractorutils.statestore import (
AbstractStateStore,
LocalStateStore,
@@ -117,11 +127,13 @@ def __init__(
application_config: _T,
current_config_revision: ConfigRevision,
log_level_override: str | None = None,
metrics_class: type[MetricsType] | None = None,
) -> None:
self.connection_config = connection_config
self.application_config = application_config
self.current_config_revision: ConfigRevision = current_config_revision
self.log_level_override = log_level_override
self.metrics_class: type[MetricsType] | None = metrics_class


class Extractor(Generic[ConfigType], CogniteLogger):
@@ -149,9 +161,7 @@ class Extractor(Generic[ConfigType], CogniteLogger):

cancellation_token: CancellationToken

def __init__(
self, config: FullConfig[ConfigType], checkin_worker: CheckinWorker, metrics: BaseMetrics | None = None
) -> None:
def __init__(self, config: FullConfig[ConfigType], checkin_worker: CheckinWorker) -> None:
self._logger = logging.getLogger(f"{self.EXTERNAL_ID}.main")
self._checkin_worker = checkin_worker

@@ -175,7 +185,8 @@ def __init__(

self._tasks: list[Task] = []
self._start_time: datetime
self._metrics: BaseMetrics | None = metrics

self.metrics: BaseMetrics = self._load_metrics(config.metrics_class)

self.metrics_push_manager = (
self.metrics_config.create_manager(self.cognite_client, cancellation_token=self.cancellation_token)
@@ -262,6 +273,16 @@ def _setup_logging(self) -> None:
"Defaulted to console logging."
)

def _load_metrics(self, metrics_class: type[MetricsType] | None = None) -> MetricsType | BaseMetrics:
"""
Loads metrics based on the provided metrics class.

Reuses existing singleton if available to avoid Prometheus registry conflicts.
"""
if metrics_class:
return safe_get(metrics_class)
return safe_get(BaseMetrics, extractor_name=self.EXTERNAL_ID, extractor_version=self.VERSION)

def _load_state_store(self) -> None:
"""
Searches through the config object for a StateStoreConfig.
@@ -379,10 +400,8 @@ def restart(self) -> None:
self.cancellation_token.cancel()

@classmethod
def _init_from_runtime(
cls, config: FullConfig[ConfigType], checkin_worker: CheckinWorker, metrics: BaseMetrics
) -> Self:
return cls(config, checkin_worker, metrics)
def _init_from_runtime(cls, config: FullConfig[ConfigType], checkin_worker: CheckinWorker) -> Self:
return cls(config, checkin_worker)

def add_task(self, task: Task) -> None:
"""
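
Taken together, the base.py changes drop the metrics constructor argument: the metrics class now travels on FullConfig, and the resolved singleton is exposed as the public self.metrics rather than the private self._metrics. A hedged migration sketch (connection_config, application_config, and checkin_worker are assumed to be built as before; MyMetrics is the illustrative subclass sketched under metrics.py above):

# Before this PR, a metrics object was handed directly to the Extractor constructor.
# Now the class goes on FullConfig and the instance is read back from .metrics.
full_config = FullConfig(
    connection_config=connection_config,
    application_config=application_config,
    current_config_revision=1,
    metrics_class=MyMetrics,  # optional; when omitted, a plain BaseMetrics is used
)
extractor = MyExtractor(full_config, checkin_worker)

# _load_metrics resolves the class through safe_get, so this is the shared singleton
assert isinstance(extractor.metrics, MyMetrics)
extractor.metrics.rows_written.inc()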
22 changes: 14 additions & 8 deletions cognite/extractorutils/unstable/core/runtime.py
@@ -47,7 +47,7 @@ def main() -> None:
CogniteAuthError,
CogniteConnectionError,
)
from cognite.extractorutils.metrics import BaseMetrics
from cognite.extractorutils.metrics import BaseMetrics, MetricsType
from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.unstable.configuration.exceptions import InvalidArgumentError, InvalidConfigError
from cognite.extractorutils.unstable.configuration.loaders import (
@@ -79,16 +79,13 @@ def _extractor_process_entrypoint(
controls: _RuntimeControls,
config: FullConfig,
checkin_worker: CheckinWorker,
metrics: BaseMetrics | None = None,
) -> None:
logger = logging.getLogger(f"{extractor_class.EXTERNAL_ID}.runtime")
checkin_worker.active_revision = config.current_config_revision
checkin_worker.set_on_fatal_error_handler(lambda _: on_fatal_error(controls))
checkin_worker.set_on_revision_change_handler(lambda _: on_revision_changed(controls))
checkin_worker.set_retry_startup(extractor_class.RETRY_STARTUP)
if not metrics:
metrics = BaseMetrics(extractor_name=extractor_class.NAME, extractor_version=extractor_class.VERSION)
extractor = extractor_class._init_from_runtime(config, checkin_worker, metrics)
extractor = extractor_class._init_from_runtime(config, checkin_worker)
extractor._attach_runtime_controls(
cancel_event=controls.cancel_event,
message_queue=controls.message_queue,
@@ -138,13 +135,13 @@ class Runtime(Generic[ExtractorType]):
def __init__(
self,
extractor: type[ExtractorType],
metrics: BaseMetrics | None = None,
metrics: type[MetricsType] | None = None,
) -> None:
self._extractor_class = extractor
self._cancellation_token = CancellationToken()
self._cancellation_token.cancel_on_interrupt()
self._message_queue: Queue[RuntimeMessage] = Queue()
self._metrics = metrics
self._metrics_class = metrics
self.logger = logging.getLogger(f"{self._extractor_class.EXTERNAL_ID}.runtime")
self._setup_logging()
self._cancel_event: MpEvent | None = None
@@ -273,7 +270,7 @@ def _spawn_extractor(

process = Process(
target=_extractor_process_entrypoint,
args=(self._extractor_class, controls, config, checkin_worker, self._metrics),
args=(self._extractor_class, controls, config, checkin_worker),
)

process.start()
@@ -477,6 +474,14 @@ def _main_runtime(self, args: Namespace) -> None:
if not args.skip_init_checks and not self._verify_connection_config(connection_config):
sys.exit(1)

if self._metrics_class is not None and (
not isinstance(self._metrics_class, type) or not issubclass(self._metrics_class, BaseMetrics)
):
self.logger.critical(
"The provided metrics class does not inherit from BaseMetrics. Metrics will not be collected."
)
sys.exit(1)

# This has to be Any. We don't know the type of the extractors' config at type checking since the self doesn't
# exist yet, and I have not found a way to represent it in a generic way that isn't just an Any in disguise.
application_config: Any
@@ -507,6 +512,7 @@ def _main_runtime(self, args: Namespace) -> None:
application_config=application_config,
current_config_revision=current_config_revision,
log_level_override=args.log_level,
metrics_class=self._metrics_class,
),
checkin_worker,
)
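
On the runtime side the same switch applies: Runtime now takes a metrics class rather than an instance, _main_runtime validates that it subclasses BaseMetrics, and the class is forwarded to the extractor process via FullConfig.metrics_class. A minimal wiring sketch (MyExtractor and MyMetrics are the illustrative names from above; runtime.run() is assumed from the existing runtime docstring pattern, not introduced by this diff):

from cognite.extractorutils.unstable.core.runtime import Runtime


def main() -> None:
    # Pass the class itself; the instance is created with safe_get inside the
    # spawned extractor process, not in the runtime process.
    runtime = Runtime(MyExtractor, metrics=MyMetrics)
    runtime.run()


if __name__ == "__main__":
    main()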
30 changes: 30 additions & 0 deletions tests/conftest.py
@@ -5,17 +5,47 @@
from enum import Enum

import pytest
from prometheus_client.core import REGISTRY

from cognite.client import CogniteClient
from cognite.client.config import ClientConfig
from cognite.client.credentials import OAuthClientCredentials
from cognite.client.data_classes.data_modeling import NodeId
from cognite.client.exceptions import CogniteAPIError, CogniteNotFoundError
from cognite.extractorutils import metrics

NUM_NODES = 5000
NUM_EDGES = NUM_NODES // 100


@pytest.fixture(autouse=True)
def reset_singleton() -> Generator[None, None, None]:
"""
This fixture ensures that the _metrics_singularities
class variables are reset, and Prometheus collectors are unregistered,
providing test isolation.
"""
# Clean up before test
metrics._metrics_singularities.clear()

# Unregister all collectors to prevent "Duplicated timeseries" errors
collectors = list(REGISTRY._collector_to_names.keys())
for collector in collectors:
with contextlib.suppress(Exception):
REGISTRY.unregister(collector)

yield

# Clean up after test
metrics._metrics_singularities.clear()

# Unregister all collectors again
collectors = list(REGISTRY._collector_to_names.keys())
for collector in collectors:
with contextlib.suppress(Exception):
REGISTRY.unregister(collector)


class ETestType(Enum):
TIME_SERIES = "time_series"
CDM_TIME_SERIES = "cdm_time_series"
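
The autouse fixture gives each test a clean slate: the singleton cache used by safe_get is emptied and all collectors are removed from the default REGISTRY both before and after every test. A hypothetical illustration of the isolation it buys (test names and MyMetrics are not from this PR):

def test_metrics_first() -> None:
    # Fresh instance: the fixture cleared metrics._metrics_singularities beforehand.
    safe_get(MyMetrics).rows_written.inc()


def test_metrics_second() -> None:
    # Also a fresh instance; re-registering its collectors does not raise
    # "Duplicated timeseries" because the fixture unregistered the previous
    # test's collectors from REGISTRY.
    safe_get(MyMetrics).rows_written.inc()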
2 changes: 1 addition & 1 deletion tests/test_unstable/conftest.py
@@ -1,7 +1,6 @@
import gzip
import json
import os
from collections import Counter
from collections.abc import Callable, Generator, Iterator
from threading import RLock
from time import sleep, time
@@ -10,6 +9,7 @@

import pytest
import requests_mock
from prometheus_client.core import Counter

from cognite.client import CogniteClient
from cognite.client.config import ClientConfig
5 changes: 3 additions & 2 deletions tests/test_unstable/test_base.py
@@ -303,10 +303,11 @@ def counting_push(self: CognitePusher) -> None:
application_config=app_config,
current_config_revision=1,
log_level_override=override_level,
metrics_class=TestMetrics,
)
worker = get_checkin_worker(connection_config)
extractor = TestExtractor(full_config, worker, metrics=TestMetrics)
assert isinstance(extractor._metrics, TestMetrics) or extractor._metrics == TestMetrics
extractor = TestExtractor(full_config, worker)
assert isinstance(extractor.metrics, TestMetrics)

with contextlib.ExitStack() as stack:
stack.enter_context(contextlib.suppress(Exception))