Skip to content

Commit 11e03a0

Browse files
committed
Adding raw event subscription.
Signed-off-by: Artur Ciocanu <[email protected]>
1 parent 8f11972 commit 11e03a0

File tree

4 files changed

+147
-0
lines changed

4 files changed

+147
-0
lines changed

sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,4 +124,48 @@ public void onError(RuntimeException exception) {
124124
}
125125
}
126126
}
127+
128+
@Test
129+
public void testPubSubRawData() throws Exception {
130+
final DaprRun daprRun = closeLater(startDaprApp(
131+
this.getClass().getSimpleName() + "-rawdata",
132+
60000));
133+
134+
var runId = UUID.randomUUID().toString();
135+
try (DaprClient client = daprRun.newDaprClient();
136+
DaprPreviewClient previewClient = daprRun.newDaprPreviewClient()) {
137+
138+
// Publish messages
139+
for (int i = 0; i < NUM_MESSAGES; i++) {
140+
String message = String.format("Raw message #%d for run %s", i, runId);
141+
client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message).block();
142+
System.out.println(
143+
String.format("Published raw message: '%s' to topic '%s'", message, TOPIC_NAME));
144+
}
145+
146+
System.out.println("Starting raw data subscription for " + TOPIC_NAME);
147+
148+
Set<String> messages = Collections.synchronizedSet(new HashSet<>());
149+
150+
// Use new subscribeToEventsData - receives String directly, not CloudEvent<String>
151+
var disposable = previewClient.subscribeToEventsData(PUBSUB_NAME, TOPIC_NAME, TypeRef.STRING)
152+
.doOnNext(rawMessage -> {
153+
// rawMessage is String directly
154+
if (rawMessage.contains(runId)) {
155+
messages.add(rawMessage);
156+
System.out.println("Received raw message: " + rawMessage);
157+
}
158+
})
159+
.subscribe();
160+
161+
callWithRetry(() -> {
162+
var messageCount = messages.size();
163+
System.out.println(
164+
String.format("Got %d raw messages out of %d for topic %s.", messageCount, NUM_MESSAGES, TOPIC_NAME));
165+
assertEquals(NUM_MESSAGES, messages.size());
166+
}, 60000);
167+
168+
disposable.dispose();
169+
}
170+
}
127171
}

sdk/src/main/java/io/dapr/client/DaprClientImpl.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,15 @@ public <T> Flux<CloudEvent<T>> subscribeToEvents(String pubsubName, String topic
514514
}, FluxSink.OverflowStrategy.BUFFER);
515515
}
516516

517+
/**
518+
* {@inheritDoc}
519+
*/
520+
@Override
521+
public <T> Flux<T> subscribeToEventsData(String pubsubName, String topic, TypeRef<T> type) {
522+
return subscribeToEvents(pubsubName, topic, type)
523+
.map(CloudEvent::getData);
524+
}
525+
517526
@Nonnull
518527
private <T> Subscription<T> buildSubscription(
519528
SubscriptionListener<T> listener,

sdk/src/main/java/io/dapr/client/DaprPreviewClient.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,18 @@ <T> Subscription subscribeToEvents(
293293
*/
294294
<T> Flux<CloudEvent<T>> subscribeToEvents(String pubsubName, String topic, TypeRef<T> type);
295295

296+
/**
297+
* Subscribe to pubsub events via streaming using Project Reactor Flux.
298+
* Returns only the deserialized event data without CloudEvent metadata wrapper.
299+
*
300+
* @param pubsubName Name of the pubsub component.
301+
* @param topic Name of the topic to subscribe to.
302+
* @param type Type for object deserialization.
303+
* @return A Flux of deserialized event payloads (no CloudEvent wrapper).
304+
* @param <T> Type of the event payload.
305+
*/
306+
<T> Flux<T> subscribeToEventsData(String pubsubName, String topic, TypeRef<T> type);
307+
296308
/*
297309
* Converse with an LLM.
298310
*

sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -659,6 +659,88 @@ public void onCompleted() {
659659
assertEquals(numEvents, eventCount.get());
660660
}
661661

662+
@Test
663+
public void subscribeToEventsDataTest() throws Exception {
664+
var numEvents = 100;
665+
var pubsubName = "pubsubName";
666+
var topicName = "topicName";
667+
var data = "my message";
668+
var started = new Semaphore(0);
669+
670+
doAnswer((Answer<StreamObserver<DaprProtos.SubscribeTopicEventsRequestAlpha1>>) invocation -> {
671+
StreamObserver<DaprProtos.SubscribeTopicEventsResponseAlpha1> observer =
672+
(StreamObserver<DaprProtos.SubscribeTopicEventsResponseAlpha1>) invocation.getArguments()[0];
673+
674+
var emitterThread = new Thread(() -> {
675+
try {
676+
started.acquire();
677+
} catch (InterruptedException e) {
678+
throw new RuntimeException(e);
679+
}
680+
681+
observer.onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1.getDefaultInstance());
682+
683+
for (int i = 0; i < numEvents; i++) {
684+
DaprProtos.SubscribeTopicEventsResponseAlpha1 response =
685+
DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder()
686+
.setEventMessage(DaprAppCallbackProtos.TopicEventRequest.newBuilder()
687+
.setId(Integer.toString(i))
688+
.setPubsubName(pubsubName)
689+
.setTopic(topicName)
690+
.setData(ByteString.copyFromUtf8("\"" + data + "\""))
691+
.setDataContentType("application/json")
692+
.build())
693+
.build();
694+
observer.onNext(response);
695+
}
696+
697+
observer.onCompleted();
698+
});
699+
700+
emitterThread.start();
701+
702+
return new StreamObserver<>() {
703+
@Override
704+
public void onNext(DaprProtos.SubscribeTopicEventsRequestAlpha1 subscribeTopicEventsRequestAlpha1) {
705+
started.release();
706+
}
707+
708+
@Override
709+
public void onError(Throwable throwable) {
710+
// No-op
711+
}
712+
713+
@Override
714+
public void onCompleted() {
715+
// No-op
716+
}
717+
};
718+
}).when(daprStub).subscribeTopicEventsAlpha1(any(StreamObserver.class));
719+
720+
final AtomicInteger eventCount = new AtomicInteger(0);
721+
final Semaphore gotAll = new Semaphore(0);
722+
723+
// Use new subscribeToEventsData - receives raw String, not CloudEvent<String>
724+
var disposable = previewClient.subscribeToEventsData(pubsubName, topicName, TypeRef.STRING)
725+
.doOnNext(rawData -> {
726+
// rawData is String directly, not CloudEvent
727+
assertEquals(data, rawData);
728+
assertTrue(rawData instanceof String);
729+
730+
int count = eventCount.incrementAndGet();
731+
732+
if (count >= numEvents) {
733+
gotAll.release();
734+
}
735+
})
736+
.subscribe();
737+
738+
gotAll.acquire();
739+
disposable.dispose();
740+
741+
assertEquals(numEvents, eventCount.get());
742+
}
743+
662744
@Test
663745
public void converseShouldThrowIllegalArgumentExceptionWhenComponentNameIsNull() throws Exception {
664746
List<ConversationInput> inputs = new ArrayList<>();

0 commit comments

Comments
 (0)