Skip to content

Commit 1c2463c

Browse files
google-genai-botcopybara-github
authored andcommitted
feat: Add Compaction RequestProcessor for event compaction in llm flow
PiperOrigin-RevId: 864704217
1 parent 9901307 commit 1c2463c

File tree

2 files changed

+216
-0
lines changed

2 files changed

+216
-0
lines changed
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.adk.flows.llmflows;
18+
19+
import com.google.adk.agents.InvocationContext;
20+
import com.google.adk.models.LlmRequest;
21+
import com.google.adk.summarizer.EventsCompactionConfig;
22+
import com.google.adk.summarizer.TailRetentionEventCompactor;
23+
import com.google.common.collect.ImmutableList;
24+
import io.reactivex.rxjava3.core.Single;
25+
import java.util.Optional;
26+
27+
/** Request processor that performs event compaction. */
28+
public class Compaction implements RequestProcessor {
29+
30+
@Override
31+
public Single<RequestProcessingResult> processRequest(
32+
InvocationContext context, LlmRequest request) {
33+
Optional<EventsCompactionConfig> configOpt = context.eventsCompactionConfig();
34+
35+
if (configOpt.isEmpty()) {
36+
return Single.just(RequestProcessingResult.create(request, ImmutableList.of()));
37+
}
38+
39+
EventsCompactionConfig config = configOpt.get();
40+
41+
if (config.tokenThreshold() == null || config.eventRetentionSize() == null) {
42+
return Single.just(RequestProcessingResult.create(request, ImmutableList.of()));
43+
}
44+
45+
// Extract out the retention size and token threshold from the new config.
46+
int retentionSize = config.eventRetentionSize();
47+
int tokenThreshold = config.tokenThreshold();
48+
49+
// Summarizer will not be missing since the runner will always add a default one if missing.
50+
TailRetentionEventCompactor compactor =
51+
new TailRetentionEventCompactor(config.summarizer(), retentionSize, tokenThreshold);
52+
53+
return compactor
54+
.compact(context.session(), context.sessionService())
55+
.andThen(
56+
Single.just(
57+
RequestProcessor.RequestProcessingResult.create(request, ImmutableList.of())));
58+
}
59+
}
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.adk.flows.llmflows;
18+
19+
import static org.mockito.ArgumentMatchers.any;
20+
import static org.mockito.ArgumentMatchers.eq;
21+
import static org.mockito.Mockito.mock;
22+
import static org.mockito.Mockito.never;
23+
import static org.mockito.Mockito.verify;
24+
import static org.mockito.Mockito.when;
25+
26+
import com.google.adk.agents.InvocationContext;
27+
import com.google.adk.events.Event;
28+
import com.google.adk.models.LlmRequest;
29+
import com.google.adk.sessions.BaseSessionService;
30+
import com.google.adk.sessions.Session;
31+
import com.google.adk.summarizer.BaseEventSummarizer;
32+
import com.google.adk.summarizer.EventsCompactionConfig;
33+
import com.google.common.collect.ImmutableList;
34+
import com.google.genai.types.GenerateContentResponseUsageMetadata;
35+
import io.reactivex.rxjava3.core.Maybe;
36+
import io.reactivex.rxjava3.core.Single;
37+
import java.util.Optional;
38+
import org.junit.Before;
39+
import org.junit.Test;
40+
import org.junit.runner.RunWith;
41+
import org.junit.runners.JUnit4;
42+
43+
@RunWith(JUnit4.class)
44+
public class CompactionTest {
45+
46+
private InvocationContext context;
47+
private LlmRequest request;
48+
private Session session;
49+
private BaseSessionService sessionService;
50+
private BaseEventSummarizer summarizer;
51+
52+
@Before
53+
public void setUp() {
54+
context = mock(InvocationContext.class);
55+
request = LlmRequest.builder().build();
56+
session = Session.builder("test-session").build();
57+
sessionService = mock(BaseSessionService.class);
58+
summarizer = mock(BaseEventSummarizer.class);
59+
60+
when(context.session()).thenReturn(session);
61+
when(context.sessionService()).thenReturn(sessionService);
62+
}
63+
64+
@Test
65+
public void processRequest_noConfig_doesNothing() {
66+
when(context.eventsCompactionConfig()).thenReturn(Optional.empty());
67+
68+
Compaction compaction = new Compaction();
69+
compaction
70+
.processRequest(context, request)
71+
.test()
72+
.assertNoErrors()
73+
.assertValue(r -> r.updatedRequest() == request);
74+
75+
verify(sessionService, never()).appendEvent(any(), any());
76+
}
77+
78+
@Test
79+
public void processRequest_withConfig_triggersCompaction() {
80+
// Setup config with threshold 100
81+
EventsCompactionConfig config = new EventsCompactionConfig(5, 1, summarizer, 100, 2);
82+
when(context.eventsCompactionConfig()).thenReturn(Optional.of(config));
83+
84+
// Setup events with usage > 100 to trigger compaction
85+
Event event1 = mock(Event.class);
86+
Event event2 = mock(Event.class);
87+
Event event3 = mock(Event.class);
88+
when(event3.usageMetadata())
89+
.thenReturn(
90+
Optional.of(
91+
GenerateContentResponseUsageMetadata.builder().promptTokenCount(200).build()));
92+
93+
session =
94+
Session.builder("test-session").events(ImmutableList.of(event1, event2, event3)).build();
95+
when(context.session()).thenReturn(session);
96+
97+
// Summarizer mock
98+
Event summaryEvent = mock(Event.class);
99+
when(summarizer.summarizeEvents(any())).thenReturn(Maybe.just(summaryEvent));
100+
when(sessionService.appendEvent(any(), any())).thenReturn(Single.just(summaryEvent));
101+
102+
Compaction compaction = new Compaction();
103+
compaction
104+
.processRequest(context, request)
105+
.test()
106+
.assertNoErrors()
107+
.assertValue(r -> r.updatedRequest() == request);
108+
109+
// Verify compaction happened and result was appended
110+
verify(sessionService).appendEvent(eq(session), eq(summaryEvent));
111+
}
112+
113+
@Test
114+
public void processRequest_withConfig_skipsCompactionIfBelowThreshold() {
115+
// Setup config with threshold 500
116+
EventsCompactionConfig config = new EventsCompactionConfig(5, 1, summarizer, 500, 2);
117+
when(context.eventsCompactionConfig()).thenReturn(Optional.of(config));
118+
119+
// Setup events with usage 200 (below 500)
120+
Event event3 = mock(Event.class);
121+
when(event3.usageMetadata())
122+
.thenReturn(
123+
Optional.of(
124+
GenerateContentResponseUsageMetadata.builder().promptTokenCount(200).build()));
125+
126+
session = Session.builder("test-session").events(ImmutableList.of(event3)).build();
127+
when(context.session()).thenReturn(session);
128+
129+
Compaction compaction = new Compaction();
130+
compaction
131+
.processRequest(context, request)
132+
.test()
133+
.assertNoErrors()
134+
.assertValue(r -> r.updatedRequest() == request);
135+
136+
// Verify NO compaction
137+
verify(sessionService, never()).appendEvent(any(), any());
138+
}
139+
140+
@Test
141+
public void processRequest_withConfig_nullRetentionSize_doesNothing() {
142+
// Setup config with retentionSize = null
143+
EventsCompactionConfig config = new EventsCompactionConfig(5, 1, summarizer, 100, null);
144+
when(context.eventsCompactionConfig()).thenReturn(Optional.of(config));
145+
146+
Compaction compaction = new Compaction();
147+
compaction
148+
.processRequest(context, request)
149+
.test()
150+
.assertNoErrors()
151+
.assertValue(r -> r.updatedRequest() == request);
152+
153+
// Verify NO compaction and session.events() is not called
154+
verify(sessionService, never()).appendEvent(any(), any());
155+
verify(context, never()).session();
156+
}
157+
}

0 commit comments

Comments
 (0)