diff --git a/.changeset/humble-boxes-jump.md b/.changeset/humble-boxes-jump.md new file mode 100644 index 00000000..e3a19bf3 --- /dev/null +++ b/.changeset/humble-boxes-jump.md @@ -0,0 +1,6 @@ +--- +"posthog-android": minor +"posthog": minor +--- + +feat: support session replay minimum recording duration diff --git a/posthog-android/api/posthog-android.api b/posthog-android/api/posthog-android.api index faab4159..fb35dd13 100644 --- a/posthog-android/api/posthog-android.api +++ b/posthog-android/api/posthog-android.api @@ -62,7 +62,7 @@ public final class com/posthog/android/replay/PostHogMaskModifier { public static synthetic fun postHogUnmask$default (Lcom/posthog/android/replay/PostHogMaskModifier;Landroidx/compose/ui/Modifier;ZILjava/lang/Object;)Landroidx/compose/ui/Modifier; } -public final class com/posthog/android/replay/PostHogReplayIntegration : com/posthog/PostHogIntegration, com/posthog/internal/replay/PostHogSessionReplayHandler { +public final class com/posthog/android/replay/PostHogReplayIntegration : com/posthog/PostHogIntegration, com/posthog/android/replay/PostHogReplayBufferDelegate, com/posthog/internal/replay/PostHogSessionReplayHandler { public static final field ANDROID_COMPOSE_VIEW Ljava/lang/String; public static final field ANDROID_COMPOSE_VIEW_CLASS_NAME Ljava/lang/String; public static final field PH_NO_CAPTURE_LABEL Ljava/lang/String; @@ -70,12 +70,22 @@ public final class com/posthog/android/replay/PostHogReplayIntegration : com/pos public fun (Landroid/content/Context;Lcom/posthog/android/PostHogAndroidConfig;Lcom/posthog/android/internal/MainHandler;)V public fun install (Lcom/posthog/PostHogInterface;)V public fun isActive ()Z + public fun isBuffering ()Z public fun onRemoteConfig ()V + public fun onReplayBufferSnapshot (Lcom/posthog/android/replay/PostHogReplayQueue;)V public fun start (Z)V public fun stop ()V public fun uninstall ()V } +public final class com/posthog/android/replay/PostHogReplayQueue : com/posthog/internal/PostHogQueueInterface { + public fun add (Lcom/posthog/PostHogEvent;)V + public fun clear ()V + public fun flush ()V + public fun start ()V + public fun stop ()V +} + public final class com/posthog/android/replay/PostHogSessionReplayConfig { public fun ()V public fun (Z)V diff --git a/posthog-android/src/main/java/com/posthog/android/PostHogAndroidConfig.kt b/posthog-android/src/main/java/com/posthog/android/PostHogAndroidConfig.kt index f2302f5c..f724caa5 100644 --- a/posthog-android/src/main/java/com/posthog/android/PostHogAndroidConfig.kt +++ b/posthog-android/src/main/java/com/posthog/android/PostHogAndroidConfig.kt @@ -1,7 +1,10 @@ package com.posthog.android import com.posthog.PostHogConfig +import com.posthog.android.replay.PostHogReplayQueue import com.posthog.android.replay.PostHogSessionReplayConfig +import com.posthog.internal.PostHogApiEndpoint +import com.posthog.internal.PostHogQueue /** * The SDK Config @@ -19,4 +22,19 @@ public open class PostHogAndroidConfig public var captureDeepLinks: Boolean = true, public var captureScreenViews: Boolean = true, public var sessionReplayConfig: PostHogSessionReplayConfig = PostHogSessionReplayConfig(), - ) : PostHogConfig(apiKey = apiKey, host = host) + ) : PostHogConfig( + apiKey = apiKey, + host = host, + queueProvider = { config, api, endpoint, storagePrefix, executor -> + val defaultQueue = PostHogQueue(config, api, endpoint, storagePrefix, executor) + if (endpoint == PostHogApiEndpoint.SNAPSHOT) { + val replayQueue = PostHogReplayQueue(config, defaultQueue, storagePrefix) + (config as? PostHogAndroidConfig)?.replayQueueHolder = replayQueue + replayQueue + } else { + defaultQueue + } + }, + ) { + internal var replayQueueHolder: PostHogReplayQueue? = null + } diff --git a/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayBufferDelegate.kt b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayBufferDelegate.kt new file mode 100644 index 00000000..e375862e --- /dev/null +++ b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayBufferDelegate.kt @@ -0,0 +1,22 @@ +package com.posthog.android.replay + +/** + * Delegate interface for controlling session replay buffering behavior. + * + * The replay queue is passive: it checks [isBuffering] on every `add()` and `flush()`, + * and notifies the delegate after buffering a snapshot. + */ +internal interface PostHogReplayBufferDelegate { + /** + * Whether the replay queue should buffer snapshots instead of sending directly. + * Checked on every `queue.add()` and `queue.flush()`. + */ + val isBuffering: Boolean + + /** + * Called after a snapshot was added to the buffer. + * The delegate should check threshold conditions and call + * `replayQueue.migrateBufferToQueue()` when the minimum duration has been met. + */ + fun onReplayBufferSnapshot(replayQueue: PostHogReplayQueue) +} diff --git a/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayBufferQueue.kt b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayBufferQueue.kt new file mode 100644 index 00000000..83c7d4e2 --- /dev/null +++ b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayBufferQueue.kt @@ -0,0 +1,173 @@ +package com.posthog.android.replay + +import com.posthog.PostHogConfig +import com.posthog.PostHogEvent +import com.posthog.internal.PostHogQueue +import com.posthog.internal.PostHogQueueInterface +import com.posthog.vendor.uuid.TimeBasedEpochGenerator +import java.io.File +import java.util.UUID + +/** + * A disk-based buffer queue for session replay snapshots. + * + * Uses UUID v7 filenames so timestamps can be extracted from filenames + * for duration calculations. + */ +internal class PostHogReplayBufferQueue( + private val config: PostHogConfig, + private val bufferDir: File, +) { + private val items = mutableListOf() + private val itemsLock = Any() + + val depth: Int + get() = synchronized(itemsLock) { items.size } + + /** + * Returns the time span (in millis) between the oldest and newest buffered items, + * based on the UUID v7 embedded timestamps. + */ + val bufferDurationMs: Long? + get() = + synchronized(itemsLock) { + val oldest = items.firstOrNull() ?: return@synchronized null + val newest = items.lastOrNull() ?: return@synchronized null + val oldestTs = timestampFromUUIDv7(oldest) ?: return@synchronized null + val newestTs = timestampFromUUIDv7(newest) ?: return@synchronized null + maxOf(newestTs - oldestTs, 0) + } + + init { + setup() + } + + private fun setup() { + // Clear any leftover buffer from previous sessions — if they're still here, + // they didn't meet the minimum duration threshold and should be discarded. + deleteDirectorySafely(bufferDir) + + try { + bufferDir.mkdirs() + } catch (e: Throwable) { + config.logger.log("Error trying to create replay buffer folder: $e") + } + + synchronized(itemsLock) { + items.clear() + } + } + + private fun deleteDirectorySafely(dir: File) { + try { + if (dir.exists()) { + dir.deleteRecursively() + } + } catch (e: Throwable) { + config.logger.log("Error deleting replay buffer directory: $e") + } + } + + fun add(event: PostHogEvent) { + try { + val filename = "${TimeBasedEpochGenerator.generate()}.event" + val file = File(bufferDir, filename) + val os = config.encryption?.encrypt(file.outputStream()) ?: file.outputStream() + os.use { output -> + config.serializer.serialize(event, output.writer().buffered()) + } + synchronized(itemsLock) { items.add(filename) } + } catch (e: Throwable) { + config.logger.log("Could not write replay buffer file: $e") + } + } + + /** + * Migrates all buffered items to the target queue. + * + * Migration is supported for [PostHogQueue] targets by moving files on disk + * and reloading the target queue from disk. + * + * Returns the number of events successfully migrated. + */ + fun migrateAllTo(targetQueue: PostHogQueueInterface): Int { + if (targetQueue !is PostHogQueue) { + config.logger.log("Replay buffer migration skipped: target queue is not PostHogQueue") + return 0 + } + + val targetDir = targetQueue.queueDirectory + if (targetDir == null) { + config.logger.log("Replay queue has no disk directory configured. Skipping buffer migration.") + return 0 + } + + val itemsToMigrate: List = + synchronized(itemsLock) { + val copy = items.toList() + items.clear() + copy + } + + try { + targetDir.mkdirs() + } catch (e: Throwable) { + config.logger.log("Error creating replay target queue directory: $e") + } + + var migratedCount = 0 + for (item in itemsToMigrate) { + val sourceFile = File(bufferDir, item) + if (!sourceFile.exists()) { + continue + } + val targetFile = File(targetDir, item) + try { + if (targetFile.exists()) { + sourceFile.delete() + continue + } + + if (sourceFile.renameTo(targetFile)) { + migratedCount++ + } else { + config.logger.log("Failed to move replay buffer item $item") + } + } catch (e: Throwable) { + config.logger.log("Failed to migrate replay buffer item $item: $e") + } + } + + targetQueue.reloadFromDisk() + return migratedCount + } + + /** + * Removes all buffered items from disk and memory. + */ + fun clear() { + setup() + } + + companion object { + /** + * Extracts the millisecond epoch timestamp from a UUID v7 filename. + * + * UUID v7 encodes Unix milliseconds in the first 48 bits. + * The filename format is `.event`. + * + * We parse the UUID and extract millis via `mostSignificantBits ushr 16`. + * + * @return millis since epoch, or null if parsing fails + */ + internal fun timestampFromUUIDv7(filename: String): Long? { + return try { + val uuidString = filename.removeSuffix(".event") + val uuid = UUID.fromString(uuidString) + uuid.mostSignificantBits ushr 16 + } catch (_: Throwable) { + null + } + } + } +} diff --git a/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayIntegration.kt b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayIntegration.kt index ccd3169c..677963a2 100644 --- a/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayIntegration.kt +++ b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayIntegration.kt @@ -99,7 +99,7 @@ public class PostHogReplayIntegration( private val context: Context, private val config: PostHogAndroidConfig, private val mainHandler: MainHandler, -) : PostHogIntegration, PostHogSessionReplayHandler { +) : PostHogIntegration, PostHogSessionReplayHandler, PostHogReplayBufferDelegate { private val decorViews = WeakHashMap() private val passwordInputTypes = @@ -161,6 +161,14 @@ public class PostHogReplayIntegration( get() = (config.sdkName != "posthog-flutter") private var postHog: PostHogInterface? = null + private var replayQueue: PostHogReplayQueue? = null + + // Minimum duration buffering state + private val bufferingLock = Any() + + @Volatile + private var hasPassedMinimumDuration: Boolean = false + private var cachedMinimumDurationMs: Long? = null @Volatile private var isOnDrawnCalled: Boolean = false @@ -383,6 +391,13 @@ public class PostHogReplayIntegration( integrationInstalled = true this.postHog = postHog + // Wire up as buffer delegate for the replay queue + replayQueue = config.replayQueueHolder + replayQueue?.bufferDelegate = this + + // Load cached minimum duration from remote config (if available) + updateCachedMinimumDuration() + // workaround for react native that is started after the window is added // Curtains.rootViews should be empty for normal apps yet Curtains.rootViews.forEach { view -> @@ -400,6 +415,11 @@ public class PostHogReplayIntegration( try { integrationInstalled = false this.postHog = null + + // Clear buffer delegate + replayQueue?.bufferDelegate = null + replayQueue = null + Curtains.onRootViewsChangedListeners -= onRootViewsChangedListener decorViews.entries.forEach { @@ -1587,6 +1607,8 @@ public class PostHogReplayIntegration( override fun start(resumeCurrent: Boolean) { if (!resumeCurrent) { clearSnapshotStates() + // Reset minimum duration buffering state for the new session + resetBufferingState() } isSessionReplayActive = true @@ -1608,6 +1630,71 @@ public class PostHogReplayIntegration( return isSessionReplayActive } + // MARK: - PostHogReplayBufferDelegate + + override val isBuffering: Boolean + get() { + synchronized(bufferingLock) { + val minimumDuration = cachedMinimumDurationMs + if (minimumDuration == null || minimumDuration <= 0) { + return false + } + return !hasPassedMinimumDuration + } + } + + override fun onReplayBufferSnapshot(replayQueue: PostHogReplayQueue) { + val minimumDurationMs: Long? = synchronized(bufferingLock) { cachedMinimumDurationMs } + if (minimumDurationMs == null || minimumDurationMs <= 0) { + // No minimum duration configured: should not be buffering, migrate immediately. + synchronized(bufferingLock) { hasPassedMinimumDuration = true } + replayQueue.migrateBufferToQueue() + return + } + + // Check buffer content duration (oldest to newest snapshot) + val bufferDurationMs = replayQueue.bufferDurationMs ?: 0 + + // Keep buffered snapshots intact until threshold is reached. + // Session replay payloads may include metadata snapshots required by the player, + // so buffering follows an all-or-nothing migration strategy. + if (bufferDurationMs >= minimumDurationMs) { + config.logger.log( + "[Session Replay] Minimum duration met. Migrating ${replayQueue.bufferDepth} buffered events to replay queue.", + ) + // Flip state before migration so new snapshots don't keep entering the buffer during long-running migrations. + synchronized(bufferingLock) { hasPassedMinimumDuration = true } + replayQueue.migrateBufferToQueue() + } + } + + // MARK: - Remote Config + + override fun onRemoteConfig() { + updateCachedMinimumDuration() + } + + private fun updateCachedMinimumDuration() { + val minimumDuration = config.remoteConfigHolder?.getRecordingMinimumDurationMs() + synchronized(bufferingLock) { + cachedMinimumDurationMs = minimumDuration + } + } + + // MARK: - Buffering State + + /** + * Resets buffering state for a new session — clears the buffer and marks + * as not yet passed minimum duration. + */ + private fun resetBufferingState() { + synchronized(bufferingLock) { + hasPassedMinimumDuration = false + } + // Clear any buffered events from previous session + replayQueue?.clearBuffer() + } + internal companion object { const val PH_NO_CAPTURE_LABEL: String = "ph-no-capture" const val PH_NO_MASK_LABEL: String = "ph-no-mask" diff --git a/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayQueue.kt b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayQueue.kt new file mode 100644 index 00000000..55c0e084 --- /dev/null +++ b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayQueue.kt @@ -0,0 +1,108 @@ +package com.posthog.android.replay + +import com.posthog.PostHogConfig +import com.posthog.PostHogEvent +import com.posthog.internal.PostHogQueueInterface +import java.io.File + +/** + * A replay queue that wraps an inner queue (for actual API sends) and a + * [PostHogReplayBufferQueue] (for buffering snapshots until minimum session + * duration is met). + * + * The queue is passive — it delegates all buffering decisions to a + * [PostHogReplayBufferDelegate]. When [PostHogReplayBufferDelegate.isBuffering] + * is true, snapshots are routed to the buffer and `flush()` calls are suppressed. + */ +public class PostHogReplayQueue internal constructor( + private val config: PostHogConfig, + private val innerQueue: PostHogQueueInterface, + replayStoragePrefix: String?, +) : PostHogQueueInterface { + private val replayDir = replayStoragePrefix?.let { File(it, config.apiKey) } + + private val bufferQueue: PostHogReplayBufferQueue = + PostHogReplayBufferQueue( + config, + if (replayStoragePrefix != null) { + File("$replayStoragePrefix-buffer", config.apiKey) + } else { + File(System.getProperty("java.io.tmpdir"), "posthog-replay-buffer/${config.apiKey}") + }, + ) + + internal var bufferDelegate: PostHogReplayBufferDelegate? = null + + /** + * The time span (in millis) of buffered snapshots (oldest to newest). + */ + internal val bufferDurationMs: Long? + get() = bufferQueue.bufferDurationMs + + /** + * Number of events currently in the buffer. + */ + internal val bufferDepth: Int + get() = bufferQueue.depth + + /** + * Approximate number of events currently in the inner replay queue directory. + */ + internal val depth: Int + get() = replayDir?.listFiles()?.size ?: 0 + + override fun add(event: PostHogEvent) { + if (bufferDelegate?.isBuffering == true) { + bufferQueue.add(event) + config.logger.log("Buffered replay event '${event.event}'. Buffer depth: ${bufferQueue.depth}") + bufferDelegate?.onReplayBufferSnapshot(this) + } else { + innerQueue.add(event) + } + } + + override fun flush() { + if (bufferDelegate?.isBuffering == true) { + config.logger.log("Replay queue flush suppressed — still buffering") + return + } + innerQueue.flush() + } + + override fun start() { + innerQueue.start() + } + + override fun stop() { + innerQueue.stop() + } + + override fun clear() { + innerQueue.clear() + bufferQueue.clear() + } + + /** + * Migrates all buffered items to the inner replay queue. + */ + internal fun migrateBufferToQueue() { + val bufferedCount = bufferQueue.depth + if (bufferedCount == 0) { + config.logger.log("No buffered replay events to migrate") + return + } + + val migrated = bufferQueue.migrateAllTo(innerQueue) + config.logger.log( + "Migrated $migrated/$bufferedCount buffered replay events to replay queue.", + ) + } + + /** + * Discards all buffered replay events. + */ + internal fun clearBuffer() { + bufferQueue.clear() + config.logger.log("Replay buffer cleared") + } +} diff --git a/posthog-android/src/test/java/com/posthog/android/replay/PostHogReplayBufferQueueTest.kt b/posthog-android/src/test/java/com/posthog/android/replay/PostHogReplayBufferQueueTest.kt new file mode 100644 index 00000000..abf9247e --- /dev/null +++ b/posthog-android/src/test/java/com/posthog/android/replay/PostHogReplayBufferQueueTest.kt @@ -0,0 +1,161 @@ +package com.posthog.android.replay + +import com.posthog.PostHogConfig +import com.posthog.PostHogEvent +import com.posthog.android.API_KEY +import com.posthog.internal.PostHogQueueInterface +import org.junit.Rule +import org.junit.rules.TemporaryFolder +import java.io.File +import java.util.UUID +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertNotNull +import kotlin.test.assertNull +import kotlin.test.assertTrue + +internal class PostHogReplayBufferQueueTest { + @get:Rule + val tmpDir = TemporaryFolder() + + private class FakeQueue : PostHogQueueInterface { + val events = mutableListOf() + + override fun add(event: PostHogEvent) { + events.add(event) + } + + override fun flush() { + } + + override fun start() { + } + + override fun stop() { + } + + override fun clear() { + events.clear() + } + } + + private fun createQueue(bufferDir: File? = null): PostHogReplayBufferQueue { + val config = PostHogConfig(API_KEY) + val dir = bufferDir ?: File(tmpDir.newFolder(), "buffer") + return PostHogReplayBufferQueue(config, dir) + } + + private fun createEvent(name: String): PostHogEvent { + return PostHogEvent( + event = name, + distinctId = "test-user", + properties = mutableMapOf("name" to name), + uuid = UUID.randomUUID(), + ) + } + + @Test + fun `add increases depth`() { + val queue = createQueue() + + assertEquals(0, queue.depth) + + queue.add(createEvent("item1")) + assertEquals(1, queue.depth) + + queue.add(createEvent("item2")) + assertEquals(2, queue.depth) + } + + @Test + fun `clear resets depth to zero`() { + val queue = createQueue() + + queue.add(createEvent("item1")) + queue.add(createEvent("item2")) + assertEquals(2, queue.depth) + + queue.clear() + assertEquals(0, queue.depth) + } + + @Test + fun `init clears leftover buffer from previous session`() { + val bufferDir = File(tmpDir.newFolder(), "buffer") + bufferDir.mkdirs() + + File(bufferDir, "leftover.event").writeText("old data") + assertTrue(bufferDir.listFiles()!!.isNotEmpty()) + + val queue = createQueue(bufferDir) + assertEquals(0, queue.depth) + assertTrue(bufferDir.exists()) + assertTrue(bufferDir.listFiles()!!.isEmpty()) + } + + @Test + fun `bufferDurationMs returns null for empty queue`() { + val queue = createQueue() + assertNull(queue.bufferDurationMs) + } + + @Test + fun `bufferDurationMs returns zero for single item`() { + val queue = createQueue() + queue.add(createEvent("only")) + assertEquals(0L, queue.bufferDurationMs) + } + + @Test + fun `bufferDurationMs increases as items are added over time`() { + val queue = createQueue() + + queue.add(createEvent("first")) + val duration1 = queue.bufferDurationMs ?: 0 + + Thread.sleep(50) + queue.add(createEvent("second")) + val duration2 = queue.bufferDurationMs ?: 0 + + assertTrue(duration2 > duration1) + } + + @Test + fun `migrateAllTo noops when target queue is not PostHogQueue`() { + val queue = createQueue() + val targetQueue = FakeQueue() + + queue.add(createEvent("item1")) + val migrated = queue.migrateAllTo(targetQueue) + + assertEquals(0, migrated) + assertEquals(1, queue.depth) + assertEquals(0, targetQueue.events.size) + } + + @Test + fun `migrateAllTo noops for non PostHogQueue target when empty`() { + val queue = createQueue() + val targetQueue = FakeQueue() + + val migrated = queue.migrateAllTo(targetQueue) + + assertEquals(0, migrated) + assertEquals(0, queue.depth) + assertEquals(0, targetQueue.events.size) + } + + @Test + fun `timestampFromUUIDv7 extracts correct milliseconds`() { + val filename = "019711e4-7c00-7000-8000-000000000000.event" + val ts = PostHogReplayBufferQueue.timestampFromUUIDv7(filename) + assertNotNull(ts) + assertEquals(1748351876096L, ts) + } + + @Test + fun `timestampFromUUIDv7 returns null for invalid filename`() { + assertNull(PostHogReplayBufferQueue.timestampFromUUIDv7("not-a-uuid.event")) + assertNull(PostHogReplayBufferQueue.timestampFromUUIDv7("")) + } +} diff --git a/posthog-android/src/test/java/com/posthog/android/replay/PostHogReplayQueueTest.kt b/posthog-android/src/test/java/com/posthog/android/replay/PostHogReplayQueueTest.kt new file mode 100644 index 00000000..ca3b742f --- /dev/null +++ b/posthog-android/src/test/java/com/posthog/android/replay/PostHogReplayQueueTest.kt @@ -0,0 +1,259 @@ +package com.posthog.android.replay + +import com.posthog.PostHogConfig +import com.posthog.PostHogEvent +import com.posthog.android.API_KEY +import com.posthog.internal.PostHogQueueInterface +import org.junit.Rule +import org.junit.rules.TemporaryFolder +import java.util.UUID +import kotlin.test.Test +import kotlin.test.assertEquals + +internal class PostHogReplayQueueTest { + @get:Rule + val tmpDir = TemporaryFolder() + + private class FakeQueue : PostHogQueueInterface { + val events = mutableListOf() + var flushCallCount = 0 + var startCallCount = 0 + var stopCallCount = 0 + + override fun add(event: PostHogEvent) { + events.add(event) + } + + override fun flush() { + flushCallCount++ + } + + override fun start() { + startCallCount++ + } + + override fun stop() { + stopCallCount++ + } + + override fun clear() { + events.clear() + } + } + + private class MockReplayBufferDelegate : PostHogReplayBufferDelegate { + override var isBuffering: Boolean = false + var didBufferSnapshotCallCount: Int = 0 + var lastReplayQueue: PostHogReplayQueue? = null + + override fun onReplayBufferSnapshot(replayQueue: PostHogReplayQueue) { + didBufferSnapshotCallCount++ + lastReplayQueue = replayQueue + } + } + + private fun createFakeQueue(): FakeQueue { + return FakeQueue() + } + + private fun createReplayQueue( + fakeInnerQueue: FakeQueue, + storagePrefix: String = tmpDir.newFolder().absolutePath, + ): PostHogReplayQueue { + val config = PostHogConfig(API_KEY, "http://localhost:9001") + return PostHogReplayQueue(config, fakeInnerQueue, storagePrefix) + } + + private fun createTestEvent(name: String = "test_event"): PostHogEvent { + return PostHogEvent( + event = name, + distinctId = "test-user", + properties = mutableMapOf("test" to "value"), + uuid = UUID.randomUUID(), + ) + } + + @Test + fun `add routes to buffer when delegate isBuffering is true`() { + val fakeInnerQueue = createFakeQueue() + val queue = createReplayQueue(fakeInnerQueue) + val delegate = MockReplayBufferDelegate().apply { isBuffering = true } + queue.bufferDelegate = delegate + + queue.add(createTestEvent("snapshot_1")) + queue.add(createTestEvent("snapshot_2")) + + assertEquals(2, queue.bufferDepth) + assertEquals(0, fakeInnerQueue.events.size) + assertEquals(2, delegate.didBufferSnapshotCallCount) + } + + @Test + fun `add routes to inner queue when delegate isBuffering is false`() { + val fakeInnerQueue = createFakeQueue() + val queue = createReplayQueue(fakeInnerQueue) + val delegate = MockReplayBufferDelegate().apply { isBuffering = false } + queue.bufferDelegate = delegate + + queue.add(createTestEvent("snapshot_1")) + queue.add(createTestEvent("snapshot_2")) + + assertEquals(0, queue.bufferDepth) + assertEquals(2, fakeInnerQueue.events.size) + assertEquals(0, delegate.didBufferSnapshotCallCount) + } + + @Test + fun `flush is suppressed when buffering`() { + val fakeInnerQueue = createFakeQueue() + val queue = createReplayQueue(fakeInnerQueue) + val delegate = MockReplayBufferDelegate().apply { isBuffering = true } + queue.bufferDelegate = delegate + + queue.add(createTestEvent("snapshot_1")) + queue.flush() + + assertEquals(1, queue.bufferDepth) + assertEquals(0, fakeInnerQueue.flushCallCount) + } + + @Test + fun `flush delegates to inner queue when not buffering`() { + val fakeInnerQueue = createFakeQueue() + val queue = createReplayQueue(fakeInnerQueue) + val delegate = MockReplayBufferDelegate().apply { isBuffering = false } + queue.bufferDelegate = delegate + + queue.flush() + + assertEquals(1, fakeInnerQueue.flushCallCount) + } + + @Test + fun `migrateBufferToQueue noops when inner queue is not PostHogQueue`() { + val fakeInnerQueue = createFakeQueue() + val queue = createReplayQueue(fakeInnerQueue) + val delegate = MockReplayBufferDelegate().apply { isBuffering = true } + queue.bufferDelegate = delegate + + queue.add(createTestEvent("snapshot_1")) + queue.add(createTestEvent("snapshot_2")) + queue.add(createTestEvent("snapshot_3")) + + assertEquals(3, queue.bufferDepth) + assertEquals(0, fakeInnerQueue.events.size) + + queue.migrateBufferToQueue() + + assertEquals(3, queue.bufferDepth) + assertEquals(0, fakeInnerQueue.events.size) + } + + @Test + fun `migrateBufferToQueue handles empty buffer gracefully`() { + val fakeInnerQueue = createFakeQueue() + val queue = createReplayQueue(fakeInnerQueue) + + queue.migrateBufferToQueue() + + assertEquals(0, queue.bufferDepth) + assertEquals(0, fakeInnerQueue.events.size) + } + + @Test + fun `clearBuffer discards all buffered events`() { + val fakeInnerQueue = createFakeQueue() + val queue = createReplayQueue(fakeInnerQueue) + val delegate = MockReplayBufferDelegate().apply { isBuffering = true } + queue.bufferDelegate = delegate + + queue.add(createTestEvent("snapshot_1")) + queue.add(createTestEvent("snapshot_2")) + + assertEquals(2, queue.bufferDepth) + + queue.clearBuffer() + + assertEquals(0, queue.bufferDepth) + } + + @Test + fun `clear removes both buffer and inner queue events`() { + val fakeInnerQueue = createFakeQueue() + val queue = createReplayQueue(fakeInnerQueue) + + val delegate = MockReplayBufferDelegate().apply { isBuffering = false } + queue.bufferDelegate = delegate + queue.add(createTestEvent("direct_1")) + assertEquals(1, fakeInnerQueue.events.size) + + delegate.isBuffering = true + queue.add(createTestEvent("buffered_1")) + assertEquals(1, queue.bufferDepth) + + queue.clear() + + assertEquals(0, queue.bufferDepth) + assertEquals(0, fakeInnerQueue.events.size) + } + + @Test + fun `start and stop delegate to inner queue`() { + val fakeInnerQueue = createFakeQueue() + val queue = createReplayQueue(fakeInnerQueue) + + queue.start() + queue.stop() + + assertEquals(1, fakeInnerQueue.startCallCount) + assertEquals(1, fakeInnerQueue.stopCallCount) + } + + @Test + fun `events go directly to inner queue after buffering disabled even if migration noops`() { + val fakeInnerQueue = createFakeQueue() + val queue = createReplayQueue(fakeInnerQueue) + val delegate = MockReplayBufferDelegate().apply { isBuffering = true } + queue.bufferDelegate = delegate + + queue.add(createTestEvent("buffered_1")) + queue.add(createTestEvent("buffered_2")) + assertEquals(2, queue.bufferDepth) + + queue.migrateBufferToQueue() + delegate.isBuffering = false + + queue.add(createTestEvent("direct_1")) + + assertEquals(2, queue.bufferDepth) + assertEquals(1, fakeInnerQueue.events.size) + assertEquals("direct_1", fakeInnerQueue.events.first().event) + } + + @Test + fun `delegate can disable buffering even if migration noops`() { + val innerQueue = createFakeQueue() + val queue = createReplayQueue(innerQueue) + + val delegate = + object : PostHogReplayBufferDelegate { + override var isBuffering: Boolean = true + + override fun onReplayBufferSnapshot(replayQueue: PostHogReplayQueue) { + if (isBuffering && replayQueue.bufferDepth >= 2) { + isBuffering = false + replayQueue.migrateBufferToQueue() + } + } + } + queue.bufferDelegate = delegate + + queue.add(createTestEvent("buffered_1")) + queue.add(createTestEvent("buffered_2")) + queue.add(createTestEvent("direct_after_disable")) + + assertEquals(2, queue.bufferDepth) + assertEquals(1, innerQueue.events.size) + assertEquals("direct_after_disable", innerQueue.events.first().event) + } +} diff --git a/posthog/api/posthog.api b/posthog/api/posthog.api index de5bd199..2db86885 100644 --- a/posthog/api/posthog.api +++ b/posthog/api/posthog.api @@ -817,6 +817,18 @@ public final class com/posthog/internal/PostHogPrintLogger : com/posthog/interna public fun log (Ljava/lang/String;)V } +public final class com/posthog/internal/PostHogQueue : com/posthog/internal/PostHogQueueInterface { + public fun (Lcom/posthog/PostHogConfig;Lcom/posthog/internal/PostHogApi;Lcom/posthog/internal/PostHogApiEndpoint;Ljava/lang/String;Ljava/util/concurrent/ExecutorService;)V + public fun add (Lcom/posthog/PostHogEvent;)V + public fun clear ()V + public fun flush ()V + public final fun getDequeList ()Ljava/util/List; + public final fun getQueueDirectory ()Ljava/io/File; + public final fun reloadFromDisk ()V + public fun start ()V + public fun stop ()V +} + public abstract interface class com/posthog/internal/PostHogQueueInterface { public abstract fun add (Lcom/posthog/PostHogEvent;)V public abstract fun clear ()V @@ -838,6 +850,7 @@ public final class com/posthog/internal/PostHogRemoteConfig : com/posthog/intern public fun getFeatureFlagResult (Ljava/lang/String;Ljava/lang/String;Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;)Lcom/posthog/FeatureFlagResult; public fun getFeatureFlags (Ljava/lang/String;Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;)Ljava/util/Map; public final fun getFlagDetails (Ljava/lang/String;)Lcom/posthog/internal/FeatureFlag; + public final fun getRecordingMinimumDurationMs ()Ljava/lang/Long; public fun getRequestId (Ljava/lang/String;Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;)Ljava/lang/String; public final fun getSessionRecordingSampleRate ()Ljava/lang/Double; public final fun getSurveys ()Ljava/util/List; diff --git a/posthog/src/main/java/com/posthog/internal/PostHogQueue.kt b/posthog/src/main/java/com/posthog/internal/PostHogQueue.kt index c641a4d6..2bd2c80e 100644 --- a/posthog/src/main/java/com/posthog/internal/PostHogQueue.kt +++ b/posthog/src/main/java/com/posthog/internal/PostHogQueue.kt @@ -2,6 +2,7 @@ package com.posthog.internal import com.posthog.PostHogConfig import com.posthog.PostHogEvent +import com.posthog.PostHogInternal import com.posthog.PostHogVisibleForTesting import com.posthog.vendor.uuid.TimeBasedEpochGenerator import java.io.File @@ -24,7 +25,8 @@ private val RETRYABLE_STATUS_CODES = setOf(429, 500, 502, 503, 504) * @property api the API * @property executor the Executor */ -internal class PostHogQueue( +@PostHogInternal +public class PostHogQueue( private val config: PostHogConfig, private val api: PostHogApi, private val endpoint: PostHogApiEndpoint, @@ -54,6 +56,9 @@ internal class PostHogQueue( private val delay: Long get() = (config.flushIntervalSeconds * 1000).toLong() + public val queueDirectory: File? + get() = storagePrefix?.let { File(it, config.apiKey) } + private fun addEventSync(event: PostHogEvent): Boolean { storagePrefix?.let { val dir = File(it, config.apiKey) @@ -407,31 +412,53 @@ internal class PostHogQueue( * with any new events added after SDK start. */ private fun loadCachedEvents() { - storagePrefix?.let { - val dir = File(it, config.apiKey) + val files = loadQueueFilesFromDisk() + if (files.isEmpty()) return - if (!dir.existsSafely(config)) { - return - } + synchronized(dequeLock) { + // prepend cached files before any events already in the deque + // so that older events are sent first + val existingFiles = deque.toList() + deque.clear() + deque.addAll(files) + deque.addAll(existingFiles) + } + config.logger.log("Loaded ${files.size} cached events from disk for $endpoint.") + } - val files = (dir.listFiles() ?: emptyArray()).toMutableList() + private fun loadQueueFilesFromDisk(): List { + val dir = queueDirectory ?: return emptyList() - if (files.isEmpty()) return + if (!dir.existsSafely(config)) { + return emptyList() + } - // sort by last modified date ascending so events are sent in order - files.sortBy { file -> file.lastModified() } + val files = (dir.listFiles() ?: emptyArray()).toMutableList() + if (files.isEmpty()) { + return emptyList() + } - if (files.isNotEmpty()) { - synchronized(dequeLock) { - // prepend cached files before any events already in the deque - // so that older events are sent first - val existingFiles = deque.toList() - deque.clear() - deque.addAll(files) - deque.addAll(existingFiles) - } - config.logger.log("Loaded ${files.size} cached events from disk for $endpoint.") - } + // sort by last modified date ascending so events are sent in order + files.sortBy { file -> file.lastModified() } + return files + } + + private fun reloadFromDiskSync() { + val files = loadQueueFilesFromDisk() + synchronized(dequeLock) { + deque.clear() + deque.addAll(files) + } + cachedEventsLoaded = true + } + + /** + * Rebuilds the in-memory deque from disk, sorted by file last-modified time. + */ + @PostHogInternal + public fun reloadFromDisk() { + executor.submitSyncSafely { + reloadFromDiskSync() } } @@ -454,7 +481,7 @@ internal class PostHogQueue( } } - val dequeList: List + public val dequeList: List @PostHogVisibleForTesting get() { val tempFiles: List diff --git a/posthog/src/main/java/com/posthog/internal/PostHogRemoteConfig.kt b/posthog/src/main/java/com/posthog/internal/PostHogRemoteConfig.kt index 497ca144..af66f74a 100644 --- a/posthog/src/main/java/com/posthog/internal/PostHogRemoteConfig.kt +++ b/posthog/src/main/java/com/posthog/internal/PostHogRemoteConfig.kt @@ -104,6 +104,15 @@ public class PostHogRemoteConfig( @Volatile private var sessionRecordingSampleRate: Double? = null + /** + * The minimum recording duration in milliseconds. + * When configured, session replay snapshots are buffered locally until + * the session reaches this duration threshold. + * null or 0 means no minimum duration (send immediately). + */ + @Volatile + private var sessionRecordingMinimumDurationMs: Long? = null + init { preloadSessionRecordingConfig() preloadSurveys() @@ -351,6 +360,25 @@ public class PostHogRemoteConfig( return value } + /** + * Parses and validates a minimum duration value which may come as a Number (from the API JSON) + * or from cached storage. Returns null if the value is absent, unparseable, or negative. + * The value is expected to be in milliseconds. + */ + private fun parseMinimumDurationMs(raw: Any?): Long? { + val milliseconds: Long? = + when (raw) { + is Number -> raw.toLong() + is String -> raw.toLongOrNull() + else -> null + } + if (milliseconds != null && milliseconds < 0) { + config.logger.log("Remote config minimumDurationMilliseconds must be non-negative, got $milliseconds. Ignoring.") + return null + } + return milliseconds + } + private fun processSessionRecordingConfig(sessionRecording: Any?) { when (sessionRecording) { is Boolean -> { @@ -381,6 +409,8 @@ public class PostHogRemoteConfig( sessionRecordingSampleRate = parseSampleRate(it["sampleRate"]) + sessionRecordingMinimumDurationMs = parseMinimumDurationMs(it["minimumDurationMilliseconds"]) + config.cachePreferences?.setValue(SESSION_REPLAY, it) // TODO: @@ -717,6 +747,8 @@ public class PostHogRemoteConfig( consoleLogRecordingEnabled = sessionRecording["consoleLogRecordingEnabled"] as? Boolean ?: false sessionRecordingSampleRate = parseSampleRate(sessionRecording["sampleRate"]) + + sessionRecordingMinimumDurationMs = parseMinimumDurationMs(sessionRecording["minimumDurationMilliseconds"]) } } } @@ -920,6 +952,13 @@ public class PostHogRemoteConfig( */ public fun getSessionRecordingSampleRate(): Double? = sessionRecordingSampleRate + /** + * Returns the current minimum recording duration in milliseconds, or null if not set. + * When set, session replay snapshots should be buffered until the session + * reaches this duration. + */ + public fun getRecordingMinimumDurationMs(): Long? = sessionRecordingMinimumDurationMs + override fun getRequestId( distinctId: String?, groups: Map?,