Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import io.getstream.chat.android.client.internal.state.plugin.state.channel.inte
import io.getstream.chat.android.client.internal.state.plugin.state.channel.thread.internal.ThreadMutableState
import io.getstream.chat.android.client.internal.state.plugin.state.querychannels.internal.QueryChannelsMutableState
import io.getstream.chat.android.client.internal.state.plugin.state.querythreads.internal.QueryThreadsMutableState
import io.getstream.chat.android.client.utils.internal.ChannelId
import io.getstream.chat.android.core.internal.InternalStreamChatApi
import io.getstream.chat.android.models.Channel
import io.getstream.chat.android.models.FilterObject
Expand Down Expand Up @@ -57,7 +58,7 @@ import java.util.concurrent.ConcurrentHashMap
* @param mutedUsers The current list of muted users.
* @param useLegacyChannelState Whether to use the legacy channel state implementation.
*/
@Suppress("LongParameterList")
@Suppress("LongParameterList", "TooManyFunctions")
public class StateRegistry @JvmOverloads constructor(
private val userStateFlow: StateFlow<User?>,
private var latestUsers: StateFlow<Map<String, User>>,
Expand All @@ -74,8 +75,8 @@ public class StateRegistry @JvmOverloads constructor(

private val queryChannels: ConcurrentHashMap<QueryChannelsIdentifier, QueryChannelsMutableState> =
ConcurrentHashMap()
private val legacyChannels: ConcurrentHashMap<Pair<String, String>, ChannelStateLegacyImpl> = ConcurrentHashMap()
private val channels: ConcurrentHashMap<Pair<String, String>, ChannelStateImpl> = ConcurrentHashMap()
private val legacyChannels: ConcurrentHashMap<ChannelId, ChannelStateLegacyImpl> = ConcurrentHashMap()
private val channels: ConcurrentHashMap<ChannelId, ChannelStateImpl> = ConcurrentHashMap()
private val queryThreads: ConcurrentHashMap<Pair<FilterObject?, QuerySorter<Thread>>, QueryThreadsMutableState> =
ConcurrentHashMap()
private val threads: ConcurrentHashMap<String, ThreadMutableState> = ConcurrentHashMap()
Expand Down Expand Up @@ -120,78 +121,100 @@ public class StateRegistry @JvmOverloads constructor(
}

/**
* Returns [ChannelState] that represents a state of particular channel.
* Returns the [ChannelState] for the given channel.
*
* @param channelType The channel type. ie messaging.
* @param channelId The channel id. ie 123.
*
* @return [ChannelState] object.
*/
public fun channel(channelType: String, channelId: String): ChannelState = if (useLegacyChannelState) {
legacyChannelState(channelType, channelId)
} else {
channelState(channelType, channelId)
}

/**
* Returns [ChannelStateLegacyImpl] that represents a state of particular channel.
* A malformed cid yields a fresh, non-cached state so callers still get a non-null object, but
* the registry won't track it and the state will never receive updates.
*
* @param channelType The channel type. ie messaging.
* @param channelId The channel id. ie 123.
*
* @return [ChannelState] object.
*/
internal fun legacyChannelState(channelType: String, channelId: String): ChannelStateLegacyImpl {
return legacyChannels.getOrPut(channelType to channelId) {
val baseMessageLimit = messageLimitConfig.channelMessageLimits
.find { it.channelType == channelType }
?.baseLimit
ChannelStateLegacyImpl(
channelType = channelType,
channelId = channelId,
userFlow = userStateFlow,
latestUsers = latestUsers,
activeLiveLocations = activeLiveLocations,
baseMessageLimit = baseMessageLimit,
now = now,
)
public fun channel(channelType: String, channelId: String): ChannelState {
val id = ChannelId.fromTypeAndId(channelType, channelId)
if (id == null) {
logger.w { "[channel] rejected malformed cid: $channelType:$channelId" }
return newChannelState(channelType, channelId)
}
return channel(id)
}

internal fun channelState(channelType: String, channelId: String): ChannelStateImpl {
val baseMessageLimit = messageLimitConfig.channelMessageLimits
.find { it.channelType == channelType }
?.baseLimit
return channels.getOrPut(channelType to channelId) {
ChannelStateImpl(
channelType = channelType,
channelId = channelId,
currentUser = userStateFlow,
latestUsers = latestUsers,
mutedUsers = mutedUsers,
liveLocations = activeLiveLocations,
messageLimit = baseMessageLimit,
)
/** Returns the cached [ChannelState] for an already-validated [ChannelId]. */
internal fun channel(channelId: ChannelId): ChannelState =
if (useLegacyChannelState) legacyChannelState(channelId) else channelState(channelId)

internal fun legacyChannelState(channelId: ChannelId): ChannelStateLegacyImpl =
legacyChannels.getOrPut(channelId) {
buildLegacyChannelState(channelId.type, channelId.id)
}

internal fun legacyChannelState(channelType: String, channelId: String): ChannelStateLegacyImpl =
ChannelId.fromTypeAndId(channelType, channelId)
?.let(::legacyChannelState)
?: buildLegacyChannelState(channelType, channelId)

internal fun channelState(channelId: ChannelId): ChannelStateImpl =
channels.getOrPut(channelId) {
buildChannelState(channelId.type, channelId.id)
}
}

internal fun channelState(channelType: String, channelId: String): ChannelStateImpl =
ChannelId.fromTypeAndId(channelType, channelId)
?.let(::channelState)
?: buildChannelState(channelType, channelId)

/**
* Checks if the channel is already present in the state.
* Should be used to prevent creating [ChannelState] objects without populated data.
*
* @param channelType The channel type. ie messaging.
* @param channelId The channel id. ie 123.
*
* @return true if the channel is active.
*/
internal fun isActiveChannel(channelType: String, channelId: String): Boolean {
internal fun isActiveChannel(channelId: ChannelId): Boolean {
return if (useLegacyChannelState) {
legacyChannels.containsKey(channelType to channelId)
legacyChannels.containsKey(channelId)
} else {
channels.containsKey(channelType to channelId)
channels.containsKey(channelId)
}
}

private fun newChannelState(channelType: String, channelId: String): ChannelState =
if (useLegacyChannelState) {
buildLegacyChannelState(channelType, channelId)
} else {
buildChannelState(channelType, channelId)
}

private fun buildLegacyChannelState(channelType: String, channelId: String): ChannelStateLegacyImpl {
val baseMessageLimit = messageLimitConfig.channelMessageLimits
.find { it.channelType == channelType }
?.baseLimit
return ChannelStateLegacyImpl(
channelType = channelType,
channelId = channelId,
userFlow = userStateFlow,
latestUsers = latestUsers,
activeLiveLocations = activeLiveLocations,
baseMessageLimit = baseMessageLimit,
now = now,
)
}

private fun buildChannelState(channelType: String, channelId: String): ChannelStateImpl {
val baseMessageLimit = messageLimitConfig.channelMessageLimits
.find { it.channelType == channelType }
?.baseLimit
return ChannelStateImpl(
channelType = channelType,
channelId = channelId,
currentUser = userStateFlow,
latestUsers = latestUsers,
mutedUsers = mutedUsers,
liveLocations = activeLiveLocations,
messageLimit = baseMessageLimit,
)
}

/**
* Returns a [QueryThreadsState] holding the current state of the threads data.
*/
Expand Down Expand Up @@ -227,13 +250,8 @@ public class StateRegistry @JvmOverloads constructor(
ThreadMutableState(messageId, scope)
}

internal fun getActiveChannelStates(): List<ChannelState> {
return if (useLegacyChannelState) {
legacyChannels.values.toList()
} else {
channels.values.toList()
}
}
internal fun getActiveChannelStates(): Map<ChannelId, ChannelState> =
if (useLegacyChannelState) legacyChannels.toMap() else channels.toMap()

/**
* Clear state of all state objects.
Expand Down Expand Up @@ -269,10 +287,11 @@ public class StateRegistry @JvmOverloads constructor(
}

private fun removeChanel(channelType: String, channelId: String) {
val id = ChannelId.fromTypeAndId(channelType, channelId) ?: return
val removed = if (useLegacyChannelState) {
legacyChannels.remove(channelType to channelId)?.destroy()
legacyChannels.remove(id)?.destroy()
} else {
channels.remove(channelType to channelId)?.destroy()
channels.remove(id)?.destroy()
}
logger.i { "[removeChanel] removed channel($channelType, $channelId): $removed" }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,15 +496,11 @@ internal class EventHandlerSequential(
sortedEvents.find { it is UserPresenceChangedEvent }?.let { userPresenceChanged ->
val event = userPresenceChanged as UserPresenceChangedEvent

stateRegistry.getActiveChannelStates()
.filter { channelState -> channelState.members.containsWithUserId(event.user.id) }
.forEach { channelState ->
val channelLogic: ChannelLogic = logicRegistry.channel(
channelType = channelState.channelType,
channelId = channelState.channelId,
)
channelLogic.handleEvent(userPresenceChanged)
stateRegistry.getActiveChannelStates().forEach { (id, state) ->
if (state.members.containsWithUserId(event.user.id)) {
logicRegistry.channel(id).handleEvent(userPresenceChanged)
}
}
}

// Handle `user.messages.deleted` event
Expand Down
Loading
Loading