diff --git a/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/ChatCoordinator.kt b/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/ChatCoordinator.kt index a8808fa5b..61f9e87e7 100644 --- a/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/ChatCoordinator.kt +++ b/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/ChatCoordinator.kt @@ -19,7 +19,6 @@ import com.flipcash.services.controllers.ChatController import com.flipcash.services.controllers.ChatMessagingController import com.flipcash.services.controllers.EventStreamingController import com.flipcash.services.models.chat.ChatId -import com.flipcash.services.models.chat.ChatMember import com.flipcash.services.models.chat.ChatMessage import com.flipcash.services.models.chat.MessagePointer import com.flipcash.services.models.chat.ChatUpdate @@ -59,6 +58,7 @@ import kotlinx.coroutines.launch import javax.inject.Inject import javax.inject.Singleton import kotlin.time.Clock +import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.seconds @Singleton @@ -78,6 +78,8 @@ class ChatCoordinator @Inject constructor( companion object { private const val TAG = "ChatCoordinator" private val HEARTBEAT_INTERVAL = 30.seconds + private const val RETRY_BASE_MS = 2_000L + private const val RETRY_MAX_MS = 60_000L } private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) @@ -87,6 +89,7 @@ class ChatCoordinator @Inject constructor( private var eventStreamCollectJob: Job? = null private var eventStreamRetryJob: Job? = null private var heartbeatJob: Job? = null + private var retryAttempt = 0 val state: StateFlow get() = _state.asStateFlow() @@ -339,21 +342,34 @@ class ChatCoordinator @Inject constructor( } } - private fun openEventStream() { + private fun openEventStream(force: Boolean = false) { + if (!force && eventStreamingController.isConnected) { + trace(tag = TAG, message = "Event stream already connected, skipping open", type = TraceType.Process) + ensureCollector() + return + } + eventStreamRetryJob?.cancel() val opened = eventStreamingController.open(scope) { - // Stream died after exhausting retries — schedule a re-open + // Stream died — schedule a re-open with exponential backoff + val attempt = retryAttempt++ + val delayMs = (RETRY_BASE_MS * (1L shl attempt.coerceAtMost(5))) + .coerceAtMost(RETRY_MAX_MS) + trace(tag = TAG, message = "Event stream error, retry #${attempt + 1} in ${delayMs}ms", type = TraceType.Process) eventStreamRetryJob = scope.launch { - delay(5_000) - trace(tag = TAG, message = "Retrying event stream after failure", type = TraceType.Process) - openEventStream() + delay(delayMs.milliseconds) + openEventStream(force = true) } } - if (!opened) { + if (opened) { + retryAttempt = 0 + } else { trace(tag = TAG, message = "Event stream failed to open", type = TraceType.Error) } - // Always ensure a collector is running so events are processed - // as soon as the stream (re)connects. + ensureCollector() + } + + private fun ensureCollector() { if (eventStreamCollectJob?.isActive != true) { eventStreamCollectJob = scope.launch { eventStreamingController.chatUpdates.collect { applyUpdate(it) } @@ -374,10 +390,9 @@ class ChatCoordinator @Inject constructor( heartbeatJob = scope.launch { while (true) { delay(HEARTBEAT_INTERVAL) - trace(tag = TAG, message = "Heartbeat: syncing feed", type = TraceType.Process) - syncFeed() if (!eventStreamingController.isConnected) { - trace(tag = TAG, message = "Heartbeat: event stream dead, reconnecting", type = TraceType.Process) + trace(tag = TAG, message = "Heartbeat: event stream dead, syncing feed and reconnecting", type = TraceType.Process) + syncFeed() openEventStream() } } @@ -397,45 +412,66 @@ class ChatCoordinator @Inject constructor( type = TraceType.Process, ) - // New messages - if (update.newMessages.isNotEmpty()) { + // --- Persist to DB first (suspend, off main thread) --- + + val lastMsg = if (update.newMessages.isNotEmpty()) { trace(tag = TAG, message = "Upserting ${update.newMessages.size} new messages for $chatId", type = TraceType.Process) messageDataSource.upsert(chatId, update.newMessages) + update.newMessages.maxByOrNull { it.messageId }?.also { msg -> + metadataDataSource.updateLastMessageId(chatId, msg.messageId) + metadataDataSource.updateLastActivity(chatId, msg.timestamp.toEpochMilliseconds()) + } + } else null - val lastMsg = update.newMessages.maxByOrNull { it.messageId } - if (lastMsg != null) { - metadataDataSource.updateLastMessageId(chatId, lastMsg.messageId) - metadataDataSource.updateLastActivity(chatId, lastMsg.timestamp.toEpochMilliseconds()) + for (pointer in update.pointerUpdates) { + memberDataSource.updatePointers(chatId, pointer) + } - _state.update { state -> - val exists = state.feed.any { it.chatId == chatId } - val updatedFeed = if (exists) { - state.feed.map { meta -> - if (meta.chatId == chatId) { - meta.copy( - lastMessage = lastMsg, - lastActivity = lastMsg.timestamp, - ) - } else meta - } - } else { - // New chat not yet in feed — trigger a sync to pick up full metadata - syncFeed() - state.feed - } - state.copy(feed = updatedFeed) + for (metaUpdate in update.metadataUpdates) { + when (metaUpdate) { + is MetadataUpdate.FullRefresh -> { + metadataDataSource.upsert(metaUpdate.metadata) + memberDataSource.deleteForChat(metaUpdate.metadata.chatId) + memberDataSource.upsert(metaUpdate.metadata.chatId, metaUpdate.metadata.members) + } + is MetadataUpdate.LastActivityChanged -> { + metadataDataSource.updateLastActivity( + chatId, + metaUpdate.newLastActivity.toEpochMilliseconds(), + ) } } } - // Pointer updates - for (pointer in update.pointerUpdates) { - memberDataSource.updatePointers(chatId, pointer) - } + // --- Single atomic state update for all in-memory changes --- + + var needsFeedSync = false - if (update.pointerUpdates.isNotEmpty()) { - _state.update { state -> - val updatedFeed = state.feed.map { meta -> + _state.update { state -> + var feed = state.feed + var typingIndicators = state.typingIndicators + + // New messages → update feed last message + if (lastMsg != null) { + val exists = feed.any { it.chatId == chatId } + feed = if (exists) { + feed.map { meta -> + if (meta.chatId == chatId) { + meta.copy( + lastMessage = lastMsg, + lastActivity = lastMsg.timestamp, + ) + } else meta + } + } else { + needsFeedSync = true + feed + } + } + + // Pointer updates → merge into member pointers + if (update.pointerUpdates.isNotEmpty()) { + feed = feed.map { meta -> if (meta.chatId == chatId) { meta.copy(members = meta.members.map { member -> val memberPointerUpdates = update.pointerUpdates @@ -451,51 +487,37 @@ class ChatCoordinator @Inject constructor( }) } else meta } - state.copy(feed = updatedFeed) } - } - // Typing notifications (ephemeral, in-memory only) - if (update.typingNotifications.isNotEmpty()) { - _state.update { state -> - val currentTypists = state.typingIndicators[chatId]?.toMutableSet() ?: mutableSetOf() + // Typing notifications (ephemeral) + if (update.typingNotifications.isNotEmpty()) { + val currentTypists = typingIndicators[chatId]?.toMutableSet() ?: mutableSetOf() for (notification in update.typingNotifications) { applyTypingNotification(currentTypists, notification) } - state.copy( - typingIndicators = state.typingIndicators + (chatId to currentTypists.toSet()) - ) + typingIndicators = typingIndicators + (chatId to currentTypists.toSet()) } - } - - // Metadata updates - for (metaUpdate in update.metadataUpdates) { - when (metaUpdate) { - is MetadataUpdate.FullRefresh -> { - metadataDataSource.upsert(metaUpdate.metadata) - memberDataSource.deleteForChat(metaUpdate.metadata.chatId) - memberDataSource.upsert(metaUpdate.metadata.chatId, metaUpdate.metadata.members) - _state.update { state -> - val exists = state.feed.any { it.chatId == metaUpdate.metadata.chatId } - val updatedFeed = if (exists) { - state.feed.map { - if (it.chatId == metaUpdate.metadata.chatId) metaUpdate.metadata else it - } - } else { - state.feed + metaUpdate.metadata + // Metadata full refreshes + for (metaUpdate in update.metadataUpdates) { + if (metaUpdate is MetadataUpdate.FullRefresh) { + val exists = feed.any { it.chatId == metaUpdate.metadata.chatId } + feed = if (exists) { + feed.map { + if (it.chatId == metaUpdate.metadata.chatId) metaUpdate.metadata else it } - state.copy(feed = updatedFeed) + } else { + feed + metaUpdate.metadata } } - - is MetadataUpdate.LastActivityChanged -> { - metadataDataSource.updateLastActivity( - chatId, - metaUpdate.newLastActivity.toEpochMilliseconds(), - ) - } } + + state.copy(feed = feed, typingIndicators = typingIndicators) + } + + // Side effects after state update + if (needsFeedSync) { + syncFeed() } } diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/controllers/EventStreamingController.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/controllers/EventStreamingController.kt index c1f76c462..47607a2a9 100644 --- a/services/flipcash/src/main/kotlin/com/flipcash/services/controllers/EventStreamingController.kt +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/controllers/EventStreamingController.kt @@ -6,9 +6,9 @@ import com.flipcash.services.repository.EventStreamingRepository import com.flipcash.services.user.UserManager import com.getcode.utils.trace import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.SharedFlow -import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.receiveAsFlow import javax.inject.Inject import javax.inject.Singleton @@ -17,12 +17,8 @@ class EventStreamingController @Inject constructor( private val repository: EventStreamingRepository, private val userManager: UserManager, ) { - private val _chatUpdates = MutableSharedFlow( - replay = 0, - extraBufferCapacity = 64, - onBufferOverflow = kotlinx.coroutines.channels.BufferOverflow.DROP_OLDEST, - ) - val chatUpdates: SharedFlow = _chatUpdates.asSharedFlow() + private val _chatUpdates = Channel(capacity = Channel.UNLIMITED) + val chatUpdates: Flow = _chatUpdates.receiveAsFlow() private var streamRef: EventStreamReference? = null @@ -44,7 +40,7 @@ class EventStreamingController @Inject constructor( owner = owner, onEvent = { update -> trace("EventStreamingController: Received chat update, messages=${update.newMessages.size}") - _chatUpdates.tryEmit(update) + _chatUpdates.trySend(update) }, onError = { error -> trace("EventStreamingController: Stream error: ${error.message}") diff --git a/services/flipcash/src/test/kotlin/com/flipcash/services/controllers/EventStreamingControllerTest.kt b/services/flipcash/src/test/kotlin/com/flipcash/services/controllers/EventStreamingControllerTest.kt index c844c836b..24b2eedd8 100644 --- a/services/flipcash/src/test/kotlin/com/flipcash/services/controllers/EventStreamingControllerTest.kt +++ b/services/flipcash/src/test/kotlin/com/flipcash/services/controllers/EventStreamingControllerTest.kt @@ -66,7 +66,7 @@ class EventStreamingControllerTest { } @Test - fun `chatUpdates SharedFlow is accessible`() { + fun `chatUpdates flow is accessible`() { assertNotNull(controller.chatUpdates) } }