Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/storage/chunk/client/baidubce/bos_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package baidubce
import (
"context"
"flag"
"fmt"

"io"
"time"
Expand Down Expand Up @@ -41,6 +42,7 @@ type BOSStorageConfig struct {
Endpoint string `yaml:"endpoint"`
AccessKeyID string `yaml:"access_key_id"`
SecretAccessKey flagext.Secret `yaml:"secret_access_key"`
PathPrefix string `yaml:"path_prefix"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand All @@ -54,6 +56,7 @@ func (cfg *BOSStorageConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag
f.StringVar(&cfg.Endpoint, prefix+"baidubce.endpoint", DefaultEndpoint, "BOS endpoint to connect to.")
f.StringVar(&cfg.AccessKeyID, prefix+"baidubce.access-key-id", "", "Baidu Cloud Engine (BCE) Access Key ID.")
f.Var(&cfg.SecretAccessKey, prefix+"baidubce.secret-access-key", "Baidu Cloud Engine (BCE) Secret Access Key.")
f.StringVar(&cfg.PathPrefix, prefix+"baidubce.path-prefix", "", "BOS write prefix")
}

type BOSObjectStorage struct {
Expand Down Expand Up @@ -84,6 +87,7 @@ func (b *BOSObjectStorage) PutObject(ctx context.Context, objectKey string, obje
if err != nil {
return err
}
objectKey := fmt.Sprintf("%s/%s", b.cfg.PathPrefix, objectKey)
_, err = b.client.BasicPutObject(b.cfg.BucketName, objectKey, body)
return err
})
Expand All @@ -93,6 +97,7 @@ func (b *BOSObjectStorage) GetObject(ctx context.Context, objectKey string) (io.
var res *api.GetObjectResult
err := instrument.CollectedRequest(ctx, "BOS.GetObject", bosRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
var requestErr error
objectKey := fmt.Sprintf("%s/%s", b.cfg.PathPrefix, objectKey)
res, requestErr = b.client.BasicGetObject(b.cfg.BucketName, objectKey)
return requestErr
})
Expand All @@ -109,6 +114,7 @@ func (b *BOSObjectStorage) List(ctx context.Context, prefix string, delimiter st

err := instrument.CollectedRequest(ctx, "BOS.List", bosRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
args := new(api.ListObjectsArgs)
prefix := fmt.Sprintf("%s/%s", b.cfg.PathPrefix, prefix)
args.Prefix = prefix
args.Delimiter = delimiter
for {
Expand Down Expand Up @@ -146,6 +152,7 @@ func (b *BOSObjectStorage) List(ctx context.Context, prefix string, delimiter st

func (b *BOSObjectStorage) DeleteObject(ctx context.Context, objectKey string) error {
return instrument.CollectedRequest(ctx, "BOS.DeleteObject", bosRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
objectKey := fmt.Sprintf("%s/%s", b.cfg.PathPrefix, objectKey)
err := b.client.DeleteObject(b.cfg.BucketName, objectKey)
return err
})
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/stores/shipper/storage/cached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ func (c *cachedObjectClient) buildCache(ctx context.Context, forceRefresh bool)
}

ss := strings.Split(object.Key, delimiter)
if len(ss) < 2 || len(ss) > 3 {
return fmt.Errorf("invalid key: %s", object.Key)
}
// if len(ss) < 2 || len(ss) > 3 {
// return fmt.Errorf("invalid key: %s", object.Key)
// }

tableName := ss[0]
tbl, ok := c.tables[tableName]
Expand Down