// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tsdb

import (
	"sort"
	"strings"
	"unicode/utf8"

	"github.com/pkg/errors"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/tsdb/chunks"
	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
	"github.com/prometheus/prometheus/tsdb/index"
	"github.com/prometheus/prometheus/tsdb/tombstones"
)

// querier aggregates querying results from time blocks within
// a single partition.
type querier struct {
	// blocks holds one underlying querier per block. The methods of querier
	// iterate over this slice and combine the per-block results.
	blocks []storage.Querier
}

// LabelValues returns the values of label n merged across all blocks of the
// querier (see lvals), together with the warnings the blocks reported.
func (q *querier) LabelValues(n string) ([]string, storage.Warnings, error) {
	return q.lvals(q.blocks, n)
}

// LabelNames returns the sorted, de-duplicated label names found in the
// querier's blocks. Warnings from every block visited are returned, also when
// a block fails; in that case the names are nil and the error is wrapped.
func (q *querier) LabelNames() ([]string, storage.Warnings, error) {
	var (
		warnings storage.Warnings
		seen     = make(map[string]struct{})
	)
	for _, block := range q.blocks {
		blockNames, blockWarnings, err := block.LabelNames()
		warnings = append(warnings, blockWarnings...)
		if err != nil {
			return nil, warnings, errors.Wrap(err, "LabelNames() from Querier")
		}
		for i := range blockNames {
			seen[blockNames[i]] = struct{}{}
		}
	}

	// Map iteration order is random, so sort to get a stable result.
	unique := make([]string, 0, len(seen))
	for name := range seen {
		unique = append(unique, name)
	}
	sort.Strings(unique)

	return unique, warnings, nil
}

// lvals returns the values of label n across the queriers qs. It splits qs in
// half, resolves each half recursively and merges the two results with
// mergeStrings, which performs a sorted merge and drops duplicates.
//
// Warnings from both halves are accumulated in call order. They are returned
// even when an error aborts the lookup; the values are nil in that case.
func (q *querier) lvals(qs []storage.Querier, n string) ([]string, storage.Warnings, error) {
	switch len(qs) {
	case 0:
		return nil, nil, nil
	case 1:
		return qs[0].LabelValues(n)
	}
	mid := len(qs) / 2

	var ws storage.Warnings
	left, w, err := q.lvals(qs[:mid], n)
	ws = append(ws, w...)
	if err != nil {
		return nil, ws, err
	}
	// The second half's warnings must land in w, not ws: assigning them to ws
	// replaces the accumulator, and the append below would then re-add the
	// first half's stale warnings onto the callee's slice.
	right, w, err := q.lvals(qs[mid:], n)
	ws = append(ws, w...)
	if err != nil {
		return nil, ws, err
	}
	return mergeStrings(left, right), ws, nil
}

// Select returns the series matching ms. With exactly one block the query is
// passed straight to that block's Select; in every other case it goes through
// SelectSorted so that the per-block results can be merged.
func (q *querier) Select(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
	if len(q.blocks) == 1 {
		// Sorting Head series is slow, and unneeded when only the
		// Head is being queried. Sorting blocks is a noop.
		only := q.blocks[0]
		return only.Select(p, ms...)
	}
	return q.SelectSorted(p, ms...)
}

// SelectSorted runs a sorted select against every block and combines the
// per-block sets with NewMergedSeriesSet. With no blocks it returns an empty
// series set. Warnings gathered so far are returned alongside any error.
func (q *querier) SelectSorted(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
	if len(q.blocks) == 0 {
		return storage.EmptySeriesSet(), nil, nil
	}

	var warnings storage.Warnings
	sets := make([]storage.SeriesSet, 0, len(q.blocks))
	for _, block := range q.blocks {
		set, blockWarnings, err := block.SelectSorted(p, ms...)
		warnings = append(warnings, blockWarnings...)
		if err != nil {
			return nil, warnings, err
		}
		sets = append(sets, set)
	}

	return NewMergedSeriesSet(sets), warnings, nil
}

// Close closes every block querier and returns the collected errors, if any.
// A failing block does not prevent the remaining blocks from being closed.
func (q *querier) Close() error {
	var closeErrs tsdb_errors.MultiError
	for i := range q.blocks {
		closeErrs.Add(q.blocks[i].Close())
	}
	return closeErrs.Err()
}

// verticalQuerier aggregates querying results from time blocks within
// a single partition. The block time ranges can be overlapping.
type verticalQuerier struct {
	// querier is embedded for its blocks and its label/close methods;
	// verticalQuerier overrides Select and SelectSorted.
	querier
}

// Select returns the series matching ms merged across all blocks. It delegates
// to sel, which always uses the blocks' sorted selection.
func (q *verticalQuerier) Select(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
	return q.sel(p, q.blocks, ms)
}

// SelectSorted returns the series matching ms merged across all blocks. It is
// identical to Select: both delegate to sel.
func (q *verticalQuerier) SelectSorted(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
	return q.sel(p, q.blocks, ms)
}

// sel selects the series matching ms from the queriers qs. It halves qs
// recursively, runs a sorted select on each single querier and pairs the
// halves up with newMergedVerticalSeriesSet. Warnings are accumulated in call
// order and are returned alongside any error.
func (q *verticalQuerier) sel(p *storage.SelectParams, qs []storage.Querier, ms []*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
	switch len(qs) {
	case 0:
		return storage.EmptySeriesSet(), nil, nil
	case 1:
		return qs[0].SelectSorted(p, ms...)
	}
	mid := len(qs) / 2

	var warnings storage.Warnings

	left, leftWarnings, err := q.sel(p, qs[:mid], ms)
	warnings = append(warnings, leftWarnings...)
	if err != nil {
		return nil, warnings, err
	}

	right, rightWarnings, err := q.sel(p, qs[mid:], ms)
	warnings = append(warnings, rightWarnings...)
	if err != nil {
		return nil, warnings, err
	}

	return newMergedVerticalSeriesSet(left, right), warnings, nil
}

// NewBlockQuerier returns a querier against the reader b, using [mint, maxt]
// as its default time range. It opens the block's index, chunk and tombstone
// readers in that order; if one of them fails, the readers opened so far are
// closed again before the wrapped error is returned.
func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) {
	ir, err := b.Index(mint, maxt)
	if err != nil {
		return nil, errors.Wrapf(err, "open index reader")
	}

	cr, err := b.Chunks()
	if err != nil {
		ir.Close()
		return nil, errors.Wrapf(err, "open chunk reader")
	}

	tr, err := b.Tombstones()
	if err != nil {
		ir.Close()
		cr.Close()
		return nil, errors.Wrapf(err, "open tombstone reader")
	}

	q := &blockQuerier{
		index:      ir,
		chunks:     cr,
		tombstones: tr,
		mint:       mint,
		maxt:       maxt,
	}
	return q, nil
}

// blockQuerier provides querying access to a single block database.
type blockQuerier struct {
	// Readers backing the querier; all three are closed by Close.
	index      IndexReader
	chunks     ChunkReader
	tombstones tombstones.Reader

	// closed is set by Close so that a second Close call can be rejected.
	closed bool

	// mint and maxt are the default time range, used when a select is issued
	// without select params.
	mint, maxt int64
}

// Select returns the series of the block matching ms, without asking for
// sorted postings. The time range is taken from p when it is non-nil and
// from the querier otherwise. No warnings are produced.
func (q *blockQuerier) Select(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
	series, err := LookupChunkSeries(q.index, q.tombstones, ms...)
	if err != nil {
		return nil, nil, err
	}

	// Explicit select params override the querier's own time range.
	from, through := q.mint, q.maxt
	if p != nil {
		from, through = p.Start, p.End
	}

	populated := &populatedChunkSeries{
		set:    series,
		chunks: q.chunks,
		mint:   from,
		maxt:   through,
	}
	return &blockSeriesSet{set: populated, mint: from, maxt: through}, nil, nil
}

// SelectSorted returns the series of the block matching ms, looked up through
// LookupChunkSeriesSorted. The time range is taken from p when it is non-nil
// and from the querier otherwise. No warnings are produced.
func (q *blockQuerier) SelectSorted(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
	series, err := LookupChunkSeriesSorted(q.index, q.tombstones, ms...)
	if err != nil {
		return nil, nil, err
	}

	// Explicit select params override the querier's own time range.
	from, through := q.mint, q.maxt
	if p != nil {
		from, through = p.Start, p.End
	}

	populated := &populatedChunkSeries{
		set:    series,
		chunks: q.chunks,
		mint:   from,
		maxt:   through,
	}
	return &blockSeriesSet{set: populated, mint: from, maxt: through}, nil, nil
}

// LabelValues returns the values the block's index holds for the label name.
// It never produces warnings.
func (q *blockQuerier) LabelValues(name string) ([]string, storage.Warnings, error) {
	values, err := q.index.LabelValues(name)
	return values, nil, err
}

// LabelNames returns the label names known to the block's index.
// It never produces warnings.
func (q *blockQuerier) LabelNames() ([]string, storage.Warnings, error) {
	names, err := q.index.LabelNames()
	return names, nil, err
}

// Close closes the index, chunk and tombstone readers and returns their
// combined errors. Calling Close on an already closed querier returns an
// error without touching the readers again.
func (q *blockQuerier) Close() error {
	if q.closed {
		return errors.New("block querier already closed")
	}

	var closeErrs tsdb_errors.MultiError
	closeErrs.Add(q.index.Close())
	closeErrs.Add(q.chunks.Close())
	closeErrs.Add(q.tombstones.Close())

	// Mark as closed even if a reader failed to close.
	q.closed = true

	return closeErrs.Err()
}

// regexMetaCharacterBytes is a 128-bit bitmap, filled in by init, that marks
// the ASCII characters which need escaping in a regular expression. Character
// c is stored as bit c/16 of byte c%16.
var regexMetaCharacterBytes [16]byte

// isRegexMetaCharacter reports whether byte b needs to be escaped.
// Bytes outside the ASCII range never do.
func isRegexMetaCharacter(b byte) bool {
	if b >= utf8.RuneSelf {
		return false
	}
	row, bit := b%16, b/16
	return regexMetaCharacterBytes[row]&(1<<bit) != 0
}

// init populates the bitmap with the regular expression meta characters.
func init() {
	const metaCharacters = `.+*?()|[]{}^$`
	for i := 0; i < len(metaCharacters); i++ {
		c := metaCharacters[i]
		regexMetaCharacterBytes[c%16] |= 1 << (c / 16)
	}
}

// findSetMatches extracts the literal alternatives from a pattern of the form
// ^(?:a|b|c)$. It returns nil when the ^(?: ... )$ wrapper is missing, when
// the pattern contains an unescaped meta character other than '|', or when a
// backslash escapes anything but a meta character or another backslash.
// Escaped characters are unescaped in the result and empty alternatives are
// left out.
func findSetMatches(pattern string) []string {
	const prefix, suffix = "^(?:", ")$"

	// Return empty matches if the wrapper from Prometheus is missing.
	if len(pattern) < 6 || !strings.HasPrefix(pattern, prefix) || !strings.HasSuffix(pattern, suffix) {
		return nil
	}
	body := pattern[len(prefix) : len(pattern)-len(suffix)]

	alternatives := []*strings.Builder{{}}
	afterBackslash := false
	for i := 0; i < len(body); i++ {
		c := body[i]
		current := alternatives[len(alternatives)-1]

		switch {
		case afterBackslash:
			// Only meta characters and the backslash itself may be escaped.
			if !isRegexMetaCharacter(c) && c != '\\' {
				return nil
			}
			current.WriteByte(c)
			afterBackslash = false
		case c == '|':
			alternatives = append(alternatives, &strings.Builder{})
		case isRegexMetaCharacter(c):
			// Any other unescaped meta character means this is a real regex.
			return nil
		case c == '\\':
			afterBackslash = true
		default:
			current.WriteByte(c)
		}
	}

	matches := make([]string, 0, len(alternatives))
	for _, alt := range alternatives {
		if alt.Len() == 0 {
			continue
		}
		matches = append(matches, alt.String())
	}
	return matches
}

// PostingsForMatchers assembles a single postings iterator against the index reader
// based on the given matchers. The resulting postings are not ordered by series.
//
// Every matcher contributes either to a list of postings that are intersected
// or to a list of postings that are subtracted from that intersection.
func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, error) {
	// See which label must be non-empty.
	// Optimization for case like {l=~".", l!="1"}.
	mustBeSet := make(map[string]bool, len(ms))
	for _, m := range ms {
		if !m.Matches("") {
			mustBeSet[m.Name] = true
		}
	}

	var include, exclude []index.Postings
	for _, m := range ms {
		if !mustBeSet[m.Name] { // l=""
			// If the matchers for a labelname selects an empty value, it selects all
			// the series which don't have the label name set too. See:
			// https://github.com/prometheus/prometheus/issues/3575 and
			// https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555
			p, err := inversePostingsForMatcher(ix, m)
			if err != nil {
				return nil, err
			}
			exclude = append(exclude, p)
			continue
		}

		// The label must be non-empty, so we can be smarter.
		matchesEmpty := m.Matches("")
		negated := m.Type == labels.MatchNotEqual || m.Type == labels.MatchNotRegexp

		switch {
		case negated && matchesEmpty: // l!="foo"
			// Negated matcher that itself accepts the empty value: look up the
			// postings of its inverse and subtract them out at the end.
			inverse, err := m.Inverse()
			if err != nil {
				return nil, err
			}
			p, err := postingsForMatcher(ix, inverse)
			if err != nil {
				return nil, err
			}
			exclude = append(exclude, p)

		case negated: // l!=""
			// Negated matcher that rejects the empty value: its inverse accepts
			// it, so inversePostingsForMatcher has to be used on the inverse.
			inverse, err := m.Inverse()
			if err != nil {
				return nil, err
			}
			p, err := inversePostingsForMatcher(ix, inverse)
			if err != nil {
				return nil, err
			}
			include = append(include, p)

		default: // l="a"
			// Non-Not matcher, use normal postingsForMatcher.
			p, err := postingsForMatcher(ix, m)
			if err != nil {
				return nil, err
			}
			include = append(include, p)
		}
	}

	// If there's nothing to subtract from, add in everything and remove the
	// excluded postings later.
	if len(include) == 0 && len(exclude) != 0 {
		name, value := index.AllPostingsKey()
		all, err := ix.Postings(name, value)
		if err != nil {
			return nil, err
		}
		include = append(include, all)
	}

	result := index.Intersect(include...)
	for _, p := range exclude {
		result = index.Without(result, p)
	}
	return result, nil
}

// postingsForMatcher returns the postings of all series whose value for the
// matcher's label satisfies the matcher. It does not return postings for
// series that lack the label.
func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, error) {
	switch m.Type {
	case labels.MatchEqual:
		// Fast-path for equal matching.
		return ix.Postings(m.Name, m.Value)
	case labels.MatchRegexp:
		// Fast-path for set matching.
		if set := findSetMatches(m.GetRegexString()); len(set) > 0 {
			sort.Strings(set)
			return ix.Postings(m.Name, set...)
		}
	}

	// Slow path: test every value of the label against the matcher.
	candidates, err := ix.LabelValues(m.Name)
	if err != nil {
		return nil, err
	}

	var matching []string
	for _, v := range candidates {
		if !m.Matches(v) {
			continue
		}
		matching = append(matching, v)
	}
	if len(matching) == 0 {
		return index.EmptyPostings(), nil
	}

	return ix.Postings(m.Name, matching...)
}

// inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher.
// The index is queried with every value of the label that the matcher rejects,
// which may be no value at all.
func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, error) {
	candidates, err := ix.LabelValues(m.Name)
	if err != nil {
		return nil, err
	}

	var rejected []string
	for _, v := range candidates {
		if m.Matches(v) {
			continue
		}
		rejected = append(rejected, v)
	}

	return ix.Postings(m.Name, rejected...)
}

// mergeStrings merges the two sorted string slices a and b into a new sorted
// slice. A value that sits at the front of both inputs at the same time is
// emitted only once. The inputs are not modified.
func mergeStrings(a, b []string) []string {
	capHint := len(a)
	if len(b) > capHint {
		capHint = len(b)
	}
	// Leave some headroom over the longer input for values unique to the other.
	merged := make([]string, 0, capHint*10/9)

	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch d := strings.Compare(a[i], b[j]); {
		case d < 0:
			merged = append(merged, a[i])
			i++
		case d > 0:
			merged = append(merged, b[j])
			j++
		default:
			merged = append(merged, a[i])
			i++
			j++
		}
	}

	// At most one of the inputs still has elements left.
	merged = append(merged, a[i:]...)
	merged = append(merged, b[j:]...)
	return merged
}

// mergedSeriesSet presents a slice of series sets as a single series set. The input
// series sets must be sorted and sequential in time.
//
// all holds the sets that currently have a series available, done and err
// record termination of the iteration, and cur is the series returned by At.
// TODO(bwplotka): Merge this with merge SeriesSet available in storage package.
type mergedSeriesSet struct {
	all  []storage.SeriesSet
	buf  []storage.SeriesSet // A buffer for keeping the order of SeriesSet slice during forwarding the SeriesSet.
	ids  []int               // The indices of chosen SeriesSet for the current run.
	done bool
	err  error
	cur  storage.Series
}

// NewMergedSeriesSet returns a series set that merges the given series sets.
// A single input set is returned as is. Otherwise every input set is advanced
// once up front, and the result is immediately exhausted if none of them has
// a series.
// TODO(bwplotka): Merge this with merge SeriesSet available in storage package.
func NewMergedSeriesSet(all []storage.SeriesSet) storage.SeriesSet {
	if len(all) == 1 {
		return all[0]
	}

	merged := &mergedSeriesSet{all: all}
	// Next() needs one element of look-ahead from every set.
	merged.nextAll()
	if len(merged.all) == 0 {
		merged.done = true
	}
	return merged
}

// At returns the series selected by the last call to Next.
func (s *mergedSeriesSet) At() storage.Series {
	return s.cur
}

// Err returns the error recorded from an underlying series set, if any.
func (s *mergedSeriesSet) Err() error {
	return s.err
}

// nextAll calls Next() on every series set and keeps only the sets that
// produced a series. The first set that failed with an error stops the scan
// and marks the merged set as done.
// Because the order of the SeriesSet slice affects the results, the
// survivors are collected in a buffer slice that preserves their order.
func (s *mergedSeriesSet) nextAll() {
	live := s.buf[:0]
	for i := range s.all {
		set := s.all[i]
		if set.Next() {
			live = append(live, set)
			continue
		}
		if err := set.Err(); err != nil {
			s.done = true
			s.err = err
			break
		}
	}
	// The old slice becomes the scratch buffer for the next round.
	s.buf = s.all
	s.all = live
}

// nextWithID calls Next() on the series sets whose indices are listed in
// s.ids and leaves all other sets untouched. Sets that are exhausted without
// an error are dropped; a set that failed stops the walk and marks the merged
// set as done.
// Because the order of the SeriesSet slice will affect the results,
// we need to use a buffer slice to hold the order.
func (s *mergedSeriesSet) nextWithID() {
	if len(s.ids) == 0 {
		return
	}

	s.buf = s.buf[:0]
	// i1 walks s.all, i2 walks s.ids; both advance in increasing index order.
	i1 := 0
	i2 := 0
	for i1 < len(s.all) {
		if i2 < len(s.ids) && i1 == s.ids[i2] {
			if !s.all[s.ids[i2]].Next() {
				if s.all[s.ids[i2]].Err() != nil {
					s.done = true
					s.err = s.all[s.ids[i2]].Err()
					break
				}
				// Exhausted set: skip it so it is not carried over into buf.
				i2++
				i1++
				continue
			}
			i2++
		}
		s.buf = append(s.buf, s.all[i1])
		i1++
	}
	// buf now holds the surviving sets; the old slice becomes the next buffer.
	s.all, s.buf = s.buf, s.all
}

// Next advances the sets consumed by the previous call and then selects the
// series with the lowest labels among the remaining sets. If several sets are
// positioned on series with those labels, they are combined into a
// chainedSeries. It returns false once all sets are exhausted or one failed.
func (s *mergedSeriesSet) Next() bool {
	if s.done {
		return false
	}

	s.nextWithID()
	if s.done {
		return false
	}
	s.ids = s.ids[:0]
	if len(s.all) == 0 {
		s.done = true
		return false
	}

	// Here we are looking for a set of series sets with the lowest labels,
	// and we will cache their indexes in s.ids.
	s.ids = append(s.ids, 0)
	for i := 1; i < len(s.all); i++ {
		lowest := s.all[s.ids[0]].At().Labels()
		switch cmp := labels.Compare(lowest, s.all[i].At().Labels()); {
		case cmp > 0:
			// Strictly lower labels: restart the selection with this set only.
			s.ids = append(s.ids[:0], i)
		case cmp == 0:
			s.ids = append(s.ids, i)
		}
	}

	if len(s.ids) == 1 {
		s.cur = s.all[s.ids[0]].At()
		return true
	}

	chained := make([]storage.Series, len(s.ids))
	for pos, idx := range s.ids {
		chained[pos] = s.all[idx].At()
	}
	s.cur = &chainedSeries{series: chained}
	return true
}

// mergedVerticalSeriesSet merges the two series sets a and b into one.
// adone and bdone record that the respective set has no further series, and
// cur is the series returned by At.
type mergedVerticalSeriesSet struct {
	a, b         storage.SeriesSet
	cur          storage.Series
	adone, bdone bool
}

// NewMergedVerticalSeriesSet returns a single series set that merges the two
// given series sets.
// The input series sets must be sorted and
// the time ranges of the series can be overlapping.
func NewMergedVerticalSeriesSet(a, b storage.SeriesSet) storage.SeriesSet {
	return newMergedVerticalSeriesSet(a, b)
}

// newMergedVerticalSeriesSet builds the merged set for a and b and advances
// both inputs once, a first, because Next() needs one element of look-ahead
// from each of them.
func newMergedVerticalSeriesSet(a, b storage.SeriesSet) *mergedVerticalSeriesSet {
	merged := &mergedVerticalSeriesSet{a: a, b: b}
	merged.adone = !merged.a.Next()
	merged.bdone = !merged.b.Next()
	return merged
}

// At returns the series selected by the last call to Next.
func (s *mergedVerticalSeriesSet) At() storage.Series {
	return s.cur
}

// Err returns the error of set a if it has one, and the error of set b
// otherwise.
func (s *mergedVerticalSeriesSet) Err() error {
	if err := s.a.Err(); err != nil {
		return err
	}
	return s.b.Err()
}

// compare orders the current series of a and b by their labels. An exhausted
// set sorts after the other one: the result is 1 when a is done and -1 when
// only b is done.
func (s *mergedVerticalSeriesSet) compare() int {
	switch {
	case s.adone:
		return 1
	case s.bdone:
		return -1
	default:
		return labels.Compare(s.a.At().Labels(), s.b.At().Labels())
	}
}

// Next selects the series with the lower labels from a and b and advances the
// set it was taken from. It returns false once both sets are exhausted or one
// of them reports an error.
func (s *mergedVerticalSeriesSet) Next() bool {
	if (s.adone && s.bdone) || s.Err() != nil {
		return false
	}

	switch d := s.compare(); {
	case d < 0:
		s.cur = s.a.At()
		s.adone = !s.a.Next()
	case d > 0:
		s.cur = s.b.At()
		s.bdone = !s.b.Next()
	default:
		// Both sets contain the current series. Chain them into a single one.
		s.cur = &verticalChainedSeries{series: []storage.Series{s.a.At(), s.b.At()}}
		s.adone = !s.a.Next()
		s.bdone = !s.b.Next()
	}
	return true
}

// baseChunkSeries loads the label set and chunk references for a postings
// list from an index. It filters out series that have labels set that should be unset.
type baseChunkSeries struct {
	// Inputs: the postings to walk and the readers used to resolve them.
	p          index.Postings
	index      IndexReader
	tombstones tombstones.Reader

	// State of the current series as set by Next and returned by At and Err.
	lset      labels.Labels
	chks      []chunks.Meta
	intervals tombstones.Intervals
	err       error
}

// LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet
// over them. It drops chunks based on tombstones in the given reader.
// The postings are not sorted; use LookupChunkSeriesSorted for ordered series.
func LookupChunkSeries(ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (storage.ChunkSeriesSet, error) {
	return lookupChunkSeries(false, ir, tr, ms...)
}

// LookupChunkSeriesSorted retrieves all series for the given matchers and returns a ChunkSeriesSet
// over them. It drops chunks based on tombstones in the given reader. Series will be in order.
func LookupChunkSeriesSorted(ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (storage.ChunkSeriesSet, error) {
	return lookupChunkSeries(true, ir, tr, ms...)
}

// lookupChunkSeries resolves the matchers ms to postings and wraps them in a
// baseChunkSeries. When sorted is true the postings are passed through the
// index reader's SortedPostings first. A nil tombstone reader is replaced by
// empty in-memory tombstones.
func lookupChunkSeries(sorted bool, ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (storage.ChunkSeriesSet, error) {
	if tr == nil {
		tr = tombstones.NewMemTombstones()
	}

	postings, err := PostingsForMatchers(ir, ms...)
	if err != nil {
		return nil, err
	}
	if sorted {
		postings = ir.SortedPostings(postings)
	}

	set := &baseChunkSeries{p: postings, index: ir, tombstones: tr}
	return set, nil
}

// At returns the labels, chunk metas and tombstone intervals of the series
// loaded by the last call to Next.
func (s *baseChunkSeries) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) {
	return s.lset, s.chks, s.intervals
}

// Err returns the error that stopped the iteration, if any.
func (s *baseChunkSeries) Err() error {
	return s.err
}

// Next advances to the next posting that resolves to a series in the index
// and loads its labels, chunk metas and tombstone intervals. Postings without
// an underlying series are skipped. Chunks that are entirely covered by the
// tombstone intervals are removed from the result. It returns false when the
// postings are exhausted or an error occurred; Err reports the error.
func (s *baseChunkSeries) Next() bool {
	// Fresh buffers per call, sized like the previous series.
	lset := make(labels.Labels, len(s.lset))
	chkMetas := make([]chunks.Meta, len(s.chks))

	for s.p.Next() {
		ref := s.p.At()
		if err := s.index.Series(ref, &lset, &chkMetas); err != nil {
			// Postings may be stale. Skip if no underlying series exists.
			if errors.Cause(err) == ErrNotFound {
				continue
			}
			s.err = err
			return false
		}
		s.lset, s.chks = lset, chkMetas

		intervals, err := s.tombstones.Get(ref)
		s.intervals = intervals
		if err != nil {
			s.err = errors.Wrap(err, "get tombstones")
			return false
		}
		if len(intervals) == 0 {
			return true
		}

		// Only those chunks that are not entirely deleted.
		kept := make([]chunks.Meta, 0, len(s.chks))
		for _, chk := range s.chks {
			span := tombstones.Interval{Mint: chk.MinTime, Maxt: chk.MaxTime}
			if span.IsSubrange(intervals) {
				continue
			}
			kept = append(kept, chk)
		}
		s.chks = kept

		return true
	}

	if err := s.p.Err(); err != nil {
		s.err = err
	}
	return false
}

// populatedChunkSeries loads chunk data from a store for a set of series
// with known chunk references. It filters out chunks that do not fit the
// given time range.
type populatedChunkSeries struct {
	// Inputs: the series to populate, the chunk store and the time range.
	set        storage.ChunkSeriesSet
	chunks     ChunkReader
	mint, maxt int64

	// State of the current series as set by Next and returned by At and Err.
	err       error
	chks      []chunks.Meta
	lset      labels.Labels
	intervals tombstones.Intervals
}

// At returns the labels, populated chunk metas and tombstone intervals of the
// series selected by the last call to Next.
func (s *populatedChunkSeries) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) {
	return s.lset, s.chks, s.intervals
}

// Err returns the error that stopped the iteration, if any.
func (s *populatedChunkSeries) Err() error {
	return s.err
}

// Next advances to the next series of the underlying set that has at least
// one chunk overlapping [mint, maxt] and loads the chunk data for each chunk
// it keeps. Chunks outside the range, and chunks the chunk reader reports as
// ErrNotFound, are removed; series left without chunks are skipped.
// It returns false when the underlying set is exhausted or a chunk could not
// be loaded for any other reason; Err reports the error.
func (s *populatedChunkSeries) Next() bool {
	for s.set.Next() {
		lset, chks, dranges := s.set.At()

		// Drop leading chunks that end before the requested range starts.
		for len(chks) > 0 && chks[0].MaxTime < s.mint {
			chks = chks[1:]
		}

		// chks can shrink while iterating (in-place deletes below), so the
		// position j is the loop counter corrected by the number of removals.
		for i, rlen := 0, len(chks); i < rlen; i++ {
			j := i - (rlen - len(chks))
			c := &chks[j]

			// Break out at the first chunk that has no overlap with mint, maxt.
			if c.MinTime > s.maxt {
				chks = chks[:j]
				break
			}

			c.Chunk, s.err = s.chunks.Chunk(c.Ref)
			if s.err == nil {
				continue
			}
			if s.err != ErrNotFound {
				return false
			}
			// The chunk has been garbage collected. Remove it from the local
			// list and keep going: returning here would end the whole series
			// set without an error and silently truncate the result.
			s.err = nil
			chks = append(chks[:j], chks[j+1:]...)
		}

		if len(chks) == 0 {
			continue
		}

		s.lset = lset
		s.chks = chks
		s.intervals = dranges

		return true
	}
	if err := s.set.Err(); err != nil {
		s.err = err
	}
	return false
}

// blockSeriesSet is a set of series from an inverted index query.
type blockSeriesSet struct {
	// set is the underlying chunk series set; err and cur hold the state
	// returned by Err and At.
	set storage.ChunkSeriesSet
	err error
	cur storage.Series

	// mint and maxt are handed to every series created by Next.
	mint, maxt int64
}

// Next advances the underlying chunk series set and wraps its current entry
// in a chunkSeries bounded by the set's time range. It returns false when the
// underlying set is exhausted or failed, recording the error in the latter
// case.
func (s *blockSeriesSet) Next() bool {
	if !s.set.Next() {
		if err := s.set.Err(); err != nil {
			s.err = err
		}
		return false
	}

	lset, chks, dranges := s.set.At()
	s.cur = &chunkSeries{
		labels:    lset,
		chunks:    chks,
		intervals: dranges,
		mint:      s.mint,
		maxt:      s.maxt,
	}
	return true
}

// At returns the series created by the last call to Next.
func (s *blockSeriesSet) At() storage.Series {
	return s.cur
}

// Err returns the error recorded from the underlying set, if any.
func (s *blockSeriesSet) Err() error {
	return s.err
}

// chunkSeries is a series that is backed by a sequence of chunks holding
// time series data.
type chunkSeries struct {
	labels labels.Labels
	chunks []chunks.Meta // in-order chunk refs

	// mint and maxt bound the samples returned by the series' iterator.
	mint, maxt int64

	// intervals are the tombstone intervals passed on to the iterator.
	intervals tombstones.Intervals
}

// Labels returns the label set of the series.
func (s *chunkSeries) Labels() labels.Labels {
	return s.labels
}

// Iterator returns a new iterator over the series' chunks, restricted to the
// series' time range and tombstone intervals.
func (s *chunkSeries) Iterator() chunkenc.Iterator {
	return newChunkSeriesIterator(s.chunks, s.intervals, s.mint, s.maxt)
}

// chainedSeries implements a series for a list of time-sorted series.
// They all must have the same labels.
type chainedSeries struct {
	// series must not be empty: Labels and Iterator access the first element.
	series []storage.Series
}

// Labels returns the labels of the first chained series.
func (s *chainedSeries) Labels() labels.Labels {
	return s.series[0].Labels()
}

// Iterator returns an iterator that walks the chained series one after
// another.
func (s *chainedSeries) Iterator() chunkenc.Iterator {
	return newChainedSeriesIterator(s.series...)
}

// chainedSeriesIterator implements a series iterator over a list
// of time-sorted, non-overlapping iterators.
type chainedSeriesIterator struct {
	series []storage.Series // series in time order

	// i is the index into series of the series that cur iterates over.
	i   int
	cur chunkenc.Iterator
}

// newChainedSeriesIterator returns an iterator over the given series that
// starts out on the first of them. At least one series must be given.
func newChainedSeriesIterator(s ...storage.Series) *chainedSeriesIterator {
	first := s[0].Iterator()
	return &chainedSeriesIterator{series: s, cur: first}
}

// Seek positions the iterator on the first series, starting from the current
// one, whose fresh iterator can seek to t. It returns false and leaves the
// iterator unchanged if none of them can.
func (it *chainedSeriesIterator) Seek(t int64) bool {
	// We just scan the chained series sequentially as they are already
	// pre-selected by relevant time and should be accessed sequentially anyway.
	for idx := it.i; idx < len(it.series); idx++ {
		candidate := it.series[idx].Iterator()
		if candidate.Seek(t) {
			it.cur = candidate
			it.i = idx
			return true
		}
	}
	return false
}

// Next advances to the next sample, moving on to the following series
// whenever the current one is exhausted. It returns false when the current
// iterator failed or the last series has been consumed.
func (it *chainedSeriesIterator) Next() bool {
	for {
		if it.cur.Next() {
			return true
		}
		if it.cur.Err() != nil {
			return false
		}
		if it.i == len(it.series)-1 {
			return false
		}

		// Current series is exhausted: continue with the next one.
		it.i++
		it.cur = it.series[it.i].Iterator()
	}
}

// At returns the current sample of the series currently being iterated.
func (it *chainedSeriesIterator) At() (t int64, v float64) {
	return it.cur.At()
}

// Err returns the error of the iterator currently in use.
func (it *chainedSeriesIterator) Err() error {
	return it.cur.Err()
}

// verticalChainedSeries implements a series for a list of time-sorted, time-overlapping series.
// They all must have the same labels.
type verticalChainedSeries struct {
	// series must not be empty: Labels accesses the first element.
	series []storage.Series
}

// Labels returns the labels of the first chained series.
func (s *verticalChainedSeries) Labels() labels.Labels {
	return s.series[0].Labels()
}

// Iterator returns an iterator that merges the samples of all chained series
// by timestamp.
func (s *verticalChainedSeries) Iterator() chunkenc.Iterator {
	return newVerticalMergeSeriesIterator(s.series...)
}

// verticalMergeSeriesIterator implements a series iterator over a list
// of time-sorted, time-overlapping iterators.
type verticalMergeSeriesIterator struct {
	// a and b are the two inputs being merged. aok and bok record whether the
	// respective input is positioned on a sample that has not been emitted yet.
	// initialized is set once both inputs have been advanced or sought once.
	a, b                  chunkenc.Iterator
	aok, bok, initialized bool

	// curT and curV hold the sample returned by At.
	curT int64
	curV float64
}

// newVerticalMergeSeriesIterator returns an iterator merging the samples of
// the given series. One series yields that series' own iterator, two are
// merged directly, and more are merged by pairing the first series with the
// recursively built merge of the rest. At least one series must be given.
func newVerticalMergeSeriesIterator(s ...storage.Series) chunkenc.Iterator {
	switch len(s) {
	case 1:
		return s[0].Iterator()
	case 2:
		return &verticalMergeSeriesIterator{
			a: s[0].Iterator(),
			b: s[1].Iterator(),
		}
	default:
		return &verticalMergeSeriesIterator{
			a: s[0].Iterator(),
			b: newVerticalMergeSeriesIterator(s[1:]...),
		}
	}
}

// Seek seeks both inputs to t and then loads the earlier of the two resulting
// samples through Next. It returns false if neither input has a sample left.
func (it *verticalMergeSeriesIterator) Seek(t int64) bool {
	it.aok = it.a.Seek(t)
	it.bok = it.b.Seek(t)
	// Both inputs are positioned now, so Next must not advance them first.
	it.initialized = true
	return it.Next()
}

// Next loads the pending sample with the lowest timestamp from the two inputs
// into curT/curV and advances the input it came from. When both inputs are on
// the same timestamp, b's value is used and both inputs advance. It returns
// false once both inputs are exhausted.
func (it *verticalMergeSeriesIterator) Next() bool {
	if !it.initialized {
		it.aok, it.bok = it.a.Next(), it.b.Next()
		it.initialized = true
	}

	switch {
	case !it.aok && !it.bok:
		return false
	case !it.aok:
		// Only b has samples left.
		it.curT, it.curV = it.b.At()
		it.bok = it.b.Next()
		return true
	case !it.bok:
		// Only a has samples left.
		it.curT, it.curV = it.a.At()
		it.aok = it.a.Next()
		return true
	}

	aT, aV := it.a.At()
	bT, bV := it.b.At()
	switch {
	case aT < bT:
		it.curT, it.curV = aT, aV
		it.aok = it.a.Next()
	case aT > bT:
		it.curT, it.curV = bT, bV
		it.bok = it.b.Next()
	default:
		it.curT, it.curV = bT, bV
		it.aok = it.a.Next()
		it.bok = it.b.Next()
	}
	return true
}

// At returns the sample loaded by the last successful call to Next or Seek.
func (it *verticalMergeSeriesIterator) At() (t int64, v float64) {
	return it.curT, it.curV
}

// Err returns the error of input a if it has one, and the error of input b
// otherwise.
func (it *verticalMergeSeriesIterator) Err() error {
	if err := it.a.Err(); err != nil {
		return err
	}
	return it.b.Err()
}

// chunkSeriesIterator implements a series iterator on top
// of a list of time-sorted, non-overlapping chunks.
type chunkSeriesIterator struct {
	chunks []chunks.Meta

	// i is the index of the chunk that cur iterates over. bufDelIter is the
	// deletedIterator wrapper that is created once and reused across chunks
	// when tombstone intervals are present.
	i          int
	cur        chunkenc.Iterator
	bufDelIter *deletedIterator

	// mint and maxt bound the timestamps the iterator returns.
	maxt, mint int64

	// intervals are the tombstone intervals to skip; may be empty.
	intervals tombstones.Intervals
}

// newChunkSeriesIterator returns an iterator over the chunks cs, positioned
// before the first sample of the first chunk. Samples are bounded by
// [mint, maxt] and samples within dranges are skipped. cs must not be empty.
func newChunkSeriesIterator(cs []chunks.Meta, dranges tombstones.Intervals, mint, maxt int64) *chunkSeriesIterator {
	it := &chunkSeriesIterator{
		chunks:    cs,
		intervals: dranges,
		mint:      mint,
		maxt:      maxt,
	}
	it.resetCurIterator()
	return it
}

// resetCurIterator points cur at the chunk with index it.i. Without tombstone
// intervals the chunk's iterator is used directly; otherwise it is wrapped in
// the deletedIterator, which is created on first use. The previous iterator is
// handed to the chunk's Iterator method in both cases.
func (it *chunkSeriesIterator) resetCurIterator() {
	chk := it.chunks[it.i].Chunk

	if len(it.intervals) == 0 {
		it.cur = chk.Iterator(it.cur)
		return
	}

	if it.bufDelIter == nil {
		it.bufDelIter = &deletedIterator{intervals: it.intervals}
	}
	it.bufDelIter.it = chk.Iterator(it.bufDelIter.it)
	it.cur = it.bufDelIter
}

// Seek advances the iterator to the first sample with a timestamp of at least
// t, where t is raised to mint if it lies below it. It returns false if t is
// beyond maxt or no such sample is found in the chunk the search ends up in.
func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
	if t > it.maxt {
		return false
	}

	// Seek to the first valid value after t.
	if t < it.mint {
		t = it.mint
	}

	// Skip chunks that end before t.
	for it.chunks[it.i].MaxTime < t {
		if it.i == len(it.chunks)-1 {
			return false
		}
		it.i++
	}

	it.resetCurIterator()

	for it.cur.Next() {
		if ts, _ := it.cur.At(); ts >= t {
			return true
		}
	}
	return false
}

// At returns the current sample of the chunk currently being iterated.
func (it *chunkSeriesIterator) At() (t int64, v float64) {
	return it.cur.At()
}

// Next advances to the next sample within [mint, maxt], moving on to the next
// chunk whenever the current one is exhausted. A sample before mint triggers
// a Seek to mint. It returns false when a sample beyond maxt is reached, the
// current chunk iterator failed, or the last chunk has been consumed.
func (it *chunkSeriesIterator) Next() bool {
	for {
		if it.cur.Next() {
			t, _ := it.cur.At()
			switch {
			case t < it.mint:
				if !it.Seek(it.mint) {
					return false
				}
				t, _ = it.At()
				return t <= it.maxt
			case t > it.maxt:
				return false
			default:
				return true
			}
		}

		if it.cur.Err() != nil {
			return false
		}
		if it.i == len(it.chunks)-1 {
			return false
		}

		// Current chunk is exhausted: continue with the next one.
		it.i++
		it.resetCurIterator()
	}
}

// Err returns the error of the chunk iterator currently in use.
func (it *chunkSeriesIterator) Err() error {
	return it.cur.Err()
}

// deletedIterator wraps an Iterator and makes sure any deleted metrics are not
// returned.
type deletedIterator struct {
	// it is the wrapped iterator.
	it chunkenc.Iterator

	// intervals are the deleted time ranges. Next drops intervals from the
	// front once the iteration has moved past them.
	intervals tombstones.Intervals
}

// At returns the current sample of the wrapped iterator.
func (it *deletedIterator) At() (int64, float64) {
	return it.it.At()
}

// Seek forwards to the wrapped iterator's Seek unless that iterator already
// reports an error, in which case it returns false. The deleted intervals are
// not consulted here.
func (it *deletedIterator) Seek(t int64) bool {
	if err := it.it.Err(); err != nil {
		return false
	}
	return it.it.Seek(t)
}

// Next advances the wrapped iterator to the next sample that does not fall
// into one of the deleted intervals. Intervals that end before the current
// sample are dropped from the front of the list. It returns false when the
// wrapped iterator has no more samples.
func (it *deletedIterator) Next() bool {
	for it.it.Next() {
		ts, _ := it.it.At()

		deleted := false
		for _, tr := range it.intervals {
			if tr.InBounds(ts) {
				deleted = true
				break
			}
			if ts <= tr.Maxt {
				// The sample lies before this interval, so it is kept.
				break
			}
			// The sample lies past this interval; it is no longer needed.
			it.intervals = it.intervals[1:]
		}

		if !deleted {
			return true
		}
	}
	return false
}

// Err returns the error of the wrapped iterator.
func (it *deletedIterator) Err() error {
	return it.it.Err()
}