Merge pull request #357 from prometheus/refactor/storage/index-pipelines

Depointerize storage conf. and chain ingester.
This commit is contained in:
juliusv 2013-08-12 08:17:03 -07:00
commit 83fb0a9a2d
10 changed files with 53 additions and 45 deletions

View file

@ -129,8 +129,8 @@ func (n *NotificationHandler) sendNotifications(reqs NotificationReqs) error {
"Description": interpolateMessage(req.Description, req.Labels, req.Value), "Description": interpolateMessage(req.Description, req.Labels, req.Value),
"Labels": req.Labels, "Labels": req.Labels,
"Payload": map[string]interface{}{ "Payload": map[string]interface{}{
"Value": req.Value, "Value": req.Value,
"ActiveSince": req.ActiveSince, "ActiveSince": req.ActiveSince,
"GeneratorUrl": n.prometheusUrl, "GeneratorUrl": n.prometheusUrl,
"AlertingRule": req.RuleString, "AlertingRule": req.RuleString,
}, },

View file

@ -202,17 +202,26 @@ func (t *target) Scrape(earliest time.Time, results chan<- *extraction.Result) e
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,application/json;schema=prometheus/telemetry;version=0.0.2;q=0.2,*/*;q=0.1` const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,application/json;schema=prometheus/telemetry;version=0.0.2;q=0.2,*/*;q=0.1`
type channelIngester chan<- *extraction.Result
func (i channelIngester) Ingest(r *extraction.Result) error {
i <- r
return nil
}
type extendLabelsIngester struct { type extendLabelsIngester struct {
baseLabels clientmodel.LabelSet baseLabels clientmodel.LabelSet
results chan<- *extraction.Result
i extraction.Ingester
} }
func (i *extendLabelsIngester) Ingest(r *extraction.Result) error { func (i *extendLabelsIngester) Ingest(r *extraction.Result) error {
for _, s := range r.Samples { for _, s := range r.Samples {
s.Metric.MergeFromLabelSet(i.baseLabels, clientmodel.ExporterLabelPrefix) s.Metric.MergeFromLabelSet(i.baseLabels, clientmodel.ExporterLabelPrefix)
} }
i.results <- r
return nil return i.i.Ingest(r)
} }
func (t *target) scrape(timestamp time.Time, results chan<- *extraction.Result) (err error) { func (t *target) scrape(timestamp time.Time, results chan<- *extraction.Result) (err error) {
@ -261,10 +270,11 @@ func (t *target) scrape(timestamp time.Time, results chan<- *extraction.Result)
ingester := &extendLabelsIngester{ ingester := &extendLabelsIngester{
baseLabels: baseLabels, baseLabels: baseLabels,
results: results,
i: channelIngester(results),
} }
processOptions := &extraction.ProcessOptions{ processOptions := &extraction.ProcessOptions{
Timestamp: timestamp, Timestamp: timestamp,
} }
return processor.ProcessSingle(buf, ingester, processOptions) return processor.ProcessSingle(buf, ingester, processOptions)
} }

View file

@ -28,10 +28,10 @@ import (
func TestTargetScrapeUpdatesState(t *testing.T) { func TestTargetScrapeUpdatesState(t *testing.T) {
testTarget := target{ testTarget := target{
scheduler: literalScheduler{}, scheduler: literalScheduler{},
state: UNKNOWN, state: UNKNOWN,
address: "bad schema", address: "bad schema",
httpClient: utility.NewDeadlineClient(0), httpClient: utility.NewDeadlineClient(0),
} }
testTarget.Scrape(time.Time{}, make(chan *extraction.Result, 2)) testTarget.Scrape(time.Time{}, make(chan *extraction.Result, 2))
if testTarget.state != UNREACHABLE { if testTarget.state != UNREACHABLE {
@ -44,7 +44,7 @@ func TestTargetRecordScrapeHealth(t *testing.T) {
scheduler: literalScheduler{}, scheduler: literalScheduler{},
address: "http://example.url", address: "http://example.url",
baseLabels: clientmodel.LabelSet{clientmodel.JobLabel: "testjob"}, baseLabels: clientmodel.LabelSet{clientmodel.JobLabel: "testjob"},
httpClient: utility.NewDeadlineClient(0), httpClient: utility.NewDeadlineClient(0),
} }
now := time.Now() now := time.Now()

View file

@ -101,8 +101,8 @@ func (i *LevelDBFingerprintMetricIndex) Prune() (bool, error) {
return false, nil return false, nil
} }
func NewLevelDBFingerprintMetricIndex(o *LevelDBFingerprintMetricIndexOptions) (*LevelDBFingerprintMetricIndex, error) { func NewLevelDBFingerprintMetricIndex(o LevelDBFingerprintMetricIndexOptions) (*LevelDBFingerprintMetricIndex, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -201,8 +201,8 @@ type LevelDBLabelNameFingerprintIndexOptions struct {
leveldb.LevelDBOptions leveldb.LevelDBOptions
} }
func NewLevelLabelNameFingerprintIndex(o *LevelDBLabelNameFingerprintIndexOptions) (*LevelDBLabelNameFingerprintIndex, error) { func NewLevelLabelNameFingerprintIndex(o LevelDBLabelNameFingerprintIndexOptions) (*LevelDBLabelNameFingerprintIndex, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -313,8 +313,8 @@ func (i *LevelDBLabelPairFingerprintIndex) State() *raw.DatabaseState {
return i.p.State() return i.p.State()
} }
func NewLevelDBLabelSetFingerprintIndex(o *LevelDBLabelSetFingerprintIndexOptions) (*LevelDBLabelPairFingerprintIndex, error) { func NewLevelDBLabelSetFingerprintIndex(o LevelDBLabelSetFingerprintIndexOptions) (*LevelDBLabelPairFingerprintIndex, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -382,8 +382,8 @@ type LevelDBMetricMembershipIndexOptions struct {
leveldb.LevelDBOptions leveldb.LevelDBOptions
} }
func NewLevelDBMetricMembershipIndex(o *LevelDBMetricMembershipIndexOptions) (*LevelDBMetricMembershipIndex, error) { func NewLevelDBMetricMembershipIndex(o LevelDBMetricMembershipIndexOptions) (*LevelDBMetricMembershipIndex, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -128,7 +128,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Label Names and Value Pairs by Fingerprint", "Label Names and Value Pairs by Fingerprint",
func() { func() {
var err error var err error
emission.fingerprintToMetrics, err = NewLevelDBFingerprintMetricIndex(&LevelDBFingerprintMetricIndexOptions{ emission.fingerprintToMetrics, err = NewLevelDBFingerprintMetricIndex(LevelDBFingerprintMetricIndexOptions{
LevelDBOptions: leveldb.LevelDBOptions{ LevelDBOptions: leveldb.LevelDBOptions{
Name: "Metrics by Fingerprint", Name: "Metrics by Fingerprint",
Purpose: "Index", Purpose: "Index",
@ -143,13 +143,12 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Samples by Fingerprint", "Samples by Fingerprint",
func() { func() {
var err error var err error
o := &leveldb.LevelDBOptions{ emission.MetricSamples, err = leveldb.NewLevelDBPersistence(leveldb.LevelDBOptions{
Name: "Samples", Name: "Samples",
Purpose: "Timeseries", Purpose: "Timeseries",
Path: baseDirectory + "/samples_by_fingerprint", Path: baseDirectory + "/samples_by_fingerprint",
CacheSizeBytes: *fingerprintsToLabelPairCacheSize, CacheSizeBytes: *fingerprintsToLabelPairCacheSize,
} })
emission.MetricSamples, err = leveldb.NewLevelDBPersistence(o)
workers.MayFail(err) workers.MayFail(err)
}, },
}, },
@ -157,7 +156,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"High Watermarks by Fingerprint", "High Watermarks by Fingerprint",
func() { func() {
var err error var err error
emission.MetricHighWatermarks, err = NewLevelDBHighWatermarker(&LevelDBHighWatermarkerOptions{ emission.MetricHighWatermarks, err = NewLevelDBHighWatermarker(LevelDBHighWatermarkerOptions{
LevelDBOptions: leveldb.LevelDBOptions{ LevelDBOptions: leveldb.LevelDBOptions{
Name: "High Watermarks", Name: "High Watermarks",
Purpose: "The youngest sample in the database per metric.", Purpose: "The youngest sample in the database per metric.",
@ -171,7 +170,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Fingerprints by Label Name", "Fingerprints by Label Name",
func() { func() {
var err error var err error
emission.labelNameToFingerprints, err = NewLevelLabelNameFingerprintIndex(&LevelDBLabelNameFingerprintIndexOptions{ emission.labelNameToFingerprints, err = NewLevelLabelNameFingerprintIndex(LevelDBLabelNameFingerprintIndexOptions{
LevelDBOptions: leveldb.LevelDBOptions{ LevelDBOptions: leveldb.LevelDBOptions{
Name: "Fingerprints by Label Name", Name: "Fingerprints by Label Name",
Purpose: "Index", Purpose: "Index",
@ -186,7 +185,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Fingerprints by Label Name and Value Pair", "Fingerprints by Label Name and Value Pair",
func() { func() {
var err error var err error
emission.labelSetToFingerprints, err = NewLevelDBLabelSetFingerprintIndex(&LevelDBLabelSetFingerprintIndexOptions{ emission.labelSetToFingerprints, err = NewLevelDBLabelSetFingerprintIndex(LevelDBLabelSetFingerprintIndexOptions{
LevelDBOptions: leveldb.LevelDBOptions{ LevelDBOptions: leveldb.LevelDBOptions{
Name: "Fingerprints by Label Pair", Name: "Fingerprints by Label Pair",
Purpose: "Index", Purpose: "Index",
@ -202,7 +201,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
func() { func() {
var err error var err error
emission.metricMembershipIndex, err = NewLevelDBMetricMembershipIndex( emission.metricMembershipIndex, err = NewLevelDBMetricMembershipIndex(
&LevelDBMetricMembershipIndexOptions{ LevelDBMetricMembershipIndexOptions{
LevelDBOptions: leveldb.LevelDBOptions{ LevelDBOptions: leveldb.LevelDBOptions{
Name: "Metric Membership", Name: "Metric Membership",
Purpose: "Index", Purpose: "Index",
@ -217,7 +216,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Sample Curation Remarks", "Sample Curation Remarks",
func() { func() {
var err error var err error
emission.CurationRemarks, err = NewLevelDBCurationRemarker(&LevelDBCurationRemarkerOptions{ emission.CurationRemarks, err = NewLevelDBCurationRemarker(LevelDBCurationRemarkerOptions{
LevelDBOptions: leveldb.LevelDBOptions{ LevelDBOptions: leveldb.LevelDBOptions{
Name: "Sample Curation Remarks", Name: "Sample Curation Remarks",
Purpose: "Ledger of Progress for Various Curators", Purpose: "Ledger of Progress for Various Curators",

View file

@ -845,7 +845,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups)) sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups))
defer sampleDirectory.Close() defer sampleDirectory.Close()
curatorStates, err := NewLevelDBCurationRemarker(&LevelDBCurationRemarkerOptions{ curatorStates, err := NewLevelDBCurationRemarker(LevelDBCurationRemarkerOptions{
LevelDBOptions: leveldb.LevelDBOptions{ LevelDBOptions: leveldb.LevelDBOptions{
Path: curatorDirectory.Path(), Path: curatorDirectory.Path(),
}, },
@ -854,7 +854,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
watermarkStates, err := NewLevelDBHighWatermarker(&LevelDBHighWatermarkerOptions{ watermarkStates, err := NewLevelDBHighWatermarker(LevelDBHighWatermarkerOptions{
LevelDBOptions: leveldb.LevelDBOptions{ LevelDBOptions: leveldb.LevelDBOptions{
Path: watermarkDirectory.Path(), Path: watermarkDirectory.Path(),
}, },
@ -864,7 +864,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
} }
defer watermarkStates.Close() defer watermarkStates.Close()
samples, err := leveldb.NewLevelDBPersistence(&leveldb.LevelDBOptions{ samples, err := leveldb.NewLevelDBPersistence(leveldb.LevelDBOptions{
Path: sampleDirectory.Path(), Path: sampleDirectory.Path(),
}) })
if err != nil { if err != nil {
@ -1370,7 +1370,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups)) sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups))
defer sampleDirectory.Close() defer sampleDirectory.Close()
curatorStates, err := NewLevelDBCurationRemarker(&LevelDBCurationRemarkerOptions{ curatorStates, err := NewLevelDBCurationRemarker(LevelDBCurationRemarkerOptions{
LevelDBOptions: leveldb.LevelDBOptions{ LevelDBOptions: leveldb.LevelDBOptions{
Path: curatorDirectory.Path(), Path: curatorDirectory.Path(),
}, },
@ -1380,7 +1380,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
} }
defer curatorStates.Close() defer curatorStates.Close()
watermarkStates, err := NewLevelDBHighWatermarker(&LevelDBHighWatermarkerOptions{ watermarkStates, err := NewLevelDBHighWatermarker(LevelDBHighWatermarkerOptions{
LevelDBOptions: leveldb.LevelDBOptions{ LevelDBOptions: leveldb.LevelDBOptions{
Path: watermarkDirectory.Path(), Path: watermarkDirectory.Path(),
}, },
@ -1390,7 +1390,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
} }
defer watermarkStates.Close() defer watermarkStates.Close()
samples, err := leveldb.NewLevelDBPersistence(&leveldb.LevelDBOptions{Path: sampleDirectory.Path()}) samples, err := leveldb.NewLevelDBPersistence(leveldb.LevelDBOptions{Path: sampleDirectory.Path()})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View file

@ -254,8 +254,8 @@ type LevelDBHighWatermarkerOptions struct {
leveldb.LevelDBOptions leveldb.LevelDBOptions
} }
func NewLevelDBHighWatermarker(o *LevelDBHighWatermarkerOptions) (*LevelDBHighWatermarker, error) { func NewLevelDBHighWatermarker(o LevelDBHighWatermarkerOptions) (*LevelDBHighWatermarker, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -323,8 +323,8 @@ func (w *LevelDBCurationRemarker) Update(pair *curationKey, t time.Time) error {
}) })
} }
func NewLevelDBCurationRemarker(o *LevelDBCurationRemarkerOptions) (*LevelDBCurationRemarker, error) { func NewLevelDBCurationRemarker(o LevelDBCurationRemarkerOptions) (*LevelDBCurationRemarker, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -48,8 +48,8 @@ type LevelDBIndexOptions struct {
leveldb.LevelDBOptions leveldb.LevelDBOptions
} }
func NewLevelDBMembershipIndex(o *LevelDBIndexOptions) (i *LevelDBMembershipIndex, err error) { func NewLevelDBMembershipIndex(o LevelDBIndexOptions) (i *LevelDBMembershipIndex, err error) {
leveldbPersistence, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) leveldbPersistence, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -184,7 +184,7 @@ type LevelDBOptions struct {
Compression Compression Compression Compression
} }
func NewLevelDBPersistence(o *LevelDBOptions) (*LevelDBPersistence, error) { func NewLevelDBPersistence(o LevelDBOptions) (*LevelDBPersistence, error) {
options := levigo.NewOptions() options := levigo.NewOptions()
options.SetCreateIfMissing(true) options.SetCreateIfMissing(true)
options.SetParanoidChecks(o.UseParanoidChecks) options.SetParanoidChecks(o.UseParanoidChecks)

View file

@ -61,11 +61,10 @@ type (
func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory) { func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory) {
t = test.NewTemporaryDirectory(n, p.tester) t = test.NewTemporaryDirectory(n, p.tester)
o := &leveldb.LevelDBOptions{ persistence, err := leveldb.NewLevelDBPersistence(leveldb.LevelDBOptions{
Path: t.Path(), Path: t.Path(),
CacheSizeBytes: cacheCapacity, CacheSizeBytes: cacheCapacity,
} })
persistence, err := leveldb.NewLevelDBPersistence(o)
if err != nil { if err != nil {
defer t.Close() defer t.Close()
p.tester.Fatal(err) p.tester.Fatal(err)