Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions cmd/containerd-nydus-grpc/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,6 @@ func Start(ctx context.Context, cfg *config.SnapshotterConfig) error {
}
}

if interval := cfg.RemoteConfig.AuthConfig.CredentialRenewalInterval; interval > 0 {
auth.InitCredentialRenewal(ctx, interval)
}

return Serve(ctx, rs, opt, stopSignal)
}

Expand Down
103 changes: 31 additions & 72 deletions pkg/auth/renewal.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,98 +7,57 @@
package auth

import (
"context"
"sync"
"time"

"github.com/containerd/log"
"github.com/containerd/nydus-snapshotter/pkg/metrics/data"
"github.com/containerd/nydus-snapshotter/pkg/rafs"
)

// renewalStore is the global credential store. It is nil when credential
// renewal is disabled (the default).
var renewalStore *credentialStore

// InitCredentialRenewal initializes the credential renewal subsystem.
// It creates the global store, runs an initial reconciliation pass to seed
// entries from live RAFS instances, and starts a background goroutine that
// reconciles and renews credentials at the given interval.
func InitCredentialRenewal(ctx context.Context, interval time.Duration) {
// InitCredentialStore creates the global credential store without starting
// any background goroutine. The caller is responsible for driving renewal
// (e.g., from snapshot/renewal.go).
func InitCredentialStore(interval time.Duration) {
renewalStore = newCredentialStore(interval)
// Reconcile existing credentials a first time.
renewalStore.reconcile()

log.G(ctx).WithField("interval", interval).Info("credential renewal initialized")
go renewLoop(ctx, renewalStore)
}

// renewLoop is the background goroutine that periodically reconciles and
// renews credentials.
func renewLoop(ctx context.Context, store *credentialStore) {
ticker := time.NewTicker(store.renewInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
store.reconcile()
}
}
}

// reconcile reconciles the credential store against the live RAFS instance
// list and renews all entries that should be active.
// 4 different situations are possible:
// - entry in store, live in RAFS: renew
// - entry in store, not live in RAFS: evict
// - entry not in store, not live in RAFS: nothing
// - entry not in store, live in RAFS: add
func (s *credentialStore) reconcile() {
live := make(map[string]struct{})
for _, r := range rafs.RafsGlobalCache.List() {
if r.ImageID != "" {
live[r.ImageID] = struct{}{}
}
}

for _, entry := range s.Entries() {
if _, inRAFS := live[entry.ref]; inRAFS {
log.L.WithField("ref", entry.ref).Debug("renewing credential entry")
s.renewEntry(entry.ref)
} else if time.Since(entry.renewedAt) >= s.renewInterval/2 {
s.Remove(entry.ref)
}
// Grace period: the entry was added recently (within interval/2) but
// has no RAFS entry yet. There is a possible race between a concurrent
// image pull and this renewal tick: GetRegistryKeyChain adds the ref to the
// store on the first layer fetch, but the RAFS entry is only created later
// when the mount completes. Evicting here would cause redundant provider
// lookups for every remaining layer fetch in the pull. We leave the
// entry intact; the next tick will either find it in RAFS (normal) or
// evict it (pull abandoned or failed).
}

for ref := range live {
if s.Get(ref) == nil {
s.renewEntry(ref)
}
}
}

// renewEntry fetches fresh credentials for ref from the renewable provider
// list. fetchFromProviders writes to the renewal store on success, so
// renewEntry only needs to update metrics.
func (s *credentialStore) renewEntry(ref string) {
kc := fetchFromProviders(&AuthRequest{Ref: ref, ValidUntil: time.Now().Add(s.renewInterval)}, renewableProviders())
// RenewCredential fetches fresh credentials for ref from the renewable
// provider list and caches them in the global store. Returns the keychain
// on success, or nil on failure or when credential renewal is disabled.
// Emits renewal metrics.
func RenewCredential(ref string) *PassKeyChain {
	// The global store is nil until InitCredentialStore has run (renewal
	// disabled). Guard here like EvictStaleCredentials does, instead of
	// panicking on the renewalStore.renewInterval dereference below.
	if renewalStore == nil {
		return nil
	}
	kc := fetchFromProviders(
		&AuthRequest{Ref: ref, ValidUntil: time.Now().Add(renewalStore.renewInterval)},
		renewableProviders(),
	)
	if kc != nil {
		data.CredentialRenewals.WithLabelValues(ref, "success").Inc()
	} else {
		log.L.WithField("ref", ref).Warn("credential renewal returned no credentials from any provider")
		data.CredentialRenewals.WithLabelValues(ref, "failure").Inc()
	}
	return kc
}

// EvictStaleCredentials drops every store entry whose ref does not appear
// in liveRefs. Entries renewed within the last interval/2 are spared: a
// concurrent image pull may have added the ref via GetRegistryKeyChain on
// its first layer fetch while the RAFS entry only materializes once the
// mount completes, and evicting in that window would force redundant
// provider lookups for each remaining layer fetch. No-op when renewal is
// disabled (nil store).
func EvictStaleCredentials(liveRefs map[string]struct{}) {
	store := renewalStore
	if store == nil {
		return
	}
	grace := store.renewInterval / 2
	for _, e := range store.Entries() {
		if _, live := liveRefs[e.ref]; live {
			continue
		}
		if time.Since(e.renewedAt) >= grace {
			store.Remove(e.ref)
		}
	}
}

// --- credentialEntry ---
Expand Down
134 changes: 89 additions & 45 deletions pkg/auth/renewal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
package auth

import (
"context"
"fmt"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -206,16 +205,17 @@ func TestCredentialStoreConcurrency(t *testing.T) {
wg.Wait()
}

// --- renewEntry ---
// --- RenewCredential ---

func TestRenewEntry(t *testing.T) {
func TestRenewCredential(t *testing.T) {
const ref = "docker.io/library/nginx:latest"

tests := []struct {
name string
provider *trackingProvider
wantUser string
wantCall int32
wantNil bool
}{
{
name: "updates store on success",
Expand All @@ -224,24 +224,14 @@ func TestRenewEntry(t *testing.T) {
wantCall: 1,
},
{
name: "leaves existing entry unchanged on failure",
name: "returns nil on failure",
provider: func() *trackingProvider {
p := &trackingProvider{}
p.failNext.Store(true)
return p
}(),
wantUser: "original",
wantCall: 1,
},
{
name: "leaves existing entry unchanged when provider returns nil",
provider: func() *trackingProvider {
p := &trackingProvider{}
p.nilNext.Store(true)
return p
}(),
wantUser: "original",
wantCall: 1,
wantNil: true,
},
}

Expand All @@ -254,21 +244,97 @@ func TestRenewEntry(t *testing.T) {
renewalStore = oldStore
}()
renewableProviders = func() []AuthProvider { return []AuthProvider{tt.provider} }
renewalStore = newCredentialStore(5 * time.Minute)

store := newCredentialStore(5 * time.Minute)
store.Add(ref, &PassKeyChain{Username: "original", Password: "original"})
renewalStore = store

store.renewEntry(ref)
got := RenewCredential(ref)

assert.Equal(t, tt.wantCall, tt.provider.calls.Load())
got := store.Get(ref)
require.NotNil(t, got)
assert.Equal(t, tt.wantUser, got.Username)
if tt.wantNil {
assert.Nil(t, got)
} else {
require.NotNil(t, got)
assert.Equal(t, tt.wantUser, got.Username)
}
})
}
}

// --- EvictStaleCredentials ---

func TestEvictStaleCredentials(t *testing.T) {
	cases := []struct {
		name     string
		stored   []string
		live     map[string]struct{}
		wantRefs []string
	}{
		{
			name:     "evicts refs not in live set",
			stored:   []string{"ref-a", "ref-b", "ref-c"},
			live:     map[string]struct{}{"ref-a": {}, "ref-c": {}},
			wantRefs: []string{"ref-a", "ref-c"},
		},
		{
			name:     "evicts all when live set is empty",
			stored:   []string{"ref-a", "ref-b"},
			live:     map[string]struct{}{},
			wantRefs: nil,
		},
		{
			name:     "keeps all when all are live",
			stored:   []string{"ref-a", "ref-b"},
			live:     map[string]struct{}{"ref-a": {}, "ref-b": {}},
			wantRefs: []string{"ref-a", "ref-b"},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			saved := renewalStore
			defer func() { renewalStore = saved }()

			// A tiny interval makes the grace period (interval/2)
			// negligible, so the sleep below pushes every entry past it.
			renewalStore = newCredentialStore(time.Millisecond)
			for _, ref := range tc.stored {
				renewalStore.Add(ref, &PassKeyChain{Username: "u", Password: "p"})
			}
			time.Sleep(time.Millisecond)

			EvictStaleCredentials(tc.live)

			var gotRefs []string
			for _, e := range renewalStore.Entries() {
				gotRefs = append(gotRefs, e.ref)
			}
			assert.ElementsMatch(t, tc.wantRefs, gotRefs)
		})
	}
}

func TestEvictStaleCredentials_GracePeriod(t *testing.T) {
	saved := renewalStore
	defer func() { renewalStore = saved }()

	// With a 5-minute interval the grace period is 2.5 minutes, so an
	// entry added just now must survive eviction even though it is not
	// in the live set.
	renewalStore = newCredentialStore(5 * time.Minute)
	renewalStore.Add("recent-ref", &PassKeyChain{Username: "u", Password: "p"})

	EvictStaleCredentials(map[string]struct{}{})

	assert.NotNil(t, renewalStore.Get("recent-ref"), "entry within grace period should not be evicted")
}

func TestEvictStaleCredentials_NilStore(t *testing.T) {
	saved := renewalStore
	defer func() { renewalStore = saved }()
	renewalStore = nil

	// Must be a no-op rather than a nil-pointer panic when renewal is
	// disabled.
	EvictStaleCredentials(map[string]struct{}{"ref": {}})
}

// --- getRegistryKeyChainFromProviders ---

func TestGetRegistryKeyChainFromProviders(t *testing.T) {
Expand Down Expand Up @@ -347,25 +413,3 @@ func TestGetRegistryKeyChainFromProviders(t *testing.T) {
})
}
}

// --- InitCredentialRenewal ---

// TestInitCredentialRenewalLifecycle verifies that the renewal goroutine
// starts, ticks, and stops cleanly on context cancellation. The RAFS cache
// is empty in the test environment so reconcile is a no-op each tick.
func TestInitCredentialRenewalLifecycle(t *testing.T) {
oldStore := renewalStore
defer func() { renewalStore = oldStore }()

ctx, cancel := context.WithCancel(t.Context())

InitCredentialRenewal(ctx, 30*time.Millisecond)
require.NotNil(t, renewalStore)

// Let it tick a few times without error.
time.Sleep(100 * time.Millisecond)

cancel()
// Give goroutine time to observe cancellation.
time.Sleep(60 * time.Millisecond)
}
17 changes: 17 additions & 0 deletions pkg/daemon/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const (
endpointStart = "/api/v1/daemon/start"
// Request nydus daemon to exit
endpointExit = "/api/v1/daemon/exit"
// Update daemon configuration at runtime.
endpointConfig = "/api/v1/config"

// --- V2 API begins
// Add/remove blobs managed by the blob cache manager.
Expand All @@ -72,6 +74,8 @@ type NydusdClient interface {
GetInflightMetrics() (*types.InflightMetrics, error)
GetCacheMetrics(sid string) (*types.CacheMetrics, error)

UpdateConfig(id string, params map[string]string) error

TakeOver() error
SendFd() error
Start() error
Expand Down Expand Up @@ -322,6 +326,19 @@ func (c *nydusdClient) GetCacheMetrics(sid string) (*types.CacheMetrics, error)
return &m, nil
}

// UpdateConfig updates the configuration of the daemon identified by id at
// runtime by PUTting the JSON-encoded params to the config endpoint.
func (c *nydusdClient) UpdateConfig(id string, params map[string]string) error {
	payload, err := json.Marshal(params)
	if err != nil {
		return errors.Wrap(err, "marshal config params")
	}

	q := query{}
	q.Add("id", id)

	return c.request(http.MethodPut, c.url(endpointConfig, q), bytes.NewReader(payload), nil)
}

func (c *nydusdClient) TakeOver() error {
url := c.url(endpointTakeOver, query{})
return c.request(http.MethodPut, url, nil, nil)
Expand Down
Loading
Loading