Summary
Currently, non-event-driven components cannot be reused in event-driven models. Instead, new components have to be defined. This is because if we simply subclass a Component and add events to it, it will still wait (forever) to receive the inputs, which never arrive.
Example
This example demonstrates the behaviour we would like, using a subclass of FileWriter to reuse it:
from plugboard.library import FileWriter
from plugboard.component import IOController as IO, Component
from plugboard.events import Event
from plugboard.connector import AsyncioConnector
from plugboard.process import LocalProcess
from pydantic import BaseModel
import typing as _t
from plugboard.schemas import ConnectorSpec
class MyData(BaseModel):
    """Payload model holding a single text ``message``."""

    message: str
class MyEvent(Event):
    """Event tagged ``"my_event"`` whose ``data`` field is a ``MyData`` payload."""

    # Class-level string tag for this event; not a per-instance field.
    type: _t.ClassVar[str] = "my_event"
    data: MyData
class EventReaderFileWriter(FileWriter):
    """``FileWriter`` subclass that sets its ``message`` attribute from ``MyEvent`` events."""

    io: IO = IO(input_events=[MyEvent])

    @MyEvent.handler
    async def handle_match(self, event: MyEvent):
        """Copy the message carried by ``event`` onto this component."""
        payload = event.data
        self.message = payload.message
class MessageEventGenerator(Component):
    """Component that queues one ``MyEvent`` per step for a fixed number of steps."""

    io = IO(output_events=[MyEvent])

    def __init__(self, iters: int, **kwargs: _t.Any) -> None:
        """Store the number of messages to emit; other kwargs go to the base class."""
        super().__init__(**kwargs)
        self._iters = iters

    async def init(self) -> None:
        """Create the counter that ``step`` consumes."""
        self._seq = iter(range(self._iters))

    async def step(self) -> None:
        """Queue the next message event, or close the IO once the counter runs out."""
        try:
            index = next(self._seq)
            text = f"Message {index}"
            event = MyEvent(source=self.name, data=MyData(message=text))
            self.io.queue_event(event)
        except StopIteration:
            # Counter exhausted: nothing left to emit.
            await self.io.close()
# Event-driven model: an event generator plus the FileWriter subclass.
components = [
    MessageEventGenerator(iters=3, name="message_event_generator"),
    EventReaderFileWriter(
        path="output_messages.csv", name="event_reader_file_writer", field_names=["message"]
    ),
]
# Only event connectors are passed to the process; nothing is connected to
# the writer's `message` input field.
event_connectors = AsyncioConnector.builder().build_event_connectors(components)
process = LocalProcess(
    components=components,
    connectors=event_connectors,
)
# NOTE(review): a bare `async with` only works where top-level await is
# available (e.g. a notebook or `python -m asyncio`) — confirm how this
# snippet is meant to be run.
async with process:
    await process.run()
# This hangs while event_reader_file_writer waits for input
The following is the non-event-driven behaviour, which works:
class MessageInputGenerator(Component):
    """Component that writes one message per step to its ``message_input`` output."""

    io = IO(outputs=["message_input"])

    def __init__(self, iters: int, **kwargs: _t.Any) -> None:
        """Store the number of messages to emit; other kwargs go to the base class."""
        super().__init__(**kwargs)
        self._iters = iters

    async def init(self) -> None:
        """Create the counter that ``step`` consumes."""
        self._seq = iter(range(self._iters))

    async def step(self) -> None:
        """Set the next message on the output, or close the IO once the counter runs out."""
        try:
            index = next(self._seq)
            self.message_input = f"Message {index}"
        except StopIteration:
            # Counter exhausted: nothing left to emit.
            await self.io.close()
def connect(in_, out_):  # (1)!
    """Return an ``AsyncioConnector`` linking ``in_`` (source) to ``out_`` (target).

    Written as a ``def`` rather than a lambda bound to a name, so the helper
    has a real ``__name__`` in tracebacks and can carry this docstring.

    Args:
        in_: Passed as ``source`` to ``ConnectorSpec``; the example below uses a
            ``"component_name.field"`` string.
        out_: Passed as ``target`` to ``ConnectorSpec``; same format in the example.

    Returns:
        The ``AsyncioConnector`` built from that spec.
    """
    return AsyncioConnector(spec=ConnectorSpec(source=in_, target=out_))
# Input-driven model: the generator's output field is wired directly to the
# writer's `message` input.
components = [
    MessageInputGenerator(iters=3, name="message_input_generator"),
    EventReaderFileWriter(
        path="output_messages.csv", name="event_reader_file_writer", field_names=["message"]
    ),
]
# One field connector: generator output -> writer input.
connectors = [
    connect("message_input_generator.message_input", "event_reader_file_writer.message"),
]
process = LocalProcess(
    components=components,
    connectors=connectors,
)
# NOTE(review): a bare `async with` only works where top-level await is
# available (e.g. a notebook or `python -m asyncio`) — confirm how this
# snippet is meant to be run.
async with process:
    await process.run()
Alternatives
No response
Summary
Currently, non-event-driven components cannot be reused in event-driven models. Instead, new components have to be defined. This is because if we simply subclass a Component and add events to it, it will still wait (forever) to receive the inputs, which never arrive.
Example
This example demonstrates the behaviour we would like, using a subclass of FileWriter to reuse it:
The following is the non-event-driven behaviour, which works:
Alternatives
No response