diff --git a/storage/fanout.go b/storage/fanout.go index 7a99d5f6ae..344200eb99 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -270,6 +270,7 @@ func (q *mergeQuerier) SelectSorted(params *SelectParams, matchers ...*labels.Ma priErr = qryResult.selectError } } + seriesSets = append(seriesSets, qryResult.set) } if priErr != nil { return nil, nil, priErr diff --git a/storage/fanout/fanout_test.go b/storage/fanout/fanout_test.go new file mode 100644 index 0000000000..20eded7c86 --- /dev/null +++ b/storage/fanout/fanout_test.go @@ -0,0 +1,100 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/teststorage" + "github.com/prometheus/prometheus/util/testutil" + + "testing" +) + +func TestSelectSorted(t *testing.T) { + + inputLabel := labels.FromStrings(model.MetricNameLabel, "a") + outputLabel := labels.FromStrings(model.MetricNameLabel, "a") + + inputTotalSize := 0 + + priStorage := teststorage.New(t) + defer priStorage.Close() + app1, _ := priStorage.Appender() + app1.Add(inputLabel, 0, 0) + inputTotalSize++ + app1.Add(inputLabel, 1000, 1) + inputTotalSize++ + app1.Add(inputLabel, 2000, 2) + inputTotalSize++ + err := app1.Commit() + testutil.Ok(t, err) + + remoteStorage1 := teststorage.New(t) + defer remoteStorage1.Close() + app2, _ := remoteStorage1.Appender() + app2.Add(inputLabel, 3000, 3) + inputTotalSize++ + app2.Add(inputLabel, 4000, 4) + inputTotalSize++ + app2.Add(inputLabel, 5000, 5) + inputTotalSize++ + err = app2.Commit() + testutil.Ok(t, err) + + remoteStorage2 := teststorage.New(t) + defer remoteStorage2.Close() + + app3, _ := remoteStorage2.Appender() + app3.Add(inputLabel, 6000, 6) + inputTotalSize++ + app3.Add(inputLabel, 7000, 7) + inputTotalSize++ + app3.Add(inputLabel, 8000, 8) + inputTotalSize++ + + err = app3.Commit() + testutil.Ok(t, err) + + fanoutStorage := storage.NewFanout(nil, priStorage, remoteStorage1, remoteStorage2) + + querier, err := fanoutStorage.Querier(context.Background(), 0, 8000) + testutil.Ok(t, err) + defer querier.Close() + + matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a") + testutil.Ok(t, err) + + seriesSet, _, err := querier.SelectSorted(nil, matcher) + testutil.Ok(t, err) + + result := make(map[int64]float64) + var labelsResult labels.Labels + for seriesSet.Next() { + series := seriesSet.At() + seriesLabels := series.Labels() + labelsResult = seriesLabels + iterator := series.Iterator() + for iterator.Next() { + timestamp, value := iterator.At() + result[timestamp] = value + } + } + + testutil.Equals(t, labelsResult, outputLabel) + testutil.Equals(t, inputTotalSize, len(result)) + +}