Add a function to cleanup tombstones.

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
This commit is contained in:
Goutham Veeramachaneni 2017-11-22 18:04:50 +05:30
parent a00d700d48
commit e7445d00b0
No known key found for this signature in database
GPG key ID: F1C217E8E9023CAD
3 changed files with 141 additions and 0 deletions

View file

@ -335,6 +335,20 @@ Outer:
return writeMetaFile(pb.dir, &pb.meta) return writeMetaFile(pb.dir, &pb.meta)
} }
// CleanTombstones will rewrite the block if there any tombstones to remove them
// and returns if there was a re-write.
func (pb *Block) CleanTombstones(dest string, c Compactor) (bool, error) {
if len(pb.tombstones) == 0 {
return false, nil
}
if err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime); err != nil {
return false, err
}
return true, nil
}
// Snapshot creates snapshot of the block into dir. // Snapshot creates snapshot of the block into dir.
func (pb *Block) Snapshot(dir string) error { func (pb *Block) Snapshot(dir string) error {
blockDir := filepath.Join(dir, pb.meta.ULID.String()) blockDir := filepath.Join(dir, pb.meta.ULID.String())

37
db.go
View file

@ -122,6 +122,7 @@ type dbMetrics struct {
reloads prometheus.Counter reloads prometheus.Counter
reloadsFailed prometheus.Counter reloadsFailed prometheus.Counter
compactionsTriggered prometheus.Counter compactionsTriggered prometheus.Counter
tombCleanTimer prometheus.Histogram
} }
func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
@ -147,6 +148,10 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
Name: "prometheus_tsdb_compactions_triggered_total", Name: "prometheus_tsdb_compactions_triggered_total",
Help: "Total number of triggered compactions for the partition.", Help: "Total number of triggered compactions for the partition.",
}) })
m.tombCleanTimer = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "prometheus_tsdb_tombstone_cleanup_seconds",
Help: "The time taken to recompact blocks to remove tombstones.",
})
if r != nil { if r != nil {
r.MustRegister( r.MustRegister(
@ -154,6 +159,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
m.reloads, m.reloads,
m.reloadsFailed, m.reloadsFailed,
m.compactionsTriggered, m.compactionsTriggered,
m.tombCleanTimer,
) )
} }
return m return m
@ -686,6 +692,37 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
return nil return nil
} }
// CleanTombstones re-writes any blocks with tombstones.
func (db *DB) CleanTombstones() error {
db.cmtx.Lock()
defer db.cmtx.Unlock()
start := time.Now()
defer db.metrics.tombCleanTimer.Observe(float64(time.Since(start).Seconds()))
db.mtx.RLock()
blocks := db.blocks[:]
db.mtx.RUnlock()
deleted := []string{}
for _, b := range blocks {
ok, err := b.CleanTombstones(db.Dir(), db.compactor)
if err != nil {
return errors.Wrapf(err, "clean tombstones: %s", b.Dir())
}
if ok {
deleted = append(deleted, b.Dir())
}
}
if len(deleted) == 0 {
return nil
}
return errors.Wrap(db.reload(deleted...), "reload blocks")
}
func intervalOverlap(amin, amax, bmin, bmax int64) bool { func intervalOverlap(amin, amax, bmin, bmax int64) bool {
// Checks Overlap: http://stackoverflow.com/questions/3269434/ // Checks Overlap: http://stackoverflow.com/questions/3269434/
return amin <= bmax && bmin <= amax return amin <= bmax && bmin <= amax

View file

@ -551,3 +551,93 @@ func TestWALFlushedOnDBClose(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, values, []string{"labelvalue"}) require.Equal(t, values, []string{"labelvalue"})
} }
func TestTombstoneClean(t *testing.T) {
numSamples := int64(10)
db, close := openTestDB(t, nil)
defer close()
app := db.Appender()
smpls := make([]float64, numSamples)
for i := int64(0); i < numSamples; i++ {
smpls[i] = rand.Float64()
app.Add(labels.Labels{{"a", "b"}}, i, smpls[i])
}
require.NoError(t, app.Commit())
cases := []struct {
intervals Intervals
remaint []int64
}{
{
intervals: Intervals{{1, 3}, {4, 7}},
remaint: []int64{0, 8, 9},
},
}
for _, c := range cases {
// Delete the ranges.
// create snapshot
snap, err := ioutil.TempDir("", "snap")
require.NoError(t, err)
require.NoError(t, db.Snapshot(snap))
require.NoError(t, db.Close())
// reopen DB from snapshot
db, err = Open(snap, nil, nil, nil)
require.NoError(t, err)
for _, r := range c.intervals {
require.NoError(t, db.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher("a", "b")))
}
// All of the setup for THIS line.
require.NoError(t, db.CleanTombstones())
// Compare the result.
q, err := db.Querier(0, numSamples)
require.NoError(t, err)
res := q.Select(labels.NewEqualMatcher("a", "b"))
expSamples := make([]sample, 0, len(c.remaint))
for _, ts := range c.remaint {
expSamples = append(expSamples, sample{ts, smpls[ts]})
}
expss := newListSeriesSet([]Series{
newSeries(map[string]string{"a": "b"}, expSamples),
})
if len(expSamples) == 0 {
require.False(t, res.Next())
continue
}
for {
eok, rok := expss.Next(), res.Next()
require.Equal(t, eok, rok, "next")
if !eok {
break
}
sexp := expss.At()
sres := res.At()
require.Equal(t, sexp.Labels(), sres.Labels(), "labels")
smplExp, errExp := expandSeriesIterator(sexp.Iterator())
smplRes, errRes := expandSeriesIterator(sres.Iterator())
require.Equal(t, errExp, errRes, "samples error")
require.Equal(t, smplExp, smplRes, "samples")
}
for _, b := range db.blocks {
Equals(t, 0, len(b.tombstones))
}
}
}