Add tests for tombstones.

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
Goutham Veeramachaneni 2017-05-21 23:20:05 +05:30
parent d32eb25662
commit 0b70333ef6
5 changed files with 149 additions and 37 deletions


@@ -198,6 +198,7 @@ func newPersistedBlock(dir string) (*persistedBlock, error) {
         chunkr: cr,
         indexr: ir,
+        // TODO(gouthamve): We will be sorting the refs again internally, is it a big deal?
         tombstones: newMapTombstoneReader(ts),
     }
     return pb, nil
@@ -222,7 +223,7 @@ func (pb *persistedBlock) Querier(mint, maxt int64) Querier {
         maxt:   maxt,
         index:  pb.Index(),
         chunks: pb.Chunks(),
-        tombstones: pb.tombstones.Copy(),
+        tombstones: pb.Tombstones(),
     }
 }
@@ -269,7 +270,7 @@ Outer:
     }

     // Merge the current and new tombstones.
-    tr := pb.tombstones.Copy()
+    tr := pb.Tombstones()
     str := newSimpleTombstoneReader(vPostings, []trange{{mint, maxt}})
     tombreader := newMergedTombstoneReader(tr, str)


@@ -424,7 +424,7 @@ func (c *compactionSeriesSet) Next() bool {
         return false
     }

-    // Remove completely deleted chunks and re-encode partial ones.
+    // Remove completely deleted chunks.
     if len(c.dranges) > 0 {
         chks := make([]*ChunkMeta, 0, len(c.c))
         for _, chk := range c.c {

db.go

@@ -680,9 +680,9 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
     db.cmtx.Lock()
     defer db.cmtx.Unlock()

-    s.headmtx.RLock()
-    blocks := s.blocksForInterval(mint, maxt)
-    s.headmtx.RUnlock()
+    db.headmtx.RLock()
+    blocks := db.blocksForInterval(mint, maxt)
+    db.headmtx.RUnlock()

     var g errgroup.Group


@@ -102,15 +102,15 @@ type TombstoneReader interface {
     Next() bool
     Seek(ref uint32) bool
     At() stone
-    // A copy of the current instance. Changes to the copy will not affect parent.
+    // Copy copies the current reader state. Changes to the copy will not affect parent.
     Copy() TombstoneReader
     Err() error
 }

 type tombstoneReader struct {
     stones []byte

-    idx int
-    len int
+    cur stone

     b   []byte
     err error
@@ -135,11 +135,10 @@ func newTombStoneReader(dir string) (*tombstoneReader, error) {
     if err := d.err(); err != nil {
         return nil, err
     }

+    off += 4 // For the numStones which has been read.
     return &tombstoneReader{
-        stones: b[off+4:],
-        idx:    -1,
-        len:    int(numStones),
+        stones: b[off : off+int64(numStones*12)],
         b: b,
     }, nil
@@ -150,26 +149,11 @@ func (t *tombstoneReader) Next() bool {
         return false
     }

-    t.idx++
-
-    return t.idx < t.len
-}
-
-func (t *tombstoneReader) Seek(ref uint32) bool {
-    bytIdx := t.idx * 12
-
-    t.idx += sort.Search(t.len-t.idx, func(i int) bool {
-        return binary.BigEndian.Uint32(t.b[bytIdx+i*12:]) >= ref
-    })
-
-    return t.idx < t.len
-}
-
-func (t *tombstoneReader) At() stone {
-    bytIdx := t.idx * (4 + 8)
-    dat := t.stones[bytIdx : bytIdx+12]
-
-    d := &decbuf{b: dat}
+    if len(t.stones) < 12 {
+        return false
+    }
+
+    d := &decbuf{b: t.stones[:12]}
     ref := d.be32()
     off := d.be64int64()
@@ -177,7 +161,7 @@ func (t *tombstoneReader) At() stone {
     numRanges := d.varint64()
     if err := d.err(); err != nil {
         t.err = err
-        return stone{ref: ref}
+        return false
     }

     dranges := make([]trange, 0, numRanges)
@@ -186,20 +170,40 @@ func (t *tombstoneReader) At() stone {
         maxt := d.varint64()
         if err := d.err(); err != nil {
             t.err = err
-            return stone{ref: ref, ranges: dranges}
+            return false
         }

         dranges = append(dranges, trange{mint, maxt})
     }

-    return stone{ref: ref, ranges: dranges}
+    t.stones = t.stones[12:]
+    t.cur = stone{ref: ref, ranges: dranges}
+    return true
+}
+
+func (t *tombstoneReader) Seek(ref uint32) bool {
+    i := sort.Search(len(t.stones)/12, func(i int) bool {
+        x := binary.BigEndian.Uint32(t.stones[i*12:])
+        return x >= ref
+    })
+
+    if i*12 < len(t.stones) {
+        t.stones = t.stones[i*12:]
+        return t.Next()
+    }
+
+    t.stones = nil
+    return false
+}
+
+func (t *tombstoneReader) At() stone {
+    return t.cur
 }

 func (t *tombstoneReader) Copy() TombstoneReader {
     return &tombstoneReader{
         stones: t.stones[:],
-        idx: t.idx,
-        len: t.len,
+        cur: t.cur,
         b: t.b,
     }
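A note on the layout the new iterator assumes: each stone index entry appears to be a fixed 12 bytes, a 4-byte big-endian series reference followed by an 8-byte big-endian offset into the tombstone data, which is why Next decodes t.stones[:12] and Seek searches in 12-byte strides. The standalone sketch below only illustrates that inferred layout; decodeStoneEntry is a hypothetical helper for this page, not part of the tsdb package.

package main

import (
    "encoding/binary"
    "fmt"
)

// decodeStoneEntry is a hypothetical helper that decodes one 12-byte stone
// index entry as implied by the be32/be64int64 calls in the diff above:
// a big-endian uint32 series ref followed by a big-endian 8-byte offset.
func decodeStoneEntry(b []byte) (ref uint32, off int64, ok bool) {
    if len(b) < 12 {
        return 0, 0, false
    }
    ref = binary.BigEndian.Uint32(b[:4])
    off = int64(binary.BigEndian.Uint64(b[4:12]))
    return ref, off, true
}

func main() {
    // Build one fabricated entry: series ref 42 pointing at offset 1024.
    entry := make([]byte, 12)
    binary.BigEndian.PutUint32(entry[:4], 42)
    binary.BigEndian.PutUint64(entry[4:12], 1024)

    ref, off, ok := decodeStoneEntry(entry)
    fmt.Println(ref, off, ok) // 42 1024 true
}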
@@ -291,6 +295,7 @@ func newSimpleTombstoneReader(refs []uint32, drange []trange) *simpleTombstoneReader {
 func (t *simpleTombstoneReader) Next() bool {
     if len(t.refs) > 0 {
         t.cur = t.refs[0]
+        t.refs = t.refs[1:]
         return true
     }
@@ -437,7 +442,8 @@ func (tr trange) isSubrange(ranges []trange) bool {
 }

 // This adds the new time-range to the existing ones.
-// The existing ones must be sorted and should not be nil.
+// The existing ones must be sorted.
+// TODO(gouthamve): {1, 2}, {3, 4} can be merged into {1, 4}.
 func addNewInterval(existing []trange, n trange) []trange {
     for i, r := range existing {
         // TODO(gouthamve): Make this codepath easier to digest.
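The TODO added above observes that touching ranges such as {1, 2} and {3, 4} could be collapsed into {1, 4}. A rough, self-contained sketch of that merge-on-insert idea follows; the interval type and addAndMerge helper are illustrative stand-ins, not the package's trange or addNewInterval.

package main

import (
    "fmt"
    "sort"
)

// interval is a simplified stand-in for the package's trange type.
type interval struct{ mint, maxt int64 }

// addAndMerge inserts n into a sorted set of intervals and collapses any
// intervals that overlap or touch, so {1, 2} and {3, 4} become {1, 4}.
// Illustrative sketch only; not the tsdb implementation.
func addAndMerge(existing []interval, n interval) []interval {
    all := append(append([]interval{}, existing...), n)
    sort.Slice(all, func(i, j int) bool { return all[i].mint < all[j].mint })

    merged := []interval{all[0]}
    for _, iv := range all[1:] {
        last := &merged[len(merged)-1]
        if iv.mint <= last.maxt+1 {
            // Overlapping or adjacent: extend the previous interval.
            if iv.maxt > last.maxt {
                last.maxt = iv.maxt
            }
            continue
        }
        merged = append(merged, iv)
    }
    return merged
}

func main() {
    rs := []interval{{1, 2}, {6, 7}}
    fmt.Println(addAndMerge(rs, interval{3, 4})) // [{1 4} {6 7}]
}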


@@ -4,6 +4,7 @@ import (
     "io/ioutil"
     "math/rand"
     "os"
+    "sort"
     "testing"
     "time"
@@ -94,3 +95,107 @@ func TestAddingNewIntervals(t *testing.T) {
     }
     return
 }
+
+func TestTombstoneReadersSeek(t *testing.T) {
+    // This is assuming that the listPostings is perfect.
+    table := struct {
+        m map[uint32][]trange
+
+        cases []uint32
+    }{
+        m: map[uint32][]trange{
+            2:    []trange{{1, 2}},
+            3:    []trange{{1, 4}, {5, 6}},
+            4:    []trange{{10, 15}, {16, 20}},
+            5:    []trange{{1, 4}, {5, 6}},
+            50:   []trange{{10, 20}, {35, 50}},
+            600:  []trange{{100, 2000}},
+            1000: []trange{},
+            1500: []trange{{10000, 500000}},
+            1600: []trange{{1, 2}, {3, 4}, {4, 5}, {6, 7}},
+        },
+
+        cases: []uint32{1, 10, 20, 40, 30, 20, 50, 599, 601, 1000, 1600, 1601, 2000},
+    }
+
+    testFunc := func(t *testing.T, tr TombstoneReader) {
+        for _, ref := range table.cases {
+            // Create the listPostings.
+            refs := make([]uint32, 0, len(table.m))
+            for k := range table.m {
+                refs = append(refs, k)
+            }
+            sort.Sort(uint32slice(refs))
+            pr := newListPostings(refs)
+
+            // Compare both.
+            trc := tr.Copy()
+            require.Equal(t, pr.Seek(ref), trc.Seek(ref))
+            if pr.Seek(ref) {
+                require.Equal(t, pr.At(), trc.At().ref)
+                require.Equal(t, table.m[pr.At()], trc.At().ranges)
+            }
+
+            for pr.Next() {
+                require.True(t, trc.Next())
+                require.Equal(t, pr.At(), trc.At().ref)
+                require.Equal(t, table.m[pr.At()], trc.At().ranges)
+            }
+
+            require.False(t, trc.Next())
+            require.NoError(t, pr.Err())
+            require.NoError(t, tr.Err())
+        }
+    }
+
+    t.Run("tombstoneReader", func(t *testing.T) {
+        tmpdir, _ := ioutil.TempDir("", "test")
+        defer os.RemoveAll(tmpdir)
+
+        mtr := newMapTombstoneReader(table.m)
+        writeTombstoneFile(tmpdir, mtr)
+        tr, err := readTombstoneFile(tmpdir)
+        require.NoError(t, err)
+
+        testFunc(t, tr)
+        return
+    })
+    t.Run("mapTombstoneReader", func(t *testing.T) {
+        mtr := newMapTombstoneReader(table.m)
+
+        testFunc(t, mtr)
+        return
+    })
+    t.Run("simpleTombstoneReader", func(t *testing.T) {
+        ranges := []trange{{1, 2}, {3, 4}, {5, 6}}
+
+        for _, ref := range table.cases {
+            // Create the listPostings.
+            refs := make([]uint32, 0, len(table.m))
+            for k := range table.m {
+                refs = append(refs, k)
+            }
+            sort.Sort(uint32slice(refs))
+            pr := newListPostings(refs[:])
+            tr := newSimpleTombstoneReader(refs[:], ranges)
+
+            // Compare both.
+            trc := tr.Copy()
+            require.Equal(t, pr.Seek(ref), trc.Seek(ref))
+            if pr.Seek(ref) {
+                require.Equal(t, pr.At(), trc.At().ref)
+                require.Equal(t, ranges, tr.At().ranges)
+            }
+
+            for pr.Next() {
+                require.True(t, trc.Next())
+                require.Equal(t, pr.At(), trc.At().ref, "refs")
+                require.Equal(t, ranges, trc.At().ranges)
+            }
+
+            require.False(t, trc.Next())
+            require.NoError(t, pr.Err())
+            require.NoError(t, tr.Err())
+        }
+        return
+    })
+}
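The subtests above drive each TombstoneReader implementation in lockstep with a listPostings iterator built from the same sorted refs, so any divergence in Seek, Next, or At surfaces as a mismatch against the postings cursor. Assuming a standard Go toolchain, this test alone can be run from the package directory with go test -run TestTombstoneReadersSeek.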