Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions internal/example/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ func main() {

// Create the snapshot that we'll serve to Envoy
snapshot := example.GenerateSnapshot()
if err := snapshot.Consistent(); err != nil {
l.Errorf("snapshot inconsistency: %+v\n%+v", snapshot, err)
os.Exit(1)
}
l.Debugf("will serve snapshot %+v", snapshot)

// Add the snapshot to the cache
Expand Down
13 changes: 6 additions & 7 deletions internal/example/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
router "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
)

Expand Down Expand Up @@ -168,12 +167,12 @@ func makeConfigSource() *core.ConfigSource {
return source
}

func GenerateSnapshot() *cache.Snapshot {
snap, _ := cache.NewSnapshot("1",
map[resource.Type][]types.Resource{
resource.ClusterType: {makeCluster(ClusterName)},
resource.RouteType: {makeRoute(RouteName, ClusterName)},
resource.ListenerType: {makeHTTPListener(ListenerName, RouteName)},
func GenerateSnapshot() *types.Snapshot {
snap, _ := types.NewSnapshot("1",
map[string][]types.SnapshotResource{
resource.ClusterType: {{Name: ClusterName, Resource: makeCluster(ClusterName)}},
resource.RouteType: {{Name: RouteName, Resource: makeRoute(RouteName, ClusterName)}},
resource.ListenerType: {{Name: ListenerName, Resource: makeHTTPListener(ListenerName, RouteName)}},
},
)
return snap
Expand Down
221 changes: 221 additions & 0 deletions pkg/cache/internal/cached_resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
package internal
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really think we should avoid the use of an "internal" package. See my comment below about potentially adding new packages and/or files.


import (
"crypto/sha256"
"encoding/hex"
"fmt"
"sync"
"time"

discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
)

// Resource is the base interface for the xDS payload.
type Resource interface {
proto.Message
}

// ResourceWithTTL is a Resource with an optional TTL.
type ResourceWithTTL struct {
Resource Resource
TTL *time.Duration
}

// CachedResource is used to track resources added by the user in the cache.
// It contains the resource itself and its associated version (currently in two different modes).
// It should not be altered once created, to allow concurrent access.
type CachedResource struct {
Name string
typeURL string

resource Resource
ttl *time.Duration

// cacheVersion is the version of the cache at the time of last update, used in sotw.
cacheVersion string

marshalFunc func() ([]byte, error)
computeResourceVersionFunc func() (string, error)
}

type CachedResourceOption = func(*CachedResource)

// WithCacheVersion allows specifying the cacheVersion when the resource is set.
func WithCacheVersion(version string) CachedResourceOption {
return func(r *CachedResource) { r.cacheVersion = version }
}

// WithMarshaledResource enables the user to provide the already marshaled bytes if they have them.
// Those bytes should strive at being consistent if the object has not changed (beware protobuf non-deterministic marshaling)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Those bytes should strive at being consistent if the object has not changed (beware protobuf non-deterministic marshaling)
// Those bytes should strive for consistency if the object has not changed (beware of protobuf non-deterministic marshaling)

// or alternatively the resource version should also then be set.
// By default it is computed by performing a deterministic protobuf marshaling.
func WithMarshaledResource(bytes []byte) CachedResourceOption {
if len(bytes) == 0 {
return func(*CachedResource) {}
}
return func(r *CachedResource) { r.marshalFunc = func() ([]byte, error) { return bytes, nil } }
}

// WithResourceVersion enables the user to provide the resource version to be used.
// This version should be constant if the object has not changed to avoid needlessly sending resources to clients.
// By default it is computed by hashing the serialized version of the resource.
func WithResourceVersion(version string) CachedResourceOption {
if version == "" {
return func(*CachedResource) {}
}
return func(r *CachedResource) { r.computeResourceVersionFunc = func() (string, error) { return version, nil } }
}

// WithResourceTTL sets a TTL on the resource, that will be sent to the client with the payload.
func WithResourceTTL(ttl *time.Duration) CachedResourceOption {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe WithTTL? It sounds like you should be able to set the full Resource along with the TTL given the current name

return func(r *CachedResource) { r.ttl = ttl }
}

func NewCachedResource(name, typeURL string, res Resource, opts ...CachedResourceOption) *CachedResource {
cachedRes := &CachedResource{
Name: name,
typeURL: typeURL,
resource: res,
}
for _, opt := range opts {
opt(cachedRes)
}
if cachedRes.marshalFunc == nil {
cachedRes.marshalFunc = sync.OnceValues(func() ([]byte, error) {
return marshalResource(res)
})
}
if cachedRes.computeResourceVersionFunc == nil {
cachedRes.computeResourceVersionFunc = sync.OnceValues(func() (string, error) {
marshaled, err := cachedRes.marshalFunc()
if err != nil {
return "", fmt.Errorf("marshaling resource: %w", err)
}
return hashResource(marshaled), nil
})
}
return cachedRes
}

// SetCacheVersion updates the cache version. This violates the assumption that all fields can be safely read concurrently.
// It's required today for the linear cache constructor, where we are guaranteed resources are not being used concurrently,
// and should not be used elsewhere.
func (c *CachedResource) SetCacheVersion(version string) {
c.cacheVersion = version
}

// HasTTL returns whether the resource has a TTL set.
func (c *CachedResource) HasTTL() bool {
return c.ttl != nil
}

// getMarshaledResource lazily marshals the resource and returns the bytes.
func (c *CachedResource) getMarshaledResource() ([]byte, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we delete this method? It seems to only be called in the below methods and it just returns c.marshalfunc() anyways?

return c.marshalFunc()
}

// GetResourceVersion returns a stable version reflecting the resource content.
// By default it is built by hashing the serialized version of the object, using deterministic serializing.
func (c *CachedResource) GetResourceVersion() (string, error) {
return c.computeResourceVersionFunc()
}

// GetVersion returns the version for the resource.
// By default it returns the cache version when the resource was added, but if requested it will return a
// version specific to the resource content.
func (c *CachedResource) GetVersion(useResourceVersion bool) (string, error) {
if !useResourceVersion {
return c.cacheVersion, nil
}

return c.GetResourceVersion()
}

// GetRawResource returns the underlying resource for use in legacy accessors.
func (c *CachedResource) GetRawResource() ResourceWithTTL {
return ResourceWithTTL{
Resource: c.resource,
TTL: c.ttl,
}
}

var deltaResourceTypeURL = "type.googleapis.com/" + string(proto.MessageName(&discovery.Resource{}))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we rename this to something non-delta specific?


// GetSotwResource returns the serialized resource to return within a sotw response, including TTL handling if applicable.
// It uses lazily computed and cached fields to ensure a resource is serialized at most once per update.
func (c *CachedResource) GetSotwResource(isHeartbeat bool) (*anypb.Any, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we should call this method and the "delta" version something like

GetAnyResource()

vs

GetDiscoveryResource()

or something along those lines. My thought here is that this CachedResource doesn't necessarily care about Sotw or Delta, it just cares about different ways to serialize a discovery response given a provided marshaled resource. IMHO the method names should be description as to how the data is returned rather than the protocol that it will be used with

buildResource := func() (*anypb.Any, error) {
marshaled, err := c.getMarshaledResource()
if err != nil {
return nil, fmt.Errorf("marshaling: %w", err)
}
return &anypb.Any{
TypeUrl: c.typeURL,
Value: marshaled,
}, nil
}

if c.ttl == nil {
return buildResource()
}

wrappedResource := &discovery.Resource{
Name: c.Name,
Ttl: durationpb.New(*c.ttl),
}

if !isHeartbeat {
rsrc, err := buildResource()
if err != nil {
return nil, err
}
wrappedResource.Resource = rsrc
}

marshaled, err := marshalResource(wrappedResource)
if err != nil {
return nil, fmt.Errorf("marshaling discovery resource: %w", err)
}

return &anypb.Any{
TypeUrl: deltaResourceTypeURL,
Value: marshaled,
}, nil
}

// GetDeltaResource returns the serialized resource to return within a delta response.
// It uses lazily computed and cached fields to ensure a resource is serialized at most once per update.
func (c *CachedResource) GetDeltaResource() (*discovery.Resource, error) {
marshaled, err := c.getMarshaledResource()
if err != nil {
return nil, fmt.Errorf("marshaling: %w", err)
}
version, err := c.GetResourceVersion()
if err != nil {
return nil, fmt.Errorf("computing version: %w", err)
}
return &discovery.Resource{
Name: c.Name,
Resource: &anypb.Any{
TypeUrl: c.typeURL,
Value: marshaled,
},
Version: version,
}, nil
}

// hashResource will take a resource and create a SHA256 hash sum out of the marshaled bytes.
func hashResource(resource []byte) string {
hasher := sha256.New()
hasher.Write(resource)
return hex.EncodeToString(hasher.Sum(nil))
}

// marshalResource converts the Resource to MarshaledResource.
func marshalResource(resource Resource) ([]byte, error) {
return proto.MarshalOptions{Deterministic: true}.Marshal(resource)
}
Loading