Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 67 additions & 19 deletions livekit-agents/livekit/agents/telemetry/traces.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
import json
import logging
import threading
Expand Down Expand Up @@ -403,6 +404,26 @@ def _to_proto_chat_item(item: ChatItem) -> dict: # agent_pb.agent_session.ChatC
return MessageToDict(item_pb, preserving_proto_field_name=True)


async def _parse_retry_delay(resp: aiohttp.ClientResponse) -> float | None:
"""Parse a protobuf Status error response for RetryInfo and return the retry delay in seconds,
or None if the error is not retryable."""
from google.rpc import error_details_pb2, status_pb2 # type: ignore[import-untyped]

try:
body = await resp.read()
status = status_pb2.Status()
status.ParseFromString(body)
for detail in status.details:
retry_info = error_details_pb2.RetryInfo()
if detail.Unpack(retry_info):
delay = retry_info.retry_delay
return float(delay.seconds + delay.nanos / 1e9)
except Exception:
pass

return None


async def _upload_session_report(
*,
agent_name: str,
Expand Down Expand Up @@ -544,42 +565,69 @@ def _log(
header_msg.start_time.FromMilliseconds(int((report.audio_recording_started_at or 0) * 1000))
header_bytes = header_msg.SerializeToString()

mp = aiohttp.MultipartWriter("form-data")

part = mp.append(header_bytes)
part.set_content_disposition("form-data", name="header", filename="header.binpb")
part.headers["Content-Type"] = "application/protobuf"
part.headers["Content-Length"] = str(len(header_bytes))

chat_history_json = ""
if recording_options["transcript"]:
chat_history_json = json.dumps(report.chat_history.to_dict(exclude_timestamp=False))
part = mp.append(chat_history_json)
part.set_content_disposition("form-data", name="chat_history", filename="chat_history.json")
part.headers["Content-Type"] = "application/json"
part.headers["Content-Length"] = str(len(chat_history_json))

audio_bytes = b""
if has_audio and report.audio_recording_path:
try:
async with aiofiles.open(report.audio_recording_path, "rb") as f:
audio_bytes = await f.read()
except Exception:
audio_bytes = b""

url = f"{observability_url}/observability/recordings/v0"

def _build_multipart() -> aiohttp.MultipartWriter:
mp = aiohttp.MultipartWriter("form-data")

part = mp.append(header_bytes)
part.set_content_disposition("form-data", name="header", filename="header.binpb")
part.headers["Content-Type"] = "application/protobuf"
part.headers["Content-Length"] = str(len(header_bytes))

if recording_options["transcript"]:
part = mp.append(chat_history_json)
part.set_content_disposition(
"form-data", name="chat_history", filename="chat_history.json"
)
part.headers["Content-Type"] = "application/json"
part.headers["Content-Length"] = str(len(chat_history_json))

if audio_bytes:
part = mp.append(audio_bytes)
part.set_content_disposition("form-data", name="audio", filename="recording.ogg")
part.headers["Content-Type"] = "audio/ogg"
part.headers["Content-Length"] = str(len(audio_bytes))

url = f"{observability_url}/observability/recordings/v0"
headers = {
"Authorization": f"Bearer {jwt}",
"Content-Type": mp.content_type,
}
return mp

max_retries = 3
for attempt in range(max_retries + 1):
mp = _build_multipart()
headers = {
"Authorization": f"Bearer {jwt}",
"Content-Type": mp.content_type,
}

logger.debug("uploading session report to LiveKit Cloud")
async with http_session.post(url, data=mp, headers=headers) as resp:
resp.raise_for_status()
logger.debug("uploading session report to LiveKit Cloud")
async with http_session.post(url, data=mp, headers=headers) as resp:
if resp.status < 400:
break

retry_delay = await _parse_retry_delay(resp)
if retry_delay is None or attempt == max_retries:
resp.raise_for_status()
raise RuntimeError(f"recording upload failed: status {resp.status}")

logger.warning(
"recording upload failed (attempt %d/%d), retrying in %.1fs",
attempt + 1,
max_retries + 1,
retry_delay,
)
await asyncio.sleep(retry_delay)

logger.debug("finished uploading")

Expand Down
1 change: 1 addition & 0 deletions tests/test_recording.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ def _make_mock_tagger(
def _make_mock_http() -> MagicMock:
"""Create a mock aiohttp.ClientSession with async post."""
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.raise_for_status = MagicMock()
mock_http = MagicMock(spec=aiohttp.ClientSession)
mock_post_cm = AsyncMock()
Expand Down
Loading