From 84a45cb79a7240305cee69ede7327ca509555c2e Mon Sep 17 00:00:00 2001 From: codwu Date: Fri, 8 Jun 2018 19:52:01 +0800 Subject: [PATCH 1/4] add rwmutex to prevent concurrent map read when delete series Signed-off-by: codwu --- block.go | 6 +++--- block_test.go | 2 +- compact.go | 2 +- db_test.go | 6 +++--- head.go | 8 +++---- head_test.go | 2 +- querier.go | 2 +- querier_test.go | 11 +++------- tombstones.go | 52 ++++++++++++++++++++++++++++++---------------- tombstones_test.go | 28 +++++++++++++++++++++++-- 10 files changed, 77 insertions(+), 42 deletions(-) diff --git a/block.go b/block.go index 9ae2adbde..991c2bdf2 100644 --- a/block.go +++ b/block.go @@ -424,7 +424,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error { ir := pb.indexr // Choose only valid postings which have chunks in the time-range. - stones := memTombstones{} + stones := NewMemTombstones() var lset labels.Labels var chks []chunks.Meta @@ -440,7 +440,7 @@ Outer: if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) { // Delete only until the current values and not beyond. tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime) - stones[p.At()] = Intervals{{tmin, tmax}} + stones.put(p.At(), Intervals{{tmin, tmax}}) continue Outer } } @@ -452,7 +452,7 @@ Outer: err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error { for _, iv := range ivs { - stones.add(id, iv) + stones.addInterval(id, iv) pb.meta.Stats.NumTombstones++ } return nil diff --git a/block_test.go b/block_test.go index 5d02a1617..93f145e64 100644 --- a/block_test.go +++ b/block_test.go @@ -67,7 +67,7 @@ func createEmptyBlock(t *testing.T, dir string, meta *BlockMeta) *Block { testutil.Ok(t, os.MkdirAll(chunkDir(dir), 0777)) - testutil.Ok(t, writeTombstoneFile(dir, EmptyTombstoneReader())) + testutil.Ok(t, writeTombstoneFile(dir, NewMemTombstones())) b, err := OpenBlock(dir, nil) testutil.Ok(t, err) diff --git a/compact.go b/compact.go index 16a3bd747..4119c6c39 100644 --- a/compact.go +++ b/compact.go @@ -472,7 +472,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe } // Create an empty tombstones file. - if err := writeTombstoneFile(tmp, EmptyTombstoneReader()); err != nil { + if err := writeTombstoneFile(tmp, NewMemTombstones()); err != nil { return errors.Wrap(err, "write new tombstones file") } diff --git a/db_test.go b/db_test.go index f0f3142c3..17b33bb54 100644 --- a/db_test.go +++ b/db_test.go @@ -778,7 +778,7 @@ func TestTombstoneClean(t *testing.T) { } for _, b := range db.blocks { - testutil.Equals(t, emptyTombstoneReader, b.tombstones) + testutil.Equals(t, NewMemTombstones(), b.tombstones) } } } @@ -807,8 +807,8 @@ func TestTombstoneCleanFail(t *testing.T) { block := createEmptyBlock(t, blockDir, meta) // Add some some fake tombstones to trigger the compaction. - tomb := memTombstones{} - tomb[0] = Intervals{{0, 1}} + tomb := NewMemTombstones() + tomb.put(0,Intervals{{0, 1}}) block.tombstones = tomb db.blocks = append(db.blocks, block) diff --git a/head.go b/head.go index 2c7c7ec38..c3fcfa35d 100644 --- a/head.go +++ b/head.go @@ -69,7 +69,7 @@ type Head struct { postings *index.MemPostings // postings lists for terms - tombstones memTombstones + tombstones *memTombstones } type headMetrics struct { @@ -189,7 +189,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( values: map[string]stringset{}, symbols: map[string]struct{}{}, postings: index.NewUnorderedMemPostings(), - tombstones: memTombstones{}, + tombstones: NewMemTombstones(), } h.metrics = newHeadMetrics(h, r) @@ -300,7 +300,7 @@ func (h *Head) ReadWAL() error { if itv.Maxt < mint { continue } - h.tombstones.add(s.ref, itv) + h.tombstones.addInterval(s.ref, itv) } } } @@ -602,7 +602,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { return err } for _, s := range stones { - h.tombstones.add(s.ref, s.intervals[0]) + h.tombstones.addInterval(s.ref, s.intervals[0]) } return nil } diff --git a/head_test.go b/head_test.go index caf4d652c..1e49234ea 100644 --- a/head_test.go +++ b/head_test.go @@ -300,7 +300,7 @@ func TestHeadDeleteSimple(t *testing.T) { Outer: for _, c := range cases { // Reset the tombstones. - head.tombstones = memTombstones{} + head.tombstones = NewMemTombstones() // Delete the ranges. for _, r := range c.intervals { diff --git a/querier.go b/querier.go index b5b9ae050..2e048e495 100644 --- a/querier.go +++ b/querier.go @@ -478,7 +478,7 @@ type baseChunkSeries struct { // over them. It drops chunks based on tombstones in the given reader. func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) (ChunkSeriesSet, error) { if tr == nil { - tr = EmptyTombstoneReader() + tr = NewMemTombstones() } p, err := PostingsForMatchers(ir, ms...) if err != nil { diff --git a/querier_test.go b/querier_test.go index 2eb10471c..7032f2f3a 100644 --- a/querier_test.go +++ b/querier_test.go @@ -457,7 +457,7 @@ Outer: querier := &blockQuerier{ index: ir, chunks: cr, - tombstones: EmptyTombstoneReader(), + tombstones: NewMemTombstones(), mint: c.mint, maxt: c.maxt, @@ -557,12 +557,7 @@ func TestBlockQuerierDelete(t *testing.T) { }, }, }, - tombstones: memTombstones{ - 1: Intervals{{1, 3}}, - 2: Intervals{{1, 3}, {6, 10}}, - 3: Intervals{{6, 10}}, - }, - + tombstones: NewMemTombstones().put(1, Intervals{{1, 3}}).put(2, Intervals{{1, 3}, {6, 10}}).put(3, Intervals{{6, 10}}), queries: []query{ { mint: 2, @@ -737,7 +732,7 @@ func TestBaseChunkSeries(t *testing.T) { bcs := &baseChunkSeries{ p: index.NewListPostings(tc.postings), index: mi, - tombstones: EmptyTombstoneReader(), + tombstones: NewMemTombstones(), } i := 0 diff --git a/tombstones.go b/tombstones.go index 8c760cdce..3cf145f5c 100644 --- a/tombstones.go +++ b/tombstones.go @@ -21,6 +21,8 @@ import ( "os" "path/filepath" + "sync" + "github.com/pkg/errors" ) @@ -107,10 +109,10 @@ type Stone struct { intervals Intervals } -func readTombstones(dir string) (memTombstones, error) { +func readTombstones(dir string) (*memTombstones, error) { b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) if os.IsNotExist(err) { - return memTombstones{}, nil + return NewMemTombstones(), nil } else if err != nil { return nil, err } @@ -140,7 +142,7 @@ func readTombstones(dir string) (memTombstones, error) { return nil, errors.New("checksum did not match") } - stonesMap := memTombstones{} + stonesMap := NewMemTombstones() for d.len() > 0 { k := d.uvarint64() @@ -150,27 +152,31 @@ func readTombstones(dir string) (memTombstones, error) { return nil, d.err() } - stonesMap.add(k, Interval{mint, maxt}) + stonesMap.addInterval(k, Interval{mint, maxt}) } return stonesMap, nil } -type memTombstones map[uint64]Intervals - -var emptyTombstoneReader = memTombstones{} - -// EmptyTombstoneReader returns a TombstoneReader that is always empty. -func EmptyTombstoneReader() TombstoneReader { - return emptyTombstoneReader +type memTombstones struct { + mts map[uint64]Intervals + mtx sync.RWMutex } -func (t memTombstones) Get(ref uint64) (Intervals, error) { - return t[ref], nil +func NewMemTombstones() *memTombstones { + return &memTombstones{mts: make(map[uint64]Intervals)} } -func (t memTombstones) Iter(f func(uint64, Intervals) error) error { - for ref, ivs := range t { +func (t *memTombstones) Get(ref uint64) (Intervals, error) { + t.mtx.RLock() + defer t.mtx.RUnlock() + return t.mts[ref], nil +} + +func (t *memTombstones) Iter(f func(uint64, Intervals) error) error { + t.mtx.Lock() + defer t.mtx.Unlock() + for ref, ivs := range t.mts { if err := f(ref, ivs); err != nil { return err } @@ -178,8 +184,18 @@ func (t memTombstones) Iter(f func(uint64, Intervals) error) error { return nil } -func (t memTombstones) add(ref uint64, itv Interval) { - t[ref] = t[ref].add(itv) +// addInterval to an existing memTombstones +func (t *memTombstones) addInterval(ref uint64, itv Interval) { + t.mtx.Lock() + t.mts[ref] = t.mts[ref].add(itv) + t.mtx.Unlock() +} + +func (t *memTombstones) put(ref uint64, itvs Intervals) *memTombstones { + t.mtx.Lock() + defer t.mtx.Unlock() + t.mts[ref] = itvs + return t } func (memTombstones) Close() error { @@ -208,7 +224,7 @@ func (tr Interval) isSubrange(dranges Intervals) bool { // Intervals represents a set of increasing and non-overlapping time-intervals. type Intervals []Interval -// This adds the new time-range to the existing ones. +// add the new time-range to the existing ones. // The existing ones must be sorted. func (itvs Intervals) add(n Interval) Intervals { for i, r := range itvs { diff --git a/tombstones_test.go b/tombstones_test.go index c5c15ae17..05483e095 100644 --- a/tombstones_test.go +++ b/tombstones_test.go @@ -17,6 +17,7 @@ import ( "io/ioutil" "math/rand" "os" + "sync" "testing" "time" @@ -29,7 +30,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) { ref := uint64(0) - stones := memTombstones{} + stones := NewMemTombstones() // Generate the tombstones. for i := 0; i < 100; i++ { ref += uint64(rand.Int31n(10)) + 1 @@ -40,7 +41,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) { dranges = dranges.add(Interval{mint, mint + rand.Int63n(1000)}) mint += rand.Int63n(1000) + 1 } - stones[ref] = dranges + stones.put(ref, dranges) } testutil.Ok(t, writeTombstoneFile(tmpdir, stones)) @@ -121,3 +122,26 @@ func TestAddingNewIntervals(t *testing.T) { } return } + +// TestMemTombstonesConcurrency to make sure they are safe to access from different goroutines. +func TestMemTombstonesConcurrency(t *testing.T) { + tomb := NewMemTombstones() + totalRuns := 100 + var wg sync.WaitGroup + wg.Add(2) + + go func() { + for x := 0; x < totalRuns; x++ { + tomb.put(uint64(x), Intervals{{int64(x), int64(x)}}) + tomb.addInterval(uint64(x), Interval{int64(x), int64(x)}) + } + wg.Done() + }() + go func() { + for x := 0; x < totalRuns; x++ { + tomb.Get(uint64(x)) + } + wg.Done() + }() + wg.Wait() +} From cd145c90d58a304b50c0f3575a02ddad37368d0b Mon Sep 17 00:00:00 2001 From: codwu Date: Mon, 25 Jun 2018 21:52:11 +0800 Subject: [PATCH 2/4] remove `put` function and use RLock in `Iter` function Signed-off-by: codwu --- block.go | 2 +- db_test.go | 2 +- querier_test.go | 2 +- tombstones.go | 20 +++++++------------- tombstones_test.go | 3 +-- 5 files changed, 11 insertions(+), 18 deletions(-) diff --git a/block.go b/block.go index 991c2bdf2..1352553ae 100644 --- a/block.go +++ b/block.go @@ -440,7 +440,7 @@ Outer: if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) { // Delete only until the current values and not beyond. tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime) - stones.put(p.At(), Intervals{{tmin, tmax}}) + stones.addInterval(p.At(), Interval{tmin, tmax}) continue Outer } } diff --git a/db_test.go b/db_test.go index 17b33bb54..f62bf0c4a 100644 --- a/db_test.go +++ b/db_test.go @@ -808,7 +808,7 @@ func TestTombstoneCleanFail(t *testing.T) { // Add some some fake tombstones to trigger the compaction. tomb := NewMemTombstones() - tomb.put(0,Intervals{{0, 1}}) + tomb.addInterval(0, Interval{0, 1}) block.tombstones = tomb db.blocks = append(db.blocks, block) diff --git a/querier_test.go b/querier_test.go index 7032f2f3a..51b01de1c 100644 --- a/querier_test.go +++ b/querier_test.go @@ -557,7 +557,7 @@ func TestBlockQuerierDelete(t *testing.T) { }, }, }, - tombstones: NewMemTombstones().put(1, Intervals{{1, 3}}).put(2, Intervals{{1, 3}, {6, 10}}).put(3, Intervals{{6, 10}}), + tombstones: NewMemTombstones().addInterval(1, Interval{1, 3}).addInterval(2, Interval{1, 3}, Interval{6, 10}).addInterval(3, Interval{6, 10}), queries: []query{ { mint: 2, diff --git a/tombstones.go b/tombstones.go index 3cf145f5c..733a81afb 100644 --- a/tombstones.go +++ b/tombstones.go @@ -16,14 +16,12 @@ package tsdb import ( "encoding/binary" "fmt" + "github.com/pkg/errors" "io" "io/ioutil" "os" "path/filepath" - "sync" - - "github.com/pkg/errors" ) const tombstoneFilename = "tombstones" @@ -174,8 +172,8 @@ func (t *memTombstones) Get(ref uint64) (Intervals, error) { } func (t *memTombstones) Iter(f func(uint64, Intervals) error) error { - t.mtx.Lock() - defer t.mtx.Unlock() + t.mtx.RLock() + defer t.mtx.RUnlock() for ref, ivs := range t.mts { if err := f(ref, ivs); err != nil { return err @@ -185,16 +183,12 @@ func (t *memTombstones) Iter(f func(uint64, Intervals) error) error { } // addInterval to an existing memTombstones -func (t *memTombstones) addInterval(ref uint64, itv Interval) { - t.mtx.Lock() - t.mts[ref] = t.mts[ref].add(itv) - t.mtx.Unlock() -} - -func (t *memTombstones) put(ref uint64, itvs Intervals) *memTombstones { +func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) *memTombstones { t.mtx.Lock() defer t.mtx.Unlock() - t.mts[ref] = itvs + for _, itv := range itvs { + t.mts[ref] = t.mts[ref].add(itv) + } return t } diff --git a/tombstones_test.go b/tombstones_test.go index 05483e095..62bc06818 100644 --- a/tombstones_test.go +++ b/tombstones_test.go @@ -41,7 +41,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) { dranges = dranges.add(Interval{mint, mint + rand.Int63n(1000)}) mint += rand.Int63n(1000) + 1 } - stones.put(ref, dranges) + stones.addInterval(ref, dranges...) } testutil.Ok(t, writeTombstoneFile(tmpdir, stones)) @@ -132,7 +132,6 @@ func TestMemTombstonesConcurrency(t *testing.T) { go func() { for x := 0; x < totalRuns; x++ { - tomb.put(uint64(x), Intervals{{int64(x), int64(x)}}) tomb.addInterval(uint64(x), Interval{int64(x), int64(x)}) } wg.Done() From e4444ca48c0aa8415aeb29b27a23fb4d7cd211ec Mon Sep 17 00:00:00 2001 From: codwu Date: Fri, 6 Jul 2018 20:30:27 +0800 Subject: [PATCH 3/4] update `addInterval` function and test. Signed-off-by: codwu --- querier_test.go | 6 +++++- tombstones.go | 3 +-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/querier_test.go b/querier_test.go index 51b01de1c..d3ddec407 100644 --- a/querier_test.go +++ b/querier_test.go @@ -557,7 +557,11 @@ func TestBlockQuerierDelete(t *testing.T) { }, }, }, - tombstones: NewMemTombstones().addInterval(1, Interval{1, 3}).addInterval(2, Interval{1, 3}, Interval{6, 10}).addInterval(3, Interval{6, 10}), + tombstones: &memTombstones{mts: map[uint64]Intervals{ + 1: Intervals{{1, 3}}, + 2: Intervals{{1, 3}, {6, 10}}, + 3: Intervals{{6, 10}}, + }}, queries: []query{ { mint: 2, diff --git a/tombstones.go b/tombstones.go index 733a81afb..03389de72 100644 --- a/tombstones.go +++ b/tombstones.go @@ -183,13 +183,12 @@ func (t *memTombstones) Iter(f func(uint64, Intervals) error) error { } // addInterval to an existing memTombstones -func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) *memTombstones { +func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) { t.mtx.Lock() defer t.mtx.Unlock() for _, itv := range itvs { t.mts[ref] = t.mts[ref].add(itv) } - return t } func (memTombstones) Close() error { From bc6ef0b94e0a58d302480550635a2d4e7c01bf78 Mon Sep 17 00:00:00 2001 From: codwu Date: Tue, 10 Jul 2018 21:24:13 +0800 Subject: [PATCH 4/4] rename `mts` to `intvlGroups` Signed-off-by: codwu --- querier_test.go | 2 +- tombstones.go | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/querier_test.go b/querier_test.go index d3ddec407..fd8c7dec5 100644 --- a/querier_test.go +++ b/querier_test.go @@ -557,7 +557,7 @@ func TestBlockQuerierDelete(t *testing.T) { }, }, }, - tombstones: &memTombstones{mts: map[uint64]Intervals{ + tombstones: &memTombstones{intvlGroups: map[uint64]Intervals{ 1: Intervals{{1, 3}}, 2: Intervals{{1, 3}, {6, 10}}, 3: Intervals{{6, 10}}, diff --git a/tombstones.go b/tombstones.go index 03389de72..d4a3d0ef1 100644 --- a/tombstones.go +++ b/tombstones.go @@ -157,24 +157,24 @@ func readTombstones(dir string) (*memTombstones, error) { } type memTombstones struct { - mts map[uint64]Intervals - mtx sync.RWMutex + intvlGroups map[uint64]Intervals + mtx sync.RWMutex } func NewMemTombstones() *memTombstones { - return &memTombstones{mts: make(map[uint64]Intervals)} + return &memTombstones{intvlGroups: make(map[uint64]Intervals)} } func (t *memTombstones) Get(ref uint64) (Intervals, error) { t.mtx.RLock() defer t.mtx.RUnlock() - return t.mts[ref], nil + return t.intvlGroups[ref], nil } func (t *memTombstones) Iter(f func(uint64, Intervals) error) error { t.mtx.RLock() defer t.mtx.RUnlock() - for ref, ivs := range t.mts { + for ref, ivs := range t.intvlGroups { if err := f(ref, ivs); err != nil { return err } @@ -187,7 +187,7 @@ func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) { t.mtx.Lock() defer t.mtx.Unlock() for _, itv := range itvs { - t.mts[ref] = t.mts[ref].add(itv) + t.intvlGroups[ref] = t.intvlGroups[ref].add(itv) } }