Vertical query merging and compaction (#370)

* Vertical series iterator

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Select overlapped blocks first in compactor Plan()

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Added vertical compaction

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Code cleanup and comments

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Fix review comments

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Fix tests

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Add benchmark for compaction

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Perform vertical compaction only when blocks are overlapping.

Actions for vertical compaction:
* Sorting chunk metas
* Calling chunks.MergeOverlappingChunks on the chunks

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Benchmark for vertical compaction

* BenchmarkNormalCompaction => BenchmarkCompaction
* Moved the benchmark from db_test.go to compact_test.go

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Benchmark for query iterator and seek for non overlapping blocks

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Vertical query merge only for overlapping blocks

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Simplify logging in Compact(...)

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Updated CHANGELOG.md

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Calculate overlapping inside populateBlock

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* MinTime and MaxTime for BlockReader.

Using this to find overlapping blocks in populateBlock()

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Sort blocks w.r.t. MinTime in reload()

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Log about overlapping in LeveledCompactor.write() instead of returning bool

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Log about overlapping inside LeveledCompactor.populateBlock()

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Fix review comments

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Refactor createBlock to take optional []Series

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* review1

Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>

* Updated CHANGELOG and minor nits

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* nits

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Updated CHANGELOG

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Refactor iterator and seek benchmarks for Querier.

Also includes cases with overlapping blocks.

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Additional test case

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* genSeries takes optional labels. Updated BenchmarkQueryIterator and BenchmarkQuerySeek.

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Split genSeries into genSeries and populateSeries

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Check error in benchmark

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Fix review comments

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Warn about overlapping blocks in reload()

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
Ganesh Vernekar 2019-02-14 18:59:41 +05:30 committed by Goutham Veeramachaneni
parent 89ee5aaed4
commit c59ed492b2
13 changed files with 1164 additions and 127 deletions

CHANGELOG.md

@ -1,4 +1,8 @@
## master / unreleased
- [ENHANCEMENT] Time-overlapping blocks are now allowed. [#370](https://github.com/prometheus/tsdb/pull/370)
- Added `MergeChunks` function in `chunkenc/xor.go` to merge 2 time-overlapping chunks.
- Added `MergeOverlappingChunks` function in `chunks/chunks.go` to merge multiple time-overlapping Chunk Metas.
- Added `MinTime` and `MaxTime` method for `BlockReader`.
- [CHANGE] `NewLeveledCompactor` takes a context so that a compaction is canceled when closing the db.
- [ENHANCEMENT] When closing the db any running compaction will be cancelled so it doesn't block.
- [CHANGE] `prometheus_tsdb_storage_blocks_bytes_total` is now `prometheus_tsdb_storage_blocks_bytes`
@ -7,9 +11,9 @@
- [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed.
- [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374)
- [FEATURE] Size base retention through `Options.MaxBytes`. As part of this change:
- Added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total`
- New public interface `SizeReader: Size() int64`
- `OpenBlock` signature changed to take a logger.
- [REMOVED] `PrefixMatcher` is considered unused so was removed.
- [CLEANUP] `Options.WALFlushInterval` is removed as it wasn't used anywhere.
- [FEATURE] Add new `LiveReader` to WAL package. Added to allow live tailing of a WAL segment, used by Prometheus Remote Write after refactor. The main difference between the new reader and the existing `Reader` is that for `LiveReader` a call to `Next()` that returns false does not mean that there will never be more data to read.
@ -24,4 +28,4 @@
- [CHANGE] `Head.Init()` is changed to `Head.Init(minValidTime int64)`
- [CHANGE] `SymbolTable()` renamed to `SymbolTableSize()` to make the name consistent with the `Block{ symbolTableSize uint64 }` field.
- [CHANGE] `wal.Reader{}` now exposes `Segment()` for the current segment being read and `Offset()` for the current offset.
- [FEATURE] tsdbutil analyze subcommand to find churn, high cardinality, etc.

block.go

@ -135,6 +135,12 @@ type BlockReader interface {
// Tombstones returns a TombstoneReader over the block's deleted data.
Tombstones() (TombstoneReader, error)
// MinTime returns the min time of the block.
MinTime() int64
// MaxTime returns the max time of the block.
MaxTime() int64
}
// Appendable defines an entity to which data can be appended.
@ -363,6 +369,12 @@ func (pb *Block) Dir() string { return pb.dir }
// Meta returns meta information about the block.
func (pb *Block) Meta() BlockMeta { return pb.meta }
// MinTime returns the min time of the meta.
func (pb *Block) MinTime() int64 { return pb.meta.MinTime }
// MaxTime returns the max time of the meta.
func (pb *Block) MaxTime() int64 { return pb.meta.MaxTime }
// Size returns the number of bytes that the block takes up.
func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes }
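
With MinTime and MaxTime now part of BlockReader, callers can detect time overlap without reaching into BlockMeta. A minimal sketch (blocksOverlap is a hypothetical helper, not part of this change), treating block ranges as half-open [MinTime, MaxTime) the same way populateBlock's check does:

func blocksOverlap(a, b BlockReader) bool {
	// Two half-open intervals overlap iff each starts before the other ends.
	return a.MinTime() < b.MaxTime() && b.MinTime() < a.MaxTime()
}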

block_test.go

@ -101,8 +101,8 @@ func genSeries(totalSeries, labelCount int, mint, maxt int64) []Series {
if totalSeries == 0 || labelCount == 0 {
return nil
}
series := make([]Series, totalSeries)
for i := 0; i < totalSeries; i++ {
lbls := make(map[string]string, labelCount)
for len(lbls) < labelCount {
@ -114,7 +114,26 @@ func genSeries(totalSeries, labelCount int, mint, maxt int64) []Series {
}
series[i] = newSeries(lbls, samples)
}
return series
}
// populateSeries generates series from given labels, mint and maxt.
func populateSeries(lbls []map[string]string, mint, maxt int64) []Series {
if len(lbls) == 0 {
return nil
}
series := make([]Series, 0, len(lbls))
for _, lbl := range lbls {
if len(lbl) == 0 {
continue
}
samples := make([]tsdbutil.Sample, 0, maxt-mint+1)
for t := mint; t <= maxt; t++ {
samples = append(samples, sample{t: t, v: rand.Float64()})
}
series = append(series, newSeries(lbl, samples))
}
return series
}
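
The split makes it easy to build blocks that share label sets but cover different, possibly overlapping, time ranges, which the querier benchmarks at the end of this diff rely on. A usage sketch (values are illustrative):

// First block: 100 series with 10 random labels each, samples in [0, 999].
first := genSeries(100, 10, 0, 999)
var lbls []map[string]string
for _, s := range first {
	lbls = append(lbls, s.Labels().Map())
}
// Second block: the same label sets, time-shifted so [500, 999] overlaps the first.
second := populateSeries(lbls, 500, 1499)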

chunks/chunks.go

@ -196,6 +196,84 @@ func (w *Writer) write(b []byte) error {
return err
}
// MergeOverlappingChunks removes the samples whose timestamps are overlapping.
// When timestamps collide, the sample from the last appearing chunk is retained.
// This assumes that `chks []Meta` is sorted w.r.t. MinTime.
func MergeOverlappingChunks(chks []Meta) ([]Meta, error) {
if len(chks) < 2 {
return chks, nil
}
newChks := make([]Meta, 0, len(chks)) // Will contain the merged chunks.
newChks = append(newChks, chks[0])
last := 0
for _, c := range chks[1:] {
// We need to check only the last chunk in newChks.
// Reason: (1) newChks[last-1].MaxTime < newChks[last].MinTime (non overlapping).
// (2) As chks are sorted w.r.t. MinTime, newChks[last].MinTime < c.MinTime,
// so c never overlaps with newChks[last-1] or anything before that.
if c.MinTime > newChks[last].MaxTime {
newChks = append(newChks, c)
continue
}
nc := &newChks[last]
if c.MaxTime > nc.MaxTime {
nc.MaxTime = c.MaxTime
}
chk, err := MergeChunks(nc.Chunk, c.Chunk)
if err != nil {
return nil, err
}
nc.Chunk = chk
}
return newChks, nil
}
// MergeChunks vertically merges a and b, i.e., if there is any sample
// with the same timestamp in both a and b, the sample in a is discarded.
func MergeChunks(a, b chunkenc.Chunk) (*chunkenc.XORChunk, error) {
newChunk := chunkenc.NewXORChunk()
app, err := newChunk.Appender()
if err != nil {
return nil, err
}
ait := a.Iterator()
bit := b.Iterator()
aok, bok := ait.Next(), bit.Next()
for aok && bok {
at, av := ait.At()
bt, bv := bit.At()
if at < bt {
app.Append(at, av)
aok = ait.Next()
} else if bt < at {
app.Append(bt, bv)
bok = bit.Next()
} else {
app.Append(bt, bv)
aok = ait.Next()
bok = bit.Next()
}
}
for aok {
at, av := ait.At()
app.Append(at, av)
aok = ait.Next()
}
for bok {
bt, bv := bit.At()
app.Append(bt, bv)
bok = bit.Next()
}
if ait.Err() != nil {
return nil, ait.Err()
}
if bit.Err() != nil {
return nil, bit.Err()
}
return newChunk, nil
}
func (w *Writer) WriteChunks(chks ...Meta) error {
// Calculate maximum space we need and cut a new segment in case
// we don't fit into the current one.
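
To make the merge semantics concrete, here is a small usage sketch of MergeChunks as called from outside the chunks package (values are illustrative; Appender errors elided): both inputs carry a sample at t=2 and the output keeps b's value, matching the rule that the sample in a is discarded.

a := chunkenc.NewXORChunk()
app, _ := a.Appender()
app.Append(1, 10)
app.Append(2, 20)

b := chunkenc.NewXORChunk()
app, _ = b.Appender()
app.Append(2, 99)
app.Append(3, 30)

merged, err := chunks.MergeChunks(a, b)
// err == nil; iterating merged yields (1,10), (2,99), (3,30).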

compact.go

@ -17,6 +17,7 @@ import (
"context"
"fmt"
"io"
"math"
"math/rand"
"os"
"path/filepath"
@ -80,13 +81,14 @@ type LeveledCompactor struct {
}
type compactorMetrics struct {
ran prometheus.Counter
populatingBlocks prometheus.Gauge
failed prometheus.Counter
overlappingBlocks prometheus.Counter
duration prometheus.Histogram
chunkSize prometheus.Histogram
chunkSamples prometheus.Histogram
chunkRange prometheus.Histogram
}
func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
@ -104,6 +106,10 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
Name: "prometheus_tsdb_compactions_failed_total",
Help: "Total number of compactions that failed for the partition.",
})
m.overlappingBlocks = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_vertical_compactions_total",
Help: "Total number of compactions done on overlapping blocks.",
})
m.duration = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "prometheus_tsdb_compaction_duration_seconds",
Help: "Duration of compaction runs",
@ -130,6 +136,7 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
m.ran,
m.populatingBlocks,
m.failed,
m.overlappingBlocks,
m.duration,
m.chunkRange,
m.chunkSamples,
@ -147,6 +154,9 @@ func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Log
if pool == nil {
pool = chunkenc.NewPool()
}
if l == nil {
l = log.NewNopLogger()
}
return &LeveledCompactor{
ranges: ranges,
chunkPool: pool,
@ -187,11 +197,15 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
return dms[i].meta.MinTime < dms[j].meta.MinTime
})
res := c.selectOverlappingDirs(dms)
if len(res) > 0 {
return res, nil
}
// No overlapping blocks, do compaction the usual way.
// We do not include the most recent block with max(minTime), i.e., the block which was just created from the WAL.
// This gives users a window of a full block size to back up new data piece-wise without having to care about data overlap.
dms = dms[:len(dms)-1]
for _, dm := range c.selectDirs(dms) {
res = append(res, dm.dir)
}
@ -252,6 +266,28 @@ func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
return nil
}
// selectOverlappingDirs returns all dirs with overlapping time ranges.
// It expects sorted input by mint.
func (c *LeveledCompactor) selectOverlappingDirs(ds []dirMeta) []string {
if len(ds) < 2 {
return nil
}
var overlappingDirs []string
globalMaxt := ds[0].meta.MaxTime
for i, d := range ds[1:] {
if d.meta.MinTime < globalMaxt {
if len(overlappingDirs) == 0 { // When it is the first overlap, need to add the last one as well.
overlappingDirs = append(overlappingDirs, ds[i].dir)
}
overlappingDirs = append(overlappingDirs, d.dir)
}
if d.meta.MaxTime > globalMaxt {
globalMaxt = d.meta.MaxTime
}
}
return overlappingDirs
}
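
Worked example, matching the new plan() test cases later in this diff: for sorted ranges [0,20], [19,40], [40,60], the second block starts at 19, before the running maximum end time of 20, so the first two dirs are returned; the third starts exactly at the running maximum (40) and is not considered overlapping. A self-contained sketch of the same scan over plain ranges (trange and overlappingRanges are illustrative names, not part of this change):

type trange struct{ mint, maxt int64 }

// overlappingRanges mirrors selectOverlappingDirs for ranges sorted by mint.
func overlappingRanges(rs []trange) []trange {
	var out []trange
	if len(rs) < 2 {
		return out
	}
	globalMaxt := rs[0].maxt
	for i, r := range rs[1:] {
		if r.mint < globalMaxt {
			if len(out) == 0 {
				out = append(out, rs[i]) // first overlap: include the earlier range too
			}
			out = append(out, r)
		}
		if r.maxt > globalMaxt {
			globalMaxt = r.maxt
		}
	}
	return out
}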
// splitByRange splits the directories by the time range. The range sequence starts at 0.
//
// For example, if we have blocks [0-10, 10-20, 50-60, 90-100] and the split range tr is 30
@ -299,12 +335,17 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
res := &BlockMeta{
ULID: uid,
MinTime: blocks[0].MinTime,
}
sources := map[ulid.ULID]struct{}{}
// For overlapping blocks, the Maxt can be
// in any block so we track it globally.
maxt := int64(math.MinInt64)
for _, b := range blocks {
if b.MaxTime > maxt {
maxt = b.MaxTime
}
if b.Compaction.Level > res.Compaction.Level {
res.Compaction.Level = b.Compaction.Level
}
@ -326,6 +367,7 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
return res.Compaction.Sources[i].Compare(res.Compaction.Sources[j]) < 0
})
res.MaxTime = maxt
return res
}
@ -603,15 +645,17 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
// populateBlock fills the index and chunk writers with new data gathered as the union
// of the provided blocks. It returns meta information for the new block.
// It expects sorted blocks input by mint.
func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) {
if len(blocks) == 0 {
return errors.New("cannot populate block from no readers")
}
var (
set ChunkSeriesSet
allSymbols = make(map[string]struct{}, 1<<16)
closers = []io.Closer{}
overlapping bool
)
defer func() {
var merr MultiError
@ -622,6 +666,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
}()
c.metrics.populatingBlocks.Set(1)
globalMaxt := blocks[0].MaxTime()
for i, b := range blocks {
select {
case <-c.ctx.Done():
@ -629,6 +674,17 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
default:
}
if !overlapping {
if i > 0 && b.MinTime() < globalMaxt {
c.metrics.overlappingBlocks.Inc()
overlapping = true
level.Warn(c.logger).Log("msg", "found overlapping blocks during compaction", "ulid", meta.ULID)
}
if b.MaxTime() > globalMaxt {
globalMaxt = b.MaxTime()
}
}
indexr, err := b.Index()
if err != nil {
return errors.Wrapf(err, "open index reader for block %s", b)
@ -692,6 +748,12 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
}
lset, chks, dranges := set.At() // The chunks here are not fully deleted.
if overlapping {
// If blocks are overlapping, it is possible to have unsorted chunks.
sort.Slice(chks, func(i, j int) bool {
return chks[i].MinTime < chks[j].MinTime
})
}
// Skip the series with all deleted chunks.
if len(chks) == 0 {
@ -725,21 +787,28 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
}
}
mergedChks := chks
if overlapping {
mergedChks, err = chunks.MergeOverlappingChunks(chks)
if err != nil {
return errors.Wrap(err, "merge overlapping chunks")
}
}
if err := chunkw.WriteChunks(mergedChks...); err != nil {
return errors.Wrap(err, "write chunks")
}
if err := indexw.AddSeries(i, lset, mergedChks...); err != nil {
return errors.Wrap(err, "add series")
}
meta.Stats.NumChunks += uint64(len(mergedChks))
meta.Stats.NumSeries++
for _, chk := range mergedChks {
meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples())
}
for _, chk := range mergedChks {
if err := c.chunkPool.Put(chk.Chunk); err != nil {
return errors.Wrap(err, "put chunk")
}
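
Putting the overlapping path together: per series, the chunk metas are sorted by MinTime and then folded by chunks.MergeOverlappingChunks. A worked sketch (values illustrative, error handling elided): metas covering [0,10], [5,15] and [20,30] come out as two, the first pair merged into one chunk spanning [0,15].

mkChunk := func(ts ...int64) chunkenc.Chunk {
	c := chunkenc.NewXORChunk()
	app, _ := c.Appender()
	for _, t := range ts {
		app.Append(t, float64(t))
	}
	return c
}
metas := []chunks.Meta{
	{MinTime: 0, MaxTime: 10, Chunk: mkChunk(0, 5, 10)},
	{MinTime: 5, MaxTime: 15, Chunk: mkChunk(5, 9, 15)},
	{MinTime: 20, MaxTime: 30, Chunk: mkChunk(20, 30)},
}
merged, _ := chunks.MergeOverlappingChunks(metas)
// len(merged) == 2: [0,15] with samples at 0, 5, 9, 10, 15, plus the untouched [20,30].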

compact_test.go

@ -15,6 +15,7 @@ package tsdb
import (
"context"
"fmt"
"io/ioutil"
"math"
"os"
@ -311,13 +312,46 @@ func TestLeveledCompactor_plan(t *testing.T) {
},
expected: []string{"7", "8"},
},
// For overlapping blocks.
{
metas: []dirMeta{
metaRange("1", 0, 20, nil),
metaRange("2", 19, 40, nil),
metaRange("3", 40, 60, nil),
},
expected: []string{"1", "2"},
},
{
metas: []dirMeta{
metaRange("1", 0, 20, nil),
metaRange("2", 20, 40, nil),
metaRange("3", 30, 50, nil),
},
expected: []string{"2", "3"},
},
{
metas: []dirMeta{
metaRange("1", 0, 20, nil),
metaRange("2", 10, 40, nil),
metaRange("3", 30, 50, nil),
},
expected: []string{"1", "2", "3"},
},
{
metas: []dirMeta{
metaRange("5", 0, 360, nil),
metaRange("6", 340, 560, nil),
metaRange("7", 360, 420, nil),
metaRange("8", 420, 540, nil),
},
expected: []string{"5", "6", "7", "8"},
},
}
for _, c := range cases {
if !t.Run("", func(t *testing.T) {
res, err := compactor.plan(c.metas)
testutil.Ok(t, err)
testutil.Equals(t, c.expected, res)
}) {
return
@ -410,6 +444,8 @@ type erringBReader struct{}
func (erringBReader) Index() (IndexReader, error) { return nil, errors.New("index") }
func (erringBReader) Chunks() (ChunkReader, error) { return nil, errors.New("chunks") }
func (erringBReader) Tombstones() (TombstoneReader, error) { return nil, errors.New("tombstones") }
func (erringBReader) MinTime() int64 { return 0 }
func (erringBReader) MaxTime() int64 { return 0 }
type nopChunkWriter struct{}
@ -422,9 +458,8 @@ func TestCompaction_populateBlock(t *testing.T) {
inputSeriesSamples [][]seriesSamples
compactMinTime int64
compactMaxTime int64 // When not defined the test runner sets a default of math.MaxInt64.
expSeriesSamples []seriesSamples
expErr error
}{
{
title: "Populate block from empty input should return error.",
@ -500,16 +535,6 @@ func TestCompaction_populateBlock(t *testing.T) {
{
title: "Populate from two blocks showing that order is maintained.",
inputSeriesSamples: [][]seriesSamples{
{
{
lset: map[string]string{"a": "b"},
@ -520,20 +545,30 @@ func TestCompaction_populateBlock(t *testing.T) {
chunks: [][]sample{{{t: 1}, {t: 9}}, {{t: 10}, {t: 19}}},
},
},
{
{
lset: map[string]string{"a": "b"},
chunks: [][]sample{{{t: 21}, {t: 30}}},
},
{
lset: map[string]string{"a": "c"},
chunks: [][]sample{{{t: 40}, {t: 45}}},
},
},
},
expSeriesSamples: []seriesSamples{
{
lset: map[string]string{"a": "b"},
chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}, {{t: 21}, {t: 30}}},
},
{
lset: map[string]string{"a": "c"},
chunks: [][]sample{{{t: 1}, {t: 9}}, {{t: 10}, {t: 19}}, {{t: 40}, {t: 45}}},
},
},
},
{
title: "Populate from two blocks showing that order or series is sorted.",
title: "Populate from two blocks showing that order of series is sorted.",
inputSeriesSamples: [][]seriesSamples{
{
{
@ -647,8 +682,8 @@ func TestCompaction_populateBlock(t *testing.T) {
if ok := t.Run(tc.title, func(t *testing.T) {
blocks := make([]BlockReader, 0, len(tc.inputSeriesSamples))
for _, b := range tc.inputSeriesSamples {
ir, cr, mint, maxt := createIdxChkReaders(b)
blocks = append(blocks, &mockBReader{ir: ir, cr: cr, mint: mint, maxt: maxt})
}
c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil)
@ -690,6 +725,78 @@ func TestCompaction_populateBlock(t *testing.T) {
}
}
func BenchmarkCompaction(b *testing.B) {
cases := []struct {
ranges [][2]int64
compactionType string
}{
{
ranges: [][2]int64{{0, 100}, {200, 300}, {400, 500}, {600, 700}},
compactionType: "normal",
},
{
ranges: [][2]int64{{0, 1000}, {2000, 3000}, {4000, 5000}, {6000, 7000}},
compactionType: "normal",
},
{
ranges: [][2]int64{{0, 10000}, {20000, 30000}, {40000, 50000}, {60000, 70000}},
compactionType: "normal",
},
{
ranges: [][2]int64{{0, 100000}, {200000, 300000}, {400000, 500000}, {600000, 700000}},
compactionType: "normal",
},
// 40% overlaps.
{
ranges: [][2]int64{{0, 100}, {60, 160}, {120, 220}, {180, 280}},
compactionType: "vertical",
},
{
ranges: [][2]int64{{0, 1000}, {600, 1600}, {1200, 2200}, {1800, 2800}},
compactionType: "vertical",
},
{
ranges: [][2]int64{{0, 10000}, {6000, 16000}, {12000, 22000}, {18000, 28000}},
compactionType: "vertical",
},
{
ranges: [][2]int64{{0, 100000}, {60000, 160000}, {120000, 220000}, {180000, 280000}},
compactionType: "vertical",
},
}
nSeries := 10000
for _, c := range cases {
nBlocks := len(c.ranges)
b.Run(fmt.Sprintf("type=%s,blocks=%d,series=%d,samplesPerSeriesPerBlock=%d", c.compactionType, nBlocks, nSeries, c.ranges[0][1]-c.ranges[0][0]+1), func(b *testing.B) {
dir, err := ioutil.TempDir("", "bench_compaction")
testutil.Ok(b, err)
defer func() {
testutil.Ok(b, os.RemoveAll(dir))
}()
blockDirs := make([]string, 0, len(c.ranges))
var blocks []*Block
for _, r := range c.ranges {
block, err := OpenBlock(nil, createBlock(b, dir, genSeries(nSeries, 10, r[0], r[1])), nil)
testutil.Ok(b, err)
blocks = append(blocks, block)
defer func() {
testutil.Ok(b, block.Close())
}()
blockDirs = append(blockDirs, block.Dir())
}
c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil)
testutil.Ok(b, err)
b.ResetTimer()
b.ReportAllocs()
_, err = c.Compact(dir, blockDirs, blocks)
testutil.Ok(b, err)
})
}
}
// TestDisableAutoCompactions checks that we can
// disable and enable the auto compaction.
// This is needed for unit tests that rely on

db.go

@ -546,11 +546,8 @@ func (db *DB) reload() (err error) {
db.metrics.blocksBytes.Set(float64(blocksSize))
sort.Slice(loadable, func(i, j int) bool {
return loadable[i].Meta().MinTime < loadable[j].Meta().MinTime
})
if err := validateBlockSequence(loadable); err != nil {
return errors.Wrap(err, "invalid block sequence")
}
// Swap new blocks first for subsequently created readers to be seen.
db.mtx.Lock()
@ -558,6 +555,14 @@ func (db *DB) reload() (err error) {
db.blocks = loadable
db.mtx.Unlock()
blockMetas := make([]BlockMeta, 0, len(loadable))
for _, b := range loadable {
blockMetas = append(blockMetas, b.Meta())
}
if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 {
level.Warn(db.logger).Log("msg", "overlapping blocks found during reload", "detail", overlaps.String())
}
for _, b := range oldBlocks {
if _, ok := deletable[b.Meta().ULID]; ok {
deletable[b.Meta().ULID] = b
@ -694,25 +699,6 @@ func (db *DB) deleteBlocks(blocks map[ulid.ULID]*Block) error {
return nil
}
// validateBlockSequence returns error if given block meta files indicate that some blocks overlaps within sequence.
func validateBlockSequence(bs []*Block) error {
if len(bs) <= 1 {
return nil
}
var metas []BlockMeta
for _, b := range bs {
metas = append(metas, b.meta)
}
overlaps := OverlappingBlocks(metas)
if len(overlaps) > 0 {
return errors.Errorf("block time ranges overlap: %s", overlaps)
}
return nil
}
// TimeRange specifies minTime and maxTime range.
type TimeRange struct {
Min, Max int64
@ -909,6 +895,7 @@ func (db *DB) Snapshot(dir string, withHead bool) error {
// A goroutine must not handle more than one open Querier.
func (db *DB) Querier(mint, maxt int64) (Querier, error) {
var blocks []BlockReader
var blockMetas []BlockMeta
db.mtx.RLock()
defer db.mtx.RUnlock()
@ -916,6 +903,7 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) {
for _, b := range db.blocks {
if b.OverlapsClosedInterval(mint, maxt) {
blocks = append(blocks, b)
blockMetas = append(blockMetas, b.Meta())
}
}
if maxt >= db.head.MinTime() {
@ -926,22 +914,31 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) {
})
}
sq := &querier{
blocks: make([]Querier, 0, len(blocks)),
}
blockQueriers := make([]Querier, 0, len(blocks))
for _, b := range blocks {
q, err := NewBlockQuerier(b, mint, maxt)
if err == nil {
blockQueriers = append(blockQueriers, q)
continue
}
// If we fail, all previously opened queriers must be closed.
for _, q := range blockQueriers {
q.Close()
}
return nil, errors.Wrapf(err, "open querier for block %s", b)
}
if len(OverlappingBlocks(blockMetas)) > 0 {
return &verticalQuerier{
querier: querier{
blocks: blockQueriers,
},
}, nil
}
return &querier{
blocks: blockQueriers,
}, nil
}
func rangeForTimestamp(t int64, width int64) (maxt int64) {
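
The routing above is the only place queries care about overlap: OverlappingBlocks (already defined in db.go) inspects the selected block metas, and any hit swaps in the vertical querier. A minimal sketch:

metas := []BlockMeta{
	{MinTime: 0, MaxTime: 20},
	{MinTime: 10, MaxTime: 30}, // starts before the first block ends
}
if overlaps := OverlappingBlocks(metas); len(overlaps) > 0 {
	// Overlap present: use verticalQuerier so duplicate timestamps are
	// resolved at query time.
}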

db_test.go

@ -52,16 +52,19 @@ func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) {
}
// query runs a matcher query against the querier and fully expands its data.
func query(t testing.TB, q Querier, matchers ...labels.Matcher) map[string][]tsdbutil.Sample {
ss, err := q.Select(matchers...)
defer func() {
testutil.Ok(t, q.Close())
}()
testutil.Ok(t, err)
result := map[string][]tsdbutil.Sample{}
for ss.Next() {
series := ss.At()
samples := []tsdbutil.Sample{}
it := series.Iterator()
for it.Next() {
t, v := it.At()
@ -124,9 +127,7 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) {
querier, err := db.Querier(0, 1)
testutil.Ok(t, err)
seriesSet := query(t, querier, labels.NewEqualMatcher("foo", "bar"))
testutil.Equals(t, map[string][]tsdbutil.Sample{}, seriesSet)
err = app.Commit()
testutil.Ok(t, err)
@ -137,7 +138,7 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) {
seriesSet = query(t, querier, labels.NewEqualMatcher("foo", "bar"))
testutil.Equals(t, map[string][]tsdbutil.Sample{`{foo="bar"}`: {sample{t: 0, v: 0}}}, seriesSet)
}
func TestDataNotAvailableAfterRollback(t *testing.T) {
@ -160,7 +161,7 @@ func TestDataNotAvailableAfterRollback(t *testing.T) {
seriesSet := query(t, querier, labels.NewEqualMatcher("foo", "bar"))
testutil.Equals(t, map[string][]tsdbutil.Sample{}, seriesSet)
}
func TestDBAppenderAddRef(t *testing.T) {
@ -207,17 +208,15 @@ func TestDBAppenderAddRef(t *testing.T) {
res := query(t, q, labels.NewEqualMatcher("a", "b"))
testutil.Equals(t, map[string][]tsdbutil.Sample{
labels.FromStrings("a", "b").String(): {
sample{t: 123, v: 0},
sample{t: 124, v: 1},
sample{t: 125, v: 0},
sample{t: 133, v: 1},
sample{t: 143, v: 2},
},
}, res)
}
func TestDeleteSimple(t *testing.T) {
@ -398,12 +397,10 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
ssMap := query(t, q, labels.NewEqualMatcher("a", "b"))
testutil.Equals(t, map[string][]tsdbutil.Sample{
labels.New(labels.Label{"a", "b"}).String(): {sample{0, 1}},
}, ssMap)
// Append Out of Order Value.
app = db.Appender()
_, err = app.Add(labels.Labels{{"a", "b"}}, 10, 3)
@ -417,10 +414,9 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
ssMap = query(t, q, labels.NewEqualMatcher("a", "b"))
testutil.Equals(t, map[string][]tsdbutil.Sample{
labels.New(labels.Label{"a", "b"}).String(): {sample{0, 1}, sample{10, 3}},
}, ssMap)
}
func TestDB_Snapshot(t *testing.T) {
@ -610,9 +606,9 @@ func TestDB_e2e(t *testing.T) {
},
}
seriesMap := map[string][]tsdbutil.Sample{}
for _, l := range lbls {
seriesMap[labels.New(l...).String()] = []tsdbutil.Sample{}
}
db, delete := openTestDB(t, nil)
@ -625,7 +621,7 @@ func TestDB_e2e(t *testing.T) {
for _, l := range lbls {
lset := labels.New(l...)
series := []tsdbutil.Sample{}
ts := rand.Int63n(300)
for i := 0; i < numDatapoints; i++ {
@ -682,7 +678,7 @@ func TestDB_e2e(t *testing.T) {
mint := rand.Int63n(300)
maxt := mint + rand.Int63n(timeInterval*int64(numDatapoints))
expected := map[string][]tsdbutil.Sample{}
// Build the mockSeriesSet.
for _, m := range matched {
@ -698,7 +694,7 @@ func TestDB_e2e(t *testing.T) {
ss, err := q.Select(qry.ms...)
testutil.Ok(t, err)
result := map[string][]tsdbutil.Sample{}
for ss.Next() {
x := ss.At()
@ -1666,6 +1662,310 @@ func TestCorrectNumTombstones(t *testing.T) {
testutil.Equals(t, uint64(3), db.blocks[0].meta.Stats.NumTombstones)
}
func TestVerticalCompaction(t *testing.T) {
cases := []struct {
blockSeries [][]Series
expSeries map[string][]tsdbutil.Sample
}{
// Case 0
// |--------------|
// |----------------|
{
blockSeries: [][]Series{
[]Series{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
sample{5, 0}, sample{7, 0}, sample{8, 0}, sample{9, 0},
}),
},
[]Series{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 99},
sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 99},
sample{12, 99}, sample{13, 99}, sample{14, 99},
}),
},
},
expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 99},
sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 99},
sample{12, 99}, sample{13, 99}, sample{14, 99},
}},
},
// Case 1
// |-------------------------------|
// |----------------|
{
blockSeries: [][]Series{
[]Series{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
sample{5, 0}, sample{7, 0}, sample{8, 0}, sample{9, 0},
sample{11, 0}, sample{13, 0}, sample{17, 0},
}),
},
[]Series{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 99},
sample{8, 99}, sample{9, 99}, sample{10, 99},
}),
},
},
expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 99},
sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 0},
sample{13, 0}, sample{17, 0},
}},
},
// Case 2
// |-------------------------------|
// |------------|
// |--------------------|
{
blockSeries: [][]Series{
[]Series{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
sample{5, 0}, sample{7, 0}, sample{8, 0}, sample{9, 0},
sample{11, 0}, sample{13, 0}, sample{17, 0},
}),
},
[]Series{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 99},
sample{8, 99}, sample{9, 99},
}),
},
[]Series{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{14, 59}, sample{15, 59}, sample{17, 59}, sample{20, 59},
sample{21, 59}, sample{22, 59},
}),
},
},
expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 99},
sample{8, 99}, sample{9, 99}, sample{11, 0}, sample{13, 0},
sample{14, 59}, sample{15, 59}, sample{17, 59}, sample{20, 59},
sample{21, 59}, sample{22, 59},
}},
},
// Case 3
// |-------------------|
// |--------------------|
// |----------------|
{
blockSeries: [][]Series{
[]Series{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
sample{5, 0}, sample{8, 0}, sample{9, 0},
}),
},
[]Series{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{14, 59}, sample{15, 59}, sample{17, 59}, sample{20, 59},
sample{21, 59}, sample{22, 59},
}),
},
[]Series{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{5, 99}, sample{6, 99}, sample{7, 99}, sample{8, 99},
sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99},
sample{16, 99}, sample{17, 99},
}),
},
},
expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
sample{5, 99}, sample{6, 99}, sample{7, 99}, sample{8, 99},
sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{14, 59},
sample{15, 59}, sample{16, 99}, sample{17, 59}, sample{20, 59},
sample{21, 59}, sample{22, 59},
}},
},
// Case 4
// |-------------------------------------|
// |------------|
// |-------------------------|
{
blockSeries: [][]Series{
[]Series{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0},
sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0},
sample{20, 0}, sample{22, 0},
}),
},
[]Series{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
sample{11, 59},
}),
},
[]Series{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{8, 99},
sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99},
sample{16, 99}, sample{17, 99},
}),
},
},
expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 59},
sample{8, 59}, sample{9, 59}, sample{10, 59}, sample{11, 59},
sample{13, 99}, sample{15, 99}, sample{16, 99}, sample{17, 99},
sample{20, 0}, sample{22, 0},
}},
},
// Case 5: series are merged properly when there are multiple series.
// |-------------------------------------|
// |------------|
// |-------------------------|
{
blockSeries: [][]Series{
[]Series{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0},
sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0},
sample{20, 0}, sample{22, 0},
}),
newSeries(map[string]string{"b": "c"}, []tsdbutil.Sample{
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0},
sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0},
sample{20, 0}, sample{22, 0},
}),
newSeries(map[string]string{"c": "d"}, []tsdbutil.Sample{
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0},
sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0},
sample{20, 0}, sample{22, 0},
}),
},
[]Series{
newSeries(map[string]string{"__name__": "a"}, []tsdbutil.Sample{
sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
sample{11, 59},
}),
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
sample{11, 59},
}),
newSeries(map[string]string{"aa": "bb"}, []tsdbutil.Sample{
sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
sample{11, 59},
}),
newSeries(map[string]string{"c": "d"}, []tsdbutil.Sample{
sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
sample{11, 59},
}),
},
[]Series{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{8, 99},
sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99},
sample{16, 99}, sample{17, 99},
}),
newSeries(map[string]string{"aa": "bb"}, []tsdbutil.Sample{
sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{8, 99},
sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99},
sample{16, 99}, sample{17, 99},
}),
newSeries(map[string]string{"c": "d"}, []tsdbutil.Sample{
sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{8, 99},
sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99},
sample{16, 99}, sample{17, 99},
}),
},
},
expSeries: map[string][]tsdbutil.Sample{
`{__name__="a"}`: {
sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
sample{11, 59},
},
`{a="b"}`: {
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 59},
sample{8, 59}, sample{9, 59}, sample{10, 59}, sample{11, 59},
sample{13, 99}, sample{15, 99}, sample{16, 99}, sample{17, 99},
sample{20, 0}, sample{22, 0},
},
`{aa="bb"}`: {
sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 59},
sample{8, 59}, sample{9, 59}, sample{10, 59}, sample{11, 59},
sample{13, 99}, sample{15, 99}, sample{16, 99}, sample{17, 99},
},
`{b="c"}`: {
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0},
sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0},
sample{20, 0}, sample{22, 0},
},
`{c="d"}`: {
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 59},
sample{8, 59}, sample{9, 59}, sample{10, 59}, sample{11, 59},
sample{13, 99}, sample{15, 99}, sample{16, 99}, sample{17, 99},
sample{20, 0}, sample{22, 0},
},
},
},
}
defaultMatcher := labels.NewMustRegexpMatcher("__name__", ".*")
for _, c := range cases {
if ok := t.Run("", func(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "data")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(tmpdir))
}()
for _, series := range c.blockSeries {
createBlock(t, tmpdir, series)
}
db, err := Open(tmpdir, nil, nil, nil)
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, db.Close())
}()
db.DisableCompactions()
testutil.Assert(t, len(db.blocks) == len(c.blockSeries), "Wrong number of blocks [before compact].")
// Vertical Query Merging test.
querier, err := db.Querier(0, 100)
testutil.Ok(t, err)
actSeries := query(t, querier, defaultMatcher)
testutil.Equals(t, c.expSeries, actSeries)
// Vertical compaction.
lc := db.compactor.(*LeveledCompactor)
testutil.Equals(t, 0, int(prom_testutil.ToFloat64(lc.metrics.overlappingBlocks)), "overlapping blocks count should be still 0 here")
err = db.compact()
testutil.Ok(t, err)
testutil.Equals(t, 1, len(db.Blocks()), "Wrong number of blocks [after compact]")
testutil.Equals(t, 1, int(prom_testutil.ToFloat64(lc.metrics.overlappingBlocks)), "overlapping blocks count mismatch")
// Query test after merging the overlapping blocks.
querier, err = db.Querier(0, 100)
testutil.Ok(t, err)
actSeries = query(t, querier, defaultMatcher)
testutil.Equals(t, c.expSeries, actSeries)
}); !ok {
return
}
}
}
// TestBlockRanges checks the following use cases:
// - No samples can be added with timestamps lower than the last block maxt.
// - The compactor doesn't create overlapping blocks

head.go

@ -616,6 +616,14 @@ func (h *rangeHead) Tombstones() (TombstoneReader, error) {
return emptyTombstoneReader, nil
}
func (h *rangeHead) MinTime() int64 {
return h.mint
}
func (h *rangeHead) MaxTime() int64 {
return h.maxt
}
// initAppender is a helper to initialize the time bounds of the head
// upon the first sample it receives.
type initAppender struct {

head_test.go

@ -489,7 +489,7 @@ func TestDeleteUntilCurMax(t *testing.T) {
it := exps.Iterator()
ressmpls, err := expandSeriesIterator(it)
testutil.Ok(t, err)
testutil.Equals(t, []tsdbutil.Sample{sample{11, 1}}, ressmpls)
}
func TestDelete_e2e(t *testing.T) {
numDatapoints := 1000
@ -650,16 +650,16 @@ func TestDelete_e2e(t *testing.T) {
}
}
func boundedSamples(full []tsdbutil.Sample, mint, maxt int64) []tsdbutil.Sample {
for len(full) > 0 {
if full[0].T() >= mint {
break
}
full = full[1:]
}
for i, s := range full {
// Terminate on the first sample larger than maxt.
if s.T() > maxt {
return full[:i]
}
}

mocks_test.go

@ -64,10 +64,14 @@ func (mockIndexWriter) WritePostings(name, value string, it index.Postings) erro
func (mockIndexWriter) Close() error { return nil }
type mockBReader struct {
ir IndexReader
cr ChunkReader
mint int64
maxt int64
}
func (r *mockBReader) Index() (IndexReader, error) { return r.ir, nil }
func (r *mockBReader) Chunks() (ChunkReader, error) { return r.cr, nil }
func (r *mockBReader) Tombstones() (TombstoneReader, error) { return newMemTombstones(), nil }
func (r *mockBReader) MinTime() int64 { return r.mint }
func (r *mockBReader) MaxTime() int64 { return r.maxt }

querier.go

@ -111,7 +111,6 @@ func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) {
func (q *querier) Select(ms ...labels.Matcher) (SeriesSet, error) {
return q.sel(q.blocks, ms)
}
func (q *querier) sel(qs []Querier, ms []labels.Matcher) (SeriesSet, error) {
@ -143,6 +142,36 @@ func (q *querier) Close() error {
return merr.Err()
}
// verticalQuerier aggregates querying results from time blocks within
// a single partition. The block time ranges can be overlapping.
type verticalQuerier struct {
querier
}
func (q *verticalQuerier) Select(ms ...labels.Matcher) (SeriesSet, error) {
return q.sel(q.blocks, ms)
}
func (q *verticalQuerier) sel(qs []Querier, ms []labels.Matcher) (SeriesSet, error) {
if len(qs) == 0 {
return EmptySeriesSet(), nil
}
if len(qs) == 1 {
return qs[0].Select(ms...)
}
l := len(qs) / 2
a, err := q.sel(qs[:l], ms)
if err != nil {
return nil, err
}
b, err := q.sel(qs[l:], ms)
if err != nil {
return nil, err
}
return newMergedVerticalSeriesSet(a, b), nil
}
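
Since sel halves the querier list at every step, the merge forms a balanced binary tree rather than a left-leaning chain. For four block queriers the shape is (illustration only):

// sel([q0 q1 q2 q3])
//   = merge(sel([q0 q1]), sel([q2 q3]))
//   = merge(merge(q0, q1), merge(q2, q3))
// where merge is newMergedVerticalSeriesSet; each level compares labels,
// so series are still emitted in sorted label order.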
// NewBlockQuerier returns a querier against the reader.
func NewBlockQuerier(b BlockReader, mint, maxt int64) (Querier, error) {
indexr, err := b.Index()
@ -444,6 +473,72 @@ func (s *mergedSeriesSet) Next() bool {
return true
}
type mergedVerticalSeriesSet struct {
a, b SeriesSet
cur Series
adone, bdone bool
}
// NewMergedVerticalSeriesSet merges two series sets into a single series set.
// The input series sets must be sorted and
// the time ranges of the series can be overlapping.
func NewMergedVerticalSeriesSet(a, b SeriesSet) SeriesSet {
return newMergedVerticalSeriesSet(a, b)
}
func newMergedVerticalSeriesSet(a, b SeriesSet) *mergedVerticalSeriesSet {
s := &mergedVerticalSeriesSet{a: a, b: b}
// Initialize first elements of both sets as Next() needs
// one element look-ahead.
s.adone = !s.a.Next()
s.bdone = !s.b.Next()
return s
}
func (s *mergedVerticalSeriesSet) At() Series {
return s.cur
}
func (s *mergedVerticalSeriesSet) Err() error {
if s.a.Err() != nil {
return s.a.Err()
}
return s.b.Err()
}
func (s *mergedVerticalSeriesSet) compare() int {
if s.adone {
return 1
}
if s.bdone {
return -1
}
return labels.Compare(s.a.At().Labels(), s.b.At().Labels())
}
func (s *mergedVerticalSeriesSet) Next() bool {
if s.adone && s.bdone || s.Err() != nil {
return false
}
d := s.compare()
// Both sets contain the current series. Chain them into a single one.
if d > 0 {
s.cur = s.b.At()
s.bdone = !s.b.Next()
} else if d < 0 {
s.cur = s.a.At()
s.adone = !s.a.Next()
} else {
s.cur = &verticalChainedSeries{series: []Series{s.a.At(), s.b.At()}}
s.adone = !s.a.Next()
s.bdone = !s.b.Next()
}
return true
}
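
Usage sketch for the exported constructor (setA and setB are hypothetical sorted SeriesSets that both contain a series {a="b"}): equal label sets collapse into one chained series whose iterator resolves duplicate timestamps.

merged := NewMergedVerticalSeriesSet(setA, setB)
for merged.Next() {
	it := merged.At().Iterator()
	for it.Next() {
		t, v := it.At() // on duplicate timestamps, the value from setB wins
		_, _ = t, v
	}
}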
// ChunkSeriesSet exposes the chunks and intervals of a series instead of the
// actual series itself.
type ChunkSeriesSet interface {
@ -739,6 +834,100 @@ func (it *chainedSeriesIterator) Err() error {
return it.cur.Err()
}
// verticalChainedSeries implements a series for a list of time-sorted, time-overlapping series.
// They all must have the same labels.
type verticalChainedSeries struct {
series []Series
}
func (s *verticalChainedSeries) Labels() labels.Labels {
return s.series[0].Labels()
}
func (s *verticalChainedSeries) Iterator() SeriesIterator {
return newVerticalMergeSeriesIterator(s.series...)
}
// verticalMergeSeriesIterator implements a series iterator over a list
// of time-sorted, time-overlapping iterators.
type verticalMergeSeriesIterator struct {
a, b SeriesIterator
aok, bok, initialized bool
curT int64
curV float64
}
func newVerticalMergeSeriesIterator(s ...Series) SeriesIterator {
if len(s) == 1 {
return s[0].Iterator()
} else if len(s) == 2 {
return &verticalMergeSeriesIterator{
a: s[0].Iterator(),
b: s[1].Iterator(),
}
}
return &verticalMergeSeriesIterator{
a: s[0].Iterator(),
b: newVerticalMergeSeriesIterator(s[1:]...),
}
}
func (it *verticalMergeSeriesIterator) Seek(t int64) bool {
it.aok, it.bok = it.a.Seek(t), it.b.Seek(t)
it.initialized = true
return it.Next()
}
func (it *verticalMergeSeriesIterator) Next() bool {
if !it.initialized {
it.aok = it.a.Next()
it.bok = it.b.Next()
it.initialized = true
}
if !it.aok && !it.bok {
return false
}
if !it.aok {
it.curT, it.curV = it.b.At()
it.bok = it.b.Next()
return true
}
if !it.bok {
it.curT, it.curV = it.a.At()
it.aok = it.a.Next()
return true
}
acurT, acurV := it.a.At()
bcurT, bcurV := it.b.At()
if acurT < bcurT {
it.curT, it.curV = acurT, acurV
it.aok = it.a.Next()
} else if acurT > bcurT {
it.curT, it.curV = bcurT, bcurV
it.bok = it.b.Next()
} else {
it.curT, it.curV = bcurT, bcurV
it.aok = it.a.Next()
it.bok = it.b.Next()
}
return true
}
func (it *verticalMergeSeriesIterator) At() (t int64, v float64) {
return it.curT, it.curV
}
func (it *verticalMergeSeriesIterator) Err() error {
if it.a.Err() != nil {
return it.a.Err()
}
return it.b.Err()
}
// chunkSeriesIterator implements a series iterator on top
// of a list of time-sorted, non-overlapping chunks.
type chunkSeriesIterator struct {
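
A concrete trace of verticalMergeSeriesIterator (seriesA and seriesB are hypothetical Series values): with a = (1,1),(2,2) and b = (2,9),(3,3), at t=2 both iterators advance and b's value is kept, consistent with MergeChunks at compaction time.

it := newVerticalMergeSeriesIterator(seriesA, seriesB)
for it.Next() {
	t, v := it.At() // yields (1,1), then (2,9), then (3,3)
	_, _ = t, v
}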

querier_test.go

@ -178,7 +178,7 @@ Outer:
}
}
func expandSeriesIterator(it SeriesIterator) (r []sample, err error) {
func expandSeriesIterator(it SeriesIterator) (r []tsdbutil.Sample, err error) {
for it.Next() {
t, v := it.At()
r = append(r, sample{t: t, v: v})
@ -194,7 +194,7 @@ type seriesSamples struct {
// Index: labels -> postings -> chunkMetas -> chunkRef
// ChunkReader: ref -> vals
func createIdxChkReaders(tc []seriesSamples) (IndexReader, ChunkReader, int64, int64) {
sort.Slice(tc, func(i, j int) bool {
return labels.Compare(labels.FromMap(tc[i].lset), labels.FromMap(tc[j].lset)) < 0
})
@ -203,6 +203,8 @@ func createIdxChkReaders(tc []seriesSamples) (IndexReader, ChunkReader) {
chkReader := mockChunkReader(make(map[uint64]chunkenc.Chunk))
lblIdx := make(map[string]stringset)
mi := newMockIndex()
blockMint := int64(math.MaxInt64)
blockMaxt := int64(math.MinInt64)
for i, s := range tc {
i = i + 1 // 0 is not a valid posting.
@ -211,6 +213,13 @@ func createIdxChkReaders(tc []seriesSamples) (IndexReader, ChunkReader) {
// Collisions can be there, but for tests, it's fine.
ref := rand.Uint64()
if chk[0].t < blockMint {
blockMint = chk[0].t
}
if chk[len(chk)-1].t > blockMaxt {
blockMaxt = chk[len(chk)-1].t
}
metas = append(metas, chunks.Meta{
MinTime: chk[0].t,
MaxTime: chk[len(chk)-1].t,
@ -248,7 +257,7 @@ func createIdxChkReaders(tc []seriesSamples) (IndexReader, ChunkReader) {
return mi.WritePostings(l.Name, l.Value, p)
})
return mi, chkReader, blockMint, blockMaxt
}
func TestBlockQuerier(t *testing.T) {
@ -355,7 +364,7 @@ func TestBlockQuerier(t *testing.T) {
Outer:
for _, c := range cases.queries {
ir, cr, _, _ := createIdxChkReaders(cases.data)
querier := &blockQuerier{
index: ir,
chunks: cr,
@ -517,7 +526,7 @@ func TestBlockQuerierDelete(t *testing.T) {
Outer:
for _, c := range cases.queries {
ir, cr, _, _ := createIdxChkReaders(cases.data)
querier := &blockQuerier{
index: ir,
chunks: cr,
@ -924,6 +933,46 @@ func TestSeriesIterator(t *testing.T) {
})
t.Run("Chain", func(t *testing.T) {
// Extra cases for overlapping series.
itcasesExtra := []struct {
a, b, c []tsdbutil.Sample
exp []tsdbutil.Sample
mint, maxt int64
}{
{
a: []tsdbutil.Sample{
sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1},
},
b: []tsdbutil.Sample{
sample{5, 49}, sample{7, 89}, sample{9, 8},
},
c: []tsdbutil.Sample{
sample{2, 33}, sample{4, 44}, sample{10, 3},
},
exp: []tsdbutil.Sample{
sample{1, 2}, sample{2, 33}, sample{3, 5}, sample{4, 44}, sample{5, 49}, sample{6, 1}, sample{7, 89}, sample{9, 8}, sample{10, 3},
},
mint: math.MinInt64,
maxt: math.MaxInt64,
},
{
a: []tsdbutil.Sample{
sample{1, 2}, sample{2, 3}, sample{9, 5}, sample{13, 1},
},
b: []tsdbutil.Sample{},
c: []tsdbutil.Sample{
sample{1, 23}, sample{2, 342}, sample{3, 25}, sample{6, 11},
},
exp: []tsdbutil.Sample{
sample{1, 23}, sample{2, 342}, sample{3, 25}, sample{6, 11}, sample{9, 5}, sample{13, 1},
},
mint: math.MinInt64,
maxt: math.MaxInt64,
},
}
for _, tc := range itcases {
a, b, c := itSeries{newListSeriesIterator(tc.a)},
itSeries{newListSeriesIterator(tc.b)},
@ -939,30 +988,55 @@ func TestSeriesIterator(t *testing.T) {
testutil.Equals(t, smplExp, smplRes)
}
for _, tc := range append(itcases, itcasesExtra...) {
a, b, c := itSeries{newListSeriesIterator(tc.a)},
itSeries{newListSeriesIterator(tc.b)},
itSeries{newListSeriesIterator(tc.c)}
res := newVerticalMergeSeriesIterator(a, b, c)
exp := newListSeriesIterator([]tsdbutil.Sample(tc.exp))
smplExp, errExp := expandSeriesIterator(exp)
smplRes, errRes := expandSeriesIterator(res)
testutil.Equals(t, errExp, errRes)
testutil.Equals(t, smplExp, smplRes)
}
t.Run("Seek", func(t *testing.T) {
for _, tc := range seekcases {
ress := []SeriesIterator{
newChainedSeriesIterator(
itSeries{newListSeriesIterator(tc.a)},
itSeries{newListSeriesIterator(tc.b)},
itSeries{newListSeriesIterator(tc.c)},
),
newVerticalMergeSeriesIterator(
itSeries{newListSeriesIterator(tc.a)},
itSeries{newListSeriesIterator(tc.b)},
itSeries{newListSeriesIterator(tc.c)},
),
}
for _, res := range ress {
exp := newListSeriesIterator(tc.exp)
testutil.Equals(t, tc.success, res.Seek(tc.seek))
if tc.success {
// Init the list and then proceed to check.
remaining := exp.Next()
testutil.Assert(t, remaining == true, "")
for remaining {
sExp, eExp := exp.At()
sRes, eRes := res.At()
testutil.Equals(t, eExp, eRes)
testutil.Equals(t, sExp, sRes)
remaining = exp.Next()
testutil.Equals(t, remaining, res.Next())
}
}
}
}
@ -1159,6 +1233,7 @@ func BenchmarkPersistedQueries(b *testing.B) {
dir, err := ioutil.TempDir("", "bench_persisted")
testutil.Ok(b, err)
defer os.RemoveAll(dir)
block, err := OpenBlock(nil, createBlock(b, dir, genSeries(nSeries, 10, 1, int64(nSamples))), nil)
testutil.Ok(b, err)
defer block.Close()
@ -1439,3 +1514,178 @@ func (it *listSeriesIterator) Seek(t int64) bool {
func (it *listSeriesIterator) Err() error {
return nil
}
func BenchmarkQueryIterator(b *testing.B) {
cases := []struct {
numBlocks int
numSeries int
numSamplesPerSeriesPerBlock int
overlapPercentages []int // >=0, <=100, this is w.r.t. the previous block.
}{
{
numBlocks: 20,
numSeries: 1000,
numSamplesPerSeriesPerBlock: 20000,
overlapPercentages: []int{0, 10, 30},
},
}
for _, c := range cases {
for _, overlapPercentage := range c.overlapPercentages {
benchMsg := fmt.Sprintf("nBlocks=%d,nSeries=%d,numSamplesPerSeriesPerBlock=%d,overlap=%d%%",
c.numBlocks, c.numSeries, c.numSamplesPerSeriesPerBlock, overlapPercentage)
b.Run(benchMsg, func(b *testing.B) {
dir, err := ioutil.TempDir("", "bench_query_iterator")
testutil.Ok(b, err)
defer func() {
testutil.Ok(b, os.RemoveAll(dir))
}()
var (
blocks []*Block
overlapDelta = int64(overlapPercentage * c.numSamplesPerSeriesPerBlock / 100)
prefilledLabels []map[string]string
generatedSeries []Series
)
for i := int64(0); i < int64(c.numBlocks); i++ {
offset := i * overlapDelta
mint := i*int64(c.numSamplesPerSeriesPerBlock) - offset
maxt := mint + int64(c.numSamplesPerSeriesPerBlock) - 1
if len(prefilledLabels) == 0 {
generatedSeries = genSeries(c.numSeries, 10, mint, maxt)
for _, s := range generatedSeries {
prefilledLabels = append(prefilledLabels, s.Labels().Map())
}
} else {
generatedSeries = populateSeries(prefilledLabels, mint, maxt)
}
block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil)
testutil.Ok(b, err)
blocks = append(blocks, block)
defer block.Close()
}
que := &querier{
blocks: make([]Querier, 0, len(blocks)),
}
for _, blk := range blocks {
q, err := NewBlockQuerier(blk, math.MinInt64, math.MaxInt64)
testutil.Ok(b, err)
que.blocks = append(que.blocks, q)
}
var sq Querier = que
if overlapPercentage > 0 {
sq = &verticalQuerier{
querier: *que,
}
}
defer sq.Close()
b.ResetTimer()
b.ReportAllocs()
ss, err := sq.Select(labels.NewMustRegexpMatcher("__name__", ".*"))
testutil.Ok(b, err)
for ss.Next() {
it := ss.At().Iterator()
for it.Next() {
}
testutil.Ok(b, it.Err())
}
testutil.Ok(b, ss.Err())
testutil.Ok(b, err)
})
}
}
}
func BenchmarkQuerySeek(b *testing.B) {
cases := []struct {
numBlocks int
numSeries int
numSamplesPerSeriesPerBlock int
overlapPercentages []int // >=0, <=100, this is w.r.t. the previous block.
}{
{
numBlocks: 20,
numSeries: 100,
numSamplesPerSeriesPerBlock: 2000,
overlapPercentages: []int{0, 10, 30, 50},
},
}
for _, c := range cases {
for _, overlapPercentage := range c.overlapPercentages {
benchMsg := fmt.Sprintf("nBlocks=%d,nSeries=%d,numSamplesPerSeriesPerBlock=%d,overlap=%d%%",
c.numBlocks, c.numSeries, c.numSamplesPerSeriesPerBlock, overlapPercentage)
b.Run(benchMsg, func(b *testing.B) {
dir, err := ioutil.TempDir("", "bench_query_iterator")
testutil.Ok(b, err)
defer func() {
testutil.Ok(b, os.RemoveAll(dir))
}()
var (
blocks []*Block
overlapDelta = int64(overlapPercentage * c.numSamplesPerSeriesPerBlock / 100)
prefilledLabels []map[string]string
generatedSeries []Series
)
for i := int64(0); i < int64(c.numBlocks); i++ {
offset := i * overlapDelta
mint := i*int64(c.numSamplesPerSeriesPerBlock) - offset
maxt := mint + int64(c.numSamplesPerSeriesPerBlock) - 1
if len(prefilledLabels) == 0 {
generatedSeries = genSeries(c.numSeries, 10, mint, maxt)
for _, s := range generatedSeries {
prefilledLabels = append(prefilledLabels, s.Labels().Map())
}
} else {
generatedSeries = populateSeries(prefilledLabels, mint, maxt)
}
block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil)
testutil.Ok(b, err)
blocks = append(blocks, block)
defer block.Close()
}
que := &querier{
blocks: make([]Querier, 0, len(blocks)),
}
for _, blk := range blocks {
q, err := NewBlockQuerier(blk, math.MinInt64, math.MaxInt64)
testutil.Ok(b, err)
que.blocks = append(que.blocks, q)
}
var sq Querier = que
if overlapPercentage > 0 {
sq = &verticalQuerier{
querier: *que,
}
}
defer sq.Close()
mint := blocks[0].meta.MinTime
maxt := blocks[len(blocks)-1].meta.MaxTime
b.ResetTimer()
b.ReportAllocs()
ss, err := sq.Select(labels.NewMustRegexpMatcher("__name__", ".*"))
for ss.Next() {
it := ss.At().Iterator()
for t := mint; t <= maxt; t++ {
it.Seek(t)
}
testutil.Ok(b, it.Err())
}
testutil.Ok(b, ss.Err())
testutil.Ok(b, err)
})
}
}
}