Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions livekit-agents/livekit/agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
from .voice.amd import (
AMD,
AMDCategory,
AMDResult,
AMDPredictionEvent,
)
from .voice.background_audio import AudioConfig, BackgroundAudioPlayer, BuiltinAudioClip, PlayHandle
from .voice.room_io import RoomInputOptions, RoomIO, RoomOutputOptions
Expand Down Expand Up @@ -232,7 +232,7 @@ def __getattr__(name: str) -> typing.Any:
"AgentHandoffEvent",
"AMD",
"AMDCategory",
"AMDResult",
"AMDPredictionEvent",
"TurnHandlingOptions",
"EndpointingOptions",
"InterruptionOptions",
Expand Down
4 changes: 2 additions & 2 deletions livekit-agents/livekit/agents/voice/amd/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .classifier import AMDCategory, AMDResult
from .classifier import AMDCategory, AMDPredictionEvent
from .detector import AMD

__all__ = ["AMD", "AMDCategory", "AMDResult"]
__all__ = ["AMD", "AMDCategory", "AMDPredictionEvent"]
16 changes: 8 additions & 8 deletions livekit-agents/livekit/agents/voice/amd/classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ class AMDCategory(str, Enum):
UNCERTAIN = "uncertain"


class AMDResult(BaseModel):
type: Literal["amd"] = "amd"
class AMDPredictionEvent(BaseModel):
type: Literal["amd_prediction"] = "amd_prediction"
speech_duration: float
category: AMDCategory
reason: str
Expand Down Expand Up @@ -102,7 +102,7 @@ def wrapper(self: "_AMDClassifier", *args: Any, **kwargs: Any) -> Any:
return wrapper


class _AMDClassifier(EventEmitter[Literal["amd_result"]]):
class _AMDClassifier(EventEmitter[Literal["amd_prediction"]]):
def __init__(
self,
llm: LLM,
Expand All @@ -128,7 +128,7 @@ def __init__(
self._silence_timer: asyncio.TimerHandle | None = None
self._detection_timeout_timer: asyncio.TimerHandle | None = None

self._verdict_result: AMDResult | None = None
self._verdict_result: AMDPredictionEvent | None = None
self._verdict_ready = asyncio.Event()

self._llm = llm
Expand Down Expand Up @@ -224,7 +224,7 @@ def on_user_speech_ended(self, silence_duration: float) -> None:
functools.partial(self._silence_timer_callback, speech_duration=speech_duration),
)

def _set_verdict(self, result: AMDResult) -> None:
def _set_verdict(self, result: AMDPredictionEvent) -> None:
self._verdict_result = result
self._try_emit_result()

Expand All @@ -239,7 +239,7 @@ def _try_emit_result(self) -> None:
if self._detection_timeout_timer is not None:
self._detection_timeout_timer.cancel()
self._detection_timeout_timer = None
self.emit("amd_result", self._verdict_result)
self.emit("amd_prediction", self._verdict_result)
self._emitted = True

@log_exceptions(logger=logger)
Expand All @@ -252,7 +252,7 @@ def _silence_timer_callback(
) -> None:
if is_given(category) and is_given(reason) and self._verdict_result is None:
self._set_verdict(
AMDResult(
AMDPredictionEvent(
speech_duration=speech_duration or self.speech_duration,
category=category,
reason=reason,
Expand Down Expand Up @@ -296,7 +296,7 @@ async def save_prediction(label: AMDCategory) -> None:
"""Save the prediction to the verdict."""
if label != AMDCategory.UNCERTAIN:
self._set_verdict(
AMDResult(
AMDPredictionEvent(
speech_duration=self.speech_duration,
category=label,
reason="llm",
Expand Down
24 changes: 15 additions & 9 deletions livekit-agents/livekit/agents/voice/amd/detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import asyncio
from types import TracebackType
from typing import TYPE_CHECKING, TypedDict
from typing import TYPE_CHECKING, Literal, TypedDict

from opentelemetry import trace

Expand All @@ -15,7 +15,7 @@
from ...stt import STT as _STT
from ...telemetry import trace_types, tracer
from ...types import NOT_GIVEN, NotGivenOr
from ...utils import aio, is_given
from ...utils import EventEmitter, aio, is_given
from ...utils.participant import wait_for_track_publication
from .classifier import (
AMD_PROMPT,
Expand All @@ -25,7 +25,7 @@
NO_SPEECH_THRESHOLD,
TIMEOUT,
AMDCategory,
AMDResult,
AMDPredictionEvent,
_AMDClassifier,
)

Expand Down Expand Up @@ -75,7 +75,7 @@ class DetectionOptions(TypedDict, total=False):
}


class AMD:
class AMD(EventEmitter[Literal["amd_prediction"]]):
"""Answering Machine Detection (AMD).

Detects whether an outbound call is answered by a human or a machine.
Expand Down Expand Up @@ -135,6 +135,7 @@ def __init__(
suppress_compatibility_warning: bool = False,
detection_options: NotGivenOr[DetectionOptions] = NOT_GIVEN,
) -> None:
super().__init__()
self._llm_config: NotGivenOr[LLM | LLMModels | str] = llm
self._session: AgentSession = session
self._interrupt_on_machine = interrupt_on_machine
Expand All @@ -144,7 +145,7 @@ def __init__(
self._stt: NotGivenOr[_STT] = _InferenceSTT(stt) if isinstance(stt, str) else stt

self._classifier: _AMDClassifier | None = None
self._result: AMDResult | None = None
self._result: AMDPredictionEvent | None = None
self._closed = False
self._span: trace.Span | None = None

Expand Down Expand Up @@ -176,7 +177,7 @@ def pending(self) -> bool:
def started(self) -> bool:
return self._classifier is not None and self._classifier.started

async def execute(self) -> AMDResult:
async def execute(self) -> AMDPredictionEvent:
"""Run AMD and return the result.

While executing, speech playout authorization is locked. Once the
Expand Down Expand Up @@ -250,7 +251,7 @@ async def aclose(self) -> None:
self._stt_task = None

if self._classifier:
self._classifier.off("amd_result", self._on_amd_result)
self._classifier.off("amd_prediction", self._on_amd_prediction)
await self._classifier.close()
self._classifier = None

Expand All @@ -276,7 +277,7 @@ async def _run(self, session: AgentSession) -> None:
raise ValueError(
"AMD classifier could not be resolved, please provide a compatible model"
)
self._classifier.on("amd_result", self._on_amd_result)
self._classifier.on("amd_prediction", self._on_amd_prediction)
self._closed = False
self._result = None

Expand Down Expand Up @@ -361,7 +362,7 @@ async def _receive() -> None:
finally:
await aio.cancel_and_wait(*tasks)

def _on_amd_result(self, result: AMDResult) -> None:
def _on_amd_prediction(self, result: AMDPredictionEvent) -> None:
self._result = result
if self._classifier:
self._classifier.end_input()
Expand Down Expand Up @@ -396,6 +397,11 @@ def _on_amd_result(self, result: AMDResult) -> None:
except RuntimeError:
pass

if (host := self._session._session_host) is not None:
host._on_amd_prediction(result)

self.emit("amd_prediction", result)

def _start_span(self) -> None:
if self._span:
return
Expand Down
31 changes: 31 additions & 0 deletions livekit-agents/livekit/agents/voice/remote_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from collections.abc import AsyncIterator, Mapping, Sequence
from typing import TYPE_CHECKING, Any

from google.protobuf.duration_pb2 import Duration
from google.protobuf.timestamp_pb2 import Timestamp

from livekit import rtc
Expand All @@ -32,6 +33,7 @@
TTSModelUsage,
)
from ..version import __version__
from ..voice.amd import AMDCategory, AMDPredictionEvent
from .events import (
AgentState,
AgentStateChangedEvent,
Expand Down Expand Up @@ -248,6 +250,14 @@ async def __anext__(self) -> agent_pb.AgentSessionMessage:
"e2e_latency",
)

_AMD_CATEGORY_MAP: dict[AMDCategory, agent_pb.AmdCategory] = {
AMDCategory.HUMAN: agent_pb.AmdCategory.AMD_HUMAN,
AMDCategory.MACHINE_IVR: agent_pb.AmdCategory.AMD_MACHINE_IVR,
AMDCategory.MACHINE_VM: agent_pb.AmdCategory.AMD_MACHINE_VM,
AMDCategory.MACHINE_UNAVAILABLE: agent_pb.AmdCategory.AMD_MACHINE_UNAVAILABLE,
AMDCategory.UNCERTAIN: agent_pb.AmdCategory.AMD_UNCERTAIN,
}


def _tool_names(tools: Sequence[llm.Tool | Toolset]) -> list[str]:
result: list[str] = []
Expand Down Expand Up @@ -524,6 +534,27 @@ def _on_overlapping_speech(self, event: OverlappingSpeechEvent) -> None:

self._send_event(agent_pb.AgentSessionEvent(overlapping_speech=pb))

def _on_amd_prediction(self, event: AMDPredictionEvent) -> None:
speech_duration = Duration()
speech_duration.FromNanoseconds(int(event.speech_duration * 1e9))

delay = Duration()
delay.FromNanoseconds(int(event.delay * 1e9))

self._send_event(
agent_pb.AgentSessionEvent(
amd_prediction=agent_pb.AgentSessionEvent.AmdPrediction(
speech_duration=speech_duration,
delay=delay,
category=_AMD_CATEGORY_MAP[event.category],
reason=event.reason,
transcript=event.transcript,
)
)
)

# TODO: @chenghao-mou add EOT prediction event

def _on_session_usage_updated(self, event: SessionUsageUpdatedEvent) -> None:
self._send_event(
agent_pb.AgentSessionEvent(
Expand Down
Loading