diff --git a/storage/local/delta.go b/storage/local/delta.go index 4d481aa77..9ea35f004 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -108,7 +108,7 @@ func (c deltaEncodedChunk) baseValue() clientmodel.SampleValue { // add implements chunk. func (c deltaEncodedChunk) add(s *metric.SamplePair) []chunk { - if len(c) < deltaHeaderBytes { + if c.len() == 0 { c = c[:deltaHeaderBytes] binary.LittleEndian.PutUint64(c[deltaHeaderBaseTimeOffset:], uint64(s.Timestamp)) binary.LittleEndian.PutUint64(c[deltaHeaderBaseValueOffset:], math.Float64bits(float64(s.Value))) diff --git a/storage/local/doubledelta.go b/storage/local/doubledelta.go index 58fd98955..bc8abdd0d 100644 --- a/storage/local/doubledelta.go +++ b/storage/local/doubledelta.go @@ -150,71 +150,15 @@ func (c doubleDeltaEncodedChunk) isInt() bool { // add implements chunk. func (c doubleDeltaEncodedChunk) add(s *metric.SamplePair) []chunk { - if len(c) <= doubleDeltaHeaderIsIntOffset+1 { - // This is the first sample added to this chunk. Add it as base - // time and value. - c = c[:doubleDeltaHeaderBaseValueOffset+8] - binary.LittleEndian.PutUint64( - c[doubleDeltaHeaderBaseTimeOffset:], - uint64(s.Timestamp), - ) - binary.LittleEndian.PutUint64( - c[doubleDeltaHeaderBaseValueOffset:], - math.Float64bits(float64(s.Value)), - ) - return []chunk{&c} + if c.len() == 0 { + return c.addFirstSample(s) } tb := c.timeBytes() vb := c.valueBytes() - if len(c) <= doubleDeltaHeaderBaseValueOffset+8 { - // This is the 2nd sample added to this chunk. Calculate the - // base deltas. - baseTimeDelta := s.Timestamp - c.baseTime() - if baseTimeDelta < 0 { - // TODO(beorn7): We ignore this irregular case for now. Once - // https://github.com/prometheus/prometheus/issues/481 is - // fixed, we should panic here instead. - return []chunk{&c} - } - c = c[:doubleDeltaHeaderBytes] - if tb >= d8 || bytesNeededForUnsignedTimestampDelta(baseTimeDelta) >= d8 { - // If already the base delta needs d8 (or we are at d8 - // already, anyway), we better encode this timestamp - // directly rather than as a delta and switch everything - // to d8. - c[doubleDeltaHeaderTimeBytesOffset] = byte(d8) - binary.LittleEndian.PutUint64( - c[doubleDeltaHeaderBaseTimeDeltaOffset:], - uint64(s.Timestamp), - ) - } else { - binary.LittleEndian.PutUint64( - c[doubleDeltaHeaderBaseTimeDeltaOffset:], - uint64(baseTimeDelta), - ) - } - baseValue := c.baseValue() - baseValueDelta := s.Value - baseValue - if vb >= d8 || baseValue+baseValueDelta != s.Value { - // If we can't reproduce the original sample value (or - // if we are at d8 already, anyway), we better encode - // this value directly rather than as a delta and switch - // everything to d8. - c[doubleDeltaHeaderValueBytesOffset] = byte(d8) - c[doubleDeltaHeaderIsIntOffset] = 0 - binary.LittleEndian.PutUint64( - c[doubleDeltaHeaderBaseValueDeltaOffset:], - math.Float64bits(float64(s.Value)), - ) - } else { - binary.LittleEndian.PutUint64( - c[doubleDeltaHeaderBaseValueDeltaOffset:], - math.Float64bits(float64(baseValueDelta)), - ) - } - return []chunk{&c} + if c.len() == 1 { + return c.addSecondSample(s, tb, vb) } remainingBytes := cap(c) - len(c) @@ -298,6 +242,70 @@ func (c doubleDeltaEncodedChunk) add(s *metric.SamplePair) []chunk { return []chunk{&c} } +// addFirstSample is a helper method only used by c.add(). It adds timestamp and +// value as base time and value. +func (c doubleDeltaEncodedChunk) addFirstSample(s *metric.SamplePair) []chunk { + c = c[:doubleDeltaHeaderBaseValueOffset+8] + binary.LittleEndian.PutUint64( + c[doubleDeltaHeaderBaseTimeOffset:], + uint64(s.Timestamp), + ) + binary.LittleEndian.PutUint64( + c[doubleDeltaHeaderBaseValueOffset:], + math.Float64bits(float64(s.Value)), + ) + return []chunk{&c} +} + +// addSecondSample is a helper method only used by c.add(). It calculates the +// base delta from the provided sample and adds it to the chunk. +func (c doubleDeltaEncodedChunk) addSecondSample(s *metric.SamplePair, tb, vb deltaBytes) []chunk { + baseTimeDelta := s.Timestamp - c.baseTime() + if baseTimeDelta < 0 { + // TODO(beorn7): We ignore this irregular case for now. Once + // https://github.com/prometheus/prometheus/issues/481 is + // fixed, we should panic here instead. + return []chunk{&c} + } + c = c[:doubleDeltaHeaderBytes] + if tb >= d8 || bytesNeededForUnsignedTimestampDelta(baseTimeDelta) >= d8 { + // If already the base delta needs d8 (or we are at d8 + // already, anyway), we better encode this timestamp + // directly rather than as a delta and switch everything + // to d8. + c[doubleDeltaHeaderTimeBytesOffset] = byte(d8) + binary.LittleEndian.PutUint64( + c[doubleDeltaHeaderBaseTimeDeltaOffset:], + uint64(s.Timestamp), + ) + } else { + binary.LittleEndian.PutUint64( + c[doubleDeltaHeaderBaseTimeDeltaOffset:], + uint64(baseTimeDelta), + ) + } + baseValue := c.baseValue() + baseValueDelta := s.Value - baseValue + if vb >= d8 || baseValue+baseValueDelta != s.Value { + // If we can't reproduce the original sample value (or + // if we are at d8 already, anyway), we better encode + // this value directly rather than as a delta and switch + // everything to d8. + c[doubleDeltaHeaderValueBytesOffset] = byte(d8) + c[doubleDeltaHeaderIsIntOffset] = 0 + binary.LittleEndian.PutUint64( + c[doubleDeltaHeaderBaseValueDeltaOffset:], + math.Float64bits(float64(s.Value)), + ) + } else { + binary.LittleEndian.PutUint64( + c[doubleDeltaHeaderBaseValueDeltaOffset:], + math.Float64bits(float64(baseValueDelta)), + ) + } + return []chunk{&c} +} + // clone implements chunk. func (c doubleDeltaEncodedChunk) clone() chunk { clone := make(doubleDeltaEncodedChunk, len(c), cap(c)) diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index bc866a615..a33c5fba4 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -592,7 +592,7 @@ func testIndexing(t *testing.T, chunkType byte) { } func TestIndexingChunkType0(t *testing.T) { - testIndexing(t, 1) + testIndexing(t, 0) } func TestIndexingChunkType1(t *testing.T) {