diff --git a/main.go b/main.go index 1efeedbd2..fd3c533cd 100644 --- a/main.go +++ b/main.go @@ -54,7 +54,6 @@ var ( samplesQueueCapacity = flag.Int("storage.incoming-samples-queue-capacity", 64*1024, "The capacity of the queue of samples to be stored. Note that each slot in the queue takes a whole slice of samples whose size depends on details of the scrape process.") - chunkType = flag.Int("storage.local.chunk-type", 1, "Which chunk encoding version to use. Currently supported is 0 (delta encoding) and 1 (double-delta encoding).") numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.") persistenceRetentionPeriod = flag.Duration("storage.local.retention", 15*24*time.Hour, "How long to retain samples in the local storage.") @@ -123,8 +122,7 @@ func NewPrometheus() *prometheus { PersistenceQueueCapacity: *persistenceQueueCapacity, CheckpointInterval: *checkpointInterval, CheckpointDirtySeriesLimit: *checkpointDirtySeriesLimit, - ChunkType: byte(*chunkType), - Dirty: *storageDirty, + Dirty: *storageDirty, } memStorage, err := local.NewMemorySeriesStorage(o) if err != nil { diff --git a/storage/local/chunk.go b/storage/local/chunk.go index 4dbce0523..58110c350 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -15,6 +15,7 @@ package local import ( "container/list" + "flag" "fmt" "io" "sync" @@ -25,6 +26,17 @@ import ( "github.com/prometheus/prometheus/storage/metric" ) +var ( + defaultChunkEncoding = flag.Int("storage.local.chunk-encoding-version", 1, "Which chunk encoding version to use for newly created chunks. Currently supported is 0 (delta encoding) and 1 (double-delta encoding).") +) + +type chunkEncoding byte + +const ( + delta chunkEncoding = iota + doubleDelta +) + // chunkDesc contains meta-data for a chunk. Many of its methods are // goroutine-safe proxies for chunk methods. type chunkDesc struct { @@ -173,13 +185,14 @@ type chunk interface { // any. The first chunk returned might be the same as the original one // or a newly allocated version. In any case, take the returned chunk as // the relevant one and discard the orginal chunk. - add(*metric.SamplePair) []chunk + add(sample *metric.SamplePair) []chunk clone() chunk firstTime() clientmodel.Timestamp lastTime() clientmodel.Timestamp newIterator() chunkIterator marshal(io.Writer) error unmarshal(io.Reader) error + encoding() chunkEncoding // values returns a channel, from which all sample values in the chunk // can be received in order. The channel is closed after the last // one. It is generally not safe to mutate the chunk while the channel @@ -215,29 +228,22 @@ func transcodeAndAdd(dst chunk, src chunk, s *metric.SamplePair) []chunk { head = newChunks[len(newChunks)-1] } newChunks := head.add(s) - body = append(body, newChunks[:len(newChunks)-1]...) - head = newChunks[len(newChunks)-1] - return append(body, head) + return append(body, newChunks...) } -func chunkType(c chunk) byte { - switch c.(type) { - case *deltaEncodedChunk: - return 0 - case *doubleDeltaEncodedChunk: - return 1 - default: - panic(fmt.Errorf("unknown chunk type: %T", c)) - } +// newChunk creates a new chunk according to the encoding set by the +// defaultChunkEncoding flag. +func newChunk() chunk { + return newChunkForEncoding(chunkEncoding(*defaultChunkEncoding)) } -func chunkForType(chunkType byte) chunk { - switch chunkType { - case 0: +func newChunkForEncoding(encoding chunkEncoding) chunk { + switch encoding { + case delta: return newDeltaEncodedChunk(d1, d0, true, chunkLen) - case 1: + case doubleDelta: return newDoubleDeltaEncodedChunk(d1, d0, true, chunkLen) default: - panic(fmt.Errorf("unknown chunk type: %d", chunkType)) + panic(fmt.Errorf("unknown chunk encoding: %v", encoding)) } } diff --git a/storage/local/crashrecovery.go b/storage/local/crashrecovery.go index b27b5a599..0dc741e40 100644 --- a/storage/local/crashrecovery.go +++ b/storage/local/crashrecovery.go @@ -341,7 +341,7 @@ func (p *persistence) cleanUpArchiveIndexes( if err := kv.Value(&m); err != nil { return err } - series := newMemorySeries(clientmodel.Metric(m), false, clientmodel.Earliest, p.chunkType) + series := newMemorySeries(clientmodel.Metric(m), false, clientmodel.Earliest) cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), clientmodel.Now()) if err != nil { return err diff --git a/storage/local/delta.go b/storage/local/delta.go index 17c374f76..0e4816935 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -75,37 +75,6 @@ func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *deltaEncod return &c } -func (c deltaEncodedChunk) newFollowupChunk() chunk { - return newDeltaEncodedChunk(d1, d0, true, cap(c)) -} - -// clone implements chunk. -func (c deltaEncodedChunk) clone() chunk { - clone := make(deltaEncodedChunk, len(c), cap(c)) - copy(clone, c) - return &clone -} - -func (c deltaEncodedChunk) timeBytes() deltaBytes { - return deltaBytes(c[deltaHeaderTimeBytesOffset]) -} - -func (c deltaEncodedChunk) valueBytes() deltaBytes { - return deltaBytes(c[deltaHeaderValueBytesOffset]) -} - -func (c deltaEncodedChunk) isInt() bool { - return c[deltaHeaderIsIntOffset] == 1 -} - -func (c deltaEncodedChunk) baseTime() clientmodel.Timestamp { - return clientmodel.Timestamp(binary.LittleEndian.Uint64(c[deltaHeaderBaseTimeOffset:])) -} - -func (c deltaEncodedChunk) baseValue() clientmodel.SampleValue { - return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c[deltaHeaderBaseValueOffset:]))) -} - // add implements chunk. func (c deltaEncodedChunk) add(s *metric.SamplePair) []chunk { if c.len() == 0 { @@ -120,7 +89,7 @@ func (c deltaEncodedChunk) add(s *metric.SamplePair) []chunk { // Do we generally have space for another sample in this chunk? If not, // overflow into a new one. if remainingBytes < sampleSize { - overflowChunks := c.newFollowupChunk().add(s) + overflowChunks := newChunk().add(s) return []chunk{&c, overflowChunks[0]} } @@ -131,33 +100,38 @@ func (c deltaEncodedChunk) add(s *metric.SamplePair) []chunk { dv := s.Value - baseValue tb := c.timeBytes() vb := c.valueBytes() + isInt := c.isInt() // If the new sample is incompatible with the current encoding, reencode the // existing chunk data into new chunk(s). - // - // int->float. - if c.isInt() && !isInt64(dv) { - return transcodeAndAdd(newDeltaEncodedChunk(tb, d4, false, cap(c)), &c, s) + + ntb, nvb, nInt := tb, vb, isInt + if isInt && !isInt64(dv) { + // int->float. + nvb = d4 + nInt = false + } else if !isInt && vb == d4 && baseValue+clientmodel.SampleValue(float32(dv)) != s.Value { + // float32->float64. + nvb = d8 + } else { + if tb < d8 { + // Maybe more bytes for timestamp. + ntb = max(tb, bytesNeededForUnsignedTimestampDelta(dt)) + } + if c.isInt() && vb < d8 { + // Maybe more bytes for sample value. + nvb = max(vb, bytesNeededForIntegerSampleValueDelta(dv)) + } } - // float32->float64. - if !c.isInt() && vb == d4 && baseValue+clientmodel.SampleValue(float32(dv)) != s.Value { - return transcodeAndAdd(newDeltaEncodedChunk(tb, d8, false, cap(c)), &c, s) + if tb != ntb || vb != nvb || isInt != nInt { + if len(c)*2 < cap(c) { + return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s) + } + // Chunk is already half full. Better create a new one and save the transcoding efforts. + overflowChunks := newChunk().add(s) + return []chunk{&c, overflowChunks[0]} } - var ntb, nvb deltaBytes - if tb < d8 { - // Maybe more bytes for timestamp. - ntb = bytesNeededForUnsignedTimestampDelta(dt) - } - if c.isInt() && vb < d8 { - // Maybe more bytes for sample value. - nvb = bytesNeededForIntegerSampleValueDelta(dv) - } - if ntb > tb || nvb > vb { - ntb = max(ntb, tb) - nvb = max(nvb, vb) - return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, c.isInt(), cap(c)), &c, s) - } offset := len(c) c = c[:offset+sampleSize] @@ -205,15 +179,60 @@ func (c deltaEncodedChunk) add(s *metric.SamplePair) []chunk { return []chunk{&c} } -func (c deltaEncodedChunk) sampleSize() int { - return int(c.timeBytes() + c.valueBytes()) +// clone implements chunk. +func (c deltaEncodedChunk) clone() chunk { + clone := make(deltaEncodedChunk, len(c), cap(c)) + copy(clone, c) + return &clone } -func (c deltaEncodedChunk) len() int { - if len(c) < deltaHeaderBytes { - return 0 +// firstTime implements chunk. +func (c deltaEncodedChunk) firstTime() clientmodel.Timestamp { + return c.valueAtIndex(0).Timestamp +} + +// lastTime implements chunk. +func (c deltaEncodedChunk) lastTime() clientmodel.Timestamp { + return c.valueAtIndex(c.len() - 1).Timestamp +} + +// newIterator implements chunk. +func (c *deltaEncodedChunk) newIterator() chunkIterator { + return &deltaEncodedChunkIterator{ + chunk: c, } - return (len(c) - deltaHeaderBytes) / c.sampleSize() +} + +// marshal implements chunk. +func (c deltaEncodedChunk) marshal(w io.Writer) error { + if len(c) > math.MaxUint16 { + panic("chunk buffer length would overflow a 16 bit uint.") + } + binary.LittleEndian.PutUint16(c[deltaHeaderBufLenOffset:], uint16(len(c))) + + n, err := w.Write(c[:cap(c)]) + if err != nil { + return err + } + if n != cap(c) { + return fmt.Errorf("wanted to write %d bytes, wrote %d", len(c), n) + } + return nil +} + +// unmarshal implements chunk. +func (c *deltaEncodedChunk) unmarshal(r io.Reader) error { + *c = (*c)[:cap(*c)] + readBytes := 0 + for readBytes < len(*c) { + n, err := r.Read((*c)[readBytes:]) + if err != nil { + return err + } + readBytes += n + } + *c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])] + return nil } // values implements chunk. @@ -229,6 +248,40 @@ func (c deltaEncodedChunk) values() <-chan *metric.SamplePair { return valuesChan } +// encoding implements chunk. +func (c deltaEncodedChunk) encoding() chunkEncoding { return delta } + +func (c deltaEncodedChunk) timeBytes() deltaBytes { + return deltaBytes(c[deltaHeaderTimeBytesOffset]) +} + +func (c deltaEncodedChunk) valueBytes() deltaBytes { + return deltaBytes(c[deltaHeaderValueBytesOffset]) +} + +func (c deltaEncodedChunk) isInt() bool { + return c[deltaHeaderIsIntOffset] == 1 +} + +func (c deltaEncodedChunk) baseTime() clientmodel.Timestamp { + return clientmodel.Timestamp(binary.LittleEndian.Uint64(c[deltaHeaderBaseTimeOffset:])) +} + +func (c deltaEncodedChunk) baseValue() clientmodel.SampleValue { + return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c[deltaHeaderBaseValueOffset:]))) +} + +func (c deltaEncodedChunk) sampleSize() int { + return int(c.timeBytes() + c.valueBytes()) +} + +func (c deltaEncodedChunk) len() int { + if len(c) < deltaHeaderBytes { + return 0 + } + return (len(c) - deltaHeaderBytes) / c.sampleSize() +} + func (c deltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair { offset := deltaHeaderBytes + idx*c.sampleSize() @@ -281,61 +334,12 @@ func (c deltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair { } } -// firstTime implements chunk. -func (c deltaEncodedChunk) firstTime() clientmodel.Timestamp { - return c.valueAtIndex(0).Timestamp -} - -// lastTime implements chunk. -func (c deltaEncodedChunk) lastTime() clientmodel.Timestamp { - return c.valueAtIndex(c.len() - 1).Timestamp -} - -// marshal implements chunk. -func (c deltaEncodedChunk) marshal(w io.Writer) error { - if len(c) > math.MaxUint16 { - panic("chunk buffer length would overflow a 16 bit uint.") - } - binary.LittleEndian.PutUint16(c[deltaHeaderBufLenOffset:], uint16(len(c))) - - n, err := w.Write(c[:cap(c)]) - if err != nil { - return err - } - if n != cap(c) { - return fmt.Errorf("wanted to write %d bytes, wrote %d", len(c), n) - } - return nil -} - -// unmarshal implements chunk. -func (c *deltaEncodedChunk) unmarshal(r io.Reader) error { - *c = (*c)[:cap(*c)] - readBytes := 0 - for readBytes < len(*c) { - n, err := r.Read((*c)[readBytes:]) - if err != nil { - return err - } - readBytes += n - } - *c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])] - return nil -} - // deltaEncodedChunkIterator implements chunkIterator. type deltaEncodedChunkIterator struct { chunk *deltaEncodedChunk // TODO: add more fields here to keep track of last position. } -// newIterator implements chunk. -func (c *deltaEncodedChunk) newIterator() chunkIterator { - return &deltaEncodedChunkIterator{ - chunk: c, - } -} - // getValueAtTime implements chunkIterator. func (it *deltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestamp) metric.Values { i := sort.Search(it.chunk.len(), func(i int) bool { diff --git a/storage/local/doubledelta.go b/storage/local/doubledelta.go index c5ad79817..a1c9ad224 100644 --- a/storage/local/doubledelta.go +++ b/storage/local/doubledelta.go @@ -82,10 +82,183 @@ func newDoubleDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *doub return &c } -func (c doubleDeltaEncodedChunk) newFollowupChunk() chunk { - return newDoubleDeltaEncodedChunk(d1, d0, true, cap(c)) +// add implements chunk. +func (c doubleDeltaEncodedChunk) add(s *metric.SamplePair) []chunk { + if c.len() == 0 { + return c.addFirstSample(s) + } + + tb := c.timeBytes() + vb := c.valueBytes() + + if c.len() == 1 { + return c.addSecondSample(s, tb, vb) + } + + remainingBytes := cap(c) - len(c) + sampleSize := c.sampleSize() + + // Do we generally have space for another sample in this chunk? If not, + // overflow into a new one. + if remainingBytes < sampleSize { + overflowChunks := newChunk().add(s) + return []chunk{&c, overflowChunks[0]} + } + + projectedTime := c.baseTime() + clientmodel.Timestamp(c.len())*c.baseTimeDelta() + ddt := s.Timestamp - projectedTime + + projectedValue := c.baseValue() + clientmodel.SampleValue(c.len())*c.baseValueDelta() + ddv := s.Value - projectedValue + + ntb, nvb, nInt := tb, vb, c.isInt() + // If the new sample is incompatible with the current encoding, reencode the + // existing chunk data into new chunk(s). + if c.isInt() && !isInt64(ddv) { + // int->float. + nvb = d4 + nInt = false + } else if !c.isInt() && vb == d4 && projectedValue+clientmodel.SampleValue(float32(ddv)) != s.Value { + // float32->float64. + nvb = d8 + } else { + if tb < d8 { + // Maybe more bytes for timestamp. + ntb = max(tb, bytesNeededForSignedTimestampDelta(ddt)) + } + if c.isInt() && vb < d8 { + // Maybe more bytes for sample value. + nvb = max(vb, bytesNeededForIntegerSampleValueDelta(ddv)) + } + } + if tb != ntb || vb != nvb || c.isInt() != nInt { + if len(c)*2 < cap(c) { + return transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s) + } + // Chunk is already half full. Better create a new one and save the transcoding efforts. + overflowChunks := newChunk().add(s) + return []chunk{&c, overflowChunks[0]} + } + + offset := len(c) + c = c[:offset+sampleSize] + + switch tb { + case d1: + c[offset] = byte(ddt) + case d2: + binary.LittleEndian.PutUint16(c[offset:], uint16(ddt)) + case d4: + binary.LittleEndian.PutUint32(c[offset:], uint32(ddt)) + case d8: + // Store the absolute value (no delta) in case of d8. + binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp)) + default: + panic("invalid number of bytes for time delta") + } + + offset += int(tb) + + if c.isInt() { + switch vb { + case d0: + // No-op. Constant delta is stored as base value. + case d1: + c[offset] = byte(ddv) + case d2: + binary.LittleEndian.PutUint16(c[offset:], uint16(ddv)) + case d4: + binary.LittleEndian.PutUint32(c[offset:], uint32(ddv)) + // d8 must not happen. Those samples are encoded as float64. + default: + panic("invalid number of bytes for integer delta") + } + } else { + switch vb { + case d4: + binary.LittleEndian.PutUint32(c[offset:], math.Float32bits(float32(ddv))) + case d8: + // Store the absolute value (no delta) in case of d8. + binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value))) + default: + panic("invalid number of bytes for floating point delta") + } + } + return []chunk{&c} } +// clone implements chunk. +func (c doubleDeltaEncodedChunk) clone() chunk { + clone := make(doubleDeltaEncodedChunk, len(c), cap(c)) + copy(clone, c) + return &clone +} + +// firstTime implements chunk. +func (c doubleDeltaEncodedChunk) firstTime() clientmodel.Timestamp { + return c.baseTime() +} + +// lastTime implements chunk. +func (c doubleDeltaEncodedChunk) lastTime() clientmodel.Timestamp { + return c.valueAtIndex(c.len() - 1).Timestamp +} + +// newIterator implements chunk. +func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator { + return &doubleDeltaEncodedChunkIterator{ + chunk: c, + } +} + +// marshal implements chunk. +func (c doubleDeltaEncodedChunk) marshal(w io.Writer) error { + if len(c) > math.MaxUint16 { + panic("chunk buffer length would overflow a 16 bit uint.") + } + binary.LittleEndian.PutUint16(c[doubleDeltaHeaderBufLenOffset:], uint16(len(c))) + + n, err := w.Write(c[:cap(c)]) + if err != nil { + return err + } + if n != cap(c) { + return fmt.Errorf("wanted to write %d bytes, wrote %d", len(c), n) + } + return nil +} + +// unmarshal implements chunk. +func (c *doubleDeltaEncodedChunk) unmarshal(r io.Reader) error { + *c = (*c)[:cap(*c)] + readBytes := 0 + for readBytes < len(*c) { + n, err := r.Read((*c)[readBytes:]) + if err != nil { + return err + } + readBytes += n + } + *c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])] + return nil +} + +// values implements chunk. +func (c doubleDeltaEncodedChunk) values() <-chan *metric.SamplePair { + n := c.len() + valuesChan := make(chan *metric.SamplePair) + go func() { + for i := 0; i < n; i++ { + valuesChan <- c.valueAtIndex(i) + } + close(valuesChan) + }() + return valuesChan +} + +// encoding implements chunk. +func (c doubleDeltaEncodedChunk) encoding() chunkEncoding { return doubleDelta } + func (c doubleDeltaEncodedChunk) baseTime() clientmodel.Timestamp { return clientmodel.Timestamp( binary.LittleEndian.Uint64( @@ -148,109 +321,6 @@ func (c doubleDeltaEncodedChunk) isInt() bool { return c[doubleDeltaHeaderIsIntOffset] == 1 } -// add implements chunk. -func (c doubleDeltaEncodedChunk) add(s *metric.SamplePair) []chunk { - if c.len() == 0 { - return c.addFirstSample(s) - } - - tb := c.timeBytes() - vb := c.valueBytes() - - if c.len() == 1 { - return c.addSecondSample(s, tb, vb) - } - - remainingBytes := cap(c) - len(c) - sampleSize := c.sampleSize() - - // Do we generally have space for another sample in this chunk? If not, - // overflow into a new one. - if remainingBytes < sampleSize { - overflowChunks := c.newFollowupChunk().add(s) - return []chunk{&c, overflowChunks[0]} - } - - projectedTime := c.baseTime() + clientmodel.Timestamp(c.len())*c.baseTimeDelta() - ddt := s.Timestamp - projectedTime - - projectedValue := c.baseValue() + clientmodel.SampleValue(c.len())*c.baseValueDelta() - ddv := s.Value - projectedValue - - // If the new sample is incompatible with the current encoding, reencode the - // existing chunk data into new chunk(s). - // - // int->float. - if c.isInt() && !isInt64(ddv) { - return transcodeAndAdd(newDoubleDeltaEncodedChunk(tb, d4, false, cap(c)), &c, s) - } - // float32->float64. - if !c.isInt() && vb == d4 && projectedValue+clientmodel.SampleValue(float32(ddv)) != s.Value { - return transcodeAndAdd(newDoubleDeltaEncodedChunk(tb, d8, false, cap(c)), &c, s) - } - - var ntb, nvb deltaBytes - if tb < d8 { - // Maybe more bytes for timestamp. - ntb = bytesNeededForSignedTimestampDelta(ddt) - } - if c.isInt() && vb < d8 { - // Maybe more bytes for sample value. - nvb = bytesNeededForIntegerSampleValueDelta(ddv) - } - if ntb > tb || nvb > vb { - ntb = max(ntb, tb) - nvb = max(nvb, vb) - return transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, c.isInt(), cap(c)), &c, s) - } - - offset := len(c) - c = c[:offset+sampleSize] - - switch tb { - case d1: - c[offset] = byte(ddt) - case d2: - binary.LittleEndian.PutUint16(c[offset:], uint16(ddt)) - case d4: - binary.LittleEndian.PutUint32(c[offset:], uint32(ddt)) - case d8: - // Store the absolute value (no delta) in case of d8. - binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp)) - default: - panic("invalid number of bytes for time delta") - } - - offset += int(tb) - - if c.isInt() { - switch vb { - case d0: - // No-op. Constant delta is stored as base value. - case d1: - c[offset] = byte(ddv) - case d2: - binary.LittleEndian.PutUint16(c[offset:], uint16(ddv)) - case d4: - binary.LittleEndian.PutUint32(c[offset:], uint32(ddv)) - // d8 must not happen. Those samples are encoded as float64. - default: - panic("invalid number of bytes for integer delta") - } - } else { - switch vb { - case d4: - binary.LittleEndian.PutUint32(c[offset:], math.Float32bits(float32(ddv))) - case d8: - // Store the absolute value (no delta) in case of d8. - binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value))) - default: - panic("invalid number of bytes for floating point delta") - } - } - return []chunk{&c} -} - // addFirstSample is a helper method only used by c.add(). It adds timestamp and // value as base time and value. func (c doubleDeltaEncodedChunk) addFirstSample(s *metric.SamplePair) []chunk { @@ -315,26 +385,6 @@ func (c doubleDeltaEncodedChunk) addSecondSample(s *metric.SamplePair, tb, vb de return []chunk{&c} } -// clone implements chunk. -func (c doubleDeltaEncodedChunk) clone() chunk { - clone := make(doubleDeltaEncodedChunk, len(c), cap(c)) - copy(clone, c) - return &clone -} - -// values implements chunk. -func (c doubleDeltaEncodedChunk) values() <-chan *metric.SamplePair { - n := c.len() - valuesChan := make(chan *metric.SamplePair) - go func() { - for i := 0; i < n; i++ { - valuesChan <- c.valueAtIndex(i) - } - close(valuesChan) - }() - return valuesChan -} - func (c doubleDeltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair { if idx == 0 { return &metric.SamplePair{ @@ -424,61 +474,12 @@ func (c doubleDeltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair { } } -// firstTime implements chunk. -func (c doubleDeltaEncodedChunk) firstTime() clientmodel.Timestamp { - return c.baseTime() -} - -// lastTime implements chunk. -func (c doubleDeltaEncodedChunk) lastTime() clientmodel.Timestamp { - return c.valueAtIndex(c.len() - 1).Timestamp -} - -// marshal implements chunk. -func (c doubleDeltaEncodedChunk) marshal(w io.Writer) error { - if len(c) > math.MaxUint16 { - panic("chunk buffer length would overflow a 16 bit uint.") - } - binary.LittleEndian.PutUint16(c[doubleDeltaHeaderBufLenOffset:], uint16(len(c))) - - n, err := w.Write(c[:cap(c)]) - if err != nil { - return err - } - if n != cap(c) { - return fmt.Errorf("wanted to write %d bytes, wrote %d", len(c), n) - } - return nil -} - -// unmarshal implements chunk. -func (c *doubleDeltaEncodedChunk) unmarshal(r io.Reader) error { - *c = (*c)[:cap(*c)] - readBytes := 0 - for readBytes < len(*c) { - n, err := r.Read((*c)[readBytes:]) - if err != nil { - return err - } - readBytes += n - } - *c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])] - return nil -} - // doubleDeltaEncodedChunkIterator implements chunkIterator. type doubleDeltaEncodedChunkIterator struct { chunk *doubleDeltaEncodedChunk // TODO(beorn7): add more fields here to keep track of last position. } -// newIterator implements chunk. -func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator { - return &doubleDeltaEncodedChunkIterator{ - chunk: c, - } -} - // getValueAtTime implements chunkIterator. func (it *doubleDeltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestamp) metric.Values { // TODO(beorn7): Implement in a more efficient way making use of the diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 590a6256a..74a02da6c 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -95,8 +95,7 @@ type indexingOp struct { // dropChunks, loadChunks, and loadChunkDescs can be called concurrently with // each other if each call refers to a different fingerprint. type persistence struct { - basePath string - chunkType byte + basePath string archivedFingerprintToMetrics *index.FingerprintMetricIndex archivedFingerprintToTimeRange *index.FingerprintTimeRangeIndex @@ -121,7 +120,7 @@ type persistence struct { } // newPersistence returns a newly allocated persistence backed by local disk storage, ready to use. -func newPersistence(basePath string, chunkType byte, dirty bool) (*persistence, error) { +func newPersistence(basePath string, dirty bool) (*persistence, error) { dirtyPath := filepath.Join(basePath, dirtyFileName) versionPath := filepath.Join(basePath, versionFileName) @@ -178,8 +177,7 @@ func newPersistence(basePath string, chunkType byte, dirty bool) (*persistence, } p := &persistence{ - basePath: basePath, - chunkType: chunkType, + basePath: basePath, archivedFingerprintToMetrics: archivedFingerprintToMetrics, archivedFingerprintToTimeRange: archivedFingerprintToTimeRange, @@ -396,7 +394,7 @@ func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int, inde if err != nil { return nil, err } - chunk := chunkForType(typeBuf[0]) + chunk := newChunkForEncoding(chunkEncoding(typeBuf[0])) chunk.unmarshal(f) chunks = append(chunks, chunk) } @@ -590,7 +588,7 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap } } else { // This is the non-persisted head chunk. Fully marshal it. - if err = w.WriteByte(chunkType(chunkDesc.chunk)); err != nil { + if err = w.WriteByte(byte(chunkDesc.chunk.encoding())); err != nil { return } if err = chunkDesc.chunk.marshal(w); err != nil { @@ -742,13 +740,13 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) { } } else { // Non-persisted head chunk. - chunkType, err := r.ReadByte() + encoding, err := r.ReadByte() if err != nil { glog.Warning("Could not decode chunk type:", err) p.dirty = true return sm, nil } - chunk := chunkForType(chunkType) + chunk := newChunkForEncoding(chunkEncoding(encoding)) if err := chunk.unmarshal(r); err != nil { glog.Warning("Could not decode chunk type:", err) p.dirty = true @@ -771,7 +769,6 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) { chunkDescsOffset: int(chunkDescsOffset), savedFirstTime: clientmodel.Timestamp(savedFirstTime), headChunkPersisted: headChunkPersisted, - chunkType: p.chunkType, } } return sm, nil @@ -1096,7 +1093,7 @@ func (p *persistence) openChunkFileForReading(fp clientmodel.Fingerprint) (*os.F func writeChunkHeader(w io.Writer, c chunk) error { header := make([]byte, chunkHeaderLen) - header[chunkHeaderTypeOffset] = chunkType(c) + header[chunkHeaderTypeOffset] = byte(c.encoding()) binary.LittleEndian.PutUint64(header[chunkHeaderFirstTimeOffset:], uint64(c.firstTime())) binary.LittleEndian.PutUint64(header[chunkHeaderLastTimeOffset:], uint64(c.lastTime())) _, err := w.Write(header) diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index a33c5fba4..382eb034a 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -31,9 +31,10 @@ var ( m3 = clientmodel.Metric{"label": "value3"} ) -func newTestPersistence(t *testing.T, chunkType byte) (*persistence, test.Closer) { +func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, test.Closer) { + *defaultChunkEncoding = int(encoding) dir := test.NewTemporaryDirectory("test_persistence", t) - p, err := newPersistence(dir.Path(), chunkType, false) + p, err := newPersistence(dir.Path(), false) if err != nil { dir.Close() t.Fatal(err) @@ -44,7 +45,7 @@ func newTestPersistence(t *testing.T, chunkType byte) (*persistence, test.Closer }) } -func buildTestChunks(chunkType byte) map[clientmodel.Fingerprint][]chunk { +func buildTestChunks(encoding chunkEncoding) map[clientmodel.Fingerprint][]chunk { fps := clientmodel.Fingerprints{ m1.Fingerprint(), m2.Fingerprint(), @@ -55,7 +56,7 @@ func buildTestChunks(chunkType byte) map[clientmodel.Fingerprint][]chunk { for _, fp := range fps { fpToChunks[fp] = make([]chunk, 0, 10) for i := 0; i < 10; i++ { - fpToChunks[fp] = append(fpToChunks[fp], chunkForType(chunkType).add(&metric.SamplePair{ + fpToChunks[fp] = append(fpToChunks[fp], newChunkForEncoding(encoding).add(&metric.SamplePair{ Timestamp: clientmodel.Timestamp(i), Value: clientmodel.SampleValue(fp), })[0]) @@ -75,11 +76,11 @@ func chunksEqual(c1, c2 chunk) bool { return true } -func testPersistLoadDropChunks(t *testing.T, chunkType byte) { - p, closer := newTestPersistence(t, chunkType) +func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) { + p, closer := newTestPersistence(t, encoding) defer closer.Close() - fpToChunks := buildTestChunks(chunkType) + fpToChunks := buildTestChunks(encoding) for fp, chunks := range fpToChunks { for i, c := range chunks { @@ -191,15 +192,15 @@ func TestPersistLoadDropChunksType1(t *testing.T) { testPersistLoadDropChunks(t, 1) } -func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, chunkType byte) { - p, closer := newTestPersistence(t, chunkType) +func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding) { + p, closer := newTestPersistence(t, encoding) defer closer.Close() fpLocker := newFingerprintLocker(10) sm := newSeriesMap() - s1 := newMemorySeries(m1, true, 0, chunkType) - s2 := newMemorySeries(m2, false, 0, chunkType) - s3 := newMemorySeries(m3, false, 0, chunkType) + s1 := newMemorySeries(m1, true, 0) + s2 := newMemorySeries(m2, false, 0) + s3 := newMemorySeries(m3, false, 0) s1.add(m1.Fingerprint(), &metric.SamplePair{Timestamp: 1, Value: 3.14}) s3.add(m1.Fingerprint(), &metric.SamplePair{Timestamp: 2, Value: 2.7}) s3.headChunkPersisted = true @@ -260,8 +261,8 @@ func TestCheckpointAndLoadSeriesMapAndHeadsChunkType1(t *testing.T) { testCheckpointAndLoadSeriesMapAndHeads(t, 1) } -func testGetFingerprintsModifiedBefore(t *testing.T, chunkType byte) { - p, closer := newTestPersistence(t, chunkType) +func testGetFingerprintsModifiedBefore(t *testing.T, encoding chunkEncoding) { + p, closer := newTestPersistence(t, encoding) defer closer.Close() m1 := clientmodel.Metric{"n1": "v1"} @@ -338,8 +339,8 @@ func TestGetFingerprintsModifiedBeforeChunkType1(t *testing.T) { testGetFingerprintsModifiedBefore(t, 1) } -func testDropArchivedMetric(t *testing.T, chunkType byte) { - p, closer := newTestPersistence(t, chunkType) +func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) { + p, closer := newTestPersistence(t, encoding) defer closer.Close() m1 := clientmodel.Metric{"n1": "v1"} @@ -420,7 +421,7 @@ type incrementalBatch struct { expectedLpToFps index.LabelPairFingerprintsMapping } -func testIndexing(t *testing.T, chunkType byte) { +func testIndexing(t *testing.T, encoding chunkEncoding) { batches := []incrementalBatch{ { fpToMetric: index.FingerprintMetricMapping{ @@ -556,7 +557,7 @@ func testIndexing(t *testing.T, chunkType byte) { }, } - p, closer := newTestPersistence(t, chunkType) + p, closer := newTestPersistence(t, encoding) defer closer.Close() indexedFpsToMetrics := index.FingerprintMetricMapping{} diff --git a/storage/local/series.go b/storage/local/series.go index c50bb9412..7a4ace7fc 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -158,8 +158,6 @@ type memorySeries struct { // a non-persisted head chunk has to be cloned before more samples are // appended. headChunkUsedByIterator bool - // Which type of chunk to create if a new chunk is needed. - chunkType byte } // newMemorySeries returns a pointer to a newly allocated memorySeries for the @@ -167,13 +165,11 @@ type memorySeries struct { // or (if false) a series for a metric being unarchived, i.e. a series that // existed before but has been evicted from memory. If reallyNew is false, // firstTime is ignored (and set to the lowest possible timestamp instead - it -// will be set properly upon the first eviction of chunkDescs). chunkType is the -// type of chunks newly created by this memorySeries. +// will be set properly upon the first eviction of chunkDescs). func newMemorySeries( m clientmodel.Metric, reallyNew bool, firstTime clientmodel.Timestamp, - chunkType byte, ) *memorySeries { if reallyNew { firstTime = clientmodel.Earliest @@ -182,7 +178,6 @@ func newMemorySeries( metric: m, headChunkPersisted: !reallyNew, savedFirstTime: firstTime, - chunkType: chunkType, } if !reallyNew { s.chunkDescsOffset = -1 @@ -195,7 +190,7 @@ func newMemorySeries( // The caller must have locked the fingerprint of the series. func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair) []*chunkDesc { if len(s.chunkDescs) == 0 || s.headChunkPersisted { - newHead := newChunkDesc(chunkForType(s.chunkType)) + newHead := newChunkDesc(newChunk()) s.chunkDescs = append(s.chunkDescs, newHead) s.headChunkPersisted = false } else if s.headChunkUsedByIterator && s.head().getRefCount() > 1 { diff --git a/storage/local/storage.go b/storage/local/storage.go index c655e5311..d71117050 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -16,7 +16,6 @@ package local import ( "container/list" - "fmt" "sync" "sync/atomic" "time" @@ -72,7 +71,6 @@ type memorySeriesStorage struct { dropAfter time.Duration checkpointInterval time.Duration checkpointDirtySeriesLimit int - chunkType byte appendQueue chan *clientmodel.Sample appendLastTimestamp clientmodel.Timestamp // The timestamp of the last sample sent to the append queue. @@ -110,17 +108,13 @@ type MemorySeriesStorageOptions struct { PersistenceQueueCapacity int // Capacity of queue for chunks to be persisted. CheckpointInterval time.Duration // How often to checkpoint the series map and head chunks. CheckpointDirtySeriesLimit int // How many dirty series will trigger an early checkpoint. - ChunkType byte // Chunk type for newly created chunks. Dirty bool // Force the storage to consider itself dirty on startup. } // NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still // has to be called to start the storage. func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { - if o.ChunkType > 1 { - return nil, fmt.Errorf("unsupported chunk type %d", o.ChunkType) - } - p, err := newPersistence(o.PersistenceStoragePath, o.ChunkType, o.Dirty) + p, err := newPersistence(o.PersistenceStoragePath, o.Dirty) if err != nil { return nil, err } @@ -148,7 +142,6 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { dropAfter: o.PersistenceRetentionPeriod, checkpointInterval: o.CheckpointInterval, checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit, - chunkType: o.ChunkType, appendLastTimestamp: clientmodel.Earliest, appendQueue: make(chan *clientmodel.Sample, appendQueueCap), @@ -458,7 +451,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m cl s.persistence.indexMetric(fp, m) s.seriesOps.WithLabelValues(create).Inc() } - series = newMemorySeries(m, !unarchived, firstTime, s.chunkType) + series = newMemorySeries(m, !unarchived, firstTime) s.fpToSeries.put(fp, series) s.numSeries.Inc() } diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index d045dee77..7e773dcc0 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -181,7 +181,7 @@ func TestLoop(t *testing.T) { } } -func testChunk(t *testing.T, chunkType byte) { +func testChunk(t *testing.T, encoding chunkEncoding) { samples := make(clientmodel.Samples, 500000) for i := range samples { samples[i] = &clientmodel.Sample{ @@ -189,7 +189,7 @@ func testChunk(t *testing.T, chunkType byte) { Value: clientmodel.SampleValue(float64(i) * 0.2), } } - s, closer := NewTestStorage(t, chunkType) + s, closer := NewTestStorage(t, encoding) defer closer.Close() s.AppendSamples(samples) @@ -229,7 +229,7 @@ func TestChunkType1(t *testing.T) { testChunk(t, 1) } -func testGetValueAtTime(t *testing.T, chunkType byte) { +func testGetValueAtTime(t *testing.T, encoding chunkEncoding) { samples := make(clientmodel.Samples, 1000) for i := range samples { samples[i] = &clientmodel.Sample{ @@ -237,7 +237,7 @@ func testGetValueAtTime(t *testing.T, chunkType byte) { Value: clientmodel.SampleValue(float64(i) * 0.2), } } - s, closer := NewTestStorage(t, chunkType) + s, closer := NewTestStorage(t, encoding) defer closer.Close() s.AppendSamples(samples) @@ -320,7 +320,7 @@ func TestGetValueAtTimeChunkType1(t *testing.T) { testGetValueAtTime(t, 1) } -func testGetRangeValues(t *testing.T, chunkType byte) { +func testGetRangeValues(t *testing.T, encoding chunkEncoding) { samples := make(clientmodel.Samples, 1000) for i := range samples { samples[i] = &clientmodel.Sample{ @@ -328,7 +328,7 @@ func testGetRangeValues(t *testing.T, chunkType byte) { Value: clientmodel.SampleValue(float64(i) * 0.2), } } - s, closer := NewTestStorage(t, chunkType) + s, closer := NewTestStorage(t, encoding) defer closer.Close() s.AppendSamples(samples) @@ -470,7 +470,7 @@ func TestGetRangeValuesChunkType1(t *testing.T) { testGetRangeValues(t, 1) } -func testEvictAndPurgeSeries(t *testing.T, chunkType byte) { +func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { samples := make(clientmodel.Samples, 1000) for i := range samples { samples[i] = &clientmodel.Sample{ @@ -478,7 +478,7 @@ func testEvictAndPurgeSeries(t *testing.T, chunkType byte) { Value: clientmodel.SampleValue(float64(i * i)), } } - s, closer := NewTestStorage(t, chunkType) + s, closer := NewTestStorage(t, encoding) defer closer.Close() ms := s.(*memorySeriesStorage) // Going to test the internal maintain.*Series methods. @@ -576,7 +576,7 @@ func TestEvictAndPurgeSeriesChunkType1(t *testing.T) { testEvictAndPurgeSeries(t, 1) } -func benchmarkAppend(b *testing.B, chunkType byte) { +func benchmarkAppend(b *testing.B, encoding chunkEncoding) { samples := make(clientmodel.Samples, b.N) for i := range samples { samples[i] = &clientmodel.Sample{ @@ -590,7 +590,7 @@ func benchmarkAppend(b *testing.B, chunkType byte) { } } b.ResetTimer() - s, closer := NewTestStorage(b, chunkType) + s, closer := NewTestStorage(b, encoding) defer closer.Close() s.AppendSamples(samples) @@ -606,14 +606,14 @@ func BenchmarkAppendType1(b *testing.B) { // Append a large number of random samples and then check if we can get them out // of the storage alright. -func testFuzz(t *testing.T, chunkType byte) { +func testFuzz(t *testing.T, encoding chunkEncoding) { if testing.Short() { t.Skip("Skipping test in short mode.") } check := func(seed int64) bool { rand.Seed(seed) - s, c := NewTestStorage(t, chunkType) + s, c := NewTestStorage(t, encoding) defer c.Close() samples := createRandomSamples("test_fuzz", 1000) @@ -645,7 +645,8 @@ func TestFuzzChunkType1(t *testing.T) { // make things even slower): // // go test -race -cpu 8 -test=short -bench BenchmarkFuzzChunkType -func benchmarkFuzz(b *testing.B, chunkType byte) { +func benchmarkFuzz(b *testing.B, encoding chunkEncoding) { + *defaultChunkEncoding = int(encoding) const samplesPerRun = 100000 rand.Seed(42) directory := test.NewTemporaryDirectory("test_storage", b) @@ -655,7 +656,6 @@ func benchmarkFuzz(b *testing.B, chunkType byte) { PersistenceRetentionPeriod: time.Hour, PersistenceStoragePath: directory.Path(), CheckpointInterval: time.Second, - ChunkType: chunkType, } s, err := NewMemorySeriesStorage(o) if err != nil { diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go index 319398497..2fc78d429 100644 --- a/storage/local/test_helpers.go +++ b/storage/local/test_helpers.go @@ -37,14 +37,14 @@ func (t *testStorageCloser) Close() { // NewTestStorage creates a storage instance backed by files in a temporary // directory. The returned storage is already in serving state. Upon closing the // returned test.Closer, the temporary directory is cleaned up. -func NewTestStorage(t test.T, chunkType byte) (Storage, test.Closer) { +func NewTestStorage(t test.T, encoding chunkEncoding) (Storage, test.Closer) { + *defaultChunkEncoding = int(encoding) directory := test.NewTemporaryDirectory("test_storage", t) o := &MemorySeriesStorageOptions{ MemoryChunks: 1000000, PersistenceRetentionPeriod: 24 * time.Hour * 365 * 100, // Enough to never trigger purging. PersistenceStoragePath: directory.Path(), CheckpointInterval: time.Hour, - ChunkType: chunkType, } storage, err := NewMemorySeriesStorage(o) if err != nil {