Mirror of https://github.com/prometheus/prometheus.git
Merge pull request #1466 from prometheus/beorn7/storage6
Rework chunk iterators
Commit: 90eb0555df
@@ -79,7 +79,10 @@ func (a *Analyzer) Analyze(ctx context.Context) error {
 	Inspect(a.Expr, func(node Node) bool {
 		switch n := node.(type) {
 		case *VectorSelector:
-			n.metrics = a.Storage.MetricsForLabelMatchers(n.LabelMatchers...)
+			n.metrics = a.Storage.MetricsForLabelMatchers(
+				a.Start.Add(-n.Offset-StalenessDelta), a.End.Add(-n.Offset),
+				n.LabelMatchers...,
+			)
 			n.iterators = make(map[model.Fingerprint]local.SeriesIterator, len(n.metrics))

 			pt := getPreloadTimes(n.Offset)
@@ -95,7 +98,10 @@ func (a *Analyzer) Analyze(ctx context.Context) error {
 			}
 			}
 		case *MatrixSelector:
-			n.metrics = a.Storage.MetricsForLabelMatchers(n.LabelMatchers...)
+			n.metrics = a.Storage.MetricsForLabelMatchers(
+				a.Start.Add(-n.Offset-n.Range), a.End.Add(-n.Offset),
+				n.LabelMatchers...,
+			)
 			n.iterators = make(map[model.Fingerprint]local.SeriesIterator, len(n.metrics))

 			pt := getPreloadTimes(n.Offset)
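Note on the two hunks above: both selectors now derive a from/through hint window by shifting the query range back by the selector offset and widening the lower bound by the lookback (StalenessDelta for instant vectors, the matrix range for range vectors). A minimal sketch of that arithmetic, not part of the commit (hintWindow and lookback are hypothetical names; assumes the time package and github.com/prometheus/common/model):

func hintWindow(start, end model.Time, offset, lookback time.Duration) (from, through model.Time) {
	// Shift both bounds back by the offset, then widen the lower bound
	// so every sample that can influence the result lies inside the hint.
	return start.Add(-offset - lookback), end.Add(-offset)
}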
@@ -17,6 +17,7 @@ import (
 	"container/list"
 	"fmt"
 	"io"
+	"sort"
 	"sync"
 	"sync/atomic"

@@ -261,54 +262,72 @@ type chunk interface {
 // generally not safe to use a chunkIterator concurrently with or after chunk
 // mutation.
 type chunkIterator interface {
-	// length returns the number of samples in the chunk.
-	length() int
-	// Gets the timestamp of the n-th sample in the chunk.
-	timestampAtIndex(int) (model.Time, error)
 	// Gets the last timestamp in the chunk.
 	lastTimestamp() (model.Time, error)
-	// Gets the sample value of the n-th sample in the chunk.
-	sampleValueAtIndex(int) (model.SampleValue, error)
-	// Gets the last sample value in the chunk.
-	lastSampleValue() (model.SampleValue, error)
-	// Gets the value that is closest before the given time. In case a value
-	// exists at precisely the given time, that value is returned. If no
-	// applicable value exists, ZeroSamplePair is returned.
-	valueAtOrBeforeTime(model.Time) (model.SamplePair, error)
-	// Gets all values contained within a given interval.
-	rangeValues(metric.Interval) ([]model.SamplePair, error)
 	// Whether a given timestamp is contained between first and last value
 	// in the chunk.
 	contains(model.Time) (bool, error)
-	// values returns a channel, from which all sample values in the chunk
-	// can be received in order. The channel is closed after the last
-	// one. It is generally not safe to mutate the chunk while the channel
-	// is still open. If a value is returned with error!=nil, no further
-	// values will be returned and the channel is closed.
-	values() <-chan struct {
-		model.SamplePair
-		error
+	// Scans the next value in the chunk. Directly after the iterator has
+	// been created, the next value is the first value in the
+	// chunk. Otherwise, it is the value following the last value scanned or
+	// found (by one of the find... methods). Returns false if either the
+	// end of the chunk is reached or an error has occurred.
+	scan() bool
+	// Finds the most recent value at or before the provided time. Returns
+	// false if either the chunk contains no value at or before the provided
+	// time, or an error has occurred.
+	findAtOrBefore(model.Time) bool
+	// Finds the oldest value at or after the provided time. Returns false
+	// if either the chunk contains no value at or after the provided time,
+	// or an error has occurred.
+	findAtOrAfter(model.Time) bool
+	// Returns the last value scanned (by the scan method) or found (by one
+	// of the find... methods). It returns ZeroSamplePair before any of
+	// those methods were called.
+	value() model.SamplePair
+	// Returns the last error encountered. In general, an error signals data
+	// corruption in the chunk and requires quarantining.
+	err() error
+}
+
+// rangeValues is a utility function that retrieves all values within the given
+// range from a chunkIterator.
+func rangeValues(it chunkIterator, in metric.Interval) ([]model.SamplePair, error) {
+	result := []model.SamplePair{}
+	if !it.findAtOrAfter(in.OldestInclusive) {
+		return result, it.err()
 	}
+	for !it.value().Timestamp.After(in.NewestInclusive) {
+		result = append(result, it.value())
+		if !it.scan() {
+			break
+		}
+	}
+	return result, it.err()
 }

 func transcodeAndAdd(dst chunk, src chunk, s model.SamplePair) ([]chunk, error) {
 	chunkOps.WithLabelValues(transcode).Inc()

-	head := dst
-	body := []chunk{}
-	for v := range src.newIterator().values() {
-		if v.error != nil {
-			return nil, v.error
-		}
-		newChunks, err := head.add(v.SamplePair)
-		if err != nil {
+	var (
+		head            = dst
+		body, newChunks []chunk
+		err             error
+	)
+
+	it := src.newIterator()
+	for it.scan() {
+		if newChunks, err = head.add(it.value()); err != nil {
 			return nil, err
 		}
 		body = append(body, newChunks[:len(newChunks)-1]...)
 		head = newChunks[len(newChunks)-1]
 	}
-	newChunks, err := head.add(s)
-	if err != nil {
+	if it.err() != nil {
+		return nil, it.err()
+	}
+
+	if newChunks, err = head.add(s); err != nil {
 		return nil, err
 	}
 	return append(body, newChunks...), nil
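For contrast with the deleted channel-based values() API, the pull-style consumption pattern implied by the new interface looks like this (a sketch, not part of the commit; drainChunk is a hypothetical helper):

// drainChunk collects all samples of a chunk in order, the same way
// transcodeAndAdd above consumes src: scan() advances, value() reads.
func drainChunk(it chunkIterator) ([]model.SamplePair, error) {
	var result []model.SamplePair
	for it.scan() {
		result = append(result, it.value())
	}
	// scan() returns false at the end of the chunk and on error alike;
	// err() tells the two cases apart.
	return result, it.err()
}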
@@ -334,3 +353,94 @@ func newChunkForEncoding(encoding chunkEncoding) (chunk, error) {
 		return nil, fmt.Errorf("unknown chunk encoding: %v", encoding)
 	}
 }
+
+// indexAccessor allows accesses to samples by index.
+type indexAccessor interface {
+	timestampAtIndex(int) model.Time
+	sampleValueAtIndex(int) model.SampleValue
+	err() error
+}
+
+// indexAccessingChunkIterator is a chunk iterator for chunks for which an
+// indexAccessor implementation exists.
+type indexAccessingChunkIterator struct {
+	len       int
+	pos       int
+	lastValue model.SamplePair
+	acc       indexAccessor
+}
+
+func newIndexAccessingChunkIterator(len int, acc indexAccessor) *indexAccessingChunkIterator {
+	return &indexAccessingChunkIterator{
+		len:       len,
+		pos:       -1,
+		lastValue: ZeroSamplePair,
+		acc:       acc,
+	}
+}
+
+// lastTimestamp implements chunkIterator.
+func (it *indexAccessingChunkIterator) lastTimestamp() (model.Time, error) {
+	return it.acc.timestampAtIndex(it.len - 1), it.acc.err()
+}
+
+// contains implements chunkIterator.
+func (it *indexAccessingChunkIterator) contains(t model.Time) (bool, error) {
+	return !t.Before(it.acc.timestampAtIndex(0)) &&
+		!t.After(it.acc.timestampAtIndex(it.len-1)), it.acc.err()
+}
+
+// scan implements chunkIterator.
+func (it *indexAccessingChunkIterator) scan() bool {
+	it.pos++
+	if it.pos >= it.len {
+		return false
+	}
+	it.lastValue = model.SamplePair{
+		Timestamp: it.acc.timestampAtIndex(it.pos),
+		Value:     it.acc.sampleValueAtIndex(it.pos),
+	}
+	return it.acc.err() == nil
+}
+
+// findAtOrBefore implements chunkIterator.
+func (it *indexAccessingChunkIterator) findAtOrBefore(t model.Time) bool {
+	i := sort.Search(it.len, func(i int) bool {
+		return it.acc.timestampAtIndex(i).After(t)
+	})
+	if i == 0 || it.acc.err() != nil {
+		return false
+	}
+	it.pos = i - 1
+	it.lastValue = model.SamplePair{
+		Timestamp: it.acc.timestampAtIndex(i - 1),
+		Value:     it.acc.sampleValueAtIndex(i - 1),
+	}
+	return true
+}
+
+// findAtOrAfter implements chunkIterator.
+func (it *indexAccessingChunkIterator) findAtOrAfter(t model.Time) bool {
+	i := sort.Search(it.len, func(i int) bool {
+		return !it.acc.timestampAtIndex(i).Before(t)
+	})
+	if i == it.len || it.acc.err() != nil {
+		return false
+	}
+	it.pos = i
+	it.lastValue = model.SamplePair{
+		Timestamp: it.acc.timestampAtIndex(i),
+		Value:     it.acc.sampleValueAtIndex(i),
+	}
+	return true
+}
+
+// value implements chunkIterator.
+func (it *indexAccessingChunkIterator) value() model.SamplePair {
+	return it.lastValue
+}
+
+// err implements chunkIterator.
+func (it *indexAccessingChunkIterator) err() error {
+	return it.acc.err()
+}
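Any encoding that can address samples by position gets a full chunkIterator for free via this adapter. A toy accessor over parallel slices illustrates the minimal contract (sliceAccessor is hypothetical, for illustration only):

// sliceAccessor implements indexAccessor over in-memory slices. It
// cannot fail, so err() always reports nil.
type sliceAccessor struct {
	times  []model.Time
	values []model.SampleValue
}

func (a *sliceAccessor) timestampAtIndex(i int) model.Time          { return a.times[i] }
func (a *sliceAccessor) sampleValueAtIndex(i int) model.SampleValue { return a.values[i] }
func (a *sliceAccessor) err() error                                 { return nil }

// newIndexAccessingChunkIterator(len(a.times), a) then supports scan,
// findAtOrBefore, findAtOrAfter, value, and err like any chunkIterator.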
@@ -140,7 +140,13 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint
 		}
 	}

-	p.setDirty(false, nil)
+	p.dirtyMtx.Lock()
+	// Only declare storage clean if it didn't become dirty during crash recovery.
+	if !p.becameDirty {
+		p.dirty = false
+	}
+	p.dirtyMtx.Unlock()
+
 	log.Warn("Crash recovery complete.")
 	return nil
 }
@@ -18,11 +18,8 @@ import (
 	"fmt"
 	"io"
 	"math"
-	"sort"

 	"github.com/prometheus/common/model"
-
-	"github.com/prometheus/prometheus/storage/metric"
 )

 // The 21-byte header of a delta-encoded chunk looks like:
@@ -201,15 +198,14 @@ func (c deltaEncodedChunk) firstTime() model.Time {

 // newIterator implements chunk.
 func (c *deltaEncodedChunk) newIterator() chunkIterator {
-	return &deltaEncodedChunkIterator{
+	return newIndexAccessingChunkIterator(c.len(), &deltaEncodedIndexAccessor{
 		c:      *c,
-		len:    c.len(),
 		baseT:  c.baseTime(),
 		baseV:  c.baseValue(),
 		tBytes: c.timeBytes(),
 		vBytes: c.valueBytes(),
 		isInt:  c.isInt(),
-	}
+	})
 }

 // marshal implements chunk.
@@ -303,184 +299,66 @@ func (c deltaEncodedChunk) len() int {
 	return (len(c) - deltaHeaderBytes) / c.sampleSize()
 }

-// deltaEncodedChunkIterator implements chunkIterator.
-type deltaEncodedChunkIterator struct {
+// deltaEncodedIndexAccessor implements indexAccessor.
+type deltaEncodedIndexAccessor struct {
 	c              deltaEncodedChunk
-	len            int
 	baseT          model.Time
 	baseV          model.SampleValue
 	tBytes, vBytes deltaBytes
 	isInt          bool
+	lastErr        error
 }

-// length implements chunkIterator.
-func (it *deltaEncodedChunkIterator) length() int { return it.len }
-
-// valueAtOrBeforeTime implements chunkIterator.
-func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) {
-	var lastErr error
-	i := sort.Search(it.len, func(i int) bool {
-		ts, err := it.timestampAtIndex(i)
-		if err != nil {
-			lastErr = err
-		}
-		return ts.After(t)
-	})
-	if i == 0 || lastErr != nil {
-		return ZeroSamplePair, lastErr
-	}
-	ts, err := it.timestampAtIndex(i - 1)
-	if err != nil {
-		return ZeroSamplePair, err
-	}
-	v, err := it.sampleValueAtIndex(i - 1)
-	if err != nil {
-		return ZeroSamplePair, err
-	}
-	return model.SamplePair{Timestamp: ts, Value: v}, nil
-}
-
-// rangeValues implements chunkIterator.
-func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) {
-	var lastErr error
-
-	oldest := sort.Search(it.len, func(i int) bool {
-		t, err := it.timestampAtIndex(i)
-		if err != nil {
-			lastErr = err
-		}
-		return !t.Before(in.OldestInclusive)
-	})
-
-	newest := sort.Search(it.len, func(i int) bool {
-		t, err := it.timestampAtIndex(i)
-		if err != nil {
-			lastErr = err
-		}
-		return t.After(in.NewestInclusive)
-	})
-
-	if oldest == it.len || lastErr != nil {
-		return nil, lastErr
-	}
-
-	result := make([]model.SamplePair, 0, newest-oldest)
-	for i := oldest; i < newest; i++ {
-		t, err := it.timestampAtIndex(i)
-		if err != nil {
-			return nil, err
-		}
-		v, err := it.sampleValueAtIndex(i)
-		if err != nil {
-			return nil, err
-		}
-		result = append(result, model.SamplePair{Timestamp: t, Value: v})
-	}
-	return result, nil
-}
-
-// contains implements chunkIterator.
-func (it *deltaEncodedChunkIterator) contains(t model.Time) (bool, error) {
-	lastT, err := it.timestampAtIndex(it.len - 1)
-	if err != nil {
-		return false, err
-	}
-	return !t.Before(it.baseT) && !t.After(lastT), nil
-}
-
-// values implements chunkIterator.
-func (it *deltaEncodedChunkIterator) values() <-chan struct {
-	model.SamplePair
-	error
-} {
-	valuesChan := make(chan struct {
-		model.SamplePair
-		error
-	})
-	go func() {
-		for i := 0; i < it.len; i++ {
-			t, err := it.timestampAtIndex(i)
-			if err != nil {
-				valuesChan <- struct {
-					model.SamplePair
-					error
-				}{ZeroSamplePair, err}
-				break
-			}
-			v, err := it.sampleValueAtIndex(i)
-			if err != nil {
-				valuesChan <- struct {
-					model.SamplePair
-					error
-				}{ZeroSamplePair, err}
-				break
-			}
-			valuesChan <- struct {
-				model.SamplePair
-				error
-			}{model.SamplePair{Timestamp: t, Value: v}, nil}
-		}
-		close(valuesChan)
-	}()
-	return valuesChan
-}
-
-// timestampAtIndex implements chunkIterator.
-func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time, error) {
-	offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes)
-
-	switch it.tBytes {
+func (acc *deltaEncodedIndexAccessor) err() error {
+	return acc.lastErr
+}
+
+func (acc *deltaEncodedIndexAccessor) timestampAtIndex(idx int) model.Time {
+	offset := deltaHeaderBytes + idx*int(acc.tBytes+acc.vBytes)
+
+	switch acc.tBytes {
 	case d1:
-		return it.baseT + model.Time(uint8(it.c[offset])), nil
+		return acc.baseT + model.Time(uint8(acc.c[offset]))
 	case d2:
-		return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:])), nil
+		return acc.baseT + model.Time(binary.LittleEndian.Uint16(acc.c[offset:]))
 	case d4:
-		return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:])), nil
+		return acc.baseT + model.Time(binary.LittleEndian.Uint32(acc.c[offset:]))
 	case d8:
 		// Take absolute value for d8.
-		return model.Time(binary.LittleEndian.Uint64(it.c[offset:])), nil
+		return model.Time(binary.LittleEndian.Uint64(acc.c[offset:]))
 	default:
-		return 0, fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes)
+		acc.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", acc.tBytes)
+		return model.Earliest
 	}
 }

-// lastTimestamp implements chunkIterator.
-func (it *deltaEncodedChunkIterator) lastTimestamp() (model.Time, error) {
-	return it.timestampAtIndex(it.len - 1)
-}
-
-// sampleValueAtIndex implements chunkIterator.
-func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.SampleValue, error) {
-	offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + int(it.tBytes)
-
-	if it.isInt {
-		switch it.vBytes {
+func (acc *deltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleValue {
+	offset := deltaHeaderBytes + idx*int(acc.tBytes+acc.vBytes) + int(acc.tBytes)
+
+	if acc.isInt {
+		switch acc.vBytes {
 		case d0:
-			return it.baseV, nil
+			return acc.baseV
 		case d1:
-			return it.baseV + model.SampleValue(int8(it.c[offset])), nil
+			return acc.baseV + model.SampleValue(int8(acc.c[offset]))
 		case d2:
-			return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil
+			return acc.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(acc.c[offset:])))
 		case d4:
-			return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil
+			return acc.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(acc.c[offset:])))
 		// No d8 for ints.
 		default:
-			return 0, fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes)
+			acc.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", acc.vBytes)
+			return 0
 		}
 	} else {
-		switch it.vBytes {
+		switch acc.vBytes {
 		case d4:
-			return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))), nil
+			return acc.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(acc.c[offset:])))
 		case d8:
 			// Take absolute value for d8.
-			return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))), nil
+			return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(acc.c[offset:])))
 		default:
-			return 0, fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes)
+			acc.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", acc.vBytes)
+			return 0
 		}
 	}
 }
-
-// lastSampleValue implements chunkIterator.
-func (it *deltaEncodedChunkIterator) lastSampleValue() (model.SampleValue, error) {
-	return it.sampleValueAtIndex(it.len - 1)
-}
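The accessor's offset arithmetic follows from the chunk layout: after the 21-byte header, samples are fixed-size records of tBytes of time delta followed by vBytes of value delta. A worked sketch, not part of the commit (offsetOf is a hypothetical helper):

// offsetOf locates sample idx inside a delta-encoded chunk. With d2
// timestamps and d4 values, sample 10 starts at 21 + 10*(2+4) = 81,
// and its value bytes begin 2 bytes later, at 83.
func offsetOf(idx, tBytes, vBytes int) (tOffset, vOffset int) {
	const deltaHeaderBytes = 21 // per the header comment above
	tOffset = deltaHeaderBytes + idx*(tBytes+vBytes)
	return tOffset, tOffset + tBytes
}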
@@ -18,11 +18,8 @@ import (
 	"fmt"
 	"io"
 	"math"
-	"sort"

 	"github.com/prometheus/common/model"
-
-	"github.com/prometheus/prometheus/storage/metric"
 )

 // The 37-byte header of a delta-encoded chunk looks like:
@@ -207,9 +204,8 @@ func (c doubleDeltaEncodedChunk) firstTime() model.Time {

 // newIterator implements chunk.
 func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator {
-	return &doubleDeltaEncodedChunkIterator{
+	return newIndexAccessingChunkIterator(c.len(), &doubleDeltaEncodedIndexAccessor{
 		c:      *c,
-		len:    c.len(),
 		baseT:  c.baseTime(),
 		baseΔT: c.baseTimeDelta(),
 		baseV:  c.baseValue(),
@@ -217,7 +213,7 @@ func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator {
 		tBytes: c.timeBytes(),
 		vBytes: c.valueBytes(),
 		isInt:  c.isInt(),
-	}
+	})
 }

 // marshal implements chunk.
@@ -409,223 +405,106 @@ func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb delt
 	return []chunk{&c}, nil
 }

-// doubleDeltaEncodedChunkIterator implements chunkIterator.
-type doubleDeltaEncodedChunkIterator struct {
+// doubleDeltaEncodedIndexAccessor implements indexAccessor.
+type doubleDeltaEncodedIndexAccessor struct {
 	c              doubleDeltaEncodedChunk
-	len            int
 	baseT, baseΔT  model.Time
 	baseV, baseΔV  model.SampleValue
 	tBytes, vBytes deltaBytes
 	isInt          bool
+	lastErr        error
 }

-// length implements chunkIterator.
-func (it *doubleDeltaEncodedChunkIterator) length() int { return it.len }
-
-// valueAtOrBeforeTime implements chunkIterator.
-func (it *doubleDeltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) {
-	var lastErr error
-	i := sort.Search(it.len, func(i int) bool {
-		ts, err := it.timestampAtIndex(i)
-		if err != nil {
-			lastErr = err
-		}
-		return ts.After(t)
-	})
-	if i == 0 || lastErr != nil {
-		return ZeroSamplePair, lastErr
-	}
-	ts, err := it.timestampAtIndex(i - 1)
-	if err != nil {
-		return ZeroSamplePair, err
-	}
-	v, err := it.sampleValueAtIndex(i - 1)
-	if err != nil {
-		return ZeroSamplePair, err
-	}
-	return model.SamplePair{Timestamp: ts, Value: v}, nil
-}
-
-// rangeValues implements chunkIterator.
-func (it *doubleDeltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) {
-	var lastErr error
-
-	oldest := sort.Search(it.len, func(i int) bool {
-		t, err := it.timestampAtIndex(i)
-		if err != nil {
-			lastErr = err
-		}
-		return !t.Before(in.OldestInclusive)
-	})
-
-	newest := sort.Search(it.len, func(i int) bool {
-		t, err := it.timestampAtIndex(i)
-		if err != nil {
-			lastErr = err
-		}
-		return t.After(in.NewestInclusive)
-	})
-
-	if oldest == it.len || lastErr != nil {
-		return nil, lastErr
-	}
-
-	result := make([]model.SamplePair, 0, newest-oldest)
-	for i := oldest; i < newest; i++ {
-		t, err := it.timestampAtIndex(i)
-		if err != nil {
-			return nil, err
-		}
-		v, err := it.sampleValueAtIndex(i)
-		if err != nil {
-			return nil, err
-		}
-		result = append(result, model.SamplePair{Timestamp: t, Value: v})
-	}
-	return result, nil
-}
-
-// contains implements chunkIterator.
-func (it *doubleDeltaEncodedChunkIterator) contains(t model.Time) (bool, error) {
-	lastT, err := it.timestampAtIndex(it.len - 1)
-	if err != nil {
-		return false, err
-	}
-	return !t.Before(it.baseT) && !t.After(lastT), nil
-}
-
-// values implements chunkIterator.
-func (it *doubleDeltaEncodedChunkIterator) values() <-chan struct {
-	model.SamplePair
-	error
-} {
-	valuesChan := make(chan struct {
-		model.SamplePair
-		error
-	})
-	go func() {
-		for i := 0; i < it.len; i++ {
-			t, err := it.timestampAtIndex(i)
-			if err != nil {
-				valuesChan <- struct {
-					model.SamplePair
-					error
-				}{ZeroSamplePair, err}
-				break
-			}
-			v, err := it.sampleValueAtIndex(i)
-			if err != nil {
-				valuesChan <- struct {
-					model.SamplePair
-					error
-				}{ZeroSamplePair, err}
-				break
-			}
-			valuesChan <- struct {
-				model.SamplePair
-				error
-			}{model.SamplePair{Timestamp: t, Value: v}, nil}
-		}
-		close(valuesChan)
-	}()
-	return valuesChan
-}
-
-// timestampAtIndex implements chunkIterator.
-func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time, error) {
+func (acc *doubleDeltaEncodedIndexAccessor) err() error {
+	return acc.lastErr
+}
+
+func (acc *doubleDeltaEncodedIndexAccessor) timestampAtIndex(idx int) model.Time {
 	if idx == 0 {
-		return it.baseT, nil
+		return acc.baseT
 	}
 	if idx == 1 {
 		// If time bytes are at d8, the time is saved directly rather
 		// than as a difference.
-		if it.tBytes == d8 {
-			return it.baseΔT, nil
+		if acc.tBytes == d8 {
+			return acc.baseΔT
 		}
-		return it.baseT + it.baseΔT, nil
+		return acc.baseT + acc.baseΔT
 	}

-	offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes)
+	offset := doubleDeltaHeaderBytes + (idx-2)*int(acc.tBytes+acc.vBytes)

-	switch it.tBytes {
+	switch acc.tBytes {
 	case d1:
-		return it.baseT +
-			model.Time(idx)*it.baseΔT +
-			model.Time(int8(it.c[offset])), nil
+		return acc.baseT +
+			model.Time(idx)*acc.baseΔT +
+			model.Time(int8(acc.c[offset]))
 	case d2:
-		return it.baseT +
-			model.Time(idx)*it.baseΔT +
-			model.Time(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil
+		return acc.baseT +
+			model.Time(idx)*acc.baseΔT +
+			model.Time(int16(binary.LittleEndian.Uint16(acc.c[offset:])))
 	case d4:
-		return it.baseT +
-			model.Time(idx)*it.baseΔT +
-			model.Time(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil
+		return acc.baseT +
+			model.Time(idx)*acc.baseΔT +
+			model.Time(int32(binary.LittleEndian.Uint32(acc.c[offset:])))
 	case d8:
 		// Take absolute value for d8.
-		return model.Time(binary.LittleEndian.Uint64(it.c[offset:])), nil
+		return model.Time(binary.LittleEndian.Uint64(acc.c[offset:]))
 	default:
-		return 0, fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes)
+		acc.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", acc.tBytes)
+		return model.Earliest
 	}
 }

-// lastTimestamp implements chunkIterator.
-func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() (model.Time, error) {
-	return it.timestampAtIndex(it.len - 1)
-}
-
-// sampleValueAtIndex implements chunkIterator.
-func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.SampleValue, error) {
+func (acc *doubleDeltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleValue {
 	if idx == 0 {
-		return it.baseV, nil
+		return acc.baseV
 	}
 	if idx == 1 {
 		// If value bytes are at d8, the value is saved directly rather
 		// than as a difference.
-		if it.vBytes == d8 {
-			return it.baseΔV, nil
+		if acc.vBytes == d8 {
+			return acc.baseΔV
 		}
-		return it.baseV + it.baseΔV, nil
+		return acc.baseV + acc.baseΔV
 	}

-	offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + int(it.tBytes)
+	offset := doubleDeltaHeaderBytes + (idx-2)*int(acc.tBytes+acc.vBytes) + int(acc.tBytes)

-	if it.isInt {
-		switch it.vBytes {
+	if acc.isInt {
+		switch acc.vBytes {
 		case d0:
-			return it.baseV +
-				model.SampleValue(idx)*it.baseΔV, nil
+			return acc.baseV +
+				model.SampleValue(idx)*acc.baseΔV
 		case d1:
-			return it.baseV +
-				model.SampleValue(idx)*it.baseΔV +
-				model.SampleValue(int8(it.c[offset])), nil
+			return acc.baseV +
+				model.SampleValue(idx)*acc.baseΔV +
+				model.SampleValue(int8(acc.c[offset]))
 		case d2:
-			return it.baseV +
-				model.SampleValue(idx)*it.baseΔV +
-				model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil
+			return acc.baseV +
+				model.SampleValue(idx)*acc.baseΔV +
+				model.SampleValue(int16(binary.LittleEndian.Uint16(acc.c[offset:])))
 		case d4:
-			return it.baseV +
-				model.SampleValue(idx)*it.baseΔV +
-				model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil
+			return acc.baseV +
+				model.SampleValue(idx)*acc.baseΔV +
+				model.SampleValue(int32(binary.LittleEndian.Uint32(acc.c[offset:])))
 		// No d8 for ints.
 		default:
-			return 0, fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes)
+			acc.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", acc.vBytes)
+			return 0
 		}
 	} else {
-		switch it.vBytes {
+		switch acc.vBytes {
 		case d4:
-			return it.baseV +
-				model.SampleValue(idx)*it.baseΔV +
-				model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))), nil
+			return acc.baseV +
+				model.SampleValue(idx)*acc.baseΔV +
+				model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(acc.c[offset:])))
 		case d8:
 			// Take absolute value for d8.
-			return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))), nil
+			return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(acc.c[offset:])))
 		default:
-			return 0, fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes)
+			acc.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", acc.vBytes)
+			return 0
 		}
 	}
 }
-
-// lastSampleValue implements chunkIterator.
-func (it *doubleDeltaEncodedChunkIterator) lastSampleValue() (model.SampleValue, error) {
-	return it.sampleValueAtIndex(it.len - 1)
-}
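For idx >= 2 the double-delta accessor stores only the residual left after linear extrapolation from the base value and base delta; the switch bodies above add that residual back. The reconstruction, condensed as a sketch (reconstructTime is hypothetical; the residual is what the accessor decodes from acc.c):

// reconstructTime mirrors timestampAtIndex for idx >= 2: predicted
// time = baseT + idx*baseΔT, plus the stored residual. Indices 0 and 1
// are special-cased above.
func reconstructTime(baseT, baseΔT model.Time, idx int, residual model.Time) model.Time {
	return baseT + model.Time(idx)*baseΔT + residual
}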
@@ -40,20 +40,22 @@ type Storage interface {
 	// NewPreloader returns a new Preloader which allows preloading and pinning
 	// series data into memory for use within a query.
 	NewPreloader() Preloader
-	// MetricsForLabelMatchers returns the metrics from storage that satisfy the given
-	// label matchers. At least one label matcher must be specified that does not
-	// match the empty string.
-	MetricsForLabelMatchers(...*metric.LabelMatcher) map[model.Fingerprint]metric.Metric
-	// LastSamplePairForFingerprint returns the last sample pair that has
-	// been ingested for the provided fingerprint. If this instance of the
+	// MetricsForLabelMatchers returns the metrics from storage that satisfy
+	// the given label matchers. At least one label matcher must be
+	// specified that does not match the empty string. The times from and
+	// through are hints for the storage to optimize the search. The storage
+	// MAY exclude metrics that have no samples in the specified interval
+	// from the returned map. In doubt, specify model.Earliest for from and
+	// model.Latest for through.
+	MetricsForLabelMatchers(from, through model.Time, matchers ...*metric.LabelMatcher) map[model.Fingerprint]metric.Metric
+	// LastSampleForFingerprint returns the last sample that has been
+	// ingested for the provided fingerprint. If this instance of the
 	// Storage has never ingested a sample for the provided fingerprint (or
 	// the last ingestion is so long ago that the series has been archived),
-	// ZeroSamplePair is returned.
-	LastSamplePairForFingerprint(model.Fingerprint) model.SamplePair
+	// ZeroSample is returned.
+	LastSampleForFingerprint(model.Fingerprint) model.Sample
 	// Get all of the label values that are associated with a given label name.
 	LabelValuesForLabelName(model.LabelName) model.LabelValues
-	// Get the metric associated with the provided fingerprint.
-	MetricForFingerprint(model.Fingerprint) metric.Metric
 	// Drop all time series associated with the given fingerprints.
 	DropMetricsForFingerprints(...model.Fingerprint)
 	// Run the various maintenance loops in goroutines. Returns when the
@@ -89,7 +91,7 @@ type SeriesIterator interface {
 type Preloader interface {
 	PreloadRange(
 		fp model.Fingerprint,
-		from model.Time, through model.Time,
+		from, through model.Time,
 	) SeriesIterator
 	PreloadInstant(
 		fp model.Fingerprint,
@@ -100,8 +102,15 @@ type Preloader interface {
 }

 // ZeroSamplePair is the pseudo zero-value of model.SamplePair used by the local
-// package to signal a non-existing sample. It is a SamplePair with timestamp
-// model.Earliest and value 0.0. Note that the natural zero value of SamplePair
-// has a timestamp of 0, which is possible to appear in a real SamplePair and
-// thus not suitable to signal a non-existing SamplePair.
+// package to signal a non-existing sample pair. It is a SamplePair with
+// timestamp model.Earliest and value 0.0. Note that the natural zero value of
+// SamplePair has a timestamp of 0, which is possible to appear in a real
+// SamplePair and thus not suitable to signal a non-existing SamplePair.
 var ZeroSamplePair = model.SamplePair{Timestamp: model.Earliest}
+
+// ZeroSample is the pseudo zero-value of model.Sample used by the local package
+// to signal a non-existing sample. It is a Sample with timestamp
+// model.Earliest, value 0.0, and metric nil. Note that the natural zero value
+// of Sample has a timestamp of 0, which is possible to appear in a real
+// Sample and thus not suitable to signal a non-existing Sample.
+var ZeroSample = model.Sample{Timestamp: model.Earliest}
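A call site without useful bounds follows the new interface comment and passes the widest possible hints, e.g. (a sketch, not part of the commit; allMetricsFor is a hypothetical wrapper):

// allMetricsFor matches metrics without a time hint, so the storage
// may not exclude any series on time grounds.
func allMetricsFor(s Storage, matchers ...*metric.LabelMatcher) map[model.Fingerprint]metric.Metric {
	return s.MetricsForLabelMatchers(model.Earliest, model.Latest, matchers...)
}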
@@ -312,49 +312,44 @@ func (p *persistence) isDirty() bool {
 	return p.dirty
 }

-// setDirty sets the dirty flag in a goroutine-safe way. Once the dirty flag was
-// set to true with this method, it cannot be set to false again. (If we became
-// dirty during our runtime, there is no way back. If we were dirty from the
-// start, a clean-up might make us clean again.) The provided error will be
-// logged as a reason if dirty is true.
-func (p *persistence) setDirty(dirty bool, err error) {
-	if dirty {
-		p.dirtyCounter.Inc()
-	}
+// setDirty flags the storage as dirty in a goroutine-safe way. The provided
+// error will be logged as a reason the first time the storage is flagged as dirty.
+func (p *persistence) setDirty(err error) {
+	p.dirtyCounter.Inc()
 	p.dirtyMtx.Lock()
 	defer p.dirtyMtx.Unlock()
 	if p.becameDirty {
 		return
 	}
-	p.dirty = dirty
-	if dirty {
-		p.becameDirty = true
-		log.With("error", err).Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.")
-	}
+	p.dirty = true
+	p.becameDirty = true
+	log.With("error", err).Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.")
 }

 // fingerprintsForLabelPair returns the fingerprints for the given label
 // pair. This method is goroutine-safe but take into account that metrics queued
 // for indexing with IndexMetric might not have made it into the index
 // yet. (Same applies correspondingly to UnindexMetric.)
-func (p *persistence) fingerprintsForLabelPair(lp model.LabelPair) (model.Fingerprints, error) {
+func (p *persistence) fingerprintsForLabelPair(lp model.LabelPair) model.Fingerprints {
 	fps, _, err := p.labelPairToFingerprints.Lookup(lp)
 	if err != nil {
-		return nil, err
+		p.setDirty(fmt.Errorf("error in method fingerprintsForLabelPair(%v): %s", lp, err))
+		return nil
 	}
-	return fps, nil
+	return fps
 }

 // labelValuesForLabelName returns the label values for the given label
 // name. This method is goroutine-safe but take into account that metrics queued
 // for indexing with IndexMetric might not have made it into the index
 // yet. (Same applies correspondingly to UnindexMetric.)
-func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelValues, error) {
+func (p *persistence) labelValuesForLabelName(ln model.LabelName) model.LabelValues {
 	lvs, _, err := p.labelNameToLabelValues.Lookup(ln)
 	if err != nil {
-		return nil, err
+		p.setDirty(fmt.Errorf("error in method labelValuesForLabelName(%v): %s", ln, err))
+		return nil
 	}
-	return lvs, nil
+	return lvs
 }

 // persistChunks persists a number of consecutive chunks of a series. It is the
@@ -1008,29 +1003,28 @@ func (p *persistence) waitForIndexing() {
 // the metric. The caller must have locked the fingerprint.
 func (p *persistence) archiveMetric(
 	fp model.Fingerprint, m model.Metric, first, last model.Time,
-) error {
+) {
 	if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil {
-		p.setDirty(true, err)
-		return err
+		p.setDirty(fmt.Errorf("error in method archiveMetric inserting fingerprint %v into FingerprintToMetrics: %s", fp, err))
+		return
 	}
 	if err := p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last}); err != nil {
-		p.setDirty(true, err)
-		return err
+		p.setDirty(fmt.Errorf("error in method archiveMetric inserting fingerprint %v into FingerprintToTimeRange: %s", fp, err))
 	}
-	return nil
 }

 // hasArchivedMetric returns whether the archived metric for the given
 // fingerprint exists and if yes, what the first and last timestamp in the
 // corresponding series is. This method is goroutine-safe.
 func (p *persistence) hasArchivedMetric(fp model.Fingerprint) (
-	hasMetric bool, firstTime, lastTime model.Time, err error,
+	hasMetric bool, firstTime, lastTime model.Time,
 ) {
-	firstTime, lastTime, hasMetric, err = p.archivedFingerprintToTimeRange.Lookup(fp)
+	firstTime, lastTime, hasMetric, err := p.archivedFingerprintToTimeRange.Lookup(fp)
 	if err != nil {
-		p.setDirty(true, err)
+		p.setDirty(fmt.Errorf("error in method hasArchivedMetric(%v): %s", fp, err))
+		hasMetric = false
 	}
-	return
+	return hasMetric, firstTime, lastTime
 }

 // updateArchivedTimeRange updates an archived time range. The caller must make
@@ -1068,7 +1062,11 @@ func (p *persistence) fingerprintsModifiedBefore(beforeTime model.Time) ([]model
 // method is goroutine-safe.
 func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error) {
 	metric, _, err := p.archivedFingerprintToMetrics.Lookup(fp)
-	return metric, err
+	if err != nil {
+		p.setDirty(fmt.Errorf("error in method archivedMetric(%v): %s", fp, err))
+		return nil, err
+	}
+	return metric, nil
 }

 // purgeArchivedMetric deletes an archived fingerprint and its corresponding
@@ -1078,7 +1076,7 @@ func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error)
 func (p *persistence) purgeArchivedMetric(fp model.Fingerprint) (err error) {
 	defer func() {
 		if err != nil {
-			p.setDirty(true, fmt.Errorf("error in method purgeArchivedMetric: %s", err))
+			p.setDirty(fmt.Errorf("error in method purgeArchivedMetric(%v): %s", fp, err))
 		}
 	}()

@@ -82,14 +82,14 @@ func buildTestChunks(t *testing.T, encoding chunkEncoding) map[model.Fingerprint
 }

 func chunksEqual(c1, c2 chunk) bool {
-	values2 := c2.newIterator().values()
-	for v1 := range c1.newIterator().values() {
-		v2 := <-values2
-		if !(v1 == v2) {
+	it1 := c1.newIterator()
+	it2 := c2.newIterator()
+	for it1.scan() && it2.scan() {
+		if !(it1.value() == it2.value()) {
 			return false
 		}
 	}
-	return true
+	return it1.err() == nil && it2.err() == nil
 }

 func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) {
@@ -770,58 +770,46 @@ func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) {
 	p.indexMetric(2, m2)
 	p.waitForIndexing()

-	outFPs, err := p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"})
-	if err != nil {
-		t.Fatal(err)
-	}
+	outFPs := p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"})
 	want := model.Fingerprints{1}
 	if !reflect.DeepEqual(outFPs, want) {
 		t.Errorf("want %#v, got %#v", want, outFPs)
 	}
-	outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"})
-	if err != nil {
-		t.Fatal(err)
-	}
+	outFPs = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"})
 	want = model.Fingerprints{2}
 	if !reflect.DeepEqual(outFPs, want) {
 		t.Errorf("want %#v, got %#v", want, outFPs)
 	}
-	if archived, _, _, err := p.hasArchivedMetric(1); err != nil || !archived {
+	if archived, _, _ := p.hasArchivedMetric(1); !archived {
 		t.Error("want FP 1 archived")
 	}
-	if archived, _, _, err := p.hasArchivedMetric(2); err != nil || !archived {
+	if archived, _, _ := p.hasArchivedMetric(2); !archived {
 		t.Error("want FP 2 archived")
 	}

-	if err != p.purgeArchivedMetric(1) {
+	if err := p.purgeArchivedMetric(1); err != nil {
 		t.Fatal(err)
 	}
-	if err != p.purgeArchivedMetric(3) {
+	if err := p.purgeArchivedMetric(3); err != nil {
 		// Purging something that has not beet archived is not an error.
 		t.Fatal(err)
 	}
 	p.waitForIndexing()

-	outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"})
-	if err != nil {
-		t.Fatal(err)
-	}
+	outFPs = p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"})
 	want = nil
 	if !reflect.DeepEqual(outFPs, want) {
 		t.Errorf("want %#v, got %#v", want, outFPs)
 	}
-	outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"})
-	if err != nil {
-		t.Fatal(err)
-	}
+	outFPs = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"})
 	want = model.Fingerprints{2}
 	if !reflect.DeepEqual(outFPs, want) {
 		t.Errorf("want %#v, got %#v", want, outFPs)
 	}
-	if archived, _, _, err := p.hasArchivedMetric(1); err != nil || archived {
+	if archived, _, _ := p.hasArchivedMetric(1); archived {
 		t.Error("want FP 1 not archived")
 	}
-	if archived, _, _, err := p.hasArchivedMetric(2); err != nil || !archived {
+	if archived, _, _ := p.hasArchivedMetric(2); !archived {
 		t.Error("want FP 2 archived")
 	}
 }
@@ -983,9 +971,7 @@ func testIndexing(t *testing.T, encoding chunkEncoding) {
 	for i, b := range batches {
 		for fp, m := range b.fpToMetric {
 			p.indexMetric(fp, m)
-			if err := p.archiveMetric(fp, m, 1, 2); err != nil {
-				t.Fatal(err)
-			}
+			p.archiveMetric(fp, m, 1, 2)
 			indexedFpsToMetrics[fp] = m
 		}
 		verifyIndexedState(i, t, b, indexedFpsToMetrics, p)
@@ -1029,10 +1015,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet
 	}

 	// Check that archived metrics are in membership index.
-	has, first, last, err := p.hasArchivedMetric(fp)
-	if err != nil {
-		t.Fatal(err)
-	}
+	has, first, last := p.hasArchivedMetric(fp)
 	if !has {
 		t.Errorf("%d. fingerprint %v not found", i, fp)
 	}
@@ -1046,10 +1029,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet

 	// Compare label name -> label values mappings.
 	for ln, lvs := range b.expectedLnToLvs {
-		outLvs, err := p.labelValuesForLabelName(ln)
-		if err != nil {
-			t.Fatal(err)
-		}
+		outLvs := p.labelValuesForLabelName(ln)

 		outSet := codable.LabelValueSet{}
 		for _, lv := range outLvs {
@@ -1063,10 +1043,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet

 	// Compare label pair -> fingerprints mappings.
 	for lp, fps := range b.expectedLpToFps {
-		outFPs, err := p.fingerprintsForLabelPair(lp)
-		if err != nil {
-			t.Fatal(err)
-		}
+		outFPs := p.fingerprintsForLabelPair(lp)

 		outSet := codable.FingerprintSet{}
 		for _, fp := range outFPs {
@@ -557,12 +557,13 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa
 			return ZeroSamplePair
 		}
 		if containsT {
-			value, err := it.chunkIt.valueAtOrBeforeTime(t)
-			if err != nil {
-				it.quarantine(err)
-				return ZeroSamplePair
+			if it.chunkIt.findAtOrBefore(t) {
+				return it.chunkIt.value()
 			}
-			return value
+			if it.chunkIt.err() != nil {
+				it.quarantine(it.chunkIt.err())
+			}
+			return ZeroSamplePair
 		}
 	}

@@ -580,12 +581,13 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa
 		return ZeroSamplePair
 	}
 	it.chunkIt = it.chunkIterator(l - i)
-	value, err := it.chunkIt.valueAtOrBeforeTime(t)
-	if err != nil {
-		it.quarantine(err)
-		return ZeroSamplePair
+	if it.chunkIt.findAtOrBefore(t) {
+		return it.chunkIt.value()
 	}
-	return value
+	if it.chunkIt.err() != nil {
+		it.quarantine(it.chunkIt.err())
+	}
+	return ZeroSamplePair
 }

 // RangeValues implements SeriesIterator.
@@ -612,7 +614,7 @@ func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePa
 		if c.firstTime().After(in.NewestInclusive) {
 			break
 		}
-		chValues, err := it.chunkIterator(i + j).rangeValues(in)
+		chValues, err := rangeValues(it.chunkIterator(i+j), in)
 		if err != nil {
 			it.quarantine(err)
 			return nil
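The two ValueAtOrBeforeTime hunks above share one pattern: a failed find only counts as an error when err() is non-nil; otherwise the chunk simply holds no sample at or before t. Condensed as a sketch (lookupAtOrBefore is a hypothetical helper, not part of the commit):

// lookupAtOrBefore returns the most recent sample at or before t,
// ZeroSamplePair when the chunk has none, and any iterator error.
func lookupAtOrBefore(it chunkIterator, t model.Time) (model.SamplePair, error) {
	if it.findAtOrBefore(t) {
		return it.value(), nil
	}
	return ZeroSamplePair, it.err()
}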
@@ -129,12 +129,13 @@ const (
 type syncStrategy func() bool

 type memorySeriesStorage struct {
-	// numChunksToPersist has to be aligned for atomic operations.
-	numChunksToPersist int64         // The number of chunks waiting for persistence.
-	maxChunksToPersist int           // If numChunksToPersist reaches this threshold, ingestion will be throttled.
-	rushed             bool          // Whether the storage is in rushed mode.
-	rushedMtx          sync.Mutex    // Protects entering and exiting rushed mode.
-	throttled          chan struct{} // This chan is sent to whenever NeedsThrottling() returns true (for logging).
+	// archiveHighWatermark and numChunksToPersist have to be aligned for atomic operations.
+	archiveHighWatermark model.Time    // No archived series has samples after this time.
+	numChunksToPersist   int64         // The number of chunks waiting for persistence.
+	maxChunksToPersist   int           // If numChunksToPersist reaches this threshold, ingestion will be throttled.
+	rushed               bool          // Whether the storage is in rushed mode.
+	rushedMtx            sync.Mutex    // Protects entering and exiting rushed mode.
+	throttled            chan struct{} // This chan is sent to whenever NeedsThrottling() returns true (for logging).

 	fpLocker   *fingerprintLocker
 	fpToSeries *seriesMap
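The alignment comment is load-bearing: sync/atomic's 64-bit operations require 64-bit-aligned addresses on 32-bit platforms (x86-32, ARM), and Go only guarantees that alignment for the first word of an allocated struct. Keeping archiveHighWatermark and numChunksToPersist at the top of the struct preserves that guarantee. A hypothetical illustration of the rule:

type alignedCounters struct {
	ops  int64  // Accessed via sync/atomic; must stay first in the struct.
	name string // Non-atomic fields follow.
}

func bump(c *alignedCounters) {
	atomic.AddInt64(&c.ops, 1) // Safe on 32-bit platforms only because ops is first.
}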
@@ -158,15 +159,15 @@ type memorySeriesStorage struct {
 	quarantineRequests                    chan quarantineRequest
 	quarantineStopping, quarantineStopped chan struct{}

 	persistErrors                 prometheus.Counter
 	numSeries                     prometheus.Gauge
 	seriesOps                     *prometheus.CounterVec
 	ingestedSamplesCount          prometheus.Counter
 	outOfOrderSamplesCount        prometheus.Counter
-	invalidPreloadRequestsCount   prometheus.Counter
+	nonExistentSeriesMatchesCount prometheus.Counter
 	maintainSeriesDuration        *prometheus.SummaryVec
 	persistenceUrgencyScore       prometheus.Gauge
 	rushedMode                    prometheus.Gauge
 }

 // MemorySeriesStorageOptions contains options needed by
@@ -201,6 +202,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage {
 		dropAfter:                  o.PersistenceRetentionPeriod,
 		checkpointInterval:         o.CheckpointInterval,
 		checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,
+		archiveHighWatermark:       model.Now().Add(-headChunkTimeout),

 		maxChunksToPersist: o.MaxChunksToPersist,
@@ -246,11 +248,11 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage {
 			Name:      "out_of_order_samples_total",
 			Help:      "The total number of samples that were discarded because their timestamps were at or before the last received sample for a series.",
 		}),
-		invalidPreloadRequestsCount: prometheus.NewCounter(prometheus.CounterOpts{
+		nonExistentSeriesMatchesCount: prometheus.NewCounter(prometheus.CounterOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
-			Name:      "invalid_preload_requests_total",
-			Help:      "The total number of preload requests referring to a non-existent series. This is an indication of outdated label indexes.",
+			Name:      "non_existent_series_matches_total",
+			Help:      "How often a non-existent series was referred to during label matching or chunk preloading. This is an indication of outdated label indexes.",
 		}),
 		maintainSeriesDuration: prometheus.NewSummaryVec(
 			prometheus.SummaryOpts{
@@ -368,15 +370,20 @@ func (s *memorySeriesStorage) WaitForIndexing() {
 }

 // LastSampleForFingerprint implements Storage.
-func (s *memorySeriesStorage) LastSamplePairForFingerprint(fp model.Fingerprint) model.SamplePair {
+func (s *memorySeriesStorage) LastSampleForFingerprint(fp model.Fingerprint) model.Sample {
 	s.fpLocker.Lock(fp)
 	defer s.fpLocker.Unlock(fp)

 	series, ok := s.fpToSeries.get(fp)
 	if !ok {
-		return ZeroSamplePair
+		return ZeroSample
+	}
+	sp := series.lastSamplePair()
+	return model.Sample{
+		Metric:    series.metric,
+		Value:     sp.Value,
+		Timestamp: sp.Timestamp,
 	}
-	return series.lastSamplePair()
 }

 // boundedIterator wraps a SeriesIterator and does not allow fetching
@@ -418,10 +425,7 @@ func (s *memorySeriesStorage) fingerprintsForLabelPairs(pairs ...model.LabelPair
 	var result map[model.Fingerprint]struct{}
 	for _, pair := range pairs {
 		intersection := map[model.Fingerprint]struct{}{}
-		fps, err := s.persistence.fingerprintsForLabelPair(pair)
-		if err != nil {
-			log.Error("Error getting fingerprints for label pair: ", err)
-		}
+		fps := s.persistence.fingerprintsForLabelPair(pair)
 		if len(fps) == 0 {
 			return nil
 		}
@@ -439,7 +443,10 @@ func (s *memorySeriesStorage) fingerprintsForLabelPairs(pairs ...model.LabelPair
 }

 // MetricsForLabelMatchers implements Storage.
-func (s *memorySeriesStorage) MetricsForLabelMatchers(matchers ...*metric.LabelMatcher) map[model.Fingerprint]metric.Metric {
+func (s *memorySeriesStorage) MetricsForLabelMatchers(
+	from, through model.Time,
+	matchers ...*metric.LabelMatcher,
+) map[model.Fingerprint]metric.Metric {
 	var (
 		equals  []model.LabelPair
 		filters []*metric.LabelMatcher
@@ -491,9 +498,13 @@ func (s *memorySeriesStorage) MetricsForLabelMatchers(matchers ...*metric.LabelM
 		filters = remaining
 	}

-	result := make(map[model.Fingerprint]metric.Metric, len(resFPs))
+	result := map[model.Fingerprint]metric.Metric{}
 	for fp := range resFPs {
-		result[fp] = s.MetricForFingerprint(fp)
+		s.fpLocker.Lock(fp)
+		if met, _, ok := s.metricForRange(fp, from, through); ok {
+			result[fp] = metric.Metric{Metric: met}
+		}
+		s.fpLocker.Unlock(fp)
 	}
 	for _, matcher := range filters {
 		for fp, met := range result {
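Every caller of MetricsForLabelMatchers now has to say which time window it cares about, which is what lets the method skip series with no samples in range. A hypothetical call site (storage and matchers stand in for real values); passing model.Earliest and model.Latest reproduces the old, unbounded behaviour:

now := model.Now()
res := storage.MetricsForLabelMatchers(
	now.Add(-1*time.Hour), now, // Only series with samples in the last hour.
	matchers...,
)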
@@ -505,37 +516,55 @@ func (s *memorySeriesStorage) MetricsForLabelMatchers(matchers ...*metric.LabelM
 	return result
 }

-// LabelValuesForLabelName implements Storage.
-func (s *memorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) model.LabelValues {
-	lvs, err := s.persistence.labelValuesForLabelName(labelName)
-	if err != nil {
-		log.Errorf("Error getting label values for label name %q: %v", labelName, err)
-	}
-	return lvs
-}
-
-// MetricForFingerprint implements Storage.
-func (s *memorySeriesStorage) MetricForFingerprint(fp model.Fingerprint) metric.Metric {
-	s.fpLocker.Lock(fp)
-	defer s.fpLocker.Unlock(fp)
-
+// metricForRange returns the metric for the given fingerprint if the
+// corresponding time series has samples between 'from' and 'through', together
+// with a pointer to the series if it is in memory already. For a series that
+// does not have samples between 'from' and 'through', the returned bool is
+// false. For an archived series that does contain samples between 'from' and
+// 'through', it returns (metric, nil, true).
+//
+// The caller must have locked the fp.
+func (s *memorySeriesStorage) metricForRange(
+	fp model.Fingerprint,
+	from, through model.Time,
+) (model.Metric, *memorySeries, bool) {
 	series, ok := s.fpToSeries.get(fp)
 	if ok {
-		// Wrap the returned metric in a copy-on-write (COW) metric here because
-		// the caller might mutate it.
-		return metric.Metric{
-			Metric: series.metric,
+		if series.lastTime.Before(from) || series.firstTime().After(through) {
+			return nil, nil, false
 		}
+		return series.metric, series, true
+	}
+	// From here on, we are only concerned with archived metrics.
+	// If the high watermark of archived series is before 'from', we are done.
+	watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark)))
+	if watermark < from {
+		return nil, nil, false
+	}
+	if from.After(model.Earliest) || through.Before(model.Latest) {
+		// The range lookup is relatively cheap, so let's do it first if
+		// we have a chance the archived metric is not in the range.
+		has, first, last := s.persistence.hasArchivedMetric(fp)
+		if !has {
+			s.nonExistentSeriesMatchesCount.Inc()
+			return nil, nil, false
+		}
+		if first.After(through) || last.Before(from) {
+			return nil, nil, false
+		}
 	}
-	met, err := s.persistence.archivedMetric(fp)
-	if err != nil {
-		log.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err)
-	}

-	return metric.Metric{
-		Metric: met,
-		Copied: false,
+	metric, err := s.persistence.archivedMetric(fp)
+	if err != nil {
+		// archivedMetric has already flagged the storage as dirty in this case.
+		return nil, nil, false
 	}
+	return metric, nil, true
+}
+
+// LabelValuesForLabelName implements Storage.
+func (s *memorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) model.LabelValues {
+	return s.persistence.labelValuesForLabelName(labelName)
 }

 // DropMetric implements Storage.
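The atomic.LoadInt64((*int64)(&s.archiveHighWatermark)) conversion works because model.Time is declared with int64 as its underlying type, so a *model.Time converts directly to *int64. The same trick in isolation, on a stand-in type:

type Time int64 // Stand-in for model.Time, an int64 millisecond count.

var watermark Time

func loadWatermark() Time {
	return Time(atomic.LoadInt64((*int64)(&watermark)))
}

func storeWatermark(t Time) {
	atomic.StoreInt64((*int64)(&watermark), int64(t))
}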
@@ -563,7 +592,7 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error {
 		s.fpLocker.Unlock(fp)
 	}() // Func wrapper because fp might change below.
 	if err != nil {
-		s.persistence.setDirty(true, fmt.Errorf("error while mapping fingerprint %v: %s", rawFP, err))
+		s.persistence.setDirty(fmt.Errorf("error while mapping fingerprint %v: %s", rawFP, err))
 		return err
 	}
 	if fp != rawFP {
@@ -696,36 +725,20 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me
 	return series, nil
 }

-// getSeriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant.
-func (s *memorySeriesStorage) getSeriesForRange(
+// seriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant.
+//
+// The caller must have locked the fp.
+func (s *memorySeriesStorage) seriesForRange(
 	fp model.Fingerprint,
 	from model.Time, through model.Time,
 ) *memorySeries {
-	series, ok := s.fpToSeries.get(fp)
-	if ok {
-		return series
-	}
-	has, first, last, err := s.persistence.hasArchivedMetric(fp)
-	if err != nil {
-		log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.")
-		return nil
-	}
-	if !has {
-		s.invalidPreloadRequestsCount.Inc()
-		return nil
-	}
-	if last.Before(from) || first.After(through) {
-		return nil
-	}
-	metric, err := s.persistence.archivedMetric(fp)
-	if err != nil {
-		log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.")
+	metric, series, ok := s.metricForRange(fp, from, through)
+	if !ok {
 		return nil
 	}
-	series, err = s.getOrCreateSeries(fp, metric)
-	if err != nil {
-		// getOrCreateSeries took care of quarantining already.
-		return nil
+	if series == nil {
+		series, _ = s.getOrCreateSeries(fp, metric)
+		// getOrCreateSeries took care of quarantining already, so ignore the error.
 	}
 	return series
 }
@@ -737,7 +750,7 @@ func (s *memorySeriesStorage) preloadChunksForRange(
 	s.fpLocker.Lock(fp)
 	defer s.fpLocker.Unlock(fp)

-	series := s.getSeriesForRange(fp, from, through)
+	series := s.seriesForRange(fp, from, through)
 	if series == nil {
 		return nil, nopIter
 	}
@@ -756,7 +769,7 @@ func (s *memorySeriesStorage) preloadChunksForInstant(
 	s.fpLocker.Lock(fp)
 	defer s.fpLocker.Unlock(fp)

-	series := s.getSeriesForRange(fp, from, through)
+	series := s.seriesForRange(fp, from, through)
 	if series == nil {
 		return nil, nopIter
 	}
@@ -1107,17 +1120,22 @@ func (s *memorySeriesStorage) maintainMemorySeries(
 		}
 	}

-	// Archive if all chunks are evicted.
-	if iOldestNotEvicted == -1 {
+	// Archive if all chunks are evicted. Also make sure the last sample has
+	// an age of at least headChunkTimeout (which is very likely anyway).
+	if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > headChunkTimeout {
 		s.fpToSeries.del(fp)
 		s.numSeries.Dec()
-		if err := s.persistence.archiveMetric(
-			fp, series.metric, series.firstTime(), series.lastTime,
-		); err != nil {
-			log.Errorf("Error archiving metric %v: %v", series.metric, err)
-			return
-		}
+		s.persistence.archiveMetric(fp, series.metric, series.firstTime(), series.lastTime)
 		s.seriesOps.WithLabelValues(archive).Inc()
+		oldWatermark := atomic.LoadInt64((*int64)(&s.archiveHighWatermark))
+		if oldWatermark < int64(series.lastTime) {
+			if !atomic.CompareAndSwapInt64(
+				(*int64)(&s.archiveHighWatermark),
+				oldWatermark, int64(series.lastTime),
+			) {
+				panic("s.archiveHighWatermark modified outside of maintainMemorySeries")
+			}
+		}
 		return
 	}
 	// If we are here, the series is not archived, so check for chunkDesc
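maintainMemorySeries advances the watermark with a load followed by a single compare-and-swap, and panics if the CAS fails, because it is meant to be the only writer. With multiple writers the same "raise to at least" pattern becomes a retry loop; a general-purpose sketch:

func advanceToAtLeast(addr *int64, val int64) {
	for {
		old := atomic.LoadInt64(addr)
		// Done if the watermark is already high enough or our CAS wins;
		// otherwise another writer moved it, so retry against the new value.
		if old >= val || atomic.CompareAndSwapInt64(addr, old, val) {
			return
		}
	}
}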
@@ -1228,11 +1246,7 @@ func (s *memorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, befor
 	s.fpLocker.Lock(fp)
 	defer s.fpLocker.Unlock(fp)

-	has, firstTime, lastTime, err := s.persistence.hasArchivedMetric(fp)
-	if err != nil {
-		log.Error("Error looking up archived time range: ", err)
-		return
-	}
+	has, firstTime, lastTime := s.persistence.hasArchivedMetric(fp)
 	if !has || !firstTime.Before(beforeTime) {
 		// Oldest sample not old enough, or metric purged or unarchived in the meantime.
 		return
@@ -1245,10 +1259,7 @@ func (s *memorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, befor
 		log.Error("Error dropping persisted chunks: ", err)
 	}
 	if allDropped {
-		if err := s.persistence.purgeArchivedMetric(fp); err != nil {
-			log.Errorf("Error purging archived metric for fingerprint %v: %v", fp, err)
-			return
-		}
+		s.persistence.purgeArchivedMetric(fp) // Ignoring error. Nothing we can do.
 		s.seriesOps.WithLabelValues(archivePurge).Inc()
 		return
 	}
@@ -1437,13 +1448,7 @@ func (s *memorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric,
 		s.incNumChunksToPersist(-numChunksNotYetPersisted)

 	} else {
-		if err := s.persistence.purgeArchivedMetric(fp); err != nil {
-			log.
-				With("fingerprint", fp).
-				With("metric", m).
-				With("error", err).
-				Error("Error purging metric from archive.")
-		}
+		s.persistence.purgeArchivedMetric(fp) // Ignoring error. There is nothing we can do.
 	}
 	if m != nil {
 		// If we know a metric now, unindex it in any case.
@@ -1491,7 +1496,7 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
 	s.seriesOps.Describe(ch)
 	ch <- s.ingestedSamplesCount.Desc()
 	ch <- s.outOfOrderSamplesCount.Desc()
-	ch <- s.invalidPreloadRequestsCount.Desc()
+	ch <- s.nonExistentSeriesMatchesCount.Desc()
 	ch <- numMemChunksDesc
 	s.maintainSeriesDuration.Describe(ch)
 	ch <- s.persistenceUrgencyScore.Desc()
@@ -1518,7 +1523,7 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
 	s.seriesOps.Collect(ch)
 	ch <- s.ingestedSamplesCount
 	ch <- s.outOfOrderSamplesCount
-	ch <- s.invalidPreloadRequestsCount
+	ch <- s.nonExistentSeriesMatchesCount
 	ch <- prometheus.MustNewConstMetric(
 		numMemChunksDesc,
 		prometheus.GaugeValue,
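Describe and Collect are the two methods of the prometheus.Collector interface, so the renamed counter travels with the storage wherever the storage itself is registered. A sketch of what that registration looks like:

var _ prometheus.Collector = &memorySeriesStorage{} // Compile-time interface check.

func registerStorage(s *memorySeriesStorage) {
	prometheus.MustRegister(s) // Exposes every metric emitted by Collect.
}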
@@ -34,6 +34,7 @@ func TestMatches(t *testing.T) {
 	storage, closer := NewTestStorage(t, 1)
 	defer closer.Close()

+	storage.archiveHighWatermark = 90
 	samples := make([]*model.Sample, 100)
 	fingerprints := make(model.Fingerprints, 100)
@@ -56,6 +57,20 @@ func TestMatches(t *testing.T) {
 	}
 	storage.WaitForIndexing()

+	// Archive every tenth metric.
+	for i, fp := range fingerprints {
+		if i%10 != 0 {
+			continue
+		}
+		s, ok := storage.fpToSeries.get(fp)
+		if !ok {
+			t.Fatal("could not retrieve series for fp", fp)
+		}
+		storage.fpLocker.Lock(fp)
+		storage.persistence.archiveMetric(fp, s.metric, s.firstTime(), s.lastTime)
+		storage.fpLocker.Unlock(fp)
+	}
+
 	newMatcher := func(matchType metric.MatchType, name model.LabelName, value model.LabelValue) *metric.LabelMatcher {
 		lm, err := metric.NewLabelMatcher(matchType, name, value)
 		if err != nil {
@@ -178,7 +193,10 @@ func TestMatches(t *testing.T) {
 	}

 	for _, mt := range matcherTests {
-		res := storage.MetricsForLabelMatchers(mt.matchers...)
+		res := storage.MetricsForLabelMatchers(
+			model.Earliest, model.Latest,
+			mt.matchers...,
+		)
 		if len(mt.expected) != len(res) {
 			t.Fatalf("expected %d matches for %q, found %d", len(mt.expected), mt.matchers, len(res))
 		}
@@ -194,6 +212,56 @@ func TestMatches(t *testing.T) {
 				t.Errorf("expected fingerprint %s for %q not in result", fp1, mt.matchers)
 			}
 		}
+		// Smoketest for from/through.
+		if len(storage.MetricsForLabelMatchers(
+			model.Earliest, -10000,
+			mt.matchers...,
+		)) > 0 {
+			t.Error("expected no matches with 'through' older than any sample")
+		}
+		if len(storage.MetricsForLabelMatchers(
+			10000, model.Latest,
+			mt.matchers...,
+		)) > 0 {
+			t.Error("expected no matches with 'from' newer than any sample")
+		}
+		// Now the tricky one, cut out something from the middle.
+		var (
+			from    model.Time = 25
+			through model.Time = 75
+		)
+		res = storage.MetricsForLabelMatchers(
+			from, through,
+			mt.matchers...,
+		)
+		expected := model.Fingerprints{}
+		for _, fp := range mt.expected {
+			i := 0
+			for ; fingerprints[i] != fp && i < len(fingerprints); i++ {
+			}
+			if i == len(fingerprints) {
+				t.Fatal("expected fingerprint does not exist")
+			}
+			if !model.Time(i).Before(from) && !model.Time(i).After(through) {
+				expected = append(expected, fp)
+			}
+		}
+		if len(expected) != len(res) {
+			t.Errorf("expected %d range-limited matches for %q, found %d", len(expected), mt.matchers, len(res))
+		}
+		for fp1 := range res {
+			found := false
+			for _, fp2 := range expected {
+				if fp1 == fp2 {
+					found = true
+					break
+				}
+			}
+			if !found {
+				t.Errorf("expected fingerprint %s for %q not in range-limited result", fp1, mt.matchers)
+			}
+		}
 	}
 }
@@ -362,7 +430,10 @@ func BenchmarkLabelMatching(b *testing.B) {
 	for i := 0; i < b.N; i++ {
 		benchLabelMatchingRes = map[model.Fingerprint]metric.Metric{}
 		for _, mt := range matcherTests {
-			benchLabelMatchingRes = s.MetricsForLabelMatchers(mt...)
+			benchLabelMatchingRes = s.MetricsForLabelMatchers(
+				model.Earliest, model.Latest,
+				mt...,
+			)
 		}
 	}
 	// Stop timer to not count the storage closing.
@@ -465,11 +536,7 @@ func TestDropMetrics(t *testing.T) {
 	s.maintainMemorySeries(fpToBeArchived, 0)
 	s.fpLocker.Lock(fpToBeArchived)
 	s.fpToSeries.del(fpToBeArchived)
-	if err := s.persistence.archiveMetric(
-		fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond),
-	); err != nil {
-		t.Error(err)
-	}
+	s.persistence.archiveMetric(fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond))
 	s.fpLocker.Unlock(fpToBeArchived)

 	fps := s.fingerprintsForLabelPairs(model.LabelPair{Name: model.MetricNameLabel, Value: "test"})
@@ -576,11 +643,7 @@ func TestQuarantineMetric(t *testing.T) {
 	s.maintainMemorySeries(fpToBeArchived, 0)
 	s.fpLocker.Lock(fpToBeArchived)
 	s.fpToSeries.del(fpToBeArchived)
-	if err := s.persistence.archiveMetric(
-		fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond),
-	); err != nil {
-		t.Error(err)
-	}
+	s.persistence.archiveMetric(fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond))
 	s.fpLocker.Unlock(fpToBeArchived)

 	// Corrupt the series file for m3.
@@ -692,11 +755,12 @@ func testChunk(t *testing.T, encoding chunkEncoding) {
 			if cd.isEvicted() {
 				continue
 			}
-			for sample := range cd.c.newIterator().values() {
-				if sample.error != nil {
-					t.Error(sample.error)
-				}
-				values = append(values, sample.SamplePair)
+			it := cd.c.newIterator()
+			for it.scan() {
+				values = append(values, it.value())
+			}
+			if it.err() != nil {
+				t.Error(it.err())
 			}
 		}
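The test now drains the iterator bufio.Scanner-style: scan advances, value reads, and err is checked once after the loop, replacing the old channel of sample-or-error values. The same idiom as a reusable helper sketch:

func drainIterator(it chunkIterator) ([]model.SamplePair, error) {
	var values []model.SamplePair
	for it.scan() {
		values = append(values, it.value())
	}
	// A scan that stopped early due to a decoding error surfaces here.
	return values, it.err()
}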
@@ -1137,36 +1201,22 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	if err := s.persistence.archiveMetric(
-		fp, series.metric, series.firstTime(), lastTime,
-	); err != nil {
-		t.Fatal(err)
-	}
-
-	archived, _, _, err := s.persistence.hasArchivedMetric(fp)
-	if err != nil {
-		t.Fatal(err)
-	}
+	s.persistence.archiveMetric(fp, series.metric, series.firstTime(), lastTime)
+	archived, _, _ := s.persistence.hasArchivedMetric(fp)
 	if !archived {
 		t.Fatal("not archived")
 	}

 	// Drop ~half of the chunks of an archived series.
 	s.maintainArchivedSeries(fp, 10000)
-	archived, _, _, err = s.persistence.hasArchivedMetric(fp)
-	if err != nil {
-		t.Fatal(err)
-	}
+	archived, _, _ = s.persistence.hasArchivedMetric(fp)
 	if !archived {
 		t.Fatal("archived series purged although only half of the chunks dropped")
 	}

 	// Drop everything.
 	s.maintainArchivedSeries(fp, 100000)
-	archived, _, _, err = s.persistence.hasArchivedMetric(fp)
-	if err != nil {
-		t.Fatal(err)
-	}
+	archived, _, _ = s.persistence.hasArchivedMetric(fp)
 	if archived {
 		t.Fatal("archived series not dropped")
 	}
@@ -1192,16 +1242,8 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	if err := s.persistence.archiveMetric(
-		fp, series.metric, series.firstTime(), lastTime,
-	); err != nil {
-		t.Fatal(err)
-	}
-
-	archived, _, _, err = s.persistence.hasArchivedMetric(fp)
-	if err != nil {
-		t.Fatal(err)
-	}
+	s.persistence.archiveMetric(fp, series.metric, series.firstTime(), lastTime)
+	archived, _, _ = s.persistence.hasArchivedMetric(fp)
 	if !archived {
 		t.Fatal("not archived")
 	}
@@ -1213,24 +1255,25 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
 	if !ok {
 		t.Fatal("could not find series")
 	}
-	archived, _, _, err = s.persistence.hasArchivedMetric(fp)
-	if err != nil {
-		t.Fatal(err)
-	}
+	archived, _, _ = s.persistence.hasArchivedMetric(fp)
 	if archived {
 		t.Fatal("archived")
 	}

+	// Set archiveHighWatermark to a low value so that we can see it increase.
+	s.archiveHighWatermark = 42
+
 	// This will archive again, but must not drop it completely, despite the
 	// memorySeries being empty.
 	s.maintainMemorySeries(fp, 10000)
-	archived, _, _, err = s.persistence.hasArchivedMetric(fp)
-	if err != nil {
-		t.Fatal(err)
-	}
+	archived, _, _ = s.persistence.hasArchivedMetric(fp)
 	if !archived {
 		t.Fatal("series purged completely")
 	}
+	// archiveHighWatermark must have been set by maintainMemorySeries.
+	if want, got := model.Time(19998), s.archiveHighWatermark; want != got {
+		t.Errorf("want archiveHighWatermark %v, got %v", want, got)
+	}
 }

 func TestEvictAndPurgeSeriesChunkType0(t *testing.T) {
@@ -21,6 +21,7 @@ package local
 import (
 	"time"

+	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/util/testutil"
 )
@@ -51,6 +52,7 @@ func NewTestStorage(t testutil.T, encoding chunkEncoding) (*memorySeriesStorage,
 		SyncStrategy: Adaptive,
 	}
 	storage := NewMemorySeriesStorage(o)
+	storage.(*memorySeriesStorage).archiveHighWatermark = model.Latest
 	if err := storage.Start(); err != nil {
 		directory.Close()
 		t.Fatalf("Error creating storage: %s", err)
@@ -226,7 +226,10 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) {
 		if err != nil {
 			return nil, &apiError{errorBadData, err}
 		}
-		for fp, met := range api.Storage.MetricsForLabelMatchers(matchers...) {
+		for fp, met := range api.Storage.MetricsForLabelMatchers(
+			model.Earliest, model.Latest, // Get every series.
+			matchers...,
+		) {
 			res[fp] = met
 		}
 	}
@@ -250,7 +253,10 @@ func (api *API) dropSeries(r *http.Request) (interface{}, *apiError) {
 		if err != nil {
 			return nil, &apiError{errorBadData, err}
 		}
-		for fp := range api.Storage.MetricsForLabelMatchers(matchers...) {
+		for fp := range api.Storage.MetricsForLabelMatchers(
+			model.Earliest, model.Latest, // Get every series.
+			matchers...,
+		) {
 			fps[fp] = struct{}{}
 		}
 	}
@@ -19,7 +19,6 @@ import (
 	"github.com/golang/protobuf/proto"

 	"github.com/prometheus/prometheus/promql"
-	"github.com/prometheus/prometheus/storage/metric"

 	"github.com/prometheus/common/expfmt"
 	"github.com/prometheus/common/model"
@@ -33,7 +32,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {

 	req.ParseForm()

-	metrics := map[model.Fingerprint]metric.Metric{}
+	fps := map[model.Fingerprint]struct{}{}

 	for _, s := range req.Form["match[]"] {
 		matchers, err := promql.ParseMetricSelector(s)
@@ -41,8 +40,11 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
 			http.Error(w, err.Error(), http.StatusBadRequest)
 			return
 		}
-		for fp, met := range h.storage.MetricsForLabelMatchers(matchers...) {
-			metrics[fp] = met
+		for fp := range h.storage.MetricsForLabelMatchers(
+			model.Now().Add(-promql.StalenessDelta), model.Latest,
+			matchers...,
+		) {
+			fps[fp] = struct{}{}
 		}
 	}
@@ -62,19 +64,19 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
 		Type: dto.MetricType_UNTYPED.Enum(),
 	}

-	for fp, met := range metrics {
+	for fp := range fps {
 		globalUsed := map[model.LabelName]struct{}{}

-		sp := h.storage.LastSamplePairForFingerprint(fp)
+		s := h.storage.LastSampleForFingerprint(fp)
 		// Discard if sample does not exist or lays before the staleness interval.
-		if sp.Timestamp.Before(minTimestamp) {
+		if s.Timestamp.Before(minTimestamp) {
 			continue
 		}

 		// Reset label slice.
 		protMetric.Label = protMetric.Label[:0]

-		for ln, lv := range met.Metric {
+		for ln, lv := range s.Metric {
 			if ln == model.MetricNameLabel {
 				protMetricFam.Name = proto.String(string(lv))
 				continue
@@ -98,8 +100,8 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
 			}
 		}

-		protMetric.TimestampMs = proto.Int64(int64(sp.Timestamp))
-		protMetric.Untyped.Value = proto.Float64(float64(sp.Value))
+		protMetric.TimestampMs = proto.Int64(int64(s.Timestamp))
+		protMetric.Untyped.Value = proto.Float64(float64(s.Value))

 		if err := enc.Encode(protMetricFam); err != nil {
 			http.Error(w, err.Error(), http.StatusInternalServerError)
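Federation now tracks only fingerprints and fetches the final sample per series via LastSampleForFingerprint. A single Before(minTimestamp) check covers both failure modes, because ZeroSample, returned for unknown fingerprints, presumably carries model.Earliest as its timestamp. A sketch of the filter in isolation (fps and h as in the hunks above):

minTimestamp := model.Now().Add(-promql.StalenessDelta)
for fp := range fps {
	s := h.storage.LastSampleForFingerprint(fp)
	if s.Timestamp.Before(minTimestamp) {
		continue // Unknown fingerprint, or sample too stale to federate.
	}
	// ... encode s.Metric, s.Value, and s.Timestamp ...
}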