Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ jobs:

- name: Install dependencies
run: |
python -m pip install -e ".[tests,tracking-client,graphviz]"
python -m pip install -e ".[tests,tracking-client,graphviz,tracking-server-s3]"

- name: Run tests
run: |
Expand Down
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,11 @@ burr/tracking/server/build
examples/*/statemachine
examples/*/*/statemachine
.vscode

# Terraform (see also examples/deployment/aws/terraform/.gitignore)
**/.terraform.lock.hcl
examples/deployment/aws/terraform/.terraform/
examples/deployment/aws/terraform/*.tfstate
examples/deployment/aws/terraform/*.tfstate.*
examples/deployment/aws/terraform/.terraform.tfstate.lock.info
examples/deployment/aws/terraform/*.tfplan
25 changes: 25 additions & 0 deletions burr/tracking/server/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,31 @@ def snapshot_interval_milliseconds(self) -> Optional[int]:
pass


class EventDrivenBackendMixin(abc.ABC):
    """Mixin interface for backends that can react to push-style updates.

    Backends implementing this receive real-time notifications of new data
    instead of having to poll for new files.
    """

    @abc.abstractmethod
    async def start_event_consumer(self):
        """Run the event-consumption loop for event-driven tracking.

        Implementations are expected to run indefinitely, handling event
        notifications delivered through the configured message queue.
        """
        ...

    @abc.abstractmethod
    def is_event_driven(self) -> bool:
        """Report whether this backend is set up for event-driven updates.

        :return: True if event-driven mode is enabled and configured, False otherwise
        """
        ...


class BackendBase(abc.ABC):
async def lifespan(self, app: FastAPI):
"""Quick tool to allow plugin to the app's lifecycle.
Expand Down
31 changes: 25 additions & 6 deletions burr/tracking/server/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.

import asyncio
import importlib
import logging
import os
Expand All @@ -29,6 +30,7 @@
from burr.tracking.server.backend import (
AnnotationsBackendMixin,
BackendBase,
EventDrivenBackendMixin,
IndexingBackendMixin,
SnapshottingBackendMixin,
)
Expand Down Expand Up @@ -135,9 +137,20 @@ async def lifespan(app: FastAPI):
await backend.lifespan(app).__anext__()
await sync_index() # this will trigger the repeat every N seconds
await save_snapshot() # this will trigger the repeat every N seconds
# Start event consumer for event-driven tracking when configured
event_consumer_task = None
if isinstance(backend, EventDrivenBackendMixin) and backend.is_event_driven():
event_consumer_task = asyncio.create_task(backend.start_event_consumer())
global initialized
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

High — Task reference not stored: asyncio.create_task() returns a task reference that isn't assigned to anything. This means:

  1. The task could be garbage collected
  2. There's no way to cancel it during shutdown
  3. Exceptions in the task will be silently lost

Store the task and cancel it in the shutdown path:

sqs_task = None
if isinstance(backend, EventDrivenBackendMixin) and backend.is_event_driven():
    sqs_task = asyncio.create_task(backend.start_event_consumer())
yield
if sqs_task:
    sqs_task.cancel()
    try:
        await sqs_task
    except asyncio.CancelledError:
        pass

Also, import asyncio should be a top-level import, not inside the function.

initialized = True
yield
# Graceful shutdown: cancel event consumer task
if event_consumer_task is not None:
event_consumer_task.cancel()
try:
await event_consumer_task
except asyncio.CancelledError:
pass
await backend.lifespan(app).__anext__()


Expand Down Expand Up @@ -172,12 +185,18 @@ def get_app_spec():
logger = logging.getLogger(__name__)

if app_spec.indexing:
update_interval = backend.update_interval_milliseconds() / 1000 if app_spec.indexing else None
sync_index = repeat_every(
seconds=backend.update_interval_milliseconds() / 1000,
wait_first=True,
logger=logger,
)(sync_index)
# Only use polling when not in event-driven mode
if not (
isinstance(backend, EventDrivenBackendMixin) and backend.is_event_driven()
):
update_interval = (
backend.update_interval_milliseconds() / 1000 if app_spec.indexing else None
)
sync_index = repeat_every(
seconds=backend.update_interval_milliseconds() / 1000,
wait_first=True,
logger=logger,
)(sync_index)

if app_spec.snapshotting:
snapshot_interval = (
Expand Down
3 changes: 2 additions & 1 deletion burr/tracking/server/s3/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ This will immediately start indexing your s3 bucket (or pick up from the last sn

To track your data, you use the S3TrackingClient. You pass the tracker to the `ApplicationBuilder`:


```python
from burr.tracking.s3client import S3TrackingClient

app = (
ApplicationBuilder()
.with_graph(graph)
Expand Down
Loading