Make TombstoneReader a Getter.

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>

Goutham Veeramachaneni, 2017-05-24 11:24:24 +05:30
commit f29fb62fba (parent 9bf7aa9af1)
GPG key ID: F1C217E8E9023CAD (no known key found for this signature in database)

9 changed files with 79 additions and 585 deletions
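In short, TombstoneReader stops being a stateful iterator and becomes a plain getter backed by a map keyed by series reference. The sketch below condenses the before/after from the tombstone-reader hunks further down; it is an excerpt for orientation, not a compilable unit on its own (stone and intervals are defined elsewhere in the package).

// Before: a stateful iterator that callers had to Seek, Copy and error-check.
type TombstoneReader interface {
    Next() bool
    Seek(ref uint32) bool
    At() stone
    Copy() TombstoneReader
    Err() error
}

// After: a single lookup, with the concrete reader being just a map from
// series reference to its deleted intervals.
type TombstoneReader interface {
    At(ref uint32) intervals
}

type tombstoneReader map[uint32]intervals

func (t tombstoneReader) At(ref uint32) intervals {
    return t[ref]
}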


@@ -162,7 +162,7 @@ type persistedBlock struct {
     indexr *indexReader
 
     // For tombstones.
-    tombstones *mapTombstoneReader
+    tombstones tombstoneReader
 }
@@ -180,25 +180,18 @@ func newPersistedBlock(dir string) (*persistedBlock, error) {
         return nil, err
     }
 
-    tr, err := readTombstoneFile(dir)
+    tr, err := readTombstones(dir)
     if err != nil {
         return nil, err
     }
 
-    ts := make(map[uint32]intervals)
-    for tr.Next() {
-        s := tr.At()
-        ts[s.ref] = s.intervals
-    }
-
     pb := &persistedBlock{
         dir: dir,
         meta: *meta,
         chunkr: cr,
         indexr: ir,
-        // TODO(gouthamve): We will be sorting the refs again internally, is it a big deal?
-        tombstones: newMapTombstoneReader(ts),
+        tombstones: tr,
     }
 
     return pb, nil
 }
@@ -230,7 +223,7 @@ func (pb *persistedBlock) Dir() string { return pb.dir }
 func (pb *persistedBlock) Index() IndexReader { return pb.indexr }
 func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr }
 func (pb *persistedBlock) Tombstones() TombstoneReader {
-    return pb.tombstones.Copy()
+    return pb.tombstones
 }
 
 func (pb *persistedBlock) Meta() BlockMeta { return pb.meta }
@@ -277,16 +270,18 @@ Outer:
     }
 
     // Merge the current and new tombstones.
-    tr := pb.Tombstones()
-    str := newMapTombstoneReader(delStones)
-    tombreader := newMergedTombstoneReader(tr, str)
+    for k, v := range pb.tombstones {
+        for _, itv := range v {
+            delStones[k] = delStones[k].add(itv)
+        }
+    }
+
+    tombreader := newTombstoneReader(delStones)
     if err := writeTombstoneFile(pb.dir, tombreader); err != nil {
         return err
     }
 
-    // TODO(gouthamve): This counts any common tombstones too. But gives the same heuristic.
-    pb.meta.NumTombstones += int64(len(delStones))
+    pb.meta.NumTombstones = int64(len(delStones))
     return writeMetaFile(pb.dir, &pb.meta)
 }
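With the merged and map-backed reader wrappers gone, the delete path above folds a block's existing tombstones straight into the new delete map before rewriting the tombstone file. Below is a minimal, self-contained illustration of that merge loop; the interval and intervals types and the add method are simplified stand-ins (assumed here to union a range into the list), not tsdb's implementation.

package main

import "fmt"

// Simplified stand-ins for tsdb's interval types; the real intervals.add
// also keeps the list sorted and coalesces chains of overlapping ranges.
type interval struct{ mint, maxt int64 }
type intervals []interval

// add unions one range into the list (naive version for illustration).
func (itvs intervals) add(n interval) intervals {
    out := make(intervals, 0, len(itvs)+1)
    for _, itv := range itvs {
        // Overlapping or adjacent: fold itv into n and keep going.
        if n.mint <= itv.maxt+1 && itv.mint <= n.maxt+1 {
            if itv.mint < n.mint {
                n.mint = itv.mint
            }
            if itv.maxt > n.maxt {
                n.maxt = itv.maxt
            }
            continue
        }
        out = append(out, itv)
    }
    return append(out, n)
}

func main() {
    // Tombstones already on the block and the newly requested deletions.
    existing := map[uint32]intervals{1: {{1, 3}}}
    delStones := map[uint32]intervals{1: {{2, 6}}, 2: {{10, 20}}}

    // Same shape as the merge loop in the hunk above.
    for k, v := range existing {
        for _, itv := range v {
            delStones[k] = delStones[k].add(itv)
        }
    }
    fmt.Println(delStones) // map[1:[{1 6}] 2:[{10 20}]]
}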


@@ -432,14 +432,8 @@ func (c *compactionSeriesSet) Next() bool {
         return false
     }
 
-    if c.tombstones.Seek(c.p.At()) {
-        s := c.tombstones.At()
-        if c.p.At() == s.ref {
-            c.intervals = s.intervals
-        } else {
-            c.intervals = nil
-        }
-    }
+    c.intervals = c.tombstones.At(c.p.At())
 
     c.l, c.c, c.err = c.index.Series(c.p.At())
     if c.err != nil {
         return false

head.go

@@ -69,7 +69,7 @@ type HeadBlock struct {
     values map[string]stringset // label names to possible values
     postings *memPostings // postings lists for terms
 
-    tombstones *mapTombstoneReader
+    tombstones tombstoneReader
 
     meta BlockMeta
 }
@@ -153,7 +153,7 @@ func (h *HeadBlock) init() error {
     deletesFunc := func(stones []stone) error {
         for _, s := range stones {
             for _, itv := range s.intervals {
-                h.tombstones.stones[s.ref] = h.tombstones.stones[s.ref].add(itv)
+                h.tombstones[s.ref] = h.tombstones[s.ref].add(itv)
             }
         }
@@ -163,7 +163,6 @@ func (h *HeadBlock) init() error {
     if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil {
         return errors.Wrap(err, "consume WAL")
     }
-    h.tombstones = newMapTombstoneReader(h.tombstones.stones)
 
     return nil
 }
@@ -221,7 +220,7 @@ func (h *HeadBlock) Meta() BlockMeta {
 
 // Tombstones returns the TombstoneReader against the block.
 func (h *HeadBlock) Tombstones() TombstoneReader {
-    return h.tombstones.Copy()
+    return h.tombstones
 }
 
 // Delete implements headBlock.
@@ -257,16 +256,15 @@ Outer:
     if p.Err() != nil {
         return p.Err()
     }
-    if err := h.wal.LogDeletes(newMapTombstoneReader(newStones)); err != nil {
+    if err := h.wal.LogDeletes(newTombstoneReader(newStones)); err != nil {
         return err
     }
 
     for k, v := range newStones {
-        h.tombstones.stones[k] = h.tombstones.stones[k].add(v[0])
+        h.tombstones[k] = h.tombstones[k].add(v[0])
     }
-    h.tombstones = newMapTombstoneReader(h.tombstones.stones)
 
-    h.meta.NumTombstones = int64(len(h.tombstones.stones))
+    h.meta.NumTombstones = int64(len(h.tombstones))
     return nil
 }
@@ -296,7 +294,7 @@ func (h *HeadBlock) Querier(mint, maxt int64) Querier {
         maxt: maxt,
         index: h.Index(),
         chunks: h.Chunks(),
-        tombstones: h.Tombstones().Copy(),
+        tombstones: h.Tombstones(),
 
         postingsMapper: func(p Postings) Postings {
             ep := make([]uint32, 0, 64)


@@ -151,7 +151,7 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
             index: q.index,
             absent: absent,
 
-            tombstones: q.tombstones.Copy(),
+            tombstones: q.tombstones,
         },
         chunks: q.chunks,
         mint: q.mint,
@@ -412,9 +412,9 @@ Outer:
         s.lset = lset
         s.chks = chunks
 
-        if s.tombstones.Seek(ref) && s.tombstones.At().ref == ref {
-            s.intervals = s.tombstones.At().intervals
+        s.intervals = s.tombstones.At(s.p.At())
 
+        if len(s.intervals) > 0 {
             // Only those chunks that are not entirely deleted.
             chks := make([]*ChunkMeta, 0, len(s.chks))
             for _, chk := range s.chks {


@@ -432,7 +432,7 @@ func TestBlockQuerierDelete(t *testing.T) {
             chunks [][]sample
         }
 
-        tombstones *mapTombstoneReader
+        tombstones tombstoneReader
         queries []query
     }{
         data: []struct {
@@ -480,7 +480,7 @@ func TestBlockQuerierDelete(t *testing.T) {
                 },
             },
         },
-        tombstones: newMapTombstoneReader(
+        tombstones: newTombstoneReader(
             map[uint32]intervals{
                 1: intervals{{1, 3}},
                 2: intervals{{1, 3}, {6, 10}},
@@ -553,7 +553,7 @@ Outer:
         querier := &blockQuerier{
             index: ir,
             chunks: cr,
-            tombstones: cases.tombstones.Copy(),
+            tombstones: cases.tombstones,
 
             mint: c.mint,
             maxt: c.maxt,


@@ -7,7 +7,6 @@ import (
     "io/ioutil"
     "os"
     "path/filepath"
-    "sort"
 )
 
 const tombstoneFilename = "tombstones"
@@ -19,11 +18,7 @@
     tombstoneFormatV1 = 1
 )
 
-func readTombstoneFile(dir string) (TombstoneReader, error) {
-    return newTombStoneReader(dir)
-}
-
-func writeTombstoneFile(dir string, tr TombstoneReader) error {
+func writeTombstoneFile(dir string, tr tombstoneReader) error {
     path := filepath.Join(dir, tombstoneFilename)
     tmp := path + ".tmp"
     hash := crc32.New(crc32.MakeTable(crc32.Castagnoli))
@@ -48,15 +43,13 @@ func writeTombstoneFile(dir string, tr TombstoneReader) error {
     }
     pos += int64(n)
 
-    for tr.Next() {
-        s := tr.At()
+    for k, v := range tr {
+        refs = append(refs, k)
+        stoneOff[k] = pos
 
-        refs = append(refs, s.ref)
-        stoneOff[s.ref] = pos
         // Write the ranges.
         buf.reset()
-        buf.putUvarint(len(s.intervals))
+        buf.putUvarint(len(v))
         n, err := f.Write(buf.get())
         if err != nil {
             return err
@@ -64,7 +57,7 @@ func writeTombstoneFile(dir string, tr TombstoneReader) error {
         pos += int64(n)
 
         buf.reset()
-        for _, r := range s.intervals {
+        for _, r := range v {
             buf.putVarint64(r.mint)
             buf.putVarint64(r.maxt)
         }
@@ -76,9 +69,6 @@ func writeTombstoneFile(dir string, tr TombstoneReader) error {
         }
         pos += int64(n)
     }
-    if err := tr.Err(); err != nil {
-        return err
-    }
 
     // Write the offset table.
     // Pad first.
@@ -130,25 +120,10 @@ type stone struct {
 
 // TombstoneReader is the iterator over tombstones.
 type TombstoneReader interface {
-    Next() bool
-    Seek(ref uint32) bool
-    At() stone
-    // Copy copies the current reader state. Changes to the copy will not affect parent.
-    Copy() TombstoneReader
-    Err() error
+    At(ref uint32) intervals
 }
 
-type tombstoneReader struct {
-    stones []byte
-
-    cur stone
-
-    b []byte
-    err error
-}
-
-func newTombStoneReader(dir string) (*tombstoneReader, error) {
-    // TODO(gouthamve): MMAP?
+func readTombstones(dir string) (tombstoneReader, error) {
     b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename))
     if err != nil {
         return nil, err
@@ -173,292 +148,50 @@ func newTombStoneReader(dir string) (*tombstoneReader, error) {
     }
     off += 4 // For the numStones which has been read.
 
-    return &tombstoneReader{
-        stones: b[off : off+int64(numStones*12)],
-
-        b: b,
-    }, nil
-}
-
-func (t *tombstoneReader) Next() bool {
-    if t.err != nil {
-        return false
-    }
-
-    if len(t.stones) < 12 {
-        return false
-    }
-
-    d := &decbuf{b: t.stones[:12]}
-    ref := d.be32()
-    off := d.be64int64()
-
-    d = &decbuf{b: t.b[off:]}
-    numRanges := d.uvarint()
-    if err := d.err(); err != nil {
-        t.err = err
-        return false
-    }
-
-    dranges := make(intervals, 0, numRanges)
-    for i := 0; i < int(numRanges); i++ {
-        mint := d.varint64()
-        maxt := d.varint64()
-        if err := d.err(); err != nil {
-            t.err = err
-            return false
-        }
-
-        dranges = append(dranges, interval{mint, maxt})
-    }
-
-    // TODO(gouthamve): Verify checksum.
-    t.stones = t.stones[12:]
-    t.cur = stone{ref: ref, intervals: dranges}
-    return true
-}
-
-func (t *tombstoneReader) Seek(ref uint32) bool {
-    i := sort.Search(len(t.stones)/12, func(i int) bool {
-        x := binary.BigEndian.Uint32(t.stones[i*12:])
-        return x >= ref
-    })
-    if i*12 < len(t.stones) {
-        t.stones = t.stones[i*12:]
-        return t.Next()
-    }
-
-    t.stones = nil
-    return false
-}
-
-func (t *tombstoneReader) At() stone {
-    return t.cur
-}
-
-func (t *tombstoneReader) Copy() TombstoneReader {
-    return &tombstoneReader{
-        stones: t.stones[:],
-        cur: t.cur,
-
-        b: t.b,
-    }
-}
-
-func (t *tombstoneReader) Err() error {
-    return t.err
-}
-
-type mapTombstoneReader struct {
-    refs []uint32
-    cur uint32
-
-    stones map[uint32]intervals
-}
-
-func newMapTombstoneReader(ts map[uint32]intervals) *mapTombstoneReader {
-    refs := make([]uint32, 0, len(ts))
-    for k := range ts {
-        refs = append(refs, k)
-    }
-    sort.Sort(uint32slice(refs))
-    return &mapTombstoneReader{stones: ts, refs: refs}
-}
-
-func newEmptyTombstoneReader() *mapTombstoneReader {
-    return &mapTombstoneReader{stones: make(map[uint32]intervals)}
-}
-
-func (t *mapTombstoneReader) Next() bool {
-    if len(t.refs) > 0 {
-        t.cur = t.refs[0]
-        t.refs = t.refs[1:]
-        return true
-    }
-
-    t.cur = 0
-    return false
-}
-
-func (t *mapTombstoneReader) Seek(ref uint32) bool {
-    // If the current value satisfies, then return.
-    if t.cur >= ref {
-        return true
-    }
-
-    // Do binary search between current position and end.
-    i := sort.Search(len(t.refs), func(i int) bool {
-        return t.refs[i] >= ref
-    })
-    if i < len(t.refs) {
-        t.cur = t.refs[i]
-        t.refs = t.refs[i+1:]
-        return true
-    }
-
-    t.refs = nil
-    return false
-}
-
-func (t *mapTombstoneReader) At() stone {
-    return stone{ref: t.cur, intervals: t.stones[t.cur]}
-}
-
-func (t *mapTombstoneReader) Copy() TombstoneReader {
-    return &mapTombstoneReader{
-        refs: t.refs[:],
-        cur: t.cur,
-
-        stones: t.stones,
-    }
-}
-
-func (t *mapTombstoneReader) Err() error {
-    return nil
-}
-
-type simpleTombstoneReader struct {
-    refs []uint32
-    cur uint32
-
-    intervals intervals
-}
-
-func newSimpleTombstoneReader(refs []uint32, dranges intervals) *simpleTombstoneReader {
-    return &simpleTombstoneReader{refs: refs, intervals: dranges}
-}
-
-func (t *simpleTombstoneReader) Next() bool {
-    if len(t.refs) > 0 {
-        t.cur = t.refs[0]
-        t.refs = t.refs[1:]
-        return true
-    }
-
-    t.cur = 0
-    return false
-}
-
-func (t *simpleTombstoneReader) Seek(ref uint32) bool {
-    // If the current value satisfies, then return.
-    if t.cur >= ref {
-        return true
-    }
-
-    // Do binary search between current position and end.
-    i := sort.Search(len(t.refs), func(i int) bool {
-        return t.refs[i] >= ref
-    })
-    if i < len(t.refs) {
-        t.cur = t.refs[i]
-        t.refs = t.refs[i+1:]
-        return true
-    }
-
-    t.refs = nil
-    return false
-}
-
-func (t *simpleTombstoneReader) At() stone {
-    return stone{ref: t.cur, intervals: t.intervals}
-}
-
-func (t *simpleTombstoneReader) Copy() TombstoneReader {
-    return &simpleTombstoneReader{refs: t.refs[:], cur: t.cur, intervals: t.intervals}
-}
-
-func (t *simpleTombstoneReader) Err() error {
-    return nil
-}
-
-type mergedTombstoneReader struct {
-    a, b TombstoneReader
-    cur stone
-
-    initialized bool
-    aok, bok bool
-}
-
-func newMergedTombstoneReader(a, b TombstoneReader) *mergedTombstoneReader {
-    return &mergedTombstoneReader{a: a, b: b}
-}
-
-func (t *mergedTombstoneReader) Next() bool {
-    if !t.initialized {
-        t.aok = t.a.Next()
-        t.bok = t.b.Next()
-        t.initialized = true
-    }
-
-    if !t.aok && !t.bok {
-        return false
-    }
-
-    if !t.aok {
-        t.cur = t.b.At()
-        t.bok = t.b.Next()
-        return true
-    }
-
-    if !t.bok {
-        t.cur = t.a.At()
-        t.aok = t.a.Next()
-        return true
-    }
-
-    acur, bcur := t.a.At(), t.b.At()
-
-    if acur.ref < bcur.ref {
-        t.cur = acur
-        t.aok = t.a.Next()
-    } else if acur.ref > bcur.ref {
-        t.cur = bcur
-        t.bok = t.b.Next()
-    } else {
-        // Merge time ranges.
-        for _, r := range bcur.intervals {
-            acur.intervals = acur.intervals.add(r)
-        }
-
-        t.cur = acur
-        t.aok = t.a.Next()
-        t.bok = t.b.Next()
-    }
-
-    return true
-}
-
-func (t *mergedTombstoneReader) Seek(ref uint32) bool {
-    if t.cur.ref >= ref {
-        return true
-    }
-
-    t.aok = t.a.Seek(ref)
-    t.bok = t.b.Seek(ref)
-    t.initialized = true
-
-    return t.Next()
-}
-
-func (t *mergedTombstoneReader) At() stone {
-    return t.cur
-}
-
-func (t *mergedTombstoneReader) Copy() TombstoneReader {
-    return &mergedTombstoneReader{
-        a: t.a.Copy(),
-        b: t.b.Copy(),
-
-        cur: t.cur,
-
-        initialized: t.initialized,
-        aok: t.aok,
-        bok: t.bok,
-    }
-}
-
-func (t *mergedTombstoneReader) Err() error {
-    if t.a.Err() != nil {
-        return t.a.Err()
-    }
-    return t.b.Err()
-}
+    stones := b[off : off+int64(numStones*12)]
+
+    stonesMap := make(map[uint32]intervals)
+    for len(stones) >= 12 {
+        d := &decbuf{b: stones[:12]}
+        ref := d.be32()
+        off := d.be64int64()
+
+        d = &decbuf{b: b[off:]}
+        numRanges := d.uvarint()
+        if err := d.err(); err != nil {
+            return nil, err
+        }
+
+        dranges := make(intervals, 0, numRanges)
+        for i := 0; i < int(numRanges); i++ {
+            mint := d.varint64()
+            maxt := d.varint64()
+            if err := d.err(); err != nil {
+                return nil, err
+            }
+
+            dranges = append(dranges, interval{mint, maxt})
+        }
+
+        // TODO(gouthamve): Verify checksum.
+        stones = stones[12:]
+        stonesMap[ref] = dranges
+    }
+
+    return newTombstoneReader(stonesMap), nil
+}
+
+type tombstoneReader map[uint32]intervals
+
+func newTombstoneReader(ts map[uint32]intervals) tombstoneReader {
+    return tombstoneReader(ts)
+}
+
+func newEmptyTombstoneReader() tombstoneReader {
+    return tombstoneReader(make(map[uint32]intervals))
+}
+
+func (t tombstoneReader) At(ref uint32) intervals {
+    return t[ref]
+}
 
 type interval struct {
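Because the reader is now just a map, a lookup for an unknown series reference returns a nil slice, which is why callers such as compactionSeriesSet.Next earlier in this commit can drop the old Seek/ref-comparison logic and only check the length of what At returns. A small self-contained sketch, again with interval/intervals as simplified stand-ins:

package main

import "fmt"

// Simplified stand-ins for tsdb's interval types.
type interval struct{ mint, maxt int64 }
type intervals []interval

// Same shape as the new reader defined in the hunk above.
type tombstoneReader map[uint32]intervals

func (t tombstoneReader) At(ref uint32) intervals { return t[ref] }

func main() {
    tr := tombstoneReader{42: {{1, 3}, {6, 10}}}

    // Known series ref: its deleted ranges come back.
    fmt.Println(tr.At(42)) // [{1 3} {6 10}]

    // Unknown ref: a nil slice, so callers only need a len() check
    // instead of the old Seek/At/ref comparison.
    fmt.Println(len(tr.At(7))) // 0
}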


@@ -4,7 +4,6 @@ import (
     "io/ioutil"
     "math/rand"
     "os"
-    "sort"
     "testing"
     "time"
@@ -31,20 +30,13 @@ func TestWriteAndReadbackTombStones(t *testing.T) {
         stones[ref] = dranges
     }
 
-    require.NoError(t, writeTombstoneFile(tmpdir, newMapTombstoneReader(stones)))
+    require.NoError(t, writeTombstoneFile(tmpdir, newTombstoneReader(stones)))
 
-    restr, err := readTombstoneFile(tmpdir)
+    restr, err := readTombstones(tmpdir)
     require.NoError(t, err)
-    exptr := newMapTombstoneReader(stones)
+    exptr := newTombstoneReader(stones)
+
     // Compare the two readers.
-    for restr.Next() {
-        require.True(t, exptr.Next())
-
-        require.Equal(t, exptr.At(), restr.At())
-    }
-    require.False(t, exptr.Next())
-
-    require.NoError(t, restr.Err())
-    require.NoError(t, exptr.Err())
+    require.Equal(t, restr, exptr)
 }
 
 func TestAddingNewIntervals(t *testing.T) {
@@ -116,218 +108,3 @@ func TestAddingNewIntervals(t *testing.T) {
     }
     return
 }
-
-func TestTombstoneReadersSeek(t *testing.T) {
-    // This is assuming that the listPostings is perfect.
-    table := struct {
-        m map[uint32]intervals
-
-        cases []uint32
-    }{
-        m: map[uint32]intervals{
-            2: intervals{{1, 2}},
-            3: intervals{{1, 4}, {5, 6}},
-            4: intervals{{10, 15}, {16, 20}},
-            5: intervals{{1, 4}, {5, 6}},
-            50: intervals{{10, 20}, {35, 50}},
-            600: intervals{{100, 2000}},
-            1000: intervals{},
-            1500: intervals{{10000, 500000}},
-            1600: intervals{{1, 2}, {3, 4}, {4, 5}, {6, 7}},
-        },
-
-        cases: []uint32{1, 10, 20, 40, 30, 20, 50, 599, 601, 1000, 1600, 1601, 2000},
-    }
-
-    testFunc := func(t *testing.T, tr TombstoneReader) {
-        for _, ref := range table.cases {
-            // Create the listPostings.
-            refs := make([]uint32, 0, len(table.m))
-            for k := range table.m {
-                refs = append(refs, k)
-            }
-            sort.Sort(uint32slice(refs))
-            pr := newListPostings(refs)
-
-            // Compare both.
-            trc := tr.Copy()
-            require.Equal(t, pr.Seek(ref), trc.Seek(ref))
-            if pr.Seek(ref) {
-                require.Equal(t, pr.At(), trc.At().ref)
-                require.Equal(t, table.m[pr.At()], trc.At().intervals)
-            }
-
-            for pr.Next() {
-                require.True(t, trc.Next())
-                require.Equal(t, pr.At(), trc.At().ref)
-                require.Equal(t, table.m[pr.At()], trc.At().intervals)
-            }
-
-            require.False(t, trc.Next())
-            require.NoError(t, pr.Err())
-            require.NoError(t, tr.Err())
-        }
-    }
-
-    t.Run("tombstoneReader", func(t *testing.T) {
-        tmpdir, _ := ioutil.TempDir("", "test")
-        defer os.RemoveAll(tmpdir)
-
-        mtr := newMapTombstoneReader(table.m)
-        writeTombstoneFile(tmpdir, mtr)
-        tr, err := readTombstoneFile(tmpdir)
-        require.NoError(t, err)
-
-        testFunc(t, tr)
-        return
-    })
-    t.Run("mapTombstoneReader", func(t *testing.T) {
-        mtr := newMapTombstoneReader(table.m)
-
-        testFunc(t, mtr)
-        return
-    })
-    t.Run("simpleTombstoneReader", func(t *testing.T) {
-        dranges := intervals{{1, 2}, {3, 4}, {5, 6}}
-
-        for _, ref := range table.cases {
-            // Create the listPostings.
-            refs := make([]uint32, 0, len(table.m))
-            for k := range table.m {
-                refs = append(refs, k)
-            }
-            sort.Sort(uint32slice(refs))
-            pr := newListPostings(refs[:])
-            tr := newSimpleTombstoneReader(refs[:], dranges)
-
-            // Compare both.
-            trc := tr.Copy()
-            require.Equal(t, pr.Seek(ref), trc.Seek(ref))
-            if pr.Seek(ref) {
-                require.Equal(t, pr.At(), trc.At().ref)
-                require.Equal(t, dranges, tr.At().intervals)
-            }
-
-            for pr.Next() {
-                require.True(t, trc.Next())
-                require.Equal(t, pr.At(), trc.At().ref, "refs")
-                require.Equal(t, dranges, trc.At().intervals)
-            }
-
-            require.False(t, trc.Next())
-            require.NoError(t, pr.Err())
-            require.NoError(t, tr.Err())
-        }
-        return
-    })
-}
-
-func TestMergedTombstoneReader(t *testing.T) {
-    cases := []struct {
-        a, b TombstoneReader
-
-        exp TombstoneReader
-    }{
-        {
-            a: newMapTombstoneReader(
-                map[uint32]intervals{
-                    2: intervals{{1, 2}},
-                    3: intervals{{1, 4}, {6, 6}},
-                    4: intervals{{10, 15}, {16, 20}},
-                    5: intervals{{1, 4}, {5, 6}},
-                    50: intervals{{10, 20}, {35, 50}},
-                    600: intervals{{100, 2000}},
-                    1000: intervals{},
-                    1500: intervals{{10000, 500000}},
-                    1600: intervals{{1, 2}, {3, 4}, {4, 5}, {6, 7}},
-                },
-            ),
-            b: newMapTombstoneReader(
-                map[uint32]intervals{
-                    2: intervals{{1, 2}},
-                    3: intervals{{5, 6}},
-                    4: intervals{{10, 15}, {16, 20}},
-                    5: intervals{{1, 4}, {5, 6}},
-                    50: intervals{{10, 20}, {35, 50}},
-                    600: intervals{{100, 2000}},
-                    1000: intervals{},
-                    1500: intervals{{10000, 500000}},
-                    1600: intervals{{1, 2}, {3, 4}, {4, 5}, {6, 7}},
-                },
-            ),
-            exp: newMapTombstoneReader(
-                map[uint32]intervals{
-                    2: intervals{{1, 2}},
-                    3: intervals{{1, 6}},
-                    4: intervals{{10, 20}},
-                    5: intervals{{1, 6}},
-                    50: intervals{{10, 20}, {35, 50}},
-                    600: intervals{{100, 2000}},
-                    1000: intervals{},
-                    1500: intervals{{10000, 500000}},
-                    1600: intervals{{1, 7}},
-                },
-            ),
-        },
-        {
-            a: newMapTombstoneReader(
-                map[uint32]intervals{
-                    2: intervals{{1, 2}},
-                    3: intervals{{1, 4}, {6, 6}},
-                    4: intervals{{10, 15}, {17, 20}},
-                    5: intervals{{1, 6}},
-                    50: intervals{{10, 20}, {35, 50}},
-                    600: intervals{{100, 2000}},
-                    1000: intervals{},
-                    1500: intervals{{10000, 500000}},
-                    1600: intervals{{1, 2}, {3, 4}, {4, 5}, {6, 7}},
-                },
-            ),
-            b: newMapTombstoneReader(
-                map[uint32]intervals{
-                    20: intervals{{1, 2}},
-                    30: intervals{{1, 4}, {5, 6}},
-                    40: intervals{{10, 15}, {16, 20}},
-                    60: intervals{{1, 4}, {5, 6}},
-                    500: intervals{{10, 20}, {35, 50}},
-                    6000: intervals{{100, 2000}},
-                    10000: intervals{},
-                    15000: intervals{{10000, 500000}},
-                    1600: intervals{{1, 2}, {3, 4}, {4, 5}, {6, 7}},
-                },
-            ),
-            exp: newMapTombstoneReader(
-                map[uint32]intervals{
-                    2: intervals{{1, 2}},
-                    3: intervals{{1, 4}, {6, 6}},
-                    4: intervals{{10, 15}, {17, 20}},
-                    5: intervals{{1, 6}},
-                    50: intervals{{10, 20}, {35, 50}},
-                    600: intervals{{100, 2000}},
-                    1000: intervals{},
-                    1500: intervals{{10000, 500000}},
-                    20: intervals{{1, 2}},
-                    30: intervals{{1, 4}, {5, 6}},
-                    40: intervals{{10, 15}, {16, 20}},
-                    60: intervals{{1, 4}, {5, 6}},
-                    500: intervals{{10, 20}, {35, 50}},
-                    6000: intervals{{100, 2000}},
-                    10000: intervals{},
-                    15000: intervals{{10000, 500000}},
-                    1600: intervals{{1, 7}},
-                },
-            ),
-        },
-    }
-
-    for _, c := range cases {
-        res := newMergedTombstoneReader(c.a, c.b)
-
-        for c.exp.Next() {
-            require.True(t, res.Next())
-            require.Equal(t, c.exp.At(), res.At())
-        }
-        require.False(t, res.Next())
-    }
-    return
-}

wal.go

@@ -83,7 +83,7 @@ type WAL interface {
     Reader() WALReader
     LogSeries([]labels.Labels) error
     LogSamples([]RefSample) error
-    LogDeletes(TombstoneReader) error
+    LogDeletes(tombstoneReader) error
     Close() error
 }
@@ -180,7 +180,7 @@ func (w *SegmentWAL) LogSamples(samples []RefSample) error {
 }
 
 // LogDeletes write a batch of new deletes to the log.
-func (w *SegmentWAL) LogDeletes(tr TombstoneReader) error {
+func (w *SegmentWAL) LogDeletes(tr tombstoneReader) error {
     if err := w.encodeDeletes(tr); err != nil {
         return err
     }
@@ -483,17 +483,16 @@ func (w *SegmentWAL) encodeSamples(samples []RefSample) error {
     return w.entry(WALEntrySamples, walSamplesSimple, buf)
 }
 
-func (w *SegmentWAL) encodeDeletes(tr TombstoneReader) error {
+func (w *SegmentWAL) encodeDeletes(tr tombstoneReader) error {
     b := make([]byte, 2*binary.MaxVarintLen64)
     eb := &encbuf{b: b}
     buf := getWALBuffer()
-    for tr.Next() {
+    for k, v := range tr {
         eb.reset()
-        s := tr.At()
-        eb.putUvarint32(s.ref)
-        eb.putUvarint(len(s.intervals))
+        eb.putUvarint32(k)
+        eb.putUvarint(len(v))
         buf = append(buf, eb.get()...)
-        for _, itv := range s.intervals {
+        for _, itv := range v {
             eb.reset()
             eb.putVarint64(itv.mint)
             eb.putVarint64(itv.maxt)
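encodeDeletes now ranges over the map directly, but the per-series payload it builds keeps the same shape: the series reference as a uvarint, the number of ranges, and then varint-encoded mint/maxt pairs. Below is a rough standalone sketch of that payload using encoding/binary; the real code uses tsdb's encbuf helpers and wraps the bytes in a WAL entry (framing and checksumming omitted here), and interval is a simplified stand-in.

package main

import (
    "encoding/binary"
    "fmt"
)

type interval struct{ mint, maxt int64 }

// Rough sketch of the per-series payload built by encodeDeletes above:
// series ref as a uvarint, the number of ranges, then varint-encoded
// mint/maxt pairs.
func encodeSeriesDeletes(ref uint32, itvs []interval) []byte {
    tmp := make([]byte, binary.MaxVarintLen64)
    var buf []byte

    buf = append(buf, tmp[:binary.PutUvarint(tmp, uint64(ref))]...)
    buf = append(buf, tmp[:binary.PutUvarint(tmp, uint64(len(itvs)))]...)
    for _, itv := range itvs {
        buf = append(buf, tmp[:binary.PutVarint(tmp, itv.mint)]...)
        buf = append(buf, tmp[:binary.PutVarint(tmp, itv.maxt)]...)
    }
    return buf
}

func main() {
    // One series (ref 9) with two deleted ranges.
    fmt.Println(encodeSeriesDeletes(9, []interval{{1, 3}, {6, 10}}))
}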


@@ -189,7 +189,6 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
             return nil
         }
 
-        // TODO: Add this.
         delf := func(stones []stone) error {
             if len(stones) > 0 {
                 cstones := make([]stone, len(stones))
@@ -230,7 +229,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
         require.NoError(t, w.LogSeries(lbls))
         require.NoError(t, w.LogSamples(samples))
-        require.NoError(t, w.LogDeletes(newMapTombstoneReader(stones)))
+        require.NoError(t, w.LogDeletes(newTombstoneReader(stones)))
 
         if len(lbls) > 0 {
             recordedSeries = append(recordedSeries, lbls)
@@ -240,12 +239,11 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
             totalSamples += len(samples)
         }
 
         if len(stones) > 0 {
-            tr := newMapTombstoneReader(stones)
+            tr := newTombstoneReader(stones)
             newdels := []stone{}
-            for tr.Next() {
-                newdels = append(newdels, tr.At())
+            for k, v := range tr {
+                newdels = append(newdels, stone{k, v})
             }
-            require.NoError(t, tr.Err())
 
             recordedDeletes = append(recordedDeletes, newdels)
         }