mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Merge pull request #15142 from krajorama/fix-appendhistogram-race
bugfix: data race in head.Appender.AppendHistogram and Commit
This commit is contained in:
commit
b8867f8ead
|
@ -339,13 +339,17 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
|
|||
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
|
||||
if s == nil {
|
||||
var err error
|
||||
s, err = a.getOrCreate(lset)
|
||||
s, _, err = a.getOrCreate(lset)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
if value.IsStaleNaN(v) {
|
||||
// This is not thread safe as we should be holding the lock for "s".
|
||||
// TODO(krajorama): reorganize Commit() to handle samples in append order
|
||||
// not floats first and then histograms. Then we could do this conversion
|
||||
// in commit. This code should move into Commit().
|
||||
switch {
|
||||
case s.lastHistogramValue != nil:
|
||||
return a.AppendHistogram(ref, lset, t, &histogram.Histogram{Sum: v}, nil)
|
||||
|
@ -402,7 +406,7 @@ func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Lab
|
|||
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
|
||||
if s == nil {
|
||||
var err error
|
||||
s, err = a.getOrCreate(lset)
|
||||
s, _, err = a.getOrCreate(lset)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
@ -432,20 +436,18 @@ func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Lab
|
|||
return storage.SeriesRef(s.ref), nil
|
||||
}
|
||||
|
||||
func (a *headAppender) getOrCreate(lset labels.Labels) (*memSeries, error) {
|
||||
func (a *headAppender) getOrCreate(lset labels.Labels) (s *memSeries, created bool, err error) {
|
||||
// Ensure no empty labels have gotten through.
|
||||
lset = lset.WithoutEmpty()
|
||||
if lset.IsEmpty() {
|
||||
return nil, fmt.Errorf("empty labelset: %w", ErrInvalidSample)
|
||||
return nil, false, fmt.Errorf("empty labelset: %w", ErrInvalidSample)
|
||||
}
|
||||
if l, dup := lset.HasDuplicateLabelNames(); dup {
|
||||
return nil, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample)
|
||||
return nil, false, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample)
|
||||
}
|
||||
var created bool
|
||||
var err error
|
||||
s, created, err := a.head.getOrCreate(lset.Hash(), lset)
|
||||
s, created, err = a.head.getOrCreate(lset.Hash(), lset)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, false, err
|
||||
}
|
||||
if created {
|
||||
a.series = append(a.series, record.RefSeries{
|
||||
|
@ -453,7 +455,7 @@ func (a *headAppender) getOrCreate(lset labels.Labels) (*memSeries, error) {
|
|||
Labels: lset,
|
||||
})
|
||||
}
|
||||
return s, nil
|
||||
return s, created, nil
|
||||
}
|
||||
|
||||
// appendable checks whether the given sample is valid for appending to the series. (if we return false and no error)
|
||||
|
@ -652,41 +654,27 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
|
|||
}
|
||||
}
|
||||
|
||||
var created bool
|
||||
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
|
||||
if s == nil {
|
||||
// Ensure no empty labels have gotten through.
|
||||
lset = lset.WithoutEmpty()
|
||||
if lset.IsEmpty() {
|
||||
return 0, fmt.Errorf("empty labelset: %w", ErrInvalidSample)
|
||||
}
|
||||
|
||||
if l, dup := lset.HasDuplicateLabelNames(); dup {
|
||||
return 0, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample)
|
||||
}
|
||||
|
||||
var created bool
|
||||
var err error
|
||||
s, created, err = a.head.getOrCreate(lset.Hash(), lset)
|
||||
s, created, err = a.getOrCreate(lset)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if created {
|
||||
switch {
|
||||
case h != nil:
|
||||
s.lastHistogramValue = &histogram.Histogram{}
|
||||
case fh != nil:
|
||||
s.lastFloatHistogramValue = &histogram.FloatHistogram{}
|
||||
}
|
||||
a.series = append(a.series, record.RefSeries{
|
||||
Ref: s.ref,
|
||||
Labels: lset,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
switch {
|
||||
case h != nil:
|
||||
s.Lock()
|
||||
|
||||
// TODO(krajorama): reorganize Commit() to handle samples in append order
|
||||
// not floats first and then histograms. Then we would not need to do this.
|
||||
// This whole "if" should be removed.
|
||||
if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
|
||||
s.lastHistogramValue = &histogram.Histogram{}
|
||||
}
|
||||
|
||||
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
|
||||
// to skip that sample from the WAL and write only in the WBL.
|
||||
_, delta, err := s.appendableHistogram(t, h, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load())
|
||||
|
@ -716,6 +704,14 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
|
|||
a.histogramSeries = append(a.histogramSeries, s)
|
||||
case fh != nil:
|
||||
s.Lock()
|
||||
|
||||
// TODO(krajorama): reorganize Commit() to handle samples in append order
|
||||
// not floats first and then histograms. Then we would not need to do this.
|
||||
// This whole "if" should be removed.
|
||||
if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
|
||||
s.lastFloatHistogramValue = &histogram.FloatHistogram{}
|
||||
}
|
||||
|
||||
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
|
||||
// to skip that sample from the WAL and write only in the WBL.
|
||||
_, delta, err := s.appendableFloatHistogram(t, fh, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load())
|
||||
|
@ -763,42 +759,29 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l
|
|||
if ct >= t {
|
||||
return 0, storage.ErrCTNewerThanSample
|
||||
}
|
||||
|
||||
var created bool
|
||||
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
|
||||
if s == nil {
|
||||
// Ensure no empty labels have gotten through.
|
||||
lset = lset.WithoutEmpty()
|
||||
if lset.IsEmpty() {
|
||||
return 0, fmt.Errorf("empty labelset: %w", ErrInvalidSample)
|
||||
}
|
||||
|
||||
if l, dup := lset.HasDuplicateLabelNames(); dup {
|
||||
return 0, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample)
|
||||
}
|
||||
|
||||
var created bool
|
||||
var err error
|
||||
s, created, err = a.head.getOrCreate(lset.Hash(), lset)
|
||||
s, created, err = a.getOrCreate(lset)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if created {
|
||||
switch {
|
||||
case h != nil:
|
||||
s.lastHistogramValue = &histogram.Histogram{}
|
||||
case fh != nil:
|
||||
s.lastFloatHistogramValue = &histogram.FloatHistogram{}
|
||||
}
|
||||
a.series = append(a.series, record.RefSeries{
|
||||
Ref: s.ref,
|
||||
Labels: lset,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
switch {
|
||||
case h != nil:
|
||||
zeroHistogram := &histogram.Histogram{}
|
||||
s.Lock()
|
||||
|
||||
// TODO(krajorama): reorganize Commit() to handle samples in append order
|
||||
// not floats first and then histograms. Then we would not need to do this.
|
||||
// This whole "if" should be removed.
|
||||
if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
|
||||
s.lastHistogramValue = zeroHistogram
|
||||
}
|
||||
|
||||
// Although we call `appendableHistogram` with oooHistogramsEnabled=true, for CTZeroSamples OOO is not allowed.
|
||||
// We set it to true to make this implementation as close as possible to the float implementation.
|
||||
isOOO, _, err := s.appendableHistogram(ct, zeroHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow, true)
|
||||
|
@ -825,6 +808,14 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l
|
|||
case fh != nil:
|
||||
zeroFloatHistogram := &histogram.FloatHistogram{}
|
||||
s.Lock()
|
||||
|
||||
// TODO(krajorama): reorganize Commit() to handle samples in append order
|
||||
// not floats first and then histograms. Then we would not need to do this.
|
||||
// This whole "if" should be removed.
|
||||
if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
|
||||
s.lastFloatHistogramValue = zeroFloatHistogram
|
||||
}
|
||||
|
||||
// Although we call `appendableFloatHistogram` with oooHistogramsEnabled=true, for CTZeroSamples OOO is not allowed.
|
||||
// We set it to true to make this implementation as close as possible to the float implementation.
|
||||
isOOO, _, err := s.appendableFloatHistogram(ct, zeroFloatHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow, true) // OOO is not allowed for CTZeroSamples.
|
||||
|
|
|
@ -6524,3 +6524,60 @@ func (c *countSeriesLifecycleCallback) PostCreation(labels.Labels) { c.crea
|
|||
func (c *countSeriesLifecycleCallback) PostDeletion(s map[chunks.HeadSeriesRef]labels.Labels) {
|
||||
c.deleted.Add(int64(len(s)))
|
||||
}
|
||||
|
||||
// Regression test for data race https://github.com/prometheus/prometheus/issues/15139.
|
||||
func TestHeadAppendHistogramAndCommitConcurrency(t *testing.T) {
|
||||
h := tsdbutil.GenerateTestHistogram(1)
|
||||
fh := tsdbutil.GenerateTestFloatHistogram(1)
|
||||
|
||||
testCases := map[string]func(storage.Appender, int) error{
|
||||
"integer histogram": func(app storage.Appender, i int) error {
|
||||
_, err := app.AppendHistogram(0, labels.FromStrings("foo", "bar", "serial", strconv.Itoa(i)), 1, h, nil)
|
||||
return err
|
||||
},
|
||||
"float histogram": func(app storage.Appender, i int) error {
|
||||
_, err := app.AppendHistogram(0, labels.FromStrings("foo", "bar", "serial", strconv.Itoa(i)), 1, nil, fh)
|
||||
return err
|
||||
},
|
||||
}
|
||||
for name, tc := range testCases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
testHeadAppendHistogramAndCommitConcurrency(t, tc)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testHeadAppendHistogramAndCommitConcurrency(t *testing.T, appendFn func(storage.Appender, int) error) {
|
||||
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||
defer func() {
|
||||
require.NoError(t, head.Close())
|
||||
}()
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(2)
|
||||
|
||||
// How this works: Commit() should be atomic, thus one of the commits will
|
||||
// be first and the other second. The first commit will create a new series
|
||||
// and write a sample. The second commit will see an exact duplicate sample
|
||||
// which it should ignore. Unless there's a race that causes the
|
||||
// memSeries.lastHistogram to be corrupt and fail the duplicate check.
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < 10000; i++ {
|
||||
app := head.Appender(context.Background())
|
||||
require.NoError(t, appendFn(app, i))
|
||||
require.NoError(t, app.Commit())
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < 10000; i++ {
|
||||
app := head.Appender(context.Background())
|
||||
require.NoError(t, appendFn(app, i))
|
||||
require.NoError(t, app.Commit())
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue