Skip to content

Commit 78bcf6b

Browse files
Merge pull request #4 from caplin/spring-containers
Add initial container support to Spring
2 parents 3c139d1 + 2ffb708 commit 78bcf6b

File tree

14 files changed

+519
-125
lines changed

14 files changed

+519
-125
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ Then refer to the documentation:
3030
## Spring
3131

3232
This module provides a starter for integrating Caplin DataSource with your
33-
[Spring Boot](https://spring.io/projects/spring-boot) application, and integration with
33+
[Spring Boot 3.5](https://spring.io/projects/spring-boot) application, and integration with
3434
[Spring Messaging](https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html)
3535
for publishing data from annotated functions.
3636

examples/spring-java/README.md

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
* Ensure the required Caplin Liberator service is running as described [here](../README.md#requirements).
44
* Launch the application with `../../gradlew bootRun`.
55
* Use the Liberator UI to request one of the subjects, for example:
6-
* `/public/stream/abc/123`
7-
* `/user/stream/abc/123`
8-
* `/session/stream/abc/123`
6+
* `/public/stream/abc/123`
7+
* `/user/stream/abc/123`
8+
* `/session/stream/abc/123`
9+
* `/public/container/abc/123`
10+
* `/user/container/abc/123`
11+

examples/spring-java/src/main/java/example/StreamsController.java

Lines changed: 90 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
11
package example;
22

3+
import static com.caplin.integration.datasourcex.reactive.api.ContainerEvent.RowEvent;
4+
5+
import com.caplin.integration.datasourcex.reactive.api.ContainerEvent;
36
import com.caplin.integration.datasourcex.spring.annotations.IngressDestinationVariable;
47
import com.caplin.integration.datasourcex.spring.annotations.IngressToken;
58
import java.time.Duration;
9+
import java.util.Map;
610
import java.util.UUID;
11+
import java.util.concurrent.ConcurrentHashMap;
12+
import java.util.concurrent.ThreadLocalRandom;
13+
import java.util.stream.Collectors;
14+
import java.util.stream.IntStream;
715
import org.springframework.messaging.handler.annotation.DestinationVariable;
816
import org.springframework.messaging.handler.annotation.MessageMapping;
917
import org.springframework.stereotype.Controller;
@@ -23,11 +31,7 @@ public class StreamsController {
2331
@MessageMapping("/public/stream/{parameter1}/{parameter2}")
2432
public Flux<Payload> genericStream(
2533
@DestinationVariable String parameter1, @DestinationVariable int parameter2) {
26-
return Flux.interval(Duration.ZERO, Duration.ofSeconds(1))
27-
.map(
28-
version ->
29-
new Payload(
30-
version.intValue(), null, null, parameter1, parameter2, UUID.randomUUID()));
34+
return createPayloadFlux(null, null, parameter1, parameter2);
3135
}
3236

3337
/**
@@ -43,11 +47,7 @@ public Flux<Payload> userStream(
4347
@IngressDestinationVariable(token = IngressToken.USER_ID) String userId,
4448
@DestinationVariable String parameter1,
4549
@DestinationVariable int parameter2) {
46-
return Flux.interval(Duration.ZERO, Duration.ofSeconds(1))
47-
.map(
48-
version ->
49-
new Payload(
50-
version.intValue(), userId, null, parameter1, parameter2, UUID.randomUUID()));
50+
return createPayloadFlux(userId, null, parameter1, parameter2);
5151
}
5252

5353
/**
@@ -64,6 +64,45 @@ public Flux<Payload> sessionStream(
6464
@IngressDestinationVariable(token = IngressToken.PERSISTENT_SESSION_ID) String sessionId,
6565
@DestinationVariable String parameter1,
6666
@DestinationVariable int parameter2) {
67+
return createPayloadFlux(userId, sessionId, parameter1, parameter2);
68+
}
69+
70+
/**
71+
* Streams a sequence of {@link ContainerEvent}s of {@link Payload} objects.
72+
*
73+
* <p>This example demonstrates how to publish a DataSource container. It initially emits a {@link
74+
* ContainerEvent.Bulk} event containing 10 rows (keys "0" through "9"). Subsequently, it emits
75+
* random {@link RowEvent.Upsert} or {@link RowEvent.Remove} events for these rows at one-second
76+
* intervals.
77+
*
78+
* @see Flux
79+
*/
80+
@MessageMapping("/public/container/{parameter1}/{parameter2}")
81+
public Flux<ContainerEvent<Payload>> containerStream(
82+
@DestinationVariable String parameter1, @DestinationVariable int parameter2) {
83+
return createContainerFlux(null, parameter1, parameter2);
84+
}
85+
86+
/**
87+
* Streams a sequence of {@link ContainerEvent}s of {@link Payload} objects.
88+
*
89+
* <p>This example demonstrates how to publish a user-specific DataSource container. It initially
90+
* emits a {@link ContainerEvent.Bulk} event containing 10 rows (keys "0" through "9").
91+
* Subsequently, it emits random {@link RowEvent.Upsert} or {@link RowEvent.Remove} events for
92+
* these rows at one-second intervals.
93+
*
94+
* @see Flux
95+
*/
96+
@MessageMapping("/user/{userId}/container/{parameter1}/{parameter2}")
97+
public Flux<ContainerEvent<Payload>> userContainerStream(
98+
@IngressDestinationVariable(token = IngressToken.USER_ID) String userId,
99+
@DestinationVariable String parameter1,
100+
@DestinationVariable int parameter2) {
101+
return createContainerFlux(userId, parameter1, parameter2);
102+
}
103+
104+
private Flux<Payload> createPayloadFlux(
105+
String userId, String sessionId, String parameter1, int parameter2) {
67106
return Flux.interval(Duration.ZERO, Duration.ofSeconds(1))
68107
.map(
69108
version ->
@@ -75,4 +114,45 @@ public Flux<Payload> sessionStream(
75114
parameter2,
76115
UUID.randomUUID()));
77116
}
117+
118+
@SuppressWarnings("unchecked")
119+
private Flux<ContainerEvent<Payload>> createContainerFlux(
120+
String userId, String parameter1, int parameter2) {
121+
122+
Map<Integer, Integer> rows = new ConcurrentHashMap<>();
123+
124+
Flux<ContainerEvent<Payload>> initial =
125+
Flux.just(
126+
new ContainerEvent.Bulk<>(
127+
IntStream.rangeClosed(0, 9)
128+
.mapToObj(
129+
row -> {
130+
rows.put(row, 1);
131+
return new RowEvent.Upsert<>(
132+
String.valueOf(row),
133+
new Payload(
134+
0, userId, null, parameter1, parameter2, UUID.randomUUID()));
135+
})
136+
.collect(Collectors.toList())));
137+
138+
Flux<ContainerEvent<Payload>> updates =
139+
Flux.interval(Duration.ofSeconds(1))
140+
.map(
141+
i -> {
142+
int row = ThreadLocalRandom.current().nextInt(10);
143+
if (ThreadLocalRandom.current().nextInt(3) != 2) {
144+
int version = rows.getOrDefault(row, 0);
145+
rows.put(row, version + 1);
146+
return new RowEvent.Upsert<Payload>(
147+
String.valueOf(row),
148+
new Payload(
149+
version, userId, null, parameter1, parameter2, UUID.randomUUID()));
150+
} else {
151+
rows.remove(row);
152+
return new RowEvent.Remove(String.valueOf(row));
153+
}
154+
});
155+
156+
return Flux.concat(initial, updates);
157+
}
78158
}

examples/spring-kotlin/README.md

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
* Ensure the required Caplin Liberator service is running as described [here](../README.md#requirements).
44
* Launch the application with `../../gradlew bootRun`.
55
* Use the Liberator UI to request one of the subjects, for example:
6-
* `/public/stream/abc/123`
7-
* `/user/stream/abc/123`
8-
* `/session/stream/abc/123`
6+
* `/public/stream/abc/123`
7+
* `/user/stream/abc/123`
8+
* `/session/stream/abc/123`
9+
* `/public/container/abc/123`
10+
* `/user/container/abc/123`
11+

examples/spring-kotlin/src/main/kotlin/example/StreamsController.kt

Lines changed: 95 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
package example
22

3+
import com.caplin.integration.datasourcex.reactive.api.ContainerEvent
4+
import com.caplin.integration.datasourcex.reactive.api.ContainerEvent.Bulk
5+
import com.caplin.integration.datasourcex.reactive.api.ContainerEvent.RowEvent.Remove
6+
import com.caplin.integration.datasourcex.reactive.api.ContainerEvent.RowEvent.Upsert
37
import com.caplin.integration.datasourcex.spring.annotations.IngressDestinationVariable
48
import com.caplin.integration.datasourcex.spring.annotations.IngressToken
59
import java.util.UUID
10+
import kotlin.random.Random
611
import kotlin.time.Duration.Companion.seconds
712
import kotlinx.coroutines.delay
813
import kotlinx.coroutines.flow.Flow
@@ -27,22 +32,7 @@ class StreamsController {
2732
fun genericStream(
2833
@DestinationVariable parameter1: String,
2934
@DestinationVariable parameter2: Int,
30-
) = flow {
31-
var version = 0
32-
while (true) {
33-
emit(
34-
Payload(
35-
version = version++,
36-
userId = null,
37-
sessionId = null,
38-
parameter1 = parameter1,
39-
parameter2 = parameter2,
40-
uuid = UUID.randomUUID(),
41-
),
42-
)
43-
delay(1.seconds)
44-
}
45-
}
35+
) = createPayloadFlow(null, null, parameter1, parameter2)
4636

4737
/**
4838
* Streams an infinite sequence of Payload objects, where each payload contains an incrementing
@@ -58,22 +48,7 @@ class StreamsController {
5848
@IngressDestinationVariable(IngressToken.USER_ID) userId: String,
5949
@DestinationVariable parameter1: String,
6050
@DestinationVariable parameter2: Int,
61-
) = flow {
62-
var version = 0
63-
while (true) {
64-
emit(
65-
Payload(
66-
version = version++,
67-
userId = null,
68-
sessionId = userId,
69-
parameter1 = parameter1,
70-
parameter2 = parameter2,
71-
uuid = UUID.randomUUID(),
72-
),
73-
)
74-
delay(1.seconds)
75-
}
76-
}
51+
) = createPayloadFlow(userId, null, parameter1, parameter2)
7752

7853
/**
7954
* Streams an infinite sequence of Payload objects, where each payload contains an incrementing
@@ -90,14 +65,100 @@ class StreamsController {
9065
@IngressDestinationVariable(IngressToken.PERSISTENT_SESSION_ID) sessionId: String,
9166
@DestinationVariable parameter1: String,
9267
@DestinationVariable parameter2: Int,
68+
) = createPayloadFlow(userId, sessionId, parameter1, parameter2)
69+
70+
/**
71+
* Streams a sequence of [ContainerEvent]s of [Payload] objects.
72+
*
73+
* This example demonstrates how to publish a DataSource container. It initially emits a [Bulk]
74+
* event containing 10 rows (keys "0" through "9"). Subsequently, it emits random [Upsert] or
75+
* [Remove] events for these rows at one-second intervals.
76+
*
77+
* @see Flow
78+
*/
79+
@MessageMapping("/public/container/{parameter1}/{parameter2}")
80+
fun containerStream(
81+
@DestinationVariable parameter1: String,
82+
@DestinationVariable parameter2: Int,
83+
): Flow<ContainerEvent<Payload>> = createContainerFlow(null, parameter1, parameter2)
84+
85+
/**
86+
* Streams a sequence of [ContainerEvent]s of [Payload] objects.
87+
*
88+
* This example demonstrates how to publish a DataSource container. It initially emits a [Bulk]
89+
* event containing 10 rows (keys "0" through "9"). Subsequently, it emits random [Upsert] or
90+
* [Remove] events for these rows at one-second intervals.
91+
*
92+
* @see Flow
93+
*/
94+
@MessageMapping("/user/{userId}/container/{parameter1}/{parameter2}")
95+
fun userContainerStream(
96+
@IngressDestinationVariable(IngressToken.USER_ID) userId: String,
97+
@DestinationVariable parameter1: String,
98+
@DestinationVariable parameter2: Int,
99+
): Flow<ContainerEvent<Payload>> = createContainerFlow(userId, parameter1, parameter2)
100+
101+
private fun createContainerFlow(userId: String?, parameter1: String, parameter2: Int) = flow {
102+
val rows = mutableMapOf<Int, Int>()
103+
104+
emit(
105+
Bulk(
106+
(0 until 10).map { row ->
107+
rows[row] = 1
108+
Upsert(
109+
row.toString(),
110+
Payload(
111+
0,
112+
userId,
113+
null,
114+
parameter1,
115+
parameter2,
116+
UUID.randomUUID(),
117+
),
118+
)
119+
},
120+
),
121+
)
122+
123+
while (true) {
124+
delay(1.seconds)
125+
val row = Random.nextInt(10)
126+
if (Random.nextInt(3) != 2) {
127+
val version = rows.getOrPut(row) { 0 }
128+
emit(
129+
Upsert(
130+
row.toString(),
131+
Payload(
132+
version,
133+
userId,
134+
null,
135+
parameter1,
136+
parameter2,
137+
UUID.randomUUID(),
138+
),
139+
),
140+
)
141+
rows[row] = version + 1
142+
} else {
143+
rows.remove(row)
144+
emit(Remove(row.toString()))
145+
}
146+
}
147+
}
148+
149+
private fun createPayloadFlow(
150+
userId: String?,
151+
sessionId: String?,
152+
parameter1: String,
153+
parameter2: Int,
93154
) = flow {
94155
var version = 0
95156
while (true) {
96157
emit(
97158
Payload(
98159
version = version++,
99-
userId = sessionId,
100-
sessionId = userId,
160+
userId = userId,
161+
sessionId = sessionId,
101162
parameter1 = parameter1,
102163
parameter2 = parameter2,
103164
uuid = UUID.randomUUID(),

reactive/api/api/datasourcex-reactive-api.api

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public final class com/caplin/integration/datasourcex/reactive/api/ActiveConfig$
2121

2222
public abstract class com/caplin/integration/datasourcex/reactive/api/ActiveContainerConfig {
2323
public static final field Companion Lcom/caplin/integration/datasourcex/reactive/api/ActiveContainerConfig$Companion;
24+
public static final field ITEMS_SUFFIX_DEFAULT Ljava/lang/String;
2425
public static final field ROW_REQUEST_TIMEOUT_DEFAULT J
2526
public static final field STRUCTURE_DEBOUNCE_DEFAULT J
2627
public final fun getInsertAt ()Lcom/caplin/integration/datasourcex/reactive/api/InsertAt;
@@ -123,6 +124,7 @@ public abstract interface class com/caplin/integration/datasourcex/reactive/api/
123124

124125
public final class com/caplin/integration/datasourcex/reactive/api/ContainerEvent$Bulk : com/caplin/integration/datasourcex/reactive/api/ContainerEvent {
125126
public fun <init> (Ljava/util/List;)V
127+
public fun <init> ([Lcom/caplin/integration/datasourcex/reactive/api/ContainerEvent$RowEvent;)V
126128
public final fun component1 ()Ljava/util/List;
127129
public fun equals (Ljava/lang/Object;)Z
128130
public final fun getEvents ()Ljava/util/List;

reactive/api/src/main/kotlin/com/caplin/integration/datasourcex/reactive/api/ActiveContainerConfig.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ sealed class ActiveContainerConfig {
77
companion object {
88
const val STRUCTURE_DEBOUNCE_DEFAULT: Long = 100L
99
const val ROW_REQUEST_TIMEOUT_DEFAULT: Long = 10000L
10+
const val ITEMS_SUFFIX_DEFAULT = "-items"
1011
}
1112

1213
/**
@@ -44,7 +45,7 @@ sealed class ActiveContainerConfig {
4445
* items with the paths `/container/a-items/{rowKey}` where `{rowKey}` is provided by
4546
* [ContainerEvent.RowEvent.key].
4647
*/
47-
var rowPathSuffix: String = "-items"
48+
var rowPathSuffix: String = ITEMS_SUFFIX_DEFAULT
4849

4950
class Json : ActiveContainerConfig()
5051

reactive/api/src/main/kotlin/com/caplin/integration/datasourcex/reactive/api/ContainerEvent.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ sealed interface ContainerEvent<out T : Any> {
44

55
/** Send a number of updates in one. This may be more efficient on the wire. */
66
class Bulk<out T : Any>(val events: List<RowEvent<T>>) : ContainerEvent<T> {
7+
8+
constructor(vararg events: RowEvent<T>) : this(events.toList())
9+
710
operator fun component1(): List<RowEvent<T>> = events
811

912
override fun equals(other: Any?): Boolean {

reactive/kotlin/src/test/kotlin/com/caplin/integration/datasourcex/reactive/kotlin/BindTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class BindTest :
4646
FunSpec({
4747
isolationMode = IsolationMode.InstancePerTest
4848

49-
suspend fun <T : Any?> MutableSharedFlow<ValueOrCompletion<T>>.emitUpdate(event: T) {
49+
suspend fun <T> MutableSharedFlow<ValueOrCompletion<T>>.emitUpdate(event: T) {
5050
emit(Value(event))
5151
}
5252

0 commit comments

Comments
 (0)