prometheus/tsdb/ooo_head_read_test.go
György Krajcsovits e6a682f046 Reproduce populateWithDelChunkSeriesIterator corrupting chunk meta
When handling recoded histogram chunks the min time of the chunk is
updated by mistake. It should only update when the chunk is completely
new.

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
2024-10-18 10:34:22 +02:00

1139 lines
42 KiB
Go

// Copyright 2022 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"context"
"fmt"
"math"
"slices"
"sort"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/wlog"
)
type chunkInterval struct {
// because we permutate the order of chunks, we cannot determine at test declaration time which chunkRefs we expect in the Output.
// This ID matches expected output chunks against test input chunks, the test runner will assert the chunkRef for the matching chunk
ID int
mint int64
maxt int64
}
type expChunk struct {
c chunkInterval
m []chunkInterval
}
// permutateChunkIntervals returns all possible orders of the given chunkIntervals.
func permutateChunkIntervals(in []chunkInterval, out [][]chunkInterval, left, right int) [][]chunkInterval {
if left == right {
inCopy := make([]chunkInterval, len(in))
copy(inCopy, in)
return append(out, inCopy)
}
for i := left; i <= right; i++ {
in[left], in[i] = in[i], in[left]
out = permutateChunkIntervals(in, out, left+1, right)
in[left], in[i] = in[i], in[left]
}
return out
}
// TestOOOHeadIndexReader_Series tests that the Series method works as expected.
// However it does so by creating chunks and memory mapping them unlike other
// tests of the head where samples are appended and we let the head memory map.
// We do this because the ingestion path and the appender for out of order
// samples are not ready yet.
func TestOOOHeadIndexReader_Series(t *testing.T) {
tests := []struct {
name string
queryMinT int64
queryMaxT int64
inputChunkIntervals []chunkInterval
expChunks []expChunk
}{
{
name: "Empty result and no error when head is empty",
queryMinT: 0,
queryMaxT: 100,
expChunks: nil,
},
{
name: "If query interval is bigger than the existing chunks nothing is returned",
queryMinT: 500,
queryMaxT: 700,
inputChunkIntervals: []chunkInterval{
{0, 100, 400},
},
// ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700
// Query Interval [---------------------------------------]
// Chunk 0 [-----------------------------------------------------------]
expChunks: nil,
},
{
name: "If query interval is smaller than the existing chunks nothing is returned",
queryMinT: 100,
queryMaxT: 400,
inputChunkIntervals: []chunkInterval{
{0, 500, 700},
},
// ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700
// Query Interval [-----------------------------------------------------------]
// Chunk 0: [---------------------------------------]
expChunks: nil,
},
{
name: "If query interval exceeds the existing chunk, it is returned",
queryMinT: 100,
queryMaxT: 400,
inputChunkIntervals: []chunkInterval{
{0, 150, 350},
},
// ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700
// Query Interval [-----------------------------------------------------------]
// Chunk 0: [---------------------------------------]
expChunks: []expChunk{
{c: chunkInterval{0, 150, 350}},
},
},
{
name: "If chunk exceeds the query interval, it is returned",
queryMinT: 150,
queryMaxT: 350,
inputChunkIntervals: []chunkInterval{
{0, 100, 400},
},
// ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700
// Query Interval: [---------------------------------------]
// Chunk 0: [-----------------------------------------------------------]
expChunks: []expChunk{
{c: chunkInterval{0, 100, 400}},
},
},
{
name: "Pairwise overlaps should return the references of the first of each pair",
queryMinT: 0,
queryMaxT: 700,
inputChunkIntervals: []chunkInterval{
{0, 100, 200},
{1, 500, 600},
{2, 150, 250},
{3, 550, 650},
},
// ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700
// Query Interval [---------------------------------------------------------------------------------------------------------------------------------]
// Chunk 0: [-------------------]
// Chunk 1: [-------------------]
// Chunk 2: [-------------------]
// Chunk 3: [-------------------]
// Output Graphically [-----------------------------] [-----------------------------]
expChunks: []expChunk{
{c: chunkInterval{0, 100, 250}, m: []chunkInterval{{0, 100, 200}, {2, 150, 250}}},
{c: chunkInterval{1, 500, 650}, m: []chunkInterval{{1, 500, 600}, {3, 550, 650}}},
},
},
{
name: "If all chunks overlap, single big chunk is returned",
queryMinT: 0,
queryMaxT: 700,
inputChunkIntervals: []chunkInterval{
{0, 100, 200},
{1, 200, 300},
{2, 300, 400},
{3, 400, 500},
},
// ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700
// Query Interval [---------------------------------------------------------------------------------------------------------------------------------]
// Chunk 0: [-------------------]
// Chunk 1: [-------------------]
// Chunk 2: [-------------------]
// Chunk 3: [------------------]
// Output Graphically [------------------------------------------------------------------------------]
expChunks: []expChunk{
{c: chunkInterval{0, 100, 500}, m: []chunkInterval{{0, 100, 200}, {1, 200, 300}, {2, 300, 400}, {3, 400, 500}}},
},
},
{
name: "If no chunks overlap, all chunks are returned",
queryMinT: 0,
queryMaxT: 700,
inputChunkIntervals: []chunkInterval{
{0, 100, 199},
{1, 200, 299},
{2, 300, 399},
{3, 400, 499},
},
// ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700
// Query Interval [---------------------------------------------------------------------------------------------------------------------------------]
// Chunk 0: [------------------]
// Chunk 1: [------------------]
// Chunk 2: [------------------]
// Chunk 3: [------------------]
// Output Graphically [------------------][------------------][------------------][------------------]
expChunks: []expChunk{
{c: chunkInterval{0, 100, 199}},
{c: chunkInterval{1, 200, 299}},
{c: chunkInterval{2, 300, 399}},
{c: chunkInterval{3, 400, 499}},
},
},
{
name: "Triplet with pairwise overlaps, query range covers all, and distractor extra chunk",
queryMinT: 0,
queryMaxT: 400,
inputChunkIntervals: []chunkInterval{
{0, 100, 200},
{1, 150, 300},
{2, 250, 350},
{3, 450, 550},
},
// ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700
// Query Interval [--------------------------------------------------------------------]
// Chunk 0: [------------------]
// Chunk 1: [-----------------------------]
// Chunk 2: [------------------]
// Chunk 3: [------------------]
// Output Graphically [-----------------------------------------------]
expChunks: []expChunk{
{c: chunkInterval{0, 100, 350}, m: []chunkInterval{{0, 100, 200}, {1, 150, 300}, {2, 250, 350}}},
},
},
{
name: "Query interval partially overlaps some chunks",
queryMinT: 100,
queryMaxT: 400,
inputChunkIntervals: []chunkInterval{
{0, 250, 500},
{1, 0, 200},
{2, 150, 300},
},
// ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700
// Query Interval [------------------------------------------------------------]
// Chunk 0: [-------------------------------------------------]
// Chunk 1: [-----------------------------]
// Chunk 2: [------------------------------]
// Output Graphically [-----------------------------------------------------------------------------------------]
expChunks: []expChunk{
{c: chunkInterval{1, 0, 500}, m: []chunkInterval{{1, 0, 200}, {2, 150, 300}, {0, 250, 500}}},
},
},
{
name: "A full overlap pair and disjointed triplet",
queryMinT: 0,
queryMaxT: 900,
inputChunkIntervals: []chunkInterval{
{0, 100, 300},
{1, 770, 850},
{2, 150, 250},
{3, 650, 750},
{4, 600, 800},
},
// ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700 750 800 850
// Query Interval [---------------------------------------------------------------------------------------------------------------------------------------------------------------]
// Chunk 0: [---------------------------------------]
// Chunk 1: [--------------]
// Chunk 2: [-------------------]
// Chunk 3: [-------------------]
// Chunk 4: [---------------------------------------]
// Output Graphically [---------------------------------------] [------------------------------------------------]
expChunks: []expChunk{
{c: chunkInterval{0, 100, 300}, m: []chunkInterval{{0, 100, 300}, {2, 150, 250}}},
{c: chunkInterval{4, 600, 850}, m: []chunkInterval{{4, 600, 800}, {3, 650, 750}, {1, 770, 850}}},
},
},
{
name: "Query range covers 3 disjoint chunks",
queryMinT: 0,
queryMaxT: 650,
inputChunkIntervals: []chunkInterval{
{0, 100, 150},
{1, 300, 350},
{2, 200, 250},
},
// ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700 750 800 850
// Query Interval [----------------------------------------------------------------------------------------------------------------------]
// Chunk 0: [-------]
// Chunk 1: [----------]
// Chunk 2: [--------]
// Output Graphically [-------] [--------] [----------]
expChunks: []expChunk{
{c: chunkInterval{0, 100, 150}},
{c: chunkInterval{2, 200, 250}},
{c: chunkInterval{1, 300, 350}},
},
},
}
s1Lset := labels.FromStrings("foo", "bar")
s1ID := uint64(1)
for _, tc := range tests {
var permutations [][]chunkInterval
if len(tc.inputChunkIntervals) == 0 {
// handle special case
permutations = [][]chunkInterval{
nil,
}
} else {
permutations = permutateChunkIntervals(tc.inputChunkIntervals, nil, 0, len(tc.inputChunkIntervals)-1)
}
for perm, intervals := range permutations {
for _, headChunk := range []bool{false, true} {
t.Run(fmt.Sprintf("name=%s, permutation=%d, headChunk=%t", tc.name, perm, headChunk), func(t *testing.T) {
h, _ := newTestHead(t, 1000, wlog.CompressionNone, true)
defer func() {
require.NoError(t, h.Close())
}()
require.NoError(t, h.Init(0))
s1, _, _ := h.getOrCreate(s1ID, s1Lset)
s1.ooo = &memSeriesOOOFields{}
// define our expected chunks, by looking at the expected ChunkIntervals and setting...
// Ref to whatever Ref the chunk has, that we refer to by ID
findID := func(id int) chunks.ChunkRef {
for ref, c := range intervals {
if c.ID == id {
return chunks.ChunkRef(chunks.NewHeadChunkRef(chunks.HeadSeriesRef(s1ID), s1.oooHeadChunkID(ref)))
}
}
return 0
}
var expChunks []chunks.Meta
for _, e := range tc.expChunks {
var chunk chunkenc.Chunk
if len(e.m) > 0 {
mm := &multiMeta{}
for _, x := range e.m {
meta := chunks.Meta{
MinTime: x.mint,
MaxTime: x.maxt,
Ref: findID(x.ID),
}
mm.metas = append(mm.metas, meta)
}
chunk = mm
}
meta := chunks.Meta{
Chunk: chunk,
MinTime: e.c.mint,
MaxTime: e.c.maxt,
Ref: findID(e.c.ID),
}
expChunks = append(expChunks, meta)
}
if headChunk && len(intervals) > 0 {
// Put the last interval in the head chunk
s1.ooo.oooHeadChunk = &oooHeadChunk{
chunk: NewOOOChunk(),
minTime: intervals[len(intervals)-1].mint,
maxTime: intervals[len(intervals)-1].maxt,
}
intervals = intervals[:len(intervals)-1]
}
for _, ic := range intervals {
s1.ooo.oooMmappedChunks = append(s1.ooo.oooMmappedChunks, &mmappedChunk{
minTime: ic.mint,
maxTime: ic.maxt,
})
}
ir := NewHeadAndOOOIndexReader(h, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0)
var chks []chunks.Meta
var b labels.ScratchBuilder
err := ir.Series(storage.SeriesRef(s1ID), &b, &chks)
require.NoError(t, err)
require.Equal(t, s1Lset, b.Labels())
require.Equal(t, expChunks, chks)
err = ir.Series(storage.SeriesRef(s1ID+1), &b, &chks)
require.Equal(t, storage.ErrNotFound, err)
})
}
}
}
}
func TestOOOHeadChunkReader_LabelValues(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testOOOHeadChunkReader_LabelValues(t, scenario)
})
}
}
//nolint:revive // unexported-return.
func testOOOHeadChunkReader_LabelValues(t *testing.T, scenario sampleTypeScenario) {
chunkRange := int64(2000)
head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, true)
head.opts.EnableOOONativeHistograms.Store(true)
t.Cleanup(func() { require.NoError(t, head.Close()) })
ctx := context.Background()
app := head.Appender(context.Background())
// Add in-order samples
_, _, err := scenario.appendFunc(app, labels.FromStrings("foo", "bar1"), 100, int64(1))
require.NoError(t, err)
_, _, err = scenario.appendFunc(app, labels.FromStrings("foo", "bar2"), 100, int64(2))
require.NoError(t, err)
// Add ooo samples for those series
_, _, err = scenario.appendFunc(app, labels.FromStrings("foo", "bar1"), 90, int64(1))
require.NoError(t, err)
_, _, err = scenario.appendFunc(app, labels.FromStrings("foo", "bar2"), 90, int64(2))
require.NoError(t, err)
require.NoError(t, app.Commit())
cases := []struct {
name string
queryMinT int64
queryMaxT int64
expValues1 []string
expValues2 []string
expValues3 []string
expValues4 []string
}{
{
name: "LabelValues calls when ooo head has max query range",
queryMinT: math.MinInt64,
queryMaxT: math.MaxInt64,
expValues1: []string{"bar1"},
expValues2: nil,
expValues3: []string{"bar1", "bar2"},
expValues4: []string{"bar1", "bar2"},
},
{
name: "LabelValues calls with ooo head query range not overlapping in-order data",
queryMinT: 90,
queryMaxT: 90,
expValues1: []string{"bar1"},
expValues2: nil,
expValues3: []string{"bar1", "bar2"},
expValues4: []string{"bar1", "bar2"},
},
{
name: "LabelValues calls with ooo head query range not overlapping out-of-order data",
queryMinT: 100,
queryMaxT: 100,
expValues1: []string{"bar1"},
expValues2: nil,
expValues3: []string{"bar1", "bar2"},
expValues4: []string{"bar1", "bar2"},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
// We first want to test using a head index reader that covers the biggest query interval
oh := NewHeadAndOOOIndexReader(head, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0)
matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")}
values, err := oh.LabelValues(ctx, "foo", matchers...)
sort.Strings(values)
require.NoError(t, err)
require.Equal(t, tc.expValues1, values)
matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotRegexp, "foo", "^bar.")}
values, err = oh.LabelValues(ctx, "foo", matchers...)
sort.Strings(values)
require.NoError(t, err)
require.Equal(t, tc.expValues2, values)
matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.")}
values, err = oh.LabelValues(ctx, "foo", matchers...)
sort.Strings(values)
require.NoError(t, err)
require.Equal(t, tc.expValues3, values)
values, err = oh.LabelValues(ctx, "foo")
sort.Strings(values)
require.NoError(t, err)
require.Equal(t, tc.expValues4, values)
})
}
}
// TestOOOHeadChunkReader_Chunk tests that the Chunk method works as expected.
// It does so by appending out of order samples to the db and then initializing
// an OOOHeadChunkReader to read chunks from it.
func TestOOOHeadChunkReader_Chunk(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testOOOHeadChunkReader_Chunk(t, scenario)
})
}
}
//nolint:revive // unexported-return.
func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
opts := DefaultOptions()
opts.OutOfOrderCapMax = 5
opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds()
opts.EnableNativeHistograms = true
opts.EnableOOONativeHistograms = true
s1 := labels.FromStrings("l", "v1")
minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }
t.Run("Getting a non existing chunk fails with not found error", func(t *testing.T) {
db := newTestDBWithOpts(t, opts)
cr := NewHeadAndOOOChunkReader(db.head, 0, 1000, nil, nil, 0)
defer cr.Close()
c, iterable, err := cr.ChunkOrIterable(chunks.Meta{
Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300,
})
require.Nil(t, iterable)
require.Equal(t, err, fmt.Errorf("not found"))
require.Nil(t, c)
})
tests := []struct {
name string
queryMinT int64
queryMaxT int64
firstInOrderSampleAt int64
inputSamples []testValue
expSingleChunks bool
expChunkError bool
expChunksSamples []chunks.SampleSlice
}{
{
name: "Getting the head when there are no overlapping chunks returns just the samples in the head",
queryMinT: minutes(0),
queryMaxT: minutes(100),
firstInOrderSampleAt: minutes(120),
inputSamples: []testValue{
{Ts: minutes(30), V: 0},
{Ts: minutes(40), V: 0},
},
expChunkError: false,
expSingleChunks: true,
// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
// Query Interval [------------------------------------------------------------------------------------------]
// Chunk 0: Current Head [--------] (With 2 samples)
// Output Graphically [--------] (With 2 samples)
expChunksSamples: []chunks.SampleSlice{
{
scenario.sampleFunc(minutes(30), 0),
scenario.sampleFunc(minutes(40), 0),
},
},
},
{
name: "Getting the head chunk when there are overlapping chunks returns all combined",
queryMinT: minutes(0),
queryMaxT: minutes(100),
firstInOrderSampleAt: minutes(120),
inputSamples: []testValue{{Ts: minutes(41), V: 0}, {Ts: minutes(42), V: 0}, {Ts: minutes(43), V: 0}, {Ts: minutes(44), V: 0}, {Ts: minutes(45), V: 0}, {Ts: minutes(30), V: 1}, {Ts: minutes(50), V: 1}},
expChunkError: false,
// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
// Query Interval [------------------------------------------------------------------------------------------]
// Chunk 0 [---] (With 5 samples)
// Chunk 1: Current Head [-----------------] (With 2 samples)
// Output Graphically [-----------------] (With 7 samples)
expChunksSamples: []chunks.SampleSlice{
{
scenario.sampleFunc(minutes(30), 1),
scenario.sampleFunc(minutes(41), 0),
scenario.sampleFunc(minutes(42), 0),
scenario.sampleFunc(minutes(43), 0),
scenario.sampleFunc(minutes(44), 0),
scenario.sampleFunc(minutes(45), 0),
scenario.sampleFunc(minutes(50), 1),
},
},
},
{
name: "Two windows of overlapping chunks get properly converged",
queryMinT: minutes(0),
queryMaxT: minutes(100),
firstInOrderSampleAt: minutes(120),
inputSamples: []testValue{
// Chunk 0
{Ts: minutes(10), V: 0},
{Ts: minutes(12), V: 0},
{Ts: minutes(14), V: 0},
{Ts: minutes(16), V: 0},
{Ts: minutes(20), V: 0},
// Chunk 1
{Ts: minutes(20), V: 1},
{Ts: minutes(22), V: 1},
{Ts: minutes(24), V: 1},
{Ts: minutes(26), V: 1},
{Ts: minutes(29), V: 1},
// Chunk 3
{Ts: minutes(30), V: 2},
{Ts: minutes(32), V: 2},
{Ts: minutes(34), V: 2},
{Ts: minutes(36), V: 2},
{Ts: minutes(40), V: 2},
// Head
{Ts: minutes(40), V: 3},
{Ts: minutes(50), V: 3},
},
expChunkError: false,
// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
// Query Interval [------------------------------------------------------------------------------------------]
// Chunk 0 [--------]
// Chunk 1 [-------]
// Chunk 2 [--------]
// Chunk 3: Current Head [--------]
// Output Graphically [----------------][-----------------]
expChunksSamples: []chunks.SampleSlice{
{
scenario.sampleFunc(minutes(10), 0),
scenario.sampleFunc(minutes(12), 0),
scenario.sampleFunc(minutes(14), 0),
scenario.sampleFunc(minutes(16), 0),
scenario.sampleFunc(minutes(20), 1),
scenario.sampleFunc(minutes(22), 1),
scenario.sampleFunc(minutes(24), 1),
scenario.sampleFunc(minutes(26), 1),
scenario.sampleFunc(minutes(29), 1),
},
{
scenario.sampleFunc(minutes(30), 2),
scenario.sampleFunc(minutes(32), 2),
scenario.sampleFunc(minutes(34), 2),
scenario.sampleFunc(minutes(36), 2),
scenario.sampleFunc(minutes(40), 3),
scenario.sampleFunc(minutes(50), 3),
},
},
},
{
name: "Two windows of overlapping chunks in descending order get properly converged",
queryMinT: minutes(0),
queryMaxT: minutes(100),
firstInOrderSampleAt: minutes(120),
inputSamples: []testValue{
// Chunk 0
{Ts: minutes(40), V: 0},
{Ts: minutes(42), V: 0},
{Ts: minutes(44), V: 0},
{Ts: minutes(46), V: 0},
{Ts: minutes(50), V: 0},
// Chunk 1
{Ts: minutes(30), V: 1},
{Ts: minutes(32), V: 1},
{Ts: minutes(34), V: 1},
{Ts: minutes(36), V: 1},
{Ts: minutes(40), V: 1},
// Chunk 3
{Ts: minutes(20), V: 2},
{Ts: minutes(22), V: 2},
{Ts: minutes(24), V: 2},
{Ts: minutes(26), V: 2},
{Ts: minutes(29), V: 2},
// Head
{Ts: minutes(10), V: 3},
{Ts: minutes(20), V: 3},
},
expChunkError: false,
// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
// Query Interval [------------------------------------------------------------------------------------------]
// Chunk 0 [--------]
// Chunk 1 [--------]
// Chunk 2 [-------]
// Chunk 3: Current Head [--------]
// Output Graphically [----------------][-----------------]
expChunksSamples: []chunks.SampleSlice{
{
scenario.sampleFunc(minutes(10), 3),
scenario.sampleFunc(minutes(20), 2),
scenario.sampleFunc(minutes(22), 2),
scenario.sampleFunc(minutes(24), 2),
scenario.sampleFunc(minutes(26), 2),
scenario.sampleFunc(minutes(29), 2),
},
{
scenario.sampleFunc(minutes(30), 1),
scenario.sampleFunc(minutes(32), 1),
scenario.sampleFunc(minutes(34), 1),
scenario.sampleFunc(minutes(36), 1),
scenario.sampleFunc(minutes(40), 0),
scenario.sampleFunc(minutes(42), 0),
scenario.sampleFunc(minutes(44), 0),
scenario.sampleFunc(minutes(46), 0),
scenario.sampleFunc(minutes(50), 0),
},
},
},
{
name: "If chunks are not overlapped they are not converged",
queryMinT: minutes(0),
queryMaxT: minutes(100),
firstInOrderSampleAt: minutes(120),
inputSamples: []testValue{
// Chunk 0
{Ts: minutes(10), V: 0},
{Ts: minutes(12), V: 0},
{Ts: minutes(14), V: 0},
{Ts: minutes(16), V: 0},
{Ts: minutes(18), V: 0},
// Chunk 1
{Ts: minutes(20), V: 1},
{Ts: minutes(22), V: 1},
{Ts: minutes(24), V: 1},
{Ts: minutes(26), V: 1},
{Ts: minutes(28), V: 1},
// Chunk 3
{Ts: minutes(30), V: 2},
{Ts: minutes(32), V: 2},
{Ts: minutes(34), V: 2},
{Ts: minutes(36), V: 2},
{Ts: minutes(38), V: 2},
// Head
{Ts: minutes(40), V: 3},
{Ts: minutes(42), V: 3},
},
expChunkError: false,
expSingleChunks: true,
// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
// Query Interval [------------------------------------------------------------------------------------------]
// Chunk 0 [-------]
// Chunk 1 [-------]
// Chunk 2 [-------]
// Chunk 3: Current Head [-------]
// Output Graphically [-------][-------][-------][--------]
expChunksSamples: []chunks.SampleSlice{
{
scenario.sampleFunc(minutes(10), 0),
scenario.sampleFunc(minutes(12), 0),
scenario.sampleFunc(minutes(14), 0),
scenario.sampleFunc(minutes(16), 0),
scenario.sampleFunc(minutes(18), 0),
},
{
scenario.sampleFunc(minutes(20), 1),
scenario.sampleFunc(minutes(22), 1),
scenario.sampleFunc(minutes(24), 1),
scenario.sampleFunc(minutes(26), 1),
scenario.sampleFunc(minutes(28), 1),
},
{
scenario.sampleFunc(minutes(30), 2),
scenario.sampleFunc(minutes(32), 2),
scenario.sampleFunc(minutes(34), 2),
scenario.sampleFunc(minutes(36), 2),
scenario.sampleFunc(minutes(38), 2),
},
{
scenario.sampleFunc(minutes(40), 3),
scenario.sampleFunc(minutes(42), 3),
},
},
},
{
name: "Triplet of chunks overlapping returns a single merged chunk",
queryMinT: minutes(0),
queryMaxT: minutes(100),
firstInOrderSampleAt: minutes(120),
inputSamples: []testValue{
// Chunk 0
{Ts: minutes(10), V: 0},
{Ts: minutes(15), V: 0},
{Ts: minutes(20), V: 0},
{Ts: minutes(25), V: 0},
{Ts: minutes(30), V: 0},
// Chunk 1
{Ts: minutes(20), V: 1},
{Ts: minutes(25), V: 1},
{Ts: minutes(30), V: 1},
{Ts: minutes(35), V: 1},
{Ts: minutes(42), V: 1},
// Chunk 2 Head
{Ts: minutes(32), V: 2},
{Ts: minutes(50), V: 2},
},
expChunkError: false,
// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
// Query Interval [------------------------------------------------------------------------------------------]
// Chunk 0 [-----------------]
// Chunk 1 [--------------------]
// Chunk 2 Current Head [--------------]
// Output Graphically [-----------------------------------]
expChunksSamples: []chunks.SampleSlice{
{
scenario.sampleFunc(minutes(10), 0),
scenario.sampleFunc(minutes(15), 0),
scenario.sampleFunc(minutes(20), 1),
scenario.sampleFunc(minutes(25), 1),
scenario.sampleFunc(minutes(30), 1),
scenario.sampleFunc(minutes(32), 2),
scenario.sampleFunc(minutes(35), 1),
scenario.sampleFunc(minutes(42), 1),
scenario.sampleFunc(minutes(50), 2),
},
},
},
{
name: "Query interval partially overlaps with a triplet of chunks but still returns a single merged chunk",
queryMinT: minutes(12),
queryMaxT: minutes(33),
firstInOrderSampleAt: minutes(120),
inputSamples: []testValue{
// Chunk 0
{Ts: minutes(10), V: 0},
{Ts: minutes(15), V: 0},
{Ts: minutes(20), V: 0},
{Ts: minutes(25), V: 0},
{Ts: minutes(30), V: 0},
// Chunk 1
{Ts: minutes(20), V: 1},
{Ts: minutes(25), V: 1},
{Ts: minutes(30), V: 1},
{Ts: minutes(35), V: 1},
{Ts: minutes(42), V: 1},
// Chunk 2 Head
{Ts: minutes(32), V: 2},
{Ts: minutes(50), V: 2},
},
expChunkError: false,
// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
// Query Interval [------------------]
// Chunk 0 [-----------------]
// Chunk 1 [--------------------]
// Chunk 2 Current Head [--------------]
// Output Graphically [-----------------------------------]
expChunksSamples: []chunks.SampleSlice{
{
scenario.sampleFunc(minutes(10), 0),
scenario.sampleFunc(minutes(15), 0),
scenario.sampleFunc(minutes(20), 1),
scenario.sampleFunc(minutes(25), 1),
scenario.sampleFunc(minutes(30), 1),
scenario.sampleFunc(minutes(32), 2),
scenario.sampleFunc(minutes(35), 1),
scenario.sampleFunc(minutes(42), 1),
scenario.sampleFunc(minutes(50), 2),
},
},
},
}
for _, tc := range tests {
t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
db := newTestDBWithOpts(t, opts)
app := db.Appender(context.Background())
s1Ref, _, err := scenario.appendFunc(app, s1, tc.firstInOrderSampleAt, tc.firstInOrderSampleAt/1*time.Minute.Milliseconds())
require.NoError(t, err)
require.NoError(t, app.Commit())
// OOO few samples for s1.
app = db.Appender(context.Background())
for _, s := range tc.inputSamples {
_, _, err := scenario.appendFunc(app, s1, s.Ts, s.V)
require.NoError(t, err)
}
require.NoError(t, app.Commit())
// The Series method populates the chunk metas, taking a copy of the
// head OOO chunk if necessary. These are then used by the ChunkReader.
ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0)
var chks []chunks.Meta
var b labels.ScratchBuilder
err = ir.Series(s1Ref, &b, &chks)
require.NoError(t, err)
require.Equal(t, len(tc.expChunksSamples), len(chks))
cr := NewHeadAndOOOChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, nil, 0)
defer cr.Close()
for i := 0; i < len(chks); i++ {
c, iterable, err := cr.ChunkOrIterable(chks[i])
require.NoError(t, err)
var it chunkenc.Iterator
if tc.expSingleChunks {
it = c.Iterator(nil)
} else {
require.Nil(t, c)
it = iterable.Iterator(nil)
}
resultSamples, err := storage.ExpandSamples(it, nil)
require.NoError(t, err)
requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, requireEqualSamplesIgnoreCounterResets)
}
})
}
}
// TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding tests
// that if a query comes and performs a Series() call followed by a Chunks() call
// the response is consistent with the data seen by Series() even if the OOO
// head receives more samples before Chunks() is called.
// An example:
// - Response A comes from: Series() then Chunk()
// - Response B comes from : Series(), in parallel new samples added to the head, then Chunk()
// - A == B
func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(t, scenario)
})
}
}
//nolint:revive // unexported-return.
func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(t *testing.T, scenario sampleTypeScenario) {
opts := DefaultOptions()
opts.OutOfOrderCapMax = 5
opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds()
opts.EnableNativeHistograms = true
opts.EnableOOONativeHistograms = true
s1 := labels.FromStrings("l", "v1")
minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }
tests := []struct {
name string
queryMinT int64
queryMaxT int64
firstInOrderSampleAt int64
initialSamples []testValue
samplesAfterSeriesCall []testValue
expChunkError bool
expChunksSamples []chunks.SampleSlice
}{
{
name: "Current head gets old, new and in between sample after Series call, they all should be omitted from the result",
queryMinT: minutes(0),
queryMaxT: minutes(100),
firstInOrderSampleAt: minutes(120),
initialSamples: []testValue{
// Chunk 0
{Ts: minutes(20), V: 0},
{Ts: minutes(22), V: 0},
{Ts: minutes(24), V: 0},
{Ts: minutes(26), V: 0},
{Ts: minutes(30), V: 0},
// Chunk 1 Head
{Ts: minutes(25), V: 1},
{Ts: minutes(35), V: 1},
},
samplesAfterSeriesCall: []testValue{
{Ts: minutes(10), V: 1},
{Ts: minutes(32), V: 1},
{Ts: minutes(50), V: 1},
},
expChunkError: false,
// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
// Query Interval [-----------------------------------]
// Chunk 0: [--------] (5 samples)
// Chunk 1: Current Head [-------] (2 samples)
// New samples added after Series()
// Chunk 1: Current Head [-----------------------------------] (5 samples)
// Output Graphically [------------] (With 8 samples, samples newer than lastmint or older than lastmaxt are omitted but the ones in between are kept)
expChunksSamples: []chunks.SampleSlice{
{
scenario.sampleFunc(minutes(20), 0),
scenario.sampleFunc(minutes(22), 0),
scenario.sampleFunc(minutes(24), 0),
scenario.sampleFunc(minutes(25), 1),
scenario.sampleFunc(minutes(26), 0),
scenario.sampleFunc(minutes(30), 0),
scenario.sampleFunc(minutes(35), 1),
},
},
},
{
name: "After Series() prev head gets mmapped after getting samples, new head gets new samples also overlapping, none of these should appear in response.",
queryMinT: minutes(0),
queryMaxT: minutes(100),
firstInOrderSampleAt: minutes(120),
initialSamples: []testValue{
// Chunk 0
{Ts: minutes(20), V: 0},
{Ts: minutes(22), V: 0},
{Ts: minutes(24), V: 0},
{Ts: minutes(26), V: 0},
{Ts: minutes(30), V: 0},
// Chunk 1 Head
{Ts: minutes(25), V: 1},
{Ts: minutes(35), V: 1},
},
samplesAfterSeriesCall: []testValue{
{Ts: minutes(10), V: 1},
{Ts: minutes(32), V: 1},
{Ts: minutes(50), V: 1},
// Chunk 1 gets mmapped and Chunk 2, the new head is born
{Ts: minutes(25), V: 2},
{Ts: minutes(31), V: 2},
},
expChunkError: false,
// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
// Query Interval [-----------------------------------]
// Chunk 0: [--------] (5 samples)
// Chunk 1: Current Head [-------] (2 samples)
// New samples added after Series()
// Chunk 1 (mmapped) [-------------------------] (5 samples)
// Chunk 2: Current Head [-----------] (2 samples)
// Output Graphically [------------] (8 samples) It has 5 from Chunk 0 and 3 from Chunk 1
expChunksSamples: []chunks.SampleSlice{
{
scenario.sampleFunc(minutes(20), 0),
scenario.sampleFunc(minutes(22), 0),
scenario.sampleFunc(minutes(24), 0),
scenario.sampleFunc(minutes(25), 1),
scenario.sampleFunc(minutes(26), 0),
scenario.sampleFunc(minutes(30), 0),
scenario.sampleFunc(minutes(35), 1),
},
},
},
}
for _, tc := range tests {
t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
db := newTestDBWithOpts(t, opts)
app := db.Appender(context.Background())
s1Ref, _, err := scenario.appendFunc(app, s1, tc.firstInOrderSampleAt, tc.firstInOrderSampleAt/1*time.Minute.Milliseconds())
require.NoError(t, err)
require.NoError(t, app.Commit())
// OOO few samples for s1.
app = db.Appender(context.Background())
for _, s := range tc.initialSamples {
_, _, err := scenario.appendFunc(app, s1, s.Ts, s.V)
require.NoError(t, err)
}
require.NoError(t, app.Commit())
// The Series method populates the chunk metas, taking a copy of the
// head OOO chunk if necessary. These are then used by the ChunkReader.
ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0)
var chks []chunks.Meta
var b labels.ScratchBuilder
err = ir.Series(s1Ref, &b, &chks)
require.NoError(t, err)
require.Equal(t, len(tc.expChunksSamples), len(chks))
// Now we keep receiving ooo samples
// OOO few samples for s1.
app = db.Appender(context.Background())
for _, s := range tc.samplesAfterSeriesCall {
_, _, err = scenario.appendFunc(app, s1, s.Ts, s.V)
require.NoError(t, err)
}
require.NoError(t, app.Commit())
cr := NewHeadAndOOOChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, nil, 0)
defer cr.Close()
for i := 0; i < len(chks); i++ {
c, iterable, err := cr.ChunkOrIterable(chks[i])
require.NoError(t, err)
require.Nil(t, c)
it := iterable.Iterator(nil)
resultSamples, err := storage.ExpandSamples(it, nil)
require.NoError(t, err)
requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, requireEqualSamplesIgnoreCounterResets)
}
})
}
}
// TestSortMetaByMinTimeAndMinRef tests that the sort function for chunk metas does sort
// by chunk meta MinTime and in case of same references by the lower reference.
func TestSortMetaByMinTimeAndMinRef(t *testing.T) {
tests := []struct {
name string
inputMetas []chunks.Meta
expMetas []chunks.Meta
}{
{
name: "chunks are ordered by min time",
inputMetas: []chunks.Meta{
{
Ref: 0,
MinTime: 0,
},
{
Ref: 1,
MinTime: 1,
},
},
expMetas: []chunks.Meta{
{
Ref: 0,
MinTime: 0,
},
{
Ref: 1,
MinTime: 1,
},
},
},
{
name: "if same mintime, lower reference goes first",
inputMetas: []chunks.Meta{
{
Ref: 10,
MinTime: 0,
},
{
Ref: 5,
MinTime: 0,
},
},
expMetas: []chunks.Meta{
{
Ref: 5,
MinTime: 0,
},
{
Ref: 10,
MinTime: 0,
},
},
},
}
for _, tc := range tests {
t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
slices.SortFunc(tc.inputMetas, lessByMinTimeAndMinRef)
require.Equal(t, tc.expMetas, tc.inputMetas)
})
}
}
func newTestDBWithOpts(t *testing.T, opts *Options) *DB {
dir := t.TempDir()
db, err := Open(dir, nil, nil, opts, nil)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, db.Close())
})
return db
}