From 32f280a3cd478622e6bf7092064844dda72137e4 Mon Sep 17 00:00:00 2001
From: beorn7
Date: Mon, 7 Mar 2016 19:50:13 +0100
Subject: [PATCH 1/9] Slim down the chunkIterator interface

For one, remove unneeded methods. Then, instead of using a channel for
all values, use a bufio.Scanner-like interface. This removes the need
for creating a goroutine and avoids the (unnecessary) locking performed
by channel sending and receiving.

This will make it much easier to write new chunk implementations (like
Gorilla-style encoding).
---
 storage/local/chunk.go            |  64 ++++----
 storage/local/delta.go            | 264 +++++++++++++-----------
 storage/local/doubledelta.go      | 204 +++++++++--------------
 storage/local/persistence_test.go |  10 +-
 storage/local/storage_test.go     |  11 +-
 5 files changed, 237 insertions(+), 316 deletions(-)

diff --git a/storage/local/chunk.go b/storage/local/chunk.go
index d8b1ae2b8..582789fb7 100644
--- a/storage/local/chunk.go
+++ b/storage/local/chunk.go
@@ -259,18 +259,13 @@ type chunk interface {
 
 // A chunkIterator enables efficient access to the content of a chunk. It is
 // generally not safe to use a chunkIterator concurrently with or after chunk
-// mutation.
+// mutation. The error returned by any of the methods is always the last error
+// encountered by the iterator, i.e. once an error has been encountered, no
+// method will ever return a nil error again. In general, errors signal data
+// corruption in the chunk and require quarantining.
 type chunkIterator interface {
-	// length returns the number of samples in the chunk.
-	length() int
-	// Gets the timestamp of the n-th sample in the chunk.
-	timestampAtIndex(int) (model.Time, error)
 	// Gets the last timestamp in the chunk.
 	lastTimestamp() (model.Time, error)
-	// Gets the sample value of the n-th sample in the chunk.
-	sampleValueAtIndex(int) (model.SampleValue, error)
-	// Gets the last sample value in the chunk.
-	lastSampleValue() (model.SampleValue, error)
 	// Gets the value that is closest before the given time. In case a value
 	// exists at precisely the given time, that value is returned. If no
 	// applicable value exists, ZeroSamplePair is returned.
@@ -280,35 +275,48 @@ type chunkIterator interface {
 	// Whether a given timestamp is contained between first and last value
 	// in the chunk.
 	contains(model.Time) (bool, error)
-	// values returns a channel, from which all sample values in the chunk
-	// can be received in order. The channel is closed after the last
-	// one. It is generally not safe to mutate the chunk while the channel
-	// is still open. If a value is returned with error!=nil, no further
-	// values will be returned and the channel is closed.
-	values() <-chan struct {
-		model.SamplePair
-		error
-	}
+	// scan, value, and err implement a bufio.Scanner-like interface. The
+	// scan method advances the iterator to the next value in the chunk and
+	// returns true if that worked. In that case, the value method will
+	// return the sample pair the iterator has advanced to. If the scan
+	// method returns false, either an error has occurred or the end of the
+	// chunk has been reached. In the former case, the err method will
+	// return the error. In the latter case, the err method will return nil.
+	// Upon creation, the iterator is at position "minus one". After the
+	// first scan call, value will return the first value in the
+	// chunk. valueAtOrBeforeTime and rangeValues both modify the iterator
+	// position, too. They behave as if their return values were retrieved
+	// after a scan call, i.e.
calling the value or err methods after a call + // to those methods will retrieve the same return value again (or the + // last value in the range in case of rangeValues), and subsequent scan + // calls will advance the iterator from there. + scan() bool + value() model.SamplePair + err() error } func transcodeAndAdd(dst chunk, src chunk, s model.SamplePair) ([]chunk, error) { chunkOps.WithLabelValues(transcode).Inc() - head := dst - body := []chunk{} - for v := range src.newIterator().values() { - if v.error != nil { - return nil, v.error - } - newChunks, err := head.add(v.SamplePair) - if err != nil { + var ( + head = dst + body, newChunks []chunk + err error + ) + + it := src.newIterator() + for it.scan() { + if newChunks, err = head.add(it.value()); err != nil { return nil, err } body = append(body, newChunks[:len(newChunks)-1]...) head = newChunks[len(newChunks)-1] } - newChunks, err := head.add(s) - if err != nil { + if it.err() != nil { + return nil, it.err() + } + + if newChunks, err = head.add(s); err != nil { return nil, err } return append(body, newChunks...), nil diff --git a/storage/local/delta.go b/storage/local/delta.go index c78702072..3dd41cf77 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -202,13 +202,15 @@ func (c deltaEncodedChunk) firstTime() model.Time { // newIterator implements chunk. func (c *deltaEncodedChunk) newIterator() chunkIterator { return &deltaEncodedChunkIterator{ - c: *c, - len: c.len(), - baseT: c.baseTime(), - baseV: c.baseValue(), - tBytes: c.timeBytes(), - vBytes: c.valueBytes(), - isInt: c.isInt(), + c: *c, + len: c.len(), + baseT: c.baseTime(), + baseV: c.baseValue(), + tBytes: c.timeBytes(), + vBytes: c.valueBytes(), + isInt: c.isInt(), + pos: -1, + lastValue: ZeroSamplePair, } } @@ -311,176 +313,130 @@ type deltaEncodedChunkIterator struct { baseV model.SampleValue tBytes, vBytes deltaBytes isInt bool -} - -// length implements chunkIterator. -func (it *deltaEncodedChunkIterator) length() int { return it.len } - -// valueAtOrBeforeTime implements chunkIterator. -func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) { - var lastErr error - i := sort.Search(it.len, func(i int) bool { - ts, err := it.timestampAtIndex(i) - if err != nil { - lastErr = err - } - return ts.After(t) - }) - if i == 0 || lastErr != nil { - return ZeroSamplePair, lastErr - } - ts, err := it.timestampAtIndex(i - 1) - if err != nil { - return ZeroSamplePair, err - } - v, err := it.sampleValueAtIndex(i - 1) - if err != nil { - return ZeroSamplePair, err - } - return model.SamplePair{Timestamp: ts, Value: v}, nil -} - -// rangeValues implements chunkIterator. 
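
For readers new to the bufio.Scanner idiom the commit message invokes, here is a minimal sketch of how a caller drains a chunk under the new interface. allSamples is a hypothetical helper, not part of this patch, and assumes only the chunkIterator methods declared above.

package local

import "github.com/prometheus/common/model"

// allSamples drains a chunkIterator into a slice, following the same
// scan/err protocol as the new transcodeAndAdd loop above: call scan
// until it reports false, then check err exactly once.
func allSamples(it chunkIterator) ([]model.SamplePair, error) {
	samples := []model.SamplePair{}
	for it.scan() {
		samples = append(samples, it.value())
	}
	// err returns nil if scan stopped at the end of the chunk, and the
	// last error encountered otherwise.
	return samples, it.err()
}

Compared to the removed values() channel, this needs no goroutine and no channel synchronization, and the caller can stop early without leaking anything.
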
-func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) { - var lastErr error - - oldest := sort.Search(it.len, func(i int) bool { - t, err := it.timestampAtIndex(i) - if err != nil { - lastErr = err - } - return !t.Before(in.OldestInclusive) - }) - - newest := sort.Search(it.len, func(i int) bool { - t, err := it.timestampAtIndex(i) - if err != nil { - lastErr = err - } - return t.After(in.NewestInclusive) - }) - - if oldest == it.len || lastErr != nil { - return nil, lastErr - } - - result := make([]model.SamplePair, 0, newest-oldest) - for i := oldest; i < newest; i++ { - t, err := it.timestampAtIndex(i) - if err != nil { - return nil, err - } - v, err := it.sampleValueAtIndex(i) - if err != nil { - return nil, err - } - result = append(result, model.SamplePair{Timestamp: t, Value: v}) - } - return result, nil -} - -// contains implements chunkIterator. -func (it *deltaEncodedChunkIterator) contains(t model.Time) (bool, error) { - lastT, err := it.timestampAtIndex(it.len - 1) - if err != nil { - return false, err - } - return !t.Before(it.baseT) && !t.After(lastT), nil -} - -// values implements chunkIterator. -func (it *deltaEncodedChunkIterator) values() <-chan struct { - model.SamplePair - error -} { - valuesChan := make(chan struct { - model.SamplePair - error - }) - go func() { - for i := 0; i < it.len; i++ { - t, err := it.timestampAtIndex(i) - if err != nil { - valuesChan <- struct { - model.SamplePair - error - }{ZeroSamplePair, err} - break - } - v, err := it.sampleValueAtIndex(i) - if err != nil { - valuesChan <- struct { - model.SamplePair - error - }{ZeroSamplePair, err} - break - } - valuesChan <- struct { - model.SamplePair - error - }{model.SamplePair{Timestamp: t, Value: v}, nil} - } - close(valuesChan) - }() - return valuesChan -} - -// timestampAtIndex implements chunkIterator. -func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time, error) { - offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) - - switch it.tBytes { - case d1: - return it.baseT + model.Time(uint8(it.c[offset])), nil - case d2: - return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:])), nil - case d4: - return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:])), nil - case d8: - // Take absolute value for d8. - return model.Time(binary.LittleEndian.Uint64(it.c[offset:])), nil - default: - return 0, fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes) - } + pos int + lastValue model.SamplePair + lastErr error } // lastTimestamp implements chunkIterator. func (it *deltaEncodedChunkIterator) lastTimestamp() (model.Time, error) { - return it.timestampAtIndex(it.len - 1) + return it.timestampAtIndex(it.len - 1), it.lastErr } -// sampleValueAtIndex implements chunkIterator. -func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.SampleValue, error) { +// valueAtOrBeforeTime implements chunkIterator. +func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) { + i := sort.Search(it.len, func(i int) bool { + return it.timestampAtIndex(i).After(t) + }) + if i == 0 || it.lastErr != nil { + return ZeroSamplePair, it.lastErr + } + it.pos = i - 1 + it.lastValue = model.SamplePair{ + Timestamp: it.timestampAtIndex(i - 1), + Value: it.sampleValueAtIndex(i - 1), + } + return it.lastValue, it.lastErr +} + +// rangeValues implements chunkIterator. 
+func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) { + oldest := sort.Search(it.len, func(i int) bool { + return !it.timestampAtIndex(i).Before(in.OldestInclusive) + }) + newest := sort.Search(it.len, func(i int) bool { + return it.timestampAtIndex(i).After(in.NewestInclusive) + }) + if oldest == it.len || it.lastErr != nil { + return nil, it.lastErr + } + + result := make([]model.SamplePair, 0, newest-oldest) + for i := oldest; i < newest; i++ { + it.pos = i + it.lastValue = model.SamplePair{ + Timestamp: it.timestampAtIndex(i), + Value: it.sampleValueAtIndex(i), + } + result = append(result, it.lastValue) + } + return result, it.lastErr +} + +// contains implements chunkIterator. +func (it *deltaEncodedChunkIterator) contains(t model.Time) (bool, error) { + return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1)), it.lastErr +} + +// scan implements chunkIterator. +func (it *deltaEncodedChunkIterator) scan() bool { + it.pos++ + if it.pos >= it.len { + return false + } + it.lastValue = model.SamplePair{ + Timestamp: it.timestampAtIndex(it.pos), + Value: it.sampleValueAtIndex(it.pos), + } + return it.lastErr == nil +} + +// value implements chunkIterator. +func (it *deltaEncodedChunkIterator) value() model.SamplePair { + return it.lastValue +} + +// err implements chunkIterator. +func (it *deltaEncodedChunkIterator) err() error { + return it.lastErr +} + +func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time { + offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + + switch it.tBytes { + case d1: + return it.baseT + model.Time(uint8(it.c[offset])) + case d2: + return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:])) + case d4: + return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:])) + case d8: + // Take absolute value for d8. + return model.Time(binary.LittleEndian.Uint64(it.c[offset:])) + default: + it.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes) + } + return model.Earliest +} + +func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue { offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + int(it.tBytes) if it.isInt { switch it.vBytes { case d0: - return it.baseV, nil + return it.baseV case d1: - return it.baseV + model.SampleValue(int8(it.c[offset])), nil + return it.baseV + model.SampleValue(int8(it.c[offset])) case d2: - return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil + return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))) case d4: - return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil + return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))) // No d8 for ints. default: - return 0, fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes) + it.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes) } } else { switch it.vBytes { case d4: - return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))), nil + return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))) case d8: // Take absolute value for d8. 
- return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))), nil + return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))) default: - return 0, fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes) + it.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes) } } -} - -// lastSampleValue implements chunkIterator. -func (it *deltaEncodedChunkIterator) lastSampleValue() (model.SampleValue, error) { - return it.sampleValueAtIndex(it.len - 1) + return 0 } diff --git a/storage/local/doubledelta.go b/storage/local/doubledelta.go index 257c84544..84c8d38fc 100644 --- a/storage/local/doubledelta.go +++ b/storage/local/doubledelta.go @@ -208,15 +208,17 @@ func (c doubleDeltaEncodedChunk) firstTime() model.Time { // newIterator implements chunk. func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator { return &doubleDeltaEncodedChunkIterator{ - c: *c, - len: c.len(), - baseT: c.baseTime(), - baseΔT: c.baseTimeDelta(), - baseV: c.baseValue(), - baseΔV: c.baseValueDelta(), - tBytes: c.timeBytes(), - vBytes: c.valueBytes(), - isInt: c.isInt(), + c: *c, + len: c.len(), + baseT: c.baseTime(), + baseΔT: c.baseTimeDelta(), + baseV: c.baseValue(), + baseΔV: c.baseValueDelta(), + tBytes: c.timeBytes(), + vBytes: c.valueBytes(), + isInt: c.isInt(), + pos: -1, + lastValue: ZeroSamplePair, } } @@ -417,132 +419,95 @@ type doubleDeltaEncodedChunkIterator struct { baseV, baseΔV model.SampleValue tBytes, vBytes deltaBytes isInt bool + pos int + lastValue model.SamplePair + lastErr error } -// length implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) length() int { return it.len } +// lastTimestamp implements chunkIterator. +func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() (model.Time, error) { + return it.timestampAtIndex(it.len - 1), it.lastErr +} // valueAtOrBeforeTime implements chunkIterator. func (it *doubleDeltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) { - var lastErr error i := sort.Search(it.len, func(i int) bool { - ts, err := it.timestampAtIndex(i) - if err != nil { - lastErr = err - } - return ts.After(t) + return it.timestampAtIndex(i).After(t) }) - if i == 0 || lastErr != nil { - return ZeroSamplePair, lastErr + if i == 0 || it.lastErr != nil { + return ZeroSamplePair, it.lastErr } - ts, err := it.timestampAtIndex(i - 1) - if err != nil { - return ZeroSamplePair, err + it.pos = i - 1 + it.lastValue = model.SamplePair{ + Timestamp: it.timestampAtIndex(i - 1), + Value: it.sampleValueAtIndex(i - 1), } - v, err := it.sampleValueAtIndex(i - 1) - if err != nil { - return ZeroSamplePair, err - } - return model.SamplePair{Timestamp: ts, Value: v}, nil + return it.lastValue, it.lastErr } // rangeValues implements chunkIterator. 
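
The rewritten accessors above no longer return an error per call; instead they record a failure in lastErr and return a sentinel value, and the public methods report lastErr afterwards. Here is that "sticky error" pattern in isolation, as a hedged sketch with illustrative names rather than code from this patch:

package main

import "fmt"

// stickyReader records the first error it encounters and keeps
// reporting it, so callers may do many reads and check err once.
type stickyReader struct {
	data    []byte
	lastErr error
}

func (r *stickyReader) byteAt(i int) byte {
	if i < 0 || i >= len(r.data) {
		if r.lastErr == nil {
			r.lastErr = fmt.Errorf("index %d out of range", i)
		}
		return 0 // Sentinel, analogous to model.Earliest above.
	}
	return r.data[i]
}

func (r *stickyReader) err() error { return r.lastErr }

func main() {
	r := &stickyReader{data: []byte{1, 2, 3}}
	sum := int(r.byteAt(0)) + int(r.byteAt(5)) // Second read fails.
	fmt.Println(sum, r.err())                  // Check once, at the end.
}

The point the sketch preserves: a successful read after a failure does not clear lastErr, which is what makes a single err() check after a sort.Search or scan loop sound.
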
func (it *doubleDeltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) { - var lastErr error - oldest := sort.Search(it.len, func(i int) bool { - t, err := it.timestampAtIndex(i) - if err != nil { - lastErr = err - } - return !t.Before(in.OldestInclusive) + return !it.timestampAtIndex(i).Before(in.OldestInclusive) }) - newest := sort.Search(it.len, func(i int) bool { - t, err := it.timestampAtIndex(i) - if err != nil { - lastErr = err - } - return t.After(in.NewestInclusive) + return it.timestampAtIndex(i).After(in.NewestInclusive) }) - - if oldest == it.len || lastErr != nil { - return nil, lastErr + if oldest == it.len || it.lastErr != nil { + return nil, it.lastErr } result := make([]model.SamplePair, 0, newest-oldest) for i := oldest; i < newest; i++ { - t, err := it.timestampAtIndex(i) - if err != nil { - return nil, err + it.pos = i + it.lastValue = model.SamplePair{ + Timestamp: it.timestampAtIndex(i), + Value: it.sampleValueAtIndex(i), } - v, err := it.sampleValueAtIndex(i) - if err != nil { - return nil, err - } - result = append(result, model.SamplePair{Timestamp: t, Value: v}) + result = append(result, it.lastValue) } - return result, nil + return result, it.lastErr } // contains implements chunkIterator. func (it *doubleDeltaEncodedChunkIterator) contains(t model.Time) (bool, error) { - lastT, err := it.timestampAtIndex(it.len - 1) - if err != nil { - return false, err + return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1)), it.lastErr +} + +// scan implements chunkIterator. +func (it *doubleDeltaEncodedChunkIterator) scan() bool { + it.pos++ + if it.pos >= it.len { + return false } - return !t.Before(it.baseT) && !t.After(lastT), nil + it.lastValue = model.SamplePair{ + Timestamp: it.timestampAtIndex(it.pos), + Value: it.sampleValueAtIndex(it.pos), + } + return it.lastErr == nil } -// values implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) values() <-chan struct { - model.SamplePair - error -} { - valuesChan := make(chan struct { - model.SamplePair - error - }) - go func() { - for i := 0; i < it.len; i++ { - t, err := it.timestampAtIndex(i) - if err != nil { - valuesChan <- struct { - model.SamplePair - error - }{ZeroSamplePair, err} - break - } - v, err := it.sampleValueAtIndex(i) - if err != nil { - valuesChan <- struct { - model.SamplePair - error - }{ZeroSamplePair, err} - break - } - valuesChan <- struct { - model.SamplePair - error - }{model.SamplePair{Timestamp: t, Value: v}, nil} - } - close(valuesChan) - }() - return valuesChan +// value implements chunkIterator. +func (it *doubleDeltaEncodedChunkIterator) value() model.SamplePair { + return it.lastValue } -// timestampAtIndex implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time, error) { +// err implements chunkIterator. +func (it *doubleDeltaEncodedChunkIterator) err() error { + return it.lastErr +} + +func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time { if idx == 0 { - return it.baseT, nil + return it.baseT } if idx == 1 { // If time bytes are at d8, the time is saved directly rather // than as a difference. 
if it.tBytes == d8 { - return it.baseΔT, nil + return it.baseΔT } - return it.baseT + it.baseΔT, nil + return it.baseT + it.baseΔT } offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) @@ -551,40 +516,35 @@ func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time case d1: return it.baseT + model.Time(idx)*it.baseΔT + - model.Time(int8(it.c[offset])), nil + model.Time(int8(it.c[offset])) case d2: return it.baseT + model.Time(idx)*it.baseΔT + - model.Time(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil + model.Time(int16(binary.LittleEndian.Uint16(it.c[offset:]))) case d4: return it.baseT + model.Time(idx)*it.baseΔT + - model.Time(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil + model.Time(int32(binary.LittleEndian.Uint32(it.c[offset:]))) case d8: // Take absolute value for d8. - return model.Time(binary.LittleEndian.Uint64(it.c[offset:])), nil + return model.Time(binary.LittleEndian.Uint64(it.c[offset:])) default: - return 0, fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes) + it.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes) } + return model.Earliest } -// lastTimestamp implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() (model.Time, error) { - return it.timestampAtIndex(it.len - 1) -} - -// sampleValueAtIndex implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.SampleValue, error) { +func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue { if idx == 0 { - return it.baseV, nil + return it.baseV } if idx == 1 { // If value bytes are at d8, the value is saved directly rather // than as a difference. if it.vBytes == d8 { - return it.baseΔV, nil + return it.baseΔV } - return it.baseV + it.baseΔV, nil + return it.baseV + it.baseΔV } offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + int(it.tBytes) @@ -593,39 +553,35 @@ func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.Sa switch it.vBytes { case d0: return it.baseV + - model.SampleValue(idx)*it.baseΔV, nil + model.SampleValue(idx)*it.baseΔV case d1: return it.baseV + model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int8(it.c[offset])), nil + model.SampleValue(int8(it.c[offset])) case d2: return it.baseV + model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))) case d4: return it.baseV + model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))) // No d8 for ints. default: - return 0, fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes) + it.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes) } } else { switch it.vBytes { case d4: return it.baseV + model.SampleValue(idx)*it.baseΔV + - model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))), nil + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))) case d8: // Take absolute value for d8. 
- return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))), nil + return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))) default: - return 0, fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes) + it.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes) } } -} - -// lastSampleValue implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) lastSampleValue() (model.SampleValue, error) { - return it.sampleValueAtIndex(it.len - 1) + return 0 } diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index e1894032a..e2da77803 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -82,14 +82,14 @@ func buildTestChunks(t *testing.T, encoding chunkEncoding) map[model.Fingerprint } func chunksEqual(c1, c2 chunk) bool { - values2 := c2.newIterator().values() - for v1 := range c1.newIterator().values() { - v2 := <-values2 - if !(v1 == v2) { + it1 := c1.newIterator() + it2 := c2.newIterator() + for it1.scan() && it2.scan() { + if !(it1.value() == it2.value()) { return false } } - return true + return it1.err() == nil && it2.err() == nil } func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) { diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 97fa450ba..5fcb39c43 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -692,11 +692,12 @@ func testChunk(t *testing.T, encoding chunkEncoding) { if cd.isEvicted() { continue } - for sample := range cd.c.newIterator().values() { - if sample.error != nil { - t.Error(sample.error) - } - values = append(values, sample.SamplePair) + it := cd.c.newIterator() + for it.scan() { + values = append(values, it.value()) + } + if it.err() != nil { + t.Error(it.err()) } } From c13b1ecfe9144b8bcc62bfc7fe388c9da1ad30ab Mon Sep 17 00:00:00 2001 From: beorn7 Date: Mon, 7 Mar 2016 20:23:14 +0100 Subject: [PATCH 2/9] Make chunk iterators more DRY This finally extracts all the common code of the two chunk iterators into one. Any future chunk encodings with fast access by index can use the same iterator by simply providing an indexAccessor. Other future chunk encodings without fast index access (like Gorilla-style) can still implement the chunkIterator interface as usual. --- storage/local/chunk.go | 100 ++++++++++++++++++ storage/local/delta.go | 144 ++++++-------------------- storage/local/doubledelta.go | 194 +++++++++++------------------------ 3 files changed, 191 insertions(+), 247 deletions(-) diff --git a/storage/local/chunk.go b/storage/local/chunk.go index 582789fb7..b843fa3a7 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -17,6 +17,7 @@ import ( "container/list" "fmt" "io" + "sort" "sync" "sync/atomic" @@ -342,3 +343,102 @@ func newChunkForEncoding(encoding chunkEncoding) (chunk, error) { return nil, fmt.Errorf("unknown chunk encoding: %v", encoding) } } + +// indexAccessor allows accesses to samples by index. +type indexAccessor interface { + timestampAtIndex(int) model.Time + sampleValueAtIndex(int) model.SampleValue + err() error +} + +// indexAccessingChunkIterator is a chunk iterator for chunks for which an +// indexAccessor implementation exists. 
+type indexAccessingChunkIterator struct { + len int + pos int + lastValue model.SamplePair + acc indexAccessor +} + +func newIndexAccessingChunkIterator(len int, acc indexAccessor) *indexAccessingChunkIterator { + return &indexAccessingChunkIterator{ + len: len, + pos: -1, + lastValue: ZeroSamplePair, + acc: acc, + } +} + +// lastTimestamp implements chunkIterator. +func (it *indexAccessingChunkIterator) lastTimestamp() (model.Time, error) { + return it.acc.timestampAtIndex(it.len - 1), it.acc.err() +} + +// valueAtOrBeforeTime implements chunkIterator. +func (it *indexAccessingChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) { + i := sort.Search(it.len, func(i int) bool { + return it.acc.timestampAtIndex(i).After(t) + }) + if i == 0 || it.acc.err() != nil { + return ZeroSamplePair, it.acc.err() + } + it.pos = i - 1 + it.lastValue = model.SamplePair{ + Timestamp: it.acc.timestampAtIndex(i - 1), + Value: it.acc.sampleValueAtIndex(i - 1), + } + return it.lastValue, it.acc.err() +} + +// rangeValues implements chunkIterator. +func (it *indexAccessingChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) { + oldest := sort.Search(it.len, func(i int) bool { + return !it.acc.timestampAtIndex(i).Before(in.OldestInclusive) + }) + newest := sort.Search(it.len, func(i int) bool { + return it.acc.timestampAtIndex(i).After(in.NewestInclusive) + }) + if oldest == it.len || it.acc.err() != nil { + return nil, it.acc.err() + } + + result := make([]model.SamplePair, 0, newest-oldest) + for i := oldest; i < newest; i++ { + it.pos = i + it.lastValue = model.SamplePair{ + Timestamp: it.acc.timestampAtIndex(i), + Value: it.acc.sampleValueAtIndex(i), + } + result = append(result, it.lastValue) + } + return result, it.acc.err() +} + +// contains implements chunkIterator. +func (it *indexAccessingChunkIterator) contains(t model.Time) (bool, error) { + return !t.Before(it.acc.timestampAtIndex(0)) && + !t.After(it.acc.timestampAtIndex(it.len-1)), it.acc.err() +} + +// scan implements chunkIterator. +func (it *indexAccessingChunkIterator) scan() bool { + it.pos++ + if it.pos >= it.len { + return false + } + it.lastValue = model.SamplePair{ + Timestamp: it.acc.timestampAtIndex(it.pos), + Value: it.acc.sampleValueAtIndex(it.pos), + } + return it.acc.err() == nil +} + +// value implements chunkIterator. +func (it *indexAccessingChunkIterator) value() model.SamplePair { + return it.lastValue +} + +// err implements chunkIterator. +func (it *indexAccessingChunkIterator) err() error { + return it.acc.err() +} diff --git a/storage/local/delta.go b/storage/local/delta.go index 3dd41cf77..a74e0806a 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -18,11 +18,8 @@ import ( "fmt" "io" "math" - "sort" "github.com/prometheus/common/model" - - "github.com/prometheus/prometheus/storage/metric" ) // The 21-byte header of a delta-encoded chunk looks like: @@ -201,17 +198,14 @@ func (c deltaEncodedChunk) firstTime() model.Time { // newIterator implements chunk. 
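
To make the extension point from the commit message concrete, here is a hedged sketch of a trivial indexAccessor over already-decoded samples. sliceAccessor and newSliceIterator are hypothetical names, not part of the patch; they only show how a future encoding with random access by index would reuse the shared iterator.

package local

import "github.com/prometheus/common/model"

// sliceAccessor implements indexAccessor over an in-memory sample
// slice. A real chunk encoding would decode from its byte layout here.
type sliceAccessor struct {
	samples []model.SamplePair
}

func (a *sliceAccessor) timestampAtIndex(i int) model.Time {
	return a.samples[i].Timestamp
}

func (a *sliceAccessor) sampleValueAtIndex(i int) model.SampleValue {
	return a.samples[i].Value
}

// err never fails for in-memory data; an encoding that can encounter
// corrupt bytes would return a sticky error here instead.
func (a *sliceAccessor) err() error { return nil }

// newSliceIterator wires the accessor into the shared iterator, just as
// the delta and double-delta newIterator methods do below.
func newSliceIterator(samples []model.SamplePair) chunkIterator {
	return newIndexAccessingChunkIterator(len(samples), &sliceAccessor{samples: samples})
}
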
func (c *deltaEncodedChunk) newIterator() chunkIterator { - return &deltaEncodedChunkIterator{ - c: *c, - len: c.len(), - baseT: c.baseTime(), - baseV: c.baseValue(), - tBytes: c.timeBytes(), - vBytes: c.valueBytes(), - isInt: c.isInt(), - pos: -1, - lastValue: ZeroSamplePair, - } + return newIndexAccessingChunkIterator(c.len(), &deltaEncodedIndexAccessor{ + c: *c, + baseT: c.baseTime(), + baseV: c.baseValue(), + tBytes: c.timeBytes(), + vBytes: c.valueBytes(), + isInt: c.isInt(), + }) } // marshal implements chunk. @@ -305,137 +299,65 @@ func (c deltaEncodedChunk) len() int { return (len(c) - deltaHeaderBytes) / c.sampleSize() } -// deltaEncodedChunkIterator implements chunkIterator. -type deltaEncodedChunkIterator struct { +// deltaEncodedIndexAccessor implements indexAccessor. +type deltaEncodedIndexAccessor struct { c deltaEncodedChunk - len int baseT model.Time baseV model.SampleValue tBytes, vBytes deltaBytes isInt bool - pos int - lastValue model.SamplePair lastErr error } -// lastTimestamp implements chunkIterator. -func (it *deltaEncodedChunkIterator) lastTimestamp() (model.Time, error) { - return it.timestampAtIndex(it.len - 1), it.lastErr +func (acc *deltaEncodedIndexAccessor) err() error { + return acc.lastErr } -// valueAtOrBeforeTime implements chunkIterator. -func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) { - i := sort.Search(it.len, func(i int) bool { - return it.timestampAtIndex(i).After(t) - }) - if i == 0 || it.lastErr != nil { - return ZeroSamplePair, it.lastErr - } - it.pos = i - 1 - it.lastValue = model.SamplePair{ - Timestamp: it.timestampAtIndex(i - 1), - Value: it.sampleValueAtIndex(i - 1), - } - return it.lastValue, it.lastErr -} +func (acc *deltaEncodedIndexAccessor) timestampAtIndex(idx int) model.Time { + offset := deltaHeaderBytes + idx*int(acc.tBytes+acc.vBytes) -// rangeValues implements chunkIterator. -func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) { - oldest := sort.Search(it.len, func(i int) bool { - return !it.timestampAtIndex(i).Before(in.OldestInclusive) - }) - newest := sort.Search(it.len, func(i int) bool { - return it.timestampAtIndex(i).After(in.NewestInclusive) - }) - if oldest == it.len || it.lastErr != nil { - return nil, it.lastErr - } - - result := make([]model.SamplePair, 0, newest-oldest) - for i := oldest; i < newest; i++ { - it.pos = i - it.lastValue = model.SamplePair{ - Timestamp: it.timestampAtIndex(i), - Value: it.sampleValueAtIndex(i), - } - result = append(result, it.lastValue) - } - return result, it.lastErr -} - -// contains implements chunkIterator. -func (it *deltaEncodedChunkIterator) contains(t model.Time) (bool, error) { - return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1)), it.lastErr -} - -// scan implements chunkIterator. -func (it *deltaEncodedChunkIterator) scan() bool { - it.pos++ - if it.pos >= it.len { - return false - } - it.lastValue = model.SamplePair{ - Timestamp: it.timestampAtIndex(it.pos), - Value: it.sampleValueAtIndex(it.pos), - } - return it.lastErr == nil -} - -// value implements chunkIterator. -func (it *deltaEncodedChunkIterator) value() model.SamplePair { - return it.lastValue -} - -// err implements chunkIterator. 
-func (it *deltaEncodedChunkIterator) err() error { - return it.lastErr -} - -func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time { - offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) - - switch it.tBytes { + switch acc.tBytes { case d1: - return it.baseT + model.Time(uint8(it.c[offset])) + return acc.baseT + model.Time(uint8(acc.c[offset])) case d2: - return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:])) + return acc.baseT + model.Time(binary.LittleEndian.Uint16(acc.c[offset:])) case d4: - return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:])) + return acc.baseT + model.Time(binary.LittleEndian.Uint32(acc.c[offset:])) case d8: // Take absolute value for d8. - return model.Time(binary.LittleEndian.Uint64(it.c[offset:])) + return model.Time(binary.LittleEndian.Uint64(acc.c[offset:])) default: - it.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes) + acc.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", acc.tBytes) } return model.Earliest } -func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue { - offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + int(it.tBytes) +func (acc *deltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleValue { + offset := deltaHeaderBytes + idx*int(acc.tBytes+acc.vBytes) + int(acc.tBytes) - if it.isInt { - switch it.vBytes { + if acc.isInt { + switch acc.vBytes { case d0: - return it.baseV + return acc.baseV case d1: - return it.baseV + model.SampleValue(int8(it.c[offset])) + return acc.baseV + model.SampleValue(int8(acc.c[offset])) case d2: - return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))) + return acc.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(acc.c[offset:]))) case d4: - return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))) + return acc.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(acc.c[offset:]))) // No d8 for ints. default: - it.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes) + acc.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", acc.vBytes) } } else { - switch it.vBytes { + switch acc.vBytes { case d4: - return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))) + return acc.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(acc.c[offset:]))) case d8: // Take absolute value for d8. - return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))) + return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(acc.c[offset:]))) default: - it.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes) + acc.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", acc.vBytes) } } return 0 diff --git a/storage/local/doubledelta.go b/storage/local/doubledelta.go index 84c8d38fc..a53d41f6b 100644 --- a/storage/local/doubledelta.go +++ b/storage/local/doubledelta.go @@ -18,11 +18,8 @@ import ( "fmt" "io" "math" - "sort" "github.com/prometheus/common/model" - - "github.com/prometheus/prometheus/storage/metric" ) // The 37-byte header of a delta-encoded chunk looks like: @@ -207,19 +204,16 @@ func (c doubleDeltaEncodedChunk) firstTime() model.Time { // newIterator implements chunk. 
func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator { - return &doubleDeltaEncodedChunkIterator{ - c: *c, - len: c.len(), - baseT: c.baseTime(), - baseΔT: c.baseTimeDelta(), - baseV: c.baseValue(), - baseΔV: c.baseValueDelta(), - tBytes: c.timeBytes(), - vBytes: c.valueBytes(), - isInt: c.isInt(), - pos: -1, - lastValue: ZeroSamplePair, - } + return newIndexAccessingChunkIterator(c.len(), &doubleDeltaEncodedIndexAccessor{ + c: *c, + baseT: c.baseTime(), + baseΔT: c.baseTimeDelta(), + baseV: c.baseValue(), + baseΔV: c.baseValueDelta(), + tBytes: c.timeBytes(), + vBytes: c.valueBytes(), + isInt: c.isInt(), + }) } // marshal implements chunk. @@ -411,176 +405,104 @@ func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb delt return []chunk{&c}, nil } -// doubleDeltaEncodedChunkIterator implements chunkIterator. -type doubleDeltaEncodedChunkIterator struct { +// doubleDeltaEncodedIndexAccessor implements indexAccessor. +type doubleDeltaEncodedIndexAccessor struct { c doubleDeltaEncodedChunk - len int baseT, baseΔT model.Time baseV, baseΔV model.SampleValue tBytes, vBytes deltaBytes isInt bool - pos int - lastValue model.SamplePair lastErr error } -// lastTimestamp implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() (model.Time, error) { - return it.timestampAtIndex(it.len - 1), it.lastErr +func (acc *doubleDeltaEncodedIndexAccessor) err() error { + return acc.lastErr } -// valueAtOrBeforeTime implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) { - i := sort.Search(it.len, func(i int) bool { - return it.timestampAtIndex(i).After(t) - }) - if i == 0 || it.lastErr != nil { - return ZeroSamplePair, it.lastErr - } - it.pos = i - 1 - it.lastValue = model.SamplePair{ - Timestamp: it.timestampAtIndex(i - 1), - Value: it.sampleValueAtIndex(i - 1), - } - return it.lastValue, it.lastErr -} - -// rangeValues implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) { - oldest := sort.Search(it.len, func(i int) bool { - return !it.timestampAtIndex(i).Before(in.OldestInclusive) - }) - newest := sort.Search(it.len, func(i int) bool { - return it.timestampAtIndex(i).After(in.NewestInclusive) - }) - if oldest == it.len || it.lastErr != nil { - return nil, it.lastErr - } - - result := make([]model.SamplePair, 0, newest-oldest) - for i := oldest; i < newest; i++ { - it.pos = i - it.lastValue = model.SamplePair{ - Timestamp: it.timestampAtIndex(i), - Value: it.sampleValueAtIndex(i), - } - result = append(result, it.lastValue) - } - return result, it.lastErr -} - -// contains implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) contains(t model.Time) (bool, error) { - return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1)), it.lastErr -} - -// scan implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) scan() bool { - it.pos++ - if it.pos >= it.len { - return false - } - it.lastValue = model.SamplePair{ - Timestamp: it.timestampAtIndex(it.pos), - Value: it.sampleValueAtIndex(it.pos), - } - return it.lastErr == nil -} - -// value implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) value() model.SamplePair { - return it.lastValue -} - -// err implements chunkIterator. 
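
As a quick aid for the arithmetic in the accessor methods that follow: for indices >= 2, a double-delta chunk reconstructs t(idx) = baseT + idx*baseΔT + storedDelta(idx), i.e. it stores only the deviation from a linear prediction. A tiny self-contained sketch with made-up numbers, not data from any real chunk:

package main

import "fmt"

// Double-delta reconstruction for indices >= 2: the chunk stores only
// the deviation from the linear prediction baseT + idx*baseΔT.
func main() {
	baseT := int64(1000)        // Timestamp of sample 0.
	baseΔT := int64(15)         // Delta between samples 0 and 1.
	stored := []int64{0, -2, 1} // Deviations for indices 2, 3, 4.

	for i, d := range stored {
		idx := int64(i + 2)
		fmt.Println(idx, baseT+idx*baseΔT+d) // 1030, 1043, 1061
	}
}
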
-func (it *doubleDeltaEncodedChunkIterator) err() error { - return it.lastErr -} - -func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time { +func (acc *doubleDeltaEncodedIndexAccessor) timestampAtIndex(idx int) model.Time { if idx == 0 { - return it.baseT + return acc.baseT } if idx == 1 { // If time bytes are at d8, the time is saved directly rather // than as a difference. - if it.tBytes == d8 { - return it.baseΔT + if acc.tBytes == d8 { + return acc.baseΔT } - return it.baseT + it.baseΔT + return acc.baseT + acc.baseΔT } - offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + offset := doubleDeltaHeaderBytes + (idx-2)*int(acc.tBytes+acc.vBytes) - switch it.tBytes { + switch acc.tBytes { case d1: - return it.baseT + - model.Time(idx)*it.baseΔT + - model.Time(int8(it.c[offset])) + return acc.baseT + + model.Time(idx)*acc.baseΔT + + model.Time(int8(acc.c[offset])) case d2: - return it.baseT + - model.Time(idx)*it.baseΔT + - model.Time(int16(binary.LittleEndian.Uint16(it.c[offset:]))) + return acc.baseT + + model.Time(idx)*acc.baseΔT + + model.Time(int16(binary.LittleEndian.Uint16(acc.c[offset:]))) case d4: - return it.baseT + - model.Time(idx)*it.baseΔT + - model.Time(int32(binary.LittleEndian.Uint32(it.c[offset:]))) + return acc.baseT + + model.Time(idx)*acc.baseΔT + + model.Time(int32(binary.LittleEndian.Uint32(acc.c[offset:]))) case d8: // Take absolute value for d8. - return model.Time(binary.LittleEndian.Uint64(it.c[offset:])) + return model.Time(binary.LittleEndian.Uint64(acc.c[offset:])) default: - it.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes) + acc.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", acc.tBytes) } return model.Earliest } -func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue { +func (acc *doubleDeltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleValue { if idx == 0 { - return it.baseV + return acc.baseV } if idx == 1 { // If value bytes are at d8, the value is saved directly rather // than as a difference. - if it.vBytes == d8 { - return it.baseΔV + if acc.vBytes == d8 { + return acc.baseΔV } - return it.baseV + it.baseΔV + return acc.baseV + acc.baseΔV } - offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + int(it.tBytes) + offset := doubleDeltaHeaderBytes + (idx-2)*int(acc.tBytes+acc.vBytes) + int(acc.tBytes) - if it.isInt { - switch it.vBytes { + if acc.isInt { + switch acc.vBytes { case d0: - return it.baseV + - model.SampleValue(idx)*it.baseΔV + return acc.baseV + + model.SampleValue(idx)*acc.baseΔV case d1: - return it.baseV + - model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int8(it.c[offset])) + return acc.baseV + + model.SampleValue(idx)*acc.baseΔV + + model.SampleValue(int8(acc.c[offset])) case d2: - return it.baseV + - model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))) + return acc.baseV + + model.SampleValue(idx)*acc.baseΔV + + model.SampleValue(int16(binary.LittleEndian.Uint16(acc.c[offset:]))) case d4: - return it.baseV + - model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))) + return acc.baseV + + model.SampleValue(idx)*acc.baseΔV + + model.SampleValue(int32(binary.LittleEndian.Uint32(acc.c[offset:]))) // No d8 for ints. 
 		default:
-			it.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes)
+			acc.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", acc.vBytes)
 		}
 	} else {
-		switch it.vBytes {
+		switch acc.vBytes {
 		case d4:
-			return it.baseV +
-				model.SampleValue(idx)*it.baseΔV +
-				model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:])))
+			return acc.baseV +
+				model.SampleValue(idx)*acc.baseΔV +
+				model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(acc.c[offset:])))
 		case d8:
 			// Take absolute value for d8.
-			return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:])))
+			return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(acc.c[offset:])))
 		default:
-			it.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes)
+			acc.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", acc.vBytes)
 		}
 	}
 	return 0
 }

From 836f1db04c9718bc8164e39d4dc98876c143524a Mon Sep 17 00:00:00 2001
From: beorn7
Date: Wed, 9 Mar 2016 00:09:42 +0100
Subject: [PATCH 3/9] Improve MetricsForLabelMatchers

WIP: This needs more tests.

It now gets a from and through value, which it may opportunistically use
to optimize the retrieval. With possible future range indices, this
could be used in a very efficient way.

This change merely applies some easy checks, which should nevertheless
solve the use case of heavy rule evaluations on servers with a lot of
series churn.

The idea is the following:

- Only archive series that are at least as old as the headChunkTimeout
  (which was already extremely unlikely to happen).

- Then maintain a high watermark for the last archival, i.e. no
  archived series has a sample more recent than that watermark.

- Any query that doesn't reach to a time before that watermark doesn't
  have to touch the archive index at all.

  (A production server at Soundcloud with the aforementioned series
  churn and heavy rule evaluations spends 50% of its CPU time in
  archive index lookups. Since rule evaluations usually only touch very
  recent values, most of those lookups should disappear with this
  change.)

- Federation with a very broad label matcher will profit from this,
  too.

As a byproduct, the unneeded MetricForFingerprint method was removed
from the Storage interface.
---
 promql/analyzer.go            |  10 ++-
 storage/local/interface.go    |  39 +++++++----
 storage/local/persistence.go  |   3 +
 storage/local/storage.go      | 126 ++++++++++++++++++++++++----------
 storage/local/storage_test.go |  10 ++-
 web/api/v1/api.go             |  10 ++-
 web/federate.go               |  22 +++---
 7 files changed, 151 insertions(+), 69 deletions(-)

diff --git a/promql/analyzer.go b/promql/analyzer.go
index bad5fbd92..7243d31db 100644
--- a/promql/analyzer.go
+++ b/promql/analyzer.go
@@ -79,7 +79,10 @@ func (a *Analyzer) Analyze(ctx context.Context) error {
 	Inspect(a.Expr, func(node Node) bool {
 		switch n := node.(type) {
 		case *VectorSelector:
-			n.metrics = a.Storage.MetricsForLabelMatchers(n.LabelMatchers...)
+			n.metrics = a.Storage.MetricsForLabelMatchers(
+				a.Start.Add(-n.Offset-StalenessDelta), a.End.Add(-n.Offset),
+				n.LabelMatchers...,
+			)
 			n.iterators = make(map[model.Fingerprint]local.SeriesIterator, len(n.metrics))
 
 			pt := getPreloadTimes(n.Offset)
@@ -95,7 +98,10 @@ func (a *Analyzer) Analyze(ctx context.Context) error {
 				}
 			}
 		case *MatrixSelector:
-			n.metrics = a.Storage.MetricsForLabelMatchers(n.LabelMatchers...)
+ n.metrics = a.Storage.MetricsForLabelMatchers( + a.Start.Add(-n.Offset-n.Range), a.End.Add(-n.Offset), + n.LabelMatchers..., + ) n.iterators = make(map[model.Fingerprint]local.SeriesIterator, len(n.metrics)) pt := getPreloadTimes(n.Offset) diff --git a/storage/local/interface.go b/storage/local/interface.go index d9dbc4f21..26bc5325a 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -40,20 +40,22 @@ type Storage interface { // NewPreloader returns a new Preloader which allows preloading and pinning // series data into memory for use within a query. NewPreloader() Preloader - // MetricsForLabelMatchers returns the metrics from storage that satisfy the given - // label matchers. At least one label matcher must be specified that does not - // match the empty string. - MetricsForLabelMatchers(...*metric.LabelMatcher) map[model.Fingerprint]metric.Metric - // LastSamplePairForFingerprint returns the last sample pair that has - // been ingested for the provided fingerprint. If this instance of the + // MetricsForLabelMatchers returns the metrics from storage that satisfy + // the given label matchers. At least one label matcher must be + // specified that does not match the empty string. The times from and + // through are hints for the storage to optimize the search. The storage + // MAY exclude metrics that have no samples in the specified interval + // from the returned map. In doubt, specify model.Earliest for from and + // model.Latest for through. + MetricsForLabelMatchers(from, through model.Time, matchers ...*metric.LabelMatcher) map[model.Fingerprint]metric.Metric + // LastSampleForFingerprint returns the last sample that has been + // ingested for the provided fingerprint. If this instance of the // Storage has never ingested a sample for the provided fingerprint (or // the last ingestion is so long ago that the series has been archived), - // ZeroSamplePair is returned. - LastSamplePairForFingerprint(model.Fingerprint) model.SamplePair + // ZeroSample is returned. + LastSampleForFingerprint(model.Fingerprint) model.Sample // Get all of the label values that are associated with a given label name. LabelValuesForLabelName(model.LabelName) model.LabelValues - // Get the metric associated with the provided fingerprint. - MetricForFingerprint(model.Fingerprint) metric.Metric // Drop all time series associated with the given fingerprints. DropMetricsForFingerprints(...model.Fingerprint) // Run the various maintenance loops in goroutines. Returns when the @@ -89,7 +91,7 @@ type SeriesIterator interface { type Preloader interface { PreloadRange( fp model.Fingerprint, - from model.Time, through model.Time, + from, through model.Time, ) SeriesIterator PreloadInstant( fp model.Fingerprint, @@ -100,8 +102,15 @@ type Preloader interface { } // ZeroSamplePair is the pseudo zero-value of model.SamplePair used by the local -// package to signal a non-existing sample. It is a SamplePair with timestamp -// model.Earliest and value 0.0. Note that the natural zero value of SamplePair -// has a timestamp of 0, which is possible to appear in a real SamplePair and -// thus not suitable to signal a non-existing SamplePair. +// package to signal a non-existing sample pair. It is a SamplePair with +// timestamp model.Earliest and value 0.0. Note that the natural zero value of +// SamplePair has a timestamp of 0, which is possible to appear in a real +// SamplePair and thus not suitable to signal a non-existing SamplePair. 
var ZeroSamplePair = model.SamplePair{Timestamp: model.Earliest} + +// ZeroSample is the pseudo zero-value of model.Sample used by the local package +// to signal a non-existing sample. It is a Sample with timestamp +// model.Earliest, value 0.0, and metric nil. Note that the natural zero value +// of Sample has a timestamp of 0, which is possible to appear in a real +// Sample and thus not suitable to signal a non-existing Sample. +var ZeroSample = model.Sample{Timestamp: model.Earliest} diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 124a41fb6..a89307bc3 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -1068,6 +1068,9 @@ func (p *persistence) fingerprintsModifiedBefore(beforeTime model.Time) ([]model // method is goroutine-safe. func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error) { metric, _, err := p.archivedFingerprintToMetrics.Lookup(fp) + if err != nil { + p.setDirty(true, err) + } return metric, err } diff --git a/storage/local/storage.go b/storage/local/storage.go index 201c2ba8b..e6527085b 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -129,12 +129,13 @@ const ( type syncStrategy func() bool type memorySeriesStorage struct { - // numChunksToPersist has to be aligned for atomic operations. - numChunksToPersist int64 // The number of chunks waiting for persistence. - maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will be throttled. - rushed bool // Whether the storage is in rushed mode. - rushedMtx sync.Mutex // Protects entering and exiting rushed mode. - throttled chan struct{} // This chan is sent to whenever NeedsThrottling() returns true (for logging). + // archiveHighWatermark and numChunksToPersist have to be aligned for atomic operations. + archiveHighWatermark model.Time // No archived series has samples after this time. + numChunksToPersist int64 // The number of chunks waiting for persistence. + maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will be throttled. + rushed bool // Whether the storage is in rushed mode. + rushedMtx sync.Mutex // Protects entering and exiting rushed mode. + throttled chan struct{} // This chan is sent to whenever NeedsThrottling() returns true (for logging). fpLocker *fingerprintLocker fpToSeries *seriesMap @@ -201,6 +202,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage { dropAfter: o.PersistenceRetentionPeriod, checkpointInterval: o.CheckpointInterval, checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit, + archiveHighWatermark: model.Now().Add(-headChunkTimeout), maxChunksToPersist: o.MaxChunksToPersist, @@ -368,15 +370,20 @@ func (s *memorySeriesStorage) WaitForIndexing() { } // LastSampleForFingerprint implements Storage. -func (s *memorySeriesStorage) LastSamplePairForFingerprint(fp model.Fingerprint) model.SamplePair { +func (s *memorySeriesStorage) LastSampleForFingerprint(fp model.Fingerprint) model.Sample { s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) series, ok := s.fpToSeries.get(fp) if !ok { - return ZeroSamplePair + return ZeroSample + } + sp := series.lastSamplePair() + return model.Sample{ + Metric: series.metric, + Value: sp.Value, + Timestamp: sp.Timestamp, } - return series.lastSamplePair() } // boundedIterator wraps a SeriesIterator and does not allow fetching @@ -439,7 +446,10 @@ func (s *memorySeriesStorage) fingerprintsForLabelPairs(pairs ...model.LabelPair } // MetricsForLabelMatchers implements Storage. 
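
Before the implementation, a brief usage sketch of the new from/through hints. metricsForRecentAndAll is a hypothetical helper; the two calls mirror the call sites this patch updates in promql, the HTTP API, and federation.

package example

import (
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/storage/local"
	"github.com/prometheus/prometheus/storage/metric"
)

// metricsForRecentAndAll contrasts the two calling conventions of the
// new MetricsForLabelMatchers signature.
func metricsForRecentAndAll(s local.Storage, matchers ...*metric.LabelMatcher) {
	// A narrow window: once the archive high watermark lies before
	// from, the storage can skip the archive index entirely, which is
	// the rule-evaluation case the commit message describes.
	now := model.Now()
	_ = s.MetricsForLabelMatchers(now.Add(-promql.StalenessDelta), now, matchers...)

	// The sentinel times opt out of the optimization and return every
	// matching series, as the /api/v1/series handler does.
	_ = s.MetricsForLabelMatchers(model.Earliest, model.Latest, matchers...)
}
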
-func (s *memorySeriesStorage) MetricsForLabelMatchers(matchers ...*metric.LabelMatcher) map[model.Fingerprint]metric.Metric { +func (s *memorySeriesStorage) MetricsForLabelMatchers( + from, through model.Time, + matchers ...*metric.LabelMatcher, +) map[model.Fingerprint]metric.Metric { var ( equals []model.LabelPair filters []*metric.LabelMatcher @@ -491,9 +501,11 @@ func (s *memorySeriesStorage) MetricsForLabelMatchers(matchers ...*metric.LabelM filters = remaining } - result := make(map[model.Fingerprint]metric.Metric, len(resFPs)) + result := map[model.Fingerprint]metric.Metric{} for fp := range resFPs { - result[fp] = s.MetricForFingerprint(fp) + if metric, ok := s.metricForFingerprint(fp, from, through); ok { + result[fp] = metric + } } for _, matcher := range filters { for fp, met := range result { @@ -505,6 +517,58 @@ func (s *memorySeriesStorage) MetricsForLabelMatchers(matchers ...*metric.LabelM return result } +// metricForFingerprint returns the metric for the given fingerprint if the +// corresponding time series has samples between 'from' and 'through'. +func (s *memorySeriesStorage) metricForFingerprint( + fp model.Fingerprint, + from, through model.Time, +) (metric.Metric, bool) { + // Lock FP so that no (un-)archiving will happen during lookup. + s.fpLocker.Lock(fp) + defer s.fpLocker.Unlock(fp) + + watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark))) + + series, ok := s.fpToSeries.get(fp) + if ok { + if series.lastTime.Before(from) || series.savedFirstTime.After(through) { + return metric.Metric{}, false + } + // Wrap the returned metric in a copy-on-write (COW) metric here because + // the caller might mutate it. + return metric.Metric{ + Metric: series.metric, + }, true + } + // From here on, we are only concerned with archived metrics. + // If the high watermark of archived series is before 'from', we are done. + if watermark < from { + return metric.Metric{}, false + } + if from.After(model.Earliest) || through.Before(model.Latest) { + // The range lookup is relatively cheap, so let's do it first. + ok, first, last, err := s.persistence.hasArchivedMetric(fp) + if err != nil { + log.Errorf("Error retrieving archived time range for fingerprint %v: %v", fp, err) + return metric.Metric{}, false + } + if !ok || first.After(through) || last.Before(from) { + return metric.Metric{}, false + } + } + + met, err := s.persistence.archivedMetric(fp) + if err != nil { + log.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err) + return metric.Metric{}, false + } + + return metric.Metric{ + Metric: met, + Copied: false, + }, true +} + // LabelValuesForLabelName implements Storage. func (s *memorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) model.LabelValues { lvs, err := s.persistence.labelValuesForLabelName(labelName) @@ -514,30 +578,6 @@ func (s *memorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) return lvs } -// MetricForFingerprint implements Storage. -func (s *memorySeriesStorage) MetricForFingerprint(fp model.Fingerprint) metric.Metric { - s.fpLocker.Lock(fp) - defer s.fpLocker.Unlock(fp) - - series, ok := s.fpToSeries.get(fp) - if ok { - // Wrap the returned metric in a copy-on-write (COW) metric here because - // the caller might mutate it. 
- return metric.Metric{ - Metric: series.metric, - } - } - met, err := s.persistence.archivedMetric(fp) - if err != nil { - log.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err) - } - - return metric.Metric{ - Metric: met, - Copied: false, - } -} - // DropMetric implements Storage. func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprint) { for _, fp := range fps { @@ -1077,8 +1117,9 @@ func (s *memorySeriesStorage) maintainMemorySeries( } } - // Archive if all chunks are evicted. - if iOldestNotEvicted == -1 { + // Archive if all chunks are evicted. Also make sure the last sample has + // an age of at least headChunkTimeout (which is very likely anyway). + if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > headChunkTimeout { s.fpToSeries.del(fp) s.numSeries.Dec() if err := s.persistence.archiveMetric( @@ -1088,6 +1129,15 @@ func (s *memorySeriesStorage) maintainMemorySeries( return } s.seriesOps.WithLabelValues(archive).Inc() + oldWatermark := atomic.LoadInt64((*int64)(&s.archiveHighWatermark)) + if oldWatermark < int64(series.lastTime) { + if !atomic.CompareAndSwapInt64( + (*int64)(&s.archiveHighWatermark), + oldWatermark, int64(series.lastTime), + ) { + panic("s.archiveHighWatermark modified outside of maintainMemorySeries") + } + } return } // If we are here, the series is not archived, so check for chunkDesc diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 5fcb39c43..ab5c2a9ed 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -178,7 +178,10 @@ func TestMatches(t *testing.T) { } for _, mt := range matcherTests { - res := storage.MetricsForLabelMatchers(mt.matchers...) + res := storage.MetricsForLabelMatchers( + model.Earliest, model.Latest, + mt.matchers..., + ) if len(mt.expected) != len(res) { t.Fatalf("expected %d matches for %q, found %d", len(mt.expected), mt.matchers, len(res)) } @@ -362,7 +365,10 @@ func BenchmarkLabelMatching(b *testing.B) { for i := 0; i < b.N; i++ { benchLabelMatchingRes = map[model.Fingerprint]metric.Metric{} for _, mt := range matcherTests { - benchLabelMatchingRes = s.MetricsForLabelMatchers(mt...) + benchLabelMatchingRes = s.MetricsForLabelMatchers( + model.Earliest, model.Latest, + mt..., + ) } } // Stop timer to not count the storage closing. diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 823665f3b..6858938fb 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -226,7 +226,10 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) { if err != nil { return nil, &apiError{errorBadData, err} } - for fp, met := range api.Storage.MetricsForLabelMatchers(matchers...) { + for fp, met := range api.Storage.MetricsForLabelMatchers( + model.Earliest, model.Latest, // Get every series. + matchers..., + ) { res[fp] = met } } @@ -250,7 +253,10 @@ func (api *API) dropSeries(r *http.Request) (interface{}, *apiError) { if err != nil { return nil, &apiError{errorBadData, err} } - for fp := range api.Storage.MetricsForLabelMatchers(matchers...) { + for fp := range api.Storage.MetricsForLabelMatchers( + model.Earliest, model.Latest, // Get every series. 
+ matchers..., + ) { fps[fp] = struct{}{} } } diff --git a/web/federate.go b/web/federate.go index d9baf676b..26f4710eb 100644 --- a/web/federate.go +++ b/web/federate.go @@ -19,7 +19,6 @@ import ( "github.com/golang/protobuf/proto" "github.com/prometheus/prometheus/promql" - "github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" @@ -33,7 +32,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { req.ParseForm() - metrics := map[model.Fingerprint]metric.Metric{} + fps := map[model.Fingerprint]struct{}{} for _, s := range req.Form["match[]"] { matchers, err := promql.ParseMetricSelector(s) @@ -41,8 +40,11 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { http.Error(w, err.Error(), http.StatusBadRequest) return } - for fp, met := range h.storage.MetricsForLabelMatchers(matchers...) { - metrics[fp] = met + for fp := range h.storage.MetricsForLabelMatchers( + model.Now().Add(-promql.StalenessDelta), model.Latest, + matchers..., + ) { + fps[fp] = struct{}{} } } @@ -62,19 +64,19 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { Type: dto.MetricType_UNTYPED.Enum(), } - for fp, met := range metrics { + for fp := range fps { globalUsed := map[model.LabelName]struct{}{} - sp := h.storage.LastSamplePairForFingerprint(fp) + s := h.storage.LastSampleForFingerprint(fp) // Discard if sample does not exist or lays before the staleness interval. - if sp.Timestamp.Before(minTimestamp) { + if s.Timestamp.Before(minTimestamp) { continue } // Reset label slice. protMetric.Label = protMetric.Label[:0] - for ln, lv := range met.Metric { + for ln, lv := range s.Metric { if ln == model.MetricNameLabel { protMetricFam.Name = proto.String(string(lv)) continue @@ -98,8 +100,8 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { } } - protMetric.TimestampMs = proto.Int64(int64(sp.Timestamp)) - protMetric.Untyped.Value = proto.Float64(float64(sp.Value)) + protMetric.TimestampMs = proto.Int64(int64(s.Timestamp)) + protMetric.Untyped.Value = proto.Float64(float64(s.Value)) if err := enc.Encode(protMetricFam); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) From 161eada3ad5bf65f677778656d92ef418cd2abd1 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Wed, 9 Mar 2016 16:20:39 +0100 Subject: [PATCH 4/9] Make chunkIterator even leaner. --- storage/local/chunk.go | 132 ++++++++++++++++++++-------------------- storage/local/series.go | 24 ++++---- 2 files changed, 80 insertions(+), 76 deletions(-) diff --git a/storage/local/chunk.go b/storage/local/chunk.go index b843fa3a7..2e9fcf4cf 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -260,42 +260,52 @@ type chunk interface { // A chunkIterator enables efficient access to the content of a chunk. It is // generally not safe to use a chunkIterator concurrently with or after chunk -// mutation. The error returned by any of the methods is always the last error -// encountered by the iterator, i.e. once an error has been encountered, no -// method will ever return a nil error again. In general, errors signal data -// corruption in the chunk and require quarantining. +// mutation. type chunkIterator interface { // Gets the last timestamp in the chunk. lastTimestamp() (model.Time, error) - // Gets the value that is closest before the given time. In case a value - // exists at precisely the given time, that value is returned. 
If no - // applicable value exists, ZeroSamplePair is returned. - valueAtOrBeforeTime(model.Time) (model.SamplePair, error) - // Gets all values contained within a given interval. - rangeValues(metric.Interval) ([]model.SamplePair, error) // Whether a given timestamp is contained between first and last value // in the chunk. contains(model.Time) (bool, error) - // scan, value, and err implement a bufio.Scanner-like interface. The - // scan method advances the iterator to the next value in the chunk and - // returns true if that worked. In that case, the value method will - // return the sample pair the iterator has advanced to. If the scan - // method returns false, either an error has occured or the end of the - // chunk has been reached. In the former case, the err method will - // return the error. In the latter case, the err method will return nil. - // Upon creation, the iterator is at position "minus one". After the - // first scan call, value will return the 1st value in the - // chunk. valueAtOrBeforeTime and rangeValues all modify the iterator - // position, too. They behave as if their return values were retrieved - // after a scan call, i.e. calling the value or err methods after a call - // to those methods will retrieve the same return value again (or the - // last value in the range in case of rangeValues), and subsequent scan - // calls will advance the iterator from there. + // Scans the next value in the chunk. Directly after the iterator has + // been created, the next value is the first value in the + // chunk. Otherwise, it is the value following the last value scanned or + // found (by one of the find... methods). Returns false if either the + // end of the chunk is reached or an error has occurred. scan() bool + // Finds the most recent value at or before the provided time. Returns + // false if either the chunk contains no value at or before the provided + // time, or an error has occurred. + findAtOrBefore(model.Time) bool + // Finds the oldest value at or after the provided time. Returns false + // if either the chunk contains no value at or after the provided time, + // or an error has occurred. + findAtOrAfter(model.Time) bool + // Returns the last value scanned (by the scan method) or found (by one + // of the find... methods). It returns ZeroSamplePair before any of + // those methods were called. value() model.SamplePair + // Returns the last error encountered. In general, an error signal data + // corruption in the chunk and requires quarantining. err() error } +// rangeValues is a utility function that retrieves all values within the given +// range from a chunkIterator. +func rangeValues(it chunkIterator, in metric.Interval) ([]model.SamplePair, error) { + result := []model.SamplePair{} + if !it.findAtOrAfter(in.OldestInclusive) { + return result, it.err() + } + for !it.value().Timestamp.After(in.NewestInclusive) { + result = append(result, it.value()) + if !it.scan() { + break + } + } + return result, it.err() +} + func transcodeAndAdd(dst chunk, src chunk, s model.SamplePair) ([]chunk, error) { chunkOps.WithLabelValues(transcode).Inc() @@ -374,46 +384,6 @@ func (it *indexAccessingChunkIterator) lastTimestamp() (model.Time, error) { return it.acc.timestampAtIndex(it.len - 1), it.acc.err() } -// valueAtOrBeforeTime implements chunkIterator. 
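// A toy, slice-backed illustration of the scan/value/err contract described
// above (assumed names and types, not patch code). It shows the intended
// caller pattern: loop on scan, read value, check err once at the end.
package main

import "fmt"

type samplePair struct {
	T int64
	V float64
}

type sliceIter struct {
	samples []samplePair
	pos     int // starts at "minus one", as the interface comment specifies
}

func newSliceIter(s []samplePair) *sliceIter { return &sliceIter{samples: s, pos: -1} }

// scan advances to the next sample and reports whether one exists.
func (it *sliceIter) scan() bool {
	it.pos++
	return it.pos < len(it.samples)
}

// value returns the sample the iterator currently points to.
func (it *sliceIter) value() samplePair { return it.samples[it.pos] }

// err would return the last decoding error; a plain slice cannot fail.
func (it *sliceIter) err() error { return nil }

func main() {
	it := newSliceIter([]samplePair{{1, 0.5}, {2, 1.5}, {3, 2.5}})
	for it.scan() {
		fmt.Println(it.value())
	}
	if it.err() != nil {
		fmt.Println("data corruption, quarantine the chunk")
	}
}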
-func (it *indexAccessingChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) { - i := sort.Search(it.len, func(i int) bool { - return it.acc.timestampAtIndex(i).After(t) - }) - if i == 0 || it.acc.err() != nil { - return ZeroSamplePair, it.acc.err() - } - it.pos = i - 1 - it.lastValue = model.SamplePair{ - Timestamp: it.acc.timestampAtIndex(i - 1), - Value: it.acc.sampleValueAtIndex(i - 1), - } - return it.lastValue, it.acc.err() -} - -// rangeValues implements chunkIterator. -func (it *indexAccessingChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) { - oldest := sort.Search(it.len, func(i int) bool { - return !it.acc.timestampAtIndex(i).Before(in.OldestInclusive) - }) - newest := sort.Search(it.len, func(i int) bool { - return it.acc.timestampAtIndex(i).After(in.NewestInclusive) - }) - if oldest == it.len || it.acc.err() != nil { - return nil, it.acc.err() - } - - result := make([]model.SamplePair, 0, newest-oldest) - for i := oldest; i < newest; i++ { - it.pos = i - it.lastValue = model.SamplePair{ - Timestamp: it.acc.timestampAtIndex(i), - Value: it.acc.sampleValueAtIndex(i), - } - result = append(result, it.lastValue) - } - return result, it.acc.err() -} - // contains implements chunkIterator. func (it *indexAccessingChunkIterator) contains(t model.Time) (bool, error) { return !t.Before(it.acc.timestampAtIndex(0)) && @@ -433,6 +403,38 @@ func (it *indexAccessingChunkIterator) scan() bool { return it.acc.err() == nil } +// findAtOrBefore implements chunkIterator. +func (it *indexAccessingChunkIterator) findAtOrBefore(t model.Time) bool { + i := sort.Search(it.len, func(i int) bool { + return it.acc.timestampAtIndex(i).After(t) + }) + if i == 0 || it.acc.err() != nil { + return false + } + it.pos = i - 1 + it.lastValue = model.SamplePair{ + Timestamp: it.acc.timestampAtIndex(i - 1), + Value: it.acc.sampleValueAtIndex(i - 1), + } + return true +} + +// findAtOrAfter implements chunkIterator. +func (it *indexAccessingChunkIterator) findAtOrAfter(t model.Time) bool { + i := sort.Search(it.len, func(i int) bool { + return !it.acc.timestampAtIndex(i).Before(t) + }) + if i == it.len || it.acc.err() != nil { + return false + } + it.pos = i + it.lastValue = model.SamplePair{ + Timestamp: it.acc.timestampAtIndex(i), + Value: it.acc.sampleValueAtIndex(i), + } + return true +} + // value implements chunkIterator. 
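// Both find methods above reduce to sort.Search over the chunk's sorted
// timestamps. A self-contained sketch of the two predicates, using plain
// int64s instead of model.Time (illustrative only, not patch code):
package main

import (
	"fmt"
	"sort"
)

func main() {
	ts := []int64{10, 20, 30, 40}
	t := int64(25)

	// findAtOrAfter: first index whose timestamp is not before t.
	atOrAfter := sort.Search(len(ts), func(i int) bool { return ts[i] >= t })
	// findAtOrBefore: one before the first index whose timestamp is after t.
	atOrBefore := sort.Search(len(ts), func(i int) bool { return ts[i] > t }) - 1

	// atOrAfter == len(ts) would mean no such value, atOrBefore == -1
	// likewise, which is why the patch returns false for i == it.len
	// and for i == 0 respectively.
	fmt.Println(ts[atOrAfter], ts[atOrBefore]) // 30 20
}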
func (it *indexAccessingChunkIterator) value() model.SamplePair {
 return it.lastValue

diff --git a/storage/local/series.go b/storage/local/series.go
index f76d5ee27..dff9d1ecb 100644
--- a/storage/local/series.go
+++ b/storage/local/series.go
@@ -547,12 +547,13 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa
 return ZeroSamplePair
 }
 if containsT {
- value, err := it.chunkIt.valueAtOrBeforeTime(t)
- if err != nil {
- it.quarantine(err)
- return ZeroSamplePair
+ if it.chunkIt.findAtOrBefore(t) {
+ return it.chunkIt.value()
 }
- return value
+ if it.chunkIt.err() != nil {
+ it.quarantine(it.chunkIt.err())
+ }
+ return ZeroSamplePair
 }
 }
 
@@ -570,12 +571,13 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa
 return ZeroSamplePair
 }
 it.chunkIt = it.chunkIterator(l - i)
- value, err := it.chunkIt.valueAtOrBeforeTime(t)
- if err != nil {
- it.quarantine(err)
- return ZeroSamplePair
+ if it.chunkIt.findAtOrBefore(t) {
+ return it.chunkIt.value()
 }
- return value
+ if it.chunkIt.err() != nil {
+ it.quarantine(it.chunkIt.err())
+ }
+ return ZeroSamplePair
 }
 
 // RangeValues implements SeriesIterator.
@@ -602,7 +604,7 @@ func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePa
 if c.firstTime().After(in.NewestInclusive) {
 break
 }
- chValues, err := it.chunkIterator(i + j).rangeValues(in)
+ chValues, err := rangeValues(it.chunkIterator(i+j), in)
 if err != nil {
 it.quarantine(err)
 return nil

From 47e3c90f9b417938c9caa72e1932f743800d47da Mon Sep 17 00:00:00 2001
From: beorn7 
Date: Wed, 9 Mar 2016 18:56:30 +0100
Subject: [PATCH 5/9] Clean up error propagation

Only return an error where callers are doing something with it other
than simply logging and ignoring it.

All the errors touched in this commit flag the storage as dirty
anyway, and that fact is logged in any case. So most of what is being
removed here is just log spam.

As discussed earlier, the class of errors that flags the storage as
dirty signals fundamental corruption, so even bubbling up a one-time
warning to the user (e.g. about incomplete results) isn't helping
much, because _anything_ happening in the storage has to be doubted
from that point on (and in fact retroactively into the past, too).
Flagging the storage as dirty, and alerting on it (plus marking the
state in the web UI), is the only way I can see right now.

As a byproduct, I cleaned up the setDirty method a bit and improved
the logged errors.
---
 storage/local/crashrecovery.go | 8 +++-
 storage/local/persistence.go | 61 ++++++++++++-----------------
 storage/local/persistence_test.go | 51 +++++++-----------------
 storage/local/storage.go | 58 ++++++-----------------
 storage/local/storage_test.go | 56 +++++-----------------
 5 files changed, 71 insertions(+), 163 deletions(-)

diff --git a/storage/local/crashrecovery.go b/storage/local/crashrecovery.go
index f51e54e7b..17626c07c 100644
--- a/storage/local/crashrecovery.go
+++ b/storage/local/crashrecovery.go
@@ -140,7 +140,13 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint
 }
 }
 
- p.setDirty(false, nil)
+ p.dirtyMtx.Lock()
+ // Only declare storage clean if it didn't become dirty during crash recovery.
+ if !p.becameDirty { + p.dirty = false + } + p.dirtyMtx.Unlock() + log.Warn("Crash recovery complete.") return nil } diff --git a/storage/local/persistence.go b/storage/local/persistence.go index a89307bc3..68bc612f0 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -312,49 +312,44 @@ func (p *persistence) isDirty() bool { return p.dirty } -// setDirty sets the dirty flag in a goroutine-safe way. Once the dirty flag was -// set to true with this method, it cannot be set to false again. (If we became -// dirty during our runtime, there is no way back. If we were dirty from the -// start, a clean-up might make us clean again.) The provided error will be -// logged as a reason if dirty is true. -func (p *persistence) setDirty(dirty bool, err error) { - if dirty { - p.dirtyCounter.Inc() - } +// setDirty flags the storage as dirty in a goroutine-safe way. The provided +// error will be logged as a reason the first time the storage is flagged as dirty. +func (p *persistence) setDirty(err error) { + p.dirtyCounter.Inc() p.dirtyMtx.Lock() defer p.dirtyMtx.Unlock() if p.becameDirty { return } - p.dirty = dirty - if dirty { - p.becameDirty = true - log.With("error", err).Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.") - } + p.dirty = true + p.becameDirty = true + log.With("error", err).Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.") } // fingerprintsForLabelPair returns the fingerprints for the given label // pair. This method is goroutine-safe but take into account that metrics queued // for indexing with IndexMetric might not have made it into the index // yet. (Same applies correspondingly to UnindexMetric.) -func (p *persistence) fingerprintsForLabelPair(lp model.LabelPair) (model.Fingerprints, error) { +func (p *persistence) fingerprintsForLabelPair(lp model.LabelPair) model.Fingerprints { fps, _, err := p.labelPairToFingerprints.Lookup(lp) if err != nil { - return nil, err + p.setDirty(fmt.Errorf("error in method fingerprintsForLabelPair(%v): %s", lp, err)) + return nil } - return fps, nil + return fps } // labelValuesForLabelName returns the label values for the given label // name. This method is goroutine-safe but take into account that metrics queued // for indexing with IndexMetric might not have made it into the index // yet. (Same applies correspondingly to UnindexMetric.) -func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelValues, error) { +func (p *persistence) labelValuesForLabelName(ln model.LabelName) model.LabelValues { lvs, _, err := p.labelNameToLabelValues.Lookup(ln) if err != nil { - return nil, err + p.setDirty(fmt.Errorf("error in method labelValuesForLabelName(%v): %s", ln, err)) + return nil } - return lvs, nil + return lvs } // persistChunks persists a number of consecutive chunks of a series. It is the @@ -1008,29 +1003,28 @@ func (p *persistence) waitForIndexing() { // the metric. The caller must have locked the fingerprint. 
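// The dirty-flag semantics above in a self-contained sketch (hypothetical
// names, no Prometheus types): once the flag is raised at runtime it is
// sticky and only the first error is reported, while crash recovery may
// declare the storage clean again only if it did not become dirty meanwhile.
package main

import (
	"errors"
	"fmt"
	"sync"
)

type dirtyState struct {
	mtx         sync.Mutex
	dirty       bool
	becameDirty bool // set once, never reset
}

func (d *dirtyState) setDirty(err error) {
	d.mtx.Lock()
	defer d.mtx.Unlock()
	if d.becameDirty {
		return // already reported; later errors are dropped
	}
	d.dirty, d.becameDirty = true, true
	fmt.Println("storage now inconsistent:", err)
}

func (d *dirtyState) finishRecovery() {
	d.mtx.Lock()
	defer d.mtx.Unlock()
	if !d.becameDirty {
		d.dirty = false
	}
}

func main() {
	var d dirtyState
	d.setDirty(errors.New("index lookup failed"))
	d.setDirty(errors.New("ignored"))
	d.finishRecovery()
	fmt.Println(d.dirty) // true: recovery cannot undo runtime dirtiness
}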
func (p *persistence) archiveMetric( fp model.Fingerprint, m model.Metric, first, last model.Time, -) error { +) { if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil { - p.setDirty(true, err) - return err + p.setDirty(fmt.Errorf("error in method archiveMetric inserting fingerprint %v into FingerprintToMetrics: %s", fp, err)) + return } if err := p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last}); err != nil { - p.setDirty(true, err) - return err + p.setDirty(fmt.Errorf("error in method archiveMetric inserting fingerprint %v into FingerprintToTimeRange: %s", fp, err)) } - return nil } // hasArchivedMetric returns whether the archived metric for the given // fingerprint exists and if yes, what the first and last timestamp in the // corresponding series is. This method is goroutine-safe. func (p *persistence) hasArchivedMetric(fp model.Fingerprint) ( - hasMetric bool, firstTime, lastTime model.Time, err error, + hasMetric bool, firstTime, lastTime model.Time, ) { - firstTime, lastTime, hasMetric, err = p.archivedFingerprintToTimeRange.Lookup(fp) + firstTime, lastTime, hasMetric, err := p.archivedFingerprintToTimeRange.Lookup(fp) if err != nil { - p.setDirty(true, err) + p.setDirty(fmt.Errorf("error in method hasArchivedMetric(%v): %s", fp, err)) + hasMetric = false } - return + return hasMetric, firstTime, lastTime } // updateArchivedTimeRange updates an archived time range. The caller must make @@ -1069,9 +1063,10 @@ func (p *persistence) fingerprintsModifiedBefore(beforeTime model.Time) ([]model func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error) { metric, _, err := p.archivedFingerprintToMetrics.Lookup(fp) if err != nil { - p.setDirty(true, err) + p.setDirty(fmt.Errorf("error in method archivedMetric(%v): %s", fp, err)) + return nil, err } - return metric, err + return metric, nil } // purgeArchivedMetric deletes an archived fingerprint and its corresponding @@ -1081,7 +1076,7 @@ func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error) func (p *persistence) purgeArchivedMetric(fp model.Fingerprint) (err error) { defer func() { if err != nil { - p.setDirty(true, fmt.Errorf("error in method purgeArchivedMetric: %s", err)) + p.setDirty(fmt.Errorf("error in method purgeArchivedMetric(%v): %s", fp, err)) } }() diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index e2da77803..692f494d5 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -770,58 +770,46 @@ func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) { p.indexMetric(2, m2) p.waitForIndexing() - outFPs, err := p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"}) - if err != nil { - t.Fatal(err) - } + outFPs := p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"}) want := model.Fingerprints{1} if !reflect.DeepEqual(outFPs, want) { t.Errorf("want %#v, got %#v", want, outFPs) } - outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"}) - if err != nil { - t.Fatal(err) - } + outFPs = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"}) want = model.Fingerprints{2} if !reflect.DeepEqual(outFPs, want) { t.Errorf("want %#v, got %#v", want, outFPs) } - if archived, _, _, err := p.hasArchivedMetric(1); err != nil || !archived { + if archived, _, _ := p.hasArchivedMetric(1); !archived { t.Error("want FP 1 archived") } - if archived, _, _, 
err := p.hasArchivedMetric(2); err != nil || !archived { + if archived, _, _ := p.hasArchivedMetric(2); !archived { t.Error("want FP 2 archived") } - if err != p.purgeArchivedMetric(1) { + if err := p.purgeArchivedMetric(1); err != nil { t.Fatal(err) } - if err != p.purgeArchivedMetric(3) { + if err := p.purgeArchivedMetric(3); err != nil { // Purging something that has not beet archived is not an error. t.Fatal(err) } p.waitForIndexing() - outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"}) - if err != nil { - t.Fatal(err) - } + outFPs = p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"}) want = nil if !reflect.DeepEqual(outFPs, want) { t.Errorf("want %#v, got %#v", want, outFPs) } - outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"}) - if err != nil { - t.Fatal(err) - } + outFPs = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"}) want = model.Fingerprints{2} if !reflect.DeepEqual(outFPs, want) { t.Errorf("want %#v, got %#v", want, outFPs) } - if archived, _, _, err := p.hasArchivedMetric(1); err != nil || archived { + if archived, _, _ := p.hasArchivedMetric(1); archived { t.Error("want FP 1 not archived") } - if archived, _, _, err := p.hasArchivedMetric(2); err != nil || !archived { + if archived, _, _ := p.hasArchivedMetric(2); !archived { t.Error("want FP 2 archived") } } @@ -983,9 +971,7 @@ func testIndexing(t *testing.T, encoding chunkEncoding) { for i, b := range batches { for fp, m := range b.fpToMetric { p.indexMetric(fp, m) - if err := p.archiveMetric(fp, m, 1, 2); err != nil { - t.Fatal(err) - } + p.archiveMetric(fp, m, 1, 2) indexedFpsToMetrics[fp] = m } verifyIndexedState(i, t, b, indexedFpsToMetrics, p) @@ -1029,10 +1015,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet } // Check that archived metrics are in membership index. - has, first, last, err := p.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + has, first, last := p.hasArchivedMetric(fp) if !has { t.Errorf("%d. fingerprint %v not found", i, fp) } @@ -1046,10 +1029,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet // Compare label name -> label values mappings. for ln, lvs := range b.expectedLnToLvs { - outLvs, err := p.labelValuesForLabelName(ln) - if err != nil { - t.Fatal(err) - } + outLvs := p.labelValuesForLabelName(ln) outSet := codable.LabelValueSet{} for _, lv := range outLvs { @@ -1063,10 +1043,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet // Compare label pair -> fingerprints mappings. 
for lp, fps := range b.expectedLpToFps { - outFPs, err := p.fingerprintsForLabelPair(lp) - if err != nil { - t.Fatal(err) - } + outFPs := p.fingerprintsForLabelPair(lp) outSet := codable.FingerprintSet{} for _, fp := range outFPs { diff --git a/storage/local/storage.go b/storage/local/storage.go index bc380bd6b..a24387831 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -425,10 +425,7 @@ func (s *memorySeriesStorage) fingerprintsForLabelPairs(pairs ...model.LabelPair var result map[model.Fingerprint]struct{} for _, pair := range pairs { intersection := map[model.Fingerprint]struct{}{} - fps, err := s.persistence.fingerprintsForLabelPair(pair) - if err != nil { - log.Error("Error getting fingerprints for label pair: ", err) - } + fps := s.persistence.fingerprintsForLabelPair(pair) if len(fps) == 0 { return nil } @@ -547,19 +544,14 @@ func (s *memorySeriesStorage) metricForFingerprint( } if from.After(model.Earliest) || through.Before(model.Latest) { // The range lookup is relatively cheap, so let's do it first. - ok, first, last, err := s.persistence.hasArchivedMetric(fp) - if err != nil { - log.Errorf("Error retrieving archived time range for fingerprint %v: %v", fp, err) - return metric.Metric{}, false - } + ok, first, last := s.persistence.hasArchivedMetric(fp) if !ok || first.After(through) || last.Before(from) { return metric.Metric{}, false } } - met, err := s.persistence.archivedMetric(fp) - if err != nil { - log.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err) + met, _ := s.persistence.archivedMetric(fp) // Ignoring error, there is nothing we can do. + if met == nil { return metric.Metric{}, false } @@ -571,11 +563,7 @@ func (s *memorySeriesStorage) metricForFingerprint( // LabelValuesForLabelName implements Storage. func (s *memorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) model.LabelValues { - lvs, err := s.persistence.labelValuesForLabelName(labelName) - if err != nil { - log.Errorf("Error getting label values for label name %q: %v", labelName, err) - } - return lvs + return s.persistence.labelValuesForLabelName(labelName) } // DropMetric implements Storage. @@ -603,7 +591,7 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error { s.fpLocker.Unlock(fp) }() // Func wrapper because fp might change below. if err != nil { - s.persistence.setDirty(true, fmt.Errorf("error while mapping fingerprint %v: %s", rawFP, err)) + s.persistence.setDirty(fmt.Errorf("error while mapping fingerprint %v: %s", rawFP, err)) return err } if fp != rawFP { @@ -745,11 +733,7 @@ func (s *memorySeriesStorage) getSeriesForRange( if ok { return series } - has, first, last, err := s.persistence.hasArchivedMetric(fp) - if err != nil { - log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.") - return nil - } + has, first, last := s.persistence.hasArchivedMetric(fp) if !has { s.invalidPreloadRequestsCount.Inc() return nil @@ -759,7 +743,7 @@ func (s *memorySeriesStorage) getSeriesForRange( } metric, err := s.persistence.archivedMetric(fp) if err != nil { - log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.") + // Error already logged, storage declared dirty by archivedMetric. 
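// A sketch of the posting-list intersection performed by
// fingerprintsForLabelPairs above, simplified to plain uint64 fingerprints;
// intersect is an illustrative stand-in, not patch code. Each label pair
// yields a fingerprint list, and the result keeps only fingerprints that
// occur in all of them.
package main

import "fmt"

func intersect(lists ...[]uint64) map[uint64]struct{} {
	var result map[uint64]struct{}
	for _, fps := range lists {
		next := map[uint64]struct{}{}
		for _, fp := range fps {
			if result == nil {
				next[fp] = struct{}{} // first list seeds the result
				continue
			}
			if _, ok := result[fp]; ok {
				next[fp] = struct{}{}
			}
		}
		if len(next) == 0 {
			return nil // an empty posting list means no match is possible
		}
		result = next
	}
	return result
}

func main() {
	fmt.Println(intersect([]uint64{1, 2, 3}, []uint64{2, 3, 4})) // map[2:{} 3:{}]
}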
return nil } series, err = s.getOrCreateSeries(fp, metric) @@ -1152,12 +1136,7 @@ func (s *memorySeriesStorage) maintainMemorySeries( if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > headChunkTimeout { s.fpToSeries.del(fp) s.numSeries.Dec() - if err := s.persistence.archiveMetric( - fp, series.metric, series.firstTime(), series.lastTime, - ); err != nil { - log.Errorf("Error archiving metric %v: %v", series.metric, err) - return - } + s.persistence.archiveMetric(fp, series.metric, series.firstTime(), series.lastTime) s.seriesOps.WithLabelValues(archive).Inc() oldWatermark := atomic.LoadInt64((*int64)(&s.archiveHighWatermark)) if oldWatermark < int64(series.lastTime) { @@ -1278,11 +1257,7 @@ func (s *memorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, befor s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) - has, firstTime, lastTime, err := s.persistence.hasArchivedMetric(fp) - if err != nil { - log.Error("Error looking up archived time range: ", err) - return - } + has, firstTime, lastTime := s.persistence.hasArchivedMetric(fp) if !has || !firstTime.Before(beforeTime) { // Oldest sample not old enough, or metric purged or unarchived in the meantime. return @@ -1295,10 +1270,7 @@ func (s *memorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, befor log.Error("Error dropping persisted chunks: ", err) } if allDropped { - if err := s.persistence.purgeArchivedMetric(fp); err != nil { - log.Errorf("Error purging archived metric for fingerprint %v: %v", fp, err) - return - } + s.persistence.purgeArchivedMetric(fp) // Ignoring error. Nothing we can do. s.seriesOps.WithLabelValues(archivePurge).Inc() return } @@ -1487,13 +1459,7 @@ func (s *memorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric, s.incNumChunksToPersist(-numChunksNotYetPersisted) } else { - if err := s.persistence.purgeArchivedMetric(fp); err != nil { - log. - With("fingerprint", fp). - With("metric", m). - With("error", err). - Error("Error purging metric from archive.") - } + s.persistence.purgeArchivedMetric(fp) // Ignoring error. There is nothing we can do. } if m != nil { // If we know a metric now, unindex it in any case. diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 01f30c28b..291b32918 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -471,11 +471,7 @@ func TestDropMetrics(t *testing.T) { s.maintainMemorySeries(fpToBeArchived, 0) s.fpLocker.Lock(fpToBeArchived) s.fpToSeries.del(fpToBeArchived) - if err := s.persistence.archiveMetric( - fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond), - ); err != nil { - t.Error(err) - } + s.persistence.archiveMetric(fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond)) s.fpLocker.Unlock(fpToBeArchived) fps := s.fingerprintsForLabelPairs(model.LabelPair{Name: model.MetricNameLabel, Value: "test"}) @@ -582,11 +578,7 @@ func TestQuarantineMetric(t *testing.T) { s.maintainMemorySeries(fpToBeArchived, 0) s.fpLocker.Lock(fpToBeArchived) s.fpToSeries.del(fpToBeArchived) - if err := s.persistence.archiveMetric( - fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond), - ); err != nil { - t.Error(err) - } + s.persistence.archiveMetric(fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond)) s.fpLocker.Unlock(fpToBeArchived) // Corrupt the series file for m3. 
@@ -1144,36 +1136,22 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { if err != nil { t.Fatal(err) } - if err := s.persistence.archiveMetric( - fp, series.metric, series.firstTime(), lastTime, - ); err != nil { - t.Fatal(err) - } - - archived, _, _, err := s.persistence.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + s.persistence.archiveMetric(fp, series.metric, series.firstTime(), lastTime) + archived, _, _ := s.persistence.hasArchivedMetric(fp) if !archived { t.Fatal("not archived") } // Drop ~half of the chunks of an archived series. s.maintainArchivedSeries(fp, 10000) - archived, _, _, err = s.persistence.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + archived, _, _ = s.persistence.hasArchivedMetric(fp) if !archived { t.Fatal("archived series purged although only half of the chunks dropped") } // Drop everything. s.maintainArchivedSeries(fp, 100000) - archived, _, _, err = s.persistence.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + archived, _, _ = s.persistence.hasArchivedMetric(fp) if archived { t.Fatal("archived series not dropped") } @@ -1199,16 +1177,8 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { if err != nil { t.Fatal(err) } - if err := s.persistence.archiveMetric( - fp, series.metric, series.firstTime(), lastTime, - ); err != nil { - t.Fatal(err) - } - - archived, _, _, err = s.persistence.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + s.persistence.archiveMetric(fp, series.metric, series.firstTime(), lastTime) + archived, _, _ = s.persistence.hasArchivedMetric(fp) if !archived { t.Fatal("not archived") } @@ -1220,10 +1190,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { if !ok { t.Fatal("could not find series") } - archived, _, _, err = s.persistence.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + archived, _, _ = s.persistence.hasArchivedMetric(fp) if archived { t.Fatal("archived") } @@ -1231,10 +1198,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // This will archive again, but must not drop it completely, despite the // memorySeries being empty. s.maintainMemorySeries(fp, 10000) - archived, _, _, err = s.persistence.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + archived, _, _ = s.persistence.hasArchivedMetric(fp) if !archived { t.Fatal("series purged completely") } From 9445c7053d2203ea8ff1d37dfb33d80331c4d709 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Wed, 9 Mar 2016 20:27:50 +0100 Subject: [PATCH 6/9] Add tests for range-limited label matching While doing so, improve getSeriesForRange. --- storage/local/storage.go | 24 ++++++++---- storage/local/storage_test.go | 72 +++++++++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+), 7 deletions(-) diff --git a/storage/local/storage.go b/storage/local/storage.go index a24387831..734b97ad3 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -520,15 +520,12 @@ func (s *memorySeriesStorage) metricForFingerprint( fp model.Fingerprint, from, through model.Time, ) (metric.Metric, bool) { - // Lock FP so that no (un-)archiving will happen during lookup. 
s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) - watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark))) - series, ok := s.fpToSeries.get(fp) if ok { - if series.lastTime.Before(from) || series.savedFirstTime.After(through) { + if series.lastTime.Before(from) || series.firstTime().After(through) { return metric.Metric{}, false } // Wrap the returned metric in a copy-on-write (COW) metric here because @@ -539,13 +536,15 @@ func (s *memorySeriesStorage) metricForFingerprint( } // From here on, we are only concerned with archived metrics. // If the high watermark of archived series is before 'from', we are done. + watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark))) if watermark < from { return metric.Metric{}, false } if from.After(model.Earliest) || through.Before(model.Latest) { - // The range lookup is relatively cheap, so let's do it first. - ok, first, last := s.persistence.hasArchivedMetric(fp) - if !ok || first.After(through) || last.Before(from) { + // The range lookup is relatively cheap, so let's do it first if + // we have a chance the archived metric is not in the range. + has, first, last := s.persistence.hasArchivedMetric(fp) + if !has || first.After(through) || last.Before(from) { return metric.Metric{}, false } } @@ -725,14 +724,25 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me } // getSeriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant. +// +// The caller must have locked the fp. func (s *memorySeriesStorage) getSeriesForRange( fp model.Fingerprint, from model.Time, through model.Time, ) *memorySeries { series, ok := s.fpToSeries.get(fp) if ok { + if series.lastTime.Before(from) || series.firstTime().After(through) { + return nil + } return series } + + watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark))) + if watermark < from { + return nil + } + has, first, last := s.persistence.hasArchivedMetric(fp) if !has { s.invalidPreloadRequestsCount.Inc() diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 291b32918..f305e792f 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -34,6 +34,7 @@ func TestMatches(t *testing.T) { storage, closer := NewTestStorage(t, 1) defer closer.Close() + storage.archiveHighWatermark = 90 samples := make([]*model.Sample, 100) fingerprints := make(model.Fingerprints, 100) @@ -56,6 +57,20 @@ func TestMatches(t *testing.T) { } storage.WaitForIndexing() + // Archive every tenth metric. + for i, fp := range fingerprints { + if i%10 != 0 { + continue + } + s, ok := storage.fpToSeries.get(fp) + if !ok { + t.Fatal("could not retrieve series for fp", fp) + } + storage.fpLocker.Lock(fp) + storage.persistence.archiveMetric(fp, s.metric, s.firstTime(), s.lastTime) + storage.fpLocker.Unlock(fp) + } + newMatcher := func(matchType metric.MatchType, name model.LabelName, value model.LabelValue) *metric.LabelMatcher { lm, err := metric.NewLabelMatcher(matchType, name, value) if err != nil { @@ -197,6 +212,56 @@ func TestMatches(t *testing.T) { t.Errorf("expected fingerprint %s for %q not in result", fp1, mt.matchers) } } + // Smoketest for from/through. 
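// archiveHighWatermark, read above via atomic.LoadInt64, is only ever moved
// forward. A self-contained sketch of such a monotonic watermark; the patch
// itself uses a single CompareAndSwapInt64 and panics on failure, because
// maintainMemorySeries is the only writer by design, whereas the loop below
// is the general multi-writer variant (raiseWatermark is a hypothetical name):
package main

import (
	"fmt"
	"sync/atomic"
)

func raiseWatermark(w *int64, t int64) {
	for {
		old := atomic.LoadInt64(w)
		if old >= t {
			return // never move the watermark backwards
		}
		if atomic.CompareAndSwapInt64(w, old, t) {
			return
		}
	}
}

func main() {
	var watermark int64
	raiseWatermark(&watermark, 42)
	raiseWatermark(&watermark, 7)             // no-op: older than the watermark
	fmt.Println(atomic.LoadInt64(&watermark)) // 42
}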
+ if len(storage.MetricsForLabelMatchers( + model.Earliest, -10000, + mt.matchers..., + )) > 0 { + t.Error("expected no matches with 'through' older than any sample") + } + if len(storage.MetricsForLabelMatchers( + 10000, model.Latest, + mt.matchers..., + )) > 0 { + t.Error("expected no matches with 'from' newer than any sample") + } + // Now the tricky one, cut out something from the middle. + var ( + from model.Time = 25 + through model.Time = 75 + ) + res = storage.MetricsForLabelMatchers( + from, through, + mt.matchers..., + ) + expected := model.Fingerprints{} + for _, fp := range mt.expected { + i := 0 + for ; fingerprints[i] != fp && i < len(fingerprints); i++ { + } + if i == len(fingerprints) { + t.Fatal("expected fingerprint does not exist") + } + if !model.Time(i).Before(from) && !model.Time(i).After(through) { + expected = append(expected, fp) + } + } + if len(expected) != len(res) { + t.Errorf("expected %d range-limited matches for %q, found %d", len(expected), mt.matchers, len(res)) + } + for fp1 := range res { + found := false + for _, fp2 := range expected { + if fp1 == fp2 { + found = true + break + } + } + if !found { + t.Errorf("expected fingerprint %s for %q not in range-limited result", fp1, mt.matchers) + } + } + } } @@ -1195,6 +1260,9 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { t.Fatal("archived") } + // Set archiveHighWatermark to a low value so that we can see it increase. + s.archiveHighWatermark = 42 + // This will archive again, but must not drop it completely, despite the // memorySeries being empty. s.maintainMemorySeries(fp, 10000) @@ -1202,6 +1270,10 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { if !archived { t.Fatal("series purged completely") } + // archiveHighWatermark must have been set by maintainMemorySeries. + if want, got := model.Time(19998), s.archiveHighWatermark; want != got { + t.Errorf("want archiveHighWatermark %v, got %v", want, got) + } } func TestEvictAndPurgeSeriesChunkType0(t *testing.T) { From e8c1f30ab2020a558cae2f6ec030f1d4fab24421 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Wed, 9 Mar 2016 21:56:15 +0100 Subject: [PATCH 7/9] Merge the parallel logic of getSeriesForRange and metricForFingerprint --- storage/local/storage.go | 89 +++++++++++++---------------------- storage/local/test_helpers.go | 2 + 2 files changed, 34 insertions(+), 57 deletions(-) diff --git a/storage/local/storage.go b/storage/local/storage.go index 734b97ad3..740dede3c 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -500,9 +500,11 @@ func (s *memorySeriesStorage) MetricsForLabelMatchers( result := map[model.Fingerprint]metric.Metric{} for fp := range resFPs { - if metric, ok := s.metricForFingerprint(fp, from, through); ok { - result[fp] = metric + s.fpLocker.Lock(fp) + if met, _, ok := s.metricForRange(fp, from, through); ok { + result[fp] = metric.Metric{Metric: met} } + s.fpLocker.Unlock(fp) } for _, matcher := range filters { for fp, met := range result { @@ -514,50 +516,46 @@ func (s *memorySeriesStorage) MetricsForLabelMatchers( return result } -// metricForFingerprint returns the metric for the given fingerprint if the -// corresponding time series has samples between 'from' and 'through'. -func (s *memorySeriesStorage) metricForFingerprint( +// metricForRange returns the metric for the given fingerprint if the +// corresponding time series has samples between 'from' and 'through', together +// with a pointer to the series if it is in memory already. 
For a series that +// does not have samples between 'from' and 'through', the returned bool is +// false. For an archived series that does contain samples between 'from' and +// 'through', it returns (metric, nil, true). +// +// The caller must have locked the fp. +func (s *memorySeriesStorage) metricForRange( fp model.Fingerprint, from, through model.Time, -) (metric.Metric, bool) { - s.fpLocker.Lock(fp) - defer s.fpLocker.Unlock(fp) - +) (model.Metric, *memorySeries, bool) { series, ok := s.fpToSeries.get(fp) if ok { if series.lastTime.Before(from) || series.firstTime().After(through) { - return metric.Metric{}, false + return nil, nil, false } - // Wrap the returned metric in a copy-on-write (COW) metric here because - // the caller might mutate it. - return metric.Metric{ - Metric: series.metric, - }, true + return series.metric, series, true } // From here on, we are only concerned with archived metrics. // If the high watermark of archived series is before 'from', we are done. watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark))) if watermark < from { - return metric.Metric{}, false + return nil, nil, false } if from.After(model.Earliest) || through.Before(model.Latest) { // The range lookup is relatively cheap, so let's do it first if // we have a chance the archived metric is not in the range. has, first, last := s.persistence.hasArchivedMetric(fp) if !has || first.After(through) || last.Before(from) { - return metric.Metric{}, false + return nil, nil, false } } - met, _ := s.persistence.archivedMetric(fp) // Ignoring error, there is nothing we can do. - if met == nil { - return metric.Metric{}, false + metric, err := s.persistence.archivedMetric(fp) + if err != nil { + // archivedMetric has already flagged the storage as dirty in this case. + return nil, nil, false } - - return metric.Metric{ - Metric: met, - Copied: false, - }, true + return metric, nil, true } // LabelValuesForLabelName implements Storage. @@ -723,43 +721,20 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me return series, nil } -// getSeriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant. +// seriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant. // // The caller must have locked the fp. -func (s *memorySeriesStorage) getSeriesForRange( +func (s *memorySeriesStorage) seriesForRange( fp model.Fingerprint, from model.Time, through model.Time, ) *memorySeries { - series, ok := s.fpToSeries.get(fp) - if ok { - if series.lastTime.Before(from) || series.firstTime().After(through) { - return nil - } - return series - } - - watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark))) - if watermark < from { + metric, series, ok := s.metricForRange(fp, from, through) + if !ok { return nil } - - has, first, last := s.persistence.hasArchivedMetric(fp) - if !has { - s.invalidPreloadRequestsCount.Inc() - return nil - } - if last.Before(from) || first.After(through) { - return nil - } - metric, err := s.persistence.archivedMetric(fp) - if err != nil { - // Error already logged, storage declared dirty by archivedMetric. - return nil - } - series, err = s.getOrCreateSeries(fp, metric) - if err != nil { - // getOrCreateSeries took care of quarantining already. - return nil + if series == nil { + series, _ = s.getOrCreateSeries(fp, metric) + // getOrCreateSeries took care of quarantining already, so ignore the error. 
}
 }
 return series
 }
 
@@ -771,7 +746,7 @@ func (s *memorySeriesStorage) preloadChunksForRange(
 s.fpLocker.Lock(fp)
 defer s.fpLocker.Unlock(fp)
 
- series := s.getSeriesForRange(fp, from, through)
+ series := s.seriesForRange(fp, from, through)
 if series == nil {
 return nil, nopIter
 }
@@ -790,7 +765,7 @@ func (s *memorySeriesStorage) preloadChunksForInstant(
 s.fpLocker.Lock(fp)
 defer s.fpLocker.Unlock(fp)
 
- series := s.getSeriesForRange(fp, from, through)
+ series := s.seriesForRange(fp, from, through)
 if series == nil {
 return nil, nopIter
 }
diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go
index 4e914b724..1dedf518e 100644
--- a/storage/local/test_helpers.go
+++ b/storage/local/test_helpers.go
@@ -21,6 +21,7 @@ package local
 import (
 "time"
 
+ "github.com/prometheus/common/model"
 "github.com/prometheus/prometheus/util/testutil"
 )
 
@@ -51,6 +52,7 @@ func NewTestStorage(t testutil.T, encoding chunkEncoding) (*memorySeriesStorage,
 SyncStrategy: Adaptive,
 }
 storage := NewMemorySeriesStorage(o)
+ storage.(*memorySeriesStorage).archiveHighWatermark = model.Latest
 if err := storage.Start(); err != nil {
 directory.Close()
 t.Fatalf("Error creating storage: %s", err)

From 199f309a39c6079b3502766225bfaa54cfd434bc Mon Sep 17 00:00:00 2001
From: beorn7 
Date: Sun, 13 Mar 2016 11:54:24 +0100
Subject: [PATCH 8/9] Resurrect and rename invalid preload requests count
 metric.

It is now also used in label matching, so the name of the metric
changed from `prometheus_local_storage_invalid_preload_requests_total`
to `non_existent_series_matches_total`.
---
 storage/local/storage.go | 34 +++++++++++++++++++---------------
 1 file changed, 19 insertions(+), 15 deletions(-)

diff --git a/storage/local/storage.go b/storage/local/storage.go
index 740dede3c..79ece03d3 100644
--- a/storage/local/storage.go
+++ b/storage/local/storage.go
@@ -159,15 +159,15 @@ type memorySeriesStorage struct {
 quarantineRequests chan quarantineRequest
 quarantineStopping, quarantineStopped chan struct{}
 
- persistErrors prometheus.Counter
- numSeries prometheus.Gauge
- seriesOps *prometheus.CounterVec
- ingestedSamplesCount prometheus.Counter
- outOfOrderSamplesCount prometheus.Counter
- invalidPreloadRequestsCount prometheus.Counter
- maintainSeriesDuration *prometheus.SummaryVec
- persistenceUrgencyScore prometheus.Gauge
- rushedMode prometheus.Gauge
+ persistErrors prometheus.Counter
+ numSeries prometheus.Gauge
+ seriesOps *prometheus.CounterVec
+ ingestedSamplesCount prometheus.Counter
+ outOfOrderSamplesCount prometheus.Counter
+ nonExistentSeriesMatchesCount prometheus.Counter
+ maintainSeriesDuration *prometheus.SummaryVec
+ persistenceUrgencyScore prometheus.Gauge
+ rushedMode prometheus.Gauge
 }
 
 // MemorySeriesStorageOptions contains options needed by
@@ -248,11 +248,11 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage {
 Name: "out_of_order_samples_total",
 Help: "The total number of samples that were discarded because their timestamps were at or before the last received sample for a series.",
 }),
- invalidPreloadRequestsCount: prometheus.NewCounter(prometheus.CounterOpts{
+ nonExistentSeriesMatchesCount: prometheus.NewCounter(prometheus.CounterOpts{
 Namespace: namespace,
 Subsystem: subsystem,
- Name: "invalid_preload_requests_total",
- Help: "The total number of preload requests referring to a non-existent series.
This is an indication of outdated label indexes.",
+ Name: "non_existent_series_matches_total",
+ Help: "How often a non-existent series was referred to during label matching or chunk preloading. This is an indication of outdated label indexes.",
 }),
 maintainSeriesDuration: prometheus.NewSummaryVec(
 prometheus.SummaryOpts{
@@ -545,7 +545,11 @@ func (s *memorySeriesStorage) metricForRange(
 // The range lookup is relatively cheap, so let's do it first if
 // we have a chance the archived metric is not in the range.
 has, first, last := s.persistence.hasArchivedMetric(fp)
- if !has || first.After(through) || last.Before(from) {
+ if !has {
+ s.nonExistentSeriesMatchesCount.Inc()
+ return nil, nil, false
+ }
+ if first.After(through) || last.Before(from) {
 return nil, nil, false
 }
 }
@@ -1492,7 +1496,7 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
 s.seriesOps.Describe(ch)
 ch <- s.ingestedSamplesCount.Desc()
 ch <- s.outOfOrderSamplesCount.Desc()
- ch <- s.invalidPreloadRequestsCount.Desc()
+ ch <- s.nonExistentSeriesMatchesCount.Desc()
 ch <- numMemChunksDesc
 s.maintainSeriesDuration.Describe(ch)
 ch <- s.persistenceUrgencyScore.Desc()
@@ -1519,7 +1523,7 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
 s.seriesOps.Collect(ch)
 ch <- s.ingestedSamplesCount
 ch <- s.outOfOrderSamplesCount
- ch <- s.invalidPreloadRequestsCount
+ ch <- s.nonExistentSeriesMatchesCount
 ch <- prometheus.MustNewConstMetric(
 numMemChunksDesc,
 prometheus.GaugeValue,

From e7ac9c6863baaed1455b2113eaabe5b200316fb6 Mon Sep 17 00:00:00 2001
From: beorn7 
Date: Thu, 17 Mar 2016 14:37:24 +0100
Subject: [PATCH 9/9] Improvements based on review

- Moved the returns into the default sections of the switch statements,
 since that is the only case in which they can be reached.

- Fixed a typo.
---
 storage/local/chunk.go | 2 +-
 storage/local/delta.go | 4 ++--
 storage/local/doubledelta.go | 5 +++--
 3 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/storage/local/chunk.go b/storage/local/chunk.go
index 2e9fcf4cf..55e39ecbd 100644
--- a/storage/local/chunk.go
+++ b/storage/local/chunk.go
@@ -285,7 +285,7 @@ type chunkIterator interface {
 // of the find... methods). It returns ZeroSamplePair before any of
 // those methods were called.
 value() model.SamplePair
- // Returns the last error encountered. In general, an error signal data
+ // Returns the last error encountered. In general, an error signals data
 // corruption in the chunk and requires quarantining.
err() error } diff --git a/storage/local/delta.go b/storage/local/delta.go index a74e0806a..58e028159 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -328,8 +328,8 @@ func (acc *deltaEncodedIndexAccessor) timestampAtIndex(idx int) model.Time { return model.Time(binary.LittleEndian.Uint64(acc.c[offset:])) default: acc.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", acc.tBytes) + return model.Earliest } - return model.Earliest } func (acc *deltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleValue { @@ -358,7 +358,7 @@ func (acc *deltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleVa return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(acc.c[offset:]))) default: acc.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", acc.vBytes) + return 0 } } - return 0 } diff --git a/storage/local/doubledelta.go b/storage/local/doubledelta.go index a53d41f6b..60e5667ad 100644 --- a/storage/local/doubledelta.go +++ b/storage/local/doubledelta.go @@ -452,8 +452,8 @@ func (acc *doubleDeltaEncodedIndexAccessor) timestampAtIndex(idx int) model.Time return model.Time(binary.LittleEndian.Uint64(acc.c[offset:])) default: acc.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", acc.tBytes) + return model.Earliest } - return model.Earliest } func (acc *doubleDeltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleValue { @@ -491,6 +491,7 @@ func (acc *doubleDeltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.Sa // No d8 for ints. default: acc.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", acc.vBytes) + return 0 } } else { switch acc.vBytes { @@ -503,7 +504,7 @@ func (acc *doubleDeltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.Sa return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(acc.c[offset:]))) default: acc.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", acc.vBytes) + return 0 } } - return 0 }
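// PATCH 9's restructuring in isolation: with a return in every case, the
// error path lives in the default section and no unreachable trailing return
// is needed. A self-contained sketch of the little-endian delta decoding
// shape used by delta.go and doubledelta.go above; decodeTime is a
// hypothetical stand-in, not patch code.
package main

import (
	"encoding/binary"
	"fmt"
)

func decodeTime(b []byte, width int) (int64, error) {
	switch width {
	case 1:
		return int64(b[0]), nil
	case 2:
		return int64(binary.LittleEndian.Uint16(b)), nil
	case 4:
		return int64(binary.LittleEndian.Uint32(b)), nil
	case 8:
		return int64(binary.LittleEndian.Uint64(b)), nil
	default:
		// Mirrors setting acc.lastErr plus the sentinel return, now in one place.
		return 0, fmt.Errorf("invalid number of bytes for time delta: %d", width)
	}
}

func main() {
	fmt.Println(decodeTime([]byte{0x2a, 0, 0, 0}, 4)) // 42 <nil>
}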