diff --git a/tsdb/compact.go b/tsdb/compact.go index acbdbf6443..1f172c8267 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -656,7 +656,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, defer func() { var merr tsdb_errors.MultiError merr.Add(err) - merr.Add(closeAll(closers)) + if cerr := closeAll(closers); cerr != nil { + merr.Add(errors.Wrap(err, "close")) + } err = merr.Err() c.metrics.populatingBlocks.Set(0) }() @@ -708,7 +710,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, s := newCompactionSeriesSet(indexr, chunkr, tombsr, all) syms := indexr.Symbols() - if i == 0 { set = s symbols = syms diff --git a/tsdb/head.go b/tsdb/head.go index 3b1d4ad057..7bef68a85e 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -46,6 +46,10 @@ var ( ErrInvalidSample = errors.New("invalid sample") ) +// TEST! +var testSimulation1 sync.Mutex +var testSimulation2 sync.Mutex + // Head handles reads and writes of time series data within a time window. type Head struct { // Keep all 64bit atomically accessed variables at the top of this struct. @@ -1104,6 +1108,7 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro if err != nil { return 0, err } + if created { a.series = append(a.series, record.RefSeries{ Ref: s.ref, @@ -1282,6 +1287,8 @@ func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error { return nil } +var lockThings bool + // gc removes data before the minimum timestamp from the head. func (h *Head) gc() { // Only data strictly lower than this timestamp must be deleted. @@ -1337,6 +1344,13 @@ func (h *Head) gc() { panic(err) } + if lockThings { + // Test purposes. Simulate runtime freezing this go routine and allowing append to continue. + testSimulation1.Unlock() + testSimulation2.Lock() + testSimulation2.Unlock() + } + h.symMtx.Lock() h.symbols = symbols @@ -1692,8 +1706,6 @@ func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSerie h.metrics.seriesCreated.Inc() atomic.AddUint64(&h.numSeries, 1) - h.postings.Add(id, lset) - h.symMtx.Lock() defer h.symMtx.Unlock() @@ -1709,6 +1721,13 @@ func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSerie h.symbols[l.Value] = struct{}{} } + h.postings.Add(id, lset) + + if lockThings { + // Allow GC to continue and append wrong symbol with missing just added one! ): + testSimulation2.Unlock() + } + return s, true, nil } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 6c5e466587..22676dde80 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1875,3 +1875,57 @@ func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) { }) } } + +// Regression test showing race between compact and append. +func TestHeadCompactionRace(t *testing.T) { + db, closeFn := openTestDB(t, &Options{ + RetentionDuration: 100000000, + NoLockfile: true, + MinBlockDuration: 1000000, + MaxBlockDuration: 1000000, + }, []int64{1000000}) + t.Cleanup(func() { + closeFn() + testutil.Ok(t, db.Close()) + }) + + head := db.Head() + + app := head.Appender() + _, err := app.Add(labels.Labels{labels.Label{Name: "n", Value: "v"}}, 10, 10) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + + var wg sync.WaitGroup + + // Simulate racy gc vs append + testSimulation1.Lock() + testSimulation2.Lock() + + wg.Add(1) + go func() { + defer wg.Done() + + testSimulation1.Lock() + testSimulation1.Unlock() + app := head.Appender() + _, err := app.Add(labels.Labels{labels.Label{Name: "n", Value: "v2"}}, 10, 10) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + }() + lockThings = true + head.gc() + wg.Wait() + + r, err := db.head.Index() + testutil.Ok(t, err) + + all := r.Symbols() + var s []string + for all.Next() { + s = append(s, all.At()) + } + testutil.Ok(t, all.Err()) + testutil.Equals(t, []string{"", "n", "v2"}, s) + +}