Skip to content

Commit a3c5d7c

Browse files
committed
fix: break completion loop when teammates and lead ping-pong after task done (#3)
Certain models (e.g. Kimi K2.6) ignore 'STOP' instructions and send courtesy replies after a teammate reports completion, creating an infinite wake cycle between lead and teammate. - Add reported_to_lead flag (migration 7) set on busy->ready transition when teammate has sent at least one message to lead - Reset flag on ready->busy so re-activated teammates can receive messages - Skip promptAsync delivery to completed teammates in team_message - Skip broadcast delivery to completed teammates - Skip recovery redelivery to completed teammates - Add wake-lead cooldown (5s dedup) + all-done check - Skip peer-flush and nudge for completed teammates - 7 regression tests including Q&A-still-works verification
1 parent ad53368 commit a3c5d7c

8 files changed

Lines changed: 224 additions & 9 deletions

File tree

src/hooks.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,26 @@ export function handleSessionStatusEvent(
4141
"UPDATE team_member SET status = ?, execution_status = 'idle', time_updated = ? WHERE team_id = ? AND name = ?",
4242
[newStatus, Date.now(), entry.teamId, entry.memberName]
4343
)
44+
// Mark teammate as having reported if they sent at least one message to lead (issue #3).
45+
// Set on busy→ready transition so Q&A messages during work don't prematurely block delivery.
46+
if (member.status === "busy" && newStatus === "ready") {
47+
const leadMsgCount = (db.query(
48+
"SELECT COUNT(*) as c FROM team_message WHERE team_id = ? AND from_name = ? AND to_name = 'lead'"
49+
).get(entry.teamId, entry.memberName) as { c: number }).c
50+
if (leadMsgCount > 0) {
51+
db.run(
52+
"UPDATE team_member SET reported_to_lead = 1 WHERE team_id = ? AND name = ?",
53+
[entry.teamId, entry.memberName]
54+
)
55+
}
56+
}
4457
return { memberName: entry.memberName, teamId: entry.teamId, from: member.status, to: newStatus }
4558
} else if (status === "busy") {
4659
if (member.status === "ready" || member.status === "error") {
60+
// Reset reported_to_lead so re-activated teammates can receive messages again (issue #3).
61+
// INVARIANT: every promptAsync delivery path must check hasReportedCompletion() to prevent loops.
4762
db.run(
48-
"UPDATE team_member SET status = 'busy', execution_status = 'running', time_updated = ? WHERE team_id = ? AND name = ?",
63+
"UPDATE team_member SET status = 'busy', execution_status = 'running', reported_to_lead = 0, time_updated = ? WHERE team_id = ? AND name = ?",
4964
[Date.now(), entry.teamId, entry.memberName]
5065
)
5166
return { memberName: entry.memberName, teamId: entry.teamId, from: member.status, to: "busy" }

src/index.ts

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { MemberRegistry, DescendantTracker } from "./state"
1010
import { isWorktreeInstance } from "./util"
1111
import { handleSessionStatusEvent, handleSessionCreatedEvent, checkToolIsolation, shouldNudgeIdleMember } from "./hooks"
1212
import { notifyTeamEvent, notifyWorkingProgress } from "./notify"
13-
import { sendMessage } from "./messaging"
13+
import { sendMessage, hasReportedCompletion } from "./messaging"
1414
import { buildLeadSystemPrompt, buildTeammateSystemPrompt, buildTeamCompactionContext } from "./system-prompt"
1515
import { log, initLog } from "./log"
1616
import { findTeamBySession } from "./types"
@@ -58,6 +58,8 @@ const plugin: Plugin = async (input) => {
5858
const tracker = new DescendantTracker()
5959
const nudgedMembers = new Set<string>()
6060
const progressTracker = new ProgressTracker()
61+
const wakeLeadTimestamps = new Map<string, number>()
62+
const WAKE_LEAD_COOLDOWN_MS = 5000
6163

6264
// Extract the working HeyAPI transport from the plugin-provided v1 client and pass it
6365
// to the v2 OpencodeClient. The plugin framework provides a v1 client which stores its
@@ -176,8 +178,9 @@ const plugin: Plugin = async (input) => {
176178
}
177179

178180
// Nudge teammate if they went idle without reporting to the lead (once only)
181+
// Skip if they already reported completion (issue #3 — prevents re-waking completed teammates)
179182
const nudgeKey = `${transition.teamId}:${transition.memberName}`
180-
if (!nudgedMembers.has(nudgeKey) && shouldNudgeIdleMember(db, transition.teamId, transition.memberName)) {
183+
if (!nudgedMembers.has(nudgeKey) && shouldNudgeIdleMember(db, transition.teamId, transition.memberName) && !hasReportedCompletion(db, transition.teamId, transition.memberName)) {
181184
nudgedMembers.add(nudgeKey)
182185
log(`nudge:idle-without-report name=${transition.memberName}`)
183186
client.session.promptAsync({
@@ -232,7 +235,13 @@ const plugin: Plugin = async (input) => {
232235
const team = db.query("SELECT id FROM team WHERE lead_session_id = ? AND status = 'active'").get(sessionID) as { id: string } | null
233236
if (team) {
234237
const pending = db.query("SELECT COUNT(*) as c FROM team_message WHERE team_id = ? AND to_name = 'lead' AND delivered = 0").get(team.id) as { c: number }
235-
if (pending.c > 0) {
238+
// Skip wake if all teammates are done or if we woke recently (issue #3 — breaks completion loop)
239+
const allDone = (db.query(
240+
"SELECT COUNT(*) as c FROM team_member WHERE team_id = ? AND status NOT IN ('ready', 'shutdown', 'error')"
241+
).get(team.id) as { c: number }).c === 0
242+
const lastWake = wakeLeadTimestamps.get(team.id) ?? 0
243+
if (pending.c > 0 && !allDone && Date.now() - lastWake > WAKE_LEAD_COOLDOWN_MS) {
244+
wakeLeadTimestamps.set(team.id, Date.now())
236245
log(`wake-lead: ${pending.c} pending messages, sending promptAsync`)
237246
client.session.promptAsync({
238247
sessionID,
@@ -250,7 +259,7 @@ const plugin: Plugin = async (input) => {
250259
JOIN team t ON tm.team_id = t.id
251260
WHERE tm.session_id = ? AND t.status = 'active'`
252261
).get(sessionID) as { team_id: string; name: string } | null
253-
if (member) {
262+
if (member && !hasReportedCompletion(db, member.team_id, member.name)) {
254263
const staleThreshold = Date.now() - 5000
255264
const peerMsgs = db.query(
256265
"SELECT COUNT(*) as c FROM team_message WHERE team_id = ? AND to_name = ? AND delivered = 0 AND time_created < ?"

src/messaging.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,14 @@ export function getUndeliveredMessages(db: Database, teamId: string): MessageRow
7676
export function markDelivered(db: Database, messageId: string): void {
7777
db.run("UPDATE team_message SET delivered = 1 WHERE id = ?", [messageId])
7878
}
79+
80+
/**
81+
* Check if a teammate has reported completion to the lead.
82+
* Returns true if the reported_to_lead flag is set on the member.
83+
*/
84+
export function hasReportedCompletion(db: Database, teamId: string, memberName: string): boolean {
85+
const row = db.query(
86+
"SELECT reported_to_lead FROM team_member WHERE team_id = ? AND name = ?"
87+
).get(teamId, memberName) as { reported_to_lead: number } | null
88+
return row?.reported_to_lead === 1
89+
}

src/recovery.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { Database } from "bun:sqlite"
22
import type { PluginClient } from "./types"
33
import type { MemberRegistry } from "./state"
4-
import { getUndeliveredMessages, markDelivered } from "./messaging"
4+
import { getUndeliveredMessages, markDelivered, hasReportedCompletion } from "./messaging"
55
import { preserveBranch, preservedBranchName } from "./tools/merge-helper"
66
import { log } from "./log"
77

@@ -123,6 +123,12 @@ export async function recoverUndeliveredMessages(
123123

124124
if (!recipientSessionId) continue
125125

126+
// Skip delivery to teammates who have already reported completion (issue #3)
127+
if (hasReportedCompletion(db, team.id, msg.to_name!)) {
128+
markDelivered(db, msg.id)
129+
continue
130+
}
131+
126132
try {
127133
await client.session.promptAsync({
128134
sessionID: recipientSessionId,

src/schema.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ export const MIGRATIONS: string[] = [
8080
`ALTER TABLE team ADD COLUMN lead_agent TEXT;`,
8181
// Migration 6: Track workspace ID for worktree-session binding
8282
`ALTER TABLE team_member ADD COLUMN workspace_id TEXT;`,
83+
// Migration 7: Track whether teammate has reported to lead (completion loop prevention, issue #3)
84+
`ALTER TABLE team_member ADD COLUMN reported_to_lead INTEGER NOT NULL DEFAULT 0;`,
8385
]
8486

8587
/**

src/tools/team-broadcast.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { ToolDeps } from "../types"
22
import { requireTeamMember } from "./shared"
3-
import { broadcastMessage, markDelivered } from "../messaging"
3+
import { broadcastMessage, markDelivered, hasReportedCompletion } from "../messaging"
44
import { log } from "../log"
55

66
/**
@@ -40,8 +40,14 @@ export async function executeTeamBroadcast(
4040
}
4141

4242
// Fire-and-forget: deliver to all recipients. Message is already persisted in DB.
43+
// Skip completed teammates to prevent re-waking them (issue #3).
4344
let delivered = 0
45+
let skipped = 0
4446
for (const recipient of recipients) {
47+
if (recipient.name !== "lead" && hasReportedCompletion(deps.db, teamInfo.teamId, recipient.name)) {
48+
skipped++
49+
continue
50+
}
4551
deps.client.session.promptAsync({
4652
sessionID: recipient.sessionId,
4753
parts: [{ type: "text", text: `[Team broadcast from ${senderName}]: ${args.text}` }],
@@ -53,5 +59,6 @@ export async function executeTeamBroadcast(
5359
})
5460
}
5561

56-
return `Broadcast sent to ${recipients.length} recipient${recipients.length !== 1 ? "s" : ""}.`
62+
const sent = recipients.length - skipped
63+
return `Broadcast sent to ${sent} recipient${sent !== 1 ? "s" : ""}.`
5764
}

src/tools/team-message.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { ToolDeps } from "../types"
22
import { resolveRecipientSession } from "../types"
33
import { requireTeamMember } from "./shared"
4-
import { sendMessage, markDelivered } from "../messaging"
4+
import { sendMessage, markDelivered, hasReportedCompletion } from "../messaging"
55
import { log } from "../log"
66

77
/**
@@ -90,6 +90,11 @@ export async function executeTeamMessage(
9090
return `Message sent to ${args.to}.`
9191
}
9292

93+
// Guard: skip promptAsync delivery to teammates who have already reported completion (issue #3)
94+
if (hasReportedCompletion(deps.db, teamInfo.teamId, args.to)) {
95+
return `Message stored for ${args.to} (teammate has completed their task — message will not wake them).`
96+
}
97+
9398
// For member-to-member messages, fire-and-forget delivery is safe.
9499
const deliveryText = `[Team message from ${senderName}]: ${messageText}`
95100
deps.client.session.promptAsync({

test/completion-loop.test.ts

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
import { describe, test, expect, beforeEach } from "bun:test"
2+
import { setupDeps, insertTeam, insertMember } from "./helpers"
3+
import { executeTeamCreate } from "../src/tools/team-create"
4+
import { executeTeamSpawn } from "../src/tools/team-spawn"
5+
import { executeTeamMessage } from "../src/tools/team-message"
6+
import { handleSessionStatusEvent } from "../src/hooks"
7+
import { hasReportedCompletion } from "../src/messaging"
8+
import { sendMessage } from "../src/messaging"
9+
10+
type Deps = ReturnType<typeof setupDeps>
11+
12+
describe("issue #3: completion loop prevention", () => {
13+
let deps: Deps
14+
const leadSession = "lead-sess"
15+
16+
beforeEach(() => {
17+
deps = setupDeps()
18+
})
19+
20+
/** Helper: spawn a teammate, have them message lead, then transition busy→ready. */
21+
async function spawnAndComplete(teamName: string, memberName: string): Promise<{ teamId: string; memberSession: string }> {
22+
await executeTeamCreate(deps, { name: teamName }, leadSession)
23+
const team = deps.db.query("SELECT id FROM team WHERE name = ?").get(teamName) as { id: string }
24+
await executeTeamSpawn(deps, { name: memberName, agent: "build", prompt: "task", worktree: false }, leadSession)
25+
const memberSession = (deps.db.query("SELECT session_id FROM team_member WHERE name = ?").get(memberName) as { session_id: string }).session_id
26+
27+
// Teammate messages lead
28+
await executeTeamMessage(deps, { to: "lead", text: "here are my findings" }, memberSession)
29+
30+
// Simulate busy→ready transition (teammate finished work)
31+
deps.db.run("UPDATE team_member SET status = 'busy' WHERE team_id = ? AND name = ?", [team.id, memberName])
32+
handleSessionStatusEvent(deps.db, deps.registry, memberSession, "idle")
33+
34+
return { teamId: team.id, memberSession }
35+
}
36+
37+
test("hasReportedCompletion is false after messaging lead but BEFORE going idle", async () => {
38+
await executeTeamCreate(deps, { name: "report-team" }, leadSession)
39+
const team = deps.db.query("SELECT id FROM team WHERE name = 'report-team'").get() as { id: string }
40+
await executeTeamSpawn(deps, { name: "alice", agent: "build", prompt: "task", worktree: false }, leadSession)
41+
const aliceSession = (deps.db.query("SELECT session_id FROM team_member WHERE name = 'alice'").get() as { session_id: string }).session_id
42+
43+
expect(hasReportedCompletion(deps.db, team.id, "alice")).toBe(false)
44+
45+
// Alice messages lead — flag should NOT be set yet (she's still working)
46+
await executeTeamMessage(deps, { to: "lead", text: "done with my task" }, aliceSession)
47+
expect(hasReportedCompletion(deps.db, team.id, "alice")).toBe(false)
48+
49+
// Alice goes idle (busy→ready) — NOW the flag should be set
50+
deps.db.run("UPDATE team_member SET status = 'busy' WHERE team_id = ? AND name = 'alice'", [team.id])
51+
handleSessionStatusEvent(deps.db, deps.registry, aliceSession, "idle")
52+
expect(hasReportedCompletion(deps.db, team.id, "alice")).toBe(true)
53+
})
54+
55+
test("hasReportedCompletion stays false if teammate goes idle WITHOUT messaging lead", async () => {
56+
await executeTeamCreate(deps, { name: "no-msg-team" }, leadSession)
57+
const team = deps.db.query("SELECT id FROM team WHERE name = 'no-msg-team'").get() as { id: string }
58+
await executeTeamSpawn(deps, { name: "bob", agent: "build", prompt: "task", worktree: false }, leadSession)
59+
const bobSession = (deps.db.query("SELECT session_id FROM team_member WHERE name = 'bob'").get() as { session_id: string }).session_id
60+
61+
// Bob goes idle without ever messaging lead
62+
deps.db.run("UPDATE team_member SET status = 'busy' WHERE team_id = ? AND name = 'bob'", [team.id])
63+
handleSessionStatusEvent(deps.db, deps.registry, bobSession, "idle")
64+
expect(hasReportedCompletion(deps.db, team.id, "bob")).toBe(false)
65+
})
66+
67+
test("messages to completed teammates are stored but NOT pushed via promptAsync", async () => {
68+
const { teamId, memberSession } = await spawnAndComplete("guard-team", "charlie")
69+
deps.client.calls.length = 0
70+
71+
// Lead sends a reply (this is what Kimi K2.6 does — courtesy replies)
72+
const result = await executeTeamMessage(deps, { to: "charlie", text: "thanks charlie!" }, leadSession)
73+
74+
// Message IS stored in DB
75+
const msg = deps.db.query("SELECT content FROM team_message WHERE team_id = ? AND to_name = 'charlie' AND from_name = 'lead'").get(teamId) as { content: string } | null
76+
expect(msg).toBeTruthy()
77+
expect(msg!.content).toBe("thanks charlie!")
78+
79+
// But promptAsync was NOT called to deliver it
80+
const promptCalls = deps.client.calls.filter(c => c.method === "session.promptAsync")
81+
expect(promptCalls).toHaveLength(0)
82+
83+
// Return value warns the lead
84+
expect(result).toContain("completed")
85+
})
86+
87+
test("wake-lead skips when all teammates are ready/shutdown", async () => {
88+
const { teamId } = await spawnAndComplete("done-team", "dave")
89+
90+
// Insert another undelivered message to lead (simulating the loop)
91+
sendMessage(deps.db, { teamId, from: "dave", to: "lead", content: "duplicate" })
92+
93+
// All members should be in a terminal state
94+
const activeBusy = deps.db.query(
95+
"SELECT COUNT(*) as c FROM team_member WHERE team_id = ? AND status NOT IN ('ready', 'shutdown', 'error')"
96+
).get(teamId) as { c: number }
97+
expect(activeBusy.c).toBe(0)
98+
})
99+
100+
test("peer-flush skips for completed teammates", async () => {
101+
const { teamId } = await spawnAndComplete("flush-team", "eve")
102+
103+
expect(hasReportedCompletion(deps.db, teamId, "eve")).toBe(true)
104+
105+
// Insert a stale peer message addressed to eve (simulating the loop)
106+
deps.db.run(
107+
"INSERT INTO team_message (id, team_id, from_name, to_name, content, delivered, time_created) VALUES (?, ?, 'lead', 'eve', 'follow up', 0, ?)",
108+
["msg-stale", teamId, Date.now() - 10_000]
109+
)
110+
111+
const reported = deps.db.query("SELECT reported_to_lead FROM team_member WHERE team_id = ? AND name = 'eve'").get(teamId) as { reported_to_lead: number }
112+
expect(reported.reported_to_lead).toBe(1)
113+
})
114+
115+
test("full ping-pong regression: promptAsync calls are bounded after completion", async () => {
116+
const { teamId, memberSession } = await spawnAndComplete("loop-team", "frank")
117+
118+
// Reset call log — everything after this should be bounded
119+
deps.client.calls.length = 0
120+
121+
// Simulate the ping-pong loop that Kimi K2.6 triggers:
122+
const reply1 = await executeTeamMessage(deps, { to: "frank", text: "thanks for the report" }, leadSession)
123+
const reply2 = await executeTeamMessage(deps, { to: "frank", text: "anything else?" }, leadSession)
124+
125+
// Both replies should be stored but NOT delivered
126+
expect(reply1).toContain("completed")
127+
expect(reply2).toContain("completed")
128+
129+
// Zero promptAsync calls to frank's session after he reported
130+
const frankCalls = deps.client.calls.filter(c => {
131+
if (c.method !== "session.promptAsync") return false
132+
const args = c.args[0] as { sessionID: string }
133+
return args.sessionID === memberSession
134+
})
135+
expect(frankCalls).toHaveLength(0)
136+
})
137+
138+
test("teammate can still receive messages BEFORE going idle (Q&A works)", async () => {
139+
await executeTeamCreate(deps, { name: "qa-team" }, leadSession)
140+
const team = deps.db.query("SELECT id FROM team WHERE name = 'qa-team'").get() as { id: string }
141+
await executeTeamSpawn(deps, { name: "grace", agent: "build", prompt: "task", worktree: false }, leadSession)
142+
const graceSession = (deps.db.query("SELECT session_id FROM team_member WHERE name = 'grace'").get() as { session_id: string }).session_id
143+
144+
// Grace asks lead a question (messages lead, but is still busy)
145+
await executeTeamMessage(deps, { to: "lead", text: "I have a question about the API" }, graceSession)
146+
147+
// Grace is NOT marked as completed yet (still busy)
148+
expect(hasReportedCompletion(deps.db, team.id, "grace")).toBe(false)
149+
150+
deps.client.calls.length = 0
151+
152+
// Lead answers — this SHOULD be delivered (grace hasn't completed)
153+
const result = await executeTeamMessage(deps, { to: "grace", text: "use the v2 endpoint" }, leadSession)
154+
155+
// Message was delivered via promptAsync (not blocked)
156+
expect(result).toBe("Message sent to grace.")
157+
const promptCalls = deps.client.calls.filter(c => c.method === "session.promptAsync")
158+
expect(promptCalls).toHaveLength(1)
159+
})
160+
})

0 commit comments

Comments
 (0)