web: deduplicate series in federation

This commit is contained in:
Fabian Reinartz 2017-04-04 11:13:46 +02:00
parent f56644e3ae
commit bbcf20ba01
3 changed files with 41 additions and 34 deletions

View file

@ -104,6 +104,13 @@ type dedupedSeriesSet struct {
// DeduplicateSeriesSet merges two SeriesSet and removes duplicates. // DeduplicateSeriesSet merges two SeriesSet and removes duplicates.
// If two series exist in both sets, their datapoints must be equal. // If two series exist in both sets, their datapoints must be equal.
func DeduplicateSeriesSet(a, b SeriesSet) SeriesSet { func DeduplicateSeriesSet(a, b SeriesSet) SeriesSet {
if a == nil {
return b
}
if b == nil {
return a
}
s := &dedupedSeriesSet{a: a, b: b} s := &dedupedSeriesSet{a: a, b: b}
s.adone = !s.a.Next() s.adone = !s.a.Next()
s.bdone = !s.b.Next() s.bdone = !s.b.Next()

View file

@ -340,12 +340,8 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) {
var set storage.SeriesSet var set storage.SeriesSet
for _, mset := range matcherSets { for _, mset := range matcherSets {
if set == nil {
set = q.Select(mset...)
} else {
set = storage.DeduplicateSeriesSet(set, q.Select(mset...)) set = storage.DeduplicateSeriesSet(set, q.Select(mset...))
} }
}
metrics := []labels.Labels{} metrics := []labels.Labels{}

View file

@ -69,15 +69,20 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
} }
defer q.Close() defer q.Close()
// TODO(fabxc): expose merge functionality in storage interface.
// We just concatenate results for all sets of matchers, which may produce
// duplicated results.
vec := make(promql.Vector, 0, 8000) vec := make(promql.Vector, 0, 8000)
var set storage.SeriesSet
for _, mset := range matcherSets { for _, mset := range matcherSets {
series := q.Select(mset...) set = storage.DeduplicateSeriesSet(set, q.Select(mset...))
for series.Next() { }
s := series.At() if set == nil {
return
}
for set.Next() {
s := set.At()
// TODO(fabxc): allow fast path for most recent sample either // TODO(fabxc): allow fast path for most recent sample either
// in the storage itself or caching layer in Prometheus. // in the storage itself or caching layer in Prometheus.
it := storage.NewBuffer(s.Iterator(), int64(promql.StalenessDelta/1e6)) it := storage.NewBuffer(s.Iterator(), int64(promql.StalenessDelta/1e6))
@ -100,12 +105,11 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
Point: promql.Point{T: t, V: v}, Point: promql.Point{T: t, V: v},
}) })
} }
if series.Err() != nil { if set.Err() != nil {
federationErrors.Inc() federationErrors.Inc()
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
}
sort.Sort(byName(vec)) sort.Sort(byName(vec))