From d279566ee5263fb957763d50bdb9d8cf70786e4b Mon Sep 17 00:00:00 2001 From: pird32 Date: Thu, 16 Apr 2026 17:51:52 +0300 Subject: [PATCH 1/3] feat(transport-contract): bounded queue observability and typed events - BaseSender: fix getCacheSize() to track actual capacity after resizeCache() (was always returning initial 400 even after resizeCache(128)) - BaseSender: add getQueueSnapshot() returning QueueSnapshot(capacity, items) - BaseSender: add frameLifecycleListener for pooled-copy buffer recycling - BaseSender: emit rate-limited ConnectChecker.onTransportEvent(QueueOverflow) when sendMediaFrame() drops a frame (cooldown: 1500 ms, no flooding) - New: QueueSnapshot data class with usageRatio and summary() - New: TransportEvent sealed class (QueueOverflow, NetworkSendError) - New: FrameLifecycleListener fun interface for buffer pool integration - ConnectChecker: add onTransportEvent(TransportEvent) default no-op method Upstream-friendliness: onTransportEvent is a default method; existing ConnectChecker implementors are not required to override it. Made-with: Cursor --- .../java/com/pedro/common/ConnectChecker.kt | 16 +++++ .../pedro/common/FrameLifecycleListener.kt | 52 +++++++++++++++ .../java/com/pedro/common/QueueSnapshot.kt | 42 ++++++++++++ .../java/com/pedro/common/TransportEvent.kt | 64 +++++++++++++++++++ .../java/com/pedro/common/base/BaseSender.kt | 60 ++++++++++++++++- 5 files changed, 232 insertions(+), 2 deletions(-) create mode 100644 common/src/main/java/com/pedro/common/FrameLifecycleListener.kt create mode 100644 common/src/main/java/com/pedro/common/QueueSnapshot.kt create mode 100644 common/src/main/java/com/pedro/common/TransportEvent.kt diff --git a/common/src/main/java/com/pedro/common/ConnectChecker.kt b/common/src/main/java/com/pedro/common/ConnectChecker.kt index 7745d2f4de..ba800975cf 100644 --- a/common/src/main/java/com/pedro/common/ConnectChecker.kt +++ b/common/src/main/java/com/pedro/common/ConnectChecker.kt @@ -26,4 +26,20 @@ interface ConnectChecker: BitrateChecker { fun onDisconnect() fun onAuthError() fun onAuthSuccess() + + /** + * Typed transport event from the sender layer. + * + * Provides machine-readable signal types so callers can differentiate between + * queue-pressure events ([TransportEvent.QueueOverflow]) and network errors + * ([TransportEvent.NetworkSendError]). + * + * Default implementation is a no-op so existing [ConnectChecker] implementors + * are not required to override this method. A [TransportEvent.NetworkSendError] is + * always accompanied by a corresponding [onConnectionFailed] call for backward + * compatibility. + * + * @param event Typed transport event; use exhaustive `when` for dispatch. + */ + fun onTransportEvent(event: TransportEvent) {} } \ No newline at end of file diff --git a/common/src/main/java/com/pedro/common/FrameLifecycleListener.kt b/common/src/main/java/com/pedro/common/FrameLifecycleListener.kt new file mode 100644 index 0000000000..33d7b142f2 --- /dev/null +++ b/common/src/main/java/com/pedro/common/FrameLifecycleListener.kt @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2024 pedroSG94. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.pedro.common + +import com.pedro.common.frame.MediaFrame + +/** + * Lifecycle callback for frames sent through a [com.pedro.common.base.BaseSender]. + * + * The callback is invoked from the **sender's dispatch thread** (IO coroutine) after the + * sender has fully consumed a [MediaFrame] — i.e., after the frame data has been encoded + * into protocol packets and written to the network socket. At that point the [MediaFrame.data] + * buffer is no longer referenced by the sender and may be safely returned to a buffer pool. + * + * ## Thread guarantee + * [onFrameConsumed] is called on the sender's internal coroutine dispatcher (typically + * [kotlinx.coroutines.Dispatchers.IO]). Implementations must not block this thread. + * + * ## Usage with FramePayloadPool (RTMP pooled-copy) + * ```kotlin + * rtmpSender.frameLifecycleListener = FrameLifecycleListener { frame -> + * (frame.data as? PooledBuffer)?.release() + * } + * ``` + */ +fun interface FrameLifecycleListener { + + /** + * Called after [frame] has been fully consumed by the sender. + * + * The sender will not access [frame] or [MediaFrame.data] after this call. + * The implementation may safely release, recycle, or pool the underlying buffer. + * + * @param frame The frame that was consumed. Same instance that was passed to + * [com.pedro.common.base.BaseSender.sendMediaFrame]. + */ + fun onFrameConsumed(frame: MediaFrame) +} diff --git a/common/src/main/java/com/pedro/common/QueueSnapshot.kt b/common/src/main/java/com/pedro/common/QueueSnapshot.kt new file mode 100644 index 0000000000..3e71c49b30 --- /dev/null +++ b/common/src/main/java/com/pedro/common/QueueSnapshot.kt @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2024 pedroSG94. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.pedro.common + +/** + * Point-in-time snapshot of a sender's internal frame queue. + * + * Returned by [com.pedro.common.base.BaseSender.getQueueSnapshot]. + * Safe to pass across threads; all fields are immutable. + * + * @param capacity Maximum number of frames the queue can hold. + * @param items Number of frames currently queued. + * @param softThresholdPercent Usage % at which congestion enters soft state (default 70). + * @param hardThresholdPercent Usage % at which congestion enters hard state (default 85). + */ +data class QueueSnapshot( + val capacity: Int, + val items: Int, + val softThresholdPercent: Float = 70f, + val hardThresholdPercent: Float = 85f, +) { + /** Usage in [0.0, 1.0]. 0.0 when capacity is zero. */ + val usageRatio: Double + get() = if (capacity > 0) items.toDouble() / capacity else 0.0 + + /** Human-readable summary for log lines. */ + fun summary(): String = "$items/$capacity (${"%.0f".format(usageRatio * 100)}%)" +} diff --git a/common/src/main/java/com/pedro/common/TransportEvent.kt b/common/src/main/java/com/pedro/common/TransportEvent.kt new file mode 100644 index 0000000000..48ccc33a72 --- /dev/null +++ b/common/src/main/java/com/pedro/common/TransportEvent.kt @@ -0,0 +1,64 @@ +/* + * Copyright (C) 2024 pedroSG94. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.pedro.common + +import com.pedro.common.frame.MediaFrame + +/** + * Typed transport events emitted by [com.pedro.common.base.BaseSender]. + * + * These supplement [ConnectChecker.onConnectionFailed] with machine-readable signal types, + * allowing callers to differentiate between queue-pressure events (which do not require a + * network reconnect) and actual network send failures (which do). + * + * Designed as a sealed class so all variants are exhaustive-checked by callers. + */ +sealed class TransportEvent { + + /** + * The sender's internal frame queue has reached capacity and rejected a frame. + * + * This does not indicate a network failure. The connection is healthy; the sender + * cannot consume frames fast enough. Callers should reduce frame rate or bitrate, + * request a keyframe, or clear the queue — but NOT reconnect. + * + * @param frameType Type of the dropped frame (VIDEO or AUDIO). + * @param droppedTotal Total dropped frames of this type since sender start. + * @param queueCapacity Maximum number of frames the queue can hold. + * @param queueSize Number of frames currently in the queue. + */ + data class QueueOverflow( + val frameType: MediaFrame.Type, + val droppedTotal: Long, + val queueCapacity: Int, + val queueSize: Int, + ) : TransportEvent() + + /** + * A network-level send error occurred inside the sender dispatch loop. + * + * This is equivalent to the existing [ConnectChecker.onConnectionFailed] call and + * is emitted in addition to (not instead of) that callback for typed handling. + * + * @param message Human-readable error description. + * @param cause Original exception, if available. + */ + data class NetworkSendError( + val message: String, + val cause: Throwable? = null, + ) : TransportEvent() +} diff --git a/common/src/main/java/com/pedro/common/base/BaseSender.kt b/common/src/main/java/com/pedro/common/base/BaseSender.kt index 154eab3e4d..722a986cc7 100644 --- a/common/src/main/java/com/pedro/common/base/BaseSender.kt +++ b/common/src/main/java/com/pedro/common/base/BaseSender.kt @@ -3,8 +3,12 @@ package com.pedro.common.base import android.util.Log import com.pedro.common.BitrateManager import com.pedro.common.ConnectChecker +import com.pedro.common.FrameLifecycleListener +import com.pedro.common.QueueSnapshot import com.pedro.common.StreamBlockingQueue +import com.pedro.common.TransportEvent import com.pedro.common.frame.MediaFrame +import com.pedro.common.onMainThread import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job @@ -22,6 +26,8 @@ abstract class BaseSender( @Volatile protected var running = false + // Tracks actual queue capacity; updated when resizeCache() replaces the queue. + // getCacheSize() reads this field — it must stay in sync with the live queue object. private var cacheSize = 400 @Volatile protected var queue = StreamBlockingQueue(cacheSize) @@ -41,6 +47,19 @@ abstract class BaseSender( @Volatile protected var bytesSendPerSecond = 0L + /** + * Lifecycle callback invoked after the sender's dispatch thread has fully consumed a + * [MediaFrame]. Register a [FrameLifecycleListener] to receive notifications when the + * frame's [MediaFrame.data] buffer may safely be returned to a pool. + * + * Called from the sender's IO coroutine; implementations must not block. + */ + var frameLifecycleListener: FrameLifecycleListener? = null + + // Rate-limit for queue-overflow signals: at most one per OVERFLOW_SIGNAL_COOLDOWN_MS. + private var lastOverflowSignalMs = 0L + private val OVERFLOW_SIGNAL_COOLDOWN_MS = 1_500L + abstract fun setVideoInfo(sps: ByteBuffer, pps: ByteBuffer?, vps: ByteBuffer?) abstract fun setAudioInfo(sampleRate: Int, isStereo: Boolean) protected abstract suspend fun onRun() @@ -50,20 +69,43 @@ abstract class BaseSender( if (running && !queue.trySend(mediaFrame)) { when (mediaFrame.type) { MediaFrame.Type.VIDEO -> { - Log.i(TAG, "Video frame discarded") + Log.i(TAG, "Video frame discarded (queue full)") droppedVideoFrames++ } MediaFrame.Type.AUDIO -> { - Log.i(TAG, "Audio frame discarded") + Log.i(TAG, "Audio frame discarded (queue full)") droppedAudioFrames++ } } + // Emit typed queue-overflow event with rate limiting to avoid flooding callers. + val now = System.currentTimeMillis() + if (now - lastOverflowSignalMs >= OVERFLOW_SIGNAL_COOLDOWN_MS) { + lastOverflowSignalMs = now + val snapshot = QueueSnapshot(capacity = cacheSize, items = queue.getSize()) + val event = TransportEvent.QueueOverflow( + frameType = mediaFrame.type, + droppedTotal = if (mediaFrame.type == MediaFrame.Type.VIDEO) droppedVideoFrames else droppedAudioFrames, + queueCapacity = snapshot.capacity, + queueSize = snapshot.items, + ) + scope.launch { onMainThread { connectChecker.onTransportEvent(event) } } + } } } + /** + * Called by subclass [onRun] implementations after a [MediaFrame] has been fully + * dispatched to the network socket. Notifies [frameLifecycleListener] so callers can + * recycle the frame's buffer. + */ + protected fun notifyFrameConsumed(frame: MediaFrame) { + frameLifecycleListener?.onFrameConsumed(frame) + } + fun start() { bitrateManager.reset() queue.clear() + lastOverflowSignalMs = 0L running = true job = scope.launch { val bitrateTask = async { @@ -107,12 +149,26 @@ abstract class BaseSender( val tempQueue = StreamBlockingQueue(newSize) queue.drainTo(tempQueue) queue = tempQueue + cacheSize = newSize // keep getCacheSize() in sync with the new queue capacity } + /** + * Returns the current maximum capacity of the sender frame queue. + * Reflects the value passed to the most recent [resizeCache] call. + */ fun getCacheSize(): Int = cacheSize fun getItemsInCache(): Int = queue.getSize() + /** + * Returns a point-in-time snapshot of the sender queue state. + * Thread-safe; values are read atomically from the backing [StreamBlockingQueue]. + */ + fun getQueueSnapshot(): QueueSnapshot = QueueSnapshot( + capacity = cacheSize, + items = queue.getSize(), + ) + fun clearCache() { queue.clear() } From 45a44171c62a88a18f4fc015e4b1c93da646ceb9 Mon Sep 17 00:00:00 2001 From: pird32 Date: Thu, 16 Apr 2026 17:52:00 +0300 Subject: [PATCH 2/3] feat(senders): notifyFrameConsumed + NetworkSendError typed event in all senders All four protocol senders (RtmpSender, SrtSender, RtspSender, UdpSender) now: - Call notifyFrameConsumed(mediaFrame) after the dispatch loop finishes processing each frame (after network write). Wired to frameLifecycleListener so buffer pools can release slots at the correct point in the lifecycle. - Emit ConnectChecker.onTransportEvent(NetworkSendError) on send errors in addition to the existing onConnectionFailed(String) call (backward compatible). Made-with: Cursor --- rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpSender.kt | 8 ++++++++ rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspSender.kt | 8 ++++++++ srt/src/main/java/com/pedro/srt/srt/SrtSender.kt | 8 ++++++++ udp/src/main/java/com/pedro/udp/UdpSender.kt | 8 ++++++++ 4 files changed, 32 insertions(+) diff --git a/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpSender.kt b/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpSender.kt index c64513ca64..42b428cb2b 100644 --- a/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpSender.kt +++ b/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpSender.kt @@ -100,10 +100,18 @@ class RtmpSender( bytesSend += size bytesSendPerSecond += size } + // Notify lifecycle listener: frame fully consumed, buffer may be recycled. + notifyFrameConsumed(mediaFrame) }.exceptionOrNull() if (error != null) { onMainThread { connectChecker.onConnectionFailed("Error send packet, ${error.validMessage()}") + connectChecker.onTransportEvent( + com.pedro.common.TransportEvent.NetworkSendError( + message = "Error send packet, ${error.validMessage()}", + cause = error, + ) + ) } Log.e(TAG, "send error: ", error) running = false diff --git a/rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspSender.kt b/rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspSender.kt index dac9bfd860..d8b0ccd1c8 100644 --- a/rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspSender.kt +++ b/rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspSender.kt @@ -136,10 +136,18 @@ class RtspSender( Log.i(TAG, "wrote $type packet, size $size") } } + // Notify lifecycle listener: frame fully consumed, buffer may be recycled. + notifyFrameConsumed(mediaFrame) }.exceptionOrNull() if (error != null) { onMainThread { connectChecker.onConnectionFailed("Error send packet, ${error.validMessage()}") + connectChecker.onTransportEvent( + com.pedro.common.TransportEvent.NetworkSendError( + message = "Error send packet, ${error.validMessage()}", + cause = error, + ) + ) } Log.e(TAG, "send error: ", error) running = false diff --git a/srt/src/main/java/com/pedro/srt/srt/SrtSender.kt b/srt/src/main/java/com/pedro/srt/srt/SrtSender.kt index 67a2ce64e5..1f9e11d4ec 100644 --- a/srt/src/main/java/com/pedro/srt/srt/SrtSender.kt +++ b/srt/src/main/java/com/pedro/srt/srt/SrtSender.kt @@ -130,10 +130,18 @@ class SrtSender( bytesSend += bytesPsi + bytes bytesSendPerSecond += bytesPsi + bytes } + // Notify lifecycle listener: frame fully consumed, buffer may be recycled. + notifyFrameConsumed(mediaFrame) }.exceptionOrNull() if (error != null) { onMainThread { connectChecker.onConnectionFailed("Error send packet, ${error.validMessage()}") + connectChecker.onTransportEvent( + com.pedro.common.TransportEvent.NetworkSendError( + message = "Error send packet, ${error.validMessage()}", + cause = error, + ) + ) } Log.e(TAG, "send error: ", error) running = false diff --git a/udp/src/main/java/com/pedro/udp/UdpSender.kt b/udp/src/main/java/com/pedro/udp/UdpSender.kt index 619614161c..247068a5ce 100644 --- a/udp/src/main/java/com/pedro/udp/UdpSender.kt +++ b/udp/src/main/java/com/pedro/udp/UdpSender.kt @@ -112,10 +112,18 @@ class UdpSender( bytesSend += bytesPsi + bytes bytesSendPerSecond += bytesPsi + bytes } + // Notify lifecycle listener: frame fully consumed, buffer may be recycled. + notifyFrameConsumed(mediaFrame) }.exceptionOrNull() if (error != null) { onMainThread { connectChecker.onConnectionFailed("Error send packet, ${error.validMessage()}") + connectChecker.onTransportEvent( + com.pedro.common.TransportEvent.NetworkSendError( + message = "Error send packet, ${error.validMessage()}", + cause = error, + ) + ) } Log.e(TAG, "send error: ", error) running = false From 99c4fc11ffbafeb50d062e2e62b82f1789eb4452 Mon Sep 17 00:00:00 2001 From: pedroSG94 Date: Sun, 19 Apr 2026 23:28:15 +0200 Subject: [PATCH 3/3] move queue overflow event to async task --- .../java/com/pedro/common/TransportEvent.kt | 10 +++--- .../java/com/pedro/common/base/BaseSender.kt | 32 ++++++++----------- 2 files changed, 17 insertions(+), 25 deletions(-) diff --git a/common/src/main/java/com/pedro/common/TransportEvent.kt b/common/src/main/java/com/pedro/common/TransportEvent.kt index 48ccc33a72..fbcb2d9037 100644 --- a/common/src/main/java/com/pedro/common/TransportEvent.kt +++ b/common/src/main/java/com/pedro/common/TransportEvent.kt @@ -16,8 +16,6 @@ package com.pedro.common -import com.pedro.common.frame.MediaFrame - /** * Typed transport events emitted by [com.pedro.common.base.BaseSender]. * @@ -36,14 +34,14 @@ sealed class TransportEvent { * cannot consume frames fast enough. Callers should reduce frame rate or bitrate, * request a keyframe, or clear the queue — but NOT reconnect. * - * @param frameType Type of the dropped frame (VIDEO or AUDIO). - * @param droppedTotal Total dropped frames of this type since sender start. + * @param droppedVideo Total dropped video frames of this type since sender start. + * @param droppedAudio Total dropped audio frames of this type since sender start. * @param queueCapacity Maximum number of frames the queue can hold. * @param queueSize Number of frames currently in the queue. */ data class QueueOverflow( - val frameType: MediaFrame.Type, - val droppedTotal: Long, + val droppedVideo: Long, + val droppedAudio: Long, val queueCapacity: Int, val queueSize: Int, ) : TransportEvent() diff --git a/common/src/main/java/com/pedro/common/base/BaseSender.kt b/common/src/main/java/com/pedro/common/base/BaseSender.kt index 722a986cc7..5d98ac4374 100644 --- a/common/src/main/java/com/pedro/common/base/BaseSender.kt +++ b/common/src/main/java/com/pedro/common/base/BaseSender.kt @@ -56,10 +56,6 @@ abstract class BaseSender( */ var frameLifecycleListener: FrameLifecycleListener? = null - // Rate-limit for queue-overflow signals: at most one per OVERFLOW_SIGNAL_COOLDOWN_MS. - private var lastOverflowSignalMs = 0L - private val OVERFLOW_SIGNAL_COOLDOWN_MS = 1_500L - abstract fun setVideoInfo(sps: ByteBuffer, pps: ByteBuffer?, vps: ByteBuffer?) abstract fun setAudioInfo(sampleRate: Int, isStereo: Boolean) protected abstract suspend fun onRun() @@ -77,19 +73,6 @@ abstract class BaseSender( droppedAudioFrames++ } } - // Emit typed queue-overflow event with rate limiting to avoid flooding callers. - val now = System.currentTimeMillis() - if (now - lastOverflowSignalMs >= OVERFLOW_SIGNAL_COOLDOWN_MS) { - lastOverflowSignalMs = now - val snapshot = QueueSnapshot(capacity = cacheSize, items = queue.getSize()) - val event = TransportEvent.QueueOverflow( - frameType = mediaFrame.type, - droppedTotal = if (mediaFrame.type == MediaFrame.Type.VIDEO) droppedVideoFrames else droppedAudioFrames, - queueCapacity = snapshot.capacity, - queueSize = snapshot.items, - ) - scope.launch { onMainThread { connectChecker.onTransportEvent(event) } } - } } } @@ -105,7 +88,6 @@ abstract class BaseSender( fun start() { bitrateManager.reset() queue.clear() - lastOverflowSignalMs = 0L running = true job = scope.launch { val bitrateTask = async { @@ -116,6 +98,18 @@ abstract class BaseSender( delay(timeMillis = 1000) } } + val frameEventTask = async { + while (scope.isActive && running) { + val event = TransportEvent.QueueOverflow( + droppedVideo = droppedVideoFrames, + droppedAudio = droppedAudioFrames, + queueCapacity = cacheSize, + queueSize = queue.getSize(), + ) + onMainThread { connectChecker.onTransportEvent(event) } + delay(timeMillis = 1500) + } + } onRun() } } @@ -135,7 +129,7 @@ abstract class BaseSender( @Throws(IllegalArgumentException::class) fun hasCongestion(percentUsed: Float = 20f): Boolean { - if (percentUsed < 0 || percentUsed > 100) throw IllegalArgumentException("the value must be in range 0 to 100") + if (percentUsed !in 0.0..100.0) throw IllegalArgumentException("the value must be in range 0 to 100") val size = queue.getSize().toFloat() val remaining = queue.remainingCapacity().toFloat() val capacity = size + remaining