diff --git a/model/data.proto b/model/data.proto index 8bad15d273..4e43bd977d 100644 --- a/model/data.proto +++ b/model/data.proto @@ -24,6 +24,10 @@ message LabelName { optional string name = 1; } +message LabelValueCollection { + repeated string member = 1; +} + message Metric { repeated LabelPair label_pair = 1; } diff --git a/model/generated/data.pb.go b/model/generated/data.pb.go index 7171aced83..a887d924cc 100644 --- a/model/generated/data.pb.go +++ b/model/generated/data.pb.go @@ -11,6 +11,7 @@ It is generated from these files: It has these top-level messages: LabelPair LabelName + LabelValueCollection Metric Fingerprint FingerprintCollection @@ -76,6 +77,22 @@ func (m *LabelName) GetName() string { return "" } +type LabelValueCollection struct { + Member []string `protobuf:"bytes,1,rep,name=member" json:"member,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *LabelValueCollection) Reset() { *m = LabelValueCollection{} } +func (m *LabelValueCollection) String() string { return proto.CompactTextString(m) } +func (*LabelValueCollection) ProtoMessage() {} + +func (m *LabelValueCollection) GetMember() []string { + if m != nil { + return m.Member + } + return nil +} + type Metric struct { LabelPair []*LabelPair `protobuf:"bytes,1,rep,name=label_pair" json:"label_pair,omitempty"` XXX_unrecognized []byte `json:"-"` diff --git a/model/generated/descriptor.blob b/model/generated/descriptor.blob index ec90e38983..9c88a52fdb 100644 Binary files a/model/generated/descriptor.blob and b/model/generated/descriptor.blob differ diff --git a/storage/metric/end_to_end_test.go b/storage/metric/end_to_end_test.go index 13113a96d8..4a36d7d75a 100644 --- a/storage/metric/end_to_end_test.go +++ b/storage/metric/end_to_end_test.go @@ -78,6 +78,50 @@ func GetFingerprintsForLabelSetTests(p MetricPersistence, t test.Tester) { } } +func GetLabelValuesForLabelNameTests(p MetricPersistence, t test.Tester) { + testAppendSamples(p, &clientmodel.Sample{ + Value: 0, + Timestamp: 0, + Metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "my_metric", + "request_type": "create", + "result": "success", + }, + }, t) + + testAppendSamples(p, &clientmodel.Sample{ + Value: 0, + Timestamp: 0, + Metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "my_metric", + "request_type": "delete", + "outcome": "failure", + }, + }, t) + + expectedIndex := map[clientmodel.LabelName]clientmodel.LabelValues{ + clientmodel.MetricNameLabel: {"my_metric"}, + "request_type": {"create", "delete"}, + "result": {"success"}, + "outcome": {"failure"}, + } + + for name, expected := range expectedIndex { + actual, err := p.GetLabelValuesForLabelName(name) + if err != nil { + t.Fatalf("Error getting values for label %s: %v", name, err) + } + if len(actual) != len(expected) { + t.Fatalf("Number of values don't match for label %s: got %d; want %d", name, len(actual), len(expected)) + } + for i := range expected { + if actual[i] != expected[i] { + t.Fatalf("%d. Got %s; want %s", i, actual[i], expected[i]) + } + } + } +} + func GetMetricForFingerprintTests(p MetricPersistence, t test.Tester) { testAppendSamples(p, &clientmodel.Sample{ Value: 0, @@ -322,6 +366,18 @@ func BenchmarkLevelDBGetFingerprintsForLabelSet(b *testing.B) { } } +var testLevelDBGetLabelValuesForLabelName = buildLevelDBTestPersistence("get_label_values_for_labelname", GetLabelValuesForLabelNameTests) + +func TestLevelDBGetFingerprintsForLabelName(t *testing.T) { + testLevelDBGetLabelValuesForLabelName(t) +} + +func BenchmarkLevelDBGetLabelValuesForLabelName(b *testing.B) { + for i := 0; i < b.N; i++ { + testLevelDBGetLabelValuesForLabelName(b) + } +} + var testLevelDBGetMetricForFingerprint = buildLevelDBTestPersistence("get_metric_for_fingerprint", GetMetricForFingerprintTests) func TestLevelDBGetMetricForFingerprint(t *testing.T) { @@ -370,6 +426,18 @@ func BenchmarkMemoryGetFingerprintsForLabelSet(b *testing.B) { } } +var testMemoryGetLabelValuesForLabelName = buildMemoryTestPersistence(GetLabelValuesForLabelNameTests) + +func TestMemoryGetLabelValuesForLabelName(t *testing.T) { + testMemoryGetLabelValuesForLabelName(t) +} + +func BenchmarkMemoryGetLabelValuesForLabelName(b *testing.B) { + for i := 0; i < b.N; i++ { + testMemoryGetLabelValuesForLabelName(b) + } +} + var testMemoryGetMetricForFingerprint = buildMemoryTestPersistence(GetMetricForFingerprintTests) func TestMemoryGetMetricForFingerprint(t *testing.T) { diff --git a/storage/metric/index.go b/storage/metric/index.go index c71c12f674..4fe2c0088f 100644 --- a/storage/metric/index.go +++ b/storage/metric/index.go @@ -96,6 +96,90 @@ func NewLevelDBFingerprintMetricIndex(o leveldb.LevelDBOptions) (*LevelDBFingerp }, nil } +// LabelNameLabelValuesMapping is an in-memory map of LabelNames to +// LabelValues. +type LabelNameLabelValuesMapping map[clientmodel.LabelName]clientmodel.LabelValues + +// LabelNameLabelValuesIndex models a database mapping LabelNames to +// LabelValues. +type LabelNameLabelValuesIndex interface { + raw.Database + raw.Pruner + + IndexBatch(LabelNameLabelValuesMapping) error + Lookup(clientmodel.LabelName) (values clientmodel.LabelValues, ok bool, err error) + Has(clientmodel.LabelName) (ok bool, err error) +} + +// LevelDBLabelNameLabelValuesIndex implements LabelNameLabelValuesIndex using +// leveldb. +type LevelDBLabelNameLabelValuesIndex struct { + *leveldb.LevelDBPersistence +} + +// IndexBatch implements LabelNameLabelValuesIndex. +func (i *LevelDBLabelNameLabelValuesIndex) IndexBatch(b LabelNameLabelValuesMapping) error { + batch := leveldb.NewBatch() + defer batch.Close() + + for labelName, labelValues := range b { + sort.Sort(labelValues) + + key := &dto.LabelName{ + Name: proto.String(string(labelName)), + } + value := &dto.LabelValueCollection{} + value.Member = make([]string, 0, len(labelValues)) + for _, labelValue := range labelValues { + value.Member = append(value.Member, string(labelValue)) + } + + batch.Put(key, value) + } + + return i.LevelDBPersistence.Commit(batch) +} + +// Lookup implements LabelNameLabelValuesIndex. +func (i *LevelDBLabelNameLabelValuesIndex) Lookup(l clientmodel.LabelName) (values clientmodel.LabelValues, ok bool, err error) { + k := &dto.LabelName{} + dumpLabelName(k, l) + v := &dto.LabelValueCollection{} + ok, err = i.LevelDBPersistence.Get(k, v) + if err != nil { + return nil, false, err + } + if !ok { + return nil, false, nil + } + + for _, m := range v.Member { + values = append(values, clientmodel.LabelValue(m)) + } + + return values, true, nil +} + +// Has implements LabelNameLabelValuesIndex. +func (i *LevelDBLabelNameLabelValuesIndex) Has(l clientmodel.LabelName) (ok bool, err error) { + return i.LevelDBPersistence.Has(&dto.LabelName{ + Name: proto.String(string(l)), + }) +} + +// NewLevelDBLabelNameLabelValuesIndex returns a LevelDBLabelNameLabelValuesIndex +// ready to use. +func NewLevelDBLabelNameLabelValuesIndex(o leveldb.LevelDBOptions) (*LevelDBLabelNameLabelValuesIndex, error) { + s, err := leveldb.NewLevelDBPersistence(o) + if err != nil { + return nil, err + } + + return &LevelDBLabelNameLabelValuesIndex{ + LevelDBPersistence: s, + }, nil +} + // LabelPairFingerprintMapping is an in-memory map of LabelPairs to // Fingerprints. type LabelPairFingerprintMapping map[LabelPair]clientmodel.Fingerprints @@ -470,6 +554,7 @@ func NewBufferedIndexer(i MetricIndexer, limit int) *BufferedIndexer { // locking semantics to enforce this. type TotalIndexer struct { FingerprintToMetric FingerprintMetricIndex + LabelNameToLabelValues LabelNameLabelValuesIndex LabelPairToFingerprint LabelPairFingerprintIndex MetricMembership MetricMembershipIndex } @@ -490,6 +575,45 @@ func findUnindexed(i MetricMembershipIndex, b FingerprintMetricMapping) (Fingerp return out, nil } +func extendLabelNameToLabelValuesIndex(i LabelNameLabelValuesIndex, b FingerprintMetricMapping) (LabelNameLabelValuesMapping, error) { + collection := map[clientmodel.LabelName]utility.Set{} + + for _, m := range b { + for l, v := range m { + set, ok := collection[l] + if !ok { + baseValues, _, err := i.Lookup(l) + if err != nil { + return nil, err + } + + set = utility.Set{} + + for _, baseValue := range baseValues { + set.Add(baseValue) + } + + collection[l] = set + } + + set.Add(v) + } + } + + batch := LabelNameLabelValuesMapping{} + for l, set := range collection { + values := make(clientmodel.LabelValues, 0, len(set)) + for e := range set { + val := e.(clientmodel.LabelValue) + values = append(values, val) + } + + batch[l] = values + } + + return batch, nil +} + func extendLabelPairIndex(i LabelPairFingerprintIndex, b FingerprintMetricMapping) (LabelPairFingerprintMapping, error) { collection := map[LabelPair]utility.Set{} @@ -540,6 +664,14 @@ func (i *TotalIndexer) IndexMetrics(b FingerprintMetricMapping) error { return err } + labelNames, err := extendLabelNameToLabelValuesIndex(i.LabelNameToLabelValues, unindexed) + if err != nil { + return err + } + if err := i.LabelNameToLabelValues.IndexBatch(labelNames); err != nil { + return err + } + labelPairs, err := extendLabelPairIndex(i.LabelPairToFingerprint, unindexed) if err != nil { return err diff --git a/storage/metric/instrumentation.go b/storage/metric/instrumentation.go index e4f955008b..b26c9460d7 100644 --- a/storage/metric/instrumentation.go +++ b/storage/metric/instrumentation.go @@ -25,30 +25,15 @@ const ( failure = "failure" result = "result" - appendFingerprints = "append_fingerprints" - appendLabelPairFingerprint = "append_label_pair_fingerprint" appendSample = "append_sample" appendSamples = "append_samples" - findUnindexedMetrics = "find_unindexed_metrics" flushMemory = "flush_memory" - getBoundaryValues = "get_boundary_values" + getLabelValuesForLabelName = "get_label_values_for_label_name" getFingerprintsForLabelSet = "get_fingerprints_for_labelset" getMetricForFingerprint = "get_metric_for_fingerprint" - getRangeValues = "get_range_values" - getValueAtTime = "get_value_at_time" hasIndexMetric = "has_index_metric" - hasLabelName = "has_label_name" - hasLabelPair = "has_label_pair" - indexFingerprints = "index_fingerprints" - indexLabelNames = "index_label_names" - indexLabelPairs = "index_label_pairs" - indexMetric = "index_metric" - indexMetrics = "index_metrics" - rebuildDiskFrontier = "rebuild_disk_frontier" refreshHighWatermarks = "refresh_high_watermarks" renderView = "render_view" - setLabelPairFingerprints = "set_label_pair_fingerprints" - writeMemory = "write_memory" cutOff = "recency_threshold" processorName = "processor" diff --git a/storage/metric/interface.go b/storage/metric/interface.go index 14fb1325e5..71349ab7a7 100644 --- a/storage/metric/interface.go +++ b/storage/metric/interface.go @@ -32,6 +32,9 @@ type MetricPersistence interface { // provided label set. GetFingerprintsForLabelSet(clientmodel.LabelSet) (clientmodel.Fingerprints, error) + // Get all of the label values that are associated with a given label name. + GetLabelValuesForLabelName(clientmodel.LabelName) (clientmodel.LabelValues, error) + // Get the metric associated with the provided fingerprint. GetMetricForFingerprint(*clientmodel.Fingerprint) (clientmodel.Metric, error) diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index 0bf8ade39b..8620305c3a 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -39,6 +39,7 @@ const sortConcurrency = 2 type LevelDBMetricPersistence struct { CurationRemarks CurationRemarker FingerprintToMetrics FingerprintMetricIndex + LabelNameToLabelValues LabelNameLabelValuesIndex LabelPairToFingerprints LabelPairFingerprintIndex MetricHighWatermarks HighWatermarker MetricMembershipIndex MetricMembershipIndex @@ -67,6 +68,7 @@ var ( curationRemarksCacheSize = flag.Int("curationRemarksCacheSize", 5*1024*1024, "The size for the curation remarks cache (bytes).") fingerprintsToLabelPairCacheSize = flag.Int("fingerprintsToLabelPairCacheSizeBytes", 25*1024*1024, "The size for the fingerprint to label pair index (bytes).") highWatermarkCacheSize = flag.Int("highWatermarksByFingerprintSizeBytes", 5*1024*1024, "The size for the metric high watermarks (bytes).") + labelNameToLabelValuesCacheSize = flag.Int("labelNameToLabelValuesCacheSizeBytes", 25*1024*1024, "The size for the label name to label values index (bytes).") labelPairToFingerprintsCacheSize = flag.Int("labelPairToFingerprintsCacheSizeBytes", 25*1024*1024, "The size for the label pair to metric fingerprint index (bytes).") metricMembershipIndexCacheSize = flag.Int("metricMembershipCacheSizeBytes", 5*1024*1024, "The size for the metric membership index (bytes).") samplesByFingerprintCacheSize = flag.Int("samplesByFingerprintCacheSizeBytes", 50*1024*1024, "The size for the samples database (bytes).") @@ -80,6 +82,7 @@ func (l *LevelDBMetricPersistence) Close() { var persistences = []raw.Database{ l.CurationRemarks, l.FingerprintToMetrics, + l.LabelNameToLabelValues, l.LabelPairToFingerprints, l.MetricHighWatermarks, l.MetricMembershipIndex, @@ -106,7 +109,7 @@ func (l *LevelDBMetricPersistence) Close() { // NewLevelDBMetricPersistence returns a LevelDBMetricPersistence object ready // to use. func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistence, error) { - workers := utility.NewUncertaintyGroup(6) + workers := utility.NewUncertaintyGroup(7) emission := &LevelDBMetricPersistence{} @@ -157,6 +160,21 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc workers.MayFail(err) }, }, + { + "Fingerprints by Label Name", + func() { + var err error + emission.LabelNameToLabelValues, err = NewLevelDBLabelNameLabelValuesIndex( + leveldb.LevelDBOptions{ + Name: "Label Values by Label Name", + Purpose: "Index", + Path: baseDirectory + "/label_values_by_label_name", + CacheSizeBytes: *labelNameToLabelValuesCacheSize, + }, + ) + workers.MayFail(err) + }, + }, { "Fingerprints by Label Name and Value Pair", func() { @@ -219,6 +237,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc emission.Indexer = &TotalIndexer{ FingerprintToMetric: emission.FingerprintToMetrics, + LabelNameToLabelValues: emission.LabelNameToLabelValues, LabelPairToFingerprint: emission.LabelPairToFingerprints, MetricMembership: emission.MetricMembershipIndex, } @@ -400,18 +419,6 @@ func (l *LevelDBMetricPersistence) hasIndexMetric(m clientmodel.Metric) (value b return l.MetricMembershipIndex.Has(m) } -// HasLabelPair returns true if the given LabelPair is present in the underlying -// LabelPair index. -func (l *LevelDBMetricPersistence) HasLabelPair(p *LabelPair) (value bool, err error) { - defer func(begin time.Time) { - duration := time.Since(begin) - - recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure}) - }(time.Now()) - - return l.LabelPairToFingerprints.Has(p) -} - // GetFingerprintsForLabelSet returns the Fingerprints for the given LabelSet by // querying the underlying LabelPairFingerprintIndex for each LabelPair // contained in LabelSet. It implements the MetricPersistence interface. @@ -459,6 +466,22 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet clientmod return fps, nil } +// GetLabelValuesForLabelName returns the LabelValues for the given LabelName +// from the underlying LabelNameLabelValuesIndex. It implements the +// MetricPersistence interface. +func (l *LevelDBMetricPersistence) GetLabelValuesForLabelName(labelName clientmodel.LabelName) (clientmodel.LabelValues, error) { + var err error + defer func(begin time.Time) { + duration := time.Since(begin) + + recordOutcome(duration, err, map[string]string{operation: getLabelValuesForLabelName, result: success}, map[string]string{operation: getLabelValuesForLabelName, result: failure}) + }(time.Now()) + + values, _, err := l.LabelNameToLabelValues.Lookup(labelName) + + return values, err +} + // GetMetricForFingerprint returns the Metric for the given Fingerprint from the // underlying FingerprintMetricIndex. It implements the MetricPersistence // interface. @@ -499,6 +522,7 @@ func (l *LevelDBMetricPersistence) GetAllValuesForLabel(labelName clientmodel.La func (l *LevelDBMetricPersistence) Prune() { l.CurationRemarks.Prune() l.FingerprintToMetrics.Prune() + l.LabelNameToLabelValues.Prune() l.LabelPairToFingerprints.Prune() l.MetricHighWatermarks.Prune() l.MetricMembershipIndex.Prune() @@ -519,6 +543,11 @@ func (l *LevelDBMetricPersistence) Sizes() (total uint64, err error) { } total += size + if size, err = l.LabelNameToLabelValues.Size(); err != nil { + return 0, err + } + total += size + if size, err = l.LabelPairToFingerprints.Size(); err != nil { return 0, err } @@ -547,6 +576,7 @@ func (l *LevelDBMetricPersistence) States() raw.DatabaseStates { return raw.DatabaseStates{ l.CurationRemarks.State(), l.FingerprintToMetrics.State(), + l.LabelNameToLabelValues.State(), l.LabelPairToFingerprints.State(), l.MetricHighWatermarks.State(), l.MetricMembershipIndex.State(), diff --git a/storage/metric/memory.go b/storage/metric/memory.go index a349f4201d..568c65f39a 100644 --- a/storage/metric/memory.go +++ b/storage/metric/memory.go @@ -177,6 +177,7 @@ type memorySeriesStorage struct { wmCache *watermarkCache fingerprintToSeries map[clientmodel.Fingerprint]stream labelPairToFingerprints map[LabelPair]utility.Set + labelNameToLabelValues map[clientmodel.LabelName]utility.Set } // MemorySeriesOptions bundles options used by NewMemorySeriesStorage to create @@ -243,12 +244,19 @@ func (s *memorySeriesStorage) getOrCreateSeries(metric clientmodel.Metric, finge Value: v, } - set, ok := s.labelPairToFingerprints[labelPair] + fps, ok := s.labelPairToFingerprints[labelPair] if !ok { - set = utility.Set{} - s.labelPairToFingerprints[labelPair] = set + fps = utility.Set{} + s.labelPairToFingerprints[labelPair] = fps } - set.Add(*fingerprint) + fps.Add(*fingerprint) + + values, ok := s.labelNameToLabelValues[k] + if !ok { + values = utility.Set{} + s.labelNameToLabelValues[k] = values + } + values.Add(v) } } return series @@ -292,6 +300,16 @@ func (s *memorySeriesStorage) Flush(flushOlderThan clientmodel.Timestamp, queue } } +// Drop a label value from the label names to label values index. +func (s *memorySeriesStorage) dropLabelValue(l clientmodel.LabelName, v clientmodel.LabelValue) { + if set, ok := s.labelNameToLabelValues[l]; ok { + set.Remove(v) + if len(set) == 0 { + delete(s.labelNameToLabelValues, l) + } + } +} + // Drop all references to a series, including any samples. func (s *memorySeriesStorage) dropSeries(fingerprint *clientmodel.Fingerprint) { series, ok := s.fingerprintToSeries[*fingerprint] @@ -308,6 +326,7 @@ func (s *memorySeriesStorage) dropSeries(fingerprint *clientmodel.Fingerprint) { set.Remove(*fingerprint) if len(set) == 0 { delete(s.labelPairToFingerprints, labelPair) + s.dropLabelValue(k, v) } } } @@ -365,6 +384,23 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelSet(l clientmodel.LabelSet) return fingerprints, nil } +func (s *memorySeriesStorage) GetLabelValuesForLabelName(labelName clientmodel.LabelName) (clientmodel.LabelValues, error) { + s.RLock() + defer s.RUnlock() + + set, ok := s.labelNameToLabelValues[labelName] + if !ok { + return nil, nil + } + + values := make(clientmodel.LabelValues, 0, len(set)) + for e := range set { + val := e.(clientmodel.LabelValue) + values = append(values, val) + } + return values, nil +} + func (s *memorySeriesStorage) GetMetricForFingerprint(f *clientmodel.Fingerprint) (clientmodel.Metric, error) { s.RLock() defer s.RUnlock() @@ -446,6 +482,7 @@ func (s *memorySeriesStorage) Close() { s.fingerprintToSeries = nil s.labelPairToFingerprints = nil + s.labelNameToLabelValues = nil } func (s *memorySeriesStorage) GetAllValuesForLabel(labelName clientmodel.LabelName) (values clientmodel.LabelValues, err error) { @@ -470,6 +507,7 @@ func NewMemorySeriesStorage(o MemorySeriesOptions) *memorySeriesStorage { return &memorySeriesStorage{ fingerprintToSeries: make(map[clientmodel.Fingerprint]stream), labelPairToFingerprints: make(map[LabelPair]utility.Set), + labelNameToLabelValues: make(map[clientmodel.LabelName]utility.Set), wmCache: o.WatermarkCache, } } diff --git a/storage/metric/stochastic_test.go b/storage/metric/stochastic_test.go index 65db74b033..b932c5ad36 100644 --- a/storage/metric/stochastic_test.go +++ b/storage/metric/stochastic_test.go @@ -66,6 +66,29 @@ func ReadEmptyTests(p MetricPersistence, t test.Tester) { t.Error(err) return } + + hasLabelName := func(x int) (success bool) { + labelName := clientmodel.LabelName(string(x)) + + values, err := p.GetLabelValuesForLabelName(labelName) + if err != nil { + t.Error(err) + return + } + + success = len(values) == 0 + if !success { + t.Errorf("unexpected values length %d, got %d", 0, len(values)) + } + + return + } + + err = quick.Check(hasLabelName, nil) + if err != nil { + t.Error(err) + return + } } func AppendSampleAsPureSparseAppendTests(p MetricPersistence, t test.Tester) { @@ -117,6 +140,16 @@ func AppendSampleAsSparseAppendWithReadsTests(p MetricPersistence, t test.Tester return } + values, err := p.GetLabelValuesForLabelName(labelName) + if err != nil { + t.Error(err) + return + } + if len(values) != 1 { + t.Errorf("expected label values count of %d, got %d", 1, len(values)) + return + } + fingerprints, err := p.GetFingerprintsForLabelSet(clientmodel.LabelSet{ labelName: labelValue, })