Merge pull request #2032 from prometheus/separate-chunk-package

storage: separate chunk package, publish more names
This commit is contained in:
Julius Volz 2016-10-02 20:48:08 +02:00 committed by GitHub
commit c0889fd92e
22 changed files with 533 additions and 487 deletions

View file

@ -30,6 +30,7 @@ import (
"github.com/prometheus/prometheus/notifier" "github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage/local" "github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/storage/local/chunk"
"github.com/prometheus/prometheus/storage/local/index" "github.com/prometheus/prometheus/storage/local/index"
"github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/web" "github.com/prometheus/prometheus/web"
@ -149,7 +150,7 @@ func init() {
"If set, a crash recovery will perform checks on each series file. This might take a very long time.", "If set, a crash recovery will perform checks on each series file. This might take a very long time.",
) )
cfg.fs.Var( cfg.fs.Var(
&local.DefaultChunkEncoding, "storage.local.chunk-encoding-version", &chunk.DefaultEncoding, "storage.local.chunk-encoding-version",
"Which chunk encoding version to use for newly created chunks. Currently supported is 0 (delta encoding), 1 (double-delta encoding), and 2 (double-delta encoding with variable bit-width).", "Which chunk encoding version to use for newly created chunks. Currently supported is 0 (delta encoding), 1 (double-delta encoding), and 2 (double-delta encoding with variable bit-width).",
) )
// Index cache sizes. // Index cache sizes.

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package local package chunk
import ( import (
"container/list" "container/list"
@ -27,28 +27,37 @@ import (
"github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/storage/metric"
) )
// DefaultChunkEncoding can be changed via a flag. // ChunkLen is the length of a chunk in bytes.
var DefaultChunkEncoding = DoubleDelta const ChunkLen = 1024
// DefaultEncoding can be changed via a flag.
var DefaultEncoding = DoubleDelta
var errChunkBoundsExceeded = errors.New("attempted access outside of chunk boundaries") var errChunkBoundsExceeded = errors.New("attempted access outside of chunk boundaries")
// ChunkEncoding defintes which encoding we are using, delta, doubledelta, or varbit // EvictRequest is a request to evict a chunk from memory.
type ChunkEncoding byte type EvictRequest struct {
Desc *Desc
Evict bool
}
// Encoding defines which encoding we are using, delta, doubledelta, or varbit
type Encoding byte
// String implements flag.Value. // String implements flag.Value.
func (ce ChunkEncoding) String() string { func (e Encoding) String() string {
return fmt.Sprintf("%d", ce) return fmt.Sprintf("%d", e)
} }
// Set implements flag.Value. // Set implements flag.Value.
func (ce *ChunkEncoding) Set(s string) error { func (e *Encoding) Set(s string) error {
switch s { switch s {
case "0": case "0":
*ce = Delta *e = Delta
case "1": case "1":
*ce = DoubleDelta *e = DoubleDelta
case "2": case "2":
*ce = Varbit *e = Varbit
default: default:
return fmt.Errorf("invalid chunk encoding: %s", s) return fmt.Errorf("invalid chunk encoding: %s", s)
} }
@ -57,194 +66,192 @@ func (ce *ChunkEncoding) Set(s string) error {
const ( const (
// Delta encoding // Delta encoding
Delta ChunkEncoding = iota Delta Encoding = iota
// DoubleDelta encoding // DoubleDelta encoding
DoubleDelta DoubleDelta
// Varbit encoding // Varbit encoding
Varbit Varbit
) )
// ChunkDesc contains meta-data for a chunk. Pay special attention to the // Desc contains meta-data for a chunk. Pay special attention to the
// documented requirements for calling its methods concurrently (WRT pinning and // documented requirements for calling its methods concurrently (WRT pinning and
// locking). The doc comments spell out the requirements for each method, but // locking). The doc comments spell out the requirements for each method, but
// here is an overview and general explanation: // here is an overview and general explanation:
// //
// Everything that changes the pinning of the underlying chunk or deals with its // Everything that changes the pinning of the underlying chunk or deals with its
// eviction is protected by a mutex. This affects the following methods: pin, // eviction is protected by a mutex. This affects the following methods: Pin,
// unpin, refCount, isEvicted, maybeEvict. These methods can be called at any // Unpin, RefCount, IsEvicted, MaybeEvict. These methods can be called at any
// time without further prerequisites. // time without further prerequisites.
// //
// Another group of methods acts on (or sets) the underlying chunk. These // Another group of methods acts on (or sets) the underlying chunk. These
// methods involve no locking. They may only be called if the caller has pinned // methods involve no locking. They may only be called if the caller has pinned
// the chunk (to guarantee the chunk is not evicted concurrently). Also, the // the chunk (to guarantee the chunk is not evicted concurrently). Also, the
// caller must make sure nobody else will call these methods concurrently, // caller must make sure nobody else will call these methods concurrently,
// either by holding the sole reference to the chunkDesc (usually during loading // either by holding the sole reference to the Desc (usually during loading
// or creation) or by locking the fingerprint of the series the chunkDesc // or creation) or by locking the fingerprint of the series the Desc
// belongs to. The affected methods are: add, maybePopulateLastTime, setChunk. // belongs to. The affected methods are: Add, MaybePopulateLastTime, SetChunk.
// //
// Finally, there are the special cases firstTime and lastTime. lastTime requires // Finally, there are the special cases FirstTime and LastTime. LastTime requires
// to have locked the fingerprint of the series but the chunk does not need to // to have locked the fingerprint of the series but the chunk does not need to
// be pinned. That's because the chunkLastTime field in chunkDesc gets populated // be pinned. That's because the ChunkLastTime field in Desc gets populated
// upon completion of the chunk (when it is still pinned, and which happens // upon completion of the chunk (when it is still pinned, and which happens
// while the series's fingerprint is locked). Once that has happened, calling // while the series's fingerprint is locked). Once that has happened, calling
// lastTime does not require the chunk to be loaded anymore. Before that has // LastTime does not require the chunk to be loaded anymore. Before that has
// happened, the chunk is pinned anyway. The chunkFirstTime field in chunkDesc // happened, the chunk is pinned anyway. The ChunkFirstTime field in Desc
// is populated upon creation of a chunkDesc, so it is alway safe to call // is populated upon creation of a Desc, so it is alway safe to call
// firstTime. The firstTime method is arguably not needed and only there for // FirstTime. The FirstTime method is arguably not needed and only there for
// consistency with lastTime. // consistency with LastTime.
type ChunkDesc struct { type Desc struct {
sync.Mutex // Protects pinning. sync.Mutex // Protects pinning.
c Chunk // nil if chunk is evicted. C Chunk // nil if chunk is evicted.
rCnt int rCnt int
chunkFirstTime model.Time // Populated at creation. Immutable. ChunkFirstTime model.Time // Populated at creation. Immutable.
chunkLastTime model.Time // Populated on closing of the chunk, model.Earliest if unset. ChunkLastTime model.Time // Populated on closing of the chunk, model.Earliest if unset.
// evictListElement is nil if the chunk is not in the evict list. // EvictListElement is nil if the chunk is not in the evict list.
// evictListElement is _not_ protected by the chunkDesc mutex. // EvictListElement is _not_ protected by the Desc mutex.
// It must only be touched by the evict list handler in MemorySeriesStorage. // It must only be touched by the evict list handler in MemorySeriesStorage.
evictListElement *list.Element EvictListElement *list.Element
} }
// NewChunkDesc creates a new chunkDesc pointing to the provided chunk. The // NewDesc creates a new Desc pointing to the provided chunk. The provided chunk
// provided chunk is assumed to be not persisted yet. Therefore, the refCount of // is assumed to be not persisted yet. Therefore, the refCount of the new
// the new chunkDesc is 1 (preventing eviction prior to persisting). // Desc is 1 (preventing eviction prior to persisting).
func NewChunkDesc(c Chunk, firstTime model.Time) *ChunkDesc { func NewDesc(c Chunk, firstTime model.Time) *Desc {
chunkOps.WithLabelValues(createAndPin).Inc() Ops.WithLabelValues(CreateAndPin).Inc()
atomic.AddInt64(&numMemChunks, 1) atomic.AddInt64(&NumMemChunks, 1)
numMemChunkDescs.Inc() NumMemDescs.Inc()
return &ChunkDesc{ return &Desc{
c: c, C: c,
rCnt: 1, rCnt: 1,
chunkFirstTime: firstTime, ChunkFirstTime: firstTime,
chunkLastTime: model.Earliest, ChunkLastTime: model.Earliest,
} }
} }
// Add adds a sample pair to the underlying chunk. For safe concurrent access, // Add adds a sample pair to the underlying chunk. For safe concurrent access,
// The chunk must be pinned, and the caller must have locked the fingerprint of // The chunk must be pinned, and the caller must have locked the fingerprint of
// the series. // the series.
func (cd *ChunkDesc) Add(s model.SamplePair) ([]Chunk, error) { func (d *Desc) Add(s model.SamplePair) ([]Chunk, error) {
return cd.c.Add(s) return d.C.Add(s)
} }
// pin increments the refCount by one. Upon increment from 0 to 1, this // Pin increments the refCount by one. Upon increment from 0 to 1, this
// chunkDesc is removed from the evict list. To enable the latter, the // Desc is removed from the evict list. To enable the latter, the
// evictRequests channel has to be provided. This method can be called // evictRequests channel has to be provided. This method can be called
// concurrently at any time. // concurrently at any time.
func (cd *ChunkDesc) pin(evictRequests chan<- evictRequest) { func (d *Desc) Pin(evictRequests chan<- EvictRequest) {
cd.Lock() d.Lock()
defer cd.Unlock() defer d.Unlock()
if cd.rCnt == 0 { if d.rCnt == 0 {
// Remove ourselves from the evict list. // Remove ourselves from the evict list.
evictRequests <- evictRequest{cd, false} evictRequests <- EvictRequest{d, false}
} }
cd.rCnt++ d.rCnt++
} }
// unpin decrements the refCount by one. Upon decrement from 1 to 0, this // Unpin decrements the refCount by one. Upon decrement from 1 to 0, this
// chunkDesc is added to the evict list. To enable the latter, the evictRequests // Desc is added to the evict list. To enable the latter, the evictRequests
// channel has to be provided. This method can be called concurrently at any // channel has to be provided. This method can be called concurrently at any
// time. // time.
func (cd *ChunkDesc) unpin(evictRequests chan<- evictRequest) { func (d *Desc) Unpin(evictRequests chan<- EvictRequest) {
cd.Lock() d.Lock()
defer cd.Unlock() defer d.Unlock()
if cd.rCnt == 0 { if d.rCnt == 0 {
panic("cannot unpin already unpinned chunk") panic("cannot unpin already unpinned chunk")
} }
cd.rCnt-- d.rCnt--
if cd.rCnt == 0 { if d.rCnt == 0 {
// Add ourselves to the back of the evict list. // Add ourselves to the back of the evict list.
evictRequests <- evictRequest{cd, true} evictRequests <- EvictRequest{d, true}
} }
} }
// refCount returns the number of pins. This method can be called concurrently // RefCount returns the number of pins. This method can be called concurrently
// at any time. // at any time.
func (cd *ChunkDesc) refCount() int { func (d *Desc) RefCount() int {
cd.Lock() d.Lock()
defer cd.Unlock() defer d.Unlock()
return cd.rCnt return d.rCnt
} }
// firstTime returns the timestamp of the first sample in the chunk. This method // FirstTime returns the timestamp of the first sample in the chunk. This method
// can be called concurrently at any time. It only returns the immutable // can be called concurrently at any time. It only returns the immutable
// cd.chunkFirstTime without any locking. Arguably, this method is // d.ChunkFirstTime without any locking. Arguably, this method is
// useless. However, it provides consistency with the lastTime method. // useless. However, it provides consistency with the LastTime method.
func (cd *ChunkDesc) firstTime() model.Time { func (d *Desc) FirstTime() model.Time {
return cd.chunkFirstTime return d.ChunkFirstTime
} }
// lastTime returns the timestamp of the last sample in the chunk. For safe // LastTime returns the timestamp of the last sample in the chunk. For safe
// concurrent access, this method requires the fingerprint of the time series to // concurrent access, this method requires the fingerprint of the time series to
// be locked. // be locked.
func (cd *ChunkDesc) lastTime() (model.Time, error) { func (d *Desc) LastTime() (model.Time, error) {
if cd.chunkLastTime != model.Earliest || cd.c == nil { if d.ChunkLastTime != model.Earliest || d.C == nil {
return cd.chunkLastTime, nil return d.ChunkLastTime, nil
} }
return cd.c.NewIterator().LastTimestamp() return d.C.NewIterator().LastTimestamp()
} }
// maybePopulateLastTime populates the chunkLastTime from the underlying chunk // MaybePopulateLastTime populates the ChunkLastTime from the underlying chunk
// if it has not yet happened. Call this method directly after having added the // if it has not yet happened. Call this method directly after having added the
// last sample to a chunk or after closing a head chunk due to age. For safe // last sample to a chunk or after closing a head chunk due to age. For safe
// concurrent access, the chunk must be pinned, and the caller must have locked // concurrent access, the chunk must be pinned, and the caller must have locked
// the fingerprint of the series. // the fingerprint of the series.
func (cd *ChunkDesc) maybePopulateLastTime() error { func (d *Desc) MaybePopulateLastTime() error {
if cd.chunkLastTime == model.Earliest && cd.c != nil { if d.ChunkLastTime == model.Earliest && d.C != nil {
t, err := cd.c.NewIterator().LastTimestamp() t, err := d.C.NewIterator().LastTimestamp()
if err != nil { if err != nil {
return err return err
} }
cd.chunkLastTime = t d.ChunkLastTime = t
} }
return nil return nil
} }
// isEvicted returns whether the chunk is evicted. For safe concurrent access, // IsEvicted returns whether the chunk is evicted. For safe concurrent access,
// the caller must have locked the fingerprint of the series. // the caller must have locked the fingerprint of the series.
func (cd *ChunkDesc) isEvicted() bool { func (d *Desc) IsEvicted() bool {
// Locking required here because we do not want the caller to force // Locking required here because we do not want the caller to force
// pinning the chunk first, so it could be evicted while this method is // pinning the chunk first, so it could be evicted while this method is
// called. // called.
cd.Lock() d.Lock()
defer cd.Unlock() defer d.Unlock()
return cd.c == nil return d.C == nil
} }
// setChunk sets the underlying chunk. The caller must have locked the // SetChunk sets the underlying chunk. The caller must have locked the
// fingerprint of the series and must have "pre-pinned" the chunk (i.e. first // fingerprint of the series and must have "pre-pinned" the chunk (i.e. first
// call pin and then set the chunk). // call Pin and then set the chunk).
func (cd *ChunkDesc) setChunk(c Chunk) { func (d *Desc) SetChunk(c Chunk) {
if cd.c != nil { if d.C != nil {
panic("chunk already set") panic("chunk already set")
} }
cd.c = c d.C = c
} }
// maybeEvict evicts the chunk if the refCount is 0. It returns whether the chunk // MaybeEvict evicts the chunk if the refCount is 0. It returns whether the chunk
// is now evicted, which includes the case that the chunk was evicted even // is now evicted, which includes the case that the chunk was evicted even
// before this method was called. It can be called concurrently at any time. // before this method was called. It can be called concurrently at any time.
func (cd *ChunkDesc) maybeEvict() bool { func (d *Desc) MaybeEvict() bool {
cd.Lock() d.Lock()
defer cd.Unlock() defer d.Unlock()
if cd.c == nil { if d.C == nil {
return true return true
} }
if cd.rCnt != 0 { if d.rCnt != 0 {
return false return false
} }
if cd.chunkLastTime == model.Earliest { if d.ChunkLastTime == model.Earliest {
// This must never happen. // This must never happen.
panic("chunkLastTime not populated for evicted chunk") panic("ChunkLastTime not populated for evicted chunk")
} }
cd.c = nil d.C = nil
chunkOps.WithLabelValues(evict).Inc()
atomic.AddInt64(&numMemChunks, -1)
return true return true
} }
@ -260,18 +267,18 @@ type Chunk interface {
Add(sample model.SamplePair) ([]Chunk, error) Add(sample model.SamplePair) ([]Chunk, error)
Clone() Chunk Clone() Chunk
FirstTime() model.Time FirstTime() model.Time
NewIterator() ChunkIterator NewIterator() Iterator
Marshal(io.Writer) error Marshal(io.Writer) error
MarshalToBuf([]byte) error MarshalToBuf([]byte) error
Unmarshal(io.Reader) error Unmarshal(io.Reader) error
UnmarshalFromBuf([]byte) error UnmarshalFromBuf([]byte) error
Encoding() ChunkEncoding Encoding() Encoding
} }
// ChunkIterator enables efficient access to the content of a chunk. It is // Iterator enables efficient access to the content of a chunk. It is
// generally not safe to use a chunkIterator concurrently with or after chunk // generally not safe to use an Iterator concurrently with or after chunk
// mutation. // mutation.
type ChunkIterator interface { type Iterator interface {
// Gets the last timestamp in the chunk. // Gets the last timestamp in the chunk.
LastTimestamp() (model.Time, error) LastTimestamp() (model.Time, error)
// Whether a given timestamp is contained between first and last value // Whether a given timestamp is contained between first and last value
@ -280,7 +287,7 @@ type ChunkIterator interface {
// Scans the next value in the chunk. Directly after the iterator has // Scans the next value in the chunk. Directly after the iterator has
// been created, the next value is the first value in the // been created, the next value is the first value in the
// chunk. Otherwise, it is the value following the last value scanned or // chunk. Otherwise, it is the value following the last value scanned or
// found (by one of the find... methods). Returns false if either the // found (by one of the Find... methods). Returns false if either the
// end of the chunk is reached or an error has occurred. // end of the chunk is reached or an error has occurred.
Scan() bool Scan() bool
// Finds the most recent value at or before the provided time. Returns // Finds the most recent value at or before the provided time. Returns
@ -292,7 +299,7 @@ type ChunkIterator interface {
// or an error has occurred. // or an error has occurred.
FindAtOrAfter(model.Time) bool FindAtOrAfter(model.Time) bool
// Returns the last value scanned (by the scan method) or found (by one // Returns the last value scanned (by the scan method) or found (by one
// of the find... methods). It returns ZeroSamplePair before any of // of the find... methods). It returns model.ZeroSamplePair before any of
// those methods were called. // those methods were called.
Value() model.SamplePair Value() model.SamplePair
// Returns the last error encountered. In general, an error signals data // Returns the last error encountered. In general, an error signals data
@ -300,9 +307,9 @@ type ChunkIterator interface {
Err() error Err() error
} }
// rangeValues is a utility function that retrieves all values within the given // RangeValues is a utility function that retrieves all values within the given
// range from a chunkIterator. // range from an Iterator.
func rangeValues(it ChunkIterator, in metric.Interval) ([]model.SamplePair, error) { func RangeValues(it Iterator, in metric.Interval) ([]model.SamplePair, error) {
result := []model.SamplePair{} result := []model.SamplePair{}
if !it.FindAtOrAfter(in.OldestInclusive) { if !it.FindAtOrAfter(in.OldestInclusive) {
return result, it.Err() return result, it.Err()
@ -320,7 +327,7 @@ func rangeValues(it ChunkIterator, in metric.Interval) ([]model.SamplePair, erro
// chunk, adds the provided sample to it, and returns a chunk slice containing // chunk, adds the provided sample to it, and returns a chunk slice containing
// the provided old chunk followed by the new overflow chunk. // the provided old chunk followed by the new overflow chunk.
func addToOverflowChunk(c Chunk, s model.SamplePair) ([]Chunk, error) { func addToOverflowChunk(c Chunk, s model.SamplePair) ([]Chunk, error) {
overflowChunks, err := NewChunk().Add(s) overflowChunks, err := New().Add(s)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -332,7 +339,7 @@ func addToOverflowChunk(c Chunk, s model.SamplePair) ([]Chunk, error) {
// provided sample. It returns the new chunks (transcoded plus overflow) with // provided sample. It returns the new chunks (transcoded plus overflow) with
// the new sample at the end. // the new sample at the end.
func transcodeAndAdd(dst Chunk, src Chunk, s model.SamplePair) ([]Chunk, error) { func transcodeAndAdd(dst Chunk, src Chunk, s model.SamplePair) ([]Chunk, error) {
chunkOps.WithLabelValues(transcode).Inc() Ops.WithLabelValues(Transcode).Inc()
var ( var (
head = dst head = dst
@ -358,23 +365,23 @@ func transcodeAndAdd(dst Chunk, src Chunk, s model.SamplePair) ([]Chunk, error)
return append(body, NewChunks...), nil return append(body, NewChunks...), nil
} }
// NewChunk creates a new chunk according to the encoding set by the // New creates a new chunk according to the encoding set by the
// DefaultChunkEncoding flag. // DefaultEncoding flag.
func NewChunk() Chunk { func New() Chunk {
chunk, err := NewChunkForEncoding(DefaultChunkEncoding) chunk, err := NewForEncoding(DefaultEncoding)
if err != nil { if err != nil {
panic(err) panic(err)
} }
return chunk return chunk
} }
// NewChunkForEncoding allows configuring what chunk type you want // NewForEncoding allows configuring what chunk type you want
func NewChunkForEncoding(encoding ChunkEncoding) (Chunk, error) { func NewForEncoding(encoding Encoding) (Chunk, error) {
switch encoding { switch encoding {
case Delta: case Delta:
return newDeltaEncodedChunk(d1, d0, true, chunkLen), nil return newDeltaEncodedChunk(d1, d0, true, ChunkLen), nil
case DoubleDelta: case DoubleDelta:
return newDoubleDeltaEncodedChunk(d1, d0, true, chunkLen), nil return newDoubleDeltaEncodedChunk(d1, d0, true, ChunkLen), nil
case Varbit: case Varbit:
return newVarbitChunk(varbitZeroEncoding), nil return newVarbitChunk(varbitZeroEncoding), nil
default: default:
@ -402,23 +409,23 @@ func newIndexAccessingChunkIterator(len int, acc indexAccessor) *indexAccessingC
return &indexAccessingChunkIterator{ return &indexAccessingChunkIterator{
len: len, len: len,
pos: -1, pos: -1,
lastValue: ZeroSamplePair, lastValue: model.ZeroSamplePair,
acc: acc, acc: acc,
} }
} }
// lastTimestamp implements chunkIterator. // lastTimestamp implements Iterator.
func (it *indexAccessingChunkIterator) LastTimestamp() (model.Time, error) { func (it *indexAccessingChunkIterator) LastTimestamp() (model.Time, error) {
return it.acc.timestampAtIndex(it.len - 1), it.acc.err() return it.acc.timestampAtIndex(it.len - 1), it.acc.err()
} }
// contains implements chunkIterator. // contains implements Iterator.
func (it *indexAccessingChunkIterator) Contains(t model.Time) (bool, error) { func (it *indexAccessingChunkIterator) Contains(t model.Time) (bool, error) {
return !t.Before(it.acc.timestampAtIndex(0)) && return !t.Before(it.acc.timestampAtIndex(0)) &&
!t.After(it.acc.timestampAtIndex(it.len-1)), it.acc.err() !t.After(it.acc.timestampAtIndex(it.len-1)), it.acc.err()
} }
// scan implements chunkIterator. // scan implements Iterator.
func (it *indexAccessingChunkIterator) Scan() bool { func (it *indexAccessingChunkIterator) Scan() bool {
it.pos++ it.pos++
if it.pos >= it.len { if it.pos >= it.len {
@ -431,7 +438,7 @@ func (it *indexAccessingChunkIterator) Scan() bool {
return it.acc.err() == nil return it.acc.err() == nil
} }
// findAtOrBefore implements chunkIterator. // findAtOrBefore implements Iterator.
func (it *indexAccessingChunkIterator) FindAtOrBefore(t model.Time) bool { func (it *indexAccessingChunkIterator) FindAtOrBefore(t model.Time) bool {
i := sort.Search(it.len, func(i int) bool { i := sort.Search(it.len, func(i int) bool {
return it.acc.timestampAtIndex(i).After(t) return it.acc.timestampAtIndex(i).After(t)
@ -447,7 +454,7 @@ func (it *indexAccessingChunkIterator) FindAtOrBefore(t model.Time) bool {
return true return true
} }
// findAtOrAfter implements chunkIterator. // findAtOrAfter implements Iterator.
func (it *indexAccessingChunkIterator) FindAtOrAfter(t model.Time) bool { func (it *indexAccessingChunkIterator) FindAtOrAfter(t model.Time) bool {
i := sort.Search(it.len, func(i int) bool { i := sort.Search(it.len, func(i int) bool {
return !it.acc.timestampAtIndex(i).Before(t) return !it.acc.timestampAtIndex(i).Before(t)
@ -463,12 +470,12 @@ func (it *indexAccessingChunkIterator) FindAtOrAfter(t model.Time) bool {
return true return true
} }
// value implements chunkIterator. // value implements Iterator.
func (it *indexAccessingChunkIterator) Value() model.SamplePair { func (it *indexAccessingChunkIterator) Value() model.SamplePair {
return it.lastValue return it.lastValue
} }
// err implements chunkIterator. // err implements Iterator.
func (it *indexAccessingChunkIterator) Err() error { func (it *indexAccessingChunkIterator) Err() error {
return it.acc.err() return it.acc.err()
} }

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package local package chunk
import ( import (
"encoding/binary" "encoding/binary"
@ -184,13 +184,13 @@ func (c deltaEncodedChunk) Clone() Chunk {
return &clone return &clone
} }
// firstTime implements chunk. // FirstTime implements chunk.
func (c deltaEncodedChunk) FirstTime() model.Time { func (c deltaEncodedChunk) FirstTime() model.Time {
return c.baseTime() return c.baseTime()
} }
// NewIterator implements chunk. // NewIterator implements chunk.
func (c *deltaEncodedChunk) NewIterator() ChunkIterator { func (c *deltaEncodedChunk) NewIterator() Iterator {
return newIndexAccessingChunkIterator(c.len(), &deltaEncodedIndexAccessor{ return newIndexAccessingChunkIterator(c.len(), &deltaEncodedIndexAccessor{
c: *c, c: *c,
baseT: c.baseTime(), baseT: c.baseTime(),
@ -265,7 +265,7 @@ func (c *deltaEncodedChunk) UnmarshalFromBuf(buf []byte) error {
} }
// encoding implements chunk. // encoding implements chunk.
func (c deltaEncodedChunk) Encoding() ChunkEncoding { return Delta } func (c deltaEncodedChunk) Encoding() Encoding { return Delta }
func (c deltaEncodedChunk) timeBytes() deltaBytes { func (c deltaEncodedChunk) timeBytes() deltaBytes {
return deltaBytes(c[deltaHeaderTimeBytesOffset]) return deltaBytes(c[deltaHeaderTimeBytesOffset])

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package local package chunk
import ( import (
"math" "math"

View file

@ -15,7 +15,7 @@
// it may make sense to split those out later, but given that the tests are // it may make sense to split those out later, but given that the tests are
// near-identical and share a helper, this feels simpler for now. // near-identical and share a helper, this feels simpler for now.
package local package chunk
import ( import (
"bytes" "bytes"
@ -75,7 +75,7 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) {
}, },
} }
for _, c := range cases { for _, c := range cases {
chunk := c.chunkConstructor(d1, d4, false, chunkLen) chunk := c.chunkConstructor(d1, d4, false, ChunkLen)
cs, err := chunk.Add(model.SamplePair{ cs, err := chunk.Add(model.SamplePair{
Timestamp: model.Now(), Timestamp: model.Now(),
@ -85,7 +85,7 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) {
t.Fatalf("Couldn't add sample to empty %s: %s", c.chunkTypeName, err) t.Fatalf("Couldn't add sample to empty %s: %s", c.chunkTypeName, err)
} }
buf := make([]byte, chunkLen) buf := make([]byte, ChunkLen)
cs[0].MarshalToBuf(buf) cs[0].MarshalToBuf(buf)

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package local package chunk
import ( import (
"encoding/binary" "encoding/binary"
@ -191,13 +191,13 @@ func (c doubleDeltaEncodedChunk) Clone() Chunk {
return &clone return &clone
} }
// firstTime implements chunk. // FirstTime implements chunk.
func (c doubleDeltaEncodedChunk) FirstTime() model.Time { func (c doubleDeltaEncodedChunk) FirstTime() model.Time {
return c.baseTime() return c.baseTime()
} }
// NewIterator( implements chunk. // NewIterator( implements chunk.
func (c *doubleDeltaEncodedChunk) NewIterator() ChunkIterator { func (c *doubleDeltaEncodedChunk) NewIterator() Iterator {
return newIndexAccessingChunkIterator(c.len(), &doubleDeltaEncodedIndexAccessor{ return newIndexAccessingChunkIterator(c.len(), &doubleDeltaEncodedIndexAccessor{
c: *c, c: *c,
baseT: c.baseTime(), baseT: c.baseTime(),
@ -275,7 +275,7 @@ func (c *doubleDeltaEncodedChunk) UnmarshalFromBuf(buf []byte) error {
} }
// encoding implements chunk. // encoding implements chunk.
func (c doubleDeltaEncodedChunk) Encoding() ChunkEncoding { return DoubleDelta } func (c doubleDeltaEncodedChunk) Encoding() Encoding { return DoubleDelta }
func (c doubleDeltaEncodedChunk) baseTime() model.Time { func (c doubleDeltaEncodedChunk) baseTime() model.Time {
return model.Time( return model.Time(

View file

@ -0,0 +1,100 @@
// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chunk
import "github.com/prometheus/client_golang/prometheus"
// Usually, a separate file for instrumentation is frowned upon. Metrics should
// be close to where they are used. However, the metrics below are set all over
// the place, so we go for a separate instrumentation file in this case.
var (
Ops = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "chunk_ops_total",
Help: "The total number of chunk operations by their type.",
},
[]string{OpTypeLabel},
)
DescOps = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "chunkdesc_ops_total",
Help: "The total number of chunk descriptor operations by their type.",
},
[]string{OpTypeLabel},
)
NumMemDescs = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "memory_chunkdescs",
Help: "The current number of chunk descriptors in memory.",
})
)
const (
namespace = "prometheus"
subsystem = "local_storage"
// OpTypeLabel is the label name for chunk operation types.
OpTypeLabel = "type"
// Op-types for ChunkOps.
// CreateAndPin is the label value for create-and-pin chunk ops.
CreateAndPin = "create" // A Desc creation with refCount=1.
// PersistAndUnpin is the label value for persist chunk ops.
PersistAndUnpin = "persist"
// Pin is the label value for pin chunk ops (excludes pin on creation).
Pin = "pin"
// Unpin is the label value for unpin chunk ops (excludes the unpin on persisting).
Unpin = "unpin"
// Clone is the label value for clone chunk ops.
Clone = "clone"
// Transcode is the label value for transcode chunk ops.
Transcode = "transcode"
// Drop is the label value for drop chunk ops.
Drop = "drop"
// Op-types for ChunkOps and ChunkDescOps.
// Evict is the label value for evict chunk desc ops.
Evict = "evict"
// Load is the label value for load chunk and chunk desc ops.
Load = "load"
)
func init() {
prometheus.MustRegister(Ops)
prometheus.MustRegister(DescOps)
prometheus.MustRegister(NumMemDescs)
}
var (
// NumMemChunks is the total number of chunks in memory. This is a
// global counter, also used internally, so not implemented as
// metrics. Collected in MemorySeriesStorage.Collect.
// TODO(beorn7): As it is used internally, it is actually very bad style
// to have it as a global variable.
NumMemChunks int64
// NumMemChunksDesc is the metric descriptor for the above.
NumMemChunksDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "memory_chunks"),
"The current number of chunks in memory, excluding cloned chunks (i.e. chunks without a descriptor).",
nil, nil,
)
)

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package local package chunk
import ( import (
"encoding/binary" "encoding/binary"
@ -195,11 +195,11 @@ const (
varbitFirstValueDeltaOffset = 38 varbitFirstValueDeltaOffset = 38
// The following are in the "footer" and only usable if the chunk is // The following are in the "footer" and only usable if the chunk is
// still open. // still open.
varbitCountOffsetBitOffset = chunkLen - 9 varbitCountOffsetBitOffset = ChunkLen - 9
varbitLastTimeDeltaOffset = chunkLen - 7 varbitLastTimeDeltaOffset = ChunkLen - 7
varbitLastValueDeltaOffset = chunkLen - 4 varbitLastValueDeltaOffset = ChunkLen - 4
varbitLastLeadingZerosCountOffset = chunkLen - 4 varbitLastLeadingZerosCountOffset = ChunkLen - 4
varbitLastSignificantBitsCountOffset = chunkLen - 3 varbitLastSignificantBitsCountOffset = ChunkLen - 3
varbitFirstSampleBitOffset uint16 = 0 // Symbolic, don't really read or write here. varbitFirstSampleBitOffset uint16 = 0 // Symbolic, don't really read or write here.
varbitSecondSampleBitOffset uint16 = 1 // Symbolic, don't really read or write here. varbitSecondSampleBitOffset uint16 = 1 // Symbolic, don't really read or write here.
@ -240,18 +240,18 @@ var varbitWorstCaseBitsPerSample = map[varbitValueEncoding]int{
type varbitChunk []byte type varbitChunk []byte
// newVarbitChunk returns a newly allocated varbitChunk. For simplicity, all // newVarbitChunk returns a newly allocated varbitChunk. For simplicity, all
// varbit chunks must have the length as determined by the chunkLen constant. // varbit chunks must have the length as determined by the ChunkLen constant.
func newVarbitChunk(enc varbitValueEncoding) *varbitChunk { func newVarbitChunk(enc varbitValueEncoding) *varbitChunk {
if chunkLen < varbitMinLength || chunkLen > varbitMaxLength { if ChunkLen < varbitMinLength || ChunkLen > varbitMaxLength {
panic(fmt.Errorf( panic(fmt.Errorf(
"invalid chunk length of %d bytes, need at least %d bytes and at most %d bytes", "invalid chunk length of %d bytes, need at least %d bytes and at most %d bytes",
chunkLen, varbitMinLength, varbitMaxLength, ChunkLen, varbitMinLength, varbitMaxLength,
)) ))
} }
if enc > varbitDirectEncoding { if enc > varbitDirectEncoding {
panic(fmt.Errorf("unknown varbit value encoding: %v", enc)) panic(fmt.Errorf("unknown varbit value encoding: %v", enc))
} }
c := make(varbitChunk, chunkLen) c := make(varbitChunk, ChunkLen)
c.setValueEncoding(enc) c.setValueEncoding(enc)
return &c return &c
} }
@ -280,7 +280,7 @@ func (c varbitChunk) Clone() Chunk {
} }
// NewIterator implements chunk. // NewIterator implements chunk.
func (c varbitChunk) NewIterator() ChunkIterator { func (c varbitChunk) NewIterator() Iterator {
return newVarbitChunkIterator(c) return newVarbitChunkIterator(c)
} }
@ -320,9 +320,9 @@ func (c varbitChunk) UnmarshalFromBuf(buf []byte) error {
} }
// encoding implements chunk. // encoding implements chunk.
func (c varbitChunk) Encoding() ChunkEncoding { return Varbit } func (c varbitChunk) Encoding() Encoding { return Varbit }
// firstTime implements chunk. // FirstTime implements chunk.
func (c varbitChunk) FirstTime() model.Time { func (c varbitChunk) FirstTime() model.Time {
return model.Time( return model.Time(
binary.BigEndian.Uint64( binary.BigEndian.Uint64(
@ -548,14 +548,14 @@ func (c *varbitChunk) addLaterSample(s model.SamplePair, offset uint16) ([]Chunk
} }
// Analyze worst case, does it fit? If not, set new sample as the last. // Analyze worst case, does it fit? If not, set new sample as the last.
if int(offset)+varbitWorstCaseBitsPerSample[encoding] > chunkLen*8 { if int(offset)+varbitWorstCaseBitsPerSample[encoding] > ChunkLen*8 {
return c.addLastSample(s), nil return c.addLastSample(s), nil
} }
// Transcoding/overflow decisions first. // Transcoding/overflow decisions first.
if encoding == varbitZeroEncoding && s.Value != lastValue { if encoding == varbitZeroEncoding && s.Value != lastValue {
// Cannot go on with zero encoding. // Cannot go on with zero encoding.
if offset > chunkLen*4 { if offset > ChunkLen*4 {
// Chunk already half full. Don't transcode, overflow instead. // Chunk already half full. Don't transcode, overflow instead.
return addToOverflowChunk(c, s) return addToOverflowChunk(c, s)
} }
@ -567,7 +567,7 @@ func (c *varbitChunk) addLaterSample(s model.SamplePair, offset uint16) ([]Chunk
} }
if encoding == varbitIntDoubleDeltaEncoding && !isInt32(s.Value-lastValue) { if encoding == varbitIntDoubleDeltaEncoding && !isInt32(s.Value-lastValue) {
// Cannot go on with int encoding. // Cannot go on with int encoding.
if offset > chunkLen*4 { if offset > ChunkLen*4 {
// Chunk already half full. Don't transcode, overflow instead. // Chunk already half full. Don't transcode, overflow instead.
return addToOverflowChunk(c, s) return addToOverflowChunk(c, s)
} }
@ -903,7 +903,7 @@ func newVarbitChunkIterator(c varbitChunk) *varbitChunkIterator {
} }
} }
// lastTimestamp implements chunkIterator. // lastTimestamp implements Iterator.
func (it *varbitChunkIterator) LastTimestamp() (model.Time, error) { func (it *varbitChunkIterator) LastTimestamp() (model.Time, error) {
if it.len == varbitFirstSampleBitOffset { if it.len == varbitFirstSampleBitOffset {
// No samples in the chunk yet. // No samples in the chunk yet.
@ -912,7 +912,7 @@ func (it *varbitChunkIterator) LastTimestamp() (model.Time, error) {
return it.c.lastTime(), it.lastError return it.c.lastTime(), it.lastError
} }
// contains implements chunkIterator. // contains implements Iterator.
func (it *varbitChunkIterator) Contains(t model.Time) (bool, error) { func (it *varbitChunkIterator) Contains(t model.Time) (bool, error) {
last, err := it.LastTimestamp() last, err := it.LastTimestamp()
if err != nil { if err != nil {
@ -923,7 +923,7 @@ func (it *varbitChunkIterator) Contains(t model.Time) (bool, error) {
!t.After(last), it.lastError !t.After(last), it.lastError
} }
// scan implements chunkIterator. // scan implements Iterator.
func (it *varbitChunkIterator) Scan() bool { func (it *varbitChunkIterator) Scan() bool {
if it.lastError != nil { if it.lastError != nil {
return false return false
@ -1002,7 +1002,7 @@ func (it *varbitChunkIterator) Scan() bool {
return it.lastError == nil return it.lastError == nil
} }
// findAtOrBefore implements chunkIterator. // findAtOrBefore implements Iterator.
func (it *varbitChunkIterator) FindAtOrBefore(t model.Time) bool { func (it *varbitChunkIterator) FindAtOrBefore(t model.Time) bool {
if it.len == 0 || t.Before(it.c.FirstTime()) { if it.len == 0 || t.Before(it.c.FirstTime()) {
return false return false
@ -1038,7 +1038,7 @@ func (it *varbitChunkIterator) FindAtOrBefore(t model.Time) bool {
return it.lastError == nil return it.lastError == nil
} }
// findAtOrAfter implements chunkIterator. // findAtOrAfter implements Iterator.
func (it *varbitChunkIterator) FindAtOrAfter(t model.Time) bool { func (it *varbitChunkIterator) FindAtOrAfter(t model.Time) bool {
if it.len == 0 || t.After(it.c.lastTime()) { if it.len == 0 || t.After(it.c.lastTime()) {
return false return false
@ -1061,7 +1061,7 @@ func (it *varbitChunkIterator) FindAtOrAfter(t model.Time) bool {
return it.lastError == nil return it.lastError == nil
} }
// value implements chunkIterator. // value implements Iterator.
func (it *varbitChunkIterator) Value() model.SamplePair { func (it *varbitChunkIterator) Value() model.SamplePair {
return model.SamplePair{ return model.SamplePair{
Timestamp: it.t, Timestamp: it.t,
@ -1069,7 +1069,7 @@ func (it *varbitChunkIterator) Value() model.SamplePair {
} }
} }
// err implements chunkIterator. // err implements Iterator.
func (it *varbitChunkIterator) Err() error { func (it *varbitChunkIterator) Err() error {
return it.lastError return it.lastError
} }

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package local package chunk
import "github.com/prometheus/common/model" import "github.com/prometheus/common/model"

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package local package chunk
import "testing" import "testing"

View file

@ -25,6 +25,7 @@ import (
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/local/chunk"
"github.com/prometheus/prometheus/storage/local/codable" "github.com/prometheus/prometheus/storage/local/codable"
"github.com/prometheus/prometheus/storage/local/index" "github.com/prometheus/prometheus/storage/local/index"
) )
@ -114,10 +115,10 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint
) )
} }
s.chunkDescs = append( s.chunkDescs = append(
make([]*ChunkDesc, 0, len(s.chunkDescs)-s.persistWatermark), make([]*chunk.Desc, 0, len(s.chunkDescs)-s.persistWatermark),
s.chunkDescs[s.persistWatermark:]..., s.chunkDescs[s.persistWatermark:]...,
) )
numMemChunkDescs.Sub(float64(s.persistWatermark)) chunk.NumMemDescs.Sub(float64(s.persistWatermark))
s.persistWatermark = 0 s.persistWatermark = 0
s.chunkDescsOffset = 0 s.chunkDescsOffset = 0
} }
@ -290,8 +291,8 @@ func (p *persistence) sanitizeSeries(
) )
s.chunkDescs = cds s.chunkDescs = cds
s.chunkDescsOffset = 0 s.chunkDescsOffset = 0
s.savedFirstTime = cds[0].firstTime() s.savedFirstTime = cds[0].FirstTime()
s.lastTime, err = cds[len(cds)-1].lastTime() s.lastTime, err = cds[len(cds)-1].LastTime()
if err != nil { if err != nil {
log.Errorf( log.Errorf(
"Failed to determine time of the last sample for metric %v, fingerprint %v: %s", "Failed to determine time of the last sample for metric %v, fingerprint %v: %s",
@ -314,7 +315,7 @@ func (p *persistence) sanitizeSeries(
// First, throw away the chunkDescs without chunks. // First, throw away the chunkDescs without chunks.
s.chunkDescs = s.chunkDescs[s.persistWatermark:] s.chunkDescs = s.chunkDescs[s.persistWatermark:]
numMemChunkDescs.Sub(float64(s.persistWatermark)) chunk.NumMemDescs.Sub(float64(s.persistWatermark))
cds, err := p.loadChunkDescs(fp, 0) cds, err := p.loadChunkDescs(fp, 0)
if err != nil { if err != nil {
log.Errorf( log.Errorf(
@ -326,10 +327,10 @@ func (p *persistence) sanitizeSeries(
} }
s.persistWatermark = len(cds) s.persistWatermark = len(cds)
s.chunkDescsOffset = 0 s.chunkDescsOffset = 0
s.savedFirstTime = cds[0].firstTime() s.savedFirstTime = cds[0].FirstTime()
s.modTime = modTime s.modTime = modTime
lastTime, err := cds[len(cds)-1].lastTime() lastTime, err := cds[len(cds)-1].LastTime()
if err != nil { if err != nil {
log.Errorf( log.Errorf(
"Failed to determine time of the last sample for metric %v, fingerprint %v: %s", "Failed to determine time of the last sample for metric %v, fingerprint %v: %s",
@ -340,7 +341,7 @@ func (p *persistence) sanitizeSeries(
} }
keepIdx := -1 keepIdx := -1
for i, cd := range s.chunkDescs { for i, cd := range s.chunkDescs {
if cd.firstTime() >= lastTime { if cd.FirstTime() >= lastTime {
keepIdx = i keepIdx = i
break break
} }
@ -350,8 +351,8 @@ func (p *persistence) sanitizeSeries(
"Recovered metric %v, fingerprint %v: all %d chunks recovered from series file.", "Recovered metric %v, fingerprint %v: all %d chunks recovered from series file.",
s.metric, fp, chunksInFile, s.metric, fp, chunksInFile,
) )
numMemChunkDescs.Sub(float64(len(s.chunkDescs))) chunk.NumMemDescs.Sub(float64(len(s.chunkDescs)))
atomic.AddInt64(&numMemChunks, int64(-len(s.chunkDescs))) atomic.AddInt64(&chunk.NumMemChunks, int64(-len(s.chunkDescs)))
s.chunkDescs = cds s.chunkDescs = cds
s.headChunkClosed = true s.headChunkClosed = true
return fp, true return fp, true
@ -360,8 +361,8 @@ func (p *persistence) sanitizeSeries(
"Recovered metric %v, fingerprint %v: recovered %d chunks from series file, recovered %d chunks from checkpoint.", "Recovered metric %v, fingerprint %v: recovered %d chunks from series file, recovered %d chunks from checkpoint.",
s.metric, fp, chunksInFile, len(s.chunkDescs)-keepIdx, s.metric, fp, chunksInFile, len(s.chunkDescs)-keepIdx,
) )
numMemChunkDescs.Sub(float64(keepIdx)) chunk.NumMemDescs.Sub(float64(keepIdx))
atomic.AddInt64(&numMemChunks, int64(-keepIdx)) atomic.AddInt64(&chunk.NumMemChunks, int64(-keepIdx))
if keepIdx == len(s.chunkDescs) { if keepIdx == len(s.chunkDescs) {
// No chunks from series file left, head chunk is evicted, so declare it closed. // No chunks from series file left, head chunk is evicted, so declare it closed.
s.headChunkClosed = true s.headChunkClosed = true

View file

@ -23,6 +23,7 @@ import (
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/local/chunk"
"github.com/prometheus/prometheus/storage/local/codable" "github.com/prometheus/prometheus/storage/local/codable"
) )
@ -107,7 +108,7 @@ func (hs *headsScanner) scan() bool {
firstTime int64 firstTime int64
lastTime int64 lastTime int64
encoding byte encoding byte
ch Chunk ch chunk.Chunk
lastTimeHead model.Time lastTimeHead model.Time
) )
if seriesFlags, hs.err = hs.r.ReadByte(); hs.err != nil { if seriesFlags, hs.err = hs.r.ReadByte(); hs.err != nil {
@ -146,7 +147,7 @@ func (hs *headsScanner) scan() bool {
if numChunkDescs, hs.err = binary.ReadVarint(hs.r); hs.err != nil { if numChunkDescs, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
return false return false
} }
chunkDescs := make([]*ChunkDesc, numChunkDescs) chunkDescs := make([]*chunk.Desc, numChunkDescs)
if hs.version == headsFormatLegacyVersion { if hs.version == headsFormatLegacyVersion {
if headChunkPersisted { if headChunkPersisted {
persistWatermark = numChunkDescs persistWatermark = numChunkDescs
@ -163,11 +164,11 @@ func (hs *headsScanner) scan() bool {
if lastTime, hs.err = binary.ReadVarint(hs.r); hs.err != nil { if lastTime, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
return false return false
} }
chunkDescs[i] = &ChunkDesc{ chunkDescs[i] = &chunk.Desc{
chunkFirstTime: model.Time(firstTime), ChunkFirstTime: model.Time(firstTime),
chunkLastTime: model.Time(lastTime), ChunkLastTime: model.Time(lastTime),
} }
numMemChunkDescs.Inc() chunk.NumMemDescs.Inc()
} else { } else {
// Non-persisted chunk. // Non-persisted chunk.
// If there are non-persisted chunks at all, we consider // If there are non-persisted chunks at all, we consider
@ -176,24 +177,24 @@ func (hs *headsScanner) scan() bool {
if encoding, hs.err = hs.r.ReadByte(); hs.err != nil { if encoding, hs.err = hs.r.ReadByte(); hs.err != nil {
return false return false
} }
if ch, hs.err = NewChunkForEncoding(ChunkEncoding(encoding)); hs.err != nil { if ch, hs.err = chunk.NewForEncoding(chunk.Encoding(encoding)); hs.err != nil {
return false return false
} }
if hs.err = ch.Unmarshal(hs.r); hs.err != nil { if hs.err = ch.Unmarshal(hs.r); hs.err != nil {
return false return false
} }
cd := NewChunkDesc(ch, ch.FirstTime()) cd := chunk.NewDesc(ch, ch.FirstTime())
if i < numChunkDescs-1 { if i < numChunkDescs-1 {
// This is NOT the head chunk. So it's a chunk // This is NOT the head chunk. So it's a chunk
// to be persisted, and we need to populate lastTime. // to be persisted, and we need to populate lastTime.
hs.chunksToPersistTotal++ hs.chunksToPersistTotal++
cd.maybePopulateLastTime() cd.MaybePopulateLastTime()
} }
chunkDescs[i] = cd chunkDescs[i] = cd
} }
} }
if lastTimeHead, hs.err = chunkDescs[len(chunkDescs)-1].lastTime(); hs.err != nil { if lastTimeHead, hs.err = chunkDescs[len(chunkDescs)-1].LastTime(); hs.err != nil {
return false return false
} }

View file

@ -13,38 +13,6 @@
package local package local
import "github.com/prometheus/client_golang/prometheus"
// Usually, a separate file for instrumentation is frowned upon. Metrics should
// be close to where they are used. However, the metrics below are set all over
// the place, so we go for a separate instrumentation file in this case.
var (
chunkOps = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "chunk_ops_total",
Help: "The total number of chunk operations by their type.",
},
[]string{opTypeLabel},
)
chunkDescOps = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "chunkdesc_ops_total",
Help: "The total number of chunk descriptor operations by their type.",
},
[]string{opTypeLabel},
)
numMemChunkDescs = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "memory_chunkdescs",
Help: "The current number of chunk descriptors in memory.",
})
)
const ( const (
namespace = "prometheus" namespace = "prometheus"
subsystem = "local_storage" subsystem = "local_storage"
@ -64,19 +32,6 @@ const (
droppedQuarantine = "quarantine_dropped" droppedQuarantine = "quarantine_dropped"
failedQuarantine = "quarantine_failed" failedQuarantine = "quarantine_failed"
// Op-types for chunkOps.
createAndPin = "create" // A chunkDesc creation with refCount=1.
persistAndUnpin = "persist"
pin = "pin" // Excluding the pin on creation.
unpin = "unpin" // Excluding the unpin on persisting.
clone = "clone"
transcode = "transcode"
drop = "drop"
// Op-types for chunkOps and chunkDescOps.
evict = "evict"
load = "load"
seriesLocationLabel = "location" seriesLocationLabel = "location"
// Maintenance types for maintainSeriesDuration. // Maintenance types for maintainSeriesDuration.
@ -89,24 +44,3 @@ const (
outOfOrderTimestamp = "timestamp_out_of_order" outOfOrderTimestamp = "timestamp_out_of_order"
duplicateSample = "multiple_values_for_timestamp" duplicateSample = "multiple_values_for_timestamp"
) )
func init() {
prometheus.MustRegister(chunkOps)
prometheus.MustRegister(chunkDescOps)
prometheus.MustRegister(numMemChunkDescs)
}
var (
// Global counter, also used internally, so not implemented as
// metrics. Collected in MemorySeriesStorage.Collect.
// TODO(beorn7): As it is used internally, it is actually very bad style
// to have it as a global variable.
numMemChunks int64
// Metric descriptors for the above.
numMemChunksDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "memory_chunks"),
"The current number of chunks in memory, excluding cloned chunks (i.e. chunks without a descriptor).",
nil, nil,
)
)

View file

@ -91,7 +91,7 @@ type Querier interface {
type SeriesIterator interface { type SeriesIterator interface {
// Gets the value that is closest before the given time. In case a value // Gets the value that is closest before the given time. In case a value
// exists at precisely the given time, that value is returned. If no // exists at precisely the given time, that value is returned. If no
// applicable value exists, ZeroSamplePair is returned. // applicable value exists, model.ZeroSamplePair is returned.
ValueAtOrBeforeTime(model.Time) model.SamplePair ValueAtOrBeforeTime(model.Time) model.SamplePair
// Gets all values contained within a given interval. // Gets all values contained within a given interval.
RangeValues(metric.Interval) []model.SamplePair RangeValues(metric.Interval) []model.SamplePair
@ -100,17 +100,3 @@ type SeriesIterator interface {
// Closes the iterator and releases the underlying data. // Closes the iterator and releases the underlying data.
Close() Close()
} }
// ZeroSamplePair is the pseudo zero-value of model.SamplePair used by the local
// package to signal a non-existing sample pair. It is a SamplePair with
// timestamp model.Earliest and value 0.0. Note that the natural zero value of
// SamplePair has a timestamp of 0, which is possible to appear in a real
// SamplePair and thus not suitable to signal a non-existing SamplePair.
var ZeroSamplePair = model.SamplePair{Timestamp: model.Earliest}
// ZeroSample is the pseudo zero-value of model.Sample used by the local package
// to signal a non-existing sample. It is a Sample with timestamp
// model.Earliest, value 0.0, and metric nil. Note that the natural zero value
// of Sample has a timestamp of 0, which is possible to appear in a real
// Sample and thus not suitable to signal a non-existing Sample.
var ZeroSample = model.Sample{Timestamp: model.Earliest}

View file

@ -31,6 +31,7 @@ import (
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/local/chunk"
"github.com/prometheus/prometheus/storage/local/codable" "github.com/prometheus/prometheus/storage/local/codable"
"github.com/prometheus/prometheus/storage/local/index" "github.com/prometheus/prometheus/storage/local/index"
"github.com/prometheus/prometheus/util/flock" "github.com/prometheus/prometheus/util/flock"
@ -60,7 +61,7 @@ const (
chunkHeaderTypeOffset = 0 chunkHeaderTypeOffset = 0
chunkHeaderFirstTimeOffset = 1 chunkHeaderFirstTimeOffset = 1
chunkHeaderLastTimeOffset = 9 chunkHeaderLastTimeOffset = 9
chunkLenWithHeader = chunkLen + chunkHeaderLen chunkLenWithHeader = chunk.ChunkLen + chunkHeaderLen
chunkMaxBatchSize = 62 // Max no. of chunks to load or write in chunkMaxBatchSize = 62 // Max no. of chunks to load or write in
// one batch. Note that 62 is the largest number of chunks that fit // one batch. Note that 62 is the largest number of chunks that fit
// into 64kiB on disk because chunkHeaderLen is added to each 1k chunk. // into 64kiB on disk because chunkHeaderLen is added to each 1k chunk.
@ -370,7 +371,7 @@ func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelVa
// //
// Returning an error signals problems with the series file. In this case, the // Returning an error signals problems with the series file. In this case, the
// caller should quarantine the series. // caller should quarantine the series.
func (p *persistence) persistChunks(fp model.Fingerprint, chunks []Chunk) (index int, err error) { func (p *persistence) persistChunks(fp model.Fingerprint, chunks []chunk.Chunk) (index int, err error) {
f, err := p.openChunkFileForWriting(fp) f, err := p.openChunkFileForWriting(fp)
if err != nil { if err != nil {
return -1, err return -1, err
@ -399,14 +400,14 @@ func (p *persistence) persistChunks(fp model.Fingerprint, chunks []Chunk) (index
// incrementally larger indexes. The indexOffset denotes the offset to be added to // incrementally larger indexes. The indexOffset denotes the offset to be added to
// each index in indexes. It is the caller's responsibility to not persist or // each index in indexes. It is the caller's responsibility to not persist or
// drop anything for the same fingerprint concurrently. // drop anything for the same fingerprint concurrently.
func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]Chunk, error) { func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]chunk.Chunk, error) {
f, err := p.openChunkFileForReading(fp) f, err := p.openChunkFileForReading(fp)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer f.Close() defer f.Close()
chunks := make([]Chunk, 0, len(indexes)) chunks := make([]chunk.Chunk, 0, len(indexes))
buf := p.bufPool.Get().([]byte) buf := p.bufPool.Get().([]byte)
defer func() { defer func() {
// buf may change below. An unwrapped 'defer p.bufPool.Put(buf)' // buf may change below. An unwrapped 'defer p.bufPool.Put(buf)'
@ -436,7 +437,7 @@ func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffse
return nil, err return nil, err
} }
for c := 0; c < batchSize; c++ { for c := 0; c < batchSize; c++ {
chunk, err := NewChunkForEncoding(ChunkEncoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset])) chunk, err := chunk.NewForEncoding(chunk.Encoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset]))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -446,16 +447,16 @@ func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffse
chunks = append(chunks, chunk) chunks = append(chunks, chunk)
} }
} }
chunkOps.WithLabelValues(load).Add(float64(len(chunks))) chunk.Ops.WithLabelValues(chunk.Load).Add(float64(len(chunks)))
atomic.AddInt64(&numMemChunks, int64(len(chunks))) atomic.AddInt64(&chunk.NumMemChunks, int64(len(chunks)))
return chunks, nil return chunks, nil
} }
// loadChunkDescs loads the chunkDescs for a series from disk. offsetFromEnd is // loadChunkDescs loads the chunk.Descs for a series from disk. offsetFromEnd is
// the number of chunkDescs to skip from the end of the series file. It is the // the number of chunk.Descs to skip from the end of the series file. It is the
// caller's responsibility to not persist or drop anything for the same // caller's responsibility to not persist or drop anything for the same
// fingerprint concurrently. // fingerprint concurrently.
func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*ChunkDesc, error) { func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*chunk.Desc, error) {
f, err := p.openChunkFileForReading(fp) f, err := p.openChunkFileForReading(fp)
if os.IsNotExist(err) { if os.IsNotExist(err) {
return nil, nil return nil, nil
@ -478,7 +479,7 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([
} }
numChunks := int(fi.Size())/chunkLenWithHeader - offsetFromEnd numChunks := int(fi.Size())/chunkLenWithHeader - offsetFromEnd
cds := make([]*ChunkDesc, numChunks) cds := make([]*chunk.Desc, numChunks)
chunkTimesBuf := make([]byte, 16) chunkTimesBuf := make([]byte, 16)
for i := 0; i < numChunks; i++ { for i := 0; i < numChunks; i++ {
_, err := f.Seek(offsetForChunkIndex(i)+chunkHeaderFirstTimeOffset, os.SEEK_SET) _, err := f.Seek(offsetForChunkIndex(i)+chunkHeaderFirstTimeOffset, os.SEEK_SET)
@ -490,13 +491,13 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([
if err != nil { if err != nil {
return nil, err return nil, err
} }
cds[i] = &ChunkDesc{ cds[i] = &chunk.Desc{
chunkFirstTime: model.Time(binary.LittleEndian.Uint64(chunkTimesBuf)), ChunkFirstTime: model.Time(binary.LittleEndian.Uint64(chunkTimesBuf)),
chunkLastTime: model.Time(binary.LittleEndian.Uint64(chunkTimesBuf[8:])), ChunkLastTime: model.Time(binary.LittleEndian.Uint64(chunkTimesBuf[8:])),
} }
} }
chunkDescOps.WithLabelValues(load).Add(float64(len(cds))) chunk.DescOps.WithLabelValues(chunk.Load).Add(float64(len(cds)))
numMemChunkDescs.Add(float64(len(cds))) chunk.NumMemDescs.Add(float64(len(cds)))
return cds, nil return cds, nil
} }
@ -645,10 +646,10 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
} }
for i, chunkDesc := range m.series.chunkDescs { for i, chunkDesc := range m.series.chunkDescs {
if i < m.series.persistWatermark { if i < m.series.persistWatermark {
if _, err = codable.EncodeVarint(w, int64(chunkDesc.firstTime())); err != nil { if _, err = codable.EncodeVarint(w, int64(chunkDesc.FirstTime())); err != nil {
return return
} }
lt, err := chunkDesc.lastTime() lt, err := chunkDesc.LastTime()
if err != nil { if err != nil {
return return
} }
@ -657,10 +658,10 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
} }
} else { } else {
// This is a non-persisted chunk. Fully marshal it. // This is a non-persisted chunk. Fully marshal it.
if err = w.WriteByte(byte(chunkDesc.c.Encoding())); err != nil { if err = w.WriteByte(byte(chunkDesc.C.Encoding())); err != nil {
return return
} }
if err = chunkDesc.c.Marshal(w); err != nil { if err = chunkDesc.C.Marshal(w); err != nil {
return return
} }
} }
@ -751,7 +752,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in
// Returning an error signals problems with the series file. In this case, the // Returning an error signals problems with the series file. In this case, the
// caller should quarantine the series. // caller should quarantine the series.
func (p *persistence) dropAndPersistChunks( func (p *persistence) dropAndPersistChunks(
fp model.Fingerprint, beforeTime model.Time, chunks []Chunk, fp model.Fingerprint, beforeTime model.Time, chunks []chunk.Chunk,
) ( ) (
firstTimeNotDropped model.Time, firstTimeNotDropped model.Time,
offset int, offset int,
@ -879,7 +880,7 @@ func (p *persistence) dropAndPersistChunks(
firstTimeNotDropped = model.Time( firstTimeNotDropped = model.Time(
binary.LittleEndian.Uint64(headerBuf[chunkHeaderFirstTimeOffset:]), binary.LittleEndian.Uint64(headerBuf[chunkHeaderFirstTimeOffset:]),
) )
chunkOps.WithLabelValues(drop).Add(float64(numDropped)) chunk.Ops.WithLabelValues(chunk.Drop).Add(float64(numDropped))
_, err = f.Seek(-chunkHeaderLen, os.SEEK_CUR) _, err = f.Seek(-chunkHeaderLen, os.SEEK_CUR)
if err != nil { if err != nil {
return return
@ -930,7 +931,7 @@ func (p *persistence) deleteSeriesFile(fp model.Fingerprint) (int, error) {
if err := os.Remove(fname); err != nil { if err := os.Remove(fname); err != nil {
return -1, err return -1, err
} }
chunkOps.WithLabelValues(drop).Add(float64(numChunks)) chunk.Ops.WithLabelValues(chunk.Drop).Add(float64(numChunks))
return numChunks, nil return numChunks, nil
} }
@ -1500,7 +1501,7 @@ func (p *persistence) loadFPMappings() (fpMappings, model.Fingerprint, error) {
return fpm, highestMappedFP, nil return fpm, highestMappedFP, nil
} }
func (p *persistence) writeChunks(w io.Writer, chunks []Chunk) error { func (p *persistence) writeChunks(w io.Writer, chunks []chunk.Chunk) error {
b := p.bufPool.Get().([]byte) b := p.bufPool.Get().([]byte)
defer func() { defer func() {
// buf may change below. An unwrapped 'defer p.bufPool.Put(buf)' // buf may change below. An unwrapped 'defer p.bufPool.Put(buf)'
@ -1547,7 +1548,7 @@ func chunkIndexForOffset(offset int64) (int, error) {
return int(offset) / chunkLenWithHeader, nil return int(offset) / chunkLenWithHeader, nil
} }
func writeChunkHeader(header []byte, c Chunk) error { func writeChunkHeader(header []byte, c chunk.Chunk) error {
header[chunkHeaderTypeOffset] = byte(c.Encoding()) header[chunkHeaderTypeOffset] = byte(c.Encoding())
binary.LittleEndian.PutUint64( binary.LittleEndian.PutUint64(
header[chunkHeaderFirstTimeOffset:], header[chunkHeaderFirstTimeOffset:],

View file

@ -25,6 +25,7 @@ import (
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/local/chunk"
"github.com/prometheus/prometheus/storage/local/codable" "github.com/prometheus/prometheus/storage/local/codable"
"github.com/prometheus/prometheus/storage/local/index" "github.com/prometheus/prometheus/storage/local/index"
"github.com/prometheus/prometheus/util/testutil" "github.com/prometheus/prometheus/util/testutil"
@ -38,8 +39,8 @@ var (
m5 = model.Metric{"label": "value5"} m5 = model.Metric{"label": "value5"}
) )
func newTestPersistence(t *testing.T, encoding ChunkEncoding) (*persistence, testutil.Closer) { func newTestPersistence(t *testing.T, encoding chunk.Encoding) (*persistence, testutil.Closer) {
DefaultChunkEncoding = encoding chunk.DefaultEncoding = encoding
dir := testutil.NewTemporaryDirectory("test_persistence", t) dir := testutil.NewTemporaryDirectory("test_persistence", t)
p, err := newPersistence(dir.Path(), false, false, func() bool { return false }, 0.1) p, err := newPersistence(dir.Path(), false, false, func() bool { return false }, 0.1)
if err != nil { if err != nil {
@ -53,18 +54,18 @@ func newTestPersistence(t *testing.T, encoding ChunkEncoding) (*persistence, tes
}) })
} }
func buildTestChunks(t *testing.T, encoding ChunkEncoding) map[model.Fingerprint][]Chunk { func buildTestChunks(t *testing.T, encoding chunk.Encoding) map[model.Fingerprint][]chunk.Chunk {
fps := model.Fingerprints{ fps := model.Fingerprints{
m1.FastFingerprint(), m1.FastFingerprint(),
m2.FastFingerprint(), m2.FastFingerprint(),
m3.FastFingerprint(), m3.FastFingerprint(),
} }
fpToChunks := map[model.Fingerprint][]Chunk{} fpToChunks := map[model.Fingerprint][]chunk.Chunk{}
for _, fp := range fps { for _, fp := range fps {
fpToChunks[fp] = make([]Chunk, 0, 10) fpToChunks[fp] = make([]chunk.Chunk, 0, 10)
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
ch, err := NewChunkForEncoding(encoding) ch, err := chunk.NewForEncoding(encoding)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -81,7 +82,7 @@ func buildTestChunks(t *testing.T, encoding ChunkEncoding) map[model.Fingerprint
return fpToChunks return fpToChunks
} }
func chunksEqual(c1, c2 Chunk) bool { func chunksEqual(c1, c2 chunk.Chunk) bool {
it1 := c1.NewIterator() it1 := c1.NewIterator()
it2 := c2.NewIterator() it2 := c2.NewIterator()
for it1.Scan() && it2.Scan() { for it1.Scan() && it2.Scan() {
@ -92,7 +93,7 @@ func chunksEqual(c1, c2 Chunk) bool {
return it1.Err() == nil && it2.Err() == nil return it1.Err() == nil && it2.Err() == nil
} }
func testPersistLoadDropChunks(t *testing.T, encoding ChunkEncoding) { func testPersistLoadDropChunks(t *testing.T, encoding chunk.Encoding) {
p, closer := newTestPersistence(t, encoding) p, closer := newTestPersistence(t, encoding)
defer closer.Close() defer closer.Close()
@ -138,14 +139,14 @@ func testPersistLoadDropChunks(t *testing.T, encoding ChunkEncoding) {
t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 10) t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 10)
} }
for i, cd := range actualChunkDescs { for i, cd := range actualChunkDescs {
lastTime, err := cd.lastTime() lastTime, err := cd.LastTime()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if cd.firstTime() != model.Time(i) || lastTime != model.Time(i) { if cd.FirstTime() != model.Time(i) || lastTime != model.Time(i) {
t.Errorf( t.Errorf(
"Want ts=%v, got firstTime=%v, lastTime=%v.", "Want ts=%v, got firstTime=%v, lastTime=%v.",
i, cd.firstTime(), lastTime, i, cd.FirstTime(), lastTime,
) )
} }
@ -159,14 +160,14 @@ func testPersistLoadDropChunks(t *testing.T, encoding ChunkEncoding) {
t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 5) t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 5)
} }
for i, cd := range actualChunkDescs { for i, cd := range actualChunkDescs {
lastTime, err := cd.lastTime() lastTime, err := cd.LastTime()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if cd.firstTime() != model.Time(i) || lastTime != model.Time(i) { if cd.FirstTime() != model.Time(i) || lastTime != model.Time(i) {
t.Errorf( t.Errorf(
"Want ts=%v, got firstTime=%v, lastTime=%v.", "Want ts=%v, got firstTime=%v, lastTime=%v.",
i, cd.firstTime(), lastTime, i, cd.FirstTime(), lastTime,
) )
} }
@ -450,7 +451,7 @@ func TestPersistLoadDropChunksType1(t *testing.T) {
testPersistLoadDropChunks(t, 1) testPersistLoadDropChunks(t, 1)
} }
func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding) { func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encoding) {
p, closer := newTestPersistence(t, encoding) p, closer := newTestPersistence(t, encoding)
defer closer.Close() defer closer.Close()
@ -499,7 +500,7 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding
if !reflect.DeepEqual(loadedS1.metric, m1) { if !reflect.DeepEqual(loadedS1.metric, m1) {
t.Errorf("want metric %v, got %v", m1, loadedS1.metric) t.Errorf("want metric %v, got %v", m1, loadedS1.metric)
} }
if !reflect.DeepEqual(loadedS1.head().c, s1.head().c) { if !reflect.DeepEqual(loadedS1.head().C, s1.head().C) {
t.Error("head chunks differ") t.Error("head chunks differ")
} }
if loadedS1.chunkDescsOffset != 0 { if loadedS1.chunkDescsOffset != 0 {
@ -508,11 +509,11 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding
if loadedS1.headChunkClosed { if loadedS1.headChunkClosed {
t.Error("headChunkClosed is true") t.Error("headChunkClosed is true")
} }
if loadedS1.head().chunkFirstTime != 1 { if loadedS1.head().ChunkFirstTime != 1 {
t.Errorf("want chunkFirstTime in head chunk to be 1, got %d", loadedS1.head().chunkFirstTime) t.Errorf("want ChunkFirstTime in head chunk to be 1, got %d", loadedS1.head().ChunkFirstTime)
} }
if loadedS1.head().chunkLastTime != model.Earliest { if loadedS1.head().ChunkLastTime != model.Earliest {
t.Error("want chunkLastTime in head chunk to be unset") t.Error("want ChunkLastTime in head chunk to be unset")
} }
} else { } else {
t.Errorf("couldn't find %v in loaded map", m1) t.Errorf("couldn't find %v in loaded map", m1)
@ -521,7 +522,7 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding
if !reflect.DeepEqual(loadedS3.metric, m3) { if !reflect.DeepEqual(loadedS3.metric, m3) {
t.Errorf("want metric %v, got %v", m3, loadedS3.metric) t.Errorf("want metric %v, got %v", m3, loadedS3.metric)
} }
if loadedS3.head().c != nil { if loadedS3.head().C != nil {
t.Error("head chunk not evicted") t.Error("head chunk not evicted")
} }
if loadedS3.chunkDescsOffset != 0 { if loadedS3.chunkDescsOffset != 0 {
@ -530,11 +531,11 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding
if !loadedS3.headChunkClosed { if !loadedS3.headChunkClosed {
t.Error("headChunkClosed is false") t.Error("headChunkClosed is false")
} }
if loadedS3.head().chunkFirstTime != 2 { if loadedS3.head().ChunkFirstTime != 2 {
t.Errorf("want chunkFirstTime in head chunk to be 2, got %d", loadedS3.head().chunkFirstTime) t.Errorf("want ChunkFirstTime in head chunk to be 2, got %d", loadedS3.head().ChunkFirstTime)
} }
if loadedS3.head().chunkLastTime != 2 { if loadedS3.head().ChunkLastTime != 2 {
t.Errorf("want chunkLastTime in head chunk to be 2, got %d", loadedS3.head().chunkLastTime) t.Errorf("want ChunkLastTime in head chunk to be 2, got %d", loadedS3.head().ChunkLastTime)
} }
} else { } else {
t.Errorf("couldn't find %v in loaded map", m3) t.Errorf("couldn't find %v in loaded map", m3)
@ -549,10 +550,10 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding
if got, want := loadedS4.persistWatermark, 0; got != want { if got, want := loadedS4.persistWatermark, 0; got != want {
t.Errorf("got persistWatermark %d, want %d", got, want) t.Errorf("got persistWatermark %d, want %d", got, want)
} }
if loadedS4.chunkDescs[2].isEvicted() { if loadedS4.chunkDescs[2].IsEvicted() {
t.Error("3rd chunk evicted") t.Error("3rd chunk evicted")
} }
if loadedS4.chunkDescs[3].isEvicted() { if loadedS4.chunkDescs[3].IsEvicted() {
t.Error("4th chunk evicted") t.Error("4th chunk evicted")
} }
if loadedS4.chunkDescsOffset != 0 { if loadedS4.chunkDescsOffset != 0 {
@ -562,27 +563,27 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding
t.Error("headChunkClosed is true") t.Error("headChunkClosed is true")
} }
for i, cd := range loadedS4.chunkDescs { for i, cd := range loadedS4.chunkDescs {
if cd.chunkFirstTime != cd.c.FirstTime() { if cd.ChunkFirstTime != cd.C.FirstTime() {
t.Errorf( t.Errorf(
"chunkDesc[%d]: chunkFirstTime not consistent with chunk, want %d, got %d", "chunk.Desc[%d]: ChunkFirstTime not consistent with chunk, want %d, got %d",
i, cd.c.FirstTime(), cd.chunkFirstTime, i, cd.C.FirstTime(), cd.ChunkFirstTime,
) )
} }
if i == len(loadedS4.chunkDescs)-1 { if i == len(loadedS4.chunkDescs)-1 {
// Head chunk. // Head chunk.
if cd.chunkLastTime != model.Earliest { if cd.ChunkLastTime != model.Earliest {
t.Error("want chunkLastTime in head chunk to be unset") t.Error("want ChunkLastTime in head chunk to be unset")
} }
continue continue
} }
lastTime, err := cd.c.NewIterator().LastTimestamp() lastTime, err := cd.C.NewIterator().LastTimestamp()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if cd.chunkLastTime != lastTime { if cd.ChunkLastTime != lastTime {
t.Errorf( t.Errorf(
"chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d", "chunk.Desc[%d]: ChunkLastTime not consistent with chunk, want %d, got %d",
i, lastTime, cd.chunkLastTime, i, lastTime, cd.ChunkLastTime,
) )
} }
} }
@ -599,10 +600,10 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding
if got, want := loadedS5.persistWatermark, 3; got != want { if got, want := loadedS5.persistWatermark, 3; got != want {
t.Errorf("got persistWatermark %d, want %d", got, want) t.Errorf("got persistWatermark %d, want %d", got, want)
} }
if !loadedS5.chunkDescs[2].isEvicted() { if !loadedS5.chunkDescs[2].IsEvicted() {
t.Error("3rd chunk not evicted") t.Error("3rd chunk not evicted")
} }
if loadedS5.chunkDescs[3].isEvicted() { if loadedS5.chunkDescs[3].IsEvicted() {
t.Error("4th chunk evicted") t.Error("4th chunk evicted")
} }
if loadedS5.chunkDescsOffset != 0 { if loadedS5.chunkDescsOffset != 0 {
@ -614,32 +615,32 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding
for i, cd := range loadedS5.chunkDescs { for i, cd := range loadedS5.chunkDescs {
if i < 3 { if i < 3 {
// Evicted chunks. // Evicted chunks.
if cd.chunkFirstTime == model.Earliest { if cd.ChunkFirstTime == model.Earliest {
t.Errorf("chunkDesc[%d]: chunkLastTime not set", i) t.Errorf("chunk.Desc[%d]: ChunkLastTime not set", i)
} }
continue continue
} }
if cd.chunkFirstTime != cd.c.FirstTime() { if cd.ChunkFirstTime != cd.C.FirstTime() {
t.Errorf( t.Errorf(
"chunkDesc[%d]: chunkFirstTime not consistent with chunk, want %d, got %d", "chunk.Desc[%d]: ChunkFirstTime not consistent with chunk, want %d, got %d",
i, cd.c.FirstTime(), cd.chunkFirstTime, i, cd.C.FirstTime(), cd.ChunkFirstTime,
) )
} }
if i == len(loadedS5.chunkDescs)-1 { if i == len(loadedS5.chunkDescs)-1 {
// Head chunk. // Head chunk.
if cd.chunkLastTime != model.Earliest { if cd.ChunkLastTime != model.Earliest {
t.Error("want chunkLastTime in head chunk to be unset") t.Error("want ChunkLastTime in head chunk to be unset")
} }
continue continue
} }
lastTime, err := cd.c.NewIterator().LastTimestamp() lastTime, err := cd.C.NewIterator().LastTimestamp()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if cd.chunkLastTime != lastTime { if cd.ChunkLastTime != lastTime {
t.Errorf( t.Errorf(
"chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d", "chunk.Desc[%d]: ChunkLastTime not consistent with chunk, want %d, got %d",
i, cd.chunkLastTime, lastTime, i, cd.ChunkLastTime, lastTime,
) )
} }
} }
@ -690,7 +691,7 @@ func TestCheckpointAndLoadFPMappings(t *testing.T) {
} }
} }
func testFingerprintsModifiedBefore(t *testing.T, encoding ChunkEncoding) { func testFingerprintsModifiedBefore(t *testing.T, encoding chunk.Encoding) {
p, closer := newTestPersistence(t, encoding) p, closer := newTestPersistence(t, encoding)
defer closer.Close() defer closer.Close()
@ -769,7 +770,7 @@ func TestFingerprintsModifiedBeforeChunkType2(t *testing.T) {
testFingerprintsModifiedBefore(t, 2) testFingerprintsModifiedBefore(t, 2)
} }
func testDropArchivedMetric(t *testing.T, encoding ChunkEncoding) { func testDropArchivedMetric(t *testing.T, encoding chunk.Encoding) {
p, closer := newTestPersistence(t, encoding) p, closer := newTestPersistence(t, encoding)
defer closer.Close() defer closer.Close()
@ -843,7 +844,7 @@ type incrementalBatch struct {
expectedLpToFps index.LabelPairFingerprintsMapping expectedLpToFps index.LabelPairFingerprintsMapping
} }
func testIndexing(t *testing.T, encoding ChunkEncoding) { func testIndexing(t *testing.T, encoding chunk.Encoding) {
batches := []incrementalBatch{ batches := []incrementalBatch{
{ {
fpToMetric: index.FingerprintMetricMapping{ fpToMetric: index.FingerprintMetricMapping{

View file

@ -20,13 +20,14 @@ import (
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/local/chunk"
"github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/storage/metric"
) )
const ( const (
// chunkDescEvictionFactor is a factor used for chunkDesc eviction (as opposed // chunkDescEvictionFactor is a factor used for chunk.Desc eviction (as opposed
// to evictions of chunks, see method evictOlderThan. A chunk takes about 20x // to evictions of chunks, see method evictOlderThan. A chunk takes about 20x
// more memory than a chunkDesc. With a chunkDescEvictionFactor of 10, not more // more memory than a chunk.Desc. With a chunkDescEvictionFactor of 10, not more
// than a third of the total memory taken by a series will be used for // than a third of the total memory taken by a series will be used for
// chunkDescs. // chunkDescs.
chunkDescEvictionFactor = 10 chunkDescEvictionFactor = 10
@ -140,8 +141,8 @@ func (sm *seriesMap) fpIter() <-chan model.Fingerprint {
type memorySeries struct { type memorySeries struct {
metric model.Metric metric model.Metric
// Sorted by start time, overlapping chunk ranges are forbidden. // Sorted by start time, overlapping chunk ranges are forbidden.
chunkDescs []*ChunkDesc chunkDescs []*chunk.Desc
// The index (within chunkDescs above) of the first chunkDesc that // The index (within chunkDescs above) of the first chunk.Desc that
// points to a non-persisted chunk. If all chunks are persisted, then // points to a non-persisted chunk. If all chunks are persisted, then
// persistWatermark == len(chunkDescs). // persistWatermark == len(chunkDescs).
persistWatermark int persistWatermark int
@ -151,7 +152,7 @@ type memorySeries struct {
// The chunkDescs in memory might not have all the chunkDescs for the // The chunkDescs in memory might not have all the chunkDescs for the
// chunks that are persisted to disk. The missing chunkDescs are all // chunks that are persisted to disk. The missing chunkDescs are all
// contiguous and at the tail end. chunkDescsOffset is the index of the // contiguous and at the tail end. chunkDescsOffset is the index of the
// chunk on disk that corresponds to the first chunkDesc in memory. If // chunk on disk that corresponds to the first chunk.Desc in memory. If
// it is 0, the chunkDescs are all loaded. A value of -1 denotes a // it is 0, the chunkDescs are all loaded. A value of -1 denotes a
// special case: There are chunks on disk, but the offset to the // special case: There are chunks on disk, but the offset to the
// chunkDescs in memory is unknown. Also, in this special case, there is // chunkDescs in memory is unknown. Also, in this special case, there is
@ -160,7 +161,7 @@ type memorySeries struct {
// set). // set).
chunkDescsOffset int chunkDescsOffset int
// The savedFirstTime field is used as a fallback when the // The savedFirstTime field is used as a fallback when the
// chunkDescsOffset is not 0. It can be used to save the firstTime of the // chunkDescsOffset is not 0. It can be used to save the FirstTime of the
// first chunk before its chunk desc is evicted. In doubt, this field is // first chunk before its chunk desc is evicted. In doubt, this field is
// just set to the oldest possible timestamp. // just set to the oldest possible timestamp.
savedFirstTime model.Time savedFirstTime model.Time
@ -193,13 +194,13 @@ type memorySeries struct {
// set to model.Earliest. The zero value for modTime can be used if the // set to model.Earliest. The zero value for modTime can be used if the
// modification time of the series file is unknown (e.g. if this is a genuinely // modification time of the series file is unknown (e.g. if this is a genuinely
// new series). // new series).
func newMemorySeries(m model.Metric, chunkDescs []*ChunkDesc, modTime time.Time) (*memorySeries, error) { func newMemorySeries(m model.Metric, chunkDescs []*chunk.Desc, modTime time.Time) (*memorySeries, error) {
var err error var err error
firstTime := model.Earliest firstTime := model.Earliest
lastTime := model.Earliest lastTime := model.Earliest
if len(chunkDescs) > 0 { if len(chunkDescs) > 0 {
firstTime = chunkDescs[0].firstTime() firstTime = chunkDescs[0].FirstTime()
if lastTime, err = chunkDescs[len(chunkDescs)-1].lastTime(); err != nil { if lastTime, err = chunkDescs[len(chunkDescs)-1].LastTime(); err != nil {
return nil, err return nil, err
} }
} }
@ -220,10 +221,10 @@ func newMemorySeries(m model.Metric, chunkDescs []*ChunkDesc, modTime time.Time)
// The caller must have locked the fingerprint of the series. // The caller must have locked the fingerprint of the series.
func (s *memorySeries) Add(v model.SamplePair) (int, error) { func (s *memorySeries) Add(v model.SamplePair) (int, error) {
if len(s.chunkDescs) == 0 || s.headChunkClosed { if len(s.chunkDescs) == 0 || s.headChunkClosed {
newHead := NewChunkDesc(NewChunk(), v.Timestamp) newHead := chunk.NewDesc(chunk.New(), v.Timestamp)
s.chunkDescs = append(s.chunkDescs, newHead) s.chunkDescs = append(s.chunkDescs, newHead)
s.headChunkClosed = false s.headChunkClosed = false
} else if s.headChunkUsedByIterator && s.head().refCount() > 1 { } else if s.headChunkUsedByIterator && s.head().RefCount() > 1 {
// We only need to clone the head chunk if the current head // We only need to clone the head chunk if the current head
// chunk was used in an iterator at all and if the refCount is // chunk was used in an iterator at all and if the refCount is
// still greater than the 1 we always have because the head // still greater than the 1 we always have because the head
@ -233,10 +234,10 @@ func (s *memorySeries) Add(v model.SamplePair) (int, error) {
// around and keep the head chunk pinned. We needed to track // around and keep the head chunk pinned. We needed to track
// pins by version of the head chunk, which is probably not // pins by version of the head chunk, which is probably not
// worth the effort. // worth the effort.
chunkOps.WithLabelValues(clone).Inc() chunk.Ops.WithLabelValues(chunk.Clone).Inc()
// No locking needed here because a non-persisted head chunk can // No locking needed here because a non-persisted head chunk can
// not get evicted concurrently. // not get evicted concurrently.
s.head().c = s.head().c.Clone() s.head().C = s.head().C.Clone()
s.headChunkUsedByIterator = false s.headChunkUsedByIterator = false
} }
@ -244,15 +245,15 @@ func (s *memorySeries) Add(v model.SamplePair) (int, error) {
if err != nil { if err != nil {
return 0, err return 0, err
} }
s.head().c = chunks[0] s.head().C = chunks[0]
for _, c := range chunks[1:] { for _, c := range chunks[1:] {
s.chunkDescs = append(s.chunkDescs, NewChunkDesc(c, c.FirstTime())) s.chunkDescs = append(s.chunkDescs, chunk.NewDesc(c, c.FirstTime()))
} }
// Populate lastTime of now-closed chunks. // Populate lastTime of now-closed chunks.
for _, cd := range s.chunkDescs[len(s.chunkDescs)-len(chunks) : len(s.chunkDescs)-1] { for _, cd := range s.chunkDescs[len(s.chunkDescs)-len(chunks) : len(s.chunkDescs)-1] {
cd.maybePopulateLastTime() cd.MaybePopulateLastTime()
} }
s.lastTime = v.Timestamp s.lastTime = v.Timestamp
@ -275,7 +276,7 @@ func (s *memorySeries) maybeCloseHeadChunk() bool {
// Since we cannot modify the head chunk from now on, we // Since we cannot modify the head chunk from now on, we
// don't need to bother with cloning anymore. // don't need to bother with cloning anymore.
s.headChunkUsedByIterator = false s.headChunkUsedByIterator = false
s.head().maybePopulateLastTime() s.head().MaybePopulateLastTime()
return true return true
} }
return false return false
@ -287,14 +288,14 @@ func (s *memorySeries) maybeCloseHeadChunk() bool {
func (s *memorySeries) evictChunkDescs(iOldestNotEvicted int) { func (s *memorySeries) evictChunkDescs(iOldestNotEvicted int) {
lenToKeep := chunkDescEvictionFactor * (len(s.chunkDescs) - iOldestNotEvicted) lenToKeep := chunkDescEvictionFactor * (len(s.chunkDescs) - iOldestNotEvicted)
if lenToKeep < len(s.chunkDescs) { if lenToKeep < len(s.chunkDescs) {
s.savedFirstTime = s.FirstTime() s.savedFirstTime = s.firstTime()
lenEvicted := len(s.chunkDescs) - lenToKeep lenEvicted := len(s.chunkDescs) - lenToKeep
s.chunkDescsOffset += lenEvicted s.chunkDescsOffset += lenEvicted
s.persistWatermark -= lenEvicted s.persistWatermark -= lenEvicted
chunkDescOps.WithLabelValues(evict).Add(float64(lenEvicted)) chunk.DescOps.WithLabelValues(chunk.Evict).Add(float64(lenEvicted))
numMemChunkDescs.Sub(float64(lenEvicted)) chunk.NumMemDescs.Sub(float64(lenEvicted))
s.chunkDescs = append( s.chunkDescs = append(
make([]*ChunkDesc, 0, lenToKeep), make([]*chunk.Desc, 0, lenToKeep),
s.chunkDescs[lenEvicted:]..., s.chunkDescs[lenEvicted:]...,
) )
s.dirty = true s.dirty = true
@ -306,7 +307,7 @@ func (s *memorySeries) evictChunkDescs(iOldestNotEvicted int) {
func (s *memorySeries) dropChunks(t model.Time) error { func (s *memorySeries) dropChunks(t model.Time) error {
keepIdx := len(s.chunkDescs) keepIdx := len(s.chunkDescs)
for i, cd := range s.chunkDescs { for i, cd := range s.chunkDescs {
lt, err := cd.lastTime() lt, err := cd.LastTime()
if err != nil { if err != nil {
return err return err
} }
@ -324,7 +325,7 @@ func (s *memorySeries) dropChunks(t model.Time) error {
return nil return nil
} }
s.chunkDescs = append( s.chunkDescs = append(
make([]*ChunkDesc, 0, len(s.chunkDescs)-keepIdx), make([]*chunk.Desc, 0, len(s.chunkDescs)-keepIdx),
s.chunkDescs[keepIdx:]..., s.chunkDescs[keepIdx:]...,
) )
s.persistWatermark -= keepIdx s.persistWatermark -= keepIdx
@ -334,7 +335,7 @@ func (s *memorySeries) dropChunks(t model.Time) error {
if s.chunkDescsOffset != -1 { if s.chunkDescsOffset != -1 {
s.chunkDescsOffset += keepIdx s.chunkDescsOffset += keepIdx
} }
numMemChunkDescs.Sub(float64(keepIdx)) chunk.NumMemDescs.Sub(float64(keepIdx))
s.dirty = true s.dirty = true
return nil return nil
} }
@ -344,16 +345,16 @@ func (s *memorySeries) preloadChunks(
indexes []int, fp model.Fingerprint, mss *MemorySeriesStorage, indexes []int, fp model.Fingerprint, mss *MemorySeriesStorage,
) (SeriesIterator, error) { ) (SeriesIterator, error) {
loadIndexes := []int{} loadIndexes := []int{}
pinnedChunkDescs := make([]*ChunkDesc, 0, len(indexes)) pinnedChunkDescs := make([]*chunk.Desc, 0, len(indexes))
for _, idx := range indexes { for _, idx := range indexes {
cd := s.chunkDescs[idx] cd := s.chunkDescs[idx]
pinnedChunkDescs = append(pinnedChunkDescs, cd) pinnedChunkDescs = append(pinnedChunkDescs, cd)
cd.pin(mss.evictRequests) // Have to pin everything first to prevent immediate eviction on chunk loading. cd.Pin(mss.evictRequests) // Have to pin everything first to prevent immediate eviction on chunk loading.
if cd.isEvicted() { if cd.IsEvicted() {
loadIndexes = append(loadIndexes, idx) loadIndexes = append(loadIndexes, idx)
} }
} }
chunkOps.WithLabelValues(pin).Add(float64(len(pinnedChunkDescs))) chunk.Ops.WithLabelValues(chunk.Pin).Add(float64(len(pinnedChunkDescs)))
if len(loadIndexes) > 0 { if len(loadIndexes) > 0 {
if s.chunkDescsOffset == -1 { if s.chunkDescsOffset == -1 {
@ -363,13 +364,13 @@ func (s *memorySeries) preloadChunks(
if err != nil { if err != nil {
// Unpin the chunks since we won't return them as pinned chunks now. // Unpin the chunks since we won't return them as pinned chunks now.
for _, cd := range pinnedChunkDescs { for _, cd := range pinnedChunkDescs {
cd.unpin(mss.evictRequests) cd.Unpin(mss.evictRequests)
} }
chunkOps.WithLabelValues(unpin).Add(float64(len(pinnedChunkDescs))) chunk.Ops.WithLabelValues(chunk.Unpin).Add(float64(len(pinnedChunkDescs)))
return nopIter, err return nopIter, err
} }
for i, c := range chunks { for i, c := range chunks {
s.chunkDescs[loadIndexes[i]].setChunk(c) s.chunkDescs[loadIndexes[i]].SetChunk(c)
} }
} }
@ -394,19 +395,19 @@ func (s *memorySeries) preloadChunks(
// //
// The caller must have locked the fingerprint of the memorySeries. // The caller must have locked the fingerprint of the memorySeries.
func (s *memorySeries) NewIterator( func (s *memorySeries) NewIterator(
pinnedChunkDescs []*ChunkDesc, pinnedChunkDescs []*chunk.Desc,
quarantine func(error), quarantine func(error),
evictRequests chan<- evictRequest, evictRequests chan<- chunk.EvictRequest,
) SeriesIterator { ) SeriesIterator {
chunks := make([]Chunk, 0, len(pinnedChunkDescs)) chunks := make([]chunk.Chunk, 0, len(pinnedChunkDescs))
for _, cd := range pinnedChunkDescs { for _, cd := range pinnedChunkDescs {
// It's OK to directly access cd.c here (without locking) as the // It's OK to directly access cd.c here (without locking) as the
// series FP is locked and the chunk is pinned. // series FP is locked and the chunk is pinned.
chunks = append(chunks, cd.c) chunks = append(chunks, cd.C)
} }
return &memorySeriesIterator{ return &memorySeriesIterator{
chunks: chunks, chunks: chunks,
chunkIts: make([]ChunkIterator, len(chunks)), chunkIts: make([]chunk.Iterator, len(chunks)),
quarantine: quarantine, quarantine: quarantine,
metric: s.metric, metric: s.metric,
pinnedChunkDescs: pinnedChunkDescs, pinnedChunkDescs: pinnedChunkDescs,
@ -429,7 +430,7 @@ func (s *memorySeries) preloadChunksForInstant(
lastSample := s.lastSamplePair() lastSample := s.lastSamplePair()
if !through.Before(lastSample.Timestamp) && if !through.Before(lastSample.Timestamp) &&
!from.After(lastSample.Timestamp) && !from.After(lastSample.Timestamp) &&
lastSample != ZeroSamplePair { lastSample != model.ZeroSamplePair {
iter := &boundedIterator{ iter := &boundedIterator{
it: &singleSampleSeriesIterator{ it: &singleSampleSeriesIterator{
samplePair: lastSample, samplePair: lastSample,
@ -453,7 +454,7 @@ func (s *memorySeries) preloadChunksForRange(
) (SeriesIterator, error) { ) (SeriesIterator, error) {
firstChunkDescTime := model.Latest firstChunkDescTime := model.Latest
if len(s.chunkDescs) > 0 { if len(s.chunkDescs) > 0 {
firstChunkDescTime = s.chunkDescs[0].firstTime() firstChunkDescTime = s.chunkDescs[0].FirstTime()
} }
if s.chunkDescsOffset != 0 && from.Before(firstChunkDescTime) { if s.chunkDescsOffset != 0 && from.Before(firstChunkDescTime) {
cds, err := mss.loadChunkDescs(fp, s.persistWatermark) cds, err := mss.loadChunkDescs(fp, s.persistWatermark)
@ -463,7 +464,7 @@ func (s *memorySeries) preloadChunksForRange(
s.chunkDescs = append(cds, s.chunkDescs...) s.chunkDescs = append(cds, s.chunkDescs...)
s.chunkDescsOffset = 0 s.chunkDescsOffset = 0
s.persistWatermark += len(cds) s.persistWatermark += len(cds)
firstChunkDescTime = s.chunkDescs[0].firstTime() firstChunkDescTime = s.chunkDescs[0].FirstTime()
} }
if len(s.chunkDescs) == 0 || through.Before(firstChunkDescTime) { if len(s.chunkDescs) == 0 || through.Before(firstChunkDescTime) {
@ -472,16 +473,16 @@ func (s *memorySeries) preloadChunksForRange(
// Find first chunk with start time after "from". // Find first chunk with start time after "from".
fromIdx := sort.Search(len(s.chunkDescs), func(i int) bool { fromIdx := sort.Search(len(s.chunkDescs), func(i int) bool {
return s.chunkDescs[i].firstTime().After(from) return s.chunkDescs[i].FirstTime().After(from)
}) })
// Find first chunk with start time after "through". // Find first chunk with start time after "through".
throughIdx := sort.Search(len(s.chunkDescs), func(i int) bool { throughIdx := sort.Search(len(s.chunkDescs), func(i int) bool {
return s.chunkDescs[i].firstTime().After(through) return s.chunkDescs[i].FirstTime().After(through)
}) })
if fromIdx == len(s.chunkDescs) { if fromIdx == len(s.chunkDescs) {
// Even the last chunk starts before "from". Find out if the // Even the last chunk starts before "from". Find out if the
// series ends before "from" and we don't need to do anything. // series ends before "from" and we don't need to do anything.
lt, err := s.chunkDescs[len(s.chunkDescs)-1].lastTime() lt, err := s.chunkDescs[len(s.chunkDescs)-1].LastTime()
if err != nil { if err != nil {
return nopIter, err return nopIter, err
} }
@ -506,22 +507,22 @@ func (s *memorySeries) preloadChunksForRange(
// head returns a pointer to the head chunk descriptor. The caller must have // head returns a pointer to the head chunk descriptor. The caller must have
// locked the fingerprint of the memorySeries. This method will panic if this // locked the fingerprint of the memorySeries. This method will panic if this
// series has no chunk descriptors. // series has no chunk descriptors.
func (s *memorySeries) head() *ChunkDesc { func (s *memorySeries) head() *chunk.Desc {
return s.chunkDescs[len(s.chunkDescs)-1] return s.chunkDescs[len(s.chunkDescs)-1]
} }
// firstTime returns the timestamp of the first sample in the series. // firstTime returns the timestamp of the first sample in the series.
// //
// The caller must have locked the fingerprint of the memorySeries. // The caller must have locked the fingerprint of the memorySeries.
func (s *memorySeries) FirstTime() model.Time { func (s *memorySeries) firstTime() model.Time {
if s.chunkDescsOffset == 0 && len(s.chunkDescs) > 0 { if s.chunkDescsOffset == 0 && len(s.chunkDescs) > 0 {
return s.chunkDescs[0].firstTime() return s.chunkDescs[0].FirstTime()
} }
return s.savedFirstTime return s.savedFirstTime
} }
// lastSamplePair returns the last ingested SamplePair. It returns // lastSamplePair returns the last ingested SamplePair. It returns
// ZeroSamplePair if this memorySeries has never received a sample (via the add // model.ZeroSamplePair if this memorySeries has never received a sample (via the add
// method), which is the case for freshly unarchived series or newly created // method), which is the case for freshly unarchived series or newly created
// ones and also for all series after a server restart. However, in that case, // ones and also for all series after a server restart. However, in that case,
// series will most likely be considered stale anyway. // series will most likely be considered stale anyway.
@ -529,7 +530,7 @@ func (s *memorySeries) FirstTime() model.Time {
// The caller must have locked the fingerprint of the memorySeries. // The caller must have locked the fingerprint of the memorySeries.
func (s *memorySeries) lastSamplePair() model.SamplePair { func (s *memorySeries) lastSamplePair() model.SamplePair {
if !s.lastSampleValueSet { if !s.lastSampleValueSet {
return ZeroSamplePair return model.ZeroSamplePair
} }
return model.SamplePair{ return model.SamplePair{
Timestamp: s.lastTime, Timestamp: s.lastTime,
@ -543,7 +544,7 @@ func (s *memorySeries) lastSamplePair() model.SamplePair {
// accordingly. // accordingly.
// //
// The caller must have locked the fingerprint of the series. // The caller must have locked the fingerprint of the series.
func (s *memorySeries) chunksToPersist() []*ChunkDesc { func (s *memorySeries) chunksToPersist() []*chunk.Desc {
newWatermark := len(s.chunkDescs) newWatermark := len(s.chunkDescs)
if !s.headChunkClosed { if !s.headChunkClosed {
newWatermark-- newWatermark--
@ -559,20 +560,20 @@ func (s *memorySeries) chunksToPersist() []*ChunkDesc {
// memorySeriesIterator implements SeriesIterator. // memorySeriesIterator implements SeriesIterator.
type memorySeriesIterator struct { type memorySeriesIterator struct {
// Last chunkIterator used by ValueAtOrBeforeTime. // Last chunk.Iterator used by ValueAtOrBeforeTime.
chunkIt ChunkIterator chunkIt chunk.Iterator
// Caches chunkIterators. // Caches chunkIterators.
chunkIts []ChunkIterator chunkIts []chunk.Iterator
// The actual sample chunks. // The actual sample chunks.
chunks []Chunk chunks []chunk.Chunk
// Call to quarantine the series this iterator belongs to. // Call to quarantine the series this iterator belongs to.
quarantine func(error) quarantine func(error)
// The metric corresponding to the iterator. // The metric corresponding to the iterator.
metric model.Metric metric model.Metric
// Chunks that were pinned for this iterator. // Chunks that were pinned for this iterator.
pinnedChunkDescs []*ChunkDesc pinnedChunkDescs []*chunk.Desc
// Where to send evict requests when unpinning pinned chunks. // Where to send evict requests when unpinning pinned chunks.
evictRequests chan<- evictRequest evictRequests chan<- chunk.EvictRequest
} }
// ValueAtOrBeforeTime implements SeriesIterator. // ValueAtOrBeforeTime implements SeriesIterator.
@ -582,7 +583,7 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa
containsT, err := it.chunkIt.Contains(t) containsT, err := it.chunkIt.Contains(t)
if err != nil { if err != nil {
it.quarantine(err) it.quarantine(err)
return ZeroSamplePair return model.ZeroSamplePair
} }
if containsT { if containsT {
if it.chunkIt.FindAtOrBefore(t) { if it.chunkIt.FindAtOrBefore(t) {
@ -591,12 +592,12 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa
if it.chunkIt.Err() != nil { if it.chunkIt.Err() != nil {
it.quarantine(it.chunkIt.Err()) it.quarantine(it.chunkIt.Err())
} }
return ZeroSamplePair return model.ZeroSamplePair
} }
} }
if len(it.chunks) == 0 { if len(it.chunks) == 0 {
return ZeroSamplePair return model.ZeroSamplePair
} }
// Find the last chunk where FirstTime() is before or equal to t. // Find the last chunk where FirstTime() is before or equal to t.
@ -606,7 +607,7 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa
}) })
if i == len(it.chunks) { if i == len(it.chunks) {
// Even the first chunk starts after t. // Even the first chunk starts after t.
return ZeroSamplePair return model.ZeroSamplePair
} }
it.chunkIt = it.chunkIterator(l - i) it.chunkIt = it.chunkIterator(l - i)
if it.chunkIt.FindAtOrBefore(t) { if it.chunkIt.FindAtOrBefore(t) {
@ -615,7 +616,7 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa
if it.chunkIt.Err() != nil { if it.chunkIt.Err() != nil {
it.quarantine(it.chunkIt.Err()) it.quarantine(it.chunkIt.Err())
} }
return ZeroSamplePair return model.ZeroSamplePair
} }
// RangeValues implements SeriesIterator. // RangeValues implements SeriesIterator.
@ -642,7 +643,7 @@ func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePa
if c.FirstTime().After(in.NewestInclusive) { if c.FirstTime().After(in.NewestInclusive) {
break break
} }
chValues, err := rangeValues(it.chunkIterator(i+j), in) chValues, err := chunk.RangeValues(it.chunkIterator(i+j), in)
if err != nil { if err != nil {
it.quarantine(err) it.quarantine(err)
return nil return nil
@ -656,9 +657,9 @@ func (it *memorySeriesIterator) Metric() metric.Metric {
return metric.Metric{Metric: it.metric} return metric.Metric{Metric: it.metric}
} }
// chunkIterator returns the chunkIterator for the chunk at position i (and // chunkIterator returns the chunk.Iterator for the chunk at position i (and
// creates it if needed). // creates it if needed).
func (it *memorySeriesIterator) chunkIterator(i int) ChunkIterator { func (it *memorySeriesIterator) chunkIterator(i int) chunk.Iterator {
chunkIt := it.chunkIts[i] chunkIt := it.chunkIts[i]
if chunkIt == nil { if chunkIt == nil {
chunkIt = it.chunks[i].NewIterator() chunkIt = it.chunks[i].NewIterator()
@ -669,9 +670,9 @@ func (it *memorySeriesIterator) chunkIterator(i int) ChunkIterator {
func (it *memorySeriesIterator) Close() { func (it *memorySeriesIterator) Close() {
for _, cd := range it.pinnedChunkDescs { for _, cd := range it.pinnedChunkDescs {
cd.unpin(it.evictRequests) cd.Unpin(it.evictRequests)
} }
chunkOps.WithLabelValues(unpin).Add(float64(len(it.pinnedChunkDescs))) chunk.Ops.WithLabelValues(chunk.Unpin).Add(float64(len(it.pinnedChunkDescs)))
} }
// singleSampleSeriesIterator implements Series Iterator. It is a "shortcut // singleSampleSeriesIterator implements Series Iterator. It is a "shortcut
@ -685,7 +686,7 @@ type singleSampleSeriesIterator struct {
// ValueAtTime implements SeriesIterator. // ValueAtTime implements SeriesIterator.
func (it *singleSampleSeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair { func (it *singleSampleSeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair {
if it.samplePair.Timestamp.After(t) { if it.samplePair.Timestamp.After(t) {
return ZeroSamplePair return model.ZeroSamplePair
} }
return it.samplePair return it.samplePair
} }
@ -711,7 +712,7 @@ type nopSeriesIterator struct{}
// ValueAtTime implements SeriesIterator. // ValueAtTime implements SeriesIterator.
func (i nopSeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair { func (i nopSeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair {
return ZeroSamplePair return model.ZeroSamplePair
} }
// RangeValues implements SeriesIterator. // RangeValues implements SeriesIterator.

View file

@ -29,13 +29,13 @@ import (
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"golang.org/x/net/context" "golang.org/x/net/context"
"github.com/prometheus/prometheus/storage/local/chunk"
"github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/storage/metric"
) )
const ( const (
evictRequestsCap = 1024 evictRequestsCap = 1024
quarantineRequestsCap = 1024 quarantineRequestsCap = 1024
chunkLen = 1024
// See waitForNextFP. // See waitForNextFP.
fpMaxSweepTime = 6 * time.Hour fpMaxSweepTime = 6 * time.Hour
@ -89,11 +89,6 @@ var (
) )
) )
type evictRequest struct {
cd *ChunkDesc
evict bool
}
type quarantineRequest struct { type quarantineRequest struct {
fp model.Fingerprint fp model.Fingerprint
metric model.Metric metric model.Metric
@ -171,7 +166,7 @@ type MemorySeriesStorage struct {
mapper *fpMapper mapper *fpMapper
evictList *list.List evictList *list.List
evictRequests chan evictRequest evictRequests chan chunk.EvictRequest
evictStopping, evictStopped chan struct{} evictStopping, evictStopped chan struct{}
quarantineRequests chan quarantineRequest quarantineRequests chan quarantineRequest
@ -226,7 +221,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *MemorySeriesStorage
maxChunksToPersist: o.MaxChunksToPersist, maxChunksToPersist: o.MaxChunksToPersist,
evictList: list.New(), evictList: list.New(),
evictRequests: make(chan evictRequest, evictRequestsCap), evictRequests: make(chan chunk.EvictRequest, evictRequestsCap),
evictStopping: make(chan struct{}), evictStopping: make(chan struct{}),
evictStopped: make(chan struct{}), evictStopped: make(chan struct{}),
@ -457,7 +452,7 @@ type boundedIterator struct {
// ValueAtOrBeforeTime implements the SeriesIterator interface. // ValueAtOrBeforeTime implements the SeriesIterator interface.
func (bit *boundedIterator) ValueAtOrBeforeTime(ts model.Time) model.SamplePair { func (bit *boundedIterator) ValueAtOrBeforeTime(ts model.Time) model.SamplePair {
if ts < bit.start { if ts < bit.start {
return ZeroSamplePair return model.ZeroSamplePair
} }
return bit.it.ValueAtOrBeforeTime(ts) return bit.it.ValueAtOrBeforeTime(ts)
} }
@ -662,7 +657,7 @@ func (s *MemorySeriesStorage) metricForRange(
) (model.Metric, *memorySeries, bool) { ) (model.Metric, *memorySeries, bool) {
series, ok := s.fpToSeries.get(fp) series, ok := s.fpToSeries.get(fp)
if ok { if ok {
if series.lastTime.Before(from) || series.FirstTime().After(through) { if series.lastTime.Before(from) || series.firstTime().After(through) {
return nil, nil, false return nil, nil, false
} }
return series.metric, series, true return series.metric, series, true
@ -779,7 +774,7 @@ func (s *MemorySeriesStorage) Append(sample *model.Sample) error {
// NeedsThrottling implements Storage. // NeedsThrottling implements Storage.
func (s *MemorySeriesStorage) NeedsThrottling() bool { func (s *MemorySeriesStorage) NeedsThrottling() bool {
if s.getNumChunksToPersist() > s.maxChunksToPersist || if s.getNumChunksToPersist() > s.maxChunksToPersist ||
float64(atomic.LoadInt64(&numMemChunks)) > float64(s.maxMemoryChunks)*toleranceFactorMemChunks { float64(atomic.LoadInt64(&chunk.NumMemChunks)) > float64(s.maxMemoryChunks)*toleranceFactorMemChunks {
select { select {
case s.throttled <- struct{}{}: case s.throttled <- struct{}{}:
default: // Do nothing, signal already pending. default: // Do nothing, signal already pending.
@ -813,7 +808,7 @@ func (s *MemorySeriesStorage) logThrottling() {
log. log.
With("chunksToPersist", s.getNumChunksToPersist()). With("chunksToPersist", s.getNumChunksToPersist()).
With("maxChunksToPersist", s.maxChunksToPersist). With("maxChunksToPersist", s.maxChunksToPersist).
With("memoryChunks", atomic.LoadInt64(&numMemChunks)). With("memoryChunks", atomic.LoadInt64(&chunk.NumMemChunks)).
With("maxToleratedMemChunks", int(float64(s.maxMemoryChunks)*toleranceFactorMemChunks)). With("maxToleratedMemChunks", int(float64(s.maxMemoryChunks)*toleranceFactorMemChunks)).
Error("Storage needs throttling. Scrapes and rule evaluations will be skipped.") Error("Storage needs throttling. Scrapes and rule evaluations will be skipped.")
} }
@ -821,7 +816,7 @@ func (s *MemorySeriesStorage) logThrottling() {
log. log.
With("chunksToPersist", s.getNumChunksToPersist()). With("chunksToPersist", s.getNumChunksToPersist()).
With("maxChunksToPersist", s.maxChunksToPersist). With("maxChunksToPersist", s.maxChunksToPersist).
With("memoryChunks", atomic.LoadInt64(&numMemChunks)). With("memoryChunks", atomic.LoadInt64(&chunk.NumMemChunks)).
With("maxToleratedMemChunks", int(float64(s.maxMemoryChunks)*toleranceFactorMemChunks)). With("maxToleratedMemChunks", int(float64(s.maxMemoryChunks)*toleranceFactorMemChunks)).
Info("Storage does not need throttling anymore.") Info("Storage does not need throttling anymore.")
case <-s.loopStopping: case <-s.loopStopping:
@ -833,7 +828,7 @@ func (s *MemorySeriesStorage) logThrottling() {
func (s *MemorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) (*memorySeries, error) { func (s *MemorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) (*memorySeries, error) {
series, ok := s.fpToSeries.get(fp) series, ok := s.fpToSeries.get(fp)
if !ok { if !ok {
var cds []*ChunkDesc var cds []*chunk.Desc
var modTime time.Time var modTime time.Time
unarchived, err := s.persistence.unarchiveMetric(fp) unarchived, err := s.persistence.unarchiveMetric(fp)
if err != nil { if err != nil {
@ -842,9 +837,9 @@ func (s *MemorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me
} }
if unarchived { if unarchived {
s.seriesOps.WithLabelValues(unarchive).Inc() s.seriesOps.WithLabelValues(unarchive).Inc()
// We have to load chunkDescs anyway to do anything with // We have to load chunk.Descs anyway to do anything with
// the series, so let's do it right now so that we don't // the series, so let's do it right now so that we don't
// end up with a series without any chunkDescs for a // end up with a series without any chunk.Descs for a
// while (which is confusing as it makes the series // while (which is confusing as it makes the series
// appear as archived or purged). // appear as archived or purged).
cds, err = s.loadChunkDescs(fp, 0) cds, err = s.loadChunkDescs(fp, 0)
@ -936,17 +931,17 @@ func (s *MemorySeriesStorage) handleEvictList() {
// evict run is more than maxMemoryChunks/1000. // evict run is more than maxMemoryChunks/1000.
select { select {
case req := <-s.evictRequests: case req := <-s.evictRequests:
if req.evict { if req.Evict {
req.cd.evictListElement = s.evictList.PushBack(req.cd) req.Desc.EvictListElement = s.evictList.PushBack(req.Desc)
count++ count++
if count > s.maxMemoryChunks/1000 { if count > s.maxMemoryChunks/1000 {
s.maybeEvict() s.maybeEvict()
count = 0 count = 0
} }
} else { } else {
if req.cd.evictListElement != nil { if req.Desc.EvictListElement != nil {
s.evictList.Remove(req.cd.evictListElement) s.evictList.Remove(req.Desc.EvictListElement)
req.cd.evictListElement = nil req.Desc.EvictListElement = nil
} }
} }
case <-ticker.C: case <-ticker.C:
@ -971,39 +966,39 @@ func (s *MemorySeriesStorage) handleEvictList() {
// maybeEvict is a local helper method. Must only be called by handleEvictList. // maybeEvict is a local helper method. Must only be called by handleEvictList.
func (s *MemorySeriesStorage) maybeEvict() { func (s *MemorySeriesStorage) maybeEvict() {
numChunksToEvict := int(atomic.LoadInt64(&numMemChunks)) - s.maxMemoryChunks numChunksToEvict := int(atomic.LoadInt64(&chunk.NumMemChunks)) - s.maxMemoryChunks
if numChunksToEvict <= 0 { if numChunksToEvict <= 0 {
return return
} }
chunkDescsToEvict := make([]*ChunkDesc, numChunksToEvict) chunkDescsToEvict := make([]*chunk.Desc, numChunksToEvict)
for i := range chunkDescsToEvict { for i := range chunkDescsToEvict {
e := s.evictList.Front() e := s.evictList.Front()
if e == nil { if e == nil {
break break
} }
cd := e.Value.(*ChunkDesc) cd := e.Value.(*chunk.Desc)
cd.evictListElement = nil cd.EvictListElement = nil
chunkDescsToEvict[i] = cd chunkDescsToEvict[i] = cd
s.evictList.Remove(e) s.evictList.Remove(e)
} }
// Do the actual eviction in a goroutine as we might otherwise deadlock, // Do the actual eviction in a goroutine as we might otherwise deadlock,
// in the following way: A chunk was unpinned completely and therefore // in the following way: A chunk was Unpinned completely and therefore
// scheduled for eviction. At the time we actually try to evict it, // scheduled for eviction. At the time we actually try to evict it,
// another goroutine is pinning the chunk. The pinning goroutine has // another goroutine is pinning the chunk. The pinning goroutine has
// currently locked the chunk and tries to send the evict request (to // currently locked the chunk and tries to send the evict request (to
// remove the chunk from the evict list) to the evictRequests // remove the chunk from the evict list) to the evictRequests
// channel. The send blocks because evictRequests is full. However, the // channel. The send blocks because evictRequests is full. However, the
// goroutine that is supposed to empty the channel is waiting for the // goroutine that is supposed to empty the channel is waiting for the
// chunkDesc lock to try to evict the chunk. // Chunk.Desc lock to try to evict the chunk.
go func() { go func() {
for _, cd := range chunkDescsToEvict { for _, cd := range chunkDescsToEvict {
if cd == nil { if cd == nil {
break break
} }
cd.maybeEvict() cd.MaybeEvict()
// We don't care if the eviction succeeds. If the chunk // We don't care if the eviction succeeds. If the chunk
// was pinned in the meantime, it will be added to the // was pinned in the meantime, it will be added to the
// evict list once it gets unpinned again. // evict list once it gets Unpinned again.
} }
}() }()
} }
@ -1224,7 +1219,7 @@ loop:
// Next, the method checks if all chunks in the series are evicted. In that // Next, the method checks if all chunks in the series are evicted. In that
// case, it archives the series and returns true. // case, it archives the series and returns true.
// //
// Finally, it evicts chunkDescs if there are too many. // Finally, it evicts chunk.Descs if there are too many.
func (s *MemorySeriesStorage) maintainMemorySeries( func (s *MemorySeriesStorage) maintainMemorySeries(
fp model.Fingerprint, beforeTime model.Time, fp model.Fingerprint, beforeTime model.Time,
) (becameDirty bool) { ) (becameDirty bool) {
@ -1258,7 +1253,7 @@ func (s *MemorySeriesStorage) maintainMemorySeries(
iOldestNotEvicted := -1 iOldestNotEvicted := -1
for i, cd := range series.chunkDescs { for i, cd := range series.chunkDescs {
if !cd.isEvicted() { if !cd.IsEvicted() {
iOldestNotEvicted = i iOldestNotEvicted = i
break break
} }
@ -1269,7 +1264,7 @@ func (s *MemorySeriesStorage) maintainMemorySeries(
if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > headChunkTimeout { if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > headChunkTimeout {
s.fpToSeries.del(fp) s.fpToSeries.del(fp)
s.numSeries.Dec() s.numSeries.Dec()
s.persistence.archiveMetric(fp, series.metric, series.FirstTime(), series.lastTime) s.persistence.archiveMetric(fp, series.metric, series.firstTime(), series.lastTime)
s.seriesOps.WithLabelValues(archive).Inc() s.seriesOps.WithLabelValues(archive).Inc()
oldWatermark := atomic.LoadInt64((*int64)(&s.archiveHighWatermark)) oldWatermark := atomic.LoadInt64((*int64)(&s.archiveHighWatermark))
if oldWatermark < int64(series.lastTime) { if oldWatermark < int64(series.lastTime) {
@ -1282,7 +1277,7 @@ func (s *MemorySeriesStorage) maintainMemorySeries(
} }
return return
} }
// If we are here, the series is not archived, so check for chunkDesc // If we are here, the series is not archived, so check for Chunk.Desc
// eviction next. // eviction next.
series.evictChunkDescs(iOldestNotEvicted) series.evictChunkDescs(iOldestNotEvicted)
@ -1316,21 +1311,21 @@ func (s *MemorySeriesStorage) writeMemorySeries(
// that belong to a series that is scheduled for quarantine // that belong to a series that is scheduled for quarantine
// anyway. // anyway.
for _, cd := range cds { for _, cd := range cds {
cd.unpin(s.evictRequests) cd.Unpin(s.evictRequests)
} }
s.incNumChunksToPersist(-len(cds)) s.incNumChunksToPersist(-len(cds))
chunkOps.WithLabelValues(persistAndUnpin).Add(float64(len(cds))) chunk.Ops.WithLabelValues(chunk.PersistAndUnpin).Add(float64(len(cds)))
series.modTime = s.persistence.seriesFileModTime(fp) series.modTime = s.persistence.seriesFileModTime(fp)
}() }()
// Get the actual chunks from underneath the chunkDescs. // Get the actual chunks from underneath the chunk.Descs.
// No lock required as chunks still to persist cannot be evicted. // No lock required as chunks still to persist cannot be evicted.
chunks := make([]Chunk, len(cds)) chunks := make([]chunk.Chunk, len(cds))
for i, cd := range cds { for i, cd := range cds {
chunks[i] = cd.c chunks[i] = cd.C
} }
if !series.FirstTime().Before(beforeTime) { if !series.firstTime().Before(beforeTime) {
// Oldest sample not old enough, just append chunks, if any. // Oldest sample not old enough, just append chunks, if any.
if len(cds) == 0 { if len(cds) == 0 {
return false return false
@ -1413,12 +1408,12 @@ func (s *MemorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, befor
} }
// See persistence.loadChunks for detailed explanation. // See persistence.loadChunks for detailed explanation.
func (s *MemorySeriesStorage) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]Chunk, error) { func (s *MemorySeriesStorage) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]chunk.Chunk, error) {
return s.persistence.loadChunks(fp, indexes, indexOffset) return s.persistence.loadChunks(fp, indexes, indexOffset)
} }
// See persistence.loadChunkDescs for detailed explanation. // See persistence.loadChunkDescs for detailed explanation.
func (s *MemorySeriesStorage) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*ChunkDesc, error) { func (s *MemorySeriesStorage) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*chunk.Desc, error) {
return s.persistence.loadChunkDescs(fp, offsetFromEnd) return s.persistence.loadChunkDescs(fp, offsetFromEnd)
} }
@ -1468,7 +1463,7 @@ func (s *MemorySeriesStorage) calculatePersistenceUrgencyScore() float64 {
var ( var (
chunksToPersist = float64(s.getNumChunksToPersist()) chunksToPersist = float64(s.getNumChunksToPersist())
maxChunksToPersist = float64(s.maxChunksToPersist) maxChunksToPersist = float64(s.maxChunksToPersist)
memChunks = float64(atomic.LoadInt64(&numMemChunks)) memChunks = float64(atomic.LoadInt64(&chunk.NumMemChunks))
maxMemChunks = float64(s.maxMemoryChunks) maxMemChunks = float64(s.maxMemoryChunks)
) )
score := chunksToPersist / maxChunksToPersist score := chunksToPersist / maxChunksToPersist
@ -1578,12 +1573,12 @@ func (s *MemorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric,
s.numSeries.Dec() s.numSeries.Dec()
m = series.metric m = series.metric
// Adjust s.numChunksToPersist and numMemChunks down by // Adjust s.numChunksToPersist and chunk.NumMemChunks down by
// the number of chunks in this series that are not // the number of chunks in this series that are not
// persisted yet. Persisted chunks will be deducted from // persisted yet. Persisted chunks will be deducted from
// numMemChunks upon eviction. // chunk.NumMemChunks upon eviction.
numChunksNotYetPersisted := len(series.chunkDescs) - series.persistWatermark numChunksNotYetPersisted := len(series.chunkDescs) - series.persistWatermark
atomic.AddInt64(&numMemChunks, int64(-numChunksNotYetPersisted)) atomic.AddInt64(&chunk.NumMemChunks, int64(-numChunksNotYetPersisted))
if !series.headChunkClosed { if !series.headChunkClosed {
// Head chunk wasn't counted as waiting for persistence yet. // Head chunk wasn't counted as waiting for persistence yet.
// (But it was counted as a chunk in memory.) // (But it was counted as a chunk in memory.)
@ -1641,7 +1636,7 @@ func (s *MemorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
ch <- s.ingestedSamplesCount.Desc() ch <- s.ingestedSamplesCount.Desc()
s.discardedSamplesCount.Describe(ch) s.discardedSamplesCount.Describe(ch)
ch <- s.nonExistentSeriesMatchesCount.Desc() ch <- s.nonExistentSeriesMatchesCount.Desc()
ch <- numMemChunksDesc ch <- chunk.NumMemChunksDesc
s.maintainSeriesDuration.Describe(ch) s.maintainSeriesDuration.Describe(ch)
ch <- s.persistenceUrgencyScore.Desc() ch <- s.persistenceUrgencyScore.Desc()
ch <- s.rushedMode.Desc() ch <- s.rushedMode.Desc()
@ -1669,9 +1664,9 @@ func (s *MemorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
s.discardedSamplesCount.Collect(ch) s.discardedSamplesCount.Collect(ch)
ch <- s.nonExistentSeriesMatchesCount ch <- s.nonExistentSeriesMatchesCount
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
numMemChunksDesc, chunk.NumMemChunksDesc,
prometheus.GaugeValue, prometheus.GaugeValue,
float64(atomic.LoadInt64(&numMemChunks)), float64(atomic.LoadInt64(&chunk.NumMemChunks)),
) )
s.maintainSeriesDuration.Collect(ch) s.maintainSeriesDuration.Collect(ch)
ch <- s.persistenceUrgencyScore ch <- s.persistenceUrgencyScore

View file

@ -27,6 +27,7 @@ import (
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"golang.org/x/net/context" "golang.org/x/net/context"
"github.com/prometheus/prometheus/storage/local/chunk"
"github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/util/testutil" "github.com/prometheus/prometheus/util/testutil"
) )
@ -68,7 +69,7 @@ func TestMatches(t *testing.T) {
t.Fatal("could not retrieve series for fp", fp) t.Fatal("could not retrieve series for fp", fp)
} }
storage.fpLocker.Lock(fp) storage.fpLocker.Lock(fp)
storage.persistence.archiveMetric(fp, s.metric, s.FirstTime(), s.lastTime) storage.persistence.archiveMetric(fp, s.metric, s.firstTime(), s.lastTime)
storage.fpLocker.Unlock(fp) storage.fpLocker.Unlock(fp)
} }
@ -785,7 +786,7 @@ func TestLoop(t *testing.T) {
} }
} }
func testChunk(t *testing.T, encoding ChunkEncoding) { func testChunk(t *testing.T, encoding chunk.Encoding) {
samples := make(model.Samples, 500000) samples := make(model.Samples, 500000)
for i := range samples { for i := range samples {
samples[i] = &model.Sample{ samples[i] = &model.Sample{
@ -806,10 +807,10 @@ func testChunk(t *testing.T, encoding ChunkEncoding) {
defer s.fpLocker.Unlock(m.fp) // TODO remove, see below defer s.fpLocker.Unlock(m.fp) // TODO remove, see below
var values []model.SamplePair var values []model.SamplePair
for _, cd := range m.series.chunkDescs { for _, cd := range m.series.chunkDescs {
if cd.isEvicted() { if cd.IsEvicted() {
continue continue
} }
it := cd.c.NewIterator() it := cd.C.NewIterator()
for it.Scan() { for it.Scan() {
values = append(values, it.Value()) values = append(values, it.Value())
} }
@ -843,7 +844,7 @@ func TestChunkType2(t *testing.T) {
testChunk(t, 2) testChunk(t, 2)
} }
func testValueAtOrBeforeTime(t *testing.T, encoding ChunkEncoding) { func testValueAtOrBeforeTime(t *testing.T, encoding chunk.Encoding) {
samples := make(model.Samples, 10000) samples := make(model.Samples, 10000)
for i := range samples { for i := range samples {
samples[i] = &model.Sample{ samples[i] = &model.Sample{
@ -921,7 +922,7 @@ func TestValueAtTimeChunkType2(t *testing.T) {
testValueAtOrBeforeTime(t, 2) testValueAtOrBeforeTime(t, 2)
} }
func benchmarkValueAtOrBeforeTime(b *testing.B, encoding ChunkEncoding) { func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunk.Encoding) {
samples := make(model.Samples, 10000) samples := make(model.Samples, 10000)
for i := range samples { for i := range samples {
samples[i] = &model.Sample{ samples[i] = &model.Sample{
@ -1003,7 +1004,7 @@ func BenchmarkValueAtTimeChunkType2(b *testing.B) {
benchmarkValueAtOrBeforeTime(b, 2) benchmarkValueAtOrBeforeTime(b, 2)
} }
func testRangeValues(t *testing.T, encoding ChunkEncoding) { func testRangeValues(t *testing.T, encoding chunk.Encoding) {
samples := make(model.Samples, 10000) samples := make(model.Samples, 10000)
for i := range samples { for i := range samples {
samples[i] = &model.Sample{ samples[i] = &model.Sample{
@ -1159,7 +1160,7 @@ func TestRangeValuesChunkType2(t *testing.T) {
testRangeValues(t, 2) testRangeValues(t, 2)
} }
func benchmarkRangeValues(b *testing.B, encoding ChunkEncoding) { func benchmarkRangeValues(b *testing.B, encoding chunk.Encoding) {
samples := make(model.Samples, 10000) samples := make(model.Samples, 10000)
for i := range samples { for i := range samples {
samples[i] = &model.Sample{ samples[i] = &model.Sample{
@ -1207,7 +1208,7 @@ func BenchmarkRangeValuesChunkType2(b *testing.B) {
benchmarkRangeValues(b, 2) benchmarkRangeValues(b, 2)
} }
func testEvictAndPurgeSeries(t *testing.T, encoding ChunkEncoding) { func testEvictAndPurgeSeries(t *testing.T, encoding chunk.Encoding) {
samples := make(model.Samples, 10000) samples := make(model.Samples, 10000)
for i := range samples { for i := range samples {
samples[i] = &model.Sample{ samples[i] = &model.Sample{
@ -1271,11 +1272,11 @@ func testEvictAndPurgeSeries(t *testing.T, encoding ChunkEncoding) {
// Archive metrics. // Archive metrics.
s.fpToSeries.del(fp) s.fpToSeries.del(fp)
lastTime, err := series.head().lastTime() lastTime, err := series.head().LastTime()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
s.persistence.archiveMetric(fp, series.metric, series.FirstTime(), lastTime) s.persistence.archiveMetric(fp, series.metric, series.firstTime(), lastTime)
archived, _, _ := s.persistence.hasArchivedMetric(fp) archived, _, _ := s.persistence.hasArchivedMetric(fp)
if !archived { if !archived {
t.Fatal("not archived") t.Fatal("not archived")
@ -1312,11 +1313,11 @@ func testEvictAndPurgeSeries(t *testing.T, encoding ChunkEncoding) {
// Archive metrics. // Archive metrics.
s.fpToSeries.del(fp) s.fpToSeries.del(fp)
lastTime, err = series.head().lastTime() lastTime, err = series.head().LastTime()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
s.persistence.archiveMetric(fp, series.metric, series.FirstTime(), lastTime) s.persistence.archiveMetric(fp, series.metric, series.firstTime(), lastTime)
archived, _, _ = s.persistence.hasArchivedMetric(fp) archived, _, _ = s.persistence.hasArchivedMetric(fp)
if !archived { if !archived {
t.Fatal("not archived") t.Fatal("not archived")
@ -1362,7 +1363,7 @@ func TestEvictAndPurgeSeriesChunkType2(t *testing.T) {
testEvictAndPurgeSeries(t, 2) testEvictAndPurgeSeries(t, 2)
} }
func testEvictAndLoadChunkDescs(t *testing.T, encoding ChunkEncoding) { func testEvictAndLoadChunkDescs(t *testing.T, encoding chunk.Encoding) {
samples := make(model.Samples, 10000) samples := make(model.Samples, 10000)
for i := range samples { for i := range samples {
samples[i] = &model.Sample{ samples[i] = &model.Sample{
@ -1401,7 +1402,7 @@ func testEvictAndLoadChunkDescs(t *testing.T, encoding ChunkEncoding) {
s.maintainMemorySeries(fp, 0) s.maintainMemorySeries(fp, 0)
// Give the evict goroutine an opportunity to run. // Give the evict goroutine an opportunity to run.
time.Sleep(250 * time.Millisecond) time.Sleep(250 * time.Millisecond)
// Maintain series again to trigger chunkDesc eviction // Maintain series again to trigger chunk.Desc eviction.
s.maintainMemorySeries(fp, 0) s.maintainMemorySeries(fp, 0)
if oldLen <= len(series.chunkDescs) { if oldLen <= len(series.chunkDescs) {
@ -1421,7 +1422,7 @@ func testEvictAndLoadChunkDescs(t *testing.T, encoding ChunkEncoding) {
s.maintainMemorySeries(fp, 100000) s.maintainMemorySeries(fp, 100000)
if len(series.chunkDescs) != 1 { if len(series.chunkDescs) != 1 {
t.Errorf("Expected exactly one chunkDesc left, got %d.", len(series.chunkDescs)) t.Errorf("Expected exactly one chunk.Desc left, got %d.", len(series.chunkDescs))
} }
} }
@ -1433,7 +1434,7 @@ func TestEvictAndLoadChunkDescsType1(t *testing.T) {
testEvictAndLoadChunkDescs(t, 1) testEvictAndLoadChunkDescs(t, 1)
} }
func benchmarkAppend(b *testing.B, encoding ChunkEncoding) { func benchmarkAppend(b *testing.B, encoding chunk.Encoding) {
samples := make(model.Samples, b.N) samples := make(model.Samples, b.N)
for i := range samples { for i := range samples {
samples[i] = &model.Sample{ samples[i] = &model.Sample{
@ -1469,7 +1470,7 @@ func BenchmarkAppendType2(b *testing.B) {
// Append a large number of random samples and then check if we can get them out // Append a large number of random samples and then check if we can get them out
// of the storage alright. // of the storage alright.
func testFuzz(t *testing.T, encoding ChunkEncoding) { func testFuzz(t *testing.T, encoding chunk.Encoding) {
if testing.Short() { if testing.Short() {
t.Skip("Skipping test in short mode.") t.Skip("Skipping test in short mode.")
} }
@ -1517,8 +1518,8 @@ func TestFuzzChunkType2(t *testing.T) {
// make things even slower): // make things even slower):
// //
// go test -race -cpu 8 -short -bench BenchmarkFuzzChunkType // go test -race -cpu 8 -short -bench BenchmarkFuzzChunkType
func benchmarkFuzz(b *testing.B, encoding ChunkEncoding) { func benchmarkFuzz(b *testing.B, encoding chunk.Encoding) {
DefaultChunkEncoding = encoding chunk.DefaultEncoding = encoding
const samplesPerRun = 100000 const samplesPerRun = 100000
rand.Seed(42) rand.Seed(42)
directory := testutil.NewTemporaryDirectory("test_storage", b) directory := testutil.NewTemporaryDirectory("test_storage", b)

View file

@ -22,6 +22,7 @@ import (
"time" "time"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/local/chunk"
"github.com/prometheus/prometheus/util/testutil" "github.com/prometheus/prometheus/util/testutil"
) )
@ -40,8 +41,8 @@ func (t *testStorageCloser) Close() {
// NewTestStorage creates a storage instance backed by files in a temporary // NewTestStorage creates a storage instance backed by files in a temporary
// directory. The returned storage is already in serving state. Upon closing the // directory. The returned storage is already in serving state. Upon closing the
// returned test.Closer, the temporary directory is cleaned up. // returned test.Closer, the temporary directory is cleaned up.
func NewTestStorage(t testutil.T, encoding ChunkEncoding) (*MemorySeriesStorage, testutil.Closer) { func NewTestStorage(t testutil.T, encoding chunk.Encoding) (*MemorySeriesStorage, testutil.Closer) {
DefaultChunkEncoding = encoding chunk.DefaultEncoding = encoding
directory := testutil.NewTemporaryDirectory("test_storage", t) directory := testutil.NewTemporaryDirectory("test_storage", t)
o := &MemorySeriesStorageOptions{ o := &MemorySeriesStorageOptions{
MemoryChunks: 1000000, MemoryChunks: 1000000,

View file

@ -22,6 +22,22 @@ import (
"strings" "strings"
) )
var (
// ZeroSamplePair is the pseudo zero-value of SamplePair used to signal a
// non-existing sample pair. It is a SamplePair with timestamp Earliest and
// value 0.0. Note that the natural zero value of SamplePair has a timestamp
// of 0, which is possible to appear in a real SamplePair and thus not
// suitable to signal a non-existing SamplePair.
ZeroSamplePair = SamplePair{Timestamp: Earliest}
// ZeroSample is the pseudo zero-value of Sample used to signal a
// non-existing sample. It is a Sample with timestamp Earliest, value 0.0,
// and metric nil. Note that the natural zero value of Sample has a timestamp
// of 0, which is possible to appear in a real Sample and thus not suitable
// to signal a non-existing Sample.
ZeroSample = Sample{Timestamp: Earliest}
)
// A SampleValue is a representation of a value for a given sample at a given // A SampleValue is a representation of a value for a given sample at a given
// time. // time.
type SampleValue float64 type SampleValue float64

4
vendor/vendor.json vendored
View file

@ -205,8 +205,8 @@
{ {
"checksumSHA1": "Jx0GXl5hGnO25s3ryyvtdWHdCpw=", "checksumSHA1": "Jx0GXl5hGnO25s3ryyvtdWHdCpw=",
"path": "github.com/prometheus/common/model", "path": "github.com/prometheus/common/model",
"revision": "4402f4e5ea79ec15f3c574773b6a5198fbea215f", "revision": "e35a2e33a50a7d756c7afdfaf609f93905a0c111",
"revisionTime": "2016-06-23T15:14:27Z" "revisionTime": "2016-09-28T14:38:18+02:00"
}, },
{ {
"checksumSHA1": "CKVJRc1NREmfoAWQLHxqWQlvxo0=", "checksumSHA1": "CKVJRc1NREmfoAWQLHxqWQlvxo0=",