diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 8d66d1e818..9a865da812 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -321,7 +321,7 @@ type headAppender struct { func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { // For OOO inserts, this restriction is irrelevant and will be checked later once we confirm the sample is an in-order append. - // If OOO inserts are disabled, we may as well as check this as early as we can and avoid more work. + // Fail fast if OOO is disabled. if a.oooTimeWindow == 0 && t < a.minValidTime { a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Inc() return 0, storage.ErrOutOfBounds @@ -489,46 +489,94 @@ func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooTi return false, headMaxt - t, storage.ErrOutOfOrderSample } -// appendableHistogram checks whether the given histogram is valid for appending to the series. -func (s *memSeries) appendableHistogram(t int64, h *histogram.Histogram) error { - if s.headChunks == nil { - return nil +// appendableHistogram checks whether the given histogram sample is valid for appending to the series. (if we return false and no error) +// The sample belongs to the out of order chunk if we return true and no error. +// An error signifies the sample cannot be handled. +func (s *memSeries) appendableHistogram(t int64, h *histogram.Histogram, headMaxt, minValidTime, oooTimeWindow int64, oooHistogramsEnabled bool) (isOOO bool, oooDelta int64, err error) { + // Check if we can append in the in-order chunk. + if t >= minValidTime { + if s.headChunks == nil { + // The series has no sample and was freshly created. + return false, 0, nil + } + msMaxt := s.maxTime() + if t > msMaxt { + return false, 0, nil + } + if t == msMaxt { + // We are allowing exact duplicates as we can encounter them in valid cases + // like federation and erroring out at that time would be extremely noisy. + // This only checks against the latest in-order sample. + // The OOO headchunk has its own method to detect these duplicates. + if !h.Equals(s.lastHistogramValue) { + return false, 0, storage.ErrDuplicateSampleForTimestamp + } + // Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf. + return false, 0, nil + } } - if t > s.headChunks.maxTime { - return nil - } - if t < s.headChunks.maxTime { - return storage.ErrOutOfOrderSample + // The sample cannot go in the in-order chunk. Check if it can go in the out-of-order chunk. + if oooTimeWindow > 0 && t >= headMaxt-oooTimeWindow { + if !oooHistogramsEnabled { + return true, headMaxt - t, storage.ErrOOONativeHistogramsDisabled + } + return true, headMaxt - t, nil } - // We are allowing exact duplicates as we can encounter them in valid cases - // like federation and erroring out at that time would be extremely noisy. - if !h.Equals(s.lastHistogramValue) { - return storage.ErrDuplicateSampleForTimestamp + // The sample cannot go in both in-order and out-of-order chunk. + if oooTimeWindow > 0 { + return true, headMaxt - t, storage.ErrTooOldSample } - return nil + if t < minValidTime { + return false, headMaxt - t, storage.ErrOutOfBounds + } + return false, headMaxt - t, storage.ErrOutOfOrderSample } -// appendableFloatHistogram checks whether the given float histogram is valid for appending to the series. -func (s *memSeries) appendableFloatHistogram(t int64, fh *histogram.FloatHistogram) error { - if s.headChunks == nil { - return nil +// appendableFloatHistogram checks whether the given float histogram sample is valid for appending to the series. (if we return false and no error) +// The sample belongs to the out of order chunk if we return true and no error. +// An error signifies the sample cannot be handled. +func (s *memSeries) appendableFloatHistogram(t int64, fh *histogram.FloatHistogram, headMaxt, minValidTime, oooTimeWindow int64, oooHistogramsEnabled bool) (isOOO bool, oooDelta int64, err error) { + // Check if we can append in the in-order chunk. + if t >= minValidTime { + if s.headChunks == nil { + // The series has no sample and was freshly created. + return false, 0, nil + } + msMaxt := s.maxTime() + if t > msMaxt { + return false, 0, nil + } + if t == msMaxt { + // We are allowing exact duplicates as we can encounter them in valid cases + // like federation and erroring out at that time would be extremely noisy. + // This only checks against the latest in-order sample. + // The OOO headchunk has its own method to detect these duplicates. + if !fh.Equals(s.lastFloatHistogramValue) { + return false, 0, storage.ErrDuplicateSampleForTimestamp + } + // Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf. + return false, 0, nil + } } - if t > s.headChunks.maxTime { - return nil - } - if t < s.headChunks.maxTime { - return storage.ErrOutOfOrderSample + // The sample cannot go in the in-order chunk. Check if it can go in the out-of-order chunk. + if oooTimeWindow > 0 && t >= headMaxt-oooTimeWindow { + if !oooHistogramsEnabled { + return true, headMaxt - t, storage.ErrOOONativeHistogramsDisabled + } + return true, headMaxt - t, nil } - // We are allowing exact duplicates as we can encounter them in valid cases - // like federation and erroring out at that time would be extremely noisy. - if !fh.Equals(s.lastFloatHistogramValue) { - return storage.ErrDuplicateSampleForTimestamp + // The sample cannot go in both in-order and out-of-order chunk. + if oooTimeWindow > 0 { + return true, headMaxt - t, storage.ErrTooOldSample } - return nil + if t < minValidTime { + return false, headMaxt - t, storage.ErrOutOfBounds + } + return false, headMaxt - t, storage.ErrOutOfOrderSample } // AppendExemplar for headAppender assumes the series ref already exists, and so it doesn't @@ -573,10 +621,16 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels return 0, storage.ErrNativeHistogramsDisabled } - if t < a.minValidTime { + // For OOO inserts, this restriction is irrelevant and will be checked later once we confirm the histogram sample is an in-order append. + // Fail fast if OOO is disabled. + if a.oooTimeWindow == 0 && t < a.minValidTime { a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeHistogram).Inc() return 0, storage.ErrOutOfBounds } + // Also fail fast if OOO is enabled, but OOO native histogram ingestion is disabled. + if a.oooTimeWindow > 0 && t < a.minValidTime && !a.head.opts.EnableOOONativeHistograms.Load() { + return 0, storage.ErrOOONativeHistogramsDisabled + } if h != nil { if err := h.Validate(); err != nil { @@ -625,15 +679,25 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels switch { case h != nil: s.Lock() - if err := s.appendableHistogram(t, h); err != nil { - s.Unlock() - if errors.Is(err, storage.ErrOutOfOrderSample) { + // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise + // to skip that sample from the WAL and write only in the WBL. + _, delta, err := s.appendableHistogram(t, h, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load()) + if err != nil { + s.pendingCommit = true + } + s.Unlock() + if delta > 0 { + a.head.metrics.oooHistogram.Observe(float64(delta) / 1000) + } + if err != nil { + switch { + case errors.Is(err, storage.ErrOutOfOrderSample): a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Inc() + case errors.Is(err, storage.ErrTooOldSample): + a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeHistogram).Inc() } return 0, err } - s.pendingCommit = true - s.Unlock() a.histograms = append(a.histograms, record.RefHistogramSample{ Ref: s.ref, T: t, @@ -642,15 +706,25 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels a.histogramSeries = append(a.histogramSeries, s) case fh != nil: s.Lock() - if err := s.appendableFloatHistogram(t, fh); err != nil { - s.Unlock() - if errors.Is(err, storage.ErrOutOfOrderSample) { + // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise + // to skip that sample from the WAL and write only in the WBL. + _, delta, err := s.appendableFloatHistogram(t, fh, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load()) + if err == nil { + s.pendingCommit = true + } + s.Unlock() + if delta > 0 { + a.head.metrics.oooHistogram.Observe(float64(delta) / 1000) + } + if err != nil { + switch { + case errors.Is(err, storage.ErrOutOfOrderSample): a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Inc() + case errors.Is(err, storage.ErrTooOldSample): + a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeHistogram).Inc() } return 0, err } - s.pendingCommit = true - s.Unlock() a.floatHistograms = append(a.floatHistograms, record.RefFloatHistogramSample{ Ref: s.ref, T: t, @@ -837,20 +911,24 @@ func (a *headAppender) Commit() (err error) { floatsAppended = len(a.samples) histogramsAppended = len(a.histograms) + len(a.floatHistograms) // number of samples out of order but accepted: with ooo enabled and within time window - floatOOOAccepted int + oooFloatsAccepted int + oooHistogramAccepted int // number of samples rejected due to: out of order but OOO support disabled. floatOOORejected int histoOOORejected int // number of samples rejected due to: that are out of order but too old (OOO support enabled, but outside time window) floatTooOldRejected int + histoTooOldRejected int // number of samples rejected due to: out of bounds: with t < minValidTime (OOO support disabled) - floatOOBRejected int - + floatOOBRejected int + histoOOBRejected int inOrderMint int64 = math.MaxInt64 inOrderMaxt int64 = math.MinInt64 oooMinT int64 = math.MaxInt64 oooMaxT int64 = math.MinInt64 wblSamples []record.RefSample + wblHistograms []record.RefHistogramSample + wblFloatHistograms []record.RefFloatHistogramSample oooMmapMarkers map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef oooMmapMarkersCount int oooRecords [][]byte @@ -899,8 +977,18 @@ func (a *headAppender) Commit() (err error) { r := enc.Samples(wblSamples, a.head.getBytesBuffer()) oooRecords = append(oooRecords, r) } + if len(wblHistograms) > 0 { + r := enc.HistogramSamples(wblHistograms, a.head.getBytesBuffer()) + oooRecords = append(oooRecords, r) + } + if len(wblFloatHistograms) > 0 { + r := enc.FloatHistogramSamples(wblFloatHistograms, a.head.getBytesBuffer()) + oooRecords = append(oooRecords, r) + } wblSamples = nil + wblHistograms = nil + wblFloatHistograms = nil oooMmapMarkers = nil } for i, s := range a.samples { @@ -933,7 +1021,7 @@ func (a *headAppender) Commit() (err error) { // Sample is OOO and OOO handling is enabled // and the delta is within the OOO tolerance. var mmapRefs []chunks.ChunkDiskMapperRef - ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, a.head.chunkDiskMapper, oooCapMax) + ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, nil, nil, a.head.chunkDiskMapper, oooCapMax) if chunkCreated { r, ok := oooMmapMarkers[series.ref] if !ok || r != nil { @@ -966,7 +1054,7 @@ func (a *headAppender) Commit() (err error) { if s.T > oooMaxT { oooMaxT = s.T } - floatOOOAccepted++ + oooFloatsAccepted++ } else { // Sample is an exact duplicate of the last sample. // NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk, @@ -1002,51 +1090,194 @@ func (a *headAppender) Commit() (err error) { for i, s := range a.histograms { series = a.histogramSeries[i] series.Lock() - ok, chunkCreated := series.appendHistogram(s.T, s.H, a.appendID, appendChunkOpts) - series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) - series.pendingCommit = false - series.Unlock() - if ok { - if s.T < inOrderMint { - inOrderMint = s.T - } - if s.T > inOrderMaxt { - inOrderMaxt = s.T - } - } else { + oooSample, _, err := series.appendableHistogram(s.T, s.H, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load()) + switch { + case err == nil: + // Do nothing. + case errors.Is(err, storage.ErrOutOfOrderSample): histogramsAppended-- histoOOORejected++ + case errors.Is(err, storage.ErrOutOfBounds): + histogramsAppended-- + histoOOBRejected++ + case errors.Is(err, storage.ErrTooOldSample): + histogramsAppended-- + histoTooOldRejected++ + default: + histogramsAppended-- } + + var ok, chunkCreated bool + + switch { + case err != nil: + // Do nothing here. + case oooSample: + // Sample is OOO and OOO handling is enabled + // and the delta is within the OOO tolerance. + var mmapRefs []chunks.ChunkDiskMapperRef + ok, chunkCreated, mmapRefs = series.insert(s.T, 0, s.H, nil, a.head.chunkDiskMapper, oooCapMax) + if chunkCreated { + r, ok := oooMmapMarkers[series.ref] + if !ok || r != nil { + // !ok means there are no markers collected for these samples yet. So we first flush the samples + // before setting this m-map marker. + + // r != 0 means we have already m-mapped a chunk for this series in the same Commit(). + // Hence, before we m-map again, we should add the samples and m-map markers + // seen till now to the WBL records. + collectOOORecords() + } + + if oooMmapMarkers == nil { + oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef) + } + if len(mmapRefs) > 0 { + oooMmapMarkers[series.ref] = mmapRefs + oooMmapMarkersCount += len(mmapRefs) + } else { + // No chunk was written to disk, so we need to set an initial marker for this series. + oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0} + oooMmapMarkersCount++ + } + } + if ok { + wblHistograms = append(wblHistograms, s) + if s.T < oooMinT { + oooMinT = s.T + } + if s.T > oooMaxT { + oooMaxT = s.T + } + oooHistogramAccepted++ + } else { + // Sample is an exact duplicate of the last sample. + // NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk, + // not with samples in already flushed OOO chunks. + // TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305. + histogramsAppended-- + } + default: + ok, chunkCreated = series.appendHistogram(s.T, s.H, a.appendID, appendChunkOpts) + if ok { + if s.T < inOrderMint { + inOrderMint = s.T + } + if s.T > inOrderMaxt { + inOrderMaxt = s.T + } + } else { + histogramsAppended-- + histoOOORejected++ + } + } + if chunkCreated { a.head.metrics.chunks.Inc() a.head.metrics.chunksCreated.Inc() } + + series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) + series.pendingCommit = false + series.Unlock() } + histogramsAppended += len(a.floatHistograms) for i, s := range a.floatHistograms { series = a.floatHistogramSeries[i] series.Lock() - ok, chunkCreated := series.appendFloatHistogram(s.T, s.FH, a.appendID, appendChunkOpts) - series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) - series.pendingCommit = false - series.Unlock() - if ok { - if s.T < inOrderMint { - inOrderMint = s.T - } - if s.T > inOrderMaxt { - inOrderMaxt = s.T - } - } else { + oooSample, _, err := series.appendableFloatHistogram(s.T, s.FH, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load()) + switch { + case err == nil: + // Do nothing. + case errors.Is(err, storage.ErrOutOfOrderSample): histogramsAppended-- histoOOORejected++ + case errors.Is(err, storage.ErrOutOfBounds): + histogramsAppended-- + histoOOBRejected++ + case errors.Is(err, storage.ErrTooOldSample): + histogramsAppended-- + histoTooOldRejected++ + default: + histogramsAppended-- } + + var ok, chunkCreated bool + + switch { + case err != nil: + // Do nothing here. + case oooSample: + // Sample is OOO and OOO handling is enabled + // and the delta is within the OOO tolerance. + var mmapRefs []chunks.ChunkDiskMapperRef + ok, chunkCreated, mmapRefs = series.insert(s.T, 0, nil, s.FH, a.head.chunkDiskMapper, oooCapMax) + if chunkCreated { + r, ok := oooMmapMarkers[series.ref] + if !ok || r != nil { + // !ok means there are no markers collected for these samples yet. So we first flush the samples + // before setting this m-map marker. + + // r != 0 means we have already m-mapped a chunk for this series in the same Commit(). + // Hence, before we m-map again, we should add the samples and m-map markers + // seen till now to the WBL records. + collectOOORecords() + } + + if oooMmapMarkers == nil { + oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef) + } + if len(mmapRefs) > 0 { + oooMmapMarkers[series.ref] = mmapRefs + oooMmapMarkersCount += len(mmapRefs) + } else { + // No chunk was written to disk, so we need to set an initial marker for this series. + oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0} + oooMmapMarkersCount++ + } + } + if ok { + wblFloatHistograms = append(wblFloatHistograms, s) + if s.T < oooMinT { + oooMinT = s.T + } + if s.T > oooMaxT { + oooMaxT = s.T + } + oooHistogramAccepted++ + } else { + // Sample is an exact duplicate of the last sample. + // NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk, + // not with samples in already flushed OOO chunks. + // TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305. + histogramsAppended-- + } + default: + ok, chunkCreated = series.appendFloatHistogram(s.T, s.FH, a.appendID, appendChunkOpts) + if ok { + if s.T < inOrderMint { + inOrderMint = s.T + } + if s.T > inOrderMaxt { + inOrderMaxt = s.T + } + } else { + histogramsAppended-- + histoOOORejected++ + } + } + if chunkCreated { a.head.metrics.chunks.Inc() a.head.metrics.chunksCreated.Inc() } + + series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) + series.pendingCommit = false + series.Unlock() } for i, m := range a.metadata { @@ -1062,7 +1293,8 @@ func (a *headAppender) Commit() (err error) { a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatTooOldRejected)) a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatsAppended)) a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(histogramsAppended)) - a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatOOOAccepted)) + a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(oooFloatsAccepted)) + a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(oooHistogramAccepted)) a.head.updateMinMaxTime(inOrderMint, inOrderMaxt) a.head.updateMinOOOMaxOOOTime(oooMinT, oooMaxT) @@ -1080,7 +1312,7 @@ func (a *headAppender) Commit() (err error) { } // insert is like append, except it inserts. Used for OOO samples. -func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) { +func (s *memSeries) insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) { if s.ooo == nil { s.ooo = &memSeriesOOOFields{} } @@ -1091,7 +1323,7 @@ func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDisk chunkCreated = true } - ok := c.chunk.Insert(t, v) + ok := c.chunk.Insert(t, v, h, fh) if ok { if chunkCreated || t < c.minTime { c.minTime = t @@ -1464,9 +1696,9 @@ func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMap handleChunkWriteError(err) return nil } - chunkRefs := make([]chunks.ChunkDiskMapperRef, 0, 1) + chunkRefs := make([]chunks.ChunkDiskMapperRef, 0, len(chks)) for _, memchunk := range chks { - chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.ooo.oooHeadChunk.minTime, s.ooo.oooHeadChunk.maxTime, memchunk.chunk, true, handleChunkWriteError) + chunkRef := chunkDiskMapper.WriteChunk(s.ref, memchunk.minTime, memchunk.maxTime, memchunk.chunk, true, handleChunkWriteError) chunkRefs = append(chunkRefs, chunkRef) s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks, &mmappedChunk{ ref: chunkRef, diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 2852709a04..975aeb99e3 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -649,9 +649,9 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp } func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) { - // Track number of samples, m-map markers, that referenced a series we don't know about + // Track number of samples, histogram samples, m-map markers, that referenced a series we don't know about // for error reporting. - var unknownRefs, mmapMarkerUnknownRefs atomic.Uint64 + var unknownRefs, unknownHistogramRefs, mmapMarkerUnknownRefs atomic.Uint64 lastSeq, lastOff := lastMmapRef.Unpack() // Start workers that each process samples for a partition of the series ID space. @@ -660,8 +660,9 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch concurrency = h.opts.WALReplayConcurrency processors = make([]wblSubsetProcessor, concurrency) - dec = record.NewDecoder(syms) - shards = make([][]record.RefSample, concurrency) + dec record.Decoder + shards = make([][]record.RefSample, concurrency) + histogramShards = make([][]histogramRecord, concurrency) decodedCh = make(chan interface{}, 10) decodeErr error @@ -675,6 +676,16 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch return []record.RefMmapMarker{} }, } + histogramSamplesPool = sync.Pool{ + New: func() interface{} { + return []record.RefHistogramSample{} + }, + } + floatHistogramSamplesPool = sync.Pool{ + New: func() interface{} { + return []record.RefFloatHistogramSample{} + }, + } ) defer func() { @@ -695,8 +706,9 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch processors[i].setup() go func(wp *wblSubsetProcessor) { - unknown := wp.processWBLSamples(h) + unknown, unknownHistograms := wp.processWBLSamples(h) unknownRefs.Add(unknown) + unknownHistogramRefs.Add(unknownHistograms) wg.Done() }(&processors[i]) } @@ -730,6 +742,30 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch return } decodedCh <- markers + case record.HistogramSamples: + hists := histogramSamplesPool.Get().([]record.RefHistogramSample)[:0] + hists, err = dec.HistogramSamples(rec, hists) + if err != nil { + decodeErr = &wlog.CorruptionErr{ + Err: fmt.Errorf("decode histograms: %w", err), + Segment: r.Segment(), + Offset: r.Offset(), + } + return + } + decodedCh <- hists + case record.FloatHistogramSamples: + hists := floatHistogramSamplesPool.Get().([]record.RefFloatHistogramSample)[:0] + hists, err = dec.FloatHistogramSamples(rec, hists) + if err != nil { + decodeErr = &wlog.CorruptionErr{ + Err: fmt.Errorf("decode float histograms: %w", err), + Segment: r.Segment(), + Offset: r.Offset(), + } + return + } + decodedCh <- hists default: // Noop. } @@ -794,6 +830,70 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch idx := uint64(ms.ref) % uint64(concurrency) processors[idx].input <- wblSubsetProcessorInputItem{mmappedSeries: ms} } + case []record.RefHistogramSample: + samples := v + // We split up the samples into chunks of 5000 samples or less. + // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise + // cause thousands of very large in flight buffers occupying large amounts + // of unused memory. + for len(samples) > 0 { + m := 5000 + if len(samples) < m { + m = len(samples) + } + for i := 0; i < concurrency; i++ { + if histogramShards[i] == nil { + histogramShards[i] = processors[i].reuseHistogramBuf() + } + } + for _, sam := range samples[:m] { + if r, ok := multiRef[sam.Ref]; ok { + sam.Ref = r + } + mod := uint64(sam.Ref) % uint64(concurrency) + histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, h: sam.H}) + } + for i := 0; i < concurrency; i++ { + if len(histogramShards[i]) > 0 { + processors[i].input <- wblSubsetProcessorInputItem{histogramSamples: histogramShards[i]} + histogramShards[i] = nil + } + } + samples = samples[m:] + } + histogramSamplesPool.Put(v) //nolint:staticcheck + case []record.RefFloatHistogramSample: + samples := v + // We split up the samples into chunks of 5000 samples or less. + // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise + // cause thousands of very large in flight buffers occupying large amounts + // of unused memory. + for len(samples) > 0 { + m := 5000 + if len(samples) < m { + m = len(samples) + } + for i := 0; i < concurrency; i++ { + if histogramShards[i] == nil { + histogramShards[i] = processors[i].reuseHistogramBuf() + } + } + for _, sam := range samples[:m] { + if r, ok := multiRef[sam.Ref]; ok { + sam.Ref = r + } + mod := uint64(sam.Ref) % uint64(concurrency) + histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, fh: sam.FH}) + } + for i := 0; i < concurrency; i++ { + if len(histogramShards[i]) > 0 { + processors[i].input <- wblSubsetProcessorInputItem{histogramSamples: histogramShards[i]} + histogramShards[i] = nil + } + } + samples = samples[m:] + } + floatHistogramSamplesPool.Put(v) //nolint:staticcheck default: panic(fmt.Errorf("unexpected decodedCh type: %T", d)) } @@ -836,17 +936,20 @@ func (e errLoadWbl) Unwrap() error { } type wblSubsetProcessor struct { - input chan wblSubsetProcessorInputItem - output chan []record.RefSample + input chan wblSubsetProcessorInputItem + output chan []record.RefSample + histogramsOutput chan []histogramRecord } type wblSubsetProcessorInputItem struct { - mmappedSeries *memSeries - samples []record.RefSample + mmappedSeries *memSeries + samples []record.RefSample + histogramSamples []histogramRecord } func (wp *wblSubsetProcessor) setup() { wp.output = make(chan []record.RefSample, 300) + wp.histogramsOutput = make(chan []histogramRecord, 300) wp.input = make(chan wblSubsetProcessorInputItem, 300) } @@ -854,6 +957,8 @@ func (wp *wblSubsetProcessor) closeAndDrain() { close(wp.input) for range wp.output { } + for range wp.histogramsOutput { + } } // If there is a buffer in the output chan, return it for reuse, otherwise return nil. @@ -866,10 +971,21 @@ func (wp *wblSubsetProcessor) reuseBuf() []record.RefSample { return nil } +// If there is a buffer in the output chan, return it for reuse, otherwise return nil. +func (wp *wblSubsetProcessor) reuseHistogramBuf() []histogramRecord { + select { + case buf := <-wp.histogramsOutput: + return buf[:0] + default: + } + return nil +} + // processWBLSamples adds the samples it receives to the head and passes // the buffer received to an output channel for reuse. -func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) { +func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs, unknownHistogramRefs uint64) { defer close(wp.output) + defer close(wp.histogramsOutput) oooCapMax := h.opts.OutOfOrderCapMax.Load() // We don't check for minValidTime for ooo samples. @@ -890,7 +1006,7 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) { unknownRefs++ continue } - ok, chunkCreated, _ := ms.insert(s.T, s.V, h.chunkDiskMapper, oooCapMax) + ok, chunkCreated, _ := ms.insert(s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax) if chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() @@ -908,11 +1024,44 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) { case wp.output <- in.samples: default: } + for _, s := range in.histogramSamples { + ms := h.series.getByID(s.ref) + if ms == nil { + unknownHistogramRefs++ + continue + } + if s.t <= ms.mmMaxTime { + continue + } + var chunkCreated bool + var ok bool + if s.h != nil { + ok, chunkCreated, _ = ms.insert(s.t, 0, s.h, nil, h.chunkDiskMapper, oooCapMax) + } else { + ok, chunkCreated, _ = ms.insert(s.t, 0, nil, s.fh, h.chunkDiskMapper, oooCapMax) + } + if chunkCreated { + h.metrics.chunksCreated.Inc() + h.metrics.chunks.Inc() + } + if ok { + if s.t > maxt { + maxt = s.t + } + if s.t < mint { + mint = s.t + } + } + } + select { + case wp.histogramsOutput <- in.histogramSamples: + default: + } } h.updateMinOOOMaxOOOTime(mint, maxt) - return unknownRefs + return unknownRefs, unknownHistogramRefs } const ( diff --git a/tsdb/ooo_head.go b/tsdb/ooo_head.go index b2556d62e9..71abc52041 100644 --- a/tsdb/ooo_head.go +++ b/tsdb/ooo_head.go @@ -15,6 +15,7 @@ package tsdb import ( "fmt" + "github.com/prometheus/prometheus/model/histogram" "sort" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -39,13 +40,13 @@ func NewOOOChunk() *OOOChunk { // Insert inserts the sample such that order is maintained. // Returns false if insert was not possible due to the same timestamp already existing. -func (o *OOOChunk) Insert(t int64, v float64) bool { +func (o *OOOChunk) Insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram) bool { // Although out-of-order samples can be out-of-order amongst themselves, we // are opinionated and expect them to be usually in-order meaning we could // try to append at the end first if the new timestamp is higher than the // last known timestamp. if len(o.samples) == 0 || t > o.samples[len(o.samples)-1].t { - o.samples = append(o.samples, sample{t, v, nil, nil}) + o.samples = append(o.samples, sample{t, v, h, fh}) return true } @@ -54,7 +55,7 @@ func (o *OOOChunk) Insert(t int64, v float64) bool { if i >= len(o.samples) { // none found. append it at the end - o.samples = append(o.samples, sample{t, v, nil, nil}) + o.samples = append(o.samples, sample{t, v, h, fh}) return true } @@ -66,7 +67,7 @@ func (o *OOOChunk) Insert(t int64, v float64) bool { // Expand length by 1 to make room. use a zero sample, we will overwrite it anyway. o.samples = append(o.samples, sample{}) copy(o.samples[i+1:], o.samples[i:]) - o.samples[i] = sample{t, v, nil, nil} + o.samples[i] = sample{t, v, h, fh} return true }