diff --git a/aevatar.slnx b/aevatar.slnx
index 502fd5de1..25b6e7488 100644
--- a/aevatar.slnx
+++ b/aevatar.slnx
@@ -5,7 +5,6 @@
-
diff --git a/agents/Aevatar.GAgents.StreamingProxy/ServiceCollectionExtensions.cs b/agents/Aevatar.GAgents.StreamingProxy/ServiceCollectionExtensions.cs
index f5f135256..2eff36b97 100644
--- a/agents/Aevatar.GAgents.StreamingProxy/ServiceCollectionExtensions.cs
+++ b/agents/Aevatar.GAgents.StreamingProxy/ServiceCollectionExtensions.cs
@@ -66,9 +66,15 @@ public static IServiceCollection AddStreamingProxy(
},
static context => new StreamingProxyCurrentStateRuntimeLease(context));
services.TryAddSingleton();
+ services.AddCurrentStateProjectionMaterializer<
+ StreamingProxyCurrentStateProjectionContext,
+ StreamingProxyRoomCurrentStateProjector>();
services.AddCurrentStateProjectionMaterializer<
StreamingProxyCurrentStateProjectionContext,
StreamingProxyChatSessionTerminalProjector>();
+ services.TryAddSingleton<
+ IProjectionDocumentMetadataProvider,
+ StreamingProxyRoomCurrentStateDocumentMetadataProvider>();
services.TryAddSingleton<
IProjectionDocumentMetadataProvider,
StreamingProxyChatSessionTerminalSnapshotMetadataProvider>();
@@ -112,7 +118,10 @@ private static void AddTerminalSnapshotReadModelProvider(
IConfiguration? configuration)
{
if (services.Any(x => x.ServiceType == typeof(IProjectionDocumentReader)))
+ {
+ EnsureStreamingProxyRoomCurrentStateReadModelProvider(services, configuration);
return;
+ }
var elasticsearchEnabled = ResolveElasticsearchDocumentEnabled(configuration);
var inMemoryEnabled = ResolveOptionalBool(
@@ -127,6 +136,7 @@ private static void AddTerminalSnapshotReadModelProvider(
if (elasticsearchEnabled)
{
+ AddElasticsearchStreamingProxyRoomCurrentStateReadModelProvider(services, configuration);
services.AddElasticsearchDocumentProjectionStore(
optionsFactory: _ => BuildElasticsearchDocumentOptions(configuration!),
metadataFactory: sp => sp
@@ -137,12 +147,50 @@ private static void AddTerminalSnapshotReadModelProvider(
return;
}
+ AddInMemoryStreamingProxyRoomCurrentStateReadModelProvider(services);
services.AddInMemoryDocumentProjectionStore(
keySelector: readModel => readModel.Id,
keyFormatter: key => key,
defaultSortSelector: readModel => readModel.UpdatedAt.ToDateTimeOffset());
}
+ private static void EnsureStreamingProxyRoomCurrentStateReadModelProvider(
+ IServiceCollection services,
+ IConfiguration? configuration)
+ {
+ if (services.Any(x => x.ServiceType == typeof(IProjectionDocumentReader)))
+ return;
+
+ if (ResolveElasticsearchDocumentEnabled(configuration))
+ {
+ AddElasticsearchStreamingProxyRoomCurrentStateReadModelProvider(services, configuration);
+ return;
+ }
+
+ AddInMemoryStreamingProxyRoomCurrentStateReadModelProvider(services);
+ }
+
+ private static void AddElasticsearchStreamingProxyRoomCurrentStateReadModelProvider(
+ IServiceCollection services,
+ IConfiguration? configuration)
+ {
+ services.AddElasticsearchDocumentProjectionStore(
+ optionsFactory: _ => BuildElasticsearchDocumentOptions(configuration!),
+ metadataFactory: sp => sp
+ .GetRequiredService>()
+ .Metadata,
+ keySelector: readModel => readModel.ActorId,
+ keyFormatter: key => key);
+ }
+
+ private static void AddInMemoryStreamingProxyRoomCurrentStateReadModelProvider(IServiceCollection services)
+ {
+ services.AddInMemoryDocumentProjectionStore(
+ keySelector: readModel => readModel.ActorId,
+ keyFormatter: key => key,
+ defaultSortSelector: readModel => readModel.UpdatedAt.ToDateTimeOffset());
+ }
+
private static bool ResolveElasticsearchDocumentEnabled(IConfiguration? configuration)
{
if (configuration == null)
diff --git a/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyEndpoints.cs b/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyEndpoints.cs
index 1fa89ba5c..8eb0551f3 100644
--- a/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyEndpoints.cs
+++ b/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyEndpoints.cs
@@ -124,7 +124,6 @@ private static async Task HandleDeleteRoomAsync(
string roomId,
[FromServices] IGAgentActorRegistryCommandPort registryCommandPort,
[FromServices] IScopeResourceAdmissionPort admissionPort,
- [FromServices] IStreamingProxyParticipantStore participantStore,
[FromServices] ILoggerFactory loggerFactory,
CancellationToken ct)
{
@@ -155,15 +154,6 @@ await registryCommandPort.UnregisterActorAsync(
new { error = "Failed to delete room" },
statusCode: StatusCodes.Status503ServiceUnavailable);
}
- try
- {
- await participantStore.RemoveRoomAsync(roomId, ct);
- }
- catch (OperationCanceledException) { throw; }
- catch (Exception ex)
- {
- logger.LogWarning(ex, "Failed to remove participants for room {RoomId}", roomId);
- }
return Results.Ok();
}
@@ -179,7 +169,7 @@ private static async Task HandleChatAsync(
[FromServices] IScopeResourceAdmissionPort admissionPort,
[FromServices] ICommandInteractionService interactionService,
[FromServices] StreamingProxyChatDurableCompletionResolver durableCompletionResolver,
- [FromServices] IStreamingProxyParticipantStore participantStore,
+ [FromServices] IStreamingProxyParticipantQueryPort participantQueryPort,
[FromServices] StreamingProxyNyxParticipantCoordinator participantCoordinator,
[FromServices] ILoggerFactory loggerFactory,
CancellationToken ct)
@@ -240,7 +230,7 @@ private static async Task HandleChatAsync(
scopeId,
roomId,
actor,
- participantStore,
+ participantQueryPort,
accessToken,
token,
preferredRoute,
@@ -249,21 +239,12 @@ private static async Task HandleChatAsync(
if (participants.Count == 0 || string.IsNullOrWhiteSpace(accessToken))
return;
- var terminalState = DetermineParticipantTerminalState(await participantCoordinator.GenerateRepliesAsync(
+ await participantCoordinator.RequestDiscussionAsync(
participants,
actor,
prompt,
sessionId,
accessToken,
- token,
- participantStore,
- roomId));
- await PublishTerminalStateAsync(
- actorDispatchPort,
- actor.Id,
- sessionId,
- terminalState.Status,
- terminalState.ErrorMessage,
token);
},
ct);
@@ -456,7 +437,7 @@ private static async Task HandleListParticipantsAsync(
string scopeId,
string roomId,
[FromServices] IScopeResourceAdmissionPort admissionPort,
- [FromServices] IStreamingProxyParticipantStore participantStore,
+ [FromServices] IStreamingProxyParticipantQueryPort participantQueryPort,
[FromServices] ILoggerFactory loggerFactory,
CancellationToken ct)
{
@@ -475,7 +456,7 @@ private static async Task HandleListParticipantsAsync(
var logger = loggerFactory.CreateLogger("Aevatar.GAgents.StreamingProxy.Endpoints");
try
{
- var participants = await participantStore.ListAsync(roomId, ct);
+ var participants = await participantQueryPort.ListAsync(roomId, ct);
return Results.Ok(participants);
}
catch (OperationCanceledException) { throw; }
@@ -496,7 +477,6 @@ private static async Task HandleJoinAsync(
[FromServices] IActorRuntime actorRuntime,
[FromServices] IActorDispatchPort actorDispatchPort,
[FromServices] IScopeResourceAdmissionPort admissionPort,
- [FromServices] IStreamingProxyParticipantStore participantStore,
[FromServices] ILoggerFactory loggerFactory,
CancellationToken ct)
{
@@ -537,17 +517,6 @@ private static async Task HandleJoinAsync(
};
await DispatchRoomEnvelopeAsync(actorDispatchPort, actor.Id, envelope, ct);
- var logger = loggerFactory.CreateLogger("Aevatar.GAgents.StreamingProxy.Endpoints");
- try
- {
- await participantStore.AddAsync(roomId, agentId, displayName, ct);
- }
- catch (OperationCanceledException) { throw; }
- catch (Exception ex)
- {
- logger.LogWarning(ex, "Failed to persist participant {AgentId} in room {RoomId}", agentId, roomId);
- }
-
return Results.Ok(new { status = "joined", agentId });
}
@@ -698,12 +667,6 @@ private static Task DispatchRoomEnvelopeAsync(
return actorDispatchPort.DispatchAsync(actorId, envelope, ct);
}
- private static (StreamingProxyChatSessionTerminalStatus Status, string? ErrorMessage) DetermineParticipantTerminalState(
- int successfulReplies) =>
- successfulReplies > 0
- ? (StreamingProxyChatSessionTerminalStatus.Completed, null)
- : (StreamingProxyChatSessionTerminalStatus.Failed, "StreamingProxy chat completed without any participant replies.");
-
private static async Task TryPublishCanceledTerminalStateAsync(
IActorDispatchPort actorDispatchPort,
IActor? actor,
diff --git a/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyGAgent.cs b/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyGAgent.cs
index 9675b7211..e93981f4d 100644
--- a/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyGAgent.cs
+++ b/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyGAgent.cs
@@ -5,6 +5,7 @@
using Aevatar.Foundation.Core.EventSourcing;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
+using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace Aevatar.GAgents.StreamingProxy;
@@ -97,6 +98,99 @@ public async Task HandleChatSessionTerminalStateChanged(StreamingProxyChatSessio
evt.Status);
}
+ [EventHandler]
+ public async Task HandleNyxDiscussionRequested(StreamingProxyNyxDiscussionRequested evt)
+ {
+ // Fix (review round 1, F1):
+ // Reviewer found Nyx transcript, active set, rounds, pruning, and stop decisions in coordinator locals.
+ // This actor now persists the discussion session and issues one participant work item at a time.
+ if (string.IsNullOrWhiteSpace(evt.SessionId))
+ return;
+
+ if (evt.Participants.Count == 0)
+ {
+ await PersistDomainEventAsync(BuildTerminalEvent(
+ evt.SessionId,
+ StreamingProxyChatSessionTerminalStatus.Failed,
+ "StreamingProxy chat completed without any participant replies."));
+ return;
+ }
+
+ await PersistDomainEventAsync(new StreamingProxyNyxDiscussionStarted
+ {
+ SessionId = evt.SessionId,
+ Prompt = evt.Prompt,
+ TotalRounds = evt.Participants.Count > 1 ? StreamingProxyDefaults.MaxDiscussionRounds : 1,
+ Participants = { evt.Participants.Select(participant => participant.Clone()) },
+ });
+
+ await ContinueNyxDiscussionAsync(evt.SessionId, evt.AccessToken);
+ }
+
+ [EventHandler]
+ public async Task HandleNyxParticipantReplySucceeded(StreamingProxyNyxParticipantReplySucceeded evt)
+ {
+ if (!IsExpectedNyxParticipantTurn(evt.SessionId, evt.Round, evt.ParticipantId))
+ return;
+
+ var content = evt.Content?.Trim() ?? string.Empty;
+ if (string.IsNullOrWhiteSpace(content))
+ {
+ await HandleNyxParticipantReplyFailed(new StreamingProxyNyxParticipantReplyFailed
+ {
+ SessionId = evt.SessionId,
+ AccessToken = evt.AccessToken,
+ Round = evt.Round,
+ ParticipantId = evt.ParticipantId,
+ DisplayName = evt.DisplayName,
+ ErrorMessage = "Participant returned an empty response.",
+ });
+ return;
+ }
+
+ await PersistDomainEventAsync(new StreamingProxyNyxParticipantReplyRecorded
+ {
+ SessionId = evt.SessionId,
+ Round = evt.Round,
+ ParticipantId = evt.ParticipantId,
+ DisplayName = evt.DisplayName,
+ Content = content,
+ });
+
+ await HandleGroupChatMessage(new GroupChatMessageEvent
+ {
+ AgentId = evt.ParticipantId,
+ AgentName = evt.DisplayName,
+ Content = content,
+ SessionId = evt.SessionId,
+ });
+
+ await ContinueNyxDiscussionAsync(evt.SessionId, evt.AccessToken);
+ }
+
+ [EventHandler]
+ public async Task HandleNyxParticipantReplyFailed(StreamingProxyNyxParticipantReplyFailed evt)
+ {
+ if (!IsExpectedNyxParticipantTurn(evt.SessionId, evt.Round, evt.ParticipantId))
+ return;
+
+ await PersistDomainEventAsync(new StreamingProxyNyxParticipantFailureRecorded
+ {
+ SessionId = evt.SessionId,
+ Round = evt.Round,
+ ParticipantId = evt.ParticipantId,
+ DisplayName = evt.DisplayName,
+ ErrorMessage = evt.ErrorMessage ?? string.Empty,
+ });
+
+ await HandleGroupChatParticipantLeft(new GroupChatParticipantLeftEvent
+ {
+ AgentId = evt.ParticipantId,
+ });
+
+ await ContinueNyxDiscussionAsync(evt.SessionId, evt.AccessToken);
+ }
+
///
/// Applies domain events to the sole authoritative actor state.
/// Called by the event sourcing infrastructure after PersistDomainEventAsync.
@@ -110,6 +204,10 @@ protected override StreamingProxyGAgentState TransitionState(StreamingProxyGAgen
.On(ApplyParticipantJoined)
.On(ApplyParticipantLeft)
.On(ApplyTerminalStateChanged)
+ .On(ApplyNyxDiscussionStarted)
+ .On(ApplyNyxParticipantReplyRecorded)
+ .On(ApplyNyxParticipantFailureRecorded)
+ .On(ApplyNyxDiscussionRoundAdvanced)
.OrCurrent();
private static StreamingProxyGAgentState ApplyRoomInitialized(
@@ -200,6 +298,108 @@ private static void TrimMessages(StreamingProxyGAgentState state)
}
}
+ private async Task ContinueNyxDiscussionAsync(string sessionId, string accessToken)
+ {
+ if (!State.NyxDiscussionSessions.TryGetValue(sessionId, out var session) ||
+ session.Status != StreamingProxyNyxDiscussionStatus.NyxDiscussionStatusActive)
+ {
+ return;
+ }
+
+ if (session.ActiveParticipants.Count == 0)
+ {
+ await CompleteNyxDiscussionAsync(session, StreamingProxyChatSessionTerminalStatus.Failed);
+ return;
+ }
+
+ if (session.CurrentParticipantIndex >= session.ActiveParticipants.Count)
+ {
+ if (session.CurrentRoundSuccessfulReplies == 0 ||
+ session.ActiveParticipants.Count < 2 ||
+ session.CurrentRound >= session.TotalRounds)
+ {
+ await CompleteNyxDiscussionAsync(
+ session,
+ session.TotalSuccessfulReplies > 0
+ ? StreamingProxyChatSessionTerminalStatus.Completed
+ : StreamingProxyChatSessionTerminalStatus.Failed);
+ return;
+ }
+
+ await PersistDomainEventAsync(new StreamingProxyNyxDiscussionRoundAdvanced
+ {
+ SessionId = session.SessionId,
+ Round = session.CurrentRound + 1,
+ });
+ await ContinueNyxDiscussionAsync(sessionId, accessToken);
+ return;
+ }
+
+ var participant = session.ActiveParticipants[session.CurrentParticipantIndex];
+ var coordinator = Services.GetRequiredService();
+ await coordinator.RequestParticipantReplyAsync(
+ Id,
+ new StreamingProxyNyxParticipantWorkItem(
+ session.SessionId,
+ accessToken,
+ session.CurrentRound,
+ session.TotalRounds,
+ ToParticipantDefinition(participant),
+ session.ActiveParticipants.Select(ToParticipantDefinition).ToList(),
+ session.Transcript.Select(entry => (entry.Speaker, entry.Content)).ToList())
+ {
+ Prompt = session.Prompt,
+ },
+ CancellationToken.None);
+ }
+
+ private async Task CompleteNyxDiscussionAsync(
+ StreamingProxyNyxDiscussionSession session,
+ StreamingProxyChatSessionTerminalStatus status)
+ {
+ var errorMessage = status == StreamingProxyChatSessionTerminalStatus.Failed
+ ? "StreamingProxy chat completed without any participant replies."
+ : string.Empty;
+ await PersistDomainEventAsync(BuildTerminalEvent(session.SessionId, status, errorMessage));
+ }
+
+ private bool IsExpectedNyxParticipantTurn(string sessionId, int round, string participantId)
+ {
+ if (!State.NyxDiscussionSessions.TryGetValue(sessionId, out var session) ||
+ session.Status != StreamingProxyNyxDiscussionStatus.NyxDiscussionStatusActive ||
+ session.CurrentRound != round ||
+ session.CurrentParticipantIndex < 0 ||
+ session.CurrentParticipantIndex >= session.ActiveParticipants.Count)
+ {
+ return false;
+ }
+
+ return string.Equals(
+ session.ActiveParticipants[session.CurrentParticipantIndex].ParticipantId,
+ participantId,
+ StringComparison.OrdinalIgnoreCase);
+ }
+
+ private static StreamingProxyChatSessionTerminalStateChanged BuildTerminalEvent(
+ string sessionId,
+ StreamingProxyChatSessionTerminalStatus status,
+ string? errorMessage) =>
+ new()
+ {
+ SessionId = sessionId,
+ Status = status,
+ TerminalAt = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
+ ErrorMessage = errorMessage ?? string.Empty,
+ };
+
+ private static StreamingProxyNyxParticipantDefinition ToParticipantDefinition(
+ StreamingProxyNyxParticipant participant) =>
+ new(
+ participant.ParticipantId,
+ participant.RoutePreference,
+ participant.DisplayName,
+ participant.Model);
+
private static StreamingProxyGAgentState ApplyTerminalStateChanged(
StreamingProxyGAgentState current,
StreamingProxyChatSessionTerminalStateChanged evt)
@@ -215,6 +415,113 @@ private static StreamingProxyGAgentState ApplyTerminalStateChanged(
TerminalAt = evt.TerminalAt,
ErrorMessage = evt.ErrorMessage ?? string.Empty,
};
+ if (next.NyxDiscussionSessions.TryGetValue(evt.SessionId, out var discussion))
+ {
+ var nextDiscussion = discussion.Clone();
+ nextDiscussion.Status = evt.Status == StreamingProxyChatSessionTerminalStatus.Failed
+ ? StreamingProxyNyxDiscussionStatus.NyxDiscussionStatusFailed
+ : StreamingProxyNyxDiscussionStatus.NyxDiscussionStatusCompleted;
+ nextDiscussion.TerminalErrorMessage = evt.ErrorMessage ?? string.Empty;
+ next.NyxDiscussionSessions[evt.SessionId] = nextDiscussion;
+ }
+
+ return next;
+ }
+
+ private static StreamingProxyGAgentState ApplyNyxDiscussionStarted(
+ StreamingProxyGAgentState current,
+ StreamingProxyNyxDiscussionStarted evt)
+ {
+ var next = current.Clone();
+ next.NyxDiscussionSessions[evt.SessionId] = new StreamingProxyNyxDiscussionSession
+ {
+ SessionId = evt.SessionId,
+ Prompt = evt.Prompt,
+ CurrentRound = 1,
+ TotalRounds = Math.Max(evt.TotalRounds, 1),
+ CurrentParticipantIndex = 0,
+ CurrentRoundSuccessfulReplies = 0,
+ TotalSuccessfulReplies = 0,
+ Status = StreamingProxyNyxDiscussionStatus.NyxDiscussionStatusActive,
+ ActiveParticipants = { evt.Participants.Select(participant => participant.Clone()) },
+ };
+ return next;
+ }
+
+ private static StreamingProxyGAgentState ApplyNyxParticipantReplyRecorded(
+ StreamingProxyGAgentState current,
+ StreamingProxyNyxParticipantReplyRecorded evt)
+ {
+ var next = current.Clone();
+ if (!next.NyxDiscussionSessions.TryGetValue(evt.SessionId, out var session))
+ return next;
+
+ var nextSession = session.Clone();
+ nextSession.Transcript.Add(new StreamingProxyNyxTranscriptEntry
+ {
+ Speaker = evt.DisplayName,
+ Content = evt.Content,
+ });
+ TrimTranscript(nextSession);
+ nextSession.CurrentParticipantIndex++;
+ nextSession.CurrentRoundSuccessfulReplies++;
+ nextSession.TotalSuccessfulReplies++;
+ next.NyxDiscussionSessions[evt.SessionId] = nextSession;
+ return next;
+ }
+
+ private static StreamingProxyGAgentState ApplyNyxParticipantFailureRecorded(
+ StreamingProxyGAgentState current,
+ StreamingProxyNyxParticipantFailureRecorded evt)
+ {
+ var next = current.Clone();
+ if (!next.NyxDiscussionSessions.TryGetValue(evt.SessionId, out var session))
+ return next;
+
+ var nextSession = session.Clone();
+ for (var i = nextSession.ActiveParticipants.Count - 1; i >= 0; i--)
+ {
+ if (!string.Equals(
+ nextSession.ActiveParticipants[i].ParticipantId,
+ evt.ParticipantId,
+ StringComparison.OrdinalIgnoreCase))
+ {
+ continue;
+ }
+
+ nextSession.ActiveParticipants.RemoveAt(i);
+ if (i < nextSession.CurrentParticipantIndex)
+ nextSession.CurrentParticipantIndex--;
+ }
+
+ if (nextSession.CurrentParticipantIndex > nextSession.ActiveParticipants.Count)
+ nextSession.CurrentParticipantIndex = nextSession.ActiveParticipants.Count;
+
+ next.NyxDiscussionSessions[evt.SessionId] = nextSession;
return next;
}
+
+ private static StreamingProxyGAgentState ApplyNyxDiscussionRoundAdvanced(
+ StreamingProxyGAgentState current,
+ StreamingProxyNyxDiscussionRoundAdvanced evt)
+ {
+ var next = current.Clone();
+ if (!next.NyxDiscussionSessions.TryGetValue(evt.SessionId, out var session))
+ return next;
+
+ var nextSession = session.Clone();
+ nextSession.CurrentRound = evt.Round;
+ nextSession.CurrentParticipantIndex = 0;
+ nextSession.CurrentRoundSuccessfulReplies = 0;
+ next.NyxDiscussionSessions[evt.SessionId] = nextSession;
+ return next;
+ }
+
+ private static void TrimTranscript(StreamingProxyNyxDiscussionSession session)
+ {
+ while (session.Transcript.Count > StreamingProxyDefaults.MaxMessages)
+ {
+ session.Transcript.RemoveAt(0);
+ }
+ }
}
diff --git a/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyNyxParticipantCoordinator.cs b/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyNyxParticipantCoordinator.cs
index 9ba19a685..46c30fdb4 100644
--- a/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyNyxParticipantCoordinator.cs
+++ b/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyNyxParticipantCoordinator.cs
@@ -13,7 +13,7 @@
namespace Aevatar.GAgents.StreamingProxy;
-internal sealed class StreamingProxyNyxParticipantCoordinator
+internal class StreamingProxyNyxParticipantCoordinator
{
private const string NyxIdProviderName = "nyxid";
private const string GatewaySuffix = "/api/v1/llm/gateway/v1";
@@ -46,7 +46,7 @@ public async Task> EnsureP
string scopeId,
string roomId,
IActor actor,
- IStreamingProxyParticipantStore participantStore,
+ IStreamingProxyParticipantQueryPort participantQueryPort,
string accessToken,
CancellationToken ct,
string? preferredRoute = null,
@@ -56,7 +56,7 @@ public async Task> EnsureP
if (participants.Count == 0)
return participants;
- var existing = await participantStore.ListAsync(roomId, ct);
+ var existing = await participantQueryPort.ListAsync(roomId, ct);
var existingIds = existing
.Select(entry => entry.AgentId)
.ToHashSet(StringComparer.OrdinalIgnoreCase);
@@ -71,150 +71,106 @@ public async Task> EnsureP
AgentId = participant.ParticipantId,
DisplayName = participant.DisplayName,
}, ct);
-
- await participantStore.AddAsync(roomId, participant.ParticipantId, participant.DisplayName, ct);
}
return participants;
}
- public async Task GenerateRepliesAsync(
+ public async Task RequestDiscussionAsync(
IReadOnlyList participants,
IActor actor,
string prompt,
string sessionId,
string accessToken,
- CancellationToken ct,
- IStreamingProxyParticipantStore? participantStore = null,
- string? roomId = null)
+ CancellationToken ct)
{
- if (participants.Count == 0)
- return 0;
+ if (participants.Count == 0 || string.IsNullOrWhiteSpace(sessionId))
+ return;
+
+ // Fix (review round 1, F1):
+ // Coordinator previously owned the Nyx round loop and mutable transcript/active participant sets.
+ // It now only dispatches actor continuation events; StreamingProxyGAgent owns progression state.
+ await DispatchAsync(actor, new StreamingProxyNyxDiscussionRequested
+ {
+ SessionId = sessionId,
+ Prompt = prompt,
+ AccessToken = accessToken,
+ Participants = { participants.Select(ToNyxParticipant) },
+ }, ct);
+ }
+
+ public virtual async Task RequestParticipantReplyAsync(
+ string roomId,
+ StreamingProxyNyxParticipantWorkItem workItem,
+ CancellationToken ct)
+ {
+ ArgumentException.ThrowIfNullOrWhiteSpace(roomId);
+ ArgumentNullException.ThrowIfNull(workItem);
if (!_llmProviderFactory.GetAvailableProviders().Contains(NyxIdProviderName, StringComparer.OrdinalIgnoreCase))
{
- _logger.LogWarning("NyxID provider '{ProviderName}' is not registered; skip Streaming Proxy participants.", NyxIdProviderName);
- return 0;
+ _logger.LogWarning("NyxID provider '{ProviderName}' is not registered; skip Streaming Proxy participant.", NyxIdProviderName);
+ await DispatchAsync(roomId, BuildReplyFailed(workItem, "NyxID provider is not registered."), ct);
+ return;
}
var provider = _llmProviderFactory.GetProvider(NyxIdProviderName);
- var transcript = new List<(string Speaker, string Content)>();
- var activeParticipants = participants.ToList();
- var rounds = activeParticipants.Count > 1 ? StreamingProxyDefaults.MaxDiscussionRounds : 1;
- var totalSuccessfulReplies = 0;
-
- for (var round = 1; round <= rounds && activeParticipants.Count > 0; round++)
+ try
{
- var successfulReplies = 0;
- var failedParticipants = new HashSet(StringComparer.OrdinalIgnoreCase);
- var roundParticipants = activeParticipants.ToList();
-
- foreach (var participant in roundParticipants)
+ var request = BuildParticipantRequest(
+ workItem.Participant,
+ workItem.ActiveParticipants,
+ workItem.Prompt,
+ workItem.SessionId,
+ workItem.AccessToken,
+ workItem.Transcript,
+ workItem.Round,
+ workItem.TotalRounds);
+ var response = await ReadParticipantResponseAsync(provider, request, ct);
+ if (IsUnavailableResponse(response))
{
- ct.ThrowIfCancellationRequested();
-
- if (failedParticipants.Contains(participant.ParticipantId))
- continue;
-
- var availableParticipants = activeParticipants
- .Where(candidate => !failedParticipants.Contains(candidate.ParticipantId))
- .ToList();
-
- if (availableParticipants.Count == 0)
- break;
-
- try
- {
- var request = BuildParticipantRequest(
- participant,
- availableParticipants,
- prompt,
- sessionId,
- accessToken,
- transcript,
- round,
- rounds);
- var response = await ReadParticipantResponseAsync(provider, request, ct);
- if (IsUnavailableResponse(response))
- {
- failedParticipants.Add(participant.ParticipantId);
- await MarkParticipantLeftAsync(
- actor,
- participantStore,
- roomId,
- participant.ParticipantId,
- ct);
- _logger.LogWarning(
- "Streaming Proxy participant '{Participant}' returned an unavailable response for route '{RoutePreference}' in round {Round}.",
- participant.DisplayName,
- participant.RoutePreference,
- round);
- continue;
- }
-
- var content = NormalizeParticipantReply(
- participant,
- availableParticipants,
- response.Content);
- if (string.IsNullOrWhiteSpace(content))
- {
- failedParticipants.Add(participant.ParticipantId);
- await MarkParticipantLeftAsync(
- actor,
- participantStore,
- roomId,
- participant.ParticipantId,
- ct);
- continue;
- }
-
- transcript.Add((participant.DisplayName, content));
- successfulReplies++;
- totalSuccessfulReplies++;
- await DispatchAsync(actor, new GroupChatMessageEvent
- {
- AgentId = participant.ParticipantId,
- AgentName = participant.DisplayName,
- Content = content,
- SessionId = sessionId,
- }, ct);
- }
- catch (OperationCanceledException)
- {
- throw;
- }
- catch (Exception ex)
- {
- failedParticipants.Add(participant.ParticipantId);
- await MarkParticipantLeftAsync(
- actor,
- participantStore,
- roomId,
- participant.ParticipantId,
- ct);
- _logger.LogWarning(ex,
- "Streaming Proxy participant '{Participant}' failed for route '{RoutePreference}' in round {Round}.",
- participant.DisplayName,
- participant.RoutePreference,
- round);
- }
+ await DispatchAsync(roomId, BuildReplyFailed(workItem, "Participant returned an unavailable response."), ct);
+ _logger.LogWarning(
+ "Streaming Proxy participant '{Participant}' returned an unavailable response for route '{RoutePreference}' in round {Round}.",
+ workItem.Participant.DisplayName,
+ workItem.Participant.RoutePreference,
+ workItem.Round);
+ return;
}
- if (failedParticipants.Count > 0)
+ var content = NormalizeParticipantReply(
+ workItem.Participant,
+ workItem.ActiveParticipants,
+ response.Content);
+ if (string.IsNullOrWhiteSpace(content))
{
- activeParticipants = activeParticipants
- .Where(participant => !failedParticipants.Contains(participant.ParticipantId))
- .ToList();
+ await DispatchAsync(roomId, BuildReplyFailed(workItem, "Participant returned an empty response."), ct);
+ return;
}
- if (successfulReplies == 0)
- break;
-
- if (activeParticipants.Count < 2)
- break;
+ await DispatchAsync(roomId, new StreamingProxyNyxParticipantReplySucceeded
+ {
+ SessionId = workItem.SessionId,
+ AccessToken = workItem.AccessToken,
+ Round = workItem.Round,
+ ParticipantId = workItem.Participant.ParticipantId,
+ DisplayName = workItem.Participant.DisplayName,
+ Content = content,
+ }, ct);
+ }
+ catch (OperationCanceledException)
+ {
+ throw;
+ }
+ catch (Exception ex)
+ {
+ await DispatchAsync(roomId, BuildReplyFailed(workItem, ex.Message), ct);
+ _logger.LogWarning(ex,
+ "Streaming Proxy participant '{Participant}' failed for route '{RoutePreference}' in round {Round}.",
+ workItem.Participant.DisplayName,
+ workItem.Participant.RoutePreference,
+ workItem.Round);
}
-
- return totalSuccessfulReplies;
}
private async Task> ResolveParticipantsAsync(
@@ -937,28 +893,6 @@ private static async Task ReadParticipantResponseAsync(
return string.Join('\n', lines).Trim();
}
- private async Task MarkParticipantLeftAsync(
- IActor actor,
- IStreamingProxyParticipantStore? participantStore,
- string? roomId,
- string participantId,
- CancellationToken ct)
- {
- if (string.IsNullOrWhiteSpace(participantId))
- return;
-
- if (participantStore is not null &&
- !string.IsNullOrWhiteSpace(roomId))
- {
- await participantStore.RemoveParticipantAsync(roomId, participantId, ct);
- }
-
- await DispatchAsync(actor, new GroupChatParticipantLeftEvent
- {
- AgentId = participantId,
- }, ct);
- }
-
private static bool IsUnavailableResponse(LLMResponse response)
{
if (string.Equals(response.FinishReason, "error", StringComparison.OrdinalIgnoreCase) ||
@@ -1020,6 +954,45 @@ private async Task DispatchAsync(IActor actor, IMessage payload, CancellationTok
await DispatchRoomEnvelopeAsync(actor.Id, envelope, ct);
}
+ private Task DispatchAsync(string actorId, IMessage payload, CancellationToken ct)
+ {
+ var envelope = new EventEnvelope
+ {
+ Id = Guid.NewGuid().ToString("N"),
+ Timestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
+ Payload = Any.Pack(payload),
+ Route = new EnvelopeRoute
+ {
+ Direct = new DirectRoute { TargetActorId = actorId },
+ },
+ };
+
+ return DispatchRoomEnvelopeAsync(actorId, envelope, ct);
+ }
+
+ private static StreamingProxyNyxParticipant ToNyxParticipant(
+ StreamingProxyNyxParticipantDefinition participant) =>
+ new()
+ {
+ ParticipantId = participant.ParticipantId,
+ RoutePreference = participant.RoutePreference,
+ DisplayName = participant.DisplayName,
+ Model = participant.Model ?? string.Empty,
+ };
+
+ private static StreamingProxyNyxParticipantReplyFailed BuildReplyFailed(
+ StreamingProxyNyxParticipantWorkItem workItem,
+ string? errorMessage) =>
+ new()
+ {
+ SessionId = workItem.SessionId,
+ AccessToken = workItem.AccessToken,
+ Round = workItem.Round,
+ ParticipantId = workItem.Participant.ParticipantId,
+ DisplayName = workItem.Participant.DisplayName,
+ ErrorMessage = errorMessage ?? string.Empty,
+ };
+
private Task DispatchRoomEnvelopeAsync(
string actorId,
EventEnvelope envelope,
@@ -1038,6 +1011,18 @@ internal sealed record StreamingProxyNyxParticipantDefinition(
string DisplayName,
string? Model);
+internal sealed record StreamingProxyNyxParticipantWorkItem(
+ string SessionId,
+ string AccessToken,
+ int Round,
+ int TotalRounds,
+ StreamingProxyNyxParticipantDefinition Participant,
+ IReadOnlyList ActiveParticipants,
+ IReadOnlyList<(string Speaker, string Content)> Transcript)
+{
+ public string Prompt { get; init; } = string.Empty;
+}
+
internal sealed record StreamingProxyNyxParticipantCandidate(
StreamingProxyNyxProviderStatus Provider,
string ParticipantId,
diff --git a/src/Aevatar.Studio.Projection/ReadModels/StreamingProxyParticipantCurrentStateDocument.Partial.cs b/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyRoomCurrentStateDocument.Partial.cs
similarity index 65%
rename from src/Aevatar.Studio.Projection/ReadModels/StreamingProxyParticipantCurrentStateDocument.Partial.cs
rename to agents/Aevatar.GAgents.StreamingProxy/StreamingProxyRoomCurrentStateDocument.Partial.cs
index 66e116ef0..83495ec2b 100644
--- a/src/Aevatar.Studio.Projection/ReadModels/StreamingProxyParticipantCurrentStateDocument.Partial.cs
+++ b/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyRoomCurrentStateDocument.Partial.cs
@@ -1,9 +1,9 @@
using Aevatar.CQRS.Projection.Stores.Abstractions;
-namespace Aevatar.Studio.Projection.ReadModels;
+namespace Aevatar.GAgents.StreamingProxy;
-public sealed partial class StreamingProxyParticipantCurrentStateDocument
- : IProjectionReadModel
+public sealed partial class StreamingProxyRoomCurrentStateDocument
+ : IProjectionReadModel
{
string IProjectionReadModel.ActorId => ActorId;
diff --git a/src/Aevatar.Studio.Projection/Metadata/StreamingProxyParticipantCurrentStateDocumentMetadataProvider.cs b/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyRoomCurrentStateDocumentMetadataProvider.cs
similarity index 55%
rename from src/Aevatar.Studio.Projection/Metadata/StreamingProxyParticipantCurrentStateDocumentMetadataProvider.cs
rename to agents/Aevatar.GAgents.StreamingProxy/StreamingProxyRoomCurrentStateDocumentMetadataProvider.cs
index 49900f975..e21d4c008 100644
--- a/src/Aevatar.Studio.Projection/Metadata/StreamingProxyParticipantCurrentStateDocumentMetadataProvider.cs
+++ b/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyRoomCurrentStateDocumentMetadataProvider.cs
@@ -1,13 +1,12 @@
using Aevatar.CQRS.Projection.Stores.Abstractions;
-using Aevatar.Studio.Projection.ReadModels;
-namespace Aevatar.Studio.Projection.Metadata;
+namespace Aevatar.GAgents.StreamingProxy;
-public sealed class StreamingProxyParticipantCurrentStateDocumentMetadataProvider
- : IProjectionDocumentMetadataProvider
+public sealed class StreamingProxyRoomCurrentStateDocumentMetadataProvider
+ : IProjectionDocumentMetadataProvider
{
public DocumentIndexMetadata Metadata { get; } = new(
- IndexName: "studio-streaming-proxy-participant",
+ IndexName: "studio-streaming-proxy-room",
Mappings: new Dictionary(StringComparer.Ordinal)
{
["dynamic"] = true,
diff --git a/src/Aevatar.Studio.Projection/Projectors/StreamingProxyParticipantCurrentStateProjector.cs b/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyRoomCurrentStateProjector.cs
similarity index 62%
rename from src/Aevatar.Studio.Projection/Projectors/StreamingProxyParticipantCurrentStateProjector.cs
rename to agents/Aevatar.GAgents.StreamingProxy/StreamingProxyRoomCurrentStateProjector.cs
index 88181d4ac..b27e577ab 100644
--- a/src/Aevatar.Studio.Projection/Projectors/StreamingProxyParticipantCurrentStateProjector.cs
+++ b/agents/Aevatar.GAgents.StreamingProxy/StreamingProxyRoomCurrentStateProjector.cs
@@ -2,25 +2,22 @@
using Aevatar.CQRS.Projection.Core.Orchestration;
using Aevatar.CQRS.Projection.Runtime.Abstractions;
using Aevatar.Foundation.Abstractions;
-using Aevatar.GAgents.StreamingProxyParticipant;
-using Aevatar.Studio.Projection.Orchestration;
-using Aevatar.Studio.Projection.ReadModels;
using Google.Protobuf.WellKnownTypes;
-namespace Aevatar.Studio.Projection.Projectors;
+namespace Aevatar.GAgents.StreamingProxy;
///
-/// Materializes committed events into
-/// in the projection document store.
+/// Materializes committed events into
+/// in the projection document store.
///
-public sealed class StreamingProxyParticipantCurrentStateProjector
- : ICurrentStateProjectionMaterializer
+public sealed class StreamingProxyRoomCurrentStateProjector
+ : ICurrentStateProjectionMaterializer
{
- private readonly IProjectionWriteDispatcher _writeDispatcher;
+ private readonly IProjectionWriteDispatcher _writeDispatcher;
private readonly IProjectionClock _clock;
- public StreamingProxyParticipantCurrentStateProjector(
- IProjectionWriteDispatcher writeDispatcher,
+ public StreamingProxyRoomCurrentStateProjector(
+ IProjectionWriteDispatcher writeDispatcher,
IProjectionClock clock)
{
_writeDispatcher = writeDispatcher ?? throw new ArgumentNullException(nameof(writeDispatcher));
@@ -28,14 +25,14 @@ public StreamingProxyParticipantCurrentStateProjector(
}
public async ValueTask ProjectAsync(
- StudioMaterializationContext context,
+ StreamingProxyCurrentStateProjectionContext context,
EventEnvelope envelope,
CancellationToken ct = default)
{
ArgumentNullException.ThrowIfNull(context);
ArgumentNullException.ThrowIfNull(envelope);
- if (!CommittedStateEventEnvelope.TryUnpackState(
+ if (!CommittedStateEventEnvelope.TryUnpackState(
envelope,
out _,
out var stateEvent,
@@ -48,7 +45,7 @@ public async ValueTask ProjectAsync(
var updatedAt = CommittedStateEventEnvelope.ResolveTimestamp(envelope, _clock.UtcNow);
- var document = new StreamingProxyParticipantCurrentStateDocument
+ var document = new StreamingProxyRoomCurrentStateDocument
{
Id = context.RootActorId,
ActorId = context.RootActorId,
diff --git a/agents/Aevatar.GAgents.StreamingProxy/streaming_proxy_messages.proto b/agents/Aevatar.GAgents.StreamingProxy/streaming_proxy_messages.proto
index fdf53456d..94822a5a8 100644
--- a/agents/Aevatar.GAgents.StreamingProxy/streaming_proxy_messages.proto
+++ b/agents/Aevatar.GAgents.StreamingProxy/streaming_proxy_messages.proto
@@ -3,6 +3,7 @@ package aevatar.gagents.streamingproxy;
option csharp_namespace = "Aevatar.GAgents.StreamingProxy";
import "google/protobuf/timestamp.proto";
+import "google/protobuf/any.proto";
import "agent_messages.proto";
// ─── State ───
@@ -28,6 +29,7 @@ message StreamingProxyGAgentState {
repeated StreamingProxyChatMessage messages = 3;
int64 next_sequence = 4;
map terminal_sessions = 5;
+ map nyx_discussion_sessions = 6;
}
message StreamingProxyChatSessionTerminalRecord {
@@ -37,6 +39,39 @@ message StreamingProxyChatSessionTerminalRecord {
string error_message = 4;
}
+enum StreamingProxyNyxDiscussionStatus {
+ NYX_DISCUSSION_STATUS_UNSPECIFIED = 0;
+ NYX_DISCUSSION_STATUS_ACTIVE = 1;
+ NYX_DISCUSSION_STATUS_COMPLETED = 2;
+ NYX_DISCUSSION_STATUS_FAILED = 3;
+}
+
+message StreamingProxyNyxParticipant {
+ string participant_id = 1;
+ string route_preference = 2;
+ string display_name = 3;
+ string model = 4;
+}
+
+message StreamingProxyNyxTranscriptEntry {
+ string speaker = 1;
+ string content = 2;
+}
+
+message StreamingProxyNyxDiscussionSession {
+ string session_id = 1;
+ string prompt = 2;
+ int32 current_round = 3;
+ int32 total_rounds = 4;
+ int32 current_participant_index = 5;
+ int32 current_round_successful_replies = 6;
+ int32 total_successful_replies = 7;
+ StreamingProxyNyxDiscussionStatus status = 8;
+ string terminal_error_message = 9;
+ repeated StreamingProxyNyxParticipant active_participants = 10;
+ repeated StreamingProxyNyxTranscriptEntry transcript = 11;
+}
+
// ─── Events ───
enum StreamingProxyChatSessionTerminalStatus {
@@ -77,6 +112,59 @@ message StreamingProxyChatSessionTerminalStateChanged {
string error_message = 4;
}
+message StreamingProxyNyxDiscussionRequested {
+ string session_id = 1;
+ string prompt = 2;
+ string access_token = 3;
+ repeated StreamingProxyNyxParticipant participants = 4;
+}
+
+message StreamingProxyNyxDiscussionStarted {
+ string session_id = 1;
+ string prompt = 2;
+ repeated StreamingProxyNyxParticipant participants = 3;
+ int32 total_rounds = 4;
+}
+
+message StreamingProxyNyxParticipantReplySucceeded {
+ string session_id = 1;
+ string access_token = 2;
+ int32 round = 3;
+ string participant_id = 4;
+ string display_name = 5;
+ string content = 6;
+}
+
+message StreamingProxyNyxParticipantReplyFailed {
+ string session_id = 1;
+ string access_token = 2;
+ int32 round = 3;
+ string participant_id = 4;
+ string display_name = 5;
+ string error_message = 6;
+}
+
+message StreamingProxyNyxParticipantReplyRecorded {
+ string session_id = 1;
+ int32 round = 2;
+ string participant_id = 3;
+ string display_name = 4;
+ string content = 5;
+}
+
+message StreamingProxyNyxParticipantFailureRecorded {
+ string session_id = 1;
+ int32 round = 2;
+ string participant_id = 3;
+ string display_name = 4;
+ string error_message = 5;
+}
+
+message StreamingProxyNyxDiscussionRoundAdvanced {
+ string session_id = 1;
+ int32 round = 2;
+}
+
message StreamingProxyRoomSessionEnvelope {
aevatar.EventEnvelope envelope = 1;
}
@@ -93,3 +181,12 @@ message StreamingProxyChatSessionTerminalSnapshot {
google.protobuf.Timestamp terminal_at = 9;
string error_message = 10;
}
+
+message StreamingProxyRoomCurrentStateDocument {
+ string id = 1;
+ string actor_id = 2;
+ int64 state_version = 3;
+ string last_event_id = 4;
+ google.protobuf.Timestamp updated_at = 5;
+ google.protobuf.Any state_root = 10;
+}
diff --git a/agents/Aevatar.GAgents.StreamingProxyParticipant/Aevatar.GAgents.StreamingProxyParticipant.csproj b/agents/Aevatar.GAgents.StreamingProxyParticipant/Aevatar.GAgents.StreamingProxyParticipant.csproj
deleted file mode 100644
index 0a8c06885..000000000
--- a/agents/Aevatar.GAgents.StreamingProxyParticipant/Aevatar.GAgents.StreamingProxyParticipant.csproj
+++ /dev/null
@@ -1,24 +0,0 @@
-
-
- net10.0
- enable
- enable
- Aevatar.GAgents.StreamingProxyParticipant
- Aevatar.GAgents.StreamingProxyParticipant
-
-
-
-
-
-
-
-
- all
- runtime; build; native; contentfiles; analyzers; buildtransitive
-
-
-
-
-
-
-
diff --git a/agents/Aevatar.GAgents.StreamingProxyParticipant/StreamingProxyParticipantGAgent.cs b/agents/Aevatar.GAgents.StreamingProxyParticipant/StreamingProxyParticipantGAgent.cs
deleted file mode 100644
index e485614b7..000000000
--- a/agents/Aevatar.GAgents.StreamingProxyParticipant/StreamingProxyParticipantGAgent.cs
+++ /dev/null
@@ -1,128 +0,0 @@
-using Aevatar.Foundation.Abstractions;
-using Aevatar.Foundation.Abstractions.Attributes;
-using Aevatar.Foundation.Core;
-using Aevatar.Foundation.Core.EventSourcing;
-using Google.Protobuf;
-
-namespace Aevatar.GAgents.StreamingProxyParticipant;
-
-///
-/// Singleton actor that tracks streaming proxy room participants.
-/// Replaces the chrono-storage backed ChronoStorageStreamingProxyParticipantStore.
-///
-/// Actor ID: streaming-proxy-participants (cluster-scoped singleton).
-///
-public sealed class StreamingProxyParticipantGAgent
- : GAgentBase, IProjectedActor
-{
- public static string ProjectionKind => "streaming-proxy-participant";
-
-
- [EventHandler(EndpointName = "addParticipant")]
- public async Task HandleParticipantAdded(ParticipantAddedEvent evt)
- {
- if (string.IsNullOrWhiteSpace(evt.RoomId) || string.IsNullOrWhiteSpace(evt.AgentId))
- return;
-
- await PersistDomainEventAsync(evt);
- }
-
- [EventHandler(EndpointName = "removeRoomParticipants")]
- public async Task HandleRoomParticipantsRemoved(RoomParticipantsRemovedEvent evt)
- {
- if (string.IsNullOrWhiteSpace(evt.RoomId))
- return;
-
- // Idempotent: skip if room does not exist
- if (!State.Rooms.ContainsKey(evt.RoomId))
- return;
-
- await PersistDomainEventAsync(evt);
- }
-
- [EventHandler(EndpointName = "removeParticipant")]
- public async Task HandleParticipantRemoved(ParticipantRemovedEvent evt)
- {
- if (string.IsNullOrWhiteSpace(evt.RoomId) || string.IsNullOrWhiteSpace(evt.AgentId))
- return;
-
- if (!State.Rooms.TryGetValue(evt.RoomId, out var list) ||
- !list.Participants.Any(p => string.Equals(p.AgentId, evt.AgentId, StringComparison.Ordinal)))
- {
- return;
- }
-
- await PersistDomainEventAsync(evt);
- }
-
- protected override async Task OnActivateAsync(CancellationToken ct)
- {
- await base.OnActivateAsync(ct);
- }
-
- protected override StreamingProxyParticipantGAgentState TransitionState(
- StreamingProxyParticipantGAgentState current, IMessage evt)
- {
- return StateTransitionMatcher
- .Match(current, evt)
- .On(ApplyParticipantAdded)
- .On(ApplyParticipantRemoved)
- .On(ApplyRoomRemoved)
- .OrCurrent();
- }
-
- private static StreamingProxyParticipantGAgentState ApplyParticipantAdded(
- StreamingProxyParticipantGAgentState state, ParticipantAddedEvent evt)
- {
- var next = state.Clone();
-
- if (!next.Rooms.TryGetValue(evt.RoomId, out var list))
- {
- list = new ParticipantList();
- next.Rooms[evt.RoomId] = list;
- }
-
- // Remove existing entry for the same agent (upsert semantics)
- var existing = list.Participants.FirstOrDefault(p =>
- string.Equals(p.AgentId, evt.AgentId, StringComparison.Ordinal));
- if (existing is not null)
- list.Participants.Remove(existing);
-
- list.Participants.Add(new ParticipantEntry
- {
- AgentId = evt.AgentId,
- DisplayName = evt.DisplayName,
- JoinedAt = evt.JoinedAt,
- });
-
- return next;
- }
-
- private static StreamingProxyParticipantGAgentState ApplyParticipantRemoved(
- StreamingProxyParticipantGAgentState state, ParticipantRemovedEvent evt)
- {
- var next = state.Clone();
- if (!next.Rooms.TryGetValue(evt.RoomId, out var list))
- return next;
-
- for (var index = list.Participants.Count - 1; index >= 0; index--)
- {
- if (string.Equals(list.Participants[index].AgentId, evt.AgentId, StringComparison.Ordinal))
- list.Participants.RemoveAt(index);
- }
-
- if (list.Participants.Count == 0)
- next.Rooms.Remove(evt.RoomId);
-
- return next;
- }
-
- private static StreamingProxyParticipantGAgentState ApplyRoomRemoved(
- StreamingProxyParticipantGAgentState state, RoomParticipantsRemovedEvent evt)
- {
- var next = state.Clone();
- next.Rooms.Remove(evt.RoomId);
- return next;
- }
-
-}
diff --git a/agents/Aevatar.GAgents.StreamingProxyParticipant/streaming_proxy_participant_messages.proto b/agents/Aevatar.GAgents.StreamingProxyParticipant/streaming_proxy_participant_messages.proto
deleted file mode 100644
index 46ae2b43c..000000000
--- a/agents/Aevatar.GAgents.StreamingProxyParticipant/streaming_proxy_participant_messages.proto
+++ /dev/null
@@ -1,40 +0,0 @@
-syntax = "proto3";
-package aevatar.gagents.streaming_proxy_participant;
-option csharp_namespace = "Aevatar.GAgents.StreamingProxyParticipant";
-
-import "google/protobuf/timestamp.proto";
-
-// ─── State ───
-
-message ParticipantEntry {
- string agent_id = 1;
- string display_name = 2;
- google.protobuf.Timestamp joined_at = 3;
-}
-
-message ParticipantList {
- repeated ParticipantEntry participants = 1;
-}
-
-message StreamingProxyParticipantGAgentState {
- // roomId → participants
- map rooms = 1;
-}
-
-// ─── Events ───
-
-message ParticipantAddedEvent {
- string room_id = 1;
- string agent_id = 2;
- string display_name = 3;
- google.protobuf.Timestamp joined_at = 4;
-}
-
-message ParticipantRemovedEvent {
- string room_id = 1;
- string agent_id = 2;
-}
-
-message RoomParticipantsRemovedEvent {
- string room_id = 1;
-}
diff --git a/docs/2026-04-02-streaming-proxy-flow.md b/docs/2026-04-02-streaming-proxy-flow.md
index f2727ad52..2f1b28a03 100644
--- a/docs/2026-04-02-streaming-proxy-flow.md
+++ b/docs/2026-04-02-streaming-proxy-flow.md
@@ -18,8 +18,8 @@
| `StreamingProxyEndpoints` | `agents/Aevatar.GAgents.StreamingProxy/StreamingProxyEndpoints.cs` | 提供 room CRUD、`:chat`、`messages`、`messages:stream`、participant 管理 HTTP/SSE 入口 |
| `StreamingProxyGAgent` | `agents/Aevatar.GAgents.StreamingProxy/StreamingProxyGAgent.cs` | 房间 actor,本质上是 group chat broker;持久化事件、更新房间内消息/参与者状态、向订阅者发布事件 |
| `IGAgentActorRegistryCommandPort` / `IGAgentActorRegistryQueryPort` / `IScopeResourceAdmissionPort` | `src/platform/Aevatar.GAgentService.Abstractions/ScopeGAgents/GAgentRegistryPorts.cs` | room ownership 的写入、列表查询与 command admission 边界 |
-| `IStreamingProxyParticipantStore` | `src/Aevatar.Studio.Application/Studio/Abstractions/IStreamingProxyParticipantStore.cs` | room participant 的持久化索引,供 participant 查询、自动加入与失败移除时使用 |
-| `StreamingProxyNyxParticipantCoordinator` | `agents/Aevatar.GAgents.StreamingProxy/StreamingProxyNyxParticipantCoordinator.cs` | 在带 Bearer Token 时发现 Nyx 可用 provider,把它们自动加入房间并生成多轮回复 |
+| `IStreamingProxyParticipantQueryPort` | `src/Aevatar.Studio.Application/Studio/Abstractions/IStreamingProxyParticipantQueryPort.cs` | room participant 查询入口,读取 `StreamingProxyGAgent` 当前态 readmodel |
+| `StreamingProxyNyxParticipantCoordinator` | `agents/Aevatar.GAgents.StreamingProxy/StreamingProxyNyxParticipantCoordinator.cs` | 在带 Bearer Token 时发现 Nyx 可用 provider,并按 room actor 发出的单个 participant work item 执行 Nyx/LLM I/O |
| `StreamingProxySseWriter` | `agents/Aevatar.GAgents.StreamingProxy/StreamingProxySseWriter.cs` | 把 actor 事件映射成 SSE frame 输出给客户端 |
## 2. 总体拓扑
@@ -29,7 +29,7 @@
flowchart TB
CL["Client / OpenClaw"] --> API["StreamingProxyEndpoints\n/api/scopes/{scopeId}/streaming-proxy/..."]
API --> REG["GAgent registry ports\ncommand / query / admission"]
- API --> PSTORE["IStreamingProxyParticipantStore\nparticipant index"]
+ API --> PQUERY["IStreamingProxyParticipantQueryPort\nroom current-state readmodel"]
API --> RT["IActorRuntime"]
RT --> ACT["StreamingProxyGAgent\nroom actor"]
API --> SUB["IActorEventSubscriptionProvider"]
@@ -134,17 +134,21 @@ sequenceDiagram
SUB->>SSE: "WriteParticipantJoinedAsync"
SSE-->>CL: "PARTICIPANT_JOINED"
- API->>NYX: "GenerateRepliesAsync(...)"
- loop "每个 round / participant"
- NYX->>LLM: "provider.ChatAsync(request)"
+ API->>NYX: "RequestDiscussionAsync(...)"
+ NYX->>ACT: "Dispatch StreamingProxyNyxDiscussionRequested"
+ loop "actor-owned round / participant continuation"
+ ACT->>ACT: "persist transcript / active participants / round cursor"
+ ACT->>NYX: "RequestParticipantReplyAsync(work item)"
+ NYX->>LLM: "provider.ChatStreamAsync(request)"
LLM-->>NYX: "reply"
- NYX->>ACT: "Dispatch GroupChatMessageEvent"
- ACT->>ACT: "PersistDomainEventAsync"
+ NYX->>ACT: "Dispatch Nyx reply succeeded/failed"
+ ACT->>ACT: "record reply or prune failed participant"
+ ACT->>ACT: "Persist GroupChatMessageEvent or ParticipantLeftEvent"
ACT-->>SUB: "Publish GroupChatMessageEvent"
SUB->>SSE: "WriteAgentMessageAsync"
SSE-->>CL: "AGENT_MESSAGE"
end
- API->>SSE: "WriteRunFinishedAsync"
+ ACT->>ACT: "Persist terminal session state"
SSE-->>CL: "RUN_FINISHED"
else "没有 Token,或没有可用 Nyx participants"
note over API: "进入 activityChannel + timeout 等待模式"
@@ -194,37 +198,37 @@ flowchart LR
`EnsureParticipantsJoinedAsync(...)` 的行为是:
-1. 先从 `StreamingProxyActorStore` 读取已有 participant。
+1. 先从 `IStreamingProxyParticipantQueryPort` 读取 room actor 当前态 readmodel 中已有 participant。
2. 对新增 participant 投递 `GroupChatParticipantJoinedEvent` 给 room actor。
-3. 同时更新 `StreamingProxyActorStore` 的 participant 查询索引。
-所以当前实现里,participant 有两份表现层状态:
-
-1. actor 内 `_proxyState.Participants`
-2. `StreamingProxyActorStore` 里的 query list
+participant membership 的唯一写侧事实在 `StreamingProxyGAgentState.Participants`;查询只读取该状态的投影副本。
### 6.2 自动回复生成
-`GenerateRepliesAsync(...)` 的行为是:
+Nyx 自动回复的推进由 `StreamingProxyGAgent` 状态和事件处理驱动;`StreamingProxyNyxParticipantCoordinator` 只做 provider 发现和单次 participant LLM I/O。
-1. 按当前 active participants 决定总轮次。
+1. room actor 持有 `StreamingProxyNyxDiscussionSession`:
+ - active participant 集合
+ - 当前 round / participant cursor
+ - transcript
+ - 成功回复计数与 terminal 状态
+2. actor 按当前 active participants 决定总轮次。
- 多 participant 时最多 `4` 轮。
- 单 participant 时只跑 `1` 轮。
-2. 为每个 participant 构造一次 LLM request:
+3. actor 为当前 participant 发出单个 work item,coordinator 构造一次 LLM request:
- system prompt 明确它是 room 内 participant
- user prompt 带原始 topic 和最近 transcript
- metadata 带 `NyxIdAccessToken` 和 `NyxIdRoutePreference`
-3. 调 `NyxID provider.ChatAsync(...)` 拿回复。
-4. 对回复做规范化,去掉 speaker label,避免串写别人的回复。
-5. 把回复重新封装为 `GroupChatMessageEvent` 打回 room actor。
+4. coordinator 调 `NyxID provider.ChatStreamAsync(...)` 拿回复,并把结果作为 `StreamingProxyNyxParticipantReplySucceeded/Failed` continuation 打回 room actor。
+5. actor 在事件处理里记录 transcript、裁剪失败 participant、推进 round,并把成功回复持久化为 `GroupChatMessageEvent`。
注意这里的房间推进不是“直接把 LLM 文本写 SSE”,而是:
1. `LLM reply`
-2. 转成 `GroupChatMessageEvent`
+2. 转成 Nyx reply continuation
3. 回到 `StreamingProxyGAgent`
-4. actor 再发布事件
-5. SSE 订阅端再收到 `AGENT_MESSAGE`
+4. actor 持久化 `GroupChatMessageEvent` 或 `GroupChatParticipantLeftEvent`
+5. SSE 订阅端再收到 `AGENT_MESSAGE` / `PARTICIPANT_LEFT`
也就是说,SSE 输出仍然以 actor 事件流为单一来源。
@@ -244,15 +248,13 @@ flowchart LR
sequenceDiagram
participant EXT as "External Participant"
participant API as "StreamingProxyEndpoints"
- participant STORE as "StreamingProxyActorStore"
participant ACT as "StreamingProxyGAgent"
participant SUB as "Actor Event Subscription"
participant SSE as "StreamingProxySseWriter"
participant CL as "Client"
EXT->>API: "POST /participants"
- API->>ACT: "HandleEventAsync(GroupChatParticipantJoinedEvent)"
- API->>STORE: "AddParticipant(...)"
+ API->>ACT: "Dispatch GroupChatParticipantJoinedEvent"
ACT-->>SUB: "Publish joined event"
SUB->>SSE: "WriteParticipantJoinedAsync"
SSE-->>CL: "PARTICIPANT_JOINED"
@@ -304,18 +306,18 @@ sequenceDiagram
状态分别落在三处:
-1. `StreamingProxyActorStore`
- - room 列表
- - participant 查询索引
-2. `StreamingProxyGAgent` 事件与 `_proxyState`
+1. registry actor state
+ - room ownership
+2. `StreamingProxyGAgent` 事件与状态
- 房间名
- participant 集合
- message transcript
- `next_sequence`
-3. SSE 订阅流
+3. Projection readmodel / SSE 订阅流
+ - participant 查询读取 `StreamingProxyRoomCurrentStateDocument`
- 只承载实时输出,不承载查询事实
-如果只看当前代码,真正驱动前端实时展示的是 actor 发布的 `GroupChat*` 事件,不是 `StreamingProxyActorStore`。
+如果只看当前代码,真正驱动前端实时展示的是 actor 发布的 `GroupChat*` 事件;participant 查询读取同一 room actor 当前态的投影副本。
## 11. 关键代码锚点
@@ -323,6 +325,5 @@ sequenceDiagram
2. `agents/Aevatar.GAgents.StreamingProxy/StreamingProxyEndpoints.cs`
3. `agents/Aevatar.GAgents.StreamingProxy/StreamingProxyGAgent.cs`
4. `agents/Aevatar.GAgents.StreamingProxy/StreamingProxyNyxParticipantCoordinator.cs`
-5. `agents/Aevatar.GAgents.StreamingProxy/StreamingProxyActorStore.cs`
-6. `agents/Aevatar.GAgents.StreamingProxy/StreamingProxySseWriter.cs`
-7. `agents/Aevatar.GAgents.StreamingProxy/streaming_proxy_messages.proto`
+5. `agents/Aevatar.GAgents.StreamingProxy/StreamingProxySseWriter.cs`
+6. `agents/Aevatar.GAgents.StreamingProxy/streaming_proxy_messages.proto`
diff --git a/docs/2026-05-14-gagent-types-analysis.md b/docs/2026-05-14-gagent-types-analysis.md
index e2b76eba7..002f7705d 100644
--- a/docs/2026-05-14-gagent-types-analysis.md
+++ b/docs/2026-05-14-gagent-types-analysis.md
@@ -98,7 +98,6 @@ GAgentBase ← 无状态底座
| `ConnectorCatalogGAgent` | `GAgentBase` | `ConnectorCatalogState` | Connector 目录 |
| `DeviceRegistrationGAgent` | `GAgentBase` | `DeviceRegistrationState` | 设备注册 |
| `StreamingProxyGAgent` | `GAgentBase` | `StreamingProxyGAgentState` | 流式代理房间 |
-| `StreamingProxyParticipantGAgent` | `GAgentBase` | `StreamingProxyParticipantGAgentState` | 流式代理参与者 |
| `ConversationGAgent` | `GAgentBase` | `ConversationGAgentState` | 渠道对话(Lark/Telegram) |
| `ChannelBotRegistrationGAgent` | `GAgentBase` | `ChannelBotRegistrationStoreState` | 渠道 Bot 注册 |
| `ChannelUserBindingGAgent` | `GAgentBase` | `ChannelUserBindingState` | 渠道用户绑定 |
@@ -131,7 +130,7 @@ GAgentBase ← 无状态底座
| `RoleCatalogGAgent` | role_name → role spec |
| `ConnectorCatalogGAgent` | connector_name → connector config |
| `UserAgentCatalogGAgent` | user + agent_id → catalog entry |
-| `StreamingProxyParticipantGAgent` | room_id → participant set |
+| `StreamingProxyGAgent` | room_id → participant set + messages |
| `ChatHistoryIndexGAgent` | user → conversation list |
**共性模式**:`Upsert → Tombstone → Compact`,状态内维护 `last_applied_event_version`,projection 直出 current-state read model。
@@ -314,7 +313,7 @@ abstract class RunGAgentBase : GAgentBase
#### 4.1.2 `CatalogGAgentBase` — 目录索引基类
-**适用对象**:`ScriptCatalogGAgent`、`GAgentRegistryGAgent`、`RoleCatalogGAgent`、`ConnectorCatalogGAgent`、`UserAgentCatalogGAgent`、`StreamingProxyParticipantGAgent`、`ChatHistoryIndexGAgent`
+**适用对象**:`ScriptCatalogGAgent`、`GAgentRegistryGAgent`、`RoleCatalogGAgent`、`ConnectorCatalogGAgent`、`UserAgentCatalogGAgent`、`ChatHistoryIndexGAgent`
**可抽取的通用能力**:
```
diff --git a/docs/canon/aevatar-channel-architecture.md b/docs/canon/aevatar-channel-architecture.md
index dc47ceb6e..f6c897399 100644
--- a/docs/canon/aevatar-channel-architecture.md
+++ b/docs/canon/aevatar-channel-architecture.md
@@ -1092,7 +1092,7 @@ adapter 在构造 `ChatActivity` 时**必须**:
- `Aevatar.GAgents.ChatHistory` —— 独立 GAgent(`ChatConversationGAgent` / `ChatHistoryIndexGAgent`)。**但当前 ChannelRuntime 未集成它**;对话历史实际上是 `AIGAgentBase` 里的进程内 `ChatHistory`(见 `src/Aevatar.AI.Core/AIGAgentBase.cs`)。`ConversationGAgent` 要集成它是**新工作**,不是"复用现有集成"
- `Aevatar.GAgents.UserMemory` —— 同样独立 GAgent 存在,但当前无集成。`ConversationGAgent` 的 long-term memory 集成是新工作
- `Aevatar.GAgents.ChatbotClassifier` —— 按需包成 `ClassificationMiddleware`
-- `Aevatar.GAgents.StreamingProxy` / `StreamingProxyParticipant` —— LLM streaming 底层可复用
+- `Aevatar.GAgents.StreamingProxy` —— LLM streaming 底层可复用
- `Aevatar.GAgents.Registry` —— 平台级 GAgent registry,和拟改名的 `UserAgentCatalog` 各司其职(platform actor routing vs user agent metadata)
**诚实承认**:早期 RFC 版本措辞是 "必须调用 ChatHistory / UserMemory,不重复存"——暗示已有集成。实际上这些是**未来集成目标**,不是现状复用。本 RFC 实施时需要把这层集成**新建**出来,不要误以为是捡现成。
@@ -1513,7 +1513,6 @@ agents/ ← production code
├── Aevatar.GAgents.Registry/ ← 平台级 registry
├── Aevatar.GAgents.RoleCatalog/
├── Aevatar.GAgents.StreamingProxy/
-├── Aevatar.GAgents.StreamingProxyParticipant/
├── Aevatar.GAgents.UserConfig/
└── Aevatar.GAgents.UserMemory/ ← ConversationGAgent 与其集成
@@ -2420,7 +2419,7 @@ public interface ICredentialProvider {
### 17.5 Orleans grain-based cluster-singleton primitive(P2 — 第二 long-conn 场景触发)
-**缺口**:aevatar 缺"**集群唯一持有某个外部长连接/会话所有权**"的通用做法。现有"well-known singleton actor" 模式(`RoleCatalogGAgent.cs:14` / `ConnectorCatalogGAgent.cs:14` / `StreamingProxyParticipantGAgent.cs:13` / `ChannelBotRegistrationGAgent.cs:15` / `DeviceRegistrationGAgent.cs:15`)是**被动 actor**——只要 grain id 固定就行,没有 lease / epoch fencing / failover ownership 语义。
+**缺口**:aevatar 缺"**集群唯一持有某个外部长连接/会话所有权**"的通用做法。现有"well-known singleton actor" 模式(`RoleCatalogGAgent.cs:14` / `ConnectorCatalogGAgent.cs:14` / `ChannelBotRegistrationGAgent.cs:15` / `DeviceRegistrationGAgent.cs:15`)是**被动 actor**——只要 grain id 固定就行,没有 lease / epoch fencing / failover ownership 语义。
现有 hosted service(`UserAgentCatalogStartupService.cs:22-60` / `ChannelBotRegistrationStartupService.cs:33-72`)是 **node-local startup/warmup**——host 启动时 poke 一下 grain 让它 activate,不是 cluster-wide supervisor。
diff --git a/src/Aevatar.Studio.Application/Studio/Abstractions/IStreamingProxyParticipantQueryPort.cs b/src/Aevatar.Studio.Application/Studio/Abstractions/IStreamingProxyParticipantQueryPort.cs
new file mode 100644
index 000000000..de90762d2
--- /dev/null
+++ b/src/Aevatar.Studio.Application/Studio/Abstractions/IStreamingProxyParticipantQueryPort.cs
@@ -0,0 +1,15 @@
+namespace Aevatar.Studio.Application.Studio.Abstractions;
+
+///
+/// Query-side participant view for streaming proxy rooms.
+///
+public interface IStreamingProxyParticipantQueryPort
+{
+ Task> ListAsync(
+ string roomId, CancellationToken cancellationToken = default);
+}
+
+public sealed record StreamingProxyParticipant(
+ string AgentId,
+ string DisplayName,
+ DateTimeOffset JoinedAt);
diff --git a/src/Aevatar.Studio.Application/Studio/Abstractions/IStreamingProxyParticipantStore.cs b/src/Aevatar.Studio.Application/Studio/Abstractions/IStreamingProxyParticipantStore.cs
deleted file mode 100644
index 7e0cc8d46..000000000
--- a/src/Aevatar.Studio.Application/Studio/Abstractions/IStreamingProxyParticipantStore.cs
+++ /dev/null
@@ -1,33 +0,0 @@
-namespace Aevatar.Studio.Application.Studio.Abstractions;
-
-///
-/// Persistent participant index for streaming proxy rooms.
-///
-///
-/// TODO: When wiring to endpoints, the caller must handle corrupt-data exceptions
-/// from the underlying store (e.g. from
-/// deserialization failures). Swallowing errors and returning an empty list would
-/// silently discard existing participants. The chrono-storage implementation
-/// intentionally throws on corruption to prevent data loss.
-///
-public interface IStreamingProxyParticipantStore
-{
- Task> ListAsync(
- string roomId, CancellationToken cancellationToken = default);
-
- Task AddAsync(
- string roomId, string agentId, string displayName,
- CancellationToken cancellationToken = default);
-
- Task RemoveParticipantAsync(
- string roomId, string agentId,
- CancellationToken cancellationToken = default);
-
- Task RemoveRoomAsync(
- string roomId, CancellationToken cancellationToken = default);
-}
-
-public sealed record StreamingProxyParticipant(
- string AgentId,
- string DisplayName,
- DateTimeOffset JoinedAt);
diff --git a/src/Aevatar.Studio.Hosting/StudioProjectionReadModelServiceCollectionExtensions.cs b/src/Aevatar.Studio.Hosting/StudioProjectionReadModelServiceCollectionExtensions.cs
index 0b1e40e77..9b86a6aa8 100644
--- a/src/Aevatar.Studio.Hosting/StudioProjectionReadModelServiceCollectionExtensions.cs
+++ b/src/Aevatar.Studio.Hosting/StudioProjectionReadModelServiceCollectionExtensions.cs
@@ -8,7 +8,6 @@
using Aevatar.GAgents.ConnectorCatalog;
using Aevatar.GAgents.Registry;
using Aevatar.GAgents.RoleCatalog;
-using Aevatar.GAgents.StreamingProxyParticipant;
using Aevatar.GAgents.StudioMember;
using Aevatar.GAgents.StudioTeam;
using Aevatar.Studio.Workspace;
@@ -30,7 +29,7 @@ namespace Aevatar.Studio.Hosting;
/// configuration. Required by the actor-backed stores
/// (IRoleCatalogStore, IConnectorCatalogStore,
/// IChatHistoryStore, IGAgentActorRegistryQueryPort,
-/// IUserMemoryStore, IStreamingProxyParticipantStore) that read
+/// IUserMemoryStore) that read
/// from these documents via IProjectionDocumentReader.
///
internal static class StudioProjectionReadModelServiceCollectionExtensions
@@ -67,7 +66,6 @@ public static IServiceCollection AddStudioProjectionReadModelProviders(
RegisterElasticsearch(services, configuration);
RegisterElasticsearch(services, configuration);
RegisterElasticsearch(services, configuration);
- RegisterElasticsearch(services, configuration);
RegisterElasticsearch(services, configuration);
RegisterElasticsearch(services, configuration);
RegisterElasticsearch(services, configuration);
@@ -82,7 +80,6 @@ public static IServiceCollection AddStudioProjectionReadModelProviders(
RegisterInMemory(services);
RegisterInMemory(services);
RegisterInMemory(services);
- RegisterInMemory(services);
RegisterInMemory(services);
RegisterInMemory(services);
RegisterInMemory(services);
@@ -134,7 +131,6 @@ private static bool HasAllStudioDocumentReaders(
&& HasDocumentReaderForProvider(services, providerKind)
&& HasDocumentReaderForProvider(services, providerKind)
&& HasDocumentReaderForProvider(services, providerKind)
- && HasDocumentReaderForProvider(services, providerKind)
&& HasDocumentReaderForProvider(services, providerKind)
&& HasDocumentReaderForProvider(services, providerKind)
&& HasDocumentReaderForProvider(services, providerKind)
@@ -219,7 +215,6 @@ private static TypeRegistry BuildStudioStateTypeRegistry()
ConnectorCatalogState.Descriptor,
RoleCatalogState.Descriptor,
UserMemoryState.Descriptor,
- StreamingProxyParticipantGAgentState.Descriptor,
ChatHistoryIndexState.Descriptor,
ChatConversationState.Descriptor,
StudioMemberState.Descriptor,
diff --git a/src/Aevatar.Studio.Infrastructure/ActorBacked/ActorBackedStreamingProxyParticipantStore.cs b/src/Aevatar.Studio.Infrastructure/ActorBacked/ActorBackedStreamingProxyParticipantStore.cs
deleted file mode 100644
index 95152299b..000000000
--- a/src/Aevatar.Studio.Infrastructure/ActorBacked/ActorBackedStreamingProxyParticipantStore.cs
+++ /dev/null
@@ -1,101 +0,0 @@
-using Aevatar.CQRS.Projection.Stores.Abstractions;
-using Aevatar.Foundation.Abstractions;
-using Aevatar.GAgents.StreamingProxyParticipant;
-using Aevatar.Studio.Application.Studio.Abstractions;
-using Aevatar.Studio.Projection.ReadModels;
-using Google.Protobuf.WellKnownTypes;
-using Microsoft.Extensions.Logging;
-
-namespace Aevatar.Studio.Infrastructure.ActorBacked;
-
-///
-/// Actor-backed implementation of .
-/// Reads from the projection document store (CQRS read model).
-/// Writes send commands to the Write GAgent.
-///
-internal sealed class ActorBackedStreamingProxyParticipantStore
- : IStreamingProxyParticipantStore
-{
- private const string WriteActorId = "streaming-proxy-participants";
-
- private readonly IStudioActorBootstrap _bootstrap;
- private readonly IActorDispatchPort _dispatchPort;
- private readonly IProjectionDocumentReader _documentReader;
- private readonly ILogger _logger;
-
- public ActorBackedStreamingProxyParticipantStore(
- IStudioActorBootstrap bootstrap,
- IActorDispatchPort dispatchPort,
- IProjectionDocumentReader documentReader,
- ILogger logger)
- {
- _bootstrap = bootstrap ?? throw new ArgumentNullException(nameof(bootstrap));
- _dispatchPort = dispatchPort ?? throw new ArgumentNullException(nameof(dispatchPort));
- _documentReader = documentReader ?? throw new ArgumentNullException(nameof(documentReader));
- _logger = logger ?? throw new ArgumentNullException(nameof(logger));
- }
-
- public async Task> ListAsync(
- string roomId, CancellationToken cancellationToken = default)
- {
- var document = await _documentReader.GetAsync(WriteActorId, cancellationToken);
- if (document?.StateRoot == null ||
- !document.StateRoot.Is(StreamingProxyParticipantGAgentState.Descriptor))
- return [];
-
- var state = document.StateRoot.Unpack();
- if (!state.Rooms.TryGetValue(roomId, out var list))
- return [];
-
- return list.Participants
- .Select(p => new StreamingProxyParticipant(
- p.AgentId,
- p.DisplayName,
- p.JoinedAt.ToDateTimeOffset()))
- .ToList()
- .AsReadOnly();
- }
-
- public async Task AddAsync(
- string roomId, string agentId, string displayName,
- CancellationToken cancellationToken = default)
- {
- var actor = await EnsureWriteActorAsync(cancellationToken);
- var evt = new ParticipantAddedEvent
- {
- RoomId = roomId,
- AgentId = agentId,
- DisplayName = displayName,
- JoinedAt = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
- };
- await ActorCommandDispatcher.SendAsync(_dispatchPort, actor, evt, cancellationToken);
- }
-
- public async Task RemoveParticipantAsync(
- string roomId, string agentId, CancellationToken cancellationToken = default)
- {
- var actor = await EnsureWriteActorAsync(cancellationToken);
- var evt = new ParticipantRemovedEvent
- {
- RoomId = roomId,
- AgentId = agentId,
- };
- await ActorCommandDispatcher.SendAsync(_dispatchPort, actor, evt, cancellationToken);
- }
-
- public async Task RemoveRoomAsync(
- string roomId, CancellationToken cancellationToken = default)
- {
- var actor = await EnsureWriteActorAsync(cancellationToken);
- var evt = new RoomParticipantsRemovedEvent
- {
- RoomId = roomId,
- };
- await ActorCommandDispatcher.SendAsync(_dispatchPort, actor, evt, cancellationToken);
- }
-
- // ── Actor resolution ──
-
- private Task EnsureWriteActorAsync(CancellationToken ct) =>
- _bootstrap.EnsureAsync(WriteActorId, ct);
-}
diff --git a/src/Aevatar.Studio.Infrastructure/ActorBacked/ProjectionStreamingProxyParticipantQueryPort.cs b/src/Aevatar.Studio.Infrastructure/ActorBacked/ProjectionStreamingProxyParticipantQueryPort.cs
new file mode 100644
index 000000000..8bea4bc0d
--- /dev/null
+++ b/src/Aevatar.Studio.Infrastructure/ActorBacked/ProjectionStreamingProxyParticipantQueryPort.cs
@@ -0,0 +1,49 @@
+using Aevatar.CQRS.Projection.Stores.Abstractions;
+using Aevatar.GAgents.StreamingProxy;
+using Aevatar.Studio.Application.Studio.Abstractions;
+using Microsoft.Extensions.Logging;
+using AppStreamingProxyParticipant = Aevatar.Studio.Application.Studio.Abstractions.StreamingProxyParticipant;
+
+namespace Aevatar.Studio.Infrastructure.ActorBacked;
+
+///
+/// Projection-backed participant query port for StreamingProxy rooms.
+///
+internal sealed class ProjectionStreamingProxyParticipantQueryPort
+ : IStreamingProxyParticipantQueryPort
+{
+ private readonly IProjectionDocumentReader _documentReader;
+ private readonly ILogger _logger;
+
+ public ProjectionStreamingProxyParticipantQueryPort(
+ IProjectionDocumentReader documentReader,
+ ILogger logger)
+ {
+ _documentReader = documentReader ?? throw new ArgumentNullException(nameof(documentReader));
+ _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+ }
+
+ public async Task> ListAsync(
+ string roomId, CancellationToken cancellationToken = default)
+ {
+ var document = await _documentReader.GetAsync(roomId, cancellationToken);
+ if (document?.StateRoot == null ||
+ !document.StateRoot.Is(StreamingProxyGAgentState.Descriptor))
+ {
+ _logger.LogDebug(
+ "StreamingProxy room current-state document is not available for room {RoomId}.",
+ roomId);
+ return [];
+ }
+
+ var state = document.StateRoot.Unpack();
+
+ return state.Participants
+ .Select(p => new AppStreamingProxyParticipant(
+ p.AgentId,
+ p.DisplayName,
+ p.JoinedAt?.ToDateTimeOffset() ?? DateTimeOffset.MinValue))
+ .ToList()
+ .AsReadOnly();
+ }
+}
diff --git a/src/Aevatar.Studio.Infrastructure/Aevatar.Studio.Infrastructure.csproj b/src/Aevatar.Studio.Infrastructure/Aevatar.Studio.Infrastructure.csproj
index e886cb485..f9088ebed 100644
--- a/src/Aevatar.Studio.Infrastructure/Aevatar.Studio.Infrastructure.csproj
+++ b/src/Aevatar.Studio.Infrastructure/Aevatar.Studio.Infrastructure.csproj
@@ -17,7 +17,7 @@
-
+
diff --git a/src/Aevatar.Studio.Infrastructure/DependencyInjection/ServiceCollectionExtensions.cs b/src/Aevatar.Studio.Infrastructure/DependencyInjection/ServiceCollectionExtensions.cs
index 4efa6bc81..6f000651b 100644
--- a/src/Aevatar.Studio.Infrastructure/DependencyInjection/ServiceCollectionExtensions.cs
+++ b/src/Aevatar.Studio.Infrastructure/DependencyInjection/ServiceCollectionExtensions.cs
@@ -44,7 +44,7 @@ public static IServiceCollection AddStudioInfrastructure(
services.AddSingleton(sp => sp.GetRequiredService());
services.AddSingleton(sp => sp.GetRequiredService());
services.AddSingleton(sp => sp.GetRequiredService());
- services.AddSingleton();
+ services.AddSingleton();
services.AddSingleton();
services.AddSingleton();
services.AddSingleton();
diff --git a/src/Aevatar.Studio.Projection/Aevatar.Studio.Projection.csproj b/src/Aevatar.Studio.Projection/Aevatar.Studio.Projection.csproj
index 5146fabeb..f50966e38 100644
--- a/src/Aevatar.Studio.Projection/Aevatar.Studio.Projection.csproj
+++ b/src/Aevatar.Studio.Projection/Aevatar.Studio.Projection.csproj
@@ -20,7 +20,6 @@
-
diff --git a/src/Aevatar.Studio.Projection/DependencyInjection/ServiceCollectionExtensions.cs b/src/Aevatar.Studio.Projection/DependencyInjection/ServiceCollectionExtensions.cs
index e91c1fb14..b3f9f18fd 100644
--- a/src/Aevatar.Studio.Projection/DependencyInjection/ServiceCollectionExtensions.cs
+++ b/src/Aevatar.Studio.Projection/DependencyInjection/ServiceCollectionExtensions.cs
@@ -78,10 +78,6 @@ public static IServiceCollection AddStudioProjectionComponents(
StudioMaterializationContext,
UserMemoryCurrentStateProjector>();
- services.AddCurrentStateProjectionMaterializer<
- StudioMaterializationContext,
- StreamingProxyParticipantCurrentStateProjector>();
-
services.AddCurrentStateProjectionMaterializer<
StudioMaterializationContext,
ChatHistoryIndexCurrentStateProjector>();
@@ -128,10 +124,6 @@ public static IServiceCollection AddStudioProjectionComponents(
IProjectionDocumentMetadataProvider,
UserMemoryCurrentStateDocumentMetadataProvider>();
- services.TryAddSingleton<
- IProjectionDocumentMetadataProvider,
- StreamingProxyParticipantCurrentStateDocumentMetadataProvider>();
-
services.TryAddSingleton<
IProjectionDocumentMetadataProvider,
ChatHistoryIndexCurrentStateDocumentMetadataProvider>();
diff --git a/src/Aevatar.Studio.Projection/ReadModels/studio_projection_readmodels.proto b/src/Aevatar.Studio.Projection/ReadModels/studio_projection_readmodels.proto
index 730c5091d..6ed78ca36 100644
--- a/src/Aevatar.Studio.Projection/ReadModels/studio_projection_readmodels.proto
+++ b/src/Aevatar.Studio.Projection/ReadModels/studio_projection_readmodels.proto
@@ -68,17 +68,6 @@ message UserMemoryCurrentStateDocument {
google.protobuf.Any state_root = 10;
}
-// ─── StreamingProxyParticipant Current State ReadModel ───
-
-message StreamingProxyParticipantCurrentStateDocument {
- string id = 1;
- string actor_id = 2;
- int64 state_version = 3;
- string last_event_id = 4;
- google.protobuf.Timestamp updated_at = 5;
- google.protobuf.Any state_root = 10;
-}
-
// ─── ChatHistoryIndex Current State ReadModel ───
message ChatHistoryIndexCurrentStateDocument {
diff --git a/test/Aevatar.AI.Tests/Aevatar.AI.Tests.csproj b/test/Aevatar.AI.Tests/Aevatar.AI.Tests.csproj
index e4a24545a..21d56e31e 100644
--- a/test/Aevatar.AI.Tests/Aevatar.AI.Tests.csproj
+++ b/test/Aevatar.AI.Tests/Aevatar.AI.Tests.csproj
@@ -27,7 +27,6 @@
-
diff --git a/test/Aevatar.AI.Tests/StreamingProxyCoverageTests.cs b/test/Aevatar.AI.Tests/StreamingProxyCoverageTests.cs
index 0e81a8a5a..7e1d40ca8 100644
--- a/test/Aevatar.AI.Tests/StreamingProxyCoverageTests.cs
+++ b/test/Aevatar.AI.Tests/StreamingProxyCoverageTests.cs
@@ -12,6 +12,7 @@
using Aevatar.CQRS.Core.Abstractions.Streaming;
using Aevatar.CQRS.Core.Commands;
using Aevatar.Foundation.Core.EventSourcing;
+using Aevatar.Foundation.Abstractions.Persistence;
using Aevatar.Foundation.Abstractions.Streaming;
using Aevatar.Studio.Application.Studio.Abstractions;
using Aevatar.GAgentService.Abstractions.ScopeGAgents;
@@ -190,10 +191,9 @@ public async Task HandleListRoomsAsync_ShouldReturnRoomsForScope()
}
[Fact]
- public async Task HandleDeleteRoomAsync_ShouldReturnOk_AndRemoveFromBothStores()
+ public async Task HandleDeleteRoomAsync_ShouldReturnOk_AndRemoveFromRegistry()
{
var actorStore = new StubGAgentActorStore();
- var participantStore = new StubParticipantStore();
var result = await InvokeResultAsync(
"HandleDeleteRoomAsync",
@@ -202,7 +202,6 @@ public async Task HandleDeleteRoomAsync_ShouldReturnOk_AndRemoveFromBothStores()
"room-1",
actorStore,
actorStore,
- participantStore,
NullLoggerFactory.Instance,
CancellationToken.None);
@@ -211,7 +210,6 @@ public async Task HandleDeleteRoomAsync_ShouldReturnOk_AndRemoveFromBothStores()
actorStore.RemovedActors.Should().ContainSingle(x =>
x.scopeId == "scope-a" &&
x.gagentType == StreamingProxyDefaults.GAgentTypeName && x.actorId == "room-1");
- participantStore.RemovedRooms.Should().ContainSingle(x => x == "room-1");
}
[Fact]
@@ -221,8 +219,6 @@ public async Task HandleDeleteRoomAsync_UnregisterFailure_ShouldReturnUnavailabl
{
UnregisterException = new InvalidOperationException("registry unavailable"),
};
- var participantStore = new StubParticipantStore();
-
var result = await InvokeResultAsync(
"HandleDeleteRoomAsync",
CreateScopedHttpContext(),
@@ -230,13 +226,11 @@ public async Task HandleDeleteRoomAsync_UnregisterFailure_ShouldReturnUnavailabl
"room-1",
actorStore,
actorStore,
- participantStore,
NullLoggerFactory.Instance,
CancellationToken.None);
var response = await ExecuteResultAsync(result);
response.StatusCode.Should().Be(StatusCodes.Status503ServiceUnavailable);
- participantStore.RemovedRooms.Should().BeEmpty();
}
[Fact]
@@ -247,14 +241,14 @@ public async Task HandleChatAsync_ShouldRejectEmptyPrompt()
var dispatchPort = new StubActorDispatchPort(runtime);
var interactionService = new StubStreamingProxyRoomChatInteractionService();
var durableCompletionResolver = new StreamingProxyChatDurableCompletionResolver(new StubTerminalQueryPort());
- var participantStore = new StubParticipantStore();
+ var participantQueryPort = new StubParticipantStore();
var actorStore = new StubGAgentActorStore();
var coordinator = CreateNyxParticipantCoordinator();
var method = typeof(StreamingProxyEndpoints).GetMethod(
"HandleChatAsync",
BindingFlags.NonPublic | BindingFlags.Static)!;
- var task = method.Invoke(null, [context, "scope-a", "room-a", new ChatTopicRequest(null), runtime, dispatchPort, actorStore, interactionService, durableCompletionResolver, participantStore, coordinator, NullLoggerFactory.Instance, CancellationToken.None]);
+ var task = method.Invoke(null, [context, "scope-a", "room-a", new ChatTopicRequest(null), runtime, dispatchPort, actorStore, interactionService, durableCompletionResolver, participantQueryPort, coordinator, NullLoggerFactory.Instance, CancellationToken.None]);
await InvokeTaskAsync(task);
context.Response.StatusCode.Should().Be(StatusCodes.Status400BadRequest);
@@ -270,7 +264,7 @@ public async Task HandleChatAsync_ShouldRejectMismatchedAuthenticatedScope()
var dispatchPort = new StubActorDispatchPort(runtime);
var interactionService = new StubStreamingProxyRoomChatInteractionService();
var durableCompletionResolver = new StreamingProxyChatDurableCompletionResolver(new StubTerminalQueryPort());
- var participantStore = new StubParticipantStore();
+ var participantQueryPort = new StubParticipantStore();
var actorStore = new StubGAgentActorStore();
var coordinator = CreateNyxParticipantCoordinator();
@@ -279,7 +273,7 @@ public async Task HandleChatAsync_ShouldRejectMismatchedAuthenticatedScope()
BindingFlags.NonPublic | BindingFlags.Static)!;
var task = method.Invoke(
null,
- [context, "scope-a", "room-a", new ChatTopicRequest("hello"), runtime, dispatchPort, actorStore, interactionService, durableCompletionResolver, participantStore, coordinator, NullLoggerFactory.Instance, CancellationToken.None]);
+ [context, "scope-a", "room-a", new ChatTopicRequest("hello"), runtime, dispatchPort, actorStore, interactionService, durableCompletionResolver, participantQueryPort, coordinator, NullLoggerFactory.Instance, CancellationToken.None]);
await InvokeTaskAsync(task);
context.Response.StatusCode.Should().Be(StatusCodes.Status403Forbidden);
@@ -449,7 +443,7 @@ public async Task HandleChatAsync_ShouldAttachProjectionSession_AndEmitRunFinish
var interactionService = new StubStreamingProxyRoomChatInteractionService();
var durableCompletionResolver = new StreamingProxyChatDurableCompletionResolver(
new StubTerminalQueryPort(StreamingProxyChatSessionTerminalStatus.Completed));
- var participantStore = new StubParticipantStore();
+ var participantQueryPort = new StubParticipantStore();
var actorStore = new StubGAgentActorStore();
var coordinator = CreateNyxParticipantCoordinator();
var request = new ChatTopicRequest("Discuss webhook relay", "session-123");
@@ -536,7 +530,7 @@ public async Task HandleChatAsync_ShouldAttachProjectionSession_AndEmitRunFinish
BindingFlags.NonPublic | BindingFlags.Static)!;
await InvokeTaskAsync(method.Invoke(
null,
- [context, "scope-a", "room-a", request, runtime, dispatchPort, actorStore, interactionService, durableCompletionResolver, participantStore, coordinator, NullLoggerFactory.Instance, CancellationToken.None]));
+ [context, "scope-a", "room-a", request, runtime, dispatchPort, actorStore, interactionService, durableCompletionResolver, participantQueryPort, coordinator, NullLoggerFactory.Instance, CancellationToken.None]));
interactionService.Commands.Should().ContainSingle().Which.Should().Be(new StreamingProxyRoomChatCommand(
"room-a",
@@ -565,7 +559,7 @@ public async Task HandleChatAsync_ShouldPublishFailedTerminalState_WhenCancelled
WaitForCancellation = true,
};
var durableCompletionResolver = new StreamingProxyChatDurableCompletionResolver(new StubTerminalQueryPort());
- var participantStore = new StubParticipantStore();
+ var participantQueryPort = new StubParticipantStore();
var actorStore = new StubGAgentActorStore();
var coordinator = CreateNyxParticipantCoordinator();
using var cts = new CancellationTokenSource();
@@ -575,7 +569,7 @@ public async Task HandleChatAsync_ShouldPublishFailedTerminalState_WhenCancelled
BindingFlags.NonPublic | BindingFlags.Static)!;
var task = InvokeTaskAsync(method.Invoke(
null,
- [context, "scope-a", "room-a", new ChatTopicRequest("Cancel me", "session-cancel"), runtime, dispatchPort, actorStore, interactionService, durableCompletionResolver, participantStore, coordinator, NullLoggerFactory.Instance, cts.Token]));
+ [context, "scope-a", "room-a", new ChatTopicRequest("Cancel me", "session-cancel"), runtime, dispatchPort, actorStore, interactionService, durableCompletionResolver, participantQueryPort, coordinator, NullLoggerFactory.Instance, cts.Token]));
await interactionService.Started.Task;
cts.Cancel();
@@ -824,22 +818,6 @@ await stream.PumpAsync(
emitted.Should().BeEmpty();
}
- [Fact]
- public void DetermineParticipantTerminalState_ShouldFail_WhenNoRepliesWereProduced()
- {
- var method = typeof(StreamingProxyEndpoints).GetMethod(
- "DetermineParticipantTerminalState",
- BindingFlags.NonPublic | BindingFlags.Static)!;
-
- var failed = ((StreamingProxyChatSessionTerminalStatus Status, string? ErrorMessage))method.Invoke(null, [0])!;
- failed.Status.Should().Be(StreamingProxyChatSessionTerminalStatus.Failed);
- failed.ErrorMessage.Should().Be("StreamingProxy chat completed without any participant replies.");
-
- var completed = ((StreamingProxyChatSessionTerminalStatus Status, string? ErrorMessage))method.Invoke(null, [1])!;
- completed.Status.Should().Be(StreamingProxyChatSessionTerminalStatus.Completed);
- completed.ErrorMessage.Should().BeNull();
- }
-
[Fact]
public async Task TryPublishFailedTerminalStateAsync_ShouldEmitFailedTerminalEvent_WhenCompletionIsUnknown()
{
@@ -1059,9 +1037,8 @@ public async Task HandlePostMessageAsync_ShouldRejectMissingFieldsAndReturnAccep
}
[Fact]
- public async Task HandleJoinAsync_ShouldRejectMissingAgentIdAndAddParticipant()
+ public async Task HandleJoinAsync_ShouldRejectMissingAgentIdAndDispatchJoinEvent()
{
- var participantStore = new StubParticipantStore();
var runtime = new StubActorRuntime(new List { new StubActor("room-a") });
var dispatchPort = new StubActorDispatchPort(runtime);
var actorStore = new StubGAgentActorStore();
@@ -1074,7 +1051,6 @@ public async Task HandleJoinAsync_ShouldRejectMissingAgentIdAndAddParticipant()
new JoinRoomRequest(null, null),
runtime,
actorStore,
- participantStore,
NullLoggerFactory.Instance,
CancellationToken.None);
@@ -1091,14 +1067,11 @@ public async Task HandleJoinAsync_ShouldRejectMissingAgentIdAndAddParticipant()
runtime,
dispatchPort,
actorStore,
- participantStore,
NullLoggerFactory.Instance,
CancellationToken.None);
response = await ExecuteResultAsync(result);
response.StatusCode.Should().Be(StatusCodes.Status200OK);
- participantStore.AddedParticipants.Should().ContainSingle(x =>
- x.roomId == "room-a" && x.agentId == "agent-1" && x.displayName == "Alice");
dispatchPort.Dispatches.Should().ContainSingle(x => x.ActorId == "room-a");
}
@@ -1243,10 +1216,10 @@ await WriteRoomSessionEventAsync(
}
[Fact]
- public async Task HandleListParticipantsAsync_ShouldReturnStoreParticipants()
+ public async Task HandleListParticipantsAsync_ShouldReturnQueriedParticipants()
{
- var participantStore = new StubParticipantStore();
- participantStore.Participants["room-a"] =
+ var participantQueryPort = new StubParticipantStore();
+ participantQueryPort.Participants["room-a"] =
[
new StreamingProxyParticipant("agent-1", "Alice", DateTimeOffset.UtcNow),
];
@@ -1257,7 +1230,7 @@ public async Task HandleListParticipantsAsync_ShouldReturnStoreParticipants()
"scope-a",
"room-a",
new StubGAgentActorStore(),
- participantStore,
+ participantQueryPort,
NullLoggerFactory.Instance,
CancellationToken.None);
@@ -1324,6 +1297,97 @@ await agent.HandleGroupChatMessage(new GroupChatMessageEvent
.ContainSingle(x => x.AgentId == "agent-1");
}
+ [Fact]
+ public async Task StreamingProxyGAgent_ShouldPruneFailedNyxParticipant_AndContinueWithNextParticipant()
+ {
+ var coordinator = new RecordingNyxCoordinator();
+ using var provider = BuildStreamingProxyAgentProvider(coordinator);
+ var agent = CreateAgent(provider, "room-a");
+
+ await agent.ActivateAsync();
+ await agent.HandleNyxDiscussionRequested(new StreamingProxyNyxDiscussionRequested
+ {
+ SessionId = "session-a",
+ Prompt = "Discuss release risk",
+ AccessToken = "token-a",
+ Participants =
+ {
+ BuildNyxParticipant("node-a", "Node A"),
+ BuildNyxParticipant("node-b", "Node B"),
+ },
+ });
+
+ coordinator.WorkItems.Should().ContainSingle();
+ coordinator.WorkItems[0].Participant.ParticipantId.Should().Be("node-a");
+
+ await agent.HandleNyxParticipantReplyFailed(new StreamingProxyNyxParticipantReplyFailed
+ {
+ SessionId = "session-a",
+ AccessToken = "token-a",
+ Round = 1,
+ ParticipantId = "node-a",
+ DisplayName = "Node A",
+ ErrorMessage = "unavailable",
+ });
+
+ var session = agent.State.NyxDiscussionSessions["session-a"];
+ session.ActiveParticipants.Select(x => x.ParticipantId).Should().Equal("node-b");
+ session.CurrentParticipantIndex.Should().Be(0);
+ coordinator.WorkItems.Should().HaveCount(2);
+ coordinator.WorkItems[1].Participant.ParticipantId.Should().Be("node-b");
+ agent.State.Participants.Should().NotContain(x => x.AgentId == "node-a");
+ }
+
+ [Fact]
+ public async Task StreamingProxyGAgent_ShouldAdvanceNyxRound_FromActorState()
+ {
+ var coordinator = new RecordingNyxCoordinator();
+ using var provider = BuildStreamingProxyAgentProvider(coordinator);
+ var agent = CreateAgent(provider, "room-a");
+
+ await agent.ActivateAsync();
+ await agent.HandleNyxDiscussionRequested(new StreamingProxyNyxDiscussionRequested
+ {
+ SessionId = "session-a",
+ Prompt = "Discuss release risk",
+ AccessToken = "token-a",
+ Participants =
+ {
+ BuildNyxParticipant("node-a", "Node A"),
+ BuildNyxParticipant("node-b", "Node B"),
+ },
+ });
+
+ await agent.HandleNyxParticipantReplySucceeded(new StreamingProxyNyxParticipantReplySucceeded
+ {
+ SessionId = "session-a",
+ AccessToken = "token-a",
+ Round = 1,
+ ParticipantId = "node-a",
+ DisplayName = "Node A",
+ Content = "First answer.",
+ });
+ await agent.HandleNyxParticipantReplySucceeded(new StreamingProxyNyxParticipantReplySucceeded
+ {
+ SessionId = "session-a",
+ AccessToken = "token-a",
+ Round = 1,
+ ParticipantId = "node-b",
+ DisplayName = "Node B",
+ Content = "Second answer.",
+ });
+
+ var session = agent.State.NyxDiscussionSessions["session-a"];
+ session.CurrentRound.Should().Be(2);
+ session.CurrentParticipantIndex.Should().Be(0);
+ session.CurrentRoundSuccessfulReplies.Should().Be(0);
+ session.TotalSuccessfulReplies.Should().Be(2);
+ session.Transcript.Select(x => x.Speaker).Should().Equal("Node A", "Node B");
+ coordinator.WorkItems.Should().HaveCount(3);
+ coordinator.WorkItems[2].Round.Should().Be(2);
+ coordinator.WorkItems[2].Participant.ParticipantId.Should().Be("node-a");
+ }
+
[Fact]
public async Task StreamingProxySseWriter_ShouldStartStream_AndSerializeRoomFrames()
{
@@ -1372,6 +1436,28 @@ private static StreamingProxyGAgent CreateAgent(IServiceProvider provider, strin
return agent;
}
+ private static ServiceProvider BuildStreamingProxyAgentProvider(
+ StreamingProxyNyxParticipantCoordinator coordinator)
+ {
+ return new ServiceCollection()
+ .AddSingleton()
+ .AddSingleton()
+ .AddTransient(typeof(IEventSourcingBehaviorFactory<>), typeof(DefaultEventSourcingBehaviorFactory<>))
+ .AddSingleton(coordinator)
+ .BuildServiceProvider();
+ }
+
+ private static StreamingProxyNyxParticipant BuildNyxParticipant(
+ string participantId,
+ string displayName) =>
+ new()
+ {
+ ParticipantId = participantId,
+ RoutePreference = $"/api/v1/proxy/s/openclaw/{participantId}",
+ DisplayName = displayName,
+ Model = "claude-sonnet-4-5-20250929",
+ };
+
private static EventEnvelope CreateTopologyEnvelope(IMessage payload) =>
new()
{
@@ -1944,11 +2030,9 @@ public Task CreateRoomAsync(
}
}
- private sealed class StubParticipantStore : IStreamingProxyParticipantStore
+ private sealed class StubParticipantStore : IStreamingProxyParticipantQueryPort
{
public Dictionary> Participants { get; } = new(StringComparer.Ordinal);
- public List<(string roomId, string agentId, string displayName)> AddedParticipants { get; } = [];
- public List RemovedRooms { get; } = [];
public Task> ListAsync(
string roomId, CancellationToken cancellationToken = default)
@@ -1957,26 +2041,6 @@ public Task> ListAsync(
return Task.FromResult>(list.AsReadOnly());
return Task.FromResult>([]);
}
-
- public Task AddAsync(
- string roomId, string agentId, string displayName,
- CancellationToken cancellationToken = default)
- {
- AddedParticipants.Add((roomId, agentId, displayName));
- return Task.CompletedTask;
- }
-
- public Task RemoveParticipantAsync(
- string roomId, string agentId,
- CancellationToken cancellationToken = default)
- => Task.CompletedTask;
-
- public Task RemoveRoomAsync(
- string roomId, CancellationToken cancellationToken = default)
- {
- RemovedRooms.Add(roomId);
- return Task.CompletedTask;
- }
}
private sealed class StubTerminalQueryPort : IStreamingProxyChatSessionTerminalQueryPort
@@ -2043,6 +2107,28 @@ private sealed class StubLlmProviderFactory(ILLMProvider provider) : ILLMProvide
public IReadOnlyList GetAvailableProviders() => [];
}
+ private sealed class RecordingNyxCoordinator()
+ : StreamingProxyNyxParticipantCoordinator(
+ new StubActorDispatchPort(new StubActorRuntime()),
+ new StubLlmProviderFactory(new StubLlmProvider()),
+ new ConfigurationBuilder().Build(),
+ new StubHttpClientFactory(),
+ NullLogger.Instance)
+ {
+ public List WorkItems { get; } = [];
+
+ public override Task RequestParticipantReplyAsync(
+ string roomId,
+ StreamingProxyNyxParticipantWorkItem workItem,
+ CancellationToken ct)
+ {
+ _ = roomId;
+ _ = ct;
+ WorkItems.Add(workItem);
+ return Task.CompletedTask;
+ }
+ }
+
private sealed class StubHttpClientFactory : IHttpClientFactory
{
public HttpClient CreateClient(string name) => new();
diff --git a/test/Aevatar.AI.Tests/StreamingProxyEndpointsCoverageTests.cs b/test/Aevatar.AI.Tests/StreamingProxyEndpointsCoverageTests.cs
index f6e332a67..edede901d 100644
--- a/test/Aevatar.AI.Tests/StreamingProxyEndpointsCoverageTests.cs
+++ b/test/Aevatar.AI.Tests/StreamingProxyEndpointsCoverageTests.cs
@@ -96,9 +96,9 @@ public async Task HandleCreateRoomAsync_ShouldMapCommandFailureToServerError()
}
[Fact]
- public async Task HandleListParticipantsAsync_ShouldReturnStoredParticipants()
+ public async Task HandleListParticipantsAsync_ShouldReturnQueriedParticipants()
{
- var participantStore = new RecordingParticipantStore
+ var participantQueryPort = new RecordingParticipantStore
{
Participants =
[
@@ -111,7 +111,7 @@ public async Task HandleListParticipantsAsync_ShouldReturnStoredParticipants()
CreateScopedHttpContext(),
"scope-a",
"room-1",
- participantStore,
+ participantQueryPort,
loggerFactory,
CancellationToken.None);
@@ -123,9 +123,9 @@ public async Task HandleListParticipantsAsync_ShouldReturnStoredParticipants()
}
[Fact]
- public async Task HandleListParticipantsAsync_ShouldReturnServerError_WhenStoreThrows()
+ public async Task HandleListParticipantsAsync_ShouldReturnServerError_WhenQueryPortThrows()
{
- var participantStore = new RecordingParticipantStore
+ var participantQueryPort = new RecordingParticipantStore
{
ThrowOnList = new InvalidOperationException("list failed"),
};
@@ -135,7 +135,7 @@ public async Task HandleListParticipantsAsync_ShouldReturnServerError_WhenStoreT
CreateScopedHttpContext(),
"scope-a",
"room-1",
- participantStore,
+ participantQueryPort,
loggerFactory,
CancellationToken.None);
@@ -172,14 +172,14 @@ public async Task HandleCreateRoomAsync_ShouldRejectMismatchedAuthenticatedScope
[Fact]
public async Task HandleListParticipantsAsync_ShouldRejectMismatchedAuthenticatedScope()
{
- var participantStore = new RecordingParticipantStore();
+ var participantQueryPort = new RecordingParticipantStore();
var loggerFactory = LoggerFactory.Create(_ => { });
var result = await InvokeHandleListParticipantsAsync(
CreateScopedHttpContext("scope-b"),
"scope-a",
"room-1",
- participantStore,
+ participantQueryPort,
loggerFactory,
CancellationToken.None);
@@ -206,13 +206,13 @@ private static async Task InvokeHandleListParticipantsAsync(
HttpContext context,
string scopeId,
string roomId,
- IStreamingProxyParticipantStore participantStore,
+ IStreamingProxyParticipantQueryPort participantQueryPort,
ILoggerFactory loggerFactory,
CancellationToken ct)
{
return await (Task)HandleListParticipantsAsyncMethod.Invoke(
null,
- [context, scopeId, roomId, new RecordingGAgentActorStore([]), participantStore, loggerFactory, ct])!;
+ [context, scopeId, roomId, new RecordingGAgentActorStore([]), participantQueryPort, loggerFactory, ct])!;
}
private static async Task<(int StatusCode, string Body)> ExecuteResultAsync(IResult result)
@@ -319,7 +319,7 @@ public Task CreateRoomAsync(
}
}
- private sealed class RecordingParticipantStore : IStreamingProxyParticipantStore
+ private sealed class RecordingParticipantStore : IStreamingProxyParticipantQueryPort
{
public Exception? ThrowOnList { get; init; }
public IReadOnlyList Participants { get; init; } = [];
@@ -335,33 +335,6 @@ public Task> ListAsync(
return Task.FromResult(Participants);
}
- public Task AddAsync(
- string roomId,
- string agentId,
- string displayName,
- CancellationToken cancellationToken = default)
- {
- _ = roomId;
- _ = agentId;
- _ = displayName;
- return Task.CompletedTask;
- }
-
- public Task RemoveParticipantAsync(
- string roomId,
- string agentId,
- CancellationToken cancellationToken = default)
- {
- _ = roomId;
- _ = agentId;
- return Task.CompletedTask;
- }
-
- public Task RemoveRoomAsync(string roomId, CancellationToken cancellationToken = default)
- {
- _ = roomId;
- return Task.CompletedTask;
- }
}
private sealed class RecordingActorRuntime(List operations, IActor actor) : IActorRuntime
diff --git a/test/Aevatar.AI.Tests/StreamingProxyNyxParticipantCoordinatorTests.cs b/test/Aevatar.AI.Tests/StreamingProxyNyxParticipantCoordinatorTests.cs
index 4fe52c88a..755943362 100644
--- a/test/Aevatar.AI.Tests/StreamingProxyNyxParticipantCoordinatorTests.cs
+++ b/test/Aevatar.AI.Tests/StreamingProxyNyxParticipantCoordinatorTests.cs
@@ -43,11 +43,11 @@ public async Task EnsureParticipantsJoinedAsync_ShouldPreserveDistinctNodesWithS
joinedEvents.Should().HaveCount(3);
joinedEvents.Select(evt => evt.AgentId).Should().OnlyHaveUniqueItems();
- store.ListParticipants("room-1").Should().HaveCount(3);
+ store.ListParticipants("room-1").Should().BeEmpty();
}
[Fact]
- public async Task GenerateRepliesAsync_ShouldSkipUnavailableOpenerAndContinueWithHealthyParticipant()
+ public async Task RequestDiscussionAsync_ShouldDispatchDiscussionRequestWithoutLocalProgression()
{
var (coordinator, actor, store, llmProvider) = CreateCoordinator();
var participants = await coordinator.EnsureParticipantsJoinedAsync(
@@ -61,42 +61,32 @@ public async Task GenerateRepliesAsync_ShouldSkipUnavailableOpenerAndContinueWit
var roomParticipants = participants.Take(2).ToList();
- await coordinator.GenerateRepliesAsync(
+ await coordinator.RequestDiscussionAsync(
roomParticipants,
actor,
"Discuss the roadmap for the next release.",
"session-1",
"test-token",
- CancellationToken.None,
- store,
- "room-1");
+ CancellationToken.None);
- llmProvider.Requests.Should().HaveCount(2);
- llmProvider.Requests[0].RequestId.Should().Contain("node-a");
- llmProvider.Requests[1].RequestId.Should().Contain("node-b");
+ llmProvider.Requests.Should().BeEmpty();
- var messageEvents = actor.Events
- .Where(envelope => envelope.Payload!.Is(GroupChatMessageEvent.Descriptor))
- .Select(envelope => envelope.Payload!.Unpack())
+ var requests = actor.Events
+ .Where(envelope => envelope.Payload!.Is(StreamingProxyNyxDiscussionRequested.Descriptor))
+ .Select(envelope => envelope.Payload!.Unpack())
.ToList();
- var leftEvents = actor.Events
- .Where(envelope => envelope.Payload!.Is(GroupChatParticipantLeftEvent.Descriptor))
- .Select(envelope => envelope.Payload!.Unpack())
- .ToList();
-
- messageEvents.Should().HaveCount(1);
- messageEvents.Should().NotContain(evt => evt.Content.StartsWith("当前暂时不可用", StringComparison.Ordinal));
- messageEvents.Single().Content.Should().Contain("reply from");
- messageEvents.Single().Content.Should().Contain("node-b");
- messageEvents.Select(evt => evt.AgentId).Should().OnlyHaveUniqueItems();
- leftEvents.Should().HaveCount(1);
- leftEvents.Single().AgentId.Should().Contain("node-a");
- store.ListParticipants("room-1").Should().HaveCount(2);
+ requests.Should().ContainSingle();
+ requests.Single().SessionId.Should().Be("session-1");
+ requests.Single().Prompt.Should().Be("Discuss the roadmap for the next release.");
+ requests.Single().Participants.Should().HaveCount(2);
+ actor.Events.Should().NotContain(envelope => envelope.Payload!.Is(GroupChatMessageEvent.Descriptor));
+ actor.Events.Should().NotContain(envelope => envelope.Payload!.Is(GroupChatParticipantLeftEvent.Descriptor));
+ store.ListParticipants("room-1").Should().BeEmpty();
}
[Fact]
- public async Task GenerateRepliesAsync_ShouldIgnoreUnavailableTextResponseAndContinueWithHealthyParticipant()
+ public async Task RequestParticipantReplyAsync_ShouldReturnFailureContinuation_WhenResponseIsUnavailable()
{
var (coordinator, actor, store, llmProvider) = CreateCoordinator(request =>
{
@@ -114,52 +104,35 @@ public async Task GenerateRepliesAsync_ShouldIgnoreUnavailableTextResponseAndCon
};
});
- var participants = await coordinator.EnsureParticipantsJoinedAsync(
- "scope-1",
- "room-1",
- actor,
- store,
- "test-token",
- CancellationToken.None,
- preferredRoute: "/api/v1/proxy/s/openclaw/node-a");
-
- var roomParticipants = participants.Take(2).ToList();
+ var roomParticipants = BuildParticipants();
- await coordinator.GenerateRepliesAsync(
- roomParticipants,
- actor,
- "Discuss the roadmap for the next release.",
- "session-1",
- "test-token",
- CancellationToken.None,
- store,
- "room-1");
+ await coordinator.RequestParticipantReplyAsync(
+ "room-1",
+ new StreamingProxyNyxParticipantWorkItem(
+ "session-1",
+ "test-token",
+ 1,
+ StreamingProxyDefaults.MaxDiscussionRounds,
+ roomParticipants[0],
+ roomParticipants,
+ []),
+ CancellationToken.None);
- llmProvider.Requests.Should().HaveCount(2);
+ llmProvider.Requests.Should().HaveCount(1);
llmProvider.Requests[0].RequestId.Should().Contain("node-a");
- llmProvider.Requests[1].RequestId.Should().Contain("node-b");
- var messageEvents = actor.Events
- .Where(envelope => envelope.Payload!.Is(GroupChatMessageEvent.Descriptor))
- .Select(envelope => envelope.Payload!.Unpack())
+ var failures = actor.Events
+ .Where(envelope => envelope.Payload!.Is(StreamingProxyNyxParticipantReplyFailed.Descriptor))
+ .Select(envelope => envelope.Payload!.Unpack())
.ToList();
- var leftEvents = actor.Events
- .Where(envelope => envelope.Payload!.Is(GroupChatParticipantLeftEvent.Descriptor))
- .Select(envelope => envelope.Payload!.Unpack())
- .ToList();
-
- messageEvents.Should().HaveCount(1);
- messageEvents.Single().Content.Should().Contain("reply from");
- messageEvents.Single().Content.Should().Contain("node-b");
- messageEvents.Should().NotContain(evt => evt.Content.Contains("503", StringComparison.OrdinalIgnoreCase));
- leftEvents.Should().HaveCount(1);
- leftEvents.Single().AgentId.Should().Contain("node-a");
- store.ListParticipants("room-1").Should().HaveCount(2);
+ failures.Should().ContainSingle();
+ failures.Single().ParticipantId.Should().Contain("node-a");
+ actor.Events.Should().NotContain(envelope => envelope.Payload!.Is(GroupChatParticipantLeftEvent.Descriptor));
}
[Fact]
- public async Task GenerateRepliesAsync_ShouldUseStreamContentWhenSynchronousContentIsMissing()
+ public async Task RequestParticipantReplyAsync_ShouldReturnSuccessContinuation_WhenStreamContentArrives()
{
var (coordinator, actor, store, llmProvider) = CreateCoordinator(
responseFactory: _ => new LLMResponse(),
@@ -169,43 +142,31 @@ public async Task GenerateRepliesAsync_ShouldUseStreamContentWhenSynchronousCont
new LLMStreamChunk { FinishReason = "stop", IsLast = true },
]);
- var participants = await coordinator.EnsureParticipantsJoinedAsync(
- "scope-1",
- "room-1",
- actor,
- store,
- "test-token",
- CancellationToken.None,
- preferredRoute: "/api/v1/proxy/s/openclaw/node-b");
-
- var roomParticipants = participants.Take(1).ToList();
+ var roomParticipants = BuildParticipants().Skip(1).Take(1).ToList();
- await coordinator.GenerateRepliesAsync(
- roomParticipants,
- actor,
- "Discuss the roadmap for the next release.",
- "session-1",
- "test-token",
- CancellationToken.None,
- store,
- "room-1");
+ await coordinator.RequestParticipantReplyAsync(
+ "room-1",
+ new StreamingProxyNyxParticipantWorkItem(
+ "session-1",
+ "test-token",
+ 1,
+ 1,
+ roomParticipants[0],
+ roomParticipants,
+ []),
+ CancellationToken.None);
llmProvider.Requests.Should().HaveCount(1);
- var messageEvents = actor.Events
- .Where(envelope => envelope.Payload!.Is(GroupChatMessageEvent.Descriptor))
- .Select(envelope => envelope.Payload!.Unpack())
- .ToList();
-
- var leftEvents = actor.Events
- .Where(envelope => envelope.Payload!.Is(GroupChatParticipantLeftEvent.Descriptor))
- .Select(envelope => envelope.Payload!.Unpack())
+ var replies = actor.Events
+ .Where(envelope => envelope.Payload!.Is(StreamingProxyNyxParticipantReplySucceeded.Descriptor))
+ .Select(envelope => envelope.Payload!.Unpack())
.ToList();
- messageEvents.Should().HaveCount(1);
- messageEvents.Single().Content.Should().Contain("streamed reply from");
- messageEvents.Single().SessionId.Should().Be("session-1");
- leftEvents.Should().BeEmpty();
+ replies.Should().ContainSingle();
+ replies.Single().Content.Should().Contain("streamed reply from");
+ replies.Single().SessionId.Should().Be("session-1");
+ actor.Events.Should().NotContain(envelope => envelope.Payload!.Is(GroupChatMessageEvent.Descriptor));
}
[Fact]
@@ -226,13 +187,26 @@ public async Task EnsureParticipantsJoinedAsync_ShouldFallbackToLegacyStatus_Whe
participants.Should().ContainSingle();
participants.Single().RoutePreference.Should().Be("/api/v1/proxy/s/openclaw/legacy");
participants.Single().Model.Should().Be("legacy-model");
- store.ListParticipants("room-1").Should().ContainSingle(entry =>
- entry.AgentId.Contains("svc-legacy", StringComparison.OrdinalIgnoreCase));
+ store.ListParticipants("room-1").Should().BeEmpty();
}
private static (StreamingProxyNyxParticipantCoordinator Coordinator, RecordingActor Actor, RecordingParticipantStore Store, RecordingLlmProvider Provider) CreateCoordinator()
=> CreateCoordinator(null);
+ private static List BuildParticipants() =>
+ [
+ new(
+ "svc-node-a|node-a|/api/v1/proxy/s/openclaw/node-a",
+ "/api/v1/proxy/s/openclaw/node-a",
+ "OpenClaw-Node (1)",
+ "claude-sonnet-4-5-20250929"),
+ new(
+ "svc-node-b|node-b|/api/v1/proxy/s/openclaw/node-b",
+ "/api/v1/proxy/s/openclaw/node-b",
+ "OpenClaw-Node (2)",
+ "claude-sonnet-4-5-20250929"),
+ ];
+
private static (StreamingProxyNyxParticipantCoordinator Coordinator, RecordingActor Actor, RecordingParticipantStore Store, RecordingLlmProvider Provider) CreateCoordinator(
Func? responseFactory)
=> CreateCoordinator(responseFactory, null);
@@ -486,7 +460,7 @@ public Task DispatchAsync(string actorId, EventEnvelope envelope, CancellationTo
}
}
- private sealed class RecordingParticipantStore : IStreamingProxyParticipantStore
+ private sealed class RecordingParticipantStore : IStreamingProxyParticipantQueryPort
{
private readonly Dictionary> _rooms = new(StringComparer.OrdinalIgnoreCase);
@@ -500,44 +474,6 @@ public Task> ListAsync(
return Task.FromResult(participants);
}
- public Task AddAsync(
- string roomId,
- string agentId,
- string displayName,
- CancellationToken cancellationToken = default)
- {
- if (!_rooms.TryGetValue(roomId, out var participants))
- {
- participants = [];
- _rooms[roomId] = participants;
- }
-
- participants.RemoveAll(entry => string.Equals(entry.AgentId, agentId, StringComparison.OrdinalIgnoreCase));
- participants.Add(new ParticipantStoreEntry(agentId, displayName, DateTimeOffset.UtcNow));
- return Task.CompletedTask;
- }
-
- public Task RemoveParticipantAsync(
- string roomId,
- string agentId,
- CancellationToken cancellationToken = default)
- {
- if (_rooms.TryGetValue(roomId, out var participants))
- {
- participants.RemoveAll(entry => string.Equals(entry.AgentId, agentId, StringComparison.OrdinalIgnoreCase));
- if (participants.Count == 0)
- _rooms.Remove(roomId);
- }
-
- return Task.CompletedTask;
- }
-
- public Task RemoveRoomAsync(string roomId, CancellationToken cancellationToken = default)
- {
- _rooms.Remove(roomId);
- return Task.CompletedTask;
- }
-
public IReadOnlyList ListParticipants(string roomId) =>
_rooms.TryGetValue(roomId, out var participants)
? participants
diff --git a/test/Aevatar.Studio.Tests/StreamingProxyRoomCurrentStateProjectorTests.cs b/test/Aevatar.Studio.Tests/StreamingProxyRoomCurrentStateProjectorTests.cs
new file mode 100644
index 000000000..4f82c39b4
--- /dev/null
+++ b/test/Aevatar.Studio.Tests/StreamingProxyRoomCurrentStateProjectorTests.cs
@@ -0,0 +1,109 @@
+using Aevatar.CQRS.Projection.Core.Abstractions;
+using Aevatar.CQRS.Projection.Runtime.Abstractions;
+using Aevatar.CQRS.Projection.Stores.Abstractions;
+using Aevatar.Foundation.Abstractions;
+using Aevatar.GAgents.StreamingProxy;
+using FluentAssertions;
+using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
+
+namespace Aevatar.Studio.Tests;
+
+public sealed class StreamingProxyRoomCurrentStateProjectorTests
+{
+ [Fact]
+ public async Task ProjectAsync_ShouldUpsertRoomCurrentState_FromCommittedRoomState()
+ {
+ var dispatcher = new RecordingWriteDispatcher();
+ var projector = new StreamingProxyRoomCurrentStateProjector(
+ dispatcher,
+ new FixedProjectionClock(DateTimeOffset.Parse("2026-05-25T00:00:00Z")));
+ var joinedAt = Timestamp.FromDateTimeOffset(DateTimeOffset.Parse("2026-05-25T10:00:00Z"));
+
+ await projector.ProjectAsync(
+ new StreamingProxyCurrentStateProjectionContext
+ {
+ RootActorId = "room-a",
+ ProjectionKind = "streaming-proxy-current-state",
+ },
+ WrapCommitted(
+ new GroupChatParticipantJoinedEvent
+ {
+ AgentId = "agent-1",
+ DisplayName = "Alice",
+ },
+ new StreamingProxyGAgentState
+ {
+ RoomName = "Room A",
+ Participants =
+ {
+ new StreamingProxyParticipant
+ {
+ AgentId = "agent-1",
+ DisplayName = "Alice",
+ JoinedAt = joinedAt,
+ },
+ },
+ },
+ version: 7,
+ eventId: "evt-7"));
+
+ dispatcher.Upserts.Should().ContainSingle();
+ var document = dispatcher.Upserts[0];
+ document.Id.Should().Be("room-a");
+ document.ActorId.Should().Be("room-a");
+ document.StateVersion.Should().Be(7);
+ document.LastEventId.Should().Be("evt-7");
+ document.StateRoot.Is(StreamingProxyGAgentState.Descriptor).Should().BeTrue();
+ document.StateRoot.Unpack().Participants
+ .Should().ContainSingle(p => p.AgentId == "agent-1" && p.DisplayName == "Alice");
+ }
+
+ private static EventEnvelope WrapCommitted(
+ IMessage payload,
+ IMessage state,
+ long version,
+ string eventId)
+ {
+ return new EventEnvelope
+ {
+ Id = "env-1",
+ Timestamp = Timestamp.FromDateTime(DateTime.UtcNow),
+ Payload = Any.Pack(new CommittedStateEventPublished
+ {
+ StateEvent = new StateEvent
+ {
+ Version = version,
+ EventId = eventId,
+ EventData = Any.Pack(payload),
+ Timestamp = Timestamp.FromDateTime(DateTime.UtcNow),
+ },
+ StateRoot = Any.Pack(state),
+ }),
+ };
+ }
+
+ private sealed class RecordingWriteDispatcher : IProjectionWriteDispatcher
+ where TReadModel : class, IProjectionReadModel
+ {
+ public List Upserts { get; } = [];
+
+ public Task UpsertAsync(TReadModel readModel, CancellationToken ct = default)
+ {
+ ct.ThrowIfCancellationRequested();
+ Upserts.Add(readModel);
+ return Task.FromResult(ProjectionWriteResult.Applied());
+ }
+
+ public Task DeleteAsync(string id, CancellationToken ct = default)
+ {
+ ct.ThrowIfCancellationRequested();
+ return Task.FromResult(ProjectionWriteResult.Applied());
+ }
+ }
+
+ private sealed class FixedProjectionClock(DateTimeOffset utcNow) : IProjectionClock
+ {
+ public DateTimeOffset UtcNow { get; } = utcNow;
+ }
+}