// Copyright 2022 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tsdb

import (
	"context"
	"errors"
	"fmt"
	"math"
	"slices"

	"github.com/oklog/ulid"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/tsdb/chunks"
	"github.com/prometheus/prometheus/tsdb/index"
	"github.com/prometheus/prometheus/tsdb/tombstones"
	"github.com/prometheus/prometheus/util/annotations"
)

var _ IndexReader = &HeadAndOOOIndexReader{}

type HeadAndOOOIndexReader struct {
	*headIndexReader            // A reference to the headIndexReader so we can reuse as many interface implementation as possible.
	inoMint                     int64
	lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef
}

var _ chunkenc.Iterable = &mergedOOOChunks{}

// mergedOOOChunks holds the list of iterables for overlapping chunks.
type mergedOOOChunks struct {
	chunkIterables []chunkenc.Iterable
}

func (o mergedOOOChunks) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator {
	return storage.ChainSampleIteratorFromIterables(iterator, o.chunkIterables)
}

func NewHeadAndOOOIndexReader(head *Head, inoMint, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOIndexReader {
	hr := &headIndexReader{
		head: head,
		mint: mint,
		maxt: maxt,
	}
	return &HeadAndOOOIndexReader{hr, inoMint, lastGarbageCollectedMmapRef}
}

func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
	s := oh.head.series.getByID(chunks.HeadSeriesRef(ref))

	if s == nil {
		oh.head.metrics.seriesNotFound.Inc()
		return storage.ErrNotFound
	}
	builder.Assign(s.labels())

	if chks == nil {
		return nil
	}

	s.Lock()
	defer s.Unlock()
	*chks = (*chks)[:0]

	if s.ooo != nil {
		return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, oh.inoMint, chks)
	}
	*chks = appendSeriesChunks(s, oh.inoMint, oh.maxt, *chks)
	return nil
}

// lastGarbageCollectedMmapRef gives the last mmap chunk that may be being garbage collected and so
// any chunk at or before this ref will not be considered. 0 disables this check.
//
// maxMmapRef tells upto what max m-map chunk that we can consider. If it is non-0, then
// the oooHeadChunk will not be considered.
func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, inoMint int64, chks *[]chunks.Meta) error {
	tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks))

	addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) {
		tmpChks = append(tmpChks, chunks.Meta{
			MinTime: minT,
			MaxTime: maxT,
			Ref:     ref,
			Chunk:   chunk,
		})
	}

	// Collect all chunks that overlap the query range.
	if s.ooo.oooHeadChunk != nil {
		c := s.ooo.oooHeadChunk
		if c.OverlapsClosedInterval(mint, maxt) && maxMmapRef == 0 {
			ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks))))
			if len(c.chunk.samples) > 0 { // Empty samples happens in tests, at least.
				chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(c.minTime, c.maxTime)
				if err != nil {
					handleChunkWriteError(err)
					return nil
				}
				for _, chk := range chks {
					addChunk(chk.minTime, chk.maxTime, ref, chk.chunk)
				}
			} else {
				var emptyChunk chunkenc.Chunk
				addChunk(c.minTime, c.maxTime, ref, emptyChunk)
			}
		}
	}
	for i := len(s.ooo.oooMmappedChunks) - 1; i >= 0; i-- {
		c := s.ooo.oooMmappedChunks[i]
		if c.OverlapsClosedInterval(mint, maxt) && (maxMmapRef == 0 || maxMmapRef.GreaterThanOrEqualTo(c.ref)) && (lastGarbageCollectedMmapRef == 0 || c.ref.GreaterThan(lastGarbageCollectedMmapRef)) {
			ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i)))
			addChunk(c.minTime, c.maxTime, ref, nil)
		}
	}

	if includeInOrder {
		tmpChks = appendSeriesChunks(s, inoMint, maxt, tmpChks)
	}

	// There is nothing to do if we did not collect any chunk.
	if len(tmpChks) == 0 {
		return nil
	}

	// Next we want to sort all the collected chunks by min time so we can find
	// those that overlap.
	slices.SortFunc(tmpChks, lessByMinTimeAndMinRef)

	// Next we want to iterate the sorted collected chunks and return composites for chunks that overlap with others.
	// Example chunks of a series: 5:(100, 200) 6:(500, 600) 7:(150, 250) 8:(550, 650)
	// In the example 5 overlaps with 7 and 6 overlaps with 8 so we will return
	// [5,7], [6,8].
	toBeMerged := tmpChks[0]
	for _, c := range tmpChks[1:] {
		if c.MinTime > toBeMerged.MaxTime {
			// This chunk doesn't overlap. Send current toBeMerged to output and start a new one.
			*chks = append(*chks, toBeMerged)
			toBeMerged = c
		} else {
			// Merge this chunk with existing toBeMerged.
			if mm, ok := toBeMerged.Chunk.(*multiMeta); ok {
				mm.metas = append(mm.metas, c)
			} else {
				toBeMerged.Chunk = &multiMeta{metas: []chunks.Meta{toBeMerged, c}}
			}
			if toBeMerged.MaxTime < c.MaxTime {
				toBeMerged.MaxTime = c.MaxTime
			}
		}
	}
	*chks = append(*chks, toBeMerged)

	return nil
}

// Fake Chunk object to pass a set of Metas inside Meta.Chunk.
type multiMeta struct {
	chunkenc.Chunk // We don't expect any of the methods to be called.
	metas          []chunks.Meta
}

// LabelValues needs to be overridden from the headIndexReader implementation
// so we can return labels within either in-order range or ooo range.
func (oh *HeadAndOOOIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
	if oh.maxt < oh.head.MinTime() && oh.maxt < oh.head.MinOOOTime() || oh.mint > oh.head.MaxTime() && oh.mint > oh.head.MaxOOOTime() {
		return []string{}, nil
	}

	if len(matchers) == 0 {
		return oh.head.postings.LabelValues(ctx, name), nil
	}

	return labelValuesWithMatchers(ctx, oh, name, matchers...)
}

func lessByMinTimeAndMinRef(a, b chunks.Meta) int {
	switch {
	case a.MinTime < b.MinTime:
		return -1
	case a.MinTime > b.MinTime:
		return 1
	}

	switch {
	case a.Ref < b.Ref:
		return -1
	case a.Ref > b.Ref:
		return 1
	default:
		return 0
	}
}

type HeadAndOOOChunkReader struct {
	head        *Head
	mint, maxt  int64
	cr          *headChunkReader // If nil, only read OOO chunks.
	maxMmapRef  chunks.ChunkDiskMapperRef
	oooIsoState *oooIsolationState
}

func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, cr *headChunkReader, oooIsoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOChunkReader {
	return &HeadAndOOOChunkReader{
		head:        head,
		mint:        mint,
		maxt:        maxt,
		cr:          cr,
		maxMmapRef:  maxMmapRef,
		oooIsoState: oooIsoState,
	}
}

func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
	c, it, _, err := cr.chunkOrIterable(meta, false)
	return c, it, err
}

// ChunkOrIterableWithCopy implements ChunkReaderWithCopy. The special Copy
// behaviour is only implemented for the in-order head chunk.
func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) {
	return cr.chunkOrIterable(meta, true)
}

func (cr *HeadAndOOOChunkReader) chunkOrIterable(meta chunks.Meta, copyLastChunk bool) (chunkenc.Chunk, chunkenc.Iterable, int64, error) {
	sid, cid, isOOO := unpackHeadChunkRef(meta.Ref)
	s := cr.head.series.getByID(sid)
	// This means that the series has been garbage collected.
	if s == nil {
		return nil, nil, 0, storage.ErrNotFound
	}
	var isoState *isolationState
	if cr.cr != nil {
		isoState = cr.cr.isoState
	}

	s.Lock()
	defer s.Unlock()

	if meta.Chunk == nil {
		c, maxt, err := cr.head.chunkFromSeries(s, cid, isOOO, meta.MinTime, meta.MaxTime, isoState, copyLastChunk)
		return c, nil, maxt, err
	}
	mm, ok := meta.Chunk.(*multiMeta)
	if !ok { // Complete chunk was supplied.
		return meta.Chunk, nil, meta.MaxTime, nil
	}
	// We have a composite meta: construct a composite iterable.
	mc := &mergedOOOChunks{}
	for _, m := range mm.metas {
		switch {
		case m.Chunk != nil:
			mc.chunkIterables = append(mc.chunkIterables, m.Chunk)
		default:
			_, cid, isOOO := unpackHeadChunkRef(m.Ref)
			iterable, _, err := cr.head.chunkFromSeries(s, cid, isOOO, m.MinTime, m.MaxTime, isoState, copyLastChunk)
			if err != nil {
				return nil, nil, 0, fmt.Errorf("invalid head chunk: %w", err)
			}
			mc.chunkIterables = append(mc.chunkIterables, iterable)
		}
	}
	return nil, mc, meta.MaxTime, nil
}

func (cr *HeadAndOOOChunkReader) Close() error {
	if cr.cr != nil && cr.cr.isoState != nil {
		cr.cr.isoState.Close()
	}
	if cr.oooIsoState != nil {
		cr.oooIsoState.Close()
	}
	return nil
}

type OOOCompactionHead struct {
	head        *Head
	lastMmapRef chunks.ChunkDiskMapperRef
	lastWBLFile int
	postings    []storage.SeriesRef
	chunkRange  int64
	mint, maxt  int64 // Among all the compactable chunks.
}

// NewOOOCompactionHead does the following:
// 1. M-maps all the in-memory ooo chunks.
// 2. Compute the expected block ranges while iterating through all ooo series and store it.
// 3. Store the list of postings having ooo series.
// 4. Cuts a new WBL file for the OOO WBL.
// All the above together have a bit of CPU and memory overhead, and can have a bit of impact
// on the sample append latency. So call NewOOOCompactionHead only right before compaction.
func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead, error) {
	ch := &OOOCompactionHead{
		head:       head,
		chunkRange: head.chunkRange.Load(),
		mint:       math.MaxInt64,
		maxt:       math.MinInt64,
	}

	if head.wbl != nil {
		lastWBLFile, err := head.wbl.NextSegmentSync()
		if err != nil {
			return nil, err
		}
		ch.lastWBLFile = lastWBLFile
	}

	hr := headIndexReader{head: head, mint: ch.mint, maxt: ch.maxt}
	n, v := index.AllPostingsKey()
	// TODO: filter to series with OOO samples, before sorting.
	p, err := hr.Postings(ctx, n, v)
	if err != nil {
		return nil, err
	}
	p = hr.SortedPostings(p)

	var lastSeq, lastOff int
	for p.Next() {
		seriesRef := p.At()
		ms := head.series.getByID(chunks.HeadSeriesRef(seriesRef))
		if ms == nil {
			continue
		}

		// M-map the in-memory chunk and keep track of the last one.
		// Also build the block ranges -> series map.
		// TODO: consider having a lock specifically for ooo data.
		ms.Lock()

		if ms.ooo == nil {
			ms.Unlock()
			continue
		}

		var lastMmapRef chunks.ChunkDiskMapperRef
		mmapRefs := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper, head.logger)
		if len(mmapRefs) == 0 && len(ms.ooo.oooMmappedChunks) > 0 {
			// Nothing was m-mapped. So take the mmapRef from the existing slice if it exists.
			mmapRefs = []chunks.ChunkDiskMapperRef{ms.ooo.oooMmappedChunks[len(ms.ooo.oooMmappedChunks)-1].ref}
		}
		if len(mmapRefs) == 0 {
			lastMmapRef = 0
		} else {
			lastMmapRef = mmapRefs[len(mmapRefs)-1]
		}
		seq, off := lastMmapRef.Unpack()
		if seq > lastSeq || (seq == lastSeq && off > lastOff) {
			ch.lastMmapRef, lastSeq, lastOff = lastMmapRef, seq, off
		}
		if len(ms.ooo.oooMmappedChunks) > 0 {
			ch.postings = append(ch.postings, seriesRef)
			for _, c := range ms.ooo.oooMmappedChunks {
				if c.minTime < ch.mint {
					ch.mint = c.minTime
				}
				if c.maxTime > ch.maxt {
					ch.maxt = c.maxTime
				}
			}
		}
		ms.Unlock()
	}

	return ch, nil
}

func (ch *OOOCompactionHead) Index() (IndexReader, error) {
	return NewOOOCompactionHeadIndexReader(ch), nil
}

func (ch *OOOCompactionHead) Chunks() (ChunkReader, error) {
	return NewHeadAndOOOChunkReader(ch.head, ch.mint, ch.maxt, nil, nil, ch.lastMmapRef), nil
}

func (ch *OOOCompactionHead) Tombstones() (tombstones.Reader, error) {
	return tombstones.NewMemTombstones(), nil
}

var oooCompactionHeadULID = ulid.MustParse("0000000000XX000COMPACTHEAD")

func (ch *OOOCompactionHead) Meta() BlockMeta {
	return BlockMeta{
		MinTime: ch.mint,
		MaxTime: ch.maxt,
		ULID:    oooCompactionHeadULID,
		Stats: BlockStats{
			NumSeries: uint64(len(ch.postings)),
		},
	}
}

// CloneForTimeRange clones the OOOCompactionHead such that the IndexReader and ChunkReader
// obtained from this only looks at the m-map chunks within the given time ranges while not looking
// beyond the ch.lastMmapRef.
// Only the method of BlockReader interface are valid for the cloned OOOCompactionHead.
func (ch *OOOCompactionHead) CloneForTimeRange(mint, maxt int64) *OOOCompactionHead {
	return &OOOCompactionHead{
		head:        ch.head,
		lastMmapRef: ch.lastMmapRef,
		postings:    ch.postings,
		chunkRange:  ch.chunkRange,
		mint:        mint,
		maxt:        maxt,
	}
}

func (ch *OOOCompactionHead) Size() int64                            { return 0 }
func (ch *OOOCompactionHead) MinTime() int64                         { return ch.mint }
func (ch *OOOCompactionHead) MaxTime() int64                         { return ch.maxt }
func (ch *OOOCompactionHead) ChunkRange() int64                      { return ch.chunkRange }
func (ch *OOOCompactionHead) LastMmapRef() chunks.ChunkDiskMapperRef { return ch.lastMmapRef }
func (ch *OOOCompactionHead) LastWBLFile() int                       { return ch.lastWBLFile }

type OOOCompactionHeadIndexReader struct {
	ch *OOOCompactionHead
}

func NewOOOCompactionHeadIndexReader(ch *OOOCompactionHead) IndexReader {
	return &OOOCompactionHeadIndexReader{ch: ch}
}

func (ir *OOOCompactionHeadIndexReader) Symbols() index.StringIter {
	hr := headIndexReader{head: ir.ch.head, mint: ir.ch.mint, maxt: ir.ch.maxt}
	return hr.Symbols()
}

func (ir *OOOCompactionHeadIndexReader) Postings(_ context.Context, name string, values ...string) (index.Postings, error) {
	n, v := index.AllPostingsKey()
	if name != n || len(values) != 1 || values[0] != v {
		return nil, errors.New("only AllPostingsKey is supported")
	}
	return index.NewListPostings(ir.ch.postings), nil
}

func (ir *OOOCompactionHeadIndexReader) PostingsForLabelMatching(context.Context, string, func(string) bool) index.Postings {
	return index.ErrPostings(errors.New("not supported"))
}

func (ir *OOOCompactionHeadIndexReader) PostingsForAllLabelValues(context.Context, string) index.Postings {
	return index.ErrPostings(errors.New("not supported"))
}

func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.Postings {
	// This will already be sorted from the Postings() call above.
	return p
}

func (ir *OOOCompactionHeadIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings {
	hr := headIndexReader{head: ir.ch.head, mint: ir.ch.mint, maxt: ir.ch.maxt}
	return hr.ShardedPostings(p, shardIndex, shardCount)
}

func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
	s := ir.ch.head.series.getByID(chunks.HeadSeriesRef(ref))

	if s == nil {
		ir.ch.head.metrics.seriesNotFound.Inc()
		return storage.ErrNotFound
	}
	builder.Assign(s.labels())

	s.Lock()
	defer s.Unlock()
	*chks = (*chks)[:0]

	if s.ooo == nil {
		return nil
	}

	return getOOOSeriesChunks(s, ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, 0, chks)
}

func (ir *OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
	return nil, errors.New("not implemented")
}

func (ir *OOOCompactionHeadIndexReader) LabelValues(_ context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
	return nil, errors.New("not implemented")
}

func (ir *OOOCompactionHeadIndexReader) PostingsForMatchers(_ context.Context, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) {
	return nil, errors.New("not implemented")
}

func (ir *OOOCompactionHeadIndexReader) LabelNames(context.Context, ...*labels.Matcher) ([]string, error) {
	return nil, errors.New("not implemented")
}

func (ir *OOOCompactionHeadIndexReader) LabelValueFor(context.Context, storage.SeriesRef, string) (string, error) {
	return "", errors.New("not implemented")
}

func (ir *OOOCompactionHeadIndexReader) LabelNamesFor(ctx context.Context, postings index.Postings) ([]string, error) {
	return nil, errors.New("not implemented")
}

func (ir *OOOCompactionHeadIndexReader) Close() error {
	return nil
}

// HeadAndOOOQuerier queries both the head and the out-of-order head.
type HeadAndOOOQuerier struct {
	mint, maxt int64
	head       *Head
	index      IndexReader
	chunkr     ChunkReader
	querier    storage.Querier // Used for LabelNames, LabelValues, but may be nil if head was truncated in the mean time, in which case we ignore it and not close it in the end.
}

func NewHeadAndOOOQuerier(inoMint, mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.Querier) storage.Querier {
	cr := &headChunkReader{
		head:     head,
		mint:     mint,
		maxt:     maxt,
		isoState: head.iso.State(mint, maxt),
	}
	return &HeadAndOOOQuerier{
		mint:    mint,
		maxt:    maxt,
		head:    head,
		index:   NewHeadAndOOOIndexReader(head, inoMint, mint, maxt, oooIsoState.minRef),
		chunkr:  NewHeadAndOOOChunkReader(head, mint, maxt, cr, oooIsoState, 0),
		querier: querier,
	}
}

func (q *HeadAndOOOQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	if q.querier == nil {
		return nil, nil, nil
	}
	return q.querier.LabelValues(ctx, name, hints, matchers...)
}

func (q *HeadAndOOOQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	if q.querier == nil {
		return nil, nil, nil
	}
	return q.querier.LabelNames(ctx, hints, matchers...)
}

func (q *HeadAndOOOQuerier) Close() error {
	q.chunkr.Close()
	if q.querier == nil {
		return nil
	}
	return q.querier.Close()
}

func (q *HeadAndOOOQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
	return selectSeriesSet(ctx, sortSeries, hints, matchers, q.index, q.chunkr, q.head.tombstones, q.mint, q.maxt)
}

// HeadAndOOOChunkQuerier queries both the head and the out-of-order head.
type HeadAndOOOChunkQuerier struct {
	mint, maxt int64
	head       *Head
	index      IndexReader
	chunkr     ChunkReader
	querier    storage.ChunkQuerier
}

func NewHeadAndOOOChunkQuerier(inoMint, mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.ChunkQuerier) storage.ChunkQuerier {
	cr := &headChunkReader{
		head:     head,
		mint:     mint,
		maxt:     maxt,
		isoState: head.iso.State(mint, maxt),
	}
	return &HeadAndOOOChunkQuerier{
		mint:    mint,
		maxt:    maxt,
		head:    head,
		index:   NewHeadAndOOOIndexReader(head, inoMint, mint, maxt, oooIsoState.minRef),
		chunkr:  NewHeadAndOOOChunkReader(head, mint, maxt, cr, oooIsoState, 0),
		querier: querier,
	}
}

func (q *HeadAndOOOChunkQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	if q.querier == nil {
		return nil, nil, nil
	}
	return q.querier.LabelValues(ctx, name, hints, matchers...)
}

func (q *HeadAndOOOChunkQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	if q.querier == nil {
		return nil, nil, nil
	}
	return q.querier.LabelNames(ctx, hints, matchers...)
}

func (q *HeadAndOOOChunkQuerier) Close() error {
	q.chunkr.Close()
	if q.querier == nil {
		return nil
	}
	return q.querier.Close()
}

func (q *HeadAndOOOChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet {
	return selectChunkSeriesSet(ctx, sortSeries, hints, matchers, rangeHeadULID, q.index, q.chunkr, q.head.tombstones, q.mint, q.maxt)
}