storage: improve label matching and allow unset matching.

Matching of empty labels now also matches metrics where the label
was not explicitly set to the empty string.
This commit is contained in:
Fabian Reinartz 2015-06-15 18:25:31 +02:00
parent 46df1fd5ea
commit 5b91ea9b36
6 changed files with 252 additions and 119 deletions

View file

@ -72,27 +72,24 @@ func (a *Analyzer) Analyze(ctx context.Context) error {
Inspect(a.Expr, func(node Node) bool {
switch n := node.(type) {
case *VectorSelector:
n.metrics = a.Storage.MetricsForLabelMatchers(n.LabelMatchers...)
n.iterators = make(map[clientmodel.Fingerprint]local.SeriesIterator, len(n.metrics))
pt := getPreloadTimes(n.Offset)
fpts := a.Storage.FingerprintsForLabelMatchers(n.LabelMatchers)
n.fingerprints = fpts
n.metrics = map[clientmodel.Fingerprint]clientmodel.COWMetric{}
n.iterators = map[clientmodel.Fingerprint]local.SeriesIterator{}
for _, fp := range fpts {
for fp := range n.metrics {
// Only add the fingerprint to the instants if not yet present in the
// ranges. Ranges always contain more points and span more time than
// instants for the same offset.
if _, alreadyInRanges := pt.ranges[fp]; !alreadyInRanges {
pt.instants[fp] = struct{}{}
}
n.metrics[fp] = a.Storage.MetricForFingerprint(fp)
}
case *MatrixSelector:
n.metrics = a.Storage.MetricsForLabelMatchers(n.LabelMatchers...)
n.iterators = make(map[clientmodel.Fingerprint]local.SeriesIterator, len(n.metrics))
pt := getPreloadTimes(n.Offset)
fpts := a.Storage.FingerprintsForLabelMatchers(n.LabelMatchers)
n.fingerprints = fpts
n.metrics = map[clientmodel.Fingerprint]clientmodel.COWMetric{}
n.iterators = map[clientmodel.Fingerprint]local.SeriesIterator{}
for _, fp := range fpts {
for fp := range n.metrics {
if pt.ranges[fp] < n.Range {
pt.ranges[fp] = n.Range
// Delete the fingerprint from the instants. Ranges always contain more
@ -100,7 +97,6 @@ func (a *Analyzer) Analyze(ctx context.Context) error {
// an instant for the same fingerprint, should we have one.
delete(pt.instants, fp)
}
n.metrics[fp] = a.Storage.MetricForFingerprint(fp)
}
}
return true
@ -157,11 +153,11 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) {
Inspect(a.Expr, func(node Node) bool {
switch n := node.(type) {
case *VectorSelector:
for _, fp := range n.fingerprints {
for fp := range n.metrics {
n.iterators[fp] = a.Storage.NewIterator(fp)
}
case *MatrixSelector:
for _, fp := range n.fingerprints {
for fp := range n.metrics {
n.iterators[fp] = a.Storage.NewIterator(fp)
}
}

View file

@ -161,8 +161,6 @@ type MatrixSelector struct {
// The series iterators are populated at query analysis time.
iterators map[clientmodel.Fingerprint]local.SeriesIterator
metrics map[clientmodel.Fingerprint]clientmodel.COWMetric
// Fingerprints are populated from label matchers at query analysis time.
fingerprints clientmodel.Fingerprints
}
// NumberLiteral represents a number.
@ -197,8 +195,6 @@ type VectorSelector struct {
// The series iterators are populated at query analysis time.
iterators map[clientmodel.Fingerprint]local.SeriesIterator
metrics map[clientmodel.Fingerprint]clientmodel.COWMetric
// Fingerprints are populated from label matchers at query analysis time.
fingerprints clientmodel.Fingerprints
}
func (e *AggregateExpr) Type() ExprType { return ExprVector }

View file

@ -36,9 +36,10 @@ type Storage interface {
// NewPreloader returns a new Preloader which allows preloading and pinning
// series data into memory for use within a query.
NewPreloader() Preloader
// Get all of the metric fingerprints that are associated with the
// provided label matchers.
FingerprintsForLabelMatchers(metric.LabelMatchers) clientmodel.Fingerprints
// MetricsForLabelMatchers returns the metrics from storage that satisfy the given
// label matchers. At least one label matcher must be specified that does not
// match the empty string.
MetricsForLabelMatchers(matchers ...*metric.LabelMatcher) map[clientmodel.Fingerprint]clientmodel.COWMetric
// Get all of the label values that are associated with a given label name.
LabelValuesForLabelName(clientmodel.LabelName) clientmodel.LabelValues
// Get the metric associated with the provided fingerprint.

View file

@ -336,54 +336,22 @@ func (s *memorySeriesStorage) NewPreloader() Preloader {
}
}
// FingerprintsForLabelMatchers implements Storage.
func (s *memorySeriesStorage) FingerprintsForLabelMatchers(labelMatchers metric.LabelMatchers) clientmodel.Fingerprints {
// fingerprintsForLabelPairs returns the set of fingerprints that have the given labels.
// This does not work with empty label values.
func (s *memorySeriesStorage) fingerprintsForLabelPairs(pairs ...metric.LabelPair) map[clientmodel.Fingerprint]struct{} {
var result map[clientmodel.Fingerprint]struct{}
for _, matcher := range labelMatchers {
for _, pair := range pairs {
intersection := map[clientmodel.Fingerprint]struct{}{}
switch matcher.Type {
case metric.Equal:
fps, err := s.persistence.fingerprintsForLabelPair(
metric.LabelPair{
Name: matcher.Name,
Value: matcher.Value,
},
)
if err != nil {
log.Error("Error getting fingerprints for label pair: ", err)
}
if len(fps) == 0 {
return nil
}
for _, fp := range fps {
if _, ok := result[fp]; ok || result == nil {
intersection[fp] = struct{}{}
}
}
default:
values, err := s.persistence.labelValuesForLabelName(matcher.Name)
if err != nil {
log.Errorf("Error getting label values for label name %q: %v", matcher.Name, err)
}
matches := matcher.Filter(values)
if len(matches) == 0 {
return nil
}
for _, v := range matches {
fps, err := s.persistence.fingerprintsForLabelPair(
metric.LabelPair{
Name: matcher.Name,
Value: v,
},
)
if err != nil {
log.Error("Error getting fingerprints for label pair: ", err)
}
for _, fp := range fps {
if _, ok := result[fp]; ok || result == nil {
intersection[fp] = struct{}{}
}
}
fps, err := s.persistence.fingerprintsForLabelPair(pair)
if err != nil {
log.Error("Error getting fingerprints for label pair: ", err)
}
if len(fps) == 0 {
return nil
}
for _, fp := range fps {
if _, ok := result[fp]; ok || result == nil {
intersection[fp] = struct{}{}
}
}
if len(intersection) == 0 {
@ -391,12 +359,73 @@ func (s *memorySeriesStorage) FingerprintsForLabelMatchers(labelMatchers metric.
}
result = intersection
}
return result
}
fps := make(clientmodel.Fingerprints, 0, len(result))
for fp := range result {
fps = append(fps, fp)
// MetricsForLabelMatchers implements Storage.
func (s *memorySeriesStorage) MetricsForLabelMatchers(matchers ...*metric.LabelMatcher) map[clientmodel.Fingerprint]clientmodel.COWMetric {
var (
equals []metric.LabelPair
filters []*metric.LabelMatcher
)
for _, lm := range matchers {
if lm.Type == metric.Equal && lm.Value != "" {
equals = append(equals, metric.LabelPair{
Name: lm.Name,
Value: lm.Value,
})
} else {
filters = append(filters, lm)
}
}
return fps
var resFPs map[clientmodel.Fingerprint]struct{}
// If we cannot make a preselection based on equality matchers, expanding the other matchers to labels
// and intersecting their fingerprints is still likely to be the best choice.
if len(equals) > 0 {
resFPs = s.fingerprintsForLabelPairs(equals...)
}
var remaining metric.LabelMatchers
for _, matcher := range filters {
// Equal matches are all empty values.
if matcher.Match("") {
remaining = append(remaining, matcher)
continue
}
intersection := map[clientmodel.Fingerprint]struct{}{}
matches := matcher.Filter(s.LabelValuesForLabelName(matcher.Name))
if len(matches) == 0 {
return nil
}
for _, v := range matches {
fps := s.fingerprintsForLabelPairs(metric.LabelPair{
Name: matcher.Name,
Value: v,
})
for fp := range fps {
if _, ok := resFPs[fp]; ok || resFPs == nil {
intersection[fp] = struct{}{}
}
}
}
resFPs = intersection
}
// The intersected matchers no longer need to be compared against the actual metrics.
filters = remaining
result := make(map[clientmodel.Fingerprint]clientmodel.COWMetric, len(resFPs))
for fp := range resFPs {
result[fp] = s.MetricForFingerprint(fp)
}
for _, matcher := range filters {
for fp, met := range result {
if !matcher.Match(met.Metric[matcher.Name]) {
delete(result, fp)
}
}
}
return result
}
// LabelValuesForLabelName implements Storage.

View file

@ -29,7 +29,7 @@ import (
"github.com/prometheus/prometheus/util/testutil"
)
func TestFingerprintsForLabelMatchers(t *testing.T) {
func TestMatches(t *testing.T) {
storage, closer := NewTestStorage(t, 1)
defer closer.Close()
@ -41,6 +41,7 @@ func TestFingerprintsForLabelMatchers(t *testing.T) {
clientmodel.MetricNameLabel: clientmodel.LabelValue(fmt.Sprintf("test_metric_%d", i)),
"label1": clientmodel.LabelValue(fmt.Sprintf("test_%d", i/10)),
"label2": clientmodel.LabelValue(fmt.Sprintf("test_%d", (i+5)/10)),
"all": "const",
}
samples[i] = &clientmodel.Sample{
Metric: metric,
@ -82,15 +83,22 @@ func TestFingerprintsForLabelMatchers(t *testing.T) {
expected: fingerprints[5:10],
},
{
matchers: metric.LabelMatchers{newMatcher(metric.NotEqual, "label1", "x")},
matchers: metric.LabelMatchers{
newMatcher(metric.Equal, "all", "const"),
newMatcher(metric.NotEqual, "label1", "x"),
},
expected: fingerprints,
},
{
matchers: metric.LabelMatchers{newMatcher(metric.NotEqual, "label1", "test_0")},
matchers: metric.LabelMatchers{
newMatcher(metric.Equal, "all", "const"),
newMatcher(metric.NotEqual, "label1", "test_0"),
},
expected: fingerprints[10:],
},
{
matchers: metric.LabelMatchers{
newMatcher(metric.Equal, "all", "const"),
newMatcher(metric.NotEqual, "label1", "test_0"),
newMatcher(metric.NotEqual, "label1", "test_1"),
newMatcher(metric.NotEqual, "label1", "test_2"),
@ -98,11 +106,44 @@ func TestFingerprintsForLabelMatchers(t *testing.T) {
expected: fingerprints[30:],
},
{
matchers: metric.LabelMatchers{newMatcher(metric.RegexMatch, "label1", `test_[3-5]`)},
matchers: metric.LabelMatchers{
newMatcher(metric.Equal, "label1", ""),
},
expected: fingerprints[:0],
},
{
matchers: metric.LabelMatchers{
newMatcher(metric.NotEqual, "label1", "test_0"),
newMatcher(metric.Equal, "label1", ""),
},
expected: fingerprints[:0],
},
{
matchers: metric.LabelMatchers{
newMatcher(metric.NotEqual, "label1", "test_0"),
newMatcher(metric.Equal, "label2", ""),
},
expected: fingerprints[:0],
},
{
matchers: metric.LabelMatchers{
newMatcher(metric.Equal, "all", "const"),
newMatcher(metric.NotEqual, "label1", "test_0"),
newMatcher(metric.Equal, "not_existant", ""),
},
expected: fingerprints[10:],
},
{
matchers: metric.LabelMatchers{
newMatcher(metric.RegexMatch, "label1", `test_[3-5]`),
},
expected: fingerprints[30:60],
},
{
matchers: metric.LabelMatchers{newMatcher(metric.RegexNoMatch, "label1", `test_[3-5]`)},
matchers: metric.LabelMatchers{
newMatcher(metric.Equal, "all", "const"),
newMatcher(metric.RegexNoMatch, "label1", `test_[3-5]`),
},
expected: append(append(clientmodel.Fingerprints{}, fingerprints[:30]...), fingerprints[60:]...),
},
{
@ -122,11 +163,11 @@ func TestFingerprintsForLabelMatchers(t *testing.T) {
}
for _, mt := range matcherTests {
resfps := storage.FingerprintsForLabelMatchers(mt.matchers)
if len(mt.expected) != len(resfps) {
t.Fatalf("expected %d matches for %q, found %d", len(mt.expected), mt.matchers, len(resfps))
res := storage.MetricsForLabelMatchers(mt.matchers...)
if len(mt.expected) != len(res) {
t.Fatalf("expected %d matches for %q, found %d", len(mt.expected), mt.matchers, len(res))
}
for _, fp1 := range resfps {
for fp1 := range res {
found := false
for _, fp2 := range mt.expected {
if fp1 == fp2 {
@ -141,6 +182,86 @@ func TestFingerprintsForLabelMatchers(t *testing.T) {
}
}
func TestFingerprintsForLabels(t *testing.T) {
storage, closer := NewTestStorage(t, 1)
defer closer.Close()
samples := make([]*clientmodel.Sample, 100)
fingerprints := make(clientmodel.Fingerprints, 100)
for i := range samples {
metric := clientmodel.Metric{
clientmodel.MetricNameLabel: clientmodel.LabelValue(fmt.Sprintf("test_metric_%d", i)),
"label1": clientmodel.LabelValue(fmt.Sprintf("test_%d", i/10)),
"label2": clientmodel.LabelValue(fmt.Sprintf("test_%d", (i+5)/10)),
}
samples[i] = &clientmodel.Sample{
Metric: metric,
Timestamp: clientmodel.Timestamp(i),
Value: clientmodel.SampleValue(i),
}
fingerprints[i] = metric.FastFingerprint()
}
for _, s := range samples {
storage.Append(s)
}
storage.WaitForIndexing()
var matcherTests = []struct {
pairs []metric.LabelPair
expected clientmodel.Fingerprints
}{
{
pairs: []metric.LabelPair{{"label1", "x"}},
expected: fingerprints[:0],
},
{
pairs: []metric.LabelPair{{"label1", "test_0"}},
expected: fingerprints[:10],
},
{
pairs: []metric.LabelPair{
{"label1", "test_0"},
{"label1", "test_1"},
},
expected: fingerprints[:0],
},
{
pairs: []metric.LabelPair{
{"label1", "test_0"},
{"label2", "test_1"},
},
expected: fingerprints[5:10],
},
{
pairs: []metric.LabelPair{
{"label1", "test_1"},
{"label2", "test_2"},
},
expected: fingerprints[15:20],
},
}
for _, mt := range matcherTests {
resfps := storage.fingerprintsForLabelPairs(mt.pairs...)
if len(mt.expected) != len(resfps) {
t.Fatalf("expected %d matches for %q, found %d", len(mt.expected), mt.pairs, len(resfps))
}
for fp1 := range resfps {
found := false
for _, fp2 := range mt.expected {
if fp1 == fp2 {
found = true
break
}
}
if !found {
t.Errorf("expected fingerprint %s for %q not in result", fp1, mt.pairs)
}
}
}
}
var benchLabelMatchingRes map[clientmodel.Fingerprint]clientmodel.COWMetric
func BenchmarkLabelMatching(b *testing.B) {
@ -226,9 +347,7 @@ func BenchmarkLabelMatching(b *testing.B) {
for i := 0; i < b.N; i++ {
benchLabelMatchingRes = map[clientmodel.Fingerprint]clientmodel.COWMetric{}
for _, mt := range matcherTests {
for _, fp := range s.FingerprintsForLabelMatchers(mt) {
benchLabelMatchingRes[fp] = s.MetricForFingerprint(fp)
}
benchLabelMatchingRes = s.MetricsForLabelMatchers(mt...)
}
}
// Stop timer to not count the storage closing.
@ -257,17 +376,17 @@ func TestRetentionCutoff(t *testing.T) {
}
s.WaitForIndexing()
lm, err := metric.NewLabelMatcher(metric.Equal, "job", "test")
if err != nil {
t.Fatalf("error creating label matcher: %s", err)
var fp clientmodel.Fingerprint
for f := range s.fingerprintsForLabelPairs(metric.LabelPair{"job", "test"}) {
fp = f
break
}
fp := s.FingerprintsForLabelMatchers(metric.LabelMatchers{lm})[0]
pl := s.NewPreloader()
defer pl.Close()
// Preload everything.
err = pl.PreloadRange(fp, insertStart, now, 5*time.Minute)
err := pl.PreloadRange(fp, insertStart, now, 5*time.Minute)
if err != nil {
t.Fatalf("Error preloading outdated chunks: %s", err)
}
@ -322,56 +441,50 @@ func TestDropMetrics(t *testing.T) {
}
s.WaitForIndexing()
matcher := metric.LabelMatchers{{
Type: metric.Equal,
Name: clientmodel.MetricNameLabel,
Value: "test",
}}
fps := s.FingerprintsForLabelMatchers(matcher)
fps := s.fingerprintsForLabelPairs(metric.LabelPair{clientmodel.MetricNameLabel, "test"})
if len(fps) != 2 {
t.Fatalf("unexpected number of fingerprints: %d", len(fps))
}
it := s.NewIterator(fps[0])
if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != N {
t.Fatalf("unexpected number of samples: %d", len(vals))
}
it = s.NewIterator(fps[1])
if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != N {
t.Fatalf("unexpected number of samples: %d", len(vals))
var fpList clientmodel.Fingerprints
for fp := range fps {
it := s.NewIterator(fp)
if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != N {
t.Fatalf("unexpected number of samples: %d", len(vals))
}
fpList = append(fpList, fp)
}
s.DropMetricsForFingerprints(fps[0])
s.DropMetricsForFingerprints(fpList[0])
s.WaitForIndexing()
fps2 := s.FingerprintsForLabelMatchers(matcher)
fps2 := s.fingerprintsForLabelPairs(metric.LabelPair{clientmodel.MetricNameLabel, "test"})
if len(fps2) != 1 {
t.Fatalf("unexpected number of fingerprints: %d", len(fps2))
}
it = s.NewIterator(fps[0])
it := s.NewIterator(fpList[0])
if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 {
t.Fatalf("unexpected number of samples: %d", len(vals))
}
it = s.NewIterator(fps[1])
it = s.NewIterator(fpList[1])
if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != N {
t.Fatalf("unexpected number of samples: %d", len(vals))
}
s.DropMetricsForFingerprints(fps...)
s.DropMetricsForFingerprints(fpList...)
s.WaitForIndexing()
fps3 := s.FingerprintsForLabelMatchers(matcher)
fps3 := s.fingerprintsForLabelPairs(metric.LabelPair{clientmodel.MetricNameLabel, "test"})
if len(fps3) != 0 {
t.Fatalf("unexpected number of fingerprints: %d", len(fps3))
}
it = s.NewIterator(fps[0])
it = s.NewIterator(fpList[0])
if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 {
t.Fatalf("unexpected number of samples: %d", len(vals))
}
it = s.NewIterator(fps[1])
it = s.NewIterator(fpList[1])
if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 {
t.Fatalf("unexpected number of samples: %d", len(vals))
}

View file

@ -188,23 +188,21 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) {
if len(r.Form["match[]"]) == 0 {
return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")}
}
fps := map[clientmodel.Fingerprint]struct{}{}
res := map[clientmodel.Fingerprint]clientmodel.COWMetric{}
for _, lm := range r.Form["match[]"] {
matchers, err := promql.ParseMetricSelector(lm)
if err != nil {
return nil, &apiError{errorBadData, err}
}
for _, fp := range api.Storage.FingerprintsForLabelMatchers(matchers) {
fps[fp] = struct{}{}
for fp, met := range api.Storage.MetricsForLabelMatchers(matchers...) {
res[fp] = met
}
}
metrics := make([]clientmodel.Metric, 0, len(fps))
for fp := range fps {
if met := api.Storage.MetricForFingerprint(fp).Metric; met != nil {
metrics = append(metrics, met)
}
metrics := make([]clientmodel.Metric, 0, len(res))
for _, met := range res {
metrics = append(metrics, met.Metric)
}
return metrics, nil
}
@ -221,7 +219,7 @@ func (api *API) dropSeries(r *http.Request) (interface{}, *apiError) {
if err != nil {
return nil, &apiError{errorBadData, err}
}
for _, fp := range api.Storage.FingerprintsForLabelMatchers(matchers) {
for fp := range api.Storage.MetricsForLabelMatchers(matchers...) {
fps[fp] = struct{}{}
}
}