mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-19 09:42:48 -08:00
5bee0cfce2
The ChunkReader interface's Chunk() has been changed to ChunkOrIterable(). This is a precursor to OOO native histogram support - with OOO native histograms, the chunks.Meta passed to Chunk() can result in multiple chunks being returned rather than just a single chunk (e.g. if oooMergedChunk has a counter reset in the middle). To support this, ChunkOrIterable() requires either a single chunk or an iterable to be returned. If an iterable is returned, the caller has the responsibility of converting the samples from the iterable into possibly multiple chunks. The OOOHeadChunkReader now returns an iterable rather than a chunk to prepare for the native histograms case. Also as a beneficial side effect, oooMergedChunk and boundedChunk has been simplified as they only need to implement the Iterable interface now, not the full Chunk interface. --------- Signed-off-by: Fiona Liao <fiona.y.liao@gmail.com> Co-authored-by: George Krajcsovits <krajorama@users.noreply.github.com>
877 lines
26 KiB
Go
877 lines
26 KiB
Go
// Copyright 2020 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package storage
|
|
|
|
import (
|
|
"bytes"
|
|
"container/heap"
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"sync"
|
|
|
|
"golang.org/x/exp/slices"
|
|
|
|
"github.com/prometheus/prometheus/model/histogram"
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
|
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
|
"github.com/prometheus/prometheus/util/annotations"
|
|
)
|
|
|
|
type mergeGenericQuerier struct {
|
|
queriers []genericQuerier
|
|
|
|
// mergeFn is used when we see series from different queriers Selects with the same labels.
|
|
mergeFn genericSeriesMergeFunc
|
|
|
|
// TODO(bwplotka): Remove once remote queries are asynchronous. False by default.
|
|
concurrentSelect bool
|
|
}
|
|
|
|
// NewMergeQuerier returns a new Querier that merges results of given primary and secondary queriers.
|
|
// See NewFanout commentary to learn more about primary vs secondary differences.
|
|
//
|
|
// In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used.
|
|
func NewMergeQuerier(primaries, secondaries []Querier, mergeFn VerticalSeriesMergeFunc) Querier {
|
|
queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries))
|
|
for _, q := range primaries {
|
|
if _, ok := q.(noopQuerier); !ok && q != nil {
|
|
queriers = append(queriers, newGenericQuerierFrom(q))
|
|
}
|
|
}
|
|
for _, q := range secondaries {
|
|
if _, ok := q.(noopQuerier); !ok && q != nil {
|
|
queriers = append(queriers, newSecondaryQuerierFrom(q))
|
|
}
|
|
}
|
|
|
|
concurrentSelect := false
|
|
if len(secondaries) > 0 {
|
|
concurrentSelect = true
|
|
}
|
|
return &querierAdapter{&mergeGenericQuerier{
|
|
mergeFn: (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFn}).Merge,
|
|
queriers: queriers,
|
|
concurrentSelect: concurrentSelect,
|
|
}}
|
|
}
|
|
|
|
// NewMergeChunkQuerier returns a new Chunk Querier that merges results of given primary and secondary chunk queriers.
|
|
// See NewFanout commentary to learn more about primary vs secondary differences.
|
|
//
|
|
// In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used.
|
|
// TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
|
|
func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn VerticalChunkSeriesMergeFunc) ChunkQuerier {
|
|
queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries))
|
|
for _, q := range primaries {
|
|
if _, ok := q.(noopChunkQuerier); !ok && q != nil {
|
|
queriers = append(queriers, newGenericQuerierFromChunk(q))
|
|
}
|
|
}
|
|
for _, querier := range secondaries {
|
|
if _, ok := querier.(noopChunkQuerier); !ok && querier != nil {
|
|
queriers = append(queriers, newSecondaryQuerierFromChunk(querier))
|
|
}
|
|
}
|
|
|
|
concurrentSelect := false
|
|
if len(secondaries) > 0 {
|
|
concurrentSelect = true
|
|
}
|
|
return &chunkQuerierAdapter{&mergeGenericQuerier{
|
|
mergeFn: (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFn}).Merge,
|
|
queriers: queriers,
|
|
concurrentSelect: concurrentSelect,
|
|
}}
|
|
}
|
|
|
|
// Select returns a set of series that matches the given label matchers.
|
|
func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
|
|
if len(q.queriers) == 0 {
|
|
return noopGenericSeriesSet{}
|
|
}
|
|
if len(q.queriers) == 1 {
|
|
return q.queriers[0].Select(ctx, sortSeries, hints, matchers...)
|
|
}
|
|
|
|
seriesSets := make([]genericSeriesSet, 0, len(q.queriers))
|
|
if !q.concurrentSelect {
|
|
for _, querier := range q.queriers {
|
|
// We need to sort for merge to work.
|
|
seriesSets = append(seriesSets, querier.Select(ctx, true, hints, matchers...))
|
|
}
|
|
return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) {
|
|
s := newGenericMergeSeriesSet(seriesSets, q.mergeFn)
|
|
return s, s.Next()
|
|
}}
|
|
}
|
|
|
|
var (
|
|
wg sync.WaitGroup
|
|
seriesSetChan = make(chan genericSeriesSet)
|
|
)
|
|
// Schedule all Selects for all queriers we know about.
|
|
for _, querier := range q.queriers {
|
|
wg.Add(1)
|
|
go func(qr genericQuerier) {
|
|
defer wg.Done()
|
|
|
|
// We need to sort for NewMergeSeriesSet to work.
|
|
seriesSetChan <- qr.Select(ctx, true, hints, matchers...)
|
|
}(querier)
|
|
}
|
|
go func() {
|
|
wg.Wait()
|
|
close(seriesSetChan)
|
|
}()
|
|
|
|
for r := range seriesSetChan {
|
|
seriesSets = append(seriesSets, r)
|
|
}
|
|
return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) {
|
|
s := newGenericMergeSeriesSet(seriesSets, q.mergeFn)
|
|
return s, s.Next()
|
|
}}
|
|
}
|
|
|
|
type labelGenericQueriers []genericQuerier
|
|
|
|
func (l labelGenericQueriers) Len() int { return len(l) }
|
|
func (l labelGenericQueriers) Get(i int) LabelQuerier { return l[i] }
|
|
func (l labelGenericQueriers) SplitByHalf() (labelGenericQueriers, labelGenericQueriers) {
|
|
i := len(l) / 2
|
|
return l[:i], l[i:]
|
|
}
|
|
|
|
// LabelValues returns all potential values for a label name.
|
|
// If matchers are specified the returned result set is reduced
|
|
// to label values of metrics matching the matchers.
|
|
func (q *mergeGenericQuerier) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
|
res, ws, err := q.lvals(ctx, q.queriers, name, matchers...)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("LabelValues() from merge generic querier for label %s: %w", name, err)
|
|
}
|
|
return res, ws, nil
|
|
}
|
|
|
|
// lvals performs merge sort for LabelValues from multiple queriers.
|
|
func (q *mergeGenericQuerier) lvals(ctx context.Context, lq labelGenericQueriers, n string, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
|
if lq.Len() == 0 {
|
|
return nil, nil, nil
|
|
}
|
|
if lq.Len() == 1 {
|
|
return lq.Get(0).LabelValues(ctx, n, matchers...)
|
|
}
|
|
a, b := lq.SplitByHalf()
|
|
|
|
var ws annotations.Annotations
|
|
s1, w, err := q.lvals(ctx, a, n, matchers...)
|
|
ws.Merge(w)
|
|
if err != nil {
|
|
return nil, ws, err
|
|
}
|
|
s2, ws, err := q.lvals(ctx, b, n, matchers...)
|
|
ws.Merge(w)
|
|
if err != nil {
|
|
return nil, ws, err
|
|
}
|
|
return mergeStrings(s1, s2), ws, nil
|
|
}
|
|
|
|
func mergeStrings(a, b []string) []string {
|
|
maxl := len(a)
|
|
if len(b) > len(a) {
|
|
maxl = len(b)
|
|
}
|
|
res := make([]string, 0, maxl*10/9)
|
|
|
|
for len(a) > 0 && len(b) > 0 {
|
|
switch {
|
|
case a[0] == b[0]:
|
|
res = append(res, a[0])
|
|
a, b = a[1:], b[1:]
|
|
case a[0] < b[0]:
|
|
res = append(res, a[0])
|
|
a = a[1:]
|
|
default:
|
|
res = append(res, b[0])
|
|
b = b[1:]
|
|
}
|
|
}
|
|
|
|
// Append all remaining elements.
|
|
res = append(res, a...)
|
|
res = append(res, b...)
|
|
return res
|
|
}
|
|
|
|
// LabelNames returns all the unique label names present in all queriers in sorted order.
|
|
func (q *mergeGenericQuerier) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
|
var (
|
|
labelNamesMap = make(map[string]struct{})
|
|
warnings annotations.Annotations
|
|
)
|
|
for _, querier := range q.queriers {
|
|
names, wrn, err := querier.LabelNames(ctx, matchers...)
|
|
if wrn != nil {
|
|
// TODO(bwplotka): We could potentially wrap warnings.
|
|
warnings.Merge(wrn)
|
|
}
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("LabelNames() from merge generic querier: %w", err)
|
|
}
|
|
for _, name := range names {
|
|
labelNamesMap[name] = struct{}{}
|
|
}
|
|
}
|
|
if len(labelNamesMap) == 0 {
|
|
return nil, warnings, nil
|
|
}
|
|
|
|
labelNames := make([]string, 0, len(labelNamesMap))
|
|
for name := range labelNamesMap {
|
|
labelNames = append(labelNames, name)
|
|
}
|
|
slices.Sort(labelNames)
|
|
return labelNames, warnings, nil
|
|
}
|
|
|
|
// Close releases the resources of the generic querier.
|
|
func (q *mergeGenericQuerier) Close() error {
|
|
errs := tsdb_errors.NewMulti()
|
|
for _, querier := range q.queriers {
|
|
if err := querier.Close(); err != nil {
|
|
errs.Add(err)
|
|
}
|
|
}
|
|
return errs.Err()
|
|
}
|
|
|
|
// VerticalSeriesMergeFunc returns merged series implementation that merges series with same labels together.
|
|
// It has to handle time-overlapped series as well.
|
|
type VerticalSeriesMergeFunc func(...Series) Series
|
|
|
|
// NewMergeSeriesSet returns a new SeriesSet that merges many SeriesSets together.
|
|
func NewMergeSeriesSet(sets []SeriesSet, mergeFunc VerticalSeriesMergeFunc) SeriesSet {
|
|
genericSets := make([]genericSeriesSet, 0, len(sets))
|
|
for _, s := range sets {
|
|
genericSets = append(genericSets, &genericSeriesSetAdapter{s})
|
|
}
|
|
return &seriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFunc}).Merge)}
|
|
}
|
|
|
|
// VerticalChunkSeriesMergeFunc returns merged chunk series implementation that merges potentially time-overlapping
|
|
// chunk series with the same labels into single ChunkSeries.
|
|
//
|
|
// NOTE: It's up to implementation how series are vertically merged (if chunks are sorted, re-encoded etc).
|
|
type VerticalChunkSeriesMergeFunc func(...ChunkSeries) ChunkSeries
|
|
|
|
// NewMergeChunkSeriesSet returns a new ChunkSeriesSet that merges many SeriesSet together.
|
|
func NewMergeChunkSeriesSet(sets []ChunkSeriesSet, mergeFunc VerticalChunkSeriesMergeFunc) ChunkSeriesSet {
|
|
genericSets := make([]genericSeriesSet, 0, len(sets))
|
|
for _, s := range sets {
|
|
genericSets = append(genericSets, &genericChunkSeriesSetAdapter{s})
|
|
}
|
|
return &chunkSeriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFunc}).Merge)}
|
|
}
|
|
|
|
// genericMergeSeriesSet implements genericSeriesSet.
|
|
type genericMergeSeriesSet struct {
|
|
currentLabels labels.Labels
|
|
mergeFunc genericSeriesMergeFunc
|
|
|
|
heap genericSeriesSetHeap
|
|
sets []genericSeriesSet
|
|
currentSets []genericSeriesSet
|
|
}
|
|
|
|
// newGenericMergeSeriesSet returns a new genericSeriesSet that merges (and deduplicates)
|
|
// series returned by the series sets when iterating.
|
|
// Each series set must return its series in labels order, otherwise
|
|
// merged series set will be incorrect.
|
|
// Overlapped situations are merged using provided mergeFunc.
|
|
func newGenericMergeSeriesSet(sets []genericSeriesSet, mergeFunc genericSeriesMergeFunc) genericSeriesSet {
|
|
if len(sets) == 1 {
|
|
return sets[0]
|
|
}
|
|
|
|
// We are pre-advancing sets, so we can introspect the label of the
|
|
// series under the cursor.
|
|
var h genericSeriesSetHeap
|
|
for _, set := range sets {
|
|
if set == nil {
|
|
continue
|
|
}
|
|
if set.Next() {
|
|
heap.Push(&h, set)
|
|
}
|
|
if err := set.Err(); err != nil {
|
|
return errorOnlySeriesSet{err}
|
|
}
|
|
}
|
|
return &genericMergeSeriesSet{
|
|
mergeFunc: mergeFunc,
|
|
sets: sets,
|
|
heap: h,
|
|
}
|
|
}
|
|
|
|
func (c *genericMergeSeriesSet) Next() bool {
|
|
// Run in a loop because the "next" series sets may not be valid anymore.
|
|
// If, for the current label set, all the next series sets come from
|
|
// failed remote storage sources, we want to keep trying with the next label set.
|
|
for {
|
|
// Firstly advance all the current series sets. If any of them have run out,
|
|
// we can drop them, otherwise they should be inserted back into the heap.
|
|
for _, set := range c.currentSets {
|
|
if set.Next() {
|
|
heap.Push(&c.heap, set)
|
|
}
|
|
}
|
|
|
|
if len(c.heap) == 0 {
|
|
return false
|
|
}
|
|
|
|
// Now, pop items of the heap that have equal label sets.
|
|
c.currentSets = c.currentSets[:0]
|
|
c.currentLabels = c.heap[0].At().Labels()
|
|
for len(c.heap) > 0 && labels.Equal(c.currentLabels, c.heap[0].At().Labels()) {
|
|
set := heap.Pop(&c.heap).(genericSeriesSet)
|
|
c.currentSets = append(c.currentSets, set)
|
|
}
|
|
|
|
// As long as the current set contains at least 1 set,
|
|
// then it should return true.
|
|
if len(c.currentSets) != 0 {
|
|
break
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (c *genericMergeSeriesSet) At() Labels {
|
|
if len(c.currentSets) == 1 {
|
|
return c.currentSets[0].At()
|
|
}
|
|
series := make([]Labels, 0, len(c.currentSets))
|
|
for _, seriesSet := range c.currentSets {
|
|
series = append(series, seriesSet.At())
|
|
}
|
|
return c.mergeFunc(series...)
|
|
}
|
|
|
|
func (c *genericMergeSeriesSet) Err() error {
|
|
for _, set := range c.sets {
|
|
if err := set.Err(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *genericMergeSeriesSet) Warnings() annotations.Annotations {
|
|
var ws annotations.Annotations
|
|
for _, set := range c.sets {
|
|
ws.Merge(set.Warnings())
|
|
}
|
|
return ws
|
|
}
|
|
|
|
type genericSeriesSetHeap []genericSeriesSet
|
|
|
|
func (h genericSeriesSetHeap) Len() int { return len(h) }
|
|
func (h genericSeriesSetHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
|
|
|
func (h genericSeriesSetHeap) Less(i, j int) bool {
|
|
a, b := h[i].At().Labels(), h[j].At().Labels()
|
|
return labels.Compare(a, b) < 0
|
|
}
|
|
|
|
func (h *genericSeriesSetHeap) Push(x interface{}) {
|
|
*h = append(*h, x.(genericSeriesSet))
|
|
}
|
|
|
|
func (h *genericSeriesSetHeap) Pop() interface{} {
|
|
old := *h
|
|
n := len(old)
|
|
x := old[n-1]
|
|
*h = old[0 : n-1]
|
|
return x
|
|
}
|
|
|
|
// ChainedSeriesMerge returns single series from many same, potentially overlapping series by chaining samples together.
|
|
// If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same
|
|
// timestamp are dropped.
|
|
//
|
|
// This works the best with replicated series, where data from two series are exactly the same. This does not work well
|
|
// with "almost" the same data, e.g. from 2 Prometheus HA replicas. This is fine, since from the Prometheus perspective
|
|
// this never happens.
|
|
//
|
|
// It's optimized for non-overlap cases as well.
|
|
func ChainedSeriesMerge(series ...Series) Series {
|
|
if len(series) == 0 {
|
|
return nil
|
|
}
|
|
return &SeriesEntry{
|
|
Lset: series[0].Labels(),
|
|
SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator {
|
|
return ChainSampleIteratorFromSeries(it, series)
|
|
},
|
|
}
|
|
}
|
|
|
|
// chainSampleIterator is responsible to iterate over samples from different iterators of the same time series in timestamps
|
|
// order. If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same
|
|
// timestamp are dropped. It's optimized for non-overlap cases as well.
|
|
type chainSampleIterator struct {
|
|
iterators []chunkenc.Iterator
|
|
h samplesIteratorHeap
|
|
|
|
curr chunkenc.Iterator
|
|
lastT int64
|
|
|
|
// Whether the previous and the current sample are direct neighbors
|
|
// within the same base iterator.
|
|
consecutive bool
|
|
}
|
|
|
|
// Return a chainSampleIterator initialized for length entries, re-using the memory from it if possible.
|
|
func getChainSampleIterator(it chunkenc.Iterator, length int) *chainSampleIterator {
|
|
csi, ok := it.(*chainSampleIterator)
|
|
if !ok {
|
|
csi = &chainSampleIterator{}
|
|
}
|
|
if cap(csi.iterators) < length {
|
|
csi.iterators = make([]chunkenc.Iterator, length)
|
|
} else {
|
|
csi.iterators = csi.iterators[:length]
|
|
}
|
|
csi.h = nil
|
|
csi.lastT = math.MinInt64
|
|
return csi
|
|
}
|
|
|
|
func ChainSampleIteratorFromSeries(it chunkenc.Iterator, series []Series) chunkenc.Iterator {
|
|
csi := getChainSampleIterator(it, len(series))
|
|
for i, s := range series {
|
|
csi.iterators[i] = s.Iterator(csi.iterators[i])
|
|
}
|
|
return csi
|
|
}
|
|
|
|
func ChainSampleIteratorFromIterables(it chunkenc.Iterator, iterables []chunkenc.Iterable) chunkenc.Iterator {
|
|
csi := getChainSampleIterator(it, len(iterables))
|
|
for i, c := range iterables {
|
|
csi.iterators[i] = c.Iterator(csi.iterators[i])
|
|
}
|
|
return csi
|
|
}
|
|
|
|
func ChainSampleIteratorFromIterators(it chunkenc.Iterator, iterators []chunkenc.Iterator) chunkenc.Iterator {
|
|
csi := getChainSampleIterator(it, 0)
|
|
csi.iterators = iterators
|
|
return csi
|
|
}
|
|
|
|
func (c *chainSampleIterator) Seek(t int64) chunkenc.ValueType {
|
|
// No-op check.
|
|
if c.curr != nil && c.lastT >= t {
|
|
return c.curr.Seek(c.lastT)
|
|
}
|
|
// Don't bother to find out if the next sample is consecutive. Callers
|
|
// of Seek usually aren't interested anyway.
|
|
c.consecutive = false
|
|
c.h = samplesIteratorHeap{}
|
|
for _, iter := range c.iterators {
|
|
if iter.Seek(t) == chunkenc.ValNone {
|
|
if iter.Err() != nil {
|
|
// If any iterator is reporting an error, abort.
|
|
return chunkenc.ValNone
|
|
}
|
|
continue
|
|
}
|
|
heap.Push(&c.h, iter)
|
|
}
|
|
if len(c.h) > 0 {
|
|
c.curr = heap.Pop(&c.h).(chunkenc.Iterator)
|
|
c.lastT = c.curr.AtT()
|
|
return c.curr.Seek(c.lastT)
|
|
}
|
|
c.curr = nil
|
|
return chunkenc.ValNone
|
|
}
|
|
|
|
func (c *chainSampleIterator) At() (t int64, v float64) {
|
|
if c.curr == nil {
|
|
panic("chainSampleIterator.At called before first .Next or after .Next returned false.")
|
|
}
|
|
return c.curr.At()
|
|
}
|
|
|
|
func (c *chainSampleIterator) AtHistogram() (int64, *histogram.Histogram) {
|
|
if c.curr == nil {
|
|
panic("chainSampleIterator.AtHistogram called before first .Next or after .Next returned false.")
|
|
}
|
|
t, h := c.curr.AtHistogram()
|
|
// If the current sample is not consecutive with the previous one, we
|
|
// cannot be sure anymore about counter resets for counter histograms.
|
|
// TODO(beorn7): If a `NotCounterReset` sample is followed by a
|
|
// non-consecutive `CounterReset` sample, we could keep the hint as
|
|
// `CounterReset`. But then we needed to track the previous sample
|
|
// in more detail, which might not be worth it.
|
|
if !c.consecutive && h.CounterResetHint != histogram.GaugeType {
|
|
h.CounterResetHint = histogram.UnknownCounterReset
|
|
}
|
|
return t, h
|
|
}
|
|
|
|
func (c *chainSampleIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
|
|
if c.curr == nil {
|
|
panic("chainSampleIterator.AtFloatHistogram called before first .Next or after .Next returned false.")
|
|
}
|
|
t, fh := c.curr.AtFloatHistogram()
|
|
// If the current sample is not consecutive with the previous one, we
|
|
// cannot be sure anymore about counter resets for counter histograms.
|
|
// TODO(beorn7): If a `NotCounterReset` sample is followed by a
|
|
// non-consecutive `CounterReset` sample, we could keep the hint as
|
|
// `CounterReset`. But then we needed to track the previous sample
|
|
// in more detail, which might not be worth it.
|
|
if !c.consecutive && fh.CounterResetHint != histogram.GaugeType {
|
|
fh.CounterResetHint = histogram.UnknownCounterReset
|
|
}
|
|
return t, fh
|
|
}
|
|
|
|
func (c *chainSampleIterator) AtT() int64 {
|
|
if c.curr == nil {
|
|
panic("chainSampleIterator.AtT called before first .Next or after .Next returned false.")
|
|
}
|
|
return c.curr.AtT()
|
|
}
|
|
|
|
func (c *chainSampleIterator) Next() chunkenc.ValueType {
|
|
var (
|
|
currT int64
|
|
currValueType chunkenc.ValueType
|
|
iteratorChanged bool
|
|
)
|
|
if c.h == nil {
|
|
iteratorChanged = true
|
|
c.h = samplesIteratorHeap{}
|
|
// We call c.curr.Next() as the first thing below.
|
|
// So, we don't call Next() on it here.
|
|
c.curr = c.iterators[0]
|
|
for _, iter := range c.iterators[1:] {
|
|
if iter.Next() == chunkenc.ValNone {
|
|
if iter.Err() != nil {
|
|
// If any iterator is reporting an error, abort.
|
|
// If c.iterators[0] is reporting an error, we'll handle that below.
|
|
return chunkenc.ValNone
|
|
}
|
|
} else {
|
|
heap.Push(&c.h, iter)
|
|
}
|
|
}
|
|
}
|
|
|
|
if c.curr == nil {
|
|
return chunkenc.ValNone
|
|
}
|
|
|
|
for {
|
|
currValueType = c.curr.Next()
|
|
|
|
if currValueType == chunkenc.ValNone {
|
|
if c.curr.Err() != nil {
|
|
// Abort if we've hit an error.
|
|
return chunkenc.ValNone
|
|
}
|
|
|
|
if len(c.h) == 0 {
|
|
// No iterator left to iterate.
|
|
c.curr = nil
|
|
return chunkenc.ValNone
|
|
}
|
|
} else {
|
|
currT = c.curr.AtT()
|
|
if currT == c.lastT {
|
|
// Ignoring sample for the same timestamp.
|
|
continue
|
|
}
|
|
if len(c.h) == 0 {
|
|
// curr is the only iterator remaining,
|
|
// no need to check with the heap.
|
|
break
|
|
}
|
|
|
|
// Check current iterator with the top of the heap.
|
|
nextT := c.h[0].AtT()
|
|
if currT < nextT {
|
|
// Current iterator has smaller timestamp than the heap.
|
|
break
|
|
}
|
|
// Current iterator does not hold the smallest timestamp.
|
|
heap.Push(&c.h, c.curr)
|
|
}
|
|
|
|
c.curr = heap.Pop(&c.h).(chunkenc.Iterator)
|
|
iteratorChanged = true
|
|
currT = c.curr.AtT()
|
|
currValueType = c.curr.Seek(currT)
|
|
if currT != c.lastT {
|
|
break
|
|
}
|
|
}
|
|
|
|
c.consecutive = !iteratorChanged
|
|
c.lastT = currT
|
|
return currValueType
|
|
}
|
|
|
|
func (c *chainSampleIterator) Err() error {
|
|
errs := tsdb_errors.NewMulti()
|
|
for _, iter := range c.iterators {
|
|
errs.Add(iter.Err())
|
|
}
|
|
return errs.Err()
|
|
}
|
|
|
|
type samplesIteratorHeap []chunkenc.Iterator
|
|
|
|
func (h samplesIteratorHeap) Len() int { return len(h) }
|
|
func (h samplesIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
|
|
|
func (h samplesIteratorHeap) Less(i, j int) bool {
|
|
return h[i].AtT() < h[j].AtT()
|
|
}
|
|
|
|
func (h *samplesIteratorHeap) Push(x interface{}) {
|
|
*h = append(*h, x.(chunkenc.Iterator))
|
|
}
|
|
|
|
func (h *samplesIteratorHeap) Pop() interface{} {
|
|
old := *h
|
|
n := len(old)
|
|
x := old[n-1]
|
|
*h = old[0 : n-1]
|
|
return x
|
|
}
|
|
|
|
// NewCompactingChunkSeriesMerger returns VerticalChunkSeriesMergeFunc that merges the same chunk series into single chunk series.
|
|
// In case of the chunk overlaps, it compacts those into one or more time-ordered non-overlapping chunks with merged data.
|
|
// Samples from overlapped chunks are merged using series vertical merge func.
|
|
// It expects the same labels for each given series.
|
|
//
|
|
// NOTE: Use the returned merge function only when you see potentially overlapping series, as this introduces small a overhead
|
|
// to handle overlaps between series.
|
|
func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalChunkSeriesMergeFunc {
|
|
return func(series ...ChunkSeries) ChunkSeries {
|
|
if len(series) == 0 {
|
|
return nil
|
|
}
|
|
return &ChunkSeriesEntry{
|
|
Lset: series[0].Labels(),
|
|
ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
|
|
iterators := make([]chunks.Iterator, 0, len(series))
|
|
for _, s := range series {
|
|
iterators = append(iterators, s.Iterator(nil))
|
|
}
|
|
return &compactChunkIterator{
|
|
mergeFunc: mergeFunc,
|
|
iterators: iterators,
|
|
}
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
// compactChunkIterator is responsible to compact chunks from different iterators of the same time series into single chainSeries.
|
|
// If time-overlapping chunks are found, they are encoded and passed to series merge and encoded again into one bigger chunk.
|
|
// TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
|
|
type compactChunkIterator struct {
|
|
mergeFunc VerticalSeriesMergeFunc
|
|
iterators []chunks.Iterator
|
|
|
|
h chunkIteratorHeap
|
|
|
|
err error
|
|
curr chunks.Meta
|
|
}
|
|
|
|
func (c *compactChunkIterator) At() chunks.Meta {
|
|
return c.curr
|
|
}
|
|
|
|
func (c *compactChunkIterator) Next() bool {
|
|
if c.h == nil {
|
|
for _, iter := range c.iterators {
|
|
if iter.Next() {
|
|
heap.Push(&c.h, iter)
|
|
}
|
|
}
|
|
}
|
|
if len(c.h) == 0 {
|
|
return false
|
|
}
|
|
|
|
iter := heap.Pop(&c.h).(chunks.Iterator)
|
|
c.curr = iter.At()
|
|
if iter.Next() {
|
|
heap.Push(&c.h, iter)
|
|
}
|
|
|
|
var (
|
|
overlapping []Series
|
|
oMaxTime = c.curr.MaxTime
|
|
prev = c.curr
|
|
)
|
|
// Detect overlaps to compact. Be smart about it and deduplicate on the fly if chunks are identical.
|
|
for len(c.h) > 0 {
|
|
// Get the next oldest chunk by min, then max time.
|
|
next := c.h[0].At()
|
|
if next.MinTime > oMaxTime {
|
|
// No overlap with current one.
|
|
break
|
|
}
|
|
|
|
// Only do something if it is not a perfect duplicate.
|
|
if next.MinTime != prev.MinTime ||
|
|
next.MaxTime != prev.MaxTime ||
|
|
!bytes.Equal(next.Chunk.Bytes(), prev.Chunk.Bytes()) {
|
|
// We operate on same series, so labels do not matter here.
|
|
overlapping = append(overlapping, newChunkToSeriesDecoder(labels.EmptyLabels(), next))
|
|
if next.MaxTime > oMaxTime {
|
|
oMaxTime = next.MaxTime
|
|
}
|
|
prev = next
|
|
}
|
|
|
|
iter := heap.Pop(&c.h).(chunks.Iterator)
|
|
if iter.Next() {
|
|
heap.Push(&c.h, iter)
|
|
}
|
|
}
|
|
if len(overlapping) == 0 {
|
|
return true
|
|
}
|
|
|
|
// Add last as it's not yet included in overlap. We operate on same series, so labels does not matter here.
|
|
iter = NewSeriesToChunkEncoder(c.mergeFunc(append(overlapping, newChunkToSeriesDecoder(labels.EmptyLabels(), c.curr))...)).Iterator(nil)
|
|
if !iter.Next() {
|
|
if c.err = iter.Err(); c.err != nil {
|
|
return false
|
|
}
|
|
panic("unexpected seriesToChunkEncoder lack of iterations")
|
|
}
|
|
c.curr = iter.At()
|
|
if iter.Next() {
|
|
heap.Push(&c.h, iter)
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (c *compactChunkIterator) Err() error {
|
|
errs := tsdb_errors.NewMulti()
|
|
for _, iter := range c.iterators {
|
|
errs.Add(iter.Err())
|
|
}
|
|
errs.Add(c.err)
|
|
return errs.Err()
|
|
}
|
|
|
|
type chunkIteratorHeap []chunks.Iterator
|
|
|
|
func (h chunkIteratorHeap) Len() int { return len(h) }
|
|
func (h chunkIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
|
|
|
func (h chunkIteratorHeap) Less(i, j int) bool {
|
|
at := h[i].At()
|
|
bt := h[j].At()
|
|
if at.MinTime == bt.MinTime {
|
|
return at.MaxTime < bt.MaxTime
|
|
}
|
|
return at.MinTime < bt.MinTime
|
|
}
|
|
|
|
func (h *chunkIteratorHeap) Push(x interface{}) {
|
|
*h = append(*h, x.(chunks.Iterator))
|
|
}
|
|
|
|
func (h *chunkIteratorHeap) Pop() interface{} {
|
|
old := *h
|
|
n := len(old)
|
|
x := old[n-1]
|
|
*h = old[0 : n-1]
|
|
return x
|
|
}
|
|
|
|
// NewConcatenatingChunkSeriesMerger returns a VerticalChunkSeriesMergeFunc that simply concatenates the
|
|
// chunks from the series. The resultant stream of chunks for a series might be overlapping and unsorted.
|
|
func NewConcatenatingChunkSeriesMerger() VerticalChunkSeriesMergeFunc {
|
|
return func(series ...ChunkSeries) ChunkSeries {
|
|
if len(series) == 0 {
|
|
return nil
|
|
}
|
|
return &ChunkSeriesEntry{
|
|
Lset: series[0].Labels(),
|
|
ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
|
|
iterators := make([]chunks.Iterator, 0, len(series))
|
|
for _, s := range series {
|
|
iterators = append(iterators, s.Iterator(nil))
|
|
}
|
|
return &concatenatingChunkIterator{
|
|
iterators: iterators,
|
|
}
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
type concatenatingChunkIterator struct {
|
|
iterators []chunks.Iterator
|
|
idx int
|
|
|
|
curr chunks.Meta
|
|
}
|
|
|
|
func (c *concatenatingChunkIterator) At() chunks.Meta {
|
|
return c.curr
|
|
}
|
|
|
|
func (c *concatenatingChunkIterator) Next() bool {
|
|
if c.idx >= len(c.iterators) {
|
|
return false
|
|
}
|
|
if c.iterators[c.idx].Next() {
|
|
c.curr = c.iterators[c.idx].At()
|
|
return true
|
|
}
|
|
if c.iterators[c.idx].Err() != nil {
|
|
return false
|
|
}
|
|
c.idx++
|
|
return c.Next()
|
|
}
|
|
|
|
func (c *concatenatingChunkIterator) Err() error {
|
|
errs := tsdb_errors.NewMulti()
|
|
for _, iter := range c.iterators {
|
|
errs.Add(iter.Err())
|
|
}
|
|
return errs.Err()
|
|
}
|