prometheus/storage/remote/read.go
Oleg Zaytsev b1ed4a0a66
LabelNames API with matchers (#9083)
* Push the matchers for LabelNames all the way into the index.

NB This doesn't actually implement it in the index, just plumbs it through for now... (A usage sketch of the new call follows this entry.)

Signed-off-by: Tom Wilkie <tom@grafana.com>
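
As context for the plumbing above, here is a minimal, hypothetical sketch of how a caller could use the matcher-aware LabelNames call through the storage interfaces this change touches. The package name, function name and the job="prometheus" matcher are made up for illustration; whether the matchers are actually honoured depends on the underlying querier (the remote querier in the file below, for example, still returns "not implemented").

package example

import (
	"context"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage"
)

// labelNamesForJob returns the label names of series matching job="prometheus"
// in [mint, maxt], using only the storage.Queryable/storage.Querier interfaces.
func labelNamesForJob(ctx context.Context, q storage.Queryable, mint, maxt int64) ([]string, error) {
	querier, err := q.Querier(ctx, mint, maxt)
	if err != nil {
		return nil, err
	}
	defer querier.Close()

	matcher := labels.MustNewMatcher(labels.MatchEqual, "job", "prometheus")
	names, _, err := querier.LabelNames(matcher) // Warnings are ignored for brevity.
	return names, err
}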

* Hack it up.  Does not work.

Signed-off-by: Tom Wilkie <tom@grafana.com>

* Revert changes I don't understand

Can't see why we need to hold a mutex on symbols, or what the purpose
of the LabelNamesFor method is.

Maybe I'll need to re-add this later.

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>

* Implement LabelNamesFor

This method provides the label names that appear in the provided
postings. We do this deeper in the index than we do for label values
because we know beforehand that most of the label names will be the
same across different postings, and we don't want to go down and up
looking up the same symbols for all the different series. A rough
sketch of the idea follows this entry.

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
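
To make the idea above concrete, here is a rough, self-contained sketch. It uses a hypothetical series lookup returning plain strings instead of the real TSDB index types; the real implementation works on symbol references so repeated names only have to be resolved once.

package main

import (
	"fmt"
	"sort"
)

// labelNamesFor returns the sorted, de-duplicated label names that appear in
// the series referenced by the given postings. seriesLabels is a hypothetical
// stand-in for resolving a series reference to its label set.
func labelNamesFor(postings []uint64, seriesLabels func(ref uint64) map[string]string) []string {
	seen := map[string]struct{}{}
	for _, ref := range postings {
		for name := range seriesLabels(ref) {
			seen[name] = struct{}{}
		}
	}
	names := make([]string, 0, len(seen))
	for name := range seen {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

func main() {
	series := map[uint64]map[string]string{
		1: {"__name__": "up", "job": "prometheus"},
		2: {"__name__": "up", "job": "node", "instance": "localhost:9100"},
	}
	lookup := func(ref uint64) map[string]string { return series[ref] }
	fmt.Println(labelNamesFor([]uint64{1, 2}, lookup)) // [__name__ instance job]
}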

* Mutex on symbols should be unlocked

However, I still don't understand why we need a mutex here.

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>

* Fix head.LabelNamesFor

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>

* Implement mockIndex LabelNames with matchers

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>

* Nitpick on slice initialisation

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>

* Add tests for LabelNamesWithMatchers

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>

* Fix the mutex mess on head.LabelValues/LabelNames

I still don't see why we need to grab that unrelated mutex, but at least
now we're grabbing it consistently.

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>

* Check error after iterating postings

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>

* Use the error from postings when there was an error in postings

Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>

* Update storage/interface.go comment

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>

Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>

* Update tsdb/index/index.go comment

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>

Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>

* Update tsdb/index/index.go wrapped error msg

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>

Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>

* Update tsdb/index/index.go wrapped error msg

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>

Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>

* Update tsdb/index/index.go wrapped error msg

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>

Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>

* Remove unneeded comment

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>

* Add testcases for LabelNames w/matchers in api.go

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>

* Use t.Cleanup() instead of defer in tests

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>

Co-authored-by: Tom Wilkie <tom@grafana.com>
Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
2021-07-20 18:08:08 +05:30

// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package remote

import (
	"context"

	"github.com/pkg/errors"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage"
)

type sampleAndChunkQueryableClient struct {
	client           ReadClient
	externalLabels   labels.Labels
	requiredMatchers []*labels.Matcher
	readRecent       bool
	callback         startTimeCallback
}

// NewSampleAndChunkQueryableClient returns a storage.SampleAndChunkQueryable which queries the given client to select series sets.
func NewSampleAndChunkQueryableClient(
	c ReadClient,
	externalLabels labels.Labels,
	requiredMatchers []*labels.Matcher,
	readRecent bool,
	callback startTimeCallback,
) storage.SampleAndChunkQueryable {
	return &sampleAndChunkQueryableClient{
		client:           c,
		externalLabels:   externalLabels,
		requiredMatchers: requiredMatchers,
		readRecent:       readRecent,
		callback:         callback,
	}
}

func (c *sampleAndChunkQueryableClient) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
	q := &querier{
		ctx:              ctx,
		mint:             mint,
		maxt:             maxt,
		client:           c.client,
		externalLabels:   c.externalLabels,
		requiredMatchers: c.requiredMatchers,
	}
	if c.readRecent {
		return q, nil
	}

	var (
		noop bool
		err  error
	)
	q.maxt, noop, err = c.preferLocalStorage(mint, maxt)
	if err != nil {
		return nil, err
	}
	if noop {
		return storage.NoopQuerier(), nil
	}
	return q, nil
}

func (c *sampleAndChunkQueryableClient) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) {
	cq := &chunkQuerier{
		querier: querier{
			ctx:              ctx,
			mint:             mint,
			maxt:             maxt,
			client:           c.client,
			externalLabels:   c.externalLabels,
			requiredMatchers: c.requiredMatchers,
		},
	}
	if c.readRecent {
		return cq, nil
	}

	var (
		noop bool
		err  error
	)
	cq.querier.maxt, noop, err = c.preferLocalStorage(mint, maxt)
	if err != nil {
		return nil, err
	}
	if noop {
		return storage.NoopChunkedQuerier(), nil
	}
	return cq, nil
}

// preferLocalStorage returns noop if the requested time frame can be answered completely by the local TSDB, and
// reduces maxt if the time frame can be partially answered by the TSDB.
func (c *sampleAndChunkQueryableClient) preferLocalStorage(mint, maxt int64) (cmaxt int64, noop bool, err error) {
	localStartTime, err := c.callback()
	if err != nil {
		return 0, false, err
	}
	cmaxt = maxt

	// Avoid queries whose time range is later than the first timestamp in local DB.
	if mint > localStartTime {
		return 0, true, nil
	}
	// Query only samples older than the first timestamp in local DB.
	if maxt > localStartTime {
		cmaxt = localStartTime
	}
	return cmaxt, false, nil
}

type querier struct {
	ctx        context.Context
	mint, maxt int64
	client     ReadClient

	// Derived from configuration.
	externalLabels   labels.Labels
	requiredMatchers []*labels.Matcher
}

// Select implements storage.Querier and uses the given matchers to read series sets from the client.
// Select also adds equality matchers for all external labels to the list of matchers before calling the remote endpoint.
// The added external labels are removed from the returned series sets.
//
// If requiredMatchers are given, Select returns a NoopSeriesSet if the given matchers don't match the label set of the
// requiredMatchers. Otherwise it just calls the remote endpoint.
func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
	if len(q.requiredMatchers) > 0 {
		// Copy to not modify slice configured by user.
		requiredMatchers := append([]*labels.Matcher{}, q.requiredMatchers...)
		for _, m := range matchers {
			for i, r := range requiredMatchers {
				if m.Type == labels.MatchEqual && m.Name == r.Name && m.Value == r.Value {
					// Requirement matched.
					requiredMatchers = append(requiredMatchers[:i], requiredMatchers[i+1:]...)
					break
				}
			}
			if len(requiredMatchers) == 0 {
				break
			}
		}
		if len(requiredMatchers) > 0 {
			return storage.NoopSeriesSet()
		}
	}

	m, added := q.addExternalLabels(matchers)
	query, err := ToQuery(q.mint, q.maxt, m, hints)
	if err != nil {
		return storage.ErrSeriesSet(errors.Wrap(err, "toQuery"))
	}

	res, err := q.client.Read(q.ctx, query)
	if err != nil {
		return storage.ErrSeriesSet(errors.Wrap(err, "remote_read"))
	}
	return newSeriesSetFilter(FromQueryResult(sortSeries, res), added)
}

// addExternalLabels adds matchers for each external label. External labels
// that already have a corresponding user-supplied matcher are skipped, as we
// assume that the user explicitly wants to select a different value for them.
// We return the new set of matchers, along with the external labels for which
// matchers were added, so that these labels can later be removed from the
// result time series again.
func (q querier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, labels.Labels) {
	el := make(labels.Labels, len(q.externalLabels))
	copy(el, q.externalLabels)

	// ms won't be sorted, so have to O(n^2) the search.
	for _, m := range ms {
		for i := 0; i < len(el); {
			if el[i].Name == m.Name {
				el = el[:i+copy(el[i:], el[i+1:])]
				continue
			}
			i++
		}
	}

	for _, l := range el {
		m, err := labels.NewMatcher(labels.MatchEqual, l.Name, l.Value)
		if err != nil {
			panic(err)
		}
		ms = append(ms, m)
	}
	return ms, el
}

// LabelValues implements storage.Querier and is a noop.
func (q *querier) LabelValues(string, ...*labels.Matcher) ([]string, storage.Warnings, error) {
	// TODO: Implement: https://github.com/prometheus/prometheus/issues/3351
	return nil, nil, errors.New("not implemented")
}

// LabelNames implements storage.Querier and is a noop.
func (q *querier) LabelNames(...*labels.Matcher) ([]string, storage.Warnings, error) {
	// TODO: Implement: https://github.com/prometheus/prometheus/issues/3351
	return nil, nil, errors.New("not implemented")
}

// Close implements storage.Querier and is a noop.
func (q *querier) Close() error {
	return nil
}

// chunkQuerier is an adapter to make a client usable as a storage.ChunkQuerier.
type chunkQuerier struct {
	querier
}

// Select implements storage.ChunkQuerier and uses the given matchers to read chunk series sets from the client.
// It uses remote.querier.Select so it supports external labels and required matchers if specified.
func (q *chunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet {
	// TODO(bwplotka): Support remote read chunked and allow returning chunks directly (TODO ticket).
	return storage.NewSeriesSetToChunkSet(q.querier.Select(sortSeries, hints, matchers...))
}

func newSeriesSetFilter(ss storage.SeriesSet, toFilter labels.Labels) storage.SeriesSet {
	return &seriesSetFilter{
		SeriesSet: ss,
		toFilter:  toFilter,
	}
}

type seriesSetFilter struct {
	storage.SeriesSet
	toFilter labels.Labels
	querier  storage.Querier
}

func (ssf *seriesSetFilter) GetQuerier() storage.Querier {
	return ssf.querier
}

func (ssf *seriesSetFilter) SetQuerier(querier storage.Querier) {
	ssf.querier = querier
}

func (ssf seriesSetFilter) At() storage.Series {
	return seriesFilter{
		Series:   ssf.SeriesSet.At(),
		toFilter: ssf.toFilter,
	}
}

type seriesFilter struct {
	storage.Series
	toFilter labels.Labels
}

// Labels returns the series' labels with the filtered (external) labels removed.
// Both label sets are sorted, so a single merge-style pass suffices.
func (sf seriesFilter) Labels() labels.Labels {
	labels := sf.Series.Labels()
	for i, j := 0, 0; i < len(labels) && j < len(sf.toFilter); {
		if labels[i].Name < sf.toFilter[j].Name {
			i++
		} else if labels[i].Name > sf.toFilter[j].Name {
			j++
		} else {
			labels = labels[:i+copy(labels[i:], labels[i+1:])]
			j++
		}
	}
	return labels
}