diff --git a/storage/local/chunk.go b/storage/local/chunk.go index 582789fb7e..b843fa3a71 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -17,6 +17,7 @@ import ( "container/list" "fmt" "io" + "sort" "sync" "sync/atomic" @@ -342,3 +343,102 @@ func newChunkForEncoding(encoding chunkEncoding) (chunk, error) { return nil, fmt.Errorf("unknown chunk encoding: %v", encoding) } } + +// indexAccessor allows accesses to samples by index. +type indexAccessor interface { + timestampAtIndex(int) model.Time + sampleValueAtIndex(int) model.SampleValue + err() error +} + +// indexAccessingChunkIterator is a chunk iterator for chunks for which an +// indexAccessor implementation exists. +type indexAccessingChunkIterator struct { + len int + pos int + lastValue model.SamplePair + acc indexAccessor +} + +func newIndexAccessingChunkIterator(len int, acc indexAccessor) *indexAccessingChunkIterator { + return &indexAccessingChunkIterator{ + len: len, + pos: -1, + lastValue: ZeroSamplePair, + acc: acc, + } +} + +// lastTimestamp implements chunkIterator. +func (it *indexAccessingChunkIterator) lastTimestamp() (model.Time, error) { + return it.acc.timestampAtIndex(it.len - 1), it.acc.err() +} + +// valueAtOrBeforeTime implements chunkIterator. +func (it *indexAccessingChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) { + i := sort.Search(it.len, func(i int) bool { + return it.acc.timestampAtIndex(i).After(t) + }) + if i == 0 || it.acc.err() != nil { + return ZeroSamplePair, it.acc.err() + } + it.pos = i - 1 + it.lastValue = model.SamplePair{ + Timestamp: it.acc.timestampAtIndex(i - 1), + Value: it.acc.sampleValueAtIndex(i - 1), + } + return it.lastValue, it.acc.err() +} + +// rangeValues implements chunkIterator. +func (it *indexAccessingChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) { + oldest := sort.Search(it.len, func(i int) bool { + return !it.acc.timestampAtIndex(i).Before(in.OldestInclusive) + }) + newest := sort.Search(it.len, func(i int) bool { + return it.acc.timestampAtIndex(i).After(in.NewestInclusive) + }) + if oldest == it.len || it.acc.err() != nil { + return nil, it.acc.err() + } + + result := make([]model.SamplePair, 0, newest-oldest) + for i := oldest; i < newest; i++ { + it.pos = i + it.lastValue = model.SamplePair{ + Timestamp: it.acc.timestampAtIndex(i), + Value: it.acc.sampleValueAtIndex(i), + } + result = append(result, it.lastValue) + } + return result, it.acc.err() +} + +// contains implements chunkIterator. +func (it *indexAccessingChunkIterator) contains(t model.Time) (bool, error) { + return !t.Before(it.acc.timestampAtIndex(0)) && + !t.After(it.acc.timestampAtIndex(it.len-1)), it.acc.err() +} + +// scan implements chunkIterator. +func (it *indexAccessingChunkIterator) scan() bool { + it.pos++ + if it.pos >= it.len { + return false + } + it.lastValue = model.SamplePair{ + Timestamp: it.acc.timestampAtIndex(it.pos), + Value: it.acc.sampleValueAtIndex(it.pos), + } + return it.acc.err() == nil +} + +// value implements chunkIterator. +func (it *indexAccessingChunkIterator) value() model.SamplePair { + return it.lastValue +} + +// err implements chunkIterator. +func (it *indexAccessingChunkIterator) err() error { + return it.acc.err() +} diff --git a/storage/local/delta.go b/storage/local/delta.go index 3dd41cf775..a74e0806a3 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -18,11 +18,8 @@ import ( "fmt" "io" "math" - "sort" "github.com/prometheus/common/model" - - "github.com/prometheus/prometheus/storage/metric" ) // The 21-byte header of a delta-encoded chunk looks like: @@ -201,17 +198,14 @@ func (c deltaEncodedChunk) firstTime() model.Time { // newIterator implements chunk. func (c *deltaEncodedChunk) newIterator() chunkIterator { - return &deltaEncodedChunkIterator{ - c: *c, - len: c.len(), - baseT: c.baseTime(), - baseV: c.baseValue(), - tBytes: c.timeBytes(), - vBytes: c.valueBytes(), - isInt: c.isInt(), - pos: -1, - lastValue: ZeroSamplePair, - } + return newIndexAccessingChunkIterator(c.len(), &deltaEncodedIndexAccessor{ + c: *c, + baseT: c.baseTime(), + baseV: c.baseValue(), + tBytes: c.timeBytes(), + vBytes: c.valueBytes(), + isInt: c.isInt(), + }) } // marshal implements chunk. @@ -305,137 +299,65 @@ func (c deltaEncodedChunk) len() int { return (len(c) - deltaHeaderBytes) / c.sampleSize() } -// deltaEncodedChunkIterator implements chunkIterator. -type deltaEncodedChunkIterator struct { +// deltaEncodedIndexAccessor implements indexAccessor. +type deltaEncodedIndexAccessor struct { c deltaEncodedChunk - len int baseT model.Time baseV model.SampleValue tBytes, vBytes deltaBytes isInt bool - pos int - lastValue model.SamplePair lastErr error } -// lastTimestamp implements chunkIterator. -func (it *deltaEncodedChunkIterator) lastTimestamp() (model.Time, error) { - return it.timestampAtIndex(it.len - 1), it.lastErr +func (acc *deltaEncodedIndexAccessor) err() error { + return acc.lastErr } -// valueAtOrBeforeTime implements chunkIterator. -func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) { - i := sort.Search(it.len, func(i int) bool { - return it.timestampAtIndex(i).After(t) - }) - if i == 0 || it.lastErr != nil { - return ZeroSamplePair, it.lastErr - } - it.pos = i - 1 - it.lastValue = model.SamplePair{ - Timestamp: it.timestampAtIndex(i - 1), - Value: it.sampleValueAtIndex(i - 1), - } - return it.lastValue, it.lastErr -} +func (acc *deltaEncodedIndexAccessor) timestampAtIndex(idx int) model.Time { + offset := deltaHeaderBytes + idx*int(acc.tBytes+acc.vBytes) -// rangeValues implements chunkIterator. -func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) { - oldest := sort.Search(it.len, func(i int) bool { - return !it.timestampAtIndex(i).Before(in.OldestInclusive) - }) - newest := sort.Search(it.len, func(i int) bool { - return it.timestampAtIndex(i).After(in.NewestInclusive) - }) - if oldest == it.len || it.lastErr != nil { - return nil, it.lastErr - } - - result := make([]model.SamplePair, 0, newest-oldest) - for i := oldest; i < newest; i++ { - it.pos = i - it.lastValue = model.SamplePair{ - Timestamp: it.timestampAtIndex(i), - Value: it.sampleValueAtIndex(i), - } - result = append(result, it.lastValue) - } - return result, it.lastErr -} - -// contains implements chunkIterator. -func (it *deltaEncodedChunkIterator) contains(t model.Time) (bool, error) { - return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1)), it.lastErr -} - -// scan implements chunkIterator. -func (it *deltaEncodedChunkIterator) scan() bool { - it.pos++ - if it.pos >= it.len { - return false - } - it.lastValue = model.SamplePair{ - Timestamp: it.timestampAtIndex(it.pos), - Value: it.sampleValueAtIndex(it.pos), - } - return it.lastErr == nil -} - -// value implements chunkIterator. -func (it *deltaEncodedChunkIterator) value() model.SamplePair { - return it.lastValue -} - -// err implements chunkIterator. -func (it *deltaEncodedChunkIterator) err() error { - return it.lastErr -} - -func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time { - offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) - - switch it.tBytes { + switch acc.tBytes { case d1: - return it.baseT + model.Time(uint8(it.c[offset])) + return acc.baseT + model.Time(uint8(acc.c[offset])) case d2: - return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:])) + return acc.baseT + model.Time(binary.LittleEndian.Uint16(acc.c[offset:])) case d4: - return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:])) + return acc.baseT + model.Time(binary.LittleEndian.Uint32(acc.c[offset:])) case d8: // Take absolute value for d8. - return model.Time(binary.LittleEndian.Uint64(it.c[offset:])) + return model.Time(binary.LittleEndian.Uint64(acc.c[offset:])) default: - it.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes) + acc.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", acc.tBytes) } return model.Earliest } -func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue { - offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + int(it.tBytes) +func (acc *deltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleValue { + offset := deltaHeaderBytes + idx*int(acc.tBytes+acc.vBytes) + int(acc.tBytes) - if it.isInt { - switch it.vBytes { + if acc.isInt { + switch acc.vBytes { case d0: - return it.baseV + return acc.baseV case d1: - return it.baseV + model.SampleValue(int8(it.c[offset])) + return acc.baseV + model.SampleValue(int8(acc.c[offset])) case d2: - return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))) + return acc.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(acc.c[offset:]))) case d4: - return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))) + return acc.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(acc.c[offset:]))) // No d8 for ints. default: - it.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes) + acc.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", acc.vBytes) } } else { - switch it.vBytes { + switch acc.vBytes { case d4: - return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))) + return acc.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(acc.c[offset:]))) case d8: // Take absolute value for d8. - return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))) + return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(acc.c[offset:]))) default: - it.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes) + acc.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", acc.vBytes) } } return 0 diff --git a/storage/local/doubledelta.go b/storage/local/doubledelta.go index 84c8d38fc2..a53d41f6bc 100644 --- a/storage/local/doubledelta.go +++ b/storage/local/doubledelta.go @@ -18,11 +18,8 @@ import ( "fmt" "io" "math" - "sort" "github.com/prometheus/common/model" - - "github.com/prometheus/prometheus/storage/metric" ) // The 37-byte header of a delta-encoded chunk looks like: @@ -207,19 +204,16 @@ func (c doubleDeltaEncodedChunk) firstTime() model.Time { // newIterator implements chunk. func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator { - return &doubleDeltaEncodedChunkIterator{ - c: *c, - len: c.len(), - baseT: c.baseTime(), - baseΔT: c.baseTimeDelta(), - baseV: c.baseValue(), - baseΔV: c.baseValueDelta(), - tBytes: c.timeBytes(), - vBytes: c.valueBytes(), - isInt: c.isInt(), - pos: -1, - lastValue: ZeroSamplePair, - } + return newIndexAccessingChunkIterator(c.len(), &doubleDeltaEncodedIndexAccessor{ + c: *c, + baseT: c.baseTime(), + baseΔT: c.baseTimeDelta(), + baseV: c.baseValue(), + baseΔV: c.baseValueDelta(), + tBytes: c.timeBytes(), + vBytes: c.valueBytes(), + isInt: c.isInt(), + }) } // marshal implements chunk. @@ -411,176 +405,104 @@ func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb delt return []chunk{&c}, nil } -// doubleDeltaEncodedChunkIterator implements chunkIterator. -type doubleDeltaEncodedChunkIterator struct { +// doubleDeltaEncodedIndexAccessor implements indexAccessor. +type doubleDeltaEncodedIndexAccessor struct { c doubleDeltaEncodedChunk - len int baseT, baseΔT model.Time baseV, baseΔV model.SampleValue tBytes, vBytes deltaBytes isInt bool - pos int - lastValue model.SamplePair lastErr error } -// lastTimestamp implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() (model.Time, error) { - return it.timestampAtIndex(it.len - 1), it.lastErr +func (acc *doubleDeltaEncodedIndexAccessor) err() error { + return acc.lastErr } -// valueAtOrBeforeTime implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) { - i := sort.Search(it.len, func(i int) bool { - return it.timestampAtIndex(i).After(t) - }) - if i == 0 || it.lastErr != nil { - return ZeroSamplePair, it.lastErr - } - it.pos = i - 1 - it.lastValue = model.SamplePair{ - Timestamp: it.timestampAtIndex(i - 1), - Value: it.sampleValueAtIndex(i - 1), - } - return it.lastValue, it.lastErr -} - -// rangeValues implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) { - oldest := sort.Search(it.len, func(i int) bool { - return !it.timestampAtIndex(i).Before(in.OldestInclusive) - }) - newest := sort.Search(it.len, func(i int) bool { - return it.timestampAtIndex(i).After(in.NewestInclusive) - }) - if oldest == it.len || it.lastErr != nil { - return nil, it.lastErr - } - - result := make([]model.SamplePair, 0, newest-oldest) - for i := oldest; i < newest; i++ { - it.pos = i - it.lastValue = model.SamplePair{ - Timestamp: it.timestampAtIndex(i), - Value: it.sampleValueAtIndex(i), - } - result = append(result, it.lastValue) - } - return result, it.lastErr -} - -// contains implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) contains(t model.Time) (bool, error) { - return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1)), it.lastErr -} - -// scan implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) scan() bool { - it.pos++ - if it.pos >= it.len { - return false - } - it.lastValue = model.SamplePair{ - Timestamp: it.timestampAtIndex(it.pos), - Value: it.sampleValueAtIndex(it.pos), - } - return it.lastErr == nil -} - -// value implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) value() model.SamplePair { - return it.lastValue -} - -// err implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) err() error { - return it.lastErr -} - -func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time { +func (acc *doubleDeltaEncodedIndexAccessor) timestampAtIndex(idx int) model.Time { if idx == 0 { - return it.baseT + return acc.baseT } if idx == 1 { // If time bytes are at d8, the time is saved directly rather // than as a difference. - if it.tBytes == d8 { - return it.baseΔT + if acc.tBytes == d8 { + return acc.baseΔT } - return it.baseT + it.baseΔT + return acc.baseT + acc.baseΔT } - offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + offset := doubleDeltaHeaderBytes + (idx-2)*int(acc.tBytes+acc.vBytes) - switch it.tBytes { + switch acc.tBytes { case d1: - return it.baseT + - model.Time(idx)*it.baseΔT + - model.Time(int8(it.c[offset])) + return acc.baseT + + model.Time(idx)*acc.baseΔT + + model.Time(int8(acc.c[offset])) case d2: - return it.baseT + - model.Time(idx)*it.baseΔT + - model.Time(int16(binary.LittleEndian.Uint16(it.c[offset:]))) + return acc.baseT + + model.Time(idx)*acc.baseΔT + + model.Time(int16(binary.LittleEndian.Uint16(acc.c[offset:]))) case d4: - return it.baseT + - model.Time(idx)*it.baseΔT + - model.Time(int32(binary.LittleEndian.Uint32(it.c[offset:]))) + return acc.baseT + + model.Time(idx)*acc.baseΔT + + model.Time(int32(binary.LittleEndian.Uint32(acc.c[offset:]))) case d8: // Take absolute value for d8. - return model.Time(binary.LittleEndian.Uint64(it.c[offset:])) + return model.Time(binary.LittleEndian.Uint64(acc.c[offset:])) default: - it.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes) + acc.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", acc.tBytes) } return model.Earliest } -func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue { +func (acc *doubleDeltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleValue { if idx == 0 { - return it.baseV + return acc.baseV } if idx == 1 { // If value bytes are at d8, the value is saved directly rather // than as a difference. - if it.vBytes == d8 { - return it.baseΔV + if acc.vBytes == d8 { + return acc.baseΔV } - return it.baseV + it.baseΔV + return acc.baseV + acc.baseΔV } - offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + int(it.tBytes) + offset := doubleDeltaHeaderBytes + (idx-2)*int(acc.tBytes+acc.vBytes) + int(acc.tBytes) - if it.isInt { - switch it.vBytes { + if acc.isInt { + switch acc.vBytes { case d0: - return it.baseV + - model.SampleValue(idx)*it.baseΔV + return acc.baseV + + model.SampleValue(idx)*acc.baseΔV case d1: - return it.baseV + - model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int8(it.c[offset])) + return acc.baseV + + model.SampleValue(idx)*acc.baseΔV + + model.SampleValue(int8(acc.c[offset])) case d2: - return it.baseV + - model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))) + return acc.baseV + + model.SampleValue(idx)*acc.baseΔV + + model.SampleValue(int16(binary.LittleEndian.Uint16(acc.c[offset:]))) case d4: - return it.baseV + - model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))) + return acc.baseV + + model.SampleValue(idx)*acc.baseΔV + + model.SampleValue(int32(binary.LittleEndian.Uint32(acc.c[offset:]))) // No d8 for ints. default: - it.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes) + acc.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", acc.vBytes) } } else { - switch it.vBytes { + switch acc.vBytes { case d4: - return it.baseV + - model.SampleValue(idx)*it.baseΔV + - model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))) + return acc.baseV + + model.SampleValue(idx)*acc.baseΔV + + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(acc.c[offset:]))) case d8: // Take absolute value for d8. - return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))) + return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(acc.c[offset:]))) default: - it.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes) + acc.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", acc.vBytes) } } return 0