Misc. fixes incorporating feedback.

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
This commit is contained in:
Goutham Veeramachaneni 2017-05-23 18:13:30 +05:30
parent 31cf939448
commit 9bf7aa9af1
No known key found for this signature in database
GPG key ID: F1C217E8E9023CAD
5 changed files with 85 additions and 18 deletions

16
db.go
View file

@ -120,7 +120,7 @@ type DB struct {
donec chan struct{} donec chan struct{}
stopc chan struct{} stopc chan struct{}
// compMtx is used to control compactions and deletions. // cmtx is used to control compactions and deletions.
cmtx sync.Mutex cmtx sync.Mutex
} }
@ -671,17 +671,14 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
db.cmtx.Lock() db.cmtx.Lock()
defer db.cmtx.Unlock() defer db.cmtx.Unlock()
db.headmtx.RLock() db.mtx.RLock()
blocks := db.blocksForInterval(mint, maxt) blocks := db.blocksForInterval(mint, maxt)
db.headmtx.RUnlock() db.mtx.RUnlock()
var g errgroup.Group var g errgroup.Group
for _, b := range blocks { for _, b := range blocks {
f := func() error { g.Go(func() error { return b.Delete(mint, maxt, ms...) })
return b.Delete(mint, maxt, ms...)
}
g.Go(f)
} }
if err := g.Wait(); err != nil { if err := g.Wait(); err != nil {
@ -705,10 +702,7 @@ func (db *DB) appendable() (r []headBlock) {
func intervalOverlap(amin, amax, bmin, bmax int64) bool { func intervalOverlap(amin, amax, bmin, bmax int64) bool {
// Checks Overlap: http://stackoverflow.com/questions/3269434/ // Checks Overlap: http://stackoverflow.com/questions/3269434/
if amin <= bmax && bmin <= amax { return amin <= bmax && bmin <= amax
return true
}
return false
} }
func intervalContains(min, max, t int64) bool { func intervalContains(min, max, t int64) bool {

View file

@ -15,6 +15,7 @@ package tsdb
import ( import (
"io/ioutil" "io/ioutil"
"math/rand"
"os" "os"
"testing" "testing"
@ -141,3 +142,77 @@ func TestDBAppenderAddRef(t *testing.T) {
err = app2.AddFast(string(refb), 1, 1) err = app2.AddFast(string(refb), 1, 1)
require.EqualError(t, errors.Cause(err), ErrNotFound.Error()) require.EqualError(t, errors.Cause(err), ErrNotFound.Error())
} }
func TestDeleteSimple(t *testing.T) {
numSamples := int64(10)
tmpdir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(tmpdir)
db, err := Open(tmpdir, nil, nil, nil)
require.NoError(t, err)
app := db.Appender()
smpls := make([]float64, numSamples)
for i := int64(0); i < numSamples; i++ {
smpls[i] = rand.Float64()
app.Add(labels.Labels{{"a", "b"}}, i, smpls[i])
}
require.NoError(t, app.Commit())
cases := []struct {
intervals intervals
remaint []int64
}{
{
intervals: intervals{{1, 3}, {4, 7}},
remaint: []int64{0, 8, 9},
},
}
Outer:
for _, c := range cases {
// TODO(gouthamve): Reset the tombstones somehow.
// Delete the ranges.
for _, r := range c.intervals {
require.NoError(t, db.Delete(r.mint, r.maxt, labels.NewEqualMatcher("a", "b")))
}
// Compare the result.
q := db.Querier(0, numSamples)
res := q.Select(labels.NewEqualMatcher("a", "b"))
expSamples := make([]sample, 0, len(c.remaint))
for _, ts := range c.remaint {
expSamples = append(expSamples, sample{ts, smpls[ts]})
}
expss := newListSeriesSet([]Series{
newSeries(map[string]string{"a": "b"}, expSamples),
})
if len(expSamples) == 0 {
require.False(t, res.Next())
continue
}
for {
eok, rok := expss.Next(), res.Next()
require.Equal(t, eok, rok, "next")
if !eok {
continue Outer
}
sexp := expss.At()
sres := res.At()
require.Equal(t, sexp.Labels(), sres.Labels(), "labels")
smplExp, errExp := expandSeriesIterator(sexp.Iterator())
smplRes, errRes := expandSeriesIterator(sres.Iterator())
require.Equal(t, errExp, errRes, "samples error")
require.Equal(t, smplExp, smplRes, "samples")
}
}
}

View file

@ -153,8 +153,7 @@ func (h *HeadBlock) init() error {
deletesFunc := func(stones []stone) error { deletesFunc := func(stones []stone) error {
for _, s := range stones { for _, s := range stones {
for _, itv := range s.intervals { for _, itv := range s.intervals {
// TODO(gouthamve): Recheck. h.tombstones.stones[s.ref] = h.tombstones.stones[s.ref].add(itv)
h.tombstones.stones[s.ref].add(itv)
} }
} }

View file

@ -382,7 +382,7 @@ func TestHeadBlock_e2e(t *testing.T) {
return return
} }
func TestDeleteSimple(t *testing.T) { func TestHBDeleteSimple(t *testing.T) {
numSamples := int64(10) numSamples := int64(10)
dir, _ := ioutil.TempDir("", "test") dir, _ := ioutil.TempDir("", "test")
@ -427,7 +427,6 @@ func TestDeleteSimple(t *testing.T) {
Outer: Outer:
for _, c := range cases { for _, c := range cases {
// Reset the tombstones. // Reset the tombstones.
writeTombstoneFile(hb.dir, newEmptyTombstoneReader())
hb.tombstones = newEmptyTombstoneReader() hb.tombstones = newEmptyTombstoneReader()
// Delete the ranges. // Delete the ranges.

6
wal.go
View file

@ -49,13 +49,13 @@ const (
WALEntryDeletes WALEntryType = 4 WALEntryDeletes WALEntryType = 4
) )
// SamplesCB yolo. // SamplesCB is the callback after reading samples.
type SamplesCB func([]RefSample) error type SamplesCB func([]RefSample) error
// SeriesCB yolo. // SeriesCB is the callback after reading series.
type SeriesCB func([]labels.Labels) error type SeriesCB func([]labels.Labels) error
// DeletesCB yolo. // DeletesCB is the callback after reading deletes.
type DeletesCB func([]stone) error type DeletesCB func([]stone) error
// SegmentWAL is a write ahead log for series data. // SegmentWAL is a write ahead log for series data.