From 73684020545d78a5d8419c8aa510d1a15848ea64 Mon Sep 17 00:00:00 2001 From: Jesus Vazquez Date: Wed, 10 Aug 2022 17:33:36 +0200 Subject: [PATCH 1/4] Fix OOO Head LabelValues and PostingsForMatchers OOOHeadIndexReader was using the headIndexReader PostingsForMatchers() and LabelValues() implementation which lead to a very subtle bug that led to wrong query results. headIndexReader LabelValues() implementation checks if the query timerange overlaps with the head maxt and mint and if it doesnt it returns an empty list of values. Since this code was also used by the ooo head it led to wrong results that we were not able to see in tests because our queries where always from MinInt64 to MaxInt64. This commit also adds a new test that performs multiple time range queries to make sure this never happens again. Signed-off-by: Jesus Vazquez --- tsdb/db_test.go | 44 +++++++++++++++++++++++++++++-------------- tsdb/ooo_head_read.go | 21 +++++++++++++++++++++ 2 files changed, 51 insertions(+), 14 deletions(-) diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 50b4a0e1e8..f1bdc7d347 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -4196,7 +4196,7 @@ func TestOOOAppendAndQuery(t *testing.T) { s2 := labels.FromStrings("foo", "bar2") minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() } - expSamples := make(map[string][]tsdbutil.Sample) + appendedSamples := make(map[string][]tsdbutil.Sample) totalSamples := 0 addSample := func(lbls labels.Labels, fromMins, toMins int64, faceError bool) { app := db.Appender(context.Background()) @@ -4209,7 +4209,7 @@ func TestOOOAppendAndQuery(t *testing.T) { require.Error(t, err) } else { require.NoError(t, err) - expSamples[key] = append(expSamples[key], sample{t: min, v: val}) + appendedSamples[key] = append(appendedSamples[key], sample{t: min, v: val}) totalSamples++ } } @@ -4220,17 +4220,30 @@ func TestOOOAppendAndQuery(t *testing.T) { } } - testQuery := func() { - querier, err := db.Querier(context.TODO(), math.MinInt64, math.MaxInt64) + testQuery := func(from, to int64) { + querier, err := db.Querier(context.TODO(), from, to) require.NoError(t, err) seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.")) - for k, v := range expSamples { + for k, v := range appendedSamples { sort.Slice(v, func(i, j int) bool { return v[i].T() < v[j].T() }) - expSamples[k] = v + appendedSamples[k] = v + } + + expSamples := make(map[string][]tsdbutil.Sample) + for k, samples := range appendedSamples { + for _, s := range samples { + if s.T() < from { + continue + } + if s.T() > to { + continue + } + expSamples[k] = append(expSamples[k], s) + } } require.Equal(t, expSamples, seriesSet) require.Equal(t, float64(totalSamples-2), prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended), "number of ooo appended samples mismatch") @@ -4245,40 +4258,43 @@ func TestOOOAppendAndQuery(t *testing.T) { addSample(s1, 300, 300, false) addSample(s2, 290, 290, false) require.Equal(t, float64(2), prom_testutil.ToFloat64(db.head.metrics.chunksCreated)) - testQuery() + testQuery(math.MinInt64, math.MaxInt64) // Some ooo samples. addSample(s1, 250, 260, false) addSample(s2, 255, 265, false) verifyOOOMinMaxTimes(250, 265) - testQuery() + testQuery(math.MinInt64, math.MaxInt64) + testQuery(minutes(250), minutes(265)) // Test querying ono data time range + testQuery(minutes(290), minutes(300)) // Test querying in-order data time range + testQuery(minutes(250), minutes(300)) // Test querying the entire range // Out of time window. addSample(s1, 59, 59, true) addSample(s2, 49, 49, true) verifyOOOMinMaxTimes(250, 265) - testQuery() + testQuery(math.MinInt64, math.MaxInt64) // At the edge of time window, also it would be "out of bound" without the ooo support. addSample(s1, 60, 65, false) verifyOOOMinMaxTimes(60, 265) - testQuery() + testQuery(math.MinInt64, math.MaxInt64) // This sample is not within the time window w.r.t. the head's maxt, but it is within the window // w.r.t. the series' maxt. But we consider only head's maxt. addSample(s2, 59, 59, true) verifyOOOMinMaxTimes(60, 265) - testQuery() + testQuery(math.MinInt64, math.MaxInt64) // Now the sample is within time window w.r.t. the head's maxt. addSample(s2, 60, 65, false) verifyOOOMinMaxTimes(60, 265) - testQuery() + testQuery(math.MinInt64, math.MaxInt64) // Out of time window again. addSample(s1, 59, 59, true) addSample(s2, 49, 49, true) - testQuery() + testQuery(math.MinInt64, math.MaxInt64) // Generating some m-map chunks. The m-map chunks here are in such a way // that when sorted w.r.t. mint, the last chunk's maxt is not the overall maxt @@ -4287,7 +4303,7 @@ func TestOOOAppendAndQuery(t *testing.T) { addSample(s1, 180, 249, false) require.Equal(t, float64(6), prom_testutil.ToFloat64(db.head.metrics.chunksCreated)) verifyOOOMinMaxTimes(60, 265) - testQuery() + testQuery(math.MinInt64, math.MaxInt64) } func TestOOODisabled(t *testing.T) { diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 2261d59152..aa1293585e 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -134,6 +134,27 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, lbls *labels.Labels, return nil } +// PostingsForMatchers needs to be overridden so that the right IndexReader +// implementation gets passed down to the PostingsForMatchers call. +func (oh *OOOHeadIndexReader) PostingsForMatchers(concurrent bool, ms ...*labels.Matcher) (index.Postings, error) { + return oh.head.pfmc.PostingsForMatchers(oh, concurrent, ms...) +} + +// LabelValues needs to be overridden from the headIndexReader implementation due +// to the check that happens at the beginning where we make sure that the query +// interval overlaps with the head minooot and maxooot. +func (oh *OOOHeadIndexReader) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) { + if oh.maxt < oh.head.MinOOOTime() || oh.mint > oh.head.MaxOOOTime() { + return []string{}, nil + } + + if len(matchers) == 0 { + return oh.head.postings.LabelValues(name), nil + } + + return labelValuesWithMatchers(oh, name, matchers...) +} + type chunkMetaAndChunkDiskMapperRef struct { meta chunks.Meta ref chunks.ChunkDiskMapperRef From 347305807316034bea4b49c657013cf70e37dc6d Mon Sep 17 00:00:00 2001 From: Jesus Vazquez Date: Wed, 10 Aug 2022 23:36:02 +0200 Subject: [PATCH 2/4] Start writing OOOHeadIndexReader LabelValues test Signed-off-by: Jesus Vazquez --- tsdb/head_test.go | 79 ++++++++++++++++++++------------------ tsdb/ooo_head_read_test.go | 38 +++++++++++++++++- 2 files changed, 78 insertions(+), 39 deletions(-) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index c3a4a7ce92..a8dbe9b371 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -49,7 +49,7 @@ import ( "github.com/prometheus/prometheus/tsdb/wal" ) -func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal.WAL) { +func newTestHead(t testing.TB, chunkRange int64, compressWAL, oooEnabled bool) (*Head, *wal.WAL) { dir := t.TempDir() wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL) require.NoError(t, err) @@ -59,6 +59,9 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal. opts.ChunkDirRoot = dir opts.EnableExemplarStorage = true opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars) + if oooEnabled { + opts.OutOfOrderTimeWindow.Store(10 * time.Minute.Milliseconds()) + } h, err := NewHead(nil, nil, wlog, nil, opts, nil) require.NoError(t, err) @@ -72,7 +75,7 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal. func BenchmarkCreateSeries(b *testing.B) { series := genSeries(b.N, 10, 0, 0) - h, _ := newTestHead(b, 10000, false) + h, _ := newTestHead(b, 10000, false, false) defer func() { require.NoError(b, h.Close()) }() @@ -270,7 +273,7 @@ func BenchmarkLoadWAL(b *testing.B) { // While appending the samples to the head it concurrently queries them from multiple go routines and verifies that the // returned results are correct. func TestHead_HighConcurrencyReadAndWrite(t *testing.T) { - head, _ := newTestHead(t, DefaultBlockDuration, false) + head, _ := newTestHead(t, DefaultBlockDuration, false, false) defer func() { require.NoError(t, head.Close()) }() @@ -486,7 +489,7 @@ func TestHead_ReadWAL(t *testing.T) { }, } - head, w := newTestHead(t, 1000, compress) + head, w := newTestHead(t, 1000, compress, false) defer func() { require.NoError(t, head.Close()) }() @@ -530,7 +533,7 @@ func TestHead_ReadWAL(t *testing.T) { } func TestHead_WALMultiRef(t *testing.T) { - head, w := newTestHead(t, 1000, false) + head, w := newTestHead(t, 1000, false, false) require.NoError(t, head.Init(0)) @@ -590,7 +593,7 @@ func TestHead_WALMultiRef(t *testing.T) { } func TestHead_ActiveAppenders(t *testing.T) { - head, _ := newTestHead(t, 1000, false) + head, _ := newTestHead(t, 1000, false, false) defer head.Close() require.NoError(t, head.Init(0)) @@ -623,14 +626,14 @@ func TestHead_ActiveAppenders(t *testing.T) { } func TestHead_UnknownWALRecord(t *testing.T) { - head, w := newTestHead(t, 1000, false) + head, w := newTestHead(t, 1000, false, false) w.Log([]byte{255, 42}) require.NoError(t, head.Init(0)) require.NoError(t, head.Close()) } func TestHead_Truncate(t *testing.T) { - h, _ := newTestHead(t, 1000, false) + h, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, h.Close()) }() @@ -789,7 +792,7 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { {Ref: 50, T: 90, V: 1}, }, } - head, w := newTestHead(t, 1000, compress) + head, w := newTestHead(t, 1000, compress, false) defer func() { require.NoError(t, head.Close()) }() @@ -856,7 +859,7 @@ func TestHeadDeleteSimple(t *testing.T) { for _, compress := range []bool{false, true} { t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { for _, c := range cases { - head, w := newTestHead(t, 1000, compress) + head, w := newTestHead(t, 1000, compress, false) require.NoError(t, head.Init(0)) app := head.Appender(context.Background()) @@ -937,7 +940,7 @@ func TestHeadDeleteSimple(t *testing.T) { } func TestDeleteUntilCurMax(t *testing.T) { - hb, _ := newTestHead(t, 1000000, false) + hb, _ := newTestHead(t, 1000000, false, false) defer func() { require.NoError(t, hb.Close()) }() @@ -990,7 +993,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) { numSamples := 10000 // Enough samples to cause a checkpoint. - hb, w := newTestHead(t, int64(numSamples)*10, false) + hb, w := newTestHead(t, int64(numSamples)*10, false, false) for i := 0; i < numSamples; i++ { app := hb.Appender(context.Background()) @@ -1079,7 +1082,7 @@ func TestDelete_e2e(t *testing.T) { seriesMap[labels.New(l...).String()] = []tsdbutil.Sample{} } - hb, _ := newTestHead(t, 100000, false) + hb, _ := newTestHead(t, 100000, false, false) defer func() { require.NoError(t, hb.Close()) }() @@ -1360,7 +1363,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { func TestGCChunkAccess(t *testing.T) { // Put a chunk, select it. GC it and then access it. - h, _ := newTestHead(t, 1000, false) + h, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, h.Close()) }() @@ -1414,7 +1417,7 @@ func TestGCChunkAccess(t *testing.T) { func TestGCSeriesAccess(t *testing.T) { // Put a series, select it. GC it and then access it. - h, _ := newTestHead(t, 1000, false) + h, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, h.Close()) }() @@ -1469,7 +1472,7 @@ func TestGCSeriesAccess(t *testing.T) { } func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { - h, _ := newTestHead(t, 1000, false) + h, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, h.Close()) }() @@ -1499,7 +1502,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { } func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { - h, _ := newTestHead(t, 1000, false) + h, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, h.Close()) }() @@ -1532,7 +1535,7 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { func TestHead_LogRollback(t *testing.T) { for _, compress := range []bool{false, true} { t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { - h, w := newTestHead(t, 1000, compress) + h, w := newTestHead(t, 1000, compress, false) defer func() { require.NoError(t, h.Close()) }() @@ -1726,7 +1729,7 @@ func TestHeadReadWriterRepair(t *testing.T) { } func TestNewWalSegmentOnTruncate(t *testing.T) { - h, wlog := newTestHead(t, 1000, false) + h, wlog := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, h.Close()) }() @@ -1756,7 +1759,7 @@ func TestNewWalSegmentOnTruncate(t *testing.T) { } func TestAddDuplicateLabelName(t *testing.T) { - h, _ := newTestHead(t, 1000, false) + h, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, h.Close()) }() @@ -1839,7 +1842,7 @@ func TestMemSeriesIsolation(t *testing.T) { } // Test isolation without restart of Head. - hb, _ := newTestHead(t, 1000, false) + hb, _ := newTestHead(t, 1000, false, false) i := addSamples(hb) testIsolation(hb, i) @@ -1901,7 +1904,7 @@ func TestMemSeriesIsolation(t *testing.T) { require.NoError(t, hb.Close()) // Test isolation with restart of Head. This is to verify the num samples of chunks after m-map chunk replay. - hb, w := newTestHead(t, 1000, false) + hb, w := newTestHead(t, 1000, false, false) i = addSamples(hb) require.NoError(t, hb.Close()) @@ -1954,7 +1957,7 @@ func TestIsolationRollback(t *testing.T) { } // Rollback after a failed append and test if the low watermark has progressed anyway. - hb, _ := newTestHead(t, 1000, false) + hb, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, hb.Close()) }() @@ -1985,7 +1988,7 @@ func TestIsolationLowWatermarkMonotonous(t *testing.T) { t.Skip("skipping test since tsdb isolation is disabled") } - hb, _ := newTestHead(t, 1000, false) + hb, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, hb.Close()) }() @@ -2022,7 +2025,7 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) { t.Skip("skipping test since tsdb isolation is disabled") } - h, _ := newTestHead(t, 1000, false) + h, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, h.Close()) }() @@ -2047,7 +2050,7 @@ func TestIsolationWithoutAdd(t *testing.T) { t.Skip("skipping test since tsdb isolation is disabled") } - hb, _ := newTestHead(t, 1000, false) + hb, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, hb.Close()) }() @@ -2142,7 +2145,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) { } func testHeadSeriesChunkRace(t *testing.T) { - h, _ := newTestHead(t, 1000, false) + h, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, h.Close()) }() @@ -2177,7 +2180,7 @@ func testHeadSeriesChunkRace(t *testing.T) { } func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) { - head, _ := newTestHead(t, 1000, false) + head, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, head.Close()) }() @@ -2237,7 +2240,7 @@ func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) { } func TestHeadLabelValuesWithMatchers(t *testing.T) { - head, _ := newTestHead(t, 1000, false) + head, _ := newTestHead(t, 1000, false, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) app := head.Appender(context.Background()) @@ -2296,7 +2299,7 @@ func TestHeadLabelValuesWithMatchers(t *testing.T) { } func TestHeadLabelNamesWithMatchers(t *testing.T) { - head, _ := newTestHead(t, 1000, false) + head, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, head.Close()) }() @@ -2364,7 +2367,7 @@ func TestHeadLabelNamesWithMatchers(t *testing.T) { } func TestHeadShardedPostings(t *testing.T) { - head, _ := newTestHead(t, 1000, false) + head, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, head.Close()) }() @@ -2428,7 +2431,7 @@ func TestHeadShardedPostings(t *testing.T) { } func TestErrReuseAppender(t *testing.T) { - head, _ := newTestHead(t, 1000, false) + head, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, head.Close()) }() @@ -2464,7 +2467,7 @@ func TestErrReuseAppender(t *testing.T) { func TestHeadMintAfterTruncation(t *testing.T) { chunkRange := int64(2000) - head, _ := newTestHead(t, chunkRange, false) + head, _ := newTestHead(t, chunkRange, false, false) app := head.Appender(context.Background()) _, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 100, 100) @@ -2498,7 +2501,7 @@ func TestHeadMintAfterTruncation(t *testing.T) { func TestHeadExemplars(t *testing.T) { chunkRange := int64(2000) - head, _ := newTestHead(t, chunkRange, false) + head, _ := newTestHead(t, chunkRange, false, false) app := head.Appender(context.Background()) l := labels.FromStrings("traceId", "123") @@ -2520,7 +2523,7 @@ func TestHeadExemplars(t *testing.T) { func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) { chunkRange := int64(2000) - head, _ := newTestHead(b, chunkRange, false) + head, _ := newTestHead(b, chunkRange, false, false) b.Cleanup(func() { require.NoError(b, head.Close()) }) app := head.Appender(context.Background()) @@ -2832,7 +2835,7 @@ func TestWaitForPendingReadersInTimeRange(t *testing.T) { } func TestChunkSnapshot(t *testing.T) { - head, _ := newTestHead(t, 120*4, false) + head, _ := newTestHead(t, 120*4, false, false) defer func() { head.opts.EnableMemorySnapshotOnShutdown = false require.NoError(t, head.Close()) @@ -3074,7 +3077,7 @@ func TestChunkSnapshot(t *testing.T) { } func TestSnapshotError(t *testing.T) { - head, _ := newTestHead(t, 120*4, false) + head, _ := newTestHead(t, 120*4, false, false) defer func() { head.opts.EnableMemorySnapshotOnShutdown = false require.NoError(t, head.Close()) @@ -3407,7 +3410,7 @@ func TestOOOMmapReplay(t *testing.T) { } func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) { - h, _ := newTestHead(t, 1000, false) + h, _ := newTestHead(t, 1000, false, false) defer func() { require.NoError(t, h.Close()) }() diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index 3262651bb7..8c3a52a1d6 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -3,6 +3,7 @@ package tsdb import ( "context" "fmt" + "math" "sort" "testing" "time" @@ -280,7 +281,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { for perm, intervals := range permutations { for _, headChunk := range []bool{false, true} { t.Run(fmt.Sprintf("name=%s, permutation=%d, headChunk=%t", tc.name, perm, headChunk), func(t *testing.T) { - h, _ := newTestHead(t, 1000, false) + h, _ := newTestHead(t, 1000, false, true) defer func() { require.NoError(t, h.Close()) }() @@ -357,6 +358,41 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { } } +func TestOOOHeadChunkReader_LabelValues(t *testing.T) { + chunkRange := int64(2000) + head, _ := newTestHead(t, chunkRange, false, true) + t.Cleanup(func() { require.NoError(t, head.Close()) }) + + app := head.Appender(context.Background()) + + metricCount := 100 + for i := 0; i < metricCount; i++ { + _, err := app.Append(0, labels.Labels{ + {Name: "a_unique", Value: fmt.Sprintf("value%d", i)}, + {Name: "b_tens", Value: fmt.Sprintf("value%d", i/(metricCount/10))}, + }, 100, 0) // TODO change timestamp to have some in order and some out of order up to 10 minutes + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + oh := NewOOOHeadIndexReader(head, math.MinInt64, math.MaxInt64) + + matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a_unique", "value0")} + values, err := oh.LabelValues("a_unique", matchers...) + require.NoError(t, err) + require.Equal(t, []string{"value0"}, values) + + matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotRegexp, "a_unique", "^value.*")} + values, err = oh.LabelValues("a_unique", matchers...) + require.NoError(t, err) + require.Equal(t, []string{}, values) + + matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "b_tens", "value.")} + values, err = oh.LabelValues("b_tens", matchers...) + require.NoError(t, err) + require.Equal(t, []string{"value0", "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9"}, values) +} + // TestOOOHeadChunkReader_Chunk tests that the Chunk method works as expected. // It does so by appending out of order samples to the db and then initializing // an OOOHeadChunkReader to read chunks from it. From 14ec85d4d2f7c0819fe6938580bea2c8736aa431 Mon Sep 17 00:00:00 2001 From: Jesus Vazquez Date: Thu, 11 Aug 2022 10:29:06 +0200 Subject: [PATCH 3/4] Fix LabelValues test --- tsdb/ooo_head_read_test.go | 45 ++++++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index 8c3a52a1d6..12a28c1bb6 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -365,32 +365,43 @@ func TestOOOHeadChunkReader_LabelValues(t *testing.T) { app := head.Appender(context.Background()) - metricCount := 100 - for i := 0; i < metricCount; i++ { - _, err := app.Append(0, labels.Labels{ - {Name: "a_unique", Value: fmt.Sprintf("value%d", i)}, - {Name: "b_tens", Value: fmt.Sprintf("value%d", i/(metricCount/10))}, - }, 100, 0) // TODO change timestamp to have some in order and some out of order up to 10 minutes - require.NoError(t, err) - } + // Add in-order samples + _, err := app.Append(0, labels.Labels{ + {Name: "foo", Value: fmt.Sprintf("bar1")}, + }, 100, 1) + require.NoError(t, err) + _, err = app.Append(0, labels.Labels{ + {Name: "foo", Value: fmt.Sprintf("bar2")}, + }, 100, 2) + require.NoError(t, err) + + // Add ooo samples for those series + _, err = app.Append(0, labels.Labels{ + {Name: "foo", Value: fmt.Sprintf("bar1")}, + }, 90, 1) + require.NoError(t, err) + _, err = app.Append(0, labels.Labels{ + {Name: "foo", Value: fmt.Sprintf("bar2")}, + }, 90, 2) + require.NoError(t, err) + require.NoError(t, app.Commit()) oh := NewOOOHeadIndexReader(head, math.MinInt64, math.MaxInt64) - - matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a_unique", "value0")} - values, err := oh.LabelValues("a_unique", matchers...) + matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")} + values, err := oh.LabelValues("foo", matchers...) require.NoError(t, err) - require.Equal(t, []string{"value0"}, values) + require.Equal(t, []string{"bar1"}, values) - matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotRegexp, "a_unique", "^value.*")} - values, err = oh.LabelValues("a_unique", matchers...) + matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotRegexp, "foo", "^bar.")} + values, err = oh.LabelValues("foo", matchers...) require.NoError(t, err) require.Equal(t, []string{}, values) - matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "b_tens", "value.")} - values, err = oh.LabelValues("b_tens", matchers...) + matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.")} + values, err = oh.LabelValues("foo", matchers...) require.NoError(t, err) - require.Equal(t, []string{"value0", "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9"}, values) + require.Equal(t, []string{"bar1", "bar2"}, values) } // TestOOOHeadChunkReader_Chunk tests that the Chunk method works as expected. From 6bff0d91138c1f3af25662144b3a3d921fa8ce87 Mon Sep 17 00:00:00 2001 From: Jesus Vazquez Date: Thu, 11 Aug 2022 15:49:58 +0200 Subject: [PATCH 4/4] Add suggestions to labelValues test Signed-off-by: Jesus Vazquez --- tsdb/ooo_head_read_test.go | 81 ++++++++++++++++++++++++++++++-------- 1 file changed, 64 insertions(+), 17 deletions(-) diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index 12a28c1bb6..c7c27f763c 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -367,41 +367,88 @@ func TestOOOHeadChunkReader_LabelValues(t *testing.T) { // Add in-order samples _, err := app.Append(0, labels.Labels{ - {Name: "foo", Value: fmt.Sprintf("bar1")}, + {Name: "foo", Value: "bar1"}, }, 100, 1) require.NoError(t, err) _, err = app.Append(0, labels.Labels{ - {Name: "foo", Value: fmt.Sprintf("bar2")}, + {Name: "foo", Value: "bar2"}, }, 100, 2) require.NoError(t, err) // Add ooo samples for those series _, err = app.Append(0, labels.Labels{ - {Name: "foo", Value: fmt.Sprintf("bar1")}, + {Name: "foo", Value: "bar1"}, }, 90, 1) require.NoError(t, err) _, err = app.Append(0, labels.Labels{ - {Name: "foo", Value: fmt.Sprintf("bar2")}, + {Name: "foo", Value: "bar2"}, }, 90, 2) require.NoError(t, err) require.NoError(t, app.Commit()) - oh := NewOOOHeadIndexReader(head, math.MinInt64, math.MaxInt64) - matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")} - values, err := oh.LabelValues("foo", matchers...) - require.NoError(t, err) - require.Equal(t, []string{"bar1"}, values) + cases := []struct { + name string + queryMinT int64 + queryMaxT int64 + expValues1 []string + expValues2 []string + expValues3 []string + expValues4 []string + }{ + { + name: "LabelValues calls when ooo head has max query range", + queryMinT: math.MinInt64, + queryMaxT: math.MaxInt64, + expValues1: []string{"bar1"}, + expValues2: []string{}, + expValues3: []string{"bar1", "bar2"}, + expValues4: []string{"bar1", "bar2"}, + }, + { + name: "LabelValues calls with ooo head query range not overlapping in-order data", + queryMinT: 90, + queryMaxT: 90, + expValues1: []string{"bar1"}, + expValues2: []string{}, + expValues3: []string{"bar1", "bar2"}, + expValues4: []string{"bar1", "bar2"}, + }, + { + name: "LabelValues calls with ooo head query range not overlapping out-of-order data", + queryMinT: 100, + queryMaxT: 100, + expValues1: []string{}, + expValues2: []string{}, + expValues3: []string{}, + expValues4: []string{}, + }, + } - matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotRegexp, "foo", "^bar.")} - values, err = oh.LabelValues("foo", matchers...) - require.NoError(t, err) - require.Equal(t, []string{}, values) + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + // We first want to test using a head index reader that covers the biggest query interval + oh := NewOOOHeadIndexReader(head, tc.queryMinT, tc.queryMaxT) + matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")} + values, err := oh.LabelValues("foo", matchers...) + require.NoError(t, err) + require.Equal(t, tc.expValues1, values) - matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.")} - values, err = oh.LabelValues("foo", matchers...) - require.NoError(t, err) - require.Equal(t, []string{"bar1", "bar2"}, values) + matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotRegexp, "foo", "^bar.")} + values, err = oh.LabelValues("foo", matchers...) + require.NoError(t, err) + require.Equal(t, tc.expValues2, values) + + matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.")} + values, err = oh.LabelValues("foo", matchers...) + require.NoError(t, err) + require.Equal(t, tc.expValues3, values) + + values, err = oh.LabelValues("foo") + require.NoError(t, err) + require.Equal(t, tc.expValues4, values) + }) + } } // TestOOOHeadChunkReader_Chunk tests that the Chunk method works as expected.