Skip to content

Commit 55f809e

Browse files
committed
refactor(chat): replace manual in-memory feed with DB-reactive flow
The chat feed is now driven by Room reactive queries instead of manual _state.update patches. ChatCoordinator combines metadata and member observeAll() flows to build the feed, so any DB write (from feed sync, event stream, push notification, or sendMessage) automatically updates the contact list previews. - Add observeAll() to ChatMemberDao/DataSource - Add exists() to ChatMetadataDataSource - Add observeFeedFromDb() replacing hydrateFromPersistence() - Store lastMessage during feed sync and FullRefresh updates - Remove manual feed patching from applyUpdate, sendMessage, advanceReadPointer - Update loadMessages to write metadata so previews refresh
1 parent c02adf7 commit 55f809e

4 files changed

Lines changed: 62 additions & 108 deletions

File tree

apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/ChatCoordinator.kt

Lines changed: 50 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@ import com.flipcash.app.persistence.sources.mediator.ChatMessageRemoteMediator
1919
import com.flipcash.app.persistence.sources.ChatMessageDataSource
2020
import com.flipcash.app.persistence.sources.ChatMetadataDataSource
2121
import com.flipcash.app.persistence.sources.ContactDataSource
22+
import com.flipcash.app.persistence.entities.ChatMetadataEntity
2223
import com.flipcash.services.controllers.ChatController
2324
import com.flipcash.services.controllers.ChatMessagingController
2425
import com.flipcash.services.controllers.EventStreamingController
2526
import com.flipcash.services.models.chat.ChatId
27+
import com.flipcash.services.models.chat.ChatMetadata
2628
import com.flipcash.services.models.chat.ChatMember
2729
import com.flipcash.services.models.chat.ChatMessage
2830
import com.flipcash.services.models.chat.MessagePointer
@@ -54,7 +56,6 @@ import kotlinx.coroutines.flow.debounce
5456
import kotlinx.coroutines.flow.distinctUntilChanged
5557
import kotlinx.coroutines.flow.filter
5658
import kotlinx.coroutines.flow.filterNotNull
57-
import kotlinx.coroutines.flow.firstOrNull
5859
import kotlinx.coroutines.flow.flatMapLatest
5960
import kotlinx.coroutines.flow.launchIn
6061
import kotlinx.coroutines.flow.map
@@ -96,6 +97,7 @@ class ChatCoordinator @Inject constructor(
9697
private var flagObserverJob: Job? = null
9798
private var eventStreamCollectJob: Job? = null
9899
private var eventStreamRetryJob: Job? = null
100+
private var feedObserverJob: Job? = null
99101
private var heartbeatJob: Job? = null
100102
private var retryAttempt = 0
101103
private var backgroundedActiveChat: ChatId? = null
@@ -135,7 +137,7 @@ class ChatCoordinator @Inject constructor(
135137
override suspend fun onUserLoggedIn(cluster: AccountCluster) {
136138
trace(tag = TAG, message = "User logged in, hydrating chat", type = TraceType.User)
137139
this.cluster.value = cluster
138-
hydrateFromPersistence()
140+
observeFeedFromDb()
139141
if (isChatEnabled()) {
140142
syncFeed()
141143
openEventStream()
@@ -241,6 +243,10 @@ class ChatCoordinator @Inject constructor(
241243
messagingController.getMessages(chatId)
242244
.onSuccess { messages ->
243245
messageDataSource.upsert(chatId, messages)
246+
247+
val latest = messages.maxByOrNull { it.messageId } ?: return@onSuccess
248+
metadataDataSource.updateLastMessageId(chatId, latest.messageId)
249+
metadataDataSource.updateLastActivity(chatId, latest.timestamp.toEpochMilliseconds())
244250
}
245251
}
246252

@@ -260,20 +266,9 @@ class ChatCoordinator @Inject constructor(
260266
messageDataSource.confirmPending(chatId, clientMessageId, serverMessage)
261267
advanceReadPointer(chatId, serverMessage.messageId)
262268

263-
// Update feed metadata so the contact list shows the latest message
269+
// Update feed metadata — reactive flow picks up the change
264270
metadataDataSource.updateLastMessageId(chatId, serverMessage.messageId)
265271
metadataDataSource.updateLastActivity(chatId, serverMessage.timestamp.toEpochMilliseconds())
266-
_state.update { state ->
267-
val updatedFeed = state.feed.map { meta ->
268-
if (meta.chatId == chatId) {
269-
meta.copy(
270-
lastMessage = serverMessage,
271-
lastActivity = serverMessage.timestamp,
272-
)
273-
} else meta
274-
}
275-
state.copy(feed = updatedFeed)
276-
}
277272
}
278273
.onFailure {
279274
messageDataSource.failPending(chatId, clientMessageId)
@@ -285,29 +280,14 @@ class ChatCoordinator @Inject constructor(
285280
IllegalStateException("No account")
286281
)
287282

288-
// Optimistically update local pointer so the feed unread count clears immediately
283+
// Update local pointer — reactive flow updates the feed's unread count
289284
val pointer = MessagePointer(
290285
type = PointerType.READ,
291286
userId = selfId,
292287
value = messageId,
293288
timestamp = Clock.System.now(),
294289
)
295290
memberDataSource.updatePointers(chatId, pointer)
296-
_state.update { state ->
297-
val updatedFeed = state.feed.map { meta ->
298-
if (meta.chatId == chatId) {
299-
meta.copy(members = meta.members.map { member ->
300-
if (member.userId == selfId) {
301-
val updated = member.pointers
302-
.filter { it.type != PointerType.READ }
303-
.plus(pointer)
304-
member.copy(pointers = updated)
305-
} else member
306-
})
307-
} else meta
308-
}
309-
state.copy(feed = updatedFeed)
310-
}
311291

312292
return messagingController.advancePointer(chatId, PointerType.READ, messageId)
313293
}
@@ -361,6 +341,8 @@ class ChatCoordinator @Inject constructor(
361341
closeEventStream()
362342
syncJob?.cancel()
363343
flagObserverJob?.cancel()
344+
feedObserverJob?.cancel()
345+
feedObserverJob = null
364346
_state.value = ChatState()
365347
cluster.value = null
366348
metadataDataSource.clear()
@@ -398,20 +380,29 @@ class ChatCoordinator @Inject constructor(
398380
.launchIn(scope)
399381
}
400382

401-
private suspend fun hydrateFromPersistence() {
402-
val entities = metadataDataSource.observeAll().firstOrNull() ?: return
403-
if (entities.isEmpty()) return
404-
405-
val feed = entities.map { entity ->
406-
val members = memberDataSource.getMembersForChat(entity.chatIdHex)
383+
private fun observeFeedFromDb() {
384+
feedObserverJob?.cancel()
385+
feedObserverJob = combine(
386+
metadataDataSource.observeAll(),
387+
memberDataSource.observeAll(),
388+
) { metadataEntities, membersByChat ->
389+
buildFeedFromDb(metadataEntities, membersByChat)
390+
}.onEach { feed ->
391+
_state.update { it.copy(feed = feed) }
392+
}.launchIn(scope)
393+
}
394+
395+
private suspend fun buildFeedFromDb(
396+
metadataEntities: List<ChatMetadataEntity>,
397+
membersByChat: Map<String, List<ChatMember>>,
398+
): List<ChatMetadata> {
399+
return metadataEntities.map { entity ->
400+
val members = membersByChat[entity.chatIdHex] ?: emptyList()
407401
val lastMessage = entity.lastMessageId?.let {
408402
messageDataSource.getLatest(entity.chatIdHex)
409403
}
410404
metadataDataSource.toMetadata(entity, members, lastMessage)
411405
}
412-
413-
_state.update { it.copy(feed = feed) }
414-
trace(tag = TAG, message = "Hydrated ${feed.size} chats from persistence", type = TraceType.Process)
415406
}
416407

417408
private fun syncFeed() {
@@ -427,9 +418,12 @@ class ChatCoordinator @Inject constructor(
427418

428419
for (chat in page.chats) {
429420
memberDataSource.upsert(chat.chatId, chat.members)
421+
chat.lastMessage?.let { msg ->
422+
messageDataSource.upsert(chat.chatId, listOf(msg))
423+
}
430424
}
431425

432-
_state.update { it.copy(feed = page.chats, feedSyncState = FeedSyncState.Synced) }
426+
_state.update { it.copy(feedSyncState = FeedSyncState.Synced) }
433427
trace(tag = TAG, message = "Feed synced: ${page.chats.size} chats", type = TraceType.Process)
434428

435429
// Prefetch first page of messages for chats with no cached messages
@@ -534,6 +528,9 @@ class ChatCoordinator @Inject constructor(
534528
metadataDataSource.upsert(metaUpdate.metadata)
535529
memberDataSource.deleteForChat(metaUpdate.metadata.chatId)
536530
memberDataSource.upsert(metaUpdate.metadata.chatId, metaUpdate.metadata.members)
531+
metaUpdate.metadata.lastMessage?.let { msg ->
532+
messageDataSource.upsert(metaUpdate.metadata.chatId, listOf(msg))
533+
}
537534
}
538535
is MetadataUpdate.LastActivityChanged -> {
539536
metadataDataSource.updateLastActivity(
@@ -544,81 +541,26 @@ class ChatCoordinator @Inject constructor(
544541
}
545542
}
546543

547-
// --- Single atomic state update for all in-memory changes ---
548-
549-
var needsFeedSync = false
550-
551-
_state.update { state ->
552-
var feed = state.feed
553-
var typingIndicators = state.typingIndicators
554-
555-
// New messages → update feed last message
556-
if (lastMsg != null) {
557-
val exists = feed.any { it.chatId == chatId }
558-
feed = if (exists) {
559-
feed.map { meta ->
560-
if (meta.chatId == chatId) {
561-
meta.copy(
562-
lastMessage = lastMsg,
563-
lastActivity = lastMsg.timestamp,
564-
)
565-
} else meta
566-
}
567-
} else {
568-
needsFeedSync = true
569-
feed
570-
}
571-
}
544+
// --- Check if unknown chat requires a full feed sync ---
572545

573-
// Pointer updates → merge into member pointers
574-
if (update.pointerUpdates.isNotEmpty()) {
575-
feed = feed.map { meta ->
576-
if (meta.chatId == chatId) {
577-
meta.copy(members = meta.members.map { member ->
578-
val memberPointerUpdates = update.pointerUpdates
579-
.filter { it.userId == member.userId }
580-
if (memberPointerUpdates.isNotEmpty()) {
581-
val updated = member.pointers.toMutableList()
582-
for (p in memberPointerUpdates) {
583-
updated.removeAll { it.type == p.type }
584-
updated.add(p)
585-
}
586-
member.copy(pointers = updated)
587-
} else member
588-
})
589-
} else meta
590-
}
546+
if (lastMsg != null) {
547+
if (!metadataDataSource.exists(chatId)) {
548+
syncFeed()
591549
}
550+
}
592551

593-
// Typing notifications (ephemeral)
594-
if (update.typingNotifications.isNotEmpty()) {
595-
val currentTypists = typingIndicators[chatId]?.toMutableSet() ?: mutableSetOf()
552+
// --- Update ephemeral state (typing indicators are not DB-backed) ---
553+
554+
if (update.typingNotifications.isNotEmpty()) {
555+
_state.update { state ->
556+
val currentTypists = state.typingIndicators[chatId]?.toMutableSet() ?: mutableSetOf()
596557
for (notification in update.typingNotifications) {
597558
applyTypingNotification(currentTypists, notification)
598559
}
599-
typingIndicators = typingIndicators + (chatId to currentTypists.toSet())
600-
}
601-
602-
// Metadata full refreshes
603-
for (metaUpdate in update.metadataUpdates) {
604-
if (metaUpdate is MetadataUpdate.FullRefresh) {
605-
val exists = feed.any { it.chatId == metaUpdate.metadata.chatId }
606-
feed = if (exists) {
607-
feed.map {
608-
if (it.chatId == metaUpdate.metadata.chatId) metaUpdate.metadata else it
609-
}
610-
} else {
611-
feed + metaUpdate.metadata
612-
}
613-
}
560+
state.copy(
561+
typingIndicators = state.typingIndicators + (chatId to currentTypists.toSet())
562+
)
614563
}
615-
616-
state.copy(feed = feed, typingIndicators = typingIndicators)
617-
}
618-
619-
// Side effects after state update
620-
if (needsFeedSync) {
621-
syncFeed()
622564
}
623565
}
624566

apps/flipcash/shared/persistence/db/src/main/kotlin/com/flipcash/app/persistence/dao/ChatMemberDao.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ interface ChatMemberDao {
1616
@Query("SELECT * FROM chat_members WHERE chat_id_hex = :chatIdHex")
1717
fun observeMembersForChat(chatIdHex: String): Flow<List<ChatMemberEntity>>
1818

19+
@Query("SELECT * FROM chat_members")
20+
fun observeAll(): Flow<List<ChatMemberEntity>>
21+
1922
@Insert(onConflict = OnConflictStrategy.REPLACE)
2023
suspend fun upsert(entity: ChatMemberEntity)
2124

apps/flipcash/shared/persistence/sources/src/main/kotlin/com/flipcash/app/persistence/sources/ChatMemberDataSource.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ class ChatMemberDataSource @Inject constructor(
2424
entities.map { mapper.toMember(it) }
2525
} ?: emptyFlow()
2626

27+
fun observeAll(): Flow<Map<String, List<ChatMember>>> =
28+
db?.chatMemberDao()?.observeAll()?.map { entities ->
29+
entities.groupBy { it.chatIdHex }
30+
.mapValues { (_, members) -> members.map { mapper.toMember(it) } }
31+
} ?: emptyFlow()
32+
2733
suspend fun getMembersForChat(chatId: ChatId): List<ChatMember> =
2834
getMembersForChat(mapper.chatIdHex(chatId))
2935

apps/flipcash/shared/persistence/sources/src/main/kotlin/com/flipcash/app/persistence/sources/ChatMetadataDataSource.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ class ChatMetadataDataSource @Inject constructor(
4040
db?.chatMetadataDao()?.updateLastMessageId(mapper.chatIdHex(chatId), messageId)
4141
}
4242

43+
suspend fun exists(chatId: ChatId): Boolean =
44+
db?.chatMetadataDao()?.getById(mapper.chatIdHex(chatId)) != null
45+
4346
fun toMetadata(
4447
entity: ChatMetadataEntity,
4548
members: List<ChatMember>,

0 commit comments

Comments
 (0)