mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 13:57:36 -08:00
b1ed4a0a66
* Push the matchers for LabelNames all the way into the index. NB This doesn't actually implement it in the index, just plumbs it through for now... Signed-off-by: Tom Wilkie <tom@grafana.com> * Hack it up. Does not work. Signed-off-by: Tom Wilkie <tom@grafana.com> * Revert changes I don't understand Can't see why do we need to hold a mutex on symbols, and the purpose of the LabelNamesFor method. Maybe I'll need to re-add this later. Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Implement LabelNamesFor This method provides the label names that appear in the postings provided. We do that deeper than the label values because we know beforehand that most of the label names we'll be the same across different postings, and we don't want to go down an up looking up the same symbols for all different series. Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Mutex on symbols should be unlocked However, I still don't understand why do we need a mutex here. Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Fix head.LabelNamesFor Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Implement mockIndex LabelNames with matchers Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Nitpick on slice initialisation Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Add tests for LabelNamesWithMatchers Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Fix the mutex mess on head.LabelValues/LabelNames I still don't see why we need to grab that unrelated mutex, but at least now we're grabbing it consistently Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Check error after iterating postings Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Use the error from posting when there was en error in postings Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Update storage/interface.go comment Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> Co-authored-by: Ganesh Vernekar 
<15064823+codesome@users.noreply.github.com> * Update tsdb/index/index.go comment Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> * Update tsdb/index/index.go wrapped error msg Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> * Update tsdb/index/index.go wrapped error msg Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> * Update tsdb/index/index.go warpped error msg Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> * Remove unneeded comment Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Add testcases for LabelNames w/matchers in api.go Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Use t.Cleanup() instead of defer in tests Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> Co-authored-by: Tom Wilkie <tom@grafana.com> Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
246 lines
6.7 KiB
Go
246 lines
6.7 KiB
Go
// Copyright 2020 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package storage_test
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
|
|
"github.com/pkg/errors"
|
|
"github.com/prometheus/common/model"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/prometheus/prometheus/pkg/labels"
|
|
"github.com/prometheus/prometheus/storage"
|
|
"github.com/prometheus/prometheus/util/teststorage"
|
|
)
|
|
|
|
func TestFanout_SelectSorted(t *testing.T) {
|
|
inputLabel := labels.FromStrings(model.MetricNameLabel, "a")
|
|
outputLabel := labels.FromStrings(model.MetricNameLabel, "a")
|
|
|
|
inputTotalSize := 0
|
|
ctx := context.Background()
|
|
|
|
priStorage := teststorage.New(t)
|
|
defer priStorage.Close()
|
|
app1 := priStorage.Appender(ctx)
|
|
app1.Append(0, inputLabel, 0, 0)
|
|
inputTotalSize++
|
|
app1.Append(0, inputLabel, 1000, 1)
|
|
inputTotalSize++
|
|
app1.Append(0, inputLabel, 2000, 2)
|
|
inputTotalSize++
|
|
err := app1.Commit()
|
|
require.NoError(t, err)
|
|
|
|
remoteStorage1 := teststorage.New(t)
|
|
defer remoteStorage1.Close()
|
|
app2 := remoteStorage1.Appender(ctx)
|
|
app2.Append(0, inputLabel, 3000, 3)
|
|
inputTotalSize++
|
|
app2.Append(0, inputLabel, 4000, 4)
|
|
inputTotalSize++
|
|
app2.Append(0, inputLabel, 5000, 5)
|
|
inputTotalSize++
|
|
err = app2.Commit()
|
|
require.NoError(t, err)
|
|
|
|
remoteStorage2 := teststorage.New(t)
|
|
defer remoteStorage2.Close()
|
|
|
|
app3 := remoteStorage2.Appender(ctx)
|
|
app3.Append(0, inputLabel, 6000, 6)
|
|
inputTotalSize++
|
|
app3.Append(0, inputLabel, 7000, 7)
|
|
inputTotalSize++
|
|
app3.Append(0, inputLabel, 8000, 8)
|
|
inputTotalSize++
|
|
|
|
err = app3.Commit()
|
|
require.NoError(t, err)
|
|
|
|
fanoutStorage := storage.NewFanout(nil, priStorage, remoteStorage1, remoteStorage2)
|
|
|
|
t.Run("querier", func(t *testing.T) {
|
|
querier, err := fanoutStorage.Querier(context.Background(), 0, 8000)
|
|
require.NoError(t, err)
|
|
defer querier.Close()
|
|
|
|
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a")
|
|
require.NoError(t, err)
|
|
|
|
seriesSet := querier.Select(true, nil, matcher)
|
|
|
|
result := make(map[int64]float64)
|
|
var labelsResult labels.Labels
|
|
for seriesSet.Next() {
|
|
series := seriesSet.At()
|
|
seriesLabels := series.Labels()
|
|
labelsResult = seriesLabels
|
|
iterator := series.Iterator()
|
|
for iterator.Next() {
|
|
timestamp, value := iterator.At()
|
|
result[timestamp] = value
|
|
}
|
|
}
|
|
|
|
require.Equal(t, labelsResult, outputLabel)
|
|
require.Equal(t, inputTotalSize, len(result))
|
|
})
|
|
t.Run("chunk querier", func(t *testing.T) {
|
|
querier, err := fanoutStorage.ChunkQuerier(ctx, 0, 8000)
|
|
require.NoError(t, err)
|
|
defer querier.Close()
|
|
|
|
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a")
|
|
require.NoError(t, err)
|
|
|
|
seriesSet := storage.NewSeriesSetFromChunkSeriesSet(querier.Select(true, nil, matcher))
|
|
|
|
result := make(map[int64]float64)
|
|
var labelsResult labels.Labels
|
|
for seriesSet.Next() {
|
|
series := seriesSet.At()
|
|
seriesLabels := series.Labels()
|
|
labelsResult = seriesLabels
|
|
iterator := series.Iterator()
|
|
for iterator.Next() {
|
|
timestamp, value := iterator.At()
|
|
result[timestamp] = value
|
|
}
|
|
}
|
|
|
|
require.NoError(t, seriesSet.Err())
|
|
require.Equal(t, labelsResult, outputLabel)
|
|
require.Equal(t, inputTotalSize, len(result))
|
|
})
|
|
}
|
|
|
|
func TestFanoutErrors(t *testing.T) {
|
|
workingStorage := teststorage.New(t)
|
|
defer workingStorage.Close()
|
|
|
|
cases := []struct {
|
|
primary storage.Storage
|
|
secondary storage.Storage
|
|
warning error
|
|
err error
|
|
}{
|
|
{
|
|
primary: workingStorage,
|
|
secondary: errStorage{},
|
|
warning: errSelect,
|
|
err: nil,
|
|
},
|
|
{
|
|
primary: errStorage{},
|
|
secondary: workingStorage,
|
|
warning: nil,
|
|
err: errSelect,
|
|
},
|
|
}
|
|
|
|
for _, tc := range cases {
|
|
fanoutStorage := storage.NewFanout(nil, tc.primary, tc.secondary)
|
|
|
|
t.Run("samples", func(t *testing.T) {
|
|
querier, err := fanoutStorage.Querier(context.Background(), 0, 8000)
|
|
require.NoError(t, err)
|
|
defer querier.Close()
|
|
|
|
matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "b")
|
|
ss := querier.Select(true, nil, matcher)
|
|
|
|
// Exhaust.
|
|
for ss.Next() {
|
|
ss.At()
|
|
}
|
|
|
|
if tc.err != nil {
|
|
require.Error(t, ss.Err())
|
|
require.Equal(t, tc.err.Error(), ss.Err().Error())
|
|
}
|
|
|
|
if tc.warning != nil {
|
|
require.Greater(t, len(ss.Warnings()), 0, "warnings expected")
|
|
require.Error(t, ss.Warnings()[0])
|
|
require.Equal(t, tc.warning.Error(), ss.Warnings()[0].Error())
|
|
}
|
|
})
|
|
t.Run("chunks", func(t *testing.T) {
|
|
t.Skip("enable once TestStorage and TSDB implements ChunkQuerier")
|
|
querier, err := fanoutStorage.ChunkQuerier(context.Background(), 0, 8000)
|
|
require.NoError(t, err)
|
|
defer querier.Close()
|
|
|
|
matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "b")
|
|
ss := querier.Select(true, nil, matcher)
|
|
|
|
// Exhaust.
|
|
for ss.Next() {
|
|
ss.At()
|
|
}
|
|
|
|
if tc.err != nil {
|
|
require.Error(t, ss.Err())
|
|
require.Equal(t, tc.err.Error(), ss.Err().Error())
|
|
}
|
|
|
|
if tc.warning != nil {
|
|
require.Greater(t, len(ss.Warnings()), 0, "warnings expected")
|
|
require.Error(t, ss.Warnings()[0])
|
|
require.Equal(t, tc.warning.Error(), ss.Warnings()[0].Error())
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// errSelect is the error carried by the series sets that errQuerier.Select and
// errChunkQuerier.Select return; TestFanoutErrors compares against its message.
var errSelect = errors.New("select error")
|
|
|
|
// errStorage is a stateless storage stub for tests: the queriers it hands out
// answer every Select with errSelect.
type errStorage struct{}
|
|
|
|
// errQuerier is a stateless querier stub whose Select, LabelValues and
// LabelNames methods all report an error.
type errQuerier struct{}
|
|
|
|
func (errStorage) Querier(_ context.Context, _, _ int64) (storage.Querier, error) {
|
|
return errQuerier{}, nil
|
|
}
|
|
|
|
// errChunkQuerier is the chunk-querier counterpart of errQuerier. It embeds
// errQuerier and so inherits that type's methods.
type errChunkQuerier struct {
	errQuerier
}
|
|
|
|
func (errStorage) ChunkQuerier(_ context.Context, _, _ int64) (storage.ChunkQuerier, error) {
|
|
return errChunkQuerier{}, nil
|
|
}
|
|
// Appender always returns nil; this stub provides no appender.
func (errStorage) Appender(_ context.Context) storage.Appender {
	return nil
}
|
|
// StartTime always reports 0 with a nil error.
func (errStorage) StartTime() (int64, error) {
	return 0, nil
}
|
|
// Close does nothing and returns nil.
func (errStorage) Close() error {
	return nil
}
|
|
|
|
func (errQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet {
|
|
return storage.ErrSeriesSet(errSelect)
|
|
}
|
|
|
|
func (errQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
|
|
return nil, nil, errors.New("label values error")
|
|
}
|
|
|
|
func (errQuerier) LabelNames(...*labels.Matcher) ([]string, storage.Warnings, error) {
|
|
return nil, nil, errors.New("label names error")
|
|
}
|
|
|
|
// Close does nothing and returns nil.
func (errQuerier) Close() error {
	return nil
}
|
|
|
|
func (errChunkQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) storage.ChunkSeriesSet {
|
|
return storage.ErrChunkSeriesSet(errSelect)
|
|
}
|