Merge pull request #314 from grafana/jvp/fix-ooo-head-postingsformatchers-and-labelvalues

Fix OOO Head LabelValues and PostingsForMatchers
This commit is contained in:
Jesus Vazquez 2022-08-11 16:23:28 +02:00 committed by GitHub
commit e9456018fa
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 187 additions and 53 deletions

View file

@ -4196,7 +4196,7 @@ func TestOOOAppendAndQuery(t *testing.T) {
s2 := labels.FromStrings("foo", "bar2") s2 := labels.FromStrings("foo", "bar2")
minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() } minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }
expSamples := make(map[string][]tsdbutil.Sample) appendedSamples := make(map[string][]tsdbutil.Sample)
totalSamples := 0 totalSamples := 0
addSample := func(lbls labels.Labels, fromMins, toMins int64, faceError bool) { addSample := func(lbls labels.Labels, fromMins, toMins int64, faceError bool) {
app := db.Appender(context.Background()) app := db.Appender(context.Background())
@ -4209,7 +4209,7 @@ func TestOOOAppendAndQuery(t *testing.T) {
require.Error(t, err) require.Error(t, err)
} else { } else {
require.NoError(t, err) require.NoError(t, err)
expSamples[key] = append(expSamples[key], sample{t: min, v: val}) appendedSamples[key] = append(appendedSamples[key], sample{t: min, v: val})
totalSamples++ totalSamples++
} }
} }
@ -4220,17 +4220,30 @@ func TestOOOAppendAndQuery(t *testing.T) {
} }
} }
testQuery := func() { testQuery := func(from, to int64) {
querier, err := db.Querier(context.TODO(), math.MinInt64, math.MaxInt64) querier, err := db.Querier(context.TODO(), from, to)
require.NoError(t, err) require.NoError(t, err)
seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.")) seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar."))
for k, v := range expSamples { for k, v := range appendedSamples {
sort.Slice(v, func(i, j int) bool { sort.Slice(v, func(i, j int) bool {
return v[i].T() < v[j].T() return v[i].T() < v[j].T()
}) })
expSamples[k] = v appendedSamples[k] = v
}
expSamples := make(map[string][]tsdbutil.Sample)
for k, samples := range appendedSamples {
for _, s := range samples {
if s.T() < from {
continue
}
if s.T() > to {
continue
}
expSamples[k] = append(expSamples[k], s)
}
} }
require.Equal(t, expSamples, seriesSet) require.Equal(t, expSamples, seriesSet)
require.Equal(t, float64(totalSamples-2), prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended), "number of ooo appended samples mismatch") require.Equal(t, float64(totalSamples-2), prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended), "number of ooo appended samples mismatch")
@ -4245,40 +4258,43 @@ func TestOOOAppendAndQuery(t *testing.T) {
addSample(s1, 300, 300, false) addSample(s1, 300, 300, false)
addSample(s2, 290, 290, false) addSample(s2, 290, 290, false)
require.Equal(t, float64(2), prom_testutil.ToFloat64(db.head.metrics.chunksCreated)) require.Equal(t, float64(2), prom_testutil.ToFloat64(db.head.metrics.chunksCreated))
testQuery() testQuery(math.MinInt64, math.MaxInt64)
// Some ooo samples. // Some ooo samples.
addSample(s1, 250, 260, false) addSample(s1, 250, 260, false)
addSample(s2, 255, 265, false) addSample(s2, 255, 265, false)
verifyOOOMinMaxTimes(250, 265) verifyOOOMinMaxTimes(250, 265)
testQuery() testQuery(math.MinInt64, math.MaxInt64)
testQuery(minutes(250), minutes(265)) // Test querying ooo data time range
testQuery(minutes(290), minutes(300)) // Test querying in-order data time range
testQuery(minutes(250), minutes(300)) // Test querying the entire range
// Out of time window. // Out of time window.
addSample(s1, 59, 59, true) addSample(s1, 59, 59, true)
addSample(s2, 49, 49, true) addSample(s2, 49, 49, true)
verifyOOOMinMaxTimes(250, 265) verifyOOOMinMaxTimes(250, 265)
testQuery() testQuery(math.MinInt64, math.MaxInt64)
// At the edge of time window, also it would be "out of bound" without the ooo support. // At the edge of time window, also it would be "out of bound" without the ooo support.
addSample(s1, 60, 65, false) addSample(s1, 60, 65, false)
verifyOOOMinMaxTimes(60, 265) verifyOOOMinMaxTimes(60, 265)
testQuery() testQuery(math.MinInt64, math.MaxInt64)
// This sample is not within the time window w.r.t. the head's maxt, but it is within the window // This sample is not within the time window w.r.t. the head's maxt, but it is within the window
// w.r.t. the series' maxt. But we consider only head's maxt. // w.r.t. the series' maxt. But we consider only head's maxt.
addSample(s2, 59, 59, true) addSample(s2, 59, 59, true)
verifyOOOMinMaxTimes(60, 265) verifyOOOMinMaxTimes(60, 265)
testQuery() testQuery(math.MinInt64, math.MaxInt64)
// Now the sample is within time window w.r.t. the head's maxt. // Now the sample is within time window w.r.t. the head's maxt.
addSample(s2, 60, 65, false) addSample(s2, 60, 65, false)
verifyOOOMinMaxTimes(60, 265) verifyOOOMinMaxTimes(60, 265)
testQuery() testQuery(math.MinInt64, math.MaxInt64)
// Out of time window again. // Out of time window again.
addSample(s1, 59, 59, true) addSample(s1, 59, 59, true)
addSample(s2, 49, 49, true) addSample(s2, 49, 49, true)
testQuery() testQuery(math.MinInt64, math.MaxInt64)
// Generating some m-map chunks. The m-map chunks here are in such a way // Generating some m-map chunks. The m-map chunks here are in such a way
// that when sorted w.r.t. mint, the last chunk's maxt is not the overall maxt // that when sorted w.r.t. mint, the last chunk's maxt is not the overall maxt
@ -4287,7 +4303,7 @@ func TestOOOAppendAndQuery(t *testing.T) {
addSample(s1, 180, 249, false) addSample(s1, 180, 249, false)
require.Equal(t, float64(6), prom_testutil.ToFloat64(db.head.metrics.chunksCreated)) require.Equal(t, float64(6), prom_testutil.ToFloat64(db.head.metrics.chunksCreated))
verifyOOOMinMaxTimes(60, 265) verifyOOOMinMaxTimes(60, 265)
testQuery() testQuery(math.MinInt64, math.MaxInt64)
} }
func TestOOODisabled(t *testing.T) { func TestOOODisabled(t *testing.T) {

View file

@ -49,7 +49,7 @@ import (
"github.com/prometheus/prometheus/tsdb/wal" "github.com/prometheus/prometheus/tsdb/wal"
) )
func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal.WAL) { func newTestHead(t testing.TB, chunkRange int64, compressWAL, oooEnabled bool) (*Head, *wal.WAL) {
dir := t.TempDir() dir := t.TempDir()
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL) wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL)
require.NoError(t, err) require.NoError(t, err)
@ -59,6 +59,9 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal.
opts.ChunkDirRoot = dir opts.ChunkDirRoot = dir
opts.EnableExemplarStorage = true opts.EnableExemplarStorage = true
opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars) opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars)
if oooEnabled {
opts.OutOfOrderTimeWindow.Store(10 * time.Minute.Milliseconds())
}
h, err := NewHead(nil, nil, wlog, nil, opts, nil) h, err := NewHead(nil, nil, wlog, nil, opts, nil)
require.NoError(t, err) require.NoError(t, err)
@ -72,7 +75,7 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal.
func BenchmarkCreateSeries(b *testing.B) { func BenchmarkCreateSeries(b *testing.B) {
series := genSeries(b.N, 10, 0, 0) series := genSeries(b.N, 10, 0, 0)
h, _ := newTestHead(b, 10000, false) h, _ := newTestHead(b, 10000, false, false)
defer func() { defer func() {
require.NoError(b, h.Close()) require.NoError(b, h.Close())
}() }()
@ -270,7 +273,7 @@ func BenchmarkLoadWAL(b *testing.B) {
// While appending the samples to the head it concurrently queries them from multiple go routines and verifies that the // While appending the samples to the head it concurrently queries them from multiple go routines and verifies that the
// returned results are correct. // returned results are correct.
func TestHead_HighConcurrencyReadAndWrite(t *testing.T) { func TestHead_HighConcurrencyReadAndWrite(t *testing.T) {
head, _ := newTestHead(t, DefaultBlockDuration, false) head, _ := newTestHead(t, DefaultBlockDuration, false, false)
defer func() { defer func() {
require.NoError(t, head.Close()) require.NoError(t, head.Close())
}() }()
@ -486,7 +489,7 @@ func TestHead_ReadWAL(t *testing.T) {
}, },
} }
head, w := newTestHead(t, 1000, compress) head, w := newTestHead(t, 1000, compress, false)
defer func() { defer func() {
require.NoError(t, head.Close()) require.NoError(t, head.Close())
}() }()
@ -530,7 +533,7 @@ func TestHead_ReadWAL(t *testing.T) {
} }
func TestHead_WALMultiRef(t *testing.T) { func TestHead_WALMultiRef(t *testing.T) {
head, w := newTestHead(t, 1000, false) head, w := newTestHead(t, 1000, false, false)
require.NoError(t, head.Init(0)) require.NoError(t, head.Init(0))
@ -590,7 +593,7 @@ func TestHead_WALMultiRef(t *testing.T) {
} }
func TestHead_ActiveAppenders(t *testing.T) { func TestHead_ActiveAppenders(t *testing.T) {
head, _ := newTestHead(t, 1000, false) head, _ := newTestHead(t, 1000, false, false)
defer head.Close() defer head.Close()
require.NoError(t, head.Init(0)) require.NoError(t, head.Init(0))
@ -623,14 +626,14 @@ func TestHead_ActiveAppenders(t *testing.T) {
} }
func TestHead_UnknownWALRecord(t *testing.T) { func TestHead_UnknownWALRecord(t *testing.T) {
head, w := newTestHead(t, 1000, false) head, w := newTestHead(t, 1000, false, false)
w.Log([]byte{255, 42}) w.Log([]byte{255, 42})
require.NoError(t, head.Init(0)) require.NoError(t, head.Init(0))
require.NoError(t, head.Close()) require.NoError(t, head.Close())
} }
func TestHead_Truncate(t *testing.T) { func TestHead_Truncate(t *testing.T) {
h, _ := newTestHead(t, 1000, false) h, _ := newTestHead(t, 1000, false, false)
defer func() { defer func() {
require.NoError(t, h.Close()) require.NoError(t, h.Close())
}() }()
@ -789,7 +792,7 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
{Ref: 50, T: 90, V: 1}, {Ref: 50, T: 90, V: 1},
}, },
} }
head, w := newTestHead(t, 1000, compress) head, w := newTestHead(t, 1000, compress, false)
defer func() { defer func() {
require.NoError(t, head.Close()) require.NoError(t, head.Close())
}() }()
@ -856,7 +859,7 @@ func TestHeadDeleteSimple(t *testing.T) {
for _, compress := range []bool{false, true} { for _, compress := range []bool{false, true} {
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
for _, c := range cases { for _, c := range cases {
head, w := newTestHead(t, 1000, compress) head, w := newTestHead(t, 1000, compress, false)
require.NoError(t, head.Init(0)) require.NoError(t, head.Init(0))
app := head.Appender(context.Background()) app := head.Appender(context.Background())
@ -937,7 +940,7 @@ func TestHeadDeleteSimple(t *testing.T) {
} }
func TestDeleteUntilCurMax(t *testing.T) { func TestDeleteUntilCurMax(t *testing.T) {
hb, _ := newTestHead(t, 1000000, false) hb, _ := newTestHead(t, 1000000, false, false)
defer func() { defer func() {
require.NoError(t, hb.Close()) require.NoError(t, hb.Close())
}() }()
@ -990,7 +993,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
numSamples := 10000 numSamples := 10000
// Enough samples to cause a checkpoint. // Enough samples to cause a checkpoint.
hb, w := newTestHead(t, int64(numSamples)*10, false) hb, w := newTestHead(t, int64(numSamples)*10, false, false)
for i := 0; i < numSamples; i++ { for i := 0; i < numSamples; i++ {
app := hb.Appender(context.Background()) app := hb.Appender(context.Background())
@ -1079,7 +1082,7 @@ func TestDelete_e2e(t *testing.T) {
seriesMap[labels.New(l...).String()] = []tsdbutil.Sample{} seriesMap[labels.New(l...).String()] = []tsdbutil.Sample{}
} }
hb, _ := newTestHead(t, 100000, false) hb, _ := newTestHead(t, 100000, false, false)
defer func() { defer func() {
require.NoError(t, hb.Close()) require.NoError(t, hb.Close())
}() }()
@ -1360,7 +1363,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
func TestGCChunkAccess(t *testing.T) { func TestGCChunkAccess(t *testing.T) {
// Put a chunk, select it. GC it and then access it. // Put a chunk, select it. GC it and then access it.
h, _ := newTestHead(t, 1000, false) h, _ := newTestHead(t, 1000, false, false)
defer func() { defer func() {
require.NoError(t, h.Close()) require.NoError(t, h.Close())
}() }()
@ -1414,7 +1417,7 @@ func TestGCChunkAccess(t *testing.T) {
func TestGCSeriesAccess(t *testing.T) { func TestGCSeriesAccess(t *testing.T) {
// Put a series, select it. GC it and then access it. // Put a series, select it. GC it and then access it.
h, _ := newTestHead(t, 1000, false) h, _ := newTestHead(t, 1000, false, false)
defer func() { defer func() {
require.NoError(t, h.Close()) require.NoError(t, h.Close())
}() }()
@ -1469,7 +1472,7 @@ func TestGCSeriesAccess(t *testing.T) {
} }
func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) {
h, _ := newTestHead(t, 1000, false) h, _ := newTestHead(t, 1000, false, false)
defer func() { defer func() {
require.NoError(t, h.Close()) require.NoError(t, h.Close())
}() }()
@ -1499,7 +1502,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) {
} }
func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
h, _ := newTestHead(t, 1000, false) h, _ := newTestHead(t, 1000, false, false)
defer func() { defer func() {
require.NoError(t, h.Close()) require.NoError(t, h.Close())
}() }()
@ -1532,7 +1535,7 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
func TestHead_LogRollback(t *testing.T) { func TestHead_LogRollback(t *testing.T) {
for _, compress := range []bool{false, true} { for _, compress := range []bool{false, true} {
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
h, w := newTestHead(t, 1000, compress) h, w := newTestHead(t, 1000, compress, false)
defer func() { defer func() {
require.NoError(t, h.Close()) require.NoError(t, h.Close())
}() }()
@ -1726,7 +1729,7 @@ func TestHeadReadWriterRepair(t *testing.T) {
} }
func TestNewWalSegmentOnTruncate(t *testing.T) { func TestNewWalSegmentOnTruncate(t *testing.T) {
h, wlog := newTestHead(t, 1000, false) h, wlog := newTestHead(t, 1000, false, false)
defer func() { defer func() {
require.NoError(t, h.Close()) require.NoError(t, h.Close())
}() }()
@ -1756,7 +1759,7 @@ func TestNewWalSegmentOnTruncate(t *testing.T) {
} }
func TestAddDuplicateLabelName(t *testing.T) { func TestAddDuplicateLabelName(t *testing.T) {
h, _ := newTestHead(t, 1000, false) h, _ := newTestHead(t, 1000, false, false)
defer func() { defer func() {
require.NoError(t, h.Close()) require.NoError(t, h.Close())
}() }()
@ -1839,7 +1842,7 @@ func TestMemSeriesIsolation(t *testing.T) {
} }
// Test isolation without restart of Head. // Test isolation without restart of Head.
hb, _ := newTestHead(t, 1000, false) hb, _ := newTestHead(t, 1000, false, false)
i := addSamples(hb) i := addSamples(hb)
testIsolation(hb, i) testIsolation(hb, i)
@ -1901,7 +1904,7 @@ func TestMemSeriesIsolation(t *testing.T) {
require.NoError(t, hb.Close()) require.NoError(t, hb.Close())
// Test isolation with restart of Head. This is to verify the num samples of chunks after m-map chunk replay. // Test isolation with restart of Head. This is to verify the num samples of chunks after m-map chunk replay.
hb, w := newTestHead(t, 1000, false) hb, w := newTestHead(t, 1000, false, false)
i = addSamples(hb) i = addSamples(hb)
require.NoError(t, hb.Close()) require.NoError(t, hb.Close())
@ -1954,7 +1957,7 @@ func TestIsolationRollback(t *testing.T) {
} }
// Rollback after a failed append and test if the low watermark has progressed anyway. // Rollback after a failed append and test if the low watermark has progressed anyway.
hb, _ := newTestHead(t, 1000, false) hb, _ := newTestHead(t, 1000, false, false)
defer func() { defer func() {
require.NoError(t, hb.Close()) require.NoError(t, hb.Close())
}() }()
@ -1985,7 +1988,7 @@ func TestIsolationLowWatermarkMonotonous(t *testing.T) {
t.Skip("skipping test since tsdb isolation is disabled") t.Skip("skipping test since tsdb isolation is disabled")
} }
hb, _ := newTestHead(t, 1000, false) hb, _ := newTestHead(t, 1000, false, false)
defer func() { defer func() {
require.NoError(t, hb.Close()) require.NoError(t, hb.Close())
}() }()
@ -2022,7 +2025,7 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) {
t.Skip("skipping test since tsdb isolation is disabled") t.Skip("skipping test since tsdb isolation is disabled")
} }
h, _ := newTestHead(t, 1000, false) h, _ := newTestHead(t, 1000, false, false)
defer func() { defer func() {
require.NoError(t, h.Close()) require.NoError(t, h.Close())
}() }()
@ -2047,7 +2050,7 @@ func TestIsolationWithoutAdd(t *testing.T) {
t.Skip("skipping test since tsdb isolation is disabled") t.Skip("skipping test since tsdb isolation is disabled")
} }
hb, _ := newTestHead(t, 1000, false) hb, _ := newTestHead(t, 1000, false, false)
defer func() { defer func() {
require.NoError(t, hb.Close()) require.NoError(t, hb.Close())
}() }()
@ -2142,7 +2145,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
} }
func testHeadSeriesChunkRace(t *testing.T) { func testHeadSeriesChunkRace(t *testing.T) {
h, _ := newTestHead(t, 1000, false) h, _ := newTestHead(t, 1000, false, false)
defer func() { defer func() {
require.NoError(t, h.Close()) require.NoError(t, h.Close())
}() }()
@ -2177,7 +2180,7 @@ func testHeadSeriesChunkRace(t *testing.T) {
} }
func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) { func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) {
head, _ := newTestHead(t, 1000, false) head, _ := newTestHead(t, 1000, false, false)
defer func() { defer func() {
require.NoError(t, head.Close()) require.NoError(t, head.Close())
}() }()
@ -2237,7 +2240,7 @@ func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) {
} }
func TestHeadLabelValuesWithMatchers(t *testing.T) { func TestHeadLabelValuesWithMatchers(t *testing.T) {
head, _ := newTestHead(t, 1000, false) head, _ := newTestHead(t, 1000, false, false)
t.Cleanup(func() { require.NoError(t, head.Close()) }) t.Cleanup(func() { require.NoError(t, head.Close()) })
app := head.Appender(context.Background()) app := head.Appender(context.Background())
@ -2296,7 +2299,7 @@ func TestHeadLabelValuesWithMatchers(t *testing.T) {
} }
func TestHeadLabelNamesWithMatchers(t *testing.T) { func TestHeadLabelNamesWithMatchers(t *testing.T) {
head, _ := newTestHead(t, 1000, false) head, _ := newTestHead(t, 1000, false, false)
defer func() { defer func() {
require.NoError(t, head.Close()) require.NoError(t, head.Close())
}() }()
@ -2364,7 +2367,7 @@ func TestHeadLabelNamesWithMatchers(t *testing.T) {
} }
func TestHeadShardedPostings(t *testing.T) { func TestHeadShardedPostings(t *testing.T) {
head, _ := newTestHead(t, 1000, false) head, _ := newTestHead(t, 1000, false, false)
defer func() { defer func() {
require.NoError(t, head.Close()) require.NoError(t, head.Close())
}() }()
@ -2428,7 +2431,7 @@ func TestHeadShardedPostings(t *testing.T) {
} }
func TestErrReuseAppender(t *testing.T) { func TestErrReuseAppender(t *testing.T) {
head, _ := newTestHead(t, 1000, false) head, _ := newTestHead(t, 1000, false, false)
defer func() { defer func() {
require.NoError(t, head.Close()) require.NoError(t, head.Close())
}() }()
@ -2464,7 +2467,7 @@ func TestErrReuseAppender(t *testing.T) {
func TestHeadMintAfterTruncation(t *testing.T) { func TestHeadMintAfterTruncation(t *testing.T) {
chunkRange := int64(2000) chunkRange := int64(2000)
head, _ := newTestHead(t, chunkRange, false) head, _ := newTestHead(t, chunkRange, false, false)
app := head.Appender(context.Background()) app := head.Appender(context.Background())
_, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 100, 100) _, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 100, 100)
@ -2498,7 +2501,7 @@ func TestHeadMintAfterTruncation(t *testing.T) {
func TestHeadExemplars(t *testing.T) { func TestHeadExemplars(t *testing.T) {
chunkRange := int64(2000) chunkRange := int64(2000)
head, _ := newTestHead(t, chunkRange, false) head, _ := newTestHead(t, chunkRange, false, false)
app := head.Appender(context.Background()) app := head.Appender(context.Background())
l := labels.FromStrings("traceId", "123") l := labels.FromStrings("traceId", "123")
@ -2520,7 +2523,7 @@ func TestHeadExemplars(t *testing.T) {
func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) { func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) {
chunkRange := int64(2000) chunkRange := int64(2000)
head, _ := newTestHead(b, chunkRange, false) head, _ := newTestHead(b, chunkRange, false, false)
b.Cleanup(func() { require.NoError(b, head.Close()) }) b.Cleanup(func() { require.NoError(b, head.Close()) })
app := head.Appender(context.Background()) app := head.Appender(context.Background())
@ -2832,7 +2835,7 @@ func TestWaitForPendingReadersInTimeRange(t *testing.T) {
} }
func TestChunkSnapshot(t *testing.T) { func TestChunkSnapshot(t *testing.T) {
head, _ := newTestHead(t, 120*4, false) head, _ := newTestHead(t, 120*4, false, false)
defer func() { defer func() {
head.opts.EnableMemorySnapshotOnShutdown = false head.opts.EnableMemorySnapshotOnShutdown = false
require.NoError(t, head.Close()) require.NoError(t, head.Close())
@ -3074,7 +3077,7 @@ func TestChunkSnapshot(t *testing.T) {
} }
func TestSnapshotError(t *testing.T) { func TestSnapshotError(t *testing.T) {
head, _ := newTestHead(t, 120*4, false) head, _ := newTestHead(t, 120*4, false, false)
defer func() { defer func() {
head.opts.EnableMemorySnapshotOnShutdown = false head.opts.EnableMemorySnapshotOnShutdown = false
require.NoError(t, head.Close()) require.NoError(t, head.Close())
@ -3407,7 +3410,7 @@ func TestOOOMmapReplay(t *testing.T) {
} }
func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) { func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) {
h, _ := newTestHead(t, 1000, false) h, _ := newTestHead(t, 1000, false, false)
defer func() { defer func() {
require.NoError(t, h.Close()) require.NoError(t, h.Close())
}() }()

View file

@ -134,6 +134,27 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, lbls *labels.Labels,
return nil return nil
} }
// PostingsForMatchers needs to be overridden so that the right IndexReader
// implementation gets passed down to the PostingsForMatchers call.
func (oh *OOOHeadIndexReader) PostingsForMatchers(concurrent bool, ms ...*labels.Matcher) (index.Postings, error) {
	// Pass oh itself so that the shared pfmc (presumably a PostingsForMatchers
	// cache — confirm against head.go) resolves postings through this OOO-aware
	// index reader instead of the default head index reader.
	return oh.head.pfmc.PostingsForMatchers(oh, concurrent, ms...)
}
// LabelValues needs to be overridden from the headIndexReader implementation
// due to the check that happens at the beginning, where we make sure the query
// interval overlaps the head's out-of-order min time and max time.
func (oh *OOOHeadIndexReader) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
	// No overlap between the query range and the OOO time window: return an
	// empty (non-nil) slice rather than nil.
	if oh.maxt < oh.head.MinOOOTime() || oh.mint > oh.head.MaxOOOTime() {
		return []string{}, nil
	}

	// Without matchers, delegate straight to the head's postings index.
	if len(matchers) == 0 {
		return oh.head.postings.LabelValues(name), nil
	}

	// With matchers, resolve values through this OOO-aware index reader so the
	// overridden PostingsForMatchers above is used.
	return labelValuesWithMatchers(oh, name, matchers...)
}
type chunkMetaAndChunkDiskMapperRef struct { type chunkMetaAndChunkDiskMapperRef struct {
meta chunks.Meta meta chunks.Meta
ref chunks.ChunkDiskMapperRef ref chunks.ChunkDiskMapperRef

View file

@ -3,6 +3,7 @@ package tsdb
import ( import (
"context" "context"
"fmt" "fmt"
"math"
"sort" "sort"
"testing" "testing"
"time" "time"
@ -280,7 +281,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
for perm, intervals := range permutations { for perm, intervals := range permutations {
for _, headChunk := range []bool{false, true} { for _, headChunk := range []bool{false, true} {
t.Run(fmt.Sprintf("name=%s, permutation=%d, headChunk=%t", tc.name, perm, headChunk), func(t *testing.T) { t.Run(fmt.Sprintf("name=%s, permutation=%d, headChunk=%t", tc.name, perm, headChunk), func(t *testing.T) {
h, _ := newTestHead(t, 1000, false) h, _ := newTestHead(t, 1000, false, true)
defer func() { defer func() {
require.NoError(t, h.Close()) require.NoError(t, h.Close())
}() }()
@ -357,6 +358,99 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
} }
} }
// TestOOOHeadChunkReader_LabelValues verifies OOOHeadIndexReader.LabelValues
// for query ranges that overlap the OOO window, only the in-order data, or
// neither, with and without matchers.
// NOTE(review): the name says "ChunkReader" but the test exercises the index
// reader (NewOOOHeadIndexReader) — consider renaming for clarity.
func TestOOOHeadChunkReader_LabelValues(t *testing.T) {
	chunkRange := int64(2000)
	// OOO support enabled (last arg true) so the OOO appends below succeed.
	head, _ := newTestHead(t, chunkRange, false, true)
	t.Cleanup(func() { require.NoError(t, head.Close()) })

	app := head.Appender(context.Background())

	// Add in-order samples at t=100 for two series foo=bar1 and foo=bar2.
	_, err := app.Append(0, labels.Labels{
		{Name: "foo", Value: "bar1"},
	}, 100, 1)
	require.NoError(t, err)
	_, err = app.Append(0, labels.Labels{
		{Name: "foo", Value: "bar2"},
	}, 100, 2)
	require.NoError(t, err)

	// Add ooo samples for those series at t=90 (earlier than the in-order max).
	_, err = app.Append(0, labels.Labels{
		{Name: "foo", Value: "bar1"},
	}, 90, 1)
	require.NoError(t, err)
	_, err = app.Append(0, labels.Labels{
		{Name: "foo", Value: "bar2"},
	}, 90, 2)
	require.NoError(t, err)

	require.NoError(t, app.Commit())

	// expValues1..4 correspond, in order, to the four LabelValues calls in the
	// subtest body: equality matcher (foo=bar1), negative-regexp matcher,
	// regexp matcher (foo=~bar.), and no matchers at all.
	cases := []struct {
		name       string
		queryMinT  int64
		queryMaxT  int64
		expValues1 []string
		expValues2 []string
		expValues3 []string
		expValues4 []string
	}{
		{
			name:       "LabelValues calls when ooo head has max query range",
			queryMinT:  math.MinInt64,
			queryMaxT:  math.MaxInt64,
			expValues1: []string{"bar1"},
			expValues2: []string{},
			expValues3: []string{"bar1", "bar2"},
			expValues4: []string{"bar1", "bar2"},
		},
		{
			name:       "LabelValues calls with ooo head query range not overlapping in-order data",
			queryMinT:  90,
			queryMaxT:  90,
			expValues1: []string{"bar1"},
			expValues2: []string{},
			expValues3: []string{"bar1", "bar2"},
			expValues4: []string{"bar1", "bar2"},
		},
		{
			// Query range [100,100] misses the OOO window, so every call is
			// expected to return empty.
			name:       "LabelValues calls with ooo head query range not overlapping out-of-order data",
			queryMinT:  100,
			queryMaxT:  100,
			expValues1: []string{},
			expValues2: []string{},
			expValues3: []string{},
			expValues4: []string{},
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Build an OOO head index reader over this case's query interval.
			oh := NewOOOHeadIndexReader(head, tc.queryMinT, tc.queryMaxT)
			matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")}
			values, err := oh.LabelValues("foo", matchers...)
			require.NoError(t, err)
			require.Equal(t, tc.expValues1, values)

			matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotRegexp, "foo", "^bar.")}
			values, err = oh.LabelValues("foo", matchers...)
			require.NoError(t, err)
			require.Equal(t, tc.expValues2, values)

			matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.")}
			values, err = oh.LabelValues("foo", matchers...)
			require.NoError(t, err)
			require.Equal(t, tc.expValues3, values)

			values, err = oh.LabelValues("foo")
			require.NoError(t, err)
			require.Equal(t, tc.expValues4, values)
		})
	}
}
// TestOOOHeadChunkReader_Chunk tests that the Chunk method works as expected. // TestOOOHeadChunkReader_Chunk tests that the Chunk method works as expected.
// It does so by appending out of order samples to the db and then initializing // It does so by appending out of order samples to the db and then initializing
// an OOOHeadChunkReader to read chunks from it. // an OOOHeadChunkReader to read chunks from it.