Skip to content

feat: Improve reusability of components in event-driven simulations #248

@toby-coleman

Description

@toby-coleman

Summary

Currently, non-event-driven components cannot be reused in event-driven models; instead, new components have to be defined. This is because if we simply subclass a Component and add events to it, the subclass will still wait (forever) to receive its inputs, which never arrive.

Example

This example demonstrates the behaviour we would like, using a subclass of FileWriter to reuse it:

from plugboard.library import FileWriter
from plugboard.component import IOController as IO, Component
from plugboard.events import Event
from plugboard.connector import AsyncioConnector
from plugboard.process import LocalProcess

from pydantic import BaseModel
import typing as _t

from plugboard.schemas import ConnectorSpec

class MyData(BaseModel):
    """Payload model holding a single text message."""

    # Message text carried by the payload.
    message: str


class MyEvent(Event):
    """Event of type "my_event" whose `data` field is a `MyData` payload."""

    # Class-level type string for this event.
    type: _t.ClassVar[str] = "my_event"
    # Payload attached to each event instance.
    data: MyData


class EventReaderFileWriter(FileWriter):
    """`FileWriter` subclass that also declares `MyEvent` as an input event."""

    # Declares MyEvent as an input event for this component.
    # NOTE(review): presumably combined with FileWriter's own IO declaration
    # rather than replacing it — confirm against plugboard's Component.
    io: IO = IO(input_events=[MyEvent])

    @MyEvent.handler
    async def handle_match(self, event: MyEvent) -> None:
        """Copy the message from the event payload onto `self.message`."""
        self.message = event.data.message

class MessageEventGenerator(Component):
    """Component that queues one `MyEvent` per step until `iters` messages are sent."""

    io = IO(output_events=[MyEvent])

    def __init__(self, iters: int, **kwargs: _t.Any) -> None:
        """Store the message count and forward everything else to `Component`.

        Args:
            iters: Number of messages to generate before the IO is closed.
            **kwargs: Passed unchanged to `Component.__init__`.
        """
        super().__init__(**kwargs)
        self._iters = iters

    async def init(self) -> None:
        """Create the counter that numbers messages from 0 to `iters - 1`."""
        self._seq = iter(range(self._iters))

    async def step(self) -> None:
        """Queue the next message event, or close the IO once the counter is exhausted."""
        # Keep only `next()` inside the `try`: a StopIteration raised while
        # building or queueing the event must not be mistaken for exhaustion.
        try:
            index = next(self._seq)
        except StopIteration:
            await self.io.close()
            return
        event = MyEvent(source=self.name, data=MyData(message=f"Message {index}"))
        self.io.queue_event(event)

# One event producer and one FileWriter subclass that handles its events.
components = [
    MessageEventGenerator(iters=3, name="message_event_generator"),
    EventReaderFileWriter(
        path="output_messages.csv", name="event_reader_file_writer", field_names=["message"]
    ),
]

# Only event connectors are built here; no connector targets the writer's
# `message` field.
event_connectors = AsyncioConnector.builder().build_event_connectors(components)

process = LocalProcess(
    components=components,
    connectors=event_connectors,
)

# NOTE: top-level `async with` requires an async context (e.g. a notebook or
# the asyncio REPL); in a plain module, wrap this in an `async def`.
async with process:
    await process.run()
# This hangs while event_reader_file_writer waits for input

The following is the non-event-driven behaviour, which works:

class MessageInputGenerator(Component):
    """Component that sets its `message_input` output once per step for `iters` steps."""

    io = IO(outputs=["message_input"])

    def __init__(self, iters: int, **kwargs: _t.Any) -> None:
        """Store the message count and forward everything else to `Component`.

        Args:
            iters: Number of messages to generate before the IO is closed.
            **kwargs: Passed unchanged to `Component.__init__`.
        """
        super().__init__(**kwargs)
        self._iters = iters

    async def init(self) -> None:
        """Create the counter that numbers messages from 0 to `iters - 1`."""
        self._seq = iter(range(self._iters))

    async def step(self) -> None:
        """Set the next message on the output, or close the IO once the counter is exhausted."""
        # Keep only `next()` inside the `try`: a StopIteration raised while
        # assigning the output must not be mistaken for exhaustion.
        try:
            index = next(self._seq)
        except StopIteration:
            await self.io.close()
            return
        self.message_input = f"Message {index}"

connect = lambda in_, out_: AsyncioConnector(  # (1)!
    spec=ConnectorSpec(source=in_, target=out_)
)
components = [
    MessageInputGenerator(iters=3, name="message_input_generator"),
    EventReaderFileWriter(
        path="output_messages.csv", name="event_reader_file_writer", field_names=["message"]
    ),
]
connectors = [
    connect("message_input_generator.message_input", "event_reader_file_writer.message"),
]

process = LocalProcess(
    components=components,
    connectors=connectors,
)

async with process:
    await process.run()

Alternatives

No response

Metadata

Metadata

Labels

enhancement: New feature or request

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions