mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Merge pull request #14874 from krajorama/fix-panic-in-ooo-query2
BUGFIX: TSDB: panic in chunk querier
This commit is contained in:
commit
ab5994da02
|
@ -3493,6 +3493,56 @@ func TestWaitForPendingReadersInTimeRange(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestQueryOOOHeadDuringTruncate(t *testing.T) {
|
func TestQueryOOOHeadDuringTruncate(t *testing.T) {
|
||||||
|
testQueryOOOHeadDuringTruncate(t,
|
||||||
|
func(db *DB, minT, maxT int64) (storage.LabelQuerier, error) {
|
||||||
|
return db.Querier(minT, maxT)
|
||||||
|
},
|
||||||
|
func(t *testing.T, lq storage.LabelQuerier, minT, _ int64) {
|
||||||
|
// Samples
|
||||||
|
q, ok := lq.(storage.Querier)
|
||||||
|
require.True(t, ok)
|
||||||
|
ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||||
|
require.True(t, ss.Next())
|
||||||
|
s := ss.At()
|
||||||
|
require.False(t, ss.Next()) // One series.
|
||||||
|
it := s.Iterator(nil)
|
||||||
|
require.NotEqual(t, chunkenc.ValNone, it.Next()) // Has some data.
|
||||||
|
require.Equal(t, minT, it.AtT()) // It is an in-order sample.
|
||||||
|
require.NotEqual(t, chunkenc.ValNone, it.Next()) // Has some data.
|
||||||
|
require.Equal(t, minT+50, it.AtT()) // it is an out-of-order sample.
|
||||||
|
require.NoError(t, it.Err())
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestChunkQueryOOOHeadDuringTruncate(t *testing.T) {
|
||||||
|
testQueryOOOHeadDuringTruncate(t,
|
||||||
|
func(db *DB, minT, maxT int64) (storage.LabelQuerier, error) {
|
||||||
|
return db.ChunkQuerier(minT, maxT)
|
||||||
|
},
|
||||||
|
func(t *testing.T, lq storage.LabelQuerier, minT, _ int64) {
|
||||||
|
// Chunks
|
||||||
|
q, ok := lq.(storage.ChunkQuerier)
|
||||||
|
require.True(t, ok)
|
||||||
|
ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||||
|
require.True(t, ss.Next())
|
||||||
|
s := ss.At()
|
||||||
|
require.False(t, ss.Next()) // One series.
|
||||||
|
metaIt := s.Iterator(nil)
|
||||||
|
require.True(t, metaIt.Next())
|
||||||
|
meta := metaIt.At()
|
||||||
|
// Samples
|
||||||
|
it := meta.Chunk.Iterator(nil)
|
||||||
|
require.NotEqual(t, chunkenc.ValNone, it.Next()) // Has some data.
|
||||||
|
require.Equal(t, minT, it.AtT()) // It is an in-order sample.
|
||||||
|
require.NotEqual(t, chunkenc.ValNone, it.Next()) // Has some data.
|
||||||
|
require.Equal(t, minT+50, it.AtT()) // it is an out-of-order sample.
|
||||||
|
require.NoError(t, it.Err())
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testQueryOOOHeadDuringTruncate(t *testing.T, makeQuerier func(db *DB, minT, maxT int64) (storage.LabelQuerier, error), verify func(t *testing.T, q storage.LabelQuerier, minT, maxT int64)) {
|
||||||
const maxT int64 = 6000
|
const maxT int64 = 6000
|
||||||
|
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
|
@ -3545,7 +3595,7 @@ func TestQueryOOOHeadDuringTruncate(t *testing.T) {
|
||||||
// Wait for the compaction to start.
|
// Wait for the compaction to start.
|
||||||
<-allowQueryToStart
|
<-allowQueryToStart
|
||||||
|
|
||||||
q, err := db.Querier(1500, 2500)
|
q, err := makeQuerier(db, 1500, 2500)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
queryStarted <- struct{}{} // Unblock the compaction.
|
queryStarted <- struct{}{} // Unblock the compaction.
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
@ -3562,17 +3612,7 @@ func TestQueryOOOHeadDuringTruncate(t *testing.T) {
|
||||||
require.Empty(t, annots)
|
require.Empty(t, annots)
|
||||||
require.Equal(t, []string{"b"}, res)
|
require.Equal(t, []string{"b"}, res)
|
||||||
|
|
||||||
// Samples
|
verify(t, q, 1500, 2500)
|
||||||
ss := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
|
||||||
require.True(t, ss.Next())
|
|
||||||
s := ss.At()
|
|
||||||
require.False(t, ss.Next()) // One series.
|
|
||||||
it := s.Iterator(nil)
|
|
||||||
require.NotEqual(t, chunkenc.ValNone, it.Next()) // Has some data.
|
|
||||||
require.Equal(t, int64(1500), it.AtT()) // It is an in-order sample.
|
|
||||||
require.NotEqual(t, chunkenc.ValNone, it.Next()) // Has some data.
|
|
||||||
require.Equal(t, int64(1550), it.AtT()) // it is an out-of-order sample.
|
|
||||||
require.NoError(t, it.Err())
|
|
||||||
|
|
||||||
require.NoError(t, q.Close()) // Cannot be deferred as the compaction waits for queries to close before finishing.
|
require.NoError(t, q.Close()) // Cannot be deferred as the compaction waits for queries to close before finishing.
|
||||||
|
|
||||||
|
|
|
@ -586,15 +586,24 @@ func NewHeadAndOOOChunkQuerier(mint, maxt int64, head *Head, oooIsoState *oooIso
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *HeadAndOOOChunkQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
func (q *HeadAndOOOChunkQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||||
|
if q.querier == nil {
|
||||||
|
return nil, nil, nil
|
||||||
|
}
|
||||||
return q.querier.LabelValues(ctx, name, hints, matchers...)
|
return q.querier.LabelValues(ctx, name, hints, matchers...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *HeadAndOOOChunkQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
func (q *HeadAndOOOChunkQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||||
|
if q.querier == nil {
|
||||||
|
return nil, nil, nil
|
||||||
|
}
|
||||||
return q.querier.LabelNames(ctx, hints, matchers...)
|
return q.querier.LabelNames(ctx, hints, matchers...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *HeadAndOOOChunkQuerier) Close() error {
|
func (q *HeadAndOOOChunkQuerier) Close() error {
|
||||||
q.chunkr.Close()
|
q.chunkr.Close()
|
||||||
|
if q.querier == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return q.querier.Close()
|
return q.querier.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue