mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Do not merge: Test showing race of compact vs append.
Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
This commit is contained in:
parent
492061b24c
commit
c2cfa656d7
|
@ -656,7 +656,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
|||
defer func() {
|
||||
var merr tsdb_errors.MultiError
|
||||
merr.Add(err)
|
||||
merr.Add(closeAll(closers))
|
||||
if cerr := closeAll(closers); cerr != nil {
|
||||
merr.Add(errors.Wrap(err, "close"))
|
||||
}
|
||||
err = merr.Err()
|
||||
c.metrics.populatingBlocks.Set(0)
|
||||
}()
|
||||
|
@ -707,7 +709,11 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
|||
all = indexr.SortedPostings(all)
|
||||
|
||||
s := newCompactionSeriesSet(indexr, chunkr, tombsr, all)
|
||||
|
||||
syms := indexr.Symbols()
|
||||
// When we are here unblock append as we already got postings with extra one, and symbols
|
||||
// with missing symbol for new posting.
|
||||
testSimulation1.Unlock()
|
||||
|
||||
if i == 0 {
|
||||
set = s
|
||||
|
|
|
@ -46,6 +46,10 @@ var (
|
|||
ErrInvalidSample = errors.New("invalid sample")
|
||||
)
|
||||
|
||||
// TEST!
|
||||
var testSimulation1 sync.Mutex
|
||||
var testSimulation2 sync.Mutex
|
||||
|
||||
// Head handles reads and writes of time series data within a time window.
|
||||
type Head struct {
|
||||
// Keep all 64bit atomically accessed variables at the top of this struct.
|
||||
|
@ -1694,6 +1698,9 @@ func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSerie
|
|||
|
||||
h.postings.Add(id, lset)
|
||||
|
||||
testSimulation1.Lock()
|
||||
testSimulation1.Unlock()
|
||||
|
||||
h.symMtx.Lock()
|
||||
defer h.symMtx.Unlock()
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
package tsdb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
|
@ -24,6 +25,7 @@ import (
|
|||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
|
||||
|
@ -1875,3 +1877,58 @@ func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Regression test showing race between compact and append.
|
||||
func TestHeadCompactionRace(t *testing.T) {
|
||||
db, closeFn := openTestDB(t, &Options{
|
||||
RetentionDuration: 100000000,
|
||||
NoLockfile: true,
|
||||
MinBlockDuration: 1000000,
|
||||
MaxBlockDuration: 1000000,
|
||||
}, []int64{1000000})
|
||||
t.Cleanup(func() {
|
||||
closeFn()
|
||||
testutil.Ok(t, db.Close())
|
||||
})
|
||||
|
||||
head := db.Head()
|
||||
|
||||
app := head.Appender()
|
||||
_, err := app.Add(labels.Labels{labels.Label{Name: "n", Value: "v"}}, 10, 10)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, app.Commit())
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Simulate racy append where posting is added but symbol not. Symbol is added only after symMtx.Lock.
|
||||
testSimulation1.Lock()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
app := head.Appender()
|
||||
_, err := app.Add(labels.Labels{labels.Label{Name: "n", Value: "v2"}}, 10, 10)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, app.Commit())
|
||||
}()
|
||||
|
||||
// Wait until posting from append will be added indicating locked append state.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
defer cancel()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
t.Fatal("timeout")
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
}
|
||||
|
||||
if len(head.postings.SortedKeys()) == 3 {
|
||||
// Locked on 3rd posting (all, n=v, n=v2).
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Compaction fails reproducing https://github.com/prometheus/prometheus/issues/7373
|
||||
testutil.Ok(t, db.CompactHead(NewRangeHead(head, 0, 10000000)))
|
||||
wg.Wait()
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue