Merge pull request #330 from codwu/tsdb-delete

add rwmutex to prevent concurrent map read when delete series
This commit is contained in:
Fabian Reinartz 2018-07-11 13:21:26 +02:00 committed by GitHub
commit 99a2c4314f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 72 additions and 41 deletions

View file

@ -434,7 +434,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
ir := pb.indexr ir := pb.indexr
// Choose only valid postings which have chunks in the time-range. // Choose only valid postings which have chunks in the time-range.
stones := memTombstones{} stones := NewMemTombstones()
var lset labels.Labels var lset labels.Labels
var chks []chunks.Meta var chks []chunks.Meta
@ -450,7 +450,7 @@ Outer:
if chk.OverlapsClosedInterval(mint, maxt) { if chk.OverlapsClosedInterval(mint, maxt) {
// Delete only until the current values and not beyond. // Delete only until the current values and not beyond.
tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime) tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime)
stones[p.At()] = Intervals{{tmin, tmax}} stones.addInterval(p.At(), Interval{tmin, tmax})
continue Outer continue Outer
} }
} }
@ -462,7 +462,7 @@ Outer:
err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error { err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
for _, iv := range ivs { for _, iv := range ivs {
stones.add(id, iv) stones.addInterval(id, iv)
pb.meta.Stats.NumTombstones++ pb.meta.Stats.NumTombstones++
} }
return nil return nil

View file

@ -67,7 +67,7 @@ func createEmptyBlock(t *testing.T, dir string, meta *BlockMeta) *Block {
testutil.Ok(t, os.MkdirAll(chunkDir(dir), 0777)) testutil.Ok(t, os.MkdirAll(chunkDir(dir), 0777))
testutil.Ok(t, writeTombstoneFile(dir, EmptyTombstoneReader())) testutil.Ok(t, writeTombstoneFile(dir, NewMemTombstones()))
b, err := OpenBlock(dir, nil) b, err := OpenBlock(dir, nil)
testutil.Ok(t, err) testutil.Ok(t, err)

View file

@ -483,7 +483,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
} }
// Create an empty tombstones file. // Create an empty tombstones file.
if err := writeTombstoneFile(tmp, EmptyTombstoneReader()); err != nil { if err := writeTombstoneFile(tmp, NewMemTombstones()); err != nil {
return errors.Wrap(err, "write new tombstones file") return errors.Wrap(err, "write new tombstones file")
} }

View file

@ -780,7 +780,7 @@ func TestTombstoneClean(t *testing.T) {
} }
for _, b := range db.blocks { for _, b := range db.blocks {
testutil.Equals(t, emptyTombstoneReader, b.tombstones) testutil.Equals(t, NewMemTombstones(), b.tombstones)
} }
} }
} }
@ -809,8 +809,8 @@ func TestTombstoneCleanFail(t *testing.T) {
block := createEmptyBlock(t, blockDir, meta) block := createEmptyBlock(t, blockDir, meta)
// Add some some fake tombstones to trigger the compaction. // Add some some fake tombstones to trigger the compaction.
tomb := memTombstones{} tomb := NewMemTombstones()
tomb[0] = Intervals{{0, 1}} tomb.addInterval(0, Interval{0, 1})
block.tombstones = tomb block.tombstones = tomb
db.blocks = append(db.blocks, block) db.blocks = append(db.blocks, block)

View file

@ -69,7 +69,7 @@ type Head struct {
postings *index.MemPostings // postings lists for terms postings *index.MemPostings // postings lists for terms
tombstones memTombstones tombstones *memTombstones
} }
type headMetrics struct { type headMetrics struct {
@ -189,7 +189,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
values: map[string]stringset{}, values: map[string]stringset{},
symbols: map[string]struct{}{}, symbols: map[string]struct{}{},
postings: index.NewUnorderedMemPostings(), postings: index.NewUnorderedMemPostings(),
tombstones: memTombstones{}, tombstones: NewMemTombstones(),
} }
h.metrics = newHeadMetrics(h, r) h.metrics = newHeadMetrics(h, r)
@ -300,7 +300,7 @@ func (h *Head) ReadWAL() error {
if itv.Maxt < mint { if itv.Maxt < mint {
continue continue
} }
h.tombstones.add(s.ref, itv) h.tombstones.addInterval(s.ref, itv)
} }
} }
} }
@ -605,7 +605,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
return err return err
} }
for _, s := range stones { for _, s := range stones {
h.tombstones.add(s.ref, s.intervals[0]) h.tombstones.addInterval(s.ref, s.intervals[0])
} }
return nil return nil
} }

View file

@ -315,7 +315,7 @@ func TestHeadDeleteSimple(t *testing.T) {
Outer: Outer:
for _, c := range cases { for _, c := range cases {
// Reset the tombstones. // Reset the tombstones.
head.tombstones = memTombstones{} head.tombstones = NewMemTombstones()
// Delete the ranges. // Delete the ranges.
for _, r := range c.intervals { for _, r := range c.intervals {

View file

@ -478,7 +478,7 @@ type baseChunkSeries struct {
// over them. It drops chunks based on tombstones in the given reader. // over them. It drops chunks based on tombstones in the given reader.
func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) (ChunkSeriesSet, error) { func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) (ChunkSeriesSet, error) {
if tr == nil { if tr == nil {
tr = EmptyTombstoneReader() tr = NewMemTombstones()
} }
p, err := PostingsForMatchers(ir, ms...) p, err := PostingsForMatchers(ir, ms...)
if err != nil { if err != nil {

View file

@ -457,7 +457,7 @@ Outer:
querier := &blockQuerier{ querier := &blockQuerier{
index: ir, index: ir,
chunks: cr, chunks: cr,
tombstones: EmptyTombstoneReader(), tombstones: NewMemTombstones(),
mint: c.mint, mint: c.mint,
maxt: c.maxt, maxt: c.maxt,
@ -557,12 +557,11 @@ func TestBlockQuerierDelete(t *testing.T) {
}, },
}, },
}, },
tombstones: memTombstones{ tombstones: &memTombstones{intvlGroups: map[uint64]Intervals{
1: Intervals{{1, 3}}, 1: Intervals{{1, 3}},
2: Intervals{{1, 3}, {6, 10}}, 2: Intervals{{1, 3}, {6, 10}},
3: Intervals{{6, 10}}, 3: Intervals{{6, 10}},
}, }},
queries: []query{ queries: []query{
{ {
mint: 2, mint: 2,
@ -737,7 +736,7 @@ func TestBaseChunkSeries(t *testing.T) {
bcs := &baseChunkSeries{ bcs := &baseChunkSeries{
p: index.NewListPostings(tc.postings), p: index.NewListPostings(tc.postings),
index: mi, index: mi,
tombstones: EmptyTombstoneReader(), tombstones: NewMemTombstones(),
} }
i := 0 i := 0

View file

@ -16,12 +16,12 @@ package tsdb
import ( import (
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"github.com/pkg/errors"
"io" "io"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"sync"
"github.com/pkg/errors"
) )
const tombstoneFilename = "tombstones" const tombstoneFilename = "tombstones"
@ -107,10 +107,10 @@ type Stone struct {
intervals Intervals intervals Intervals
} }
func readTombstones(dir string) (memTombstones, error) { func readTombstones(dir string) (*memTombstones, error) {
b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename))
if os.IsNotExist(err) { if os.IsNotExist(err) {
return memTombstones{}, nil return NewMemTombstones(), nil
} else if err != nil { } else if err != nil {
return nil, err return nil, err
} }
@ -140,7 +140,7 @@ func readTombstones(dir string) (memTombstones, error) {
return nil, errors.New("checksum did not match") return nil, errors.New("checksum did not match")
} }
stonesMap := memTombstones{} stonesMap := NewMemTombstones()
for d.len() > 0 { for d.len() > 0 {
k := d.uvarint64() k := d.uvarint64()
@ -150,27 +150,31 @@ func readTombstones(dir string) (memTombstones, error) {
return nil, d.err() return nil, d.err()
} }
stonesMap.add(k, Interval{mint, maxt}) stonesMap.addInterval(k, Interval{mint, maxt})
} }
return stonesMap, nil return stonesMap, nil
} }
type memTombstones map[uint64]Intervals type memTombstones struct {
intvlGroups map[uint64]Intervals
var emptyTombstoneReader = memTombstones{} mtx sync.RWMutex
// EmptyTombstoneReader returns a TombstoneReader that is always empty.
func EmptyTombstoneReader() TombstoneReader {
return emptyTombstoneReader
} }
func (t memTombstones) Get(ref uint64) (Intervals, error) { func NewMemTombstones() *memTombstones {
return t[ref], nil return &memTombstones{intvlGroups: make(map[uint64]Intervals)}
} }
func (t memTombstones) Iter(f func(uint64, Intervals) error) error { func (t *memTombstones) Get(ref uint64) (Intervals, error) {
for ref, ivs := range t { t.mtx.RLock()
defer t.mtx.RUnlock()
return t.intvlGroups[ref], nil
}
func (t *memTombstones) Iter(f func(uint64, Intervals) error) error {
t.mtx.RLock()
defer t.mtx.RUnlock()
for ref, ivs := range t.intvlGroups {
if err := f(ref, ivs); err != nil { if err := f(ref, ivs); err != nil {
return err return err
} }
@ -178,8 +182,13 @@ func (t memTombstones) Iter(f func(uint64, Intervals) error) error {
return nil return nil
} }
func (t memTombstones) add(ref uint64, itv Interval) { // addInterval to an existing memTombstones
t[ref] = t[ref].add(itv) func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) {
t.mtx.Lock()
defer t.mtx.Unlock()
for _, itv := range itvs {
t.intvlGroups[ref] = t.intvlGroups[ref].add(itv)
}
} }
func (memTombstones) Close() error { func (memTombstones) Close() error {
@ -208,7 +217,7 @@ func (tr Interval) isSubrange(dranges Intervals) bool {
// Intervals represents a set of increasing and non-overlapping time-intervals. // Intervals represents a set of increasing and non-overlapping time-intervals.
type Intervals []Interval type Intervals []Interval
// This adds the new time-range to the existing ones. // add the new time-range to the existing ones.
// The existing ones must be sorted. // The existing ones must be sorted.
func (itvs Intervals) add(n Interval) Intervals { func (itvs Intervals) add(n Interval) Intervals {
for i, r := range itvs { for i, r := range itvs {

View file

@ -17,6 +17,7 @@ import (
"io/ioutil" "io/ioutil"
"math/rand" "math/rand"
"os" "os"
"sync"
"testing" "testing"
"time" "time"
@ -29,7 +30,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) {
ref := uint64(0) ref := uint64(0)
stones := memTombstones{} stones := NewMemTombstones()
// Generate the tombstones. // Generate the tombstones.
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
ref += uint64(rand.Int31n(10)) + 1 ref += uint64(rand.Int31n(10)) + 1
@ -40,7 +41,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) {
dranges = dranges.add(Interval{mint, mint + rand.Int63n(1000)}) dranges = dranges.add(Interval{mint, mint + rand.Int63n(1000)})
mint += rand.Int63n(1000) + 1 mint += rand.Int63n(1000) + 1
} }
stones[ref] = dranges stones.addInterval(ref, dranges...)
} }
testutil.Ok(t, writeTombstoneFile(tmpdir, stones)) testutil.Ok(t, writeTombstoneFile(tmpdir, stones))
@ -121,3 +122,25 @@ func TestAddingNewIntervals(t *testing.T) {
} }
return return
} }
// TestMemTombstonesConcurrency to make sure they are safe to access from different goroutines.
func TestMemTombstonesConcurrency(t *testing.T) {
tomb := NewMemTombstones()
totalRuns := 100
var wg sync.WaitGroup
wg.Add(2)
go func() {
for x := 0; x < totalRuns; x++ {
tomb.addInterval(uint64(x), Interval{int64(x), int64(x)})
}
wg.Done()
}()
go func() {
for x := 0; x < totalRuns; x++ {
tomb.Get(uint64(x))
}
wg.Done()
}()
wg.Wait()
}