Merge pull request #209 from Gouthamve/fix-dels

Partial chunk deletions are not compacted away
This commit is contained in:
Goutham Veeramachaneni 2017-11-30 15:25:25 +05:30 committed by GitHub
commit 981e504bd7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 104 additions and 17 deletions

View file

@ -46,7 +46,7 @@ func createEmptyBlock(t *testing.T, dir string) *Block {
Ok(t, os.MkdirAll(chunkDir(dir), 0777))
Ok(t, writeTombstoneFile(dir, newEmptyTombstoneReader()))
Ok(t, writeTombstoneFile(dir, EmptyTombstoneReader()))
b, err := OpenBlock(dir, nil)
Ok(t, err)

View file

@ -553,22 +553,24 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
if len(dranges) > 0 {
// Re-encode the chunk to not have deleted values.
for _, chk := range chks {
if intervalOverlap(dranges[0].Mint, dranges[len(dranges)-1].Maxt, chk.MinTime, chk.MaxTime) {
newChunk := chunks.NewXORChunk()
app, err := newChunk.Appender()
if err != nil {
return err
}
it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges}
for it.Next() {
ts, v := it.At()
app.Append(ts, v)
}
chk.Chunk = newChunk
for i, chk := range chks {
if !intervalOverlap(dranges[0].Mint, dranges[len(dranges)-1].Maxt, chk.MinTime, chk.MaxTime) {
continue
}
newChunk := chunks.NewXORChunk()
app, err := newChunk.Appender()
if err != nil {
return err
}
it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges}
for it.Next() {
ts, v := it.At()
app.Append(ts, v)
}
chks[i].Chunk = newChunk
}
}
if err := chunkw.WriteChunks(chks...); err != nil {

2
db.go
View file

@ -616,7 +616,7 @@ func (db *DB) Snapshot(dir string) error {
level.Info(db.logger).Log("msg", "snapshotting block", "block", b)
if err := b.Snapshot(dir); err != nil {
return errors.Wrap(err, "error snapshotting headblock")
return errors.Wrapf(err, "error snapshotting block: %s", b.Dir())
}
}
_, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())

View file

@ -371,6 +371,91 @@ func TestDB_Snapshot(t *testing.T) {
require.Equal(t, sum, 1000.0)
}
func TestDB_SnapshotWithDelete(t *testing.T) {
numSamples := int64(10)
db, close := openTestDB(t, nil)
defer close()
app := db.Appender()
smpls := make([]float64, numSamples)
for i := int64(0); i < numSamples; i++ {
smpls[i] = rand.Float64()
app.Add(labels.Labels{{"a", "b"}}, i, smpls[i])
}
require.NoError(t, app.Commit())
cases := []struct {
intervals Intervals
remaint []int64
}{
{
intervals: Intervals{{1, 3}, {4, 7}},
remaint: []int64{0, 8, 9},
},
}
Outer:
for _, c := range cases {
// TODO(gouthamve): Reset the tombstones somehow.
// Delete the ranges.
for _, r := range c.intervals {
require.NoError(t, db.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher("a", "b")))
}
// create snapshot
snap, err := ioutil.TempDir("", "snap")
require.NoError(t, err)
require.NoError(t, db.Snapshot(snap))
require.NoError(t, db.Close())
// reopen DB from snapshot
db, err = Open(snap, nil, nil, nil)
require.NoError(t, err)
// Compare the result.
q, err := db.Querier(0, numSamples)
require.NoError(t, err)
res, err := q.Select(labels.NewEqualMatcher("a", "b"))
require.NoError(t, err)
expSamples := make([]sample, 0, len(c.remaint))
for _, ts := range c.remaint {
expSamples = append(expSamples, sample{ts, smpls[ts]})
}
expss := newListSeriesSet([]Series{
newSeries(map[string]string{"a": "b"}, expSamples),
})
if len(expSamples) == 0 {
require.False(t, res.Next())
continue
}
for {
eok, rok := expss.Next(), res.Next()
require.Equal(t, eok, rok, "next")
if !eok {
continue Outer
}
sexp := expss.At()
sres := res.At()
require.Equal(t, sexp.Labels(), sres.Labels(), "labels")
smplExp, errExp := expandSeriesIterator(sexp.Iterator())
smplRes, errRes := expandSeriesIterator(sres.Iterator())
require.Equal(t, errExp, errRes, "samples error")
require.Equal(t, smplExp, smplRes, "samples")
}
}
}
func TestDB_e2e(t *testing.T) {
const (
numDatapoints = 1000