Refactor tombstone reader types

This commit is contained in:
Fabian Reinartz 2017-11-13 13:32:24 +01:00
parent e5ce2bef43
commit 3ef4326114
8 changed files with 77 additions and 50 deletions

View file

@ -142,10 +142,9 @@ type Block struct {
dir string dir string
meta BlockMeta meta BlockMeta
chunkr ChunkReader chunkr ChunkReader
indexr IndexReader indexr IndexReader
tombstones TombstoneReader
tombstones tombstoneReader
} }
// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
@ -293,7 +292,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
ir := pb.indexr ir := pb.indexr
// Choose only valid postings which have chunks in the time-range. // Choose only valid postings which have chunks in the time-range.
stones := map[uint64]Intervals{} stones := memTombstones{}
var lset labels.Labels var lset labels.Labels
var chks []ChunkMeta var chks []ChunkMeta
@ -325,16 +324,21 @@ Outer:
return p.Err() return p.Err()
} }
// Merge the current and new tombstones. err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
for k, v := range stones { for _, iv := range ivs {
pb.tombstones.add(k, v[0]) stones.add(id, iv)
pb.meta.Stats.NumTombstones++
}
return nil
})
if err != nil {
return err
} }
pb.tombstones = stones
if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil { if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil {
return err return err
} }
pb.meta.Stats.NumTombstones = uint64(len(pb.tombstones))
return writeMetaFile(pb.dir, &pb.meta) return writeMetaFile(pb.dir, &pb.meta)
} }

View file

@ -418,7 +418,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
} }
// Create an empty tombstones file. // Create an empty tombstones file.
if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil { if err := writeTombstoneFile(tmp, EmptyTombstoneReader()); err != nil {
return errors.Wrap(err, "write new tombstones file") return errors.Wrap(err, "write new tombstones file")
} }
@ -631,7 +631,11 @@ func (c *compactionSeriesSet) Next() bool {
} }
var err error var err error
c.intervals = c.tombstones.Get(c.p.At()) c.intervals, err = c.tombstones.Get(c.p.At())
if err != nil {
c.err = errors.Wrap(err, "get tombstones")
return false
}
if err = c.index.Series(c.p.At(), &c.l, &c.c); err != nil { if err = c.index.Series(c.p.At(), &c.l, &c.c); err != nil {
c.err = errors.Wrapf(err, "get series %d", c.p.At()) c.err = errors.Wrapf(err, "get series %d", c.p.At())

View file

@ -66,7 +66,7 @@ type Head struct {
postings *memPostings // postings lists for terms postings *memPostings // postings lists for terms
tombstones tombstoneReader tombstones memTombstones
} }
type headMetrics struct { type headMetrics struct {
@ -186,7 +186,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
values: map[string]stringset{}, values: map[string]stringset{},
symbols: map[string]struct{}{}, symbols: map[string]struct{}{},
postings: newUnorderedMemPostings(), postings: newUnorderedMemPostings(),
tombstones: newEmptyTombstoneReader(), tombstones: memTombstones{},
} }
h.metrics = newHeadMetrics(h, r) h.metrics = newHeadMetrics(h, r)

View file

@ -318,7 +318,7 @@ func TestHeadDeleteSimple(t *testing.T) {
Outer: Outer:
for _, c := range cases { for _, c := range cases {
// Reset the tombstones. // Reset the tombstones.
head.tombstones = newEmptyTombstoneReader() head.tombstones = memTombstones{}
// Delete the ranges. // Delete the ranges.
for _, r := range c.intervals { for _, r := range c.intervals {

View file

@ -465,6 +465,7 @@ func (s *baseChunkSeries) Next() bool {
var ( var (
lset labels.Labels lset labels.Labels
chunks []ChunkMeta chunks []ChunkMeta
err error
) )
Outer: Outer:
for s.p.Next() { for s.p.Next() {
@ -487,7 +488,11 @@ Outer:
s.lset = lset s.lset = lset
s.chks = chunks s.chks = chunks
s.intervals = s.tombstones.Get(s.p.At()) s.intervals, err = s.tombstones.Get(s.p.At())
if err != nil {
s.err = errors.Wrap(err, "get tombstones")
return false
}
if len(s.intervals) > 0 { if len(s.intervals) > 0 {
// Only those chunks that are not entirely deleted. // Only those chunks that are not entirely deleted.

View file

@ -454,7 +454,7 @@ Outer:
querier := &blockQuerier{ querier := &blockQuerier{
index: ir, index: ir,
chunks: cr, chunks: cr,
tombstones: newEmptyTombstoneReader(), tombstones: EmptyTombstoneReader(),
mint: c.mint, mint: c.mint,
maxt: c.maxt, maxt: c.maxt,
@ -506,7 +506,7 @@ func TestBlockQuerierDelete(t *testing.T) {
chunks [][]sample chunks [][]sample
} }
tombstones tombstoneReader tombstones TombstoneReader
queries []query queries []query
}{ }{
data: []struct { data: []struct {
@ -554,13 +554,11 @@ func TestBlockQuerierDelete(t *testing.T) {
}, },
}, },
}, },
tombstones: newTombstoneReader( tombstones: memTombstones{
map[uint64]Intervals{ 1: Intervals{{1, 3}},
1: Intervals{{1, 3}}, 2: Intervals{{1, 3}, {6, 10}},
2: Intervals{{1, 3}, {6, 10}}, 3: Intervals{{6, 10}},
3: Intervals{{6, 10}}, },
},
),
queries: []query{ queries: []query{
{ {
@ -736,7 +734,7 @@ func TestBaseChunkSeries(t *testing.T) {
bcs := &baseChunkSeries{ bcs := &baseChunkSeries{
p: newListPostings(tc.postings), p: newListPostings(tc.postings),
index: mi, index: mi,
tombstones: newEmptyTombstoneReader(), tombstones: EmptyTombstoneReader(),
} }
i := 0 i := 0

View file

@ -35,12 +35,17 @@ const (
// TombstoneReader gives access to tombstone intervals by series reference. // TombstoneReader gives access to tombstone intervals by series reference.
type TombstoneReader interface { type TombstoneReader interface {
Get(ref uint64) Intervals // Get returns deletion intervals for the series with the given reference.
Get(ref uint64) (Intervals, error)
// Iter calls the given function for each encountered interval.
Iter(func(uint64, Intervals) error) error
// Close any underlying resources
Close() error Close() error
} }
func writeTombstoneFile(dir string, tr tombstoneReader) error { func writeTombstoneFile(dir string, tr TombstoneReader) error {
path := filepath.Join(dir, tombstoneFilename) path := filepath.Join(dir, tombstoneFilename)
tmp := path + ".tmp" tmp := path + ".tmp"
hash := newCRC32() hash := newCRC32()
@ -67,19 +72,21 @@ func writeTombstoneFile(dir string, tr tombstoneReader) error {
mw := io.MultiWriter(f, hash) mw := io.MultiWriter(f, hash)
for k, v := range tr { tr.Iter(func(ref uint64, ivs Intervals) error {
for _, itv := range v { for _, iv := range ivs {
buf.reset() buf.reset()
buf.putUvarint64(k)
buf.putVarint64(itv.Mint) buf.putUvarint64(ref)
buf.putVarint64(itv.Maxt) buf.putVarint64(iv.Mint)
buf.putVarint64(iv.Maxt)
_, err = mw.Write(buf.get()) _, err = mw.Write(buf.get())
if err != nil { if err != nil {
return err return err
} }
} }
} return nil
})
_, err = f.Write(hash.Sum(nil)) _, err = f.Write(hash.Sum(nil))
if err != nil { if err != nil {
@ -100,7 +107,7 @@ type Stone struct {
intervals Intervals intervals Intervals
} }
func readTombstones(dir string) (tombstoneReader, error) { func readTombstones(dir string) (memTombstones, error) {
b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename))
if err != nil { if err != nil {
return nil, err return nil, err
@ -131,7 +138,8 @@ func readTombstones(dir string) (tombstoneReader, error) {
return nil, errors.New("checksum did not match") return nil, errors.New("checksum did not match")
} }
stonesMap := newEmptyTombstoneReader() stonesMap := memTombstones{}
for d.len() > 0 { for d.len() > 0 {
k := d.uvarint64() k := d.uvarint64()
mint := d.varint64() mint := d.varint64()
@ -143,28 +151,36 @@ func readTombstones(dir string) (tombstoneReader, error) {
stonesMap.add(k, Interval{mint, maxt}) stonesMap.add(k, Interval{mint, maxt})
} }
return newTombstoneReader(stonesMap), nil return stonesMap, nil
} }
type tombstoneReader map[uint64]Intervals type memTombstones map[uint64]Intervals
func newTombstoneReader(ts map[uint64]Intervals) tombstoneReader { var emptyTombstoneReader = memTombstones{}
return tombstoneReader(ts)
// EmptyTombstoneReader returns a TombstoneReader that is always empty.
func EmptyTombstoneReader() TombstoneReader {
return emptyTombstoneReader
} }
func newEmptyTombstoneReader() tombstoneReader { func (t memTombstones) Get(ref uint64) (Intervals, error) {
return tombstoneReader(make(map[uint64]Intervals)) return t[ref], nil
} }
func (t tombstoneReader) Get(ref uint64) Intervals { func (t memTombstones) Iter(f func(uint64, Intervals) error) error {
return t[ref] for ref, ivs := range t {
if err := f(ref, ivs); err != nil {
return err
}
}
return nil
} }
func (t tombstoneReader) add(ref uint64, itv Interval) { func (t memTombstones) add(ref uint64, itv Interval) {
t[ref] = t[ref].add(itv) t[ref] = t[ref].add(itv)
} }
func (tombstoneReader) Close() error { func (memTombstones) Close() error {
return nil return nil
} }

View file

@ -29,7 +29,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) {
ref := uint64(0) ref := uint64(0)
stones := make(map[uint64]Intervals) stones := memTombstones{}
// Generate the tombstones. // Generate the tombstones.
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
ref += uint64(rand.Int31n(10)) + 1 ref += uint64(rand.Int31n(10)) + 1
@ -43,13 +43,13 @@ func TestWriteAndReadbackTombStones(t *testing.T) {
stones[ref] = dranges stones[ref] = dranges
} }
require.NoError(t, writeTombstoneFile(tmpdir, newTombstoneReader(stones))) require.NoError(t, writeTombstoneFile(tmpdir, stones))
restr, err := readTombstones(tmpdir) restr, err := readTombstones(tmpdir)
require.NoError(t, err) require.NoError(t, err)
exptr := newTombstoneReader(stones)
// Compare the two readers. // Compare the two readers.
require.Equal(t, exptr, restr) require.Equal(t, stones, restr)
} }
func TestAddingNewIntervals(t *testing.T) { func TestAddingNewIntervals(t *testing.T) {