Skip to content

Commit c04a11d

Browse files
yuchen-dbjnyi
authored and committed
Cherry-pick upstream goroutine leak fixes from OSS Thanos (#245)
1 parent d3157f5 commit c04a11d

7 files changed

Lines changed: 87 additions & 25 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
1212

1313
### Fixed
1414

15+
- [#8378](https://github.com/thanos-io/thanos/pull/8378): Store: fix the reuse of dirty posting slices
16+
1517
### Added
1618

1719
### Changed

pkg/store/bucket.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/prometheus/prometheus/tsdb/chunks"
3535
"github.com/prometheus/prometheus/tsdb/encoding"
3636
"github.com/prometheus/prometheus/tsdb/index"
37+
"github.com/prometheus/prometheus/util/zeropool"
3738
"github.com/weaveworks/common/httpgrpc"
3839
"golang.org/x/exp/slices"
3940
"golang.org/x/sync/errgroup"
@@ -125,8 +126,22 @@ const (
125126
var (
126127
errBlockSyncConcurrencyNotValid = errors.New("the block sync concurrency must be equal or greater than 1.")
127128
hashPool = sync.Pool{New: func() interface{} { return xxhash.New() }}
129+
postingsPool zeropool.Pool[[]storage.SeriesRef]
128130
)
129131

132+
func getPostingsSlice() []storage.SeriesRef {
133+
if p := postingsPool.Get(); p != nil {
134+
return p
135+
}
136+
137+
// Pre-allocate slice with initial capacity.
138+
return make([]storage.SeriesRef, 0, 1024)
139+
}
140+
141+
func putPostingsSlice(p []storage.SeriesRef) {
142+
postingsPool.Put(p[:0])
143+
}
144+
130145
type bucketStoreMetrics struct {
131146
blocksLoaded prometheus.Gauge
132147
blockLoads prometheus.Counter
@@ -1693,6 +1708,9 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
16931708
err = g.Wait()
16941709
})
16951710
if err != nil {
1711+
for _, resp := range respSets {
1712+
resp.Close()
1713+
}
16961714
code := codes.Aborted
16971715
if s, ok := status.FromError(errors.Cause(err)); ok {
16981716
code = s.Code()
@@ -2539,6 +2557,10 @@ type bucketIndexReader struct {
25392557

25402558
indexVersion int
25412559
logger log.Logger
2560+
2561+
// Posting slice to return to the postings pool on close.
2562+
// A single bucketIndexReader should have at most 1 postings slice to return.
2563+
postings []storage.SeriesRef
25422564
}
25432565

25442566
func newBucketIndexReader(block *bucketBlock, logger log.Logger) *bucketIndexReader {
@@ -2659,13 +2681,13 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatch
26592681

26602682
// ExpandPostingsWithContext returns the postings expanded as a slice and considers context.
26612683
func ExpandPostingsWithContext(ctx context.Context, p index.Postings) ([]storage.SeriesRef, error) {
2662-
res := make([]storage.SeriesRef, 0, 1024) // Pre-allocate slice with initial capacity
2684+
res := getPostingsSlice()
26632685
i := 0
26642686
for p.Next() {
26652687
i++
26662688
if i%checkContextEveryNIterations == 0 {
26672689
if err := ctx.Err(); err != nil {
2668-
return nil, err
2690+
return res, err
26692691
}
26702692
}
26712693
res = append(res, p.At())
@@ -2958,6 +2980,7 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context,
29582980
}
29592981

29602982
ps, err := ExpandPostingsWithContext(ctx, p)
2983+
r.postings = ps
29612984
if err != nil {
29622985
level.Error(r.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
29632986
return false, nil, nil
@@ -3394,6 +3417,10 @@ func (r *bucketIndexReader) LoadSeriesForTime(ref storage.SeriesRef, lset *[]sym
33943417
// Close released the underlying resources of the reader.
33953418
func (r *bucketIndexReader) Close() error {
33963419
r.block.pendingReaders.Done()
3420+
3421+
if r.postings != nil {
3422+
putPostingsSlice(r.postings)
3423+
}
33973424
return nil
33983425
}
33993426

pkg/store/bucket_e2e_test.go

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func prepareTestBlocks(t testing.TB, now time.Time, count int, dir string, bkt o
131131
return
132132
}
133133

134-
func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, bytesLimiterFactory BytesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite {
134+
func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, bytesLimiterFactory BytesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig, opts ...BucketStoreOption) *storeSuite {
135135
series := []labels.Labels{
136136
labels.FromStrings("a", "1", "b", "1"),
137137
labels.FromStrings("a", "1", "b", "2"),
@@ -176,10 +176,10 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
176176
true,
177177
true,
178178
time.Minute,
179-
WithLogger(s.logger),
180-
WithIndexCache(s.cache),
181-
WithFilterConfig(filterConf),
182-
WithRegistry(reg),
179+
append(opts, WithLogger(s.logger),
180+
WithIndexCache(s.cache),
181+
WithFilterConfig(filterConf),
182+
WithRegistry(reg))...,
183183
)
184184
testutil.Ok(t, err)
185185
defer func() { testutil.Ok(t, store.Close()) }()
@@ -619,6 +619,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
619619
maxChunksLimit uint64
620620
maxSeriesLimit uint64
621621
maxBytesLimit int64
622+
storeOpts []BucketStoreOption
622623
expectedErr string
623624
code codes.Code
624625
}{
@@ -630,19 +631,40 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
630631
expectedErr: "exceeded chunks limit",
631632
code: codes.ResourceExhausted,
632633
},
634+
"should fail if the max chunks limit is exceeded - ResourceExhausted (sortingStrategyNone)": {
635+
maxChunksLimit: expectedChunks - 1,
636+
expectedErr: "exceeded chunks limit",
637+
storeOpts: []BucketStoreOption{WithDontResort(true)},
638+
code: codes.ResourceExhausted,
639+
},
633640
"should fail if the max series limit is exceeded - ResourceExhausted": {
634641
maxChunksLimit: expectedChunks,
635642
expectedErr: "exceeded series limit",
636643
maxSeriesLimit: 1,
637644
code: codes.ResourceExhausted,
638645
},
646+
"should fail if the max series limit is exceeded - ResourceExhausted (sortingStrategyNone)": {
647+
maxChunksLimit: expectedChunks,
648+
expectedErr: "exceeded series limit",
649+
maxSeriesLimit: 1,
650+
storeOpts: []BucketStoreOption{WithDontResort(true)},
651+
code: codes.ResourceExhausted,
652+
},
639653
"should fail if the max bytes limit is exceeded - ResourceExhausted": {
640654
maxChunksLimit: expectedChunks,
641655
expectedErr: "exceeded bytes limit",
642656
maxSeriesLimit: 2,
643657
maxBytesLimit: 1,
644658
code: codes.ResourceExhausted,
645659
},
660+
"should fail if the max bytes limit is exceeded - ResourceExhausted (sortingStrategyNone)": {
661+
maxChunksLimit: expectedChunks,
662+
expectedErr: "exceeded bytes limit",
663+
maxSeriesLimit: 2,
664+
maxBytesLimit: 1,
665+
storeOpts: []BucketStoreOption{WithDontResort(true)},
666+
code: codes.ResourceExhausted,
667+
},
646668
}
647669

648670
for testName, testData := range cases {
@@ -653,7 +675,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
653675

654676
dir := t.TempDir()
655677

656-
s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(testData.maxChunksLimit), NewSeriesLimiterFactory(testData.maxSeriesLimit), NewBytesLimiterFactory(units.Base2Bytes(testData.maxBytesLimit)), emptyRelabelConfig, allowAllFilterConf)
678+
s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(testData.maxChunksLimit), NewSeriesLimiterFactory(testData.maxSeriesLimit), NewBytesLimiterFactory(units.Base2Bytes(testData.maxBytesLimit)), emptyRelabelConfig, allowAllFilterConf, testData.storeOpts...)
657679
testutil.Ok(t, s.store.SyncBlocks(ctx))
658680

659681
req := &storepb.SeriesRequest{

pkg/store/bucket_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2971,7 +2971,7 @@ func TestExpandPostingsWithContextCancel(t *testing.T) {
29712971
res, err := ExpandPostingsWithContext(ctx, p)
29722972
testutil.NotOk(t, err)
29732973
testutil.Equals(t, context.Canceled, err)
2974-
testutil.Equals(t, []storage.SeriesRef(nil), res)
2974+
testutil.Equals(t, true, cap(res) == 1024)
29752975
}
29762976

29772977
func samePostingGroup(a, b *postingGroup) bool {

pkg/store/lazy_postings.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,19 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post
279279
return nil, nil, errors.Wrap(err, "get postings")
280280
}
281281

282+
result := mergeFetchedPostings(ctx, fetchedPostings, postingGroups)
283+
if err := ctx.Err(); err != nil {
284+
return nil, nil, err
285+
}
286+
ps, err := ExpandPostingsWithContext(ctx, result)
287+
r.postings = ps
288+
if err != nil {
289+
return nil, nil, errors.Wrap(err, "expand")
290+
}
291+
return ps, lazyMatchers, nil
292+
}
293+
294+
func mergeFetchedPostings(ctx context.Context, fetchedPostings []index.Postings, postingGroups []*postingGroup) index.Postings {
282295
// Get "add" and "remove" postings from groups. We iterate over postingGroups and their keys
283296
// again, and this is exactly the same order as before (when building the groups), so we can simply
284297
// use one incrementing index to fetch postings from returned slice.
@@ -306,14 +319,5 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post
306319
}
307320
}
308321

309-
result := index.Without(index.Intersect(groupAdds...), index.Merge(ctx, groupRemovals...))
310-
311-
if err := ctx.Err(); err != nil {
312-
return nil, nil, err
313-
}
314-
ps, err := ExpandPostingsWithContext(ctx, result)
315-
if err != nil {
316-
return nil, nil, errors.Wrap(err, "expand")
317-
}
318-
return ps, lazyMatchers, nil
322+
return index.Without(index.Intersect(groupAdds...), index.Merge(ctx, groupRemovals...))
319323
}

pkg/store/proxy_merge.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/pkg/errors"
2020
"github.com/prometheus/client_golang/prometheus"
2121
"github.com/prometheus/prometheus/model/labels"
22+
2223
grpc_opentracing "github.com/thanos-io/thanos/pkg/tracing/tracing_middleware"
2324

2425
"github.com/thanos-io/thanos/pkg/losertree"
@@ -277,6 +278,8 @@ type lazyRespSet struct {
277278
initialized bool
278279

279280
shardMatcher *storepb.ShardMatcher
281+
282+
donec chan struct{}
280283
}
281284

282285
func (l *lazyRespSet) isEmpty() bool {
@@ -385,6 +388,7 @@ func newLazyRespSet(
385388
ringHead: 0,
386389
ringTail: 0,
387390
closed: false,
391+
donec: make(chan struct{}),
388392
}
389393
respSet.storeLabels = make(map[string]struct{})
390394
for _, ls := range storeLabelSets {
@@ -403,6 +407,8 @@ func newLazyRespSet(
403407
l.span.SetTag("processed.samples", seriesStats.Samples)
404408
l.span.SetTag("processed.bytes", bytesProcessed)
405409
l.span.Finish()
410+
411+
close(l.donec)
406412
}()
407413

408414
numResponses := 0
@@ -611,13 +617,14 @@ func newAsyncRespSet(
611617

612618
func (l *lazyRespSet) Close() {
613619
l.bufferedResponsesMtx.Lock()
614-
defer l.bufferedResponsesMtx.Unlock()
615-
616620
l.closeSeries()
617621
l.closed = true
618622
l.bufferSlotEvent.Signal()
619623
l.noMoreData = true
620624
l.dataOrFinishEvent.Signal()
625+
l.bufferedResponsesMtx.Unlock()
626+
627+
<-l.donec // Wait for the internal goroutine to complete its work.
621628

622629
l.shardMatcher.Close()
623630
_ = l.cl.CloseSend()
@@ -806,11 +813,15 @@ func sortWithoutLabels(set []*storepb.SeriesResponse, labelsToRemove map[string]
806813
}
807814

808815
func (l *eagerRespSet) Close() {
816+
l.wg.Wait()
817+
809818
if l.closeSeries != nil {
810819
l.closeSeries()
811820
}
821+
l.wg.Wait()
812822
l.shardMatcher.Close()
813823
_ = l.cl.CloseSend()
824+
814825
}
815826

816827
func (l *eagerRespSet) At() *storepb.SeriesResponse {

pkg/store/proxy_test.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2055,10 +2055,6 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
20552055
return
20562056
}
20572057
}
2058-
2059-
// Wait until the last goroutine exits which is stuck on time.Sleep().
2060-
// Otherwise, goleak complains.
2061-
time.Sleep(5 * time.Second)
20622058
}
20632059

20642060
func TestProxyStore_Series_RequestParamsProxied(t *testing.T) {

0 commit comments

Comments
 (0)