diff --git a/cmd/prometheus/config.go b/cmd/prometheus/config.go index a6da45373..27d126820 100644 --- a/cmd/prometheus/config.go +++ b/cmd/prometheus/config.go @@ -30,6 +30,7 @@ import ( "github.com/prometheus/prometheus/notifier" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage/local" + "github.com/prometheus/prometheus/storage/local/chunk" "github.com/prometheus/prometheus/storage/local/index" "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/web" @@ -149,7 +150,7 @@ func init() { "If set, a crash recovery will perform checks on each series file. This might take a very long time.", ) cfg.fs.Var( - &local.DefaultChunkEncoding, "storage.local.chunk-encoding-version", + &chunk.DefaultEncoding, "storage.local.chunk-encoding-version", "Which chunk encoding version to use for newly created chunks. Currently supported is 0 (delta encoding), 1 (double-delta encoding), and 2 (double-delta encoding with variable bit-width).", ) // Index cache sizes. diff --git a/storage/local/chunk.go b/storage/local/chunk/chunk.go similarity index 63% rename from storage/local/chunk.go rename to storage/local/chunk/chunk.go index 0075a1a37..d26237394 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk/chunk.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package local +package chunk import ( "container/list" @@ -27,28 +27,37 @@ import ( "github.com/prometheus/prometheus/storage/metric" ) -// DefaultChunkEncoding can be changed via a flag. -var DefaultChunkEncoding = DoubleDelta +// ChunkLen is the length of a chunk in bytes. +const ChunkLen = 1024 + +// DefaultEncoding can be changed via a flag. +var DefaultEncoding = DoubleDelta var errChunkBoundsExceeded = errors.New("attempted access outside of chunk boundaries") -// ChunkEncoding defintes which encoding we are using, delta, doubledelta, or varbit -type ChunkEncoding byte +// EvictRequest is a request to evict a chunk from memory. +type EvictRequest struct { + Desc *Desc + Evict bool +} + +// Encoding defines which encoding we are using, delta, doubledelta, or varbit +type Encoding byte // String implements flag.Value. -func (ce ChunkEncoding) String() string { - return fmt.Sprintf("%d", ce) +func (e Encoding) String() string { + return fmt.Sprintf("%d", e) } // Set implements flag.Value. -func (ce *ChunkEncoding) Set(s string) error { +func (e *Encoding) Set(s string) error { switch s { case "0": - *ce = Delta + *e = Delta case "1": - *ce = DoubleDelta + *e = DoubleDelta case "2": - *ce = Varbit + *e = Varbit default: return fmt.Errorf("invalid chunk encoding: %s", s) } @@ -57,194 +66,192 @@ func (ce *ChunkEncoding) Set(s string) error { const ( // Delta encoding - Delta ChunkEncoding = iota + Delta Encoding = iota // DoubleDelta encoding DoubleDelta // Varbit encoding Varbit ) -// ChunkDesc contains meta-data for a chunk. Pay special attention to the +// Desc contains meta-data for a chunk. Pay special attention to the // documented requirements for calling its methods concurrently (WRT pinning and // locking). The doc comments spell out the requirements for each method, but // here is an overview and general explanation: // // Everything that changes the pinning of the underlying chunk or deals with its -// eviction is protected by a mutex. This affects the following methods: pin, -// unpin, refCount, isEvicted, maybeEvict. These methods can be called at any +// eviction is protected by a mutex. This affects the following methods: Pin, +// Unpin, RefCount, IsEvicted, MaybeEvict. These methods can be called at any // time without further prerequisites. // // Another group of methods acts on (or sets) the underlying chunk. These // methods involve no locking. They may only be called if the caller has pinned // the chunk (to guarantee the chunk is not evicted concurrently). Also, the // caller must make sure nobody else will call these methods concurrently, -// either by holding the sole reference to the chunkDesc (usually during loading -// or creation) or by locking the fingerprint of the series the chunkDesc -// belongs to. The affected methods are: add, maybePopulateLastTime, setChunk. +// either by holding the sole reference to the Desc (usually during loading +// or creation) or by locking the fingerprint of the series the Desc +// belongs to. The affected methods are: Add, MaybePopulateLastTime, SetChunk. // -// Finally, there are the special cases firstTime and lastTime. lastTime requires +// Finally, there are the special cases FirstTime and LastTime. LastTime requires // to have locked the fingerprint of the series but the chunk does not need to -// be pinned. That's because the chunkLastTime field in chunkDesc gets populated +// be pinned. That's because the ChunkLastTime field in Desc gets populated // upon completion of the chunk (when it is still pinned, and which happens // while the series's fingerprint is locked). Once that has happened, calling -// lastTime does not require the chunk to be loaded anymore. Before that has -// happened, the chunk is pinned anyway. The chunkFirstTime field in chunkDesc -// is populated upon creation of a chunkDesc, so it is alway safe to call -// firstTime. The firstTime method is arguably not needed and only there for -// consistency with lastTime. -type ChunkDesc struct { +// LastTime does not require the chunk to be loaded anymore. Before that has +// happened, the chunk is pinned anyway. The ChunkFirstTime field in Desc +// is populated upon creation of a Desc, so it is alway safe to call +// FirstTime. The FirstTime method is arguably not needed and only there for +// consistency with LastTime. +type Desc struct { sync.Mutex // Protects pinning. - c Chunk // nil if chunk is evicted. + C Chunk // nil if chunk is evicted. rCnt int - chunkFirstTime model.Time // Populated at creation. Immutable. - chunkLastTime model.Time // Populated on closing of the chunk, model.Earliest if unset. + ChunkFirstTime model.Time // Populated at creation. Immutable. + ChunkLastTime model.Time // Populated on closing of the chunk, model.Earliest if unset. - // evictListElement is nil if the chunk is not in the evict list. - // evictListElement is _not_ protected by the chunkDesc mutex. + // EvictListElement is nil if the chunk is not in the evict list. + // EvictListElement is _not_ protected by the Desc mutex. // It must only be touched by the evict list handler in MemorySeriesStorage. - evictListElement *list.Element + EvictListElement *list.Element } -// NewChunkDesc creates a new chunkDesc pointing to the provided chunk. The -// provided chunk is assumed to be not persisted yet. Therefore, the refCount of -// the new chunkDesc is 1 (preventing eviction prior to persisting). -func NewChunkDesc(c Chunk, firstTime model.Time) *ChunkDesc { - chunkOps.WithLabelValues(createAndPin).Inc() - atomic.AddInt64(&numMemChunks, 1) - numMemChunkDescs.Inc() - return &ChunkDesc{ - c: c, +// NewDesc creates a new Desc pointing to the provided chunk. The provided chunk +// is assumed to be not persisted yet. Therefore, the refCount of the new +// Desc is 1 (preventing eviction prior to persisting). +func NewDesc(c Chunk, firstTime model.Time) *Desc { + Ops.WithLabelValues(CreateAndPin).Inc() + atomic.AddInt64(&NumMemChunks, 1) + NumMemDescs.Inc() + return &Desc{ + C: c, rCnt: 1, - chunkFirstTime: firstTime, - chunkLastTime: model.Earliest, + ChunkFirstTime: firstTime, + ChunkLastTime: model.Earliest, } } // Add adds a sample pair to the underlying chunk. For safe concurrent access, // The chunk must be pinned, and the caller must have locked the fingerprint of // the series. -func (cd *ChunkDesc) Add(s model.SamplePair) ([]Chunk, error) { - return cd.c.Add(s) +func (d *Desc) Add(s model.SamplePair) ([]Chunk, error) { + return d.C.Add(s) } -// pin increments the refCount by one. Upon increment from 0 to 1, this -// chunkDesc is removed from the evict list. To enable the latter, the +// Pin increments the refCount by one. Upon increment from 0 to 1, this +// Desc is removed from the evict list. To enable the latter, the // evictRequests channel has to be provided. This method can be called // concurrently at any time. -func (cd *ChunkDesc) pin(evictRequests chan<- evictRequest) { - cd.Lock() - defer cd.Unlock() +func (d *Desc) Pin(evictRequests chan<- EvictRequest) { + d.Lock() + defer d.Unlock() - if cd.rCnt == 0 { + if d.rCnt == 0 { // Remove ourselves from the evict list. - evictRequests <- evictRequest{cd, false} + evictRequests <- EvictRequest{d, false} } - cd.rCnt++ + d.rCnt++ } -// unpin decrements the refCount by one. Upon decrement from 1 to 0, this -// chunkDesc is added to the evict list. To enable the latter, the evictRequests +// Unpin decrements the refCount by one. Upon decrement from 1 to 0, this +// Desc is added to the evict list. To enable the latter, the evictRequests // channel has to be provided. This method can be called concurrently at any // time. -func (cd *ChunkDesc) unpin(evictRequests chan<- evictRequest) { - cd.Lock() - defer cd.Unlock() +func (d *Desc) Unpin(evictRequests chan<- EvictRequest) { + d.Lock() + defer d.Unlock() - if cd.rCnt == 0 { + if d.rCnt == 0 { panic("cannot unpin already unpinned chunk") } - cd.rCnt-- - if cd.rCnt == 0 { + d.rCnt-- + if d.rCnt == 0 { // Add ourselves to the back of the evict list. - evictRequests <- evictRequest{cd, true} + evictRequests <- EvictRequest{d, true} } } -// refCount returns the number of pins. This method can be called concurrently +// RefCount returns the number of pins. This method can be called concurrently // at any time. -func (cd *ChunkDesc) refCount() int { - cd.Lock() - defer cd.Unlock() +func (d *Desc) RefCount() int { + d.Lock() + defer d.Unlock() - return cd.rCnt + return d.rCnt } -// firstTime returns the timestamp of the first sample in the chunk. This method +// FirstTime returns the timestamp of the first sample in the chunk. This method // can be called concurrently at any time. It only returns the immutable -// cd.chunkFirstTime without any locking. Arguably, this method is -// useless. However, it provides consistency with the lastTime method. -func (cd *ChunkDesc) firstTime() model.Time { - return cd.chunkFirstTime +// d.ChunkFirstTime without any locking. Arguably, this method is +// useless. However, it provides consistency with the LastTime method. +func (d *Desc) FirstTime() model.Time { + return d.ChunkFirstTime } -// lastTime returns the timestamp of the last sample in the chunk. For safe +// LastTime returns the timestamp of the last sample in the chunk. For safe // concurrent access, this method requires the fingerprint of the time series to // be locked. -func (cd *ChunkDesc) lastTime() (model.Time, error) { - if cd.chunkLastTime != model.Earliest || cd.c == nil { - return cd.chunkLastTime, nil +func (d *Desc) LastTime() (model.Time, error) { + if d.ChunkLastTime != model.Earliest || d.C == nil { + return d.ChunkLastTime, nil } - return cd.c.NewIterator().LastTimestamp() + return d.C.NewIterator().LastTimestamp() } -// maybePopulateLastTime populates the chunkLastTime from the underlying chunk +// MaybePopulateLastTime populates the ChunkLastTime from the underlying chunk // if it has not yet happened. Call this method directly after having added the // last sample to a chunk or after closing a head chunk due to age. For safe // concurrent access, the chunk must be pinned, and the caller must have locked // the fingerprint of the series. -func (cd *ChunkDesc) maybePopulateLastTime() error { - if cd.chunkLastTime == model.Earliest && cd.c != nil { - t, err := cd.c.NewIterator().LastTimestamp() +func (d *Desc) MaybePopulateLastTime() error { + if d.ChunkLastTime == model.Earliest && d.C != nil { + t, err := d.C.NewIterator().LastTimestamp() if err != nil { return err } - cd.chunkLastTime = t + d.ChunkLastTime = t } return nil } -// isEvicted returns whether the chunk is evicted. For safe concurrent access, +// IsEvicted returns whether the chunk is evicted. For safe concurrent access, // the caller must have locked the fingerprint of the series. -func (cd *ChunkDesc) isEvicted() bool { +func (d *Desc) IsEvicted() bool { // Locking required here because we do not want the caller to force // pinning the chunk first, so it could be evicted while this method is // called. - cd.Lock() - defer cd.Unlock() + d.Lock() + defer d.Unlock() - return cd.c == nil + return d.C == nil } -// setChunk sets the underlying chunk. The caller must have locked the +// SetChunk sets the underlying chunk. The caller must have locked the // fingerprint of the series and must have "pre-pinned" the chunk (i.e. first -// call pin and then set the chunk). -func (cd *ChunkDesc) setChunk(c Chunk) { - if cd.c != nil { +// call Pin and then set the chunk). +func (d *Desc) SetChunk(c Chunk) { + if d.C != nil { panic("chunk already set") } - cd.c = c + d.C = c } -// maybeEvict evicts the chunk if the refCount is 0. It returns whether the chunk +// MaybeEvict evicts the chunk if the refCount is 0. It returns whether the chunk // is now evicted, which includes the case that the chunk was evicted even // before this method was called. It can be called concurrently at any time. -func (cd *ChunkDesc) maybeEvict() bool { - cd.Lock() - defer cd.Unlock() +func (d *Desc) MaybeEvict() bool { + d.Lock() + defer d.Unlock() - if cd.c == nil { + if d.C == nil { return true } - if cd.rCnt != 0 { + if d.rCnt != 0 { return false } - if cd.chunkLastTime == model.Earliest { + if d.ChunkLastTime == model.Earliest { // This must never happen. - panic("chunkLastTime not populated for evicted chunk") + panic("ChunkLastTime not populated for evicted chunk") } - cd.c = nil - chunkOps.WithLabelValues(evict).Inc() - atomic.AddInt64(&numMemChunks, -1) + d.C = nil return true } @@ -260,18 +267,18 @@ type Chunk interface { Add(sample model.SamplePair) ([]Chunk, error) Clone() Chunk FirstTime() model.Time - NewIterator() ChunkIterator + NewIterator() Iterator Marshal(io.Writer) error MarshalToBuf([]byte) error Unmarshal(io.Reader) error UnmarshalFromBuf([]byte) error - Encoding() ChunkEncoding + Encoding() Encoding } -// ChunkIterator enables efficient access to the content of a chunk. It is -// generally not safe to use a chunkIterator concurrently with or after chunk +// Iterator enables efficient access to the content of a chunk. It is +// generally not safe to use an Iterator concurrently with or after chunk // mutation. -type ChunkIterator interface { +type Iterator interface { // Gets the last timestamp in the chunk. LastTimestamp() (model.Time, error) // Whether a given timestamp is contained between first and last value @@ -280,7 +287,7 @@ type ChunkIterator interface { // Scans the next value in the chunk. Directly after the iterator has // been created, the next value is the first value in the // chunk. Otherwise, it is the value following the last value scanned or - // found (by one of the find... methods). Returns false if either the + // found (by one of the Find... methods). Returns false if either the // end of the chunk is reached or an error has occurred. Scan() bool // Finds the most recent value at or before the provided time. Returns @@ -292,7 +299,7 @@ type ChunkIterator interface { // or an error has occurred. FindAtOrAfter(model.Time) bool // Returns the last value scanned (by the scan method) or found (by one - // of the find... methods). It returns ZeroSamplePair before any of + // of the find... methods). It returns model.ZeroSamplePair before any of // those methods were called. Value() model.SamplePair // Returns the last error encountered. In general, an error signals data @@ -300,9 +307,9 @@ type ChunkIterator interface { Err() error } -// rangeValues is a utility function that retrieves all values within the given -// range from a chunkIterator. -func rangeValues(it ChunkIterator, in metric.Interval) ([]model.SamplePair, error) { +// RangeValues is a utility function that retrieves all values within the given +// range from an Iterator. +func RangeValues(it Iterator, in metric.Interval) ([]model.SamplePair, error) { result := []model.SamplePair{} if !it.FindAtOrAfter(in.OldestInclusive) { return result, it.Err() @@ -320,7 +327,7 @@ func rangeValues(it ChunkIterator, in metric.Interval) ([]model.SamplePair, erro // chunk, adds the provided sample to it, and returns a chunk slice containing // the provided old chunk followed by the new overflow chunk. func addToOverflowChunk(c Chunk, s model.SamplePair) ([]Chunk, error) { - overflowChunks, err := NewChunk().Add(s) + overflowChunks, err := New().Add(s) if err != nil { return nil, err } @@ -332,7 +339,7 @@ func addToOverflowChunk(c Chunk, s model.SamplePair) ([]Chunk, error) { // provided sample. It returns the new chunks (transcoded plus overflow) with // the new sample at the end. func transcodeAndAdd(dst Chunk, src Chunk, s model.SamplePair) ([]Chunk, error) { - chunkOps.WithLabelValues(transcode).Inc() + Ops.WithLabelValues(Transcode).Inc() var ( head = dst @@ -358,23 +365,23 @@ func transcodeAndAdd(dst Chunk, src Chunk, s model.SamplePair) ([]Chunk, error) return append(body, NewChunks...), nil } -// NewChunk creates a new chunk according to the encoding set by the -// DefaultChunkEncoding flag. -func NewChunk() Chunk { - chunk, err := NewChunkForEncoding(DefaultChunkEncoding) +// New creates a new chunk according to the encoding set by the +// DefaultEncoding flag. +func New() Chunk { + chunk, err := NewForEncoding(DefaultEncoding) if err != nil { panic(err) } return chunk } -// NewChunkForEncoding allows configuring what chunk type you want -func NewChunkForEncoding(encoding ChunkEncoding) (Chunk, error) { +// NewForEncoding allows configuring what chunk type you want +func NewForEncoding(encoding Encoding) (Chunk, error) { switch encoding { case Delta: - return newDeltaEncodedChunk(d1, d0, true, chunkLen), nil + return newDeltaEncodedChunk(d1, d0, true, ChunkLen), nil case DoubleDelta: - return newDoubleDeltaEncodedChunk(d1, d0, true, chunkLen), nil + return newDoubleDeltaEncodedChunk(d1, d0, true, ChunkLen), nil case Varbit: return newVarbitChunk(varbitZeroEncoding), nil default: @@ -402,23 +409,23 @@ func newIndexAccessingChunkIterator(len int, acc indexAccessor) *indexAccessingC return &indexAccessingChunkIterator{ len: len, pos: -1, - lastValue: ZeroSamplePair, + lastValue: model.ZeroSamplePair, acc: acc, } } -// lastTimestamp implements chunkIterator. +// lastTimestamp implements Iterator. func (it *indexAccessingChunkIterator) LastTimestamp() (model.Time, error) { return it.acc.timestampAtIndex(it.len - 1), it.acc.err() } -// contains implements chunkIterator. +// contains implements Iterator. func (it *indexAccessingChunkIterator) Contains(t model.Time) (bool, error) { return !t.Before(it.acc.timestampAtIndex(0)) && !t.After(it.acc.timestampAtIndex(it.len-1)), it.acc.err() } -// scan implements chunkIterator. +// scan implements Iterator. func (it *indexAccessingChunkIterator) Scan() bool { it.pos++ if it.pos >= it.len { @@ -431,7 +438,7 @@ func (it *indexAccessingChunkIterator) Scan() bool { return it.acc.err() == nil } -// findAtOrBefore implements chunkIterator. +// findAtOrBefore implements Iterator. func (it *indexAccessingChunkIterator) FindAtOrBefore(t model.Time) bool { i := sort.Search(it.len, func(i int) bool { return it.acc.timestampAtIndex(i).After(t) @@ -447,7 +454,7 @@ func (it *indexAccessingChunkIterator) FindAtOrBefore(t model.Time) bool { return true } -// findAtOrAfter implements chunkIterator. +// findAtOrAfter implements Iterator. func (it *indexAccessingChunkIterator) FindAtOrAfter(t model.Time) bool { i := sort.Search(it.len, func(i int) bool { return !it.acc.timestampAtIndex(i).Before(t) @@ -463,12 +470,12 @@ func (it *indexAccessingChunkIterator) FindAtOrAfter(t model.Time) bool { return true } -// value implements chunkIterator. +// value implements Iterator. func (it *indexAccessingChunkIterator) Value() model.SamplePair { return it.lastValue } -// err implements chunkIterator. +// err implements Iterator. func (it *indexAccessingChunkIterator) Err() error { return it.acc.err() } diff --git a/storage/local/delta.go b/storage/local/chunk/delta.go similarity index 98% rename from storage/local/delta.go rename to storage/local/chunk/delta.go index e2e0dc0a6..f9bbff4a8 100644 --- a/storage/local/delta.go +++ b/storage/local/chunk/delta.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package local +package chunk import ( "encoding/binary" @@ -184,13 +184,13 @@ func (c deltaEncodedChunk) Clone() Chunk { return &clone } -// firstTime implements chunk. +// FirstTime implements chunk. func (c deltaEncodedChunk) FirstTime() model.Time { return c.baseTime() } // NewIterator implements chunk. -func (c *deltaEncodedChunk) NewIterator() ChunkIterator { +func (c *deltaEncodedChunk) NewIterator() Iterator { return newIndexAccessingChunkIterator(c.len(), &deltaEncodedIndexAccessor{ c: *c, baseT: c.baseTime(), @@ -265,7 +265,7 @@ func (c *deltaEncodedChunk) UnmarshalFromBuf(buf []byte) error { } // encoding implements chunk. -func (c deltaEncodedChunk) Encoding() ChunkEncoding { return Delta } +func (c deltaEncodedChunk) Encoding() Encoding { return Delta } func (c deltaEncodedChunk) timeBytes() deltaBytes { return deltaBytes(c[deltaHeaderTimeBytesOffset]) diff --git a/storage/local/delta_helpers.go b/storage/local/chunk/delta_helpers.go similarity index 99% rename from storage/local/delta_helpers.go rename to storage/local/chunk/delta_helpers.go index 0908b8dd4..81e5d18cb 100644 --- a/storage/local/delta_helpers.go +++ b/storage/local/chunk/delta_helpers.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package local +package chunk import ( "math" diff --git a/storage/local/delta_test.go b/storage/local/chunk/delta_test.go similarity index 96% rename from storage/local/delta_test.go rename to storage/local/chunk/delta_test.go index 8f63c208f..357929574 100644 --- a/storage/local/delta_test.go +++ b/storage/local/chunk/delta_test.go @@ -15,7 +15,7 @@ // it may make sense to split those out later, but given that the tests are // near-identical and share a helper, this feels simpler for now. -package local +package chunk import ( "bytes" @@ -75,7 +75,7 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) { }, } for _, c := range cases { - chunk := c.chunkConstructor(d1, d4, false, chunkLen) + chunk := c.chunkConstructor(d1, d4, false, ChunkLen) cs, err := chunk.Add(model.SamplePair{ Timestamp: model.Now(), @@ -85,7 +85,7 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) { t.Fatalf("Couldn't add sample to empty %s: %s", c.chunkTypeName, err) } - buf := make([]byte, chunkLen) + buf := make([]byte, ChunkLen) cs[0].MarshalToBuf(buf) diff --git a/storage/local/doubledelta.go b/storage/local/chunk/doubledelta.go similarity index 98% rename from storage/local/doubledelta.go rename to storage/local/chunk/doubledelta.go index e8f3cecef..debf5232d 100644 --- a/storage/local/doubledelta.go +++ b/storage/local/chunk/doubledelta.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package local +package chunk import ( "encoding/binary" @@ -191,13 +191,13 @@ func (c doubleDeltaEncodedChunk) Clone() Chunk { return &clone } -// firstTime implements chunk. +// FirstTime implements chunk. func (c doubleDeltaEncodedChunk) FirstTime() model.Time { return c.baseTime() } // NewIterator( implements chunk. -func (c *doubleDeltaEncodedChunk) NewIterator() ChunkIterator { +func (c *doubleDeltaEncodedChunk) NewIterator() Iterator { return newIndexAccessingChunkIterator(c.len(), &doubleDeltaEncodedIndexAccessor{ c: *c, baseT: c.baseTime(), @@ -275,7 +275,7 @@ func (c *doubleDeltaEncodedChunk) UnmarshalFromBuf(buf []byte) error { } // encoding implements chunk. -func (c doubleDeltaEncodedChunk) Encoding() ChunkEncoding { return DoubleDelta } +func (c doubleDeltaEncodedChunk) Encoding() Encoding { return DoubleDelta } func (c doubleDeltaEncodedChunk) baseTime() model.Time { return model.Time( diff --git a/storage/local/chunk/instrumentation.go b/storage/local/chunk/instrumentation.go new file mode 100644 index 000000000..2a67fb5bd --- /dev/null +++ b/storage/local/chunk/instrumentation.go @@ -0,0 +1,100 @@ +// Copyright 2014 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chunk + +import "github.com/prometheus/client_golang/prometheus" + +// Usually, a separate file for instrumentation is frowned upon. Metrics should +// be close to where they are used. However, the metrics below are set all over +// the place, so we go for a separate instrumentation file in this case. +var ( + Ops = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "chunk_ops_total", + Help: "The total number of chunk operations by their type.", + }, + []string{OpTypeLabel}, + ) + DescOps = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "chunkdesc_ops_total", + Help: "The total number of chunk descriptor operations by their type.", + }, + []string{OpTypeLabel}, + ) + NumMemDescs = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "memory_chunkdescs", + Help: "The current number of chunk descriptors in memory.", + }) +) + +const ( + namespace = "prometheus" + subsystem = "local_storage" + + // OpTypeLabel is the label name for chunk operation types. + OpTypeLabel = "type" + + // Op-types for ChunkOps. + + // CreateAndPin is the label value for create-and-pin chunk ops. + CreateAndPin = "create" // A Desc creation with refCount=1. + // PersistAndUnpin is the label value for persist chunk ops. + PersistAndUnpin = "persist" + // Pin is the label value for pin chunk ops (excludes pin on creation). + Pin = "pin" + // Unpin is the label value for unpin chunk ops (excludes the unpin on persisting). + Unpin = "unpin" + // Clone is the label value for clone chunk ops. + Clone = "clone" + // Transcode is the label value for transcode chunk ops. + Transcode = "transcode" + // Drop is the label value for drop chunk ops. + Drop = "drop" + + // Op-types for ChunkOps and ChunkDescOps. + + // Evict is the label value for evict chunk desc ops. + Evict = "evict" + // Load is the label value for load chunk and chunk desc ops. + Load = "load" +) + +func init() { + prometheus.MustRegister(Ops) + prometheus.MustRegister(DescOps) + prometheus.MustRegister(NumMemDescs) +} + +var ( + // NumMemChunks is the total number of chunks in memory. This is a + // global counter, also used internally, so not implemented as + // metrics. Collected in MemorySeriesStorage.Collect. + // TODO(beorn7): As it is used internally, it is actually very bad style + // to have it as a global variable. + NumMemChunks int64 + + // NumMemChunksDesc is the metric descriptor for the above. + NumMemChunksDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "memory_chunks"), + "The current number of chunks in memory, excluding cloned chunks (i.e. chunks without a descriptor).", + nil, nil, + ) +) diff --git a/storage/local/varbit.go b/storage/local/chunk/varbit.go similarity index 97% rename from storage/local/varbit.go rename to storage/local/chunk/varbit.go index 3c909210a..f5590abd3 100644 --- a/storage/local/varbit.go +++ b/storage/local/chunk/varbit.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package local +package chunk import ( "encoding/binary" @@ -195,11 +195,11 @@ const ( varbitFirstValueDeltaOffset = 38 // The following are in the "footer" and only usable if the chunk is // still open. - varbitCountOffsetBitOffset = chunkLen - 9 - varbitLastTimeDeltaOffset = chunkLen - 7 - varbitLastValueDeltaOffset = chunkLen - 4 - varbitLastLeadingZerosCountOffset = chunkLen - 4 - varbitLastSignificantBitsCountOffset = chunkLen - 3 + varbitCountOffsetBitOffset = ChunkLen - 9 + varbitLastTimeDeltaOffset = ChunkLen - 7 + varbitLastValueDeltaOffset = ChunkLen - 4 + varbitLastLeadingZerosCountOffset = ChunkLen - 4 + varbitLastSignificantBitsCountOffset = ChunkLen - 3 varbitFirstSampleBitOffset uint16 = 0 // Symbolic, don't really read or write here. varbitSecondSampleBitOffset uint16 = 1 // Symbolic, don't really read or write here. @@ -240,18 +240,18 @@ var varbitWorstCaseBitsPerSample = map[varbitValueEncoding]int{ type varbitChunk []byte // newVarbitChunk returns a newly allocated varbitChunk. For simplicity, all -// varbit chunks must have the length as determined by the chunkLen constant. +// varbit chunks must have the length as determined by the ChunkLen constant. func newVarbitChunk(enc varbitValueEncoding) *varbitChunk { - if chunkLen < varbitMinLength || chunkLen > varbitMaxLength { + if ChunkLen < varbitMinLength || ChunkLen > varbitMaxLength { panic(fmt.Errorf( "invalid chunk length of %d bytes, need at least %d bytes and at most %d bytes", - chunkLen, varbitMinLength, varbitMaxLength, + ChunkLen, varbitMinLength, varbitMaxLength, )) } if enc > varbitDirectEncoding { panic(fmt.Errorf("unknown varbit value encoding: %v", enc)) } - c := make(varbitChunk, chunkLen) + c := make(varbitChunk, ChunkLen) c.setValueEncoding(enc) return &c } @@ -280,7 +280,7 @@ func (c varbitChunk) Clone() Chunk { } // NewIterator implements chunk. -func (c varbitChunk) NewIterator() ChunkIterator { +func (c varbitChunk) NewIterator() Iterator { return newVarbitChunkIterator(c) } @@ -320,9 +320,9 @@ func (c varbitChunk) UnmarshalFromBuf(buf []byte) error { } // encoding implements chunk. -func (c varbitChunk) Encoding() ChunkEncoding { return Varbit } +func (c varbitChunk) Encoding() Encoding { return Varbit } -// firstTime implements chunk. +// FirstTime implements chunk. func (c varbitChunk) FirstTime() model.Time { return model.Time( binary.BigEndian.Uint64( @@ -548,14 +548,14 @@ func (c *varbitChunk) addLaterSample(s model.SamplePair, offset uint16) ([]Chunk } // Analyze worst case, does it fit? If not, set new sample as the last. - if int(offset)+varbitWorstCaseBitsPerSample[encoding] > chunkLen*8 { + if int(offset)+varbitWorstCaseBitsPerSample[encoding] > ChunkLen*8 { return c.addLastSample(s), nil } // Transcoding/overflow decisions first. if encoding == varbitZeroEncoding && s.Value != lastValue { // Cannot go on with zero encoding. - if offset > chunkLen*4 { + if offset > ChunkLen*4 { // Chunk already half full. Don't transcode, overflow instead. return addToOverflowChunk(c, s) } @@ -567,7 +567,7 @@ func (c *varbitChunk) addLaterSample(s model.SamplePair, offset uint16) ([]Chunk } if encoding == varbitIntDoubleDeltaEncoding && !isInt32(s.Value-lastValue) { // Cannot go on with int encoding. - if offset > chunkLen*4 { + if offset > ChunkLen*4 { // Chunk already half full. Don't transcode, overflow instead. return addToOverflowChunk(c, s) } @@ -903,7 +903,7 @@ func newVarbitChunkIterator(c varbitChunk) *varbitChunkIterator { } } -// lastTimestamp implements chunkIterator. +// lastTimestamp implements Iterator. func (it *varbitChunkIterator) LastTimestamp() (model.Time, error) { if it.len == varbitFirstSampleBitOffset { // No samples in the chunk yet. @@ -912,7 +912,7 @@ func (it *varbitChunkIterator) LastTimestamp() (model.Time, error) { return it.c.lastTime(), it.lastError } -// contains implements chunkIterator. +// contains implements Iterator. func (it *varbitChunkIterator) Contains(t model.Time) (bool, error) { last, err := it.LastTimestamp() if err != nil { @@ -923,7 +923,7 @@ func (it *varbitChunkIterator) Contains(t model.Time) (bool, error) { !t.After(last), it.lastError } -// scan implements chunkIterator. +// scan implements Iterator. func (it *varbitChunkIterator) Scan() bool { if it.lastError != nil { return false @@ -1002,7 +1002,7 @@ func (it *varbitChunkIterator) Scan() bool { return it.lastError == nil } -// findAtOrBefore implements chunkIterator. +// findAtOrBefore implements Iterator. func (it *varbitChunkIterator) FindAtOrBefore(t model.Time) bool { if it.len == 0 || t.Before(it.c.FirstTime()) { return false @@ -1038,7 +1038,7 @@ func (it *varbitChunkIterator) FindAtOrBefore(t model.Time) bool { return it.lastError == nil } -// findAtOrAfter implements chunkIterator. +// findAtOrAfter implements Iterator. func (it *varbitChunkIterator) FindAtOrAfter(t model.Time) bool { if it.len == 0 || t.After(it.c.lastTime()) { return false @@ -1061,7 +1061,7 @@ func (it *varbitChunkIterator) FindAtOrAfter(t model.Time) bool { return it.lastError == nil } -// value implements chunkIterator. +// value implements Iterator. func (it *varbitChunkIterator) Value() model.SamplePair { return model.SamplePair{ Timestamp: it.t, @@ -1069,7 +1069,7 @@ func (it *varbitChunkIterator) Value() model.SamplePair { } } -// err implements chunkIterator. +// err implements Iterator. func (it *varbitChunkIterator) Err() error { return it.lastError } diff --git a/storage/local/varbit_helpers.go b/storage/local/chunk/varbit_helpers.go similarity index 99% rename from storage/local/varbit_helpers.go rename to storage/local/chunk/varbit_helpers.go index 771fb7ded..cc637a992 100644 --- a/storage/local/varbit_helpers.go +++ b/storage/local/chunk/varbit_helpers.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package local +package chunk import "github.com/prometheus/common/model" diff --git a/storage/local/varbit_test.go b/storage/local/chunk/varbit_test.go similarity index 99% rename from storage/local/varbit_test.go rename to storage/local/chunk/varbit_test.go index f440283f0..e0d030822 100644 --- a/storage/local/varbit_test.go +++ b/storage/local/chunk/varbit_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package local +package chunk import "testing" diff --git a/storage/local/crashrecovery.go b/storage/local/crashrecovery.go index 09eb39500..db3cf6eaa 100644 --- a/storage/local/crashrecovery.go +++ b/storage/local/crashrecovery.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/common/log" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage/local/chunk" "github.com/prometheus/prometheus/storage/local/codable" "github.com/prometheus/prometheus/storage/local/index" ) @@ -114,10 +115,10 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint ) } s.chunkDescs = append( - make([]*ChunkDesc, 0, len(s.chunkDescs)-s.persistWatermark), + make([]*chunk.Desc, 0, len(s.chunkDescs)-s.persistWatermark), s.chunkDescs[s.persistWatermark:]..., ) - numMemChunkDescs.Sub(float64(s.persistWatermark)) + chunk.NumMemDescs.Sub(float64(s.persistWatermark)) s.persistWatermark = 0 s.chunkDescsOffset = 0 } @@ -290,8 +291,8 @@ func (p *persistence) sanitizeSeries( ) s.chunkDescs = cds s.chunkDescsOffset = 0 - s.savedFirstTime = cds[0].firstTime() - s.lastTime, err = cds[len(cds)-1].lastTime() + s.savedFirstTime = cds[0].FirstTime() + s.lastTime, err = cds[len(cds)-1].LastTime() if err != nil { log.Errorf( "Failed to determine time of the last sample for metric %v, fingerprint %v: %s", @@ -314,7 +315,7 @@ func (p *persistence) sanitizeSeries( // First, throw away the chunkDescs without chunks. s.chunkDescs = s.chunkDescs[s.persistWatermark:] - numMemChunkDescs.Sub(float64(s.persistWatermark)) + chunk.NumMemDescs.Sub(float64(s.persistWatermark)) cds, err := p.loadChunkDescs(fp, 0) if err != nil { log.Errorf( @@ -326,10 +327,10 @@ func (p *persistence) sanitizeSeries( } s.persistWatermark = len(cds) s.chunkDescsOffset = 0 - s.savedFirstTime = cds[0].firstTime() + s.savedFirstTime = cds[0].FirstTime() s.modTime = modTime - lastTime, err := cds[len(cds)-1].lastTime() + lastTime, err := cds[len(cds)-1].LastTime() if err != nil { log.Errorf( "Failed to determine time of the last sample for metric %v, fingerprint %v: %s", @@ -340,7 +341,7 @@ func (p *persistence) sanitizeSeries( } keepIdx := -1 for i, cd := range s.chunkDescs { - if cd.firstTime() >= lastTime { + if cd.FirstTime() >= lastTime { keepIdx = i break } @@ -350,8 +351,8 @@ func (p *persistence) sanitizeSeries( "Recovered metric %v, fingerprint %v: all %d chunks recovered from series file.", s.metric, fp, chunksInFile, ) - numMemChunkDescs.Sub(float64(len(s.chunkDescs))) - atomic.AddInt64(&numMemChunks, int64(-len(s.chunkDescs))) + chunk.NumMemDescs.Sub(float64(len(s.chunkDescs))) + atomic.AddInt64(&chunk.NumMemChunks, int64(-len(s.chunkDescs))) s.chunkDescs = cds s.headChunkClosed = true return fp, true @@ -360,8 +361,8 @@ func (p *persistence) sanitizeSeries( "Recovered metric %v, fingerprint %v: recovered %d chunks from series file, recovered %d chunks from checkpoint.", s.metric, fp, chunksInFile, len(s.chunkDescs)-keepIdx, ) - numMemChunkDescs.Sub(float64(keepIdx)) - atomic.AddInt64(&numMemChunks, int64(-keepIdx)) + chunk.NumMemDescs.Sub(float64(keepIdx)) + atomic.AddInt64(&chunk.NumMemChunks, int64(-keepIdx)) if keepIdx == len(s.chunkDescs) { // No chunks from series file left, head chunk is evicted, so declare it closed. s.headChunkClosed = true diff --git a/storage/local/heads.go b/storage/local/heads.go index d8f019800..14a317592 100644 --- a/storage/local/heads.go +++ b/storage/local/heads.go @@ -23,6 +23,7 @@ import ( "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage/local/chunk" "github.com/prometheus/prometheus/storage/local/codable" ) @@ -107,7 +108,7 @@ func (hs *headsScanner) scan() bool { firstTime int64 lastTime int64 encoding byte - ch Chunk + ch chunk.Chunk lastTimeHead model.Time ) if seriesFlags, hs.err = hs.r.ReadByte(); hs.err != nil { @@ -146,7 +147,7 @@ func (hs *headsScanner) scan() bool { if numChunkDescs, hs.err = binary.ReadVarint(hs.r); hs.err != nil { return false } - chunkDescs := make([]*ChunkDesc, numChunkDescs) + chunkDescs := make([]*chunk.Desc, numChunkDescs) if hs.version == headsFormatLegacyVersion { if headChunkPersisted { persistWatermark = numChunkDescs @@ -163,11 +164,11 @@ func (hs *headsScanner) scan() bool { if lastTime, hs.err = binary.ReadVarint(hs.r); hs.err != nil { return false } - chunkDescs[i] = &ChunkDesc{ - chunkFirstTime: model.Time(firstTime), - chunkLastTime: model.Time(lastTime), + chunkDescs[i] = &chunk.Desc{ + ChunkFirstTime: model.Time(firstTime), + ChunkLastTime: model.Time(lastTime), } - numMemChunkDescs.Inc() + chunk.NumMemDescs.Inc() } else { // Non-persisted chunk. // If there are non-persisted chunks at all, we consider @@ -176,24 +177,24 @@ func (hs *headsScanner) scan() bool { if encoding, hs.err = hs.r.ReadByte(); hs.err != nil { return false } - if ch, hs.err = NewChunkForEncoding(ChunkEncoding(encoding)); hs.err != nil { + if ch, hs.err = chunk.NewForEncoding(chunk.Encoding(encoding)); hs.err != nil { return false } if hs.err = ch.Unmarshal(hs.r); hs.err != nil { return false } - cd := NewChunkDesc(ch, ch.FirstTime()) + cd := chunk.NewDesc(ch, ch.FirstTime()) if i < numChunkDescs-1 { // This is NOT the head chunk. So it's a chunk // to be persisted, and we need to populate lastTime. hs.chunksToPersistTotal++ - cd.maybePopulateLastTime() + cd.MaybePopulateLastTime() } chunkDescs[i] = cd } } - if lastTimeHead, hs.err = chunkDescs[len(chunkDescs)-1].lastTime(); hs.err != nil { + if lastTimeHead, hs.err = chunkDescs[len(chunkDescs)-1].LastTime(); hs.err != nil { return false } diff --git a/storage/local/instrumentation.go b/storage/local/instrumentation.go index 28b218672..479e13821 100644 --- a/storage/local/instrumentation.go +++ b/storage/local/instrumentation.go @@ -13,38 +13,6 @@ package local -import "github.com/prometheus/client_golang/prometheus" - -// Usually, a separate file for instrumentation is frowned upon. Metrics should -// be close to where they are used. However, the metrics below are set all over -// the place, so we go for a separate instrumentation file in this case. -var ( - chunkOps = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "chunk_ops_total", - Help: "The total number of chunk operations by their type.", - }, - []string{opTypeLabel}, - ) - chunkDescOps = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "chunkdesc_ops_total", - Help: "The total number of chunk descriptor operations by their type.", - }, - []string{opTypeLabel}, - ) - numMemChunkDescs = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "memory_chunkdescs", - Help: "The current number of chunk descriptors in memory.", - }) -) - const ( namespace = "prometheus" subsystem = "local_storage" @@ -64,19 +32,6 @@ const ( droppedQuarantine = "quarantine_dropped" failedQuarantine = "quarantine_failed" - // Op-types for chunkOps. - createAndPin = "create" // A chunkDesc creation with refCount=1. - persistAndUnpin = "persist" - pin = "pin" // Excluding the pin on creation. - unpin = "unpin" // Excluding the unpin on persisting. - clone = "clone" - transcode = "transcode" - drop = "drop" - - // Op-types for chunkOps and chunkDescOps. - evict = "evict" - load = "load" - seriesLocationLabel = "location" // Maintenance types for maintainSeriesDuration. @@ -89,24 +44,3 @@ const ( outOfOrderTimestamp = "timestamp_out_of_order" duplicateSample = "multiple_values_for_timestamp" ) - -func init() { - prometheus.MustRegister(chunkOps) - prometheus.MustRegister(chunkDescOps) - prometheus.MustRegister(numMemChunkDescs) -} - -var ( - // Global counter, also used internally, so not implemented as - // metrics. Collected in MemorySeriesStorage.Collect. - // TODO(beorn7): As it is used internally, it is actually very bad style - // to have it as a global variable. - numMemChunks int64 - - // Metric descriptors for the above. - numMemChunksDesc = prometheus.NewDesc( - prometheus.BuildFQName(namespace, subsystem, "memory_chunks"), - "The current number of chunks in memory, excluding cloned chunks (i.e. chunks without a descriptor).", - nil, nil, - ) -) diff --git a/storage/local/interface.go b/storage/local/interface.go index 4b88a7f6b..7e95a158b 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -91,7 +91,7 @@ type Querier interface { type SeriesIterator interface { // Gets the value that is closest before the given time. In case a value // exists at precisely the given time, that value is returned. If no - // applicable value exists, ZeroSamplePair is returned. + // applicable value exists, model.ZeroSamplePair is returned. ValueAtOrBeforeTime(model.Time) model.SamplePair // Gets all values contained within a given interval. RangeValues(metric.Interval) []model.SamplePair @@ -100,17 +100,3 @@ type SeriesIterator interface { // Closes the iterator and releases the underlying data. Close() } - -// ZeroSamplePair is the pseudo zero-value of model.SamplePair used by the local -// package to signal a non-existing sample pair. It is a SamplePair with -// timestamp model.Earliest and value 0.0. Note that the natural zero value of -// SamplePair has a timestamp of 0, which is possible to appear in a real -// SamplePair and thus not suitable to signal a non-existing SamplePair. -var ZeroSamplePair = model.SamplePair{Timestamp: model.Earliest} - -// ZeroSample is the pseudo zero-value of model.Sample used by the local package -// to signal a non-existing sample. It is a Sample with timestamp -// model.Earliest, value 0.0, and metric nil. Note that the natural zero value -// of Sample has a timestamp of 0, which is possible to appear in a real -// Sample and thus not suitable to signal a non-existing Sample. -var ZeroSample = model.Sample{Timestamp: model.Earliest} diff --git a/storage/local/persistence.go b/storage/local/persistence.go index b0e1e3a2d..b871cdc70 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -31,6 +31,7 @@ import ( "github.com/prometheus/common/log" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage/local/chunk" "github.com/prometheus/prometheus/storage/local/codable" "github.com/prometheus/prometheus/storage/local/index" "github.com/prometheus/prometheus/util/flock" @@ -60,7 +61,7 @@ const ( chunkHeaderTypeOffset = 0 chunkHeaderFirstTimeOffset = 1 chunkHeaderLastTimeOffset = 9 - chunkLenWithHeader = chunkLen + chunkHeaderLen + chunkLenWithHeader = chunk.ChunkLen + chunkHeaderLen chunkMaxBatchSize = 62 // Max no. of chunks to load or write in // one batch. Note that 62 is the largest number of chunks that fit // into 64kiB on disk because chunkHeaderLen is added to each 1k chunk. @@ -370,7 +371,7 @@ func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelVa // // Returning an error signals problems with the series file. In this case, the // caller should quarantine the series. -func (p *persistence) persistChunks(fp model.Fingerprint, chunks []Chunk) (index int, err error) { +func (p *persistence) persistChunks(fp model.Fingerprint, chunks []chunk.Chunk) (index int, err error) { f, err := p.openChunkFileForWriting(fp) if err != nil { return -1, err @@ -399,14 +400,14 @@ func (p *persistence) persistChunks(fp model.Fingerprint, chunks []Chunk) (index // incrementally larger indexes. The indexOffset denotes the offset to be added to // each index in indexes. It is the caller's responsibility to not persist or // drop anything for the same fingerprint concurrently. -func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]Chunk, error) { +func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]chunk.Chunk, error) { f, err := p.openChunkFileForReading(fp) if err != nil { return nil, err } defer f.Close() - chunks := make([]Chunk, 0, len(indexes)) + chunks := make([]chunk.Chunk, 0, len(indexes)) buf := p.bufPool.Get().([]byte) defer func() { // buf may change below. An unwrapped 'defer p.bufPool.Put(buf)' @@ -436,7 +437,7 @@ func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffse return nil, err } for c := 0; c < batchSize; c++ { - chunk, err := NewChunkForEncoding(ChunkEncoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset])) + chunk, err := chunk.NewForEncoding(chunk.Encoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset])) if err != nil { return nil, err } @@ -446,16 +447,16 @@ func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffse chunks = append(chunks, chunk) } } - chunkOps.WithLabelValues(load).Add(float64(len(chunks))) - atomic.AddInt64(&numMemChunks, int64(len(chunks))) + chunk.Ops.WithLabelValues(chunk.Load).Add(float64(len(chunks))) + atomic.AddInt64(&chunk.NumMemChunks, int64(len(chunks))) return chunks, nil } -// loadChunkDescs loads the chunkDescs for a series from disk. offsetFromEnd is -// the number of chunkDescs to skip from the end of the series file. It is the +// loadChunkDescs loads the chunk.Descs for a series from disk. offsetFromEnd is +// the number of chunk.Descs to skip from the end of the series file. It is the // caller's responsibility to not persist or drop anything for the same // fingerprint concurrently. -func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*ChunkDesc, error) { +func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*chunk.Desc, error) { f, err := p.openChunkFileForReading(fp) if os.IsNotExist(err) { return nil, nil @@ -478,7 +479,7 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([ } numChunks := int(fi.Size())/chunkLenWithHeader - offsetFromEnd - cds := make([]*ChunkDesc, numChunks) + cds := make([]*chunk.Desc, numChunks) chunkTimesBuf := make([]byte, 16) for i := 0; i < numChunks; i++ { _, err := f.Seek(offsetForChunkIndex(i)+chunkHeaderFirstTimeOffset, os.SEEK_SET) @@ -490,13 +491,13 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([ if err != nil { return nil, err } - cds[i] = &ChunkDesc{ - chunkFirstTime: model.Time(binary.LittleEndian.Uint64(chunkTimesBuf)), - chunkLastTime: model.Time(binary.LittleEndian.Uint64(chunkTimesBuf[8:])), + cds[i] = &chunk.Desc{ + ChunkFirstTime: model.Time(binary.LittleEndian.Uint64(chunkTimesBuf)), + ChunkLastTime: model.Time(binary.LittleEndian.Uint64(chunkTimesBuf[8:])), } } - chunkDescOps.WithLabelValues(load).Add(float64(len(cds))) - numMemChunkDescs.Add(float64(len(cds))) + chunk.DescOps.WithLabelValues(chunk.Load).Add(float64(len(cds))) + chunk.NumMemDescs.Add(float64(len(cds))) return cds, nil } @@ -645,10 +646,10 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap } for i, chunkDesc := range m.series.chunkDescs { if i < m.series.persistWatermark { - if _, err = codable.EncodeVarint(w, int64(chunkDesc.firstTime())); err != nil { + if _, err = codable.EncodeVarint(w, int64(chunkDesc.FirstTime())); err != nil { return } - lt, err := chunkDesc.lastTime() + lt, err := chunkDesc.LastTime() if err != nil { return } @@ -657,10 +658,10 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap } } else { // This is a non-persisted chunk. Fully marshal it. - if err = w.WriteByte(byte(chunkDesc.c.Encoding())); err != nil { + if err = w.WriteByte(byte(chunkDesc.C.Encoding())); err != nil { return } - if err = chunkDesc.c.Marshal(w); err != nil { + if err = chunkDesc.C.Marshal(w); err != nil { return } } @@ -751,7 +752,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in // Returning an error signals problems with the series file. In this case, the // caller should quarantine the series. func (p *persistence) dropAndPersistChunks( - fp model.Fingerprint, beforeTime model.Time, chunks []Chunk, + fp model.Fingerprint, beforeTime model.Time, chunks []chunk.Chunk, ) ( firstTimeNotDropped model.Time, offset int, @@ -879,7 +880,7 @@ func (p *persistence) dropAndPersistChunks( firstTimeNotDropped = model.Time( binary.LittleEndian.Uint64(headerBuf[chunkHeaderFirstTimeOffset:]), ) - chunkOps.WithLabelValues(drop).Add(float64(numDropped)) + chunk.Ops.WithLabelValues(chunk.Drop).Add(float64(numDropped)) _, err = f.Seek(-chunkHeaderLen, os.SEEK_CUR) if err != nil { return @@ -930,7 +931,7 @@ func (p *persistence) deleteSeriesFile(fp model.Fingerprint) (int, error) { if err := os.Remove(fname); err != nil { return -1, err } - chunkOps.WithLabelValues(drop).Add(float64(numChunks)) + chunk.Ops.WithLabelValues(chunk.Drop).Add(float64(numChunks)) return numChunks, nil } @@ -1500,7 +1501,7 @@ func (p *persistence) loadFPMappings() (fpMappings, model.Fingerprint, error) { return fpm, highestMappedFP, nil } -func (p *persistence) writeChunks(w io.Writer, chunks []Chunk) error { +func (p *persistence) writeChunks(w io.Writer, chunks []chunk.Chunk) error { b := p.bufPool.Get().([]byte) defer func() { // buf may change below. An unwrapped 'defer p.bufPool.Put(buf)' @@ -1547,7 +1548,7 @@ func chunkIndexForOffset(offset int64) (int, error) { return int(offset) / chunkLenWithHeader, nil } -func writeChunkHeader(header []byte, c Chunk) error { +func writeChunkHeader(header []byte, c chunk.Chunk) error { header[chunkHeaderTypeOffset] = byte(c.Encoding()) binary.LittleEndian.PutUint64( header[chunkHeaderFirstTimeOffset:], diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 51bb6a2d5..82cb18a2c 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage/local/chunk" "github.com/prometheus/prometheus/storage/local/codable" "github.com/prometheus/prometheus/storage/local/index" "github.com/prometheus/prometheus/util/testutil" @@ -38,8 +39,8 @@ var ( m5 = model.Metric{"label": "value5"} ) -func newTestPersistence(t *testing.T, encoding ChunkEncoding) (*persistence, testutil.Closer) { - DefaultChunkEncoding = encoding +func newTestPersistence(t *testing.T, encoding chunk.Encoding) (*persistence, testutil.Closer) { + chunk.DefaultEncoding = encoding dir := testutil.NewTemporaryDirectory("test_persistence", t) p, err := newPersistence(dir.Path(), false, false, func() bool { return false }, 0.1) if err != nil { @@ -53,18 +54,18 @@ func newTestPersistence(t *testing.T, encoding ChunkEncoding) (*persistence, tes }) } -func buildTestChunks(t *testing.T, encoding ChunkEncoding) map[model.Fingerprint][]Chunk { +func buildTestChunks(t *testing.T, encoding chunk.Encoding) map[model.Fingerprint][]chunk.Chunk { fps := model.Fingerprints{ m1.FastFingerprint(), m2.FastFingerprint(), m3.FastFingerprint(), } - fpToChunks := map[model.Fingerprint][]Chunk{} + fpToChunks := map[model.Fingerprint][]chunk.Chunk{} for _, fp := range fps { - fpToChunks[fp] = make([]Chunk, 0, 10) + fpToChunks[fp] = make([]chunk.Chunk, 0, 10) for i := 0; i < 10; i++ { - ch, err := NewChunkForEncoding(encoding) + ch, err := chunk.NewForEncoding(encoding) if err != nil { t.Fatal(err) } @@ -81,7 +82,7 @@ func buildTestChunks(t *testing.T, encoding ChunkEncoding) map[model.Fingerprint return fpToChunks } -func chunksEqual(c1, c2 Chunk) bool { +func chunksEqual(c1, c2 chunk.Chunk) bool { it1 := c1.NewIterator() it2 := c2.NewIterator() for it1.Scan() && it2.Scan() { @@ -92,7 +93,7 @@ func chunksEqual(c1, c2 Chunk) bool { return it1.Err() == nil && it2.Err() == nil } -func testPersistLoadDropChunks(t *testing.T, encoding ChunkEncoding) { +func testPersistLoadDropChunks(t *testing.T, encoding chunk.Encoding) { p, closer := newTestPersistence(t, encoding) defer closer.Close() @@ -138,14 +139,14 @@ func testPersistLoadDropChunks(t *testing.T, encoding ChunkEncoding) { t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 10) } for i, cd := range actualChunkDescs { - lastTime, err := cd.lastTime() + lastTime, err := cd.LastTime() if err != nil { t.Fatal(err) } - if cd.firstTime() != model.Time(i) || lastTime != model.Time(i) { + if cd.FirstTime() != model.Time(i) || lastTime != model.Time(i) { t.Errorf( "Want ts=%v, got firstTime=%v, lastTime=%v.", - i, cd.firstTime(), lastTime, + i, cd.FirstTime(), lastTime, ) } @@ -159,14 +160,14 @@ func testPersistLoadDropChunks(t *testing.T, encoding ChunkEncoding) { t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 5) } for i, cd := range actualChunkDescs { - lastTime, err := cd.lastTime() + lastTime, err := cd.LastTime() if err != nil { t.Fatal(err) } - if cd.firstTime() != model.Time(i) || lastTime != model.Time(i) { + if cd.FirstTime() != model.Time(i) || lastTime != model.Time(i) { t.Errorf( "Want ts=%v, got firstTime=%v, lastTime=%v.", - i, cd.firstTime(), lastTime, + i, cd.FirstTime(), lastTime, ) } @@ -450,7 +451,7 @@ func TestPersistLoadDropChunksType1(t *testing.T) { testPersistLoadDropChunks(t, 1) } -func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding) { +func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encoding) { p, closer := newTestPersistence(t, encoding) defer closer.Close() @@ -499,7 +500,7 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding if !reflect.DeepEqual(loadedS1.metric, m1) { t.Errorf("want metric %v, got %v", m1, loadedS1.metric) } - if !reflect.DeepEqual(loadedS1.head().c, s1.head().c) { + if !reflect.DeepEqual(loadedS1.head().C, s1.head().C) { t.Error("head chunks differ") } if loadedS1.chunkDescsOffset != 0 { @@ -508,11 +509,11 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding if loadedS1.headChunkClosed { t.Error("headChunkClosed is true") } - if loadedS1.head().chunkFirstTime != 1 { - t.Errorf("want chunkFirstTime in head chunk to be 1, got %d", loadedS1.head().chunkFirstTime) + if loadedS1.head().ChunkFirstTime != 1 { + t.Errorf("want ChunkFirstTime in head chunk to be 1, got %d", loadedS1.head().ChunkFirstTime) } - if loadedS1.head().chunkLastTime != model.Earliest { - t.Error("want chunkLastTime in head chunk to be unset") + if loadedS1.head().ChunkLastTime != model.Earliest { + t.Error("want ChunkLastTime in head chunk to be unset") } } else { t.Errorf("couldn't find %v in loaded map", m1) @@ -521,7 +522,7 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding if !reflect.DeepEqual(loadedS3.metric, m3) { t.Errorf("want metric %v, got %v", m3, loadedS3.metric) } - if loadedS3.head().c != nil { + if loadedS3.head().C != nil { t.Error("head chunk not evicted") } if loadedS3.chunkDescsOffset != 0 { @@ -530,11 +531,11 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding if !loadedS3.headChunkClosed { t.Error("headChunkClosed is false") } - if loadedS3.head().chunkFirstTime != 2 { - t.Errorf("want chunkFirstTime in head chunk to be 2, got %d", loadedS3.head().chunkFirstTime) + if loadedS3.head().ChunkFirstTime != 2 { + t.Errorf("want ChunkFirstTime in head chunk to be 2, got %d", loadedS3.head().ChunkFirstTime) } - if loadedS3.head().chunkLastTime != 2 { - t.Errorf("want chunkLastTime in head chunk to be 2, got %d", loadedS3.head().chunkLastTime) + if loadedS3.head().ChunkLastTime != 2 { + t.Errorf("want ChunkLastTime in head chunk to be 2, got %d", loadedS3.head().ChunkLastTime) } } else { t.Errorf("couldn't find %v in loaded map", m3) @@ -549,10 +550,10 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding if got, want := loadedS4.persistWatermark, 0; got != want { t.Errorf("got persistWatermark %d, want %d", got, want) } - if loadedS4.chunkDescs[2].isEvicted() { + if loadedS4.chunkDescs[2].IsEvicted() { t.Error("3rd chunk evicted") } - if loadedS4.chunkDescs[3].isEvicted() { + if loadedS4.chunkDescs[3].IsEvicted() { t.Error("4th chunk evicted") } if loadedS4.chunkDescsOffset != 0 { @@ -562,27 +563,27 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding t.Error("headChunkClosed is true") } for i, cd := range loadedS4.chunkDescs { - if cd.chunkFirstTime != cd.c.FirstTime() { + if cd.ChunkFirstTime != cd.C.FirstTime() { t.Errorf( - "chunkDesc[%d]: chunkFirstTime not consistent with chunk, want %d, got %d", - i, cd.c.FirstTime(), cd.chunkFirstTime, + "chunk.Desc[%d]: ChunkFirstTime not consistent with chunk, want %d, got %d", + i, cd.C.FirstTime(), cd.ChunkFirstTime, ) } if i == len(loadedS4.chunkDescs)-1 { // Head chunk. - if cd.chunkLastTime != model.Earliest { - t.Error("want chunkLastTime in head chunk to be unset") + if cd.ChunkLastTime != model.Earliest { + t.Error("want ChunkLastTime in head chunk to be unset") } continue } - lastTime, err := cd.c.NewIterator().LastTimestamp() + lastTime, err := cd.C.NewIterator().LastTimestamp() if err != nil { t.Fatal(err) } - if cd.chunkLastTime != lastTime { + if cd.ChunkLastTime != lastTime { t.Errorf( - "chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d", - i, lastTime, cd.chunkLastTime, + "chunk.Desc[%d]: ChunkLastTime not consistent with chunk, want %d, got %d", + i, lastTime, cd.ChunkLastTime, ) } } @@ -599,10 +600,10 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding if got, want := loadedS5.persistWatermark, 3; got != want { t.Errorf("got persistWatermark %d, want %d", got, want) } - if !loadedS5.chunkDescs[2].isEvicted() { + if !loadedS5.chunkDescs[2].IsEvicted() { t.Error("3rd chunk not evicted") } - if loadedS5.chunkDescs[3].isEvicted() { + if loadedS5.chunkDescs[3].IsEvicted() { t.Error("4th chunk evicted") } if loadedS5.chunkDescsOffset != 0 { @@ -614,32 +615,32 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding for i, cd := range loadedS5.chunkDescs { if i < 3 { // Evicted chunks. - if cd.chunkFirstTime == model.Earliest { - t.Errorf("chunkDesc[%d]: chunkLastTime not set", i) + if cd.ChunkFirstTime == model.Earliest { + t.Errorf("chunk.Desc[%d]: ChunkLastTime not set", i) } continue } - if cd.chunkFirstTime != cd.c.FirstTime() { + if cd.ChunkFirstTime != cd.C.FirstTime() { t.Errorf( - "chunkDesc[%d]: chunkFirstTime not consistent with chunk, want %d, got %d", - i, cd.c.FirstTime(), cd.chunkFirstTime, + "chunk.Desc[%d]: ChunkFirstTime not consistent with chunk, want %d, got %d", + i, cd.C.FirstTime(), cd.ChunkFirstTime, ) } if i == len(loadedS5.chunkDescs)-1 { // Head chunk. - if cd.chunkLastTime != model.Earliest { - t.Error("want chunkLastTime in head chunk to be unset") + if cd.ChunkLastTime != model.Earliest { + t.Error("want ChunkLastTime in head chunk to be unset") } continue } - lastTime, err := cd.c.NewIterator().LastTimestamp() + lastTime, err := cd.C.NewIterator().LastTimestamp() if err != nil { t.Fatal(err) } - if cd.chunkLastTime != lastTime { + if cd.ChunkLastTime != lastTime { t.Errorf( - "chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d", - i, cd.chunkLastTime, lastTime, + "chunk.Desc[%d]: ChunkLastTime not consistent with chunk, want %d, got %d", + i, cd.ChunkLastTime, lastTime, ) } } @@ -690,7 +691,7 @@ func TestCheckpointAndLoadFPMappings(t *testing.T) { } } -func testFingerprintsModifiedBefore(t *testing.T, encoding ChunkEncoding) { +func testFingerprintsModifiedBefore(t *testing.T, encoding chunk.Encoding) { p, closer := newTestPersistence(t, encoding) defer closer.Close() @@ -769,7 +770,7 @@ func TestFingerprintsModifiedBeforeChunkType2(t *testing.T) { testFingerprintsModifiedBefore(t, 2) } -func testDropArchivedMetric(t *testing.T, encoding ChunkEncoding) { +func testDropArchivedMetric(t *testing.T, encoding chunk.Encoding) { p, closer := newTestPersistence(t, encoding) defer closer.Close() @@ -843,7 +844,7 @@ type incrementalBatch struct { expectedLpToFps index.LabelPairFingerprintsMapping } -func testIndexing(t *testing.T, encoding ChunkEncoding) { +func testIndexing(t *testing.T, encoding chunk.Encoding) { batches := []incrementalBatch{ { fpToMetric: index.FingerprintMetricMapping{ diff --git a/storage/local/series.go b/storage/local/series.go index bbc7fbce7..9717cb0ea 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -20,13 +20,14 @@ import ( "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage/local/chunk" "github.com/prometheus/prometheus/storage/metric" ) const ( - // chunkDescEvictionFactor is a factor used for chunkDesc eviction (as opposed + // chunkDescEvictionFactor is a factor used for chunk.Desc eviction (as opposed // to evictions of chunks, see method evictOlderThan. A chunk takes about 20x - // more memory than a chunkDesc. With a chunkDescEvictionFactor of 10, not more + // more memory than a chunk.Desc. With a chunkDescEvictionFactor of 10, not more // than a third of the total memory taken by a series will be used for // chunkDescs. chunkDescEvictionFactor = 10 @@ -140,8 +141,8 @@ func (sm *seriesMap) fpIter() <-chan model.Fingerprint { type memorySeries struct { metric model.Metric // Sorted by start time, overlapping chunk ranges are forbidden. - chunkDescs []*ChunkDesc - // The index (within chunkDescs above) of the first chunkDesc that + chunkDescs []*chunk.Desc + // The index (within chunkDescs above) of the first chunk.Desc that // points to a non-persisted chunk. If all chunks are persisted, then // persistWatermark == len(chunkDescs). persistWatermark int @@ -151,7 +152,7 @@ type memorySeries struct { // The chunkDescs in memory might not have all the chunkDescs for the // chunks that are persisted to disk. The missing chunkDescs are all // contiguous and at the tail end. chunkDescsOffset is the index of the - // chunk on disk that corresponds to the first chunkDesc in memory. If + // chunk on disk that corresponds to the first chunk.Desc in memory. If // it is 0, the chunkDescs are all loaded. A value of -1 denotes a // special case: There are chunks on disk, but the offset to the // chunkDescs in memory is unknown. Also, in this special case, there is @@ -160,7 +161,7 @@ type memorySeries struct { // set). chunkDescsOffset int // The savedFirstTime field is used as a fallback when the - // chunkDescsOffset is not 0. It can be used to save the firstTime of the + // chunkDescsOffset is not 0. It can be used to save the FirstTime of the // first chunk before its chunk desc is evicted. In doubt, this field is // just set to the oldest possible timestamp. savedFirstTime model.Time @@ -193,13 +194,13 @@ type memorySeries struct { // set to model.Earliest. The zero value for modTime can be used if the // modification time of the series file is unknown (e.g. if this is a genuinely // new series). -func newMemorySeries(m model.Metric, chunkDescs []*ChunkDesc, modTime time.Time) (*memorySeries, error) { +func newMemorySeries(m model.Metric, chunkDescs []*chunk.Desc, modTime time.Time) (*memorySeries, error) { var err error firstTime := model.Earliest lastTime := model.Earliest if len(chunkDescs) > 0 { - firstTime = chunkDescs[0].firstTime() - if lastTime, err = chunkDescs[len(chunkDescs)-1].lastTime(); err != nil { + firstTime = chunkDescs[0].FirstTime() + if lastTime, err = chunkDescs[len(chunkDescs)-1].LastTime(); err != nil { return nil, err } } @@ -220,10 +221,10 @@ func newMemorySeries(m model.Metric, chunkDescs []*ChunkDesc, modTime time.Time) // The caller must have locked the fingerprint of the series. func (s *memorySeries) Add(v model.SamplePair) (int, error) { if len(s.chunkDescs) == 0 || s.headChunkClosed { - newHead := NewChunkDesc(NewChunk(), v.Timestamp) + newHead := chunk.NewDesc(chunk.New(), v.Timestamp) s.chunkDescs = append(s.chunkDescs, newHead) s.headChunkClosed = false - } else if s.headChunkUsedByIterator && s.head().refCount() > 1 { + } else if s.headChunkUsedByIterator && s.head().RefCount() > 1 { // We only need to clone the head chunk if the current head // chunk was used in an iterator at all and if the refCount is // still greater than the 1 we always have because the head @@ -233,10 +234,10 @@ func (s *memorySeries) Add(v model.SamplePair) (int, error) { // around and keep the head chunk pinned. We needed to track // pins by version of the head chunk, which is probably not // worth the effort. - chunkOps.WithLabelValues(clone).Inc() + chunk.Ops.WithLabelValues(chunk.Clone).Inc() // No locking needed here because a non-persisted head chunk can // not get evicted concurrently. - s.head().c = s.head().c.Clone() + s.head().C = s.head().C.Clone() s.headChunkUsedByIterator = false } @@ -244,15 +245,15 @@ func (s *memorySeries) Add(v model.SamplePair) (int, error) { if err != nil { return 0, err } - s.head().c = chunks[0] + s.head().C = chunks[0] for _, c := range chunks[1:] { - s.chunkDescs = append(s.chunkDescs, NewChunkDesc(c, c.FirstTime())) + s.chunkDescs = append(s.chunkDescs, chunk.NewDesc(c, c.FirstTime())) } // Populate lastTime of now-closed chunks. for _, cd := range s.chunkDescs[len(s.chunkDescs)-len(chunks) : len(s.chunkDescs)-1] { - cd.maybePopulateLastTime() + cd.MaybePopulateLastTime() } s.lastTime = v.Timestamp @@ -275,7 +276,7 @@ func (s *memorySeries) maybeCloseHeadChunk() bool { // Since we cannot modify the head chunk from now on, we // don't need to bother with cloning anymore. s.headChunkUsedByIterator = false - s.head().maybePopulateLastTime() + s.head().MaybePopulateLastTime() return true } return false @@ -287,14 +288,14 @@ func (s *memorySeries) maybeCloseHeadChunk() bool { func (s *memorySeries) evictChunkDescs(iOldestNotEvicted int) { lenToKeep := chunkDescEvictionFactor * (len(s.chunkDescs) - iOldestNotEvicted) if lenToKeep < len(s.chunkDescs) { - s.savedFirstTime = s.FirstTime() + s.savedFirstTime = s.firstTime() lenEvicted := len(s.chunkDescs) - lenToKeep s.chunkDescsOffset += lenEvicted s.persistWatermark -= lenEvicted - chunkDescOps.WithLabelValues(evict).Add(float64(lenEvicted)) - numMemChunkDescs.Sub(float64(lenEvicted)) + chunk.DescOps.WithLabelValues(chunk.Evict).Add(float64(lenEvicted)) + chunk.NumMemDescs.Sub(float64(lenEvicted)) s.chunkDescs = append( - make([]*ChunkDesc, 0, lenToKeep), + make([]*chunk.Desc, 0, lenToKeep), s.chunkDescs[lenEvicted:]..., ) s.dirty = true @@ -306,7 +307,7 @@ func (s *memorySeries) evictChunkDescs(iOldestNotEvicted int) { func (s *memorySeries) dropChunks(t model.Time) error { keepIdx := len(s.chunkDescs) for i, cd := range s.chunkDescs { - lt, err := cd.lastTime() + lt, err := cd.LastTime() if err != nil { return err } @@ -324,7 +325,7 @@ func (s *memorySeries) dropChunks(t model.Time) error { return nil } s.chunkDescs = append( - make([]*ChunkDesc, 0, len(s.chunkDescs)-keepIdx), + make([]*chunk.Desc, 0, len(s.chunkDescs)-keepIdx), s.chunkDescs[keepIdx:]..., ) s.persistWatermark -= keepIdx @@ -334,7 +335,7 @@ func (s *memorySeries) dropChunks(t model.Time) error { if s.chunkDescsOffset != -1 { s.chunkDescsOffset += keepIdx } - numMemChunkDescs.Sub(float64(keepIdx)) + chunk.NumMemDescs.Sub(float64(keepIdx)) s.dirty = true return nil } @@ -344,16 +345,16 @@ func (s *memorySeries) preloadChunks( indexes []int, fp model.Fingerprint, mss *MemorySeriesStorage, ) (SeriesIterator, error) { loadIndexes := []int{} - pinnedChunkDescs := make([]*ChunkDesc, 0, len(indexes)) + pinnedChunkDescs := make([]*chunk.Desc, 0, len(indexes)) for _, idx := range indexes { cd := s.chunkDescs[idx] pinnedChunkDescs = append(pinnedChunkDescs, cd) - cd.pin(mss.evictRequests) // Have to pin everything first to prevent immediate eviction on chunk loading. - if cd.isEvicted() { + cd.Pin(mss.evictRequests) // Have to pin everything first to prevent immediate eviction on chunk loading. + if cd.IsEvicted() { loadIndexes = append(loadIndexes, idx) } } - chunkOps.WithLabelValues(pin).Add(float64(len(pinnedChunkDescs))) + chunk.Ops.WithLabelValues(chunk.Pin).Add(float64(len(pinnedChunkDescs))) if len(loadIndexes) > 0 { if s.chunkDescsOffset == -1 { @@ -363,13 +364,13 @@ func (s *memorySeries) preloadChunks( if err != nil { // Unpin the chunks since we won't return them as pinned chunks now. for _, cd := range pinnedChunkDescs { - cd.unpin(mss.evictRequests) + cd.Unpin(mss.evictRequests) } - chunkOps.WithLabelValues(unpin).Add(float64(len(pinnedChunkDescs))) + chunk.Ops.WithLabelValues(chunk.Unpin).Add(float64(len(pinnedChunkDescs))) return nopIter, err } for i, c := range chunks { - s.chunkDescs[loadIndexes[i]].setChunk(c) + s.chunkDescs[loadIndexes[i]].SetChunk(c) } } @@ -394,19 +395,19 @@ func (s *memorySeries) preloadChunks( // // The caller must have locked the fingerprint of the memorySeries. func (s *memorySeries) NewIterator( - pinnedChunkDescs []*ChunkDesc, + pinnedChunkDescs []*chunk.Desc, quarantine func(error), - evictRequests chan<- evictRequest, + evictRequests chan<- chunk.EvictRequest, ) SeriesIterator { - chunks := make([]Chunk, 0, len(pinnedChunkDescs)) + chunks := make([]chunk.Chunk, 0, len(pinnedChunkDescs)) for _, cd := range pinnedChunkDescs { // It's OK to directly access cd.c here (without locking) as the // series FP is locked and the chunk is pinned. - chunks = append(chunks, cd.c) + chunks = append(chunks, cd.C) } return &memorySeriesIterator{ chunks: chunks, - chunkIts: make([]ChunkIterator, len(chunks)), + chunkIts: make([]chunk.Iterator, len(chunks)), quarantine: quarantine, metric: s.metric, pinnedChunkDescs: pinnedChunkDescs, @@ -429,7 +430,7 @@ func (s *memorySeries) preloadChunksForInstant( lastSample := s.lastSamplePair() if !through.Before(lastSample.Timestamp) && !from.After(lastSample.Timestamp) && - lastSample != ZeroSamplePair { + lastSample != model.ZeroSamplePair { iter := &boundedIterator{ it: &singleSampleSeriesIterator{ samplePair: lastSample, @@ -453,7 +454,7 @@ func (s *memorySeries) preloadChunksForRange( ) (SeriesIterator, error) { firstChunkDescTime := model.Latest if len(s.chunkDescs) > 0 { - firstChunkDescTime = s.chunkDescs[0].firstTime() + firstChunkDescTime = s.chunkDescs[0].FirstTime() } if s.chunkDescsOffset != 0 && from.Before(firstChunkDescTime) { cds, err := mss.loadChunkDescs(fp, s.persistWatermark) @@ -463,7 +464,7 @@ func (s *memorySeries) preloadChunksForRange( s.chunkDescs = append(cds, s.chunkDescs...) s.chunkDescsOffset = 0 s.persistWatermark += len(cds) - firstChunkDescTime = s.chunkDescs[0].firstTime() + firstChunkDescTime = s.chunkDescs[0].FirstTime() } if len(s.chunkDescs) == 0 || through.Before(firstChunkDescTime) { @@ -472,16 +473,16 @@ func (s *memorySeries) preloadChunksForRange( // Find first chunk with start time after "from". fromIdx := sort.Search(len(s.chunkDescs), func(i int) bool { - return s.chunkDescs[i].firstTime().After(from) + return s.chunkDescs[i].FirstTime().After(from) }) // Find first chunk with start time after "through". throughIdx := sort.Search(len(s.chunkDescs), func(i int) bool { - return s.chunkDescs[i].firstTime().After(through) + return s.chunkDescs[i].FirstTime().After(through) }) if fromIdx == len(s.chunkDescs) { // Even the last chunk starts before "from". Find out if the // series ends before "from" and we don't need to do anything. - lt, err := s.chunkDescs[len(s.chunkDescs)-1].lastTime() + lt, err := s.chunkDescs[len(s.chunkDescs)-1].LastTime() if err != nil { return nopIter, err } @@ -506,22 +507,22 @@ func (s *memorySeries) preloadChunksForRange( // head returns a pointer to the head chunk descriptor. The caller must have // locked the fingerprint of the memorySeries. This method will panic if this // series has no chunk descriptors. -func (s *memorySeries) head() *ChunkDesc { +func (s *memorySeries) head() *chunk.Desc { return s.chunkDescs[len(s.chunkDescs)-1] } // firstTime returns the timestamp of the first sample in the series. // // The caller must have locked the fingerprint of the memorySeries. -func (s *memorySeries) FirstTime() model.Time { +func (s *memorySeries) firstTime() model.Time { if s.chunkDescsOffset == 0 && len(s.chunkDescs) > 0 { - return s.chunkDescs[0].firstTime() + return s.chunkDescs[0].FirstTime() } return s.savedFirstTime } // lastSamplePair returns the last ingested SamplePair. It returns -// ZeroSamplePair if this memorySeries has never received a sample (via the add +// model.ZeroSamplePair if this memorySeries has never received a sample (via the add // method), which is the case for freshly unarchived series or newly created // ones and also for all series after a server restart. However, in that case, // series will most likely be considered stale anyway. @@ -529,7 +530,7 @@ func (s *memorySeries) FirstTime() model.Time { // The caller must have locked the fingerprint of the memorySeries. func (s *memorySeries) lastSamplePair() model.SamplePair { if !s.lastSampleValueSet { - return ZeroSamplePair + return model.ZeroSamplePair } return model.SamplePair{ Timestamp: s.lastTime, @@ -543,7 +544,7 @@ func (s *memorySeries) lastSamplePair() model.SamplePair { // accordingly. // // The caller must have locked the fingerprint of the series. -func (s *memorySeries) chunksToPersist() []*ChunkDesc { +func (s *memorySeries) chunksToPersist() []*chunk.Desc { newWatermark := len(s.chunkDescs) if !s.headChunkClosed { newWatermark-- @@ -559,20 +560,20 @@ func (s *memorySeries) chunksToPersist() []*ChunkDesc { // memorySeriesIterator implements SeriesIterator. type memorySeriesIterator struct { - // Last chunkIterator used by ValueAtOrBeforeTime. - chunkIt ChunkIterator + // Last chunk.Iterator used by ValueAtOrBeforeTime. + chunkIt chunk.Iterator // Caches chunkIterators. - chunkIts []ChunkIterator + chunkIts []chunk.Iterator // The actual sample chunks. - chunks []Chunk + chunks []chunk.Chunk // Call to quarantine the series this iterator belongs to. quarantine func(error) // The metric corresponding to the iterator. metric model.Metric // Chunks that were pinned for this iterator. - pinnedChunkDescs []*ChunkDesc + pinnedChunkDescs []*chunk.Desc // Where to send evict requests when unpinning pinned chunks. - evictRequests chan<- evictRequest + evictRequests chan<- chunk.EvictRequest } // ValueAtOrBeforeTime implements SeriesIterator. @@ -582,7 +583,7 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa containsT, err := it.chunkIt.Contains(t) if err != nil { it.quarantine(err) - return ZeroSamplePair + return model.ZeroSamplePair } if containsT { if it.chunkIt.FindAtOrBefore(t) { @@ -591,12 +592,12 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa if it.chunkIt.Err() != nil { it.quarantine(it.chunkIt.Err()) } - return ZeroSamplePair + return model.ZeroSamplePair } } if len(it.chunks) == 0 { - return ZeroSamplePair + return model.ZeroSamplePair } // Find the last chunk where FirstTime() is before or equal to t. @@ -606,7 +607,7 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa }) if i == len(it.chunks) { // Even the first chunk starts after t. - return ZeroSamplePair + return model.ZeroSamplePair } it.chunkIt = it.chunkIterator(l - i) if it.chunkIt.FindAtOrBefore(t) { @@ -615,7 +616,7 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa if it.chunkIt.Err() != nil { it.quarantine(it.chunkIt.Err()) } - return ZeroSamplePair + return model.ZeroSamplePair } // RangeValues implements SeriesIterator. @@ -642,7 +643,7 @@ func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePa if c.FirstTime().After(in.NewestInclusive) { break } - chValues, err := rangeValues(it.chunkIterator(i+j), in) + chValues, err := chunk.RangeValues(it.chunkIterator(i+j), in) if err != nil { it.quarantine(err) return nil @@ -656,9 +657,9 @@ func (it *memorySeriesIterator) Metric() metric.Metric { return metric.Metric{Metric: it.metric} } -// chunkIterator returns the chunkIterator for the chunk at position i (and +// chunkIterator returns the chunk.Iterator for the chunk at position i (and // creates it if needed). -func (it *memorySeriesIterator) chunkIterator(i int) ChunkIterator { +func (it *memorySeriesIterator) chunkIterator(i int) chunk.Iterator { chunkIt := it.chunkIts[i] if chunkIt == nil { chunkIt = it.chunks[i].NewIterator() @@ -669,9 +670,9 @@ func (it *memorySeriesIterator) chunkIterator(i int) ChunkIterator { func (it *memorySeriesIterator) Close() { for _, cd := range it.pinnedChunkDescs { - cd.unpin(it.evictRequests) + cd.Unpin(it.evictRequests) } - chunkOps.WithLabelValues(unpin).Add(float64(len(it.pinnedChunkDescs))) + chunk.Ops.WithLabelValues(chunk.Unpin).Add(float64(len(it.pinnedChunkDescs))) } // singleSampleSeriesIterator implements Series Iterator. It is a "shortcut @@ -685,7 +686,7 @@ type singleSampleSeriesIterator struct { // ValueAtTime implements SeriesIterator. func (it *singleSampleSeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair { if it.samplePair.Timestamp.After(t) { - return ZeroSamplePair + return model.ZeroSamplePair } return it.samplePair } @@ -711,7 +712,7 @@ type nopSeriesIterator struct{} // ValueAtTime implements SeriesIterator. func (i nopSeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair { - return ZeroSamplePair + return model.ZeroSamplePair } // RangeValues implements SeriesIterator. diff --git a/storage/local/storage.go b/storage/local/storage.go index 2a1efca65..603d38dfb 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -29,13 +29,13 @@ import ( "github.com/prometheus/common/model" "golang.org/x/net/context" + "github.com/prometheus/prometheus/storage/local/chunk" "github.com/prometheus/prometheus/storage/metric" ) const ( evictRequestsCap = 1024 quarantineRequestsCap = 1024 - chunkLen = 1024 // See waitForNextFP. fpMaxSweepTime = 6 * time.Hour @@ -89,11 +89,6 @@ var ( ) ) -type evictRequest struct { - cd *ChunkDesc - evict bool -} - type quarantineRequest struct { fp model.Fingerprint metric model.Metric @@ -171,7 +166,7 @@ type MemorySeriesStorage struct { mapper *fpMapper evictList *list.List - evictRequests chan evictRequest + evictRequests chan chunk.EvictRequest evictStopping, evictStopped chan struct{} quarantineRequests chan quarantineRequest @@ -226,7 +221,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *MemorySeriesStorage maxChunksToPersist: o.MaxChunksToPersist, evictList: list.New(), - evictRequests: make(chan evictRequest, evictRequestsCap), + evictRequests: make(chan chunk.EvictRequest, evictRequestsCap), evictStopping: make(chan struct{}), evictStopped: make(chan struct{}), @@ -457,7 +452,7 @@ type boundedIterator struct { // ValueAtOrBeforeTime implements the SeriesIterator interface. func (bit *boundedIterator) ValueAtOrBeforeTime(ts model.Time) model.SamplePair { if ts < bit.start { - return ZeroSamplePair + return model.ZeroSamplePair } return bit.it.ValueAtOrBeforeTime(ts) } @@ -662,7 +657,7 @@ func (s *MemorySeriesStorage) metricForRange( ) (model.Metric, *memorySeries, bool) { series, ok := s.fpToSeries.get(fp) if ok { - if series.lastTime.Before(from) || series.FirstTime().After(through) { + if series.lastTime.Before(from) || series.firstTime().After(through) { return nil, nil, false } return series.metric, series, true @@ -779,7 +774,7 @@ func (s *MemorySeriesStorage) Append(sample *model.Sample) error { // NeedsThrottling implements Storage. func (s *MemorySeriesStorage) NeedsThrottling() bool { if s.getNumChunksToPersist() > s.maxChunksToPersist || - float64(atomic.LoadInt64(&numMemChunks)) > float64(s.maxMemoryChunks)*toleranceFactorMemChunks { + float64(atomic.LoadInt64(&chunk.NumMemChunks)) > float64(s.maxMemoryChunks)*toleranceFactorMemChunks { select { case s.throttled <- struct{}{}: default: // Do nothing, signal already pending. @@ -813,7 +808,7 @@ func (s *MemorySeriesStorage) logThrottling() { log. With("chunksToPersist", s.getNumChunksToPersist()). With("maxChunksToPersist", s.maxChunksToPersist). - With("memoryChunks", atomic.LoadInt64(&numMemChunks)). + With("memoryChunks", atomic.LoadInt64(&chunk.NumMemChunks)). With("maxToleratedMemChunks", int(float64(s.maxMemoryChunks)*toleranceFactorMemChunks)). Error("Storage needs throttling. Scrapes and rule evaluations will be skipped.") } @@ -821,7 +816,7 @@ func (s *MemorySeriesStorage) logThrottling() { log. With("chunksToPersist", s.getNumChunksToPersist()). With("maxChunksToPersist", s.maxChunksToPersist). - With("memoryChunks", atomic.LoadInt64(&numMemChunks)). + With("memoryChunks", atomic.LoadInt64(&chunk.NumMemChunks)). With("maxToleratedMemChunks", int(float64(s.maxMemoryChunks)*toleranceFactorMemChunks)). Info("Storage does not need throttling anymore.") case <-s.loopStopping: @@ -833,7 +828,7 @@ func (s *MemorySeriesStorage) logThrottling() { func (s *MemorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) (*memorySeries, error) { series, ok := s.fpToSeries.get(fp) if !ok { - var cds []*ChunkDesc + var cds []*chunk.Desc var modTime time.Time unarchived, err := s.persistence.unarchiveMetric(fp) if err != nil { @@ -842,9 +837,9 @@ func (s *MemorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me } if unarchived { s.seriesOps.WithLabelValues(unarchive).Inc() - // We have to load chunkDescs anyway to do anything with + // We have to load chunk.Descs anyway to do anything with // the series, so let's do it right now so that we don't - // end up with a series without any chunkDescs for a + // end up with a series without any chunk.Descs for a // while (which is confusing as it makes the series // appear as archived or purged). cds, err = s.loadChunkDescs(fp, 0) @@ -936,17 +931,17 @@ func (s *MemorySeriesStorage) handleEvictList() { // evict run is more than maxMemoryChunks/1000. select { case req := <-s.evictRequests: - if req.evict { - req.cd.evictListElement = s.evictList.PushBack(req.cd) + if req.Evict { + req.Desc.EvictListElement = s.evictList.PushBack(req.Desc) count++ if count > s.maxMemoryChunks/1000 { s.maybeEvict() count = 0 } } else { - if req.cd.evictListElement != nil { - s.evictList.Remove(req.cd.evictListElement) - req.cd.evictListElement = nil + if req.Desc.EvictListElement != nil { + s.evictList.Remove(req.Desc.EvictListElement) + req.Desc.EvictListElement = nil } } case <-ticker.C: @@ -971,39 +966,39 @@ func (s *MemorySeriesStorage) handleEvictList() { // maybeEvict is a local helper method. Must only be called by handleEvictList. func (s *MemorySeriesStorage) maybeEvict() { - numChunksToEvict := int(atomic.LoadInt64(&numMemChunks)) - s.maxMemoryChunks + numChunksToEvict := int(atomic.LoadInt64(&chunk.NumMemChunks)) - s.maxMemoryChunks if numChunksToEvict <= 0 { return } - chunkDescsToEvict := make([]*ChunkDesc, numChunksToEvict) + chunkDescsToEvict := make([]*chunk.Desc, numChunksToEvict) for i := range chunkDescsToEvict { e := s.evictList.Front() if e == nil { break } - cd := e.Value.(*ChunkDesc) - cd.evictListElement = nil + cd := e.Value.(*chunk.Desc) + cd.EvictListElement = nil chunkDescsToEvict[i] = cd s.evictList.Remove(e) } // Do the actual eviction in a goroutine as we might otherwise deadlock, - // in the following way: A chunk was unpinned completely and therefore + // in the following way: A chunk was Unpinned completely and therefore // scheduled for eviction. At the time we actually try to evict it, // another goroutine is pinning the chunk. The pinning goroutine has // currently locked the chunk and tries to send the evict request (to // remove the chunk from the evict list) to the evictRequests // channel. The send blocks because evictRequests is full. However, the // goroutine that is supposed to empty the channel is waiting for the - // chunkDesc lock to try to evict the chunk. + // Chunk.Desc lock to try to evict the chunk. go func() { for _, cd := range chunkDescsToEvict { if cd == nil { break } - cd.maybeEvict() + cd.MaybeEvict() // We don't care if the eviction succeeds. If the chunk // was pinned in the meantime, it will be added to the - // evict list once it gets unpinned again. + // evict list once it gets Unpinned again. } }() } @@ -1224,7 +1219,7 @@ loop: // Next, the method checks if all chunks in the series are evicted. In that // case, it archives the series and returns true. // -// Finally, it evicts chunkDescs if there are too many. +// Finally, it evicts chunk.Descs if there are too many. func (s *MemorySeriesStorage) maintainMemorySeries( fp model.Fingerprint, beforeTime model.Time, ) (becameDirty bool) { @@ -1258,7 +1253,7 @@ func (s *MemorySeriesStorage) maintainMemorySeries( iOldestNotEvicted := -1 for i, cd := range series.chunkDescs { - if !cd.isEvicted() { + if !cd.IsEvicted() { iOldestNotEvicted = i break } @@ -1269,7 +1264,7 @@ func (s *MemorySeriesStorage) maintainMemorySeries( if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > headChunkTimeout { s.fpToSeries.del(fp) s.numSeries.Dec() - s.persistence.archiveMetric(fp, series.metric, series.FirstTime(), series.lastTime) + s.persistence.archiveMetric(fp, series.metric, series.firstTime(), series.lastTime) s.seriesOps.WithLabelValues(archive).Inc() oldWatermark := atomic.LoadInt64((*int64)(&s.archiveHighWatermark)) if oldWatermark < int64(series.lastTime) { @@ -1282,7 +1277,7 @@ func (s *MemorySeriesStorage) maintainMemorySeries( } return } - // If we are here, the series is not archived, so check for chunkDesc + // If we are here, the series is not archived, so check for Chunk.Desc // eviction next. series.evictChunkDescs(iOldestNotEvicted) @@ -1316,21 +1311,21 @@ func (s *MemorySeriesStorage) writeMemorySeries( // that belong to a series that is scheduled for quarantine // anyway. for _, cd := range cds { - cd.unpin(s.evictRequests) + cd.Unpin(s.evictRequests) } s.incNumChunksToPersist(-len(cds)) - chunkOps.WithLabelValues(persistAndUnpin).Add(float64(len(cds))) + chunk.Ops.WithLabelValues(chunk.PersistAndUnpin).Add(float64(len(cds))) series.modTime = s.persistence.seriesFileModTime(fp) }() - // Get the actual chunks from underneath the chunkDescs. + // Get the actual chunks from underneath the chunk.Descs. // No lock required as chunks still to persist cannot be evicted. - chunks := make([]Chunk, len(cds)) + chunks := make([]chunk.Chunk, len(cds)) for i, cd := range cds { - chunks[i] = cd.c + chunks[i] = cd.C } - if !series.FirstTime().Before(beforeTime) { + if !series.firstTime().Before(beforeTime) { // Oldest sample not old enough, just append chunks, if any. if len(cds) == 0 { return false @@ -1413,12 +1408,12 @@ func (s *MemorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, befor } // See persistence.loadChunks for detailed explanation. -func (s *MemorySeriesStorage) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]Chunk, error) { +func (s *MemorySeriesStorage) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]chunk.Chunk, error) { return s.persistence.loadChunks(fp, indexes, indexOffset) } // See persistence.loadChunkDescs for detailed explanation. -func (s *MemorySeriesStorage) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*ChunkDesc, error) { +func (s *MemorySeriesStorage) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*chunk.Desc, error) { return s.persistence.loadChunkDescs(fp, offsetFromEnd) } @@ -1468,7 +1463,7 @@ func (s *MemorySeriesStorage) calculatePersistenceUrgencyScore() float64 { var ( chunksToPersist = float64(s.getNumChunksToPersist()) maxChunksToPersist = float64(s.maxChunksToPersist) - memChunks = float64(atomic.LoadInt64(&numMemChunks)) + memChunks = float64(atomic.LoadInt64(&chunk.NumMemChunks)) maxMemChunks = float64(s.maxMemoryChunks) ) score := chunksToPersist / maxChunksToPersist @@ -1578,12 +1573,12 @@ func (s *MemorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric, s.numSeries.Dec() m = series.metric - // Adjust s.numChunksToPersist and numMemChunks down by + // Adjust s.numChunksToPersist and chunk.NumMemChunks down by // the number of chunks in this series that are not // persisted yet. Persisted chunks will be deducted from - // numMemChunks upon eviction. + // chunk.NumMemChunks upon eviction. numChunksNotYetPersisted := len(series.chunkDescs) - series.persistWatermark - atomic.AddInt64(&numMemChunks, int64(-numChunksNotYetPersisted)) + atomic.AddInt64(&chunk.NumMemChunks, int64(-numChunksNotYetPersisted)) if !series.headChunkClosed { // Head chunk wasn't counted as waiting for persistence yet. // (But it was counted as a chunk in memory.) @@ -1641,7 +1636,7 @@ func (s *MemorySeriesStorage) Describe(ch chan<- *prometheus.Desc) { ch <- s.ingestedSamplesCount.Desc() s.discardedSamplesCount.Describe(ch) ch <- s.nonExistentSeriesMatchesCount.Desc() - ch <- numMemChunksDesc + ch <- chunk.NumMemChunksDesc s.maintainSeriesDuration.Describe(ch) ch <- s.persistenceUrgencyScore.Desc() ch <- s.rushedMode.Desc() @@ -1669,9 +1664,9 @@ func (s *MemorySeriesStorage) Collect(ch chan<- prometheus.Metric) { s.discardedSamplesCount.Collect(ch) ch <- s.nonExistentSeriesMatchesCount ch <- prometheus.MustNewConstMetric( - numMemChunksDesc, + chunk.NumMemChunksDesc, prometheus.GaugeValue, - float64(atomic.LoadInt64(&numMemChunks)), + float64(atomic.LoadInt64(&chunk.NumMemChunks)), ) s.maintainSeriesDuration.Collect(ch) ch <- s.persistenceUrgencyScore diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index a52689f70..361144f70 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -27,6 +27,7 @@ import ( "github.com/prometheus/common/model" "golang.org/x/net/context" + "github.com/prometheus/prometheus/storage/local/chunk" "github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/util/testutil" ) @@ -68,7 +69,7 @@ func TestMatches(t *testing.T) { t.Fatal("could not retrieve series for fp", fp) } storage.fpLocker.Lock(fp) - storage.persistence.archiveMetric(fp, s.metric, s.FirstTime(), s.lastTime) + storage.persistence.archiveMetric(fp, s.metric, s.firstTime(), s.lastTime) storage.fpLocker.Unlock(fp) } @@ -785,7 +786,7 @@ func TestLoop(t *testing.T) { } } -func testChunk(t *testing.T, encoding ChunkEncoding) { +func testChunk(t *testing.T, encoding chunk.Encoding) { samples := make(model.Samples, 500000) for i := range samples { samples[i] = &model.Sample{ @@ -806,10 +807,10 @@ func testChunk(t *testing.T, encoding ChunkEncoding) { defer s.fpLocker.Unlock(m.fp) // TODO remove, see below var values []model.SamplePair for _, cd := range m.series.chunkDescs { - if cd.isEvicted() { + if cd.IsEvicted() { continue } - it := cd.c.NewIterator() + it := cd.C.NewIterator() for it.Scan() { values = append(values, it.Value()) } @@ -843,7 +844,7 @@ func TestChunkType2(t *testing.T) { testChunk(t, 2) } -func testValueAtOrBeforeTime(t *testing.T, encoding ChunkEncoding) { +func testValueAtOrBeforeTime(t *testing.T, encoding chunk.Encoding) { samples := make(model.Samples, 10000) for i := range samples { samples[i] = &model.Sample{ @@ -921,7 +922,7 @@ func TestValueAtTimeChunkType2(t *testing.T) { testValueAtOrBeforeTime(t, 2) } -func benchmarkValueAtOrBeforeTime(b *testing.B, encoding ChunkEncoding) { +func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunk.Encoding) { samples := make(model.Samples, 10000) for i := range samples { samples[i] = &model.Sample{ @@ -1003,7 +1004,7 @@ func BenchmarkValueAtTimeChunkType2(b *testing.B) { benchmarkValueAtOrBeforeTime(b, 2) } -func testRangeValues(t *testing.T, encoding ChunkEncoding) { +func testRangeValues(t *testing.T, encoding chunk.Encoding) { samples := make(model.Samples, 10000) for i := range samples { samples[i] = &model.Sample{ @@ -1159,7 +1160,7 @@ func TestRangeValuesChunkType2(t *testing.T) { testRangeValues(t, 2) } -func benchmarkRangeValues(b *testing.B, encoding ChunkEncoding) { +func benchmarkRangeValues(b *testing.B, encoding chunk.Encoding) { samples := make(model.Samples, 10000) for i := range samples { samples[i] = &model.Sample{ @@ -1207,7 +1208,7 @@ func BenchmarkRangeValuesChunkType2(b *testing.B) { benchmarkRangeValues(b, 2) } -func testEvictAndPurgeSeries(t *testing.T, encoding ChunkEncoding) { +func testEvictAndPurgeSeries(t *testing.T, encoding chunk.Encoding) { samples := make(model.Samples, 10000) for i := range samples { samples[i] = &model.Sample{ @@ -1271,11 +1272,11 @@ func testEvictAndPurgeSeries(t *testing.T, encoding ChunkEncoding) { // Archive metrics. s.fpToSeries.del(fp) - lastTime, err := series.head().lastTime() + lastTime, err := series.head().LastTime() if err != nil { t.Fatal(err) } - s.persistence.archiveMetric(fp, series.metric, series.FirstTime(), lastTime) + s.persistence.archiveMetric(fp, series.metric, series.firstTime(), lastTime) archived, _, _ := s.persistence.hasArchivedMetric(fp) if !archived { t.Fatal("not archived") @@ -1312,11 +1313,11 @@ func testEvictAndPurgeSeries(t *testing.T, encoding ChunkEncoding) { // Archive metrics. s.fpToSeries.del(fp) - lastTime, err = series.head().lastTime() + lastTime, err = series.head().LastTime() if err != nil { t.Fatal(err) } - s.persistence.archiveMetric(fp, series.metric, series.FirstTime(), lastTime) + s.persistence.archiveMetric(fp, series.metric, series.firstTime(), lastTime) archived, _, _ = s.persistence.hasArchivedMetric(fp) if !archived { t.Fatal("not archived") @@ -1362,7 +1363,7 @@ func TestEvictAndPurgeSeriesChunkType2(t *testing.T) { testEvictAndPurgeSeries(t, 2) } -func testEvictAndLoadChunkDescs(t *testing.T, encoding ChunkEncoding) { +func testEvictAndLoadChunkDescs(t *testing.T, encoding chunk.Encoding) { samples := make(model.Samples, 10000) for i := range samples { samples[i] = &model.Sample{ @@ -1401,7 +1402,7 @@ func testEvictAndLoadChunkDescs(t *testing.T, encoding ChunkEncoding) { s.maintainMemorySeries(fp, 0) // Give the evict goroutine an opportunity to run. time.Sleep(250 * time.Millisecond) - // Maintain series again to trigger chunkDesc eviction + // Maintain series again to trigger chunk.Desc eviction. s.maintainMemorySeries(fp, 0) if oldLen <= len(series.chunkDescs) { @@ -1421,7 +1422,7 @@ func testEvictAndLoadChunkDescs(t *testing.T, encoding ChunkEncoding) { s.maintainMemorySeries(fp, 100000) if len(series.chunkDescs) != 1 { - t.Errorf("Expected exactly one chunkDesc left, got %d.", len(series.chunkDescs)) + t.Errorf("Expected exactly one chunk.Desc left, got %d.", len(series.chunkDescs)) } } @@ -1433,7 +1434,7 @@ func TestEvictAndLoadChunkDescsType1(t *testing.T) { testEvictAndLoadChunkDescs(t, 1) } -func benchmarkAppend(b *testing.B, encoding ChunkEncoding) { +func benchmarkAppend(b *testing.B, encoding chunk.Encoding) { samples := make(model.Samples, b.N) for i := range samples { samples[i] = &model.Sample{ @@ -1469,7 +1470,7 @@ func BenchmarkAppendType2(b *testing.B) { // Append a large number of random samples and then check if we can get them out // of the storage alright. -func testFuzz(t *testing.T, encoding ChunkEncoding) { +func testFuzz(t *testing.T, encoding chunk.Encoding) { if testing.Short() { t.Skip("Skipping test in short mode.") } @@ -1517,8 +1518,8 @@ func TestFuzzChunkType2(t *testing.T) { // make things even slower): // // go test -race -cpu 8 -short -bench BenchmarkFuzzChunkType -func benchmarkFuzz(b *testing.B, encoding ChunkEncoding) { - DefaultChunkEncoding = encoding +func benchmarkFuzz(b *testing.B, encoding chunk.Encoding) { + chunk.DefaultEncoding = encoding const samplesPerRun = 100000 rand.Seed(42) directory := testutil.NewTemporaryDirectory("test_storage", b) diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go index 48beef0cd..e28cf19ff 100644 --- a/storage/local/test_helpers.go +++ b/storage/local/test_helpers.go @@ -22,6 +22,7 @@ import ( "time" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage/local/chunk" "github.com/prometheus/prometheus/util/testutil" ) @@ -40,8 +41,8 @@ func (t *testStorageCloser) Close() { // NewTestStorage creates a storage instance backed by files in a temporary // directory. The returned storage is already in serving state. Upon closing the // returned test.Closer, the temporary directory is cleaned up. -func NewTestStorage(t testutil.T, encoding ChunkEncoding) (*MemorySeriesStorage, testutil.Closer) { - DefaultChunkEncoding = encoding +func NewTestStorage(t testutil.T, encoding chunk.Encoding) (*MemorySeriesStorage, testutil.Closer) { + chunk.DefaultEncoding = encoding directory := testutil.NewTemporaryDirectory("test_storage", t) o := &MemorySeriesStorageOptions{ MemoryChunks: 1000000, diff --git a/vendor/github.com/prometheus/common/model/value.go b/vendor/github.com/prometheus/common/model/value.go index dbf5d10e4..7728abaee 100644 --- a/vendor/github.com/prometheus/common/model/value.go +++ b/vendor/github.com/prometheus/common/model/value.go @@ -22,6 +22,22 @@ import ( "strings" ) +var ( + // ZeroSamplePair is the pseudo zero-value of SamplePair used to signal a + // non-existing sample pair. It is a SamplePair with timestamp Earliest and + // value 0.0. Note that the natural zero value of SamplePair has a timestamp + // of 0, which is possible to appear in a real SamplePair and thus not + // suitable to signal a non-existing SamplePair. + ZeroSamplePair = SamplePair{Timestamp: Earliest} + + // ZeroSample is the pseudo zero-value of Sample used to signal a + // non-existing sample. It is a Sample with timestamp Earliest, value 0.0, + // and metric nil. Note that the natural zero value of Sample has a timestamp + // of 0, which is possible to appear in a real Sample and thus not suitable + // to signal a non-existing Sample. + ZeroSample = Sample{Timestamp: Earliest} +) + // A SampleValue is a representation of a value for a given sample at a given // time. type SampleValue float64 diff --git a/vendor/vendor.json b/vendor/vendor.json index de46d9743..1077dfd67 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -205,8 +205,8 @@ { "checksumSHA1": "Jx0GXl5hGnO25s3ryyvtdWHdCpw=", "path": "github.com/prometheus/common/model", - "revision": "4402f4e5ea79ec15f3c574773b6a5198fbea215f", - "revisionTime": "2016-06-23T15:14:27Z" + "revision": "e35a2e33a50a7d756c7afdfaf609f93905a0c111", + "revisionTime": "2016-09-28T14:38:18+02:00" }, { "checksumSHA1": "CKVJRc1NREmfoAWQLHxqWQlvxo0=",