Skip to content

Commit a7f7bc3

Browse files
committed
feat: add event connection state transition api
1 parent c62e09c commit a7f7bc3

File tree

10 files changed

+262
-67
lines changed

10 files changed

+262
-67
lines changed

saltify-core/src/commonMain/kotlin/org/ntqqrev/saltify/core/SaltifyApplication.kt

Lines changed: 38 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,20 @@ import io.ktor.serialization.kotlinx.*
1212
import io.ktor.serialization.kotlinx.json.*
1313
import kotlinx.coroutines.*
1414
import kotlinx.coroutines.flow.MutableSharedFlow
15+
import kotlinx.coroutines.flow.MutableStateFlow
1516
import kotlinx.coroutines.flow.SharedFlow
17+
import kotlinx.coroutines.flow.StateFlow
1618
import kotlinx.coroutines.flow.asSharedFlow
19+
import kotlinx.coroutines.flow.asStateFlow
1720
import kotlinx.serialization.json.decodeFromJsonElement
1821
import org.ntqqrev.milky.*
1922
import org.ntqqrev.saltify.annotation.WithApiExtension
20-
import org.ntqqrev.saltify.dsl.SaltifyConfig
23+
import org.ntqqrev.saltify.dsl.SaltifyApplicationConfig
2124
import org.ntqqrev.saltify.dsl.SaltifyPluginContext
25+
import org.ntqqrev.saltify.exception.ApiCallException
26+
import org.ntqqrev.saltify.model.EventConnectionState
2227
import org.ntqqrev.saltify.model.EventConnectionType
2328
import org.ntqqrev.saltify.model.SaltifyComponentType
24-
import org.ntqqrev.saltify.exception.ApiCallException
2529
import org.ntqqrev.saltify.util.coroutine.SaltifyComponent
2630
import org.ntqqrev.saltify.util.coroutine.SaltifyExceptionHandlerProvider
2731
import kotlin.coroutines.CoroutineContext
@@ -30,14 +34,14 @@ import kotlin.coroutines.CoroutineContext
3034
* 一个 Saltify 应用实例
3135
*/
3236
@WithApiExtension
33-
public sealed class SaltifyApplication(private val config: SaltifyConfig) : AutoCloseable {
37+
public sealed class SaltifyApplication(protected val config: SaltifyApplicationConfig) : AutoCloseable {
3438
public companion object {
3539
/**
3640
* 创建一个 Saltify 应用实例。
3741
*/
38-
public operator fun invoke(block: SaltifyConfig.() -> Unit): SaltifyApplication {
39-
val config = SaltifyConfig().apply(block)
40-
return when (config.eventConnectionType) {
42+
public operator fun invoke(block: SaltifyApplicationConfig.() -> Unit): SaltifyApplication {
43+
val config = SaltifyApplicationConfig().apply(block)
44+
return when (config.eventConnectionConfig.type) {
4145
EventConnectionType.WebSocket -> SaltifyApplicationWebSocket(config)
4246
EventConnectionType.SSE -> SaltifyApplicationSSE(config)
4347
}
@@ -61,20 +65,19 @@ public sealed class SaltifyApplication(private val config: SaltifyConfig) : Auto
6165
public val exceptionFlow: SharedFlow<Pair<CoroutineContext, Throwable>> =
6266
exceptionHandlerProvider.exceptionFlow.asSharedFlow()
6367

68+
protected val eventConnectionState: MutableStateFlow<EventConnectionState> =
69+
MutableStateFlow(EventConnectionState.Disconnected(null))
70+
71+
/**
72+
* 事件服务连接状态流。
73+
*/
74+
public val eventConnectionStateFlow: StateFlow<EventConnectionState> = eventConnectionState.asStateFlow()
75+
6476
internal val applicationScope: CoroutineScope = CoroutineScope(
6577
SaltifyComponent(SaltifyComponentType.Application, "SaltifyApplication") +
6678
exceptionHandlerProvider.handler
6779
)
6880

69-
@PublishedApi
70-
internal val extensionScope: CoroutineScope = CoroutineScope(
71-
applicationScope.coroutineContext +
72-
SupervisorJob(applicationScope.coroutineContext.job) +
73-
SaltifyComponent(SaltifyComponentType.Extension, "SaltifyExtension")
74-
)
75-
76-
protected val addressBaseNormalized: String = config.addressBase.trimEnd('/')
77-
7881
protected val events: MutableSharedFlow<Event> = MutableSharedFlow(extraBufferCapacity = 64)
7982

8083
/**
@@ -84,7 +87,16 @@ public sealed class SaltifyApplication(private val config: SaltifyConfig) : Auto
8487
*/
8588
public val eventFlow: SharedFlow<Event> = events.asSharedFlow()
8689

87-
private val activePlugins = mutableListOf<SaltifyPluginContext>()
90+
@PublishedApi
91+
internal val extensionScope: CoroutineScope = CoroutineScope(
92+
applicationScope.coroutineContext +
93+
SupervisorJob(applicationScope.coroutineContext.job) +
94+
SaltifyComponent(SaltifyComponentType.Extension, "SaltifyExtension")
95+
)
96+
97+
protected val addressBaseNormalized: String = config.addressBase.trimEnd('/')
98+
99+
private val loadedPlugins = mutableListOf<SaltifyPluginContext>()
88100

89101
@PublishedApi
90102
internal val httpClient: HttpClient = HttpClient {
@@ -97,7 +109,7 @@ public sealed class SaltifyApplication(private val config: SaltifyConfig) : Auto
97109
config.accessToken?.let { header(HttpHeaders.Authorization, "Bearer $it") }
98110
}
99111

100-
when (config.eventConnectionType) {
112+
when (config.eventConnectionConfig.type) {
101113
EventConnectionType.WebSocket -> install(WebSockets.Plugin) {
102114
contentConverter = KotlinxWebsocketSerializationConverter(milkyJsonModule)
103115
}
@@ -116,8 +128,11 @@ public sealed class SaltifyApplication(private val config: SaltifyConfig) : Auto
116128

117129
val context = SaltifyPluginContext(this, pluginScope)
118130
plugin.setup(context)
131+
loadedPlugins.add(context)
119132

120-
activePlugins.add(context)
133+
pluginScope.launch {
134+
context.onStartHooks.forEach { it() }
135+
}
121136
}
122137
}
123138

@@ -147,26 +162,17 @@ public sealed class SaltifyApplication(private val config: SaltifyConfig) : Auto
147162
/**
148163
* 连接事件服务。需要在监听事件时调用。请搭配 [disconnectEvent] 使用。
149164
*/
150-
public open suspend fun connectEvent(): Unit = coroutineScope {
151-
activePlugins.map { context ->
152-
context.launch {
153-
context.onStartHooks.forEach { it() }
154-
}
155-
}.joinAll()
156-
}
165+
public abstract suspend fun connectEvent()
157166

158167
/**
159168
* 断开事件服务。请搭配 [connectEvent] 使用。
160169
*/
161-
public open suspend fun disconnectEvent(): Unit = coroutineScope {
162-
activePlugins.map { context ->
163-
context.launch {
164-
context.onStopHooks.forEach { it() }
165-
}
166-
}.joinAll()
167-
}
170+
public abstract suspend fun disconnectEvent()
168171

169172
override fun close() {
173+
loadedPlugins.forEach {
174+
it.onStopHooks.forEach { block -> block() }
175+
}
170176
httpClient.close()
171177
applicationScope.cancel()
172178
}

saltify-core/src/commonMain/kotlin/org/ntqqrev/saltify/core/SaltifyApplicationSSE.kt

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,32 +6,53 @@ import kotlinx.coroutines.cancelAndJoin
66
import kotlinx.coroutines.launch
77
import org.ntqqrev.milky.Event
88
import org.ntqqrev.milky.milkyJsonModule
9-
import org.ntqqrev.saltify.dsl.SaltifyConfig
9+
import org.ntqqrev.saltify.dsl.SaltifyApplicationConfig
10+
import org.ntqqrev.saltify.model.EventConnectionState
11+
import org.ntqqrev.saltify.model.EventConnectionType
12+
import org.ntqqrev.saltify.util.coroutine.withRetry
1013

11-
public class SaltifyApplicationSSE(config: SaltifyConfig) : SaltifyApplication(config) {
14+
public class SaltifyApplicationSSE(config: SaltifyApplicationConfig) : SaltifyApplication(config) {
1215
private var connectionJob: Job? = null
1316

1417
override suspend fun connectEvent() {
1518
connectionJob?.cancelAndJoin()
19+
eventConnectionState.emit(EventConnectionState.Connecting)
1620

1721
connectionJob = applicationScope.launch {
18-
httpClient.sse("$addressBaseNormalized/event") {
19-
incoming.collect { sseEvent ->
20-
if (sseEvent.event == "milky_event") {
21-
sseEvent.data?.let { data ->
22-
val event = milkyJsonModule.decodeFromString<Event>(data)
23-
events.emit(event)
22+
withRetry(
23+
config.eventConnectionConfig.maxReconnectionAttempts,
24+
config.eventConnectionConfig.baseReconnectionInterval,
25+
config.eventConnectionConfig.maxReconnectionInterval,
26+
config.eventConnectionConfig.autoReconnect,
27+
onRetry = { throwable, retryCount ->
28+
eventConnectionState.emit(EventConnectionState.Reconnecting(throwable, retryCount))
29+
},
30+
onFailure = {
31+
eventConnectionState.emit(EventConnectionState.Disconnected(it))
32+
},
33+
block = {
34+
eventConnectionState.emit(
35+
EventConnectionState.Connected(
36+
EventConnectionType.SSE, this@SaltifyApplicationSSE
37+
)
38+
)
39+
40+
httpClient.sse("$addressBaseNormalized/event") {
41+
incoming.collect { sseEvent ->
42+
if (sseEvent.event == "milky_event") {
43+
sseEvent.data?.let { data ->
44+
val event = milkyJsonModule.decodeFromString<Event>(data)
45+
events.emit(event)
46+
}
47+
}
2448
}
2549
}
2650
}
27-
}
51+
)
2852
}
29-
30-
super.connectEvent()
3153
}
3254

3355
override suspend fun disconnectEvent() {
3456
connectionJob?.cancelAndJoin()
35-
super.disconnectEvent()
3657
}
3758
}

saltify-core/src/commonMain/kotlin/org/ntqqrev/saltify/core/SaltifyApplicationWebSocket.kt

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,30 +6,51 @@ import kotlinx.coroutines.cancelAndJoin
66
import kotlinx.coroutines.isActive
77
import kotlinx.coroutines.launch
88
import org.ntqqrev.milky.Event
9-
import org.ntqqrev.saltify.dsl.SaltifyConfig
9+
import org.ntqqrev.saltify.dsl.SaltifyApplicationConfig
10+
import org.ntqqrev.saltify.model.EventConnectionState
11+
import org.ntqqrev.saltify.model.EventConnectionType
12+
import org.ntqqrev.saltify.util.coroutine.withRetry
1013

11-
public class SaltifyApplicationWebSocket(config: SaltifyConfig) : SaltifyApplication(config) {
14+
public class SaltifyApplicationWebSocket(config: SaltifyApplicationConfig) : SaltifyApplication(config) {
1215
private var connectionJob: Job? = null
1316

1417
override suspend fun connectEvent() {
1518
connectionJob?.cancelAndJoin()
19+
eventConnectionState.emit(EventConnectionState.Connecting)
1620

1721
connectionJob = applicationScope.launch {
1822
val urlString = "$addressBaseNormalized/event".replaceFirst("http", "ws")
1923

20-
httpClient.webSocket(urlString) {
21-
while (isActive) {
22-
val event = receiveDeserialized<Event>()
23-
events.emit(event)
24+
withRetry(
25+
config.eventConnectionConfig.maxReconnectionAttempts,
26+
config.eventConnectionConfig.baseReconnectionInterval,
27+
config.eventConnectionConfig.maxReconnectionInterval,
28+
config.eventConnectionConfig.autoReconnect,
29+
onRetry = { throwable, retryCount ->
30+
eventConnectionState.emit(EventConnectionState.Reconnecting(throwable, retryCount))
31+
},
32+
onFailure = {
33+
eventConnectionState.emit(EventConnectionState.Disconnected(it))
34+
},
35+
block = {
36+
httpClient.webSocket(urlString) {
37+
eventConnectionState.emit(
38+
EventConnectionState.Connected(
39+
EventConnectionType.WebSocket, this@SaltifyApplicationWebSocket
40+
)
41+
)
42+
43+
while (isActive) {
44+
val event = receiveDeserialized<Event>()
45+
events.emit(event)
46+
}
47+
}
2448
}
25-
}
49+
)
2650
}
27-
28-
super.connectEvent()
2951
}
3052

3153
override suspend fun disconnectEvent() {
3254
connectionJob?.cancelAndJoin()
33-
super.disconnectEvent()
3455
}
3556
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package org.ntqqrev.saltify.dsl
2+
3+
import org.ntqqrev.saltify.annotation.SaltifyDsl
4+
import org.ntqqrev.saltify.model.EventConnectionType
5+
6+
@Suppress("MagicNumber")
7+
@SaltifyDsl
8+
public class EventConnectionConfig {
9+
/**
10+
* 事件服务使用的协议。
11+
*/
12+
public var type: EventConnectionType = EventConnectionType.WebSocket
13+
14+
/**
15+
* 是否启用自动重连。
16+
*/
17+
public var autoReconnect: Boolean = true
18+
19+
/**
20+
* 最小自动重连间隔。
21+
*/
22+
public var baseReconnectionInterval: Long = 500L
23+
24+
/**
25+
* 最大自动重连间隔。
26+
*/
27+
public var maxReconnectionInterval: Long = 10000L
28+
29+
/**
30+
* 最大重试次数。
31+
*/
32+
public var maxReconnectionAttempts: Int = 5
33+
}

saltify-core/src/commonMain/kotlin/org/ntqqrev/saltify/dsl/SaltifyConfig.kt renamed to saltify-core/src/commonMain/kotlin/org/ntqqrev/saltify/dsl/SaltifyApplicationConfig.kt

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,29 @@
11
package org.ntqqrev.saltify.dsl
22

33
import org.ntqqrev.saltify.annotation.SaltifyDsl
4-
import org.ntqqrev.saltify.model.EventConnectionType
54

65
@SaltifyDsl
7-
public class SaltifyConfig {
6+
public class SaltifyApplicationConfig {
87
/**
98
* 接口根地址,如:`https://localhost:3000`。
109
*/
1110
public var addressBase: String = ""
1211

12+
internal var eventConnectionConfig = EventConnectionConfig()
13+
1314
/**
14-
* 事件服务使用的协议
15+
* 事件服务配置
1516
*/
16-
public var eventConnectionType: EventConnectionType = EventConnectionType.WebSocket
17+
public fun eventConnection(block: EventConnectionConfig.() -> Unit) {
18+
eventConnectionConfig.block()
19+
}
1720

1821
/**
1922
* 访问令牌,无需 Bearer。
2023
*/
2124
public var accessToken: String? = null
22-
internal val installedPlugins = mutableListOf<SaltifyPlugin>()
2325

26+
internal val installedPlugins = mutableListOf<SaltifyPlugin>()
2427
public fun install(plugin: SaltifyPlugin) {
2528
installedPlugins.add(plugin)
2629
}

saltify-core/src/commonMain/kotlin/org/ntqqrev/saltify/dsl/SaltifyPluginContext.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,19 @@ public class SaltifyPluginContext internal constructor(
2323
@PublishedApi internal val pluginScope: CoroutineScope
2424
) : CoroutineScope by pluginScope {
2525
internal val onStartHooks = mutableListOf<suspend () -> Unit>()
26-
internal val onStopHooks = mutableListOf<suspend () -> Unit>()
26+
internal val onStopHooks = mutableListOf<() -> Unit>()
2727

2828
/**
29-
* 插件被加载,即 [SaltifyApplication.connectEvent] 后执行的逻辑。
29+
* 插件被加载,即 [SaltifyApplication.Companion.invoke] 后执行的逻辑。
3030
*/
3131
public fun onStart(block: suspend () -> Unit) {
3232
onStartHooks.add(block)
3333
}
3434

3535
/**
36-
* 插件被卸载,即 [SaltifyApplication.disconnectEvent] 后执行的逻辑
36+
* 插件被卸载,即 [SaltifyApplication.close] 前执行的逻辑
3737
*/
38-
public fun onStop(block: suspend () -> Unit) {
38+
public fun onStop(block: () -> Unit) {
3939
onStopHooks.add(block)
4040
}
4141

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package org.ntqqrev.saltify.model
2+
3+
import org.ntqqrev.saltify.core.SaltifyApplication
4+
5+
/**
6+
* Saltify 应用的事件服务连接状态
7+
*/
8+
public sealed class EventConnectionState {
9+
public data class Connected(
10+
val type: EventConnectionType,
11+
val instance: SaltifyApplication
12+
) : EventConnectionState()
13+
public data class Disconnected(val throwable: Throwable?) : EventConnectionState()
14+
public data class Reconnecting(val throwable: Throwable, val attempt: Int) : EventConnectionState()
15+
public object Connecting : EventConnectionState()
16+
}

0 commit comments

Comments
 (0)