diff --git a/storage/interface.go b/storage/interface.go index 2cbf86126..a422d8581 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -104,6 +104,13 @@ type dedupedSeriesSet struct { // DeduplicateSeriesSet merges two SeriesSet and removes duplicates. // If two series exist in both sets, their datapoints must be equal. func DeduplicateSeriesSet(a, b SeriesSet) SeriesSet { + if a == nil { + return b + } + if b == nil { + return a + } + s := &dedupedSeriesSet{a: a, b: b} s.adone = !s.a.Next() s.bdone = !s.b.Next() diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 06e6cc1dc..9cbcf8eeb 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -340,11 +340,7 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) { var set storage.SeriesSet for _, mset := range matcherSets { - if set == nil { - set = q.Select(mset...) - } else { - set = storage.DeduplicateSeriesSet(set, q.Select(mset...)) - } + set = storage.DeduplicateSeriesSet(set, q.Select(mset...)) } metrics := []labels.Labels{} diff --git a/web/federate.go b/web/federate.go index c6a463d1f..29221fdcc 100644 --- a/web/federate.go +++ b/web/federate.go @@ -69,42 +69,46 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { } defer q.Close() - // TODO(fabxc): expose merge functionality in storage interface. - // We just concatenate results for all sets of matchers, which may produce - // duplicated results. vec := make(promql.Vector, 0, 8000) + var set storage.SeriesSet + for _, mset := range matcherSets { - series := q.Select(mset...) - for series.Next() { - s := series.At() - // TODO(fabxc): allow fast path for most recent sample either - // in the storage itself or caching layer in Prometheus. - it := storage.NewBuffer(s.Iterator(), int64(promql.StalenessDelta/1e6)) + set = storage.DeduplicateSeriesSet(set, q.Select(mset...)) + } + if set == nil { + return + } - var t int64 - var v float64 + for set.Next() { + s := set.At() - ok := it.Seek(maxt) - if ok { - t, v = it.Values() - } else { - t, v, ok = it.PeekBack() - if !ok { - continue - } + // TODO(fabxc): allow fast path for most recent sample either + // in the storage itself or caching layer in Prometheus. + it := storage.NewBuffer(s.Iterator(), int64(promql.StalenessDelta/1e6)) + + var t int64 + var v float64 + + ok := it.Seek(maxt) + if ok { + t, v = it.Values() + } else { + t, v, ok = it.PeekBack() + if !ok { + continue } + } - vec = append(vec, promql.Sample{ - Metric: s.Labels(), - Point: promql.Point{T: t, V: v}, - }) - } - if series.Err() != nil { - federationErrors.Inc() - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + vec = append(vec, promql.Sample{ + Metric: s.Labels(), + Point: promql.Point{T: t, V: v}, + }) + } + if set.Err() != nil { + federationErrors.Inc() + http.Error(w, err.Error(), http.StatusInternalServerError) + return } sort.Sort(byName(vec))