diff --git a/appstate/appstate.go b/appstate/appstate.go index 9c77b5f3cb..5170df231b 100644 --- a/appstate/appstate.go +++ b/appstate/appstate.go @@ -26,7 +26,7 @@ import ( type ApplicationState struct { Config config.Config RuleManager rules.RuleManager - Storage metric.Storage + Storage metric.TieredStorage TargetManager retrieval.TargetManager BuildInfo map[string]string CurationState chan metric.CurationState diff --git a/main.go b/main.go index 0404036fa2..6f129deab8 100644 --- a/main.go +++ b/main.go @@ -42,7 +42,7 @@ var ( ) type prometheus struct { - storage metric.Storage + storage metric.TieredStorage // TODO: Refactor channels to work with arrays of results for better chunking. scrapeResults chan format.Result ruleResults chan *rules.Result @@ -86,12 +86,15 @@ func main() { if err != nil { log.Fatalf("Error opening storage: %s", err) } + if ts == nil { + log.Fatalln("Nil tiered storage.") + } scrapeResults := make(chan format.Result, *scrapeResultsQueueCapacity) ruleResults := make(chan *rules.Result, *ruleResultsQueueCapacity) prometheus := prometheus{ - storage: ts, + storage: *ts, scrapeResults: scrapeResults, ruleResults: ruleResults, } @@ -105,7 +108,7 @@ func main() { targetManager := retrieval.NewTargetManager(scrapeResults, *concurrentRetrievalAllowance) targetManager.AddTargetsFromConfig(conf) - ast.SetStorage(ts) + ast.SetStorage(*ts) ruleManager := rules.NewRuleManager(ruleResults, conf.EvaluationInterval()) err = ruleManager.AddRulesFromConfig(conf) @@ -118,7 +121,7 @@ func main() { Config: conf, CurationState: make(chan metric.CurationState), RuleManager: ruleManager, - Storage: ts, + Storage: *ts, TargetManager: targetManager, } diff --git a/rules/ast/persistence_adapter.go b/rules/ast/persistence_adapter.go index a687e636e3..64a6e60cdf 100644 --- a/rules/ast/persistence_adapter.go +++ b/rules/ast/persistence_adapter.go @@ -24,7 +24,9 @@ var defaultStalenessDelta = flag.Int("defaultStalenessDelta", 300, "Default stal // AST-global storage to use for operations that are not supported by views // (i.e. metric->fingerprint lookups). -var queryStorage metric.Storage = nil +// +// BUG(julius): Wrap this into non-global state. +var queryStorage *metric.TieredStorage // Describes the lenience limits to apply to values from the materialized view. type StalenessPolicy struct { @@ -167,8 +169,8 @@ func (v *viewAdapter) GetRangeValues(fingerprints model.Fingerprints, interval * return sampleSets, nil } -func SetStorage(storage metric.Storage) { - queryStorage = storage +func SetStorage(storage metric.TieredStorage) { + queryStorage = &storage } func NewViewAdapter(view metric.View) *viewAdapter { diff --git a/rules/rules_test.go b/rules/rules_test.go index 1e13641cb0..cdb1560dc5 100644 --- a/rules/rules_test.go +++ b/rules/rules_test.go @@ -49,10 +49,13 @@ func vectorComparisonString(expected []string, actual []string) string { separator) } -func newTestStorage(t test.Tester) (storage metric.Storage, closer test.Closer) { +func newTestStorage(t test.Tester) (storage *metric.TieredStorage, closer test.Closer) { storage, closer = metric.NewTestTieredStorage(t) - ast.SetStorage(storage) - storeMatrix(storage, testMatrix) + if storage == nil { + t.Fatal("storage == nil") + } + ast.SetStorage(*storage) + storeMatrix(*storage, testMatrix) return } diff --git a/rules/testdata.go b/rules/testdata.go index 689e28fa51..6a4f4780b5 100644 --- a/rules/testdata.go +++ b/rules/testdata.go @@ -51,7 +51,7 @@ func getTestVectorFromTestMatrix(matrix ast.Matrix) ast.Vector { return vector } -func storeMatrix(storage metric.Storage, matrix ast.Matrix) (err error) { +func storeMatrix(storage metric.TieredStorage, matrix ast.Matrix) (err error) { pendingSamples := model.Samples{} for _, sampleSet := range matrix { for _, sample := range sampleSet.Values { diff --git a/storage/metric/test_helper.go b/storage/metric/test_helper.go index f1b5ec22b3..4ca0621951 100644 --- a/storage/metric/test_helper.go +++ b/storage/metric/test_helper.go @@ -78,7 +78,7 @@ func buildMemoryTestPersistence(f func(p MetricPersistence, t test.Tester)) func } type testTieredStorageCloser struct { - storage Storage + storage *TieredStorage directory test.Closer } @@ -87,7 +87,7 @@ func (t testTieredStorageCloser) Close() { t.directory.Close() } -func NewTestTieredStorage(t test.Tester) (storage Storage, closer test.Closer) { +func NewTestTieredStorage(t test.Tester) (storage *TieredStorage, closer test.Closer) { var directory test.TemporaryDirectory directory = test.NewTemporaryDirectory("test_tiered_storage", t) storage, err := NewTieredStorage(appendQueueSize, 2500, 1000, 5*time.Second, 15*time.Second, 0*time.Second, directory.Path()) diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index d860ec0b2c..9994260c87 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -27,9 +27,9 @@ import ( "time" ) -// tieredStorage both persists samples and generates materialized views for +// TieredStorage both persists samples and generates materialized views for // queries. -type tieredStorage struct { +type TieredStorage struct { appendToDiskQueue chan model.Samples appendToMemoryQueue chan model.Samples diskFrontier *diskFrontier @@ -51,36 +51,13 @@ type viewJob struct { err chan error } -// Provides a unified means for batch appending values into the datastore along -// with querying for values in an efficient way. -type Storage interface { - // Enqueues Samples for storage. - AppendSamples(model.Samples) error - // Enqueus a ViewRequestBuilder for materialization, subject to a timeout. - MakeView(request ViewRequestBuilder, timeout time.Duration) (View, error) - // Starts serving requests. - Serve() - // Stops the storage subsystem, flushing all pending operations. - Drain() - Flush() - Close() - - // Get all label values that are associated with the provided label name. - GetAllValuesForLabel(model.LabelName) (model.LabelValues, error) - // Get all of the metric fingerprints that are associated with the provided - // label set. - GetFingerprintsForLabelSet(model.LabelSet) (model.Fingerprints, error) - // Get the metric associated with the provided fingerprint. - GetMetricForFingerprint(model.Fingerprint) (m *model.Metric, err error) -} - -func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, writeMemoryInterval, memoryTTL time.Duration, root string) (storage Storage, err error) { +func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, writeMemoryInterval, memoryTTL time.Duration, root string) (storage *TieredStorage, err error) { diskStorage, err := NewLevelDBMetricPersistence(root) if err != nil { return } - storage = &tieredStorage{ + storage = &TieredStorage{ appendToDiskQueue: make(chan model.Samples, appendToDiskQueueDepth), appendToMemoryQueue: make(chan model.Samples, appendToMemoryQueueDepth), diskStorage: diskStorage, @@ -94,7 +71,8 @@ func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueu return } -func (t tieredStorage) AppendSamples(s model.Samples) (err error) { +// Enqueues Samples for storage. +func (t TieredStorage) AppendSamples(s model.Samples) (err error) { if len(t.draining) > 0 { return fmt.Errorf("Storage is in the process of draining.") } @@ -104,7 +82,8 @@ func (t tieredStorage) AppendSamples(s model.Samples) (err error) { return } -func (t tieredStorage) Drain() { +// Stops the storage subsystem, flushing all pending operations. +func (t TieredStorage) Drain() { log.Println("Starting drain...") drainingDone := make(chan bool) if len(t.draining) == 0 { @@ -114,7 +93,8 @@ func (t tieredStorage) Drain() { log.Println("Done.") } -func (t *tieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration) (view View, err error) { +// Enqueus a ViewRequestBuilder for materialization, subject to a timeout. +func (t TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration) (view View, err error) { if len(t.draining) > 0 { err = fmt.Errorf("Storage is in the process of draining.") return @@ -148,7 +128,7 @@ func (t *tieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat return } -func (t *tieredStorage) rebuildDiskFrontier(i leveldb.Iterator) (err error) { +func (t *TieredStorage) rebuildDiskFrontier(i leveldb.Iterator) (err error) { begin := time.Now() defer func() { duration := time.Since(begin) @@ -163,7 +143,8 @@ func (t *tieredStorage) rebuildDiskFrontier(i leveldb.Iterator) (err error) { return } -func (t *tieredStorage) Serve() { +// Starts serving requests. +func (t TieredStorage) Serve() { flushMemoryTicker := time.NewTicker(t.flushMemoryInterval) defer flushMemoryTicker.Stop() writeMemoryTicker := time.NewTicker(t.writeMemoryInterval) @@ -193,7 +174,7 @@ func (t *tieredStorage) Serve() { } } -func (t *tieredStorage) reportQueues() { +func (t TieredStorage) reportQueues() { queueSizes.Set(map[string]string{"queue": "append_to_disk", "facet": "occupancy"}, float64(len(t.appendToDiskQueue))) queueSizes.Set(map[string]string{"queue": "append_to_disk", "facet": "capacity"}, float64(cap(t.appendToDiskQueue))) @@ -204,7 +185,7 @@ func (t *tieredStorage) reportQueues() { queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "capacity"}, float64(cap(t.viewQueue))) } -func (t *tieredStorage) writeMemory() { +func (t *TieredStorage) writeMemory() { begin := time.Now() defer func() { duration := time.Since(begin) @@ -222,11 +203,11 @@ func (t *tieredStorage) writeMemory() { } } -func (t tieredStorage) Flush() { +func (t TieredStorage) Flush() { t.flush() } -func (t tieredStorage) Close() { +func (t TieredStorage) Close() { log.Println("Closing tiered storage...") t.Drain() t.diskStorage.Close() @@ -239,7 +220,7 @@ func (t tieredStorage) Close() { } // Write all pending appends. -func (t tieredStorage) flush() (err error) { +func (t TieredStorage) flush() (err error) { // Trim any old values to reduce iterative write costs. t.flushMemory() t.writeMemory() @@ -330,7 +311,7 @@ func (f memoryToDiskFlusher) Close() { } // Persist a whole bunch of samples to the datastore. -func (t *tieredStorage) flushMemory() { +func (t *TieredStorage) flushMemory() { begin := time.Now() defer func() { duration := time.Since(begin) @@ -353,7 +334,7 @@ func (t *tieredStorage) flushMemory() { return } -func (t *tieredStorage) renderView(viewJob viewJob) { +func (t TieredStorage) renderView(viewJob viewJob) { // Telemetry. var err error begin := time.Now() @@ -482,7 +463,7 @@ func (t *tieredStorage) renderView(viewJob viewJob) { return } -func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier *seriesFrontier, fingerprint model.Fingerprint, ts time.Time) (chunk model.Values) { +func (t TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier *seriesFrontier, fingerprint model.Fingerprint, ts time.Time) (chunk model.Values) { var ( targetKey = &dto.SampleKey{ Fingerprint: fingerprint.ToDTO(), @@ -558,7 +539,8 @@ func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier return } -func (t *tieredStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) { +// Get all label values that are associated with the provided label name. +func (t TieredStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) { diskValues, err := t.diskStorage.GetAllValuesForLabel(labelName) if err != nil { return @@ -579,7 +561,9 @@ func (t *tieredStorage) GetAllValuesForLabel(labelName model.LabelName) (values return } -func (t *tieredStorage) GetFingerprintsForLabelSet(labelSet model.LabelSet) (fingerprints model.Fingerprints, err error) { +// Get all of the metric fingerprints that are associated with the provided +// label set. +func (t TieredStorage) GetFingerprintsForLabelSet(labelSet model.LabelSet) (fingerprints model.Fingerprints, err error) { memFingerprints, err := t.memoryArena.GetFingerprintsForLabelSet(labelSet) if err != nil { return @@ -599,7 +583,8 @@ func (t *tieredStorage) GetFingerprintsForLabelSet(labelSet model.LabelSet) (fin return } -func (t *tieredStorage) GetMetricForFingerprint(f model.Fingerprint) (m *model.Metric, err error) { +// Get the metric associated with the provided fingerprint. +func (t TieredStorage) GetMetricForFingerprint(f model.Fingerprint) (m *model.Metric, err error) { m, err = t.memoryArena.GetMetricForFingerprint(f) if err != nil { return diff --git a/storage/metric/tiered_test.go b/storage/metric/tiered_test.go index 03789d0369..8ce35eeffe 100644 --- a/storage/metric/tiered_test.go +++ b/storage/metric/tiered_test.go @@ -348,7 +348,7 @@ func testMakeView(t test.Tester, flushToDisk bool) { if flushToDisk { tiered.Flush() } else { - tiered.(*tieredStorage).writeMemory() + tiered.writeMemory() } requestBuilder := NewViewRequestBuilder() @@ -480,12 +480,12 @@ func TestGetAllValuesForLabel(t *testing.T) { Metric: model.Metric{model.MetricNameLabel: model.LabelValue(metric.metricName)}, } if metric.appendToMemory { - if err := tiered.(*tieredStorage).memoryArena.AppendSample(sample); err != nil { + if err := tiered.memoryArena.AppendSample(sample); err != nil { t.Fatalf("%d.%d. failed to add fixture data: %s", i, j, err) } } if metric.appendToDisk { - if err := tiered.(*tieredStorage).diskStorage.AppendSample(sample); err != nil { + if err := tiered.diskStorage.AppendSample(sample); err != nil { t.Fatalf("%d.%d. failed to add fixture data: %s", i, j, err) } } @@ -517,10 +517,10 @@ func TestGetFingerprintsForLabelSet(t *testing.T) { diskSample := model.Sample{ Metric: model.Metric{model.MetricNameLabel: "http_requests", "method": "/bar"}, } - if err := tiered.(*tieredStorage).memoryArena.AppendSample(memorySample); err != nil { + if err := tiered.memoryArena.AppendSample(memorySample); err != nil { t.Fatalf("Failed to add fixture data: %s", err) } - if err := tiered.(*tieredStorage).diskStorage.AppendSample(diskSample); err != nil { + if err := tiered.diskStorage.AppendSample(diskSample); err != nil { t.Fatalf("Failed to add fixture data: %s", err) } tiered.Flush()