diff --git a/promql/analyzer.go b/promql/analyzer.go index f56c4fc82a..bad5fbd92b 100644 --- a/promql/analyzer.go +++ b/promql/analyzer.go @@ -146,21 +146,13 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) { if err = contextDone(ctx, env); err != nil { return nil, err } - iter, err := p.PreloadRange(fp, start.Add(-rangeDuration), end) - if err != nil { - return nil, err - } - itersForDuration[fp] = iter + itersForDuration[fp] = p.PreloadRange(fp, start.Add(-rangeDuration), end) } for fp := range pt.instants { if err = contextDone(ctx, env); err != nil { return nil, err } - iter, err := p.PreloadInstant(fp, start, StalenessDelta) - if err != nil { - return nil, err - } - itersForDuration[fp] = iter + itersForDuration[fp] = p.PreloadInstant(fp, start, StalenessDelta) } } diff --git a/storage/local/chunk.go b/storage/local/chunk.go index 9411cac7c8..d8b1ae2b8d 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -112,7 +112,7 @@ func newChunkDesc(c chunk, firstTime model.Time) *chunkDesc { // add adds a sample pair to the underlying chunk. For safe concurrent access, // the chunk must be pinned, and the caller must have locked the fingerprint of // the series. -func (cd *chunkDesc) add(s *model.SamplePair) []chunk { +func (cd *chunkDesc) add(s model.SamplePair) ([]chunk, error) { return cd.c.add(s) } @@ -169,9 +169,9 @@ func (cd *chunkDesc) firstTime() model.Time { // lastTime returns the timestamp of the last sample in the chunk. For safe // concurrent access, this method requires the fingerprint of the time series to // be locked. -func (cd *chunkDesc) lastTime() model.Time { +func (cd *chunkDesc) lastTime() (model.Time, error) { if cd.chunkLastTime != model.Earliest || cd.c == nil { - return cd.chunkLastTime + return cd.chunkLastTime, nil } return cd.c.newIterator().lastTimestamp() } @@ -181,10 +181,15 @@ func (cd *chunkDesc) lastTime() model.Time { // last sample to a chunk or after closing a head chunk due to age. For safe // concurrent access, the chunk must be pinned, and the caller must have locked // the fingerprint of the series. -func (cd *chunkDesc) maybePopulateLastTime() { +func (cd *chunkDesc) maybePopulateLastTime() error { if cd.chunkLastTime == model.Earliest && cd.c != nil { - cd.chunkLastTime = cd.c.newIterator().lastTimestamp() + t, err := cd.c.newIterator().lastTimestamp() + if err != nil { + return err + } + cd.chunkLastTime = t } + return nil } // isEvicted returns whether the chunk is evicted. For safe concurrent access, @@ -241,14 +246,14 @@ type chunk interface { // any. The first chunk returned might be the same as the original one // or a newly allocated version. In any case, take the returned chunk as // the relevant one and discard the original chunk. - add(sample *model.SamplePair) []chunk + add(sample model.SamplePair) ([]chunk, error) clone() chunk firstTime() model.Time newIterator() chunkIterator marshal(io.Writer) error marshalToBuf([]byte) error unmarshal(io.Reader) error - unmarshalFromBuf([]byte) + unmarshalFromBuf([]byte) error encoding() chunkEncoding } @@ -259,56 +264,73 @@ type chunkIterator interface { // length returns the number of samples in the chunk. length() int // Gets the timestamp of the n-th sample in the chunk. - timestampAtIndex(int) model.Time + timestampAtIndex(int) (model.Time, error) // Gets the last timestamp in the chunk. - lastTimestamp() model.Time + lastTimestamp() (model.Time, error) // Gets the sample value of the n-th sample in the chunk.
- sampleValueAtIndex(int) model.SampleValue + sampleValueAtIndex(int) (model.SampleValue, error) // Gets the last sample value in the chunk. - lastSampleValue() model.SampleValue + lastSampleValue() (model.SampleValue, error) // Gets the value that is closest before the given time. In case a value // exists at precisely the given time, that value is returned. If no // applicable value exists, ZeroSamplePair is returned. - valueAtOrBeforeTime(model.Time) model.SamplePair + valueAtOrBeforeTime(model.Time) (model.SamplePair, error) // Gets all values contained within a given interval. - rangeValues(metric.Interval) []model.SamplePair + rangeValues(metric.Interval) ([]model.SamplePair, error) // Whether a given timestamp is contained between first and last value // in the chunk. - contains(model.Time) bool + contains(model.Time) (bool, error) // values returns a channel, from which all sample values in the chunk // can be received in order. The channel is closed after the last // one. It is generally not safe to mutate the chunk while the channel - // is still open. - values() <-chan *model.SamplePair + // is still open. If a value is returned with error!=nil, no further + // values will be returned and the channel is closed. + values() <-chan struct { + model.SamplePair + error + } } -func transcodeAndAdd(dst chunk, src chunk, s *model.SamplePair) []chunk { +func transcodeAndAdd(dst chunk, src chunk, s model.SamplePair) ([]chunk, error) { chunkOps.WithLabelValues(transcode).Inc() head := dst body := []chunk{} for v := range src.newIterator().values() { - newChunks := head.add(v) + if v.error != nil { + return nil, v.error + } + newChunks, err := head.add(v.SamplePair) + if err != nil { + return nil, err + } body = append(body, newChunks[:len(newChunks)-1]...) head = newChunks[len(newChunks)-1] } - newChunks := head.add(s) - return append(body, newChunks...) + newChunks, err := head.add(s) + if err != nil { + return nil, err + } + return append(body, newChunks...), nil } // newChunk creates a new chunk according to the encoding set by the // defaultChunkEncoding flag. 
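The subtle part of the new `values()` contract is error delivery: the anonymous `struct { model.SamplePair; error }` carries either a sample or, at most once, an error, after which the channel is closed. `transcodeAndAdd` above shows the consumer side. Below is a self-contained sketch of both halves of that contract, with a toy `samplePair` standing in for `model.SamplePair` (names are illustrative, not from the codebase):

```go
package main

import (
	"errors"
	"fmt"
)

// samplePair stands in for model.SamplePair.
type samplePair struct {
	Timestamp int64
	Value     float64
}

// values streams samples and, per the new contract, sends at most one
// error and then closes the channel.
func values(samples []samplePair, failAt int) <-chan struct {
	samplePair
	error
} {
	ch := make(chan struct {
		samplePair
		error
	})
	go func() {
		defer close(ch)
		for i, s := range samples {
			if i == failAt { // simulate a decoding error mid-stream
				ch <- struct {
					samplePair
					error
				}{samplePair{}, errors.New("chunk corrupted")}
				return
			}
			ch <- struct {
				samplePair
				error
			}{s, nil}
		}
	}()
	return ch
}

func main() {
	in := []samplePair{{1, 0.5}, {2, 1.5}, {3, 2.5}}
	// Consumer side, as in transcodeAndAdd: bail out at the first error.
	for v := range values(in, 2) {
		if v.error != nil {
			fmt.Println("abort:", v.error)
			break // no further values will arrive; channel is closed
		}
		fmt.Println(v.samplePair)
	}
}
```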
func newChunk() chunk { - return newChunkForEncoding(DefaultChunkEncoding) + chunk, err := newChunkForEncoding(DefaultChunkEncoding) + if err != nil { + panic(err) + } + return chunk } -func newChunkForEncoding(encoding chunkEncoding) chunk { +func newChunkForEncoding(encoding chunkEncoding) (chunk, error) { switch encoding { case delta: - return newDeltaEncodedChunk(d1, d0, true, chunkLen) + return newDeltaEncodedChunk(d1, d0, true, chunkLen), nil case doubleDelta: - return newDoubleDeltaEncodedChunk(d1, d0, true, chunkLen) + return newDoubleDeltaEncodedChunk(d1, d0, true, chunkLen), nil default: - panic(fmt.Errorf("unknown chunk encoding: %v", encoding)) + return nil, fmt.Errorf("unknown chunk encoding: %v", encoding) } } diff --git a/storage/local/crashrecovery.go b/storage/local/crashrecovery.go index d4c9a4fa01..f51e54e7be 100644 --- a/storage/local/crashrecovery.go +++ b/storage/local/crashrecovery.go @@ -14,10 +14,11 @@ package local import ( + "errors" "fmt" "io" "os" - "path" + "path/filepath" "strings" "sync/atomic" @@ -52,7 +53,7 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint log.Info("Scanning files.") for i := 0; i < 1<<(seriesDirNameLen*4); i++ { - dirname := path.Join(p.basePath, fmt.Sprintf(seriesDirNameFmt, i)) + dirname := filepath.Join(p.basePath, fmt.Sprintf(seriesDirNameFmt, i)) dir, err := os.Open(dirname) if os.IsNotExist(err) { continue @@ -139,7 +140,7 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint } } - p.setDirty(false) + p.setDirty(false, nil) log.Warn("Crash recovery complete.") return nil } @@ -175,36 +176,46 @@ func (p *persistence) sanitizeSeries( fingerprintToSeries map[model.Fingerprint]*memorySeries, fpm fpMappings, ) (model.Fingerprint, bool) { - filename := path.Join(dirname, fi.Name()) + var ( + fp model.Fingerprint + err error + filename = filepath.Join(dirname, fi.Name()) + s *memorySeries + ) + purge := func() { - var err error - defer func() { - if err != nil { - log.Errorf("Failed to move lost series file %s to orphaned directory, deleting it instead. Error was: %s", filename, err) - if err = os.Remove(filename); err != nil { - log.Errorf("Even deleting file %s did not work: %s", filename, err) - } + if fp != 0 { + var metric model.Metric + if s != nil { + metric = s.metric } - }() - orphanedDir := path.Join(p.basePath, "orphaned", path.Base(dirname)) - if err = os.MkdirAll(orphanedDir, 0700); err != nil { - return + if err = p.quarantineSeriesFile( + fp, errors.New("purge during crash recovery"), metric, + ); err == nil { + return + } + log. + With("file", filename). + With("error", err). + Error("Failed to move lost series file to orphaned directory.") } - if err = os.Rename(filename, path.Join(orphanedDir, fi.Name())); err != nil { - return + // If we are here, we are either purging an incorrectly named + // file, or quarantining has failed. So simply delete the file. + if err = os.Remove(filename); err != nil { + log. + With("file", filename). + With("error", err). 
+ Error("Failed to delete lost series file.") } } - var fp model.Fingerprint - var err error - if len(fi.Name()) != fpLen-seriesDirNameLen+len(seriesFileSuffix) || !strings.HasSuffix(fi.Name(), seriesFileSuffix) { log.Warnf("Unexpected series file name %s.", filename) purge() return fp, false } - if fp, err = model.FingerprintFromString(path.Base(dirname) + fi.Name()[:fpLen-seriesDirNameLen]); err != nil { + if fp, err = model.FingerprintFromString(filepath.Base(dirname) + fi.Name()[:fpLen-seriesDirNameLen]); err != nil { log.Warnf("Error parsing file name %s: %s", filename, err) purge() return fp, false @@ -274,7 +285,15 @@ func (p *persistence) sanitizeSeries( s.chunkDescs = cds s.chunkDescsOffset = 0 s.savedFirstTime = cds[0].firstTime() - s.lastTime = cds[len(cds)-1].lastTime() + s.lastTime, err = cds[len(cds)-1].lastTime() + if err != nil { + log.Errorf( + "Failed to determine time of the last sample for metric %v, fingerprint %v: %s", + s.metric, fp, err, + ) + purge() + return fp, false + } s.persistWatermark = len(cds) s.modTime = modTime return fp, true @@ -304,7 +323,15 @@ func (p *persistence) sanitizeSeries( s.savedFirstTime = cds[0].firstTime() s.modTime = modTime - lastTime := cds[len(cds)-1].lastTime() + lastTime, err := cds[len(cds)-1].lastTime() + if err != nil { + log.Errorf( + "Failed to determine time of the last sample for metric %v, fingerprint %v: %s", + s.metric, fp, err, + ) + purge() + return fp, false + } keepIdx := -1 for i, cd := range s.chunkDescs { if cd.firstTime() >= lastTime { @@ -414,7 +441,10 @@ func (p *persistence) cleanUpArchiveIndexes( if err != nil { return err } - series := newMemorySeries(model.Metric(m), cds, p.seriesFileModTime(model.Fingerprint(fp))) + series, err := newMemorySeries(model.Metric(m), cds, p.seriesFileModTime(model.Fingerprint(fp))) + if err != nil { + return err + } fpToSeries[model.Fingerprint(fp)] = series return nil }); err != nil { diff --git a/storage/local/delta.go b/storage/local/delta.go index 99da249c41..c787020722 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -76,7 +76,7 @@ func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *deltaEncod } // add implements chunk. -func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk { +func (c deltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) { if c.len() == 0 { c = c[:deltaHeaderBytes] binary.LittleEndian.PutUint64(c[deltaHeaderBaseTimeOffset:], uint64(s.Timestamp)) @@ -89,14 +89,17 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk { // Do we generally have space for another sample in this chunk? If not, // overflow into a new one. if remainingBytes < sampleSize { - overflowChunks := newChunk().add(s) - return []chunk{&c, overflowChunks[0]} + overflowChunks, err := newChunk().add(s) + if err != nil { + return nil, err + } + return []chunk{&c, overflowChunks[0]}, nil } baseValue := c.baseValue() dt := s.Timestamp - c.baseTime() if dt < 0 { - panic("time delta is less than zero") + return nil, fmt.Errorf("time delta is less than zero: %v", dt) } dv := s.Value - baseValue @@ -130,8 +133,11 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk { return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s) } // Chunk is already half full. Better create a new one and save the transcoding efforts. 
- overflowChunks := newChunk().add(s) - return []chunk{&c, overflowChunks[0]} + overflowChunks, err := newChunk().add(s) + if err != nil { + return nil, err + } + return []chunk{&c, overflowChunks[0]}, nil } offset := len(c) @@ -148,7 +154,7 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk { // Store the absolute value (no delta) in case of d8. binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp)) default: - panic("invalid number of bytes for time delta") + return nil, fmt.Errorf("invalid number of bytes for time delta: %d", tb) } offset += int(tb) @@ -165,7 +171,7 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk { binary.LittleEndian.PutUint32(c[offset:], uint32(int32(dv))) // d8 must not happen. Those samples are encoded as float64. default: - panic("invalid number of bytes for integer delta") + return nil, fmt.Errorf("invalid number of bytes for integer delta: %d", vb) } } else { switch vb { @@ -175,10 +181,10 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk { // Store the absolute value (no delta) in case of d8. binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value))) default: - panic("invalid number of bytes for floating point delta") + return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb) } } - return []chunk{&c} + return []chunk{&c}, nil } // clone implements chunk. @@ -243,15 +249,24 @@ func (c *deltaEncodedChunk) unmarshal(r io.Reader) error { if _, err := io.ReadFull(r, *c); err != nil { return err } - *c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])] + l := binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:]) + if int(l) > cap(*c) { + return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l) + } + *c = (*c)[:l] return nil } // unmarshalFromBuf implements chunk. -func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) { +func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) error { *c = (*c)[:cap(*c)] copy(*c, buf) - *c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])] + l := binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:]) + if int(l) > cap(*c) { + return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l) + } + *c = (*c)[:l] + return nil } // encoding implements chunk. @@ -302,57 +317,108 @@ type deltaEncodedChunkIterator struct { func (it *deltaEncodedChunkIterator) length() int { return it.len } // valueAtOrBeforeTime implements chunkIterator. -func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) model.SamplePair { +func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) { + var lastErr error i := sort.Search(it.len, func(i int) bool { - return it.timestampAtIndex(i).After(t) + ts, err := it.timestampAtIndex(i) + if err != nil { + lastErr = err + } + return ts.After(t) }) - if i == 0 { - return ZeroSamplePair + if i == 0 || lastErr != nil { + return ZeroSamplePair, lastErr } - return model.SamplePair{ - Timestamp: it.timestampAtIndex(i - 1), - Value: it.sampleValueAtIndex(i - 1), + ts, err := it.timestampAtIndex(i - 1) + if err != nil { + return ZeroSamplePair, err } + v, err := it.sampleValueAtIndex(i - 1) + if err != nil { + return ZeroSamplePair, err + } + return model.SamplePair{Timestamp: ts, Value: v}, nil } // rangeValues implements chunkIterator. 
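A recurring pattern in these iterators: `timestampAtIndex` can now fail, but `sort.Search` only accepts a `func(int) bool`, so the error is smuggled out through a variable captured by the closure and checked after the search. The pattern in isolation, with a toy `timestampAt` in place of the real decoder:

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// timestampAt stands in for timestampAtIndex: it can fail, but
// sort.Search's predicate can only return a bool.
func timestampAt(data []int64, i int) (int64, error) {
	if data[i] < 0 { // pretend negative values are corrupt
		return 0, errors.New("corrupt sample")
	}
	return data[i], nil
}

// searchAfter finds the first index whose timestamp is after t, capturing
// any decoding error in lastErr, as the iterator code above does.
func searchAfter(data []int64, t int64) (int, error) {
	var lastErr error
	i := sort.Search(len(data), func(i int) bool {
		ts, err := timestampAt(data, i)
		if err != nil {
			lastErr = err
		}
		return ts > t
	})
	if lastErr != nil {
		return 0, lastErr
	}
	return i, nil
}

func main() {
	fmt.Println(searchAfter([]int64{1, 3, 5, 7}, 4)) // 2 <nil>
	fmt.Println(searchAfter([]int64{1, -3, 5}, 4))   // 0 corrupt sample
}
```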
-func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) []model.SamplePair { +func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) { + var lastErr error + oldest := sort.Search(it.len, func(i int) bool { - return !it.timestampAtIndex(i).Before(in.OldestInclusive) + t, err := it.timestampAtIndex(i) + if err != nil { + lastErr = err + } + return !t.Before(in.OldestInclusive) }) newest := sort.Search(it.len, func(i int) bool { - return it.timestampAtIndex(i).After(in.NewestInclusive) + t, err := it.timestampAtIndex(i) + if err != nil { + lastErr = err + } + return t.After(in.NewestInclusive) }) - if oldest == it.len { - return nil + if oldest == it.len || lastErr != nil { + return nil, lastErr } result := make([]model.SamplePair, 0, newest-oldest) for i := oldest; i < newest; i++ { - result = append(result, model.SamplePair{ - Timestamp: it.timestampAtIndex(i), - Value: it.sampleValueAtIndex(i), - }) + t, err := it.timestampAtIndex(i) + if err != nil { + return nil, err + } + v, err := it.sampleValueAtIndex(i) + if err != nil { + return nil, err + } + result = append(result, model.SamplePair{Timestamp: t, Value: v}) } - return result + return result, nil } // contains implements chunkIterator. -func (it *deltaEncodedChunkIterator) contains(t model.Time) bool { - return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1)) +func (it *deltaEncodedChunkIterator) contains(t model.Time) (bool, error) { + lastT, err := it.timestampAtIndex(it.len - 1) + if err != nil { + return false, err + } + return !t.Before(it.baseT) && !t.After(lastT), nil } // values implements chunkIterator. -func (it *deltaEncodedChunkIterator) values() <-chan *model.SamplePair { - valuesChan := make(chan *model.SamplePair) +func (it *deltaEncodedChunkIterator) values() <-chan struct { + model.SamplePair + error +} { + valuesChan := make(chan struct { + model.SamplePair + error + }) go func() { for i := 0; i < it.len; i++ { - valuesChan <- &model.SamplePair{ - Timestamp: it.timestampAtIndex(i), - Value: it.sampleValueAtIndex(i), + t, err := it.timestampAtIndex(i) + if err != nil { + valuesChan <- struct { + model.SamplePair + error + }{ZeroSamplePair, err} + break } + v, err := it.sampleValueAtIndex(i) + if err != nil { + valuesChan <- struct { + model.SamplePair + error + }{ZeroSamplePair, err} + break + } + valuesChan <- struct { + model.SamplePair + error + }{model.SamplePair{Timestamp: t, Value: v}, nil} } close(valuesChan) }() @@ -360,61 +426,61 @@ func (it *deltaEncodedChunkIterator) values() <-chan *model.SamplePair { } // timestampAtIndex implements chunkIterator. -func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time { +func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time, error) { offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) switch it.tBytes { case d1: - return it.baseT + model.Time(uint8(it.c[offset])) + return it.baseT + model.Time(uint8(it.c[offset])), nil case d2: - return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:])) + return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:])), nil case d4: - return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:])) + return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:])), nil case d8: // Take absolute value for d8. 
- return model.Time(binary.LittleEndian.Uint64(it.c[offset:])) + return model.Time(binary.LittleEndian.Uint64(it.c[offset:])), nil default: - panic("invalid number of bytes for time delta") + return 0, fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes) } } // lastTimestamp implements chunkIterator. -func (it *deltaEncodedChunkIterator) lastTimestamp() model.Time { +func (it *deltaEncodedChunkIterator) lastTimestamp() (model.Time, error) { return it.timestampAtIndex(it.len - 1) } // sampleValueAtIndex implements chunkIterator. -func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue { +func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.SampleValue, error) { offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + int(it.tBytes) if it.isInt { switch it.vBytes { case d0: - return it.baseV + return it.baseV, nil case d1: - return it.baseV + model.SampleValue(int8(it.c[offset])) + return it.baseV + model.SampleValue(int8(it.c[offset])), nil case d2: - return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))) + return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil case d4: - return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))) + return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil // No d8 for ints. default: - panic("invalid number of bytes for integer delta") + return 0, fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes) } } else { switch it.vBytes { case d4: - return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))) + return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))), nil case d8: // Take absolute value for d8. - return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))) + return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))), nil default: - panic("invalid number of bytes for floating point delta") + return 0, fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes) } } } // lastSampleValue implements chunkIterator. -func (it *deltaEncodedChunkIterator) lastSampleValue() model.SampleValue { +func (it *deltaEncodedChunkIterator) lastSampleValue() (model.SampleValue, error) { return it.sampleValueAtIndex(it.len - 1) } diff --git a/storage/local/doubledelta.go b/storage/local/doubledelta.go index cb50db6a7c..257c845443 100644 --- a/storage/local/doubledelta.go +++ b/storage/local/doubledelta.go @@ -83,9 +83,9 @@ func newDoubleDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *doub } // add implements chunk. -func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk { +func (c doubleDeltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) { if c.len() == 0 { - return c.addFirstSample(s) + return c.addFirstSample(s), nil } tb := c.timeBytes() @@ -101,8 +101,11 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk { // Do we generally have space for another sample in this chunk? If not, // overflow into a new one. 
if remainingBytes < sampleSize { - overflowChunks := newChunk().add(s) - return []chunk{&c, overflowChunks[0]} + overflowChunks, err := newChunk().add(s) + if err != nil { + return nil, err + } + return []chunk{&c, overflowChunks[0]}, nil } projectedTime := c.baseTime() + model.Time(c.len())*c.baseTimeDelta() @@ -136,8 +139,11 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk { return transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s) } // Chunk is already half full. Better create a new one and save the transcoding efforts. - overflowChunks := newChunk().add(s) - return []chunk{&c, overflowChunks[0]} + overflowChunks, err := newChunk().add(s) + if err != nil { + return nil, err + } + return []chunk{&c, overflowChunks[0]}, nil } offset := len(c) @@ -154,7 +160,7 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk { // Store the absolute value (no delta) in case of d8. binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp)) default: - panic("invalid number of bytes for time delta") + return nil, fmt.Errorf("invalid number of bytes for time delta: %d", tb) } offset += int(tb) @@ -171,7 +177,7 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk { binary.LittleEndian.PutUint32(c[offset:], uint32(int32(ddv))) // d8 must not happen. Those samples are encoded as float64. default: - panic("invalid number of bytes for integer delta") + return nil, fmt.Errorf("invalid number of bytes for integer delta: %d", vb) } } else { switch vb { @@ -181,10 +187,10 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk { // Store the absolute value (no delta) in case of d8. binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value))) default: - panic("invalid number of bytes for floating point delta") + return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb) } } - return []chunk{&c} + return []chunk{&c}, nil } // clone implements chunk. @@ -251,15 +257,24 @@ func (c *doubleDeltaEncodedChunk) unmarshal(r io.Reader) error { if _, err := io.ReadFull(r, *c); err != nil { return err } - *c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])] + l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:]) + if int(l) > cap(*c) { + return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l) + } + *c = (*c)[:l] return nil } // unmarshalFromBuf implements chunk. -func (c *doubleDeltaEncodedChunk) unmarshalFromBuf(buf []byte) { +func (c *doubleDeltaEncodedChunk) unmarshalFromBuf(buf []byte) error { *c = (*c)[:cap(*c)] copy(*c, buf) - *c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])] + l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:]) + if int(l) > cap(*c) { + return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l) + } + *c = (*c)[:l] + return nil } // encoding implements chunk. @@ -335,7 +350,7 @@ func (c doubleDeltaEncodedChunk) isInt() bool { // addFirstSample is a helper method only used by c.add(). It adds timestamp and // value as base time and value. -func (c doubleDeltaEncodedChunk) addFirstSample(s *model.SamplePair) []chunk { +func (c doubleDeltaEncodedChunk) addFirstSample(s model.SamplePair) []chunk { c = c[:doubleDeltaHeaderBaseValueOffset+8] binary.LittleEndian.PutUint64( c[doubleDeltaHeaderBaseTimeOffset:], @@ -350,10 +365,10 @@ func (c doubleDeltaEncodedChunk) addFirstSample(s *model.SamplePair) []chunk { // addSecondSample is a helper method only used by c.add(). 
It calculates the // base delta from the provided sample and adds it to the chunk. -func (c doubleDeltaEncodedChunk) addSecondSample(s *model.SamplePair, tb, vb deltaBytes) []chunk { +func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb deltaBytes) ([]chunk, error) { baseTimeDelta := s.Timestamp - c.baseTime() if baseTimeDelta < 0 { - panic("base time delta is less than zero") + return nil, fmt.Errorf("base time delta is less than zero: %v", baseTimeDelta) } c = c[:doubleDeltaHeaderBytes] if tb >= d8 || bytesNeededForUnsignedTimestampDelta(baseTimeDelta) >= d8 { @@ -391,7 +406,7 @@ func (c doubleDeltaEncodedChunk) addSecondSample(s *model.SamplePair, tb, vb del math.Float64bits(float64(baseValueDelta)), ) } - return []chunk{&c} + return []chunk{&c}, nil } // doubleDeltaEncodedChunkIterator implements chunkIterator. @@ -408,57 +423,108 @@ type doubleDeltaEncodedChunkIterator struct { func (it *doubleDeltaEncodedChunkIterator) length() int { return it.len } // valueAtOrBeforeTime implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) model.SamplePair { +func (it *doubleDeltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) { + var lastErr error i := sort.Search(it.len, func(i int) bool { - return it.timestampAtIndex(i).After(t) + ts, err := it.timestampAtIndex(i) + if err != nil { + lastErr = err + } + return ts.After(t) }) - if i == 0 { - return ZeroSamplePair + if i == 0 || lastErr != nil { + return ZeroSamplePair, lastErr } - return model.SamplePair{ - Timestamp: it.timestampAtIndex(i - 1), - Value: it.sampleValueAtIndex(i - 1), + ts, err := it.timestampAtIndex(i - 1) + if err != nil { + return ZeroSamplePair, err } + v, err := it.sampleValueAtIndex(i - 1) + if err != nil { + return ZeroSamplePair, err + } + return model.SamplePair{Timestamp: ts, Value: v}, nil } // rangeValues implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) rangeValues(in metric.Interval) []model.SamplePair { +func (it *doubleDeltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) { + var lastErr error + oldest := sort.Search(it.len, func(i int) bool { - return !it.timestampAtIndex(i).Before(in.OldestInclusive) + t, err := it.timestampAtIndex(i) + if err != nil { + lastErr = err + } + return !t.Before(in.OldestInclusive) }) newest := sort.Search(it.len, func(i int) bool { - return it.timestampAtIndex(i).After(in.NewestInclusive) + t, err := it.timestampAtIndex(i) + if err != nil { + lastErr = err + } + return t.After(in.NewestInclusive) }) - if oldest == it.len { - return nil + if oldest == it.len || lastErr != nil { + return nil, lastErr } result := make([]model.SamplePair, 0, newest-oldest) for i := oldest; i < newest; i++ { - result = append(result, model.SamplePair{ - Timestamp: it.timestampAtIndex(i), - Value: it.sampleValueAtIndex(i), - }) + t, err := it.timestampAtIndex(i) + if err != nil { + return nil, err + } + v, err := it.sampleValueAtIndex(i) + if err != nil { + return nil, err + } + result = append(result, model.SamplePair{Timestamp: t, Value: v}) } - return result + return result, nil } // contains implements chunkIterator. 
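For orientation, the double-delta decoding used by the iterator below reconstructs the i-th timestamp as baseT + i·baseΔT plus a small stored deviation; sample 0 carries the base time and sample 1 establishes the base delta. A worked sketch with assumed toy values (scrapes every ~15 time units with jitter):

```go
package main

import "fmt"

// reconstruct mirrors the arithmetic of timestampAtIndex: later samples
// store only their deviation from the linear extrapolation.
func reconstruct(baseT, baseDT int64, devs []int64) []int64 {
	out := []int64{baseT, baseT + baseDT}
	for i, d := range devs {
		idx := int64(i + 2)
		out = append(out, baseT+idx*baseDT+d)
	}
	return out
}

func main() {
	// Assumed toy series: deviations stay tiny, so they pack into one
	// byte each on disk.
	ts := reconstruct(1000, 15, []int64{-1, 0, 2})
	fmt.Println(ts) // [1000 1015 1029 1045 1062]
}
```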
-func (it *doubleDeltaEncodedChunkIterator) contains(t model.Time) bool { - return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1)) +func (it *doubleDeltaEncodedChunkIterator) contains(t model.Time) (bool, error) { + lastT, err := it.timestampAtIndex(it.len - 1) + if err != nil { + return false, err + } + return !t.Before(it.baseT) && !t.After(lastT), nil } // values implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) values() <-chan *model.SamplePair { - valuesChan := make(chan *model.SamplePair) +func (it *doubleDeltaEncodedChunkIterator) values() <-chan struct { + model.SamplePair + error +} { + valuesChan := make(chan struct { + model.SamplePair + error + }) go func() { for i := 0; i < it.len; i++ { - valuesChan <- &model.SamplePair{ - Timestamp: it.timestampAtIndex(i), - Value: it.sampleValueAtIndex(i), + t, err := it.timestampAtIndex(i) + if err != nil { + valuesChan <- struct { + model.SamplePair + error + }{ZeroSamplePair, err} + break } + v, err := it.sampleValueAtIndex(i) + if err != nil { + valuesChan <- struct { + model.SamplePair + error + }{ZeroSamplePair, err} + break + } + valuesChan <- struct { + model.SamplePair + error + }{model.SamplePair{Timestamp: t, Value: v}, nil} } close(valuesChan) }() @@ -466,17 +532,17 @@ func (it *doubleDeltaEncodedChunkIterator) values() <-chan *model.SamplePair { } // timestampAtIndex implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time { +func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time, error) { if idx == 0 { - return it.baseT + return it.baseT, nil } if idx == 1 { // If time bytes are at d8, the time is saved directly rather // than as a difference. if it.tBytes == d8 { - return it.baseΔT + return it.baseΔT, nil } - return it.baseT + it.baseΔT + return it.baseT + it.baseΔT, nil } offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) @@ -485,40 +551,40 @@ func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time case d1: return it.baseT + model.Time(idx)*it.baseΔT + - model.Time(int8(it.c[offset])) + model.Time(int8(it.c[offset])), nil case d2: return it.baseT + model.Time(idx)*it.baseΔT + - model.Time(int16(binary.LittleEndian.Uint16(it.c[offset:]))) + model.Time(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil case d4: return it.baseT + model.Time(idx)*it.baseΔT + - model.Time(int32(binary.LittleEndian.Uint32(it.c[offset:]))) + model.Time(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil case d8: // Take absolute value for d8. - return model.Time(binary.LittleEndian.Uint64(it.c[offset:])) + return model.Time(binary.LittleEndian.Uint64(it.c[offset:])), nil default: - panic("invalid number of bytes for time delta") + return 0, fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes) } } // lastTimestamp implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() model.Time { +func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() (model.Time, error) { return it.timestampAtIndex(it.len - 1) } // sampleValueAtIndex implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue { +func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.SampleValue, error) { if idx == 0 { - return it.baseV + return it.baseV, nil } if idx == 1 { // If value bytes are at d8, the value is saved directly rather // than as a difference. 
if it.vBytes == d8 { - return it.baseΔV + return it.baseΔV, nil } - return it.baseV + it.baseΔV + return it.baseV + it.baseΔV, nil } offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + int(it.tBytes) @@ -527,39 +593,39 @@ func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.Sam switch it.vBytes { case d0: return it.baseV + - model.SampleValue(idx)*it.baseΔV + model.SampleValue(idx)*it.baseΔV, nil case d1: return it.baseV + model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int8(it.c[offset])) + model.SampleValue(int8(it.c[offset])), nil case d2: return it.baseV + model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))) + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil case d4: return it.baseV + model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))) + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil // No d8 for ints. default: - panic("invalid number of bytes for integer delta") + return 0, fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes) } } else { switch it.vBytes { case d4: return it.baseV + model.SampleValue(idx)*it.baseΔV + - model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))) + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))), nil case d8: // Take absolute value for d8. - return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))) + return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))), nil default: - panic("invalid number of bytes for floating point delta") + return 0, fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes) } } } // lastSampleValue implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) lastSampleValue() model.SampleValue { +func (it *doubleDeltaEncodedChunkIterator) lastSampleValue() (model.SampleValue, error) { return it.sampleValueAtIndex(it.len - 1) } diff --git a/storage/local/index/index.go b/storage/local/index/index.go index 14200092aa..e189004cc7 100644 --- a/storage/local/index/index.go +++ b/storage/local/index/index.go @@ -19,6 +19,7 @@ package index import ( "os" "path" + "path/filepath" "github.com/prometheus/common/model" @@ -95,7 +96,7 @@ func (i *FingerprintMetricIndex) Lookup(fp model.Fingerprint) (metric model.Metr // ready to use. func NewFingerprintMetricIndex(basePath string) (*FingerprintMetricIndex, error) { fingerprintToMetricDB, err := NewLevelDB(LevelDBOptions{ - Path: path.Join(basePath, fingerprintToMetricDir), + Path: filepath.Join(basePath, fingerprintToMetricDir), CacheSizeBytes: FingerprintMetricCacheSize, }) if err != nil { @@ -167,7 +168,7 @@ func (i *LabelNameLabelValuesIndex) LookupSet(l model.LabelName) (values map[mod // LabelNameLabelValuesIndex ready to use. func NewLabelNameLabelValuesIndex(basePath string) (*LabelNameLabelValuesIndex, error) { labelNameToLabelValuesDB, err := NewLevelDB(LevelDBOptions{ - Path: path.Join(basePath, labelNameToLabelValuesDir), + Path: filepath.Join(basePath, labelNameToLabelValuesDir), CacheSizeBytes: LabelNameLabelValuesCacheSize, }) if err != nil { @@ -245,7 +246,7 @@ func (i *LabelPairFingerprintIndex) LookupSet(p model.LabelPair) (fps map[model. // LabelPairFingerprintIndex ready to use. 
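The `path`→`path/filepath` swap running through this change only matters off Unix: `path.Join` always joins with `/` (correct for URLs), while `filepath.Join` uses the OS separator. A quick illustration:

```go
package main

import (
	"fmt"
	"path"
	"path/filepath"
)

func main() {
	// path.Join always joins with '/', which is only correct for
	// slash-separated names such as URLs.
	fmt.Println(path.Join("data", "orphaned", "a7"))
	// filepath.Join uses the OS separator: on Windows this yields
	// data\orphaned\a7, while on Unix the two calls happen to agree.
	fmt.Println(filepath.Join("data", "orphaned", "a7"))
}
```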
func NewLabelPairFingerprintIndex(basePath string) (*LabelPairFingerprintIndex, error) { labelPairToFingerprintsDB, err := NewLevelDB(LevelDBOptions{ - Path: path.Join(basePath, labelPairToFingerprintsDir), + Path: filepath.Join(basePath, labelPairToFingerprintsDir), CacheSizeBytes: LabelPairFingerprintsCacheSize, }) if err != nil { @@ -283,7 +284,7 @@ func (i *FingerprintTimeRangeIndex) Lookup(fp model.Fingerprint) (firstTime, las // FingerprintTimeRangeIndex ready to use. func NewFingerprintTimeRangeIndex(basePath string) (*FingerprintTimeRangeIndex, error) { fingerprintTimeRangeDB, err := NewLevelDB(LevelDBOptions{ - Path: path.Join(basePath, fingerprintTimeRangeDir), + Path: filepath.Join(basePath, fingerprintTimeRangeDir), CacheSizeBytes: FingerprintTimeRangeCacheSize, }) if err != nil { diff --git a/storage/local/instrumentation.go b/storage/local/instrumentation.go index 85a7aa5e0f..6d43ebd395 100644 --- a/storage/local/instrumentation.go +++ b/storage/local/instrumentation.go @@ -60,6 +60,9 @@ const ( requestedPurge = "purge_on_request" memoryMaintenance = "maintenance_in_memory" archiveMaintenance = "maintenance_in_archive" + completedQuarantine = "quarantine_completed" + droppedQuarantine = "quarantine_dropped" + failedQuarantine = "quarantine_failed" // Op-types for chunkOps. createAndPin = "create" // A chunkDesc creation with refCount=1. diff --git a/storage/local/interface.go b/storage/local/interface.go index 5508d8c769..d9dbc4f213 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -73,7 +73,7 @@ type Storage interface { // methods are not goroutine-safe. A SeriesIterator iterates over a snapshot of // a series, i.e. it is safe to continue using a SeriesIterator after or during // modifying the corresponding series, but the iterator will represent the state -// of the series prior the modification. +// of the series prior to the modification. type SeriesIterator interface { // Gets the value that is closest before the given time. In case a value // exists at precisely the given time, that value is returned. If no @@ -90,11 +90,11 @@ type Preloader interface { PreloadRange( fp model.Fingerprint, from model.Time, through model.Time, - ) (SeriesIterator, error) + ) SeriesIterator PreloadInstant( fp model.Fingerprint, timestamp model.Time, stalenessDelta time.Duration, - ) (SeriesIterator, error) + ) SeriesIterator // Close unpins any previously requested series data from memory. Close() } diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 22c16eccd2..acccc8874f 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -20,7 +20,6 @@ import ( "io" "io/ioutil" "os" - "path" "path/filepath" "strconv" "strings" @@ -46,6 +45,7 @@ const ( seriesFileSuffix = ".db" seriesTempFileSuffix = ".db.tmp" seriesDirNameLen = 2 // How many bytes of the fingerprint in dir name. + hintFileSuffix = ".hint" headsFileName = "heads.db" headsTempFileName = "heads.db.tmp" @@ -321,8 +321,9 @@ func (p *persistence) isDirty() bool { // setDirty sets the dirty flag in a goroutine-safe way. Once the dirty flag was // set to true with this method, it cannot be set to false again. (If we became // dirty during our runtime, there is no way back. If we were dirty from the -// start, a clean-up might make us clean again.) -func (p *persistence) setDirty(dirty bool) { +// start, a clean-up might make us clean again.) The provided error will be +// logged as a reason if dirty is true.
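The new two-argument `setDirty` pairs with a recurring call-site pattern in this file: a deferred check of the named `err` return that wraps the error with the failing method's name before handing it over (see `persistChunks` and `dropAndPersistChunks`). A stripped-down stand-in showing the shape of that pattern, not the real `persistence` type:

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// dirtyTracker is a toy stand-in for persistence: once dirty at runtime,
// always dirty, and the triggering error is logged as the reason.
type dirtyTracker struct{ dirty, becameDirty bool }

func (p *dirtyTracker) setDirty(dirty bool, err error) {
	if p.becameDirty {
		return // no way back once we became dirty during our runtime
	}
	p.dirty = dirty
	if dirty {
		p.becameDirty = true
		log.Printf("error=%q storage is now inconsistent", err)
	}
}

func (p *dirtyTracker) persistChunks() (err error) {
	// Wrap the error with the method name before flagging dirtiness,
	// mirroring the fmt.Errorf wrapping in the diff.
	defer func() {
		if err != nil {
			p.setDirty(true, fmt.Errorf("error in method persistChunks: %s", err))
		}
	}()
	return errors.New("disk full")
}

func main() {
	p := &dirtyTracker{}
	p.persistChunks()
	fmt.Println("dirty:", p.dirty)
}
```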
+func (p *persistence) setDirty(dirty bool, err error) { if dirty { p.dirtyCounter.Inc() } @@ -334,7 +335,7 @@ func (p *persistence) setDirty(dirty bool) { p.dirty = dirty if dirty { p.becameDirty = true - log.Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.") + log.With("error", err).Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.") } } @@ -371,8 +372,7 @@ func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelVa func (p *persistence) persistChunks(fp model.Fingerprint, chunks []chunk) (index int, err error) { defer func() { if err != nil { - log.Error("Error persisting chunks: ", err) - p.setDirty(true) + p.setDirty(true, fmt.Errorf("error in method persistChunks: %s", err)) } }() @@ -441,8 +441,13 @@ func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffse return nil, err } for c := 0; c < batchSize; c++ { - chunk := newChunkForEncoding(chunkEncoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset])) - chunk.unmarshalFromBuf(buf[c*chunkLenWithHeader+chunkHeaderLen:]) + chunk, err := newChunkForEncoding(chunkEncoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset])) + if err != nil { + return nil, err + } + if err := chunk.unmarshalFromBuf(buf[c*chunkLenWithHeader+chunkHeaderLen:]); err != nil { + return nil, err + } chunks = append(chunks, chunk) } } @@ -470,7 +475,7 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([ return nil, err } if fi.Size()%int64(chunkLenWithHeader) != 0 { - p.setDirty(true) + // The returned error will bubble up and lead to quarantining of the whole series. return nil, fmt.Errorf( "size of series file for fingerprint %v is %d, which is not a multiple of the chunk length %d", fp, fi.Size(), chunkLenWithHeader, @@ -648,7 +653,11 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap if _, err = codable.EncodeVarint(w, int64(chunkDesc.firstTime())); err != nil { return } - if _, err = codable.EncodeVarint(w, int64(chunkDesc.lastTime())); err != nil { + lt, err := chunkDesc.lastTime() + if err != nil { + return + } + if _, err = codable.EncodeVarint(w, int64(lt)); err != nil { return } } else { @@ -854,7 +863,12 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in p.dirty = true return sm, chunksToPersist, nil } - chunk := newChunkForEncoding(chunkEncoding(encoding)) + chunk, err := newChunkForEncoding(chunkEncoding(encoding)) + if err != nil { + log.Warn("Problem with chunk encoding:", err) + p.dirty = true + return sm, chunksToPersist, nil + } if err := chunk.unmarshal(r); err != nil { log.Warn("Could not decode chunk:", err) p.dirty = true @@ -871,6 +885,13 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in } } + lt, err := chunkDescs[len(chunkDescs)-1].lastTime() + if err != nil { + log.Warn("Could not determine last time of head chunk:", err) + p.dirty = true + return sm, chunksToPersist, nil + } + fingerprintToSeries[model.Fingerprint(fp)] = &memorySeries{ metric: model.Metric(metric), chunkDescs: chunkDescs, @@ -878,7 +899,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in modTime: modTime, chunkDescsOffset: int(chunkDescsOffset), savedFirstTime: model.Time(savedFirstTime), - lastTime: chunkDescs[len(chunkDescs)-1].lastTime(), + lastTime: lt, headChunkClosed: headChunkClosed, } } @@ -908,8 +929,7 @@ func (p *persistence) dropAndPersistChunks( // please handle with care! 
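`loadChunks` above now leans on the defensive reslice added to `unmarshalFromBuf`: the on-disk length field is validated against the chunk's capacity, so a corrupt file yields an error instead of a slice-bounds panic. The pattern in isolation (header offset 0 assumed for brevity; the real chunks read it at their `*HeaderBufLenOffset`):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// unmarshalLen mirrors the new bounds check: read the stored length,
// verify it fits, and only then reslice.
func unmarshalLen(c []byte, buf []byte) ([]byte, error) {
	c = c[:cap(c)]
	copy(c, buf)
	l := binary.LittleEndian.Uint16(c[0:]) // length field in the header
	if int(l) > cap(c) {
		return nil, fmt.Errorf("chunk length exceeded during unmarshaling: %d", l)
	}
	return c[:l], nil
}

func main() {
	good := make([]byte, 8)
	binary.LittleEndian.PutUint16(good, 6)
	c, err := unmarshalLen(make([]byte, 0, 8), good)
	fmt.Println(len(c), err) // 6 <nil>

	bad := make([]byte, 8)
	binary.LittleEndian.PutUint16(bad, 9999) // corrupt header
	_, err = unmarshalLen(make([]byte, 0, 8), bad)
	fmt.Println(err) // chunk length exceeded during unmarshaling: 9999
}
```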
defer func() { if err != nil { - log.Error("Error dropping and/or persisting chunks: ", err) - p.setDirty(true) + p.setDirty(true, fmt.Errorf("error in method dropAndPersistChunks: %s", err)) } }() @@ -918,7 +938,15 @@ func (p *persistence) dropAndPersistChunks( // too old. If that's the case, the chunks in the series file // are all too old, too. i := 0 - for ; i < len(chunks) && chunks[i].newIterator().lastTimestamp().Before(beforeTime); i++ { + for ; i < len(chunks); i++ { + var lt model.Time + lt, err = chunks[i].newIterator().lastTimestamp() + if err != nil { + return + } + if !lt.Before(beforeTime) { + break + } } if i < len(chunks) { firstTimeNotDropped = chunks[i].firstTime() @@ -1071,6 +1099,44 @@ func (p *persistence) deleteSeriesFile(fp model.Fingerprint) (int, error) { return numChunks, nil } +// quarantineSeriesFile moves a series file to the orphaned directory. It also +// writes a hint file with the provided quarantine reason and, if metric is +// non-nil, the string representation of the metric. +func (p *persistence) quarantineSeriesFile(fp model.Fingerprint, quarantineReason error, metric model.Metric) error { + var ( + oldName = p.fileNameForFingerprint(fp) + orphanedDir = filepath.Join(p.basePath, "orphaned", filepath.Base(filepath.Dir(oldName))) + newName = filepath.Join(orphanedDir, filepath.Base(oldName)) + hintName = newName[:len(newName)-len(seriesFileSuffix)] + hintFileSuffix + ) + + renameErr := os.MkdirAll(orphanedDir, 0700) + if renameErr != nil { + return renameErr + } + renameErr = os.Rename(oldName, newName) + if os.IsNotExist(renameErr) { + // Source file doesn't exist. That's normal. + renameErr = nil + } + // Write the hint file even if the rename failed. At least try... + // And ignore errors writing the hint file. It's best effort. + if f, err := os.Create(hintName); err == nil { + if metric != nil { + f.WriteString(metric.String() + "\n") + } else { + f.WriteString("[UNKNOWN METRIC]\n") + } + if quarantineReason != nil { + f.WriteString(quarantineReason.Error() + "\n") + } else { + f.WriteString("[UNKNOWN REASON]\n") + } + f.Close() + } + return renameErr +} + // seriesFileModTime returns the modification time of the series file belonging // to the provided fingerprint. In case of an error, the zero value of time.Time // is returned.
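The hint file format just defined is deliberately simple: one line for the metric (or a placeholder), one line for the quarantine reason (or a placeholder), all best effort. A sketch of a writer mirroring that format (hypothetical helper, not the actual method):

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeHintFile mirrors the best-effort hint written by
// quarantineSeriesFile: metric on the first line, reason on the second,
// with placeholders when either is unknown.
func writeHintFile(path, metric string, reason error) {
	f, err := os.Create(path)
	if err != nil {
		return // best effort; errors writing the hint are ignored
	}
	defer f.Close()
	if metric == "" {
		metric = "[UNKNOWN METRIC]"
	}
	f.WriteString(metric + "\n")
	if reason == nil {
		f.WriteString("[UNKNOWN REASON]\n")
		return
	}
	f.WriteString(reason.Error() + "\n")
}

func main() {
	dir, _ := os.MkdirTemp("", "orphaned")
	defer os.RemoveAll(dir)
	hint := filepath.Join(dir, "a7b320.hint")
	writeHintFile(hint, `{job="node"}`, fmt.Errorf("purge during crash recovery"))
	b, _ := os.ReadFile(hint)
	fmt.Print(string(b))
}
```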
@@ -1122,11 +1188,11 @@ func (p *persistence) archiveMetric( fp model.Fingerprint, m model.Metric, first, last model.Time, ) error { if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil { - p.setDirty(true) + p.setDirty(true, err) return err } if err := p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last}); err != nil { - p.setDirty(true) + p.setDirty(true, err) return err } return nil @@ -1139,6 +1205,9 @@ func (p *persistence) hasArchivedMetric(fp model.Fingerprint) ( hasMetric bool, firstTime, lastTime model.Time, err error, ) { firstTime, lastTime, hasMetric, err = p.archivedFingerprintToTimeRange.Lookup(fp) + if err != nil { + p.setDirty(true, err) + } return } @@ -1187,7 +1256,7 @@ func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error) func (p *persistence) purgeArchivedMetric(fp model.Fingerprint) (err error) { defer func() { if err != nil { - p.setDirty(true) + p.setDirty(true, fmt.Errorf("error in method purgeArchivedMetric: %s", err)) } }() @@ -1218,12 +1287,8 @@ func (p *persistence) purgeArchivedMetric(fp model.Fingerprint) (err error) { // was actually deleted, the method returns true and the first time and last // time of the deleted metric. The caller must have locked the fingerprint. func (p *persistence) unarchiveMetric(fp model.Fingerprint) (deletedAnything bool, err error) { - defer func() { - if err != nil { - p.setDirty(true) - } - }() - + // An error returned here will bubble up and lead to quarantining of the + // series, so no setDirty required. deleted, err := p.archivedFingerprintToMetrics.Delete(codable.Fingerprint(fp)) if err != nil || !deleted { return false, err @@ -1279,17 +1344,17 @@ func (p *persistence) close() error { func (p *persistence) dirNameForFingerprint(fp model.Fingerprint) string { fpStr := fp.String() - return path.Join(p.basePath, fpStr[0:seriesDirNameLen]) + return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen]) } func (p *persistence) fileNameForFingerprint(fp model.Fingerprint) string { fpStr := fp.String() - return path.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix) + return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix) } func (p *persistence) tempFileNameForFingerprint(fp model.Fingerprint) string { fpStr := fp.String() - return path.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesTempFileSuffix) + return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesTempFileSuffix) } func (p *persistence) openChunkFileForWriting(fp model.Fingerprint) (*os.File, error) { @@ -1322,19 +1387,19 @@ func (p *persistence) openChunkFileForReading(fp model.Fingerprint) (*os.File, e } func (p *persistence) headsFileName() string { - return path.Join(p.basePath, headsFileName) + return filepath.Join(p.basePath, headsFileName) } func (p *persistence) headsTempFileName() string { - return path.Join(p.basePath, headsTempFileName) + return filepath.Join(p.basePath, headsTempFileName) } func (p *persistence) mappingsFileName() string { - return path.Join(p.basePath, mappingsFileName) + return filepath.Join(p.basePath, mappingsFileName) } func (p *persistence) mappingsTempFileName() string { - return path.Join(p.basePath, mappingsTempFileName) + return filepath.Join(p.basePath, mappingsTempFileName) } func (p *persistence) processIndexingQueue() { @@ -1616,7 +1681,9 @@ func (p 
*persistence) writeChunks(w io.Writer, chunks []chunk) error { b = b[:writeSize] for i, chunk := range chunks[:batchSize] { - writeChunkHeader(b[i*chunkLenWithHeader:], chunk) + if err := writeChunkHeader(b[i*chunkLenWithHeader:], chunk); err != nil { + return err + } if err := chunk.marshalToBuf(b[i*chunkLenWithHeader+chunkHeaderLen:]); err != nil { return err } @@ -1642,14 +1709,19 @@ func chunkIndexForOffset(offset int64) (int, error) { return int(offset) / chunkLenWithHeader, nil } -func writeChunkHeader(header []byte, c chunk) { +func writeChunkHeader(header []byte, c chunk) error { header[chunkHeaderTypeOffset] = byte(c.encoding()) binary.LittleEndian.PutUint64( header[chunkHeaderFirstTimeOffset:], uint64(c.firstTime()), ) + lt, err := c.newIterator().lastTimestamp() + if err != nil { + return err + } binary.LittleEndian.PutUint64( header[chunkHeaderLastTimeOffset:], - uint64(c.newIterator().lastTimestamp()), + uint64(lt), ) + return nil } diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 47cf9078a1..e1894032af 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -14,6 +14,10 @@ package local import ( + "bufio" + "errors" + "os" + "path/filepath" "reflect" "sync" "testing" @@ -49,7 +53,7 @@ func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, tes }) } -func buildTestChunks(encoding chunkEncoding) map[model.Fingerprint][]chunk { +func buildTestChunks(t *testing.T, encoding chunkEncoding) map[model.Fingerprint][]chunk { fps := model.Fingerprints{ m1.FastFingerprint(), m2.FastFingerprint(), @@ -60,10 +64,18 @@ func buildTestChunks(encoding chunkEncoding) map[model.Fingerprint][]chunk { for _, fp := range fps { fpToChunks[fp] = make([]chunk, 0, 10) for i := 0; i < 10; i++ { - fpToChunks[fp] = append(fpToChunks[fp], newChunkForEncoding(encoding).add(&model.SamplePair{ + ch, err := newChunkForEncoding(encoding) + if err != nil { + t.Fatal(err) + } + chs, err := ch.add(model.SamplePair{ Timestamp: model.Time(i), Value: model.SampleValue(fp), - })[0]) + }) + if err != nil { + t.Fatal(err) + } + fpToChunks[fp] = append(fpToChunks[fp], chs[0]) } } return fpToChunks @@ -73,7 +85,7 @@ func chunksEqual(c1, c2 chunk) bool { values2 := c2.newIterator().values() for v1 := range c1.newIterator().values() { v2 := <-values2 - if !v1.Equal(v2) { + if !(v1 == v2) { return false } } @@ -84,7 +96,7 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) { p, closer := newTestPersistence(t, encoding) defer closer.Close() - fpToChunks := buildTestChunks(encoding) + fpToChunks := buildTestChunks(t, encoding) for fp, chunks := range fpToChunks { firstTimeNotDropped, offset, numDropped, allDropped, err := @@ -126,10 +138,14 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) { t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 10) } for i, cd := range actualChunkDescs { - if cd.firstTime() != model.Time(i) || cd.lastTime() != model.Time(i) { + lastTime, err := cd.lastTime() + if err != nil { + t.Fatal(err) + } + if cd.firstTime() != model.Time(i) || lastTime != model.Time(i) { t.Errorf( "Want ts=%v, got firstTime=%v, lastTime=%v.", - i, cd.firstTime(), cd.lastTime(), + i, cd.firstTime(), lastTime, ) } @@ -140,10 +156,14 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) { t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 5) } for i, cd := range actualChunkDescs { - if cd.firstTime() != model.Time(i) || cd.lastTime() != model.Time(i) 
{ + lastTime, err := cd.lastTime() + if err != nil { + t.Fatal(err) + } + if cd.firstTime() != model.Time(i) || lastTime != model.Time(i) { t.Errorf( "Want ts=%v, got firstTime=%v, lastTime=%v.", - i, cd.firstTime(), cd.lastTime(), + i, cd.firstTime(), lastTime, ) } @@ -433,21 +453,21 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding fpLocker := newFingerprintLocker(10) sm := newSeriesMap() - s1 := newMemorySeries(m1, nil, time.Time{}) - s2 := newMemorySeries(m2, nil, time.Time{}) - s3 := newMemorySeries(m3, nil, time.Time{}) - s4 := newMemorySeries(m4, nil, time.Time{}) - s5 := newMemorySeries(m5, nil, time.Time{}) - s1.add(&model.SamplePair{Timestamp: 1, Value: 3.14}) - s3.add(&model.SamplePair{Timestamp: 2, Value: 2.7}) + s1, _ := newMemorySeries(m1, nil, time.Time{}) + s2, _ := newMemorySeries(m2, nil, time.Time{}) + s3, _ := newMemorySeries(m3, nil, time.Time{}) + s4, _ := newMemorySeries(m4, nil, time.Time{}) + s5, _ := newMemorySeries(m5, nil, time.Time{}) + s1.add(model.SamplePair{Timestamp: 1, Value: 3.14}) + s3.add(model.SamplePair{Timestamp: 2, Value: 2.7}) s3.headChunkClosed = true s3.persistWatermark = 1 for i := 0; i < 10000; i++ { - s4.add(&model.SamplePair{ + s4.add(model.SamplePair{ Timestamp: model.Time(i), Value: model.SampleValue(i) / 2, }) - s5.add(&model.SamplePair{ + s5.add(model.SamplePair{ Timestamp: model.Time(i), Value: model.SampleValue(i * i), }) } @@ -552,10 +572,14 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding } continue } - if cd.chunkLastTime != cd.c.newIterator().lastTimestamp() { + lastTime, err := cd.c.newIterator().lastTimestamp() + if err != nil { + t.Fatal(err) + } + if cd.chunkLastTime != lastTime { t.Errorf( "chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d", - i, cd.c.newIterator().lastTimestamp(), cd.chunkLastTime, + i, lastTime, cd.chunkLastTime, ) } } @@ -605,10 +629,14 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding } continue } - if cd.chunkLastTime != cd.c.newIterator().lastTimestamp() { + lastTime, err := cd.c.newIterator().lastTimestamp() + if err != nil { + t.Fatal(err) + } + if cd.chunkLastTime != lastTime { t.Errorf( "chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d", - i, cd.c.newIterator().lastTimestamp(), cd.chunkLastTime, + i, lastTime, cd.chunkLastTime, ) } } @@ -1051,6 +1079,108 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet } } +func TestQuarantineSeriesFile(t *testing.T) { + p, closer := newTestPersistence(t, 1) + defer closer.Close() + + verify := func(fp model.Fingerprint, seriesFileShouldExist bool, contentHintFile ...string) { + var ( + fpStr = fp.String() + originalFile = p.fileNameForFingerprint(fp) + quarantinedFile = filepath.Join(p.basePath, "orphaned", fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix) + hintFile = filepath.Join(p.basePath, "orphaned", fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+hintFileSuffix) + ) + if _, err := os.Stat(originalFile); !os.IsNotExist(err) { + t.Errorf("Expected file %q to not exist.", originalFile) + } + if _, err := os.Stat(quarantinedFile); (os.IsNotExist(err) && seriesFileShouldExist) || (err == nil && !seriesFileShouldExist) { + t.Errorf("Unexpected state of quarantined file %q. Expected it to exist: %t.
os.Stat returned: %s.", quarantinedFile, seriesFileShouldExist, err) + } + f, err := os.Open(hintFile) + if err != nil { + t.Errorf("Could not open hint file %q: %s", hintFile, err) + return + } + defer f.Close() + scanner := bufio.NewScanner(f) + for _, want := range contentHintFile { + if !scanner.Scan() { + t.Errorf("Unexpected end of hint file %q.", hintFile) + return + } + got := scanner.Text() + if want != got { + t.Errorf("Want hint line %q, got %q.", want, got) + } + } + if scanner.Scan() { + t.Errorf("Unexpected spurious content in hint file %q: %q", hintFile, scanner.Text()) + } + } + + if err := p.quarantineSeriesFile(0, nil, nil); err != nil { + t.Error(err) + } + verify(0, false, "[UNKNOWN METRIC]", "[UNKNOWN REASON]") + + if err := p.quarantineSeriesFile( + 1, errors.New("file does not exist"), + nil, + ); err != nil { + t.Error(err) + } + verify(1, false, "[UNKNOWN METRIC]", "file does not exist") + + if err := p.quarantineSeriesFile( + 2, errors.New("file does not exist"), + model.Metric{"foo": "bar", "dings": "bums"}, + ); err != nil { + t.Error(err) + } + verify(2, false, `{dings="bums", foo="bar"}`, "file does not exist") + + if err := p.quarantineSeriesFile( + 3, nil, + model.Metric{"foo": "bar", "dings": "bums"}, + ); err != nil { + t.Error(err) + } + verify(3, false, `{dings="bums", foo="bar"}`, "[UNKNOWN REASON]") + + err := os.Mkdir(filepath.Join(p.basePath, "00"), os.ModePerm) + if err != nil { + t.Fatal(err) + } + f, err := os.Create(p.fileNameForFingerprint(4)) + if err != nil { + t.Fatal(err) + } + f.Close() + + if err := p.quarantineSeriesFile( + 4, errors.New("file exists"), + model.Metric{"sound": "cloud"}, + ); err != nil { + t.Error(err) + } + verify(4, true, `{sound="cloud"}`, "file exists") + + if err := p.quarantineSeriesFile(4, nil, nil); err != nil { + t.Error(err) + } + // Overwrites hint file but leaves series file intact. + verify(4, true, "[UNKNOWN METRIC]", "[UNKNOWN REASON]") + + if err := p.quarantineSeriesFile( + 4, errors.New("file exists"), + model.Metric{"sound": "cloud"}, + ); err != nil { + t.Error(err) + } + // Overwrites everything. + verify(4, true, `{sound="cloud"}`, "file exists") +} + var fpStrings = []string{ "b004b821ca50ba26", "b037c21e884e4fc5", diff --git a/storage/local/preload.go b/storage/local/preload.go index 65f1aac835..b0113bd6be 100644 --- a/storage/local/preload.go +++ b/storage/local/preload.go @@ -29,26 +29,20 @@ type memorySeriesPreloader struct { func (p *memorySeriesPreloader) PreloadRange( fp model.Fingerprint, from model.Time, through model.Time, -) (SeriesIterator, error) { - cds, iter, err := p.storage.preloadChunksForRange(fp, from, through, false) - if err != nil { - return iter, err - } +) SeriesIterator { + cds, iter := p.storage.preloadChunksForRange(fp, from, through, false) p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) - return iter, nil + return iter } // PreloadInstant implements Preloader func (p *memorySeriesPreloader) PreloadInstant( fp model.Fingerprint, timestamp model.Time, stalenessDelta time.Duration, -) (SeriesIterator, error) { - cds, iter, err := p.storage.preloadChunksForRange(fp, timestamp.Add(-stalenessDelta), timestamp, true) - if err != nil { - return nil, err - } +) SeriesIterator { + cds, iter := p.storage.preloadChunksForRange(fp, timestamp.Add(-stalenessDelta), timestamp, true) p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) - return iter, nil + return iter } // Close implements Preloader.
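With `PreloadRange` and `PreloadInstant` stripped of their error return, the query path collapses to plain assignments (compare the analyzer hunk at the top of this diff); chunk corruption now surfaces later, during iteration, via quarantining. A trimmed stand-in showing the shape of the consumer code (toy types, not the real interfaces):

```go
package main

import "fmt"

// iterator and preloader are toy stand-ins for SeriesIterator and
// Preloader after this change: preloading cannot fail anymore.
type iterator interface{ valueAt(t int64) float64 }

type constIter float64

func (c constIter) valueAt(int64) float64 { return float64(c) }

type preloader struct{ pinned int }

func (p *preloader) preloadRange(fp uint64, from, through int64) iterator {
	p.pinned++ // pin chunk descs; corruption is detected lazily on read
	return constIter(42)
}

func main() {
	p := &preloader{}
	iters := map[uint64]iterator{}
	for _, fp := range []uint64{1, 7} {
		// No error to check, unlike the old two-value PreloadRange.
		iters[fp] = p.preloadRange(fp, 0, 1000)
	}
	fmt.Println(len(iters), "iterators,", p.pinned, "pinned")
}
```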
diff --git a/storage/local/series.go b/storage/local/series.go index 7a027a8a5d..f76d5ee27b 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -191,12 +191,15 @@ type memorySeries struct { // set to model.Earliest. The zero value for modTime can be used if the // modification time of the series file is unknown (e.g. if this is a genuinely // new series). -func newMemorySeries(m model.Metric, chunkDescs []*chunkDesc, modTime time.Time) *memorySeries { +func newMemorySeries(m model.Metric, chunkDescs []*chunkDesc, modTime time.Time) (*memorySeries, error) { + var err error firstTime := model.Earliest lastTime := model.Earliest if len(chunkDescs) > 0 { firstTime = chunkDescs[0].firstTime() - lastTime = chunkDescs[len(chunkDescs)-1].lastTime() + if lastTime, err = chunkDescs[len(chunkDescs)-1].lastTime(); err != nil { + return nil, err + } } return &memorySeries{ metric: m, @@ -206,14 +209,14 @@ func newMemorySeries(m model.Metric, chunkDescs []*chunkDesc, modTime time.Time) lastTime: lastTime, persistWatermark: len(chunkDescs), modTime: modTime, - } + }, nil } // add adds a sample pair to the series. It returns the number of newly // completed chunks (which are now eligible for persistence). // // The caller must have locked the fingerprint of the series. -func (s *memorySeries) add(v *model.SamplePair) int { +func (s *memorySeries) add(v model.SamplePair) (int, error) { if len(s.chunkDescs) == 0 || s.headChunkClosed { newHead := newChunkDesc(newChunk(), v.Timestamp) s.chunkDescs = append(s.chunkDescs, newHead) @@ -235,7 +238,10 @@ func (s *memorySeries) add(v *model.SamplePair) int { s.headChunkUsedByIterator = false } - chunks := s.head().add(v) + chunks, err := s.head().add(v) + if err != nil { + return 0, err + } s.head().c = chunks[0] for _, c := range chunks[1:] { @@ -250,7 +256,7 @@ func (s *memorySeries) add(v *model.SamplePair) int { s.lastTime = v.Timestamp s.lastSampleValue = v.Value s.lastSampleValueSet = true - return len(chunks) - 1 + return len(chunks) - 1, nil } // maybeCloseHeadChunk closes the head chunk if it has not been touched for the @@ -295,10 +301,14 @@ func (s *memorySeries) evictChunkDescs(iOldestNotEvicted int) { // dropChunks removes chunkDescs older than t. The caller must have locked the // fingerprint of the series. -func (s *memorySeries) dropChunks(t model.Time) { +func (s *memorySeries) dropChunks(t model.Time) error { keepIdx := len(s.chunkDescs) for i, cd := range s.chunkDescs { - if !cd.lastTime().Before(t) { + lt, err := cd.lastTime() + if err != nil { + return err + } + if !lt.Before(t) { keepIdx = i break } @@ -318,6 +328,7 @@ func (s *memorySeries) dropChunks(t model.Time) { numMemChunkDescs.Sub(float64(keepIdx)) s.dirty = true } + return nil } // preloadChunks is an internal helper method. @@ -358,8 +369,12 @@ func (s *memorySeries) preloadChunks( s.headChunkUsedByIterator = true } + curriedQuarantineSeries := func(err error) { + mss.quarantineSeries(fp, s.metric, err) + } + iter := &boundedIterator{ - it: s.newIterator(pinnedChunkDescs), + it: s.newIterator(pinnedChunkDescs, curriedQuarantineSeries), start: model.Now().Add(-mss.dropAfter), } @@ -370,7 +385,7 @@ func (s *memorySeries) preloadChunks( // must be pinned). // // The caller must have locked the fingerprint of the memorySeries. 
-func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc) SeriesIterator { +func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc, quarantine func(error)) SeriesIterator { chunks := make([]chunk, 0, len(pinnedChunkDescs)) for _, cd := range pinnedChunkDescs { // It's OK to directly access cd.c here (without locking) as the @@ -378,8 +393,9 @@ func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc) SeriesIterator chunks = append(chunks, cd.c) } return &memorySeriesIterator{ - chunks: chunks, - chunkIts: make([]chunkIterator, len(chunks)), + chunks: chunks, + chunkIts: make([]chunkIterator, len(chunks)), + quarantine: quarantine, } } @@ -437,7 +453,11 @@ func (s *memorySeries) preloadChunksForRange( if fromIdx == len(s.chunkDescs) { // Even the last chunk starts before "from". Find out if the // series ends before "from" and we don't need to do anything. - if s.chunkDescs[len(s.chunkDescs)-1].lastTime().Before(from) { + lt, err := s.chunkDescs[len(s.chunkDescs)-1].lastTime() + if err != nil { + return nil, nopIter, err + } + if lt.Before(from) { return nil, nopIter, nil } } @@ -511,16 +531,29 @@ func (s *memorySeries) chunksToPersist() []*chunkDesc { // memorySeriesIterator implements SeriesIterator. type memorySeriesIterator struct { - chunkIt chunkIterator // Last chunkIterator used by ValueAtOrBeforeTime. - chunkIts []chunkIterator // Caches chunkIterators. - chunks []chunk + chunkIt chunkIterator // Last chunkIterator used by ValueAtOrBeforeTime. + chunkIts []chunkIterator // Caches chunkIterators. + chunks []chunk + quarantine func(error) // Call to quarantine the series this iterator belongs to. } // ValueAtOrBeforeTime implements SeriesIterator. func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair { // The most common case. We are iterating through a chunk. - if it.chunkIt != nil && it.chunkIt.contains(t) { - return it.chunkIt.valueAtOrBeforeTime(t) + if it.chunkIt != nil { + containsT, err := it.chunkIt.contains(t) + if err != nil { + it.quarantine(err) + return ZeroSamplePair + } + if containsT { + value, err := it.chunkIt.valueAtOrBeforeTime(t) + if err != nil { + it.quarantine(err) + return ZeroSamplePair + } + return value + } } if len(it.chunks) == 0 { @@ -537,7 +570,12 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa return ZeroSamplePair } it.chunkIt = it.chunkIterator(l - i) - return it.chunkIt.valueAtOrBeforeTime(t) + value, err := it.chunkIt.valueAtOrBeforeTime(t) + if err != nil { + it.quarantine(err) + return ZeroSamplePair + } + return value } // RangeValues implements SeriesIterator. @@ -548,8 +586,15 @@ func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePa }) // Only now check the last timestamp of the previous chunk (which is // fairly expensive). - if i > 0 && !it.chunkIterator(i-1).lastTimestamp().Before(in.OldestInclusive) { - i-- + if i > 0 { + lt, err := it.chunkIterator(i - 1).lastTimestamp() + if err != nil { + it.quarantine(err) + return nil + } + if !lt.Before(in.OldestInclusive) { + i-- + } } values := []model.SamplePair{} @@ -557,7 +602,12 @@ func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePa if c.firstTime().After(in.NewestInclusive) { break } - values = append(values, it.chunkIterator(i+j).rangeValues(in)...) + chValues, err := it.chunkIterator(i + j).rangeValues(in) + if err != nil { + it.quarantine(err) + return nil + } + values = append(values, chValues...) 
} return values } diff --git a/storage/local/storage.go b/storage/local/storage.go index f87f083ba3..2d99c073f3 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -30,8 +30,9 @@ import ( ) const ( - evictRequestsCap = 1024 - chunkLen = 1024 + evictRequestsCap = 1024 + quarantineRequestsCap = 1024 + chunkLen = 1024 // See waitForNextFP. fpMaxSweepTime = 6 * time.Hour @@ -77,6 +78,12 @@ type evictRequest struct { evict bool } +type quarantineRequest struct { + fp model.Fingerprint + metric model.Metric + reason error +} + // SyncStrategy is an enum to select a sync strategy for series files. type SyncStrategy int @@ -147,6 +154,9 @@ type memorySeriesStorage struct { evictRequests chan evictRequest evictStopping, evictStopped chan struct{} + quarantineRequests chan quarantineRequest + quarantineStopping, quarantineStopped chan struct{} + persistErrors prometheus.Counter numSeries prometheus.Gauge seriesOps *prometheus.CounterVec @@ -198,6 +208,10 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage { evictStopping: make(chan struct{}), evictStopped: make(chan struct{}), + quarantineRequests: make(chan quarantineRequest, quarantineRequestsCap), + quarantineStopping: make(chan struct{}), + quarantineStopped: make(chan struct{}), + persistErrors: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, @@ -312,6 +326,7 @@ func (s *memorySeriesStorage) Start() (err error) { } go s.handleEvictList() + go s.handleQuarantine() go s.logThrottling() go s.loop() @@ -326,6 +341,10 @@ func (s *memorySeriesStorage) Stop() error { close(s.loopStopping) <-s.loopStopped + log.Info("Stopping series quarantining...") + close(s.quarantineStopping) + <-s.quarantineStopped + log.Info("Stopping chunk eviction...") close(s.evictStopping) <-s.evictStopped @@ -521,22 +540,7 @@ func (s *memorySeriesStorage) MetricForFingerprint(fp model.Fingerprint) metric. // DropMetric implements Storage. func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprint) { for _, fp := range fps { - s.fpLocker.Lock(fp) - - if series, ok := s.fpToSeries.get(fp); ok { - s.fpToSeries.del(fp) - s.numSeries.Dec() - s.persistence.unindexMetric(fp, series.metric) - } else if err := s.persistence.purgeArchivedMetric(fp); err != nil { - log.Errorf("Error purging metric with fingerprint %v: %v", fp, err) - } - // Attempt to delete series file in any case. - if _, err := s.persistence.deleteSeriesFile(fp); err != nil { - log.Errorf("Error deleting series file for %v: %v", fp, err) - } - - s.fpLocker.Unlock(fp) - s.seriesOps.WithLabelValues(requestedPurge).Inc() + s.purgeSeries(fp, nil, nil) } } @@ -554,19 +558,24 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error { rawFP := sample.Metric.FastFingerprint() s.fpLocker.Lock(rawFP) fp, err := s.mapper.mapFP(rawFP, sample.Metric) + defer func() { + s.fpLocker.Unlock(fp) + }() // Func wrapper because fp might change below. if err != nil { - log.Errorf("Error while mapping fingerprint %v: %v", rawFP, err) - s.persistence.setDirty(true) + s.persistence.setDirty(true, fmt.Errorf("error while mapping fingerprint %v: %s", rawFP, err)) + return err } if fp != rawFP { // Switch locks. s.fpLocker.Unlock(rawFP) s.fpLocker.Lock(fp) } - series := s.getOrCreateSeries(fp, sample.Metric) + series, err := s.getOrCreateSeries(fp, sample.Metric) + if err != nil { + return err // getOrCreateSeries took care of quarantining already. 
+ } if sample.Timestamp <= series.lastTime { - s.fpLocker.Unlock(fp) // Don't report "no-op appends", i.e. where timestamp and sample // value are the same as for the last append, as they are a // common occurrence when using client-side timestamps @@ -577,13 +586,16 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error { return nil } s.outOfOrderSamplesCount.Inc() - return ErrOutOfOrderSample + return ErrOutOfOrderSample // Caused by the caller. } - completedChunksCount := series.add(&model.SamplePair{ + completedChunksCount, err := series.add(model.SamplePair{ Value: sample.Value, Timestamp: sample.Timestamp, }) - s.fpLocker.Unlock(fp) + if err != nil { + s.quarantineSeries(fp, sample.Metric, err) + return err + } s.ingestedSamplesCount.Inc() s.incNumChunksToPersist(completedChunksCount) @@ -644,7 +656,7 @@ func (s *memorySeriesStorage) logThrottling() { } } -func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) *memorySeries { +func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) (*memorySeries, error) { series, ok := s.fpToSeries.get(fp) if !ok { var cds []*chunkDesc @@ -652,6 +664,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me unarchived, err := s.persistence.unarchiveMetric(fp) if err != nil { log.Errorf("Error unarchiving fingerprint %v (metric %v): %v", fp, m, err) + return nil, err } if unarchived { s.seriesOps.WithLabelValues(unarchive).Inc() @@ -662,7 +675,8 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me // appear as archived or purged). cds, err = s.loadChunkDescs(fp, 0) if err != nil { - log.Errorf("Error loading chunk descs for fingerprint %v (metric %v): %v", fp, m, err) + s.quarantineSeries(fp, m, err) + return nil, err } modTime = s.persistence.seriesFileModTime(fp) } else { @@ -670,18 +684,22 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me s.persistence.indexMetric(fp, m) s.seriesOps.WithLabelValues(create).Inc() } - series = newMemorySeries(m, cds, modTime) + series, err = newMemorySeries(m, cds, modTime) + if err != nil { + s.quarantineSeries(fp, m, err) + return nil, err + } s.fpToSeries.put(fp, series) s.numSeries.Inc() } - return series + return series, nil } func (s *memorySeriesStorage) preloadChunksForRange( fp model.Fingerprint, from model.Time, through model.Time, lastSampleOnly bool, -) ([]*chunkDesc, SeriesIterator, error) { +) ([]*chunkDesc, SeriesIterator) { s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) @@ -689,23 +707,34 @@ func (s *memorySeriesStorage) preloadChunksForRange( if !ok { has, first, last, err := s.persistence.hasArchivedMetric(fp) if err != nil { - return nil, nopIter, err + log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.") + return nil, nopIter } if !has { s.invalidPreloadRequestsCount.Inc() - return nil, nopIter, nil + return nil, nopIter } if from.Before(last) && through.After(first) { metric, err := s.persistence.archivedMetric(fp) if err != nil { - return nil, nopIter, err + log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.") + return nil, nopIter + } + series, err = s.getOrCreateSeries(fp, metric) + if err != nil { + log.With("fingerprint", fp).With("error", err).Error("Error while retrieving series.") + return nil, nopIter } - series = s.getOrCreateSeries(fp, metric) } else { - return nil, nopIter, nil + return nil, nopIter } } - return 
series.preloadChunksForRange(fp, from, through, lastSampleOnly, s) + cds, it, err := series.preloadChunksForRange(fp, from, through, lastSampleOnly, s) + if err != nil { + s.quarantineSeries(fp, series.metric, err) + return nil, nopIter + } + return cds, it } func (s *memorySeriesStorage) handleEvictList() { @@ -1121,7 +1150,10 @@ func (s *memorySeriesStorage) writeMemorySeries( s.persistErrors.Inc() return false } - series.dropChunks(beforeTime) + if err := series.dropChunks(beforeTime); err != nil { + s.persistErrors.Inc() + return false + } if len(series.chunkDescs) == 0 && allDroppedFromPersistence { // All chunks dropped from both memory and persistence. Delete the series for good. s.fpToSeries.del(fp) @@ -1136,8 +1168,7 @@ func (s *memorySeriesStorage) writeMemorySeries( } else { series.chunkDescsOffset -= numDroppedFromPersistence if series.chunkDescsOffset < 0 { - log.Errorf("Dropped more chunks from persistence than from memory for fingerprint %v, series %v.", fp, series) - s.persistence.setDirty(true) + s.persistence.setDirty(true, fmt.Errorf("dropped more chunks from persistence than from memory for fingerprint %v, series %v", fp, series)) series.chunkDescsOffset = -1 // Makes sure it will be looked at during crash recovery. } } @@ -1291,6 +1322,122 @@ func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 { return score } +// quarantineSeries registers the provided fingerprint for quarantining. It +// always returns immediately. Quarantine requests are processed +// asynchronously. If there are too many requests queued, they are simply +// dropped. +// +// Quarantining means that the series file is moved to the orphaned directory, +// and all its traces are removed from indices. Call this method if an +// unrecoverable error is detected while dealing with a series, and pass in the +// encountered error. It will be saved as a hint in the orphaned directory. +func (s *memorySeriesStorage) quarantineSeries(fp model.Fingerprint, metric model.Metric, err error) { + req := quarantineRequest{fp: fp, metric: metric, reason: err} + select { + case s.quarantineRequests <- req: + // Request submitted. + default: + log. + With("fingerprint", fp). + With("metric", metric). + With("reason", err). + Warn("Quarantine queue full. Dropped quarantine request.") + s.seriesOps.WithLabelValues(droppedQuarantine).Inc() + } +} + +func (s *memorySeriesStorage) handleQuarantine() { + for { + select { + case req := <-s.quarantineRequests: + s.purgeSeries(req.fp, req.metric, req.reason) + log. + With("fingerprint", req.fp). + With("metric", req.metric). + With("reason", req.reason). + Warn("Series quarantined.") + case <-s.quarantineStopping: + log.Info("Series quarantining stopped.") + close(s.quarantineStopped) + return + } + } + +} + +// purgeSeries removes all traces of a series. If a non-nil quarantine reason is +// provided, the series file will not be deleted completely, but moved to the +// orphaned directory with the reason and the metric in a hint file. The +// provided metric might be nil if unknown. +func (s *memorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric, quarantineReason error) { + s.fpLocker.Lock(fp) + + var ( + series *memorySeries + ok bool + ) + + if series, ok = s.fpToSeries.get(fp); ok { + s.fpToSeries.del(fp) + s.numSeries.Dec() + m = series.metric + + // Adjust s.numChunksToPersist and numMemChunks down by + // the number of chunks in this series that are not + // persisted yet. 
Persisted chunks will be deducted from
+ // numMemChunks upon eviction.
+ numChunksNotYetPersisted := len(series.chunkDescs) - series.persistWatermark
+ atomic.AddInt64(&numMemChunks, int64(-numChunksNotYetPersisted))
+ if !series.headChunkClosed {
+ // Head chunk wasn't counted as waiting for persistence yet.
+ // (But it was counted as a chunk in memory.)
+ numChunksNotYetPersisted--
+ }
+ s.incNumChunksToPersist(-numChunksNotYetPersisted)
+
+ } else {
+ if err := s.persistence.purgeArchivedMetric(fp); err != nil {
+ log.
+ With("fingerprint", fp).
+ With("metric", m).
+ With("error", err).
+ Error("Error purging metric from archive.")
+ }
+ }
+ if m != nil {
+ // If we know a metric now, unindex it in any case.
+ // purgeArchivedMetric might have done so already, but we cannot
+ // be sure. Unindexing is idempotent, though.
+ s.persistence.unindexMetric(fp, m)
+ }
+ // Attempt to delete/quarantine the series file in any case.
+ if quarantineReason == nil {
+ // No reason stated, simply delete the file.
+ if _, err := s.persistence.deleteSeriesFile(fp); err != nil {
+ log.
+ With("fingerprint", fp).
+ With("metric", m).
+ With("error", err).
+ Error("Error deleting series file.")
+ }
+ s.seriesOps.WithLabelValues(requestedPurge).Inc()
+ } else {
+ if err := s.persistence.quarantineSeriesFile(fp, quarantineReason, m); err == nil {
+ s.seriesOps.WithLabelValues(completedQurantine).Inc()
+ } else {
+ s.seriesOps.WithLabelValues(failedQuarantine).Inc()
+ log.
+ With("fingerprint", fp).
+ With("metric", m).
+ With("reason", quarantineReason).
+ With("error", err).
+ Error("Error quarantining series file.")
+ }
+ }
+
+ s.fpLocker.Unlock(fp)
+}
+
// Describe implements prometheus.Collector.
func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
s.persistence.Describe(ch)
diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go
index 381f7c7a36..97fa450ba7 100644
--- a/storage/local/storage_test.go
+++ b/storage/local/storage_test.go
@@ -405,10 +405,7 @@ func TestRetentionCutoff(t *testing.T) {
defer pl.Close()
// Preload everything.
- it, err := pl.PreloadRange(fp, insertStart, now) - if err != nil { - t.Fatalf("Error preloading outdated chunks: %s", err) - } + it := pl.PreloadRange(fp, insertStart, now) val := it.ValueAtOrBeforeTime(now.Add(-61 * time.Minute)) if val.Timestamp != model.Earliest { @@ -492,18 +489,12 @@ func TestDropMetrics(t *testing.T) { t.Errorf("unexpected number of fingerprints: %d", len(fps2)) } - _, it, err := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest, false) - if err != nil { - t.Fatalf("Error preloading everything: %s", err) - } + _, it := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest, false) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } - _, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest, false) - if err != nil { - t.Fatalf("Error preloading everything: %s", err) - } + _, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest, false) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N { t.Errorf("unexpected number of samples: %d", len(vals)) } @@ -525,18 +516,12 @@ func TestDropMetrics(t *testing.T) { t.Errorf("unexpected number of fingerprints: %d", len(fps3)) } - _, it, err = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest, false) - if err != nil { - t.Fatalf("Error preloading everything: %s", err) - } + _, it = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest, false) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } - _, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest, false) - if err != nil { - t.Fatalf("Error preloading everything: %s", err) - } + _, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest, false) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } @@ -549,6 +534,95 @@ func TestDropMetrics(t *testing.T) { } } +func TestQuarantineMetric(t *testing.T) { + now := model.Now() + insertStart := now.Add(-2 * time.Hour) + + s, closer := NewTestStorage(t, 1) + defer closer.Close() + + chunkFileExists := func(fp model.Fingerprint) (bool, error) { + f, err := s.persistence.openChunkFileForReading(fp) + if err == nil { + f.Close() + return true, nil + } + if os.IsNotExist(err) { + return false, nil + } + return false, err + } + + m1 := model.Metric{model.MetricNameLabel: "test", "n1": "v1"} + m2 := model.Metric{model.MetricNameLabel: "test", "n1": "v2"} + m3 := model.Metric{model.MetricNameLabel: "test", "n1": "v3"} + + N := 120000 + + for j, m := range []model.Metric{m1, m2, m3} { + for i := 0; i < N; i++ { + smpl := &model.Sample{ + Metric: m, + Timestamp: insertStart.Add(time.Duration(i) * time.Millisecond), // 1 millisecond intervals. + Value: model.SampleValue(j), + } + s.Append(smpl) + } + } + s.WaitForIndexing() + + // Archive m3, but first maintain it so that at least something is written to disk. 
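+ // (The corrupted series file created below is then read via PreloadInstant,
+ // which is expected to detect the damage and quarantine the series.)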
+ fpToBeArchived := m3.FastFingerprint()
+ s.maintainMemorySeries(fpToBeArchived, 0)
+ s.fpLocker.Lock(fpToBeArchived)
+ s.fpToSeries.del(fpToBeArchived)
+ if err := s.persistence.archiveMetric(
+ fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond),
+ ); err != nil {
+ t.Error(err)
+ }
+ s.fpLocker.Unlock(fpToBeArchived)
+
+ // Corrupt the series file for m3.
+ f, err := os.Create(s.persistence.fileNameForFingerprint(fpToBeArchived))
+ if err != nil {
+ t.Fatal(err)
+ }
+ if _, err := f.WriteString("This is clearly not the content of a series file."); err != nil {
+ t.Fatal(err)
+ }
+ if err := f.Close(); err != nil {
+ t.Fatal(err)
+ }
+
+ fps := s.fingerprintsForLabelPairs(model.LabelPair{Name: model.MetricNameLabel, Value: "test"})
+ if len(fps) != 3 {
+ t.Errorf("unexpected number of fingerprints: %d", len(fps))
+ }
+
+ pl := s.NewPreloader()
+ // This will access the corrupt file and lead to quarantining.
+ pl.PreloadInstant(fpToBeArchived, now.Add(-2*time.Hour), time.Minute)
+ pl.Close()
+ time.Sleep(time.Second) // Give time to quarantine. TODO(beorn7): Find a better way to wait.
+ s.WaitForIndexing()
+
+ fps2 := s.fingerprintsForLabelPairs(model.LabelPair{
+ Name: model.MetricNameLabel, Value: "test",
+ })
+ if len(fps2) != 2 {
+ t.Errorf("unexpected number of fingerprints: %d", len(fps2))
+ }
+
+ exists, err := chunkFileExists(fpToBeArchived)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if exists {
+ t.Errorf("chunk file exists for fp=%v", fpToBeArchived)
+ }
+}
+
// TestLoop is just a smoke test for the loop method, if we can switch it on and
// off without disaster.
func TestLoop(t *testing.T) {
@@ -619,7 +693,10 @@ func testChunk(t *testing.T, encoding chunkEncoding) {
continue
}
for sample := range cd.c.newIterator().values() {
- values = append(values, *sample)
+ if sample.error != nil {
+ t.Error(sample.error)
+ }
+ values = append(values, sample.SamplePair)
}
}
@@ -662,10 +739,7 @@ func testValueAtOrBeforeTime(t *testing.T, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint()
- _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
- if err != nil {
- t.Fatalf("Error preloading everything: %s", err)
- }
+ _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
// #1 Exactly on a sample.
for i, expected := range samples {
@@ -739,10 +813,7 @@ func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint()
- _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
- if err != nil {
- b.Fatalf("Error preloading everything: %s", err)
- }
+ _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
b.ResetTimer()
@@ -820,10 +891,7 @@ func testRangeValues(t *testing.T, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint()
- _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
- if err != nil {
- t.Fatalf("Error preloading everything: %s", err)
- }
+ _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
// #1 Zero length interval at sample.
for i, expected := range samples { @@ -975,10 +1043,7 @@ func benchmarkRangeValues(b *testing.B, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) - if err != nil { - b.Fatalf("Error preloading everything: %s", err) - } + _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) b.ResetTimer() @@ -1024,10 +1089,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Drop ~half of the chunks. s.maintainMemorySeries(fp, 10000) - _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) - if err != nil { - t.Fatalf("Error preloading everything: %s", err) - } + _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) actual := it.RangeValues(metric.Interval{ OldestInclusive: 0, NewestInclusive: 100000, @@ -1045,10 +1107,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Drop everything. s.maintainMemorySeries(fp, 100000) - _, it, err = s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) - if err != nil { - t.Fatalf("Error preloading everything: %s", err) - } + _, it = s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) actual = it.RangeValues(metric.Interval{ OldestInclusive: 0, NewestInclusive: 100000, @@ -1074,8 +1133,12 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Archive metrics. s.fpToSeries.del(fp) + lastTime, err := series.head().lastTime() + if err != nil { + t.Fatal(err) + } if err := s.persistence.archiveMetric( - fp, series.metric, series.firstTime(), series.head().lastTime(), + fp, series.metric, series.firstTime(), lastTime, ); err != nil { t.Fatal(err) } @@ -1125,8 +1188,12 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Archive metrics. s.fpToSeries.del(fp) + lastTime, err = series.head().lastTime() + if err != nil { + t.Fatal(err) + } if err := s.persistence.archiveMetric( - fp, series.metric, series.firstTime(), series.head().lastTime(), + fp, series.metric, series.firstTime(), lastTime, ); err != nil { t.Fatal(err) } @@ -1520,10 +1587,7 @@ func verifyStorage(t testing.TB, s *memorySeriesStorage, samples model.Samples, t.Fatal(err) } p := s.NewPreloader() - it, err := p.PreloadRange(fp, sample.Timestamp, sample.Timestamp) - if err != nil { - t.Fatal(err) - } + it := p.PreloadRange(fp, sample.Timestamp, sample.Timestamp) found := it.ValueAtOrBeforeTime(sample.Timestamp) if found.Timestamp == model.Earliest { t.Errorf("Sample %#v: Expected sample not found.", sample) @@ -1567,10 +1631,7 @@ func TestAppendOutOfOrder(t *testing.T) { pl := s.NewPreloader() defer pl.Close() - it, err := pl.PreloadRange(fp, 0, 2) - if err != nil { - t.Fatalf("Error preloading chunks: %s", err) - } + it := pl.PreloadRange(fp, 0, 2) want := []model.SamplePair{ {