From f82e56fbba088c23f550827e95c1fff0a8b2cf4c Mon Sep 17 00:00:00 2001 From: sniper Date: Wed, 3 Nov 2021 17:10:31 +0800 Subject: [PATCH 1/2] fix request bytes size and continue is useless (#9635) Signed-off-by: kalmanzhao Co-authored-by: kalmanzhao --- storage/remote/queue_manager.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index d76b634051..4cbcdd2b93 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -1168,7 +1168,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti return err } - reqSize := len(*buf) + reqSize := len(req) *buf = req // An anonymous function allows us to defer the completion of our per-try spans @@ -1264,7 +1264,6 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l } try++ - continue } } From 9f5ff5b2697d7856cd9fb587a397992d984eb468 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 3 Nov 2021 11:08:34 +0100 Subject: [PATCH 2/2] Allow to disable trimming when querying TSDB (#9647) * Allow to disable trimming when querying TSDB Signed-off-by: Marco Pracucci * Addressed review comments Signed-off-by: Marco Pracucci * Added unit test Signed-off-by: Marco Pracucci * Renamed TrimDisabled to DisableTrimming Signed-off-by: Marco Pracucci --- storage/interface.go | 5 ++++ tsdb/compact.go | 2 +- tsdb/querier.go | 68 +++++++++++++++++++++++++------------------- tsdb/querier_test.go | 55 +++++++++++++++++++++++++++++++++-- 4 files changed, 98 insertions(+), 32 deletions(-) diff --git a/storage/interface.go b/storage/interface.go index d3dab0e21e..92ace42ae6 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -145,6 +145,11 @@ type SelectHints struct { Grouping []string // List of label names used in aggregation. By bool // Indicate whether it is without or by. Range int64 // Range vector selector range in milliseconds. + + // DisableTrimming allows to disable trimming of matching series chunks based on query Start and End time. + // When disabled, the result may contain samples outside the queried time range but Select() performances + // may be improved. + DisableTrimming bool } // TODO(bwplotka): Move to promql/engine_test.go? diff --git a/tsdb/compact.go b/tsdb/compact.go index ce197f09a6..3c333f8584 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -726,7 +726,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } all = indexr.SortedPostings(all) // Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp. - sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, meta.MinTime, meta.MaxTime-1)) + sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, meta.MinTime, meta.MaxTime-1, false)) syms := indexr.Symbols() if i == 0 { symbols = syms diff --git a/tsdb/querier.go b/tsdb/querier.go index a0bf762c14..90ee63854d 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -123,6 +123,8 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) { func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet { mint := q.mint maxt := q.maxt + disableTrimming := false + p, err := PostingsForMatchers(q.index, ms...) if err != nil { return storage.ErrSeriesSet(err) @@ -134,13 +136,14 @@ func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms .. if hints != nil { mint = hints.Start maxt = hints.End + disableTrimming = hints.DisableTrimming if hints.Func == "series" { // When you're only looking up metadata (for example series API), you don't need to load any chunks. - return newBlockSeriesSet(q.index, newNopChunkReader(), q.tombstones, p, mint, maxt) + return newBlockSeriesSet(q.index, newNopChunkReader(), q.tombstones, p, mint, maxt, disableTrimming) } } - return newBlockSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt) + return newBlockSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt, disableTrimming) } // blockChunkQuerier provides chunk querying access to a single block database. @@ -160,9 +163,11 @@ func NewBlockChunkQuerier(b BlockReader, mint, maxt int64) (storage.ChunkQuerier func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet { mint := q.mint maxt := q.maxt + disableTrimming := false if hints != nil { mint = hints.Start maxt = hints.End + disableTrimming = hints.DisableTrimming } p, err := PostingsForMatchers(q.index, ms...) if err != nil { @@ -171,7 +176,7 @@ func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, if sortSeries { p = q.index.SortedPostings(p) } - return newBlockChunkSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt) + return newBlockChunkSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt, disableTrimming) } func findSetMatches(pattern string) []string { @@ -428,11 +433,12 @@ func labelNamesWithMatchers(r IndexReader, matchers ...*labels.Matcher) ([]strin // Iterated series are trimmed with given min and max time as well as tombstones. // See newBlockSeriesSet and newBlockChunkSeriesSet to use it for either sample or chunk iterating. type blockBaseSeriesSet struct { - p index.Postings - index IndexReader - chunks ChunkReader - tombstones tombstones.Reader - mint, maxt int64 + p index.Postings + index IndexReader + chunks ChunkReader + tombstones tombstones.Reader + mint, maxt int64 + disableTrimming bool currIterFn func() *populateWithDelGenericSeriesIterator currLabels labels.Labels @@ -487,11 +493,13 @@ func (b *blockBaseSeriesSet) Next() bool { } // If still not entirely deleted, check if trim is needed based on requested time range. - if chk.MinTime < b.mint { - trimFront = true - } - if chk.MaxTime > b.maxt { - trimBack = true + if !b.disableTrimming { + if chk.MinTime < b.mint { + trimFront = true + } + if chk.MaxTime > b.maxt { + trimBack = true + } } } @@ -722,16 +730,17 @@ type blockSeriesSet struct { blockBaseSeriesSet } -func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64) storage.SeriesSet { +func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64, disableTrimming bool) storage.SeriesSet { return &blockSeriesSet{ blockBaseSeriesSet{ - index: i, - chunks: c, - tombstones: t, - p: p, - mint: mint, - maxt: maxt, - bufLbls: make(labels.Labels, 0, 10), + index: i, + chunks: c, + tombstones: t, + p: p, + mint: mint, + maxt: maxt, + disableTrimming: disableTrimming, + bufLbls: make(labels.Labels, 0, 10), }, } } @@ -754,16 +763,17 @@ type blockChunkSeriesSet struct { blockBaseSeriesSet } -func newBlockChunkSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64) storage.ChunkSeriesSet { +func newBlockChunkSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64, disableTrimming bool) storage.ChunkSeriesSet { return &blockChunkSeriesSet{ blockBaseSeriesSet{ - index: i, - chunks: c, - tombstones: t, - p: p, - mint: mint, - maxt: maxt, - bufLbls: make(labels.Labels, 0, 10), + index: i, + chunks: c, + tombstones: t, + p: p, + mint: mint, + maxt: maxt, + disableTrimming: disableTrimming, + bufLbls: make(labels.Labels, 0, 10), }, } } diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index c90aadaf39..f10be88b04 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -162,6 +162,7 @@ func createIdxChkReaders(t *testing.T, tc []seriesSamples) (IndexReader, ChunkRe type blockQuerierTestCase struct { mint, maxt int64 ms []*labels.Matcher + hints *storage.SelectHints exp storage.SeriesSet expChks storage.ChunkSeriesSet } @@ -179,7 +180,7 @@ func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr C }, } - res := q.Select(false, nil, c.ms...) + res := q.Select(false, c.hints, c.ms...) defer func() { require.NoError(t, q.Close()) }() for { @@ -214,7 +215,7 @@ func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr C maxt: c.maxt, }, } - res := q.Select(false, nil, c.ms...) + res := q.Select(false, c.hints, c.ms...) defer func() { require.NoError(t, q.Close()) }() for { @@ -319,6 +320,56 @@ func TestBlockQuerier(t *testing.T) { ), }), }, + { + // This test runs a query disabling trimming. All chunks containing at least 1 sample within the queried + // time range will be returned. + mint: 2, + maxt: 6, + hints: &storage.SelectHints{Start: 2, End: 6, DisableTrimming: true}, + ms: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "a")}, + exp: newMockSeriesSet([]storage.Series{ + storage.NewListSeries(labels.Labels{{Name: "a", Value: "a"}}, + []tsdbutil.Sample{sample{1, 2}, sample{2, 3}, sample{3, 4}, sample{5, 2}, sample{6, 3}, sample{7, 4}}, + ), + storage.NewListSeries(labels.Labels{{Name: "a", Value: "a"}, {Name: "b", Value: "b"}}, + []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 3}, sample{6, 6}}, + ), + }), + expChks: newMockChunkSeriesSet([]storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.Labels{{Name: "a", Value: "a"}}, + []tsdbutil.Sample{sample{1, 2}, sample{2, 3}, sample{3, 4}}, + []tsdbutil.Sample{sample{5, 2}, sample{6, 3}, sample{7, 4}}, + ), + storage.NewListChunkSeriesFromSamples(labels.Labels{{Name: "a", Value: "a"}, {Name: "b", Value: "b"}}, + []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}, + []tsdbutil.Sample{sample{5, 3}, sample{6, 6}}, + ), + }), + }, + { + // This test runs a query disabling trimming. All chunks containing at least 1 sample within the queried + // time range will be returned. + mint: 5, + maxt: 6, + hints: &storage.SelectHints{Start: 5, End: 6, DisableTrimming: true}, + ms: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "a")}, + exp: newMockSeriesSet([]storage.Series{ + storage.NewListSeries(labels.Labels{{Name: "a", Value: "a"}}, + []tsdbutil.Sample{sample{5, 2}, sample{6, 3}, sample{7, 4}}, + ), + storage.NewListSeries(labels.Labels{{Name: "a", Value: "a"}, {Name: "b", Value: "b"}}, + []tsdbutil.Sample{sample{5, 3}, sample{6, 6}}, + ), + }), + expChks: newMockChunkSeriesSet([]storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.Labels{{Name: "a", Value: "a"}}, + []tsdbutil.Sample{sample{5, 2}, sample{6, 3}, sample{7, 4}}, + ), + storage.NewListChunkSeriesFromSamples(labels.Labels{{Name: "a", Value: "a"}, {Name: "b", Value: "b"}}, + []tsdbutil.Sample{sample{5, 3}, sample{6, 6}}, + ), + }), + }, } { t.Run("", func(t *testing.T) { ir, cr, _, _ := createIdxChkReaders(t, testData)