Merge remote-tracking branch 'upstream/master' into shutdown-during-compaction

Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
This commit is contained in:
Krasi Georgiev 2019-01-30 13:47:50 +02:00
commit 00d5e19baf
8 changed files with 244 additions and 184 deletions

View file

@ -2,6 +2,8 @@
- [CHANGE] `NewLeveledCompactor` takes a context so that a compaction is canceled when closing the db. - [CHANGE] `NewLeveledCompactor` takes a context so that a compaction is canceled when closing the db.
- [ENHANCEMENT] When closing the db any running compaction will be cancelled so it doesn't block. - [ENHANCEMENT] When closing the db any running compaction will be cancelled so it doesn't block.
- [CHANGE] `prometheus_tsdb_storage_blocks_bytes_total` is now `prometheus_tsdb_storage_blocks_bytes`
## 0.4.0 ## 0.4.0
- [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed. - [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed.
- [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374) - [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374)

View file

@ -22,8 +22,8 @@ import (
"testing" "testing"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil" "github.com/prometheus/tsdb/testutil"
"github.com/prometheus/tsdb/tsdbutil"
) )
// In Prometheus 2.1.0 we had a bug where the meta.json version was falsely bumped // In Prometheus 2.1.0 we had a bug where the meta.json version was falsely bumped
@ -46,7 +46,7 @@ func TestSetCompactionFailed(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
defer os.RemoveAll(tmpdir) defer os.RemoveAll(tmpdir)
blockDir := createBlock(t, tmpdir, 1, 0, 0) blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 0))
b, err := OpenBlock(nil, blockDir, nil) b, err := OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, false, b.meta.Compaction.Failed) testutil.Equals(t, false, b.meta.Compaction.Failed)
@ -60,33 +60,32 @@ func TestSetCompactionFailed(t *testing.T) {
testutil.Ok(t, b.Close()) testutil.Ok(t, b.Close())
} }
// createBlock creates a block with nSeries series, filled with // createBlock creates a block with given set of series and returns its dir.
// samples of the given mint,maxt time range and returns its dir. func createBlock(tb testing.TB, dir string, series []Series) string {
func createBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) string {
head, err := NewHead(nil, nil, nil, 2*60*60*1000) head, err := NewHead(nil, nil, nil, 2*60*60*1000)
testutil.Ok(tb, err) testutil.Ok(tb, err)
defer head.Close() defer head.Close()
lbls, err := labels.ReadLabels(filepath.Join("testdata", "20kseries.json"), nSeries) app := head.Appender()
testutil.Ok(tb, err) for _, s := range series {
refs := make([]uint64, nSeries) ref := uint64(0)
it := s.Iterator()
for ts := mint; ts <= maxt; ts++ { for it.Next() {
app := head.Appender() t, v := it.At()
for i, lbl := range lbls { if ref != 0 {
if refs[i] != 0 { err := app.AddFast(ref, t, v)
err := app.AddFast(refs[i], ts, rand.Float64())
if err == nil { if err == nil {
continue continue
} }
} }
ref, err := app.Add(lbl, int64(ts), rand.Float64()) ref, err = app.Add(s.Labels(), t, v)
testutil.Ok(tb, err) testutil.Ok(tb, err)
refs[i] = ref refs[i] = ref
} }
err := app.Commit() testutil.Ok(tb, it.Err())
testutil.Ok(tb, err)
} }
err = app.Commit()
testutil.Ok(tb, err)
compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil) compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil)
testutil.Ok(tb, err) testutil.Ok(tb, err)
@ -97,3 +96,53 @@ func createBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) strin
testutil.Ok(tb, err) testutil.Ok(tb, err)
return filepath.Join(dir, ulid.String()) return filepath.Join(dir, ulid.String())
} }
// genSeries generates series with a given number of labels and values.
func genSeries(totalSeries, labelCount int, mint, maxt int64) []Series {
if totalSeries == 0 || labelCount == 0 {
return nil
}
series := make([]Series, totalSeries)
for i := 0; i < totalSeries; i++ {
lbls := make(map[string]string, labelCount)
for len(lbls) < labelCount {
lbls[randString()] = randString()
}
samples := make([]tsdbutil.Sample, 0, maxt-mint+1)
for t := mint; t <= maxt; t++ {
samples = append(samples, sample{t: t, v: rand.Float64()})
}
series[i] = newSeries(lbls, samples)
}
return series
}
const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
const (
letterIdxBits = 6 // 6 bits to represent a letter index
letterIdxMask = 1<<letterIdxBits - 1 // All 1-bits, as many as letterIdxBits
letterIdxMax = 63 / letterIdxBits // # of letter indices fitting in 63 bits
)
// randString generates random string.
func randString() string {
maxLength := int32(50)
length := rand.Int31n(maxLength)
b := make([]byte, length+1)
// A rand.Int63() generates 63 random bits, enough for letterIdxMax characters!
for i, cache, remain := length, rand.Int63(), letterIdxMax; i >= 0; {
if remain == 0 {
cache, remain = rand.Int63(), letterIdxMax
}
if idx := int(cache & letterIdxMask); idx < len(letterBytes) {
b[i] = letterBytes[idx]
i--
}
cache >>= letterIdxBits
remain--
}
return string(b)
}

View file

@ -352,30 +352,31 @@ func (s *Reader) Size() int64 {
return s.size return s.size
} }
// Chunk returns a chunk from a given reference.
func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) { func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
var ( var (
seq = int(ref >> 32) sgmSeq = int(ref >> 32)
off = int((ref << 32) >> 32) sgmOffset = int((ref << 32) >> 32)
) )
if seq >= len(s.bs) { if sgmSeq >= len(s.bs) {
return nil, errors.Errorf("reference sequence %d out of range", seq) return nil, errors.Errorf("reference sequence %d out of range", sgmSeq)
} }
b := s.bs[seq] chkS := s.bs[sgmSeq]
if off >= b.Len() { if sgmOffset >= chkS.Len() {
return nil, errors.Errorf("offset %d beyond data size %d", off, b.Len()) return nil, errors.Errorf("offset %d beyond data size %d", sgmOffset, chkS.Len())
} }
// With the minimum chunk length this should never cause us reading // With the minimum chunk length this should never cause us reading
// over the end of the slice. // over the end of the slice.
r := b.Range(off, off+binary.MaxVarintLen32) chk := chkS.Range(sgmOffset, sgmOffset+binary.MaxVarintLen32)
l, n := binary.Uvarint(r) chkLen, n := binary.Uvarint(chk)
if n <= 0 { if n <= 0 {
return nil, errors.Errorf("reading chunk length failed with %d", n) return nil, errors.Errorf("reading chunk length failed with %d", n)
} }
r = b.Range(off+n, off+n+int(l)) chk = chkS.Range(sgmOffset+n, sgmOffset+n+1+int(chkLen))
return s.pool.Get(chunkenc.Encoding(r[0]), r[1:1+l]) return s.pool.Get(chunkenc.Encoding(chk[0]), chk[1:1+chkLen])
} }
func nextSequenceFile(dir string) (string, int, error) { func nextSequenceFile(dir string) (string, int, error) {

View file

@ -424,6 +424,8 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
} }
func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) { func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) {
start := time.Now()
entropy := rand.New(rand.NewSource(time.Now().UnixNano())) entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
uid := ulid.MustNew(ulid.Now(), entropy) uid := ulid.MustNew(ulid.Now(), entropy)
@ -450,7 +452,13 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, p
return ulid.ULID{}, nil return ulid.ULID{}, nil
} }
level.Info(c.logger).Log("msg", "write block", "mint", meta.MinTime, "maxt", meta.MaxTime, "ulid", meta.ULID) level.Info(c.logger).Log(
"msg", "write block",
"mint", meta.MinTime,
"maxt", meta.MaxTime,
"ulid", meta.ULID,
"duration", time.Since(start),
)
return uid, nil return uid, nil
} }

2
db.go
View file

@ -206,7 +206,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
Help: "The time taken to recompact blocks to remove tombstones.", Help: "The time taken to recompact blocks to remove tombstones.",
}) })
m.blocksBytes = prometheus.NewGauge(prometheus.GaugeOpts{ m.blocksBytes = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_tsdb_storage_blocks_bytes_total", Name: "prometheus_tsdb_storage_blocks_bytes",
Help: "The number of bytes that are currently used for local storage by all blocks.", Help: "The number of bytes that are currently used for local storage by all blocks.",
}) })
m.sizeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{ m.sizeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{

View file

@ -34,6 +34,7 @@ import (
"github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil" "github.com/prometheus/tsdb/testutil"
"github.com/prometheus/tsdb/tsdbutil"
"github.com/prometheus/tsdb/wal" "github.com/prometheus/tsdb/wal"
) )
@ -87,7 +88,7 @@ func TestDB_reloadOrder(t *testing.T) {
{MinTime: 100, MaxTime: 110}, {MinTime: 100, MaxTime: 110},
} }
for _, m := range metas { for _, m := range metas {
createBlock(t, db.Dir(), 1, m.MinTime, m.MaxTime) createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime))
} }
testutil.Ok(t, db.reload()) testutil.Ok(t, db.reload())
@ -250,7 +251,7 @@ Outer:
res, err := q.Select(labels.NewEqualMatcher("a", "b")) res, err := q.Select(labels.NewEqualMatcher("a", "b"))
testutil.Ok(t, err) testutil.Ok(t, err)
expSamples := make([]Sample, 0, len(c.remaint)) expSamples := make([]tsdbutil.Sample, 0, len(c.remaint))
for _, ts := range c.remaint { for _, ts := range c.remaint {
expSamples = append(expSamples, sample{ts, smpls[ts]}) expSamples = append(expSamples, sample{ts, smpls[ts]})
} }
@ -477,7 +478,7 @@ Outer:
res, err := q.Select(labels.NewEqualMatcher("a", "b")) res, err := q.Select(labels.NewEqualMatcher("a", "b"))
testutil.Ok(t, err) testutil.Ok(t, err)
expSamples := make([]Sample, 0, len(c.remaint)) expSamples := make([]tsdbutil.Sample, 0, len(c.remaint))
for _, ts := range c.remaint { for _, ts := range c.remaint {
expSamples = append(expSamples, sample{ts, smpls[ts]}) expSamples = append(expSamples, sample{ts, smpls[ts]})
} }
@ -782,7 +783,7 @@ func TestTombstoneClean(t *testing.T) {
res, err := q.Select(labels.NewEqualMatcher("a", "b")) res, err := q.Select(labels.NewEqualMatcher("a", "b"))
testutil.Ok(t, err) testutil.Ok(t, err)
expSamples := make([]Sample, 0, len(c.remaint)) expSamples := make([]tsdbutil.Sample, 0, len(c.remaint))
for _, ts := range c.remaint { for _, ts := range c.remaint {
expSamples = append(expSamples, sample{ts, smpls[ts]}) expSamples = append(expSamples, sample{ts, smpls[ts]})
} }
@ -836,7 +837,7 @@ func TestTombstoneCleanFail(t *testing.T) {
// totalBlocks should be >=2 so we have enough blocks to trigger compaction failure. // totalBlocks should be >=2 so we have enough blocks to trigger compaction failure.
totalBlocks := 2 totalBlocks := 2
for i := 0; i < totalBlocks; i++ { for i := 0; i < totalBlocks; i++ {
blockDir := createBlock(t, db.Dir(), 1, 0, 0) blockDir := createBlock(t, db.Dir(), genSeries(1, 1, 0, 0))
block, err := OpenBlock(nil, blockDir, nil) block, err := OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err) testutil.Ok(t, err)
// Add some some fake tombstones to trigger the compaction. // Add some some fake tombstones to trigger the compaction.
@ -880,7 +881,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6
return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail") return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail")
} }
block, err := OpenBlock(nil, createBlock(c.t, dest, 1, 0, 0), nil) block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 0)), nil)
testutil.Ok(c.t, err) testutil.Ok(c.t, err)
testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere. testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere.
c.blocks = append(c.blocks, block) c.blocks = append(c.blocks, block)
@ -918,7 +919,7 @@ func TestTimeRetention(t *testing.T) {
} }
for _, m := range blocks { for _, m := range blocks {
createBlock(t, db.Dir(), 10, m.MinTime, m.MaxTime) createBlock(t, db.Dir(), genSeries(10, 10, m.MinTime, m.MaxTime))
} }
testutil.Ok(t, db.reload()) // Reload the db to register the new blocks. testutil.Ok(t, db.reload()) // Reload the db to register the new blocks.
@ -952,7 +953,7 @@ func TestSizeRetention(t *testing.T) {
} }
for _, m := range blocks { for _, m := range blocks {
createBlock(t, db.Dir(), 100, m.MinTime, m.MaxTime) createBlock(t, db.Dir(), genSeries(100, 10, m.MinTime, m.MaxTime))
} }
// Test that registered size matches the actual disk size. // Test that registered size matches the actual disk size.
@ -1319,7 +1320,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
createBlock(t, dir, 1, 1000, 2000) createBlock(t, dir, genSeries(1, 1, 1000, 2000))
db, err := Open(dir, nil, nil, nil) db, err := Open(dir, nil, nil, nil)
testutil.Ok(t, err) testutil.Ok(t, err)
@ -1332,7 +1333,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
createBlock(t, dir, 1, 1000, 6000) createBlock(t, dir, genSeries(1, 1, 1000, 6000))
testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777)) testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
w, err := wal.New(nil, nil, path.Join(dir, "wal")) w, err := wal.New(nil, nil, path.Join(dir, "wal"))
@ -1450,7 +1451,7 @@ func TestNoEmptyBlocks(t *testing.T) {
{MinTime: currentTime + 100, MaxTime: currentTime + 100 + db.opts.BlockRanges[0]}, {MinTime: currentTime + 100, MaxTime: currentTime + 100 + db.opts.BlockRanges[0]},
} }
for _, m := range blocks { for _, m := range blocks {
createBlock(t, db.Dir(), 2, m.MinTime, m.MaxTime) createBlock(t, db.Dir(), genSeries(2, 2, m.MinTime, m.MaxTime))
} }
oldBlocks := db.Blocks() oldBlocks := db.Blocks()
@ -1603,9 +1604,9 @@ func TestCorrectNumTombstones(t *testing.T) {
// TestBlockRanges checks the following use cases: // TestBlockRanges checks the following use cases:
// - No samples can be added with timestamps lower than the last block maxt. // - No samples can be added with timestamps lower than the last block maxt.
// - The compactor doesn't create overlaping blocks // - The compactor doesn't create overlapping blocks
// even when the last blocks is not within the default boundaries. // even when the last blocks is not within the default boundaries.
// - Lower bondary is based on the smallest sample in the head and // - Lower boundary is based on the smallest sample in the head and
// upper boundary is rounded to the configured block range. // upper boundary is rounded to the configured block range.
// //
// This ensures that a snapshot that includes the head and creates a block with a custom time range // This ensures that a snapshot that includes the head and creates a block with a custom time range
@ -1623,7 +1624,7 @@ func TestBlockRanges(t *testing.T) {
// Test that the compactor doesn't create overlapping blocks // Test that the compactor doesn't create overlapping blocks
// when a non standard block already exists. // when a non standard block already exists.
firstBlockMaxT := int64(3) firstBlockMaxT := int64(3)
createBlock(t, dir, 1, 0, firstBlockMaxT) createBlock(t, dir, genSeries(1, 1, 0, firstBlockMaxT))
db, err := Open(dir, logger, nil, DefaultOptions) db, err := Open(dir, logger, nil, DefaultOptions)
if err != nil { if err != nil {
t.Fatalf("Opening test storage failed: %s", err) t.Fatalf("Opening test storage failed: %s", err)
@ -1673,7 +1674,7 @@ func TestBlockRanges(t *testing.T) {
testutil.Ok(t, db.Close()) testutil.Ok(t, db.Close())
thirdBlockMaxt := secondBlockMaxt + 2 thirdBlockMaxt := secondBlockMaxt + 2
createBlock(t, dir, 1, secondBlockMaxt+1, thirdBlockMaxt) createBlock(t, dir, genSeries(1, 1, secondBlockMaxt+1, thirdBlockMaxt))
db, err = Open(dir, logger, nil, DefaultOptions) db, err = Open(dir, logger, nil, DefaultOptions)
if err != nil { if err != nil {

View file

@ -28,6 +28,7 @@ import (
"github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil" "github.com/prometheus/tsdb/testutil"
"github.com/prometheus/tsdb/tsdbutil"
"github.com/prometheus/tsdb/wal" "github.com/prometheus/tsdb/wal"
) )
@ -352,7 +353,7 @@ Outer:
res, err := q.Select(labels.NewEqualMatcher("a", "b")) res, err := q.Select(labels.NewEqualMatcher("a", "b"))
testutil.Ok(t, err) testutil.Ok(t, err)
expSamples := make([]Sample, 0, len(c.remaint)) expSamples := make([]tsdbutil.Sample, 0, len(c.remaint))
for _, ts := range c.remaint { for _, ts := range c.remaint {
expSamples = append(expSamples, sample{ts, smpls[ts]}) expSamples = append(expSamples, sample{ts, smpls[ts]})
} }
@ -472,9 +473,9 @@ func TestDelete_e2e(t *testing.T) {
{"job", "prom-k8s"}, {"job", "prom-k8s"},
}, },
} }
seriesMap := map[string][]Sample{} seriesMap := map[string][]tsdbutil.Sample{}
for _, l := range lbls { for _, l := range lbls {
seriesMap[labels.New(l...).String()] = []Sample{} seriesMap[labels.New(l...).String()] = []tsdbutil.Sample{}
} }
dir, _ := ioutil.TempDir("", "test") dir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
@ -484,7 +485,7 @@ func TestDelete_e2e(t *testing.T) {
app := hb.Appender() app := hb.Appender()
for _, l := range lbls { for _, l := range lbls {
ls := labels.New(l...) ls := labels.New(l...)
series := []Sample{} series := []tsdbutil.Sample{}
ts := rand.Int63n(300) ts := rand.Int63n(300)
for i := 0; i < numDatapoints; i++ { for i := 0; i < numDatapoints; i++ {
v := rand.Float64() v := rand.Float64()
@ -604,8 +605,8 @@ func boundedSamples(full []sample, mint, maxt int64) []sample {
return full return full
} }
func deletedSamples(full []Sample, dranges Intervals) []Sample { func deletedSamples(full []tsdbutil.Sample, dranges Intervals) []tsdbutil.Sample {
ds := make([]Sample, 0, len(full)) ds := make([]tsdbutil.Sample, 0, len(full))
Outer: Outer:
for _, s := range full { for _, s := range full {
for _, r := range dranges { for _, r := range dranges {

View file

@ -68,58 +68,6 @@ func (m *mockSeriesIterator) At() (int64, float64) { return m.at() }
func (m *mockSeriesIterator) Next() bool { return m.next() } func (m *mockSeriesIterator) Next() bool { return m.next() }
func (m *mockSeriesIterator) Err() error { return m.err() } func (m *mockSeriesIterator) Err() error { return m.err() }
type mockSeries struct {
labels func() labels.Labels
iterator func() SeriesIterator
}
type Sample = tsdbutil.Sample
func newSeries(l map[string]string, s []Sample) Series {
return &mockSeries{
labels: func() labels.Labels { return labels.FromMap(l) },
iterator: func() SeriesIterator { return newListSeriesIterator(s) },
}
}
func (m *mockSeries) Labels() labels.Labels { return m.labels() }
func (m *mockSeries) Iterator() SeriesIterator { return m.iterator() }
type listSeriesIterator struct {
list []Sample
idx int
}
func newListSeriesIterator(list []Sample) *listSeriesIterator {
return &listSeriesIterator{list: list, idx: -1}
}
func (it *listSeriesIterator) At() (int64, float64) {
s := it.list[it.idx]
return s.T(), s.V()
}
func (it *listSeriesIterator) Next() bool {
it.idx++
return it.idx < len(it.list)
}
func (it *listSeriesIterator) Seek(t int64) bool {
if it.idx == -1 {
it.idx = 0
}
// Do binary search between current position and end.
it.idx = sort.Search(len(it.list)-it.idx, func(i int) bool {
s := it.list[i+it.idx]
return s.T() >= t
})
return it.idx < len(it.list)
}
func (it *listSeriesIterator) Err() error {
return nil
}
func TestMergedSeriesSet(t *testing.T) { func TestMergedSeriesSet(t *testing.T) {
cases := []struct { cases := []struct {
@ -134,32 +82,32 @@ func TestMergedSeriesSet(t *testing.T) {
a: newMockSeriesSet([]Series{ a: newMockSeriesSet([]Series{
newSeries(map[string]string{ newSeries(map[string]string{
"a": "a", "a": "a",
}, []Sample{ }, []tsdbutil.Sample{
sample{t: 1, v: 1}, sample{t: 1, v: 1},
}), }),
}), }),
b: newMockSeriesSet([]Series{ b: newMockSeriesSet([]Series{
newSeries(map[string]string{ newSeries(map[string]string{
"a": "a", "a": "a",
}, []Sample{ }, []tsdbutil.Sample{
sample{t: 2, v: 2}, sample{t: 2, v: 2},
}), }),
newSeries(map[string]string{ newSeries(map[string]string{
"b": "b", "b": "b",
}, []Sample{ }, []tsdbutil.Sample{
sample{t: 1, v: 1}, sample{t: 1, v: 1},
}), }),
}), }),
exp: newMockSeriesSet([]Series{ exp: newMockSeriesSet([]Series{
newSeries(map[string]string{ newSeries(map[string]string{
"a": "a", "a": "a",
}, []Sample{ }, []tsdbutil.Sample{
sample{t: 1, v: 1}, sample{t: 1, v: 1},
sample{t: 2, v: 2}, sample{t: 2, v: 2},
}), }),
newSeries(map[string]string{ newSeries(map[string]string{
"b": "b", "b": "b",
}, []Sample{ }, []tsdbutil.Sample{
sample{t: 1, v: 1}, sample{t: 1, v: 1},
}), }),
}), }),
@ -169,13 +117,13 @@ func TestMergedSeriesSet(t *testing.T) {
newSeries(map[string]string{ newSeries(map[string]string{
"handler": "prometheus", "handler": "prometheus",
"instance": "127.0.0.1:9090", "instance": "127.0.0.1:9090",
}, []Sample{ }, []tsdbutil.Sample{
sample{t: 1, v: 1}, sample{t: 1, v: 1},
}), }),
newSeries(map[string]string{ newSeries(map[string]string{
"handler": "prometheus", "handler": "prometheus",
"instance": "localhost:9090", "instance": "localhost:9090",
}, []Sample{ }, []tsdbutil.Sample{
sample{t: 1, v: 2}, sample{t: 1, v: 2},
}), }),
}), }),
@ -183,13 +131,13 @@ func TestMergedSeriesSet(t *testing.T) {
newSeries(map[string]string{ newSeries(map[string]string{
"handler": "prometheus", "handler": "prometheus",
"instance": "127.0.0.1:9090", "instance": "127.0.0.1:9090",
}, []Sample{ }, []tsdbutil.Sample{
sample{t: 2, v: 1}, sample{t: 2, v: 1},
}), }),
newSeries(map[string]string{ newSeries(map[string]string{
"handler": "query", "handler": "query",
"instance": "localhost:9090", "instance": "localhost:9090",
}, []Sample{ }, []tsdbutil.Sample{
sample{t: 2, v: 2}, sample{t: 2, v: 2},
}), }),
}), }),
@ -197,20 +145,20 @@ func TestMergedSeriesSet(t *testing.T) {
newSeries(map[string]string{ newSeries(map[string]string{
"handler": "prometheus", "handler": "prometheus",
"instance": "127.0.0.1:9090", "instance": "127.0.0.1:9090",
}, []Sample{ }, []tsdbutil.Sample{
sample{t: 1, v: 1}, sample{t: 1, v: 1},
sample{t: 2, v: 1}, sample{t: 2, v: 1},
}), }),
newSeries(map[string]string{ newSeries(map[string]string{
"handler": "prometheus", "handler": "prometheus",
"instance": "localhost:9090", "instance": "localhost:9090",
}, []Sample{ }, []tsdbutil.Sample{
sample{t: 1, v: 2}, sample{t: 1, v: 2},
}), }),
newSeries(map[string]string{ newSeries(map[string]string{
"handler": "query", "handler": "query",
"instance": "localhost:9090", "instance": "localhost:9090",
}, []Sample{ }, []tsdbutil.Sample{
sample{t: 2, v: 2}, sample{t: 2, v: 2},
}), }),
}), }),
@ -316,7 +264,7 @@ func createIdxChkReaders(tc []seriesSamples) (IndexReader, ChunkReader) {
} }
func TestBlockQuerier(t *testing.T) { func TestBlockQuerier(t *testing.T) {
newSeries := func(l map[string]string, s []Sample) Series { newSeries := func(l map[string]string, s []tsdbutil.Sample) Series {
return &mockSeries{ return &mockSeries{
labels: func() labels.Labels { return labels.FromMap(l) }, labels: func() labels.Labels { return labels.FromMap(l) },
iterator: func() SeriesIterator { return newListSeriesIterator(s) }, iterator: func() SeriesIterator { return newListSeriesIterator(s) },
@ -404,13 +352,13 @@ func TestBlockQuerier(t *testing.T) {
newSeries(map[string]string{ newSeries(map[string]string{
"a": "a", "a": "a",
}, },
[]Sample{sample{2, 3}, sample{3, 4}, sample{5, 2}, sample{6, 3}}, []tsdbutil.Sample{sample{2, 3}, sample{3, 4}, sample{5, 2}, sample{6, 3}},
), ),
newSeries(map[string]string{ newSeries(map[string]string{
"a": "a", "a": "a",
"b": "b", "b": "b",
}, },
[]Sample{sample{2, 2}, sample{3, 3}, sample{5, 3}, sample{6, 6}}, []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{5, 3}, sample{6, 6}},
), ),
}), }),
}, },
@ -456,7 +404,7 @@ Outer:
} }
func TestBlockQuerierDelete(t *testing.T) { func TestBlockQuerierDelete(t *testing.T) {
newSeries := func(l map[string]string, s []Sample) Series { newSeries := func(l map[string]string, s []tsdbutil.Sample) Series {
return &mockSeries{ return &mockSeries{
labels: func() labels.Labels { return labels.FromMap(l) }, labels: func() labels.Labels { return labels.FromMap(l) },
iterator: func() SeriesIterator { return newListSeriesIterator(s) }, iterator: func() SeriesIterator { return newListSeriesIterator(s) },
@ -531,13 +479,13 @@ func TestBlockQuerierDelete(t *testing.T) {
newSeries(map[string]string{ newSeries(map[string]string{
"a": "a", "a": "a",
}, },
[]Sample{sample{5, 2}, sample{6, 3}, sample{7, 4}}, []tsdbutil.Sample{sample{5, 2}, sample{6, 3}, sample{7, 4}},
), ),
newSeries(map[string]string{ newSeries(map[string]string{
"a": "a", "a": "a",
"b": "b", "b": "b",
}, },
[]Sample{sample{4, 15}, sample{5, 3}}, []tsdbutil.Sample{sample{4, 15}, sample{5, 3}},
), ),
}), }),
}, },
@ -550,12 +498,12 @@ func TestBlockQuerierDelete(t *testing.T) {
"a": "a", "a": "a",
"b": "b", "b": "b",
}, },
[]Sample{sample{4, 15}, sample{5, 3}}, []tsdbutil.Sample{sample{4, 15}, sample{5, 3}},
), ),
newSeries(map[string]string{ newSeries(map[string]string{
"b": "b", "b": "b",
}, },
[]Sample{sample{2, 2}, sample{3, 6}, sample{5, 1}}, []tsdbutil.Sample{sample{2, 2}, sample{3, 6}, sample{5, 1}},
), ),
}), }),
}, },
@ -568,7 +516,7 @@ func TestBlockQuerierDelete(t *testing.T) {
"a": "a", "a": "a",
"b": "b", "b": "b",
}, },
[]Sample{sample{4, 15}}, []tsdbutil.Sample{sample{4, 15}},
), ),
}), }),
}, },
@ -727,66 +675,66 @@ func (s itSeries) Labels() labels.Labels { return labels.Labels{} }
func TestSeriesIterator(t *testing.T) { func TestSeriesIterator(t *testing.T) {
itcases := []struct { itcases := []struct {
a, b, c []Sample a, b, c []tsdbutil.Sample
exp []Sample exp []tsdbutil.Sample
mint, maxt int64 mint, maxt int64
}{ }{
{ {
a: []Sample{}, a: []tsdbutil.Sample{},
b: []Sample{}, b: []tsdbutil.Sample{},
c: []Sample{}, c: []tsdbutil.Sample{},
exp: []Sample{}, exp: []tsdbutil.Sample{},
mint: math.MinInt64, mint: math.MinInt64,
maxt: math.MaxInt64, maxt: math.MaxInt64,
}, },
{ {
a: []Sample{ a: []tsdbutil.Sample{
sample{1, 2}, sample{1, 2},
sample{2, 3}, sample{2, 3},
sample{3, 5}, sample{3, 5},
sample{6, 1}, sample{6, 1},
}, },
b: []Sample{}, b: []tsdbutil.Sample{},
c: []Sample{ c: []tsdbutil.Sample{
sample{7, 89}, sample{9, 8}, sample{7, 89}, sample{9, 8},
}, },
exp: []Sample{ exp: []tsdbutil.Sample{
sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8}, sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8},
}, },
mint: math.MinInt64, mint: math.MinInt64,
maxt: math.MaxInt64, maxt: math.MaxInt64,
}, },
{ {
a: []Sample{}, a: []tsdbutil.Sample{},
b: []Sample{ b: []tsdbutil.Sample{
sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1},
}, },
c: []Sample{ c: []tsdbutil.Sample{
sample{7, 89}, sample{9, 8}, sample{7, 89}, sample{9, 8},
}, },
exp: []Sample{ exp: []tsdbutil.Sample{
sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8}, sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8},
}, },
mint: 2, mint: 2,
maxt: 8, maxt: 8,
}, },
{ {
a: []Sample{ a: []tsdbutil.Sample{
sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1},
}, },
b: []Sample{ b: []tsdbutil.Sample{
sample{7, 89}, sample{9, 8}, sample{7, 89}, sample{9, 8},
}, },
c: []Sample{ c: []tsdbutil.Sample{
sample{10, 22}, sample{203, 3493}, sample{10, 22}, sample{203, 3493},
}, },
exp: []Sample{ exp: []tsdbutil.Sample{
sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8}, sample{10, 22}, sample{203, 3493}, sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8}, sample{10, 22}, sample{203, 3493},
}, },
mint: 6, mint: 6,
@ -795,29 +743,29 @@ func TestSeriesIterator(t *testing.T) {
} }
seekcases := []struct { seekcases := []struct {
a, b, c []Sample a, b, c []tsdbutil.Sample
seek int64 seek int64
success bool success bool
exp []Sample exp []tsdbutil.Sample
mint, maxt int64 mint, maxt int64
}{ }{
{ {
a: []Sample{}, a: []tsdbutil.Sample{},
b: []Sample{}, b: []tsdbutil.Sample{},
c: []Sample{}, c: []tsdbutil.Sample{},
seek: 0, seek: 0,
success: false, success: false,
exp: nil, exp: nil,
}, },
{ {
a: []Sample{ a: []tsdbutil.Sample{
sample{2, 3}, sample{2, 3},
}, },
b: []Sample{}, b: []tsdbutil.Sample{},
c: []Sample{ c: []tsdbutil.Sample{
sample{7, 89}, sample{9, 8}, sample{7, 89}, sample{9, 8},
}, },
@ -828,55 +776,55 @@ func TestSeriesIterator(t *testing.T) {
maxt: math.MaxInt64, maxt: math.MaxInt64,
}, },
{ {
a: []Sample{}, a: []tsdbutil.Sample{},
b: []Sample{ b: []tsdbutil.Sample{
sample{1, 2}, sample{3, 5}, sample{6, 1}, sample{1, 2}, sample{3, 5}, sample{6, 1},
}, },
c: []Sample{ c: []tsdbutil.Sample{
sample{7, 89}, sample{9, 8}, sample{7, 89}, sample{9, 8},
}, },
seek: 2, seek: 2,
success: true, success: true,
exp: []Sample{ exp: []tsdbutil.Sample{
sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8}, sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8},
}, },
mint: 5, mint: 5,
maxt: 8, maxt: 8,
}, },
{ {
a: []Sample{ a: []tsdbutil.Sample{
sample{6, 1}, sample{6, 1},
}, },
b: []Sample{ b: []tsdbutil.Sample{
sample{9, 8}, sample{9, 8},
}, },
c: []Sample{ c: []tsdbutil.Sample{
sample{10, 22}, sample{203, 3493}, sample{10, 22}, sample{203, 3493},
}, },
seek: 10, seek: 10,
success: true, success: true,
exp: []Sample{ exp: []tsdbutil.Sample{
sample{10, 22}, sample{203, 3493}, sample{10, 22}, sample{203, 3493},
}, },
mint: 10, mint: 10,
maxt: 203, maxt: 203,
}, },
{ {
a: []Sample{ a: []tsdbutil.Sample{
sample{6, 1}, sample{6, 1},
}, },
b: []Sample{ b: []tsdbutil.Sample{
sample{9, 8}, sample{9, 8},
}, },
c: []Sample{ c: []tsdbutil.Sample{
sample{10, 22}, sample{203, 3493}, sample{10, 22}, sample{203, 3493},
}, },
seek: 203, seek: 203,
success: true, success: true,
exp: []Sample{ exp: []tsdbutil.Sample{
sample{203, 3493}, sample{203, 3493},
}, },
mint: 7, mint: 7,
@ -893,10 +841,10 @@ func TestSeriesIterator(t *testing.T) {
} }
res := newChunkSeriesIterator(chkMetas, nil, tc.mint, tc.maxt) res := newChunkSeriesIterator(chkMetas, nil, tc.mint, tc.maxt)
smplValid := make([]Sample, 0) smplValid := make([]tsdbutil.Sample, 0)
for _, s := range tc.exp { for _, s := range tc.exp {
if s.T() >= tc.mint && s.T() <= tc.maxt { if s.T() >= tc.mint && s.T() <= tc.maxt {
smplValid = append(smplValid, Sample(s)) smplValid = append(smplValid, tsdbutil.Sample(s))
} }
} }
exp := newListSeriesIterator(smplValid) exp := newListSeriesIterator(smplValid)
@ -910,22 +858,22 @@ func TestSeriesIterator(t *testing.T) {
t.Run("Seek", func(t *testing.T) { t.Run("Seek", func(t *testing.T) {
extra := []struct { extra := []struct {
a, b, c []Sample a, b, c []tsdbutil.Sample
seek int64 seek int64
success bool success bool
exp []Sample exp []tsdbutil.Sample
mint, maxt int64 mint, maxt int64
}{ }{
{ {
a: []Sample{ a: []tsdbutil.Sample{
sample{6, 1}, sample{6, 1},
}, },
b: []Sample{ b: []tsdbutil.Sample{
sample{9, 8}, sample{9, 8},
}, },
c: []Sample{ c: []tsdbutil.Sample{
sample{10, 22}, sample{203, 3493}, sample{10, 22}, sample{203, 3493},
}, },
@ -936,19 +884,19 @@ func TestSeriesIterator(t *testing.T) {
maxt: 202, maxt: 202,
}, },
{ {
a: []Sample{ a: []tsdbutil.Sample{
sample{6, 1}, sample{6, 1},
}, },
b: []Sample{ b: []tsdbutil.Sample{
sample{9, 8}, sample{9, 8},
}, },
c: []Sample{ c: []tsdbutil.Sample{
sample{10, 22}, sample{203, 3493}, sample{10, 22}, sample{203, 3493},
}, },
seek: 5, seek: 5,
success: true, success: true,
exp: []Sample{sample{10, 22}}, exp: []tsdbutil.Sample{sample{10, 22}},
mint: 10, mint: 10,
maxt: 202, maxt: 202,
}, },
@ -964,10 +912,10 @@ func TestSeriesIterator(t *testing.T) {
} }
res := newChunkSeriesIterator(chkMetas, nil, tc.mint, tc.maxt) res := newChunkSeriesIterator(chkMetas, nil, tc.mint, tc.maxt)
smplValid := make([]Sample, 0) smplValid := make([]tsdbutil.Sample, 0)
for _, s := range tc.exp { for _, s := range tc.exp {
if s.T() >= tc.mint && s.T() <= tc.maxt { if s.T() >= tc.mint && s.T() <= tc.maxt {
smplValid = append(smplValid, Sample(s)) smplValid = append(smplValid, tsdbutil.Sample(s))
} }
} }
exp := newListSeriesIterator(smplValid) exp := newListSeriesIterator(smplValid)
@ -1000,7 +948,7 @@ func TestSeriesIterator(t *testing.T) {
itSeries{newListSeriesIterator(tc.c)} itSeries{newListSeriesIterator(tc.c)}
res := newChainedSeriesIterator(a, b, c) res := newChainedSeriesIterator(a, b, c)
exp := newListSeriesIterator([]Sample(tc.exp)) exp := newListSeriesIterator([]tsdbutil.Sample(tc.exp))
smplExp, errExp := expandSeriesIterator(exp) smplExp, errExp := expandSeriesIterator(exp)
smplRes, errRes := expandSeriesIterator(res) smplRes, errRes := expandSeriesIterator(res)
@ -1045,9 +993,9 @@ func TestSeriesIterator(t *testing.T) {
// Regression for: https://github.com/prometheus/tsdb/pull/97 // Regression for: https://github.com/prometheus/tsdb/pull/97
func TestChunkSeriesIterator_DoubleSeek(t *testing.T) { func TestChunkSeriesIterator_DoubleSeek(t *testing.T) {
chkMetas := []chunks.Meta{ chkMetas := []chunks.Meta{
tsdbutil.ChunkFromSamples([]Sample{}), tsdbutil.ChunkFromSamples([]tsdbutil.Sample{}),
tsdbutil.ChunkFromSamples([]Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}), tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}),
tsdbutil.ChunkFromSamples([]Sample{sample{4, 4}, sample{5, 5}}), tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{4, 4}, sample{5, 5}}),
} }
res := newChunkSeriesIterator(chkMetas, nil, 2, 8) res := newChunkSeriesIterator(chkMetas, nil, 2, 8)
@ -1062,9 +1010,9 @@ func TestChunkSeriesIterator_DoubleSeek(t *testing.T) {
// skipped to the end when seeking a value in the current chunk. // skipped to the end when seeking a value in the current chunk.
func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) { func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) {
metas := []chunks.Meta{ metas := []chunks.Meta{
tsdbutil.ChunkFromSamples([]Sample{}), tsdbutil.ChunkFromSamples([]tsdbutil.Sample{}),
tsdbutil.ChunkFromSamples([]Sample{sample{1, 2}, sample{3, 4}, sample{5, 6}, sample{7, 8}}), tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 2}, sample{3, 4}, sample{5, 6}, sample{7, 8}}),
tsdbutil.ChunkFromSamples([]Sample{}), tsdbutil.ChunkFromSamples([]tsdbutil.Sample{}),
} }
it := newChunkSeriesIterator(metas, nil, 1, 7) it := newChunkSeriesIterator(metas, nil, 1, 7)
@ -1084,7 +1032,7 @@ func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) {
// Seek gets called and advances beyond the max time, which was just accepted as a valid sample. // Seek gets called and advances beyond the max time, which was just accepted as a valid sample.
func TestChunkSeriesIterator_NextWithMinTime(t *testing.T) { func TestChunkSeriesIterator_NextWithMinTime(t *testing.T) {
metas := []chunks.Meta{ metas := []chunks.Meta{
tsdbutil.ChunkFromSamples([]Sample{sample{1, 6}, sample{5, 6}, sample{7, 8}}), tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 6}, sample{5, 6}, sample{7, 8}}),
} }
it := newChunkSeriesIterator(metas, nil, 2, 4) it := newChunkSeriesIterator(metas, nil, 2, 4)
@ -1232,7 +1180,7 @@ func BenchmarkPersistedQueries(b *testing.B) {
dir, err := ioutil.TempDir("", "bench_persisted") dir, err := ioutil.TempDir("", "bench_persisted")
testutil.Ok(b, err) testutil.Ok(b, err)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
block, err := OpenBlock(nil, createBlock(b, dir, nSeries, 1, int64(nSamples)), nil) block, err := OpenBlock(nil, createBlock(b, dir, genSeries(nSeries, 10, 1, int64(nSamples))), nil)
testutil.Ok(b, err) testutil.Ok(b, err)
defer block.Close() defer block.Close()
@ -1462,3 +1410,53 @@ func (m mockIndex) LabelNames() ([]string, error) {
sort.Strings(labelNames) sort.Strings(labelNames)
return labelNames, nil return labelNames, nil
} }
type mockSeries struct {
labels func() labels.Labels
iterator func() SeriesIterator
}
func newSeries(l map[string]string, s []tsdbutil.Sample) Series {
return &mockSeries{
labels: func() labels.Labels { return labels.FromMap(l) },
iterator: func() SeriesIterator { return newListSeriesIterator(s) },
}
}
func (m *mockSeries) Labels() labels.Labels { return m.labels() }
func (m *mockSeries) Iterator() SeriesIterator { return m.iterator() }
type listSeriesIterator struct {
list []tsdbutil.Sample
idx int
}
func newListSeriesIterator(list []tsdbutil.Sample) *listSeriesIterator {
return &listSeriesIterator{list: list, idx: -1}
}
func (it *listSeriesIterator) At() (int64, float64) {
s := it.list[it.idx]
return s.T(), s.V()
}
func (it *listSeriesIterator) Next() bool {
it.idx++
return it.idx < len(it.list)
}
func (it *listSeriesIterator) Seek(t int64) bool {
if it.idx == -1 {
it.idx = 0
}
// Do binary search between current position and end.
it.idx = sort.Search(len(it.list)-it.idx, func(i int) bool {
s := it.list[i+it.idx]
return s.T() >= t
})
return it.idx < len(it.list)
}
func (it *listSeriesIterator) Err() error {
return nil
}