Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
ecb02a9
TaggedReference.kt added
dconeybe May 29, 2026
deb1e18
QuerySubscriptionImpl.kt: refactor to improve readability
dconeybe May 29, 2026
aeff9f2
RealtimeQueryManager.kt: return the SqliteSequenceNumber of the cache…
dconeybe May 29, 2026
a97a558
DataConnectBidiConnectStream.kt: remove unused (and misleading) compo…
dconeybe May 29, 2026
c6ea47f
arbs.kt: added fun DataConnectArb.sqliteSequenceNumber()
dconeybe May 29, 2026
313ade7
DataConnectGrpcRPCs.kt: return the SqliteSequenceNumber of data read …
dconeybe May 29, 2026
8c93930
DataConnectGrpcClient.kt: return the SqliteSequenceNumber with results
dconeybe May 29, 2026
b062eed
RegisteredDataDeserialzer.kt: return the SqliteSequenceNumber of cach…
dconeybe May 29, 2026
08bd362
LiveQuery.kt: return the SqliteSequenceNumber of cached data
dconeybe May 29, 2026
317ff3c
QueryManager.kt: return the SqliteSequenceNumber of cached data
dconeybe May 29, 2026
102027a
DataConnectGrpcRPCs.kt: minor tweak based on gemini code review
dconeybe May 29, 2026
1d9a46e
arbs.kt: fix DataConnectArb.dataSource() to generate null sequence nu…
dconeybe May 29, 2026
db8e810
Empty commit to suppress github actions [skip actions]
dconeybe May 29, 2026
a04da8f
Empty commit to trigger github actions
dconeybe May 29, 2026
7f1c1c3
Merge branch 'main' into dconeybe/dataconnect/RealtimeCacheUpdate3
dconeybe Jun 1, 2026
5a8457b
QuerySubscriptionIntegrationTest.kt: fix test: collect_gets_notified_…
dconeybe Jun 1, 2026
8fc1f63
QuerySubscriptionIntegrationTest.kt: fix test: collect_gets_an_update…
dconeybe Jun 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ package com.google.firebase.dataconnect
import app.cash.turbine.ReceiveTurbine
import app.cash.turbine.test
import app.cash.turbine.turbineScope
import com.google.firebase.dataconnect.QuerySubscriptionIntegrationTest.Companion.awaitItemWithName
import com.google.firebase.dataconnect.testutil.DataConnectIntegrationTestBase
import com.google.firebase.dataconnect.testutil.SuspendingFlag
import com.google.firebase.dataconnect.testutil.registerDataConnectKotestPrinters
import com.google.firebase.dataconnect.testutil.schemas.PersonSchema
import com.google.firebase.dataconnect.testutil.schemas.PersonSchema.GetPersonQuery
import com.google.firebase.dataconnect.testutil.schemas.RealtimeConnector
import io.kotest.assertions.assertSoftly
import io.kotest.assertions.withClue
import io.kotest.matchers.ints.shouldBeInRange
Expand Down Expand Up @@ -57,12 +60,18 @@ import kotlinx.serialization.encoding.Decoder
import kotlinx.serialization.encoding.Encoder
import kotlinx.serialization.serializer
import org.junit.AssumptionViolatedException
import org.junit.Before
import org.junit.Test

class QuerySubscriptionIntegrationTest : DataConnectIntegrationTestBase() {

private val schema by lazy { PersonSchema(dataConnectFactory) }

@Before
fun registerPrinters() {
registerDataConnectKotestPrinters()
}

@Test
fun reload_should_notify_collecting_flows() = runTest {
val personId = Arb.alphanumericString(prefix = "personId").next()
Expand Down Expand Up @@ -291,18 +300,19 @@ class QuerySubscriptionIntegrationTest : DataConnectIntegrationTestBase() {

@Test
fun collect_gets_an_update_on_error() = runTest {
val personId = Arb.alphanumericString(prefix = "personId").next()
schema.createPerson(id = personId, name = "Name1").execute()
val dataConnect = dataConnectFactory.newInstance(RealtimeConnector.config)
val connector = RealtimeConnector.getInstance(dataConnect)

val noName2Query =
schema.getPerson(personId).withDataDeserializer(serializer<GetPersonDataNoName2>())
val key = connector.insertString(name = "Name1")
val baseQuery = connector.getStringByKey.queryRef(key)
val noName2Query = baseQuery.withDataDeserializer(serializer<GetItemDataNoName2>())
val querySubscription = noName2Query.subscribe()

turbineScope {
val flow = querySubscription.flow.distinctUntilChanged().testIn(backgroundScope)
withClue("result1") { flow.awaitPersonWithName("Name1") }
withClue("result1") { flow.awaitItemWithName("Name1") }

schema.updatePerson(id = personId, name = "Name2").execute()
connector.updateString(key, name = "Name2")
val execute2Result = runCatching { noName2Query.execute() }
withClue("execute2Result") {
withClue("execute2Result.getOrNull()") { execute2Result.getOrNull().shouldBeNull() }
Expand All @@ -314,21 +324,22 @@ class QuerySubscriptionIntegrationTest : DataConnectIntegrationTestBase() {
withClue("result2.isFailure") { result2.isFailure shouldBe true }
}

schema.updatePerson(id = personId, name = "Name3").execute()
connector.updateString(key, name = "Name3")
noName2Query.execute()
withClue("result3") { flow.awaitPersonWithName("Name3") }
withClue("result3") { flow.awaitItemWithName("Name3") }
}
}

@Test
fun collect_gets_notified_of_per_data_deserializer_successes() = runTest {
val personId = Arb.alphanumericString(prefix = "personId").next()
schema.createPerson(id = personId, name = "Name0").execute()
val dataConnect = dataConnectFactory.newInstance(RealtimeConnector.config)
val connector = RealtimeConnector.getInstance(dataConnect)

val noName1Query =
schema.getPerson(personId).withDataDeserializer(serializer<GetPersonDataNoName1>())
val noName2Query =
schema.getPerson(personId).withDataDeserializer(serializer<GetPersonDataNoName2>())
val key = connector.insertString(name = "Name0")

val baseQuery = connector.getStringByKey.queryRef(key)
val noName1Query = baseQuery.withDataDeserializer(serializer<GetItemDataNoName1>())
val noName2Query = baseQuery.withDataDeserializer(serializer<GetItemDataNoName2>())

turbineScope {
val noName1Flow =
Expand All @@ -343,30 +354,27 @@ class QuerySubscriptionIntegrationTest : DataConnectIntegrationTestBase() {
.flow
.distinctUntilChanged(::areEquivalentQuerySubscriptionResults)
.testIn(backgroundScope)
withClue("noName1Flow-0") { noName1Flow.awaitPersonWithName("Name0") }
withClue("noName2Flow-0") { noName2Flow.awaitPersonWithName("Name0") }
withClue("noName1Flow-0") { noName1Flow.awaitItemWithName("Name0") }
withClue("noName2Flow-0") { noName2Flow.awaitItemWithName("Name0") }

schema.updatePerson(id = personId, name = "Name1").execute()
schema.getPerson(personId).execute()
connector.updateString(key, name = "Name1")

withClue("noName1Flow-1") {
noName1Flow.awaitItem().result.exceptionOrNull().shouldNotBeNull()
}
withClue("noName2Flow-1") { noName2Flow.awaitPersonWithName("Name1") }
withClue("noName2Flow-1") { noName2Flow.awaitItemWithName("Name1") }

schema.updatePerson(id = personId, name = "Name2").execute()
schema.getPerson(personId).execute()
connector.updateString(key, name = "Name2")

withClue("noName1Flow-2") { noName1Flow.awaitPersonWithName("Name2") }
withClue("noName1Flow-2") { noName1Flow.awaitItemWithName("Name2") }
withClue("noName2Flow-2") {
noName2Flow.awaitItem().result.exceptionOrNull().shouldNotBeNull()
}

schema.updatePerson(id = personId, name = "Name3").execute()
schema.getPerson(personId).execute()
connector.updateString(key, name = "Name3")

withClue("noName1Flow-3") { noName1Flow.awaitPersonWithName("Name3") }
withClue("noName2Flow-3") { noName2Flow.awaitPersonWithName("Name3") }
withClue("noName1Flow-3") { noName1Flow.awaitItemWithName("Name3") }
withClue("noName2Flow-3") { noName2Flow.awaitItemWithName("Name3") }
}
}

Expand Down Expand Up @@ -463,6 +471,32 @@ class QuerySubscriptionIntegrationTest : DataConnectIntegrationTestBase() {
}
}

/**
* A "data" type suitable for [RealtimeConnector.GetStringByKeyQuery] whose deserialization fails
* if the name happens to be "Name1". This behavior is useful when testing the behavior when one
* deserializer successfully decodes the data but another one does not. See [GetItemDataNoName2].
*/
@Serializable
private data class GetItemDataNoName1(val item: Item?) {
@Serializable
data class Item(@Serializable(with = NameKSerializer::class) val name: String) {
private object NameKSerializer : RejectSpecificNameKSerializer("Name1")
}
}

/**
* A "data" type suitable for [RealtimeConnector.GetStringByKeyQuery] whose deserialization fails
* if the name happens to be "Name1". This behavior is useful when testing the behavior when one
* deserializer successfully decodes the data but another one does not. See [GetItemDataNoName1].
*/
@Serializable
private data class GetItemDataNoName2(val item: Item?) {
@Serializable
data class Item(@Serializable(with = NameKSerializer::class) val name: String) {
private object NameKSerializer : RejectSpecificNameKSerializer("Name2")
}
}

/**
* Starts a background coroutine that subscribes to and collects the given query with the given
* variables. Suspends until the first result has been collected. This effectively ensures that
Expand All @@ -476,19 +510,15 @@ class QuerySubscriptionIntegrationTest : DataConnectIntegrationTestBase() {

private companion object {
@JvmName("awaitPersonWithNameGetPersonQueryData")
suspend fun ReceiveTurbine<
QuerySubscriptionResult<GetPersonQuery.Data, GetPersonQuery.Variables>
>
.awaitPersonWithName(
name: String
): QuerySubscriptionResult<GetPersonQuery.Data, GetPersonQuery.Variables> {
suspend fun <Variables> ReceiveTurbine<QuerySubscriptionResult<GetPersonQuery.Data, Variables>>
.awaitPersonWithName(name: String): QuerySubscriptionResult<GetPersonQuery.Data, Variables> {
val item = awaitItem()
item.shouldHavePersonWithName(name)
return item
}

@JvmName("shouldHavePersonWithNameGetPersonQueryData")
fun QuerySubscriptionResult<GetPersonQuery.Data, GetPersonQuery.Variables>
fun <Variables> QuerySubscriptionResult<GetPersonQuery.Data, Variables>
.shouldHavePersonWithName(name: String) {
withClue("result.exceptionOrNull()") { result.exceptionOrNull().shouldBeNull() }
val data = withClue("result.getOrThrow()") { result.getOrThrow().data }
Expand All @@ -497,47 +527,75 @@ class QuerySubscriptionIntegrationTest : DataConnectIntegrationTestBase() {
}

@JvmName("awaitPersonWithNameGetPersonDataNoName1")
suspend fun ReceiveTurbine<
QuerySubscriptionResult<GetPersonDataNoName1, GetPersonQuery.Variables>
>
.awaitPersonWithName(
name: String
): QuerySubscriptionResult<GetPersonDataNoName1, GetPersonQuery.Variables> {
suspend fun <Variables> ReceiveTurbine<QuerySubscriptionResult<GetPersonDataNoName1, Variables>>
.awaitPersonWithName(name: String): QuerySubscriptionResult<GetPersonDataNoName1, Variables> {
val item = awaitItem()
item.shouldHavePersonWithName(name)
return item
}

@JvmName("awaitItemWithNameGetItemDataNoName1")
suspend fun <Variables> ReceiveTurbine<QuerySubscriptionResult<GetItemDataNoName1, Variables>>
.awaitItemWithName(name: String): QuerySubscriptionResult<GetItemDataNoName1, Variables> {
val item = awaitItem()
item.shouldHaveItemWithName(name)
return item
}

@JvmName("shouldHavePersonWithNameGetPersonDataNoName1")
fun QuerySubscriptionResult<GetPersonDataNoName1, GetPersonQuery.Variables>
fun <Variables> QuerySubscriptionResult<GetPersonDataNoName1, Variables>
.shouldHavePersonWithName(name: String) {
withClue("result.exceptionOrNull()") { result.exceptionOrNull().shouldBeNull() }
val data = withClue("result.getOrThrow()") { result.getOrThrow().data }
val person = withClue("data.person") { data.person.shouldNotBeNull() }
withClue("person.name") { person.name shouldBe name }
}

@JvmName("awaitPersonWithNameGetPersonDataNoName2")
suspend fun ReceiveTurbine<
QuerySubscriptionResult<GetPersonDataNoName2, GetPersonQuery.Variables>
>
.awaitPersonWithName(
@JvmName("shouldHaveItemWithNameGetItemDataNoName1")
fun <Variables> QuerySubscriptionResult<GetItemDataNoName1, Variables>.shouldHaveItemWithName(
name: String
): QuerySubscriptionResult<GetPersonDataNoName2, GetPersonQuery.Variables> {
) {
withClue("result.exceptionOrNull()") { result.exceptionOrNull().shouldBeNull() }
val data = withClue("result.getOrThrow()") { result.getOrThrow().data }
val item = withClue("data.item") { data.item.shouldNotBeNull() }
withClue("item.name") { item.name shouldBe name }
}

@JvmName("awaitPersonWithNameGetPersonDataNoName2")
suspend fun <Variables> ReceiveTurbine<QuerySubscriptionResult<GetPersonDataNoName2, Variables>>
.awaitPersonWithName(name: String): QuerySubscriptionResult<GetPersonDataNoName2, Variables> {
val item = awaitItem()
item.shouldHavePersonWithName(name)
return item
}

@JvmName("awaitItemWithNameGetItemDataNoName2")
suspend fun <Variables> ReceiveTurbine<QuerySubscriptionResult<GetItemDataNoName2, Variables>>
.awaitItemWithName(name: String): QuerySubscriptionResult<GetItemDataNoName2, Variables> {
val item = awaitItem()
item.shouldHaveItemWithName(name)
return item
}

@JvmName("shouldHavePersonWithNameGetPersonDataNoName2")
fun QuerySubscriptionResult<GetPersonDataNoName2, GetPersonQuery.Variables>
fun <Variables> QuerySubscriptionResult<GetPersonDataNoName2, Variables>
.shouldHavePersonWithName(name: String) {
withClue("result.exceptionOrNull()") { result.exceptionOrNull().shouldBeNull() }
val data = withClue("result.getOrThrow()") { result.getOrThrow().data }
val person = withClue("data.person") { data.person.shouldNotBeNull() }
withClue("person.name") { person.name shouldBe name }
}

@JvmName("shouldHaveItemWithNameGetItemDataNoName2")
fun <Variables> QuerySubscriptionResult<GetItemDataNoName2, Variables>.shouldHaveItemWithName(
name: String
) {
withClue("result.exceptionOrNull()") { result.exceptionOrNull().shouldBeNull() }
val data = withClue("result.getOrThrow()") { result.getOrThrow().data }
val item = withClue("data.item") { data.item.shouldNotBeNull() }
withClue("item.name") { item.name shouldBe name }
}

/**
* Returns `true` if, and only if, the receiver is a non-null instance of
* [DataConnectOperationException] that indicates that the failure is due to decoding of the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,14 @@ class RealtimeConnector private constructor(dataConnectInternal: FirebaseDataCon

class GetStringByKeyQuery(val connector: RealtimeConnector) {

suspend fun execute(key: Key): Data.Item? = execute(Variables(key))
fun variables(key: Key): Variables = Variables(key)

suspend fun execute(key: Key): Data.Item? = execute(variables(key))

suspend fun execute(variables: Variables): Data.Item? = queryRef(variables).execute().data.item

fun queryRef(key: Key) = queryRef(variables(key))

fun queryRef(variables: Variables) =
connector.dataConnect.query(OPERATION_NAME, variables, serializer<Data>(), serializer())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,7 @@ internal class DataConnectBidiConnectStream(
val data: Struct?,
val errors: List<GraphqlErrorProto>,
val extensions: List<DataConnectPropertiesProto>,
) {
operator fun component1() = data
operator fun component2() = errors
operator fun component3() = extensions
}
)

private sealed interface SubscriptionEvent {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.google.firebase.dataconnect.core

import com.google.firebase.dataconnect.DataSource
import com.google.firebase.dataconnect.FirebaseDataConnect
import com.google.firebase.dataconnect.QueryRef.FetchPolicy
import com.google.firebase.dataconnect.core.DataConnectAppCheck.GetAppCheckTokenResult
Expand Down Expand Up @@ -90,7 +89,7 @@ internal class DataConnectGrpcClient(
return OperationResult(
data = if (response.hasData()) response.data else null,
errors = response.errorsList,
source = DataSource.SERVER,
source = DataSource.Server,
)
}

Expand Down Expand Up @@ -148,12 +147,12 @@ private fun DataConnectGrpcRPCs.ExecuteQueryResult.toOperationResult():
DataConnectGrpcClient.OperationResult(
data = data,
errors = emptyList(),
source = DataSource.CACHE,
source = DataSource.Cache(sqliteSequenceNumber),
)
is DataConnectGrpcRPCs.ExecuteQueryResult.FromServer ->
DataConnectGrpcClient.OperationResult(
data = if (response.hasData()) response.data else null,
errors = response.errorsList,
source = DataSource.SERVER,
source = DataSource.Server,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import com.google.firebase.dataconnect.core.LoggerGlobals.Logger
import com.google.firebase.dataconnect.core.LoggerGlobals.debug
import com.google.firebase.dataconnect.core.LoggerGlobals.warn
import com.google.firebase.dataconnect.sqlite.DataConnectCacheDatabase
import com.google.firebase.dataconnect.sqlite.DataConnectCacheDatabase.SqliteSequenceNumber
import com.google.firebase.dataconnect.sqlite.GetEntityIdForPathFunction
import com.google.firebase.dataconnect.util.CoroutineUtils
import com.google.firebase.dataconnect.util.GrpcBidiFlow
Expand Down Expand Up @@ -196,7 +197,10 @@ internal class DataConnectGrpcRPCs(
}

sealed interface ExecuteQueryResult {
@JvmInline value class FromCache(val data: Struct) : ExecuteQueryResult
data class FromCache(
val data: Struct,
val sqliteSequenceNumber: SqliteSequenceNumber?,
) : ExecuteQueryResult

@JvmInline value class FromServer(val response: ExecuteQueryResponse) : ExecuteQueryResult
}
Expand Down Expand Up @@ -329,15 +333,15 @@ internal class DataConnectGrpcRPCs(
val cachedResult =
cache.open().getQueryResult(authUid, queryId, currentTimeMillis(), staleResult)

val cachedData: Struct? =
val found: DataConnectCacheDatabase.GetQueryResultResult.Found? =
when (cachedResult) {
is DataConnectCacheDatabase.GetQueryResultResult.Found -> {
logger.logGrpcReturningFromCache(
requestId = requestId,
kotlinMethodName = kotlinMethodName,
cachedResult = cachedResult,
)
cachedResult.struct
cachedResult
}
is DataConnectCacheDatabase.GetQueryResultResult.Stale -> {
logger.logGrpcIgnoringStaleCache(
Expand All @@ -350,7 +354,10 @@ internal class DataConnectGrpcRPCs(
is DataConnectCacheDatabase.GetQueryResultResult.NotFound -> null
}

return cachedData?.let(ExecuteQueryResult::FromCache)
return when (found) {
null -> null
else -> ExecuteQueryResult.FromCache(found.struct, found.maxLastUpdateSequenceNumber)
}
}

suspend fun connect(
Expand Down
Loading
Loading