diff --git a/promql/analyzer.go b/promql/analyzer.go index bad5fbd92..7243d31db 100644 --- a/promql/analyzer.go +++ b/promql/analyzer.go @@ -79,7 +79,10 @@ func (a *Analyzer) Analyze(ctx context.Context) error { Inspect(a.Expr, func(node Node) bool { switch n := node.(type) { case *VectorSelector: - n.metrics = a.Storage.MetricsForLabelMatchers(n.LabelMatchers...) + n.metrics = a.Storage.MetricsForLabelMatchers( + a.Start.Add(-n.Offset-StalenessDelta), a.End.Add(-n.Offset), + n.LabelMatchers..., + ) n.iterators = make(map[model.Fingerprint]local.SeriesIterator, len(n.metrics)) pt := getPreloadTimes(n.Offset) @@ -95,7 +98,10 @@ func (a *Analyzer) Analyze(ctx context.Context) error { } } case *MatrixSelector: - n.metrics = a.Storage.MetricsForLabelMatchers(n.LabelMatchers...) + n.metrics = a.Storage.MetricsForLabelMatchers( + a.Start.Add(-n.Offset-n.Range), a.End.Add(-n.Offset), + n.LabelMatchers..., + ) n.iterators = make(map[model.Fingerprint]local.SeriesIterator, len(n.metrics)) pt := getPreloadTimes(n.Offset) diff --git a/storage/local/chunk.go b/storage/local/chunk.go index d8b1ae2b8..55e39ecbd 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -17,6 +17,7 @@ import ( "container/list" "fmt" "io" + "sort" "sync" "sync/atomic" @@ -261,54 +262,72 @@ type chunk interface { // generally not safe to use a chunkIterator concurrently with or after chunk // mutation. type chunkIterator interface { - // length returns the number of samples in the chunk. - length() int - // Gets the timestamp of the n-th sample in the chunk. - timestampAtIndex(int) (model.Time, error) // Gets the last timestamp in the chunk. lastTimestamp() (model.Time, error) - // Gets the sample value of the n-th sample in the chunk. - sampleValueAtIndex(int) (model.SampleValue, error) - // Gets the last sample value in the chunk. - lastSampleValue() (model.SampleValue, error) - // Gets the value that is closest before the given time. In case a value - // exists at precisely the given time, that value is returned. If no - // applicable value exists, ZeroSamplePair is returned. - valueAtOrBeforeTime(model.Time) (model.SamplePair, error) - // Gets all values contained within a given interval. - rangeValues(metric.Interval) ([]model.SamplePair, error) // Whether a given timestamp is contained between first and last value // in the chunk. contains(model.Time) (bool, error) - // values returns a channel, from which all sample values in the chunk - // can be received in order. The channel is closed after the last - // one. It is generally not safe to mutate the chunk while the channel - // is still open. If a value is returned with error!=nil, no further - // values will be returned and the channel is closed. - values() <-chan struct { - model.SamplePair - error + // Scans the next value in the chunk. Directly after the iterator has + // been created, the next value is the first value in the + // chunk. Otherwise, it is the value following the last value scanned or + // found (by one of the find... methods). Returns false if either the + // end of the chunk is reached or an error has occurred. + scan() bool + // Finds the most recent value at or before the provided time. Returns + // false if either the chunk contains no value at or before the provided + // time, or an error has occurred. + findAtOrBefore(model.Time) bool + // Finds the oldest value at or after the provided time. Returns false + // if either the chunk contains no value at or after the provided time, + // or an error has occurred. 
+ findAtOrAfter(model.Time) bool + // Returns the last value scanned (by the scan method) or found (by one + // of the find... methods). It returns ZeroSamplePair before any of + // those methods were called. + value() model.SamplePair + // Returns the last error encountered. In general, an error signals data + // corruption in the chunk and requires quarantining. + err() error +} + +// rangeValues is a utility function that retrieves all values within the given +// range from a chunkIterator. +func rangeValues(it chunkIterator, in metric.Interval) ([]model.SamplePair, error) { + result := []model.SamplePair{} + if !it.findAtOrAfter(in.OldestInclusive) { + return result, it.err() } + for !it.value().Timestamp.After(in.NewestInclusive) { + result = append(result, it.value()) + if !it.scan() { + break + } + } + return result, it.err() } func transcodeAndAdd(dst chunk, src chunk, s model.SamplePair) ([]chunk, error) { chunkOps.WithLabelValues(transcode).Inc() - head := dst - body := []chunk{} - for v := range src.newIterator().values() { - if v.error != nil { - return nil, v.error - } - newChunks, err := head.add(v.SamplePair) - if err != nil { + var ( + head = dst + body, newChunks []chunk + err error + ) + + it := src.newIterator() + for it.scan() { + if newChunks, err = head.add(it.value()); err != nil { return nil, err } body = append(body, newChunks[:len(newChunks)-1]...) head = newChunks[len(newChunks)-1] } - newChunks, err := head.add(s) - if err != nil { + if it.err() != nil { + return nil, it.err() + } + + if newChunks, err = head.add(s); err != nil { return nil, err } return append(body, newChunks...), nil @@ -334,3 +353,94 @@ func newChunkForEncoding(encoding chunkEncoding) (chunk, error) { return nil, fmt.Errorf("unknown chunk encoding: %v", encoding) } } + +// indexAccessor allows accesses to samples by index. +type indexAccessor interface { + timestampAtIndex(int) model.Time + sampleValueAtIndex(int) model.SampleValue + err() error +} + +// indexAccessingChunkIterator is a chunk iterator for chunks for which an +// indexAccessor implementation exists. +type indexAccessingChunkIterator struct { + len int + pos int + lastValue model.SamplePair + acc indexAccessor +} + +func newIndexAccessingChunkIterator(len int, acc indexAccessor) *indexAccessingChunkIterator { + return &indexAccessingChunkIterator{ + len: len, + pos: -1, + lastValue: ZeroSamplePair, + acc: acc, + } +} + +// lastTimestamp implements chunkIterator. +func (it *indexAccessingChunkIterator) lastTimestamp() (model.Time, error) { + return it.acc.timestampAtIndex(it.len - 1), it.acc.err() +} + +// contains implements chunkIterator. +func (it *indexAccessingChunkIterator) contains(t model.Time) (bool, error) { + return !t.Before(it.acc.timestampAtIndex(0)) && + !t.After(it.acc.timestampAtIndex(it.len-1)), it.acc.err() +} + +// scan implements chunkIterator. +func (it *indexAccessingChunkIterator) scan() bool { + it.pos++ + if it.pos >= it.len { + return false + } + it.lastValue = model.SamplePair{ + Timestamp: it.acc.timestampAtIndex(it.pos), + Value: it.acc.sampleValueAtIndex(it.pos), + } + return it.acc.err() == nil +} + +// findAtOrBefore implements chunkIterator. 
+func (it *indexAccessingChunkIterator) findAtOrBefore(t model.Time) bool { + i := sort.Search(it.len, func(i int) bool { + return it.acc.timestampAtIndex(i).After(t) + }) + if i == 0 || it.acc.err() != nil { + return false + } + it.pos = i - 1 + it.lastValue = model.SamplePair{ + Timestamp: it.acc.timestampAtIndex(i - 1), + Value: it.acc.sampleValueAtIndex(i - 1), + } + return true +} + +// findAtOrAfter implements chunkIterator. +func (it *indexAccessingChunkIterator) findAtOrAfter(t model.Time) bool { + i := sort.Search(it.len, func(i int) bool { + return !it.acc.timestampAtIndex(i).Before(t) + }) + if i == it.len || it.acc.err() != nil { + return false + } + it.pos = i + it.lastValue = model.SamplePair{ + Timestamp: it.acc.timestampAtIndex(i), + Value: it.acc.sampleValueAtIndex(i), + } + return true +} + +// value implements chunkIterator. +func (it *indexAccessingChunkIterator) value() model.SamplePair { + return it.lastValue +} + +// err implements chunkIterator. +func (it *indexAccessingChunkIterator) err() error { + return it.acc.err() +} diff --git a/storage/local/crashrecovery.go b/storage/local/crashrecovery.go index f51e54e7b..17626c07c 100644 --- a/storage/local/crashrecovery.go +++ b/storage/local/crashrecovery.go @@ -140,7 +140,13 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint } } - p.setDirty(false, nil) + p.dirtyMtx.Lock() + // Only declare storage clean if it didn't become dirty during crash recovery. + if !p.becameDirty { + p.dirty = false + } + p.dirtyMtx.Unlock() + log.Warn("Crash recovery complete.") return nil } diff --git a/storage/local/delta.go b/storage/local/delta.go index c78702072..1ca066184 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -18,11 +18,8 @@ import ( "fmt" "io" "math" - "sort" "github.com/prometheus/common/model" - - "github.com/prometheus/prometheus/storage/metric" ) // The 21-byte header of a delta-encoded chunk looks like: @@ -201,15 +198,14 @@ func (c deltaEncodedChunk) firstTime() model.Time { // newIterator implements chunk. func (c *deltaEncodedChunk) newIterator() chunkIterator { - return &deltaEncodedChunkIterator{ + return newIndexAccessingChunkIterator(c.len(), &deltaEncodedIndexAccessor{ c: *c, - len: c.len(), baseT: c.baseTime(), baseV: c.baseValue(), tBytes: c.timeBytes(), vBytes: c.valueBytes(), isInt: c.isInt(), - } + }) } // marshal implements chunk. @@ -303,184 +299,67 @@ func (c deltaEncodedChunk) len() int { return (len(c) - deltaHeaderBytes) / c.sampleSize() } -// deltaEncodedChunkIterator implements chunkIterator. -type deltaEncodedChunkIterator struct { +// deltaEncodedIndexAccessor implements indexAccessor. +type deltaEncodedIndexAccessor struct { c deltaEncodedChunk - len int baseT model.Time baseV model.SampleValue tBytes, vBytes deltaBytes isInt bool + lastErr error } -// length implements chunkIterator. -func (it *deltaEncodedChunkIterator) length() int { return it.len } - -// valueAtOrBeforeTime implements chunkIterator. 
-func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) { - var lastErr error - i := sort.Search(it.len, func(i int) bool { - ts, err := it.timestampAtIndex(i) - if err != nil { - lastErr = err - } - return ts.After(t) - }) - if i == 0 || lastErr != nil { - return ZeroSamplePair, lastErr - } - ts, err := it.timestampAtIndex(i - 1) - if err != nil { - return ZeroSamplePair, err - } - v, err := it.sampleValueAtIndex(i - 1) - if err != nil { - return ZeroSamplePair, err - } - return model.SamplePair{Timestamp: ts, Value: v}, nil +func (acc *deltaEncodedIndexAccessor) err() error { + return acc.lastErr } -// rangeValues implements chunkIterator. -func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) { - var lastErr error +func (acc *deltaEncodedIndexAccessor) timestampAtIndex(idx int) model.Time { + offset := deltaHeaderBytes + idx*int(acc.tBytes+acc.vBytes) - oldest := sort.Search(it.len, func(i int) bool { - t, err := it.timestampAtIndex(i) - if err != nil { - lastErr = err - } - return !t.Before(in.OldestInclusive) - }) - - newest := sort.Search(it.len, func(i int) bool { - t, err := it.timestampAtIndex(i) - if err != nil { - lastErr = err - } - return t.After(in.NewestInclusive) - }) - - if oldest == it.len || lastErr != nil { - return nil, lastErr - } - - result := make([]model.SamplePair, 0, newest-oldest) - for i := oldest; i < newest; i++ { - t, err := it.timestampAtIndex(i) - if err != nil { - return nil, err - } - v, err := it.sampleValueAtIndex(i) - if err != nil { - return nil, err - } - result = append(result, model.SamplePair{Timestamp: t, Value: v}) - } - return result, nil -} - -// contains implements chunkIterator. -func (it *deltaEncodedChunkIterator) contains(t model.Time) (bool, error) { - lastT, err := it.timestampAtIndex(it.len - 1) - if err != nil { - return false, err - } - return !t.Before(it.baseT) && !t.After(lastT), nil -} - -// values implements chunkIterator. -func (it *deltaEncodedChunkIterator) values() <-chan struct { - model.SamplePair - error -} { - valuesChan := make(chan struct { - model.SamplePair - error - }) - go func() { - for i := 0; i < it.len; i++ { - t, err := it.timestampAtIndex(i) - if err != nil { - valuesChan <- struct { - model.SamplePair - error - }{ZeroSamplePair, err} - break - } - v, err := it.sampleValueAtIndex(i) - if err != nil { - valuesChan <- struct { - model.SamplePair - error - }{ZeroSamplePair, err} - break - } - valuesChan <- struct { - model.SamplePair - error - }{model.SamplePair{Timestamp: t, Value: v}, nil} - } - close(valuesChan) - }() - return valuesChan -} - -// timestampAtIndex implements chunkIterator. -func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time, error) { - offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) - - switch it.tBytes { + switch acc.tBytes { case d1: - return it.baseT + model.Time(uint8(it.c[offset])), nil + return acc.baseT + model.Time(uint8(acc.c[offset])) case d2: - return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:])), nil + return acc.baseT + model.Time(binary.LittleEndian.Uint16(acc.c[offset:])) case d4: - return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:])), nil + return acc.baseT + model.Time(binary.LittleEndian.Uint32(acc.c[offset:])) case d8: // Take absolute value for d8. 
- return model.Time(binary.LittleEndian.Uint64(it.c[offset:])), nil + return model.Time(binary.LittleEndian.Uint64(acc.c[offset:])) default: - return 0, fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes) + acc.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", acc.tBytes) + return model.Earliest } } -// lastTimestamp implements chunkIterator. -func (it *deltaEncodedChunkIterator) lastTimestamp() (model.Time, error) { - return it.timestampAtIndex(it.len - 1) -} +func (acc *deltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleValue { + offset := deltaHeaderBytes + idx*int(acc.tBytes+acc.vBytes) + int(acc.tBytes) -// sampleValueAtIndex implements chunkIterator. -func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.SampleValue, error) { - offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + int(it.tBytes) - - if it.isInt { - switch it.vBytes { + if acc.isInt { + switch acc.vBytes { case d0: - return it.baseV, nil + return acc.baseV case d1: - return it.baseV + model.SampleValue(int8(it.c[offset])), nil + return acc.baseV + model.SampleValue(int8(acc.c[offset])) case d2: - return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil + return acc.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(acc.c[offset:]))) case d4: - return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil + return acc.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(acc.c[offset:]))) // No d8 for ints. default: - return 0, fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes) + acc.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", acc.vBytes) + return 0 } } else { - switch it.vBytes { + switch acc.vBytes { case d4: - return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))), nil + return acc.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(acc.c[offset:]))) case d8: // Take absolute value for d8. - return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))), nil + return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(acc.c[offset:]))) default: - return 0, fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes) + acc.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", acc.vBytes) + return 0 } } } - -// lastSampleValue implements chunkIterator. -func (it *deltaEncodedChunkIterator) lastSampleValue() (model.SampleValue, error) { - return it.sampleValueAtIndex(it.len - 1) -} diff --git a/storage/local/doubledelta.go b/storage/local/doubledelta.go index 257c84544..60e5667ad 100644 --- a/storage/local/doubledelta.go +++ b/storage/local/doubledelta.go @@ -18,11 +18,8 @@ import ( "fmt" "io" "math" - "sort" "github.com/prometheus/common/model" - - "github.com/prometheus/prometheus/storage/metric" ) // The 37-byte header of a delta-encoded chunk looks like: @@ -207,9 +204,8 @@ func (c doubleDeltaEncodedChunk) firstTime() model.Time { // newIterator implements chunk. 
func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator { - return &doubleDeltaEncodedChunkIterator{ + return newIndexAccessingChunkIterator(c.len(), &doubleDeltaEncodedIndexAccessor{ c: *c, - len: c.len(), baseT: c.baseTime(), baseΔT: c.baseTimeDelta(), baseV: c.baseValue(), @@ -217,7 +213,7 @@ func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator { tBytes: c.timeBytes(), vBytes: c.valueBytes(), isInt: c.isInt(), - } + }) } // marshal implements chunk. @@ -409,223 +405,106 @@ func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb delt return []chunk{&c}, nil } -// doubleDeltaEncodedChunkIterator implements chunkIterator. -type doubleDeltaEncodedChunkIterator struct { +// doubleDeltaEncodedIndexAccessor implements indexAccessor. +type doubleDeltaEncodedIndexAccessor struct { c doubleDeltaEncodedChunk - len int baseT, baseΔT model.Time baseV, baseΔV model.SampleValue tBytes, vBytes deltaBytes isInt bool + lastErr error } -// length implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) length() int { return it.len } - -// valueAtOrBeforeTime implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) { - var lastErr error - i := sort.Search(it.len, func(i int) bool { - ts, err := it.timestampAtIndex(i) - if err != nil { - lastErr = err - } - return ts.After(t) - }) - if i == 0 || lastErr != nil { - return ZeroSamplePair, lastErr - } - ts, err := it.timestampAtIndex(i - 1) - if err != nil { - return ZeroSamplePair, err - } - v, err := it.sampleValueAtIndex(i - 1) - if err != nil { - return ZeroSamplePair, err - } - return model.SamplePair{Timestamp: ts, Value: v}, nil +func (acc *doubleDeltaEncodedIndexAccessor) err() error { + return acc.lastErr } -// rangeValues implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) { - var lastErr error - - oldest := sort.Search(it.len, func(i int) bool { - t, err := it.timestampAtIndex(i) - if err != nil { - lastErr = err - } - return !t.Before(in.OldestInclusive) - }) - - newest := sort.Search(it.len, func(i int) bool { - t, err := it.timestampAtIndex(i) - if err != nil { - lastErr = err - } - return t.After(in.NewestInclusive) - }) - - if oldest == it.len || lastErr != nil { - return nil, lastErr - } - - result := make([]model.SamplePair, 0, newest-oldest) - for i := oldest; i < newest; i++ { - t, err := it.timestampAtIndex(i) - if err != nil { - return nil, err - } - v, err := it.sampleValueAtIndex(i) - if err != nil { - return nil, err - } - result = append(result, model.SamplePair{Timestamp: t, Value: v}) - } - return result, nil -} - -// contains implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) contains(t model.Time) (bool, error) { - lastT, err := it.timestampAtIndex(it.len - 1) - if err != nil { - return false, err - } - return !t.Before(it.baseT) && !t.After(lastT), nil -} - -// values implements chunkIterator. 
-func (it *doubleDeltaEncodedChunkIterator) values() <-chan struct { - model.SamplePair - error -} { - valuesChan := make(chan struct { - model.SamplePair - error - }) - go func() { - for i := 0; i < it.len; i++ { - t, err := it.timestampAtIndex(i) - if err != nil { - valuesChan <- struct { - model.SamplePair - error - }{ZeroSamplePair, err} - break - } - v, err := it.sampleValueAtIndex(i) - if err != nil { - valuesChan <- struct { - model.SamplePair - error - }{ZeroSamplePair, err} - break - } - valuesChan <- struct { - model.SamplePair - error - }{model.SamplePair{Timestamp: t, Value: v}, nil} - } - close(valuesChan) - }() - return valuesChan -} - -// timestampAtIndex implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time, error) { +func (acc *doubleDeltaEncodedIndexAccessor) timestampAtIndex(idx int) model.Time { if idx == 0 { - return it.baseT, nil + return acc.baseT } if idx == 1 { // If time bytes are at d8, the time is saved directly rather // than as a difference. - if it.tBytes == d8 { - return it.baseΔT, nil + if acc.tBytes == d8 { + return acc.baseΔT } - return it.baseT + it.baseΔT, nil + return acc.baseT + acc.baseΔT } - offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + offset := doubleDeltaHeaderBytes + (idx-2)*int(acc.tBytes+acc.vBytes) - switch it.tBytes { + switch acc.tBytes { case d1: - return it.baseT + - model.Time(idx)*it.baseΔT + - model.Time(int8(it.c[offset])), nil + return acc.baseT + + model.Time(idx)*acc.baseΔT + + model.Time(int8(acc.c[offset])) case d2: - return it.baseT + - model.Time(idx)*it.baseΔT + - model.Time(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil + return acc.baseT + + model.Time(idx)*acc.baseΔT + + model.Time(int16(binary.LittleEndian.Uint16(acc.c[offset:]))) case d4: - return it.baseT + - model.Time(idx)*it.baseΔT + - model.Time(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil + return acc.baseT + + model.Time(idx)*acc.baseΔT + + model.Time(int32(binary.LittleEndian.Uint32(acc.c[offset:]))) case d8: // Take absolute value for d8. - return model.Time(binary.LittleEndian.Uint64(it.c[offset:])), nil + return model.Time(binary.LittleEndian.Uint64(acc.c[offset:])) default: - return 0, fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes) + acc.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", acc.tBytes) + return model.Earliest } } -// lastTimestamp implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() (model.Time, error) { - return it.timestampAtIndex(it.len - 1) -} - -// sampleValueAtIndex implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.SampleValue, error) { +func (acc *doubleDeltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleValue { if idx == 0 { - return it.baseV, nil + return acc.baseV } if idx == 1 { // If value bytes are at d8, the value is saved directly rather // than as a difference. 
- if it.vBytes == d8 { - return it.baseΔV, nil + if acc.vBytes == d8 { + return acc.baseΔV } - return it.baseV + it.baseΔV, nil + return acc.baseV + acc.baseΔV } - offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + int(it.tBytes) + offset := doubleDeltaHeaderBytes + (idx-2)*int(acc.tBytes+acc.vBytes) + int(acc.tBytes) - if it.isInt { - switch it.vBytes { + if acc.isInt { + switch acc.vBytes { case d0: - return it.baseV + - model.SampleValue(idx)*it.baseΔV, nil + return acc.baseV + + model.SampleValue(idx)*acc.baseΔV case d1: - return it.baseV + - model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int8(it.c[offset])), nil + return acc.baseV + + model.SampleValue(idx)*acc.baseΔV + + model.SampleValue(int8(acc.c[offset])) case d2: - return it.baseV + - model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil + return acc.baseV + + model.SampleValue(idx)*acc.baseΔV + + model.SampleValue(int16(binary.LittleEndian.Uint16(acc.c[offset:]))) case d4: - return it.baseV + - model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil + return acc.baseV + + model.SampleValue(idx)*acc.baseΔV + + model.SampleValue(int32(binary.LittleEndian.Uint32(acc.c[offset:]))) // No d8 for ints. default: - return 0, fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes) + acc.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", acc.vBytes) + return 0 } } else { - switch it.vBytes { + switch acc.vBytes { case d4: - return it.baseV + - model.SampleValue(idx)*it.baseΔV + - model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))), nil + return acc.baseV + + model.SampleValue(idx)*acc.baseΔV + + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(acc.c[offset:]))) case d8: // Take absolute value for d8. - return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))), nil + return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(acc.c[offset:]))) default: - return 0, fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes) + acc.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", acc.vBytes) + return 0 } } } - -// lastSampleValue implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) lastSampleValue() (model.SampleValue, error) { - return it.sampleValueAtIndex(it.len - 1) -} diff --git a/storage/local/interface.go b/storage/local/interface.go index d9dbc4f21..26bc5325a 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -40,20 +40,22 @@ type Storage interface { // NewPreloader returns a new Preloader which allows preloading and pinning // series data into memory for use within a query. NewPreloader() Preloader - // MetricsForLabelMatchers returns the metrics from storage that satisfy the given - // label matchers. At least one label matcher must be specified that does not - // match the empty string. - MetricsForLabelMatchers(...*metric.LabelMatcher) map[model.Fingerprint]metric.Metric - // LastSamplePairForFingerprint returns the last sample pair that has - // been ingested for the provided fingerprint. If this instance of the + // MetricsForLabelMatchers returns the metrics from storage that satisfy + // the given label matchers. At least one label matcher must be + // specified that does not match the empty string. The times from and + // through are hints for the storage to optimize the search. 
The storage + // MAY exclude metrics that have no samples in the specified interval + // from the returned map. In doubt, specify model.Earliest for from and + // model.Latest for through. + MetricsForLabelMatchers(from, through model.Time, matchers ...*metric.LabelMatcher) map[model.Fingerprint]metric.Metric + // LastSampleForFingerprint returns the last sample that has been + // ingested for the provided fingerprint. If this instance of the // Storage has never ingested a sample for the provided fingerprint (or // the last ingestion is so long ago that the series has been archived), - // ZeroSamplePair is returned. - LastSamplePairForFingerprint(model.Fingerprint) model.SamplePair + // ZeroSample is returned. + LastSampleForFingerprint(model.Fingerprint) model.Sample // Get all of the label values that are associated with a given label name. LabelValuesForLabelName(model.LabelName) model.LabelValues - // Get the metric associated with the provided fingerprint. - MetricForFingerprint(model.Fingerprint) metric.Metric // Drop all time series associated with the given fingerprints. DropMetricsForFingerprints(...model.Fingerprint) // Run the various maintenance loops in goroutines. Returns when the @@ -89,7 +91,7 @@ type SeriesIterator interface { type Preloader interface { PreloadRange( fp model.Fingerprint, - from model.Time, through model.Time, + from, through model.Time, ) SeriesIterator PreloadInstant( fp model.Fingerprint, @@ -100,8 +102,15 @@ type Preloader interface { } // ZeroSamplePair is the pseudo zero-value of model.SamplePair used by the local -// package to signal a non-existing sample. It is a SamplePair with timestamp -// model.Earliest and value 0.0. Note that the natural zero value of SamplePair -// has a timestamp of 0, which is possible to appear in a real SamplePair and -// thus not suitable to signal a non-existing SamplePair. +// package to signal a non-existing sample pair. It is a SamplePair with +// timestamp model.Earliest and value 0.0. Note that the natural zero value of +// SamplePair has a timestamp of 0, which is possible to appear in a real +// SamplePair and thus not suitable to signal a non-existing SamplePair. var ZeroSamplePair = model.SamplePair{Timestamp: model.Earliest} + +// ZeroSample is the pseudo zero-value of model.Sample used by the local package +// to signal a non-existing sample. It is a Sample with timestamp +// model.Earliest, value 0.0, and metric nil. Note that the natural zero value +// of Sample has a timestamp of 0, which is possible to appear in a real +// Sample and thus not suitable to signal a non-existing Sample. +var ZeroSample = model.Sample{Timestamp: model.Earliest} diff --git a/storage/local/persistence.go b/storage/local/persistence.go index e713feea7..68bc612f0 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -312,49 +312,44 @@ func (p *persistence) isDirty() bool { return p.dirty } -// setDirty sets the dirty flag in a goroutine-safe way. Once the dirty flag was -// set to true with this method, it cannot be set to false again. (If we became -// dirty during our runtime, there is no way back. If we were dirty from the -// start, a clean-up might make us clean again.) The provided error will be -// logged as a reason if dirty is true. -func (p *persistence) setDirty(dirty bool, err error) { - if dirty { - p.dirtyCounter.Inc() - } +// setDirty flags the storage as dirty in a goroutine-safe way. The provided +// error will be logged as a reason the first time the storage is flagged as dirty. 
+func (p *persistence) setDirty(err error) { + p.dirtyCounter.Inc() p.dirtyMtx.Lock() defer p.dirtyMtx.Unlock() if p.becameDirty { return } - p.dirty = dirty - if dirty { - p.becameDirty = true - log.With("error", err).Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.") - } + p.dirty = true + p.becameDirty = true + log.With("error", err).Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.") } // fingerprintsForLabelPair returns the fingerprints for the given label // pair. This method is goroutine-safe but take into account that metrics queued // for indexing with IndexMetric might not have made it into the index // yet. (Same applies correspondingly to UnindexMetric.) -func (p *persistence) fingerprintsForLabelPair(lp model.LabelPair) (model.Fingerprints, error) { +func (p *persistence) fingerprintsForLabelPair(lp model.LabelPair) model.Fingerprints { fps, _, err := p.labelPairToFingerprints.Lookup(lp) if err != nil { - return nil, err + p.setDirty(fmt.Errorf("error in method fingerprintsForLabelPair(%v): %s", lp, err)) + return nil } - return fps, nil + return fps } // labelValuesForLabelName returns the label values for the given label // name. This method is goroutine-safe but take into account that metrics queued // for indexing with IndexMetric might not have made it into the index // yet. (Same applies correspondingly to UnindexMetric.) -func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelValues, error) { +func (p *persistence) labelValuesForLabelName(ln model.LabelName) model.LabelValues { lvs, _, err := p.labelNameToLabelValues.Lookup(ln) if err != nil { - return nil, err + p.setDirty(fmt.Errorf("error in method labelValuesForLabelName(%v): %s", ln, err)) + return nil } - return lvs, nil + return lvs } // persistChunks persists a number of consecutive chunks of a series. It is the @@ -363,13 +358,10 @@ func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelVa // the (zero-based) index of the first persisted chunk within the series // file. In case of an error, the returned index is -1 (to avoid the // misconception that the chunk was written at position 0). +// +// Returning an error signals problems with the series file. In this case, the +// caller should quarantine the series. func (p *persistence) persistChunks(fp model.Fingerprint, chunks []chunk) (index int, err error) { - defer func() { - if err != nil { - p.setDirty(true, fmt.Errorf("error in method persistChunks: %s", err)) - } - }() - f, err := p.openChunkFileForWriting(fp) if err != nil { return -1, err @@ -743,6 +735,9 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in // deleted (in which case the returned timestamp will be 0 and must be ignored). // It is the caller's responsibility to make sure nothing is persisted or loaded // for the same fingerprint concurrently. +// +// Returning an error signals problems with the series file. In this case, the +// caller should quarantine the series. func (p *persistence) dropAndPersistChunks( fp model.Fingerprint, beforeTime model.Time, chunks []chunk, ) ( @@ -755,12 +750,6 @@ func (p *persistence) dropAndPersistChunks( // Style note: With the many return values, it was decided to use naked // returns in this method. They make the method more readable, but // please handle with care! 
- defer func() { - if err != nil { - p.setDirty(true, fmt.Errorf("error in method dropAndPersistChunks: %s", err)) - } - }() - if len(chunks) > 0 { // We have chunks to persist. First check if those are already // too old. If that's the case, the chunks in the series file @@ -1014,29 +1003,28 @@ func (p *persistence) waitForIndexing() { // the metric. The caller must have locked the fingerprint. func (p *persistence) archiveMetric( fp model.Fingerprint, m model.Metric, first, last model.Time, -) error { +) { if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil { - p.setDirty(true, err) - return err + p.setDirty(fmt.Errorf("error in method archiveMetric inserting fingerprint %v into FingerprintToMetrics: %s", fp, err)) + return } if err := p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last}); err != nil { - p.setDirty(true, err) - return err + p.setDirty(fmt.Errorf("error in method archiveMetric inserting fingerprint %v into FingerprintToTimeRange: %s", fp, err)) } - return nil } // hasArchivedMetric returns whether the archived metric for the given // fingerprint exists and if yes, what the first and last timestamp in the // corresponding series is. This method is goroutine-safe. func (p *persistence) hasArchivedMetric(fp model.Fingerprint) ( - hasMetric bool, firstTime, lastTime model.Time, err error, + hasMetric bool, firstTime, lastTime model.Time, ) { - firstTime, lastTime, hasMetric, err = p.archivedFingerprintToTimeRange.Lookup(fp) + firstTime, lastTime, hasMetric, err := p.archivedFingerprintToTimeRange.Lookup(fp) if err != nil { - p.setDirty(true, err) + p.setDirty(fmt.Errorf("error in method hasArchivedMetric(%v): %s", fp, err)) + hasMetric = false } - return + return hasMetric, firstTime, lastTime } // updateArchivedTimeRange updates an archived time range. The caller must make @@ -1074,7 +1062,11 @@ func (p *persistence) fingerprintsModifiedBefore(beforeTime model.Time) ([]model // method is goroutine-safe. 
func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error) { metric, _, err := p.archivedFingerprintToMetrics.Lookup(fp) - return metric, err + if err != nil { + p.setDirty(fmt.Errorf("error in method archivedMetric(%v): %s", fp, err)) + return nil, err + } + return metric, nil } // purgeArchivedMetric deletes an archived fingerprint and its corresponding @@ -1084,7 +1076,7 @@ func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error) func (p *persistence) purgeArchivedMetric(fp model.Fingerprint) (err error) { defer func() { if err != nil { - p.setDirty(true, fmt.Errorf("error in method purgeArchivedMetric: %s", err)) + p.setDirty(fmt.Errorf("error in method purgeArchivedMetric(%v): %s", fp, err)) } }() diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index e1894032a..692f494d5 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -82,14 +82,14 @@ func buildTestChunks(t *testing.T, encoding chunkEncoding) map[model.Fingerprint } func chunksEqual(c1, c2 chunk) bool { - values2 := c2.newIterator().values() - for v1 := range c1.newIterator().values() { - v2 := <-values2 - if !(v1 == v2) { + it1 := c1.newIterator() + it2 := c2.newIterator() + for it1.scan() && it2.scan() { + if !(it1.value() == it2.value()) { return false } } - return true + return it1.err() == nil && it2.err() == nil } func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) { @@ -770,58 +770,46 @@ func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) { p.indexMetric(2, m2) p.waitForIndexing() - outFPs, err := p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"}) - if err != nil { - t.Fatal(err) - } + outFPs := p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"}) want := model.Fingerprints{1} if !reflect.DeepEqual(outFPs, want) { t.Errorf("want %#v, got %#v", want, outFPs) } - outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"}) - if err != nil { - t.Fatal(err) - } + outFPs = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"}) want = model.Fingerprints{2} if !reflect.DeepEqual(outFPs, want) { t.Errorf("want %#v, got %#v", want, outFPs) } - if archived, _, _, err := p.hasArchivedMetric(1); err != nil || !archived { + if archived, _, _ := p.hasArchivedMetric(1); !archived { t.Error("want FP 1 archived") } - if archived, _, _, err := p.hasArchivedMetric(2); err != nil || !archived { + if archived, _, _ := p.hasArchivedMetric(2); !archived { t.Error("want FP 2 archived") } - if err != p.purgeArchivedMetric(1) { + if err := p.purgeArchivedMetric(1); err != nil { t.Fatal(err) } - if err != p.purgeArchivedMetric(3) { + if err := p.purgeArchivedMetric(3); err != nil { // Purging something that has not beet archived is not an error. 
t.Fatal(err) } p.waitForIndexing() - outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"}) - if err != nil { - t.Fatal(err) - } + outFPs = p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"}) want = nil if !reflect.DeepEqual(outFPs, want) { t.Errorf("want %#v, got %#v", want, outFPs) } - outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"}) - if err != nil { - t.Fatal(err) - } + outFPs = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"}) want = model.Fingerprints{2} if !reflect.DeepEqual(outFPs, want) { t.Errorf("want %#v, got %#v", want, outFPs) } - if archived, _, _, err := p.hasArchivedMetric(1); err != nil || archived { + if archived, _, _ := p.hasArchivedMetric(1); archived { t.Error("want FP 1 not archived") } - if archived, _, _, err := p.hasArchivedMetric(2); err != nil || !archived { + if archived, _, _ := p.hasArchivedMetric(2); !archived { t.Error("want FP 2 archived") } } @@ -983,9 +971,7 @@ func testIndexing(t *testing.T, encoding chunkEncoding) { for i, b := range batches { for fp, m := range b.fpToMetric { p.indexMetric(fp, m) - if err := p.archiveMetric(fp, m, 1, 2); err != nil { - t.Fatal(err) - } + p.archiveMetric(fp, m, 1, 2) indexedFpsToMetrics[fp] = m } verifyIndexedState(i, t, b, indexedFpsToMetrics, p) @@ -1029,10 +1015,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet } // Check that archived metrics are in membership index. - has, first, last, err := p.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + has, first, last := p.hasArchivedMetric(fp) if !has { t.Errorf("%d. fingerprint %v not found", i, fp) } @@ -1046,10 +1029,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet // Compare label name -> label values mappings. for ln, lvs := range b.expectedLnToLvs { - outLvs, err := p.labelValuesForLabelName(ln) - if err != nil { - t.Fatal(err) - } + outLvs := p.labelValuesForLabelName(ln) outSet := codable.LabelValueSet{} for _, lv := range outLvs { @@ -1063,10 +1043,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet // Compare label pair -> fingerprints mappings. for lp, fps := range b.expectedLpToFps { - outFPs, err := p.fingerprintsForLabelPair(lp) - if err != nil { - t.Fatal(err) - } + outFPs := p.fingerprintsForLabelPair(lp) outSet := codable.FingerprintSet{} for _, fp := range outFPs { diff --git a/storage/local/series.go b/storage/local/series.go index 51ad865a9..0dcad859a 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -557,12 +557,13 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa return ZeroSamplePair } if containsT { - value, err := it.chunkIt.valueAtOrBeforeTime(t) - if err != nil { - it.quarantine(err) - return ZeroSamplePair + if it.chunkIt.findAtOrBefore(t) { + return it.chunkIt.value() } - return value + if it.chunkIt.err() != nil { + it.quarantine(it.chunkIt.err()) + } + return ZeroSamplePair } } @@ -580,12 +581,13 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa return ZeroSamplePair } it.chunkIt = it.chunkIterator(l - i) - value, err := it.chunkIt.valueAtOrBeforeTime(t) - if err != nil { - it.quarantine(err) - return ZeroSamplePair + if it.chunkIt.findAtOrBefore(t) { + return it.chunkIt.value() } - return value + if it.chunkIt.err() != nil { + it.quarantine(it.chunkIt.err()) + } + return ZeroSamplePair } // RangeValues implements SeriesIterator. 
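The hunk above rewrites ValueAtOrBeforeTime against the new find/value/err contract of chunkIterator. As a minimal sketch of that contract (the helper name is invented here for illustration and is not part of the patch), a point lookup against any chunkIterator now takes this shape:

func valueAtOrBefore(it chunkIterator, t model.Time) (model.SamplePair, error) {
	// findAtOrBefore positions the iterator on the most recent sample at or
	// before t and reports whether such a sample exists.
	if it.findAtOrBefore(t) {
		return it.value(), nil
	}
	// Either no sample exists at or before t, or the chunk is corrupt; err()
	// distinguishes the two cases (nil means "no matching sample").
	return ZeroSamplePair, it.err()
}
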
@@ -612,7 +614,7 @@ func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePa if c.firstTime().After(in.NewestInclusive) { break } - chValues, err := it.chunkIterator(i + j).rangeValues(in) + chValues, err := rangeValues(it.chunkIterator(i+j), in) if err != nil { it.quarantine(err) return nil diff --git a/storage/local/storage.go b/storage/local/storage.go index ede1bfc61..79ece03d3 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -16,6 +16,7 @@ package local import ( "container/list" + "errors" "fmt" "math" "sync" @@ -128,12 +129,13 @@ const ( type syncStrategy func() bool type memorySeriesStorage struct { - // numChunksToPersist has to be aligned for atomic operations. - numChunksToPersist int64 // The number of chunks waiting for persistence. - maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will be throttled. - rushed bool // Whether the storage is in rushed mode. - rushedMtx sync.Mutex // Protects entering and exiting rushed mode. - throttled chan struct{} // This chan is sent to whenever NeedsThrottling() returns true (for logging). + // archiveHighWatermark and numChunksToPersist have to be aligned for atomic operations. + archiveHighWatermark model.Time // No archived series has samples after this time. + numChunksToPersist int64 // The number of chunks waiting for persistence. + maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will be throttled. + rushed bool // Whether the storage is in rushed mode. + rushedMtx sync.Mutex // Protects entering and exiting rushed mode. + throttled chan struct{} // This chan is sent to whenever NeedsThrottling() returns true (for logging). fpLocker *fingerprintLocker fpToSeries *seriesMap @@ -157,15 +159,15 @@ type memorySeriesStorage struct { quarantineRequests chan quarantineRequest quarantineStopping, quarantineStopped chan struct{} - persistErrors prometheus.Counter - numSeries prometheus.Gauge - seriesOps *prometheus.CounterVec - ingestedSamplesCount prometheus.Counter - outOfOrderSamplesCount prometheus.Counter - invalidPreloadRequestsCount prometheus.Counter - maintainSeriesDuration *prometheus.SummaryVec - persistenceUrgencyScore prometheus.Gauge - rushedMode prometheus.Gauge + persistErrors prometheus.Counter + numSeries prometheus.Gauge + seriesOps *prometheus.CounterVec + ingestedSamplesCount prometheus.Counter + outOfOrderSamplesCount prometheus.Counter + nonExistentSeriesMatchesCount prometheus.Counter + maintainSeriesDuration *prometheus.SummaryVec + persistenceUrgencyScore prometheus.Gauge + rushedMode prometheus.Gauge } // MemorySeriesStorageOptions contains options needed by @@ -200,6 +202,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage { dropAfter: o.PersistenceRetentionPeriod, checkpointInterval: o.CheckpointInterval, checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit, + archiveHighWatermark: model.Now().Add(-headChunkTimeout), maxChunksToPersist: o.MaxChunksToPersist, @@ -245,11 +248,11 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage { Name: "out_of_order_samples_total", Help: "The total number of samples that were discarded because their timestamps were at or before the last received sample for a series.", }), - invalidPreloadRequestsCount: prometheus.NewCounter(prometheus.CounterOpts{ + nonExistentSeriesMatchesCount: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "invalid_preload_requests_total", - Help: "The total 
number of preload requests referring to a non-existent series. This is an indication of outdated label indexes.", + Name: "non_existent_series_matches_total", + Help: "How often a non-existent series was referred to during label matching or chunk preloading. This is an indication of outdated label indexes.", }), maintainSeriesDuration: prometheus.NewSummaryVec( prometheus.SummaryOpts{ @@ -367,15 +370,20 @@ func (s *memorySeriesStorage) WaitForIndexing() { } // LastSampleForFingerprint implements Storage. -func (s *memorySeriesStorage) LastSamplePairForFingerprint(fp model.Fingerprint) model.SamplePair { +func (s *memorySeriesStorage) LastSampleForFingerprint(fp model.Fingerprint) model.Sample { s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) series, ok := s.fpToSeries.get(fp) if !ok { - return ZeroSamplePair + return ZeroSample + } + sp := series.lastSamplePair() + return model.Sample{ + Metric: series.metric, + Value: sp.Value, + Timestamp: sp.Timestamp, } - return series.lastSamplePair() } // boundedIterator wraps a SeriesIterator and does not allow fetching @@ -417,10 +425,7 @@ func (s *memorySeriesStorage) fingerprintsForLabelPairs(pairs ...model.LabelPair var result map[model.Fingerprint]struct{} for _, pair := range pairs { intersection := map[model.Fingerprint]struct{}{} - fps, err := s.persistence.fingerprintsForLabelPair(pair) - if err != nil { - log.Error("Error getting fingerprints for label pair: ", err) - } + fps := s.persistence.fingerprintsForLabelPair(pair) if len(fps) == 0 { return nil } @@ -438,7 +443,10 @@ func (s *memorySeriesStorage) fingerprintsForLabelPairs(pairs ...model.LabelPair } // MetricsForLabelMatchers implements Storage. -func (s *memorySeriesStorage) MetricsForLabelMatchers(matchers ...*metric.LabelMatcher) map[model.Fingerprint]metric.Metric { +func (s *memorySeriesStorage) MetricsForLabelMatchers( + from, through model.Time, + matchers ...*metric.LabelMatcher, +) map[model.Fingerprint]metric.Metric { var ( equals []model.LabelPair filters []*metric.LabelMatcher @@ -490,9 +498,13 @@ func (s *memorySeriesStorage) MetricsForLabelMatchers(matchers ...*metric.LabelM filters = remaining } - result := make(map[model.Fingerprint]metric.Metric, len(resFPs)) + result := map[model.Fingerprint]metric.Metric{} for fp := range resFPs { - result[fp] = s.MetricForFingerprint(fp) + s.fpLocker.Lock(fp) + if met, _, ok := s.metricForRange(fp, from, through); ok { + result[fp] = metric.Metric{Metric: met} + } + s.fpLocker.Unlock(fp) } for _, matcher := range filters { for fp, met := range result { @@ -504,37 +516,55 @@ func (s *memorySeriesStorage) MetricsForLabelMatchers(matchers ...*metric.LabelM return result } -// LabelValuesForLabelName implements Storage. -func (s *memorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) model.LabelValues { - lvs, err := s.persistence.labelValuesForLabelName(labelName) - if err != nil { - log.Errorf("Error getting label values for label name %q: %v", labelName, err) - } - return lvs -} - -// MetricForFingerprint implements Storage. -func (s *memorySeriesStorage) MetricForFingerprint(fp model.Fingerprint) metric.Metric { - s.fpLocker.Lock(fp) - defer s.fpLocker.Unlock(fp) - +// metricForRange returns the metric for the given fingerprint if the +// corresponding time series has samples between 'from' and 'through', together +// with a pointer to the series if it is in memory already. For a series that +// does not have samples between 'from' and 'through', the returned bool is +// false. 
For an archived series that does contain samples between 'from' and +// 'through', it returns (metric, nil, true). +// +// The caller must have locked the fp. +func (s *memorySeriesStorage) metricForRange( + fp model.Fingerprint, + from, through model.Time, +) (model.Metric, *memorySeries, bool) { series, ok := s.fpToSeries.get(fp) if ok { - // Wrap the returned metric in a copy-on-write (COW) metric here because - // the caller might mutate it. - return metric.Metric{ - Metric: series.metric, + if series.lastTime.Before(from) || series.firstTime().After(through) { + return nil, nil, false + } + return series.metric, series, true + } + // From here on, we are only concerned with archived metrics. + // If the high watermark of archived series is before 'from', we are done. + watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark))) + if watermark < from { + return nil, nil, false + } + if from.After(model.Earliest) || through.Before(model.Latest) { + // The range lookup is relatively cheap, so let's do it first if + // we have a chance the archived metric is not in the range. + has, first, last := s.persistence.hasArchivedMetric(fp) + if !has { + s.nonExistentSeriesMatchesCount.Inc() + return nil, nil, false + } + if first.After(through) || last.Before(from) { + return nil, nil, false } } - met, err := s.persistence.archivedMetric(fp) - if err != nil { - log.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err) - } - return metric.Metric{ - Metric: met, - Copied: false, + metric, err := s.persistence.archivedMetric(fp) + if err != nil { + // archivedMetric has already flagged the storage as dirty in this case. + return nil, nil, false } + return metric, nil, true +} + +// LabelValuesForLabelName implements Storage. +func (s *memorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) model.LabelValues { + return s.persistence.labelValuesForLabelName(labelName) } // DropMetric implements Storage. @@ -562,7 +592,7 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error { s.fpLocker.Unlock(fp) }() // Func wrapper because fp might change below. if err != nil { - s.persistence.setDirty(true, fmt.Errorf("error while mapping fingerprint %v: %s", rawFP, err)) + s.persistence.setDirty(fmt.Errorf("error while mapping fingerprint %v: %s", rawFP, err)) return err } if fp != rawFP { @@ -695,36 +725,20 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me return series, nil } -// getSeriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant. -func (s *memorySeriesStorage) getSeriesForRange( +// seriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant. +// +// The caller must have locked the fp. 
+func (s *memorySeriesStorage) seriesForRange( fp model.Fingerprint, from model.Time, through model.Time, ) *memorySeries { - series, ok := s.fpToSeries.get(fp) - if ok { - return series - } - has, first, last, err := s.persistence.hasArchivedMetric(fp) - if err != nil { - log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.") + metric, series, ok := s.metricForRange(fp, from, through) + if !ok { return nil } - if !has { - s.invalidPreloadRequestsCount.Inc() - return nil - } - if last.Before(from) || first.After(through) { - return nil - } - metric, err := s.persistence.archivedMetric(fp) - if err != nil { - log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.") - return nil - } - series, err = s.getOrCreateSeries(fp, metric) - if err != nil { - // getOrCreateSeries took care of quarantining already. - return nil + if series == nil { + series, _ = s.getOrCreateSeries(fp, metric) + // getOrCreateSeries took care of quarantining already, so ignore the error. } return series } @@ -736,7 +750,7 @@ func (s *memorySeriesStorage) preloadChunksForRange( s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) - series := s.getSeriesForRange(fp, from, through) + series := s.seriesForRange(fp, from, through) if series == nil { return nil, nopIter } @@ -755,7 +769,7 @@ func (s *memorySeriesStorage) preloadChunksForInstant( s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) - series := s.getSeriesForRange(fp, from, through) + series := s.seriesForRange(fp, from, through) if series == nil { return nil, nopIter } @@ -1106,17 +1120,22 @@ func (s *memorySeriesStorage) maintainMemorySeries( } } - // Archive if all chunks are evicted. - if iOldestNotEvicted == -1 { + // Archive if all chunks are evicted. Also make sure the last sample has + // an age of at least headChunkTimeout (which is very likely anyway). + if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > headChunkTimeout { s.fpToSeries.del(fp) s.numSeries.Dec() - if err := s.persistence.archiveMetric( - fp, series.metric, series.firstTime(), series.lastTime, - ); err != nil { - log.Errorf("Error archiving metric %v: %v", series.metric, err) - return - } + s.persistence.archiveMetric(fp, series.metric, series.firstTime(), series.lastTime) s.seriesOps.WithLabelValues(archive).Inc() + oldWatermark := atomic.LoadInt64((*int64)(&s.archiveHighWatermark)) + if oldWatermark < int64(series.lastTime) { + if !atomic.CompareAndSwapInt64( + (*int64)(&s.archiveHighWatermark), + oldWatermark, int64(series.lastTime), + ) { + panic("s.archiveHighWatermark modified outside of maintainMemorySeries") + } + } return } // If we are here, the series is not archived, so check for chunkDesc @@ -1138,8 +1157,20 @@ func (s *memorySeriesStorage) maintainMemorySeries( func (s *memorySeriesStorage) writeMemorySeries( fp model.Fingerprint, series *memorySeries, beforeTime model.Time, ) bool { - cds := series.chunksToPersist() + var ( + persistErr error + cds = series.chunksToPersist() + ) + defer func() { + if persistErr != nil { + s.quarantineSeries(fp, series.metric, persistErr) + s.persistErrors.Inc() + } + // The following is done even in case of an error to ensure + // correct counter bookkeeping and to not pin chunks in memory + // that belong to a series that is scheduled for quarantine + // anyway. 
for _, cd := range cds { cd.unpin(s.evictRequests) } @@ -1160,9 +1191,9 @@ func (s *memorySeriesStorage) writeMemorySeries( if len(cds) == 0 { return false } - offset, err := s.persistence.persistChunks(fp, chunks) - if err != nil { - s.persistErrors.Inc() + var offset int + offset, persistErr = s.persistence.persistChunks(fp, chunks) + if persistErr != nil { return false } if series.chunkDescsOffset == -1 { @@ -1174,14 +1205,12 @@ func (s *memorySeriesStorage) writeMemorySeries( return false } - newFirstTime, offset, numDroppedFromPersistence, allDroppedFromPersistence, err := + newFirstTime, offset, numDroppedFromPersistence, allDroppedFromPersistence, persistErr := s.persistence.dropAndPersistChunks(fp, beforeTime, chunks) - if err != nil { - s.persistErrors.Inc() + if persistErr != nil { return false } - if err := series.dropChunks(beforeTime); err != nil { - s.persistErrors.Inc() + if persistErr = series.dropChunks(beforeTime); persistErr != nil { return false } if len(series.chunkDescs) == 0 && allDroppedFromPersistence { @@ -1198,8 +1227,8 @@ func (s *memorySeriesStorage) writeMemorySeries( } else { series.chunkDescsOffset -= numDroppedFromPersistence if series.chunkDescsOffset < 0 { - s.persistence.setDirty(true, fmt.Errorf("dropped more chunks from persistence than from memory for fingerprint %v, series %v", fp, series)) - series.chunkDescsOffset = -1 // Makes sure it will be looked at during crash recovery. + persistErr = errors.New("dropped more chunks from persistence than from memory") + series.chunkDescsOffset = -1 } } return false @@ -1217,11 +1246,7 @@ func (s *memorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, befor s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) - has, firstTime, lastTime, err := s.persistence.hasArchivedMetric(fp) - if err != nil { - log.Error("Error looking up archived time range: ", err) - return - } + has, firstTime, lastTime := s.persistence.hasArchivedMetric(fp) if !has || !firstTime.Before(beforeTime) { // Oldest sample not old enough, or metric purged or unarchived in the meantime. return @@ -1234,10 +1259,7 @@ func (s *memorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, befor log.Error("Error dropping persisted chunks: ", err) } if allDropped { - if err := s.persistence.purgeArchivedMetric(fp); err != nil { - log.Errorf("Error purging archived metric for fingerprint %v: %v", fp, err) - return - } + s.persistence.purgeArchivedMetric(fp) // Ignoring error. Nothing we can do. s.seriesOps.WithLabelValues(archivePurge).Inc() return } @@ -1426,13 +1448,7 @@ func (s *memorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric, s.incNumChunksToPersist(-numChunksNotYetPersisted) } else { - if err := s.persistence.purgeArchivedMetric(fp); err != nil { - log. - With("fingerprint", fp). - With("metric", m). - With("error", err). - Error("Error purging metric from archive.") - } + s.persistence.purgeArchivedMetric(fp) // Ignoring error. There is nothing we can do. } if m != nil { // If we know a metric now, unindex it in any case. 
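Callers of MetricsForLabelMatchers now pass from/through hints, and metricForRange uses the in-memory series bounds, the archive high watermark, and the archived time ranges to skip series without samples in that interval. A rough caller-side sketch, assuming it sits alongside the package's usual imports (time, common/model, storage/metric) and with the function and metric names invented for illustration:

func exampleRecentMetrics(s Storage) map[model.Fingerprint]metric.Metric {
	m, err := metric.NewLabelMatcher(metric.Equal, model.MetricNameLabel, "http_requests_total")
	if err != nil {
		panic(err)
	}
	now := model.Now()
	// from and through are only hints; the storage MAY drop series without
	// samples in the interval. Pass model.Earliest/model.Latest when in doubt.
	return s.MetricsForLabelMatchers(now.Add(-1*time.Hour), now, m)
}
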
@@ -1480,7 +1496,7 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
 	s.seriesOps.Describe(ch)
 	ch <- s.ingestedSamplesCount.Desc()
 	ch <- s.outOfOrderSamplesCount.Desc()
-	ch <- s.invalidPreloadRequestsCount.Desc()
+	ch <- s.nonExistentSeriesMatchesCount.Desc()
 	ch <- numMemChunksDesc
 	s.maintainSeriesDuration.Describe(ch)
 	ch <- s.persistenceUrgencyScore.Desc()
@@ -1507,7 +1523,7 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
 	s.seriesOps.Collect(ch)
 	ch <- s.ingestedSamplesCount
 	ch <- s.outOfOrderSamplesCount
-	ch <- s.invalidPreloadRequestsCount
+	ch <- s.nonExistentSeriesMatchesCount
 	ch <- prometheus.MustNewConstMetric(
 		numMemChunksDesc,
 		prometheus.GaugeValue,
diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go
index 33d1a9918..f305e792f 100644
--- a/storage/local/storage_test.go
+++ b/storage/local/storage_test.go
@@ -34,6 +34,7 @@ func TestMatches(t *testing.T) {
 	storage, closer := NewTestStorage(t, 1)
 	defer closer.Close()
+	storage.archiveHighWatermark = 90

 	samples := make([]*model.Sample, 100)
 	fingerprints := make(model.Fingerprints, 100)

@@ -56,6 +57,20 @@ func TestMatches(t *testing.T) {
 	}
 	storage.WaitForIndexing()

+	// Archive every tenth metric.
+	for i, fp := range fingerprints {
+		if i%10 != 0 {
+			continue
+		}
+		s, ok := storage.fpToSeries.get(fp)
+		if !ok {
+			t.Fatal("could not retrieve series for fp", fp)
+		}
+		storage.fpLocker.Lock(fp)
+		storage.persistence.archiveMetric(fp, s.metric, s.firstTime(), s.lastTime)
+		storage.fpLocker.Unlock(fp)
+	}
+
 	newMatcher := func(matchType metric.MatchType, name model.LabelName, value model.LabelValue) *metric.LabelMatcher {
 		lm, err := metric.NewLabelMatcher(matchType, name, value)
 		if err != nil {
@@ -178,7 +193,10 @@ func TestMatches(t *testing.T) {
 	}

 	for _, mt := range matcherTests {
-		res := storage.MetricsForLabelMatchers(mt.matchers...)
+		res := storage.MetricsForLabelMatchers(
+			model.Earliest, model.Latest,
+			mt.matchers...,
+		)
 		if len(mt.expected) != len(res) {
 			t.Fatalf("expected %d matches for %q, found %d", len(mt.expected), mt.matchers, len(res))
 		}
@@ -194,6 +212,56 @@ func TestMatches(t *testing.T) {
 				t.Errorf("expected fingerprint %s for %q not in result", fp1, mt.matchers)
 			}
 		}
+		// Smoketest for from/through.
+		if len(storage.MetricsForLabelMatchers(
+			model.Earliest, -10000,
+			mt.matchers...,
+		)) > 0 {
+			t.Error("expected no matches with 'through' older than any sample")
+		}
+		if len(storage.MetricsForLabelMatchers(
+			10000, model.Latest,
+			mt.matchers...,
+		)) > 0 {
+			t.Error("expected no matches with 'from' newer than any sample")
+		}
+		// Now the tricky one, cut out something from the middle.
+		var (
+			from    model.Time = 25
+			through model.Time = 75
+		)
+		res = storage.MetricsForLabelMatchers(
+			from, through,
+			mt.matchers...,
+		)
+		expected := model.Fingerprints{}
+		for _, fp := range mt.expected {
+			i := 0
+			for ; fingerprints[i] != fp && i < len(fingerprints); i++ {
+			}
+			if i == len(fingerprints) {
+				t.Fatal("expected fingerprint does not exist")
+			}
+			if !model.Time(i).Before(from) && !model.Time(i).After(through) {
+				expected = append(expected, fp)
+			}
+		}
+		if len(expected) != len(res) {
+			t.Errorf("expected %d range-limited matches for %q, found %d", len(expected), mt.matchers, len(res))
+		}
+		for fp1 := range res {
+			found := false
+			for _, fp2 := range expected {
+				if fp1 == fp2 {
+					found = true
+					break
+				}
+			}
+			if !found {
+				t.Errorf("expected fingerprint %s for %q not in range-limited result", fp1, mt.matchers)
+			}
+		}
+
 	}
 }

@@ -362,7 +430,10 @@ func BenchmarkLabelMatching(b *testing.B) {
 	for i := 0; i < b.N; i++ {
 		benchLabelMatchingRes = map[model.Fingerprint]metric.Metric{}
 		for _, mt := range matcherTests {
-			benchLabelMatchingRes = s.MetricsForLabelMatchers(mt...)
+			benchLabelMatchingRes = s.MetricsForLabelMatchers(
+				model.Earliest, model.Latest,
+				mt...,
+			)
 		}
 	}
 	// Stop timer to not count the storage closing.
@@ -465,11 +536,7 @@ func TestDropMetrics(t *testing.T) {
 	s.maintainMemorySeries(fpToBeArchived, 0)
 	s.fpLocker.Lock(fpToBeArchived)
 	s.fpToSeries.del(fpToBeArchived)
-	if err := s.persistence.archiveMetric(
-		fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond),
-	); err != nil {
-		t.Error(err)
-	}
+	s.persistence.archiveMetric(fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond))
 	s.fpLocker.Unlock(fpToBeArchived)

 	fps := s.fingerprintsForLabelPairs(model.LabelPair{Name: model.MetricNameLabel, Value: "test"})
@@ -576,11 +643,7 @@ func TestQuarantineMetric(t *testing.T) {
 	s.maintainMemorySeries(fpToBeArchived, 0)
 	s.fpLocker.Lock(fpToBeArchived)
 	s.fpToSeries.del(fpToBeArchived)
-	if err := s.persistence.archiveMetric(
-		fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond),
-	); err != nil {
-		t.Error(err)
-	}
+	s.persistence.archiveMetric(fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond))
 	s.fpLocker.Unlock(fpToBeArchived)

 	// Corrupt the series file for m3.
@@ -692,11 +755,12 @@ func testChunk(t *testing.T, encoding chunkEncoding) {
 		if cd.isEvicted() {
 			continue
 		}
-		for sample := range cd.c.newIterator().values() {
-			if sample.error != nil {
-				t.Error(sample.error)
-			}
-			values = append(values, sample.SamplePair)
+		it := cd.c.newIterator()
+		for it.scan() {
+			values = append(values, it.value())
+		}
+		if it.err() != nil {
+			t.Error(it.err())
 		}
 	}

@@ -1137,36 +1201,22 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	if err := s.persistence.archiveMetric(
-		fp, series.metric, series.firstTime(), lastTime,
-	); err != nil {
-		t.Fatal(err)
-	}
-
-	archived, _, _, err := s.persistence.hasArchivedMetric(fp)
-	if err != nil {
-		t.Fatal(err)
-	}
+	s.persistence.archiveMetric(fp, series.metric, series.firstTime(), lastTime)
+	archived, _, _ := s.persistence.hasArchivedMetric(fp)
 	if !archived {
 		t.Fatal("not archived")
 	}

 	// Drop ~half of the chunks of an archived series.
 	s.maintainArchivedSeries(fp, 10000)
-	archived, _, _, err = s.persistence.hasArchivedMetric(fp)
-	if err != nil {
-		t.Fatal(err)
-	}
+	archived, _, _ = s.persistence.hasArchivedMetric(fp)
 	if !archived {
 		t.Fatal("archived series purged although only half of the chunks dropped")
 	}

 	// Drop everything.
 	s.maintainArchivedSeries(fp, 100000)
-	archived, _, _, err = s.persistence.hasArchivedMetric(fp)
-	if err != nil {
-		t.Fatal(err)
-	}
+	archived, _, _ = s.persistence.hasArchivedMetric(fp)
 	if archived {
 		t.Fatal("archived series not dropped")
 	}
@@ -1192,16 +1242,8 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	if err := s.persistence.archiveMetric(
-		fp, series.metric, series.firstTime(), lastTime,
-	); err != nil {
-		t.Fatal(err)
-	}
-
-	archived, _, _, err = s.persistence.hasArchivedMetric(fp)
-	if err != nil {
-		t.Fatal(err)
-	}
+	s.persistence.archiveMetric(fp, series.metric, series.firstTime(), lastTime)
+	archived, _, _ = s.persistence.hasArchivedMetric(fp)
 	if !archived {
 		t.Fatal("not archived")
 	}
@@ -1213,24 +1255,25 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
 	if !ok {
 		t.Fatal("could not find series")
 	}
-	archived, _, _, err = s.persistence.hasArchivedMetric(fp)
-	if err != nil {
-		t.Fatal(err)
-	}
+	archived, _, _ = s.persistence.hasArchivedMetric(fp)
 	if archived {
 		t.Fatal("archived")
 	}

+	// Set archiveHighWatermark to a low value so that we can see it increase.
+	s.archiveHighWatermark = 42
+
 	// This will archive again, but must not drop it completely, despite the
 	// memorySeries being empty.
 	s.maintainMemorySeries(fp, 10000)
-	archived, _, _, err = s.persistence.hasArchivedMetric(fp)
-	if err != nil {
-		t.Fatal(err)
-	}
+	archived, _, _ = s.persistence.hasArchivedMetric(fp)
 	if !archived {
 		t.Fatal("series purged completely")
 	}
+	// archiveHighWatermark must have been set by maintainMemorySeries.
+	if want, got := model.Time(19998), s.archiveHighWatermark; want != got {
+		t.Errorf("want archiveHighWatermark %v, got %v", want, got)
+	}
 }

 func TestEvictAndPurgeSeriesChunkType0(t *testing.T) {
diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go
index 4e914b724..1dedf518e 100644
--- a/storage/local/test_helpers.go
+++ b/storage/local/test_helpers.go
@@ -21,6 +21,7 @@ package local
 import (
 	"time"

+	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/util/testutil"
 )

@@ -51,6 +52,7 @@ func NewTestStorage(t testutil.T, encoding chunkEncoding) (*memorySeriesStorage,
 		SyncStrategy: Adaptive,
 	}
 	storage := NewMemorySeriesStorage(o)
+	storage.(*memorySeriesStorage).archiveHighWatermark = model.Latest
 	if err := storage.Start(); err != nil {
 		directory.Close()
 		t.Fatalf("Error creating storage: %s", err)
diff --git a/web/api/v1/api.go b/web/api/v1/api.go
index 823665f3b..6858938fb 100644
--- a/web/api/v1/api.go
+++ b/web/api/v1/api.go
@@ -226,7 +226,10 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) {
 		if err != nil {
 			return nil, &apiError{errorBadData, err}
 		}
-		for fp, met := range api.Storage.MetricsForLabelMatchers(matchers...) {
+		for fp, met := range api.Storage.MetricsForLabelMatchers(
+			model.Earliest, model.Latest, // Get every series.
+			matchers...,
+		) {
 			res[fp] = met
 		}
 	}
@@ -250,7 +253,10 @@ func (api *API) dropSeries(r *http.Request) (interface{}, *apiError) {
 		if err != nil {
 			return nil, &apiError{errorBadData, err}
 		}
-		for fp := range api.Storage.MetricsForLabelMatchers(matchers...) {
+		for fp := range api.Storage.MetricsForLabelMatchers(
+			model.Earliest, model.Latest, // Get every series.
+			matchers...,
+		) {
 			fps[fp] = struct{}{}
 		}
 	}
diff --git a/web/federate.go b/web/federate.go
index d9baf676b..26f4710eb 100644
--- a/web/federate.go
+++ b/web/federate.go
@@ -19,7 +19,6 @@ import (
 	"github.com/golang/protobuf/proto"

 	"github.com/prometheus/prometheus/promql"
-	"github.com/prometheus/prometheus/storage/metric"

 	"github.com/prometheus/common/expfmt"
 	"github.com/prometheus/common/model"
@@ -33,7 +32,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {

 	req.ParseForm()

-	metrics := map[model.Fingerprint]metric.Metric{}
+	fps := map[model.Fingerprint]struct{}{}

 	for _, s := range req.Form["match[]"] {
 		matchers, err := promql.ParseMetricSelector(s)
@@ -41,8 +40,11 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
 			http.Error(w, err.Error(), http.StatusBadRequest)
 			return
 		}
-		for fp, met := range h.storage.MetricsForLabelMatchers(matchers...) {
-			metrics[fp] = met
+		for fp := range h.storage.MetricsForLabelMatchers(
+			model.Now().Add(-promql.StalenessDelta), model.Latest,
+			matchers...,
+		) {
+			fps[fp] = struct{}{}
 		}
 	}

@@ -62,19 +64,19 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
 		Type: dto.MetricType_UNTYPED.Enum(),
 	}

-	for fp, met := range metrics {
+	for fp := range fps {
 		globalUsed := map[model.LabelName]struct{}{}

-		sp := h.storage.LastSamplePairForFingerprint(fp)
+		s := h.storage.LastSampleForFingerprint(fp)
 		// Discard if sample does not exist or lays before the staleness interval.
-		if sp.Timestamp.Before(minTimestamp) {
+		if s.Timestamp.Before(minTimestamp) {
 			continue
 		}

 		// Reset label slice.
 		protMetric.Label = protMetric.Label[:0]

-		for ln, lv := range met.Metric {
+		for ln, lv := range s.Metric {
 			if ln == model.MetricNameLabel {
 				protMetricFam.Name = proto.String(string(lv))
 				continue
@@ -98,8 +100,8 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
 			}
 		}

-		protMetric.TimestampMs = proto.Int64(int64(sp.Timestamp))
-		protMetric.Untyped.Value = proto.Float64(float64(sp.Value))
+		protMetric.TimestampMs = proto.Int64(int64(s.Timestamp))
+		protMetric.Untyped.Value = proto.Float64(float64(s.Value))

 		if err := enc.Encode(protMetricFam); err != nil {
 			http.Error(w, err.Error(), http.StatusInternalServerError)
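
Editor's note (not part of the patch): the archiveHighWatermark update in maintainMemorySeries above only ever raises the value, using a single atomic load followed by a compare-and-swap and panicking if the swap fails, since only the maintenance goroutine is expected to write it. The standalone Go sketch below shows that pattern in isolation; the helper name raiseWatermark and the sample timestamps are hypothetical and only the load/CAS logic mirrors the patch.

// watermark_sketch.go - a minimal, self-contained illustration of the
// monotonic high-watermark update used in maintainMemorySeries. All
// identifiers here are illustrative; they are not part of the patch.
package main

import (
	"fmt"
	"sync/atomic"
)

// archiveHighWatermark tracks the newest timestamp (in milliseconds,
// analogous to model.Time) of any series archived so far.
var archiveHighWatermark int64

// raiseWatermark lifts the watermark to lastTime if lastTime is newer.
// As in the patch, a single writer is assumed, so a failed CAS signals
// a logic error rather than a benign race and is treated as fatal.
func raiseWatermark(lastTime int64) {
	old := atomic.LoadInt64(&archiveHighWatermark)
	if old < lastTime {
		if !atomic.CompareAndSwapInt64(&archiveHighWatermark, old, lastTime) {
			panic("archiveHighWatermark modified concurrently")
		}
	}
}

func main() {
	raiseWatermark(19998)
	raiseWatermark(42) // Older than the current watermark, so ignored.
	fmt.Println(atomic.LoadInt64(&archiveHighWatermark)) // Prints 19998.
}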