mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-10 07:34:04 -08:00
fdbc40a9ef
* expose NewChainSampleIterator func Signed-off-by: Ben Ye <ben.ye@bytedance.com> * add comment Signed-off-by: Ben Ye <ben.ye@bytedance.com> * update comments Signed-off-by: Ben Ye <ben.ye@bytedance.com>
718 lines
21 KiB
Go
718 lines
21 KiB
Go
// Copyright 2020 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package storage
|
|
|
|
import (
|
|
"bytes"
|
|
"container/heap"
|
|
"math"
|
|
"sort"
|
|
"sync"
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/prometheus/prometheus/pkg/labels"
|
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
|
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
|
)
|
|
|
|
type mergeGenericQuerier struct {
|
|
queriers []genericQuerier
|
|
|
|
// mergeFn is used when we see series from different queriers Selects with the same labels.
|
|
mergeFn genericSeriesMergeFunc
|
|
|
|
// TODO(bwplotka): Remove once remote queries are asynchronous. False by default.
|
|
concurrentSelect bool
|
|
}
|
|
|
|
// NewMergeQuerier returns a new Querier that merges results of given primary and secondary queriers.
|
|
// See NewFanout commentary to learn more about primary vs secondary differences.
|
|
//
|
|
// In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used.
|
|
func NewMergeQuerier(primaries []Querier, secondaries []Querier, mergeFn VerticalSeriesMergeFunc) Querier {
|
|
queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries))
|
|
for _, q := range primaries {
|
|
if _, ok := q.(noopQuerier); !ok && q != nil {
|
|
queriers = append(queriers, newGenericQuerierFrom(q))
|
|
}
|
|
}
|
|
for _, q := range secondaries {
|
|
if _, ok := q.(noopQuerier); !ok && q != nil {
|
|
queriers = append(queriers, newSecondaryQuerierFrom(q))
|
|
}
|
|
}
|
|
|
|
concurrentSelect := false
|
|
if len(secondaries) > 0 {
|
|
concurrentSelect = true
|
|
}
|
|
return &querierAdapter{&mergeGenericQuerier{
|
|
mergeFn: (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFn}).Merge,
|
|
queriers: queriers,
|
|
concurrentSelect: concurrentSelect,
|
|
}}
|
|
}
|
|
|
|
// NewMergeChunkQuerier returns a new Chunk Querier that merges results of given primary and secondary chunk queriers.
|
|
// See NewFanout commentary to learn more about primary vs secondary differences.
|
|
//
|
|
// In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used.
|
|
// TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
|
|
func NewMergeChunkQuerier(primaries []ChunkQuerier, secondaries []ChunkQuerier, mergeFn VerticalChunkSeriesMergeFunc) ChunkQuerier {
|
|
queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries))
|
|
for _, q := range primaries {
|
|
if _, ok := q.(noopChunkQuerier); !ok && q != nil {
|
|
queriers = append(queriers, newGenericQuerierFromChunk(q))
|
|
}
|
|
}
|
|
for _, querier := range secondaries {
|
|
if _, ok := querier.(noopChunkQuerier); !ok && querier != nil {
|
|
queriers = append(queriers, newSecondaryQuerierFromChunk(querier))
|
|
}
|
|
}
|
|
|
|
concurrentSelect := false
|
|
if len(secondaries) > 0 {
|
|
concurrentSelect = true
|
|
}
|
|
return &chunkQuerierAdapter{&mergeGenericQuerier{
|
|
mergeFn: (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFn}).Merge,
|
|
queriers: queriers,
|
|
concurrentSelect: concurrentSelect,
|
|
}}
|
|
}
|
|
|
|
// Select returns a set of series that matches the given label matchers.
|
|
func (q *mergeGenericQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
|
|
if len(q.queriers) == 0 {
|
|
return noopGenericSeriesSet{}
|
|
}
|
|
if len(q.queriers) == 1 {
|
|
return q.queriers[0].Select(sortSeries, hints, matchers...)
|
|
}
|
|
|
|
var seriesSets = make([]genericSeriesSet, 0, len(q.queriers))
|
|
if !q.concurrentSelect {
|
|
for _, querier := range q.queriers {
|
|
// We need to sort for merge to work.
|
|
seriesSets = append(seriesSets, querier.Select(true, hints, matchers...))
|
|
}
|
|
return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) {
|
|
s := newGenericMergeSeriesSet(seriesSets, q.mergeFn)
|
|
return s, s.Next()
|
|
}}
|
|
}
|
|
|
|
var (
|
|
wg sync.WaitGroup
|
|
seriesSetChan = make(chan genericSeriesSet)
|
|
)
|
|
// Schedule all Selects for all queriers we know about.
|
|
for _, querier := range q.queriers {
|
|
wg.Add(1)
|
|
go func(qr genericQuerier) {
|
|
defer wg.Done()
|
|
|
|
// We need to sort for NewMergeSeriesSet to work.
|
|
seriesSetChan <- qr.Select(true, hints, matchers...)
|
|
}(querier)
|
|
}
|
|
go func() {
|
|
wg.Wait()
|
|
close(seriesSetChan)
|
|
}()
|
|
|
|
for r := range seriesSetChan {
|
|
seriesSets = append(seriesSets, r)
|
|
}
|
|
return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) {
|
|
s := newGenericMergeSeriesSet(seriesSets, q.mergeFn)
|
|
return s, s.Next()
|
|
}}
|
|
}
|
|
|
|
type labelGenericQueriers []genericQuerier
|
|
|
|
func (l labelGenericQueriers) Len() int { return len(l) }
|
|
func (l labelGenericQueriers) Get(i int) LabelQuerier { return l[i] }
|
|
func (l labelGenericQueriers) SplitByHalf() (labelGenericQueriers, labelGenericQueriers) {
|
|
i := len(l) / 2
|
|
return l[:i], l[i:]
|
|
}
|
|
|
|
// LabelValues returns all potential values for a label name.
|
|
// If matchers are specified the returned result set is reduced
|
|
// to label values of metrics matching the matchers.
|
|
func (q *mergeGenericQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, Warnings, error) {
|
|
res, ws, err := q.lvals(q.queriers, name, matchers...)
|
|
if err != nil {
|
|
return nil, nil, errors.Wrapf(err, "LabelValues() from merge generic querier for label %s", name)
|
|
}
|
|
return res, ws, nil
|
|
}
|
|
|
|
// lvals performs merge sort for LabelValues from multiple queriers.
|
|
func (q *mergeGenericQuerier) lvals(lq labelGenericQueriers, n string, matchers ...*labels.Matcher) ([]string, Warnings, error) {
|
|
if lq.Len() == 0 {
|
|
return nil, nil, nil
|
|
}
|
|
if lq.Len() == 1 {
|
|
return lq.Get(0).LabelValues(n, matchers...)
|
|
}
|
|
a, b := lq.SplitByHalf()
|
|
|
|
var ws Warnings
|
|
s1, w, err := q.lvals(a, n, matchers...)
|
|
ws = append(ws, w...)
|
|
if err != nil {
|
|
return nil, ws, err
|
|
}
|
|
s2, ws, err := q.lvals(b, n, matchers...)
|
|
ws = append(ws, w...)
|
|
if err != nil {
|
|
return nil, ws, err
|
|
}
|
|
return mergeStrings(s1, s2), ws, nil
|
|
}
|
|
|
|
func mergeStrings(a, b []string) []string {
|
|
maxl := len(a)
|
|
if len(b) > len(a) {
|
|
maxl = len(b)
|
|
}
|
|
res := make([]string, 0, maxl*10/9)
|
|
|
|
for len(a) > 0 && len(b) > 0 {
|
|
if a[0] == b[0] {
|
|
res = append(res, a[0])
|
|
a, b = a[1:], b[1:]
|
|
} else if a[0] < b[0] {
|
|
res = append(res, a[0])
|
|
a = a[1:]
|
|
} else {
|
|
res = append(res, b[0])
|
|
b = b[1:]
|
|
}
|
|
}
|
|
|
|
// Append all remaining elements.
|
|
res = append(res, a...)
|
|
res = append(res, b...)
|
|
return res
|
|
}
|
|
|
|
// LabelNames returns all the unique label names present in all queriers in sorted order.
|
|
func (q *mergeGenericQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, Warnings, error) {
|
|
var (
|
|
labelNamesMap = make(map[string]struct{})
|
|
warnings Warnings
|
|
)
|
|
for _, querier := range q.queriers {
|
|
names, wrn, err := querier.LabelNames(matchers...)
|
|
if wrn != nil {
|
|
// TODO(bwplotka): We could potentially wrap warnings.
|
|
warnings = append(warnings, wrn...)
|
|
}
|
|
if err != nil {
|
|
return nil, nil, errors.Wrap(err, "LabelNames() from merge generic querier")
|
|
}
|
|
for _, name := range names {
|
|
labelNamesMap[name] = struct{}{}
|
|
}
|
|
}
|
|
if len(labelNamesMap) == 0 {
|
|
return nil, warnings, nil
|
|
}
|
|
|
|
labelNames := make([]string, 0, len(labelNamesMap))
|
|
for name := range labelNamesMap {
|
|
labelNames = append(labelNames, name)
|
|
}
|
|
sort.Strings(labelNames)
|
|
return labelNames, warnings, nil
|
|
}
|
|
|
|
// Close releases the resources of the generic querier.
|
|
func (q *mergeGenericQuerier) Close() error {
|
|
errs := tsdb_errors.NewMulti()
|
|
for _, querier := range q.queriers {
|
|
if err := querier.Close(); err != nil {
|
|
errs.Add(err)
|
|
}
|
|
}
|
|
return errs.Err()
|
|
}
|
|
|
|
// VerticalSeriesMergeFunc returns merged series implementation that merges series with same labels together.
|
|
// It has to handle time-overlapped series as well.
|
|
type VerticalSeriesMergeFunc func(...Series) Series
|
|
|
|
// NewMergeSeriesSet returns a new SeriesSet that merges many SeriesSets together.
|
|
func NewMergeSeriesSet(sets []SeriesSet, mergeFunc VerticalSeriesMergeFunc) SeriesSet {
|
|
genericSets := make([]genericSeriesSet, 0, len(sets))
|
|
for _, s := range sets {
|
|
genericSets = append(genericSets, &genericSeriesSetAdapter{s})
|
|
|
|
}
|
|
return &seriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFunc}).Merge)}
|
|
}
|
|
|
|
// VerticalChunkSeriesMergeFunc returns merged chunk series implementation that merges potentially time-overlapping
|
|
// chunk series with the same labels into single ChunkSeries.
|
|
//
|
|
// NOTE: It's up to implementation how series are vertically merged (if chunks are sorted, re-encoded etc).
|
|
type VerticalChunkSeriesMergeFunc func(...ChunkSeries) ChunkSeries
|
|
|
|
// NewMergeChunkSeriesSet returns a new ChunkSeriesSet that merges many SeriesSet together.
|
|
func NewMergeChunkSeriesSet(sets []ChunkSeriesSet, mergeFunc VerticalChunkSeriesMergeFunc) ChunkSeriesSet {
|
|
genericSets := make([]genericSeriesSet, 0, len(sets))
|
|
for _, s := range sets {
|
|
genericSets = append(genericSets, &genericChunkSeriesSetAdapter{s})
|
|
|
|
}
|
|
return &chunkSeriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFunc}).Merge)}
|
|
}
|
|
|
|
// genericMergeSeriesSet implements genericSeriesSet.
|
|
type genericMergeSeriesSet struct {
|
|
currentLabels labels.Labels
|
|
mergeFunc genericSeriesMergeFunc
|
|
|
|
heap genericSeriesSetHeap
|
|
sets []genericSeriesSet
|
|
currentSets []genericSeriesSet
|
|
}
|
|
|
|
// newGenericMergeSeriesSet returns a new genericSeriesSet that merges (and deduplicates)
|
|
// series returned by the series sets when iterating.
|
|
// Each series set must return its series in labels order, otherwise
|
|
// merged series set will be incorrect.
|
|
// Overlapped situations are merged using provided mergeFunc.
|
|
func newGenericMergeSeriesSet(sets []genericSeriesSet, mergeFunc genericSeriesMergeFunc) genericSeriesSet {
|
|
if len(sets) == 1 {
|
|
return sets[0]
|
|
}
|
|
|
|
// We are pre-advancing sets, so we can introspect the label of the
|
|
// series under the cursor.
|
|
var h genericSeriesSetHeap
|
|
for _, set := range sets {
|
|
if set == nil {
|
|
continue
|
|
}
|
|
if set.Next() {
|
|
heap.Push(&h, set)
|
|
}
|
|
if err := set.Err(); err != nil {
|
|
return errorOnlySeriesSet{err}
|
|
}
|
|
}
|
|
return &genericMergeSeriesSet{
|
|
mergeFunc: mergeFunc,
|
|
sets: sets,
|
|
heap: h,
|
|
}
|
|
}
|
|
|
|
func (c *genericMergeSeriesSet) Next() bool {
|
|
// Run in a loop because the "next" series sets may not be valid anymore.
|
|
// If, for the current label set, all the next series sets come from
|
|
// failed remote storage sources, we want to keep trying with the next label set.
|
|
for {
|
|
// Firstly advance all the current series sets. If any of them have run out,
|
|
// we can drop them, otherwise they should be inserted back into the heap.
|
|
for _, set := range c.currentSets {
|
|
if set.Next() {
|
|
heap.Push(&c.heap, set)
|
|
}
|
|
}
|
|
|
|
if len(c.heap) == 0 {
|
|
return false
|
|
}
|
|
|
|
// Now, pop items of the heap that have equal label sets.
|
|
c.currentSets = nil
|
|
c.currentLabels = c.heap[0].At().Labels()
|
|
for len(c.heap) > 0 && labels.Equal(c.currentLabels, c.heap[0].At().Labels()) {
|
|
set := heap.Pop(&c.heap).(genericSeriesSet)
|
|
c.currentSets = append(c.currentSets, set)
|
|
}
|
|
|
|
// As long as the current set contains at least 1 set,
|
|
// then it should return true.
|
|
if len(c.currentSets) != 0 {
|
|
break
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (c *genericMergeSeriesSet) At() Labels {
|
|
if len(c.currentSets) == 1 {
|
|
return c.currentSets[0].At()
|
|
}
|
|
series := make([]Labels, 0, len(c.currentSets))
|
|
for _, seriesSet := range c.currentSets {
|
|
series = append(series, seriesSet.At())
|
|
}
|
|
return c.mergeFunc(series...)
|
|
}
|
|
|
|
func (c *genericMergeSeriesSet) Err() error {
|
|
for _, set := range c.sets {
|
|
if err := set.Err(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *genericMergeSeriesSet) Warnings() Warnings {
|
|
var ws Warnings
|
|
for _, set := range c.sets {
|
|
ws = append(ws, set.Warnings()...)
|
|
}
|
|
return ws
|
|
}
|
|
|
|
type genericSeriesSetHeap []genericSeriesSet
|
|
|
|
func (h genericSeriesSetHeap) Len() int { return len(h) }
|
|
func (h genericSeriesSetHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
|
|
|
func (h genericSeriesSetHeap) Less(i, j int) bool {
|
|
a, b := h[i].At().Labels(), h[j].At().Labels()
|
|
return labels.Compare(a, b) < 0
|
|
}
|
|
|
|
func (h *genericSeriesSetHeap) Push(x interface{}) {
|
|
*h = append(*h, x.(genericSeriesSet))
|
|
}
|
|
|
|
func (h *genericSeriesSetHeap) Pop() interface{} {
|
|
old := *h
|
|
n := len(old)
|
|
x := old[n-1]
|
|
*h = old[0 : n-1]
|
|
return x
|
|
}
|
|
|
|
// ChainedSeriesMerge returns single series from many same, potentially overlapping series by chaining samples together.
|
|
// If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same
|
|
// timestamp are dropped.
|
|
//
|
|
// This works the best with replicated series, where data from two series are exactly the same. This does not work well
|
|
// with "almost" the same data, e.g. from 2 Prometheus HA replicas. This is fine, since from the Prometheus perspective
|
|
// this never happens.
|
|
//
|
|
// It's optimized for non-overlap cases as well.
|
|
func ChainedSeriesMerge(series ...Series) Series {
|
|
if len(series) == 0 {
|
|
return nil
|
|
}
|
|
return &SeriesEntry{
|
|
Lset: series[0].Labels(),
|
|
SampleIteratorFn: func() chunkenc.Iterator {
|
|
iterators := make([]chunkenc.Iterator, 0, len(series))
|
|
for _, s := range series {
|
|
iterators = append(iterators, s.Iterator())
|
|
}
|
|
return NewChainSampleIterator(iterators)
|
|
},
|
|
}
|
|
}
|
|
|
|
// chainSampleIterator is responsible to iterate over samples from different iterators of the same time series in timestamps
|
|
// order. If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same
|
|
// timestamp are dropped. It's optimized for non-overlap cases as well.
|
|
type chainSampleIterator struct {
|
|
iterators []chunkenc.Iterator
|
|
h samplesIteratorHeap
|
|
|
|
curr chunkenc.Iterator
|
|
lastt int64
|
|
}
|
|
|
|
// NewChainSampleIterator returns a single iterator that iterates over the samples from the given iterators in a sorted
|
|
// fashion. If samples overlap, one sample from overlapped ones is kept (randomly) and all others with the same
|
|
// timestamp are dropped.
|
|
func NewChainSampleIterator(iterators []chunkenc.Iterator) chunkenc.Iterator {
|
|
return &chainSampleIterator{
|
|
iterators: iterators,
|
|
h: nil,
|
|
lastt: math.MinInt64,
|
|
}
|
|
}
|
|
|
|
func (c *chainSampleIterator) Seek(t int64) bool {
|
|
c.h = samplesIteratorHeap{}
|
|
for _, iter := range c.iterators {
|
|
if iter.Seek(t) {
|
|
heap.Push(&c.h, iter)
|
|
}
|
|
}
|
|
if len(c.h) > 0 {
|
|
c.curr = heap.Pop(&c.h).(chunkenc.Iterator)
|
|
c.lastt, _ = c.curr.At()
|
|
return true
|
|
}
|
|
c.curr = nil
|
|
return false
|
|
}
|
|
|
|
func (c *chainSampleIterator) At() (t int64, v float64) {
|
|
if c.curr == nil {
|
|
panic("chainSampleIterator.At() called before first .Next() or after .Next() returned false.")
|
|
}
|
|
return c.curr.At()
|
|
}
|
|
|
|
func (c *chainSampleIterator) Next() bool {
|
|
if c.h == nil {
|
|
c.h = samplesIteratorHeap{}
|
|
// We call c.curr.Next() as the first thing below.
|
|
// So, we don't call Next() on it here.
|
|
c.curr = c.iterators[0]
|
|
for _, iter := range c.iterators[1:] {
|
|
if iter.Next() {
|
|
heap.Push(&c.h, iter)
|
|
}
|
|
}
|
|
}
|
|
|
|
if c.curr == nil {
|
|
return false
|
|
}
|
|
|
|
var currt int64
|
|
for {
|
|
if c.curr.Next() {
|
|
currt, _ = c.curr.At()
|
|
if currt == c.lastt {
|
|
// Ignoring sample for the same timestamp.
|
|
continue
|
|
}
|
|
if len(c.h) == 0 {
|
|
// curr is the only iterator remaining,
|
|
// no need to check with the heap.
|
|
break
|
|
}
|
|
|
|
// Check current iterator with the top of the heap.
|
|
if nextt, _ := c.h[0].At(); currt < nextt {
|
|
// Current iterator has smaller timestamp than the heap.
|
|
break
|
|
}
|
|
// Current iterator does not hold the smallest timestamp.
|
|
heap.Push(&c.h, c.curr)
|
|
} else if len(c.h) == 0 {
|
|
// No iterator left to iterate.
|
|
c.curr = nil
|
|
return false
|
|
}
|
|
|
|
c.curr = heap.Pop(&c.h).(chunkenc.Iterator)
|
|
currt, _ = c.curr.At()
|
|
if currt != c.lastt {
|
|
break
|
|
}
|
|
}
|
|
|
|
c.lastt = currt
|
|
return true
|
|
}
|
|
|
|
func (c *chainSampleIterator) Err() error {
|
|
errs := tsdb_errors.NewMulti()
|
|
for _, iter := range c.iterators {
|
|
errs.Add(iter.Err())
|
|
}
|
|
return errs.Err()
|
|
}
|
|
|
|
type samplesIteratorHeap []chunkenc.Iterator
|
|
|
|
func (h samplesIteratorHeap) Len() int { return len(h) }
|
|
func (h samplesIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
|
|
|
func (h samplesIteratorHeap) Less(i, j int) bool {
|
|
at, _ := h[i].At()
|
|
bt, _ := h[j].At()
|
|
return at < bt
|
|
}
|
|
|
|
func (h *samplesIteratorHeap) Push(x interface{}) {
|
|
*h = append(*h, x.(chunkenc.Iterator))
|
|
}
|
|
|
|
func (h *samplesIteratorHeap) Pop() interface{} {
|
|
old := *h
|
|
n := len(old)
|
|
x := old[n-1]
|
|
*h = old[0 : n-1]
|
|
return x
|
|
}
|
|
|
|
// NewCompactingChunkSeriesMerger returns VerticalChunkSeriesMergeFunc that merges the same chunk series into single chunk series.
|
|
// In case of the chunk overlaps, it compacts those into one or more time-ordered non-overlapping chunks with merged data.
|
|
// Samples from overlapped chunks are merged using series vertical merge func.
|
|
// It expects the same labels for each given series.
|
|
//
|
|
// NOTE: Use the returned merge function only when you see potentially overlapping series, as this introduces small a overhead
|
|
// to handle overlaps between series.
|
|
func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalChunkSeriesMergeFunc {
|
|
return func(series ...ChunkSeries) ChunkSeries {
|
|
if len(series) == 0 {
|
|
return nil
|
|
}
|
|
return &ChunkSeriesEntry{
|
|
Lset: series[0].Labels(),
|
|
ChunkIteratorFn: func() chunks.Iterator {
|
|
iterators := make([]chunks.Iterator, 0, len(series))
|
|
for _, s := range series {
|
|
iterators = append(iterators, s.Iterator())
|
|
}
|
|
return &compactChunkIterator{
|
|
mergeFunc: mergeFunc,
|
|
iterators: iterators,
|
|
}
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
// compactChunkIterator is responsible to compact chunks from different iterators of the same time series into single chainSeries.
|
|
// If time-overlapping chunks are found, they are encoded and passed to series merge and encoded again into one bigger chunk.
|
|
// TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
|
|
type compactChunkIterator struct {
|
|
mergeFunc VerticalSeriesMergeFunc
|
|
iterators []chunks.Iterator
|
|
|
|
h chunkIteratorHeap
|
|
|
|
err error
|
|
curr chunks.Meta
|
|
}
|
|
|
|
func (c *compactChunkIterator) At() chunks.Meta {
|
|
return c.curr
|
|
}
|
|
|
|
func (c *compactChunkIterator) Next() bool {
|
|
if c.h == nil {
|
|
for _, iter := range c.iterators {
|
|
if iter.Next() {
|
|
heap.Push(&c.h, iter)
|
|
}
|
|
}
|
|
}
|
|
if len(c.h) == 0 {
|
|
return false
|
|
}
|
|
|
|
iter := heap.Pop(&c.h).(chunks.Iterator)
|
|
c.curr = iter.At()
|
|
if iter.Next() {
|
|
heap.Push(&c.h, iter)
|
|
}
|
|
|
|
var (
|
|
overlapping []Series
|
|
oMaxTime = c.curr.MaxTime
|
|
prev = c.curr
|
|
)
|
|
// Detect overlaps to compact. Be smart about it and deduplicate on the fly if chunks are identical.
|
|
for len(c.h) > 0 {
|
|
// Get the next oldest chunk by min, then max time.
|
|
next := c.h[0].At()
|
|
if next.MinTime > oMaxTime {
|
|
// No overlap with current one.
|
|
break
|
|
}
|
|
|
|
if next.MinTime == prev.MinTime &&
|
|
next.MaxTime == prev.MaxTime &&
|
|
bytes.Equal(next.Chunk.Bytes(), prev.Chunk.Bytes()) {
|
|
// 1:1 duplicates, skip it.
|
|
} else {
|
|
// We operate on same series, so labels does not matter here.
|
|
overlapping = append(overlapping, newChunkToSeriesDecoder(nil, next))
|
|
if next.MaxTime > oMaxTime {
|
|
oMaxTime = next.MaxTime
|
|
}
|
|
prev = next
|
|
}
|
|
|
|
iter := heap.Pop(&c.h).(chunks.Iterator)
|
|
if iter.Next() {
|
|
heap.Push(&c.h, iter)
|
|
}
|
|
}
|
|
if len(overlapping) == 0 {
|
|
return true
|
|
}
|
|
|
|
// Add last as it's not yet included in overlap. We operate on same series, so labels does not matter here.
|
|
iter = NewSeriesToChunkEncoder(c.mergeFunc(append(overlapping, newChunkToSeriesDecoder(nil, c.curr))...)).Iterator()
|
|
if !iter.Next() {
|
|
if c.err = iter.Err(); c.err != nil {
|
|
return false
|
|
}
|
|
panic("unexpected seriesToChunkEncoder lack of iterations")
|
|
}
|
|
c.curr = iter.At()
|
|
if iter.Next() {
|
|
heap.Push(&c.h, iter)
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (c *compactChunkIterator) Err() error {
|
|
errs := tsdb_errors.NewMulti()
|
|
for _, iter := range c.iterators {
|
|
errs.Add(iter.Err())
|
|
}
|
|
errs.Add(c.err)
|
|
return errs.Err()
|
|
}
|
|
|
|
type chunkIteratorHeap []chunks.Iterator
|
|
|
|
func (h chunkIteratorHeap) Len() int { return len(h) }
|
|
func (h chunkIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
|
|
|
func (h chunkIteratorHeap) Less(i, j int) bool {
|
|
at := h[i].At()
|
|
bt := h[j].At()
|
|
if at.MinTime == bt.MinTime {
|
|
return at.MaxTime < bt.MaxTime
|
|
}
|
|
return at.MinTime < bt.MinTime
|
|
}
|
|
|
|
func (h *chunkIteratorHeap) Push(x interface{}) {
|
|
*h = append(*h, x.(chunks.Iterator))
|
|
}
|
|
|
|
func (h *chunkIteratorHeap) Pop() interface{} {
|
|
old := *h
|
|
n := len(old)
|
|
x := old[n-1]
|
|
*h = old[0 : n-1]
|
|
return x
|
|
}
|