// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package index

import (
	"context"
	"errors"
	"fmt"
	"hash/crc32"
	"os"
	"path/filepath"
	"slices"
	"sort"
	"strconv"
	"testing"

	"github.com/stretchr/testify/require"
	"go.uber.org/goleak"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/tsdb/chunks"
	"github.com/prometheus/prometheus/tsdb/encoding"
	"github.com/prometheus/prometheus/util/testutil"
)

func TestMain(m *testing.M) {
	goleak.VerifyTestMain(m)
}

type series struct {
	l      labels.Labels
	chunks []chunks.Meta
}

type mockIndex struct {
	series   map[storage.SeriesRef]series
	postings map[labels.Label][]storage.SeriesRef
	symbols  map[string]struct{}
}

func newMockIndex() mockIndex {
	ix := mockIndex{
		series:   make(map[storage.SeriesRef]series),
		postings: make(map[labels.Label][]storage.SeriesRef),
		symbols:  make(map[string]struct{}),
	}
	ix.postings[allPostingsKey] = []storage.SeriesRef{}
	return ix
}

func (m mockIndex) Symbols() (map[string]struct{}, error) {
	return m.symbols, nil
}

func (m mockIndex) AddSeries(ref storage.SeriesRef, l labels.Labels, chunks ...chunks.Meta) error {
	if _, ok := m.series[ref]; ok {
		return fmt.Errorf("series with reference %d already added", ref)
	}
	l.Range(func(lbl labels.Label) {
		m.symbols[lbl.Name] = struct{}{}
		m.symbols[lbl.Value] = struct{}{}
		if _, ok := m.postings[lbl]; !ok {
			m.postings[lbl] = []storage.SeriesRef{}
		}
		m.postings[lbl] = append(m.postings[lbl], ref)
	})
	m.postings[allPostingsKey] = append(m.postings[allPostingsKey], ref)

	s := series{l: l}
	// Actual chunk data is not stored in the index.
	for _, c := range chunks {
		c.Chunk = nil
		s.chunks = append(s.chunks, c)
	}
	m.series[ref] = s

	return nil
}

func (m mockIndex) Close() error {
	return nil
}

func (m mockIndex) LabelValues(_ context.Context, name string) ([]string, error) {
	values := []string{}
	for l := range m.postings {
		if l.Name == name {
			values = append(values, l.Value)
		}
	}
	return values, nil
}

func (m mockIndex) Postings(ctx context.Context, name string, values ...string) (Postings, error) {
	p := []Postings{}
	for _, value := range values {
		l := labels.Label{Name: name, Value: value}
		p = append(p, m.SortedPostings(NewListPostings(m.postings[l])))
	}
	return Merge(ctx, p...), nil
}

func (m mockIndex) SortedPostings(p Postings) Postings {
	ep, err := ExpandPostings(p)
	if err != nil {
		return ErrPostings(fmt.Errorf("expand postings: %w", err))
	}

	sort.Slice(ep, func(i, j int) bool {
		return labels.Compare(m.series[ep[i]].l, m.series[ep[j]].l) < 0
	})
	return NewListPostings(ep)
}

func (m mockIndex) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
	s, ok := m.series[ref]
	if !ok {
		return errors.New("not found")
	}
	builder.Assign(s.l)
	*chks = append((*chks)[:0], s.chunks...)

	return nil
}

func TestIndexRW_Create_Open(t *testing.T) {
	dir := t.TempDir()

	fn := filepath.Join(dir, indexFilename)

	// An empty index must still result in a readable file.
	iw, err := NewWriter(context.Background(), fn)
	require.NoError(t, err)
	require.NoError(t, iw.Close())

	ir, err := NewFileReader(fn)
	require.NoError(t, err)
	require.NoError(t, ir.Close())

	// Modify magic header must cause open to fail.
	f, err := os.OpenFile(fn, os.O_WRONLY, 0o666)
	require.NoError(t, err)
	_, err = f.WriteAt([]byte{0, 0}, 0)
	require.NoError(t, err)
	f.Close()

	_, err = NewFileReader(dir)
	require.Error(t, err)
}

func TestIndexRW_Postings(t *testing.T) {
	ctx := context.Background()
	var input indexWriterSeriesSlice
	for i := 1; i < 5; i++ {
		input = append(input, &indexWriterSeries{
			labels: labels.FromStrings("a", "1", "b", strconv.Itoa(i)),
		})
	}
	ir, fn, _ := createFileReader(ctx, t, input)

	p, err := ir.Postings(ctx, "a", "1")
	require.NoError(t, err)

	var c []chunks.Meta
	var builder labels.ScratchBuilder

	for i := 0; p.Next(); i++ {
		err := ir.Series(p.At(), &builder, &c)

		require.NoError(t, err)
		require.Empty(t, c)
		testutil.RequireEqual(t, input[i].labels, builder.Labels())
	}
	require.NoError(t, p.Err())

	// The label indices are no longer used, so test them by hand here.
	labelValuesOffsets := map[string]uint64{}
	d := encoding.NewDecbufAt(ir.b, int(ir.toc.LabelIndicesTable), castagnoliTable)
	cnt := d.Be32()

	for d.Err() == nil && d.Len() > 0 && cnt > 0 {
		require.Equal(t, 1, d.Uvarint(), "Unexpected number of keys for label indices table")
		lbl := d.UvarintStr()
		off := d.Uvarint64()
		labelValuesOffsets[lbl] = off
		cnt--
	}
	require.NoError(t, d.Err())

	labelIndices := map[string][]string{}
	for lbl, off := range labelValuesOffsets {
		d := encoding.NewDecbufAt(ir.b, int(off), castagnoliTable)
		require.Equal(t, 1, d.Be32int(), "Unexpected number of label indices table names")
		for i := d.Be32(); i > 0 && d.Err() == nil; i-- {
			v, err := ir.lookupSymbol(ctx, d.Be32())
			require.NoError(t, err)
			labelIndices[lbl] = append(labelIndices[lbl], v)
		}
		require.NoError(t, d.Err())
	}

	require.Equal(t, map[string][]string{
		"a": {"1"},
		"b": {"1", "2", "3", "4"},
	}, labelIndices)

	t.Run("ShardedPostings()", func(t *testing.T) {
		ir, err := NewFileReader(fn)
		require.NoError(t, err)
		t.Cleanup(func() {
			require.NoError(t, ir.Close())
		})

		// List all postings for a given label value. This is what we expect to get
		// in output from all shards.
		p, err = ir.Postings(ctx, "a", "1")
		require.NoError(t, err)

		var expected []storage.SeriesRef
		for p.Next() {
			expected = append(expected, p.At())
		}
		require.NoError(t, p.Err())
		require.NotEmpty(t, expected)

		// Query the same postings for each shard.
		const shardCount = uint64(4)
		actualShards := make(map[uint64][]storage.SeriesRef)
		actualPostings := make([]storage.SeriesRef, 0, len(expected))

		for shardIndex := uint64(0); shardIndex < shardCount; shardIndex++ {
			p, err = ir.Postings(ctx, "a", "1")
			require.NoError(t, err)

			p = ir.ShardedPostings(p, shardIndex, shardCount)
			for p.Next() {
				ref := p.At()

				actualShards[shardIndex] = append(actualShards[shardIndex], ref)
				actualPostings = append(actualPostings, ref)
			}
			require.NoError(t, p.Err())
		}

		// We expect the postings merged out of shards is the exact same of the non sharded ones.
		require.ElementsMatch(t, expected, actualPostings)

		// We expect the series in each shard are the expected ones.
		for shardIndex, ids := range actualShards {
			for _, id := range ids {
				var lbls labels.ScratchBuilder

				require.NoError(t, ir.Series(id, &lbls, nil))
				require.Equal(t, shardIndex, labels.StableHash(lbls.Labels())%shardCount)
			}
		}
	})
}

func TestPostingsMany(t *testing.T) {
	ctx := context.Background()
	// Create a label in the index which has 999 values.
	var input indexWriterSeriesSlice
	for i := 1; i < 1000; i++ {
		v := fmt.Sprintf("%03d", i)
		input = append(input, &indexWriterSeries{
			labels: labels.FromStrings("i", v, "foo", "bar"),
		})
	}
	ir, _, symbols := createFileReader(ctx, t, input)

	cases := []struct {
		in []string
	}{
		// Simple cases, everything is present.
		{in: []string{"002"}},
		{in: []string{"031", "032", "033"}},
		{in: []string{"032", "033"}},
		{in: []string{"127", "128"}},
		{in: []string{"127", "128", "129"}},
		{in: []string{"127", "129"}},
		{in: []string{"128", "129"}},
		{in: []string{"998", "999"}},
		{in: []string{"999"}},
		// Before actual values.
		{in: []string{"000"}},
		{in: []string{"000", "001"}},
		{in: []string{"000", "002"}},
		// After actual values.
		{in: []string{"999a"}},
		{in: []string{"999", "999a"}},
		{in: []string{"998", "999", "999a"}},
		// In the middle of actual values.
		{in: []string{"126a", "127", "128"}},
		{in: []string{"127", "127a", "128"}},
		{in: []string{"127", "127a", "128", "128a", "129"}},
		{in: []string{"127", "128a", "129"}},
		{in: []string{"128", "128a", "129"}},
		{in: []string{"128", "129", "129a"}},
		{in: []string{"126a", "126b", "127", "127a", "127b", "128", "128a", "128b", "129", "129a", "129b"}},
	}

	var builder labels.ScratchBuilder
	for _, c := range cases {
		it, err := ir.Postings(ctx, "i", c.in...)
		require.NoError(t, err)

		got := []string{}
		var metas []chunks.Meta
		for it.Next() {
			require.NoError(t, ir.Series(it.At(), &builder, &metas))
			got = append(got, builder.Labels().Get("i"))
		}
		require.NoError(t, it.Err())
		exp := []string{}
		for _, e := range c.in {
			if _, ok := symbols[e]; ok && e != "l" {
				exp = append(exp, e)
			}
		}
		require.Equal(t, exp, got, fmt.Sprintf("input: %v", c.in))
	}
}

func TestPersistence_index_e2e(t *testing.T) {
	ctx := context.Background()
	lbls, err := labels.ReadLabels(filepath.Join("..", "testdata", "20kseries.json"), 20000)
	require.NoError(t, err)
	// Sort labels as the index writer expects series in sorted order.
	sort.Sort(labels.Slice(lbls))

	var input indexWriterSeriesSlice
	ref := uint64(0)
	// Generate ChunkMetas for every label set.
	for i, lset := range lbls {
		var metas []chunks.Meta

		for j := 0; j <= (i % 20); j++ {
			ref++
			metas = append(metas, chunks.Meta{
				MinTime: int64(j * 10000),
				MaxTime: int64((j+1)*10000) - 1,
				Ref:     chunks.ChunkRef(ref),
				Chunk:   chunkenc.NewXORChunk(),
			})
		}
		input = append(input, &indexWriterSeries{
			labels: lset,
			chunks: metas,
		})
	}

	ir, _, _ := createFileReader(ctx, t, input)

	// Population procedure as done by compaction.
	var (
		postings = NewMemPostings()
		values   = map[string]map[string]struct{}{}
	)

	mi := newMockIndex()

	for i, s := range input {
		require.NoError(t, mi.AddSeries(storage.SeriesRef(i), s.labels, s.chunks...))

		s.labels.Range(func(l labels.Label) {
			valset, ok := values[l.Name]
			if !ok {
				valset = map[string]struct{}{}
				values[l.Name] = valset
			}
			valset[l.Value] = struct{}{}
		})
		postings.Add(storage.SeriesRef(i), s.labels)
	}

	for p := range mi.postings {
		gotp, err := ir.Postings(ctx, p.Name, p.Value)
		require.NoError(t, err)

		expp, err := mi.Postings(ctx, p.Name, p.Value)
		require.NoError(t, err)

		var chks, expchks []chunks.Meta
		var builder, eBuilder labels.ScratchBuilder

		for gotp.Next() {
			require.True(t, expp.Next())

			ref := gotp.At()

			err := ir.Series(ref, &builder, &chks)
			require.NoError(t, err)

			err = mi.Series(expp.At(), &eBuilder, &expchks)
			require.NoError(t, err)
			testutil.RequireEqual(t, eBuilder.Labels(), builder.Labels())
			require.Equal(t, expchks, chks)
		}
		require.False(t, expp.Next(), "Expected no more postings for %q=%q", p.Name, p.Value)
		require.NoError(t, gotp.Err())
	}

	labelPairs := map[string][]string{}
	for l := range mi.postings {
		labelPairs[l.Name] = append(labelPairs[l.Name], l.Value)
	}
	for k, v := range labelPairs {
		sort.Strings(v)

		res, err := ir.SortedLabelValues(ctx, k)
		require.NoError(t, err)

		require.Equal(t, len(v), len(res))
		for i := 0; i < len(v); i++ {
			require.Equal(t, v[i], res[i])
		}
	}

	gotSymbols := []string{}
	it := ir.Symbols()
	for it.Next() {
		gotSymbols = append(gotSymbols, it.At())
	}
	require.NoError(t, it.Err())
	expSymbols := []string{}
	for s := range mi.symbols {
		expSymbols = append(expSymbols, s)
	}
	sort.Strings(expSymbols)
	require.Equal(t, expSymbols, gotSymbols)
}

func TestWriter_ShouldReturnErrorOnSeriesWithDuplicatedLabelNames(t *testing.T) {
	w, err := NewWriter(context.Background(), filepath.Join(t.TempDir(), "index"))
	require.NoError(t, err)

	require.NoError(t, w.AddSymbol("__name__"))
	require.NoError(t, w.AddSymbol("metric_1"))
	require.NoError(t, w.AddSymbol("metric_2"))

	require.NoError(t, w.AddSeries(0, labels.FromStrings("__name__", "metric_1", "__name__", "metric_2")))

	err = w.Close()
	require.Error(t, err)
	require.ErrorContains(t, err, "corruption detected when writing postings to index")
}

func TestDecbufUvarintWithInvalidBuffer(t *testing.T) {
	b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81})

	db := encoding.NewDecbufUvarintAt(b, 0, castagnoliTable)
	require.Error(t, db.Err())
}

func TestReaderWithInvalidBuffer(t *testing.T) {
	b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81})

	_, err := NewReader(b)
	require.Error(t, err)
}

// TestNewFileReaderErrorNoOpenFiles ensures that in case of an error no file remains open.
func TestNewFileReaderErrorNoOpenFiles(t *testing.T) {
	dir := testutil.NewTemporaryDirectory("block", t)

	idxName := filepath.Join(dir.Path(), "index")
	err := os.WriteFile(idxName, []byte("corrupted contents"), 0o666)
	require.NoError(t, err)

	_, err = NewFileReader(idxName)
	require.Error(t, err)

	// dir.Close will fail on Win if idxName fd is not closed on error path.
	dir.Close()
}

func TestSymbols(t *testing.T) {
	buf := encoding.Encbuf{}

	// Add prefix to the buffer to simulate symbols as part of larger buffer.
	buf.PutUvarintStr("something")

	symbolsStart := buf.Len()
	buf.PutBE32int(204) // Length of symbols table.
	buf.PutBE32int(100) // Number of symbols.
	for i := 0; i < 100; i++ {
		// i represents index in unicode characters table.
		buf.PutUvarintStr(string(rune(i))) // Symbol.
	}
	checksum := crc32.Checksum(buf.Get()[symbolsStart+4:], castagnoliTable)
	buf.PutBE32(checksum) // Check sum at the end.

	s, err := NewSymbols(realByteSlice(buf.Get()), FormatV2, symbolsStart)
	require.NoError(t, err)

	// We store only 4 offsets to symbols.
	require.Equal(t, 32, s.Size())

	for i := 99; i >= 0; i-- {
		s, err := s.Lookup(uint32(i))
		require.NoError(t, err)
		require.Equal(t, string(rune(i)), s)
	}
	_, err = s.Lookup(100)
	require.Error(t, err)

	for i := 99; i >= 0; i-- {
		r, err := s.ReverseLookup(string(rune(i)))
		require.NoError(t, err)
		require.Equal(t, uint32(i), r)
	}
	_, err = s.ReverseLookup(string(rune(100)))
	require.Error(t, err)

	iter := s.Iter()
	i := 0
	for iter.Next() {
		require.Equal(t, string(rune(i)), iter.At())
		i++
	}
	require.NoError(t, iter.Err())
}

func BenchmarkReader_ShardedPostings(b *testing.B) {
	const (
		numSeries = 10000
		numShards = 16
	)

	ctx := context.Background()
	var input indexWriterSeriesSlice
	for i := 1; i <= numSeries; i++ {
		input = append(input, &indexWriterSeries{
			labels: labels.FromStrings("const", fmt.Sprintf("%10d", 1), "unique", fmt.Sprintf("%10d", i)),
		})
	}
	ir, _, _ := createFileReader(ctx, b, input)
	b.ResetTimer()

	for n := 0; n < b.N; n++ {
		allPostings, err := ir.Postings(ctx, "const", fmt.Sprintf("%10d", 1))
		require.NoError(b, err)

		ir.ShardedPostings(allPostings, uint64(n%numShards), numShards)
	}
}

func TestDecoder_Postings_WrongInput(t *testing.T) {
	_, _, err := (&Decoder{}).Postings([]byte("the cake is a lie"))
	require.Error(t, err)
}

func TestChunksRefOrdering(t *testing.T) {
	dir := t.TempDir()

	idx, err := NewWriter(context.Background(), filepath.Join(dir, "index"))
	require.NoError(t, err)

	require.NoError(t, idx.AddSymbol("1"))
	require.NoError(t, idx.AddSymbol("2"))
	require.NoError(t, idx.AddSymbol("__name__"))

	c50 := chunks.Meta{Ref: 50}
	c100 := chunks.Meta{Ref: 100}
	c200 := chunks.Meta{Ref: 200}

	require.NoError(t, idx.AddSeries(1, labels.FromStrings("__name__", "1"), c100))
	require.EqualError(t, idx.AddSeries(2, labels.FromStrings("__name__", "2"), c50), "unsorted chunk reference: 50, previous: 100")
	require.NoError(t, idx.AddSeries(2, labels.FromStrings("__name__", "2"), c200))
	require.NoError(t, idx.Close())
}

func TestChunksTimeOrdering(t *testing.T) {
	dir := t.TempDir()

	idx, err := NewWriter(context.Background(), filepath.Join(dir, "index"))
	require.NoError(t, err)

	require.NoError(t, idx.AddSymbol("1"))
	require.NoError(t, idx.AddSymbol("2"))
	require.NoError(t, idx.AddSymbol("__name__"))

	require.NoError(t, idx.AddSeries(1, labels.FromStrings("__name__", "1"),
		chunks.Meta{Ref: 1, MinTime: 0, MaxTime: 10}, // Also checks that first chunk can have MinTime: 0.
		chunks.Meta{Ref: 2, MinTime: 11, MaxTime: 20},
		chunks.Meta{Ref: 3, MinTime: 21, MaxTime: 30},
	))

	require.EqualError(t, idx.AddSeries(1, labels.FromStrings("__name__", "2"),
		chunks.Meta{Ref: 10, MinTime: 0, MaxTime: 10},
		chunks.Meta{Ref: 20, MinTime: 10, MaxTime: 20},
	), "chunk minT 10 is not higher than previous chunk maxT 10")

	require.EqualError(t, idx.AddSeries(1, labels.FromStrings("__name__", "2"),
		chunks.Meta{Ref: 10, MinTime: 100, MaxTime: 30},
	), "chunk maxT 30 is less than minT 100")

	require.NoError(t, idx.Close())
}

func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
	const seriesCount = 1000
	var input indexWriterSeriesSlice
	for i := 1; i <= seriesCount; i++ {
		input = append(input, &indexWriterSeries{
			labels: labels.FromStrings("__name__", fmt.Sprintf("%4d", i)),
			chunks: []chunks.Meta{
				{Ref: 1, MinTime: 0, MaxTime: 10},
			},
		})
	}
	ir, _, _ := createFileReader(context.Background(), t, input)

	failAfter := uint64(seriesCount / 2) // Fail after processing half of the series.
	ctx := &testutil.MockContextErrAfter{FailAfter: failAfter}
	p := ir.PostingsForLabelMatching(ctx, "__name__", func(string) bool {
		return true
	})
	require.Error(t, p.Err())
	require.Equal(t, failAfter, ctx.Count())
}

func TestReader_LabelNamesForHonorsContextCancel(t *testing.T) {
	const seriesCount = 1000
	var input indexWriterSeriesSlice
	for i := 1; i <= seriesCount; i++ {
		input = append(input, &indexWriterSeries{
			labels: labels.FromStrings(labels.MetricName, fmt.Sprintf("%4d", i)),
			chunks: []chunks.Meta{
				{Ref: 1, MinTime: 0, MaxTime: 10},
			},
		})
	}
	ir, _, _ := createFileReader(context.Background(), t, input)

	name, value := AllPostingsKey()
	p, err := ir.Postings(context.Background(), name, value)
	require.NoError(t, err)
	// We check context cancellation every 128 iterations so 3 will fail after
	// iterating 3 * 128 series.
	failAfter := uint64(3)
	ctx := &testutil.MockContextErrAfter{FailAfter: failAfter}
	_, err = ir.LabelNamesFor(ctx, p)
	require.Error(t, err)
	require.Equal(t, failAfter, ctx.Count())
}

// createFileReader creates a temporary index file. It writes the provided input to this file.
// It returns a Reader for this file, the file's name, and the symbol map.
func createFileReader(ctx context.Context, tb testing.TB, input indexWriterSeriesSlice) (*Reader, string, map[string]struct{}) {
	tb.Helper()

	fn := filepath.Join(tb.TempDir(), indexFilename)

	iw, err := NewWriter(ctx, fn)
	require.NoError(tb, err)

	symbols := map[string]struct{}{}
	for _, s := range input {
		s.labels.Range(func(l labels.Label) {
			symbols[l.Name] = struct{}{}
			symbols[l.Value] = struct{}{}
		})
	}

	syms := []string{}
	for s := range symbols {
		syms = append(syms, s)
	}
	slices.Sort(syms)
	for _, s := range syms {
		require.NoError(tb, iw.AddSymbol(s))
	}
	for i, s := range input {
		require.NoError(tb, iw.AddSeries(storage.SeriesRef(i), s.labels, s.chunks...))
	}
	require.NoError(tb, iw.Close())

	ir, err := NewFileReader(fn)
	require.NoError(tb, err)
	tb.Cleanup(func() {
		require.NoError(tb, ir.Close())
	})
	return ir, fn, symbols
}