diff --git a/storage/local/chunk.go b/storage/local/chunk.go index d8b1ae2b8d..582789fb7e 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -259,18 +259,13 @@ type chunk interface { // A chunkIterator enables efficient access to the content of a chunk. It is // generally not safe to use a chunkIterator concurrently with or after chunk -// mutation. +// mutation. The error returned by any of the methods is always the last error +// encountered by the iterator, i.e. once an error has been encountered, no +// method will ever return a nil error again. In general, errors signal data +// corruption in the chunk and require quarantining. type chunkIterator interface { - // length returns the number of samples in the chunk. - length() int - // Gets the timestamp of the n-th sample in the chunk. - timestampAtIndex(int) (model.Time, error) // Gets the last timestamp in the chunk. lastTimestamp() (model.Time, error) - // Gets the sample value of the n-th sample in the chunk. - sampleValueAtIndex(int) (model.SampleValue, error) - // Gets the last sample value in the chunk. - lastSampleValue() (model.SampleValue, error) // Gets the value that is closest before the given time. In case a value // exists at precisely the given time, that value is returned. If no // applicable value exists, ZeroSamplePair is returned. @@ -280,35 +275,48 @@ type chunkIterator interface { // Whether a given timestamp is contained between first and last value // in the chunk. contains(model.Time) (bool, error) - // values returns a channel, from which all sample values in the chunk - // can be received in order. The channel is closed after the last - // one. It is generally not safe to mutate the chunk while the channel - // is still open. If a value is returned with error!=nil, no further - // values will be returned and the channel is closed. - values() <-chan struct { - model.SamplePair - error - } + // scan, value, and err implement a bufio.Scanner-like interface. The + // scan method advances the iterator to the next value in the chunk and + // returns true if that worked. In that case, the value method will + // return the sample pair the iterator has advanced to. If the scan + // method returns false, either an error has occured or the end of the + // chunk has been reached. In the former case, the err method will + // return the error. In the latter case, the err method will return nil. + // Upon creation, the iterator is at position "minus one". After the + // first scan call, value will return the 1st value in the + // chunk. valueAtOrBeforeTime and rangeValues all modify the iterator + // position, too. They behave as if their return values were retrieved + // after a scan call, i.e. calling the value or err methods after a call + // to those methods will retrieve the same return value again (or the + // last value in the range in case of rangeValues), and subsequent scan + // calls will advance the iterator from there. + scan() bool + value() model.SamplePair + err() error } func transcodeAndAdd(dst chunk, src chunk, s model.SamplePair) ([]chunk, error) { chunkOps.WithLabelValues(transcode).Inc() - head := dst - body := []chunk{} - for v := range src.newIterator().values() { - if v.error != nil { - return nil, v.error - } - newChunks, err := head.add(v.SamplePair) - if err != nil { + var ( + head = dst + body, newChunks []chunk + err error + ) + + it := src.newIterator() + for it.scan() { + if newChunks, err = head.add(it.value()); err != nil { return nil, err } body = append(body, newChunks[:len(newChunks)-1]...) head = newChunks[len(newChunks)-1] } - newChunks, err := head.add(s) - if err != nil { + if it.err() != nil { + return nil, it.err() + } + + if newChunks, err = head.add(s); err != nil { return nil, err } return append(body, newChunks...), nil diff --git a/storage/local/delta.go b/storage/local/delta.go index c787020722..3dd41cf775 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -202,13 +202,15 @@ func (c deltaEncodedChunk) firstTime() model.Time { // newIterator implements chunk. func (c *deltaEncodedChunk) newIterator() chunkIterator { return &deltaEncodedChunkIterator{ - c: *c, - len: c.len(), - baseT: c.baseTime(), - baseV: c.baseValue(), - tBytes: c.timeBytes(), - vBytes: c.valueBytes(), - isInt: c.isInt(), + c: *c, + len: c.len(), + baseT: c.baseTime(), + baseV: c.baseValue(), + tBytes: c.timeBytes(), + vBytes: c.valueBytes(), + isInt: c.isInt(), + pos: -1, + lastValue: ZeroSamplePair, } } @@ -311,176 +313,130 @@ type deltaEncodedChunkIterator struct { baseV model.SampleValue tBytes, vBytes deltaBytes isInt bool -} - -// length implements chunkIterator. -func (it *deltaEncodedChunkIterator) length() int { return it.len } - -// valueAtOrBeforeTime implements chunkIterator. -func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) { - var lastErr error - i := sort.Search(it.len, func(i int) bool { - ts, err := it.timestampAtIndex(i) - if err != nil { - lastErr = err - } - return ts.After(t) - }) - if i == 0 || lastErr != nil { - return ZeroSamplePair, lastErr - } - ts, err := it.timestampAtIndex(i - 1) - if err != nil { - return ZeroSamplePair, err - } - v, err := it.sampleValueAtIndex(i - 1) - if err != nil { - return ZeroSamplePair, err - } - return model.SamplePair{Timestamp: ts, Value: v}, nil -} - -// rangeValues implements chunkIterator. -func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) { - var lastErr error - - oldest := sort.Search(it.len, func(i int) bool { - t, err := it.timestampAtIndex(i) - if err != nil { - lastErr = err - } - return !t.Before(in.OldestInclusive) - }) - - newest := sort.Search(it.len, func(i int) bool { - t, err := it.timestampAtIndex(i) - if err != nil { - lastErr = err - } - return t.After(in.NewestInclusive) - }) - - if oldest == it.len || lastErr != nil { - return nil, lastErr - } - - result := make([]model.SamplePair, 0, newest-oldest) - for i := oldest; i < newest; i++ { - t, err := it.timestampAtIndex(i) - if err != nil { - return nil, err - } - v, err := it.sampleValueAtIndex(i) - if err != nil { - return nil, err - } - result = append(result, model.SamplePair{Timestamp: t, Value: v}) - } - return result, nil -} - -// contains implements chunkIterator. -func (it *deltaEncodedChunkIterator) contains(t model.Time) (bool, error) { - lastT, err := it.timestampAtIndex(it.len - 1) - if err != nil { - return false, err - } - return !t.Before(it.baseT) && !t.After(lastT), nil -} - -// values implements chunkIterator. -func (it *deltaEncodedChunkIterator) values() <-chan struct { - model.SamplePair - error -} { - valuesChan := make(chan struct { - model.SamplePair - error - }) - go func() { - for i := 0; i < it.len; i++ { - t, err := it.timestampAtIndex(i) - if err != nil { - valuesChan <- struct { - model.SamplePair - error - }{ZeroSamplePair, err} - break - } - v, err := it.sampleValueAtIndex(i) - if err != nil { - valuesChan <- struct { - model.SamplePair - error - }{ZeroSamplePair, err} - break - } - valuesChan <- struct { - model.SamplePair - error - }{model.SamplePair{Timestamp: t, Value: v}, nil} - } - close(valuesChan) - }() - return valuesChan -} - -// timestampAtIndex implements chunkIterator. -func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time, error) { - offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) - - switch it.tBytes { - case d1: - return it.baseT + model.Time(uint8(it.c[offset])), nil - case d2: - return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:])), nil - case d4: - return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:])), nil - case d8: - // Take absolute value for d8. - return model.Time(binary.LittleEndian.Uint64(it.c[offset:])), nil - default: - return 0, fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes) - } + pos int + lastValue model.SamplePair + lastErr error } // lastTimestamp implements chunkIterator. func (it *deltaEncodedChunkIterator) lastTimestamp() (model.Time, error) { - return it.timestampAtIndex(it.len - 1) + return it.timestampAtIndex(it.len - 1), it.lastErr } -// sampleValueAtIndex implements chunkIterator. -func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.SampleValue, error) { +// valueAtOrBeforeTime implements chunkIterator. +func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) { + i := sort.Search(it.len, func(i int) bool { + return it.timestampAtIndex(i).After(t) + }) + if i == 0 || it.lastErr != nil { + return ZeroSamplePair, it.lastErr + } + it.pos = i - 1 + it.lastValue = model.SamplePair{ + Timestamp: it.timestampAtIndex(i - 1), + Value: it.sampleValueAtIndex(i - 1), + } + return it.lastValue, it.lastErr +} + +// rangeValues implements chunkIterator. +func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) { + oldest := sort.Search(it.len, func(i int) bool { + return !it.timestampAtIndex(i).Before(in.OldestInclusive) + }) + newest := sort.Search(it.len, func(i int) bool { + return it.timestampAtIndex(i).After(in.NewestInclusive) + }) + if oldest == it.len || it.lastErr != nil { + return nil, it.lastErr + } + + result := make([]model.SamplePair, 0, newest-oldest) + for i := oldest; i < newest; i++ { + it.pos = i + it.lastValue = model.SamplePair{ + Timestamp: it.timestampAtIndex(i), + Value: it.sampleValueAtIndex(i), + } + result = append(result, it.lastValue) + } + return result, it.lastErr +} + +// contains implements chunkIterator. +func (it *deltaEncodedChunkIterator) contains(t model.Time) (bool, error) { + return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1)), it.lastErr +} + +// scan implements chunkIterator. +func (it *deltaEncodedChunkIterator) scan() bool { + it.pos++ + if it.pos >= it.len { + return false + } + it.lastValue = model.SamplePair{ + Timestamp: it.timestampAtIndex(it.pos), + Value: it.sampleValueAtIndex(it.pos), + } + return it.lastErr == nil +} + +// value implements chunkIterator. +func (it *deltaEncodedChunkIterator) value() model.SamplePair { + return it.lastValue +} + +// err implements chunkIterator. +func (it *deltaEncodedChunkIterator) err() error { + return it.lastErr +} + +func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time { + offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + + switch it.tBytes { + case d1: + return it.baseT + model.Time(uint8(it.c[offset])) + case d2: + return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:])) + case d4: + return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:])) + case d8: + // Take absolute value for d8. + return model.Time(binary.LittleEndian.Uint64(it.c[offset:])) + default: + it.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes) + } + return model.Earliest +} + +func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue { offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + int(it.tBytes) if it.isInt { switch it.vBytes { case d0: - return it.baseV, nil + return it.baseV case d1: - return it.baseV + model.SampleValue(int8(it.c[offset])), nil + return it.baseV + model.SampleValue(int8(it.c[offset])) case d2: - return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil + return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))) case d4: - return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil + return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))) // No d8 for ints. default: - return 0, fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes) + it.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes) } } else { switch it.vBytes { case d4: - return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))), nil + return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))) case d8: // Take absolute value for d8. - return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))), nil + return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))) default: - return 0, fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes) + it.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes) } } -} - -// lastSampleValue implements chunkIterator. -func (it *deltaEncodedChunkIterator) lastSampleValue() (model.SampleValue, error) { - return it.sampleValueAtIndex(it.len - 1) + return 0 } diff --git a/storage/local/doubledelta.go b/storage/local/doubledelta.go index 257c845443..84c8d38fc2 100644 --- a/storage/local/doubledelta.go +++ b/storage/local/doubledelta.go @@ -208,15 +208,17 @@ func (c doubleDeltaEncodedChunk) firstTime() model.Time { // newIterator implements chunk. func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator { return &doubleDeltaEncodedChunkIterator{ - c: *c, - len: c.len(), - baseT: c.baseTime(), - baseΔT: c.baseTimeDelta(), - baseV: c.baseValue(), - baseΔV: c.baseValueDelta(), - tBytes: c.timeBytes(), - vBytes: c.valueBytes(), - isInt: c.isInt(), + c: *c, + len: c.len(), + baseT: c.baseTime(), + baseΔT: c.baseTimeDelta(), + baseV: c.baseValue(), + baseΔV: c.baseValueDelta(), + tBytes: c.timeBytes(), + vBytes: c.valueBytes(), + isInt: c.isInt(), + pos: -1, + lastValue: ZeroSamplePair, } } @@ -417,132 +419,95 @@ type doubleDeltaEncodedChunkIterator struct { baseV, baseΔV model.SampleValue tBytes, vBytes deltaBytes isInt bool + pos int + lastValue model.SamplePair + lastErr error } -// length implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) length() int { return it.len } +// lastTimestamp implements chunkIterator. +func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() (model.Time, error) { + return it.timestampAtIndex(it.len - 1), it.lastErr +} // valueAtOrBeforeTime implements chunkIterator. func (it *doubleDeltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) { - var lastErr error i := sort.Search(it.len, func(i int) bool { - ts, err := it.timestampAtIndex(i) - if err != nil { - lastErr = err - } - return ts.After(t) + return it.timestampAtIndex(i).After(t) }) - if i == 0 || lastErr != nil { - return ZeroSamplePair, lastErr + if i == 0 || it.lastErr != nil { + return ZeroSamplePair, it.lastErr } - ts, err := it.timestampAtIndex(i - 1) - if err != nil { - return ZeroSamplePair, err + it.pos = i - 1 + it.lastValue = model.SamplePair{ + Timestamp: it.timestampAtIndex(i - 1), + Value: it.sampleValueAtIndex(i - 1), } - v, err := it.sampleValueAtIndex(i - 1) - if err != nil { - return ZeroSamplePair, err - } - return model.SamplePair{Timestamp: ts, Value: v}, nil + return it.lastValue, it.lastErr } // rangeValues implements chunkIterator. func (it *doubleDeltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) { - var lastErr error - oldest := sort.Search(it.len, func(i int) bool { - t, err := it.timestampAtIndex(i) - if err != nil { - lastErr = err - } - return !t.Before(in.OldestInclusive) + return !it.timestampAtIndex(i).Before(in.OldestInclusive) }) - newest := sort.Search(it.len, func(i int) bool { - t, err := it.timestampAtIndex(i) - if err != nil { - lastErr = err - } - return t.After(in.NewestInclusive) + return it.timestampAtIndex(i).After(in.NewestInclusive) }) - - if oldest == it.len || lastErr != nil { - return nil, lastErr + if oldest == it.len || it.lastErr != nil { + return nil, it.lastErr } result := make([]model.SamplePair, 0, newest-oldest) for i := oldest; i < newest; i++ { - t, err := it.timestampAtIndex(i) - if err != nil { - return nil, err + it.pos = i + it.lastValue = model.SamplePair{ + Timestamp: it.timestampAtIndex(i), + Value: it.sampleValueAtIndex(i), } - v, err := it.sampleValueAtIndex(i) - if err != nil { - return nil, err - } - result = append(result, model.SamplePair{Timestamp: t, Value: v}) + result = append(result, it.lastValue) } - return result, nil + return result, it.lastErr } // contains implements chunkIterator. func (it *doubleDeltaEncodedChunkIterator) contains(t model.Time) (bool, error) { - lastT, err := it.timestampAtIndex(it.len - 1) - if err != nil { - return false, err + return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1)), it.lastErr +} + +// scan implements chunkIterator. +func (it *doubleDeltaEncodedChunkIterator) scan() bool { + it.pos++ + if it.pos >= it.len { + return false } - return !t.Before(it.baseT) && !t.After(lastT), nil + it.lastValue = model.SamplePair{ + Timestamp: it.timestampAtIndex(it.pos), + Value: it.sampleValueAtIndex(it.pos), + } + return it.lastErr == nil } -// values implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) values() <-chan struct { - model.SamplePair - error -} { - valuesChan := make(chan struct { - model.SamplePair - error - }) - go func() { - for i := 0; i < it.len; i++ { - t, err := it.timestampAtIndex(i) - if err != nil { - valuesChan <- struct { - model.SamplePair - error - }{ZeroSamplePair, err} - break - } - v, err := it.sampleValueAtIndex(i) - if err != nil { - valuesChan <- struct { - model.SamplePair - error - }{ZeroSamplePair, err} - break - } - valuesChan <- struct { - model.SamplePair - error - }{model.SamplePair{Timestamp: t, Value: v}, nil} - } - close(valuesChan) - }() - return valuesChan +// value implements chunkIterator. +func (it *doubleDeltaEncodedChunkIterator) value() model.SamplePair { + return it.lastValue } -// timestampAtIndex implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time, error) { +// err implements chunkIterator. +func (it *doubleDeltaEncodedChunkIterator) err() error { + return it.lastErr +} + +func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time { if idx == 0 { - return it.baseT, nil + return it.baseT } if idx == 1 { // If time bytes are at d8, the time is saved directly rather // than as a difference. if it.tBytes == d8 { - return it.baseΔT, nil + return it.baseΔT } - return it.baseT + it.baseΔT, nil + return it.baseT + it.baseΔT } offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) @@ -551,40 +516,35 @@ func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time case d1: return it.baseT + model.Time(idx)*it.baseΔT + - model.Time(int8(it.c[offset])), nil + model.Time(int8(it.c[offset])) case d2: return it.baseT + model.Time(idx)*it.baseΔT + - model.Time(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil + model.Time(int16(binary.LittleEndian.Uint16(it.c[offset:]))) case d4: return it.baseT + model.Time(idx)*it.baseΔT + - model.Time(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil + model.Time(int32(binary.LittleEndian.Uint32(it.c[offset:]))) case d8: // Take absolute value for d8. - return model.Time(binary.LittleEndian.Uint64(it.c[offset:])), nil + return model.Time(binary.LittleEndian.Uint64(it.c[offset:])) default: - return 0, fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes) + it.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes) } + return model.Earliest } -// lastTimestamp implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() (model.Time, error) { - return it.timestampAtIndex(it.len - 1) -} - -// sampleValueAtIndex implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.SampleValue, error) { +func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue { if idx == 0 { - return it.baseV, nil + return it.baseV } if idx == 1 { // If value bytes are at d8, the value is saved directly rather // than as a difference. if it.vBytes == d8 { - return it.baseΔV, nil + return it.baseΔV } - return it.baseV + it.baseΔV, nil + return it.baseV + it.baseΔV } offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + int(it.tBytes) @@ -593,39 +553,35 @@ func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.Sa switch it.vBytes { case d0: return it.baseV + - model.SampleValue(idx)*it.baseΔV, nil + model.SampleValue(idx)*it.baseΔV case d1: return it.baseV + model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int8(it.c[offset])), nil + model.SampleValue(int8(it.c[offset])) case d2: return it.baseV + model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))) case d4: return it.baseV + model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))) // No d8 for ints. default: - return 0, fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes) + it.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes) } } else { switch it.vBytes { case d4: return it.baseV + model.SampleValue(idx)*it.baseΔV + - model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))), nil + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))) case d8: // Take absolute value for d8. - return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))), nil + return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))) default: - return 0, fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes) + it.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes) } } -} - -// lastSampleValue implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) lastSampleValue() (model.SampleValue, error) { - return it.sampleValueAtIndex(it.len - 1) + return 0 } diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index e1894032af..e2da778035 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -82,14 +82,14 @@ func buildTestChunks(t *testing.T, encoding chunkEncoding) map[model.Fingerprint } func chunksEqual(c1, c2 chunk) bool { - values2 := c2.newIterator().values() - for v1 := range c1.newIterator().values() { - v2 := <-values2 - if !(v1 == v2) { + it1 := c1.newIterator() + it2 := c2.newIterator() + for it1.scan() && it2.scan() { + if !(it1.value() == it2.value()) { return false } } - return true + return it1.err() == nil && it2.err() == nil } func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) { diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 97fa450ba7..5fcb39c435 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -692,11 +692,12 @@ func testChunk(t *testing.T, encoding chunkEncoding) { if cd.isEvicted() { continue } - for sample := range cd.c.newIterator().values() { - if sample.error != nil { - t.Error(sample.error) - } - values = append(values, sample.SamplePair) + it := cd.c.newIterator() + for it.scan() { + values = append(values, it.value()) + } + if it.err() != nil { + t.Error(it.err()) } }