diff --git a/bumble/avrcp.py b/bumble/avrcp.py index 37fffb1e..39754b4f 100644 --- a/bumble/avrcp.py +++ b/bumble/avrcp.py @@ -26,7 +26,7 @@ from dataclasses import dataclass, field from typing import ClassVar, SupportsBytes, TypeVar -from bumble import avc, avctp, core, hci, l2cap, utils +from bumble import avc, avctp, core, hci, l2cap, sdp, utils from bumble.colors import color from bumble.device import Connection, Device from bumble.sdp import ( @@ -194,82 +194,43 @@ class TargetFeatures(enum.IntFlag): # ----------------------------------------------------------------------------- -def make_controller_service_sdp_records( - service_record_handle: int, - avctp_version: tuple[int, int] = (1, 4), - avrcp_version: tuple[int, int] = (1, 6), - supported_features: int | ControllerFeatures = 1, -) -> list[ServiceAttribute]: - avctp_version_int = avctp_version[0] << 8 | avctp_version[1] - avrcp_version_int = avrcp_version[0] << 8 | avrcp_version[1] +@dataclass +class ControllerServiceSdpRecord: + service_record_handle: int + avctp_version: tuple[int, int] = (1, 4) + avrcp_version: tuple[int, int] = (1, 6) + supported_features: int | ControllerFeatures = ControllerFeatures(1) - attributes = [ - ServiceAttribute( - SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, - DataElement.unsigned_integer_32(service_record_handle), - ), - ServiceAttribute( - SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, - DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]), - ), - ServiceAttribute( - SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, - DataElement.sequence( - [ - DataElement.uuid(core.BT_AV_REMOTE_CONTROL_SERVICE), - DataElement.uuid(core.BT_AV_REMOTE_CONTROL_CONTROLLER_SERVICE), - ] + def to_service_attributes(self) -> list[ServiceAttribute]: + avctp_version_int = self.avctp_version[0] << 8 | self.avctp_version[1] + avrcp_version_int = self.avrcp_version[0] << 8 | self.avrcp_version[1] + + attributes = [ + ServiceAttribute( + SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, + DataElement.unsigned_integer_32(self.service_record_handle), ), - ), - ServiceAttribute( - SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, - DataElement.sequence( - [ - DataElement.sequence( - [ - DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID), - DataElement.unsigned_integer_16(avctp.AVCTP_PSM), - ] - ), - DataElement.sequence( - [ - DataElement.uuid(core.BT_AVCTP_PROTOCOL_ID), - DataElement.unsigned_integer_16(avctp_version_int), - ] - ), - ] + ServiceAttribute( + SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, + DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]), ), - ), - ServiceAttribute( - SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, - DataElement.sequence( - [ - DataElement.sequence( - [ - DataElement.uuid(core.BT_AV_REMOTE_CONTROL_SERVICE), - DataElement.unsigned_integer_16(avrcp_version_int), - ] - ), - ] + ServiceAttribute( + SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.uuid(core.BT_AV_REMOTE_CONTROL_SERVICE), + DataElement.uuid(core.BT_AV_REMOTE_CONTROL_CONTROLLER_SERVICE), + ] + ), ), - ), - ServiceAttribute( - SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, - DataElement.unsigned_integer_16(supported_features), - ), - ] - if supported_features & ControllerFeatures.SUPPORTS_BROWSING: - attributes.append( ServiceAttribute( - SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, DataElement.sequence( [ DataElement.sequence( [ DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID), - DataElement.unsigned_integer_16( - avctp.AVCTP_BROWSING_PSM - ), + DataElement.unsigned_integer_16(avctp.AVCTP_PSM), ] ), DataElement.sequence( @@ -281,87 +242,130 @@ def make_controller_service_sdp_records( ] ), ), - ) - return attributes - - -# ----------------------------------------------------------------------------- -def make_target_service_sdp_records( - service_record_handle: int, - avctp_version: tuple[int, int] = (1, 4), - avrcp_version: tuple[int, int] = (1, 6), - supported_features: int | TargetFeatures = 0x23, -) -> list[ServiceAttribute]: - # TODO: support a way to compute the supported features from a feature list - avctp_version_int = avctp_version[0] << 8 | avctp_version[1] - avrcp_version_int = avrcp_version[0] << 8 | avrcp_version[1] - - attributes = [ - ServiceAttribute( - SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, - DataElement.unsigned_integer_32(service_record_handle), - ), - ServiceAttribute( - SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, - DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]), - ), - ServiceAttribute( - SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, - DataElement.sequence( - [ - DataElement.uuid(core.BT_AV_REMOTE_CONTROL_TARGET_SERVICE), - ] + ServiceAttribute( + SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.sequence( + [ + DataElement.uuid(core.BT_AV_REMOTE_CONTROL_SERVICE), + DataElement.unsigned_integer_16(avrcp_version_int), + ] + ), + ] + ), ), - ), - ServiceAttribute( - SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, - DataElement.sequence( - [ - DataElement.sequence( - [ - DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID), - DataElement.unsigned_integer_16(avctp.AVCTP_PSM), - ] - ), - DataElement.sequence( - [ - DataElement.uuid(core.BT_AVCTP_PROTOCOL_ID), - DataElement.unsigned_integer_16(avctp_version_int), - ] - ), - ] + ServiceAttribute( + SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, + DataElement.unsigned_integer_16(self.supported_features), ), - ), - ServiceAttribute( - SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, - DataElement.sequence( - [ + ] + if self.supported_features & ControllerFeatures.SUPPORTS_BROWSING: + attributes.append( + ServiceAttribute( + SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, DataElement.sequence( [ - DataElement.uuid(core.BT_AV_REMOTE_CONTROL_SERVICE), - DataElement.unsigned_integer_16(avrcp_version_int), + DataElement.sequence( + [ + DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID), + DataElement.unsigned_integer_16( + avctp.AVCTP_BROWSING_PSM + ), + ] + ), + DataElement.sequence( + [ + DataElement.uuid(core.BT_AVCTP_PROTOCOL_ID), + DataElement.unsigned_integer_16(avctp_version_int), + ] + ), ] ), - ] + ), + ) + return attributes + + @classmethod + async def find(cls, connection: Connection) -> list[ControllerServiceSdpRecord]: + async with sdp.Client(connection) as sdp_client: + search_result = await sdp_client.search_attributes( + uuids=[core.BT_AV_REMOTE_CONTROL_CONTROLLER_SERVICE], + attribute_ids=[ + SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, + SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, + ], + ) + + records: list[ControllerServiceSdpRecord] = [] + for attribute_lists in search_result: + record = cls(0) + for attribute in attribute_lists: + if attribute.id == SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID: + record.service_record_handle = attribute.value.value + elif attribute.id == SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID: + # [[L2CAP, PSM], [AVCTP, version]] + record.avctp_version = ( + attribute.value.value[1].value[1].value >> 8, + attribute.value.value[1].value[1].value & 0xFF, + ) + elif ( + attribute.id + == SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID + ): + # [[AV_REMOTE_CONTROL, version]] + record.avrcp_version = ( + attribute.value.value[0].value[1].value >> 8, + attribute.value.value[0].value[1].value & 0xFF, + ) + elif attribute.id == SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID: + record.supported_features = ControllerFeatures( + attribute.value.value + ) + records.append(record) + return records + + +# ----------------------------------------------------------------------------- +@dataclass +class TargetServiceSdpRecord: + service_record_handle: int + avctp_version: tuple[int, int] = (1, 4) + avrcp_version: tuple[int, int] = (1, 6) + supported_features: int | TargetFeatures = TargetFeatures(0x23) + + def to_service_attributes(self) -> list[ServiceAttribute]: + # TODO: support a way to compute the supported features from a feature list + avctp_version_int = self.avctp_version[0] << 8 | self.avctp_version[1] + avrcp_version_int = self.avrcp_version[0] << 8 | self.avrcp_version[1] + + attributes = [ + ServiceAttribute( + SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, + DataElement.unsigned_integer_32(self.service_record_handle), + ), + ServiceAttribute( + SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, + DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]), + ), + ServiceAttribute( + SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.uuid(core.BT_AV_REMOTE_CONTROL_TARGET_SERVICE), + ] + ), ), - ), - ServiceAttribute( - SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, - DataElement.unsigned_integer_16(supported_features), - ), - ] - if supported_features & TargetFeatures.SUPPORTS_BROWSING: - attributes.append( ServiceAttribute( - SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, DataElement.sequence( [ DataElement.sequence( [ DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID), - DataElement.unsigned_integer_16( - avctp.AVCTP_BROWSING_PSM - ), + DataElement.unsigned_integer_16(avctp.AVCTP_PSM), ] ), DataElement.sequence( @@ -373,8 +377,90 @@ def make_target_service_sdp_records( ] ), ), - ) - return attributes + ServiceAttribute( + SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.sequence( + [ + DataElement.uuid(core.BT_AV_REMOTE_CONTROL_SERVICE), + DataElement.unsigned_integer_16(avrcp_version_int), + ] + ), + ] + ), + ), + ServiceAttribute( + SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, + DataElement.unsigned_integer_16(self.supported_features), + ), + ] + if self.supported_features & TargetFeatures.SUPPORTS_BROWSING: + attributes.append( + ServiceAttribute( + SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.sequence( + [ + DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID), + DataElement.unsigned_integer_16( + avctp.AVCTP_BROWSING_PSM + ), + ] + ), + DataElement.sequence( + [ + DataElement.uuid(core.BT_AVCTP_PROTOCOL_ID), + DataElement.unsigned_integer_16(avctp_version_int), + ] + ), + ] + ), + ), + ) + return attributes + + @classmethod + async def find(cls, connection: Connection) -> list[TargetServiceSdpRecord]: + async with sdp.Client(connection) as sdp_client: + search_result = await sdp_client.search_attributes( + uuids=[core.BT_AV_REMOTE_CONTROL_TARGET_SERVICE], + attribute_ids=[ + SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, + SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, + ], + ) + + records: list[TargetServiceSdpRecord] = [] + for attribute_lists in search_result: + record = cls(0) + for attribute in attribute_lists: + if attribute.id == SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID: + record.service_record_handle = attribute.value.value + elif attribute.id == SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID: + # [[L2CAP, PSM], [AVCTP, version]] + record.avctp_version = ( + attribute.value.value[1].value[1].value >> 8, + attribute.value.value[1].value[1].value & 0xFF, + ) + elif ( + attribute.id + == SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID + ): + # [[AV_REMOTE_CONTROL, version]] + record.avrcp_version = ( + attribute.value.value[0].value[1].value >> 8, + attribute.value.value[0].value[1].value & 0xFF, + ) + elif attribute.id == SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID: + record.supported_features = TargetFeatures( + attribute.value.value + ) + records.append(record) + return records # ----------------------------------------------------------------------------- @@ -1204,6 +1290,10 @@ class InformBatteryStatusOfCtResponse(Response): @dataclass class GetPlayStatusResponse(Response): pdu_id = PduId.GET_PLAY_STATUS + + # TG doesn't support Song Length or Position. + UNAVAILABLE = 0xFFFFFFFF + song_length: int = field(metadata=hci.metadata(">4")) song_position: int = field(metadata=hci.metadata(">4")) play_status: PlayStatus = field(metadata=PlayStatus.type_metadata(1)) @@ -1521,16 +1611,33 @@ class Error(Exception): def __init__(self, status_code: StatusCode) -> None: self.status_code = status_code + class AvcError(Exception): + """The delegate AVC method failed, with a specified status code.""" + + def __init__(self, status_code: avc.ResponseFrame.ResponseCode) -> None: + self.status_code = status_code + supported_events: list[EventId] + supported_company_ids: list[int] volume: int + playback_status: PlayStatus - def __init__(self, supported_events: Iterable[EventId] = ()) -> None: + def __init__( + self, + supported_events: Iterable[EventId] = (), + supported_company_ids: Iterable[int] = (AVRCP_BLUETOOTH_SIG_COMPANY_ID,), + ) -> None: + self.supported_company_ids = list(supported_company_ids) self.supported_events = list(supported_events) self.volume = 0 + self.playback_status = PlayStatus.STOPPED async def get_supported_events(self) -> list[EventId]: return self.supported_events + async def get_supported_company_ids(self) -> list[int]: + return self.supported_company_ids + async def set_absolute_volume(self, volume: int) -> None: """ Set the absolute volume. @@ -1543,6 +1650,19 @@ async def set_absolute_volume(self, volume: int) -> None: async def get_absolute_volume(self) -> int: return self.volume + async def on_key_event( + self, + key: avc.PassThroughFrame.OperationId, + pressed: bool, + data: bytes, + ) -> None: + logger.debug( + "@@@ on_key_event: key=%s, pressed=%s, data=%s", key, pressed, data.hex() + ) + + async def get_playback_status(self) -> PlayStatus: + return self.playback_status + # TODO add other delegate methods @@ -1756,6 +1876,19 @@ async def get_supported_events(self) -> list[EventId]: if isinstance(capability, EventId) ) + async def get_supported_company_ids(self) -> list[int]: + """Get the list of events supported by the connected peer.""" + response_context = await self.send_avrcp_command( + avc.CommandFrame.CommandType.STATUS, + GetCapabilitiesCommand(GetCapabilitiesCommand.CapabilityId.COMPANY_ID), + ) + response = self._check_response(response_context, GetCapabilitiesResponse) + return list( + int.from_bytes(capability, 'big') + for capability in response.capabilities + if isinstance(capability, bytes) + ) + async def get_play_status(self) -> SongAndPlayStatus: """Get the play status of the connected peer.""" response_context = await self.send_avrcp_command( @@ -2052,16 +2185,28 @@ def _on_avctp_command(self, transaction_label: int, payload: bytes) -> None: return if isinstance(command, avc.PassThroughCommandFrame): - # TODO: delegate - response = avc.PassThroughResponseFrame( - avc.ResponseFrame.ResponseCode.ACCEPTED, - command.subunit_type, - command.subunit_id, - command.state_flag, - command.operation_id, - command.operation_data, - ) - self.send_response(transaction_label, response) + + async def dispatch_key_event() -> None: + try: + await self.delegate.on_key_event( + command.operation_id, + command.state_flag == avc.PassThroughFrame.StateFlag.PRESSED, + command.operation_data, + ) + response_code = avc.ResponseFrame.ResponseCode.ACCEPTED + except Delegate.AvcError as error: + logger.exception("delegate method raised exception") + response_code = error.status_code + except Exception: + logger.exception("delegate method raised exception") + response_code = avc.ResponseFrame.ResponseCode.REJECTED + self.send_passthrough_response( + transaction_label=transaction_label, + command=command, + response_code=response_code, + ) + + utils.AsyncRunner.spawn(dispatch_key_event()) return # TODO handle other types @@ -2141,6 +2286,8 @@ def _on_command_pdu(self, pdu_id: PduId, pdu: bytes) -> None: self._on_set_absolute_volume_command(transaction_label, command) elif isinstance(command, RegisterNotificationCommand): self._on_register_notification_command(transaction_label, command) + elif isinstance(command, GetPlayStatusCommand): + self._on_get_play_status_command(transaction_label, command) else: # Not supported. # TODO: check that this is the right way to respond in this case. @@ -2364,17 +2511,27 @@ def _on_get_capabilities_command( logger.debug(f"<<< AVRCP command PDU: {command}") async def get_supported_events() -> None: + capabilities: Sequence[bytes | SupportsBytes] if ( command.capability_id - != GetCapabilitiesCommand.CapabilityId.EVENTS_SUPPORTED + == GetCapabilitiesCommand.CapabilityId.EVENTS_SUPPORTED ): - raise core.InvalidArgumentError() - - supported_events = await self.delegate.get_supported_events() + capabilities = await self.delegate.get_supported_events() + elif ( + command.capability_id == GetCapabilitiesCommand.CapabilityId.COMPANY_ID + ): + company_ids = await self.delegate.get_supported_company_ids() + capabilities = [ + company_id.to_bytes(3, 'big') for company_id in company_ids + ] + else: + raise core.InvalidArgumentError( + f"Unsupported capability: {command.capability_id}" + ) self.send_avrcp_response( transaction_label, avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE, - GetCapabilitiesResponse(command.capability_id, supported_events), + GetCapabilitiesResponse(command.capability_id, capabilities), ) self._delegate_command(transaction_label, command, get_supported_events()) @@ -2395,6 +2552,26 @@ async def set_absolute_volume() -> None: self._delegate_command(transaction_label, command, set_absolute_volume()) + def _on_get_play_status_command( + self, transaction_label: int, command: GetPlayStatusCommand + ) -> None: + logger.debug("<<< AVRCP command PDU: %s", command) + + async def get_playback_status() -> None: + play_status: PlayStatus = await self.delegate.get_playback_status() + self.send_avrcp_response( + transaction_label, + avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE, + GetPlayStatusResponse( + # TODO: Delegate this. + song_length=GetPlayStatusResponse.UNAVAILABLE, + song_position=GetPlayStatusResponse.UNAVAILABLE, + play_status=play_status, + ), + ) + + self._delegate_command(transaction_label, command, get_playback_status()) + def _on_register_notification_command( self, transaction_label: int, command: RegisterNotificationCommand ) -> None: @@ -2410,28 +2587,27 @@ async def register_notification() -> None: ) return + response: Response if command.event_id == EventId.VOLUME_CHANGED: volume = await self.delegate.get_absolute_volume() response = RegisterNotificationResponse(VolumeChangedEvent(volume)) - self.send_avrcp_response( - transaction_label, - avc.ResponseFrame.ResponseCode.INTERIM, - response, - ) - self._register_notification_listener(transaction_label, command) - return - - if command.event_id == EventId.PLAYBACK_STATUS_CHANGED: - # TODO: testing only, use delegate + elif command.event_id == EventId.PLAYBACK_STATUS_CHANGED: + playback_status = await self.delegate.get_playback_status() response = RegisterNotificationResponse( - PlaybackStatusChangedEvent(play_status=PlayStatus.PLAYING) - ) - self.send_avrcp_response( - transaction_label, - avc.ResponseFrame.ResponseCode.INTERIM, - response, + PlaybackStatusChangedEvent(play_status=playback_status) ) - self._register_notification_listener(transaction_label, command) + elif command.event_id == EventId.NOW_PLAYING_CONTENT_CHANGED: + playback_status = await self.delegate.get_playback_status() + response = RegisterNotificationResponse(NowPlayingContentChangedEvent()) + else: + logger.warning("Event supported but not handled %s", command.event_id) return + self.send_avrcp_response( + transaction_label, + avc.ResponseFrame.ResponseCode.INTERIM, + response, + ) + self._register_notification_listener(transaction_label, command) + self._delegate_command(transaction_label, command, register_notification()) diff --git a/examples/run_avrcp.py b/examples/run_avrcp.py index 94d2a6ba..13b34d53 100644 --- a/examples/run_avrcp.py +++ b/examples/run_avrcp.py @@ -25,7 +25,7 @@ import websockets.asyncio.server import bumble.logging -from bumble import a2dp, avc, avdtp, avrcp, utils +from bumble import a2dp, avc, avdtp, avrcp, sdp, utils from bumble.core import PhysicalTransport from bumble.device import Device from bumble.transport import open_transport @@ -34,7 +34,7 @@ # ----------------------------------------------------------------------------- -def sdp_records(): +def sdp_records() -> dict[int, list[sdp.ServiceAttribute]]: a2dp_sink_service_record_handle = 0x00010001 avrcp_controller_service_record_handle = 0x00010002 avrcp_target_service_record_handle = 0x00010003 @@ -43,17 +43,17 @@ def sdp_records(): a2dp_sink_service_record_handle: a2dp.make_audio_sink_service_sdp_records( a2dp_sink_service_record_handle ), - avrcp_controller_service_record_handle: avrcp.make_controller_service_sdp_records( + avrcp_controller_service_record_handle: avrcp.ControllerServiceSdpRecord( avrcp_controller_service_record_handle - ), - avrcp_target_service_record_handle: avrcp.make_target_service_sdp_records( - avrcp_controller_service_record_handle - ), + ).to_service_attributes(), + avrcp_target_service_record_handle: avrcp.TargetServiceSdpRecord( + avrcp_target_service_record_handle + ).to_service_attributes(), } # ----------------------------------------------------------------------------- -def codec_capabilities(): +def codec_capabilities() -> avdtp.MediaCodecCapabilities: return avdtp.MediaCodecCapabilities( media_type=avdtp.AVDTP_AUDIO_MEDIA_TYPE, media_codec_type=a2dp.A2DP_SBC_CODEC_TYPE, @@ -81,20 +81,22 @@ def codec_capabilities(): # ----------------------------------------------------------------------------- -def on_avdtp_connection(server): +def on_avdtp_connection(server: avdtp.Protocol) -> None: # Add a sink endpoint to the server sink = server.add_sink(codec_capabilities()) - sink.on('rtp_packet', on_rtp_packet) + sink.on(sink.EVENT_RTP_PACKET, on_rtp_packet) # ----------------------------------------------------------------------------- -def on_rtp_packet(packet): +def on_rtp_packet(packet: avdtp.MediaPacket) -> None: print(f'RTP: {packet}') # ----------------------------------------------------------------------------- -def on_avrcp_start(avrcp_protocol: avrcp.Protocol, websocket_server: WebSocketServer): - async def get_supported_events(): +def on_avrcp_start( + avrcp_protocol: avrcp.Protocol, websocket_server: WebSocketServer +) -> None: + async def get_supported_events() -> None: events = await avrcp_protocol.get_supported_events() print("SUPPORTED EVENTS:", events) websocket_server.send_message( @@ -130,14 +132,14 @@ async def get_supported_events(): utils.AsyncRunner.spawn(get_supported_events()) - async def monitor_track_changed(): + async def monitor_track_changed() -> None: async for identifier in avrcp_protocol.monitor_track_changed(): print("TRACK CHANGED:", identifier.hex()) websocket_server.send_message( {"type": "track-changed", "params": {"identifier": identifier.hex()}} ) - async def monitor_playback_status(): + async def monitor_playback_status() -> None: async for playback_status in avrcp_protocol.monitor_playback_status(): print("PLAYBACK STATUS CHANGED:", playback_status.name) websocket_server.send_message( @@ -147,7 +149,7 @@ async def monitor_playback_status(): } ) - async def monitor_playback_position(): + async def monitor_playback_position() -> None: async for playback_position in avrcp_protocol.monitor_playback_position( playback_interval=1 ): @@ -159,7 +161,7 @@ async def monitor_playback_position(): } ) - async def monitor_player_application_settings(): + async def monitor_player_application_settings() -> None: async for settings in avrcp_protocol.monitor_player_application_settings(): print("PLAYER APPLICATION SETTINGS:", settings) settings_as_dict = [ @@ -173,14 +175,14 @@ async def monitor_player_application_settings(): } ) - async def monitor_available_players(): + async def monitor_available_players() -> None: async for _ in avrcp_protocol.monitor_available_players(): print("AVAILABLE PLAYERS CHANGED") websocket_server.send_message( {"type": "available-players-changed", "params": {}} ) - async def monitor_addressed_player(): + async def monitor_addressed_player() -> None: async for player in avrcp_protocol.monitor_addressed_player(): print("ADDRESSED PLAYER CHANGED") websocket_server.send_message( @@ -195,7 +197,7 @@ async def monitor_addressed_player(): } ) - async def monitor_uids(): + async def monitor_uids() -> None: async for uid_counter in avrcp_protocol.monitor_uids(): print("UIDS CHANGED") websocket_server.send_message( @@ -207,7 +209,7 @@ async def monitor_uids(): } ) - async def monitor_volume(): + async def monitor_volume() -> None: async for volume in avrcp_protocol.monitor_volume(): print("VOLUME CHANGED:", volume) websocket_server.send_message( @@ -360,7 +362,7 @@ async def main() -> None: # Create a listener to wait for AVDTP connections listener = avdtp.Listener(avdtp.Listener.create_registrar(device)) - listener.on('connection', on_avdtp_connection) + listener.on(listener.EVENT_CONNECTION, on_avdtp_connection) avrcp_delegate = Delegate() avrcp_protocol = avrcp.Protocol(avrcp_delegate) diff --git a/tests/avrcp_test.py b/tests/avrcp_test.py index c7554f9d..755ff179 100644 --- a/tests/avrcp_test.py +++ b/tests/avrcp_test.py @@ -17,6 +17,7 @@ # ----------------------------------------------------------------------------- from __future__ import annotations +import asyncio import struct from collections.abc import Sequence @@ -422,6 +423,47 @@ def test_passthrough_commands(): assert bytes(parsed) == play_pressed_bytes +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_find_sdp_records(): + two_devices = await TwoDevices.create_with_avdtp() + + # Add SDP records to device 1 + controller_record = avrcp.ControllerServiceSdpRecord( + service_record_handle=0x10001, + avctp_version=(1, 4), + avrcp_version=(1, 6), + supported_features=( + avrcp.ControllerFeatures.CATEGORY_1 + | avrcp.ControllerFeatures.SUPPORTS_BROWSING + ), + ) + target_record = avrcp.TargetServiceSdpRecord( + service_record_handle=0x10002, + avctp_version=(1, 4), + avrcp_version=(1, 6), + supported_features=( + avrcp.TargetFeatures.CATEGORY_1 | avrcp.TargetFeatures.SUPPORTS_BROWSING + ), + ) + + two_devices.devices[1].sdp_service_records = { + 0x10001: controller_record.to_service_attributes(), + 0x10002: target_record.to_service_attributes(), + } + + # Find records from device 0 + controller_records = await avrcp.ControllerServiceSdpRecord.find( + two_devices.connections[0] + ) + assert len(controller_records) == 1 + assert controller_records[0] == controller_record + + target_records = await avrcp.TargetServiceSdpRecord.find(two_devices.connections[0]) + assert len(target_records) == 1 + assert target_records[0] == target_record + + # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_get_supported_events(): @@ -436,6 +478,163 @@ async def test_get_supported_events(): assert supported_events == [avrcp.EventId.VOLUME_CHANGED] +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_passthrough_key_event(): + two_devices = await TwoDevices.create_with_avdtp() + + q = asyncio.Queue[tuple[avc.PassThroughFrame.OperationId, bool, bytes]]() + + class Delegate(avrcp.Delegate): + async def on_key_event( + self, key: avc.PassThroughFrame.OperationId, pressed: bool, data: bytes + ) -> None: + q.put_nowait((key, pressed, data)) + + two_devices.protocols[1].delegate = Delegate() + + for key, pressed in [ + (avc.PassThroughFrame.OperationId.PLAY, True), + (avc.PassThroughFrame.OperationId.PLAY, False), + (avc.PassThroughFrame.OperationId.PAUSE, True), + (avc.PassThroughFrame.OperationId.PAUSE, False), + ]: + await two_devices.protocols[0].send_key_event(key, pressed) + assert (await q.get()) == (key, pressed, b'') + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_passthrough_key_event_rejected(): + two_devices = await TwoDevices.create_with_avdtp() + + class Delegate(avrcp.Delegate): + async def on_key_event( + self, key: avc.PassThroughFrame.OperationId, pressed: bool, data: bytes + ) -> None: + raise avrcp.Delegate.AvcError(avc.ResponseFrame.ResponseCode.REJECTED) + + two_devices.protocols[1].delegate = Delegate() + + response = await two_devices.protocols[0].send_key_event( + avc.PassThroughFrame.OperationId.PLAY, True + ) + assert response.response == avc.ResponseFrame.ResponseCode.REJECTED + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_passthrough_key_event_exception(): + two_devices = await TwoDevices.create_with_avdtp() + + class Delegate(avrcp.Delegate): + async def on_key_event( + self, key: avc.PassThroughFrame.OperationId, pressed: bool, data: bytes + ) -> None: + raise Exception() + + two_devices.protocols[1].delegate = Delegate() + + response = await two_devices.protocols[0].send_key_event( + avc.PassThroughFrame.OperationId.PLAY, True + ) + assert response.response == avc.ResponseFrame.ResponseCode.REJECTED + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_set_volume(): + two_devices = await TwoDevices.create_with_avdtp() + + for volume in range(avrcp.SetAbsoluteVolumeCommand.MAXIMUM_VOLUME + 1): + response = await two_devices.protocols[1].send_avrcp_command( + avc.CommandFrame.CommandType.CONTROL, avrcp.SetAbsoluteVolumeCommand(volume) + ) + assert isinstance(response.response, avrcp.SetAbsoluteVolumeResponse) + assert response.response.volume == volume + assert two_devices.protocols[0].delegate.volume == volume + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_get_playback_status(): + two_devices = await TwoDevices.create_with_avdtp() + + for status in avrcp.PlayStatus: + two_devices.protocols[0].delegate.playback_status = status + response = await two_devices.protocols[1].get_play_status() + assert response.play_status == status + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_get_supported_company_ids(): + two_devices = await TwoDevices.create_with_avdtp() + + for status in avrcp.PlayStatus: + two_devices.protocols[0].delegate = avrcp.Delegate( + supported_company_ids=[avrcp.AVRCP_BLUETOOTH_SIG_COMPANY_ID] + ) + supported_company_ids = await two_devices.protocols[ + 1 + ].get_supported_company_ids() + assert supported_company_ids == [avrcp.AVRCP_BLUETOOTH_SIG_COMPANY_ID] + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_monitor_volume(): + two_devices = await TwoDevices.create_with_avdtp() + + two_devices.protocols[1].delegate = avrcp.Delegate([avrcp.EventId.VOLUME_CHANGED]) + volume_iter = two_devices.protocols[0].monitor_volume() + + for volume in range(avrcp.SetAbsoluteVolumeCommand.MAXIMUM_VOLUME + 1): + # Interim + two_devices.protocols[1].delegate.volume = 0 + assert (await anext(volume_iter)) == 0 + # Changed + two_devices.protocols[1].notify_volume_changed(volume) + assert (await anext(volume_iter)) == volume + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_monitor_playback_status(): + two_devices = await TwoDevices.create_with_avdtp() + + two_devices.protocols[1].delegate = avrcp.Delegate( + [avrcp.EventId.PLAYBACK_STATUS_CHANGED] + ) + playback_status_iter = two_devices.protocols[0].monitor_playback_status() + + for playback_status in avrcp.PlayStatus: + # Interim + two_devices.protocols[1].delegate.playback_status = avrcp.PlayStatus.STOPPED + assert (await anext(playback_status_iter)) == avrcp.PlayStatus.STOPPED + # Changed + two_devices.protocols[1].notify_playback_status_changed(playback_status) + assert (await anext(playback_status_iter)) == playback_status + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_monitor_now_playing_content(): + two_devices = await TwoDevices.create_with_avdtp() + + two_devices.protocols[1].delegate = avrcp.Delegate( + [avrcp.EventId.NOW_PLAYING_CONTENT_CHANGED] + ) + now_playing_iter = two_devices.protocols[0].monitor_now_playing_content() + + for _ in range(2): + # Interim + await anext(now_playing_iter) + # Changed + two_devices.protocols[1].notify_now_playing_content_changed() + await anext(now_playing_iter) + + # ----------------------------------------------------------------------------- if __name__ == '__main__': test_frame_parser()