@@ -36,7 +36,7 @@ public OctopusImpl(Plugin plugin) {
3636 }
3737
3838 @ Override
39- public Collection <Entry > get (@ NonNull String keyPattern , boolean includeExpired , @ Nullable Instant createdRangeStart , @ Nullable Instant createdRangeEnd ) {
39+ public @ NotNull Collection <Entry > get (@ NonNull String keyPattern , boolean includeExpired , @ Nullable Instant createdRangeStart , @ Nullable Instant createdRangeEnd ) {
4040 var builder = GetRequest .newBuilder ();
4141
4242 builder .setKeyPattern (keyPattern );
@@ -58,28 +58,33 @@ public void registerListener(@NonNull Listener listener) {
5858 var observer = stub .listen (new StreamObserver <>() {
5959 @ Override
6060 public void onNext (EventCall value ) {
61- var request = requestRef .get ();
62- if (request == null ) return ;
61+ try {
62+ var request = requestRef .get ();
63+ if (request == null ) return ;
6364
64- listener .onCall (value .getObject ());
65+ listener .onCall (value .getObject ());
6566
66- var msg = ListenMessage .newBuilder ()
67- .setCallback (value )
68- .build ();
67+ var msg = ListenMessage .newBuilder ()
68+ .setCallback (value )
69+ .build ();
6970
70- request .onNext (msg );
71+ request .onNext (msg );
72+ } catch (Exception e ) {
73+ logger .error (e .getMessage (), e );
74+ }
7175 }
7276
7377 @ Override
7478 public void onError (Throwable t ) {
7579 requestRef .set (null );
7680 logger .error ("Cannot call event on listener {} with key-pattern {}" , listener .getListenerUniqueId (), listener .getKeyPattern (), t );
81+ unregisterListener (listener );
7782 }
7883
7984 @ Override
8085 public void onCompleted () {
8186 requestRef .set (null );
82- listeners . remove (listener . getListenerUniqueId () );
87+ unregisterListener (listener );
8388 }
8489 });
8590
0 commit comments