Merge pull request #25 from grafana/upgrade-prometheus

Upgrade prometheus
This commit is contained in:
Marco Pracucci 2021-11-03 12:02:24 +01:00 committed by GitHub
commit c04bab8758
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 99 additions and 35 deletions

View file

@@ -148,6 +148,11 @@ type SelectHints struct {
ShardIndex uint64 // Current shard index (starts from 0 and up to ShardCount-1). ShardIndex uint64 // Current shard index (starts from 0 and up to ShardCount-1).
ShardCount uint64 // Total number of shards (0 means sharding is disabled). ShardCount uint64 // Total number of shards (0 means sharding is disabled).
// DisableTrimming allows to disable trimming of matching series chunks based on query Start and End time.
// When disabled, the result may contain samples outside the queried time range but Select() performances
// may be improved.
DisableTrimming bool
} }
// TODO(bwplotka): Move to promql/engine_test.go? // TODO(bwplotka): Move to promql/engine_test.go?

View file

@@ -1168,7 +1168,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
return err return err
} }
reqSize := len(*buf) reqSize := len(req)
*buf = req *buf = req
// An anonymous function allows us to defer the completion of our per-try spans // An anonymous function allows us to defer the completion of our per-try spans
@@ -1264,7 +1264,6 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l
} }
try++ try++
continue
} }
} }

View file

@@ -808,7 +808,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64,
} }
all = indexr.SortedPostings(all) all = indexr.SortedPostings(all)
// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp. // Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1)) sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1, false))
if len(outBlocks) > 1 { if len(outBlocks) > 1 {
// To iterate series when populating symbols, we cannot reuse postings we just got, but need to get a new copy. // To iterate series when populating symbols, we cannot reuse postings we just got, but need to get a new copy.
@@ -820,7 +820,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64,
} }
all = indexr.SortedPostings(all) all = indexr.SortedPostings(all)
// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp. // Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
symbolsSets = append(symbolsSets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1)) symbolsSets = append(symbolsSets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1, false))
} else { } else {
syms := indexr.Symbols() syms := indexr.Symbols()
if i == 0 { if i == 0 {

View file

@@ -107,6 +107,7 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) {
func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet { func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
mint := q.mint mint := q.mint
maxt := q.maxt maxt := q.maxt
disableTrimming := false
sharded := hints != nil && hints.ShardCount > 0 sharded := hints != nil && hints.ShardCount > 0
p, err := q.index.PostingsForMatchers(sharded, ms...) p, err := q.index.PostingsForMatchers(sharded, ms...)
if err != nil { if err != nil {
@@ -122,13 +123,14 @@ func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ..
if hints != nil { if hints != nil {
mint = hints.Start mint = hints.Start
maxt = hints.End maxt = hints.End
disableTrimming = hints.DisableTrimming
if hints.Func == "series" { if hints.Func == "series" {
// When you're only looking up metadata (for example series API), you don't need to load any chunks. // When you're only looking up metadata (for example series API), you don't need to load any chunks.
return newBlockSeriesSet(q.index, newNopChunkReader(), q.tombstones, p, mint, maxt) return newBlockSeriesSet(q.index, newNopChunkReader(), q.tombstones, p, mint, maxt, disableTrimming)
} }
} }
return newBlockSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt) return newBlockSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt, disableTrimming)
} }
// blockChunkQuerier provides chunk querying access to a single block database. // blockChunkQuerier provides chunk querying access to a single block database.
@@ -148,9 +150,11 @@ func NewBlockChunkQuerier(b BlockReader, mint, maxt int64) (storage.ChunkQuerier
func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet { func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet {
mint := q.mint mint := q.mint
maxt := q.maxt maxt := q.maxt
disableTrimming := false
if hints != nil { if hints != nil {
mint = hints.Start mint = hints.Start
maxt = hints.End maxt = hints.End
disableTrimming = hints.DisableTrimming
} }
sharded := hints != nil && hints.ShardCount > 0 sharded := hints != nil && hints.ShardCount > 0
p, err := q.index.PostingsForMatchers(sharded, ms...) p, err := q.index.PostingsForMatchers(sharded, ms...)
@@ -163,7 +167,7 @@ func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints,
if sortSeries { if sortSeries {
p = q.index.SortedPostings(p) p = q.index.SortedPostings(p)
} }
return newBlockChunkSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt) return newBlockChunkSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt, disableTrimming)
} }
// PostingsForMatchers assembles a single postings iterator against the index reader // PostingsForMatchers assembles a single postings iterator against the index reader
@@ -383,6 +387,7 @@ type blockBaseSeriesSet struct {
chunks ChunkReader chunks ChunkReader
tombstones tombstones.Reader tombstones tombstones.Reader
mint, maxt int64 mint, maxt int64
disableTrimming bool
currIterFn func() *populateWithDelGenericSeriesIterator currIterFn func() *populateWithDelGenericSeriesIterator
currLabels labels.Labels currLabels labels.Labels
@@ -437,6 +442,7 @@ func (b *blockBaseSeriesSet) Next() bool {
} }
// If still not entirely deleted, check if trim is needed based on requested time range. // If still not entirely deleted, check if trim is needed based on requested time range.
if !b.disableTrimming {
if chk.MinTime < b.mint { if chk.MinTime < b.mint {
trimFront = true trimFront = true
} }
@@ -444,6 +450,7 @@ func (b *blockBaseSeriesSet) Next() bool {
trimBack = true trimBack = true
} }
} }
}
if len(chks) == 0 { if len(chks) == 0 {
continue continue
@@ -672,7 +679,7 @@ type blockSeriesSet struct {
blockBaseSeriesSet blockBaseSeriesSet
} }
func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64) storage.SeriesSet { func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64, disableTrimming bool) storage.SeriesSet {
return &blockSeriesSet{ return &blockSeriesSet{
blockBaseSeriesSet{ blockBaseSeriesSet{
index: i, index: i,
@@ -681,6 +688,7 @@ func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p inde
p: p, p: p,
mint: mint, mint: mint,
maxt: maxt, maxt: maxt,
disableTrimming: disableTrimming,
bufLbls: make(labels.Labels, 0, 10), bufLbls: make(labels.Labels, 0, 10),
}, },
} }
@@ -704,7 +712,7 @@ type blockChunkSeriesSet struct {
blockBaseSeriesSet blockBaseSeriesSet
} }
func newBlockChunkSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64) storage.ChunkSeriesSet { func newBlockChunkSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64, disableTrimming bool) storage.ChunkSeriesSet {
return &blockChunkSeriesSet{ return &blockChunkSeriesSet{
blockBaseSeriesSet{ blockBaseSeriesSet{
index: i, index: i,
@@ -713,6 +721,7 @@ func newBlockChunkSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p
p: p, p: p,
mint: mint, mint: mint,
maxt: maxt, maxt: maxt,
disableTrimming: disableTrimming,
bufLbls: make(labels.Labels, 0, 10), bufLbls: make(labels.Labels, 0, 10),
}, },
} }

View file

@@ -162,6 +162,7 @@ func createIdxChkReaders(t *testing.T, tc []seriesSamples) (IndexReader, ChunkRe
type blockQuerierTestCase struct { type blockQuerierTestCase struct {
mint, maxt int64 mint, maxt int64
ms []*labels.Matcher ms []*labels.Matcher
hints *storage.SelectHints
exp storage.SeriesSet exp storage.SeriesSet
expChks storage.ChunkSeriesSet expChks storage.ChunkSeriesSet
} }
@@ -179,7 +180,7 @@ func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr C
}, },
} }
res := q.Select(false, nil, c.ms...) res := q.Select(false, c.hints, c.ms...)
defer func() { require.NoError(t, q.Close()) }() defer func() { require.NoError(t, q.Close()) }()
for { for {
@@ -214,7 +215,7 @@ func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr C
maxt: c.maxt, maxt: c.maxt,
}, },
} }
res := q.Select(false, nil, c.ms...) res := q.Select(false, c.hints, c.ms...)
defer func() { require.NoError(t, q.Close()) }() defer func() { require.NoError(t, q.Close()) }()
for { for {
@@ -319,6 +320,56 @@ func TestBlockQuerier(t *testing.T) {
), ),
}), }),
}, },
{
// This test runs a query disabling trimming. All chunks containing at least 1 sample within the queried
// time range will be returned.
mint: 2,
maxt: 6,
hints: &storage.SelectHints{Start: 2, End: 6, DisableTrimming: true},
ms: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "a")},
exp: newMockSeriesSet([]storage.Series{
storage.NewListSeries(labels.Labels{{Name: "a", Value: "a"}},
[]tsdbutil.Sample{sample{1, 2}, sample{2, 3}, sample{3, 4}, sample{5, 2}, sample{6, 3}, sample{7, 4}},
),
storage.NewListSeries(labels.Labels{{Name: "a", Value: "a"}, {Name: "b", Value: "b"}},
[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 3}, sample{6, 6}},
),
}),
expChks: newMockChunkSeriesSet([]storage.ChunkSeries{
storage.NewListChunkSeriesFromSamples(labels.Labels{{Name: "a", Value: "a"}},
[]tsdbutil.Sample{sample{1, 2}, sample{2, 3}, sample{3, 4}},
[]tsdbutil.Sample{sample{5, 2}, sample{6, 3}, sample{7, 4}},
),
storage.NewListChunkSeriesFromSamples(labels.Labels{{Name: "a", Value: "a"}, {Name: "b", Value: "b"}},
[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}},
[]tsdbutil.Sample{sample{5, 3}, sample{6, 6}},
),
}),
},
{
// This test runs a query disabling trimming. All chunks containing at least 1 sample within the queried
// time range will be returned.
mint: 5,
maxt: 6,
hints: &storage.SelectHints{Start: 5, End: 6, DisableTrimming: true},
ms: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "a")},
exp: newMockSeriesSet([]storage.Series{
storage.NewListSeries(labels.Labels{{Name: "a", Value: "a"}},
[]tsdbutil.Sample{sample{5, 2}, sample{6, 3}, sample{7, 4}},
),
storage.NewListSeries(labels.Labels{{Name: "a", Value: "a"}, {Name: "b", Value: "b"}},
[]tsdbutil.Sample{sample{5, 3}, sample{6, 6}},
),
}),
expChks: newMockChunkSeriesSet([]storage.ChunkSeries{
storage.NewListChunkSeriesFromSamples(labels.Labels{{Name: "a", Value: "a"}},
[]tsdbutil.Sample{sample{5, 2}, sample{6, 3}, sample{7, 4}},
),
storage.NewListChunkSeriesFromSamples(labels.Labels{{Name: "a", Value: "a"}, {Name: "b", Value: "b"}},
[]tsdbutil.Sample{sample{5, 3}, sample{6, 6}},
),
}),
},
} { } {
t.Run("", func(t *testing.T) { t.Run("", func(t *testing.T) {
ir, cr, _, _ := createIdxChkReaders(t, testData) ir, cr, _, _ := createIdxChkReaders(t, testData)