mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-28 14:12:10 -08:00
b1ed4a0a66
* Push the matchers for LabelNames all the way into the index. NB This doesn't actually implement it in the index, just plumbs it through for now... Signed-off-by: Tom Wilkie <tom@grafana.com> * Hack it up. Does not work. Signed-off-by: Tom Wilkie <tom@grafana.com> * Revert changes I don't understand Can't see why do we need to hold a mutex on symbols, and the purpose of the LabelNamesFor method. Maybe I'll need to re-add this later. Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Implement LabelNamesFor This method provides the label names that appear in the postings provided. We do that deeper than the label values because we know beforehand that most of the label names we'll be the same across different postings, and we don't want to go down an up looking up the same symbols for all different series. Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Mutex on symbols should be unlocked However, I still don't understand why do we need a mutex here. Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Fix head.LabelNamesFor Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Implement mockIndex LabelNames with matchers Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Nitpick on slice initialisation Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Add tests for LabelNamesWithMatchers Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Fix the mutex mess on head.LabelValues/LabelNames I still don't see why we need to grab that unrelated mutex, but at least now we're grabbing it consistently Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Check error after iterating postings Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Use the error from posting when there was en error in postings Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Update storage/interface.go comment Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> * Update tsdb/index/index.go comment Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> * Update tsdb/index/index.go wrapped error msg Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> * Update tsdb/index/index.go wrapped error msg Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> * Update tsdb/index/index.go warpped error msg Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> * Remove unneeded comment Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Add testcases for LabelNames w/matchers in api.go Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Use t.Cleanup() instead of defer in tests Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> Co-authored-by: Tom Wilkie <tom@grafana.com> Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
284 lines
8.1 KiB
Go
284 lines
8.1 KiB
Go
// Copyright 2017 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package remote
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/prometheus/prometheus/pkg/labels"
|
|
"github.com/prometheus/prometheus/storage"
|
|
)
|
|
|
|
type sampleAndChunkQueryableClient struct {
|
|
client ReadClient
|
|
externalLabels labels.Labels
|
|
requiredMatchers []*labels.Matcher
|
|
readRecent bool
|
|
callback startTimeCallback
|
|
}
|
|
|
|
// NewSampleAndChunkQueryableClient returns a storage.SampleAndChunkQueryable which queries the given client to select series sets.
|
|
func NewSampleAndChunkQueryableClient(
|
|
c ReadClient,
|
|
externalLabels labels.Labels,
|
|
requiredMatchers []*labels.Matcher,
|
|
readRecent bool,
|
|
callback startTimeCallback,
|
|
) storage.SampleAndChunkQueryable {
|
|
return &sampleAndChunkQueryableClient{
|
|
client: c,
|
|
|
|
externalLabels: externalLabels,
|
|
requiredMatchers: requiredMatchers,
|
|
readRecent: readRecent,
|
|
callback: callback,
|
|
}
|
|
}
|
|
|
|
func (c *sampleAndChunkQueryableClient) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
|
|
q := &querier{
|
|
ctx: ctx,
|
|
mint: mint,
|
|
maxt: maxt,
|
|
client: c.client,
|
|
externalLabels: c.externalLabels,
|
|
requiredMatchers: c.requiredMatchers,
|
|
}
|
|
if c.readRecent {
|
|
return q, nil
|
|
}
|
|
|
|
var (
|
|
noop bool
|
|
err error
|
|
)
|
|
q.maxt, noop, err = c.preferLocalStorage(mint, maxt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if noop {
|
|
return storage.NoopQuerier(), nil
|
|
}
|
|
return q, nil
|
|
}
|
|
|
|
func (c *sampleAndChunkQueryableClient) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) {
|
|
cq := &chunkQuerier{
|
|
querier: querier{
|
|
ctx: ctx,
|
|
mint: mint,
|
|
maxt: maxt,
|
|
client: c.client,
|
|
externalLabels: c.externalLabels,
|
|
requiredMatchers: c.requiredMatchers,
|
|
},
|
|
}
|
|
if c.readRecent {
|
|
return cq, nil
|
|
}
|
|
|
|
var (
|
|
noop bool
|
|
err error
|
|
)
|
|
cq.querier.maxt, noop, err = c.preferLocalStorage(mint, maxt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if noop {
|
|
return storage.NoopChunkedQuerier(), nil
|
|
}
|
|
return cq, nil
|
|
}
|
|
|
|
// preferLocalStorage returns noop if requested timeframe can be answered completely by the local TSDB, and
|
|
// reduces maxt if the timeframe can be partially answered by TSDB.
|
|
func (c *sampleAndChunkQueryableClient) preferLocalStorage(mint, maxt int64) (cmaxt int64, noop bool, err error) {
|
|
localStartTime, err := c.callback()
|
|
if err != nil {
|
|
return 0, false, err
|
|
}
|
|
cmaxt = maxt
|
|
|
|
// Avoid queries whose time range is later than the first timestamp in local DB.
|
|
if mint > localStartTime {
|
|
return 0, true, nil
|
|
}
|
|
// Query only samples older than the first timestamp in local DB.
|
|
if maxt > localStartTime {
|
|
cmaxt = localStartTime
|
|
}
|
|
return cmaxt, false, nil
|
|
}
|
|
|
|
type querier struct {
|
|
ctx context.Context
|
|
mint, maxt int64
|
|
client ReadClient
|
|
|
|
// Derived from configuration.
|
|
externalLabels labels.Labels
|
|
requiredMatchers []*labels.Matcher
|
|
}
|
|
|
|
// Select implements storage.Querier and uses the given matchers to read series sets from the client.
|
|
// Select also adds equality matchers for all external labels to the list of matchers before calling remote endpoint.
|
|
// The added external labels are removed from the returned series sets.
|
|
//
|
|
// If requiredMatchers are given, select returns a NoopSeriesSet if the given matchers don't match the label set of the
|
|
// requiredMatchers. Otherwise it'll just call remote endpoint.
|
|
func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
|
|
if len(q.requiredMatchers) > 0 {
|
|
// Copy to not modify slice configured by user.
|
|
requiredMatchers := append([]*labels.Matcher{}, q.requiredMatchers...)
|
|
for _, m := range matchers {
|
|
for i, r := range requiredMatchers {
|
|
if m.Type == labels.MatchEqual && m.Name == r.Name && m.Value == r.Value {
|
|
// Requirement matched.
|
|
requiredMatchers = append(requiredMatchers[:i], requiredMatchers[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
if len(requiredMatchers) == 0 {
|
|
break
|
|
}
|
|
}
|
|
if len(requiredMatchers) > 0 {
|
|
return storage.NoopSeriesSet()
|
|
}
|
|
}
|
|
|
|
m, added := q.addExternalLabels(matchers)
|
|
query, err := ToQuery(q.mint, q.maxt, m, hints)
|
|
if err != nil {
|
|
return storage.ErrSeriesSet(errors.Wrap(err, "toQuery"))
|
|
}
|
|
|
|
res, err := q.client.Read(q.ctx, query)
|
|
if err != nil {
|
|
return storage.ErrSeriesSet(errors.Wrap(err, "remote_read"))
|
|
}
|
|
return newSeriesSetFilter(FromQueryResult(sortSeries, res), added)
|
|
}
|
|
|
|
// addExternalLabels adds matchers for each external label. External labels
|
|
// that already have a corresponding user-supplied matcher are skipped, as we
|
|
// assume that the user explicitly wants to select a different value for them.
|
|
// We return the new set of matchers, along with a map of labels for which
|
|
// matchers were added, so that these can later be removed from the result
|
|
// time series again.
|
|
func (q querier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, labels.Labels) {
|
|
el := make(labels.Labels, len(q.externalLabels))
|
|
copy(el, q.externalLabels)
|
|
|
|
// ms won't be sorted, so have to O(n^2) the search.
|
|
for _, m := range ms {
|
|
for i := 0; i < len(el); {
|
|
if el[i].Name == m.Name {
|
|
el = el[:i+copy(el[i:], el[i+1:])]
|
|
continue
|
|
}
|
|
i++
|
|
}
|
|
}
|
|
|
|
for _, l := range el {
|
|
m, err := labels.NewMatcher(labels.MatchEqual, l.Name, l.Value)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
ms = append(ms, m)
|
|
}
|
|
return ms, el
|
|
}
|
|
|
|
// LabelValues implements storage.Querier and is a noop.
|
|
func (q *querier) LabelValues(string, ...*labels.Matcher) ([]string, storage.Warnings, error) {
|
|
// TODO: Implement: https://github.com/prometheus/prometheus/issues/3351
|
|
return nil, nil, errors.New("not implemented")
|
|
}
|
|
|
|
// LabelNames implements storage.Querier and is a noop.
|
|
func (q *querier) LabelNames(...*labels.Matcher) ([]string, storage.Warnings, error) {
|
|
// TODO: Implement: https://github.com/prometheus/prometheus/issues/3351
|
|
return nil, nil, errors.New("not implemented")
|
|
}
|
|
|
|
// Close implements storage.Querier and is a noop.
|
|
func (q *querier) Close() error {
|
|
return nil
|
|
}
|
|
|
|
// chunkQuerier is an adapter to make a client usable as a storage.ChunkQuerier.
|
|
type chunkQuerier struct {
|
|
querier
|
|
}
|
|
|
|
// Select implements storage.ChunkQuerier and uses the given matchers to read chunk series sets from the client.
|
|
// It uses remote.querier.Select so it supports external labels and required matchers if specified.
|
|
func (q *chunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet {
|
|
// TODO(bwplotka) Support remote read chunked and allow returning chunks directly (TODO ticket).
|
|
return storage.NewSeriesSetToChunkSet(q.querier.Select(sortSeries, hints, matchers...))
|
|
}
|
|
|
|
func newSeriesSetFilter(ss storage.SeriesSet, toFilter labels.Labels) storage.SeriesSet {
|
|
return &seriesSetFilter{
|
|
SeriesSet: ss,
|
|
toFilter: toFilter,
|
|
}
|
|
}
|
|
|
|
type seriesSetFilter struct {
|
|
storage.SeriesSet
|
|
toFilter labels.Labels
|
|
querier storage.Querier
|
|
}
|
|
|
|
func (ssf *seriesSetFilter) GetQuerier() storage.Querier {
|
|
return ssf.querier
|
|
}
|
|
|
|
func (ssf *seriesSetFilter) SetQuerier(querier storage.Querier) {
|
|
ssf.querier = querier
|
|
}
|
|
|
|
func (ssf seriesSetFilter) At() storage.Series {
|
|
return seriesFilter{
|
|
Series: ssf.SeriesSet.At(),
|
|
toFilter: ssf.toFilter,
|
|
}
|
|
}
|
|
|
|
type seriesFilter struct {
|
|
storage.Series
|
|
toFilter labels.Labels
|
|
}
|
|
|
|
func (sf seriesFilter) Labels() labels.Labels {
|
|
labels := sf.Series.Labels()
|
|
for i, j := 0, 0; i < len(labels) && j < len(sf.toFilter); {
|
|
if labels[i].Name < sf.toFilter[j].Name {
|
|
i++
|
|
} else if labels[i].Name > sf.toFilter[j].Name {
|
|
j++
|
|
} else {
|
|
labels = labels[:i+copy(labels[i:], labels[i+1:])]
|
|
j++
|
|
}
|
|
}
|
|
return labels
|
|
}
|