Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion aevatar.slnx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
<Project Path="agents/Aevatar.GAgents.Registry/Aevatar.GAgents.Registry.csproj" />
<Project Path="agents/Aevatar.GAgents.ConnectorCatalog/Aevatar.GAgents.ConnectorCatalog.csproj" />
<Project Path="agents/Aevatar.GAgents.RoleCatalog/Aevatar.GAgents.RoleCatalog.csproj" />
<Project Path="agents/Aevatar.GAgents.StreamingProxyParticipant/Aevatar.GAgents.StreamingProxyParticipant.csproj" />
<Project Path="agents/Aevatar.GAgents.UserConfig/Aevatar.GAgents.UserConfig.csproj" />
<Project Path="agents/Aevatar.GAgents.UserMemory/Aevatar.GAgents.UserMemory.csproj" />
<Project Path="agents\Aevatar.GAgents.ChatbotClassifier\Aevatar.GAgents.ChatbotClassifier.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,15 @@ public static IServiceCollection AddStreamingProxy(
},
static context => new StreamingProxyCurrentStateRuntimeLease(context));
services.TryAddSingleton<StreamingProxyCurrentStateProjectionPort>();
services.AddCurrentStateProjectionMaterializer<
StreamingProxyCurrentStateProjectionContext,
StreamingProxyRoomCurrentStateProjector>();
services.AddCurrentStateProjectionMaterializer<
StreamingProxyCurrentStateProjectionContext,
StreamingProxyChatSessionTerminalProjector>();
services.TryAddSingleton<
IProjectionDocumentMetadataProvider<StreamingProxyRoomCurrentStateDocument>,
StreamingProxyRoomCurrentStateDocumentMetadataProvider>();
services.TryAddSingleton<
IProjectionDocumentMetadataProvider<StreamingProxyChatSessionTerminalSnapshot>,
StreamingProxyChatSessionTerminalSnapshotMetadataProvider>();
Expand Down Expand Up @@ -112,7 +118,10 @@ private static void AddTerminalSnapshotReadModelProvider(
IConfiguration? configuration)
{
if (services.Any(x => x.ServiceType == typeof(IProjectionDocumentReader<StreamingProxyChatSessionTerminalSnapshot, string>)))
{
EnsureStreamingProxyRoomCurrentStateReadModelProvider(services, configuration);
return;
}

var elasticsearchEnabled = ResolveElasticsearchDocumentEnabled(configuration);
var inMemoryEnabled = ResolveOptionalBool(
Expand All @@ -127,6 +136,7 @@ private static void AddTerminalSnapshotReadModelProvider(

if (elasticsearchEnabled)
{
AddElasticsearchStreamingProxyRoomCurrentStateReadModelProvider(services, configuration);
services.AddElasticsearchDocumentProjectionStore<StreamingProxyChatSessionTerminalSnapshot, string>(
optionsFactory: _ => BuildElasticsearchDocumentOptions(configuration!),
metadataFactory: sp => sp
Expand All @@ -137,12 +147,50 @@ private static void AddTerminalSnapshotReadModelProvider(
return;
}

AddInMemoryStreamingProxyRoomCurrentStateReadModelProvider(services);
services.AddInMemoryDocumentProjectionStore<StreamingProxyChatSessionTerminalSnapshot, string>(
keySelector: readModel => readModel.Id,
keyFormatter: key => key,
defaultSortSelector: readModel => readModel.UpdatedAt.ToDateTimeOffset());
}

private static void EnsureStreamingProxyRoomCurrentStateReadModelProvider(
IServiceCollection services,
IConfiguration? configuration)
{
if (services.Any(x => x.ServiceType == typeof(IProjectionDocumentReader<StreamingProxyRoomCurrentStateDocument, string>)))
return;

if (ResolveElasticsearchDocumentEnabled(configuration))
{
AddElasticsearchStreamingProxyRoomCurrentStateReadModelProvider(services, configuration);
return;
}

AddInMemoryStreamingProxyRoomCurrentStateReadModelProvider(services);
}

private static void AddElasticsearchStreamingProxyRoomCurrentStateReadModelProvider(
IServiceCollection services,
IConfiguration? configuration)
{
services.AddElasticsearchDocumentProjectionStore<StreamingProxyRoomCurrentStateDocument, string>(
optionsFactory: _ => BuildElasticsearchDocumentOptions(configuration!),
metadataFactory: sp => sp
.GetRequiredService<IProjectionDocumentMetadataProvider<StreamingProxyRoomCurrentStateDocument>>()
.Metadata,
keySelector: readModel => readModel.ActorId,
keyFormatter: key => key);
}

private static void AddInMemoryStreamingProxyRoomCurrentStateReadModelProvider(IServiceCollection services)
{
services.AddInMemoryDocumentProjectionStore<StreamingProxyRoomCurrentStateDocument, string>(
keySelector: readModel => readModel.ActorId,
keyFormatter: key => key,
defaultSortSelector: readModel => readModel.UpdatedAt.ToDateTimeOffset());
}

private static bool ResolveElasticsearchDocumentEnabled(IConfiguration? configuration)
{
if (configuration == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ private static async Task<IResult> HandleDeleteRoomAsync(
string roomId,
[FromServices] IGAgentActorRegistryCommandPort registryCommandPort,
[FromServices] IScopeResourceAdmissionPort admissionPort,
[FromServices] IStreamingProxyParticipantStore participantStore,
[FromServices] ILoggerFactory loggerFactory,
CancellationToken ct)
{
Expand Down Expand Up @@ -155,15 +154,6 @@ await registryCommandPort.UnregisterActorAsync(
new { error = "Failed to delete room" },
statusCode: StatusCodes.Status503ServiceUnavailable);
}
try
{
await participantStore.RemoveRoomAsync(roomId, ct);
}
catch (OperationCanceledException) { throw; }
catch (Exception ex)
{
logger.LogWarning(ex, "Failed to remove participants for room {RoomId}", roomId);
}
return Results.Ok();
}

Expand All @@ -179,7 +169,7 @@ private static async Task HandleChatAsync(
[FromServices] IScopeResourceAdmissionPort admissionPort,
[FromServices] ICommandInteractionService<StreamingProxyRoomChatCommand, StreamingProxyRoomChatAcceptedReceipt, StreamingProxyRoomChatStartError, StreamingProxyRoomSessionEnvelope, StreamingProxyProjectionCompletionStatus> interactionService,
[FromServices] StreamingProxyChatDurableCompletionResolver durableCompletionResolver,
[FromServices] IStreamingProxyParticipantStore participantStore,
[FromServices] IStreamingProxyParticipantQueryPort participantQueryPort,
[FromServices] StreamingProxyNyxParticipantCoordinator participantCoordinator,
[FromServices] ILoggerFactory loggerFactory,
CancellationToken ct)
Expand Down Expand Up @@ -240,7 +230,7 @@ private static async Task HandleChatAsync(
scopeId,
roomId,
actor,
participantStore,
participantQueryPort,
accessToken,
token,
preferredRoute,
Expand All @@ -249,21 +239,12 @@ private static async Task HandleChatAsync(
if (participants.Count == 0 || string.IsNullOrWhiteSpace(accessToken))
return;

var terminalState = DetermineParticipantTerminalState(await participantCoordinator.GenerateRepliesAsync(
await participantCoordinator.RequestDiscussionAsync(
participants,
actor,
prompt,
sessionId,
accessToken,
token,
participantStore,
roomId));
await PublishTerminalStateAsync(
actorDispatchPort,
actor.Id,
sessionId,
terminalState.Status,
terminalState.ErrorMessage,
token);
},
ct);
Expand Down Expand Up @@ -456,7 +437,7 @@ private static async Task<IResult> HandleListParticipantsAsync(
string scopeId,
string roomId,
[FromServices] IScopeResourceAdmissionPort admissionPort,
[FromServices] IStreamingProxyParticipantStore participantStore,
[FromServices] IStreamingProxyParticipantQueryPort participantQueryPort,
[FromServices] ILoggerFactory loggerFactory,
CancellationToken ct)
{
Expand All @@ -475,7 +456,7 @@ private static async Task<IResult> HandleListParticipantsAsync(
var logger = loggerFactory.CreateLogger("Aevatar.GAgents.StreamingProxy.Endpoints");
try
{
var participants = await participantStore.ListAsync(roomId, ct);
var participants = await participantQueryPort.ListAsync(roomId, ct);
return Results.Ok(participants);
}
catch (OperationCanceledException) { throw; }
Expand All @@ -496,7 +477,6 @@ private static async Task<IResult> HandleJoinAsync(
[FromServices] IActorRuntime actorRuntime,
[FromServices] IActorDispatchPort actorDispatchPort,
[FromServices] IScopeResourceAdmissionPort admissionPort,
[FromServices] IStreamingProxyParticipantStore participantStore,
[FromServices] ILoggerFactory loggerFactory,
CancellationToken ct)
{
Expand Down Expand Up @@ -537,17 +517,6 @@ private static async Task<IResult> HandleJoinAsync(
};
await DispatchRoomEnvelopeAsync(actorDispatchPort, actor.Id, envelope, ct);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Activate room projection before dispatching join events

HandleJoinAsync now only dispatches GroupChatParticipantJoinedEvent and returns, but it never ensures the room current-state projection is active first. In this module, the current-state lease is only explicitly activated from StreamingProxyRoomSessionProjectionPort.EnsureProjectionAsync (chat/subscription path), so a join on a room that has not started chat/streaming can be published before any projector subscribes; per IProjectedActor contract this can silently drop materialization, and IStreamingProxyParticipantQueryPort will then return an empty participant list until some later event replays state. This is a user-visible regression for “join then list participants” flows.

Useful? React with 👍 / 👎.


var logger = loggerFactory.CreateLogger("Aevatar.GAgents.StreamingProxy.Endpoints");
try
{
await participantStore.AddAsync(roomId, agentId, displayName, ct);
}
catch (OperationCanceledException) { throw; }
catch (Exception ex)
{
logger.LogWarning(ex, "Failed to persist participant {AgentId} in room {RoomId}", agentId, roomId);
}

return Results.Ok(new { status = "joined", agentId });
}

Expand Down Expand Up @@ -698,12 +667,6 @@ private static Task DispatchRoomEnvelopeAsync(
return actorDispatchPort.DispatchAsync(actorId, envelope, ct);
}

private static (StreamingProxyChatSessionTerminalStatus Status, string? ErrorMessage) DetermineParticipantTerminalState(
int successfulReplies) =>
successfulReplies > 0
? (StreamingProxyChatSessionTerminalStatus.Completed, null)
: (StreamingProxyChatSessionTerminalStatus.Failed, "StreamingProxy chat completed without any participant replies.");

private static async Task TryPublishCanceledTerminalStateAsync(
IActorDispatchPort actorDispatchPort,
IActor? actor,
Expand Down
Loading
Loading