diff --git a/block.go b/block.go index 664ead49e..1a45fb971 100644 --- a/block.go +++ b/block.go @@ -434,7 +434,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error { ir := pb.indexr // Choose only valid postings which have chunks in the time-range. - stones := memTombstones{} + stones := NewMemTombstones() var lset labels.Labels var chks []chunks.Meta @@ -450,7 +450,7 @@ Outer: if chk.OverlapsClosedInterval(mint, maxt) { // Delete only until the current values and not beyond. tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime) - stones[p.At()] = Intervals{{tmin, tmax}} + stones.addInterval(p.At(), Interval{tmin, tmax}) continue Outer } } @@ -462,7 +462,7 @@ Outer: err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error { for _, iv := range ivs { - stones.add(id, iv) + stones.addInterval(id, iv) pb.meta.Stats.NumTombstones++ } return nil diff --git a/block_test.go b/block_test.go index 5d02a1617..93f145e64 100644 --- a/block_test.go +++ b/block_test.go @@ -67,7 +67,7 @@ func createEmptyBlock(t *testing.T, dir string, meta *BlockMeta) *Block { testutil.Ok(t, os.MkdirAll(chunkDir(dir), 0777)) - testutil.Ok(t, writeTombstoneFile(dir, EmptyTombstoneReader())) + testutil.Ok(t, writeTombstoneFile(dir, NewMemTombstones())) b, err := OpenBlock(dir, nil) testutil.Ok(t, err) diff --git a/compact.go b/compact.go index aae8cc1c2..1da130057 100644 --- a/compact.go +++ b/compact.go @@ -483,7 +483,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe } // Create an empty tombstones file. - if err := writeTombstoneFile(tmp, EmptyTombstoneReader()); err != nil { + if err := writeTombstoneFile(tmp, NewMemTombstones()); err != nil { return errors.Wrap(err, "write new tombstones file") } diff --git a/db_test.go b/db_test.go index 7ed21aaca..205b0f844 100644 --- a/db_test.go +++ b/db_test.go @@ -780,7 +780,7 @@ func TestTombstoneClean(t *testing.T) { } for _, b := range db.blocks { - testutil.Equals(t, emptyTombstoneReader, b.tombstones) + testutil.Equals(t, NewMemTombstones(), b.tombstones) } } } @@ -809,8 +809,8 @@ func TestTombstoneCleanFail(t *testing.T) { block := createEmptyBlock(t, blockDir, meta) // Add some some fake tombstones to trigger the compaction. - tomb := memTombstones{} - tomb[0] = Intervals{{0, 1}} + tomb := NewMemTombstones() + tomb.addInterval(0, Interval{0, 1}) block.tombstones = tomb db.blocks = append(db.blocks, block) diff --git a/head.go b/head.go index c694d20ca..372842865 100644 --- a/head.go +++ b/head.go @@ -69,7 +69,7 @@ type Head struct { postings *index.MemPostings // postings lists for terms - tombstones memTombstones + tombstones *memTombstones } type headMetrics struct { @@ -189,7 +189,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( values: map[string]stringset{}, symbols: map[string]struct{}{}, postings: index.NewUnorderedMemPostings(), - tombstones: memTombstones{}, + tombstones: NewMemTombstones(), } h.metrics = newHeadMetrics(h, r) @@ -300,7 +300,7 @@ func (h *Head) ReadWAL() error { if itv.Maxt < mint { continue } - h.tombstones.add(s.ref, itv) + h.tombstones.addInterval(s.ref, itv) } } } @@ -605,7 +605,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { return err } for _, s := range stones { - h.tombstones.add(s.ref, s.intervals[0]) + h.tombstones.addInterval(s.ref, s.intervals[0]) } return nil } diff --git a/head_test.go b/head_test.go index 2944bb672..9a8c89364 100644 --- a/head_test.go +++ b/head_test.go @@ -315,7 +315,7 @@ func TestHeadDeleteSimple(t *testing.T) { Outer: for _, c := range cases { // Reset the tombstones. - head.tombstones = memTombstones{} + head.tombstones = NewMemTombstones() // Delete the ranges. for _, r := range c.intervals { diff --git a/querier.go b/querier.go index b5b9ae050..2e048e495 100644 --- a/querier.go +++ b/querier.go @@ -478,7 +478,7 @@ type baseChunkSeries struct { // over them. It drops chunks based on tombstones in the given reader. func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) (ChunkSeriesSet, error) { if tr == nil { - tr = EmptyTombstoneReader() + tr = NewMemTombstones() } p, err := PostingsForMatchers(ir, ms...) if err != nil { diff --git a/querier_test.go b/querier_test.go index 2eb10471c..fd8c7dec5 100644 --- a/querier_test.go +++ b/querier_test.go @@ -457,7 +457,7 @@ Outer: querier := &blockQuerier{ index: ir, chunks: cr, - tombstones: EmptyTombstoneReader(), + tombstones: NewMemTombstones(), mint: c.mint, maxt: c.maxt, @@ -557,12 +557,11 @@ func TestBlockQuerierDelete(t *testing.T) { }, }, }, - tombstones: memTombstones{ + tombstones: &memTombstones{intvlGroups: map[uint64]Intervals{ 1: Intervals{{1, 3}}, 2: Intervals{{1, 3}, {6, 10}}, 3: Intervals{{6, 10}}, - }, - + }}, queries: []query{ { mint: 2, @@ -737,7 +736,7 @@ func TestBaseChunkSeries(t *testing.T) { bcs := &baseChunkSeries{ p: index.NewListPostings(tc.postings), index: mi, - tombstones: EmptyTombstoneReader(), + tombstones: NewMemTombstones(), } i := 0 diff --git a/tombstones.go b/tombstones.go index 8c760cdce..d4a3d0ef1 100644 --- a/tombstones.go +++ b/tombstones.go @@ -16,12 +16,12 @@ package tsdb import ( "encoding/binary" "fmt" + "github.com/pkg/errors" "io" "io/ioutil" "os" "path/filepath" - - "github.com/pkg/errors" + "sync" ) const tombstoneFilename = "tombstones" @@ -107,10 +107,10 @@ type Stone struct { intervals Intervals } -func readTombstones(dir string) (memTombstones, error) { +func readTombstones(dir string) (*memTombstones, error) { b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) if os.IsNotExist(err) { - return memTombstones{}, nil + return NewMemTombstones(), nil } else if err != nil { return nil, err } @@ -140,7 +140,7 @@ func readTombstones(dir string) (memTombstones, error) { return nil, errors.New("checksum did not match") } - stonesMap := memTombstones{} + stonesMap := NewMemTombstones() for d.len() > 0 { k := d.uvarint64() @@ -150,27 +150,31 @@ func readTombstones(dir string) (memTombstones, error) { return nil, d.err() } - stonesMap.add(k, Interval{mint, maxt}) + stonesMap.addInterval(k, Interval{mint, maxt}) } return stonesMap, nil } -type memTombstones map[uint64]Intervals - -var emptyTombstoneReader = memTombstones{} - -// EmptyTombstoneReader returns a TombstoneReader that is always empty. -func EmptyTombstoneReader() TombstoneReader { - return emptyTombstoneReader +type memTombstones struct { + intvlGroups map[uint64]Intervals + mtx sync.RWMutex } -func (t memTombstones) Get(ref uint64) (Intervals, error) { - return t[ref], nil +func NewMemTombstones() *memTombstones { + return &memTombstones{intvlGroups: make(map[uint64]Intervals)} } -func (t memTombstones) Iter(f func(uint64, Intervals) error) error { - for ref, ivs := range t { +func (t *memTombstones) Get(ref uint64) (Intervals, error) { + t.mtx.RLock() + defer t.mtx.RUnlock() + return t.intvlGroups[ref], nil +} + +func (t *memTombstones) Iter(f func(uint64, Intervals) error) error { + t.mtx.RLock() + defer t.mtx.RUnlock() + for ref, ivs := range t.intvlGroups { if err := f(ref, ivs); err != nil { return err } @@ -178,8 +182,13 @@ func (t memTombstones) Iter(f func(uint64, Intervals) error) error { return nil } -func (t memTombstones) add(ref uint64, itv Interval) { - t[ref] = t[ref].add(itv) +// addInterval to an existing memTombstones +func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) { + t.mtx.Lock() + defer t.mtx.Unlock() + for _, itv := range itvs { + t.intvlGroups[ref] = t.intvlGroups[ref].add(itv) + } } func (memTombstones) Close() error { @@ -208,7 +217,7 @@ func (tr Interval) isSubrange(dranges Intervals) bool { // Intervals represents a set of increasing and non-overlapping time-intervals. type Intervals []Interval -// This adds the new time-range to the existing ones. +// add the new time-range to the existing ones. // The existing ones must be sorted. func (itvs Intervals) add(n Interval) Intervals { for i, r := range itvs { diff --git a/tombstones_test.go b/tombstones_test.go index c5c15ae17..62bc06818 100644 --- a/tombstones_test.go +++ b/tombstones_test.go @@ -17,6 +17,7 @@ import ( "io/ioutil" "math/rand" "os" + "sync" "testing" "time" @@ -29,7 +30,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) { ref := uint64(0) - stones := memTombstones{} + stones := NewMemTombstones() // Generate the tombstones. for i := 0; i < 100; i++ { ref += uint64(rand.Int31n(10)) + 1 @@ -40,7 +41,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) { dranges = dranges.add(Interval{mint, mint + rand.Int63n(1000)}) mint += rand.Int63n(1000) + 1 } - stones[ref] = dranges + stones.addInterval(ref, dranges...) } testutil.Ok(t, writeTombstoneFile(tmpdir, stones)) @@ -121,3 +122,25 @@ func TestAddingNewIntervals(t *testing.T) { } return } + +// TestMemTombstonesConcurrency to make sure they are safe to access from different goroutines. +func TestMemTombstonesConcurrency(t *testing.T) { + tomb := NewMemTombstones() + totalRuns := 100 + var wg sync.WaitGroup + wg.Add(2) + + go func() { + for x := 0; x < totalRuns; x++ { + tomb.addInterval(uint64(x), Interval{int64(x), int64(x)}) + } + wg.Done() + }() + go func() { + for x := 0; x < totalRuns; x++ { + tomb.Get(uint64(x)) + } + wg.Done() + }() + wg.Wait() +}