Final delete fixes.

* Make sure no reads happen on the block when delete is in progress.
* Fix bugs in compaction.

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
This commit is contained in:
Goutham Veeramachaneni 2017-05-26 16:31:45 +05:30
parent c211ec4f49
commit 6febabeb28
No known key found for this signature in database
GPG key ID: F1C217E8E9023CAD
4 changed files with 22 additions and 27 deletions

View file

@ -234,7 +234,7 @@ func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error {
ir := pb.indexr ir := pb.indexr
// Choose only valid postings which have chunks in the time-range. // Choose only valid postings which have chunks in the time-range.
delStones := map[uint32]intervals{} newStones := map[uint32]intervals{}
Outer: Outer:
for p.Next() { for p.Next() {
@ -259,7 +259,7 @@ Outer:
if mint < chunks[0].MinTime { if mint < chunks[0].MinTime {
mint = chunks[0].MinTime mint = chunks[0].MinTime
} }
delStones[p.At()] = intervals{{mint, maxtime}} newStones[p.At()] = intervals{{mint, maxtime}}
continue Outer continue Outer
} }
} }
@ -270,18 +270,15 @@ Outer:
} }
// Merge the current and new tombstones. // Merge the current and new tombstones.
for k, v := range pb.tombstones { for k, v := range newStones {
for _, itv := range v { pb.tombstones[k] = pb.tombstones[k].add(v[0])
delStones[k] = delStones[k].add(itv)
} }
}
tombreader := newTombstoneReader(delStones)
if err := writeTombstoneFile(pb.dir, tombreader); err != nil { if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil {
return err return err
} }
pb.meta.NumTombstones = int64(len(delStones)) pb.meta.NumTombstones = int64(len(pb.tombstones))
return writeMetaFile(pb.dir, &pb.meta) return writeMetaFile(pb.dir, &pb.meta)
} }

View file

@ -263,12 +263,8 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
} }
// Create an empty tombstones file. // Create an empty tombstones file.
tf, err := os.Create(filepath.Join(tmp, tombstoneFilename)) if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil {
if err != nil { return errors.Wrap(err, "write new tombstones file")
return errors.Wrap(err, "touch tombstones file")
}
if err := tf.Close(); err != nil {
return errors.Wrap(err, "close tombstones file")
} }
// Block successfully written, make visible and remove old ones. // Block successfully written, make visible and remove old ones.
@ -444,11 +440,6 @@ func (c *compactionSeriesSet) Next() bool {
chks := make([]*ChunkMeta, 0, len(c.c)) chks := make([]*ChunkMeta, 0, len(c.c))
for _, chk := range c.c { for _, chk := range c.c {
if !(interval{chk.MinTime, chk.MaxTime}.isSubrange(c.intervals)) { if !(interval{chk.MinTime, chk.MaxTime}.isSubrange(c.intervals)) {
chk.Chunk, c.err = c.chunks.Chunk(chk.Ref)
if c.err != nil {
return false
}
chks = append(chks, chk) chks = append(chks, chk)
} }
} }
@ -456,6 +447,13 @@ func (c *compactionSeriesSet) Next() bool {
c.c = chks c.c = chks
} }
for _, chk := range c.c {
chk.Chunk, c.err = c.chunks.Chunk(chk.Ref)
if c.err != nil {
return false
}
}
return true return true
} }

11
db.go
View file

@ -424,6 +424,7 @@ func (db *DB) reloadBlocks() error {
if err := validateBlockSequence(blocks); err != nil { if err := validateBlockSequence(blocks); err != nil {
return errors.Wrap(err, "invalid block sequence") return errors.Wrap(err, "invalid block sequence")
} }
// Close all opened blocks that no longer exist after we returned all locks. // Close all opened blocks that no longer exist after we returned all locks.
for _, b := range db.blocks { for _, b := range db.blocks {
if _, ok := exist[b.Meta().ULID]; !ok { if _, ok := exist[b.Meta().ULID]; !ok {
@ -670,22 +671,24 @@ func (a *dbAppender) Rollback() error {
func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error { func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
db.cmtx.Lock() db.cmtx.Lock()
defer db.cmtx.Unlock() defer db.cmtx.Unlock()
db.mtx.Lock()
defer db.mtx.Unlock()
db.mtx.RLock()
blocks := db.blocksForInterval(mint, maxt) blocks := db.blocksForInterval(mint, maxt)
db.mtx.RUnlock()
var g errgroup.Group var g errgroup.Group
for _, b := range blocks { for _, b := range blocks {
g.Go(func() error { return b.Delete(mint, maxt, ms...) }) g.Go(func(b Block) func() error {
return func() error { return b.Delete(mint, maxt, ms...) }
}(b))
} }
if err := g.Wait(); err != nil { if err := g.Wait(); err != nil {
return err return err
} }
return db.reloadBlocks() return nil
} }
// appendable returns a copy of a slice of HeadBlocks that can still be appended to. // appendable returns a copy of a slice of HeadBlocks that can still be appended to.

View file

@ -260,12 +260,9 @@ Outer:
return err return err
} }
// Map is accessed in other places also, so protect it.
h.mtx.Lock()
for k, v := range newStones { for k, v := range newStones {
h.tombstones[k] = h.tombstones[k].add(v[0]) h.tombstones[k] = h.tombstones[k].add(v[0])
} }
h.mtx.Unlock()
h.meta.NumTombstones = int64(len(h.tombstones)) h.meta.NumTombstones = int64(len(h.tombstones))
return nil return nil