Propagate errors through mergeSeriesSet correctly.

This commit is contained in:
Tom Wilkie 2017-07-13 15:02:01 +01:00
parent 2e0d8487e3
commit 994a7f27d6

View file

@ -246,7 +246,8 @@ func (q *mergeQuerier) Close() error {
type mergeSeriesSet struct {
currentLabels labels.Labels
currentSets []SeriesSet
sets seriesSetHeap
heap seriesSetHeap
sets []SeriesSet
}
func newMergeSeriesSet(sets []SeriesSet) SeriesSet {
@ -259,7 +260,8 @@ func newMergeSeriesSet(sets []SeriesSet) SeriesSet {
}
}
return &mergeSeriesSet{
sets: h,
heap: h,
sets: sets,
}
}
@ -268,18 +270,18 @@ func (c *mergeSeriesSet) Next() bool {
// we can drop them, otherwise they should be inserted back into the heap.
for _, set := range c.currentSets {
if set.Next() {
heap.Push(&c.sets, set)
heap.Push(&c.heap, set)
}
}
if len(c.sets) == 0 {
if len(c.heap) == 0 {
return false
}
// Now, pop items of the heap that have equal label sets.
c.currentSets = nil
c.currentLabels = c.sets[0].At().Labels()
for len(c.sets) > 0 && labels.Equal(c.currentLabels, c.sets[0].At().Labels()) {
set := heap.Pop(&c.sets).(SeriesSet)
c.currentLabels = c.heap[0].At().Labels()
for len(c.heap) > 0 && labels.Equal(c.currentLabels, c.heap[0].At().Labels()) {
set := heap.Pop(&c.heap).(SeriesSet)
c.currentSets = append(c.currentSets, set)
}
return true