Major code cleanup.

- Make it go-vet and golint clean.
- Add comments, TODOs, etc.

Change-Id: If1392d96f3d5b4cdde597b10c8dff1769fcfabe2
This commit is contained in:
Bjoern Rabenstein 2014-09-16 15:47:24 +02:00
parent 3592dc2359
commit f5f9f3514a
27 changed files with 514 additions and 200 deletions

View file

@ -71,7 +71,7 @@ type prometheus struct {
ruleManager manager.RuleManager
targetManager retrieval.TargetManager
notifications chan notification.NotificationReqs
storage storage_ng.Storage
storage local.Storage
remoteTSDBQueue *remote.TSDBQueueManager
closeOnce sync.Once
@ -137,20 +137,19 @@ func main() {
glog.Fatalf("Error loading configuration from %s: %v", *configFile, err)
}
persistence, err := storage_ng.NewDiskPersistence(*metricsStoragePath, 1024)
persistence, err := local.NewDiskPersistence(*metricsStoragePath, 1024)
if err != nil {
glog.Fatal("Error opening disk persistence: ", err)
}
defer persistence.Close()
o := &storage_ng.MemorySeriesStorageOptions{
o := &local.MemorySeriesStorageOptions{
Persistence: persistence,
MemoryEvictionInterval: *memoryEvictionInterval,
MemoryRetentionPeriod: *memoryRetentionPeriod,
PersistencePurgeInterval: *storagePurgeInterval,
PersistenceRetentionPeriod: *storageRetentionPeriod,
}
memStorage, err := storage_ng.NewMemorySeriesStorage(o)
memStorage, err := local.NewMemorySeriesStorage(o)
if err != nil {
glog.Fatal("Error opening memory series storage: ", err)
}

View file

@ -118,11 +118,11 @@ func (rule *AlertingRule) Name() string {
return rule.name
}
func (rule *AlertingRule) EvalRaw(timestamp clientmodel.Timestamp, storage storage_ng.Storage) (ast.Vector, error) {
func (rule *AlertingRule) EvalRaw(timestamp clientmodel.Timestamp, storage local.Storage) (ast.Vector, error) {
return ast.EvalVectorInstant(rule.Vector, timestamp, storage, stats.NewTimerGroup())
}
func (rule *AlertingRule) Eval(timestamp clientmodel.Timestamp, storage storage_ng.Storage) (ast.Vector, error) {
func (rule *AlertingRule) Eval(timestamp clientmodel.Timestamp, storage local.Storage) (ast.Vector, error) {
// Get the raw value of the rule expression.
exprResult, err := rule.EvalRaw(timestamp, storage)
if err != nil {

View file

@ -25,11 +25,11 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/stats"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/storage/metric"
)
var defaultStalenessDelta = flag.Duration("defaultStalenessDelta", 300*time.Second, "Default staleness delta allowance in seconds during expression evaluations.")
var stalenessDelta = flag.Duration("stalenessDelta", 300*time.Second, "Staleness delta allowance during expression evaluations.")
// ----------------------------------------------------------------------------
// Raw data value types.
@ -179,7 +179,7 @@ type (
VectorSelector struct {
labelMatchers metric.LabelMatchers
// The series iterators are populated at query analysis time.
iterators map[clientmodel.Fingerprint]storage_ng.SeriesIterator
iterators map[clientmodel.Fingerprint]local.SeriesIterator
metrics map[clientmodel.Fingerprint]clientmodel.Metric
// Fingerprints are populated from label matchers at query analysis time.
// TODO: do we still need these?
@ -220,7 +220,7 @@ type (
MatrixSelector struct {
labelMatchers metric.LabelMatchers
// The series iterators are populated at query analysis time.
iterators map[clientmodel.Fingerprint]storage_ng.SeriesIterator
iterators map[clientmodel.Fingerprint]local.SeriesIterator
metrics map[clientmodel.Fingerprint]clientmodel.Metric
// Fingerprints are populated from label matchers at query analysis time.
// TODO: do we still need these?
@ -366,7 +366,7 @@ func labelsToKey(labels clientmodel.Metric) uint64 {
}
// EvalVectorInstant evaluates a VectorNode with an instant query.
func EvalVectorInstant(node VectorNode, timestamp clientmodel.Timestamp, storage storage_ng.Storage, queryStats *stats.TimerGroup) (Vector, error) {
func EvalVectorInstant(node VectorNode, timestamp clientmodel.Timestamp, storage local.Storage, queryStats *stats.TimerGroup) (Vector, error) {
closer, err := prepareInstantQuery(node, timestamp, storage, queryStats)
if err != nil {
return nil, err
@ -376,7 +376,7 @@ func EvalVectorInstant(node VectorNode, timestamp clientmodel.Timestamp, storage
}
// EvalVectorRange evaluates a VectorNode with a range query.
func EvalVectorRange(node VectorNode, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage storage_ng.Storage, queryStats *stats.TimerGroup) (Matrix, error) {
func EvalVectorRange(node VectorNode, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage local.Storage, queryStats *stats.TimerGroup) (Matrix, error) {
// Explicitly initialize to an empty matrix since a nil Matrix encodes to
// null in JSON.
matrix := Matrix{}
@ -538,7 +538,7 @@ func chooseClosestSample(samples metric.Values, timestamp clientmodel.Timestamp)
// Samples before target time.
if delta < 0 {
// Ignore samples outside of staleness policy window.
if -delta > *defaultStalenessDelta {
if -delta > *stalenessDelta {
continue
}
// Ignore samples that are farther away than what we've seen before.
@ -552,7 +552,7 @@ func chooseClosestSample(samples metric.Values, timestamp clientmodel.Timestamp)
// Samples after target time.
if delta >= 0 {
// Ignore samples outside of staleness policy window.
if delta > *defaultStalenessDelta {
if delta > *stalenessDelta {
continue
}
// Ignore samples that are farther away than samples we've seen before.
@ -858,7 +858,7 @@ func NewScalarLiteral(value clientmodel.SampleValue) *ScalarLiteral {
func NewVectorSelector(m metric.LabelMatchers) *VectorSelector {
return &VectorSelector{
labelMatchers: m,
iterators: map[clientmodel.Fingerprint]storage_ng.SeriesIterator{},
iterators: map[clientmodel.Fingerprint]local.SeriesIterator{},
metrics: map[clientmodel.Fingerprint]clientmodel.Metric{},
}
}
@ -951,7 +951,7 @@ func NewMatrixSelector(vector *VectorSelector, interval time.Duration) *MatrixSe
return &MatrixSelector{
labelMatchers: vector.labelMatchers,
interval: interval,
iterators: map[clientmodel.Fingerprint]storage_ng.SeriesIterator{},
iterators: map[clientmodel.Fingerprint]local.SeriesIterator{},
metrics: map[clientmodel.Fingerprint]clientmodel.Metric{},
}
}

View file

@ -151,7 +151,7 @@ func TypedValueToJSON(data interface{}, typeStr string) string {
}
// EvalToString evaluates the given node into a string of the given format.
func EvalToString(node Node, timestamp clientmodel.Timestamp, format OutputFormat, storage storage_ng.Storage, queryStats *stats.TimerGroup) string {
func EvalToString(node Node, timestamp clientmodel.Timestamp, format OutputFormat, storage local.Storage, queryStats *stats.TimerGroup) string {
prepareTimer := queryStats.GetTimer(stats.TotalQueryPreparationTime).Start()
closer, err := prepareInstantQuery(node, timestamp, storage, queryStats)
prepareTimer.Stop()
@ -203,7 +203,7 @@ func EvalToString(node Node, timestamp clientmodel.Timestamp, format OutputForma
}
// EvalToVector evaluates the given node into a Vector. Matrices aren't supported.
func EvalToVector(node Node, timestamp clientmodel.Timestamp, storage storage_ng.Storage, queryStats *stats.TimerGroup) (Vector, error) {
func EvalToVector(node Node, timestamp clientmodel.Timestamp, storage local.Storage, queryStats *stats.TimerGroup) (Vector, error) {
prepareTimer := queryStats.GetTimer(stats.TotalQueryPreparationTime).Start()
closer, err := prepareInstantQuery(node, timestamp, storage, queryStats)
prepareTimer.Stop()

View file

@ -46,13 +46,13 @@ type QueryAnalyzer struct {
IntervalRanges IntervalRangeMap
// The underlying storage to which the query will be applied. Needed for
// extracting timeseries fingerprint information during query analysis.
storage storage_ng.Storage
storage local.Storage
}
// NewQueryAnalyzer returns a pointer to a newly instantiated
// QueryAnalyzer. The storage is needed to extract timeseries
// fingerprint information during query analysis.
func NewQueryAnalyzer(storage storage_ng.Storage) *QueryAnalyzer {
func NewQueryAnalyzer(storage local.Storage) *QueryAnalyzer {
return &QueryAnalyzer{
FullRanges: FullRangeMap{},
IntervalRanges: IntervalRangeMap{},
@ -93,7 +93,7 @@ func (analyzer *QueryAnalyzer) Visit(node Node) {
}
type iteratorInitializer struct {
storage storage_ng.Storage
storage local.Storage
}
func (i *iteratorInitializer) Visit(node Node) {
@ -109,7 +109,7 @@ func (i *iteratorInitializer) Visit(node Node) {
}
}
func prepareInstantQuery(node Node, timestamp clientmodel.Timestamp, storage storage_ng.Storage, queryStats *stats.TimerGroup) (storage_ng.Closer, error) {
func prepareInstantQuery(node Node, timestamp clientmodel.Timestamp, storage local.Storage, queryStats *stats.TimerGroup) (local.Preloader, error) {
analyzeTimer := queryStats.GetTimer(stats.QueryAnalysisTime).Start()
analyzer := NewQueryAnalyzer(storage)
Walk(analyzer, node)
@ -140,7 +140,7 @@ func prepareInstantQuery(node Node, timestamp clientmodel.Timestamp, storage sto
return p, nil
}
func prepareRangeQuery(node Node, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage storage_ng.Storage, queryStats *stats.TimerGroup) (storage_ng.Closer, error) {
func prepareRangeQuery(node Node, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage local.Storage, queryStats *stats.TimerGroup) (local.Preloader, error) {
analyzeTimer := queryStats.GetTimer(stats.QueryAnalysisTime).Start()
analyzer := NewQueryAnalyzer(storage)
Walk(analyzer, node)

View file

@ -83,7 +83,7 @@ type ruleManager struct {
done chan bool
interval time.Duration
storage storage_ng.Storage
storage local.Storage
results chan<- *extraction.Result
notifications chan<- notification.NotificationReqs
@ -93,7 +93,7 @@ type ruleManager struct {
type RuleManagerOptions struct {
EvaluationInterval time.Duration
Storage storage_ng.Storage
Storage local.Storage
Notifications chan<- notification.NotificationReqs
Results chan<- *extraction.Result

View file

@ -34,11 +34,11 @@ type RecordingRule struct {
func (rule RecordingRule) Name() string { return rule.name }
func (rule RecordingRule) EvalRaw(timestamp clientmodel.Timestamp, storage storage_ng.Storage) (ast.Vector, error) {
func (rule RecordingRule) EvalRaw(timestamp clientmodel.Timestamp, storage local.Storage) (ast.Vector, error) {
return ast.EvalVectorInstant(rule.vector, timestamp, storage, stats.NewTimerGroup())
}
func (rule RecordingRule) Eval(timestamp clientmodel.Timestamp, storage storage_ng.Storage) (ast.Vector, error) {
func (rule RecordingRule) Eval(timestamp clientmodel.Timestamp, storage local.Storage) (ast.Vector, error) {
// Get the raw value of the rule expression.
vector, err := rule.EvalRaw(timestamp, storage)
if err != nil {

View file

@ -29,9 +29,9 @@ type Rule interface {
Name() string
// EvalRaw evaluates the rule's vector expression without triggering any
// other actions, like recording or alerting.
EvalRaw(timestamp clientmodel.Timestamp, storage storage_ng.Storage) (ast.Vector, error)
EvalRaw(timestamp clientmodel.Timestamp, storage local.Storage) (ast.Vector, error)
// Eval evaluates the rule, including any associated recording or alerting actions.
Eval(timestamp clientmodel.Timestamp, storage storage_ng.Storage) (ast.Vector, error)
Eval(timestamp clientmodel.Timestamp, storage local.Storage) (ast.Vector, error)
// ToDotGraph returns a Graphviz dot graph of the rule.
ToDotGraph() string
// String returns a human-readable string representation of the rule.

View file

@ -1,4 +1,4 @@
package storage_ng
package local
import (
"io"
@ -8,9 +8,21 @@ import (
"github.com/prometheus/prometheus/storage/metric"
)
// chunks is just a chunk slice. No methods are defined for this named type.
// TODO: Perhaps we should remove it? It might avoid errors if it's
// syntactically clear that we are dealing with a vanilly slice and not some
// kind of more complex collection.
type chunks []chunk
// chunk is the interface for all chunks. Chunks are generally not
// goroutine-safe.
type chunk interface {
// add adds a SamplePair to the chunks, performs any necessary
// re-encoding, and adds any necessary overflow chunks. It returns the
// new version of the original chunk, followed by overflow chunks, if
// any. The first chunk returned might be the same as the original one
// or a newly allocated version. In any case, take the returned chunk as
// the relevant one and discard the orginal chunk.
add(*metric.SamplePair) chunks
clone() chunk
firstTime() clientmodel.Timestamp
@ -18,15 +30,27 @@ type chunk interface {
newIterator() chunkIterator
marshal(io.Writer) error
unmarshal(io.Reader) error
// TODO: remove?
// values returns a channel, from which all sample values in the chunk
// can be received in order. The channel is closed after the last
// one. It is generally not safe to mutate the chunk while the channel
// is still open.
values() <-chan *metric.SamplePair
}
// A chunkIterator enables efficient access to the content of a chunk. It is
// generally not safe to use a chunkIterator concurrently with or after chunk
// mutation.
type chunkIterator interface {
// Gets the two values that are immediately adjacent to a given time. In
// case a value exist at precisely the given time, only that single
// value is returned. Only the first or last value is returned (as a
// single value), if the given time is before or after the first or last
// value, respectively.
getValueAtTime(clientmodel.Timestamp) metric.Values
getBoundaryValues(metric.Interval) metric.Values
// Gets all values contained within a given interval.
getRangeValues(metric.Interval) metric.Values
// Whether a given timestamp is contained between first and last value
// in the chunk.
contains(clientmodel.Timestamp) bool
}

View file

@ -1,9 +1,40 @@
// Copyright 2014 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package codec provides types that implement encoding.BinaryMarshaler and
// encoding.BinaryUnmarshaler and functions that help to encode and decode
// primitives. The Prometheus storage backend uses them to persist objects to
// files and to save objects in LevelDB.
//
// The encodings used in this package are designed in a way that objects can be
// unmarshaled from a continuous byte stream, i.e. the information when to stop
// reading is determined by the format. No separate termination information is
// needed.
//
// Strings are encoded as the length of their bytes as a varint followed by
// their bytes.
//
// Slices are encoded as their length as a varint followed by their elements.
//
// Maps are encoded as the number of mappings as a varint, followed by the
// mappings, each of which consists of the key followed by the value.
package codec
import (
"bytes"
"encoding"
"encoding/binary"
"fmt"
"io"
"sync"
@ -12,18 +43,26 @@ import (
"github.com/prometheus/prometheus/storage/metric"
)
// codable implements both, encoding.BinaryMarshaler and
// encoding.BinaryUnmarshaler, which is only needed internally and therefore not
// exported for now.
type codable interface {
encoding.BinaryMarshaler
encoding.BinaryUnmarshaler
}
// A byteReader is an io.ByteReader that also implements the vanilla io.Reader
// interface.
type byteReader interface {
io.Reader
io.ByteReader
}
// bufPool is a pool for staging buffers. Using a pool allows concurrency-safe
// reuse of buffers
var bufPool sync.Pool
// getBuf returns a buffer from the pool. The length of the returned slice is l.
func getBuf(l int) []byte {
x := bufPool.Get()
if x == nil {
@ -36,10 +75,14 @@ func getBuf(l int) []byte {
return buf[:l]
}
// putBuf returns a buffer to the pool.
func putBuf(buf []byte) {
bufPool.Put(buf)
}
// EncodeVarint encodes an int64 as a varint and writes it to an io.Writer.
// This is a GC-friendly implementation that takes the required staging buffer
// from a buffer pool.
func EncodeVarint(w io.Writer, i int64) error {
buf := getBuf(binary.MaxVarintLen64)
defer putBuf(buf)
@ -49,6 +92,9 @@ func EncodeVarint(w io.Writer, i int64) error {
return err
}
// EncodeUint64 writes an uint64 to an io.Writer in big-endian byte-order.
// This is a GC-friendly implementation that takes the required staging buffer
// from a buffer pool.
func EncodeUint64(w io.Writer, u uint64) error {
buf := getBuf(8)
defer putBuf(buf)
@ -58,6 +104,9 @@ func EncodeUint64(w io.Writer, u uint64) error {
return err
}
// DecodeUint64 reads an uint64 from an io.Reader in big-endian byte-order.
// This is a GC-friendly implementation that takes the required staging buffer
// from a buffer pool.
func DecodeUint64(r io.Reader) (uint64, error) {
buf := getBuf(8)
defer putBuf(buf)
@ -68,6 +117,8 @@ func DecodeUint64(r io.Reader) (uint64, error) {
return binary.BigEndian.Uint64(buf), nil
}
// encodeString writes the varint encoded length followed by the bytes of s to
// b.
func encodeString(b *bytes.Buffer, s string) error {
if err := EncodeVarint(b, int64(len(s))); err != nil {
return err
@ -78,6 +129,7 @@ func encodeString(b *bytes.Buffer, s string) error {
return nil
}
// decodeString decodes a string encoded by encodeString.
func decodeString(b byteReader) (string, error) {
length, err := binary.ReadVarint(b)
if err != nil {
@ -93,8 +145,11 @@ func decodeString(b byteReader) (string, error) {
return string(buf), nil
}
// A CodableMetric is a clientmodel.Metric that implements
// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler.
type CodableMetric clientmodel.Metric
// MarshalBinary implements encoding.BinaryMarshaler.
func (m CodableMetric) MarshalBinary() ([]byte, error) {
buf := &bytes.Buffer{}
if err := EncodeVarint(buf, int64(len(m))); err != nil {
@ -111,10 +166,15 @@ func (m CodableMetric) MarshalBinary() ([]byte, error) {
return buf.Bytes(), nil
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler. It can be used with the
// zero value of CodableMetric.
func (m *CodableMetric) UnmarshalBinary(buf []byte) error {
return m.UnmarshalFromReader(bytes.NewReader(buf))
}
// UnmarshalFromReader unmarshals a CodableMetric from a reader that implements
// both, io.Reader and io.ByteReader. It can be used with the zero value of
// CodableMetric.
func (m *CodableMetric) UnmarshalFromReader(r byteReader) error {
numLabelPairs, err := binary.ReadVarint(r)
if err != nil {
@ -136,56 +196,59 @@ func (m *CodableMetric) UnmarshalFromReader(r byteReader) error {
return nil
}
// A CodableFingerprint is a clientmodel.Fingerprint that implements
// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler. The implementation
// depends on clientmodel.Fingerprint to be convertible to uint64. It encodes
// the fingerprint as a big-endian uint64.
type CodableFingerprint clientmodel.Fingerprint
// MarshalBinary implements encoding.BinaryMarshaler.
func (fp CodableFingerprint) MarshalBinary() ([]byte, error) {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(fp))
return b, nil
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler.
func (fp *CodableFingerprint) UnmarshalBinary(buf []byte) error {
*fp = CodableFingerprint(binary.BigEndian.Uint64(buf))
return nil
}
// CodableFingerprints is a clientmodel.Fingerprints that implements
// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler.
type CodableFingerprints clientmodel.Fingerprints
// MarshalBinary implements encoding.BinaryMarshaler.
func (fps CodableFingerprints) MarshalBinary() ([]byte, error) {
b := bytes.NewBuffer(make([]byte, 0, binary.MaxVarintLen64+len(fps)*8))
if err := EncodeVarint(b, int64(len(fps))); err != nil {
return nil, err
}
b := make([]byte, binary.MaxVarintLen64+len(fps)*8)
lenBytes := binary.PutVarint(b, int64(len(fps)))
buf := getBuf(8)
defer putBuf(buf)
for _, fp := range fps {
binary.BigEndian.PutUint64(buf, uint64(fp))
if _, err := b.Write(buf[:8]); err != nil {
return nil, err
}
for i, fp := range fps {
binary.BigEndian.PutUint64(b[i*8+lenBytes:], uint64(fp))
}
return b.Bytes(), nil
return b[:len(fps)*8+lenBytes], nil
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler.
func (fps *CodableFingerprints) UnmarshalBinary(buf []byte) error {
r := bytes.NewReader(buf)
numFPs, err := binary.ReadVarint(r)
if err != nil {
return err
numFPs, offset := binary.Varint(buf)
if offset <= 0 {
return fmt.Errorf("could not decode length of CodableFingerprints, varint decoding returned %d", offset)
}
*fps = make(CodableFingerprints, numFPs)
offset := len(buf) - r.Len()
for i, _ := range *fps {
for i := range *fps {
(*fps)[i] = clientmodel.Fingerprint(binary.BigEndian.Uint64(buf[offset+i*8:]))
}
return nil
}
// CodableLabelPair is a metric.LabelPair that implements
// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler.
type CodableLabelPair metric.LabelPair
// MarshalBinary implements encoding.BinaryMarshaler.
func (lp CodableLabelPair) MarshalBinary() ([]byte, error) {
buf := &bytes.Buffer{}
if err := encodeString(buf, string(lp.Name)); err != nil {
@ -197,6 +260,7 @@ func (lp CodableLabelPair) MarshalBinary() ([]byte, error) {
return buf.Bytes(), nil
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler.
func (lp *CodableLabelPair) UnmarshalBinary(buf []byte) error {
r := bytes.NewReader(buf)
n, err := decodeString(r)
@ -212,8 +276,11 @@ func (lp *CodableLabelPair) UnmarshalBinary(buf []byte) error {
return nil
}
// CodableLabelName is a clientmodel.LabelName that implements
// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler.
type CodableLabelName clientmodel.LabelName
// MarshalBinary implements encoding.BinaryMarshaler.
func (l CodableLabelName) MarshalBinary() ([]byte, error) {
buf := &bytes.Buffer{}
if err := encodeString(buf, string(l)); err != nil {
@ -222,6 +289,7 @@ func (l CodableLabelName) MarshalBinary() ([]byte, error) {
return buf.Bytes(), nil
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler.
func (l *CodableLabelName) UnmarshalBinary(buf []byte) error {
r := bytes.NewReader(buf)
n, err := decodeString(r)
@ -232,8 +300,11 @@ func (l *CodableLabelName) UnmarshalBinary(buf []byte) error {
return nil
}
// CodableLabelValues is a clientmodel.LabelValues that implements
// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler.
type CodableLabelValues clientmodel.LabelValues
// MarshalBinary implements encoding.BinaryMarshaler.
func (vs CodableLabelValues) MarshalBinary() ([]byte, error) {
buf := &bytes.Buffer{}
if err := EncodeVarint(buf, int64(len(vs))); err != nil {
@ -247,6 +318,7 @@ func (vs CodableLabelValues) MarshalBinary() ([]byte, error) {
return buf.Bytes(), nil
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler.
func (vs *CodableLabelValues) UnmarshalBinary(buf []byte) error {
r := bytes.NewReader(buf)
numValues, err := binary.ReadVarint(r)
@ -255,7 +327,7 @@ func (vs *CodableLabelValues) UnmarshalBinary(buf []byte) error {
}
*vs = make(CodableLabelValues, numValues)
for i, _ := range *vs {
for i := range *vs {
v, err := decodeString(r)
if err != nil {
return err
@ -265,10 +337,13 @@ func (vs *CodableLabelValues) UnmarshalBinary(buf []byte) error {
return nil
}
// CodableTimeRange is used to define a time range and implements
// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler.
type CodableTimeRange struct {
First, Last clientmodel.Timestamp
}
// MarshalBinary implements encoding.BinaryMarshaler.
func (tr CodableTimeRange) MarshalBinary() ([]byte, error) {
buf := &bytes.Buffer{}
if err := EncodeVarint(buf, int64(tr.First)); err != nil {
@ -280,6 +355,7 @@ func (tr CodableTimeRange) MarshalBinary() ([]byte, error) {
return buf.Bytes(), nil
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler.
func (tr *CodableTimeRange) UnmarshalBinary(buf []byte) error {
r := bytes.NewReader(buf)
first, err := binary.ReadVarint(r)

View file

@ -49,7 +49,7 @@ func TestCodec(t *testing.T) {
if len(fps1) != len(fps2) {
return false
}
for i, _ := range fps1 {
for i := range fps1 {
if fps1[i] != fps2[i] {
return false
}
@ -84,7 +84,7 @@ func TestCodec(t *testing.T) {
if len(lvs1) != len(lvs2) {
return false
}
for i, _ := range lvs1 {
for i := range lvs1 {
if lvs1[i] != lvs2[i] {
return false
}

View file

@ -1,4 +1,4 @@
package storage_ng
package local
import (
"encoding/binary"
@ -12,7 +12,7 @@ import (
"github.com/prometheus/prometheus/storage/metric"
)
type deltaBytes int
type deltaBytes byte
const (
d0 deltaBytes = 0
@ -45,11 +45,12 @@ const (
// delta encoding of various types (int, float) and bit width. However, once 8
// bytes would be needed to encode a delta value, a fall-back to the absolute
// numbers happens (so that timestamps are saved directly as int64 and values as
// float64).
// float64). It implements the chunk interface.
type deltaEncodedChunk struct {
buf []byte
}
// newDeltaEncodedChunk returns a newly allocated deltaEncodedChunk.
func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool) *deltaEncodedChunk {
buf := make([]byte, deltaHeaderIsIntOffset+1, 1024)
@ -71,6 +72,7 @@ func (c *deltaEncodedChunk) newFollowupChunk() chunk {
//return newDeltaEncodedChunk(c.timeBytes(), c.valueBytes(), c.isInt())
}
// clone implements chunk.
func (c *deltaEncodedChunk) clone() chunk {
buf := make([]byte, len(c.buf), 1024)
copy(buf, c.buf)
@ -141,6 +143,7 @@ func (c *deltaEncodedChunk) baseValue() clientmodel.SampleValue {
return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c.buf[deltaHeaderBaseValueOffset:])))
}
// add implements chunk.
func (c *deltaEncodedChunk) add(s *metric.SamplePair) chunks {
if len(c.buf) < deltaHeaderBytes {
c.buf = c.buf[:deltaHeaderBytes]
@ -245,7 +248,7 @@ func (c *deltaEncodedChunk) len() int {
return (len(c.buf) - deltaHeaderBytes) / c.sampleSize()
}
// TODO: remove?
// values implements chunk.
func (c *deltaEncodedChunk) values() <-chan *metric.SamplePair {
n := c.len()
valuesChan := make(chan *metric.SamplePair)
@ -310,14 +313,17 @@ func (c *deltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair {
}
}
// firstTime implements chunk.
func (c *deltaEncodedChunk) firstTime() clientmodel.Timestamp {
return c.valueAtIndex(0).Timestamp
}
// lastTime implements chunk.
func (c *deltaEncodedChunk) lastTime() clientmodel.Timestamp {
return c.valueAtIndex(c.len() - 1).Timestamp
}
// marshal implements chunk.
func (c *deltaEncodedChunk) marshal(w io.Writer) error {
if len(c.buf) > math.MaxUint16 {
panic("chunk buffer length would overflow a 16 bit uint.")
@ -334,6 +340,7 @@ func (c *deltaEncodedChunk) marshal(w io.Writer) error {
return nil
}
// unmarshal implements chunk.
func (c *deltaEncodedChunk) unmarshal(r io.Reader) error {
c.buf = c.buf[:cap(c.buf)]
readBytes := 0
@ -348,17 +355,20 @@ func (c *deltaEncodedChunk) unmarshal(r io.Reader) error {
return nil
}
// deltaEncodedChunkIterator implements chunkIterator.
type deltaEncodedChunkIterator struct {
chunk *deltaEncodedChunk
// TODO: add more fields here to keep track of last position.
}
// newIterator implements chunk.
func (c *deltaEncodedChunk) newIterator() chunkIterator {
return &deltaEncodedChunkIterator{
chunk: c,
}
}
// getValueAtTime implements chunkIterator.
func (it *deltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestamp) metric.Values {
i := sort.Search(it.chunk.len(), func(i int) bool {
return !it.chunk.valueAtIndex(i).Timestamp.Before(t)
@ -378,10 +388,7 @@ func (it *deltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestamp) met
}
}
func (it *deltaEncodedChunkIterator) getBoundaryValues(in metric.Interval) metric.Values {
return nil
}
// getRangeValues implements chunkIterator.
func (it *deltaEncodedChunkIterator) getRangeValues(in metric.Interval) metric.Values {
oldest := sort.Search(it.chunk.len(), func(i int) bool {
return !it.chunk.valueAtIndex(i).Timestamp.Before(in.OldestInclusive)
@ -402,6 +409,7 @@ func (it *deltaEncodedChunkIterator) getRangeValues(in metric.Interval) metric.V
return result
}
// contains implements chunkIterator.
func (it *deltaEncodedChunkIterator) contains(t clientmodel.Timestamp) bool {
return !t.Before(it.chunk.firstTime()) && !t.After(it.chunk.lastTime())
}

View file

@ -1,3 +1,6 @@
// Package index provides a number of indexes backed by persistent key-value
// stores. The only supported implementation of a key-value store is currently
// goleveldb, but other implementations can easily be added.
package index
import (
@ -34,6 +37,10 @@ type FingerprintMetricIndex struct {
}
// IndexBatch indexes a batch of mappings from fingerprints to metrics.
//
// This method is goroutine-safe, but note that no specific order of execution
// can be guaranteed (especially critical if IndexBatch and UnindexBatch are
// called concurrently).
func (i *FingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) error {
b := i.NewBatch()
@ -45,24 +52,31 @@ func (i *FingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) er
}
// UnindexBatch unindexes a batch of mappings from fingerprints to metrics.
//
// This method is goroutine-safe, but note that no specific order of execution
// can be guaranteed (especially critical if IndexBatch and UnindexBatch are
// called concurrently).
func (i *FingerprintMetricIndex) UnindexBatch(mapping FingerprintMetricMapping) error {
b := i.NewBatch()
for fp, _ := range mapping {
for fp := range mapping {
b.Delete(codec.CodableFingerprint(fp))
}
return i.Commit(b)
}
// Lookup looks up a metric by fingerprint.
// Lookup looks up a metric by fingerprint. Looking up a non-existing
// fingerprint is not an error. In that case, (nil, false, nil) is returned.
//
// This method is goroutine-safe.
func (i *FingerprintMetricIndex) Lookup(fp clientmodel.Fingerprint) (metric clientmodel.Metric, ok bool, err error) {
ok, err = i.Get(codec.CodableFingerprint(fp), (*codec.CodableMetric)(&metric))
return
}
// NewFingerprintMetricIndex returns a FingerprintMetricIndex
// object ready to use.
// NewFingerprintMetricIndex returns a LevelDB-backed FingerprintMetricIndex
// ready to use.
func NewFingerprintMetricIndex(basePath string) (*FingerprintMetricIndex, error) {
fingerprintToMetricDB, err := NewLevelDB(LevelDBOptions{
Path: path.Join(basePath, fingerprintToMetricDir),
@ -80,33 +94,53 @@ func NewFingerprintMetricIndex(basePath string) (*FingerprintMetricIndex, error)
// label values.
type LabelNameLabelValuesMapping map[clientmodel.LabelName]clientmodel.LabelValues
// LabelNameLabelValuesIndex models a database mapping label names to
// label values.
// LabelNameLabelValuesIndex is a KeyValueStore that maps existing label names
// to all label values stored for that label name.
type LabelNameLabelValuesIndex struct {
KeyValueStore
}
// IndexBatch implements LabelNameLabelValuesIndex.
// IndexBatch adds a batch of label name to label values mappings to the
// index. A mapping of a label name to an empty slice of label values results in
// a deletion of that mapping from the index.
//
// While this method is fundamentally goroutine-safe, note that the order of
// execution for multiple batches executed concurrently is undefined. Also, it
// is in general not safe to mutate the index while Extend or Reduce are
// running.
func (i *LabelNameLabelValuesIndex) IndexBatch(b LabelNameLabelValuesMapping) error {
batch := i.NewBatch()
for name, values := range b {
if len(values) == 0 {
batch.Delete(codec.CodableLabelName(name))
if err := batch.Delete(codec.CodableLabelName(name)); err != nil {
return err
}
} else {
batch.Put(codec.CodableLabelName(name), codec.CodableLabelValues(values))
if err := batch.Put(codec.CodableLabelName(name), codec.CodableLabelValues(values)); err != nil {
return err
}
}
}
return i.Commit(batch)
}
// Lookup looks up all label values for a given label name.
// Lookup looks up all label values for a given label name. Looking up a
// non-existing label name is not an error. In that case, (nil, false, nil) is
// returned.
//
// This method is goroutine-safe.
func (i *LabelNameLabelValuesIndex) Lookup(l clientmodel.LabelName) (values clientmodel.LabelValues, ok bool, err error) {
ok, err = i.Get(codec.CodableLabelName(l), (*codec.CodableLabelValues)(&values))
return
}
// Extend incorporates the given metric into the index, i.e. it creates new
// label name to label values mappings for new label names, and it extends the
// label values list mapped from already existing label names appropriately.
//
// This method is not goroutine-safe.
func (i *LabelNameLabelValuesIndex) Extend(m clientmodel.Metric) error {
b := make(LabelNameLabelValuesMapping, len(m))
for ln, lv := range m {
@ -131,6 +165,16 @@ func (i *LabelNameLabelValuesIndex) Extend(m clientmodel.Metric) error {
return i.IndexBatch(b)
}
// Reduce removes label values from the index based on the given label pair to
// fingerprints mapping. The mapping to be passed in here is returned by
// LabelPairFingerprintIndex.Reduce. It contains all the label pairs that have
// now fewer fingerprints mapped to it. This method checks if any label pair has
// arrived at zero mapped fingerprints. In that case, the value of that label
// pair is removed from the list of label values mapped to the name of that
// label pair. Label names that are then mapped to zero label values are removed
// entirely from the index.
//
// This method is not goroutine-safe.
func (i *LabelNameLabelValuesIndex) Reduce(m LabelPairFingerprintsMapping) error {
b := make(LabelNameLabelValuesMapping, len(m))
for lp, fps := range m {
@ -164,8 +208,8 @@ func (i *LabelNameLabelValuesIndex) Reduce(m LabelPairFingerprintsMapping) error
return i.IndexBatch(b)
}
// NewLabelNameLabelValuesIndex returns a LabelNameLabelValuesIndex
// ready to use.
// NewLabelNameLabelValuesIndex returns a LevelDB-backed
// LabelNameLabelValuesIndex ready to use.
func NewLabelNameLabelValuesIndex(basePath string) (*LabelNameLabelValuesIndex, error) {
labelNameToLabelValuesDB, err := NewLevelDB(LevelDBOptions{
Path: path.Join(basePath, labelNameToLabelValuesDir),
@ -183,13 +227,20 @@ func NewLabelNameLabelValuesIndex(basePath string) (*LabelNameLabelValuesIndex,
// fingerprints.
type LabelPairFingerprintsMapping map[metric.LabelPair]clientmodel.Fingerprints
// LabelPairFingerprintIndex models a database mapping label pairs to
// fingerprints.
// LabelPairFingerprintIndex is a KeyValueStore that maps existing label pairs
// to the fingerprints of all metrics containing those label pairs.
type LabelPairFingerprintIndex struct {
KeyValueStore
}
// IndexBatch indexes a batch of mappings from label pairs to fingerprints.
// IndexBatch indexes a batch of mappings from label pairs to fingerprints. A
// mapping to an empty slice of fingerprints results in deletion of that mapping
// from the index.
//
// While this method is fundamentally goroutine-safe, note that the order of
// execution for multiple batches executed concurrently is undefined. Also, it
// is in general not safe to mutate the index while Extend or Reduce are
// running.
func (i *LabelPairFingerprintIndex) IndexBatch(m LabelPairFingerprintsMapping) error {
batch := i.NewBatch()
@ -204,12 +255,21 @@ func (i *LabelPairFingerprintIndex) IndexBatch(m LabelPairFingerprintsMapping) e
return i.Commit(batch)
}
// Lookup looks up all fingerprints for a given label pair.
// Lookup looks up all fingerprints for a given label pair. Looking up a
// non-existing label pair is not an error. In that case, (nil, false, nil) is
// returned.
//
// This method is goroutine-safe.
func (i *LabelPairFingerprintIndex) Lookup(p metric.LabelPair) (fps clientmodel.Fingerprints, ok bool, err error) {
ok, err = i.Get((codec.CodableLabelPair)(p), (*codec.CodableFingerprints)(&fps))
return
}
// Extend incorporates the given metric into the index, i.e. it creates new
// label pair to fingerprint mappings for new label pairs, and it extends the
// fingerprint list mapped from already existing label pairs appropriately.
//
// This method is not goroutine-safe.
func (i *LabelPairFingerprintIndex) Extend(m clientmodel.Metric, fp clientmodel.Fingerprint) error {
b := make(LabelPairFingerprintsMapping, len(m))
for ln, lv := range m {
@ -236,6 +296,11 @@ func (i *LabelPairFingerprintIndex) Extend(m clientmodel.Metric, fp clientmodel.
return i.IndexBatch(b)
}
// Reduce removes the given fingerprint from the fingerprint lists mapped from
// the label pairs contained in the given metric. All the updated mappings are
// returned (for consumption by LabelNameLabelValuesIndex.Reduce).
//
// This method is not goroutine-safe.
func (i *LabelPairFingerprintIndex) Reduce(m clientmodel.Metric, fp clientmodel.Fingerprint) (LabelPairFingerprintsMapping, error) {
b := make(LabelPairFingerprintsMapping, len(m))
for ln, lv := range m {
@ -262,8 +327,8 @@ func (i *LabelPairFingerprintIndex) Reduce(m clientmodel.Metric, fp clientmodel.
return b, i.IndexBatch(b)
}
// NewLabelPairFingerprintIndex returns a LabelPairFingerprintIndex
// object ready to use.
// NewLabelPairFingerprintIndex returns a LevelDB-backed
// LabelPairFingerprintIndex ready to use.
func NewLabelPairFingerprintIndex(basePath string) (*LabelPairFingerprintIndex, error) {
labelPairToFingerprintsDB, err := NewLevelDB(LevelDBOptions{
Path: path.Join(basePath, labelPairToFingerprintsDir),
@ -283,7 +348,11 @@ type FingerprintTimeRangeIndex struct {
KeyValueStore
}
// Lookup returns the time range for the given fingerprint.
// Lookup returns the time range for the given fingerprint. Looking up a
// non-existing fingerprint is not an error. In that case, (0, 0, false, nil) is
// returned.
//
// This method is goroutine-safe.
func (i *FingerprintTimeRangeIndex) Lookup(fp clientmodel.Fingerprint) (firstTime, lastTime clientmodel.Timestamp, ok bool, err error) {
var tr codec.CodableTimeRange
ok, err = i.Get(codec.CodableFingerprint(fp), &tr)
@ -291,12 +360,14 @@ func (i *FingerprintTimeRangeIndex) Lookup(fp clientmodel.Fingerprint) (firstTim
}
// Has returns true if the given fingerprint is present.
//
// This method is goroutine-safe.
func (i *FingerprintTimeRangeIndex) Has(fp clientmodel.Fingerprint) (ok bool, err error) {
return i.KeyValueStore.Has(codec.CodableFingerprint(fp))
}
// NewFingerprintTimeRangeIndex returns a FingerprintTimeRangeIndex object
// ready to use.
// NewFingerprintTimeRangeIndex returns a LevelDB-backed
// FingerprintTimeRangeIndex ready to use.
func NewFingerprintTimeRangeIndex(basePath string) (*FingerprintTimeRangeIndex, error) {
fingerprintTimeRangeDB, err := NewLevelDB(LevelDBOptions{
Path: path.Join(basePath, fingerprintTimeRangeDir),

View file

@ -2,12 +2,22 @@ package index
import "encoding"
// KeyValueStore persists key/value pairs.
// KeyValueStore persists key/value pairs. Implementations must be fundamentally
// goroutine-safe. However, it is the caller's responsibility that keys and
// values can be safely marshaled and unmarshaled (via the MarshalBinary and
// UnmarshalBinary methods of the keys and values). For example, if you call the
// Put method of a KeyValueStore implementation, but the key or the value are
// modified concurrently while being marshaled into its binary representation,
// you obviously have a problem. Methods of KeyValueStore only return after
// (un)marshaling is complete.
type KeyValueStore interface {
Put(key, value encoding.BinaryMarshaler) error
Get(k encoding.BinaryMarshaler, v encoding.BinaryUnmarshaler) (bool, error)
Has(k encoding.BinaryMarshaler) (has bool, err error)
Delete(k encoding.BinaryMarshaler) error
// Get unmarshals the result into value. It returns false if no entry
// could be found for key. If value is nil, Get behaves like Has.
Get(key encoding.BinaryMarshaler, value encoding.BinaryUnmarshaler) (bool, error)
Has(key encoding.BinaryMarshaler) (bool, error)
// Delete returns an error if key does not exist.
Delete(key encoding.BinaryMarshaler) error
NewBatch() Batch
Commit(b Batch) error
@ -15,7 +25,11 @@ type KeyValueStore interface {
Close() error
}
// Batch allows KeyValueStore mutations to be pooled and committed together.
// Batch allows KeyValueStore mutations to be pooled and committed together. An
// implementation does not have to be goroutine-safe. Never modify a Batch
// concurrently or commit the same batch multiple times concurrently. Marshaling
// of keys and values is guaranteed to be complete when the Put or Delete methods
// have returned.
type Batch interface {
Put(key, value encoding.BinaryMarshaler) error
Delete(key encoding.BinaryMarshaler) error

View file

@ -16,11 +16,14 @@ type LevelDB struct {
writeOpts *opt.WriteOptions
}
// LevelDBOptions provides options for a LevelDB.
type LevelDBOptions struct {
Path string
Path string // Base path to store files.
CacheSizeBytes int
}
// NewLevelDB returns a newly allocated LevelDB-backed KeyValueStore ready to
// use.
func NewLevelDB(o LevelDBOptions) (KeyValueStore, error) {
options := &opt.Options{
Compression: opt.SnappyCompression,
@ -40,16 +43,19 @@ func NewLevelDB(o LevelDBOptions) (KeyValueStore, error) {
}, nil
}
// NewBatch implements KeyValueStore.
func (l *LevelDB) NewBatch() Batch {
return &LevelDBBatch{
batch: &leveldb.Batch{},
}
}
// Close implements KeyValueStore.
func (l *LevelDB) Close() error {
return l.storage.Close()
}
// Get implements KeyValueStore.
func (l *LevelDB) Get(key encoding.BinaryMarshaler, value encoding.BinaryUnmarshaler) (bool, error) {
k, err := key.MarshalBinary()
if err != nil {
@ -68,10 +74,12 @@ func (l *LevelDB) Get(key encoding.BinaryMarshaler, value encoding.BinaryUnmarsh
return true, value.UnmarshalBinary(raw)
}
// Has implements KeyValueStore.
func (l *LevelDB) Has(key encoding.BinaryMarshaler) (has bool, err error) {
return l.Get(key, nil)
}
// Delete implements KeyValueStore.
func (l *LevelDB) Delete(key encoding.BinaryMarshaler) error {
k, err := key.MarshalBinary()
if err != nil {
@ -80,6 +88,7 @@ func (l *LevelDB) Delete(key encoding.BinaryMarshaler) error {
return l.storage.Delete(k, l.writeOpts)
}
// Put implements KeyValueStore.
func (l *LevelDB) Put(key, value encoding.BinaryMarshaler) error {
k, err := key.MarshalBinary()
if err != nil {
@ -92,6 +101,7 @@ func (l *LevelDB) Put(key, value encoding.BinaryMarshaler) error {
return l.storage.Put(k, v, l.writeOpts)
}
// Commit implements KeyValueStore.
func (l *LevelDB) Commit(b Batch) error {
return l.storage.Write(b.(*LevelDBBatch).batch, l.writeOpts)
}
@ -101,6 +111,7 @@ type LevelDBBatch struct {
batch *leveldb.Batch
}
// Put implements Batch.
func (b *LevelDBBatch) Put(key, value encoding.BinaryMarshaler) error {
k, err := key.MarshalBinary()
if err != nil {
@ -114,6 +125,7 @@ func (b *LevelDBBatch) Put(key, value encoding.BinaryMarshaler) error {
return nil
}
// Delete implements Batch.
func (b *LevelDBBatch) Delete(key encoding.BinaryMarshaler) error {
k, err := key.MarshalBinary()
if err != nil {
@ -123,6 +135,7 @@ func (b *LevelDBBatch) Delete(key encoding.BinaryMarshaler) error {
return nil
}
// Reset implements Batch.
func (b *LevelDBBatch) Reset() {
b.batch.Reset()
}

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage_ng
package local
import (
"github.com/prometheus/client_golang/prometheus"

View file

@ -1,4 +1,4 @@
package storage_ng
package local
import (
clientmodel "github.com/prometheus/client_golang/model"
@ -8,6 +8,7 @@ import (
// SeriesMap maps fingerprints to memory series.
type SeriesMap map[clientmodel.Fingerprint]*memorySeries
// Storage ingests and manages samples, along with various indexes.
type Storage interface {
// AppendSamples stores a group of new samples. Multiple samples for the same
// fingerprint need to be submitted in chronological order, from oldest to
@ -33,18 +34,23 @@ type Storage interface {
Close() error
}
// SeriesIterator enables efficient access of sample values in a series
type SeriesIterator interface {
// Get the two values that are immediately adjacent to a given time.
// Gets the two values that are immediately adjacent to a given time. In
// case a value exist at precisely the given time, only that single
// value is returned. Only the first or last value is returned (as a
// single value), if the given time is before or after the first or last
// value, respectively.
GetValueAtTime(clientmodel.Timestamp) metric.Values
// Get the boundary values of an interval: the first value older than
// the interval start, and the first value younger than the interval
// end.
// Gets the boundary values of an interval: the first and last value
// within a given interval.
GetBoundaryValues(metric.Interval) metric.Values
// Get all values contained within a provided interval.
// Gets all values contained within a given interval.
GetRangeValues(metric.Interval) metric.Values
}
// A Persistence stores samples persistently across restarts.
// A Persistence is used by a Storage implementation to store samples
// persistently across restarts.
type Persistence interface {
// PersistChunk persists a single chunk of a series.
PersistChunk(clientmodel.Fingerprint, chunk) error

View file

@ -1,4 +1,4 @@
package storage_ng
package local
import (
"bufio"
@ -33,6 +33,12 @@ const (
chunkHeaderLastTimeOffset = 9
)
const (
_ = iota
flagChunkDescsLoaded byte = 1 << iota
flagHeadChunkPersisted
)
type diskPersistence struct {
basePath string
chunkLen int
@ -43,6 +49,7 @@ type diskPersistence struct {
labelNameToLabelValues *index.LabelNameLabelValuesIndex
}
// NewDiskPersistence returns a newly allocated Persistence backed by local disk storage, ready to use.
func NewDiskPersistence(basePath string, chunkLen int) (Persistence, error) {
if err := os.MkdirAll(basePath, 0700); err != nil {
return nil, err
@ -226,6 +233,16 @@ func (p *diskPersistence) PersistSeriesMapAndHeads(fingerprintToSeries SeriesMap
}
for fp, series := range fingerprintToSeries {
var seriesFlags byte
if series.chunkDescsLoaded {
seriesFlags |= flagChunkDescsLoaded
}
if series.headChunkPersisted {
seriesFlags |= flagHeadChunkPersisted
}
if err := w.WriteByte(seriesFlags); err != nil {
return err
}
if err := codec.EncodeUint64(w, uint64(fp)); err != nil {
return err
}
@ -238,7 +255,7 @@ func (p *diskPersistence) PersistSeriesMapAndHeads(fingerprintToSeries SeriesMap
return err
}
for i, chunkDesc := range series.chunkDescs {
if i < len(series.chunkDescs)-1 {
if series.headChunkPersisted || i < len(series.chunkDescs)-1 {
if err := codec.EncodeVarint(w, int64(chunkDesc.firstTime())); err != nil {
return err
}
@ -246,7 +263,7 @@ func (p *diskPersistence) PersistSeriesMapAndHeads(fingerprintToSeries SeriesMap
return err
}
} else {
// This is the head chunk. Fully marshal it.
// This is the non-persisted head chunk. Fully marshal it.
if err := w.WriteByte(chunkType(chunkDesc.chunk)); err != nil {
return err
}
@ -291,6 +308,11 @@ func (p *diskPersistence) LoadSeriesMapAndHeads() (SeriesMap, error) {
fingerprintToSeries := make(SeriesMap, numSeries)
for ; numSeries > 0; numSeries-- {
seriesFlags, err := r.ReadByte()
if err != nil {
return nil, err
}
headChunkPersisted := seriesFlags&flagHeadChunkPersisted != 0
fp, err := codec.DecodeUint64(r)
if err != nil {
return nil, err
@ -305,39 +327,42 @@ func (p *diskPersistence) LoadSeriesMapAndHeads() (SeriesMap, error) {
}
chunkDescs := make(chunkDescs, numChunkDescs)
for i := int64(0); i < numChunkDescs-1; i++ {
firstTime, err := binary.ReadVarint(r)
if err != nil {
return nil, err
for i := int64(0); i < numChunkDescs; i++ {
if headChunkPersisted || i < numChunkDescs-1 {
firstTime, err := binary.ReadVarint(r)
if err != nil {
return nil, err
}
lastTime, err := binary.ReadVarint(r)
if err != nil {
return nil, err
}
chunkDescs[i] = &chunkDesc{
firstTimeField: clientmodel.Timestamp(firstTime),
lastTimeField: clientmodel.Timestamp(lastTime),
}
} else {
// Non-persisted head chunk.
chunkType, err := r.ReadByte()
if err != nil {
return nil, err
}
chunk := chunkForType(chunkType)
if err := chunk.unmarshal(r); err != nil {
return nil, err
}
chunkDescs[i] = &chunkDesc{
chunk: chunk,
refCount: 1,
}
}
lastTime, err := binary.ReadVarint(r)
if err != nil {
return nil, err
}
chunkDescs[i] = &chunkDesc{
firstTimeField: clientmodel.Timestamp(firstTime),
lastTimeField: clientmodel.Timestamp(lastTime),
}
}
// Head chunk.
chunkType, err := r.ReadByte()
if err != nil {
return nil, err
}
chunk := chunkForType(chunkType)
if err := chunk.unmarshal(r); err != nil {
return nil, err
}
chunkDescs[numChunkDescs-1] = &chunkDesc{
chunk: chunk,
refCount: 1,
}
fingerprintToSeries[clientmodel.Fingerprint(fp)] = &memorySeries{
metric: clientmodel.Metric(metric),
chunkDescs: chunkDescs,
chunkDescsLoaded: true,
metric: clientmodel.Metric(metric),
chunkDescs: chunkDescs,
chunkDescsLoaded: seriesFlags&flagChunkDescsLoaded != 0,
headChunkPersisted: headChunkPersisted,
}
}
return fingerprintToSeries, nil
@ -401,97 +426,97 @@ func (p *diskPersistence) DropChunks(fp clientmodel.Fingerprint, beforeTime clie
return false, nil
}
func (d *diskPersistence) IndexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) error {
func (p *diskPersistence) IndexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) error {
// TODO: Don't do it directly, but add it to a queue (which needs to be
// drained before shutdown). Queuing would make this asynchronously, and
// then batches could be created easily.
if err := d.labelNameToLabelValues.Extend(m); err != nil {
if err := p.labelNameToLabelValues.Extend(m); err != nil {
return err
}
return d.labelPairToFingerprints.Extend(m, fp)
return p.labelPairToFingerprints.Extend(m, fp)
}
func (d *diskPersistence) UnindexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) error {
func (p *diskPersistence) UnindexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) error {
// TODO: Don't do it directly, but add it to a queue (which needs to be
// drained before shutdown). Queuing would make this asynchronously, and
// then batches could be created easily.
labelPairs, err := d.labelPairToFingerprints.Reduce(m, fp)
labelPairs, err := p.labelPairToFingerprints.Reduce(m, fp)
if err != nil {
return err
}
return d.labelNameToLabelValues.Reduce(labelPairs)
return p.labelNameToLabelValues.Reduce(labelPairs)
}
func (d *diskPersistence) ArchiveMetric(
func (p *diskPersistence) ArchiveMetric(
// TODO: Two step process, make sure this happens atomically.
fp clientmodel.Fingerprint, m clientmodel.Metric, first, last clientmodel.Timestamp,
) error {
if err := d.archivedFingerprintToMetrics.Put(codec.CodableFingerprint(fp), codec.CodableMetric(m)); err != nil {
if err := p.archivedFingerprintToMetrics.Put(codec.CodableFingerprint(fp), codec.CodableMetric(m)); err != nil {
return err
}
if err := d.archivedFingerprintToTimeRange.Put(codec.CodableFingerprint(fp), codec.CodableTimeRange{First: first, Last: last}); err != nil {
if err := p.archivedFingerprintToTimeRange.Put(codec.CodableFingerprint(fp), codec.CodableTimeRange{First: first, Last: last}); err != nil {
return err
}
return nil
}
func (d *diskPersistence) HasArchivedMetric(fp clientmodel.Fingerprint) (
func (p *diskPersistence) HasArchivedMetric(fp clientmodel.Fingerprint) (
hasMetric bool, firstTime, lastTime clientmodel.Timestamp, err error,
) {
firstTime, lastTime, hasMetric, err = d.archivedFingerprintToTimeRange.Lookup(fp)
firstTime, lastTime, hasMetric, err = p.archivedFingerprintToTimeRange.Lookup(fp)
return
}
func (d *diskPersistence) GetArchivedMetric(fp clientmodel.Fingerprint) (clientmodel.Metric, error) {
metric, _, err := d.archivedFingerprintToMetrics.Lookup(fp)
func (p *diskPersistence) GetArchivedMetric(fp clientmodel.Fingerprint) (clientmodel.Metric, error) {
metric, _, err := p.archivedFingerprintToMetrics.Lookup(fp)
return metric, err
}
func (d *diskPersistence) DropArchivedMetric(fp clientmodel.Fingerprint) error {
func (p *diskPersistence) DropArchivedMetric(fp clientmodel.Fingerprint) error {
// TODO: Multi-step process, make sure this happens atomically.
metric, err := d.GetArchivedMetric(fp)
metric, err := p.GetArchivedMetric(fp)
if err != nil || metric == nil {
return err
}
if err := d.archivedFingerprintToMetrics.Delete(codec.CodableFingerprint(fp)); err != nil {
if err := p.archivedFingerprintToMetrics.Delete(codec.CodableFingerprint(fp)); err != nil {
return err
}
if err := d.archivedFingerprintToTimeRange.Delete(codec.CodableFingerprint(fp)); err != nil {
if err := p.archivedFingerprintToTimeRange.Delete(codec.CodableFingerprint(fp)); err != nil {
return err
}
return d.UnindexMetric(metric, fp)
return p.UnindexMetric(metric, fp)
}
func (d *diskPersistence) UnarchiveMetric(fp clientmodel.Fingerprint) (bool, error) {
func (p *diskPersistence) UnarchiveMetric(fp clientmodel.Fingerprint) (bool, error) {
// TODO: Multi-step process, make sure this happens atomically.
has, err := d.archivedFingerprintToTimeRange.Has(fp)
has, err := p.archivedFingerprintToTimeRange.Has(fp)
if err != nil || !has {
return false, err
}
if err := d.archivedFingerprintToMetrics.Delete(codec.CodableFingerprint(fp)); err != nil {
if err := p.archivedFingerprintToMetrics.Delete(codec.CodableFingerprint(fp)); err != nil {
return false, err
}
if err := d.archivedFingerprintToTimeRange.Delete(codec.CodableFingerprint(fp)); err != nil {
if err := p.archivedFingerprintToTimeRange.Delete(codec.CodableFingerprint(fp)); err != nil {
return false, err
}
return true, nil
}
func (d *diskPersistence) Close() error {
func (p *diskPersistence) Close() error {
var lastError error
if err := d.archivedFingerprintToMetrics.Close(); err != nil {
if err := p.archivedFingerprintToMetrics.Close(); err != nil {
lastError = err
glog.Error("Error closing archivedFingerprintToMetric index DB: ", err)
}
if err := d.archivedFingerprintToTimeRange.Close(); err != nil {
if err := p.archivedFingerprintToTimeRange.Close(); err != nil {
lastError = err
glog.Error("Error closing archivedFingerprintToTimeRange index DB: ", err)
}
if err := d.labelPairToFingerprints.Close(); err != nil {
if err := p.labelPairToFingerprints.Close(); err != nil {
lastError = err
glog.Error("Error closing labelPairToFingerprints index DB: ", err)
}
if err := d.labelNameToLabelValues.Close(); err != nil {
if err := p.labelNameToLabelValues.Close(); err != nil {
lastError = err
glog.Error("Error closing labelNameToLabelValues index DB: ", err)
}

View file

@ -1,4 +1,4 @@
package storage_ng
package local
import (
"sort"
@ -287,7 +287,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet
if len(lvs) != len(outLvs) {
t.Errorf("%d. different number of label values. Got: %d; want %d", i, len(outLvs), len(lvs))
}
for j, _ := range lvs {
for j := range lvs {
if lvs[j] != outLvs[j] {
t.Errorf("%d.%d. label values don't match. Got: %s; want %s", i, j, outLvs[j], lvs[j])
}
@ -307,7 +307,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet
if len(fps) != len(outFps) {
t.Errorf("%d. %v: different number of fingerprints. Got: %d; want %d", i, lp, len(outFps), len(fps))
}
for j, _ := range fps {
for j := range fps {
if fps[j] != outFps[j] {
t.Errorf("%d.%d. %v: fingerprints don't match. Got: %d; want %d", i, j, lp, outFps[j], fps[j])
}

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage_ng
package local
import (
clientmodel "github.com/prometheus/client_golang/model"

View file

@ -1,4 +1,4 @@
package storage_ng
package local
import (
"sort"
@ -102,25 +102,18 @@ type memorySeries struct {
metric clientmodel.Metric
// Sorted by start time, overlapping chunk ranges are forbidden.
chunkDescs chunkDescs
// Whether chunkDescs for chunks on disk are loaded. Even if false, a head
// chunk could be present. In that case, its chunkDesc will be the
// only one in chunkDescs.
// Whether chunkDescs for chunks on disk are all loaded. If false, some
// (or all) chunkDescs are only on disk. These chunks are all contiguous
// and at the tail end.
chunkDescsLoaded bool
// Whether the current head chunk has already been persisted. If true,
// the current head chunk must not be modified anymore.
headChunkPersisted bool
}
func newMemorySeries(m clientmodel.Metric) *memorySeries {
return &memorySeries{
metric: m,
// TODO: should we set this to nil initially and only create a chunk when
// adding? But right now, we also only call newMemorySeries when adding, so
// it turns out to be the same.
chunkDescs: chunkDescs{
// TODO: should there be a newChunkDesc() function?
&chunkDesc{
chunk: newDeltaEncodedChunk(d1, d0, true),
refCount: 1,
},
},
metric: m,
chunkDescsLoaded: true,
}
}
@ -129,6 +122,15 @@ func (s *memorySeries) add(v *metric.SamplePair, persistQueue chan *persistReque
s.mtx.Lock()
defer s.mtx.Unlock()
if len(s.chunkDescs) == 0 || s.headChunkPersisted {
newHead := &chunkDesc{
chunk: newDeltaEncodedChunk(d1, d0, true),
refCount: 1,
}
s.chunkDescs = append(s.chunkDescs, newHead)
s.headChunkPersisted = false
}
chunks := s.head().add(v)
s.head().chunk = chunks[0]
@ -356,6 +358,7 @@ func (s *memorySeries) lastTime() clientmodel.Timestamp {
return s.head().lastTime()
}
// GetValueAtTime implements SeriesIterator.
func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.Values {
it.mtx.Lock()
defer it.mtx.Unlock()
@ -404,8 +407,52 @@ func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.V
}
func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Values {
// TODO: implement real GetBoundaryValues here.
return it.GetRangeValues(in)
it.mtx.Lock()
defer it.mtx.Unlock()
// Find the first relevant chunk.
i := sort.Search(len(it.chunks), func(i int) bool {
return !it.chunks[i].lastTime().Before(in.OldestInclusive)
})
values := metric.Values{}
for ; i < len(it.chunks); i++ {
c := it.chunks[i]
var chunkIt chunkIterator
if c.firstTime().After(in.NewestInclusive) {
if len(values) == 1 {
// We found the first value already, but are now
// already past the last value. The value we
// want must be the last value of the previous
// chunk. So backtrack...
chunkIt = it.chunks[i-1].newIterator()
values = append(values, chunkIt.getValueAtTime(in.NewestInclusive)[0])
}
break
}
if len(values) == 0 {
chunkIt = c.newIterator()
firstValues := chunkIt.getValueAtTime(in.OldestInclusive)
switch len(firstValues) {
case 2:
values = append(values, firstValues[1])
case 1:
values = firstValues
default:
panic("unexpected return from getValueAtTime")
}
}
if c.lastTime().After(in.NewestInclusive) {
if chunkIt == nil {
chunkIt = c.newIterator()
}
values = append(values, chunkIt.getValueAtTime(in.NewestInclusive)[0])
break
}
}
if len(values) == 2 && values[0].Equal(&values[1]) {
return values[:1]
}
return values
}
func (it *memorySeriesIterator) GetRangeValues(in metric.Interval) metric.Values {

View file

@ -1,6 +1,8 @@
package storage_ng
// Package local contains the local time series storage used by Prometheus.
package local
import (
"fmt"
"sync"
"time"
@ -39,14 +41,19 @@ type memorySeriesStorage struct {
persistence Persistence
}
// MemorySeriesStorageOptions contains options needed by
// NewMemorySeriesStorage. It is not safe to leave any of those at their zero
// values.
type MemorySeriesStorageOptions struct {
Persistence Persistence
MemoryEvictionInterval time.Duration
MemoryRetentionPeriod time.Duration
PersistencePurgeInterval time.Duration
PersistenceRetentionPeriod time.Duration
Persistence Persistence // Used to persist storage content across restarts.
MemoryEvictionInterval time.Duration // How often to check for memory eviction.
MemoryRetentionPeriod time.Duration // Chunks at least that old are evicted from memory.
PersistencePurgeInterval time.Duration // How often to check for purging.
PersistenceRetentionPeriod time.Duration // Chunks at least that old are purged.
}
// NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still
// has to be called to start the storage.
func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
glog.Info("Loading series map and head chunks...")
fingerprintToSeries, err := o.Persistence.LoadSeriesMapAndHeads()
@ -122,8 +129,10 @@ func (s *memorySeriesStorage) getOrCreateSeries(m clientmodel.Metric) *memorySer
// The series existed before, had been archived at some
// point, and has now been unarchived, i.e. it has
// chunks on disk. Set chunkDescsLoaded accordingly so
// that they will be looked at later.
// that they will be looked at later. Also, an
// unarchived series comes with a persisted head chunk.
series.chunkDescsLoaded = false
series.headChunkPersisted = true
} else {
// This was a genuinely new series, so index the metric.
if err := s.persistence.IndexMetric(m, fp); err != nil {
@ -145,12 +154,27 @@ func (s *memorySeriesStorage) preloadChunksAtTime(fp clientmodel.Fingerprint, ts
*/
func (s *memorySeriesStorage) preloadChunksForRange(fp clientmodel.Fingerprint, from clientmodel.Timestamp, through clientmodel.Timestamp) (chunkDescs, error) {
stalenessDelta := 300 * time.Second // TODO: Turn into parameter.
s.mtx.RLock()
series, ok := s.fingerprintToSeries[fp]
s.mtx.RUnlock()
if !ok {
panic("requested preload for non-existent series")
has, first, last, err := s.persistence.HasArchivedMetric(fp)
if err != nil {
return nil, err
}
if !has {
return nil, fmt.Errorf("requested preload for non-existent series %v", fp)
}
if from.Add(-stalenessDelta).Before(last) && through.Add(stalenessDelta).After(first) {
metric, err := s.persistence.GetArchivedMetric(fp)
if err != nil {
return nil, err
}
series = s.getOrCreateSeries(metric)
}
}
return series.preloadChunksForRange(from, through, s.persistence)
}
@ -195,7 +219,6 @@ func recordPersist(start time.Time, err error) {
}
func (s *memorySeriesStorage) handlePersistQueue() {
// TODO: Perhaps move this into Persistence?
for req := range s.persistQueue {
// TODO: Make this thread-safe?
persistQueueLength.Set(float64(len(s.persistQueue)))
@ -266,13 +289,20 @@ func (s *memorySeriesStorage) purgePeriodically(stop <-chan bool) {
}
s.mtx.RUnlock()
ts := clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.persistenceRetentionPeriod)
// TODO: Add archived fps:
// - Add iterator interface for KeyValueStore.
// - Iterate over s.persistence.archivedFingerprintToTimeRange.
// - If timeRange extends before ts, add fp to fps.
for _, fp := range fps {
select {
case <-stop:
glog.Info("Interrupted running series purge.")
return
default:
s.purgeSeries(fp)
s.purgeSeries(fp, ts)
}
}
glog.Info("Done purging old series data.")
@ -283,9 +313,7 @@ func (s *memorySeriesStorage) purgePeriodically(stop <-chan bool) {
// purgeSeries purges chunks older than persistenceRetentionPeriod from a
// series. If the series contains no chunks after the purge, it is dropped
// entirely.
func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint) {
ts := clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.persistenceRetentionPeriod)
func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) {
s.mtx.Lock()
// TODO: This is a lock FAR to coarse! However, we cannot lock using the
// memorySeries since we might have none (for series that are on disk
@ -303,14 +331,14 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint) {
defer s.mtx.Unlock()
// First purge persisted chunks. We need to do that anyway.
allDropped, err := s.persistence.DropChunks(fp, ts)
allDropped, err := s.persistence.DropChunks(fp, beforeTime)
if err != nil {
glog.Error("Error purging persisted chunks: ", err)
}
// Purge chunks from memory accordingly.
if series, ok := s.fingerprintToSeries[fp]; ok {
if series.purgeOlderThan(ts) {
if series.purgeOlderThan(beforeTime) {
delete(s.fingerprintToSeries, fp)
if err := s.persistence.UnindexMetric(series.metric, fp); err != nil {
glog.Errorf("Error unindexing metric %v: %v", series.metric, err)

View file

@ -1,4 +1,4 @@
package storage_ng
package local
import (
"fmt"

View file

@ -1,4 +1,4 @@
package storage_ng
package local
import (
"testing"
@ -17,6 +17,9 @@ func (t *testStorageCloser) Close() {
t.directory.Close()
}
// NewTestStorage creates a storage instance backed by files in a temporary
// directory. The returned storage is already in serving state. Upon closing the
// returned test.Closer, the temporary directory is cleaned up.
func NewTestStorage(t testing.TB) (Storage, test.Closer) {
directory := test.NewTemporaryDirectory("test_storage", t)
persistence, err := NewDiskPersistence(directory.Path(), 1024)

View file

@ -57,7 +57,7 @@ func (q queryResultByLabelSorter) Swap(i, j int) {
q.results[i], q.results[j] = q.results[j], q.results[i]
}
func query(q string, timestamp clientmodel.Timestamp, storage storage_ng.Storage) (queryResult, error) {
func query(q string, timestamp clientmodel.Timestamp, storage local.Storage) (queryResult, error) {
exprNode, err := rules.LoadExprFromString(q)
if err != nil {
return nil, err
@ -91,7 +91,7 @@ type templateExpander struct {
funcMap text_template.FuncMap
}
func NewTemplateExpander(text string, name string, data interface{}, timestamp clientmodel.Timestamp, storage storage_ng.Storage) *templateExpander {
func NewTemplateExpander(text string, name string, data interface{}, timestamp clientmodel.Timestamp, storage local.Storage) *templateExpander {
return &templateExpander{
text: text,
name: name,

View file

@ -29,7 +29,7 @@ type MetricsService struct {
time utility.Time
Config *config.Config
TargetManager retrieval.TargetManager
Storage storage_ng.Storage
Storage local.Storage
}
func (msrv *MetricsService) RegisterHandler() {

View file

@ -32,7 +32,7 @@ var (
)
type ConsolesHandler struct {
Storage storage_ng.Storage
Storage local.Storage
}
func (h *ConsolesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {