Skip to content

Commit aefeae0

Browse files
wdauchycursoragent
andcommitted
cache: add prefix/glob watch support to LinearCache for LEDS
Envoy's LEDS uses delta xDS with glob collection subscriptions where clients subscribe to resource names ending in /* (e.g. ns_svc_8080/us-east-1a/*) and expect to receive all individual resources whose name starts with that prefix. Currently LinearCache only supports exact-name watches and full wildcard watches. A subscription to collection/* is stored as a literal key in resourceWatches and never matches individual resources under that prefix. Add a new prefixWatches map to LinearCache that is keyed by the prefix (with trailing * stripped, keeping the separator). When a subscribed resource name ends with /*, the watch is registered in prefixWatches instead of resourceWatches. notifyAll iterates prefixWatches for each modified resource and triggers matching prefix watches. computeResourceChange expands prefix subscriptions to all matching resources in the cache, detecting new, updated, and deleted resources under the prefix. Signed-off-by: William Dauchy <william.dauchy@datadoghq.com> Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 5fb8d18 commit aefeae0

2 files changed

Lines changed: 371 additions & 7 deletions

File tree

pkg/cache/v3/linear.go

Lines changed: 115 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ type LinearCache struct {
7070
// It does not contain wildcard watches.
7171
// It can contain resources not present in resources.
7272
resourceWatches map[string]watches
73+
// prefixWatches keeps track of watches for glob collection subscriptions (e.g. "collection/*").
74+
// Keyed by the prefix with the trailing glob stripped but the separator kept (e.g. "collection/").
75+
prefixWatches map[string]watches
7376
// wildcardWatches keeps track of all wildcard watches currently opened.
7477
wildcardWatches watches
7578
// currentWatchID is used to index new watches.
@@ -123,6 +126,7 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
123126
typeURL: typeURL,
124127
resources: make(map[string]*cachedResource),
125128
resourceWatches: make(map[string]watches),
129+
prefixWatches: make(map[string]watches),
126130
wildcardWatches: newWatches(),
127131
version: 0,
128132
currentWatchID: 0,
@@ -137,6 +141,33 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
137141
return out
138142
}
139143

144+
const globSuffix = "/*"
145+
146+
// isPrefixGlob returns the prefix (including the trailing separator) if name
147+
// is a glob collection subscription (e.g. "collection/*" → "collection/").
148+
func isPrefixGlob(name string) (string, bool) {
149+
if strings.HasSuffix(name, globSuffix) {
150+
return strings.TrimSuffix(name, "*"), true
151+
}
152+
return "", false
153+
}
154+
155+
// isResourceMatchingSubscription checks whether resourceName is covered by any
156+
// entry in subscribedResources, either as an exact match or via a prefix glob.
157+
func isResourceMatchingSubscription(subscribedResources map[string]struct{}, resourceName string) bool {
158+
if _, ok := subscribedResources[resourceName]; ok {
159+
return true
160+
}
161+
for sub := range subscribedResources {
162+
if prefix, ok := isPrefixGlob(sub); ok {
163+
if strings.HasPrefix(resourceName, prefix) {
164+
return true
165+
}
166+
}
167+
}
168+
return false
169+
}
170+
140171
// computeResourceChange compares the subscription known resources and the cache current state to compute the list of resources
141172
// which have changed and should be notified to the user.
142173
//
@@ -177,6 +208,36 @@ func (cache *LinearCache) computeResourceChange(sub Subscription, useResourceVer
177208
}
178209
} else {
179210
for resourceName := range sub.SubscribedResources() {
211+
if prefix, ok := isPrefixGlob(resourceName); ok {
212+
// Expand the prefix glob to all matching resources in the cache.
213+
for cachedName, res := range cache.resources {
214+
if !strings.HasPrefix(cachedName, prefix) {
215+
continue
216+
}
217+
knownVersion, known := knownVersions[cachedName]
218+
if !known {
219+
changedResources = append(changedResources, cachedName)
220+
} else {
221+
resourceVersion, err := res.getVersion(useResourceVersion)
222+
if err != nil {
223+
return nil, nil, fmt.Errorf("failed to compute version of %s: %w", cachedName, err)
224+
}
225+
if knownVersion != resourceVersion {
226+
changedResources = append(changedResources, cachedName)
227+
}
228+
}
229+
}
230+
// Detect resources previously returned under this prefix that were deleted.
231+
for knownName := range knownVersions {
232+
if strings.HasPrefix(knownName, prefix) {
233+
if _, ok := cache.resources[knownName]; !ok {
234+
removedResources = append(removedResources, knownName)
235+
}
236+
}
237+
}
238+
continue
239+
}
240+
180241
res, exists := cache.resources[resourceName]
181242
knownVersion, known := knownVersions[resourceName]
182243
if !exists {
@@ -206,7 +267,7 @@ func (cache *LinearCache) computeResourceChange(sub Subscription, useResourceVer
206267
for resourceName := range knownVersions {
207268
// If the subscription no longer watches a resource,
208269
// we mark it as unknown on the client side to ensure it will be resent to the client if subscribing again later on.
209-
if _, ok := sub.SubscribedResources()[resourceName]; !ok {
270+
if !isResourceMatchingSubscription(sub.SubscribedResources(), resourceName) {
210271
removedResources = append(removedResources, resourceName)
211272
}
212273
}
@@ -260,6 +321,14 @@ func (cache *LinearCache) computeResponse(watch watch, replyEvenIfEmpty bool) (W
260321
// Therefore drives on the subscription requested resources.
261322
resourcesToReturn = make([]string, 0, len(requestedResources))
262323
for resourceName := range requestedResources {
324+
if prefix, ok := isPrefixGlob(resourceName); ok {
325+
for cachedName := range cache.resources {
326+
if strings.HasPrefix(cachedName, prefix) {
327+
resourcesToReturn = append(resourcesToReturn, cachedName)
328+
}
329+
}
330+
continue
331+
}
263332
if _, ok := cache.resources[resourceName]; ok {
264333
resourcesToReturn = append(resourcesToReturn, resourceName)
265334
}
@@ -293,13 +362,18 @@ func (cache *LinearCache) computeResponse(watch watch, replyEvenIfEmpty bool) (W
293362

294363
func (cache *LinearCache) notifyAll(modified []string) error {
295364
// Gather the list of watches impacted by the modified resources.
296-
resourceWatches := newWatches()
365+
triggeredWatches := newWatches()
297366
for _, name := range modified {
298-
maps.Copy(resourceWatches, cache.resourceWatches[name])
367+
maps.Copy(triggeredWatches, cache.resourceWatches[name])
368+
for prefix, pw := range cache.prefixWatches {
369+
if strings.HasPrefix(name, prefix) {
370+
maps.Copy(triggeredWatches, pw)
371+
}
372+
}
299373
}
300374

301-
// non-wildcard watches
302-
for watchID, watch := range resourceWatches {
375+
// non-wildcard and prefix watches
376+
for watchID, watch := range triggeredWatches {
303377
response, err := cache.computeResponse(watch, false)
304378
if err != nil {
305379
return err
@@ -531,6 +605,15 @@ func (cache *LinearCache) trackWatch(watch watch) func() {
531605
cache.log.Infof("[linear cache] open watch %d (delta: %t) for %s resources %v", watchID, watch.isDelta(), cache.typeURL, sub.SubscribedResources())
532606
cache.log.Debugf("[linear cache] subscription details for watch %d: known versions %v, system version %q", watchID, sub.ReturnedResources(), cache.getVersion())
533607
for name := range sub.SubscribedResources() {
608+
if prefix, ok := isPrefixGlob(name); ok {
609+
pw, exists := cache.prefixWatches[prefix]
610+
if !exists {
611+
pw = newWatches()
612+
cache.prefixWatches[prefix] = pw
613+
}
614+
pw[watchID] = watch
615+
continue
616+
}
534617
watches, exists := cache.resourceWatches[name]
535618
if !exists {
536619
watches = newWatches()
@@ -556,6 +639,14 @@ func (cache *LinearCache) removeWatch(watchID uint64, sub Subscription) {
556639
// Make sure we clean the watch for ALL resources it might be associated with,
557640
// as the channel will no longer be listened to
558641
for resource := range sub.SubscribedResources() {
642+
if prefix, ok := isPrefixGlob(resource); ok {
643+
pw := cache.prefixWatches[prefix]
644+
delete(pw, watchID)
645+
if len(pw) == 0 {
646+
delete(cache.prefixWatches, prefix)
647+
}
648+
continue
649+
}
559650
resourceWatches := cache.resourceWatches[resource]
560651
delete(resourceWatches, watchID)
561652
if len(resourceWatches) == 0 {
@@ -581,11 +672,28 @@ func (cache *LinearCache) NumResources() int {
581672
return len(cache.resources)
582673
}
583674

584-
// NumWatches returns the number of active watches for a resource name, including wildcard ones.
675+
// NumWatches returns the number of active watches for a resource name, including wildcard and prefix ones.
585676
func (cache *LinearCache) NumWatches(name string) int {
586677
cache.mu.RLock()
587678
defer cache.mu.RUnlock()
588-
return len(cache.resourceWatches[name]) + len(cache.wildcardWatches)
679+
count := len(cache.resourceWatches[name]) + len(cache.wildcardWatches)
680+
for prefix, pw := range cache.prefixWatches {
681+
if strings.HasPrefix(name, prefix) {
682+
count += len(pw)
683+
}
684+
}
685+
return count
686+
}
687+
688+
// NumPrefixWatches returns the total number of active prefix watches.
689+
func (cache *LinearCache) NumPrefixWatches() int {
690+
cache.mu.RLock()
691+
defer cache.mu.RUnlock()
692+
count := 0
693+
for _, pw := range cache.prefixWatches {
694+
count += len(pw)
695+
}
696+
return count
589697
}
590698

591699
// NumWildcardWatches returns the number of wildcard watches.

0 commit comments

Comments
 (0)