// Copyright 2022 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package tsdb import ( "context" "fmt" "math" "slices" "sort" "testing" "time" "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/wlog" ) type chunkInterval struct { // because we permutate the order of chunks, we cannot determine at test declaration time which chunkRefs we expect in the Output. // This ID matches expected output chunks against test input chunks, the test runner will assert the chunkRef for the matching chunk ID int mint int64 maxt int64 } // permutateChunkIntervals returns all possible orders of the given chunkIntervals. func permutateChunkIntervals(in []chunkInterval, out [][]chunkInterval, left, right int) [][]chunkInterval { if left == right { inCopy := make([]chunkInterval, len(in)) copy(inCopy, in) return append(out, inCopy) } for i := left; i <= right; i++ { in[left], in[i] = in[i], in[left] out = permutateChunkIntervals(in, out, left+1, right) in[left], in[i] = in[i], in[left] } return out } // TestOOOHeadIndexReader_Series tests that the Series method works as expected. // However it does so by creating chunks and memory mapping them unlike other // tests of the head where samples are appended and we let the head memory map. // We do this because the ingestion path and the appender for out of order // samples are not ready yet. func TestOOOHeadIndexReader_Series(t *testing.T) { tests := []struct { name string queryMinT int64 queryMaxT int64 inputChunkIntervals []chunkInterval expChunks []chunkInterval }{ { name: "Empty result and no error when head is empty", queryMinT: 0, queryMaxT: 100, expChunks: nil, }, { name: "If query interval is bigger than the existing chunks nothing is returned", queryMinT: 500, queryMaxT: 700, inputChunkIntervals: []chunkInterval{ {0, 100, 400}, }, // ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700 // Query Interval [---------------------------------------] // Chunk 0 [-----------------------------------------------------------] expChunks: nil, }, { name: "If query interval is smaller than the existing chunks nothing is returned", queryMinT: 100, queryMaxT: 400, inputChunkIntervals: []chunkInterval{ {0, 500, 700}, }, // ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700 // Query Interval [-----------------------------------------------------------] // Chunk 0: [---------------------------------------] expChunks: nil, }, { name: "If query interval exceeds the existing chunk, it is returned", queryMinT: 100, queryMaxT: 400, inputChunkIntervals: []chunkInterval{ {0, 150, 350}, }, // ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700 // Query Interval [-----------------------------------------------------------] // Chunk 0: [---------------------------------------] expChunks: []chunkInterval{ {0, 150, 350}, }, }, { name: "If chunk exceeds the query interval, it is returned", queryMinT: 150, queryMaxT: 350, inputChunkIntervals: []chunkInterval{ {0, 100, 400}, }, // ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700 // Query Interval: [---------------------------------------] // Chunk 0: [-----------------------------------------------------------] expChunks: []chunkInterval{ {0, 100, 400}, }, }, { name: "Pairwise overlaps should return the references of the first of each pair", queryMinT: 0, queryMaxT: 700, inputChunkIntervals: []chunkInterval{ {0, 100, 200}, {1, 500, 600}, {2, 150, 250}, {3, 550, 650}, }, // ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700 // Query Interval [---------------------------------------------------------------------------------------------------------------------------------] // Chunk 0: [-------------------] // Chunk 1: [-------------------] // Chunk 2: [-------------------] // Chunk 3: [-------------------] // Output Graphically [-----------------------------] [-----------------------------] expChunks: []chunkInterval{ {0, 100, 250}, {1, 500, 650}, }, }, { name: "If all chunks overlap, single big chunk is returned", queryMinT: 0, queryMaxT: 700, inputChunkIntervals: []chunkInterval{ {0, 100, 200}, {1, 200, 300}, {2, 300, 400}, {3, 400, 500}, }, // ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700 // Query Interval [---------------------------------------------------------------------------------------------------------------------------------] // Chunk 0: [-------------------] // Chunk 1: [-------------------] // Chunk 2: [-------------------] // Chunk 3: [------------------] // Output Graphically [------------------------------------------------------------------------------] expChunks: []chunkInterval{ {0, 100, 500}, }, }, { name: "If no chunks overlap, all chunks are returned", queryMinT: 0, queryMaxT: 700, inputChunkIntervals: []chunkInterval{ {0, 100, 199}, {1, 200, 299}, {2, 300, 399}, {3, 400, 499}, }, // ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700 // Query Interval [---------------------------------------------------------------------------------------------------------------------------------] // Chunk 0: [------------------] // Chunk 1: [------------------] // Chunk 2: [------------------] // Chunk 3: [------------------] // Output Graphically [------------------][------------------][------------------][------------------] expChunks: []chunkInterval{ {0, 100, 199}, {1, 200, 299}, {2, 300, 399}, {3, 400, 499}, }, }, { name: "Triplet with pairwise overlaps, query range covers all, and distractor extra chunk", queryMinT: 0, queryMaxT: 400, inputChunkIntervals: []chunkInterval{ {0, 100, 200}, {1, 150, 300}, {2, 250, 350}, {3, 450, 550}, }, // ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700 // Query Interval [--------------------------------------------------------------------] // Chunk 0: [------------------] // Chunk 1: [-----------------------------] // Chunk 2: [------------------] // Chunk 3: [------------------] // Output Graphically [-----------------------------------------------] expChunks: []chunkInterval{ {0, 100, 350}, }, }, { name: "Query interval partially overlaps some chunks", queryMinT: 100, queryMaxT: 400, inputChunkIntervals: []chunkInterval{ {0, 250, 500}, {1, 0, 200}, {2, 150, 300}, }, // ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700 // Query Interval [------------------------------------------------------------] // Chunk 0: [-------------------------------------------------] // Chunk 1: [-----------------------------] // Chunk 2: [------------------------------] // Output Graphically [-----------------------------------------------------------------------------------------] expChunks: []chunkInterval{ {1, 0, 500}, }, }, { name: "A full overlap pair and disjointed triplet", queryMinT: 0, queryMaxT: 900, inputChunkIntervals: []chunkInterval{ {0, 100, 300}, {1, 770, 850}, {2, 150, 250}, {3, 650, 750}, {4, 600, 800}, }, // ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700 750 800 850 // Query Interval [---------------------------------------------------------------------------------------------------------------------------------------------------------------] // Chunk 0: [---------------------------------------] // Chunk 1: [--------------] // Chunk 2: [-------------------] // Chunk 3: [-------------------] // Chunk 4: [---------------------------------------] // Output Graphically [---------------------------------------] [------------------------------------------------] expChunks: []chunkInterval{ {0, 100, 300}, {4, 600, 850}, }, }, { name: "Query range covers 3 disjoint chunks", queryMinT: 0, queryMaxT: 650, inputChunkIntervals: []chunkInterval{ {0, 100, 150}, {1, 300, 350}, {2, 200, 250}, }, // ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700 750 800 850 // Query Interval [----------------------------------------------------------------------------------------------------------------------] // Chunk 0: [-------] // Chunk 1: [----------] // Chunk 2: [--------] // Output Graphically [-------] [--------] [----------] expChunks: []chunkInterval{ {0, 100, 150}, {1, 300, 350}, {2, 200, 250}, }, }, } s1Lset := labels.FromStrings("foo", "bar") s1ID := uint64(1) for _, tc := range tests { var permutations [][]chunkInterval if len(tc.inputChunkIntervals) == 0 { // handle special case permutations = [][]chunkInterval{ nil, } } else { permutations = permutateChunkIntervals(tc.inputChunkIntervals, nil, 0, len(tc.inputChunkIntervals)-1) } for perm, intervals := range permutations { for _, headChunk := range []bool{false, true} { t.Run(fmt.Sprintf("name=%s, permutation=%d, headChunk=%t", tc.name, perm, headChunk), func(t *testing.T) { h, _ := newTestHead(t, 1000, wlog.CompressionNone, true) defer func() { require.NoError(t, h.Close()) }() require.NoError(t, h.Init(0)) s1, _, _ := h.getOrCreate(s1ID, s1Lset) s1.ooo = &memSeriesOOOFields{} // define our expected chunks, by looking at the expected ChunkIntervals and setting... var expChunks []chunks.Meta for _, e := range tc.expChunks { meta := chunks.Meta{ Chunk: chunkenc.Chunk(nil), MinTime: e.mint, MaxTime: e.maxt, } // Ref to whatever Ref the chunk has, that we refer to by ID for ref, c := range intervals { if c.ID == e.ID { meta.Ref = chunks.ChunkRef(chunks.NewHeadChunkRef(chunks.HeadSeriesRef(s1ID), chunks.HeadChunkID(ref))) break } } expChunks = append(expChunks, meta) } slices.SortFunc(expChunks, lessByMinTimeAndMinRef) // We always want the chunks to come back sorted by minTime asc. if headChunk && len(intervals) > 0 { // Put the last interval in the head chunk s1.ooo.oooHeadChunk = &oooHeadChunk{ chunk: NewOOOChunk(), minTime: intervals[len(intervals)-1].mint, maxTime: intervals[len(intervals)-1].maxt, } intervals = intervals[:len(intervals)-1] } for _, ic := range intervals { s1.ooo.oooMmappedChunks = append(s1.ooo.oooMmappedChunks, &mmappedChunk{ minTime: ic.mint, maxTime: ic.maxt, }) } ir := NewOOOHeadIndexReader(h, tc.queryMinT, tc.queryMaxT, 0) var chks []chunks.Meta var b labels.ScratchBuilder err := ir.Series(storage.SeriesRef(s1ID), &b, &chks) require.NoError(t, err) require.Equal(t, s1Lset, b.Labels()) require.Equal(t, expChunks, chks) err = ir.Series(storage.SeriesRef(s1ID+1), &b, &chks) require.Equal(t, storage.ErrNotFound, err) }) } } } } func TestOOOHeadChunkReader_LabelValues(t *testing.T) { for name, scenario := range sampleTypeScenarios { t.Run(name, func(t *testing.T) { testOOOHeadChunkReader_LabelValues(t, scenario) }) } } //nolint:revive // unexported-return. func testOOOHeadChunkReader_LabelValues(t *testing.T, scenario sampleTypeScenario) { chunkRange := int64(2000) head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, true) t.Cleanup(func() { require.NoError(t, head.Close()) }) ctx := context.Background() app := head.Appender(context.Background()) // Add in-order samples _, _, err := scenario.appendFunc(app, labels.FromStrings("foo", "bar1"), 100, int64(1)) require.NoError(t, err) _, _, err = scenario.appendFunc(app, labels.FromStrings("foo", "bar2"), 100, int64(2)) require.NoError(t, err) // Add ooo samples for those series _, _, err = scenario.appendFunc(app, labels.FromStrings("foo", "bar1"), 90, int64(1)) require.NoError(t, err) _, _, err = scenario.appendFunc(app, labels.FromStrings("foo", "bar2"), 90, int64(2)) require.NoError(t, err) require.NoError(t, app.Commit()) cases := []struct { name string queryMinT int64 queryMaxT int64 expValues1 []string expValues2 []string expValues3 []string expValues4 []string }{ { name: "LabelValues calls when ooo head has max query range", queryMinT: math.MinInt64, queryMaxT: math.MaxInt64, expValues1: []string{"bar1"}, expValues2: nil, expValues3: []string{"bar1", "bar2"}, expValues4: []string{"bar1", "bar2"}, }, { name: "LabelValues calls with ooo head query range not overlapping in-order data", queryMinT: 90, queryMaxT: 90, expValues1: []string{"bar1"}, expValues2: nil, expValues3: []string{"bar1", "bar2"}, expValues4: []string{"bar1", "bar2"}, }, { name: "LabelValues calls with ooo head query range not overlapping out-of-order data", queryMinT: 100, queryMaxT: 100, expValues1: []string{}, expValues2: []string{}, expValues3: []string{}, expValues4: []string{}, }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { // We first want to test using a head index reader that covers the biggest query interval oh := NewOOOHeadIndexReader(head, tc.queryMinT, tc.queryMaxT, 0) matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")} values, err := oh.LabelValues(ctx, "foo", matchers...) sort.Strings(values) require.NoError(t, err) require.Equal(t, tc.expValues1, values) matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotRegexp, "foo", "^bar.")} values, err = oh.LabelValues(ctx, "foo", matchers...) sort.Strings(values) require.NoError(t, err) require.Equal(t, tc.expValues2, values) matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.")} values, err = oh.LabelValues(ctx, "foo", matchers...) sort.Strings(values) require.NoError(t, err) require.Equal(t, tc.expValues3, values) values, err = oh.LabelValues(ctx, "foo") sort.Strings(values) require.NoError(t, err) require.Equal(t, tc.expValues4, values) }) } } // TestOOOHeadChunkReader_Chunk tests that the Chunk method works as expected. // It does so by appending out of order samples to the db and then initializing // an OOOHeadChunkReader to read chunks from it. func TestOOOHeadChunkReader_Chunk(t *testing.T) { for name, scenario := range sampleTypeScenarios { t.Run(name, func(t *testing.T) { testOOOHeadChunkReader_Chunk(t, scenario) }) } } //nolint:revive // unexported-return. func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { opts := DefaultOptions() opts.OutOfOrderCapMax = 5 opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds() s1 := labels.FromStrings("l", "v1") minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() } t.Run("Getting a non existing chunk fails with not found error", func(t *testing.T) { db := newTestDBWithOpts(t, opts) cr := NewOOOHeadChunkReader(db.head, 0, 1000, nil, 0) defer cr.Close() c, iterable, err := cr.ChunkOrIterable(chunks.Meta{ Ref: 0x1000000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300, }) require.Nil(t, iterable) require.Equal(t, err, fmt.Errorf("not found")) require.Nil(t, c) }) tests := []struct { name string queryMinT int64 queryMaxT int64 firstInOrderSampleAt int64 inputSamples []testValue expChunkError bool expChunksSamples []chunks.SampleSlice }{ { name: "Getting the head when there are no overlapping chunks returns just the samples in the head", queryMinT: minutes(0), queryMaxT: minutes(100), firstInOrderSampleAt: minutes(120), inputSamples: []testValue{ {Ts: minutes(30), V: 0}, {Ts: minutes(40), V: 0}, }, expChunkError: false, // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100 // Query Interval [------------------------------------------------------------------------------------------] // Chunk 0: Current Head [--------] (With 2 samples) // Output Graphically [--------] (With 2 samples) expChunksSamples: []chunks.SampleSlice{ { scenario.sampleFunc(minutes(30), 0), scenario.sampleFunc(minutes(40), 0), }, }, }, { name: "Getting the head chunk when there are overlapping chunks returns all combined", queryMinT: minutes(0), queryMaxT: minutes(100), firstInOrderSampleAt: minutes(120), inputSamples: []testValue{{Ts: minutes(41), V: 0}, {Ts: minutes(42), V: 0}, {Ts: minutes(43), V: 0}, {Ts: minutes(44), V: 0}, {Ts: minutes(45), V: 0}, {Ts: minutes(30), V: 1}, {Ts: minutes(50), V: 1}}, expChunkError: false, // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100 // Query Interval [------------------------------------------------------------------------------------------] // Chunk 0 [---] (With 5 samples) // Chunk 1: Current Head [-----------------] (With 2 samples) // Output Graphically [-----------------] (With 7 samples) expChunksSamples: []chunks.SampleSlice{ { scenario.sampleFunc(minutes(30), 1), scenario.sampleFunc(minutes(41), 0), scenario.sampleFunc(minutes(42), 0), scenario.sampleFunc(minutes(43), 0), scenario.sampleFunc(minutes(44), 0), scenario.sampleFunc(minutes(45), 0), scenario.sampleFunc(minutes(50), 1), }, }, }, { name: "Two windows of overlapping chunks get properly converged", queryMinT: minutes(0), queryMaxT: minutes(100), firstInOrderSampleAt: minutes(120), inputSamples: []testValue{ // Chunk 0 {Ts: minutes(10), V: 0}, {Ts: minutes(12), V: 0}, {Ts: minutes(14), V: 0}, {Ts: minutes(16), V: 0}, {Ts: minutes(20), V: 0}, // Chunk 1 {Ts: minutes(20), V: 1}, {Ts: minutes(22), V: 1}, {Ts: minutes(24), V: 1}, {Ts: minutes(26), V: 1}, {Ts: minutes(29), V: 1}, // Chunk 3 {Ts: minutes(30), V: 2}, {Ts: minutes(32), V: 2}, {Ts: minutes(34), V: 2}, {Ts: minutes(36), V: 2}, {Ts: minutes(40), V: 2}, // Head {Ts: minutes(40), V: 3}, {Ts: minutes(50), V: 3}, }, expChunkError: false, // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100 // Query Interval [------------------------------------------------------------------------------------------] // Chunk 0 [--------] // Chunk 1 [-------] // Chunk 2 [--------] // Chunk 3: Current Head [--------] // Output Graphically [----------------][-----------------] expChunksSamples: []chunks.SampleSlice{ { scenario.sampleFunc(minutes(10), 0), scenario.sampleFunc(minutes(12), 0), scenario.sampleFunc(minutes(14), 0), scenario.sampleFunc(minutes(16), 0), scenario.sampleFunc(minutes(20), 1), scenario.sampleFunc(minutes(22), 1), scenario.sampleFunc(minutes(24), 1), scenario.sampleFunc(minutes(26), 1), scenario.sampleFunc(minutes(29), 1), }, { scenario.sampleFunc(minutes(30), 2), scenario.sampleFunc(minutes(32), 2), scenario.sampleFunc(minutes(34), 2), scenario.sampleFunc(minutes(36), 2), scenario.sampleFunc(minutes(40), 3), scenario.sampleFunc(minutes(50), 3), }, }, }, { name: "Two windows of overlapping chunks in descending order get properly converged", queryMinT: minutes(0), queryMaxT: minutes(100), firstInOrderSampleAt: minutes(120), inputSamples: []testValue{ // Chunk 0 {Ts: minutes(40), V: 0}, {Ts: minutes(42), V: 0}, {Ts: minutes(44), V: 0}, {Ts: minutes(46), V: 0}, {Ts: minutes(50), V: 0}, // Chunk 1 {Ts: minutes(30), V: 1}, {Ts: minutes(32), V: 1}, {Ts: minutes(34), V: 1}, {Ts: minutes(36), V: 1}, {Ts: minutes(40), V: 1}, // Chunk 3 {Ts: minutes(20), V: 2}, {Ts: minutes(22), V: 2}, {Ts: minutes(24), V: 2}, {Ts: minutes(26), V: 2}, {Ts: minutes(29), V: 2}, // Head {Ts: minutes(10), V: 3}, {Ts: minutes(20), V: 3}, }, expChunkError: false, // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100 // Query Interval [------------------------------------------------------------------------------------------] // Chunk 0 [--------] // Chunk 1 [--------] // Chunk 2 [-------] // Chunk 3: Current Head [--------] // Output Graphically [----------------][-----------------] expChunksSamples: []chunks.SampleSlice{ { scenario.sampleFunc(minutes(10), 3), scenario.sampleFunc(minutes(20), 2), scenario.sampleFunc(minutes(22), 2), scenario.sampleFunc(minutes(24), 2), scenario.sampleFunc(minutes(26), 2), scenario.sampleFunc(minutes(29), 2), }, { scenario.sampleFunc(minutes(30), 1), scenario.sampleFunc(minutes(32), 1), scenario.sampleFunc(minutes(34), 1), scenario.sampleFunc(minutes(36), 1), scenario.sampleFunc(minutes(40), 0), scenario.sampleFunc(minutes(42), 0), scenario.sampleFunc(minutes(44), 0), scenario.sampleFunc(minutes(46), 0), scenario.sampleFunc(minutes(50), 0), }, }, }, { name: "If chunks are not overlapped they are not converged", queryMinT: minutes(0), queryMaxT: minutes(100), firstInOrderSampleAt: minutes(120), inputSamples: []testValue{ // Chunk 0 {Ts: minutes(10), V: 0}, {Ts: minutes(12), V: 0}, {Ts: minutes(14), V: 0}, {Ts: minutes(16), V: 0}, {Ts: minutes(18), V: 0}, // Chunk 1 {Ts: minutes(20), V: 1}, {Ts: minutes(22), V: 1}, {Ts: minutes(24), V: 1}, {Ts: minutes(26), V: 1}, {Ts: minutes(28), V: 1}, // Chunk 3 {Ts: minutes(30), V: 2}, {Ts: minutes(32), V: 2}, {Ts: minutes(34), V: 2}, {Ts: minutes(36), V: 2}, {Ts: minutes(38), V: 2}, // Head {Ts: minutes(40), V: 3}, {Ts: minutes(42), V: 3}, }, expChunkError: false, // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100 // Query Interval [------------------------------------------------------------------------------------------] // Chunk 0 [-------] // Chunk 1 [-------] // Chunk 2 [-------] // Chunk 3: Current Head [-------] // Output Graphically [-------][-------][-------][--------] expChunksSamples: []chunks.SampleSlice{ { scenario.sampleFunc(minutes(10), 0), scenario.sampleFunc(minutes(12), 0), scenario.sampleFunc(minutes(14), 0), scenario.sampleFunc(minutes(16), 0), scenario.sampleFunc(minutes(18), 0), }, { scenario.sampleFunc(minutes(20), 1), scenario.sampleFunc(minutes(22), 1), scenario.sampleFunc(minutes(24), 1), scenario.sampleFunc(minutes(26), 1), scenario.sampleFunc(minutes(28), 1), }, { scenario.sampleFunc(minutes(30), 2), scenario.sampleFunc(minutes(32), 2), scenario.sampleFunc(minutes(34), 2), scenario.sampleFunc(minutes(36), 2), scenario.sampleFunc(minutes(38), 2), }, { scenario.sampleFunc(minutes(40), 3), scenario.sampleFunc(minutes(42), 3), }, }, }, { name: "Triplet of chunks overlapping returns a single merged chunk", queryMinT: minutes(0), queryMaxT: minutes(100), firstInOrderSampleAt: minutes(120), inputSamples: []testValue{ // Chunk 0 {Ts: minutes(10), V: 0}, {Ts: minutes(15), V: 0}, {Ts: minutes(20), V: 0}, {Ts: minutes(25), V: 0}, {Ts: minutes(30), V: 0}, // Chunk 1 {Ts: minutes(20), V: 1}, {Ts: minutes(25), V: 1}, {Ts: minutes(30), V: 1}, {Ts: minutes(35), V: 1}, {Ts: minutes(42), V: 1}, // Chunk 2 Head {Ts: minutes(32), V: 2}, {Ts: minutes(50), V: 2}, }, expChunkError: false, // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100 // Query Interval [------------------------------------------------------------------------------------------] // Chunk 0 [-----------------] // Chunk 1 [--------------------] // Chunk 2 Current Head [--------------] // Output Graphically [-----------------------------------] expChunksSamples: []chunks.SampleSlice{ { scenario.sampleFunc(minutes(10), 0), scenario.sampleFunc(minutes(15), 0), scenario.sampleFunc(minutes(20), 1), scenario.sampleFunc(minutes(25), 1), scenario.sampleFunc(minutes(30), 1), scenario.sampleFunc(minutes(32), 2), scenario.sampleFunc(minutes(35), 1), scenario.sampleFunc(minutes(42), 1), scenario.sampleFunc(minutes(50), 2), }, }, }, { name: "Query interval partially overlaps with a triplet of chunks but still returns a single merged chunk", queryMinT: minutes(12), queryMaxT: minutes(33), firstInOrderSampleAt: minutes(120), inputSamples: []testValue{ // Chunk 0 {Ts: minutes(10), V: 0}, {Ts: minutes(15), V: 0}, {Ts: minutes(20), V: 0}, {Ts: minutes(25), V: 0}, {Ts: minutes(30), V: 0}, // Chunk 1 {Ts: minutes(20), V: 1}, {Ts: minutes(25), V: 1}, {Ts: minutes(30), V: 1}, {Ts: minutes(35), V: 1}, {Ts: minutes(42), V: 1}, // Chunk 2 Head {Ts: minutes(32), V: 2}, {Ts: minutes(50), V: 2}, }, expChunkError: false, // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100 // Query Interval [------------------] // Chunk 0 [-----------------] // Chunk 1 [--------------------] // Chunk 2 Current Head [--------------] // Output Graphically [-----------------------------------] expChunksSamples: []chunks.SampleSlice{ { scenario.sampleFunc(minutes(10), 0), scenario.sampleFunc(minutes(15), 0), scenario.sampleFunc(minutes(20), 1), scenario.sampleFunc(minutes(25), 1), scenario.sampleFunc(minutes(30), 1), scenario.sampleFunc(minutes(32), 2), scenario.sampleFunc(minutes(35), 1), scenario.sampleFunc(minutes(42), 1), scenario.sampleFunc(minutes(50), 2), }, }, }, } for _, tc := range tests { t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) { db := newTestDBWithOpts(t, opts) app := db.Appender(context.Background()) s1Ref, _, err := scenario.appendFunc(app, s1, tc.firstInOrderSampleAt, tc.firstInOrderSampleAt/1*time.Minute.Milliseconds()) require.NoError(t, err) require.NoError(t, app.Commit()) // OOO few samples for s1. app = db.Appender(context.Background()) for _, s := range tc.inputSamples { _, _, err := scenario.appendFunc(app, s1, s.Ts, s.V) require.NoError(t, err) } require.NoError(t, app.Commit()) // The Series method populates the chunk metas, taking a copy of the // head OOO chunk if necessary. These are then used by the ChunkReader. ir := NewOOOHeadIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0) var chks []chunks.Meta var b labels.ScratchBuilder err = ir.Series(s1Ref, &b, &chks) require.NoError(t, err) require.Equal(t, len(tc.expChunksSamples), len(chks)) cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, 0) defer cr.Close() for i := 0; i < len(chks); i++ { c, iterable, err := cr.ChunkOrIterable(chks[i]) require.NoError(t, err) require.Nil(t, c) it := iterable.Iterator(nil) resultSamples, err := storage.ExpandSamples(it, nil) require.NoError(t, err) requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true) } }) } } // TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding tests // that if a query comes and performs a Series() call followed by a Chunks() call // the response is consistent with the data seen by Series() even if the OOO // head receives more samples before Chunks() is called. // An example: // - Response A comes from: Series() then Chunk() // - Response B comes from : Series(), in parallel new samples added to the head, then Chunk() // - A == B func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(t *testing.T) { for name, scenario := range sampleTypeScenarios { t.Run(name, func(t *testing.T) { testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(t, scenario) }) } } //nolint:revive // unexported-return. func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(t *testing.T, scenario sampleTypeScenario) { opts := DefaultOptions() opts.OutOfOrderCapMax = 5 opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds() s1 := labels.FromStrings("l", "v1") minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() } tests := []struct { name string queryMinT int64 queryMaxT int64 firstInOrderSampleAt int64 initialSamples []testValue samplesAfterSeriesCall []testValue expChunkError bool expChunksSamples []chunks.SampleSlice }{ { name: "Current head gets old, new and in between sample after Series call, they all should be omitted from the result", queryMinT: minutes(0), queryMaxT: minutes(100), firstInOrderSampleAt: minutes(120), initialSamples: []testValue{ // Chunk 0 {Ts: minutes(20), V: 0}, {Ts: minutes(22), V: 0}, {Ts: minutes(24), V: 0}, {Ts: minutes(26), V: 0}, {Ts: minutes(30), V: 0}, // Chunk 1 Head {Ts: minutes(25), V: 1}, {Ts: minutes(35), V: 1}, }, samplesAfterSeriesCall: []testValue{ {Ts: minutes(10), V: 1}, {Ts: minutes(32), V: 1}, {Ts: minutes(50), V: 1}, }, expChunkError: false, // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100 // Query Interval [-----------------------------------] // Chunk 0: [--------] (5 samples) // Chunk 1: Current Head [-------] (2 samples) // New samples added after Series() // Chunk 1: Current Head [-----------------------------------] (5 samples) // Output Graphically [------------] (With 8 samples, samples newer than lastmint or older than lastmaxt are omitted but the ones in between are kept) expChunksSamples: []chunks.SampleSlice{ { scenario.sampleFunc(minutes(20), 0), scenario.sampleFunc(minutes(22), 0), scenario.sampleFunc(minutes(24), 0), scenario.sampleFunc(minutes(25), 1), scenario.sampleFunc(minutes(26), 0), scenario.sampleFunc(minutes(30), 0), scenario.sampleFunc(minutes(35), 1), }, }, }, { name: "After Series() prev head gets mmapped after getting samples, new head gets new samples also overlapping, none of these should appear in response.", queryMinT: minutes(0), queryMaxT: minutes(100), firstInOrderSampleAt: minutes(120), initialSamples: []testValue{ // Chunk 0 {Ts: minutes(20), V: 0}, {Ts: minutes(22), V: 0}, {Ts: minutes(24), V: 0}, {Ts: minutes(26), V: 0}, {Ts: minutes(30), V: 0}, // Chunk 1 Head {Ts: minutes(25), V: 1}, {Ts: minutes(35), V: 1}, }, samplesAfterSeriesCall: []testValue{ {Ts: minutes(10), V: 1}, {Ts: minutes(32), V: 1}, {Ts: minutes(50), V: 1}, // Chunk 1 gets mmapped and Chunk 2, the new head is born {Ts: minutes(25), V: 2}, {Ts: minutes(31), V: 2}, }, expChunkError: false, // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100 // Query Interval [-----------------------------------] // Chunk 0: [--------] (5 samples) // Chunk 1: Current Head [-------] (2 samples) // New samples added after Series() // Chunk 1 (mmapped) [-------------------------] (5 samples) // Chunk 2: Current Head [-----------] (2 samples) // Output Graphically [------------] (8 samples) It has 5 from Chunk 0 and 3 from Chunk 1 expChunksSamples: []chunks.SampleSlice{ { scenario.sampleFunc(minutes(20), 0), scenario.sampleFunc(minutes(22), 0), scenario.sampleFunc(minutes(24), 0), scenario.sampleFunc(minutes(25), 1), scenario.sampleFunc(minutes(26), 0), scenario.sampleFunc(minutes(30), 0), scenario.sampleFunc(minutes(35), 1), }, }, }, } for _, tc := range tests { t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) { db := newTestDBWithOpts(t, opts) app := db.Appender(context.Background()) s1Ref, _, err := scenario.appendFunc(app, s1, tc.firstInOrderSampleAt, tc.firstInOrderSampleAt/1*time.Minute.Milliseconds()) require.NoError(t, err) require.NoError(t, app.Commit()) // OOO few samples for s1. app = db.Appender(context.Background()) for _, s := range tc.initialSamples { _, _, err := scenario.appendFunc(app, s1, s.Ts, s.V) require.NoError(t, err) } require.NoError(t, app.Commit()) // The Series method populates the chunk metas, taking a copy of the // head OOO chunk if necessary. These are then used by the ChunkReader. ir := NewOOOHeadIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0) var chks []chunks.Meta var b labels.ScratchBuilder err = ir.Series(s1Ref, &b, &chks) require.NoError(t, err) require.Equal(t, len(tc.expChunksSamples), len(chks)) // Now we keep receiving ooo samples // OOO few samples for s1. app = db.Appender(context.Background()) for _, s := range tc.samplesAfterSeriesCall { _, _, err = scenario.appendFunc(app, s1, s.Ts, s.V) require.NoError(t, err) } require.NoError(t, app.Commit()) cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, 0) defer cr.Close() for i := 0; i < len(chks); i++ { c, iterable, err := cr.ChunkOrIterable(chks[i]) require.NoError(t, err) require.Nil(t, c) it := iterable.Iterator(nil) resultSamples, err := storage.ExpandSamples(it, nil) require.NoError(t, err) requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true) } }) } } // TestSortByMinTimeAndMinRef tests that the sort function for chunk metas does sort // by chunk meta MinTime and in case of same references by the lower reference. func TestSortByMinTimeAndMinRef(t *testing.T) { tests := []struct { name string input []chunkMetaAndChunkDiskMapperRef exp []chunkMetaAndChunkDiskMapperRef }{ { name: "chunks are ordered by min time", input: []chunkMetaAndChunkDiskMapperRef{ { meta: chunks.Meta{ Ref: 0, MinTime: 0, }, ref: chunks.ChunkDiskMapperRef(0), }, { meta: chunks.Meta{ Ref: 1, MinTime: 1, }, ref: chunks.ChunkDiskMapperRef(1), }, }, exp: []chunkMetaAndChunkDiskMapperRef{ { meta: chunks.Meta{ Ref: 0, MinTime: 0, }, ref: chunks.ChunkDiskMapperRef(0), }, { meta: chunks.Meta{ Ref: 1, MinTime: 1, }, ref: chunks.ChunkDiskMapperRef(1), }, }, }, { name: "if same mintime, lower reference goes first", input: []chunkMetaAndChunkDiskMapperRef{ { meta: chunks.Meta{ Ref: 10, MinTime: 0, }, ref: chunks.ChunkDiskMapperRef(0), }, { meta: chunks.Meta{ Ref: 5, MinTime: 0, }, ref: chunks.ChunkDiskMapperRef(1), }, }, exp: []chunkMetaAndChunkDiskMapperRef{ { meta: chunks.Meta{ Ref: 5, MinTime: 0, }, ref: chunks.ChunkDiskMapperRef(1), }, { meta: chunks.Meta{ Ref: 10, MinTime: 0, }, ref: chunks.ChunkDiskMapperRef(0), }, }, }, } for _, tc := range tests { t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) { slices.SortFunc(tc.input, refLessByMinTimeAndMinRef) require.Equal(t, tc.exp, tc.input) }) } } // TestSortMetaByMinTimeAndMinRef tests that the sort function for chunk metas does sort // by chunk meta MinTime and in case of same references by the lower reference. func TestSortMetaByMinTimeAndMinRef(t *testing.T) { tests := []struct { name string inputMetas []chunks.Meta expMetas []chunks.Meta }{ { name: "chunks are ordered by min time", inputMetas: []chunks.Meta{ { Ref: 0, MinTime: 0, }, { Ref: 1, MinTime: 1, }, }, expMetas: []chunks.Meta{ { Ref: 0, MinTime: 0, }, { Ref: 1, MinTime: 1, }, }, }, { name: "if same mintime, lower reference goes first", inputMetas: []chunks.Meta{ { Ref: 10, MinTime: 0, }, { Ref: 5, MinTime: 0, }, }, expMetas: []chunks.Meta{ { Ref: 5, MinTime: 0, }, { Ref: 10, MinTime: 0, }, }, }, } for _, tc := range tests { t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) { slices.SortFunc(tc.inputMetas, lessByMinTimeAndMinRef) require.Equal(t, tc.expMetas, tc.inputMetas) }) } } func newTestDBWithOpts(t *testing.T, opts *Options) *DB { dir := t.TempDir() db, err := Open(dir, nil, nil, opts, nil) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, db.Close()) }) return db }