From bd04f15e059bac179b211067e61db1b90ddbdf19 Mon Sep 17 00:00:00 2001 From: Ioannis J Date: Wed, 8 Apr 2026 16:19:39 +0300 Subject: [PATCH 1/8] feat: add buffer queue --- .../replay/PostHogReplayBufferDelegate.kt | 22 +++ .../replay/PostHogReplayBufferQueue.kt | 183 ++++++++++++++++++ .../android/replay/PostHogReplayQueue.kt | 115 +++++++++++ .../java/com/posthog/internal/PostHogQueue.kt | 6 +- 4 files changed, 324 insertions(+), 2 deletions(-) create mode 100644 posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayBufferDelegate.kt create mode 100644 posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayBufferQueue.kt create mode 100644 posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayQueue.kt diff --git a/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayBufferDelegate.kt b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayBufferDelegate.kt new file mode 100644 index 00000000..e375862e --- /dev/null +++ b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayBufferDelegate.kt @@ -0,0 +1,22 @@ +package com.posthog.android.replay + +/** + * Delegate interface for controlling session replay buffering behavior. + * + * The replay queue is passive: it checks [isBuffering] on every `add()` and `flush()`, + * and notifies the delegate after buffering a snapshot. + */ +internal interface PostHogReplayBufferDelegate { + /** + * Whether the replay queue should buffer snapshots instead of sending directly. + * Checked on every `queue.add()` and `queue.flush()`. + */ + val isBuffering: Boolean + + /** + * Called after a snapshot was added to the buffer. + * The delegate should check threshold conditions and call + * `replayQueue.migrateBufferToQueue()` when the minimum duration has been met. + */ + fun onReplayBufferSnapshot(replayQueue: PostHogReplayQueue) +} diff --git a/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayBufferQueue.kt b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayBufferQueue.kt new file mode 100644 index 00000000..e44ab792 --- /dev/null +++ b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayBufferQueue.kt @@ -0,0 +1,183 @@ +package com.posthog.android.replay + +import com.posthog.PostHogConfig +import com.posthog.PostHogEvent +import com.posthog.internal.PostHogQueueInterface +import com.posthog.vendor.uuid.TimeBasedEpochGenerator +import java.io.File +import java.util.UUID + +/** + * A disk-based buffer queue for session replay snapshots. + * + * Uses UUID v7 filenames so timestamps can be extracted from filenames + * for duration calculations. + */ +internal class PostHogReplayBufferQueue( + private val config: PostHogConfig, + private val bufferDir: File, +) { + private val items = mutableListOf() + private val itemsLock = Any() + + val depth: Int + get() = synchronized(itemsLock) { items.size } + + /** + * Returns the time span (in millis) between the oldest and newest buffered items, + * based on the UUID v7 embedded timestamps. + */ + val bufferDurationMs: Long? + get() = + synchronized(itemsLock) { + val oldest = items.firstOrNull() ?: return null + val newest = items.lastOrNull() ?: return null + val oldestTs = timestampFromUUIDv7(oldest) ?: return null + val newestTs = timestampFromUUIDv7(newest) ?: return null + maxOf(newestTs - oldestTs, 0) + } + + init { + setup() + } + + private fun setup() { + // Clear any leftover buffer from previous sessions — if they're still here, + // they didn't meet the minimum duration threshold and should be discarded. + deleteDirectorySafely(bufferDir) + + try { + bufferDir.mkdirs() + } catch (e: Throwable) { + config.logger.log("Error trying to create replay buffer folder: $e") + } + + synchronized(itemsLock) { + items.clear() + } + } + + private fun deleteDirectorySafely(dir: File) { + try { + if (dir.exists()) { + dir.deleteRecursively() + } + } catch (e: Throwable) { + config.logger.log("Error deleting replay buffer directory: $e") + } + } + + fun add(event: PostHogEvent) { + try { + val filename = "${TimeBasedEpochGenerator.generate()}.event" + val file = File(bufferDir, filename) + val os = config.encryption?.encrypt(file.outputStream()) ?: file.outputStream() + os.use { output -> + config.serializer.serialize(event, output.writer().buffered()) + } + synchronized(itemsLock) { items.add(filename) } + } catch (e: Throwable) { + config.logger.log("Could not write replay buffer file: $e") + } + } + + /** + * Removes items older than `newestTimestamp - durationMs`. + * This keeps approximately [durationMs] worth of snapshots in the buffer. + */ + fun pruneOlderThan(durationMs: Long) { + val newestTs: Long = + synchronized(itemsLock) { + val newest = items.lastOrNull() ?: return + timestampFromUUIDv7(newest) ?: return + } + val cutoff = newestTs - durationMs + + // Items are sorted oldest → newest already + synchronized(itemsLock) { + while (items.isNotEmpty()) { + val first = items.first() + val ts = timestampFromUUIDv7(first) ?: break + if (ts < cutoff) { + items.removeAt(0) + deleteFileSafely(File(bufferDir, first)) + } else { + break + } + } + } + } + + /** + * Migrates all buffered items to the target queue by deserializing each buffered + * event and re-adding it to the target queue. + * + * Returns the number of events successfully migrated. + */ + fun migrateAllTo(targetQueue: PostHogQueueInterface): Int { + val itemsToMigrate: List = + synchronized(itemsLock) { + val copy = items.toList() + items.clear() + copy + } + + var migratedCount = 0 + for (item in itemsToMigrate) { + val sourceFile = File(bufferDir, item) + try { + val input = config.encryption?.decrypt(sourceFile.inputStream()) ?: sourceFile.inputStream() + input.use { + val event = config.serializer.deserialize(it.reader().buffered()) + if (event != null) { + targetQueue.add(event) + migratedCount++ + } + } + } catch (e: Throwable) { + config.logger.log("Failed to migrate replay buffer item $item: $e") + } finally { + deleteFileSafely(sourceFile) + } + } + + return migratedCount + } + + /** + * Removes all buffered items from disk and memory. + */ + fun clear() { + setup() + } + + private fun deleteFileSafely(file: File) { + try { + file.delete() + } catch (e: Throwable) { + config.logger.log("Error deleting replay buffer file ${file.name}: $e") + } + } + + companion object { + /** + * Extracts the millisecond epoch timestamp from a UUID v7 filename. + * + * UUID v7 encodes Unix milliseconds in the first 48 bits. + * The filename format is `.event`. + * + * We parse the UUID and extract millis via `mostSignificantBits ushr 16`. + * + * @return millis since epoch, or null if parsing fails + */ + internal fun timestampFromUUIDv7(filename: String): Long? { + return try { + val uuidString = filename.removeSuffix(".event") + val uuid = UUID.fromString(uuidString) + uuid.mostSignificantBits ushr 16 + } catch (_: Throwable) { + null + } + } + } +} diff --git a/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayQueue.kt b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayQueue.kt new file mode 100644 index 00000000..10de5f31 --- /dev/null +++ b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayQueue.kt @@ -0,0 +1,115 @@ +package com.posthog.android.replay + +import com.posthog.PostHogConfig +import com.posthog.PostHogEvent +import com.posthog.internal.PostHogQueueInterface +import java.io.File + +/** + * A replay queue that wraps an inner queue (for actual API sends) and a + * [PostHogReplayBufferQueue] (for buffering snapshots until minimum session + * duration is met). + * + * The queue is passive — it delegates all buffering decisions to a + * [PostHogReplayBufferDelegate]. When [PostHogReplayBufferDelegate.isBuffering] + * is true, snapshots are routed to the buffer and `flush()` calls are suppressed. + */ +public class PostHogReplayQueue internal constructor( + private val config: PostHogConfig, + private val innerQueue: PostHogQueueInterface, + replayStoragePrefix: String?, +) : PostHogQueueInterface { + private val replayDir = replayStoragePrefix?.let { File(it, config.apiKey) } + + private val bufferQueue: PostHogReplayBufferQueue = + PostHogReplayBufferQueue( + config, + if (replayStoragePrefix != null) { + File("$replayStoragePrefix-buffer", config.apiKey) + } else { + File(System.getProperty("java.io.tmpdir"), "posthog-replay-buffer/${config.apiKey}") + }, + ) + + internal var bufferDelegate: PostHogReplayBufferDelegate? = null + + /** + * The time span (in millis) of buffered snapshots (oldest to newest). + */ + internal val bufferDurationMs: Long? + get() = bufferQueue.bufferDurationMs + + /** + * Number of events currently in the buffer. + */ + internal val bufferDepth: Int + get() = bufferQueue.depth + + /** + * Approximate number of events currently in the inner replay queue directory. + */ + internal val depth: Int + get() = replayDir?.listFiles()?.size ?: 0 + + override fun add(event: PostHogEvent) { + if (bufferDelegate?.isBuffering == true) { + bufferQueue.add(event) + config.logger.log("Buffered replay event '${event.event}'. Buffer depth: ${bufferQueue.depth}") + bufferDelegate?.onReplayBufferSnapshot(this) + } else { + innerQueue.add(event) + } + } + + override fun flush() { + if (bufferDelegate?.isBuffering == true) { + config.logger.log("Replay queue flush suppressed — still buffering") + return + } + innerQueue.flush() + } + + override fun start() { + innerQueue.start() + } + + override fun stop() { + innerQueue.stop() + } + + override fun clear() { + innerQueue.clear() + bufferQueue.clear() + } + + /** + * Migrates all buffered items to the inner replay queue. + */ + internal fun migrateBufferToQueue() { + val bufferedCount = bufferQueue.depth + if (bufferedCount == 0) { + config.logger.log("No buffered replay events to migrate") + return + } + + val migrated = bufferQueue.migrateAllTo(innerQueue) + config.logger.log( + "Migrated $migrated/$bufferedCount buffered replay events to replay queue.", + ) + } + + /** + * Discards all buffered replay events. + */ + internal fun clearBuffer() { + bufferQueue.clear() + config.logger.log("Replay buffer cleared") + } + + /** + * Prunes buffer items older than the given duration from the newest item. + */ + internal fun pruneBuffer(olderThanMs: Long) { + bufferQueue.pruneOlderThan(olderThanMs) + } +} diff --git a/posthog/src/main/java/com/posthog/internal/PostHogQueue.kt b/posthog/src/main/java/com/posthog/internal/PostHogQueue.kt index c641a4d6..b9e6d12d 100644 --- a/posthog/src/main/java/com/posthog/internal/PostHogQueue.kt +++ b/posthog/src/main/java/com/posthog/internal/PostHogQueue.kt @@ -2,6 +2,7 @@ package com.posthog.internal import com.posthog.PostHogConfig import com.posthog.PostHogEvent +import com.posthog.PostHogInternal import com.posthog.PostHogVisibleForTesting import com.posthog.vendor.uuid.TimeBasedEpochGenerator import java.io.File @@ -24,7 +25,8 @@ private val RETRYABLE_STATUS_CODES = setOf(429, 500, 502, 503, 504) * @property api the API * @property executor the Executor */ -internal class PostHogQueue( +@PostHogInternal +public class PostHogQueue( private val config: PostHogConfig, private val api: PostHogApi, private val endpoint: PostHogApiEndpoint, @@ -454,7 +456,7 @@ internal class PostHogQueue( } } - val dequeList: List + public val dequeList: List @PostHogVisibleForTesting get() { val tempFiles: List From fb1fb29b50b25adf256f8c7cc557088c863cbbbb Mon Sep 17 00:00:00 2001 From: Ioannis J Date: Wed, 8 Apr 2026 16:20:09 +0300 Subject: [PATCH 2/8] feat: parse remote config --- .../posthog/internal/PostHogRemoteConfig.kt | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/posthog/src/main/java/com/posthog/internal/PostHogRemoteConfig.kt b/posthog/src/main/java/com/posthog/internal/PostHogRemoteConfig.kt index 497ca144..af66f74a 100644 --- a/posthog/src/main/java/com/posthog/internal/PostHogRemoteConfig.kt +++ b/posthog/src/main/java/com/posthog/internal/PostHogRemoteConfig.kt @@ -104,6 +104,15 @@ public class PostHogRemoteConfig( @Volatile private var sessionRecordingSampleRate: Double? = null + /** + * The minimum recording duration in milliseconds. + * When configured, session replay snapshots are buffered locally until + * the session reaches this duration threshold. + * null or 0 means no minimum duration (send immediately). + */ + @Volatile + private var sessionRecordingMinimumDurationMs: Long? = null + init { preloadSessionRecordingConfig() preloadSurveys() @@ -351,6 +360,25 @@ public class PostHogRemoteConfig( return value } + /** + * Parses and validates a minimum duration value which may come as a Number (from the API JSON) + * or from cached storage. Returns null if the value is absent, unparseable, or negative. + * The value is expected to be in milliseconds. + */ + private fun parseMinimumDurationMs(raw: Any?): Long? { + val milliseconds: Long? = + when (raw) { + is Number -> raw.toLong() + is String -> raw.toLongOrNull() + else -> null + } + if (milliseconds != null && milliseconds < 0) { + config.logger.log("Remote config minimumDurationMilliseconds must be non-negative, got $milliseconds. Ignoring.") + return null + } + return milliseconds + } + private fun processSessionRecordingConfig(sessionRecording: Any?) { when (sessionRecording) { is Boolean -> { @@ -381,6 +409,8 @@ public class PostHogRemoteConfig( sessionRecordingSampleRate = parseSampleRate(it["sampleRate"]) + sessionRecordingMinimumDurationMs = parseMinimumDurationMs(it["minimumDurationMilliseconds"]) + config.cachePreferences?.setValue(SESSION_REPLAY, it) // TODO: @@ -717,6 +747,8 @@ public class PostHogRemoteConfig( consoleLogRecordingEnabled = sessionRecording["consoleLogRecordingEnabled"] as? Boolean ?: false sessionRecordingSampleRate = parseSampleRate(sessionRecording["sampleRate"]) + + sessionRecordingMinimumDurationMs = parseMinimumDurationMs(sessionRecording["minimumDurationMilliseconds"]) } } } @@ -920,6 +952,13 @@ public class PostHogRemoteConfig( */ public fun getSessionRecordingSampleRate(): Double? = sessionRecordingSampleRate + /** + * Returns the current minimum recording duration in milliseconds, or null if not set. + * When set, session replay snapshots should be buffered until the session + * reaches this duration. + */ + public fun getRecordingMinimumDurationMs(): Long? = sessionRecordingMinimumDurationMs + override fun getRequestId( distinctId: String?, groups: Map?, From 828eb8c643ed7fd05e9b89631ba79d314d5c3e22 Mon Sep 17 00:00:00 2001 From: Ioannis J Date: Wed, 8 Apr 2026 16:21:29 +0300 Subject: [PATCH 3/8] feat: write up replay integration --- .../posthog/android/PostHogAndroidConfig.kt | 20 ++++- .../replay/PostHogReplayIntegration.kt | 88 ++++++++++++++++++- 2 files changed, 106 insertions(+), 2 deletions(-) diff --git a/posthog-android/src/main/java/com/posthog/android/PostHogAndroidConfig.kt b/posthog-android/src/main/java/com/posthog/android/PostHogAndroidConfig.kt index f2302f5c..f724caa5 100644 --- a/posthog-android/src/main/java/com/posthog/android/PostHogAndroidConfig.kt +++ b/posthog-android/src/main/java/com/posthog/android/PostHogAndroidConfig.kt @@ -1,7 +1,10 @@ package com.posthog.android import com.posthog.PostHogConfig +import com.posthog.android.replay.PostHogReplayQueue import com.posthog.android.replay.PostHogSessionReplayConfig +import com.posthog.internal.PostHogApiEndpoint +import com.posthog.internal.PostHogQueue /** * The SDK Config @@ -19,4 +22,19 @@ public open class PostHogAndroidConfig public var captureDeepLinks: Boolean = true, public var captureScreenViews: Boolean = true, public var sessionReplayConfig: PostHogSessionReplayConfig = PostHogSessionReplayConfig(), - ) : PostHogConfig(apiKey = apiKey, host = host) + ) : PostHogConfig( + apiKey = apiKey, + host = host, + queueProvider = { config, api, endpoint, storagePrefix, executor -> + val defaultQueue = PostHogQueue(config, api, endpoint, storagePrefix, executor) + if (endpoint == PostHogApiEndpoint.SNAPSHOT) { + val replayQueue = PostHogReplayQueue(config, defaultQueue, storagePrefix) + (config as? PostHogAndroidConfig)?.replayQueueHolder = replayQueue + replayQueue + } else { + defaultQueue + } + }, + ) { + internal var replayQueueHolder: PostHogReplayQueue? = null + } diff --git a/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayIntegration.kt b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayIntegration.kt index ccd3169c..f6e32f40 100644 --- a/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayIntegration.kt +++ b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayIntegration.kt @@ -99,7 +99,7 @@ public class PostHogReplayIntegration( private val context: Context, private val config: PostHogAndroidConfig, private val mainHandler: MainHandler, -) : PostHogIntegration, PostHogSessionReplayHandler { +) : PostHogIntegration, PostHogSessionReplayHandler, PostHogReplayBufferDelegate { private val decorViews = WeakHashMap() private val passwordInputTypes = @@ -161,6 +161,14 @@ public class PostHogReplayIntegration( get() = (config.sdkName != "posthog-flutter") private var postHog: PostHogInterface? = null + private var replayQueue: PostHogReplayQueue? = null + + // Minimum duration buffering state + private val bufferingLock = Any() + + @Volatile + private var hasPassedMinimumDuration: Boolean = false + private var cachedMinimumDurationMs: Long? = null @Volatile private var isOnDrawnCalled: Boolean = false @@ -383,6 +391,13 @@ public class PostHogReplayIntegration( integrationInstalled = true this.postHog = postHog + // Wire up as buffer delegate for the replay queue + replayQueue = config.replayQueueHolder + replayQueue?.bufferDelegate = this + + // Load cached minimum duration from remote config (if available) + updateCachedMinimumDuration() + // workaround for react native that is started after the window is added // Curtains.rootViews should be empty for normal apps yet Curtains.rootViews.forEach { view -> @@ -400,6 +415,11 @@ public class PostHogReplayIntegration( try { integrationInstalled = false this.postHog = null + + // Clear buffer delegate + replayQueue?.bufferDelegate = null + replayQueue = null + Curtains.onRootViewsChangedListeners -= onRootViewsChangedListener decorViews.entries.forEach { @@ -1587,6 +1607,8 @@ public class PostHogReplayIntegration( override fun start(resumeCurrent: Boolean) { if (!resumeCurrent) { clearSnapshotStates() + // Reset minimum duration buffering state for the new session + resetBufferingState() } isSessionReplayActive = true @@ -1608,6 +1630,70 @@ public class PostHogReplayIntegration( return isSessionReplayActive } + // MARK: - PostHogReplayBufferDelegate + + override val isBuffering: Boolean + get() { + synchronized(bufferingLock) { + val minimumDuration = cachedMinimumDurationMs + if (minimumDuration == null || minimumDuration <= 0) { + return false + } + return !hasPassedMinimumDuration + } + } + + override fun onReplayBufferSnapshot(replayQueue: PostHogReplayQueue) { + val minimumDurationMs: Long? = synchronized(bufferingLock) { cachedMinimumDurationMs } + if (minimumDurationMs == null || minimumDurationMs <= 0) { + // No minimum duration configured: should not be buffering, migrate immediately + replayQueue.migrateBufferToQueue() + synchronized(bufferingLock) { hasPassedMinimumDuration = true } + return + } + + // Check buffer content duration (oldest to newest snapshot) + val bufferDurationMs = replayQueue.bufferDurationMs ?: 0 + + // Prune old snapshots beyond minimum duration window + replayQueue.pruneBuffer(olderThanMs = minimumDurationMs) + + if (bufferDurationMs >= minimumDurationMs) { + config.logger.log( + "[Session Replay] Minimum duration met. Migrating ${replayQueue.bufferDepth} buffered events to replay queue.", + ) + replayQueue.migrateBufferToQueue() + synchronized(bufferingLock) { hasPassedMinimumDuration = true } + } + } + + // MARK: - Remote Config + + override fun onRemoteConfig() { + updateCachedMinimumDuration() + } + + private fun updateCachedMinimumDuration() { + val minimumDuration = config.remoteConfigHolder?.getRecordingMinimumDurationMs() + synchronized(bufferingLock) { + cachedMinimumDurationMs = minimumDuration + } + } + + // MARK: - Buffering State + + /** + * Resets buffering state for a new session — clears the buffer and marks + * as not yet passed minimum duration. + */ + private fun resetBufferingState() { + synchronized(bufferingLock) { + hasPassedMinimumDuration = false + } + // Clear any buffered events from previous session + replayQueue?.clearBuffer() + } + internal companion object { const val PH_NO_CAPTURE_LABEL: String = "ph-no-capture" const val PH_NO_MASK_LABEL: String = "ph-no-mask" From f630ff98dbe1e0ab3255a072e2006185d11e1148 Mon Sep 17 00:00:00 2001 From: Ioannis J Date: Wed, 8 Apr 2026 19:30:37 +0300 Subject: [PATCH 4/8] fix: do not prune buffer queue --- .../replay/PostHogReplayBufferQueue.kt | 35 +++---------------- .../replay/PostHogReplayIntegration.kt | 6 ++-- .../android/replay/PostHogReplayQueue.kt | 7 ---- 3 files changed, 7 insertions(+), 41 deletions(-) diff --git a/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayBufferQueue.kt b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayBufferQueue.kt index e44ab792..939a16a4 100644 --- a/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayBufferQueue.kt +++ b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayBufferQueue.kt @@ -30,10 +30,10 @@ internal class PostHogReplayBufferQueue( val bufferDurationMs: Long? get() = synchronized(itemsLock) { - val oldest = items.firstOrNull() ?: return null - val newest = items.lastOrNull() ?: return null - val oldestTs = timestampFromUUIDv7(oldest) ?: return null - val newestTs = timestampFromUUIDv7(newest) ?: return null + val oldest = items.firstOrNull() ?: return@synchronized null + val newest = items.lastOrNull() ?: return@synchronized null + val oldestTs = timestampFromUUIDv7(oldest) ?: return@synchronized null + val newestTs = timestampFromUUIDv7(newest) ?: return@synchronized null maxOf(newestTs - oldestTs, 0) } @@ -81,33 +81,6 @@ internal class PostHogReplayBufferQueue( } } - /** - * Removes items older than `newestTimestamp - durationMs`. - * This keeps approximately [durationMs] worth of snapshots in the buffer. - */ - fun pruneOlderThan(durationMs: Long) { - val newestTs: Long = - synchronized(itemsLock) { - val newest = items.lastOrNull() ?: return - timestampFromUUIDv7(newest) ?: return - } - val cutoff = newestTs - durationMs - - // Items are sorted oldest → newest already - synchronized(itemsLock) { - while (items.isNotEmpty()) { - val first = items.first() - val ts = timestampFromUUIDv7(first) ?: break - if (ts < cutoff) { - items.removeAt(0) - deleteFileSafely(File(bufferDir, first)) - } else { - break - } - } - } - } - /** * Migrates all buffered items to the target queue by deserializing each buffered * event and re-adding it to the target queue. diff --git a/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayIntegration.kt b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayIntegration.kt index f6e32f40..0a5f498e 100644 --- a/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayIntegration.kt +++ b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayIntegration.kt @@ -1655,9 +1655,9 @@ public class PostHogReplayIntegration( // Check buffer content duration (oldest to newest snapshot) val bufferDurationMs = replayQueue.bufferDurationMs ?: 0 - // Prune old snapshots beyond minimum duration window - replayQueue.pruneBuffer(olderThanMs = minimumDurationMs) - + // Keep buffered snapshots intact until threshold is reached. + // Session replay payloads may include metadata snapshots required by the player, + // so buffering follows an all-or-nothing migration strategy. if (bufferDurationMs >= minimumDurationMs) { config.logger.log( "[Session Replay] Minimum duration met. Migrating ${replayQueue.bufferDepth} buffered events to replay queue.", diff --git a/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayQueue.kt b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayQueue.kt index 10de5f31..55c0e084 100644 --- a/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayQueue.kt +++ b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayQueue.kt @@ -105,11 +105,4 @@ public class PostHogReplayQueue internal constructor( bufferQueue.clear() config.logger.log("Replay buffer cleared") } - - /** - * Prunes buffer items older than the given duration from the newest item. - */ - internal fun pruneBuffer(olderThanMs: Long) { - bufferQueue.pruneOlderThan(olderThanMs) - } } From 696269284af6d2daa376b13ed2a365b90c2d1a30 Mon Sep 17 00:00:00 2001 From: Ioannis J Date: Thu, 9 Apr 2026 02:12:06 +0300 Subject: [PATCH 5/8] feat: flip buffering switch before draining --- .../posthog/android/replay/PostHogReplayIntegration.kt | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayIntegration.kt b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayIntegration.kt index 0a5f498e..ca44ebc6 100644 --- a/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayIntegration.kt +++ b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayIntegration.kt @@ -1646,9 +1646,10 @@ public class PostHogReplayIntegration( override fun onReplayBufferSnapshot(replayQueue: PostHogReplayQueue) { val minimumDurationMs: Long? = synchronized(bufferingLock) { cachedMinimumDurationMs } if (minimumDurationMs == null || minimumDurationMs <= 0) { - // No minimum duration configured: should not be buffering, migrate immediately - replayQueue.migrateBufferToQueue() + // No minimum duration configured: should not be buffering, migrate immediately. + // Flip buffering first so concurrent snapshot producers route straight to replay queue. synchronized(bufferingLock) { hasPassedMinimumDuration = true } + replayQueue.migrateBufferToQueue() return } @@ -1662,8 +1663,10 @@ public class PostHogReplayIntegration( config.logger.log( "[Session Replay] Minimum duration met. Migrating ${replayQueue.bufferDepth} buffered events to replay queue.", ) - replayQueue.migrateBufferToQueue() + // Flip buffering before migration so concurrently captured snapshots + // go directly to replay queue instead of entering the soon-to-be-drained buffer. synchronized(bufferingLock) { hasPassedMinimumDuration = true } + replayQueue.migrateBufferToQueue() } } From af92495cc182359f188cc5c03964aca57c207b94 Mon Sep 17 00:00:00 2001 From: Ioannis J Date: Thu, 9 Apr 2026 02:19:56 +0300 Subject: [PATCH 6/8] feat: add unit tests --- .../replay/PostHogReplayIntegration.kt | 4 +- .../replay/PostHogReplayBufferQueueTest.kt | 181 +++++++++++ .../android/replay/PostHogReplayQueueTest.kt | 297 ++++++++++++++++++ 3 files changed, 479 insertions(+), 3 deletions(-) create mode 100644 posthog-android/src/test/java/com/posthog/android/replay/PostHogReplayBufferQueueTest.kt create mode 100644 posthog-android/src/test/java/com/posthog/android/replay/PostHogReplayQueueTest.kt diff --git a/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayIntegration.kt b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayIntegration.kt index ca44ebc6..677963a2 100644 --- a/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayIntegration.kt +++ b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayIntegration.kt @@ -1647,7 +1647,6 @@ public class PostHogReplayIntegration( val minimumDurationMs: Long? = synchronized(bufferingLock) { cachedMinimumDurationMs } if (minimumDurationMs == null || minimumDurationMs <= 0) { // No minimum duration configured: should not be buffering, migrate immediately. - // Flip buffering first so concurrent snapshot producers route straight to replay queue. synchronized(bufferingLock) { hasPassedMinimumDuration = true } replayQueue.migrateBufferToQueue() return @@ -1663,8 +1662,7 @@ public class PostHogReplayIntegration( config.logger.log( "[Session Replay] Minimum duration met. Migrating ${replayQueue.bufferDepth} buffered events to replay queue.", ) - // Flip buffering before migration so concurrently captured snapshots - // go directly to replay queue instead of entering the soon-to-be-drained buffer. + // Flip state before migration so new snapshots don't keep entering the buffer during long-running migrations. synchronized(bufferingLock) { hasPassedMinimumDuration = true } replayQueue.migrateBufferToQueue() } diff --git a/posthog-android/src/test/java/com/posthog/android/replay/PostHogReplayBufferQueueTest.kt b/posthog-android/src/test/java/com/posthog/android/replay/PostHogReplayBufferQueueTest.kt new file mode 100644 index 00000000..2f8d8910 --- /dev/null +++ b/posthog-android/src/test/java/com/posthog/android/replay/PostHogReplayBufferQueueTest.kt @@ -0,0 +1,181 @@ +package com.posthog.android.replay + +import com.posthog.PostHogConfig +import com.posthog.PostHogEvent +import com.posthog.android.API_KEY +import com.posthog.internal.PostHogQueueInterface +import org.junit.Rule +import org.junit.rules.TemporaryFolder +import java.io.File +import java.util.UUID +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertNotNull +import kotlin.test.assertNull +import kotlin.test.assertTrue + +internal class PostHogReplayBufferQueueTest { + @get:Rule + val tmpDir = TemporaryFolder() + + private class FakeQueue : PostHogQueueInterface { + val events = mutableListOf() + + override fun add(event: PostHogEvent) { + events.add(event) + } + + override fun flush() { + } + + override fun start() { + } + + override fun stop() { + } + + override fun clear() { + events.clear() + } + } + + private fun createQueue(bufferDir: File? = null): PostHogReplayBufferQueue { + val config = PostHogConfig(API_KEY) + val dir = bufferDir ?: File(tmpDir.newFolder(), "buffer") + return PostHogReplayBufferQueue(config, dir) + } + + private fun createEvent(name: String): PostHogEvent { + return PostHogEvent( + event = name, + distinctId = "test-user", + properties = mutableMapOf("name" to name), + uuid = UUID.randomUUID(), + ) + } + + @Test + fun `add increases depth`() { + val queue = createQueue() + + assertEquals(0, queue.depth) + + queue.add(createEvent("item1")) + assertEquals(1, queue.depth) + + queue.add(createEvent("item2")) + assertEquals(2, queue.depth) + } + + @Test + fun `clear resets depth to zero`() { + val queue = createQueue() + + queue.add(createEvent("item1")) + queue.add(createEvent("item2")) + assertEquals(2, queue.depth) + + queue.clear() + assertEquals(0, queue.depth) + } + + @Test + fun `init clears leftover buffer from previous session`() { + val bufferDir = File(tmpDir.newFolder(), "buffer") + bufferDir.mkdirs() + + File(bufferDir, "leftover.event").writeText("old data") + assertTrue(bufferDir.listFiles()!!.isNotEmpty()) + + val queue = createQueue(bufferDir) + assertEquals(0, queue.depth) + assertTrue(bufferDir.exists()) + assertTrue(bufferDir.listFiles()!!.isEmpty()) + } + + @Test + fun `bufferDurationMs returns null for empty queue`() { + val queue = createQueue() + assertNull(queue.bufferDurationMs) + } + + @Test + fun `bufferDurationMs returns zero for single item`() { + val queue = createQueue() + queue.add(createEvent("only")) + assertEquals(0L, queue.bufferDurationMs) + } + + @Test + fun `bufferDurationMs increases as items are added over time`() { + val queue = createQueue() + + queue.add(createEvent("first")) + val duration1 = queue.bufferDurationMs ?: 0 + + Thread.sleep(50) + queue.add(createEvent("second")) + val duration2 = queue.bufferDurationMs ?: 0 + + assertTrue(duration2 > duration1) + } + + @Test + fun `migrateAllTo moves all items to target queue`() { + val queue = createQueue() + val targetQueue = FakeQueue() + + queue.add(createEvent("item1")) + queue.add(createEvent("item2")) + queue.add(createEvent("item3")) + + val migrated = queue.migrateAllTo(targetQueue) + + assertEquals(0, queue.depth) + assertEquals(3, migrated) + assertEquals(3, targetQueue.events.size) + } + + @Test + fun `migrateAllTo preserves event data`() { + val queue = createQueue() + val targetQueue = FakeQueue() + + val names = listOf("first", "second", "third") + names.forEach { + queue.add(createEvent(it)) + Thread.sleep(20) + } + + queue.migrateAllTo(targetQueue) + + assertEquals(3, targetQueue.events.size) + assertEquals(names, targetQueue.events.map { it.event }) + } + + @Test + fun `migrateAllTo handles empty buffer gracefully`() { + val queue = createQueue() + val targetQueue = FakeQueue() + + val migrated = queue.migrateAllTo(targetQueue) + + assertEquals(0, migrated) + assertEquals(0, queue.depth) + assertEquals(0, targetQueue.events.size) + } + + @Test + fun `timestampFromUUIDv7 extracts correct milliseconds`() { + val filename = "019711e4-7c00-7000-8000-000000000000.event" + val ts = PostHogReplayBufferQueue.timestampFromUUIDv7(filename) + assertNotNull(ts) + assertEquals(1748351876096L, ts) + } + + @Test + fun `timestampFromUUIDv7 returns null for invalid filename`() { + assertNull(PostHogReplayBufferQueue.timestampFromUUIDv7("not-a-uuid.event")) + assertNull(PostHogReplayBufferQueue.timestampFromUUIDv7("")) + } +} diff --git a/posthog-android/src/test/java/com/posthog/android/replay/PostHogReplayQueueTest.kt b/posthog-android/src/test/java/com/posthog/android/replay/PostHogReplayQueueTest.kt new file mode 100644 index 00000000..9c467272 --- /dev/null +++ b/posthog-android/src/test/java/com/posthog/android/replay/PostHogReplayQueueTest.kt @@ -0,0 +1,297 @@ +package com.posthog.android.replay + +import com.posthog.PostHogConfig +import com.posthog.PostHogEvent +import com.posthog.android.API_KEY +import com.posthog.internal.PostHogQueueInterface +import org.junit.Rule +import org.junit.rules.TemporaryFolder +import java.util.UUID +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertTrue + +internal class PostHogReplayQueueTest { + @get:Rule + val tmpDir = TemporaryFolder() + + private class FakeQueue : PostHogQueueInterface { + val events = mutableListOf() + var flushCallCount = 0 + var startCallCount = 0 + var stopCallCount = 0 + + override fun add(event: PostHogEvent) { + events.add(event) + } + + override fun flush() { + flushCallCount++ + } + + override fun start() { + startCallCount++ + } + + override fun stop() { + stopCallCount++ + } + + override fun clear() { + events.clear() + } + } + + private class MockReplayBufferDelegate : PostHogReplayBufferDelegate { + override var isBuffering: Boolean = false + var didBufferSnapshotCallCount: Int = 0 + var lastReplayQueue: PostHogReplayQueue? = null + + override fun onReplayBufferSnapshot(replayQueue: PostHogReplayQueue) { + didBufferSnapshotCallCount++ + lastReplayQueue = replayQueue + } + } + + private fun createReplayQueue( + fakeInnerQueue: FakeQueue, + storagePrefix: String = tmpDir.newFolder().absolutePath, + ): PostHogReplayQueue { + val config = PostHogConfig(API_KEY, "http://localhost:9001") + return PostHogReplayQueue(config, fakeInnerQueue, storagePrefix) + } + + private fun createTestEvent(name: String = "test_event"): PostHogEvent { + return PostHogEvent( + event = name, + distinctId = "test-user", + properties = mutableMapOf("test" to "value"), + uuid = UUID.randomUUID(), + ) + } + + @Test + fun `add routes to buffer when delegate isBuffering is true`() { + val fakeInnerQueue = FakeQueue() + val queue = createReplayQueue(fakeInnerQueue) + val delegate = MockReplayBufferDelegate().apply { isBuffering = true } + queue.bufferDelegate = delegate + + queue.add(createTestEvent("snapshot_1")) + queue.add(createTestEvent("snapshot_2")) + + assertEquals(2, queue.bufferDepth) + assertEquals(0, fakeInnerQueue.events.size) + assertEquals(2, delegate.didBufferSnapshotCallCount) + } + + @Test + fun `add routes to inner queue when delegate isBuffering is false`() { + val fakeInnerQueue = FakeQueue() + val queue = createReplayQueue(fakeInnerQueue) + val delegate = MockReplayBufferDelegate().apply { isBuffering = false } + queue.bufferDelegate = delegate + + queue.add(createTestEvent("snapshot_1")) + queue.add(createTestEvent("snapshot_2")) + + assertEquals(0, queue.bufferDepth) + assertEquals(2, fakeInnerQueue.events.size) + assertEquals(0, delegate.didBufferSnapshotCallCount) + } + + @Test + fun `flush is suppressed when buffering`() { + val fakeInnerQueue = FakeQueue() + val queue = createReplayQueue(fakeInnerQueue) + val delegate = MockReplayBufferDelegate().apply { isBuffering = true } + queue.bufferDelegate = delegate + + queue.add(createTestEvent("snapshot_1")) + queue.flush() + + assertEquals(1, queue.bufferDepth) + assertEquals(0, fakeInnerQueue.flushCallCount) + } + + @Test + fun `flush delegates to inner queue when not buffering`() { + val fakeInnerQueue = FakeQueue() + val queue = createReplayQueue(fakeInnerQueue) + val delegate = MockReplayBufferDelegate().apply { isBuffering = false } + queue.bufferDelegate = delegate + + queue.flush() + + assertEquals(1, fakeInnerQueue.flushCallCount) + } + + @Test + fun `migrateBufferToQueue moves buffered events to inner queue`() { + val fakeInnerQueue = FakeQueue() + val queue = createReplayQueue(fakeInnerQueue) + val delegate = MockReplayBufferDelegate().apply { isBuffering = true } + queue.bufferDelegate = delegate + + queue.add(createTestEvent("snapshot_1")) + queue.add(createTestEvent("snapshot_2")) + queue.add(createTestEvent("snapshot_3")) + + assertEquals(3, queue.bufferDepth) + assertEquals(0, fakeInnerQueue.events.size) + + queue.migrateBufferToQueue() + + assertEquals(0, queue.bufferDepth) + assertEquals(3, fakeInnerQueue.events.size) + assertEquals(listOf("snapshot_1", "snapshot_2", "snapshot_3"), fakeInnerQueue.events.map { it.event }) + } + + @Test + fun `migrateBufferToQueue handles empty buffer gracefully`() { + val fakeInnerQueue = FakeQueue() + val queue = createReplayQueue(fakeInnerQueue) + + queue.migrateBufferToQueue() + + assertEquals(0, queue.bufferDepth) + assertEquals(0, fakeInnerQueue.events.size) + } + + @Test + fun `clearBuffer discards all buffered events`() { + val fakeInnerQueue = FakeQueue() + val queue = createReplayQueue(fakeInnerQueue) + val delegate = MockReplayBufferDelegate().apply { isBuffering = true } + queue.bufferDelegate = delegate + + queue.add(createTestEvent("snapshot_1")) + queue.add(createTestEvent("snapshot_2")) + + assertEquals(2, queue.bufferDepth) + + queue.clearBuffer() + + assertEquals(0, queue.bufferDepth) + } + + @Test + fun `clear removes both buffer and inner queue events`() { + val fakeInnerQueue = FakeQueue() + val queue = createReplayQueue(fakeInnerQueue) + + val delegate = MockReplayBufferDelegate().apply { isBuffering = false } + queue.bufferDelegate = delegate + queue.add(createTestEvent("direct_1")) + assertEquals(1, fakeInnerQueue.events.size) + + delegate.isBuffering = true + queue.add(createTestEvent("buffered_1")) + assertEquals(1, queue.bufferDepth) + + queue.clear() + + assertEquals(0, queue.bufferDepth) + assertEquals(0, fakeInnerQueue.events.size) + } + + @Test + fun `start and stop delegate to inner queue`() { + val fakeInnerQueue = FakeQueue() + val queue = createReplayQueue(fakeInnerQueue) + + queue.start() + queue.stop() + + assertEquals(1, fakeInnerQueue.startCallCount) + assertEquals(1, fakeInnerQueue.stopCallCount) + } + + @Test + fun `events go directly to inner queue after migration when buffering disabled`() { + val fakeInnerQueue = FakeQueue() + val queue = createReplayQueue(fakeInnerQueue) + val delegate = MockReplayBufferDelegate().apply { isBuffering = true } + queue.bufferDelegate = delegate + + queue.add(createTestEvent("buffered_1")) + queue.add(createTestEvent("buffered_2")) + assertEquals(2, queue.bufferDepth) + + queue.migrateBufferToQueue() + delegate.isBuffering = false + + queue.add(createTestEvent("direct_1")) + + assertEquals(0, queue.bufferDepth) + assertEquals(3, fakeInnerQueue.events.size) + assertTrue(fakeInnerQueue.events.map { it.event }.containsAll(listOf("buffered_1", "buffered_2", "direct_1"))) + } + + @Test + fun `concurrent snapshot goes direct when delegate flips buffering before migration`() { + val migrationStarted = CountDownLatch(1) + val allowMigrationToContinue = CountDownLatch(1) + + val innerQueue = + object : PostHogQueueInterface { + val events = mutableListOf() + + override fun add(event: PostHogEvent) { + synchronized(events) { events.add(event) } + if (event.event == "buffered_1") { + migrationStarted.countDown() + allowMigrationToContinue.await(2, TimeUnit.SECONDS) + } + } + + override fun flush() {} + + override fun start() {} + + override fun stop() {} + + override fun clear() { + synchronized(events) { events.clear() } + } + } + + val config = PostHogConfig(API_KEY, "http://localhost:9001") + val queue = PostHogReplayQueue(config, innerQueue, tmpDir.newFolder().absolutePath) + + val delegate = + object : PostHogReplayBufferDelegate { + override var isBuffering: Boolean = true + + override fun onReplayBufferSnapshot(replayQueue: PostHogReplayQueue) { + if (isBuffering && replayQueue.bufferDepth >= 2) { + isBuffering = false + replayQueue.migrateBufferToQueue() + } + } + } + queue.bufferDelegate = delegate + + queue.add(createTestEvent("buffered_1")) + + val migrationThread = + Thread { + queue.add(createTestEvent("buffered_2")) + }.apply { start() } + + assertTrue(migrationStarted.await(2, TimeUnit.SECONDS)) + + queue.add(createTestEvent("concurrent_direct")) + + allowMigrationToContinue.countDown() + migrationThread.join(2000) + + assertEquals(0, queue.bufferDepth) + synchronized(innerQueue.events) { + assertEquals(3, innerQueue.events.size) + assertTrue(innerQueue.events.map { it.event }.containsAll(listOf("buffered_1", "buffered_2", "concurrent_direct"))) + } + } +} From 06531295584c1d1f9dea229d799732104132e8e8 Mon Sep 17 00:00:00 2001 From: Ioannis J Date: Thu, 9 Apr 2026 02:25:38 +0300 Subject: [PATCH 7/8] chore: add changeset --- .changeset/humble-boxes-jump.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changeset/humble-boxes-jump.md diff --git a/.changeset/humble-boxes-jump.md b/.changeset/humble-boxes-jump.md new file mode 100644 index 00000000..e3a19bf3 --- /dev/null +++ b/.changeset/humble-boxes-jump.md @@ -0,0 +1,6 @@ +--- +"posthog-android": minor +"posthog": minor +--- + +feat: support session replay minimum recording duration From b4d525c1171d778997d7b2149a1a75b88ba29ef8 Mon Sep 17 00:00:00 2001 From: Ioannis J Date: Thu, 9 Apr 2026 13:11:15 +0300 Subject: [PATCH 8/8] fix: move with disk operation when migrating --- posthog-android/api/posthog-android.api | 12 ++- .../replay/PostHogReplayBufferQueue.kt | 55 +++++++---- .../replay/PostHogReplayBufferQueueTest.kt | 30 +----- .../android/replay/PostHogReplayQueueTest.kt | 96 ++++++------------- posthog/api/posthog.api | 13 +++ .../java/com/posthog/internal/PostHogQueue.kt | 65 +++++++++---- 6 files changed, 139 insertions(+), 132 deletions(-) diff --git a/posthog-android/api/posthog-android.api b/posthog-android/api/posthog-android.api index faab4159..fb35dd13 100644 --- a/posthog-android/api/posthog-android.api +++ b/posthog-android/api/posthog-android.api @@ -62,7 +62,7 @@ public final class com/posthog/android/replay/PostHogMaskModifier { public static synthetic fun postHogUnmask$default (Lcom/posthog/android/replay/PostHogMaskModifier;Landroidx/compose/ui/Modifier;ZILjava/lang/Object;)Landroidx/compose/ui/Modifier; } -public final class com/posthog/android/replay/PostHogReplayIntegration : com/posthog/PostHogIntegration, com/posthog/internal/replay/PostHogSessionReplayHandler { +public final class com/posthog/android/replay/PostHogReplayIntegration : com/posthog/PostHogIntegration, com/posthog/android/replay/PostHogReplayBufferDelegate, com/posthog/internal/replay/PostHogSessionReplayHandler { public static final field ANDROID_COMPOSE_VIEW Ljava/lang/String; public static final field ANDROID_COMPOSE_VIEW_CLASS_NAME Ljava/lang/String; public static final field PH_NO_CAPTURE_LABEL Ljava/lang/String; @@ -70,12 +70,22 @@ public final class com/posthog/android/replay/PostHogReplayIntegration : com/pos public fun (Landroid/content/Context;Lcom/posthog/android/PostHogAndroidConfig;Lcom/posthog/android/internal/MainHandler;)V public fun install (Lcom/posthog/PostHogInterface;)V public fun isActive ()Z + public fun isBuffering ()Z public fun onRemoteConfig ()V + public fun onReplayBufferSnapshot (Lcom/posthog/android/replay/PostHogReplayQueue;)V public fun start (Z)V public fun stop ()V public fun uninstall ()V } +public final class com/posthog/android/replay/PostHogReplayQueue : com/posthog/internal/PostHogQueueInterface { + public fun add (Lcom/posthog/PostHogEvent;)V + public fun clear ()V + public fun flush ()V + public fun start ()V + public fun stop ()V +} + public final class com/posthog/android/replay/PostHogSessionReplayConfig { public fun ()V public fun (Z)V diff --git a/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayBufferQueue.kt b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayBufferQueue.kt index 939a16a4..83c7d4e2 100644 --- a/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayBufferQueue.kt +++ b/posthog-android/src/main/java/com/posthog/android/replay/PostHogReplayBufferQueue.kt @@ -2,6 +2,7 @@ package com.posthog.android.replay import com.posthog.PostHogConfig import com.posthog.PostHogEvent +import com.posthog.internal.PostHogQueue import com.posthog.internal.PostHogQueueInterface import com.posthog.vendor.uuid.TimeBasedEpochGenerator import java.io.File @@ -82,12 +83,25 @@ internal class PostHogReplayBufferQueue( } /** - * Migrates all buffered items to the target queue by deserializing each buffered - * event and re-adding it to the target queue. + * Migrates all buffered items to the target queue. + * + * Migration is supported for [PostHogQueue] targets by moving files on disk + * and reloading the target queue from disk. * * Returns the number of events successfully migrated. */ fun migrateAllTo(targetQueue: PostHogQueueInterface): Int { + if (targetQueue !is PostHogQueue) { + config.logger.log("Replay buffer migration skipped: target queue is not PostHogQueue") + return 0 + } + + val targetDir = targetQueue.queueDirectory + if (targetDir == null) { + config.logger.log("Replay queue has no disk directory configured. Skipping buffer migration.") + return 0 + } + val itemsToMigrate: List = synchronized(itemsLock) { val copy = items.toList() @@ -95,25 +109,36 @@ internal class PostHogReplayBufferQueue( copy } + try { + targetDir.mkdirs() + } catch (e: Throwable) { + config.logger.log("Error creating replay target queue directory: $e") + } + var migratedCount = 0 for (item in itemsToMigrate) { val sourceFile = File(bufferDir, item) + if (!sourceFile.exists()) { + continue + } + val targetFile = File(targetDir, item) try { - val input = config.encryption?.decrypt(sourceFile.inputStream()) ?: sourceFile.inputStream() - input.use { - val event = config.serializer.deserialize(it.reader().buffered()) - if (event != null) { - targetQueue.add(event) - migratedCount++ - } + if (targetFile.exists()) { + sourceFile.delete() + continue + } + + if (sourceFile.renameTo(targetFile)) { + migratedCount++ + } else { + config.logger.log("Failed to move replay buffer item $item") } } catch (e: Throwable) { config.logger.log("Failed to migrate replay buffer item $item: $e") - } finally { - deleteFileSafely(sourceFile) } } + targetQueue.reloadFromDisk() return migratedCount } @@ -124,14 +149,6 @@ internal class PostHogReplayBufferQueue( setup() } - private fun deleteFileSafely(file: File) { - try { - file.delete() - } catch (e: Throwable) { - config.logger.log("Error deleting replay buffer file ${file.name}: $e") - } - } - companion object { /** * Extracts the millisecond epoch timestamp from a UUID v7 filename. diff --git a/posthog-android/src/test/java/com/posthog/android/replay/PostHogReplayBufferQueueTest.kt b/posthog-android/src/test/java/com/posthog/android/replay/PostHogReplayBufferQueueTest.kt index 2f8d8910..abf9247e 100644 --- a/posthog-android/src/test/java/com/posthog/android/replay/PostHogReplayBufferQueueTest.kt +++ b/posthog-android/src/test/java/com/posthog/android/replay/PostHogReplayBufferQueueTest.kt @@ -121,40 +121,20 @@ internal class PostHogReplayBufferQueueTest { } @Test - fun `migrateAllTo moves all items to target queue`() { + fun `migrateAllTo noops when target queue is not PostHogQueue`() { val queue = createQueue() val targetQueue = FakeQueue() queue.add(createEvent("item1")) - queue.add(createEvent("item2")) - queue.add(createEvent("item3")) - val migrated = queue.migrateAllTo(targetQueue) - assertEquals(0, queue.depth) - assertEquals(3, migrated) - assertEquals(3, targetQueue.events.size) - } - - @Test - fun `migrateAllTo preserves event data`() { - val queue = createQueue() - val targetQueue = FakeQueue() - - val names = listOf("first", "second", "third") - names.forEach { - queue.add(createEvent(it)) - Thread.sleep(20) - } - - queue.migrateAllTo(targetQueue) - - assertEquals(3, targetQueue.events.size) - assertEquals(names, targetQueue.events.map { it.event }) + assertEquals(0, migrated) + assertEquals(1, queue.depth) + assertEquals(0, targetQueue.events.size) } @Test - fun `migrateAllTo handles empty buffer gracefully`() { + fun `migrateAllTo noops for non PostHogQueue target when empty`() { val queue = createQueue() val targetQueue = FakeQueue() diff --git a/posthog-android/src/test/java/com/posthog/android/replay/PostHogReplayQueueTest.kt b/posthog-android/src/test/java/com/posthog/android/replay/PostHogReplayQueueTest.kt index 9c467272..ca3b742f 100644 --- a/posthog-android/src/test/java/com/posthog/android/replay/PostHogReplayQueueTest.kt +++ b/posthog-android/src/test/java/com/posthog/android/replay/PostHogReplayQueueTest.kt @@ -7,11 +7,8 @@ import com.posthog.internal.PostHogQueueInterface import org.junit.Rule import org.junit.rules.TemporaryFolder import java.util.UUID -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit import kotlin.test.Test import kotlin.test.assertEquals -import kotlin.test.assertTrue internal class PostHogReplayQueueTest { @get:Rule @@ -55,6 +52,10 @@ internal class PostHogReplayQueueTest { } } + private fun createFakeQueue(): FakeQueue { + return FakeQueue() + } + private fun createReplayQueue( fakeInnerQueue: FakeQueue, storagePrefix: String = tmpDir.newFolder().absolutePath, @@ -74,7 +75,7 @@ internal class PostHogReplayQueueTest { @Test fun `add routes to buffer when delegate isBuffering is true`() { - val fakeInnerQueue = FakeQueue() + val fakeInnerQueue = createFakeQueue() val queue = createReplayQueue(fakeInnerQueue) val delegate = MockReplayBufferDelegate().apply { isBuffering = true } queue.bufferDelegate = delegate @@ -89,7 +90,7 @@ internal class PostHogReplayQueueTest { @Test fun `add routes to inner queue when delegate isBuffering is false`() { - val fakeInnerQueue = FakeQueue() + val fakeInnerQueue = createFakeQueue() val queue = createReplayQueue(fakeInnerQueue) val delegate = MockReplayBufferDelegate().apply { isBuffering = false } queue.bufferDelegate = delegate @@ -104,7 +105,7 @@ internal class PostHogReplayQueueTest { @Test fun `flush is suppressed when buffering`() { - val fakeInnerQueue = FakeQueue() + val fakeInnerQueue = createFakeQueue() val queue = createReplayQueue(fakeInnerQueue) val delegate = MockReplayBufferDelegate().apply { isBuffering = true } queue.bufferDelegate = delegate @@ -118,7 +119,7 @@ internal class PostHogReplayQueueTest { @Test fun `flush delegates to inner queue when not buffering`() { - val fakeInnerQueue = FakeQueue() + val fakeInnerQueue = createFakeQueue() val queue = createReplayQueue(fakeInnerQueue) val delegate = MockReplayBufferDelegate().apply { isBuffering = false } queue.bufferDelegate = delegate @@ -129,8 +130,8 @@ internal class PostHogReplayQueueTest { } @Test - fun `migrateBufferToQueue moves buffered events to inner queue`() { - val fakeInnerQueue = FakeQueue() + fun `migrateBufferToQueue noops when inner queue is not PostHogQueue`() { + val fakeInnerQueue = createFakeQueue() val queue = createReplayQueue(fakeInnerQueue) val delegate = MockReplayBufferDelegate().apply { isBuffering = true } queue.bufferDelegate = delegate @@ -144,14 +145,13 @@ internal class PostHogReplayQueueTest { queue.migrateBufferToQueue() - assertEquals(0, queue.bufferDepth) - assertEquals(3, fakeInnerQueue.events.size) - assertEquals(listOf("snapshot_1", "snapshot_2", "snapshot_3"), fakeInnerQueue.events.map { it.event }) + assertEquals(3, queue.bufferDepth) + assertEquals(0, fakeInnerQueue.events.size) } @Test fun `migrateBufferToQueue handles empty buffer gracefully`() { - val fakeInnerQueue = FakeQueue() + val fakeInnerQueue = createFakeQueue() val queue = createReplayQueue(fakeInnerQueue) queue.migrateBufferToQueue() @@ -162,7 +162,7 @@ internal class PostHogReplayQueueTest { @Test fun `clearBuffer discards all buffered events`() { - val fakeInnerQueue = FakeQueue() + val fakeInnerQueue = createFakeQueue() val queue = createReplayQueue(fakeInnerQueue) val delegate = MockReplayBufferDelegate().apply { isBuffering = true } queue.bufferDelegate = delegate @@ -179,7 +179,7 @@ internal class PostHogReplayQueueTest { @Test fun `clear removes both buffer and inner queue events`() { - val fakeInnerQueue = FakeQueue() + val fakeInnerQueue = createFakeQueue() val queue = createReplayQueue(fakeInnerQueue) val delegate = MockReplayBufferDelegate().apply { isBuffering = false } @@ -199,7 +199,7 @@ internal class PostHogReplayQueueTest { @Test fun `start and stop delegate to inner queue`() { - val fakeInnerQueue = FakeQueue() + val fakeInnerQueue = createFakeQueue() val queue = createReplayQueue(fakeInnerQueue) queue.start() @@ -210,8 +210,8 @@ internal class PostHogReplayQueueTest { } @Test - fun `events go directly to inner queue after migration when buffering disabled`() { - val fakeInnerQueue = FakeQueue() + fun `events go directly to inner queue after buffering disabled even if migration noops`() { + val fakeInnerQueue = createFakeQueue() val queue = createReplayQueue(fakeInnerQueue) val delegate = MockReplayBufferDelegate().apply { isBuffering = true } queue.bufferDelegate = delegate @@ -225,41 +225,15 @@ internal class PostHogReplayQueueTest { queue.add(createTestEvent("direct_1")) - assertEquals(0, queue.bufferDepth) - assertEquals(3, fakeInnerQueue.events.size) - assertTrue(fakeInnerQueue.events.map { it.event }.containsAll(listOf("buffered_1", "buffered_2", "direct_1"))) + assertEquals(2, queue.bufferDepth) + assertEquals(1, fakeInnerQueue.events.size) + assertEquals("direct_1", fakeInnerQueue.events.first().event) } @Test - fun `concurrent snapshot goes direct when delegate flips buffering before migration`() { - val migrationStarted = CountDownLatch(1) - val allowMigrationToContinue = CountDownLatch(1) - - val innerQueue = - object : PostHogQueueInterface { - val events = mutableListOf() - - override fun add(event: PostHogEvent) { - synchronized(events) { events.add(event) } - if (event.event == "buffered_1") { - migrationStarted.countDown() - allowMigrationToContinue.await(2, TimeUnit.SECONDS) - } - } - - override fun flush() {} - - override fun start() {} - - override fun stop() {} - - override fun clear() { - synchronized(events) { events.clear() } - } - } - - val config = PostHogConfig(API_KEY, "http://localhost:9001") - val queue = PostHogReplayQueue(config, innerQueue, tmpDir.newFolder().absolutePath) + fun `delegate can disable buffering even if migration noops`() { + val innerQueue = createFakeQueue() + val queue = createReplayQueue(innerQueue) val delegate = object : PostHogReplayBufferDelegate { @@ -275,23 +249,11 @@ internal class PostHogReplayQueueTest { queue.bufferDelegate = delegate queue.add(createTestEvent("buffered_1")) + queue.add(createTestEvent("buffered_2")) + queue.add(createTestEvent("direct_after_disable")) - val migrationThread = - Thread { - queue.add(createTestEvent("buffered_2")) - }.apply { start() } - - assertTrue(migrationStarted.await(2, TimeUnit.SECONDS)) - - queue.add(createTestEvent("concurrent_direct")) - - allowMigrationToContinue.countDown() - migrationThread.join(2000) - - assertEquals(0, queue.bufferDepth) - synchronized(innerQueue.events) { - assertEquals(3, innerQueue.events.size) - assertTrue(innerQueue.events.map { it.event }.containsAll(listOf("buffered_1", "buffered_2", "concurrent_direct"))) - } + assertEquals(2, queue.bufferDepth) + assertEquals(1, innerQueue.events.size) + assertEquals("direct_after_disable", innerQueue.events.first().event) } } diff --git a/posthog/api/posthog.api b/posthog/api/posthog.api index de5bd199..2db86885 100644 --- a/posthog/api/posthog.api +++ b/posthog/api/posthog.api @@ -817,6 +817,18 @@ public final class com/posthog/internal/PostHogPrintLogger : com/posthog/interna public fun log (Ljava/lang/String;)V } +public final class com/posthog/internal/PostHogQueue : com/posthog/internal/PostHogQueueInterface { + public fun (Lcom/posthog/PostHogConfig;Lcom/posthog/internal/PostHogApi;Lcom/posthog/internal/PostHogApiEndpoint;Ljava/lang/String;Ljava/util/concurrent/ExecutorService;)V + public fun add (Lcom/posthog/PostHogEvent;)V + public fun clear ()V + public fun flush ()V + public final fun getDequeList ()Ljava/util/List; + public final fun getQueueDirectory ()Ljava/io/File; + public final fun reloadFromDisk ()V + public fun start ()V + public fun stop ()V +} + public abstract interface class com/posthog/internal/PostHogQueueInterface { public abstract fun add (Lcom/posthog/PostHogEvent;)V public abstract fun clear ()V @@ -838,6 +850,7 @@ public final class com/posthog/internal/PostHogRemoteConfig : com/posthog/intern public fun getFeatureFlagResult (Ljava/lang/String;Ljava/lang/String;Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;)Lcom/posthog/FeatureFlagResult; public fun getFeatureFlags (Ljava/lang/String;Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;)Ljava/util/Map; public final fun getFlagDetails (Ljava/lang/String;)Lcom/posthog/internal/FeatureFlag; + public final fun getRecordingMinimumDurationMs ()Ljava/lang/Long; public fun getRequestId (Ljava/lang/String;Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;)Ljava/lang/String; public final fun getSessionRecordingSampleRate ()Ljava/lang/Double; public final fun getSurveys ()Ljava/util/List; diff --git a/posthog/src/main/java/com/posthog/internal/PostHogQueue.kt b/posthog/src/main/java/com/posthog/internal/PostHogQueue.kt index b9e6d12d..2bd2c80e 100644 --- a/posthog/src/main/java/com/posthog/internal/PostHogQueue.kt +++ b/posthog/src/main/java/com/posthog/internal/PostHogQueue.kt @@ -56,6 +56,9 @@ public class PostHogQueue( private val delay: Long get() = (config.flushIntervalSeconds * 1000).toLong() + public val queueDirectory: File? + get() = storagePrefix?.let { File(it, config.apiKey) } + private fun addEventSync(event: PostHogEvent): Boolean { storagePrefix?.let { val dir = File(it, config.apiKey) @@ -409,31 +412,53 @@ public class PostHogQueue( * with any new events added after SDK start. */ private fun loadCachedEvents() { - storagePrefix?.let { - val dir = File(it, config.apiKey) + val files = loadQueueFilesFromDisk() + if (files.isEmpty()) return - if (!dir.existsSafely(config)) { - return - } + synchronized(dequeLock) { + // prepend cached files before any events already in the deque + // so that older events are sent first + val existingFiles = deque.toList() + deque.clear() + deque.addAll(files) + deque.addAll(existingFiles) + } + config.logger.log("Loaded ${files.size} cached events from disk for $endpoint.") + } - val files = (dir.listFiles() ?: emptyArray()).toMutableList() + private fun loadQueueFilesFromDisk(): List { + val dir = queueDirectory ?: return emptyList() - if (files.isEmpty()) return + if (!dir.existsSafely(config)) { + return emptyList() + } - // sort by last modified date ascending so events are sent in order - files.sortBy { file -> file.lastModified() } + val files = (dir.listFiles() ?: emptyArray()).toMutableList() + if (files.isEmpty()) { + return emptyList() + } - if (files.isNotEmpty()) { - synchronized(dequeLock) { - // prepend cached files before any events already in the deque - // so that older events are sent first - val existingFiles = deque.toList() - deque.clear() - deque.addAll(files) - deque.addAll(existingFiles) - } - config.logger.log("Loaded ${files.size} cached events from disk for $endpoint.") - } + // sort by last modified date ascending so events are sent in order + files.sortBy { file -> file.lastModified() } + return files + } + + private fun reloadFromDiskSync() { + val files = loadQueueFilesFromDisk() + synchronized(dequeLock) { + deque.clear() + deque.addAll(files) + } + cachedEventsLoaded = true + } + + /** + * Rebuilds the in-memory deque from disk, sorted by file last-modified time. + */ + @PostHogInternal + public fun reloadFromDisk() { + executor.submitSyncSafely { + reloadFromDiskSync() } }