Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import com.google.firebase.dataconnect.sqlite.DataConnectCacheDatabase.SqliteSeq
import com.google.firebase.dataconnect.sqlite.QueryResultArb
import com.google.firebase.dataconnect.sqlite.QueryResultArb.EntityRepeatPolicy.INTER_SAMPLE_MUTATED
import com.google.firebase.dataconnect.sqlite.hydratedStructWithMutatedEntityValuesFrom
import com.google.firebase.dataconnect.testutil.CleanupsRule
import com.google.firebase.dataconnect.testutil.Cleanups
import com.google.firebase.dataconnect.testutil.DataConnectLogLevelRule
import com.google.firebase.dataconnect.testutil.DataConnectPath
import com.google.firebase.dataconnect.testutil.InProcessDataConnectGrpcStreamingServer
Expand Down Expand Up @@ -97,11 +97,13 @@ import io.mockk.coEvery
import io.mockk.every
import io.mockk.mockk
import java.io.File
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import kotlin.random.Random
import kotlin.time.Duration
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.updateAndGet
Expand All @@ -117,11 +119,10 @@ import org.robolectric.RobolectricTestRunner
import org.robolectric.RuntimeEnvironment

@RunWith(RobolectricTestRunner::class)
class DataConnectGrpcRPCsUnitTest {
open class DataConnectGrpcRPCsUnitTest {

@get:Rule val dataConnectLogLevelRule = DataConnectLogLevelRule()
@get:Rule val temporaryFolder = TemporaryFolder()
@get:Rule val cleanups = CleanupsRule()

private val mockLogger = newMockLogger("s3nx74epqj")
private val requestIdArb = Arb.dataConnect.requestId()
Expand Down Expand Up @@ -153,11 +154,18 @@ class DataConnectGrpcRPCsUnitTest {
Arb.dataConnect.appCheckTokenResult().orNull(nullProbability = 0.3),
cacheArb().orNull(nullProbability = 0.2),
) { (sample1, sample2), fetchPolicy1, authToken, appCheckToken, cache ->
val response1 = sample1.hydratedStruct.toExecuteQueryResponse()
val response2 = sample2.hydratedStruct.toExecuteQueryResponse()
Cleanups().use { cleanups ->
cleanups.register(cache)

val response1 = sample1.hydratedStruct.toExecuteQueryResponse()
val response2 = sample2.hydratedStruct.toExecuteQueryResponse()

val server = startServer()
cleanups.register(server)

startServer().use { server ->
val dataConnectGrpcRPCs = newDataConnectGrpcRPCs(server, cache)
cleanups.register(dataConnectGrpcRPCs)

val request = operationNameVariablesPairArb.bind()

server.nextResponse = response1
Expand Down Expand Up @@ -201,14 +209,18 @@ class DataConnectGrpcRPCsUnitTest {
Arb.dataConnect.appCheckTokenResult().orNull(nullProbability = 0.3),
cacheArb(),
) { (fetchPolicy1, fetchPolicy2), authToken, appCheckToken, cache ->
val (sample1, sample2) =
QueryResultArb(entityCountRange = 0..5, entityRepeatPolicy = INTER_SAMPLE_MUTATED)
.pair()
.bind()
val (request1, request2) = operationNameVariablesPairArb.distinctPair().bind()

startServer().use { server ->
Cleanups().use { cleanups ->
cleanups.register(cache)
val (sample1, sample2) =
QueryResultArb(entityCountRange = 0..5, entityRepeatPolicy = INTER_SAMPLE_MUTATED)
.pair()
.bind()
val (request1, request2) = operationNameVariablesPairArb.distinctPair().bind()

val server = startServer()
cleanups.register(server)
val dataConnectGrpcRPCs = newDataConnectGrpcRPCs(server, cache)
cleanups.register(dataConnectGrpcRPCs)

server.nextResponse = sample1.toExecuteQueryResponse()
dataConnectGrpcRPCs.executeQuery(
Expand Down Expand Up @@ -255,8 +267,12 @@ class DataConnectGrpcRPCsUnitTest {
Arb.dataConnect.appCheckTokenResult().orNull(nullProbability = 0.3),
cacheArb(),
) { authToken, appCheckToken, cache ->
startServer().use { server ->
Cleanups().use { cleanups ->
cleanups.register(cache)
val server = startServer()
cleanups.register(server)
val dataConnectGrpcRPCs = newDataConnectGrpcRPCs(server, cache)
cleanups.register(dataConnectGrpcRPCs)
val request = operationNameVariablesPairArb.bind()

val exception =
Expand Down Expand Up @@ -290,8 +306,11 @@ class DataConnectGrpcRPCsUnitTest {
Arb.dataConnect.authTokenResult().orNull(nullProbability = 0.3),
Arb.dataConnect.appCheckTokenResult().orNull(nullProbability = 0.3),
) { authToken, appCheckToken ->
startServer().use { server ->
Cleanups().use { cleanups ->
val server = startServer()
cleanups.register(server)
val dataConnectGrpcRPCs = newDataConnectGrpcRPCs(server, cache = null)
cleanups.register(dataConnectGrpcRPCs)
val request = operationNameVariablesPairArb.bind()

val exception =
Expand Down Expand Up @@ -335,10 +354,14 @@ class DataConnectGrpcRPCsUnitTest {
Arb.dataConnect.appCheckTokenResult().orNull(nullProbability = 0.3),
cacheArb(),
) { sample, (fetchPolicy1, fetchPolicy2), authToken, appCheckToken, cache ->
startServer().use { server ->
Cleanups().use { cleanups ->
cleanups.register(cache)
val server = startServer()
cleanups.register(server)
val response = sample.hydratedStruct.toExecuteQueryResponse()
server.nextResponse = response
val dataConnectGrpcRPCs = newDataConnectGrpcRPCs(server, cache = cache)
cleanups.register(dataConnectGrpcRPCs)
val request = operationNameVariablesPairArb.bind()

val result1 =
Expand Down Expand Up @@ -390,12 +413,16 @@ class DataConnectGrpcRPCsUnitTest {
cacheArb(),
) { (fetchPolicy1, fetchPolicy2, fetchPolicy3, fetchPolicy4), authToken, appCheckToken, cache
->
startServer().use { server ->
Cleanups().use { cleanups ->
cleanups.register(cache)
val server = startServer()
cleanups.register(server)
val queryResultArb =
QueryResultArb(entityCountRange = 0..5, entityRepeatPolicy = INTER_SAMPLE_MUTATED)
val sample1 = queryResultArb.bind()
val sample2 = queryResultArb.bind()
val dataConnectGrpcRPCs = newDataConnectGrpcRPCs(server, cache)
cleanups.register(dataConnectGrpcRPCs)
val distinctExecuteQueryRequestArb = operationNameVariablesPairArb.distinct()
val request1 = distinctExecuteQueryRequestArb.bind()
val request2 = distinctExecuteQueryRequestArb.bind()
Expand Down Expand Up @@ -463,10 +490,13 @@ class DataConnectGrpcRPCsUnitTest {
Arb.dataConnect.authTokenResult().orNull(nullProbability = 0.3),
Arb.dataConnect.appCheckTokenResult().orNull(nullProbability = 0.3),
) { sample, fetchPolicy, authToken, appCheckToken ->
val response = sample.hydratedStruct.toExecuteQueryResponse()
Cleanups().use { cleanups ->
val response = sample.hydratedStruct.toExecuteQueryResponse()

startServer().use { server ->
val server = startServer()
cleanups.register(server)
val dataConnectGrpcRPCs = newDataConnectGrpcRPCs(server, cache = null)
cleanups.register(dataConnectGrpcRPCs)
val request = operationNameVariablesPairArb.bind()

server.nextResponse = response
Expand Down Expand Up @@ -497,10 +527,14 @@ class DataConnectGrpcRPCsUnitTest {
Arb.dataConnect.appCheckTokenResult().orNull(nullProbability = 0.3),
cacheArb(),
) { sample, fetchPolicy, authToken, appCheckToken, cache ->
val response = sample.hydratedStruct.toExecuteQueryResponse()
Cleanups().use { cleanups ->
cleanups.register(cache)
val response = sample.hydratedStruct.toExecuteQueryResponse()

startServer().use { server ->
val server = startServer()
cleanups.register(server)
val dataConnectGrpcRPCs = newDataConnectGrpcRPCs(server, cache = cache)
cleanups.register(dataConnectGrpcRPCs)
val request = operationNameVariablesPairArb.bind()

server.nextResponse = response
Expand Down Expand Up @@ -532,11 +566,15 @@ class DataConnectGrpcRPCsUnitTest {
Arb.dataConnect.appCheckTokenResult().orNull(nullProbability = 0.3),
cacheArb(),
) { (sample1, sample2), authToken, appCheckToken, cache ->
val response1 = sample1.hydratedStruct.toExecuteQueryResponse()
val response2 = sample2.hydratedStruct.toExecuteQueryResponse()
Cleanups().use { cleanups ->
cleanups.register(cache)
val response1 = sample1.hydratedStruct.toExecuteQueryResponse()
val response2 = sample2.hydratedStruct.toExecuteQueryResponse()

startServer().use { server ->
val server = startServer()
cleanups.register(server)
val dataConnectGrpcRPCs = newDataConnectGrpcRPCs(server, cache = cache)
cleanups.register(dataConnectGrpcRPCs)
val request = operationNameVariablesPairArb.bind()

server.nextResponse = response1
Expand Down Expand Up @@ -586,10 +624,14 @@ class DataConnectGrpcRPCsUnitTest {
Arb.dataConnect.appCheckTokenResult().orNull(nullProbability = 0.3),
cacheArb(),
) { sample, fetchPolicy, authToken, appCheckToken, cache ->
val response = sample.hydratedStruct.toExecuteQueryResponse()
Cleanups().use { cleanups ->
cleanups.register(cache)
val response = sample.hydratedStruct.toExecuteQueryResponse()

startServer().use { server ->
val server = startServer()
cleanups.register(server)
val dataConnectGrpcRPCs = newDataConnectGrpcRPCs(server, cache = cache)
cleanups.register(dataConnectGrpcRPCs)
val request = operationNameVariablesPairArb.bind()

server.nextResponse = response
Expand Down Expand Up @@ -631,10 +673,14 @@ class DataConnectGrpcRPCsUnitTest {
Arb.dataConnect.appCheckTokenResult().orNull(nullProbability = 0.3),
cacheArb(maxAge = Arb.constant(Duration.ZERO)), // always stale
) { sample, fetchPolicy1, authToken, appCheckToken, cache ->
val response = sample.hydratedStruct.toExecuteQueryResponse()
Cleanups().use { cleanups ->
cleanups.register(cache)
val response = sample.hydratedStruct.toExecuteQueryResponse()

startServer().use { server ->
val server = startServer()
cleanups.register(server)
val dataConnectGrpcRPCs = newDataConnectGrpcRPCs(server, cache = cache)
cleanups.register(dataConnectGrpcRPCs)
val request = operationNameVariablesPairArb.bind()

server.nextResponse = response
Expand Down Expand Up @@ -683,10 +729,14 @@ class DataConnectGrpcRPCsUnitTest {
Arb.dataConnect.appCheckTokenResult().orNull(nullProbability = 0.3),
cacheArb(maxAge = Arb.constant(Duration.ZERO)), // always stale
) { sample, fetchPolicy1, authToken, appCheckToken, cache ->
val response = sample.hydratedStruct.toExecuteQueryResponse()
Cleanups().use { cleanups ->
cleanups.register(cache)
val response = sample.hydratedStruct.toExecuteQueryResponse()

startServer().use { server ->
val server = startServer()
cleanups.register(server)
val dataConnectGrpcRPCs = newDataConnectGrpcRPCs(server, cache = cache)
cleanups.register(dataConnectGrpcRPCs)
val request = operationNameVariablesPairArb.bind()

server.nextResponse = response
Expand Down Expand Up @@ -720,40 +770,51 @@ class DataConnectGrpcRPCsUnitTest {
@Test
fun `connect() lazily sends init request on subscribe`() = runTest {
checkAll(propTestConfig, cacheArb().orNull(nullProbability = 0.2)) { cache ->
val server = InProcessDataConnectGrpcStreamingServer()
val cleanupsRegistration = cleanups.register(server)
server.open()
val dataConnectGrpcRPCs = newDataConnectGrpcRPCs(server, cache)
val callerSdkType = Arb.enum<CallerSdkType>().bind()

server.events.test {
val stream = dataConnectGrpcRPCs.connect(randomSource())
expectNoEvents()

val subscriptionFlow =
stream.subscribe(
"req1",
"opName",
StructProto.getDefaultInstance(),
callerSdkType,
)
expectNoEvents()
Cleanups().use { cleanups ->
cleanups.register(cache)

val server = InProcessDataConnectGrpcStreamingServer()
cleanups.register(server)
server.open()
val dataConnectGrpcRPCs = newDataConnectGrpcRPCs(server, cache)
cleanups.register(dataConnectGrpcRPCs)
val callerSdkType = Arb.enum<CallerSdkType>().bind()

server.events.test {
val stream = dataConnectGrpcRPCs.connect(randomSource())
expectNoEvents()

val subscriptionFlow =
stream.subscribe(
"req1",
"opName",
StructProto.getDefaultInstance(),
callerSdkType,
)
expectNoEvents()

backgroundScope.launch(CallerSdkTypeElement(callerSdkType)) { subscriptionFlow.collect() }
val streamRequest: StreamRequest = awaitUntilInitStreamRequest().streamRequest
val backgroundCollectJob =
backgroundScope.launch(CallerSdkTypeElement(callerSdkType)) {
subscriptionFlow.collect()
}
cleanups.registerSuspending { backgroundCollectJob.cancelAndJoin() }
val streamRequest: StreamRequest = awaitUntilInitStreamRequest().streamRequest

withClue("streamRequest=${streamRequest.print().value}") {
withClue("requestId") { streamRequest.requestId shouldBe "init" }
withClue("name") { streamRequest.name shouldBe dataConnectGrpcRPCs.connectorResourceName }
withClue("requestKindCase") {
streamRequest.requestKindCase shouldBe StreamRequest.RequestKindCase.REQUESTKIND_NOT_SET
withClue("streamRequest=${streamRequest.print().value}") {
withClue("requestId") { streamRequest.requestId shouldBe "init" }
withClue("name") {
streamRequest.name shouldBe dataConnectGrpcRPCs.connectorResourceName
}
withClue("requestKindCase") {
streamRequest.requestKindCase shouldBe
StreamRequest.RequestKindCase.REQUESTKIND_NOT_SET
}
}

backgroundCollectJob.cancelAndJoin()
cancelAndIgnoreRemainingEvents()
}
cancelAndIgnoreRemainingEvents()
}

server.close()
cleanups.unregister(cleanupsRegistration)
}
}

Expand Down Expand Up @@ -803,7 +864,8 @@ class DataConnectGrpcRPCsUnitTest {
}

override fun close() {
grpcServer.shutdown()
grpcServer.shutdownNow()
grpcServer.awaitTermination(10, TimeUnit.SECONDS)
}
}

Expand Down Expand Up @@ -847,22 +909,18 @@ class DataConnectGrpcRPCsUnitTest {
private fun PropertyContext.newDataConnectGrpcRPCsForLocalhostServerOnPort(
port: Int,
cache: DataConnectCache?
): DataConnectGrpcRPCs {
val dataConnectGrpcRPCs =
DataConnectGrpcRPCs(
context = RuntimeEnvironment.getApplication(),
host = "localhost:$port",
sslEnabled = false,
connectorResourceName = connectorResourceNameArb.bind(),
nonBlockingCoroutineDispatcher = Dispatchers.Default,
blockingCoroutineDispatcher = Dispatchers.IO,
grpcMetadata = grpcMetadataArb.bind(),
cache = cache,
parentLogger = mockLogger,
)
cleanups.registerSuspending { dataConnectGrpcRPCs.close() }
return dataConnectGrpcRPCs
}
): DataConnectGrpcRPCs =
DataConnectGrpcRPCs(
context = RuntimeEnvironment.getApplication(),
host = "localhost:$port",
sslEnabled = false,
connectorResourceName = connectorResourceNameArb.bind(),
nonBlockingCoroutineDispatcher = Dispatchers.Default,
blockingCoroutineDispatcher = Dispatchers.IO,
grpcMetadata = grpcMetadataArb.bind(),
cache = cache,
parentLogger = mockLogger,
)

private fun cacheArb(
maxAge: Arb<Duration> =
Expand Down Expand Up @@ -940,3 +998,9 @@ private fun listValueFromPath(path: DataConnectPath): ListValueProto {

private fun <T> T.sequenced(): SequencedReference<T> =
SequencedReference(nextSequenceNumber(), this)

private fun Cleanups.register(cache: DataConnectCache?) = registerSuspending { cache?.close() }

private fun Cleanups.register(dataConnectGrpcRPCs: DataConnectGrpcRPCs) = registerSuspending {
dataConnectGrpcRPCs.close()
}
Loading
Loading