storage: separate chunk package, publish more names

This is a followup to https://github.com/prometheus/prometheus/pull/2011.

This publishes more of the chunk code's methods and other names, and
moves the chunk code into its own package. There's some unavoidable
ugliness: the chunk and chunkDesc metrics are used by both packages, so
I had to move them into the chunk package. That isn't great, but I don't
see how to do it better without a larger redesign of everything. The
same goes for the evict requests and some other types.
Julius Volz 2016-09-21 23:44:27 +02:00
parent 42c05dd3a2
commit ab80ced756
19 changed files with 469 additions and 425 deletions
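
For orientation, the renames in this commit follow one pattern: the chunk code's identifiers lose their Chunk prefix (the package name now carries it), and previously unexported members become exported. A few representative mappings, taken from the diff below:

    local.DefaultChunkEncoding   ->  chunk.DefaultEncoding
    local.ChunkDesc              ->  chunk.Desc
    (cd *ChunkDesc).pin / unpin  ->  (cd *Desc).Pin / Unpin
    chunkOps, numMemChunks       ->  chunk.ChunkOps, chunk.NumMemChunks
    chunkLen                     ->  chunk.ChunkLen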


@ -30,6 +30,7 @@ import (
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/storage/local/chunk"
"github.com/prometheus/prometheus/storage/local/index"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/web"
@ -149,7 +150,7 @@ func init() {
"If set, a crash recovery will perform checks on each series file. This might take a very long time.",
)
cfg.fs.Var(
&local.DefaultChunkEncoding, "storage.local.chunk-encoding-version",
&chunk.DefaultEncoding, "storage.local.chunk-encoding-version",
"Which chunk encoding version to use for newly created chunks. Currently supported is 0 (delta encoding), 1 (double-delta encoding), and 2 (double-delta encoding with variable bit-width).",
)
// Index cache sizes.
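
The flag's observable behavior is unchanged by the rename; only the Go-level name of its backing variable moved. A hypothetical invocation (the flag name is real per the hunk above, the command line is illustrative):

    ./prometheus -storage.local.chunk-encoding-version=2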


@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package local
package chunk
import (
"container/list"
@ -27,21 +27,30 @@ import (
"github.com/prometheus/prometheus/storage/metric"
)
// DefaultChunkEncoding can be changed via a flag.
var DefaultChunkEncoding = DoubleDelta
// ChunkLen is the length of a chunk in bytes.
const ChunkLen = 1024
// DefaultEncoding can be changed via a flag.
var DefaultEncoding = DoubleDelta
var errChunkBoundsExceeded = errors.New("attempted access outside of chunk boundaries")
// ChunkEncoding defintes which encoding we are using, delta, doubledelta, or varbit
type ChunkEncoding byte
// EvictRequest is a request to evict a chunk from memory.
type EvictRequest struct {
CD *Desc
Evict bool
}
// Encoding defines which encoding we are using, delta, doubledelta, or varbit
type Encoding byte
// String implements flag.Value.
func (ce ChunkEncoding) String() string {
func (ce Encoding) String() string {
return fmt.Sprintf("%d", ce)
}
// Set implements flag.Value.
func (ce *ChunkEncoding) Set(s string) error {
func (ce *Encoding) Set(s string) error {
switch s {
case "0":
*ce = Delta
@ -57,96 +66,96 @@ func (ce *ChunkEncoding) Set(s string) error {
const (
// Delta encoding
Delta ChunkEncoding = iota
Delta Encoding = iota
// DoubleDelta encoding
DoubleDelta
// Varbit encoding
Varbit
)
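
For readers unfamiliar with the mechanism: String and Set together satisfy the standard library's flag.Value interface, which is what allows &chunk.DefaultEncoding to be handed to cfg.fs.Var in the first diff hunk above. A self-contained sketch of the same pattern (the main function and flag name here are invented for illustration):

package main

import (
	"flag"
	"fmt"
)

type Encoding byte

const (
	Delta Encoding = iota
	DoubleDelta
	Varbit
)

// String implements flag.Value.
func (e Encoding) String() string { return fmt.Sprintf("%d", e) }

// Set implements flag.Value, mirroring the switch in the diff above.
func (e *Encoding) Set(s string) error {
	switch s {
	case "0":
		*e = Delta
	case "1":
		*e = DoubleDelta
	case "2":
		*e = Varbit
	default:
		return fmt.Errorf("invalid chunk encoding: %s", s)
	}
	return nil
}

var enc = DoubleDelta // plays the role of chunk.DefaultEncoding

func main() {
	flag.Var(&enc, "chunk-encoding", "0 (delta), 1 (double-delta), or 2 (varbit)")
	flag.Parse()
	fmt.Println("using encoding", enc)
}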
// ChunkDesc contains meta-data for a chunk. Pay special attention to the
// Desc contains meta-data for a chunk. Pay special attention to the
// documented requirements for calling its methods concurrently (WRT pinning and
// locking). The doc comments spell out the requirements for each method, but
// here is an overview and general explanation:
//
// Everything that changes the pinning of the underlying chunk or deals with its
// eviction is protected by a mutex. This affects the following methods: pin,
// unpin, refCount, isEvicted, maybeEvict. These methods can be called at any
// eviction is protected by a mutex. This affects the following methods: Pin,
// Unpin, RefCount, IsEvicted, MaybeEvict. These methods can be called at any
// time without further prerequisites.
//
// Another group of methods acts on (or sets) the underlying chunk. These
// methods involve no locking. They may only be called if the caller has pinned
// the chunk (to guarantee the chunk is not evicted concurrently). Also, the
// caller must make sure nobody else will call these methods concurrently,
// either by holding the sole reference to the chunkDesc (usually during loading
// or creation) or by locking the fingerprint of the series the chunkDesc
// belongs to. The affected methods are: add, maybePopulateLastTime, setChunk.
// either by holding the sole reference to the ChunkDesc (usually during loading
// or creation) or by locking the fingerprint of the series the ChunkDesc
// belongs to. The affected methods are: Add, MaybePopulateLastTime, SetChunk.
//
// Finally, there are the special cases firstTime and lastTime. lastTime requires
// Finally, there are the special cases FirstTime and LastTime. LastTime requires
// to have locked the fingerprint of the series but the chunk does not need to
// be pinned. That's because the chunkLastTime field in chunkDesc gets populated
// be pinned. That's because the ChunkLastTime field in ChunkDesc gets populated
// upon completion of the chunk (when it is still pinned, and which happens
// while the series's fingerprint is locked). Once that has happened, calling
// lastTime does not require the chunk to be loaded anymore. Before that has
// happened, the chunk is pinned anyway. The chunkFirstTime field in chunkDesc
// is populated upon creation of a chunkDesc, so it is alway safe to call
// firstTime. The firstTime method is arguably not needed and only there for
// consistency with lastTime.
type ChunkDesc struct {
// LastTime does not require the chunk to be loaded anymore. Before that has
// happened, the chunk is pinned anyway. The ChunkFirstTime field in ChunkDesc
// is populated upon creation of a ChunkDesc, so it is always safe to call
// FirstTime. The FirstTime method is arguably not needed and only there for
// consistency with LastTime.
type Desc struct {
sync.Mutex // Protects pinning.
c Chunk // nil if chunk is evicted.
C Chunk // nil if chunk is evicted.
rCnt int
chunkFirstTime model.Time // Populated at creation. Immutable.
chunkLastTime model.Time // Populated on closing of the chunk, model.Earliest if unset.
ChunkFirstTime model.Time // Populated at creation. Immutable.
ChunkLastTime model.Time // Populated on closing of the chunk, model.Earliest if unset.
// evictListElement is nil if the chunk is not in the evict list.
// evictListElement is _not_ protected by the chunkDesc mutex.
// EvictListElement is nil if the chunk is not in the evict list.
// EvictListElement is _not_ protected by the ChunkDesc mutex.
// It must only be touched by the evict list handler in MemorySeriesStorage.
evictListElement *list.Element
EvictListElement *list.Element
}
// NewChunkDesc creates a new chunkDesc pointing to the provided chunk. The
// provided chunk is assumed to be not persisted yet. Therefore, the refCount of
// the new chunkDesc is 1 (preventing eviction prior to persisting).
func NewChunkDesc(c Chunk, firstTime model.Time) *ChunkDesc {
chunkOps.WithLabelValues(createAndPin).Inc()
atomic.AddInt64(&numMemChunks, 1)
numMemChunkDescs.Inc()
return &ChunkDesc{
c: c,
// NewDesc creates a new Desc pointing to the provided chunk. The provided chunk
// is assumed to be not persisted yet. Therefore, the refCount of the new
// ChunkDesc is 1 (preventing eviction prior to persisting).
func NewDesc(c Chunk, firstTime model.Time) *Desc {
ChunkOps.WithLabelValues(CreateAndPin).Inc()
atomic.AddInt64(&NumMemChunks, 1)
NumMemChunkDescs.Inc()
return &Desc{
C: c,
rCnt: 1,
chunkFirstTime: firstTime,
chunkLastTime: model.Earliest,
ChunkFirstTime: firstTime,
ChunkLastTime: model.Earliest,
}
}
// Add adds a sample pair to the underlying chunk. For safe concurrent access,
the chunk must be pinned, and the caller must have locked the fingerprint of
// the series.
func (cd *ChunkDesc) Add(s model.SamplePair) ([]Chunk, error) {
return cd.c.Add(s)
func (cd *Desc) Add(s model.SamplePair) ([]Chunk, error) {
return cd.C.Add(s)
}
// pin increments the refCount by one. Upon increment from 0 to 1, this
// chunkDesc is removed from the evict list. To enable the latter, the
// Pin increments the refCount by one. Upon increment from 0 to 1, this
// ChunkDesc is removed from the evict list. To enable the latter, the
// evictRequests channel has to be provided. This method can be called
// concurrently at any time.
func (cd *ChunkDesc) pin(evictRequests chan<- evictRequest) {
func (cd *Desc) Pin(evictRequests chan<- EvictRequest) {
cd.Lock()
defer cd.Unlock()
if cd.rCnt == 0 {
// Remove ourselves from the evict list.
evictRequests <- evictRequest{cd, false}
evictRequests <- EvictRequest{cd, false}
}
cd.rCnt++
}
// unpin decrements the refCount by one. Upon decrement from 1 to 0, this
// chunkDesc is added to the evict list. To enable the latter, the evictRequests
// Unpin decrements the refCount by one. Upon decrement from 1 to 0, this
// ChunkDesc is added to the evict list. To enable the latter, the evictRequests
// channel has to be provided. This method can be called concurrently at any
// time.
func (cd *ChunkDesc) unpin(evictRequests chan<- evictRequest) {
func (cd *Desc) Unpin(evictRequests chan<- EvictRequest) {
cd.Lock()
defer cd.Unlock()
@ -156,95 +165,93 @@ func (cd *ChunkDesc) unpin(evictRequests chan<- evictRequest) {
cd.rCnt--
if cd.rCnt == 0 {
// Add ourselves to the back of the evict list.
evictRequests <- evictRequest{cd, true}
evictRequests <- EvictRequest{cd, true}
}
}
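
Pin and Unpin implement a reference count whose 0<->1 transitions are the only events the single evict-list handler needs to see; the channel serializes those events, so the list itself needs no locking. A self-contained toy analog of that scheme (this is not the Prometheus implementation; the real handler lives in MemorySeriesStorage and tracks list elements via EvictListElement):

package main

import (
	"container/list"
	"fmt"
	"sync"
)

// desc is a toy stand-in for chunk.Desc: a refcount guarded by a mutex,
// with eviction decisions delegated to a single handler goroutine.
type desc struct {
	sync.Mutex
	data []byte // stands in for the chunk payload
	rCnt int
}

type evictRequest struct {
	d     *desc
	evict bool // true: join the evict list, false: leave it
}

func (d *desc) pin(reqs chan<- evictRequest) {
	d.Lock()
	defer d.Unlock()
	if d.rCnt == 0 {
		reqs <- evictRequest{d, false}
	}
	d.rCnt++
}

func (d *desc) unpin(reqs chan<- evictRequest) {
	d.Lock()
	defer d.Unlock()
	d.rCnt--
	if d.rCnt == 0 {
		reqs <- evictRequest{d, true}
	}
}

func main() {
	reqs := make(chan evictRequest, 16)
	evictList := list.New()
	d := &desc{data: []byte("samples"), rCnt: 1} // created pinned, as in NewDesc

	d.unpin(reqs) // 1 -> 0: becomes evictable
	d.pin(reqs)   // 0 -> 1: rescued from the evict list
	d.unpin(reqs) // 1 -> 0: evictable again
	close(reqs)

	// Single-threaded handler; the real one removes the exact element
	// recorded in EvictListElement rather than the back of the list.
	for r := range reqs {
		if r.evict {
			evictList.PushBack(r.d)
		} else if evictList.Len() > 0 {
			evictList.Remove(evictList.Back())
		}
	}
	fmt.Println("descs awaiting eviction:", evictList.Len()) // 1
}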
// refCount returns the number of pins. This method can be called concurrently
// RefCount returns the number of pins. This method can be called concurrently
// at any time.
func (cd *ChunkDesc) refCount() int {
func (cd *Desc) RefCount() int {
cd.Lock()
defer cd.Unlock()
return cd.rCnt
}
// firstTime returns the timestamp of the first sample in the chunk. This method
// FirstTime returns the timestamp of the first sample in the chunk. This method
// can be called concurrently at any time. It only returns the immutable
// cd.chunkFirstTime without any locking. Arguably, this method is
// useless. However, it provides consistency with the lastTime method.
func (cd *ChunkDesc) firstTime() model.Time {
return cd.chunkFirstTime
// cd.ChunkFirstTime without any locking. Arguably, this method is
// useless. However, it provides consistency with the LastTime method.
func (cd *Desc) FirstTime() model.Time {
return cd.ChunkFirstTime
}
// lastTime returns the timestamp of the last sample in the chunk. For safe
// LastTime returns the timestamp of the last sample in the chunk. For safe
// concurrent access, this method requires the fingerprint of the time series to
// be locked.
func (cd *ChunkDesc) lastTime() (model.Time, error) {
if cd.chunkLastTime != model.Earliest || cd.c == nil {
return cd.chunkLastTime, nil
func (cd *Desc) LastTime() (model.Time, error) {
if cd.ChunkLastTime != model.Earliest || cd.C == nil {
return cd.ChunkLastTime, nil
}
return cd.c.NewIterator().LastTimestamp()
return cd.C.NewIterator().LastTimestamp()
}
// maybePopulateLastTime populates the chunkLastTime from the underlying chunk
// MaybePopulateLastTime populates the ChunkLastTime from the underlying chunk
// if it has not yet happened. Call this method directly after having added the
// last sample to a chunk or after closing a head chunk due to age. For safe
// concurrent access, the chunk must be pinned, and the caller must have locked
// the fingerprint of the series.
func (cd *ChunkDesc) maybePopulateLastTime() error {
if cd.chunkLastTime == model.Earliest && cd.c != nil {
t, err := cd.c.NewIterator().LastTimestamp()
func (cd *Desc) MaybePopulateLastTime() error {
if cd.ChunkLastTime == model.Earliest && cd.C != nil {
t, err := cd.C.NewIterator().LastTimestamp()
if err != nil {
return err
}
cd.chunkLastTime = t
cd.ChunkLastTime = t
}
return nil
}
// isEvicted returns whether the chunk is evicted. For safe concurrent access,
// IsEvicted returns whether the chunk is evicted. For safe concurrent access,
// the caller must have locked the fingerprint of the series.
func (cd *ChunkDesc) isEvicted() bool {
func (cd *Desc) IsEvicted() bool {
// Locking required here because we do not want the caller to force
// pinning the chunk first, so it could be evicted while this method is
// called.
cd.Lock()
defer cd.Unlock()
return cd.c == nil
return cd.C == nil
}
// setChunk sets the underlying chunk. The caller must have locked the
// SetChunk sets the underlying chunk. The caller must have locked the
// fingerprint of the series and must have "pre-pinned" the chunk (i.e. first
// call pin and then set the chunk).
func (cd *ChunkDesc) setChunk(c Chunk) {
if cd.c != nil {
// call Pin and then set the chunk).
func (cd *Desc) SetChunk(c Chunk) {
if cd.C != nil {
panic("chunk already set")
}
cd.c = c
cd.C = c
}
// maybeEvict evicts the chunk if the refCount is 0. It returns whether the chunk
// MaybeEvict evicts the chunk if the refCount is 0. It returns whether the chunk
// is now evicted, which includes the case that the chunk was evicted even
// before this method was called. It can be called concurrently at any time.
func (cd *ChunkDesc) maybeEvict() bool {
func (cd *Desc) MaybeEvict() bool {
cd.Lock()
defer cd.Unlock()
if cd.c == nil {
if cd.C == nil {
return true
}
if cd.rCnt != 0 {
return false
}
if cd.chunkLastTime == model.Earliest {
if cd.ChunkLastTime == model.Earliest {
// This must never happen.
panic("chunkLastTime not populated for evicted chunk")
panic("ChunkLastTime not populated for evicted chunk")
}
cd.c = nil
chunkOps.WithLabelValues(evict).Inc()
atomic.AddInt64(&numMemChunks, -1)
cd.C = nil
return true
}
@ -260,18 +267,18 @@ type Chunk interface {
Add(sample model.SamplePair) ([]Chunk, error)
Clone() Chunk
FirstTime() model.Time
NewIterator() ChunkIterator
NewIterator() Iterator
Marshal(io.Writer) error
MarshalToBuf([]byte) error
Unmarshal(io.Reader) error
UnmarshalFromBuf([]byte) error
Encoding() ChunkEncoding
Encoding() Encoding
}
// ChunkIterator enables efficient access to the content of a chunk. It is
// generally not safe to use a chunkIterator concurrently with or after chunk
// Iterator enables efficient access to the content of a chunk. It is
// generally not safe to use an Iterator concurrently with or after chunk
// mutation.
type ChunkIterator interface {
type Iterator interface {
// Gets the last timestamp in the chunk.
LastTimestamp() (model.Time, error)
// Whether a given timestamp is contained between first and last value
@ -280,7 +287,7 @@ type ChunkIterator interface {
// Scans the next value in the chunk. Directly after the iterator has
// been created, the next value is the first value in the
// chunk. Otherwise, it is the value following the last value scanned or
// found (by one of the find... methods). Returns false if either the
// found (by one of the Find... methods). Returns false if either the
// end of the chunk is reached or an error has occurred.
Scan() bool
// Finds the most recent value at or before the provided time. Returns
@ -300,9 +307,9 @@ type ChunkIterator interface {
Err() error
}
// rangeValues is a utility function that retrieves all values within the given
// range from a chunkIterator.
func rangeValues(it ChunkIterator, in metric.Interval) ([]model.SamplePair, error) {
// RangeValues is a utility function that retrieves all values within the given
// range from an Iterator.
func RangeValues(it Iterator, in metric.Interval) ([]model.SamplePair, error) {
result := []model.SamplePair{}
if !it.FindAtOrAfter(in.OldestInclusive) {
return result, it.Err()
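
RangeValues (its body is truncated by the hunk above) relies only on the FindAtOrAfter/Scan/Value/Err contract of Iterator. A self-contained analog over a plain slice shows the same control flow; the types here are stand-ins for the model package, not the real ones:

package main

import "fmt"

type Time int64

type SamplePair struct {
	Timestamp Time
	Value     float64
}

// sliceIter implements just enough of the Iterator contract for rangeValues.
type sliceIter struct {
	samples []SamplePair
	pos     int
}

func (it *sliceIter) FindAtOrAfter(t Time) bool {
	for i, s := range it.samples {
		if s.Timestamp >= t {
			it.pos = i
			return true
		}
	}
	return false
}

func (it *sliceIter) Scan() bool        { it.pos++; return it.pos < len(it.samples) }
func (it *sliceIter) Value() SamplePair { return it.samples[it.pos] }
func (it *sliceIter) Err() error        { return nil }

// rangeValues mirrors the real function: position at the oldest bound,
// then scan forward until the newest bound is passed.
func rangeValues(it *sliceIter, oldest, newest Time) ([]SamplePair, error) {
	result := []SamplePair{}
	if !it.FindAtOrAfter(oldest) {
		return result, it.Err()
	}
	for it.Value().Timestamp <= newest {
		result = append(result, it.Value())
		if !it.Scan() {
			break
		}
	}
	return result, it.Err()
}

func main() {
	it := &sliceIter{samples: []SamplePair{{1, 10}, {2, 20}, {3, 30}, {4, 40}}}
	vals, err := rangeValues(it, 2, 3)
	fmt.Println(vals, err) // [{2 20} {3 30}] <nil>
}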
@ -332,7 +339,7 @@ func addToOverflowChunk(c Chunk, s model.SamplePair) ([]Chunk, error) {
// provided sample. It returns the new chunks (transcoded plus overflow) with
// the new sample at the end.
func transcodeAndAdd(dst Chunk, src Chunk, s model.SamplePair) ([]Chunk, error) {
chunkOps.WithLabelValues(transcode).Inc()
ChunkOps.WithLabelValues(Transcode).Inc()
var (
head = dst
@ -359,9 +366,9 @@ func transcodeAndAdd(dst Chunk, src Chunk, s model.SamplePair) ([]Chunk, error)
}
// NewChunk creates a new chunk according to the encoding set by the
// DefaultChunkEncoding flag.
// DefaultEncoding flag.
func NewChunk() Chunk {
chunk, err := NewChunkForEncoding(DefaultChunkEncoding)
chunk, err := NewChunkForEncoding(DefaultEncoding)
if err != nil {
panic(err)
}
@ -369,12 +376,12 @@ func NewChunk() Chunk {
}
// NewChunkForEncoding allows configuring what chunk type you want
func NewChunkForEncoding(encoding ChunkEncoding) (Chunk, error) {
func NewChunkForEncoding(encoding Encoding) (Chunk, error) {
switch encoding {
case Delta:
return newDeltaEncodedChunk(d1, d0, true, chunkLen), nil
return newDeltaEncodedChunk(d1, d0, true, ChunkLen), nil
case DoubleDelta:
return newDoubleDeltaEncodedChunk(d1, d0, true, chunkLen), nil
return newDoubleDeltaEncodedChunk(d1, d0, true, ChunkLen), nil
case Varbit:
return newVarbitChunk(varbitZeroEncoding), nil
default:
@ -402,23 +409,23 @@ func newIndexAccessingChunkIterator(len int, acc indexAccessor) *indexAccessingC
return &indexAccessingChunkIterator{
len: len,
pos: -1,
lastValue: ZeroSamplePair,
lastValue: model.SamplePair{Timestamp: model.Earliest},
acc: acc,
}
}
// lastTimestamp implements chunkIterator.
// lastTimestamp implements Iterator.
func (it *indexAccessingChunkIterator) LastTimestamp() (model.Time, error) {
return it.acc.timestampAtIndex(it.len - 1), it.acc.err()
}
// contains implements chunkIterator.
// contains implements Iterator.
func (it *indexAccessingChunkIterator) Contains(t model.Time) (bool, error) {
return !t.Before(it.acc.timestampAtIndex(0)) &&
!t.After(it.acc.timestampAtIndex(it.len-1)), it.acc.err()
}
// scan implements chunkIterator.
// scan implements Iterator.
func (it *indexAccessingChunkIterator) Scan() bool {
it.pos++
if it.pos >= it.len {
@ -431,7 +438,7 @@ func (it *indexAccessingChunkIterator) Scan() bool {
return it.acc.err() == nil
}
// findAtOrBefore implements chunkIterator.
// findAtOrBefore implements Iterator.
func (it *indexAccessingChunkIterator) FindAtOrBefore(t model.Time) bool {
i := sort.Search(it.len, func(i int) bool {
return it.acc.timestampAtIndex(i).After(t)
@ -447,7 +454,7 @@ func (it *indexAccessingChunkIterator) FindAtOrBefore(t model.Time) bool {
return true
}
// findAtOrAfter implements chunkIterator.
// findAtOrAfter implements Iterator.
func (it *indexAccessingChunkIterator) FindAtOrAfter(t model.Time) bool {
i := sort.Search(it.len, func(i int) bool {
return !it.acc.timestampAtIndex(i).Before(t)
@ -463,12 +470,12 @@ func (it *indexAccessingChunkIterator) FindAtOrAfter(t model.Time) bool {
return true
}
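
Both finders are thin wrappers around sort.Search on the chunk's sorted timestamps; the predicates differ only in After versus !Before, and the branches truncated by the hunks above handle the out-of-range results. A runnable distillation with plain int64 timestamps:

package main

import (
	"fmt"
	"sort"
)

func main() {
	ts := []int64{10, 20, 30, 40} // sorted timestamps within one chunk
	t := int64(25)

	// FindAtOrAfter: first index whose timestamp is not before t.
	i := sort.Search(len(ts), func(i int) bool { return ts[i] >= t })
	fmt.Println("at or after:", ts[i]) // 30 (i == len(ts) would mean "not found")

	// FindAtOrBefore: first index strictly after t, then step back one.
	j := sort.Search(len(ts), func(i int) bool { return ts[i] > t })
	fmt.Println("at or before:", ts[j-1]) // 20 (j == 0 would mean "not found")
}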
// value implements chunkIterator.
// value implements Iterator.
func (it *indexAccessingChunkIterator) Value() model.SamplePair {
return it.lastValue
}
// err implements chunkIterator.
// err implements Iterator.
func (it *indexAccessingChunkIterator) Err() error {
return it.acc.err()
}


@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package local
package chunk
import (
"encoding/binary"
@ -184,13 +184,13 @@ func (c deltaEncodedChunk) Clone() Chunk {
return &clone
}
// firstTime implements chunk.
// FirstTime implements chunk.
func (c deltaEncodedChunk) FirstTime() model.Time {
return c.baseTime()
}
// NewIterator implements chunk.
func (c *deltaEncodedChunk) NewIterator() ChunkIterator {
func (c *deltaEncodedChunk) NewIterator() Iterator {
return newIndexAccessingChunkIterator(c.len(), &deltaEncodedIndexAccessor{
c: *c,
baseT: c.baseTime(),
@ -265,7 +265,7 @@ func (c *deltaEncodedChunk) UnmarshalFromBuf(buf []byte) error {
}
// encoding implements chunk.
func (c deltaEncodedChunk) Encoding() ChunkEncoding { return Delta }
func (c deltaEncodedChunk) Encoding() Encoding { return Delta }
func (c deltaEncodedChunk) timeBytes() deltaBytes {
return deltaBytes(c[deltaHeaderTimeBytesOffset])


@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package local
package chunk
import (
"math"


@ -15,7 +15,7 @@
// it may make sense to split those out later, but given that the tests are
// near-identical and share a helper, this feels simpler for now.
package local
package chunk
import (
"bytes"
@ -75,7 +75,7 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) {
},
}
for _, c := range cases {
chunk := c.chunkConstructor(d1, d4, false, chunkLen)
chunk := c.chunkConstructor(d1, d4, false, ChunkLen)
cs, err := chunk.Add(model.SamplePair{
Timestamp: model.Now(),
@ -85,7 +85,7 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) {
t.Fatalf("Couldn't add sample to empty %s: %s", c.chunkTypeName, err)
}
buf := make([]byte, chunkLen)
buf := make([]byte, ChunkLen)
cs[0].MarshalToBuf(buf)


@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package local
package chunk
import (
"encoding/binary"
@ -191,13 +191,13 @@ func (c doubleDeltaEncodedChunk) Clone() Chunk {
return &clone
}
// firstTime implements chunk.
// FirstTime implements chunk.
func (c doubleDeltaEncodedChunk) FirstTime() model.Time {
return c.baseTime()
}
// NewIterator implements chunk.
func (c *doubleDeltaEncodedChunk) NewIterator() ChunkIterator {
func (c *doubleDeltaEncodedChunk) NewIterator() Iterator {
return newIndexAccessingChunkIterator(c.len(), &doubleDeltaEncodedIndexAccessor{
c: *c,
baseT: c.baseTime(),
@ -275,7 +275,7 @@ func (c *doubleDeltaEncodedChunk) UnmarshalFromBuf(buf []byte) error {
}
// encoding implements chunk.
func (c doubleDeltaEncodedChunk) Encoding() ChunkEncoding { return DoubleDelta }
func (c doubleDeltaEncodedChunk) Encoding() Encoding { return DoubleDelta }
func (c doubleDeltaEncodedChunk) baseTime() model.Time {
return model.Time(


@ -0,0 +1,100 @@
// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chunk
import "github.com/prometheus/client_golang/prometheus"
// Usually, a separate file for instrumentation is frowned upon. Metrics should
// be close to where they are used. However, the metrics below are set all over
// the place, so we go for a separate instrumentation file in this case.
var (
ChunkOps = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "chunk_ops_total",
Help: "The total number of chunk operations by their type.",
},
[]string{OpTypeLabel},
)
ChunkDescOps = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "chunkdesc_ops_total",
Help: "The total number of chunk descriptor operations by their type.",
},
[]string{OpTypeLabel},
)
NumMemChunkDescs = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "memory_chunkdescs",
Help: "The current number of chunk descriptors in memory.",
})
)
const (
namespace = "prometheus"
subsystem = "local_storage"
// OpTypeLabel is the label name for chunk operation types.
OpTypeLabel = "type"
// Op-types for ChunkOps.
// CreateAndPin is the label value for create-and-pin chunk ops.
CreateAndPin = "create" // A ChunkDesc creation with refCount=1.
// PersistAndUnpin is the label value for persist chunk ops.
PersistAndUnpin = "persist"
// Pin is the label value for pin chunk ops (excludes pin on creation).
Pin = "pin"
// Unpin is the label value for unpin chunk ops (excludes the unpin on persisting).
Unpin = "unpin"
// Clone is the label value for clone chunk ops.
Clone = "clone"
// Transcode is the label value for transcode chunk ops.
Transcode = "transcode"
// Drop is the label value for drop chunk ops.
Drop = "drop"
// Op-types for ChunkOps and ChunkDescOps.
// Evict is the label value for evict chunk desc ops.
Evict = "evict"
// Load is the label value for load chunk and chunk desc ops.
Load = "load"
)
func init() {
prometheus.MustRegister(ChunkOps)
prometheus.MustRegister(ChunkDescOps)
prometheus.MustRegister(NumMemChunkDescs)
}
var (
// NumMemChunks is the total number of chunks in memory. This is a
// global counter, also used internally, so not implemented as
// metrics. Collected in MemorySeriesStorage.Collect.
// TODO(beorn7): As it is used internally, it is actually very bad style
// to have it as a global variable.
NumMemChunks int64
// NumMemChunksDesc is the metric descriptor for the above.
NumMemChunksDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "memory_chunks"),
"The current number of chunks in memory, excluding cloned chunks (i.e. chunks without a descriptor).",
nil, nil,
)
)
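
Exporting the counter vectors together with their label-value constants is what lets both package local and package chunk increment the same metric. A self-contained sketch of that pattern against a scratch registry (illustrative; in the real code the metrics are registered once in the init above):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry() // scratch registry, avoids global state
	chunkOps := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "prometheus",
			Subsystem: "local_storage",
			Name:      "chunk_ops_total",
			Help:      "The total number of chunk operations by their type.",
		},
		[]string{"type"},
	)
	reg.MustRegister(chunkOps)

	// Equivalent to chunk.ChunkOps.WithLabelValues(chunk.Evict).Inc(),
	// whether called from package chunk or package local.
	chunkOps.WithLabelValues("evict").Inc()

	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		fmt.Println(mf.GetName(), "=", mf.GetMetric()[0].GetCounter().GetValue())
	}
}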


@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package local
package chunk
import (
"encoding/binary"
@ -195,11 +195,11 @@ const (
varbitFirstValueDeltaOffset = 38
// The following are in the "footer" and only usable if the chunk is
// still open.
varbitCountOffsetBitOffset = chunkLen - 9
varbitLastTimeDeltaOffset = chunkLen - 7
varbitLastValueDeltaOffset = chunkLen - 4
varbitLastLeadingZerosCountOffset = chunkLen - 4
varbitLastSignificantBitsCountOffset = chunkLen - 3
varbitCountOffsetBitOffset = ChunkLen - 9
varbitLastTimeDeltaOffset = ChunkLen - 7
varbitLastValueDeltaOffset = ChunkLen - 4
varbitLastLeadingZerosCountOffset = ChunkLen - 4
varbitLastSignificantBitsCountOffset = ChunkLen - 3
varbitFirstSampleBitOffset uint16 = 0 // Symbolic, don't really read or write here.
varbitSecondSampleBitOffset uint16 = 1 // Symbolic, don't really read or write here.
@ -240,18 +240,18 @@ var varbitWorstCaseBitsPerSample = map[varbitValueEncoding]int{
type varbitChunk []byte
// newVarbitChunk returns a newly allocated varbitChunk. For simplicity, all
// varbit chunks must have the length as determined by the chunkLen constant.
// varbit chunks must have the length as determined by the ChunkLen constant.
func newVarbitChunk(enc varbitValueEncoding) *varbitChunk {
if chunkLen < varbitMinLength || chunkLen > varbitMaxLength {
if ChunkLen < varbitMinLength || ChunkLen > varbitMaxLength {
panic(fmt.Errorf(
"invalid chunk length of %d bytes, need at least %d bytes and at most %d bytes",
chunkLen, varbitMinLength, varbitMaxLength,
ChunkLen, varbitMinLength, varbitMaxLength,
))
}
if enc > varbitDirectEncoding {
panic(fmt.Errorf("unknown varbit value encoding: %v", enc))
}
c := make(varbitChunk, chunkLen)
c := make(varbitChunk, ChunkLen)
c.setValueEncoding(enc)
return &c
}
@ -280,7 +280,7 @@ func (c varbitChunk) Clone() Chunk {
}
// NewIterator implements chunk.
func (c varbitChunk) NewIterator() ChunkIterator {
func (c varbitChunk) NewIterator() Iterator {
return newVarbitChunkIterator(c)
}
@ -320,9 +320,9 @@ func (c varbitChunk) UnmarshalFromBuf(buf []byte) error {
}
// encoding implements chunk.
func (c varbitChunk) Encoding() ChunkEncoding { return Varbit }
func (c varbitChunk) Encoding() Encoding { return Varbit }
// firstTime implements chunk.
// FirstTime implements chunk.
func (c varbitChunk) FirstTime() model.Time {
return model.Time(
binary.BigEndian.Uint64(
@ -548,14 +548,14 @@ func (c *varbitChunk) addLaterSample(s model.SamplePair, offset uint16) ([]Chunk
}
// Analyze worst case, does it fit? If not, set new sample as the last.
if int(offset)+varbitWorstCaseBitsPerSample[encoding] > chunkLen*8 {
if int(offset)+varbitWorstCaseBitsPerSample[encoding] > ChunkLen*8 {
return c.addLastSample(s), nil
}
// Transcoding/overflow decisions first.
if encoding == varbitZeroEncoding && s.Value != lastValue {
// Cannot go on with zero encoding.
if offset > chunkLen*4 {
if offset > ChunkLen*4 {
// Chunk already half full. Don't transcode, overflow instead.
return addToOverflowChunk(c, s)
}
@ -567,7 +567,7 @@ func (c *varbitChunk) addLaterSample(s model.SamplePair, offset uint16) ([]Chunk
}
if encoding == varbitIntDoubleDeltaEncoding && !isInt32(s.Value-lastValue) {
// Cannot go on with int encoding.
if offset > chunkLen*4 {
if offset > ChunkLen*4 {
// Chunk already half full. Don't transcode, overflow instead.
return addToOverflowChunk(c, s)
}
@ -903,7 +903,7 @@ func newVarbitChunkIterator(c varbitChunk) *varbitChunkIterator {
}
}
// lastTimestamp implements chunkIterator.
// lastTimestamp implements Iterator.
func (it *varbitChunkIterator) LastTimestamp() (model.Time, error) {
if it.len == varbitFirstSampleBitOffset {
// No samples in the chunk yet.
@ -912,7 +912,7 @@ func (it *varbitChunkIterator) LastTimestamp() (model.Time, error) {
return it.c.lastTime(), it.lastError
}
// contains implements chunkIterator.
// contains implements Iterator.
func (it *varbitChunkIterator) Contains(t model.Time) (bool, error) {
last, err := it.LastTimestamp()
if err != nil {
@ -923,7 +923,7 @@ func (it *varbitChunkIterator) Contains(t model.Time) (bool, error) {
!t.After(last), it.lastError
}
// scan implements chunkIterator.
// scan implements Iterator.
func (it *varbitChunkIterator) Scan() bool {
if it.lastError != nil {
return false
@ -1002,7 +1002,7 @@ func (it *varbitChunkIterator) Scan() bool {
return it.lastError == nil
}
// findAtOrBefore implements chunkIterator.
// findAtOrBefore implements Iterator.
func (it *varbitChunkIterator) FindAtOrBefore(t model.Time) bool {
if it.len == 0 || t.Before(it.c.FirstTime()) {
return false
@ -1038,7 +1038,7 @@ func (it *varbitChunkIterator) FindAtOrBefore(t model.Time) bool {
return it.lastError == nil
}
// findAtOrAfter implements chunkIterator.
// findAtOrAfter implements Iterator.
func (it *varbitChunkIterator) FindAtOrAfter(t model.Time) bool {
if it.len == 0 || t.After(it.c.lastTime()) {
return false
@ -1061,7 +1061,7 @@ func (it *varbitChunkIterator) FindAtOrAfter(t model.Time) bool {
return it.lastError == nil
}
// value implements chunkIterator.
// value implements Iterator.
func (it *varbitChunkIterator) Value() model.SamplePair {
return model.SamplePair{
Timestamp: it.t,
@ -1069,7 +1069,7 @@ func (it *varbitChunkIterator) Value() model.SamplePair {
}
}
// err implements chunkIterator.
// err implements Iterator.
func (it *varbitChunkIterator) Err() error {
return it.lastError
}


@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package local
package chunk
import "github.com/prometheus/common/model"


@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package local
package chunk
import "testing"


@ -25,6 +25,7 @@ import (
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/local/chunk"
"github.com/prometheus/prometheus/storage/local/codable"
"github.com/prometheus/prometheus/storage/local/index"
)
@ -114,10 +115,10 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint
)
}
s.chunkDescs = append(
make([]*ChunkDesc, 0, len(s.chunkDescs)-s.persistWatermark),
make([]*chunk.Desc, 0, len(s.chunkDescs)-s.persistWatermark),
s.chunkDescs[s.persistWatermark:]...,
)
numMemChunkDescs.Sub(float64(s.persistWatermark))
chunk.NumMemChunkDescs.Sub(float64(s.persistWatermark))
s.persistWatermark = 0
s.chunkDescsOffset = 0
}
@ -290,8 +291,8 @@ func (p *persistence) sanitizeSeries(
)
s.chunkDescs = cds
s.chunkDescsOffset = 0
s.savedFirstTime = cds[0].firstTime()
s.lastTime, err = cds[len(cds)-1].lastTime()
s.savedFirstTime = cds[0].FirstTime()
s.lastTime, err = cds[len(cds)-1].LastTime()
if err != nil {
log.Errorf(
"Failed to determine time of the last sample for metric %v, fingerprint %v: %s",
@ -314,7 +315,7 @@ func (p *persistence) sanitizeSeries(
// First, throw away the chunkDescs without chunks.
s.chunkDescs = s.chunkDescs[s.persistWatermark:]
numMemChunkDescs.Sub(float64(s.persistWatermark))
chunk.NumMemChunkDescs.Sub(float64(s.persistWatermark))
cds, err := p.loadChunkDescs(fp, 0)
if err != nil {
log.Errorf(
@ -326,10 +327,10 @@ func (p *persistence) sanitizeSeries(
}
s.persistWatermark = len(cds)
s.chunkDescsOffset = 0
s.savedFirstTime = cds[0].firstTime()
s.savedFirstTime = cds[0].FirstTime()
s.modTime = modTime
lastTime, err := cds[len(cds)-1].lastTime()
lastTime, err := cds[len(cds)-1].LastTime()
if err != nil {
log.Errorf(
"Failed to determine time of the last sample for metric %v, fingerprint %v: %s",
@ -340,7 +341,7 @@ func (p *persistence) sanitizeSeries(
}
keepIdx := -1
for i, cd := range s.chunkDescs {
if cd.firstTime() >= lastTime {
if cd.FirstTime() >= lastTime {
keepIdx = i
break
}
@ -350,8 +351,8 @@ func (p *persistence) sanitizeSeries(
"Recovered metric %v, fingerprint %v: all %d chunks recovered from series file.",
s.metric, fp, chunksInFile,
)
numMemChunkDescs.Sub(float64(len(s.chunkDescs)))
atomic.AddInt64(&numMemChunks, int64(-len(s.chunkDescs)))
chunk.NumMemChunkDescs.Sub(float64(len(s.chunkDescs)))
atomic.AddInt64(&chunk.NumMemChunks, int64(-len(s.chunkDescs)))
s.chunkDescs = cds
s.headChunkClosed = true
return fp, true
@ -360,8 +361,8 @@ func (p *persistence) sanitizeSeries(
"Recovered metric %v, fingerprint %v: recovered %d chunks from series file, recovered %d chunks from checkpoint.",
s.metric, fp, chunksInFile, len(s.chunkDescs)-keepIdx,
)
numMemChunkDescs.Sub(float64(keepIdx))
atomic.AddInt64(&numMemChunks, int64(-keepIdx))
chunk.NumMemChunkDescs.Sub(float64(keepIdx))
atomic.AddInt64(&chunk.NumMemChunks, int64(-keepIdx))
if keepIdx == len(s.chunkDescs) {
// No chunks from series file left, head chunk is evicted, so declare it closed.
s.headChunkClosed = true


@ -23,6 +23,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/local/chunk"
"github.com/prometheus/prometheus/storage/local/codable"
)
@ -107,7 +108,7 @@ func (hs *headsScanner) scan() bool {
firstTime int64
lastTime int64
encoding byte
ch Chunk
ch chunk.Chunk
lastTimeHead model.Time
)
if seriesFlags, hs.err = hs.r.ReadByte(); hs.err != nil {
@ -146,7 +147,7 @@ func (hs *headsScanner) scan() bool {
if numChunkDescs, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
return false
}
chunkDescs := make([]*ChunkDesc, numChunkDescs)
chunkDescs := make([]*chunk.Desc, numChunkDescs)
if hs.version == headsFormatLegacyVersion {
if headChunkPersisted {
persistWatermark = numChunkDescs
@ -163,11 +164,11 @@ func (hs *headsScanner) scan() bool {
if lastTime, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
return false
}
chunkDescs[i] = &ChunkDesc{
chunkFirstTime: model.Time(firstTime),
chunkLastTime: model.Time(lastTime),
chunkDescs[i] = &chunk.Desc{
ChunkFirstTime: model.Time(firstTime),
ChunkLastTime: model.Time(lastTime),
}
numMemChunkDescs.Inc()
chunk.NumMemChunkDescs.Inc()
} else {
// Non-persisted chunk.
// If there are non-persisted chunks at all, we consider
@ -176,24 +177,24 @@ func (hs *headsScanner) scan() bool {
if encoding, hs.err = hs.r.ReadByte(); hs.err != nil {
return false
}
if ch, hs.err = NewChunkForEncoding(ChunkEncoding(encoding)); hs.err != nil {
if ch, hs.err = chunk.NewChunkForEncoding(chunk.Encoding(encoding)); hs.err != nil {
return false
}
if hs.err = ch.Unmarshal(hs.r); hs.err != nil {
return false
}
cd := NewChunkDesc(ch, ch.FirstTime())
cd := chunk.NewDesc(ch, ch.FirstTime())
if i < numChunkDescs-1 {
// This is NOT the head chunk. So it's a chunk
// to be persisted, and we need to populate lastTime.
hs.chunksToPersistTotal++
cd.maybePopulateLastTime()
cd.MaybePopulateLastTime()
}
chunkDescs[i] = cd
}
}
if lastTimeHead, hs.err = chunkDescs[len(chunkDescs)-1].lastTime(); hs.err != nil {
if lastTimeHead, hs.err = chunkDescs[len(chunkDescs)-1].LastTime(); hs.err != nil {
return false
}
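
The checkpoint format decoded here stores fields such as firstTime and lastTime as varints read via binary.ReadVarint, which requires an io.ByteReader (hence hs.r being a byte-oriented reader). A runnable round trip of that encoding (values invented):

package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
)

func main() {
	var b bytes.Buffer
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutVarint(buf, 1456790400000) // a model.Time-style millisecond timestamp
	b.Write(buf[:n])

	r := bufio.NewReader(&b) // satisfies io.ByteReader, like hs.r
	v, err := binary.ReadVarint(r)
	fmt.Println(v, err) // 1456790400000 <nil>
}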


@ -13,38 +13,6 @@
package local
import "github.com/prometheus/client_golang/prometheus"
// Usually, a separate file for instrumentation is frowned upon. Metrics should
// be close to where they are used. However, the metrics below are set all over
// the place, so we go for a separate instrumentation file in this case.
var (
chunkOps = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "chunk_ops_total",
Help: "The total number of chunk operations by their type.",
},
[]string{opTypeLabel},
)
chunkDescOps = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "chunkdesc_ops_total",
Help: "The total number of chunk descriptor operations by their type.",
},
[]string{opTypeLabel},
)
numMemChunkDescs = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "memory_chunkdescs",
Help: "The current number of chunk descriptors in memory.",
})
)
const (
namespace = "prometheus"
subsystem = "local_storage"
@ -64,19 +32,6 @@ const (
droppedQuarantine = "quarantine_dropped"
failedQuarantine = "quarantine_failed"
// Op-types for chunkOps.
createAndPin = "create" // A chunkDesc creation with refCount=1.
persistAndUnpin = "persist"
pin = "pin" // Excluding the pin on creation.
unpin = "unpin" // Excluding the unpin on persisting.
clone = "clone"
transcode = "transcode"
drop = "drop"
// Op-types for chunkOps and chunkDescOps.
evict = "evict"
load = "load"
seriesLocationLabel = "location"
// Maintenance types for maintainSeriesDuration.
@ -89,24 +44,3 @@ const (
outOfOrderTimestamp = "timestamp_out_of_order"
duplicateSample = "multiple_values_for_timestamp"
)
func init() {
prometheus.MustRegister(chunkOps)
prometheus.MustRegister(chunkDescOps)
prometheus.MustRegister(numMemChunkDescs)
}
var (
// Global counter, also used internally, so not implemented as
// metrics. Collected in MemorySeriesStorage.Collect.
// TODO(beorn7): As it is used internally, it is actually very bad style
// to have it as a global variable.
numMemChunks int64
// Metric descriptors for the above.
numMemChunksDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "memory_chunks"),
"The current number of chunks in memory, excluding cloned chunks (i.e. chunks without a descriptor).",
nil, nil,
)
)


@ -31,6 +31,7 @@ import (
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/local/chunk"
"github.com/prometheus/prometheus/storage/local/codable"
"github.com/prometheus/prometheus/storage/local/index"
"github.com/prometheus/prometheus/util/flock"
@ -60,7 +61,7 @@ const (
chunkHeaderTypeOffset = 0
chunkHeaderFirstTimeOffset = 1
chunkHeaderLastTimeOffset = 9
chunkLenWithHeader = chunkLen + chunkHeaderLen
chunkLenWithHeader = chunk.ChunkLen + chunkHeaderLen
chunkMaxBatchSize = 62 // Max no. of chunks to load or write in
// one batch. Note that 62 is the largest number of chunks that fit
// into 64kiB on disk because chunkHeaderLen is added to each 1k chunk.
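Worked out (assuming chunkHeaderLen = 17, i.e. one type byte plus two 8-byte timestamps, per the header offsets above): 62 × (1024 + 17) = 64,542 bytes still fits in 64KiB = 65,536 bytes, while 63 × 1,041 = 65,583 bytes would not.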
@ -370,7 +371,7 @@ func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelVa
//
// Returning an error signals problems with the series file. In this case, the
// caller should quarantine the series.
func (p *persistence) persistChunks(fp model.Fingerprint, chunks []Chunk) (index int, err error) {
func (p *persistence) persistChunks(fp model.Fingerprint, chunks []chunk.Chunk) (index int, err error) {
f, err := p.openChunkFileForWriting(fp)
if err != nil {
return -1, err
@ -399,14 +400,14 @@ func (p *persistence) persistChunks(fp model.Fingerprint, chunks []Chunk) (index
// incrementally larger indexes. The indexOffset denotes the offset to be added to
// each index in indexes. It is the caller's responsibility to not persist or
// drop anything for the same fingerprint concurrently.
func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]Chunk, error) {
func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]chunk.Chunk, error) {
f, err := p.openChunkFileForReading(fp)
if err != nil {
return nil, err
}
defer f.Close()
chunks := make([]Chunk, 0, len(indexes))
chunks := make([]chunk.Chunk, 0, len(indexes))
buf := p.bufPool.Get().([]byte)
defer func() {
// buf may change below. An unwrapped 'defer p.bufPool.Put(buf)'
@ -436,7 +437,7 @@ func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffse
return nil, err
}
for c := 0; c < batchSize; c++ {
chunk, err := NewChunkForEncoding(ChunkEncoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset]))
chunk, err := chunk.NewChunkForEncoding(chunk.Encoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset]))
if err != nil {
return nil, err
}
@ -446,16 +447,16 @@ func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffse
chunks = append(chunks, chunk)
}
}
chunkOps.WithLabelValues(load).Add(float64(len(chunks)))
atomic.AddInt64(&numMemChunks, int64(len(chunks)))
chunk.ChunkOps.WithLabelValues(chunk.Load).Add(float64(len(chunks)))
atomic.AddInt64(&chunk.NumMemChunks, int64(len(chunks)))
return chunks, nil
}
// loadChunkDescs loads the chunkDescs for a series from disk. offsetFromEnd is
// the number of chunkDescs to skip from the end of the series file. It is the
// loadChunkDescs loads the chunk.Descs for a series from disk. offsetFromEnd is
// the number of chunk.Descs to skip from the end of the series file. It is the
// caller's responsibility to not persist or drop anything for the same
// fingerprint concurrently.
func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*ChunkDesc, error) {
func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*chunk.Desc, error) {
f, err := p.openChunkFileForReading(fp)
if os.IsNotExist(err) {
return nil, nil
@ -478,7 +479,7 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([
}
numChunks := int(fi.Size())/chunkLenWithHeader - offsetFromEnd
cds := make([]*ChunkDesc, numChunks)
cds := make([]*chunk.Desc, numChunks)
chunkTimesBuf := make([]byte, 16)
for i := 0; i < numChunks; i++ {
_, err := f.Seek(offsetForChunkIndex(i)+chunkHeaderFirstTimeOffset, os.SEEK_SET)
@ -490,13 +491,13 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([
if err != nil {
return nil, err
}
cds[i] = &ChunkDesc{
chunkFirstTime: model.Time(binary.LittleEndian.Uint64(chunkTimesBuf)),
chunkLastTime: model.Time(binary.LittleEndian.Uint64(chunkTimesBuf[8:])),
cds[i] = &chunk.Desc{
ChunkFirstTime: model.Time(binary.LittleEndian.Uint64(chunkTimesBuf)),
ChunkLastTime: model.Time(binary.LittleEndian.Uint64(chunkTimesBuf[8:])),
}
}
chunkDescOps.WithLabelValues(load).Add(float64(len(cds)))
numMemChunkDescs.Add(float64(len(cds)))
chunk.ChunkDescOps.WithLabelValues(chunk.Load).Add(float64(len(cds)))
chunk.NumMemChunkDescs.Add(float64(len(cds)))
return cds, nil
}
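
Each on-disk chunk header stores the first and last timestamp as little-endian uint64s at offsets 1 and 9; loadChunkDescs reads both into a 16-byte buffer and decodes them as above. A runnable distillation of that decode (buffer contents invented):

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// chunkTimesBuf holds firstTime followed by lastTime, 8 bytes each.
	chunkTimesBuf := make([]byte, 16)
	binary.LittleEndian.PutUint64(chunkTimesBuf, 1000)
	binary.LittleEndian.PutUint64(chunkTimesBuf[8:], 2000)

	first := binary.LittleEndian.Uint64(chunkTimesBuf)
	last := binary.LittleEndian.Uint64(chunkTimesBuf[8:])
	fmt.Println("first:", first, "last:", last) // first: 1000 last: 2000
}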
@ -645,10 +646,10 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
}
for i, chunkDesc := range m.series.chunkDescs {
if i < m.series.persistWatermark {
if _, err = codable.EncodeVarint(w, int64(chunkDesc.firstTime())); err != nil {
if _, err = codable.EncodeVarint(w, int64(chunkDesc.FirstTime())); err != nil {
return
}
lt, err := chunkDesc.lastTime()
lt, err := chunkDesc.LastTime()
if err != nil {
return
}
@ -657,10 +658,10 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
}
} else {
// This is a non-persisted chunk. Fully marshal it.
if err = w.WriteByte(byte(chunkDesc.c.Encoding())); err != nil {
if err = w.WriteByte(byte(chunkDesc.C.Encoding())); err != nil {
return
}
if err = chunkDesc.c.Marshal(w); err != nil {
if err = chunkDesc.C.Marshal(w); err != nil {
return
}
}
@ -751,7 +752,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in
// Returning an error signals problems with the series file. In this case, the
// caller should quarantine the series.
func (p *persistence) dropAndPersistChunks(
fp model.Fingerprint, beforeTime model.Time, chunks []Chunk,
fp model.Fingerprint, beforeTime model.Time, chunks []chunk.Chunk,
) (
firstTimeNotDropped model.Time,
offset int,
@ -879,7 +880,7 @@ func (p *persistence) dropAndPersistChunks(
firstTimeNotDropped = model.Time(
binary.LittleEndian.Uint64(headerBuf[chunkHeaderFirstTimeOffset:]),
)
chunkOps.WithLabelValues(drop).Add(float64(numDropped))
chunk.ChunkOps.WithLabelValues(chunk.Drop).Add(float64(numDropped))
_, err = f.Seek(-chunkHeaderLen, os.SEEK_CUR)
if err != nil {
return
@ -930,7 +931,7 @@ func (p *persistence) deleteSeriesFile(fp model.Fingerprint) (int, error) {
if err := os.Remove(fname); err != nil {
return -1, err
}
chunkOps.WithLabelValues(drop).Add(float64(numChunks))
chunk.ChunkOps.WithLabelValues(chunk.Drop).Add(float64(numChunks))
return numChunks, nil
}
@ -1500,7 +1501,7 @@ func (p *persistence) loadFPMappings() (fpMappings, model.Fingerprint, error) {
return fpm, highestMappedFP, nil
}
func (p *persistence) writeChunks(w io.Writer, chunks []Chunk) error {
func (p *persistence) writeChunks(w io.Writer, chunks []chunk.Chunk) error {
b := p.bufPool.Get().([]byte)
defer func() {
// buf may change below. An unwrapped 'defer p.bufPool.Put(buf)'
@ -1547,7 +1548,7 @@ func chunkIndexForOffset(offset int64) (int, error) {
return int(offset) / chunkLenWithHeader, nil
}
func writeChunkHeader(header []byte, c Chunk) error {
func writeChunkHeader(header []byte, c chunk.Chunk) error {
header[chunkHeaderTypeOffset] = byte(c.Encoding())
binary.LittleEndian.PutUint64(
header[chunkHeaderFirstTimeOffset:],


@ -25,6 +25,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/local/chunk"
"github.com/prometheus/prometheus/storage/local/codable"
"github.com/prometheus/prometheus/storage/local/index"
"github.com/prometheus/prometheus/util/testutil"
@ -38,8 +39,8 @@ var (
m5 = model.Metric{"label": "value5"}
)
func newTestPersistence(t *testing.T, encoding ChunkEncoding) (*persistence, testutil.Closer) {
DefaultChunkEncoding = encoding
func newTestPersistence(t *testing.T, encoding chunk.Encoding) (*persistence, testutil.Closer) {
chunk.DefaultEncoding = encoding
dir := testutil.NewTemporaryDirectory("test_persistence", t)
p, err := newPersistence(dir.Path(), false, false, func() bool { return false }, 0.1)
if err != nil {
@ -53,18 +54,18 @@ func newTestPersistence(t *testing.T, encoding ChunkEncoding) (*persistence, tes
})
}
func buildTestChunks(t *testing.T, encoding ChunkEncoding) map[model.Fingerprint][]Chunk {
func buildTestChunks(t *testing.T, encoding chunk.Encoding) map[model.Fingerprint][]chunk.Chunk {
fps := model.Fingerprints{
m1.FastFingerprint(),
m2.FastFingerprint(),
m3.FastFingerprint(),
}
fpToChunks := map[model.Fingerprint][]Chunk{}
fpToChunks := map[model.Fingerprint][]chunk.Chunk{}
for _, fp := range fps {
fpToChunks[fp] = make([]Chunk, 0, 10)
fpToChunks[fp] = make([]chunk.Chunk, 0, 10)
for i := 0; i < 10; i++ {
ch, err := NewChunkForEncoding(encoding)
ch, err := chunk.NewChunkForEncoding(encoding)
if err != nil {
t.Fatal(err)
}
@ -81,7 +82,7 @@ func buildTestChunks(t *testing.T, encoding ChunkEncoding) map[model.Fingerprint
return fpToChunks
}
func chunksEqual(c1, c2 Chunk) bool {
func chunksEqual(c1, c2 chunk.Chunk) bool {
it1 := c1.NewIterator()
it2 := c2.NewIterator()
for it1.Scan() && it2.Scan() {
@ -92,7 +93,7 @@ func chunksEqual(c1, c2 Chunk) bool {
return it1.Err() == nil && it2.Err() == nil
}
func testPersistLoadDropChunks(t *testing.T, encoding ChunkEncoding) {
func testPersistLoadDropChunks(t *testing.T, encoding chunk.Encoding) {
p, closer := newTestPersistence(t, encoding)
defer closer.Close()
@ -138,14 +139,14 @@ func testPersistLoadDropChunks(t *testing.T, encoding ChunkEncoding) {
t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 10)
}
for i, cd := range actualChunkDescs {
lastTime, err := cd.lastTime()
lastTime, err := cd.LastTime()
if err != nil {
t.Fatal(err)
}
if cd.firstTime() != model.Time(i) || lastTime != model.Time(i) {
if cd.FirstTime() != model.Time(i) || lastTime != model.Time(i) {
t.Errorf(
"Want ts=%v, got firstTime=%v, lastTime=%v.",
i, cd.firstTime(), lastTime,
i, cd.FirstTime(), lastTime,
)
}
@ -159,14 +160,14 @@ func testPersistLoadDropChunks(t *testing.T, encoding ChunkEncoding) {
t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 5)
}
for i, cd := range actualChunkDescs {
lastTime, err := cd.lastTime()
lastTime, err := cd.LastTime()
if err != nil {
t.Fatal(err)
}
if cd.firstTime() != model.Time(i) || lastTime != model.Time(i) {
if cd.FirstTime() != model.Time(i) || lastTime != model.Time(i) {
t.Errorf(
"Want ts=%v, got firstTime=%v, lastTime=%v.",
i, cd.firstTime(), lastTime,
i, cd.FirstTime(), lastTime,
)
}
@ -450,7 +451,7 @@ func TestPersistLoadDropChunksType1(t *testing.T) {
testPersistLoadDropChunks(t, 1)
}
func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding) {
func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encoding) {
p, closer := newTestPersistence(t, encoding)
defer closer.Close()
@ -499,7 +500,7 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding
if !reflect.DeepEqual(loadedS1.metric, m1) {
t.Errorf("want metric %v, got %v", m1, loadedS1.metric)
}
if !reflect.DeepEqual(loadedS1.head().c, s1.head().c) {
if !reflect.DeepEqual(loadedS1.head().C, s1.head().C) {
t.Error("head chunks differ")
}
if loadedS1.chunkDescsOffset != 0 {
@ -508,11 +509,11 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding
if loadedS1.headChunkClosed {
t.Error("headChunkClosed is true")
}
if loadedS1.head().chunkFirstTime != 1 {
t.Errorf("want chunkFirstTime in head chunk to be 1, got %d", loadedS1.head().chunkFirstTime)
if loadedS1.head().ChunkFirstTime != 1 {
t.Errorf("want ChunkFirstTime in head chunk to be 1, got %d", loadedS1.head().ChunkFirstTime)
}
if loadedS1.head().chunkLastTime != model.Earliest {
t.Error("want chunkLastTime in head chunk to be unset")
if loadedS1.head().ChunkLastTime != model.Earliest {
t.Error("want ChunkLastTime in head chunk to be unset")
}
} else {
t.Errorf("couldn't find %v in loaded map", m1)
@ -521,7 +522,7 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding
if !reflect.DeepEqual(loadedS3.metric, m3) {
t.Errorf("want metric %v, got %v", m3, loadedS3.metric)
}
if loadedS3.head().c != nil {
if loadedS3.head().C != nil {
t.Error("head chunk not evicted")
}
if loadedS3.chunkDescsOffset != 0 {
@ -530,11 +531,11 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding
if !loadedS3.headChunkClosed {
t.Error("headChunkClosed is false")
}
if loadedS3.head().chunkFirstTime != 2 {
t.Errorf("want chunkFirstTime in head chunk to be 2, got %d", loadedS3.head().chunkFirstTime)
if loadedS3.head().ChunkFirstTime != 2 {
t.Errorf("want ChunkFirstTime in head chunk to be 2, got %d", loadedS3.head().ChunkFirstTime)
}
if loadedS3.head().chunkLastTime != 2 {
t.Errorf("want chunkLastTime in head chunk to be 2, got %d", loadedS3.head().chunkLastTime)
if loadedS3.head().ChunkLastTime != 2 {
t.Errorf("want ChunkLastTime in head chunk to be 2, got %d", loadedS3.head().ChunkLastTime)
}
} else {
t.Errorf("couldn't find %v in loaded map", m3)
@ -549,10 +550,10 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding
if got, want := loadedS4.persistWatermark, 0; got != want {
t.Errorf("got persistWatermark %d, want %d", got, want)
}
if loadedS4.chunkDescs[2].isEvicted() {
if loadedS4.chunkDescs[2].IsEvicted() {
t.Error("3rd chunk evicted")
}
if loadedS4.chunkDescs[3].isEvicted() {
if loadedS4.chunkDescs[3].IsEvicted() {
t.Error("4th chunk evicted")
}
if loadedS4.chunkDescsOffset != 0 {
@ -562,27 +563,27 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding
t.Error("headChunkClosed is true")
}
for i, cd := range loadedS4.chunkDescs {
if cd.chunkFirstTime != cd.c.FirstTime() {
if cd.ChunkFirstTime != cd.C.FirstTime() {
t.Errorf(
"chunkDesc[%d]: chunkFirstTime not consistent with chunk, want %d, got %d",
i, cd.c.FirstTime(), cd.chunkFirstTime,
"ChunkDesc[%d]: ChunkFirstTime not consistent with chunk, want %d, got %d",
i, cd.C.FirstTime(), cd.ChunkFirstTime,
)
}
if i == len(loadedS4.chunkDescs)-1 {
// Head chunk.
if cd.chunkLastTime != model.Earliest {
t.Error("want chunkLastTime in head chunk to be unset")
if cd.ChunkLastTime != model.Earliest {
t.Error("want ChunkLastTime in head chunk to be unset")
}
continue
}
lastTime, err := cd.c.NewIterator().LastTimestamp()
lastTime, err := cd.C.NewIterator().LastTimestamp()
if err != nil {
t.Fatal(err)
}
if cd.chunkLastTime != lastTime {
if cd.ChunkLastTime != lastTime {
t.Errorf(
"chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d",
i, lastTime, cd.chunkLastTime,
"ChunkDesc[%d]: ChunkLastTime not consistent with chunk, want %d, got %d",
i, lastTime, cd.ChunkLastTime,
)
}
}
@ -599,10 +600,10 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding
if got, want := loadedS5.persistWatermark, 3; got != want {
t.Errorf("got persistWatermark %d, want %d", got, want)
}
if !loadedS5.chunkDescs[2].isEvicted() {
if !loadedS5.chunkDescs[2].IsEvicted() {
t.Error("3rd chunk not evicted")
}
if loadedS5.chunkDescs[3].isEvicted() {
if loadedS5.chunkDescs[3].IsEvicted() {
t.Error("4th chunk evicted")
}
if loadedS5.chunkDescsOffset != 0 {
@ -614,32 +615,32 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding
for i, cd := range loadedS5.chunkDescs {
if i < 3 {
// Evicted chunks.
if cd.chunkFirstTime == model.Earliest {
t.Errorf("chunkDesc[%d]: chunkLastTime not set", i)
if cd.ChunkFirstTime == model.Earliest {
t.Errorf("ChunkDesc[%d]: ChunkLastTime not set", i)
}
continue
}
if cd.chunkFirstTime != cd.c.FirstTime() {
if cd.ChunkFirstTime != cd.C.FirstTime() {
t.Errorf(
"chunkDesc[%d]: chunkFirstTime not consistent with chunk, want %d, got %d",
i, cd.c.FirstTime(), cd.chunkFirstTime,
"ChunkDesc[%d]: ChunkFirstTime not consistent with chunk, want %d, got %d",
i, cd.C.FirstTime(), cd.ChunkFirstTime,
)
}
if i == len(loadedS5.chunkDescs)-1 {
// Head chunk.
if cd.chunkLastTime != model.Earliest {
t.Error("want chunkLastTime in head chunk to be unset")
if cd.ChunkLastTime != model.Earliest {
t.Error("want ChunkLastTime in head chunk to be unset")
}
continue
}
lastTime, err := cd.c.NewIterator().LastTimestamp()
lastTime, err := cd.C.NewIterator().LastTimestamp()
if err != nil {
t.Fatal(err)
}
if cd.chunkLastTime != lastTime {
if cd.ChunkLastTime != lastTime {
t.Errorf(
"chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d",
i, cd.chunkLastTime, lastTime,
"ChunkDesc[%d]: ChunkLastTime not consistent with chunk, want %d, got %d",
i, cd.ChunkLastTime, lastTime,
)
}
}
@ -690,7 +691,7 @@ func TestCheckpointAndLoadFPMappings(t *testing.T) {
}
}
func testFingerprintsModifiedBefore(t *testing.T, encoding ChunkEncoding) {
func testFingerprintsModifiedBefore(t *testing.T, encoding chunk.Encoding) {
p, closer := newTestPersistence(t, encoding)
defer closer.Close()
@ -769,7 +770,7 @@ func TestFingerprintsModifiedBeforeChunkType2(t *testing.T) {
testFingerprintsModifiedBefore(t, 2)
}
func testDropArchivedMetric(t *testing.T, encoding ChunkEncoding) {
func testDropArchivedMetric(t *testing.T, encoding chunk.Encoding) {
p, closer := newTestPersistence(t, encoding)
defer closer.Close()
@ -843,7 +844,7 @@ type incrementalBatch struct {
expectedLpToFps index.LabelPairFingerprintsMapping
}
func testIndexing(t *testing.T, encoding ChunkEncoding) {
func testIndexing(t *testing.T, encoding chunk.Encoding) {
batches := []incrementalBatch{
{
fpToMetric: index.FingerprintMetricMapping{


@ -20,13 +20,14 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/local/chunk"
"github.com/prometheus/prometheus/storage/metric"
)
const (
// chunkDescEvictionFactor is a factor used for chunkDesc eviction (as opposed
// chunkDescEvictionFactor is a factor used for ChunkDesc eviction (as opposed
// to evictions of chunks, see method evictOlderThan). A chunk takes about 20x
// more memory than a chunkDesc. With a chunkDescEvictionFactor of 10, not more
// more memory than a ChunkDesc. With a chunkDescEvictionFactor of 10, not more
// than a third of the total memory taken by a series will be used for
// chunkDescs.
chunkDescEvictionFactor = 10
@ -140,8 +141,8 @@ func (sm *seriesMap) fpIter() <-chan model.Fingerprint {
type memorySeries struct {
metric model.Metric
// Sorted by start time, overlapping chunk ranges are forbidden.
chunkDescs []*ChunkDesc
// The index (within chunkDescs above) of the first chunkDesc that
chunkDescs []*chunk.Desc
// The index (within chunkDescs above) of the first ChunkDesc that
// points to a non-persisted chunk. If all chunks are persisted, then
// persistWatermark == len(chunkDescs).
persistWatermark int
@ -151,7 +152,7 @@ type memorySeries struct {
// The chunkDescs in memory might not have all the chunkDescs for the
// chunks that are persisted to disk. The missing chunkDescs are all
// contiguous and at the tail end. chunkDescsOffset is the index of the
// chunk on disk that corresponds to the first chunkDesc in memory. If
// chunk on disk that corresponds to the first ChunkDesc in memory. If
// it is 0, the chunkDescs are all loaded. A value of -1 denotes a
// special case: There are chunks on disk, but the offset to the
// chunkDescs in memory is unknown. Also, in this special case, there is
@ -160,7 +161,7 @@ type memorySeries struct {
// set).
chunkDescsOffset int
// The savedFirstTime field is used as a fallback when the
// chunkDescsOffset is not 0. It can be used to save the firstTime of the
// chunkDescsOffset is not 0. It can be used to save the FirstTime of the
// first chunk before its chunk desc is evicted. In doubt, this field is
// just set to the oldest possible timestamp.
savedFirstTime model.Time
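
The offset bookkeeping documented above is subtle; here is a minimal sketch of the arithmetic, using a hypothetical helper name that does not exist in this change:

// diskChunkIndex illustrates the chunkDescsOffset semantics: the on-disk
// chunk behind the in-memory desc at index i sits at chunkDescsOffset + i.
// An offset of -1 means the on-disk position is unknown, so nothing can
// be derived from it.
func diskChunkIndex(chunkDescsOffset, i int) (int, bool) {
	if chunkDescsOffset < 0 {
		return 0, false
	}
	return chunkDescsOffset + i, true
}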
@ -193,13 +194,13 @@ type memorySeries struct {
// set to model.Earliest. The zero value for modTime can be used if the
// modification time of the series file is unknown (e.g. if this is a genuinely
// new series).
func newMemorySeries(m model.Metric, chunkDescs []*ChunkDesc, modTime time.Time) (*memorySeries, error) {
func newMemorySeries(m model.Metric, chunkDescs []*chunk.Desc, modTime time.Time) (*memorySeries, error) {
var err error
firstTime := model.Earliest
lastTime := model.Earliest
if len(chunkDescs) > 0 {
firstTime = chunkDescs[0].firstTime()
if lastTime, err = chunkDescs[len(chunkDescs)-1].lastTime(); err != nil {
firstTime = chunkDescs[0].FirstTime()
if lastTime, err = chunkDescs[len(chunkDescs)-1].LastTime(); err != nil {
return nil, err
}
}
@ -220,10 +221,10 @@ func newMemorySeries(m model.Metric, chunkDescs []*ChunkDesc, modTime time.Time)
// The caller must have locked the fingerprint of the series.
func (s *memorySeries) Add(v model.SamplePair) (int, error) {
if len(s.chunkDescs) == 0 || s.headChunkClosed {
newHead := NewChunkDesc(NewChunk(), v.Timestamp)
newHead := chunk.NewDesc(chunk.NewChunk(), v.Timestamp)
s.chunkDescs = append(s.chunkDescs, newHead)
s.headChunkClosed = false
} else if s.headChunkUsedByIterator && s.head().refCount() > 1 {
} else if s.headChunkUsedByIterator && s.head().RefCount() > 1 {
// We only need to clone the head chunk if the current head
// chunk was used in an iterator at all and if the refCount is
// still greater than the 1 we always have because the head
@ -233,10 +234,10 @@ func (s *memorySeries) Add(v model.SamplePair) (int, error) {
// around and keep the head chunk pinned. We needed to track
// pins by version of the head chunk, which is probably not
// worth the effort.
chunkOps.WithLabelValues(clone).Inc()
chunk.ChunkOps.WithLabelValues(chunk.Clone).Inc()
// No locking needed here because a non-persisted head chunk can
// not get evicted concurrently.
s.head().c = s.head().c.Clone()
s.head().C = s.head().C.Clone()
s.headChunkUsedByIterator = false
}
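
The cloning rule above is plain copy-on-write; a toy sketch under invented names (neither cowBuf nor add exists in the real code):

// cowBuf stands in for a head chunk that may still be shared with an
// iterator. add clones the underlying data only when another reader
// holds a reference, mirroring the RefCount() > 1 check above.
type cowBuf struct {
	refs int
	data []byte
}

func (b *cowBuf) add(v byte) *cowBuf {
	if b.refs > 1 {
		// Clone into a private copy; the old buffer stays valid for readers.
		b = &cowBuf{refs: 1, data: append([]byte(nil), b.data...)}
	}
	b.data = append(b.data, v)
	return b
}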
@ -244,15 +245,15 @@ func (s *memorySeries) Add(v model.SamplePair) (int, error) {
if err != nil {
return 0, err
}
s.head().c = chunks[0]
s.head().C = chunks[0]
for _, c := range chunks[1:] {
s.chunkDescs = append(s.chunkDescs, NewChunkDesc(c, c.FirstTime()))
s.chunkDescs = append(s.chunkDescs, chunk.NewDesc(c, c.FirstTime()))
}
// Populate lastTime of now-closed chunks.
for _, cd := range s.chunkDescs[len(s.chunkDescs)-len(chunks) : len(s.chunkDescs)-1] {
cd.maybePopulateLastTime()
cd.MaybePopulateLastTime()
}
s.lastTime = v.Timestamp
@ -275,7 +276,7 @@ func (s *memorySeries) maybeCloseHeadChunk() bool {
// Since we cannot modify the head chunk from now on, we
// don't need to bother with cloning anymore.
s.headChunkUsedByIterator = false
s.head().maybePopulateLastTime()
s.head().MaybePopulateLastTime()
return true
}
return false
@ -291,10 +292,10 @@ func (s *memorySeries) evictChunkDescs(iOldestNotEvicted int) {
lenEvicted := len(s.chunkDescs) - lenToKeep
s.chunkDescsOffset += lenEvicted
s.persistWatermark -= lenEvicted
chunkDescOps.WithLabelValues(evict).Add(float64(lenEvicted))
numMemChunkDescs.Sub(float64(lenEvicted))
chunk.ChunkDescOps.WithLabelValues(chunk.Evict).Add(float64(lenEvicted))
chunk.NumMemChunkDescs.Sub(float64(lenEvicted))
s.chunkDescs = append(
make([]*ChunkDesc, 0, lenToKeep),
make([]*chunk.Desc, 0, lenToKeep),
s.chunkDescs[lenEvicted:]...,
)
s.dirty = true
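
Copying into a freshly allocated slice here, rather than re-slicing with s.chunkDescs = s.chunkDescs[lenEvicted:], matters for memory: a re-slice shares the old backing array and keeps the evicted descs reachable. A minimal sketch of the idiom (keepTail is an invented name):

// keepTail copies the kept tail of cds into a fresh backing array so the
// dropped head entries become unreachable for the garbage collector;
// cds = cds[n:] would instead keep the whole original array alive.
func keepTail(cds []*chunk.Desc, n int) []*chunk.Desc {
	return append(make([]*chunk.Desc, 0, len(cds)-n), cds[n:]...)
}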
@ -306,7 +307,7 @@ func (s *memorySeries) evictChunkDescs(iOldestNotEvicted int) {
func (s *memorySeries) dropChunks(t model.Time) error {
keepIdx := len(s.chunkDescs)
for i, cd := range s.chunkDescs {
lt, err := cd.lastTime()
lt, err := cd.LastTime()
if err != nil {
return err
}
@ -324,7 +325,7 @@ func (s *memorySeries) dropChunks(t model.Time) error {
return nil
}
s.chunkDescs = append(
make([]*ChunkDesc, 0, len(s.chunkDescs)-keepIdx),
make([]*chunk.Desc, 0, len(s.chunkDescs)-keepIdx),
s.chunkDescs[keepIdx:]...,
)
s.persistWatermark -= keepIdx
@ -334,7 +335,7 @@ func (s *memorySeries) dropChunks(t model.Time) error {
if s.chunkDescsOffset != -1 {
s.chunkDescsOffset += keepIdx
}
numMemChunkDescs.Sub(float64(keepIdx))
chunk.NumMemChunkDescs.Sub(float64(keepIdx))
s.dirty = true
return nil
}
@ -344,16 +345,16 @@ func (s *memorySeries) preloadChunks(
indexes []int, fp model.Fingerprint, mss *MemorySeriesStorage,
) (SeriesIterator, error) {
loadIndexes := []int{}
pinnedChunkDescs := make([]*ChunkDesc, 0, len(indexes))
pinnedChunkDescs := make([]*chunk.Desc, 0, len(indexes))
for _, idx := range indexes {
cd := s.chunkDescs[idx]
pinnedChunkDescs = append(pinnedChunkDescs, cd)
cd.pin(mss.evictRequests) // Have to pin everything first to prevent immediate eviction on chunk loading.
if cd.isEvicted() {
cd.Pin(mss.evictRequests) // Have to pin everything first to prevent immediate eviction on chunk loading.
if cd.IsEvicted() {
loadIndexes = append(loadIndexes, idx)
}
}
chunkOps.WithLabelValues(pin).Add(float64(len(pinnedChunkDescs)))
chunk.ChunkOps.WithLabelValues(chunk.Pin).Add(float64(len(pinnedChunkDescs)))
if len(loadIndexes) > 0 {
if s.chunkDescsOffset == -1 {
@ -363,13 +364,13 @@ func (s *memorySeries) preloadChunks(
if err != nil {
// Unpin the chunks since we won't return them as pinned chunks now.
for _, cd := range pinnedChunkDescs {
cd.unpin(mss.evictRequests)
cd.Unpin(mss.evictRequests)
}
chunkOps.WithLabelValues(unpin).Add(float64(len(pinnedChunkDescs)))
chunk.ChunkOps.WithLabelValues(chunk.Unpin).Add(float64(len(pinnedChunkDescs)))
return nopIter, err
}
for i, c := range chunks {
s.chunkDescs[loadIndexes[i]].setChunk(c)
s.chunkDescs[loadIndexes[i]].SetChunk(c)
}
}
@ -394,19 +395,19 @@ func (s *memorySeries) preloadChunks(
//
// The caller must have locked the fingerprint of the memorySeries.
func (s *memorySeries) NewIterator(
pinnedChunkDescs []*ChunkDesc,
pinnedChunkDescs []*chunk.Desc,
quarantine func(error),
evictRequests chan<- evictRequest,
evictRequests chan<- chunk.EvictRequest,
) SeriesIterator {
chunks := make([]Chunk, 0, len(pinnedChunkDescs))
chunks := make([]chunk.Chunk, 0, len(pinnedChunkDescs))
for _, cd := range pinnedChunkDescs {
// It's OK to directly access cd.C here (without locking) as the
// series FP is locked and the chunk is pinned.
chunks = append(chunks, cd.c)
chunks = append(chunks, cd.C)
}
return &memorySeriesIterator{
chunks: chunks,
chunkIts: make([]ChunkIterator, len(chunks)),
chunkIts: make([]chunk.Iterator, len(chunks)),
quarantine: quarantine,
metric: s.metric,
pinnedChunkDescs: pinnedChunkDescs,
@ -453,7 +454,7 @@ func (s *memorySeries) preloadChunksForRange(
) (SeriesIterator, error) {
firstChunkDescTime := model.Latest
if len(s.chunkDescs) > 0 {
firstChunkDescTime = s.chunkDescs[0].firstTime()
firstChunkDescTime = s.chunkDescs[0].FirstTime()
}
if s.chunkDescsOffset != 0 && from.Before(firstChunkDescTime) {
cds, err := mss.loadChunkDescs(fp, s.persistWatermark)
@ -463,7 +464,7 @@ func (s *memorySeries) preloadChunksForRange(
s.chunkDescs = append(cds, s.chunkDescs...)
s.chunkDescsOffset = 0
s.persistWatermark += len(cds)
firstChunkDescTime = s.chunkDescs[0].firstTime()
firstChunkDescTime = s.chunkDescs[0].FirstTime()
}
if len(s.chunkDescs) == 0 || through.Before(firstChunkDescTime) {
@ -472,16 +473,16 @@ func (s *memorySeries) preloadChunksForRange(
// Find first chunk with start time after "from".
fromIdx := sort.Search(len(s.chunkDescs), func(i int) bool {
return s.chunkDescs[i].firstTime().After(from)
return s.chunkDescs[i].FirstTime().After(from)
})
// Find first chunk with start time after "through".
throughIdx := sort.Search(len(s.chunkDescs), func(i int) bool {
return s.chunkDescs[i].firstTime().After(through)
return s.chunkDescs[i].FirstTime().After(through)
})
if fromIdx == len(s.chunkDescs) {
// Even the last chunk starts before "from". Find out if the
// series ends before "from" and we don't need to do anything.
lt, err := s.chunkDescs[len(s.chunkDescs)-1].lastTime()
lt, err := s.chunkDescs[len(s.chunkDescs)-1].LastTime()
if err != nil {
return nopIter, err
}
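
sort.Search, as used for fromIdx and throughIdx above, returns the smallest index in [0, n) for which the predicate is true, or n if it is never true, which is why the result is compared against len(s.chunkDescs). A self-contained toy run with invented data:

package main

import (
	"fmt"
	"sort"
)

func main() {
	starts := []int{10, 20, 30} // stand-ins for chunk first times
	from := 20
	// First chunk starting strictly after "from", mirroring
	// FirstTime().After(from) above.
	fromIdx := sort.Search(len(starts), func(i int) bool {
		return starts[i] > from
	})
	fmt.Println(fromIdx) // 2
}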
@ -506,7 +507,7 @@ func (s *memorySeries) preloadChunksForRange(
// head returns a pointer to the head chunk descriptor. The caller must have
// locked the fingerprint of the memorySeries. This method will panic if this
// series has no chunk descriptors.
func (s *memorySeries) head() *ChunkDesc {
func (s *memorySeries) head() *chunk.Desc {
return s.chunkDescs[len(s.chunkDescs)-1]
}
@ -515,7 +516,7 @@ func (s *memorySeries) head() *ChunkDesc {
// The caller must have locked the fingerprint of the memorySeries.
func (s *memorySeries) FirstTime() model.Time {
if s.chunkDescsOffset == 0 && len(s.chunkDescs) > 0 {
return s.chunkDescs[0].firstTime()
return s.chunkDescs[0].FirstTime()
}
return s.savedFirstTime
}
@ -543,7 +544,7 @@ func (s *memorySeries) lastSamplePair() model.SamplePair {
// accordingly.
//
// The caller must have locked the fingerprint of the series.
func (s *memorySeries) chunksToPersist() []*ChunkDesc {
func (s *memorySeries) chunksToPersist() []*chunk.Desc {
newWatermark := len(s.chunkDescs)
if !s.headChunkClosed {
newWatermark--
@ -559,20 +560,20 @@ func (s *memorySeries) chunksToPersist() []*ChunkDesc {
// memorySeriesIterator implements SeriesIterator.
type memorySeriesIterator struct {
// Last chunkIterator used by ValueAtOrBeforeTime.
chunkIt ChunkIterator
// Last chunk.Iterator used by ValueAtOrBeforeTime.
chunkIt chunk.Iterator
// Caches chunkIterators.
chunkIts []ChunkIterator
chunkIts []chunk.Iterator
// The actual sample chunks.
chunks []Chunk
chunks []chunk.Chunk
// Call to quarantine the series this iterator belongs to.
quarantine func(error)
// The metric corresponding to the iterator.
metric model.Metric
// Chunks that were pinned for this iterator.
pinnedChunkDescs []*ChunkDesc
pinnedChunkDescs []*chunk.Desc
// Where to send evict requests when unpinning pinned chunks.
evictRequests chan<- evictRequest
evictRequests chan<- chunk.EvictRequest
}
// ValueAtOrBeforeTime implements SeriesIterator.
@ -642,7 +643,7 @@ func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePa
if c.FirstTime().After(in.NewestInclusive) {
break
}
chValues, err := rangeValues(it.chunkIterator(i+j), in)
chValues, err := chunk.RangeValues(it.chunkIterator(i+j), in)
if err != nil {
it.quarantine(err)
return nil
@ -656,9 +657,9 @@ func (it *memorySeriesIterator) Metric() metric.Metric {
return metric.Metric{Metric: it.metric}
}
// chunkIterator returns the chunkIterator for the chunk at position i (and
// chunkIterator returns the chunk.Iterator for the chunk at position i (and
// creates it if needed).
func (it *memorySeriesIterator) chunkIterator(i int) ChunkIterator {
func (it *memorySeriesIterator) chunkIterator(i int) chunk.Iterator {
chunkIt := it.chunkIts[i]
if chunkIt == nil {
chunkIt = it.chunks[i].NewIterator()
@ -669,9 +670,9 @@ func (it *memorySeriesIterator) chunkIterator(i int) ChunkIterator {
func (it *memorySeriesIterator) Close() {
for _, cd := range it.pinnedChunkDescs {
cd.unpin(it.evictRequests)
cd.Unpin(it.evictRequests)
}
chunkOps.WithLabelValues(unpin).Add(float64(len(it.pinnedChunkDescs)))
chunk.ChunkOps.WithLabelValues(chunk.Unpin).Add(float64(len(it.pinnedChunkDescs)))
}
// singleSampleSeriesIterator implements SeriesIterator. It is a "shortcut

View file

@ -29,13 +29,13 @@ import (
"github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/storage/local/chunk"
"github.com/prometheus/prometheus/storage/metric"
)
const (
evictRequestsCap = 1024
quarantineRequestsCap = 1024
chunkLen = 1024
// See waitForNextFP.
fpMaxSweepTime = 6 * time.Hour
@ -89,11 +89,6 @@ var (
)
)
type evictRequest struct {
cd *ChunkDesc
evict bool
}
type quarantineRequest struct {
fp model.Fingerprint
metric model.Metric
@ -171,7 +166,7 @@ type MemorySeriesStorage struct {
mapper *fpMapper
evictList *list.List
evictRequests chan evictRequest
evictRequests chan chunk.EvictRequest
evictStopping, evictStopped chan struct{}
quarantineRequests chan quarantineRequest
@ -226,7 +221,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *MemorySeriesStorage
maxChunksToPersist: o.MaxChunksToPersist,
evictList: list.New(),
evictRequests: make(chan evictRequest, evictRequestsCap),
evictRequests: make(chan chunk.EvictRequest, evictRequestsCap),
evictStopping: make(chan struct{}),
evictStopped: make(chan struct{}),
@ -779,7 +774,7 @@ func (s *MemorySeriesStorage) Append(sample *model.Sample) error {
// NeedsThrottling implements Storage.
func (s *MemorySeriesStorage) NeedsThrottling() bool {
if s.getNumChunksToPersist() > s.maxChunksToPersist ||
float64(atomic.LoadInt64(&numMemChunks)) > float64(s.maxMemoryChunks)*toleranceFactorMemChunks {
float64(atomic.LoadInt64(&chunk.NumMemChunks)) > float64(s.maxMemoryChunks)*toleranceFactorMemChunks {
select {
case s.throttled <- struct{}{}:
default: // Do nothing, signal already pending.
@ -813,7 +808,7 @@ func (s *MemorySeriesStorage) logThrottling() {
log.
With("chunksToPersist", s.getNumChunksToPersist()).
With("maxChunksToPersist", s.maxChunksToPersist).
With("memoryChunks", atomic.LoadInt64(&numMemChunks)).
With("memoryChunks", atomic.LoadInt64(&chunk.NumMemChunks)).
With("maxToleratedMemChunks", int(float64(s.maxMemoryChunks)*toleranceFactorMemChunks)).
Error("Storage needs throttling. Scrapes and rule evaluations will be skipped.")
}
@ -821,7 +816,7 @@ func (s *MemorySeriesStorage) logThrottling() {
log.
With("chunksToPersist", s.getNumChunksToPersist()).
With("maxChunksToPersist", s.maxChunksToPersist).
With("memoryChunks", atomic.LoadInt64(&numMemChunks)).
With("memoryChunks", atomic.LoadInt64(&chunk.NumMemChunks)).
With("maxToleratedMemChunks", int(float64(s.maxMemoryChunks)*toleranceFactorMemChunks)).
Info("Storage does not need throttling anymore.")
case <-s.loopStopping:
@ -833,7 +828,7 @@ func (s *MemorySeriesStorage) logThrottling() {
func (s *MemorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) (*memorySeries, error) {
series, ok := s.fpToSeries.get(fp)
if !ok {
var cds []*ChunkDesc
var cds []*chunk.Desc
var modTime time.Time
unarchived, err := s.persistence.unarchiveMetric(fp)
if err != nil {
@ -842,9 +837,9 @@ func (s *MemorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me
}
if unarchived {
s.seriesOps.WithLabelValues(unarchive).Inc()
// We have to load chunkDescs anyway to do anything with
// We have to load chunk.Descs anyway to do anything with
// the series, so let's do it right now so that we don't
// end up with a series without any chunkDescs for a
// end up with a series without any chunk.Descs for a
// while (which is confusing as it makes the series
// appear as archived or purged).
cds, err = s.loadChunkDescs(fp, 0)
@ -936,17 +931,17 @@ func (s *MemorySeriesStorage) handleEvictList() {
// evict run is more than maxMemoryChunks/1000.
select {
case req := <-s.evictRequests:
if req.evict {
req.cd.evictListElement = s.evictList.PushBack(req.cd)
if req.Evict {
req.CD.EvictListElement = s.evictList.PushBack(req.CD)
count++
if count > s.maxMemoryChunks/1000 {
s.maybeEvict()
count = 0
}
} else {
if req.cd.evictListElement != nil {
s.evictList.Remove(req.cd.evictListElement)
req.cd.evictListElement = nil
if req.CD.EvictListElement != nil {
s.evictList.Remove(req.CD.EvictListElement)
req.CD.EvictListElement = nil
}
}
case <-ticker.C:
@ -971,39 +966,39 @@ func (s *MemorySeriesStorage) handleEvictList() {
// maybeEvict is a local helper method. Must only be called by handleEvictList.
func (s *MemorySeriesStorage) maybeEvict() {
numChunksToEvict := int(atomic.LoadInt64(&numMemChunks)) - s.maxMemoryChunks
numChunksToEvict := int(atomic.LoadInt64(&chunk.NumMemChunks)) - s.maxMemoryChunks
if numChunksToEvict <= 0 {
return
}
chunkDescsToEvict := make([]*ChunkDesc, numChunksToEvict)
chunkDescsToEvict := make([]*chunk.Desc, numChunksToEvict)
for i := range chunkDescsToEvict {
e := s.evictList.Front()
if e == nil {
break
}
cd := e.Value.(*ChunkDesc)
cd.evictListElement = nil
cd := e.Value.(*chunk.Desc)
cd.EvictListElement = nil
chunkDescsToEvict[i] = cd
s.evictList.Remove(e)
}
// Do the actual eviction in a goroutine as we might otherwise deadlock,
// in the following way: A chunk was unpinned completely and therefore
// scheduled for eviction. At the time we actually try to evict it,
// another goroutine is pinning the chunk. The pinning goroutine has
// currently locked the chunk and tries to send the evict request (to
// remove the chunk from the evict list) to the evictRequests
// channel. The send blocks because evictRequests is full. However, the
// goroutine that is supposed to empty the channel is waiting for the
// chunkDesc lock to try to evict the chunk.
// chunk.Desc lock to try to evict the chunk.
go func() {
for _, cd := range chunkDescsToEvict {
if cd == nil {
break
}
cd.maybeEvict()
cd.MaybeEvict()
// We don't care if the eviction succeeds. If the chunk
// was pinned in the meantime, it will be added to the
// evict list once it gets unpinned again.
}
}()
}
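
The pattern the comment describes boils down to this: whoever drains evictRequests must never block on a per-chunk lock, so the lock-taking eviction work is pushed into its own goroutine. A toy model with invented names:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var mu sync.Mutex
	requests := make(chan struct{}) // stands in for a full evictRequests channel
	var wg sync.WaitGroup

	wg.Add(1)
	go func() { // "pinning" side: sends while holding the lock
		defer wg.Done()
		mu.Lock()
		defer mu.Unlock()
		requests <- struct{}{}
	}()

	<-requests // drained without touching mu, so the sender can finish
	wg.Add(1)
	go func() { // lock-taking eviction work runs off the drain path
		defer wg.Done()
		mu.Lock()
		mu.Unlock()
	}()
	wg.Wait()
	fmt.Println("no deadlock")
}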
@ -1224,7 +1219,7 @@ loop:
// Next, the method checks if all chunks in the series are evicted. In that
// case, it archives the series and returns true.
//
// Finally, it evicts chunkDescs if there are too many.
// Finally, it evicts chunk.Descs if there are too many.
func (s *MemorySeriesStorage) maintainMemorySeries(
fp model.Fingerprint, beforeTime model.Time,
) (becameDirty bool) {
@ -1258,7 +1253,7 @@ func (s *MemorySeriesStorage) maintainMemorySeries(
iOldestNotEvicted := -1
for i, cd := range series.chunkDescs {
if !cd.isEvicted() {
if !cd.IsEvicted() {
iOldestNotEvicted = i
break
}
@ -1282,7 +1277,7 @@ func (s *MemorySeriesStorage) maintainMemorySeries(
}
return
}
// If we are here, the series is not archived, so check for chunkDesc
// If we are here, the series is not archived, so check for chunk.Desc
// eviction next.
series.evictChunkDescs(iOldestNotEvicted)
@ -1316,18 +1311,18 @@ func (s *MemorySeriesStorage) writeMemorySeries(
// that belong to a series that is scheduled for quarantine
// anyway.
for _, cd := range cds {
cd.unpin(s.evictRequests)
cd.Unpin(s.evictRequests)
}
s.incNumChunksToPersist(-len(cds))
chunkOps.WithLabelValues(persistAndUnpin).Add(float64(len(cds)))
chunk.ChunkOps.WithLabelValues(chunk.PersistAndUnpin).Add(float64(len(cds)))
series.modTime = s.persistence.seriesFileModTime(fp)
}()
// Get the actual chunks from underneath the chunkDescs.
// Get the actual chunks from underneath the chunk.Descs.
// No lock required as chunks still to persist cannot be evicted.
chunks := make([]Chunk, len(cds))
chunks := make([]chunk.Chunk, len(cds))
for i, cd := range cds {
chunks[i] = cd.c
chunks[i] = cd.C
}
if !series.FirstTime().Before(beforeTime) {
@ -1413,12 +1408,12 @@ func (s *MemorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, befor
}
// See persistence.loadChunks for detailed explanation.
func (s *MemorySeriesStorage) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]Chunk, error) {
func (s *MemorySeriesStorage) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]chunk.Chunk, error) {
return s.persistence.loadChunks(fp, indexes, indexOffset)
}
// See persistence.loadChunkDescs for detailed explanation.
func (s *MemorySeriesStorage) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*ChunkDesc, error) {
func (s *MemorySeriesStorage) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*chunk.Desc, error) {
return s.persistence.loadChunkDescs(fp, offsetFromEnd)
}
@ -1468,7 +1463,7 @@ func (s *MemorySeriesStorage) calculatePersistenceUrgencyScore() float64 {
var (
chunksToPersist = float64(s.getNumChunksToPersist())
maxChunksToPersist = float64(s.maxChunksToPersist)
memChunks = float64(atomic.LoadInt64(&numMemChunks))
memChunks = float64(atomic.LoadInt64(&chunk.NumMemChunks))
maxMemChunks = float64(s.maxMemoryChunks)
)
score := chunksToPersist / maxChunksToPersist
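
Only the base term of the score is visible in this hunk: the fill ratio of the persistence queue. For instance, 2,500 chunks waiting against a maxChunksToPersist of 10,000 gives a base score of 0.25. The memChunks and maxMemChunks values loaded above feed adjustments that fall outside this hunk.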
@ -1578,12 +1573,12 @@ func (s *MemorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric,
s.numSeries.Dec()
m = series.metric
// Adjust s.numChunksToPersist and numMemChunks down by
// Adjust s.numChunksToPersist and chunk.NumMemChunks down by
// the number of chunks in this series that are not
// persisted yet. Persisted chunks will be deducted from
// numMemChunks upon eviction.
// chunk.NumMemChunks upon eviction.
numChunksNotYetPersisted := len(series.chunkDescs) - series.persistWatermark
atomic.AddInt64(&numMemChunks, int64(-numChunksNotYetPersisted))
atomic.AddInt64(&chunk.NumMemChunks, int64(-numChunksNotYetPersisted))
if !series.headChunkClosed {
// Head chunk wasn't counted as waiting for persistence yet.
// (But it was counted as a chunk in memory.)
@ -1641,7 +1636,7 @@ func (s *MemorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
ch <- s.ingestedSamplesCount.Desc()
s.discardedSamplesCount.Describe(ch)
ch <- s.nonExistentSeriesMatchesCount.Desc()
ch <- numMemChunksDesc
ch <- chunk.NumMemChunksDesc
s.maintainSeriesDuration.Describe(ch)
ch <- s.persistenceUrgencyScore.Desc()
ch <- s.rushedMode.Desc()
@ -1669,9 +1664,9 @@ func (s *MemorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
s.discardedSamplesCount.Collect(ch)
ch <- s.nonExistentSeriesMatchesCount
ch <- prometheus.MustNewConstMetric(
numMemChunksDesc,
chunk.NumMemChunksDesc,
prometheus.GaugeValue,
float64(atomic.LoadInt64(&numMemChunks)),
float64(atomic.LoadInt64(&chunk.NumMemChunks)),
)
s.maintainSeriesDuration.Collect(ch)
ch <- s.persistenceUrgencyScore

View file

@ -27,6 +27,7 @@ import (
"github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/storage/local/chunk"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/util/testutil"
)
@ -785,7 +786,7 @@ func TestLoop(t *testing.T) {
}
}
func testChunk(t *testing.T, encoding ChunkEncoding) {
func testChunk(t *testing.T, encoding chunk.Encoding) {
samples := make(model.Samples, 500000)
for i := range samples {
samples[i] = &model.Sample{
@ -806,10 +807,10 @@ func testChunk(t *testing.T, encoding ChunkEncoding) {
defer s.fpLocker.Unlock(m.fp) // TODO remove, see below
var values []model.SamplePair
for _, cd := range m.series.chunkDescs {
if cd.isEvicted() {
if cd.IsEvicted() {
continue
}
it := cd.c.NewIterator()
it := cd.C.NewIterator()
for it.Scan() {
values = append(values, it.Value())
}
@ -843,7 +844,7 @@ func TestChunkType2(t *testing.T) {
testChunk(t, 2)
}
func testValueAtOrBeforeTime(t *testing.T, encoding ChunkEncoding) {
func testValueAtOrBeforeTime(t *testing.T, encoding chunk.Encoding) {
samples := make(model.Samples, 10000)
for i := range samples {
samples[i] = &model.Sample{
@ -921,7 +922,7 @@ func TestValueAtTimeChunkType2(t *testing.T) {
testValueAtOrBeforeTime(t, 2)
}
func benchmarkValueAtOrBeforeTime(b *testing.B, encoding ChunkEncoding) {
func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunk.Encoding) {
samples := make(model.Samples, 10000)
for i := range samples {
samples[i] = &model.Sample{
@ -1003,7 +1004,7 @@ func BenchmarkValueAtTimeChunkType2(b *testing.B) {
benchmarkValueAtOrBeforeTime(b, 2)
}
func testRangeValues(t *testing.T, encoding ChunkEncoding) {
func testRangeValues(t *testing.T, encoding chunk.Encoding) {
samples := make(model.Samples, 10000)
for i := range samples {
samples[i] = &model.Sample{
@ -1159,7 +1160,7 @@ func TestRangeValuesChunkType2(t *testing.T) {
testRangeValues(t, 2)
}
func benchmarkRangeValues(b *testing.B, encoding ChunkEncoding) {
func benchmarkRangeValues(b *testing.B, encoding chunk.Encoding) {
samples := make(model.Samples, 10000)
for i := range samples {
samples[i] = &model.Sample{
@ -1207,7 +1208,7 @@ func BenchmarkRangeValuesChunkType2(b *testing.B) {
benchmarkRangeValues(b, 2)
}
func testEvictAndPurgeSeries(t *testing.T, encoding ChunkEncoding) {
func testEvictAndPurgeSeries(t *testing.T, encoding chunk.Encoding) {
samples := make(model.Samples, 10000)
for i := range samples {
samples[i] = &model.Sample{
@ -1271,7 +1272,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding ChunkEncoding) {
// Archive metrics.
s.fpToSeries.del(fp)
lastTime, err := series.head().lastTime()
lastTime, err := series.head().LastTime()
if err != nil {
t.Fatal(err)
}
@ -1312,7 +1313,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding ChunkEncoding) {
// Archive metrics.
s.fpToSeries.del(fp)
lastTime, err = series.head().lastTime()
lastTime, err = series.head().LastTime()
if err != nil {
t.Fatal(err)
}
@ -1362,7 +1363,7 @@ func TestEvictAndPurgeSeriesChunkType2(t *testing.T) {
testEvictAndPurgeSeries(t, 2)
}
func testEvictAndLoadChunkDescs(t *testing.T, encoding ChunkEncoding) {
func testEvictAndLoadChunkDescs(t *testing.T, encoding chunk.Encoding) {
samples := make(model.Samples, 10000)
for i := range samples {
samples[i] = &model.Sample{
@ -1401,7 +1402,7 @@ func testEvictAndLoadChunkDescs(t *testing.T, encoding ChunkEncoding) {
s.maintainMemorySeries(fp, 0)
// Give the evict goroutine an opportunity to run.
time.Sleep(250 * time.Millisecond)
// Maintain series again to trigger chunkDesc eviction
// Maintain series again to trigger ChunkDesc eviction
s.maintainMemorySeries(fp, 0)
if oldLen <= len(series.chunkDescs) {
@ -1421,7 +1422,7 @@ func testEvictAndLoadChunkDescs(t *testing.T, encoding ChunkEncoding) {
s.maintainMemorySeries(fp, 100000)
if len(series.chunkDescs) != 1 {
t.Errorf("Expected exactly one chunkDesc left, got %d.", len(series.chunkDescs))
t.Errorf("Expected exactly one ChunkDesc left, got %d.", len(series.chunkDescs))
}
}
@ -1433,7 +1434,7 @@ func TestEvictAndLoadChunkDescsType1(t *testing.T) {
testEvictAndLoadChunkDescs(t, 1)
}
func benchmarkAppend(b *testing.B, encoding ChunkEncoding) {
func benchmarkAppend(b *testing.B, encoding chunk.Encoding) {
samples := make(model.Samples, b.N)
for i := range samples {
samples[i] = &model.Sample{
@ -1469,7 +1470,7 @@ func BenchmarkAppendType2(b *testing.B) {
// Append a large number of random samples and then check if we can get them out
// of the storage alright.
func testFuzz(t *testing.T, encoding ChunkEncoding) {
func testFuzz(t *testing.T, encoding chunk.Encoding) {
if testing.Short() {
t.Skip("Skipping test in short mode.")
}
@ -1517,8 +1518,8 @@ func TestFuzzChunkType2(t *testing.T) {
// make things even slower):
//
// go test -race -cpu 8 -short -bench BenchmarkFuzzChunkType
func benchmarkFuzz(b *testing.B, encoding ChunkEncoding) {
DefaultChunkEncoding = encoding
func benchmarkFuzz(b *testing.B, encoding chunk.Encoding) {
chunk.DefaultEncoding = encoding
const samplesPerRun = 100000
rand.Seed(42)
directory := testutil.NewTemporaryDirectory("test_storage", b)

View file

@ -22,6 +22,7 @@ import (
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/local/chunk"
"github.com/prometheus/prometheus/util/testutil"
)
@ -40,8 +41,8 @@ func (t *testStorageCloser) Close() {
// NewTestStorage creates a storage instance backed by files in a temporary
// directory. The returned storage is already in serving state. Upon closing the
// returned testutil.Closer, the temporary directory is cleaned up.
func NewTestStorage(t testutil.T, encoding ChunkEncoding) (*MemorySeriesStorage, testutil.Closer) {
DefaultChunkEncoding = encoding
func NewTestStorage(t testutil.T, encoding chunk.Encoding) (*MemorySeriesStorage, testutil.Closer) {
chunk.DefaultEncoding = encoding
directory := testutil.NewTemporaryDirectory("test_storage", t)
o := &MemorySeriesStorageOptions{
MemoryChunks: 1000000,