Revert "Fixed iterator regression: Avoid using heap for each sample when iterating."

This reverts commit 2c8b2c5915.
This commit is contained in:
Bartlomiej Plotka 2020-09-04 17:10:42 +01:00
parent 2c8b2c5915
commit a399227a9f

View file

@ -16,7 +16,6 @@ package storage
import ( import (
"bytes" "bytes"
"container/heap" "container/heap"
"fmt"
"sort" "sort"
"strings" "strings"
"sync" "sync"
@ -419,7 +418,8 @@ func (h *genericSeriesSetHeap) Pop() interface{} {
// with "almost" the same data, e.g. from 2 Prometheus HA replicas. This is fine, since from the Prometheus perspective // with "almost" the same data, e.g. from 2 Prometheus HA replicas. This is fine, since from the Prometheus perspective
// this never happens. // this never happens.
// //
// It's optimized for non-overlap cases as well. // NOTE: Use this merge function only when you see potentially overlapping series, as this introduces a small overhead
// to handle overlaps between series.
func ChainedSeriesMerge(series ...Series) Series { func ChainedSeriesMerge(series ...Series) Series {
if len(series) == 0 { if len(series) == 0 {
return nil return nil
@ -438,12 +438,10 @@ func ChainedSeriesMerge(series ...Series) Series {
// chainSampleIterator is responsible to iterate over samples from different iterators of the same time series in timestamps // chainSampleIterator is responsible to iterate over samples from different iterators of the same time series in timestamps
// order. If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same // order. If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same
// timestamp are dropped. It's optimized for non-overlap cases as well. // timestamp are dropped.
type chainSampleIterator struct { type chainSampleIterator struct {
iterators []chunkenc.Iterator iterators []chunkenc.Iterator
h samplesIteratorHeap h samplesIteratorHeap
curr chunkenc.Iterator
} }
func newChainSampleIterator(iterators []chunkenc.Iterator) chunkenc.Iterator { func newChainSampleIterator(iterators []chunkenc.Iterator) chunkenc.Iterator {
@ -464,42 +462,43 @@ func (c *chainSampleIterator) Seek(t int64) bool {
} }
func (c *chainSampleIterator) At() (t int64, v float64) { func (c *chainSampleIterator) At() (t int64, v float64) {
if c.h == nil { if len(c.h) == 0 {
panic("chainSampleIterator.At() called after .Next() returned false.") panic("chainSampleIterator.At() called after .Next() returned false.")
} }
return c.curr.At()
return c.h[0].At()
} }
func (c *chainSampleIterator) Next() bool { func (c *chainSampleIterator) Next() bool {
if c.h == nil { if c.h == nil {
c.curr = c.iterators[0] for _, iter := range c.iterators {
for _, iter := range c.iterators[1:] {
if iter.Next() { if iter.Next() {
heap.Push(&c.h, iter) heap.Push(&c.h, iter)
} }
} }
} else if len(c.h) == 0 {
return c.curr != nil && c.curr.Next() return len(c.h) > 0
} }
for { if len(c.h) == 0 {
if c.curr.Next() { return false
currt, _ := c.curr.At() }
nextt, _ := c.h[0].At()
if currt < nextt { currt, _ := c.At()
return true for len(c.h) > 0 {
} nextt, _ := c.h[0].At()
if currt == nextt { // All but one of the overlapping samples will be dropped.
fmt.Println("same ts", currt, nextt) if nextt != currt {
// Ignoring sample. break
continue
}
heap.Push(&c.h, c.curr)
} }
c.curr = heap.Pop(&c.h).(chunkenc.Iterator) iter := heap.Pop(&c.h).(chunkenc.Iterator)
return true if iter.Next() {
heap.Push(&c.h, iter)
}
} }
return len(c.h) > 0
} }
func (c *chainSampleIterator) Err() error { func (c *chainSampleIterator) Err() error {