Add label names -> label values index.

Change-Id: Ie39b4044558afc4d1aa937de7dcf8df61f821fb4
This commit is contained in:
Julius Volz 2014-03-24 12:08:28 +01:00
parent 71d2ff406d
commit ae30453214
10 changed files with 343 additions and 33 deletions

View file

@ -24,6 +24,10 @@ message LabelName {
optional string name = 1;
}
message LabelValueCollection {
repeated string member = 1;
}
message Metric {
repeated LabelPair label_pair = 1;
}

View file

@ -11,6 +11,7 @@ It is generated from these files:
It has these top-level messages:
LabelPair
LabelName
LabelValueCollection
Metric
Fingerprint
FingerprintCollection
@ -76,6 +77,22 @@ func (m *LabelName) GetName() string {
return ""
}
type LabelValueCollection struct {
Member []string `protobuf:"bytes,1,rep,name=member" json:"member,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *LabelValueCollection) Reset() { *m = LabelValueCollection{} }
func (m *LabelValueCollection) String() string { return proto.CompactTextString(m) }
func (*LabelValueCollection) ProtoMessage() {}
func (m *LabelValueCollection) GetMember() []string {
if m != nil {
return m.Member
}
return nil
}
type Metric struct {
LabelPair []*LabelPair `protobuf:"bytes,1,rep,name=label_pair" json:"label_pair,omitempty"`
XXX_unrecognized []byte `json:"-"`

Binary file not shown.

View file

@ -78,6 +78,50 @@ func GetFingerprintsForLabelSetTests(p MetricPersistence, t test.Tester) {
}
}
func GetLabelValuesForLabelNameTests(p MetricPersistence, t test.Tester) {
testAppendSamples(p, &clientmodel.Sample{
Value: 0,
Timestamp: 0,
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "my_metric",
"request_type": "create",
"result": "success",
},
}, t)
testAppendSamples(p, &clientmodel.Sample{
Value: 0,
Timestamp: 0,
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "my_metric",
"request_type": "delete",
"outcome": "failure",
},
}, t)
expectedIndex := map[clientmodel.LabelName]clientmodel.LabelValues{
clientmodel.MetricNameLabel: {"my_metric"},
"request_type": {"create", "delete"},
"result": {"success"},
"outcome": {"failure"},
}
for name, expected := range expectedIndex {
actual, err := p.GetLabelValuesForLabelName(name)
if err != nil {
t.Fatalf("Error getting values for label %s: %v", name, err)
}
if len(actual) != len(expected) {
t.Fatalf("Number of values don't match for label %s: got %d; want %d", name, len(actual), len(expected))
}
for i := range expected {
if actual[i] != expected[i] {
t.Fatalf("%d. Got %s; want %s", i, actual[i], expected[i])
}
}
}
}
func GetMetricForFingerprintTests(p MetricPersistence, t test.Tester) {
testAppendSamples(p, &clientmodel.Sample{
Value: 0,
@ -322,6 +366,18 @@ func BenchmarkLevelDBGetFingerprintsForLabelSet(b *testing.B) {
}
}
var testLevelDBGetLabelValuesForLabelName = buildLevelDBTestPersistence("get_label_values_for_labelname", GetLabelValuesForLabelNameTests)
func TestLevelDBGetFingerprintsForLabelName(t *testing.T) {
testLevelDBGetLabelValuesForLabelName(t)
}
func BenchmarkLevelDBGetLabelValuesForLabelName(b *testing.B) {
for i := 0; i < b.N; i++ {
testLevelDBGetLabelValuesForLabelName(b)
}
}
var testLevelDBGetMetricForFingerprint = buildLevelDBTestPersistence("get_metric_for_fingerprint", GetMetricForFingerprintTests)
func TestLevelDBGetMetricForFingerprint(t *testing.T) {
@ -370,6 +426,18 @@ func BenchmarkMemoryGetFingerprintsForLabelSet(b *testing.B) {
}
}
var testMemoryGetLabelValuesForLabelName = buildMemoryTestPersistence(GetLabelValuesForLabelNameTests)
func TestMemoryGetLabelValuesForLabelName(t *testing.T) {
testMemoryGetLabelValuesForLabelName(t)
}
func BenchmarkMemoryGetLabelValuesForLabelName(b *testing.B) {
for i := 0; i < b.N; i++ {
testMemoryGetLabelValuesForLabelName(b)
}
}
var testMemoryGetMetricForFingerprint = buildMemoryTestPersistence(GetMetricForFingerprintTests)
func TestMemoryGetMetricForFingerprint(t *testing.T) {

View file

@ -96,6 +96,90 @@ func NewLevelDBFingerprintMetricIndex(o leveldb.LevelDBOptions) (*LevelDBFingerp
}, nil
}
// LabelNameLabelValuesMapping is an in-memory map of LabelNames to
// LabelValues.
type LabelNameLabelValuesMapping map[clientmodel.LabelName]clientmodel.LabelValues
// LabelNameLabelValuesIndex models a database mapping LabelNames to
// LabelValues.
type LabelNameLabelValuesIndex interface {
raw.Database
raw.Pruner
IndexBatch(LabelNameLabelValuesMapping) error
Lookup(clientmodel.LabelName) (values clientmodel.LabelValues, ok bool, err error)
Has(clientmodel.LabelName) (ok bool, err error)
}
// LevelDBLabelNameLabelValuesIndex implements LabelNameLabelValuesIndex using
// leveldb.
type LevelDBLabelNameLabelValuesIndex struct {
*leveldb.LevelDBPersistence
}
// IndexBatch implements LabelNameLabelValuesIndex.
func (i *LevelDBLabelNameLabelValuesIndex) IndexBatch(b LabelNameLabelValuesMapping) error {
batch := leveldb.NewBatch()
defer batch.Close()
for labelName, labelValues := range b {
sort.Sort(labelValues)
key := &dto.LabelName{
Name: proto.String(string(labelName)),
}
value := &dto.LabelValueCollection{}
value.Member = make([]string, 0, len(labelValues))
for _, labelValue := range labelValues {
value.Member = append(value.Member, string(labelValue))
}
batch.Put(key, value)
}
return i.LevelDBPersistence.Commit(batch)
}
// Lookup implements LabelNameLabelValuesIndex.
func (i *LevelDBLabelNameLabelValuesIndex) Lookup(l clientmodel.LabelName) (values clientmodel.LabelValues, ok bool, err error) {
k := &dto.LabelName{}
dumpLabelName(k, l)
v := &dto.LabelValueCollection{}
ok, err = i.LevelDBPersistence.Get(k, v)
if err != nil {
return nil, false, err
}
if !ok {
return nil, false, nil
}
for _, m := range v.Member {
values = append(values, clientmodel.LabelValue(m))
}
return values, true, nil
}
// Has implements LabelNameLabelValuesIndex.
func (i *LevelDBLabelNameLabelValuesIndex) Has(l clientmodel.LabelName) (ok bool, err error) {
return i.LevelDBPersistence.Has(&dto.LabelName{
Name: proto.String(string(l)),
})
}
// NewLevelDBLabelNameLabelValuesIndex returns a LevelDBLabelNameLabelValuesIndex
// ready to use.
func NewLevelDBLabelNameLabelValuesIndex(o leveldb.LevelDBOptions) (*LevelDBLabelNameLabelValuesIndex, error) {
s, err := leveldb.NewLevelDBPersistence(o)
if err != nil {
return nil, err
}
return &LevelDBLabelNameLabelValuesIndex{
LevelDBPersistence: s,
}, nil
}
// LabelPairFingerprintMapping is an in-memory map of LabelPairs to
// Fingerprints.
type LabelPairFingerprintMapping map[LabelPair]clientmodel.Fingerprints
@ -470,6 +554,7 @@ func NewBufferedIndexer(i MetricIndexer, limit int) *BufferedIndexer {
// locking semantics to enforce this.
type TotalIndexer struct {
FingerprintToMetric FingerprintMetricIndex
LabelNameToLabelValues LabelNameLabelValuesIndex
LabelPairToFingerprint LabelPairFingerprintIndex
MetricMembership MetricMembershipIndex
}
@ -490,6 +575,45 @@ func findUnindexed(i MetricMembershipIndex, b FingerprintMetricMapping) (Fingerp
return out, nil
}
func extendLabelNameToLabelValuesIndex(i LabelNameLabelValuesIndex, b FingerprintMetricMapping) (LabelNameLabelValuesMapping, error) {
collection := map[clientmodel.LabelName]utility.Set{}
for _, m := range b {
for l, v := range m {
set, ok := collection[l]
if !ok {
baseValues, _, err := i.Lookup(l)
if err != nil {
return nil, err
}
set = utility.Set{}
for _, baseValue := range baseValues {
set.Add(baseValue)
}
collection[l] = set
}
set.Add(v)
}
}
batch := LabelNameLabelValuesMapping{}
for l, set := range collection {
values := make(clientmodel.LabelValues, 0, len(set))
for e := range set {
val := e.(clientmodel.LabelValue)
values = append(values, val)
}
batch[l] = values
}
return batch, nil
}
func extendLabelPairIndex(i LabelPairFingerprintIndex, b FingerprintMetricMapping) (LabelPairFingerprintMapping, error) {
collection := map[LabelPair]utility.Set{}
@ -540,6 +664,14 @@ func (i *TotalIndexer) IndexMetrics(b FingerprintMetricMapping) error {
return err
}
labelNames, err := extendLabelNameToLabelValuesIndex(i.LabelNameToLabelValues, unindexed)
if err != nil {
return err
}
if err := i.LabelNameToLabelValues.IndexBatch(labelNames); err != nil {
return err
}
labelPairs, err := extendLabelPairIndex(i.LabelPairToFingerprint, unindexed)
if err != nil {
return err

View file

@ -25,30 +25,15 @@ const (
failure = "failure"
result = "result"
appendFingerprints = "append_fingerprints"
appendLabelPairFingerprint = "append_label_pair_fingerprint"
appendSample = "append_sample"
appendSamples = "append_samples"
findUnindexedMetrics = "find_unindexed_metrics"
flushMemory = "flush_memory"
getBoundaryValues = "get_boundary_values"
getLabelValuesForLabelName = "get_label_values_for_label_name"
getFingerprintsForLabelSet = "get_fingerprints_for_labelset"
getMetricForFingerprint = "get_metric_for_fingerprint"
getRangeValues = "get_range_values"
getValueAtTime = "get_value_at_time"
hasIndexMetric = "has_index_metric"
hasLabelName = "has_label_name"
hasLabelPair = "has_label_pair"
indexFingerprints = "index_fingerprints"
indexLabelNames = "index_label_names"
indexLabelPairs = "index_label_pairs"
indexMetric = "index_metric"
indexMetrics = "index_metrics"
rebuildDiskFrontier = "rebuild_disk_frontier"
refreshHighWatermarks = "refresh_high_watermarks"
renderView = "render_view"
setLabelPairFingerprints = "set_label_pair_fingerprints"
writeMemory = "write_memory"
cutOff = "recency_threshold"
processorName = "processor"

View file

@ -32,6 +32,9 @@ type MetricPersistence interface {
// provided label set.
GetFingerprintsForLabelSet(clientmodel.LabelSet) (clientmodel.Fingerprints, error)
// Get all of the label values that are associated with a given label name.
GetLabelValuesForLabelName(clientmodel.LabelName) (clientmodel.LabelValues, error)
// Get the metric associated with the provided fingerprint.
GetMetricForFingerprint(*clientmodel.Fingerprint) (clientmodel.Metric, error)

View file

@ -39,6 +39,7 @@ const sortConcurrency = 2
type LevelDBMetricPersistence struct {
CurationRemarks CurationRemarker
FingerprintToMetrics FingerprintMetricIndex
LabelNameToLabelValues LabelNameLabelValuesIndex
LabelPairToFingerprints LabelPairFingerprintIndex
MetricHighWatermarks HighWatermarker
MetricMembershipIndex MetricMembershipIndex
@ -67,6 +68,7 @@ var (
curationRemarksCacheSize = flag.Int("curationRemarksCacheSize", 5*1024*1024, "The size for the curation remarks cache (bytes).")
fingerprintsToLabelPairCacheSize = flag.Int("fingerprintsToLabelPairCacheSizeBytes", 25*1024*1024, "The size for the fingerprint to label pair index (bytes).")
highWatermarkCacheSize = flag.Int("highWatermarksByFingerprintSizeBytes", 5*1024*1024, "The size for the metric high watermarks (bytes).")
labelNameToLabelValuesCacheSize = flag.Int("labelNameToLabelValuesCacheSizeBytes", 25*1024*1024, "The size for the label name to label values index (bytes).")
labelPairToFingerprintsCacheSize = flag.Int("labelPairToFingerprintsCacheSizeBytes", 25*1024*1024, "The size for the label pair to metric fingerprint index (bytes).")
metricMembershipIndexCacheSize = flag.Int("metricMembershipCacheSizeBytes", 5*1024*1024, "The size for the metric membership index (bytes).")
samplesByFingerprintCacheSize = flag.Int("samplesByFingerprintCacheSizeBytes", 50*1024*1024, "The size for the samples database (bytes).")
@ -80,6 +82,7 @@ func (l *LevelDBMetricPersistence) Close() {
var persistences = []raw.Database{
l.CurationRemarks,
l.FingerprintToMetrics,
l.LabelNameToLabelValues,
l.LabelPairToFingerprints,
l.MetricHighWatermarks,
l.MetricMembershipIndex,
@ -106,7 +109,7 @@ func (l *LevelDBMetricPersistence) Close() {
// NewLevelDBMetricPersistence returns a LevelDBMetricPersistence object ready
// to use.
func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistence, error) {
workers := utility.NewUncertaintyGroup(6)
workers := utility.NewUncertaintyGroup(7)
emission := &LevelDBMetricPersistence{}
@ -157,6 +160,21 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
workers.MayFail(err)
},
},
{
"Fingerprints by Label Name",
func() {
var err error
emission.LabelNameToLabelValues, err = NewLevelDBLabelNameLabelValuesIndex(
leveldb.LevelDBOptions{
Name: "Label Values by Label Name",
Purpose: "Index",
Path: baseDirectory + "/label_values_by_label_name",
CacheSizeBytes: *labelNameToLabelValuesCacheSize,
},
)
workers.MayFail(err)
},
},
{
"Fingerprints by Label Name and Value Pair",
func() {
@ -219,6 +237,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
emission.Indexer = &TotalIndexer{
FingerprintToMetric: emission.FingerprintToMetrics,
LabelNameToLabelValues: emission.LabelNameToLabelValues,
LabelPairToFingerprint: emission.LabelPairToFingerprints,
MetricMembership: emission.MetricMembershipIndex,
}
@ -400,18 +419,6 @@ func (l *LevelDBMetricPersistence) hasIndexMetric(m clientmodel.Metric) (value b
return l.MetricMembershipIndex.Has(m)
}
// HasLabelPair returns true if the given LabelPair is present in the underlying
// LabelPair index.
func (l *LevelDBMetricPersistence) HasLabelPair(p *LabelPair) (value bool, err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure})
}(time.Now())
return l.LabelPairToFingerprints.Has(p)
}
// GetFingerprintsForLabelSet returns the Fingerprints for the given LabelSet by
// querying the underlying LabelPairFingerprintIndex for each LabelPair
// contained in LabelSet. It implements the MetricPersistence interface.
@ -459,6 +466,22 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet clientmod
return fps, nil
}
// GetLabelValuesForLabelName returns the LabelValues for the given LabelName
// from the underlying LabelNameLabelValuesIndex. It implements the
// MetricPersistence interface.
func (l *LevelDBMetricPersistence) GetLabelValuesForLabelName(labelName clientmodel.LabelName) (clientmodel.LabelValues, error) {
var err error
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: getLabelValuesForLabelName, result: success}, map[string]string{operation: getLabelValuesForLabelName, result: failure})
}(time.Now())
values, _, err := l.LabelNameToLabelValues.Lookup(labelName)
return values, err
}
// GetMetricForFingerprint returns the Metric for the given Fingerprint from the
// underlying FingerprintMetricIndex. It implements the MetricPersistence
// interface.
@ -499,6 +522,7 @@ func (l *LevelDBMetricPersistence) GetAllValuesForLabel(labelName clientmodel.La
func (l *LevelDBMetricPersistence) Prune() {
l.CurationRemarks.Prune()
l.FingerprintToMetrics.Prune()
l.LabelNameToLabelValues.Prune()
l.LabelPairToFingerprints.Prune()
l.MetricHighWatermarks.Prune()
l.MetricMembershipIndex.Prune()
@ -519,6 +543,11 @@ func (l *LevelDBMetricPersistence) Sizes() (total uint64, err error) {
}
total += size
if size, err = l.LabelNameToLabelValues.Size(); err != nil {
return 0, err
}
total += size
if size, err = l.LabelPairToFingerprints.Size(); err != nil {
return 0, err
}
@ -547,6 +576,7 @@ func (l *LevelDBMetricPersistence) States() raw.DatabaseStates {
return raw.DatabaseStates{
l.CurationRemarks.State(),
l.FingerprintToMetrics.State(),
l.LabelNameToLabelValues.State(),
l.LabelPairToFingerprints.State(),
l.MetricHighWatermarks.State(),
l.MetricMembershipIndex.State(),

View file

@ -177,6 +177,7 @@ type memorySeriesStorage struct {
wmCache *watermarkCache
fingerprintToSeries map[clientmodel.Fingerprint]stream
labelPairToFingerprints map[LabelPair]utility.Set
labelNameToLabelValues map[clientmodel.LabelName]utility.Set
}
// MemorySeriesOptions bundles options used by NewMemorySeriesStorage to create
@ -243,12 +244,19 @@ func (s *memorySeriesStorage) getOrCreateSeries(metric clientmodel.Metric, finge
Value: v,
}
set, ok := s.labelPairToFingerprints[labelPair]
fps, ok := s.labelPairToFingerprints[labelPair]
if !ok {
set = utility.Set{}
s.labelPairToFingerprints[labelPair] = set
fps = utility.Set{}
s.labelPairToFingerprints[labelPair] = fps
}
set.Add(*fingerprint)
fps.Add(*fingerprint)
values, ok := s.labelNameToLabelValues[k]
if !ok {
values = utility.Set{}
s.labelNameToLabelValues[k] = values
}
values.Add(v)
}
}
return series
@ -292,6 +300,16 @@ func (s *memorySeriesStorage) Flush(flushOlderThan clientmodel.Timestamp, queue
}
}
// Drop a label value from the label names to label values index.
func (s *memorySeriesStorage) dropLabelValue(l clientmodel.LabelName, v clientmodel.LabelValue) {
if set, ok := s.labelNameToLabelValues[l]; ok {
set.Remove(v)
if len(set) == 0 {
delete(s.labelNameToLabelValues, l)
}
}
}
// Drop all references to a series, including any samples.
func (s *memorySeriesStorage) dropSeries(fingerprint *clientmodel.Fingerprint) {
series, ok := s.fingerprintToSeries[*fingerprint]
@ -308,6 +326,7 @@ func (s *memorySeriesStorage) dropSeries(fingerprint *clientmodel.Fingerprint) {
set.Remove(*fingerprint)
if len(set) == 0 {
delete(s.labelPairToFingerprints, labelPair)
s.dropLabelValue(k, v)
}
}
}
@ -365,6 +384,23 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelSet(l clientmodel.LabelSet)
return fingerprints, nil
}
func (s *memorySeriesStorage) GetLabelValuesForLabelName(labelName clientmodel.LabelName) (clientmodel.LabelValues, error) {
s.RLock()
defer s.RUnlock()
set, ok := s.labelNameToLabelValues[labelName]
if !ok {
return nil, nil
}
values := make(clientmodel.LabelValues, 0, len(set))
for e := range set {
val := e.(clientmodel.LabelValue)
values = append(values, val)
}
return values, nil
}
func (s *memorySeriesStorage) GetMetricForFingerprint(f *clientmodel.Fingerprint) (clientmodel.Metric, error) {
s.RLock()
defer s.RUnlock()
@ -446,6 +482,7 @@ func (s *memorySeriesStorage) Close() {
s.fingerprintToSeries = nil
s.labelPairToFingerprints = nil
s.labelNameToLabelValues = nil
}
func (s *memorySeriesStorage) GetAllValuesForLabel(labelName clientmodel.LabelName) (values clientmodel.LabelValues, err error) {
@ -470,6 +507,7 @@ func NewMemorySeriesStorage(o MemorySeriesOptions) *memorySeriesStorage {
return &memorySeriesStorage{
fingerprintToSeries: make(map[clientmodel.Fingerprint]stream),
labelPairToFingerprints: make(map[LabelPair]utility.Set),
labelNameToLabelValues: make(map[clientmodel.LabelName]utility.Set),
wmCache: o.WatermarkCache,
}
}

View file

@ -66,6 +66,29 @@ func ReadEmptyTests(p MetricPersistence, t test.Tester) {
t.Error(err)
return
}
hasLabelName := func(x int) (success bool) {
labelName := clientmodel.LabelName(string(x))
values, err := p.GetLabelValuesForLabelName(labelName)
if err != nil {
t.Error(err)
return
}
success = len(values) == 0
if !success {
t.Errorf("unexpected values length %d, got %d", 0, len(values))
}
return
}
err = quick.Check(hasLabelName, nil)
if err != nil {
t.Error(err)
return
}
}
func AppendSampleAsPureSparseAppendTests(p MetricPersistence, t test.Tester) {
@ -117,6 +140,16 @@ func AppendSampleAsSparseAppendWithReadsTests(p MetricPersistence, t test.Tester
return
}
values, err := p.GetLabelValuesForLabelName(labelName)
if err != nil {
t.Error(err)
return
}
if len(values) != 1 {
t.Errorf("expected label values count of %d, got %d", 1, len(values))
return
}
fingerprints, err := p.GetFingerprintsForLabelSet(clientmodel.LabelSet{
labelName: labelValue,
})