diff --git a/storage/local/chunk.go b/storage/local/chunk.go index 5b590db03a..92b6b479b7 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -60,7 +60,7 @@ func chunkType(c chunk) byte { func chunkForType(chunkType byte) chunk { switch chunkType { case 0: - return newDeltaEncodedChunk(1, 1, false) + return newDeltaEncodedChunk(d1, d0, true) default: panic("unknown chunk type") } diff --git a/storage/local/delta.go b/storage/local/delta.go index db23664c96..7d5196ff54 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -41,6 +41,11 @@ const ( deltaHeaderBufLenOffset = 19 ) +// A deltaEncodedChunk adaptively stores sample timestamps and values with a +// delta encoding of various types (int, float) and bit width. However, once 8 +// bytes would be needed to encode a delta value, a fall-back to the absolute +// numbers happens (so that timestamps are saved directly as int64 and values as +// float64). type deltaEncodedChunk struct { buf []byte } @@ -51,7 +56,7 @@ func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool) *deltaEncodedChunk { buf[deltaHeaderTimeBytesOffset] = byte(tb) buf[deltaHeaderValueBytesOffset] = byte(vb) - if isInt { + if vb < d8 && isInt { // Only use int for fewer than 8 value delta bytes. buf[deltaHeaderIsIntOffset] = 1 } else { buf[deltaHeaderIsIntOffset] = 0 @@ -77,35 +82,35 @@ func (c *deltaEncodedChunk) clone() chunk { } func neededDeltaBytes(deltaT clientmodel.Timestamp, deltaV clientmodel.SampleValue, isInt bool) (dtb, dvb deltaBytes) { - dtb = 1 + dtb = d1 if deltaT >= 256 { - dtb = 2 + dtb = d2 } if deltaT >= 256*256 { - dtb = 4 + dtb = d4 } if deltaT >= 256*256*256*256 { - dtb = 8 + dtb = d8 } if isInt { - dvb = 0 + dvb = d0 if deltaV != 0 { - dvb = 1 + dvb = d1 } if deltaV < -(256/2) || deltaV > (256/2)-1 { - dvb = 2 + dvb = d2 } if deltaV < -(256*256/2) || deltaV > (256*256/2)-1 { - dvb = 4 + dvb = d4 } if deltaV < -(256*256*256*256/2) || deltaV > (256*256*256*256/2)-1 { - dvb = 8 + dvb = d8 } } else { - dvb = 4 + dvb = d4 if clientmodel.SampleValue(float32(deltaV)) != deltaV { - dvb = 8 + dvb = d8 } } return dtb, dvb @@ -149,8 +154,7 @@ func (c *deltaEncodedChunk) add(s *metric.SamplePair) chunks { sampleSize := c.sampleSize() // Do we generally have space for another sample in this chunk? If not, - // overflow into a new one. We assume that if we have seen floating point - // values once, the series will most likely contain floats in the future. + // overflow into a new one. if remainingBytes < sampleSize { //fmt.Println("overflow") overflowChunks := c.newFollowupChunk().add(s) @@ -159,6 +163,8 @@ func (c *deltaEncodedChunk) add(s *metric.SamplePair) chunks { dt := s.Timestamp - c.baseTime() dv := s.Value - c.baseValue() + tb := c.timeBytes() + vb := c.valueBytes() // If the new sample is incompatible with the current encoding, reencode the // existing chunk data into new chunk(s). @@ -166,61 +172,65 @@ func (c *deltaEncodedChunk) add(s *metric.SamplePair) chunks { // int->float. // TODO: compare speed with Math.Modf. if c.isInt() && clientmodel.SampleValue(int64(dv)) != dv { - //fmt.Println("int->float", len(c.buf), cap(c.buf)) - return transcodeAndAdd(newDeltaEncodedChunk(c.timeBytes(), d4, false), c, s) + //fmt.Println("int->float", len(c.buf), cap(c.buf), dv) + return transcodeAndAdd(newDeltaEncodedChunk(tb, d4, false), c, s) } // float32->float64. - if !c.isInt() && c.valueBytes() == d4 && clientmodel.SampleValue(float32(dv)) != dv { + if !c.isInt() && vb == d4 && clientmodel.SampleValue(float32(dv)) != dv { //fmt.Println("float32->float64", float32(dv), dv, len(c.buf), cap(c.buf)) - return transcodeAndAdd(newDeltaEncodedChunk(c.timeBytes(), d8, false), c, s) + return transcodeAndAdd(newDeltaEncodedChunk(tb, d8, false), c, s) } - // More bytes per sample. - if dtb, dvb := neededDeltaBytes(dt, dv, c.isInt()); dtb > c.timeBytes() || dvb > c.valueBytes() { - //fmt.Printf("transcoding T: %v->%v, V: %v->%v, I: %v; len %v, cap %v\n", c.timeBytes(), dtb, c.valueBytes(), dvb, c.isInt(), len(c.buf), cap(c.buf)) - dtb = max(dtb, c.timeBytes()) - dvb = max(dvb, c.valueBytes()) - return transcodeAndAdd(newDeltaEncodedChunk(dtb, dvb, c.isInt()), c, s) + if tb < d8 || vb < d8 { + // Maybe more bytes per sample. + if ntb, nvb := neededDeltaBytes(dt, dv, c.isInt()); ntb > tb || nvb > vb { + //fmt.Printf("transcoding T: %v->%v, V: %v->%v, I: %v; len %v, cap %v\n", tb, ntb, vb, nvb, c.isInt(), len(c.buf), cap(c.buf)) + ntb = max(ntb, tb) + nvb = max(nvb, vb) + return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, c.isInt()), c, s) + } } - offset := len(c.buf) c.buf = c.buf[:offset+sampleSize] - switch c.timeBytes() { - case 1: + switch tb { + case d1: c.buf[offset] = byte(dt) - case 2: + case d2: binary.LittleEndian.PutUint16(c.buf[offset:], uint16(dt)) - case 4: + case d4: binary.LittleEndian.PutUint32(c.buf[offset:], uint32(dt)) - case 8: - binary.LittleEndian.PutUint64(c.buf[offset:], uint64(dt)) + case d8: + // Store the absolute value (no delta) in case of d8. + binary.LittleEndian.PutUint64(c.buf[offset:], uint64(s.Timestamp)) + default: + panic("invalid number of bytes for time delta") } - offset += int(c.timeBytes()) + offset += int(tb) if c.isInt() { - switch c.valueBytes() { - case 0: + switch vb { + case d0: // No-op. Constant value is stored as base value. - case 1: + case d1: c.buf[offset] = byte(dv) - case 2: + case d2: binary.LittleEndian.PutUint16(c.buf[offset:], uint16(dv)) - case 4: + case d4: binary.LittleEndian.PutUint32(c.buf[offset:], uint32(dv)) - case 8: - binary.LittleEndian.PutUint64(c.buf[offset:], uint64(dv)) + // d8 must not happen. Those samples are encoded as float64. default: - panic("Invalid number of bytes for integer delta") + panic("invalid number of bytes for integer delta") } } else { - switch c.valueBytes() { - case 4: + switch vb { + case d4: binary.LittleEndian.PutUint32(c.buf[offset:], math.Float32bits(float32(dv))) - case 8: - binary.LittleEndian.PutUint64(c.buf[offset:], math.Float64bits(float64(dv))) + case d8: + // Store the absolute value (no delta) in case of d8. + binary.LittleEndian.PutUint64(c.buf[offset:], math.Float64bits(float64(s.Value))) default: - panic("Invalid number of bytes for floating point delta") + panic("invalid number of bytes for floating point delta") } } return chunks{c} @@ -258,49 +268,52 @@ func (c *deltaEncodedChunk) values() <-chan *metric.SamplePair { func (c *deltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair { offset := deltaHeaderBytes + idx*c.sampleSize() - var dt uint64 + var ts clientmodel.Timestamp switch c.timeBytes() { - case 1: - dt = uint64(uint8(c.buf[offset])) - case 2: - dt = uint64(binary.LittleEndian.Uint16(c.buf[offset:])) - case 4: - dt = uint64(binary.LittleEndian.Uint32(c.buf[offset:])) - case 8: - dt = uint64(binary.LittleEndian.Uint64(c.buf[offset:])) + case d1: + ts = c.baseTime() + clientmodel.Timestamp(uint8(c.buf[offset])) + case d2: + ts = c.baseTime() + clientmodel.Timestamp(binary.LittleEndian.Uint16(c.buf[offset:])) + case d4: + ts = c.baseTime() + clientmodel.Timestamp(binary.LittleEndian.Uint32(c.buf[offset:])) + case d8: + // Take absolute value for d8. + ts = clientmodel.Timestamp(binary.LittleEndian.Uint64(c.buf[offset:])) + default: + panic("Invalid number of bytes for time delta") } offset += int(c.timeBytes()) - var dv clientmodel.SampleValue + var v clientmodel.SampleValue if c.isInt() { switch c.valueBytes() { - case 0: - dv = clientmodel.SampleValue(0) - case 1: - dv = clientmodel.SampleValue(int8(c.buf[offset])) - case 2: - dv = clientmodel.SampleValue(int16(binary.LittleEndian.Uint16(c.buf[offset:]))) - case 4: - dv = clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(c.buf[offset:]))) - case 8: - dv = clientmodel.SampleValue(int64(binary.LittleEndian.Uint64(c.buf[offset:]))) + case d0: + v = c.baseValue() + case d1: + v = c.baseValue() + clientmodel.SampleValue(int8(c.buf[offset])) + case d2: + v = c.baseValue() + clientmodel.SampleValue(int16(binary.LittleEndian.Uint16(c.buf[offset:]))) + case d4: + v = c.baseValue() + clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(c.buf[offset:]))) + // No d8 for ints. default: panic("Invalid number of bytes for integer delta") } } else { switch c.valueBytes() { - case 4: - dv = clientmodel.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(c.buf[offset:]))) - case 8: - dv = clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c.buf[offset:]))) + case d4: + v = c.baseValue() + clientmodel.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(c.buf[offset:]))) + case d8: + // Take absolute value for d8. + v = clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c.buf[offset:]))) default: panic("Invalid number of bytes for floating point delta") } } return &metric.SamplePair{ - Timestamp: c.baseTime() + clientmodel.Timestamp(dt), - Value: c.baseValue() + dv, + Timestamp: ts, + Value: v, } } diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 6d83009ace..4f24dd2dab 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -244,7 +244,7 @@ func TestPersistChunk(t *testing.T) { for fp, expectedChunks := range fpToChunks { indexes := make([]int, 0, len(expectedChunks)) - for i, _ := range expectedChunks { + for i := range expectedChunks { indexes = append(indexes, i) } actualChunks, err := p.LoadChunks(fp, indexes) diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index beeba5f7c3..7ea0c8d9ae 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -324,9 +324,8 @@ func createRandomSamples(r *rand.Rand) clientmodel.Samples { }, }, { // Integer with int deltas of various byte length. - // TODO: Using larger ints yields even worse results. Improve! createValue: func() clientmodel.SampleValue { - return clientmodel.SampleValue(r.Int31() - 1<<30) + return clientmodel.SampleValue(r.Int63() - 1<<62) }, applyDelta: []deltaApplier{ func(v clientmodel.SampleValue) clientmodel.SampleValue { @@ -435,8 +434,7 @@ func verifyStorage(t *testing.T, s Storage, samples clientmodel.Samples, r *rand } want := float64(sample.Value) got := float64(found[0].Value) - // TODO: 0.01 is a horribly large deviation. Improve! - if want != got && (want == 0. || math.Abs(want-got)/want > 0.01) { + if want != got && (want == 0. || math.Abs(want-got)/want > 0.000001) { t.Errorf("Value mismatch, want %f, got %f.", want, got) result = false }