prometheus/tsdb/ooo_head_read_test.go
Bryan Boreham 26b3de0438 TSDB: Remove OOOHeadIndexReader
Use headIndexReader instead.

OOOCompactionHeadIndexReader needs to be expanded slightly, because it previously delegated to OOOHeadIndexReader.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
2024-08-14 13:41:13 +01:00

1196 lines
42 KiB
Go

// Copyright 2022 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"context"
"fmt"
"math"
"slices"
"sort"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/wlog"
)
// chunkInterval describes the time range of a single chunk used as test input
// or as expected output.
type chunkInterval struct {
	// ID links an expected output chunk to the input chunk it must reference.
	// The input order is permuted, so the chunk refs are not known when the
	// test cases are declared; the test runner resolves the ref of the input
	// chunk with the same ID and asserts on it.
	ID int
	// mint and maxt are the minimum and maximum timestamps of the chunk.
	mint, maxt int64
}
// permutateChunkIntervals appends to out every ordering of in that can be
// produced by rearranging the elements at positions left..right (inclusive)
// and returns the extended slice. Each appended ordering is an independent
// copy of in; in itself is restored to its original order before returning.
// Call it with left=0 and right=len(in)-1 to obtain all permutations.
// For an empty in (right < left) nothing is appended.
func permutateChunkIntervals(in []chunkInterval, out [][]chunkInterval, left, right int) [][]chunkInterval {
	if left != right {
		for pos := left; pos <= right; pos++ {
			// Move the candidate into the current slot, permute the rest,
			// then undo the swap so the next candidate starts from the same state.
			in[left], in[pos] = in[pos], in[left]
			out = permutateChunkIntervals(in, out, left+1, right)
			in[pos], in[left] = in[left], in[pos]
		}
		return out
	}

	// Only one position is left to fill: the current arrangement is complete.
	snapshot := make([]chunkInterval, len(in))
	copy(snapshot, in)
	out = append(out, snapshot)
	return out
}
// TestOOOHeadIndexReader_Series tests that the Series method of the index
// reader returned by NewHeadAndOOOIndexReader works as expected.
//
// Unlike other head tests, samples are not appended: the test writes the
// out-of-order chunk time ranges directly onto the series (as mmapped chunks
// and, optionally, as the OOO head chunk) and then checks which chunk metas
// Series reports for a given query interval.
//
// Every case is executed for each permutation of its input chunks, once with
// all chunks mmapped and once with the last chunk acting as the OOO head chunk.
func TestOOOHeadIndexReader_Series(t *testing.T) {
	tests := []struct {
		name                string
		queryMinT           int64
		queryMaxT           int64
		inputChunkIntervals []chunkInterval
		expChunks           []chunkInterval
	}{
		{
			name:      "Empty result and no error when head is empty",
			queryMinT: 0,
			queryMaxT: 100,
			expChunks: nil,
		},
		{
			name:      "If query interval is bigger than the existing chunks nothing is returned",
			queryMinT: 500,
			queryMaxT: 700,
			inputChunkIntervals: []chunkInterval{
				{0, 100, 400},
			},
			// ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700
			// Query Interval [---------------------------------------]
			// Chunk 0 [-----------------------------------------------------------]
			expChunks: nil,
		},
		{
			name:      "If query interval is smaller than the existing chunks nothing is returned",
			queryMinT: 100,
			queryMaxT: 400,
			inputChunkIntervals: []chunkInterval{
				{0, 500, 700},
			},
			// ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700
			// Query Interval [-----------------------------------------------------------]
			// Chunk 0: [---------------------------------------]
			expChunks: nil,
		},
		{
			name:      "If query interval exceeds the existing chunk, it is returned",
			queryMinT: 100,
			queryMaxT: 400,
			inputChunkIntervals: []chunkInterval{
				{0, 150, 350},
			},
			// ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700
			// Query Interval [-----------------------------------------------------------]
			// Chunk 0: [---------------------------------------]
			expChunks: []chunkInterval{
				{0, 150, 350},
			},
		},
		{
			name:      "If chunk exceeds the query interval, it is returned",
			queryMinT: 150,
			queryMaxT: 350,
			inputChunkIntervals: []chunkInterval{
				{0, 100, 400},
			},
			// ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700
			// Query Interval: [---------------------------------------]
			// Chunk 0: [-----------------------------------------------------------]
			expChunks: []chunkInterval{
				{0, 100, 400},
			},
		},
		{
			name:      "Pairwise overlaps should return the references of the first of each pair",
			queryMinT: 0,
			queryMaxT: 700,
			inputChunkIntervals: []chunkInterval{
				{0, 100, 200},
				{1, 500, 600},
				{2, 150, 250},
				{3, 550, 650},
			},
			// ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700
			// Query Interval [---------------------------------------------------------------------------------------------------------------------------------]
			// Chunk 0: [-------------------]
			// Chunk 1: [-------------------]
			// Chunk 2: [-------------------]
			// Chunk 3: [-------------------]
			// Output Graphically [-----------------------------] [-----------------------------]
			expChunks: []chunkInterval{
				{0, 100, 250},
				{1, 500, 650},
			},
		},
		{
			name:      "If all chunks overlap, single big chunk is returned",
			queryMinT: 0,
			queryMaxT: 700,
			inputChunkIntervals: []chunkInterval{
				{0, 100, 200},
				{1, 200, 300},
				{2, 300, 400},
				{3, 400, 500},
			},
			// ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700
			// Query Interval [---------------------------------------------------------------------------------------------------------------------------------]
			// Chunk 0: [-------------------]
			// Chunk 1: [-------------------]
			// Chunk 2: [-------------------]
			// Chunk 3: [------------------]
			// Output Graphically [------------------------------------------------------------------------------]
			expChunks: []chunkInterval{
				{0, 100, 500},
			},
		},
		{
			name:      "If no chunks overlap, all chunks are returned",
			queryMinT: 0,
			queryMaxT: 700,
			inputChunkIntervals: []chunkInterval{
				{0, 100, 199},
				{1, 200, 299},
				{2, 300, 399},
				{3, 400, 499},
			},
			// ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700
			// Query Interval [---------------------------------------------------------------------------------------------------------------------------------]
			// Chunk 0: [------------------]
			// Chunk 1: [------------------]
			// Chunk 2: [------------------]
			// Chunk 3: [------------------]
			// Output Graphically [------------------][------------------][------------------][------------------]
			expChunks: []chunkInterval{
				{0, 100, 199},
				{1, 200, 299},
				{2, 300, 399},
				{3, 400, 499},
			},
		},
		{
			name:      "Triplet with pairwise overlaps, query range covers all, and distractor extra chunk",
			queryMinT: 0,
			queryMaxT: 400,
			inputChunkIntervals: []chunkInterval{
				{0, 100, 200},
				{1, 150, 300},
				{2, 250, 350},
				{3, 450, 550},
			},
			// ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700
			// Query Interval [--------------------------------------------------------------------]
			// Chunk 0: [------------------]
			// Chunk 1: [-----------------------------]
			// Chunk 2: [------------------]
			// Chunk 3: [------------------]
			// Output Graphically [-----------------------------------------------]
			expChunks: []chunkInterval{
				{0, 100, 350},
			},
		},
		{
			name:      "Query interval partially overlaps some chunks",
			queryMinT: 100,
			queryMaxT: 400,
			inputChunkIntervals: []chunkInterval{
				{0, 250, 500},
				{1, 0, 200},
				{2, 150, 300},
			},
			// ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700
			// Query Interval [------------------------------------------------------------]
			// Chunk 0: [-------------------------------------------------]
			// Chunk 1: [-----------------------------]
			// Chunk 2: [------------------------------]
			// Output Graphically [-----------------------------------------------------------------------------------------]
			expChunks: []chunkInterval{
				{1, 0, 500},
			},
		},
		{
			name:      "A full overlap pair and disjointed triplet",
			queryMinT: 0,
			queryMaxT: 900,
			inputChunkIntervals: []chunkInterval{
				{0, 100, 300},
				{1, 770, 850},
				{2, 150, 250},
				{3, 650, 750},
				{4, 600, 800},
			},
			// ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700 750 800 850
			// Query Interval [---------------------------------------------------------------------------------------------------------------------------------------------------------------]
			// Chunk 0: [---------------------------------------]
			// Chunk 1: [--------------]
			// Chunk 2: [-------------------]
			// Chunk 3: [-------------------]
			// Chunk 4: [---------------------------------------]
			// Output Graphically [---------------------------------------] [------------------------------------------------]
			expChunks: []chunkInterval{
				{0, 100, 300},
				{4, 600, 850},
			},
		},
		{
			name:      "Query range covers 3 disjoint chunks",
			queryMinT: 0,
			queryMaxT: 650,
			inputChunkIntervals: []chunkInterval{
				{0, 100, 150},
				{1, 300, 350},
				{2, 200, 250},
			},
			// ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700 750 800 850
			// Query Interval [----------------------------------------------------------------------------------------------------------------------]
			// Chunk 0: [-------]
			// Chunk 1: [----------]
			// Chunk 2: [--------]
			// Output Graphically [-------] [--------] [----------]
			expChunks: []chunkInterval{
				{0, 100, 150},
				{1, 300, 350},
				{2, 200, 250},
			},
		},
	}

	s1Lset := labels.FromStrings("foo", "bar")
	s1ID := uint64(1)

	for _, tc := range tests {
		var permutations [][]chunkInterval
		if len(tc.inputChunkIntervals) == 0 {
			// permutateChunkIntervals appends nothing for an empty input, so
			// run the case exactly once without any chunks.
			permutations = [][]chunkInterval{
				nil,
			}
		} else {
			permutations = permutateChunkIntervals(tc.inputChunkIntervals, nil, 0, len(tc.inputChunkIntervals)-1)
		}
		for perm, intervals := range permutations {
			for _, headChunk := range []bool{false, true} {
				t.Run(fmt.Sprintf("name=%s, permutation=%d, headChunk=%t", tc.name, perm, headChunk), func(t *testing.T) {
					h, _ := newTestHead(t, 1000, wlog.CompressionNone, true)
					t.Cleanup(func() {
						require.NoError(t, h.Close())
					})
					require.NoError(t, h.Init(0))

					s1, _, err := h.getOrCreate(s1ID, s1Lset)
					require.NoError(t, err)
					s1.ooo = &memSeriesOOOFields{}

					// Build the expected chunk metas from the expected intervals.
					var expChunks []chunks.Meta
					for _, e := range tc.expChunks {
						meta := chunks.Meta{
							Chunk:   chunkenc.Chunk(nil),
							MinTime: e.mint,
							MaxTime: e.maxt,
						}

						// The expected Ref is the one of the input chunk with the same ID;
						// it depends on the position of that chunk in this permutation.
						for ref, c := range intervals {
							if c.ID == e.ID {
								meta.Ref = chunks.ChunkRef(chunks.NewHeadChunkRef(chunks.HeadSeriesRef(s1ID), s1.oooHeadChunkID(ref)))
								break
							}
						}
						expChunks = append(expChunks, meta)
					}
					slices.SortFunc(expChunks, lessByMinTimeAndMinRef) // We always want the chunks to come back sorted by minTime asc.

					// Use a local instead of reassigning the captured loop variable.
					mmapIntervals := intervals
					if headChunk && len(intervals) > 0 {
						// Put the last interval in the head chunk; the rest is mmapped.
						last := intervals[len(intervals)-1]
						s1.ooo.oooHeadChunk = &oooHeadChunk{
							chunk:   NewOOOChunk(),
							minTime: last.mint,
							maxTime: last.maxt,
						}
						mmapIntervals = intervals[:len(intervals)-1]
					}

					for _, ic := range mmapIntervals {
						s1.ooo.oooMmappedChunks = append(s1.ooo.oooMmappedChunks, &mmappedChunk{
							minTime: ic.mint,
							maxTime: ic.maxt,
						})
					}

					ir := NewHeadAndOOOIndexReader(h, tc.queryMinT, tc.queryMaxT, 0)

					var chks []chunks.Meta
					var b labels.ScratchBuilder
					err = ir.Series(storage.SeriesRef(s1ID), &b, &chks)
					require.NoError(t, err)
					require.Equal(t, s1Lset, b.Labels())
					require.Equal(t, expChunks, chks)

					// A series ID that was never created must not be found.
					err = ir.Series(storage.SeriesRef(s1ID+1), &b, &chks)
					require.Equal(t, storage.ErrNotFound, err)
				})
			}
		}
	}
}
// TestOOOHeadChunkReader_LabelValues runs the LabelValues checks as one
// subtest per entry of sampleTypeScenarios, named after the entry's key.
func TestOOOHeadChunkReader_LabelValues(t *testing.T) {
	for scenarioName, sc := range sampleTypeScenarios {
		runScenario := func(t *testing.T) {
			testOOOHeadChunkReader_LabelValues(t, sc)
		}
		t.Run(scenarioName, runScenario)
	}
}
// testOOOHeadChunkReader_LabelValues appends one in-order sample (ts=100) and
// one out-of-order sample (ts=90) for each of the series foo=bar1 and foo=bar2,
// then checks LabelValues on readers built by NewHeadAndOOOIndexReader for
// several query ranges and matcher combinations.
//
//nolint:revive // unexported-return.
func testOOOHeadChunkReader_LabelValues(t *testing.T, scenario sampleTypeScenario) {
	chunkRange := int64(2000)
	head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, true)
	t.Cleanup(func() { require.NoError(t, head.Close()) })

	ctx := context.Background()

	app := head.Appender(ctx)

	// Add in-order samples
	_, _, err := scenario.appendFunc(app, labels.FromStrings("foo", "bar1"), 100, int64(1))
	require.NoError(t, err)
	_, _, err = scenario.appendFunc(app, labels.FromStrings("foo", "bar2"), 100, int64(2))
	require.NoError(t, err)

	// Add ooo samples for those series
	_, _, err = scenario.appendFunc(app, labels.FromStrings("foo", "bar1"), 90, int64(1))
	require.NoError(t, err)
	_, _, err = scenario.appendFunc(app, labels.FromStrings("foo", "bar2"), 90, int64(2))
	require.NoError(t, err)

	require.NoError(t, app.Commit())

	cases := []struct {
		name       string
		queryMinT  int64
		queryMaxT  int64
		expValues1 []string // foo="bar1"
		expValues2 []string // foo!~"^bar."
		expValues3 []string // foo=~"bar."
		expValues4 []string // no matchers
	}{
		{
			name:       "LabelValues calls when ooo head has max query range",
			queryMinT:  math.MinInt64,
			queryMaxT:  math.MaxInt64,
			expValues1: []string{"bar1"},
			expValues2: nil,
			expValues3: []string{"bar1", "bar2"},
			expValues4: []string{"bar1", "bar2"},
		},
		{
			name:       "LabelValues calls with ooo head query range not overlapping in-order data",
			queryMinT:  90,
			queryMaxT:  90,
			expValues1: []string{"bar1"},
			expValues2: nil,
			expValues3: []string{"bar1", "bar2"},
			expValues4: []string{"bar1", "bar2"},
		},
		{
			name:       "LabelValues calls with ooo head query range not overlapping out-of-order data",
			queryMinT:  100,
			queryMaxT:  100,
			expValues1: []string{"bar1"},
			expValues2: nil,
			expValues3: []string{"bar1", "bar2"},
			expValues4: []string{"bar1", "bar2"},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			oh := NewHeadAndOOOIndexReader(head, tc.queryMinT, tc.queryMaxT, 0)

			// assertValues queries the values of label "foo" with the given
			// matchers and compares them, sorted, against exp. The error is
			// checked before the result is touched.
			assertValues := func(exp []string, matchers ...*labels.Matcher) {
				values, err := oh.LabelValues(ctx, "foo", matchers...)
				require.NoError(t, err)
				sort.Strings(values)
				require.Equal(t, exp, values)
			}

			assertValues(tc.expValues1, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1"))
			assertValues(tc.expValues2, labels.MustNewMatcher(labels.MatchNotRegexp, "foo", "^bar."))
			assertValues(tc.expValues3, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar."))
			assertValues(tc.expValues4)
		})
	}
}
// TestOOOHeadChunkReader_Chunk checks that chunks of out-of-order samples can
// be read back as expected. The actual checks live in
// testOOOHeadChunkReader_Chunk, which is run as one subtest per entry of
// sampleTypeScenarios.
func TestOOOHeadChunkReader_Chunk(t *testing.T) {
	for scenarioName, sc := range sampleTypeScenarios {
		runScenario := func(t *testing.T) {
			testOOOHeadChunkReader_Chunk(t, sc)
		}
		t.Run(scenarioName, runScenario)
	}
}
// testOOOHeadChunkReader_Chunk appends one in-order sample followed by a set
// of out-of-order samples to a fresh DB, collects the chunk metas through the
// index reader built by NewHeadAndOOOIndexReader, and then verifies that the
// chunk reader built by NewHeadAndOOOChunkReader returns, for every meta, an
// iterable yielding exactly the expected samples.
//
// OutOfOrderCapMax is set to 5, so the "Chunk N" groups of five samples in
// the test inputs each fill one OOO chunk; the remaining samples stay in the
// current OOO head chunk.
//
//nolint:revive // unexported-return.
func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
	opts := DefaultOptions()
	opts.OutOfOrderCapMax = 5
	opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds()

	s1 := labels.FromStrings("l", "v1")
	minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }

	t.Run("Getting a non existing chunk fails with not found error", func(t *testing.T) {
		db := newTestDBWithOpts(t, opts)

		cr := NewHeadAndOOOChunkReader(db.head, 0, 1000, nil, nil, 0)
		defer cr.Close()
		c, iterable, err := cr.ChunkOrIterable(chunks.Meta{
			Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300,
		})
		require.EqualError(t, err, "not found")
		require.Nil(t, iterable)
		require.Nil(t, c)
	})

	tests := []struct {
		name                 string
		queryMinT            int64
		queryMaxT            int64
		firstInOrderSampleAt int64
		inputSamples         []testValue
		expChunksSamples     []chunks.SampleSlice
	}{
		{
			name:                 "Getting the head when there are no overlapping chunks returns just the samples in the head",
			queryMinT:            minutes(0),
			queryMaxT:            minutes(100),
			firstInOrderSampleAt: minutes(120),
			inputSamples: []testValue{
				{Ts: minutes(30), V: 0},
				{Ts: minutes(40), V: 0},
			},
			// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
			// Query Interval [------------------------------------------------------------------------------------------]
			// Chunk 0: Current Head [--------] (With 2 samples)
			// Output Graphically [--------] (With 2 samples)
			expChunksSamples: []chunks.SampleSlice{
				{
					scenario.sampleFunc(minutes(30), 0),
					scenario.sampleFunc(minutes(40), 0),
				},
			},
		},
		{
			name:                 "Getting the head chunk when there are overlapping chunks returns all combined",
			queryMinT:            minutes(0),
			queryMaxT:            minutes(100),
			firstInOrderSampleAt: minutes(120),
			inputSamples: []testValue{
				{Ts: minutes(41), V: 0},
				{Ts: minutes(42), V: 0},
				{Ts: minutes(43), V: 0},
				{Ts: minutes(44), V: 0},
				{Ts: minutes(45), V: 0},
				{Ts: minutes(30), V: 1},
				{Ts: minutes(50), V: 1},
			},
			// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
			// Query Interval [------------------------------------------------------------------------------------------]
			// Chunk 0 [---] (With 5 samples)
			// Chunk 1: Current Head [-----------------] (With 2 samples)
			// Output Graphically [-----------------] (With 7 samples)
			expChunksSamples: []chunks.SampleSlice{
				{
					scenario.sampleFunc(minutes(30), 1),
					scenario.sampleFunc(minutes(41), 0),
					scenario.sampleFunc(minutes(42), 0),
					scenario.sampleFunc(minutes(43), 0),
					scenario.sampleFunc(minutes(44), 0),
					scenario.sampleFunc(minutes(45), 0),
					scenario.sampleFunc(minutes(50), 1),
				},
			},
		},
		{
			name:                 "Two windows of overlapping chunks get properly converged",
			queryMinT:            minutes(0),
			queryMaxT:            minutes(100),
			firstInOrderSampleAt: minutes(120),
			inputSamples: []testValue{
				// Chunk 0
				{Ts: minutes(10), V: 0},
				{Ts: minutes(12), V: 0},
				{Ts: minutes(14), V: 0},
				{Ts: minutes(16), V: 0},
				{Ts: minutes(20), V: 0},
				// Chunk 1
				{Ts: minutes(20), V: 1},
				{Ts: minutes(22), V: 1},
				{Ts: minutes(24), V: 1},
				{Ts: minutes(26), V: 1},
				{Ts: minutes(29), V: 1},
				// Chunk 2
				{Ts: minutes(30), V: 2},
				{Ts: minutes(32), V: 2},
				{Ts: minutes(34), V: 2},
				{Ts: minutes(36), V: 2},
				{Ts: minutes(40), V: 2},
				// Head
				{Ts: minutes(40), V: 3},
				{Ts: minutes(50), V: 3},
			},
			// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
			// Query Interval [------------------------------------------------------------------------------------------]
			// Chunk 0 [--------]
			// Chunk 1 [-------]
			// Chunk 2 [--------]
			// Chunk 3: Current Head [--------]
			// Output Graphically [----------------][-----------------]
			expChunksSamples: []chunks.SampleSlice{
				{
					scenario.sampleFunc(minutes(10), 0),
					scenario.sampleFunc(minutes(12), 0),
					scenario.sampleFunc(minutes(14), 0),
					scenario.sampleFunc(minutes(16), 0),
					scenario.sampleFunc(minutes(20), 1),
					scenario.sampleFunc(minutes(22), 1),
					scenario.sampleFunc(minutes(24), 1),
					scenario.sampleFunc(minutes(26), 1),
					scenario.sampleFunc(minutes(29), 1),
				},
				{
					scenario.sampleFunc(minutes(30), 2),
					scenario.sampleFunc(minutes(32), 2),
					scenario.sampleFunc(minutes(34), 2),
					scenario.sampleFunc(minutes(36), 2),
					scenario.sampleFunc(minutes(40), 3),
					scenario.sampleFunc(minutes(50), 3),
				},
			},
		},
		{
			name:                 "Two windows of overlapping chunks in descending order get properly converged",
			queryMinT:            minutes(0),
			queryMaxT:            minutes(100),
			firstInOrderSampleAt: minutes(120),
			inputSamples: []testValue{
				// Chunk 0
				{Ts: minutes(40), V: 0},
				{Ts: minutes(42), V: 0},
				{Ts: minutes(44), V: 0},
				{Ts: minutes(46), V: 0},
				{Ts: minutes(50), V: 0},
				// Chunk 1
				{Ts: minutes(30), V: 1},
				{Ts: minutes(32), V: 1},
				{Ts: minutes(34), V: 1},
				{Ts: minutes(36), V: 1},
				{Ts: minutes(40), V: 1},
				// Chunk 2
				{Ts: minutes(20), V: 2},
				{Ts: minutes(22), V: 2},
				{Ts: minutes(24), V: 2},
				{Ts: minutes(26), V: 2},
				{Ts: minutes(29), V: 2},
				// Head
				{Ts: minutes(10), V: 3},
				{Ts: minutes(20), V: 3},
			},
			// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
			// Query Interval [------------------------------------------------------------------------------------------]
			// Chunk 0 [--------]
			// Chunk 1 [--------]
			// Chunk 2 [-------]
			// Chunk 3: Current Head [--------]
			// Output Graphically [----------------][-----------------]
			expChunksSamples: []chunks.SampleSlice{
				{
					scenario.sampleFunc(minutes(10), 3),
					scenario.sampleFunc(minutes(20), 2),
					scenario.sampleFunc(minutes(22), 2),
					scenario.sampleFunc(minutes(24), 2),
					scenario.sampleFunc(minutes(26), 2),
					scenario.sampleFunc(minutes(29), 2),
				},
				{
					scenario.sampleFunc(minutes(30), 1),
					scenario.sampleFunc(minutes(32), 1),
					scenario.sampleFunc(minutes(34), 1),
					scenario.sampleFunc(minutes(36), 1),
					scenario.sampleFunc(minutes(40), 0),
					scenario.sampleFunc(minutes(42), 0),
					scenario.sampleFunc(minutes(44), 0),
					scenario.sampleFunc(minutes(46), 0),
					scenario.sampleFunc(minutes(50), 0),
				},
			},
		},
		{
			name:                 "If chunks are not overlapped they are not converged",
			queryMinT:            minutes(0),
			queryMaxT:            minutes(100),
			firstInOrderSampleAt: minutes(120),
			inputSamples: []testValue{
				// Chunk 0
				{Ts: minutes(10), V: 0},
				{Ts: minutes(12), V: 0},
				{Ts: minutes(14), V: 0},
				{Ts: minutes(16), V: 0},
				{Ts: minutes(18), V: 0},
				// Chunk 1
				{Ts: minutes(20), V: 1},
				{Ts: minutes(22), V: 1},
				{Ts: minutes(24), V: 1},
				{Ts: minutes(26), V: 1},
				{Ts: minutes(28), V: 1},
				// Chunk 2
				{Ts: minutes(30), V: 2},
				{Ts: minutes(32), V: 2},
				{Ts: minutes(34), V: 2},
				{Ts: minutes(36), V: 2},
				{Ts: minutes(38), V: 2},
				// Head
				{Ts: minutes(40), V: 3},
				{Ts: minutes(42), V: 3},
			},
			// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
			// Query Interval [------------------------------------------------------------------------------------------]
			// Chunk 0 [-------]
			// Chunk 1 [-------]
			// Chunk 2 [-------]
			// Chunk 3: Current Head [-------]
			// Output Graphically [-------][-------][-------][--------]
			expChunksSamples: []chunks.SampleSlice{
				{
					scenario.sampleFunc(minutes(10), 0),
					scenario.sampleFunc(minutes(12), 0),
					scenario.sampleFunc(minutes(14), 0),
					scenario.sampleFunc(minutes(16), 0),
					scenario.sampleFunc(minutes(18), 0),
				},
				{
					scenario.sampleFunc(minutes(20), 1),
					scenario.sampleFunc(minutes(22), 1),
					scenario.sampleFunc(minutes(24), 1),
					scenario.sampleFunc(minutes(26), 1),
					scenario.sampleFunc(minutes(28), 1),
				},
				{
					scenario.sampleFunc(minutes(30), 2),
					scenario.sampleFunc(minutes(32), 2),
					scenario.sampleFunc(minutes(34), 2),
					scenario.sampleFunc(minutes(36), 2),
					scenario.sampleFunc(minutes(38), 2),
				},
				{
					scenario.sampleFunc(minutes(40), 3),
					scenario.sampleFunc(minutes(42), 3),
				},
			},
		},
		{
			name:                 "Triplet of chunks overlapping returns a single merged chunk",
			queryMinT:            minutes(0),
			queryMaxT:            minutes(100),
			firstInOrderSampleAt: minutes(120),
			inputSamples: []testValue{
				// Chunk 0
				{Ts: minutes(10), V: 0},
				{Ts: minutes(15), V: 0},
				{Ts: minutes(20), V: 0},
				{Ts: minutes(25), V: 0},
				{Ts: minutes(30), V: 0},
				// Chunk 1
				{Ts: minutes(20), V: 1},
				{Ts: minutes(25), V: 1},
				{Ts: minutes(30), V: 1},
				{Ts: minutes(35), V: 1},
				{Ts: minutes(42), V: 1},
				// Chunk 2 Head
				{Ts: minutes(32), V: 2},
				{Ts: minutes(50), V: 2},
			},
			// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
			// Query Interval [------------------------------------------------------------------------------------------]
			// Chunk 0 [-----------------]
			// Chunk 1 [--------------------]
			// Chunk 2 Current Head [--------------]
			// Output Graphically [-----------------------------------]
			expChunksSamples: []chunks.SampleSlice{
				{
					scenario.sampleFunc(minutes(10), 0),
					scenario.sampleFunc(minutes(15), 0),
					scenario.sampleFunc(minutes(20), 1),
					scenario.sampleFunc(minutes(25), 1),
					scenario.sampleFunc(minutes(30), 1),
					scenario.sampleFunc(minutes(32), 2),
					scenario.sampleFunc(minutes(35), 1),
					scenario.sampleFunc(minutes(42), 1),
					scenario.sampleFunc(minutes(50), 2),
				},
			},
		},
		{
			name:                 "Query interval partially overlaps with a triplet of chunks but still returns a single merged chunk",
			queryMinT:            minutes(12),
			queryMaxT:            minutes(33),
			firstInOrderSampleAt: minutes(120),
			inputSamples: []testValue{
				// Chunk 0
				{Ts: minutes(10), V: 0},
				{Ts: minutes(15), V: 0},
				{Ts: minutes(20), V: 0},
				{Ts: minutes(25), V: 0},
				{Ts: minutes(30), V: 0},
				// Chunk 1
				{Ts: minutes(20), V: 1},
				{Ts: minutes(25), V: 1},
				{Ts: minutes(30), V: 1},
				{Ts: minutes(35), V: 1},
				{Ts: minutes(42), V: 1},
				// Chunk 2 Head
				{Ts: minutes(32), V: 2},
				{Ts: minutes(50), V: 2},
			},
			// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
			// Query Interval [------------------]
			// Chunk 0 [-----------------]
			// Chunk 1 [--------------------]
			// Chunk 2 Current Head [--------------]
			// Output Graphically [-----------------------------------]
			expChunksSamples: []chunks.SampleSlice{
				{
					scenario.sampleFunc(minutes(10), 0),
					scenario.sampleFunc(minutes(15), 0),
					scenario.sampleFunc(minutes(20), 1),
					scenario.sampleFunc(minutes(25), 1),
					scenario.sampleFunc(minutes(30), 1),
					scenario.sampleFunc(minutes(32), 2),
					scenario.sampleFunc(minutes(35), 1),
					scenario.sampleFunc(minutes(42), 1),
					scenario.sampleFunc(minutes(50), 2),
				},
			},
		},
	}

	for _, tc := range tests {
		t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
			db := newTestDBWithOpts(t, opts)

			// Append the in-order sample first. Its value is its timestamp
			// expressed in minutes; nothing below asserts on that value.
			// (The previous expression `x/1*time.Minute.Milliseconds()` parsed
			// as `(x/1)*ms` and multiplied instead of dividing.)
			app := db.Appender(context.Background())
			s1Ref, _, err := scenario.appendFunc(app, s1, tc.firstInOrderSampleAt, tc.firstInOrderSampleAt/time.Minute.Milliseconds())
			require.NoError(t, err)
			require.NoError(t, app.Commit())

			// OOO few samples for s1.
			app = db.Appender(context.Background())
			for _, s := range tc.inputSamples {
				_, _, err := scenario.appendFunc(app, s1, s.Ts, s.V)
				require.NoError(t, err)
			}
			require.NoError(t, app.Commit())

			// The Series method populates the chunk metas, taking a copy of the
			// head OOO chunk if necessary. These are then used by the ChunkReader.
			ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0)
			var chks []chunks.Meta
			var b labels.ScratchBuilder
			err = ir.Series(s1Ref, &b, &chks)
			require.NoError(t, err)
			// Also guarantees that indexing tc.expChunksSamples below is in range.
			require.Len(t, chks, len(tc.expChunksSamples))

			cr := NewHeadAndOOOChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, nil, 0)
			defer cr.Close()
			for i := range chks {
				c, iterable, err := cr.ChunkOrIterable(chks[i])
				require.NoError(t, err)
				require.Nil(t, c)

				it := iterable.Iterator(nil)
				resultSamples, err := storage.ExpandSamples(it, nil)
				require.NoError(t, err)
				requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true)
			}
		})
	}
}
// TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding
// tests that a query which performs a Series() call followed by a Chunks()
// call gets a response consistent with the data seen by Series(), even when
// the OOO head receives more samples before Chunks() is called.
// For example:
//   - Response A comes from: Series() then Chunk()
//   - Response B comes from: Series(), new samples added to the head, then Chunk()
//   - A == B
//
// The checks are run as one subtest per entry of sampleTypeScenarios.
func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(t *testing.T) {
	for scenarioName, sc := range sampleTypeScenarios {
		runScenario := func(t *testing.T) {
			testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(t, sc)
		}
		t.Run(scenarioName, runScenario)
	}
}
// testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding
// appends an initial set of out-of-order samples, calls Series() on the index
// reader built by NewHeadAndOOOIndexReader, appends further out-of-order
// samples, and only then reads the chunks through the reader built by
// NewHeadAndOOOChunkReader. The samples read back must be exactly those that
// were present when Series() was called.
//
// OutOfOrderCapMax is set to 5, so a group of five samples fills one OOO chunk.
//
//nolint:revive // unexported-return.
func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(t *testing.T, scenario sampleTypeScenario) {
	opts := DefaultOptions()
	opts.OutOfOrderCapMax = 5
	opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds()

	s1 := labels.FromStrings("l", "v1")
	minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }

	tests := []struct {
		name                   string
		queryMinT              int64
		queryMaxT              int64
		firstInOrderSampleAt   int64
		initialSamples         []testValue
		samplesAfterSeriesCall []testValue
		expChunksSamples       []chunks.SampleSlice
	}{
		{
			name:                 "Current head gets old, new and in between sample after Series call, they all should be omitted from the result",
			queryMinT:            minutes(0),
			queryMaxT:            minutes(100),
			firstInOrderSampleAt: minutes(120),
			initialSamples: []testValue{
				// Chunk 0
				{Ts: minutes(20), V: 0},
				{Ts: minutes(22), V: 0},
				{Ts: minutes(24), V: 0},
				{Ts: minutes(26), V: 0},
				{Ts: minutes(30), V: 0},
				// Chunk 1 Head
				{Ts: minutes(25), V: 1},
				{Ts: minutes(35), V: 1},
			},
			samplesAfterSeriesCall: []testValue{
				{Ts: minutes(10), V: 1},
				{Ts: minutes(32), V: 1},
				{Ts: minutes(50), V: 1},
			},
			// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
			// Query Interval [-----------------------------------]
			// Chunk 0: [--------] (5 samples)
			// Chunk 1: Current Head [-------] (2 samples)
			// New samples added after Series()
			// Chunk 1: Current Head [-----------------------------------] (5 samples)
			// Output Graphically [------------] (With 7 samples; every sample appended after Series() is omitted, including the one in between)
			expChunksSamples: []chunks.SampleSlice{
				{
					scenario.sampleFunc(minutes(20), 0),
					scenario.sampleFunc(minutes(22), 0),
					scenario.sampleFunc(minutes(24), 0),
					scenario.sampleFunc(minutes(25), 1),
					scenario.sampleFunc(minutes(26), 0),
					scenario.sampleFunc(minutes(30), 0),
					scenario.sampleFunc(minutes(35), 1),
				},
			},
		},
		{
			name:                 "After Series() prev head gets mmapped after getting samples, new head gets new samples also overlapping, none of these should appear in response.",
			queryMinT:            minutes(0),
			queryMaxT:            minutes(100),
			firstInOrderSampleAt: minutes(120),
			initialSamples: []testValue{
				// Chunk 0
				{Ts: minutes(20), V: 0},
				{Ts: minutes(22), V: 0},
				{Ts: minutes(24), V: 0},
				{Ts: minutes(26), V: 0},
				{Ts: minutes(30), V: 0},
				// Chunk 1 Head
				{Ts: minutes(25), V: 1},
				{Ts: minutes(35), V: 1},
			},
			samplesAfterSeriesCall: []testValue{
				{Ts: minutes(10), V: 1},
				{Ts: minutes(32), V: 1},
				{Ts: minutes(50), V: 1},
				// Chunk 1 gets mmapped and Chunk 2, the new head is born
				{Ts: minutes(25), V: 2},
				{Ts: minutes(31), V: 2},
			},
			// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
			// Query Interval [-----------------------------------]
			// Chunk 0: [--------] (5 samples)
			// Chunk 1: Current Head [-------] (2 samples)
			// New samples added after Series()
			// Chunk 1 (mmapped) [-------------------------] (5 samples)
			// Chunk 2: Current Head [-----------] (2 samples)
			// Output Graphically [------------] (7 samples) It has 5 from Chunk 0 and 2 from Chunk 1
			expChunksSamples: []chunks.SampleSlice{
				{
					scenario.sampleFunc(minutes(20), 0),
					scenario.sampleFunc(minutes(22), 0),
					scenario.sampleFunc(minutes(24), 0),
					scenario.sampleFunc(minutes(25), 1),
					scenario.sampleFunc(minutes(26), 0),
					scenario.sampleFunc(minutes(30), 0),
					scenario.sampleFunc(minutes(35), 1),
				},
			},
		},
	}

	for _, tc := range tests {
		t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
			db := newTestDBWithOpts(t, opts)

			// Append the in-order sample first. Its value is its timestamp
			// expressed in minutes; nothing below asserts on that value.
			// (The previous expression `x/1*time.Minute.Milliseconds()` parsed
			// as `(x/1)*ms` and multiplied instead of dividing.)
			app := db.Appender(context.Background())
			s1Ref, _, err := scenario.appendFunc(app, s1, tc.firstInOrderSampleAt, tc.firstInOrderSampleAt/time.Minute.Milliseconds())
			require.NoError(t, err)
			require.NoError(t, app.Commit())

			// OOO few samples for s1.
			app = db.Appender(context.Background())
			for _, s := range tc.initialSamples {
				_, _, err := scenario.appendFunc(app, s1, s.Ts, s.V)
				require.NoError(t, err)
			}
			require.NoError(t, app.Commit())

			// The Series method populates the chunk metas, taking a copy of the
			// head OOO chunk if necessary. These are then used by the ChunkReader.
			ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0)
			var chks []chunks.Meta
			var b labels.ScratchBuilder
			err = ir.Series(s1Ref, &b, &chks)
			require.NoError(t, err)
			// Also guarantees that indexing tc.expChunksSamples below is in range.
			require.Len(t, chks, len(tc.expChunksSamples))

			// Now we keep receiving ooo samples, after Series() has been called.
			app = db.Appender(context.Background())
			for _, s := range tc.samplesAfterSeriesCall {
				_, _, err = scenario.appendFunc(app, s1, s.Ts, s.V)
				require.NoError(t, err)
			}
			require.NoError(t, app.Commit())

			cr := NewHeadAndOOOChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, nil, 0)
			defer cr.Close()
			for i := range chks {
				c, iterable, err := cr.ChunkOrIterable(chks[i])
				require.NoError(t, err)
				require.Nil(t, c)

				it := iterable.Iterator(nil)
				resultSamples, err := storage.ExpandSamples(it, nil)
				require.NoError(t, err)
				requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true)
			}
		})
	}
}
// TestSortByMinTimeAndMinRef tests that the sort function for chunk metas does sort
// by chunk meta MinTime and in case of same references by the lower reference.
func TestSortByMinTimeAndMinRef(t *testing.T) {
tests := []struct {
name string
input []chunkMetaAndChunkDiskMapperRef
exp []chunkMetaAndChunkDiskMapperRef
}{
{
name: "chunks are ordered by min time",
input: []chunkMetaAndChunkDiskMapperRef{
{
meta: chunks.Meta{
Ref: 0,
MinTime: 0,
},
ref: chunks.ChunkDiskMapperRef(0),
},
{
meta: chunks.Meta{
Ref: 1,
MinTime: 1,
},
ref: chunks.ChunkDiskMapperRef(1),
},
},
exp: []chunkMetaAndChunkDiskMapperRef{
{
meta: chunks.Meta{
Ref: 0,
MinTime: 0,
},
ref: chunks.ChunkDiskMapperRef(0),
},
{
meta: chunks.Meta{
Ref: 1,
MinTime: 1,
},
ref: chunks.ChunkDiskMapperRef(1),
},
},
},
{
name: "if same mintime, lower reference goes first",
input: []chunkMetaAndChunkDiskMapperRef{
{
meta: chunks.Meta{
Ref: 10,
MinTime: 0,
},
ref: chunks.ChunkDiskMapperRef(0),
},
{
meta: chunks.Meta{
Ref: 5,
MinTime: 0,
},
ref: chunks.ChunkDiskMapperRef(1),
},
},
exp: []chunkMetaAndChunkDiskMapperRef{
{
meta: chunks.Meta{
Ref: 5,
MinTime: 0,
},
ref: chunks.ChunkDiskMapperRef(1),
},
{
meta: chunks.Meta{
Ref: 10,
MinTime: 0,
},
ref: chunks.ChunkDiskMapperRef(0),
},
},
},
}
for _, tc := range tests {
t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
slices.SortFunc(tc.input, refLessByMinTimeAndMinRef)
require.Equal(t, tc.exp, tc.input)
})
}
}
// TestSortMetaByMinTimeAndMinRef checks the ordering produced by sorting
// chunks.Meta values with lessByMinTimeAndMinRef: ascending by MinTime and,
// for equal MinTime, lower Ref first.
func TestSortMetaByMinTimeAndMinRef(t *testing.T) {
	// meta builds a chunk meta carrying only the two fields the sort looks at here.
	meta := func(ref chunks.ChunkRef, minT int64) chunks.Meta {
		return chunks.Meta{
			Ref:     ref,
			MinTime: minT,
		}
	}

	type sortCase struct {
		name       string
		inputMetas []chunks.Meta
		expMetas   []chunks.Meta
	}
	cases := []sortCase{
		{
			name:       "chunks are ordered by min time",
			inputMetas: []chunks.Meta{meta(0, 0), meta(1, 1)},
			expMetas:   []chunks.Meta{meta(0, 0), meta(1, 1)},
		},
		{
			name:       "if same mintime, lower reference goes first",
			inputMetas: []chunks.Meta{meta(10, 0), meta(5, 0)},
			expMetas:   []chunks.Meta{meta(5, 0), meta(10, 0)},
		},
	}

	for i := range cases {
		sc := cases[i]
		t.Run("name="+sc.name, func(t *testing.T) {
			// The input slice is sorted in place and then compared.
			slices.SortFunc(sc.inputMetas, lessByMinTimeAndMinRef)
			require.Equal(t, sc.expMetas, sc.inputMetas)
		})
	}
}
// newTestDBWithOpts opens a DB with the given options in a fresh temporary
// directory owned by t and returns it. Opening must succeed, otherwise the
// test fails immediately. The DB is closed through t.Cleanup, and a failure
// to close it fails the test as well.
func newTestDBWithOpts(t *testing.T, opts *Options) *DB {
	// Attribute failures raised here to the calling test's line.
	t.Helper()

	dir := t.TempDir()
	db, err := Open(dir, nil, nil, opts, nil)
	require.NoError(t, err)
	t.Cleanup(func() {
		require.NoError(t, db.Close())
	})
	return db
}