Handle errors caused by data corruption more gracefully

This requires all the panic calls upon unexpected data to be converted
into returned errors. This pollutes the function signatures quite a
lot. Well, this is Go...

The ideas behind this are the following:

- Panic only if it's a programming error. Data corruption happens, and
  it is not a programming error. (A minimal sketch of this policy
  follows this list.)

- If we detect a data corruption, we "quarantine" the series,
  essentially removing it from the database and putting its data into
  a separate directory for forensics.

- A failure while writing to a series file is not automatically
  considered corruption. It will call setDirty, though, so that crash
  recovery will commence upon the next restart and check for it.

- Series quarantining and setDirty calls are logged and counted in
  metrics, but they are hidden from the user of the interfaces in
  interface.go, with the notable exception of Append(). The reasoning
  is that we handle corruption by removing the corrupted series, i.e. a
  query for it will return no results on its next call anyway, so we
  return no results right now. In the case of Append(), though, we want
  to tell the user that no data has been appended.
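
A minimal, self-contained sketch of that policy (all names below are
illustrative and not taken from this commit):

    package main

    import (
        "errors"
        "fmt"
    )

    // errCorrupt stands in for any corruption detected while decoding
    // persisted data. It is expected at runtime, so it is returned as
    // an error instead of causing a panic.
    var errCorrupt = errors.New("chunk data corrupted")

    // decodeSample stands in for a decoding step on persisted data.
    func decodeSample(buf []byte) (byte, error) {
        if len(buf) == 0 {
            return 0, errCorrupt // data corruption: return an error
        }
        return buf[0], nil
    }

    func main() {
        if _, err := decodeSample(nil); err != nil {
            // Corruption detected: quarantine the affected series
            // instead of crashing the server. (This commit moves the
            // series file into an "orphaned" directory for forensics.)
            fmt.Println("quarantining series:", err)
        }
    }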

Minor side effects:

- Now consistently using filepath.* instead of path.*.

- Introduced structured logging where I touched it (see the example
  below). This makes things less consistent, but a complete change to
  structured logging would be out of scope for this PR.
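
As an illustration, a sketch of that structured style against
github.com/prometheus/common/log, whose With/Error calls appear in the
diff below (the file name and error here are made up):

    package main

    import (
        "errors"

        "github.com/prometheus/common/log"
    )

    func main() {
        filename := "00/b821ca50ba26.db"       // hypothetical series file
        err := errors.New("permission denied") // hypothetical failure

        // Old, unstructured style still used in most places:
        log.Errorf("Failed to delete lost series file %s: %s", filename, err)

        // Structured style used where this commit touches logging:
        log.With("file", filename).With("error", err).Error("Failed to delete lost series file.")
    }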
Author: beorn7
Date:   2016-02-25 12:23:42 +01:00
Parent: 8766f99085
Commit: 0ea5801e47
14 changed files with 1002 additions and 368 deletions


@@ -146,21 +146,13 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) {
if err = contextDone(ctx, env); err != nil {
return nil, err
}
iter, err := p.PreloadRange(fp, start.Add(-rangeDuration), end)
if err != nil {
return nil, err
}
itersForDuration[fp] = iter
itersForDuration[fp] = p.PreloadRange(fp, start.Add(-rangeDuration), end)
}
for fp := range pt.instants {
if err = contextDone(ctx, env); err != nil {
return nil, err
}
iter, err := p.PreloadInstant(fp, start, StalenessDelta)
if err != nil {
return nil, err
}
itersForDuration[fp] = iter
itersForDuration[fp] = p.PreloadInstant(fp, start, StalenessDelta)
}
}


@@ -112,7 +112,7 @@ func newChunkDesc(c chunk, firstTime model.Time) *chunkDesc {
// add adds a sample pair to the underlying chunk. For safe concurrent access,
// The chunk must be pinned, and the caller must have locked the fingerprint of
// the series.
func (cd *chunkDesc) add(s *model.SamplePair) []chunk {
func (cd *chunkDesc) add(s model.SamplePair) ([]chunk, error) {
return cd.c.add(s)
}
@@ -169,9 +169,9 @@ func (cd *chunkDesc) firstTime() model.Time {
// lastTime returns the timestamp of the last sample in the chunk. For safe
// concurrent access, this method requires the fingerprint of the time series to
// be locked.
func (cd *chunkDesc) lastTime() model.Time {
func (cd *chunkDesc) lastTime() (model.Time, error) {
if cd.chunkLastTime != model.Earliest || cd.c == nil {
return cd.chunkLastTime
return cd.chunkLastTime, nil
}
return cd.c.newIterator().lastTimestamp()
}
@@ -181,10 +181,15 @@ func (cd *chunkDesc) lastTime() model.Time {
// last sample to a chunk or after closing a head chunk due to age. For safe
// concurrent access, the chunk must be pinned, and the caller must have locked
// the fingerprint of the series.
func (cd *chunkDesc) maybePopulateLastTime() {
func (cd *chunkDesc) maybePopulateLastTime() error {
if cd.chunkLastTime == model.Earliest && cd.c != nil {
cd.chunkLastTime = cd.c.newIterator().lastTimestamp()
t, err := cd.c.newIterator().lastTimestamp()
if err != nil {
return err
}
cd.chunkLastTime = t
}
return nil
}
// isEvicted returns whether the chunk is evicted. For safe concurrent access,
@@ -241,14 +246,14 @@ type chunk interface {
// any. The first chunk returned might be the same as the original one
// or a newly allocated version. In any case, take the returned chunk as
// the relevant one and discard the original chunk.
add(sample *model.SamplePair) []chunk
add(sample model.SamplePair) ([]chunk, error)
clone() chunk
firstTime() model.Time
newIterator() chunkIterator
marshal(io.Writer) error
marshalToBuf([]byte) error
unmarshal(io.Reader) error
unmarshalFromBuf([]byte)
unmarshalFromBuf([]byte) error
encoding() chunkEncoding
}
@@ -259,56 +264,73 @@ type chunkIterator interface {
// length returns the number of samples in the chunk.
length() int
// Gets the timestamp of the n-th sample in the chunk.
timestampAtIndex(int) model.Time
timestampAtIndex(int) (model.Time, error)
// Gets the last timestamp in the chunk.
lastTimestamp() model.Time
lastTimestamp() (model.Time, error)
// Gets the sample value of the n-th sample in the chunk.
sampleValueAtIndex(int) model.SampleValue
sampleValueAtIndex(int) (model.SampleValue, error)
// Gets the last sample value in the chunk.
lastSampleValue() model.SampleValue
lastSampleValue() (model.SampleValue, error)
// Gets the value that is closest before the given time. In case a value
// exists at precisely the given time, that value is returned. If no
// applicable value exists, ZeroSamplePair is returned.
valueAtOrBeforeTime(model.Time) model.SamplePair
valueAtOrBeforeTime(model.Time) (model.SamplePair, error)
// Gets all values contained within a given interval.
rangeValues(metric.Interval) []model.SamplePair
rangeValues(metric.Interval) ([]model.SamplePair, error)
// Whether a given timestamp is contained between first and last value
// in the chunk.
contains(model.Time) bool
contains(model.Time) (bool, error)
// values returns a channel, from which all sample values in the chunk
// can be received in order. The channel is closed after the last
// one. It is generally not safe to mutate the chunk while the channel
// is still open.
values() <-chan *model.SamplePair
// is still open. If a value is returned with error!=nil, no further
// values will be returned and the channel is closed.
values() <-chan struct {
model.SamplePair
error
}
}
func transcodeAndAdd(dst chunk, src chunk, s *model.SamplePair) []chunk {
func transcodeAndAdd(dst chunk, src chunk, s model.SamplePair) ([]chunk, error) {
chunkOps.WithLabelValues(transcode).Inc()
head := dst
body := []chunk{}
for v := range src.newIterator().values() {
newChunks := head.add(v)
if v.error != nil {
return nil, v.error
}
newChunks, err := head.add(v.SamplePair)
if err != nil {
return nil, err
}
body = append(body, newChunks[:len(newChunks)-1]...)
head = newChunks[len(newChunks)-1]
}
newChunks := head.add(s)
return append(body, newChunks...)
newChunks, err := head.add(s)
if err != nil {
return nil, err
}
return append(body, newChunks...), nil
}
// newChunk creates a new chunk according to the encoding set by the
// defaultChunkEncoding flag.
func newChunk() chunk {
return newChunkForEncoding(DefaultChunkEncoding)
chunk, err := newChunkForEncoding(DefaultChunkEncoding)
if err != nil {
panic(err)
}
return chunk
}
func newChunkForEncoding(encoding chunkEncoding) chunk {
func newChunkForEncoding(encoding chunkEncoding) (chunk, error) {
switch encoding {
case delta:
return newDeltaEncodedChunk(d1, d0, true, chunkLen)
return newDeltaEncodedChunk(d1, d0, true, chunkLen), nil
case doubleDelta:
return newDoubleDeltaEncodedChunk(d1, d0, true, chunkLen)
return newDoubleDeltaEncodedChunk(d1, d0, true, chunkLen), nil
default:
panic(fmt.Errorf("unknown chunk encoding: %v", encoding))
return nil, fmt.Errorf("unknown chunk encoding: %v", encoding)
}
}


@@ -14,10 +14,11 @@
package local
import (
"errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
"strings"
"sync/atomic"
@@ -52,7 +53,7 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint
log.Info("Scanning files.")
for i := 0; i < 1<<(seriesDirNameLen*4); i++ {
dirname := path.Join(p.basePath, fmt.Sprintf(seriesDirNameFmt, i))
dirname := filepath.Join(p.basePath, fmt.Sprintf(seriesDirNameFmt, i))
dir, err := os.Open(dirname)
if os.IsNotExist(err) {
continue
@@ -139,7 +140,7 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint
}
}
p.setDirty(false)
p.setDirty(false, nil)
log.Warn("Crash recovery complete.")
return nil
}
@@ -175,36 +176,46 @@ func (p *persistence) sanitizeSeries(
fingerprintToSeries map[model.Fingerprint]*memorySeries,
fpm fpMappings,
) (model.Fingerprint, bool) {
filename := path.Join(dirname, fi.Name())
var (
fp model.Fingerprint
err error
filename = filepath.Join(dirname, fi.Name())
s *memorySeries
)
purge := func() {
var err error
defer func() {
if err != nil {
log.Errorf("Failed to move lost series file %s to orphaned directory, deleting it instead. Error was: %s", filename, err)
if err = os.Remove(filename); err != nil {
log.Errorf("Even deleting file %s did not work: %s", filename, err)
}
if fp != 0 {
var metric model.Metric
if s != nil {
metric = s.metric
}
}()
orphanedDir := path.Join(p.basePath, "orphaned", path.Base(dirname))
if err = os.MkdirAll(orphanedDir, 0700); err != nil {
return
if err = p.quarantineSeriesFile(
fp, errors.New("purge during crash recovery"), metric,
); err == nil {
return
}
log.
With("file", filename).
With("error", err).
Error("Failed to move lost series file to orphaned directory.")
}
if err = os.Rename(filename, path.Join(orphanedDir, fi.Name())); err != nil {
return
// If we are here, we are either purging an incorrectly named
// file, or quarantining has failed. So simply delete the file.
if err = os.Remove(filename); err != nil {
log.
With("file", filename).
With("error", err).
Error("Failed to delete lost series file.")
}
}
var fp model.Fingerprint
var err error
if len(fi.Name()) != fpLen-seriesDirNameLen+len(seriesFileSuffix) ||
!strings.HasSuffix(fi.Name(), seriesFileSuffix) {
log.Warnf("Unexpected series file name %s.", filename)
purge()
return fp, false
}
if fp, err = model.FingerprintFromString(path.Base(dirname) + fi.Name()[:fpLen-seriesDirNameLen]); err != nil {
if fp, err = model.FingerprintFromString(filepath.Base(dirname) + fi.Name()[:fpLen-seriesDirNameLen]); err != nil {
log.Warnf("Error parsing file name %s: %s", filename, err)
purge()
return fp, false
@@ -274,7 +285,15 @@ func (p *persistence) sanitizeSeries(
s.chunkDescs = cds
s.chunkDescsOffset = 0
s.savedFirstTime = cds[0].firstTime()
s.lastTime = cds[len(cds)-1].lastTime()
s.lastTime, err = cds[len(cds)-1].lastTime()
if err != nil {
log.Errorf(
"Failed to determine time of the last sample for metric %v, fingerprint %v: %s",
s.metric, fp, err,
)
purge()
return fp, false
}
s.persistWatermark = len(cds)
s.modTime = modTime
return fp, true
@@ -304,7 +323,15 @@ func (p *persistence) sanitizeSeries(
s.savedFirstTime = cds[0].firstTime()
s.modTime = modTime
lastTime := cds[len(cds)-1].lastTime()
lastTime, err := cds[len(cds)-1].lastTime()
if err != nil {
log.Errorf(
"Failed to determine time of the last sample for metric %v, fingerprint %v: %s",
s.metric, fp, err,
)
purge()
return fp, false
}
keepIdx := -1
for i, cd := range s.chunkDescs {
if cd.firstTime() >= lastTime {
@@ -414,7 +441,10 @@ func (p *persistence) cleanUpArchiveIndexes(
if err != nil {
return err
}
series := newMemorySeries(model.Metric(m), cds, p.seriesFileModTime(model.Fingerprint(fp)))
series, err := newMemorySeries(model.Metric(m), cds, p.seriesFileModTime(model.Fingerprint(fp)))
if err != nil {
return err
}
fpToSeries[model.Fingerprint(fp)] = series
return nil
}); err != nil {


@@ -76,7 +76,7 @@ func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *deltaEncod
}
// add implements chunk.
func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk {
func (c deltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) {
if c.len() == 0 {
c = c[:deltaHeaderBytes]
binary.LittleEndian.PutUint64(c[deltaHeaderBaseTimeOffset:], uint64(s.Timestamp))
@@ -89,14 +89,17 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk {
// Do we generally have space for another sample in this chunk? If not,
// overflow into a new one.
if remainingBytes < sampleSize {
overflowChunks := newChunk().add(s)
return []chunk{&c, overflowChunks[0]}
overflowChunks, err := newChunk().add(s)
if err != nil {
return nil, err
}
return []chunk{&c, overflowChunks[0]}, nil
}
baseValue := c.baseValue()
dt := s.Timestamp - c.baseTime()
if dt < 0 {
panic("time delta is less than zero")
return nil, fmt.Errorf("time delta is less than zero: %v", dt)
}
dv := s.Value - baseValue
@@ -130,8 +133,11 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk {
return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s)
}
// Chunk is already half full. Better create a new one and save the transcoding efforts.
overflowChunks := newChunk().add(s)
return []chunk{&c, overflowChunks[0]}
overflowChunks, err := newChunk().add(s)
if err != nil {
return nil, err
}
return []chunk{&c, overflowChunks[0]}, nil
}
offset := len(c)
@@ -148,7 +154,7 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk {
// Store the absolute value (no delta) in case of d8.
binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp))
default:
panic("invalid number of bytes for time delta")
return nil, fmt.Errorf("invalid number of bytes for time delta: %d", tb)
}
offset += int(tb)
@@ -165,7 +171,7 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk {
binary.LittleEndian.PutUint32(c[offset:], uint32(int32(dv)))
// d8 must not happen. Those samples are encoded as float64.
default:
panic("invalid number of bytes for integer delta")
return nil, fmt.Errorf("invalid number of bytes for integer delta: %d", vb)
}
} else {
switch vb {
@@ -175,10 +181,10 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk {
// Store the absolute value (no delta) in case of d8.
binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value)))
default:
panic("invalid number of bytes for floating point delta")
return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb)
}
}
return []chunk{&c}
return []chunk{&c}, nil
}
// clone implements chunk.
@@ -243,15 +249,24 @@ func (c *deltaEncodedChunk) unmarshal(r io.Reader) error {
if _, err := io.ReadFull(r, *c); err != nil {
return err
}
*c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])]
l := binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])
if int(l) > cap(*c) {
return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l)
}
*c = (*c)[:l]
return nil
}
// unmarshalFromBuf implements chunk.
func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) {
func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) error {
*c = (*c)[:cap(*c)]
copy(*c, buf)
*c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])]
l := binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])
if int(l) > cap(*c) {
return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l)
}
*c = (*c)[:l]
return nil
}
// encoding implements chunk.
@@ -302,57 +317,108 @@ type deltaEncodedChunkIterator struct {
func (it *deltaEncodedChunkIterator) length() int { return it.len }
// valueAtOrBeforeTime implements chunkIterator.
func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) model.SamplePair {
func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) {
var lastErr error
i := sort.Search(it.len, func(i int) bool {
return it.timestampAtIndex(i).After(t)
ts, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return ts.After(t)
})
if i == 0 {
return ZeroSamplePair
if i == 0 || lastErr != nil {
return ZeroSamplePair, lastErr
}
return model.SamplePair{
Timestamp: it.timestampAtIndex(i - 1),
Value: it.sampleValueAtIndex(i - 1),
ts, err := it.timestampAtIndex(i - 1)
if err != nil {
return ZeroSamplePair, err
}
v, err := it.sampleValueAtIndex(i - 1)
if err != nil {
return ZeroSamplePair, err
}
return model.SamplePair{Timestamp: ts, Value: v}, nil
}
// rangeValues implements chunkIterator.
func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) []model.SamplePair {
func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) {
var lastErr error
oldest := sort.Search(it.len, func(i int) bool {
return !it.timestampAtIndex(i).Before(in.OldestInclusive)
t, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return !t.Before(in.OldestInclusive)
})
newest := sort.Search(it.len, func(i int) bool {
return it.timestampAtIndex(i).After(in.NewestInclusive)
t, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return t.After(in.NewestInclusive)
})
if oldest == it.len {
return nil
if oldest == it.len || lastErr != nil {
return nil, lastErr
}
result := make([]model.SamplePair, 0, newest-oldest)
for i := oldest; i < newest; i++ {
result = append(result, model.SamplePair{
Timestamp: it.timestampAtIndex(i),
Value: it.sampleValueAtIndex(i),
})
t, err := it.timestampAtIndex(i)
if err != nil {
return nil, err
}
v, err := it.sampleValueAtIndex(i)
if err != nil {
return nil, err
}
result = append(result, model.SamplePair{Timestamp: t, Value: v})
}
return result
return result, nil
}
// contains implements chunkIterator.
func (it *deltaEncodedChunkIterator) contains(t model.Time) bool {
return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1))
func (it *deltaEncodedChunkIterator) contains(t model.Time) (bool, error) {
lastT, err := it.timestampAtIndex(it.len - 1)
if err != nil {
return false, err
}
return !t.Before(it.baseT) && !t.After(lastT), nil
}
// values implements chunkIterator.
func (it *deltaEncodedChunkIterator) values() <-chan *model.SamplePair {
valuesChan := make(chan *model.SamplePair)
func (it *deltaEncodedChunkIterator) values() <-chan struct {
model.SamplePair
error
} {
valuesChan := make(chan struct {
model.SamplePair
error
})
go func() {
for i := 0; i < it.len; i++ {
valuesChan <- &model.SamplePair{
Timestamp: it.timestampAtIndex(i),
Value: it.sampleValueAtIndex(i),
t, err := it.timestampAtIndex(i)
if err != nil {
valuesChan <- struct {
model.SamplePair
error
}{ZeroSamplePair, err}
break
}
v, err := it.sampleValueAtIndex(i)
if err != nil {
valuesChan <- struct {
model.SamplePair
error
}{ZeroSamplePair, err}
break
}
valuesChan <- struct {
model.SamplePair
error
}{model.SamplePair{Timestamp: t, Value: v}, nil}
}
close(valuesChan)
}()
@@ -360,61 +426,61 @@ func (it *deltaEncodedChunkIterator) values() <-chan *model.SamplePair {
}
// timestampAtIndex implements chunkIterator.
func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time {
func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time, error) {
offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes)
switch it.tBytes {
case d1:
return it.baseT + model.Time(uint8(it.c[offset]))
return it.baseT + model.Time(uint8(it.c[offset])), nil
case d2:
return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:]))
return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:])), nil
case d4:
return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:]))
return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:])), nil
case d8:
// Take absolute value for d8.
return model.Time(binary.LittleEndian.Uint64(it.c[offset:]))
return model.Time(binary.LittleEndian.Uint64(it.c[offset:])), nil
default:
panic("invalid number of bytes for time delta")
return 0, fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes)
}
}
// lastTimestamp implements chunkIterator.
func (it *deltaEncodedChunkIterator) lastTimestamp() model.Time {
func (it *deltaEncodedChunkIterator) lastTimestamp() (model.Time, error) {
return it.timestampAtIndex(it.len - 1)
}
// sampleValueAtIndex implements chunkIterator.
func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue {
func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.SampleValue, error) {
offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + int(it.tBytes)
if it.isInt {
switch it.vBytes {
case d0:
return it.baseV
return it.baseV, nil
case d1:
return it.baseV + model.SampleValue(int8(it.c[offset]))
return it.baseV + model.SampleValue(int8(it.c[offset])), nil
case d2:
return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:])))
return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil
case d4:
return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:])))
return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil
// No d8 for ints.
default:
panic("invalid number of bytes for integer delta")
return 0, fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes)
}
} else {
switch it.vBytes {
case d4:
return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:])))
return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))), nil
case d8:
// Take absolute value for d8.
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:])))
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))), nil
default:
panic("invalid number of bytes for floating point delta")
return 0, fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes)
}
}
}
// lastSampleValue implements chunkIterator.
func (it *deltaEncodedChunkIterator) lastSampleValue() model.SampleValue {
func (it *deltaEncodedChunkIterator) lastSampleValue() (model.SampleValue, error) {
return it.sampleValueAtIndex(it.len - 1)
}


@@ -83,9 +83,9 @@ func newDoubleDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *doub
}
// add implements chunk.
func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk {
func (c doubleDeltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) {
if c.len() == 0 {
return c.addFirstSample(s)
return c.addFirstSample(s), nil
}
tb := c.timeBytes()
@@ -101,8 +101,11 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk {
// Do we generally have space for another sample in this chunk? If not,
// overflow into a new one.
if remainingBytes < sampleSize {
overflowChunks := newChunk().add(s)
return []chunk{&c, overflowChunks[0]}
overflowChunks, err := newChunk().add(s)
if err != nil {
return nil, err
}
return []chunk{&c, overflowChunks[0]}, nil
}
projectedTime := c.baseTime() + model.Time(c.len())*c.baseTimeDelta()
@@ -136,8 +139,11 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk {
return transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s)
}
// Chunk is already half full. Better create a new one and save the transcoding efforts.
overflowChunks := newChunk().add(s)
return []chunk{&c, overflowChunks[0]}
overflowChunks, err := newChunk().add(s)
if err != nil {
return nil, err
}
return []chunk{&c, overflowChunks[0]}, nil
}
offset := len(c)
@@ -154,7 +160,7 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk {
// Store the absolute value (no delta) in case of d8.
binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp))
default:
panic("invalid number of bytes for time delta")
return nil, fmt.Errorf("invalid number of bytes for time delta: %d", tb)
}
offset += int(tb)
@@ -171,7 +177,7 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk {
binary.LittleEndian.PutUint32(c[offset:], uint32(int32(ddv)))
// d8 must not happen. Those samples are encoded as float64.
default:
panic("invalid number of bytes for integer delta")
return nil, fmt.Errorf("invalid number of bytes for integer delta: %d", vb)
}
} else {
switch vb {
@@ -181,10 +187,10 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk {
// Store the absolute value (no delta) in case of d8.
binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value)))
default:
panic("invalid number of bytes for floating point delta")
return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb)
}
}
return []chunk{&c}
return []chunk{&c}, nil
}
// clone implements chunk.
@@ -251,15 +257,24 @@ func (c *doubleDeltaEncodedChunk) unmarshal(r io.Reader) error {
if _, err := io.ReadFull(r, *c); err != nil {
return err
}
*c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])]
l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])
if int(l) > cap(*c) {
return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l)
}
*c = (*c)[:l]
return nil
}
// unmarshalFromBuf implements chunk.
func (c *doubleDeltaEncodedChunk) unmarshalFromBuf(buf []byte) {
func (c *doubleDeltaEncodedChunk) unmarshalFromBuf(buf []byte) error {
*c = (*c)[:cap(*c)]
copy(*c, buf)
*c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])]
l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])
if int(l) > cap(*c) {
return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l)
}
*c = (*c)[:l]
return nil
}
// encoding implements chunk.
@@ -335,7 +350,7 @@ func (c doubleDeltaEncodedChunk) isInt() bool {
// addFirstSample is a helper method only used by c.add(). It adds timestamp and
// value as base time and value.
func (c doubleDeltaEncodedChunk) addFirstSample(s *model.SamplePair) []chunk {
func (c doubleDeltaEncodedChunk) addFirstSample(s model.SamplePair) []chunk {
c = c[:doubleDeltaHeaderBaseValueOffset+8]
binary.LittleEndian.PutUint64(
c[doubleDeltaHeaderBaseTimeOffset:],
@@ -350,10 +365,10 @@ func (c doubleDeltaEncodedChunk) addFirstSample(s *model.SamplePair) []chunk {
// addSecondSample is a helper method only used by c.add(). It calculates the
// base delta from the provided sample and adds it to the chunk.
func (c doubleDeltaEncodedChunk) addSecondSample(s *model.SamplePair, tb, vb deltaBytes) []chunk {
func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb deltaBytes) ([]chunk, error) {
baseTimeDelta := s.Timestamp - c.baseTime()
if baseTimeDelta < 0 {
panic("base time delta is less than zero")
return nil, fmt.Errorf("base time delta is less than zero: %v", baseTimeDelta)
}
c = c[:doubleDeltaHeaderBytes]
if tb >= d8 || bytesNeededForUnsignedTimestampDelta(baseTimeDelta) >= d8 {
@@ -391,7 +406,7 @@ func (c doubleDeltaEncodedChunk) addSecondSample(s *model.SamplePair, tb, vb del
math.Float64bits(float64(baseValueDelta)),
)
}
return []chunk{&c}
return []chunk{&c}, nil
}
// doubleDeltaEncodedChunkIterator implements chunkIterator.
@@ -408,57 +423,108 @@ type doubleDeltaEncodedChunkIterator struct {
func (it *doubleDeltaEncodedChunkIterator) length() int { return it.len }
// valueAtOrBeforeTime implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) model.SamplePair {
func (it *doubleDeltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) {
var lastErr error
i := sort.Search(it.len, func(i int) bool {
return it.timestampAtIndex(i).After(t)
ts, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return ts.After(t)
})
if i == 0 {
return ZeroSamplePair
if i == 0 || lastErr != nil {
return ZeroSamplePair, lastErr
}
return model.SamplePair{
Timestamp: it.timestampAtIndex(i - 1),
Value: it.sampleValueAtIndex(i - 1),
ts, err := it.timestampAtIndex(i - 1)
if err != nil {
return ZeroSamplePair, err
}
v, err := it.sampleValueAtIndex(i - 1)
if err != nil {
return ZeroSamplePair, err
}
return model.SamplePair{Timestamp: ts, Value: v}, nil
}
// rangeValues implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) rangeValues(in metric.Interval) []model.SamplePair {
func (it *doubleDeltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) {
var lastErr error
oldest := sort.Search(it.len, func(i int) bool {
return !it.timestampAtIndex(i).Before(in.OldestInclusive)
t, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return !t.Before(in.OldestInclusive)
})
newest := sort.Search(it.len, func(i int) bool {
return it.timestampAtIndex(i).After(in.NewestInclusive)
t, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return t.After(in.NewestInclusive)
})
if oldest == it.len {
return nil
if oldest == it.len || lastErr != nil {
return nil, lastErr
}
result := make([]model.SamplePair, 0, newest-oldest)
for i := oldest; i < newest; i++ {
result = append(result, model.SamplePair{
Timestamp: it.timestampAtIndex(i),
Value: it.sampleValueAtIndex(i),
})
t, err := it.timestampAtIndex(i)
if err != nil {
return nil, err
}
v, err := it.sampleValueAtIndex(i)
if err != nil {
return nil, err
}
result = append(result, model.SamplePair{Timestamp: t, Value: v})
}
return result
return result, nil
}
// contains implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) contains(t model.Time) bool {
return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1))
func (it *doubleDeltaEncodedChunkIterator) contains(t model.Time) (bool, error) {
lastT, err := it.timestampAtIndex(it.len - 1)
if err != nil {
return false, err
}
return !t.Before(it.baseT) && !t.After(lastT), nil
}
// values implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) values() <-chan *model.SamplePair {
valuesChan := make(chan *model.SamplePair)
func (it *doubleDeltaEncodedChunkIterator) values() <-chan struct {
model.SamplePair
error
} {
valuesChan := make(chan struct {
model.SamplePair
error
})
go func() {
for i := 0; i < it.len; i++ {
valuesChan <- &model.SamplePair{
Timestamp: it.timestampAtIndex(i),
Value: it.sampleValueAtIndex(i),
t, err := it.timestampAtIndex(i)
if err != nil {
valuesChan <- struct {
model.SamplePair
error
}{ZeroSamplePair, err}
break
}
v, err := it.sampleValueAtIndex(i)
if err != nil {
valuesChan <- struct {
model.SamplePair
error
}{ZeroSamplePair, err}
break
}
valuesChan <- struct {
model.SamplePair
error
}{model.SamplePair{Timestamp: t, Value: v}, nil}
}
close(valuesChan)
}()
@@ -466,17 +532,17 @@ func (it *doubleDeltaEncodedChunkIterator) values() <-chan *model.SamplePair {
}
// timestampAtIndex implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time {
func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time, error) {
if idx == 0 {
return it.baseT
return it.baseT, nil
}
if idx == 1 {
// If time bytes are at d8, the time is saved directly rather
// than as a difference.
if it.tBytes == d8 {
return it.baseΔT
return it.baseΔT, nil
}
return it.baseT + it.baseΔT
return it.baseT + it.baseΔT, nil
}
offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes)
@@ -485,40 +551,40 @@ func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time
case d1:
return it.baseT +
model.Time(idx)*it.baseΔT +
model.Time(int8(it.c[offset]))
model.Time(int8(it.c[offset])), nil
case d2:
return it.baseT +
model.Time(idx)*it.baseΔT +
model.Time(int16(binary.LittleEndian.Uint16(it.c[offset:])))
model.Time(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil
case d4:
return it.baseT +
model.Time(idx)*it.baseΔT +
model.Time(int32(binary.LittleEndian.Uint32(it.c[offset:])))
model.Time(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil
case d8:
// Take absolute value for d8.
return model.Time(binary.LittleEndian.Uint64(it.c[offset:]))
return model.Time(binary.LittleEndian.Uint64(it.c[offset:])), nil
default:
panic("invalid number of bytes for time delta")
return 0, fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes)
}
}
// lastTimestamp implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() model.Time {
func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() (model.Time, error) {
return it.timestampAtIndex(it.len - 1)
}
// sampleValueAtIndex implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue {
func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.SampleValue, error) {
if idx == 0 {
return it.baseV
return it.baseV, nil
}
if idx == 1 {
// If value bytes are at d8, the value is saved directly rather
// than as a difference.
if it.vBytes == d8 {
return it.baseΔV
return it.baseΔV, nil
}
return it.baseV + it.baseΔV
return it.baseV + it.baseΔV, nil
}
offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + int(it.tBytes)
@@ -527,39 +593,39 @@ func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.Sam
switch it.vBytes {
case d0:
return it.baseV +
model.SampleValue(idx)*it.baseΔV
model.SampleValue(idx)*it.baseΔV, nil
case d1:
return it.baseV +
model.SampleValue(idx)*it.baseΔV +
model.SampleValue(int8(it.c[offset]))
model.SampleValue(int8(it.c[offset])), nil
case d2:
return it.baseV +
model.SampleValue(idx)*it.baseΔV +
model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:])))
model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil
case d4:
return it.baseV +
model.SampleValue(idx)*it.baseΔV +
model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:])))
model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil
// No d8 for ints.
default:
panic("invalid number of bytes for integer delta")
return 0, fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes)
}
} else {
switch it.vBytes {
case d4:
return it.baseV +
model.SampleValue(idx)*it.baseΔV +
model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:])))
model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))), nil
case d8:
// Take absolute value for d8.
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:])))
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))), nil
default:
panic("invalid number of bytes for floating point delta")
return 0, fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes)
}
}
}
// lastSampleValue implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) lastSampleValue() model.SampleValue {
func (it *doubleDeltaEncodedChunkIterator) lastSampleValue() (model.SampleValue, error) {
return it.sampleValueAtIndex(it.len - 1)
}


@@ -19,6 +19,7 @@ package index
import (
"os"
"path"
"path/filepath"
"github.com/prometheus/common/model"
@@ -95,7 +96,7 @@ func (i *FingerprintMetricIndex) Lookup(fp model.Fingerprint) (metric model.Metr
// ready to use.
func NewFingerprintMetricIndex(basePath string) (*FingerprintMetricIndex, error) {
fingerprintToMetricDB, err := NewLevelDB(LevelDBOptions{
Path: path.Join(basePath, fingerprintToMetricDir),
Path: filepath.Join(basePath, fingerprintToMetricDir),
CacheSizeBytes: FingerprintMetricCacheSize,
})
if err != nil {
@@ -167,7 +168,7 @@ func (i *LabelNameLabelValuesIndex) LookupSet(l model.LabelName) (values map[mod
// LabelNameLabelValuesIndex ready to use.
func NewLabelNameLabelValuesIndex(basePath string) (*LabelNameLabelValuesIndex, error) {
labelNameToLabelValuesDB, err := NewLevelDB(LevelDBOptions{
Path: path.Join(basePath, labelNameToLabelValuesDir),
Path: filepath.Join(basePath, labelNameToLabelValuesDir),
CacheSizeBytes: LabelNameLabelValuesCacheSize,
})
if err != nil {
@@ -245,7 +246,7 @@ func (i *LabelPairFingerprintIndex) LookupSet(p model.LabelPair) (fps map[model.
// LabelPairFingerprintIndex ready to use.
func NewLabelPairFingerprintIndex(basePath string) (*LabelPairFingerprintIndex, error) {
labelPairToFingerprintsDB, err := NewLevelDB(LevelDBOptions{
Path: path.Join(basePath, labelPairToFingerprintsDir),
Path: filepath.Join(basePath, labelPairToFingerprintsDir),
CacheSizeBytes: LabelPairFingerprintsCacheSize,
})
if err != nil {
@@ -283,7 +284,7 @@ func (i *FingerprintTimeRangeIndex) Lookup(fp model.Fingerprint) (firstTime, las
// FingerprintTimeRangeIndex ready to use.
func NewFingerprintTimeRangeIndex(basePath string) (*FingerprintTimeRangeIndex, error) {
fingerprintTimeRangeDB, err := NewLevelDB(LevelDBOptions{
Path: path.Join(basePath, fingerprintTimeRangeDir),
Path: filepath.Join(basePath, fingerprintTimeRangeDir),
CacheSizeBytes: FingerprintTimeRangeCacheSize,
})
if err != nil {


@@ -60,6 +60,9 @@ const (
requestedPurge = "purge_on_request"
memoryMaintenance = "maintenance_in_memory"
archiveMaintenance = "maintenance_in_archive"
completedQurantine = "quarantine_completed"
droppedQuarantine = "quarantine_dropped"
failedQuarantine = "quarantine_failed"
// Op-types for chunkOps.
createAndPin = "create" // A chunkDesc creation with refCount=1.


@@ -73,7 +73,7 @@ type Storage interface {
// methods are not goroutine-safe. A SeriesIterator iterates over a snapshot of
// a series, i.e. it is safe to continue using a SeriesIterator after or during
// modifying the corresponding series, but the iterator will represent the state
// of the series prior the modification.
// of the series prior to the modification.
type SeriesIterator interface {
// Gets the value that is closest before the given time. In case a value
// exists at precisely the given time, that value is returned. If no
@@ -90,11 +90,11 @@ type Preloader interface {
PreloadRange(
fp model.Fingerprint,
from model.Time, through model.Time,
) (SeriesIterator, error)
) SeriesIterator
PreloadInstant(
fp model.Fingerprint,
timestamp model.Time, stalenessDelta time.Duration,
) (SeriesIterator, error)
) SeriesIterator
// Close unpins any previously requested series data from memory.
Close()
}


@@ -20,7 +20,6 @@ import (
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"strconv"
"strings"
@@ -46,6 +45,7 @@ const (
seriesFileSuffix = ".db"
seriesTempFileSuffix = ".db.tmp"
seriesDirNameLen = 2 // How many bytes of the fingerprint in dir name.
hintFileSuffix = ".hint"
headsFileName = "heads.db"
headsTempFileName = "heads.db.tmp"
@@ -321,8 +321,9 @@ func (p *persistence) isDirty() bool {
// setDirty sets the dirty flag in a goroutine-safe way. Once the dirty flag was
// set to true with this method, it cannot be set to false again. (If we became
// dirty during our runtime, there is no way back. If we were dirty from the
// start, a clean-up might make us clean again.)
func (p *persistence) setDirty(dirty bool) {
// start, a clean-up might make us clean again.) The provided error will be
// logged as a reason if dirty is true.
func (p *persistence) setDirty(dirty bool, err error) {
if dirty {
p.dirtyCounter.Inc()
}
@@ -334,7 +335,7 @@ func (p *persistence) setDirty(dirty bool) {
p.dirty = dirty
if dirty {
p.becameDirty = true
log.Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.")
log.With("error", err).Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.")
}
}
@@ -371,8 +372,7 @@ func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelVa
func (p *persistence) persistChunks(fp model.Fingerprint, chunks []chunk) (index int, err error) {
defer func() {
if err != nil {
log.Error("Error persisting chunks: ", err)
p.setDirty(true)
p.setDirty(true, fmt.Errorf("error in method persistChunks: %s", err))
}
}()
@@ -441,8 +441,13 @@ func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffse
return nil, err
}
for c := 0; c < batchSize; c++ {
chunk := newChunkForEncoding(chunkEncoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset]))
chunk.unmarshalFromBuf(buf[c*chunkLenWithHeader+chunkHeaderLen:])
chunk, err := newChunkForEncoding(chunkEncoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset]))
if err != nil {
return nil, err
}
if err := chunk.unmarshalFromBuf(buf[c*chunkLenWithHeader+chunkHeaderLen:]); err != nil {
return nil, err
}
chunks = append(chunks, chunk)
}
}
@@ -470,7 +475,7 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([
return nil, err
}
if fi.Size()%int64(chunkLenWithHeader) != 0 {
p.setDirty(true)
// The returned error will bubble up and lead to quarantining of the whole series.
return nil, fmt.Errorf(
"size of series file for fingerprint %v is %d, which is not a multiple of the chunk length %d",
fp, fi.Size(), chunkLenWithHeader,
@@ -648,7 +653,11 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
if _, err = codable.EncodeVarint(w, int64(chunkDesc.firstTime())); err != nil {
return
}
if _, err = codable.EncodeVarint(w, int64(chunkDesc.lastTime())); err != nil {
lt, err := chunkDesc.lastTime()
if err != nil {
return
}
if _, err = codable.EncodeVarint(w, int64(lt)); err != nil {
return
}
} else {
@@ -854,7 +863,12 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in
p.dirty = true
return sm, chunksToPersist, nil
}
chunk := newChunkForEncoding(chunkEncoding(encoding))
chunk, err := newChunkForEncoding(chunkEncoding(encoding))
if err != nil {
log.Warn("Problem with chunk encoding:", err)
p.dirty = true
return sm, chunksToPersist, nil
}
if err := chunk.unmarshal(r); err != nil {
log.Warn("Could not decode chunk:", err)
p.dirty = true
@@ -871,6 +885,13 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in
}
}
lt, err := chunkDescs[len(chunkDescs)-1].lastTime()
if err != nil {
log.Warn("Could not determine last time of head chunk:", err)
p.dirty = true
return sm, chunksToPersist, nil
}
fingerprintToSeries[model.Fingerprint(fp)] = &memorySeries{
metric: model.Metric(metric),
chunkDescs: chunkDescs,
@@ -878,7 +899,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in
modTime: modTime,
chunkDescsOffset: int(chunkDescsOffset),
savedFirstTime: model.Time(savedFirstTime),
lastTime: chunkDescs[len(chunkDescs)-1].lastTime(),
lastTime: lt,
headChunkClosed: headChunkClosed,
}
}
@@ -908,8 +929,7 @@ func (p *persistence) dropAndPersistChunks(
// please handle with care!
defer func() {
if err != nil {
log.Error("Error dropping and/or persisting chunks: ", err)
p.setDirty(true)
p.setDirty(true, fmt.Errorf("error in method dropAndPersistChunks: %s", err))
}
}()
@@ -918,7 +938,15 @@
// too old. If that's the case, the chunks in the series file
// are all too old, too.
i := 0
for ; i < len(chunks) && chunks[i].newIterator().lastTimestamp().Before(beforeTime); i++ {
for ; i < len(chunks); i++ {
var lt model.Time
lt, err = chunks[i].newIterator().lastTimestamp()
if err != nil {
return
}
if !lt.Before(beforeTime) {
break
}
}
if i < len(chunks) {
firstTimeNotDropped = chunks[i].firstTime()
@@ -1071,6 +1099,44 @@ func (p *persistence) deleteSeriesFile(fp model.Fingerprint) (int, error) {
return numChunks, nil
}
// quarantineSeriesFile moves a series file to the orphaned directory. It also
// writes a hint file with the provided quarantine reason and, if series is
// non-nil, the string representation of the metric.
func (p *persistence) quarantineSeriesFile(fp model.Fingerprint, quarantineReason error, metric model.Metric) error {
var (
oldName = p.fileNameForFingerprint(fp)
orphanedDir = filepath.Join(p.basePath, "orphaned", filepath.Base(filepath.Dir(oldName)))
newName = filepath.Join(orphanedDir, filepath.Base(oldName))
hintName = newName[:len(newName)-len(seriesFileSuffix)] + hintFileSuffix
)
renameErr := os.MkdirAll(orphanedDir, 0700)
if renameErr != nil {
return renameErr
}
renameErr = os.Rename(oldName, newName)
if os.IsNotExist(renameErr) {
// Source file doesn't exist. That's normal.
renameErr = nil
}
// Write hint file even if the rename ended in an error. At least try...
// And ignore errors writing the hint file. It's best effort.
if f, err := os.Create(hintName); err == nil {
if metric != nil {
f.WriteString(metric.String() + "\n")
} else {
f.WriteString("[UNKNOWN METRIC]\n")
}
if quarantineReason != nil {
f.WriteString(quarantineReason.Error() + "\n")
} else {
f.WriteString("[UNKNOWN REASON]\n")
}
f.Close()
}
return renameErr
}
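// A hypothetical call site (illustration only, not part of this
// commit): when corruption is detected during series maintenance, the
// series file is moved aside for forensics and everything else keeps
// being served. maintainSeries and series are made-up names here.
//
//	if err := maintainSeries(fp); err != nil {
//		if qErr := p.quarantineSeriesFile(fp, err, series.metric); qErr != nil {
//			log.With("fingerprint", fp).With("error", qErr).Error("Failed to quarantine series file.")
//		}
//	}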
// seriesFileModTime returns the modification time of the series file belonging
// to the provided fingerprint. In case of an error, the zero value of time.Time
// is returned.
@@ -1122,11 +1188,11 @@ func (p *persistence) archiveMetric(
fp model.Fingerprint, m model.Metric, first, last model.Time,
) error {
if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil {
p.setDirty(true)
p.setDirty(true, err)
return err
}
if err := p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last}); err != nil {
p.setDirty(true)
p.setDirty(true, err)
return err
}
return nil
@@ -1139,6 +1205,9 @@ func (p *persistence) hasArchivedMetric(fp model.Fingerprint) (
hasMetric bool, firstTime, lastTime model.Time, err error,
) {
firstTime, lastTime, hasMetric, err = p.archivedFingerprintToTimeRange.Lookup(fp)
if err != nil {
p.setDirty(true, err)
}
return
}
@@ -1187,7 +1256,7 @@ func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error)
func (p *persistence) purgeArchivedMetric(fp model.Fingerprint) (err error) {
defer func() {
if err != nil {
p.setDirty(true)
p.setDirty(true, fmt.Errorf("error in method purgeArchivedMetric: %s", err))
}
}()
@@ -1218,12 +1287,8 @@ func (p *persistence) purgeArchivedMetric(fp model.Fingerprint) (err error) {
// was actually deleted, the method returns true and the first time and last
// time of the deleted metric. The caller must have locked the fingerprint.
func (p *persistence) unarchiveMetric(fp model.Fingerprint) (deletedAnything bool, err error) {
defer func() {
if err != nil {
p.setDirty(true)
}
}()
// An error returned here will bubble up and lead to quarantining of the
// series, so no setDirty required.
deleted, err := p.archivedFingerprintToMetrics.Delete(codable.Fingerprint(fp))
if err != nil || !deleted {
return false, err
@@ -1279,17 +1344,17 @@ func (p *persistence) close() error {
func (p *persistence) dirNameForFingerprint(fp model.Fingerprint) string {
fpStr := fp.String()
return path.Join(p.basePath, fpStr[0:seriesDirNameLen])
return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen])
}
func (p *persistence) fileNameForFingerprint(fp model.Fingerprint) string {
fpStr := fp.String()
return path.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix)
return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix)
}
func (p *persistence) tempFileNameForFingerprint(fp model.Fingerprint) string {
fpStr := fp.String()
return path.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesTempFileSuffix)
return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesTempFileSuffix)
}
func (p *persistence) openChunkFileForWriting(fp model.Fingerprint) (*os.File, error) {
@@ -1322,19 +1387,19 @@ func (p *persistence) openChunkFileForReading(fp model.Fingerprint) (*os.File, e
}
func (p *persistence) headsFileName() string {
return path.Join(p.basePath, headsFileName)
return filepath.Join(p.basePath, headsFileName)
}
func (p *persistence) headsTempFileName() string {
return path.Join(p.basePath, headsTempFileName)
return filepath.Join(p.basePath, headsTempFileName)
}
func (p *persistence) mappingsFileName() string {
return path.Join(p.basePath, mappingsFileName)
return filepath.Join(p.basePath, mappingsFileName)
}
func (p *persistence) mappingsTempFileName() string {
return path.Join(p.basePath, mappingsTempFileName)
return filepath.Join(p.basePath, mappingsTempFileName)
}
func (p *persistence) processIndexingQueue() {
@@ -1616,7 +1681,9 @@ func (p *persistence) writeChunks(w io.Writer, chunks []chunk) error {
b = b[:writeSize]
for i, chunk := range chunks[:batchSize] {
writeChunkHeader(b[i*chunkLenWithHeader:], chunk)
if err := writeChunkHeader(b[i*chunkLenWithHeader:], chunk); err != nil {
return err
}
if err := chunk.marshalToBuf(b[i*chunkLenWithHeader+chunkHeaderLen:]); err != nil {
return err
}
@@ -1642,14 +1709,19 @@ func chunkIndexForOffset(offset int64) (int, error) {
return int(offset) / chunkLenWithHeader, nil
}
func writeChunkHeader(header []byte, c chunk) {
func writeChunkHeader(header []byte, c chunk) error {
header[chunkHeaderTypeOffset] = byte(c.encoding())
binary.LittleEndian.PutUint64(
header[chunkHeaderFirstTimeOffset:],
uint64(c.firstTime()),
)
lt, err := c.newIterator().lastTimestamp()
if err != nil {
return err
}
binary.LittleEndian.PutUint64(
header[chunkHeaderLastTimeOffset:],
uint64(c.newIterator().lastTimestamp()),
uint64(lt),
)
return nil
}


@@ -14,6 +14,10 @@
package local
import (
"bufio"
"errors"
"os"
"path/filepath"
"reflect"
"sync"
"testing"
@@ -49,7 +53,7 @@ func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, tes
})
}
func buildTestChunks(encoding chunkEncoding) map[model.Fingerprint][]chunk {
func buildTestChunks(t *testing.T, encoding chunkEncoding) map[model.Fingerprint][]chunk {
fps := model.Fingerprints{
m1.FastFingerprint(),
m2.FastFingerprint(),
@@ -60,10 +64,18 @@ func buildTestChunks(encoding chunkEncoding) map[model.Fingerprint][]chunk {
for _, fp := range fps {
fpToChunks[fp] = make([]chunk, 0, 10)
for i := 0; i < 10; i++ {
fpToChunks[fp] = append(fpToChunks[fp], newChunkForEncoding(encoding).add(&model.SamplePair{
ch, err := newChunkForEncoding(encoding)
if err != nil {
t.Fatal(err)
}
chs, err := ch.add(model.SamplePair{
Timestamp: model.Time(i),
Value: model.SampleValue(fp),
})[0])
})
if err != nil {
t.Fatal(err)
}
fpToChunks[fp] = append(fpToChunks[fp], chs[0])
}
}
return fpToChunks
@@ -73,7 +85,7 @@ func chunksEqual(c1, c2 chunk) bool {
values2 := c2.newIterator().values()
for v1 := range c1.newIterator().values() {
v2 := <-values2
if !v1.Equal(v2) {
if v1 != v2 {
return false
}
}
@@ -84,7 +96,7 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) {
p, closer := newTestPersistence(t, encoding)
defer closer.Close()
fpToChunks := buildTestChunks(encoding)
fpToChunks := buildTestChunks(t, encoding)
for fp, chunks := range fpToChunks {
firstTimeNotDropped, offset, numDropped, allDropped, err :=
@@ -126,10 +138,14 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) {
t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 10)
}
for i, cd := range actualChunkDescs {
if cd.firstTime() != model.Time(i) || cd.lastTime() != model.Time(i) {
lastTime, err := cd.lastTime()
if err != nil {
t.Fatal(err)
}
if cd.firstTime() != model.Time(i) || lastTime != model.Time(i) {
t.Errorf(
"Want ts=%v, got firstTime=%v, lastTime=%v.",
i, cd.firstTime(), cd.lastTime(),
i, cd.firstTime(), lastTime,
)
}
@@ -140,10 +156,14 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) {
t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 5)
}
for i, cd := range actualChunkDescs {
if cd.firstTime() != model.Time(i) || cd.lastTime() != model.Time(i) {
lastTime, err := cd.lastTime()
if err != nil {
t.Fatal(err)
}
if cd.firstTime() != model.Time(i) || lastTime != model.Time(i) {
t.Errorf(
"Want ts=%v, got firstTime=%v, lastTime=%v.",
i, cd.firstTime(), cd.lastTime(),
i, cd.firstTime(), lastTime,
)
}
@@ -433,21 +453,21 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
fpLocker := newFingerprintLocker(10)
sm := newSeriesMap()
s1 := newMemorySeries(m1, nil, time.Time{})
s2 := newMemorySeries(m2, nil, time.Time{})
s3 := newMemorySeries(m3, nil, time.Time{})
s4 := newMemorySeries(m4, nil, time.Time{})
s5 := newMemorySeries(m5, nil, time.Time{})
s1.add(&model.SamplePair{Timestamp: 1, Value: 3.14})
s3.add(&model.SamplePair{Timestamp: 2, Value: 2.7})
s1, _ := newMemorySeries(m1, nil, time.Time{})
s2, _ := newMemorySeries(m2, nil, time.Time{})
s3, _ := newMemorySeries(m3, nil, time.Time{})
s4, _ := newMemorySeries(m4, nil, time.Time{})
s5, _ := newMemorySeries(m5, nil, time.Time{})
s1.add(model.SamplePair{Timestamp: 1, Value: 3.14})
s3.add(model.SamplePair{Timestamp: 2, Value: 2.7})
s3.headChunkClosed = true
s3.persistWatermark = 1
for i := 0; i < 10000; i++ {
s4.add(&model.SamplePair{
s4.add(model.SamplePair{
Timestamp: model.Time(i),
Value: model.SampleValue(i) / 2,
})
s5.add(&model.SamplePair{
s5.add(model.SamplePair{
Timestamp: model.Time(i),
Value: model.SampleValue(i * i),
})
@@ -552,10 +572,14 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
}
continue
}
if cd.chunkLastTime != cd.c.newIterator().lastTimestamp() {
lastTime, err := cd.c.newIterator().lastTimestamp()
if err != nil {
t.Fatal(err)
}
if cd.chunkLastTime != lastTime {
t.Errorf(
"chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d",
i, cd.c.newIterator().lastTimestamp(), cd.chunkLastTime,
i, lastTime, cd.chunkLastTime,
)
}
}
@@ -605,10 +629,14 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
}
continue
}
if cd.chunkLastTime != cd.c.newIterator().lastTimestamp() {
lastTime, err := cd.c.newIterator().lastTimestamp()
if err != nil {
t.Fatal(err)
}
if cd.chunkLastTime != lastTime {
t.Errorf(
"chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d",
i, cd.c.newIterator().lastTimestamp(), cd.chunkLastTime,
i, lastTime, cd.chunkLastTime,
)
}
}
@@ -1051,6 +1079,108 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet
}
}
func TestQuarantineSeriesFile(t *testing.T) {
p, closer := newTestPersistence(t, 1)
defer closer.Close()
verify := func(fp model.Fingerprint, seriesFileShouldExist bool, contentHintFile ...string) {
var (
fpStr = fp.String()
originalFile = p.fileNameForFingerprint(fp)
quarantinedFile = filepath.Join(p.basePath, "orphaned", fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix)
hintFile = filepath.Join(p.basePath, "orphaned", fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+hintFileSuffix)
)
if _, err := os.Stat(originalFile); !os.IsNotExist(err) {
t.Errorf("Expected file %q to not exist.", originalFile)
}
if _, err := os.Stat(quarantinedFile); (os.IsNotExist(err) && seriesFileShouldExist) || (err == nil && !seriesFileShouldExist) {
t.Errorf("Unexpected state of quarantined file %q. Expected it to exist: %t. os.Stat returned: %s.", quarantinedFile, seriesFileShouldExist, err)
}
f, err := os.Open(hintFile)
if err != nil {
t.Errorf("Could not open hint file %q: %s", hintFile, err)
return
}
defer f.Close()
scanner := bufio.NewScanner(f)
for _, want := range contentHintFile {
if !scanner.Scan() {
t.Errorf("Unexpected end of hint file %q.", hintFile)
return
}
got := scanner.Text()
if want != got {
t.Errorf("Want hint line %q, got %q.", want, got)
}
}
if scanner.Scan() {
t.Errorf("Unexpected spurious content in hint file %q: %q", hintFile, scanner.Text())
}
}
if err := p.quarantineSeriesFile(0, nil, nil); err != nil {
t.Error(err)
}
verify(0, false, "[UNKNOWN METRIC]", "[UNKNOWN REASON]")
if err := p.quarantineSeriesFile(
1, errors.New("file does not exist"),
nil,
); err != nil {
t.Error(err)
}
verify(1, false, "[UNKNOWN METRIC]", "file does not exist")
if err := p.quarantineSeriesFile(
2, errors.New("file does not exist"),
model.Metric{"foo": "bar", "dings": "bums"},
); err != nil {
t.Error(err)
}
verify(2, false, `{dings="bums", foo="bar"}`, "file does not exist")
if err := p.quarantineSeriesFile(
3, nil,
model.Metric{"foo": "bar", "dings": "bums"},
); err != nil {
t.Error(err)
}
verify(3, false, `{dings="bums", foo="bar"}`, "[UNKNOWN REASON]")
err := os.Mkdir(filepath.Join(p.basePath, "00"), os.ModePerm)
if err != nil {
t.Fatal(err)
}
f, err := os.Create(p.fileNameForFingerprint(4))
if err != nil {
t.Fatal(err)
}
f.Close()
if err := p.quarantineSeriesFile(
4, errors.New("file exists"),
model.Metric{"sound": "cloud"},
); err != nil {
t.Error(err)
}
verify(4, true, `{sound="cloud"}`, "file exists")
if err := p.quarantineSeriesFile(4, nil, nil); err != nil {
t.Error(err)
}
// Overwrites hint file but leaves series file intact.
verify(4, true, "[UNKNOWN METRIC]", "[UNKNOWN REASON]")
if err := p.quarantineSeriesFile(
4, errors.New("file exists"),
model.Metric{"sound": "cloud"},
); err != nil {
t.Error(err)
}
// Overwrites everything.
verify(4, true, `{sound="cloud"}`, "file exists")
}
var fpStrings = []string{
"b004b821ca50ba26",
"b037c21e884e4fc5",


@@ -29,26 +29,20 @@ type memorySeriesPreloader struct {
func (p *memorySeriesPreloader) PreloadRange(
fp model.Fingerprint,
from model.Time, through model.Time,
) (SeriesIterator, error) {
cds, iter, err := p.storage.preloadChunksForRange(fp, from, through, false)
if err != nil {
return iter, err
}
) SeriesIterator {
cds, iter := p.storage.preloadChunksForRange(fp, from, through, false)
p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...)
return iter, nil
return iter
}
// PreloadInstant implements Preloader
func (p *memorySeriesPreloader) PreloadInstant(
fp model.Fingerprint,
timestamp model.Time, stalenessDelta time.Duration,
) (SeriesIterator, error) {
cds, iter, err := p.storage.preloadChunksForRange(fp, timestamp.Add(-stalenessDelta), timestamp, true)
if err != nil {
return nil, err
}
) SeriesIterator {
cds, iter := p.storage.preloadChunksForRange(fp, timestamp.Add(-stalenessDelta), timestamp, true)
p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...)
return iter, nil
return iter
}
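With both preload methods now returning a bare SeriesIterator, the caller side collapses accordingly. A hedged sketch, not part of the diff (dumpRange is invented for illustration; Preloader, model, and metric are the types used elsewhere in this change, and fmt is assumed imported):

func dumpRange(pl Preloader, fp model.Fingerprint, from, through model.Time) {
	it := pl.PreloadRange(fp, from, through) // no error return to handle anymore
	for _, sp := range it.RangeValues(metric.Interval{
		OldestInclusive: from,
		NewestInclusive: through,
	}) {
		fmt.Println(sp.Timestamp, sp.Value)
	}
	// On corruption the storage quarantines internally and hands back
	// nopIter, so this loop body simply never runs.
}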
// Close implements Preloader.

View file

@ -191,12 +191,15 @@ type memorySeries struct {
// set to model.Earliest. The zero value for modTime can be used if the
// modification time of the series file is unknown (e.g. if this is a genuinely
// new series).
func newMemorySeries(m model.Metric, chunkDescs []*chunkDesc, modTime time.Time) *memorySeries {
func newMemorySeries(m model.Metric, chunkDescs []*chunkDesc, modTime time.Time) (*memorySeries, error) {
var err error
firstTime := model.Earliest
lastTime := model.Earliest
if len(chunkDescs) > 0 {
firstTime = chunkDescs[0].firstTime()
lastTime = chunkDescs[len(chunkDescs)-1].lastTime()
if lastTime, err = chunkDescs[len(chunkDescs)-1].lastTime(); err != nil {
return nil, err
}
}
return &memorySeries{
metric: m,
@ -206,14 +209,14 @@ func newMemorySeries(m model.Metric, chunkDescs []*chunkDesc, modTime time.Time)
lastTime: lastTime,
persistWatermark: len(chunkDescs),
modTime: modTime,
}
}, nil
}
// add adds a sample pair to the series. It returns the number of newly
// completed chunks (which are now eligible for persistence).
//
// The caller must have locked the fingerprint of the series.
func (s *memorySeries) add(v *model.SamplePair) int {
func (s *memorySeries) add(v model.SamplePair) (int, error) {
if len(s.chunkDescs) == 0 || s.headChunkClosed {
newHead := newChunkDesc(newChunk(), v.Timestamp)
s.chunkDescs = append(s.chunkDescs, newHead)
@ -235,7 +238,10 @@ func (s *memorySeries) add(v *model.SamplePair) int {
s.headChunkUsedByIterator = false
}
chunks := s.head().add(v)
chunks, err := s.head().add(v)
if err != nil {
return 0, err
}
s.head().c = chunks[0]
for _, c := range chunks[1:] {
@ -250,7 +256,7 @@ func (s *memorySeries) add(v *model.SamplePair) int {
s.lastTime = v.Timestamp
s.lastSampleValue = v.Value
s.lastSampleValueSet = true
return len(chunks) - 1
return len(chunks) - 1, nil
}
// maybeCloseHeadChunk closes the head chunk if it has not been touched for the
@ -295,10 +301,14 @@ func (s *memorySeries) evictChunkDescs(iOldestNotEvicted int) {
// dropChunks removes chunkDescs older than t. The caller must have locked the
// fingerprint of the series.
func (s *memorySeries) dropChunks(t model.Time) {
func (s *memorySeries) dropChunks(t model.Time) error {
keepIdx := len(s.chunkDescs)
for i, cd := range s.chunkDescs {
if !cd.lastTime().Before(t) {
lt, err := cd.lastTime()
if err != nil {
return err
}
if !lt.Before(t) {
keepIdx = i
break
}
@ -318,6 +328,7 @@ func (s *memorySeries) dropChunks(t model.Time) {
numMemChunkDescs.Sub(float64(keepIdx))
s.dirty = true
}
return nil
}
// preloadChunks is an internal helper method.
@ -358,8 +369,12 @@ func (s *memorySeries) preloadChunks(
s.headChunkUsedByIterator = true
}
curriedQuarantineSeries := func(err error) {
mss.quarantineSeries(fp, s.metric, err)
}
iter := &boundedIterator{
it: s.newIterator(pinnedChunkDescs),
it: s.newIterator(pinnedChunkDescs, curriedQuarantineSeries),
start: model.Now().Add(-mss.dropAfter),
}
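The curried closure above is what lets iterator code deep in the read path report corruption while only ever seeing a func(error); the fingerprint and metric travel along via the capture. A self-contained sketch of the same pattern, with all names invented for illustration:

package main

import (
	"errors"
	"fmt"
)

// decode stands in for chunk decoding that can hit corrupted data.
func decode() error { return errors.New("checksum mismatch") }

type iterator struct {
	quarantine func(error) // set by the owner; carries fp and metric via closure
}

func (it *iterator) value() int {
	if err := decode(); err != nil {
		it.quarantine(err) // deep code flags corruption without knowing the series
		return 0
	}
	return 42
}

func main() {
	fp, m := "b004b821ca50ba26", `{foo="bar"}`
	it := &iterator{quarantine: func(err error) {
		fmt.Printf("quarantining fp=%s metric=%s: %v\n", fp, m, err)
	}}
	fmt.Println(it.value())
}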
@ -370,7 +385,7 @@ func (s *memorySeries) preloadChunks(
// must be pinned).
//
// The caller must have locked the fingerprint of the memorySeries.
func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc) SeriesIterator {
func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc, quarantine func(error)) SeriesIterator {
chunks := make([]chunk, 0, len(pinnedChunkDescs))
for _, cd := range pinnedChunkDescs {
// It's OK to directly access cd.c here (without locking) as the
@ -378,8 +393,9 @@ func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc) SeriesIterator
chunks = append(chunks, cd.c)
}
return &memorySeriesIterator{
chunks: chunks,
chunkIts: make([]chunkIterator, len(chunks)),
chunks: chunks,
chunkIts: make([]chunkIterator, len(chunks)),
quarantine: quarantine,
}
}
@ -437,7 +453,11 @@ func (s *memorySeries) preloadChunksForRange(
if fromIdx == len(s.chunkDescs) {
// Even the last chunk starts before "from". Find out if the
// series ends before "from" and we don't need to do anything.
if s.chunkDescs[len(s.chunkDescs)-1].lastTime().Before(from) {
lt, err := s.chunkDescs[len(s.chunkDescs)-1].lastTime()
if err != nil {
return nil, nopIter, err
}
if lt.Before(from) {
return nil, nopIter, nil
}
}
@ -511,16 +531,29 @@ func (s *memorySeries) chunksToPersist() []*chunkDesc {
// memorySeriesIterator implements SeriesIterator.
type memorySeriesIterator struct {
chunkIt chunkIterator // Last chunkIterator used by ValueAtOrBeforeTime.
chunkIts []chunkIterator // Caches chunkIterators.
chunks []chunk
chunkIt chunkIterator // Last chunkIterator used by ValueAtOrBeforeTime.
chunkIts []chunkIterator // Caches chunkIterators.
chunks []chunk
quarantine func(error) // Call to quarantine the series this iterator belongs to.
}
// ValueAtOrBeforeTime implements SeriesIterator.
func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair {
// The most common case. We are iterating through a chunk.
if it.chunkIt != nil && it.chunkIt.contains(t) {
return it.chunkIt.valueAtOrBeforeTime(t)
if it.chunkIt != nil {
containsT, err := it.chunkIt.contains(t)
if err != nil {
it.quarantine(err)
return ZeroSamplePair
}
if containsT {
value, err := it.chunkIt.valueAtOrBeforeTime(t)
if err != nil {
it.quarantine(err)
return ZeroSamplePair
}
return value
}
}
if len(it.chunks) == 0 {
@ -537,7 +570,12 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa
return ZeroSamplePair
}
it.chunkIt = it.chunkIterator(l - i)
return it.chunkIt.valueAtOrBeforeTime(t)
value, err := it.chunkIt.valueAtOrBeforeTime(t)
if err != nil {
it.quarantine(err)
return ZeroSamplePair
}
return value
}
// RangeValues implements SeriesIterator.
@ -548,8 +586,15 @@ func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePa
})
// Only now check the last timestamp of the previous chunk (which is
// fairly expensive).
if i > 0 && !it.chunkIterator(i-1).lastTimestamp().Before(in.OldestInclusive) {
i--
if i > 0 {
lt, err := it.chunkIterator(i - 1).lastTimestamp()
if err != nil {
it.quarantine(err)
return nil
}
if !lt.Before(in.OldestInclusive) {
i--
}
}
values := []model.SamplePair{}
@ -557,7 +602,12 @@ func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePa
if c.firstTime().After(in.NewestInclusive) {
break
}
values = append(values, it.chunkIterator(i+j).rangeValues(in)...)
chValues, err := it.chunkIterator(i + j).rangeValues(in)
if err != nil {
it.quarantine(err)
return nil
}
values = append(values, chValues...)
}
return values
}

View file

@ -30,8 +30,9 @@ import (
)
const (
evictRequestsCap = 1024
chunkLen = 1024
evictRequestsCap = 1024
quarantineRequestsCap = 1024
chunkLen = 1024
// See waitForNextFP.
fpMaxSweepTime = 6 * time.Hour
@ -77,6 +78,12 @@ type evictRequest struct {
evict bool
}
type quarantineRequest struct {
fp model.Fingerprint
metric model.Metric
reason error
}
// SyncStrategy is an enum to select a sync strategy for series files.
type SyncStrategy int
@ -147,6 +154,9 @@ type memorySeriesStorage struct {
evictRequests chan evictRequest
evictStopping, evictStopped chan struct{}
quarantineRequests chan quarantineRequest
quarantineStopping, quarantineStopped chan struct{}
persistErrors prometheus.Counter
numSeries prometheus.Gauge
seriesOps *prometheus.CounterVec
@ -198,6 +208,10 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage {
evictStopping: make(chan struct{}),
evictStopped: make(chan struct{}),
quarantineRequests: make(chan quarantineRequest, quarantineRequestsCap),
quarantineStopping: make(chan struct{}),
quarantineStopped: make(chan struct{}),
persistErrors: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -312,6 +326,7 @@ func (s *memorySeriesStorage) Start() (err error) {
}
go s.handleEvictList()
go s.handleQuarantine()
go s.logThrottling()
go s.loop()
@ -326,6 +341,10 @@ func (s *memorySeriesStorage) Stop() error {
close(s.loopStopping)
<-s.loopStopped
log.Info("Stopping series quarantining...")
close(s.quarantineStopping)
<-s.quarantineStopped
log.Info("Stopping chunk eviction...")
close(s.evictStopping)
<-s.evictStopped
@ -521,22 +540,7 @@ func (s *memorySeriesStorage) MetricForFingerprint(fp model.Fingerprint) metric.
// DropMetric implements Storage.
func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprint) {
for _, fp := range fps {
s.fpLocker.Lock(fp)
if series, ok := s.fpToSeries.get(fp); ok {
s.fpToSeries.del(fp)
s.numSeries.Dec()
s.persistence.unindexMetric(fp, series.metric)
} else if err := s.persistence.purgeArchivedMetric(fp); err != nil {
log.Errorf("Error purging metric with fingerprint %v: %v", fp, err)
}
// Attempt to delete series file in any case.
if _, err := s.persistence.deleteSeriesFile(fp); err != nil {
log.Errorf("Error deleting series file for %v: %v", fp, err)
}
s.fpLocker.Unlock(fp)
s.seriesOps.WithLabelValues(requestedPurge).Inc()
s.purgeSeries(fp, nil, nil)
}
}
@ -554,19 +558,24 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error {
rawFP := sample.Metric.FastFingerprint()
s.fpLocker.Lock(rawFP)
fp, err := s.mapper.mapFP(rawFP, sample.Metric)
defer func() {
s.fpLocker.Unlock(fp)
}() // Func wrapper because fp might change below.
if err != nil {
log.Errorf("Error while mapping fingerprint %v: %v", rawFP, err)
s.persistence.setDirty(true)
s.persistence.setDirty(true, fmt.Errorf("error while mapping fingerprint %v: %s", rawFP, err))
return err
}
if fp != rawFP {
// Switch locks.
s.fpLocker.Unlock(rawFP)
s.fpLocker.Lock(fp)
}
series := s.getOrCreateSeries(fp, sample.Metric)
series, err := s.getOrCreateSeries(fp, sample.Metric)
if err != nil {
return err // getOrCreateSeries took care of quarantining already.
}
if sample.Timestamp <= series.lastTime {
s.fpLocker.Unlock(fp)
// Don't report "no-op appends", i.e. where timestamp and sample
// value are the same as for the last append, as they are a
// common occurrence when using client-side timestamps
@ -577,13 +586,16 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error {
return nil
}
s.outOfOrderSamplesCount.Inc()
return ErrOutOfOrderSample
return ErrOutOfOrderSample // Caused by the caller.
}
completedChunksCount := series.add(&model.SamplePair{
completedChunksCount, err := series.add(model.SamplePair{
Value: sample.Value,
Timestamp: sample.Timestamp,
})
s.fpLocker.Unlock(fp)
if err != nil {
s.quarantineSeries(fp, sample.Metric, err)
return err
}
s.ingestedSamplesCount.Inc()
s.incNumChunksToPersist(completedChunksCount)
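The defer wrapper a few lines up ("Func wrapper because fp might change below.") leans on a Go subtlety worth spelling out: defer evaluates its arguments immediately, so only the closure form sees the remapped fp at unlock time. A runnable illustration:

package main

import "fmt"

func main() {
	fp := 1
	defer fmt.Println("eager:", fp)             // argument captured now: prints 1
	defer func() { fmt.Println("lazy:", fp) }() // fp read at return time: prints 2
	fp = 2
}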
@ -644,7 +656,7 @@ func (s *memorySeriesStorage) logThrottling() {
}
}
func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) *memorySeries {
func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) (*memorySeries, error) {
series, ok := s.fpToSeries.get(fp)
if !ok {
var cds []*chunkDesc
@ -652,6 +664,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me
unarchived, err := s.persistence.unarchiveMetric(fp)
if err != nil {
log.Errorf("Error unarchiving fingerprint %v (metric %v): %v", fp, m, err)
return nil, err
}
if unarchived {
s.seriesOps.WithLabelValues(unarchive).Inc()
@ -662,7 +675,8 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me
// appear as archived or purged).
cds, err = s.loadChunkDescs(fp, 0)
if err != nil {
log.Errorf("Error loading chunk descs for fingerprint %v (metric %v): %v", fp, m, err)
s.quarantineSeries(fp, m, err)
return nil, err
}
modTime = s.persistence.seriesFileModTime(fp)
} else {
@ -670,18 +684,22 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me
s.persistence.indexMetric(fp, m)
s.seriesOps.WithLabelValues(create).Inc()
}
series = newMemorySeries(m, cds, modTime)
series, err = newMemorySeries(m, cds, modTime)
if err != nil {
s.quarantineSeries(fp, m, err)
return nil, err
}
s.fpToSeries.put(fp, series)
s.numSeries.Inc()
}
return series
return series, nil
}
func (s *memorySeriesStorage) preloadChunksForRange(
fp model.Fingerprint,
from model.Time, through model.Time,
lastSampleOnly bool,
) ([]*chunkDesc, SeriesIterator, error) {
) ([]*chunkDesc, SeriesIterator) {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
@ -689,23 +707,34 @@ func (s *memorySeriesStorage) preloadChunksForRange(
if !ok {
has, first, last, err := s.persistence.hasArchivedMetric(fp)
if err != nil {
return nil, nopIter, err
log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.")
return nil, nopIter
}
if !has {
s.invalidPreloadRequestsCount.Inc()
return nil, nopIter, nil
return nil, nopIter
}
if from.Before(last) && through.After(first) {
metric, err := s.persistence.archivedMetric(fp)
if err != nil {
return nil, nopIter, err
log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.")
return nil, nopIter
}
series, err = s.getOrCreateSeries(fp, metric)
if err != nil {
log.With("fingerprint", fp).With("error", err).Error("Error while retrieving series.")
return nil, nopIter
}
series = s.getOrCreateSeries(fp, metric)
} else {
return nil, nopIter, nil
return nil, nopIter
}
}
return series.preloadChunksForRange(fp, from, through, lastSampleOnly, s)
cds, it, err := series.preloadChunksForRange(fp, from, through, lastSampleOnly, s)
if err != nil {
s.quarantineSeries(fp, series.metric, err)
return nil, nopIter
}
return cds, it
}
func (s *memorySeriesStorage) handleEvictList() {
@ -1121,7 +1150,10 @@ func (s *memorySeriesStorage) writeMemorySeries(
s.persistErrors.Inc()
return false
}
series.dropChunks(beforeTime)
if err := series.dropChunks(beforeTime); err != nil {
s.persistErrors.Inc()
return false
}
if len(series.chunkDescs) == 0 && allDroppedFromPersistence {
// All chunks dropped from both memory and persistence. Delete the series for good.
s.fpToSeries.del(fp)
@ -1136,8 +1168,7 @@ func (s *memorySeriesStorage) writeMemorySeries(
} else {
series.chunkDescsOffset -= numDroppedFromPersistence
if series.chunkDescsOffset < 0 {
log.Errorf("Dropped more chunks from persistence than from memory for fingerprint %v, series %v.", fp, series)
s.persistence.setDirty(true)
s.persistence.setDirty(true, fmt.Errorf("dropped more chunks from persistence than from memory for fingerprint %v, series %v", fp, series))
series.chunkDescsOffset = -1 // Makes sure it will be looked at during crash recovery.
}
}
@ -1291,6 +1322,122 @@ func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 {
return score
}
// quarantineSeries registers the provided fingerprint for quarantining. It
// always returns immediately. Quarantine requests are processed
// asynchronously. If there are too many requests queued, they are simply
// dropped.
//
// Quarantining means that the series file is moved to the orphaned directory,
// and all its traces are removed from indices. Call this method if an
// unrecoverable error is detected while dealing with a series, and pass in the
// encountered error. It will be saved as a hint in the orphaned directory.
func (s *memorySeriesStorage) quarantineSeries(fp model.Fingerprint, metric model.Metric, err error) {
req := quarantineRequest{fp: fp, metric: metric, reason: err}
select {
case s.quarantineRequests <- req:
// Request submitted.
default:
log.
With("fingerprint", fp).
With("metric", metric).
With("reason", err).
Warn("Quarantine queue full. Dropped quarantine request.")
s.seriesOps.WithLabelValues(droppedQuarantine).Inc()
}
}
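The select with a default branch is what makes the method return immediately: a full queue drops the request rather than stalling the ingestion or query path. A minimal, self-contained version of the pattern:

package main

import "fmt"

func main() {
	queue := make(chan string, 1) // tiny buffer to force the drop path
	submit := func(req string) {
		select {
		case queue <- req:
			fmt.Println("queued:", req)
		default: // never block the caller; drop and account instead
			fmt.Println("dropped:", req)
		}
	}
	submit("series-1")
	submit("series-2") // buffer already full: dropped
}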
func (s *memorySeriesStorage) handleQuarantine() {
for {
select {
case req := <-s.quarantineRequests:
s.purgeSeries(req.fp, req.metric, req.reason)
log.
With("fingerprint", req.fp).
With("metric", req.metric).
With("reason", req.reason).
Warn("Series quarantined.")
case <-s.quarantineStopping:
log.Info("Series quarantining stopped.")
close(s.quarantineStopped)
return
}
}
}
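handleQuarantine pairs with the quarantineStopping/quarantineStopped channels wired into Start() and Stop() above. The shutdown handshake in isolation, as a runnable sketch with invented names:

package main

import "fmt"

func main() {
	requests := make(chan int, 4)
	stopping := make(chan struct{})
	stopped := make(chan struct{})

	go func() { // worker, shaped like handleQuarantine's select loop
		defer close(stopped)
		for {
			select {
			case req := <-requests:
				fmt.Println("handled", req)
			case <-stopping:
				return
			}
		}
	}()

	requests <- 1   // a request racing with shutdown may go unhandled,
	close(stopping) // mirroring the queue's best-effort semantics
	<-stopped       // Stop() returns only once the worker has exited
}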
// purgeSeries removes all traces of a series. If a non-nil quarantine reason is
// provided, the series file will not be deleted completely, but moved to the
// orphaned directory with the reason and the metric in a hint file. The
// provided metric might be nil if unknown.
func (s *memorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric, quarantineReason error) {
s.fpLocker.Lock(fp)
var (
series *memorySeries
ok bool
)
if series, ok = s.fpToSeries.get(fp); ok {
s.fpToSeries.del(fp)
s.numSeries.Dec()
m = series.metric
// Adjust s.numChunksToPersist and numMemChunks down by
// the number of chunks in this series that are not
// persisted yet. Persisted chunks will be deducted from
// numMemChunks upon eviction.
numChunksNotYetPersisted := len(series.chunkDescs) - series.persistWatermark
atomic.AddInt64(&numMemChunks, int64(-numChunksNotYetPersisted))
if !series.headChunkClosed {
// Head chunk wasn't counted as waiting for persistence yet.
// (But it was counted as a chunk in memory.)
numChunksNotYetPersisted--
}
s.incNumChunksToPersist(-numChunksNotYetPersisted)
} else {
if err := s.persistence.purgeArchivedMetric(fp); err != nil {
log.
With("fingerprint", fp).
With("metric", m).
With("error", err).
Error("Error purging metric from archive.")
}
}
if m != nil {
// If we know a metric now, unindex it in any case.
// purgeArchivedMetric might have done so already, but we cannot
// be sure. Unindexing is idempotent, though.
s.persistence.unindexMetric(fp, m)
}
// Attempt to delete/quarantine the series file in any case.
if quarantineReason == nil {
// No reason stated, simply delete the file.
if _, err := s.persistence.deleteSeriesFile(fp); err != nil {
log.
With("fingerprint", fp).
With("metric", m).
With("error", err).
Error("Error deleting series file.")
}
s.seriesOps.WithLabelValues(requestedPurge).Inc()
} else {
if err := s.persistence.quarantineSeriesFile(fp, quarantineReason, m); err == nil {
s.seriesOps.WithLabelValues(completedQurantine).Inc()
} else {
s.seriesOps.WithLabelValues(failedQuarantine).Inc()
log.
With("fingerprint", fp).
With("metric", m).
With("reason", quarantineReason).
With("error", err).
Error("Error quarantining series file.")
}
}
s.fpLocker.Unlock(fp)
}
// Describe implements prometheus.Collector.
func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
s.persistence.Describe(ch)

View file

@ -405,10 +405,7 @@ func TestRetentionCutoff(t *testing.T) {
defer pl.Close()
// Preload everything.
it, err := pl.PreloadRange(fp, insertStart, now)
if err != nil {
t.Fatalf("Error preloading outdated chunks: %s", err)
}
it := pl.PreloadRange(fp, insertStart, now)
val := it.ValueAtOrBeforeTime(now.Add(-61 * time.Minute))
if val.Timestamp != model.Earliest {
@ -492,18 +489,12 @@ func TestDropMetrics(t *testing.T) {
t.Errorf("unexpected number of fingerprints: %d", len(fps2))
}
_, it, err := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest, false)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
_, it := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest, false)
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
t.Errorf("unexpected number of samples: %d", len(vals))
}
_, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest, false)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
_, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest, false)
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N {
t.Errorf("unexpected number of samples: %d", len(vals))
}
@ -525,18 +516,12 @@ func TestDropMetrics(t *testing.T) {
t.Errorf("unexpected number of fingerprints: %d", len(fps3))
}
_, it, err = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest, false)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
_, it = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest, false)
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
t.Errorf("unexpected number of samples: %d", len(vals))
}
_, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest, false)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
_, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest, false)
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
t.Errorf("unexpected number of samples: %d", len(vals))
}
@ -549,6 +534,95 @@ func TestDropMetrics(t *testing.T) {
}
}
func TestQuarantineMetric(t *testing.T) {
now := model.Now()
insertStart := now.Add(-2 * time.Hour)
s, closer := NewTestStorage(t, 1)
defer closer.Close()
chunkFileExists := func(fp model.Fingerprint) (bool, error) {
f, err := s.persistence.openChunkFileForReading(fp)
if err == nil {
f.Close()
return true, nil
}
if os.IsNotExist(err) {
return false, nil
}
return false, err
}
m1 := model.Metric{model.MetricNameLabel: "test", "n1": "v1"}
m2 := model.Metric{model.MetricNameLabel: "test", "n1": "v2"}
m3 := model.Metric{model.MetricNameLabel: "test", "n1": "v3"}
N := 120000
for j, m := range []model.Metric{m1, m2, m3} {
for i := 0; i < N; i++ {
smpl := &model.Sample{
Metric: m,
Timestamp: insertStart.Add(time.Duration(i) * time.Millisecond), // 1 millisecond intervals.
Value: model.SampleValue(j),
}
s.Append(smpl)
}
}
s.WaitForIndexing()
// Archive m3, but first maintain it so that at least something is written to disk.
fpToBeArchived := m3.FastFingerprint()
s.maintainMemorySeries(fpToBeArchived, 0)
s.fpLocker.Lock(fpToBeArchived)
s.fpToSeries.del(fpToBeArchived)
if err := s.persistence.archiveMetric(
fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond),
); err != nil {
t.Error(err)
}
s.fpLocker.Unlock(fpToBeArchived)
// Corrupt the series file for m3.
f, err := os.Create(s.persistence.fileNameForFingerprint(fpToBeArchived))
if err != nil {
t.Fatal(err)
}
if _, err := f.WriteString("This is clearly not the content of a series file."); err != nil {
t.Fatal(err)
}
if err := f.Close(); err != nil {
t.Fatal(err)
}
fps := s.fingerprintsForLabelPairs(model.LabelPair{Name: model.MetricNameLabel, Value: "test"})
if len(fps) != 3 {
t.Errorf("unexpected number of fingerprints: %d", len(fps))
}
pl := s.NewPreloader()
// This will access the corrupt file and lead to quarantining.
pl.PreloadInstant(fpToBeArchived, now.Add(-2*time.Hour), time.Minute)
pl.Close()
time.Sleep(time.Second) // Give time to quarantine. TODO(beorn7): Find a better way to wait.
s.WaitForIndexing()
fps2 := s.fingerprintsForLabelPairs(model.LabelPair{
Name: model.MetricNameLabel, Value: "test",
})
if len(fps2) != 2 {
t.Errorf("unexpected number of fingerprints: %d", len(fps2))
}
exists, err := chunkFileExists(fpToBeArchived)
if err != nil {
t.Fatal(err)
}
if exists {
t.Errorf("chunk file exists for fp=%v", fpToBeArchived)
}
}
// TestLoop is just a smoke test for the loop method, checking that we can
// switch it on and off without disaster.
func TestLoop(t *testing.T) {
@ -619,7 +693,10 @@ func testChunk(t *testing.T, encoding chunkEncoding) {
continue
}
for sample := range cd.c.newIterator().values() {
values = append(values, *sample)
if sample.error != nil {
t.Error(sample.error)
}
values = append(values, sample.SamplePair)
}
}
@ -662,10 +739,7 @@ func testValueAtOrBeforeTime(t *testing.T, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint()
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
_, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
// #1 Exactly on a sample.
for i, expected := range samples {
@ -739,10 +813,7 @@ func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint()
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
if err != nil {
b.Fatalf("Error preloading everything: %s", err)
}
_, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
b.ResetTimer()
@ -820,10 +891,7 @@ func testRangeValues(t *testing.T, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint()
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
_, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
// #1 Zero length interval at sample.
for i, expected := range samples {
@ -975,10 +1043,7 @@ func benchmarkRangeValues(b *testing.B, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint()
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
if err != nil {
b.Fatalf("Error preloading everything: %s", err)
}
_, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
b.ResetTimer()
@ -1024,10 +1089,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
// Drop ~half of the chunks.
s.maintainMemorySeries(fp, 10000)
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
_, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
actual := it.RangeValues(metric.Interval{
OldestInclusive: 0,
NewestInclusive: 100000,
@ -1045,10 +1107,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
// Drop everything.
s.maintainMemorySeries(fp, 100000)
_, it, err = s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
_, it = s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
actual = it.RangeValues(metric.Interval{
OldestInclusive: 0,
NewestInclusive: 100000,
@ -1074,8 +1133,12 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
// Archive metrics.
s.fpToSeries.del(fp)
lastTime, err := series.head().lastTime()
if err != nil {
t.Fatal(err)
}
if err := s.persistence.archiveMetric(
fp, series.metric, series.firstTime(), series.head().lastTime(),
fp, series.metric, series.firstTime(), lastTime,
); err != nil {
t.Fatal(err)
}
@ -1125,8 +1188,12 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
// Archive metrics.
s.fpToSeries.del(fp)
lastTime, err = series.head().lastTime()
if err != nil {
t.Fatal(err)
}
if err := s.persistence.archiveMetric(
fp, series.metric, series.firstTime(), series.head().lastTime(),
fp, series.metric, series.firstTime(), lastTime,
); err != nil {
t.Fatal(err)
}
@ -1520,10 +1587,7 @@ func verifyStorage(t testing.TB, s *memorySeriesStorage, samples model.Samples,
t.Fatal(err)
}
p := s.NewPreloader()
it, err := p.PreloadRange(fp, sample.Timestamp, sample.Timestamp)
if err != nil {
t.Fatal(err)
}
it := p.PreloadRange(fp, sample.Timestamp, sample.Timestamp)
found := it.ValueAtOrBeforeTime(sample.Timestamp)
if found.Timestamp == model.Earliest {
t.Errorf("Sample %#v: Expected sample not found.", sample)
@ -1567,10 +1631,7 @@ func TestAppendOutOfOrder(t *testing.T) {
pl := s.NewPreloader()
defer pl.Close()
it, err := pl.PreloadRange(fp, 0, 2)
if err != nil {
t.Fatalf("Error preloading chunks: %s", err)
}
it := pl.PreloadRange(fp, 0, 2)
want := []model.SamplePair{
{