prometheus/storage/local/delta.go
beorn7 0ea5801e47 Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...

The ideas behind this are the following:

- panic only if it's a programming error. Data corruptions happen, and
  they are not programming errors.

- If we detect a data corruption, we "quarantine" the series,
  essentially removing it from the database and putting its data into
  a separate directory for forensics.

- Failure during writing to a series file is not considered corruption
  automatically. It will call setDirty, though, so that a
  crashrecovery upon the next restart will commence and check for
  that.

- Series quarantining and setDirty calls are logged and counted in
  metrics, but are hidden from the user of the interfaces in
  interface.go, whith the notable exception of Append(). The reasoning
  is that we treat corruption by removing the corrupted series, i.e. a
  query for it will return no results on its next call anyway, so
  return no results right now. In the case of Append(), we want to
  tell the user that no data has been appended, though.

Minor side effects:

- Now consistently using filepath.* instead of path.*.

- Introduced structured logging where I touched it. This makes things
  less consistent, but a complete change to structured logging would
  be out of scope for this PR.
2016-03-02 23:02:34 +01:00

487 lines
13 KiB
Go

// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package local
import (
"encoding/binary"
"fmt"
"io"
"math"
"sort"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/metric"
)
// The 21-byte header of a delta-encoded chunk looks like:
//
// - time delta bytes: 1 bytes
// - value delta bytes: 1 bytes
// - is integer: 1 byte
// - base time: 8 bytes
// - base value: 8 bytes
// - used buf bytes: 2 bytes
const (
deltaHeaderBytes = 21
deltaHeaderTimeBytesOffset = 0
deltaHeaderValueBytesOffset = 1
deltaHeaderIsIntOffset = 2
deltaHeaderBaseTimeOffset = 3
deltaHeaderBaseValueOffset = 11
deltaHeaderBufLenOffset = 19
)
// A deltaEncodedChunk adaptively stores sample timestamps and values with a
// delta encoding of various types (int, float) and bit widths. However, once 8
// bytes would be needed to encode a delta value, a fall-back to the absolute
// numbers happens (so that timestamps are saved directly as int64 and values as
// float64). It implements the chunk interface.
type deltaEncodedChunk []byte
// newDeltaEncodedChunk returns a newly allocated deltaEncodedChunk.
func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *deltaEncodedChunk {
if tb < 1 {
panic("need at least 1 time delta byte")
}
if length < deltaHeaderBytes+16 {
panic(fmt.Errorf(
"chunk length %d bytes is insufficient, need at least %d",
length, deltaHeaderBytes+16,
))
}
c := make(deltaEncodedChunk, deltaHeaderIsIntOffset+1, length)
c[deltaHeaderTimeBytesOffset] = byte(tb)
c[deltaHeaderValueBytesOffset] = byte(vb)
if vb < d8 && isInt { // Only use int for fewer than 8 value delta bytes.
c[deltaHeaderIsIntOffset] = 1
} else {
c[deltaHeaderIsIntOffset] = 0
}
return &c
}
// add implements chunk.
func (c deltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) {
if c.len() == 0 {
c = c[:deltaHeaderBytes]
binary.LittleEndian.PutUint64(c[deltaHeaderBaseTimeOffset:], uint64(s.Timestamp))
binary.LittleEndian.PutUint64(c[deltaHeaderBaseValueOffset:], math.Float64bits(float64(s.Value)))
}
remainingBytes := cap(c) - len(c)
sampleSize := c.sampleSize()
// Do we generally have space for another sample in this chunk? If not,
// overflow into a new one.
if remainingBytes < sampleSize {
overflowChunks, err := newChunk().add(s)
if err != nil {
return nil, err
}
return []chunk{&c, overflowChunks[0]}, nil
}
baseValue := c.baseValue()
dt := s.Timestamp - c.baseTime()
if dt < 0 {
return nil, fmt.Errorf("time delta is less than zero: %v", dt)
}
dv := s.Value - baseValue
tb := c.timeBytes()
vb := c.valueBytes()
isInt := c.isInt()
// If the new sample is incompatible with the current encoding, reencode the
// existing chunk data into new chunk(s).
ntb, nvb, nInt := tb, vb, isInt
if isInt && !isInt64(dv) {
// int->float.
nvb = d4
nInt = false
} else if !isInt && vb == d4 && baseValue+model.SampleValue(float32(dv)) != s.Value {
// float32->float64.
nvb = d8
} else {
if tb < d8 {
// Maybe more bytes for timestamp.
ntb = max(tb, bytesNeededForUnsignedTimestampDelta(dt))
}
if c.isInt() && vb < d8 {
// Maybe more bytes for sample value.
nvb = max(vb, bytesNeededForIntegerSampleValueDelta(dv))
}
}
if tb != ntb || vb != nvb || isInt != nInt {
if len(c)*2 < cap(c) {
return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s)
}
// Chunk is already half full. Better create a new one and save the transcoding efforts.
overflowChunks, err := newChunk().add(s)
if err != nil {
return nil, err
}
return []chunk{&c, overflowChunks[0]}, nil
}
offset := len(c)
c = c[:offset+sampleSize]
switch tb {
case d1:
c[offset] = byte(dt)
case d2:
binary.LittleEndian.PutUint16(c[offset:], uint16(dt))
case d4:
binary.LittleEndian.PutUint32(c[offset:], uint32(dt))
case d8:
// Store the absolute value (no delta) in case of d8.
binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp))
default:
return nil, fmt.Errorf("invalid number of bytes for time delta: %d", tb)
}
offset += int(tb)
if c.isInt() {
switch vb {
case d0:
// No-op. Constant value is stored as base value.
case d1:
c[offset] = byte(int8(dv))
case d2:
binary.LittleEndian.PutUint16(c[offset:], uint16(int16(dv)))
case d4:
binary.LittleEndian.PutUint32(c[offset:], uint32(int32(dv)))
// d8 must not happen. Those samples are encoded as float64.
default:
return nil, fmt.Errorf("invalid number of bytes for integer delta: %d", vb)
}
} else {
switch vb {
case d4:
binary.LittleEndian.PutUint32(c[offset:], math.Float32bits(float32(dv)))
case d8:
// Store the absolute value (no delta) in case of d8.
binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value)))
default:
return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb)
}
}
return []chunk{&c}, nil
}
// clone implements chunk.
func (c deltaEncodedChunk) clone() chunk {
clone := make(deltaEncodedChunk, len(c), cap(c))
copy(clone, c)
return &clone
}
// firstTime implements chunk.
func (c deltaEncodedChunk) firstTime() model.Time {
return c.baseTime()
}
// newIterator implements chunk.
func (c *deltaEncodedChunk) newIterator() chunkIterator {
return &deltaEncodedChunkIterator{
c: *c,
len: c.len(),
baseT: c.baseTime(),
baseV: c.baseValue(),
tBytes: c.timeBytes(),
vBytes: c.valueBytes(),
isInt: c.isInt(),
}
}
// marshal implements chunk.
func (c deltaEncodedChunk) marshal(w io.Writer) error {
if len(c) > math.MaxUint16 {
panic("chunk buffer length would overflow a 16 bit uint.")
}
binary.LittleEndian.PutUint16(c[deltaHeaderBufLenOffset:], uint16(len(c)))
n, err := w.Write(c[:cap(c)])
if err != nil {
return err
}
if n != cap(c) {
return fmt.Errorf("wanted to write %d bytes, wrote %d", cap(c), n)
}
return nil
}
// marshalToBuf implements chunk.
func (c deltaEncodedChunk) marshalToBuf(buf []byte) error {
if len(c) > math.MaxUint16 {
panic("chunk buffer length would overflow a 16 bit uint")
}
binary.LittleEndian.PutUint16(c[deltaHeaderBufLenOffset:], uint16(len(c)))
n := copy(buf, c)
if n != len(c) {
return fmt.Errorf("wanted to copy %d bytes to buffer, copied %d", len(c), n)
}
return nil
}
// unmarshal implements chunk.
func (c *deltaEncodedChunk) unmarshal(r io.Reader) error {
*c = (*c)[:cap(*c)]
if _, err := io.ReadFull(r, *c); err != nil {
return err
}
l := binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])
if int(l) > cap(*c) {
return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l)
}
*c = (*c)[:l]
return nil
}
// unmarshalFromBuf implements chunk.
func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) error {
*c = (*c)[:cap(*c)]
copy(*c, buf)
l := binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])
if int(l) > cap(*c) {
return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l)
}
*c = (*c)[:l]
return nil
}
// encoding implements chunk.
func (c deltaEncodedChunk) encoding() chunkEncoding { return delta }
func (c deltaEncodedChunk) timeBytes() deltaBytes {
return deltaBytes(c[deltaHeaderTimeBytesOffset])
}
func (c deltaEncodedChunk) valueBytes() deltaBytes {
return deltaBytes(c[deltaHeaderValueBytesOffset])
}
func (c deltaEncodedChunk) isInt() bool {
return c[deltaHeaderIsIntOffset] == 1
}
func (c deltaEncodedChunk) baseTime() model.Time {
return model.Time(binary.LittleEndian.Uint64(c[deltaHeaderBaseTimeOffset:]))
}
func (c deltaEncodedChunk) baseValue() model.SampleValue {
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c[deltaHeaderBaseValueOffset:])))
}
func (c deltaEncodedChunk) sampleSize() int {
return int(c.timeBytes() + c.valueBytes())
}
func (c deltaEncodedChunk) len() int {
if len(c) < deltaHeaderBytes {
return 0
}
return (len(c) - deltaHeaderBytes) / c.sampleSize()
}
// deltaEncodedChunkIterator implements chunkIterator.
type deltaEncodedChunkIterator struct {
c deltaEncodedChunk
len int
baseT model.Time
baseV model.SampleValue
tBytes, vBytes deltaBytes
isInt bool
}
// length implements chunkIterator.
func (it *deltaEncodedChunkIterator) length() int { return it.len }
// valueAtOrBeforeTime implements chunkIterator.
func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) {
var lastErr error
i := sort.Search(it.len, func(i int) bool {
ts, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return ts.After(t)
})
if i == 0 || lastErr != nil {
return ZeroSamplePair, lastErr
}
ts, err := it.timestampAtIndex(i - 1)
if err != nil {
return ZeroSamplePair, err
}
v, err := it.sampleValueAtIndex(i - 1)
if err != nil {
return ZeroSamplePair, err
}
return model.SamplePair{Timestamp: ts, Value: v}, nil
}
// rangeValues implements chunkIterator.
func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) {
var lastErr error
oldest := sort.Search(it.len, func(i int) bool {
t, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return !t.Before(in.OldestInclusive)
})
newest := sort.Search(it.len, func(i int) bool {
t, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return t.After(in.NewestInclusive)
})
if oldest == it.len || lastErr != nil {
return nil, lastErr
}
result := make([]model.SamplePair, 0, newest-oldest)
for i := oldest; i < newest; i++ {
t, err := it.timestampAtIndex(i)
if err != nil {
return nil, err
}
v, err := it.sampleValueAtIndex(i)
if err != nil {
return nil, err
}
result = append(result, model.SamplePair{Timestamp: t, Value: v})
}
return result, nil
}
// contains implements chunkIterator.
func (it *deltaEncodedChunkIterator) contains(t model.Time) (bool, error) {
lastT, err := it.timestampAtIndex(it.len - 1)
if err != nil {
return false, err
}
return !t.Before(it.baseT) && !t.After(lastT), nil
}
// values implements chunkIterator.
func (it *deltaEncodedChunkIterator) values() <-chan struct {
model.SamplePair
error
} {
valuesChan := make(chan struct {
model.SamplePair
error
})
go func() {
for i := 0; i < it.len; i++ {
t, err := it.timestampAtIndex(i)
if err != nil {
valuesChan <- struct {
model.SamplePair
error
}{ZeroSamplePair, err}
break
}
v, err := it.sampleValueAtIndex(i)
if err != nil {
valuesChan <- struct {
model.SamplePair
error
}{ZeroSamplePair, err}
break
}
valuesChan <- struct {
model.SamplePair
error
}{model.SamplePair{Timestamp: t, Value: v}, nil}
}
close(valuesChan)
}()
return valuesChan
}
// timestampAtIndex implements chunkIterator.
func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time, error) {
offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes)
switch it.tBytes {
case d1:
return it.baseT + model.Time(uint8(it.c[offset])), nil
case d2:
return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:])), nil
case d4:
return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:])), nil
case d8:
// Take absolute value for d8.
return model.Time(binary.LittleEndian.Uint64(it.c[offset:])), nil
default:
return 0, fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes)
}
}
// lastTimestamp implements chunkIterator.
func (it *deltaEncodedChunkIterator) lastTimestamp() (model.Time, error) {
return it.timestampAtIndex(it.len - 1)
}
// sampleValueAtIndex implements chunkIterator.
func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.SampleValue, error) {
offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + int(it.tBytes)
if it.isInt {
switch it.vBytes {
case d0:
return it.baseV, nil
case d1:
return it.baseV + model.SampleValue(int8(it.c[offset])), nil
case d2:
return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil
case d4:
return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil
// No d8 for ints.
default:
return 0, fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes)
}
} else {
switch it.vBytes {
case d4:
return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))), nil
case d8:
// Take absolute value for d8.
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))), nil
default:
return 0, fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes)
}
}
}
// lastSampleValue implements chunkIterator.
func (it *deltaEncodedChunkIterator) lastSampleValue() (model.SampleValue, error) {
return it.sampleValueAtIndex(it.len - 1)
}