From a5141e4d0a7b7a2644d4dc35acfb9cca831c8768 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Sat, 10 Aug 2013 17:02:28 +0200 Subject: [PATCH] Depointerize storage conf. and chain ingester. The storage builders need to work with the assumption that they have a copy of the underlying configuration data if any mutations are made. --- notification/notification.go | 4 ++-- retrieval/target.go | 20 +++++++++++++++----- retrieval/target_test.go | 10 +++++----- storage/metric/index.go | 16 ++++++++-------- storage/metric/leveldb.go | 17 ++++++++--------- storage/metric/processor_test.go | 12 ++++++------ storage/metric/watermark.go | 8 ++++---- storage/raw/index/leveldb/leveldb.go | 4 ++-- storage/raw/leveldb/leveldb.go | 2 +- storage/raw/leveldb/test/fixtures.go | 5 ++--- 10 files changed, 53 insertions(+), 45 deletions(-) diff --git a/notification/notification.go b/notification/notification.go index f13f13ba4..77c1cad90 100644 --- a/notification/notification.go +++ b/notification/notification.go @@ -129,8 +129,8 @@ func (n *NotificationHandler) sendNotifications(reqs NotificationReqs) error { "Description": interpolateMessage(req.Description, req.Labels, req.Value), "Labels": req.Labels, "Payload": map[string]interface{}{ - "Value": req.Value, - "ActiveSince": req.ActiveSince, + "Value": req.Value, + "ActiveSince": req.ActiveSince, "GeneratorUrl": n.prometheusUrl, "AlertingRule": req.RuleString, }, diff --git a/retrieval/target.go b/retrieval/target.go index 9f90406e7..d12b9bd97 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -202,17 +202,26 @@ func (t *target) Scrape(earliest time.Time, results chan<- *extraction.Result) e const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,application/json;schema=prometheus/telemetry;version=0.0.2;q=0.2,*/*;q=0.1` +type channelIngester chan<- *extraction.Result + +func (i channelIngester) Ingest(r *extraction.Result) error { + i <- r + + return nil +} + type extendLabelsIngester struct { baseLabels clientmodel.LabelSet - results chan<- *extraction.Result + + i extraction.Ingester } func (i *extendLabelsIngester) Ingest(r *extraction.Result) error { for _, s := range r.Samples { s.Metric.MergeFromLabelSet(i.baseLabels, clientmodel.ExporterLabelPrefix) } - i.results <- r - return nil + + return i.i.Ingest(r) } func (t *target) scrape(timestamp time.Time, results chan<- *extraction.Result) (err error) { @@ -261,10 +270,11 @@ func (t *target) scrape(timestamp time.Time, results chan<- *extraction.Result) ingester := &extendLabelsIngester{ baseLabels: baseLabels, - results: results, + + i: channelIngester(results), } processOptions := &extraction.ProcessOptions{ - Timestamp: timestamp, + Timestamp: timestamp, } return processor.ProcessSingle(buf, ingester, processOptions) } diff --git a/retrieval/target_test.go b/retrieval/target_test.go index aa1b93711..a6cab0206 100644 --- a/retrieval/target_test.go +++ b/retrieval/target_test.go @@ -28,10 +28,10 @@ import ( func TestTargetScrapeUpdatesState(t *testing.T) { testTarget := target{ - scheduler: literalScheduler{}, - state: UNKNOWN, - address: "bad schema", - httpClient: utility.NewDeadlineClient(0), + scheduler: literalScheduler{}, + state: UNKNOWN, + address: "bad schema", + httpClient: utility.NewDeadlineClient(0), } testTarget.Scrape(time.Time{}, make(chan *extraction.Result, 2)) if testTarget.state != UNREACHABLE { @@ -44,7 +44,7 @@ func TestTargetRecordScrapeHealth(t *testing.T) { scheduler: literalScheduler{}, address: "http://example.url", baseLabels: clientmodel.LabelSet{clientmodel.JobLabel: "testjob"}, - httpClient: utility.NewDeadlineClient(0), + httpClient: utility.NewDeadlineClient(0), } now := time.Now() diff --git a/storage/metric/index.go b/storage/metric/index.go index 1ba652c97..18285f441 100644 --- a/storage/metric/index.go +++ b/storage/metric/index.go @@ -101,8 +101,8 @@ func (i *LevelDBFingerprintMetricIndex) Prune() (bool, error) { return false, nil } -func NewLevelDBFingerprintMetricIndex(o *LevelDBFingerprintMetricIndexOptions) (*LevelDBFingerprintMetricIndex, error) { - s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) +func NewLevelDBFingerprintMetricIndex(o LevelDBFingerprintMetricIndexOptions) (*LevelDBFingerprintMetricIndex, error) { + s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions) if err != nil { return nil, err } @@ -201,8 +201,8 @@ type LevelDBLabelNameFingerprintIndexOptions struct { leveldb.LevelDBOptions } -func NewLevelLabelNameFingerprintIndex(o *LevelDBLabelNameFingerprintIndexOptions) (*LevelDBLabelNameFingerprintIndex, error) { - s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) +func NewLevelLabelNameFingerprintIndex(o LevelDBLabelNameFingerprintIndexOptions) (*LevelDBLabelNameFingerprintIndex, error) { + s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions) if err != nil { return nil, err } @@ -313,8 +313,8 @@ func (i *LevelDBLabelPairFingerprintIndex) State() *raw.DatabaseState { return i.p.State() } -func NewLevelDBLabelSetFingerprintIndex(o *LevelDBLabelSetFingerprintIndexOptions) (*LevelDBLabelPairFingerprintIndex, error) { - s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) +func NewLevelDBLabelSetFingerprintIndex(o LevelDBLabelSetFingerprintIndexOptions) (*LevelDBLabelPairFingerprintIndex, error) { + s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions) if err != nil { return nil, err } @@ -382,8 +382,8 @@ type LevelDBMetricMembershipIndexOptions struct { leveldb.LevelDBOptions } -func NewLevelDBMetricMembershipIndex(o *LevelDBMetricMembershipIndexOptions) (*LevelDBMetricMembershipIndex, error) { - s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) +func NewLevelDBMetricMembershipIndex(o LevelDBMetricMembershipIndexOptions) (*LevelDBMetricMembershipIndex, error) { + s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions) if err != nil { return nil, err } diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index f901385f3..7ce3d87a9 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -128,7 +128,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc "Label Names and Value Pairs by Fingerprint", func() { var err error - emission.fingerprintToMetrics, err = NewLevelDBFingerprintMetricIndex(&LevelDBFingerprintMetricIndexOptions{ + emission.fingerprintToMetrics, err = NewLevelDBFingerprintMetricIndex(LevelDBFingerprintMetricIndexOptions{ LevelDBOptions: leveldb.LevelDBOptions{ Name: "Metrics by Fingerprint", Purpose: "Index", @@ -143,13 +143,12 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc "Samples by Fingerprint", func() { var err error - o := &leveldb.LevelDBOptions{ + emission.MetricSamples, err = leveldb.NewLevelDBPersistence(leveldb.LevelDBOptions{ Name: "Samples", Purpose: "Timeseries", Path: baseDirectory + "/samples_by_fingerprint", CacheSizeBytes: *fingerprintsToLabelPairCacheSize, - } - emission.MetricSamples, err = leveldb.NewLevelDBPersistence(o) + }) workers.MayFail(err) }, }, @@ -157,7 +156,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc "High Watermarks by Fingerprint", func() { var err error - emission.MetricHighWatermarks, err = NewLevelDBHighWatermarker(&LevelDBHighWatermarkerOptions{ + emission.MetricHighWatermarks, err = NewLevelDBHighWatermarker(LevelDBHighWatermarkerOptions{ LevelDBOptions: leveldb.LevelDBOptions{ Name: "High Watermarks", Purpose: "The youngest sample in the database per metric.", @@ -171,7 +170,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc "Fingerprints by Label Name", func() { var err error - emission.labelNameToFingerprints, err = NewLevelLabelNameFingerprintIndex(&LevelDBLabelNameFingerprintIndexOptions{ + emission.labelNameToFingerprints, err = NewLevelLabelNameFingerprintIndex(LevelDBLabelNameFingerprintIndexOptions{ LevelDBOptions: leveldb.LevelDBOptions{ Name: "Fingerprints by Label Name", Purpose: "Index", @@ -186,7 +185,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc "Fingerprints by Label Name and Value Pair", func() { var err error - emission.labelSetToFingerprints, err = NewLevelDBLabelSetFingerprintIndex(&LevelDBLabelSetFingerprintIndexOptions{ + emission.labelSetToFingerprints, err = NewLevelDBLabelSetFingerprintIndex(LevelDBLabelSetFingerprintIndexOptions{ LevelDBOptions: leveldb.LevelDBOptions{ Name: "Fingerprints by Label Pair", Purpose: "Index", @@ -202,7 +201,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc func() { var err error emission.metricMembershipIndex, err = NewLevelDBMetricMembershipIndex( - &LevelDBMetricMembershipIndexOptions{ + LevelDBMetricMembershipIndexOptions{ LevelDBOptions: leveldb.LevelDBOptions{ Name: "Metric Membership", Purpose: "Index", @@ -217,7 +216,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc "Sample Curation Remarks", func() { var err error - emission.CurationRemarks, err = NewLevelDBCurationRemarker(&LevelDBCurationRemarkerOptions{ + emission.CurationRemarks, err = NewLevelDBCurationRemarker(LevelDBCurationRemarkerOptions{ LevelDBOptions: leveldb.LevelDBOptions{ Name: "Sample Curation Remarks", Purpose: "Ledger of Progress for Various Curators", diff --git a/storage/metric/processor_test.go b/storage/metric/processor_test.go index 1b5921a78..775838261 100644 --- a/storage/metric/processor_test.go +++ b/storage/metric/processor_test.go @@ -845,7 +845,7 @@ func TestCuratorCompactionProcessor(t *testing.T) { sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups)) defer sampleDirectory.Close() - curatorStates, err := NewLevelDBCurationRemarker(&LevelDBCurationRemarkerOptions{ + curatorStates, err := NewLevelDBCurationRemarker(LevelDBCurationRemarkerOptions{ LevelDBOptions: leveldb.LevelDBOptions{ Path: curatorDirectory.Path(), }, @@ -854,7 +854,7 @@ func TestCuratorCompactionProcessor(t *testing.T) { t.Fatal(err) } - watermarkStates, err := NewLevelDBHighWatermarker(&LevelDBHighWatermarkerOptions{ + watermarkStates, err := NewLevelDBHighWatermarker(LevelDBHighWatermarkerOptions{ LevelDBOptions: leveldb.LevelDBOptions{ Path: watermarkDirectory.Path(), }, @@ -864,7 +864,7 @@ func TestCuratorCompactionProcessor(t *testing.T) { } defer watermarkStates.Close() - samples, err := leveldb.NewLevelDBPersistence(&leveldb.LevelDBOptions{ + samples, err := leveldb.NewLevelDBPersistence(leveldb.LevelDBOptions{ Path: sampleDirectory.Path(), }) if err != nil { @@ -1370,7 +1370,7 @@ func TestCuratorDeletionProcessor(t *testing.T) { sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups)) defer sampleDirectory.Close() - curatorStates, err := NewLevelDBCurationRemarker(&LevelDBCurationRemarkerOptions{ + curatorStates, err := NewLevelDBCurationRemarker(LevelDBCurationRemarkerOptions{ LevelDBOptions: leveldb.LevelDBOptions{ Path: curatorDirectory.Path(), }, @@ -1380,7 +1380,7 @@ func TestCuratorDeletionProcessor(t *testing.T) { } defer curatorStates.Close() - watermarkStates, err := NewLevelDBHighWatermarker(&LevelDBHighWatermarkerOptions{ + watermarkStates, err := NewLevelDBHighWatermarker(LevelDBHighWatermarkerOptions{ LevelDBOptions: leveldb.LevelDBOptions{ Path: watermarkDirectory.Path(), }, @@ -1390,7 +1390,7 @@ func TestCuratorDeletionProcessor(t *testing.T) { } defer watermarkStates.Close() - samples, err := leveldb.NewLevelDBPersistence(&leveldb.LevelDBOptions{Path: sampleDirectory.Path()}) + samples, err := leveldb.NewLevelDBPersistence(leveldb.LevelDBOptions{Path: sampleDirectory.Path()}) if err != nil { t.Fatal(err) } diff --git a/storage/metric/watermark.go b/storage/metric/watermark.go index b509b28c6..b0a6fe1be 100644 --- a/storage/metric/watermark.go +++ b/storage/metric/watermark.go @@ -254,8 +254,8 @@ type LevelDBHighWatermarkerOptions struct { leveldb.LevelDBOptions } -func NewLevelDBHighWatermarker(o *LevelDBHighWatermarkerOptions) (*LevelDBHighWatermarker, error) { - s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) +func NewLevelDBHighWatermarker(o LevelDBHighWatermarkerOptions) (*LevelDBHighWatermarker, error) { + s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions) if err != nil { return nil, err } @@ -323,8 +323,8 @@ func (w *LevelDBCurationRemarker) Update(pair *curationKey, t time.Time) error { }) } -func NewLevelDBCurationRemarker(o *LevelDBCurationRemarkerOptions) (*LevelDBCurationRemarker, error) { - s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) +func NewLevelDBCurationRemarker(o LevelDBCurationRemarkerOptions) (*LevelDBCurationRemarker, error) { + s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions) if err != nil { return nil, err } diff --git a/storage/raw/index/leveldb/leveldb.go b/storage/raw/index/leveldb/leveldb.go index 43f8ce444..a498f8c14 100644 --- a/storage/raw/index/leveldb/leveldb.go +++ b/storage/raw/index/leveldb/leveldb.go @@ -48,8 +48,8 @@ type LevelDBIndexOptions struct { leveldb.LevelDBOptions } -func NewLevelDBMembershipIndex(o *LevelDBIndexOptions) (i *LevelDBMembershipIndex, err error) { - leveldbPersistence, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) +func NewLevelDBMembershipIndex(o LevelDBIndexOptions) (i *LevelDBMembershipIndex, err error) { + leveldbPersistence, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions) if err != nil { return nil, err } diff --git a/storage/raw/leveldb/leveldb.go b/storage/raw/leveldb/leveldb.go index 8709c4882..0bd25a54c 100644 --- a/storage/raw/leveldb/leveldb.go +++ b/storage/raw/leveldb/leveldb.go @@ -184,7 +184,7 @@ type LevelDBOptions struct { Compression Compression } -func NewLevelDBPersistence(o *LevelDBOptions) (*LevelDBPersistence, error) { +func NewLevelDBPersistence(o LevelDBOptions) (*LevelDBPersistence, error) { options := levigo.NewOptions() options.SetCreateIfMissing(true) options.SetParanoidChecks(o.UseParanoidChecks) diff --git a/storage/raw/leveldb/test/fixtures.go b/storage/raw/leveldb/test/fixtures.go index 808df886b..cee367511 100644 --- a/storage/raw/leveldb/test/fixtures.go +++ b/storage/raw/leveldb/test/fixtures.go @@ -61,11 +61,10 @@ type ( func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory) { t = test.NewTemporaryDirectory(n, p.tester) - o := &leveldb.LevelDBOptions{ + persistence, err := leveldb.NewLevelDBPersistence(leveldb.LevelDBOptions{ Path: t.Path(), CacheSizeBytes: cacheCapacity, - } - persistence, err := leveldb.NewLevelDBPersistence(o) + }) if err != nil { defer t.Close() p.tester.Fatal(err)