|
18 | 18 | GetMicrogridDispatchResponse, |
19 | 19 | ListMicrogridDispatchesRequest, |
20 | 20 | ListMicrogridDispatchesResponse, |
| 21 | + StreamMicrogridDispatchesRequest, |
| 22 | + StreamMicrogridDispatchesResponse, |
21 | 23 | ) |
22 | 24 | from frequenz.api.dispatch.v1.dispatch_pb2 import ( |
23 | 25 | TimeIntervalFilter as PBTimeIntervalFilter, |
|
27 | 29 | UpdateMicrogridDispatchResponse, |
28 | 30 | ) |
29 | 31 |
|
| 32 | +from frequenz import channels |
30 | 33 | from frequenz.client.base.channel import ChannelOptions, SslOptions |
31 | 34 | from frequenz.client.base.client import BaseApiClient |
32 | 35 | from frequenz.client.base.conversion import to_timestamp |
| 36 | +from frequenz.client.base.streaming import GrpcStreamBroadcaster |
33 | 37 |
|
34 | 38 | from ._internal_types import DispatchCreateRequest |
35 | 39 | from .types import ( |
36 | 40 | ComponentSelector, |
37 | 41 | Dispatch, |
| 42 | + DispatchEvent, |
38 | 43 | RecurrenceRule, |
39 | 44 | component_selector_to_protobuf, |
40 | 45 | ) |
|
46 | 51 | class Client(BaseApiClient[dispatch_pb2_grpc.MicrogridDispatchServiceStub]): |
47 | 52 | """Dispatch API client.""" |
48 | 53 |
|
| 54 | + streams: dict[ |
| 55 | + int, GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent] |
| 56 | + ] = {} |
| 57 | + """A dictionary of streamers, keyed by microgrid_id.""" |
| 58 | + |
49 | 59 | def __init__( |
50 | 60 | self, |
51 | 61 | *, |
@@ -170,6 +180,50 @@ def to_interval( |
170 | 180 | else: |
171 | 181 | break |
172 | 182 |
|
| 183 | + def stream(self, microgrid_id: int) -> channels.Receiver[DispatchEvent]: |
| 184 | + """Receive a stream of dispatch events. |
| 185 | +
|
| 186 | + This function returns a receiver channel that can be used to receive |
| 187 | + dispatch events. |
| 188 | + An event is one of [CREATE, UPDATE, DELETE]. |
| 189 | +
|
| 190 | + Example usage: |
| 191 | +
|
| 192 | + ``` |
| 193 | + client = Client(key="key", server_url="grpc://fz-0004.frequenz.io") |
| 194 | + async for message in client.stream(microgrid_id=1): |
| 195 | + print(message.event, message.dispatch) |
| 196 | + ``` |
| 197 | +
|
| 198 | + Args: |
| 199 | + microgrid_id: The microgrid_id to receive dispatches for. |
| 200 | +
|
| 201 | + Returns: |
| 202 | + A receiver channel to receive the stream of dispatch events. |
| 203 | + """ |
| 204 | + return self._get_stream(microgrid_id).new_receiver() |
| 205 | + |
| 206 | + def _get_stream( |
| 207 | + self, microgrid_id: int |
| 208 | + ) -> GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent]: |
| 209 | + """Get an instance to the streaming helper.""" |
| 210 | + broadcaster = self.streams.get(microgrid_id) |
| 211 | + if broadcaster is None: |
| 212 | + request = StreamMicrogridDispatchesRequest(microgrid_id=microgrid_id) |
| 213 | + broadcaster = GrpcStreamBroadcaster( |
| 214 | + stream_name="StreamMicrogridDispatches", |
| 215 | + stream_method=lambda: cast( |
| 216 | + AsyncIterator[StreamMicrogridDispatchesResponse], |
| 217 | + self.stub.StreamMicrogridDispatches( |
| 218 | + request, metadata=self._metadata |
| 219 | + ), |
| 220 | + ), |
| 221 | + transform=DispatchEvent.from_protobuf, |
| 222 | + ) |
| 223 | + self.streams[microgrid_id] = broadcaster |
| 224 | + |
| 225 | + return broadcaster |
| 226 | + |
173 | 227 | async def create( |
174 | 228 | self, |
175 | 229 | microgrid_id: int, |
|
0 commit comments