Merge pull request #351 from prometheus/refactor/storage/componentize-index

Replace index writes with wrapped interface.
This commit is contained in:
Matt T. Proud 2013-08-11 17:16:05 -07:00
commit 974eacf36f
3 changed files with 197 additions and 211 deletions

View file

@ -25,6 +25,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility"
)
type FingerprintMetricMapping map[clientmodel.Fingerprint]clientmodel.Metric
@ -211,20 +212,20 @@ func NewLevelLabelNameFingerprintIndex(o *LevelDBLabelNameFingerprintIndexOption
}, nil
}
type LabelSetFingerprintMapping map[LabelPair]clientmodel.Fingerprints
type LabelPairFingerprintMapping map[LabelPair]clientmodel.Fingerprints
type LabelSetFingerprintIndex interface {
type LabelPairFingerprintIndex interface {
raw.ForEacher
raw.Pruner
IndexBatch(LabelSetFingerprintMapping) error
IndexBatch(LabelPairFingerprintMapping) error
Lookup(*LabelPair) (m clientmodel.Fingerprints, ok bool, err error)
Has(*LabelPair) (ok bool, err error)
State() *raw.DatabaseState
Size() (s uint64, present bool, err error)
}
type LevelDBLabelSetFingerprintIndex struct {
type LevelDBLabelPairFingerprintIndex struct {
p *leveldb.LevelDBPersistence
}
@ -232,7 +233,7 @@ type LevelDBLabelSetFingerprintIndexOptions struct {
leveldb.LevelDBOptions
}
func (i *LevelDBLabelSetFingerprintIndex) IndexBatch(m LabelSetFingerprintMapping) error {
func (i *LevelDBLabelPairFingerprintIndex) IndexBatch(m LabelPairFingerprintMapping) error {
batch := leveldb.NewBatch()
defer batch.Close()
@ -256,7 +257,7 @@ func (i *LevelDBLabelSetFingerprintIndex) IndexBatch(m LabelSetFingerprintMappin
return i.p.Commit(batch)
}
func (i *LevelDBLabelSetFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.Fingerprints, ok bool, err error) {
func (i *LevelDBLabelPairFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.Fingerprints, ok bool, err error) {
k := &dto.LabelPair{
Name: proto.String(string(p.Name)),
Value: proto.String(string(p.Value)),
@ -281,7 +282,7 @@ func (i *LevelDBLabelSetFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.Fi
return m, true, nil
}
func (i *LevelDBLabelSetFingerprintIndex) Has(p *LabelPair) (ok bool, err error) {
func (i *LevelDBLabelPairFingerprintIndex) Has(p *LabelPair) (ok bool, err error) {
k := &dto.LabelPair{
Name: proto.String(string(p.Name)),
Value: proto.String(string(p.Value)),
@ -290,35 +291,35 @@ func (i *LevelDBLabelSetFingerprintIndex) Has(p *LabelPair) (ok bool, err error)
return i.p.Has(k)
}
func (i *LevelDBLabelSetFingerprintIndex) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) {
func (i *LevelDBLabelPairFingerprintIndex) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) {
return i.p.ForEach(d, f, o)
}
func (i *LevelDBLabelSetFingerprintIndex) Prune() (bool, error) {
func (i *LevelDBLabelPairFingerprintIndex) Prune() (bool, error) {
i.p.Prune()
return false, nil
}
func (i *LevelDBLabelSetFingerprintIndex) Close() {
func (i *LevelDBLabelPairFingerprintIndex) Close() {
i.p.Close()
}
func (i *LevelDBLabelSetFingerprintIndex) Size() (uint64, bool, error) {
func (i *LevelDBLabelPairFingerprintIndex) Size() (uint64, bool, error) {
s, err := i.p.Size()
return s, true, err
}
func (i *LevelDBLabelSetFingerprintIndex) State() *raw.DatabaseState {
func (i *LevelDBLabelPairFingerprintIndex) State() *raw.DatabaseState {
return i.p.State()
}
func NewLevelDBLabelSetFingerprintIndex(o *LevelDBLabelSetFingerprintIndexOptions) (*LevelDBLabelSetFingerprintIndex, error) {
func NewLevelDBLabelSetFingerprintIndex(o *LevelDBLabelSetFingerprintIndexOptions) (*LevelDBLabelPairFingerprintIndex, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil {
return nil, err
}
return &LevelDBLabelSetFingerprintIndex{
return &LevelDBLabelPairFingerprintIndex{
p: s,
}, nil
}
@ -326,7 +327,7 @@ func NewLevelDBLabelSetFingerprintIndex(o *LevelDBLabelSetFingerprintIndexOption
type MetricMembershipIndex interface {
raw.Pruner
IndexBatch([]clientmodel.Metric) error
IndexBatch(FingerprintMetricMapping) error
Has(clientmodel.Metric) (ok bool, err error)
State() *raw.DatabaseState
Size() (s uint64, present bool, err error)
@ -338,11 +339,11 @@ type LevelDBMetricMembershipIndex struct {
var existenceIdentity = new(dto.MembershipIndexValue)
func (i *LevelDBMetricMembershipIndex) IndexBatch(ms []clientmodel.Metric) error {
func (i *LevelDBMetricMembershipIndex) IndexBatch(b FingerprintMetricMapping) error {
batch := leveldb.NewBatch()
defer batch.Close()
for _, m := range ms {
for _, m := range b {
k := new(dto.Metric)
dumpMetric(k, m)
batch.Put(k, existenceIdentity)
@ -391,3 +392,155 @@ func NewLevelDBMetricMembershipIndex(o *LevelDBMetricMembershipIndexOptions) (*L
p: s,
}, nil
}
// MetricIndexer indexes facets of a clientmodel.Metric.
type MetricIndexer interface {
// IndexMetric makes no assumptions about the concurrency safety of the
// underlying implementer.
IndexMetrics(FingerprintMetricMapping) error
}
// TotalIndexer is a MetricIndexer that indexes all standard facets of a metric
// that a user or the Prometheus subsystem would want to query against:
//
// "<Label Name>" -> {Fingerprint, ...}
// "<Label Name> <Label Value>" -> {Fingerprint, ...}
//
// "<Fingerprint>" -> Metric
//
// "<Metric>" -> Existence Value
//
// This type supports concrete queries but only single writes, and it has no
// locking semantics to enforce this.
type TotalIndexer struct {
FingerprintToMetric FingerprintMetricIndex
LabelNameToFingerprint LabelNameFingerprintIndex
LabelPairToFingerprint LabelPairFingerprintIndex
MetricMembership MetricMembershipIndex
}
func findUnindexed(i MetricMembershipIndex, b FingerprintMetricMapping) (FingerprintMetricMapping, error) {
out := FingerprintMetricMapping{}
for fp, m := range b {
has, err := i.Has(m)
if err != nil {
return nil, err
}
if !has {
out[fp] = m
}
}
return out, nil
}
func extendLabelNameIndex(i LabelNameFingerprintIndex, b FingerprintMetricMapping) (LabelNameFingerprintMapping, error) {
collection := map[clientmodel.LabelName]utility.Set{}
for fp, m := range b {
for l := range m {
set, ok := collection[l]
if !ok {
baseFps, _, err := i.Lookup(l)
if err != nil {
return nil, err
}
set = utility.Set{}
for _, baseFp := range baseFps {
set.Add(*baseFp)
}
collection[l] = set
}
set.Add(fp)
}
}
batch := LabelNameFingerprintMapping{}
for l, set := range collection {
fps := clientmodel.Fingerprints{}
for e := range set {
fp := e.(clientmodel.Fingerprint)
fps = append(fps, &fp)
}
batch[l] = fps
}
return batch, nil
}
func extendLabelPairIndex(i LabelPairFingerprintIndex, b FingerprintMetricMapping) (LabelPairFingerprintMapping, error) {
collection := map[LabelPair]utility.Set{}
for fp, m := range b {
for n, v := range m {
pair := LabelPair{
Name: n,
Value: v,
}
set, ok := collection[pair]
if !ok {
baseFps, _, err := i.Lookup(&pair)
if err != nil {
return nil, err
}
set = utility.Set{}
for _, baseFp := range baseFps {
set.Add(*baseFp)
}
collection[pair] = set
}
set.Add(fp)
}
}
batch := LabelPairFingerprintMapping{}
for pair, set := range collection {
fps := batch[pair]
for element := range set {
fp := element.(clientmodel.Fingerprint)
fps = append(fps, &fp)
}
batch[pair] = fps
}
return batch, nil
}
func (i *TotalIndexer) IndexMetrics(b FingerprintMetricMapping) error {
unindexed, err := findUnindexed(i.MetricMembership, b)
if err != nil {
return err
}
labelNames, err := extendLabelNameIndex(i.LabelNameToFingerprint, unindexed)
if err != nil {
return err
}
if err := i.LabelNameToFingerprint.IndexBatch(labelNames); err != nil {
return err
}
labelPairs, err := extendLabelPairIndex(i.LabelPairToFingerprint, unindexed)
if err != nil {
return err
}
if err := i.LabelPairToFingerprint.IndexBatch(labelPairs); err != nil {
return err
}
if err := i.FingerprintToMetric.IndexBatch(unindexed); err != nil {
return err
}
return i.MetricMembership.IndexBatch(unindexed)
}

View file

@ -39,10 +39,25 @@ type LevelDBMetricPersistence struct {
CurationRemarks CurationRemarker
fingerprintToMetrics FingerprintMetricIndex
labelNameToFingerprints LabelNameFingerprintIndex
labelSetToFingerprints LabelSetFingerprintIndex
labelSetToFingerprints LabelPairFingerprintIndex
MetricHighWatermarks HighWatermarker
metricMembershipIndex MetricMembershipIndex
MetricSamples *leveldb.LevelDBPersistence
Indexer MetricIndexer
MetricSamples *leveldb.LevelDBPersistence
// The remaining indices will be replaced with generalized interface resolvers:
//
// type FingerprintResolver interface {
// GetFingerprintForMetric(clientmodel.Metric) (*clientmodel.Fingerprint, bool, error)
// GetFingerprintsForLabelName(clientmodel.LabelName) (clientmodel.Fingerprints, bool, error)
// GetFingerprintsForLabelSet(LabelPair) (clientmodel.Fingerprints, bool, error)
// }
// type MetricResolver interface {
// GetMetricsForFingerprint(clientmodel.Fingerprints) (FingerprintMetricMapping, bool, error)
// }
}
var (
@ -228,6 +243,13 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
return nil, fmt.Errorf("Unable to open metric persistence.")
}
emission.Indexer = &TotalIndexer{
FingerprintToMetric: emission.fingerprintToMetrics,
LabelNameToFingerprint: emission.labelNameToFingerprints,
LabelPairToFingerprint: emission.labelSetToFingerprints,
MetricMembership: emission.metricMembershipIndex,
}
return emission, nil
}
@ -277,197 +299,6 @@ func groupByFingerprint(samples clientmodel.Samples) map[clientmodel.Fingerprint
return fingerprintToSamples
}
// findUnindexedMetrics scours the metric membership index for each given Metric
// in the keyspace and returns a map of Fingerprint-Metric pairs that are
// absent.
func (l *LevelDBMetricPersistence) findUnindexedMetrics(candidates map[clientmodel.Fingerprint]clientmodel.Metric) (unindexed FingerprintMetricMapping, err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: findUnindexedMetrics, result: success}, map[string]string{operation: findUnindexedMetrics, result: failure})
}(time.Now())
unindexed = FingerprintMetricMapping{}
for fingerprint, metric := range candidates {
indexHas, err := l.hasIndexMetric(metric)
if err != nil {
return unindexed, err
}
if !indexHas {
unindexed[fingerprint] = metric
}
}
return unindexed, nil
}
// indexLabelNames accumulates all label name to fingerprint index entries for
// the dirty metrics, appends the new dirtied metrics, sorts, and bulk updates
// the index to reflect the new state.
//
// This operation is idempotent.
func (l *LevelDBMetricPersistence) indexLabelNames(metrics FingerprintMetricMapping) (err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: indexLabelNames, result: success}, map[string]string{operation: indexLabelNames, result: failure})
}(time.Now())
retrieved := map[clientmodel.LabelName]utility.Set{}
for fingerprint, metric := range metrics {
for labelName := range metric {
fingerprintSet, ok := retrieved[labelName]
if !ok {
fingerprints, err := l.GetFingerprintsForLabelName(labelName)
if err != nil {
return err
}
fingerprintSet = utility.Set{}
retrieved[labelName] = fingerprintSet
for _, fingerprint := range fingerprints {
fingerprintSet.Add(*fingerprint)
}
}
fingerprintSet.Add(fingerprint)
}
}
pending := LabelNameFingerprintMapping{}
for name, set := range retrieved {
fps := pending[name]
for fp := range set {
f := fp.(clientmodel.Fingerprint)
fps = append(fps, &f)
}
pending[name] = fps
}
return l.labelNameToFingerprints.IndexBatch(pending)
}
// indexLabelPairs accumulates all label pair to fingerprint index entries for
// the dirty metrics, appends the new dirtied metrics, sorts, and bulk updates
// the index to reflect the new state.
//
// This operation is idempotent.
func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[clientmodel.Fingerprint]clientmodel.Metric) (err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: indexLabelPairs, result: success}, map[string]string{operation: indexLabelPairs, result: failure})
}(time.Now())
collection := map[LabelPair]utility.Set{}
for fingerprint, metric := range metrics {
for labelName, labelValue := range metric {
labelPair := LabelPair{
Name: labelName,
Value: labelValue,
}
fingerprintSet, ok := collection[labelPair]
if !ok {
fingerprints, _, err := l.labelSetToFingerprints.Lookup(&labelPair)
if err != nil {
return err
}
fingerprintSet = utility.Set{}
for _, fingerprint := range fingerprints {
fingerprintSet.Add(*fingerprint)
}
collection[labelPair] = fingerprintSet
}
fingerprintSet.Add(fingerprint)
}
}
batch := LabelSetFingerprintMapping{}
for pair, elements := range collection {
fps := batch[pair]
for element := range elements {
fp := element.(clientmodel.Fingerprint)
fps = append(fps, &fp)
}
batch[pair] = fps
}
return l.labelSetToFingerprints.IndexBatch(batch)
}
// indexFingerprints updates all of the Fingerprint to Metric reverse lookups
// in the index and then bulk updates.
//
// This operation is idempotent.
func (l *LevelDBMetricPersistence) indexFingerprints(b FingerprintMetricMapping) (err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: indexFingerprints, result: success}, map[string]string{operation: indexFingerprints, result: failure})
}(time.Now())
return l.fingerprintToMetrics.IndexBatch(b)
}
// indexMetrics takes groups of samples, determines which ones contain metrics
// that are unknown to the storage stack, and then proceeds to update all
// affected indices.
func (l *LevelDBMetricPersistence) indexMetrics(fingerprints FingerprintMetricMapping) (err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: indexMetrics, result: success}, map[string]string{operation: indexMetrics, result: failure})
}(time.Now())
absentees, err := l.findUnindexedMetrics(fingerprints)
if err != nil {
return
}
if len(absentees) == 0 {
return
}
// TODO: For the missing fingerprints, determine what label names and pairs
// are absent and act accordingly and append fingerprints.
workers := utility.NewUncertaintyGroup(3)
go func() {
workers.MayFail(l.indexLabelNames(absentees))
}()
go func() {
workers.MayFail(l.indexLabelPairs(absentees))
}()
go func() {
workers.MayFail(l.indexFingerprints(absentees))
}()
// If any of the preceding operations failed, we will have inconsistent
// indices. Thusly, the Metric membership index should NOT be updated, as
// its state is used to determine whether to bulk update the other indices.
// Given that those operations are idempotent, it is OK to repeat them;
// however, it will consume considerable amounts of time.
if !workers.Wait() {
return fmt.Errorf("Could not index due to %s", workers.Errors())
}
ms := []clientmodel.Metric{}
for _, m := range absentees {
ms = append(ms, m)
}
return l.metricMembershipIndex.IndexBatch(ms)
}
func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[clientmodel.Fingerprint]clientmodel.Samples) (err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
@ -505,7 +336,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples clientmodel.Samples) (e
metrics[fingerprint] = samples[0].Metric
}
indexErrChan <- l.indexMetrics(metrics)
indexErrChan <- l.Indexer.IndexMetrics(metrics)
}(fingerprintToSamples)
go func(groups map[clientmodel.Fingerprint]clientmodel.Samples) {

View file

@ -95,6 +95,8 @@ type TieredStorage struct {
diskSemaphore chan bool
wmCache *WatermarkCache
Indexer MetricIndexer
}
// viewJob encapsulates a request to extract sample values from the datastore.