mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
vendor: update tsdb (#2840)
This commit is contained in:
parent
baf5b0f0fc
commit
7640960469
148
vendor/github.com/prometheus/tsdb/block.go
generated
vendored
148
vendor/github.com/prometheus/tsdb/block.go
generated
vendored
|
@ -1,4 +1,5 @@
|
||||||
// Copyright 2017 The Prometheus Authors
|
// Copyright 2017 The Prometheus Authors
|
||||||
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
// you may not use this file except in compliance with the License.
|
// you may not use this file except in compliance with the License.
|
||||||
// You may obtain a copy of the License at
|
// You may obtain a copy of the License at
|
||||||
|
@ -21,6 +22,7 @@ import (
|
||||||
|
|
||||||
"github.com/oklog/ulid"
|
"github.com/oklog/ulid"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
"github.com/prometheus/tsdb/labels"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DiskBlock handles reads against a Block of time series data.
|
// DiskBlock handles reads against a Block of time series data.
|
||||||
|
@ -37,6 +39,12 @@ type DiskBlock interface {
|
||||||
// Chunks returns a ChunkReader over the block's data.
|
// Chunks returns a ChunkReader over the block's data.
|
||||||
Chunks() ChunkReader
|
Chunks() ChunkReader
|
||||||
|
|
||||||
|
// Tombstones returns a TombstoneReader over the block's deleted data.
|
||||||
|
Tombstones() TombstoneReader
|
||||||
|
|
||||||
|
// Delete deletes data from the block.
|
||||||
|
Delete(mint, maxt int64, ms ...labels.Matcher) error
|
||||||
|
|
||||||
// Close releases all underlying resources of the block.
|
// Close releases all underlying resources of the block.
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
@ -45,6 +53,7 @@ type DiskBlock interface {
|
||||||
type Block interface {
|
type Block interface {
|
||||||
DiskBlock
|
DiskBlock
|
||||||
Queryable
|
Queryable
|
||||||
|
Snapshottable
|
||||||
}
|
}
|
||||||
|
|
||||||
// headBlock is a regular block that can still be appended to.
|
// headBlock is a regular block that can still be appended to.
|
||||||
|
@ -53,6 +62,11 @@ type headBlock interface {
|
||||||
Appendable
|
Appendable
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Snapshottable defines an entity that can be backedup online.
|
||||||
|
type Snapshottable interface {
|
||||||
|
Snapshot(dir string) error
|
||||||
|
}
|
||||||
|
|
||||||
// Appendable defines an entity to which data can be appended.
|
// Appendable defines an entity to which data can be appended.
|
||||||
type Appendable interface {
|
type Appendable interface {
|
||||||
// Appender returns a new Appender against an underlying store.
|
// Appender returns a new Appender against an underlying store.
|
||||||
|
@ -78,16 +92,27 @@ type BlockMeta struct {
|
||||||
MaxTime int64 `json:"maxTime"`
|
MaxTime int64 `json:"maxTime"`
|
||||||
|
|
||||||
// Stats about the contents of the block.
|
// Stats about the contents of the block.
|
||||||
Stats struct {
|
Stats BlockStats `json:"stats,omitempty"`
|
||||||
|
|
||||||
|
// Information on compactions the block was created from.
|
||||||
|
Compaction BlockMetaCompaction `json:"compaction"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// BlockStats contains stats about contents of a block.
|
||||||
|
type BlockStats struct {
|
||||||
NumSamples uint64 `json:"numSamples,omitempty"`
|
NumSamples uint64 `json:"numSamples,omitempty"`
|
||||||
NumSeries uint64 `json:"numSeries,omitempty"`
|
NumSeries uint64 `json:"numSeries,omitempty"`
|
||||||
NumChunks uint64 `json:"numChunks,omitempty"`
|
NumChunks uint64 `json:"numChunks,omitempty"`
|
||||||
} `json:"stats,omitempty"`
|
NumTombstones uint64 `json:"numTombstones,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
// Information on compactions the block was created from.
|
// BlockMetaCompaction holds information about compactions a block went through.
|
||||||
Compaction struct {
|
type BlockMetaCompaction struct {
|
||||||
|
// Maximum number of compaction cycles any source block has
|
||||||
|
// gone through.
|
||||||
Generation int `json:"generation"`
|
Generation int `json:"generation"`
|
||||||
} `json:"compaction"`
|
// ULIDs of all source head blocks that went into the block.
|
||||||
|
Sources []ulid.ULID `json:"sources,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -136,7 +161,7 @@ func writeMetaFile(dir string, meta *BlockMeta) error {
|
||||||
var merr MultiError
|
var merr MultiError
|
||||||
if merr.Add(enc.Encode(&blockMeta{Version: 1, BlockMeta: meta})); merr.Err() != nil {
|
if merr.Add(enc.Encode(&blockMeta{Version: 1, BlockMeta: meta})); merr.Err() != nil {
|
||||||
merr.Add(f.Close())
|
merr.Add(f.Close())
|
||||||
return merr
|
return merr.Err()
|
||||||
}
|
}
|
||||||
if err := f.Close(); err != nil {
|
if err := f.Close(); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -150,6 +175,8 @@ type persistedBlock struct {
|
||||||
|
|
||||||
chunkr *chunkReader
|
chunkr *chunkReader
|
||||||
indexr *indexReader
|
indexr *indexReader
|
||||||
|
|
||||||
|
tombstones tombstoneReader
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPersistedBlock(dir string) (*persistedBlock, error) {
|
func newPersistedBlock(dir string) (*persistedBlock, error) {
|
||||||
|
@ -167,11 +194,17 @@ func newPersistedBlock(dir string) (*persistedBlock, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tr, err := readTombstones(dir)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
pb := &persistedBlock{
|
pb := &persistedBlock{
|
||||||
dir: dir,
|
dir: dir,
|
||||||
meta: *meta,
|
meta: *meta,
|
||||||
chunkr: cr,
|
chunkr: cr,
|
||||||
indexr: ir,
|
indexr: ir,
|
||||||
|
tombstones: tr,
|
||||||
}
|
}
|
||||||
return pb, nil
|
return pb, nil
|
||||||
}
|
}
|
||||||
|
@ -195,17 +228,120 @@ func (pb *persistedBlock) Querier(mint, maxt int64) Querier {
|
||||||
maxt: maxt,
|
maxt: maxt,
|
||||||
index: pb.Index(),
|
index: pb.Index(),
|
||||||
chunks: pb.Chunks(),
|
chunks: pb.Chunks(),
|
||||||
|
tombstones: pb.Tombstones(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pb *persistedBlock) Dir() string { return pb.dir }
|
func (pb *persistedBlock) Dir() string { return pb.dir }
|
||||||
func (pb *persistedBlock) Index() IndexReader { return pb.indexr }
|
func (pb *persistedBlock) Index() IndexReader { return pb.indexr }
|
||||||
func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr }
|
func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr }
|
||||||
|
func (pb *persistedBlock) Tombstones() TombstoneReader {
|
||||||
|
return pb.tombstones
|
||||||
|
}
|
||||||
func (pb *persistedBlock) Meta() BlockMeta { return pb.meta }
|
func (pb *persistedBlock) Meta() BlockMeta { return pb.meta }
|
||||||
|
|
||||||
|
func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error {
|
||||||
|
pr := newPostingsReader(pb.indexr)
|
||||||
|
p, absent := pr.Select(ms...)
|
||||||
|
|
||||||
|
ir := pb.indexr
|
||||||
|
|
||||||
|
// Choose only valid postings which have chunks in the time-range.
|
||||||
|
stones := map[uint32]intervals{}
|
||||||
|
|
||||||
|
Outer:
|
||||||
|
for p.Next() {
|
||||||
|
lset, chunks, err := ir.Series(p.At())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, abs := range absent {
|
||||||
|
if lset.Get(abs) != "" {
|
||||||
|
continue Outer
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, chk := range chunks {
|
||||||
|
if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) {
|
||||||
|
// Delete only until the current vlaues and not beyond.
|
||||||
|
tmin, tmax := clampInterval(mint, maxt, chunks[0].MinTime, chunks[len(chunks)-1].MaxTime)
|
||||||
|
stones[p.At()] = intervals{{tmin, tmax}}
|
||||||
|
continue Outer
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.Err() != nil {
|
||||||
|
return p.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Merge the current and new tombstones.
|
||||||
|
for k, v := range stones {
|
||||||
|
pb.tombstones.add(k, v[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
pb.meta.Stats.NumTombstones = uint64(len(pb.tombstones))
|
||||||
|
return writeMetaFile(pb.dir, &pb.meta)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pb *persistedBlock) Snapshot(dir string) error {
|
||||||
|
blockDir := filepath.Join(dir, pb.meta.ULID.String())
|
||||||
|
if err := os.MkdirAll(blockDir, 0777); err != nil {
|
||||||
|
return errors.Wrap(err, "create snapshot block dir")
|
||||||
|
}
|
||||||
|
|
||||||
|
chunksDir := chunkDir(blockDir)
|
||||||
|
if err := os.MkdirAll(chunksDir, 0777); err != nil {
|
||||||
|
return errors.Wrap(err, "create snapshot chunk dir")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Hardlink meta, index and tombstones
|
||||||
|
for _, fname := range []string{
|
||||||
|
metaFilename,
|
||||||
|
indexFilename,
|
||||||
|
tombstoneFilename,
|
||||||
|
} {
|
||||||
|
if err := os.Link(filepath.Join(pb.dir, fname), filepath.Join(blockDir, fname)); err != nil {
|
||||||
|
return errors.Wrapf(err, "create snapshot %s", fname)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Hardlink the chunks
|
||||||
|
curChunkDir := chunkDir(pb.dir)
|
||||||
|
files, err := ioutil.ReadDir(curChunkDir)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "ReadDir the current chunk dir")
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, f := range files {
|
||||||
|
err := os.Link(filepath.Join(curChunkDir, f.Name()), filepath.Join(chunksDir, f.Name()))
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "hardlink a chunk")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func chunkDir(dir string) string { return filepath.Join(dir, "chunks") }
|
func chunkDir(dir string) string { return filepath.Join(dir, "chunks") }
|
||||||
func walDir(dir string) string { return filepath.Join(dir, "wal") }
|
func walDir(dir string) string { return filepath.Join(dir, "wal") }
|
||||||
|
|
||||||
|
func clampInterval(a, b, mint, maxt int64) (int64, int64) {
|
||||||
|
if a < mint {
|
||||||
|
a = mint
|
||||||
|
}
|
||||||
|
if b > maxt {
|
||||||
|
b = maxt
|
||||||
|
}
|
||||||
|
|
||||||
|
return a, b
|
||||||
|
}
|
||||||
|
|
||||||
type mmapFile struct {
|
type mmapFile struct {
|
||||||
f *os.File
|
f *os.File
|
||||||
b []byte
|
b []byte
|
||||||
|
|
40
vendor/github.com/prometheus/tsdb/chunks.go
generated
vendored
40
vendor/github.com/prometheus/tsdb/chunks.go
generated
vendored
|
@ -54,6 +54,46 @@ func (cm *ChunkMeta) writeHash(h hash.Hash) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// deletedIterator wraps an Iterator and makes sure any deleted metrics are not
|
||||||
|
// returned.
|
||||||
|
type deletedIterator struct {
|
||||||
|
it chunks.Iterator
|
||||||
|
|
||||||
|
intervals intervals
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *deletedIterator) At() (int64, float64) {
|
||||||
|
return it.it.At()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *deletedIterator) Next() bool {
|
||||||
|
Outer:
|
||||||
|
for it.it.Next() {
|
||||||
|
ts, _ := it.it.At()
|
||||||
|
|
||||||
|
for _, tr := range it.intervals {
|
||||||
|
if tr.inBounds(ts) {
|
||||||
|
continue Outer
|
||||||
|
}
|
||||||
|
|
||||||
|
if ts > tr.maxt {
|
||||||
|
it.intervals = it.intervals[1:]
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *deletedIterator) Err() error {
|
||||||
|
return it.it.Err()
|
||||||
|
}
|
||||||
|
|
||||||
// ChunkWriter serializes a time block of chunked series data.
|
// ChunkWriter serializes a time block of chunked series data.
|
||||||
type ChunkWriter interface {
|
type ChunkWriter interface {
|
||||||
// WriteChunks writes several chunks. The Chunk field of the ChunkMetas
|
// WriteChunks writes several chunks. The Chunk field of the ChunkMetas
|
||||||
|
|
135
vendor/github.com/prometheus/tsdb/compact.go
generated
vendored
135
vendor/github.com/prometheus/tsdb/compact.go
generated
vendored
|
@ -26,6 +26,7 @@ import (
|
||||||
"github.com/oklog/ulid"
|
"github.com/oklog/ulid"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/prometheus/tsdb/chunks"
|
||||||
"github.com/prometheus/tsdb/labels"
|
"github.com/prometheus/tsdb/labels"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -70,7 +71,7 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
|
||||||
Name: "tsdb_compactions_failed_total",
|
Name: "tsdb_compactions_failed_total",
|
||||||
Help: "Total number of compactions that failed for the partition.",
|
Help: "Total number of compactions that failed for the partition.",
|
||||||
})
|
})
|
||||||
m.duration = prometheus.NewHistogram(prometheus.HistogramOpts{
|
m.duration = prometheus.NewSummary(prometheus.SummaryOpts{
|
||||||
Name: "tsdb_compaction_duration",
|
Name: "tsdb_compaction_duration",
|
||||||
Help: "Duration of compaction runs.",
|
Help: "Duration of compaction runs.",
|
||||||
})
|
})
|
||||||
|
@ -165,17 +166,35 @@ func (c *compactor) match(dirs []dirMeta) bool {
|
||||||
return uint64(dirs[len(dirs)-1].meta.MaxTime-dirs[0].meta.MinTime) <= c.opts.maxBlockRange
|
return uint64(dirs[len(dirs)-1].meta.MaxTime-dirs[0].meta.MinTime) <= c.opts.maxBlockRange
|
||||||
}
|
}
|
||||||
|
|
||||||
func mergeBlockMetas(blocks ...Block) (res BlockMeta) {
|
func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) {
|
||||||
m0 := blocks[0].Meta()
|
res.MinTime = blocks[0].MinTime
|
||||||
|
res.MaxTime = blocks[len(blocks)-1].MaxTime
|
||||||
|
|
||||||
res.MinTime = m0.MinTime
|
sources := map[ulid.ULID]struct{}{}
|
||||||
res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime
|
|
||||||
|
|
||||||
res.Compaction.Generation = m0.Compaction.Generation + 1
|
|
||||||
|
|
||||||
for _, b := range blocks {
|
for _, b := range blocks {
|
||||||
res.Stats.NumSamples += b.Meta().Stats.NumSamples
|
res.Stats.NumSamples += b.Stats.NumSamples
|
||||||
|
|
||||||
|
if b.Compaction.Generation > res.Compaction.Generation {
|
||||||
|
res.Compaction.Generation = b.Compaction.Generation
|
||||||
}
|
}
|
||||||
|
for _, s := range b.Compaction.Sources {
|
||||||
|
sources[s] = struct{}{}
|
||||||
|
}
|
||||||
|
// If it's an in memory block, its ULID goes into the sources.
|
||||||
|
if b.Compaction.Generation == 0 {
|
||||||
|
sources[b.ULID] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
res.Compaction.Generation++
|
||||||
|
|
||||||
|
for s := range sources {
|
||||||
|
res.Compaction.Sources = append(res.Compaction.Sources, s)
|
||||||
|
}
|
||||||
|
sort.Slice(res.Compaction.Sources, func(i, j int) bool {
|
||||||
|
return res.Compaction.Sources[i].Compare(res.Compaction.Sources[j]) < 0
|
||||||
|
})
|
||||||
|
|
||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -219,6 +238,7 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.metrics.failed.Inc()
|
c.metrics.failed.Inc()
|
||||||
}
|
}
|
||||||
|
c.metrics.ran.Inc()
|
||||||
c.metrics.duration.Observe(time.Since(t).Seconds())
|
c.metrics.duration.Observe(time.Since(t).Seconds())
|
||||||
}(time.Now())
|
}(time.Now())
|
||||||
|
|
||||||
|
@ -244,7 +264,7 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
|
||||||
return errors.Wrap(err, "open index writer")
|
return errors.Wrap(err, "open index writer")
|
||||||
}
|
}
|
||||||
|
|
||||||
meta, err := c.populate(blocks, indexw, chunkw)
|
meta, err := populateBlock(blocks, indexw, chunkw)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "write compaction")
|
return errors.Wrap(err, "write compaction")
|
||||||
}
|
}
|
||||||
|
@ -261,6 +281,11 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
|
||||||
return errors.Wrap(err, "close index writer")
|
return errors.Wrap(err, "close index writer")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create an empty tombstones file.
|
||||||
|
if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil {
|
||||||
|
return errors.Wrap(err, "write new tombstones file")
|
||||||
|
}
|
||||||
|
|
||||||
// Block successfully written, make visible and remove old ones.
|
// Block successfully written, make visible and remove old ones.
|
||||||
if err := renameFile(tmp, dir); err != nil {
|
if err := renameFile(tmp, dir); err != nil {
|
||||||
return errors.Wrap(err, "rename block dir")
|
return errors.Wrap(err, "rename block dir")
|
||||||
|
@ -275,6 +300,8 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "sync block dir")
|
return errors.Wrap(err, "sync block dir")
|
||||||
}
|
}
|
||||||
|
defer df.Close()
|
||||||
|
|
||||||
if err := fileutil.Fsync(df); err != nil {
|
if err := fileutil.Fsync(df); err != nil {
|
||||||
return errors.Wrap(err, "sync block dir")
|
return errors.Wrap(err, "sync block dir")
|
||||||
}
|
}
|
||||||
|
@ -282,17 +309,20 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// populate fills the index and chunk writers with new data gathered as the union
|
// populateBlock fills the index and chunk writers with new data gathered as the union
|
||||||
// of the provided blocks. It returns meta information for the new block.
|
// of the provided blocks. It returns meta information for the new block.
|
||||||
func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) {
|
func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) {
|
||||||
var set compactionSet
|
var set compactionSet
|
||||||
|
var metas []BlockMeta
|
||||||
|
|
||||||
for i, b := range blocks {
|
for i, b := range blocks {
|
||||||
|
metas = append(metas, b.Meta())
|
||||||
|
|
||||||
all, err := b.Index().Postings("", "")
|
all, err := b.Index().Postings("", "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
s := newCompactionSeriesSet(b.Index(), b.Chunks(), all)
|
s := newCompactionSeriesSet(b.Index(), b.Chunks(), b.Tombstones(), all)
|
||||||
|
|
||||||
if i == 0 {
|
if i == 0 {
|
||||||
set = s
|
set = s
|
||||||
|
@ -309,18 +339,40 @@ func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWri
|
||||||
postings = &memPostings{m: make(map[term][]uint32, 512)}
|
postings = &memPostings{m: make(map[term][]uint32, 512)}
|
||||||
values = map[string]stringset{}
|
values = map[string]stringset{}
|
||||||
i = uint32(0)
|
i = uint32(0)
|
||||||
meta = mergeBlockMetas(blocks...)
|
meta = compactBlockMetas(metas...)
|
||||||
)
|
)
|
||||||
|
|
||||||
for set.Next() {
|
for set.Next() {
|
||||||
lset, chunks := set.At()
|
lset, chks, dranges := set.At() // The chunks here are not fully deleted.
|
||||||
if err := chunkw.WriteChunks(chunks...); err != nil {
|
|
||||||
|
if len(dranges) > 0 {
|
||||||
|
// Re-encode the chunk to not have deleted values.
|
||||||
|
for _, chk := range chks {
|
||||||
|
if intervalOverlap(dranges[0].mint, dranges[len(dranges)-1].maxt, chk.MinTime, chk.MaxTime) {
|
||||||
|
newChunk := chunks.NewXORChunk()
|
||||||
|
app, err := newChunk.Appender()
|
||||||
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
indexw.AddSeries(i, lset, chunks...)
|
it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges}
|
||||||
|
for it.Next() {
|
||||||
|
ts, v := it.At()
|
||||||
|
app.Append(ts, v)
|
||||||
|
}
|
||||||
|
|
||||||
meta.Stats.NumChunks += uint64(len(chunks))
|
chk.Chunk = newChunk
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := chunkw.WriteChunks(chks...); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
indexw.AddSeries(i, lset, chks...)
|
||||||
|
|
||||||
|
meta.Stats.NumChunks += uint64(len(chks))
|
||||||
meta.Stats.NumSeries++
|
meta.Stats.NumSeries++
|
||||||
|
|
||||||
for _, l := range lset {
|
for _, l := range lset {
|
||||||
|
@ -370,7 +422,7 @@ func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWri
|
||||||
|
|
||||||
type compactionSet interface {
|
type compactionSet interface {
|
||||||
Next() bool
|
Next() bool
|
||||||
At() (labels.Labels, []*ChunkMeta)
|
At() (labels.Labels, []*ChunkMeta, intervals)
|
||||||
Err() error
|
Err() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -378,16 +430,19 @@ type compactionSeriesSet struct {
|
||||||
p Postings
|
p Postings
|
||||||
index IndexReader
|
index IndexReader
|
||||||
chunks ChunkReader
|
chunks ChunkReader
|
||||||
|
tombstones TombstoneReader
|
||||||
|
|
||||||
l labels.Labels
|
l labels.Labels
|
||||||
c []*ChunkMeta
|
c []*ChunkMeta
|
||||||
|
intervals intervals
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
func newCompactionSeriesSet(i IndexReader, c ChunkReader, p Postings) *compactionSeriesSet {
|
func newCompactionSeriesSet(i IndexReader, c ChunkReader, t TombstoneReader, p Postings) *compactionSeriesSet {
|
||||||
return &compactionSeriesSet{
|
return &compactionSeriesSet{
|
||||||
index: i,
|
index: i,
|
||||||
chunks: c,
|
chunks: c,
|
||||||
|
tombstones: t,
|
||||||
p: p,
|
p: p,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -397,10 +452,25 @@ func (c *compactionSeriesSet) Next() bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.intervals = c.tombstones.Get(c.p.At())
|
||||||
|
|
||||||
c.l, c.c, c.err = c.index.Series(c.p.At())
|
c.l, c.c, c.err = c.index.Series(c.p.At())
|
||||||
if c.err != nil {
|
if c.err != nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Remove completely deleted chunks.
|
||||||
|
if len(c.intervals) > 0 {
|
||||||
|
chks := make([]*ChunkMeta, 0, len(c.c))
|
||||||
|
for _, chk := range c.c {
|
||||||
|
if !(interval{chk.MinTime, chk.MaxTime}.isSubrange(c.intervals)) {
|
||||||
|
chks = append(chks, chk)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
c.c = chks
|
||||||
|
}
|
||||||
|
|
||||||
for _, chk := range c.c {
|
for _, chk := range c.c {
|
||||||
chk.Chunk, c.err = c.chunks.Chunk(chk.Ref)
|
chk.Chunk, c.err = c.chunks.Chunk(chk.Ref)
|
||||||
if c.err != nil {
|
if c.err != nil {
|
||||||
|
@ -418,8 +488,8 @@ func (c *compactionSeriesSet) Err() error {
|
||||||
return c.p.Err()
|
return c.p.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *compactionSeriesSet) At() (labels.Labels, []*ChunkMeta) {
|
func (c *compactionSeriesSet) At() (labels.Labels, []*ChunkMeta, intervals) {
|
||||||
return c.l, c.c
|
return c.l, c.c, c.intervals
|
||||||
}
|
}
|
||||||
|
|
||||||
type compactionMerger struct {
|
type compactionMerger struct {
|
||||||
|
@ -428,6 +498,7 @@ type compactionMerger struct {
|
||||||
aok, bok bool
|
aok, bok bool
|
||||||
l labels.Labels
|
l labels.Labels
|
||||||
c []*ChunkMeta
|
c []*ChunkMeta
|
||||||
|
intervals intervals
|
||||||
}
|
}
|
||||||
|
|
||||||
type compactionSeries struct {
|
type compactionSeries struct {
|
||||||
|
@ -455,8 +526,8 @@ func (c *compactionMerger) compare() int {
|
||||||
if !c.bok {
|
if !c.bok {
|
||||||
return -1
|
return -1
|
||||||
}
|
}
|
||||||
a, _ := c.a.At()
|
a, _, _ := c.a.At()
|
||||||
b, _ := c.b.At()
|
b, _, _ := c.b.At()
|
||||||
return labels.Compare(a, b)
|
return labels.Compare(a, b)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -468,17 +539,21 @@ func (c *compactionMerger) Next() bool {
|
||||||
d := c.compare()
|
d := c.compare()
|
||||||
// Both sets contain the current series. Chain them into a single one.
|
// Both sets contain the current series. Chain them into a single one.
|
||||||
if d > 0 {
|
if d > 0 {
|
||||||
c.l, c.c = c.b.At()
|
c.l, c.c, c.intervals = c.b.At()
|
||||||
c.bok = c.b.Next()
|
c.bok = c.b.Next()
|
||||||
} else if d < 0 {
|
} else if d < 0 {
|
||||||
c.l, c.c = c.a.At()
|
c.l, c.c, c.intervals = c.a.At()
|
||||||
c.aok = c.a.Next()
|
c.aok = c.a.Next()
|
||||||
} else {
|
} else {
|
||||||
l, ca := c.a.At()
|
l, ca, ra := c.a.At()
|
||||||
_, cb := c.b.At()
|
_, cb, rb := c.b.At()
|
||||||
|
for _, r := range rb {
|
||||||
|
ra = ra.add(r)
|
||||||
|
}
|
||||||
|
|
||||||
c.l = l
|
c.l = l
|
||||||
c.c = append(ca, cb...)
|
c.c = append(ca, cb...)
|
||||||
|
c.intervals = ra
|
||||||
|
|
||||||
c.aok = c.a.Next()
|
c.aok = c.a.Next()
|
||||||
c.bok = c.b.Next()
|
c.bok = c.b.Next()
|
||||||
|
@ -493,8 +568,8 @@ func (c *compactionMerger) Err() error {
|
||||||
return c.b.Err()
|
return c.b.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *compactionMerger) At() (labels.Labels, []*ChunkMeta) {
|
func (c *compactionMerger) At() (labels.Labels, []*ChunkMeta, intervals) {
|
||||||
return c.l, c.c
|
return c.l, c.c, c.intervals
|
||||||
}
|
}
|
||||||
|
|
||||||
func renameFile(from, to string) error {
|
func renameFile(from, to string) error {
|
||||||
|
@ -510,6 +585,8 @@ func renameFile(from, to string) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
defer pdir.Close()
|
||||||
|
|
||||||
if err = fileutil.Fsync(pdir); err != nil {
|
if err = fileutil.Fsync(pdir); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
141
vendor/github.com/prometheus/tsdb/db.go
generated
vendored
141
vendor/github.com/prometheus/tsdb/db.go
generated
vendored
|
@ -119,16 +119,49 @@ type DB struct {
|
||||||
compactc chan struct{}
|
compactc chan struct{}
|
||||||
donec chan struct{}
|
donec chan struct{}
|
||||||
stopc chan struct{}
|
stopc chan struct{}
|
||||||
|
|
||||||
|
// cmtx is used to control compactions and deletions.
|
||||||
|
cmtx sync.Mutex
|
||||||
|
compacting bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type dbMetrics struct {
|
type dbMetrics struct {
|
||||||
|
activeAppenders prometheus.Gauge
|
||||||
|
loadedBlocks prometheus.GaugeFunc
|
||||||
|
reloads prometheus.Counter
|
||||||
|
reloadsFailed prometheus.Counter
|
||||||
|
reloadDuration prometheus.Summary
|
||||||
samplesAppended prometheus.Counter
|
samplesAppended prometheus.Counter
|
||||||
compactionsTriggered prometheus.Counter
|
compactionsTriggered prometheus.Counter
|
||||||
}
|
}
|
||||||
|
|
||||||
func newDBMetrics(r prometheus.Registerer) *dbMetrics {
|
func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
|
||||||
m := &dbMetrics{}
|
m := &dbMetrics{}
|
||||||
|
|
||||||
|
m.activeAppenders = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||||
|
Name: "tsdb_active_appenders",
|
||||||
|
Help: "Number of currently active appender transactions",
|
||||||
|
})
|
||||||
|
m.loadedBlocks = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||||
|
Name: "tsdb_blocks_loaded",
|
||||||
|
Help: "Number of currently loaded data blocks",
|
||||||
|
}, func() float64 {
|
||||||
|
db.mtx.RLock()
|
||||||
|
defer db.mtx.RUnlock()
|
||||||
|
return float64(len(db.blocks))
|
||||||
|
})
|
||||||
|
m.reloads = prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
|
Name: "tsdb_reloads_total",
|
||||||
|
Help: "Number of times the database reloaded block data from disk.",
|
||||||
|
})
|
||||||
|
m.reloadsFailed = prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
|
Name: "tsdb_reloads_failures_total",
|
||||||
|
Help: "Number of times the database failed to reload black data from disk.",
|
||||||
|
})
|
||||||
|
m.reloadDuration = prometheus.NewSummary(prometheus.SummaryOpts{
|
||||||
|
Name: "tsdb_reload_duration_seconds",
|
||||||
|
Help: "Duration of block reloads.",
|
||||||
|
})
|
||||||
m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{
|
m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
Name: "tsdb_samples_appended_total",
|
Name: "tsdb_samples_appended_total",
|
||||||
Help: "Total number of appended sampledb.",
|
Help: "Total number of appended sampledb.",
|
||||||
|
@ -140,6 +173,11 @@ func newDBMetrics(r prometheus.Registerer) *dbMetrics {
|
||||||
|
|
||||||
if r != nil {
|
if r != nil {
|
||||||
r.MustRegister(
|
r.MustRegister(
|
||||||
|
m.activeAppenders,
|
||||||
|
m.loadedBlocks,
|
||||||
|
m.reloads,
|
||||||
|
m.reloadsFailed,
|
||||||
|
m.reloadDuration,
|
||||||
m.samplesAppended,
|
m.samplesAppended,
|
||||||
m.compactionsTriggered,
|
m.compactionsTriggered,
|
||||||
)
|
)
|
||||||
|
@ -165,12 +203,14 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
||||||
db = &DB{
|
db = &DB{
|
||||||
dir: dir,
|
dir: dir,
|
||||||
logger: l,
|
logger: l,
|
||||||
metrics: newDBMetrics(r),
|
|
||||||
opts: opts,
|
opts: opts,
|
||||||
compactc: make(chan struct{}, 1),
|
compactc: make(chan struct{}, 1),
|
||||||
donec: make(chan struct{}),
|
donec: make(chan struct{}),
|
||||||
stopc: make(chan struct{}),
|
stopc: make(chan struct{}),
|
||||||
|
compacting: true,
|
||||||
}
|
}
|
||||||
|
db.metrics = newDBMetrics(db, r)
|
||||||
|
|
||||||
if !opts.NoLockfile {
|
if !opts.NoLockfile {
|
||||||
absdir, err := filepath.Abs(dir)
|
absdir, err := filepath.Abs(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -198,6 +238,11 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
||||||
return db, nil
|
return db, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Dir returns the directory of the database.
|
||||||
|
func (db *DB) Dir() string {
|
||||||
|
return db.dir
|
||||||
|
}
|
||||||
|
|
||||||
func (db *DB) run() {
|
func (db *DB) run() {
|
||||||
defer close(db.donec)
|
defer close(db.donec)
|
||||||
|
|
||||||
|
@ -261,6 +306,9 @@ func (db *DB) retentionCutoff() (bool, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) compact() (changes bool, err error) {
|
func (db *DB) compact() (changes bool, err error) {
|
||||||
|
db.cmtx.Lock()
|
||||||
|
defer db.cmtx.Unlock()
|
||||||
|
|
||||||
db.headmtx.RLock()
|
db.headmtx.RLock()
|
||||||
|
|
||||||
// Check whether we have pending head blocks that are ready to be persisted.
|
// Check whether we have pending head blocks that are ready to be persisted.
|
||||||
|
@ -338,6 +386,8 @@ func retentionCutoff(dir string, mint int64) (bool, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, errors.Wrapf(err, "open directory")
|
return false, errors.Wrapf(err, "open directory")
|
||||||
}
|
}
|
||||||
|
defer df.Close()
|
||||||
|
|
||||||
dirs, err := blockDirs(dir)
|
dirs, err := blockDirs(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, errors.Wrapf(err, "list block dirs %s", dir)
|
return false, errors.Wrapf(err, "list block dirs %s", dir)
|
||||||
|
@ -374,7 +424,15 @@ func (db *DB) getBlock(id ulid.ULID) (Block, bool) {
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) reloadBlocks() error {
|
func (db *DB) reloadBlocks() (err error) {
|
||||||
|
defer func(t time.Time) {
|
||||||
|
if err != nil {
|
||||||
|
db.metrics.reloadsFailed.Inc()
|
||||||
|
}
|
||||||
|
db.metrics.reloads.Inc()
|
||||||
|
db.metrics.reloadDuration.Observe(time.Since(t).Seconds())
|
||||||
|
}(time.Now())
|
||||||
|
|
||||||
var cs []io.Closer
|
var cs []io.Closer
|
||||||
defer func() { closeAll(cs...) }()
|
defer func() { closeAll(cs...) }()
|
||||||
|
|
||||||
|
@ -418,6 +476,7 @@ func (db *DB) reloadBlocks() error {
|
||||||
if err := validateBlockSequence(blocks); err != nil {
|
if err := validateBlockSequence(blocks); err != nil {
|
||||||
return errors.Wrap(err, "invalid block sequence")
|
return errors.Wrap(err, "invalid block sequence")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close all opened blocks that no longer exist after we returned all locks.
|
// Close all opened blocks that no longer exist after we returned all locks.
|
||||||
for _, b := range db.blocks {
|
for _, b := range db.blocks {
|
||||||
if _, ok := exist[b.Meta().ULID]; !ok {
|
if _, ok := exist[b.Meta().ULID]; !ok {
|
||||||
|
@ -447,7 +506,7 @@ func validateBlockSequence(bs []Block) error {
|
||||||
prev := bs[0]
|
prev := bs[0]
|
||||||
for _, b := range bs[1:] {
|
for _, b := range bs[1:] {
|
||||||
if b.Meta().MinTime < prev.Meta().MaxTime {
|
if b.Meta().MinTime < prev.Meta().MaxTime {
|
||||||
return errors.Errorf("block time ranges overlap", b.Meta().MinTime, prev.Meta().MaxTime)
|
return errors.Errorf("block time ranges overlap (%d, %d)", b.Meta().MinTime, prev.Meta().MaxTime)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -478,8 +537,47 @@ func (db *DB) Close() error {
|
||||||
return merr.Err()
|
return merr.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DisableCompactions disables compactions.
|
||||||
|
func (db *DB) DisableCompactions() {
|
||||||
|
if db.compacting {
|
||||||
|
db.cmtx.Lock()
|
||||||
|
db.compacting = false
|
||||||
|
db.logger.Log("msg", "compactions disabled")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// EnableCompactions enables compactions.
|
||||||
|
func (db *DB) EnableCompactions() {
|
||||||
|
if !db.compacting {
|
||||||
|
db.cmtx.Unlock()
|
||||||
|
db.compacting = true
|
||||||
|
db.logger.Log("msg", "compactions enabled")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Snapshot writes the current data to the directory.
|
||||||
|
func (db *DB) Snapshot(dir string) error {
|
||||||
|
db.mtx.Lock() // To block any appenders.
|
||||||
|
defer db.mtx.Unlock()
|
||||||
|
|
||||||
|
db.cmtx.Lock()
|
||||||
|
defer db.cmtx.Unlock()
|
||||||
|
|
||||||
|
blocks := db.blocks[:]
|
||||||
|
for _, b := range blocks {
|
||||||
|
db.logger.Log("msg", "snapshotting block", "block", b)
|
||||||
|
if err := b.Snapshot(dir); err != nil {
|
||||||
|
return errors.Wrap(err, "error snapshotting headblock")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Appender returns a new Appender on the database.
|
// Appender returns a new Appender on the database.
|
||||||
func (db *DB) Appender() Appender {
|
func (db *DB) Appender() Appender {
|
||||||
|
db.metrics.activeAppenders.Inc()
|
||||||
|
|
||||||
db.mtx.RLock()
|
db.mtx.RLock()
|
||||||
return &dbAppender{db: db}
|
return &dbAppender{db: db}
|
||||||
}
|
}
|
||||||
|
@ -619,6 +717,7 @@ func (db *DB) ensureHead(t int64) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *dbAppender) Commit() error {
|
func (a *dbAppender) Commit() error {
|
||||||
|
defer a.db.metrics.activeAppenders.Dec()
|
||||||
defer a.db.mtx.RUnlock()
|
defer a.db.mtx.RUnlock()
|
||||||
|
|
||||||
// Commits to partial appenders must be concurrent as concurrent appenders
|
// Commits to partial appenders must be concurrent as concurrent appenders
|
||||||
|
@ -649,6 +748,7 @@ func (a *dbAppender) Commit() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *dbAppender) Rollback() error {
|
func (a *dbAppender) Rollback() error {
|
||||||
|
defer a.db.metrics.activeAppenders.Dec()
|
||||||
defer a.db.mtx.RUnlock()
|
defer a.db.mtx.RUnlock()
|
||||||
|
|
||||||
var g errgroup.Group
|
var g errgroup.Group
|
||||||
|
@ -660,6 +760,30 @@ func (a *dbAppender) Rollback() error {
|
||||||
return g.Wait()
|
return g.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Delete implements deletion of metrics.
|
||||||
|
func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
|
||||||
|
db.cmtx.Lock()
|
||||||
|
defer db.cmtx.Unlock()
|
||||||
|
db.mtx.Lock()
|
||||||
|
defer db.mtx.Unlock()
|
||||||
|
|
||||||
|
blocks := db.blocksForInterval(mint, maxt)
|
||||||
|
|
||||||
|
var g errgroup.Group
|
||||||
|
|
||||||
|
for _, b := range blocks {
|
||||||
|
g.Go(func(b Block) func() error {
|
||||||
|
return func() error { return b.Delete(mint, maxt, ms...) }
|
||||||
|
}(b))
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := g.Wait(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// appendable returns a copy of a slice of HeadBlocks that can still be appended to.
|
// appendable returns a copy of a slice of HeadBlocks that can still be appended to.
|
||||||
func (db *DB) appendable() (r []headBlock) {
|
func (db *DB) appendable() (r []headBlock) {
|
||||||
switch len(db.heads) {
|
switch len(db.heads) {
|
||||||
|
@ -673,13 +797,8 @@ func (db *DB) appendable() (r []headBlock) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func intervalOverlap(amin, amax, bmin, bmax int64) bool {
|
func intervalOverlap(amin, amax, bmin, bmax int64) bool {
|
||||||
if bmin >= amin && bmin <= amax {
|
// Checks Overlap: http://stackoverflow.com/questions/3269434/
|
||||||
return true
|
return amin <= bmax && bmin <= amax
|
||||||
}
|
|
||||||
if amin >= bmin && amin <= bmax {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func intervalContains(min, max, t int64) bool {
|
func intervalContains(min, max, t int64) bool {
|
||||||
|
|
16
vendor/github.com/prometheus/tsdb/encoding_helpers.go
generated
vendored
16
vendor/github.com/prometheus/tsdb/encoding_helpers.go
generated
vendored
|
@ -22,6 +22,7 @@ func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) }
|
||||||
|
|
||||||
func (e *encbuf) putBE32int(x int) { e.putBE32(uint32(x)) }
|
func (e *encbuf) putBE32int(x int) { e.putBE32(uint32(x)) }
|
||||||
func (e *encbuf) putBE64int(x int) { e.putBE64(uint64(x)) }
|
func (e *encbuf) putBE64int(x int) { e.putBE64(uint64(x)) }
|
||||||
|
func (e *encbuf) putBE64int64(x int64) { e.putBE64(uint64(x)) }
|
||||||
func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) }
|
func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) }
|
||||||
func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) }
|
func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) }
|
||||||
|
|
||||||
|
@ -72,7 +73,9 @@ type decbuf struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *decbuf) uvarint() int { return int(d.uvarint64()) }
|
func (d *decbuf) uvarint() int { return int(d.uvarint64()) }
|
||||||
|
func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) }
|
||||||
func (d *decbuf) be32int() int { return int(d.be32()) }
|
func (d *decbuf) be32int() int { return int(d.be32()) }
|
||||||
|
func (d *decbuf) be64int64() int64 { return int64(d.be64()) }
|
||||||
|
|
||||||
func (d *decbuf) uvarintStr() string {
|
func (d *decbuf) uvarintStr() string {
|
||||||
l := d.uvarint64()
|
l := d.uvarint64()
|
||||||
|
@ -140,6 +143,19 @@ func (d *decbuf) be32() uint32 {
|
||||||
return x
|
return x
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *decbuf) byte() byte {
|
||||||
|
if d.e != nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
if len(d.b) < 1 {
|
||||||
|
d.e = errInvalidSize
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
x := d.b[0]
|
||||||
|
d.b = d.b[1:]
|
||||||
|
return x
|
||||||
|
}
|
||||||
|
|
||||||
func (d *decbuf) decbuf(l int) decbuf {
|
func (d *decbuf) decbuf(l int) decbuf {
|
||||||
if d.e != nil {
|
if d.e != nil {
|
||||||
return decbuf{e: d.e}
|
return decbuf{e: d.e}
|
||||||
|
|
222
vendor/github.com/prometheus/tsdb/head.go
generated
vendored
222
vendor/github.com/prometheus/tsdb/head.go
generated
vendored
|
@ -69,6 +69,8 @@ type HeadBlock struct {
|
||||||
values map[string]stringset // label names to possible values
|
values map[string]stringset // label names to possible values
|
||||||
postings *memPostings // postings lists for terms
|
postings *memPostings // postings lists for terms
|
||||||
|
|
||||||
|
tombstones tombstoneReader
|
||||||
|
|
||||||
meta BlockMeta
|
meta BlockMeta
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,6 +99,7 @@ func TouchHeadBlock(dir string, mint, maxt int64) (string, error) {
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
return dir, renameFile(tmp, dir)
|
return dir, renameFile(tmp, dir)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -115,6 +118,7 @@ func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) {
|
||||||
values: map[string]stringset{},
|
values: map[string]stringset{},
|
||||||
postings: &memPostings{m: make(map[term][]uint32)},
|
postings: &memPostings{m: make(map[term][]uint32)},
|
||||||
meta: *meta,
|
meta: *meta,
|
||||||
|
tombstones: newEmptyTombstoneReader(),
|
||||||
}
|
}
|
||||||
return h, h.init()
|
return h, h.init()
|
||||||
}
|
}
|
||||||
|
@ -122,16 +126,19 @@ func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) {
|
||||||
func (h *HeadBlock) init() error {
|
func (h *HeadBlock) init() error {
|
||||||
r := h.wal.Reader()
|
r := h.wal.Reader()
|
||||||
|
|
||||||
for r.Next() {
|
seriesFunc := func(series []labels.Labels) error {
|
||||||
series, samples := r.At()
|
|
||||||
|
|
||||||
for _, lset := range series {
|
for _, lset := range series {
|
||||||
h.create(lset.Hash(), lset)
|
h.create(lset.Hash(), lset)
|
||||||
h.meta.Stats.NumSeries++
|
h.meta.Stats.NumSeries++
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
samplesFunc := func(samples []RefSample) error {
|
||||||
for _, s := range samples {
|
for _, s := range samples {
|
||||||
if int(s.Ref) >= len(h.series) {
|
if int(s.Ref) >= len(h.series) {
|
||||||
return errors.Errorf("unknown series reference %d (max %d); abort WAL restore", s.Ref, len(h.series))
|
return errors.Errorf("unknown series reference %d (max %d); abort WAL restore",
|
||||||
|
s.Ref, len(h.series))
|
||||||
}
|
}
|
||||||
h.series[s.Ref].append(s.T, s.V)
|
h.series[s.Ref].append(s.T, s.V)
|
||||||
|
|
||||||
|
@ -140,8 +147,24 @@ func (h *HeadBlock) init() error {
|
||||||
}
|
}
|
||||||
h.meta.Stats.NumSamples++
|
h.meta.Stats.NumSamples++
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
return errors.Wrap(r.Err(), "consume WAL")
|
deletesFunc := func(stones []Stone) error {
|
||||||
|
for _, s := range stones {
|
||||||
|
for _, itv := range s.intervals {
|
||||||
|
h.tombstones.add(s.ref, itv)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil {
|
||||||
|
return errors.Wrap(err, "consume WAL")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// inBounds returns true if the given timestamp is within the valid
|
// inBounds returns true if the given timestamp is within the valid
|
||||||
|
@ -195,6 +218,114 @@ func (h *HeadBlock) Meta() BlockMeta {
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Tombstones returns the TombstoneReader against the block.
|
||||||
|
func (h *HeadBlock) Tombstones() TombstoneReader {
|
||||||
|
return h.tombstones
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete implements headBlock.
|
||||||
|
func (h *HeadBlock) Delete(mint int64, maxt int64, ms ...labels.Matcher) error {
|
||||||
|
ir := h.Index()
|
||||||
|
|
||||||
|
pr := newPostingsReader(ir)
|
||||||
|
p, absent := pr.Select(ms...)
|
||||||
|
|
||||||
|
var stones []Stone
|
||||||
|
|
||||||
|
Outer:
|
||||||
|
for p.Next() {
|
||||||
|
ref := p.At()
|
||||||
|
lset := h.series[ref].lset
|
||||||
|
for _, abs := range absent {
|
||||||
|
if lset.Get(abs) != "" {
|
||||||
|
continue Outer
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete only until the current values and not beyond.
|
||||||
|
tmin, tmax := clampInterval(mint, maxt, h.series[ref].chunks[0].minTime, h.series[ref].head().maxTime)
|
||||||
|
stones = append(stones, Stone{ref, intervals{{tmin, tmax}}})
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.Err() != nil {
|
||||||
|
return p.Err()
|
||||||
|
}
|
||||||
|
if err := h.wal.LogDeletes(stones); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, s := range stones {
|
||||||
|
h.tombstones.add(s.ref, s.intervals[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
h.meta.Stats.NumTombstones = uint64(len(h.tombstones))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Snapshot persists the current state of the headblock to the given directory.
|
||||||
|
// TODO(gouthamve): Snapshot must be called when there are no active appenders.
|
||||||
|
// This has been ensured by acquiring a Lock on DB.mtx, but this limitation should
|
||||||
|
// be removed in the future.
|
||||||
|
func (h *HeadBlock) Snapshot(snapshotDir string) error {
|
||||||
|
if h.meta.Stats.NumSeries == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
uid := ulid.MustNew(ulid.Now(), entropy)
|
||||||
|
|
||||||
|
dir := filepath.Join(snapshotDir, uid.String())
|
||||||
|
tmp := dir + ".tmp"
|
||||||
|
|
||||||
|
if err := os.RemoveAll(tmp); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := os.MkdirAll(tmp, 0777); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Populate chunk and index files into temporary directory with
|
||||||
|
// data of all blocks.
|
||||||
|
chunkw, err := newChunkWriter(chunkDir(tmp))
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "open chunk writer")
|
||||||
|
}
|
||||||
|
indexw, err := newIndexWriter(tmp)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "open index writer")
|
||||||
|
}
|
||||||
|
|
||||||
|
meta, err := populateBlock([]Block{h}, indexw, chunkw)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "write snapshot")
|
||||||
|
}
|
||||||
|
meta.ULID = uid
|
||||||
|
|
||||||
|
if err = writeMetaFile(tmp, meta); err != nil {
|
||||||
|
return errors.Wrap(err, "write merged meta")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = chunkw.Close(); err != nil {
|
||||||
|
return errors.Wrap(err, "close chunk writer")
|
||||||
|
}
|
||||||
|
if err = indexw.Close(); err != nil {
|
||||||
|
return errors.Wrap(err, "close index writer")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create an empty tombstones file.
|
||||||
|
if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil {
|
||||||
|
return errors.Wrap(err, "write new tombstones file")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Block successfully written, make visible
|
||||||
|
if err := renameFile(tmp, dir); err != nil {
|
||||||
|
return errors.Wrap(err, "rename block dir")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Dir returns the directory of the block.
|
// Dir returns the directory of the block.
|
||||||
func (h *HeadBlock) Dir() string { return h.dir }
|
func (h *HeadBlock) Dir() string { return h.dir }
|
||||||
|
|
||||||
|
@ -221,6 +352,8 @@ func (h *HeadBlock) Querier(mint, maxt int64) Querier {
|
||||||
maxt: maxt,
|
maxt: maxt,
|
||||||
index: h.Index(),
|
index: h.Index(),
|
||||||
chunks: h.Chunks(),
|
chunks: h.Chunks(),
|
||||||
|
tombstones: h.Tombstones(),
|
||||||
|
|
||||||
postingsMapper: func(p Postings) Postings {
|
postingsMapper: func(p Postings) Postings {
|
||||||
ep := make([]uint32, 0, 64)
|
ep := make([]uint32, 0, 64)
|
||||||
|
|
||||||
|
@ -388,15 +521,17 @@ func (a *headAppender) AddFast(ref string, t int64, v float64) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *headAppender) createSeries() {
|
func (a *headAppender) createSeries() error {
|
||||||
if len(a.newSeries) == 0 {
|
if len(a.newSeries) == 0 {
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
a.newLabels = make([]labels.Labels, 0, len(a.newSeries))
|
a.newLabels = make([]labels.Labels, 0, len(a.newSeries))
|
||||||
base0 := len(a.series)
|
base0 := len(a.series)
|
||||||
|
|
||||||
a.mtx.RUnlock()
|
a.mtx.RUnlock()
|
||||||
|
defer a.mtx.RLock()
|
||||||
a.mtx.Lock()
|
a.mtx.Lock()
|
||||||
|
defer a.mtx.Unlock()
|
||||||
|
|
||||||
base1 := len(a.series)
|
base1 := len(a.series)
|
||||||
|
|
||||||
|
@ -416,15 +551,22 @@ func (a *headAppender) createSeries() {
|
||||||
a.create(l.hash, l.labels)
|
a.create(l.hash, l.labels)
|
||||||
}
|
}
|
||||||
|
|
||||||
a.mtx.Unlock()
|
// Write all new series to the WAL.
|
||||||
a.mtx.RLock()
|
if err := a.wal.LogSeries(a.newLabels); err != nil {
|
||||||
|
return errors.Wrap(err, "WAL log series")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *headAppender) Commit() error {
|
func (a *headAppender) Commit() error {
|
||||||
defer atomic.AddUint64(&a.activeWriters, ^uint64(0))
|
defer atomic.AddUint64(&a.activeWriters, ^uint64(0))
|
||||||
defer putHeadAppendBuffer(a.samples)
|
defer putHeadAppendBuffer(a.samples)
|
||||||
|
defer a.mtx.RUnlock()
|
||||||
|
|
||||||
a.createSeries()
|
if err := a.createSeries(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// We have to update the refs of samples for series we just created.
|
// We have to update the refs of samples for series we just created.
|
||||||
for i := range a.samples {
|
for i := range a.samples {
|
||||||
|
@ -434,11 +576,10 @@ func (a *headAppender) Commit() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write all new series and samples to the WAL and add it to the
|
// Write all new samples to the WAL and add them to the
|
||||||
// in-mem database on success.
|
// in-mem database on success.
|
||||||
if err := a.wal.Log(a.newLabels, a.samples); err != nil {
|
if err := a.wal.LogSamples(a.samples); err != nil {
|
||||||
a.mtx.RUnlock()
|
return errors.Wrap(err, "WAL log samples")
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
total := uint64(len(a.samples))
|
total := uint64(len(a.samples))
|
||||||
|
@ -449,8 +590,6 @@ func (a *headAppender) Commit() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
a.mtx.RUnlock()
|
|
||||||
|
|
||||||
atomic.AddUint64(&a.meta.Stats.NumSamples, total)
|
atomic.AddUint64(&a.meta.Stats.NumSamples, total)
|
||||||
atomic.AddUint64(&a.meta.Stats.NumSeries, uint64(len(a.newSeries)))
|
atomic.AddUint64(&a.meta.Stats.NumSeries, uint64(len(a.newSeries)))
|
||||||
|
|
||||||
|
@ -538,6 +677,7 @@ func (h *headIndexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error
|
||||||
if int(ref) >= len(h.series) {
|
if int(ref) >= len(h.series) {
|
||||||
return nil, nil, ErrNotFound
|
return nil, nil, ErrNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
s := h.series[ref]
|
s := h.series[ref]
|
||||||
if s == nil {
|
if s == nil {
|
||||||
return nil, nil, ErrNotFound
|
return nil, nil, ErrNotFound
|
||||||
|
@ -584,12 +724,7 @@ func (h *HeadBlock) get(hash uint64, lset labels.Labels) *memSeries {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *HeadBlock) create(hash uint64, lset labels.Labels) *memSeries {
|
func (h *HeadBlock) create(hash uint64, lset labels.Labels) *memSeries {
|
||||||
s := &memSeries{
|
s := newMemSeries(lset, uint32(len(h.series)), h.meta.MaxTime)
|
||||||
lset: lset,
|
|
||||||
ref: uint32(len(h.series)),
|
|
||||||
}
|
|
||||||
// create the initial chunk and appender
|
|
||||||
s.cut()
|
|
||||||
|
|
||||||
// Allocate empty space until we can insert at the given index.
|
// Allocate empty space until we can insert at the given index.
|
||||||
h.series = append(h.series, s)
|
h.series = append(h.series, s)
|
||||||
|
@ -624,15 +759,18 @@ type memSeries struct {
|
||||||
lset labels.Labels
|
lset labels.Labels
|
||||||
chunks []*memChunk
|
chunks []*memChunk
|
||||||
|
|
||||||
|
nextAt int64 // timestamp at which to cut the next chunk.
|
||||||
|
maxt int64 // maximum timestamp for the series.
|
||||||
lastValue float64
|
lastValue float64
|
||||||
sampleBuf [4]sample
|
sampleBuf [4]sample
|
||||||
|
|
||||||
app chunks.Appender // Current appender for the chunk.
|
app chunks.Appender // Current appender for the chunk.
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *memSeries) cut() *memChunk {
|
func (s *memSeries) cut(mint int64) *memChunk {
|
||||||
c := &memChunk{
|
c := &memChunk{
|
||||||
chunk: chunks.NewXORChunk(),
|
chunk: chunks.NewXORChunk(),
|
||||||
|
minTime: mint,
|
||||||
maxTime: math.MinInt64,
|
maxTime: math.MinInt64,
|
||||||
}
|
}
|
||||||
s.chunks = append(s.chunks, c)
|
s.chunks = append(s.chunks, c)
|
||||||
|
@ -641,32 +779,47 @@ func (s *memSeries) cut() *memChunk {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.app = app
|
s.app = app
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newMemSeries(lset labels.Labels, id uint32, maxt int64) *memSeries {
|
||||||
|
s := &memSeries{
|
||||||
|
lset: lset,
|
||||||
|
ref: id,
|
||||||
|
maxt: maxt,
|
||||||
|
nextAt: math.MinInt64,
|
||||||
|
}
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
func (s *memSeries) append(t int64, v float64) bool {
|
func (s *memSeries) append(t int64, v float64) bool {
|
||||||
|
const samplesPerChunk = 120
|
||||||
|
|
||||||
s.mtx.Lock()
|
s.mtx.Lock()
|
||||||
defer s.mtx.Unlock()
|
defer s.mtx.Unlock()
|
||||||
|
|
||||||
var c *memChunk
|
var c *memChunk
|
||||||
|
|
||||||
if s.head().samples > 130 {
|
if len(s.chunks) == 0 {
|
||||||
c = s.cut()
|
c = s.cut(t)
|
||||||
c.minTime = t
|
}
|
||||||
} else {
|
|
||||||
c = s.head()
|
c = s.head()
|
||||||
// Skip duplicate and out of order samples.
|
|
||||||
if c.maxTime >= t {
|
if c.maxTime >= t {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
if c.samples > samplesPerChunk/4 && t >= s.nextAt {
|
||||||
|
c = s.cut(t)
|
||||||
}
|
}
|
||||||
s.app.Append(t, v)
|
s.app.Append(t, v)
|
||||||
|
|
||||||
c.maxTime = t
|
c.maxTime = t
|
||||||
c.samples++
|
c.samples++
|
||||||
|
|
||||||
|
if c.samples == samplesPerChunk/4 {
|
||||||
|
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.maxt)
|
||||||
|
}
|
||||||
|
|
||||||
s.lastValue = v
|
s.lastValue = v
|
||||||
|
|
||||||
s.sampleBuf[0] = s.sampleBuf[1]
|
s.sampleBuf[0] = s.sampleBuf[1]
|
||||||
|
@ -677,6 +830,17 @@ func (s *memSeries) append(t int64, v float64) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// computeChunkEndTime estimates the end timestamp based the beginning of a chunk,
|
||||||
|
// its current timestamp and the upper bound up to which we insert data.
|
||||||
|
// It assumes that the time range is 1/4 full.
|
||||||
|
func computeChunkEndTime(start, cur, max int64) int64 {
|
||||||
|
a := (max - start) / ((cur - start + 1) * 4)
|
||||||
|
if a == 0 {
|
||||||
|
return max
|
||||||
|
}
|
||||||
|
return start + (max-start)/a
|
||||||
|
}
|
||||||
|
|
||||||
func (s *memSeries) iterator(i int) chunks.Iterator {
|
func (s *memSeries) iterator(i int) chunks.Iterator {
|
||||||
c := s.chunks[i]
|
c := s.chunks[i]
|
||||||
|
|
||||||
|
|
10
vendor/github.com/prometheus/tsdb/index.go
generated
vendored
10
vendor/github.com/prometheus/tsdb/index.go
generated
vendored
|
@ -39,6 +39,8 @@ const (
|
||||||
indexFormatV1 = 1
|
indexFormatV1 = 1
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const indexFilename = "index"
|
||||||
|
|
||||||
const compactionPageBytes = minSectorSize * 64
|
const compactionPageBytes = minSectorSize * 64
|
||||||
|
|
||||||
type indexWriterSeries struct {
|
type indexWriterSeries struct {
|
||||||
|
@ -138,7 +140,7 @@ func newIndexWriter(dir string) (*indexWriter, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
f, err := os.OpenFile(filepath.Join(dir, "index"), os.O_CREATE|os.O_WRONLY, 0666)
|
f, err := os.OpenFile(filepath.Join(dir, indexFilename), os.O_CREATE|os.O_WRONLY, 0666)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -569,11 +571,7 @@ func newIndexReader(dir string) (*indexReader, error) {
|
||||||
return nil, errors.Wrap(err, "read label index table")
|
return nil, errors.Wrap(err, "read label index table")
|
||||||
}
|
}
|
||||||
r.postings, err = r.readOffsetTable(r.toc.postingsTable)
|
r.postings, err = r.readOffsetTable(r.toc.postingsTable)
|
||||||
if err != nil {
|
return r, errors.Wrap(err, "read postings table")
|
||||||
return nil, errors.Wrap(err, "read postings table")
|
|
||||||
}
|
|
||||||
|
|
||||||
return r, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *indexReader) readTOC() error {
|
func (r *indexReader) readTOC() error {
|
||||||
|
|
69
vendor/github.com/prometheus/tsdb/querier.go
generated
vendored
69
vendor/github.com/prometheus/tsdb/querier.go
generated
vendored
|
@ -128,6 +128,7 @@ func (q *querier) Close() error {
|
||||||
type blockQuerier struct {
|
type blockQuerier struct {
|
||||||
index IndexReader
|
index IndexReader
|
||||||
chunks ChunkReader
|
chunks ChunkReader
|
||||||
|
tombstones TombstoneReader
|
||||||
|
|
||||||
postingsMapper func(Postings) Postings
|
postingsMapper func(Postings) Postings
|
||||||
|
|
||||||
|
@ -149,6 +150,8 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
|
||||||
p: p,
|
p: p,
|
||||||
index: q.index,
|
index: q.index,
|
||||||
absent: absent,
|
absent: absent,
|
||||||
|
|
||||||
|
tombstones: q.tombstones,
|
||||||
},
|
},
|
||||||
chunks: q.chunks,
|
chunks: q.chunks,
|
||||||
mint: q.mint,
|
mint: q.mint,
|
||||||
|
@ -366,7 +369,7 @@ func (s *mergedSeriesSet) Next() bool {
|
||||||
|
|
||||||
type chunkSeriesSet interface {
|
type chunkSeriesSet interface {
|
||||||
Next() bool
|
Next() bool
|
||||||
At() (labels.Labels, []*ChunkMeta)
|
At() (labels.Labels, []*ChunkMeta, intervals)
|
||||||
Err() error
|
Err() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -375,20 +378,26 @@ type chunkSeriesSet interface {
|
||||||
type baseChunkSeries struct {
|
type baseChunkSeries struct {
|
||||||
p Postings
|
p Postings
|
||||||
index IndexReader
|
index IndexReader
|
||||||
|
tombstones TombstoneReader
|
||||||
absent []string // labels that must be unset in results.
|
absent []string // labels that must be unset in results.
|
||||||
|
|
||||||
lset labels.Labels
|
lset labels.Labels
|
||||||
chks []*ChunkMeta
|
chks []*ChunkMeta
|
||||||
|
intervals intervals
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *baseChunkSeries) At() (labels.Labels, []*ChunkMeta) { return s.lset, s.chks }
|
func (s *baseChunkSeries) At() (labels.Labels, []*ChunkMeta, intervals) {
|
||||||
|
return s.lset, s.chks, s.intervals
|
||||||
|
}
|
||||||
|
|
||||||
func (s *baseChunkSeries) Err() error { return s.err }
|
func (s *baseChunkSeries) Err() error { return s.err }
|
||||||
|
|
||||||
func (s *baseChunkSeries) Next() bool {
|
func (s *baseChunkSeries) Next() bool {
|
||||||
Outer:
|
Outer:
|
||||||
for s.p.Next() {
|
for s.p.Next() {
|
||||||
lset, chunks, err := s.index.Series(s.p.At())
|
ref := s.p.At()
|
||||||
|
lset, chunks, err := s.index.Series(ref)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.err = err
|
s.err = err
|
||||||
return false
|
return false
|
||||||
|
@ -403,6 +412,19 @@ Outer:
|
||||||
|
|
||||||
s.lset = lset
|
s.lset = lset
|
||||||
s.chks = chunks
|
s.chks = chunks
|
||||||
|
s.intervals = s.tombstones.Get(s.p.At())
|
||||||
|
|
||||||
|
if len(s.intervals) > 0 {
|
||||||
|
// Only those chunks that are not entirely deleted.
|
||||||
|
chks := make([]*ChunkMeta, 0, len(s.chks))
|
||||||
|
for _, chk := range s.chks {
|
||||||
|
if !(interval{chk.MinTime, chk.MaxTime}.isSubrange(s.intervals)) {
|
||||||
|
chks = append(chks, chk)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
s.chks = chks
|
||||||
|
}
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
@ -423,14 +445,17 @@ type populatedChunkSeries struct {
|
||||||
err error
|
err error
|
||||||
chks []*ChunkMeta
|
chks []*ChunkMeta
|
||||||
lset labels.Labels
|
lset labels.Labels
|
||||||
|
intervals intervals
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *populatedChunkSeries) At() (labels.Labels, []*ChunkMeta) { return s.lset, s.chks }
|
func (s *populatedChunkSeries) At() (labels.Labels, []*ChunkMeta, intervals) {
|
||||||
|
return s.lset, s.chks, s.intervals
|
||||||
|
}
|
||||||
func (s *populatedChunkSeries) Err() error { return s.err }
|
func (s *populatedChunkSeries) Err() error { return s.err }
|
||||||
|
|
||||||
func (s *populatedChunkSeries) Next() bool {
|
func (s *populatedChunkSeries) Next() bool {
|
||||||
for s.set.Next() {
|
for s.set.Next() {
|
||||||
lset, chks := s.set.At()
|
lset, chks, dranges := s.set.At()
|
||||||
|
|
||||||
for len(chks) > 0 {
|
for len(chks) > 0 {
|
||||||
if chks[0].MaxTime >= s.mint {
|
if chks[0].MaxTime >= s.mint {
|
||||||
|
@ -457,6 +482,7 @@ func (s *populatedChunkSeries) Next() bool {
|
||||||
|
|
||||||
s.lset = lset
|
s.lset = lset
|
||||||
s.chks = chks
|
s.chks = chks
|
||||||
|
s.intervals = dranges
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
@ -477,8 +503,15 @@ type blockSeriesSet struct {
|
||||||
|
|
||||||
func (s *blockSeriesSet) Next() bool {
|
func (s *blockSeriesSet) Next() bool {
|
||||||
for s.set.Next() {
|
for s.set.Next() {
|
||||||
lset, chunks := s.set.At()
|
lset, chunks, dranges := s.set.At()
|
||||||
s.cur = &chunkSeries{labels: lset, chunks: chunks, mint: s.mint, maxt: s.maxt}
|
s.cur = &chunkSeries{
|
||||||
|
labels: lset,
|
||||||
|
chunks: chunks,
|
||||||
|
mint: s.mint,
|
||||||
|
maxt: s.maxt,
|
||||||
|
|
||||||
|
intervals: dranges,
|
||||||
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if s.set.Err() != nil {
|
if s.set.Err() != nil {
|
||||||
|
@ -497,6 +530,8 @@ type chunkSeries struct {
|
||||||
chunks []*ChunkMeta // in-order chunk refs
|
chunks []*ChunkMeta // in-order chunk refs
|
||||||
|
|
||||||
mint, maxt int64
|
mint, maxt int64
|
||||||
|
|
||||||
|
intervals intervals
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *chunkSeries) Labels() labels.Labels {
|
func (s *chunkSeries) Labels() labels.Labels {
|
||||||
|
@ -504,7 +539,7 @@ func (s *chunkSeries) Labels() labels.Labels {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *chunkSeries) Iterator() SeriesIterator {
|
func (s *chunkSeries) Iterator() SeriesIterator {
|
||||||
return newChunkSeriesIterator(s.chunks, s.mint, s.maxt)
|
return newChunkSeriesIterator(s.chunks, s.intervals, s.mint, s.maxt)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SeriesIterator iterates over the data of a time series.
|
// SeriesIterator iterates over the data of a time series.
|
||||||
|
@ -601,16 +636,24 @@ type chunkSeriesIterator struct {
|
||||||
cur chunks.Iterator
|
cur chunks.Iterator
|
||||||
|
|
||||||
maxt, mint int64
|
maxt, mint int64
|
||||||
|
|
||||||
|
intervals intervals
|
||||||
}
|
}
|
||||||
|
|
||||||
func newChunkSeriesIterator(cs []*ChunkMeta, mint, maxt int64) *chunkSeriesIterator {
|
func newChunkSeriesIterator(cs []*ChunkMeta, dranges intervals, mint, maxt int64) *chunkSeriesIterator {
|
||||||
|
it := cs[0].Chunk.Iterator()
|
||||||
|
if len(dranges) > 0 {
|
||||||
|
it = &deletedIterator{it: it, intervals: dranges}
|
||||||
|
}
|
||||||
return &chunkSeriesIterator{
|
return &chunkSeriesIterator{
|
||||||
chunks: cs,
|
chunks: cs,
|
||||||
i: 0,
|
i: 0,
|
||||||
cur: cs[0].Chunk.Iterator(),
|
cur: it,
|
||||||
|
|
||||||
mint: mint,
|
mint: mint,
|
||||||
maxt: maxt,
|
maxt: maxt,
|
||||||
|
|
||||||
|
intervals: dranges,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -645,6 +688,9 @@ func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
|
||||||
|
|
||||||
it.i = x
|
it.i = x
|
||||||
it.cur = it.chunks[x].Chunk.Iterator()
|
it.cur = it.chunks[x].Chunk.Iterator()
|
||||||
|
if len(it.intervals) > 0 {
|
||||||
|
it.cur = &deletedIterator{it: it.cur, intervals: it.intervals}
|
||||||
|
}
|
||||||
|
|
||||||
for it.cur.Next() {
|
for it.cur.Next() {
|
||||||
t0, _ := it.cur.At()
|
t0, _ := it.cur.At()
|
||||||
|
@ -676,6 +722,9 @@ func (it *chunkSeriesIterator) Next() bool {
|
||||||
|
|
||||||
it.i++
|
it.i++
|
||||||
it.cur = it.chunks[it.i].Chunk.Iterator()
|
it.cur = it.chunks[it.i].Chunk.Iterator()
|
||||||
|
if len(it.intervals) > 0 {
|
||||||
|
it.cur = &deletedIterator{it: it.cur, intervals: it.intervals}
|
||||||
|
}
|
||||||
|
|
||||||
return it.Next()
|
return it.Next()
|
||||||
}
|
}
|
||||||
|
|
223
vendor/github.com/prometheus/tsdb/tombstones.go
generated
vendored
Normal file
223
vendor/github.com/prometheus/tsdb/tombstones.go
generated
vendored
Normal file
|
@ -0,0 +1,223 @@
|
||||||
|
// Copyright 2017 The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package tsdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
|
"hash/crc32"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
const tombstoneFilename = "tombstones"
|
||||||
|
|
||||||
|
const (
|
||||||
|
// MagicTombstone is 4 bytes at the head of a tombstone file.
|
||||||
|
MagicTombstone = 0x130BA30
|
||||||
|
|
||||||
|
tombstoneFormatV1 = 1
|
||||||
|
)
|
||||||
|
|
||||||
|
func writeTombstoneFile(dir string, tr tombstoneReader) error {
|
||||||
|
path := filepath.Join(dir, tombstoneFilename)
|
||||||
|
tmp := path + ".tmp"
|
||||||
|
hash := crc32.New(crc32.MakeTable(crc32.Castagnoli))
|
||||||
|
|
||||||
|
f, err := os.Create(tmp)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
buf := encbuf{b: make([]byte, 3*binary.MaxVarintLen64)}
|
||||||
|
buf.reset()
|
||||||
|
// Write the meta.
|
||||||
|
buf.putBE32(MagicTombstone)
|
||||||
|
buf.putByte(tombstoneFormatV1)
|
||||||
|
_, err = f.Write(buf.get())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
mw := io.MultiWriter(f, hash)
|
||||||
|
|
||||||
|
for k, v := range tr {
|
||||||
|
for _, itv := range v {
|
||||||
|
buf.reset()
|
||||||
|
buf.putUvarint32(k)
|
||||||
|
buf.putVarint64(itv.mint)
|
||||||
|
buf.putVarint64(itv.maxt)
|
||||||
|
|
||||||
|
_, err = mw.Write(buf.get())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = f.Write(hash.Sum(nil))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return renameFile(tmp, path)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stone holds the information on the posting and time-range
|
||||||
|
// that is deleted.
|
||||||
|
type Stone struct {
|
||||||
|
ref uint32
|
||||||
|
intervals intervals
|
||||||
|
}
|
||||||
|
|
||||||
|
// TombstoneReader is the iterator over tombstones.
|
||||||
|
type TombstoneReader interface {
|
||||||
|
Get(ref uint32) intervals
|
||||||
|
}
|
||||||
|
|
||||||
|
func readTombstones(dir string) (tombstoneReader, error) {
|
||||||
|
b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(b) < 5 {
|
||||||
|
return nil, errors.Wrap(errInvalidSize, "tombstones header")
|
||||||
|
}
|
||||||
|
|
||||||
|
d := &decbuf{b: b[:len(b)-4]} // 4 for the checksum.
|
||||||
|
if mg := d.be32(); mg != MagicTombstone {
|
||||||
|
return nil, fmt.Errorf("invalid magic number %x", mg)
|
||||||
|
}
|
||||||
|
if flag := d.byte(); flag != tombstoneFormatV1 {
|
||||||
|
return nil, fmt.Errorf("invalid tombstone format %x", flag)
|
||||||
|
}
|
||||||
|
|
||||||
|
if d.err() != nil {
|
||||||
|
return nil, d.err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify checksum
|
||||||
|
hash := crc32.New(crc32.MakeTable(crc32.Castagnoli))
|
||||||
|
if _, err := hash.Write(d.get()); err != nil {
|
||||||
|
return nil, errors.Wrap(err, "write to hash")
|
||||||
|
}
|
||||||
|
if binary.BigEndian.Uint32(b[len(b)-4:]) != hash.Sum32() {
|
||||||
|
return nil, errors.New("checksum did not match")
|
||||||
|
}
|
||||||
|
|
||||||
|
stonesMap := newEmptyTombstoneReader()
|
||||||
|
for d.len() > 0 {
|
||||||
|
k := d.uvarint32()
|
||||||
|
mint := d.varint64()
|
||||||
|
maxt := d.varint64()
|
||||||
|
if d.err() != nil {
|
||||||
|
return nil, d.err()
|
||||||
|
}
|
||||||
|
|
||||||
|
stonesMap.add(k, interval{mint, maxt})
|
||||||
|
}
|
||||||
|
|
||||||
|
return newTombstoneReader(stonesMap), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type tombstoneReader map[uint32]intervals
|
||||||
|
|
||||||
|
func newTombstoneReader(ts map[uint32]intervals) tombstoneReader {
|
||||||
|
return tombstoneReader(ts)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newEmptyTombstoneReader() tombstoneReader {
|
||||||
|
return tombstoneReader(make(map[uint32]intervals))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t tombstoneReader) Get(ref uint32) intervals {
|
||||||
|
return t[ref]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t tombstoneReader) add(ref uint32, itv interval) {
|
||||||
|
t[ref] = t[ref].add(itv)
|
||||||
|
}
|
||||||
|
|
||||||
|
type interval struct {
|
||||||
|
mint, maxt int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tr interval) inBounds(t int64) bool {
|
||||||
|
return t >= tr.mint && t <= tr.maxt
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tr interval) isSubrange(dranges intervals) bool {
|
||||||
|
for _, r := range dranges {
|
||||||
|
if r.inBounds(tr.mint) && r.inBounds(tr.maxt) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
type intervals []interval
|
||||||
|
|
||||||
|
// This adds the new time-range to the existing ones.
|
||||||
|
// The existing ones must be sorted.
|
||||||
|
func (itvs intervals) add(n interval) intervals {
|
||||||
|
for i, r := range itvs {
|
||||||
|
// TODO(gouthamve): Make this codepath easier to digest.
|
||||||
|
if r.inBounds(n.mint-1) || r.inBounds(n.mint) {
|
||||||
|
if n.maxt > r.maxt {
|
||||||
|
itvs[i].maxt = n.maxt
|
||||||
|
}
|
||||||
|
|
||||||
|
j := 0
|
||||||
|
for _, r2 := range itvs[i+1:] {
|
||||||
|
if n.maxt < r2.mint {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
j++
|
||||||
|
}
|
||||||
|
if j != 0 {
|
||||||
|
if itvs[i+j].maxt > n.maxt {
|
||||||
|
itvs[i].maxt = itvs[i+j].maxt
|
||||||
|
}
|
||||||
|
itvs = append(itvs[:i+1], itvs[i+j+1:]...)
|
||||||
|
}
|
||||||
|
return itvs
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.inBounds(n.maxt+1) || r.inBounds(n.maxt) {
|
||||||
|
if n.mint < r.maxt {
|
||||||
|
itvs[i].mint = n.mint
|
||||||
|
}
|
||||||
|
return itvs
|
||||||
|
}
|
||||||
|
|
||||||
|
if n.mint < r.mint {
|
||||||
|
newRange := make(intervals, i, len(itvs[:i])+1)
|
||||||
|
copy(newRange, itvs[:i])
|
||||||
|
newRange = append(newRange, n)
|
||||||
|
newRange = append(newRange, itvs[i:]...)
|
||||||
|
|
||||||
|
return newRange
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
itvs = append(itvs, n)
|
||||||
|
return itvs
|
||||||
|
}
|
186
vendor/github.com/prometheus/tsdb/wal.go
generated
vendored
186
vendor/github.com/prometheus/tsdb/wal.go
generated
vendored
|
@ -46,8 +46,18 @@ const (
|
||||||
WALEntrySymbols WALEntryType = 1
|
WALEntrySymbols WALEntryType = 1
|
||||||
WALEntrySeries WALEntryType = 2
|
WALEntrySeries WALEntryType = 2
|
||||||
WALEntrySamples WALEntryType = 3
|
WALEntrySamples WALEntryType = 3
|
||||||
|
WALEntryDeletes WALEntryType = 4
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// SamplesCB is the callback after reading samples.
|
||||||
|
type SamplesCB func([]RefSample) error
|
||||||
|
|
||||||
|
// SeriesCB is the callback after reading series.
|
||||||
|
type SeriesCB func([]labels.Labels) error
|
||||||
|
|
||||||
|
// DeletesCB is the callback after reading deletes.
|
||||||
|
type DeletesCB func([]Stone) error
|
||||||
|
|
||||||
// SegmentWAL is a write ahead log for series data.
|
// SegmentWAL is a write ahead log for series data.
|
||||||
type SegmentWAL struct {
|
type SegmentWAL struct {
|
||||||
mtx sync.Mutex
|
mtx sync.Mutex
|
||||||
|
@ -71,15 +81,15 @@ type SegmentWAL struct {
|
||||||
// It must be completely read before new entries are logged.
|
// It must be completely read before new entries are logged.
|
||||||
type WAL interface {
|
type WAL interface {
|
||||||
Reader() WALReader
|
Reader() WALReader
|
||||||
Log([]labels.Labels, []RefSample) error
|
LogSeries([]labels.Labels) error
|
||||||
|
LogSamples([]RefSample) error
|
||||||
|
LogDeletes([]Stone) error
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
// WALReader reads entries from a WAL.
|
// WALReader reads entries from a WAL.
|
||||||
type WALReader interface {
|
type WALReader interface {
|
||||||
At() ([]labels.Labels, []RefSample)
|
Read(SeriesCB, SamplesCB, DeletesCB) error
|
||||||
Next() bool
|
|
||||||
Err() error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// RefSample is a timestamp/value pair associated with a reference to a series.
|
// RefSample is a timestamp/value pair associated with a reference to a series.
|
||||||
|
@ -141,13 +151,40 @@ func (w *SegmentWAL) Reader() WALReader {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Log writes a batch of new series labels and samples to the log.
|
// Log writes a batch of new series labels and samples to the log.
|
||||||
func (w *SegmentWAL) Log(series []labels.Labels, samples []RefSample) error {
|
//func (w *SegmentWAL) Log(series []labels.Labels, samples []RefSample) error {
|
||||||
|
//return nil
|
||||||
|
//}
|
||||||
|
|
||||||
|
// LogSeries writes a batch of new series labels to the log.
|
||||||
|
func (w *SegmentWAL) LogSeries(series []labels.Labels) error {
|
||||||
if err := w.encodeSeries(series); err != nil {
|
if err := w.encodeSeries(series); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if w.flushInterval <= 0 {
|
||||||
|
return w.Sync()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// LogSamples writes a batch of new samples to the log.
|
||||||
|
func (w *SegmentWAL) LogSamples(samples []RefSample) error {
|
||||||
if err := w.encodeSamples(samples); err != nil {
|
if err := w.encodeSamples(samples); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if w.flushInterval <= 0 {
|
||||||
|
return w.Sync()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// LogDeletes write a batch of new deletes to the log.
|
||||||
|
func (w *SegmentWAL) LogDeletes(stones []Stone) error {
|
||||||
|
if err := w.encodeDeletes(stones); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
if w.flushInterval <= 0 {
|
if w.flushInterval <= 0 {
|
||||||
return w.Sync()
|
return w.Sync()
|
||||||
}
|
}
|
||||||
|
@ -369,6 +406,7 @@ func (w *SegmentWAL) entry(et WALEntryType, flag byte, buf []byte) error {
|
||||||
const (
|
const (
|
||||||
walSeriesSimple = 1
|
walSeriesSimple = 1
|
||||||
walSamplesSimple = 1
|
walSamplesSimple = 1
|
||||||
|
walDeletesSimple = 1
|
||||||
)
|
)
|
||||||
|
|
||||||
var walBuffers = sync.Pool{}
|
var walBuffers = sync.Pool{}
|
||||||
|
@ -445,6 +483,23 @@ func (w *SegmentWAL) encodeSamples(samples []RefSample) error {
|
||||||
return w.entry(WALEntrySamples, walSamplesSimple, buf)
|
return w.entry(WALEntrySamples, walSamplesSimple, buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *SegmentWAL) encodeDeletes(stones []Stone) error {
|
||||||
|
b := make([]byte, 2*binary.MaxVarintLen64)
|
||||||
|
eb := &encbuf{b: b}
|
||||||
|
buf := getWALBuffer()
|
||||||
|
for _, s := range stones {
|
||||||
|
for _, itv := range s.intervals {
|
||||||
|
eb.reset()
|
||||||
|
eb.putUvarint32(s.ref)
|
||||||
|
eb.putVarint64(itv.mint)
|
||||||
|
eb.putVarint64(itv.maxt)
|
||||||
|
buf = append(buf, eb.get()...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return w.entry(WALEntryDeletes, walDeletesSimple, buf)
|
||||||
|
}
|
||||||
|
|
||||||
// walReader decodes and emits write ahead log entries.
|
// walReader decodes and emits write ahead log entries.
|
||||||
type walReader struct {
|
type walReader struct {
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
|
@ -454,9 +509,11 @@ type walReader struct {
|
||||||
buf []byte
|
buf []byte
|
||||||
crc32 hash.Hash32
|
crc32 hash.Hash32
|
||||||
|
|
||||||
|
curType WALEntryType
|
||||||
|
curFlag byte
|
||||||
|
curBuf []byte
|
||||||
|
|
||||||
err error
|
err error
|
||||||
labels []labels.Labels
|
|
||||||
samples []RefSample
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newWALReader(w *SegmentWAL, l log.Logger) *walReader {
|
func newWALReader(w *SegmentWAL, l log.Logger) *walReader {
|
||||||
|
@ -471,18 +528,41 @@ func newWALReader(w *SegmentWAL, l log.Logger) *walReader {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// At returns the last decoded entry of labels or samples.
|
|
||||||
// The returned slices are only valid until the next call to Next(). Their elements
|
|
||||||
// have to be copied to preserve them.
|
|
||||||
func (r *walReader) At() ([]labels.Labels, []RefSample) {
|
|
||||||
return r.labels, r.samples
|
|
||||||
}
|
|
||||||
|
|
||||||
// Err returns the last error the reader encountered.
|
// Err returns the last error the reader encountered.
|
||||||
func (r *walReader) Err() error {
|
func (r *walReader) Err() error {
|
||||||
return r.err
|
return r.err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesCB) error {
|
||||||
|
for r.next() {
|
||||||
|
et, flag, b := r.at()
|
||||||
|
// In decoding below we never return a walCorruptionErr for now.
|
||||||
|
// Those should generally be catched by entry decoding before.
|
||||||
|
switch et {
|
||||||
|
case WALEntrySeries:
|
||||||
|
s, err := r.decodeSeries(flag, b)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
seriesf(s)
|
||||||
|
case WALEntrySamples:
|
||||||
|
s, err := r.decodeSamples(flag, b)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
samplesf(s)
|
||||||
|
case WALEntryDeletes:
|
||||||
|
s, err := r.decodeDeletes(flag, b)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
deletesf(s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.Err()
|
||||||
|
}
|
||||||
|
|
||||||
// nextEntry retrieves the next entry. It is also used as a testing hook.
|
// nextEntry retrieves the next entry. It is also used as a testing hook.
|
||||||
func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) {
|
func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) {
|
||||||
if r.cur >= len(r.wal.files) {
|
if r.cur >= len(r.wal.files) {
|
||||||
|
@ -505,12 +585,13 @@ func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) {
|
||||||
return et, flag, b, err
|
return et, flag, b, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Next returns decodes the next entry pair and returns true
|
func (r *walReader) at() (WALEntryType, byte, []byte) {
|
||||||
// if it was succesful.
|
return r.curType, r.curFlag, r.curBuf
|
||||||
func (r *walReader) Next() bool {
|
}
|
||||||
r.labels = r.labels[:0]
|
|
||||||
r.samples = r.samples[:0]
|
|
||||||
|
|
||||||
|
// next returns decodes the next entry pair and returns true
|
||||||
|
// if it was succesful.
|
||||||
|
func (r *walReader) next() bool {
|
||||||
if r.cur >= len(r.wal.files) {
|
if r.cur >= len(r.wal.files) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
@ -537,7 +618,7 @@ func (r *walReader) Next() bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
r.cur++
|
r.cur++
|
||||||
return r.Next()
|
return r.next()
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.err = err
|
r.err = err
|
||||||
|
@ -548,19 +629,9 @@ func (r *walReader) Next() bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// In decoding below we never return a walCorruptionErr for now.
|
r.curType = et
|
||||||
// Those should generally be catched by entry decoding before.
|
r.curFlag = flag
|
||||||
|
r.curBuf = b
|
||||||
switch et {
|
|
||||||
case WALEntrySamples:
|
|
||||||
if err := r.decodeSamples(flag, b); err != nil {
|
|
||||||
r.err = err
|
|
||||||
}
|
|
||||||
case WALEntrySeries:
|
|
||||||
if err := r.decodeSeries(flag, b); err != nil {
|
|
||||||
r.err = err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return r.err == nil
|
return r.err == nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -617,7 +688,7 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
|
||||||
if etype == 0 {
|
if etype == 0 {
|
||||||
return 0, 0, nil, io.EOF
|
return 0, 0, nil, io.EOF
|
||||||
}
|
}
|
||||||
if etype != WALEntrySeries && etype != WALEntrySamples {
|
if etype != WALEntrySeries && etype != WALEntrySamples && etype != WALEntryDeletes {
|
||||||
return 0, 0, nil, walCorruptionErrf("invalid entry type %d", etype)
|
return 0, 0, nil, walCorruptionErrf("invalid entry type %d", etype)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -644,11 +715,12 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
|
||||||
return etype, flag, buf, nil
|
return etype, flag, buf, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *walReader) decodeSeries(flag byte, b []byte) error {
|
func (r *walReader) decodeSeries(flag byte, b []byte) ([]labels.Labels, error) {
|
||||||
|
series := []labels.Labels{}
|
||||||
for len(b) > 0 {
|
for len(b) > 0 {
|
||||||
l, n := binary.Uvarint(b)
|
l, n := binary.Uvarint(b)
|
||||||
if n < 1 {
|
if n < 1 {
|
||||||
return errors.Wrap(errInvalidSize, "number of labels")
|
return nil, errors.Wrap(errInvalidSize, "number of labels")
|
||||||
}
|
}
|
||||||
b = b[n:]
|
b = b[n:]
|
||||||
lset := make(labels.Labels, l)
|
lset := make(labels.Labels, l)
|
||||||
|
@ -656,27 +728,29 @@ func (r *walReader) decodeSeries(flag byte, b []byte) error {
|
||||||
for i := 0; i < int(l); i++ {
|
for i := 0; i < int(l); i++ {
|
||||||
nl, n := binary.Uvarint(b)
|
nl, n := binary.Uvarint(b)
|
||||||
if n < 1 || len(b) < n+int(nl) {
|
if n < 1 || len(b) < n+int(nl) {
|
||||||
return errors.Wrap(errInvalidSize, "label name")
|
return nil, errors.Wrap(errInvalidSize, "label name")
|
||||||
}
|
}
|
||||||
lset[i].Name = string(b[n : n+int(nl)])
|
lset[i].Name = string(b[n : n+int(nl)])
|
||||||
b = b[n+int(nl):]
|
b = b[n+int(nl):]
|
||||||
|
|
||||||
vl, n := binary.Uvarint(b)
|
vl, n := binary.Uvarint(b)
|
||||||
if n < 1 || len(b) < n+int(vl) {
|
if n < 1 || len(b) < n+int(vl) {
|
||||||
return errors.Wrap(errInvalidSize, "label value")
|
return nil, errors.Wrap(errInvalidSize, "label value")
|
||||||
}
|
}
|
||||||
lset[i].Value = string(b[n : n+int(vl)])
|
lset[i].Value = string(b[n : n+int(vl)])
|
||||||
b = b[n+int(vl):]
|
b = b[n+int(vl):]
|
||||||
}
|
}
|
||||||
|
|
||||||
r.labels = append(r.labels, lset)
|
series = append(series, lset)
|
||||||
}
|
}
|
||||||
return nil
|
return series, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *walReader) decodeSamples(flag byte, b []byte) error {
|
func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) {
|
||||||
|
samples := []RefSample{}
|
||||||
|
|
||||||
if len(b) < 16 {
|
if len(b) < 16 {
|
||||||
return errors.Wrap(errInvalidSize, "header length")
|
return nil, errors.Wrap(errInvalidSize, "header length")
|
||||||
}
|
}
|
||||||
var (
|
var (
|
||||||
baseRef = binary.BigEndian.Uint64(b)
|
baseRef = binary.BigEndian.Uint64(b)
|
||||||
|
@ -689,7 +763,7 @@ func (r *walReader) decodeSamples(flag byte, b []byte) error {
|
||||||
|
|
||||||
dref, n := binary.Varint(b)
|
dref, n := binary.Varint(b)
|
||||||
if n < 1 {
|
if n < 1 {
|
||||||
return errors.Wrap(errInvalidSize, "sample ref delta")
|
return nil, errors.Wrap(errInvalidSize, "sample ref delta")
|
||||||
}
|
}
|
||||||
b = b[n:]
|
b = b[n:]
|
||||||
|
|
||||||
|
@ -697,18 +771,36 @@ func (r *walReader) decodeSamples(flag byte, b []byte) error {
|
||||||
|
|
||||||
dtime, n := binary.Varint(b)
|
dtime, n := binary.Varint(b)
|
||||||
if n < 1 {
|
if n < 1 {
|
||||||
return errors.Wrap(errInvalidSize, "sample timestamp delta")
|
return nil, errors.Wrap(errInvalidSize, "sample timestamp delta")
|
||||||
}
|
}
|
||||||
b = b[n:]
|
b = b[n:]
|
||||||
smpl.T = baseTime + dtime
|
smpl.T = baseTime + dtime
|
||||||
|
|
||||||
if len(b) < 8 {
|
if len(b) < 8 {
|
||||||
return errors.Wrapf(errInvalidSize, "sample value bits %d", len(b))
|
return nil, errors.Wrapf(errInvalidSize, "sample value bits %d", len(b))
|
||||||
}
|
}
|
||||||
smpl.V = float64(math.Float64frombits(binary.BigEndian.Uint64(b)))
|
smpl.V = float64(math.Float64frombits(binary.BigEndian.Uint64(b)))
|
||||||
b = b[8:]
|
b = b[8:]
|
||||||
|
|
||||||
r.samples = append(r.samples, smpl)
|
samples = append(samples, smpl)
|
||||||
}
|
}
|
||||||
return nil
|
return samples, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) {
|
||||||
|
db := &decbuf{b: b}
|
||||||
|
stones := []Stone{}
|
||||||
|
|
||||||
|
for db.len() > 0 {
|
||||||
|
var s Stone
|
||||||
|
s.ref = db.uvarint32()
|
||||||
|
s.intervals = intervals{{db.varint64(), db.varint64()}}
|
||||||
|
if db.err() != nil {
|
||||||
|
return nil, db.err()
|
||||||
|
}
|
||||||
|
|
||||||
|
stones = append(stones, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
return stones, nil
|
||||||
}
|
}
|
||||||
|
|
6
vendor/vendor.json
vendored
6
vendor/vendor.json
vendored
|
@ -751,10 +751,10 @@
|
||||||
"revisionTime": "2016-04-11T19:08:41Z"
|
"revisionTime": "2016-04-11T19:08:41Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "q2GxuO+ppV/gqBir/Z6ijx7aOOU=",
|
"checksumSHA1": "XXXDHMZe3Y3gosaF/1staHm3INc=",
|
||||||
"path": "github.com/prometheus/tsdb",
|
"path": "github.com/prometheus/tsdb",
|
||||||
"revision": "4f2eb2057ee0a7f2b984503886bff970a9dab1a8",
|
"revision": "9963a4c7c3b2a742e00a63c54084b051e3174b06",
|
||||||
"revisionTime": "2017-05-22T06:49:09Z"
|
"revisionTime": "2017-06-12T09:17:49Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "9EH3v+JdbikCUJAgD4VEOPIaWfs=",
|
"checksumSHA1": "9EH3v+JdbikCUJAgD4VEOPIaWfs=",
|
||||||
|
|
Loading…
Reference in a new issue