2017-04-10 11:59:45 -07:00
|
|
|
// Copyright 2017 The Prometheus Authors
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
2017-03-07 03:47:49 -08:00
|
|
|
package tsdb
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bufio"
|
|
|
|
"encoding/binary"
|
|
|
|
"fmt"
|
2017-04-25 07:45:44 -07:00
|
|
|
"hash"
|
2017-04-28 05:17:53 -07:00
|
|
|
"hash/crc32"
|
2017-03-07 03:47:49 -08:00
|
|
|
"io"
|
|
|
|
"os"
|
|
|
|
|
|
|
|
"github.com/coreos/etcd/pkg/fileutil"
|
|
|
|
"github.com/pkg/errors"
|
2017-04-04 02:27:26 -07:00
|
|
|
"github.com/prometheus/tsdb/chunks"
|
2017-03-07 03:47:49 -08:00
|
|
|
)
|
|
|
|
|
|
|
|
// MagicChunks is the 4-byte magic number found at the very beginning of a
// chunk segment file.
const MagicChunks = 0x85BD40DD
|
|
|
|
|
|
|
|
// ChunkMeta holds information about a chunk of data.
|
|
|
|
type ChunkMeta struct {
|
|
|
|
// Ref and Chunk hold either a reference that can be used to retrieve
|
|
|
|
// chunk data or the data itself.
|
|
|
|
// Generally, only one of them is set.
|
|
|
|
Ref uint64
|
|
|
|
Chunk chunks.Chunk
|
|
|
|
|
|
|
|
MinTime, MaxTime int64 // time range the data covers
|
2017-05-14 02:06:26 -07:00
|
|
|
|
|
|
|
// To handle deleted time-ranges.
|
|
|
|
deleted bool
|
|
|
|
dranges []trange
|
|
|
|
}
|
|
|
|
|
|
|
|
// trange is a closed time interval [mint, maxt].
type trange struct {
	mint, maxt int64
}

// inBounds reports whether t lies inside the interval, both ends inclusive.
func (tr trange) inBounds(t int64) bool {
	return t >= tr.mint && t <= tr.maxt
}

// addNewInterval merges the time range n into existing and returns the
// resulting list.
//
// existing must be sorted by start time and must not contain overlapping
// ranges. Every range that overlaps n (sharing at least one timestamp) is
// folded into a single range; ranges that merely touch without sharing a
// timestamp are kept separate. The result is a freshly allocated slice, so
// the backing array of existing is never written to.
func addNewInterval(existing []trange, n trange) []trange {
	merged := make([]trange, 0, len(existing)+1)

	// Ranges that end before n starts are carried over untouched.
	i := 0
	for ; i < len(existing) && existing[i].maxt < n.mint; i++ {
		merged = append(merged, existing[i])
	}

	// Every following range that starts no later than n ends overlaps n;
	// widen n so that it covers all of them.
	for ; i < len(existing) && existing[i].mint <= n.maxt; i++ {
		if existing[i].mint < n.mint {
			n.mint = existing[i].mint
		}
		if existing[i].maxt > n.maxt {
			n.maxt = existing[i].maxt
		}
	}
	merged = append(merged, n)

	// Whatever is left starts after n ends.
	return append(merged, existing[i:]...)
}
|
|
|
|
|
2017-05-02 03:55:40 -07:00
|
|
|
// writeHash writes the chunk encoding and raw data into the provided hash.
|
|
|
|
func (cm *ChunkMeta) writeHash(h hash.Hash) error {
|
2017-04-25 07:45:44 -07:00
|
|
|
if _, err := h.Write([]byte{byte(cm.Chunk.Encoding())}); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if _, err := h.Write(cm.Chunk.Bytes()); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2017-05-14 02:06:26 -07:00
|
|
|
// Iterator returns a chunks.Iterator that honors any deleted ranges.
|
|
|
|
// If there is no deleted range then the underlying iterator is returned.
|
|
|
|
func (cm *ChunkMeta) Iterator() chunks.Iterator {
|
|
|
|
if cm.Chunk == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
if cm.deleted {
|
|
|
|
return &deletedIterator{it: cm.Chunk.Iterator(), dranges: cm.dranges}
|
|
|
|
}
|
|
|
|
|
|
|
|
return cm.Chunk.Iterator()
|
|
|
|
}
|
|
|
|
|
|
|
|
// deletedIterator wraps an Iterator and makes sure any deleted metrics are not
|
|
|
|
// returned.
|
|
|
|
type deletedIterator struct {
|
|
|
|
it chunks.Iterator
|
|
|
|
|
|
|
|
dranges []trange
|
|
|
|
}
|
|
|
|
|
|
|
|
func (it *deletedIterator) At() (int64, float64) {
|
|
|
|
return it.it.At()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (it *deletedIterator) Next() bool {
|
|
|
|
Outer:
|
|
|
|
for it.it.Next() {
|
|
|
|
ts, _ := it.it.At()
|
|
|
|
for _, tr := range it.dranges {
|
|
|
|
if tr.inBounds(ts) {
|
|
|
|
continue Outer
|
|
|
|
}
|
|
|
|
if ts > tr.maxt {
|
|
|
|
it.dranges = it.dranges[1:]
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
2017-05-15 10:58:14 -07:00
|
|
|
func (it *deletedIterator) Err() error {
|
2017-05-14 02:06:26 -07:00
|
|
|
return it.Err()
|
|
|
|
}
|
|
|
|
|
2017-03-07 03:47:49 -08:00
|
|
|
// ChunkWriter serializes a time block of chunked series data.
|
|
|
|
type ChunkWriter interface {
|
2017-04-28 06:41:42 -07:00
|
|
|
// WriteChunks writes several chunks. The Chunk field of the ChunkMetas
|
2017-03-07 03:47:49 -08:00
|
|
|
// must be populated.
|
|
|
|
// After returning successfully, the Ref fields in the ChunkMetas
|
2017-04-28 06:41:42 -07:00
|
|
|
// are set and can be used to retrieve the chunks from the written data.
|
2017-03-14 07:40:16 -07:00
|
|
|
WriteChunks(chunks ...*ChunkMeta) error
|
2017-03-07 03:47:49 -08:00
|
|
|
|
|
|
|
// Close writes any required finalization and closes the resources
|
|
|
|
// associated with the underlying writer.
|
|
|
|
Close() error
|
|
|
|
}
|
|
|
|
|
|
|
|
// chunkWriter is the ChunkWriter implementation for the standard
// serialization format.
type chunkWriter struct {
	// dirFile is the open directory the segment files are created in.
	dirFile *os.File
	// files holds the segment files created so far; the last one is the tail.
	files []*os.File
	// wbuf buffers writes to the tail segment.
	wbuf *bufio.Writer
	// n is the number of bytes written to the tail segment so far.
	n int64
	// crc32 is the hash used to checksum each chunk.
	crc32 hash.Hash

	// segmentSize is the size each segment is pre-allocated with and the
	// limit at which a new segment is started.
	segmentSize int64
}
|
|
|
|
|
|
|
|
// defaultChunkSegmentSize is the segment size a new chunk writer starts
// out with (512 MiB).
const defaultChunkSegmentSize = 512 << 20

// chunksFormatV1 is the format version byte written into the header of a
// segment file.
const chunksFormatV1 = 1
|
|
|
|
|
|
|
|
func newChunkWriter(dir string) (*chunkWriter, error) {
|
|
|
|
if err := os.MkdirAll(dir, 0777); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
dirFile, err := fileutil.OpenDir(dir)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
cw := &chunkWriter{
|
|
|
|
dirFile: dirFile,
|
|
|
|
n: 0,
|
2017-04-28 05:17:53 -07:00
|
|
|
crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
|
2017-03-07 03:47:49 -08:00
|
|
|
segmentSize: defaultChunkSegmentSize,
|
|
|
|
}
|
|
|
|
return cw, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *chunkWriter) tail() *os.File {
|
|
|
|
if len(w.files) == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
return w.files[len(w.files)-1]
|
|
|
|
}
|
|
|
|
|
|
|
|
// finalizeTail writes all pending data to the current tail file,
|
|
|
|
// truncates its size, and closes it.
|
|
|
|
func (w *chunkWriter) finalizeTail() error {
|
|
|
|
tf := w.tail()
|
|
|
|
if tf == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := w.wbuf.Flush(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := fileutil.Fsync(tf); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// As the file was pre-allocated, we truncate any superfluous zero bytes.
|
|
|
|
off, err := tf.Seek(0, os.SEEK_CUR)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := tf.Truncate(off); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return tf.Close()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *chunkWriter) cut() error {
|
|
|
|
// Sync current tail to disk and close.
|
2017-04-28 06:59:23 -07:00
|
|
|
if err := w.finalizeTail(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-03-07 03:47:49 -08:00
|
|
|
|
|
|
|
p, _, err := nextSequenceFile(w.dirFile.Name(), "")
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err = w.dirFile.Sync(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Write header metadata for new file.
|
|
|
|
|
|
|
|
metab := make([]byte, 8)
|
|
|
|
binary.BigEndian.PutUint32(metab[:4], MagicChunks)
|
|
|
|
metab[4] = chunksFormatV1
|
|
|
|
|
|
|
|
if _, err := f.Write(metab); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
w.files = append(w.files, f)
|
|
|
|
if w.wbuf != nil {
|
|
|
|
w.wbuf.Reset(f)
|
|
|
|
} else {
|
|
|
|
w.wbuf = bufio.NewWriterSize(f, 8*1024*1024)
|
|
|
|
}
|
|
|
|
w.n = 8
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2017-04-24 08:10:12 -07:00
|
|
|
func (w *chunkWriter) write(b []byte) error {
|
|
|
|
n, err := w.wbuf.Write(b)
|
2017-03-07 03:47:49 -08:00
|
|
|
w.n += int64(n)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2017-03-14 07:40:16 -07:00
|
|
|
func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error {
|
2017-03-07 03:47:49 -08:00
|
|
|
// Calculate maximum space we need and cut a new segment in case
|
|
|
|
// we don't fit into the current one.
|
2017-04-28 06:41:42 -07:00
|
|
|
maxLen := int64(binary.MaxVarintLen32) // The number of chunks.
|
2017-03-07 03:47:49 -08:00
|
|
|
for _, c := range chks {
|
2017-04-28 06:41:42 -07:00
|
|
|
maxLen += binary.MaxVarintLen32 + 1 // The number of bytes in the chunk and its encoding.
|
2017-05-15 10:58:14 -07:00
|
|
|
|
|
|
|
// Remove the deleted parts.
|
|
|
|
if c.deleted {
|
|
|
|
// TODO(gouthamve): Try to do it in-place somehow?
|
|
|
|
chk := chunks.NewXORChunk()
|
|
|
|
app, err := chk.Appender()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
it := c.Iterator()
|
|
|
|
for it.Next() {
|
|
|
|
ts, v := it.At()
|
|
|
|
app.Append(ts, v)
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := it.Err(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
c.Chunk = chk
|
|
|
|
}
|
|
|
|
|
2017-03-07 03:47:49 -08:00
|
|
|
maxLen += int64(len(c.Chunk.Bytes()))
|
|
|
|
}
|
|
|
|
newsz := w.n + maxLen
|
|
|
|
|
|
|
|
if w.wbuf == nil || w.n > w.segmentSize || newsz > w.segmentSize && maxLen <= w.segmentSize {
|
|
|
|
if err := w.cut(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
b := make([]byte, binary.MaxVarintLen32)
|
|
|
|
n := binary.PutUvarint(b, uint64(len(chks)))
|
|
|
|
|
2017-04-24 08:10:12 -07:00
|
|
|
if err := w.write(b[:n]); err != nil {
|
2017-03-07 03:47:49 -08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
seq := uint64(w.seq()) << 32
|
|
|
|
|
2017-03-14 07:40:16 -07:00
|
|
|
for _, chk := range chks {
|
2017-03-07 03:47:49 -08:00
|
|
|
chk.Ref = seq | uint64(w.n)
|
|
|
|
|
|
|
|
n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes())))
|
|
|
|
|
2017-04-24 08:10:12 -07:00
|
|
|
if err := w.write(b[:n]); err != nil {
|
2017-03-07 03:47:49 -08:00
|
|
|
return err
|
|
|
|
}
|
2017-04-25 07:45:44 -07:00
|
|
|
if err := w.write([]byte{byte(chk.Chunk.Encoding())}); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-04-24 08:10:12 -07:00
|
|
|
if err := w.write(chk.Chunk.Bytes()); err != nil {
|
2017-03-07 03:47:49 -08:00
|
|
|
return err
|
|
|
|
}
|
2017-04-28 05:17:53 -07:00
|
|
|
|
|
|
|
w.crc32.Reset()
|
2017-05-02 03:55:40 -07:00
|
|
|
if err := chk.writeHash(w.crc32); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-04-28 05:17:53 -07:00
|
|
|
if err := w.write(w.crc32.Sum(nil)); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-03-07 03:47:49 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *chunkWriter) seq() int {
|
|
|
|
return len(w.files) - 1
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *chunkWriter) Close() error {
|
|
|
|
return w.finalizeTail()
|
|
|
|
}
|
|
|
|
|
|
|
|
// ChunkReader provides reading access of serialized time series data.
|
|
|
|
type ChunkReader interface {
|
|
|
|
// Chunk returns the series data chunk with the given reference.
|
|
|
|
Chunk(ref uint64) (chunks.Chunk, error)
|
|
|
|
|
|
|
|
// Close releases all underlying resources of the reader.
|
|
|
|
Close() error
|
|
|
|
}
|
|
|
|
|
|
|
|
// chunkReader reads chunks out of serialized byte streams of series data,
// one stream per segment.
type chunkReader struct {
	// bs holds the encoded series data, one byte slice per segment.
	bs [][]byte

	// cs are the closers of the resources backing the byte slices.
	cs []io.Closer
}
|
|
|
|
|
|
|
|
// newChunkReader returns a new chunkReader based on mmaped files found in dir.
|
|
|
|
func newChunkReader(dir string) (*chunkReader, error) {
|
|
|
|
files, err := sequenceFiles(dir, "")
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
var cr chunkReader
|
|
|
|
|
|
|
|
for _, fn := range files {
|
|
|
|
f, err := openMmapFile(fn)
|
|
|
|
if err != nil {
|
|
|
|
return nil, errors.Wrapf(err, "mmap files")
|
|
|
|
}
|
|
|
|
cr.cs = append(cr.cs, f)
|
|
|
|
cr.bs = append(cr.bs, f.b)
|
|
|
|
}
|
|
|
|
|
|
|
|
for i, b := range cr.bs {
|
|
|
|
if len(b) < 4 {
|
|
|
|
return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i)
|
|
|
|
}
|
|
|
|
// Verify magic number.
|
|
|
|
if m := binary.BigEndian.Uint32(b[:4]); m != MagicChunks {
|
|
|
|
return nil, fmt.Errorf("invalid magic number %x", m)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return &cr, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *chunkReader) Close() error {
|
|
|
|
return closeAll(s.cs...)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
|
|
|
|
var (
|
|
|
|
seq = int(ref >> 32)
|
|
|
|
off = int((ref << 32) >> 32)
|
|
|
|
)
|
|
|
|
if seq >= len(s.bs) {
|
|
|
|
return nil, errors.Errorf("reference sequence %d out of range", seq)
|
|
|
|
}
|
|
|
|
b := s.bs[seq]
|
|
|
|
|
|
|
|
if int(off) >= len(b) {
|
|
|
|
return nil, errors.Errorf("offset %d beyond data size %d", off, len(b))
|
|
|
|
}
|
2017-04-25 07:45:44 -07:00
|
|
|
b = b[off:]
|
2017-03-07 03:47:49 -08:00
|
|
|
|
|
|
|
l, n := binary.Uvarint(b)
|
|
|
|
if n < 0 {
|
|
|
|
return nil, fmt.Errorf("reading chunk length failed")
|
|
|
|
}
|
|
|
|
b = b[n:]
|
2017-04-25 07:45:44 -07:00
|
|
|
enc := chunks.Encoding(b[0])
|
2017-03-07 03:47:49 -08:00
|
|
|
|
2017-04-25 07:45:44 -07:00
|
|
|
c, err := chunks.FromData(enc, b[1:1+l])
|
2017-03-07 03:47:49 -08:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return c, nil
|
|
|
|
}
|