Do not merge: Test showing race of compact vs append (with our reverted "fix")

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
This commit is contained in:
Bartlomiej Plotka 2020-07-11 19:41:02 +01:00
parent 492061b24c
commit 6e50624ef5
3 changed files with 78 additions and 4 deletions

View file

@ -656,7 +656,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
defer func() {
var merr tsdb_errors.MultiError
merr.Add(err)
merr.Add(closeAll(closers))
if cerr := closeAll(closers); cerr != nil {
merr.Add(errors.Wrap(err, "close"))
}
err = merr.Err()
c.metrics.populatingBlocks.Set(0)
}()
@ -708,7 +710,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
s := newCompactionSeriesSet(indexr, chunkr, tombsr, all)
syms := indexr.Symbols()
if i == 0 {
set = s
symbols = syms

View file

@ -46,6 +46,10 @@ var (
ErrInvalidSample = errors.New("invalid sample")
)
// TEST!
var testSimulation1 sync.Mutex
var testSimulation2 sync.Mutex
// Head handles reads and writes of time series data within a time window.
type Head struct {
// Keep all 64bit atomically accessed variables at the top of this struct.
@ -1104,6 +1108,7 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro
if err != nil {
return 0, err
}
if created {
a.series = append(a.series, record.RefSeries{
Ref: s.ref,
@ -1282,6 +1287,8 @@ func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error {
return nil
}
var lockThings bool
// gc removes data before the minimum timestamp from the head.
func (h *Head) gc() {
// Only data strictly lower than this timestamp must be deleted.
@ -1337,6 +1344,13 @@ func (h *Head) gc() {
panic(err)
}
if lockThings {
// Test purposes. Simulate runtime freezing this go routine and allowing append to continue.
testSimulation1.Unlock()
testSimulation2.Lock()
testSimulation2.Unlock()
}
h.symMtx.Lock()
h.symbols = symbols
@ -1692,8 +1706,6 @@ func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSerie
h.metrics.seriesCreated.Inc()
atomic.AddUint64(&h.numSeries, 1)
h.postings.Add(id, lset)
h.symMtx.Lock()
defer h.symMtx.Unlock()
@ -1709,6 +1721,13 @@ func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSerie
h.symbols[l.Value] = struct{}{}
}
h.postings.Add(id, lset)
if lockThings {
// Allow GC to continue and append wrong symbol with missing just added one! ):
testSimulation2.Unlock()
}
return s, true, nil
}

View file

@ -1875,3 +1875,57 @@ func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) {
})
}
}
// Regression test showing race between compact and append.
func TestHeadCompactionRace(t *testing.T) {
db, closeFn := openTestDB(t, &Options{
RetentionDuration: 100000000,
NoLockfile: true,
MinBlockDuration: 1000000,
MaxBlockDuration: 1000000,
}, []int64{1000000})
t.Cleanup(func() {
closeFn()
testutil.Ok(t, db.Close())
})
head := db.Head()
app := head.Appender()
_, err := app.Add(labels.Labels{labels.Label{Name: "n", Value: "v"}}, 10, 10)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
var wg sync.WaitGroup
// Simulate racy gc vs append
testSimulation1.Lock()
testSimulation2.Lock()
wg.Add(1)
go func() {
defer wg.Done()
testSimulation1.Lock()
testSimulation1.Unlock()
app := head.Appender()
_, err := app.Add(labels.Labels{labels.Label{Name: "n", Value: "v2"}}, 10, 10)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
}()
lockThings = true
head.gc()
wg.Wait()
r, err := db.head.Index()
testutil.Ok(t, err)
all := r.Symbols()
var s []string
for all.Next() {
s = append(s, all.At())
}
testutil.Ok(t, all.Err())
testutil.Equals(t, []string{"", "n", "v2"}, s)
}