From c2cfa656d71da015f5a6adbefbf80e1bff02dd67 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Sat, 11 Jul 2020 19:41:02 +0100 Subject: [PATCH] Do not merge: Test showing race of compact vs append. Signed-off-by: Bartlomiej Plotka --- tsdb/compact.go | 8 ++++++- tsdb/head.go | 7 ++++++ tsdb/head_test.go | 57 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 71 insertions(+), 1 deletion(-) diff --git a/tsdb/compact.go b/tsdb/compact.go index acbdbf6443..a2ace70bd0 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -656,7 +656,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, defer func() { var merr tsdb_errors.MultiError merr.Add(err) - merr.Add(closeAll(closers)) + if cerr := closeAll(closers); cerr != nil { + merr.Add(errors.Wrap(err, "close")) + } err = merr.Err() c.metrics.populatingBlocks.Set(0) }() @@ -707,7 +709,11 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, all = indexr.SortedPostings(all) s := newCompactionSeriesSet(indexr, chunkr, tombsr, all) + syms := indexr.Symbols() + // When we are here unblock append as we already got postings with extra one, and symbols + // with missing symbol for new posting. + testSimulation1.Unlock() if i == 0 { set = s diff --git a/tsdb/head.go b/tsdb/head.go index 3b1d4ad057..42f5dba0fe 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -46,6 +46,10 @@ var ( ErrInvalidSample = errors.New("invalid sample") ) +// TEST! +var testSimulation1 sync.Mutex +var testSimulation2 sync.Mutex + // Head handles reads and writes of time series data within a time window. type Head struct { // Keep all 64bit atomically accessed variables at the top of this struct. @@ -1694,6 +1698,9 @@ func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSerie h.postings.Add(id, lset) + testSimulation1.Lock() + testSimulation1.Unlock() + h.symMtx.Lock() defer h.symMtx.Unlock() diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 6c5e466587..fbf397a39c 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -14,6 +14,7 @@ package tsdb import ( + "context" "fmt" "io/ioutil" "math" @@ -24,6 +25,7 @@ import ( "strconv" "sync" "testing" + "time" "github.com/pkg/errors" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" @@ -1875,3 +1877,58 @@ func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) { }) } } + +// Regression test showing race between compact and append. +func TestHeadCompactionRace(t *testing.T) { + db, closeFn := openTestDB(t, &Options{ + RetentionDuration: 100000000, + NoLockfile: true, + MinBlockDuration: 1000000, + MaxBlockDuration: 1000000, + }, []int64{1000000}) + t.Cleanup(func() { + closeFn() + testutil.Ok(t, db.Close()) + }) + + head := db.Head() + + app := head.Appender() + _, err := app.Add(labels.Labels{labels.Label{Name: "n", Value: "v"}}, 10, 10) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + + var wg sync.WaitGroup + + // Simulate racy append where posting is added but symbol not. Symbol is added only after symMtx.Lock. + testSimulation1.Lock() + + wg.Add(1) + go func() { + defer wg.Done() + app := head.Appender() + _, err := app.Add(labels.Labels{labels.Label{Name: "n", Value: "v2"}}, 10, 10) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + }() + + // Wait until posting from append will be added indicating locked append state. + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + for { + select { + case <-ctx.Done(): + t.Fatal("timeout") + case <-time.After(100 * time.Millisecond): + } + + if len(head.postings.SortedKeys()) == 3 { + // Locked on 3rd posting (all, n=v, n=v2). + break + } + } + + // Compaction fails reproducing https://github.com/prometheus/prometheus/issues/7373 + testutil.Ok(t, db.CompactHead(NewRangeHead(head, 0, 10000000))) + wg.Wait() +}