Upgraded upstream Prometheus

Signed-off-by: Marco Pracucci <marco@pracucci.com>
This commit is contained in:
Marco Pracucci 2021-11-03 11:23:16 +01:00
commit 17d5a5b3df
No known key found for this signature in database
GPG key ID: 74C1BD403D2DF9B5
5 changed files with 99 additions and 35 deletions

View file

@ -148,6 +148,11 @@ type SelectHints struct {
ShardIndex uint64 // Current shard index (starts from 0 and up to ShardCount-1). ShardIndex uint64 // Current shard index (starts from 0 and up to ShardCount-1).
ShardCount uint64 // Total number of shards (0 means sharding is disabled). ShardCount uint64 // Total number of shards (0 means sharding is disabled).
// DisableTrimming allows to disable trimming of matching series chunks based on query Start and End time.
// When disabled, the result may contain samples outside the queried time range but Select() performances
// may be improved.
DisableTrimming bool
} }
// TODO(bwplotka): Move to promql/engine_test.go? // TODO(bwplotka): Move to promql/engine_test.go?

View file

@ -1168,7 +1168,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
return err return err
} }
reqSize := len(*buf) reqSize := len(req)
*buf = req *buf = req
// An anonymous function allows us to defer the completion of our per-try spans // An anonymous function allows us to defer the completion of our per-try spans
@ -1264,7 +1264,6 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l
} }
try++ try++
continue
} }
} }

View file

@ -808,7 +808,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64,
} }
all = indexr.SortedPostings(all) all = indexr.SortedPostings(all)
// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp. // Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1)) sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1, false))
if len(outBlocks) > 1 { if len(outBlocks) > 1 {
// To iterate series when populating symbols, we cannot reuse postings we just got, but need to get a new copy. // To iterate series when populating symbols, we cannot reuse postings we just got, but need to get a new copy.
@ -820,7 +820,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64,
} }
all = indexr.SortedPostings(all) all = indexr.SortedPostings(all)
// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp. // Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
symbolsSets = append(symbolsSets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1)) symbolsSets = append(symbolsSets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1, false))
} else { } else {
syms := indexr.Symbols() syms := indexr.Symbols()
if i == 0 { if i == 0 {

View file

@ -107,6 +107,7 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) {
func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet { func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
mint := q.mint mint := q.mint
maxt := q.maxt maxt := q.maxt
disableTrimming := false
sharded := hints != nil && hints.ShardCount > 0 sharded := hints != nil && hints.ShardCount > 0
p, err := q.index.PostingsForMatchers(sharded, ms...) p, err := q.index.PostingsForMatchers(sharded, ms...)
if err != nil { if err != nil {
@ -122,13 +123,14 @@ func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ..
if hints != nil { if hints != nil {
mint = hints.Start mint = hints.Start
maxt = hints.End maxt = hints.End
disableTrimming = hints.DisableTrimming
if hints.Func == "series" { if hints.Func == "series" {
// When you're only looking up metadata (for example series API), you don't need to load any chunks. // When you're only looking up metadata (for example series API), you don't need to load any chunks.
return newBlockSeriesSet(q.index, newNopChunkReader(), q.tombstones, p, mint, maxt) return newBlockSeriesSet(q.index, newNopChunkReader(), q.tombstones, p, mint, maxt, disableTrimming)
} }
} }
return newBlockSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt) return newBlockSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt, disableTrimming)
} }
// blockChunkQuerier provides chunk querying access to a single block database. // blockChunkQuerier provides chunk querying access to a single block database.
@ -148,9 +150,11 @@ func NewBlockChunkQuerier(b BlockReader, mint, maxt int64) (storage.ChunkQuerier
func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet { func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet {
mint := q.mint mint := q.mint
maxt := q.maxt maxt := q.maxt
disableTrimming := false
if hints != nil { if hints != nil {
mint = hints.Start mint = hints.Start
maxt = hints.End maxt = hints.End
disableTrimming = hints.DisableTrimming
} }
sharded := hints != nil && hints.ShardCount > 0 sharded := hints != nil && hints.ShardCount > 0
p, err := q.index.PostingsForMatchers(sharded, ms...) p, err := q.index.PostingsForMatchers(sharded, ms...)
@ -163,7 +167,7 @@ func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints,
if sortSeries { if sortSeries {
p = q.index.SortedPostings(p) p = q.index.SortedPostings(p)
} }
return newBlockChunkSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt) return newBlockChunkSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt, disableTrimming)
} }
// PostingsForMatchers assembles a single postings iterator against the index reader // PostingsForMatchers assembles a single postings iterator against the index reader
@ -378,11 +382,12 @@ func labelNamesWithMatchers(r IndexReader, matchers ...*labels.Matcher) ([]strin
// Iterated series are trimmed with given min and max time as well as tombstones. // Iterated series are trimmed with given min and max time as well as tombstones.
// See newBlockSeriesSet and newBlockChunkSeriesSet to use it for either sample or chunk iterating. // See newBlockSeriesSet and newBlockChunkSeriesSet to use it for either sample or chunk iterating.
type blockBaseSeriesSet struct { type blockBaseSeriesSet struct {
p index.Postings p index.Postings
index IndexReader index IndexReader
chunks ChunkReader chunks ChunkReader
tombstones tombstones.Reader tombstones tombstones.Reader
mint, maxt int64 mint, maxt int64
disableTrimming bool
currIterFn func() *populateWithDelGenericSeriesIterator currIterFn func() *populateWithDelGenericSeriesIterator
currLabels labels.Labels currLabels labels.Labels
@ -437,11 +442,13 @@ func (b *blockBaseSeriesSet) Next() bool {
} }
// If still not entirely deleted, check if trim is needed based on requested time range. // If still not entirely deleted, check if trim is needed based on requested time range.
if chk.MinTime < b.mint { if !b.disableTrimming {
trimFront = true if chk.MinTime < b.mint {
} trimFront = true
if chk.MaxTime > b.maxt { }
trimBack = true if chk.MaxTime > b.maxt {
trimBack = true
}
} }
} }
@ -672,16 +679,17 @@ type blockSeriesSet struct {
blockBaseSeriesSet blockBaseSeriesSet
} }
func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64) storage.SeriesSet { func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64, disableTrimming bool) storage.SeriesSet {
return &blockSeriesSet{ return &blockSeriesSet{
blockBaseSeriesSet{ blockBaseSeriesSet{
index: i, index: i,
chunks: c, chunks: c,
tombstones: t, tombstones: t,
p: p, p: p,
mint: mint, mint: mint,
maxt: maxt, maxt: maxt,
bufLbls: make(labels.Labels, 0, 10), disableTrimming: disableTrimming,
bufLbls: make(labels.Labels, 0, 10),
}, },
} }
} }
@ -704,16 +712,17 @@ type blockChunkSeriesSet struct {
blockBaseSeriesSet blockBaseSeriesSet
} }
func newBlockChunkSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64) storage.ChunkSeriesSet { func newBlockChunkSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64, disableTrimming bool) storage.ChunkSeriesSet {
return &blockChunkSeriesSet{ return &blockChunkSeriesSet{
blockBaseSeriesSet{ blockBaseSeriesSet{
index: i, index: i,
chunks: c, chunks: c,
tombstones: t, tombstones: t,
p: p, p: p,
mint: mint, mint: mint,
maxt: maxt, maxt: maxt,
bufLbls: make(labels.Labels, 0, 10), disableTrimming: disableTrimming,
bufLbls: make(labels.Labels, 0, 10),
}, },
} }
} }

View file

@ -162,6 +162,7 @@ func createIdxChkReaders(t *testing.T, tc []seriesSamples) (IndexReader, ChunkRe
type blockQuerierTestCase struct { type blockQuerierTestCase struct {
mint, maxt int64 mint, maxt int64
ms []*labels.Matcher ms []*labels.Matcher
hints *storage.SelectHints
exp storage.SeriesSet exp storage.SeriesSet
expChks storage.ChunkSeriesSet expChks storage.ChunkSeriesSet
} }
@ -179,7 +180,7 @@ func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr C
}, },
} }
res := q.Select(false, nil, c.ms...) res := q.Select(false, c.hints, c.ms...)
defer func() { require.NoError(t, q.Close()) }() defer func() { require.NoError(t, q.Close()) }()
for { for {
@ -214,7 +215,7 @@ func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr C
maxt: c.maxt, maxt: c.maxt,
}, },
} }
res := q.Select(false, nil, c.ms...) res := q.Select(false, c.hints, c.ms...)
defer func() { require.NoError(t, q.Close()) }() defer func() { require.NoError(t, q.Close()) }()
for { for {
@ -319,6 +320,56 @@ func TestBlockQuerier(t *testing.T) {
), ),
}), }),
}, },
{
// This test runs a query disabling trimming. All chunks containing at least 1 sample within the queried
// time range will be returned.
mint: 2,
maxt: 6,
hints: &storage.SelectHints{Start: 2, End: 6, DisableTrimming: true},
ms: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "a")},
exp: newMockSeriesSet([]storage.Series{
storage.NewListSeries(labels.Labels{{Name: "a", Value: "a"}},
[]tsdbutil.Sample{sample{1, 2}, sample{2, 3}, sample{3, 4}, sample{5, 2}, sample{6, 3}, sample{7, 4}},
),
storage.NewListSeries(labels.Labels{{Name: "a", Value: "a"}, {Name: "b", Value: "b"}},
[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 3}, sample{6, 6}},
),
}),
expChks: newMockChunkSeriesSet([]storage.ChunkSeries{
storage.NewListChunkSeriesFromSamples(labels.Labels{{Name: "a", Value: "a"}},
[]tsdbutil.Sample{sample{1, 2}, sample{2, 3}, sample{3, 4}},
[]tsdbutil.Sample{sample{5, 2}, sample{6, 3}, sample{7, 4}},
),
storage.NewListChunkSeriesFromSamples(labels.Labels{{Name: "a", Value: "a"}, {Name: "b", Value: "b"}},
[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}},
[]tsdbutil.Sample{sample{5, 3}, sample{6, 6}},
),
}),
},
{
// This test runs a query disabling trimming. All chunks containing at least 1 sample within the queried
// time range will be returned.
mint: 5,
maxt: 6,
hints: &storage.SelectHints{Start: 5, End: 6, DisableTrimming: true},
ms: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "a")},
exp: newMockSeriesSet([]storage.Series{
storage.NewListSeries(labels.Labels{{Name: "a", Value: "a"}},
[]tsdbutil.Sample{sample{5, 2}, sample{6, 3}, sample{7, 4}},
),
storage.NewListSeries(labels.Labels{{Name: "a", Value: "a"}, {Name: "b", Value: "b"}},
[]tsdbutil.Sample{sample{5, 3}, sample{6, 6}},
),
}),
expChks: newMockChunkSeriesSet([]storage.ChunkSeries{
storage.NewListChunkSeriesFromSamples(labels.Labels{{Name: "a", Value: "a"}},
[]tsdbutil.Sample{sample{5, 2}, sample{6, 3}, sample{7, 4}},
),
storage.NewListChunkSeriesFromSamples(labels.Labels{{Name: "a", Value: "a"}, {Name: "b", Value: "b"}},
[]tsdbutil.Sample{sample{5, 3}, sample{6, 6}},
),
}),
},
} { } {
t.Run("", func(t *testing.T) { t.Run("", func(t *testing.T) {
ir, cr, _, _ := createIdxChkReaders(t, testData) ir, cr, _, _ := createIdxChkReaders(t, testData)