timeseries: store varbit encoded data into cassandra

This commit is contained in:
Matthew Campbell 2016-09-21 17:56:55 +02:00
parent 4520e12440
commit 67d76e3a5d
15 changed files with 286 additions and 281 deletions

View file

@ -28,26 +28,27 @@ import (
) )
// DefaultChunkEncoding can be changed via a flag. // DefaultChunkEncoding can be changed via a flag.
var DefaultChunkEncoding = doubleDelta var DefaultChunkEncoding = DoubleDelta
var errChunkBoundsExceeded = errors.New("attempted access outside of chunk boundaries") var errChunkBoundsExceeded = errors.New("attempted access outside of chunk boundaries")
type chunkEncoding byte // ChunkEncoding defintes which encoding we are using, delta, doubledelta, or varbit
type ChunkEncoding byte
// String implements flag.Value. // String implements flag.Value.
func (ce chunkEncoding) String() string { func (ce ChunkEncoding) String() string {
return fmt.Sprintf("%d", ce) return fmt.Sprintf("%d", ce)
} }
// Set implements flag.Value. // Set implements flag.Value.
func (ce *chunkEncoding) Set(s string) error { func (ce *ChunkEncoding) Set(s string) error {
switch s { switch s {
case "0": case "0":
*ce = delta *ce = Delta
case "1": case "1":
*ce = doubleDelta *ce = DoubleDelta
case "2": case "2":
*ce = varbit *ce = Varbit
default: default:
return fmt.Errorf("invalid chunk encoding: %s", s) return fmt.Errorf("invalid chunk encoding: %s", s)
} }
@ -55,12 +56,15 @@ func (ce *chunkEncoding) Set(s string) error {
} }
const ( const (
delta chunkEncoding = iota // Delta encoding
doubleDelta Delta ChunkEncoding = iota
varbit // DoubleDelta encoding
DoubleDelta
// Varbit encoding
Varbit
) )
// chunkDesc contains meta-data for a chunk. Pay special attention to the // ChunkDesc contains meta-data for a chunk. Pay special attention to the
// documented requirements for calling its methods concurrently (WRT pinning and // documented requirements for calling its methods concurrently (WRT pinning and
// locking). The doc comments spell out the requirements for each method, but // locking). The doc comments spell out the requirements for each method, but
// here is an overview and general explanation: // here is an overview and general explanation:
@ -88,9 +92,9 @@ const (
// is populated upon creation of a chunkDesc, so it is alway safe to call // is populated upon creation of a chunkDesc, so it is alway safe to call
// firstTime. The firstTime method is arguably not needed and only there for // firstTime. The firstTime method is arguably not needed and only there for
// consistency with lastTime. // consistency with lastTime.
type chunkDesc struct { type ChunkDesc struct {
sync.Mutex // Protects pinning. sync.Mutex // Protects pinning.
c chunk // nil if chunk is evicted. c Chunk // nil if chunk is evicted.
rCnt int rCnt int
chunkFirstTime model.Time // Populated at creation. Immutable. chunkFirstTime model.Time // Populated at creation. Immutable.
chunkLastTime model.Time // Populated on closing of the chunk, model.Earliest if unset. chunkLastTime model.Time // Populated on closing of the chunk, model.Earliest if unset.
@ -101,14 +105,14 @@ type chunkDesc struct {
evictListElement *list.Element evictListElement *list.Element
} }
// newChunkDesc creates a new chunkDesc pointing to the provided chunk. The // NewChunkDesc creates a new chunkDesc pointing to the provided chunk. The
// provided chunk is assumed to be not persisted yet. Therefore, the refCount of // provided chunk is assumed to be not persisted yet. Therefore, the refCount of
// the new chunkDesc is 1 (preventing eviction prior to persisting). // the new chunkDesc is 1 (preventing eviction prior to persisting).
func newChunkDesc(c chunk, firstTime model.Time) *chunkDesc { func NewChunkDesc(c Chunk, firstTime model.Time) *ChunkDesc {
chunkOps.WithLabelValues(createAndPin).Inc() chunkOps.WithLabelValues(createAndPin).Inc()
atomic.AddInt64(&numMemChunks, 1) atomic.AddInt64(&numMemChunks, 1)
numMemChunkDescs.Inc() numMemChunkDescs.Inc()
return &chunkDesc{ return &ChunkDesc{
c: c, c: c,
rCnt: 1, rCnt: 1,
chunkFirstTime: firstTime, chunkFirstTime: firstTime,
@ -116,18 +120,18 @@ func newChunkDesc(c chunk, firstTime model.Time) *chunkDesc {
} }
} }
// add adds a sample pair to the underlying chunk. For safe concurrent access, // Add adds a sample pair to the underlying chunk. For safe concurrent access,
// The chunk must be pinned, and the caller must have locked the fingerprint of // The chunk must be pinned, and the caller must have locked the fingerprint of
// the series. // the series.
func (cd *chunkDesc) add(s model.SamplePair) ([]chunk, error) { func (cd *ChunkDesc) Add(s model.SamplePair) ([]Chunk, error) {
return cd.c.add(s) return cd.c.Add(s)
} }
// pin increments the refCount by one. Upon increment from 0 to 1, this // pin increments the refCount by one. Upon increment from 0 to 1, this
// chunkDesc is removed from the evict list. To enable the latter, the // chunkDesc is removed from the evict list. To enable the latter, the
// evictRequests channel has to be provided. This method can be called // evictRequests channel has to be provided. This method can be called
// concurrently at any time. // concurrently at any time.
func (cd *chunkDesc) pin(evictRequests chan<- evictRequest) { func (cd *ChunkDesc) pin(evictRequests chan<- evictRequest) {
cd.Lock() cd.Lock()
defer cd.Unlock() defer cd.Unlock()
@ -142,7 +146,7 @@ func (cd *chunkDesc) pin(evictRequests chan<- evictRequest) {
// chunkDesc is added to the evict list. To enable the latter, the evictRequests // chunkDesc is added to the evict list. To enable the latter, the evictRequests
// channel has to be provided. This method can be called concurrently at any // channel has to be provided. This method can be called concurrently at any
// time. // time.
func (cd *chunkDesc) unpin(evictRequests chan<- evictRequest) { func (cd *ChunkDesc) unpin(evictRequests chan<- evictRequest) {
cd.Lock() cd.Lock()
defer cd.Unlock() defer cd.Unlock()
@ -158,7 +162,7 @@ func (cd *chunkDesc) unpin(evictRequests chan<- evictRequest) {
// refCount returns the number of pins. This method can be called concurrently // refCount returns the number of pins. This method can be called concurrently
// at any time. // at any time.
func (cd *chunkDesc) refCount() int { func (cd *ChunkDesc) refCount() int {
cd.Lock() cd.Lock()
defer cd.Unlock() defer cd.Unlock()
@ -169,18 +173,18 @@ func (cd *chunkDesc) refCount() int {
// can be called concurrently at any time. It only returns the immutable // can be called concurrently at any time. It only returns the immutable
// cd.chunkFirstTime without any locking. Arguably, this method is // cd.chunkFirstTime without any locking. Arguably, this method is
// useless. However, it provides consistency with the lastTime method. // useless. However, it provides consistency with the lastTime method.
func (cd *chunkDesc) firstTime() model.Time { func (cd *ChunkDesc) firstTime() model.Time {
return cd.chunkFirstTime return cd.chunkFirstTime
} }
// lastTime returns the timestamp of the last sample in the chunk. For safe // lastTime returns the timestamp of the last sample in the chunk. For safe
// concurrent access, this method requires the fingerprint of the time series to // concurrent access, this method requires the fingerprint of the time series to
// be locked. // be locked.
func (cd *chunkDesc) lastTime() (model.Time, error) { func (cd *ChunkDesc) lastTime() (model.Time, error) {
if cd.chunkLastTime != model.Earliest || cd.c == nil { if cd.chunkLastTime != model.Earliest || cd.c == nil {
return cd.chunkLastTime, nil return cd.chunkLastTime, nil
} }
return cd.c.newIterator().lastTimestamp() return cd.c.NewIterator().LastTimestamp()
} }
// maybePopulateLastTime populates the chunkLastTime from the underlying chunk // maybePopulateLastTime populates the chunkLastTime from the underlying chunk
@ -188,9 +192,9 @@ func (cd *chunkDesc) lastTime() (model.Time, error) {
// last sample to a chunk or after closing a head chunk due to age. For safe // last sample to a chunk or after closing a head chunk due to age. For safe
// concurrent access, the chunk must be pinned, and the caller must have locked // concurrent access, the chunk must be pinned, and the caller must have locked
// the fingerprint of the series. // the fingerprint of the series.
func (cd *chunkDesc) maybePopulateLastTime() error { func (cd *ChunkDesc) maybePopulateLastTime() error {
if cd.chunkLastTime == model.Earliest && cd.c != nil { if cd.chunkLastTime == model.Earliest && cd.c != nil {
t, err := cd.c.newIterator().lastTimestamp() t, err := cd.c.NewIterator().LastTimestamp()
if err != nil { if err != nil {
return err return err
} }
@ -201,7 +205,7 @@ func (cd *chunkDesc) maybePopulateLastTime() error {
// isEvicted returns whether the chunk is evicted. For safe concurrent access, // isEvicted returns whether the chunk is evicted. For safe concurrent access,
// the caller must have locked the fingerprint of the series. // the caller must have locked the fingerprint of the series.
func (cd *chunkDesc) isEvicted() bool { func (cd *ChunkDesc) isEvicted() bool {
// Locking required here because we do not want the caller to force // Locking required here because we do not want the caller to force
// pinning the chunk first, so it could be evicted while this method is // pinning the chunk first, so it could be evicted while this method is
// called. // called.
@ -214,7 +218,7 @@ func (cd *chunkDesc) isEvicted() bool {
// setChunk sets the underlying chunk. The caller must have locked the // setChunk sets the underlying chunk. The caller must have locked the
// fingerprint of the series and must have "pre-pinned" the chunk (i.e. first // fingerprint of the series and must have "pre-pinned" the chunk (i.e. first
// call pin and then set the chunk). // call pin and then set the chunk).
func (cd *chunkDesc) setChunk(c chunk) { func (cd *ChunkDesc) setChunk(c Chunk) {
if cd.c != nil { if cd.c != nil {
panic("chunk already set") panic("chunk already set")
} }
@ -224,7 +228,7 @@ func (cd *chunkDesc) setChunk(c chunk) {
// maybeEvict evicts the chunk if the refCount is 0. It returns whether the chunk // maybeEvict evicts the chunk if the refCount is 0. It returns whether the chunk
// is now evicted, which includes the case that the chunk was evicted even // is now evicted, which includes the case that the chunk was evicted even
// before this method was called. It can be called concurrently at any time. // before this method was called. It can be called concurrently at any time.
func (cd *chunkDesc) maybeEvict() bool { func (cd *ChunkDesc) maybeEvict() bool {
cd.Lock() cd.Lock()
defer cd.Unlock() defer cd.Unlock()
@ -244,133 +248,134 @@ func (cd *chunkDesc) maybeEvict() bool {
return true return true
} }
// chunk is the interface for all chunks. Chunks are generally not // Chunk is the interface for all chunks. Chunks are generally not
// goroutine-safe. // goroutine-safe.
type chunk interface { type Chunk interface {
// add adds a SamplePair to the chunks, performs any necessary // add adds a SamplePair to the chunks, performs any necessary
// re-encoding, and adds any necessary overflow chunks. It returns the // re-encoding, and adds any necessary overflow chunks. It returns the
// new version of the original chunk, followed by overflow chunks, if // new version of the original chunk, followed by overflow chunks, if
// any. The first chunk returned might be the same as the original one // any. The first chunk returned might be the same as the original one
// or a newly allocated version. In any case, take the returned chunk as // or a newly allocated version. In any case, take the returned chunk as
// the relevant one and discard the original chunk. // the relevant one and discard the original chunk.
add(sample model.SamplePair) ([]chunk, error) Add(sample model.SamplePair) ([]Chunk, error)
clone() chunk Clone() Chunk
firstTime() model.Time FirstTime() model.Time
newIterator() chunkIterator NewIterator() ChunkIterator
marshal(io.Writer) error Marshal(io.Writer) error
marshalToBuf([]byte) error MarshalToBuf([]byte) error
unmarshal(io.Reader) error Unmarshal(io.Reader) error
unmarshalFromBuf([]byte) error UnmarshalFromBuf([]byte) error
encoding() chunkEncoding Encoding() ChunkEncoding
} }
// A chunkIterator enables efficient access to the content of a chunk. It is // ChunkIterator enables efficient access to the content of a chunk. It is
// generally not safe to use a chunkIterator concurrently with or after chunk // generally not safe to use a chunkIterator concurrently with or after chunk
// mutation. // mutation.
type chunkIterator interface { type ChunkIterator interface {
// Gets the last timestamp in the chunk. // Gets the last timestamp in the chunk.
lastTimestamp() (model.Time, error) LastTimestamp() (model.Time, error)
// Whether a given timestamp is contained between first and last value // Whether a given timestamp is contained between first and last value
// in the chunk. // in the chunk.
contains(model.Time) (bool, error) Contains(model.Time) (bool, error)
// Scans the next value in the chunk. Directly after the iterator has // Scans the next value in the chunk. Directly after the iterator has
// been created, the next value is the first value in the // been created, the next value is the first value in the
// chunk. Otherwise, it is the value following the last value scanned or // chunk. Otherwise, it is the value following the last value scanned or
// found (by one of the find... methods). Returns false if either the // found (by one of the find... methods). Returns false if either the
// end of the chunk is reached or an error has occurred. // end of the chunk is reached or an error has occurred.
scan() bool Scan() bool
// Finds the most recent value at or before the provided time. Returns // Finds the most recent value at or before the provided time. Returns
// false if either the chunk contains no value at or before the provided // false if either the chunk contains no value at or before the provided
// time, or an error has occurred. // time, or an error has occurred.
findAtOrBefore(model.Time) bool FindAtOrBefore(model.Time) bool
// Finds the oldest value at or after the provided time. Returns false // Finds the oldest value at or after the provided time. Returns false
// if either the chunk contains no value at or after the provided time, // if either the chunk contains no value at or after the provided time,
// or an error has occurred. // or an error has occurred.
findAtOrAfter(model.Time) bool FindAtOrAfter(model.Time) bool
// Returns the last value scanned (by the scan method) or found (by one // Returns the last value scanned (by the scan method) or found (by one
// of the find... methods). It returns ZeroSamplePair before any of // of the find... methods). It returns ZeroSamplePair before any of
// those methods were called. // those methods were called.
value() model.SamplePair Value() model.SamplePair
// Returns the last error encountered. In general, an error signals data // Returns the last error encountered. In general, an error signals data
// corruption in the chunk and requires quarantining. // corruption in the chunk and requires quarantining.
err() error Err() error
} }
// rangeValues is a utility function that retrieves all values within the given // rangeValues is a utility function that retrieves all values within the given
// range from a chunkIterator. // range from a chunkIterator.
func rangeValues(it chunkIterator, in metric.Interval) ([]model.SamplePair, error) { func rangeValues(it ChunkIterator, in metric.Interval) ([]model.SamplePair, error) {
result := []model.SamplePair{} result := []model.SamplePair{}
if !it.findAtOrAfter(in.OldestInclusive) { if !it.FindAtOrAfter(in.OldestInclusive) {
return result, it.err() return result, it.Err()
} }
for !it.value().Timestamp.After(in.NewestInclusive) { for !it.Value().Timestamp.After(in.NewestInclusive) {
result = append(result, it.value()) result = append(result, it.Value())
if !it.scan() { if !it.Scan() {
break break
} }
} }
return result, it.err() return result, it.Err()
} }
// addToOverflowChunk is a utility function that creates a new chunk as overflow // addToOverflowChunk is a utility function that creates a new chunk as overflow
// chunk, adds the provided sample to it, and returns a chunk slice containing // chunk, adds the provided sample to it, and returns a chunk slice containing
// the provided old chunk followed by the new overflow chunk. // the provided old chunk followed by the new overflow chunk.
func addToOverflowChunk(c chunk, s model.SamplePair) ([]chunk, error) { func addToOverflowChunk(c Chunk, s model.SamplePair) ([]Chunk, error) {
overflowChunks, err := newChunk().add(s) overflowChunks, err := NewChunk().Add(s)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return []chunk{c, overflowChunks[0]}, nil return []Chunk{c, overflowChunks[0]}, nil
} }
// transcodeAndAdd is a utility function that transcodes the dst chunk into the // transcodeAndAdd is a utility function that transcodes the dst chunk into the
// provided src chunk (plus the necessary overflow chunks) and then adds the // provided src chunk (plus the necessary overflow chunks) and then adds the
// provided sample. It returns the new chunks (transcoded plus overflow) with // provided sample. It returns the new chunks (transcoded plus overflow) with
// the new sample at the end. // the new sample at the end.
func transcodeAndAdd(dst chunk, src chunk, s model.SamplePair) ([]chunk, error) { func transcodeAndAdd(dst Chunk, src Chunk, s model.SamplePair) ([]Chunk, error) {
chunkOps.WithLabelValues(transcode).Inc() chunkOps.WithLabelValues(transcode).Inc()
var ( var (
head = dst head = dst
body, newChunks []chunk body, NewChunks []Chunk
err error err error
) )
it := src.newIterator() it := src.NewIterator()
for it.scan() { for it.Scan() {
if newChunks, err = head.add(it.value()); err != nil { if NewChunks, err = head.Add(it.Value()); err != nil {
return nil, err return nil, err
} }
body = append(body, newChunks[:len(newChunks)-1]...) body = append(body, NewChunks[:len(NewChunks)-1]...)
head = newChunks[len(newChunks)-1] head = NewChunks[len(NewChunks)-1]
} }
if it.err() != nil { if it.Err() != nil {
return nil, it.err() return nil, it.Err()
} }
if newChunks, err = head.add(s); err != nil { if NewChunks, err = head.Add(s); err != nil {
return nil, err return nil, err
} }
return append(body, newChunks...), nil return append(body, NewChunks...), nil
} }
// newChunk creates a new chunk according to the encoding set by the // NewChunk creates a new chunk according to the encoding set by the
// DefaultChunkEncoding flag. // DefaultChunkEncoding flag.
func newChunk() chunk { func NewChunk() Chunk {
chunk, err := newChunkForEncoding(DefaultChunkEncoding) chunk, err := NewChunkForEncoding(DefaultChunkEncoding)
if err != nil { if err != nil {
panic(err) panic(err)
} }
return chunk return chunk
} }
func newChunkForEncoding(encoding chunkEncoding) (chunk, error) { // NewChunkForEncoding allows configuring what chunk type you want
func NewChunkForEncoding(encoding ChunkEncoding) (Chunk, error) {
switch encoding { switch encoding {
case delta: case Delta:
return newDeltaEncodedChunk(d1, d0, true, chunkLen), nil return newDeltaEncodedChunk(d1, d0, true, chunkLen), nil
case doubleDelta: case DoubleDelta:
return newDoubleDeltaEncodedChunk(d1, d0, true, chunkLen), nil return newDoubleDeltaEncodedChunk(d1, d0, true, chunkLen), nil
case varbit: case Varbit:
return newVarbitChunk(varbitZeroEncoding), nil return newVarbitChunk(varbitZeroEncoding), nil
default: default:
return nil, fmt.Errorf("unknown chunk encoding: %v", encoding) return nil, fmt.Errorf("unknown chunk encoding: %v", encoding)
@ -403,18 +408,18 @@ func newIndexAccessingChunkIterator(len int, acc indexAccessor) *indexAccessingC
} }
// lastTimestamp implements chunkIterator. // lastTimestamp implements chunkIterator.
func (it *indexAccessingChunkIterator) lastTimestamp() (model.Time, error) { func (it *indexAccessingChunkIterator) LastTimestamp() (model.Time, error) {
return it.acc.timestampAtIndex(it.len - 1), it.acc.err() return it.acc.timestampAtIndex(it.len - 1), it.acc.err()
} }
// contains implements chunkIterator. // contains implements chunkIterator.
func (it *indexAccessingChunkIterator) contains(t model.Time) (bool, error) { func (it *indexAccessingChunkIterator) Contains(t model.Time) (bool, error) {
return !t.Before(it.acc.timestampAtIndex(0)) && return !t.Before(it.acc.timestampAtIndex(0)) &&
!t.After(it.acc.timestampAtIndex(it.len-1)), it.acc.err() !t.After(it.acc.timestampAtIndex(it.len-1)), it.acc.err()
} }
// scan implements chunkIterator. // scan implements chunkIterator.
func (it *indexAccessingChunkIterator) scan() bool { func (it *indexAccessingChunkIterator) Scan() bool {
it.pos++ it.pos++
if it.pos >= it.len { if it.pos >= it.len {
return false return false
@ -427,7 +432,7 @@ func (it *indexAccessingChunkIterator) scan() bool {
} }
// findAtOrBefore implements chunkIterator. // findAtOrBefore implements chunkIterator.
func (it *indexAccessingChunkIterator) findAtOrBefore(t model.Time) bool { func (it *indexAccessingChunkIterator) FindAtOrBefore(t model.Time) bool {
i := sort.Search(it.len, func(i int) bool { i := sort.Search(it.len, func(i int) bool {
return it.acc.timestampAtIndex(i).After(t) return it.acc.timestampAtIndex(i).After(t)
}) })
@ -443,7 +448,7 @@ func (it *indexAccessingChunkIterator) findAtOrBefore(t model.Time) bool {
} }
// findAtOrAfter implements chunkIterator. // findAtOrAfter implements chunkIterator.
func (it *indexAccessingChunkIterator) findAtOrAfter(t model.Time) bool { func (it *indexAccessingChunkIterator) FindAtOrAfter(t model.Time) bool {
i := sort.Search(it.len, func(i int) bool { i := sort.Search(it.len, func(i int) bool {
return !it.acc.timestampAtIndex(i).Before(t) return !it.acc.timestampAtIndex(i).Before(t)
}) })
@ -459,11 +464,11 @@ func (it *indexAccessingChunkIterator) findAtOrAfter(t model.Time) bool {
} }
// value implements chunkIterator. // value implements chunkIterator.
func (it *indexAccessingChunkIterator) value() model.SamplePair { func (it *indexAccessingChunkIterator) Value() model.SamplePair {
return it.lastValue return it.lastValue
} }
// err implements chunkIterator. // err implements chunkIterator.
func (it *indexAccessingChunkIterator) err() error { func (it *indexAccessingChunkIterator) Err() error {
return it.acc.err() return it.acc.err()
} }

View file

@ -114,7 +114,7 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint
) )
} }
s.chunkDescs = append( s.chunkDescs = append(
make([]*chunkDesc, 0, len(s.chunkDescs)-s.persistWatermark), make([]*ChunkDesc, 0, len(s.chunkDescs)-s.persistWatermark),
s.chunkDescs[s.persistWatermark:]..., s.chunkDescs[s.persistWatermark:]...,
) )
numMemChunkDescs.Sub(float64(s.persistWatermark)) numMemChunkDescs.Sub(float64(s.persistWatermark))

View file

@ -73,7 +73,7 @@ func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *deltaEncod
} }
// add implements chunk. // add implements chunk.
func (c deltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) { func (c deltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) {
// TODO(beorn7): Since we return &c, this method might cause an unnecessary allocation. // TODO(beorn7): Since we return &c, this method might cause an unnecessary allocation.
if c.len() == 0 { if c.len() == 0 {
c = c[:deltaHeaderBytes] c = c[:deltaHeaderBytes]
@ -174,23 +174,23 @@ func (c deltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) {
return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb) return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb)
} }
} }
return []chunk{&c}, nil return []Chunk{&c}, nil
} }
// clone implements chunk. // clone implements chunk.
func (c deltaEncodedChunk) clone() chunk { func (c deltaEncodedChunk) Clone() Chunk {
clone := make(deltaEncodedChunk, len(c), cap(c)) clone := make(deltaEncodedChunk, len(c), cap(c))
copy(clone, c) copy(clone, c)
return &clone return &clone
} }
// firstTime implements chunk. // firstTime implements chunk.
func (c deltaEncodedChunk) firstTime() model.Time { func (c deltaEncodedChunk) FirstTime() model.Time {
return c.baseTime() return c.baseTime()
} }
// newIterator implements chunk. // NewIterator implements chunk.
func (c *deltaEncodedChunk) newIterator() chunkIterator { func (c *deltaEncodedChunk) NewIterator() ChunkIterator {
return newIndexAccessingChunkIterator(c.len(), &deltaEncodedIndexAccessor{ return newIndexAccessingChunkIterator(c.len(), &deltaEncodedIndexAccessor{
c: *c, c: *c,
baseT: c.baseTime(), baseT: c.baseTime(),
@ -202,7 +202,7 @@ func (c *deltaEncodedChunk) newIterator() chunkIterator {
} }
// marshal implements chunk. // marshal implements chunk.
func (c deltaEncodedChunk) marshal(w io.Writer) error { func (c deltaEncodedChunk) Marshal(w io.Writer) error {
if len(c) > math.MaxUint16 { if len(c) > math.MaxUint16 {
panic("chunk buffer length would overflow a 16 bit uint.") panic("chunk buffer length would overflow a 16 bit uint.")
} }
@ -218,8 +218,8 @@ func (c deltaEncodedChunk) marshal(w io.Writer) error {
return nil return nil
} }
// marshalToBuf implements chunk. // MarshalToBuf implements chunk.
func (c deltaEncodedChunk) marshalToBuf(buf []byte) error { func (c deltaEncodedChunk) MarshalToBuf(buf []byte) error {
if len(c) > math.MaxUint16 { if len(c) > math.MaxUint16 {
panic("chunk buffer length would overflow a 16 bit uint") panic("chunk buffer length would overflow a 16 bit uint")
} }
@ -233,7 +233,7 @@ func (c deltaEncodedChunk) marshalToBuf(buf []byte) error {
} }
// unmarshal implements chunk. // unmarshal implements chunk.
func (c *deltaEncodedChunk) unmarshal(r io.Reader) error { func (c *deltaEncodedChunk) Unmarshal(r io.Reader) error {
*c = (*c)[:cap(*c)] *c = (*c)[:cap(*c)]
if _, err := io.ReadFull(r, *c); err != nil { if _, err := io.ReadFull(r, *c); err != nil {
return err return err
@ -250,7 +250,7 @@ func (c *deltaEncodedChunk) unmarshal(r io.Reader) error {
} }
// unmarshalFromBuf implements chunk. // unmarshalFromBuf implements chunk.
func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) error { func (c *deltaEncodedChunk) UnmarshalFromBuf(buf []byte) error {
*c = (*c)[:cap(*c)] *c = (*c)[:cap(*c)]
copy(*c, buf) copy(*c, buf)
l := binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:]) l := binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])
@ -265,7 +265,7 @@ func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) error {
} }
// encoding implements chunk. // encoding implements chunk.
func (c deltaEncodedChunk) encoding() chunkEncoding { return delta } func (c deltaEncodedChunk) Encoding() ChunkEncoding { return Delta }
func (c deltaEncodedChunk) timeBytes() deltaBytes { func (c deltaEncodedChunk) timeBytes() deltaBytes {
return deltaBytes(c[deltaHeaderTimeBytesOffset]) return deltaBytes(c[deltaHeaderTimeBytesOffset])

View file

@ -53,13 +53,13 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) {
cases := []struct { cases := []struct {
chunkTypeName string chunkTypeName string
chunkConstructor func(deltaBytes, deltaBytes, bool, int) chunk chunkConstructor func(deltaBytes, deltaBytes, bool, int) Chunk
minHeaderLen int minHeaderLen int
chunkLenPos int chunkLenPos int
}{ }{
{ {
chunkTypeName: "deltaEncodedChunk", chunkTypeName: "deltaEncodedChunk",
chunkConstructor: func(a, b deltaBytes, c bool, d int) chunk { chunkConstructor: func(a, b deltaBytes, c bool, d int) Chunk {
return newDeltaEncodedChunk(a, b, c, d) return newDeltaEncodedChunk(a, b, c, d)
}, },
minHeaderLen: deltaHeaderBytes, minHeaderLen: deltaHeaderBytes,
@ -67,7 +67,7 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) {
}, },
{ {
chunkTypeName: "doubleDeltaEncodedChunk", chunkTypeName: "doubleDeltaEncodedChunk",
chunkConstructor: func(a, b deltaBytes, c bool, d int) chunk { chunkConstructor: func(a, b deltaBytes, c bool, d int) Chunk {
return newDoubleDeltaEncodedChunk(a, b, c, d) return newDoubleDeltaEncodedChunk(a, b, c, d)
}, },
minHeaderLen: doubleDeltaHeaderMinBytes, minHeaderLen: doubleDeltaHeaderMinBytes,
@ -77,7 +77,7 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) {
for _, c := range cases { for _, c := range cases {
chunk := c.chunkConstructor(d1, d4, false, chunkLen) chunk := c.chunkConstructor(d1, d4, false, chunkLen)
cs, err := chunk.add(model.SamplePair{ cs, err := chunk.Add(model.SamplePair{
Timestamp: model.Now(), Timestamp: model.Now(),
Value: model.SampleValue(100), Value: model.SampleValue(100),
}) })
@ -87,16 +87,16 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) {
buf := make([]byte, chunkLen) buf := make([]byte, chunkLen)
cs[0].marshalToBuf(buf) cs[0].MarshalToBuf(buf)
// Corrupt the length to be every possible too-small value // Corrupt the length to be every possible too-small value
for i := 0; i < c.minHeaderLen; i++ { for i := 0; i < c.minHeaderLen; i++ {
binary.LittleEndian.PutUint16(buf[c.chunkLenPos:], uint16(i)) binary.LittleEndian.PutUint16(buf[c.chunkLenPos:], uint16(i))
err = cs[0].unmarshalFromBuf(buf) err = cs[0].UnmarshalFromBuf(buf)
verifyUnmarshallingError(err, c.chunkTypeName, "buf", i) verifyUnmarshallingError(err, c.chunkTypeName, "buf", i)
err = cs[0].unmarshal(bytes.NewBuffer(buf)) err = cs[0].Unmarshal(bytes.NewBuffer(buf))
verifyUnmarshallingError(err, c.chunkTypeName, "Reader", i) verifyUnmarshallingError(err, c.chunkTypeName, "Reader", i)
} }
} }

View file

@ -81,7 +81,7 @@ func newDoubleDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *doub
} }
// add implements chunk. // add implements chunk.
func (c doubleDeltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) { func (c doubleDeltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) {
// TODO(beorn7): Since we return &c, this method might cause an unnecessary allocation. // TODO(beorn7): Since we return &c, this method might cause an unnecessary allocation.
if c.len() == 0 { if c.len() == 0 {
return c.addFirstSample(s), nil return c.addFirstSample(s), nil
@ -181,23 +181,23 @@ func (c doubleDeltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) {
return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb) return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb)
} }
} }
return []chunk{&c}, nil return []Chunk{&c}, nil
} }
// clone implements chunk. // clone implements chunk.
func (c doubleDeltaEncodedChunk) clone() chunk { func (c doubleDeltaEncodedChunk) Clone() Chunk {
clone := make(doubleDeltaEncodedChunk, len(c), cap(c)) clone := make(doubleDeltaEncodedChunk, len(c), cap(c))
copy(clone, c) copy(clone, c)
return &clone return &clone
} }
// firstTime implements chunk. // firstTime implements chunk.
func (c doubleDeltaEncodedChunk) firstTime() model.Time { func (c doubleDeltaEncodedChunk) FirstTime() model.Time {
return c.baseTime() return c.baseTime()
} }
// newIterator implements chunk. // NewIterator( implements chunk.
func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator { func (c *doubleDeltaEncodedChunk) NewIterator() ChunkIterator {
return newIndexAccessingChunkIterator(c.len(), &doubleDeltaEncodedIndexAccessor{ return newIndexAccessingChunkIterator(c.len(), &doubleDeltaEncodedIndexAccessor{
c: *c, c: *c,
baseT: c.baseTime(), baseT: c.baseTime(),
@ -211,7 +211,7 @@ func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator {
} }
// marshal implements chunk. // marshal implements chunk.
func (c doubleDeltaEncodedChunk) marshal(w io.Writer) error { func (c doubleDeltaEncodedChunk) Marshal(w io.Writer) error {
if len(c) > math.MaxUint16 { if len(c) > math.MaxUint16 {
panic("chunk buffer length would overflow a 16 bit uint") panic("chunk buffer length would overflow a 16 bit uint")
} }
@ -227,8 +227,8 @@ func (c doubleDeltaEncodedChunk) marshal(w io.Writer) error {
return nil return nil
} }
// marshalToBuf implements chunk. // MarshalToBuf implements chunk.
func (c doubleDeltaEncodedChunk) marshalToBuf(buf []byte) error { func (c doubleDeltaEncodedChunk) MarshalToBuf(buf []byte) error {
if len(c) > math.MaxUint16 { if len(c) > math.MaxUint16 {
panic("chunk buffer length would overflow a 16 bit uint") panic("chunk buffer length would overflow a 16 bit uint")
} }
@ -242,7 +242,7 @@ func (c doubleDeltaEncodedChunk) marshalToBuf(buf []byte) error {
} }
// unmarshal implements chunk. // unmarshal implements chunk.
func (c *doubleDeltaEncodedChunk) unmarshal(r io.Reader) error { func (c *doubleDeltaEncodedChunk) Unmarshal(r io.Reader) error {
*c = (*c)[:cap(*c)] *c = (*c)[:cap(*c)]
if _, err := io.ReadFull(r, *c); err != nil { if _, err := io.ReadFull(r, *c); err != nil {
return err return err
@ -260,7 +260,7 @@ func (c *doubleDeltaEncodedChunk) unmarshal(r io.Reader) error {
} }
// unmarshalFromBuf implements chunk. // unmarshalFromBuf implements chunk.
func (c *doubleDeltaEncodedChunk) unmarshalFromBuf(buf []byte) error { func (c *doubleDeltaEncodedChunk) UnmarshalFromBuf(buf []byte) error {
*c = (*c)[:cap(*c)] *c = (*c)[:cap(*c)]
copy(*c, buf) copy(*c, buf)
l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:]) l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])
@ -275,7 +275,7 @@ func (c *doubleDeltaEncodedChunk) unmarshalFromBuf(buf []byte) error {
} }
// encoding implements chunk. // encoding implements chunk.
func (c doubleDeltaEncodedChunk) encoding() chunkEncoding { return doubleDelta } func (c doubleDeltaEncodedChunk) Encoding() ChunkEncoding { return DoubleDelta }
func (c doubleDeltaEncodedChunk) baseTime() model.Time { func (c doubleDeltaEncodedChunk) baseTime() model.Time {
return model.Time( return model.Time(
@ -347,7 +347,7 @@ func (c doubleDeltaEncodedChunk) isInt() bool {
// addFirstSample is a helper method only used by c.add(). It adds timestamp and // addFirstSample is a helper method only used by c.add(). It adds timestamp and
// value as base time and value. // value as base time and value.
func (c doubleDeltaEncodedChunk) addFirstSample(s model.SamplePair) []chunk { func (c doubleDeltaEncodedChunk) addFirstSample(s model.SamplePair) []Chunk {
c = c[:doubleDeltaHeaderBaseValueOffset+8] c = c[:doubleDeltaHeaderBaseValueOffset+8]
binary.LittleEndian.PutUint64( binary.LittleEndian.PutUint64(
c[doubleDeltaHeaderBaseTimeOffset:], c[doubleDeltaHeaderBaseTimeOffset:],
@ -357,12 +357,12 @@ func (c doubleDeltaEncodedChunk) addFirstSample(s model.SamplePair) []chunk {
c[doubleDeltaHeaderBaseValueOffset:], c[doubleDeltaHeaderBaseValueOffset:],
math.Float64bits(float64(s.Value)), math.Float64bits(float64(s.Value)),
) )
return []chunk{&c} return []Chunk{&c}
} }
// addSecondSample is a helper method only used by c.add(). It calculates the // addSecondSample is a helper method only used by c.add(). It calculates the
// base delta from the provided sample and adds it to the chunk. // base delta from the provided sample and adds it to the chunk.
func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb deltaBytes) ([]chunk, error) { func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb deltaBytes) ([]Chunk, error) {
baseTimeDelta := s.Timestamp - c.baseTime() baseTimeDelta := s.Timestamp - c.baseTime()
if baseTimeDelta < 0 { if baseTimeDelta < 0 {
return nil, fmt.Errorf("base time delta is less than zero: %v", baseTimeDelta) return nil, fmt.Errorf("base time delta is less than zero: %v", baseTimeDelta)
@ -403,7 +403,7 @@ func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb delt
math.Float64bits(float64(baseValueDelta)), math.Float64bits(float64(baseValueDelta)),
) )
} }
return []chunk{&c}, nil return []Chunk{&c}, nil
} }
// doubleDeltaEncodedIndexAccessor implements indexAccessor. // doubleDeltaEncodedIndexAccessor implements indexAccessor.

View file

@ -107,7 +107,7 @@ func (hs *headsScanner) scan() bool {
firstTime int64 firstTime int64
lastTime int64 lastTime int64
encoding byte encoding byte
ch chunk ch Chunk
lastTimeHead model.Time lastTimeHead model.Time
) )
if seriesFlags, hs.err = hs.r.ReadByte(); hs.err != nil { if seriesFlags, hs.err = hs.r.ReadByte(); hs.err != nil {
@ -146,7 +146,7 @@ func (hs *headsScanner) scan() bool {
if numChunkDescs, hs.err = binary.ReadVarint(hs.r); hs.err != nil { if numChunkDescs, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
return false return false
} }
chunkDescs := make([]*chunkDesc, numChunkDescs) chunkDescs := make([]*ChunkDesc, numChunkDescs)
if hs.version == headsFormatLegacyVersion { if hs.version == headsFormatLegacyVersion {
if headChunkPersisted { if headChunkPersisted {
persistWatermark = numChunkDescs persistWatermark = numChunkDescs
@ -163,7 +163,7 @@ func (hs *headsScanner) scan() bool {
if lastTime, hs.err = binary.ReadVarint(hs.r); hs.err != nil { if lastTime, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
return false return false
} }
chunkDescs[i] = &chunkDesc{ chunkDescs[i] = &ChunkDesc{
chunkFirstTime: model.Time(firstTime), chunkFirstTime: model.Time(firstTime),
chunkLastTime: model.Time(lastTime), chunkLastTime: model.Time(lastTime),
} }
@ -176,13 +176,13 @@ func (hs *headsScanner) scan() bool {
if encoding, hs.err = hs.r.ReadByte(); hs.err != nil { if encoding, hs.err = hs.r.ReadByte(); hs.err != nil {
return false return false
} }
if ch, hs.err = newChunkForEncoding(chunkEncoding(encoding)); hs.err != nil { if ch, hs.err = NewChunkForEncoding(ChunkEncoding(encoding)); hs.err != nil {
return false return false
} }
if hs.err = ch.unmarshal(hs.r); hs.err != nil { if hs.err = ch.Unmarshal(hs.r); hs.err != nil {
return false return false
} }
cd := newChunkDesc(ch, ch.firstTime()) cd := NewChunkDesc(ch, ch.FirstTime())
if i < numChunkDescs-1 { if i < numChunkDescs-1 {
// This is NOT the head chunk. So it's a chunk // This is NOT the head chunk. So it's a chunk
// to be persisted, and we need to populate lastTime. // to be persisted, and we need to populate lastTime.

View file

@ -370,7 +370,7 @@ func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelVa
// //
// Returning an error signals problems with the series file. In this case, the // Returning an error signals problems with the series file. In this case, the
// caller should quarantine the series. // caller should quarantine the series.
func (p *persistence) persistChunks(fp model.Fingerprint, chunks []chunk) (index int, err error) { func (p *persistence) persistChunks(fp model.Fingerprint, chunks []Chunk) (index int, err error) {
f, err := p.openChunkFileForWriting(fp) f, err := p.openChunkFileForWriting(fp)
if err != nil { if err != nil {
return -1, err return -1, err
@ -399,14 +399,14 @@ func (p *persistence) persistChunks(fp model.Fingerprint, chunks []chunk) (index
// incrementally larger indexes. The indexOffset denotes the offset to be added to // incrementally larger indexes. The indexOffset denotes the offset to be added to
// each index in indexes. It is the caller's responsibility to not persist or // each index in indexes. It is the caller's responsibility to not persist or
// drop anything for the same fingerprint concurrently. // drop anything for the same fingerprint concurrently.
func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]chunk, error) { func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]Chunk, error) {
f, err := p.openChunkFileForReading(fp) f, err := p.openChunkFileForReading(fp)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer f.Close() defer f.Close()
chunks := make([]chunk, 0, len(indexes)) chunks := make([]Chunk, 0, len(indexes))
buf := p.bufPool.Get().([]byte) buf := p.bufPool.Get().([]byte)
defer func() { defer func() {
// buf may change below. An unwrapped 'defer p.bufPool.Put(buf)' // buf may change below. An unwrapped 'defer p.bufPool.Put(buf)'
@ -436,11 +436,11 @@ func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffse
return nil, err return nil, err
} }
for c := 0; c < batchSize; c++ { for c := 0; c < batchSize; c++ {
chunk, err := newChunkForEncoding(chunkEncoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset])) chunk, err := NewChunkForEncoding(ChunkEncoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset]))
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := chunk.unmarshalFromBuf(buf[c*chunkLenWithHeader+chunkHeaderLen:]); err != nil { if err := chunk.UnmarshalFromBuf(buf[c*chunkLenWithHeader+chunkHeaderLen:]); err != nil {
return nil, err return nil, err
} }
chunks = append(chunks, chunk) chunks = append(chunks, chunk)
@ -455,7 +455,7 @@ func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffse
// the number of chunkDescs to skip from the end of the series file. It is the // the number of chunkDescs to skip from the end of the series file. It is the
// caller's responsibility to not persist or drop anything for the same // caller's responsibility to not persist or drop anything for the same
// fingerprint concurrently. // fingerprint concurrently.
func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*chunkDesc, error) { func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*ChunkDesc, error) {
f, err := p.openChunkFileForReading(fp) f, err := p.openChunkFileForReading(fp)
if os.IsNotExist(err) { if os.IsNotExist(err) {
return nil, nil return nil, nil
@ -478,7 +478,7 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([
} }
numChunks := int(fi.Size())/chunkLenWithHeader - offsetFromEnd numChunks := int(fi.Size())/chunkLenWithHeader - offsetFromEnd
cds := make([]*chunkDesc, numChunks) cds := make([]*ChunkDesc, numChunks)
chunkTimesBuf := make([]byte, 16) chunkTimesBuf := make([]byte, 16)
for i := 0; i < numChunks; i++ { for i := 0; i < numChunks; i++ {
_, err := f.Seek(offsetForChunkIndex(i)+chunkHeaderFirstTimeOffset, os.SEEK_SET) _, err := f.Seek(offsetForChunkIndex(i)+chunkHeaderFirstTimeOffset, os.SEEK_SET)
@ -490,7 +490,7 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([
if err != nil { if err != nil {
return nil, err return nil, err
} }
cds[i] = &chunkDesc{ cds[i] = &ChunkDesc{
chunkFirstTime: model.Time(binary.LittleEndian.Uint64(chunkTimesBuf)), chunkFirstTime: model.Time(binary.LittleEndian.Uint64(chunkTimesBuf)),
chunkLastTime: model.Time(binary.LittleEndian.Uint64(chunkTimesBuf[8:])), chunkLastTime: model.Time(binary.LittleEndian.Uint64(chunkTimesBuf[8:])),
} }
@ -542,7 +542,7 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([
// (4.8.1.2) The varint-encoded last time. // (4.8.1.2) The varint-encoded last time.
// //
// (4.8.2.1) A byte defining the chunk type. // (4.8.2.1) A byte defining the chunk type.
// (4.8.2.2) The chunk itself, marshaled with the marshal() method. // (4.8.2.2) The chunk itself, marshaled with the Marshal() method.
// //
func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker) (err error) { func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker) (err error) {
log.Info("Checkpointing in-memory metrics and chunks...") log.Info("Checkpointing in-memory metrics and chunks...")
@ -657,10 +657,10 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
} }
} else { } else {
// This is a non-persisted chunk. Fully marshal it. // This is a non-persisted chunk. Fully marshal it.
if err = w.WriteByte(byte(chunkDesc.c.encoding())); err != nil { if err = w.WriteByte(byte(chunkDesc.c.Encoding())); err != nil {
return return
} }
if err = chunkDesc.c.marshal(w); err != nil { if err = chunkDesc.c.Marshal(w); err != nil {
return return
} }
} }
@ -751,7 +751,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in
// Returning an error signals problems with the series file. In this case, the // Returning an error signals problems with the series file. In this case, the
// caller should quarantine the series. // caller should quarantine the series.
func (p *persistence) dropAndPersistChunks( func (p *persistence) dropAndPersistChunks(
fp model.Fingerprint, beforeTime model.Time, chunks []chunk, fp model.Fingerprint, beforeTime model.Time, chunks []Chunk,
) ( ) (
firstTimeNotDropped model.Time, firstTimeNotDropped model.Time,
offset int, offset int,
@ -769,7 +769,7 @@ func (p *persistence) dropAndPersistChunks(
i := 0 i := 0
for ; i < len(chunks); i++ { for ; i < len(chunks); i++ {
var lt model.Time var lt model.Time
lt, err = chunks[i].newIterator().lastTimestamp() lt, err = chunks[i].NewIterator().LastTimestamp()
if err != nil { if err != nil {
return return
} }
@ -778,7 +778,7 @@ func (p *persistence) dropAndPersistChunks(
} }
} }
if i < len(chunks) { if i < len(chunks) {
firstTimeNotDropped = chunks[i].firstTime() firstTimeNotDropped = chunks[i].FirstTime()
} }
if i > 0 || firstTimeNotDropped.Before(beforeTime) { if i > 0 || firstTimeNotDropped.Before(beforeTime) {
// Series file has to go. // Series file has to go.
@ -1500,7 +1500,7 @@ func (p *persistence) loadFPMappings() (fpMappings, model.Fingerprint, error) {
return fpm, highestMappedFP, nil return fpm, highestMappedFP, nil
} }
func (p *persistence) writeChunks(w io.Writer, chunks []chunk) error { func (p *persistence) writeChunks(w io.Writer, chunks []Chunk) error {
b := p.bufPool.Get().([]byte) b := p.bufPool.Get().([]byte)
defer func() { defer func() {
// buf may change below. An unwrapped 'defer p.bufPool.Put(buf)' // buf may change below. An unwrapped 'defer p.bufPool.Put(buf)'
@ -1522,7 +1522,7 @@ func (p *persistence) writeChunks(w io.Writer, chunks []chunk) error {
if err := writeChunkHeader(b[i*chunkLenWithHeader:], chunk); err != nil { if err := writeChunkHeader(b[i*chunkLenWithHeader:], chunk); err != nil {
return err return err
} }
if err := chunk.marshalToBuf(b[i*chunkLenWithHeader+chunkHeaderLen:]); err != nil { if err := chunk.MarshalToBuf(b[i*chunkLenWithHeader+chunkHeaderLen:]); err != nil {
return err return err
} }
} }
@ -1547,13 +1547,13 @@ func chunkIndexForOffset(offset int64) (int, error) {
return int(offset) / chunkLenWithHeader, nil return int(offset) / chunkLenWithHeader, nil
} }
func writeChunkHeader(header []byte, c chunk) error { func writeChunkHeader(header []byte, c Chunk) error {
header[chunkHeaderTypeOffset] = byte(c.encoding()) header[chunkHeaderTypeOffset] = byte(c.Encoding())
binary.LittleEndian.PutUint64( binary.LittleEndian.PutUint64(
header[chunkHeaderFirstTimeOffset:], header[chunkHeaderFirstTimeOffset:],
uint64(c.firstTime()), uint64(c.FirstTime()),
) )
lt, err := c.newIterator().lastTimestamp() lt, err := c.NewIterator().LastTimestamp()
if err != nil { if err != nil {
return err return err
} }

View file

@ -38,7 +38,7 @@ var (
m5 = model.Metric{"label": "value5"} m5 = model.Metric{"label": "value5"}
) )
func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, testutil.Closer) { func newTestPersistence(t *testing.T, encoding ChunkEncoding) (*persistence, testutil.Closer) {
DefaultChunkEncoding = encoding DefaultChunkEncoding = encoding
dir := testutil.NewTemporaryDirectory("test_persistence", t) dir := testutil.NewTemporaryDirectory("test_persistence", t)
p, err := newPersistence(dir.Path(), false, false, func() bool { return false }, 0.1) p, err := newPersistence(dir.Path(), false, false, func() bool { return false }, 0.1)
@ -53,22 +53,22 @@ func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, tes
}) })
} }
func buildTestChunks(t *testing.T, encoding chunkEncoding) map[model.Fingerprint][]chunk { func buildTestChunks(t *testing.T, encoding ChunkEncoding) map[model.Fingerprint][]Chunk {
fps := model.Fingerprints{ fps := model.Fingerprints{
m1.FastFingerprint(), m1.FastFingerprint(),
m2.FastFingerprint(), m2.FastFingerprint(),
m3.FastFingerprint(), m3.FastFingerprint(),
} }
fpToChunks := map[model.Fingerprint][]chunk{} fpToChunks := map[model.Fingerprint][]Chunk{}
for _, fp := range fps { for _, fp := range fps {
fpToChunks[fp] = make([]chunk, 0, 10) fpToChunks[fp] = make([]Chunk, 0, 10)
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
ch, err := newChunkForEncoding(encoding) ch, err := NewChunkForEncoding(encoding)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
chs, err := ch.add(model.SamplePair{ chs, err := ch.Add(model.SamplePair{
Timestamp: model.Time(i), Timestamp: model.Time(i),
Value: model.SampleValue(fp), Value: model.SampleValue(fp),
}) })
@ -81,18 +81,18 @@ func buildTestChunks(t *testing.T, encoding chunkEncoding) map[model.Fingerprint
return fpToChunks return fpToChunks
} }
func chunksEqual(c1, c2 chunk) bool { func chunksEqual(c1, c2 Chunk) bool {
it1 := c1.newIterator() it1 := c1.NewIterator()
it2 := c2.newIterator() it2 := c2.NewIterator()
for it1.scan() && it2.scan() { for it1.Scan() && it2.Scan() {
if !(it1.value() == it2.value()) { if !(it1.Value() == it2.Value()) {
return false return false
} }
} }
return it1.err() == nil && it2.err() == nil return it1.Err() == nil && it2.Err() == nil
} }
func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) { func testPersistLoadDropChunks(t *testing.T, encoding ChunkEncoding) {
p, closer := newTestPersistence(t, encoding) p, closer := newTestPersistence(t, encoding)
defer closer.Close() defer closer.Close()
@ -450,7 +450,7 @@ func TestPersistLoadDropChunksType1(t *testing.T) {
testPersistLoadDropChunks(t, 1) testPersistLoadDropChunks(t, 1)
} }
func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding) { func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding) {
p, closer := newTestPersistence(t, encoding) p, closer := newTestPersistence(t, encoding)
defer closer.Close() defer closer.Close()
@ -461,16 +461,16 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
s3, _ := newMemorySeries(m3, nil, time.Time{}) s3, _ := newMemorySeries(m3, nil, time.Time{})
s4, _ := newMemorySeries(m4, nil, time.Time{}) s4, _ := newMemorySeries(m4, nil, time.Time{})
s5, _ := newMemorySeries(m5, nil, time.Time{}) s5, _ := newMemorySeries(m5, nil, time.Time{})
s1.add(model.SamplePair{Timestamp: 1, Value: 3.14}) s1.Add(model.SamplePair{Timestamp: 1, Value: 3.14})
s3.add(model.SamplePair{Timestamp: 2, Value: 2.7}) s3.Add(model.SamplePair{Timestamp: 2, Value: 2.7})
s3.headChunkClosed = true s3.headChunkClosed = true
s3.persistWatermark = 1 s3.persistWatermark = 1
for i := 0; i < 10000; i++ { for i := 0; i < 10000; i++ {
s4.add(model.SamplePair{ s4.Add(model.SamplePair{
Timestamp: model.Time(i), Timestamp: model.Time(i),
Value: model.SampleValue(i) / 2, Value: model.SampleValue(i) / 2,
}) })
s5.add(model.SamplePair{ s5.Add(model.SamplePair{
Timestamp: model.Time(i), Timestamp: model.Time(i),
Value: model.SampleValue(i * i), Value: model.SampleValue(i * i),
}) })
@ -562,10 +562,10 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
t.Error("headChunkClosed is true") t.Error("headChunkClosed is true")
} }
for i, cd := range loadedS4.chunkDescs { for i, cd := range loadedS4.chunkDescs {
if cd.chunkFirstTime != cd.c.firstTime() { if cd.chunkFirstTime != cd.c.FirstTime() {
t.Errorf( t.Errorf(
"chunkDesc[%d]: chunkFirstTime not consistent with chunk, want %d, got %d", "chunkDesc[%d]: chunkFirstTime not consistent with chunk, want %d, got %d",
i, cd.c.firstTime(), cd.chunkFirstTime, i, cd.c.FirstTime(), cd.chunkFirstTime,
) )
} }
if i == len(loadedS4.chunkDescs)-1 { if i == len(loadedS4.chunkDescs)-1 {
@ -575,7 +575,7 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
} }
continue continue
} }
lastTime, err := cd.c.newIterator().lastTimestamp() lastTime, err := cd.c.NewIterator().LastTimestamp()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -619,10 +619,10 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
} }
continue continue
} }
if cd.chunkFirstTime != cd.c.firstTime() { if cd.chunkFirstTime != cd.c.FirstTime() {
t.Errorf( t.Errorf(
"chunkDesc[%d]: chunkFirstTime not consistent with chunk, want %d, got %d", "chunkDesc[%d]: chunkFirstTime not consistent with chunk, want %d, got %d",
i, cd.c.firstTime(), cd.chunkFirstTime, i, cd.c.FirstTime(), cd.chunkFirstTime,
) )
} }
if i == len(loadedS5.chunkDescs)-1 { if i == len(loadedS5.chunkDescs)-1 {
@ -632,7 +632,7 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
} }
continue continue
} }
lastTime, err := cd.c.newIterator().lastTimestamp() lastTime, err := cd.c.NewIterator().LastTimestamp()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -690,7 +690,7 @@ func TestCheckpointAndLoadFPMappings(t *testing.T) {
} }
} }
func testFingerprintsModifiedBefore(t *testing.T, encoding chunkEncoding) { func testFingerprintsModifiedBefore(t *testing.T, encoding ChunkEncoding) {
p, closer := newTestPersistence(t, encoding) p, closer := newTestPersistence(t, encoding)
defer closer.Close() defer closer.Close()
@ -769,7 +769,7 @@ func TestFingerprintsModifiedBeforeChunkType2(t *testing.T) {
testFingerprintsModifiedBefore(t, 2) testFingerprintsModifiedBefore(t, 2)
} }
func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) { func testDropArchivedMetric(t *testing.T, encoding ChunkEncoding) {
p, closer := newTestPersistence(t, encoding) p, closer := newTestPersistence(t, encoding)
defer closer.Close() defer closer.Close()
@ -843,7 +843,7 @@ type incrementalBatch struct {
expectedLpToFps index.LabelPairFingerprintsMapping expectedLpToFps index.LabelPairFingerprintsMapping
} }
func testIndexing(t *testing.T, encoding chunkEncoding) { func testIndexing(t *testing.T, encoding ChunkEncoding) {
batches := []incrementalBatch{ batches := []incrementalBatch{
{ {
fpToMetric: index.FingerprintMetricMapping{ fpToMetric: index.FingerprintMetricMapping{

View file

@ -138,7 +138,7 @@ func (sm *seriesMap) fpIter() <-chan model.Fingerprint {
type memorySeries struct { type memorySeries struct {
metric model.Metric metric model.Metric
// Sorted by start time, overlapping chunk ranges are forbidden. // Sorted by start time, overlapping chunk ranges are forbidden.
chunkDescs []*chunkDesc chunkDescs []*ChunkDesc
// The index (within chunkDescs above) of the first chunkDesc that // The index (within chunkDescs above) of the first chunkDesc that
// points to a non-persisted chunk. If all chunks are persisted, then // points to a non-persisted chunk. If all chunks are persisted, then
// persistWatermark == len(chunkDescs). // persistWatermark == len(chunkDescs).
@ -191,7 +191,7 @@ type memorySeries struct {
// set to model.Earliest. The zero value for modTime can be used if the // set to model.Earliest. The zero value for modTime can be used if the
// modification time of the series file is unknown (e.g. if this is a genuinely // modification time of the series file is unknown (e.g. if this is a genuinely
// new series). // new series).
func newMemorySeries(m model.Metric, chunkDescs []*chunkDesc, modTime time.Time) (*memorySeries, error) { func newMemorySeries(m model.Metric, chunkDescs []*ChunkDesc, modTime time.Time) (*memorySeries, error) {
var err error var err error
firstTime := model.Earliest firstTime := model.Earliest
lastTime := model.Earliest lastTime := model.Earliest
@ -216,9 +216,9 @@ func newMemorySeries(m model.Metric, chunkDescs []*chunkDesc, modTime time.Time)
// completed chunks (which are now eligible for persistence). // completed chunks (which are now eligible for persistence).
// //
// The caller must have locked the fingerprint of the series. // The caller must have locked the fingerprint of the series.
func (s *memorySeries) add(v model.SamplePair) (int, error) { func (s *memorySeries) Add(v model.SamplePair) (int, error) {
if len(s.chunkDescs) == 0 || s.headChunkClosed { if len(s.chunkDescs) == 0 || s.headChunkClosed {
newHead := newChunkDesc(newChunk(), v.Timestamp) newHead := NewChunkDesc(NewChunk(), v.Timestamp)
s.chunkDescs = append(s.chunkDescs, newHead) s.chunkDescs = append(s.chunkDescs, newHead)
s.headChunkClosed = false s.headChunkClosed = false
} else if s.headChunkUsedByIterator && s.head().refCount() > 1 { } else if s.headChunkUsedByIterator && s.head().refCount() > 1 {
@ -234,18 +234,18 @@ func (s *memorySeries) add(v model.SamplePair) (int, error) {
chunkOps.WithLabelValues(clone).Inc() chunkOps.WithLabelValues(clone).Inc()
// No locking needed here because a non-persisted head chunk can // No locking needed here because a non-persisted head chunk can
// not get evicted concurrently. // not get evicted concurrently.
s.head().c = s.head().c.clone() s.head().c = s.head().c.Clone()
s.headChunkUsedByIterator = false s.headChunkUsedByIterator = false
} }
chunks, err := s.head().add(v) chunks, err := s.head().Add(v)
if err != nil { if err != nil {
return 0, err return 0, err
} }
s.head().c = chunks[0] s.head().c = chunks[0]
for _, c := range chunks[1:] { for _, c := range chunks[1:] {
s.chunkDescs = append(s.chunkDescs, newChunkDesc(c, c.firstTime())) s.chunkDescs = append(s.chunkDescs, NewChunkDesc(c, c.FirstTime()))
} }
// Populate lastTime of now-closed chunks. // Populate lastTime of now-closed chunks.
@ -285,14 +285,14 @@ func (s *memorySeries) maybeCloseHeadChunk() bool {
func (s *memorySeries) evictChunkDescs(iOldestNotEvicted int) { func (s *memorySeries) evictChunkDescs(iOldestNotEvicted int) {
lenToKeep := chunkDescEvictionFactor * (len(s.chunkDescs) - iOldestNotEvicted) lenToKeep := chunkDescEvictionFactor * (len(s.chunkDescs) - iOldestNotEvicted)
if lenToKeep < len(s.chunkDescs) { if lenToKeep < len(s.chunkDescs) {
s.savedFirstTime = s.firstTime() s.savedFirstTime = s.FirstTime()
lenEvicted := len(s.chunkDescs) - lenToKeep lenEvicted := len(s.chunkDescs) - lenToKeep
s.chunkDescsOffset += lenEvicted s.chunkDescsOffset += lenEvicted
s.persistWatermark -= lenEvicted s.persistWatermark -= lenEvicted
chunkDescOps.WithLabelValues(evict).Add(float64(lenEvicted)) chunkDescOps.WithLabelValues(evict).Add(float64(lenEvicted))
numMemChunkDescs.Sub(float64(lenEvicted)) numMemChunkDescs.Sub(float64(lenEvicted))
s.chunkDescs = append( s.chunkDescs = append(
make([]*chunkDesc, 0, lenToKeep), make([]*ChunkDesc, 0, lenToKeep),
s.chunkDescs[lenEvicted:]..., s.chunkDescs[lenEvicted:]...,
) )
s.dirty = true s.dirty = true
@ -322,7 +322,7 @@ func (s *memorySeries) dropChunks(t model.Time) error {
return nil return nil
} }
s.chunkDescs = append( s.chunkDescs = append(
make([]*chunkDesc, 0, len(s.chunkDescs)-keepIdx), make([]*ChunkDesc, 0, len(s.chunkDescs)-keepIdx),
s.chunkDescs[keepIdx:]..., s.chunkDescs[keepIdx:]...,
) )
s.persistWatermark -= keepIdx s.persistWatermark -= keepIdx
@ -342,7 +342,7 @@ func (s *memorySeries) preloadChunks(
indexes []int, fp model.Fingerprint, mss *MemorySeriesStorage, indexes []int, fp model.Fingerprint, mss *MemorySeriesStorage,
) (SeriesIterator, error) { ) (SeriesIterator, error) {
loadIndexes := []int{} loadIndexes := []int{}
pinnedChunkDescs := make([]*chunkDesc, 0, len(indexes)) pinnedChunkDescs := make([]*ChunkDesc, 0, len(indexes))
for _, idx := range indexes { for _, idx := range indexes {
cd := s.chunkDescs[idx] cd := s.chunkDescs[idx]
pinnedChunkDescs = append(pinnedChunkDescs, cd) pinnedChunkDescs = append(pinnedChunkDescs, cd)
@ -380,23 +380,23 @@ func (s *memorySeries) preloadChunks(
} }
iter := &boundedIterator{ iter := &boundedIterator{
it: s.newIterator(pinnedChunkDescs, curriedQuarantineSeries, mss.evictRequests), it: s.NewIterator(pinnedChunkDescs, curriedQuarantineSeries, mss.evictRequests),
start: model.Now().Add(-mss.dropAfter), start: model.Now().Add(-mss.dropAfter),
} }
return iter, nil return iter, nil
} }
// newIterator returns a new SeriesIterator for the provided chunkDescs (which // NewIterator( returns a new SeriesIterator for the provided chunkDescs (which
// must be pinned). // must be pinned).
// //
// The caller must have locked the fingerprint of the memorySeries. // The caller must have locked the fingerprint of the memorySeries.
func (s *memorySeries) newIterator( func (s *memorySeries) NewIterator(
pinnedChunkDescs []*chunkDesc, pinnedChunkDescs []*ChunkDesc,
quarantine func(error), quarantine func(error),
evictRequests chan<- evictRequest, evictRequests chan<- evictRequest,
) SeriesIterator { ) SeriesIterator {
chunks := make([]chunk, 0, len(pinnedChunkDescs)) chunks := make([]Chunk, 0, len(pinnedChunkDescs))
for _, cd := range pinnedChunkDescs { for _, cd := range pinnedChunkDescs {
// It's OK to directly access cd.c here (without locking) as the // It's OK to directly access cd.c here (without locking) as the
// series FP is locked and the chunk is pinned. // series FP is locked and the chunk is pinned.
@ -404,7 +404,7 @@ func (s *memorySeries) newIterator(
} }
return &memorySeriesIterator{ return &memorySeriesIterator{
chunks: chunks, chunks: chunks,
chunkIts: make([]chunkIterator, len(chunks)), chunkIts: make([]ChunkIterator, len(chunks)),
quarantine: quarantine, quarantine: quarantine,
metric: s.metric, metric: s.metric,
pinnedChunkDescs: pinnedChunkDescs, pinnedChunkDescs: pinnedChunkDescs,
@ -504,14 +504,14 @@ func (s *memorySeries) preloadChunksForRange(
// head returns a pointer to the head chunk descriptor. The caller must have // head returns a pointer to the head chunk descriptor. The caller must have
// locked the fingerprint of the memorySeries. This method will panic if this // locked the fingerprint of the memorySeries. This method will panic if this
// series has no chunk descriptors. // series has no chunk descriptors.
func (s *memorySeries) head() *chunkDesc { func (s *memorySeries) head() *ChunkDesc {
return s.chunkDescs[len(s.chunkDescs)-1] return s.chunkDescs[len(s.chunkDescs)-1]
} }
// firstTime returns the timestamp of the first sample in the series. // firstTime returns the timestamp of the first sample in the series.
// //
// The caller must have locked the fingerprint of the memorySeries. // The caller must have locked the fingerprint of the memorySeries.
func (s *memorySeries) firstTime() model.Time { func (s *memorySeries) FirstTime() model.Time {
if s.chunkDescsOffset == 0 && len(s.chunkDescs) > 0 { if s.chunkDescsOffset == 0 && len(s.chunkDescs) > 0 {
return s.chunkDescs[0].firstTime() return s.chunkDescs[0].firstTime()
} }
@ -541,7 +541,7 @@ func (s *memorySeries) lastSamplePair() model.SamplePair {
// accordingly. // accordingly.
// //
// The caller must have locked the fingerprint of the series. // The caller must have locked the fingerprint of the series.
func (s *memorySeries) chunksToPersist() []*chunkDesc { func (s *memorySeries) chunksToPersist() []*ChunkDesc {
newWatermark := len(s.chunkDescs) newWatermark := len(s.chunkDescs)
if !s.headChunkClosed { if !s.headChunkClosed {
newWatermark-- newWatermark--
@ -558,17 +558,17 @@ func (s *memorySeries) chunksToPersist() []*chunkDesc {
// memorySeriesIterator implements SeriesIterator. // memorySeriesIterator implements SeriesIterator.
type memorySeriesIterator struct { type memorySeriesIterator struct {
// Last chunkIterator used by ValueAtOrBeforeTime. // Last chunkIterator used by ValueAtOrBeforeTime.
chunkIt chunkIterator chunkIt ChunkIterator
// Caches chunkIterators. // Caches chunkIterators.
chunkIts []chunkIterator chunkIts []ChunkIterator
// The actual sample chunks. // The actual sample chunks.
chunks []chunk chunks []Chunk
// Call to quarantine the series this iterator belongs to. // Call to quarantine the series this iterator belongs to.
quarantine func(error) quarantine func(error)
// The metric corresponding to the iterator. // The metric corresponding to the iterator.
metric model.Metric metric model.Metric
// Chunks that were pinned for this iterator. // Chunks that were pinned for this iterator.
pinnedChunkDescs []*chunkDesc pinnedChunkDescs []*ChunkDesc
// Where to send evict requests when unpinning pinned chunks. // Where to send evict requests when unpinning pinned chunks.
evictRequests chan<- evictRequest evictRequests chan<- evictRequest
} }
@ -577,17 +577,17 @@ type memorySeriesIterator struct {
func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair { func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair {
// The most common case. We are iterating through a chunk. // The most common case. We are iterating through a chunk.
if it.chunkIt != nil { if it.chunkIt != nil {
containsT, err := it.chunkIt.contains(t) containsT, err := it.chunkIt.Contains(t)
if err != nil { if err != nil {
it.quarantine(err) it.quarantine(err)
return ZeroSamplePair return ZeroSamplePair
} }
if containsT { if containsT {
if it.chunkIt.findAtOrBefore(t) { if it.chunkIt.FindAtOrBefore(t) {
return it.chunkIt.value() return it.chunkIt.Value()
} }
if it.chunkIt.err() != nil { if it.chunkIt.Err() != nil {
it.quarantine(it.chunkIt.err()) it.quarantine(it.chunkIt.Err())
} }
return ZeroSamplePair return ZeroSamplePair
} }
@ -597,21 +597,21 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa
return ZeroSamplePair return ZeroSamplePair
} }
// Find the last chunk where firstTime() is before or equal to t. // Find the last chunk where FirstTime() is before or equal to t.
l := len(it.chunks) - 1 l := len(it.chunks) - 1
i := sort.Search(len(it.chunks), func(i int) bool { i := sort.Search(len(it.chunks), func(i int) bool {
return !it.chunks[l-i].firstTime().After(t) return !it.chunks[l-i].FirstTime().After(t)
}) })
if i == len(it.chunks) { if i == len(it.chunks) {
// Even the first chunk starts after t. // Even the first chunk starts after t.
return ZeroSamplePair return ZeroSamplePair
} }
it.chunkIt = it.chunkIterator(l - i) it.chunkIt = it.chunkIterator(l - i)
if it.chunkIt.findAtOrBefore(t) { if it.chunkIt.FindAtOrBefore(t) {
return it.chunkIt.value() return it.chunkIt.Value()
} }
if it.chunkIt.err() != nil { if it.chunkIt.Err() != nil {
it.quarantine(it.chunkIt.err()) it.quarantine(it.chunkIt.Err())
} }
return ZeroSamplePair return ZeroSamplePair
} }
@ -620,12 +620,12 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa
func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePair { func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePair {
// Find the first chunk for which the first sample is within the interval. // Find the first chunk for which the first sample is within the interval.
i := sort.Search(len(it.chunks), func(i int) bool { i := sort.Search(len(it.chunks), func(i int) bool {
return !it.chunks[i].firstTime().Before(in.OldestInclusive) return !it.chunks[i].FirstTime().Before(in.OldestInclusive)
}) })
// Only now check the last timestamp of the previous chunk (which is // Only now check the last timestamp of the previous chunk (which is
// fairly expensive). // fairly expensive).
if i > 0 { if i > 0 {
lt, err := it.chunkIterator(i - 1).lastTimestamp() lt, err := it.chunkIterator(i - 1).LastTimestamp()
if err != nil { if err != nil {
it.quarantine(err) it.quarantine(err)
return nil return nil
@ -637,7 +637,7 @@ func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePa
values := []model.SamplePair{} values := []model.SamplePair{}
for j, c := range it.chunks[i:] { for j, c := range it.chunks[i:] {
if c.firstTime().After(in.NewestInclusive) { if c.FirstTime().After(in.NewestInclusive) {
break break
} }
chValues, err := rangeValues(it.chunkIterator(i+j), in) chValues, err := rangeValues(it.chunkIterator(i+j), in)
@ -656,10 +656,10 @@ func (it *memorySeriesIterator) Metric() metric.Metric {
// chunkIterator returns the chunkIterator for the chunk at position i (and // chunkIterator returns the chunkIterator for the chunk at position i (and
// creates it if needed). // creates it if needed).
func (it *memorySeriesIterator) chunkIterator(i int) chunkIterator { func (it *memorySeriesIterator) chunkIterator(i int) ChunkIterator {
chunkIt := it.chunkIts[i] chunkIt := it.chunkIts[i]
if chunkIt == nil { if chunkIt == nil {
chunkIt = it.chunks[i].newIterator() chunkIt = it.chunks[i].NewIterator()
it.chunkIts[i] = chunkIt it.chunkIts[i] = chunkIt
} }
return chunkIt return chunkIt

View file

@ -26,11 +26,11 @@ func TestDropChunks(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
s.add(model.SamplePair{ s.Add(model.SamplePair{
Timestamp: 100, Timestamp: 100,
Value: 42, Value: 42,
}) })
s.add(model.SamplePair{ s.Add(model.SamplePair{
Timestamp: 110, Timestamp: 110,
Value: 4711, Value: 4711,
}) })

View file

@ -90,7 +90,7 @@ var (
) )
type evictRequest struct { type evictRequest struct {
cd *chunkDesc cd *ChunkDesc
evict bool evict bool
} }
@ -662,7 +662,7 @@ func (s *MemorySeriesStorage) metricForRange(
) (model.Metric, *memorySeries, bool) { ) (model.Metric, *memorySeries, bool) {
series, ok := s.fpToSeries.get(fp) series, ok := s.fpToSeries.get(fp)
if ok { if ok {
if series.lastTime.Before(from) || series.firstTime().After(through) { if series.lastTime.Before(from) || series.FirstTime().After(through) {
return nil, nil, false return nil, nil, false
} }
return series.metric, series, true return series.metric, series, true
@ -762,7 +762,7 @@ func (s *MemorySeriesStorage) Append(sample *model.Sample) error {
s.discardedSamplesCount.WithLabelValues(outOfOrderTimestamp).Inc() s.discardedSamplesCount.WithLabelValues(outOfOrderTimestamp).Inc()
return ErrOutOfOrderSample // Caused by the caller. return ErrOutOfOrderSample // Caused by the caller.
} }
completedChunksCount, err := series.add(model.SamplePair{ completedChunksCount, err := series.Add(model.SamplePair{
Value: sample.Value, Value: sample.Value,
Timestamp: sample.Timestamp, Timestamp: sample.Timestamp,
}) })
@ -833,7 +833,7 @@ func (s *MemorySeriesStorage) logThrottling() {
func (s *MemorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) (*memorySeries, error) { func (s *MemorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) (*memorySeries, error) {
series, ok := s.fpToSeries.get(fp) series, ok := s.fpToSeries.get(fp)
if !ok { if !ok {
var cds []*chunkDesc var cds []*ChunkDesc
var modTime time.Time var modTime time.Time
unarchived, err := s.persistence.unarchiveMetric(fp) unarchived, err := s.persistence.unarchiveMetric(fp)
if err != nil { if err != nil {
@ -975,13 +975,13 @@ func (s *MemorySeriesStorage) maybeEvict() {
if numChunksToEvict <= 0 { if numChunksToEvict <= 0 {
return return
} }
chunkDescsToEvict := make([]*chunkDesc, numChunksToEvict) chunkDescsToEvict := make([]*ChunkDesc, numChunksToEvict)
for i := range chunkDescsToEvict { for i := range chunkDescsToEvict {
e := s.evictList.Front() e := s.evictList.Front()
if e == nil { if e == nil {
break break
} }
cd := e.Value.(*chunkDesc) cd := e.Value.(*ChunkDesc)
cd.evictListElement = nil cd.evictListElement = nil
chunkDescsToEvict[i] = cd chunkDescsToEvict[i] = cd
s.evictList.Remove(e) s.evictList.Remove(e)
@ -1269,7 +1269,7 @@ func (s *MemorySeriesStorage) maintainMemorySeries(
if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > headChunkTimeout { if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > headChunkTimeout {
s.fpToSeries.del(fp) s.fpToSeries.del(fp)
s.numSeries.Dec() s.numSeries.Dec()
s.persistence.archiveMetric(fp, series.metric, series.firstTime(), series.lastTime) s.persistence.archiveMetric(fp, series.metric, series.FirstTime(), series.lastTime)
s.seriesOps.WithLabelValues(archive).Inc() s.seriesOps.WithLabelValues(archive).Inc()
oldWatermark := atomic.LoadInt64((*int64)(&s.archiveHighWatermark)) oldWatermark := atomic.LoadInt64((*int64)(&s.archiveHighWatermark))
if oldWatermark < int64(series.lastTime) { if oldWatermark < int64(series.lastTime) {
@ -1325,12 +1325,12 @@ func (s *MemorySeriesStorage) writeMemorySeries(
// Get the actual chunks from underneath the chunkDescs. // Get the actual chunks from underneath the chunkDescs.
// No lock required as chunks still to persist cannot be evicted. // No lock required as chunks still to persist cannot be evicted.
chunks := make([]chunk, len(cds)) chunks := make([]Chunk, len(cds))
for i, cd := range cds { for i, cd := range cds {
chunks[i] = cd.c chunks[i] = cd.c
} }
if !series.firstTime().Before(beforeTime) { if !series.FirstTime().Before(beforeTime) {
// Oldest sample not old enough, just append chunks, if any. // Oldest sample not old enough, just append chunks, if any.
if len(cds) == 0 { if len(cds) == 0 {
return false return false
@ -1413,12 +1413,12 @@ func (s *MemorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, befor
} }
// See persistence.loadChunks for detailed explanation. // See persistence.loadChunks for detailed explanation.
func (s *MemorySeriesStorage) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]chunk, error) { func (s *MemorySeriesStorage) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]Chunk, error) {
return s.persistence.loadChunks(fp, indexes, indexOffset) return s.persistence.loadChunks(fp, indexes, indexOffset)
} }
// See persistence.loadChunkDescs for detailed explanation. // See persistence.loadChunkDescs for detailed explanation.
func (s *MemorySeriesStorage) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*chunkDesc, error) { func (s *MemorySeriesStorage) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*ChunkDesc, error) {
return s.persistence.loadChunkDescs(fp, offsetFromEnd) return s.persistence.loadChunkDescs(fp, offsetFromEnd)
} }

View file

@ -68,7 +68,7 @@ func TestMatches(t *testing.T) {
t.Fatal("could not retrieve series for fp", fp) t.Fatal("could not retrieve series for fp", fp)
} }
storage.fpLocker.Lock(fp) storage.fpLocker.Lock(fp)
storage.persistence.archiveMetric(fp, s.metric, s.firstTime(), s.lastTime) storage.persistence.archiveMetric(fp, s.metric, s.FirstTime(), s.lastTime)
storage.fpLocker.Unlock(fp) storage.fpLocker.Unlock(fp)
} }
@ -785,7 +785,7 @@ func TestLoop(t *testing.T) {
} }
} }
func testChunk(t *testing.T, encoding chunkEncoding) { func testChunk(t *testing.T, encoding ChunkEncoding) {
samples := make(model.Samples, 500000) samples := make(model.Samples, 500000)
for i := range samples { for i := range samples {
samples[i] = &model.Sample{ samples[i] = &model.Sample{
@ -809,12 +809,12 @@ func testChunk(t *testing.T, encoding chunkEncoding) {
if cd.isEvicted() { if cd.isEvicted() {
continue continue
} }
it := cd.c.newIterator() it := cd.c.NewIterator()
for it.scan() { for it.Scan() {
values = append(values, it.value()) values = append(values, it.Value())
} }
if it.err() != nil { if it.Err() != nil {
t.Error(it.err()) t.Error(it.Err())
} }
} }
@ -843,7 +843,7 @@ func TestChunkType2(t *testing.T) {
testChunk(t, 2) testChunk(t, 2)
} }
func testValueAtOrBeforeTime(t *testing.T, encoding chunkEncoding) { func testValueAtOrBeforeTime(t *testing.T, encoding ChunkEncoding) {
samples := make(model.Samples, 10000) samples := make(model.Samples, 10000)
for i := range samples { for i := range samples {
samples[i] = &model.Sample{ samples[i] = &model.Sample{
@ -921,7 +921,7 @@ func TestValueAtTimeChunkType2(t *testing.T) {
testValueAtOrBeforeTime(t, 2) testValueAtOrBeforeTime(t, 2)
} }
func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunkEncoding) { func benchmarkValueAtOrBeforeTime(b *testing.B, encoding ChunkEncoding) {
samples := make(model.Samples, 10000) samples := make(model.Samples, 10000)
for i := range samples { for i := range samples {
samples[i] = &model.Sample{ samples[i] = &model.Sample{
@ -1003,7 +1003,7 @@ func BenchmarkValueAtTimeChunkType2(b *testing.B) {
benchmarkValueAtOrBeforeTime(b, 2) benchmarkValueAtOrBeforeTime(b, 2)
} }
func testRangeValues(t *testing.T, encoding chunkEncoding) { func testRangeValues(t *testing.T, encoding ChunkEncoding) {
samples := make(model.Samples, 10000) samples := make(model.Samples, 10000)
for i := range samples { for i := range samples {
samples[i] = &model.Sample{ samples[i] = &model.Sample{
@ -1159,7 +1159,7 @@ func TestRangeValuesChunkType2(t *testing.T) {
testRangeValues(t, 2) testRangeValues(t, 2)
} }
func benchmarkRangeValues(b *testing.B, encoding chunkEncoding) { func benchmarkRangeValues(b *testing.B, encoding ChunkEncoding) {
samples := make(model.Samples, 10000) samples := make(model.Samples, 10000)
for i := range samples { for i := range samples {
samples[i] = &model.Sample{ samples[i] = &model.Sample{
@ -1207,7 +1207,7 @@ func BenchmarkRangeValuesChunkType2(b *testing.B) {
benchmarkRangeValues(b, 2) benchmarkRangeValues(b, 2)
} }
func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { func testEvictAndPurgeSeries(t *testing.T, encoding ChunkEncoding) {
samples := make(model.Samples, 10000) samples := make(model.Samples, 10000)
for i := range samples { for i := range samples {
samples[i] = &model.Sample{ samples[i] = &model.Sample{
@ -1275,7 +1275,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
s.persistence.archiveMetric(fp, series.metric, series.firstTime(), lastTime) s.persistence.archiveMetric(fp, series.metric, series.FirstTime(), lastTime)
archived, _, _ := s.persistence.hasArchivedMetric(fp) archived, _, _ := s.persistence.hasArchivedMetric(fp)
if !archived { if !archived {
t.Fatal("not archived") t.Fatal("not archived")
@ -1316,7 +1316,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
s.persistence.archiveMetric(fp, series.metric, series.firstTime(), lastTime) s.persistence.archiveMetric(fp, series.metric, series.FirstTime(), lastTime)
archived, _, _ = s.persistence.hasArchivedMetric(fp) archived, _, _ = s.persistence.hasArchivedMetric(fp)
if !archived { if !archived {
t.Fatal("not archived") t.Fatal("not archived")
@ -1362,7 +1362,7 @@ func TestEvictAndPurgeSeriesChunkType2(t *testing.T) {
testEvictAndPurgeSeries(t, 2) testEvictAndPurgeSeries(t, 2)
} }
func testEvictAndLoadChunkDescs(t *testing.T, encoding chunkEncoding) { func testEvictAndLoadChunkDescs(t *testing.T, encoding ChunkEncoding) {
samples := make(model.Samples, 10000) samples := make(model.Samples, 10000)
for i := range samples { for i := range samples {
samples[i] = &model.Sample{ samples[i] = &model.Sample{
@ -1433,7 +1433,7 @@ func TestEvictAndLoadChunkDescsType1(t *testing.T) {
testEvictAndLoadChunkDescs(t, 1) testEvictAndLoadChunkDescs(t, 1)
} }
func benchmarkAppend(b *testing.B, encoding chunkEncoding) { func benchmarkAppend(b *testing.B, encoding ChunkEncoding) {
samples := make(model.Samples, b.N) samples := make(model.Samples, b.N)
for i := range samples { for i := range samples {
samples[i] = &model.Sample{ samples[i] = &model.Sample{
@ -1469,7 +1469,7 @@ func BenchmarkAppendType2(b *testing.B) {
// Append a large number of random samples and then check if we can get them out // Append a large number of random samples and then check if we can get them out
// of the storage alright. // of the storage alright.
func testFuzz(t *testing.T, encoding chunkEncoding) { func testFuzz(t *testing.T, encoding ChunkEncoding) {
if testing.Short() { if testing.Short() {
t.Skip("Skipping test in short mode.") t.Skip("Skipping test in short mode.")
} }
@ -1517,7 +1517,7 @@ func TestFuzzChunkType2(t *testing.T) {
// make things even slower): // make things even slower):
// //
// go test -race -cpu 8 -short -bench BenchmarkFuzzChunkType // go test -race -cpu 8 -short -bench BenchmarkFuzzChunkType
func benchmarkFuzz(b *testing.B, encoding chunkEncoding) { func benchmarkFuzz(b *testing.B, encoding ChunkEncoding) {
DefaultChunkEncoding = encoding DefaultChunkEncoding = encoding
const samplesPerRun = 100000 const samplesPerRun = 100000
rand.Seed(42) rand.Seed(42)

View file

@ -40,7 +40,7 @@ func (t *testStorageCloser) Close() {
// NewTestStorage creates a storage instance backed by files in a temporary // NewTestStorage creates a storage instance backed by files in a temporary
// directory. The returned storage is already in serving state. Upon closing the // directory. The returned storage is already in serving state. Upon closing the
// returned test.Closer, the temporary directory is cleaned up. // returned test.Closer, the temporary directory is cleaned up.
func NewTestStorage(t testutil.T, encoding chunkEncoding) (*MemorySeriesStorage, testutil.Closer) { func NewTestStorage(t testutil.T, encoding ChunkEncoding) (*MemorySeriesStorage, testutil.Closer) {
DefaultChunkEncoding = encoding DefaultChunkEncoding = encoding
directory := testutil.NewTemporaryDirectory("test_storage", t) directory := testutil.NewTemporaryDirectory("test_storage", t)
o := &MemorySeriesStorageOptions{ o := &MemorySeriesStorageOptions{

View file

@ -257,7 +257,7 @@ func newVarbitChunk(enc varbitValueEncoding) *varbitChunk {
} }
// add implements chunk. // add implements chunk.
func (c *varbitChunk) add(s model.SamplePair) ([]chunk, error) { func (c *varbitChunk) Add(s model.SamplePair) ([]Chunk, error) {
offset := c.nextSampleOffset() offset := c.nextSampleOffset()
switch { switch {
case c.closed(): case c.closed():
@ -273,19 +273,19 @@ func (c *varbitChunk) add(s model.SamplePair) ([]chunk, error) {
} }
// clone implements chunk. // clone implements chunk.
func (c varbitChunk) clone() chunk { func (c varbitChunk) Clone() Chunk {
clone := make(varbitChunk, len(c)) clone := make(varbitChunk, len(c))
copy(clone, c) copy(clone, c)
return &clone return &clone
} }
// newIterator implements chunk. // NewIterator implements chunk.
func (c varbitChunk) newIterator() chunkIterator { func (c varbitChunk) NewIterator() ChunkIterator {
return newVarbitChunkIterator(c) return newVarbitChunkIterator(c)
} }
// marshal implements chunk. // marshal implements chunk.
func (c varbitChunk) marshal(w io.Writer) error { func (c varbitChunk) Marshal(w io.Writer) error {
n, err := w.Write(c) n, err := w.Write(c)
if err != nil { if err != nil {
return err return err
@ -297,7 +297,7 @@ func (c varbitChunk) marshal(w io.Writer) error {
} }
// marshalToBuf implements chunk. // marshalToBuf implements chunk.
func (c varbitChunk) marshalToBuf(buf []byte) error { func (c varbitChunk) MarshalToBuf(buf []byte) error {
n := copy(buf, c) n := copy(buf, c)
if n != len(c) { if n != len(c) {
return fmt.Errorf("wanted to copy %d bytes to buffer, copied %d", len(c), n) return fmt.Errorf("wanted to copy %d bytes to buffer, copied %d", len(c), n)
@ -306,13 +306,13 @@ func (c varbitChunk) marshalToBuf(buf []byte) error {
} }
// unmarshal implements chunk. // unmarshal implements chunk.
func (c varbitChunk) unmarshal(r io.Reader) error { func (c varbitChunk) Unmarshal(r io.Reader) error {
_, err := io.ReadFull(r, c) _, err := io.ReadFull(r, c)
return err return err
} }
// unmarshalFromBuf implements chunk. // unmarshalFromBuf implements chunk.
func (c varbitChunk) unmarshalFromBuf(buf []byte) error { func (c varbitChunk) UnmarshalFromBuf(buf []byte) error {
if copied := copy(c, buf); copied != cap(c) { if copied := copy(c, buf); copied != cap(c) {
return fmt.Errorf("insufficient bytes copied from buffer during unmarshaling, want %d, got %d", cap(c), copied) return fmt.Errorf("insufficient bytes copied from buffer during unmarshaling, want %d, got %d", cap(c), copied)
} }
@ -320,10 +320,10 @@ func (c varbitChunk) unmarshalFromBuf(buf []byte) error {
} }
// encoding implements chunk. // encoding implements chunk.
func (c varbitChunk) encoding() chunkEncoding { return varbit } func (c varbitChunk) Encoding() ChunkEncoding { return Varbit }
// firstTime implements chunk. // firstTime implements chunk.
func (c varbitChunk) firstTime() model.Time { func (c varbitChunk) FirstTime() model.Time {
return model.Time( return model.Time(
binary.BigEndian.Uint64( binary.BigEndian.Uint64(
c[varbitFirstTimeOffset:], c[varbitFirstTimeOffset:],
@ -472,7 +472,7 @@ func (c varbitChunk) setLastSample(s model.SamplePair) {
// addFirstSample is a helper method only used by c.add(). It adds timestamp and // addFirstSample is a helper method only used by c.add(). It adds timestamp and
// value as base time and value. // value as base time and value.
func (c *varbitChunk) addFirstSample(s model.SamplePair) []chunk { func (c *varbitChunk) addFirstSample(s model.SamplePair) []Chunk {
binary.BigEndian.PutUint64( binary.BigEndian.PutUint64(
(*c)[varbitFirstTimeOffset:], (*c)[varbitFirstTimeOffset:],
uint64(s.Timestamp), uint64(s.Timestamp),
@ -483,14 +483,14 @@ func (c *varbitChunk) addFirstSample(s model.SamplePair) []chunk {
) )
c.setLastSample(s) // To simplify handling of single-sample chunks. c.setLastSample(s) // To simplify handling of single-sample chunks.
c.setNextSampleOffset(varbitSecondSampleBitOffset) c.setNextSampleOffset(varbitSecondSampleBitOffset)
return []chunk{c} return []Chunk{c}
} }
// addSecondSample is a helper method only used by c.add(). It calculates the // addSecondSample is a helper method only used by c.add(). It calculates the
// first time delta from the provided sample and adds it to the chunk together // first time delta from the provided sample and adds it to the chunk together
// with the provided sample as the last sample. // with the provided sample as the last sample.
func (c *varbitChunk) addSecondSample(s model.SamplePair) ([]chunk, error) { func (c *varbitChunk) addSecondSample(s model.SamplePair) ([]Chunk, error) {
firstTimeDelta := s.Timestamp - c.firstTime() firstTimeDelta := s.Timestamp - c.FirstTime()
if firstTimeDelta < 0 { if firstTimeDelta < 0 {
return nil, fmt.Errorf("first Δt is less than zero: %v", firstTimeDelta) return nil, fmt.Errorf("first Δt is less than zero: %v", firstTimeDelta)
} }
@ -509,7 +509,7 @@ func (c *varbitChunk) addSecondSample(s model.SamplePair) ([]chunk, error) {
c.setLastSample(s) c.setLastSample(s)
c.setNextSampleOffset(varbitThirdSampleBitOffset) c.setNextSampleOffset(varbitThirdSampleBitOffset)
return []chunk{c}, nil return []Chunk{c}, nil
} }
// addLastSample isa a helper method only used by c.add() and in other helper // addLastSample isa a helper method only used by c.add() and in other helper
@ -518,15 +518,15 @@ func (c *varbitChunk) addSecondSample(s model.SamplePair) ([]chunk, error) {
// adds the very last sample added to this chunk ever, while setLastSample sets // adds the very last sample added to this chunk ever, while setLastSample sets
// the sample most recently added to the chunk so that it can be used for the // the sample most recently added to the chunk so that it can be used for the
// calculations required to add the next sample. // calculations required to add the next sample.
func (c *varbitChunk) addLastSample(s model.SamplePair) []chunk { func (c *varbitChunk) addLastSample(s model.SamplePair) []Chunk {
c.setLastSample(s) c.setLastSample(s)
(*c)[varbitFlagOffset] |= 0x80 (*c)[varbitFlagOffset] |= 0x80
return []chunk{c} return []Chunk{c}
} }
// addLaterSample is a helper method only used by c.add(). It adds a third or // addLaterSample is a helper method only used by c.add(). It adds a third or
// later sample. // later sample.
func (c *varbitChunk) addLaterSample(s model.SamplePair, offset uint16) ([]chunk, error) { func (c *varbitChunk) addLaterSample(s model.SamplePair, offset uint16) ([]Chunk, error) {
var ( var (
lastTime = c.lastTime() lastTime = c.lastTime()
lastTimeDelta = c.lastTimeDelta() lastTimeDelta = c.lastTimeDelta()
@ -593,7 +593,7 @@ func (c *varbitChunk) addLaterSample(s model.SamplePair, offset uint16) ([]chunk
c.setNextSampleOffset(offset) c.setNextSampleOffset(offset)
c.setLastSample(s) c.setLastSample(s)
return []chunk{c}, nil return []Chunk{c}, nil
} }
func (c varbitChunk) prepForThirdSample( func (c varbitChunk) prepForThirdSample(
@ -904,7 +904,7 @@ func newVarbitChunkIterator(c varbitChunk) *varbitChunkIterator {
} }
// lastTimestamp implements chunkIterator. // lastTimestamp implements chunkIterator.
func (it *varbitChunkIterator) lastTimestamp() (model.Time, error) { func (it *varbitChunkIterator) LastTimestamp() (model.Time, error) {
if it.len == varbitFirstSampleBitOffset { if it.len == varbitFirstSampleBitOffset {
// No samples in the chunk yet. // No samples in the chunk yet.
return model.Earliest, it.lastError return model.Earliest, it.lastError
@ -913,18 +913,18 @@ func (it *varbitChunkIterator) lastTimestamp() (model.Time, error) {
} }
// contains implements chunkIterator. // contains implements chunkIterator.
func (it *varbitChunkIterator) contains(t model.Time) (bool, error) { func (it *varbitChunkIterator) Contains(t model.Time) (bool, error) {
last, err := it.lastTimestamp() last, err := it.LastTimestamp()
if err != nil { if err != nil {
it.lastError = err it.lastError = err
return false, err return false, err
} }
return !t.Before(it.c.firstTime()) && return !t.Before(it.c.FirstTime()) &&
!t.After(last), it.lastError !t.After(last), it.lastError
} }
// scan implements chunkIterator. // scan implements chunkIterator.
func (it *varbitChunkIterator) scan() bool { func (it *varbitChunkIterator) Scan() bool {
if it.lastError != nil { if it.lastError != nil {
return false return false
} }
@ -947,7 +947,7 @@ func (it *varbitChunkIterator) scan() bool {
return it.lastError == nil return it.lastError == nil
} }
if it.pos == varbitFirstSampleBitOffset { if it.pos == varbitFirstSampleBitOffset {
it.t = it.c.firstTime() it.t = it.c.FirstTime()
it.v = it.c.firstValue() it.v = it.c.firstValue()
it.pos = varbitSecondSampleBitOffset it.pos = varbitSecondSampleBitOffset
return it.lastError == nil return it.lastError == nil
@ -1003,8 +1003,8 @@ func (it *varbitChunkIterator) scan() bool {
} }
// findAtOrBefore implements chunkIterator. // findAtOrBefore implements chunkIterator.
func (it *varbitChunkIterator) findAtOrBefore(t model.Time) bool { func (it *varbitChunkIterator) FindAtOrBefore(t model.Time) bool {
if it.len == 0 || t.Before(it.c.firstTime()) { if it.len == 0 || t.Before(it.c.FirstTime()) {
return false return false
} }
last := it.c.lastTime() last := it.c.lastTime()
@ -1025,7 +1025,7 @@ func (it *varbitChunkIterator) findAtOrBefore(t model.Time) bool {
prevT = model.Earliest prevT = model.Earliest
prevV model.SampleValue prevV model.SampleValue
) )
for it.scan() && t.After(it.t) { for it.Scan() && t.After(it.t) {
prevT = it.t prevT = it.t
prevV = it.v prevV = it.v
// TODO(beorn7): If we are in a repeat, we could iterate forward // TODO(beorn7): If we are in a repeat, we could iterate forward
@ -1039,14 +1039,14 @@ func (it *varbitChunkIterator) findAtOrBefore(t model.Time) bool {
} }
// findAtOrAfter implements chunkIterator. // findAtOrAfter implements chunkIterator.
func (it *varbitChunkIterator) findAtOrAfter(t model.Time) bool { func (it *varbitChunkIterator) FindAtOrAfter(t model.Time) bool {
if it.len == 0 || t.After(it.c.lastTime()) { if it.len == 0 || t.After(it.c.lastTime()) {
return false return false
} }
first := it.c.firstTime() first := it.c.FirstTime()
if !t.After(first) { if !t.After(first) {
it.reset() it.reset()
return it.scan() return it.Scan()
} }
if t == it.t { if t == it.t {
return it.lastError == nil return it.lastError == nil
@ -1054,7 +1054,7 @@ func (it *varbitChunkIterator) findAtOrAfter(t model.Time) bool {
if t.Before(it.t) { if t.Before(it.t) {
it.reset() it.reset()
} }
for it.scan() && t.After(it.t) { for it.Scan() && t.After(it.t) {
// TODO(beorn7): If we are in a repeat, we could iterate forward // TODO(beorn7): If we are in a repeat, we could iterate forward
// much faster. // much faster.
} }
@ -1062,7 +1062,7 @@ func (it *varbitChunkIterator) findAtOrAfter(t model.Time) bool {
} }
// value implements chunkIterator. // value implements chunkIterator.
func (it *varbitChunkIterator) value() model.SamplePair { func (it *varbitChunkIterator) Value() model.SamplePair {
return model.SamplePair{ return model.SamplePair{
Timestamp: it.t, Timestamp: it.t,
Value: it.v, Value: it.v,
@ -1070,7 +1070,7 @@ func (it *varbitChunkIterator) value() model.SamplePair {
} }
// err implements chunkIterator. // err implements chunkIterator.
func (it *varbitChunkIterator) err() error { func (it *varbitChunkIterator) Err() error {
return it.lastError return it.lastError
} }

View file

@ -89,7 +89,7 @@ type Options struct {
GraphitePrefix string GraphitePrefix string
} }
// Run starts the background processing of the storage queues. // Start starts the background processing of the storage queues.
func (s *Storage) Start() { func (s *Storage) Start() {
for _, q := range s.queues { for _, q := range s.queues {
q.Start() q.Start()