// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tsdb

import (
	"math"
	"sort"
	"strings"
	"unicode/utf8"

	"github.com/pkg/errors"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/tsdb/chunks"
	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
	"github.com/prometheus/prometheus/tsdb/index"
	"github.com/prometheus/prometheus/tsdb/tombstones"
)

// Bitmap used by func isRegexMetaCharacter to check whether a character needs to be escaped.
var regexMetaCharacterBytes [16]byte

// isRegexMetaCharacter reports whether byte b needs to be escaped.
func isRegexMetaCharacter(b byte) bool {
	return b < utf8.RuneSelf && regexMetaCharacterBytes[b%16]&(1<<(b/16)) != 0
}

func init() {
	for _, b := range []byte(`.+*?()|[]{}^$`) {
		regexMetaCharacterBytes[b%16] |= 1 << (b / 16)
	}
}

type blockBaseQuerier struct {
	index      IndexReader
	chunks     ChunkReader
	tombstones tombstones.Reader

	closed bool

	mint, maxt int64
}

func newBlockBaseQuerier(b BlockReader, mint, maxt int64) (*blockBaseQuerier, error) {
	indexr, err := b.Index()
	if err != nil {
		return nil, errors.Wrap(err, "open index reader")
	}
	chunkr, err := b.Chunks()
	if err != nil {
		indexr.Close()
		return nil, errors.Wrap(err, "open chunk reader")
	}
	tombsr, err := b.Tombstones()
	if err != nil {
		indexr.Close()
		chunkr.Close()
		return nil, errors.Wrap(err, "open tombstone reader")
	}

	if tombsr == nil {
		tombsr = tombstones.NewMemTombstones()
	}
	return &blockBaseQuerier{
		mint:       mint,
		maxt:       maxt,
		index:      indexr,
		chunks:     chunkr,
		tombstones: tombsr,
	}, nil
}

func (q *blockBaseQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
	res, err := q.index.SortedLabelValues(name, matchers...)
	return res, nil, err
}

func (q *blockBaseQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
	res, err := q.index.LabelNames(matchers...)
	return res, nil, err
}

func (q *blockBaseQuerier) Close() error {
	if q.closed {
		return errors.New("block querier already closed")
	}

	errs := tsdb_errors.NewMulti(
		q.index.Close(),
		q.chunks.Close(),
		q.tombstones.Close(),
	)
	q.closed = true
	return errs.Err()
}

type blockQuerier struct {
	*blockBaseQuerier
}

// NewBlockQuerier returns a querier against the block reader and requested min and max time range.
func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) {
	q, err := newBlockBaseQuerier(b, mint, maxt)
	if err != nil {
		return nil, err
	}
	return &blockQuerier{blockBaseQuerier: q}, nil
}

func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
	mint := q.mint
	maxt := q.maxt
	disableTrimming := false

	p, err := PostingsForMatchers(q.index, ms...)
	if err != nil {
		return storage.ErrSeriesSet(err)
	}
	if sortSeries {
		p = q.index.SortedPostings(p)
	}

	if hints != nil {
		mint = hints.Start
		maxt = hints.End
		disableTrimming = hints.DisableTrimming
		if hints.Func == "series" {
			// When you're only looking up metadata (for example series API), you don't need to load any chunks.
			return newBlockSeriesSet(q.index, newNopChunkReader(), q.tombstones, p, mint, maxt, disableTrimming)
		}
	}

	return newBlockSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt, disableTrimming)
}

// blockChunkQuerier provides chunk querying access to a single block database.
type blockChunkQuerier struct {
	*blockBaseQuerier
}

// NewBlockChunkQuerier returns a chunk querier against the block reader and requested min and max time range.
func NewBlockChunkQuerier(b BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) {
	q, err := newBlockBaseQuerier(b, mint, maxt)
	if err != nil {
		return nil, err
	}
	return &blockChunkQuerier{blockBaseQuerier: q}, nil
}

func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet {
	mint := q.mint
	maxt := q.maxt
	disableTrimming := false
	if hints != nil {
		mint = hints.Start
		maxt = hints.End
		disableTrimming = hints.DisableTrimming
	}
	p, err := PostingsForMatchers(q.index, ms...)
	if err != nil {
		return storage.ErrChunkSeriesSet(err)
	}
	if sortSeries {
		p = q.index.SortedPostings(p)
	}
	return newBlockChunkSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt, disableTrimming)
}

func findSetMatches(pattern string) []string {
	// Return empty matches if the wrapper from Prometheus is missing.
	if len(pattern) < 6 || pattern[:4] != "^(?:" || pattern[len(pattern)-2:] != ")$" {
		return nil
	}
	escaped := false
	sets := []*strings.Builder{{}}
	for i := 4; i < len(pattern)-2; i++ {
		if escaped {
			switch {
			case isRegexMetaCharacter(pattern[i]):
				sets[len(sets)-1].WriteByte(pattern[i])
			case pattern[i] == '\\':
				sets[len(sets)-1].WriteByte('\\')
			default:
				return nil
			}
			escaped = false
		} else {
			switch {
			case isRegexMetaCharacter(pattern[i]):
				if pattern[i] == '|' {
					sets = append(sets, &strings.Builder{})
				} else {
					return nil
				}
			case pattern[i] == '\\':
				escaped = true
			default:
				sets[len(sets)-1].WriteByte(pattern[i])
			}
		}
	}
	matches := make([]string, 0, len(sets))
	for _, s := range sets {
		if s.Len() > 0 {
			matches = append(matches, s.String())
		}
	}
	return matches
}

// PostingsForMatchers assembles a single postings iterator against the index reader
// based on the given matchers. The resulting postings are not ordered by series.
func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, error) {
	var its, notIts []index.Postings
	// See which label must be non-empty.
	// Optimization for case like {l=~".", l!="1"}.
	labelMustBeSet := make(map[string]bool, len(ms))
	for _, m := range ms {
		if !m.Matches("") {
			labelMustBeSet[m.Name] = true
		}
	}

	for _, m := range ms {
		if labelMustBeSet[m.Name] {
			// If this matcher must be non-empty, we can be smarter.
			matchesEmpty := m.Matches("")
			isNot := m.Type == labels.MatchNotEqual || m.Type == labels.MatchNotRegexp
			if isNot && matchesEmpty { // l!="foo"
				// If the label can't be empty and is a Not and the inner matcher
				// doesn't match empty, then subtract it out at the end.
				inverse, err := m.Inverse()
				if err != nil {
					return nil, err
				}

				it, err := postingsForMatcher(ix, inverse)
				if err != nil {
					return nil, err
				}
				notIts = append(notIts, it)
			} else if isNot && !matchesEmpty { // l!=""
				// If the label can't be empty and is a Not, but the inner matcher can
				// be empty we need to use inversePostingsForMatcher.
				inverse, err := m.Inverse()
				if err != nil {
					return nil, err
				}

				it, err := inversePostingsForMatcher(ix, inverse)
				if err != nil {
					return nil, err
				}
				its = append(its, it)
			} else { // l="a"
				// Non-Not matcher, use normal postingsForMatcher.
				it, err := postingsForMatcher(ix, m)
				if err != nil {
					return nil, err
				}
				its = append(its, it)
			}
		} else { // l=""
			// If the matchers for a labelname selects an empty value, it selects all
			// the series which don't have the label name set too. See:
			// https://github.com/prometheus/prometheus/issues/3575 and
			// https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555
			it, err := inversePostingsForMatcher(ix, m)
			if err != nil {
				return nil, err
			}
			notIts = append(notIts, it)
		}
	}

	// If there's nothing to subtract from, add in everything and remove the notIts later.
	if len(its) == 0 && len(notIts) != 0 {
		k, v := index.AllPostingsKey()
		allPostings, err := ix.Postings(k, v)
		if err != nil {
			return nil, err
		}
		its = append(its, allPostings)
	}

	it := index.Intersect(its...)

	for _, n := range notIts {
		it = index.Without(it, n)
	}

	return it, nil
}

func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, error) {
	// This method will not return postings for missing labels.

	// Fast-path for equal matching.
	if m.Type == labels.MatchEqual {
		return ix.Postings(m.Name, m.Value)
	}

	// Fast-path for set matching.
	if m.Type == labels.MatchRegexp {
		setMatches := findSetMatches(m.GetRegexString())
		if len(setMatches) > 0 {
			sort.Strings(setMatches)
			return ix.Postings(m.Name, setMatches...)
		}
	}

	vals, err := ix.LabelValues(m.Name)
	if err != nil {
		return nil, err
	}

	var res []string
	lastVal, isSorted := "", true
	for _, val := range vals {
		if m.Matches(val) {
			res = append(res, val)
			if isSorted && val < lastVal {
				isSorted = false
			}
			lastVal = val
		}
	}

	if len(res) == 0 {
		return index.EmptyPostings(), nil
	}

	if !isSorted {
		sort.Strings(res)
	}
	return ix.Postings(m.Name, res...)
}

// inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher.
func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, error) {
	vals, err := ix.LabelValues(m.Name)
	if err != nil {
		return nil, err
	}

	var res []string
	lastVal, isSorted := "", true
	for _, val := range vals {
		if !m.Matches(val) {
			res = append(res, val)
			if isSorted && val < lastVal {
				isSorted = false
			}
			lastVal = val
		}
	}

	if !isSorted {
		sort.Strings(res)
	}
	return ix.Postings(m.Name, res...)
}

func labelValuesWithMatchers(r IndexReader, name string, matchers ...*labels.Matcher) ([]string, error) {
	// We're only interested in metrics which have the label <name>.
	requireLabel, err := labels.NewMatcher(labels.MatchNotEqual, name, "")
	if err != nil {
		return nil, errors.Wrapf(err, "Failed to instantiate label matcher")
	}

	var p index.Postings
	p, err = PostingsForMatchers(r, append(matchers, requireLabel)...)
	if err != nil {
		return nil, err
	}

	dedupe := map[string]interface{}{}
	for p.Next() {
		v, err := r.LabelValueFor(p.At(), name)
		if err != nil {
			if err == storage.ErrNotFound {
				continue
			}

			return nil, err
		}
		dedupe[v] = nil
	}

	if err = p.Err(); err != nil {
		return nil, err
	}

	values := make([]string, 0, len(dedupe))
	for value := range dedupe {
		values = append(values, value)
	}

	return values, nil
}

func labelNamesWithMatchers(r IndexReader, matchers ...*labels.Matcher) ([]string, error) {
	p, err := PostingsForMatchers(r, matchers...)
	if err != nil {
		return nil, err
	}

	var postings []storage.SeriesRef
	for p.Next() {
		postings = append(postings, p.At())
	}
	if p.Err() != nil {
		return nil, errors.Wrapf(p.Err(), "postings for label names with matchers")
	}

	return r.LabelNamesFor(postings...)
}

// blockBaseSeriesSet allows to iterate over all series in the single block.
// Iterated series are trimmed with given min and max time as well as tombstones.
// See newBlockSeriesSet and newBlockChunkSeriesSet to use it for either sample or chunk iterating.
type blockBaseSeriesSet struct {
	p               index.Postings
	index           IndexReader
	chunks          ChunkReader
	tombstones      tombstones.Reader
	mint, maxt      int64
	disableTrimming bool

	currIterFn func() *populateWithDelGenericSeriesIterator
	currLabels labels.Labels

	bufChks []chunks.Meta
	bufLbls labels.Labels
	err     error
}

func (b *blockBaseSeriesSet) Next() bool {
	for b.p.Next() {
		if err := b.index.Series(b.p.At(), &b.bufLbls, &b.bufChks); err != nil {
			// Postings may be stale. Skip if no underlying series exists.
			if errors.Cause(err) == storage.ErrNotFound {
				continue
			}
			b.err = errors.Wrapf(err, "get series %d", b.p.At())
			return false
		}

		if len(b.bufChks) == 0 {
			continue
		}

		intervals, err := b.tombstones.Get(b.p.At())
		if err != nil {
			b.err = errors.Wrap(err, "get tombstones")
			return false
		}

		// NOTE:
		// * block time range is half-open: [meta.MinTime, meta.MaxTime).
		// * chunks are both closed: [chk.MinTime, chk.MaxTime].
		// * requested time ranges are closed: [req.Start, req.End].

		var trimFront, trimBack bool

		// Copy chunks as iterables are reusable.
		chks := make([]chunks.Meta, 0, len(b.bufChks))

		// Prefilter chunks and pick those which are not entirely deleted or totally outside of the requested range.
		for _, chk := range b.bufChks {
			if chk.MaxTime < b.mint {
				continue
			}
			if chk.MinTime > b.maxt {
				continue
			}

			if !(tombstones.Interval{Mint: chk.MinTime, Maxt: chk.MaxTime}.IsSubrange(intervals)) {
				chks = append(chks, chk)
			}

			// If still not entirely deleted, check if trim is needed based on requested time range.
			if !b.disableTrimming {
				if chk.MinTime < b.mint {
					trimFront = true
				}
				if chk.MaxTime > b.maxt {
					trimBack = true
				}
			}
		}

		if len(chks) == 0 {
			continue
		}

		if trimFront {
			intervals = intervals.Add(tombstones.Interval{Mint: math.MinInt64, Maxt: b.mint - 1})
		}
		if trimBack {
			intervals = intervals.Add(tombstones.Interval{Mint: b.maxt + 1, Maxt: math.MaxInt64})
		}

		b.currLabels = make(labels.Labels, len(b.bufLbls))
		copy(b.currLabels, b.bufLbls)

		b.currIterFn = func() *populateWithDelGenericSeriesIterator {
			return newPopulateWithDelGenericSeriesIterator(b.chunks, chks, intervals)
		}
		return true
	}
	return false
}

func (b *blockBaseSeriesSet) Err() error {
	if b.err != nil {
		return b.err
	}
	return b.p.Err()
}

func (b *blockBaseSeriesSet) Warnings() storage.Warnings { return nil }

// populateWithDelGenericSeriesIterator allows to iterate over given chunk metas. In each iteration it ensures
// that chunks are trimmed based on given tombstones interval if any.
//
// populateWithDelGenericSeriesIterator assumes that chunks that would be fully removed by intervals are filtered out in previous phase.
//
// On each iteration currChkMeta is available. If currDelIter is not nil, it means that chunk iterator in currChkMeta
// is invalid and chunk rewrite is needed, currDelIter should be used.
type populateWithDelGenericSeriesIterator struct {
	chunks ChunkReader
	// chks are expected to be sorted by minTime and should be related to the same, single series.
	chks []chunks.Meta

	i         int
	err       error
	bufIter   *DeletedIterator
	intervals tombstones.Intervals

	currDelIter chunkenc.Iterator
	currChkMeta chunks.Meta
}

func newPopulateWithDelGenericSeriesIterator(
	chunks ChunkReader,
	chks []chunks.Meta,
	intervals tombstones.Intervals,
) *populateWithDelGenericSeriesIterator {
	return &populateWithDelGenericSeriesIterator{
		chunks:    chunks,
		chks:      chks,
		i:         -1,
		bufIter:   &DeletedIterator{},
		intervals: intervals,
	}
}

func (p *populateWithDelGenericSeriesIterator) next() bool {
	if p.err != nil || p.i >= len(p.chks)-1 {
		return false
	}

	p.i++
	p.currChkMeta = p.chks[p.i]

	p.currChkMeta.Chunk, p.err = p.chunks.Chunk(p.currChkMeta.Ref)
	if p.err != nil {
		p.err = errors.Wrapf(p.err, "cannot populate chunk %d", p.currChkMeta.Ref)
		return false
	}

	p.bufIter.Intervals = p.bufIter.Intervals[:0]
	for _, interval := range p.intervals {
		if p.currChkMeta.OverlapsClosedInterval(interval.Mint, interval.Maxt) {
			p.bufIter.Intervals = p.bufIter.Intervals.Add(interval)
		}
	}

	// Re-encode head chunks that are still open (being appended to) or
	// outside the compacted MaxTime range.
	// The chunk.Bytes() method is not safe for open chunks hence the re-encoding.
	// This happens when snapshotting the head block or just fetching chunks from TSDB.
	//
	// TODO think how to avoid the typecasting to verify when it is head block.
	_, isSafeChunk := p.currChkMeta.Chunk.(*safeChunk)
	if len(p.bufIter.Intervals) == 0 && !(isSafeChunk && p.currChkMeta.MaxTime == math.MaxInt64) {
		// If there are no overlap with deletion intervals AND it's NOT an "open" head chunk, we can take chunk as it is.
		p.currDelIter = nil
		return true
	}

	// We don't want full chunk or it's potentially still opened, take just part of it.
	p.bufIter.Iter = p.currChkMeta.Chunk.Iterator(nil)
	p.currDelIter = p.bufIter
	return true
}

func (p *populateWithDelGenericSeriesIterator) Err() error { return p.err }

func (p *populateWithDelGenericSeriesIterator) toSeriesIterator() chunkenc.Iterator {
	return &populateWithDelSeriesIterator{populateWithDelGenericSeriesIterator: p}
}

func (p *populateWithDelGenericSeriesIterator) toChunkSeriesIterator() chunks.Iterator {
	return &populateWithDelChunkSeriesIterator{populateWithDelGenericSeriesIterator: p}
}

// populateWithDelSeriesIterator allows to iterate over samples for the single series.
type populateWithDelSeriesIterator struct {
	*populateWithDelGenericSeriesIterator

	curr chunkenc.Iterator
}

func (p *populateWithDelSeriesIterator) Next() bool {
	if p.curr != nil && p.curr.Next() {
		return true
	}

	for p.next() {
		if p.currDelIter != nil {
			p.curr = p.currDelIter
		} else {
			p.curr = p.currChkMeta.Chunk.Iterator(nil)
		}
		if p.curr.Next() {
			return true
		}
	}
	return false
}

func (p *populateWithDelSeriesIterator) Seek(t int64) bool {
	if p.curr != nil && p.curr.Seek(t) {
		return true
	}
	for p.Next() {
		if p.curr.Seek(t) {
			return true
		}
	}
	return false
}

func (p *populateWithDelSeriesIterator) At() (int64, float64) { return p.curr.At() }

func (p *populateWithDelSeriesIterator) Err() error {
	if err := p.populateWithDelGenericSeriesIterator.Err(); err != nil {
		return err
	}
	if p.curr != nil {
		return p.curr.Err()
	}
	return nil
}

type populateWithDelChunkSeriesIterator struct {
	*populateWithDelGenericSeriesIterator

	curr chunks.Meta
}

func (p *populateWithDelChunkSeriesIterator) Next() bool {
	if !p.next() {
		return false
	}

	p.curr = p.currChkMeta
	if p.currDelIter == nil {
		return true
	}

	// Re-encode the chunk if iterator is provider. This means that it has some samples to be deleted or chunk is opened.
	newChunk := chunkenc.NewXORChunk()
	app, err := newChunk.Appender()
	if err != nil {
		p.err = err
		return false
	}

	if !p.currDelIter.Next() {
		if err := p.currDelIter.Err(); err != nil {
			p.err = errors.Wrap(err, "iterate chunk while re-encoding")
			return false
		}

		// Empty chunk, this should not happen, as we assume full deletions being filtered before this iterator.
		p.err = errors.Wrap(err, "populateWithDelChunkSeriesIterator: unexpected empty chunk found while rewriting chunk")
		return false
	}

	t, v := p.currDelIter.At()
	p.curr.MinTime = t
	app.Append(t, v)

	for p.currDelIter.Next() {
		t, v = p.currDelIter.At()
		app.Append(t, v)
	}
	if err := p.currDelIter.Err(); err != nil {
		p.err = errors.Wrap(err, "iterate chunk while re-encoding")
		return false
	}

	p.curr.Chunk = newChunk
	p.curr.MaxTime = t
	return true
}

func (p *populateWithDelChunkSeriesIterator) At() chunks.Meta { return p.curr }

// blockSeriesSet allows to iterate over sorted, populated series with applied tombstones.
// Series with all deleted chunks are still present as Series with no samples.
// Samples from chunks are also trimmed to requested min and max time.
type blockSeriesSet struct {
	blockBaseSeriesSet
}

func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64, disableTrimming bool) storage.SeriesSet {
	return &blockSeriesSet{
		blockBaseSeriesSet{
			index:           i,
			chunks:          c,
			tombstones:      t,
			p:               p,
			mint:            mint,
			maxt:            maxt,
			disableTrimming: disableTrimming,
			bufLbls:         make(labels.Labels, 0, 10),
		},
	}
}

func (b *blockSeriesSet) At() storage.Series {
	// At can be looped over before iterating, so save the current value locally.
	currIterFn := b.currIterFn
	return &storage.SeriesEntry{
		Lset: b.currLabels,
		SampleIteratorFn: func() chunkenc.Iterator {
			return currIterFn().toSeriesIterator()
		},
	}
}

// blockChunkSeriesSet allows to iterate over sorted, populated series with applied tombstones.
// Series with all deleted chunks are still present as Labelled iterator with no chunks.
// Chunks are also trimmed to requested [min and max] (keeping samples with min and max timestamps).
type blockChunkSeriesSet struct {
	blockBaseSeriesSet
}

func newBlockChunkSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64, disableTrimming bool) storage.ChunkSeriesSet {
	return &blockChunkSeriesSet{
		blockBaseSeriesSet{
			index:           i,
			chunks:          c,
			tombstones:      t,
			p:               p,
			mint:            mint,
			maxt:            maxt,
			disableTrimming: disableTrimming,
			bufLbls:         make(labels.Labels, 0, 10),
		},
	}
}

func (b *blockChunkSeriesSet) At() storage.ChunkSeries {
	// At can be looped over before iterating, so save the current value locally.
	currIterFn := b.currIterFn
	return &storage.ChunkSeriesEntry{
		Lset: b.currLabels,
		ChunkIteratorFn: func() chunks.Iterator {
			return currIterFn().toChunkSeriesIterator()
		},
	}
}

// NewMergedStringIter returns string iterator that allows to merge symbols on demand and stream result.
func NewMergedStringIter(a, b index.StringIter) index.StringIter {
	return &mergedStringIter{a: a, b: b, aok: a.Next(), bok: b.Next()}
}

type mergedStringIter struct {
	a        index.StringIter
	b        index.StringIter
	aok, bok bool
	cur      string
}

func (m *mergedStringIter) Next() bool {
	if (!m.aok && !m.bok) || (m.Err() != nil) {
		return false
	}

	if !m.aok {
		m.cur = m.b.At()
		m.bok = m.b.Next()
	} else if !m.bok {
		m.cur = m.a.At()
		m.aok = m.a.Next()
	} else if m.b.At() > m.a.At() {
		m.cur = m.a.At()
		m.aok = m.a.Next()
	} else if m.a.At() > m.b.At() {
		m.cur = m.b.At()
		m.bok = m.b.Next()
	} else { // Equal.
		m.cur = m.b.At()
		m.aok = m.a.Next()
		m.bok = m.b.Next()
	}

	return true
}
func (m mergedStringIter) At() string { return m.cur }
func (m mergedStringIter) Err() error {
	if m.a.Err() != nil {
		return m.a.Err()
	}
	return m.b.Err()
}

// DeletedIterator wraps chunk Iterator and makes sure any deleted metrics are not returned.
type DeletedIterator struct {
	// Iter is an Iterator to be wrapped.
	Iter chunkenc.Iterator
	// Intervals are the deletion intervals.
	Intervals tombstones.Intervals
}

func (it *DeletedIterator) At() (int64, float64) {
	return it.Iter.At()
}

func (it *DeletedIterator) Seek(t int64) bool {
	if it.Iter.Err() != nil {
		return false
	}
	if ok := it.Iter.Seek(t); !ok {
		return false
	}

	// Now double check if the entry falls into a deleted interval.
	ts, _ := it.At()
	for _, itv := range it.Intervals {
		if ts < itv.Mint {
			return true
		}

		if ts > itv.Maxt {
			it.Intervals = it.Intervals[1:]
			continue
		}

		// We're in the middle of an interval, we can now call Next().
		return it.Next()
	}

	// The timestamp is greater than all the deleted intervals.
	return true
}

func (it *DeletedIterator) Next() bool {
Outer:
	for it.Iter.Next() {
		ts, _ := it.Iter.At()

		for _, tr := range it.Intervals {
			if tr.InBounds(ts) {
				continue Outer
			}

			if ts <= tr.Maxt {
				return true
			}
			it.Intervals = it.Intervals[1:]
		}
		return true
	}
	return false
}

func (it *DeletedIterator) Err() error { return it.Iter.Err() }

type nopChunkReader struct {
	emptyChunk chunkenc.Chunk
}

func newNopChunkReader() ChunkReader {
	return nopChunkReader{
		emptyChunk: chunkenc.NewXORChunk(),
	}
}

func (cr nopChunkReader) Chunk(ref chunks.ChunkRef) (chunkenc.Chunk, error) {
	return cr.emptyChunk, nil
}

func (cr nopChunkReader) Close() error { return nil }