mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-13 17:14:05 -08:00
BUGFIX: TSDB: panic in query during truncation with OOO head (#14831)
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (0) (push) Waiting to run
CI / Build Prometheus for common architectures (1) (push) Waiting to run
CI / Build Prometheus for common architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (0) (push) Waiting to run
CI / Build Prometheus for all architectures (1) (push) Waiting to run
CI / Build Prometheus for all architectures (10) (push) Waiting to run
CI / Build Prometheus for all architectures (11) (push) Waiting to run
CI / Build Prometheus for all architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (3) (push) Waiting to run
CI / Build Prometheus for all architectures (4) (push) Waiting to run
CI / Build Prometheus for all architectures (5) (push) Waiting to run
CI / Build Prometheus for all architectures (6) (push) Waiting to run
CI / Build Prometheus for all architectures (7) (push) Waiting to run
CI / Build Prometheus for all architectures (8) (push) Waiting to run
CI / Build Prometheus for all architectures (9) (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (0) (push) Waiting to run
CI / Build Prometheus for common architectures (1) (push) Waiting to run
CI / Build Prometheus for common architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (0) (push) Waiting to run
CI / Build Prometheus for all architectures (1) (push) Waiting to run
CI / Build Prometheus for all architectures (10) (push) Waiting to run
CI / Build Prometheus for all architectures (11) (push) Waiting to run
CI / Build Prometheus for all architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (3) (push) Waiting to run
CI / Build Prometheus for all architectures (4) (push) Waiting to run
CI / Build Prometheus for all architectures (5) (push) Waiting to run
CI / Build Prometheus for all architectures (6) (push) Waiting to run
CI / Build Prometheus for all architectures (7) (push) Waiting to run
CI / Build Prometheus for all architectures (8) (push) Waiting to run
CI / Build Prometheus for all architectures (9) (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run
Check if headQuerier is nil before trying to use it. * TestQueryOOOHeadDuringTruncate: unit test to check query during truncate Regression test for #14822 * Simulate race between query and Compact() Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
parent
dbe40d8f57
commit
536d9f9ce9
|
@ -128,6 +128,7 @@ type Head struct {
|
||||||
writeNotified wlog.WriteNotified
|
writeNotified wlog.WriteNotified
|
||||||
|
|
||||||
memTruncationInProcess atomic.Bool
|
memTruncationInProcess atomic.Bool
|
||||||
|
memTruncationCallBack func() // For testing purposes.
|
||||||
}
|
}
|
||||||
|
|
||||||
type ExemplarStorage interface {
|
type ExemplarStorage interface {
|
||||||
|
@ -1129,6 +1130,10 @@ func (h *Head) truncateMemory(mint int64) (err error) {
|
||||||
h.memTruncationInProcess.Store(true)
|
h.memTruncationInProcess.Store(true)
|
||||||
defer h.memTruncationInProcess.Store(false)
|
defer h.memTruncationInProcess.Store(false)
|
||||||
|
|
||||||
|
if h.memTruncationCallBack != nil {
|
||||||
|
h.memTruncationCallBack()
|
||||||
|
}
|
||||||
|
|
||||||
// We wait for pending queries to end that overlap with this truncation.
|
// We wait for pending queries to end that overlap with this truncation.
|
||||||
if initialized {
|
if initialized {
|
||||||
h.WaitForPendingReadersInTimeRange(h.MinTime(), mint)
|
h.WaitForPendingReadersInTimeRange(h.MinTime(), mint)
|
||||||
|
|
|
@ -3492,6 +3492,93 @@ func TestWaitForPendingReadersInTimeRange(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestQueryOOOHeadDuringTruncate(t *testing.T) {
|
||||||
|
const maxT int64 = 6000
|
||||||
|
|
||||||
|
dir := t.TempDir()
|
||||||
|
opts := DefaultOptions()
|
||||||
|
opts.EnableNativeHistograms = true
|
||||||
|
opts.OutOfOrderTimeWindow = maxT
|
||||||
|
opts.MinBlockDuration = maxT / 2 // So that head will compact up to 3000.
|
||||||
|
|
||||||
|
db, err := Open(dir, nil, nil, opts, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, db.Close())
|
||||||
|
})
|
||||||
|
db.DisableCompactions()
|
||||||
|
|
||||||
|
var (
|
||||||
|
ref = storage.SeriesRef(0)
|
||||||
|
app = db.Appender(context.Background())
|
||||||
|
)
|
||||||
|
// Add in-order samples at every 100ms starting at 0ms.
|
||||||
|
for i := int64(0); i < maxT; i += 100 {
|
||||||
|
_, err := app.Append(ref, labels.FromStrings("a", "b"), i, 0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
// Add out-of-order samples at every 100ms starting at 50ms.
|
||||||
|
for i := int64(50); i < maxT; i += 100 {
|
||||||
|
_, err := app.Append(ref, labels.FromStrings("a", "b"), i, 0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
require.NoError(t, app.Commit())
|
||||||
|
|
||||||
|
requireEqualOOOSamples(t, int(maxT/100-1), db)
|
||||||
|
|
||||||
|
// Synchronization points.
|
||||||
|
allowQueryToStart := make(chan struct{})
|
||||||
|
queryStarted := make(chan struct{})
|
||||||
|
compactionFinished := make(chan struct{})
|
||||||
|
|
||||||
|
db.head.memTruncationCallBack = func() {
|
||||||
|
// Compaction has started, let the query start and wait for it to actually start to simulate race condition.
|
||||||
|
allowQueryToStart <- struct{}{}
|
||||||
|
<-queryStarted
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
db.Compact(context.Background()) // Compact and write blocks up to 3000 (maxtT/2).
|
||||||
|
compactionFinished <- struct{}{}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait for the compaction to start.
|
||||||
|
<-allowQueryToStart
|
||||||
|
|
||||||
|
q, err := db.Querier(1500, 2500)
|
||||||
|
require.NoError(t, err)
|
||||||
|
queryStarted <- struct{}{} // Unblock the compaction.
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// Label names.
|
||||||
|
res, annots, err := q.LabelNames(ctx, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Empty(t, annots)
|
||||||
|
require.Equal(t, []string{"a"}, res)
|
||||||
|
|
||||||
|
// Label values.
|
||||||
|
res, annots, err = q.LabelValues(ctx, "a", nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Empty(t, annots)
|
||||||
|
require.Equal(t, []string{"b"}, res)
|
||||||
|
|
||||||
|
// Samples
|
||||||
|
ss := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||||
|
require.True(t, ss.Next())
|
||||||
|
s := ss.At()
|
||||||
|
require.False(t, ss.Next()) // One series.
|
||||||
|
it := s.Iterator(nil)
|
||||||
|
require.NotEqual(t, chunkenc.ValNone, it.Next()) // Has some data.
|
||||||
|
require.Equal(t, int64(1500), it.AtT()) // It is an in-order sample.
|
||||||
|
require.NotEqual(t, chunkenc.ValNone, it.Next()) // Has some data.
|
||||||
|
require.Equal(t, int64(1550), it.AtT()) // it is an out-of-order sample.
|
||||||
|
require.NoError(t, it.Err())
|
||||||
|
|
||||||
|
require.NoError(t, q.Close()) // Cannot be deferred as the compaction waits for queries to close before finishing.
|
||||||
|
|
||||||
|
<-compactionFinished // Wait for compaction otherwise Go test finds stray goroutines.
|
||||||
|
}
|
||||||
|
|
||||||
func TestAppendHistogram(t *testing.T) {
|
func TestAppendHistogram(t *testing.T) {
|
||||||
l := labels.FromStrings("a", "b")
|
l := labels.FromStrings("a", "b")
|
||||||
for _, numHistograms := range []int{1, 10, 150, 200, 250, 300} {
|
for _, numHistograms := range []int{1, 10, 150, 200, 250, 300} {
|
||||||
|
|
|
@ -513,7 +513,7 @@ type HeadAndOOOQuerier struct {
|
||||||
head *Head
|
head *Head
|
||||||
index IndexReader
|
index IndexReader
|
||||||
chunkr ChunkReader
|
chunkr ChunkReader
|
||||||
querier storage.Querier
|
querier storage.Querier // Used for LabelNames, LabelValues, but may be nil if head was truncated in the mean time, in which case we ignore it and not close it in the end.
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewHeadAndOOOQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.Querier) storage.Querier {
|
func NewHeadAndOOOQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.Querier) storage.Querier {
|
||||||
|
@ -534,15 +534,24 @@ func NewHeadAndOOOQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolatio
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *HeadAndOOOQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
func (q *HeadAndOOOQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||||
|
if q.querier == nil {
|
||||||
|
return nil, nil, nil
|
||||||
|
}
|
||||||
return q.querier.LabelValues(ctx, name, hints, matchers...)
|
return q.querier.LabelValues(ctx, name, hints, matchers...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *HeadAndOOOQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
func (q *HeadAndOOOQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||||
|
if q.querier == nil {
|
||||||
|
return nil, nil, nil
|
||||||
|
}
|
||||||
return q.querier.LabelNames(ctx, hints, matchers...)
|
return q.querier.LabelNames(ctx, hints, matchers...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *HeadAndOOOQuerier) Close() error {
|
func (q *HeadAndOOOQuerier) Close() error {
|
||||||
q.chunkr.Close()
|
q.chunkr.Close()
|
||||||
|
if q.querier == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return q.querier.Close()
|
return q.querier.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue