Update tsdb to 0.4 (#5110)

* update tsdb to v0.4.0

Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>

* remove unused struct field

Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
This commit is contained in:
Krasi Georgiev 2019-01-18 14:02:15 +03:00 committed by Goutham Veeramachaneni
parent 68e4c211f2
commit 3bd41cc92c
22 changed files with 823 additions and 540 deletions

2
go.mod
View file

@ -89,7 +89,7 @@ require (
github.com/prometheus/client_golang v0.9.1 github.com/prometheus/client_golang v0.9.1
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910
github.com/prometheus/common v0.0.0-20181119215939-b36ad289a3ea github.com/prometheus/common v0.0.0-20181119215939-b36ad289a3ea
github.com/prometheus/tsdb v0.3.2-0.20181219094047-6d489a1004dc github.com/prometheus/tsdb v0.4.0
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect
github.com/rlmcpherson/s3gof3r v0.5.0 // indirect github.com/rlmcpherson/s3gof3r v0.5.0 // indirect
github.com/rubyist/circuitbreaker v2.2.1+incompatible // indirect github.com/rubyist/circuitbreaker v2.2.1+incompatible // indirect

4
go.sum
View file

@ -218,8 +218,8 @@ github.com/prometheus/common v0.0.0-20181119215939-b36ad289a3ea h1:4RkbEb5XX0Wvu
github.com/prometheus/common v0.0.0-20181119215939-b36ad289a3ea/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/common v0.0.0-20181119215939-b36ad289a3ea/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d h1:GoAlyOgbOEIFdaDqxJVlbOQ1DtGmZWs/Qau0hIlk+WQ= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d h1:GoAlyOgbOEIFdaDqxJVlbOQ1DtGmZWs/Qau0hIlk+WQ=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/tsdb v0.3.2-0.20181219094047-6d489a1004dc h1:phU3kj067sczIc4fhaq5rRcH4Lp9A45MsrcQqjC+cao= github.com/prometheus/tsdb v0.4.0 h1:pXJyEi/5p6UBmOrnzsZmYxLrZjxnRlEB78/qj3+a8Gk=
github.com/prometheus/tsdb v0.3.2-0.20181219094047-6d489a1004dc/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/prometheus/tsdb v0.4.0/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rlmcpherson/s3gof3r v0.5.0 h1:1izOJpTiohSibfOHuNyEA/yQnAirh05enzEdmhez43k= github.com/rlmcpherson/s3gof3r v0.5.0 h1:1izOJpTiohSibfOHuNyEA/yQnAirh05enzEdmhez43k=

View file

@ -108,9 +108,6 @@ type adapter struct {
// Options of the DB storage. // Options of the DB storage.
type Options struct { type Options struct {
// The interval at which the write ahead log is flushed to disc.
WALFlushInterval time.Duration
// The timestamp range of head blocks after which they get persisted. // The timestamp range of head blocks after which they get persisted.
// It's the minimum duration of any persisted block. // It's the minimum duration of any persisted block.
MinBlockDuration model.Duration MinBlockDuration model.Duration
@ -185,7 +182,6 @@ func Open(path string, l log.Logger, r prometheus.Registerer, opts *Options) (*t
} }
db, err := tsdb.Open(path, l, r, &tsdb.Options{ db, err := tsdb.Open(path, l, r, &tsdb.Options{
WALFlushInterval: 10 * time.Second,
WALSegmentSize: int(opts.WALSegmentSize), WALSegmentSize: int(opts.WALSegmentSize),
RetentionDuration: uint64(time.Duration(opts.Retention).Seconds() * 1000), RetentionDuration: uint64(time.Duration(opts.Retention).Seconds() * 1000),
BlockRanges: rngs, BlockRanges: rngs,

View file

@ -15,10 +15,10 @@ go_import_path: github.com/prometheus/tsdb
before_install: before_install:
- if [[ "$TRAVIS_OS_NAME" == "windows" ]]; then choco install make; fi - if [[ "$TRAVIS_OS_NAME" == "windows" ]]; then choco install make; fi
install: install:
- go get -v -t ./... - make deps
script: script:
# `staticcheck` target is omitted due to linting errors # `staticcheck` target is omitted due to linting errors
- if [[ "$TRAVIS_OS_NAME" == "windows" ]]; then make test; else make check_license style unused test; fi - if [[ "$TRAVIS_OS_NAME" == "windows" ]]; then make test; else make; fi

View file

@ -1,8 +1,18 @@
## master / unreleased ## master / unreleased
## 0.4.0
- [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed. - [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed.
- [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374)
- [FEATURE] Size base retention through `Options.MaxBytes`. As part of this change:
- added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total`
- new public interface `SizeReader: Size() int64`
- `OpenBlock` signature changed to take a logger.
- [REMOVED] `PrefixMatcher` is considered unused so was removed.
- [CLEANUP] `Options.WALFlushInterval` is removed as it wasn't used anywhere.
- [FEATURE] Add new `LiveReader` to WAL package. Added to allow live tailing of a WAL segment, used by Prometheus Remote Write after refactor. The main difference between the new reader and the existing `Reader` is that for `LiveReader` a call to `Next()` that returns false does not mean that there will never be more data to read.
## 0.3.1 ## 0.3.1
- [BUGFIX] Fixed most windows test and some actual bugs for unclosed file readers. - [BUGFIX] Fixed most windows test and some actual bugs for unclosed file readers.
## 0.3.0 ## 0.3.0
- [CHANGE] `LastCheckpoint()` used to return just the segment name and now it returns the full relative path. - [CHANGE] `LastCheckpoint()` used to return just the segment name and now it returns the full relative path.
@ -11,3 +21,4 @@
- [CHANGE] `Head.Init()` is changed to `Head.Init(minValidTime int64)` - [CHANGE] `Head.Init()` is changed to `Head.Init(minValidTime int64)`
- [CHANGE] `SymbolTable()` renamed to `SymbolTableSize()` to make the name consistent with the `Block{ symbolTableSize uint64 }` field. - [CHANGE] `SymbolTable()` renamed to `SymbolTableSize()` to make the name consistent with the `Block{ symbolTableSize uint64 }` field.
- [CHANGE] `wal.Reader{}` now exposes `Segment()` for the current segment being read and `Offset()` for the current offset. - [CHANGE] `wal.Reader{}` now exposes `Segment()` for the current segment being read and `Offset()` for the current offset.
- [FEATURE] tsdbutil analyze subcommand to find churn, high cardinality, etc.

View file

@ -18,11 +18,15 @@ TSDB_BENCHMARK_NUM_METRICS ?= 1000
TSDB_BENCHMARK_DATASET ?= "$(TSDB_PROJECT_DIR)/testdata/20kseries.json" TSDB_BENCHMARK_DATASET ?= "$(TSDB_PROJECT_DIR)/testdata/20kseries.json"
TSDB_BENCHMARK_OUTPUT_DIR ?= "$(TSDB_CLI_DIR)/benchout" TSDB_BENCHMARK_OUTPUT_DIR ?= "$(TSDB_CLI_DIR)/benchout"
STATICCHECK_IGNORE =
include Makefile.common include Makefile.common
.PHONY: deps
deps:
@echo ">> getting dependencies"
GO111MODULE=$(GO111MODULE) $(GO) get $(GOOPTS) -t ./...
build: build:
@$(GO) build -o $(TSDB_BIN) $(TSDB_CLI_DIR) GO111MODULE=$(GO111MODULE) $(GO) build -o $(TSDB_BIN) $(TSDB_CLI_DIR)
bench: build bench: build
@echo ">> running benchmark, writing result to $(TSDB_BENCHMARK_OUTPUT_DIR)" @echo ">> running benchmark, writing result to $(TSDB_BENCHMARK_OUTPUT_DIR)"

View file

@ -21,6 +21,8 @@ import (
"path/filepath" "path/filepath"
"sync" "sync"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid" "github.com/oklog/ulid"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/chunkenc"
@ -140,6 +142,12 @@ type Appendable interface {
Appender() Appender Appender() Appender
} }
// SizeReader returns the size of the object in bytes.
type SizeReader interface {
// Size returns the size in bytes.
Size() int64
}
// BlockMeta provides meta information about a block. // BlockMeta provides meta information about a block.
type BlockMeta struct { type BlockMeta struct {
// Unique identifier for the block and its contents. Changes on compaction. // Unique identifier for the block and its contents. Changes on compaction.
@ -166,6 +174,7 @@ type BlockStats struct {
NumSeries uint64 `json:"numSeries,omitempty"` NumSeries uint64 `json:"numSeries,omitempty"`
NumChunks uint64 `json:"numChunks,omitempty"` NumChunks uint64 `json:"numChunks,omitempty"`
NumTombstones uint64 `json:"numTombstones,omitempty"` NumTombstones uint64 `json:"numTombstones,omitempty"`
NumBytes int64 `json:"numBytes,omitempty"`
} }
// BlockDesc describes a block by ULID and time range. // BlockDesc describes a block by ULID and time range.
@ -182,6 +191,9 @@ type BlockMetaCompaction struct {
Level int `json:"level"` Level int `json:"level"`
// ULIDs of all source head blocks that went into the block. // ULIDs of all source head blocks that went into the block.
Sources []ulid.ULID `json:"sources,omitempty"` Sources []ulid.ULID `json:"sources,omitempty"`
// Indicates that during compaction it resulted in a block without any samples
// so it should be deleted on the next reload.
Deletable bool `json:"deletable,omitempty"`
// Short descriptions of the direct blocks that were used to create // Short descriptions of the direct blocks that were used to create
// this block. // this block.
Parents []BlockDesc `json:"parents,omitempty"` Parents []BlockDesc `json:"parents,omitempty"`
@ -257,7 +269,10 @@ type Block struct {
// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
// to instantiate chunk structs. // to instantiate chunk structs.
func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) { func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error) {
if logger == nil {
logger = log.NewNopLogger()
}
meta, err := readMetaFile(dir) meta, err := readMetaFile(dir)
if err != nil { if err != nil {
return nil, err return nil, err
@ -272,11 +287,20 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
return nil, err return nil, err
} }
tr, err := readTombstones(dir) tr, tsr, err := readTombstones(dir)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// TODO refactor to set this at block creation time as
// that would be the logical place for a block size to be calculated.
bs := blockSize(cr, ir, tsr)
meta.Stats.NumBytes = bs
err = writeMetaFile(dir, meta)
if err != nil {
level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err)
}
pb := &Block{ pb := &Block{
dir: dir, dir: dir,
meta: *meta, meta: *meta,
@ -288,6 +312,16 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
return pb, nil return pb, nil
} }
func blockSize(rr ...SizeReader) int64 {
var total int64
for _, r := range rr {
if r != nil {
total += r.Size()
}
}
return total
}
// Close closes the on-disk block. It blocks as long as there are readers reading from the block. // Close closes the on-disk block. It blocks as long as there are readers reading from the block.
func (pb *Block) Close() error { func (pb *Block) Close() error {
pb.mtx.Lock() pb.mtx.Lock()
@ -315,6 +349,9 @@ func (pb *Block) Dir() string { return pb.dir }
// Meta returns meta information about the block. // Meta returns meta information about the block.
func (pb *Block) Meta() BlockMeta { return pb.meta } func (pb *Block) Meta() BlockMeta { return pb.meta }
// Size returns the number of bytes that the block takes up.
func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes }
// ErrClosing is returned when a block is in the process of being closed. // ErrClosing is returned when a block is in the process of being closed.
var ErrClosing = errors.New("block is closing") var ErrClosing = errors.New("block is closing")

View file

@ -128,7 +128,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64)
defer sgmReader.Close() defer sgmReader.Close()
} }
cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", to)) cpdir := filepath.Join(w.Dir(), fmt.Sprintf(checkpointPrefix+"%06d", to))
cpdirtmp := cpdir + ".tmp" cpdirtmp := cpdir + ".tmp"
if err := os.MkdirAll(cpdirtmp, 0777); err != nil { if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
@ -139,6 +139,12 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64)
return nil, errors.Wrap(err, "open checkpoint") return nil, errors.Wrap(err, "open checkpoint")
} }
// Ensures that an early return caused by an error doesn't leave any tmp files.
defer func() {
cp.Close()
os.RemoveAll(cpdirtmp)
}()
r := wal.NewReader(sgmReader) r := wal.NewReader(sgmReader)
var ( var (

View file

@ -205,6 +205,7 @@ func (w *Writer) WriteChunks(chks ...Meta) error {
for _, c := range chks { for _, c := range chks {
maxLen += binary.MaxVarintLen32 + 1 // The number of bytes in the chunk and its encoding. maxLen += binary.MaxVarintLen32 + 1 // The number of bytes in the chunk and its encoding.
maxLen += int64(len(c.Chunk.Bytes())) maxLen += int64(len(c.Chunk.Bytes()))
maxLen += 4 // The 4 bytes of crc32
} }
newsz := w.n + maxLen newsz := w.n + maxLen
@ -284,17 +285,15 @@ func (b realByteSlice) Sub(start, end int) ByteSlice {
// Reader implements a SeriesReader for a serialized byte stream // Reader implements a SeriesReader for a serialized byte stream
// of series data. // of series data.
type Reader struct { type Reader struct {
// The underlying bytes holding the encoded series data. bs []ByteSlice // The underlying bytes holding the encoded series data.
bs []ByteSlice cs []io.Closer // Closers for resources behind the byte slices.
size int64 // The total size of bytes in the reader.
// Closers for resources behind the byte slices.
cs []io.Closer
pool chunkenc.Pool pool chunkenc.Pool
} }
func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) { func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) {
cr := Reader{pool: pool, bs: bs, cs: cs} cr := Reader{pool: pool, bs: bs, cs: cs}
var totalSize int64
for i, b := range cr.bs { for i, b := range cr.bs {
if b.Len() < 4 { if b.Len() < 4 {
@ -304,7 +303,9 @@ func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, err
if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks { if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks {
return nil, errors.Errorf("invalid magic number %x", m) return nil, errors.Errorf("invalid magic number %x", m)
} }
totalSize += int64(b.Len())
} }
cr.size = totalSize
return &cr, nil return &cr, nil
} }
@ -327,9 +328,10 @@ func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) {
pool = chunkenc.NewPool() pool = chunkenc.NewPool()
} }
var bs []ByteSlice var (
var cs []io.Closer bs []ByteSlice
cs []io.Closer
)
for _, fn := range files { for _, fn := range files {
f, err := fileutil.OpenMmapFile(fn) f, err := fileutil.OpenMmapFile(fn)
if err != nil { if err != nil {
@ -345,6 +347,11 @@ func (s *Reader) Close() error {
return closeAll(s.cs...) return closeAll(s.cs...)
} }
// Size returns the size of the chunks.
func (s *Reader) Size() int64 {
return s.size
}
func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) { func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
var ( var (
seq = int(ref >> 32) seq = int(ref >> 32)

View file

@ -55,12 +55,17 @@ type Compactor interface {
Plan(dir string) ([]string, error) Plan(dir string) ([]string, error)
// Write persists a Block into a directory. // Write persists a Block into a directory.
// No Block is written when resulting Block has 0 samples, and returns empty ulid.ULID{}.
Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error)
// Compact runs compaction against the provided directories. Must // Compact runs compaction against the provided directories. Must
// only be called concurrently with results of Plan(). // only be called concurrently with results of Plan().
// Can optionally pass a list of already open blocks, // Can optionally pass a list of already open blocks,
// to avoid having to reopen them. // to avoid having to reopen them.
// When resulting Block has 0 samples
// * No block is written.
// * The source dirs are marked Deletable.
// * Returns empty ulid.ULID{}.
Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error) Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error)
} }
@ -186,13 +191,12 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
return res, nil return res, nil
} }
// Compact any blocks that have >5% tombstones. // Compact any blocks with big enough time range that have >5% tombstones.
for i := len(dms) - 1; i >= 0; i-- { for i := len(dms) - 1; i >= 0; i-- {
meta := dms[i].meta meta := dms[i].meta
if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] { if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] {
break break
} }
if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 { if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 {
return []string{dms[i].dir}, nil return []string{dms[i].dir}, nil
} }
@ -347,7 +351,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
if b == nil { if b == nil {
var err error var err error
b, err = OpenBlock(d, c.chunkPool) b, err = OpenBlock(c.logger, d, c.chunkPool)
if err != nil { if err != nil {
return uid, err return uid, err
} }
@ -366,15 +370,34 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
meta := compactBlockMetas(uid, metas...) meta := compactBlockMetas(uid, metas...)
err = c.write(dest, meta, blocks...) err = c.write(dest, meta, blocks...)
if err == nil { if err == nil {
level.Info(c.logger).Log( if meta.Stats.NumSamples == 0 {
"msg", "compact blocks", for _, b := range bs {
"count", len(blocks), b.meta.Compaction.Deletable = true
"mint", meta.MinTime, if err = writeMetaFile(b.dir, &b.meta); err != nil {
"maxt", meta.MaxTime, level.Error(c.logger).Log(
"ulid", meta.ULID, "msg", "Failed to write 'Deletable' to meta file after compaction",
"sources", fmt.Sprintf("%v", uids), "ulid", b.meta.ULID,
"duration", time.Since(start), )
) }
}
uid = ulid.ULID{}
level.Info(c.logger).Log(
"msg", "compact blocks resulted in empty block",
"count", len(blocks),
"sources", fmt.Sprintf("%v", uids),
"duration", time.Since(start),
)
} else {
level.Info(c.logger).Log(
"msg", "compact blocks",
"count", len(blocks),
"mint", meta.MinTime,
"maxt", meta.MaxTime,
"ulid", meta.ULID,
"sources", fmt.Sprintf("%v", uids),
"duration", time.Since(start),
)
}
return uid, nil return uid, nil
} }
@ -413,6 +436,10 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, p
return uid, err return uid, err
} }
if meta.Stats.NumSamples == 0 {
return ulid.ULID{}, nil
}
level.Info(c.logger).Log("msg", "write block", "mint", meta.MinTime, "maxt", meta.MaxTime, "ulid", meta.ULID) level.Info(c.logger).Log("msg", "write block", "mint", meta.MinTime, "maxt", meta.MaxTime, "ulid", meta.ULID)
return uid, nil return uid, nil
} }
@ -490,11 +517,6 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil { if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil {
return errors.Wrap(err, "write compaction") return errors.Wrap(err, "write compaction")
} }
if err = writeMetaFile(tmp, meta); err != nil {
return errors.Wrap(err, "write merged meta")
}
// We are explicitly closing them here to check for error even // We are explicitly closing them here to check for error even
// though these are covered under defer. This is because in Windows, // though these are covered under defer. This is because in Windows,
// you cannot delete these unless they are closed and the defer is to // you cannot delete these unless they are closed and the defer is to
@ -506,6 +528,18 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
return errors.Wrap(err, "close index writer") return errors.Wrap(err, "close index writer")
} }
// Populated block is empty, so cleanup and exit.
if meta.Stats.NumSamples == 0 {
if err := os.RemoveAll(tmp); err != nil {
return errors.Wrap(err, "remove tmp folder after empty block failed")
}
return nil
}
if err = writeMetaFile(tmp, meta); err != nil {
return errors.Wrap(err, "write merged meta")
}
// Create an empty tombstones file. // Create an empty tombstones file.
if err := writeTombstoneFile(tmp, newMemTombstones()); err != nil { if err := writeTombstoneFile(tmp, newMemTombstones()); err != nil {
return errors.Wrap(err, "write new tombstones file") return errors.Wrap(err, "write new tombstones file")

View file

@ -44,7 +44,6 @@ import (
// DefaultOptions used for the DB. They are sane for setups using // DefaultOptions used for the DB. They are sane for setups using
// millisecond precision timestamps. // millisecond precision timestamps.
var DefaultOptions = &Options{ var DefaultOptions = &Options{
WALFlushInterval: 5 * time.Second,
WALSegmentSize: wal.DefaultSegmentSize, WALSegmentSize: wal.DefaultSegmentSize,
RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
BlockRanges: ExponentialBlockRanges(int64(2*time.Hour)/1e6, 3, 5), BlockRanges: ExponentialBlockRanges(int64(2*time.Hour)/1e6, 3, 5),
@ -53,15 +52,19 @@ var DefaultOptions = &Options{
// Options of the DB storage. // Options of the DB storage.
type Options struct { type Options struct {
// The interval at which the write ahead log is flushed to disk.
WALFlushInterval time.Duration
// Segments (wal files) max size // Segments (wal files) max size
WALSegmentSize int WALSegmentSize int
// Duration of persisted data to keep. // Duration of persisted data to keep.
RetentionDuration uint64 RetentionDuration uint64
// Maximum number of bytes in blocks to be retained.
// 0 or less means disabled.
// NOTE: For proper storage calculations need to consider
// the size of the WAL folder which is not added when calculating
// the current size of the database.
MaxBytes int64
// The sizes of the Blocks. // The sizes of the Blocks.
BlockRanges []int64 BlockRanges []int64
@ -131,11 +134,12 @@ type dbMetrics struct {
reloads prometheus.Counter reloads prometheus.Counter
reloadsFailed prometheus.Counter reloadsFailed prometheus.Counter
compactionsTriggered prometheus.Counter compactionsTriggered prometheus.Counter
timeRetentionCount prometheus.Counter
compactionsSkipped prometheus.Counter compactionsSkipped prometheus.Counter
cutoffs prometheus.Counter
cutoffsFailed prometheus.Counter
startTime prometheus.GaugeFunc startTime prometheus.GaugeFunc
tombCleanTimer prometheus.Histogram tombCleanTimer prometheus.Histogram
blocksBytes prometheus.Gauge
sizeRetentionCount prometheus.Counter
} }
func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
@ -174,18 +178,14 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
Name: "prometheus_tsdb_compactions_triggered_total", Name: "prometheus_tsdb_compactions_triggered_total",
Help: "Total number of triggered compactions for the partition.", Help: "Total number of triggered compactions for the partition.",
}) })
m.timeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_time_retentions_total",
Help: "The number of times that blocks were deleted because the maximum time limit was exceeded.",
})
m.compactionsSkipped = prometheus.NewCounter(prometheus.CounterOpts{ m.compactionsSkipped = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_compactions_skipped_total", Name: "prometheus_tsdb_compactions_skipped_total",
Help: "Total number of skipped compactions due to disabled auto compaction.", Help: "Total number of skipped compactions due to disabled auto compaction.",
}) })
m.cutoffs = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_retention_cutoffs_total",
Help: "Number of times the database cut off block data from disk.",
})
m.cutoffsFailed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_retention_cutoffs_failures_total",
Help: "Number of times the database failed to cut off block data from disk.",
})
m.startTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ m.startTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_lowest_timestamp", Name: "prometheus_tsdb_lowest_timestamp",
Help: "Lowest timestamp value stored in the database. The unit is decided by the library consumer.", Help: "Lowest timestamp value stored in the database. The unit is decided by the library consumer.",
@ -201,6 +201,14 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
Name: "prometheus_tsdb_tombstone_cleanup_seconds", Name: "prometheus_tsdb_tombstone_cleanup_seconds",
Help: "The time taken to recompact blocks to remove tombstones.", Help: "The time taken to recompact blocks to remove tombstones.",
}) })
m.blocksBytes = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_tsdb_storage_blocks_bytes_total",
Help: "The number of bytes that are currently used for local storage by all blocks.",
})
m.sizeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_size_retentions_total",
Help: "The number of times that blocks were deleted because the maximum number of bytes was exceeded.",
})
if r != nil { if r != nil {
r.MustRegister( r.MustRegister(
@ -208,11 +216,12 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
m.symbolTableSize, m.symbolTableSize,
m.reloads, m.reloads,
m.reloadsFailed, m.reloadsFailed,
m.cutoffs, m.timeRetentionCount,
m.cutoffsFailed,
m.compactionsTriggered, m.compactionsTriggered,
m.startTime, m.startTime,
m.tombCleanTimer, m.tombCleanTimer,
m.blocksBytes,
m.sizeRetentionCount,
) )
} }
return m return m
@ -344,25 +353,6 @@ func (db *DB) run() {
} }
} }
func (db *DB) beyondRetention(meta *BlockMeta) bool {
if db.opts.RetentionDuration == 0 {
return false
}
db.mtx.RLock()
blocks := db.blocks[:]
db.mtx.RUnlock()
if len(blocks) == 0 {
return false
}
last := blocks[len(db.blocks)-1]
mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
return meta.MaxTime < mint
}
// Appender opens a new appender against the database. // Appender opens a new appender against the database.
func (db *DB) Appender() Appender { func (db *DB) Appender() Appender {
return dbAppender{db: db, Appender: db.head.Appender()} return dbAppender{db: db, Appender: db.head.Appender()}
@ -427,7 +417,8 @@ func (db *DB) compact() (err error) {
// from the block interval here. // from the block interval here.
maxt: maxt - 1, maxt: maxt - 1,
} }
if _, err = db.compactor.Write(db.dir, head, mint, maxt, nil); err != nil { uid, err := db.compactor.Write(db.dir, head, mint, maxt, nil)
if err != nil {
return errors.Wrap(err, "persist head block") return errors.Wrap(err, "persist head block")
} }
@ -436,6 +427,14 @@ func (db *DB) compact() (err error) {
if err := db.reload(); err != nil { if err := db.reload(); err != nil {
return errors.Wrap(err, "reload blocks") return errors.Wrap(err, "reload blocks")
} }
if (uid == ulid.ULID{}) {
// Compaction resulted in an empty block.
// Head truncating during db.reload() depends on the persisted blocks and
// in this case no new block will be persisted so manually truncate the head.
if err = db.head.Truncate(maxt); err != nil {
return errors.Wrap(err, "head truncate failed (in compact)")
}
}
runtime.GC() runtime.GC()
} }
@ -478,8 +477,7 @@ func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
return nil, false return nil, false
} }
// reload on-disk blocks and trigger head truncation if new blocks appeared. It takes // reload blocks and trigger head truncation if new blocks appeared.
// a list of block directories which should be deleted during reload.
// Blocks that are obsolete due to replacement or retention will be deleted. // Blocks that are obsolete due to replacement or retention will be deleted.
func (db *DB) reload() (err error) { func (db *DB) reload() (err error) {
defer func() { defer func() {
@ -489,112 +487,193 @@ func (db *DB) reload() (err error) {
db.metrics.reloads.Inc() db.metrics.reloads.Inc()
}() }()
dirs, err := blockDirs(db.dir) loadable, corrupted, err := db.openBlocks()
if err != nil { if err != nil {
return errors.Wrap(err, "find blocks") return err
} }
// We delete old blocks that have been superseded by new ones by gathering all parents
// from existing blocks. Those parents all have newer replacements and can be safely deleted
// after we loaded the other blocks.
// This makes us resilient against the process crashing towards the end of a compaction.
// Creation of a new block and deletion of its parents cannot happen atomically. By creating
// blocks with their parents, we can pick up the deletion where it left off during a crash.
var (
blocks []*Block
corrupted = map[ulid.ULID]error{}
opened = map[ulid.ULID]struct{}{}
deleteable = map[ulid.ULID]struct{}{}
)
for _, dir := range dirs {
meta, err := readMetaFile(dir)
if err != nil {
// The block was potentially in the middle of being deleted during a crash.
// Skip it since we may delete it properly further down again.
level.Warn(db.logger).Log("msg", "read meta information", "err", err, "dir", dir)
ulid, err2 := ulid.Parse(filepath.Base(dir)) deletable := db.deletableBlocks(loadable)
if err2 != nil {
level.Error(db.logger).Log("msg", "not a block dir", "dir", dir) // Corrupted blocks that have been replaced by parents can be safely ignored and deleted.
continue // This makes it resilient against the process crashing towards the end of a compaction.
} // Creation of a new block and deletion of its parents cannot happen atomically.
corrupted[ulid] = err // By creating blocks with their parents, we can pick up the deletion where it left off during a crash.
continue for _, block := range loadable {
} for _, b := range block.Meta().Compaction.Parents {
if db.beyondRetention(meta) { delete(corrupted, b.ULID)
deleteable[meta.ULID] = struct{}{} deletable[b.ULID] = nil
continue
}
for _, b := range meta.Compaction.Parents {
deleteable[b.ULID] = struct{}{}
} }
} }
// Blocks we failed to open should all be those we are want to delete anyway. if len(corrupted) > 0 {
for c, err := range corrupted { return errors.Wrap(err, "unexpected corrupted block")
if _, ok := deleteable[c]; !ok {
return errors.Wrapf(err, "unexpected corrupted block %s", c)
}
} }
// Load new blocks into memory.
for _, dir := range dirs { // All deletable blocks should not be loaded.
meta, err := readMetaFile(dir) var (
if err != nil { bb []*Block
return errors.Wrapf(err, "read meta information %s", dir) blocksSize int64
} )
// Don't load blocks that are scheduled for deletion. for _, block := range loadable {
if _, ok := deleteable[meta.ULID]; ok { if _, ok := deletable[block.Meta().ULID]; ok {
deletable[block.Meta().ULID] = block
continue continue
} }
// See if we already have the block in memory or open it otherwise. bb = append(bb, block)
b, ok := db.getBlock(meta.ULID) blocksSize += block.Size()
if !ok {
b, err = OpenBlock(dir, db.chunkPool)
if err != nil {
return errors.Wrapf(err, "open block %s", dir)
}
}
blocks = append(blocks, b)
opened[meta.ULID] = struct{}{}
} }
sort.Slice(blocks, func(i, j int) bool { loadable = bb
return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime db.metrics.blocksBytes.Set(float64(blocksSize))
sort.Slice(loadable, func(i, j int) bool {
return loadable[i].Meta().MaxTime < loadable[j].Meta().MaxTime
}) })
if err := validateBlockSequence(blocks); err != nil { if err := validateBlockSequence(loadable); err != nil {
return errors.Wrap(err, "invalid block sequence") return errors.Wrap(err, "invalid block sequence")
} }
// Swap in new blocks first for subsequently created readers to be seen. // Swap new blocks first for subsequently created readers to be seen.
// Then close previous blocks, which may block for pending readers to complete.
db.mtx.Lock() db.mtx.Lock()
oldBlocks := db.blocks oldBlocks := db.blocks
db.blocks = blocks db.blocks = loadable
db.mtx.Unlock() db.mtx.Unlock()
// Drop old blocks from memory.
for _, b := range oldBlocks { for _, b := range oldBlocks {
if _, ok := opened[b.Meta().ULID]; ok { if _, ok := deletable[b.Meta().ULID]; ok {
continue deletable[b.Meta().ULID] = b
}
if err := b.Close(); err != nil {
level.Warn(db.logger).Log("msg", "closing block failed", "err", err)
} }
} }
// Delete all obsolete blocks. None of them are opened any longer.
for ulid := range deleteable { if err := db.deleteBlocks(deletable); err != nil {
if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil { return err
return errors.Wrapf(err, "delete obsolete block %s", ulid)
}
} }
// Garbage collect data in the head if the most recent persisted block // Garbage collect data in the head if the most recent persisted block
// covers data of its current time range. // covers data of its current time range.
if len(blocks) == 0 { if len(loadable) == 0 {
return nil return nil
} }
maxt := blocks[len(blocks)-1].Meta().MaxTime
maxt := loadable[len(loadable)-1].Meta().MaxTime
return errors.Wrap(db.head.Truncate(maxt), "head truncate failed") return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
} }
// openBlocks scans db.dir for block directories and opens every block it can.
// Blocks already held in memory are reused rather than reopened. Blocks whose
// meta file parses but which fail to open are reported in the corrupted map,
// keyed by their ULID, so the caller can decide how to handle them.
func (db *DB) openBlocks() (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
	dirs, err := blockDirs(db.dir)
	if err != nil {
		return nil, nil, errors.Wrap(err, "find blocks")
	}
	corrupted = map[ulid.ULID]error{}
	for _, blockDir := range dirs {
		meta, metaErr := readMetaFile(blockDir)
		if metaErr != nil {
			// A directory without a readable meta file is not a block dir.
			level.Error(db.logger).Log("msg", "not a block dir", "dir", blockDir)
			continue
		}
		// Reuse the in-memory block when present; otherwise open it from disk.
		cached, ok := db.getBlock(meta.ULID)
		if ok {
			blocks = append(blocks, cached)
			continue
		}
		opened, openErr := OpenBlock(db.logger, blockDir, db.chunkPool)
		if openErr != nil {
			corrupted[meta.ULID] = openErr
			continue
		}
		blocks = append(blocks, opened)
	}
	return blocks, corrupted, nil
}
// deletableBlocks returns all blocks past retention policy, keyed by ULID.
// Note that it sorts the input slice in place, newest first.
func (db *DB) deletableBlocks(blocks []*Block) map[ulid.ULID]*Block {
	res := map[ulid.ULID]*Block{}

	// Sort the blocks by time - newest to oldest (largest to smallest timestamp).
	// This ensures that the retentions will remove the oldest blocks.
	sort.Slice(blocks, func(i, j int) bool {
		return blocks[i].Meta().MaxTime > blocks[j].Meta().MaxTime
	})

	// Blocks explicitly flagged deletable by compaction are always removed.
	for _, b := range blocks {
		if meta := b.Meta(); meta.Compaction.Deletable {
			res[meta.ULID] = b
		}
	}

	// Merge in everything past the time and size retention limits.
	for _, past := range []map[ulid.ULID]*Block{
		db.beyondTimeRetention(blocks),
		db.beyondSizeRetention(blocks),
	} {
		for id, b := range past {
			res[id] = b
		}
	}
	return res
}
// beyondTimeRetention returns the blocks that fall outside the configured
// RetentionDuration window, measured relative to the newest block's MaxTime.
// The input is expected to be sorted newest to oldest by MaxTime (see
// deletableBlocks); once one block crosses the boundary, all older blocks
// after it are marked deleteable as well.
func (db *DB) beyondTimeRetention(blocks []*Block) (deleteable map[ulid.ULID]*Block) {
	// Time retention is disabled or no blocks to work with.
	// NOTE(review): this guard reads db.blocks while the loop below walks the
	// blocks argument — confirm the two are intended to be interchangeable here.
	if len(db.blocks) == 0 || db.opts.RetentionDuration == 0 {
		return
	}
	deleteable = make(map[ulid.ULID]*Block)
	for i, block := range blocks {
		// The difference between the first block and this block is larger than
		// the retention period so any blocks after that are added as deleteable.
		// i > 0 keeps the newest block itself from ever being retained away.
		if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime > int64(db.opts.RetentionDuration) {
			for _, b := range blocks[i:] {
				deleteable[b.meta.ULID] = b
			}
			db.metrics.timeRetentionCount.Inc()
			break
		}
	}
	return deleteable
}
// beyondSizeRetention returns the blocks whose cumulative on-disk size pushes
// the total past the configured MaxBytes limit. The input is expected to be
// sorted newest to oldest (see deletableBlocks) so the oldest blocks are the
// ones dropped once the budget is exceeded.
func (db *DB) beyondSizeRetention(blocks []*Block) (deleteable map[ulid.ULID]*Block) {
	// Size retention is disabled or no blocks to work with.
	// NOTE(review): this guard reads db.blocks while the loop below walks the
	// blocks argument — confirm the two are intended to be interchangeable here.
	if len(db.blocks) == 0 || db.opts.MaxBytes <= 0 {
		return
	}
	deleteable = make(map[ulid.ULID]*Block)
	blocksSize := int64(0)
	for i, block := range blocks {
		blocksSize += block.Size()
		if blocksSize > db.opts.MaxBytes {
			// Add this and all following blocks for deletion.
			for _, b := range blocks[i:] {
				deleteable[b.meta.ULID] = b
			}
			db.metrics.sizeRetentionCount.Inc()
			break
		}
	}
	return deleteable
}
// deleteBlocks closes and deletes blocks from the disk.
// A non-nil map value means the block is currently loaded in memory and must
// be closed first, which may block until pending readers are done with it.
// Close failures are logged and do not stop the deletion; a failed file
// removal aborts with an error.
func (db *DB) deleteBlocks(blocks map[ulid.ULID]*Block) error {
	for id, loaded := range blocks {
		// Close blocks that are still open before removing their files.
		if loaded != nil {
			if cerr := loaded.Close(); cerr != nil {
				level.Warn(db.logger).Log("msg", "closing block failed", "err", cerr)
			}
		}
		dir := filepath.Join(db.dir, id.String())
		if rerr := os.RemoveAll(dir); rerr != nil {
			return errors.Wrapf(rerr, "delete obsolete block %s", id)
		}
	}
	return nil
}
// validateBlockSequence returns error if given block meta files indicate that some blocks overlaps within sequence. // validateBlockSequence returns error if given block meta files indicate that some blocks overlaps within sequence.
func validateBlockSequence(bs []*Block) error { func validateBlockSequence(bs []*Block) error {
if len(bs) <= 1 { if len(bs) <= 1 {

View file

@ -685,6 +685,7 @@ func (h *Head) getAppendBuffer() []RefSample {
} }
func (h *Head) putAppendBuffer(b []RefSample) { func (h *Head) putAppendBuffer(b []RefSample) {
//lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
h.appendPool.Put(b[:0]) h.appendPool.Put(b[:0])
} }
@ -697,6 +698,7 @@ func (h *Head) getBytesBuffer() []byte {
} }
func (h *Head) putBytesBuffer(b []byte) { func (h *Head) putBytesBuffer(b []byte) {
//lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
h.bytesPool.Put(b[:0]) h.bytesPool.Put(b[:0])
} }
@ -1094,25 +1096,30 @@ func (h *headIndexReader) Postings(name, value string) (index.Postings, error) {
} }
func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings { func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
ep := make([]uint64, 0, 128) series := make([]*memSeries, 0, 128)
// Fetch all the series only once.
for p.Next() { for p.Next() {
ep = append(ep, p.At()) s := h.head.series.getByID(p.At())
if s == nil {
level.Debug(h.head.logger).Log("msg", "looked up series not found")
} else {
series = append(series, s)
}
} }
if err := p.Err(); err != nil { if err := p.Err(); err != nil {
return index.ErrPostings(errors.Wrap(err, "expand postings")) return index.ErrPostings(errors.Wrap(err, "expand postings"))
} }
sort.Slice(ep, func(i, j int) bool { sort.Slice(series, func(i, j int) bool {
a := h.head.series.getByID(ep[i]) return labels.Compare(series[i].lset, series[j].lset) < 0
b := h.head.series.getByID(ep[j])
if a == nil || b == nil {
level.Debug(h.head.logger).Log("msg", "looked up series not found")
return false
}
return labels.Compare(a.lset, b.lset) < 0
}) })
// Convert back to list.
ep := make([]uint64, 0, len(series))
for _, p := range series {
ep = append(ep, p.ref)
}
return index.NewListPostings(ep) return index.NewListPostings(ep)
} }

View file

@ -18,6 +18,8 @@ import (
"hash" "hash"
"hash/crc32" "hash/crc32"
"unsafe" "unsafe"
"github.com/pkg/errors"
) )
// enbuf is a helper type to populate a byte slice with various types. // enbuf is a helper type to populate a byte slice with various types.
@ -86,6 +88,60 @@ type decbuf struct {
e error e error
} }
// newDecbufAt returns a new decoding buffer. It expects the first 4 bytes
// after offset to hold the big endian encoded content length, followed by the
// contents and the expected CRC32 checksum. A buffer carrying an error is
// returned when the slice is too short or the checksum does not match.
func newDecbufAt(bs ByteSlice, off int) decbuf {
	const lenFieldSize, crcSize = 4, 4
	if bs.Len() < off+lenFieldSize {
		return decbuf{e: errInvalidSize}
	}
	contentLen := int(binary.BigEndian.Uint32(bs.Range(off, off+lenFieldSize)))
	end := off + lenFieldSize + contentLen + crcSize
	if bs.Len() < end {
		return decbuf{e: errInvalidSize}
	}
	// Slice out the contents together with the trailing CRC32 checksum.
	raw := bs.Range(off+lenFieldSize, end)
	dec := decbuf{b: raw[:len(raw)-crcSize]}
	want := binary.BigEndian.Uint32(raw[len(raw)-crcSize:])
	if dec.crc32() != want {
		return decbuf{e: errInvalidChecksum}
	}
	return dec
}
// newDecbufUvarintAt returns a new decoding buffer. It expects the first bytes
// after offset to hold the uvarint-encoded content length, followed by the
// contents and the expected CRC32 checksum. A buffer carrying an error is
// returned when the slice is too short, the uvarint is malformed, or the
// checksum does not match.
func newDecbufUvarintAt(bs ByteSlice, off int) decbuf {
	// We never have to access this method at the far end of the byte slice. Thus just checking
	// against the MaxVarintLen32 is sufficient.
	if bs.Len() < off+binary.MaxVarintLen32 {
		return decbuf{e: errInvalidSize}
	}
	b := bs.Range(off, off+binary.MaxVarintLen32)

	// n <= 0 signals a Uvarint decode failure; n > MaxVarintLen32 would mean a
	// length too large to represent in 32 bits.
	l, n := binary.Uvarint(b)
	if n <= 0 || n > binary.MaxVarintLen32 {
		return decbuf{e: errors.Errorf("invalid uvarint %d", n)}
	}
	if bs.Len() < off+n+int(l)+4 {
		return decbuf{e: errInvalidSize}
	}

	// Load bytes holding the contents plus a CRC32 checksum.
	b = bs.Range(off+n, off+n+int(l)+4)
	dec := decbuf{b: b[:len(b)-4]}
	if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) {
		return decbuf{e: errInvalidChecksum}
	}
	return dec
}
func (d *decbuf) uvarint() int { return int(d.uvarint64()) } func (d *decbuf) uvarint() int { return int(d.uvarint64()) }
func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) } func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) }
func (d *decbuf) be32int() int { return int(d.be32()) } func (d *decbuf) be32int() int { return int(d.be32()) }

View file

@ -20,6 +20,7 @@ import (
"hash" "hash"
"hash/crc32" "hash/crc32"
"io" "io"
"io/ioutil"
"math" "math"
"os" "os"
"path/filepath" "path/filepath"
@ -35,9 +36,13 @@ import (
const ( const (
// MagicIndex 4 bytes at the head of an index file. // MagicIndex 4 bytes at the head of an index file.
MagicIndex = 0xBAAAD700 MagicIndex = 0xBAAAD700
// HeaderLen represents number of bytes reserved of index for header.
HeaderLen = 5
indexFormatV1 = 1 // FormatV1 represents 1 version of index.
indexFormatV2 = 2 FormatV1 = 1
// FormatV2 represents 2 version of index.
FormatV2 = 2
labelNameSeperator = "\xff" labelNameSeperator = "\xff"
) )
@ -108,7 +113,7 @@ type Writer struct {
fbuf *bufio.Writer fbuf *bufio.Writer
pos uint64 pos uint64
toc indexTOC toc TOC
stage indexWriterStage stage indexWriterStage
// Reusable memory. // Reusable memory.
@ -129,13 +134,42 @@ type Writer struct {
Version int Version int
} }
type indexTOC struct { // TOC represents index Table Of Content that states where each section of index starts.
symbols uint64 type TOC struct {
series uint64 Symbols uint64
labelIndices uint64 Series uint64
labelIndicesTable uint64 LabelIndices uint64
postings uint64 LabelIndicesTable uint64
postingsTable uint64 Postings uint64
PostingsTable uint64
}
// NewTOCFromByteSlice return parsed TOC from given index byte slice.
func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
if bs.Len() < indexTOCLen {
return nil, errInvalidSize
}
b := bs.Range(bs.Len()-indexTOCLen, bs.Len())
expCRC := binary.BigEndian.Uint32(b[len(b)-4:])
d := decbuf{b: b[:len(b)-4]}
if d.crc32() != expCRC {
return nil, errors.Wrap(errInvalidChecksum, "read TOC")
}
if err := d.err(); err != nil {
return nil, err
}
return &TOC{
Symbols: d.be64(),
Series: d.be64(),
LabelIndices: d.be64(),
LabelIndicesTable: d.be64(),
Postings: d.be64(),
PostingsTable: d.be64(),
}, nil
} }
// NewWriter returns a new Writer to the given filename. It serializes data in format version 2. // NewWriter returns a new Writer to the given filename. It serializes data in format version 2.
@ -223,22 +257,22 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
// Mark start of sections in table of contents. // Mark start of sections in table of contents.
switch s { switch s {
case idxStageSymbols: case idxStageSymbols:
w.toc.symbols = w.pos w.toc.Symbols = w.pos
case idxStageSeries: case idxStageSeries:
w.toc.series = w.pos w.toc.Series = w.pos
case idxStageLabelIndex: case idxStageLabelIndex:
w.toc.labelIndices = w.pos w.toc.LabelIndices = w.pos
case idxStagePostings: case idxStagePostings:
w.toc.postings = w.pos w.toc.Postings = w.pos
case idxStageDone: case idxStageDone:
w.toc.labelIndicesTable = w.pos w.toc.LabelIndicesTable = w.pos
if err := w.writeOffsetTable(w.labelIndexes); err != nil { if err := w.writeOffsetTable(w.labelIndexes); err != nil {
return err return err
} }
w.toc.postingsTable = w.pos w.toc.PostingsTable = w.pos
if err := w.writeOffsetTable(w.postings); err != nil { if err := w.writeOffsetTable(w.postings); err != nil {
return err return err
} }
@ -254,7 +288,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
func (w *Writer) writeMeta() error { func (w *Writer) writeMeta() error {
w.buf1.reset() w.buf1.reset()
w.buf1.putBE32(MagicIndex) w.buf1.putBE32(MagicIndex)
w.buf1.putByte(indexFormatV2) w.buf1.putByte(FormatV2)
return w.write(w.buf1.get()) return w.write(w.buf1.get())
} }
@ -346,8 +380,6 @@ func (w *Writer) AddSymbols(sym map[string]struct{}) error {
} }
sort.Strings(symbols) sort.Strings(symbols)
const headerSize = 4
w.buf1.reset() w.buf1.reset()
w.buf2.reset() w.buf2.reset()
@ -438,12 +470,12 @@ const indexTOCLen = 6*8 + 4
func (w *Writer) writeTOC() error { func (w *Writer) writeTOC() error {
w.buf1.reset() w.buf1.reset()
w.buf1.putBE64(w.toc.symbols) w.buf1.putBE64(w.toc.Symbols)
w.buf1.putBE64(w.toc.series) w.buf1.putBE64(w.toc.Series)
w.buf1.putBE64(w.toc.labelIndices) w.buf1.putBE64(w.toc.LabelIndices)
w.buf1.putBE64(w.toc.labelIndicesTable) w.buf1.putBE64(w.toc.LabelIndicesTable)
w.buf1.putBE64(w.toc.postings) w.buf1.putBE64(w.toc.Postings)
w.buf1.putBE64(w.toc.postingsTable) w.buf1.putBE64(w.toc.PostingsTable)
w.buf1.putHash(w.crc32) w.buf1.putHash(w.crc32)
@ -535,15 +567,14 @@ type StringTuples interface {
} }
type Reader struct { type Reader struct {
// The underlying byte slice holding the encoded series data. b ByteSlice
b ByteSlice
toc indexTOC
// Close that releases the underlying resources of the byte slice. // Close that releases the underlying resources of the byte slice.
c io.Closer c io.Closer
// Cached hashmaps of section offsets. // Cached hashmaps of section offsets.
labels map[string]uint64 labels map[string]uint64
// LabelName to LabelValue to offset map.
postings map[string]map[string]uint64 postings map[string]map[string]uint64
// Cache of read symbols. Strings that are returned when reading from the // Cache of read symbols. Strings that are returned when reading from the
// block are always backed by true strings held in here rather than // block are always backed by true strings held in here rather than
@ -551,19 +582,17 @@ type Reader struct {
// prevents memory faults when applications work with read symbols after // prevents memory faults when applications work with read symbols after
// the block has been unmapped. The older format has sparse indexes so a map // the block has been unmapped. The older format has sparse indexes so a map
// must be used, but the new format is not so we can use a slice. // must be used, but the new format is not so we can use a slice.
symbols map[uint32]string symbolsV1 map[uint32]string
symbolSlice []string symbolsV2 []string
symbolsTableSize uint64
dec *Decoder dec *Decoder
crc32 hash.Hash32
version int version int
} }
var ( var (
errInvalidSize = fmt.Errorf("invalid size") errInvalidSize = fmt.Errorf("invalid size")
errInvalidFlag = fmt.Errorf("invalid flag")
errInvalidChecksum = fmt.Errorf("invalid checksum") errInvalidChecksum = fmt.Errorf("invalid checksum")
) )
@ -587,10 +616,10 @@ func (b realByteSlice) Sub(start, end int) ByteSlice {
return b[start:end] return b[start:end]
} }
// NewReader returns a new IndexReader on the given byte slice. It automatically // NewReader returns a new index reader on the given byte slice. It automatically
// handles different format versions. // handles different format versions.
func NewReader(b ByteSlice) (*Reader, error) { func NewReader(b ByteSlice) (*Reader, error) {
return newReader(b, nil) return newReader(b, ioutil.NopCloser(nil))
} }
// NewFileReader returns a new index reader against the given index file. // NewFileReader returns a new index reader against the given index file.
@ -606,14 +635,12 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
r := &Reader{ r := &Reader{
b: b, b: b,
c: c, c: c,
symbols: map[uint32]string{},
labels: map[string]uint64{}, labels: map[string]uint64{},
postings: map[string]map[string]uint64{}, postings: map[string]map[string]uint64{},
crc32: newCRC32(),
} }
// Verify header. // Verify header.
if b.Len() < 5 { if r.b.Len() < HeaderLen {
return nil, errors.Wrap(errInvalidSize, "index header") return nil, errors.Wrap(errInvalidSize, "index header")
} }
if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex { if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex {
@ -621,54 +648,59 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
} }
r.version = int(r.b.Range(4, 5)[0]) r.version = int(r.b.Range(4, 5)[0])
if r.version != 1 && r.version != 2 { if r.version != FormatV1 && r.version != FormatV2 {
return nil, errors.Errorf("unknown index file version %d", r.version) return nil, errors.Errorf("unknown index file version %d", r.version)
} }
if err := r.readTOC(); err != nil { toc, err := NewTOCFromByteSlice(b)
if err != nil {
return nil, errors.Wrap(err, "read TOC") return nil, errors.Wrap(err, "read TOC")
} }
if err := r.readSymbols(int(r.toc.symbols)); err != nil {
r.symbolsV2, r.symbolsV1, err = ReadSymbols(r.b, r.version, int(toc.Symbols))
if err != nil {
return nil, errors.Wrap(err, "read symbols") return nil, errors.Wrap(err, "read symbols")
} }
var err error
// Use the strings already allocated by symbols, rather than // Use the strings already allocated by symbols, rather than
// re-allocating them again below. // re-allocating them again below.
symbols := make(map[string]string, len(r.symbols)+len(r.symbolSlice)) // Additionally, calculate symbolsTableSize.
for _, s := range r.symbols { allocatedSymbols := make(map[string]string, len(r.symbolsV1)+len(r.symbolsV2))
symbols[s] = s for _, s := range r.symbolsV1 {
r.symbolsTableSize += uint64(len(s) + 8)
allocatedSymbols[s] = s
} }
for _, s := range r.symbolSlice { for _, s := range r.symbolsV2 {
symbols[s] = s r.symbolsTableSize += uint64(len(s) + 8)
allocatedSymbols[s] = s
} }
err = r.readOffsetTable(r.toc.labelIndicesTable, func(key []string, off uint64) error { if err := ReadOffsetTable(r.b, toc.LabelIndicesTable, func(key []string, off uint64) error {
if len(key) != 1 { if len(key) != 1 {
return errors.Errorf("unexpected key length %d", len(key)) return errors.Errorf("unexpected key length for label indices table %d", len(key))
} }
r.labels[symbols[key[0]]] = off
r.labels[allocatedSymbols[key[0]]] = off
return nil return nil
}) }); err != nil {
if err != nil {
return nil, errors.Wrap(err, "read label index table") return nil, errors.Wrap(err, "read label index table")
} }
r.postings[""] = map[string]uint64{} r.postings[""] = map[string]uint64{}
err = r.readOffsetTable(r.toc.postingsTable, func(key []string, off uint64) error { if err := ReadOffsetTable(r.b, toc.PostingsTable, func(key []string, off uint64) error {
if len(key) != 2 { if len(key) != 2 {
return errors.Errorf("unexpected key length %d", len(key)) return errors.Errorf("unexpected key length for posting table %d", len(key))
} }
if _, ok := r.postings[key[0]]; !ok { if _, ok := r.postings[key[0]]; !ok {
r.postings[symbols[key[0]]] = map[string]uint64{} r.postings[allocatedSymbols[key[0]]] = map[string]uint64{}
} }
r.postings[key[0]][symbols[key[1]]] = off r.postings[key[0]][allocatedSymbols[key[1]]] = off
return nil return nil
}) }); err != nil {
if err != nil {
return nil, errors.Wrap(err, "read postings table") return nil, errors.Wrap(err, "read postings table")
} }
r.dec = &Decoder{lookupSymbol: r.lookupSymbol} r.dec = &Decoder{LookupSymbol: r.lookupSymbol}
return r, nil return r, nil
} }
@ -690,7 +722,7 @@ func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) {
for k, e := range r.postings { for k, e := range r.postings {
for v, start := range e { for v, start := range e {
d := r.decbufAt(int(start)) d := newDecbufAt(r.b, int(start))
if d.err() != nil { if d.err() != nil {
return nil, d.err() return nil, d.err()
} }
@ -703,121 +735,45 @@ func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) {
return m, nil return m, nil
} }
func (r *Reader) readTOC() error { // ReadSymbols reads the symbol table fully into memory and allocates proper strings for them.
if r.b.Len() < indexTOCLen {
return errInvalidSize
}
b := r.b.Range(r.b.Len()-indexTOCLen, r.b.Len())
expCRC := binary.BigEndian.Uint32(b[len(b)-4:])
d := decbuf{b: b[:len(b)-4]}
if d.crc32() != expCRC {
return errors.Wrap(errInvalidChecksum, "read TOC")
}
r.toc.symbols = d.be64()
r.toc.series = d.be64()
r.toc.labelIndices = d.be64()
r.toc.labelIndicesTable = d.be64()
r.toc.postings = d.be64()
r.toc.postingsTable = d.be64()
return d.err()
}
// decbufAt returns a new decoding buffer. It expects the first 4 bytes
// after offset to hold the big endian encoded content length, followed by the contents and the expected
// checksum.
func (r *Reader) decbufAt(off int) decbuf {
if r.b.Len() < off+4 {
return decbuf{e: errInvalidSize}
}
b := r.b.Range(off, off+4)
l := int(binary.BigEndian.Uint32(b))
if r.b.Len() < off+4+l+4 {
return decbuf{e: errInvalidSize}
}
// Load bytes holding the contents plus a CRC32 checksum.
b = r.b.Range(off+4, off+4+l+4)
dec := decbuf{b: b[:len(b)-4]}
if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.crc32() != exp {
return decbuf{e: errInvalidChecksum}
}
return dec
}
// decbufUvarintAt returns a new decoding buffer. It expects the first bytes
// after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected
// checksum.
func (r *Reader) decbufUvarintAt(off int) decbuf {
// We never have to access this method at the far end of the byte slice. Thus just checking
// against the MaxVarintLen32 is sufficient.
if r.b.Len() < off+binary.MaxVarintLen32 {
return decbuf{e: errInvalidSize}
}
b := r.b.Range(off, off+binary.MaxVarintLen32)
l, n := binary.Uvarint(b)
if n <= 0 || n > binary.MaxVarintLen32 {
return decbuf{e: errors.Errorf("invalid uvarint %d", n)}
}
if r.b.Len() < off+n+int(l)+4 {
return decbuf{e: errInvalidSize}
}
// Load bytes holding the contents plus a CRC32 checksum.
b = r.b.Range(off+n, off+n+int(l)+4)
dec := decbuf{b: b[:len(b)-4]}
if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) {
return decbuf{e: errInvalidChecksum}
}
return dec
}
// readSymbols reads the symbol table fully into memory and allocates proper strings for them.
// Strings backed by the mmap'd memory would cause memory faults if applications keep using them // Strings backed by the mmap'd memory would cause memory faults if applications keep using them
// after the reader is closed. // after the reader is closed.
func (r *Reader) readSymbols(off int) error { func ReadSymbols(bs ByteSlice, version int, off int) ([]string, map[uint32]string, error) {
if off == 0 { if off == 0 {
return nil return nil, nil, nil
} }
d := r.decbufAt(off) d := newDecbufAt(bs, off)
var ( var (
origLen = d.len() origLen = d.len()
cnt = d.be32int() cnt = d.be32int()
basePos = uint32(off) + 4 basePos = uint32(off) + 4
nextPos = basePos + uint32(origLen-d.len()) nextPos = basePos + uint32(origLen-d.len())
symbolSlice []string
symbols = map[uint32]string{}
) )
if r.version == 2 { if version == 2 {
r.symbolSlice = make([]string, 0, cnt) symbolSlice = make([]string, 0, cnt)
} }
for d.err() == nil && d.len() > 0 && cnt > 0 { for d.err() == nil && d.len() > 0 && cnt > 0 {
s := d.uvarintStr() s := d.uvarintStr()
if r.version == 2 { if version == FormatV2 {
r.symbolSlice = append(r.symbolSlice, s) symbolSlice = append(symbolSlice, s)
} else { } else {
r.symbols[nextPos] = s symbols[nextPos] = s
nextPos = basePos + uint32(origLen-d.len()) nextPos = basePos + uint32(origLen-d.len())
} }
cnt-- cnt--
} }
return errors.Wrap(d.err(), "read symbols") return symbolSlice, symbols, errors.Wrap(d.err(), "read symbols")
} }
// readOffsetTable reads an offset table at the given position calls f for each // ReadOffsetTable reads an offset table and at the given position calls f for each
// found entry.f // found entry. If f returns an error it stops decoding and returns the received error.
// If f returns an error it stops decoding and returns the received error, func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64) error) error {
func (r *Reader) readOffsetTable(off uint64, f func([]string, uint64) error) error { d := newDecbufAt(bs, int(off))
d := r.decbufAt(int(off))
cnt := d.be32() cnt := d.be32()
for d.err() == nil && d.len() > 0 && cnt > 0 { for d.err() == nil && d.len() > 0 && cnt > 0 {
@ -845,10 +801,10 @@ func (r *Reader) Close() error {
} }
func (r *Reader) lookupSymbol(o uint32) (string, error) { func (r *Reader) lookupSymbol(o uint32) (string, error) {
if int(o) < len(r.symbolSlice) { if int(o) < len(r.symbolsV2) {
return r.symbolSlice[o], nil return r.symbolsV2[o], nil
} }
s, ok := r.symbols[o] s, ok := r.symbolsV1[o]
if !ok { if !ok {
return "", errors.Errorf("unknown symbol offset %d", o) return "", errors.Errorf("unknown symbol offset %d", o)
} }
@ -857,27 +813,20 @@ func (r *Reader) lookupSymbol(o uint32) (string, error) {
// Symbols returns a set of symbols that exist within the index. // Symbols returns a set of symbols that exist within the index.
func (r *Reader) Symbols() (map[string]struct{}, error) { func (r *Reader) Symbols() (map[string]struct{}, error) {
res := make(map[string]struct{}, len(r.symbols)) res := make(map[string]struct{}, len(r.symbolsV1)+len(r.symbolsV2))
for _, s := range r.symbols { for _, s := range r.symbolsV1 {
res[s] = struct{}{} res[s] = struct{}{}
} }
for _, s := range r.symbolSlice { for _, s := range r.symbolsV2 {
res[s] = struct{}{} res[s] = struct{}{}
} }
return res, nil return res, nil
} }
// SymbolTableSize returns the symbol table that is used to resolve symbol references. // SymbolTableSize returns the symbol table size in bytes.
func (r *Reader) SymbolTableSize() uint64 { func (r *Reader) SymbolTableSize() uint64 {
var size int return r.symbolsTableSize
for _, s := range r.symbols {
size += len(s) + 8
}
for _, s := range r.symbolSlice {
size += len(s) + 8
}
return uint64(size)
} }
// LabelValues returns value tuples that exist for the given label name tuples. // LabelValues returns value tuples that exist for the given label name tuples.
@ -892,7 +841,7 @@ func (r *Reader) LabelValues(names ...string) (StringTuples, error) {
//return nil, fmt.Errorf("label index doesn't exist") //return nil, fmt.Errorf("label index doesn't exist")
} }
d := r.decbufAt(int(off)) d := newDecbufAt(r.b, int(off))
nc := d.be32int() nc := d.be32int()
d.be32() // consume unused value entry count. d.be32() // consume unused value entry count.
@ -916,7 +865,7 @@ func (emptyStringTuples) Len() int { return 0 }
// LabelIndices returns a slice of label names for which labels or label tuples value indices exist. // LabelIndices returns a slice of label names for which labels or label tuples value indices exist.
// NOTE: This is deprecated. Use `LabelNames()` instead. // NOTE: This is deprecated. Use `LabelNames()` instead.
func (r *Reader) LabelIndices() ([][]string, error) { func (r *Reader) LabelIndices() ([][]string, error) {
res := [][]string{} var res [][]string
for s := range r.labels { for s := range r.labels {
res = append(res, strings.Split(s, labelNameSeperator)) res = append(res, strings.Split(s, labelNameSeperator))
} }
@ -928,10 +877,10 @@ func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) err
offset := id offset := id
// In version 2 series IDs are no longer exact references but series are 16-byte padded // In version 2 series IDs are no longer exact references but series are 16-byte padded
// and the ID is the multiple of 16 of the actual position. // and the ID is the multiple of 16 of the actual position.
if r.version == 2 { if r.version == FormatV2 {
offset = id * 16 offset = id * 16
} }
d := r.decbufUvarintAt(int(offset)) d := newDecbufUvarintAt(r.b, int(offset))
if d.err() != nil { if d.err() != nil {
return d.err() return d.err()
} }
@ -948,7 +897,7 @@ func (r *Reader) Postings(name, value string) (Postings, error) {
if !ok { if !ok {
return EmptyPostings(), nil return EmptyPostings(), nil
} }
d := r.decbufAt(int(off)) d := newDecbufAt(r.b, int(off))
if d.err() != nil { if d.err() != nil {
return nil, errors.Wrap(d.err(), "get postings entry") return nil, errors.Wrap(d.err(), "get postings entry")
} }
@ -965,6 +914,11 @@ func (r *Reader) SortedPostings(p Postings) Postings {
return p return p
} }
// Size returns the size of an index file.
func (r *Reader) Size() int64 {
return int64(r.b.Len())
}
// LabelNames returns all the unique label names present in the index. // LabelNames returns all the unique label names present in the index.
func (r *Reader) LabelNames() ([]string, error) { func (r *Reader) LabelNames() ([]string, error) {
labelNamesMap := make(map[string]struct{}, len(r.labels)) labelNamesMap := make(map[string]struct{}, len(r.labels))
@ -1062,7 +1016,7 @@ func (t *serializedStringTuples) At(i int) ([]string, error) {
// It currently does not contain decoding methods for all entry types but can be extended // It currently does not contain decoding methods for all entry types but can be extended
// by them if there's demand. // by them if there's demand.
type Decoder struct { type Decoder struct {
lookupSymbol func(uint32) (string, error) LookupSymbol func(uint32) (string, error)
} }
// Postings returns a postings list for b and its number of elements. // Postings returns a postings list for b and its number of elements.
@ -1090,11 +1044,11 @@ func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) e
return errors.Wrap(d.err(), "read series label offsets") return errors.Wrap(d.err(), "read series label offsets")
} }
ln, err := dec.lookupSymbol(lno) ln, err := dec.LookupSymbol(lno)
if err != nil { if err != nil {
return errors.Wrap(err, "lookup label name") return errors.Wrap(err, "lookup label name")
} }
lv, err := dec.lookupSymbol(lvo) lv, err := dec.LookupSymbol(lvo)
if err != nil { if err != nil {
return errors.Wrap(err, "lookup label value") return errors.Wrap(err, "lookup label value")
} }

View file

@ -366,80 +366,25 @@ func Merge(its ...Postings) Postings {
if len(its) == 1 { if len(its) == 1 {
return its[0] return its[0]
} }
l := len(its) / 2 // All the uses of this function immediately expand it, so
return newMergedPostings(Merge(its[:l]...), Merge(its[l:]...)) // collect everything in a map. This is more efficient
} // when there's 100ks of postings, compared to
// having a tree of merge objects.
type mergedPostings struct { pm := make(map[uint64]struct{}, len(its))
a, b Postings for _, it := range its {
initialized bool for it.Next() {
aok, bok bool pm[it.At()] = struct{}{}
cur uint64 }
} if it.Err() != nil {
return ErrPostings(it.Err())
func newMergedPostings(a, b Postings) *mergedPostings { }
return &mergedPostings{a: a, b: b}
}
func (it *mergedPostings) At() uint64 {
return it.cur
}
func (it *mergedPostings) Next() bool {
if !it.initialized {
it.aok = it.a.Next()
it.bok = it.b.Next()
it.initialized = true
} }
pl := make([]uint64, 0, len(pm))
if !it.aok && !it.bok { for p := range pm {
return false pl = append(pl, p)
} }
sort.Slice(pl, func(i, j int) bool { return pl[i] < pl[j] })
if !it.aok { return newListPostings(pl)
it.cur = it.b.At()
it.bok = it.b.Next()
return true
}
if !it.bok {
it.cur = it.a.At()
it.aok = it.a.Next()
return true
}
acur, bcur := it.a.At(), it.b.At()
if acur < bcur {
it.cur = acur
it.aok = it.a.Next()
} else if acur > bcur {
it.cur = bcur
it.bok = it.b.Next()
} else {
it.cur = acur
it.aok = it.a.Next()
it.bok = it.b.Next()
}
return true
}
func (it *mergedPostings) Seek(id uint64) bool {
if it.cur >= id {
return true
}
it.aok = it.a.Seek(id)
it.bok = it.b.Seek(id)
it.initialized = true
return it.Next()
}
func (it *mergedPostings) Err() error {
if it.a.Err() != nil {
return it.a.Err()
}
return it.b.Err()
} }
// Without returns a new postings list that contains all elements from the full list that // Without returns a new postings list that contains all elements from the full list that

View file

@ -15,7 +15,6 @@ package labels
import ( import (
"regexp" "regexp"
"strings"
) )
// Selector holds constraints for matching against a label set. // Selector holds constraints for matching against a label set.
@ -99,22 +98,3 @@ func (m *notMatcher) Matches(v string) bool { return !m.Matcher.Matches(v) }
func Not(m Matcher) Matcher { func Not(m Matcher) Matcher {
return &notMatcher{m} return &notMatcher{m}
} }
// PrefixMatcher implements Matcher for labels which values matches prefix.
type PrefixMatcher struct {
name, prefix string
}
// NewPrefixMatcher returns new Matcher for label name matching prefix.
func NewPrefixMatcher(name, prefix string) Matcher {
return &PrefixMatcher{name: name, prefix: prefix}
}
// Name implements Matcher interface.
func (m *PrefixMatcher) Name() string { return m.name }
// Prefix returns matching prefix.
func (m *PrefixMatcher) Prefix() string { return m.prefix }
// Matches implements Matcher interface.
func (m *PrefixMatcher) Matches(v string) bool { return strings.HasPrefix(v, m.prefix) }

View file

@ -247,37 +247,6 @@ func PostingsForMatchers(ix IndexReader, ms ...labels.Matcher) (index.Postings,
return ix.SortedPostings(index.Intersect(its...)), nil return ix.SortedPostings(index.Intersect(its...)), nil
} }
// tuplesByPrefix uses binary search to find prefix matches within ts.
func tuplesByPrefix(m *labels.PrefixMatcher, ts StringTuples) ([]string, error) {
var outErr error
tslen := ts.Len()
i := sort.Search(tslen, func(i int) bool {
vs, err := ts.At(i)
if err != nil {
outErr = fmt.Errorf("Failed to read tuple %d/%d: %v", i, tslen, err)
return true
}
val := vs[0]
l := len(m.Prefix())
if l > len(vs) {
l = len(val)
}
return val[:l] >= m.Prefix()
})
if outErr != nil {
return nil, outErr
}
var matches []string
for ; i < tslen; i++ {
vs, err := ts.At(i)
if err != nil || !m.Matches(vs[0]) {
return matches, err
}
matches = append(matches, vs[0])
}
return matches, nil
}
func postingsForMatcher(ix IndexReader, m labels.Matcher) (index.Postings, error) { func postingsForMatcher(ix IndexReader, m labels.Matcher) (index.Postings, error) {
// If the matcher selects an empty value, it selects all the series which don't // If the matcher selects an empty value, it selects all the series which don't
// have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575 // have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575
@ -301,21 +270,13 @@ func postingsForMatcher(ix IndexReader, m labels.Matcher) (index.Postings, error
} }
var res []string var res []string
if pm, ok := m.(*labels.PrefixMatcher); ok { for i := 0; i < tpls.Len(); i++ {
res, err = tuplesByPrefix(pm, tpls) vals, err := tpls.At(i)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if m.Matches(vals[0]) {
} else { res = append(res, vals[0])
for i := 0; i < tpls.Len(); i++ {
vals, err := tpls.At(i)
if err != nil {
return nil, err
}
if m.Matches(vals[0]) {
res = append(res, vals[0])
}
} }
} }
@ -620,11 +581,9 @@ func (s *populatedChunkSeries) Next() bool {
// This means that the chunk has be garbage collected. Remove it from the list. // This means that the chunk has be garbage collected. Remove it from the list.
if s.err == ErrNotFound { if s.err == ErrNotFound {
s.err = nil s.err = nil
// Delete in-place. // Delete in-place.
chks = append(chks[:j], chks[j+1:]...) s.chks = append(chks[:j], chks[j+1:]...)
} }
return false return false
} }
} }

2
vendor/github.com/prometheus/tsdb/staticcheck.conf generated vendored Normal file
View file

@ -0,0 +1,2 @@
# Enable only "legacy" staticcheck verifications.
checks = [ "SA*" ]

View file

@ -113,37 +113,41 @@ type Stone struct {
intervals Intervals intervals Intervals
} }
func readTombstones(dir string) (TombstoneReader, error) { func readTombstones(dir string) (TombstoneReader, SizeReader, error) {
b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename))
if os.IsNotExist(err) { if os.IsNotExist(err) {
return newMemTombstones(), nil return newMemTombstones(), nil, nil
} else if err != nil { } else if err != nil {
return nil, err return nil, nil, err
}
sr := &TombstoneFile{
size: int64(len(b)),
} }
if len(b) < 5 { if len(b) < 5 {
return nil, errors.Wrap(errInvalidSize, "tombstones header") return nil, sr, errors.Wrap(errInvalidSize, "tombstones header")
} }
d := &decbuf{b: b[:len(b)-4]} // 4 for the checksum. d := &decbuf{b: b[:len(b)-4]} // 4 for the checksum.
if mg := d.be32(); mg != MagicTombstone { if mg := d.be32(); mg != MagicTombstone {
return nil, fmt.Errorf("invalid magic number %x", mg) return nil, sr, fmt.Errorf("invalid magic number %x", mg)
} }
if flag := d.byte(); flag != tombstoneFormatV1 { if flag := d.byte(); flag != tombstoneFormatV1 {
return nil, fmt.Errorf("invalid tombstone format %x", flag) return nil, sr, fmt.Errorf("invalid tombstone format %x", flag)
} }
if d.err() != nil { if d.err() != nil {
return nil, d.err() return nil, sr, d.err()
} }
// Verify checksum. // Verify checksum.
hash := newCRC32() hash := newCRC32()
if _, err := hash.Write(d.get()); err != nil { if _, err := hash.Write(d.get()); err != nil {
return nil, errors.Wrap(err, "write to hash") return nil, sr, errors.Wrap(err, "write to hash")
} }
if binary.BigEndian.Uint32(b[len(b)-4:]) != hash.Sum32() { if binary.BigEndian.Uint32(b[len(b)-4:]) != hash.Sum32() {
return nil, errors.New("checksum did not match") return nil, sr, errors.New("checksum did not match")
} }
stonesMap := newMemTombstones() stonesMap := newMemTombstones()
@ -153,13 +157,13 @@ func readTombstones(dir string) (TombstoneReader, error) {
mint := d.varint64() mint := d.varint64()
maxt := d.varint64() maxt := d.varint64()
if d.err() != nil { if d.err() != nil {
return nil, d.err() return nil, sr, d.err()
} }
stonesMap.addInterval(k, Interval{mint, maxt}) stonesMap.addInterval(k, Interval{mint, maxt})
} }
return stonesMap, nil return stonesMap, sr, nil
} }
type memTombstones struct { type memTombstones struct {
@ -210,6 +214,16 @@ func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) {
} }
} }
// TombstoneFile holds information about the tombstone file.
type TombstoneFile struct {
size int64
}
// Size returns the tombstone file size.
func (t *TombstoneFile) Size() int64 {
return t.size
}
func (*memTombstones) Close() error { func (*memTombstones) Close() error {
return nil return nil
} }

View file

@ -94,27 +94,6 @@ type WAL interface {
Close() error Close() error
} }
// NopWAL is a WAL that does nothing.
func NopWAL() WAL {
return nopWAL{}
}
type nopWAL struct{}
func (nopWAL) Read(
seriesf func([]RefSeries),
samplesf func([]RefSample),
deletesf func([]Stone),
) error {
return nil
}
func (w nopWAL) Reader() WALReader { return w }
func (nopWAL) LogSeries([]RefSeries) error { return nil }
func (nopWAL) LogSamples([]RefSample) error { return nil }
func (nopWAL) LogDeletes([]Stone) error { return nil }
func (nopWAL) Truncate(int64, func(uint64) bool) error { return nil }
func (nopWAL) Close() error { return nil }
// WALReader reads entries from a WAL. // WALReader reads entries from a WAL.
type WALReader interface { type WALReader interface {
Read( Read(
@ -909,16 +888,19 @@ func (r *walReader) Read(
if seriesf != nil { if seriesf != nil {
seriesf(v) seriesf(v)
} }
//lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
seriesPool.Put(v[:0]) seriesPool.Put(v[:0])
case []RefSample: case []RefSample:
if samplesf != nil { if samplesf != nil {
samplesf(v) samplesf(v)
} }
//lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
samplePool.Put(v[:0]) samplePool.Put(v[:0])
case []Stone: case []Stone:
if deletesf != nil { if deletesf != nil {
deletesf(v) deletesf(v)
} }
//lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
deletePool.Put(v[:0]) deletePool.Put(v[:0])
default: default:
level.Error(r.logger).Log("msg", "unexpected data type") level.Error(r.logger).Log("msg", "unexpected data type")

View file

@ -164,6 +164,7 @@ type WAL struct {
page *page // active page page *page // active page
stopc chan chan struct{} stopc chan chan struct{}
actorc chan func() actorc chan func()
closed bool // To allow calling Close() more than once without blocking.
fsyncDuration prometheus.Summary fsyncDuration prometheus.Summary
pageFlushes prometheus.Counter pageFlushes prometheus.Counter
@ -584,6 +585,10 @@ func (w *WAL) Close() (err error) {
w.mtx.Lock() w.mtx.Lock()
defer w.mtx.Unlock() defer w.mtx.Unlock()
if w.closed {
return nil
}
// Flush the last page and zero out all its remaining size. // Flush the last page and zero out all its remaining size.
// We must not flush an empty page as it would falsely signal // We must not flush an empty page as it would falsely signal
// the segment is done if we start writing to it again after opening. // the segment is done if we start writing to it again after opening.
@ -603,7 +608,7 @@ func (w *WAL) Close() (err error) {
if err := w.segment.Close(); err != nil { if err := w.segment.Close(); err != nil {
level.Error(w.logger).Log("msg", "close previous segment", "err", err) level.Error(w.logger).Log("msg", "close previous segment", "err", err)
} }
w.closed = true
return nil return nil
} }
@ -827,28 +832,13 @@ func (r *Reader) next() (err error) {
} }
r.rec = append(r.rec, buf[:length]...) r.rec = append(r.rec, buf[:length]...)
switch r.curRecTyp { if err := validateRecord(r.curRecTyp, i); err != nil {
case recFull: return err
if i != 0 {
return errors.New("unexpected full record")
}
return nil
case recFirst:
if i != 0 {
return errors.New("unexpected first record")
}
case recMiddle:
if i == 0 {
return errors.New("unexpected middle record")
}
case recLast:
if i == 0 {
return errors.New("unexpected last record")
}
return nil
default:
return errors.Errorf("unexpected record type %d", r.curRecTyp)
} }
if r.curRecTyp == recLast || r.curRecTyp == recFull {
return nil
}
// Only increment i for non-zero records since we use it // Only increment i for non-zero records since we use it
// to determine valid content record sequences. // to determine valid content record sequences.
i++ i++
@ -899,6 +889,226 @@ func (r *Reader) Offset() int64 {
return r.total return r.total
} }
// NewLiveReader returns a new live reader.
func NewLiveReader(r io.Reader) *LiveReader {
return &LiveReader{rdr: r}
}
// Reader reads WAL records from an io.Reader. It buffers partial record data for
// the next read.
type LiveReader struct {
rdr io.Reader
err error
rec []byte
hdr [recordHeaderSize]byte
buf [pageSize]byte
readIndex int // Index in buf to start at for next read.
writeIndex int // Index in buf to start at for next write.
total int64 // Total bytes processed during reading in calls to Next().
index int // Used to track partial records, should be 0 at the start of every new record.
}
func (r *LiveReader) Err() error {
return r.err
}
func (r *LiveReader) TotalRead() int64 {
return r.total
}
func (r *LiveReader) fillBuffer() error {
n, err := r.rdr.Read(r.buf[r.writeIndex:len(r.buf)])
r.writeIndex += n
return err
}
// Shift the buffer up to the read index.
func (r *LiveReader) shiftBuffer() {
copied := copy(r.buf[0:], r.buf[r.readIndex:r.writeIndex])
r.readIndex = 0
r.writeIndex = copied
}
// Next returns true if r.rec will contain a full record.
// False does not indicate that there will never be more data to
// read for the current io.Reader.
func (r *LiveReader) Next() bool {
for {
if r.buildRecord() {
return true
}
if r.err != nil && r.err != io.EOF {
return false
}
if r.readIndex == pageSize {
r.shiftBuffer()
}
if r.writeIndex != pageSize {
if err := r.fillBuffer(); err != nil {
// We expect to get EOF, since we're reading the segment file as it's being written.
if err != io.EOF {
r.err = err
}
return false
}
}
}
}
// Record returns the current record.
// The returned byte slice is only valid until the next call to Next.
func (r *LiveReader) Record() []byte {
return r.rec
}
// Rebuild a full record from potentially partial records. Returns false
// if there was an error or if we weren't able to read a record for any reason.
// Returns true if we read a full record. Any record data is appeneded to
// LiveReader.rec
func (r *LiveReader) buildRecord() bool {
for {
// Check that we have data in the internal buffer to read.
if r.writeIndex <= r.readIndex {
return false
}
// Attempt to read a record, partial or otherwise.
temp, n, err := readRecord(r.buf[r.readIndex:r.writeIndex], r.hdr[:], r.total)
r.readIndex += n
r.total += int64(n)
if err != nil {
r.err = err
return false
}
if temp == nil {
return false
}
rt := recType(r.hdr[0])
if rt == recFirst || rt == recFull {
r.rec = r.rec[:0]
}
r.rec = append(r.rec, temp...)
if err := validateRecord(rt, r.index); err != nil {
r.err = err
r.index = 0
return false
}
if rt == recLast || rt == recFull {
r.index = 0
return true
}
// Only increment i for non-zero records since we use it
// to determine valid content record sequences.
r.index++
}
}
// Returns an error if the recType and i indicate an invalid record sequence.
// As an example, if i is > 0 because we've read some amount of a partial record
// (recFirst, recMiddle, etc. but not recLast) and then we get another recFirst or recFull
// instead of a recLast or recMiddle we would have an invalid record.
func validateRecord(typ recType, i int) error {
switch typ {
case recFull:
if i != 0 {
return errors.New("unexpected full record")
}
return nil
case recFirst:
if i != 0 {
return errors.New("unexpected first record, dropping buffer")
}
return nil
case recMiddle:
if i == 0 {
return errors.New("unexpected middle record, dropping buffer")
}
return nil
case recLast:
if i == 0 {
return errors.New("unexpected last record, dropping buffer")
}
return nil
default:
return errors.Errorf("unexpected record type %d", typ)
}
}
// Read a sub-record (see recType) from the buffer. It could potentially
// be a full record (recFull) if the record fits within the bounds of a single page.
// Returns a byte slice of the record data read, the number of bytes read, and an error
// if there's a non-zero byte in a page term record or the record checksum fails.
// TODO(callum) the EOF errors we're returning from this function should theoretically
// never happen, add a metric for them.
func readRecord(buf []byte, header []byte, total int64) ([]byte, int, error) {
readIndex := 0
header[0] = buf[0]
readIndex++
total++
// The rest of this function is mostly from Reader.Next().
typ := recType(header[0])
// Gobble up zero bytes.
if typ == recPageTerm {
// We are pedantic and check whether the zeros are actually up to a page boundary.
// It's not strictly necessary but may catch sketchy state early.
k := pageSize - (total % pageSize)
if k == pageSize {
return nil, 1, nil // Initial 0 byte was last page byte.
}
if k <= int64(len(buf)-readIndex) {
for _, v := range buf[readIndex : int64(readIndex)+k] {
readIndex++
if v != 0 {
return nil, readIndex, errors.New("unexpected non-zero byte in page term bytes")
}
}
return nil, readIndex, nil
}
// Not enough bytes to read the rest of the page term rec.
// This theoretically should never happen, since we're only shifting the
// internal buffer of the live reader when we read to the end of page.
// Treat this the same as an EOF, it's an error we would expect to see.
return nil, 0, io.EOF
}
if readIndex+recordHeaderSize-1 > len(buf) {
// Treat this the same as an EOF, it's an error we would expect to see.
return nil, 0, io.EOF
}
copy(header[1:], buf[readIndex:readIndex+len(header[1:])])
readIndex += recordHeaderSize - 1
total += int64(recordHeaderSize - 1)
var (
length = binary.BigEndian.Uint16(header[1:])
crc = binary.BigEndian.Uint32(header[3:])
)
readTo := int(length) + readIndex
if readTo > len(buf) {
if (readTo - readIndex) > pageSize {
return nil, 0, errors.Errorf("invalid record, record size would be larger than max page size: %d", int(length))
}
// Not enough data to read all of the record data.
// Treat this the same as an EOF, it's an error we would expect to see.
return nil, 0, io.EOF
}
recData := buf[readIndex:readTo]
readIndex += int(length)
total += int64(length)
// TODO(callum) what should we do here, throw out the record? We should add a metric at least.
if c := crc32.Checksum(recData, castagnoliTable); c != crc {
return recData, readIndex, errors.Errorf("unexpected checksum %x, expected %x", c, crc)
}
return recData, readIndex, nil
}
func min(i, j int) int { func min(i, j int) int {
if i < j { if i < j {
return i return i

2
vendor/modules.txt vendored
View file

@ -229,7 +229,7 @@ github.com/prometheus/procfs
github.com/prometheus/procfs/nfs github.com/prometheus/procfs/nfs
github.com/prometheus/procfs/xfs github.com/prometheus/procfs/xfs
github.com/prometheus/procfs/internal/util github.com/prometheus/procfs/internal/util
# github.com/prometheus/tsdb v0.3.2-0.20181219094047-6d489a1004dc # github.com/prometheus/tsdb v0.4.0
github.com/prometheus/tsdb github.com/prometheus/tsdb
github.com/prometheus/tsdb/labels github.com/prometheus/tsdb/labels
github.com/prometheus/tsdb/chunkenc github.com/prometheus/tsdb/chunkenc