From c13b1ecfe9144b8bcc62bfc7fe388c9da1ad30ab Mon Sep 17 00:00:00 2001 From: beorn7 Date: Mon, 7 Mar 2016 20:23:14 +0100 Subject: [PATCH] Make chunk iterators more DRY This finally extracts all the common code of the two chunk iterators into one. Any future chunk encodings with fast access by index can use the same iterator by simply providing an indexAccessor. Other future chunk encodings without fast index access (like Gorilla-style) can still implement the chunkIterator interface as usual. --- storage/local/chunk.go | 100 ++++++++++++++++++ storage/local/delta.go | 144 ++++++-------------------- storage/local/doubledelta.go | 194 +++++++++++------------------------ 3 files changed, 191 insertions(+), 247 deletions(-) diff --git a/storage/local/chunk.go b/storage/local/chunk.go index 582789fb7e..b843fa3a71 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -17,6 +17,7 @@ import ( "container/list" "fmt" "io" + "sort" "sync" "sync/atomic" @@ -342,3 +343,102 @@ func newChunkForEncoding(encoding chunkEncoding) (chunk, error) { return nil, fmt.Errorf("unknown chunk encoding: %v", encoding) } } + +// indexAccessor allows accesses to samples by index. +type indexAccessor interface { + timestampAtIndex(int) model.Time + sampleValueAtIndex(int) model.SampleValue + err() error +} + +// indexAccessingChunkIterator is a chunk iterator for chunks for which an +// indexAccessor implementation exists. +type indexAccessingChunkIterator struct { + len int + pos int + lastValue model.SamplePair + acc indexAccessor +} + +func newIndexAccessingChunkIterator(len int, acc indexAccessor) *indexAccessingChunkIterator { + return &indexAccessingChunkIterator{ + len: len, + pos: -1, + lastValue: ZeroSamplePair, + acc: acc, + } +} + +// lastTimestamp implements chunkIterator. +func (it *indexAccessingChunkIterator) lastTimestamp() (model.Time, error) { + return it.acc.timestampAtIndex(it.len - 1), it.acc.err() +} + +// valueAtOrBeforeTime implements chunkIterator. +func (it *indexAccessingChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) { + i := sort.Search(it.len, func(i int) bool { + return it.acc.timestampAtIndex(i).After(t) + }) + if i == 0 || it.acc.err() != nil { + return ZeroSamplePair, it.acc.err() + } + it.pos = i - 1 + it.lastValue = model.SamplePair{ + Timestamp: it.acc.timestampAtIndex(i - 1), + Value: it.acc.sampleValueAtIndex(i - 1), + } + return it.lastValue, it.acc.err() +} + +// rangeValues implements chunkIterator. +func (it *indexAccessingChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) { + oldest := sort.Search(it.len, func(i int) bool { + return !it.acc.timestampAtIndex(i).Before(in.OldestInclusive) + }) + newest := sort.Search(it.len, func(i int) bool { + return it.acc.timestampAtIndex(i).After(in.NewestInclusive) + }) + if oldest == it.len || it.acc.err() != nil { + return nil, it.acc.err() + } + + result := make([]model.SamplePair, 0, newest-oldest) + for i := oldest; i < newest; i++ { + it.pos = i + it.lastValue = model.SamplePair{ + Timestamp: it.acc.timestampAtIndex(i), + Value: it.acc.sampleValueAtIndex(i), + } + result = append(result, it.lastValue) + } + return result, it.acc.err() +} + +// contains implements chunkIterator. +func (it *indexAccessingChunkIterator) contains(t model.Time) (bool, error) { + return !t.Before(it.acc.timestampAtIndex(0)) && + !t.After(it.acc.timestampAtIndex(it.len-1)), it.acc.err() +} + +// scan implements chunkIterator. +func (it *indexAccessingChunkIterator) scan() bool { + it.pos++ + if it.pos >= it.len { + return false + } + it.lastValue = model.SamplePair{ + Timestamp: it.acc.timestampAtIndex(it.pos), + Value: it.acc.sampleValueAtIndex(it.pos), + } + return it.acc.err() == nil +} + +// value implements chunkIterator. +func (it *indexAccessingChunkIterator) value() model.SamplePair { + return it.lastValue +} + +// err implements chunkIterator. +func (it *indexAccessingChunkIterator) err() error { + return it.acc.err() +} diff --git a/storage/local/delta.go b/storage/local/delta.go index 3dd41cf775..a74e0806a3 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -18,11 +18,8 @@ import ( "fmt" "io" "math" - "sort" "github.com/prometheus/common/model" - - "github.com/prometheus/prometheus/storage/metric" ) // The 21-byte header of a delta-encoded chunk looks like: @@ -201,17 +198,14 @@ func (c deltaEncodedChunk) firstTime() model.Time { // newIterator implements chunk. func (c *deltaEncodedChunk) newIterator() chunkIterator { - return &deltaEncodedChunkIterator{ - c: *c, - len: c.len(), - baseT: c.baseTime(), - baseV: c.baseValue(), - tBytes: c.timeBytes(), - vBytes: c.valueBytes(), - isInt: c.isInt(), - pos: -1, - lastValue: ZeroSamplePair, - } + return newIndexAccessingChunkIterator(c.len(), &deltaEncodedIndexAccessor{ + c: *c, + baseT: c.baseTime(), + baseV: c.baseValue(), + tBytes: c.timeBytes(), + vBytes: c.valueBytes(), + isInt: c.isInt(), + }) } // marshal implements chunk. @@ -305,137 +299,65 @@ func (c deltaEncodedChunk) len() int { return (len(c) - deltaHeaderBytes) / c.sampleSize() } -// deltaEncodedChunkIterator implements chunkIterator. -type deltaEncodedChunkIterator struct { +// deltaEncodedIndexAccessor implements indexAccessor. +type deltaEncodedIndexAccessor struct { c deltaEncodedChunk - len int baseT model.Time baseV model.SampleValue tBytes, vBytes deltaBytes isInt bool - pos int - lastValue model.SamplePair lastErr error } -// lastTimestamp implements chunkIterator. -func (it *deltaEncodedChunkIterator) lastTimestamp() (model.Time, error) { - return it.timestampAtIndex(it.len - 1), it.lastErr +func (acc *deltaEncodedIndexAccessor) err() error { + return acc.lastErr } -// valueAtOrBeforeTime implements chunkIterator. -func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) { - i := sort.Search(it.len, func(i int) bool { - return it.timestampAtIndex(i).After(t) - }) - if i == 0 || it.lastErr != nil { - return ZeroSamplePair, it.lastErr - } - it.pos = i - 1 - it.lastValue = model.SamplePair{ - Timestamp: it.timestampAtIndex(i - 1), - Value: it.sampleValueAtIndex(i - 1), - } - return it.lastValue, it.lastErr -} +func (acc *deltaEncodedIndexAccessor) timestampAtIndex(idx int) model.Time { + offset := deltaHeaderBytes + idx*int(acc.tBytes+acc.vBytes) -// rangeValues implements chunkIterator. -func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) { - oldest := sort.Search(it.len, func(i int) bool { - return !it.timestampAtIndex(i).Before(in.OldestInclusive) - }) - newest := sort.Search(it.len, func(i int) bool { - return it.timestampAtIndex(i).After(in.NewestInclusive) - }) - if oldest == it.len || it.lastErr != nil { - return nil, it.lastErr - } - - result := make([]model.SamplePair, 0, newest-oldest) - for i := oldest; i < newest; i++ { - it.pos = i - it.lastValue = model.SamplePair{ - Timestamp: it.timestampAtIndex(i), - Value: it.sampleValueAtIndex(i), - } - result = append(result, it.lastValue) - } - return result, it.lastErr -} - -// contains implements chunkIterator. -func (it *deltaEncodedChunkIterator) contains(t model.Time) (bool, error) { - return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1)), it.lastErr -} - -// scan implements chunkIterator. -func (it *deltaEncodedChunkIterator) scan() bool { - it.pos++ - if it.pos >= it.len { - return false - } - it.lastValue = model.SamplePair{ - Timestamp: it.timestampAtIndex(it.pos), - Value: it.sampleValueAtIndex(it.pos), - } - return it.lastErr == nil -} - -// value implements chunkIterator. -func (it *deltaEncodedChunkIterator) value() model.SamplePair { - return it.lastValue -} - -// err implements chunkIterator. -func (it *deltaEncodedChunkIterator) err() error { - return it.lastErr -} - -func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time { - offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) - - switch it.tBytes { + switch acc.tBytes { case d1: - return it.baseT + model.Time(uint8(it.c[offset])) + return acc.baseT + model.Time(uint8(acc.c[offset])) case d2: - return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:])) + return acc.baseT + model.Time(binary.LittleEndian.Uint16(acc.c[offset:])) case d4: - return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:])) + return acc.baseT + model.Time(binary.LittleEndian.Uint32(acc.c[offset:])) case d8: // Take absolute value for d8. - return model.Time(binary.LittleEndian.Uint64(it.c[offset:])) + return model.Time(binary.LittleEndian.Uint64(acc.c[offset:])) default: - it.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes) + acc.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", acc.tBytes) } return model.Earliest } -func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue { - offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + int(it.tBytes) +func (acc *deltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleValue { + offset := deltaHeaderBytes + idx*int(acc.tBytes+acc.vBytes) + int(acc.tBytes) - if it.isInt { - switch it.vBytes { + if acc.isInt { + switch acc.vBytes { case d0: - return it.baseV + return acc.baseV case d1: - return it.baseV + model.SampleValue(int8(it.c[offset])) + return acc.baseV + model.SampleValue(int8(acc.c[offset])) case d2: - return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))) + return acc.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(acc.c[offset:]))) case d4: - return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))) + return acc.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(acc.c[offset:]))) // No d8 for ints. default: - it.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes) + acc.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", acc.vBytes) } } else { - switch it.vBytes { + switch acc.vBytes { case d4: - return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))) + return acc.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(acc.c[offset:]))) case d8: // Take absolute value for d8. - return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))) + return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(acc.c[offset:]))) default: - it.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes) + acc.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", acc.vBytes) } } return 0 diff --git a/storage/local/doubledelta.go b/storage/local/doubledelta.go index 84c8d38fc2..a53d41f6bc 100644 --- a/storage/local/doubledelta.go +++ b/storage/local/doubledelta.go @@ -18,11 +18,8 @@ import ( "fmt" "io" "math" - "sort" "github.com/prometheus/common/model" - - "github.com/prometheus/prometheus/storage/metric" ) // The 37-byte header of a delta-encoded chunk looks like: @@ -207,19 +204,16 @@ func (c doubleDeltaEncodedChunk) firstTime() model.Time { // newIterator implements chunk. func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator { - return &doubleDeltaEncodedChunkIterator{ - c: *c, - len: c.len(), - baseT: c.baseTime(), - baseΔT: c.baseTimeDelta(), - baseV: c.baseValue(), - baseΔV: c.baseValueDelta(), - tBytes: c.timeBytes(), - vBytes: c.valueBytes(), - isInt: c.isInt(), - pos: -1, - lastValue: ZeroSamplePair, - } + return newIndexAccessingChunkIterator(c.len(), &doubleDeltaEncodedIndexAccessor{ + c: *c, + baseT: c.baseTime(), + baseΔT: c.baseTimeDelta(), + baseV: c.baseValue(), + baseΔV: c.baseValueDelta(), + tBytes: c.timeBytes(), + vBytes: c.valueBytes(), + isInt: c.isInt(), + }) } // marshal implements chunk. @@ -411,176 +405,104 @@ func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb delt return []chunk{&c}, nil } -// doubleDeltaEncodedChunkIterator implements chunkIterator. -type doubleDeltaEncodedChunkIterator struct { +// doubleDeltaEncodedIndexAccessor implements indexAccessor. +type doubleDeltaEncodedIndexAccessor struct { c doubleDeltaEncodedChunk - len int baseT, baseΔT model.Time baseV, baseΔV model.SampleValue tBytes, vBytes deltaBytes isInt bool - pos int - lastValue model.SamplePair lastErr error } -// lastTimestamp implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() (model.Time, error) { - return it.timestampAtIndex(it.len - 1), it.lastErr +func (acc *doubleDeltaEncodedIndexAccessor) err() error { + return acc.lastErr } -// valueAtOrBeforeTime implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) { - i := sort.Search(it.len, func(i int) bool { - return it.timestampAtIndex(i).After(t) - }) - if i == 0 || it.lastErr != nil { - return ZeroSamplePair, it.lastErr - } - it.pos = i - 1 - it.lastValue = model.SamplePair{ - Timestamp: it.timestampAtIndex(i - 1), - Value: it.sampleValueAtIndex(i - 1), - } - return it.lastValue, it.lastErr -} - -// rangeValues implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) { - oldest := sort.Search(it.len, func(i int) bool { - return !it.timestampAtIndex(i).Before(in.OldestInclusive) - }) - newest := sort.Search(it.len, func(i int) bool { - return it.timestampAtIndex(i).After(in.NewestInclusive) - }) - if oldest == it.len || it.lastErr != nil { - return nil, it.lastErr - } - - result := make([]model.SamplePair, 0, newest-oldest) - for i := oldest; i < newest; i++ { - it.pos = i - it.lastValue = model.SamplePair{ - Timestamp: it.timestampAtIndex(i), - Value: it.sampleValueAtIndex(i), - } - result = append(result, it.lastValue) - } - return result, it.lastErr -} - -// contains implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) contains(t model.Time) (bool, error) { - return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1)), it.lastErr -} - -// scan implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) scan() bool { - it.pos++ - if it.pos >= it.len { - return false - } - it.lastValue = model.SamplePair{ - Timestamp: it.timestampAtIndex(it.pos), - Value: it.sampleValueAtIndex(it.pos), - } - return it.lastErr == nil -} - -// value implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) value() model.SamplePair { - return it.lastValue -} - -// err implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) err() error { - return it.lastErr -} - -func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time { +func (acc *doubleDeltaEncodedIndexAccessor) timestampAtIndex(idx int) model.Time { if idx == 0 { - return it.baseT + return acc.baseT } if idx == 1 { // If time bytes are at d8, the time is saved directly rather // than as a difference. - if it.tBytes == d8 { - return it.baseΔT + if acc.tBytes == d8 { + return acc.baseΔT } - return it.baseT + it.baseΔT + return acc.baseT + acc.baseΔT } - offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + offset := doubleDeltaHeaderBytes + (idx-2)*int(acc.tBytes+acc.vBytes) - switch it.tBytes { + switch acc.tBytes { case d1: - return it.baseT + - model.Time(idx)*it.baseΔT + - model.Time(int8(it.c[offset])) + return acc.baseT + + model.Time(idx)*acc.baseΔT + + model.Time(int8(acc.c[offset])) case d2: - return it.baseT + - model.Time(idx)*it.baseΔT + - model.Time(int16(binary.LittleEndian.Uint16(it.c[offset:]))) + return acc.baseT + + model.Time(idx)*acc.baseΔT + + model.Time(int16(binary.LittleEndian.Uint16(acc.c[offset:]))) case d4: - return it.baseT + - model.Time(idx)*it.baseΔT + - model.Time(int32(binary.LittleEndian.Uint32(it.c[offset:]))) + return acc.baseT + + model.Time(idx)*acc.baseΔT + + model.Time(int32(binary.LittleEndian.Uint32(acc.c[offset:]))) case d8: // Take absolute value for d8. - return model.Time(binary.LittleEndian.Uint64(it.c[offset:])) + return model.Time(binary.LittleEndian.Uint64(acc.c[offset:])) default: - it.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes) + acc.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", acc.tBytes) } return model.Earliest } -func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue { +func (acc *doubleDeltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleValue { if idx == 0 { - return it.baseV + return acc.baseV } if idx == 1 { // If value bytes are at d8, the value is saved directly rather // than as a difference. - if it.vBytes == d8 { - return it.baseΔV + if acc.vBytes == d8 { + return acc.baseΔV } - return it.baseV + it.baseΔV + return acc.baseV + acc.baseΔV } - offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + int(it.tBytes) + offset := doubleDeltaHeaderBytes + (idx-2)*int(acc.tBytes+acc.vBytes) + int(acc.tBytes) - if it.isInt { - switch it.vBytes { + if acc.isInt { + switch acc.vBytes { case d0: - return it.baseV + - model.SampleValue(idx)*it.baseΔV + return acc.baseV + + model.SampleValue(idx)*acc.baseΔV case d1: - return it.baseV + - model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int8(it.c[offset])) + return acc.baseV + + model.SampleValue(idx)*acc.baseΔV + + model.SampleValue(int8(acc.c[offset])) case d2: - return it.baseV + - model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))) + return acc.baseV + + model.SampleValue(idx)*acc.baseΔV + + model.SampleValue(int16(binary.LittleEndian.Uint16(acc.c[offset:]))) case d4: - return it.baseV + - model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))) + return acc.baseV + + model.SampleValue(idx)*acc.baseΔV + + model.SampleValue(int32(binary.LittleEndian.Uint32(acc.c[offset:]))) // No d8 for ints. default: - it.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes) + acc.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", acc.vBytes) } } else { - switch it.vBytes { + switch acc.vBytes { case d4: - return it.baseV + - model.SampleValue(idx)*it.baseΔV + - model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))) + return acc.baseV + + model.SampleValue(idx)*acc.baseΔV + + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(acc.c[offset:]))) case d8: // Take absolute value for d8. - return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))) + return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(acc.c[offset:]))) default: - it.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes) + acc.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", acc.vBytes) } } return 0