Merge pull request #810 from prometheus/fabxc/lmatch

Match empty labels.
This commit is contained in:
Fabian Reinartz 2015-06-22 15:45:50 +02:00
commit 1eff186555
10 changed files with 381 additions and 118 deletions

View file

@ -72,27 +72,24 @@ func (a *Analyzer) Analyze(ctx context.Context) error {
Inspect(a.Expr, func(node Node) bool { Inspect(a.Expr, func(node Node) bool {
switch n := node.(type) { switch n := node.(type) {
case *VectorSelector: case *VectorSelector:
n.metrics = a.Storage.MetricsForLabelMatchers(n.LabelMatchers...)
n.iterators = make(map[clientmodel.Fingerprint]local.SeriesIterator, len(n.metrics))
pt := getPreloadTimes(n.Offset) pt := getPreloadTimes(n.Offset)
fpts := a.Storage.FingerprintsForLabelMatchers(n.LabelMatchers) for fp := range n.metrics {
n.fingerprints = fpts
n.metrics = map[clientmodel.Fingerprint]clientmodel.COWMetric{}
n.iterators = map[clientmodel.Fingerprint]local.SeriesIterator{}
for _, fp := range fpts {
// Only add the fingerprint to the instants if not yet present in the // Only add the fingerprint to the instants if not yet present in the
// ranges. Ranges always contain more points and span more time than // ranges. Ranges always contain more points and span more time than
// instants for the same offset. // instants for the same offset.
if _, alreadyInRanges := pt.ranges[fp]; !alreadyInRanges { if _, alreadyInRanges := pt.ranges[fp]; !alreadyInRanges {
pt.instants[fp] = struct{}{} pt.instants[fp] = struct{}{}
} }
n.metrics[fp] = a.Storage.MetricForFingerprint(fp)
} }
case *MatrixSelector: case *MatrixSelector:
n.metrics = a.Storage.MetricsForLabelMatchers(n.LabelMatchers...)
n.iterators = make(map[clientmodel.Fingerprint]local.SeriesIterator, len(n.metrics))
pt := getPreloadTimes(n.Offset) pt := getPreloadTimes(n.Offset)
fpts := a.Storage.FingerprintsForLabelMatchers(n.LabelMatchers) for fp := range n.metrics {
n.fingerprints = fpts
n.metrics = map[clientmodel.Fingerprint]clientmodel.COWMetric{}
n.iterators = map[clientmodel.Fingerprint]local.SeriesIterator{}
for _, fp := range fpts {
if pt.ranges[fp] < n.Range { if pt.ranges[fp] < n.Range {
pt.ranges[fp] = n.Range pt.ranges[fp] = n.Range
// Delete the fingerprint from the instants. Ranges always contain more // Delete the fingerprint from the instants. Ranges always contain more
@ -100,7 +97,6 @@ func (a *Analyzer) Analyze(ctx context.Context) error {
// an instant for the same fingerprint, should we have one. // an instant for the same fingerprint, should we have one.
delete(pt.instants, fp) delete(pt.instants, fp)
} }
n.metrics[fp] = a.Storage.MetricForFingerprint(fp)
} }
} }
return true return true
@ -157,11 +153,11 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) {
Inspect(a.Expr, func(node Node) bool { Inspect(a.Expr, func(node Node) bool {
switch n := node.(type) { switch n := node.(type) {
case *VectorSelector: case *VectorSelector:
for _, fp := range n.fingerprints { for fp := range n.metrics {
n.iterators[fp] = a.Storage.NewIterator(fp) n.iterators[fp] = a.Storage.NewIterator(fp)
} }
case *MatrixSelector: case *MatrixSelector:
for _, fp := range n.fingerprints { for fp := range n.metrics {
n.iterators[fp] = a.Storage.NewIterator(fp) n.iterators[fp] = a.Storage.NewIterator(fp)
} }
} }

View file

@ -161,8 +161,6 @@ type MatrixSelector struct {
// The series iterators are populated at query analysis time. // The series iterators are populated at query analysis time.
iterators map[clientmodel.Fingerprint]local.SeriesIterator iterators map[clientmodel.Fingerprint]local.SeriesIterator
metrics map[clientmodel.Fingerprint]clientmodel.COWMetric metrics map[clientmodel.Fingerprint]clientmodel.COWMetric
// Fingerprints are populated from label matchers at query analysis time.
fingerprints clientmodel.Fingerprints
} }
// NumberLiteral represents a number. // NumberLiteral represents a number.
@ -197,8 +195,6 @@ type VectorSelector struct {
// The series iterators are populated at query analysis time. // The series iterators are populated at query analysis time.
iterators map[clientmodel.Fingerprint]local.SeriesIterator iterators map[clientmodel.Fingerprint]local.SeriesIterator
metrics map[clientmodel.Fingerprint]clientmodel.COWMetric metrics map[clientmodel.Fingerprint]clientmodel.COWMetric
// Fingerprints are populated from label matchers at query analysis time.
fingerprints clientmodel.Fingerprints
} }
func (e *AggregateExpr) Type() ExprType { return ExprVector } func (e *AggregateExpr) Type() ExprType { return ExprVector }

View file

@ -883,6 +883,25 @@ func (p *parser) vectorSelector(name string) *VectorSelector {
if len(matchers) == 0 { if len(matchers) == 0 {
p.errorf("vector selector must contain label matchers or metric name") p.errorf("vector selector must contain label matchers or metric name")
} }
// A vector selector must contain at least one non-empty matcher to prevent
// implicit selection of all metrics (e.g. by a typo).
notEmpty := false
for _, lm := range matchers {
// Matching changes the inner state of the regex and causes reflect.DeepEqual
// to return false, which break tests.
// Thus, we create a new label matcher for this testing.
lm, err := metric.NewLabelMatcher(lm.Type, lm.Name, lm.Value)
if err != nil {
p.error(err)
}
if !lm.Match("") {
notEmpty = true
break
}
}
if !notEmpty {
p.errorf("vector selector must contain at least one non-empty matcher")
}
var err error var err error
var offset time.Duration var offset time.Duration

View file

@ -587,6 +587,22 @@ var testExpr = []struct {
input: `{}`, input: `{}`,
fail: true, fail: true,
errMsg: "vector selector must contain label matchers or metric name", errMsg: "vector selector must contain label matchers or metric name",
}, {
input: `{x=""}`,
fail: true,
errMsg: "vector selector must contain at least one non-empty matcher",
}, {
input: `{x=~".*"}`,
fail: true,
errMsg: "vector selector must contain at least one non-empty matcher",
}, {
input: `{x!~".+"}`,
fail: true,
errMsg: "vector selector must contain at least one non-empty matcher",
}, {
input: `{x!="a"}`,
fail: true,
errMsg: "vector selector must contain at least one non-empty matcher",
}, { }, {
input: `foo{__name__="bar"}`, input: `foo{__name__="bar"}`,
fail: true, fail: true,

View file

@ -378,7 +378,7 @@ eval instant at 50m drop_common_labels(http_requests{group="production",job="api
http_requests{instance="0"} 100 http_requests{instance="0"} 100
http_requests{instance="1"} 200 http_requests{instance="1"} 200
eval instant at 50m {__name__=~".*"} eval instant at 50m {__name__=~".+"}
http_requests{group="canary", instance="0", job="api-server"} 300 http_requests{group="canary", instance="0", job="api-server"} 300
http_requests{group="canary", instance="0", job="app-server"} 700 http_requests{group="canary", instance="0", job="app-server"} 700
http_requests{group="canary", instance="1", job="api-server"} 400 http_requests{group="canary", instance="1", job="api-server"} 400

View file

@ -36,9 +36,10 @@ type Storage interface {
// NewPreloader returns a new Preloader which allows preloading and pinning // NewPreloader returns a new Preloader which allows preloading and pinning
// series data into memory for use within a query. // series data into memory for use within a query.
NewPreloader() Preloader NewPreloader() Preloader
// Get all of the metric fingerprints that are associated with the // MetricsForLabelMatchers returns the metrics from storage that satisfy the given
// provided label matchers. // label matchers. At least one label matcher must be specified that does not
FingerprintsForLabelMatchers(metric.LabelMatchers) clientmodel.Fingerprints // match the empty string.
MetricsForLabelMatchers(matchers ...*metric.LabelMatcher) map[clientmodel.Fingerprint]clientmodel.COWMetric
// Get all of the label values that are associated with a given label name. // Get all of the label values that are associated with a given label name.
LabelValuesForLabelName(clientmodel.LabelName) clientmodel.LabelValues LabelValuesForLabelName(clientmodel.LabelName) clientmodel.LabelValues
// Get the metric associated with the provided fingerprint. // Get the metric associated with the provided fingerprint.

View file

@ -365,19 +365,13 @@ func (s *memorySeriesStorage) NewPreloader() Preloader {
} }
} }
// FingerprintsForLabelMatchers implements Storage. // fingerprintsForLabelPairs returns the set of fingerprints that have the given labels.
func (s *memorySeriesStorage) FingerprintsForLabelMatchers(labelMatchers metric.LabelMatchers) clientmodel.Fingerprints { // This does not work with empty label values.
func (s *memorySeriesStorage) fingerprintsForLabelPairs(pairs ...metric.LabelPair) map[clientmodel.Fingerprint]struct{} {
var result map[clientmodel.Fingerprint]struct{} var result map[clientmodel.Fingerprint]struct{}
for _, matcher := range labelMatchers { for _, pair := range pairs {
intersection := map[clientmodel.Fingerprint]struct{}{} intersection := map[clientmodel.Fingerprint]struct{}{}
switch matcher.Type { fps, err := s.persistence.fingerprintsForLabelPair(pair)
case metric.Equal:
fps, err := s.persistence.fingerprintsForLabelPair(
metric.LabelPair{
Name: matcher.Name,
Value: matcher.Value,
},
)
if err != nil { if err != nil {
log.Error("Error getting fingerprints for label pair: ", err) log.Error("Error getting fingerprints for label pair: ", err)
} }
@ -389,43 +383,78 @@ func (s *memorySeriesStorage) FingerprintsForLabelMatchers(labelMatchers metric.
intersection[fp] = struct{}{} intersection[fp] = struct{}{}
} }
} }
default:
values, err := s.persistence.labelValuesForLabelName(matcher.Name)
if err != nil {
log.Errorf("Error getting label values for label name %q: %v", matcher.Name, err)
}
matches := matcher.Filter(values)
if len(matches) == 0 {
return nil
}
for _, v := range matches {
fps, err := s.persistence.fingerprintsForLabelPair(
metric.LabelPair{
Name: matcher.Name,
Value: v,
},
)
if err != nil {
log.Error("Error getting fingerprints for label pair: ", err)
}
for _, fp := range fps {
if _, ok := result[fp]; ok || result == nil {
intersection[fp] = struct{}{}
}
}
}
}
if len(intersection) == 0 { if len(intersection) == 0 {
return nil return nil
} }
result = intersection result = intersection
} }
return result
}
fps := make(clientmodel.Fingerprints, 0, len(result)) // MetricsForLabelMatchers implements Storage.
for fp := range result { func (s *memorySeriesStorage) MetricsForLabelMatchers(matchers ...*metric.LabelMatcher) map[clientmodel.Fingerprint]clientmodel.COWMetric {
fps = append(fps, fp) var (
equals []metric.LabelPair
filters []*metric.LabelMatcher
)
for _, lm := range matchers {
if lm.Type == metric.Equal && lm.Value != "" {
equals = append(equals, metric.LabelPair{
Name: lm.Name,
Value: lm.Value,
})
} else {
filters = append(filters, lm)
} }
return fps }
var resFPs map[clientmodel.Fingerprint]struct{}
// If we cannot make a preselection based on equality matchers, expanding the other matchers to labels
// and intersecting their fingerprints is still likely to be the best choice.
if len(equals) > 0 {
resFPs = s.fingerprintsForLabelPairs(equals...)
}
var remaining metric.LabelMatchers
for _, matcher := range filters {
// Equal matches are all empty values.
if matcher.Match("") {
remaining = append(remaining, matcher)
continue
}
intersection := map[clientmodel.Fingerprint]struct{}{}
matches := matcher.Filter(s.LabelValuesForLabelName(matcher.Name))
if len(matches) == 0 {
return nil
}
for _, v := range matches {
fps := s.fingerprintsForLabelPairs(metric.LabelPair{
Name: matcher.Name,
Value: v,
})
for fp := range fps {
if _, ok := resFPs[fp]; ok || resFPs == nil {
intersection[fp] = struct{}{}
}
}
}
resFPs = intersection
}
// The intersected matchers no longer need to be compared against the actual metrics.
filters = remaining
result := make(map[clientmodel.Fingerprint]clientmodel.COWMetric, len(resFPs))
for fp := range resFPs {
result[fp] = s.MetricForFingerprint(fp)
}
for _, matcher := range filters {
for fp, met := range result {
if !matcher.Match(met.Metric[matcher.Name]) {
delete(result, fp)
}
}
}
return result
} }
// LabelValuesForLabelName implements Storage. // LabelValuesForLabelName implements Storage.

View file

@ -15,6 +15,7 @@ package local
import ( import (
"fmt" "fmt"
"hash/fnv"
"math/rand" "math/rand"
"testing" "testing"
"testing/quick" "testing/quick"
@ -28,7 +29,7 @@ import (
"github.com/prometheus/prometheus/util/testutil" "github.com/prometheus/prometheus/util/testutil"
) )
func TestFingerprintsForLabelMatchers(t *testing.T) { func TestMatches(t *testing.T) {
storage, closer := NewTestStorage(t, 1) storage, closer := NewTestStorage(t, 1)
defer closer.Close() defer closer.Close()
@ -40,6 +41,7 @@ func TestFingerprintsForLabelMatchers(t *testing.T) {
clientmodel.MetricNameLabel: clientmodel.LabelValue(fmt.Sprintf("test_metric_%d", i)), clientmodel.MetricNameLabel: clientmodel.LabelValue(fmt.Sprintf("test_metric_%d", i)),
"label1": clientmodel.LabelValue(fmt.Sprintf("test_%d", i/10)), "label1": clientmodel.LabelValue(fmt.Sprintf("test_%d", i/10)),
"label2": clientmodel.LabelValue(fmt.Sprintf("test_%d", (i+5)/10)), "label2": clientmodel.LabelValue(fmt.Sprintf("test_%d", (i+5)/10)),
"all": "const",
} }
samples[i] = &clientmodel.Sample{ samples[i] = &clientmodel.Sample{
Metric: metric, Metric: metric,
@ -81,15 +83,22 @@ func TestFingerprintsForLabelMatchers(t *testing.T) {
expected: fingerprints[5:10], expected: fingerprints[5:10],
}, },
{ {
matchers: metric.LabelMatchers{newMatcher(metric.NotEqual, "label1", "x")}, matchers: metric.LabelMatchers{
newMatcher(metric.Equal, "all", "const"),
newMatcher(metric.NotEqual, "label1", "x"),
},
expected: fingerprints, expected: fingerprints,
}, },
{ {
matchers: metric.LabelMatchers{newMatcher(metric.NotEqual, "label1", "test_0")}, matchers: metric.LabelMatchers{
newMatcher(metric.Equal, "all", "const"),
newMatcher(metric.NotEqual, "label1", "test_0"),
},
expected: fingerprints[10:], expected: fingerprints[10:],
}, },
{ {
matchers: metric.LabelMatchers{ matchers: metric.LabelMatchers{
newMatcher(metric.Equal, "all", "const"),
newMatcher(metric.NotEqual, "label1", "test_0"), newMatcher(metric.NotEqual, "label1", "test_0"),
newMatcher(metric.NotEqual, "label1", "test_1"), newMatcher(metric.NotEqual, "label1", "test_1"),
newMatcher(metric.NotEqual, "label1", "test_2"), newMatcher(metric.NotEqual, "label1", "test_2"),
@ -97,11 +106,44 @@ func TestFingerprintsForLabelMatchers(t *testing.T) {
expected: fingerprints[30:], expected: fingerprints[30:],
}, },
{ {
matchers: metric.LabelMatchers{newMatcher(metric.RegexMatch, "label1", `test_[3-5]`)}, matchers: metric.LabelMatchers{
newMatcher(metric.Equal, "label1", ""),
},
expected: fingerprints[:0],
},
{
matchers: metric.LabelMatchers{
newMatcher(metric.NotEqual, "label1", "test_0"),
newMatcher(metric.Equal, "label1", ""),
},
expected: fingerprints[:0],
},
{
matchers: metric.LabelMatchers{
newMatcher(metric.NotEqual, "label1", "test_0"),
newMatcher(metric.Equal, "label2", ""),
},
expected: fingerprints[:0],
},
{
matchers: metric.LabelMatchers{
newMatcher(metric.Equal, "all", "const"),
newMatcher(metric.NotEqual, "label1", "test_0"),
newMatcher(metric.Equal, "not_existant", ""),
},
expected: fingerprints[10:],
},
{
matchers: metric.LabelMatchers{
newMatcher(metric.RegexMatch, "label1", `test_[3-5]`),
},
expected: fingerprints[30:60], expected: fingerprints[30:60],
}, },
{ {
matchers: metric.LabelMatchers{newMatcher(metric.RegexNoMatch, "label1", `test_[3-5]`)}, matchers: metric.LabelMatchers{
newMatcher(metric.Equal, "all", "const"),
newMatcher(metric.RegexNoMatch, "label1", `test_[3-5]`),
},
expected: append(append(clientmodel.Fingerprints{}, fingerprints[:30]...), fingerprints[60:]...), expected: append(append(clientmodel.Fingerprints{}, fingerprints[:30]...), fingerprints[60:]...),
}, },
{ {
@ -121,11 +163,11 @@ func TestFingerprintsForLabelMatchers(t *testing.T) {
} }
for _, mt := range matcherTests { for _, mt := range matcherTests {
resfps := storage.FingerprintsForLabelMatchers(mt.matchers) res := storage.MetricsForLabelMatchers(mt.matchers...)
if len(mt.expected) != len(resfps) { if len(mt.expected) != len(res) {
t.Fatalf("expected %d matches for %q, found %d", len(mt.expected), mt.matchers, len(resfps)) t.Fatalf("expected %d matches for %q, found %d", len(mt.expected), mt.matchers, len(res))
} }
for _, fp1 := range resfps { for fp1 := range res {
found := false found := false
for _, fp2 := range mt.expected { for _, fp2 := range mt.expected {
if fp1 == fp2 { if fp1 == fp2 {
@ -140,6 +182,178 @@ func TestFingerprintsForLabelMatchers(t *testing.T) {
} }
} }
func TestFingerprintsForLabels(t *testing.T) {
storage, closer := NewTestStorage(t, 1)
defer closer.Close()
samples := make([]*clientmodel.Sample, 100)
fingerprints := make(clientmodel.Fingerprints, 100)
for i := range samples {
metric := clientmodel.Metric{
clientmodel.MetricNameLabel: clientmodel.LabelValue(fmt.Sprintf("test_metric_%d", i)),
"label1": clientmodel.LabelValue(fmt.Sprintf("test_%d", i/10)),
"label2": clientmodel.LabelValue(fmt.Sprintf("test_%d", (i+5)/10)),
}
samples[i] = &clientmodel.Sample{
Metric: metric,
Timestamp: clientmodel.Timestamp(i),
Value: clientmodel.SampleValue(i),
}
fingerprints[i] = metric.FastFingerprint()
}
for _, s := range samples {
storage.Append(s)
}
storage.WaitForIndexing()
var matcherTests = []struct {
pairs []metric.LabelPair
expected clientmodel.Fingerprints
}{
{
pairs: []metric.LabelPair{{"label1", "x"}},
expected: fingerprints[:0],
},
{
pairs: []metric.LabelPair{{"label1", "test_0"}},
expected: fingerprints[:10],
},
{
pairs: []metric.LabelPair{
{"label1", "test_0"},
{"label1", "test_1"},
},
expected: fingerprints[:0],
},
{
pairs: []metric.LabelPair{
{"label1", "test_0"},
{"label2", "test_1"},
},
expected: fingerprints[5:10],
},
{
pairs: []metric.LabelPair{
{"label1", "test_1"},
{"label2", "test_2"},
},
expected: fingerprints[15:20],
},
}
for _, mt := range matcherTests {
resfps := storage.fingerprintsForLabelPairs(mt.pairs...)
if len(mt.expected) != len(resfps) {
t.Fatalf("expected %d matches for %q, found %d", len(mt.expected), mt.pairs, len(resfps))
}
for fp1 := range resfps {
found := false
for _, fp2 := range mt.expected {
if fp1 == fp2 {
found = true
break
}
}
if !found {
t.Errorf("expected fingerprint %s for %q not in result", fp1, mt.pairs)
}
}
}
}
var benchLabelMatchingRes map[clientmodel.Fingerprint]clientmodel.COWMetric
func BenchmarkLabelMatching(b *testing.B) {
s, closer := NewTestStorage(b, 1)
defer closer.Close()
h := fnv.New64a()
lbl := func(x int) clientmodel.LabelValue {
h.Reset()
h.Write([]byte(fmt.Sprintf("%d", x)))
return clientmodel.LabelValue(fmt.Sprintf("%d", h.Sum64()))
}
M := 32
met := clientmodel.Metric{}
for i := 0; i < M; i++ {
met["label_a"] = lbl(i)
for j := 0; j < M; j++ {
met["label_b"] = lbl(j)
for k := 0; k < M; k++ {
met["label_c"] = lbl(k)
for l := 0; l < M; l++ {
met["label_d"] = lbl(l)
s.Append(&clientmodel.Sample{
Metric: met.Clone(),
Timestamp: 0,
Value: 1,
})
}
}
}
}
s.WaitForIndexing()
newMatcher := func(matchType metric.MatchType, name clientmodel.LabelName, value clientmodel.LabelValue) *metric.LabelMatcher {
lm, err := metric.NewLabelMatcher(matchType, name, value)
if err != nil {
b.Fatalf("error creating label matcher: %s", err)
}
return lm
}
var matcherTests = []metric.LabelMatchers{
{
newMatcher(metric.Equal, "label_a", lbl(1)),
},
{
newMatcher(metric.Equal, "label_a", lbl(3)),
newMatcher(metric.Equal, "label_c", lbl(3)),
},
{
newMatcher(metric.Equal, "label_a", lbl(3)),
newMatcher(metric.Equal, "label_c", lbl(3)),
newMatcher(metric.NotEqual, "label_d", lbl(3)),
},
{
newMatcher(metric.Equal, "label_a", lbl(3)),
newMatcher(metric.Equal, "label_b", lbl(3)),
newMatcher(metric.Equal, "label_c", lbl(3)),
newMatcher(metric.NotEqual, "label_d", lbl(3)),
},
{
newMatcher(metric.RegexMatch, "label_a", ".+"),
},
{
newMatcher(metric.Equal, "label_a", lbl(3)),
newMatcher(metric.RegexMatch, "label_a", ".+"),
},
{
newMatcher(metric.Equal, "label_a", lbl(1)),
newMatcher(metric.RegexMatch, "label_c", "("+lbl(3)+"|"+lbl(10)+")"),
},
{
newMatcher(metric.Equal, "label_a", lbl(3)),
newMatcher(metric.Equal, "label_a", lbl(4)),
newMatcher(metric.RegexMatch, "label_c", "("+lbl(3)+"|"+lbl(10)+")"),
},
}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
benchLabelMatchingRes = map[clientmodel.Fingerprint]clientmodel.COWMetric{}
for _, mt := range matcherTests {
benchLabelMatchingRes = s.MetricsForLabelMatchers(mt...)
}
}
// Stop timer to not count the storage closing.
b.StopTimer()
}
func TestRetentionCutoff(t *testing.T) { func TestRetentionCutoff(t *testing.T) {
now := clientmodel.Now() now := clientmodel.Now()
insertStart := now.Add(-2 * time.Hour) insertStart := now.Add(-2 * time.Hour)
@ -162,17 +376,17 @@ func TestRetentionCutoff(t *testing.T) {
} }
s.WaitForIndexing() s.WaitForIndexing()
lm, err := metric.NewLabelMatcher(metric.Equal, "job", "test") var fp clientmodel.Fingerprint
if err != nil { for f := range s.fingerprintsForLabelPairs(metric.LabelPair{"job", "test"}) {
t.Fatalf("error creating label matcher: %s", err) fp = f
break
} }
fp := s.FingerprintsForLabelMatchers(metric.LabelMatchers{lm})[0]
pl := s.NewPreloader() pl := s.NewPreloader()
defer pl.Close() defer pl.Close()
// Preload everything. // Preload everything.
err = pl.PreloadRange(fp, insertStart, now, 5*time.Minute) err := pl.PreloadRange(fp, insertStart, now, 5*time.Minute)
if err != nil { if err != nil {
t.Fatalf("Error preloading outdated chunks: %s", err) t.Fatalf("Error preloading outdated chunks: %s", err)
} }
@ -227,56 +441,50 @@ func TestDropMetrics(t *testing.T) {
} }
s.WaitForIndexing() s.WaitForIndexing()
matcher := metric.LabelMatchers{{ fps := s.fingerprintsForLabelPairs(metric.LabelPair{clientmodel.MetricNameLabel, "test"})
Type: metric.Equal,
Name: clientmodel.MetricNameLabel,
Value: "test",
}}
fps := s.FingerprintsForLabelMatchers(matcher)
if len(fps) != 2 { if len(fps) != 2 {
t.Fatalf("unexpected number of fingerprints: %d", len(fps)) t.Fatalf("unexpected number of fingerprints: %d", len(fps))
} }
it := s.NewIterator(fps[0]) var fpList clientmodel.Fingerprints
for fp := range fps {
it := s.NewIterator(fp)
if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != N { if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != N {
t.Fatalf("unexpected number of samples: %d", len(vals)) t.Fatalf("unexpected number of samples: %d", len(vals))
} }
it = s.NewIterator(fps[1]) fpList = append(fpList, fp)
if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != N {
t.Fatalf("unexpected number of samples: %d", len(vals))
} }
s.DropMetricsForFingerprints(fps[0]) s.DropMetricsForFingerprints(fpList[0])
s.WaitForIndexing() s.WaitForIndexing()
fps2 := s.FingerprintsForLabelMatchers(matcher) fps2 := s.fingerprintsForLabelPairs(metric.LabelPair{clientmodel.MetricNameLabel, "test"})
if len(fps2) != 1 { if len(fps2) != 1 {
t.Fatalf("unexpected number of fingerprints: %d", len(fps2)) t.Fatalf("unexpected number of fingerprints: %d", len(fps2))
} }
it = s.NewIterator(fps[0]) it := s.NewIterator(fpList[0])
if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 { if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 {
t.Fatalf("unexpected number of samples: %d", len(vals)) t.Fatalf("unexpected number of samples: %d", len(vals))
} }
it = s.NewIterator(fps[1]) it = s.NewIterator(fpList[1])
if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != N { if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != N {
t.Fatalf("unexpected number of samples: %d", len(vals)) t.Fatalf("unexpected number of samples: %d", len(vals))
} }
s.DropMetricsForFingerprints(fps...) s.DropMetricsForFingerprints(fpList...)
s.WaitForIndexing() s.WaitForIndexing()
fps3 := s.FingerprintsForLabelMatchers(matcher) fps3 := s.fingerprintsForLabelPairs(metric.LabelPair{clientmodel.MetricNameLabel, "test"})
if len(fps3) != 0 { if len(fps3) != 0 {
t.Fatalf("unexpected number of fingerprints: %d", len(fps3)) t.Fatalf("unexpected number of fingerprints: %d", len(fps3))
} }
it = s.NewIterator(fps[0]) it = s.NewIterator(fpList[0])
if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 { if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 {
t.Fatalf("unexpected number of samples: %d", len(vals)) t.Fatalf("unexpected number of samples: %d", len(vals))
} }
it = s.NewIterator(fps[1]) it = s.NewIterator(fpList[1])
if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 { if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 {
t.Fatalf("unexpected number of samples: %d", len(vals)) t.Fatalf("unexpected number of samples: %d", len(vals))
} }

View file

@ -188,23 +188,21 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) {
if len(r.Form["match[]"]) == 0 { if len(r.Form["match[]"]) == 0 {
return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")} return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")}
} }
fps := map[clientmodel.Fingerprint]struct{}{} res := map[clientmodel.Fingerprint]clientmodel.COWMetric{}
for _, lm := range r.Form["match[]"] { for _, lm := range r.Form["match[]"] {
matchers, err := promql.ParseMetricSelector(lm) matchers, err := promql.ParseMetricSelector(lm)
if err != nil { if err != nil {
return nil, &apiError{errorBadData, err} return nil, &apiError{errorBadData, err}
} }
for _, fp := range api.Storage.FingerprintsForLabelMatchers(matchers) { for fp, met := range api.Storage.MetricsForLabelMatchers(matchers...) {
fps[fp] = struct{}{} res[fp] = met
} }
} }
metrics := make([]clientmodel.Metric, 0, len(fps)) metrics := make([]clientmodel.Metric, 0, len(res))
for fp := range fps { for _, met := range res {
if met := api.Storage.MetricForFingerprint(fp).Metric; met != nil { metrics = append(metrics, met.Metric)
metrics = append(metrics, met)
}
} }
return metrics, nil return metrics, nil
} }
@ -221,7 +219,7 @@ func (api *API) dropSeries(r *http.Request) (interface{}, *apiError) {
if err != nil { if err != nil {
return nil, &apiError{errorBadData, err} return nil, &apiError{errorBadData, err}
} }
for _, fp := range api.Storage.FingerprintsForLabelMatchers(matchers) { for fp := range api.Storage.MetricsForLabelMatchers(matchers...) {
fps[fp] = struct{}{} fps[fp] = struct{}{}
} }
} }

View file

@ -277,7 +277,7 @@ func TestEndpoints(t *testing.T) {
}, { }, {
endpoint: api.dropSeries, endpoint: api.dropSeries,
query: url.Values{ query: url.Values{
"match[]": []string{`{__name__=~".*"}`}, "match[]": []string{`{__name__=~".+"}`},
}, },
response: struct { response: struct {
NumDeleted int `json:"numDeleted"` NumDeleted int `json:"numDeleted"`