Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import com.flipcash.services.controllers.ChatController
import com.flipcash.services.controllers.ChatMessagingController
import com.flipcash.services.controllers.EventStreamingController
import com.flipcash.services.models.chat.ChatId
import com.flipcash.services.models.chat.ChatMember
import com.flipcash.services.models.chat.ChatMessage
import com.flipcash.services.models.chat.MessagePointer
import com.flipcash.services.models.chat.ChatUpdate
Expand Down Expand Up @@ -59,6 +58,7 @@ import kotlinx.coroutines.launch
import javax.inject.Inject
import javax.inject.Singleton
import kotlin.time.Clock
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds

@Singleton
Expand All @@ -78,6 +78,8 @@ class ChatCoordinator @Inject constructor(
companion object {
private const val TAG = "ChatCoordinator"
private val HEARTBEAT_INTERVAL = 30.seconds
private const val RETRY_BASE_MS = 2_000L
private const val RETRY_MAX_MS = 60_000L
}

private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
Expand All @@ -87,6 +89,7 @@ class ChatCoordinator @Inject constructor(
private var eventStreamCollectJob: Job? = null
private var eventStreamRetryJob: Job? = null
private var heartbeatJob: Job? = null
private var retryAttempt = 0

val state: StateFlow<ChatState>
get() = _state.asStateFlow()
Expand Down Expand Up @@ -339,21 +342,34 @@ class ChatCoordinator @Inject constructor(
}
}

private fun openEventStream() {
private fun openEventStream(force: Boolean = false) {
if (!force && eventStreamingController.isConnected) {
trace(tag = TAG, message = "Event stream already connected, skipping open", type = TraceType.Process)
ensureCollector()
return
}

eventStreamRetryJob?.cancel()
val opened = eventStreamingController.open(scope) {
// Stream died after exhausting retries — schedule a re-open
// Stream died — schedule a re-open with exponential backoff
val attempt = retryAttempt++
val delayMs = (RETRY_BASE_MS * (1L shl attempt.coerceAtMost(5)))
.coerceAtMost(RETRY_MAX_MS)
trace(tag = TAG, message = "Event stream error, retry #${attempt + 1} in ${delayMs}ms", type = TraceType.Process)
eventStreamRetryJob = scope.launch {
delay(5_000)
trace(tag = TAG, message = "Retrying event stream after failure", type = TraceType.Process)
openEventStream()
delay(delayMs.milliseconds)
openEventStream(force = true)
}
}
if (!opened) {
if (opened) {
retryAttempt = 0
} else {
trace(tag = TAG, message = "Event stream failed to open", type = TraceType.Error)
}
// Always ensure a collector is running so events are processed
// as soon as the stream (re)connects.
ensureCollector()
}

private fun ensureCollector() {
if (eventStreamCollectJob?.isActive != true) {
eventStreamCollectJob = scope.launch {
eventStreamingController.chatUpdates.collect { applyUpdate(it) }
Expand All @@ -374,10 +390,9 @@ class ChatCoordinator @Inject constructor(
heartbeatJob = scope.launch {
while (true) {
delay(HEARTBEAT_INTERVAL)
trace(tag = TAG, message = "Heartbeat: syncing feed", type = TraceType.Process)
syncFeed()
if (!eventStreamingController.isConnected) {
trace(tag = TAG, message = "Heartbeat: event stream dead, reconnecting", type = TraceType.Process)
trace(tag = TAG, message = "Heartbeat: event stream dead, syncing feed and reconnecting", type = TraceType.Process)
syncFeed()
openEventStream()
}
}
Expand All @@ -397,45 +412,66 @@ class ChatCoordinator @Inject constructor(
type = TraceType.Process,
)

// New messages
if (update.newMessages.isNotEmpty()) {
// --- Persist to DB first (suspend, off main thread) ---

val lastMsg = if (update.newMessages.isNotEmpty()) {
trace(tag = TAG, message = "Upserting ${update.newMessages.size} new messages for $chatId", type = TraceType.Process)
messageDataSource.upsert(chatId, update.newMessages)
update.newMessages.maxByOrNull { it.messageId }?.also { msg ->
metadataDataSource.updateLastMessageId(chatId, msg.messageId)
metadataDataSource.updateLastActivity(chatId, msg.timestamp.toEpochMilliseconds())
}
} else null

val lastMsg = update.newMessages.maxByOrNull { it.messageId }
if (lastMsg != null) {
metadataDataSource.updateLastMessageId(chatId, lastMsg.messageId)
metadataDataSource.updateLastActivity(chatId, lastMsg.timestamp.toEpochMilliseconds())
for (pointer in update.pointerUpdates) {
memberDataSource.updatePointers(chatId, pointer)
}

_state.update { state ->
val exists = state.feed.any { it.chatId == chatId }
val updatedFeed = if (exists) {
state.feed.map { meta ->
if (meta.chatId == chatId) {
meta.copy(
lastMessage = lastMsg,
lastActivity = lastMsg.timestamp,
)
} else meta
}
} else {
// New chat not yet in feed — trigger a sync to pick up full metadata
syncFeed()
state.feed
}
state.copy(feed = updatedFeed)
for (metaUpdate in update.metadataUpdates) {
when (metaUpdate) {
is MetadataUpdate.FullRefresh -> {
metadataDataSource.upsert(metaUpdate.metadata)
memberDataSource.deleteForChat(metaUpdate.metadata.chatId)
memberDataSource.upsert(metaUpdate.metadata.chatId, metaUpdate.metadata.members)
}
is MetadataUpdate.LastActivityChanged -> {
metadataDataSource.updateLastActivity(
chatId,
metaUpdate.newLastActivity.toEpochMilliseconds(),
)
}
}
}

// Pointer updates
for (pointer in update.pointerUpdates) {
memberDataSource.updatePointers(chatId, pointer)
}
// --- Single atomic state update for all in-memory changes ---

var needsFeedSync = false

if (update.pointerUpdates.isNotEmpty()) {
_state.update { state ->
val updatedFeed = state.feed.map { meta ->
_state.update { state ->
var feed = state.feed
var typingIndicators = state.typingIndicators

// New messages → update feed last message
if (lastMsg != null) {
val exists = feed.any { it.chatId == chatId }
feed = if (exists) {
feed.map { meta ->
if (meta.chatId == chatId) {
meta.copy(
lastMessage = lastMsg,
lastActivity = lastMsg.timestamp,
)
} else meta
}
} else {
needsFeedSync = true
feed
}
}

// Pointer updates → merge into member pointers
if (update.pointerUpdates.isNotEmpty()) {
feed = feed.map { meta ->
if (meta.chatId == chatId) {
meta.copy(members = meta.members.map { member ->
val memberPointerUpdates = update.pointerUpdates
Expand All @@ -451,51 +487,37 @@ class ChatCoordinator @Inject constructor(
})
} else meta
}
state.copy(feed = updatedFeed)
}
}

// Typing notifications (ephemeral, in-memory only)
if (update.typingNotifications.isNotEmpty()) {
_state.update { state ->
val currentTypists = state.typingIndicators[chatId]?.toMutableSet() ?: mutableSetOf()
// Typing notifications (ephemeral)
if (update.typingNotifications.isNotEmpty()) {
val currentTypists = typingIndicators[chatId]?.toMutableSet() ?: mutableSetOf()
for (notification in update.typingNotifications) {
applyTypingNotification(currentTypists, notification)
}
state.copy(
typingIndicators = state.typingIndicators + (chatId to currentTypists.toSet())
)
typingIndicators = typingIndicators + (chatId to currentTypists.toSet())
}
}

// Metadata updates
for (metaUpdate in update.metadataUpdates) {
when (metaUpdate) {
is MetadataUpdate.FullRefresh -> {
metadataDataSource.upsert(metaUpdate.metadata)
memberDataSource.deleteForChat(metaUpdate.metadata.chatId)
memberDataSource.upsert(metaUpdate.metadata.chatId, metaUpdate.metadata.members)

_state.update { state ->
val exists = state.feed.any { it.chatId == metaUpdate.metadata.chatId }
val updatedFeed = if (exists) {
state.feed.map {
if (it.chatId == metaUpdate.metadata.chatId) metaUpdate.metadata else it
}
} else {
state.feed + metaUpdate.metadata
// Metadata full refreshes
for (metaUpdate in update.metadataUpdates) {
if (metaUpdate is MetadataUpdate.FullRefresh) {
val exists = feed.any { it.chatId == metaUpdate.metadata.chatId }
feed = if (exists) {
feed.map {
if (it.chatId == metaUpdate.metadata.chatId) metaUpdate.metadata else it
}
state.copy(feed = updatedFeed)
} else {
feed + metaUpdate.metadata
}
}

is MetadataUpdate.LastActivityChanged -> {
metadataDataSource.updateLastActivity(
chatId,
metaUpdate.newLastActivity.toEpochMilliseconds(),
)
}
}

state.copy(feed = feed, typingIndicators = typingIndicators)
}

// Side effects after state update
if (needsFeedSync) {
syncFeed()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import com.flipcash.services.repository.EventStreamingRepository
import com.flipcash.services.user.UserManager
import com.getcode.utils.trace
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow
import javax.inject.Inject
import javax.inject.Singleton

Expand All @@ -17,12 +17,8 @@ class EventStreamingController @Inject constructor(
private val repository: EventStreamingRepository,
private val userManager: UserManager,
) {
private val _chatUpdates = MutableSharedFlow<ChatUpdate>(
replay = 0,
extraBufferCapacity = 64,
onBufferOverflow = kotlinx.coroutines.channels.BufferOverflow.DROP_OLDEST,
)
val chatUpdates: SharedFlow<ChatUpdate> = _chatUpdates.asSharedFlow()
private val _chatUpdates = Channel<ChatUpdate>(capacity = Channel.UNLIMITED)
val chatUpdates: Flow<ChatUpdate> = _chatUpdates.receiveAsFlow()

private var streamRef: EventStreamReference? = null

Expand All @@ -44,7 +40,7 @@ class EventStreamingController @Inject constructor(
owner = owner,
onEvent = { update ->
trace("EventStreamingController: Received chat update, messages=${update.newMessages.size}")
_chatUpdates.tryEmit(update)
_chatUpdates.trySend(update)
},
onError = { error ->
trace("EventStreamingController: Stream error: ${error.message}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class EventStreamingControllerTest {
}

@Test
fun `chatUpdates SharedFlow is accessible`() {
fun `chatUpdates flow is accessible`() {
assertNotNull(controller.chatUpdates)
}
}
Expand Down
Loading