From f5f9f3514a12bb9332ae85187377827d518370a8 Mon Sep 17 00:00:00 2001 From: Bjoern Rabenstein Date: Tue, 16 Sep 2014 15:47:24 +0200 Subject: [PATCH] Major code cleanup. - Make it go-vet and golint clean. - Add comments, TODOs, etc. Change-Id: If1392d96f3d5b4cdde597b10c8dff1769fcfabe2 --- main.go | 9 +- rules/alerting.go | 4 +- rules/ast/ast.go | 20 ++--- rules/ast/printer.go | 4 +- rules/ast/query_analyzer.go | 10 +-- rules/manager/manager.go | 4 +- rules/recording.go | 4 +- rules/rules.go | 4 +- storage/local/chunk.go | 32 ++++++- storage/local/codec/codec.go | 116 +++++++++++++++++++----- storage/local/codec/codec_test.go | 4 +- storage/local/delta.go | 24 +++-- storage/local/index/index.go | 113 ++++++++++++++++++----- storage/local/index/interface.go | 24 +++-- storage/local/index/leveldb.go | 15 +++- storage/local/instrumentation.go | 2 +- storage/local/interface.go | 20 +++-- storage/local/persistence.go | 143 ++++++++++++++++++------------ storage/local/persistence_test.go | 6 +- storage/local/preload.go | 2 +- storage/local/series.go | 81 +++++++++++++---- storage/local/storage.go | 58 ++++++++---- storage/local/storage_test.go | 2 +- storage/local/test_helpers.go | 5 +- templates/templates.go | 4 +- web/api/api.go | 2 +- web/consoles.go | 2 +- 27 files changed, 514 insertions(+), 200 deletions(-) diff --git a/main.go b/main.go index 7ea0347cf3..5523bfc39f 100644 --- a/main.go +++ b/main.go @@ -71,7 +71,7 @@ type prometheus struct { ruleManager manager.RuleManager targetManager retrieval.TargetManager notifications chan notification.NotificationReqs - storage storage_ng.Storage + storage local.Storage remoteTSDBQueue *remote.TSDBQueueManager closeOnce sync.Once @@ -137,20 +137,19 @@ func main() { glog.Fatalf("Error loading configuration from %s: %v", *configFile, err) } - persistence, err := storage_ng.NewDiskPersistence(*metricsStoragePath, 1024) + persistence, err := local.NewDiskPersistence(*metricsStoragePath, 1024) if err != nil { glog.Fatal("Error opening disk persistence: ", err) } - defer persistence.Close() - o := &storage_ng.MemorySeriesStorageOptions{ + o := &local.MemorySeriesStorageOptions{ Persistence: persistence, MemoryEvictionInterval: *memoryEvictionInterval, MemoryRetentionPeriod: *memoryRetentionPeriod, PersistencePurgeInterval: *storagePurgeInterval, PersistenceRetentionPeriod: *storageRetentionPeriod, } - memStorage, err := storage_ng.NewMemorySeriesStorage(o) + memStorage, err := local.NewMemorySeriesStorage(o) if err != nil { glog.Fatal("Error opening memory series storage: ", err) } diff --git a/rules/alerting.go b/rules/alerting.go index 062f0d258e..d8caee7d5e 100644 --- a/rules/alerting.go +++ b/rules/alerting.go @@ -118,11 +118,11 @@ func (rule *AlertingRule) Name() string { return rule.name } -func (rule *AlertingRule) EvalRaw(timestamp clientmodel.Timestamp, storage storage_ng.Storage) (ast.Vector, error) { +func (rule *AlertingRule) EvalRaw(timestamp clientmodel.Timestamp, storage local.Storage) (ast.Vector, error) { return ast.EvalVectorInstant(rule.Vector, timestamp, storage, stats.NewTimerGroup()) } -func (rule *AlertingRule) Eval(timestamp clientmodel.Timestamp, storage storage_ng.Storage) (ast.Vector, error) { +func (rule *AlertingRule) Eval(timestamp clientmodel.Timestamp, storage local.Storage) (ast.Vector, error) { // Get the raw value of the rule expression. exprResult, err := rule.EvalRaw(timestamp, storage) if err != nil { diff --git a/rules/ast/ast.go b/rules/ast/ast.go index 87359c36fc..215269c518 100644 --- a/rules/ast/ast.go +++ b/rules/ast/ast.go @@ -25,11 +25,11 @@ import ( clientmodel "github.com/prometheus/client_golang/model" "github.com/prometheus/prometheus/stats" - "github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/storage/local" + "github.com/prometheus/prometheus/storage/metric" ) -var defaultStalenessDelta = flag.Duration("defaultStalenessDelta", 300*time.Second, "Default staleness delta allowance in seconds during expression evaluations.") +var stalenessDelta = flag.Duration("stalenessDelta", 300*time.Second, "Staleness delta allowance during expression evaluations.") // ---------------------------------------------------------------------------- // Raw data value types. @@ -179,7 +179,7 @@ type ( VectorSelector struct { labelMatchers metric.LabelMatchers // The series iterators are populated at query analysis time. - iterators map[clientmodel.Fingerprint]storage_ng.SeriesIterator + iterators map[clientmodel.Fingerprint]local.SeriesIterator metrics map[clientmodel.Fingerprint]clientmodel.Metric // Fingerprints are populated from label matchers at query analysis time. // TODO: do we still need these? @@ -220,7 +220,7 @@ type ( MatrixSelector struct { labelMatchers metric.LabelMatchers // The series iterators are populated at query analysis time. - iterators map[clientmodel.Fingerprint]storage_ng.SeriesIterator + iterators map[clientmodel.Fingerprint]local.SeriesIterator metrics map[clientmodel.Fingerprint]clientmodel.Metric // Fingerprints are populated from label matchers at query analysis time. // TODO: do we still need these? @@ -366,7 +366,7 @@ func labelsToKey(labels clientmodel.Metric) uint64 { } // EvalVectorInstant evaluates a VectorNode with an instant query. -func EvalVectorInstant(node VectorNode, timestamp clientmodel.Timestamp, storage storage_ng.Storage, queryStats *stats.TimerGroup) (Vector, error) { +func EvalVectorInstant(node VectorNode, timestamp clientmodel.Timestamp, storage local.Storage, queryStats *stats.TimerGroup) (Vector, error) { closer, err := prepareInstantQuery(node, timestamp, storage, queryStats) if err != nil { return nil, err @@ -376,7 +376,7 @@ func EvalVectorInstant(node VectorNode, timestamp clientmodel.Timestamp, storage } // EvalVectorRange evaluates a VectorNode with a range query. -func EvalVectorRange(node VectorNode, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage storage_ng.Storage, queryStats *stats.TimerGroup) (Matrix, error) { +func EvalVectorRange(node VectorNode, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage local.Storage, queryStats *stats.TimerGroup) (Matrix, error) { // Explicitly initialize to an empty matrix since a nil Matrix encodes to // null in JSON. matrix := Matrix{} @@ -538,7 +538,7 @@ func chooseClosestSample(samples metric.Values, timestamp clientmodel.Timestamp) // Samples before target time. if delta < 0 { // Ignore samples outside of staleness policy window. - if -delta > *defaultStalenessDelta { + if -delta > *stalenessDelta { continue } // Ignore samples that are farther away than what we've seen before. @@ -552,7 +552,7 @@ func chooseClosestSample(samples metric.Values, timestamp clientmodel.Timestamp) // Samples after target time. if delta >= 0 { // Ignore samples outside of staleness policy window. - if delta > *defaultStalenessDelta { + if delta > *stalenessDelta { continue } // Ignore samples that are farther away than samples we've seen before. @@ -858,7 +858,7 @@ func NewScalarLiteral(value clientmodel.SampleValue) *ScalarLiteral { func NewVectorSelector(m metric.LabelMatchers) *VectorSelector { return &VectorSelector{ labelMatchers: m, - iterators: map[clientmodel.Fingerprint]storage_ng.SeriesIterator{}, + iterators: map[clientmodel.Fingerprint]local.SeriesIterator{}, metrics: map[clientmodel.Fingerprint]clientmodel.Metric{}, } } @@ -951,7 +951,7 @@ func NewMatrixSelector(vector *VectorSelector, interval time.Duration) *MatrixSe return &MatrixSelector{ labelMatchers: vector.labelMatchers, interval: interval, - iterators: map[clientmodel.Fingerprint]storage_ng.SeriesIterator{}, + iterators: map[clientmodel.Fingerprint]local.SeriesIterator{}, metrics: map[clientmodel.Fingerprint]clientmodel.Metric{}, } } diff --git a/rules/ast/printer.go b/rules/ast/printer.go index f90fc245aa..b32f0b10f4 100644 --- a/rules/ast/printer.go +++ b/rules/ast/printer.go @@ -151,7 +151,7 @@ func TypedValueToJSON(data interface{}, typeStr string) string { } // EvalToString evaluates the given node into a string of the given format. -func EvalToString(node Node, timestamp clientmodel.Timestamp, format OutputFormat, storage storage_ng.Storage, queryStats *stats.TimerGroup) string { +func EvalToString(node Node, timestamp clientmodel.Timestamp, format OutputFormat, storage local.Storage, queryStats *stats.TimerGroup) string { prepareTimer := queryStats.GetTimer(stats.TotalQueryPreparationTime).Start() closer, err := prepareInstantQuery(node, timestamp, storage, queryStats) prepareTimer.Stop() @@ -203,7 +203,7 @@ func EvalToString(node Node, timestamp clientmodel.Timestamp, format OutputForma } // EvalToVector evaluates the given node into a Vector. Matrices aren't supported. -func EvalToVector(node Node, timestamp clientmodel.Timestamp, storage storage_ng.Storage, queryStats *stats.TimerGroup) (Vector, error) { +func EvalToVector(node Node, timestamp clientmodel.Timestamp, storage local.Storage, queryStats *stats.TimerGroup) (Vector, error) { prepareTimer := queryStats.GetTimer(stats.TotalQueryPreparationTime).Start() closer, err := prepareInstantQuery(node, timestamp, storage, queryStats) prepareTimer.Stop() diff --git a/rules/ast/query_analyzer.go b/rules/ast/query_analyzer.go index bab00c5008..24f4ec5d3d 100644 --- a/rules/ast/query_analyzer.go +++ b/rules/ast/query_analyzer.go @@ -46,13 +46,13 @@ type QueryAnalyzer struct { IntervalRanges IntervalRangeMap // The underlying storage to which the query will be applied. Needed for // extracting timeseries fingerprint information during query analysis. - storage storage_ng.Storage + storage local.Storage } // NewQueryAnalyzer returns a pointer to a newly instantiated // QueryAnalyzer. The storage is needed to extract timeseries // fingerprint information during query analysis. -func NewQueryAnalyzer(storage storage_ng.Storage) *QueryAnalyzer { +func NewQueryAnalyzer(storage local.Storage) *QueryAnalyzer { return &QueryAnalyzer{ FullRanges: FullRangeMap{}, IntervalRanges: IntervalRangeMap{}, @@ -93,7 +93,7 @@ func (analyzer *QueryAnalyzer) Visit(node Node) { } type iteratorInitializer struct { - storage storage_ng.Storage + storage local.Storage } func (i *iteratorInitializer) Visit(node Node) { @@ -109,7 +109,7 @@ func (i *iteratorInitializer) Visit(node Node) { } } -func prepareInstantQuery(node Node, timestamp clientmodel.Timestamp, storage storage_ng.Storage, queryStats *stats.TimerGroup) (storage_ng.Closer, error) { +func prepareInstantQuery(node Node, timestamp clientmodel.Timestamp, storage local.Storage, queryStats *stats.TimerGroup) (local.Preloader, error) { analyzeTimer := queryStats.GetTimer(stats.QueryAnalysisTime).Start() analyzer := NewQueryAnalyzer(storage) Walk(analyzer, node) @@ -140,7 +140,7 @@ func prepareInstantQuery(node Node, timestamp clientmodel.Timestamp, storage sto return p, nil } -func prepareRangeQuery(node Node, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage storage_ng.Storage, queryStats *stats.TimerGroup) (storage_ng.Closer, error) { +func prepareRangeQuery(node Node, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage local.Storage, queryStats *stats.TimerGroup) (local.Preloader, error) { analyzeTimer := queryStats.GetTimer(stats.QueryAnalysisTime).Start() analyzer := NewQueryAnalyzer(storage) Walk(analyzer, node) diff --git a/rules/manager/manager.go b/rules/manager/manager.go index a7b22f0781..6a00e96158 100644 --- a/rules/manager/manager.go +++ b/rules/manager/manager.go @@ -83,7 +83,7 @@ type ruleManager struct { done chan bool interval time.Duration - storage storage_ng.Storage + storage local.Storage results chan<- *extraction.Result notifications chan<- notification.NotificationReqs @@ -93,7 +93,7 @@ type ruleManager struct { type RuleManagerOptions struct { EvaluationInterval time.Duration - Storage storage_ng.Storage + Storage local.Storage Notifications chan<- notification.NotificationReqs Results chan<- *extraction.Result diff --git a/rules/recording.go b/rules/recording.go index 8a259b30c5..f6e5f39f41 100644 --- a/rules/recording.go +++ b/rules/recording.go @@ -34,11 +34,11 @@ type RecordingRule struct { func (rule RecordingRule) Name() string { return rule.name } -func (rule RecordingRule) EvalRaw(timestamp clientmodel.Timestamp, storage storage_ng.Storage) (ast.Vector, error) { +func (rule RecordingRule) EvalRaw(timestamp clientmodel.Timestamp, storage local.Storage) (ast.Vector, error) { return ast.EvalVectorInstant(rule.vector, timestamp, storage, stats.NewTimerGroup()) } -func (rule RecordingRule) Eval(timestamp clientmodel.Timestamp, storage storage_ng.Storage) (ast.Vector, error) { +func (rule RecordingRule) Eval(timestamp clientmodel.Timestamp, storage local.Storage) (ast.Vector, error) { // Get the raw value of the rule expression. vector, err := rule.EvalRaw(timestamp, storage) if err != nil { diff --git a/rules/rules.go b/rules/rules.go index b959b9a334..54be96a11a 100644 --- a/rules/rules.go +++ b/rules/rules.go @@ -29,9 +29,9 @@ type Rule interface { Name() string // EvalRaw evaluates the rule's vector expression without triggering any // other actions, like recording or alerting. - EvalRaw(timestamp clientmodel.Timestamp, storage storage_ng.Storage) (ast.Vector, error) + EvalRaw(timestamp clientmodel.Timestamp, storage local.Storage) (ast.Vector, error) // Eval evaluates the rule, including any associated recording or alerting actions. - Eval(timestamp clientmodel.Timestamp, storage storage_ng.Storage) (ast.Vector, error) + Eval(timestamp clientmodel.Timestamp, storage local.Storage) (ast.Vector, error) // ToDotGraph returns a Graphviz dot graph of the rule. ToDotGraph() string // String returns a human-readable string representation of the rule. diff --git a/storage/local/chunk.go b/storage/local/chunk.go index feb7e5d042..65eae2c0dc 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -1,4 +1,4 @@ -package storage_ng +package local import ( "io" @@ -8,9 +8,21 @@ import ( "github.com/prometheus/prometheus/storage/metric" ) +// chunks is just a chunk slice. No methods are defined for this named type. +// TODO: Perhaps we should remove it? It might avoid errors if it's +// syntactically clear that we are dealing with a vanilly slice and not some +// kind of more complex collection. type chunks []chunk +// chunk is the interface for all chunks. Chunks are generally not +// goroutine-safe. type chunk interface { + // add adds a SamplePair to the chunks, performs any necessary + // re-encoding, and adds any necessary overflow chunks. It returns the + // new version of the original chunk, followed by overflow chunks, if + // any. The first chunk returned might be the same as the original one + // or a newly allocated version. In any case, take the returned chunk as + // the relevant one and discard the orginal chunk. add(*metric.SamplePair) chunks clone() chunk firstTime() clientmodel.Timestamp @@ -18,15 +30,27 @@ type chunk interface { newIterator() chunkIterator marshal(io.Writer) error unmarshal(io.Reader) error - - // TODO: remove? + // values returns a channel, from which all sample values in the chunk + // can be received in order. The channel is closed after the last + // one. It is generally not safe to mutate the chunk while the channel + // is still open. values() <-chan *metric.SamplePair } +// A chunkIterator enables efficient access to the content of a chunk. It is +// generally not safe to use a chunkIterator concurrently with or after chunk +// mutation. type chunkIterator interface { + // Gets the two values that are immediately adjacent to a given time. In + // case a value exist at precisely the given time, only that single + // value is returned. Only the first or last value is returned (as a + // single value), if the given time is before or after the first or last + // value, respectively. getValueAtTime(clientmodel.Timestamp) metric.Values - getBoundaryValues(metric.Interval) metric.Values + // Gets all values contained within a given interval. getRangeValues(metric.Interval) metric.Values + // Whether a given timestamp is contained between first and last value + // in the chunk. contains(clientmodel.Timestamp) bool } diff --git a/storage/local/codec/codec.go b/storage/local/codec/codec.go index d4e5dc5944..c75b7a1401 100644 --- a/storage/local/codec/codec.go +++ b/storage/local/codec/codec.go @@ -1,9 +1,40 @@ +// Copyright 2014 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package codec provides types that implement encoding.BinaryMarshaler and +// encoding.BinaryUnmarshaler and functions that help to encode and decode +// primitives. The Prometheus storage backend uses them to persist objects to +// files and to save objects in LevelDB. +// +// The encodings used in this package are designed in a way that objects can be +// unmarshaled from a continuous byte stream, i.e. the information when to stop +// reading is determined by the format. No separate termination information is +// needed. +// +// Strings are encoded as the length of their bytes as a varint followed by +// their bytes. +// +// Slices are encoded as their length as a varint followed by their elements. +// +// Maps are encoded as the number of mappings as a varint, followed by the +// mappings, each of which consists of the key followed by the value. package codec import ( "bytes" "encoding" "encoding/binary" + "fmt" "io" "sync" @@ -12,18 +43,26 @@ import ( "github.com/prometheus/prometheus/storage/metric" ) +// codable implements both, encoding.BinaryMarshaler and +// encoding.BinaryUnmarshaler, which is only needed internally and therefore not +// exported for now. type codable interface { encoding.BinaryMarshaler encoding.BinaryUnmarshaler } +// A byteReader is an io.ByteReader that also implements the vanilla io.Reader +// interface. type byteReader interface { io.Reader io.ByteReader } +// bufPool is a pool for staging buffers. Using a pool allows concurrency-safe +// reuse of buffers var bufPool sync.Pool +// getBuf returns a buffer from the pool. The length of the returned slice is l. func getBuf(l int) []byte { x := bufPool.Get() if x == nil { @@ -36,10 +75,14 @@ func getBuf(l int) []byte { return buf[:l] } +// putBuf returns a buffer to the pool. func putBuf(buf []byte) { bufPool.Put(buf) } +// EncodeVarint encodes an int64 as a varint and writes it to an io.Writer. +// This is a GC-friendly implementation that takes the required staging buffer +// from a buffer pool. func EncodeVarint(w io.Writer, i int64) error { buf := getBuf(binary.MaxVarintLen64) defer putBuf(buf) @@ -49,6 +92,9 @@ func EncodeVarint(w io.Writer, i int64) error { return err } +// EncodeUint64 writes an uint64 to an io.Writer in big-endian byte-order. +// This is a GC-friendly implementation that takes the required staging buffer +// from a buffer pool. func EncodeUint64(w io.Writer, u uint64) error { buf := getBuf(8) defer putBuf(buf) @@ -58,6 +104,9 @@ func EncodeUint64(w io.Writer, u uint64) error { return err } +// DecodeUint64 reads an uint64 from an io.Reader in big-endian byte-order. +// This is a GC-friendly implementation that takes the required staging buffer +// from a buffer pool. func DecodeUint64(r io.Reader) (uint64, error) { buf := getBuf(8) defer putBuf(buf) @@ -68,6 +117,8 @@ func DecodeUint64(r io.Reader) (uint64, error) { return binary.BigEndian.Uint64(buf), nil } +// encodeString writes the varint encoded length followed by the bytes of s to +// b. func encodeString(b *bytes.Buffer, s string) error { if err := EncodeVarint(b, int64(len(s))); err != nil { return err @@ -78,6 +129,7 @@ func encodeString(b *bytes.Buffer, s string) error { return nil } +// decodeString decodes a string encoded by encodeString. func decodeString(b byteReader) (string, error) { length, err := binary.ReadVarint(b) if err != nil { @@ -93,8 +145,11 @@ func decodeString(b byteReader) (string, error) { return string(buf), nil } +// A CodableMetric is a clientmodel.Metric that implements +// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler. type CodableMetric clientmodel.Metric +// MarshalBinary implements encoding.BinaryMarshaler. func (m CodableMetric) MarshalBinary() ([]byte, error) { buf := &bytes.Buffer{} if err := EncodeVarint(buf, int64(len(m))); err != nil { @@ -111,10 +166,15 @@ func (m CodableMetric) MarshalBinary() ([]byte, error) { return buf.Bytes(), nil } +// UnmarshalBinary implements encoding.BinaryUnmarshaler. It can be used with the +// zero value of CodableMetric. func (m *CodableMetric) UnmarshalBinary(buf []byte) error { return m.UnmarshalFromReader(bytes.NewReader(buf)) } +// UnmarshalFromReader unmarshals a CodableMetric from a reader that implements +// both, io.Reader and io.ByteReader. It can be used with the zero value of +// CodableMetric. func (m *CodableMetric) UnmarshalFromReader(r byteReader) error { numLabelPairs, err := binary.ReadVarint(r) if err != nil { @@ -136,56 +196,59 @@ func (m *CodableMetric) UnmarshalFromReader(r byteReader) error { return nil } +// A CodableFingerprint is a clientmodel.Fingerprint that implements +// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler. The implementation +// depends on clientmodel.Fingerprint to be convertible to uint64. It encodes +// the fingerprint as a big-endian uint64. type CodableFingerprint clientmodel.Fingerprint +// MarshalBinary implements encoding.BinaryMarshaler. func (fp CodableFingerprint) MarshalBinary() ([]byte, error) { b := make([]byte, 8) binary.BigEndian.PutUint64(b, uint64(fp)) return b, nil } +// UnmarshalBinary implements encoding.BinaryUnmarshaler. func (fp *CodableFingerprint) UnmarshalBinary(buf []byte) error { *fp = CodableFingerprint(binary.BigEndian.Uint64(buf)) return nil } +// CodableFingerprints is a clientmodel.Fingerprints that implements +// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler. type CodableFingerprints clientmodel.Fingerprints +// MarshalBinary implements encoding.BinaryMarshaler. func (fps CodableFingerprints) MarshalBinary() ([]byte, error) { - b := bytes.NewBuffer(make([]byte, 0, binary.MaxVarintLen64+len(fps)*8)) - if err := EncodeVarint(b, int64(len(fps))); err != nil { - return nil, err - } + b := make([]byte, binary.MaxVarintLen64+len(fps)*8) + lenBytes := binary.PutVarint(b, int64(len(fps))) - buf := getBuf(8) - defer putBuf(buf) - - for _, fp := range fps { - binary.BigEndian.PutUint64(buf, uint64(fp)) - if _, err := b.Write(buf[:8]); err != nil { - return nil, err - } + for i, fp := range fps { + binary.BigEndian.PutUint64(b[i*8+lenBytes:], uint64(fp)) } - return b.Bytes(), nil + return b[:len(fps)*8+lenBytes], nil } +// UnmarshalBinary implements encoding.BinaryUnmarshaler. func (fps *CodableFingerprints) UnmarshalBinary(buf []byte) error { - r := bytes.NewReader(buf) - numFPs, err := binary.ReadVarint(r) - if err != nil { - return err + numFPs, offset := binary.Varint(buf) + if offset <= 0 { + return fmt.Errorf("could not decode length of CodableFingerprints, varint decoding returned %d", offset) } *fps = make(CodableFingerprints, numFPs) - offset := len(buf) - r.Len() - for i, _ := range *fps { + for i := range *fps { (*fps)[i] = clientmodel.Fingerprint(binary.BigEndian.Uint64(buf[offset+i*8:])) } return nil } +// CodableLabelPair is a metric.LabelPair that implements +// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler. type CodableLabelPair metric.LabelPair +// MarshalBinary implements encoding.BinaryMarshaler. func (lp CodableLabelPair) MarshalBinary() ([]byte, error) { buf := &bytes.Buffer{} if err := encodeString(buf, string(lp.Name)); err != nil { @@ -197,6 +260,7 @@ func (lp CodableLabelPair) MarshalBinary() ([]byte, error) { return buf.Bytes(), nil } +// UnmarshalBinary implements encoding.BinaryUnmarshaler. func (lp *CodableLabelPair) UnmarshalBinary(buf []byte) error { r := bytes.NewReader(buf) n, err := decodeString(r) @@ -212,8 +276,11 @@ func (lp *CodableLabelPair) UnmarshalBinary(buf []byte) error { return nil } +// CodableLabelName is a clientmodel.LabelName that implements +// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler. type CodableLabelName clientmodel.LabelName +// MarshalBinary implements encoding.BinaryMarshaler. func (l CodableLabelName) MarshalBinary() ([]byte, error) { buf := &bytes.Buffer{} if err := encodeString(buf, string(l)); err != nil { @@ -222,6 +289,7 @@ func (l CodableLabelName) MarshalBinary() ([]byte, error) { return buf.Bytes(), nil } +// UnmarshalBinary implements encoding.BinaryUnmarshaler. func (l *CodableLabelName) UnmarshalBinary(buf []byte) error { r := bytes.NewReader(buf) n, err := decodeString(r) @@ -232,8 +300,11 @@ func (l *CodableLabelName) UnmarshalBinary(buf []byte) error { return nil } +// CodableLabelValues is a clientmodel.LabelValues that implements +// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler. type CodableLabelValues clientmodel.LabelValues +// MarshalBinary implements encoding.BinaryMarshaler. func (vs CodableLabelValues) MarshalBinary() ([]byte, error) { buf := &bytes.Buffer{} if err := EncodeVarint(buf, int64(len(vs))); err != nil { @@ -247,6 +318,7 @@ func (vs CodableLabelValues) MarshalBinary() ([]byte, error) { return buf.Bytes(), nil } +// UnmarshalBinary implements encoding.BinaryUnmarshaler. func (vs *CodableLabelValues) UnmarshalBinary(buf []byte) error { r := bytes.NewReader(buf) numValues, err := binary.ReadVarint(r) @@ -255,7 +327,7 @@ func (vs *CodableLabelValues) UnmarshalBinary(buf []byte) error { } *vs = make(CodableLabelValues, numValues) - for i, _ := range *vs { + for i := range *vs { v, err := decodeString(r) if err != nil { return err @@ -265,10 +337,13 @@ func (vs *CodableLabelValues) UnmarshalBinary(buf []byte) error { return nil } +// CodableTimeRange is used to define a time range and implements +// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler. type CodableTimeRange struct { First, Last clientmodel.Timestamp } +// MarshalBinary implements encoding.BinaryMarshaler. func (tr CodableTimeRange) MarshalBinary() ([]byte, error) { buf := &bytes.Buffer{} if err := EncodeVarint(buf, int64(tr.First)); err != nil { @@ -280,6 +355,7 @@ func (tr CodableTimeRange) MarshalBinary() ([]byte, error) { return buf.Bytes(), nil } +// UnmarshalBinary implements encoding.BinaryUnmarshaler. func (tr *CodableTimeRange) UnmarshalBinary(buf []byte) error { r := bytes.NewReader(buf) first, err := binary.ReadVarint(r) diff --git a/storage/local/codec/codec_test.go b/storage/local/codec/codec_test.go index 062ae457a3..3dd30193b5 100644 --- a/storage/local/codec/codec_test.go +++ b/storage/local/codec/codec_test.go @@ -49,7 +49,7 @@ func TestCodec(t *testing.T) { if len(fps1) != len(fps2) { return false } - for i, _ := range fps1 { + for i := range fps1 { if fps1[i] != fps2[i] { return false } @@ -84,7 +84,7 @@ func TestCodec(t *testing.T) { if len(lvs1) != len(lvs2) { return false } - for i, _ := range lvs1 { + for i := range lvs1 { if lvs1[i] != lvs2[i] { return false } diff --git a/storage/local/delta.go b/storage/local/delta.go index 83a1fef7e1..6bd326b99a 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -1,4 +1,4 @@ -package storage_ng +package local import ( "encoding/binary" @@ -12,7 +12,7 @@ import ( "github.com/prometheus/prometheus/storage/metric" ) -type deltaBytes int +type deltaBytes byte const ( d0 deltaBytes = 0 @@ -45,11 +45,12 @@ const ( // delta encoding of various types (int, float) and bit width. However, once 8 // bytes would be needed to encode a delta value, a fall-back to the absolute // numbers happens (so that timestamps are saved directly as int64 and values as -// float64). +// float64). It implements the chunk interface. type deltaEncodedChunk struct { buf []byte } +// newDeltaEncodedChunk returns a newly allocated deltaEncodedChunk. func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool) *deltaEncodedChunk { buf := make([]byte, deltaHeaderIsIntOffset+1, 1024) @@ -71,6 +72,7 @@ func (c *deltaEncodedChunk) newFollowupChunk() chunk { //return newDeltaEncodedChunk(c.timeBytes(), c.valueBytes(), c.isInt()) } +// clone implements chunk. func (c *deltaEncodedChunk) clone() chunk { buf := make([]byte, len(c.buf), 1024) copy(buf, c.buf) @@ -141,6 +143,7 @@ func (c *deltaEncodedChunk) baseValue() clientmodel.SampleValue { return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c.buf[deltaHeaderBaseValueOffset:]))) } +// add implements chunk. func (c *deltaEncodedChunk) add(s *metric.SamplePair) chunks { if len(c.buf) < deltaHeaderBytes { c.buf = c.buf[:deltaHeaderBytes] @@ -245,7 +248,7 @@ func (c *deltaEncodedChunk) len() int { return (len(c.buf) - deltaHeaderBytes) / c.sampleSize() } -// TODO: remove? +// values implements chunk. func (c *deltaEncodedChunk) values() <-chan *metric.SamplePair { n := c.len() valuesChan := make(chan *metric.SamplePair) @@ -310,14 +313,17 @@ func (c *deltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair { } } +// firstTime implements chunk. func (c *deltaEncodedChunk) firstTime() clientmodel.Timestamp { return c.valueAtIndex(0).Timestamp } +// lastTime implements chunk. func (c *deltaEncodedChunk) lastTime() clientmodel.Timestamp { return c.valueAtIndex(c.len() - 1).Timestamp } +// marshal implements chunk. func (c *deltaEncodedChunk) marshal(w io.Writer) error { if len(c.buf) > math.MaxUint16 { panic("chunk buffer length would overflow a 16 bit uint.") @@ -334,6 +340,7 @@ func (c *deltaEncodedChunk) marshal(w io.Writer) error { return nil } +// unmarshal implements chunk. func (c *deltaEncodedChunk) unmarshal(r io.Reader) error { c.buf = c.buf[:cap(c.buf)] readBytes := 0 @@ -348,17 +355,20 @@ func (c *deltaEncodedChunk) unmarshal(r io.Reader) error { return nil } +// deltaEncodedChunkIterator implements chunkIterator. type deltaEncodedChunkIterator struct { chunk *deltaEncodedChunk // TODO: add more fields here to keep track of last position. } +// newIterator implements chunk. func (c *deltaEncodedChunk) newIterator() chunkIterator { return &deltaEncodedChunkIterator{ chunk: c, } } +// getValueAtTime implements chunkIterator. func (it *deltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestamp) metric.Values { i := sort.Search(it.chunk.len(), func(i int) bool { return !it.chunk.valueAtIndex(i).Timestamp.Before(t) @@ -378,10 +388,7 @@ func (it *deltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestamp) met } } -func (it *deltaEncodedChunkIterator) getBoundaryValues(in metric.Interval) metric.Values { - return nil -} - +// getRangeValues implements chunkIterator. func (it *deltaEncodedChunkIterator) getRangeValues(in metric.Interval) metric.Values { oldest := sort.Search(it.chunk.len(), func(i int) bool { return !it.chunk.valueAtIndex(i).Timestamp.Before(in.OldestInclusive) @@ -402,6 +409,7 @@ func (it *deltaEncodedChunkIterator) getRangeValues(in metric.Interval) metric.V return result } +// contains implements chunkIterator. func (it *deltaEncodedChunkIterator) contains(t clientmodel.Timestamp) bool { return !t.Before(it.chunk.firstTime()) && !t.After(it.chunk.lastTime()) } diff --git a/storage/local/index/index.go b/storage/local/index/index.go index 4e9b2758ad..2916338fc1 100644 --- a/storage/local/index/index.go +++ b/storage/local/index/index.go @@ -1,3 +1,6 @@ +// Package index provides a number of indexes backed by persistent key-value +// stores. The only supported implementation of a key-value store is currently +// goleveldb, but other implementations can easily be added. package index import ( @@ -34,6 +37,10 @@ type FingerprintMetricIndex struct { } // IndexBatch indexes a batch of mappings from fingerprints to metrics. +// +// This method is goroutine-safe, but note that no specific order of execution +// can be guaranteed (especially critical if IndexBatch and UnindexBatch are +// called concurrently). func (i *FingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) error { b := i.NewBatch() @@ -45,24 +52,31 @@ func (i *FingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) er } // UnindexBatch unindexes a batch of mappings from fingerprints to metrics. +// +// This method is goroutine-safe, but note that no specific order of execution +// can be guaranteed (especially critical if IndexBatch and UnindexBatch are +// called concurrently). func (i *FingerprintMetricIndex) UnindexBatch(mapping FingerprintMetricMapping) error { b := i.NewBatch() - for fp, _ := range mapping { + for fp := range mapping { b.Delete(codec.CodableFingerprint(fp)) } return i.Commit(b) } -// Lookup looks up a metric by fingerprint. +// Lookup looks up a metric by fingerprint. Looking up a non-existing +// fingerprint is not an error. In that case, (nil, false, nil) is returned. +// +// This method is goroutine-safe. func (i *FingerprintMetricIndex) Lookup(fp clientmodel.Fingerprint) (metric clientmodel.Metric, ok bool, err error) { ok, err = i.Get(codec.CodableFingerprint(fp), (*codec.CodableMetric)(&metric)) return } -// NewFingerprintMetricIndex returns a FingerprintMetricIndex -// object ready to use. +// NewFingerprintMetricIndex returns a LevelDB-backed FingerprintMetricIndex +// ready to use. func NewFingerprintMetricIndex(basePath string) (*FingerprintMetricIndex, error) { fingerprintToMetricDB, err := NewLevelDB(LevelDBOptions{ Path: path.Join(basePath, fingerprintToMetricDir), @@ -80,33 +94,53 @@ func NewFingerprintMetricIndex(basePath string) (*FingerprintMetricIndex, error) // label values. type LabelNameLabelValuesMapping map[clientmodel.LabelName]clientmodel.LabelValues -// LabelNameLabelValuesIndex models a database mapping label names to -// label values. +// LabelNameLabelValuesIndex is a KeyValueStore that maps existing label names +// to all label values stored for that label name. type LabelNameLabelValuesIndex struct { KeyValueStore } -// IndexBatch implements LabelNameLabelValuesIndex. +// IndexBatch adds a batch of label name to label values mappings to the +// index. A mapping of a label name to an empty slice of label values results in +// a deletion of that mapping from the index. +// +// While this method is fundamentally goroutine-safe, note that the order of +// execution for multiple batches executed concurrently is undefined. Also, it +// is in general not safe to mutate the index while Extend or Reduce are +// running. func (i *LabelNameLabelValuesIndex) IndexBatch(b LabelNameLabelValuesMapping) error { batch := i.NewBatch() for name, values := range b { if len(values) == 0 { - batch.Delete(codec.CodableLabelName(name)) + if err := batch.Delete(codec.CodableLabelName(name)); err != nil { + return err + } } else { - batch.Put(codec.CodableLabelName(name), codec.CodableLabelValues(values)) + if err := batch.Put(codec.CodableLabelName(name), codec.CodableLabelValues(values)); err != nil { + return err + } } } return i.Commit(batch) } -// Lookup looks up all label values for a given label name. +// Lookup looks up all label values for a given label name. Looking up a +// non-existing label name is not an error. In that case, (nil, false, nil) is +// returned. +// +// This method is goroutine-safe. func (i *LabelNameLabelValuesIndex) Lookup(l clientmodel.LabelName) (values clientmodel.LabelValues, ok bool, err error) { ok, err = i.Get(codec.CodableLabelName(l), (*codec.CodableLabelValues)(&values)) return } +// Extend incorporates the given metric into the index, i.e. it creates new +// label name to label values mappings for new label names, and it extends the +// label values list mapped from already existing label names appropriately. +// +// This method is not goroutine-safe. func (i *LabelNameLabelValuesIndex) Extend(m clientmodel.Metric) error { b := make(LabelNameLabelValuesMapping, len(m)) for ln, lv := range m { @@ -131,6 +165,16 @@ func (i *LabelNameLabelValuesIndex) Extend(m clientmodel.Metric) error { return i.IndexBatch(b) } +// Reduce removes label values from the index based on the given label pair to +// fingerprints mapping. The mapping to be passed in here is returned by +// LabelPairFingerprintIndex.Reduce. It contains all the label pairs that have +// now fewer fingerprints mapped to it. This method checks if any label pair has +// arrived at zero mapped fingerprints. In that case, the value of that label +// pair is removed from the list of label values mapped to the name of that +// label pair. Label names that are then mapped to zero label values are removed +// entirely from the index. +// +// This method is not goroutine-safe. func (i *LabelNameLabelValuesIndex) Reduce(m LabelPairFingerprintsMapping) error { b := make(LabelNameLabelValuesMapping, len(m)) for lp, fps := range m { @@ -164,8 +208,8 @@ func (i *LabelNameLabelValuesIndex) Reduce(m LabelPairFingerprintsMapping) error return i.IndexBatch(b) } -// NewLabelNameLabelValuesIndex returns a LabelNameLabelValuesIndex -// ready to use. +// NewLabelNameLabelValuesIndex returns a LevelDB-backed +// LabelNameLabelValuesIndex ready to use. func NewLabelNameLabelValuesIndex(basePath string) (*LabelNameLabelValuesIndex, error) { labelNameToLabelValuesDB, err := NewLevelDB(LevelDBOptions{ Path: path.Join(basePath, labelNameToLabelValuesDir), @@ -183,13 +227,20 @@ func NewLabelNameLabelValuesIndex(basePath string) (*LabelNameLabelValuesIndex, // fingerprints. type LabelPairFingerprintsMapping map[metric.LabelPair]clientmodel.Fingerprints -// LabelPairFingerprintIndex models a database mapping label pairs to -// fingerprints. +// LabelPairFingerprintIndex is a KeyValueStore that maps existing label pairs +// to the fingerprints of all metrics containing those label pairs. type LabelPairFingerprintIndex struct { KeyValueStore } -// IndexBatch indexes a batch of mappings from label pairs to fingerprints. +// IndexBatch indexes a batch of mappings from label pairs to fingerprints. A +// mapping to an empty slice of fingerprints results in deletion of that mapping +// from the index. +// +// While this method is fundamentally goroutine-safe, note that the order of +// execution for multiple batches executed concurrently is undefined. Also, it +// is in general not safe to mutate the index while Extend or Reduce are +// running. func (i *LabelPairFingerprintIndex) IndexBatch(m LabelPairFingerprintsMapping) error { batch := i.NewBatch() @@ -204,12 +255,21 @@ func (i *LabelPairFingerprintIndex) IndexBatch(m LabelPairFingerprintsMapping) e return i.Commit(batch) } -// Lookup looks up all fingerprints for a given label pair. +// Lookup looks up all fingerprints for a given label pair. Looking up a +// non-existing label pair is not an error. In that case, (nil, false, nil) is +// returned. +// +// This method is goroutine-safe. func (i *LabelPairFingerprintIndex) Lookup(p metric.LabelPair) (fps clientmodel.Fingerprints, ok bool, err error) { ok, err = i.Get((codec.CodableLabelPair)(p), (*codec.CodableFingerprints)(&fps)) return } +// Extend incorporates the given metric into the index, i.e. it creates new +// label pair to fingerprint mappings for new label pairs, and it extends the +// fingerprint list mapped from already existing label pairs appropriately. +// +// This method is not goroutine-safe. func (i *LabelPairFingerprintIndex) Extend(m clientmodel.Metric, fp clientmodel.Fingerprint) error { b := make(LabelPairFingerprintsMapping, len(m)) for ln, lv := range m { @@ -236,6 +296,11 @@ func (i *LabelPairFingerprintIndex) Extend(m clientmodel.Metric, fp clientmodel. return i.IndexBatch(b) } +// Reduce removes the given fingerprint from the fingerprint lists mapped from +// the label pairs contained in the given metric. All the updated mappings are +// returned (for consumption by LabelNameLabelValuesIndex.Reduce). +// +// This method is not goroutine-safe. func (i *LabelPairFingerprintIndex) Reduce(m clientmodel.Metric, fp clientmodel.Fingerprint) (LabelPairFingerprintsMapping, error) { b := make(LabelPairFingerprintsMapping, len(m)) for ln, lv := range m { @@ -262,8 +327,8 @@ func (i *LabelPairFingerprintIndex) Reduce(m clientmodel.Metric, fp clientmodel. return b, i.IndexBatch(b) } -// NewLabelPairFingerprintIndex returns a LabelPairFingerprintIndex -// object ready to use. +// NewLabelPairFingerprintIndex returns a LevelDB-backed +// LabelPairFingerprintIndex ready to use. func NewLabelPairFingerprintIndex(basePath string) (*LabelPairFingerprintIndex, error) { labelPairToFingerprintsDB, err := NewLevelDB(LevelDBOptions{ Path: path.Join(basePath, labelPairToFingerprintsDir), @@ -283,7 +348,11 @@ type FingerprintTimeRangeIndex struct { KeyValueStore } -// Lookup returns the time range for the given fingerprint. +// Lookup returns the time range for the given fingerprint. Looking up a +// non-existing fingerprint is not an error. In that case, (0, 0, false, nil) is +// returned. +// +// This method is goroutine-safe. func (i *FingerprintTimeRangeIndex) Lookup(fp clientmodel.Fingerprint) (firstTime, lastTime clientmodel.Timestamp, ok bool, err error) { var tr codec.CodableTimeRange ok, err = i.Get(codec.CodableFingerprint(fp), &tr) @@ -291,12 +360,14 @@ func (i *FingerprintTimeRangeIndex) Lookup(fp clientmodel.Fingerprint) (firstTim } // Has returns true if the given fingerprint is present. +// +// This method is goroutine-safe. func (i *FingerprintTimeRangeIndex) Has(fp clientmodel.Fingerprint) (ok bool, err error) { return i.KeyValueStore.Has(codec.CodableFingerprint(fp)) } -// NewFingerprintTimeRangeIndex returns a FingerprintTimeRangeIndex object -// ready to use. +// NewFingerprintTimeRangeIndex returns a LevelDB-backed +// FingerprintTimeRangeIndex ready to use. func NewFingerprintTimeRangeIndex(basePath string) (*FingerprintTimeRangeIndex, error) { fingerprintTimeRangeDB, err := NewLevelDB(LevelDBOptions{ Path: path.Join(basePath, fingerprintTimeRangeDir), diff --git a/storage/local/index/interface.go b/storage/local/index/interface.go index 088088a77e..273313785a 100644 --- a/storage/local/index/interface.go +++ b/storage/local/index/interface.go @@ -2,12 +2,22 @@ package index import "encoding" -// KeyValueStore persists key/value pairs. +// KeyValueStore persists key/value pairs. Implementations must be fundamentally +// goroutine-safe. However, it is the caller's responsibility that keys and +// values can be safely marshaled and unmarshaled (via the MarshalBinary and +// UnmarshalBinary methods of the keys and values). For example, if you call the +// Put method of a KeyValueStore implementation, but the key or the value are +// modified concurrently while being marshaled into its binary representation, +// you obviously have a problem. Methods of KeyValueStore only return after +// (un)marshaling is complete. type KeyValueStore interface { Put(key, value encoding.BinaryMarshaler) error - Get(k encoding.BinaryMarshaler, v encoding.BinaryUnmarshaler) (bool, error) - Has(k encoding.BinaryMarshaler) (has bool, err error) - Delete(k encoding.BinaryMarshaler) error + // Get unmarshals the result into value. It returns false if no entry + // could be found for key. If value is nil, Get behaves like Has. + Get(key encoding.BinaryMarshaler, value encoding.BinaryUnmarshaler) (bool, error) + Has(key encoding.BinaryMarshaler) (bool, error) + // Delete returns an error if key does not exist. + Delete(key encoding.BinaryMarshaler) error NewBatch() Batch Commit(b Batch) error @@ -15,7 +25,11 @@ type KeyValueStore interface { Close() error } -// Batch allows KeyValueStore mutations to be pooled and committed together. +// Batch allows KeyValueStore mutations to be pooled and committed together. An +// implementation does not have to be goroutine-safe. Never modify a Batch +// concurrently or commit the same batch multiple times concurrently. Marshaling +// of keys and values is guaranteed to be complete when the Put or Delete methods +// have returned. type Batch interface { Put(key, value encoding.BinaryMarshaler) error Delete(key encoding.BinaryMarshaler) error diff --git a/storage/local/index/leveldb.go b/storage/local/index/leveldb.go index 2133e9e433..fa9455b3fb 100644 --- a/storage/local/index/leveldb.go +++ b/storage/local/index/leveldb.go @@ -16,11 +16,14 @@ type LevelDB struct { writeOpts *opt.WriteOptions } +// LevelDBOptions provides options for a LevelDB. type LevelDBOptions struct { - Path string + Path string // Base path to store files. CacheSizeBytes int } +// NewLevelDB returns a newly allocated LevelDB-backed KeyValueStore ready to +// use. func NewLevelDB(o LevelDBOptions) (KeyValueStore, error) { options := &opt.Options{ Compression: opt.SnappyCompression, @@ -40,16 +43,19 @@ func NewLevelDB(o LevelDBOptions) (KeyValueStore, error) { }, nil } +// NewBatch implements KeyValueStore. func (l *LevelDB) NewBatch() Batch { return &LevelDBBatch{ batch: &leveldb.Batch{}, } } +// Close implements KeyValueStore. func (l *LevelDB) Close() error { return l.storage.Close() } +// Get implements KeyValueStore. func (l *LevelDB) Get(key encoding.BinaryMarshaler, value encoding.BinaryUnmarshaler) (bool, error) { k, err := key.MarshalBinary() if err != nil { @@ -68,10 +74,12 @@ func (l *LevelDB) Get(key encoding.BinaryMarshaler, value encoding.BinaryUnmarsh return true, value.UnmarshalBinary(raw) } +// Has implements KeyValueStore. func (l *LevelDB) Has(key encoding.BinaryMarshaler) (has bool, err error) { return l.Get(key, nil) } +// Delete implements KeyValueStore. func (l *LevelDB) Delete(key encoding.BinaryMarshaler) error { k, err := key.MarshalBinary() if err != nil { @@ -80,6 +88,7 @@ func (l *LevelDB) Delete(key encoding.BinaryMarshaler) error { return l.storage.Delete(k, l.writeOpts) } +// Put implements KeyValueStore. func (l *LevelDB) Put(key, value encoding.BinaryMarshaler) error { k, err := key.MarshalBinary() if err != nil { @@ -92,6 +101,7 @@ func (l *LevelDB) Put(key, value encoding.BinaryMarshaler) error { return l.storage.Put(k, v, l.writeOpts) } +// Commit implements KeyValueStore. func (l *LevelDB) Commit(b Batch) error { return l.storage.Write(b.(*LevelDBBatch).batch, l.writeOpts) } @@ -101,6 +111,7 @@ type LevelDBBatch struct { batch *leveldb.Batch } +// Put implements Batch. func (b *LevelDBBatch) Put(key, value encoding.BinaryMarshaler) error { k, err := key.MarshalBinary() if err != nil { @@ -114,6 +125,7 @@ func (b *LevelDBBatch) Put(key, value encoding.BinaryMarshaler) error { return nil } +// Delete implements Batch. func (b *LevelDBBatch) Delete(key encoding.BinaryMarshaler) error { k, err := key.MarshalBinary() if err != nil { @@ -123,6 +135,7 @@ func (b *LevelDBBatch) Delete(key encoding.BinaryMarshaler) error { return nil } +// Reset implements Batch. func (b *LevelDBBatch) Reset() { b.batch.Reset() } diff --git a/storage/local/instrumentation.go b/storage/local/instrumentation.go index a3fe2cb0a2..41564e6889 100644 --- a/storage/local/instrumentation.go +++ b/storage/local/instrumentation.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package storage_ng +package local import ( "github.com/prometheus/client_golang/prometheus" diff --git a/storage/local/interface.go b/storage/local/interface.go index 0a4dd04ab7..847d13bf34 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -1,4 +1,4 @@ -package storage_ng +package local import ( clientmodel "github.com/prometheus/client_golang/model" @@ -8,6 +8,7 @@ import ( // SeriesMap maps fingerprints to memory series. type SeriesMap map[clientmodel.Fingerprint]*memorySeries +// Storage ingests and manages samples, along with various indexes. type Storage interface { // AppendSamples stores a group of new samples. Multiple samples for the same // fingerprint need to be submitted in chronological order, from oldest to @@ -33,18 +34,23 @@ type Storage interface { Close() error } +// SeriesIterator enables efficient access of sample values in a series type SeriesIterator interface { - // Get the two values that are immediately adjacent to a given time. + // Gets the two values that are immediately adjacent to a given time. In + // case a value exist at precisely the given time, only that single + // value is returned. Only the first or last value is returned (as a + // single value), if the given time is before or after the first or last + // value, respectively. GetValueAtTime(clientmodel.Timestamp) metric.Values - // Get the boundary values of an interval: the first value older than - // the interval start, and the first value younger than the interval - // end. + // Gets the boundary values of an interval: the first and last value + // within a given interval. GetBoundaryValues(metric.Interval) metric.Values - // Get all values contained within a provided interval. + // Gets all values contained within a given interval. GetRangeValues(metric.Interval) metric.Values } -// A Persistence stores samples persistently across restarts. +// A Persistence is used by a Storage implementation to store samples +// persistently across restarts. type Persistence interface { // PersistChunk persists a single chunk of a series. PersistChunk(clientmodel.Fingerprint, chunk) error diff --git a/storage/local/persistence.go b/storage/local/persistence.go index c69b78f344..da2826e293 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -1,4 +1,4 @@ -package storage_ng +package local import ( "bufio" @@ -33,6 +33,12 @@ const ( chunkHeaderLastTimeOffset = 9 ) +const ( + _ = iota + flagChunkDescsLoaded byte = 1 << iota + flagHeadChunkPersisted +) + type diskPersistence struct { basePath string chunkLen int @@ -43,6 +49,7 @@ type diskPersistence struct { labelNameToLabelValues *index.LabelNameLabelValuesIndex } +// NewDiskPersistence returns a newly allocated Persistence backed by local disk storage, ready to use. func NewDiskPersistence(basePath string, chunkLen int) (Persistence, error) { if err := os.MkdirAll(basePath, 0700); err != nil { return nil, err @@ -226,6 +233,16 @@ func (p *diskPersistence) PersistSeriesMapAndHeads(fingerprintToSeries SeriesMap } for fp, series := range fingerprintToSeries { + var seriesFlags byte + if series.chunkDescsLoaded { + seriesFlags |= flagChunkDescsLoaded + } + if series.headChunkPersisted { + seriesFlags |= flagHeadChunkPersisted + } + if err := w.WriteByte(seriesFlags); err != nil { + return err + } if err := codec.EncodeUint64(w, uint64(fp)); err != nil { return err } @@ -238,7 +255,7 @@ func (p *diskPersistence) PersistSeriesMapAndHeads(fingerprintToSeries SeriesMap return err } for i, chunkDesc := range series.chunkDescs { - if i < len(series.chunkDescs)-1 { + if series.headChunkPersisted || i < len(series.chunkDescs)-1 { if err := codec.EncodeVarint(w, int64(chunkDesc.firstTime())); err != nil { return err } @@ -246,7 +263,7 @@ func (p *diskPersistence) PersistSeriesMapAndHeads(fingerprintToSeries SeriesMap return err } } else { - // This is the head chunk. Fully marshal it. + // This is the non-persisted head chunk. Fully marshal it. if err := w.WriteByte(chunkType(chunkDesc.chunk)); err != nil { return err } @@ -291,6 +308,11 @@ func (p *diskPersistence) LoadSeriesMapAndHeads() (SeriesMap, error) { fingerprintToSeries := make(SeriesMap, numSeries) for ; numSeries > 0; numSeries-- { + seriesFlags, err := r.ReadByte() + if err != nil { + return nil, err + } + headChunkPersisted := seriesFlags&flagHeadChunkPersisted != 0 fp, err := codec.DecodeUint64(r) if err != nil { return nil, err @@ -305,39 +327,42 @@ func (p *diskPersistence) LoadSeriesMapAndHeads() (SeriesMap, error) { } chunkDescs := make(chunkDescs, numChunkDescs) - for i := int64(0); i < numChunkDescs-1; i++ { - firstTime, err := binary.ReadVarint(r) - if err != nil { - return nil, err + for i := int64(0); i < numChunkDescs; i++ { + if headChunkPersisted || i < numChunkDescs-1 { + firstTime, err := binary.ReadVarint(r) + if err != nil { + return nil, err + } + lastTime, err := binary.ReadVarint(r) + if err != nil { + return nil, err + } + chunkDescs[i] = &chunkDesc{ + firstTimeField: clientmodel.Timestamp(firstTime), + lastTimeField: clientmodel.Timestamp(lastTime), + } + } else { + // Non-persisted head chunk. + chunkType, err := r.ReadByte() + if err != nil { + return nil, err + } + chunk := chunkForType(chunkType) + if err := chunk.unmarshal(r); err != nil { + return nil, err + } + chunkDescs[i] = &chunkDesc{ + chunk: chunk, + refCount: 1, + } } - lastTime, err := binary.ReadVarint(r) - if err != nil { - return nil, err - } - chunkDescs[i] = &chunkDesc{ - firstTimeField: clientmodel.Timestamp(firstTime), - lastTimeField: clientmodel.Timestamp(lastTime), - } - } - - // Head chunk. - chunkType, err := r.ReadByte() - if err != nil { - return nil, err - } - chunk := chunkForType(chunkType) - if err := chunk.unmarshal(r); err != nil { - return nil, err - } - chunkDescs[numChunkDescs-1] = &chunkDesc{ - chunk: chunk, - refCount: 1, } fingerprintToSeries[clientmodel.Fingerprint(fp)] = &memorySeries{ - metric: clientmodel.Metric(metric), - chunkDescs: chunkDescs, - chunkDescsLoaded: true, + metric: clientmodel.Metric(metric), + chunkDescs: chunkDescs, + chunkDescsLoaded: seriesFlags&flagChunkDescsLoaded != 0, + headChunkPersisted: headChunkPersisted, } } return fingerprintToSeries, nil @@ -401,97 +426,97 @@ func (p *diskPersistence) DropChunks(fp clientmodel.Fingerprint, beforeTime clie return false, nil } -func (d *diskPersistence) IndexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) error { +func (p *diskPersistence) IndexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) error { // TODO: Don't do it directly, but add it to a queue (which needs to be // drained before shutdown). Queuing would make this asynchronously, and // then batches could be created easily. - if err := d.labelNameToLabelValues.Extend(m); err != nil { + if err := p.labelNameToLabelValues.Extend(m); err != nil { return err } - return d.labelPairToFingerprints.Extend(m, fp) + return p.labelPairToFingerprints.Extend(m, fp) } -func (d *diskPersistence) UnindexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) error { +func (p *diskPersistence) UnindexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) error { // TODO: Don't do it directly, but add it to a queue (which needs to be // drained before shutdown). Queuing would make this asynchronously, and // then batches could be created easily. - labelPairs, err := d.labelPairToFingerprints.Reduce(m, fp) + labelPairs, err := p.labelPairToFingerprints.Reduce(m, fp) if err != nil { return err } - return d.labelNameToLabelValues.Reduce(labelPairs) + return p.labelNameToLabelValues.Reduce(labelPairs) } -func (d *diskPersistence) ArchiveMetric( +func (p *diskPersistence) ArchiveMetric( // TODO: Two step process, make sure this happens atomically. fp clientmodel.Fingerprint, m clientmodel.Metric, first, last clientmodel.Timestamp, ) error { - if err := d.archivedFingerprintToMetrics.Put(codec.CodableFingerprint(fp), codec.CodableMetric(m)); err != nil { + if err := p.archivedFingerprintToMetrics.Put(codec.CodableFingerprint(fp), codec.CodableMetric(m)); err != nil { return err } - if err := d.archivedFingerprintToTimeRange.Put(codec.CodableFingerprint(fp), codec.CodableTimeRange{First: first, Last: last}); err != nil { + if err := p.archivedFingerprintToTimeRange.Put(codec.CodableFingerprint(fp), codec.CodableTimeRange{First: first, Last: last}); err != nil { return err } return nil } -func (d *diskPersistence) HasArchivedMetric(fp clientmodel.Fingerprint) ( +func (p *diskPersistence) HasArchivedMetric(fp clientmodel.Fingerprint) ( hasMetric bool, firstTime, lastTime clientmodel.Timestamp, err error, ) { - firstTime, lastTime, hasMetric, err = d.archivedFingerprintToTimeRange.Lookup(fp) + firstTime, lastTime, hasMetric, err = p.archivedFingerprintToTimeRange.Lookup(fp) return } -func (d *diskPersistence) GetArchivedMetric(fp clientmodel.Fingerprint) (clientmodel.Metric, error) { - metric, _, err := d.archivedFingerprintToMetrics.Lookup(fp) +func (p *diskPersistence) GetArchivedMetric(fp clientmodel.Fingerprint) (clientmodel.Metric, error) { + metric, _, err := p.archivedFingerprintToMetrics.Lookup(fp) return metric, err } -func (d *diskPersistence) DropArchivedMetric(fp clientmodel.Fingerprint) error { +func (p *diskPersistence) DropArchivedMetric(fp clientmodel.Fingerprint) error { // TODO: Multi-step process, make sure this happens atomically. - metric, err := d.GetArchivedMetric(fp) + metric, err := p.GetArchivedMetric(fp) if err != nil || metric == nil { return err } - if err := d.archivedFingerprintToMetrics.Delete(codec.CodableFingerprint(fp)); err != nil { + if err := p.archivedFingerprintToMetrics.Delete(codec.CodableFingerprint(fp)); err != nil { return err } - if err := d.archivedFingerprintToTimeRange.Delete(codec.CodableFingerprint(fp)); err != nil { + if err := p.archivedFingerprintToTimeRange.Delete(codec.CodableFingerprint(fp)); err != nil { return err } - return d.UnindexMetric(metric, fp) + return p.UnindexMetric(metric, fp) } -func (d *diskPersistence) UnarchiveMetric(fp clientmodel.Fingerprint) (bool, error) { +func (p *diskPersistence) UnarchiveMetric(fp clientmodel.Fingerprint) (bool, error) { // TODO: Multi-step process, make sure this happens atomically. - has, err := d.archivedFingerprintToTimeRange.Has(fp) + has, err := p.archivedFingerprintToTimeRange.Has(fp) if err != nil || !has { return false, err } - if err := d.archivedFingerprintToMetrics.Delete(codec.CodableFingerprint(fp)); err != nil { + if err := p.archivedFingerprintToMetrics.Delete(codec.CodableFingerprint(fp)); err != nil { return false, err } - if err := d.archivedFingerprintToTimeRange.Delete(codec.CodableFingerprint(fp)); err != nil { + if err := p.archivedFingerprintToTimeRange.Delete(codec.CodableFingerprint(fp)); err != nil { return false, err } return true, nil } -func (d *diskPersistence) Close() error { +func (p *diskPersistence) Close() error { var lastError error - if err := d.archivedFingerprintToMetrics.Close(); err != nil { + if err := p.archivedFingerprintToMetrics.Close(); err != nil { lastError = err glog.Error("Error closing archivedFingerprintToMetric index DB: ", err) } - if err := d.archivedFingerprintToTimeRange.Close(); err != nil { + if err := p.archivedFingerprintToTimeRange.Close(); err != nil { lastError = err glog.Error("Error closing archivedFingerprintToTimeRange index DB: ", err) } - if err := d.labelPairToFingerprints.Close(); err != nil { + if err := p.labelPairToFingerprints.Close(); err != nil { lastError = err glog.Error("Error closing labelPairToFingerprints index DB: ", err) } - if err := d.labelNameToLabelValues.Close(); err != nil { + if err := p.labelNameToLabelValues.Close(); err != nil { lastError = err glog.Error("Error closing labelNameToLabelValues index DB: ", err) } diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 4ff325b4e2..a82d52be0e 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -1,4 +1,4 @@ -package storage_ng +package local import ( "sort" @@ -287,7 +287,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet if len(lvs) != len(outLvs) { t.Errorf("%d. different number of label values. Got: %d; want %d", i, len(outLvs), len(lvs)) } - for j, _ := range lvs { + for j := range lvs { if lvs[j] != outLvs[j] { t.Errorf("%d.%d. label values don't match. Got: %s; want %s", i, j, outLvs[j], lvs[j]) } @@ -307,7 +307,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet if len(fps) != len(outFps) { t.Errorf("%d. %v: different number of fingerprints. Got: %d; want %d", i, lp, len(outFps), len(fps)) } - for j, _ := range fps { + for j := range fps { if fps[j] != outFps[j] { t.Errorf("%d.%d. %v: fingerprints don't match. Got: %d; want %d", i, j, lp, outFps[j], fps[j]) } diff --git a/storage/local/preload.go b/storage/local/preload.go index 550746e9e5..3f05c6d783 100644 --- a/storage/local/preload.go +++ b/storage/local/preload.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package storage_ng +package local import ( clientmodel "github.com/prometheus/client_golang/model" diff --git a/storage/local/series.go b/storage/local/series.go index 4ee1045b92..cf5c03c298 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -1,4 +1,4 @@ -package storage_ng +package local import ( "sort" @@ -102,25 +102,18 @@ type memorySeries struct { metric clientmodel.Metric // Sorted by start time, overlapping chunk ranges are forbidden. chunkDescs chunkDescs - // Whether chunkDescs for chunks on disk are loaded. Even if false, a head - // chunk could be present. In that case, its chunkDesc will be the - // only one in chunkDescs. + // Whether chunkDescs for chunks on disk are all loaded. If false, some + // (or all) chunkDescs are only on disk. These chunks are all contiguous + // and at the tail end. chunkDescsLoaded bool + // Whether the current head chunk has already been persisted. If true, + // the current head chunk must not be modified anymore. + headChunkPersisted bool } func newMemorySeries(m clientmodel.Metric) *memorySeries { return &memorySeries{ - metric: m, - // TODO: should we set this to nil initially and only create a chunk when - // adding? But right now, we also only call newMemorySeries when adding, so - // it turns out to be the same. - chunkDescs: chunkDescs{ - // TODO: should there be a newChunkDesc() function? - &chunkDesc{ - chunk: newDeltaEncodedChunk(d1, d0, true), - refCount: 1, - }, - }, + metric: m, chunkDescsLoaded: true, } } @@ -129,6 +122,15 @@ func (s *memorySeries) add(v *metric.SamplePair, persistQueue chan *persistReque s.mtx.Lock() defer s.mtx.Unlock() + if len(s.chunkDescs) == 0 || s.headChunkPersisted { + newHead := &chunkDesc{ + chunk: newDeltaEncodedChunk(d1, d0, true), + refCount: 1, + } + s.chunkDescs = append(s.chunkDescs, newHead) + s.headChunkPersisted = false + } + chunks := s.head().add(v) s.head().chunk = chunks[0] @@ -356,6 +358,7 @@ func (s *memorySeries) lastTime() clientmodel.Timestamp { return s.head().lastTime() } +// GetValueAtTime implements SeriesIterator. func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.Values { it.mtx.Lock() defer it.mtx.Unlock() @@ -404,8 +407,52 @@ func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.V } func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Values { - // TODO: implement real GetBoundaryValues here. - return it.GetRangeValues(in) + it.mtx.Lock() + defer it.mtx.Unlock() + + // Find the first relevant chunk. + i := sort.Search(len(it.chunks), func(i int) bool { + return !it.chunks[i].lastTime().Before(in.OldestInclusive) + }) + values := metric.Values{} + for ; i < len(it.chunks); i++ { + c := it.chunks[i] + var chunkIt chunkIterator + if c.firstTime().After(in.NewestInclusive) { + if len(values) == 1 { + // We found the first value already, but are now + // already past the last value. The value we + // want must be the last value of the previous + // chunk. So backtrack... + chunkIt = it.chunks[i-1].newIterator() + values = append(values, chunkIt.getValueAtTime(in.NewestInclusive)[0]) + } + break + } + if len(values) == 0 { + chunkIt = c.newIterator() + firstValues := chunkIt.getValueAtTime(in.OldestInclusive) + switch len(firstValues) { + case 2: + values = append(values, firstValues[1]) + case 1: + values = firstValues + default: + panic("unexpected return from getValueAtTime") + } + } + if c.lastTime().After(in.NewestInclusive) { + if chunkIt == nil { + chunkIt = c.newIterator() + } + values = append(values, chunkIt.getValueAtTime(in.NewestInclusive)[0]) + break + } + } + if len(values) == 2 && values[0].Equal(&values[1]) { + return values[:1] + } + return values } func (it *memorySeriesIterator) GetRangeValues(in metric.Interval) metric.Values { diff --git a/storage/local/storage.go b/storage/local/storage.go index d855e28bc1..587928d870 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -1,6 +1,8 @@ -package storage_ng +// Package local contains the local time series storage used by Prometheus. +package local import ( + "fmt" "sync" "time" @@ -39,14 +41,19 @@ type memorySeriesStorage struct { persistence Persistence } +// MemorySeriesStorageOptions contains options needed by +// NewMemorySeriesStorage. It is not safe to leave any of those at their zero +// values. type MemorySeriesStorageOptions struct { - Persistence Persistence - MemoryEvictionInterval time.Duration - MemoryRetentionPeriod time.Duration - PersistencePurgeInterval time.Duration - PersistenceRetentionPeriod time.Duration + Persistence Persistence // Used to persist storage content across restarts. + MemoryEvictionInterval time.Duration // How often to check for memory eviction. + MemoryRetentionPeriod time.Duration // Chunks at least that old are evicted from memory. + PersistencePurgeInterval time.Duration // How often to check for purging. + PersistenceRetentionPeriod time.Duration // Chunks at least that old are purged. } +// NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still +// has to be called to start the storage. func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { glog.Info("Loading series map and head chunks...") fingerprintToSeries, err := o.Persistence.LoadSeriesMapAndHeads() @@ -122,8 +129,10 @@ func (s *memorySeriesStorage) getOrCreateSeries(m clientmodel.Metric) *memorySer // The series existed before, had been archived at some // point, and has now been unarchived, i.e. it has // chunks on disk. Set chunkDescsLoaded accordingly so - // that they will be looked at later. + // that they will be looked at later. Also, an + // unarchived series comes with a persisted head chunk. series.chunkDescsLoaded = false + series.headChunkPersisted = true } else { // This was a genuinely new series, so index the metric. if err := s.persistence.IndexMetric(m, fp); err != nil { @@ -145,12 +154,27 @@ func (s *memorySeriesStorage) preloadChunksAtTime(fp clientmodel.Fingerprint, ts */ func (s *memorySeriesStorage) preloadChunksForRange(fp clientmodel.Fingerprint, from clientmodel.Timestamp, through clientmodel.Timestamp) (chunkDescs, error) { + stalenessDelta := 300 * time.Second // TODO: Turn into parameter. + s.mtx.RLock() series, ok := s.fingerprintToSeries[fp] s.mtx.RUnlock() if !ok { - panic("requested preload for non-existent series") + has, first, last, err := s.persistence.HasArchivedMetric(fp) + if err != nil { + return nil, err + } + if !has { + return nil, fmt.Errorf("requested preload for non-existent series %v", fp) + } + if from.Add(-stalenessDelta).Before(last) && through.Add(stalenessDelta).After(first) { + metric, err := s.persistence.GetArchivedMetric(fp) + if err != nil { + return nil, err + } + series = s.getOrCreateSeries(metric) + } } return series.preloadChunksForRange(from, through, s.persistence) } @@ -195,7 +219,6 @@ func recordPersist(start time.Time, err error) { } func (s *memorySeriesStorage) handlePersistQueue() { - // TODO: Perhaps move this into Persistence? for req := range s.persistQueue { // TODO: Make this thread-safe? persistQueueLength.Set(float64(len(s.persistQueue))) @@ -266,13 +289,20 @@ func (s *memorySeriesStorage) purgePeriodically(stop <-chan bool) { } s.mtx.RUnlock() + ts := clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.persistenceRetentionPeriod) + + // TODO: Add archived fps: + // - Add iterator interface for KeyValueStore. + // - Iterate over s.persistence.archivedFingerprintToTimeRange. + // - If timeRange extends before ts, add fp to fps. + for _, fp := range fps { select { case <-stop: glog.Info("Interrupted running series purge.") return default: - s.purgeSeries(fp) + s.purgeSeries(fp, ts) } } glog.Info("Done purging old series data.") @@ -283,9 +313,7 @@ func (s *memorySeriesStorage) purgePeriodically(stop <-chan bool) { // purgeSeries purges chunks older than persistenceRetentionPeriod from a // series. If the series contains no chunks after the purge, it is dropped // entirely. -func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint) { - ts := clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.persistenceRetentionPeriod) - +func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) { s.mtx.Lock() // TODO: This is a lock FAR to coarse! However, we cannot lock using the // memorySeries since we might have none (for series that are on disk @@ -303,14 +331,14 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint) { defer s.mtx.Unlock() // First purge persisted chunks. We need to do that anyway. - allDropped, err := s.persistence.DropChunks(fp, ts) + allDropped, err := s.persistence.DropChunks(fp, beforeTime) if err != nil { glog.Error("Error purging persisted chunks: ", err) } // Purge chunks from memory accordingly. if series, ok := s.fingerprintToSeries[fp]; ok { - if series.purgeOlderThan(ts) { + if series.purgeOlderThan(beforeTime) { delete(s.fingerprintToSeries, fp) if err := s.persistence.UnindexMetric(series.metric, fp); err != nil { glog.Errorf("Error unindexing metric %v: %v", series.metric, err) diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 23bee2505f..56c75faaae 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -1,4 +1,4 @@ -package storage_ng +package local import ( "fmt" diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go index f657188502..8d39618ddc 100644 --- a/storage/local/test_helpers.go +++ b/storage/local/test_helpers.go @@ -1,4 +1,4 @@ -package storage_ng +package local import ( "testing" @@ -17,6 +17,9 @@ func (t *testStorageCloser) Close() { t.directory.Close() } +// NewTestStorage creates a storage instance backed by files in a temporary +// directory. The returned storage is already in serving state. Upon closing the +// returned test.Closer, the temporary directory is cleaned up. func NewTestStorage(t testing.TB) (Storage, test.Closer) { directory := test.NewTemporaryDirectory("test_storage", t) persistence, err := NewDiskPersistence(directory.Path(), 1024) diff --git a/templates/templates.go b/templates/templates.go index 1f8c8f04a3..308e1e4de6 100644 --- a/templates/templates.go +++ b/templates/templates.go @@ -57,7 +57,7 @@ func (q queryResultByLabelSorter) Swap(i, j int) { q.results[i], q.results[j] = q.results[j], q.results[i] } -func query(q string, timestamp clientmodel.Timestamp, storage storage_ng.Storage) (queryResult, error) { +func query(q string, timestamp clientmodel.Timestamp, storage local.Storage) (queryResult, error) { exprNode, err := rules.LoadExprFromString(q) if err != nil { return nil, err @@ -91,7 +91,7 @@ type templateExpander struct { funcMap text_template.FuncMap } -func NewTemplateExpander(text string, name string, data interface{}, timestamp clientmodel.Timestamp, storage storage_ng.Storage) *templateExpander { +func NewTemplateExpander(text string, name string, data interface{}, timestamp clientmodel.Timestamp, storage local.Storage) *templateExpander { return &templateExpander{ text: text, name: name, diff --git a/web/api/api.go b/web/api/api.go index 3ebd431d9c..13d24db746 100644 --- a/web/api/api.go +++ b/web/api/api.go @@ -29,7 +29,7 @@ type MetricsService struct { time utility.Time Config *config.Config TargetManager retrieval.TargetManager - Storage storage_ng.Storage + Storage local.Storage } func (msrv *MetricsService) RegisterHandler() { diff --git a/web/consoles.go b/web/consoles.go index e4e8d7f350..d3fe9da9ad 100644 --- a/web/consoles.go +++ b/web/consoles.go @@ -32,7 +32,7 @@ var ( ) type ConsolesHandler struct { - Storage storage_ng.Storage + Storage local.Storage } func (h *ConsolesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {