Merge pull request #1424 from prometheus/beorn7/storage2

Move and improve lastSamplePair.
This commit is contained in:
Björn Rabenstein 2016-03-09 17:16:44 +01:00
commit c088b2669b
20 changed files with 1272 additions and 653 deletions

View file

@ -41,14 +41,20 @@ type Analyzer struct {
// fingerprints. One of these structs is collected for each offset by the query
// analyzer.
type preloadTimes struct {
// Instants require single samples to be loaded along the entire query
// range, with intervals between the samples corresponding to the query
// resolution.
instants map[model.Fingerprint]struct{}
// Ranges require loading a range of samples at each resolution step,
// stretching backwards from the current evaluation timestamp. The length of
// the range into the past is given by the duration, as in "foo[5m]".
// Ranges require loading a range of samples. They can be triggered by
// two type of expressions: First a range expression AKA matrix
// selector, where the Duration in the ranges map is the length of the
// range in the range expression. Second an instant expression AKA
// vector selector, where the Duration in the ranges map is the
// StalenessDelta. In preloading, both types of expressions result in
// the same effect: Preload everything between the specified start time
// minus the Duration in the ranges map up to the specified end time.
ranges map[model.Fingerprint]time.Duration
// Instants require a single sample to be loaded. This only happens for
// instant expressions AKA vector selectors iff the specified start ond
// end time are the same, Thus, instants is only populated if start and
// end time are the same.
instants map[model.Fingerprint]struct{}
}
// Analyze the provided expression and attach metrics and fingerprints to data-selecting
@ -57,13 +63,15 @@ func (a *Analyzer) Analyze(ctx context.Context) error {
a.offsetPreloadTimes = map[time.Duration]preloadTimes{}
getPreloadTimes := func(offset time.Duration) preloadTimes {
if _, ok := a.offsetPreloadTimes[offset]; !ok {
a.offsetPreloadTimes[offset] = preloadTimes{
instants: map[model.Fingerprint]struct{}{},
ranges: map[model.Fingerprint]time.Duration{},
}
if pt, ok := a.offsetPreloadTimes[offset]; ok {
return pt
}
return a.offsetPreloadTimes[offset]
pt := preloadTimes{
instants: map[model.Fingerprint]struct{}{},
ranges: map[model.Fingerprint]time.Duration{},
}
a.offsetPreloadTimes[offset] = pt
return pt
}
// Retrieve fingerprints and metrics for the required time range for
@ -76,11 +84,14 @@ func (a *Analyzer) Analyze(ctx context.Context) error {
pt := getPreloadTimes(n.Offset)
for fp := range n.metrics {
// Only add the fingerprint to the instants if not yet present in the
// ranges. Ranges always contain more points and span more time than
// instants for the same offset.
if _, alreadyInRanges := pt.ranges[fp]; !alreadyInRanges {
r, alreadyInRanges := pt.ranges[fp]
if a.Start.Equal(a.End) && !alreadyInRanges {
// A true instant, we only need one value.
pt.instants[fp] = struct{}{}
continue
}
if r < StalenessDelta {
pt.ranges[fp] = StalenessDelta
}
}
case *MatrixSelector:
@ -135,35 +146,13 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) {
if err = contextDone(ctx, env); err != nil {
return nil, err
}
startOfRange := start.Add(-rangeDuration)
if StalenessDelta > rangeDuration {
// Cover a weird corner case: The expression
// mixes up instants and ranges for the same
// series. We'll handle that over-all as
// range. But if the rangeDuration is smaller
// than the StalenessDelta, the range wouldn't
// cover everything potentially needed for the
// instant, so we have to extend startOfRange.
startOfRange = start.Add(-StalenessDelta)
}
iter, err := p.PreloadRange(fp, startOfRange, end)
if err != nil {
return nil, err
}
itersForDuration[fp] = iter
itersForDuration[fp] = p.PreloadRange(fp, start.Add(-rangeDuration), end)
}
for fp := range pt.instants {
if err = contextDone(ctx, env); err != nil {
return nil, err
}
// Need to look backwards by StalenessDelta but not
// forward because we always return the closest sample
// _before_ the reference time.
iter, err := p.PreloadRange(fp, start.Add(-StalenessDelta), end)
if err != nil {
return nil, err
}
itersForDuration[fp] = iter
itersForDuration[fp] = p.PreloadInstant(fp, start, StalenessDelta)
}
}

View file

@ -575,15 +575,6 @@ func (ev *evaluator) evalMatrix(e Expr) matrix {
return mat
}
// evalMatrixBounds attempts to evaluate e to matrix boundaries and errors otherwise.
func (ev *evaluator) evalMatrixBounds(e Expr) matrix {
ms, ok := e.(*MatrixSelector)
if !ok {
ev.errorf("matrix bounds can only be evaluated for matrix selectors, got %T", e)
}
return ev.matrixSelectorBounds(ms)
}
// evalString attempts to evaluate e to a string value and errors otherwise.
func (ev *evaluator) evalString(e Expr) *model.String {
val := ev.eval(e)
@ -731,29 +722,6 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) matrix {
return matrix(sampleStreams)
}
// matrixSelectorBounds evaluates the boundaries of a *MatrixSelector.
func (ev *evaluator) matrixSelectorBounds(node *MatrixSelector) matrix {
interval := metric.Interval{
OldestInclusive: ev.Timestamp.Add(-node.Range - node.Offset),
NewestInclusive: ev.Timestamp.Add(-node.Offset),
}
sampleStreams := make([]*sampleStream, 0, len(node.iterators))
for fp, it := range node.iterators {
samplePairs := it.BoundaryValues(interval)
if len(samplePairs) == 0 {
continue
}
ss := &sampleStream{
Metric: node.metrics[fp],
Values: samplePairs,
}
sampleStreams = append(sampleStreams, ss)
}
return matrix(sampleStreams)
}
func (ev *evaluator) vectorAnd(lhs, rhs vector, matching *VectorMatching) vector {
if matching.Card != CardManyToMany {
panic("logical operations must always be many-to-many matching")

View file

@ -524,10 +524,37 @@ func funcLog10(ev *evaluator, args Expressions) model.Value {
return vector
}
// linearRegression performs a least-square linear regression analysis on the
// provided SamplePairs. It returns the slope, and the intercept value at the
// provided time.
func linearRegression(samples []model.SamplePair, interceptTime model.Time) (slope, intercept model.SampleValue) {
var (
n model.SampleValue
sumX, sumY model.SampleValue
sumXY, sumX2 model.SampleValue
)
for _, sample := range samples {
x := model.SampleValue(
model.Time(sample.Timestamp-interceptTime).UnixNano(),
) / 1e9
n += 1.0
sumY += sample.Value
sumX += x
sumXY += x * sample.Value
sumX2 += x * x
}
covXY := sumXY - sumX*sumY/n
varX := sumX2 - sumX*sumX/n
slope = covXY / varX
intercept = sumY/n - slope*sumX/n
return slope, intercept
}
// === deriv(node model.ValMatrix) Vector ===
func funcDeriv(ev *evaluator, args Expressions) model.Value {
resultVector := vector{}
mat := ev.evalMatrix(args[0])
resultVector := make(vector, 0, len(mat))
for _, samples := range mat {
// No sense in trying to compute a derivative without at least two points.
@ -535,29 +562,10 @@ func funcDeriv(ev *evaluator, args Expressions) model.Value {
if len(samples.Values) < 2 {
continue
}
// Least squares.
var (
n model.SampleValue
sumX, sumY model.SampleValue
sumXY, sumX2 model.SampleValue
)
for _, sample := range samples.Values {
x := model.SampleValue(sample.Timestamp.UnixNano() / 1e9)
n += 1.0
sumY += sample.Value
sumX += x
sumXY += x * sample.Value
sumX2 += x * x
}
numerator := sumXY - sumX*sumY/n
denominator := sumX2 - (sumX*sumX)/n
resultValue := numerator / denominator
slope, _ := linearRegression(samples.Values, 0)
resultSample := &sample{
Metric: samples.Metric,
Value: resultValue,
Value: slope,
Timestamp: ev.Timestamp,
}
resultSample.Metric.Del(model.MetricNameLabel)
@ -568,44 +576,26 @@ func funcDeriv(ev *evaluator, args Expressions) model.Value {
// === predict_linear(node model.ValMatrix, k model.ValScalar) Vector ===
func funcPredictLinear(ev *evaluator, args Expressions) model.Value {
vec := funcDeriv(ev, args[0:1]).(vector)
duration := model.SampleValue(model.SampleValue(ev.evalFloat(args[1])))
mat := ev.evalMatrix(args[0])
resultVector := make(vector, 0, len(mat))
duration := model.SampleValue(ev.evalFloat(args[1]))
excludedLabels := map[model.LabelName]struct{}{
model.MetricNameLabel: {},
}
// Calculate predicted delta over the duration.
signatureToDelta := map[uint64]model.SampleValue{}
for _, el := range vec {
signature := model.SignatureWithoutLabels(el.Metric.Metric, excludedLabels)
signatureToDelta[signature] = el.Value * duration
}
// add predicted delta to last value.
// TODO(beorn7): This is arguably suboptimal. The funcDeriv above has
// given us an estimate over the range. So we should add the delta to
// the value predicted for the end of the range. Also, once this has
// been rectified, we are not using BoundaryValues anywhere anymore, so
// we can kick out a whole lot of code.
matrixBounds := ev.evalMatrixBounds(args[0])
outVec := make(vector, 0, len(signatureToDelta))
for _, samples := range matrixBounds {
for _, samples := range mat {
// No sense in trying to predict anything without at least two points.
// Drop this vector element.
if len(samples.Values) < 2 {
continue
}
signature := model.SignatureWithoutLabels(samples.Metric.Metric, excludedLabels)
delta, ok := signatureToDelta[signature]
if ok {
samples.Metric.Del(model.MetricNameLabel)
outVec = append(outVec, &sample{
Metric: samples.Metric,
Value: delta + samples.Values[1].Value,
Timestamp: ev.Timestamp,
})
slope, intercept := linearRegression(samples.Values, ev.Timestamp)
resultSample := &sample{
Metric: samples.Metric,
Value: slope*duration + intercept,
Timestamp: ev.Timestamp,
}
resultSample.Metric.Del(model.MetricNameLabel)
resultVector = append(resultVector, resultSample)
}
return outVec
return resultVector
}
// === histogram_quantile(k model.ValScalar, vector model.ValVector) Vector ===

View file

@ -102,16 +102,28 @@ eval instant at 50m deriv(testcounter_reset_middle[100m])
{} 0.010606060606060607
# predict_linear should return correct result.
# X/s = [ 0, 300, 600, 900,1200,1500,1800,2100,2400,2700,3000]
# Y = [ 0, 10, 20, 30, 40, 0, 10, 20, 30, 40, 50]
# sumX = 16500
# sumY = 250
# sumXY = 480000
# sumX2 = 34650000
# n = 11
# covXY = 105000
# varX = 9900000
# slope = 0.010606060606060607
# intercept at t=0: 6.818181818181818
# intercept at t=3000: 38.63636363636364
# intercept at t=3000+3600: 76.81818181818181
eval instant at 50m predict_linear(testcounter_reset_middle[100m], 3600)
{} 88.181818181818185200
{} 76.81818181818181
# predict_linear is syntactic sugar around deriv.
# With http_requests, there is a sample value exactly at the end of
# the range, and it has exactly the predicted value, so predict_linear
# can be emulated with deriv.
eval instant at 50m predict_linear(http_requests[50m], 3600) - (http_requests + deriv(http_requests[50m]) * 3600)
{group="canary", instance="1", job="app-server"} 0
eval instant at 50m predict_linear(testcounter_reset_middle[100m], 3600) - (testcounter_reset_middle + deriv(testcounter_reset_middle[100m]) * 3600)
{} 0
clear
# Tests for label_replace.

View file

@ -81,13 +81,6 @@ const (
// is populated upon creation of a chunkDesc, so it is alway safe to call
// firstTime. The firstTime method is arguably not needed and only there for
// consistency with lastTime.
//
// Yet another (deprecated) case is lastSamplePair. It's used in federation and
// must be callable without pinning. Locking the fingerprint of the series is
// still required (to avoid concurrent appends to the chunk). The call is
// relatively expensive because of the required acquisition of the evict
// mutex. It will go away, though, once tracking the lastSamplePair has been
// moved into the series object.
type chunkDesc struct {
sync.Mutex // Protects pinning.
c chunk // nil if chunk is evicted.
@ -119,7 +112,7 @@ func newChunkDesc(c chunk, firstTime model.Time) *chunkDesc {
// add adds a sample pair to the underlying chunk. For safe concurrent access,
// The chunk must be pinned, and the caller must have locked the fingerprint of
// the series.
func (cd *chunkDesc) add(s *model.SamplePair) []chunk {
func (cd *chunkDesc) add(s model.SamplePair) ([]chunk, error) {
return cd.c.add(s)
}
@ -176,9 +169,9 @@ func (cd *chunkDesc) firstTime() model.Time {
// lastTime returns the timestamp of the last sample in the chunk. For safe
// concurrent access, this method requires the fingerprint of the time series to
// be locked.
func (cd *chunkDesc) lastTime() model.Time {
func (cd *chunkDesc) lastTime() (model.Time, error) {
if cd.chunkLastTime != model.Earliest || cd.c == nil {
return cd.chunkLastTime
return cd.chunkLastTime, nil
}
return cd.c.newIterator().lastTimestamp()
}
@ -188,28 +181,15 @@ func (cd *chunkDesc) lastTime() model.Time {
// last sample to a chunk or after closing a head chunk due to age. For safe
// concurrent access, the chunk must be pinned, and the caller must have locked
// the fingerprint of the series.
func (cd *chunkDesc) maybePopulateLastTime() {
func (cd *chunkDesc) maybePopulateLastTime() error {
if cd.chunkLastTime == model.Earliest && cd.c != nil {
cd.chunkLastTime = cd.c.newIterator().lastTimestamp()
}
}
// lastSamplePair returns the last sample pair of the underlying chunk, or nil
// if the chunk is evicted. For safe concurrent access, this method requires the
// fingerprint of the time series to be locked.
// TODO(beorn7): Move up into memorySeries.
func (cd *chunkDesc) lastSamplePair() *model.SamplePair {
cd.Lock()
defer cd.Unlock()
if cd.c == nil {
return nil
}
it := cd.c.newIterator()
return &model.SamplePair{
Timestamp: it.lastTimestamp(),
Value: it.lastSampleValue(),
t, err := cd.c.newIterator().lastTimestamp()
if err != nil {
return err
}
cd.chunkLastTime = t
}
return nil
}
// isEvicted returns whether the chunk is evicted. For safe concurrent access,
@ -266,14 +246,14 @@ type chunk interface {
// any. The first chunk returned might be the same as the original one
// or a newly allocated version. In any case, take the returned chunk as
// the relevant one and discard the original chunk.
add(sample *model.SamplePair) []chunk
add(sample model.SamplePair) ([]chunk, error)
clone() chunk
firstTime() model.Time
newIterator() chunkIterator
marshal(io.Writer) error
marshalToBuf([]byte) error
unmarshal(io.Reader) error
unmarshalFromBuf([]byte)
unmarshalFromBuf([]byte) error
encoding() chunkEncoding
}
@ -284,57 +264,73 @@ type chunkIterator interface {
// length returns the number of samples in the chunk.
length() int
// Gets the timestamp of the n-th sample in the chunk.
timestampAtIndex(int) model.Time
timestampAtIndex(int) (model.Time, error)
// Gets the last timestamp in the chunk.
lastTimestamp() model.Time
lastTimestamp() (model.Time, error)
// Gets the sample value of the n-th sample in the chunk.
sampleValueAtIndex(int) model.SampleValue
sampleValueAtIndex(int) (model.SampleValue, error)
// Gets the last sample value in the chunk.
lastSampleValue() model.SampleValue
lastSampleValue() (model.SampleValue, error)
// Gets the value that is closest before the given time. In case a value
// exists at precisely the given time, that value is returned. If no
// applicable value exists, a SamplePair with timestamp model.Earliest
// and value 0.0 is returned.
valueAtOrBeforeTime(model.Time) model.SamplePair
// applicable value exists, ZeroSamplePair is returned.
valueAtOrBeforeTime(model.Time) (model.SamplePair, error)
// Gets all values contained within a given interval.
rangeValues(metric.Interval) []model.SamplePair
rangeValues(metric.Interval) ([]model.SamplePair, error)
// Whether a given timestamp is contained between first and last value
// in the chunk.
contains(model.Time) bool
contains(model.Time) (bool, error)
// values returns a channel, from which all sample values in the chunk
// can be received in order. The channel is closed after the last
// one. It is generally not safe to mutate the chunk while the channel
// is still open.
values() <-chan *model.SamplePair
// is still open. If a value is returned with error!=nil, no further
// values will be returned and the channel is closed.
values() <-chan struct {
model.SamplePair
error
}
}
func transcodeAndAdd(dst chunk, src chunk, s *model.SamplePair) []chunk {
func transcodeAndAdd(dst chunk, src chunk, s model.SamplePair) ([]chunk, error) {
chunkOps.WithLabelValues(transcode).Inc()
head := dst
body := []chunk{}
for v := range src.newIterator().values() {
newChunks := head.add(v)
if v.error != nil {
return nil, v.error
}
newChunks, err := head.add(v.SamplePair)
if err != nil {
return nil, err
}
body = append(body, newChunks[:len(newChunks)-1]...)
head = newChunks[len(newChunks)-1]
}
newChunks := head.add(s)
return append(body, newChunks...)
newChunks, err := head.add(s)
if err != nil {
return nil, err
}
return append(body, newChunks...), nil
}
// newChunk creates a new chunk according to the encoding set by the
// defaultChunkEncoding flag.
func newChunk() chunk {
return newChunkForEncoding(DefaultChunkEncoding)
chunk, err := newChunkForEncoding(DefaultChunkEncoding)
if err != nil {
panic(err)
}
return chunk
}
func newChunkForEncoding(encoding chunkEncoding) chunk {
func newChunkForEncoding(encoding chunkEncoding) (chunk, error) {
switch encoding {
case delta:
return newDeltaEncodedChunk(d1, d0, true, chunkLen)
return newDeltaEncodedChunk(d1, d0, true, chunkLen), nil
case doubleDelta:
return newDoubleDeltaEncodedChunk(d1, d0, true, chunkLen)
return newDoubleDeltaEncodedChunk(d1, d0, true, chunkLen), nil
default:
panic(fmt.Errorf("unknown chunk encoding: %v", encoding))
return nil, fmt.Errorf("unknown chunk encoding: %v", encoding)
}
}

View file

@ -14,10 +14,11 @@
package local
import (
"errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
"strings"
"sync/atomic"
@ -52,7 +53,7 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint
log.Info("Scanning files.")
for i := 0; i < 1<<(seriesDirNameLen*4); i++ {
dirname := path.Join(p.basePath, fmt.Sprintf(seriesDirNameFmt, i))
dirname := filepath.Join(p.basePath, fmt.Sprintf(seriesDirNameFmt, i))
dir, err := os.Open(dirname)
if os.IsNotExist(err) {
continue
@ -139,7 +140,7 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint
}
}
p.setDirty(false)
p.setDirty(false, nil)
log.Warn("Crash recovery complete.")
return nil
}
@ -175,36 +176,46 @@ func (p *persistence) sanitizeSeries(
fingerprintToSeries map[model.Fingerprint]*memorySeries,
fpm fpMappings,
) (model.Fingerprint, bool) {
filename := path.Join(dirname, fi.Name())
var (
fp model.Fingerprint
err error
filename = filepath.Join(dirname, fi.Name())
s *memorySeries
)
purge := func() {
var err error
defer func() {
if err != nil {
log.Errorf("Failed to move lost series file %s to orphaned directory, deleting it instead. Error was: %s", filename, err)
if err = os.Remove(filename); err != nil {
log.Errorf("Even deleting file %s did not work: %s", filename, err)
}
if fp != 0 {
var metric model.Metric
if s != nil {
metric = s.metric
}
}()
orphanedDir := path.Join(p.basePath, "orphaned", path.Base(dirname))
if err = os.MkdirAll(orphanedDir, 0700); err != nil {
return
if err = p.quarantineSeriesFile(
fp, errors.New("purge during crash recovery"), metric,
); err == nil {
return
}
log.
With("file", filename).
With("error", err).
Error("Failed to move lost series file to orphaned directory.")
}
if err = os.Rename(filename, path.Join(orphanedDir, fi.Name())); err != nil {
return
// If we are here, we are either purging an incorrectly named
// file, or quarantining has failed. So simply delete the file.
if err = os.Remove(filename); err != nil {
log.
With("file", filename).
With("error", err).
Error("Failed to delete lost series file.")
}
}
var fp model.Fingerprint
var err error
if len(fi.Name()) != fpLen-seriesDirNameLen+len(seriesFileSuffix) ||
!strings.HasSuffix(fi.Name(), seriesFileSuffix) {
log.Warnf("Unexpected series file name %s.", filename)
purge()
return fp, false
}
if fp, err = model.FingerprintFromString(path.Base(dirname) + fi.Name()[:fpLen-seriesDirNameLen]); err != nil {
if fp, err = model.FingerprintFromString(filepath.Base(dirname) + fi.Name()[:fpLen-seriesDirNameLen]); err != nil {
log.Warnf("Error parsing file name %s: %s", filename, err)
purge()
return fp, false
@ -274,7 +285,15 @@ func (p *persistence) sanitizeSeries(
s.chunkDescs = cds
s.chunkDescsOffset = 0
s.savedFirstTime = cds[0].firstTime()
s.lastTime = cds[len(cds)-1].lastTime()
s.lastTime, err = cds[len(cds)-1].lastTime()
if err != nil {
log.Errorf(
"Failed to determine time of the last sample for metric %v, fingerprint %v: %s",
s.metric, fp, err,
)
purge()
return fp, false
}
s.persistWatermark = len(cds)
s.modTime = modTime
return fp, true
@ -304,7 +323,15 @@ func (p *persistence) sanitizeSeries(
s.savedFirstTime = cds[0].firstTime()
s.modTime = modTime
lastTime := cds[len(cds)-1].lastTime()
lastTime, err := cds[len(cds)-1].lastTime()
if err != nil {
log.Errorf(
"Failed to determine time of the last sample for metric %v, fingerprint %v: %s",
s.metric, fp, err,
)
purge()
return fp, false
}
keepIdx := -1
for i, cd := range s.chunkDescs {
if cd.firstTime() >= lastTime {
@ -414,7 +441,10 @@ func (p *persistence) cleanUpArchiveIndexes(
if err != nil {
return err
}
series := newMemorySeries(model.Metric(m), cds, p.seriesFileModTime(model.Fingerprint(fp)))
series, err := newMemorySeries(model.Metric(m), cds, p.seriesFileModTime(model.Fingerprint(fp)))
if err != nil {
return err
}
fpToSeries[model.Fingerprint(fp)] = series
return nil
}); err != nil {

View file

@ -76,7 +76,7 @@ func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *deltaEncod
}
// add implements chunk.
func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk {
func (c deltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) {
if c.len() == 0 {
c = c[:deltaHeaderBytes]
binary.LittleEndian.PutUint64(c[deltaHeaderBaseTimeOffset:], uint64(s.Timestamp))
@ -89,14 +89,17 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk {
// Do we generally have space for another sample in this chunk? If not,
// overflow into a new one.
if remainingBytes < sampleSize {
overflowChunks := newChunk().add(s)
return []chunk{&c, overflowChunks[0]}
overflowChunks, err := newChunk().add(s)
if err != nil {
return nil, err
}
return []chunk{&c, overflowChunks[0]}, nil
}
baseValue := c.baseValue()
dt := s.Timestamp - c.baseTime()
if dt < 0 {
panic("time delta is less than zero")
return nil, fmt.Errorf("time delta is less than zero: %v", dt)
}
dv := s.Value - baseValue
@ -130,8 +133,11 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk {
return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s)
}
// Chunk is already half full. Better create a new one and save the transcoding efforts.
overflowChunks := newChunk().add(s)
return []chunk{&c, overflowChunks[0]}
overflowChunks, err := newChunk().add(s)
if err != nil {
return nil, err
}
return []chunk{&c, overflowChunks[0]}, nil
}
offset := len(c)
@ -148,7 +154,7 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk {
// Store the absolute value (no delta) in case of d8.
binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp))
default:
panic("invalid number of bytes for time delta")
return nil, fmt.Errorf("invalid number of bytes for time delta: %d", tb)
}
offset += int(tb)
@ -165,7 +171,7 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk {
binary.LittleEndian.PutUint32(c[offset:], uint32(int32(dv)))
// d8 must not happen. Those samples are encoded as float64.
default:
panic("invalid number of bytes for integer delta")
return nil, fmt.Errorf("invalid number of bytes for integer delta: %d", vb)
}
} else {
switch vb {
@ -175,10 +181,10 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk {
// Store the absolute value (no delta) in case of d8.
binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value)))
default:
panic("invalid number of bytes for floating point delta")
return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb)
}
}
return []chunk{&c}
return []chunk{&c}, nil
}
// clone implements chunk.
@ -243,15 +249,24 @@ func (c *deltaEncodedChunk) unmarshal(r io.Reader) error {
if _, err := io.ReadFull(r, *c); err != nil {
return err
}
*c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])]
l := binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])
if int(l) > cap(*c) {
return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l)
}
*c = (*c)[:l]
return nil
}
// unmarshalFromBuf implements chunk.
func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) {
func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) error {
*c = (*c)[:cap(*c)]
copy(*c, buf)
*c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])]
l := binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])
if int(l) > cap(*c) {
return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l)
}
*c = (*c)[:l]
return nil
}
// encoding implements chunk.
@ -302,57 +317,108 @@ type deltaEncodedChunkIterator struct {
func (it *deltaEncodedChunkIterator) length() int { return it.len }
// valueAtOrBeforeTime implements chunkIterator.
func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) model.SamplePair {
func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) {
var lastErr error
i := sort.Search(it.len, func(i int) bool {
return it.timestampAtIndex(i).After(t)
ts, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return ts.After(t)
})
if i == 0 {
return model.SamplePair{Timestamp: model.Earliest}
if i == 0 || lastErr != nil {
return ZeroSamplePair, lastErr
}
return model.SamplePair{
Timestamp: it.timestampAtIndex(i - 1),
Value: it.sampleValueAtIndex(i - 1),
ts, err := it.timestampAtIndex(i - 1)
if err != nil {
return ZeroSamplePair, err
}
v, err := it.sampleValueAtIndex(i - 1)
if err != nil {
return ZeroSamplePair, err
}
return model.SamplePair{Timestamp: ts, Value: v}, nil
}
// rangeValues implements chunkIterator.
func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) []model.SamplePair {
func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) {
var lastErr error
oldest := sort.Search(it.len, func(i int) bool {
return !it.timestampAtIndex(i).Before(in.OldestInclusive)
t, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return !t.Before(in.OldestInclusive)
})
newest := sort.Search(it.len, func(i int) bool {
return it.timestampAtIndex(i).After(in.NewestInclusive)
t, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return t.After(in.NewestInclusive)
})
if oldest == it.len {
return nil
if oldest == it.len || lastErr != nil {
return nil, lastErr
}
result := make([]model.SamplePair, 0, newest-oldest)
for i := oldest; i < newest; i++ {
result = append(result, model.SamplePair{
Timestamp: it.timestampAtIndex(i),
Value: it.sampleValueAtIndex(i),
})
t, err := it.timestampAtIndex(i)
if err != nil {
return nil, err
}
v, err := it.sampleValueAtIndex(i)
if err != nil {
return nil, err
}
result = append(result, model.SamplePair{Timestamp: t, Value: v})
}
return result
return result, nil
}
// contains implements chunkIterator.
func (it *deltaEncodedChunkIterator) contains(t model.Time) bool {
return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1))
func (it *deltaEncodedChunkIterator) contains(t model.Time) (bool, error) {
lastT, err := it.timestampAtIndex(it.len - 1)
if err != nil {
return false, err
}
return !t.Before(it.baseT) && !t.After(lastT), nil
}
// values implements chunkIterator.
func (it *deltaEncodedChunkIterator) values() <-chan *model.SamplePair {
valuesChan := make(chan *model.SamplePair)
func (it *deltaEncodedChunkIterator) values() <-chan struct {
model.SamplePair
error
} {
valuesChan := make(chan struct {
model.SamplePair
error
})
go func() {
for i := 0; i < it.len; i++ {
valuesChan <- &model.SamplePair{
Timestamp: it.timestampAtIndex(i),
Value: it.sampleValueAtIndex(i),
t, err := it.timestampAtIndex(i)
if err != nil {
valuesChan <- struct {
model.SamplePair
error
}{ZeroSamplePair, err}
break
}
v, err := it.sampleValueAtIndex(i)
if err != nil {
valuesChan <- struct {
model.SamplePair
error
}{ZeroSamplePair, err}
break
}
valuesChan <- struct {
model.SamplePair
error
}{model.SamplePair{Timestamp: t, Value: v}, nil}
}
close(valuesChan)
}()
@ -360,61 +426,61 @@ func (it *deltaEncodedChunkIterator) values() <-chan *model.SamplePair {
}
// timestampAtIndex implements chunkIterator.
func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time {
func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time, error) {
offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes)
switch it.tBytes {
case d1:
return it.baseT + model.Time(uint8(it.c[offset]))
return it.baseT + model.Time(uint8(it.c[offset])), nil
case d2:
return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:]))
return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:])), nil
case d4:
return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:]))
return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:])), nil
case d8:
// Take absolute value for d8.
return model.Time(binary.LittleEndian.Uint64(it.c[offset:]))
return model.Time(binary.LittleEndian.Uint64(it.c[offset:])), nil
default:
panic("invalid number of bytes for time delta")
return 0, fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes)
}
}
// lastTimestamp implements chunkIterator.
func (it *deltaEncodedChunkIterator) lastTimestamp() model.Time {
func (it *deltaEncodedChunkIterator) lastTimestamp() (model.Time, error) {
return it.timestampAtIndex(it.len - 1)
}
// sampleValueAtIndex implements chunkIterator.
func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue {
func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.SampleValue, error) {
offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + int(it.tBytes)
if it.isInt {
switch it.vBytes {
case d0:
return it.baseV
return it.baseV, nil
case d1:
return it.baseV + model.SampleValue(int8(it.c[offset]))
return it.baseV + model.SampleValue(int8(it.c[offset])), nil
case d2:
return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:])))
return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil
case d4:
return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:])))
return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil
// No d8 for ints.
default:
panic("invalid number of bytes for integer delta")
return 0, fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes)
}
} else {
switch it.vBytes {
case d4:
return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:])))
return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))), nil
case d8:
// Take absolute value for d8.
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:])))
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))), nil
default:
panic("invalid number of bytes for floating point delta")
return 0, fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes)
}
}
}
// lastSampleValue implements chunkIterator.
func (it *deltaEncodedChunkIterator) lastSampleValue() model.SampleValue {
func (it *deltaEncodedChunkIterator) lastSampleValue() (model.SampleValue, error) {
return it.sampleValueAtIndex(it.len - 1)
}

View file

@ -83,9 +83,9 @@ func newDoubleDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *doub
}
// add implements chunk.
func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk {
func (c doubleDeltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) {
if c.len() == 0 {
return c.addFirstSample(s)
return c.addFirstSample(s), nil
}
tb := c.timeBytes()
@ -101,8 +101,11 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk {
// Do we generally have space for another sample in this chunk? If not,
// overflow into a new one.
if remainingBytes < sampleSize {
overflowChunks := newChunk().add(s)
return []chunk{&c, overflowChunks[0]}
overflowChunks, err := newChunk().add(s)
if err != nil {
return nil, err
}
return []chunk{&c, overflowChunks[0]}, nil
}
projectedTime := c.baseTime() + model.Time(c.len())*c.baseTimeDelta()
@ -136,8 +139,11 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk {
return transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s)
}
// Chunk is already half full. Better create a new one and save the transcoding efforts.
overflowChunks := newChunk().add(s)
return []chunk{&c, overflowChunks[0]}
overflowChunks, err := newChunk().add(s)
if err != nil {
return nil, err
}
return []chunk{&c, overflowChunks[0]}, nil
}
offset := len(c)
@ -154,7 +160,7 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk {
// Store the absolute value (no delta) in case of d8.
binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp))
default:
panic("invalid number of bytes for time delta")
return nil, fmt.Errorf("invalid number of bytes for time delta: %d", tb)
}
offset += int(tb)
@ -171,7 +177,7 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk {
binary.LittleEndian.PutUint32(c[offset:], uint32(int32(ddv)))
// d8 must not happen. Those samples are encoded as float64.
default:
panic("invalid number of bytes for integer delta")
return nil, fmt.Errorf("invalid number of bytes for integer delta: %d", vb)
}
} else {
switch vb {
@ -181,10 +187,10 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk {
// Store the absolute value (no delta) in case of d8.
binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value)))
default:
panic("invalid number of bytes for floating point delta")
return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb)
}
}
return []chunk{&c}
return []chunk{&c}, nil
}
// clone implements chunk.
@ -251,15 +257,24 @@ func (c *doubleDeltaEncodedChunk) unmarshal(r io.Reader) error {
if _, err := io.ReadFull(r, *c); err != nil {
return err
}
*c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])]
l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])
if int(l) > cap(*c) {
return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l)
}
*c = (*c)[:l]
return nil
}
// unmarshalFromBuf implements chunk.
func (c *doubleDeltaEncodedChunk) unmarshalFromBuf(buf []byte) {
func (c *doubleDeltaEncodedChunk) unmarshalFromBuf(buf []byte) error {
*c = (*c)[:cap(*c)]
copy(*c, buf)
*c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])]
l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])
if int(l) > cap(*c) {
return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l)
}
*c = (*c)[:l]
return nil
}
// encoding implements chunk.
@ -335,7 +350,7 @@ func (c doubleDeltaEncodedChunk) isInt() bool {
// addFirstSample is a helper method only used by c.add(). It adds timestamp and
// value as base time and value.
func (c doubleDeltaEncodedChunk) addFirstSample(s *model.SamplePair) []chunk {
func (c doubleDeltaEncodedChunk) addFirstSample(s model.SamplePair) []chunk {
c = c[:doubleDeltaHeaderBaseValueOffset+8]
binary.LittleEndian.PutUint64(
c[doubleDeltaHeaderBaseTimeOffset:],
@ -350,10 +365,10 @@ func (c doubleDeltaEncodedChunk) addFirstSample(s *model.SamplePair) []chunk {
// addSecondSample is a helper method only used by c.add(). It calculates the
// base delta from the provided sample and adds it to the chunk.
func (c doubleDeltaEncodedChunk) addSecondSample(s *model.SamplePair, tb, vb deltaBytes) []chunk {
func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb deltaBytes) ([]chunk, error) {
baseTimeDelta := s.Timestamp - c.baseTime()
if baseTimeDelta < 0 {
panic("base time delta is less than zero")
return nil, fmt.Errorf("base time delta is less than zero: %v", baseTimeDelta)
}
c = c[:doubleDeltaHeaderBytes]
if tb >= d8 || bytesNeededForUnsignedTimestampDelta(baseTimeDelta) >= d8 {
@ -391,7 +406,7 @@ func (c doubleDeltaEncodedChunk) addSecondSample(s *model.SamplePair, tb, vb del
math.Float64bits(float64(baseValueDelta)),
)
}
return []chunk{&c}
return []chunk{&c}, nil
}
// doubleDeltaEncodedChunkIterator implements chunkIterator.
@ -408,57 +423,108 @@ type doubleDeltaEncodedChunkIterator struct {
func (it *doubleDeltaEncodedChunkIterator) length() int { return it.len }
// valueAtOrBeforeTime implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) model.SamplePair {
func (it *doubleDeltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) {
var lastErr error
i := sort.Search(it.len, func(i int) bool {
return it.timestampAtIndex(i).After(t)
ts, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return ts.After(t)
})
if i == 0 {
return model.SamplePair{Timestamp: model.Earliest}
if i == 0 || lastErr != nil {
return ZeroSamplePair, lastErr
}
return model.SamplePair{
Timestamp: it.timestampAtIndex(i - 1),
Value: it.sampleValueAtIndex(i - 1),
ts, err := it.timestampAtIndex(i - 1)
if err != nil {
return ZeroSamplePair, err
}
v, err := it.sampleValueAtIndex(i - 1)
if err != nil {
return ZeroSamplePair, err
}
return model.SamplePair{Timestamp: ts, Value: v}, nil
}
// rangeValues implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) rangeValues(in metric.Interval) []model.SamplePair {
func (it *doubleDeltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) {
var lastErr error
oldest := sort.Search(it.len, func(i int) bool {
return !it.timestampAtIndex(i).Before(in.OldestInclusive)
t, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return !t.Before(in.OldestInclusive)
})
newest := sort.Search(it.len, func(i int) bool {
return it.timestampAtIndex(i).After(in.NewestInclusive)
t, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return t.After(in.NewestInclusive)
})
if oldest == it.len {
return nil
if oldest == it.len || lastErr != nil {
return nil, lastErr
}
result := make([]model.SamplePair, 0, newest-oldest)
for i := oldest; i < newest; i++ {
result = append(result, model.SamplePair{
Timestamp: it.timestampAtIndex(i),
Value: it.sampleValueAtIndex(i),
})
t, err := it.timestampAtIndex(i)
if err != nil {
return nil, err
}
v, err := it.sampleValueAtIndex(i)
if err != nil {
return nil, err
}
result = append(result, model.SamplePair{Timestamp: t, Value: v})
}
return result
return result, nil
}
// contains implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) contains(t model.Time) bool {
return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1))
func (it *doubleDeltaEncodedChunkIterator) contains(t model.Time) (bool, error) {
lastT, err := it.timestampAtIndex(it.len - 1)
if err != nil {
return false, err
}
return !t.Before(it.baseT) && !t.After(lastT), nil
}
// values implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) values() <-chan *model.SamplePair {
valuesChan := make(chan *model.SamplePair)
func (it *doubleDeltaEncodedChunkIterator) values() <-chan struct {
model.SamplePair
error
} {
valuesChan := make(chan struct {
model.SamplePair
error
})
go func() {
for i := 0; i < it.len; i++ {
valuesChan <- &model.SamplePair{
Timestamp: it.timestampAtIndex(i),
Value: it.sampleValueAtIndex(i),
t, err := it.timestampAtIndex(i)
if err != nil {
valuesChan <- struct {
model.SamplePair
error
}{ZeroSamplePair, err}
break
}
v, err := it.sampleValueAtIndex(i)
if err != nil {
valuesChan <- struct {
model.SamplePair
error
}{ZeroSamplePair, err}
break
}
valuesChan <- struct {
model.SamplePair
error
}{model.SamplePair{Timestamp: t, Value: v}, nil}
}
close(valuesChan)
}()
@ -466,17 +532,17 @@ func (it *doubleDeltaEncodedChunkIterator) values() <-chan *model.SamplePair {
}
// timestampAtIndex implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time {
func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time, error) {
if idx == 0 {
return it.baseT
return it.baseT, nil
}
if idx == 1 {
// If time bytes are at d8, the time is saved directly rather
// than as a difference.
if it.tBytes == d8 {
return it.baseΔT
return it.baseΔT, nil
}
return it.baseT + it.baseΔT
return it.baseT + it.baseΔT, nil
}
offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes)
@ -485,40 +551,40 @@ func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time
case d1:
return it.baseT +
model.Time(idx)*it.baseΔT +
model.Time(int8(it.c[offset]))
model.Time(int8(it.c[offset])), nil
case d2:
return it.baseT +
model.Time(idx)*it.baseΔT +
model.Time(int16(binary.LittleEndian.Uint16(it.c[offset:])))
model.Time(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil
case d4:
return it.baseT +
model.Time(idx)*it.baseΔT +
model.Time(int32(binary.LittleEndian.Uint32(it.c[offset:])))
model.Time(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil
case d8:
// Take absolute value for d8.
return model.Time(binary.LittleEndian.Uint64(it.c[offset:]))
return model.Time(binary.LittleEndian.Uint64(it.c[offset:])), nil
default:
panic("invalid number of bytes for time delta")
return 0, fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes)
}
}
// lastTimestamp implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() model.Time {
func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() (model.Time, error) {
return it.timestampAtIndex(it.len - 1)
}
// sampleValueAtIndex implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue {
func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.SampleValue, error) {
if idx == 0 {
return it.baseV
return it.baseV, nil
}
if idx == 1 {
// If value bytes are at d8, the value is saved directly rather
// than as a difference.
if it.vBytes == d8 {
return it.baseΔV
return it.baseΔV, nil
}
return it.baseV + it.baseΔV
return it.baseV + it.baseΔV, nil
}
offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + int(it.tBytes)
@ -527,39 +593,39 @@ func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.Sam
switch it.vBytes {
case d0:
return it.baseV +
model.SampleValue(idx)*it.baseΔV
model.SampleValue(idx)*it.baseΔV, nil
case d1:
return it.baseV +
model.SampleValue(idx)*it.baseΔV +
model.SampleValue(int8(it.c[offset]))
model.SampleValue(int8(it.c[offset])), nil
case d2:
return it.baseV +
model.SampleValue(idx)*it.baseΔV +
model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:])))
model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil
case d4:
return it.baseV +
model.SampleValue(idx)*it.baseΔV +
model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:])))
model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil
// No d8 for ints.
default:
panic("invalid number of bytes for integer delta")
return 0, fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes)
}
} else {
switch it.vBytes {
case d4:
return it.baseV +
model.SampleValue(idx)*it.baseΔV +
model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:])))
model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))), nil
case d8:
// Take absolute value for d8.
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:])))
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))), nil
default:
panic("invalid number of bytes for floating point delta")
return 0, fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes)
}
}
}
// lastSampleValue implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) lastSampleValue() model.SampleValue {
func (it *doubleDeltaEncodedChunkIterator) lastSampleValue() (model.SampleValue, error) {
return it.sampleValueAtIndex(it.len - 1)
}

View file

@ -107,6 +107,8 @@ func (hs *headsScanner) scan() bool {
firstTime int64
lastTime int64
encoding byte
ch chunk
lastTimeHead model.Time
)
if seriesFlags, hs.err = hs.r.ReadByte(); hs.err != nil {
return false
@ -174,11 +176,13 @@ func (hs *headsScanner) scan() bool {
if encoding, hs.err = hs.r.ReadByte(); hs.err != nil {
return false
}
chunk := newChunkForEncoding(chunkEncoding(encoding))
if hs.err = chunk.unmarshal(hs.r); hs.err != nil {
if ch, hs.err = newChunkForEncoding(chunkEncoding(encoding)); hs.err != nil {
return false
}
cd := newChunkDesc(chunk, chunk.firstTime())
if hs.err = ch.unmarshal(hs.r); hs.err != nil {
return false
}
cd := newChunkDesc(ch, ch.firstTime())
if i < numChunkDescs-1 {
// This is NOT the head chunk. So it's a chunk
// to be persisted, and we need to populate lastTime.
@ -189,6 +193,10 @@ func (hs *headsScanner) scan() bool {
}
}
if lastTimeHead, hs.err = chunkDescs[len(chunkDescs)-1].lastTime(); hs.err != nil {
return false
}
hs.series = &memorySeries{
metric: model.Metric(metric),
chunkDescs: chunkDescs,
@ -196,7 +204,7 @@ func (hs *headsScanner) scan() bool {
modTime: modTime,
chunkDescsOffset: int(chunkDescsOffset),
savedFirstTime: model.Time(savedFirstTime),
lastTime: chunkDescs[len(chunkDescs)-1].lastTime(),
lastTime: lastTimeHead,
headChunkClosed: headChunkClosed,
}
hs.seriesCurrent++

View file

@ -19,6 +19,7 @@ package index
import (
"os"
"path"
"path/filepath"
"github.com/prometheus/common/model"
@ -95,7 +96,7 @@ func (i *FingerprintMetricIndex) Lookup(fp model.Fingerprint) (metric model.Metr
// ready to use.
func NewFingerprintMetricIndex(basePath string) (*FingerprintMetricIndex, error) {
fingerprintToMetricDB, err := NewLevelDB(LevelDBOptions{
Path: path.Join(basePath, fingerprintToMetricDir),
Path: filepath.Join(basePath, fingerprintToMetricDir),
CacheSizeBytes: FingerprintMetricCacheSize,
})
if err != nil {
@ -167,7 +168,7 @@ func (i *LabelNameLabelValuesIndex) LookupSet(l model.LabelName) (values map[mod
// LabelNameLabelValuesIndex ready to use.
func NewLabelNameLabelValuesIndex(basePath string) (*LabelNameLabelValuesIndex, error) {
labelNameToLabelValuesDB, err := NewLevelDB(LevelDBOptions{
Path: path.Join(basePath, labelNameToLabelValuesDir),
Path: filepath.Join(basePath, labelNameToLabelValuesDir),
CacheSizeBytes: LabelNameLabelValuesCacheSize,
})
if err != nil {
@ -245,7 +246,7 @@ func (i *LabelPairFingerprintIndex) LookupSet(p model.LabelPair) (fps map[model.
// LabelPairFingerprintIndex ready to use.
func NewLabelPairFingerprintIndex(basePath string) (*LabelPairFingerprintIndex, error) {
labelPairToFingerprintsDB, err := NewLevelDB(LevelDBOptions{
Path: path.Join(basePath, labelPairToFingerprintsDir),
Path: filepath.Join(basePath, labelPairToFingerprintsDir),
CacheSizeBytes: LabelPairFingerprintsCacheSize,
})
if err != nil {
@ -283,7 +284,7 @@ func (i *FingerprintTimeRangeIndex) Lookup(fp model.Fingerprint) (firstTime, las
// FingerprintTimeRangeIndex ready to use.
func NewFingerprintTimeRangeIndex(basePath string) (*FingerprintTimeRangeIndex, error) {
fingerprintTimeRangeDB, err := NewLevelDB(LevelDBOptions{
Path: path.Join(basePath, fingerprintTimeRangeDir),
Path: filepath.Join(basePath, fingerprintTimeRangeDir),
CacheSizeBytes: FingerprintTimeRangeCacheSize,
})
if err != nil {

View file

@ -60,6 +60,9 @@ const (
requestedPurge = "purge_on_request"
memoryMaintenance = "maintenance_in_memory"
archiveMaintenance = "maintenance_in_archive"
completedQurantine = "quarantine_completed"
droppedQuarantine = "quarantine_dropped"
failedQuarantine = "quarantine_failed"
// Op-types for chunkOps.
createAndPin = "create" // A chunkDesc creation with refCount=1.

View file

@ -14,6 +14,8 @@
package local
import (
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
@ -42,10 +44,12 @@ type Storage interface {
// label matchers. At least one label matcher must be specified that does not
// match the empty string.
MetricsForLabelMatchers(...*metric.LabelMatcher) map[model.Fingerprint]metric.Metric
// LastSamplePairForFingerprint returns the last sample pair for the
// provided fingerprint. If the respective time series does not exist or
// has an evicted head chunk, nil is returned.
LastSamplePairForFingerprint(model.Fingerprint) *model.SamplePair
// LastSamplePairForFingerprint returns the last sample pair that has
// been ingested for the provided fingerprint. If this instance of the
// Storage has never ingested a sample for the provided fingerprint (or
// the last ingestion is so long ago that the series has been archived),
// ZeroSamplePair is returned.
LastSamplePairForFingerprint(model.Fingerprint) model.SamplePair
// Get all of the label values that are associated with a given label name.
LabelValuesForLabelName(model.LabelName) model.LabelValues
// Get the metric associated with the provided fingerprint.
@ -69,16 +73,12 @@ type Storage interface {
// methods are not goroutine-safe. A SeriesIterator iterates over a snapshot of
// a series, i.e. it is safe to continue using a SeriesIterator after or during
// modifying the corresponding series, but the iterator will represent the state
// of the series prior the modification.
// of the series prior to the modification.
type SeriesIterator interface {
// Gets the value that is closest before the given time. In case a value
// exists at precisely the given time, that value is returned. If no
// applicable value exists, a SamplePair with timestamp model.Earliest
// and value 0.0 is returned.
// applicable value exists, ZeroSamplePair is returned.
ValueAtOrBeforeTime(model.Time) model.SamplePair
// Gets the boundary values of an interval: the first and last value
// within a given interval.
BoundaryValues(metric.Interval) []model.SamplePair
// Gets all values contained within a given interval.
RangeValues(metric.Interval) []model.SamplePair
}
@ -90,7 +90,18 @@ type Preloader interface {
PreloadRange(
fp model.Fingerprint,
from model.Time, through model.Time,
) (SeriesIterator, error)
) SeriesIterator
PreloadInstant(
fp model.Fingerprint,
timestamp model.Time, stalenessDelta time.Duration,
) SeriesIterator
// Close unpins any previously requested series data from memory.
Close()
}
// ZeroSamplePair is the pseudo zero-value of model.SamplePair used by the local
// package to signal a non-existing sample. It is a SamplePair with timestamp
// model.Earliest and value 0.0. Note that the natural zero value of SamplePair
// has a timestamp of 0, which is possible to appear in a real SamplePair and
// thus not suitable to signal a non-existing SamplePair.
var ZeroSamplePair = model.SamplePair{Timestamp: model.Earliest}

View file

@ -20,7 +20,6 @@ import (
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"strconv"
"strings"
@ -46,6 +45,7 @@ const (
seriesFileSuffix = ".db"
seriesTempFileSuffix = ".db.tmp"
seriesDirNameLen = 2 // How many bytes of the fingerprint in dir name.
hintFileSuffix = ".hint"
mappingsFileName = "mappings.db"
mappingsTempFileName = "mappings.db.tmp"
@ -315,8 +315,9 @@ func (p *persistence) isDirty() bool {
// setDirty sets the dirty flag in a goroutine-safe way. Once the dirty flag was
// set to true with this method, it cannot be set to false again. (If we became
// dirty during our runtime, there is no way back. If we were dirty from the
// start, a clean-up might make us clean again.)
func (p *persistence) setDirty(dirty bool) {
// start, a clean-up might make us clean again.) The provided error will be
// logged as a reason if dirty is true.
func (p *persistence) setDirty(dirty bool, err error) {
if dirty {
p.dirtyCounter.Inc()
}
@ -328,7 +329,7 @@ func (p *persistence) setDirty(dirty bool) {
p.dirty = dirty
if dirty {
p.becameDirty = true
log.Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.")
log.With("error", err).Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.")
}
}
@ -365,8 +366,7 @@ func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelVa
func (p *persistence) persistChunks(fp model.Fingerprint, chunks []chunk) (index int, err error) {
defer func() {
if err != nil {
log.Error("Error persisting chunks: ", err)
p.setDirty(true)
p.setDirty(true, fmt.Errorf("error in method persistChunks: %s", err))
}
}()
@ -435,8 +435,13 @@ func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffse
return nil, err
}
for c := 0; c < batchSize; c++ {
chunk := newChunkForEncoding(chunkEncoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset]))
chunk.unmarshalFromBuf(buf[c*chunkLenWithHeader+chunkHeaderLen:])
chunk, err := newChunkForEncoding(chunkEncoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset]))
if err != nil {
return nil, err
}
if err := chunk.unmarshalFromBuf(buf[c*chunkLenWithHeader+chunkHeaderLen:]); err != nil {
return nil, err
}
chunks = append(chunks, chunk)
}
}
@ -464,7 +469,7 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([
return nil, err
}
if fi.Size()%int64(chunkLenWithHeader) != 0 {
p.setDirty(true)
// The returned error will bubble up and lead to quarantining of the whole series.
return nil, fmt.Errorf(
"size of series file for fingerprint %v is %d, which is not a multiple of the chunk length %d",
fp, fi.Size(), chunkLenWithHeader,
@ -642,7 +647,11 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
if _, err = codable.EncodeVarint(w, int64(chunkDesc.firstTime())); err != nil {
return
}
if _, err = codable.EncodeVarint(w, int64(chunkDesc.lastTime())); err != nil {
lt, err := chunkDesc.lastTime()
if err != nil {
return
}
if _, err = codable.EncodeVarint(w, int64(lt)); err != nil {
return
}
} else {
@ -748,8 +757,7 @@ func (p *persistence) dropAndPersistChunks(
// please handle with care!
defer func() {
if err != nil {
log.Error("Error dropping and/or persisting chunks: ", err)
p.setDirty(true)
p.setDirty(true, fmt.Errorf("error in method dropAndPersistChunks: %s", err))
}
}()
@ -758,7 +766,15 @@ func (p *persistence) dropAndPersistChunks(
// too old. If that's the case, the chunks in the series file
// are all too old, too.
i := 0
for ; i < len(chunks) && chunks[i].newIterator().lastTimestamp().Before(beforeTime); i++ {
for ; i < len(chunks); i++ {
var lt model.Time
lt, err = chunks[i].newIterator().lastTimestamp()
if err != nil {
return
}
if !lt.Before(beforeTime) {
break
}
}
if i < len(chunks) {
firstTimeNotDropped = chunks[i].firstTime()
@ -911,6 +927,44 @@ func (p *persistence) deleteSeriesFile(fp model.Fingerprint) (int, error) {
return numChunks, nil
}
// quarantineSeriesFile moves a series file to the orphaned directory. It also
// writes a hint file with the provided quarantine reason and, if series is
// non-nil, the string representation of the metric.
func (p *persistence) quarantineSeriesFile(fp model.Fingerprint, quarantineReason error, metric model.Metric) error {
var (
oldName = p.fileNameForFingerprint(fp)
orphanedDir = filepath.Join(p.basePath, "orphaned", filepath.Base(filepath.Dir(oldName)))
newName = filepath.Join(orphanedDir, filepath.Base(oldName))
hintName = newName[:len(newName)-len(seriesFileSuffix)] + hintFileSuffix
)
renameErr := os.MkdirAll(orphanedDir, 0700)
if renameErr != nil {
return renameErr
}
renameErr = os.Rename(oldName, newName)
if os.IsNotExist(renameErr) {
// Source file dosn't exist. That's normal.
renameErr = nil
}
// Write hint file even if the rename ended in an error. At least try...
// And ignore errors writing the hint file. It's best effort.
if f, err := os.Create(hintName); err == nil {
if metric != nil {
f.WriteString(metric.String() + "\n")
} else {
f.WriteString("[UNKNOWN METRIC]\n")
}
if quarantineReason != nil {
f.WriteString(quarantineReason.Error() + "\n")
} else {
f.WriteString("[UNKNOWN REASON]\n")
}
f.Close()
}
return renameErr
}
// seriesFileModTime returns the modification time of the series file belonging
// to the provided fingerprint. In case of an error, the zero value of time.Time
// is returned.
@ -962,11 +1016,11 @@ func (p *persistence) archiveMetric(
fp model.Fingerprint, m model.Metric, first, last model.Time,
) error {
if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil {
p.setDirty(true)
p.setDirty(true, err)
return err
}
if err := p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last}); err != nil {
p.setDirty(true)
p.setDirty(true, err)
return err
}
return nil
@ -979,6 +1033,9 @@ func (p *persistence) hasArchivedMetric(fp model.Fingerprint) (
hasMetric bool, firstTime, lastTime model.Time, err error,
) {
firstTime, lastTime, hasMetric, err = p.archivedFingerprintToTimeRange.Lookup(fp)
if err != nil {
p.setDirty(true, err)
}
return
}
@ -1027,7 +1084,7 @@ func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error)
func (p *persistence) purgeArchivedMetric(fp model.Fingerprint) (err error) {
defer func() {
if err != nil {
p.setDirty(true)
p.setDirty(true, fmt.Errorf("error in method purgeArchivedMetric: %s", err))
}
}()
@ -1058,12 +1115,8 @@ func (p *persistence) purgeArchivedMetric(fp model.Fingerprint) (err error) {
// was actually deleted, the method returns true and the first time and last
// time of the deleted metric. The caller must have locked the fingerprint.
func (p *persistence) unarchiveMetric(fp model.Fingerprint) (deletedAnything bool, err error) {
defer func() {
if err != nil {
p.setDirty(true)
}
}()
// An error returned here will bubble up and lead to quarantining of the
// series, so no setDirty required.
deleted, err := p.archivedFingerprintToMetrics.Delete(codable.Fingerprint(fp))
if err != nil || !deleted {
return false, err
@ -1119,17 +1172,17 @@ func (p *persistence) close() error {
func (p *persistence) dirNameForFingerprint(fp model.Fingerprint) string {
fpStr := fp.String()
return path.Join(p.basePath, fpStr[0:seriesDirNameLen])
return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen])
}
func (p *persistence) fileNameForFingerprint(fp model.Fingerprint) string {
fpStr := fp.String()
return path.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix)
return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix)
}
func (p *persistence) tempFileNameForFingerprint(fp model.Fingerprint) string {
fpStr := fp.String()
return path.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesTempFileSuffix)
return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesTempFileSuffix)
}
func (p *persistence) openChunkFileForWriting(fp model.Fingerprint) (*os.File, error) {
@ -1162,19 +1215,19 @@ func (p *persistence) openChunkFileForReading(fp model.Fingerprint) (*os.File, e
}
func (p *persistence) headsFileName() string {
return path.Join(p.basePath, headsFileName)
return filepath.Join(p.basePath, headsFileName)
}
func (p *persistence) headsTempFileName() string {
return path.Join(p.basePath, headsTempFileName)
return filepath.Join(p.basePath, headsTempFileName)
}
func (p *persistence) mappingsFileName() string {
return path.Join(p.basePath, mappingsFileName)
return filepath.Join(p.basePath, mappingsFileName)
}
func (p *persistence) mappingsTempFileName() string {
return path.Join(p.basePath, mappingsTempFileName)
return filepath.Join(p.basePath, mappingsTempFileName)
}
func (p *persistence) processIndexingQueue() {
@ -1456,7 +1509,9 @@ func (p *persistence) writeChunks(w io.Writer, chunks []chunk) error {
b = b[:writeSize]
for i, chunk := range chunks[:batchSize] {
writeChunkHeader(b[i*chunkLenWithHeader:], chunk)
if err := writeChunkHeader(b[i*chunkLenWithHeader:], chunk); err != nil {
return err
}
if err := chunk.marshalToBuf(b[i*chunkLenWithHeader+chunkHeaderLen:]); err != nil {
return err
}
@ -1482,14 +1537,19 @@ func chunkIndexForOffset(offset int64) (int, error) {
return int(offset) / chunkLenWithHeader, nil
}
func writeChunkHeader(header []byte, c chunk) {
func writeChunkHeader(header []byte, c chunk) error {
header[chunkHeaderTypeOffset] = byte(c.encoding())
binary.LittleEndian.PutUint64(
header[chunkHeaderFirstTimeOffset:],
uint64(c.firstTime()),
)
lt, err := c.newIterator().lastTimestamp()
if err != nil {
return err
}
binary.LittleEndian.PutUint64(
header[chunkHeaderLastTimeOffset:],
uint64(c.newIterator().lastTimestamp()),
uint64(lt),
)
return nil
}

View file

@ -14,6 +14,10 @@
package local
import (
"bufio"
"errors"
"os"
"path/filepath"
"reflect"
"sync"
"testing"
@ -49,7 +53,7 @@ func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, tes
})
}
func buildTestChunks(encoding chunkEncoding) map[model.Fingerprint][]chunk {
func buildTestChunks(t *testing.T, encoding chunkEncoding) map[model.Fingerprint][]chunk {
fps := model.Fingerprints{
m1.FastFingerprint(),
m2.FastFingerprint(),
@ -60,10 +64,18 @@ func buildTestChunks(encoding chunkEncoding) map[model.Fingerprint][]chunk {
for _, fp := range fps {
fpToChunks[fp] = make([]chunk, 0, 10)
for i := 0; i < 10; i++ {
fpToChunks[fp] = append(fpToChunks[fp], newChunkForEncoding(encoding).add(&model.SamplePair{
ch, err := newChunkForEncoding(encoding)
if err != nil {
t.Fatal(err)
}
chs, err := ch.add(model.SamplePair{
Timestamp: model.Time(i),
Value: model.SampleValue(fp),
})[0])
})
if err != nil {
t.Fatal(err)
}
fpToChunks[fp] = append(fpToChunks[fp], chs[0])
}
}
return fpToChunks
@ -73,7 +85,7 @@ func chunksEqual(c1, c2 chunk) bool {
values2 := c2.newIterator().values()
for v1 := range c1.newIterator().values() {
v2 := <-values2
if !v1.Equal(v2) {
if !(v1 == v2) {
return false
}
}
@ -84,7 +96,7 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) {
p, closer := newTestPersistence(t, encoding)
defer closer.Close()
fpToChunks := buildTestChunks(encoding)
fpToChunks := buildTestChunks(t, encoding)
for fp, chunks := range fpToChunks {
firstTimeNotDropped, offset, numDropped, allDropped, err :=
@ -126,10 +138,14 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) {
t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 10)
}
for i, cd := range actualChunkDescs {
if cd.firstTime() != model.Time(i) || cd.lastTime() != model.Time(i) {
lastTime, err := cd.lastTime()
if err != nil {
t.Fatal(err)
}
if cd.firstTime() != model.Time(i) || lastTime != model.Time(i) {
t.Errorf(
"Want ts=%v, got firstTime=%v, lastTime=%v.",
i, cd.firstTime(), cd.lastTime(),
i, cd.firstTime(), lastTime,
)
}
@ -140,10 +156,14 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) {
t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 5)
}
for i, cd := range actualChunkDescs {
if cd.firstTime() != model.Time(i) || cd.lastTime() != model.Time(i) {
lastTime, err := cd.lastTime()
if err != nil {
t.Fatal(err)
}
if cd.firstTime() != model.Time(i) || lastTime != model.Time(i) {
t.Errorf(
"Want ts=%v, got firstTime=%v, lastTime=%v.",
i, cd.firstTime(), cd.lastTime(),
i, cd.firstTime(), lastTime,
)
}
@ -433,21 +453,21 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
fpLocker := newFingerprintLocker(10)
sm := newSeriesMap()
s1 := newMemorySeries(m1, nil, time.Time{})
s2 := newMemorySeries(m2, nil, time.Time{})
s3 := newMemorySeries(m3, nil, time.Time{})
s4 := newMemorySeries(m4, nil, time.Time{})
s5 := newMemorySeries(m5, nil, time.Time{})
s1.add(&model.SamplePair{Timestamp: 1, Value: 3.14})
s3.add(&model.SamplePair{Timestamp: 2, Value: 2.7})
s1, _ := newMemorySeries(m1, nil, time.Time{})
s2, _ := newMemorySeries(m2, nil, time.Time{})
s3, _ := newMemorySeries(m3, nil, time.Time{})
s4, _ := newMemorySeries(m4, nil, time.Time{})
s5, _ := newMemorySeries(m5, nil, time.Time{})
s1.add(model.SamplePair{Timestamp: 1, Value: 3.14})
s3.add(model.SamplePair{Timestamp: 2, Value: 2.7})
s3.headChunkClosed = true
s3.persistWatermark = 1
for i := 0; i < 10000; i++ {
s4.add(&model.SamplePair{
s4.add(model.SamplePair{
Timestamp: model.Time(i),
Value: model.SampleValue(i) / 2,
})
s5.add(&model.SamplePair{
s5.add(model.SamplePair{
Timestamp: model.Time(i),
Value: model.SampleValue(i * i),
})
@ -552,10 +572,14 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
}
continue
}
if cd.chunkLastTime != cd.c.newIterator().lastTimestamp() {
lastTime, err := cd.c.newIterator().lastTimestamp()
if err != nil {
t.Fatal(err)
}
if cd.chunkLastTime != lastTime {
t.Errorf(
"chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d",
i, cd.c.newIterator().lastTimestamp(), cd.chunkLastTime,
i, lastTime, cd.chunkLastTime,
)
}
}
@ -605,10 +629,14 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
}
continue
}
if cd.chunkLastTime != cd.c.newIterator().lastTimestamp() {
lastTime, err := cd.c.newIterator().lastTimestamp()
if err != nil {
t.Fatal(err)
}
if cd.chunkLastTime != lastTime {
t.Errorf(
"chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d",
i, cd.c.newIterator().lastTimestamp(), cd.chunkLastTime,
i, cd.chunkLastTime, lastTime,
)
}
}
@ -1051,6 +1079,108 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet
}
}
func TestQuranatineSeriesFile(t *testing.T) {
p, closer := newTestPersistence(t, 1)
defer closer.Close()
verify := func(fp model.Fingerprint, seriesFileShouldExist bool, contentHintFile ...string) {
var (
fpStr = fp.String()
originalFile = p.fileNameForFingerprint(fp)
quarantinedFile = filepath.Join(p.basePath, "orphaned", fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix)
hintFile = filepath.Join(p.basePath, "orphaned", fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+hintFileSuffix)
)
if _, err := os.Stat(originalFile); !os.IsNotExist(err) {
t.Errorf("Expected file %q to not exist.", originalFile)
}
if _, err := os.Stat(quarantinedFile); (os.IsNotExist(err) && seriesFileShouldExist) || (err == nil && !seriesFileShouldExist) {
t.Errorf("Unexpected state of quarantined file %q. Expected it to exist: %t. os.Stat returned: %s.", quarantinedFile, seriesFileShouldExist, err)
}
f, err := os.Open(hintFile)
defer f.Close()
if err != nil {
t.Errorf("Could not open hint file %q: %s", hintFile, err)
return
}
scanner := bufio.NewScanner(f)
for _, want := range contentHintFile {
if !scanner.Scan() {
t.Errorf("Unexpected end of hint file %q.", hintFile)
return
}
got := scanner.Text()
if want != got {
t.Errorf("Want hint line %q, got %q.", want, got)
}
}
if scanner.Scan() {
t.Errorf("Unexpected spurious content in hint file %q: %q", hintFile, scanner.Text())
}
}
if err := p.quarantineSeriesFile(0, nil, nil); err != nil {
t.Error(err)
}
verify(0, false, "[UNKNOWN METRIC]", "[UNKNOWN REASON]")
if err := p.quarantineSeriesFile(
1, errors.New("file does not exist"),
nil,
); err != nil {
t.Error(err)
}
verify(1, false, "[UNKNOWN METRIC]", "file does not exist")
if err := p.quarantineSeriesFile(
2, errors.New("file does not exist"),
model.Metric{"foo": "bar", "dings": "bums"},
); err != nil {
t.Error(err)
}
verify(2, false, `{dings="bums", foo="bar"}`, "file does not exist")
if err := p.quarantineSeriesFile(
3, nil,
model.Metric{"foo": "bar", "dings": "bums"},
); err != nil {
t.Error(err)
}
verify(3, false, `{dings="bums", foo="bar"}`, "[UNKNOWN REASON]")
err := os.Mkdir(filepath.Join(p.basePath, "00"), os.ModePerm)
if err != nil {
t.Fatal(err)
}
f, err := os.Create(p.fileNameForFingerprint(4))
if err != nil {
t.Fatal(err)
}
f.Close()
if err := p.quarantineSeriesFile(
4, errors.New("file exists"),
model.Metric{"sound": "cloud"},
); err != nil {
t.Error(err)
}
verify(4, true, `{sound="cloud"}`, "file exists")
if err := p.quarantineSeriesFile(4, nil, nil); err != nil {
t.Error(err)
}
// Overwrites hint file but leaves series file intact.
verify(4, true, "[UNKNOWN METRIC]", "[UNKNOWN REASON]")
if err := p.quarantineSeriesFile(
4, errors.New("file exists"),
model.Metric{"sound": "cloud"},
); err != nil {
t.Error(err)
}
// Overwrites everything.
verify(4, true, `{sound="cloud"}`, "file exists")
}
var fpStrings = []string{
"b004b821ca50ba26",
"b037c21e884e4fc5",

View file

@ -13,7 +13,11 @@
package local
import "github.com/prometheus/common/model"
import (
"time"
"github.com/prometheus/common/model"
)
// memorySeriesPreloader is a Preloader for the memorySeriesStorage.
type memorySeriesPreloader struct {
@ -25,13 +29,20 @@ type memorySeriesPreloader struct {
func (p *memorySeriesPreloader) PreloadRange(
fp model.Fingerprint,
from model.Time, through model.Time,
) (SeriesIterator, error) {
cds, iter, err := p.storage.preloadChunksForRange(fp, from, through)
if err != nil {
return nil, err
}
) SeriesIterator {
cds, iter := p.storage.preloadChunksForRange(fp, from, through)
p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...)
return iter, nil
return iter
}
// PreloadInstant implements Preloader
func (p *memorySeriesPreloader) PreloadInstant(
fp model.Fingerprint,
timestamp model.Time, stalenessDelta time.Duration,
) SeriesIterator {
cds, iter := p.storage.preloadChunksForInstant(fp, timestamp.Add(-stalenessDelta), timestamp)
p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...)
return iter
}
// Close implements Preloader.

View file

@ -162,9 +162,15 @@ type memorySeries struct {
// first chunk before its chunk desc is evicted. In doubt, this field is
// just set to the oldest possible timestamp.
savedFirstTime model.Time
// The timestamp of the last sample in this series. Needed for fast access to
// ensure timestamp monotonicity during ingestion.
// The timestamp of the last sample in this series. Needed for fast
// access for federation and to ensure timestamp monotonicity during
// ingestion.
lastTime model.Time
// The last ingested sample value. Needed for fast access for
// federation.
lastSampleValue model.SampleValue
// Whether lastSampleValue has been set already.
lastSampleValueSet bool
// Whether the current head chunk has already been finished. If true,
// the current head chunk must not be modified anymore.
headChunkClosed bool
@ -185,12 +191,15 @@ type memorySeries struct {
// set to model.Earliest. The zero value for modTime can be used if the
// modification time of the series file is unknown (e.g. if this is a genuinely
// new series).
func newMemorySeries(m model.Metric, chunkDescs []*chunkDesc, modTime time.Time) *memorySeries {
func newMemorySeries(m model.Metric, chunkDescs []*chunkDesc, modTime time.Time) (*memorySeries, error) {
var err error
firstTime := model.Earliest
lastTime := model.Earliest
if len(chunkDescs) > 0 {
firstTime = chunkDescs[0].firstTime()
lastTime = chunkDescs[len(chunkDescs)-1].lastTime()
if lastTime, err = chunkDescs[len(chunkDescs)-1].lastTime(); err != nil {
return nil, err
}
}
return &memorySeries{
metric: m,
@ -200,14 +209,14 @@ func newMemorySeries(m model.Metric, chunkDescs []*chunkDesc, modTime time.Time)
lastTime: lastTime,
persistWatermark: len(chunkDescs),
modTime: modTime,
}
}, nil
}
// add adds a sample pair to the series. It returns the number of newly
// completed chunks (which are now eligible for persistence).
//
// The caller must have locked the fingerprint of the series.
func (s *memorySeries) add(v *model.SamplePair) int {
func (s *memorySeries) add(v model.SamplePair) (int, error) {
if len(s.chunkDescs) == 0 || s.headChunkClosed {
newHead := newChunkDesc(newChunk(), v.Timestamp)
s.chunkDescs = append(s.chunkDescs, newHead)
@ -229,7 +238,10 @@ func (s *memorySeries) add(v *model.SamplePair) int {
s.headChunkUsedByIterator = false
}
chunks := s.head().add(v)
chunks, err := s.head().add(v)
if err != nil {
return 0, err
}
s.head().c = chunks[0]
for _, c := range chunks[1:] {
@ -242,7 +254,9 @@ func (s *memorySeries) add(v *model.SamplePair) int {
}
s.lastTime = v.Timestamp
return len(chunks) - 1
s.lastSampleValue = v.Value
s.lastSampleValueSet = true
return len(chunks) - 1, nil
}
// maybeCloseHeadChunk closes the head chunk if it has not been touched for the
@ -287,10 +301,14 @@ func (s *memorySeries) evictChunkDescs(iOldestNotEvicted int) {
// dropChunks removes chunkDescs older than t. The caller must have locked the
// fingerprint of the series.
func (s *memorySeries) dropChunks(t model.Time) {
func (s *memorySeries) dropChunks(t model.Time) error {
keepIdx := len(s.chunkDescs)
for i, cd := range s.chunkDescs {
if !cd.lastTime().Before(t) {
lt, err := cd.lastTime()
if err != nil {
return err
}
if !lt.Before(t) {
keepIdx = i
break
}
@ -310,6 +328,7 @@ func (s *memorySeries) dropChunks(t model.Time) {
numMemChunkDescs.Sub(float64(keepIdx))
s.dirty = true
}
return nil
}
// preloadChunks is an internal helper method.
@ -350,8 +369,12 @@ func (s *memorySeries) preloadChunks(
s.headChunkUsedByIterator = true
}
curriedQuarantineSeries := func(err error) {
mss.quarantineSeries(fp, s.metric, err)
}
iter := &boundedIterator{
it: s.newIterator(pinnedChunkDescs),
it: s.newIterator(pinnedChunkDescs, curriedQuarantineSeries),
start: model.Now().Add(-mss.dropAfter),
}
@ -359,9 +382,10 @@ func (s *memorySeries) preloadChunks(
}
// newIterator returns a new SeriesIterator for the provided chunkDescs (which
// must be pinned). The caller must have locked the fingerprint of the
// memorySeries.
func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc) SeriesIterator {
// must be pinned).
//
// The caller must have locked the fingerprint of the memorySeries.
func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc, quarantine func(error)) SeriesIterator {
chunks := make([]chunk, 0, len(pinnedChunkDescs))
for _, cd := range pinnedChunkDescs {
// It's OK to directly access cd.c here (without locking) as the
@ -369,16 +393,45 @@ func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc) SeriesIterator
chunks = append(chunks, cd.c)
}
return &memorySeriesIterator{
chunks: chunks,
chunkIts: make([]chunkIterator, len(chunks)),
chunks: chunks,
chunkIts: make([]chunkIterator, len(chunks)),
quarantine: quarantine,
}
}
// preloadChunksForInstant preloads chunks for the latest value in the given
// range. If the last sample saved in the memorySeries itself is the latest
// value in the given range, it will in fact preload zero chunks and just take
// that value.
func (s *memorySeries) preloadChunksForInstant(
fp model.Fingerprint,
from model.Time, through model.Time,
mss *memorySeriesStorage,
) ([]*chunkDesc, SeriesIterator, error) {
// If we have a lastSamplePair in the series, and thas last samplePair
// is in the interval, just take it in a singleSampleSeriesIterator. No
// need to pin or load anything.
lastSample := s.lastSamplePair()
if !through.Before(lastSample.Timestamp) &&
!from.After(lastSample.Timestamp) &&
lastSample != ZeroSamplePair {
iter := &boundedIterator{
it: &singleSampleSeriesIterator{samplePair: lastSample},
start: model.Now().Add(-mss.dropAfter),
}
return nil, iter, nil
}
// If we are here, we are out of luck and have to delegate to the more
// expensive method.
return s.preloadChunksForRange(fp, from, through, mss)
}
// preloadChunksForRange loads chunks for the given range from the persistence.
// The caller must have locked the fingerprint of the series.
func (s *memorySeries) preloadChunksForRange(
fp model.Fingerprint,
from model.Time, through model.Time,
fp model.Fingerprint, mss *memorySeriesStorage,
mss *memorySeriesStorage,
) ([]*chunkDesc, SeriesIterator, error) {
firstChunkDescTime := model.Latest
if len(s.chunkDescs) > 0 {
@ -410,7 +463,11 @@ func (s *memorySeries) preloadChunksForRange(
if fromIdx == len(s.chunkDescs) {
// Even the last chunk starts before "from". Find out if the
// series ends before "from" and we don't need to do anything.
if s.chunkDescs[len(s.chunkDescs)-1].lastTime().Before(from) {
lt, err := s.chunkDescs[len(s.chunkDescs)-1].lastTime()
if err != nil {
return nil, nopIter, err
}
if lt.Before(from) {
return nil, nopIter, nil
}
}
@ -435,8 +492,9 @@ func (s *memorySeries) head() *chunkDesc {
return s.chunkDescs[len(s.chunkDescs)-1]
}
// firstTime returns the timestamp of the first sample in the series. The caller
// must have locked the fingerprint of the memorySeries.
// firstTime returns the timestamp of the first sample in the series.
//
// The caller must have locked the fingerprint of the memorySeries.
func (s *memorySeries) firstTime() model.Time {
if s.chunkDescsOffset == 0 && len(s.chunkDescs) > 0 {
return s.chunkDescs[0].firstTime()
@ -444,6 +502,23 @@ func (s *memorySeries) firstTime() model.Time {
return s.savedFirstTime
}
// lastSamplePair returns the last ingested SamplePair. It returns
// ZeroSamplePair if this memorySeries has never received a sample (via the add
// method), which is the case for freshly unarchived series or newly created
// ones and also for all series after a server restart. However, in that case,
// series will most likely be considered stale anyway.
//
// The caller must have locked the fingerprint of the memorySeries.
func (s *memorySeries) lastSamplePair() model.SamplePair {
if !s.lastSampleValueSet {
return ZeroSamplePair
}
return model.SamplePair{
Timestamp: s.lastTime,
Value: s.lastSampleValue,
}
}
// chunksToPersist returns a slice of chunkDescs eligible for persistence. It's
// the caller's responsibility to actually persist the returned chunks
// afterwards. The method sets the persistWatermark and the dirty flag
@ -466,20 +541,33 @@ func (s *memorySeries) chunksToPersist() []*chunkDesc {
// memorySeriesIterator implements SeriesIterator.
type memorySeriesIterator struct {
chunkIt chunkIterator // Last chunkIterator used by ValueAtOrBeforeTime.
chunkIts []chunkIterator // Caches chunkIterators.
chunks []chunk
chunkIt chunkIterator // Last chunkIterator used by ValueAtOrBeforeTime.
chunkIts []chunkIterator // Caches chunkIterators.
chunks []chunk
quarantine func(error) // Call to quarantine the series this iterator belongs to.
}
// ValueAtOrBeforeTime implements SeriesIterator.
func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair {
// The most common case. We are iterating through a chunk.
if it.chunkIt != nil && it.chunkIt.contains(t) {
return it.chunkIt.valueAtOrBeforeTime(t)
if it.chunkIt != nil {
containsT, err := it.chunkIt.contains(t)
if err != nil {
it.quarantine(err)
return ZeroSamplePair
}
if containsT {
value, err := it.chunkIt.valueAtOrBeforeTime(t)
if err != nil {
it.quarantine(err)
return ZeroSamplePair
}
return value
}
}
if len(it.chunks) == 0 {
return model.SamplePair{Timestamp: model.Earliest}
return ZeroSamplePair
}
// Find the last chunk where firstTime() is before or equal to t.
@ -489,75 +577,15 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa
})
if i == len(it.chunks) {
// Even the first chunk starts after t.
return model.SamplePair{Timestamp: model.Earliest}
return ZeroSamplePair
}
it.chunkIt = it.chunkIterator(l - i)
return it.chunkIt.valueAtOrBeforeTime(t)
}
// BoundaryValues implements SeriesIterator.
func (it *memorySeriesIterator) BoundaryValues(in metric.Interval) []model.SamplePair {
// Find the first chunk for which the first sample is within the interval.
i := sort.Search(len(it.chunks), func(i int) bool {
return !it.chunks[i].firstTime().Before(in.OldestInclusive)
})
// Only now check the last timestamp of the previous chunk (which is
// fairly expensive).
if i > 0 && !it.chunkIterator(i-1).lastTimestamp().Before(in.OldestInclusive) {
i--
value, err := it.chunkIt.valueAtOrBeforeTime(t)
if err != nil {
it.quarantine(err)
return ZeroSamplePair
}
values := make([]model.SamplePair, 0, 2)
for j, c := range it.chunks[i:] {
if c.firstTime().After(in.NewestInclusive) {
if len(values) == 1 {
// We found the first value before but are now
// already past the last value. The value we
// want must be the last value of the previous
// chunk. So backtrack...
chunkIt := it.chunkIterator(i + j - 1)
values = append(values, model.SamplePair{
Timestamp: chunkIt.lastTimestamp(),
Value: chunkIt.lastSampleValue(),
})
}
break
}
chunkIt := it.chunkIterator(i + j)
if len(values) == 0 {
for s := range chunkIt.values() {
if len(values) == 0 && !s.Timestamp.Before(in.OldestInclusive) {
values = append(values, *s)
// We cannot just break out here as we have to consume all
// the values to not leak a goroutine. This could obviously
// be made much neater with more suitable methods in the chunk
// interface. But currently, BoundaryValues is only used by
// `predict_linear` so we would pollute the chunk interface
// unduly just for one single corner case. Plus, even that use
// of BoundaryValues is suboptimal and should be replaced.
}
}
}
if chunkIt.lastTimestamp().After(in.NewestInclusive) {
s := chunkIt.valueAtOrBeforeTime(in.NewestInclusive)
if s.Timestamp != model.Earliest {
values = append(values, s)
}
break
}
}
if len(values) == 1 {
// We found exactly one value. In that case, add the most recent we know.
chunkIt := it.chunkIterator(len(it.chunks) - 1)
values = append(values, model.SamplePair{
Timestamp: chunkIt.lastTimestamp(),
Value: chunkIt.lastSampleValue(),
})
}
if len(values) == 2 && values[0].Equal(&values[1]) {
return values[:1]
}
return values
return value
}
// RangeValues implements SeriesIterator.
@ -568,8 +596,15 @@ func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePa
})
// Only now check the last timestamp of the previous chunk (which is
// fairly expensive).
if i > 0 && !it.chunkIterator(i-1).lastTimestamp().Before(in.OldestInclusive) {
i--
if i > 0 {
lt, err := it.chunkIterator(i - 1).lastTimestamp()
if err != nil {
it.quarantine(err)
return nil
}
if !lt.Before(in.OldestInclusive) {
i--
}
}
values := []model.SamplePair{}
@ -577,7 +612,12 @@ func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePa
if c.firstTime().After(in.NewestInclusive) {
break
}
values = append(values, it.chunkIterator(i+j).rangeValues(in)...)
chValues, err := it.chunkIterator(i + j).rangeValues(in)
if err != nil {
it.quarantine(err)
return nil
}
values = append(values, chValues...)
}
return values
}
@ -593,17 +633,36 @@ func (it *memorySeriesIterator) chunkIterator(i int) chunkIterator {
return chunkIt
}
// singleSampleSeriesIterator implements Series Iterator. It is a "shortcut
// iterator" that returns a single samplee only. The sample is saved in the
// iterator itself, so no chunks need to be pinned.
type singleSampleSeriesIterator struct {
samplePair model.SamplePair
}
// ValueAtTime implements SeriesIterator.
func (it *singleSampleSeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair {
if it.samplePair.Timestamp.After(t) {
return ZeroSamplePair
}
return it.samplePair
}
// RangeValues implements SeriesIterator.
func (it *singleSampleSeriesIterator) RangeValues(in metric.Interval) []model.SamplePair {
if it.samplePair.Timestamp.After(in.NewestInclusive) ||
it.samplePair.Timestamp.Before(in.OldestInclusive) {
return []model.SamplePair{}
}
return []model.SamplePair{it.samplePair}
}
// nopSeriesIterator implements Series Iterator. It never returns any values.
type nopSeriesIterator struct{}
// ValueAtTime implements SeriesIterator.
func (i nopSeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair {
return model.SamplePair{Timestamp: model.Earliest}
}
// BoundaryValues implements SeriesIterator.
func (i nopSeriesIterator) BoundaryValues(in metric.Interval) []model.SamplePair {
return []model.SamplePair{}
return ZeroSamplePair
}
// RangeValues implements SeriesIterator.

View file

@ -30,8 +30,9 @@ import (
)
const (
evictRequestsCap = 1024
chunkLen = 1024
evictRequestsCap = 1024
quarantineRequestsCap = 1024
chunkLen = 1024
// See waitForNextFP.
fpMaxSweepTime = 6 * time.Hour
@ -77,6 +78,12 @@ type evictRequest struct {
evict bool
}
type quarantineRequest struct {
fp model.Fingerprint
metric model.Metric
reason error
}
// SyncStrategy is an enum to select a sync strategy for series files.
type SyncStrategy int
@ -147,6 +154,9 @@ type memorySeriesStorage struct {
evictRequests chan evictRequest
evictStopping, evictStopped chan struct{}
quarantineRequests chan quarantineRequest
quarantineStopping, quarantineStopped chan struct{}
persistErrors prometheus.Counter
numSeries prometheus.Gauge
seriesOps *prometheus.CounterVec
@ -198,6 +208,10 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage {
evictStopping: make(chan struct{}),
evictStopped: make(chan struct{}),
quarantineRequests: make(chan quarantineRequest, quarantineRequestsCap),
quarantineStopping: make(chan struct{}),
quarantineStopped: make(chan struct{}),
persistErrors: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -312,6 +326,7 @@ func (s *memorySeriesStorage) Start() (err error) {
}
go s.handleEvictList()
go s.handleQuarantine()
go s.logThrottling()
go s.loop()
@ -326,6 +341,10 @@ func (s *memorySeriesStorage) Stop() error {
close(s.loopStopping)
<-s.loopStopped
log.Info("Stopping series quarantining...")
close(s.quarantineStopping)
<-s.quarantineStopped
log.Info("Stopping chunk eviction...")
close(s.evictStopping)
<-s.evictStopped
@ -348,15 +367,15 @@ func (s *memorySeriesStorage) WaitForIndexing() {
}
// LastSampleForFingerprint implements Storage.
func (s *memorySeriesStorage) LastSamplePairForFingerprint(fp model.Fingerprint) *model.SamplePair {
func (s *memorySeriesStorage) LastSamplePairForFingerprint(fp model.Fingerprint) model.SamplePair {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
series, ok := s.fpToSeries.get(fp)
if !ok {
return nil
return ZeroSamplePair
}
return series.head().lastSamplePair()
return series.lastSamplePair()
}
// boundedIterator wraps a SeriesIterator and does not allow fetching
@ -369,22 +388,11 @@ type boundedIterator struct {
// ValueAtOrBeforeTime implements the SeriesIterator interface.
func (bit *boundedIterator) ValueAtOrBeforeTime(ts model.Time) model.SamplePair {
if ts < bit.start {
return model.SamplePair{Timestamp: model.Earliest}
return ZeroSamplePair
}
return bit.it.ValueAtOrBeforeTime(ts)
}
// BoundaryValues implements the SeriesIterator interface.
func (bit *boundedIterator) BoundaryValues(interval metric.Interval) []model.SamplePair {
if interval.NewestInclusive < bit.start {
return []model.SamplePair{}
}
if interval.OldestInclusive < bit.start {
interval.OldestInclusive = bit.start
}
return bit.it.BoundaryValues(interval)
}
// RangeValues implements the SeriesIterator interface.
func (bit *boundedIterator) RangeValues(interval metric.Interval) []model.SamplePair {
if interval.NewestInclusive < bit.start {
@ -532,22 +540,7 @@ func (s *memorySeriesStorage) MetricForFingerprint(fp model.Fingerprint) metric.
// DropMetric implements Storage.
func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprint) {
for _, fp := range fps {
s.fpLocker.Lock(fp)
if series, ok := s.fpToSeries.get(fp); ok {
s.fpToSeries.del(fp)
s.numSeries.Dec()
s.persistence.unindexMetric(fp, series.metric)
} else if err := s.persistence.purgeArchivedMetric(fp); err != nil {
log.Errorf("Error purging metric with fingerprint %v: %v", fp, err)
}
// Attempt to delete series file in any case.
if _, err := s.persistence.deleteSeriesFile(fp); err != nil {
log.Errorf("Error deleting series file for %v: %v", fp, err)
}
s.fpLocker.Unlock(fp)
s.seriesOps.WithLabelValues(requestedPurge).Inc()
s.purgeSeries(fp, nil, nil)
}
}
@ -565,34 +558,44 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error {
rawFP := sample.Metric.FastFingerprint()
s.fpLocker.Lock(rawFP)
fp, err := s.mapper.mapFP(rawFP, sample.Metric)
defer func() {
s.fpLocker.Unlock(fp)
}() // Func wrapper because fp might change below.
if err != nil {
log.Errorf("Error while mapping fingerprint %v: %v", rawFP, err)
s.persistence.setDirty(true)
s.persistence.setDirty(true, fmt.Errorf("error while mapping fingerprint %v: %s", rawFP, err))
return err
}
if fp != rawFP {
// Switch locks.
s.fpLocker.Unlock(rawFP)
s.fpLocker.Lock(fp)
}
series := s.getOrCreateSeries(fp, sample.Metric)
series, err := s.getOrCreateSeries(fp, sample.Metric)
if err != nil {
return err // getOrCreateSeries took care of quarantining already.
}
if sample.Timestamp <= series.lastTime {
s.fpLocker.Unlock(fp)
// Don't log and track equal timestamps, as they are a common occurrence
// when using client-side timestamps (e.g. Pushgateway or federation).
// It would be even better to also compare the sample values here, but
// we don't have efficient access to a series's last value.
if sample.Timestamp != series.lastTime {
s.outOfOrderSamplesCount.Inc()
return ErrOutOfOrderSample
// Don't report "no-op appends", i.e. where timestamp and sample
// value are the same as for the last append, as they are a
// common occurrence when using client-side timestamps
// (e.g. Pushgateway or federation).
if sample.Timestamp == series.lastTime &&
series.lastSampleValueSet &&
sample.Value == series.lastSampleValue {
return nil
}
return nil
s.outOfOrderSamplesCount.Inc()
return ErrOutOfOrderSample // Caused by the caller.
}
completedChunksCount := series.add(&model.SamplePair{
completedChunksCount, err := series.add(model.SamplePair{
Value: sample.Value,
Timestamp: sample.Timestamp,
})
s.fpLocker.Unlock(fp)
if err != nil {
s.quarantineSeries(fp, sample.Metric, err)
return err
}
s.ingestedSamplesCount.Inc()
s.incNumChunksToPersist(completedChunksCount)
@ -653,7 +656,7 @@ func (s *memorySeriesStorage) logThrottling() {
}
}
func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) *memorySeries {
func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) (*memorySeries, error) {
series, ok := s.fpToSeries.get(fp)
if !ok {
var cds []*chunkDesc
@ -661,6 +664,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me
unarchived, err := s.persistence.unarchiveMetric(fp)
if err != nil {
log.Errorf("Error unarchiving fingerprint %v (metric %v): %v", fp, m, err)
return nil, err
}
if unarchived {
s.seriesOps.WithLabelValues(unarchive).Inc()
@ -671,7 +675,8 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me
// appear as archived or purged).
cds, err = s.loadChunkDescs(fp, 0)
if err != nil {
log.Errorf("Error loading chunk descs for fingerprint %v (metric %v): %v", fp, m, err)
s.quarantineSeries(fp, m, err)
return nil, err
}
modTime = s.persistence.seriesFileModTime(fp)
} else {
@ -679,41 +684,87 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me
s.persistence.indexMetric(fp, m)
s.seriesOps.WithLabelValues(create).Inc()
}
series = newMemorySeries(m, cds, modTime)
series, err = newMemorySeries(m, cds, modTime)
if err != nil {
s.quarantineSeries(fp, m, err)
return nil, err
}
s.fpToSeries.put(fp, series)
s.numSeries.Inc()
}
return series, nil
}
// getSeriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant.
func (s *memorySeriesStorage) getSeriesForRange(
fp model.Fingerprint,
from model.Time, through model.Time,
) *memorySeries {
series, ok := s.fpToSeries.get(fp)
if ok {
return series
}
has, first, last, err := s.persistence.hasArchivedMetric(fp)
if err != nil {
log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.")
return nil
}
if !has {
s.invalidPreloadRequestsCount.Inc()
return nil
}
if last.Before(from) || first.After(through) {
return nil
}
metric, err := s.persistence.archivedMetric(fp)
if err != nil {
log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.")
return nil
}
series, err = s.getOrCreateSeries(fp, metric)
if err != nil {
// getOrCreateSeries took care of quarantining already.
return nil
}
return series
}
func (s *memorySeriesStorage) preloadChunksForRange(
fp model.Fingerprint,
from model.Time, through model.Time,
) ([]*chunkDesc, SeriesIterator, error) {
) ([]*chunkDesc, SeriesIterator) {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
series, ok := s.fpToSeries.get(fp)
if !ok {
has, first, last, err := s.persistence.hasArchivedMetric(fp)
if err != nil {
return nil, nopIter, err
}
if !has {
s.invalidPreloadRequestsCount.Inc()
return nil, nopIter, nil
}
if from.Before(last) && through.After(first) {
metric, err := s.persistence.archivedMetric(fp)
if err != nil {
return nil, nopIter, err
}
series = s.getOrCreateSeries(fp, metric)
} else {
return nil, nopIter, nil
}
series := s.getSeriesForRange(fp, from, through)
if series == nil {
return nil, nopIter
}
return series.preloadChunksForRange(from, through, fp, s)
cds, iter, err := series.preloadChunksForRange(fp, from, through, s)
if err != nil {
s.quarantineSeries(fp, series.metric, err)
return nil, nopIter
}
return cds, iter
}
func (s *memorySeriesStorage) preloadChunksForInstant(
fp model.Fingerprint,
from model.Time, through model.Time,
) ([]*chunkDesc, SeriesIterator) {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
series := s.getSeriesForRange(fp, from, through)
if series == nil {
return nil, nopIter
}
cds, iter, err := series.preloadChunksForInstant(fp, from, through, s)
if err != nil {
s.quarantineSeries(fp, series.metric, err)
return nil, nopIter
}
return cds, iter
}
func (s *memorySeriesStorage) handleEvictList() {
@ -1129,7 +1180,10 @@ func (s *memorySeriesStorage) writeMemorySeries(
s.persistErrors.Inc()
return false
}
series.dropChunks(beforeTime)
if err := series.dropChunks(beforeTime); err != nil {
s.persistErrors.Inc()
return false
}
if len(series.chunkDescs) == 0 && allDroppedFromPersistence {
// All chunks dropped from both memory and persistence. Delete the series for good.
s.fpToSeries.del(fp)
@ -1144,8 +1198,7 @@ func (s *memorySeriesStorage) writeMemorySeries(
} else {
series.chunkDescsOffset -= numDroppedFromPersistence
if series.chunkDescsOffset < 0 {
log.Errorf("Dropped more chunks from persistence than from memory for fingerprint %v, series %v.", fp, series)
s.persistence.setDirty(true)
s.persistence.setDirty(true, fmt.Errorf("dropped more chunks from persistence than from memory for fingerprint %v, series %v", fp, series))
series.chunkDescsOffset = -1 // Makes sure it will be looked at during crash recovery.
}
}
@ -1299,6 +1352,122 @@ func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 {
return score
}
// quarantineSeries registers the provided fingerprint for quarantining. It
// always returns immediately. Quarantine requests are processed
// asynchronously. If there are too many requests queued, they are simply
// dropped.
//
// Quarantining means that the series file is moved to the orphaned directory,
// and all its traces are removed from indices. Call this method if an
// unrecoverable error is detected while dealing with a series, and pass in the
// encountered error. It will be saved as a hint in the orphaned directory.
func (s *memorySeriesStorage) quarantineSeries(fp model.Fingerprint, metric model.Metric, err error) {
req := quarantineRequest{fp: fp, metric: metric, reason: err}
select {
case s.quarantineRequests <- req:
// Request submitted.
default:
log.
With("fingerprint", fp).
With("metric", metric).
With("reason", err).
Warn("Quarantine queue full. Dropped quarantine request.")
s.seriesOps.WithLabelValues(droppedQuarantine).Inc()
}
}
func (s *memorySeriesStorage) handleQuarantine() {
for {
select {
case req := <-s.quarantineRequests:
s.purgeSeries(req.fp, req.metric, req.reason)
log.
With("fingerprint", req.fp).
With("metric", req.metric).
With("reason", req.reason).
Warn("Series quarantined.")
case <-s.quarantineStopping:
log.Info("Series quarantining stopped.")
close(s.quarantineStopped)
return
}
}
}
// purgeSeries removes all traces of a series. If a non-nil quarantine reason is
// provided, the series file will not be deleted completely, but moved to the
// orphaned directory with the reason and the metric in a hint file. The
// provided metric might be nil if unknown.
func (s *memorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric, quarantineReason error) {
s.fpLocker.Lock(fp)
var (
series *memorySeries
ok bool
)
if series, ok = s.fpToSeries.get(fp); ok {
s.fpToSeries.del(fp)
s.numSeries.Dec()
m = series.metric
// Adjust s.numChunksToPersist and numMemChunks down by
// the number of chunks in this series that are not
// persisted yet. Persisted chunks will be deducted from
// numMemChunks upon eviction.
numChunksNotYetPersisted := len(series.chunkDescs) - series.persistWatermark
atomic.AddInt64(&numMemChunks, int64(-numChunksNotYetPersisted))
if !series.headChunkClosed {
// Head chunk wasn't counted as waiting for persistence yet.
// (But it was counted as a chunk in memory.)
numChunksNotYetPersisted--
}
s.incNumChunksToPersist(-numChunksNotYetPersisted)
} else {
if err := s.persistence.purgeArchivedMetric(fp); err != nil {
log.
With("fingerprint", fp).
With("metric", m).
With("error", err).
Error("Error purging metric from archive.")
}
}
if m != nil {
// If we know a metric now, unindex it in any case.
// purgeArchivedMetric might have done so already, but we cannot
// be sure. Unindexing in idempotent, though.
s.persistence.unindexMetric(fp, m)
}
// Attempt to delete/quarantine the series file in any case.
if quarantineReason == nil {
// No reason stated, simply delete the file.
if _, err := s.persistence.deleteSeriesFile(fp); err != nil {
log.
With("fingerprint", fp).
With("metric", m).
With("error", err).
Error("Error deleting series file.")
}
s.seriesOps.WithLabelValues(requestedPurge).Inc()
} else {
if err := s.persistence.quarantineSeriesFile(fp, quarantineReason, m); err == nil {
s.seriesOps.WithLabelValues(completedQurantine).Inc()
} else {
s.seriesOps.WithLabelValues(failedQuarantine).Inc()
log.
With("fingerprint", fp).
With("metric", m).
With("reason", quarantineReason).
With("error", err).
Error("Error quarantining series file.")
}
}
s.fpLocker.Unlock(fp)
}
// Describe implements prometheus.Collector.
func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
s.persistence.Describe(ch)

View file

@ -405,10 +405,7 @@ func TestRetentionCutoff(t *testing.T) {
defer pl.Close()
// Preload everything.
it, err := pl.PreloadRange(fp, insertStart, now)
if err != nil {
t.Fatalf("Error preloading outdated chunks: %s", err)
}
it := pl.PreloadRange(fp, insertStart, now)
val := it.ValueAtOrBeforeTime(now.Add(-61 * time.Minute))
if val.Timestamp != model.Earliest {
@ -424,14 +421,6 @@ func TestRetentionCutoff(t *testing.T) {
if expt := now.Add(-1 * time.Hour).Add(time.Minute); vals[0].Timestamp != expt {
t.Errorf("unexpected timestamp for first sample: %v, expected %v", vals[0].Timestamp.Time(), expt.Time())
}
vals = it.BoundaryValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now})
if len(vals) != 2 {
t.Errorf("expected 2 values but got %d", len(vals))
}
if expt := now.Add(-1 * time.Hour).Add(time.Minute); vals[0].Timestamp != expt {
t.Errorf("unexpected timestamp for first sample: %v, expected %v", vals[0].Timestamp.Time(), expt.Time())
}
}
func TestDropMetrics(t *testing.T) {
@ -500,18 +489,12 @@ func TestDropMetrics(t *testing.T) {
t.Errorf("unexpected number of fingerprints: %d", len(fps2))
}
_, it, err := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
_, it := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest)
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
t.Errorf("unexpected number of samples: %d", len(vals))
}
_, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
_, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest)
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N {
t.Errorf("unexpected number of samples: %d", len(vals))
}
@ -533,18 +516,12 @@ func TestDropMetrics(t *testing.T) {
t.Errorf("unexpected number of fingerprints: %d", len(fps3))
}
_, it, err = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
_, it = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest)
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
t.Errorf("unexpected number of samples: %d", len(vals))
}
_, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
_, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest)
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
t.Errorf("unexpected number of samples: %d", len(vals))
}
@ -557,6 +534,95 @@ func TestDropMetrics(t *testing.T) {
}
}
func TestQuarantineMetric(t *testing.T) {
now := model.Now()
insertStart := now.Add(-2 * time.Hour)
s, closer := NewTestStorage(t, 1)
defer closer.Close()
chunkFileExists := func(fp model.Fingerprint) (bool, error) {
f, err := s.persistence.openChunkFileForReading(fp)
if err == nil {
f.Close()
return true, nil
}
if os.IsNotExist(err) {
return false, nil
}
return false, err
}
m1 := model.Metric{model.MetricNameLabel: "test", "n1": "v1"}
m2 := model.Metric{model.MetricNameLabel: "test", "n1": "v2"}
m3 := model.Metric{model.MetricNameLabel: "test", "n1": "v3"}
N := 120000
for j, m := range []model.Metric{m1, m2, m3} {
for i := 0; i < N; i++ {
smpl := &model.Sample{
Metric: m,
Timestamp: insertStart.Add(time.Duration(i) * time.Millisecond), // 1 millisecond intervals.
Value: model.SampleValue(j),
}
s.Append(smpl)
}
}
s.WaitForIndexing()
// Archive m3, but first maintain it so that at least something is written to disk.
fpToBeArchived := m3.FastFingerprint()
s.maintainMemorySeries(fpToBeArchived, 0)
s.fpLocker.Lock(fpToBeArchived)
s.fpToSeries.del(fpToBeArchived)
if err := s.persistence.archiveMetric(
fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond),
); err != nil {
t.Error(err)
}
s.fpLocker.Unlock(fpToBeArchived)
// Corrupt the series file for m3.
f, err := os.Create(s.persistence.fileNameForFingerprint(fpToBeArchived))
if err != nil {
t.Fatal(err)
}
if _, err := f.WriteString("This is clearly not the content of a series file."); err != nil {
t.Fatal(err)
}
if f.Close(); err != nil {
t.Fatal(err)
}
fps := s.fingerprintsForLabelPairs(model.LabelPair{Name: model.MetricNameLabel, Value: "test"})
if len(fps) != 3 {
t.Errorf("unexpected number of fingerprints: %d", len(fps))
}
pl := s.NewPreloader()
// This will access the corrupt file and lead to quarantining.
pl.PreloadInstant(fpToBeArchived, now.Add(-2*time.Hour), time.Minute)
pl.Close()
time.Sleep(time.Second) // Give time to quarantine. TODO(beorn7): Find a better way to wait.
s.WaitForIndexing()
fps2 := s.fingerprintsForLabelPairs(model.LabelPair{
Name: model.MetricNameLabel, Value: "test",
})
if len(fps2) != 2 {
t.Errorf("unexpected number of fingerprints: %d", len(fps2))
}
exists, err := chunkFileExists(fpToBeArchived)
if err != nil {
t.Fatal(err)
}
if exists {
t.Errorf("chunk file exists for fp=%v", fpToBeArchived)
}
}
// TestLoop is just a smoke test for the loop method, if we can switch it on and
// off without disaster.
func TestLoop(t *testing.T) {
@ -627,7 +693,10 @@ func testChunk(t *testing.T, encoding chunkEncoding) {
continue
}
for sample := range cd.c.newIterator().values() {
values = append(values, *sample)
if sample.error != nil {
t.Error(sample.error)
}
values = append(values, sample.SamplePair)
}
}
@ -670,10 +739,7 @@ func testValueAtOrBeforeTime(t *testing.T, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint()
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
_, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
// #1 Exactly on a sample.
for i, expected := range samples {
@ -747,10 +813,7 @@ func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint()
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
if err != nil {
b.Fatalf("Error preloading everything: %s", err)
}
_, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
b.ResetTimer()
@ -828,10 +891,7 @@ func testRangeValues(t *testing.T, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint()
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
_, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
// #1 Zero length interval at sample.
for i, expected := range samples {
@ -983,10 +1043,7 @@ func benchmarkRangeValues(b *testing.B, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint()
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
if err != nil {
b.Fatalf("Error preloading everything: %s", err)
}
_, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
b.ResetTimer()
@ -1032,32 +1089,26 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
// Drop ~half of the chunks.
s.maintainMemorySeries(fp, 10000)
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
actual := it.BoundaryValues(metric.Interval{
_, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
actual := it.RangeValues(metric.Interval{
OldestInclusive: 0,
NewestInclusive: 100000,
})
if len(actual) != 2 {
t.Fatal("expected two results after purging half of series")
if len(actual) < 4000 {
t.Fatalf("expected more than %d results after purging half of series, got %d", 4000, len(actual))
}
if actual[0].Timestamp < 6000 || actual[0].Timestamp > 10000 {
t.Errorf("1st timestamp out of expected range: %v", actual[0].Timestamp)
}
want := model.Time(19998)
if actual[1].Timestamp != want {
if actual[len(actual)-1].Timestamp != want {
t.Errorf("2nd timestamp: want %v, got %v", want, actual[1].Timestamp)
}
// Drop everything.
s.maintainMemorySeries(fp, 100000)
_, it, err = s.preloadChunksForRange(fp, model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
actual = it.BoundaryValues(metric.Interval{
_, it = s.preloadChunksForRange(fp, model.Earliest, model.Latest)
actual = it.RangeValues(metric.Interval{
OldestInclusive: 0,
NewestInclusive: 100000,
})
@ -1082,8 +1133,12 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
// Archive metrics.
s.fpToSeries.del(fp)
lastTime, err := series.head().lastTime()
if err != nil {
t.Fatal(err)
}
if err := s.persistence.archiveMetric(
fp, series.metric, series.firstTime(), series.head().lastTime(),
fp, series.metric, series.firstTime(), lastTime,
); err != nil {
t.Fatal(err)
}
@ -1133,8 +1188,12 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
// Archive metrics.
s.fpToSeries.del(fp)
lastTime, err = series.head().lastTime()
if err != nil {
t.Fatal(err)
}
if err := s.persistence.archiveMetric(
fp, series.metric, series.firstTime(), series.head().lastTime(),
fp, series.metric, series.firstTime(), lastTime,
); err != nil {
t.Fatal(err)
}
@ -1528,10 +1587,7 @@ func verifyStorage(t testing.TB, s *memorySeriesStorage, samples model.Samples,
t.Fatal(err)
}
p := s.NewPreloader()
it, err := p.PreloadRange(fp, sample.Timestamp, sample.Timestamp)
if err != nil {
t.Fatal(err)
}
it := p.PreloadRange(fp, sample.Timestamp, sample.Timestamp)
found := it.ValueAtOrBeforeTime(sample.Timestamp)
if found.Timestamp == model.Earliest {
t.Errorf("Sample %#v: Expected sample not found.", sample)
@ -1575,10 +1631,7 @@ func TestAppendOutOfOrder(t *testing.T) {
pl := s.NewPreloader()
defer pl.Close()
it, err := pl.PreloadRange(fp, 0, 2)
if err != nil {
t.Fatalf("Error preloading chunks: %s", err)
}
it := pl.PreloadRange(fp, 0, 2)
want := []model.SamplePair{
{

View file

@ -29,7 +29,6 @@ const (
ResultAppendTime
QueryAnalysisTime
GetValueAtTimeTime
GetBoundaryValuesTime
GetRangeValuesTime
ExecQueueTime
ViewDiskPreparationTime
@ -60,8 +59,6 @@ func (s QueryTiming) String() string {
return "Query analysis time"
case GetValueAtTimeTime:
return "GetValueAtTime() time"
case GetBoundaryValuesTime:
return "GetBoundaryValues() time"
case GetRangeValuesTime:
return "GetRangeValues() time"
case ExecQueueTime:

View file

@ -67,7 +67,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
sp := h.storage.LastSamplePairForFingerprint(fp)
// Discard if sample does not exist or lays before the staleness interval.
if sp == nil || sp.Timestamp.Before(minTimestamp) {
if sp.Timestamp.Before(minTimestamp) {
continue
}