diff --git a/common/src/main/java/com/pedro/common/ConnectChecker.kt b/common/src/main/java/com/pedro/common/ConnectChecker.kt index 7745d2f4d..ba800975c 100644 --- a/common/src/main/java/com/pedro/common/ConnectChecker.kt +++ b/common/src/main/java/com/pedro/common/ConnectChecker.kt @@ -26,4 +26,20 @@ interface ConnectChecker: BitrateChecker { fun onDisconnect() fun onAuthError() fun onAuthSuccess() + + /** + * Typed transport event from the sender layer. + * + * Provides machine-readable signal types so callers can differentiate between + * queue-pressure events ([TransportEvent.QueueOverflow]) and network errors + * ([TransportEvent.NetworkSendError]). + * + * Default implementation is a no-op so existing [ConnectChecker] implementors + * are not required to override this method. A [TransportEvent.NetworkSendError] is + * always accompanied by a corresponding [onConnectionFailed] call for backward + * compatibility. + * + * @param event Typed transport event; use exhaustive `when` for dispatch. + */ + fun onTransportEvent(event: TransportEvent) {} } \ No newline at end of file diff --git a/common/src/main/java/com/pedro/common/FrameLifecycleListener.kt b/common/src/main/java/com/pedro/common/FrameLifecycleListener.kt new file mode 100644 index 000000000..33d7b142f --- /dev/null +++ b/common/src/main/java/com/pedro/common/FrameLifecycleListener.kt @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2024 pedroSG94. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.pedro.common + +import com.pedro.common.frame.MediaFrame + +/** + * Lifecycle callback for frames sent through a [com.pedro.common.base.BaseSender]. + * + * The callback is invoked from the **sender's dispatch thread** (IO coroutine) after the + * sender has fully consumed a [MediaFrame] — i.e., after the frame data has been encoded + * into protocol packets and written to the network socket. At that point the [MediaFrame.data] + * buffer is no longer referenced by the sender and may be safely returned to a buffer pool. + * + * ## Thread guarantee + * [onFrameConsumed] is called on the sender's internal coroutine dispatcher (typically + * [kotlinx.coroutines.Dispatchers.IO]). Implementations must not block this thread. + * + * ## Usage with FramePayloadPool (RTMP pooled-copy) + * ```kotlin + * rtmpSender.frameLifecycleListener = FrameLifecycleListener { frame -> + * (frame.data as? PooledBuffer)?.release() + * } + * ``` + */ +fun interface FrameLifecycleListener { + + /** + * Called after [frame] has been fully consumed by the sender. + * + * The sender will not access [frame] or [MediaFrame.data] after this call. + * The implementation may safely release, recycle, or pool the underlying buffer. + * + * @param frame The frame that was consumed. Same instance that was passed to + * [com.pedro.common.base.BaseSender.sendMediaFrame]. + */ + fun onFrameConsumed(frame: MediaFrame) +} diff --git a/common/src/main/java/com/pedro/common/QueueSnapshot.kt b/common/src/main/java/com/pedro/common/QueueSnapshot.kt new file mode 100644 index 000000000..3e71c49b3 --- /dev/null +++ b/common/src/main/java/com/pedro/common/QueueSnapshot.kt @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2024 pedroSG94. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.pedro.common + +/** + * Point-in-time snapshot of a sender's internal frame queue. + * + * Returned by [com.pedro.common.base.BaseSender.getQueueSnapshot]. + * Safe to pass across threads; all fields are immutable. + * + * @param capacity Maximum number of frames the queue can hold. + * @param items Number of frames currently queued. + * @param softThresholdPercent Usage % at which congestion enters soft state (default 70). + * @param hardThresholdPercent Usage % at which congestion enters hard state (default 85). + */ +data class QueueSnapshot( + val capacity: Int, + val items: Int, + val softThresholdPercent: Float = 70f, + val hardThresholdPercent: Float = 85f, +) { + /** Usage in [0.0, 1.0]. 0.0 when capacity is zero. */ + val usageRatio: Double + get() = if (capacity > 0) items.toDouble() / capacity else 0.0 + + /** Human-readable summary for log lines. */ + fun summary(): String = "$items/$capacity (${"%.0f".format(usageRatio * 100)}%)" +} diff --git a/common/src/main/java/com/pedro/common/TransportEvent.kt b/common/src/main/java/com/pedro/common/TransportEvent.kt new file mode 100644 index 000000000..fbcb2d903 --- /dev/null +++ b/common/src/main/java/com/pedro/common/TransportEvent.kt @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2024 pedroSG94. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.pedro.common + +/** + * Typed transport events emitted by [com.pedro.common.base.BaseSender]. + * + * These supplement [ConnectChecker.onConnectionFailed] with machine-readable signal types, + * allowing callers to differentiate between queue-pressure events (which do not require a + * network reconnect) and actual network send failures (which do). + * + * Designed as a sealed class so all variants are exhaustive-checked by callers. + */ +sealed class TransportEvent { + + /** + * The sender's internal frame queue has reached capacity and rejected a frame. + * + * This does not indicate a network failure. The connection is healthy; the sender + * cannot consume frames fast enough. Callers should reduce frame rate or bitrate, + * request a keyframe, or clear the queue — but NOT reconnect. + * + * @param droppedVideo Total dropped video frames of this type since sender start. + * @param droppedAudio Total dropped audio frames of this type since sender start. + * @param queueCapacity Maximum number of frames the queue can hold. + * @param queueSize Number of frames currently in the queue. + */ + data class QueueOverflow( + val droppedVideo: Long, + val droppedAudio: Long, + val queueCapacity: Int, + val queueSize: Int, + ) : TransportEvent() + + /** + * A network-level send error occurred inside the sender dispatch loop. + * + * This is equivalent to the existing [ConnectChecker.onConnectionFailed] call and + * is emitted in addition to (not instead of) that callback for typed handling. + * + * @param message Human-readable error description. + * @param cause Original exception, if available. + */ + data class NetworkSendError( + val message: String, + val cause: Throwable? = null, + ) : TransportEvent() +} diff --git a/common/src/main/java/com/pedro/common/base/BaseSender.kt b/common/src/main/java/com/pedro/common/base/BaseSender.kt index 154eab3e4..5d98ac437 100644 --- a/common/src/main/java/com/pedro/common/base/BaseSender.kt +++ b/common/src/main/java/com/pedro/common/base/BaseSender.kt @@ -3,8 +3,12 @@ package com.pedro.common.base import android.util.Log import com.pedro.common.BitrateManager import com.pedro.common.ConnectChecker +import com.pedro.common.FrameLifecycleListener +import com.pedro.common.QueueSnapshot import com.pedro.common.StreamBlockingQueue +import com.pedro.common.TransportEvent import com.pedro.common.frame.MediaFrame +import com.pedro.common.onMainThread import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job @@ -22,6 +26,8 @@ abstract class BaseSender( @Volatile protected var running = false + // Tracks actual queue capacity; updated when resizeCache() replaces the queue. + // getCacheSize() reads this field — it must stay in sync with the live queue object. private var cacheSize = 400 @Volatile protected var queue = StreamBlockingQueue(cacheSize) @@ -41,6 +47,15 @@ abstract class BaseSender( @Volatile protected var bytesSendPerSecond = 0L + /** + * Lifecycle callback invoked after the sender's dispatch thread has fully consumed a + * [MediaFrame]. Register a [FrameLifecycleListener] to receive notifications when the + * frame's [MediaFrame.data] buffer may safely be returned to a pool. + * + * Called from the sender's IO coroutine; implementations must not block. + */ + var frameLifecycleListener: FrameLifecycleListener? = null + abstract fun setVideoInfo(sps: ByteBuffer, pps: ByteBuffer?, vps: ByteBuffer?) abstract fun setAudioInfo(sampleRate: Int, isStereo: Boolean) protected abstract suspend fun onRun() @@ -50,17 +65,26 @@ abstract class BaseSender( if (running && !queue.trySend(mediaFrame)) { when (mediaFrame.type) { MediaFrame.Type.VIDEO -> { - Log.i(TAG, "Video frame discarded") + Log.i(TAG, "Video frame discarded (queue full)") droppedVideoFrames++ } MediaFrame.Type.AUDIO -> { - Log.i(TAG, "Audio frame discarded") + Log.i(TAG, "Audio frame discarded (queue full)") droppedAudioFrames++ } } } } + /** + * Called by subclass [onRun] implementations after a [MediaFrame] has been fully + * dispatched to the network socket. Notifies [frameLifecycleListener] so callers can + * recycle the frame's buffer. + */ + protected fun notifyFrameConsumed(frame: MediaFrame) { + frameLifecycleListener?.onFrameConsumed(frame) + } + fun start() { bitrateManager.reset() queue.clear() @@ -74,6 +98,18 @@ abstract class BaseSender( delay(timeMillis = 1000) } } + val frameEventTask = async { + while (scope.isActive && running) { + val event = TransportEvent.QueueOverflow( + droppedVideo = droppedVideoFrames, + droppedAudio = droppedAudioFrames, + queueCapacity = cacheSize, + queueSize = queue.getSize(), + ) + onMainThread { connectChecker.onTransportEvent(event) } + delay(timeMillis = 1500) + } + } onRun() } } @@ -93,7 +129,7 @@ abstract class BaseSender( @Throws(IllegalArgumentException::class) fun hasCongestion(percentUsed: Float = 20f): Boolean { - if (percentUsed < 0 || percentUsed > 100) throw IllegalArgumentException("the value must be in range 0 to 100") + if (percentUsed !in 0.0..100.0) throw IllegalArgumentException("the value must be in range 0 to 100") val size = queue.getSize().toFloat() val remaining = queue.remainingCapacity().toFloat() val capacity = size + remaining @@ -107,12 +143,26 @@ abstract class BaseSender( val tempQueue = StreamBlockingQueue(newSize) queue.drainTo(tempQueue) queue = tempQueue + cacheSize = newSize // keep getCacheSize() in sync with the new queue capacity } + /** + * Returns the current maximum capacity of the sender frame queue. + * Reflects the value passed to the most recent [resizeCache] call. + */ fun getCacheSize(): Int = cacheSize fun getItemsInCache(): Int = queue.getSize() + /** + * Returns a point-in-time snapshot of the sender queue state. + * Thread-safe; values are read atomically from the backing [StreamBlockingQueue]. + */ + fun getQueueSnapshot(): QueueSnapshot = QueueSnapshot( + capacity = cacheSize, + items = queue.getSize(), + ) + fun clearCache() { queue.clear() } diff --git a/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpSender.kt b/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpSender.kt index c64513ca6..42b428cb2 100644 --- a/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpSender.kt +++ b/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpSender.kt @@ -100,10 +100,18 @@ class RtmpSender( bytesSend += size bytesSendPerSecond += size } + // Notify lifecycle listener: frame fully consumed, buffer may be recycled. + notifyFrameConsumed(mediaFrame) }.exceptionOrNull() if (error != null) { onMainThread { connectChecker.onConnectionFailed("Error send packet, ${error.validMessage()}") + connectChecker.onTransportEvent( + com.pedro.common.TransportEvent.NetworkSendError( + message = "Error send packet, ${error.validMessage()}", + cause = error, + ) + ) } Log.e(TAG, "send error: ", error) running = false diff --git a/rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspSender.kt b/rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspSender.kt index dac9bfd86..d8b0ccd1c 100644 --- a/rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspSender.kt +++ b/rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspSender.kt @@ -136,10 +136,18 @@ class RtspSender( Log.i(TAG, "wrote $type packet, size $size") } } + // Notify lifecycle listener: frame fully consumed, buffer may be recycled. + notifyFrameConsumed(mediaFrame) }.exceptionOrNull() if (error != null) { onMainThread { connectChecker.onConnectionFailed("Error send packet, ${error.validMessage()}") + connectChecker.onTransportEvent( + com.pedro.common.TransportEvent.NetworkSendError( + message = "Error send packet, ${error.validMessage()}", + cause = error, + ) + ) } Log.e(TAG, "send error: ", error) running = false diff --git a/srt/src/main/java/com/pedro/srt/srt/SrtSender.kt b/srt/src/main/java/com/pedro/srt/srt/SrtSender.kt index 67a2ce64e..1f9e11d4e 100644 --- a/srt/src/main/java/com/pedro/srt/srt/SrtSender.kt +++ b/srt/src/main/java/com/pedro/srt/srt/SrtSender.kt @@ -130,10 +130,18 @@ class SrtSender( bytesSend += bytesPsi + bytes bytesSendPerSecond += bytesPsi + bytes } + // Notify lifecycle listener: frame fully consumed, buffer may be recycled. + notifyFrameConsumed(mediaFrame) }.exceptionOrNull() if (error != null) { onMainThread { connectChecker.onConnectionFailed("Error send packet, ${error.validMessage()}") + connectChecker.onTransportEvent( + com.pedro.common.TransportEvent.NetworkSendError( + message = "Error send packet, ${error.validMessage()}", + cause = error, + ) + ) } Log.e(TAG, "send error: ", error) running = false diff --git a/udp/src/main/java/com/pedro/udp/UdpSender.kt b/udp/src/main/java/com/pedro/udp/UdpSender.kt index 619614161..247068a5c 100644 --- a/udp/src/main/java/com/pedro/udp/UdpSender.kt +++ b/udp/src/main/java/com/pedro/udp/UdpSender.kt @@ -112,10 +112,18 @@ class UdpSender( bytesSend += bytesPsi + bytes bytesSendPerSecond += bytesPsi + bytes } + // Notify lifecycle listener: frame fully consumed, buffer may be recycled. + notifyFrameConsumed(mediaFrame) }.exceptionOrNull() if (error != null) { onMainThread { connectChecker.onConnectionFailed("Error send packet, ${error.validMessage()}") + connectChecker.onTransportEvent( + com.pedro.common.TransportEvent.NetworkSendError( + message = "Error send packet, ${error.validMessage()}", + cause = error, + ) + ) } Log.e(TAG, "send error: ", error) running = false