Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import weakref
from dataclasses import dataclass
from typing import Any, Literal, TypedDict
from urllib.parse import quote

import aiohttp

Expand Down Expand Up @@ -74,6 +75,8 @@ class STTOptions:
sample_rate: STTRealtimeSampleRates
server_vad: NotGivenOr[VADOptions | None]
keyterms: NotGivenOr[list[str]]
no_verbatim: NotGivenOr[bool]
enable_logging: NotGivenOr[bool]


class STT(stt.STT):
Expand All @@ -91,6 +94,8 @@ def __init__(
http_session: aiohttp.ClientSession | None = None,
model_id: NotGivenOr[ElevenLabsSTTModels | str] = NOT_GIVEN,
keyterms: NotGivenOr[list[str]] = NOT_GIVEN,
no_verbatim: NotGivenOr[bool] = NOT_GIVEN,
enable_logging: NotGivenOr[bool] = NOT_GIVEN,
) -> None:
"""
Create a new instance of ElevenLabs STT.
Expand All @@ -109,9 +114,16 @@ def __init__(
model_id (ElevenLabsSTTModels | str): ElevenLabs STT model to use. If not specified a default model will
be selected based on parameters provided.
keyterms (NotGivenOr[list[str]]): A list of keywords or phrases to bias the transcription towards.
Each keyterm can contain at most 5 words and must be less than 50 characters.
Maximum of 100 keyterms. Only supported for Scribe v2 batch recognition
(not realtime streaming). Usage incurs additional costs.
Limits differ by mode:
- Scribe v2 batch: up to 100 keyterms, each at most 5 words and less than 50 characters.
- Scribe v2 realtime: up to 50 keyterms, each at most 20 characters.
Usage incurs additional costs.
no_verbatim (NotGivenOr[bool]): If True, filler words and disfluencies are removed from
the transcript. Only supported for the Scribe v2 realtime model. Default is False
(verbatim transcription) when not provided.
enable_logging (NotGivenOr[bool]): If False, activates zero-retention mode: ElevenLabs
will not store or log request data. Only supported for the Scribe v2 realtime model.
Default is True (logging enabled) when not provided.
"""

if is_given(use_realtime):
Expand Down Expand Up @@ -157,6 +169,8 @@ def __init__(
include_timestamps=include_timestamps,
model_id=model_id,
keyterms=keyterms,
no_verbatim=no_verbatim,
enable_logging=enable_logging,
)
self._session = http_session
self._streams = weakref.WeakSet[SpeechStream]()
Expand Down Expand Up @@ -280,6 +294,8 @@ def update_options(
tag_audio_events: NotGivenOr[bool] = NOT_GIVEN,
server_vad: NotGivenOr[VADOptions] = NOT_GIVEN,
keyterms: NotGivenOr[list[str]] = NOT_GIVEN,
no_verbatim: NotGivenOr[bool] = NOT_GIVEN,
enable_logging: NotGivenOr[bool] = NOT_GIVEN,
) -> None:
if is_given(tag_audio_events):
self._opts.tag_audio_events = tag_audio_events
Expand All @@ -290,8 +306,19 @@ def update_options(
if is_given(keyterms):
self._opts.keyterms = keyterms

if is_given(no_verbatim):
self._opts.no_verbatim = no_verbatim

if is_given(enable_logging):
self._opts.enable_logging = enable_logging

for stream in self._streams:
stream.update_options(server_vad=server_vad)
stream.update_options(
server_vad=server_vad,
keyterms=keyterms,
no_verbatim=no_verbatim,
enable_logging=enable_logging,
)

def stream(
self,
Expand Down Expand Up @@ -337,9 +364,29 @@ def update_options(
self,
*,
server_vad: NotGivenOr[VADOptions] = NOT_GIVEN,
keyterms: NotGivenOr[list[str]] = NOT_GIVEN,
no_verbatim: NotGivenOr[bool] = NOT_GIVEN,
enable_logging: NotGivenOr[bool] = NOT_GIVEN,
) -> None:
reconnect = False

if is_given(server_vad):
self._opts.server_vad = server_vad
reconnect = True

if is_given(keyterms):
self._opts.keyterms = keyterms
reconnect = True

if is_given(no_verbatim):
self._opts.no_verbatim = no_verbatim
reconnect = True

if is_given(enable_logging):
self._opts.enable_logging = enable_logging
reconnect = True

if reconnect:
self._reconnect_event.set()

def _on_audio_duration_report(self, duration: float) -> None:
Expand Down Expand Up @@ -509,6 +556,16 @@ async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse:
if self._opts.include_timestamps:
params.append("include_timestamps=true")

if is_given(self._opts.keyterms):
for keyterm in self._opts.keyterms:
params.append(f"keyterms={quote(keyterm, safe='')}")

if is_given(self._opts.no_verbatim):
params.append(f"no_verbatim={str(self._opts.no_verbatim).lower()}")

if is_given(self._opts.enable_logging):
params.append(f"enable_logging={str(self._opts.enable_logging).lower()}")

query_string = "&".join(params)

# Convert HTTPS URL to WSS
Expand Down
158 changes: 158 additions & 0 deletions tests/test_plugin_elevenlabs_stt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
"""Tests for ElevenLabs STT plugin configuration options."""

from __future__ import annotations

from unittest.mock import AsyncMock, MagicMock

import pytest

from livekit.agents.types import NOT_GIVEN
from livekit.plugins.elevenlabs import STT


def test_keyterms_default():
    """Keyterms remain NOT_GIVEN when the constructor receives none."""
    recognizer = STT(api_key="test-key")
    stored_keyterms = recognizer._opts.keyterms
    assert stored_keyterms is NOT_GIVEN


def test_keyterms_set():
    """Keyterms passed to the constructor are stored on the options."""
    expected = ["LiveKit", "Scribe"]
    # Pass a separate list so the comparison is by value, not by identity.
    recognizer = STT(api_key="test-key", keyterms=list(expected))
    assert recognizer._opts.keyterms == expected


def test_keyterms_update():
    """update_options(keyterms=...) stores the new keyterms on the options."""
    expected = ["foo", "bar"]
    recognizer = STT(api_key="test-key")
    # Pass a separate list so the comparison is by value, not by identity.
    recognizer.update_options(keyterms=list(expected))
    assert recognizer._opts.keyterms == expected


def test_no_verbatim_default():
    """no_verbatim remains NOT_GIVEN when the constructor does not receive it."""
    recognizer = STT(api_key="test-key")
    stored_flag = recognizer._opts.no_verbatim
    assert stored_flag is NOT_GIVEN


def test_no_verbatim_set():
    """no_verbatim=True passed to the constructor is stored as True."""
    recognizer = STT(api_key="test-key", no_verbatim=True)
    stored_flag = recognizer._opts.no_verbatim
    assert stored_flag is True


def test_no_verbatim_update():
    """update_options(no_verbatim=False) stores False rather than leaving it unset."""
    recognizer = STT(api_key="test-key")
    recognizer.update_options(no_verbatim=False)
    stored_flag = recognizer._opts.no_verbatim
    assert stored_flag is False


def test_enable_logging_default():
    """enable_logging remains NOT_GIVEN when the constructor does not receive it."""
    recognizer = STT(api_key="test-key")
    stored_flag = recognizer._opts.enable_logging
    assert stored_flag is NOT_GIVEN


def test_enable_logging_set():
    """enable_logging=False passed to the constructor is stored as False."""
    recognizer = STT(api_key="test-key", enable_logging=False)
    stored_flag = recognizer._opts.enable_logging
    assert stored_flag is False


def test_enable_logging_update():
    """update_options(enable_logging=True) stores True on the options."""
    recognizer = STT(api_key="test-key")
    recognizer.update_options(enable_logging=True)
    stored_flag = recognizer._opts.enable_logging
    assert stored_flag is True


@pytest.mark.asyncio
async def test_update_options_triggers_stream_reconnect():
    """Each new option set via STT.update_options reaches an open stream.

    For keyterms, no_verbatim and enable_logging in turn, the stream's
    reconnect event must be set and the stream's own options must carry the
    new value.
    """
    captured: dict[str, str] = {}
    recognizer = STT(
        api_key="test-key",
        model_id="scribe_v2_realtime",
        language_code="en",
        http_session=_make_session_mock(captured),
    )
    active_stream = recognizer.stream()
    try:
        reconnect_event = active_stream._reconnect_event

        # Nothing has changed yet, so no reconnect should be pending.
        assert not reconnect_event.is_set()

        recognizer.update_options(keyterms=["foo"])
        assert reconnect_event.is_set()
        assert active_stream._opts.keyterms == ["foo"]

        # Reset between steps so each option is verified independently.
        reconnect_event.clear()
        recognizer.update_options(no_verbatim=True)
        assert reconnect_event.is_set()
        assert active_stream._opts.no_verbatim is True

        reconnect_event.clear()
        recognizer.update_options(enable_logging=False)
        assert reconnect_event.is_set()
        assert active_stream._opts.enable_logging is False
    finally:
        await active_stream.aclose()


def test_combined_options():
    """All three options can be passed together and each is stored as given."""
    recognizer = STT(
        api_key="test-key",
        keyterms=["alpha", "beta"],
        no_verbatim=True,
        enable_logging=False,
    )
    stored = recognizer._opts
    assert stored.keyterms == ["alpha", "beta"]
    assert stored.no_verbatim is True
    assert stored.enable_logging is False


def _make_session_mock(captured: dict[str, str]) -> MagicMock:
    """Build a stand-in HTTP session whose ``ws_connect`` records the URL.

    Args:
        captured: Dict that receives the requested URL under the ``"url"``
            key each time ``ws_connect`` is awaited.

    Returns:
        A ``MagicMock`` session whose ``ws_connect`` is an ``AsyncMock`` that
        resolves to a fresh ``MagicMock``.
    """

    async def _record_url(url: str, **kwargs):
        # Extra keyword arguments are accepted and ignored.
        captured["url"] = url
        return MagicMock()

    fake_session = MagicMock()
    fake_session.ws_connect = AsyncMock(side_effect=_record_url)
    return fake_session


@pytest.mark.asyncio
async def test_realtime_query_string_includes_new_params():
    """The realtime WS URL carries keyterms, no_verbatim and enable_logging.

    Keyterms are expected percent-encoded (space as %20, ampersand as %26)
    and booleans as lowercase text.
    """
    captured: dict[str, str] = {}
    recognizer = STT(
        api_key="test-key",
        model_id="scribe_v2_realtime",
        language_code="en",
        keyterms=["hello world", "scribe&v2"],
        no_verbatim=True,
        enable_logging=False,
        http_session=_make_session_mock(captured),
    )
    active_stream = recognizer.stream()
    try:
        await active_stream._connect_ws()
    finally:
        await active_stream.aclose()

    ws_url = captured["url"]
    expected_fragments = (
        "keyterms=hello%20world",
        "keyterms=scribe%26v2",
        "no_verbatim=true",
        "enable_logging=false",
    )
    for fragment in expected_fragments:
        assert fragment in ws_url


@pytest.mark.asyncio
async def test_realtime_query_string_omits_unset_params():
    """Options that were never given must be absent from the realtime WS URL."""
    captured: dict[str, str] = {}
    recognizer = STT(
        api_key="test-key",
        model_id="scribe_v2_realtime",
        language_code="en",
        http_session=_make_session_mock(captured),
    )
    active_stream = recognizer.stream()
    try:
        await active_stream._connect_ws()
    finally:
        await active_stream.aclose()

    ws_url = captured["url"]
    for unset_prefix in ("keyterms=", "no_verbatim=", "enable_logging="):
        assert unset_prefix not in ws_url