diff --git a/aevatar.slnx b/aevatar.slnx index 502fd5de1..25b6e7488 100644 --- a/aevatar.slnx +++ b/aevatar.slnx @@ -5,7 +5,6 @@ - diff --git a/agents/Aevatar.GAgents.StreamingProxy/ServiceCollectionExtensions.cs b/agents/Aevatar.GAgents.StreamingProxy/ServiceCollectionExtensions.cs index f5f135256..2eff36b97 100644 --- a/agents/Aevatar.GAgents.StreamingProxy/ServiceCollectionExtensions.cs +++ b/agents/Aevatar.GAgents.StreamingProxy/ServiceCollectionExtensions.cs @@ -66,9 +66,15 @@ public static IServiceCollection AddStreamingProxy( }, static context => new StreamingProxyCurrentStateRuntimeLease(context)); services.TryAddSingleton(); + services.AddCurrentStateProjectionMaterializer< + StreamingProxyCurrentStateProjectionContext, + StreamingProxyRoomCurrentStateProjector>(); services.AddCurrentStateProjectionMaterializer< StreamingProxyCurrentStateProjectionContext, StreamingProxyChatSessionTerminalProjector>(); + services.TryAddSingleton< + IProjectionDocumentMetadataProvider, + StreamingProxyRoomCurrentStateDocumentMetadataProvider>(); services.TryAddSingleton< IProjectionDocumentMetadataProvider, StreamingProxyChatSessionTerminalSnapshotMetadataProvider>(); @@ -112,7 +118,10 @@ private static void AddTerminalSnapshotReadModelProvider( IConfiguration? configuration) { if (services.Any(x => x.ServiceType == typeof(IProjectionDocumentReader))) + { + EnsureStreamingProxyRoomCurrentStateReadModelProvider(services, configuration); return; + } var elasticsearchEnabled = ResolveElasticsearchDocumentEnabled(configuration); var inMemoryEnabled = ResolveOptionalBool( @@ -127,6 +136,7 @@ private static void AddTerminalSnapshotReadModelProvider( if (elasticsearchEnabled) { + AddElasticsearchStreamingProxyRoomCurrentStateReadModelProvider(services, configuration); services.AddElasticsearchDocumentProjectionStore( optionsFactory: _ => BuildElasticsearchDocumentOptions(configuration!), metadataFactory: sp => sp @@ -137,12 +147,50 @@ private static void AddTerminalSnapshotReadModelProvider( return; } + AddInMemoryStreamingProxyRoomCurrentStateReadModelProvider(services); services.AddInMemoryDocumentProjectionStore( keySelector: readModel => readModel.Id, keyFormatter: key => key, defaultSortSelector: readModel => readModel.UpdatedAt.ToDateTimeOffset()); } + private static void EnsureStreamingProxyRoomCurrentStateReadModelProvider( + IServiceCollection services, + IConfiguration? configuration) + { + if (services.Any(x => x.ServiceType == typeof(IProjectionDocumentReader))) + return; + + if (ResolveElasticsearchDocumentEnabled(configuration)) + { + AddElasticsearchStreamingProxyRoomCurrentStateReadModelProvider(services, configuration); + return; + } + + AddInMemoryStreamingProxyRoomCurrentStateReadModelProvider(services); + } + + private static void AddElasticsearchStreamingProxyRoomCurrentStateReadModelProvider( + IServiceCollection services, + IConfiguration? configuration) + { + services.AddElasticsearchDocumentProjectionStore( + optionsFactory: _ => BuildElasticsearchDocumentOptions(configuration!), + metadataFactory: sp => sp + .GetRequiredService>() + .Metadata, + keySelector: readModel => readModel.ActorId, + keyFormatter: key => key); + } + + private static void AddInMemoryStreamingProxyRoomCurrentStateReadModelProvider(IServiceCollection services) + { + services.AddInMemoryDocumentProjectionStore( + keySelector: readModel => readModel.ActorId, + keyFormatter: key => key, + defaultSortSelector: readModel => readModel.UpdatedAt.ToDateTimeOffset()); + } + private static bool ResolveElasticsearchDocumentEnabled(IConfiguration? configuration) { if (configuration == null) diff --git a/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyEndpoints.cs b/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyEndpoints.cs index 1fa89ba5c..8eb0551f3 100644 --- a/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyEndpoints.cs +++ b/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyEndpoints.cs @@ -124,7 +124,6 @@ private static async Task HandleDeleteRoomAsync( string roomId, [FromServices] IGAgentActorRegistryCommandPort registryCommandPort, [FromServices] IScopeResourceAdmissionPort admissionPort, - [FromServices] IStreamingProxyParticipantStore participantStore, [FromServices] ILoggerFactory loggerFactory, CancellationToken ct) { @@ -155,15 +154,6 @@ await registryCommandPort.UnregisterActorAsync( new { error = "Failed to delete room" }, statusCode: StatusCodes.Status503ServiceUnavailable); } - try - { - await participantStore.RemoveRoomAsync(roomId, ct); - } - catch (OperationCanceledException) { throw; } - catch (Exception ex) - { - logger.LogWarning(ex, "Failed to remove participants for room {RoomId}", roomId); - } return Results.Ok(); } @@ -179,7 +169,7 @@ private static async Task HandleChatAsync( [FromServices] IScopeResourceAdmissionPort admissionPort, [FromServices] ICommandInteractionService interactionService, [FromServices] StreamingProxyChatDurableCompletionResolver durableCompletionResolver, - [FromServices] IStreamingProxyParticipantStore participantStore, + [FromServices] IStreamingProxyParticipantQueryPort participantQueryPort, [FromServices] StreamingProxyNyxParticipantCoordinator participantCoordinator, [FromServices] ILoggerFactory loggerFactory, CancellationToken ct) @@ -240,7 +230,7 @@ private static async Task HandleChatAsync( scopeId, roomId, actor, - participantStore, + participantQueryPort, accessToken, token, preferredRoute, @@ -249,21 +239,12 @@ private static async Task HandleChatAsync( if (participants.Count == 0 || string.IsNullOrWhiteSpace(accessToken)) return; - var terminalState = DetermineParticipantTerminalState(await participantCoordinator.GenerateRepliesAsync( + await participantCoordinator.RequestDiscussionAsync( participants, actor, prompt, sessionId, accessToken, - token, - participantStore, - roomId)); - await PublishTerminalStateAsync( - actorDispatchPort, - actor.Id, - sessionId, - terminalState.Status, - terminalState.ErrorMessage, token); }, ct); @@ -456,7 +437,7 @@ private static async Task HandleListParticipantsAsync( string scopeId, string roomId, [FromServices] IScopeResourceAdmissionPort admissionPort, - [FromServices] IStreamingProxyParticipantStore participantStore, + [FromServices] IStreamingProxyParticipantQueryPort participantQueryPort, [FromServices] ILoggerFactory loggerFactory, CancellationToken ct) { @@ -475,7 +456,7 @@ private static async Task HandleListParticipantsAsync( var logger = loggerFactory.CreateLogger("Aevatar.GAgents.StreamingProxy.Endpoints"); try { - var participants = await participantStore.ListAsync(roomId, ct); + var participants = await participantQueryPort.ListAsync(roomId, ct); return Results.Ok(participants); } catch (OperationCanceledException) { throw; } @@ -496,7 +477,6 @@ private static async Task HandleJoinAsync( [FromServices] IActorRuntime actorRuntime, [FromServices] IActorDispatchPort actorDispatchPort, [FromServices] IScopeResourceAdmissionPort admissionPort, - [FromServices] IStreamingProxyParticipantStore participantStore, [FromServices] ILoggerFactory loggerFactory, CancellationToken ct) { @@ -537,17 +517,6 @@ private static async Task HandleJoinAsync( }; await DispatchRoomEnvelopeAsync(actorDispatchPort, actor.Id, envelope, ct); - var logger = loggerFactory.CreateLogger("Aevatar.GAgents.StreamingProxy.Endpoints"); - try - { - await participantStore.AddAsync(roomId, agentId, displayName, ct); - } - catch (OperationCanceledException) { throw; } - catch (Exception ex) - { - logger.LogWarning(ex, "Failed to persist participant {AgentId} in room {RoomId}", agentId, roomId); - } - return Results.Ok(new { status = "joined", agentId }); } @@ -698,12 +667,6 @@ private static Task DispatchRoomEnvelopeAsync( return actorDispatchPort.DispatchAsync(actorId, envelope, ct); } - private static (StreamingProxyChatSessionTerminalStatus Status, string? ErrorMessage) DetermineParticipantTerminalState( - int successfulReplies) => - successfulReplies > 0 - ? (StreamingProxyChatSessionTerminalStatus.Completed, null) - : (StreamingProxyChatSessionTerminalStatus.Failed, "StreamingProxy chat completed without any participant replies."); - private static async Task TryPublishCanceledTerminalStateAsync( IActorDispatchPort actorDispatchPort, IActor? actor, diff --git a/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyGAgent.cs b/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyGAgent.cs index 9675b7211..e93981f4d 100644 --- a/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyGAgent.cs +++ b/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyGAgent.cs @@ -5,6 +5,7 @@ using Aevatar.Foundation.Core.EventSourcing; using Google.Protobuf; using Google.Protobuf.WellKnownTypes; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; namespace Aevatar.GAgents.StreamingProxy; @@ -97,6 +98,99 @@ public async Task HandleChatSessionTerminalStateChanged(StreamingProxyChatSessio evt.Status); } + [EventHandler] + public async Task HandleNyxDiscussionRequested(StreamingProxyNyxDiscussionRequested evt) + { + // Fix (review round 1, F1): + // Reviewer found Nyx transcript, active set, rounds, pruning, and stop decisions in coordinator locals. + // This actor now persists the discussion session and issues one participant work item at a time. + if (string.IsNullOrWhiteSpace(evt.SessionId)) + return; + + if (evt.Participants.Count == 0) + { + await PersistDomainEventAsync(BuildTerminalEvent( + evt.SessionId, + StreamingProxyChatSessionTerminalStatus.Failed, + "StreamingProxy chat completed without any participant replies.")); + return; + } + + await PersistDomainEventAsync(new StreamingProxyNyxDiscussionStarted + { + SessionId = evt.SessionId, + Prompt = evt.Prompt, + TotalRounds = evt.Participants.Count > 1 ? StreamingProxyDefaults.MaxDiscussionRounds : 1, + Participants = { evt.Participants.Select(participant => participant.Clone()) }, + }); + + await ContinueNyxDiscussionAsync(evt.SessionId, evt.AccessToken); + } + + [EventHandler] + public async Task HandleNyxParticipantReplySucceeded(StreamingProxyNyxParticipantReplySucceeded evt) + { + if (!IsExpectedNyxParticipantTurn(evt.SessionId, evt.Round, evt.ParticipantId)) + return; + + var content = evt.Content?.Trim() ?? string.Empty; + if (string.IsNullOrWhiteSpace(content)) + { + await HandleNyxParticipantReplyFailed(new StreamingProxyNyxParticipantReplyFailed + { + SessionId = evt.SessionId, + AccessToken = evt.AccessToken, + Round = evt.Round, + ParticipantId = evt.ParticipantId, + DisplayName = evt.DisplayName, + ErrorMessage = "Participant returned an empty response.", + }); + return; + } + + await PersistDomainEventAsync(new StreamingProxyNyxParticipantReplyRecorded + { + SessionId = evt.SessionId, + Round = evt.Round, + ParticipantId = evt.ParticipantId, + DisplayName = evt.DisplayName, + Content = content, + }); + + await HandleGroupChatMessage(new GroupChatMessageEvent + { + AgentId = evt.ParticipantId, + AgentName = evt.DisplayName, + Content = content, + SessionId = evt.SessionId, + }); + + await ContinueNyxDiscussionAsync(evt.SessionId, evt.AccessToken); + } + + [EventHandler] + public async Task HandleNyxParticipantReplyFailed(StreamingProxyNyxParticipantReplyFailed evt) + { + if (!IsExpectedNyxParticipantTurn(evt.SessionId, evt.Round, evt.ParticipantId)) + return; + + await PersistDomainEventAsync(new StreamingProxyNyxParticipantFailureRecorded + { + SessionId = evt.SessionId, + Round = evt.Round, + ParticipantId = evt.ParticipantId, + DisplayName = evt.DisplayName, + ErrorMessage = evt.ErrorMessage ?? string.Empty, + }); + + await HandleGroupChatParticipantLeft(new GroupChatParticipantLeftEvent + { + AgentId = evt.ParticipantId, + }); + + await ContinueNyxDiscussionAsync(evt.SessionId, evt.AccessToken); + } + /// /// Applies domain events to the sole authoritative actor state. /// Called by the event sourcing infrastructure after PersistDomainEventAsync. @@ -110,6 +204,10 @@ protected override StreamingProxyGAgentState TransitionState(StreamingProxyGAgen .On(ApplyParticipantJoined) .On(ApplyParticipantLeft) .On(ApplyTerminalStateChanged) + .On(ApplyNyxDiscussionStarted) + .On(ApplyNyxParticipantReplyRecorded) + .On(ApplyNyxParticipantFailureRecorded) + .On(ApplyNyxDiscussionRoundAdvanced) .OrCurrent(); private static StreamingProxyGAgentState ApplyRoomInitialized( @@ -200,6 +298,108 @@ private static void TrimMessages(StreamingProxyGAgentState state) } } + private async Task ContinueNyxDiscussionAsync(string sessionId, string accessToken) + { + if (!State.NyxDiscussionSessions.TryGetValue(sessionId, out var session) || + session.Status != StreamingProxyNyxDiscussionStatus.NyxDiscussionStatusActive) + { + return; + } + + if (session.ActiveParticipants.Count == 0) + { + await CompleteNyxDiscussionAsync(session, StreamingProxyChatSessionTerminalStatus.Failed); + return; + } + + if (session.CurrentParticipantIndex >= session.ActiveParticipants.Count) + { + if (session.CurrentRoundSuccessfulReplies == 0 || + session.ActiveParticipants.Count < 2 || + session.CurrentRound >= session.TotalRounds) + { + await CompleteNyxDiscussionAsync( + session, + session.TotalSuccessfulReplies > 0 + ? StreamingProxyChatSessionTerminalStatus.Completed + : StreamingProxyChatSessionTerminalStatus.Failed); + return; + } + + await PersistDomainEventAsync(new StreamingProxyNyxDiscussionRoundAdvanced + { + SessionId = session.SessionId, + Round = session.CurrentRound + 1, + }); + await ContinueNyxDiscussionAsync(sessionId, accessToken); + return; + } + + var participant = session.ActiveParticipants[session.CurrentParticipantIndex]; + var coordinator = Services.GetRequiredService(); + await coordinator.RequestParticipantReplyAsync( + Id, + new StreamingProxyNyxParticipantWorkItem( + session.SessionId, + accessToken, + session.CurrentRound, + session.TotalRounds, + ToParticipantDefinition(participant), + session.ActiveParticipants.Select(ToParticipantDefinition).ToList(), + session.Transcript.Select(entry => (entry.Speaker, entry.Content)).ToList()) + { + Prompt = session.Prompt, + }, + CancellationToken.None); + } + + private async Task CompleteNyxDiscussionAsync( + StreamingProxyNyxDiscussionSession session, + StreamingProxyChatSessionTerminalStatus status) + { + var errorMessage = status == StreamingProxyChatSessionTerminalStatus.Failed + ? "StreamingProxy chat completed without any participant replies." + : string.Empty; + await PersistDomainEventAsync(BuildTerminalEvent(session.SessionId, status, errorMessage)); + } + + private bool IsExpectedNyxParticipantTurn(string sessionId, int round, string participantId) + { + if (!State.NyxDiscussionSessions.TryGetValue(sessionId, out var session) || + session.Status != StreamingProxyNyxDiscussionStatus.NyxDiscussionStatusActive || + session.CurrentRound != round || + session.CurrentParticipantIndex < 0 || + session.CurrentParticipantIndex >= session.ActiveParticipants.Count) + { + return false; + } + + return string.Equals( + session.ActiveParticipants[session.CurrentParticipantIndex].ParticipantId, + participantId, + StringComparison.OrdinalIgnoreCase); + } + + private static StreamingProxyChatSessionTerminalStateChanged BuildTerminalEvent( + string sessionId, + StreamingProxyChatSessionTerminalStatus status, + string? errorMessage) => + new() + { + SessionId = sessionId, + Status = status, + TerminalAt = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), + ErrorMessage = errorMessage ?? string.Empty, + }; + + private static StreamingProxyNyxParticipantDefinition ToParticipantDefinition( + StreamingProxyNyxParticipant participant) => + new( + participant.ParticipantId, + participant.RoutePreference, + participant.DisplayName, + participant.Model); + private static StreamingProxyGAgentState ApplyTerminalStateChanged( StreamingProxyGAgentState current, StreamingProxyChatSessionTerminalStateChanged evt) @@ -215,6 +415,113 @@ private static StreamingProxyGAgentState ApplyTerminalStateChanged( TerminalAt = evt.TerminalAt, ErrorMessage = evt.ErrorMessage ?? string.Empty, }; + if (next.NyxDiscussionSessions.TryGetValue(evt.SessionId, out var discussion)) + { + var nextDiscussion = discussion.Clone(); + nextDiscussion.Status = evt.Status == StreamingProxyChatSessionTerminalStatus.Failed + ? StreamingProxyNyxDiscussionStatus.NyxDiscussionStatusFailed + : StreamingProxyNyxDiscussionStatus.NyxDiscussionStatusCompleted; + nextDiscussion.TerminalErrorMessage = evt.ErrorMessage ?? string.Empty; + next.NyxDiscussionSessions[evt.SessionId] = nextDiscussion; + } + + return next; + } + + private static StreamingProxyGAgentState ApplyNyxDiscussionStarted( + StreamingProxyGAgentState current, + StreamingProxyNyxDiscussionStarted evt) + { + var next = current.Clone(); + next.NyxDiscussionSessions[evt.SessionId] = new StreamingProxyNyxDiscussionSession + { + SessionId = evt.SessionId, + Prompt = evt.Prompt, + CurrentRound = 1, + TotalRounds = Math.Max(evt.TotalRounds, 1), + CurrentParticipantIndex = 0, + CurrentRoundSuccessfulReplies = 0, + TotalSuccessfulReplies = 0, + Status = StreamingProxyNyxDiscussionStatus.NyxDiscussionStatusActive, + ActiveParticipants = { evt.Participants.Select(participant => participant.Clone()) }, + }; + return next; + } + + private static StreamingProxyGAgentState ApplyNyxParticipantReplyRecorded( + StreamingProxyGAgentState current, + StreamingProxyNyxParticipantReplyRecorded evt) + { + var next = current.Clone(); + if (!next.NyxDiscussionSessions.TryGetValue(evt.SessionId, out var session)) + return next; + + var nextSession = session.Clone(); + nextSession.Transcript.Add(new StreamingProxyNyxTranscriptEntry + { + Speaker = evt.DisplayName, + Content = evt.Content, + }); + TrimTranscript(nextSession); + nextSession.CurrentParticipantIndex++; + nextSession.CurrentRoundSuccessfulReplies++; + nextSession.TotalSuccessfulReplies++; + next.NyxDiscussionSessions[evt.SessionId] = nextSession; + return next; + } + + private static StreamingProxyGAgentState ApplyNyxParticipantFailureRecorded( + StreamingProxyGAgentState current, + StreamingProxyNyxParticipantFailureRecorded evt) + { + var next = current.Clone(); + if (!next.NyxDiscussionSessions.TryGetValue(evt.SessionId, out var session)) + return next; + + var nextSession = session.Clone(); + for (var i = nextSession.ActiveParticipants.Count - 1; i >= 0; i--) + { + if (!string.Equals( + nextSession.ActiveParticipants[i].ParticipantId, + evt.ParticipantId, + StringComparison.OrdinalIgnoreCase)) + { + continue; + } + + nextSession.ActiveParticipants.RemoveAt(i); + if (i < nextSession.CurrentParticipantIndex) + nextSession.CurrentParticipantIndex--; + } + + if (nextSession.CurrentParticipantIndex > nextSession.ActiveParticipants.Count) + nextSession.CurrentParticipantIndex = nextSession.ActiveParticipants.Count; + + next.NyxDiscussionSessions[evt.SessionId] = nextSession; return next; } + + private static StreamingProxyGAgentState ApplyNyxDiscussionRoundAdvanced( + StreamingProxyGAgentState current, + StreamingProxyNyxDiscussionRoundAdvanced evt) + { + var next = current.Clone(); + if (!next.NyxDiscussionSessions.TryGetValue(evt.SessionId, out var session)) + return next; + + var nextSession = session.Clone(); + nextSession.CurrentRound = evt.Round; + nextSession.CurrentParticipantIndex = 0; + nextSession.CurrentRoundSuccessfulReplies = 0; + next.NyxDiscussionSessions[evt.SessionId] = nextSession; + return next; + } + + private static void TrimTranscript(StreamingProxyNyxDiscussionSession session) + { + while (session.Transcript.Count > StreamingProxyDefaults.MaxMessages) + { + session.Transcript.RemoveAt(0); + } + } } diff --git a/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyNyxParticipantCoordinator.cs b/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyNyxParticipantCoordinator.cs index 9ba19a685..46c30fdb4 100644 --- a/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyNyxParticipantCoordinator.cs +++ b/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyNyxParticipantCoordinator.cs @@ -13,7 +13,7 @@ namespace Aevatar.GAgents.StreamingProxy; -internal sealed class StreamingProxyNyxParticipantCoordinator +internal class StreamingProxyNyxParticipantCoordinator { private const string NyxIdProviderName = "nyxid"; private const string GatewaySuffix = "/api/v1/llm/gateway/v1"; @@ -46,7 +46,7 @@ public async Task> EnsureP string scopeId, string roomId, IActor actor, - IStreamingProxyParticipantStore participantStore, + IStreamingProxyParticipantQueryPort participantQueryPort, string accessToken, CancellationToken ct, string? preferredRoute = null, @@ -56,7 +56,7 @@ public async Task> EnsureP if (participants.Count == 0) return participants; - var existing = await participantStore.ListAsync(roomId, ct); + var existing = await participantQueryPort.ListAsync(roomId, ct); var existingIds = existing .Select(entry => entry.AgentId) .ToHashSet(StringComparer.OrdinalIgnoreCase); @@ -71,150 +71,106 @@ public async Task> EnsureP AgentId = participant.ParticipantId, DisplayName = participant.DisplayName, }, ct); - - await participantStore.AddAsync(roomId, participant.ParticipantId, participant.DisplayName, ct); } return participants; } - public async Task GenerateRepliesAsync( + public async Task RequestDiscussionAsync( IReadOnlyList participants, IActor actor, string prompt, string sessionId, string accessToken, - CancellationToken ct, - IStreamingProxyParticipantStore? participantStore = null, - string? roomId = null) + CancellationToken ct) { - if (participants.Count == 0) - return 0; + if (participants.Count == 0 || string.IsNullOrWhiteSpace(sessionId)) + return; + + // Fix (review round 1, F1): + // Coordinator previously owned the Nyx round loop and mutable transcript/active participant sets. + // It now only dispatches actor continuation events; StreamingProxyGAgent owns progression state. + await DispatchAsync(actor, new StreamingProxyNyxDiscussionRequested + { + SessionId = sessionId, + Prompt = prompt, + AccessToken = accessToken, + Participants = { participants.Select(ToNyxParticipant) }, + }, ct); + } + + public virtual async Task RequestParticipantReplyAsync( + string roomId, + StreamingProxyNyxParticipantWorkItem workItem, + CancellationToken ct) + { + ArgumentException.ThrowIfNullOrWhiteSpace(roomId); + ArgumentNullException.ThrowIfNull(workItem); if (!_llmProviderFactory.GetAvailableProviders().Contains(NyxIdProviderName, StringComparer.OrdinalIgnoreCase)) { - _logger.LogWarning("NyxID provider '{ProviderName}' is not registered; skip Streaming Proxy participants.", NyxIdProviderName); - return 0; + _logger.LogWarning("NyxID provider '{ProviderName}' is not registered; skip Streaming Proxy participant.", NyxIdProviderName); + await DispatchAsync(roomId, BuildReplyFailed(workItem, "NyxID provider is not registered."), ct); + return; } var provider = _llmProviderFactory.GetProvider(NyxIdProviderName); - var transcript = new List<(string Speaker, string Content)>(); - var activeParticipants = participants.ToList(); - var rounds = activeParticipants.Count > 1 ? StreamingProxyDefaults.MaxDiscussionRounds : 1; - var totalSuccessfulReplies = 0; - - for (var round = 1; round <= rounds && activeParticipants.Count > 0; round++) + try { - var successfulReplies = 0; - var failedParticipants = new HashSet(StringComparer.OrdinalIgnoreCase); - var roundParticipants = activeParticipants.ToList(); - - foreach (var participant in roundParticipants) + var request = BuildParticipantRequest( + workItem.Participant, + workItem.ActiveParticipants, + workItem.Prompt, + workItem.SessionId, + workItem.AccessToken, + workItem.Transcript, + workItem.Round, + workItem.TotalRounds); + var response = await ReadParticipantResponseAsync(provider, request, ct); + if (IsUnavailableResponse(response)) { - ct.ThrowIfCancellationRequested(); - - if (failedParticipants.Contains(participant.ParticipantId)) - continue; - - var availableParticipants = activeParticipants - .Where(candidate => !failedParticipants.Contains(candidate.ParticipantId)) - .ToList(); - - if (availableParticipants.Count == 0) - break; - - try - { - var request = BuildParticipantRequest( - participant, - availableParticipants, - prompt, - sessionId, - accessToken, - transcript, - round, - rounds); - var response = await ReadParticipantResponseAsync(provider, request, ct); - if (IsUnavailableResponse(response)) - { - failedParticipants.Add(participant.ParticipantId); - await MarkParticipantLeftAsync( - actor, - participantStore, - roomId, - participant.ParticipantId, - ct); - _logger.LogWarning( - "Streaming Proxy participant '{Participant}' returned an unavailable response for route '{RoutePreference}' in round {Round}.", - participant.DisplayName, - participant.RoutePreference, - round); - continue; - } - - var content = NormalizeParticipantReply( - participant, - availableParticipants, - response.Content); - if (string.IsNullOrWhiteSpace(content)) - { - failedParticipants.Add(participant.ParticipantId); - await MarkParticipantLeftAsync( - actor, - participantStore, - roomId, - participant.ParticipantId, - ct); - continue; - } - - transcript.Add((participant.DisplayName, content)); - successfulReplies++; - totalSuccessfulReplies++; - await DispatchAsync(actor, new GroupChatMessageEvent - { - AgentId = participant.ParticipantId, - AgentName = participant.DisplayName, - Content = content, - SessionId = sessionId, - }, ct); - } - catch (OperationCanceledException) - { - throw; - } - catch (Exception ex) - { - failedParticipants.Add(participant.ParticipantId); - await MarkParticipantLeftAsync( - actor, - participantStore, - roomId, - participant.ParticipantId, - ct); - _logger.LogWarning(ex, - "Streaming Proxy participant '{Participant}' failed for route '{RoutePreference}' in round {Round}.", - participant.DisplayName, - participant.RoutePreference, - round); - } + await DispatchAsync(roomId, BuildReplyFailed(workItem, "Participant returned an unavailable response."), ct); + _logger.LogWarning( + "Streaming Proxy participant '{Participant}' returned an unavailable response for route '{RoutePreference}' in round {Round}.", + workItem.Participant.DisplayName, + workItem.Participant.RoutePreference, + workItem.Round); + return; } - if (failedParticipants.Count > 0) + var content = NormalizeParticipantReply( + workItem.Participant, + workItem.ActiveParticipants, + response.Content); + if (string.IsNullOrWhiteSpace(content)) { - activeParticipants = activeParticipants - .Where(participant => !failedParticipants.Contains(participant.ParticipantId)) - .ToList(); + await DispatchAsync(roomId, BuildReplyFailed(workItem, "Participant returned an empty response."), ct); + return; } - if (successfulReplies == 0) - break; - - if (activeParticipants.Count < 2) - break; + await DispatchAsync(roomId, new StreamingProxyNyxParticipantReplySucceeded + { + SessionId = workItem.SessionId, + AccessToken = workItem.AccessToken, + Round = workItem.Round, + ParticipantId = workItem.Participant.ParticipantId, + DisplayName = workItem.Participant.DisplayName, + Content = content, + }, ct); + } + catch (OperationCanceledException) + { + throw; + } + catch (Exception ex) + { + await DispatchAsync(roomId, BuildReplyFailed(workItem, ex.Message), ct); + _logger.LogWarning(ex, + "Streaming Proxy participant '{Participant}' failed for route '{RoutePreference}' in round {Round}.", + workItem.Participant.DisplayName, + workItem.Participant.RoutePreference, + workItem.Round); } - - return totalSuccessfulReplies; } private async Task> ResolveParticipantsAsync( @@ -937,28 +893,6 @@ private static async Task ReadParticipantResponseAsync( return string.Join('\n', lines).Trim(); } - private async Task MarkParticipantLeftAsync( - IActor actor, - IStreamingProxyParticipantStore? participantStore, - string? roomId, - string participantId, - CancellationToken ct) - { - if (string.IsNullOrWhiteSpace(participantId)) - return; - - if (participantStore is not null && - !string.IsNullOrWhiteSpace(roomId)) - { - await participantStore.RemoveParticipantAsync(roomId, participantId, ct); - } - - await DispatchAsync(actor, new GroupChatParticipantLeftEvent - { - AgentId = participantId, - }, ct); - } - private static bool IsUnavailableResponse(LLMResponse response) { if (string.Equals(response.FinishReason, "error", StringComparison.OrdinalIgnoreCase) || @@ -1020,6 +954,45 @@ private async Task DispatchAsync(IActor actor, IMessage payload, CancellationTok await DispatchRoomEnvelopeAsync(actor.Id, envelope, ct); } + private Task DispatchAsync(string actorId, IMessage payload, CancellationToken ct) + { + var envelope = new EventEnvelope + { + Id = Guid.NewGuid().ToString("N"), + Timestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), + Payload = Any.Pack(payload), + Route = new EnvelopeRoute + { + Direct = new DirectRoute { TargetActorId = actorId }, + }, + }; + + return DispatchRoomEnvelopeAsync(actorId, envelope, ct); + } + + private static StreamingProxyNyxParticipant ToNyxParticipant( + StreamingProxyNyxParticipantDefinition participant) => + new() + { + ParticipantId = participant.ParticipantId, + RoutePreference = participant.RoutePreference, + DisplayName = participant.DisplayName, + Model = participant.Model ?? string.Empty, + }; + + private static StreamingProxyNyxParticipantReplyFailed BuildReplyFailed( + StreamingProxyNyxParticipantWorkItem workItem, + string? errorMessage) => + new() + { + SessionId = workItem.SessionId, + AccessToken = workItem.AccessToken, + Round = workItem.Round, + ParticipantId = workItem.Participant.ParticipantId, + DisplayName = workItem.Participant.DisplayName, + ErrorMessage = errorMessage ?? string.Empty, + }; + private Task DispatchRoomEnvelopeAsync( string actorId, EventEnvelope envelope, @@ -1038,6 +1011,18 @@ internal sealed record StreamingProxyNyxParticipantDefinition( string DisplayName, string? Model); +internal sealed record StreamingProxyNyxParticipantWorkItem( + string SessionId, + string AccessToken, + int Round, + int TotalRounds, + StreamingProxyNyxParticipantDefinition Participant, + IReadOnlyList ActiveParticipants, + IReadOnlyList<(string Speaker, string Content)> Transcript) +{ + public string Prompt { get; init; } = string.Empty; +} + internal sealed record StreamingProxyNyxParticipantCandidate( StreamingProxyNyxProviderStatus Provider, string ParticipantId, diff --git a/src/Aevatar.Studio.Projection/ReadModels/StreamingProxyParticipantCurrentStateDocument.Partial.cs b/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyRoomCurrentStateDocument.Partial.cs similarity index 65% rename from src/Aevatar.Studio.Projection/ReadModels/StreamingProxyParticipantCurrentStateDocument.Partial.cs rename to agents/Aevatar.GAgents.StreamingProxy/StreamingProxyRoomCurrentStateDocument.Partial.cs index 66e116ef0..83495ec2b 100644 --- a/src/Aevatar.Studio.Projection/ReadModels/StreamingProxyParticipantCurrentStateDocument.Partial.cs +++ b/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyRoomCurrentStateDocument.Partial.cs @@ -1,9 +1,9 @@ using Aevatar.CQRS.Projection.Stores.Abstractions; -namespace Aevatar.Studio.Projection.ReadModels; +namespace Aevatar.GAgents.StreamingProxy; -public sealed partial class StreamingProxyParticipantCurrentStateDocument - : IProjectionReadModel +public sealed partial class StreamingProxyRoomCurrentStateDocument + : IProjectionReadModel { string IProjectionReadModel.ActorId => ActorId; diff --git a/src/Aevatar.Studio.Projection/Metadata/StreamingProxyParticipantCurrentStateDocumentMetadataProvider.cs b/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyRoomCurrentStateDocumentMetadataProvider.cs similarity index 55% rename from src/Aevatar.Studio.Projection/Metadata/StreamingProxyParticipantCurrentStateDocumentMetadataProvider.cs rename to agents/Aevatar.GAgents.StreamingProxy/StreamingProxyRoomCurrentStateDocumentMetadataProvider.cs index 49900f975..e21d4c008 100644 --- a/src/Aevatar.Studio.Projection/Metadata/StreamingProxyParticipantCurrentStateDocumentMetadataProvider.cs +++ b/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyRoomCurrentStateDocumentMetadataProvider.cs @@ -1,13 +1,12 @@ using Aevatar.CQRS.Projection.Stores.Abstractions; -using Aevatar.Studio.Projection.ReadModels; -namespace Aevatar.Studio.Projection.Metadata; +namespace Aevatar.GAgents.StreamingProxy; -public sealed class StreamingProxyParticipantCurrentStateDocumentMetadataProvider - : IProjectionDocumentMetadataProvider +public sealed class StreamingProxyRoomCurrentStateDocumentMetadataProvider + : IProjectionDocumentMetadataProvider { public DocumentIndexMetadata Metadata { get; } = new( - IndexName: "studio-streaming-proxy-participant", + IndexName: "studio-streaming-proxy-room", Mappings: new Dictionary(StringComparer.Ordinal) { ["dynamic"] = true, diff --git a/src/Aevatar.Studio.Projection/Projectors/StreamingProxyParticipantCurrentStateProjector.cs b/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyRoomCurrentStateProjector.cs similarity index 62% rename from src/Aevatar.Studio.Projection/Projectors/StreamingProxyParticipantCurrentStateProjector.cs rename to agents/Aevatar.GAgents.StreamingProxy/StreamingProxyRoomCurrentStateProjector.cs index 88181d4ac..b27e577ab 100644 --- a/src/Aevatar.Studio.Projection/Projectors/StreamingProxyParticipantCurrentStateProjector.cs +++ b/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyRoomCurrentStateProjector.cs @@ -2,25 +2,22 @@ using Aevatar.CQRS.Projection.Core.Orchestration; using Aevatar.CQRS.Projection.Runtime.Abstractions; using Aevatar.Foundation.Abstractions; -using Aevatar.GAgents.StreamingProxyParticipant; -using Aevatar.Studio.Projection.Orchestration; -using Aevatar.Studio.Projection.ReadModels; using Google.Protobuf.WellKnownTypes; -namespace Aevatar.Studio.Projection.Projectors; +namespace Aevatar.GAgents.StreamingProxy; /// -/// Materializes committed events into -/// in the projection document store. +/// Materializes committed events into +/// in the projection document store. /// -public sealed class StreamingProxyParticipantCurrentStateProjector - : ICurrentStateProjectionMaterializer +public sealed class StreamingProxyRoomCurrentStateProjector + : ICurrentStateProjectionMaterializer { - private readonly IProjectionWriteDispatcher _writeDispatcher; + private readonly IProjectionWriteDispatcher _writeDispatcher; private readonly IProjectionClock _clock; - public StreamingProxyParticipantCurrentStateProjector( - IProjectionWriteDispatcher writeDispatcher, + public StreamingProxyRoomCurrentStateProjector( + IProjectionWriteDispatcher writeDispatcher, IProjectionClock clock) { _writeDispatcher = writeDispatcher ?? throw new ArgumentNullException(nameof(writeDispatcher)); @@ -28,14 +25,14 @@ public StreamingProxyParticipantCurrentStateProjector( } public async ValueTask ProjectAsync( - StudioMaterializationContext context, + StreamingProxyCurrentStateProjectionContext context, EventEnvelope envelope, CancellationToken ct = default) { ArgumentNullException.ThrowIfNull(context); ArgumentNullException.ThrowIfNull(envelope); - if (!CommittedStateEventEnvelope.TryUnpackState( + if (!CommittedStateEventEnvelope.TryUnpackState( envelope, out _, out var stateEvent, @@ -48,7 +45,7 @@ public async ValueTask ProjectAsync( var updatedAt = CommittedStateEventEnvelope.ResolveTimestamp(envelope, _clock.UtcNow); - var document = new StreamingProxyParticipantCurrentStateDocument + var document = new StreamingProxyRoomCurrentStateDocument { Id = context.RootActorId, ActorId = context.RootActorId, diff --git a/agents/Aevatar.GAgents.StreamingProxy/streaming_proxy_messages.proto b/agents/Aevatar.GAgents.StreamingProxy/streaming_proxy_messages.proto index fdf53456d..94822a5a8 100644 --- a/agents/Aevatar.GAgents.StreamingProxy/streaming_proxy_messages.proto +++ b/agents/Aevatar.GAgents.StreamingProxy/streaming_proxy_messages.proto @@ -3,6 +3,7 @@ package aevatar.gagents.streamingproxy; option csharp_namespace = "Aevatar.GAgents.StreamingProxy"; import "google/protobuf/timestamp.proto"; +import "google/protobuf/any.proto"; import "agent_messages.proto"; // ─── State ─── @@ -28,6 +29,7 @@ message StreamingProxyGAgentState { repeated StreamingProxyChatMessage messages = 3; int64 next_sequence = 4; map terminal_sessions = 5; + map nyx_discussion_sessions = 6; } message StreamingProxyChatSessionTerminalRecord { @@ -37,6 +39,39 @@ message StreamingProxyChatSessionTerminalRecord { string error_message = 4; } +enum StreamingProxyNyxDiscussionStatus { + NYX_DISCUSSION_STATUS_UNSPECIFIED = 0; + NYX_DISCUSSION_STATUS_ACTIVE = 1; + NYX_DISCUSSION_STATUS_COMPLETED = 2; + NYX_DISCUSSION_STATUS_FAILED = 3; +} + +message StreamingProxyNyxParticipant { + string participant_id = 1; + string route_preference = 2; + string display_name = 3; + string model = 4; +} + +message StreamingProxyNyxTranscriptEntry { + string speaker = 1; + string content = 2; +} + +message StreamingProxyNyxDiscussionSession { + string session_id = 1; + string prompt = 2; + int32 current_round = 3; + int32 total_rounds = 4; + int32 current_participant_index = 5; + int32 current_round_successful_replies = 6; + int32 total_successful_replies = 7; + StreamingProxyNyxDiscussionStatus status = 8; + string terminal_error_message = 9; + repeated StreamingProxyNyxParticipant active_participants = 10; + repeated StreamingProxyNyxTranscriptEntry transcript = 11; +} + // ─── Events ─── enum StreamingProxyChatSessionTerminalStatus { @@ -77,6 +112,59 @@ message StreamingProxyChatSessionTerminalStateChanged { string error_message = 4; } +message StreamingProxyNyxDiscussionRequested { + string session_id = 1; + string prompt = 2; + string access_token = 3; + repeated StreamingProxyNyxParticipant participants = 4; +} + +message StreamingProxyNyxDiscussionStarted { + string session_id = 1; + string prompt = 2; + repeated StreamingProxyNyxParticipant participants = 3; + int32 total_rounds = 4; +} + +message StreamingProxyNyxParticipantReplySucceeded { + string session_id = 1; + string access_token = 2; + int32 round = 3; + string participant_id = 4; + string display_name = 5; + string content = 6; +} + +message StreamingProxyNyxParticipantReplyFailed { + string session_id = 1; + string access_token = 2; + int32 round = 3; + string participant_id = 4; + string display_name = 5; + string error_message = 6; +} + +message StreamingProxyNyxParticipantReplyRecorded { + string session_id = 1; + int32 round = 2; + string participant_id = 3; + string display_name = 4; + string content = 5; +} + +message StreamingProxyNyxParticipantFailureRecorded { + string session_id = 1; + int32 round = 2; + string participant_id = 3; + string display_name = 4; + string error_message = 5; +} + +message StreamingProxyNyxDiscussionRoundAdvanced { + string session_id = 1; + int32 round = 2; +} + message StreamingProxyRoomSessionEnvelope { aevatar.EventEnvelope envelope = 1; } @@ -93,3 +181,12 @@ message StreamingProxyChatSessionTerminalSnapshot { google.protobuf.Timestamp terminal_at = 9; string error_message = 10; } + +message StreamingProxyRoomCurrentStateDocument { + string id = 1; + string actor_id = 2; + int64 state_version = 3; + string last_event_id = 4; + google.protobuf.Timestamp updated_at = 5; + google.protobuf.Any state_root = 10; +} diff --git a/agents/Aevatar.GAgents.StreamingProxyParticipant/Aevatar.GAgents.StreamingProxyParticipant.csproj b/agents/Aevatar.GAgents.StreamingProxyParticipant/Aevatar.GAgents.StreamingProxyParticipant.csproj deleted file mode 100644 index 0a8c06885..000000000 --- a/agents/Aevatar.GAgents.StreamingProxyParticipant/Aevatar.GAgents.StreamingProxyParticipant.csproj +++ /dev/null @@ -1,24 +0,0 @@ - - - net10.0 - enable - enable - Aevatar.GAgents.StreamingProxyParticipant - Aevatar.GAgents.StreamingProxyParticipant - - - - - - - - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - - - - - - - diff --git a/agents/Aevatar.GAgents.StreamingProxyParticipant/StreamingProxyParticipantGAgent.cs b/agents/Aevatar.GAgents.StreamingProxyParticipant/StreamingProxyParticipantGAgent.cs deleted file mode 100644 index e485614b7..000000000 --- a/agents/Aevatar.GAgents.StreamingProxyParticipant/StreamingProxyParticipantGAgent.cs +++ /dev/null @@ -1,128 +0,0 @@ -using Aevatar.Foundation.Abstractions; -using Aevatar.Foundation.Abstractions.Attributes; -using Aevatar.Foundation.Core; -using Aevatar.Foundation.Core.EventSourcing; -using Google.Protobuf; - -namespace Aevatar.GAgents.StreamingProxyParticipant; - -/// -/// Singleton actor that tracks streaming proxy room participants. -/// Replaces the chrono-storage backed ChronoStorageStreamingProxyParticipantStore. -/// -/// Actor ID: streaming-proxy-participants (cluster-scoped singleton). -/// -public sealed class StreamingProxyParticipantGAgent - : GAgentBase, IProjectedActor -{ - public static string ProjectionKind => "streaming-proxy-participant"; - - - [EventHandler(EndpointName = "addParticipant")] - public async Task HandleParticipantAdded(ParticipantAddedEvent evt) - { - if (string.IsNullOrWhiteSpace(evt.RoomId) || string.IsNullOrWhiteSpace(evt.AgentId)) - return; - - await PersistDomainEventAsync(evt); - } - - [EventHandler(EndpointName = "removeRoomParticipants")] - public async Task HandleRoomParticipantsRemoved(RoomParticipantsRemovedEvent evt) - { - if (string.IsNullOrWhiteSpace(evt.RoomId)) - return; - - // Idempotent: skip if room does not exist - if (!State.Rooms.ContainsKey(evt.RoomId)) - return; - - await PersistDomainEventAsync(evt); - } - - [EventHandler(EndpointName = "removeParticipant")] - public async Task HandleParticipantRemoved(ParticipantRemovedEvent evt) - { - if (string.IsNullOrWhiteSpace(evt.RoomId) || string.IsNullOrWhiteSpace(evt.AgentId)) - return; - - if (!State.Rooms.TryGetValue(evt.RoomId, out var list) || - !list.Participants.Any(p => string.Equals(p.AgentId, evt.AgentId, StringComparison.Ordinal))) - { - return; - } - - await PersistDomainEventAsync(evt); - } - - protected override async Task OnActivateAsync(CancellationToken ct) - { - await base.OnActivateAsync(ct); - } - - protected override StreamingProxyParticipantGAgentState TransitionState( - StreamingProxyParticipantGAgentState current, IMessage evt) - { - return StateTransitionMatcher - .Match(current, evt) - .On(ApplyParticipantAdded) - .On(ApplyParticipantRemoved) - .On(ApplyRoomRemoved) - .OrCurrent(); - } - - private static StreamingProxyParticipantGAgentState ApplyParticipantAdded( - StreamingProxyParticipantGAgentState state, ParticipantAddedEvent evt) - { - var next = state.Clone(); - - if (!next.Rooms.TryGetValue(evt.RoomId, out var list)) - { - list = new ParticipantList(); - next.Rooms[evt.RoomId] = list; - } - - // Remove existing entry for the same agent (upsert semantics) - var existing = list.Participants.FirstOrDefault(p => - string.Equals(p.AgentId, evt.AgentId, StringComparison.Ordinal)); - if (existing is not null) - list.Participants.Remove(existing); - - list.Participants.Add(new ParticipantEntry - { - AgentId = evt.AgentId, - DisplayName = evt.DisplayName, - JoinedAt = evt.JoinedAt, - }); - - return next; - } - - private static StreamingProxyParticipantGAgentState ApplyParticipantRemoved( - StreamingProxyParticipantGAgentState state, ParticipantRemovedEvent evt) - { - var next = state.Clone(); - if (!next.Rooms.TryGetValue(evt.RoomId, out var list)) - return next; - - for (var index = list.Participants.Count - 1; index >= 0; index--) - { - if (string.Equals(list.Participants[index].AgentId, evt.AgentId, StringComparison.Ordinal)) - list.Participants.RemoveAt(index); - } - - if (list.Participants.Count == 0) - next.Rooms.Remove(evt.RoomId); - - return next; - } - - private static StreamingProxyParticipantGAgentState ApplyRoomRemoved( - StreamingProxyParticipantGAgentState state, RoomParticipantsRemovedEvent evt) - { - var next = state.Clone(); - next.Rooms.Remove(evt.RoomId); - return next; - } - -} diff --git a/agents/Aevatar.GAgents.StreamingProxyParticipant/streaming_proxy_participant_messages.proto b/agents/Aevatar.GAgents.StreamingProxyParticipant/streaming_proxy_participant_messages.proto deleted file mode 100644 index 46ae2b43c..000000000 --- a/agents/Aevatar.GAgents.StreamingProxyParticipant/streaming_proxy_participant_messages.proto +++ /dev/null @@ -1,40 +0,0 @@ -syntax = "proto3"; -package aevatar.gagents.streaming_proxy_participant; -option csharp_namespace = "Aevatar.GAgents.StreamingProxyParticipant"; - -import "google/protobuf/timestamp.proto"; - -// ─── State ─── - -message ParticipantEntry { - string agent_id = 1; - string display_name = 2; - google.protobuf.Timestamp joined_at = 3; -} - -message ParticipantList { - repeated ParticipantEntry participants = 1; -} - -message StreamingProxyParticipantGAgentState { - // roomId → participants - map rooms = 1; -} - -// ─── Events ─── - -message ParticipantAddedEvent { - string room_id = 1; - string agent_id = 2; - string display_name = 3; - google.protobuf.Timestamp joined_at = 4; -} - -message ParticipantRemovedEvent { - string room_id = 1; - string agent_id = 2; -} - -message RoomParticipantsRemovedEvent { - string room_id = 1; -} diff --git a/docs/2026-04-02-streaming-proxy-flow.md b/docs/2026-04-02-streaming-proxy-flow.md index f2727ad52..2f1b28a03 100644 --- a/docs/2026-04-02-streaming-proxy-flow.md +++ b/docs/2026-04-02-streaming-proxy-flow.md @@ -18,8 +18,8 @@ | `StreamingProxyEndpoints` | `agents/Aevatar.GAgents.StreamingProxy/StreamingProxyEndpoints.cs` | 提供 room CRUD、`:chat`、`messages`、`messages:stream`、participant 管理 HTTP/SSE 入口 | | `StreamingProxyGAgent` | `agents/Aevatar.GAgents.StreamingProxy/StreamingProxyGAgent.cs` | 房间 actor,本质上是 group chat broker;持久化事件、更新房间内消息/参与者状态、向订阅者发布事件 | | `IGAgentActorRegistryCommandPort` / `IGAgentActorRegistryQueryPort` / `IScopeResourceAdmissionPort` | `src/platform/Aevatar.GAgentService.Abstractions/ScopeGAgents/GAgentRegistryPorts.cs` | room ownership 的写入、列表查询与 command admission 边界 | -| `IStreamingProxyParticipantStore` | `src/Aevatar.Studio.Application/Studio/Abstractions/IStreamingProxyParticipantStore.cs` | room participant 的持久化索引,供 participant 查询、自动加入与失败移除时使用 | -| `StreamingProxyNyxParticipantCoordinator` | `agents/Aevatar.GAgents.StreamingProxy/StreamingProxyNyxParticipantCoordinator.cs` | 在带 Bearer Token 时发现 Nyx 可用 provider,把它们自动加入房间并生成多轮回复 | +| `IStreamingProxyParticipantQueryPort` | `src/Aevatar.Studio.Application/Studio/Abstractions/IStreamingProxyParticipantQueryPort.cs` | room participant 查询入口,读取 `StreamingProxyGAgent` 当前态 readmodel | +| `StreamingProxyNyxParticipantCoordinator` | `agents/Aevatar.GAgents.StreamingProxy/StreamingProxyNyxParticipantCoordinator.cs` | 在带 Bearer Token 时发现 Nyx 可用 provider,并按 room actor 发出的单个 participant work item 执行 Nyx/LLM I/O | | `StreamingProxySseWriter` | `agents/Aevatar.GAgents.StreamingProxy/StreamingProxySseWriter.cs` | 把 actor 事件映射成 SSE frame 输出给客户端 | ## 2. 总体拓扑 @@ -29,7 +29,7 @@ flowchart TB CL["Client / OpenClaw"] --> API["StreamingProxyEndpoints\n/api/scopes/{scopeId}/streaming-proxy/..."] API --> REG["GAgent registry ports\ncommand / query / admission"] - API --> PSTORE["IStreamingProxyParticipantStore\nparticipant index"] + API --> PQUERY["IStreamingProxyParticipantQueryPort\nroom current-state readmodel"] API --> RT["IActorRuntime"] RT --> ACT["StreamingProxyGAgent\nroom actor"] API --> SUB["IActorEventSubscriptionProvider"] @@ -134,17 +134,21 @@ sequenceDiagram SUB->>SSE: "WriteParticipantJoinedAsync" SSE-->>CL: "PARTICIPANT_JOINED" - API->>NYX: "GenerateRepliesAsync(...)" - loop "每个 round / participant" - NYX->>LLM: "provider.ChatAsync(request)" + API->>NYX: "RequestDiscussionAsync(...)" + NYX->>ACT: "Dispatch StreamingProxyNyxDiscussionRequested" + loop "actor-owned round / participant continuation" + ACT->>ACT: "persist transcript / active participants / round cursor" + ACT->>NYX: "RequestParticipantReplyAsync(work item)" + NYX->>LLM: "provider.ChatStreamAsync(request)" LLM-->>NYX: "reply" - NYX->>ACT: "Dispatch GroupChatMessageEvent" - ACT->>ACT: "PersistDomainEventAsync" + NYX->>ACT: "Dispatch Nyx reply succeeded/failed" + ACT->>ACT: "record reply or prune failed participant" + ACT->>ACT: "Persist GroupChatMessageEvent or ParticipantLeftEvent" ACT-->>SUB: "Publish GroupChatMessageEvent" SUB->>SSE: "WriteAgentMessageAsync" SSE-->>CL: "AGENT_MESSAGE" end - API->>SSE: "WriteRunFinishedAsync" + ACT->>ACT: "Persist terminal session state" SSE-->>CL: "RUN_FINISHED" else "没有 Token,或没有可用 Nyx participants" note over API: "进入 activityChannel + timeout 等待模式" @@ -194,37 +198,37 @@ flowchart LR `EnsureParticipantsJoinedAsync(...)` 的行为是: -1. 先从 `StreamingProxyActorStore` 读取已有 participant。 +1. 先从 `IStreamingProxyParticipantQueryPort` 读取 room actor 当前态 readmodel 中已有 participant。 2. 对新增 participant 投递 `GroupChatParticipantJoinedEvent` 给 room actor。 -3. 同时更新 `StreamingProxyActorStore` 的 participant 查询索引。 -所以当前实现里,participant 有两份表现层状态: - -1. actor 内 `_proxyState.Participants` -2. `StreamingProxyActorStore` 里的 query list +participant membership 的唯一写侧事实在 `StreamingProxyGAgentState.Participants`;查询只读取该状态的投影副本。 ### 6.2 自动回复生成 -`GenerateRepliesAsync(...)` 的行为是: +Nyx 自动回复的推进由 `StreamingProxyGAgent` 状态和事件处理驱动;`StreamingProxyNyxParticipantCoordinator` 只做 provider 发现和单次 participant LLM I/O。 -1. 按当前 active participants 决定总轮次。 +1. room actor 持有 `StreamingProxyNyxDiscussionSession`: + - active participant 集合 + - 当前 round / participant cursor + - transcript + - 成功回复计数与 terminal 状态 +2. actor 按当前 active participants 决定总轮次。 - 多 participant 时最多 `4` 轮。 - 单 participant 时只跑 `1` 轮。 -2. 为每个 participant 构造一次 LLM request: +3. actor 为当前 participant 发出单个 work item,coordinator 构造一次 LLM request: - system prompt 明确它是 room 内 participant - user prompt 带原始 topic 和最近 transcript - metadata 带 `NyxIdAccessToken` 和 `NyxIdRoutePreference` -3. 调 `NyxID provider.ChatAsync(...)` 拿回复。 -4. 对回复做规范化,去掉 speaker label,避免串写别人的回复。 -5. 把回复重新封装为 `GroupChatMessageEvent` 打回 room actor。 +4. coordinator 调 `NyxID provider.ChatStreamAsync(...)` 拿回复,并把结果作为 `StreamingProxyNyxParticipantReplySucceeded/Failed` continuation 打回 room actor。 +5. actor 在事件处理里记录 transcript、裁剪失败 participant、推进 round,并把成功回复持久化为 `GroupChatMessageEvent`。 注意这里的房间推进不是“直接把 LLM 文本写 SSE”,而是: 1. `LLM reply` -2. 转成 `GroupChatMessageEvent` +2. 转成 Nyx reply continuation 3. 回到 `StreamingProxyGAgent` -4. actor 再发布事件 -5. SSE 订阅端再收到 `AGENT_MESSAGE` +4. actor 持久化 `GroupChatMessageEvent` 或 `GroupChatParticipantLeftEvent` +5. SSE 订阅端再收到 `AGENT_MESSAGE` / `PARTICIPANT_LEFT` 也就是说,SSE 输出仍然以 actor 事件流为单一来源。 @@ -244,15 +248,13 @@ flowchart LR sequenceDiagram participant EXT as "External Participant" participant API as "StreamingProxyEndpoints" - participant STORE as "StreamingProxyActorStore" participant ACT as "StreamingProxyGAgent" participant SUB as "Actor Event Subscription" participant SSE as "StreamingProxySseWriter" participant CL as "Client" EXT->>API: "POST /participants" - API->>ACT: "HandleEventAsync(GroupChatParticipantJoinedEvent)" - API->>STORE: "AddParticipant(...)" + API->>ACT: "Dispatch GroupChatParticipantJoinedEvent" ACT-->>SUB: "Publish joined event" SUB->>SSE: "WriteParticipantJoinedAsync" SSE-->>CL: "PARTICIPANT_JOINED" @@ -304,18 +306,18 @@ sequenceDiagram 状态分别落在三处: -1. `StreamingProxyActorStore` - - room 列表 - - participant 查询索引 -2. `StreamingProxyGAgent` 事件与 `_proxyState` +1. registry actor state + - room ownership +2. `StreamingProxyGAgent` 事件与状态 - 房间名 - participant 集合 - message transcript - `next_sequence` -3. SSE 订阅流 +3. Projection readmodel / SSE 订阅流 + - participant 查询读取 `StreamingProxyRoomCurrentStateDocument` - 只承载实时输出,不承载查询事实 -如果只看当前代码,真正驱动前端实时展示的是 actor 发布的 `GroupChat*` 事件,不是 `StreamingProxyActorStore`。 +如果只看当前代码,真正驱动前端实时展示的是 actor 发布的 `GroupChat*` 事件;participant 查询读取同一 room actor 当前态的投影副本。 ## 11. 关键代码锚点 @@ -323,6 +325,5 @@ sequenceDiagram 2. `agents/Aevatar.GAgents.StreamingProxy/StreamingProxyEndpoints.cs` 3. `agents/Aevatar.GAgents.StreamingProxy/StreamingProxyGAgent.cs` 4. `agents/Aevatar.GAgents.StreamingProxy/StreamingProxyNyxParticipantCoordinator.cs` -5. `agents/Aevatar.GAgents.StreamingProxy/StreamingProxyActorStore.cs` -6. `agents/Aevatar.GAgents.StreamingProxy/StreamingProxySseWriter.cs` -7. `agents/Aevatar.GAgents.StreamingProxy/streaming_proxy_messages.proto` +5. `agents/Aevatar.GAgents.StreamingProxy/StreamingProxySseWriter.cs` +6. `agents/Aevatar.GAgents.StreamingProxy/streaming_proxy_messages.proto` diff --git a/docs/2026-05-14-gagent-types-analysis.md b/docs/2026-05-14-gagent-types-analysis.md index e2b76eba7..002f7705d 100644 --- a/docs/2026-05-14-gagent-types-analysis.md +++ b/docs/2026-05-14-gagent-types-analysis.md @@ -98,7 +98,6 @@ GAgentBase ← 无状态底座 | `ConnectorCatalogGAgent` | `GAgentBase` | `ConnectorCatalogState` | Connector 目录 | | `DeviceRegistrationGAgent` | `GAgentBase` | `DeviceRegistrationState` | 设备注册 | | `StreamingProxyGAgent` | `GAgentBase` | `StreamingProxyGAgentState` | 流式代理房间 | -| `StreamingProxyParticipantGAgent` | `GAgentBase` | `StreamingProxyParticipantGAgentState` | 流式代理参与者 | | `ConversationGAgent` | `GAgentBase` | `ConversationGAgentState` | 渠道对话(Lark/Telegram) | | `ChannelBotRegistrationGAgent` | `GAgentBase` | `ChannelBotRegistrationStoreState` | 渠道 Bot 注册 | | `ChannelUserBindingGAgent` | `GAgentBase` | `ChannelUserBindingState` | 渠道用户绑定 | @@ -131,7 +130,7 @@ GAgentBase ← 无状态底座 | `RoleCatalogGAgent` | role_name → role spec | | `ConnectorCatalogGAgent` | connector_name → connector config | | `UserAgentCatalogGAgent` | user + agent_id → catalog entry | -| `StreamingProxyParticipantGAgent` | room_id → participant set | +| `StreamingProxyGAgent` | room_id → participant set + messages | | `ChatHistoryIndexGAgent` | user → conversation list | **共性模式**:`Upsert → Tombstone → Compact`,状态内维护 `last_applied_event_version`,projection 直出 current-state read model。 @@ -314,7 +313,7 @@ abstract class RunGAgentBase : GAgentBase #### 4.1.2 `CatalogGAgentBase` — 目录索引基类 -**适用对象**:`ScriptCatalogGAgent`、`GAgentRegistryGAgent`、`RoleCatalogGAgent`、`ConnectorCatalogGAgent`、`UserAgentCatalogGAgent`、`StreamingProxyParticipantGAgent`、`ChatHistoryIndexGAgent` +**适用对象**:`ScriptCatalogGAgent`、`GAgentRegistryGAgent`、`RoleCatalogGAgent`、`ConnectorCatalogGAgent`、`UserAgentCatalogGAgent`、`ChatHistoryIndexGAgent` **可抽取的通用能力**: ``` diff --git a/docs/canon/aevatar-channel-architecture.md b/docs/canon/aevatar-channel-architecture.md index dc47ceb6e..f6c897399 100644 --- a/docs/canon/aevatar-channel-architecture.md +++ b/docs/canon/aevatar-channel-architecture.md @@ -1092,7 +1092,7 @@ adapter 在构造 `ChatActivity` 时**必须**: - `Aevatar.GAgents.ChatHistory` —— 独立 GAgent(`ChatConversationGAgent` / `ChatHistoryIndexGAgent`)。**但当前 ChannelRuntime 未集成它**;对话历史实际上是 `AIGAgentBase` 里的进程内 `ChatHistory`(见 `src/Aevatar.AI.Core/AIGAgentBase.cs`)。`ConversationGAgent` 要集成它是**新工作**,不是"复用现有集成" - `Aevatar.GAgents.UserMemory` —— 同样独立 GAgent 存在,但当前无集成。`ConversationGAgent` 的 long-term memory 集成是新工作 - `Aevatar.GAgents.ChatbotClassifier` —— 按需包成 `ClassificationMiddleware` -- `Aevatar.GAgents.StreamingProxy` / `StreamingProxyParticipant` —— LLM streaming 底层可复用 +- `Aevatar.GAgents.StreamingProxy` —— LLM streaming 底层可复用 - `Aevatar.GAgents.Registry` —— 平台级 GAgent registry,和拟改名的 `UserAgentCatalog` 各司其职(platform actor routing vs user agent metadata) **诚实承认**:早期 RFC 版本措辞是 "必须调用 ChatHistory / UserMemory,不重复存"——暗示已有集成。实际上这些是**未来集成目标**,不是现状复用。本 RFC 实施时需要把这层集成**新建**出来,不要误以为是捡现成。 @@ -1513,7 +1513,6 @@ agents/ ← production code ├── Aevatar.GAgents.Registry/ ← 平台级 registry ├── Aevatar.GAgents.RoleCatalog/ ├── Aevatar.GAgents.StreamingProxy/ -├── Aevatar.GAgents.StreamingProxyParticipant/ ├── Aevatar.GAgents.UserConfig/ └── Aevatar.GAgents.UserMemory/ ← ConversationGAgent 与其集成 @@ -2420,7 +2419,7 @@ public interface ICredentialProvider { ### 17.5 Orleans grain-based cluster-singleton primitive(P2 — 第二 long-conn 场景触发) -**缺口**:aevatar 缺"**集群唯一持有某个外部长连接/会话所有权**"的通用做法。现有"well-known singleton actor" 模式(`RoleCatalogGAgent.cs:14` / `ConnectorCatalogGAgent.cs:14` / `StreamingProxyParticipantGAgent.cs:13` / `ChannelBotRegistrationGAgent.cs:15` / `DeviceRegistrationGAgent.cs:15`)是**被动 actor**——只要 grain id 固定就行,没有 lease / epoch fencing / failover ownership 语义。 +**缺口**:aevatar 缺"**集群唯一持有某个外部长连接/会话所有权**"的通用做法。现有"well-known singleton actor" 模式(`RoleCatalogGAgent.cs:14` / `ConnectorCatalogGAgent.cs:14` / `ChannelBotRegistrationGAgent.cs:15` / `DeviceRegistrationGAgent.cs:15`)是**被动 actor**——只要 grain id 固定就行,没有 lease / epoch fencing / failover ownership 语义。 现有 hosted service(`UserAgentCatalogStartupService.cs:22-60` / `ChannelBotRegistrationStartupService.cs:33-72`)是 **node-local startup/warmup**——host 启动时 poke 一下 grain 让它 activate,不是 cluster-wide supervisor。 diff --git a/src/Aevatar.Studio.Application/Studio/Abstractions/IStreamingProxyParticipantQueryPort.cs b/src/Aevatar.Studio.Application/Studio/Abstractions/IStreamingProxyParticipantQueryPort.cs new file mode 100644 index 000000000..de90762d2 --- /dev/null +++ b/src/Aevatar.Studio.Application/Studio/Abstractions/IStreamingProxyParticipantQueryPort.cs @@ -0,0 +1,15 @@ +namespace Aevatar.Studio.Application.Studio.Abstractions; + +/// +/// Query-side participant view for streaming proxy rooms. +/// +public interface IStreamingProxyParticipantQueryPort +{ + Task> ListAsync( + string roomId, CancellationToken cancellationToken = default); +} + +public sealed record StreamingProxyParticipant( + string AgentId, + string DisplayName, + DateTimeOffset JoinedAt); diff --git a/src/Aevatar.Studio.Application/Studio/Abstractions/IStreamingProxyParticipantStore.cs b/src/Aevatar.Studio.Application/Studio/Abstractions/IStreamingProxyParticipantStore.cs deleted file mode 100644 index 7e0cc8d46..000000000 --- a/src/Aevatar.Studio.Application/Studio/Abstractions/IStreamingProxyParticipantStore.cs +++ /dev/null @@ -1,33 +0,0 @@ -namespace Aevatar.Studio.Application.Studio.Abstractions; - -/// -/// Persistent participant index for streaming proxy rooms. -/// -/// -/// TODO: When wiring to endpoints, the caller must handle corrupt-data exceptions -/// from the underlying store (e.g. from -/// deserialization failures). Swallowing errors and returning an empty list would -/// silently discard existing participants. The chrono-storage implementation -/// intentionally throws on corruption to prevent data loss. -/// -public interface IStreamingProxyParticipantStore -{ - Task> ListAsync( - string roomId, CancellationToken cancellationToken = default); - - Task AddAsync( - string roomId, string agentId, string displayName, - CancellationToken cancellationToken = default); - - Task RemoveParticipantAsync( - string roomId, string agentId, - CancellationToken cancellationToken = default); - - Task RemoveRoomAsync( - string roomId, CancellationToken cancellationToken = default); -} - -public sealed record StreamingProxyParticipant( - string AgentId, - string DisplayName, - DateTimeOffset JoinedAt); diff --git a/src/Aevatar.Studio.Hosting/StudioProjectionReadModelServiceCollectionExtensions.cs b/src/Aevatar.Studio.Hosting/StudioProjectionReadModelServiceCollectionExtensions.cs index 0b1e40e77..9b86a6aa8 100644 --- a/src/Aevatar.Studio.Hosting/StudioProjectionReadModelServiceCollectionExtensions.cs +++ b/src/Aevatar.Studio.Hosting/StudioProjectionReadModelServiceCollectionExtensions.cs @@ -8,7 +8,6 @@ using Aevatar.GAgents.ConnectorCatalog; using Aevatar.GAgents.Registry; using Aevatar.GAgents.RoleCatalog; -using Aevatar.GAgents.StreamingProxyParticipant; using Aevatar.GAgents.StudioMember; using Aevatar.GAgents.StudioTeam; using Aevatar.Studio.Workspace; @@ -30,7 +29,7 @@ namespace Aevatar.Studio.Hosting; /// configuration. Required by the actor-backed stores /// (IRoleCatalogStore, IConnectorCatalogStore, /// IChatHistoryStore, IGAgentActorRegistryQueryPort, -/// IUserMemoryStore, IStreamingProxyParticipantStore) that read +/// IUserMemoryStore) that read /// from these documents via IProjectionDocumentReader. /// internal static class StudioProjectionReadModelServiceCollectionExtensions @@ -67,7 +66,6 @@ public static IServiceCollection AddStudioProjectionReadModelProviders( RegisterElasticsearch(services, configuration); RegisterElasticsearch(services, configuration); RegisterElasticsearch(services, configuration); - RegisterElasticsearch(services, configuration); RegisterElasticsearch(services, configuration); RegisterElasticsearch(services, configuration); RegisterElasticsearch(services, configuration); @@ -82,7 +80,6 @@ public static IServiceCollection AddStudioProjectionReadModelProviders( RegisterInMemory(services); RegisterInMemory(services); RegisterInMemory(services); - RegisterInMemory(services); RegisterInMemory(services); RegisterInMemory(services); RegisterInMemory(services); @@ -134,7 +131,6 @@ private static bool HasAllStudioDocumentReaders( && HasDocumentReaderForProvider(services, providerKind) && HasDocumentReaderForProvider(services, providerKind) && HasDocumentReaderForProvider(services, providerKind) - && HasDocumentReaderForProvider(services, providerKind) && HasDocumentReaderForProvider(services, providerKind) && HasDocumentReaderForProvider(services, providerKind) && HasDocumentReaderForProvider(services, providerKind) @@ -219,7 +215,6 @@ private static TypeRegistry BuildStudioStateTypeRegistry() ConnectorCatalogState.Descriptor, RoleCatalogState.Descriptor, UserMemoryState.Descriptor, - StreamingProxyParticipantGAgentState.Descriptor, ChatHistoryIndexState.Descriptor, ChatConversationState.Descriptor, StudioMemberState.Descriptor, diff --git a/src/Aevatar.Studio.Infrastructure/ActorBacked/ActorBackedStreamingProxyParticipantStore.cs b/src/Aevatar.Studio.Infrastructure/ActorBacked/ActorBackedStreamingProxyParticipantStore.cs deleted file mode 100644 index 95152299b..000000000 --- a/src/Aevatar.Studio.Infrastructure/ActorBacked/ActorBackedStreamingProxyParticipantStore.cs +++ /dev/null @@ -1,101 +0,0 @@ -using Aevatar.CQRS.Projection.Stores.Abstractions; -using Aevatar.Foundation.Abstractions; -using Aevatar.GAgents.StreamingProxyParticipant; -using Aevatar.Studio.Application.Studio.Abstractions; -using Aevatar.Studio.Projection.ReadModels; -using Google.Protobuf.WellKnownTypes; -using Microsoft.Extensions.Logging; - -namespace Aevatar.Studio.Infrastructure.ActorBacked; - -/// -/// Actor-backed implementation of . -/// Reads from the projection document store (CQRS read model). -/// Writes send commands to the Write GAgent. -/// -internal sealed class ActorBackedStreamingProxyParticipantStore - : IStreamingProxyParticipantStore -{ - private const string WriteActorId = "streaming-proxy-participants"; - - private readonly IStudioActorBootstrap _bootstrap; - private readonly IActorDispatchPort _dispatchPort; - private readonly IProjectionDocumentReader _documentReader; - private readonly ILogger _logger; - - public ActorBackedStreamingProxyParticipantStore( - IStudioActorBootstrap bootstrap, - IActorDispatchPort dispatchPort, - IProjectionDocumentReader documentReader, - ILogger logger) - { - _bootstrap = bootstrap ?? throw new ArgumentNullException(nameof(bootstrap)); - _dispatchPort = dispatchPort ?? throw new ArgumentNullException(nameof(dispatchPort)); - _documentReader = documentReader ?? throw new ArgumentNullException(nameof(documentReader)); - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - } - - public async Task> ListAsync( - string roomId, CancellationToken cancellationToken = default) - { - var document = await _documentReader.GetAsync(WriteActorId, cancellationToken); - if (document?.StateRoot == null || - !document.StateRoot.Is(StreamingProxyParticipantGAgentState.Descriptor)) - return []; - - var state = document.StateRoot.Unpack(); - if (!state.Rooms.TryGetValue(roomId, out var list)) - return []; - - return list.Participants - .Select(p => new StreamingProxyParticipant( - p.AgentId, - p.DisplayName, - p.JoinedAt.ToDateTimeOffset())) - .ToList() - .AsReadOnly(); - } - - public async Task AddAsync( - string roomId, string agentId, string displayName, - CancellationToken cancellationToken = default) - { - var actor = await EnsureWriteActorAsync(cancellationToken); - var evt = new ParticipantAddedEvent - { - RoomId = roomId, - AgentId = agentId, - DisplayName = displayName, - JoinedAt = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), - }; - await ActorCommandDispatcher.SendAsync(_dispatchPort, actor, evt, cancellationToken); - } - - public async Task RemoveParticipantAsync( - string roomId, string agentId, CancellationToken cancellationToken = default) - { - var actor = await EnsureWriteActorAsync(cancellationToken); - var evt = new ParticipantRemovedEvent - { - RoomId = roomId, - AgentId = agentId, - }; - await ActorCommandDispatcher.SendAsync(_dispatchPort, actor, evt, cancellationToken); - } - - public async Task RemoveRoomAsync( - string roomId, CancellationToken cancellationToken = default) - { - var actor = await EnsureWriteActorAsync(cancellationToken); - var evt = new RoomParticipantsRemovedEvent - { - RoomId = roomId, - }; - await ActorCommandDispatcher.SendAsync(_dispatchPort, actor, evt, cancellationToken); - } - - // ── Actor resolution ── - - private Task EnsureWriteActorAsync(CancellationToken ct) => - _bootstrap.EnsureAsync(WriteActorId, ct); -} diff --git a/src/Aevatar.Studio.Infrastructure/ActorBacked/ProjectionStreamingProxyParticipantQueryPort.cs b/src/Aevatar.Studio.Infrastructure/ActorBacked/ProjectionStreamingProxyParticipantQueryPort.cs new file mode 100644 index 000000000..8bea4bc0d --- /dev/null +++ b/src/Aevatar.Studio.Infrastructure/ActorBacked/ProjectionStreamingProxyParticipantQueryPort.cs @@ -0,0 +1,49 @@ +using Aevatar.CQRS.Projection.Stores.Abstractions; +using Aevatar.GAgents.StreamingProxy; +using Aevatar.Studio.Application.Studio.Abstractions; +using Microsoft.Extensions.Logging; +using AppStreamingProxyParticipant = Aevatar.Studio.Application.Studio.Abstractions.StreamingProxyParticipant; + +namespace Aevatar.Studio.Infrastructure.ActorBacked; + +/// +/// Projection-backed participant query port for StreamingProxy rooms. +/// +internal sealed class ProjectionStreamingProxyParticipantQueryPort + : IStreamingProxyParticipantQueryPort +{ + private readonly IProjectionDocumentReader _documentReader; + private readonly ILogger _logger; + + public ProjectionStreamingProxyParticipantQueryPort( + IProjectionDocumentReader documentReader, + ILogger logger) + { + _documentReader = documentReader ?? throw new ArgumentNullException(nameof(documentReader)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public async Task> ListAsync( + string roomId, CancellationToken cancellationToken = default) + { + var document = await _documentReader.GetAsync(roomId, cancellationToken); + if (document?.StateRoot == null || + !document.StateRoot.Is(StreamingProxyGAgentState.Descriptor)) + { + _logger.LogDebug( + "StreamingProxy room current-state document is not available for room {RoomId}.", + roomId); + return []; + } + + var state = document.StateRoot.Unpack(); + + return state.Participants + .Select(p => new AppStreamingProxyParticipant( + p.AgentId, + p.DisplayName, + p.JoinedAt?.ToDateTimeOffset() ?? DateTimeOffset.MinValue)) + .ToList() + .AsReadOnly(); + } +} diff --git a/src/Aevatar.Studio.Infrastructure/Aevatar.Studio.Infrastructure.csproj b/src/Aevatar.Studio.Infrastructure/Aevatar.Studio.Infrastructure.csproj index e886cb485..f9088ebed 100644 --- a/src/Aevatar.Studio.Infrastructure/Aevatar.Studio.Infrastructure.csproj +++ b/src/Aevatar.Studio.Infrastructure/Aevatar.Studio.Infrastructure.csproj @@ -17,7 +17,7 @@ - + diff --git a/src/Aevatar.Studio.Infrastructure/DependencyInjection/ServiceCollectionExtensions.cs b/src/Aevatar.Studio.Infrastructure/DependencyInjection/ServiceCollectionExtensions.cs index 4efa6bc81..6f000651b 100644 --- a/src/Aevatar.Studio.Infrastructure/DependencyInjection/ServiceCollectionExtensions.cs +++ b/src/Aevatar.Studio.Infrastructure/DependencyInjection/ServiceCollectionExtensions.cs @@ -44,7 +44,7 @@ public static IServiceCollection AddStudioInfrastructure( services.AddSingleton(sp => sp.GetRequiredService()); services.AddSingleton(sp => sp.GetRequiredService()); services.AddSingleton(sp => sp.GetRequiredService()); - services.AddSingleton(); + services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); diff --git a/src/Aevatar.Studio.Projection/Aevatar.Studio.Projection.csproj b/src/Aevatar.Studio.Projection/Aevatar.Studio.Projection.csproj index 5146fabeb..f50966e38 100644 --- a/src/Aevatar.Studio.Projection/Aevatar.Studio.Projection.csproj +++ b/src/Aevatar.Studio.Projection/Aevatar.Studio.Projection.csproj @@ -20,7 +20,6 @@ - diff --git a/src/Aevatar.Studio.Projection/DependencyInjection/ServiceCollectionExtensions.cs b/src/Aevatar.Studio.Projection/DependencyInjection/ServiceCollectionExtensions.cs index e91c1fb14..b3f9f18fd 100644 --- a/src/Aevatar.Studio.Projection/DependencyInjection/ServiceCollectionExtensions.cs +++ b/src/Aevatar.Studio.Projection/DependencyInjection/ServiceCollectionExtensions.cs @@ -78,10 +78,6 @@ public static IServiceCollection AddStudioProjectionComponents( StudioMaterializationContext, UserMemoryCurrentStateProjector>(); - services.AddCurrentStateProjectionMaterializer< - StudioMaterializationContext, - StreamingProxyParticipantCurrentStateProjector>(); - services.AddCurrentStateProjectionMaterializer< StudioMaterializationContext, ChatHistoryIndexCurrentStateProjector>(); @@ -128,10 +124,6 @@ public static IServiceCollection AddStudioProjectionComponents( IProjectionDocumentMetadataProvider, UserMemoryCurrentStateDocumentMetadataProvider>(); - services.TryAddSingleton< - IProjectionDocumentMetadataProvider, - StreamingProxyParticipantCurrentStateDocumentMetadataProvider>(); - services.TryAddSingleton< IProjectionDocumentMetadataProvider, ChatHistoryIndexCurrentStateDocumentMetadataProvider>(); diff --git a/src/Aevatar.Studio.Projection/ReadModels/studio_projection_readmodels.proto b/src/Aevatar.Studio.Projection/ReadModels/studio_projection_readmodels.proto index 730c5091d..6ed78ca36 100644 --- a/src/Aevatar.Studio.Projection/ReadModels/studio_projection_readmodels.proto +++ b/src/Aevatar.Studio.Projection/ReadModels/studio_projection_readmodels.proto @@ -68,17 +68,6 @@ message UserMemoryCurrentStateDocument { google.protobuf.Any state_root = 10; } -// ─── StreamingProxyParticipant Current State ReadModel ─── - -message StreamingProxyParticipantCurrentStateDocument { - string id = 1; - string actor_id = 2; - int64 state_version = 3; - string last_event_id = 4; - google.protobuf.Timestamp updated_at = 5; - google.protobuf.Any state_root = 10; -} - // ─── ChatHistoryIndex Current State ReadModel ─── message ChatHistoryIndexCurrentStateDocument { diff --git a/test/Aevatar.AI.Tests/Aevatar.AI.Tests.csproj b/test/Aevatar.AI.Tests/Aevatar.AI.Tests.csproj index e4a24545a..21d56e31e 100644 --- a/test/Aevatar.AI.Tests/Aevatar.AI.Tests.csproj +++ b/test/Aevatar.AI.Tests/Aevatar.AI.Tests.csproj @@ -27,7 +27,6 @@ - diff --git a/test/Aevatar.AI.Tests/StreamingProxyCoverageTests.cs b/test/Aevatar.AI.Tests/StreamingProxyCoverageTests.cs index 0e81a8a5a..7e1d40ca8 100644 --- a/test/Aevatar.AI.Tests/StreamingProxyCoverageTests.cs +++ b/test/Aevatar.AI.Tests/StreamingProxyCoverageTests.cs @@ -12,6 +12,7 @@ using Aevatar.CQRS.Core.Abstractions.Streaming; using Aevatar.CQRS.Core.Commands; using Aevatar.Foundation.Core.EventSourcing; +using Aevatar.Foundation.Abstractions.Persistence; using Aevatar.Foundation.Abstractions.Streaming; using Aevatar.Studio.Application.Studio.Abstractions; using Aevatar.GAgentService.Abstractions.ScopeGAgents; @@ -190,10 +191,9 @@ public async Task HandleListRoomsAsync_ShouldReturnRoomsForScope() } [Fact] - public async Task HandleDeleteRoomAsync_ShouldReturnOk_AndRemoveFromBothStores() + public async Task HandleDeleteRoomAsync_ShouldReturnOk_AndRemoveFromRegistry() { var actorStore = new StubGAgentActorStore(); - var participantStore = new StubParticipantStore(); var result = await InvokeResultAsync( "HandleDeleteRoomAsync", @@ -202,7 +202,6 @@ public async Task HandleDeleteRoomAsync_ShouldReturnOk_AndRemoveFromBothStores() "room-1", actorStore, actorStore, - participantStore, NullLoggerFactory.Instance, CancellationToken.None); @@ -211,7 +210,6 @@ public async Task HandleDeleteRoomAsync_ShouldReturnOk_AndRemoveFromBothStores() actorStore.RemovedActors.Should().ContainSingle(x => x.scopeId == "scope-a" && x.gagentType == StreamingProxyDefaults.GAgentTypeName && x.actorId == "room-1"); - participantStore.RemovedRooms.Should().ContainSingle(x => x == "room-1"); } [Fact] @@ -221,8 +219,6 @@ public async Task HandleDeleteRoomAsync_UnregisterFailure_ShouldReturnUnavailabl { UnregisterException = new InvalidOperationException("registry unavailable"), }; - var participantStore = new StubParticipantStore(); - var result = await InvokeResultAsync( "HandleDeleteRoomAsync", CreateScopedHttpContext(), @@ -230,13 +226,11 @@ public async Task HandleDeleteRoomAsync_UnregisterFailure_ShouldReturnUnavailabl "room-1", actorStore, actorStore, - participantStore, NullLoggerFactory.Instance, CancellationToken.None); var response = await ExecuteResultAsync(result); response.StatusCode.Should().Be(StatusCodes.Status503ServiceUnavailable); - participantStore.RemovedRooms.Should().BeEmpty(); } [Fact] @@ -247,14 +241,14 @@ public async Task HandleChatAsync_ShouldRejectEmptyPrompt() var dispatchPort = new StubActorDispatchPort(runtime); var interactionService = new StubStreamingProxyRoomChatInteractionService(); var durableCompletionResolver = new StreamingProxyChatDurableCompletionResolver(new StubTerminalQueryPort()); - var participantStore = new StubParticipantStore(); + var participantQueryPort = new StubParticipantStore(); var actorStore = new StubGAgentActorStore(); var coordinator = CreateNyxParticipantCoordinator(); var method = typeof(StreamingProxyEndpoints).GetMethod( "HandleChatAsync", BindingFlags.NonPublic | BindingFlags.Static)!; - var task = method.Invoke(null, [context, "scope-a", "room-a", new ChatTopicRequest(null), runtime, dispatchPort, actorStore, interactionService, durableCompletionResolver, participantStore, coordinator, NullLoggerFactory.Instance, CancellationToken.None]); + var task = method.Invoke(null, [context, "scope-a", "room-a", new ChatTopicRequest(null), runtime, dispatchPort, actorStore, interactionService, durableCompletionResolver, participantQueryPort, coordinator, NullLoggerFactory.Instance, CancellationToken.None]); await InvokeTaskAsync(task); context.Response.StatusCode.Should().Be(StatusCodes.Status400BadRequest); @@ -270,7 +264,7 @@ public async Task HandleChatAsync_ShouldRejectMismatchedAuthenticatedScope() var dispatchPort = new StubActorDispatchPort(runtime); var interactionService = new StubStreamingProxyRoomChatInteractionService(); var durableCompletionResolver = new StreamingProxyChatDurableCompletionResolver(new StubTerminalQueryPort()); - var participantStore = new StubParticipantStore(); + var participantQueryPort = new StubParticipantStore(); var actorStore = new StubGAgentActorStore(); var coordinator = CreateNyxParticipantCoordinator(); @@ -279,7 +273,7 @@ public async Task HandleChatAsync_ShouldRejectMismatchedAuthenticatedScope() BindingFlags.NonPublic | BindingFlags.Static)!; var task = method.Invoke( null, - [context, "scope-a", "room-a", new ChatTopicRequest("hello"), runtime, dispatchPort, actorStore, interactionService, durableCompletionResolver, participantStore, coordinator, NullLoggerFactory.Instance, CancellationToken.None]); + [context, "scope-a", "room-a", new ChatTopicRequest("hello"), runtime, dispatchPort, actorStore, interactionService, durableCompletionResolver, participantQueryPort, coordinator, NullLoggerFactory.Instance, CancellationToken.None]); await InvokeTaskAsync(task); context.Response.StatusCode.Should().Be(StatusCodes.Status403Forbidden); @@ -449,7 +443,7 @@ public async Task HandleChatAsync_ShouldAttachProjectionSession_AndEmitRunFinish var interactionService = new StubStreamingProxyRoomChatInteractionService(); var durableCompletionResolver = new StreamingProxyChatDurableCompletionResolver( new StubTerminalQueryPort(StreamingProxyChatSessionTerminalStatus.Completed)); - var participantStore = new StubParticipantStore(); + var participantQueryPort = new StubParticipantStore(); var actorStore = new StubGAgentActorStore(); var coordinator = CreateNyxParticipantCoordinator(); var request = new ChatTopicRequest("Discuss webhook relay", "session-123"); @@ -536,7 +530,7 @@ public async Task HandleChatAsync_ShouldAttachProjectionSession_AndEmitRunFinish BindingFlags.NonPublic | BindingFlags.Static)!; await InvokeTaskAsync(method.Invoke( null, - [context, "scope-a", "room-a", request, runtime, dispatchPort, actorStore, interactionService, durableCompletionResolver, participantStore, coordinator, NullLoggerFactory.Instance, CancellationToken.None])); + [context, "scope-a", "room-a", request, runtime, dispatchPort, actorStore, interactionService, durableCompletionResolver, participantQueryPort, coordinator, NullLoggerFactory.Instance, CancellationToken.None])); interactionService.Commands.Should().ContainSingle().Which.Should().Be(new StreamingProxyRoomChatCommand( "room-a", @@ -565,7 +559,7 @@ public async Task HandleChatAsync_ShouldPublishFailedTerminalState_WhenCancelled WaitForCancellation = true, }; var durableCompletionResolver = new StreamingProxyChatDurableCompletionResolver(new StubTerminalQueryPort()); - var participantStore = new StubParticipantStore(); + var participantQueryPort = new StubParticipantStore(); var actorStore = new StubGAgentActorStore(); var coordinator = CreateNyxParticipantCoordinator(); using var cts = new CancellationTokenSource(); @@ -575,7 +569,7 @@ public async Task HandleChatAsync_ShouldPublishFailedTerminalState_WhenCancelled BindingFlags.NonPublic | BindingFlags.Static)!; var task = InvokeTaskAsync(method.Invoke( null, - [context, "scope-a", "room-a", new ChatTopicRequest("Cancel me", "session-cancel"), runtime, dispatchPort, actorStore, interactionService, durableCompletionResolver, participantStore, coordinator, NullLoggerFactory.Instance, cts.Token])); + [context, "scope-a", "room-a", new ChatTopicRequest("Cancel me", "session-cancel"), runtime, dispatchPort, actorStore, interactionService, durableCompletionResolver, participantQueryPort, coordinator, NullLoggerFactory.Instance, cts.Token])); await interactionService.Started.Task; cts.Cancel(); @@ -824,22 +818,6 @@ await stream.PumpAsync( emitted.Should().BeEmpty(); } - [Fact] - public void DetermineParticipantTerminalState_ShouldFail_WhenNoRepliesWereProduced() - { - var method = typeof(StreamingProxyEndpoints).GetMethod( - "DetermineParticipantTerminalState", - BindingFlags.NonPublic | BindingFlags.Static)!; - - var failed = ((StreamingProxyChatSessionTerminalStatus Status, string? ErrorMessage))method.Invoke(null, [0])!; - failed.Status.Should().Be(StreamingProxyChatSessionTerminalStatus.Failed); - failed.ErrorMessage.Should().Be("StreamingProxy chat completed without any participant replies."); - - var completed = ((StreamingProxyChatSessionTerminalStatus Status, string? ErrorMessage))method.Invoke(null, [1])!; - completed.Status.Should().Be(StreamingProxyChatSessionTerminalStatus.Completed); - completed.ErrorMessage.Should().BeNull(); - } - [Fact] public async Task TryPublishFailedTerminalStateAsync_ShouldEmitFailedTerminalEvent_WhenCompletionIsUnknown() { @@ -1059,9 +1037,8 @@ public async Task HandlePostMessageAsync_ShouldRejectMissingFieldsAndReturnAccep } [Fact] - public async Task HandleJoinAsync_ShouldRejectMissingAgentIdAndAddParticipant() + public async Task HandleJoinAsync_ShouldRejectMissingAgentIdAndDispatchJoinEvent() { - var participantStore = new StubParticipantStore(); var runtime = new StubActorRuntime(new List { new StubActor("room-a") }); var dispatchPort = new StubActorDispatchPort(runtime); var actorStore = new StubGAgentActorStore(); @@ -1074,7 +1051,6 @@ public async Task HandleJoinAsync_ShouldRejectMissingAgentIdAndAddParticipant() new JoinRoomRequest(null, null), runtime, actorStore, - participantStore, NullLoggerFactory.Instance, CancellationToken.None); @@ -1091,14 +1067,11 @@ public async Task HandleJoinAsync_ShouldRejectMissingAgentIdAndAddParticipant() runtime, dispatchPort, actorStore, - participantStore, NullLoggerFactory.Instance, CancellationToken.None); response = await ExecuteResultAsync(result); response.StatusCode.Should().Be(StatusCodes.Status200OK); - participantStore.AddedParticipants.Should().ContainSingle(x => - x.roomId == "room-a" && x.agentId == "agent-1" && x.displayName == "Alice"); dispatchPort.Dispatches.Should().ContainSingle(x => x.ActorId == "room-a"); } @@ -1243,10 +1216,10 @@ await WriteRoomSessionEventAsync( } [Fact] - public async Task HandleListParticipantsAsync_ShouldReturnStoreParticipants() + public async Task HandleListParticipantsAsync_ShouldReturnQueriedParticipants() { - var participantStore = new StubParticipantStore(); - participantStore.Participants["room-a"] = + var participantQueryPort = new StubParticipantStore(); + participantQueryPort.Participants["room-a"] = [ new StreamingProxyParticipant("agent-1", "Alice", DateTimeOffset.UtcNow), ]; @@ -1257,7 +1230,7 @@ public async Task HandleListParticipantsAsync_ShouldReturnStoreParticipants() "scope-a", "room-a", new StubGAgentActorStore(), - participantStore, + participantQueryPort, NullLoggerFactory.Instance, CancellationToken.None); @@ -1324,6 +1297,97 @@ await agent.HandleGroupChatMessage(new GroupChatMessageEvent .ContainSingle(x => x.AgentId == "agent-1"); } + [Fact] + public async Task StreamingProxyGAgent_ShouldPruneFailedNyxParticipant_AndContinueWithNextParticipant() + { + var coordinator = new RecordingNyxCoordinator(); + using var provider = BuildStreamingProxyAgentProvider(coordinator); + var agent = CreateAgent(provider, "room-a"); + + await agent.ActivateAsync(); + await agent.HandleNyxDiscussionRequested(new StreamingProxyNyxDiscussionRequested + { + SessionId = "session-a", + Prompt = "Discuss release risk", + AccessToken = "token-a", + Participants = + { + BuildNyxParticipant("node-a", "Node A"), + BuildNyxParticipant("node-b", "Node B"), + }, + }); + + coordinator.WorkItems.Should().ContainSingle(); + coordinator.WorkItems[0].Participant.ParticipantId.Should().Be("node-a"); + + await agent.HandleNyxParticipantReplyFailed(new StreamingProxyNyxParticipantReplyFailed + { + SessionId = "session-a", + AccessToken = "token-a", + Round = 1, + ParticipantId = "node-a", + DisplayName = "Node A", + ErrorMessage = "unavailable", + }); + + var session = agent.State.NyxDiscussionSessions["session-a"]; + session.ActiveParticipants.Select(x => x.ParticipantId).Should().Equal("node-b"); + session.CurrentParticipantIndex.Should().Be(0); + coordinator.WorkItems.Should().HaveCount(2); + coordinator.WorkItems[1].Participant.ParticipantId.Should().Be("node-b"); + agent.State.Participants.Should().NotContain(x => x.AgentId == "node-a"); + } + + [Fact] + public async Task StreamingProxyGAgent_ShouldAdvanceNyxRound_FromActorState() + { + var coordinator = new RecordingNyxCoordinator(); + using var provider = BuildStreamingProxyAgentProvider(coordinator); + var agent = CreateAgent(provider, "room-a"); + + await agent.ActivateAsync(); + await agent.HandleNyxDiscussionRequested(new StreamingProxyNyxDiscussionRequested + { + SessionId = "session-a", + Prompt = "Discuss release risk", + AccessToken = "token-a", + Participants = + { + BuildNyxParticipant("node-a", "Node A"), + BuildNyxParticipant("node-b", "Node B"), + }, + }); + + await agent.HandleNyxParticipantReplySucceeded(new StreamingProxyNyxParticipantReplySucceeded + { + SessionId = "session-a", + AccessToken = "token-a", + Round = 1, + ParticipantId = "node-a", + DisplayName = "Node A", + Content = "First answer.", + }); + await agent.HandleNyxParticipantReplySucceeded(new StreamingProxyNyxParticipantReplySucceeded + { + SessionId = "session-a", + AccessToken = "token-a", + Round = 1, + ParticipantId = "node-b", + DisplayName = "Node B", + Content = "Second answer.", + }); + + var session = agent.State.NyxDiscussionSessions["session-a"]; + session.CurrentRound.Should().Be(2); + session.CurrentParticipantIndex.Should().Be(0); + session.CurrentRoundSuccessfulReplies.Should().Be(0); + session.TotalSuccessfulReplies.Should().Be(2); + session.Transcript.Select(x => x.Speaker).Should().Equal("Node A", "Node B"); + coordinator.WorkItems.Should().HaveCount(3); + coordinator.WorkItems[2].Round.Should().Be(2); + coordinator.WorkItems[2].Participant.ParticipantId.Should().Be("node-a"); + } + [Fact] public async Task StreamingProxySseWriter_ShouldStartStream_AndSerializeRoomFrames() { @@ -1372,6 +1436,28 @@ private static StreamingProxyGAgent CreateAgent(IServiceProvider provider, strin return agent; } + private static ServiceProvider BuildStreamingProxyAgentProvider( + StreamingProxyNyxParticipantCoordinator coordinator) + { + return new ServiceCollection() + .AddSingleton() + .AddSingleton() + .AddTransient(typeof(IEventSourcingBehaviorFactory<>), typeof(DefaultEventSourcingBehaviorFactory<>)) + .AddSingleton(coordinator) + .BuildServiceProvider(); + } + + private static StreamingProxyNyxParticipant BuildNyxParticipant( + string participantId, + string displayName) => + new() + { + ParticipantId = participantId, + RoutePreference = $"/api/v1/proxy/s/openclaw/{participantId}", + DisplayName = displayName, + Model = "claude-sonnet-4-5-20250929", + }; + private static EventEnvelope CreateTopologyEnvelope(IMessage payload) => new() { @@ -1944,11 +2030,9 @@ public Task CreateRoomAsync( } } - private sealed class StubParticipantStore : IStreamingProxyParticipantStore + private sealed class StubParticipantStore : IStreamingProxyParticipantQueryPort { public Dictionary> Participants { get; } = new(StringComparer.Ordinal); - public List<(string roomId, string agentId, string displayName)> AddedParticipants { get; } = []; - public List RemovedRooms { get; } = []; public Task> ListAsync( string roomId, CancellationToken cancellationToken = default) @@ -1957,26 +2041,6 @@ public Task> ListAsync( return Task.FromResult>(list.AsReadOnly()); return Task.FromResult>([]); } - - public Task AddAsync( - string roomId, string agentId, string displayName, - CancellationToken cancellationToken = default) - { - AddedParticipants.Add((roomId, agentId, displayName)); - return Task.CompletedTask; - } - - public Task RemoveParticipantAsync( - string roomId, string agentId, - CancellationToken cancellationToken = default) - => Task.CompletedTask; - - public Task RemoveRoomAsync( - string roomId, CancellationToken cancellationToken = default) - { - RemovedRooms.Add(roomId); - return Task.CompletedTask; - } } private sealed class StubTerminalQueryPort : IStreamingProxyChatSessionTerminalQueryPort @@ -2043,6 +2107,28 @@ private sealed class StubLlmProviderFactory(ILLMProvider provider) : ILLMProvide public IReadOnlyList GetAvailableProviders() => []; } + private sealed class RecordingNyxCoordinator() + : StreamingProxyNyxParticipantCoordinator( + new StubActorDispatchPort(new StubActorRuntime()), + new StubLlmProviderFactory(new StubLlmProvider()), + new ConfigurationBuilder().Build(), + new StubHttpClientFactory(), + NullLogger.Instance) + { + public List WorkItems { get; } = []; + + public override Task RequestParticipantReplyAsync( + string roomId, + StreamingProxyNyxParticipantWorkItem workItem, + CancellationToken ct) + { + _ = roomId; + _ = ct; + WorkItems.Add(workItem); + return Task.CompletedTask; + } + } + private sealed class StubHttpClientFactory : IHttpClientFactory { public HttpClient CreateClient(string name) => new(); diff --git a/test/Aevatar.AI.Tests/StreamingProxyEndpointsCoverageTests.cs b/test/Aevatar.AI.Tests/StreamingProxyEndpointsCoverageTests.cs index f6e332a67..edede901d 100644 --- a/test/Aevatar.AI.Tests/StreamingProxyEndpointsCoverageTests.cs +++ b/test/Aevatar.AI.Tests/StreamingProxyEndpointsCoverageTests.cs @@ -96,9 +96,9 @@ public async Task HandleCreateRoomAsync_ShouldMapCommandFailureToServerError() } [Fact] - public async Task HandleListParticipantsAsync_ShouldReturnStoredParticipants() + public async Task HandleListParticipantsAsync_ShouldReturnQueriedParticipants() { - var participantStore = new RecordingParticipantStore + var participantQueryPort = new RecordingParticipantStore { Participants = [ @@ -111,7 +111,7 @@ public async Task HandleListParticipantsAsync_ShouldReturnStoredParticipants() CreateScopedHttpContext(), "scope-a", "room-1", - participantStore, + participantQueryPort, loggerFactory, CancellationToken.None); @@ -123,9 +123,9 @@ public async Task HandleListParticipantsAsync_ShouldReturnStoredParticipants() } [Fact] - public async Task HandleListParticipantsAsync_ShouldReturnServerError_WhenStoreThrows() + public async Task HandleListParticipantsAsync_ShouldReturnServerError_WhenQueryPortThrows() { - var participantStore = new RecordingParticipantStore + var participantQueryPort = new RecordingParticipantStore { ThrowOnList = new InvalidOperationException("list failed"), }; @@ -135,7 +135,7 @@ public async Task HandleListParticipantsAsync_ShouldReturnServerError_WhenStoreT CreateScopedHttpContext(), "scope-a", "room-1", - participantStore, + participantQueryPort, loggerFactory, CancellationToken.None); @@ -172,14 +172,14 @@ public async Task HandleCreateRoomAsync_ShouldRejectMismatchedAuthenticatedScope [Fact] public async Task HandleListParticipantsAsync_ShouldRejectMismatchedAuthenticatedScope() { - var participantStore = new RecordingParticipantStore(); + var participantQueryPort = new RecordingParticipantStore(); var loggerFactory = LoggerFactory.Create(_ => { }); var result = await InvokeHandleListParticipantsAsync( CreateScopedHttpContext("scope-b"), "scope-a", "room-1", - participantStore, + participantQueryPort, loggerFactory, CancellationToken.None); @@ -206,13 +206,13 @@ private static async Task InvokeHandleListParticipantsAsync( HttpContext context, string scopeId, string roomId, - IStreamingProxyParticipantStore participantStore, + IStreamingProxyParticipantQueryPort participantQueryPort, ILoggerFactory loggerFactory, CancellationToken ct) { return await (Task)HandleListParticipantsAsyncMethod.Invoke( null, - [context, scopeId, roomId, new RecordingGAgentActorStore([]), participantStore, loggerFactory, ct])!; + [context, scopeId, roomId, new RecordingGAgentActorStore([]), participantQueryPort, loggerFactory, ct])!; } private static async Task<(int StatusCode, string Body)> ExecuteResultAsync(IResult result) @@ -319,7 +319,7 @@ public Task CreateRoomAsync( } } - private sealed class RecordingParticipantStore : IStreamingProxyParticipantStore + private sealed class RecordingParticipantStore : IStreamingProxyParticipantQueryPort { public Exception? ThrowOnList { get; init; } public IReadOnlyList Participants { get; init; } = []; @@ -335,33 +335,6 @@ public Task> ListAsync( return Task.FromResult(Participants); } - public Task AddAsync( - string roomId, - string agentId, - string displayName, - CancellationToken cancellationToken = default) - { - _ = roomId; - _ = agentId; - _ = displayName; - return Task.CompletedTask; - } - - public Task RemoveParticipantAsync( - string roomId, - string agentId, - CancellationToken cancellationToken = default) - { - _ = roomId; - _ = agentId; - return Task.CompletedTask; - } - - public Task RemoveRoomAsync(string roomId, CancellationToken cancellationToken = default) - { - _ = roomId; - return Task.CompletedTask; - } } private sealed class RecordingActorRuntime(List operations, IActor actor) : IActorRuntime diff --git a/test/Aevatar.AI.Tests/StreamingProxyNyxParticipantCoordinatorTests.cs b/test/Aevatar.AI.Tests/StreamingProxyNyxParticipantCoordinatorTests.cs index 4fe52c88a..755943362 100644 --- a/test/Aevatar.AI.Tests/StreamingProxyNyxParticipantCoordinatorTests.cs +++ b/test/Aevatar.AI.Tests/StreamingProxyNyxParticipantCoordinatorTests.cs @@ -43,11 +43,11 @@ public async Task EnsureParticipantsJoinedAsync_ShouldPreserveDistinctNodesWithS joinedEvents.Should().HaveCount(3); joinedEvents.Select(evt => evt.AgentId).Should().OnlyHaveUniqueItems(); - store.ListParticipants("room-1").Should().HaveCount(3); + store.ListParticipants("room-1").Should().BeEmpty(); } [Fact] - public async Task GenerateRepliesAsync_ShouldSkipUnavailableOpenerAndContinueWithHealthyParticipant() + public async Task RequestDiscussionAsync_ShouldDispatchDiscussionRequestWithoutLocalProgression() { var (coordinator, actor, store, llmProvider) = CreateCoordinator(); var participants = await coordinator.EnsureParticipantsJoinedAsync( @@ -61,42 +61,32 @@ public async Task GenerateRepliesAsync_ShouldSkipUnavailableOpenerAndContinueWit var roomParticipants = participants.Take(2).ToList(); - await coordinator.GenerateRepliesAsync( + await coordinator.RequestDiscussionAsync( roomParticipants, actor, "Discuss the roadmap for the next release.", "session-1", "test-token", - CancellationToken.None, - store, - "room-1"); + CancellationToken.None); - llmProvider.Requests.Should().HaveCount(2); - llmProvider.Requests[0].RequestId.Should().Contain("node-a"); - llmProvider.Requests[1].RequestId.Should().Contain("node-b"); + llmProvider.Requests.Should().BeEmpty(); - var messageEvents = actor.Events - .Where(envelope => envelope.Payload!.Is(GroupChatMessageEvent.Descriptor)) - .Select(envelope => envelope.Payload!.Unpack()) + var requests = actor.Events + .Where(envelope => envelope.Payload!.Is(StreamingProxyNyxDiscussionRequested.Descriptor)) + .Select(envelope => envelope.Payload!.Unpack()) .ToList(); - var leftEvents = actor.Events - .Where(envelope => envelope.Payload!.Is(GroupChatParticipantLeftEvent.Descriptor)) - .Select(envelope => envelope.Payload!.Unpack()) - .ToList(); - - messageEvents.Should().HaveCount(1); - messageEvents.Should().NotContain(evt => evt.Content.StartsWith("当前暂时不可用", StringComparison.Ordinal)); - messageEvents.Single().Content.Should().Contain("reply from"); - messageEvents.Single().Content.Should().Contain("node-b"); - messageEvents.Select(evt => evt.AgentId).Should().OnlyHaveUniqueItems(); - leftEvents.Should().HaveCount(1); - leftEvents.Single().AgentId.Should().Contain("node-a"); - store.ListParticipants("room-1").Should().HaveCount(2); + requests.Should().ContainSingle(); + requests.Single().SessionId.Should().Be("session-1"); + requests.Single().Prompt.Should().Be("Discuss the roadmap for the next release."); + requests.Single().Participants.Should().HaveCount(2); + actor.Events.Should().NotContain(envelope => envelope.Payload!.Is(GroupChatMessageEvent.Descriptor)); + actor.Events.Should().NotContain(envelope => envelope.Payload!.Is(GroupChatParticipantLeftEvent.Descriptor)); + store.ListParticipants("room-1").Should().BeEmpty(); } [Fact] - public async Task GenerateRepliesAsync_ShouldIgnoreUnavailableTextResponseAndContinueWithHealthyParticipant() + public async Task RequestParticipantReplyAsync_ShouldReturnFailureContinuation_WhenResponseIsUnavailable() { var (coordinator, actor, store, llmProvider) = CreateCoordinator(request => { @@ -114,52 +104,35 @@ public async Task GenerateRepliesAsync_ShouldIgnoreUnavailableTextResponseAndCon }; }); - var participants = await coordinator.EnsureParticipantsJoinedAsync( - "scope-1", - "room-1", - actor, - store, - "test-token", - CancellationToken.None, - preferredRoute: "/api/v1/proxy/s/openclaw/node-a"); - - var roomParticipants = participants.Take(2).ToList(); + var roomParticipants = BuildParticipants(); - await coordinator.GenerateRepliesAsync( - roomParticipants, - actor, - "Discuss the roadmap for the next release.", - "session-1", - "test-token", - CancellationToken.None, - store, - "room-1"); + await coordinator.RequestParticipantReplyAsync( + "room-1", + new StreamingProxyNyxParticipantWorkItem( + "session-1", + "test-token", + 1, + StreamingProxyDefaults.MaxDiscussionRounds, + roomParticipants[0], + roomParticipants, + []), + CancellationToken.None); - llmProvider.Requests.Should().HaveCount(2); + llmProvider.Requests.Should().HaveCount(1); llmProvider.Requests[0].RequestId.Should().Contain("node-a"); - llmProvider.Requests[1].RequestId.Should().Contain("node-b"); - var messageEvents = actor.Events - .Where(envelope => envelope.Payload!.Is(GroupChatMessageEvent.Descriptor)) - .Select(envelope => envelope.Payload!.Unpack()) + var failures = actor.Events + .Where(envelope => envelope.Payload!.Is(StreamingProxyNyxParticipantReplyFailed.Descriptor)) + .Select(envelope => envelope.Payload!.Unpack()) .ToList(); - var leftEvents = actor.Events - .Where(envelope => envelope.Payload!.Is(GroupChatParticipantLeftEvent.Descriptor)) - .Select(envelope => envelope.Payload!.Unpack()) - .ToList(); - - messageEvents.Should().HaveCount(1); - messageEvents.Single().Content.Should().Contain("reply from"); - messageEvents.Single().Content.Should().Contain("node-b"); - messageEvents.Should().NotContain(evt => evt.Content.Contains("503", StringComparison.OrdinalIgnoreCase)); - leftEvents.Should().HaveCount(1); - leftEvents.Single().AgentId.Should().Contain("node-a"); - store.ListParticipants("room-1").Should().HaveCount(2); + failures.Should().ContainSingle(); + failures.Single().ParticipantId.Should().Contain("node-a"); + actor.Events.Should().NotContain(envelope => envelope.Payload!.Is(GroupChatParticipantLeftEvent.Descriptor)); } [Fact] - public async Task GenerateRepliesAsync_ShouldUseStreamContentWhenSynchronousContentIsMissing() + public async Task RequestParticipantReplyAsync_ShouldReturnSuccessContinuation_WhenStreamContentArrives() { var (coordinator, actor, store, llmProvider) = CreateCoordinator( responseFactory: _ => new LLMResponse(), @@ -169,43 +142,31 @@ public async Task GenerateRepliesAsync_ShouldUseStreamContentWhenSynchronousCont new LLMStreamChunk { FinishReason = "stop", IsLast = true }, ]); - var participants = await coordinator.EnsureParticipantsJoinedAsync( - "scope-1", - "room-1", - actor, - store, - "test-token", - CancellationToken.None, - preferredRoute: "/api/v1/proxy/s/openclaw/node-b"); - - var roomParticipants = participants.Take(1).ToList(); + var roomParticipants = BuildParticipants().Skip(1).Take(1).ToList(); - await coordinator.GenerateRepliesAsync( - roomParticipants, - actor, - "Discuss the roadmap for the next release.", - "session-1", - "test-token", - CancellationToken.None, - store, - "room-1"); + await coordinator.RequestParticipantReplyAsync( + "room-1", + new StreamingProxyNyxParticipantWorkItem( + "session-1", + "test-token", + 1, + 1, + roomParticipants[0], + roomParticipants, + []), + CancellationToken.None); llmProvider.Requests.Should().HaveCount(1); - var messageEvents = actor.Events - .Where(envelope => envelope.Payload!.Is(GroupChatMessageEvent.Descriptor)) - .Select(envelope => envelope.Payload!.Unpack()) - .ToList(); - - var leftEvents = actor.Events - .Where(envelope => envelope.Payload!.Is(GroupChatParticipantLeftEvent.Descriptor)) - .Select(envelope => envelope.Payload!.Unpack()) + var replies = actor.Events + .Where(envelope => envelope.Payload!.Is(StreamingProxyNyxParticipantReplySucceeded.Descriptor)) + .Select(envelope => envelope.Payload!.Unpack()) .ToList(); - messageEvents.Should().HaveCount(1); - messageEvents.Single().Content.Should().Contain("streamed reply from"); - messageEvents.Single().SessionId.Should().Be("session-1"); - leftEvents.Should().BeEmpty(); + replies.Should().ContainSingle(); + replies.Single().Content.Should().Contain("streamed reply from"); + replies.Single().SessionId.Should().Be("session-1"); + actor.Events.Should().NotContain(envelope => envelope.Payload!.Is(GroupChatMessageEvent.Descriptor)); } [Fact] @@ -226,13 +187,26 @@ public async Task EnsureParticipantsJoinedAsync_ShouldFallbackToLegacyStatus_Whe participants.Should().ContainSingle(); participants.Single().RoutePreference.Should().Be("/api/v1/proxy/s/openclaw/legacy"); participants.Single().Model.Should().Be("legacy-model"); - store.ListParticipants("room-1").Should().ContainSingle(entry => - entry.AgentId.Contains("svc-legacy", StringComparison.OrdinalIgnoreCase)); + store.ListParticipants("room-1").Should().BeEmpty(); } private static (StreamingProxyNyxParticipantCoordinator Coordinator, RecordingActor Actor, RecordingParticipantStore Store, RecordingLlmProvider Provider) CreateCoordinator() => CreateCoordinator(null); + private static List BuildParticipants() => + [ + new( + "svc-node-a|node-a|/api/v1/proxy/s/openclaw/node-a", + "/api/v1/proxy/s/openclaw/node-a", + "OpenClaw-Node (1)", + "claude-sonnet-4-5-20250929"), + new( + "svc-node-b|node-b|/api/v1/proxy/s/openclaw/node-b", + "/api/v1/proxy/s/openclaw/node-b", + "OpenClaw-Node (2)", + "claude-sonnet-4-5-20250929"), + ]; + private static (StreamingProxyNyxParticipantCoordinator Coordinator, RecordingActor Actor, RecordingParticipantStore Store, RecordingLlmProvider Provider) CreateCoordinator( Func? responseFactory) => CreateCoordinator(responseFactory, null); @@ -486,7 +460,7 @@ public Task DispatchAsync(string actorId, EventEnvelope envelope, CancellationTo } } - private sealed class RecordingParticipantStore : IStreamingProxyParticipantStore + private sealed class RecordingParticipantStore : IStreamingProxyParticipantQueryPort { private readonly Dictionary> _rooms = new(StringComparer.OrdinalIgnoreCase); @@ -500,44 +474,6 @@ public Task> ListAsync( return Task.FromResult(participants); } - public Task AddAsync( - string roomId, - string agentId, - string displayName, - CancellationToken cancellationToken = default) - { - if (!_rooms.TryGetValue(roomId, out var participants)) - { - participants = []; - _rooms[roomId] = participants; - } - - participants.RemoveAll(entry => string.Equals(entry.AgentId, agentId, StringComparison.OrdinalIgnoreCase)); - participants.Add(new ParticipantStoreEntry(agentId, displayName, DateTimeOffset.UtcNow)); - return Task.CompletedTask; - } - - public Task RemoveParticipantAsync( - string roomId, - string agentId, - CancellationToken cancellationToken = default) - { - if (_rooms.TryGetValue(roomId, out var participants)) - { - participants.RemoveAll(entry => string.Equals(entry.AgentId, agentId, StringComparison.OrdinalIgnoreCase)); - if (participants.Count == 0) - _rooms.Remove(roomId); - } - - return Task.CompletedTask; - } - - public Task RemoveRoomAsync(string roomId, CancellationToken cancellationToken = default) - { - _rooms.Remove(roomId); - return Task.CompletedTask; - } - public IReadOnlyList ListParticipants(string roomId) => _rooms.TryGetValue(roomId, out var participants) ? participants diff --git a/test/Aevatar.Studio.Tests/StreamingProxyRoomCurrentStateProjectorTests.cs b/test/Aevatar.Studio.Tests/StreamingProxyRoomCurrentStateProjectorTests.cs new file mode 100644 index 000000000..4f82c39b4 --- /dev/null +++ b/test/Aevatar.Studio.Tests/StreamingProxyRoomCurrentStateProjectorTests.cs @@ -0,0 +1,109 @@ +using Aevatar.CQRS.Projection.Core.Abstractions; +using Aevatar.CQRS.Projection.Runtime.Abstractions; +using Aevatar.CQRS.Projection.Stores.Abstractions; +using Aevatar.Foundation.Abstractions; +using Aevatar.GAgents.StreamingProxy; +using FluentAssertions; +using Google.Protobuf; +using Google.Protobuf.WellKnownTypes; + +namespace Aevatar.Studio.Tests; + +public sealed class StreamingProxyRoomCurrentStateProjectorTests +{ + [Fact] + public async Task ProjectAsync_ShouldUpsertRoomCurrentState_FromCommittedRoomState() + { + var dispatcher = new RecordingWriteDispatcher(); + var projector = new StreamingProxyRoomCurrentStateProjector( + dispatcher, + new FixedProjectionClock(DateTimeOffset.Parse("2026-05-25T00:00:00Z"))); + var joinedAt = Timestamp.FromDateTimeOffset(DateTimeOffset.Parse("2026-05-25T10:00:00Z")); + + await projector.ProjectAsync( + new StreamingProxyCurrentStateProjectionContext + { + RootActorId = "room-a", + ProjectionKind = "streaming-proxy-current-state", + }, + WrapCommitted( + new GroupChatParticipantJoinedEvent + { + AgentId = "agent-1", + DisplayName = "Alice", + }, + new StreamingProxyGAgentState + { + RoomName = "Room A", + Participants = + { + new StreamingProxyParticipant + { + AgentId = "agent-1", + DisplayName = "Alice", + JoinedAt = joinedAt, + }, + }, + }, + version: 7, + eventId: "evt-7")); + + dispatcher.Upserts.Should().ContainSingle(); + var document = dispatcher.Upserts[0]; + document.Id.Should().Be("room-a"); + document.ActorId.Should().Be("room-a"); + document.StateVersion.Should().Be(7); + document.LastEventId.Should().Be("evt-7"); + document.StateRoot.Is(StreamingProxyGAgentState.Descriptor).Should().BeTrue(); + document.StateRoot.Unpack().Participants + .Should().ContainSingle(p => p.AgentId == "agent-1" && p.DisplayName == "Alice"); + } + + private static EventEnvelope WrapCommitted( + IMessage payload, + IMessage state, + long version, + string eventId) + { + return new EventEnvelope + { + Id = "env-1", + Timestamp = Timestamp.FromDateTime(DateTime.UtcNow), + Payload = Any.Pack(new CommittedStateEventPublished + { + StateEvent = new StateEvent + { + Version = version, + EventId = eventId, + EventData = Any.Pack(payload), + Timestamp = Timestamp.FromDateTime(DateTime.UtcNow), + }, + StateRoot = Any.Pack(state), + }), + }; + } + + private sealed class RecordingWriteDispatcher : IProjectionWriteDispatcher + where TReadModel : class, IProjectionReadModel + { + public List Upserts { get; } = []; + + public Task UpsertAsync(TReadModel readModel, CancellationToken ct = default) + { + ct.ThrowIfCancellationRequested(); + Upserts.Add(readModel); + return Task.FromResult(ProjectionWriteResult.Applied()); + } + + public Task DeleteAsync(string id, CancellationToken ct = default) + { + ct.ThrowIfCancellationRequested(); + return Task.FromResult(ProjectionWriteResult.Applied()); + } + } + + private sealed class FixedProjectionClock(DateTimeOffset utcNow) : IProjectionClock + { + public DateTimeOffset UtcNow { get; } = utcNow; + } +}