// Copyright 2022 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tsdb

import (
	"context"
	"fmt"
	"math"
	"slices"
	"sort"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/tsdb/chunks"
	"github.com/prometheus/prometheus/tsdb/wlog"
)

type chunkInterval struct {
	// because we permutate the order of chunks, we cannot determine at test declaration time which chunkRefs we expect in the Output.
	// This ID matches expected output chunks against test input chunks, the test runner will assert the chunkRef for the matching chunk
	ID   int
	mint int64
	maxt int64
}

type expChunk struct {
	c chunkInterval
	m []chunkInterval
}

// permutateChunkIntervals returns all possible orders of the given chunkIntervals.
func permutateChunkIntervals(in []chunkInterval, out [][]chunkInterval, left, right int) [][]chunkInterval {
	if left == right {
		inCopy := make([]chunkInterval, len(in))
		copy(inCopy, in)
		return append(out, inCopy)
	}
	for i := left; i <= right; i++ {
		in[left], in[i] = in[i], in[left]
		out = permutateChunkIntervals(in, out, left+1, right)
		in[left], in[i] = in[i], in[left]
	}
	return out
}

// TestOOOHeadIndexReader_Series tests that the Series method works as expected.
// However it does so by creating chunks and memory mapping them unlike other
// tests of the head where samples are appended and we let the head memory map.
// We do this because the ingestion path and the appender for out of order
// samples are not ready yet.
func TestOOOHeadIndexReader_Series(t *testing.T) {
	tests := []struct {
		name                string
		queryMinT           int64
		queryMaxT           int64
		inputChunkIntervals []chunkInterval
		expChunks           []expChunk
	}{
		{
			name:      "Empty result and no error when head is empty",
			queryMinT: 0,
			queryMaxT: 100,
			expChunks: nil,
		},
		{
			name:      "If query interval is bigger than the existing chunks nothing is returned",
			queryMinT: 500,
			queryMaxT: 700,
			inputChunkIntervals: []chunkInterval{
				{0, 100, 400},
			},
			// ts                    0       100       150       200       250       300       350       400       450       500       550       600       650       700
			// Query Interval                                                                                                  [---------------------------------------]
			// Chunk 0                         [-----------------------------------------------------------]
			expChunks: nil,
		},
		{
			name:      "If query interval is smaller than the existing chunks nothing is returned",
			queryMinT: 100,
			queryMaxT: 400,
			inputChunkIntervals: []chunkInterval{
				{0, 500, 700},
			},
			// ts                    0       100       150       200       250       300       350       400       450       500       550       600       650       700
			// Query Interval                [-----------------------------------------------------------]
			// Chunk 0:                                                                                                        [---------------------------------------]
			expChunks: nil,
		},
		{
			name:      "If query interval exceeds the existing chunk, it is returned",
			queryMinT: 100,
			queryMaxT: 400,
			inputChunkIntervals: []chunkInterval{
				{0, 150, 350},
			},
			// ts                    0       100       150       200       250       300       350       400       450       500       550       600       650       700
			// Query Interval                [-----------------------------------------------------------]
			// Chunk 0:                                 [---------------------------------------]
			expChunks: []expChunk{
				{c: chunkInterval{0, 150, 350}},
			},
		},
		{
			name:      "If chunk exceeds the query interval, it is returned",
			queryMinT: 150,
			queryMaxT: 350,
			inputChunkIntervals: []chunkInterval{
				{0, 100, 400},
			},
			// ts                    0       100       150       200       250       300       350       400       450       500       550       600       650       700
			// Query Interval:                          [---------------------------------------]
			// Chunk 0:                       [-----------------------------------------------------------]
			expChunks: []expChunk{
				{c: chunkInterval{0, 100, 400}},
			},
		},
		{
			name:      "Pairwise overlaps should return the references of the first of each pair",
			queryMinT: 0,
			queryMaxT: 700,
			inputChunkIntervals: []chunkInterval{
				{0, 100, 200},
				{1, 500, 600},
				{2, 150, 250},
				{3, 550, 650},
			},
			// ts                    0       100       150       200       250       300       350       400       450       500       550       600       650       700
			// Query Interval        [---------------------------------------------------------------------------------------------------------------------------------]
			// Chunk 0:                        [-------------------]
			// Chunk 1:                                                                                                        [-------------------]
			// Chunk 2:                                  [-------------------]
			// Chunk 3:                                                                                                                  [-------------------]
			// Output Graphically              [-----------------------------]                                                 [-----------------------------]
			expChunks: []expChunk{
				{c: chunkInterval{0, 100, 250}, m: []chunkInterval{{0, 100, 200}, {2, 150, 250}}},
				{c: chunkInterval{1, 500, 650}, m: []chunkInterval{{1, 500, 600}, {3, 550, 650}}},
			},
		},
		{
			name:      "If all chunks overlap, single big chunk is returned",
			queryMinT: 0,
			queryMaxT: 700,
			inputChunkIntervals: []chunkInterval{
				{0, 100, 200},
				{1, 200, 300},
				{2, 300, 400},
				{3, 400, 500},
			},
			// ts                    0       100       150       200       250       300       350       400       450       500       550       600       650       700
			// Query Interval        [---------------------------------------------------------------------------------------------------------------------------------]
			// Chunk 0:                        [-------------------]
			// Chunk 1:                                            [-------------------]
			// Chunk 2:                                                                [-------------------]
			// Chunk 3:                                                                                    [------------------]
			// Output Graphically              [------------------------------------------------------------------------------]
			expChunks: []expChunk{
				{c: chunkInterval{0, 100, 500}, m: []chunkInterval{{0, 100, 200}, {1, 200, 300}, {2, 300, 400}, {3, 400, 500}}},
			},
		},
		{
			name:      "If no chunks overlap, all chunks are returned",
			queryMinT: 0,
			queryMaxT: 700,
			inputChunkIntervals: []chunkInterval{
				{0, 100, 199},
				{1, 200, 299},
				{2, 300, 399},
				{3, 400, 499},
			},
			// ts                    0       100       150       200       250       300       350       400       450       500       550       600       650       700
			// Query Interval        [---------------------------------------------------------------------------------------------------------------------------------]
			// Chunk 0:                        [------------------]
			// Chunk 1:                                            [------------------]
			// Chunk 2:                                                                [------------------]
			// Chunk 3:                                                                                    [------------------]
			// Output Graphically              [------------------][------------------][------------------][------------------]
			expChunks: []expChunk{
				{c: chunkInterval{0, 100, 199}},
				{c: chunkInterval{1, 200, 299}},
				{c: chunkInterval{2, 300, 399}},
				{c: chunkInterval{3, 400, 499}},
			},
		},
		{
			name:      "Triplet with pairwise overlaps, query range covers all, and distractor extra chunk",
			queryMinT: 0,
			queryMaxT: 400,
			inputChunkIntervals: []chunkInterval{
				{0, 100, 200},
				{1, 150, 300},
				{2, 250, 350},
				{3, 450, 550},
			},
			// ts                    0       100       150       200       250       300       350       400       450       500       550       600       650       700
			// Query Interval        [--------------------------------------------------------------------]
			// Chunk 0:                        [------------------]
			// Chunk 1:                                 [-----------------------------]
			// Chunk 2:                                                     [------------------]
			// Chunk 3:                                                                                             [------------------]
			// Output Graphically              [-----------------------------------------------]
			expChunks: []expChunk{
				{c: chunkInterval{0, 100, 350}, m: []chunkInterval{{0, 100, 200}, {1, 150, 300}, {2, 250, 350}}},
			},
		},
		{
			name:      "Query interval partially overlaps some chunks",
			queryMinT: 100,
			queryMaxT: 400,
			inputChunkIntervals: []chunkInterval{
				{0, 250, 500},
				{1, 0, 200},
				{2, 150, 300},
			},
			// ts                    0       100       150       200       250       300       350       400       450       500       550       600       650       700
			// Query Interval                [------------------------------------------------------------]
			// Chunk 0:                                                     [-------------------------------------------------]
			// Chunk 1:             [-----------------------------]
			// Chunk 2:                                [------------------------------]
			// Output Graphically   [-----------------------------------------------------------------------------------------]
			expChunks: []expChunk{
				{c: chunkInterval{1, 0, 500}, m: []chunkInterval{{1, 0, 200}, {2, 150, 300}, {0, 250, 500}}},
			},
		},
		{
			name:      "A full overlap pair and disjointed triplet",
			queryMinT: 0,
			queryMaxT: 900,
			inputChunkIntervals: []chunkInterval{
				{0, 100, 300},
				{1, 770, 850},
				{2, 150, 250},
				{3, 650, 750},
				{4, 600, 800},
			},
			// ts                    0       100       150       200       250       300       350       400       450       500       550       600       650       700       750       800       850
			// Query Interval        [---------------------------------------------------------------------------------------------------------------------------------------------------------------]
			// Chunk 0:                        [---------------------------------------]
			// Chunk 1:                                                                                                                                                               [--------------]
			// Chunk 2:                                  [-------------------]
			// Chunk 3:                                                                                                                                      [-------------------]
			// Chunk 4:                                                                                                                             [---------------------------------------]
			// Output Graphically              [---------------------------------------]                                                            [------------------------------------------------]
			expChunks: []expChunk{
				{c: chunkInterval{0, 100, 300}, m: []chunkInterval{{0, 100, 300}, {2, 150, 250}}},
				{c: chunkInterval{4, 600, 850}, m: []chunkInterval{{4, 600, 800}, {3, 650, 750}, {1, 770, 850}}},
			},
		},
		{
			name:      "Query range covers 3 disjoint chunks",
			queryMinT: 0,
			queryMaxT: 650,
			inputChunkIntervals: []chunkInterval{
				{0, 100, 150},
				{1, 300, 350},
				{2, 200, 250},
			},
			// ts                    0       100       150       200       250       300       350       400       450       500       550       600       650       700       750       800       850
			// Query Interval        [----------------------------------------------------------------------------------------------------------------------]
			// Chunk 0:                        [-------]
			// Chunk 1:                                                              [----------]
			// Chunk 2:                                           [--------]
			// Output Graphically              [-------]          [--------]         [----------]
			expChunks: []expChunk{
				{c: chunkInterval{0, 100, 150}},
				{c: chunkInterval{2, 200, 250}},
				{c: chunkInterval{1, 300, 350}},
			},
		},
	}

	s1Lset := labels.FromStrings("foo", "bar")
	s1ID := uint64(1)

	for _, tc := range tests {
		var permutations [][]chunkInterval
		if len(tc.inputChunkIntervals) == 0 {
			// handle special case
			permutations = [][]chunkInterval{
				nil,
			}
		} else {
			permutations = permutateChunkIntervals(tc.inputChunkIntervals, nil, 0, len(tc.inputChunkIntervals)-1)
		}
		for perm, intervals := range permutations {
			for _, headChunk := range []bool{false, true} {
				t.Run(fmt.Sprintf("name=%s, permutation=%d, headChunk=%t", tc.name, perm, headChunk), func(t *testing.T) {
					h, _ := newTestHead(t, 1000, wlog.CompressionNone, true)
					defer func() {
						require.NoError(t, h.Close())
					}()
					require.NoError(t, h.Init(0))

					s1, _, _ := h.getOrCreate(s1ID, s1Lset)
					s1.ooo = &memSeriesOOOFields{}

					// define our expected chunks, by looking at the expected ChunkIntervals and setting...
					// Ref to whatever Ref the chunk has, that we refer to by ID
					findID := func(id int) chunks.ChunkRef {
						for ref, c := range intervals {
							if c.ID == id {
								return chunks.ChunkRef(chunks.NewHeadChunkRef(chunks.HeadSeriesRef(s1ID), s1.oooHeadChunkID(ref)))
							}
						}
						return 0
					}
					var expChunks []chunks.Meta
					for _, e := range tc.expChunks {
						var chunk chunkenc.Chunk
						if len(e.m) > 0 {
							mm := &multiMeta{}
							for _, x := range e.m {
								meta := chunks.Meta{
									MinTime: x.mint,
									MaxTime: x.maxt,
									Ref:     findID(x.ID),
								}
								mm.metas = append(mm.metas, meta)
							}
							chunk = mm
						}
						meta := chunks.Meta{
							Chunk:   chunk,
							MinTime: e.c.mint,
							MaxTime: e.c.maxt,
							Ref:     findID(e.c.ID),
						}
						expChunks = append(expChunks, meta)
					}

					if headChunk && len(intervals) > 0 {
						// Put the last interval in the head chunk
						s1.ooo.oooHeadChunk = &oooHeadChunk{
							chunk:   NewOOOChunk(),
							minTime: intervals[len(intervals)-1].mint,
							maxTime: intervals[len(intervals)-1].maxt,
						}
						intervals = intervals[:len(intervals)-1]
					}

					for _, ic := range intervals {
						s1.ooo.oooMmappedChunks = append(s1.ooo.oooMmappedChunks, &mmappedChunk{
							minTime: ic.mint,
							maxTime: ic.maxt,
						})
					}

					ir := NewHeadAndOOOIndexReader(h, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0)

					var chks []chunks.Meta
					var b labels.ScratchBuilder
					err := ir.Series(storage.SeriesRef(s1ID), &b, &chks)
					require.NoError(t, err)
					require.Equal(t, s1Lset, b.Labels())
					require.Equal(t, expChunks, chks)

					err = ir.Series(storage.SeriesRef(s1ID+1), &b, &chks)
					require.Equal(t, storage.ErrNotFound, err)
				})
			}
		}
	}
}

func TestOOOHeadChunkReader_LabelValues(t *testing.T) {
	for name, scenario := range sampleTypeScenarios {
		t.Run(name, func(t *testing.T) {
			testOOOHeadChunkReader_LabelValues(t, scenario)
		})
	}
}

//nolint:revive // unexported-return.
func testOOOHeadChunkReader_LabelValues(t *testing.T, scenario sampleTypeScenario) {
	chunkRange := int64(2000)
	head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, true)
	head.opts.EnableOOONativeHistograms.Store(true)
	t.Cleanup(func() { require.NoError(t, head.Close()) })

	ctx := context.Background()

	app := head.Appender(context.Background())

	// Add in-order samples
	_, _, err := scenario.appendFunc(app, labels.FromStrings("foo", "bar1"), 100, int64(1))
	require.NoError(t, err)
	_, _, err = scenario.appendFunc(app, labels.FromStrings("foo", "bar2"), 100, int64(2))
	require.NoError(t, err)

	// Add ooo samples for those series
	_, _, err = scenario.appendFunc(app, labels.FromStrings("foo", "bar1"), 90, int64(1))
	require.NoError(t, err)
	_, _, err = scenario.appendFunc(app, labels.FromStrings("foo", "bar2"), 90, int64(2))
	require.NoError(t, err)

	require.NoError(t, app.Commit())

	cases := []struct {
		name       string
		queryMinT  int64
		queryMaxT  int64
		expValues1 []string
		expValues2 []string
		expValues3 []string
		expValues4 []string
	}{
		{
			name:       "LabelValues calls when ooo head has max query range",
			queryMinT:  math.MinInt64,
			queryMaxT:  math.MaxInt64,
			expValues1: []string{"bar1"},
			expValues2: nil,
			expValues3: []string{"bar1", "bar2"},
			expValues4: []string{"bar1", "bar2"},
		},
		{
			name:       "LabelValues calls with ooo head query range not overlapping in-order data",
			queryMinT:  90,
			queryMaxT:  90,
			expValues1: []string{"bar1"},
			expValues2: nil,
			expValues3: []string{"bar1", "bar2"},
			expValues4: []string{"bar1", "bar2"},
		},
		{
			name:       "LabelValues calls with ooo head query range not overlapping out-of-order data",
			queryMinT:  100,
			queryMaxT:  100,
			expValues1: []string{"bar1"},
			expValues2: nil,
			expValues3: []string{"bar1", "bar2"},
			expValues4: []string{"bar1", "bar2"},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// We first want to test using a head index reader that covers the biggest query interval
			oh := NewHeadAndOOOIndexReader(head, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0)
			matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")}
			values, err := oh.LabelValues(ctx, "foo", matchers...)
			sort.Strings(values)
			require.NoError(t, err)
			require.Equal(t, tc.expValues1, values)

			matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotRegexp, "foo", "^bar.")}
			values, err = oh.LabelValues(ctx, "foo", matchers...)
			sort.Strings(values)
			require.NoError(t, err)
			require.Equal(t, tc.expValues2, values)

			matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.")}
			values, err = oh.LabelValues(ctx, "foo", matchers...)
			sort.Strings(values)
			require.NoError(t, err)
			require.Equal(t, tc.expValues3, values)

			values, err = oh.LabelValues(ctx, "foo")
			sort.Strings(values)
			require.NoError(t, err)
			require.Equal(t, tc.expValues4, values)
		})
	}
}

// TestOOOHeadChunkReader_Chunk tests that the Chunk method works as expected.
// It does so by appending out of order samples to the db and then initializing
// an OOOHeadChunkReader to read chunks from it.
func TestOOOHeadChunkReader_Chunk(t *testing.T) {
	for name, scenario := range sampleTypeScenarios {
		t.Run(name, func(t *testing.T) {
			testOOOHeadChunkReader_Chunk(t, scenario)
		})
	}
}

//nolint:revive // unexported-return.
func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
	opts := DefaultOptions()
	opts.OutOfOrderCapMax = 5
	opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds()
	opts.EnableNativeHistograms = true
	opts.EnableOOONativeHistograms = true

	s1 := labels.FromStrings("l", "v1")
	minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }

	t.Run("Getting a non existing chunk fails with not found error", func(t *testing.T) {
		db := newTestDBWithOpts(t, opts)

		cr := NewHeadAndOOOChunkReader(db.head, 0, 1000, nil, nil, 0)
		defer cr.Close()
		c, iterable, err := cr.ChunkOrIterable(chunks.Meta{
			Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300,
		})
		require.Nil(t, iterable)
		require.Equal(t, err, fmt.Errorf("not found"))
		require.Nil(t, c)
	})

	tests := []struct {
		name                 string
		queryMinT            int64
		queryMaxT            int64
		firstInOrderSampleAt int64
		inputSamples         []testValue
		expSingleChunks      bool
		expChunkError        bool
		expChunksSamples     []chunks.SampleSlice
	}{
		{
			name:                 "Getting the head when there are no overlapping chunks returns just the samples in the head",
			queryMinT:            minutes(0),
			queryMaxT:            minutes(100),
			firstInOrderSampleAt: minutes(120),
			inputSamples: []testValue{
				{Ts: minutes(30), V: 0},
				{Ts: minutes(40), V: 0},
			},
			expChunkError:   false,
			expSingleChunks: true,
			// ts (in minutes)         0       10       20       30       40       50       60       70       80       90       100
			// Query Interval          [------------------------------------------------------------------------------------------]
			// Chunk 0: Current Head                              [--------] (With 2 samples)
			// Output Graphically                                 [--------] (With 2 samples)
			expChunksSamples: []chunks.SampleSlice{
				{
					scenario.sampleFunc(minutes(30), 0),
					scenario.sampleFunc(minutes(40), 0),
				},
			},
		},
		{
			name:                 "Getting the head chunk when there are overlapping chunks returns all combined",
			queryMinT:            minutes(0),
			queryMaxT:            minutes(100),
			firstInOrderSampleAt: minutes(120),
			inputSamples:         []testValue{{Ts: minutes(41), V: 0}, {Ts: minutes(42), V: 0}, {Ts: minutes(43), V: 0}, {Ts: minutes(44), V: 0}, {Ts: minutes(45), V: 0}, {Ts: minutes(30), V: 1}, {Ts: minutes(50), V: 1}},
			expChunkError:        false,
			// ts (in minutes)         0       10       20       30       40       50       60       70       80       90       100
			// Query Interval          [------------------------------------------------------------------------------------------]
			// Chunk 0                                                     [---] (With 5 samples)
			// Chunk 1: Current Head                              [-----------------] (With 2 samples)
			// Output Graphically                                 [-----------------] (With 7 samples)
			expChunksSamples: []chunks.SampleSlice{
				{
					scenario.sampleFunc(minutes(30), 1),
					scenario.sampleFunc(minutes(41), 0),
					scenario.sampleFunc(minutes(42), 0),
					scenario.sampleFunc(minutes(43), 0),
					scenario.sampleFunc(minutes(44), 0),
					scenario.sampleFunc(minutes(45), 0),
					scenario.sampleFunc(minutes(50), 1),
				},
			},
		},
		{
			name:                 "Two windows of overlapping chunks get properly converged",
			queryMinT:            minutes(0),
			queryMaxT:            minutes(100),
			firstInOrderSampleAt: minutes(120),
			inputSamples: []testValue{
				// Chunk 0
				{Ts: minutes(10), V: 0},
				{Ts: minutes(12), V: 0},
				{Ts: minutes(14), V: 0},
				{Ts: minutes(16), V: 0},
				{Ts: minutes(20), V: 0},
				// Chunk 1
				{Ts: minutes(20), V: 1},
				{Ts: minutes(22), V: 1},
				{Ts: minutes(24), V: 1},
				{Ts: minutes(26), V: 1},
				{Ts: minutes(29), V: 1},
				// Chunk 3
				{Ts: minutes(30), V: 2},
				{Ts: minutes(32), V: 2},
				{Ts: minutes(34), V: 2},
				{Ts: minutes(36), V: 2},
				{Ts: minutes(40), V: 2},
				// Head
				{Ts: minutes(40), V: 3},
				{Ts: minutes(50), V: 3},
			},
			expChunkError: false,
			// ts (in minutes)         0       10       20       30       40       50       60       70       80       90       100
			// Query Interval          [------------------------------------------------------------------------------------------]
			// Chunk 0                          [--------]
			// Chunk 1                                   [-------]
			// Chunk 2                                            [--------]
			// Chunk 3: Current Head                                       [--------]
			// Output Graphically               [----------------][-----------------]
			expChunksSamples: []chunks.SampleSlice{
				{
					scenario.sampleFunc(minutes(10), 0),
					scenario.sampleFunc(minutes(12), 0),
					scenario.sampleFunc(minutes(14), 0),
					scenario.sampleFunc(minutes(16), 0),
					scenario.sampleFunc(minutes(20), 1),
					scenario.sampleFunc(minutes(22), 1),
					scenario.sampleFunc(minutes(24), 1),
					scenario.sampleFunc(minutes(26), 1),
					scenario.sampleFunc(minutes(29), 1),
				},
				{
					scenario.sampleFunc(minutes(30), 2),
					scenario.sampleFunc(minutes(32), 2),
					scenario.sampleFunc(minutes(34), 2),
					scenario.sampleFunc(minutes(36), 2),
					scenario.sampleFunc(minutes(40), 3),
					scenario.sampleFunc(minutes(50), 3),
				},
			},
		},
		{
			name:                 "Two windows of overlapping chunks in descending order get properly converged",
			queryMinT:            minutes(0),
			queryMaxT:            minutes(100),
			firstInOrderSampleAt: minutes(120),
			inputSamples: []testValue{
				// Chunk 0
				{Ts: minutes(40), V: 0},
				{Ts: minutes(42), V: 0},
				{Ts: minutes(44), V: 0},
				{Ts: minutes(46), V: 0},
				{Ts: minutes(50), V: 0},
				// Chunk 1
				{Ts: minutes(30), V: 1},
				{Ts: minutes(32), V: 1},
				{Ts: minutes(34), V: 1},
				{Ts: minutes(36), V: 1},
				{Ts: minutes(40), V: 1},
				// Chunk 3
				{Ts: minutes(20), V: 2},
				{Ts: minutes(22), V: 2},
				{Ts: minutes(24), V: 2},
				{Ts: minutes(26), V: 2},
				{Ts: minutes(29), V: 2},
				// Head
				{Ts: minutes(10), V: 3},
				{Ts: minutes(20), V: 3},
			},
			expChunkError: false,
			// ts (in minutes)         0       10       20       30       40       50       60       70       80       90       100
			// Query Interval          [------------------------------------------------------------------------------------------]
			// Chunk 0                                                     [--------]
			// Chunk 1                                            [--------]
			// Chunk 2                                   [-------]
			// Chunk 3: Current Head            [--------]
			// Output Graphically               [----------------][-----------------]
			expChunksSamples: []chunks.SampleSlice{
				{
					scenario.sampleFunc(minutes(10), 3),
					scenario.sampleFunc(minutes(20), 2),
					scenario.sampleFunc(minutes(22), 2),
					scenario.sampleFunc(minutes(24), 2),
					scenario.sampleFunc(minutes(26), 2),
					scenario.sampleFunc(minutes(29), 2),
				},
				{
					scenario.sampleFunc(minutes(30), 1),
					scenario.sampleFunc(minutes(32), 1),
					scenario.sampleFunc(minutes(34), 1),
					scenario.sampleFunc(minutes(36), 1),
					scenario.sampleFunc(minutes(40), 0),
					scenario.sampleFunc(minutes(42), 0),
					scenario.sampleFunc(minutes(44), 0),
					scenario.sampleFunc(minutes(46), 0),
					scenario.sampleFunc(minutes(50), 0),
				},
			},
		},
		{
			name:                 "If chunks are not overlapped they are not converged",
			queryMinT:            minutes(0),
			queryMaxT:            minutes(100),
			firstInOrderSampleAt: minutes(120),
			inputSamples: []testValue{
				// Chunk 0
				{Ts: minutes(10), V: 0},
				{Ts: minutes(12), V: 0},
				{Ts: minutes(14), V: 0},
				{Ts: minutes(16), V: 0},
				{Ts: minutes(18), V: 0},
				// Chunk 1
				{Ts: minutes(20), V: 1},
				{Ts: minutes(22), V: 1},
				{Ts: minutes(24), V: 1},
				{Ts: minutes(26), V: 1},
				{Ts: minutes(28), V: 1},
				// Chunk 3
				{Ts: minutes(30), V: 2},
				{Ts: minutes(32), V: 2},
				{Ts: minutes(34), V: 2},
				{Ts: minutes(36), V: 2},
				{Ts: minutes(38), V: 2},
				// Head
				{Ts: minutes(40), V: 3},
				{Ts: minutes(42), V: 3},
			},
			expChunkError:   false,
			expSingleChunks: true,
			// ts (in minutes)         0       10       20       30       40       50       60       70       80       90       100
			// Query Interval          [------------------------------------------------------------------------------------------]
			// Chunk 0                          [-------]
			// Chunk 1                                   [-------]
			// Chunk 2                                            [-------]
			// Chunk 3: Current Head                                       [-------]
			// Output Graphically               [-------][-------][-------][--------]
			expChunksSamples: []chunks.SampleSlice{
				{
					scenario.sampleFunc(minutes(10), 0),
					scenario.sampleFunc(minutes(12), 0),
					scenario.sampleFunc(minutes(14), 0),
					scenario.sampleFunc(minutes(16), 0),
					scenario.sampleFunc(minutes(18), 0),
				},
				{
					scenario.sampleFunc(minutes(20), 1),
					scenario.sampleFunc(minutes(22), 1),
					scenario.sampleFunc(minutes(24), 1),
					scenario.sampleFunc(minutes(26), 1),
					scenario.sampleFunc(minutes(28), 1),
				},
				{
					scenario.sampleFunc(minutes(30), 2),
					scenario.sampleFunc(minutes(32), 2),
					scenario.sampleFunc(minutes(34), 2),
					scenario.sampleFunc(minutes(36), 2),
					scenario.sampleFunc(minutes(38), 2),
				},
				{
					scenario.sampleFunc(minutes(40), 3),
					scenario.sampleFunc(minutes(42), 3),
				},
			},
		},
		{
			name:                 "Triplet of chunks overlapping returns a single merged chunk",
			queryMinT:            minutes(0),
			queryMaxT:            minutes(100),
			firstInOrderSampleAt: minutes(120),
			inputSamples: []testValue{
				// Chunk 0
				{Ts: minutes(10), V: 0},
				{Ts: minutes(15), V: 0},
				{Ts: minutes(20), V: 0},
				{Ts: minutes(25), V: 0},
				{Ts: minutes(30), V: 0},
				// Chunk 1
				{Ts: minutes(20), V: 1},
				{Ts: minutes(25), V: 1},
				{Ts: minutes(30), V: 1},
				{Ts: minutes(35), V: 1},
				{Ts: minutes(42), V: 1},
				// Chunk 2 Head
				{Ts: minutes(32), V: 2},
				{Ts: minutes(50), V: 2},
			},
			expChunkError: false,
			// ts (in minutes)         0       10       20       30       40       50       60       70       80       90       100
			// Query Interval          [------------------------------------------------------------------------------------------]
			// Chunk 0                          [-----------------]
			// Chunk 1                                   [--------------------]
			// Chunk 2 Current Head                                  [--------------]
			// Output Graphically               [-----------------------------------]
			expChunksSamples: []chunks.SampleSlice{
				{
					scenario.sampleFunc(minutes(10), 0),
					scenario.sampleFunc(minutes(15), 0),
					scenario.sampleFunc(minutes(20), 1),
					scenario.sampleFunc(minutes(25), 1),
					scenario.sampleFunc(minutes(30), 1),
					scenario.sampleFunc(minutes(32), 2),
					scenario.sampleFunc(minutes(35), 1),
					scenario.sampleFunc(minutes(42), 1),
					scenario.sampleFunc(minutes(50), 2),
				},
			},
		},
		{
			name:                 "Query interval partially overlaps with a triplet of chunks but still returns a single merged chunk",
			queryMinT:            minutes(12),
			queryMaxT:            minutes(33),
			firstInOrderSampleAt: minutes(120),
			inputSamples: []testValue{
				// Chunk 0
				{Ts: minutes(10), V: 0},
				{Ts: minutes(15), V: 0},
				{Ts: minutes(20), V: 0},
				{Ts: minutes(25), V: 0},
				{Ts: minutes(30), V: 0},
				// Chunk 1
				{Ts: minutes(20), V: 1},
				{Ts: minutes(25), V: 1},
				{Ts: minutes(30), V: 1},
				{Ts: minutes(35), V: 1},
				{Ts: minutes(42), V: 1},
				// Chunk 2 Head
				{Ts: minutes(32), V: 2},
				{Ts: minutes(50), V: 2},
			},
			expChunkError: false,
			// ts (in minutes)         0       10       20       30       40       50       60       70       80       90       100
			// Query Interval                     [------------------]
			// Chunk 0                          [-----------------]
			// Chunk 1                                   [--------------------]
			// Chunk 2 Current Head                                  [--------------]
			// Output Graphically               [-----------------------------------]
			expChunksSamples: []chunks.SampleSlice{
				{
					scenario.sampleFunc(minutes(10), 0),
					scenario.sampleFunc(minutes(15), 0),
					scenario.sampleFunc(minutes(20), 1),
					scenario.sampleFunc(minutes(25), 1),
					scenario.sampleFunc(minutes(30), 1),
					scenario.sampleFunc(minutes(32), 2),
					scenario.sampleFunc(minutes(35), 1),
					scenario.sampleFunc(minutes(42), 1),
					scenario.sampleFunc(minutes(50), 2),
				},
			},
		},
	}

	for _, tc := range tests {
		t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
			db := newTestDBWithOpts(t, opts)

			app := db.Appender(context.Background())
			s1Ref, _, err := scenario.appendFunc(app, s1, tc.firstInOrderSampleAt, tc.firstInOrderSampleAt/1*time.Minute.Milliseconds())
			require.NoError(t, err)
			require.NoError(t, app.Commit())

			// OOO few samples for s1.
			app = db.Appender(context.Background())
			for _, s := range tc.inputSamples {
				_, _, err := scenario.appendFunc(app, s1, s.Ts, s.V)
				require.NoError(t, err)
			}
			require.NoError(t, app.Commit())

			// The Series method populates the chunk metas, taking a copy of the
			// head OOO chunk if necessary. These are then used by the ChunkReader.
			ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0)
			var chks []chunks.Meta
			var b labels.ScratchBuilder
			err = ir.Series(s1Ref, &b, &chks)
			require.NoError(t, err)
			require.Equal(t, len(tc.expChunksSamples), len(chks))

			cr := NewHeadAndOOOChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, nil, 0)
			defer cr.Close()
			for i := 0; i < len(chks); i++ {
				c, iterable, err := cr.ChunkOrIterable(chks[i])
				require.NoError(t, err)
				var it chunkenc.Iterator
				if tc.expSingleChunks {
					it = c.Iterator(nil)
				} else {
					require.Nil(t, c)
					it = iterable.Iterator(nil)
				}
				resultSamples, err := storage.ExpandSamples(it, nil)
				require.NoError(t, err)
				requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true)
			}
		})
	}
}

// TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding tests
// that if a query comes and performs a Series() call followed by a Chunks() call
// the response is consistent with the data seen by Series() even if the OOO
// head receives more samples before Chunks() is called.
// An example:
//   - Response A comes from: Series() then Chunk()
//   - Response B comes from : Series(), in parallel new samples added to the head, then Chunk()
//   - A == B
func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(t *testing.T) {
	for name, scenario := range sampleTypeScenarios {
		t.Run(name, func(t *testing.T) {
			testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(t, scenario)
		})
	}
}

//nolint:revive // unexported-return.
func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(t *testing.T, scenario sampleTypeScenario) {
	opts := DefaultOptions()
	opts.OutOfOrderCapMax = 5
	opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds()
	opts.EnableNativeHistograms = true
	opts.EnableOOONativeHistograms = true

	s1 := labels.FromStrings("l", "v1")
	minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }

	tests := []struct {
		name                   string
		queryMinT              int64
		queryMaxT              int64
		firstInOrderSampleAt   int64
		initialSamples         []testValue
		samplesAfterSeriesCall []testValue
		expChunkError          bool
		expChunksSamples       []chunks.SampleSlice
	}{
		{
			name:                 "Current head gets old, new and in between sample after Series call, they all should be omitted from the result",
			queryMinT:            minutes(0),
			queryMaxT:            minutes(100),
			firstInOrderSampleAt: minutes(120),
			initialSamples: []testValue{
				// Chunk 0
				{Ts: minutes(20), V: 0},
				{Ts: minutes(22), V: 0},
				{Ts: minutes(24), V: 0},
				{Ts: minutes(26), V: 0},
				{Ts: minutes(30), V: 0},
				// Chunk 1 Head
				{Ts: minutes(25), V: 1},
				{Ts: minutes(35), V: 1},
			},
			samplesAfterSeriesCall: []testValue{
				{Ts: minutes(10), V: 1},
				{Ts: minutes(32), V: 1},
				{Ts: minutes(50), V: 1},
			},
			expChunkError: false,
			// ts (in minutes)         0       10       20       30       40       50       60       70       80       90       100
			// Query Interval          [-----------------------------------]
			// Chunk 0:                                  [--------] (5 samples)
			// Chunk 1: Current Head                          [-------] (2 samples)
			// New samples added after Series()
			// Chunk 1: Current Head            [-----------------------------------] (5 samples)
			// Output Graphically                        [------------] (With 8 samples, samples newer than lastmint or older than lastmaxt are omitted but the ones in between are kept)
			expChunksSamples: []chunks.SampleSlice{
				{
					scenario.sampleFunc(minutes(20), 0),
					scenario.sampleFunc(minutes(22), 0),
					scenario.sampleFunc(minutes(24), 0),
					scenario.sampleFunc(minutes(25), 1),
					scenario.sampleFunc(minutes(26), 0),
					scenario.sampleFunc(minutes(30), 0),
					scenario.sampleFunc(minutes(35), 1),
				},
			},
		},
		{
			name:                 "After Series() prev head gets mmapped after getting samples, new head gets new samples also overlapping, none of these should appear in response.",
			queryMinT:            minutes(0),
			queryMaxT:            minutes(100),
			firstInOrderSampleAt: minutes(120),
			initialSamples: []testValue{
				// Chunk 0
				{Ts: minutes(20), V: 0},
				{Ts: minutes(22), V: 0},
				{Ts: minutes(24), V: 0},
				{Ts: minutes(26), V: 0},
				{Ts: minutes(30), V: 0},
				// Chunk 1 Head
				{Ts: minutes(25), V: 1},
				{Ts: minutes(35), V: 1},
			},
			samplesAfterSeriesCall: []testValue{
				{Ts: minutes(10), V: 1},
				{Ts: minutes(32), V: 1},
				{Ts: minutes(50), V: 1},
				// Chunk 1 gets mmapped and Chunk 2, the new head is born
				{Ts: minutes(25), V: 2},
				{Ts: minutes(31), V: 2},
			},
			expChunkError: false,
			// ts (in minutes)         0       10       20       30       40       50       60       70       80       90       100
			// Query Interval          [-----------------------------------]
			// Chunk 0:                                  [--------] (5 samples)
			// Chunk 1: Current Head                          [-------] (2 samples)
			// New samples added after Series()
			// Chunk 1 (mmapped)                     [-------------------------] (5 samples)
			// Chunk 2: Current Head                    [-----------] (2 samples)
			// Output Graphically                        [------------]  (8 samples) It has 5 from Chunk 0 and 3 from Chunk 1
			expChunksSamples: []chunks.SampleSlice{
				{
					scenario.sampleFunc(minutes(20), 0),
					scenario.sampleFunc(minutes(22), 0),
					scenario.sampleFunc(minutes(24), 0),
					scenario.sampleFunc(minutes(25), 1),
					scenario.sampleFunc(minutes(26), 0),
					scenario.sampleFunc(minutes(30), 0),
					scenario.sampleFunc(minutes(35), 1),
				},
			},
		},
	}

	for _, tc := range tests {
		t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
			db := newTestDBWithOpts(t, opts)

			app := db.Appender(context.Background())
			s1Ref, _, err := scenario.appendFunc(app, s1, tc.firstInOrderSampleAt, tc.firstInOrderSampleAt/1*time.Minute.Milliseconds())
			require.NoError(t, err)
			require.NoError(t, app.Commit())

			// OOO few samples for s1.
			app = db.Appender(context.Background())
			for _, s := range tc.initialSamples {
				_, _, err := scenario.appendFunc(app, s1, s.Ts, s.V)
				require.NoError(t, err)
			}
			require.NoError(t, app.Commit())

			// The Series method populates the chunk metas, taking a copy of the
			// head OOO chunk if necessary. These are then used by the ChunkReader.
			ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0)
			var chks []chunks.Meta
			var b labels.ScratchBuilder
			err = ir.Series(s1Ref, &b, &chks)
			require.NoError(t, err)
			require.Equal(t, len(tc.expChunksSamples), len(chks))

			// Now we keep receiving ooo samples
			// OOO few samples for s1.
			app = db.Appender(context.Background())
			for _, s := range tc.samplesAfterSeriesCall {
				_, _, err = scenario.appendFunc(app, s1, s.Ts, s.V)
				require.NoError(t, err)
			}
			require.NoError(t, app.Commit())

			cr := NewHeadAndOOOChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, nil, 0)
			defer cr.Close()
			for i := 0; i < len(chks); i++ {
				c, iterable, err := cr.ChunkOrIterable(chks[i])
				require.NoError(t, err)
				require.Nil(t, c)

				it := iterable.Iterator(nil)
				resultSamples, err := storage.ExpandSamples(it, nil)
				require.NoError(t, err)
				requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true)
			}
		})
	}
}

// TestSortMetaByMinTimeAndMinRef tests that the sort function for chunk metas does sort
// by chunk meta MinTime and in case of same references by the lower reference.
func TestSortMetaByMinTimeAndMinRef(t *testing.T) {
	tests := []struct {
		name       string
		inputMetas []chunks.Meta
		expMetas   []chunks.Meta
	}{
		{
			name: "chunks are ordered by min time",
			inputMetas: []chunks.Meta{
				{
					Ref:     0,
					MinTime: 0,
				},
				{
					Ref:     1,
					MinTime: 1,
				},
			},
			expMetas: []chunks.Meta{
				{
					Ref:     0,
					MinTime: 0,
				},
				{
					Ref:     1,
					MinTime: 1,
				},
			},
		},
		{
			name: "if same mintime, lower reference goes first",
			inputMetas: []chunks.Meta{
				{
					Ref:     10,
					MinTime: 0,
				},
				{
					Ref:     5,
					MinTime: 0,
				},
			},
			expMetas: []chunks.Meta{
				{
					Ref:     5,
					MinTime: 0,
				},
				{
					Ref:     10,
					MinTime: 0,
				},
			},
		},
	}

	for _, tc := range tests {
		t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
			slices.SortFunc(tc.inputMetas, lessByMinTimeAndMinRef)
			require.Equal(t, tc.expMetas, tc.inputMetas)
		})
	}
}

func newTestDBWithOpts(t *testing.T, opts *Options) *DB {
	dir := t.TempDir()

	db, err := Open(dir, nil, nil, opts, nil)
	require.NoError(t, err)

	t.Cleanup(func() {
		require.NoError(t, db.Close())
	})

	return db
}