Add Tombstones() method to Block.

Also add Seek() to TombstoneReader

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
This commit is contained in:
Goutham Veeramachaneni 2017-05-16 19:48:28 +05:30
parent 3de55171d3
commit cea3c88f17
No known key found for this signature in database
GPG key ID: F1C217E8E9023CAD
3 changed files with 99 additions and 16 deletions

View file

@ -41,6 +41,9 @@ type DiskBlock interface {
// Chunks returns a ChunkReader over the block's data.
Chunks() ChunkReader
// Tombstones returns a TombstoneReader over the block's deleted data.
Tombstones() TombstoneReader
// Delete deletes data from the block.
Delete(mint, maxt int64, ms ...labels.Matcher) error
@ -241,6 +244,10 @@ type persistedBlock struct {
chunkr *chunkReader
indexr *indexReader
// For tombstones.
stones []uint32
tombstones map[uint32][]trange
}
func newPersistedBlock(dir string) (*persistedBlock, error) {
@ -258,11 +265,23 @@ func newPersistedBlock(dir string) (*persistedBlock, error) {
return nil, err
}
tr, err := readTombstoneFile(dir)
if err != nil {
return nil, err
}
ts := make(map[uint32][]trange)
for tr.Next() {
s := tr.At()
ts[s.ref] = s.ranges
}
pb := &persistedBlock{
dir: dir,
meta: *meta,
chunkr: cr,
indexr: ir,
tombstones: ts,
}
return pb, nil
}
@ -292,7 +311,10 @@ func (pb *persistedBlock) Querier(mint, maxt int64) Querier {
func (pb *persistedBlock) Dir() string { return pb.dir }
func (pb *persistedBlock) Index() IndexReader { return pb.indexr }
func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr }
func (pb *persistedBlock) Meta() BlockMeta { return pb.meta }
func (pb *persistedBlock) Tombstones() TombstoneReader {
return newMapTombstoneReader(pb.tombstones)
}
func (pb *persistedBlock) Meta() BlockMeta { return pb.meta }
func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error {
pr := newPostingsReader(pb.indexr)
@ -348,6 +370,7 @@ type stone struct {
// TombstoneReader is the iterator over tombstones.
type TombstoneReader interface {
Next() bool
Seek(ref uint32) bool
At() stone
Err() error
}
@ -402,6 +425,16 @@ func (t *tombstoneReader) Next() bool {
return t.idx < t.len
}
func (t *tombstoneReader) Seek(ref uint32) bool {
bytIdx := t.idx * 12
t.idx += sort.Search(t.len-t.idx, func(i int) bool {
return binary.BigEndian.Uint32(t.b[bytIdx+i*12:]) >= ref
})
return t.idx < t.len
}
func (t *tombstoneReader) At() stone {
bytIdx := t.idx * (4 + 8)
dat := t.stones[bytIdx : bytIdx+12]
@ -443,6 +476,7 @@ type mapTombstoneReader struct {
stones map[uint32][]trange
}
// TODO(gouthamve): Take pre-sorted refs.
func newMapTombstoneReader(ts map[uint32][]trange) *mapTombstoneReader {
refs := make([]uint32, 0, len(ts))
for k := range ts {
@ -463,6 +497,25 @@ func (t *mapTombstoneReader) Next() bool {
return false
}
func (t *mapTombstoneReader) Seek(ref uint32) bool {
// If the current value satisfies, then return.
if t.cur >= ref {
return true
}
// Do binary search between current position and end.
i := sort.Search(len(t.refs), func(i int) bool {
return t.refs[i] >= ref
})
if i < len(t.refs) {
t.cur = t.refs[i]
t.refs = t.refs[i+1:]
return true
}
t.refs = nil
return false
}
func (t *mapTombstoneReader) At() stone {
return stone{ref: t.cur, ranges: t.stones[t.cur]}
}
@ -492,6 +545,25 @@ func (t *simpleTombstoneReader) Next() bool {
return false
}
func (t *simpleTombstoneReader) Seek(ref uint32) bool {
// If the current value satisfies, then return.
if t.cur >= ref {
return true
}
// Do binary search between current position and end.
i := sort.Search(len(t.refs), func(i int) bool {
return t.refs[i] >= ref
})
if i < len(t.refs) {
t.cur = t.refs[i]
t.refs = t.refs[i+1:]
return true
}
t.refs = nil
return false
}
func (t *simpleTombstoneReader) At() stone {
return stone{ref: t.cur, ranges: t.ranges}
}
@ -554,6 +626,17 @@ func (t *mergedTombstoneReader) Next() bool {
return true
}
func (t *mergedTombstoneReader) Seek(ref uint32) bool {
if t.cur.ref >= ref {
return true
}
t.aok = t.a.Seek(ref)
t.bok = t.b.Seek(ref)
t.initialized = true
return t.Next()
}
func (t *mergedTombstoneReader) At() stone {
return t.cur
}

25
head.go
View file

@ -210,19 +210,24 @@ func (h *HeadBlock) Meta() BlockMeta {
return m
}
// Dir implements headBlock
// Dir implements headBlock.
func (h *HeadBlock) Dir() string { return h.dir }
// Persisted implements headBlock
// Persisted implements headBlock.
func (h *HeadBlock) Persisted() bool { return false }
// Index implements headBlock
// Index implements headBlock.
func (h *HeadBlock) Index() IndexReader { return &headIndexReader{h} }
// Chunks implements headBlock
// Chunks implements headBlock.
func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} }
// Delete implements headBlock
// Tombstones implements headBlock.
func (h *HeadBlock) Tombstones() TombstoneReader {
return newMapTombstoneReader(h.tombstones)
}
// Delete implements headBlock.
func (h *HeadBlock) Delete(mint int64, maxt int64, ms ...labels.Matcher) error {
h.mtx.RLock()
@ -246,13 +251,7 @@ Outer:
}
}
rs, ok := h.tombstones[ref]
if !ok {
h.tombstones[ref] = []trange{{mint, maxt}}
continue
}
h.tombstones[ref] = addNewInterval(rs, trange{mint, maxt})
h.tombstones[ref] = addNewInterval(h.tombstones[ref], trange{mint, maxt})
}
if p.Err() != nil {
@ -262,7 +261,7 @@ Outer:
return writeTombstoneFile(h.dir, newMapTombstoneReader(h.tombstones))
}
// Querier implements Queryable and headBlock
// Querier implements Queryable and headBlock.
func (h *HeadBlock) Querier(mint, maxt int64) Querier {
h.mtx.RLock()
defer h.mtx.RUnlock()

View file

@ -126,8 +126,9 @@ func (q *querier) Close() error {
// blockQuerier provides querying access to a single block database.
type blockQuerier struct {
index IndexReader
chunks ChunkReader
index IndexReader
chunks ChunkReader
tombstones TombstoneReader
postingsMapper func(Postings) Postings