Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions src/publisher/core.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,38 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from datetime import datetime
import json
import re
from typing import TYPE_CHECKING, Any, TypeVar

import mqtt_topics
from utils import datetime_to_str

if TYPE_CHECKING:
from configuration import Configuration

T = TypeVar("T")

type Publishable = bool | int | float | str | dict[str, Any] | datetime
"""Closed union of value types this gateway knows how to publish to MQTT.

Mirrors the typed `publish_*` methods on :class:`Publisher` plus the `dict`
shape handled by `publish_json`, and `datetime`, which is stringified via
:func:`utils.datetime_to_str`. Use it at signature boundaries when a caller
holds "something publishable" without statically knowing which arm.
"""

type WirePayload = bool | int | float | str
"""Primitive subset of :data:`Publishable` that reaches the transport layer.

After the typed `publish_*` methods do their work (`publish_json` serializes
dicts to JSON strings, `publish_datetime` stringifies via
:func:`utils.datetime_to_str`), only these scalar arms cross the
publisher/transport boundary. Use `WirePayload | None` for wire-level helpers
where `None` means "clear the retained message."
"""


class MqttCommandListener(ABC):
@abstractmethod
Expand Down Expand Up @@ -108,6 +129,49 @@ def publish_float(
) -> None:
raise NotImplementedError

def publish_datetime(
self,
key: str,
value: datetime,
no_prefix: bool = False,
*,
retain: bool = True,
) -> None:
"""Stringify a datetime via :func:`utils.datetime_to_str` and publish."""
self.publish_str(key, datetime_to_str(value), no_prefix, retain=retain)

def publish(
self,
key: str,
value: Publishable,
no_prefix: bool = False,
*,
retain: bool = True,
) -> None:
"""Dispatch to the appropriate typed publish_* based on value type.

For callers that hold a `Publishable` without statically knowing
which arm of the union it is. `retain` is forwarded to every arm.
"""
# bool must precede int: isinstance(True, int) is True in Python.
if isinstance(value, bool):
self.publish_bool(key, value, no_prefix, retain=retain)
elif isinstance(value, int):
self.publish_int(key, value, no_prefix, retain=retain)
elif isinstance(value, float):
self.publish_float(key, value, no_prefix, retain=retain)
elif isinstance(value, str):
self.publish_str(key, value, no_prefix, retain=retain)
elif isinstance(value, dict):
self.publish_json(key, value, no_prefix, retain=retain)
elif isinstance(value, datetime):
self.publish_datetime(key, value, no_prefix, retain=retain)
else:
# Defensive: type system rules this out, but `Any` callers can sneak
# an unsupported runtime type through; raise rather than silently no-op.
msg = f"Unsupported value type: {type(value).__name__}" # type: ignore[unreachable]
raise TypeError(msg)

@abstractmethod
def clear_topic(self, key: str, no_prefix: bool = False) -> None:
raise NotImplementedError
Expand Down
6 changes: 4 additions & 2 deletions src/publisher/log_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
from typing import Any, override

from publisher.core import Publisher
from publisher.core import Publisher, WirePayload

LOG = logging.getLogger(__name__)
LOG.setLevel(level="DEBUG")
Expand Down Expand Up @@ -62,5 +62,7 @@ def publish_float(
def clear_topic(self, key: str, no_prefix: bool = False) -> None:
self.internal_publish(key, None)

def internal_publish(self, key: str, value: Any, *, retain: bool = True) -> None:
def internal_publish(
self, key: str, value: WirePayload | None, *, retain: bool = True
) -> None:
LOG.debug(f"{key}: {value} (retain={retain})")
5 changes: 4 additions & 1 deletion src/publisher/mqtt_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
if TYPE_CHECKING:
from configuration import Configuration
from integrations.openwb.charging_station import ChargingStation
from publisher.core import WirePayload

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -226,7 +227,9 @@ async def __handle_imported_energy(self, topic: str, payload: str) -> None:
vin, imported_energy_wh
)

def __publish(self, topic: str, payload: Any, *, retain: bool = True) -> None:
def __publish(
self, topic: str, payload: WirePayload | None, *, retain: bool = True
) -> None:
self.client.publish(topic, payload, retain=retain)

@override
Expand Down
58 changes: 13 additions & 45 deletions src/status_publisher/__init__.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
from __future__ import annotations

from abc import ABCMeta, abstractmethod
from datetime import datetime
from typing import TYPE_CHECKING, Any, Final, TypeVar
from typing import TYPE_CHECKING, Final

from utils import datetime_to_str
from publisher.core import Publishable

if TYPE_CHECKING:
from collections.abc import Callable

from publisher.core import Publisher
from vehicle_info import VehicleInfo

T = TypeVar("T")
Publishable = TypeVar("Publishable", str, int, float, bool, dict[str, Any], datetime)


class VehicleDataPublisher[I, O](metaclass=ABCMeta):
def __init__(
Expand All @@ -28,65 +24,37 @@ def __init__(
def publish(self, data: I) -> O:
raise NotImplementedError

def _publish(
def _publish[V: Publishable](
self,
*,
topic: str,
value: Publishable | None,
validator: Callable[[Publishable], bool] = lambda _: True,
value: V | None,
validator: Callable[[V], bool] = lambda _: True,
no_prefix: bool = False,
retain: bool = True,
) -> tuple[bool, Publishable | None]:
) -> tuple[bool, V | None]:
if value is None or not validator(value):
return False, None
actual_topic = topic if no_prefix else self.__get_topic(topic)
published = self._publish_directly(
topic=actual_topic, value=value, retain=retain
)
return published, value
self.__publisher.publish(actual_topic, value, retain=retain)
return True, value

def _transform_and_publish(
def _transform_and_publish[T, V: Publishable](
self,
*,
topic: str,
value: T | None,
validator: Callable[[T], bool] = lambda _: True,
transform: Callable[[T], Publishable],
transform: Callable[[T], V],
no_prefix: bool = False,
retain: bool = True,
) -> tuple[bool, Publishable | None]:
) -> tuple[bool, V | None]:
if value is None or not validator(value):
return False, None
actual_topic = topic if no_prefix else self.__get_topic(topic)
transformed_value = transform(value)
published = self._publish_directly(
topic=actual_topic, value=transformed_value, retain=retain
)
return published, transformed_value

def _publish_directly(
self, *, topic: str, value: Publishable, retain: bool = True
) -> bool:
published = False
if isinstance(value, bool):
self.__publisher.publish_bool(topic, value)
published = True
elif isinstance(value, int):
self.__publisher.publish_int(topic, value)
published = True
elif isinstance(value, float):
self.__publisher.publish_float(topic, value)
published = True
elif isinstance(value, str):
self.__publisher.publish_str(topic, value)
published = True
elif isinstance(value, dict):
self.__publisher.publish_json(topic, value, retain=retain)
published = True
elif isinstance(value, datetime):
self.__publisher.publish_str(topic, datetime_to_str(value))
published = True
return published
self.__publisher.publish(actual_topic, transformed_value, retain=retain)
return True, transformed_value

def __get_topic(self, sub_topic: str) -> str:
return f"{self.__mqtt_vehicle_prefix}/{sub_topic}"
49 changes: 11 additions & 38 deletions src/vehicle.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from enum import Enum, unique
import logging
import math
from typing import TYPE_CHECKING, Any, Final, TypeVar
from typing import TYPE_CHECKING, Final

from apscheduler.triggers.cron import CronTrigger
from saic_ismart_client_ng.api.vehicle_charging import (
Expand All @@ -17,6 +17,7 @@

from extractors import extract_electric_range, extract_soc
import mqtt_topics
from publisher.core import Publishable
from status_publisher.charge.chrg_mgmt_data_resp import (
ChrgMgmtDataRespProcessingResult,
ChrgMgmtDataRespPublisher,
Expand All @@ -26,7 +27,6 @@
VehicleStatusRespProcessingResult,
VehicleStatusRespPublisher,
)
from utils import datetime_to_str

if TYPE_CHECKING:
from collections.abc import Callable
Expand All @@ -42,11 +42,6 @@
from publisher.core import Publisher
from vehicle_info import VehicleInfo

T = TypeVar("T")
Publishable = TypeVar(
"Publishable", str, int, float, bool, dict[str, Any], datetime.datetime
)

DEFAULT_AC_TEMP = 22
PRESSURE_TO_BAR_FACTOR = 0.04

Expand Down Expand Up @@ -378,7 +373,7 @@ def notify_car_activity(self) -> None:
self.last_car_activity = datetime.datetime.now(tz=datetime.UTC)
self.__publish(
topic=mqtt_topics.REFRESH_LAST_ACTIVITY,
value=datetime_to_str(self.last_car_activity),
value=self.last_car_activity,
)

def notify_message(self, message: MessageEntity) -> None:
Expand Down Expand Up @@ -505,8 +500,8 @@ def last_failed_refresh(self, value: datetime.datetime | None) -> None:
)
)
self.__failed_refresh_counter = self.__failed_refresh_counter + 1
self.publisher.publish_str(
self.get_topic(mqtt_topics.REFRESH_LAST_ERROR), datetime_to_str(value)
self.publisher.publish_datetime(
self.get_topic(mqtt_topics.REFRESH_LAST_ERROR), value
)
self.publisher.publish_int(
self.get_topic(mqtt_topics.REFRESH_PERIOD_ERROR),
Expand Down Expand Up @@ -806,41 +801,19 @@ def update_battery_capacity(self, new_capacity: float) -> None:
def is_remote_ac_running(self) -> bool:
return self.__remote_ac_running

def __publish(
def __publish[V: Publishable](
self,
*,
topic: str,
value: Publishable | None,
validator: Callable[[Publishable], bool] = lambda _: True,
value: V | None,
validator: Callable[[V], bool] = lambda _: True,
no_prefix: bool = False,
) -> tuple[bool, Publishable | None]:
) -> tuple[bool, V | None]:
if value is None or not validator(value):
return False, None
actual_topic = topic if no_prefix else self.get_topic(topic)
published = self.__publish_directly(topic=actual_topic, value=value)
return published, value

def __publish_directly(self, *, topic: str, value: Publishable) -> bool:
published = False
if isinstance(value, bool):
self.publisher.publish_bool(topic, value)
published = True
elif isinstance(value, int):
self.publisher.publish_int(topic, value)
published = True
elif isinstance(value, float):
self.publisher.publish_float(topic, value)
published = True
elif isinstance(value, str):
self.publisher.publish_str(topic, value)
published = True
elif isinstance(value, dict):
self.publisher.publish_json(topic, value)
published = True
elif isinstance(value, datetime.datetime):
self.publisher.publish_str(topic, datetime_to_str(value))
published = True
return published
self.publisher.publish(actual_topic, value)
return True, value

@property
def vin(self) -> str:
Expand Down
7 changes: 6 additions & 1 deletion tests/mocks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,23 @@

if TYPE_CHECKING:
from configuration import Configuration
from publisher.core import WirePayload

LOG = logging.getLogger(__name__)


class MessageCapturingConsolePublisher(ConsolePublisher):
def __init__(self, configuration: Configuration) -> None:
super().__init__(configuration)
# Test inspection map; consumers narrow per-key (e.g. json.loads on
# serialized dict topics), so keep the value type permissive here.
self.map: dict[str, Any] = {}
self.publish_count: dict[str, int] = {}

@override
def internal_publish(self, key: str, value: Any, *, retain: bool = True) -> None:
def internal_publish(
self, key: str, value: WirePayload | None, *, retain: bool = True
) -> None:
self.map[key] = value
self.publish_count[key] = self.publish_count.get(key, 0) + 1
LOG.debug(f"{key}: {value} (retain={retain})")
1 change: 1 addition & 0 deletions tests/publisher/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from __future__ import annotations
Loading