Add support for ingesting OOO native histograms

Signed-off-by: Carrie Edwards <edwrdscarrie@gmail.com>
Co-authored-by: Jeanette Tan <jeanette.tan@grafana.com>
Co-authored-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
Co-authored-by: Fiona Liao <fiona.liao@grafana.com>
Carrie Edwards 2024-07-31 10:20:07 -07:00
parent 49e795edd8
commit dd65c3d3fc
3 changed files with 472 additions and 90 deletions

tsdb/head_append.go

@@ -321,7 +321,7 @@ type headAppender struct
func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
// For OOO inserts, this restriction is irrelevant and will be checked later once we confirm the sample is an in-order append.
// Fail fast if OOO is disabled.
if a.oooTimeWindow == 0 && t < a.minValidTime {
a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Inc()
return 0, storage.ErrOutOfBounds
@@ -489,46 +489,94 @@ func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooTi
return false, headMaxt - t, storage.ErrOutOfOrderSample
}
// appendableHistogram checks whether the given histogram sample is valid for appending to the series. (if we return false and no error)
// The sample belongs to the out of order chunk if we return true and no error.
// An error signifies the sample cannot be handled.
func (s *memSeries) appendableHistogram(t int64, h *histogram.Histogram, headMaxt, minValidTime, oooTimeWindow int64, oooHistogramsEnabled bool) (isOOO bool, oooDelta int64, err error) {
// Check if we can append in the in-order chunk.
if t >= minValidTime {
if s.headChunks == nil {
// The series has no sample and was freshly created.
return false, 0, nil
}
msMaxt := s.maxTime()
if t > msMaxt {
return false, 0, nil
}
if t == msMaxt {
// We are allowing exact duplicates as we can encounter them in valid cases
// like federation and erroring out at that time would be extremely noisy.
// This only checks against the latest in-order sample.
// The OOO headchunk has its own method to detect these duplicates.
if !h.Equals(s.lastHistogramValue) {
return false, 0, storage.ErrDuplicateSampleForTimestamp
}
// Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf.
return false, 0, nil
}
}
// The sample cannot go in the in-order chunk. Check if it can go in the out-of-order chunk.
if oooTimeWindow > 0 && t >= headMaxt-oooTimeWindow {
if !oooHistogramsEnabled {
return true, headMaxt - t, storage.ErrOOONativeHistogramsDisabled
}
return true, headMaxt - t, nil
}
// The sample cannot go in both in-order and out-of-order chunk.
if oooTimeWindow > 0 {
return true, headMaxt - t, storage.ErrTooOldSample
}
if t < minValidTime {
return false, headMaxt - t, storage.ErrOutOfBounds
}
return false, headMaxt - t, storage.ErrOutOfOrderSample
}
// appendableFloatHistogram checks whether the given float histogram sample is valid for appending to the series. (if we return false and no error)
// The sample belongs to the out of order chunk if we return true and no error.
// An error signifies the sample cannot be handled.
func (s *memSeries) appendableFloatHistogram(t int64, fh *histogram.FloatHistogram, headMaxt, minValidTime, oooTimeWindow int64, oooHistogramsEnabled bool) (isOOO bool, oooDelta int64, err error) {
// Check if we can append in the in-order chunk.
if t >= minValidTime {
if s.headChunks == nil {
// The series has no sample and was freshly created.
return false, 0, nil
}
msMaxt := s.maxTime()
if t > msMaxt {
return false, 0, nil
}
if t == msMaxt {
// We are allowing exact duplicates as we can encounter them in valid cases
// like federation and erroring out at that time would be extremely noisy.
// This only checks against the latest in-order sample.
// The OOO headchunk has its own method to detect these duplicates.
if !fh.Equals(s.lastFloatHistogramValue) {
return false, 0, storage.ErrDuplicateSampleForTimestamp
}
// Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf.
return false, 0, nil
}
}
// The sample cannot go in the in-order chunk. Check if it can go in the out-of-order chunk.
if oooTimeWindow > 0 && t >= headMaxt-oooTimeWindow {
if !oooHistogramsEnabled {
return true, headMaxt - t, storage.ErrOOONativeHistogramsDisabled
}
return true, headMaxt - t, nil
}
// The sample cannot go in both in-order and out-of-order chunk.
if oooTimeWindow > 0 {
return true, headMaxt - t, storage.ErrTooOldSample
}
if t < minValidTime {
return false, headMaxt - t, storage.ErrOutOfBounds
}
return false, headMaxt - t, storage.ErrOutOfOrderSample
}
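For illustration, the decision flow that appendableHistogram and appendableFloatHistogram implement can be read as a small standalone function. The sketch below is not part of the patch (decideAppend and the decision constants are invented for this note); it collapses the per-series state to a single seriesMaxt value and leaves out the duplicate-timestamp check against the last in-order sample, but it mirrors the order of the window checks above.

package main

import "fmt"

type decision string

const (
	appendInOrder  decision = "in-order append"
	appendOOO      decision = "out-of-order append"
	errOOODisabled decision = "error: OOO native histograms disabled"
	errTooOld      decision = "error: too old (outside OOO window)"
	errOutOfBounds decision = "error: out of bounds"
	errOutOfOrder  decision = "error: out of order (OOO disabled)"
)

// decideAppend mirrors the order of checks in appendableHistogram:
// in-order window first, then the OOO window, then the error cases.
func decideAppend(t, seriesMaxt, headMaxt, minValidTime, oooTimeWindow int64, oooHistogramsEnabled bool) decision {
	if t >= minValidTime && t > seriesMaxt {
		return appendInOrder
	}
	if oooTimeWindow > 0 && t >= headMaxt-oooTimeWindow {
		if !oooHistogramsEnabled {
			return errOOODisabled
		}
		return appendOOO
	}
	if oooTimeWindow > 0 {
		return errTooOld
	}
	if t < minValidTime {
		return errOutOfBounds
	}
	return errOutOfOrder
}

func main() {
	// headMaxt = 1000, a 5m OOO window, native histogram OOO ingestion on/off.
	fmt.Println(decideAppend(1001, 1000, 1000, 0, 300000, true))  // in-order append
	fmt.Println(decideAppend(900, 1000, 1000, 0, 300000, true))   // out-of-order append
	fmt.Println(decideAppend(900, 1000, 1000, 0, 300000, false))  // error: OOO native histograms disabled
}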
// AppendExemplar for headAppender assumes the series ref already exists, and so it doesn't
@@ -573,10 +621,16 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
return 0, storage.ErrNativeHistogramsDisabled
}
// For OOO inserts, this restriction is irrelevant and will be checked later once we confirm the histogram sample is an in-order append.
// Fail fast if OOO is disabled.
if a.oooTimeWindow == 0 && t < a.minValidTime {
a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
return 0, storage.ErrOutOfBounds
}
// Also fail fast if OOO is enabled, but OOO native histogram ingestion is disabled.
if a.oooTimeWindow > 0 && t < a.minValidTime && !a.head.opts.EnableOOONativeHistograms.Load() {
return 0, storage.ErrOOONativeHistogramsDisabled
}
if h != nil {
if err := h.Validate(); err != nil {
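As a side note, the two fail-fast gates added above can be summarised in isolation. The helper below is illustrative only (failFast and its error variables are not Prometheus APIs); it shows which requests AppendHistogram now rejects before even resolving the series.

package main

import (
	"errors"
	"fmt"
)

var (
	errOutOfBounds                 = errors.New("out of bounds")
	errOOONativeHistogramsDisabled = errors.New("ooo native histograms disabled")
)

// failFast reproduces the appender-level checks: out-of-bounds when OOO is off,
// and OOO-window timestamps when OOO native histogram ingestion is off.
func failFast(t, minValidTime, oooTimeWindow int64, oooNativeHistograms bool) error {
	if oooTimeWindow == 0 && t < minValidTime {
		return errOutOfBounds
	}
	if oooTimeWindow > 0 && t < minValidTime && !oooNativeHistograms {
		return errOOONativeHistogramsDisabled
	}
	return nil // full appendability is decided later, per series
}

func main() {
	fmt.Println(failFast(90, 100, 0, true))   // out of bounds
	fmt.Println(failFast(90, 100, 60, false)) // ooo native histograms disabled
	fmt.Println(failFast(90, 100, 60, true))  // <nil>
}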
@@ -625,15 +679,25 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
switch {
case h != nil:
s.Lock()
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL.
_, delta, err := s.appendableHistogram(t, h, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load())
if err == nil {
s.pendingCommit = true
}
s.Unlock()
if delta > 0 {
a.head.metrics.oooHistogram.Observe(float64(delta) / 1000)
}
if err != nil {
switch {
case errors.Is(err, storage.ErrOutOfOrderSample):
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
case errors.Is(err, storage.ErrTooOldSample):
a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
}
return 0, err
}
a.histograms = append(a.histograms, record.RefHistogramSample{
Ref: s.ref,
T: t,
@@ -642,15 +706,25 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
a.histogramSeries = append(a.histogramSeries, s)
case fh != nil:
s.Lock()
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL.
_, delta, err := s.appendableFloatHistogram(t, fh, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load())
if err == nil {
s.pendingCommit = true
}
s.Unlock()
if delta > 0 {
a.head.metrics.oooHistogram.Observe(float64(delta) / 1000)
}
if err != nil {
switch {
case errors.Is(err, storage.ErrOutOfOrderSample):
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
case errors.Is(err, storage.ErrTooOldSample):
a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
}
return 0, err
}
a.floatHistograms = append(a.floatHistograms, record.RefFloatHistogramSample{
Ref: s.ref,
T: t,
@@ -837,20 +911,24 @@ func (a *headAppender) Commit() (err error) {
floatsAppended = len(a.samples)
histogramsAppended = len(a.histograms) + len(a.floatHistograms)
// number of samples out of order but accepted: with ooo enabled and within time window
oooFloatsAccepted int
oooHistogramAccepted int
// number of samples rejected due to: out of order but OOO support disabled.
floatOOORejected int
histoOOORejected int
// number of samples rejected due to: that are out of order but too old (OOO support enabled, but outside time window)
floatTooOldRejected int
histoTooOldRejected int
// number of samples rejected due to: out of bounds: with t < minValidTime (OOO support disabled)
floatOOBRejected int
histoOOBRejected int
inOrderMint int64 = math.MaxInt64
inOrderMaxt int64 = math.MinInt64
oooMinT int64 = math.MaxInt64
oooMaxT int64 = math.MinInt64
wblSamples []record.RefSample
wblHistograms []record.RefHistogramSample
wblFloatHistograms []record.RefFloatHistogramSample
oooMmapMarkers map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef
oooMmapMarkersCount int
oooRecords [][]byte
@@ -899,8 +977,18 @@ func (a *headAppender) Commit() (err error) {
r := enc.Samples(wblSamples, a.head.getBytesBuffer())
oooRecords = append(oooRecords, r)
}
if len(wblHistograms) > 0 {
r := enc.HistogramSamples(wblHistograms, a.head.getBytesBuffer())
oooRecords = append(oooRecords, r)
}
if len(wblFloatHistograms) > 0 {
r := enc.FloatHistogramSamples(wblFloatHistograms, a.head.getBytesBuffer())
oooRecords = append(oooRecords, r)
}
wblSamples = nil
wblHistograms = nil
wblFloatHistograms = nil
oooMmapMarkers = nil
}
for i, s := range a.samples {
@@ -933,7 +1021,7 @@ func (a *headAppender) Commit() (err error) {
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
var mmapRefs []chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, nil, nil, a.head.chunkDiskMapper, oooCapMax)
if chunkCreated {
r, ok := oooMmapMarkers[series.ref]
if !ok || r != nil {
@@ -966,7 +1054,7 @@ func (a *headAppender) Commit() (err error) {
if s.T > oooMaxT {
oooMaxT = s.T
}
oooFloatsAccepted++
} else {
// Sample is an exact duplicate of the last sample.
// NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk,
@@ -1002,51 +1090,194 @@ func (a *headAppender) Commit() (err error) {
for i, s := range a.histograms {
series = a.histogramSeries[i]
series.Lock()
oooSample, _, err := series.appendableHistogram(s.T, s.H, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load())
switch {
case err == nil:
// Do nothing.
case errors.Is(err, storage.ErrOutOfOrderSample):
histogramsAppended--
histoOOORejected++
case errors.Is(err, storage.ErrOutOfBounds):
histogramsAppended--
histoOOBRejected++
case errors.Is(err, storage.ErrTooOldSample):
histogramsAppended--
histoTooOldRejected++
default:
histogramsAppended--
}
var ok, chunkCreated bool
switch {
case err != nil:
// Do nothing here.
case oooSample:
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
var mmapRefs []chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRefs = series.insert(s.T, 0, s.H, nil, a.head.chunkDiskMapper, oooCapMax)
if chunkCreated {
r, ok := oooMmapMarkers[series.ref]
if !ok || r != nil {
// !ok means there are no markers collected for these samples yet. So we first flush the samples
// before setting this m-map marker.
// r != 0 means we have already m-mapped a chunk for this series in the same Commit().
// Hence, before we m-map again, we should add the samples and m-map markers
// seen till now to the WBL records.
collectOOORecords()
}
if oooMmapMarkers == nil {
oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef)
}
if len(mmapRefs) > 0 {
oooMmapMarkers[series.ref] = mmapRefs
oooMmapMarkersCount += len(mmapRefs)
} else {
// No chunk was written to disk, so we need to set an initial marker for this series.
oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0}
oooMmapMarkersCount++
}
}
if ok {
wblHistograms = append(wblHistograms, s)
if s.T < oooMinT {
oooMinT = s.T
}
if s.T > oooMaxT {
oooMaxT = s.T
}
oooHistogramAccepted++
} else {
// Sample is an exact duplicate of the last sample.
// NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk,
// not with samples in already flushed OOO chunks.
// TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305.
histogramsAppended--
}
default:
ok, chunkCreated = series.appendHistogram(s.T, s.H, a.appendID, appendChunkOpts)
if ok {
if s.T < inOrderMint {
inOrderMint = s.T
}
if s.T > inOrderMaxt {
inOrderMaxt = s.T
}
} else {
histogramsAppended--
histoOOORejected++
}
}
if chunkCreated {
a.head.metrics.chunks.Inc()
a.head.metrics.chunksCreated.Inc()
}
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
}
histogramsAppended += len(a.floatHistograms)
for i, s := range a.floatHistograms {
series = a.floatHistogramSeries[i]
series.Lock()
oooSample, _, err := series.appendableFloatHistogram(s.T, s.FH, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load())
switch {
case err == nil:
// Do nothing.
case errors.Is(err, storage.ErrOutOfOrderSample):
histogramsAppended--
histoOOORejected++
case errors.Is(err, storage.ErrOutOfBounds):
histogramsAppended--
histoOOBRejected++
case errors.Is(err, storage.ErrTooOldSample):
histogramsAppended--
histoTooOldRejected++
default:
histogramsAppended--
}
var ok, chunkCreated bool
switch {
case err != nil:
// Do nothing here.
case oooSample:
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
var mmapRefs []chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRefs = series.insert(s.T, 0, nil, s.FH, a.head.chunkDiskMapper, oooCapMax)
if chunkCreated {
r, ok := oooMmapMarkers[series.ref]
if !ok || r != nil {
// !ok means there are no markers collected for these samples yet. So we first flush the samples
// before setting this m-map marker.
// r != 0 means we have already m-mapped a chunk for this series in the same Commit().
// Hence, before we m-map again, we should add the samples and m-map markers
// seen till now to the WBL records.
collectOOORecords()
}
if oooMmapMarkers == nil {
oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef)
}
if len(mmapRefs) > 0 {
oooMmapMarkers[series.ref] = mmapRefs
oooMmapMarkersCount += len(mmapRefs)
} else {
// No chunk was written to disk, so we need to set an initial marker for this series.
oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0}
oooMmapMarkersCount++
}
}
if ok {
wblFloatHistograms = append(wblFloatHistograms, s)
if s.T < oooMinT {
oooMinT = s.T
}
if s.T > oooMaxT {
oooMaxT = s.T
}
oooHistogramAccepted++
} else {
// Sample is an exact duplicate of the last sample.
// NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk,
// not with samples in already flushed OOO chunks.
// TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305.
histogramsAppended--
}
default:
ok, chunkCreated = series.appendFloatHistogram(s.T, s.FH, a.appendID, appendChunkOpts)
if ok {
if s.T < inOrderMint {
inOrderMint = s.T
}
if s.T > inOrderMaxt {
inOrderMaxt = s.T
}
} else {
histogramsAppended--
histoOOORejected++
}
}
if chunkCreated {
a.head.metrics.chunks.Inc()
a.head.metrics.chunksCreated.Inc()
}
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
}
for i, m := range a.metadata {
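The marker bookkeeping in the two loops above is the subtle part: whenever an OOO append cuts a new chunk for a series that has already been m-mapped in this Commit(), or before any marker has been recorded at all, the WBL records collected so far are flushed first, so markers always follow the samples they refer to. A stripped-down sketch of that pattern, with invented names (flush, addSample) standing in for collectOOORecords and the per-sample handling:

package main

import "fmt"

func main() {
	var (
		pending []string            // stands in for wblSamples / wblHistograms
		markers map[uint64][]uint64 // stands in for oooMmapMarkers (series ref -> chunk refs)
		flushed [][]string          // stands in for oooRecords
	)

	flush := func() {
		if len(pending) > 0 {
			flushed = append(flushed, pending)
			pending = nil
		}
		markers = nil
	}

	addSample := func(series uint64, sample string, newChunkRefs []uint64) {
		if len(newChunkRefs) > 0 { // an OOO chunk was cut for this series (chunkCreated)
			if r, ok := markers[series]; !ok || r != nil {
				// No markers collected yet, or this series already got a marker
				// in this commit: flush samples and markers seen so far first.
				flush()
			}
			if markers == nil {
				markers = map[uint64][]uint64{}
			}
			markers[series] = newChunkRefs
		}
		pending = append(pending, sample)
	}

	addSample(1, "h1", []uint64{10}) // first chunk cut for series 1
	addSample(1, "h2", nil)
	addSample(1, "h3", []uint64{11}) // second chunk for series 1: flush happens first
	flush()
	fmt.Println(flushed) // [[h1 h2] [h3]]
}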
@@ -1062,7 +1293,8 @@ func (a *headAppender) Commit() (err error) {
a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatTooOldRejected))
a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatsAppended))
a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(histogramsAppended))
a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(oooFloatsAccepted))
a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(oooHistogramAccepted))
a.head.updateMinMaxTime(inOrderMint, inOrderMaxt)
a.head.updateMinOOOMaxOOOTime(oooMinT, oooMaxT)
@@ -1080,7 +1312,7 @@ func (a *headAppender) Commit() (err error) {
}
// insert is like append, except it inserts. Used for OOO samples.
func (s *memSeries) insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) {
if s.ooo == nil {
s.ooo = &memSeriesOOOFields{}
}
@@ -1091,7 +1323,7 @@ func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDisk
chunkCreated = true
}
ok := c.chunk.Insert(t, v, h, fh)
if ok {
if chunkCreated || t < c.minTime {
c.minTime = t
@@ -1464,9 +1696,9 @@ func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMap
handleChunkWriteError(err)
return nil
}
chunkRefs := make([]chunks.ChunkDiskMapperRef, 0, len(chks))
for _, memchunk := range chks {
chunkRef := chunkDiskMapper.WriteChunk(s.ref, memchunk.minTime, memchunk.maxTime, memchunk.chunk, true, handleChunkWriteError)
chunkRefs = append(chunkRefs, chunkRef)
s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks, &mmappedChunk{
ref: chunkRef,
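The hunk above changes mmapCurrentOOOHeadChunk to record one chunk reference per encoded chunk and to use each chunk's own minTime/maxTime rather than the head chunk's overall range. That matters once a single OOO head chunk can yield several encoded chunks, for example when float and histogram samples are interleaved; the toy illustration below (splitRuns is a loose stand-in for the real encoding step, not the actual API) shows why each run needs its own time range.

package main

import "fmt"

type sample struct {
	t      int64
	isHist bool
}

type encodedChunk struct {
	minTime, maxTime int64
	isHist           bool
	n                int
}

// splitRuns groups consecutive samples of the same kind and tracks each
// group's own time range, the way separate encoded chunks would.
func splitRuns(samples []sample) []encodedChunk {
	var out []encodedChunk
	for _, s := range samples {
		if len(out) == 0 || out[len(out)-1].isHist != s.isHist {
			out = append(out, encodedChunk{minTime: s.t, maxTime: s.t, isHist: s.isHist, n: 1})
			continue
		}
		c := &out[len(out)-1]
		c.maxTime = s.t
		c.n++
	}
	return out
}

func main() {
	samples := []sample{{10, false}, {20, false}, {30, true}, {40, true}, {50, false}}
	for _, c := range splitRuns(samples) {
		fmt.Printf("chunk: hist=%v samples=%d minT=%d maxT=%d\n", c.isHist, c.n, c.minTime, c.maxTime)
	}
}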

tsdb/head_wal.go

@@ -649,9 +649,9 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
}
func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) {
// Track number of samples, histogram samples, m-map markers, that referenced a series we don't know about
// for error reporting.
var unknownRefs, unknownHistogramRefs, mmapMarkerUnknownRefs atomic.Uint64
lastSeq, lastOff := lastMmapRef.Unpack()
// Start workers that each process samples for a partition of the series ID space.
@@ -660,8 +660,9 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
concurrency = h.opts.WALReplayConcurrency
processors = make([]wblSubsetProcessor, concurrency)
dec record.Decoder
shards = make([][]record.RefSample, concurrency)
histogramShards = make([][]histogramRecord, concurrency)
decodedCh = make(chan interface{}, 10)
decodeErr error
@@ -675,6 +676,16 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
return []record.RefMmapMarker{}
},
}
histogramSamplesPool = sync.Pool{
New: func() interface{} {
return []record.RefHistogramSample{}
},
}
floatHistogramSamplesPool = sync.Pool{
New: func() interface{} {
return []record.RefFloatHistogramSample{}
},
}
)
defer func() {
@@ -695,8 +706,9 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
processors[i].setup()
go func(wp *wblSubsetProcessor) {
unknown, unknownHistograms := wp.processWBLSamples(h)
unknownRefs.Add(unknown)
unknownHistogramRefs.Add(unknownHistograms)
wg.Done()
}(&processors[i])
}
@@ -730,6 +742,30 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
return
}
decodedCh <- markers
case record.HistogramSamples:
hists := histogramSamplesPool.Get().([]record.RefHistogramSample)[:0]
hists, err = dec.HistogramSamples(rec, hists)
if err != nil {
decodeErr = &wlog.CorruptionErr{
Err: fmt.Errorf("decode histograms: %w", err),
Segment: r.Segment(),
Offset: r.Offset(),
}
return
}
decodedCh <- hists
case record.FloatHistogramSamples:
hists := floatHistogramSamplesPool.Get().([]record.RefFloatHistogramSample)[:0]
hists, err = dec.FloatHistogramSamples(rec, hists)
if err != nil {
decodeErr = &wlog.CorruptionErr{
Err: fmt.Errorf("decode float histograms: %w", err),
Segment: r.Segment(),
Offset: r.Offset(),
}
return
}
decodedCh <- hists
default:
// Noop.
}
@@ -794,6 +830,70 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
idx := uint64(ms.ref) % uint64(concurrency)
processors[idx].input <- wblSubsetProcessorInputItem{mmappedSeries: ms}
}
case []record.RefHistogramSample:
samples := v
// We split up the samples into chunks of 5000 samples or less.
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
// cause thousands of very large in flight buffers occupying large amounts
// of unused memory.
for len(samples) > 0 {
m := 5000
if len(samples) < m {
m = len(samples)
}
for i := 0; i < concurrency; i++ {
if histogramShards[i] == nil {
histogramShards[i] = processors[i].reuseHistogramBuf()
}
}
for _, sam := range samples[:m] {
if r, ok := multiRef[sam.Ref]; ok {
sam.Ref = r
}
mod := uint64(sam.Ref) % uint64(concurrency)
histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, h: sam.H})
}
for i := 0; i < concurrency; i++ {
if len(histogramShards[i]) > 0 {
processors[i].input <- wblSubsetProcessorInputItem{histogramSamples: histogramShards[i]}
histogramShards[i] = nil
}
}
samples = samples[m:]
}
histogramSamplesPool.Put(v) //nolint:staticcheck
case []record.RefFloatHistogramSample:
samples := v
// We split up the samples into chunks of 5000 samples or less.
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
// cause thousands of very large in flight buffers occupying large amounts
// of unused memory.
for len(samples) > 0 {
m := 5000
if len(samples) < m {
m = len(samples)
}
for i := 0; i < concurrency; i++ {
if histogramShards[i] == nil {
histogramShards[i] = processors[i].reuseHistogramBuf()
}
}
for _, sam := range samples[:m] {
if r, ok := multiRef[sam.Ref]; ok {
sam.Ref = r
}
mod := uint64(sam.Ref) % uint64(concurrency)
histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, fh: sam.FH})
}
for i := 0; i < concurrency; i++ {
if len(histogramShards[i]) > 0 {
processors[i].input <- wblSubsetProcessorInputItem{histogramSamples: histogramShards[i]}
histogramShards[i] = nil
}
}
samples = samples[m:]
}
floatHistogramSamplesPool.Put(v) //nolint:staticcheck
default:
panic(fmt.Errorf("unexpected decodedCh type: %T", d))
}
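The replay path above mirrors what loadWBL already does for float samples: decoded histogram records are cut into batches of at most 5000 and routed to a fixed worker per series, chosen by ref modulo the worker count, so every sample of a given series is replayed by a single goroutine. A self-contained sketch of that routing (names, sizes and the data are illustrative):

package main

import "fmt"

type histogramRecord struct {
	ref uint64
	t   int64
}

func main() {
	const concurrency = 4
	const batchSize = 5000

	inputs := make([]chan []histogramRecord, concurrency)
	for i := range inputs {
		inputs[i] = make(chan []histogramRecord, 8)
	}

	samples := make([]histogramRecord, 12000)
	for i := range samples {
		samples[i] = histogramRecord{ref: uint64(i % 37), t: int64(i)}
	}

	for len(samples) > 0 {
		m := batchSize
		if len(samples) < m {
			m = len(samples)
		}
		// Shard one batch by series ref so each worker sees whole series.
		shards := make([][]histogramRecord, concurrency)
		for _, s := range samples[:m] {
			idx := s.ref % concurrency
			shards[idx] = append(shards[idx], s)
		}
		for i := range shards {
			if len(shards[i]) > 0 {
				inputs[i] <- shards[i]
			}
		}
		samples = samples[m:]
	}

	for i := range inputs {
		close(inputs[i])
		n := 0
		for batch := range inputs[i] {
			n += len(batch)
		}
		fmt.Printf("worker %d received %d samples\n", i, n)
	}
}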
@@ -836,17 +936,20 @@ func (e errLoadWbl) Unwrap() error {
}
type wblSubsetProcessor struct {
input chan wblSubsetProcessorInputItem
output chan []record.RefSample
histogramsOutput chan []histogramRecord
}
type wblSubsetProcessorInputItem struct {
mmappedSeries *memSeries
samples []record.RefSample
histogramSamples []histogramRecord
}
func (wp *wblSubsetProcessor) setup() {
wp.output = make(chan []record.RefSample, 300)
wp.histogramsOutput = make(chan []histogramRecord, 300)
wp.input = make(chan wblSubsetProcessorInputItem, 300)
}
@@ -854,6 +957,8 @@ func (wp *wblSubsetProcessor) closeAndDrain() {
close(wp.input)
for range wp.output {
}
for range wp.histogramsOutput {
}
}
// If there is a buffer in the output chan, return it for reuse, otherwise return nil.
@@ -866,10 +971,21 @@ func (wp *wblSubsetProcessor) reuseBuf() []record.RefSample {
return nil
}
// If there is a buffer in the output chan, return it for reuse, otherwise return nil.
func (wp *wblSubsetProcessor) reuseHistogramBuf() []histogramRecord {
select {
case buf := <-wp.histogramsOutput:
return buf[:0]
default:
}
return nil
}
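reuseBuf and the new reuseHistogramBuf implement a small, allocation-saving handshake: the worker offers finished batches back on a channel, and the producer grabs one (length reset, capacity kept) if available, falling back to nil so the caller allocates fresh. A minimal standalone version of the pattern (the pool type below is invented for illustration, not the real code):

package main

import "fmt"

type pool struct {
	out chan []int
}

// giveBack offers a used buffer for reuse without ever blocking.
func (p *pool) giveBack(buf []int) {
	select {
	case p.out <- buf:
	default: // channel full: let the buffer be garbage collected
	}
}

// reuse returns a recycled buffer if one is waiting, otherwise nil.
func (p *pool) reuse() []int {
	select {
	case buf := <-p.out:
		return buf[:0]
	default:
	}
	return nil
}

func main() {
	p := &pool{out: make(chan []int, 2)}
	p.giveBack(make([]int, 0, 128))

	buf := p.reuse()
	fmt.Println(cap(buf))         // 128: capacity survives, length is reset
	fmt.Println(p.reuse() == nil) // true: nothing left to reuse
}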
// processWBLSamples adds the samples it receives to the head and passes
// the buffer received to an output channel for reuse.
func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs, unknownHistogramRefs uint64) {
defer close(wp.output)
defer close(wp.histogramsOutput)
oooCapMax := h.opts.OutOfOrderCapMax.Load()
// We don't check for minValidTime for ooo samples.
@@ -890,7 +1006,7 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) {
unknownRefs++
continue
}
ok, chunkCreated, _ := ms.insert(s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax)
if chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
@@ -908,11 +1024,44 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) {
case wp.output <- in.samples:
default:
}
for _, s := range in.histogramSamples {
ms := h.series.getByID(s.ref)
if ms == nil {
unknownHistogramRefs++
continue
}
if s.t <= ms.mmMaxTime {
continue
}
var chunkCreated bool
var ok bool
if s.h != nil {
ok, chunkCreated, _ = ms.insert(s.t, 0, s.h, nil, h.chunkDiskMapper, oooCapMax)
} else {
ok, chunkCreated, _ = ms.insert(s.t, 0, nil, s.fh, h.chunkDiskMapper, oooCapMax)
}
if chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
}
if ok {
if s.t > maxt {
maxt = s.t
}
if s.t < mint {
mint = s.t
}
}
}
select {
case wp.histogramsOutput <- in.histogramSamples:
default:
}
}
h.updateMinOOOMaxOOOTime(mint, maxt)
return unknownRefs, unknownHistogramRefs
}
const (

tsdb/ooo_head.go

@@ -15,6 +15,7 @@ package tsdb
import (
"fmt"
"github.com/prometheus/prometheus/model/histogram"
"sort"
"github.com/prometheus/prometheus/tsdb/chunkenc"
@@ -39,13 +40,13 @@ func NewOOOChunk() *OOOChunk {
// Insert inserts the sample such that order is maintained.
// Returns false if insert was not possible due to the same timestamp already existing.
func (o *OOOChunk) Insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram) bool {
// Although out-of-order samples can be out-of-order amongst themselves, we
// are opinionated and expect them to be usually in-order meaning we could
// try to append at the end first if the new timestamp is higher than the
// last known timestamp.
if len(o.samples) == 0 || t > o.samples[len(o.samples)-1].t {
o.samples = append(o.samples, sample{t, v, h, fh})
return true
}
@@ -54,7 +55,7 @@ func (o *OOOChunk) Insert(t int64, v float64) bool {
if i >= len(o.samples) {
// none found. append it at the end
o.samples = append(o.samples, sample{t, v, h, fh})
return true
}
@@ -66,7 +67,7 @@ func (o *OOOChunk) Insert(t int64, v float64) bool {
// Expand length by 1 to make room. use a zero sample, we will overwrite it anyway.
o.samples = append(o.samples, sample{})
copy(o.samples[i+1:], o.samples[i:])
o.samples[i] = sample{t, v, h, fh}
return true
}
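Finally, OOOChunk.Insert keeps the per-chunk sample slice sorted by timestamp regardless of sample type, now carrying optional histogram pointers alongside the float value. A simplified, float-only version of the same insertion strategy (a sketch reduced from the real method, not the method itself):

package main

import (
	"fmt"
	"sort"
)

type sample struct {
	t int64
	v float64
}

// insert keeps samples ordered by timestamp and rejects exact duplicates.
func insert(samples []sample, t int64, v float64) ([]sample, bool) {
	// Fast path: most "out-of-order" batches are in order among themselves.
	if len(samples) == 0 || t > samples[len(samples)-1].t {
		return append(samples, sample{t, v}), true
	}
	i := sort.Search(len(samples), func(i int) bool { return samples[i].t >= t })
	if samples[i].t == t {
		return samples, false // duplicate timestamp: not inserted
	}
	// Make room and shift the tail right by one.
	samples = append(samples, sample{})
	copy(samples[i+1:], samples[i:])
	samples[i] = sample{t, v}
	return samples, true
}

func main() {
	var s []sample
	for _, t := range []int64{10, 30, 20, 30} {
		var ok bool
		s, ok = insert(s, t, float64(t))
		fmt.Println(t, ok)
	}
	fmt.Println(s) // [{10 10} {20 20} {30 30}]
}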