Mirror of https://github.com/prometheus/prometheus.git
Commit 3a5ae6b1a4
Documentation/format/tombstones.md (new file, 31 lines)

@@ -0,0 +1,31 @@
+# Tombstones Disk Format
+
+The following describes the format of a tombstones file, which is placed
+in the top-level directory of a block.
+
+Tombstone entries follow a 5-byte header (magic number and version), and
+the file ends with a CRC32 (Castagnoli) checksum over the entries.
+
+```
+┌────────────────────────────┬─────────────────────┐
+│ magic(0x130BA30) <4b>      │ version(1) <1 byte> │
+├────────────────────────────┴─────────────────────┤
+│ ┌──────────────────────────────────────────────┐ │
+│ │                 Tombstone 1                  │ │
+│ ├──────────────────────────────────────────────┤ │
+│ │                     ...                      │ │
+│ ├──────────────────────────────────────────────┤ │
+│ │                 Tombstone N                  │ │
+│ ├──────────────────────────────────────────────┤ │
+│ │                   CRC<4b>                    │ │
+│ └──────────────────────────────────────────────┘ │
+└──────────────────────────────────────────────────┘
+```
+
+# Tombstone
+
+```
+┌─────────────┬───────────────┬──────────────┐
+│ref <varint> │ mint <varint> │ maxt <varint>│
+└─────────────┴───────────────┴──────────────┘
+```
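
For orientation, here is a minimal standalone sketch of encoding and decoding one such triple with Go's standard varint helpers; the header and CRC handling are elided, and the helper names are illustrative rather than part of the package:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// decodeTombstone reads one (ref, mint, maxt) triple laid out as in the
// diagram above and reports how many bytes were consumed.
func decodeTombstone(b []byte) (uint64, int64, int64, int) {
	ref, n1 := binary.Uvarint(b)
	mint, n2 := binary.Varint(b[n1:])
	maxt, n3 := binary.Varint(b[n1+n2:])
	return ref, mint, maxt, n1 + n2 + n3
}

func main() {
	// Encode one entry the same way writeTombstoneFile (further down in
	// this commit) does: uvarint ref, then varint mint and maxt.
	buf := make([]byte, 3*binary.MaxVarintLen64)
	n := binary.PutUvarint(buf, 42)      // series reference
	n += binary.PutVarint(buf[n:], 1000) // mint
	n += binary.PutVarint(buf[n:], 2000) // maxt

	ref, mint, maxt, _ := decodeTombstone(buf[:n])
	fmt.Println(ref, mint, maxt) // 42 1000 2000
}
```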

block.go (104 changed lines)
@@ -21,6 +21,7 @@ import (

 	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
+	"github.com/prometheus/tsdb/labels"
 )

 // DiskBlock handles reads against a Block of time series data.

@@ -37,6 +38,12 @@ type DiskBlock interface {
 	// Chunks returns a ChunkReader over the block's data.
 	Chunks() ChunkReader

+	// Tombstones returns a TombstoneReader over the block's deleted data.
+	Tombstones() TombstoneReader
+
+	// Delete deletes data from the block.
+	Delete(mint, maxt int64, ms ...labels.Matcher) error
+
 	// Close releases all underlying resources of the block.
 	Close() error
 }

@@ -79,9 +86,10 @@ type BlockMeta struct {

 	// Stats about the contents of the block.
 	Stats struct {
-		NumSamples uint64 `json:"numSamples,omitempty"`
-		NumSeries  uint64 `json:"numSeries,omitempty"`
-		NumChunks  uint64 `json:"numChunks,omitempty"`
+		NumSamples    uint64 `json:"numSamples,omitempty"`
+		NumSeries     uint64 `json:"numSeries,omitempty"`
+		NumChunks     uint64 `json:"numChunks,omitempty"`
+		NumTombstones uint64 `json:"numTombstones,omitempty"`
 	} `json:"stats,omitempty"`

 	// Information on compactions the block was created from.

@@ -150,6 +158,8 @@ type persistedBlock struct {

 	chunkr *chunkReader
 	indexr *indexReader
+
+	tombstones tombstoneReader
 }

 func newPersistedBlock(dir string) (*persistedBlock, error) {

@@ -167,11 +177,17 @@ func newPersistedBlock(dir string) (*persistedBlock, error) {
 		return nil, err
 	}

+	tr, err := readTombstones(dir)
+	if err != nil {
+		return nil, err
+	}
+
 	pb := &persistedBlock{
-		dir:    dir,
-		meta:   *meta,
-		chunkr: cr,
-		indexr: ir,
+		dir:        dir,
+		meta:       *meta,
+		chunkr:     cr,
+		indexr:     ir,
+		tombstones: tr,
 	}
 	return pb, nil
 }

@@ -191,21 +207,85 @@ func (pb *persistedBlock) String() string {

 func (pb *persistedBlock) Querier(mint, maxt int64) Querier {
 	return &blockQuerier{
-		mint:   mint,
-		maxt:   maxt,
-		index:  pb.Index(),
-		chunks: pb.Chunks(),
+		mint:       mint,
+		maxt:       maxt,
+		index:      pb.Index(),
+		chunks:     pb.Chunks(),
+		tombstones: pb.Tombstones(),
 	}
 }

 func (pb *persistedBlock) Dir() string         { return pb.dir }
 func (pb *persistedBlock) Index() IndexReader  { return pb.indexr }
 func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr }
-func (pb *persistedBlock) Meta() BlockMeta     { return pb.meta }
+func (pb *persistedBlock) Tombstones() TombstoneReader {
+	return pb.tombstones
+}
+func (pb *persistedBlock) Meta() BlockMeta { return pb.meta }
+
+func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error {
+	pr := newPostingsReader(pb.indexr)
+	p, absent := pr.Select(ms...)
+
+	ir := pb.indexr
+
+	// Choose only valid postings which have chunks in the time-range.
+	stones := map[uint32]intervals{}
+
+Outer:
+	for p.Next() {
+		lset, chunks, err := ir.Series(p.At())
+		if err != nil {
+			return err
+		}
+
+		for _, abs := range absent {
+			if lset.Get(abs) != "" {
+				continue Outer
+			}
+		}
+
+		for _, chk := range chunks {
+			if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) {
+				// Delete only until the current values and not beyond.
+				tmin, tmax := clampInterval(mint, maxt, chunks[0].MinTime, chunks[len(chunks)-1].MaxTime)
+				stones[p.At()] = intervals{{tmin, tmax}}
+				continue Outer
+			}
+		}
+	}
+
+	if p.Err() != nil {
+		return p.Err()
+	}
+
+	// Merge the current and new tombstones.
+	for k, v := range stones {
+		pb.tombstones.add(k, v[0])
+	}
+
+	if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil {
+		return err
+	}
+
+	pb.meta.Stats.NumTombstones = uint64(len(pb.tombstones))
+	return writeMetaFile(pb.dir, &pb.meta)
+}

 func chunkDir(dir string) string { return filepath.Join(dir, "chunks") }
 func walDir(dir string) string   { return filepath.Join(dir, "wal") }

+func clampInterval(a, b, mint, maxt int64) (int64, int64) {
+	if a < mint {
+		a = mint
+	}
+	if b > maxt {
+		b = maxt
+	}
+
+	return a, b
+}
+
 type mmapFile struct {
 	f *os.File
 	b []byte
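
Note how Delete clamps the requested range to the series' own data bounds, so a tombstone never extends past the first and last existing samples. A tiny standalone sketch of that clamping, mirroring clampInterval above:

```go
package main

import "fmt"

// Same logic as clampInterval in block.go: narrow the requested deletion
// range [a, b] to the series' actual data range [mint, maxt].
func clampInterval(a, b, mint, maxt int64) (int64, int64) {
	if a < mint {
		a = mint
	}
	if b > maxt {
		b = maxt
	}
	return a, b
}

func main() {
	// A delete over [0, 10000] against a series holding data in [120, 480]
	// produces a tombstone covering only [120, 480].
	tmin, tmax := clampInterval(0, 10000, 120, 480)
	fmt.Println(tmin, tmax) // 120 480
}
```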
block_test.go (new file, 14 lines)

@@ -0,0 +1,14 @@
+// Copyright 2017 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tsdb

chunks.go (40 changed lines)
@@ -54,6 +54,46 @@ func (cm *ChunkMeta) writeHash(h hash.Hash) error {
 	return nil
 }

+// deletedIterator wraps an Iterator and makes sure any deleted metrics are not
+// returned.
+type deletedIterator struct {
+	it chunks.Iterator
+
+	intervals intervals
+}
+
+func (it *deletedIterator) At() (int64, float64) {
+	return it.it.At()
+}
+
+func (it *deletedIterator) Next() bool {
+Outer:
+	for it.it.Next() {
+		ts, _ := it.it.At()
+
+		for _, tr := range it.intervals {
+			if tr.inBounds(ts) {
+				continue Outer
+			}
+
+			if ts > tr.maxt {
+				it.intervals = it.intervals[1:]
+				continue
+			}
+
+			return true
+		}
+
+		return true
+	}
+
+	return false
+}
+
+func (it *deletedIterator) Err() error {
+	return it.it.Err()
+}
+
 // ChunkWriter serializes a time block of chunked series data.
 type ChunkWriter interface {
 	// WriteChunks writes several chunks. The Chunk field of the ChunkMetas
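
A sketch of how deletedIterator is meant to be used; since the type is unexported it would have to live inside package tsdb, and TestDeletedIterator in chunks_test.go below exercises the same pattern against random data:

```go
package tsdb

import (
	"fmt"

	"github.com/prometheus/tsdb/chunks"
)

// exampleDeletedIterator appends samples at timestamps 0..9, then wraps
// the chunk's iterator so everything inside the deleted interval [2, 5]
// is skipped.
func exampleDeletedIterator() {
	chk := chunks.NewXORChunk()
	app, _ := chk.Appender()
	for ts := int64(0); ts < 10; ts++ {
		app.Append(ts, float64(ts))
	}

	it := &deletedIterator{it: chk.Iterator(), intervals: intervals{{2, 5}}}
	for it.Next() {
		ts, _ := it.At()
		fmt.Print(ts, " ") // 0 1 6 7 8 9
	}
}
```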

chunks_test.go

@@ -14,8 +14,12 @@
 package tsdb

 import (
+	"math/rand"
 	"testing"
+
+	"github.com/pkg/errors"
 	"github.com/prometheus/tsdb/chunks"
+	"github.com/stretchr/testify/require"
 )

 type mockChunkReader map[uint64]chunks.Chunk
@@ -32,3 +36,63 @@ func (cr mockChunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
 func (cr mockChunkReader) Close() error {
 	return nil
 }
+
+func TestDeletedIterator(t *testing.T) {
+	chk := chunks.NewXORChunk()
+	app, err := chk.Appender()
+	require.NoError(t, err)
+	// Insert random stuff from (0, 1000).
+	act := make([]sample, 1000)
+	for i := 0; i < 1000; i++ {
+		act[i].t = int64(i)
+		act[i].v = rand.Float64()
+		app.Append(act[i].t, act[i].v)
+	}
+
+	cases := []struct {
+		r intervals
+	}{
+		{r: intervals{{1, 20}}},
+		{r: intervals{{1, 10}, {12, 20}, {21, 23}, {25, 30}}},
+		{r: intervals{{1, 10}, {12, 20}, {20, 30}}},
+		{r: intervals{{1, 10}, {12, 23}, {25, 30}}},
+		{r: intervals{{1, 23}, {12, 20}, {25, 30}}},
+		{r: intervals{{1, 23}, {12, 20}, {25, 3000}}},
+		{r: intervals{{0, 2000}}},
+		{r: intervals{{500, 2000}}},
+		{r: intervals{{0, 200}}},
+		{r: intervals{{1000, 20000}}},
+	}
+
+	for _, c := range cases {
+		i := int64(-1)
+		it := &deletedIterator{it: chk.Iterator(), intervals: c.r[:]}
+		ranges := c.r[:]
+		for it.Next() {
+			i++
+			for _, tr := range ranges {
+				if tr.inBounds(i) {
+					i = tr.maxt + 1
+					ranges = ranges[1:]
+				}
+			}
+
+			require.True(t, i < 1000)
+
+			ts, v := it.At()
+			require.Equal(t, act[i].t, ts)
+			require.Equal(t, act[i].v, v)
+		}
+		// There has been an extra call to Next().
+		i++
+		for _, tr := range ranges {
+			if tr.inBounds(i) {
+				i = tr.maxt + 1
+				ranges = ranges[1:]
+			}
+		}
+
+		require.False(t, i < 1000)
+		require.NoError(t, it.Err())
+	}
+}

compact.go (109 changed lines)
@@ -26,6 +26,7 @@ import (
 	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/tsdb/chunks"
 	"github.com/prometheus/tsdb/labels"
 )

@@ -262,6 +263,11 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
 		return errors.Wrap(err, "close index writer")
 	}

+	// Create an empty tombstones file.
+	if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil {
+		return errors.Wrap(err, "write new tombstones file")
+	}
+
 	// Block successfully written, make visible and remove old ones.
 	if err := renameFile(tmp, dir); err != nil {
 		return errors.Wrap(err, "rename block dir")

@@ -293,7 +299,7 @@ func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWri
 		if err != nil {
 			return nil, err
 		}
-		s := newCompactionSeriesSet(b.Index(), b.Chunks(), all)
+		s := newCompactionSeriesSet(b.Index(), b.Chunks(), b.Tombstones(), all)

 		if i == 0 {
 			set = s

@@ -314,14 +320,36 @@ func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWri
 	)

 	for set.Next() {
-		lset, chunks := set.At()
-		if err := chunkw.WriteChunks(chunks...); err != nil {
+		lset, chks, dranges := set.At() // The chunks here are not fully deleted.
+
+		if len(dranges) > 0 {
+			// Re-encode the chunk to not have deleted values.
+			for _, chk := range chks {
+				if intervalOverlap(dranges[0].mint, dranges[len(dranges)-1].maxt, chk.MinTime, chk.MaxTime) {
+					newChunk := chunks.NewXORChunk()
+					app, err := newChunk.Appender()
+					if err != nil {
+						return nil, err
+					}
+
+					it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges}
+					for it.Next() {
+						ts, v := it.At()
+						app.Append(ts, v)
+					}
+
+					chk.Chunk = newChunk
+				}
+			}
+		}
+
+		if err := chunkw.WriteChunks(chks...); err != nil {
 			return nil, err
 		}

-		indexw.AddSeries(i, lset, chunks...)
+		indexw.AddSeries(i, lset, chks...)

-		meta.Stats.NumChunks += uint64(len(chunks))
+		meta.Stats.NumChunks += uint64(len(chks))
 		meta.Stats.NumSeries++

 		for _, l := range lset {

@@ -371,25 +399,28 @@ func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWri

 type compactionSet interface {
 	Next() bool
-	At() (labels.Labels, []*ChunkMeta)
+	At() (labels.Labels, []*ChunkMeta, intervals)
 	Err() error
 }

 type compactionSeriesSet struct {
-	p      Postings
-	index  IndexReader
-	chunks ChunkReader
+	p          Postings
+	index      IndexReader
+	chunks     ChunkReader
+	tombstones TombstoneReader

-	l   labels.Labels
-	c   []*ChunkMeta
-	err error
+	l         labels.Labels
+	c         []*ChunkMeta
+	intervals intervals
+	err       error
 }

-func newCompactionSeriesSet(i IndexReader, c ChunkReader, p Postings) *compactionSeriesSet {
+func newCompactionSeriesSet(i IndexReader, c ChunkReader, t TombstoneReader, p Postings) *compactionSeriesSet {
 	return &compactionSeriesSet{
-		index:  i,
-		chunks: c,
-		p:      p,
+		index:      i,
+		chunks:     c,
+		tombstones: t,
+		p:          p,
 	}
 }

@@ -398,10 +429,25 @@ func (c *compactionSeriesSet) Next() bool {
 		return false
 	}

+	c.intervals = c.tombstones.Get(c.p.At())
+
 	c.l, c.c, c.err = c.index.Series(c.p.At())
 	if c.err != nil {
 		return false
 	}

+	// Remove completely deleted chunks.
+	if len(c.intervals) > 0 {
+		chks := make([]*ChunkMeta, 0, len(c.c))
+		for _, chk := range c.c {
+			if !(interval{chk.MinTime, chk.MaxTime}.isSubrange(c.intervals)) {
+				chks = append(chks, chk)
+			}
+		}
+
+		c.c = chks
+	}
+
 	for _, chk := range c.c {
 		chk.Chunk, c.err = c.chunks.Chunk(chk.Ref)
 		if c.err != nil {

@@ -419,16 +465,17 @@ func (c *compactionSeriesSet) Err() error {
 	return c.p.Err()
 }

-func (c *compactionSeriesSet) At() (labels.Labels, []*ChunkMeta) {
-	return c.l, c.c
+func (c *compactionSeriesSet) At() (labels.Labels, []*ChunkMeta, intervals) {
+	return c.l, c.c, c.intervals
 }

 type compactionMerger struct {
 	a, b compactionSet

-	aok, bok bool
-	l        labels.Labels
-	c        []*ChunkMeta
+	aok, bok  bool
+	l         labels.Labels
+	c         []*ChunkMeta
+	intervals intervals
 }

 type compactionSeries struct {

@@ -456,8 +503,8 @@ func (c *compactionMerger) compare() int {
 	if !c.bok {
 		return -1
 	}
-	a, _ := c.a.At()
-	b, _ := c.b.At()
+	a, _, _ := c.a.At()
+	b, _, _ := c.b.At()
 	return labels.Compare(a, b)
 }

@@ -469,17 +516,21 @@ func (c *compactionMerger) Next() bool {
 	d := c.compare()
 	// Both sets contain the current series. Chain them into a single one.
 	if d > 0 {
-		c.l, c.c = c.b.At()
+		c.l, c.c, c.intervals = c.b.At()
 		c.bok = c.b.Next()
 	} else if d < 0 {
-		c.l, c.c = c.a.At()
+		c.l, c.c, c.intervals = c.a.At()
 		c.aok = c.a.Next()
 	} else {
-		l, ca := c.a.At()
-		_, cb := c.b.At()
+		l, ca, ra := c.a.At()
+		_, cb, rb := c.b.At()
+		for _, r := range rb {
+			ra = ra.add(r)
+		}

 		c.l = l
 		c.c = append(ca, cb...)
+		c.intervals = ra

 		c.aok = c.a.Next()
 		c.bok = c.b.Next()

@@ -494,8 +545,8 @@ func (c *compactionMerger) Err() error {
 	return c.b.Err()
 }

-func (c *compactionMerger) At() (labels.Labels, []*ChunkMeta) {
-	return c.l, c.c
+func (c *compactionMerger) At() (labels.Labels, []*ChunkMeta, intervals) {
+	return c.l, c.c, c.intervals
 }

 func renameFile(from, to string) error {
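
The populate change above is where tombstones become physical deletes: any chunk overlapping a deleted range is streamed through deletedIterator into a fresh XOR chunk. A condensed within-package sketch of that step, factored into a hypothetical helper for readability:

```go
package tsdb

import "github.com/prometheus/tsdb/chunks"

// reencodeChunk drops the samples of chk that fall into dranges by
// streaming the survivors into a new XOR chunk — the same pattern the
// populate loop in compact.go uses inline.
func reencodeChunk(chk *ChunkMeta, dranges intervals) error {
	newChunk := chunks.NewXORChunk()
	app, err := newChunk.Appender()
	if err != nil {
		return err
	}

	it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges}
	for it.Next() {
		ts, v := it.At()
		app.Append(ts, v)
	}

	chk.Chunk = newChunk
	return it.Err()
}
```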

db.go (40 changed lines)
@@ -119,6 +119,9 @@ type DB struct {
 	compactc chan struct{}
 	donec    chan struct{}
 	stopc    chan struct{}
+
+	// cmtx is used to control compactions and deletions.
+	cmtx sync.Mutex
 }

 type dbMetrics struct {

@@ -296,6 +299,9 @@ func (db *DB) retentionCutoff() (bool, error) {
 }

 func (db *DB) compact() (changes bool, err error) {
+	db.cmtx.Lock()
+	defer db.cmtx.Unlock()
+
 	db.headmtx.RLock()

 	// Check whether we have pending head blocks that are ready to be persisted.

@@ -461,6 +467,7 @@ func (db *DB) reloadBlocks() (err error) {
 	if err := validateBlockSequence(blocks); err != nil {
 		return errors.Wrap(err, "invalid block sequence")
 	}
+
 	// Close all opened blocks that no longer exist after we returned all locks.
 	for _, b := range db.blocks {
 		if _, ok := exist[b.Meta().ULID]; !ok {

@@ -707,6 +714,30 @@ func (a *dbAppender) Rollback() error {
 	return g.Wait()
 }

+// Delete implements deletion of metrics.
+func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
+	db.cmtx.Lock()
+	defer db.cmtx.Unlock()
+	db.mtx.Lock()
+	defer db.mtx.Unlock()
+
+	blocks := db.blocksForInterval(mint, maxt)
+
+	var g errgroup.Group
+
+	for _, b := range blocks {
+		g.Go(func(b Block) func() error {
+			return func() error { return b.Delete(mint, maxt, ms...) }
+		}(b))
+	}
+
+	if err := g.Wait(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
 // appendable returns a copy of a slice of HeadBlocks that can still be appended to.
 func (db *DB) appendable() (r []headBlock) {
 	switch len(db.heads) {

@@ -720,13 +751,8 @@ func (db *DB) appendable() (r []headBlock) {
 }

 func intervalOverlap(amin, amax, bmin, bmax int64) bool {
-	if bmin >= amin && bmin <= amax {
-		return true
-	}
-	if amin >= bmin && amin <= bmax {
-		return true
-	}
-	return false
+	// Checks overlap: http://stackoverflow.com/questions/3269434/
+	return amin <= bmax && bmin <= amax
 }

 func intervalContains(min, max, t int64) bool {
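
From the caller's side the new API is a time range plus label matchers; a sketch of end-to-end usage, with Open's nil arguments following the tests in this commit:

```go
package main

import (
	"log"

	"github.com/prometheus/tsdb"
	"github.com/prometheus/tsdb/labels"
)

func main() {
	db, err := tsdb.Open("data", nil, nil, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Tombstone all samples of series matching a="b" in [1000, 2000];
	// the data disappears physically at the next compaction.
	if err := db.Delete(1000, 2000, labels.NewEqualMatcher("a", "b")); err != nil {
		log.Fatal(err)
	}
}
```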

db_test.go (75 changed lines)
@@ -15,6 +15,7 @@ package tsdb

 import (
 	"io/ioutil"
+	"math/rand"
 	"os"
 	"testing"

@@ -141,3 +142,77 @@ func TestDBAppenderAddRef(t *testing.T) {
 	err = app2.AddFast(string(refb), 1, 1)
 	require.EqualError(t, errors.Cause(err), ErrNotFound.Error())
 }
+
+func TestDeleteSimple(t *testing.T) {
+	numSamples := int64(10)
+
+	tmpdir, _ := ioutil.TempDir("", "test")
+	defer os.RemoveAll(tmpdir)
+
+	db, err := Open(tmpdir, nil, nil, nil)
+	require.NoError(t, err)
+	app := db.Appender()
+
+	smpls := make([]float64, numSamples)
+	for i := int64(0); i < numSamples; i++ {
+		smpls[i] = rand.Float64()
+		app.Add(labels.Labels{{"a", "b"}}, i, smpls[i])
+	}
+
+	require.NoError(t, app.Commit())
+	cases := []struct {
+		intervals intervals
+		remaint   []int64
+	}{
+		{
+			intervals: intervals{{1, 3}, {4, 7}},
+			remaint:   []int64{0, 8, 9},
+		},
+	}
+
+Outer:
+	for _, c := range cases {
+		// TODO(gouthamve): Reset the tombstones somehow.
+		// Delete the ranges.
+		for _, r := range c.intervals {
+			require.NoError(t, db.Delete(r.mint, r.maxt, labels.NewEqualMatcher("a", "b")))
+		}
+
+		// Compare the result.
+		q := db.Querier(0, numSamples)
+		res := q.Select(labels.NewEqualMatcher("a", "b"))
+
+		expSamples := make([]sample, 0, len(c.remaint))
+		for _, ts := range c.remaint {
+			expSamples = append(expSamples, sample{ts, smpls[ts]})
+		}
+
+		expss := newListSeriesSet([]Series{
+			newSeries(map[string]string{"a": "b"}, expSamples),
+		})
+
+		if len(expSamples) == 0 {
+			require.False(t, res.Next())
+			continue
+		}
+
+		for {
+			eok, rok := expss.Next(), res.Next()
+			require.Equal(t, eok, rok, "next")
+
+			if !eok {
+				continue Outer
+			}
+			sexp := expss.At()
+			sres := res.At()
+
+			require.Equal(t, sexp.Labels(), sres.Labels(), "labels")
+
+			smplExp, errExp := expandSeriesIterator(sexp.Iterator())
+			smplRes, errRes := expandSeriesIterator(sres.Iterator())
+
+			require.Equal(t, errExp, errRes, "samples error")
+			require.Equal(t, smplExp, smplRes, "samples")
+		}
+	}
+}

encoding_helpers.go

@@ -22,6 +22,7 @@ func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) }
 func (e *encbuf) putBE32int(x int)      { e.putBE32(uint32(x)) }
 func (e *encbuf) putBE64int(x int)      { e.putBE64(uint64(x)) }
+func (e *encbuf) putBE64int64(x int64)  { e.putBE64(uint64(x)) }
 func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) }
 func (e *encbuf) putUvarint(x int)      { e.putUvarint64(uint64(x)) }

@@ -71,8 +72,10 @@ type decbuf struct {
 	e error
 }

-func (d *decbuf) uvarint() int { return int(d.uvarint64()) }
-func (d *decbuf) be32int() int { return int(d.be32()) }
+func (d *decbuf) uvarint() int      { return int(d.uvarint64()) }
+func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) }
+func (d *decbuf) be32int() int      { return int(d.be32()) }
+func (d *decbuf) be64int64() int64  { return int64(d.be64()) }

 func (d *decbuf) uvarintStr() string {
 	l := d.uvarint64()

@@ -140,6 +143,19 @@ func (d *decbuf) be32() uint32 {
 	return x
 }

+func (d *decbuf) byte() byte {
+	if d.e != nil {
+		return 0
+	}
+	if len(d.b) < 1 {
+		d.e = errInvalidSize
+		return 0
+	}
+	x := d.b[0]
+	d.b = d.b[1:]
+	return x
+}
+
 func (d *decbuf) decbuf(l int) decbuf {
 	if d.e != nil {
 		return decbuf{e: d.e}

head.go (113 changed lines)
@@ -69,6 +69,8 @@ type HeadBlock struct {
 	values   map[string]stringset // label names to possible values
 	postings *memPostings         // postings lists for terms

+	tombstones tombstoneReader
+
 	meta BlockMeta
 }

@@ -97,6 +99,7 @@ func TouchHeadBlock(dir string, mint, maxt int64) (string, error) {
 	}); err != nil {
 		return "", err
 	}
+
 	return dir, renameFile(tmp, dir)
 }

@@ -108,13 +111,14 @@ func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) {
 	}

 	h := &HeadBlock{
-		dir:      dir,
-		wal:      wal,
-		series:   []*memSeries{nil}, // 0 is not a valid posting, filled with nil.
-		hashes:   map[uint64][]*memSeries{},
-		values:   map[string]stringset{},
-		postings: &memPostings{m: make(map[term][]uint32)},
-		meta:     *meta,
+		dir:        dir,
+		wal:        wal,
+		series:     []*memSeries{nil}, // 0 is not a valid posting, filled with nil.
+		hashes:     map[uint64][]*memSeries{},
+		values:     map[string]stringset{},
+		postings:   &memPostings{m: make(map[term][]uint32)},
+		meta:       *meta,
+		tombstones: newEmptyTombstoneReader(),
 	}
 	return h, h.init()
 }

@@ -122,16 +126,19 @@ func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) {
 func (h *HeadBlock) init() error {
 	r := h.wal.Reader()

-	for r.Next() {
-		series, samples := r.At()
-
+	seriesFunc := func(series []labels.Labels) error {
 		for _, lset := range series {
 			h.create(lset.Hash(), lset)
 			h.meta.Stats.NumSeries++
 		}
+
+		return nil
+	}
+	samplesFunc := func(samples []RefSample) error {
 		for _, s := range samples {
 			if int(s.Ref) >= len(h.series) {
-				return errors.Errorf("unknown series reference %d (max %d); abort WAL restore", s.Ref, len(h.series))
+				return errors.Errorf("unknown series reference %d (max %d); abort WAL restore",
+					s.Ref, len(h.series))
 			}
 			h.series[s.Ref].append(s.T, s.V)

@@ -140,8 +147,24 @@ func (h *HeadBlock) init() error {
 			}
 			h.meta.Stats.NumSamples++
 		}
-	return errors.Wrap(r.Err(), "consume WAL")
+
+		return nil
+	}
+	deletesFunc := func(stones []Stone) error {
+		for _, s := range stones {
+			for _, itv := range s.intervals {
+				h.tombstones.add(s.ref, itv)
+			}
+		}
+
+		return nil
+	}
+
+	if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil {
+		return errors.Wrap(err, "consume WAL")
+	}
+
+	return nil
 }

 // inBounds returns true if the given timestamp is within the valid

@@ -195,6 +218,50 @@ func (h *HeadBlock) Meta() BlockMeta {
 	return m
 }

+// Tombstones returns the TombstoneReader against the block.
+func (h *HeadBlock) Tombstones() TombstoneReader {
+	return h.tombstones
+}
+
+// Delete implements headBlock.
+func (h *HeadBlock) Delete(mint int64, maxt int64, ms ...labels.Matcher) error {
+	ir := h.Index()
+
+	pr := newPostingsReader(ir)
+	p, absent := pr.Select(ms...)
+
+	var stones []Stone
+
+Outer:
+	for p.Next() {
+		ref := p.At()
+		lset := h.series[ref].lset
+		for _, abs := range absent {
+			if lset.Get(abs) != "" {
+				continue Outer
+			}
+		}
+
+		// Delete only until the current values and not beyond.
+		tmin, tmax := clampInterval(mint, maxt, h.series[ref].chunks[0].minTime, h.series[ref].head().maxTime)
+		stones = append(stones, Stone{ref, intervals{{tmin, tmax}}})
+	}
+
+	if p.Err() != nil {
+		return p.Err()
+	}
+	if err := h.wal.LogDeletes(stones); err != nil {
+		return err
+	}
+
+	for _, s := range stones {
+		h.tombstones.add(s.ref, s.intervals[0])
+	}
+
+	h.meta.Stats.NumTombstones = uint64(len(h.tombstones))
+	return nil
+}
+
 // Dir returns the directory of the block.
 func (h *HeadBlock) Dir() string { return h.dir }

@@ -217,10 +284,12 @@ func (h *HeadBlock) Querier(mint, maxt int64) Querier {
 	series := h.series[:]

 	return &blockQuerier{
-		mint:   mint,
-		maxt:   maxt,
-		index:  h.Index(),
-		chunks: h.Chunks(),
+		mint:       mint,
+		maxt:       maxt,
+		index:      h.Index(),
+		chunks:     h.Chunks(),
+		tombstones: h.Tombstones(),

 		postingsMapper: func(p Postings) Postings {
 			ep := make([]uint32, 0, 64)

@@ -423,6 +492,7 @@ func (a *headAppender) createSeries() {
 func (a *headAppender) Commit() error {
 	defer atomic.AddUint64(&a.activeWriters, ^uint64(0))
 	defer putHeadAppendBuffer(a.samples)
+	defer a.mtx.RUnlock()

 	a.createSeries()

@@ -436,9 +506,11 @@ func (a *headAppender) Commit() error {

 	// Write all new series and samples to the WAL and add it to the
 	// in-mem database on success.
-	if err := a.wal.Log(a.newLabels, a.samples); err != nil {
-		a.mtx.RUnlock()
-		return err
+	if err := a.wal.LogSeries(a.newLabels); err != nil {
+		return errors.Wrap(err, "WAL log series")
+	}
+	if err := a.wal.LogSamples(a.samples); err != nil {
+		return errors.Wrap(err, "WAL log samples")
 	}

 	total := uint64(len(a.samples))

@@ -449,8 +521,6 @@ func (a *headAppender) Commit() error {
 		}
 	}

-	a.mtx.RUnlock()
-
 	atomic.AddUint64(&a.meta.Stats.NumSamples, total)
 	atomic.AddUint64(&a.meta.Stats.NumSeries, uint64(len(a.newSeries)))

@@ -538,6 +608,7 @@ func (h *headIndexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error
 	if int(ref) >= len(h.series) {
 		return nil, nil, ErrNotFound
 	}
+
 	s := h.series[ref]
 	if s == nil {
 		return nil, nil, ErrNotFound

head_test.go (337 changed lines)
@@ -36,6 +36,10 @@ func createTestHeadBlock(t testing.TB, dir string, mint, maxt int64) *HeadBlock
 	dir, err := TouchHeadBlock(dir, mint, maxt)
 	require.NoError(t, err)

+	return openTestHeadBlock(t, dir)
+}
+
+func openTestHeadBlock(t testing.TB, dir string) *HeadBlock {
 	wal, err := OpenSegmentWAL(dir, nil, 5*time.Second)
 	require.NoError(t, err)

@@ -378,6 +382,323 @@ func TestHeadBlock_e2e(t *testing.T) {
 	return
 }

+func TestHBDeleteSimple(t *testing.T) {
+	numSamples := int64(10)
+
+	dir, _ := ioutil.TempDir("", "test")
+	defer os.RemoveAll(dir)
+
+	hb := createTestHeadBlock(t, dir, 0, numSamples)
+	app := hb.Appender()
+
+	smpls := make([]float64, numSamples)
+	for i := int64(0); i < numSamples; i++ {
+		smpls[i] = rand.Float64()
+		app.Add(labels.Labels{{"a", "b"}}, i, smpls[i])
+	}
+
+	require.NoError(t, app.Commit())
+	cases := []struct {
+		intervals intervals
+		remaint   []int64
+	}{
+		{
+			intervals: intervals{{0, 3}},
+			remaint:   []int64{4, 5, 6, 7, 8, 9},
+		},
+		{
+			intervals: intervals{{1, 3}},
+			remaint:   []int64{0, 4, 5, 6, 7, 8, 9},
+		},
+		{
+			intervals: intervals{{1, 3}, {4, 7}},
+			remaint:   []int64{0, 8, 9},
+		},
+		{
+			intervals: intervals{{1, 3}, {4, 700}},
+			remaint:   []int64{0},
+		},
+		{
+			intervals: intervals{{0, 9}},
+			remaint:   []int64{},
+		},
+	}
+
+Outer:
+	for _, c := range cases {
+		// Reset the tombstones.
+		hb.tombstones = newEmptyTombstoneReader()
+
+		// Delete the ranges.
+		for _, r := range c.intervals {
+			require.NoError(t, hb.Delete(r.mint, r.maxt, labels.NewEqualMatcher("a", "b")))
+		}
+
+		// Compare the result.
+		q := hb.Querier(0, numSamples)
+		res := q.Select(labels.NewEqualMatcher("a", "b"))
+
+		expSamples := make([]sample, 0, len(c.remaint))
+		for _, ts := range c.remaint {
+			expSamples = append(expSamples, sample{ts, smpls[ts]})
+		}
+
+		expss := newListSeriesSet([]Series{
+			newSeries(map[string]string{"a": "b"}, expSamples),
+		})
+
+		if len(expSamples) == 0 {
+			require.False(t, res.Next())
+			continue
+		}
+
+		for {
+			eok, rok := expss.Next(), res.Next()
+			require.Equal(t, eok, rok, "next")
+
+			if !eok {
+				continue Outer
+			}
+			sexp := expss.At()
+			sres := res.At()
+
+			require.Equal(t, sexp.Labels(), sres.Labels(), "labels")
+
+			smplExp, errExp := expandSeriesIterator(sexp.Iterator())
+			smplRes, errRes := expandSeriesIterator(sres.Iterator())
+
+			require.Equal(t, errExp, errRes, "samples error")
+			require.Equal(t, smplExp, smplRes, "samples")
+		}
+	}
+}
+
+func TestDeleteUntilCurMax(t *testing.T) {
+	numSamples := int64(10)
+
+	dir, _ := ioutil.TempDir("", "test")
+	defer os.RemoveAll(dir)
+
+	hb := createTestHeadBlock(t, dir, 0, 2*numSamples)
+	app := hb.Appender()
+
+	smpls := make([]float64, numSamples)
+	for i := int64(0); i < numSamples; i++ {
+		smpls[i] = rand.Float64()
+		app.Add(labels.Labels{{"a", "b"}}, i, smpls[i])
+	}
+
+	require.NoError(t, app.Commit())
+	require.NoError(t, hb.Delete(0, 10000, labels.NewEqualMatcher("a", "b")))
+	app = hb.Appender()
+	_, err := app.Add(labels.Labels{{"a", "b"}}, 11, 1)
+	require.NoError(t, err)
+	require.NoError(t, app.Commit())
+
+	q := hb.Querier(0, 100000)
+	res := q.Select(labels.NewEqualMatcher("a", "b"))
+
+	require.True(t, res.Next())
+	exps := res.At()
+	it := exps.Iterator()
+	ressmpls, err := expandSeriesIterator(it)
+	require.NoError(t, err)
+	require.Equal(t, []sample{{11, 1}}, ressmpls)
+}
+
+func TestDelete_e2e(t *testing.T) {
+	numDatapoints := 1000
+	numRanges := 1000
+	timeInterval := int64(2)
+	maxTime := int64(2 * 1000)
+	minTime := int64(200)
+	// Create 8 series with 1000 data-points of different ranges, delete and run queries.
+	lbls := [][]labels.Label{
+		{
+			{"a", "b"},
+			{"instance", "localhost:9090"},
+			{"job", "prometheus"},
+		},
+		{
+			{"a", "b"},
+			{"instance", "127.0.0.1:9090"},
+			{"job", "prometheus"},
+		},
+		{
+			{"a", "b"},
+			{"instance", "127.0.0.1:9090"},
+			{"job", "prom-k8s"},
+		},
+		{
+			{"a", "b"},
+			{"instance", "localhost:9090"},
+			{"job", "prom-k8s"},
+		},
+		{
+			{"a", "c"},
+			{"instance", "localhost:9090"},
+			{"job", "prometheus"},
+		},
+		{
+			{"a", "c"},
+			{"instance", "127.0.0.1:9090"},
+			{"job", "prometheus"},
+		},
+		{
+			{"a", "c"},
+			{"instance", "127.0.0.1:9090"},
+			{"job", "prom-k8s"},
+		},
+		{
+			{"a", "c"},
+			{"instance", "localhost:9090"},
+			{"job", "prom-k8s"},
+		},
+	}
+
+	seriesMap := map[string][]sample{}
+	for _, l := range lbls {
+		seriesMap[labels.New(l...).String()] = []sample{}
+	}
+
+	dir, _ := ioutil.TempDir("", "test")
+	defer os.RemoveAll(dir)
+
+	hb := createTestHeadBlock(t, dir, minTime, maxTime)
+	app := hb.Appender()
+
+	for _, l := range lbls {
+		ls := labels.New(l...)
+		series := []sample{}
+
+		ts := rand.Int63n(300)
+		for i := 0; i < numDatapoints; i++ {
+			v := rand.Float64()
+			if ts >= minTime && ts <= maxTime {
+				series = append(series, sample{ts, v})
+			}
+
+			_, err := app.Add(ls, ts, v)
+			if ts >= minTime && ts <= maxTime {
+				require.NoError(t, err)
+			} else {
+				require.Error(t, ErrOutOfBounds, err)
+			}
+
+			ts += rand.Int63n(timeInterval) + 1
+		}
+
+		seriesMap[labels.New(l...).String()] = series
+	}
+
+	require.NoError(t, app.Commit())
+
+	// Delete a time-range from each selector.
+	dels := []struct {
+		ms     []labels.Matcher
+		drange intervals
+	}{
+		{
+			ms:     []labels.Matcher{labels.NewEqualMatcher("a", "b")},
+			drange: intervals{{300, 500}, {600, 670}},
+		},
+		{
+			ms: []labels.Matcher{
+				labels.NewEqualMatcher("a", "b"),
+				labels.NewEqualMatcher("job", "prom-k8s"),
+			},
+			drange: intervals{{300, 500}, {100, 670}},
+		},
+		{
+			ms: []labels.Matcher{
+				labels.NewEqualMatcher("a", "c"),
+				labels.NewEqualMatcher("instance", "localhost:9090"),
+				labels.NewEqualMatcher("job", "prometheus"),
+			},
+			drange: intervals{{300, 400}, {100, 6700}},
+		},
+		// TODO: Add Regexp Matchers.
+	}
+
+	for _, del := range dels {
+		// Reset the deletes every time.
+		writeTombstoneFile(hb.dir, newEmptyTombstoneReader())
+		hb.tombstones = newEmptyTombstoneReader()
+
+		for _, r := range del.drange {
+			require.NoError(t, hb.Delete(r.mint, r.maxt, del.ms...))
+		}
+
+		matched := labels.Slice{}
+		for _, ls := range lbls {
+			s := labels.Selector(del.ms)
+			if s.Matches(ls) {
+				matched = append(matched, ls)
+			}
+		}
+
+		sort.Sort(matched)
+
+		for i := 0; i < numRanges; i++ {
+			mint := rand.Int63n(200)
+			maxt := mint + rand.Int63n(timeInterval*int64(numDatapoints))
+
+			q := hb.Querier(mint, maxt)
+			ss := q.Select(del.ms...)
+
+			// Build the mockSeriesSet.
+			matchedSeries := make([]Series, 0, len(matched))
+			for _, m := range matched {
+				smpls := boundedSamples(seriesMap[m.String()], mint, maxt)
+				smpls = deletedSamples(smpls, del.drange)
+
+				// Only append those series for which samples exist, as mockSeriesSet
+				// doesn't skip series with no samples.
+				// TODO: But sometimes SeriesSet returns an empty SeriesIterator.
+				if len(smpls) > 0 {
+					matchedSeries = append(matchedSeries, newSeries(
+						m.Map(),
+						smpls,
+					))
+				}
+			}
+			expSs := newListSeriesSet(matchedSeries)
+
+			// Compare both SeriesSets.
+			for {
+				eok, rok := expSs.Next(), ss.Next()
+
+				// Skip a series if the iterator is empty.
+				if rok {
+					for !ss.At().Iterator().Next() {
+						rok = ss.Next()
+						if !rok {
+							break
+						}
+					}
+				}
+				require.Equal(t, eok, rok, "next")
+
+				if !eok {
+					break
+				}
+				sexp := expSs.At()
+				sres := ss.At()
+
+				require.Equal(t, sexp.Labels(), sres.Labels(), "labels")
+
+				smplExp, errExp := expandSeriesIterator(sexp.Iterator())
+				smplRes, errRes := expandSeriesIterator(sres.Iterator())
+
+				require.Equal(t, errExp, errRes, "samples error")
+				require.Equal(t, smplExp, smplRes, "samples")
+			}
+		}
+	}
+
+	return
+}

@@ -394,3 +715,19 @@ func boundedSamples(full []sample, mint, maxt int64) []sample {
 	// maxt is after highest sample.
 	return full
 }
+
+func deletedSamples(full []sample, dranges intervals) []sample {
+	ds := make([]sample, 0, len(full))
+Outer:
+	for _, s := range full {
+		for _, r := range dranges {
+			if r.inBounds(s.t) {
+				continue Outer
+			}
+		}
+
+		ds = append(ds, s)
+	}
+
+	return ds
+}

index.go (6 changed lines)
@@ -569,11 +569,7 @@ func newIndexReader(dir string) (*indexReader, error) {
 		return nil, errors.Wrap(err, "read label index table")
 	}
 	r.postings, err = r.readOffsetTable(r.toc.postingsTable)
-	if err != nil {
-		return nil, errors.Wrap(err, "read postings table")
-	}
-
-	return r, nil
+	return r, errors.Wrap(err, "read postings table")
 }

 func (r *indexReader) readTOC() error {
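
The shortened return relies on a documented property of github.com/pkg/errors: Wrap returns nil when the error it wraps is nil, so the success path is unchanged:

```go
package main

import (
	"fmt"

	"github.com/pkg/errors"
)

func main() {
	// errors.Wrap(nil, ...) is nil, which lets `return r, errors.Wrap(err, ...)`
	// replace the explicit if-err-then-wrap-else-return-nil dance.
	fmt.Println(errors.Wrap(nil, "read postings table") == nil) // true
}
```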

querier.go (95 changed lines)
@@ -126,8 +126,9 @@ func (q *querier) Close() error {

 // blockQuerier provides querying access to a single block database.
 type blockQuerier struct {
-	index  IndexReader
-	chunks ChunkReader
+	index      IndexReader
+	chunks     ChunkReader
+	tombstones TombstoneReader

 	postingsMapper func(Postings) Postings

@@ -149,6 +150,8 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
 			p:      p,
 			index:  q.index,
 			absent: absent,
+
+			tombstones: q.tombstones,
 		},
 		chunks: q.chunks,
 		mint:   q.mint,

@@ -366,29 +369,35 @@ func (s *mergedSeriesSet) Next() bool {

 type chunkSeriesSet interface {
 	Next() bool
-	At() (labels.Labels, []*ChunkMeta)
+	At() (labels.Labels, []*ChunkMeta, intervals)
 	Err() error
 }

 // baseChunkSeries loads the label set and chunk references for a postings
 // list from an index. It filters out series that have labels set that should be unset.
 type baseChunkSeries struct {
-	p      Postings
-	index  IndexReader
-	absent []string // labels that must be unset in results.
+	p          Postings
+	index      IndexReader
+	tombstones TombstoneReader
+	absent     []string // labels that must be unset in results.

-	lset labels.Labels
-	chks []*ChunkMeta
-	err  error
+	lset      labels.Labels
+	chks      []*ChunkMeta
+	intervals intervals
+	err       error
 }

-func (s *baseChunkSeries) At() (labels.Labels, []*ChunkMeta) { return s.lset, s.chks }
-func (s *baseChunkSeries) Err() error                        { return s.err }
+func (s *baseChunkSeries) At() (labels.Labels, []*ChunkMeta, intervals) {
+	return s.lset, s.chks, s.intervals
+}
+
+func (s *baseChunkSeries) Err() error { return s.err }

 func (s *baseChunkSeries) Next() bool {
 Outer:
 	for s.p.Next() {
-		lset, chunks, err := s.index.Series(s.p.At())
+		ref := s.p.At()
+		lset, chunks, err := s.index.Series(ref)
 		if err != nil {
 			s.err = err
 			return false

@@ -403,6 +412,19 @@ Outer:

 		s.lset = lset
 		s.chks = chunks
+		s.intervals = s.tombstones.Get(s.p.At())
+
+		if len(s.intervals) > 0 {
+			// Only those chunks that are not entirely deleted.
+			chks := make([]*ChunkMeta, 0, len(s.chks))
+			for _, chk := range s.chks {
+				if !(interval{chk.MinTime, chk.MaxTime}.isSubrange(s.intervals)) {
+					chks = append(chks, chk)
+				}
+			}
+
+			s.chks = chks
+		}

 		return true
 	}

@@ -420,17 +442,20 @@ type populatedChunkSeries struct {
 	chunks     ChunkReader
 	mint, maxt int64

-	err  error
-	chks []*ChunkMeta
-	lset labels.Labels
+	err       error
+	chks      []*ChunkMeta
+	lset      labels.Labels
+	intervals intervals
 }

-func (s *populatedChunkSeries) At() (labels.Labels, []*ChunkMeta) { return s.lset, s.chks }
-func (s *populatedChunkSeries) Err() error                        { return s.err }
+func (s *populatedChunkSeries) At() (labels.Labels, []*ChunkMeta, intervals) {
+	return s.lset, s.chks, s.intervals
+}
+func (s *populatedChunkSeries) Err() error { return s.err }

 func (s *populatedChunkSeries) Next() bool {
 	for s.set.Next() {
-		lset, chks := s.set.At()
+		lset, chks, dranges := s.set.At()

 		for len(chks) > 0 {
 			if chks[0].MaxTime >= s.mint {

@@ -457,6 +482,7 @@ func (s *populatedChunkSeries) Next() bool {

 		s.lset = lset
 		s.chks = chks
+		s.intervals = dranges

 		return true
 	}

@@ -477,8 +503,15 @@ type blockSeriesSet struct {

 func (s *blockSeriesSet) Next() bool {
 	for s.set.Next() {
-		lset, chunks := s.set.At()
-		s.cur = &chunkSeries{labels: lset, chunks: chunks, mint: s.mint, maxt: s.maxt}
+		lset, chunks, dranges := s.set.At()
+		s.cur = &chunkSeries{
+			labels: lset,
+			chunks: chunks,
+			mint:   s.mint,
+			maxt:   s.maxt,
+
+			intervals: dranges,
+		}
 		return true
 	}
 	if s.set.Err() != nil {

@@ -497,6 +530,8 @@ type chunkSeries struct {
 	chunks []*ChunkMeta // in-order chunk refs

 	mint, maxt int64
+
+	intervals intervals
 }

 func (s *chunkSeries) Labels() labels.Labels {

@@ -504,7 +539,7 @@ func (s *chunkSeries) Labels() labels.Labels {
 }

 func (s *chunkSeries) Iterator() SeriesIterator {
-	return newChunkSeriesIterator(s.chunks, s.mint, s.maxt)
+	return newChunkSeriesIterator(s.chunks, s.intervals, s.mint, s.maxt)
 }

 // SeriesIterator iterates over the data of a time series.

@@ -601,16 +636,24 @@ type chunkSeriesIterator struct {
 	cur chunks.Iterator

 	maxt, mint int64
+
+	intervals intervals
 }

-func newChunkSeriesIterator(cs []*ChunkMeta, mint, maxt int64) *chunkSeriesIterator {
+func newChunkSeriesIterator(cs []*ChunkMeta, dranges intervals, mint, maxt int64) *chunkSeriesIterator {
+	it := cs[0].Chunk.Iterator()
+	if len(dranges) > 0 {
+		it = &deletedIterator{it: it, intervals: dranges}
+	}
 	return &chunkSeriesIterator{
 		chunks: cs,
 		i:      0,
-		cur:    cs[0].Chunk.Iterator(),
+		cur:    it,

 		mint: mint,
 		maxt: maxt,
+
+		intervals: dranges,
 	}
 }

@@ -645,6 +688,9 @@ func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {

 	it.i = x
 	it.cur = it.chunks[x].Chunk.Iterator()
+	if len(it.intervals) > 0 {
+		it.cur = &deletedIterator{it: it.cur, intervals: it.intervals}
+	}

 	for it.cur.Next() {
 		t0, _ := it.cur.At()

@@ -676,6 +722,9 @@ func (it *chunkSeriesIterator) Next() bool {

 	it.i++
 	it.cur = it.chunks[it.i].Chunk.Iterator()
+	if len(it.intervals) > 0 {
+		it.cur = &deletedIterator{it: it.cur, intervals: it.intervals}
+	}

 	return it.Next()
 }

querier_test.go (193 changed lines)
@@ -232,6 +232,7 @@ func createIdxChkReaders(tc []struct {
 	mi := newMockIndex()

 	for i, s := range tc {
+		i = i + 1 // 0 is not a valid posting.
 		metas := make([]*ChunkMeta, 0, len(s.chunks))
 		for _, chk := range s.chunks {
 			// Collisions can be there, but for tests, its fine.

@@ -378,8 +379,181 @@ Outer:
 	for _, c := range cases.queries {
 		ir, cr := createIdxChkReaders(cases.data)
 		querier := &blockQuerier{
-			index:  ir,
-			chunks: cr,
+			index:      ir,
+			chunks:     cr,
+			tombstones: newEmptyTombstoneReader(),

 			mint: c.mint,
 			maxt: c.maxt,
 		}

 		res := querier.Select(c.ms...)

 		for {
 			eok, rok := c.exp.Next(), res.Next()
 			require.Equal(t, eok, rok, "next")

 			if !eok {
 				continue Outer
 			}
 			sexp := c.exp.At()
 			sres := res.At()

 			require.Equal(t, sexp.Labels(), sres.Labels(), "labels")

 			smplExp, errExp := expandSeriesIterator(sexp.Iterator())
 			smplRes, errRes := expandSeriesIterator(sres.Iterator())

 			require.Equal(t, errExp, errRes, "samples error")
 			require.Equal(t, smplExp, smplRes, "samples")
 		}
 	}

 	return
 }
+
+func TestBlockQuerierDelete(t *testing.T) {
+	newSeries := func(l map[string]string, s []sample) Series {
+		return &mockSeries{
+			labels:   func() labels.Labels { return labels.FromMap(l) },
+			iterator: func() SeriesIterator { return newListSeriesIterator(s) },
+		}
+	}
+
+	type query struct {
+		mint, maxt int64
+		ms         []labels.Matcher
+		exp        SeriesSet
+	}
+
+	cases := struct {
+		data []struct {
+			lset   map[string]string
+			chunks [][]sample
+		}
+
+		tombstones tombstoneReader
+		queries    []query
+	}{
+		data: []struct {
+			lset   map[string]string
+			chunks [][]sample
+		}{
+			{
+				lset: map[string]string{
+					"a": "a",
+				},
+				chunks: [][]sample{
+					{
+						{1, 2}, {2, 3}, {3, 4},
+					},
+					{
+						{5, 2}, {6, 3}, {7, 4},
+					},
+				},
+			},
+			{
+				lset: map[string]string{
+					"a": "a",
+					"b": "b",
+				},
+				chunks: [][]sample{
+					{
+						{1, 1}, {2, 2}, {3, 3},
+					},
+					{
+						{4, 15}, {5, 3}, {6, 6},
+					},
+				},
+			},
+			{
+				lset: map[string]string{
+					"b": "b",
+				},
+				chunks: [][]sample{
+					{
+						{1, 3}, {2, 2}, {3, 6},
+					},
+					{
+						{5, 1}, {6, 7}, {7, 2},
+					},
+				},
+			},
+		},
+		tombstones: newTombstoneReader(
+			map[uint32]intervals{
+				1: intervals{{1, 3}},
+				2: intervals{{1, 3}, {6, 10}},
+				3: intervals{{6, 10}},
+			},
+		),
+
+		queries: []query{
+			{
+				mint: 2,
+				maxt: 7,
+				ms:   []labels.Matcher{labels.NewEqualMatcher("a", "a")},
+				exp: newListSeriesSet([]Series{
+					newSeries(map[string]string{
+						"a": "a",
+					},
+						[]sample{{5, 2}, {6, 3}, {7, 4}},
+					),
+					newSeries(map[string]string{
+						"a": "a",
+						"b": "b",
+					},
+						[]sample{{4, 15}, {5, 3}},
+					),
+				}),
+			},
+			{
+				mint: 2,
+				maxt: 7,
+				ms:   []labels.Matcher{labels.NewEqualMatcher("b", "b")},
+				exp: newListSeriesSet([]Series{
+					newSeries(map[string]string{
+						"a": "a",
+						"b": "b",
+					},
+						[]sample{{4, 15}, {5, 3}},
+					),
+					newSeries(map[string]string{
+						"b": "b",
+					},
+						[]sample{{2, 2}, {3, 6}, {5, 1}},
+					),
+				}),
+			},
+			{
+				mint: 1,
+				maxt: 4,
+				ms:   []labels.Matcher{labels.NewEqualMatcher("a", "a")},
+				exp: newListSeriesSet([]Series{
+					newSeries(map[string]string{
+						"a": "a",
+						"b": "b",
+					},
+						[]sample{{4, 15}},
+					),
+				}),
+			},
+			{
+				mint: 1,
+				maxt: 3,
+				ms:   []labels.Matcher{labels.NewEqualMatcher("a", "a")},
+				exp:  newListSeriesSet([]Series{}),
+			},
+		},
+	}
+
+Outer:
+	for _, c := range cases.queries {
+		ir, cr := createIdxChkReaders(cases.data)
+		querier := &blockQuerier{
+			index:      ir,
+			chunks:     cr,
+			tombstones: cases.tombstones,
+
+			mint: c.mint,
+			maxt: c.maxt,

@@ -487,13 +661,14 @@ func TestBaseChunkSeries(t *testing.T) {
 	}

 	bcs := &baseChunkSeries{
-		p:     newListPostings(tc.postings),
-		index: mi,
+		p:          newListPostings(tc.postings),
+		index:      mi,
+		tombstones: newEmptyTombstoneReader(),
 	}

 	i := 0
 	for bcs.Next() {
-		lset, chks := bcs.At()
+		lset, chks, _ := bcs.At()

 		idx := tc.expIdxs[i]

@@ -701,7 +876,7 @@ func TestSeriesIterator(t *testing.T) {
 			chunkFromSamples(tc.b),
 			chunkFromSamples(tc.c),
 		}
-		res := newChunkSeriesIterator(chkMetas, tc.mint, tc.maxt)
+		res := newChunkSeriesIterator(chkMetas, nil, tc.mint, tc.maxt)

 		smplValid := make([]sample, 0)
 		for _, s := range tc.exp {

@@ -772,7 +947,7 @@ func TestSeriesIterator(t *testing.T) {
 			chunkFromSamples(tc.b),
 			chunkFromSamples(tc.c),
 		}
-		res := newChunkSeriesIterator(chkMetas, tc.mint, tc.maxt)
+		res := newChunkSeriesIterator(chkMetas, nil, tc.mint, tc.maxt)

 		smplValid := make([]sample, 0)
 		for _, s := range tc.exp {

@@ -919,8 +1094,8 @@ func (m *mockChunkSeriesSet) Next() bool {
 	return m.i < len(m.l)
 }

-func (m *mockChunkSeriesSet) At() (labels.Labels, []*ChunkMeta) {
-	return m.l[m.i], m.cm[m.i]
+func (m *mockChunkSeriesSet) At() (labels.Labels, []*ChunkMeta, intervals) {
+	return m.l[m.i], m.cm[m.i], nil
 }

 func (m *mockChunkSeriesSet) Err() error {

tombstones.go (new file, 223 lines)
@ -0,0 +1,223 @@
|
|||
// Copyright 2017 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tsdb
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const tombstoneFilename = "tombstones"
|
||||
|
||||
const (
|
||||
// MagicTombstone is 4 bytes at the head of a tombstone file.
|
||||
MagicTombstone = 0x130BA30
|
||||
|
||||
tombstoneFormatV1 = 1
|
||||
)
|
||||
|
||||
func writeTombstoneFile(dir string, tr tombstoneReader) error {
|
||||
path := filepath.Join(dir, tombstoneFilename)
|
||||
tmp := path + ".tmp"
|
||||
hash := crc32.New(crc32.MakeTable(crc32.Castagnoli))
|
||||
|
||||
f, err := os.Create(tmp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
buf := encbuf{b: make([]byte, 3*binary.MaxVarintLen64)}
|
||||
buf.reset()
|
||||
// Write the meta.
|
||||
buf.putBE32(MagicTombstone)
|
||||
buf.putByte(tombstoneFormatV1)
|
||||
_, err = f.Write(buf.get())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
mw := io.MultiWriter(f, hash)
|
||||
|
||||
for k, v := range tr {
|
||||
for _, itv := range v {
|
||||
buf.reset()
|
||||
buf.putUvarint32(k)
|
||||
buf.putVarint64(itv.mint)
|
||||
buf.putVarint64(itv.maxt)
|
||||
|
||||
_, err = mw.Write(buf.get())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_, err = f.Write(hash.Sum(nil))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return renameFile(tmp, path)
|
||||
}
|
||||
|
||||
// Stone holds the information on the posting and time-range
|
||||
// that is deleted.
|
||||
type Stone struct {
|
||||
ref uint32
|
||||
intervals intervals
|
||||
}
|
||||
|
||||
// TombstoneReader is the iterator over tombstones.
|
||||
type TombstoneReader interface {
|
||||
Get(ref uint32) intervals
|
||||
}
|
||||
|
||||
func readTombstones(dir string) (tombstoneReader, error) {
|
||||
b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(b) < 5 {
|
||||
return nil, errors.Wrap(errInvalidSize, "tombstones header")
|
||||
}
|
||||
|
||||
d := &decbuf{b: b[:len(b)-4]} // 4 for the checksum.
|
||||
if mg := d.be32(); mg != MagicTombstone {
|
||||
return nil, fmt.Errorf("invalid magic number %x", mg)
|
||||
}
|
||||
if flag := d.byte(); flag != tombstoneFormatV1 {
|
||||
return nil, fmt.Errorf("invalid tombstone format %x", flag)
|
||||
}
|
||||
|
||||
if d.err() != nil {
|
||||
return nil, d.err()
|
||||
}
|
||||
|
||||
// Verify checksum
|
||||
hash := crc32.New(crc32.MakeTable(crc32.Castagnoli))
|
||||
if _, err := hash.Write(d.get()); err != nil {
|
||||
return nil, errors.Wrap(err, "write to hash")
|
||||
}
|
||||
if binary.BigEndian.Uint32(b[len(b)-4:]) != hash.Sum32() {
|
||||
return nil, errors.New("checksum did not match")
|
||||
}
|
||||
|
||||
stonesMap := newEmptyTombstoneReader()
|
||||
for d.len() > 0 {
|
||||
k := d.uvarint32()
|
||||
mint := d.varint64()
|
||||
maxt := d.varint64()
|
||||
if d.err() != nil {
|
||||
return nil, d.err()
|
||||
}
|
||||
|
||||
stonesMap.add(k, interval{mint, maxt})
|
||||
}
|
||||
|
||||
return newTombstoneReader(stonesMap), nil
|
||||
}
|
||||
|
||||
type tombstoneReader map[uint32]intervals
|
||||
|
||||
func newTombstoneReader(ts map[uint32]intervals) tombstoneReader {
|
||||
return tombstoneReader(ts)
|
||||
}
|
||||
|
||||
func newEmptyTombstoneReader() tombstoneReader {
|
||||
return tombstoneReader(make(map[uint32]intervals))
|
||||
}
|
||||
|
||||
func (t tombstoneReader) Get(ref uint32) intervals {
|
||||
return t[ref]
|
||||
}
|
||||
|
||||
func (t tombstoneReader) add(ref uint32, itv interval) {
|
||||
t[ref] = t[ref].add(itv)
|
||||
}
|
||||
|
||||
type interval struct {
|
||||
mint, maxt int64
|
||||
}
|
||||
|
||||
func (tr interval) inBounds(t int64) bool {
|
||||
return t >= tr.mint && t <= tr.maxt
|
||||
}
|
||||
|
||||
func (tr interval) isSubrange(dranges intervals) bool {
|
||||
for _, r := range dranges {
|
||||
if r.inBounds(tr.mint) && r.inBounds(tr.maxt) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
type intervals []interval
|
||||
|
||||

// add inserts the new time-range into the existing ones, merging
// overlapping or adjacent ranges. The existing ranges must be sorted.
func (itvs intervals) add(n interval) intervals {
	for i, r := range itvs {
		// TODO(gouthamve): Make this codepath easier to digest.
		if r.inBounds(n.mint-1) || r.inBounds(n.mint) {
			if n.maxt > r.maxt {
				itvs[i].maxt = n.maxt
			}

			j := 0
			for _, r2 := range itvs[i+1:] {
				if n.maxt < r2.mint {
					break
				}
				j++
			}
			if j != 0 {
				if itvs[i+j].maxt > n.maxt {
					itvs[i].maxt = itvs[i+j].maxt
				}
				itvs = append(itvs[:i+1], itvs[i+j+1:]...)
			}
			return itvs
		}

		if r.inBounds(n.maxt+1) || r.inBounds(n.maxt) {
			if n.mint < r.mint {
				itvs[i].mint = n.mint
			}
			return itvs
		}

		if n.mint < r.mint {
			newRange := make(intervals, i, len(itvs[:i])+1)
			copy(newRange, itvs[:i])
			newRange = append(newRange, n)
			newRange = append(newRange, itvs[i:]...)

			return newRange
		}
	}

	itvs = append(itvs, n)
	return itvs
}
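
As a quick worked example of the coalescing behaviour (mirroring one of the cases in the test file below): adding a range that is adjacent to one existing range and touches the next collapses all of them into one.

```
// Sketch only: the values are arbitrary.
itvs := intervals{{1, 4}, {6, 6}}
itvs = itvs.add(interval{5, 6}) // 5 is adjacent to 4, and 6 touches {6, 6}
// itvs is now intervals{{1, 6}}.
```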

123
tombstones_test.go
Normal file

@@ -0,0 +1,123 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tsdb

import (
	"io/ioutil"
	"math/rand"
	"os"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

func TestWriteAndReadbackTombStones(t *testing.T) {
	tmpdir, _ := ioutil.TempDir("", "test")
	defer os.RemoveAll(tmpdir)

	ref := uint32(0)

	stones := make(map[uint32]intervals)
	// Generate the tombstones.
	for i := 0; i < 100; i++ {
		ref += uint32(rand.Int31n(10)) + 1
		numRanges := rand.Intn(5) + 1
		dranges := make(intervals, 0, numRanges)
		mint := rand.Int63n(time.Now().UnixNano())
		for j := 0; j < numRanges; j++ {
			dranges = dranges.add(interval{mint, mint + rand.Int63n(1000)})
			mint += rand.Int63n(1000) + 1
		}
		stones[ref] = dranges
	}

	require.NoError(t, writeTombstoneFile(tmpdir, newTombstoneReader(stones)))

	restr, err := readTombstones(tmpdir)
	require.NoError(t, err)
	exptr := newTombstoneReader(stones)
	// Compare the two readers.
	require.Equal(t, exptr, restr)
}

func TestAddingNewIntervals(t *testing.T) {
	cases := []struct {
		exist intervals
		new   interval

		exp intervals
	}{
		{
			new: interval{1, 2},
			exp: intervals{{1, 2}},
		},
		{
			exist: intervals{{1, 2}},
			new:   interval{1, 2},
			exp:   intervals{{1, 2}},
		},
		{
			exist: intervals{{1, 4}, {6, 6}},
			new:   interval{5, 6},
			exp:   intervals{{1, 6}},
		},
		{
			exist: intervals{{1, 10}, {12, 20}, {25, 30}},
			new:   interval{21, 23},
			exp:   intervals{{1, 10}, {12, 23}, {25, 30}},
		},
		{
			exist: intervals{{1, 2}, {3, 5}, {7, 7}},
			new:   interval{6, 7},
			exp:   intervals{{1, 2}, {3, 7}},
		},
		{
			exist: intervals{{1, 10}, {12, 20}, {25, 30}},
			new:   interval{21, 25},
			exp:   intervals{{1, 10}, {12, 30}},
		},
		{
			exist: intervals{{1, 10}, {12, 20}, {25, 30}},
			new:   interval{18, 23},
			exp:   intervals{{1, 10}, {12, 23}, {25, 30}},
		},
		{
			exist: intervals{{1, 10}, {12, 20}, {25, 30}},
			new:   interval{9, 23},
			exp:   intervals{{1, 23}, {25, 30}},
		},
		{
			exist: intervals{{1, 10}, {12, 20}, {25, 30}},
			new:   interval{9, 230},
			exp:   intervals{{1, 230}},
		},
		{
			exist: intervals{{5, 10}, {12, 20}, {25, 30}},
			new:   interval{1, 4},
			exp:   intervals{{1, 10}, {12, 20}, {25, 30}},
		},
		{
			exist: intervals{{5, 10}, {12, 20}, {25, 30}},
			new:   interval{11, 14},
			exp:   intervals{{5, 20}, {25, 30}},
		},
	}

	for _, c := range cases {
		require.Equal(t, c.exp, c.exist.add(c.new))
	}
}

188
wal.go

@@ -46,8 +46,18 @@ const (
	WALEntrySymbols WALEntryType = 1
	WALEntrySeries  WALEntryType = 2
	WALEntrySamples WALEntryType = 3
	WALEntryDeletes WALEntryType = 4
)

// SamplesCB is the callback invoked for each batch of samples read.
type SamplesCB func([]RefSample) error

// SeriesCB is the callback invoked for each batch of series read.
type SeriesCB func([]labels.Labels) error

// DeletesCB is the callback invoked for each batch of deletes read.
type DeletesCB func([]Stone) error

// SegmentWAL is a write ahead log for series data.
type SegmentWAL struct {
	mtx sync.Mutex

@@ -71,15 +81,15 @@ type SegmentWAL struct {

// It must be completely read before new entries are logged.
type WAL interface {
	Reader() WALReader
	Log([]labels.Labels, []RefSample) error
	LogSeries([]labels.Labels) error
	LogSamples([]RefSample) error
	LogDeletes([]Stone) error
	Close() error
}

// WALReader reads entries from a WAL.
type WALReader interface {
	At() ([]labels.Labels, []RefSample)
	Next() bool
	Err() error
	Read(SeriesCB, SamplesCB, DeletesCB) error
}

// RefSample is a timestamp/value pair associated with a reference to a series.

@@ -141,13 +151,40 @@ func (w *SegmentWAL) Reader() WALReader {
}

// Log writes a batch of new series labels and samples to the log.
func (w *SegmentWAL) Log(series []labels.Labels, samples []RefSample) error {
//func (w *SegmentWAL) Log(series []labels.Labels, samples []RefSample) error {
//	return nil
//}

// LogSeries writes a batch of new series labels to the log.
func (w *SegmentWAL) LogSeries(series []labels.Labels) error {
	if err := w.encodeSeries(series); err != nil {
		return err
	}

	if w.flushInterval <= 0 {
		return w.Sync()
	}
	return nil
}

// LogSamples writes a batch of new samples to the log.
func (w *SegmentWAL) LogSamples(samples []RefSample) error {
	if err := w.encodeSamples(samples); err != nil {
		return err
	}

	if w.flushInterval <= 0 {
		return w.Sync()
	}
	return nil
}

// LogDeletes writes a batch of new deletes to the log.
func (w *SegmentWAL) LogDeletes(stones []Stone) error {
	if err := w.encodeDeletes(stones); err != nil {
		return err
	}

	if w.flushInterval <= 0 {
		return w.Sync()
	}
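
All three Log* methods follow the same encode-then-maybe-sync pattern. A minimal sketch of recording a deletion through the new entry point, assuming an already opened *SegmentWAL named w; the series reference and time range are invented for illustration:

```
// Sketch only: ref 42 and the [1000, 2000] range are arbitrary.
stones := []Stone{{ref: 42, intervals: intervals{{mint: 1000, maxt: 2000}}}}
if err := w.LogDeletes(stones); err != nil {
	return err // the deletion was not durably recorded
}
```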

@@ -369,6 +406,7 @@ func (w *SegmentWAL) entry(et WALEntryType, flag byte, buf []byte) error {

const (
	walSeriesSimple  = 1
	walSamplesSimple = 1
	walDeletesSimple = 1
)

var walBuffers = sync.Pool{}

@@ -445,6 +483,23 @@ func (w *SegmentWAL) encodeSamples(samples []RefSample) error {

	return w.entry(WALEntrySamples, walSamplesSimple, buf)
}

func (w *SegmentWAL) encodeDeletes(stones []Stone) error {
	b := make([]byte, 2*binary.MaxVarintLen64)
	eb := &encbuf{b: b}
	buf := getWALBuffer()
	for _, s := range stones {
		for _, itv := range s.intervals {
			eb.reset()
			eb.putUvarint32(s.ref)
			eb.putVarint64(itv.mint)
			eb.putVarint64(itv.maxt)
			buf = append(buf, eb.get()...)
		}
	}

	return w.entry(WALEntryDeletes, walDeletesSimple, buf)
}

// walReader decodes and emits write ahead log entries.
type walReader struct {
	logger log.Logger

@@ -454,9 +509,11 @@ type walReader struct {
	buf   []byte
	crc32 hash.Hash32

	err     error
	labels  []labels.Labels
	samples []RefSample
	curType WALEntryType
	curFlag byte
	curBuf  []byte

	err error
}

func newWALReader(w *SegmentWAL, l log.Logger) *walReader {

@@ -471,18 +528,41 @@ func newWALReader(w *SegmentWAL, l log.Logger) *walReader {
	}
}

// At returns the last decoded entry of labels or samples.
// The returned slices are only valid until the next call to Next(). Their elements
// have to be copied to preserve them.
func (r *walReader) At() ([]labels.Labels, []RefSample) {
	return r.labels, r.samples
}

// Err returns the last error the reader encountered.
func (r *walReader) Err() error {
	return r.err
}

func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesCB) error {
	for r.next() {
		et, flag, b := r.at()
		// In decoding below we never return a walCorruptionErr for now.
		// Those should generally be caught by entry decoding before.
		switch et {
		case WALEntrySeries:
			s, err := r.decodeSeries(flag, b)
			if err != nil {
				return err
			}
			seriesf(s)
		case WALEntrySamples:
			s, err := r.decodeSamples(flag, b)
			if err != nil {
				return err
			}
			samplesf(s)
		case WALEntryDeletes:
			s, err := r.decodeDeletes(flag, b)
			if err != nil {
				return err
			}
			deletesf(s)
		}
	}

	return r.Err()
}
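
With this change the reader pushes decoded batches to callbacks instead of exposing an iterator. A minimal sketch of replaying a WAL through Read, assuming r is the WALReader of an open SegmentWAL; the callback bodies are placeholders:

```
// Sketch only: each callback receives one decoded batch at a time.
err := r.Read(
	func(series []labels.Labels) error { return nil }, // re-create series
	func(samples []RefSample) error { return nil },    // re-append samples
	func(stones []Stone) error { return nil },         // re-apply deletions
)
if err != nil {
	return err // the WAL could not be fully replayed
}
```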

// nextEntry retrieves the next entry. It is also used as a testing hook.
func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) {
	if r.cur >= len(r.wal.files) {

@@ -505,12 +585,13 @@ func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) {
	return et, flag, b, err
}

// Next decodes the next entry pair and returns true
// if it was successful.
func (r *walReader) Next() bool {
	r.labels = r.labels[:0]
	r.samples = r.samples[:0]
func (r *walReader) at() (WALEntryType, byte, []byte) {
	return r.curType, r.curFlag, r.curBuf
}

// next decodes the next entry pair and returns true
// if it was successful.
func (r *walReader) next() bool {
	if r.cur >= len(r.wal.files) {
		return false
	}

@@ -537,7 +618,7 @@ func (r *walReader) Next() bool {
			return false
		}
		r.cur++
		return r.Next()
		return r.next()
	}
	if err != nil {
		r.err = err

@@ -548,19 +629,9 @@ func (r *walReader) Next() bool {
		return false
	}

	// In decoding below we never return a walCorruptionErr for now.
	// Those should generally be caught by entry decoding before.

	switch et {
	case WALEntrySamples:
		if err := r.decodeSamples(flag, b); err != nil {
			r.err = err
		}
	case WALEntrySeries:
		if err := r.decodeSeries(flag, b); err != nil {
			r.err = err
		}
	}
	r.curType = et
	r.curFlag = flag
	r.curBuf = b
	return r.err == nil
}

@@ -617,7 +688,7 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
	if etype == 0 {
		return 0, 0, nil, io.EOF
	}
	if etype != WALEntrySeries && etype != WALEntrySamples {
	if etype != WALEntrySeries && etype != WALEntrySamples && etype != WALEntryDeletes {
		return 0, 0, nil, walCorruptionErrf("invalid entry type %d", etype)
	}

@@ -644,11 +715,12 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
	return etype, flag, buf, nil
}

func (r *walReader) decodeSeries(flag byte, b []byte) error {
func (r *walReader) decodeSeries(flag byte, b []byte) ([]labels.Labels, error) {
	series := []labels.Labels{}
	for len(b) > 0 {
		l, n := binary.Uvarint(b)
		if n < 1 {
			return errors.Wrap(errInvalidSize, "number of labels")
			return nil, errors.Wrap(errInvalidSize, "number of labels")
		}
		b = b[n:]
		lset := make(labels.Labels, l)

@@ -656,27 +728,29 @@ func (r *walReader) decodeSeries(flag byte, b []byte) error {
		for i := 0; i < int(l); i++ {
			nl, n := binary.Uvarint(b)
			if n < 1 || len(b) < n+int(nl) {
				return errors.Wrap(errInvalidSize, "label name")
				return nil, errors.Wrap(errInvalidSize, "label name")
			}
			lset[i].Name = string(b[n : n+int(nl)])
			b = b[n+int(nl):]

			vl, n := binary.Uvarint(b)
			if n < 1 || len(b) < n+int(vl) {
				return errors.Wrap(errInvalidSize, "label value")
				return nil, errors.Wrap(errInvalidSize, "label value")
			}
			lset[i].Value = string(b[n : n+int(vl)])
			b = b[n+int(vl):]
		}

		r.labels = append(r.labels, lset)
		series = append(series, lset)
	}
	return nil
	return series, nil
}

func (r *walReader) decodeSamples(flag byte, b []byte) error {
func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) {
	samples := []RefSample{}

	if len(b) < 16 {
		return errors.Wrap(errInvalidSize, "header length")
		return nil, errors.Wrap(errInvalidSize, "header length")
	}
	var (
		baseRef = binary.BigEndian.Uint64(b)

@@ -689,7 +763,7 @@ func (r *walReader) decodeSamples(flag byte, b []byte) error {

		dref, n := binary.Varint(b)
		if n < 1 {
			return errors.Wrap(errInvalidSize, "sample ref delta")
			return nil, errors.Wrap(errInvalidSize, "sample ref delta")
		}
		b = b[n:]

@@ -697,18 +771,36 @@ func (r *walReader) decodeSamples(flag byte, b []byte) error {

		dtime, n := binary.Varint(b)
		if n < 1 {
			return errors.Wrap(errInvalidSize, "sample timestamp delta")
			return nil, errors.Wrap(errInvalidSize, "sample timestamp delta")
		}
		b = b[n:]
		smpl.T = baseTime + dtime

		if len(b) < 8 {
			return errors.Wrapf(errInvalidSize, "sample value bits %d", len(b))
			return nil, errors.Wrapf(errInvalidSize, "sample value bits %d", len(b))
		}
		smpl.V = float64(math.Float64frombits(binary.BigEndian.Uint64(b)))
		b = b[8:]

		r.samples = append(r.samples, smpl)
		samples = append(samples, smpl)
	}
	return nil
	return samples, nil
}

func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) {
	db := &decbuf{b: b}
	stones := []Stone{}

	for db.len() > 0 {
		var s Stone
		s.ref = db.uvarint32()
		s.intervals = intervals{{db.varint64(), db.varint64()}}
		if db.err() != nil {
			return nil, db.err()
		}

		stones = append(stones, s)
	}

	return stones, nil
}
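
Note the asymmetry with encodeDeletes: the encoder flattens each Stone into one record per interval, and the decoder returns one single-interval Stone per record without regrouping by reference. A minimal sketch of regrouping on the consumer side, should it be needed; the groupStones helper is hypothetical:

```
// Sketch only: collapse single-interval Stones back into per-ref intervals.
func groupStones(stones []Stone) map[uint32]intervals {
	m := make(map[uint32]intervals)
	for _, s := range stones {
		for _, itv := range s.intervals {
			m[s.ref] = m[s.ref].add(itv)
		}
	}
	return m
}
```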

86
wal_test.go

@@ -149,6 +149,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
	var (
		recordedSeries  [][]labels.Labels
		recordedSamples [][]RefSample
		recordedDeletes [][]Stone
	)
	var totalSamples int

@@ -166,32 +167,48 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
		var (
			resultSeries  [][]labels.Labels
			resultSamples [][]RefSample
			resultDeletes [][]Stone
		)

		for r.Next() {
			lsets, smpls := r.At()

		serf := func(lsets []labels.Labels) error {
			if len(lsets) > 0 {
				clsets := make([]labels.Labels, len(lsets))
				copy(clsets, lsets)
				resultSeries = append(resultSeries, clsets)
			}

			return nil
		}
		smplf := func(smpls []RefSample) error {
			if len(smpls) > 0 {
				csmpls := make([]RefSample, len(smpls))
				copy(csmpls, smpls)
				resultSamples = append(resultSamples, csmpls)
			}

			return nil
		}
		require.NoError(t, r.Err())

		delf := func(stones []Stone) error {
			if len(stones) > 0 {
				resultDeletes = append(resultDeletes, stones)
			}

			return nil
		}

		require.NoError(t, r.Read(serf, smplf, delf))

		require.Equal(t, recordedSamples, resultSamples)
		require.Equal(t, recordedSeries, resultSeries)
		require.Equal(t, recordedDeletes, resultDeletes)

		series := series[k : k+(numMetrics/iterations)]

		// Insert in batches and generate different amounts of samples for each.
		for i := 0; i < len(series); i += stepSize {
			var samples []RefSample
			var stones []Stone

			for j := 0; j < i*10; j++ {
				samples = append(samples, RefSample{

@@ -201,9 +218,16 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
				})
			}

			for j := 0; j < i*20; j++ {
				ts := rand.Int63()
				stones = append(stones, Stone{rand.Uint32(), intervals{{ts, ts + rand.Int63n(10000)}}})
			}

			lbls := series[i : i+stepSize]

			require.NoError(t, w.Log(lbls, samples))
			require.NoError(t, w.LogSeries(lbls))
			require.NoError(t, w.LogSamples(samples))
			require.NoError(t, w.LogDeletes(stones))

			if len(lbls) > 0 {
				recordedSeries = append(recordedSeries, lbls)

@@ -212,6 +236,9 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
				recordedSamples = append(recordedSamples, samples)
				totalSamples += len(samples)
			}
			if len(stones) > 0 {
				recordedDeletes = append(recordedDeletes, stones)
			}
		}

		require.NoError(t, w.Close())

@@ -292,13 +319,13 @@ func TestWALRestoreCorrupted(t *testing.T) {
		w, err := OpenSegmentWAL(dir, nil, 0)
		require.NoError(t, err)

		require.NoError(t, w.Log(nil, []RefSample{{T: 1, V: 2}}))
		require.NoError(t, w.Log(nil, []RefSample{{T: 2, V: 3}}))
		require.NoError(t, w.LogSamples([]RefSample{{T: 1, V: 2}}))
		require.NoError(t, w.LogSamples([]RefSample{{T: 2, V: 3}}))

		require.NoError(t, w.cut())

		require.NoError(t, w.Log(nil, []RefSample{{T: 3, V: 4}}))
		require.NoError(t, w.Log(nil, []RefSample{{T: 5, V: 6}}))
		require.NoError(t, w.LogSamples([]RefSample{{T: 3, V: 4}}))
		require.NoError(t, w.LogSamples([]RefSample{{T: 5, V: 6}}))

		require.NoError(t, w.Close())

@@ -314,17 +341,28 @@ func TestWALRestoreCorrupted(t *testing.T) {
		require.NoError(t, err)

		r := w2.Reader()
		serf := func(l []labels.Labels) error {
			require.Equal(t, 0, len(l))
			return nil
		}
		delf := func([]Stone) error { return nil }

		require.True(t, r.Next())
		l, s := r.At()
		require.Equal(t, 0, len(l))
		require.Equal(t, []RefSample{{T: 1, V: 2}}, s)
		// Weird hack to check order of reads.
		i := 0
		samplf := func(s []RefSample) error {
			if i == 0 {
				require.Equal(t, []RefSample{{T: 1, V: 2}}, s)
				i++
			} else {
				require.Equal(t, []RefSample{{T: 99, V: 100}}, s)
			}

		// Truncation should happen transparently and not cause an error.
		require.False(t, r.Next())
		require.Nil(t, r.Err())
			return nil
		}

		require.NoError(t, w2.Log(nil, []RefSample{{T: 99, V: 100}}))
		require.NoError(t, r.Read(serf, samplf, delf))

		require.NoError(t, w2.LogSamples([]RefSample{{T: 99, V: 100}}))
		require.NoError(t, w2.Close())

		// We should see the first valid entry and the new one, everything after

@@ -334,18 +372,8 @@ func TestWALRestoreCorrupted(t *testing.T) {

		r = w3.Reader()

		require.True(t, r.Next())
		l, s = r.At()
		require.Equal(t, 0, len(l))
		require.Equal(t, []RefSample{{T: 1, V: 2}}, s)

		require.True(t, r.Next())
		l, s = r.At()
		require.Equal(t, 0, len(l))
		require.Equal(t, []RefSample{{T: 99, V: 100}}, s)

		require.False(t, r.Next())
		require.Nil(t, r.Err())
		i = 0
		require.NoError(t, r.Read(serf, samplf, delf))
	})
}
}