TSDB head: fix race between AppendHistogram and Commit

Move writing memSeries lastHistogramValue and lastFloatHistogramValue
after series creation under lock.

The resulting code isn't totally correct in the sense that we're setting
these values before Commit() , so they might be overwritten/rolled back
later.

Also Append of stale sample checks the values without lock, so there's
still a potential race.

The correct solution would be to set these only in Commit() which we
actually do, but then Commit() would also need to process samples in
order and not floats first, then histograms, then float histograms - which
leads to not knowing what stale marker to write for histograms.

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
György Krajcsovits 2024-10-10 16:42:27 +02:00
parent 631fadc4ca
commit bb70370d72

View file

@ -339,13 +339,17 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
if s == nil {
var err error
s, err = a.getOrCreate(lset)
s, _, err = a.getOrCreate(lset)
if err != nil {
return 0, err
}
}
if value.IsStaleNaN(v) {
// This is not thread safe as we should be holding the lock for "s".
// TODO(krajorama): reorganize Commit() to handle samples in append order
// not floats first and then histograms. Then we could do this conversion
// in commit. This code should move into Commit().
switch {
case s.lastHistogramValue != nil:
return a.AppendHistogram(ref, lset, t, &histogram.Histogram{Sum: v}, nil)
@ -402,7 +406,7 @@ func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Lab
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
if s == nil {
var err error
s, err = a.getOrCreate(lset)
s, _, err = a.getOrCreate(lset)
if err != nil {
return 0, err
}
@ -432,20 +436,18 @@ func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Lab
return storage.SeriesRef(s.ref), nil
}
func (a *headAppender) getOrCreate(lset labels.Labels) (*memSeries, error) {
func (a *headAppender) getOrCreate(lset labels.Labels) (s *memSeries, created bool, err error) {
// Ensure no empty labels have gotten through.
lset = lset.WithoutEmpty()
if lset.IsEmpty() {
return nil, fmt.Errorf("empty labelset: %w", ErrInvalidSample)
return nil, false, fmt.Errorf("empty labelset: %w", ErrInvalidSample)
}
if l, dup := lset.HasDuplicateLabelNames(); dup {
return nil, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample)
return nil, false, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample)
}
var created bool
var err error
s, created, err := a.head.getOrCreate(lset.Hash(), lset)
s, created, err = a.head.getOrCreate(lset.Hash(), lset)
if err != nil {
return nil, err
return nil, false, err
}
if created {
a.series = append(a.series, record.RefSeries{
@ -453,7 +455,7 @@ func (a *headAppender) getOrCreate(lset labels.Labels) (*memSeries, error) {
Labels: lset,
})
}
return s, nil
return s, created, nil
}
// appendable checks whether the given sample is valid for appending to the series. (if we return false and no error)
@ -652,41 +654,27 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
}
}
var created bool
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
if s == nil {
// Ensure no empty labels have gotten through.
lset = lset.WithoutEmpty()
if lset.IsEmpty() {
return 0, fmt.Errorf("empty labelset: %w", ErrInvalidSample)
}
if l, dup := lset.HasDuplicateLabelNames(); dup {
return 0, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample)
}
var created bool
var err error
s, created, err = a.head.getOrCreate(lset.Hash(), lset)
s, created, err = a.getOrCreate(lset)
if err != nil {
return 0, err
}
if created {
switch {
case h != nil:
s.lastHistogramValue = &histogram.Histogram{}
case fh != nil:
s.lastFloatHistogramValue = &histogram.FloatHistogram{}
}
a.series = append(a.series, record.RefSeries{
Ref: s.ref,
Labels: lset,
})
}
}
switch {
case h != nil:
s.Lock()
// TODO(krajorama): reorganize Commit() to handle samples in append order
// not floats first and then histograms. Then we would not need to do this.
// This whole "if" should be removed.
if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
s.lastHistogramValue = &histogram.Histogram{}
}
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL.
_, delta, err := s.appendableHistogram(t, h, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load())
@ -716,6 +704,14 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
a.histogramSeries = append(a.histogramSeries, s)
case fh != nil:
s.Lock()
// TODO(krajorama): reorganize Commit() to handle samples in append order
// not floats first and then histograms. Then we would not need to do this.
// This whole "if" should be removed.
if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
s.lastFloatHistogramValue = &histogram.FloatHistogram{}
}
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL.
_, delta, err := s.appendableFloatHistogram(t, fh, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load())
@ -763,42 +759,29 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l
if ct >= t {
return 0, storage.ErrCTNewerThanSample
}
var created bool
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
if s == nil {
// Ensure no empty labels have gotten through.
lset = lset.WithoutEmpty()
if lset.IsEmpty() {
return 0, fmt.Errorf("empty labelset: %w", ErrInvalidSample)
}
if l, dup := lset.HasDuplicateLabelNames(); dup {
return 0, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample)
}
var created bool
var err error
s, created, err = a.head.getOrCreate(lset.Hash(), lset)
s, created, err = a.getOrCreate(lset)
if err != nil {
return 0, err
}
if created {
switch {
case h != nil:
s.lastHistogramValue = &histogram.Histogram{}
case fh != nil:
s.lastFloatHistogramValue = &histogram.FloatHistogram{}
}
a.series = append(a.series, record.RefSeries{
Ref: s.ref,
Labels: lset,
})
}
}
switch {
case h != nil:
zeroHistogram := &histogram.Histogram{}
s.Lock()
// TODO(krajorama): reorganize Commit() to handle samples in append order
// not floats first and then histograms. Then we would not need to do this.
// This whole "if" should be removed.
if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
s.lastHistogramValue = zeroHistogram
}
// Although we call `appendableHistogram` with oooHistogramsEnabled=true, for CTZeroSamples OOO is not allowed.
// We set it to true to make this implementation as close as possible to the float implementation.
isOOO, _, err := s.appendableHistogram(ct, zeroHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow, true)
@ -825,6 +808,14 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l
case fh != nil:
zeroFloatHistogram := &histogram.FloatHistogram{}
s.Lock()
// TODO(krajorama): reorganize Commit() to handle samples in append order
// not floats first and then histograms. Then we would not need to do this.
// This whole "if" should be removed.
if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
s.lastFloatHistogramValue = zeroFloatHistogram
}
// Although we call `appendableFloatHistogram` with oooHistogramsEnabled=true, for CTZeroSamples OOO is not allowed.
// We set it to true to make this implementation as close as possible to the float implementation.
isOOO, _, err := s.appendableFloatHistogram(ct, zeroFloatHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow, true) // OOO is not allowed for CTZeroSamples.