@@ -22,6 +22,7 @@ use std::{
2222
2323use anyhow:: Result ;
2424use async_trait:: async_trait;
25+ use ceramic_actor:: ActorHandle as _;
2526use ceramic_api_server:: models:: { BadRequestResponse , ErrorResponse , EventData , Peer , Peers } ;
2627use ceramic_api_server:: {
2728 models:: { self , Event } ,
@@ -37,7 +38,8 @@ use ceramic_api_server::{
3738use ceramic_core:: {
3839 ensure_multiaddr_has_p2p, Cid , EventId , Interest , Network , NodeId , PeerId , StreamId ,
3940} ;
40- use ceramic_pipeline:: EVENT_STATES_TABLE ;
41+ use ceramic_pipeline:: aggregator:: { AggregatorHandle , StreamStateMsg } ;
42+ use ceramic_pipeline:: PipelineHandle ;
4143use datafusion:: arrow:: array:: {
4244 as_dictionary_array, as_map_array, Array as _, ArrayAccessor as _, BinaryArray ,
4345} ;
@@ -386,7 +388,7 @@ pub struct Server<C, I, M, P> {
386388 marker : PhantomData < C > ,
387389 authentication : bool ,
388390
389- pipeline : Option < SessionContext > ,
391+ pipeline : Option < PipelineHandle > ,
390392}
391393
392394impl < C , I , M , P > Server < C , I , M , P >
@@ -401,7 +403,7 @@ where
401403 interest : I ,
402404 model : Arc < M > ,
403405 p2p : P ,
404- pipeline : Option < SessionContext > ,
406+ pipeline : Option < PipelineHandle > ,
405407 shutdown_signal : Shutdown ,
406408 ) -> Self {
407409 let ( tx, event_rx) = tokio:: sync:: mpsc:: channel :: < EventInsert > ( 1024 ) ;
@@ -876,130 +878,47 @@ where
876878
877879 async fn get_stream_state (
878880 & self ,
879- pipeline : & SessionContext ,
881+ aggregator : AggregatorHandle ,
880882 stream_id : StreamId ,
881883 ) -> Result < ceramic_api_server:: StreamsStreamIdGetResponse , ErrorResponse > {
882- let state_batch = pipeline
883- . table ( EVENT_STATES_TABLE )
884- . await
885- . map_err ( |err| {
886- ErrorResponse :: new ( format ! ( "{} table not found: {err}" , EVENT_STATES_TABLE ) )
887- } ) ?
888- . select ( vec ! [
889- col( "stream_cid" ) ,
890- col( "event_cid" ) ,
891- col( "dimensions" ) ,
892- col( "controller" ) ,
893- col( "data" ) ,
894- col( "index" ) ,
895- ] )
896- . map_err ( |err| ErrorResponse :: new ( format ! ( "failed to select: {err}" ) ) ) ?
897- . filter ( col ( "stream_cid" ) . eq ( lit ( stream_id. cid . to_bytes ( ) ) ) )
898- . map_err ( |err| ErrorResponse :: new ( format ! ( "failed to filter: {err}" ) ) ) ?
899- . aggregate (
900- vec ! [ col( "stream_cid" ) , col( "controller" ) ] ,
901- vec ! [
902- last_value( vec![ col( "data" ) ] )
903- . order_by( vec![ col( "index" ) . sort( true , true ) ] )
904- . build( )
905- . map_err( |err| {
906- ErrorResponse :: new( format!(
907- "failed to define last_value state query: {err}"
908- ) )
909- } ) ?
910- . alias( "data" ) ,
911- last_value( vec![ col( "event_cid" ) ] )
912- . order_by( vec![ col( "index" ) . sort( true , true ) ] )
913- . build( )
914- . map_err( |err| {
915- ErrorResponse :: new( format!(
916- "failed to define last_value event_cid query: {err}"
917- ) )
918- } ) ?
919- . alias( "event_cid" ) ,
920- last_value( vec![ col( "dimensions" ) ] )
921- . order_by( vec![ col( "index" ) . sort( true , true ) ] )
922- . build( )
923- . map_err( |err| {
924- ErrorResponse :: new( format!(
925- "failed to define last_value event_cid dimensions: {err}"
926- ) )
927- } ) ?
928- . alias( "dimensions" ) ,
929- ] ,
930- )
931- . map_err ( |err| ErrorResponse :: new ( format ! ( "failed to define window: {err}" ) ) ) ?
932- . collect ( )
884+ let state = aggregator
885+ . send ( StreamStateMsg {
886+ id : stream_id. clone ( ) ,
887+ } )
933888 . await
934- . map_err ( |err| {
935- ErrorResponse :: new ( format ! ( "failed to execute pipeline query: {err}" ) )
936- } ) ?;
937-
938- if state_batch. is_empty ( ) {
939- return Ok (
889+ . map_err ( |err| ErrorResponse :: new ( err. to_string ( ) ) ) ?
890+ . map_err ( |err| ErrorResponse :: new ( err. to_string ( ) ) ) ?;
891+ if let Some ( state) = state {
892+ Ok ( ceramic_api_server:: StreamsStreamIdGetResponse :: Success (
893+ models:: StreamState {
894+ id : state. id . to_string ( ) ,
895+ event_cid : state. event_cid . to_string ( ) ,
896+ controller : state. controller ,
897+ dimensions : serde_json:: Value :: Object (
898+ state
899+ . dimensions
900+ . into_iter ( )
901+ . map ( |( k, v) | {
902+ (
903+ k,
904+ serde_json:: Value :: String ( multibase:: encode (
905+ multibase:: Base :: Base64Url ,
906+ v,
907+ ) ) ,
908+ )
909+ } )
910+ . collect ( ) ,
911+ ) ,
912+ data : multibase:: encode ( multibase:: Base :: Base64Url , state. data ) ,
913+ } ,
914+ ) )
915+ } else {
916+ Ok (
940917 ceramic_api_server:: StreamsStreamIdGetResponse :: StreamNotFound (
941918 stream_id. to_string ( ) ,
942919 ) ,
943- ) ;
944- }
945-
946- let batch = concat_batches ( & state_batch[ 0 ] . schema ( ) , state_batch. iter ( ) )
947- . map_err ( |err| ErrorResponse :: new ( format ! ( "failed to concat batches: {err}" ) ) ) ?;
948- let data = as_binary_array (
949- batch
950- . column_by_name ( "data" )
951- . ok_or_else ( || ErrorResponse :: new ( "state column should exist" . to_string ( ) ) ) ?,
952- )
953- . map_err ( |err| ErrorResponse :: new ( format ! ( "state should be a string column: {err}" ) ) ) ?
954- . value ( 0 ) ;
955- let event_cid = as_binary_array (
956- batch
957- . column_by_name ( "event_cid" )
958- . ok_or_else ( || ErrorResponse :: new ( "event_cid column should exist" . to_string ( ) ) ) ?,
959- )
960- . map_err ( |err| ErrorResponse :: new ( format ! ( "event_cid should be a binary column: {err}" ) ) ) ?;
961- let controller = as_string_array (
962- batch
963- . column_by_name ( "controller" )
964- . ok_or_else ( || ErrorResponse :: new ( "controller column should exist" . to_string ( ) ) ) ?,
965- )
966- . map_err ( |err| {
967- ErrorResponse :: new ( format ! ( "controller should be a string column: {err}" ) )
968- } ) ?;
969- let dimensions = as_map_array (
970- batch
971- . column_by_name ( "dimensions" )
972- . ok_or_else ( || ErrorResponse :: new ( "dimensions column should exist" . to_string ( ) ) ) ?,
973- ) ;
974- let keys = as_string_array ( dimensions. keys ( ) ) . map_err ( |err| {
975- ErrorResponse :: new ( format ! ( "dimensions keys should be strings: {err}" ) )
976- } ) ?;
977- let values = as_dictionary_array :: < Int32Type > ( dimensions. values ( ) )
978- . downcast_dict :: < BinaryArray > ( )
979- . ok_or_else ( || ErrorResponse :: new ( "dimensions values should be binary" . to_string ( ) ) ) ?;
980- let mut dimensions = serde_json:: Map :: with_capacity ( keys. len ( ) ) ;
981- for i in 0 ..keys. len ( ) {
982- let key = keys. value ( i) ;
983- let value = values. value ( i) ;
984- dimensions. insert (
985- key. to_string ( ) ,
986- serde_json:: Value :: String ( multibase:: encode ( multibase:: Base :: Base64Url , value) ) ,
987- ) ;
920+ )
988921 }
989-
990- Ok ( ceramic_api_server:: StreamsStreamIdGetResponse :: Success (
991- models:: StreamState {
992- id : stream_id. to_string ( ) ,
993- event_cid : Cid :: read_bytes ( event_cid. value ( 0 ) )
994- . map_err ( |err| {
995- ErrorResponse :: new ( format ! ( "event_cid should be valid cid: {err}" ) )
996- } ) ?
997- . to_string ( ) ,
998- controller : controller. value ( 0 ) . to_string ( ) ,
999- dimensions : serde_json:: Value :: Object ( dimensions) ,
1000- data : multibase:: encode ( multibase:: Base :: Base64Url , data) ,
1001- } ,
1002- ) )
1003922 }
1004923 async fn get_peers ( & self ) -> Result < PeersGetResponse , ErrorResponse > {
1005924 let peers =
@@ -1260,8 +1179,8 @@ where
12601179 ) )
12611180 }
12621181 } ;
1263- if let Some ( pipeline ) = & self . pipeline {
1264- self . get_stream_state ( pipeline , stream_id)
1182+ if let Some ( aggregator ) = self . pipeline . as_ref ( ) . and_then ( |p| p . aggregator ( ) ) {
1183+ self . get_stream_state ( aggregator , stream_id)
12651184 . await
12661185 . or_else ( |err| {
12671186 Ok ( ceramic_api_server:: StreamsStreamIdGetResponse :: InternalServerError ( err) )
0 commit comments