diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 50b4a0e1e8..f1bdc7d347 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -4196,7 +4196,7 @@ func TestOOOAppendAndQuery(t *testing.T) { s2 := labels.FromStrings("foo", "bar2") minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() } - expSamples := make(map[string][]tsdbutil.Sample) + appendedSamples := make(map[string][]tsdbutil.Sample) totalSamples := 0 addSample := func(lbls labels.Labels, fromMins, toMins int64, faceError bool) { app := db.Appender(context.Background()) @@ -4209,7 +4209,7 @@ func TestOOOAppendAndQuery(t *testing.T) { require.Error(t, err) } else { require.NoError(t, err) - expSamples[key] = append(expSamples[key], sample{t: min, v: val}) + appendedSamples[key] = append(appendedSamples[key], sample{t: min, v: val}) totalSamples++ } } @@ -4220,17 +4220,30 @@ func TestOOOAppendAndQuery(t *testing.T) { } } - testQuery := func() { - querier, err := db.Querier(context.TODO(), math.MinInt64, math.MaxInt64) + testQuery := func(from, to int64) { + querier, err := db.Querier(context.TODO(), from, to) require.NoError(t, err) seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.")) - for k, v := range expSamples { + for k, v := range appendedSamples { sort.Slice(v, func(i, j int) bool { return v[i].T() < v[j].T() }) - expSamples[k] = v + appendedSamples[k] = v + } + + expSamples := make(map[string][]tsdbutil.Sample) + for k, samples := range appendedSamples { + for _, s := range samples { + if s.T() < from { + continue + } + if s.T() > to { + continue + } + expSamples[k] = append(expSamples[k], s) + } } require.Equal(t, expSamples, seriesSet) require.Equal(t, float64(totalSamples-2), prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended), "number of ooo appended samples mismatch") @@ -4245,40 +4258,43 @@ func TestOOOAppendAndQuery(t *testing.T) { addSample(s1, 300, 300, false) addSample(s2, 290, 290, false) require.Equal(t, float64(2), prom_testutil.ToFloat64(db.head.metrics.chunksCreated)) - testQuery() + testQuery(math.MinInt64, math.MaxInt64) // Some ooo samples. addSample(s1, 250, 260, false) addSample(s2, 255, 265, false) verifyOOOMinMaxTimes(250, 265) - testQuery() + testQuery(math.MinInt64, math.MaxInt64) + testQuery(minutes(250), minutes(265)) // Test querying ono data time range + testQuery(minutes(290), minutes(300)) // Test querying in-order data time range + testQuery(minutes(250), minutes(300)) // Test querying the entire range // Out of time window. addSample(s1, 59, 59, true) addSample(s2, 49, 49, true) verifyOOOMinMaxTimes(250, 265) - testQuery() + testQuery(math.MinInt64, math.MaxInt64) // At the edge of time window, also it would be "out of bound" without the ooo support. addSample(s1, 60, 65, false) verifyOOOMinMaxTimes(60, 265) - testQuery() + testQuery(math.MinInt64, math.MaxInt64) // This sample is not within the time window w.r.t. the head's maxt, but it is within the window // w.r.t. the series' maxt. But we consider only head's maxt. addSample(s2, 59, 59, true) verifyOOOMinMaxTimes(60, 265) - testQuery() + testQuery(math.MinInt64, math.MaxInt64) // Now the sample is within time window w.r.t. the head's maxt. addSample(s2, 60, 65, false) verifyOOOMinMaxTimes(60, 265) - testQuery() + testQuery(math.MinInt64, math.MaxInt64) // Out of time window again. addSample(s1, 59, 59, true) addSample(s2, 49, 49, true) - testQuery() + testQuery(math.MinInt64, math.MaxInt64) // Generating some m-map chunks. The m-map chunks here are in such a way // that when sorted w.r.t. mint, the last chunk's maxt is not the overall maxt @@ -4287,7 +4303,7 @@ func TestOOOAppendAndQuery(t *testing.T) { addSample(s1, 180, 249, false) require.Equal(t, float64(6), prom_testutil.ToFloat64(db.head.metrics.chunksCreated)) verifyOOOMinMaxTimes(60, 265) - testQuery() + testQuery(math.MinInt64, math.MaxInt64) } func TestOOODisabled(t *testing.T) { diff --git a/tsdb/head_test.go b/tsdb/head_test.go index c3a4a7ce92..a8dbe9b371 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -49,7 +49,7 @@ import ( "github.com/prometheus/prometheus/tsdb/wal" ) -func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal.WAL) { +func newTestHead(t testing.TB, chunkRange int64, compressWAL, oooEnabled bool) (*Head, *wal.WAL) { dir := t.TempDir() wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL) require.NoError(t, err) @@ -59,6 +59,9 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal. opts.ChunkDirRoot = dir opts.EnableExemplarStorage = true opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars) + if oooEnabled { + opts.OutOfOrderTimeWindow.Store(10 * time.Minute.Milliseconds()) + } h, err := NewHead(nil, nil, wlog, nil, opts, nil) require.NoError(t, err) @@ -72,7 +75,7 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal. func BenchmarkCreateSeries(b *testing.B) { series := genSeries(b.N, 10, 0, 0) - h, _ := newTestHead(b, 10000, false) + h, _ := newTestHead(b, 10000, false, false) defer func() { require.NoError(b, h.Close()) }() @@ -270,7 +273,7 @@ func BenchmarkLoadWAL(b *testing.B) { // While appending the samples to the head it concurrently queries them from multiple go routines and verifies that the // returned results are correct. func TestHead_HighConcurrencyReadAndWrite(t *testing.T) { - head, _ := newTestHead(t, DefaultBlockDuration, false) + head, _ := newTestHead(t, DefaultBlockDuration, false, false) defer func() { require.NoError(t, head.Close()) }() @@ -486,7 +489,7 @@ func TestHead_ReadWAL(t *testing.T) { }, } - head, w := newTestHead(t, 1000, compress) + head, w := newTestHead(t, 1000, compress, false) defer func() { require.NoError(t, head.Close()) }() @@ -530,7 +533,7 @@ func TestHead_ReadWAL(t *testing.T) { } func TestHead_WALMultiRef(t *testing.T) { - head, w := newTestHead(t, 1000, false) + head, w := newTestHead(t, 1000, false, false) require.NoError(t, head.Init(0)) @@ -590,7 +593,7 @@ func TestHead_WALMultiRef(t *testing.T) { } func TestHead_ActiveAppenders(t *testing.T) { - head, _ := newTestHead(t, 1000, false) + head, _ := newTestHead(t, 1000, false, false) defer head.Close() require.NoError(t, head.Init(0)) @@ -623,14 +626,14 @@ func TestHead_ActiveAppenders(t *testing.T) { } func TestHead_UnknownWALRecord(t *testing.T) { - head, w := newTestHead(t, 1000, false) + head, w := newTestHead(t, 1000, false, false) w.Log([]byte{255, 42}) require.NoError(t, head.Init(0)) require.NoError(t, head.Close()) } func TestHead_Truncate(t *testing.T) { - h, _ := newTestHead(t, 1000, false) + h, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, h.Close()) }() @@ -789,7 +792,7 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { {Ref: 50, T: 90, V: 1}, }, } - head, w := newTestHead(t, 1000, compress) + head, w := newTestHead(t, 1000, compress, false) defer func() { require.NoError(t, head.Close()) }() @@ -856,7 +859,7 @@ func TestHeadDeleteSimple(t *testing.T) { for _, compress := range []bool{false, true} { t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { for _, c := range cases { - head, w := newTestHead(t, 1000, compress) + head, w := newTestHead(t, 1000, compress, false) require.NoError(t, head.Init(0)) app := head.Appender(context.Background()) @@ -937,7 +940,7 @@ func TestHeadDeleteSimple(t *testing.T) { } func TestDeleteUntilCurMax(t *testing.T) { - hb, _ := newTestHead(t, 1000000, false) + hb, _ := newTestHead(t, 1000000, false, false) defer func() { require.NoError(t, hb.Close()) }() @@ -990,7 +993,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) { numSamples := 10000 // Enough samples to cause a checkpoint. - hb, w := newTestHead(t, int64(numSamples)*10, false) + hb, w := newTestHead(t, int64(numSamples)*10, false, false) for i := 0; i < numSamples; i++ { app := hb.Appender(context.Background()) @@ -1079,7 +1082,7 @@ func TestDelete_e2e(t *testing.T) { seriesMap[labels.New(l...).String()] = []tsdbutil.Sample{} } - hb, _ := newTestHead(t, 100000, false) + hb, _ := newTestHead(t, 100000, false, false) defer func() { require.NoError(t, hb.Close()) }() @@ -1360,7 +1363,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { func TestGCChunkAccess(t *testing.T) { // Put a chunk, select it. GC it and then access it. - h, _ := newTestHead(t, 1000, false) + h, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, h.Close()) }() @@ -1414,7 +1417,7 @@ func TestGCChunkAccess(t *testing.T) { func TestGCSeriesAccess(t *testing.T) { // Put a series, select it. GC it and then access it. - h, _ := newTestHead(t, 1000, false) + h, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, h.Close()) }() @@ -1469,7 +1472,7 @@ func TestGCSeriesAccess(t *testing.T) { } func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { - h, _ := newTestHead(t, 1000, false) + h, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, h.Close()) }() @@ -1499,7 +1502,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { } func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { - h, _ := newTestHead(t, 1000, false) + h, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, h.Close()) }() @@ -1532,7 +1535,7 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { func TestHead_LogRollback(t *testing.T) { for _, compress := range []bool{false, true} { t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { - h, w := newTestHead(t, 1000, compress) + h, w := newTestHead(t, 1000, compress, false) defer func() { require.NoError(t, h.Close()) }() @@ -1726,7 +1729,7 @@ func TestHeadReadWriterRepair(t *testing.T) { } func TestNewWalSegmentOnTruncate(t *testing.T) { - h, wlog := newTestHead(t, 1000, false) + h, wlog := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, h.Close()) }() @@ -1756,7 +1759,7 @@ func TestNewWalSegmentOnTruncate(t *testing.T) { } func TestAddDuplicateLabelName(t *testing.T) { - h, _ := newTestHead(t, 1000, false) + h, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, h.Close()) }() @@ -1839,7 +1842,7 @@ func TestMemSeriesIsolation(t *testing.T) { } // Test isolation without restart of Head. - hb, _ := newTestHead(t, 1000, false) + hb, _ := newTestHead(t, 1000, false, false) i := addSamples(hb) testIsolation(hb, i) @@ -1901,7 +1904,7 @@ func TestMemSeriesIsolation(t *testing.T) { require.NoError(t, hb.Close()) // Test isolation with restart of Head. This is to verify the num samples of chunks after m-map chunk replay. - hb, w := newTestHead(t, 1000, false) + hb, w := newTestHead(t, 1000, false, false) i = addSamples(hb) require.NoError(t, hb.Close()) @@ -1954,7 +1957,7 @@ func TestIsolationRollback(t *testing.T) { } // Rollback after a failed append and test if the low watermark has progressed anyway. - hb, _ := newTestHead(t, 1000, false) + hb, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, hb.Close()) }() @@ -1985,7 +1988,7 @@ func TestIsolationLowWatermarkMonotonous(t *testing.T) { t.Skip("skipping test since tsdb isolation is disabled") } - hb, _ := newTestHead(t, 1000, false) + hb, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, hb.Close()) }() @@ -2022,7 +2025,7 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) { t.Skip("skipping test since tsdb isolation is disabled") } - h, _ := newTestHead(t, 1000, false) + h, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, h.Close()) }() @@ -2047,7 +2050,7 @@ func TestIsolationWithoutAdd(t *testing.T) { t.Skip("skipping test since tsdb isolation is disabled") } - hb, _ := newTestHead(t, 1000, false) + hb, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, hb.Close()) }() @@ -2142,7 +2145,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) { } func testHeadSeriesChunkRace(t *testing.T) { - h, _ := newTestHead(t, 1000, false) + h, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, h.Close()) }() @@ -2177,7 +2180,7 @@ func testHeadSeriesChunkRace(t *testing.T) { } func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) { - head, _ := newTestHead(t, 1000, false) + head, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, head.Close()) }() @@ -2237,7 +2240,7 @@ func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) { } func TestHeadLabelValuesWithMatchers(t *testing.T) { - head, _ := newTestHead(t, 1000, false) + head, _ := newTestHead(t, 1000, false, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) app := head.Appender(context.Background()) @@ -2296,7 +2299,7 @@ func TestHeadLabelValuesWithMatchers(t *testing.T) { } func TestHeadLabelNamesWithMatchers(t *testing.T) { - head, _ := newTestHead(t, 1000, false) + head, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, head.Close()) }() @@ -2364,7 +2367,7 @@ func TestHeadLabelNamesWithMatchers(t *testing.T) { } func TestHeadShardedPostings(t *testing.T) { - head, _ := newTestHead(t, 1000, false) + head, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, head.Close()) }() @@ -2428,7 +2431,7 @@ func TestHeadShardedPostings(t *testing.T) { } func TestErrReuseAppender(t *testing.T) { - head, _ := newTestHead(t, 1000, false) + head, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, head.Close()) }() @@ -2464,7 +2467,7 @@ func TestErrReuseAppender(t *testing.T) { func TestHeadMintAfterTruncation(t *testing.T) { chunkRange := int64(2000) - head, _ := newTestHead(t, chunkRange, false) + head, _ := newTestHead(t, chunkRange, false, false) app := head.Appender(context.Background()) _, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 100, 100) @@ -2498,7 +2501,7 @@ func TestHeadMintAfterTruncation(t *testing.T) { func TestHeadExemplars(t *testing.T) { chunkRange := int64(2000) - head, _ := newTestHead(t, chunkRange, false) + head, _ := newTestHead(t, chunkRange, false, false) app := head.Appender(context.Background()) l := labels.FromStrings("traceId", "123") @@ -2520,7 +2523,7 @@ func TestHeadExemplars(t *testing.T) { func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) { chunkRange := int64(2000) - head, _ := newTestHead(b, chunkRange, false) + head, _ := newTestHead(b, chunkRange, false, false) b.Cleanup(func() { require.NoError(b, head.Close()) }) app := head.Appender(context.Background()) @@ -2832,7 +2835,7 @@ func TestWaitForPendingReadersInTimeRange(t *testing.T) { } func TestChunkSnapshot(t *testing.T) { - head, _ := newTestHead(t, 120*4, false) + head, _ := newTestHead(t, 120*4, false, false) defer func() { head.opts.EnableMemorySnapshotOnShutdown = false require.NoError(t, head.Close()) @@ -3074,7 +3077,7 @@ func TestChunkSnapshot(t *testing.T) { } func TestSnapshotError(t *testing.T) { - head, _ := newTestHead(t, 120*4, false) + head, _ := newTestHead(t, 120*4, false, false) defer func() { head.opts.EnableMemorySnapshotOnShutdown = false require.NoError(t, head.Close()) @@ -3407,7 +3410,7 @@ func TestOOOMmapReplay(t *testing.T) { } func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) { - h, _ := newTestHead(t, 1000, false) + h, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, h.Close()) }() diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 2261d59152..aa1293585e 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -134,6 +134,27 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, lbls *labels.Labels, return nil } +// PostingsForMatchers needs to be overridden so that the right IndexReader +// implementation gets passed down to the PostingsForMatchers call. +func (oh *OOOHeadIndexReader) PostingsForMatchers(concurrent bool, ms ...*labels.Matcher) (index.Postings, error) { + return oh.head.pfmc.PostingsForMatchers(oh, concurrent, ms...) +} + +// LabelValues needs to be overridden from the headIndexReader implementation due +// to the check that happens at the beginning where we make sure that the query +// interval overlaps with the head minooot and maxooot. +func (oh *OOOHeadIndexReader) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) { + if oh.maxt < oh.head.MinOOOTime() || oh.mint > oh.head.MaxOOOTime() { + return []string{}, nil + } + + if len(matchers) == 0 { + return oh.head.postings.LabelValues(name), nil + } + + return labelValuesWithMatchers(oh, name, matchers...) +} + type chunkMetaAndChunkDiskMapperRef struct { meta chunks.Meta ref chunks.ChunkDiskMapperRef diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index 3262651bb7..c7c27f763c 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -3,6 +3,7 @@ package tsdb import ( "context" "fmt" + "math" "sort" "testing" "time" @@ -280,7 +281,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { for perm, intervals := range permutations { for _, headChunk := range []bool{false, true} { t.Run(fmt.Sprintf("name=%s, permutation=%d, headChunk=%t", tc.name, perm, headChunk), func(t *testing.T) { - h, _ := newTestHead(t, 1000, false) + h, _ := newTestHead(t, 1000, false, true) defer func() { require.NoError(t, h.Close()) }() @@ -357,6 +358,99 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { } } +func TestOOOHeadChunkReader_LabelValues(t *testing.T) { + chunkRange := int64(2000) + head, _ := newTestHead(t, chunkRange, false, true) + t.Cleanup(func() { require.NoError(t, head.Close()) }) + + app := head.Appender(context.Background()) + + // Add in-order samples + _, err := app.Append(0, labels.Labels{ + {Name: "foo", Value: "bar1"}, + }, 100, 1) + require.NoError(t, err) + _, err = app.Append(0, labels.Labels{ + {Name: "foo", Value: "bar2"}, + }, 100, 2) + require.NoError(t, err) + + // Add ooo samples for those series + _, err = app.Append(0, labels.Labels{ + {Name: "foo", Value: "bar1"}, + }, 90, 1) + require.NoError(t, err) + _, err = app.Append(0, labels.Labels{ + {Name: "foo", Value: "bar2"}, + }, 90, 2) + require.NoError(t, err) + + require.NoError(t, app.Commit()) + + cases := []struct { + name string + queryMinT int64 + queryMaxT int64 + expValues1 []string + expValues2 []string + expValues3 []string + expValues4 []string + }{ + { + name: "LabelValues calls when ooo head has max query range", + queryMinT: math.MinInt64, + queryMaxT: math.MaxInt64, + expValues1: []string{"bar1"}, + expValues2: []string{}, + expValues3: []string{"bar1", "bar2"}, + expValues4: []string{"bar1", "bar2"}, + }, + { + name: "LabelValues calls with ooo head query range not overlapping in-order data", + queryMinT: 90, + queryMaxT: 90, + expValues1: []string{"bar1"}, + expValues2: []string{}, + expValues3: []string{"bar1", "bar2"}, + expValues4: []string{"bar1", "bar2"}, + }, + { + name: "LabelValues calls with ooo head query range not overlapping out-of-order data", + queryMinT: 100, + queryMaxT: 100, + expValues1: []string{}, + expValues2: []string{}, + expValues3: []string{}, + expValues4: []string{}, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + // We first want to test using a head index reader that covers the biggest query interval + oh := NewOOOHeadIndexReader(head, tc.queryMinT, tc.queryMaxT) + matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")} + values, err := oh.LabelValues("foo", matchers...) + require.NoError(t, err) + require.Equal(t, tc.expValues1, values) + + matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotRegexp, "foo", "^bar.")} + values, err = oh.LabelValues("foo", matchers...) + require.NoError(t, err) + require.Equal(t, tc.expValues2, values) + + matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.")} + values, err = oh.LabelValues("foo", matchers...) + require.NoError(t, err) + require.Equal(t, tc.expValues3, values) + + values, err = oh.LabelValues("foo") + require.NoError(t, err) + require.Equal(t, tc.expValues4, values) + }) + } +} + // TestOOOHeadChunkReader_Chunk tests that the Chunk method works as expected. // It does so by appending out of order samples to the db and then initializing // an OOOHeadChunkReader to read chunks from it.