Skip to content

Commit 8142552

Browse files
Mark HendersonMark Henderson
authored andcommitted
fix(core): add dispose functions to prevent subscription memory leaks
- Add dispose() to Share, ShareNext, Plugin, and Format namespaces - Add cleanupSession() and dispose() to ACP Agent with AbortControllers - Add Bus._getSubscriptionCount() test helpers - Add memory tests to verify cleanup works correctly Supersedes anomalyco#7032 Fixes anomalyco#3013
1 parent a5b6c57 commit 8142552

File tree

10 files changed

+1349
-250
lines changed

10 files changed

+1349
-250
lines changed

packages/opencode/src/acp/agent.ts

Lines changed: 285 additions & 241 deletions
Large diffs are not rendered by default.

packages/opencode/src/bus/index.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,4 +102,19 @@ export namespace Bus {
102102
match.splice(index, 1)
103103
}
104104
}
105+
106+
/** @internal Test helper to get subscription count for a specific event type */
107+
export function _getSubscriptionCount(type: string): number {
108+
const match = state().subscriptions.get(type)
109+
return match?.length ?? 0
110+
}
111+
112+
/** @internal Test helper to get total subscription count across all event types */
113+
export function _getTotalSubscriptionCount(): number {
114+
let total = 0
115+
for (const subs of state().subscriptions.values()) {
116+
total += subs.length
117+
}
118+
return total
119+
}
105120
}

packages/opencode/src/format/index.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,14 @@ export namespace Format {
100100
return result
101101
}
102102

103+
// Store unsubscribe functions for cleanup
104+
const unsubscribers: Array<() => void> = []
105+
103106
export function init() {
104107
log.info("init")
105-
Bus.subscribe(File.Event.Edited, async (payload) => {
108+
// Clean up any existing subscriptions before adding new ones
109+
dispose()
110+
const unsub = Bus.subscribe(File.Event.Edited, async (payload) => {
106111
const file = payload.properties.file
107112
log.info("formatting", { file })
108113
const ext = path.extname(file)
@@ -133,5 +138,14 @@ export namespace Format {
133138
}
134139
}
135140
})
141+
unsubscribers.push(unsub)
142+
}
143+
144+
export function dispose() {
145+
for (const unsub of unsubscribers) {
146+
unsub()
147+
}
148+
unsubscribers.length = 0
149+
log.info("disposed format subscriptions")
136150
}
137151
}

packages/opencode/src/plugin/index.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,20 +99,34 @@ export namespace Plugin {
9999
return state().then((x) => x.hooks)
100100
}
101101

102+
// Store unsubscribe functions for cleanup
103+
const unsubscribers: Array<() => void> = []
104+
102105
export async function init() {
106+
// Clean up any existing subscriptions before adding new ones
107+
dispose()
103108
const hooks = await state().then((x) => x.hooks)
104109
const config = await Config.get()
105110
for (const hook of hooks) {
106111
// @ts-expect-error this is because we haven't moved plugin to sdk v2
107112
await hook.config?.(config)
108113
}
109-
Bus.subscribeAll(async (input) => {
114+
const unsub = Bus.subscribeAll(async (input) => {
110115
const hooks = await state().then((x) => x.hooks)
111116
for (const hook of hooks) {
112117
hook["event"]?.({
113118
event: input,
114119
})
115120
}
116121
})
122+
unsubscribers.push(unsub)
123+
}
124+
125+
export function dispose() {
126+
for (const unsub of unsubscribers) {
127+
unsub()
128+
}
129+
unsubscribers.length = 0
130+
log.info("disposed plugin subscriptions")
117131
}
118132
}

packages/opencode/src/share/share-next.ts

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,31 @@ import type * as SDK from "@opencode-ai/sdk/v2"
1010

1111
export namespace ShareNext {
1212
const log = Log.create({ service: "share-next" })
13+
let disposed = false
14+
15+
// Store unsubscribe functions for cleanup
16+
const unsubscribers: Array<() => void> = []
1317

1418
async function url() {
1519
return Config.get().then((x) => x.enterprise?.url ?? "https://opncd.ai")
1620
}
1721

1822
export async function init() {
19-
Bus.subscribe(Session.Event.Updated, async (evt) => {
23+
// Clean up any existing subscriptions before adding new ones
24+
dispose()
25+
disposed = false
26+
27+
const unsub1 = Bus.subscribe(Session.Event.Updated, async (evt) => {
28+
if (disposed) return
2029
await sync(evt.properties.info.id, [
2130
{
2231
type: "session",
2332
data: evt.properties.info,
2433
},
2534
])
2635
})
27-
Bus.subscribe(MessageV2.Event.Updated, async (evt) => {
36+
const unsub2 = Bus.subscribe(MessageV2.Event.Updated, async (evt) => {
37+
if (disposed) return
2838
await sync(evt.properties.info.sessionID, [
2939
{
3040
type: "message",
@@ -44,22 +54,39 @@ export namespace ShareNext {
4454
])
4555
}
4656
})
47-
Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
57+
const unsub3 = Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
58+
if (disposed) return
4859
await sync(evt.properties.part.sessionID, [
4960
{
5061
type: "part",
5162
data: evt.properties.part,
5263
},
5364
])
5465
})
55-
Bus.subscribe(Session.Event.Diff, async (evt) => {
66+
const unsub4 = Bus.subscribe(Session.Event.Diff, async (evt) => {
67+
if (disposed) return
5668
await sync(evt.properties.sessionID, [
5769
{
5870
type: "session_diff",
5971
data: evt.properties.diff,
6072
},
6173
])
6274
})
75+
unsubscribers.push(unsub1, unsub2, unsub3, unsub4)
76+
}
77+
78+
export function dispose() {
79+
disposed = true
80+
for (const unsub of unsubscribers) {
81+
unsub()
82+
}
83+
unsubscribers.length = 0
84+
// Clear pending timeouts
85+
for (const entry of queue.values()) {
86+
clearTimeout(entry.timeout)
87+
}
88+
queue.clear()
89+
log.info("disposed share-next subscriptions")
6390
}
6491

6592
export async function create(sessionID: string) {
@@ -191,4 +218,16 @@ export namespace ShareNext {
191218
},
192219
])
193220
}
221+
222+
/** @internal Test helper to get queue size */
223+
export function _getQueueSize(): number {
224+
return queue.size
225+
}
226+
227+
/** @internal Test helper to add items to queue for testing dispose cleanup */
228+
export function _addToQueueForTesting(sessionID: string) {
229+
const dataMap = new Map<string, Data>()
230+
const timeout = setTimeout(() => {}, 10000)
231+
queue.set(sessionID, { timeout, data: dataMap })
232+
}
194233
}

packages/opencode/src/share/share.ts

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,14 @@ export namespace Share {
99

1010
let queue: Promise<void> = Promise.resolve()
1111
const pending = new Map<string, any>()
12+
let disposed = false
13+
14+
// Store unsubscribe functions for cleanup
15+
const unsubscribers: Array<() => void> = []
1216

1317
export async function sync(key: string, content: any) {
18+
// Skip if disposed
19+
if (disposed) return
1420
const [root, ...splits] = key.split("/")
1521
if (root !== "session") return
1622
const [sub, sessionID] = splits
@@ -21,6 +27,8 @@ export namespace Share {
2127
pending.set(key, content)
2228
queue = queue
2329
.then(async () => {
30+
// Check if disposed before processing
31+
if (disposed) return
2432
const content = pending.get(key)
2533
if (content === undefined) return
2634
pending.delete(key)
@@ -46,13 +54,17 @@ export namespace Share {
4654
}
4755

4856
export function init() {
49-
Bus.subscribe(Session.Event.Updated, async (evt) => {
57+
// Clean up any existing subscriptions before adding new ones
58+
dispose()
59+
disposed = false
60+
61+
const unsub1 = Bus.subscribe(Session.Event.Updated, async (evt) => {
5062
await sync("session/info/" + evt.properties.info.id, evt.properties.info)
5163
})
52-
Bus.subscribe(MessageV2.Event.Updated, async (evt) => {
64+
const unsub2 = Bus.subscribe(MessageV2.Event.Updated, async (evt) => {
5365
await sync("session/message/" + evt.properties.info.sessionID + "/" + evt.properties.info.id, evt.properties.info)
5466
})
55-
Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
67+
const unsub3 = Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
5668
await sync(
5769
"session/part/" +
5870
evt.properties.part.sessionID +
@@ -63,6 +75,18 @@ export namespace Share {
6375
evt.properties.part,
6476
)
6577
})
78+
unsubscribers.push(unsub1, unsub2, unsub3)
79+
}
80+
81+
export function dispose() {
82+
disposed = true
83+
for (const unsub of unsubscribers) {
84+
unsub()
85+
}
86+
unsubscribers.length = 0
87+
pending.clear()
88+
queue = Promise.resolve()
89+
log.info("disposed share subscriptions")
6690
}
6791

6892
export const URL =

0 commit comments

Comments
 (0)