Skip to content

Commit efe58d6

Browse files
google-genai-botcopybara-github
authored andcommitted
feat: Introduce TailRetentionEventCompactor to compact and retain the tail of the event stream
Provide a way to manage the size of an event stream Specifically, it: * Keeps the retentionSize most recent events raw. * Compacts all events that never compacted and older than the retained tail, including the most recent compaction events, into a new summary event. * Appends this new summary event to the end of the event stream. PiperOrigin-RevId: 864748009
1 parent 59e87d3 commit efe58d6

File tree

2 files changed

+439
-0
lines changed

2 files changed

+439
-0
lines changed
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.adk.summarizer;
18+
19+
import static com.google.common.base.Preconditions.checkArgument;
20+
21+
import com.google.adk.events.Event;
22+
import com.google.adk.events.EventCompaction;
23+
import com.google.adk.sessions.BaseSessionService;
24+
import com.google.adk.sessions.Session;
25+
import io.reactivex.rxjava3.core.Completable;
26+
import io.reactivex.rxjava3.core.Maybe;
27+
import java.util.ArrayList;
28+
import java.util.Collections;
29+
import java.util.List;
30+
import java.util.ListIterator;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
/**
35+
* This class performs event compaction by retaining the tail of the event stream.
36+
*
37+
* <ul>
38+
* <li>Keeps the {@code retentionSize} most recent events raw.
39+
* <li>Compacts all events that never compacted and older than the retained tail, including the
40+
* most recent compaction event, into a new summary event.
41+
* <li>The new summary event is generated by the {@link BaseEventSummarizer}.
42+
* <li>Appends this new summary event to the end of the event stream.
43+
* </ul>
44+
*
45+
* <p>This compactor produces a rolling summary. Each new compaction event includes the content of
46+
* the previous compaction event (if any) along with new events, effectively superseding all prior
47+
* compactions.
48+
*/
49+
public final class TailRetentionEventCompactor implements EventCompactor {
50+
51+
private static final Logger logger = LoggerFactory.getLogger(TailRetentionEventCompactor.class);
52+
53+
private final BaseEventSummarizer summarizer;
54+
private final int retentionSize;
55+
56+
public TailRetentionEventCompactor(BaseEventSummarizer summarizer, int retentionSize) {
57+
this.summarizer = summarizer;
58+
this.retentionSize = retentionSize;
59+
}
60+
61+
@Override
62+
public Completable compact(Session session, BaseSessionService sessionService) {
63+
checkArgument(summarizer != null, "Missing BaseEventSummarizer for event compaction");
64+
logger.debug("Running tail retention event compaction for session {}", session.id());
65+
66+
return Completable.fromMaybe(
67+
getCompactionEvents(session.events())
68+
.flatMap(summarizer::summarizeEvents)
69+
.flatMapSingle(e -> sessionService.appendEvent(session, e)));
70+
}
71+
72+
/**
73+
* Identifies events to be compacted based on the tail retention strategy.
74+
*
75+
* <p>This method iterates backwards through the event list to find the most recent compaction
76+
* event (if any) and collects all uncompacted events that occurred after the range covered by
77+
* that compaction. It then applies the retention policy, excluding the most recent {@code
78+
* retentionSize} events from being compacted.
79+
*
80+
* <p><b>Basic Scenario:</b>
81+
*
82+
* <ul>
83+
* <li>Events: E1, E2, E3, E4, E5 (Chronological order)
84+
* <li>Retention Size: 2
85+
* <li>Action: Compaction is triggered. The compactor identifies E1, E2, and E3 as eligible
86+
* since E4, E5 need to be retained.
87+
* <li>Result: E1, E2, E3 are compacted into C1.
88+
* <li>Event stream after compaction: E1, E2, E3, E4, E5, C1. (Compaction event is appended in
89+
* the end.)
90+
* </ul>
91+
*
92+
* <p><b>Advanced Scenario (Handling Gaps):</b>
93+
*
94+
* <p>Consider an edge case where retention size is 3. Event E4 appears before the last compaction
95+
* event (C2) and even the one prior (C1), but remains uncompacted and must be included in the
96+
* third compaction (C3).
97+
*
98+
* <ul>
99+
* <li>T=1: E1
100+
* <li>T=2: E2
101+
* <li>T=3: E3
102+
* <li>T=4: E4
103+
* <li>T=5: C1 (Covers T=1). Generated when getCompactionEvents returned <i>List: E1</i>. E2,
104+
* E3, E4 were preserved.
105+
* <li>T=6: E6
106+
* <li>T=7: E7
107+
* <li>T=8: C2 (Covers T=1 to T=3; starts at T=1 because it includes C1). Generated when
108+
* getCompactionEvents returned <i>List: C1, E2, E3</i>. E4, E6, E7 were preserved.
109+
* <li>T=9: E9.
110+
* </ul>
111+
*
112+
* <p><b>Execution with Retention = 3:</b>
113+
*
114+
* <ol>
115+
* <li>The method scans backward: E9, C2, E7, E6, C1, E4...
116+
* <li><b>C2</b> is identified as the most recent compaction event (end timestamp T=3).
117+
* <li><b>E9, E7, E6</b> are collected as they are newer than T=3.
118+
* <li><b>C1</b> is ignored as we only care about the boundary set by the latest compaction.
119+
* <li><b>E4</b> (T=4) is collected because it is newer than T=3.
120+
* <li>Scanning stops at E3 as it is covered by C2 (timestamp <= T=3).
121+
* <li>The initial list of events to summarize: <b>[E9, E7, E6, E4]</b>.
122+
* <li>After appending the compaction event C2, the list becomes: <b>[E9, E7, E6, E4, C2]</b>
123+
* <li>Reversing the list: <b>[C2, E4, E6, E7, E9]</b>.
124+
* <li>Applying retention (keep last 3): <b>E6, E7, E9</b> are removed from the summary list.
125+
* <li><b>Final Output:</b> {@code [C2, E4]}. E4 and the previous summary C2 will be compacted
126+
* together. The new compaction event will cover the range from the start of the included
127+
* compaction event (C2, T=1) to the end of the new events (E4, T=4).
128+
* </ol>
129+
*/
130+
private Maybe<List<Event>> getCompactionEvents(List<Event> events) {
131+
long compactionEndTimestamp = Long.MIN_VALUE;
132+
Event lastCompactionEvent = null;
133+
List<Event> eventsToSummarize = new ArrayList<>();
134+
135+
// Iterate backwards from the end of the window to summarize.
136+
// We use a single loop to:
137+
// 1. Collect all raw events that happened after the latest compaction.
138+
// 2. Identify the latest compaction event to establish the stop condition (boundary).
139+
ListIterator<Event> iter = events.listIterator(events.size());
140+
while (iter.hasPrevious()) {
141+
Event event = iter.previous();
142+
143+
if (!isCompactEvent(event)) {
144+
// Only include events that are strictly after the last compaction range.
145+
if (event.timestamp() > compactionEndTimestamp) {
146+
eventsToSummarize.add(event);
147+
continue;
148+
} else {
149+
// Exit early if we have reached the last event of last compaction range.
150+
break;
151+
}
152+
}
153+
154+
EventCompaction compaction = event.actions().compaction().orElse(null);
155+
// We use the most recent compaction event to define the time boundary. Any subsequent (older)
156+
// compaction events are ignored.
157+
if (lastCompactionEvent == null) {
158+
compactionEndTimestamp = compaction.endTimestamp();
159+
lastCompactionEvent = event;
160+
}
161+
}
162+
163+
// If there are not enough events to summarize, we can return early.
164+
if (eventsToSummarize.size() <= retentionSize) {
165+
return Maybe.empty();
166+
}
167+
168+
// Add the last compaction event to the list of events to summarize.
169+
// This is to ensure that the last compaction event is included in the summary.
170+
if (lastCompactionEvent != null) {
171+
EventCompaction compaction = lastCompactionEvent.actions().compaction().get();
172+
eventsToSummarize.add(
173+
lastCompactionEvent.toBuilder()
174+
.content(compaction.compactedContent())
175+
// Use the start timestamp so that the new summary covers the entire range.
176+
.timestamp(compaction.startTimestamp())
177+
.build());
178+
}
179+
180+
Collections.reverse(eventsToSummarize);
181+
182+
// Apply retention: keep the most recent 'retentionSize' events out of the summary.
183+
// We do this by removing them from the list of events to be summarized.
184+
eventsToSummarize
185+
.subList(eventsToSummarize.size() - retentionSize, eventsToSummarize.size())
186+
.clear();
187+
return Maybe.just(eventsToSummarize);
188+
}
189+
190+
private static boolean isCompactEvent(Event event) {
191+
return event.actions() != null && event.actions().compaction().isPresent();
192+
}
193+
}

0 commit comments

Comments
 (0)