Skip to content

Commit d542651

Browse files
committed
allow e2d to keep snapshot history and configure retention policies, allow single node cluster to restart with on-disk data instead of restoring old snapshot/starting fresh every time
1 parent ef539cc commit d542651

File tree

7 files changed

+233
-135
lines changed

7 files changed

+233
-135
lines changed

cmd/e2d/app/run.go

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"go.uber.org/zap/zapcore"
7+
"math"
78
"strings"
89
"time"
910

@@ -41,10 +42,11 @@ type runOptions struct {
4142

4243
PeerDiscovery string `env:"E2D_PEER_DISCOVERY"`
4344

44-
SnapshotBackupURL string `env:"E2D_SNAPSHOT_BACKUP_URL"`
45-
SnapshotCompression bool `env:"E2D_SNAPSHOT_COMPRESSION"`
46-
SnapshotEncryption bool `env:"E2D_SNAPSHOT_ENCRYPTION"`
47-
SnapshotInterval time.Duration `env:"E2D_SNAPSHOT_INTERVAL"`
45+
SnapshotBackupURL string `env:"E2D_SNAPSHOT_BACKUP_URL"`
46+
SnapshotCompression bool `env:"E2D_SNAPSHOT_COMPRESSION"`
47+
SnapshotEncryption bool `env:"E2D_SNAPSHOT_ENCRYPTION"`
48+
SnapshotInterval time.Duration `env:"E2D_SNAPSHOT_INTERVAL"`
49+
SnapshotRetentionTime time.Duration `env:"E2D_SNAPSHOT_RETENTION_TIME"`
4850

4951
AWSAccessKey string `env:"E2D_AWS_ACCESS_KEY"`
5052
AWSSecretKey string `env:"E2D_AWS_SECRET_KEY"`
@@ -67,17 +69,17 @@ func newRunCmd() *cobra.Command {
6769
}
6870
peerGetter, err := getPeerGetter(o)
6971
if err != nil {
70-
log.Fatalf("%+v", err)
72+
log.Fatal("unable to get peer getter", zap.Error(err))
7173
}
7274

7375
baddrs, err := getInitialBootstrapAddrs(o, peerGetter)
7476
if err != nil {
75-
log.Fatalf("%+v", err)
77+
log.Fatal("unable to get initial bootstrap addresses", zap.Error(err))
7678
}
7779

7880
snapshotter, err := getSnapshotProvider(o)
7981
if err != nil {
80-
log.Fatalf("%+v", err)
82+
log.Fatal("unable to set up snapshot provider", zap.Error(err))
8183
}
8284

8385
m, err := manager.New(&manager.Config{
@@ -141,10 +143,11 @@ func newRunCmd() *cobra.Command {
141143

142144
cmd.Flags().StringVar(&o.PeerDiscovery, "peer-discovery", "", "which method {aws-autoscaling-group,ec2-tags,do-tags} to use to discover peers")
143145

144-
cmd.Flags().DurationVar(&o.SnapshotInterval, "snapshot-interval", 1*time.Minute, "frequency of etcd snapshots")
145-
cmd.Flags().StringVar(&o.SnapshotBackupURL, "snapshot-backup-url", "", "an absolute path to shared filesystem storage (like file:///etcd-backups) or cloud storage bucket (like s3://etcd-backups) for snapshot backups")
146+
cmd.Flags().DurationVar(&o.SnapshotInterval, "snapshot-interval", 25*time.Minute, "frequency of etcd snapshots")
147+
cmd.Flags().StringVar(&o.SnapshotBackupURL, "snapshot-url", "", "an absolute path to shared filesystem directory (like file:///tmp/etcd-backups/) or cloud storage bucket (like s3://etcd-backups/mycluster/) for snapshot backups. snapshots will be named etcd.snapshot.<timestamp>, and a file etcd.snapshot.LATEST will point to the most recent snapshot.")
146148
cmd.Flags().BoolVar(&o.SnapshotCompression, "snapshot-compression", false, "compression snapshots with gzip")
147149
cmd.Flags().BoolVar(&o.SnapshotEncryption, "snapshot-encryption", false, "encrypt snapshots with aes-256")
150+
cmd.Flags().DurationVar(&o.SnapshotRetentionTime, "snapshot-retention-time", 24*time.Hour, "maximum age of a snapshot before it is deleted, set this to nonzero to enable retention support")
148151

149152
cmd.Flags().StringVar(&o.AWSAccessKey, "aws-access-key", "", "")
150153
cmd.Flags().StringVar(&o.AWSSecretKey, "aws-secret-key", "", "")
@@ -238,18 +241,21 @@ func getSnapshotProvider(o *runOptions) (snapshot.Snapshotter, error) {
238241

239242
switch u.Type {
240243
case snapshot.FileType:
241-
return snapshot.NewFileSnapshotter(u.Path)
244+
return snapshot.NewFileSnapshotter(u.Path, o.SnapshotRetentionTime)
242245
case snapshot.S3Type:
246+
origSnapshotRetentionDays := o.SnapshotRetentionTime.Hours() / 24
247+
snapshotRetentionDays := int64(math.Ceil(origSnapshotRetentionDays))
248+
if int64(origSnapshotRetentionDays) != snapshotRetentionDays {
249+
log.Warn("S3 retention time rounded to the nearest day",
250+
zap.Float64("original-days", origSnapshotRetentionDays),
251+
zap.Int64("new-days", snapshotRetentionDays),
252+
)
253+
}
243254
return snapshot.NewAmazonSnapshotter(&snapshot.AmazonConfig{
244255
RoleSessionName: o.AWSRoleSessionName,
245256
Bucket: u.Bucket,
246257
Key: u.Path,
247-
})
248-
case snapshot.SpacesType:
249-
return snapshot.NewDigitalOceanSnapshotter(&snapshot.DigitalOceanConfig{
250-
SpacesURL: o.SnapshotBackupURL,
251-
SpacesAccessKey: o.DOSpacesKey,
252-
SpacesSecretKey: o.DOSpacesSecret,
258+
RetentionDays: snapshotRetentionDays,
253259
})
254260
default:
255261
return nil, errors.Errorf("unsupported snapshot url format: %#v", o.SnapshotBackupURL)

pkg/manager/manager.go

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ func (m *Manager) restoreFromSnapshot(peers []*Peer) (bool, error) {
189189
// cluster, by conveying information about whether this is a brand new cluster
190190
// or an existing cluster that recovered from total cluster failure.
191191
func (m *Manager) startEtcdCluster(peers []*Peer) error {
192-
snapshot, err := m.restoreFromSnapshot(peers)
192+
restored, err := m.restoreFromSnapshot(peers)
193193
if err != nil {
194194
log.Error("cannot restore snapshot", zap.Error(err))
195195
}
@@ -199,7 +199,7 @@ func (m *Manager) startEtcdCluster(peers []*Peer) error {
199199
if err := m.etcd.startNew(ctx, peers); err != nil {
200200
return err
201201
}
202-
if !snapshot {
202+
if !restored {
203203
return nil
204204
}
205205

@@ -449,6 +449,7 @@ func (m *Manager) runSnapshotter() {
449449
log.Info("snapshotting disabled: no snapshot backup set")
450450
return
451451
}
452+
452453
log.Debug("starting snapshotter")
453454
ticker := time.NewTicker(m.cfg.SnapshotInterval)
454455
defer ticker.Stop()
@@ -459,7 +460,7 @@ func (m *Manager) runSnapshotter() {
459460
select {
460461
case <-ticker.C:
461462
if m.etcd.isRestarting() {
462-
log.Debug("server is restarting, skipping snapshot backup")
463+
log.Warn("server is restarting, skipping snapshot backup")
463464
continue
464465
}
465466
if !m.etcd.isLeader() {
@@ -469,7 +470,7 @@ func (m *Manager) runSnapshotter() {
469470
log.Debug("starting snapshot backup")
470471
snapshotData, snapshotSize, rev, err := m.etcd.createSnapshot(latestRev)
471472
if err != nil {
472-
log.Debug("cannot create snapshot",
473+
log.Info("skipping snapshot, etcd revision hasn't changed since last snapshot",
473474
zap.String("name", shortName(m.cfg.Name)),
474475
zap.Error(err),
475476
)
@@ -482,7 +483,7 @@ func (m *Manager) runSnapshotter() {
482483
snapshotData = snapshotutil.NewGzipReadCloser(snapshotData)
483484
}
484485
if err := m.snapshotter.Save(snapshotData); err != nil {
485-
log.Debug("cannot save snapshot",
486+
log.Error("cannot save snapshot",
486487
zap.String("name", shortName(m.cfg.Name)),
487488
zap.Error(err),
488489
)
@@ -509,9 +510,28 @@ func (m *Manager) Run() error {
509510
case 1:
510511
// a single-node etcd cluster does not require gossip or need to wait for
511512
// other members and therefore can start immediately
512-
if err := m.startEtcdCluster([]*Peer{{m.cfg.Name, m.cfg.PeerURL.String()}}); err != nil {
513-
return err
513+
peers := []*Peer{{m.cfg.Name, m.cfg.PeerURL.String()}}
514+
if _, err := os.Lstat(m.cfg.Dir); err == nil {
515+
// we have a data directory, attempt to use that
516+
ctx, cancel := context.WithTimeout(m.ctx, 5*time.Minute)
517+
defer cancel()
518+
// try to use the on-disk data, and if it fails, fall back to restoring / creating new cluster
519+
if err := m.etcd.joinExisting(ctx, peers); err != nil {
520+
log.Error("unable to start from the on-disk etcd data, attempting to restore from snapshot or create new cluster", zap.Error(err))
521+
if err := m.startEtcdCluster(peers); err != nil {
522+
log.Error("unable to restore from backup and unable to create a new cluster", zap.Error(err))
523+
return err
524+
}
525+
}
526+
} else {
527+
// we might be either a brand new cluster of size one, or one that was recently started
528+
// after a node failure. try restoring from snapshot to recover, or create a new blank cluster instead.
529+
if err := m.startEtcdCluster(peers); err != nil {
530+
log.Error("no data directory exists, and unable to start new cluster or restore from backup", zap.Error(err))
531+
return err
532+
}
514533
}
534+
515535
case 3, 5:
516536
// all multi-node clusters require the gossip network to be started
517537
if err := m.gossip.Start(m.ctx, m.cfg.BootstrapAddrs); err != nil {

pkg/snapshot/snapshot.go

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package snapshot
22

33
import (
4+
"encoding/json"
45
"io"
56
"net/url"
67
"path/filepath"
@@ -35,9 +36,11 @@ type Type int
3536
const (
3637
FileType Type = iota
3738
S3Type
38-
SpacesType
3939
)
4040

41+
const snapshotFilename = "etcd.snapshot"
42+
const latestSuffix = "LATEST"
43+
4144
type URL struct {
4245
Type Type
4346
Bucket string
@@ -46,14 +49,28 @@ type URL struct {
4649

4750
var (
4851
ErrInvalidScheme = errors.New("invalid scheme")
52+
ErrInvalidDirectoryPath = errors.New("path must be a directory")
4953
ErrCannotParseURL = errors.New("cannot parse url")
5054
)
5155

56+
type LatestFile struct {
57+
Path string
58+
Timestamp string
59+
}
60+
61+
func (l *LatestFile) generate() ([]byte, error) {
62+
content, err := json.Marshal(&l)
63+
return content, err
64+
}
65+
66+
func (l *LatestFile) read(input []byte) error {
67+
return json.Unmarshal(input, l)
68+
}
69+
5270
// ParseSnapshotBackupURL deconstructs a uri into a type prefix and a bucket
5371
// example inputs and outputs:
5472
// file://file -> file://, file
5573
// s3://bucket -> s3://, bucket
56-
// https://nyc3.digitaloceanspaces.com/bucket -> digitaloceanspaces, bucket
5774
func ParseSnapshotBackupURL(s string) (*URL, error) {
5875
if !hasValidScheme(s) {
5976
return nil, errors.Wrapf(ErrInvalidScheme, "url does not specify valid scheme: %#v", s)
@@ -65,40 +82,23 @@ func ParseSnapshotBackupURL(s string) (*URL, error) {
6582

6683
switch strings.ToLower(u.Scheme) {
6784
case "file":
85+
if !strings.HasSuffix(u.Path, string(filepath.Separator)) {
86+
return nil, ErrInvalidDirectoryPath
87+
}
6888
return &URL{
6989
Type: FileType,
7090
Path: filepath.Join(u.Host, u.Path),
7191
}, nil
7292
case "s3":
73-
if u.Path == "" {
74-
u.Path = "etcd.snapshot"
93+
path := strings.TrimPrefix(u.Path, "/")
94+
if !strings.HasSuffix(path, "/") && path != "" {
95+
return nil, ErrInvalidDirectoryPath
7596
}
7697
return &URL{
7798
Type: S3Type,
7899
Bucket: u.Host,
79-
Path: strings.TrimPrefix(u.Path, "/"),
100+
Path: path,
80101
}, nil
81-
case "http", "https":
82-
if strings.Contains(u.Host, "digitaloceanspaces") {
83-
bucket, path := parseBucketKey(strings.TrimPrefix(u.Path, "/"))
84-
return &URL{
85-
Type: SpacesType,
86-
Bucket: bucket,
87-
Path: path,
88-
}, nil
89-
}
90102
}
91103
return nil, errors.Wrap(ErrCannotParseURL, s)
92104
}
93-
94-
func parseBucketKey(s string) (string, string) {
95-
parts := strings.SplitN(s, "/", 2)
96-
switch len(parts) {
97-
case 1:
98-
return parts[0], "etcd.snapshot"
99-
case 2:
100-
return parts[0], parts[1]
101-
default:
102-
return "", ""
103-
}
104-
}

0 commit comments

Comments
 (0)