From 6d413fad361914372ccea6c99c74fb49fe1048d9 Mon Sep 17 00:00:00 2001 From: Carrie Edwards Date: Mon, 28 Oct 2024 08:43:00 -0700 Subject: [PATCH] Use histogram records for custom value handling --- tsdb/head_append.go | 37 +---------------- tsdb/head_wal.go | 72 +-------------------------------- tsdb/record/record.go | 81 +++++++++++++++----------------------- tsdb/record/record_test.go | 29 -------------- tsdb/wlog/checkpoint.go | 13 ------ 5 files changed, 34 insertions(+), 198 deletions(-) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index fbed0ee7eb..7601f7847b 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -181,7 +181,6 @@ func (h *Head) appender() *headAppender { samples: h.getAppendBuffer(), sampleSeries: h.getSeriesBuffer(), exemplars: exemplarsBuf, - customValues: h.getCustomValuesBuffer(), histograms: h.getHistogramBuffer(), floatHistograms: h.getFloatHistogramBuffer(), metadata: h.getMetadataBuffer(), @@ -245,18 +244,6 @@ func (h *Head) putExemplarBuffer(b []exemplarWithSeriesRef) { h.exemplarsPool.Put(b[:0]) } -func (h *Head) getCustomValuesBuffer() []record.RefCustomValues { - b := h.customValuesPool.Get() - if b == nil { - return make([]record.RefCustomValues, 0, 512) - } - return b -} - -func (h *Head) putCustomValuesBuffer(b []record.RefCustomValues) { - h.customValuesPool.Put(b[:0]) -} - func (h *Head) getHistogramBuffer() []record.RefHistogramSample { b := h.histogramsPool.Get() if b == nil { @@ -339,7 +326,6 @@ type headAppender struct { histogramSeries []*memSeries // HistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). floatHistograms []record.RefFloatHistogramSample // New float histogram samples held by this appender. floatHistogramSeries []*memSeries // FloatHistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). - customValues []record.RefCustomValues // Custom values for histograms that use custom buckets held by this appender. metadata []record.RefMetadata // New metadata held by this appender. metadataSeries []*memSeries // Series corresponding to the metadata held by this appender. exemplars []exemplarWithSeriesRef // New exemplars held by this appender. @@ -704,13 +690,7 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels // This whole "if" should be removed. if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { s.lastHistogramValue = &histogram.Histogram{} - if histogram.IsCustomBucketsSchema(h.Schema) { - a.customValues = append(a.customValues, record.RefCustomValues{ - Ref: s.ref, - CustomValues: h.CustomValues, - }) - } - + } // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // to skip that sample from the WAL and write only in the WBL. _, delta, err := s.appendableHistogram(t, h, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load()) @@ -746,12 +726,6 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels // This whole "if" should be removed. if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { s.lastFloatHistogramValue = &histogram.FloatHistogram{} - if histogram.IsCustomBucketsSchema(fh.Schema) { - a.customValues = append(a.customValues, record.RefCustomValues{ - Ref: s.ref, - CustomValues: fh.CustomValues, - }) - } } // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise @@ -967,13 +941,6 @@ func (a *headAppender) log() error { return fmt.Errorf("log samples: %w", err) } } - if len(a.customValues) > 0 { - rec = enc.CustomValues(a.customValues, buf) - buf = rec[:0] - if err := a.head.wal.Log(rec); err != nil { - return fmt.Errorf("log custom values: %w", err) - } - } if len(a.histograms) > 0 { rec = enc.HistogramSamples(a.histograms, buf) buf = rec[:0] @@ -1460,7 +1427,6 @@ func (a *headAppender) Commit() (err error) { defer a.head.putAppendBuffer(a.samples) defer a.head.putSeriesBuffer(a.sampleSeries) defer a.head.putExemplarBuffer(a.exemplars) - defer a.head.putCustomValuesBuffer(a.customValues) defer a.head.putHistogramBuffer(a.histograms) defer a.head.putFloatHistogramBuffer(a.floatHistograms) defer a.head.putMetadataBuffer(a.metadata) @@ -1982,7 +1948,6 @@ func (a *headAppender) Rollback() (err error) { } a.head.putAppendBuffer(a.samples) a.head.putExemplarBuffer(a.exemplars) - a.head.putCustomValuesBuffer(a.customValues) a.head.putHistogramBuffer(a.histograms) a.head.putFloatHistogramBuffer(a.floatHistograms) a.head.putMetadataBuffer(a.metadata) diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 637929428d..885d14a08b 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -58,7 +58,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch var unknownExemplarRefs atomic.Uint64 var unknownHistogramRefs atomic.Uint64 var unknownMetadataRefs atomic.Uint64 - var unknownCustomValuesRefs atomic.Uint64 + // Track number of series records that had overlapping m-map chunks. var mmapOverlappingChunks atomic.Uint64 @@ -82,7 +82,6 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch histogramsPool zeropool.Pool[[]record.RefHistogramSample] floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample] metadataPool zeropool.Pool[[]record.RefMetadata] - customValuesPool zeropool.Pool[[]record.RefCustomValues] ) defer func() { @@ -225,18 +224,6 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch return } decoded <- meta - case record.CustomValues: - customVals := customValuesPool.Get()[:0] - customVals, err := dec.CustomValues(rec, customVals) - if err != nil { - decodeErr = &wlog.CorruptionErr{ - Err: fmt.Errorf("decode custom values: %w", err), - Segment: r.Segment(), - Offset: r.Offset(), - } - return - } - decoded <- customVals default: // Noop. } @@ -428,29 +415,6 @@ Outer: } } metadataPool.Put(v) - case []record.RefCustomValues: - for _, cv := range v { - s := h.series.getByID(cv.Ref) - if s == nil { - unknownCustomValuesRefs.Inc() - continue - } - //TODO: do we actually want to check lastFloatHistogramValue? - if s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { - s.customValues = cv.CustomValues - } - } - customValuesPool.Put(v) - // iterate over custom value records and do same things as for series/samples - put them in correct processor - // processor depends on series ref - // something like this: - // idx := uint64(mSeries.ref) % uint64(concurrency) - // processors[idx].input <- walSubsetProcessorInputItem{customValues: } - //for _, cv := range v { - // idx := uint64(cv.Ref) % uint64(concurrency) - // processors[idx].input <- walSubsetProcessorInputItem{customValues: cv} - //} - //customValuesPool.Put(v) default: panic(fmt.Errorf("unexpected decoded type: %T", d)) } @@ -720,7 +684,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) { // Track number of samples, histogram samples, m-map markers, that referenced a series we don't know about // for error reporting. - var unknownRefs, unknownHistogramRefs, unknownCustomValuesRefs, mmapMarkerUnknownRefs atomic.Uint64 + var unknownRefs, unknownHistogramRefs, mmapMarkerUnknownRefs atomic.Uint64 lastSeq, lastOff := lastMmapRef.Unpack() // Start workers that each process samples for a partition of the series ID space. @@ -820,18 +784,6 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch return } decodedCh <- hists - case record.CustomValues: - customVals := customValuesPool.Get().([]record.RefCustomValues)[:0] - customVals, err := dec.CustomValues(rec, customVals) - if err != nil { - decodeErr = &wlog.CorruptionErr{ - Err: fmt.Errorf("decode custom values: %w", err), - Segment: r.Segment(), - Offset: r.Offset(), - } - return - } - decodedCh <- customVals default: // Noop. } @@ -916,18 +868,6 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch if r, ok := multiRef[sam.Ref]; ok { sam.Ref = r } - if histogram.IsCustomBucketsSchema(sam.H.Schema) { - ms := h.series.getByID(sam.Ref) - if ms == nil { - unknownHistogramRefs.Inc() - continue - } - if ms.lastFloatHistogramValue != nil { - sam.H.CustomValues = ms.lastFloatHistogramValue.CustomValues - } else { - sam.H.CustomValues = ms.customValues - } - } mod := uint64(sam.Ref) % uint64(concurrency) histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, h: sam.H}) } @@ -960,14 +900,6 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch if r, ok := multiRef[sam.Ref]; ok { sam.Ref = r } - if histogram.IsCustomBucketsSchema(sam.FH.Schema) { - ms := h.series.getByID(sam.Ref) - if ms == nil { - unknownHistogramRefs.Inc() - continue - } - sam.FH.CustomValues = ms.customValues - } mod := uint64(sam.Ref) % uint64(concurrency) histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, fh: sam.FH}) } diff --git a/tsdb/record/record.go b/tsdb/record/record.go index 6f5fb24384..d759c18551 100644 --- a/tsdb/record/record.go +++ b/tsdb/record/record.go @@ -513,6 +513,18 @@ func DecodeHistogram(buf *encoding.Decbuf, h *histogram.Histogram) { for i := range h.NegativeBuckets { h.NegativeBuckets[i] = buf.Varint64() } + + if histogram.IsCustomBucketsSchema(h.Schema) { + l = buf.Uvarint() + if l > 0 { + if l > 0 { + h.CustomValues = make([]float64, l) + } + for i := range h.CustomValues { + h.CustomValues[i] = buf.Be64Float64() + } + } + } } func (d *Decoder) FloatHistogramSamples(rec []byte, histograms []RefFloatHistogramSample) ([]RefFloatHistogramSample, error) { @@ -595,39 +607,18 @@ func DecodeFloatHistogram(buf *encoding.Decbuf, fh *histogram.FloatHistogram) { for i := range fh.NegativeBuckets { fh.NegativeBuckets[i] = buf.Be64Float64() } -} -// TODO: optimize -func (d *Decoder) CustomValues(rec []byte, customValues []RefCustomValues) ([]RefCustomValues, error) { - dec := encoding.Decbuf{B: rec} - - if Type(dec.Byte()) != CustomValues { - return nil, errors.New("invalid record type") - } - if dec.Len() == 0 { - return customValues, nil - } - for len(dec.B) > 0 && dec.Err() == nil { - ref := storage.SeriesRef(dec.Be64()) - l := dec.Uvarint() + if histogram.IsCustomBucketsSchema(fh.Schema) { + l = buf.Uvarint() if l > 0 { - vals := make([]float64, l) - for i := range vals { - vals[i] = dec.Be64Float64() + if l > 0 { + fh.CustomValues = make([]float64, l) + } + for i := range fh.CustomValues { + fh.CustomValues[i] = buf.Be64Float64() } - customValues = append(customValues, RefCustomValues{ - Ref: chunks.HeadSeriesRef(ref), - CustomValues: vals, - }) } } - if dec.Err() != nil { - return nil, dec.Err() - } - if len(dec.B) > 0 { - return nil, fmt.Errorf("unexpected %d bytes left in entry", len(dec.B)) - } - return customValues, nil } // Encoder encodes series, sample, and tombstones records. @@ -813,6 +804,13 @@ func EncodeHistogram(buf *encoding.Encbuf, h *histogram.Histogram) { for _, b := range h.NegativeBuckets { buf.PutVarint64(b) } + + if histogram.IsCustomBucketsSchema(h.Schema) { + buf.PutUvarint(len(h.CustomValues)) + for _, v := range h.CustomValues { + buf.PutBEFloat64(v) + } + } } func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) []byte { @@ -871,28 +869,11 @@ func EncodeFloatHistogram(buf *encoding.Encbuf, h *histogram.FloatHistogram) { for _, b := range h.NegativeBuckets { buf.PutBEFloat64(b) } -} -func (e *Encoder) CustomValues(customValues []RefCustomValues, b []byte) []byte { - buf := encoding.Encbuf{B: b} - buf.PutByte(byte(CustomValues)) - - if len(customValues) == 0 { - return buf.Get() - } - - for _, v := range customValues { - buf.PutBE64(uint64(v.Ref)) - EncodeCustomValues(&buf, v.CustomValues) - } - - return buf.Get() -} - -// TODO: optimize -func EncodeCustomValues(buf *encoding.Encbuf, values []float64) { - buf.PutUvarint(len(values)) - for _, v := range values { - buf.PutBEFloat64(v) + if histogram.IsCustomBucketsSchema(h.Schema) { + buf.PutUvarint(len(h.CustomValues)) + for _, v := range h.CustomValues { + buf.PutBEFloat64(v) + } } } diff --git a/tsdb/record/record_test.go b/tsdb/record/record_test.go index c5f9f09f26..f3a657aecb 100644 --- a/tsdb/record/record_test.go +++ b/tsdb/record/record_test.go @@ -181,22 +181,6 @@ func TestRecord_EncodeDecode(t *testing.T) { decFloatHistograms, err = dec.FloatHistogramSamples(enc.FloatHistogramSamples(floatHistograms, nil), nil) require.NoError(t, err) require.Equal(t, floatHistograms, decFloatHistograms) - - // Custom values for histograms - customValues := []RefCustomValues{ - { - Ref: 56, - CustomValues: []float64{0, 1, 2, 3, 4}, - }, - { - Ref: 42, - CustomValues: []float64{5, 10, 15, 20, 25}, - }, - } - - decCustomValues, err := dec.CustomValues(enc.CustomValues(customValues, nil), nil) - require.NoError(t, err) - require.Equal(t, customValues, decCustomValues) } // TestRecord_Corrupted ensures that corrupted records return the correct error. @@ -285,15 +269,6 @@ func TestRecord_Corrupted(t *testing.T) { _, err := dec.HistogramSamples(corrupted, nil) require.ErrorIs(t, err, encoding.ErrInvalidSize) }) - - t.Run("Test corrupted customValues record", func(t *testing.T) { - customValues := []RefCustomValues{ - {Ref: 56, CustomValues: []float64{0, 1, 2, 3, 4}}, - } - corrupted := enc.CustomValues(customValues, nil)[:8] - _, err := dec.CustomValues(corrupted, nil) - require.ErrorIs(t, err, encoding.ErrInvalidSize) - }) } func TestRecord_Type(t *testing.T) { @@ -337,10 +312,6 @@ func TestRecord_Type(t *testing.T) { recordType = dec.Type(enc.HistogramSamples(histograms, nil)) require.Equal(t, HistogramSamples, recordType) - customValues := []RefCustomValues{{Ref: 56, CustomValues: []float64{0, 1, 2, 3, 4}}} - recordType = dec.Type(enc.CustomValues(customValues, nil)) - require.Equal(t, CustomValues, recordType) - recordType = dec.Type(nil) require.Equal(t, Unknown, recordType) diff --git a/tsdb/wlog/checkpoint.go b/tsdb/wlog/checkpoint.go index a96d142495..58e11c770e 100644 --- a/tsdb/wlog/checkpoint.go +++ b/tsdb/wlog/checkpoint.go @@ -151,7 +151,6 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He samples []record.RefSample histogramSamples []record.RefHistogramSample floatHistogramSamples []record.RefFloatHistogramSample - customValues []record.RefCustomValues tstones []tombstones.Stone exemplars []record.RefExemplar metadata []record.RefMetadata @@ -300,18 +299,6 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He } stats.TotalMetadata += len(metadata) stats.DroppedMetadata += len(metadata) - repl - case record.CustomValues: - customValues, err = dec.CustomValues(rec, customValues) - if err != nil { - return nil, fmt.Errorf("decode custom values: %w", err) - } - repl := customValues[:0] - for _, v := range customValues { - repl = append(repl, v) - } - if len(repl) > 0 { - buf = enc.CustomValues(repl, buf) - } default: // Unknown record type, probably from a future Prometheus version. continue