Review fixups.

Julius Volz 2016-09-28 23:33:34 +02:00
parent d30a3c7c0f
commit 044ebce779
9 changed files with 113 additions and 113 deletions

View file

@@ -37,7 +37,7 @@ var errChunkBoundsExceeded = errors.New("attempted access outside of chunk bound
 // EvictRequest is a request to evict a chunk from memory.
 type EvictRequest struct {
-	CD    *Desc
+	Desc  *Desc
 	Evict bool
 }
@@ -45,19 +45,19 @@ type EvictRequest struct {
 type Encoding byte

 // String implements flag.Value.
-func (ce Encoding) String() string {
-	return fmt.Sprintf("%d", ce)
+func (e Encoding) String() string {
+	return fmt.Sprintf("%d", e)
 }

 // Set implements flag.Value.
-func (ce *Encoding) Set(s string) error {
+func (e *Encoding) Set(s string) error {
 	switch s {
 	case "0":
-		*ce = Delta
+		*e = Delta
 	case "1":
-		*ce = DoubleDelta
+		*e = DoubleDelta
 	case "2":
-		*ce = Varbit
+		*e = Varbit
 	default:
 		return fmt.Errorf("invalid chunk encoding: %s", s)
 	}
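Since Encoding implements flag.Value, the renamed receivers above are all it takes to keep the type usable with the standard flag package. A minimal, self-contained sketch (the flag name "chunk-encoding" is illustrative, not the flag Prometheus registers):

package main

import (
	"flag"
	"fmt"
)

// Encoding mirrors the type from the hunk above: 0=Delta, 1=DoubleDelta, 2=Varbit.
type Encoding byte

const (
	Delta Encoding = iota
	DoubleDelta
	Varbit
)

// String implements flag.Value.
func (e Encoding) String() string { return fmt.Sprintf("%d", e) }

// Set implements flag.Value.
func (e *Encoding) Set(s string) error {
	switch s {
	case "0":
		*e = Delta
	case "1":
		*e = DoubleDelta
	case "2":
		*e = Varbit
	default:
		return fmt.Errorf("invalid chunk encoding: %s", s)
	}
	return nil
}

func main() {
	var enc Encoding
	// flag.Var accepts any flag.Value; "chunk-encoding" is a made-up flag name.
	flag.Var(&enc, "chunk-encoding", "chunk encoding (0=delta, 1=double-delta, 2=varbit)")
	flag.Parse()
	fmt.Println("using encoding:", enc)
}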
@@ -87,18 +87,18 @@ const (
 // methods involve no locking. They may only be called if the caller has pinned
 // the chunk (to guarantee the chunk is not evicted concurrently). Also, the
 // caller must make sure nobody else will call these methods concurrently,
-// either by holding the sole reference to the ChunkDesc (usually during loading
-// or creation) or by locking the fingerprint of the series the ChunkDesc
+// either by holding the sole reference to the Desc (usually during loading
+// or creation) or by locking the fingerprint of the series the Desc
 // belongs to. The affected methods are: Add, MaybePopulateLastTime, SetChunk.
 //
 // Finally, there are the special cases FirstTime and LastTime. LastTime requires
 // to have locked the fingerprint of the series but the chunk does not need to
-// be pinned. That's because the ChunkLastTime field in ChunkDesc gets populated
+// be pinned. That's because the ChunkLastTime field in Desc gets populated
 // upon completion of the chunk (when it is still pinned, and which happens
 // while the series's fingerprint is locked). Once that has happened, calling
 // LastTime does not require the chunk to be loaded anymore. Before that has
-// happened, the chunk is pinned anyway. The ChunkFirstTime field in ChunkDesc
-// is populated upon creation of a ChunkDesc, so it is alway safe to call
+// happened, the chunk is pinned anyway. The ChunkFirstTime field in Desc
+// is populated upon creation of a Desc, so it is alway safe to call
 // FirstTime. The FirstTime method is arguably not needed and only there for
 // consistency with LastTime.
 type Desc struct {
@@ -109,18 +109,18 @@ type Desc struct {
 	ChunkLastTime model.Time // Populated on closing of the chunk, model.Earliest if unset.

 	// EvictListElement is nil if the chunk is not in the evict list.
-	// EvictListElement is _not_ protected by the ChunkDesc mutex.
+	// EvictListElement is _not_ protected by the Desc mutex.
 	// It must only be touched by the evict list handler in MemorySeriesStorage.
 	EvictListElement *list.Element
 }

 // NewDesc creates a new Desc pointing to the provided chunk. The provided chunk
 // is assumed to be not persisted yet. Therefore, the refCount of the new
-// ChunkDesc is 1 (preventing eviction prior to persisting).
+// Desc is 1 (preventing eviction prior to persisting).
 func NewDesc(c Chunk, firstTime model.Time) *Desc {
-	ChunkOps.WithLabelValues(CreateAndPin).Inc()
+	Ops.WithLabelValues(CreateAndPin).Inc()
 	atomic.AddInt64(&NumMemChunks, 1)
-	NumMemChunkDescs.Inc()
+	NumMemDescs.Inc()
 	return &Desc{
 		C:    c,
 		rCnt: 1,
@@ -132,68 +132,68 @@ func NewDesc(c Chunk, firstTime model.Time) *Desc {
 // Add adds a sample pair to the underlying chunk. For safe concurrent access,
 // The chunk must be pinned, and the caller must have locked the fingerprint of
 // the series.
-func (cd *Desc) Add(s model.SamplePair) ([]Chunk, error) {
-	return cd.C.Add(s)
+func (d *Desc) Add(s model.SamplePair) ([]Chunk, error) {
+	return d.C.Add(s)
 }

 // Pin increments the refCount by one. Upon increment from 0 to 1, this
-// ChunkDesc is removed from the evict list. To enable the latter, the
+// Desc is removed from the evict list. To enable the latter, the
 // evictRequests channel has to be provided. This method can be called
 // concurrently at any time.
-func (cd *Desc) Pin(evictRequests chan<- EvictRequest) {
-	cd.Lock()
-	defer cd.Unlock()
+func (d *Desc) Pin(evictRequests chan<- EvictRequest) {
+	d.Lock()
+	defer d.Unlock()

-	if cd.rCnt == 0 {
+	if d.rCnt == 0 {
 		// Remove ourselves from the evict list.
-		evictRequests <- EvictRequest{cd, false}
+		evictRequests <- EvictRequest{d, false}
 	}
-	cd.rCnt++
+	d.rCnt++
 }

 // Unpin decrements the refCount by one. Upon decrement from 1 to 0, this
-// ChunkDesc is added to the evict list. To enable the latter, the evictRequests
+// Desc is added to the evict list. To enable the latter, the evictRequests
 // channel has to be provided. This method can be called concurrently at any
 // time.
-func (cd *Desc) Unpin(evictRequests chan<- EvictRequest) {
-	cd.Lock()
-	defer cd.Unlock()
+func (d *Desc) Unpin(evictRequests chan<- EvictRequest) {
+	d.Lock()
+	defer d.Unlock()

-	if cd.rCnt == 0 {
+	if d.rCnt == 0 {
 		panic("cannot unpin already unpinned chunk")
 	}
-	cd.rCnt--
-	if cd.rCnt == 0 {
+	d.rCnt--
+	if d.rCnt == 0 {
 		// Add ourselves to the back of the evict list.
-		evictRequests <- EvictRequest{cd, true}
+		evictRequests <- EvictRequest{d, true}
 	}
 }

 // RefCount returns the number of pins. This method can be called concurrently
 // at any time.
-func (cd *Desc) RefCount() int {
-	cd.Lock()
-	defer cd.Unlock()
+func (d *Desc) RefCount() int {
+	d.Lock()
+	defer d.Unlock()

-	return cd.rCnt
+	return d.rCnt
 }

 // FirstTime returns the timestamp of the first sample in the chunk. This method
 // can be called concurrently at any time. It only returns the immutable
-// cd.ChunkFirstTime without any locking. Arguably, this method is
+// d.ChunkFirstTime without any locking. Arguably, this method is
 // useless. However, it provides consistency with the LastTime method.
-func (cd *Desc) FirstTime() model.Time {
-	return cd.ChunkFirstTime
+func (d *Desc) FirstTime() model.Time {
+	return d.ChunkFirstTime
 }

 // LastTime returns the timestamp of the last sample in the chunk. For safe
 // concurrent access, this method requires the fingerprint of the time series to
 // be locked.
-func (cd *Desc) LastTime() (model.Time, error) {
-	if cd.ChunkLastTime != model.Earliest || cd.C == nil {
-		return cd.ChunkLastTime, nil
+func (d *Desc) LastTime() (model.Time, error) {
+	if d.ChunkLastTime != model.Earliest || d.C == nil {
+		return d.ChunkLastTime, nil
 	}
-	return cd.C.NewIterator().LastTimestamp()
+	return d.C.NewIterator().LastTimestamp()
 }

 // MaybePopulateLastTime populates the ChunkLastTime from the underlying chunk
@@ -201,57 +201,57 @@ func (cd *Desc) LastTime() (model.Time, error) {
 // last sample to a chunk or after closing a head chunk due to age. For safe
 // concurrent access, the chunk must be pinned, and the caller must have locked
 // the fingerprint of the series.
-func (cd *Desc) MaybePopulateLastTime() error {
-	if cd.ChunkLastTime == model.Earliest && cd.C != nil {
-		t, err := cd.C.NewIterator().LastTimestamp()
+func (d *Desc) MaybePopulateLastTime() error {
+	if d.ChunkLastTime == model.Earliest && d.C != nil {
+		t, err := d.C.NewIterator().LastTimestamp()
 		if err != nil {
 			return err
 		}
-		cd.ChunkLastTime = t
+		d.ChunkLastTime = t
 	}
 	return nil
 }

 // IsEvicted returns whether the chunk is evicted. For safe concurrent access,
 // the caller must have locked the fingerprint of the series.
-func (cd *Desc) IsEvicted() bool {
+func (d *Desc) IsEvicted() bool {
 	// Locking required here because we do not want the caller to force
 	// pinning the chunk first, so it could be evicted while this method is
 	// called.
-	cd.Lock()
-	defer cd.Unlock()
+	d.Lock()
+	defer d.Unlock()

-	return cd.C == nil
+	return d.C == nil
 }

 // SetChunk sets the underlying chunk. The caller must have locked the
 // fingerprint of the series and must have "pre-pinned" the chunk (i.e. first
 // call Pin and then set the chunk).
-func (cd *Desc) SetChunk(c Chunk) {
-	if cd.C != nil {
+func (d *Desc) SetChunk(c Chunk) {
+	if d.C != nil {
 		panic("chunk already set")
 	}
-	cd.C = c
+	d.C = c
 }

 // MaybeEvict evicts the chunk if the refCount is 0. It returns whether the chunk
 // is now evicted, which includes the case that the chunk was evicted even
 // before this method was called. It can be called concurrently at any time.
-func (cd *Desc) MaybeEvict() bool {
-	cd.Lock()
-	defer cd.Unlock()
+func (d *Desc) MaybeEvict() bool {
+	d.Lock()
+	defer d.Unlock()

-	if cd.C == nil {
+	if d.C == nil {
 		return true
 	}
-	if cd.rCnt != 0 {
+	if d.rCnt != 0 {
 		return false
 	}
-	if cd.ChunkLastTime == model.Earliest {
+	if d.ChunkLastTime == model.Earliest {
 		// This must never happen.
 		panic("ChunkLastTime not populated for evicted chunk")
 	}
-	cd.C = nil
+	d.C = nil
 	return true
 }
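Taken together, the comments above imply a "pre-pin" protocol when attaching a loaded chunk. A hedged sketch of a caller, not code from this commit, assuming the usual imports and that the caller holds the series's fingerprint lock:

// attachLoadedChunk is a hypothetical helper illustrating the protocol.
func attachLoadedChunk(d *chunk.Desc, c chunk.Chunk, evictRequests chan<- chunk.EvictRequest) error {
	// Pin first (refCount 0 -> 1 also removes d from the evict list),
	// then attach the chunk, so d can never be evicted in between.
	d.Pin(evictRequests)
	d.SetChunk(c) // panics if a chunk is already set

	// While pinned and under the fingerprint lock, the unsynchronized
	// methods Add, MaybePopulateLastTime, and SetChunk are safe.
	err := d.MaybePopulateLastTime()

	// The caller keeps d pinned while reading; Unpin (refCount back to 0)
	// re-queues d for the evict list when done.
	d.Unpin(evictRequests)
	return err
}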
@@ -327,7 +327,7 @@ func RangeValues(it Iterator, in metric.Interval) ([]model.SamplePair, error) {
 // chunk, adds the provided sample to it, and returns a chunk slice containing
 // the provided old chunk followed by the new overflow chunk.
 func addToOverflowChunk(c Chunk, s model.SamplePair) ([]Chunk, error) {
-	overflowChunks, err := NewChunk().Add(s)
+	overflowChunks, err := New().Add(s)
 	if err != nil {
 		return nil, err
 	}
@@ -339,7 +339,7 @@ func addToOverflowChunk(c Chunk, s model.SamplePair) ([]Chunk, error) {
 // provided sample. It returns the new chunks (transcoded plus overflow) with
 // the new sample at the end.
 func transcodeAndAdd(dst Chunk, src Chunk, s model.SamplePair) ([]Chunk, error) {
-	ChunkOps.WithLabelValues(Transcode).Inc()
+	Ops.WithLabelValues(Transcode).Inc()

 	var (
 		head = dst
@@ -365,18 +365,18 @@ func transcodeAndAdd(dst Chunk, src Chunk, s model.SamplePair) ([]Chunk, error)
 	return append(body, NewChunks...), nil
 }

-// NewChunk creates a new chunk according to the encoding set by the
+// New creates a new chunk according to the encoding set by the
 // DefaultEncoding flag.
-func NewChunk() Chunk {
-	chunk, err := NewChunkForEncoding(DefaultEncoding)
+func New() Chunk {
+	chunk, err := NewForEncoding(DefaultEncoding)
 	if err != nil {
 		panic(err)
 	}
 	return chunk
 }

-// NewChunkForEncoding allows configuring what chunk type you want
-func NewChunkForEncoding(encoding Encoding) (Chunk, error) {
+// NewForEncoding allows configuring what chunk type you want
+func NewForEncoding(encoding Encoding) (Chunk, error) {
 	switch encoding {
 	case Delta:
 		return newDeltaEncodedChunk(d1, d0, true, ChunkLen), nil
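A brief usage sketch of the renamed constructors; the helper names here are made up, but the call patterns match the persistence and series hunks below:

// Prefer the error-returning constructor when the encoding byte comes from
// untrusted input, such as a chunk file header.
func decodeChunk(encodingByte byte) (chunk.Chunk, error) {
	return chunk.NewForEncoding(chunk.Encoding(encodingByte))
}

// For fresh in-memory head chunks, the flag-driven variant suffices; it
// panics on a bad DefaultEncoding, which can only be a programming error.
func newHeadChunk() chunk.Chunk {
	return chunk.New()
}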

View file

@@ -19,7 +19,7 @@ import "github.com/prometheus/client_golang/prometheus"
 // be close to where they are used. However, the metrics below are set all over
 // the place, so we go for a separate instrumentation file in this case.
 var (
-	ChunkOps = prometheus.NewCounterVec(
+	Ops = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
@@ -28,7 +28,7 @@ var (
 		},
 		[]string{OpTypeLabel},
 	)
-	ChunkDescOps = prometheus.NewCounterVec(
+	DescOps = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
@@ -37,7 +37,7 @@ var (
 		},
 		[]string{OpTypeLabel},
 	)
-	NumMemChunkDescs = prometheus.NewGauge(prometheus.GaugeOpts{
+	NumMemDescs = prometheus.NewGauge(prometheus.GaugeOpts{
 		Namespace: namespace,
 		Subsystem: subsystem,
 		Name:      "memory_chunkdescs",
@@ -55,7 +55,7 @@ const (
 	// Op-types for ChunkOps.
 	// CreateAndPin is the label value for create-and-pin chunk ops.
-	CreateAndPin = "create" // A ChunkDesc creation with refCount=1.
+	CreateAndPin = "create" // A Desc creation with refCount=1.
 	// PersistAndUnpin is the label value for persist chunk ops.
 	PersistAndUnpin = "persist"
 	// Pin is the label value for pin chunk ops (excludes pin on creation).
@@ -78,9 +78,9 @@ const (
 )

 func init() {
-	prometheus.MustRegister(ChunkOps)
-	prometheus.MustRegister(ChunkDescOps)
-	prometheus.MustRegister(NumMemChunkDescs)
+	prometheus.MustRegister(Ops)
+	prometheus.MustRegister(DescOps)
+	prometheus.MustRegister(NumMemDescs)
 }

 var (
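For orientation, this is roughly how call sites in the other files read after the rename; a sketch assuming this package is imported as chunk:

// Mirrors the call patterns in the series.go hunks further down.
func recordPinned(n int) {
	chunk.Ops.WithLabelValues(chunk.Pin).Add(float64(n))
}

func recordDescsEvicted(n int) {
	chunk.DescOps.WithLabelValues(chunk.Evict).Add(float64(n))
	chunk.NumMemDescs.Sub(float64(n))
}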

View file

@@ -118,7 +118,7 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint
 			make([]*chunk.Desc, 0, len(s.chunkDescs)-s.persistWatermark),
 			s.chunkDescs[s.persistWatermark:]...,
 		)
-		chunk.NumMemChunkDescs.Sub(float64(s.persistWatermark))
+		chunk.NumMemDescs.Sub(float64(s.persistWatermark))
 		s.persistWatermark = 0
 		s.chunkDescsOffset = 0
 	}
@@ -315,7 +315,7 @@ func (p *persistence) sanitizeSeries(
 		// First, throw away the chunkDescs without chunks.
 		s.chunkDescs = s.chunkDescs[s.persistWatermark:]
-		chunk.NumMemChunkDescs.Sub(float64(s.persistWatermark))
+		chunk.NumMemDescs.Sub(float64(s.persistWatermark))

 		cds, err := p.loadChunkDescs(fp, 0)
 		if err != nil {
 			log.Errorf(
@@ -351,7 +351,7 @@
 				"Recovered metric %v, fingerprint %v: all %d chunks recovered from series file.",
 				s.metric, fp, chunksInFile,
 			)
-			chunk.NumMemChunkDescs.Sub(float64(len(s.chunkDescs)))
+			chunk.NumMemDescs.Sub(float64(len(s.chunkDescs)))
 			atomic.AddInt64(&chunk.NumMemChunks, int64(-len(s.chunkDescs)))
 			s.chunkDescs = cds
 			s.headChunkClosed = true
@@ -361,7 +361,7 @@
 				"Recovered metric %v, fingerprint %v: recovered %d chunks from series file, recovered %d chunks from checkpoint.",
 				s.metric, fp, chunksInFile, len(s.chunkDescs)-keepIdx,
 			)
-			chunk.NumMemChunkDescs.Sub(float64(keepIdx))
+			chunk.NumMemDescs.Sub(float64(keepIdx))
 			atomic.AddInt64(&chunk.NumMemChunks, int64(-keepIdx))
 			if keepIdx == len(s.chunkDescs) {
 				// No chunks from series file left, head chunk is evicted, so declare it closed.

View file

@@ -168,7 +168,7 @@ func (hs *headsScanner) scan() bool {
 				ChunkFirstTime: model.Time(firstTime),
 				ChunkLastTime:  model.Time(lastTime),
 			}
-			chunk.NumMemChunkDescs.Inc()
+			chunk.NumMemDescs.Inc()
 		} else {
 			// Non-persisted chunk.
 			// If there are non-persisted chunks at all, we consider
@@ -177,7 +177,7 @@
 			if encoding, hs.err = hs.r.ReadByte(); hs.err != nil {
 				return false
 			}
-			if ch, hs.err = chunk.NewChunkForEncoding(chunk.Encoding(encoding)); hs.err != nil {
+			if ch, hs.err = chunk.NewForEncoding(chunk.Encoding(encoding)); hs.err != nil {
 				return false
 			}
 			if hs.err = ch.Unmarshal(hs.r); hs.err != nil {

View file

@@ -437,7 +437,7 @@ func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffse
 			return nil, err
 		}
 		for c := 0; c < batchSize; c++ {
-			chunk, err := chunk.NewChunkForEncoding(chunk.Encoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset]))
+			chunk, err := chunk.NewForEncoding(chunk.Encoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset]))
 			if err != nil {
 				return nil, err
 			}
@@ -447,7 +447,7 @@
 			chunks = append(chunks, chunk)
 		}
 	}
-	chunk.ChunkOps.WithLabelValues(chunk.Load).Add(float64(len(chunks)))
+	chunk.Ops.WithLabelValues(chunk.Load).Add(float64(len(chunks)))
 	atomic.AddInt64(&chunk.NumMemChunks, int64(len(chunks)))
 	return chunks, nil
 }
@@ -496,8 +496,8 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([
 			ChunkLastTime:  model.Time(binary.LittleEndian.Uint64(chunkTimesBuf[8:])),
 		}
 	}
-	chunk.ChunkDescOps.WithLabelValues(chunk.Load).Add(float64(len(cds)))
-	chunk.NumMemChunkDescs.Add(float64(len(cds)))
+	chunk.DescOps.WithLabelValues(chunk.Load).Add(float64(len(cds)))
+	chunk.NumMemDescs.Add(float64(len(cds)))
 	return cds, nil
 }
@@ -880,7 +880,7 @@ func (p *persistence) dropAndPersistChunks(
 	firstTimeNotDropped = model.Time(
 		binary.LittleEndian.Uint64(headerBuf[chunkHeaderFirstTimeOffset:]),
 	)
-	chunk.ChunkOps.WithLabelValues(chunk.Drop).Add(float64(numDropped))
+	chunk.Ops.WithLabelValues(chunk.Drop).Add(float64(numDropped))
 	_, err = f.Seek(-chunkHeaderLen, os.SEEK_CUR)
 	if err != nil {
 		return
@@ -931,7 +931,7 @@ func (p *persistence) deleteSeriesFile(fp model.Fingerprint) (int, error) {
 	if err := os.Remove(fname); err != nil {
 		return -1, err
 	}
-	chunk.ChunkOps.WithLabelValues(chunk.Drop).Add(float64(numChunks))
+	chunk.Ops.WithLabelValues(chunk.Drop).Add(float64(numChunks))
 	return numChunks, nil
 }

View file

@@ -65,7 +65,7 @@ func buildTestChunks(t *testing.T, encoding chunk.Encoding) map[model.Fingerprin
 	for _, fp := range fps {
 		fpToChunks[fp] = make([]chunk.Chunk, 0, 10)
 		for i := 0; i < 10; i++ {
-			ch, err := chunk.NewChunkForEncoding(encoding)
+			ch, err := chunk.NewForEncoding(encoding)
 			if err != nil {
 				t.Fatal(err)
 			}
@@ -565,7 +565,7 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin
 	for i, cd := range loadedS4.chunkDescs {
 		if cd.ChunkFirstTime != cd.C.FirstTime() {
 			t.Errorf(
-				"ChunkDesc[%d]: ChunkFirstTime not consistent with chunk, want %d, got %d",
+				"chunk.Desc[%d]: ChunkFirstTime not consistent with chunk, want %d, got %d",
 				i, cd.C.FirstTime(), cd.ChunkFirstTime,
 			)
 		}
@@ -582,7 +582,7 @@
 		}
 		if cd.ChunkLastTime != lastTime {
 			t.Errorf(
-				"ChunkDesc[%d]: ChunkLastTime not consistent with chunk, want %d, got %d",
+				"chunk.Desc[%d]: ChunkLastTime not consistent with chunk, want %d, got %d",
 				i, lastTime, cd.ChunkLastTime,
 			)
 		}
@@ -616,13 +616,13 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin
 		if i < 3 {
 			// Evicted chunks.
 			if cd.ChunkFirstTime == model.Earliest {
-				t.Errorf("ChunkDesc[%d]: ChunkLastTime not set", i)
+				t.Errorf("chunk.Desc[%d]: ChunkLastTime not set", i)
 			}
 			continue
 		}
 		if cd.ChunkFirstTime != cd.C.FirstTime() {
 			t.Errorf(
-				"ChunkDesc[%d]: ChunkFirstTime not consistent with chunk, want %d, got %d",
+				"chunk.Desc[%d]: ChunkFirstTime not consistent with chunk, want %d, got %d",
 				i, cd.C.FirstTime(), cd.ChunkFirstTime,
 			)
 		}
@@ -639,7 +639,7 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin
 		}
 		if cd.ChunkLastTime != lastTime {
 			t.Errorf(
-				"ChunkDesc[%d]: ChunkLastTime not consistent with chunk, want %d, got %d",
+				"chunk.Desc[%d]: ChunkLastTime not consistent with chunk, want %d, got %d",
 				i, cd.ChunkLastTime, lastTime,
 			)
 		}

View file

@@ -25,9 +25,9 @@ import (
 )

 const (
-	// chunkDescEvictionFactor is a factor used for ChunkDesc eviction (as opposed
+	// chunkDescEvictionFactor is a factor used for chunk.Desc eviction (as opposed
 	// to evictions of chunks, see method evictOlderThan. A chunk takes about 20x
-	// more memory than a ChunkDesc. With a chunkDescEvictionFactor of 10, not more
+	// more memory than a chunk.Desc. With a chunkDescEvictionFactor of 10, not more
 	// than a third of the total memory taken by a series will be used for
 	// chunkDescs.
 	chunkDescEvictionFactor = 10
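To spell out the arithmetic behind that comment: with a chunk.Desc costing roughly 1 unit of memory and a chunk roughly 20, keeping at most 10 descs per loaded chunk bounds desc memory at 10 units against 20 units of chunk data, i.e. 10/(10+20) = 1/3 of the series's total.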
@@ -142,7 +142,7 @@ type memorySeries struct {
 	metric model.Metric
 	// Sorted by start time, overlapping chunk ranges are forbidden.
 	chunkDescs []*chunk.Desc
-	// The index (within chunkDescs above) of the first ChunkDesc that
+	// The index (within chunkDescs above) of the first chunk.Desc that
 	// points to a non-persisted chunk. If all chunks are persisted, then
 	// persistWatermark == len(chunkDescs).
 	persistWatermark int
@@ -152,7 +152,7 @@
 	// The chunkDescs in memory might not have all the chunkDescs for the
 	// chunks that are persisted to disk. The missing chunkDescs are all
 	// contiguous and at the tail end. chunkDescsOffset is the index of the
-	// chunk on disk that corresponds to the first ChunkDesc in memory. If
+	// chunk on disk that corresponds to the first chunk.Desc in memory. If
 	// it is 0, the chunkDescs are all loaded. A value of -1 denotes a
 	// special case: There are chunks on disk, but the offset to the
 	// chunkDescs in memory is unknown. Also, in this special case, there is
@@ -221,7 +221,7 @@ func newMemorySeries(m model.Metric, chunkDescs []*chunk.Desc, modTime time.Time
 // The caller must have locked the fingerprint of the series.
 func (s *memorySeries) Add(v model.SamplePair) (int, error) {
 	if len(s.chunkDescs) == 0 || s.headChunkClosed {
-		newHead := chunk.NewDesc(chunk.NewChunk(), v.Timestamp)
+		newHead := chunk.NewDesc(chunk.New(), v.Timestamp)
 		s.chunkDescs = append(s.chunkDescs, newHead)
 		s.headChunkClosed = false
 	} else if s.headChunkUsedByIterator && s.head().RefCount() > 1 {
@@ -234,7 +234,7 @@ func (s *memorySeries) Add(v model.SamplePair) (int, error) {
 		// around and keep the head chunk pinned. We needed to track
 		// pins by version of the head chunk, which is probably not
 		// worth the effort.
-		chunk.ChunkOps.WithLabelValues(chunk.Clone).Inc()
+		chunk.Ops.WithLabelValues(chunk.Clone).Inc()
 		// No locking needed here because a non-persisted head chunk can
 		// not get evicted concurrently.
 		s.head().C = s.head().C.Clone()
@@ -292,8 +292,8 @@ func (s *memorySeries) evictChunkDescs(iOldestNotEvicted int) {
 	lenEvicted := len(s.chunkDescs) - lenToKeep
 	s.chunkDescsOffset += lenEvicted
 	s.persistWatermark -= lenEvicted
-	chunk.ChunkDescOps.WithLabelValues(chunk.Evict).Add(float64(lenEvicted))
-	chunk.NumMemChunkDescs.Sub(float64(lenEvicted))
+	chunk.DescOps.WithLabelValues(chunk.Evict).Add(float64(lenEvicted))
+	chunk.NumMemDescs.Sub(float64(lenEvicted))
 	s.chunkDescs = append(
 		make([]*chunk.Desc, 0, lenToKeep),
 		s.chunkDescs[lenEvicted:]...,
@@ -335,7 +335,7 @@ func (s *memorySeries) dropChunks(t model.Time) error {
 	if s.chunkDescsOffset != -1 {
 		s.chunkDescsOffset += keepIdx
 	}
-	chunk.NumMemChunkDescs.Sub(float64(keepIdx))
+	chunk.NumMemDescs.Sub(float64(keepIdx))
 	s.dirty = true
 	return nil
 }
@@ -354,7 +354,7 @@ func (s *memorySeries) preloadChunks(
 			loadIndexes = append(loadIndexes, idx)
 		}
 	}
-	chunk.ChunkOps.WithLabelValues(chunk.Pin).Add(float64(len(pinnedChunkDescs)))
+	chunk.Ops.WithLabelValues(chunk.Pin).Add(float64(len(pinnedChunkDescs)))

 	if len(loadIndexes) > 0 {
 		if s.chunkDescsOffset == -1 {
@@ -366,7 +366,7 @@
 			for _, cd := range pinnedChunkDescs {
 				cd.Unpin(mss.evictRequests)
 			}
-			chunk.ChunkOps.WithLabelValues(chunk.Unpin).Add(float64(len(pinnedChunkDescs)))
+			chunk.Ops.WithLabelValues(chunk.Unpin).Add(float64(len(pinnedChunkDescs)))
 			return nopIter, err
 		}
 		for i, c := range chunks {
@@ -672,7 +672,7 @@ func (it *memorySeriesIterator) Close() {
 	for _, cd := range it.pinnedChunkDescs {
 		cd.Unpin(it.evictRequests)
 	}
-	chunk.ChunkOps.WithLabelValues(chunk.Unpin).Add(float64(len(it.pinnedChunkDescs)))
+	chunk.Ops.WithLabelValues(chunk.Unpin).Add(float64(len(it.pinnedChunkDescs)))
 }

 // singleSampleSeriesIterator implements Series Iterator. It is a "shortcut

View file

@@ -932,16 +932,16 @@ func (s *MemorySeriesStorage) handleEvictList() {
 		select {
 		case req := <-s.evictRequests:
 			if req.Evict {
-				req.CD.EvictListElement = s.evictList.PushBack(req.CD)
+				req.Desc.EvictListElement = s.evictList.PushBack(req.Desc)
 				count++
 				if count > s.maxMemoryChunks/1000 {
 					s.maybeEvict()
 					count = 0
 				}
 			} else {
-				if req.CD.EvictListElement != nil {
-					s.evictList.Remove(req.CD.EvictListElement)
-					req.CD.EvictListElement = nil
+				if req.Desc.EvictListElement != nil {
+					s.evictList.Remove(req.Desc.EvictListElement)
+					req.Desc.EvictListElement = nil
 				}
 			}
 		case <-ticker.C:
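The producer side of this channel lives in Desc.Pin and Desc.Unpin (see the chunk.go hunks above). A hypothetical sketch using the renamed field, not code from this commit:

// Sent by Unpin when the refCount drops to 0: push d onto the evict list.
func requestEviction(d *chunk.Desc, evictRequests chan<- chunk.EvictRequest) {
	evictRequests <- chunk.EvictRequest{Desc: d, Evict: true}
}

// Sent by Pin when the refCount rises from 0: take d off the list again.
func cancelEviction(d *chunk.Desc, evictRequests chan<- chunk.EvictRequest) {
	evictRequests <- chunk.EvictRequest{Desc: d, Evict: false}
}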
@@ -1314,7 +1314,7 @@ func (s *MemorySeriesStorage) writeMemorySeries(
 			cd.Unpin(s.evictRequests)
 		}
 		s.incNumChunksToPersist(-len(cds))
-		chunk.ChunkOps.WithLabelValues(chunk.PersistAndUnpin).Add(float64(len(cds)))
+		chunk.Ops.WithLabelValues(chunk.PersistAndUnpin).Add(float64(len(cds)))
 		series.modTime = s.persistence.seriesFileModTime(fp)
 	}()

View file

@@ -1402,7 +1402,7 @@ func testEvictAndLoadChunkDescs(t *testing.T, encoding chunk.Encoding) {
 	s.maintainMemorySeries(fp, 0)
 	// Give the evict goroutine an opportunity to run.
 	time.Sleep(250 * time.Millisecond)
-	// Maintain series again to trigger ChunkDesc eviction
+	// Maintain series again to trigger chunk.Desc eviction.
 	s.maintainMemorySeries(fp, 0)

 	if oldLen <= len(series.chunkDescs) {
@@ -1422,7 +1422,7 @@ func testEvictAndLoadChunkDescs(t *testing.T, encoding chunk.Encoding) {
 	s.maintainMemorySeries(fp, 100000)

 	if len(series.chunkDescs) != 1 {
-		t.Errorf("Expected exactly one ChunkDesc left, got %d.", len(series.chunkDescs))
+		t.Errorf("Expected exactly one chunk.Desc left, got %d.", len(series.chunkDescs))
 	}
 }