Make index package more self-contained.

Moved interna from diskPersistence into the indexer.
TotalIndexer now called diskIndexer.

Change-Id: I6c8c62cb171f12bbd8a5474773af7786d71ba388
This commit is contained in:
Bjoern Rabenstein 2014-09-09 15:13:07 +02:00
parent 89f10e8eb2
commit 4770cf76a4
4 changed files with 103 additions and 92 deletions

View file

@ -1,14 +1,32 @@
package index package index
import ( import (
"flag"
"os"
"path"
"sync" "sync"
"github.com/golang/glog"
clientmodel "github.com/prometheus/client_golang/model" clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility" "github.com/prometheus/prometheus/utility"
) )
const (
fingerprintToMetricDir = "fingerprint_to_metric"
labelNameToLabelValuesDir = "labelname_to_labelvalues"
labelPairToFingerprintsDir = "labelpair_to_fingerprints"
fingerprintMembershipDir = "fingerprint_membership"
)
var (
fingerprintToMetricCacheSize = flag.Int("storage.fingerprintToMetricCacheSizeBytes", 25*1024*1024, "The size in bytes for the fingerprint to metric index cache.")
labelNameToLabelValuesCacheSize = flag.Int("storage.labelNameToLabelValuesCacheSizeBytes", 25*1024*1024, "The size in bytes for the label name to label values index cache.")
labelPairToFingerprintsCacheSize = flag.Int("storage.labelPairToFingerprintsCacheSizeBytes", 25*1024*1024, "The size in bytes for the label pair to fingerprints index cache.")
fingerprintMembershipCacheSize = flag.Int("storage.fingerprintMembershipCacheSizeBytes", 5*1024*1024, "The size in bytes for the metric membership index cache.")
)
// FingerprintMetricMapping is an in-memory map of fingerprints to metrics. // FingerprintMetricMapping is an in-memory map of fingerprints to metrics.
type FingerprintMetricMapping map[clientmodel.Fingerprint]clientmodel.Metric type FingerprintMetricMapping map[clientmodel.Fingerprint]clientmodel.Metric
@ -208,23 +226,61 @@ func (i *SynchronizedIndexer) IndexMetrics(b FingerprintMetricMapping) error {
return i.i.IndexMetrics(b) return i.i.IndexMetrics(b)
} }
// DiskIndexer is a MetricIndexer that indexes all standard facets of a metric // diskIndexer is a MetricIndexer that keeps all indexes in levelDBs except the
// that a user or the Prometheus subsystem would want to query against: // fingerprint-to-metric index for non-archived metrics (which is kept in a
// normal in-memory map, but serialized to disk at shutdown and deserialized at
// startup).
// //
// <Fingerprint> -> <existence marker> // TODO: Talk about concurrency!
// <Label Name> -> {<Label Value>, ...} type diskIndexer struct {
// <Label Name> <Label Value> -> {<Fingerprint>, ...}
// <Fingerprint> -> <Metric>
//
// This type supports concurrent queries, but only single writes, and it has no
// locking semantics to enforce this.
type DiskIndexer struct {
FingerprintToMetric *FingerprintMetricIndex FingerprintToMetric *FingerprintMetricIndex
LabelNameToLabelValues *LabelNameLabelValuesIndex LabelNameToLabelValues *LabelNameLabelValuesIndex
LabelPairToFingerprints *LabelPairFingerprintIndex LabelPairToFingerprints *LabelPairFingerprintIndex
FingerprintMembership *FingerprintMembershipIndex FingerprintMembership *FingerprintMembershipIndex
} }
func NewDiskIndexer(basePath string) (MetricIndexer, error) {
err := os.MkdirAll(basePath, 0700)
if err != nil {
return nil, err
}
fingerprintToMetricDB, err := NewLevelDB(LevelDBOptions{
Path: path.Join(basePath, fingerprintToMetricDir),
CacheSizeBytes: *fingerprintToMetricCacheSize,
})
if err != nil {
return nil, err
}
labelNameToLabelValuesDB, err := NewLevelDB(LevelDBOptions{
Path: path.Join(basePath, labelNameToLabelValuesDir),
CacheSizeBytes: *labelNameToLabelValuesCacheSize,
})
if err != nil {
return nil, err
}
labelPairToFingerprintsDB, err := NewLevelDB(LevelDBOptions{
Path: path.Join(basePath, labelPairToFingerprintsDir),
CacheSizeBytes: *labelPairToFingerprintsCacheSize,
})
if err != nil {
return nil, err
}
fingerprintMembershipDB, err := NewLevelDB(LevelDBOptions{
Path: path.Join(basePath, fingerprintMembershipDir),
CacheSizeBytes: *fingerprintMembershipCacheSize,
})
if err != nil {
return nil, err
}
return &diskIndexer{
FingerprintToMetric: NewFingerprintMetricIndex(fingerprintToMetricDB),
LabelNameToLabelValues: NewLabelNameLabelValuesIndex(labelNameToLabelValuesDB),
LabelPairToFingerprints: NewLabelPairFingerprintIndex(labelPairToFingerprintsDB),
FingerprintMembership: NewFingerprintMembershipIndex(fingerprintMembershipDB),
}, nil
}
func findUnindexed(i *FingerprintMembershipIndex, b FingerprintMetricMapping) (FingerprintMetricMapping, error) { func findUnindexed(i *FingerprintMembershipIndex, b FingerprintMetricMapping) (FingerprintMetricMapping, error) {
out := FingerprintMetricMapping{} out := FingerprintMetricMapping{}
@ -384,7 +440,7 @@ func extendLabelPairIndex(i *LabelPairFingerprintIndex, b FingerprintMetricMappi
// IndexMetrics adds the facets of all unindexed metrics found in the given // IndexMetrics adds the facets of all unindexed metrics found in the given
// FingerprintMetricMapping to the corresponding indices. // FingerprintMetricMapping to the corresponding indices.
func (i *DiskIndexer) IndexMetrics(b FingerprintMetricMapping) error { func (i *diskIndexer) IndexMetrics(b FingerprintMetricMapping) error {
unindexed, err := findUnindexed(i.FingerprintMembership, b) unindexed, err := findUnindexed(i.FingerprintMembership, b)
if err != nil { if err != nil {
return err return err
@ -414,7 +470,7 @@ func (i *DiskIndexer) IndexMetrics(b FingerprintMetricMapping) error {
} }
// UnindexMetrics implements MetricIndexer. // UnindexMetrics implements MetricIndexer.
func (i *DiskIndexer) UnindexMetrics(b FingerprintMetricMapping) error { func (i *diskIndexer) UnindexMetrics(b FingerprintMetricMapping) error {
indexed, err := findIndexed(i.FingerprintMembership, b) indexed, err := findIndexed(i.FingerprintMembership, b)
if err != nil { if err != nil {
return err return err
@ -440,19 +496,19 @@ func (i *DiskIndexer) UnindexMetrics(b FingerprintMetricMapping) error {
return i.FingerprintMembership.UnindexBatch(indexed) return i.FingerprintMembership.UnindexBatch(indexed)
} }
func (i *DiskIndexer) ArchiveMetrics(fp clientmodel.Fingerprint, first, last clientmodel.Timestamp) error { func (i *diskIndexer) ArchiveMetrics(fp clientmodel.Fingerprint, first, last clientmodel.Timestamp) error {
// TODO: implement. // TODO: implement.
return nil return nil
} }
// GetMetricForFingerprint implements MetricIndexer. // GetMetricForFingerprint implements MetricIndexer.
func (i *DiskIndexer) GetMetricForFingerprint(fp clientmodel.Fingerprint) (clientmodel.Metric, error) { func (i *diskIndexer) GetMetricForFingerprint(fp clientmodel.Fingerprint) (clientmodel.Metric, error) {
m, _, err := i.FingerprintToMetric.Lookup(fp) m, _, err := i.FingerprintToMetric.Lookup(fp)
return m, err return m, err
} }
// GetFingerprintsForLabelPair implements MetricIndexer. // GetFingerprintsForLabelPair implements MetricIndexer.
func (i *DiskIndexer) GetFingerprintsForLabelPair(ln clientmodel.LabelName, lv clientmodel.LabelValue) (clientmodel.Fingerprints, error) { func (i *diskIndexer) GetFingerprintsForLabelPair(ln clientmodel.LabelName, lv clientmodel.LabelValue) (clientmodel.Fingerprints, error) {
fps, _, err := i.LabelPairToFingerprints.Lookup(&metric.LabelPair{ fps, _, err := i.LabelPairToFingerprints.Lookup(&metric.LabelPair{
Name: ln, Name: ln,
Value: lv, Value: lv,
@ -461,23 +517,39 @@ func (i *DiskIndexer) GetFingerprintsForLabelPair(ln clientmodel.LabelName, lv c
} }
// GetLabelValuesForLabelName implements MetricIndexer. // GetLabelValuesForLabelName implements MetricIndexer.
func (i *DiskIndexer) GetLabelValuesForLabelName(ln clientmodel.LabelName) (clientmodel.LabelValues, error) { func (i *diskIndexer) GetLabelValuesForLabelName(ln clientmodel.LabelName) (clientmodel.LabelValues, error) {
lvs, _, err := i.LabelNameToLabelValues.Lookup(ln) lvs, _, err := i.LabelNameToLabelValues.Lookup(ln)
return lvs, err return lvs, err
} }
// HasFingerprint implements MetricIndexer. // HasFingerprint implements MetricIndexer.
func (i *DiskIndexer) HasFingerprint(fp clientmodel.Fingerprint) (bool, error) { func (i *diskIndexer) HasFingerprint(fp clientmodel.Fingerprint) (bool, error) {
// TODO: modify. // TODO: modify.
return i.FingerprintMembership.Has(fp) return i.FingerprintMembership.Has(fp)
} }
func (i *DiskIndexer) HasArchivedFingerprint(clientmodel.Fingerprint) (present bool, first, last clientmodel.Timestamp, err error) { func (i *diskIndexer) HasArchivedFingerprint(clientmodel.Fingerprint) (present bool, first, last clientmodel.Timestamp, err error) {
// TODO: implement. // TODO: implement.
return false, 0, 0, nil return false, 0, 0, nil
} }
func (i *DiskIndexer) Close() error { func (i *diskIndexer) Close() error {
// TODO: implement var lastError error
return nil if err := i.FingerprintToMetric.Close(); err != nil {
glog.Error("Error closing FingerprintToMetric index DB: ", err)
lastError = err
}
if err := i.LabelNameToLabelValues.Close(); err != nil {
glog.Error("Error closing LabelNameToLabelValues index DB: ", err)
lastError = err
}
if err := i.LabelPairToFingerprints.Close(); err != nil {
glog.Error("Error closing LabelPairToFingerprints index DB: ", err)
lastError = err
}
if err := i.FingerprintMembership.Close(); err != nil {
glog.Error("Error closing FingerprintMembership index DB: ", err)
lastError = err
}
return lastError
} }

View file

@ -31,7 +31,7 @@ func newTestDB(t *testing.T) (KeyValueStore, test.Closer) {
}) })
} }
func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMetrics FingerprintMetricMapping, indexer *DiskIndexer) { func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMetrics FingerprintMetricMapping, indexer *diskIndexer) {
for fp, m := range indexedFpsToMetrics { for fp, m := range indexedFpsToMetrics {
// Compare indexed metrics with input metrics. // Compare indexed metrics with input metrics.
mOut, ok, err := indexer.FingerprintToMetric.Lookup(fp) mOut, ok, err := indexer.FingerprintToMetric.Lookup(fp)
@ -224,7 +224,7 @@ func TestIndexing(t *testing.T) {
fpMsDB, fpMsCloser := newTestDB(t) fpMsDB, fpMsCloser := newTestDB(t)
defer fpMsCloser.Close() defer fpMsCloser.Close()
indexer := DiskIndexer{ indexer := diskIndexer{
FingerprintToMetric: NewFingerprintMetricIndex(fpToMetricDB), FingerprintToMetric: NewFingerprintMetricIndex(fpToMetricDB),
LabelNameToLabelValues: NewLabelNameLabelValuesIndex(lnToLvsDB), LabelNameToLabelValues: NewLabelNameLabelValuesIndex(lnToLvsDB),
LabelPairToFingerprints: NewLabelPairFingerprintIndex(lpToFpDB), LabelPairToFingerprints: NewLabelPairFingerprintIndex(lpToFpDB),

View file

@ -3,7 +3,6 @@ package storage_ng
import ( import (
"bufio" "bufio"
"encoding/binary" "encoding/binary"
"flag"
"fmt" "fmt"
"io" "io"
"os" "os"
@ -11,8 +10,6 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
//"github.com/prometheus/client_golang/prometheus"
clientmodel "github.com/prometheus/client_golang/model" clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/local/index" "github.com/prometheus/prometheus/storage/local/index"
@ -24,11 +21,6 @@ const (
headsFileName = "heads.db" headsFileName = "heads.db"
indexDirName = "index" indexDirName = "index"
fingerprintToMetricDir = "fingerprint_to_metric"
labelNameToLabelValuesDir = "labelname_to_labelvalues"
labelPairToFingerprintsDir = "labelpair_to_fingerprints"
fingerprintMembershipDir = "fingerprint_membership"
chunkHeaderLen = 17 chunkHeaderLen = 17
chunkHeaderTypeOffset = 0 chunkHeaderTypeOffset = 0
chunkHeaderFirstTimeOffset = 1 chunkHeaderFirstTimeOffset = 1
@ -39,16 +31,8 @@ const (
headsHeaderTypeOffset = 8 headsHeaderTypeOffset = 8
) )
var (
fingerprintToMetricCacheSize = flag.Int("storage.fingerprintToMetricCacheSizeBytes", 25*1024*1024, "The size in bytes for the fingerprint to metric index cache.")
labelNameToLabelValuesCacheSize = flag.Int("storage.labelNameToLabelValuesCacheSizeBytes", 25*1024*1024, "The size in bytes for the label name to label values index cache.")
labelPairToFingerprintsCacheSize = flag.Int("storage.labelPairToFingerprintsCacheSizeBytes", 25*1024*1024, "The size in bytes for the label pair to fingerprints index cache.")
fingerprintMembershipCacheSize = flag.Int("storage.fingerprintMembershipCacheSizeBytes", 5*1024*1024, "The size in bytes for the metric membership index cache.")
)
type diskPersistence struct { type diskPersistence struct {
index.MetricIndexer index.MetricIndexer
indexDBs []index.KeyValueStore
basePath string basePath string
chunkLen int chunkLen int
@ -56,56 +40,16 @@ type diskPersistence struct {
} }
func NewDiskPersistence(basePath string, chunkLen int) (Persistence, error) { func NewDiskPersistence(basePath string, chunkLen int) (Persistence, error) {
err := os.MkdirAll(basePath, 0700) metricIndexer, err := index.NewDiskIndexer(basePath)
if err != nil {
return nil, err
}
fingerprintToMetricDB, err := index.NewLevelDB(index.LevelDBOptions{
Path: path.Join(basePath, fingerprintToMetricDir),
CacheSizeBytes: *fingerprintToMetricCacheSize,
})
if err != nil {
return nil, err
}
labelNameToLabelValuesDB, err := index.NewLevelDB(index.LevelDBOptions{
Path: path.Join(basePath, labelNameToLabelValuesDir),
CacheSizeBytes: *labelNameToLabelValuesCacheSize,
})
if err != nil {
return nil, err
}
labelPairToFingerprintsDB, err := index.NewLevelDB(index.LevelDBOptions{
Path: path.Join(basePath, labelPairToFingerprintsDir),
CacheSizeBytes: *labelPairToFingerprintsCacheSize,
})
if err != nil {
return nil, err
}
fingerprintMembershipDB, err := index.NewLevelDB(index.LevelDBOptions{
Path: path.Join(basePath, fingerprintMembershipDir),
CacheSizeBytes: *fingerprintMembershipCacheSize,
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &diskPersistence{ return &diskPersistence{
basePath: basePath, basePath: basePath,
chunkLen: chunkLen, chunkLen: chunkLen,
buf: make([]byte, binary.MaxVarintLen64), // Also sufficient for uint64. buf: make([]byte, binary.MaxVarintLen64), // Also sufficient for uint64.
MetricIndexer: &index.DiskIndexer{ MetricIndexer: metricIndexer,
FingerprintToMetric: index.NewFingerprintMetricIndex(fingerprintToMetricDB),
LabelNameToLabelValues: index.NewLabelNameLabelValuesIndex(labelNameToLabelValuesDB),
LabelPairToFingerprints: index.NewLabelPairFingerprintIndex(labelPairToFingerprintsDB),
FingerprintMembership: index.NewFingerprintMembershipIndex(fingerprintMembershipDB),
},
indexDBs: []index.KeyValueStore{
fingerprintToMetricDB,
labelNameToLabelValuesDB,
labelPairToFingerprintsDB,
fingerprintMembershipDB,
},
}, nil }, nil
} }
@ -396,12 +340,6 @@ func (p *diskPersistence) LoadHeads(fpToSeries map[clientmodel.Fingerprint]*memo
} }
func (d *diskPersistence) Close() error { func (d *diskPersistence) Close() error {
var lastError error // TODO: Move persistHeads here once fingerprintToSeries map is here.
for _, db := range d.indexDBs { return d.MetricIndexer.Close()
if err := db.Close(); err != nil {
glog.Error("Error closing index DB: ", err)
lastError = err
}
}
return lastError
} }

View file

@ -29,6 +29,7 @@ type memorySeriesStorage struct {
persistDone chan bool persistDone chan bool
stopServing chan chan<- bool stopServing chan chan<- bool
// TODO: These have to go to persistence.
fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries
labelPairToFingerprints map[metric.LabelPair]utility.Set labelPairToFingerprints map[metric.LabelPair]utility.Set
labelNameToLabelValues map[clientmodel.LabelName]utility.Set labelNameToLabelValues map[clientmodel.LabelName]utility.Set