From 84a45cb79a7240305cee69ede7327ca509555c2e Mon Sep 17 00:00:00 2001 From: codwu Date: Fri, 8 Jun 2018 19:52:01 +0800 Subject: [PATCH] add rwmutex to prevent concurrent map read when delete series Signed-off-by: codwu --- block.go | 6 +++--- block_test.go | 2 +- compact.go | 2 +- db_test.go | 6 +++--- head.go | 8 +++---- head_test.go | 2 +- querier.go | 2 +- querier_test.go | 11 +++------- tombstones.go | 52 ++++++++++++++++++++++++++++++---------------- tombstones_test.go | 28 +++++++++++++++++++++++-- 10 files changed, 77 insertions(+), 42 deletions(-) diff --git a/block.go b/block.go index 9ae2adbde0..991c2bdf2e 100644 --- a/block.go +++ b/block.go @@ -424,7 +424,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error { ir := pb.indexr // Choose only valid postings which have chunks in the time-range. - stones := memTombstones{} + stones := NewMemTombstones() var lset labels.Labels var chks []chunks.Meta @@ -440,7 +440,7 @@ Outer: if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) { // Delete only until the current values and not beyond. tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime) - stones[p.At()] = Intervals{{tmin, tmax}} + stones.put(p.At(), Intervals{{tmin, tmax}}) continue Outer } } @@ -452,7 +452,7 @@ Outer: err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error { for _, iv := range ivs { - stones.add(id, iv) + stones.addInterval(id, iv) pb.meta.Stats.NumTombstones++ } return nil diff --git a/block_test.go b/block_test.go index 5d02a16171..93f145e643 100644 --- a/block_test.go +++ b/block_test.go @@ -67,7 +67,7 @@ func createEmptyBlock(t *testing.T, dir string, meta *BlockMeta) *Block { testutil.Ok(t, os.MkdirAll(chunkDir(dir), 0777)) - testutil.Ok(t, writeTombstoneFile(dir, EmptyTombstoneReader())) + testutil.Ok(t, writeTombstoneFile(dir, NewMemTombstones())) b, err := OpenBlock(dir, nil) testutil.Ok(t, err) diff --git a/compact.go b/compact.go index 16a3bd7471..4119c6c39e 100644 --- a/compact.go +++ b/compact.go @@ -472,7 +472,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe } // Create an empty tombstones file. - if err := writeTombstoneFile(tmp, EmptyTombstoneReader()); err != nil { + if err := writeTombstoneFile(tmp, NewMemTombstones()); err != nil { return errors.Wrap(err, "write new tombstones file") } diff --git a/db_test.go b/db_test.go index f0f3142c37..17b33bb549 100644 --- a/db_test.go +++ b/db_test.go @@ -778,7 +778,7 @@ func TestTombstoneClean(t *testing.T) { } for _, b := range db.blocks { - testutil.Equals(t, emptyTombstoneReader, b.tombstones) + testutil.Equals(t, NewMemTombstones(), b.tombstones) } } } @@ -807,8 +807,8 @@ func TestTombstoneCleanFail(t *testing.T) { block := createEmptyBlock(t, blockDir, meta) // Add some some fake tombstones to trigger the compaction. - tomb := memTombstones{} - tomb[0] = Intervals{{0, 1}} + tomb := NewMemTombstones() + tomb.put(0,Intervals{{0, 1}}) block.tombstones = tomb db.blocks = append(db.blocks, block) diff --git a/head.go b/head.go index 2c7c7ec38a..c3fcfa35d5 100644 --- a/head.go +++ b/head.go @@ -69,7 +69,7 @@ type Head struct { postings *index.MemPostings // postings lists for terms - tombstones memTombstones + tombstones *memTombstones } type headMetrics struct { @@ -189,7 +189,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( values: map[string]stringset{}, symbols: map[string]struct{}{}, postings: index.NewUnorderedMemPostings(), - tombstones: memTombstones{}, + tombstones: NewMemTombstones(), } h.metrics = newHeadMetrics(h, r) @@ -300,7 +300,7 @@ func (h *Head) ReadWAL() error { if itv.Maxt < mint { continue } - h.tombstones.add(s.ref, itv) + h.tombstones.addInterval(s.ref, itv) } } } @@ -602,7 +602,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { return err } for _, s := range stones { - h.tombstones.add(s.ref, s.intervals[0]) + h.tombstones.addInterval(s.ref, s.intervals[0]) } return nil } diff --git a/head_test.go b/head_test.go index caf4d652c5..1e49234ea2 100644 --- a/head_test.go +++ b/head_test.go @@ -300,7 +300,7 @@ func TestHeadDeleteSimple(t *testing.T) { Outer: for _, c := range cases { // Reset the tombstones. - head.tombstones = memTombstones{} + head.tombstones = NewMemTombstones() // Delete the ranges. for _, r := range c.intervals { diff --git a/querier.go b/querier.go index b5b9ae050d..2e048e4952 100644 --- a/querier.go +++ b/querier.go @@ -478,7 +478,7 @@ type baseChunkSeries struct { // over them. It drops chunks based on tombstones in the given reader. func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) (ChunkSeriesSet, error) { if tr == nil { - tr = EmptyTombstoneReader() + tr = NewMemTombstones() } p, err := PostingsForMatchers(ir, ms...) if err != nil { diff --git a/querier_test.go b/querier_test.go index 2eb10471c2..7032f2f3a5 100644 --- a/querier_test.go +++ b/querier_test.go @@ -457,7 +457,7 @@ Outer: querier := &blockQuerier{ index: ir, chunks: cr, - tombstones: EmptyTombstoneReader(), + tombstones: NewMemTombstones(), mint: c.mint, maxt: c.maxt, @@ -557,12 +557,7 @@ func TestBlockQuerierDelete(t *testing.T) { }, }, }, - tombstones: memTombstones{ - 1: Intervals{{1, 3}}, - 2: Intervals{{1, 3}, {6, 10}}, - 3: Intervals{{6, 10}}, - }, - + tombstones: NewMemTombstones().put(1, Intervals{{1, 3}}).put(2, Intervals{{1, 3}, {6, 10}}).put(3, Intervals{{6, 10}}), queries: []query{ { mint: 2, @@ -737,7 +732,7 @@ func TestBaseChunkSeries(t *testing.T) { bcs := &baseChunkSeries{ p: index.NewListPostings(tc.postings), index: mi, - tombstones: EmptyTombstoneReader(), + tombstones: NewMemTombstones(), } i := 0 diff --git a/tombstones.go b/tombstones.go index 8c760cdcec..3cf145f5c2 100644 --- a/tombstones.go +++ b/tombstones.go @@ -21,6 +21,8 @@ import ( "os" "path/filepath" + "sync" + "github.com/pkg/errors" ) @@ -107,10 +109,10 @@ type Stone struct { intervals Intervals } -func readTombstones(dir string) (memTombstones, error) { +func readTombstones(dir string) (*memTombstones, error) { b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) if os.IsNotExist(err) { - return memTombstones{}, nil + return NewMemTombstones(), nil } else if err != nil { return nil, err } @@ -140,7 +142,7 @@ func readTombstones(dir string) (memTombstones, error) { return nil, errors.New("checksum did not match") } - stonesMap := memTombstones{} + stonesMap := NewMemTombstones() for d.len() > 0 { k := d.uvarint64() @@ -150,27 +152,31 @@ func readTombstones(dir string) (memTombstones, error) { return nil, d.err() } - stonesMap.add(k, Interval{mint, maxt}) + stonesMap.addInterval(k, Interval{mint, maxt}) } return stonesMap, nil } -type memTombstones map[uint64]Intervals - -var emptyTombstoneReader = memTombstones{} - -// EmptyTombstoneReader returns a TombstoneReader that is always empty. -func EmptyTombstoneReader() TombstoneReader { - return emptyTombstoneReader +type memTombstones struct { + mts map[uint64]Intervals + mtx sync.RWMutex } -func (t memTombstones) Get(ref uint64) (Intervals, error) { - return t[ref], nil +func NewMemTombstones() *memTombstones { + return &memTombstones{mts: make(map[uint64]Intervals)} } -func (t memTombstones) Iter(f func(uint64, Intervals) error) error { - for ref, ivs := range t { +func (t *memTombstones) Get(ref uint64) (Intervals, error) { + t.mtx.RLock() + defer t.mtx.RUnlock() + return t.mts[ref], nil +} + +func (t *memTombstones) Iter(f func(uint64, Intervals) error) error { + t.mtx.Lock() + defer t.mtx.Unlock() + for ref, ivs := range t.mts { if err := f(ref, ivs); err != nil { return err } @@ -178,8 +184,18 @@ func (t memTombstones) Iter(f func(uint64, Intervals) error) error { return nil } -func (t memTombstones) add(ref uint64, itv Interval) { - t[ref] = t[ref].add(itv) +// addInterval to an existing memTombstones +func (t *memTombstones) addInterval(ref uint64, itv Interval) { + t.mtx.Lock() + t.mts[ref] = t.mts[ref].add(itv) + t.mtx.Unlock() +} + +func (t *memTombstones) put(ref uint64, itvs Intervals) *memTombstones { + t.mtx.Lock() + defer t.mtx.Unlock() + t.mts[ref] = itvs + return t } func (memTombstones) Close() error { @@ -208,7 +224,7 @@ func (tr Interval) isSubrange(dranges Intervals) bool { // Intervals represents a set of increasing and non-overlapping time-intervals. type Intervals []Interval -// This adds the new time-range to the existing ones. +// add the new time-range to the existing ones. // The existing ones must be sorted. func (itvs Intervals) add(n Interval) Intervals { for i, r := range itvs { diff --git a/tombstones_test.go b/tombstones_test.go index c5c15ae17c..05483e0953 100644 --- a/tombstones_test.go +++ b/tombstones_test.go @@ -17,6 +17,7 @@ import ( "io/ioutil" "math/rand" "os" + "sync" "testing" "time" @@ -29,7 +30,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) { ref := uint64(0) - stones := memTombstones{} + stones := NewMemTombstones() // Generate the tombstones. for i := 0; i < 100; i++ { ref += uint64(rand.Int31n(10)) + 1 @@ -40,7 +41,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) { dranges = dranges.add(Interval{mint, mint + rand.Int63n(1000)}) mint += rand.Int63n(1000) + 1 } - stones[ref] = dranges + stones.put(ref, dranges) } testutil.Ok(t, writeTombstoneFile(tmpdir, stones)) @@ -121,3 +122,26 @@ func TestAddingNewIntervals(t *testing.T) { } return } + +// TestMemTombstonesConcurrency to make sure they are safe to access from different goroutines. +func TestMemTombstonesConcurrency(t *testing.T) { + tomb := NewMemTombstones() + totalRuns := 100 + var wg sync.WaitGroup + wg.Add(2) + + go func() { + for x := 0; x < totalRuns; x++ { + tomb.put(uint64(x), Intervals{{int64(x), int64(x)}}) + tomb.addInterval(uint64(x), Interval{int64(x), int64(x)}) + } + wg.Done() + }() + go func() { + for x := 0; x < totalRuns; x++ { + tomb.Get(uint64(x)) + } + wg.Done() + }() + wg.Wait() +}