Merge branch 'master' into update-makefile-common

Signed-off-by: Simon Pasquier <spasquie@redhat.com>
This commit is contained in:
Simon Pasquier 2019-02-07 12:10:22 +01:00
commit 95334f13c5
10 changed files with 299 additions and 234 deletions

View file

@ -1,5 +1,7 @@
## master / unreleased ## master / unreleased
- [CHANGE] `prometheus_tsdb_storage_blocks_bytes_total` is now `prometheus_tsdb_storage_blocks_bytes`
## 0.4.0 ## 0.4.0
- [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed. - [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed.
- [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374) - [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374)

View file

@ -21,8 +21,8 @@ import (
"testing" "testing"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil" "github.com/prometheus/tsdb/testutil"
"github.com/prometheus/tsdb/tsdbutil"
) )
// In Prometheus 2.1.0 we had a bug where the meta.json version was falsely bumped // In Prometheus 2.1.0 we had a bug where the meta.json version was falsely bumped
@ -45,7 +45,7 @@ func TestSetCompactionFailed(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
defer os.RemoveAll(tmpdir) defer os.RemoveAll(tmpdir)
blockDir := createBlock(t, tmpdir, 1, 0, 0) blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 0))
b, err := OpenBlock(nil, blockDir, nil) b, err := OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, false, b.meta.Compaction.Failed) testutil.Equals(t, false, b.meta.Compaction.Failed)
@ -59,33 +59,31 @@ func TestSetCompactionFailed(t *testing.T) {
testutil.Ok(t, b.Close()) testutil.Ok(t, b.Close())
} }
// createBlock creates a block with nSeries series, filled with // createBlock creates a block with given set of series and returns its dir.
// samples of the given mint,maxt time range and returns its dir. func createBlock(tb testing.TB, dir string, series []Series) string {
func createBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) string {
head, err := NewHead(nil, nil, nil, 2*60*60*1000) head, err := NewHead(nil, nil, nil, 2*60*60*1000)
testutil.Ok(tb, err) testutil.Ok(tb, err)
defer head.Close() defer head.Close()
lbls, err := labels.ReadLabels(filepath.Join("testdata", "20kseries.json"), nSeries)
testutil.Ok(tb, err)
refs := make([]uint64, nSeries)
for ts := mint; ts <= maxt; ts++ {
app := head.Appender() app := head.Appender()
for i, lbl := range lbls { for _, s := range series {
if refs[i] != 0 { ref := uint64(0)
err := app.AddFast(refs[i], ts, rand.Float64()) it := s.Iterator()
for it.Next() {
t, v := it.At()
if ref != 0 {
err := app.AddFast(ref, t, v)
if err == nil { if err == nil {
continue continue
} }
} }
ref, err := app.Add(lbl, int64(ts), rand.Float64()) ref, err = app.Add(s.Labels(), t, v)
testutil.Ok(tb, err)
refs[i] = ref
}
err := app.Commit()
testutil.Ok(tb, err) testutil.Ok(tb, err)
} }
testutil.Ok(tb, it.Err())
}
err = app.Commit()
testutil.Ok(tb, err)
compactor, err := NewLeveledCompactor(nil, log.NewNopLogger(), []int64{1000000}, nil) compactor, err := NewLeveledCompactor(nil, log.NewNopLogger(), []int64{1000000}, nil)
testutil.Ok(tb, err) testutil.Ok(tb, err)
@ -96,3 +94,53 @@ func createBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) strin
testutil.Ok(tb, err) testutil.Ok(tb, err)
return filepath.Join(dir, ulid.String()) return filepath.Join(dir, ulid.String())
} }
// genSeries generates series with a given number of labels and values.
func genSeries(totalSeries, labelCount int, mint, maxt int64) []Series {
if totalSeries == 0 || labelCount == 0 {
return nil
}
series := make([]Series, totalSeries)
for i := 0; i < totalSeries; i++ {
lbls := make(map[string]string, labelCount)
for len(lbls) < labelCount {
lbls[randString()] = randString()
}
samples := make([]tsdbutil.Sample, 0, maxt-mint+1)
for t := mint; t <= maxt; t++ {
samples = append(samples, sample{t: t, v: rand.Float64()})
}
series[i] = newSeries(lbls, samples)
}
return series
}
const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
const (
letterIdxBits = 6 // 6 bits to represent a letter index
letterIdxMask = 1<<letterIdxBits - 1 // All 1-bits, as many as letterIdxBits
letterIdxMax = 63 / letterIdxBits // # of letter indices fitting in 63 bits
)
// randString generates random string.
func randString() string {
maxLength := int32(50)
length := rand.Int31n(maxLength)
b := make([]byte, length+1)
// A rand.Int63() generates 63 random bits, enough for letterIdxMax characters!
for i, cache, remain := length, rand.Int63(), letterIdxMax; i >= 0; {
if remain == 0 {
cache, remain = rand.Int63(), letterIdxMax
}
if idx := int(cache & letterIdxMask); idx < len(letterBytes) {
b[i] = letterBytes[idx]
i--
}
cache >>= letterIdxBits
remain--
}
return string(b)
}

View file

@ -350,30 +350,31 @@ func (s *Reader) Size() int64 {
return s.size return s.size
} }
// Chunk returns a chunk from a given reference.
func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) { func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
var ( var (
seq = int(ref >> 32) sgmSeq = int(ref >> 32)
off = int((ref << 32) >> 32) sgmOffset = int((ref << 32) >> 32)
) )
if seq >= len(s.bs) { if sgmSeq >= len(s.bs) {
return nil, errors.Errorf("reference sequence %d out of range", seq) return nil, errors.Errorf("reference sequence %d out of range", sgmSeq)
} }
b := s.bs[seq] chkS := s.bs[sgmSeq]
if off >= b.Len() { if sgmOffset >= chkS.Len() {
return nil, errors.Errorf("offset %d beyond data size %d", off, b.Len()) return nil, errors.Errorf("offset %d beyond data size %d", sgmOffset, chkS.Len())
} }
// With the minimum chunk length this should never cause us reading // With the minimum chunk length this should never cause us reading
// over the end of the slice. // over the end of the slice.
r := b.Range(off, off+binary.MaxVarintLen32) chk := chkS.Range(sgmOffset, sgmOffset+binary.MaxVarintLen32)
l, n := binary.Uvarint(r) chkLen, n := binary.Uvarint(chk)
if n <= 0 { if n <= 0 {
return nil, errors.Errorf("reading chunk length failed with %d", n) return nil, errors.Errorf("reading chunk length failed with %d", n)
} }
r = b.Range(off+n, off+n+int(l)) chk = chkS.Range(sgmOffset+n, sgmOffset+n+1+int(chkLen))
return s.pool.Get(chunkenc.Encoding(r[0]), r[1:1+l]) return s.pool.Get(chunkenc.Encoding(chk[0]), chk[1:1+chkLen])
} }
func nextSequenceFile(dir string) (string, int, error) { func nextSequenceFile(dir string) (string, int, error) {

View file

@ -413,6 +413,8 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
} }
func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) { func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) {
start := time.Now()
entropy := rand.New(rand.NewSource(time.Now().UnixNano())) entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
uid := ulid.MustNew(ulid.Now(), entropy) uid := ulid.MustNew(ulid.Now(), entropy)
@ -439,7 +441,13 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, p
return ulid.ULID{}, nil return ulid.ULID{}, nil
} }
level.Info(c.logger).Log("msg", "write block", "mint", meta.MinTime, "maxt", meta.MaxTime, "ulid", meta.ULID) level.Info(c.logger).Log(
"msg", "write block",
"mint", meta.MinTime,
"maxt", meta.MaxTime,
"ulid", meta.ULID,
"duration", time.Since(start),
)
return uid, nil return uid, nil
} }

2
db.go
View file

@ -202,7 +202,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
Help: "The time taken to recompact blocks to remove tombstones.", Help: "The time taken to recompact blocks to remove tombstones.",
}) })
m.blocksBytes = prometheus.NewGauge(prometheus.GaugeOpts{ m.blocksBytes = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_tsdb_storage_blocks_bytes_total", Name: "prometheus_tsdb_storage_blocks_bytes",
Help: "The number of bytes that are currently used for local storage by all blocks.", Help: "The number of bytes that are currently used for local storage by all blocks.",
}) })
m.sizeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{ m.sizeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{

View file

@ -34,6 +34,7 @@ import (
"github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil" "github.com/prometheus/tsdb/testutil"
"github.com/prometheus/tsdb/tsdbutil"
"github.com/prometheus/tsdb/wal" "github.com/prometheus/tsdb/wal"
) )
@ -87,7 +88,7 @@ func TestDB_reloadOrder(t *testing.T) {
{MinTime: 100, MaxTime: 110}, {MinTime: 100, MaxTime: 110},
} }
for _, m := range metas { for _, m := range metas {
createBlock(t, db.Dir(), 1, m.MinTime, m.MaxTime) createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime))
} }
testutil.Ok(t, db.reload()) testutil.Ok(t, db.reload())
@ -250,7 +251,7 @@ Outer:
res, err := q.Select(labels.NewEqualMatcher("a", "b")) res, err := q.Select(labels.NewEqualMatcher("a", "b"))
testutil.Ok(t, err) testutil.Ok(t, err)
expSamples := make([]Sample, 0, len(c.remaint)) expSamples := make([]tsdbutil.Sample, 0, len(c.remaint))
for _, ts := range c.remaint { for _, ts := range c.remaint {
expSamples = append(expSamples, sample{ts, smpls[ts]}) expSamples = append(expSamples, sample{ts, smpls[ts]})
} }
@ -477,7 +478,7 @@ Outer:
res, err := q.Select(labels.NewEqualMatcher("a", "b")) res, err := q.Select(labels.NewEqualMatcher("a", "b"))
testutil.Ok(t, err) testutil.Ok(t, err)
expSamples := make([]Sample, 0, len(c.remaint)) expSamples := make([]tsdbutil.Sample, 0, len(c.remaint))
for _, ts := range c.remaint { for _, ts := range c.remaint {
expSamples = append(expSamples, sample{ts, smpls[ts]}) expSamples = append(expSamples, sample{ts, smpls[ts]})
} }
@ -778,7 +779,7 @@ func TestTombstoneClean(t *testing.T) {
res, err := q.Select(labels.NewEqualMatcher("a", "b")) res, err := q.Select(labels.NewEqualMatcher("a", "b"))
testutil.Ok(t, err) testutil.Ok(t, err)
expSamples := make([]Sample, 0, len(c.remaint)) expSamples := make([]tsdbutil.Sample, 0, len(c.remaint))
for _, ts := range c.remaint { for _, ts := range c.remaint {
expSamples = append(expSamples, sample{ts, smpls[ts]}) expSamples = append(expSamples, sample{ts, smpls[ts]})
} }
@ -832,7 +833,7 @@ func TestTombstoneCleanFail(t *testing.T) {
// totalBlocks should be >=2 so we have enough blocks to trigger compaction failure. // totalBlocks should be >=2 so we have enough blocks to trigger compaction failure.
totalBlocks := 2 totalBlocks := 2
for i := 0; i < totalBlocks; i++ { for i := 0; i < totalBlocks; i++ {
blockDir := createBlock(t, db.Dir(), 1, 0, 0) blockDir := createBlock(t, db.Dir(), genSeries(1, 1, 0, 0))
block, err := OpenBlock(nil, blockDir, nil) block, err := OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err) testutil.Ok(t, err)
// Add some some fake tombstones to trigger the compaction. // Add some some fake tombstones to trigger the compaction.
@ -876,7 +877,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6
return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail") return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail")
} }
block, err := OpenBlock(nil, createBlock(c.t, dest, 1, 0, 0), nil) block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 0)), nil)
testutil.Ok(c.t, err) testutil.Ok(c.t, err)
testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere. testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere.
c.blocks = append(c.blocks, block) c.blocks = append(c.blocks, block)
@ -914,7 +915,7 @@ func TestTimeRetention(t *testing.T) {
} }
for _, m := range blocks { for _, m := range blocks {
createBlock(t, db.Dir(), 10, m.MinTime, m.MaxTime) createBlock(t, db.Dir(), genSeries(10, 10, m.MinTime, m.MaxTime))
} }
testutil.Ok(t, db.reload()) // Reload the db to register the new blocks. testutil.Ok(t, db.reload()) // Reload the db to register the new blocks.
@ -948,7 +949,7 @@ func TestSizeRetention(t *testing.T) {
} }
for _, m := range blocks { for _, m := range blocks {
createBlock(t, db.Dir(), 100, m.MinTime, m.MaxTime) createBlock(t, db.Dir(), genSeries(100, 10, m.MinTime, m.MaxTime))
} }
// Test that registered size matches the actual disk size. // Test that registered size matches the actual disk size.
@ -1315,7 +1316,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
createBlock(t, dir, 1, 1000, 2000) createBlock(t, dir, genSeries(1, 1, 1000, 2000))
db, err := Open(dir, nil, nil, nil) db, err := Open(dir, nil, nil, nil)
testutil.Ok(t, err) testutil.Ok(t, err)
@ -1328,7 +1329,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
createBlock(t, dir, 1, 1000, 6000) createBlock(t, dir, genSeries(1, 1, 1000, 6000))
testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777)) testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
w, err := wal.New(nil, nil, path.Join(dir, "wal")) w, err := wal.New(nil, nil, path.Join(dir, "wal"))
@ -1446,7 +1447,7 @@ func TestNoEmptyBlocks(t *testing.T) {
{MinTime: currentTime + 100, MaxTime: currentTime + 100 + db.opts.BlockRanges[0]}, {MinTime: currentTime + 100, MaxTime: currentTime + 100 + db.opts.BlockRanges[0]},
} }
for _, m := range blocks { for _, m := range blocks {
createBlock(t, db.Dir(), 2, m.MinTime, m.MaxTime) createBlock(t, db.Dir(), genSeries(2, 2, m.MinTime, m.MaxTime))
} }
oldBlocks := db.Blocks() oldBlocks := db.Blocks()
@ -1599,9 +1600,9 @@ func TestCorrectNumTombstones(t *testing.T) {
// TestBlockRanges checks the following use cases: // TestBlockRanges checks the following use cases:
// - No samples can be added with timestamps lower than the last block maxt. // - No samples can be added with timestamps lower than the last block maxt.
// - The compactor doesn't create overlaping blocks // - The compactor doesn't create overlapping blocks
// even when the last blocks is not within the default boundaries. // even when the last blocks is not within the default boundaries.
// - Lower bondary is based on the smallest sample in the head and // - Lower boundary is based on the smallest sample in the head and
// upper boundary is rounded to the configured block range. // upper boundary is rounded to the configured block range.
// //
// This ensures that a snapshot that includes the head and creates a block with a custom time range // This ensures that a snapshot that includes the head and creates a block with a custom time range
@ -1619,7 +1620,7 @@ func TestBlockRanges(t *testing.T) {
// Test that the compactor doesn't create overlapping blocks // Test that the compactor doesn't create overlapping blocks
// when a non standard block already exists. // when a non standard block already exists.
firstBlockMaxT := int64(3) firstBlockMaxT := int64(3)
createBlock(t, dir, 1, 0, firstBlockMaxT) createBlock(t, dir, genSeries(1, 1, 0, firstBlockMaxT))
db, err := Open(dir, logger, nil, DefaultOptions) db, err := Open(dir, logger, nil, DefaultOptions)
if err != nil { if err != nil {
t.Fatalf("Opening test storage failed: %s", err) t.Fatalf("Opening test storage failed: %s", err)
@ -1669,7 +1670,7 @@ func TestBlockRanges(t *testing.T) {
testutil.Ok(t, db.Close()) testutil.Ok(t, db.Close())
thirdBlockMaxt := secondBlockMaxt + 2 thirdBlockMaxt := secondBlockMaxt + 2
createBlock(t, dir, 1, secondBlockMaxt+1, thirdBlockMaxt) createBlock(t, dir, genSeries(1, 1, secondBlockMaxt+1, thirdBlockMaxt))
db, err = Open(dir, logger, nil, DefaultOptions) db, err = Open(dir, logger, nil, DefaultOptions)
if err != nil { if err != nil {

View file

@ -85,34 +85,40 @@ After the labels, the number of indexed chunks is encoded, followed by a sequenc
`mint` of the first chunk is stored, it's `maxt` is stored as a delta and the `mint` and `maxt` are encoded as deltas to the previous time for subsequent chunks. Similarly, the reference of the first chunk is stored and the next ref is stored as a delta to the previous one. `mint` of the first chunk is stored, it's `maxt` is stored as a delta and the `mint` and `maxt` are encoded as deltas to the previous time for subsequent chunks. Similarly, the reference of the first chunk is stored and the next ref is stored as a delta to the previous one.
``` ```
┌─────────────────────────────────────────────────────────────────────────┐ ┌─────────────────────────────────────────────────────────────────────────
│ len <uvarint> │ len <uvarint>
├─────────────────────────────────────────────────────────────────────────┤ ├──────────────────────────────────────────────────────────────────────────┤
│ ┌──────────────────┬──────────────────────────────────────────────────┐ │ │ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ │ ┌──────────────────────────────────────────┐ │ │ │ │ labels count <uvarint64> │ │
│ │ │ │ ref(l_i.name) <uvarint> │ │ │ │ ├──────────────────────────────────────────────────────────────────────┤ │
│ │ #labels │ ├──────────────────────────────────────────┤ ... │ │ │ │ ┌────────────────────────────────────────────┐ │ │
│ │ <uvarint> │ │ ref(l_i.value) <uvarint> │ │ │ │ │ │ ref(l_i.name) <uvarint32> │ │ │
│ │ │ └──────────────────────────────────────────┘ │ │ │ │ ├────────────────────────────────────────────┤ │ │
│ ├──────────────────┼──────────────────────────────────────────────────┤ │ │ │ │ ref(l_i.value) <uvarint32> │ │ │
│ │ │ ┌──────────────────────────────────────────┐ │ │ │ │ └────────────────────────────────────────────┘ │ │
│ │ │ │ c_0.mint <varint> │ │ │ │ │ ... │ │
│ │ │ ├──────────────────────────────────────────┤ │ │ │ ├──────────────────────────────────────────────────────────────────────┤ │
│ │ │ │ c_0.maxt - c_0.mint <uvarint> │ │ │ │ │ chunks count <uvarint64> │ │
│ │ │ ├──────────────────────────────────────────┤ │ │ │ ├──────────────────────────────────────────────────────────────────────┤ │
│ │ │ │ ref(c_0.data) <uvarint> │ │ │ │ │ ┌────────────────────────────────────────────┐ │ │
│ │ #chunks │ └──────────────────────────────────────────┘ │ │ │ │ │ c_0.mint <varint64> │ │ │
│ │ <uvarint> │ ┌──────────────────────────────────────────┐ │ │ │ │ ├────────────────────────────────────────────┤ │ │
│ │ │ │ c_i.mint - c_i-1.maxt <uvarint> │ │ │ │ │ │ c_0.maxt - c_0.mint <uvarint64> │ │ │
│ │ │ ├──────────────────────────────────────────┤ │ │ │ │ ├────────────────────────────────────────────┤ │ │
│ │ │ │ c_i.maxt - c_i.mint <uvarint> │ │ │ │ │ │ ref(c_0.data) <uvarint64> │ │ │
│ │ │ ├──────────────────────────────────────────┤ ... │ │ │ │ └────────────────────────────────────────────┘ │ │
│ │ │ │ ref(c_i.data) - ref(c_i-1.data) <varint> │ │ │ │ │ ┌────────────────────────────────────────────┐ │ │
│ │ │ └──────────────────────────────────────────┘ │ │ │ │ │ c_i.mint - c_i-1.maxt <uvarint64> │ │ │
│ └──────────────────┴──────────────────────────────────────────────────┘ │ │ │ ├────────────────────────────────────────────┤ │ │
├─────────────────────────────────────────────────────────────────────────┤ │ │ │ c_i.maxt - c_i.mint <uvarint64> │ │ │
│ │ ├────────────────────────────────────────────┤ │ │
│ │ │ ref(c_i.data) - ref(c_i-1.data) <varint64> │ │ │
│ │ └────────────────────────────────────────────┘ │ │
│ │ ... │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
├──────────────────────────────────────────────────────────────────────────┤
│ CRC32 <4b> │ CRC32 <4b>
└─────────────────────────────────────────────────────────────────────────┘ └─────────────────────────────────────────────────────────────────────────
``` ```
@ -176,24 +182,24 @@ The sequence of postings sections is finalized by an [offset table](#offset-tabl
An offset table stores a sequence of entries that maps a list of strings to an offset. They are used to track label index and postings sections. They are read into memory when an index file is loaded. An offset table stores a sequence of entries that maps a list of strings to an offset. They are used to track label index and postings sections. They are read into memory when an index file is loaded.
``` ```
┌─────────────────────┬────────────────────┐ ┌─────────────────────┬──────────────────────
│ len <4b>#entries <4b> │ len <4b>#entries <4b>
├─────────────────────┴────────────────────┤ ├─────────────────────┴──────────────────────
│ ┌──────────────────────────────────────┐ │ │ ┌────────────────────────────────────────┐ │
│ │ n = #strs <uvarint> │ │ │ │ n = #strs <uvarint> │ │
│ ├──────────────────────┬───────────────┤ │ │ ├──────────────────────┬─────────────────┤ │
│ │ len(str_1) <uvarint> │ str_1 <bytes> │ │ │ │ len(str_1) <uvarint> │ str_1 <bytes> │ │
│ ├──────────────────────┴───────────────┤ │ │ ├──────────────────────┴─────────────────┤ │
│ │ ... │ │ │ │ ... │ │
│ ├──────────────────────┬───────────────┤ │ │ ├──────────────────────┬─────────────────┤ │
│ │ len(str_n) <uvarint> │ str_n <bytes> │ │ │ │ len(str_n) <uvarint> │ str_n <bytes> │ │
│ ├──────────────────────┴───────────────┤ │ │ ├──────────────────────┴─────────────────┤ │
│ │ offset <uvarint> │ │ │ │ offset <uvarint64> │ │
│ └──────────────────────────────────────┘ │ │ └────────────────────────────────────────┘ │
│ . . . │ │ . . . │
├──────────────────────────────────────────┤ ├────────────────────────────────────────────
│ CRC32 <4b> │ CRC32 <4b>
└──────────────────────────────────────────┘ └────────────────────────────────────────────
``` ```

View file

@ -25,7 +25,7 @@ The stones section is 0 padded to a multiple of 4 for fast scans.
# Tombstone # Tombstone
``` ```
┌─────────────┬───────────────┬──────────────┐ ┌─────────────────────────────────┬────────────────┐
│ref <varint> │ mint <varint> │ maxt <varint> │ref <uvarint64> │ mint <varint64> │ maxt <varint64>
└─────────────┴───────────────┴──────────────┘ └─────────────────────────────────┴────────────────┘
``` ```

View file

@ -28,6 +28,7 @@ import (
"github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil" "github.com/prometheus/tsdb/testutil"
"github.com/prometheus/tsdb/tsdbutil"
"github.com/prometheus/tsdb/wal" "github.com/prometheus/tsdb/wal"
) )
@ -352,7 +353,7 @@ Outer:
res, err := q.Select(labels.NewEqualMatcher("a", "b")) res, err := q.Select(labels.NewEqualMatcher("a", "b"))
testutil.Ok(t, err) testutil.Ok(t, err)
expSamples := make([]Sample, 0, len(c.remaint)) expSamples := make([]tsdbutil.Sample, 0, len(c.remaint))
for _, ts := range c.remaint { for _, ts := range c.remaint {
expSamples = append(expSamples, sample{ts, smpls[ts]}) expSamples = append(expSamples, sample{ts, smpls[ts]})
} }
@ -472,9 +473,9 @@ func TestDelete_e2e(t *testing.T) {
{"job", "prom-k8s"}, {"job", "prom-k8s"},
}, },
} }
seriesMap := map[string][]Sample{} seriesMap := map[string][]tsdbutil.Sample{}
for _, l := range lbls { for _, l := range lbls {
seriesMap[labels.New(l...).String()] = []Sample{} seriesMap[labels.New(l...).String()] = []tsdbutil.Sample{}
} }
dir, _ := ioutil.TempDir("", "test") dir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
@ -484,7 +485,7 @@ func TestDelete_e2e(t *testing.T) {
app := hb.Appender() app := hb.Appender()
for _, l := range lbls { for _, l := range lbls {
ls := labels.New(l...) ls := labels.New(l...)
series := []Sample{} series := []tsdbutil.Sample{}
ts := rand.Int63n(300) ts := rand.Int63n(300)
for i := 0; i < numDatapoints; i++ { for i := 0; i < numDatapoints; i++ {
v := rand.Float64() v := rand.Float64()
@ -603,8 +604,8 @@ func boundedSamples(full []sample, mint, maxt int64) []sample {
return full return full
} }
func deletedSamples(full []Sample, dranges Intervals) []Sample { func deletedSamples(full []tsdbutil.Sample, dranges Intervals) []tsdbutil.Sample {
ds := make([]Sample, 0, len(full)) ds := make([]tsdbutil.Sample, 0, len(full))
Outer: Outer:
for _, s := range full { for _, s := range full {
for _, r := range dranges { for _, r := range dranges {

View file

@ -56,58 +56,6 @@ func newMockSeriesSet(list []Series) *mockSeriesSet {
} }
} }
type mockSeries struct {
labels func() labels.Labels
iterator func() SeriesIterator
}
type Sample = tsdbutil.Sample
func newSeries(l map[string]string, s []Sample) Series {
return &mockSeries{
labels: func() labels.Labels { return labels.FromMap(l) },
iterator: func() SeriesIterator { return newListSeriesIterator(s) },
}
}
func (m *mockSeries) Labels() labels.Labels { return m.labels() }
func (m *mockSeries) Iterator() SeriesIterator { return m.iterator() }
type listSeriesIterator struct {
list []Sample
idx int
}
func newListSeriesIterator(list []Sample) *listSeriesIterator {
return &listSeriesIterator{list: list, idx: -1}
}
func (it *listSeriesIterator) At() (int64, float64) {
s := it.list[it.idx]
return s.T(), s.V()
}
func (it *listSeriesIterator) Next() bool {
it.idx++
return it.idx < len(it.list)
}
func (it *listSeriesIterator) Seek(t int64) bool {
if it.idx == -1 {
it.idx = 0
}
// Do binary search between current position and end.
it.idx = sort.Search(len(it.list)-it.idx, func(i int) bool {
s := it.list[i+it.idx]
return s.T() >= t
})
return it.idx < len(it.list)
}
func (it *listSeriesIterator) Err() error {
return nil
}
func TestMergedSeriesSet(t *testing.T) { func TestMergedSeriesSet(t *testing.T) {
cases := []struct { cases := []struct {
@ -122,32 +70,32 @@ func TestMergedSeriesSet(t *testing.T) {
a: newMockSeriesSet([]Series{ a: newMockSeriesSet([]Series{
newSeries(map[string]string{ newSeries(map[string]string{
"a": "a", "a": "a",
}, []Sample{ }, []tsdbutil.Sample{
sample{t: 1, v: 1}, sample{t: 1, v: 1},
}), }),
}), }),
b: newMockSeriesSet([]Series{ b: newMockSeriesSet([]Series{
newSeries(map[string]string{ newSeries(map[string]string{
"a": "a", "a": "a",
}, []Sample{ }, []tsdbutil.Sample{
sample{t: 2, v: 2}, sample{t: 2, v: 2},
}), }),
newSeries(map[string]string{ newSeries(map[string]string{
"b": "b", "b": "b",
}, []Sample{ }, []tsdbutil.Sample{
sample{t: 1, v: 1}, sample{t: 1, v: 1},
}), }),
}), }),
exp: newMockSeriesSet([]Series{ exp: newMockSeriesSet([]Series{
newSeries(map[string]string{ newSeries(map[string]string{
"a": "a", "a": "a",
}, []Sample{ }, []tsdbutil.Sample{
sample{t: 1, v: 1}, sample{t: 1, v: 1},
sample{t: 2, v: 2}, sample{t: 2, v: 2},
}), }),
newSeries(map[string]string{ newSeries(map[string]string{
"b": "b", "b": "b",
}, []Sample{ }, []tsdbutil.Sample{
sample{t: 1, v: 1}, sample{t: 1, v: 1},
}), }),
}), }),
@ -157,13 +105,13 @@ func TestMergedSeriesSet(t *testing.T) {
newSeries(map[string]string{ newSeries(map[string]string{
"handler": "prometheus", "handler": "prometheus",
"instance": "127.0.0.1:9090", "instance": "127.0.0.1:9090",
}, []Sample{ }, []tsdbutil.Sample{
sample{t: 1, v: 1}, sample{t: 1, v: 1},
}), }),
newSeries(map[string]string{ newSeries(map[string]string{
"handler": "prometheus", "handler": "prometheus",
"instance": "localhost:9090", "instance": "localhost:9090",
}, []Sample{ }, []tsdbutil.Sample{
sample{t: 1, v: 2}, sample{t: 1, v: 2},
}), }),
}), }),
@ -171,13 +119,13 @@ func TestMergedSeriesSet(t *testing.T) {
newSeries(map[string]string{ newSeries(map[string]string{
"handler": "prometheus", "handler": "prometheus",
"instance": "127.0.0.1:9090", "instance": "127.0.0.1:9090",
}, []Sample{ }, []tsdbutil.Sample{
sample{t: 2, v: 1}, sample{t: 2, v: 1},
}), }),
newSeries(map[string]string{ newSeries(map[string]string{
"handler": "query", "handler": "query",
"instance": "localhost:9090", "instance": "localhost:9090",
}, []Sample{ }, []tsdbutil.Sample{
sample{t: 2, v: 2}, sample{t: 2, v: 2},
}), }),
}), }),
@ -185,20 +133,20 @@ func TestMergedSeriesSet(t *testing.T) {
newSeries(map[string]string{ newSeries(map[string]string{
"handler": "prometheus", "handler": "prometheus",
"instance": "127.0.0.1:9090", "instance": "127.0.0.1:9090",
}, []Sample{ }, []tsdbutil.Sample{
sample{t: 1, v: 1}, sample{t: 1, v: 1},
sample{t: 2, v: 1}, sample{t: 2, v: 1},
}), }),
newSeries(map[string]string{ newSeries(map[string]string{
"handler": "prometheus", "handler": "prometheus",
"instance": "localhost:9090", "instance": "localhost:9090",
}, []Sample{ }, []tsdbutil.Sample{
sample{t: 1, v: 2}, sample{t: 1, v: 2},
}), }),
newSeries(map[string]string{ newSeries(map[string]string{
"handler": "query", "handler": "query",
"instance": "localhost:9090", "instance": "localhost:9090",
}, []Sample{ }, []tsdbutil.Sample{
sample{t: 2, v: 2}, sample{t: 2, v: 2},
}), }),
}), }),
@ -304,7 +252,7 @@ func createIdxChkReaders(tc []seriesSamples) (IndexReader, ChunkReader) {
} }
func TestBlockQuerier(t *testing.T) { func TestBlockQuerier(t *testing.T) {
newSeries := func(l map[string]string, s []Sample) Series { newSeries := func(l map[string]string, s []tsdbutil.Sample) Series {
return &mockSeries{ return &mockSeries{
labels: func() labels.Labels { return labels.FromMap(l) }, labels: func() labels.Labels { return labels.FromMap(l) },
iterator: func() SeriesIterator { return newListSeriesIterator(s) }, iterator: func() SeriesIterator { return newListSeriesIterator(s) },
@ -392,13 +340,13 @@ func TestBlockQuerier(t *testing.T) {
newSeries(map[string]string{ newSeries(map[string]string{
"a": "a", "a": "a",
}, },
[]Sample{sample{2, 3}, sample{3, 4}, sample{5, 2}, sample{6, 3}}, []tsdbutil.Sample{sample{2, 3}, sample{3, 4}, sample{5, 2}, sample{6, 3}},
), ),
newSeries(map[string]string{ newSeries(map[string]string{
"a": "a", "a": "a",
"b": "b", "b": "b",
}, },
[]Sample{sample{2, 2}, sample{3, 3}, sample{5, 3}, sample{6, 6}}, []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{5, 3}, sample{6, 6}},
), ),
}), }),
}, },
@ -442,7 +390,7 @@ Outer:
} }
func TestBlockQuerierDelete(t *testing.T) { func TestBlockQuerierDelete(t *testing.T) {
newSeries := func(l map[string]string, s []Sample) Series { newSeries := func(l map[string]string, s []tsdbutil.Sample) Series {
return &mockSeries{ return &mockSeries{
labels: func() labels.Labels { return labels.FromMap(l) }, labels: func() labels.Labels { return labels.FromMap(l) },
iterator: func() SeriesIterator { return newListSeriesIterator(s) }, iterator: func() SeriesIterator { return newListSeriesIterator(s) },
@ -517,13 +465,13 @@ func TestBlockQuerierDelete(t *testing.T) {
newSeries(map[string]string{ newSeries(map[string]string{
"a": "a", "a": "a",
}, },
[]Sample{sample{5, 2}, sample{6, 3}, sample{7, 4}}, []tsdbutil.Sample{sample{5, 2}, sample{6, 3}, sample{7, 4}},
), ),
newSeries(map[string]string{ newSeries(map[string]string{
"a": "a", "a": "a",
"b": "b", "b": "b",
}, },
[]Sample{sample{4, 15}, sample{5, 3}}, []tsdbutil.Sample{sample{4, 15}, sample{5, 3}},
), ),
}), }),
}, },
@ -536,12 +484,12 @@ func TestBlockQuerierDelete(t *testing.T) {
"a": "a", "a": "a",
"b": "b", "b": "b",
}, },
[]Sample{sample{4, 15}, sample{5, 3}}, []tsdbutil.Sample{sample{4, 15}, sample{5, 3}},
), ),
newSeries(map[string]string{ newSeries(map[string]string{
"b": "b", "b": "b",
}, },
[]Sample{sample{2, 2}, sample{3, 6}, sample{5, 1}}, []tsdbutil.Sample{sample{2, 2}, sample{3, 6}, sample{5, 1}},
), ),
}), }),
}, },
@ -554,7 +502,7 @@ func TestBlockQuerierDelete(t *testing.T) {
"a": "a", "a": "a",
"b": "b", "b": "b",
}, },
[]Sample{sample{4, 15}}, []tsdbutil.Sample{sample{4, 15}},
), ),
}), }),
}, },
@ -709,66 +657,66 @@ func (s itSeries) Labels() labels.Labels { return labels.Labels{} }
func TestSeriesIterator(t *testing.T) { func TestSeriesIterator(t *testing.T) {
itcases := []struct { itcases := []struct {
a, b, c []Sample a, b, c []tsdbutil.Sample
exp []Sample exp []tsdbutil.Sample
mint, maxt int64 mint, maxt int64
}{ }{
{ {
a: []Sample{}, a: []tsdbutil.Sample{},
b: []Sample{}, b: []tsdbutil.Sample{},
c: []Sample{}, c: []tsdbutil.Sample{},
exp: []Sample{}, exp: []tsdbutil.Sample{},
mint: math.MinInt64, mint: math.MinInt64,
maxt: math.MaxInt64, maxt: math.MaxInt64,
}, },
{ {
a: []Sample{ a: []tsdbutil.Sample{
sample{1, 2}, sample{1, 2},
sample{2, 3}, sample{2, 3},
sample{3, 5}, sample{3, 5},
sample{6, 1}, sample{6, 1},
}, },
b: []Sample{}, b: []tsdbutil.Sample{},
c: []Sample{ c: []tsdbutil.Sample{
sample{7, 89}, sample{9, 8}, sample{7, 89}, sample{9, 8},
}, },
exp: []Sample{ exp: []tsdbutil.Sample{
sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8}, sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8},
}, },
mint: math.MinInt64, mint: math.MinInt64,
maxt: math.MaxInt64, maxt: math.MaxInt64,
}, },
{ {
a: []Sample{}, a: []tsdbutil.Sample{},
b: []Sample{ b: []tsdbutil.Sample{
sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1},
}, },
c: []Sample{ c: []tsdbutil.Sample{
sample{7, 89}, sample{9, 8}, sample{7, 89}, sample{9, 8},
}, },
exp: []Sample{ exp: []tsdbutil.Sample{
sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8}, sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8},
}, },
mint: 2, mint: 2,
maxt: 8, maxt: 8,
}, },
{ {
a: []Sample{ a: []tsdbutil.Sample{
sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1},
}, },
b: []Sample{ b: []tsdbutil.Sample{
sample{7, 89}, sample{9, 8}, sample{7, 89}, sample{9, 8},
}, },
c: []Sample{ c: []tsdbutil.Sample{
sample{10, 22}, sample{203, 3493}, sample{10, 22}, sample{203, 3493},
}, },
exp: []Sample{ exp: []tsdbutil.Sample{
sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8}, sample{10, 22}, sample{203, 3493}, sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8}, sample{10, 22}, sample{203, 3493},
}, },
mint: 6, mint: 6,
@ -777,29 +725,29 @@ func TestSeriesIterator(t *testing.T) {
} }
seekcases := []struct { seekcases := []struct {
a, b, c []Sample a, b, c []tsdbutil.Sample
seek int64 seek int64
success bool success bool
exp []Sample exp []tsdbutil.Sample
mint, maxt int64 mint, maxt int64
}{ }{
{ {
a: []Sample{}, a: []tsdbutil.Sample{},
b: []Sample{}, b: []tsdbutil.Sample{},
c: []Sample{}, c: []tsdbutil.Sample{},
seek: 0, seek: 0,
success: false, success: false,
exp: nil, exp: nil,
}, },
{ {
a: []Sample{ a: []tsdbutil.Sample{
sample{2, 3}, sample{2, 3},
}, },
b: []Sample{}, b: []tsdbutil.Sample{},
c: []Sample{ c: []tsdbutil.Sample{
sample{7, 89}, sample{9, 8}, sample{7, 89}, sample{9, 8},
}, },
@ -810,55 +758,55 @@ func TestSeriesIterator(t *testing.T) {
maxt: math.MaxInt64, maxt: math.MaxInt64,
}, },
{ {
a: []Sample{}, a: []tsdbutil.Sample{},
b: []Sample{ b: []tsdbutil.Sample{
sample{1, 2}, sample{3, 5}, sample{6, 1}, sample{1, 2}, sample{3, 5}, sample{6, 1},
}, },
c: []Sample{ c: []tsdbutil.Sample{
sample{7, 89}, sample{9, 8}, sample{7, 89}, sample{9, 8},
}, },
seek: 2, seek: 2,
success: true, success: true,
exp: []Sample{ exp: []tsdbutil.Sample{
sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8}, sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8},
}, },
mint: 5, mint: 5,
maxt: 8, maxt: 8,
}, },
{ {
a: []Sample{ a: []tsdbutil.Sample{
sample{6, 1}, sample{6, 1},
}, },
b: []Sample{ b: []tsdbutil.Sample{
sample{9, 8}, sample{9, 8},
}, },
c: []Sample{ c: []tsdbutil.Sample{
sample{10, 22}, sample{203, 3493}, sample{10, 22}, sample{203, 3493},
}, },
seek: 10, seek: 10,
success: true, success: true,
exp: []Sample{ exp: []tsdbutil.Sample{
sample{10, 22}, sample{203, 3493}, sample{10, 22}, sample{203, 3493},
}, },
mint: 10, mint: 10,
maxt: 203, maxt: 203,
}, },
{ {
a: []Sample{ a: []tsdbutil.Sample{
sample{6, 1}, sample{6, 1},
}, },
b: []Sample{ b: []tsdbutil.Sample{
sample{9, 8}, sample{9, 8},
}, },
c: []Sample{ c: []tsdbutil.Sample{
sample{10, 22}, sample{203, 3493}, sample{10, 22}, sample{203, 3493},
}, },
seek: 203, seek: 203,
success: true, success: true,
exp: []Sample{ exp: []tsdbutil.Sample{
sample{203, 3493}, sample{203, 3493},
}, },
mint: 7, mint: 7,
@ -875,10 +823,10 @@ func TestSeriesIterator(t *testing.T) {
} }
res := newChunkSeriesIterator(chkMetas, nil, tc.mint, tc.maxt) res := newChunkSeriesIterator(chkMetas, nil, tc.mint, tc.maxt)
smplValid := make([]Sample, 0) smplValid := make([]tsdbutil.Sample, 0)
for _, s := range tc.exp { for _, s := range tc.exp {
if s.T() >= tc.mint && s.T() <= tc.maxt { if s.T() >= tc.mint && s.T() <= tc.maxt {
smplValid = append(smplValid, Sample(s)) smplValid = append(smplValid, tsdbutil.Sample(s))
} }
} }
exp := newListSeriesIterator(smplValid) exp := newListSeriesIterator(smplValid)
@ -892,22 +840,22 @@ func TestSeriesIterator(t *testing.T) {
t.Run("Seek", func(t *testing.T) { t.Run("Seek", func(t *testing.T) {
extra := []struct { extra := []struct {
a, b, c []Sample a, b, c []tsdbutil.Sample
seek int64 seek int64
success bool success bool
exp []Sample exp []tsdbutil.Sample
mint, maxt int64 mint, maxt int64
}{ }{
{ {
a: []Sample{ a: []tsdbutil.Sample{
sample{6, 1}, sample{6, 1},
}, },
b: []Sample{ b: []tsdbutil.Sample{
sample{9, 8}, sample{9, 8},
}, },
c: []Sample{ c: []tsdbutil.Sample{
sample{10, 22}, sample{203, 3493}, sample{10, 22}, sample{203, 3493},
}, },
@ -918,19 +866,19 @@ func TestSeriesIterator(t *testing.T) {
maxt: 202, maxt: 202,
}, },
{ {
a: []Sample{ a: []tsdbutil.Sample{
sample{6, 1}, sample{6, 1},
}, },
b: []Sample{ b: []tsdbutil.Sample{
sample{9, 8}, sample{9, 8},
}, },
c: []Sample{ c: []tsdbutil.Sample{
sample{10, 22}, sample{203, 3493}, sample{10, 22}, sample{203, 3493},
}, },
seek: 5, seek: 5,
success: true, success: true,
exp: []Sample{sample{10, 22}}, exp: []tsdbutil.Sample{sample{10, 22}},
mint: 10, mint: 10,
maxt: 202, maxt: 202,
}, },
@ -946,10 +894,10 @@ func TestSeriesIterator(t *testing.T) {
} }
res := newChunkSeriesIterator(chkMetas, nil, tc.mint, tc.maxt) res := newChunkSeriesIterator(chkMetas, nil, tc.mint, tc.maxt)
smplValid := make([]Sample, 0) smplValid := make([]tsdbutil.Sample, 0)
for _, s := range tc.exp { for _, s := range tc.exp {
if s.T() >= tc.mint && s.T() <= tc.maxt { if s.T() >= tc.mint && s.T() <= tc.maxt {
smplValid = append(smplValid, Sample(s)) smplValid = append(smplValid, tsdbutil.Sample(s))
} }
} }
exp := newListSeriesIterator(smplValid) exp := newListSeriesIterator(smplValid)
@ -982,7 +930,7 @@ func TestSeriesIterator(t *testing.T) {
itSeries{newListSeriesIterator(tc.c)} itSeries{newListSeriesIterator(tc.c)}
res := newChainedSeriesIterator(a, b, c) res := newChainedSeriesIterator(a, b, c)
exp := newListSeriesIterator([]Sample(tc.exp)) exp := newListSeriesIterator([]tsdbutil.Sample(tc.exp))
smplExp, errExp := expandSeriesIterator(exp) smplExp, errExp := expandSeriesIterator(exp)
smplRes, errRes := expandSeriesIterator(res) smplRes, errRes := expandSeriesIterator(res)
@ -1025,9 +973,9 @@ func TestSeriesIterator(t *testing.T) {
// Regression for: https://github.com/prometheus/tsdb/pull/97 // Regression for: https://github.com/prometheus/tsdb/pull/97
func TestChunkSeriesIterator_DoubleSeek(t *testing.T) { func TestChunkSeriesIterator_DoubleSeek(t *testing.T) {
chkMetas := []chunks.Meta{ chkMetas := []chunks.Meta{
tsdbutil.ChunkFromSamples([]Sample{}), tsdbutil.ChunkFromSamples([]tsdbutil.Sample{}),
tsdbutil.ChunkFromSamples([]Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}), tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}),
tsdbutil.ChunkFromSamples([]Sample{sample{4, 4}, sample{5, 5}}), tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{4, 4}, sample{5, 5}}),
} }
res := newChunkSeriesIterator(chkMetas, nil, 2, 8) res := newChunkSeriesIterator(chkMetas, nil, 2, 8)
@ -1042,9 +990,9 @@ func TestChunkSeriesIterator_DoubleSeek(t *testing.T) {
// skipped to the end when seeking a value in the current chunk. // skipped to the end when seeking a value in the current chunk.
func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) { func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) {
metas := []chunks.Meta{ metas := []chunks.Meta{
tsdbutil.ChunkFromSamples([]Sample{}), tsdbutil.ChunkFromSamples([]tsdbutil.Sample{}),
tsdbutil.ChunkFromSamples([]Sample{sample{1, 2}, sample{3, 4}, sample{5, 6}, sample{7, 8}}), tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 2}, sample{3, 4}, sample{5, 6}, sample{7, 8}}),
tsdbutil.ChunkFromSamples([]Sample{}), tsdbutil.ChunkFromSamples([]tsdbutil.Sample{}),
} }
it := newChunkSeriesIterator(metas, nil, 1, 7) it := newChunkSeriesIterator(metas, nil, 1, 7)
@ -1064,7 +1012,7 @@ func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) {
// Seek gets called and advances beyond the max time, which was just accepted as a valid sample. // Seek gets called and advances beyond the max time, which was just accepted as a valid sample.
func TestChunkSeriesIterator_NextWithMinTime(t *testing.T) { func TestChunkSeriesIterator_NextWithMinTime(t *testing.T) {
metas := []chunks.Meta{ metas := []chunks.Meta{
tsdbutil.ChunkFromSamples([]Sample{sample{1, 6}, sample{5, 6}, sample{7, 8}}), tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 6}, sample{5, 6}, sample{7, 8}}),
} }
it := newChunkSeriesIterator(metas, nil, 2, 4) it := newChunkSeriesIterator(metas, nil, 2, 4)
@ -1211,7 +1159,7 @@ func BenchmarkPersistedQueries(b *testing.B) {
dir, err := ioutil.TempDir("", "bench_persisted") dir, err := ioutil.TempDir("", "bench_persisted")
testutil.Ok(b, err) testutil.Ok(b, err)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
block, err := OpenBlock(nil, createBlock(b, dir, nSeries, 1, int64(nSamples)), nil) block, err := OpenBlock(nil, createBlock(b, dir, genSeries(nSeries, 10, 1, int64(nSamples))), nil)
testutil.Ok(b, err) testutil.Ok(b, err)
defer block.Close() defer block.Close()
@ -1441,3 +1389,53 @@ func (m mockIndex) LabelNames() ([]string, error) {
sort.Strings(labelNames) sort.Strings(labelNames)
return labelNames, nil return labelNames, nil
} }
type mockSeries struct {
labels func() labels.Labels
iterator func() SeriesIterator
}
func newSeries(l map[string]string, s []tsdbutil.Sample) Series {
return &mockSeries{
labels: func() labels.Labels { return labels.FromMap(l) },
iterator: func() SeriesIterator { return newListSeriesIterator(s) },
}
}
func (m *mockSeries) Labels() labels.Labels { return m.labels() }
func (m *mockSeries) Iterator() SeriesIterator { return m.iterator() }
type listSeriesIterator struct {
list []tsdbutil.Sample
idx int
}
func newListSeriesIterator(list []tsdbutil.Sample) *listSeriesIterator {
return &listSeriesIterator{list: list, idx: -1}
}
func (it *listSeriesIterator) At() (int64, float64) {
s := it.list[it.idx]
return s.T(), s.V()
}
func (it *listSeriesIterator) Next() bool {
it.idx++
return it.idx < len(it.list)
}
func (it *listSeriesIterator) Seek(t int64) bool {
if it.idx == -1 {
it.idx = 0
}
// Do binary search between current position and end.
it.idx = sort.Search(len(it.list)-it.idx, func(i int) bool {
s := it.list[i+it.idx]
return s.T() >= t
})
return it.idx < len(it.list)
}
func (it *listSeriesIterator) Err() error {
return nil
}