|
| 1 | +@file:OptIn(ExperimentalCoroutinesApi::class) |
| 2 | + |
| 3 | +package com.flipcash.shared.chat |
| 4 | + |
| 5 | +import androidx.compose.runtime.staticCompositionLocalOf |
| 6 | +import androidx.lifecycle.DefaultLifecycleObserver |
| 7 | +import androidx.lifecycle.LifecycleOwner |
| 8 | +import androidx.lifecycle.ProcessLifecycleOwner |
| 9 | +import com.flipcash.app.persistence.sources.ChatMemberDataSource |
| 10 | +import com.flipcash.app.persistence.sources.ChatMessageDataSource |
| 11 | +import com.flipcash.app.persistence.sources.ChatMetadataDataSource |
| 12 | +import com.flipcash.services.controllers.ChatController |
| 13 | +import com.flipcash.services.controllers.ChatMessagingController |
| 14 | +import com.flipcash.services.controllers.EventStreamingController |
| 15 | +import com.flipcash.services.models.chat.ChatId |
| 16 | +import com.flipcash.services.models.chat.ChatMessage |
| 17 | +import com.flipcash.services.models.chat.ChatUpdate |
| 18 | +import com.flipcash.services.models.chat.MessageContent |
| 19 | +import com.flipcash.services.models.chat.MetadataUpdate |
| 20 | +import com.flipcash.services.models.chat.PointerType |
| 21 | +import com.flipcash.services.models.chat.TypingNotification |
| 22 | +import com.flipcash.services.models.chat.TypingState |
| 23 | +import com.flipcash.services.user.UserManager |
| 24 | +import com.getcode.opencode.model.accounts.AccountCluster |
| 25 | +import com.getcode.opencode.providers.SessionListener |
| 26 | +import com.getcode.utils.TraceType |
| 27 | +import com.getcode.utils.network.NetworkConnectivityListener |
| 28 | +import com.getcode.utils.trace |
| 29 | +import kotlinx.coroutines.CoroutineScope |
| 30 | +import kotlinx.coroutines.Dispatchers |
| 31 | +import kotlinx.coroutines.ExperimentalCoroutinesApi |
| 32 | +import kotlinx.coroutines.Job |
| 33 | +import kotlinx.coroutines.SupervisorJob |
| 34 | +import kotlinx.coroutines.flow.Flow |
| 35 | +import kotlinx.coroutines.flow.MutableStateFlow |
| 36 | +import kotlinx.coroutines.flow.StateFlow |
| 37 | +import kotlinx.coroutines.flow.asStateFlow |
| 38 | +import kotlinx.coroutines.flow.debounce |
| 39 | +import kotlinx.coroutines.flow.distinctUntilChanged |
| 40 | +import kotlinx.coroutines.flow.filter |
| 41 | +import kotlinx.coroutines.flow.filterNotNull |
| 42 | +import kotlinx.coroutines.flow.firstOrNull |
| 43 | +import kotlinx.coroutines.flow.flatMapLatest |
| 44 | +import kotlinx.coroutines.flow.launchIn |
| 45 | +import kotlinx.coroutines.flow.map |
| 46 | +import kotlinx.coroutines.flow.onEach |
| 47 | +import kotlinx.coroutines.flow.update |
| 48 | +import kotlinx.coroutines.launch |
| 49 | +import javax.inject.Inject |
| 50 | +import javax.inject.Singleton |
| 51 | +import kotlin.time.Clock |
| 52 | +import kotlin.time.Duration.Companion.seconds |
| 53 | + |
| 54 | +@Singleton |
| 55 | +class ChatCoordinator @Inject constructor( |
| 56 | + private val chatController: ChatController, |
| 57 | + private val messagingController: ChatMessagingController, |
| 58 | + private val eventStreamingController: EventStreamingController, |
| 59 | + private val metadataDataSource: ChatMetadataDataSource, |
| 60 | + private val messageDataSource: ChatMessageDataSource, |
| 61 | + private val memberDataSource: ChatMemberDataSource, |
| 62 | + private val networkObserver: NetworkConnectivityListener, |
| 63 | + private val userManager: UserManager, |
| 64 | +) : SessionListener, DefaultLifecycleObserver { |
| 65 | + |
| 66 | + companion object { |
| 67 | + private const val TAG = "ChatCoordinator" |
| 68 | + } |
| 69 | + |
| 70 | + private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) |
| 71 | + private val cluster = MutableStateFlow<AccountCluster?>(null) |
| 72 | + private val _state = MutableStateFlow(ChatState()) |
| 73 | + |
| 74 | + private var syncJob: Job? = null |
| 75 | + private var eventStreamCollectJob: Job? = null |
| 76 | + |
| 77 | + val state: StateFlow<ChatState> |
| 78 | + get() = _state.asStateFlow() |
| 79 | + |
| 80 | + val feed: Flow<List<ChatSummary>> |
| 81 | + get() = _state.map { state -> |
| 82 | + state.feed.map { metadata -> |
| 83 | + val readPointer = metadata.members |
| 84 | + .firstOrNull { it.userId == userManager.accountId } |
| 85 | + ?.pointers |
| 86 | + ?.firstOrNull { it.type == PointerType.READ } |
| 87 | + ?.value ?: 0L |
| 88 | + |
| 89 | + val unreadCount = metadata.lastMessage?.let { lastMsg -> |
| 90 | + if (lastMsg.messageId > readPointer) 1 else 0 |
| 91 | + } ?: 0 |
| 92 | + |
| 93 | + ChatSummary(metadata = metadata, unreadCount = unreadCount) |
| 94 | + } |
| 95 | + } |
| 96 | + |
| 97 | + // region SessionListener |
| 98 | + |
| 99 | + override suspend fun onUserLoggedIn(cluster: AccountCluster) { |
| 100 | + trace(tag = TAG, message = "User logged in, hydrating chat", type = TraceType.User) |
| 101 | + this.cluster.value = cluster |
| 102 | + hydrateFromPersistence() |
| 103 | + } |
| 104 | + |
| 105 | + // endregion |
| 106 | + |
| 107 | + // region Lifecycle |
| 108 | + |
| 109 | + init { |
| 110 | + ProcessLifecycleOwner.get().lifecycle.addObserver(this) |
| 111 | + |
| 112 | + cluster.filterNotNull() |
| 113 | + .flatMapLatest { networkObserver.state } |
| 114 | + .distinctUntilChanged() |
| 115 | + .filter { it.connected } |
| 116 | + .debounce(1.seconds) |
| 117 | + .onEach { |
| 118 | + trace(tag = TAG, message = "Network connected, re-syncing chat feed", type = TraceType.Process) |
| 119 | + syncFeed() |
| 120 | + openEventStream() |
| 121 | + } |
| 122 | + .launchIn(scope) |
| 123 | + } |
| 124 | + |
| 125 | + override fun onStart(owner: LifecycleOwner) { |
| 126 | + if (cluster.value != null) { |
| 127 | + trace(tag = TAG, message = "Lifecycle resumed, syncing chat feed", type = TraceType.Process) |
| 128 | + syncFeed() |
| 129 | + openEventStream() |
| 130 | + } |
| 131 | + } |
| 132 | + |
| 133 | + override fun onStop(owner: LifecycleOwner) { |
| 134 | + closeEventStream() |
| 135 | + } |
| 136 | + |
| 137 | + // endregion |
| 138 | + |
| 139 | + // region Public API |
| 140 | + |
| 141 | + fun observeMessages(chatId: ChatId): Flow<List<ChatMessage>> { |
| 142 | + return messageDataSource.observeMessages(chatId) |
| 143 | + } |
| 144 | + |
| 145 | + fun observeTypingIndicators(chatId: ChatId): Flow<Set<ActiveTypist>> { |
| 146 | + return _state.map { it.typingIndicators[chatId] ?: emptySet() } |
| 147 | + } |
| 148 | + |
| 149 | + suspend fun loadMessages(chatId: ChatId, limit: Int = 100) { |
| 150 | + messagingController.getMessages(chatId) |
| 151 | + .onSuccess { messages -> |
| 152 | + messageDataSource.upsert(chatId, messages) |
| 153 | + } |
| 154 | + } |
| 155 | + |
| 156 | + suspend fun sendMessage(chatId: ChatId, content: List<MessageContent>): Result<ChatMessage> { |
| 157 | + val senderId = userManager.accountId |
| 158 | + ?: return Result.failure(IllegalStateException("Cannot send message without an account")) |
| 159 | + |
| 160 | + val (_, clientMessageId) = messageDataSource.insertPending( |
| 161 | + chatId = chatId, |
| 162 | + content = content, |
| 163 | + senderId = senderId, |
| 164 | + ) |
| 165 | + |
| 166 | + return messagingController.sendMessage(chatId, content, clientMessageId) |
| 167 | + .onSuccess { serverMessage -> |
| 168 | + messageDataSource.confirmPending(chatId, clientMessageId, serverMessage.messageId) |
| 169 | + } |
| 170 | + .onFailure { |
| 171 | + messageDataSource.failPending(chatId, clientMessageId) |
| 172 | + } |
| 173 | + } |
| 174 | + |
| 175 | + suspend fun advanceReadPointer(chatId: ChatId, messageId: Long): Result<Unit> { |
| 176 | + return messagingController.advancePointer(chatId, PointerType.READ, messageId) |
| 177 | + } |
| 178 | + |
| 179 | + suspend fun notifyTyping(chatId: ChatId, typingState: TypingState): Result<Unit> { |
| 180 | + return messagingController.notifyIsTyping(chatId, typingState) |
| 181 | + } |
| 182 | + |
| 183 | + suspend fun reset() { |
| 184 | + closeEventStream() |
| 185 | + syncJob?.cancel() |
| 186 | + _state.value = ChatState() |
| 187 | + cluster.value = null |
| 188 | + metadataDataSource.clear() |
| 189 | + messageDataSource.clear() |
| 190 | + memberDataSource.clear() |
| 191 | + trace(tag = TAG, message = "reset complete", type = TraceType.Process) |
| 192 | + } |
| 193 | + |
| 194 | + // endregion |
| 195 | + |
| 196 | + // region Internal |
| 197 | + |
| 198 | + private suspend fun hydrateFromPersistence() { |
| 199 | + val entities = metadataDataSource.observeAll().firstOrNull() ?: return |
| 200 | + if (entities.isEmpty()) return |
| 201 | + |
| 202 | + val feed = entities.map { entity -> |
| 203 | + val members = memberDataSource.getMembersForChat(entity.chatIdHex) |
| 204 | + val lastMessage = entity.lastMessageId?.let { |
| 205 | + messageDataSource.getLatest(entity.chatIdHex) |
| 206 | + } |
| 207 | + metadataDataSource.toMetadata(entity, members, lastMessage) |
| 208 | + } |
| 209 | + |
| 210 | + _state.update { it.copy(feed = feed) } |
| 211 | + trace(tag = TAG, message = "Hydrated ${feed.size} chats from persistence", type = TraceType.Process) |
| 212 | + } |
| 213 | + |
| 214 | + private fun syncFeed() { |
| 215 | + syncJob?.cancel() |
| 216 | + syncJob = scope.launch { performFeedSync() } |
| 217 | + } |
| 218 | + |
| 219 | + private suspend fun performFeedSync() { |
| 220 | + _state.update { it.copy(feedSyncState = FeedSyncState.Syncing) } |
| 221 | + chatController.getDmChatFeed() |
| 222 | + .onSuccess { page -> |
| 223 | + metadataDataSource.upsert(page.chats) |
| 224 | + |
| 225 | + for (chat in page.chats) { |
| 226 | + memberDataSource.upsert(chat.chatId, chat.members) |
| 227 | + } |
| 228 | + |
| 229 | + _state.update { it.copy(feed = page.chats, feedSyncState = FeedSyncState.Synced) } |
| 230 | + trace(tag = TAG, message = "Feed synced: ${page.chats.size} chats", type = TraceType.Process) |
| 231 | + } |
| 232 | + .onFailure { error -> |
| 233 | + _state.update { it.copy(feedSyncState = FeedSyncState.Error) } |
| 234 | + trace(tag = TAG, message = "Feed sync failed: ${error.message}", type = TraceType.Error) |
| 235 | + } |
| 236 | + } |
| 237 | + |
| 238 | + private fun openEventStream() { |
| 239 | + eventStreamingController.open(scope) |
| 240 | + eventStreamCollectJob?.cancel() |
| 241 | + eventStreamCollectJob = scope.launch { |
| 242 | + eventStreamingController.chatUpdates.collect { applyUpdate(it) } |
| 243 | + } |
| 244 | + } |
| 245 | + |
| 246 | + private fun closeEventStream() { |
| 247 | + eventStreamCollectJob?.cancel() |
| 248 | + eventStreamCollectJob = null |
| 249 | + eventStreamingController.close() |
| 250 | + } |
| 251 | + |
| 252 | + private suspend fun applyUpdate(update: ChatUpdate) { |
| 253 | + val chatId = update.chatId |
| 254 | + |
| 255 | + // New messages |
| 256 | + if (update.newMessages.isNotEmpty()) { |
| 257 | + messageDataSource.upsert(chatId, update.newMessages) |
| 258 | + |
| 259 | + val lastMsg = update.newMessages.maxByOrNull { it.messageId } |
| 260 | + if (lastMsg != null) { |
| 261 | + metadataDataSource.updateLastMessageId(chatId, lastMsg.messageId) |
| 262 | + metadataDataSource.updateLastActivity(chatId, lastMsg.timestamp.toEpochMilliseconds()) |
| 263 | + } |
| 264 | + } |
| 265 | + |
| 266 | + // Pointer updates |
| 267 | + for (pointer in update.pointerUpdates) { |
| 268 | + memberDataSource.updatePointers(chatId, pointer) |
| 269 | + } |
| 270 | + |
| 271 | + // Typing notifications (ephemeral, in-memory only) |
| 272 | + if (update.typingNotifications.isNotEmpty()) { |
| 273 | + _state.update { state -> |
| 274 | + val currentTypists = state.typingIndicators[chatId]?.toMutableSet() ?: mutableSetOf() |
| 275 | + for (notification in update.typingNotifications) { |
| 276 | + applyTypingNotification(currentTypists, notification) |
| 277 | + } |
| 278 | + state.copy( |
| 279 | + typingIndicators = state.typingIndicators + (chatId to currentTypists.toSet()) |
| 280 | + ) |
| 281 | + } |
| 282 | + } |
| 283 | + |
| 284 | + // Metadata updates |
| 285 | + for (metaUpdate in update.metadataUpdates) { |
| 286 | + when (metaUpdate) { |
| 287 | + is MetadataUpdate.FullRefresh -> { |
| 288 | + metadataDataSource.upsert(metaUpdate.metadata) |
| 289 | + memberDataSource.deleteForChat(metaUpdate.metadata.chatId) |
| 290 | + memberDataSource.upsert(metaUpdate.metadata.chatId, metaUpdate.metadata.members) |
| 291 | + |
| 292 | + _state.update { state -> |
| 293 | + val updatedFeed = state.feed.map { |
| 294 | + if (it.chatId == metaUpdate.metadata.chatId) metaUpdate.metadata else it |
| 295 | + } |
| 296 | + state.copy(feed = updatedFeed) |
| 297 | + } |
| 298 | + } |
| 299 | + |
| 300 | + is MetadataUpdate.LastActivityChanged -> { |
| 301 | + metadataDataSource.updateLastActivity( |
| 302 | + chatId, |
| 303 | + metaUpdate.newLastActivity.toEpochMilliseconds(), |
| 304 | + ) |
| 305 | + } |
| 306 | + } |
| 307 | + } |
| 308 | + } |
| 309 | + |
| 310 | + private fun applyTypingNotification( |
| 311 | + typists: MutableSet<ActiveTypist>, |
| 312 | + notification: TypingNotification, |
| 313 | + ) { |
| 314 | + when (notification.state) { |
| 315 | + TypingState.STARTED_TYPING, TypingState.STILL_TYPING -> { |
| 316 | + typists.removeAll { it.userId == notification.userId } |
| 317 | + typists.add(ActiveTypist(userId = notification.userId, since = Clock.System.now())) |
| 318 | + } |
| 319 | + TypingState.STOPPED_TYPING, TypingState.TYPING_TIMED_OUT -> { |
| 320 | + typists.removeAll { it.userId == notification.userId } |
| 321 | + } |
| 322 | + TypingState.UNKNOWN -> Unit |
| 323 | + } |
| 324 | + } |
| 325 | + |
| 326 | + // endregion |
| 327 | +} |
0 commit comments