diff --git a/tsdb/head_append.go b/tsdb/head_append.go index b385758cac..adfd5d4bf7 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -339,13 +339,17 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64 s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) if s == nil { var err error - s, err = a.getOrCreate(lset) + s, _, err = a.getOrCreate(lset) if err != nil { return 0, err } } if value.IsStaleNaN(v) { + // This is not thread safe as we should be holding the lock for "s". + // TODO(krajorama): reorganize Commit() to handle samples in append order + // not floats first and then histograms. Then we could do this conversion + // in commit. This code should move into Commit(). switch { case s.lastHistogramValue != nil: return a.AppendHistogram(ref, lset, t, &histogram.Histogram{Sum: v}, nil) @@ -402,7 +406,7 @@ func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Lab s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) if s == nil { var err error - s, err = a.getOrCreate(lset) + s, _, err = a.getOrCreate(lset) if err != nil { return 0, err } @@ -432,20 +436,18 @@ func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Lab return storage.SeriesRef(s.ref), nil } -func (a *headAppender) getOrCreate(lset labels.Labels) (*memSeries, error) { +func (a *headAppender) getOrCreate(lset labels.Labels) (s *memSeries, created bool, err error) { // Ensure no empty labels have gotten through. lset = lset.WithoutEmpty() if lset.IsEmpty() { - return nil, fmt.Errorf("empty labelset: %w", ErrInvalidSample) + return nil, false, fmt.Errorf("empty labelset: %w", ErrInvalidSample) } if l, dup := lset.HasDuplicateLabelNames(); dup { - return nil, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample) + return nil, false, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample) } - var created bool - var err error - s, created, err := a.head.getOrCreate(lset.Hash(), lset) + s, created, err = a.head.getOrCreate(lset.Hash(), lset) if err != nil { - return nil, err + return nil, false, err } if created { a.series = append(a.series, record.RefSeries{ @@ -453,7 +455,7 @@ func (a *headAppender) getOrCreate(lset labels.Labels) (*memSeries, error) { Labels: lset, }) } - return s, nil + return s, created, nil } // appendable checks whether the given sample is valid for appending to the series. (if we return false and no error) @@ -652,41 +654,27 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels } } + var created bool s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) if s == nil { - // Ensure no empty labels have gotten through. - lset = lset.WithoutEmpty() - if lset.IsEmpty() { - return 0, fmt.Errorf("empty labelset: %w", ErrInvalidSample) - } - - if l, dup := lset.HasDuplicateLabelNames(); dup { - return 0, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample) - } - - var created bool var err error - s, created, err = a.head.getOrCreate(lset.Hash(), lset) + s, created, err = a.getOrCreate(lset) if err != nil { return 0, err } - if created { - switch { - case h != nil: - s.lastHistogramValue = &histogram.Histogram{} - case fh != nil: - s.lastFloatHistogramValue = &histogram.FloatHistogram{} - } - a.series = append(a.series, record.RefSeries{ - Ref: s.ref, - Labels: lset, - }) - } } switch { case h != nil: s.Lock() + + // TODO(krajorama): reorganize Commit() to handle samples in append order + // not floats first and then histograms. Then we would not need to do this. + // This whole "if" should be removed. + if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { + s.lastHistogramValue = &histogram.Histogram{} + } + // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // to skip that sample from the WAL and write only in the WBL. _, delta, err := s.appendableHistogram(t, h, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load()) @@ -716,6 +704,14 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels a.histogramSeries = append(a.histogramSeries, s) case fh != nil: s.Lock() + + // TODO(krajorama): reorganize Commit() to handle samples in append order + // not floats first and then histograms. Then we would not need to do this. + // This whole "if" should be removed. + if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { + s.lastFloatHistogramValue = &histogram.FloatHistogram{} + } + // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // to skip that sample from the WAL and write only in the WBL. _, delta, err := s.appendableFloatHistogram(t, fh, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load()) @@ -763,42 +759,29 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l if ct >= t { return 0, storage.ErrCTNewerThanSample } + + var created bool s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) if s == nil { - // Ensure no empty labels have gotten through. - lset = lset.WithoutEmpty() - if lset.IsEmpty() { - return 0, fmt.Errorf("empty labelset: %w", ErrInvalidSample) - } - - if l, dup := lset.HasDuplicateLabelNames(); dup { - return 0, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample) - } - - var created bool var err error - s, created, err = a.head.getOrCreate(lset.Hash(), lset) + s, created, err = a.getOrCreate(lset) if err != nil { return 0, err } - if created { - switch { - case h != nil: - s.lastHistogramValue = &histogram.Histogram{} - case fh != nil: - s.lastFloatHistogramValue = &histogram.FloatHistogram{} - } - a.series = append(a.series, record.RefSeries{ - Ref: s.ref, - Labels: lset, - }) - } } switch { case h != nil: zeroHistogram := &histogram.Histogram{} s.Lock() + + // TODO(krajorama): reorganize Commit() to handle samples in append order + // not floats first and then histograms. Then we would not need to do this. + // This whole "if" should be removed. + if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { + s.lastHistogramValue = zeroHistogram + } + // Although we call `appendableHistogram` with oooHistogramsEnabled=true, for CTZeroSamples OOO is not allowed. // We set it to true to make this implementation as close as possible to the float implementation. isOOO, _, err := s.appendableHistogram(ct, zeroHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow, true) @@ -825,6 +808,14 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l case fh != nil: zeroFloatHistogram := &histogram.FloatHistogram{} s.Lock() + + // TODO(krajorama): reorganize Commit() to handle samples in append order + // not floats first and then histograms. Then we would not need to do this. + // This whole "if" should be removed. + if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { + s.lastFloatHistogramValue = zeroFloatHistogram + } + // Although we call `appendableFloatHistogram` with oooHistogramsEnabled=true, for CTZeroSamples OOO is not allowed. // We set it to true to make this implementation as close as possible to the float implementation. isOOO, _, err := s.appendableFloatHistogram(ct, zeroFloatHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow, true) // OOO is not allowed for CTZeroSamples. diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 19dcc1f080..671e85cd78 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -6524,3 +6524,60 @@ func (c *countSeriesLifecycleCallback) PostCreation(labels.Labels) { c.crea func (c *countSeriesLifecycleCallback) PostDeletion(s map[chunks.HeadSeriesRef]labels.Labels) { c.deleted.Add(int64(len(s))) } + +// Regression test for data race https://github.com/prometheus/prometheus/issues/15139. +func TestHeadAppendHistogramAndCommitConcurrency(t *testing.T) { + h := tsdbutil.GenerateTestHistogram(1) + fh := tsdbutil.GenerateTestFloatHistogram(1) + + testCases := map[string]func(storage.Appender, int) error{ + "integer histogram": func(app storage.Appender, i int) error { + _, err := app.AppendHistogram(0, labels.FromStrings("foo", "bar", "serial", strconv.Itoa(i)), 1, h, nil) + return err + }, + "float histogram": func(app storage.Appender, i int) error { + _, err := app.AppendHistogram(0, labels.FromStrings("foo", "bar", "serial", strconv.Itoa(i)), 1, nil, fh) + return err + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + testHeadAppendHistogramAndCommitConcurrency(t, tc) + }) + } +} + +func testHeadAppendHistogramAndCommitConcurrency(t *testing.T, appendFn func(storage.Appender, int) error) { + head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + defer func() { + require.NoError(t, head.Close()) + }() + + wg := sync.WaitGroup{} + wg.Add(2) + + // How this works: Commit() should be atomic, thus one of the commits will + // be first and the other second. The first commit will create a new series + // and write a sample. The second commit will see an exact duplicate sample + // which it should ignore. Unless there's a race that causes the + // memSeries.lastHistogram to be corrupt and fail the duplicate check. + go func() { + defer wg.Done() + for i := 0; i < 10000; i++ { + app := head.Appender(context.Background()) + require.NoError(t, appendFn(app, i)) + require.NoError(t, app.Commit()) + } + }() + + go func() { + defer wg.Done() + for i := 0; i < 10000; i++ { + app := head.Appender(context.Background()) + require.NoError(t, appendFn(app, i)) + require.NoError(t, app.Commit()) + } + }() + + wg.Wait() +}