Merge branch 'master' into tomb-clean

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
This commit is contained in:
Goutham Veeramachaneni 2017-11-30 13:15:23 +05:30
commit 8a5ea9db74
No known key found for this signature in database
GPG key ID: F1C217E8E9023CAD
13 changed files with 477 additions and 179 deletions

View file

@ -77,6 +77,7 @@ type BlockMetaCompaction struct {
Level int `json:"level"` Level int `json:"level"`
// ULIDs of all source head blocks that went into the block. // ULIDs of all source head blocks that went into the block.
Sources []ulid.ULID `json:"sources,omitempty"` Sources []ulid.ULID `json:"sources,omitempty"`
Failed bool `json:"failed,omitempty"`
} }
const ( const (
@ -144,8 +145,7 @@ type Block struct {
chunkr ChunkReader chunkr ChunkReader
indexr IndexReader indexr IndexReader
tombstones TombstoneReader
tombstones tombstoneReader
} }
// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
@ -245,6 +245,11 @@ func (pb *Block) Tombstones() (TombstoneReader, error) {
return blockTombstoneReader{TombstoneReader: pb.tombstones, b: pb}, nil return blockTombstoneReader{TombstoneReader: pb.tombstones, b: pb}, nil
} }
func (pb *Block) setCompactionFailed() error {
pb.meta.Compaction.Failed = true
return writeMetaFile(pb.dir, &pb.meta)
}
type blockIndexReader struct { type blockIndexReader struct {
IndexReader IndexReader
b *Block b *Block
@ -284,13 +289,15 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
return ErrClosing return ErrClosing
} }
pr := newPostingsReader(pb.indexr) p, absent, err := PostingsForMatchers(pb.indexr, ms...)
p, absent := pr.Select(ms...) if err != nil {
return errors.Wrap(err, "select series")
}
ir := pb.indexr ir := pb.indexr
// Choose only valid postings which have chunks in the time-range. // Choose only valid postings which have chunks in the time-range.
stones := map[uint64]Intervals{} stones := memTombstones{}
var lset labels.Labels var lset labels.Labels
var chks []ChunkMeta var chks []ChunkMeta
@ -322,27 +329,42 @@ Outer:
return p.Err() return p.Err()
} }
// Merge the current and new tombstones. err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
for k, v := range stones { for _, iv := range ivs {
pb.tombstones.add(k, v[0]) stones.add(id, iv)
pb.meta.Stats.NumTombstones++
} }
return nil
})
if err != nil {
return err
}
pb.tombstones = stones
if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil { if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil {
return err return err
} }
pb.meta.Stats.NumTombstones = uint64(len(pb.tombstones))
return writeMetaFile(pb.dir, &pb.meta) return writeMetaFile(pb.dir, &pb.meta)
} }
// CleanTombstones will rewrite the block if there any tombstones to remove them // CleanTombstones will rewrite the block if there any tombstones to remove them
// and returns if there was a re-write. // and returns if there was a re-write.
func (pb *Block) CleanTombstones(dest string, c Compactor) (bool, error) { func (pb *Block) CleanTombstones(dest string, c Compactor) (bool, error) {
if len(pb.tombstones) == 0 { numStones := 0
pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
for _ = range ivs {
numStones++
}
return nil
})
if numStones == 0 {
return false, nil return false, nil
} }
if err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime); err != nil { if _, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime); err != nil {
return false, err return false, err
} }

View file

@ -12,3 +12,43 @@
// limitations under the License. // limitations under the License.
package tsdb package tsdb
import (
"io/ioutil"
"os"
"testing"
)
func TestSetCompactionFailed(t *testing.T) {
	tmpdir, err := ioutil.TempDir("", "test-tsdb")
	Ok(t, err)

	// A freshly created block must not be marked as failed.
	block := createEmptyBlock(t, tmpdir)
	Equals(t, false, block.meta.Compaction.Failed)

	// Marking the compaction as failed must update the in-memory meta ...
	Ok(t, block.setCompactionFailed())
	Equals(t, true, block.meta.Compaction.Failed)
	Ok(t, block.Close())

	// ... and must survive a close/reopen cycle, i.e. be persisted to disk.
	block, err = OpenBlock(tmpdir, nil)
	Ok(t, err)
	Equals(t, true, block.meta.Compaction.Failed)
}
// createEmptyBlock writes the minimal on-disk layout of a block into dir
// (meta file, empty index, chunk directory, empty tombstones file) and
// returns the opened block.
func createEmptyBlock(t *testing.T, dir string) *Block {
	Ok(t, os.MkdirAll(dir, 0777))

	Ok(t, writeMetaFile(dir, &BlockMeta{}))

	iw, err := newIndexWriter(dir)
	Ok(t, err)
	Ok(t, iw.Close())

	Ok(t, os.MkdirAll(chunkDir(dir), 0777))

	Ok(t, writeTombstoneFile(dir, EmptyTombstoneReader()))

	block, err := OpenBlock(dir, nil)
	Ok(t, err)

	return block
}

View file

@ -52,7 +52,7 @@ type Compactor interface {
Plan(dir string) ([]string, error) Plan(dir string) ([]string, error)
// Write persists a Block into a directory. // Write persists a Block into a directory.
Write(dest string, b BlockReader, mint, maxt int64) error Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error)
// Compact runs compaction against the provided directories. Must // Compact runs compaction against the provided directories. Must
// only be called concurrently with results of Plan(). // only be called concurrently with results of Plan().
@ -205,7 +205,15 @@ func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
continue continue
} }
Outer:
for _, p := range parts { for _, p := range parts {
// Do not select the range if it has a block whose compaction failed.
for _, dm := range p {
if dm.meta.Compaction.Failed {
continue Outer
}
}
mint := p[0].meta.MinTime mint := p[0].meta.MinTime
maxt := p[len(p)-1].meta.MaxTime maxt := p[len(p)-1].meta.MaxTime
// Pick the range of blocks if it spans the full range (potentially with gaps) // Pick the range of blocks if it spans the full range (potentially with gaps)
@ -297,6 +305,7 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
// provided directories. // provided directories.
func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) { func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) {
var blocks []BlockReader var blocks []BlockReader
var bs []*Block
var metas []*BlockMeta var metas []*BlockMeta
for _, d := range dirs { for _, d := range dirs {
@ -313,15 +322,30 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) {
metas = append(metas, meta) metas = append(metas, meta)
blocks = append(blocks, b) blocks = append(blocks, b)
bs = append(bs, b)
} }
entropy := rand.New(rand.NewSource(time.Now().UnixNano())) entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
uid := ulid.MustNew(ulid.Now(), entropy) uid := ulid.MustNew(ulid.Now(), entropy)
return c.write(dest, compactBlockMetas(uid, metas...), blocks...) err = c.write(dest, compactBlockMetas(uid, metas...), blocks...)
if err == nil {
return nil
} }
func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) error { var merr MultiError
merr.Add(err)
for _, b := range bs {
if err := b.setCompactionFailed(); err != nil {
merr.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir()))
}
}
return merr
}
func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) {
entropy := rand.New(rand.NewSource(time.Now().UnixNano())) entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
uid := ulid.MustNew(ulid.Now(), entropy) uid := ulid.MustNew(ulid.Now(), entropy)
@ -333,7 +357,7 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) e
meta.Compaction.Level = 1 meta.Compaction.Level = 1
meta.Compaction.Sources = []ulid.ULID{uid} meta.Compaction.Sources = []ulid.ULID{uid}
return c.write(dest, meta, b) return uid, c.write(dest, meta, b)
} }
// instrumentedChunkWriter is used for level 1 compactions to record statistics // instrumentedChunkWriter is used for level 1 compactions to record statistics
@ -360,17 +384,21 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...ChunkMeta) error {
func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) { func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
level.Info(c.logger).Log("msg", "compact blocks", "count", len(blocks), "mint", meta.MinTime, "maxt", meta.MaxTime) level.Info(c.logger).Log("msg", "compact blocks", "count", len(blocks), "mint", meta.MinTime, "maxt", meta.MaxTime)
dir := filepath.Join(dest, meta.ULID.String())
tmp := dir + ".tmp"
defer func(t time.Time) { defer func(t time.Time) {
if err != nil { if err != nil {
c.metrics.failed.Inc() c.metrics.failed.Inc()
// TODO(gouthamve): Handle error how?
if err := os.RemoveAll(tmp); err != nil {
level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error())
}
} }
c.metrics.ran.Inc() c.metrics.ran.Inc()
c.metrics.duration.Observe(time.Since(t).Seconds()) c.metrics.duration.Observe(time.Since(t).Seconds())
}(time.Now()) }(time.Now())
dir := filepath.Join(dest, meta.ULID.String())
tmp := dir + ".tmp"
if err = os.RemoveAll(tmp); err != nil { if err = os.RemoveAll(tmp); err != nil {
return err return err
} }
@ -418,7 +446,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
} }
// Create an empty tombstones file. // Create an empty tombstones file.
if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil { if err := writeTombstoneFile(tmp, EmptyTombstoneReader()); err != nil {
return errors.Wrap(err, "write new tombstones file") return errors.Wrap(err, "write new tombstones file")
} }
@ -453,7 +481,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
// of the provided blocks. It returns meta information for the new block. // of the provided blocks. It returns meta information for the new block.
func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error { func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error {
var ( var (
set compactionSet set ChunkSeriesSet
allSymbols = make(map[string]struct{}, 1<<16) allSymbols = make(map[string]struct{}, 1<<16)
closers = []io.Closer{} closers = []io.Closer{}
) )
@ -589,7 +617,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
} }
} }
for l := range postings.m { for _, l := range postings.sortedKeys() {
if err := indexw.WritePostings(l.Name, l.Value, postings.get(l.Name, l.Value)); err != nil { if err := indexw.WritePostings(l.Name, l.Value, postings.get(l.Name, l.Value)); err != nil {
return errors.Wrap(err, "write postings") return errors.Wrap(err, "write postings")
} }
@ -597,18 +625,11 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
return nil return nil
} }
type compactionSet interface {
Next() bool
At() (labels.Labels, []ChunkMeta, Intervals)
Err() error
}
type compactionSeriesSet struct { type compactionSeriesSet struct {
p Postings p Postings
index IndexReader index IndexReader
chunks ChunkReader chunks ChunkReader
tombstones TombstoneReader tombstones TombstoneReader
series SeriesSet
l labels.Labels l labels.Labels
c []ChunkMeta c []ChunkMeta
@ -631,7 +652,11 @@ func (c *compactionSeriesSet) Next() bool {
} }
var err error var err error
c.intervals = c.tombstones.Get(c.p.At()) c.intervals, err = c.tombstones.Get(c.p.At())
if err != nil {
c.err = errors.Wrap(err, "get tombstones")
return false
}
if err = c.index.Series(c.p.At(), &c.l, &c.c); err != nil { if err = c.index.Series(c.p.At(), &c.l, &c.c); err != nil {
c.err = errors.Wrapf(err, "get series %d", c.p.At()) c.err = errors.Wrapf(err, "get series %d", c.p.At())
@ -675,7 +700,7 @@ func (c *compactionSeriesSet) At() (labels.Labels, []ChunkMeta, Intervals) {
} }
type compactionMerger struct { type compactionMerger struct {
a, b compactionSet a, b ChunkSeriesSet
aok, bok bool aok, bok bool
l labels.Labels l labels.Labels
@ -688,7 +713,7 @@ type compactionSeries struct {
chunks []*ChunkMeta chunks []*ChunkMeta
} }
func newCompactionMerger(a, b compactionSet) (*compactionMerger, error) { func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) {
c := &compactionMerger{ c := &compactionMerger{
a: a, a: a,
b: b, b: b,

View file

@ -14,8 +14,13 @@
package tsdb package tsdb
import ( import (
"io/ioutil"
"os"
"path/filepath"
"testing" "testing"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -157,17 +162,6 @@ func TestLeveledCompactor_plan(t *testing.T) {
}, nil) }, nil)
require.NoError(t, err) require.NoError(t, err)
metaRange := func(name string, mint, maxt int64, stats *BlockStats) dirMeta {
meta := &BlockMeta{MinTime: mint, MaxTime: maxt}
if stats != nil {
meta.Stats = *stats
}
return dirMeta{
dir: name,
meta: meta,
}
}
cases := []struct { cases := []struct {
metas []dirMeta metas []dirMeta
expected []string expected []string
@ -274,3 +268,85 @@ func TestLeveledCompactor_plan(t *testing.T) {
require.Equal(t, c.expected, res, "test case %d", i) require.Equal(t, c.expected, res, "test case %d", i)
} }
} }
func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
	compactor, err := NewLeveledCompactor(nil, nil, []int64{
		20,
		60,
		240,
		720,
		2160,
	}, nil)
	Ok(t, err)

	// Each case is a set of block dirs that would normally be plannable.
	cases := [][]dirMeta{
		{
			metaRange("1", 0, 20, nil),
			metaRange("2", 20, 40, nil),
			metaRange("3", 40, 60, nil),
		},
		{
			metaRange("1", 0, 20, nil),
			metaRange("2", 20, 40, nil),
			metaRange("3", 60, 80, nil),
		},
		{
			metaRange("1", 0, 20, nil),
			metaRange("2", 20, 40, nil),
			metaRange("3", 40, 60, nil),
			metaRange("4", 60, 120, nil),
			metaRange("5", 120, 180, nil),
		},
	}

	for _, metas := range cases {
		// Marking any block in the range as failed must make the planner
		// skip the whole range and return no candidates.
		metas[1].meta.Compaction.Failed = true

		res, err := compactor.plan(metas)
		Ok(t, err)
		Equals(t, []string(nil), res)
	}
}
func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
	compactor, err := NewLeveledCompactor(nil, log.NewNopLogger(), []int64{
		20,
		60,
		240,
		720,
		2160,
	}, nil)
	Ok(t, err)

	tmpdir, err := ioutil.TempDir("", "test")
	Ok(t, err)
	// Remove the temporary directory when the test finishes; the original
	// version leaked it on every run.
	defer os.RemoveAll(tmpdir)

	// Writing from a block reader whose accessors all fail must error out ...
	NotOk(t, compactor.write(tmpdir, &BlockMeta{}, erringBReader{}))

	// ... and the <ULID>.tmp working directory must have been removed again.
	_, err = os.Stat(filepath.Join(tmpdir, BlockMeta{}.ULID.String()) + ".tmp")
	Assert(t, os.IsNotExist(err), "directory is not cleaned up")
}
// metaRange builds a dirMeta for a block directory called name that covers
// the time range [mint, maxt), optionally carrying the given stats.
func metaRange(name string, mint, maxt int64, stats *BlockStats) dirMeta {
	m := BlockMeta{MinTime: mint, MaxTime: maxt}
	if stats != nil {
		m.Stats = *stats
	}
	return dirMeta{dir: name, meta: &m}
}
// erringBReader is a BlockReader stub whose accessors always return an error.
type erringBReader struct{}

func (erringBReader) Index() (IndexReader, error) {
	return nil, errors.New("index")
}

func (erringBReader) Chunks() (ChunkReader, error) {
	return nil, errors.New("chunks")
}

func (erringBReader) Tombstones() (TombstoneReader, error) {
	return nil, errors.New("tombstones")
}

41
db.go
View file

@ -283,16 +283,23 @@ func (db *DB) retentionCutoff() (bool, error) {
} }
db.mtx.RLock() db.mtx.RLock()
defer db.mtx.RUnlock() blocks := db.blocks[:]
db.mtx.RUnlock()
if len(db.blocks) == 0 { if len(blocks) == 0 {
return false, nil return false, nil
} }
last := db.blocks[len(db.blocks)-1] last := blocks[len(db.blocks)-1]
mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
return retentionCutoff(db.dir, mint) mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
dirs, err := retentionCutoffDirs(db.dir, mint)
if err != nil {
return false, err
}
// This will close the dirs and then delete the dirs.
return len(dirs) > 0, db.reload(dirs...)
} }
// Appender opens a new appender against the database. // Appender opens a new appender against the database.
@ -350,7 +357,7 @@ func (db *DB) compact() (changes bool, err error) {
mint: mint, mint: mint,
maxt: maxt, maxt: maxt,
} }
if err = db.compactor.Write(db.dir, head, mint, maxt); err != nil { if _, err = db.compactor.Write(db.dir, head, mint, maxt); err != nil {
return changes, errors.Wrap(err, "persist head block") return changes, errors.Wrap(err, "persist head block")
} }
changes = true changes = true
@ -394,40 +401,37 @@ func (db *DB) compact() (changes bool, err error) {
return changes, nil return changes, nil
} }
// retentionCutoff deletes all directories of blocks in dir that are strictly // retentionCutoffDirs returns all directories of blocks in dir that are strictly
// before mint. // before mint.
func retentionCutoff(dir string, mint int64) (bool, error) { func retentionCutoffDirs(dir string, mint int64) ([]string, error) {
df, err := fileutil.OpenDir(dir) df, err := fileutil.OpenDir(dir)
if err != nil { if err != nil {
return false, errors.Wrapf(err, "open directory") return nil, errors.Wrapf(err, "open directory")
} }
defer df.Close() defer df.Close()
dirs, err := blockDirs(dir) dirs, err := blockDirs(dir)
if err != nil { if err != nil {
return false, errors.Wrapf(err, "list block dirs %s", dir) return nil, errors.Wrapf(err, "list block dirs %s", dir)
} }
changes := false delDirs := []string{}
for _, dir := range dirs { for _, dir := range dirs {
meta, err := readMetaFile(dir) meta, err := readMetaFile(dir)
if err != nil { if err != nil {
return changes, errors.Wrapf(err, "read block meta %s", dir) return nil, errors.Wrapf(err, "read block meta %s", dir)
} }
// The first block we encounter marks that we crossed the boundary // The first block we encounter marks that we crossed the boundary
// of deletable blocks. // of deletable blocks.
if meta.MaxTime >= mint { if meta.MaxTime >= mint {
break break
} }
changes = true
if err := os.RemoveAll(dir); err != nil { delDirs = append(delDirs, dir)
return changes, err
}
} }
return changes, fileutil.Fsync(df) return delDirs, nil
} }
func (db *DB) getBlock(id ulid.ULID) (*Block, bool) { func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
@ -621,7 +625,8 @@ func (db *DB) Snapshot(dir string) error {
return errors.Wrap(err, "error snapshotting headblock") return errors.Wrap(err, "error snapshotting headblock")
} }
} }
return db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime()) _, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())
return errors.Wrap(err, "snapshot head block")
} }
// Querier returns a new querier over the data partition for the given time range. // Querier returns a new querier over the data partition for the given time range.

View file

@ -27,17 +27,21 @@ import (
) )
func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) { func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) {
tmpdir, _ := ioutil.TempDir("", "test") tmpdir, err := ioutil.TempDir("", "test")
Ok(t, err)
db, err := Open(tmpdir, nil, nil, opts) db, err = Open(tmpdir, nil, nil, opts)
require.NoError(t, err) require.NoError(t, err)
// Do not close the test database by default as it will deadlock on test failures. // Do not close the test database by default as it will deadlock on test failures.
return db, func() { os.RemoveAll(tmpdir) } return db, func() { os.RemoveAll(tmpdir) }
} }
// Convert a SeriesSet into a form useable with reflect.DeepEqual. // query runs a matcher query against the querier and fully expands its data.
func readSeriesSet(t testing.TB, ss SeriesSet) map[string][]sample { func query(t testing.TB, q Querier, matchers ...labels.Matcher) map[string][]sample {
ss, err := q.Select(matchers...)
Ok(t, err)
result := map[string][]sample{} result := map[string][]sample{}
for ss.Next() { for ss.Next() {
@ -49,12 +53,12 @@ func readSeriesSet(t testing.TB, ss SeriesSet) map[string][]sample {
t, v := it.At() t, v := it.At()
samples = append(samples, sample{t: t, v: v}) samples = append(samples, sample{t: t, v: v})
} }
require.NoError(t, it.Err()) Ok(t, it.Err())
name := series.Labels().String() name := series.Labels().String()
result[name] = samples result[name] = samples
} }
require.NoError(t, ss.Err()) Ok(t, ss.Err())
return result return result
} }
@ -70,7 +74,7 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) {
querier, err := db.Querier(0, 1) querier, err := db.Querier(0, 1)
require.NoError(t, err) require.NoError(t, err)
seriesSet := readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar"))) seriesSet := query(t, querier, labels.NewEqualMatcher("foo", "bar"))
require.Equal(t, seriesSet, map[string][]sample{}) require.Equal(t, seriesSet, map[string][]sample{})
require.NoError(t, querier.Close()) require.NoError(t, querier.Close())
@ -82,7 +86,7 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer querier.Close() defer querier.Close()
seriesSet = readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar"))) seriesSet = query(t, querier, labels.NewEqualMatcher("foo", "bar"))
require.Equal(t, seriesSet, map[string][]sample{`{foo="bar"}`: []sample{{t: 0, v: 0}}}) require.Equal(t, seriesSet, map[string][]sample{`{foo="bar"}`: []sample{{t: 0, v: 0}}})
} }
@ -102,7 +106,7 @@ func TestDataNotAvailableAfterRollback(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer querier.Close() defer querier.Close()
seriesSet := readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar"))) seriesSet := query(t, querier, labels.NewEqualMatcher("foo", "bar"))
require.Equal(t, seriesSet, map[string][]sample{}) require.Equal(t, seriesSet, map[string][]sample{})
} }
@ -146,7 +150,7 @@ func TestDBAppenderAddRef(t *testing.T) {
q, err := db.Querier(0, 200) q, err := db.Querier(0, 200)
require.NoError(t, err) require.NoError(t, err)
res := readSeriesSet(t, q.Select(labels.NewEqualMatcher("a", "b"))) res := query(t, q, labels.NewEqualMatcher("a", "b"))
require.Equal(t, map[string][]sample{ require.Equal(t, map[string][]sample{
labels.FromStrings("a", "b").String(): []sample{ labels.FromStrings("a", "b").String(): []sample{
@ -198,7 +202,8 @@ Outer:
q, err := db.Querier(0, numSamples) q, err := db.Querier(0, numSamples)
require.NoError(t, err) require.NoError(t, err)
res := q.Select(labels.NewEqualMatcher("a", "b")) res, err := q.Select(labels.NewEqualMatcher("a", "b"))
require.NoError(t, err)
expSamples := make([]sample, 0, len(c.remaint)) expSamples := make([]sample, 0, len(c.remaint))
for _, ts := range c.remaint { for _, ts := range c.remaint {
@ -294,8 +299,7 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
q, err := db.Querier(0, 10) q, err := db.Querier(0, 10)
require.NoError(t, err) require.NoError(t, err)
ss := q.Select(labels.NewEqualMatcher("a", "b")) ssMap := query(t, q, labels.NewEqualMatcher("a", "b"))
ssMap := readSeriesSet(t, ss)
require.Equal(t, map[string][]sample{ require.Equal(t, map[string][]sample{
labels.New(labels.Label{"a", "b"}).String(): []sample{{0, 1}}, labels.New(labels.Label{"a", "b"}).String(): []sample{{0, 1}},
@ -314,8 +318,7 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
q, err = db.Querier(0, 10) q, err = db.Querier(0, 10)
require.NoError(t, err) require.NoError(t, err)
ss = q.Select(labels.NewEqualMatcher("a", "b")) ssMap = query(t, q, labels.NewEqualMatcher("a", "b"))
ssMap = readSeriesSet(t, ss)
require.Equal(t, map[string][]sample{ require.Equal(t, map[string][]sample{
labels.New(labels.Label{"a", "b"}).String(): []sample{{0, 1}, {10, 3}}, labels.New(labels.Label{"a", "b"}).String(): []sample{{0, 1}, {10, 3}},
@ -352,7 +355,9 @@ func TestDB_Snapshot(t *testing.T) {
defer querier.Close() defer querier.Close()
// sum values // sum values
seriesSet := querier.Select(labels.NewEqualMatcher("foo", "bar")) seriesSet, err := querier.Select(labels.NewEqualMatcher("foo", "bar"))
require.NoError(t, err)
sum := 0.0 sum := 0.0
for seriesSet.Next() { for seriesSet.Next() {
series := seriesSet.At().Iterator() series := seriesSet.At().Iterator()
@ -500,7 +505,8 @@ func TestDB_e2e(t *testing.T) {
q, err := db.Querier(mint, maxt) q, err := db.Querier(mint, maxt)
require.NoError(t, err) require.NoError(t, err)
ss := q.Select(qry.ms...) ss, err := q.Select(qry.ms...)
require.NoError(t, err)
result := map[string][]sample{} result := map[string][]sample{}
@ -526,7 +532,8 @@ func TestDB_e2e(t *testing.T) {
} }
func TestWALFlushedOnDBClose(t *testing.T) { func TestWALFlushedOnDBClose(t *testing.T) {
tmpdir, _ := ioutil.TempDir("", "test") tmpdir, err := ioutil.TempDir("", "test")
Ok(t, err)
defer os.RemoveAll(tmpdir) defer os.RemoveAll(tmpdir)
db, err := Open(tmpdir, nil, nil, nil) db, err := Open(tmpdir, nil, nil, nil)
@ -601,7 +608,8 @@ func TestTombstoneClean(t *testing.T) {
q, err := db.Querier(0, numSamples) q, err := db.Querier(0, numSamples)
require.NoError(t, err) require.NoError(t, err)
res := q.Select(labels.NewEqualMatcher("a", "b")) res, err := q.Select(labels.NewEqualMatcher("a", "b"))
require.NoError(t, err)
expSamples := make([]sample, 0, len(c.remaint)) expSamples := make([]sample, 0, len(c.remaint))
for _, ts := range c.remaint { for _, ts := range c.remaint {
@ -637,7 +645,64 @@ func TestTombstoneClean(t *testing.T) {
} }
for _, b := range db.blocks { for _, b := range db.blocks {
Equals(t, 0, len(b.tombstones)) Equals(t, emptyTombstoneReader, b.tombstones)
} }
} }
} }
func TestDB_Retention(t *testing.T) {
tmpdir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(tmpdir)
db, err := Open(tmpdir, nil, nil, nil)
require.NoError(t, err)
lbls := labels.Labels{labels.Label{Name: "labelname", Value: "labelvalue"}}
app := db.Appender()
_, err = app.Add(lbls, 0, 1)
require.NoError(t, err)
require.NoError(t, app.Commit())
// create snapshot to make it create a block.
// TODO(gouthamve): Add a method to compact headblock.
snap, err := ioutil.TempDir("", "snap")
require.NoError(t, err)
require.NoError(t, db.Snapshot(snap))
require.NoError(t, db.Close())
defer os.RemoveAll(snap)
// reopen DB from snapshot
db, err = Open(snap, nil, nil, nil)
require.NoError(t, err)
Equals(t, 1, len(db.blocks))
app = db.Appender()
_, err = app.Add(lbls, 100, 1)
require.NoError(t, err)
require.NoError(t, app.Commit())
// Snapshot again to create another block.
snap, err = ioutil.TempDir("", "snap")
require.NoError(t, err)
require.NoError(t, db.Snapshot(snap))
require.NoError(t, db.Close())
defer os.RemoveAll(snap)
// reopen DB from snapshot
db, err = Open(snap, nil, nil, &Options{
RetentionDuration: 10,
BlockRanges: []int64{50},
})
require.NoError(t, err)
Equals(t, 2, len(db.blocks))
// Now call rentention.
changes, err := db.retentionCutoff()
Ok(t, err)
Assert(t, changes, "there should be changes")
Equals(t, 1, len(db.blocks))
Equals(t, int64(100), db.blocks[0].meta.MaxTime) // To verify its the right block.
}

10
head.go
View file

@ -66,7 +66,7 @@ type Head struct {
postings *memPostings // postings lists for terms postings *memPostings // postings lists for terms
tombstones tombstoneReader tombstones memTombstones
} }
type headMetrics struct { type headMetrics struct {
@ -186,7 +186,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
values: map[string]stringset{}, values: map[string]stringset{},
symbols: map[string]struct{}{}, symbols: map[string]struct{}{},
postings: newUnorderedMemPostings(), postings: newUnorderedMemPostings(),
tombstones: newEmptyTombstoneReader(), tombstones: memTombstones{},
} }
h.metrics = newHeadMetrics(h, r) h.metrics = newHeadMetrics(h, r)
@ -574,8 +574,10 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
ir := h.indexRange(mint, maxt) ir := h.indexRange(mint, maxt)
pr := newPostingsReader(ir) p, absent, err := PostingsForMatchers(ir, ms...)
p, absent := pr.Select(ms...) if err != nil {
return errors.Wrap(err, "select series")
}
var stones []Stone var stones []Stone

View file

@ -318,7 +318,7 @@ func TestHeadDeleteSimple(t *testing.T) {
Outer: Outer:
for _, c := range cases { for _, c := range cases {
// Reset the tombstones. // Reset the tombstones.
head.tombstones = newEmptyTombstoneReader() head.tombstones = memTombstones{}
// Delete the ranges. // Delete the ranges.
for _, r := range c.intervals { for _, r := range c.intervals {
@ -328,7 +328,8 @@ Outer:
// Compare the result. // Compare the result.
q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime())
require.NoError(t, err) require.NoError(t, err)
res := q.Select(labels.NewEqualMatcher("a", "b")) res, err := q.Select(labels.NewEqualMatcher("a", "b"))
require.NoError(t, err)
expSamples := make([]sample, 0, len(c.remaint)) expSamples := make([]sample, 0, len(c.remaint))
for _, ts := range c.remaint { for _, ts := range c.remaint {

View file

@ -50,6 +50,25 @@ func newUnorderedMemPostings() *memPostings {
} }
} }
// sortedKeys returns a list of sorted label keys of the postings.
func (p *memPostings) sortedKeys() []labels.Label {
	// Copy the keys out under the read lock so sorting happens unlocked.
	p.mtx.RLock()
	ls := make([]labels.Label, 0, len(p.m))
	for l := range p.m {
		ls = append(ls, l)
	}
	p.mtx.RUnlock()

	// Order lexicographically by name, breaking ties by value.
	sort.Slice(ls, func(i, j int) bool {
		d := strings.Compare(ls[i].Name, ls[j].Name)
		if d != 0 {
			return d < 0
		}
		return ls[i].Value < ls[j].Value
	})
	return ls
}
// Postings returns an iterator over the postings list for s. // Postings returns an iterator over the postings list for s.
func (p *memPostings) get(name, value string) Postings { func (p *memPostings) get(name, value string) Postings {
p.mtx.RLock() p.mtx.RLock()
@ -165,6 +184,11 @@ func (e errPostings) Err() error { return e.err }
var emptyPostings = errPostings{} var emptyPostings = errPostings{}
// EmptyPostings returns a postings list that's always empty.
// It hands out the shared emptyPostings value, which is a stateless
// errPostings and therefore safe to reuse across callers.
func EmptyPostings() Postings {
	return emptyPostings
}
// Intersect returns a new postings list over the intersection of the // Intersect returns a new postings list over the intersection of the
// input postings. // input postings.
func Intersect(its ...Postings) Postings { func Intersect(its ...Postings) Postings {

View file

@ -27,7 +27,7 @@ import (
// time range. // time range.
type Querier interface { type Querier interface {
// Select returns a set of series that matches the given label matchers. // Select returns a set of series that matches the given label matchers.
Select(...labels.Matcher) SeriesSet Select(...labels.Matcher) (SeriesSet, error)
// LabelValues returns all potential values for a label name. // LabelValues returns all potential values for a label name.
LabelValues(string) ([]string, error) LabelValues(string) ([]string, error)
@ -81,20 +81,29 @@ func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) {
return nil, fmt.Errorf("not implemented") return nil, fmt.Errorf("not implemented")
} }
func (q *querier) Select(ms ...labels.Matcher) SeriesSet { func (q *querier) Select(ms ...labels.Matcher) (SeriesSet, error) {
return q.sel(q.blocks, ms) return q.sel(q.blocks, ms)
} }
func (q *querier) sel(qs []Querier, ms []labels.Matcher) SeriesSet { func (q *querier) sel(qs []Querier, ms []labels.Matcher) (SeriesSet, error) {
if len(qs) == 0 { if len(qs) == 0 {
return nopSeriesSet{} return EmptySeriesSet(), nil
} }
if len(qs) == 1 { if len(qs) == 1 {
return qs[0].Select(ms...) return qs[0].Select(ms...)
} }
l := len(qs) / 2 l := len(qs) / 2
return newMergedSeriesSet(q.sel(qs[:l], ms), q.sel(qs[l:], ms))
a, err := q.sel(qs[:l], ms)
if err != nil {
return nil, err
}
b, err := q.sel(qs[l:], ms)
if err != nil {
return nil, err
}
return newMergedSeriesSet(a, b), nil
} }
func (q *querier) Close() error { func (q *querier) Close() error {
@ -141,20 +150,14 @@ type blockQuerier struct {
mint, maxt int64 mint, maxt int64
} }
func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet { func (q *blockQuerier) Select(ms ...labels.Matcher) (SeriesSet, error) {
pr := newPostingsReader(q.index) base, err := LookupChunkSeries(q.index, q.tombstones, ms...)
if err != nil {
p, absent := pr.Select(ms...) return nil, err
}
return &blockSeriesSet{ return &blockSeriesSet{
set: &populatedChunkSeries{ set: &populatedChunkSeries{
set: &baseChunkSeries{ set: base,
p: p,
index: q.index,
absent: absent,
tombstones: q.tombstones,
},
chunks: q.chunks, chunks: q.chunks,
mint: q.mint, mint: q.mint,
maxt: q.maxt, maxt: q.maxt,
@ -162,7 +165,7 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
mint: q.mint, mint: q.mint,
maxt: q.maxt, maxt: q.maxt,
} }, nil
} }
func (q *blockQuerier) LabelValues(name string) ([]string, error) { func (q *blockQuerier) LabelValues(name string) ([]string, error) {
@ -196,16 +199,10 @@ func (q *blockQuerier) Close() error {
return merr.Err() return merr.Err()
} }
// postingsReader is used to select matching postings from an IndexReader. // PostingsForMatchers assembles a single postings iterator against the index reader
type postingsReader struct { // based on the given matchers. It returns a list of label names that must be manually
index IndexReader // checked to not exist in series the postings list points to.
} func PostingsForMatchers(index IndexReader, ms ...labels.Matcher) (Postings, []string, error) {
func newPostingsReader(i IndexReader) *postingsReader {
return &postingsReader{index: i}
}
func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string) {
var ( var (
its []Postings its []Postings
absent []string absent []string
@ -217,12 +214,13 @@ func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string) {
absent = append(absent, m.Name()) absent = append(absent, m.Name())
continue continue
} }
its = append(its, r.selectSingle(m)) it, err := postingsForMatcher(index, m)
if err != nil {
return nil, nil, err
} }
its = append(its, it)
p := Intersect(its...) }
return index.SortedPostings(Intersect(its...)), absent, nil
return r.index.SortedPostings(p), absent
} }
// tuplesByPrefix uses binary search to find prefix matches within ts. // tuplesByPrefix uses binary search to find prefix matches within ts.
@ -256,33 +254,33 @@ func tuplesByPrefix(m *labels.PrefixMatcher, ts StringTuples) ([]string, error)
return matches, nil return matches, nil
} }
func (r *postingsReader) selectSingle(m labels.Matcher) Postings { func postingsForMatcher(index IndexReader, m labels.Matcher) (Postings, error) {
// Fast-path for equal matching. // Fast-path for equal matching.
if em, ok := m.(*labels.EqualMatcher); ok { if em, ok := m.(*labels.EqualMatcher); ok {
it, err := r.index.Postings(em.Name(), em.Value()) it, err := index.Postings(em.Name(), em.Value())
if err != nil { if err != nil {
return errPostings{err: err} return nil, err
} }
return it return it, nil
} }
tpls, err := r.index.LabelValues(m.Name()) tpls, err := index.LabelValues(m.Name())
if err != nil { if err != nil {
return errPostings{err: err} return nil, err
} }
var res []string var res []string
if pm, ok := m.(*labels.PrefixMatcher); ok { if pm, ok := m.(*labels.PrefixMatcher); ok {
res, err = tuplesByPrefix(pm, tpls) res, err = tuplesByPrefix(pm, tpls)
if err != nil { if err != nil {
return errPostings{err: err} return nil, err
} }
} else { } else {
for i := 0; i < tpls.Len(); i++ { for i := 0; i < tpls.Len(); i++ {
vals, err := tpls.At(i) vals, err := tpls.At(i)
if err != nil { if err != nil {
return errPostings{err: err} return nil, err
} }
if m.Matches(vals[0]) { if m.Matches(vals[0]) {
res = append(res, vals[0]) res = append(res, vals[0])
@ -291,20 +289,20 @@ func (r *postingsReader) selectSingle(m labels.Matcher) Postings {
} }
if len(res) == 0 { if len(res) == 0 {
return emptyPostings return EmptyPostings(), nil
} }
var rit []Postings var rit []Postings
for _, v := range res { for _, v := range res {
it, err := r.index.Postings(m.Name(), v) it, err := index.Postings(m.Name(), v)
if err != nil { if err != nil {
return errPostings{err: err} return nil, err
} }
rit = append(rit, it) rit = append(rit, it)
} }
return Merge(rit...) return Merge(rit...), nil
} }
func mergeStrings(a, b []string) []string { func mergeStrings(a, b []string) []string {
@ -342,11 +340,12 @@ type SeriesSet interface {
Err() error Err() error
} }
type nopSeriesSet struct{} var emptySeriesSet = errSeriesSet{}
func (nopSeriesSet) Next() bool { return false } // EmptySeriesSet returns a series set that's always empty.
func (nopSeriesSet) At() Series { return nil } func EmptySeriesSet() SeriesSet {
func (nopSeriesSet) Err() error { return nil } return emptySeriesSet
}
// mergedSeriesSet takes two series sets as a single series set. The input series sets // mergedSeriesSet takes two series sets as a single series set. The input series sets
// must be sorted and sequential in time, i.e. if they have the same label set, // must be sorted and sequential in time, i.e. if they have the same label set,
@ -418,7 +417,7 @@ func (s *mergedSeriesSet) Next() bool {
return true return true
} }
type chunkSeriesSet interface { type ChunkSeriesSet interface {
Next() bool Next() bool
At() (labels.Labels, []ChunkMeta, Intervals) At() (labels.Labels, []ChunkMeta, Intervals)
Err() error Err() error
@ -438,6 +437,24 @@ type baseChunkSeries struct {
err error err error
} }
// LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet
// over them. It drops chunks based on tombstones in the given reader.
func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) (ChunkSeriesSet, error) {
if tr == nil {
tr = EmptyTombstoneReader()
}
p, absent, err := PostingsForMatchers(ir, ms...)
if err != nil {
return nil, err
}
return &baseChunkSeries{
p: p,
index: ir,
tombstones: tr,
absent: absent,
}, nil
}
func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta, Intervals) { func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta, Intervals) {
return s.lset, s.chks, s.intervals return s.lset, s.chks, s.intervals
} }
@ -448,6 +465,7 @@ func (s *baseChunkSeries) Next() bool {
var ( var (
lset labels.Labels lset labels.Labels
chunks []ChunkMeta chunks []ChunkMeta
err error
) )
Outer: Outer:
for s.p.Next() { for s.p.Next() {
@ -470,7 +488,11 @@ Outer:
s.lset = lset s.lset = lset
s.chks = chunks s.chks = chunks
s.intervals = s.tombstones.Get(s.p.At()) s.intervals, err = s.tombstones.Get(s.p.At())
if err != nil {
s.err = errors.Wrap(err, "get tombstones")
return false
}
if len(s.intervals) > 0 { if len(s.intervals) > 0 {
// Only those chunks that are not entirely deleted. // Only those chunks that are not entirely deleted.
@ -496,7 +518,7 @@ Outer:
// with known chunk references. It filters out chunks that do not fit the // with known chunk references. It filters out chunks that do not fit the
// given time range. // given time range.
type populatedChunkSeries struct { type populatedChunkSeries struct {
set chunkSeriesSet set ChunkSeriesSet
chunks ChunkReader chunks ChunkReader
mint, maxt int64 mint, maxt int64
@ -553,7 +575,7 @@ func (s *populatedChunkSeries) Next() bool {
// blockSeriesSet is a set of series from an inverted index query. // blockSeriesSet is a set of series from an inverted index query.
type blockSeriesSet struct { type blockSeriesSet struct {
set chunkSeriesSet set ChunkSeriesSet
err error err error
cur Series cur Series

View file

@ -454,13 +454,14 @@ Outer:
querier := &blockQuerier{ querier := &blockQuerier{
index: ir, index: ir,
chunks: cr, chunks: cr,
tombstones: newEmptyTombstoneReader(), tombstones: EmptyTombstoneReader(),
mint: c.mint, mint: c.mint,
maxt: c.maxt, maxt: c.maxt,
} }
res := querier.Select(c.ms...) res, err := querier.Select(c.ms...)
require.NoError(t, err)
for { for {
eok, rok := c.exp.Next(), res.Next() eok, rok := c.exp.Next(), res.Next()
@ -505,7 +506,7 @@ func TestBlockQuerierDelete(t *testing.T) {
chunks [][]sample chunks [][]sample
} }
tombstones tombstoneReader tombstones TombstoneReader
queries []query queries []query
}{ }{
data: []struct { data: []struct {
@ -553,13 +554,11 @@ func TestBlockQuerierDelete(t *testing.T) {
}, },
}, },
}, },
tombstones: newTombstoneReader( tombstones: memTombstones{
map[uint64]Intervals{
1: Intervals{{1, 3}}, 1: Intervals{{1, 3}},
2: Intervals{{1, 3}, {6, 10}}, 2: Intervals{{1, 3}, {6, 10}},
3: Intervals{{6, 10}}, 3: Intervals{{6, 10}},
}, },
),
queries: []query{ queries: []query{
{ {
@ -632,7 +631,8 @@ Outer:
maxt: c.maxt, maxt: c.maxt,
} }
res := querier.Select(c.ms...) res, err := querier.Select(c.ms...)
require.NoError(t, err)
for { for {
eok, rok := c.exp.Next(), res.Next() eok, rok := c.exp.Next(), res.Next()
@ -734,7 +734,7 @@ func TestBaseChunkSeries(t *testing.T) {
bcs := &baseChunkSeries{ bcs := &baseChunkSeries{
p: newListPostings(tc.postings), p: newListPostings(tc.postings),
index: mi, index: mi,
tombstones: newEmptyTombstoneReader(), tombstones: EmptyTombstoneReader(),
} }
i := 0 i := 0
@ -1228,7 +1228,7 @@ func BenchmarkMergedSeriesSet(b *testing.B) {
sel = func(sets []SeriesSet) SeriesSet { sel = func(sets []SeriesSet) SeriesSet {
if len(sets) == 0 { if len(sets) == 0 {
return nopSeriesSet{} return EmptySeriesSet()
} }
if len(sets) == 1 { if len(sets) == 1 {
return sets[0] return sets[0]

View file

@ -35,12 +35,17 @@ const (
// TombstoneReader gives access to tombstone intervals by series reference. // TombstoneReader gives access to tombstone intervals by series reference.
type TombstoneReader interface { type TombstoneReader interface {
Get(ref uint64) Intervals // Get returns deletion intervals for the series with the given reference.
Get(ref uint64) (Intervals, error)
// Iter calls the given function for each encountered interval.
Iter(func(uint64, Intervals) error) error
// Close any underlying resources
Close() error Close() error
} }
func writeTombstoneFile(dir string, tr tombstoneReader) error { func writeTombstoneFile(dir string, tr TombstoneReader) error {
path := filepath.Join(dir, tombstoneFilename) path := filepath.Join(dir, tombstoneFilename)
tmp := path + ".tmp" tmp := path + ".tmp"
hash := newCRC32() hash := newCRC32()
@ -67,19 +72,21 @@ func writeTombstoneFile(dir string, tr tombstoneReader) error {
mw := io.MultiWriter(f, hash) mw := io.MultiWriter(f, hash)
for k, v := range tr { tr.Iter(func(ref uint64, ivs Intervals) error {
for _, itv := range v { for _, iv := range ivs {
buf.reset() buf.reset()
buf.putUvarint64(k)
buf.putVarint64(itv.Mint) buf.putUvarint64(ref)
buf.putVarint64(itv.Maxt) buf.putVarint64(iv.Mint)
buf.putVarint64(iv.Maxt)
_, err = mw.Write(buf.get()) _, err = mw.Write(buf.get())
if err != nil { if err != nil {
return err return err
} }
} }
} return nil
})
_, err = f.Write(hash.Sum(nil)) _, err = f.Write(hash.Sum(nil))
if err != nil { if err != nil {
@ -100,7 +107,7 @@ type Stone struct {
intervals Intervals intervals Intervals
} }
func readTombstones(dir string) (tombstoneReader, error) { func readTombstones(dir string) (memTombstones, error) {
b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename))
if err != nil { if err != nil {
return nil, err return nil, err
@ -131,7 +138,8 @@ func readTombstones(dir string) (tombstoneReader, error) {
return nil, errors.New("checksum did not match") return nil, errors.New("checksum did not match")
} }
stonesMap := newEmptyTombstoneReader() stonesMap := memTombstones{}
for d.len() > 0 { for d.len() > 0 {
k := d.uvarint64() k := d.uvarint64()
mint := d.varint64() mint := d.varint64()
@ -143,28 +151,36 @@ func readTombstones(dir string) (tombstoneReader, error) {
stonesMap.add(k, Interval{mint, maxt}) stonesMap.add(k, Interval{mint, maxt})
} }
return newTombstoneReader(stonesMap), nil return stonesMap, nil
} }
type tombstoneReader map[uint64]Intervals type memTombstones map[uint64]Intervals
func newTombstoneReader(ts map[uint64]Intervals) tombstoneReader { var emptyTombstoneReader = memTombstones{}
return tombstoneReader(ts)
// EmptyTombstoneReader returns a TombstoneReader that is always empty.
func EmptyTombstoneReader() TombstoneReader {
return emptyTombstoneReader
} }
func newEmptyTombstoneReader() tombstoneReader { func (t memTombstones) Get(ref uint64) (Intervals, error) {
return tombstoneReader(make(map[uint64]Intervals)) return t[ref], nil
} }
func (t tombstoneReader) Get(ref uint64) Intervals { func (t memTombstones) Iter(f func(uint64, Intervals) error) error {
return t[ref] for ref, ivs := range t {
if err := f(ref, ivs); err != nil {
return err
}
}
return nil
} }
func (t tombstoneReader) add(ref uint64, itv Interval) { func (t memTombstones) add(ref uint64, itv Interval) {
t[ref] = t[ref].add(itv) t[ref] = t[ref].add(itv)
} }
func (tombstoneReader) Close() error { func (memTombstones) Close() error {
return nil return nil
} }

View file

@ -29,7 +29,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) {
ref := uint64(0) ref := uint64(0)
stones := make(map[uint64]Intervals) stones := memTombstones{}
// Generate the tombstones. // Generate the tombstones.
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
ref += uint64(rand.Int31n(10)) + 1 ref += uint64(rand.Int31n(10)) + 1
@ -43,13 +43,13 @@ func TestWriteAndReadbackTombStones(t *testing.T) {
stones[ref] = dranges stones[ref] = dranges
} }
require.NoError(t, writeTombstoneFile(tmpdir, newTombstoneReader(stones))) require.NoError(t, writeTombstoneFile(tmpdir, stones))
restr, err := readTombstones(tmpdir) restr, err := readTombstones(tmpdir)
require.NoError(t, err) require.NoError(t, err)
exptr := newTombstoneReader(stones)
// Compare the two readers. // Compare the two readers.
require.Equal(t, exptr, restr) require.Equal(t, stones, restr)
} }
func TestAddingNewIntervals(t *testing.T) { func TestAddingNewIntervals(t *testing.T) {