Use histogram records for custom value handling

This commit is contained in:
Carrie Edwards 2024-10-28 08:43:00 -07:00
parent aa144b7263
commit 6d413fad36
5 changed files with 34 additions and 198 deletions

View file

@ -181,7 +181,6 @@ func (h *Head) appender() *headAppender {
samples: h.getAppendBuffer(),
sampleSeries: h.getSeriesBuffer(),
exemplars: exemplarsBuf,
customValues: h.getCustomValuesBuffer(),
histograms: h.getHistogramBuffer(),
floatHistograms: h.getFloatHistogramBuffer(),
metadata: h.getMetadataBuffer(),
@ -245,18 +244,6 @@ func (h *Head) putExemplarBuffer(b []exemplarWithSeriesRef) {
h.exemplarsPool.Put(b[:0])
}
func (h *Head) getCustomValuesBuffer() []record.RefCustomValues {
b := h.customValuesPool.Get()
if b == nil {
return make([]record.RefCustomValues, 0, 512)
}
return b
}
func (h *Head) putCustomValuesBuffer(b []record.RefCustomValues) {
h.customValuesPool.Put(b[:0])
}
func (h *Head) getHistogramBuffer() []record.RefHistogramSample {
b := h.histogramsPool.Get()
if b == nil {
@ -339,7 +326,6 @@ type headAppender struct {
histogramSeries []*memSeries // HistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
floatHistograms []record.RefFloatHistogramSample // New float histogram samples held by this appender.
floatHistogramSeries []*memSeries // FloatHistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
customValues []record.RefCustomValues // Custom values for histograms that use custom buckets held by this appender.
metadata []record.RefMetadata // New metadata held by this appender.
metadataSeries []*memSeries // Series corresponding to the metadata held by this appender.
exemplars []exemplarWithSeriesRef // New exemplars held by this appender.
@ -704,13 +690,7 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
// This whole "if" should be removed.
if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
s.lastHistogramValue = &histogram.Histogram{}
if histogram.IsCustomBucketsSchema(h.Schema) {
a.customValues = append(a.customValues, record.RefCustomValues{
Ref: s.ref,
CustomValues: h.CustomValues,
})
}
}
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL.
_, delta, err := s.appendableHistogram(t, h, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load())
@ -746,12 +726,6 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
// This whole "if" should be removed.
if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
s.lastFloatHistogramValue = &histogram.FloatHistogram{}
if histogram.IsCustomBucketsSchema(fh.Schema) {
a.customValues = append(a.customValues, record.RefCustomValues{
Ref: s.ref,
CustomValues: fh.CustomValues,
})
}
}
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
@ -967,13 +941,6 @@ func (a *headAppender) log() error {
return fmt.Errorf("log samples: %w", err)
}
}
if len(a.customValues) > 0 {
rec = enc.CustomValues(a.customValues, buf)
buf = rec[:0]
if err := a.head.wal.Log(rec); err != nil {
return fmt.Errorf("log custom values: %w", err)
}
}
if len(a.histograms) > 0 {
rec = enc.HistogramSamples(a.histograms, buf)
buf = rec[:0]
@ -1460,7 +1427,6 @@ func (a *headAppender) Commit() (err error) {
defer a.head.putAppendBuffer(a.samples)
defer a.head.putSeriesBuffer(a.sampleSeries)
defer a.head.putExemplarBuffer(a.exemplars)
defer a.head.putCustomValuesBuffer(a.customValues)
defer a.head.putHistogramBuffer(a.histograms)
defer a.head.putFloatHistogramBuffer(a.floatHistograms)
defer a.head.putMetadataBuffer(a.metadata)
@ -1982,7 +1948,6 @@ func (a *headAppender) Rollback() (err error) {
}
a.head.putAppendBuffer(a.samples)
a.head.putExemplarBuffer(a.exemplars)
a.head.putCustomValuesBuffer(a.customValues)
a.head.putHistogramBuffer(a.histograms)
a.head.putFloatHistogramBuffer(a.floatHistograms)
a.head.putMetadataBuffer(a.metadata)

View file

@ -58,7 +58,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
var unknownExemplarRefs atomic.Uint64
var unknownHistogramRefs atomic.Uint64
var unknownMetadataRefs atomic.Uint64
var unknownCustomValuesRefs atomic.Uint64
// Track number of series records that had overlapping m-map chunks.
var mmapOverlappingChunks atomic.Uint64
@ -82,7 +82,6 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
histogramsPool zeropool.Pool[[]record.RefHistogramSample]
floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
metadataPool zeropool.Pool[[]record.RefMetadata]
customValuesPool zeropool.Pool[[]record.RefCustomValues]
)
defer func() {
@ -225,18 +224,6 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
return
}
decoded <- meta
case record.CustomValues:
customVals := customValuesPool.Get()[:0]
customVals, err := dec.CustomValues(rec, customVals)
if err != nil {
decodeErr = &wlog.CorruptionErr{
Err: fmt.Errorf("decode custom values: %w", err),
Segment: r.Segment(),
Offset: r.Offset(),
}
return
}
decoded <- customVals
default:
// Noop.
}
@ -428,29 +415,6 @@ Outer:
}
}
metadataPool.Put(v)
case []record.RefCustomValues:
for _, cv := range v {
s := h.series.getByID(cv.Ref)
if s == nil {
unknownCustomValuesRefs.Inc()
continue
}
//TODO: do we actually want to check lastFloatHistogramValue?
if s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
s.customValues = cv.CustomValues
}
}
customValuesPool.Put(v)
// iterate over custom value records and do same things as for series/samples - put them in correct processor
// processor depends on series ref
// something like this:
// idx := uint64(mSeries.ref) % uint64(concurrency)
// processors[idx].input <- walSubsetProcessorInputItem{customValues: }
//for _, cv := range v {
// idx := uint64(cv.Ref) % uint64(concurrency)
// processors[idx].input <- walSubsetProcessorInputItem{customValues: cv}
//}
//customValuesPool.Put(v)
default:
panic(fmt.Errorf("unexpected decoded type: %T", d))
}
@ -720,7 +684,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) {
// Track number of samples, histogram samples, m-map markers, that referenced a series we don't know about
// for error reporting.
var unknownRefs, unknownHistogramRefs, unknownCustomValuesRefs, mmapMarkerUnknownRefs atomic.Uint64
var unknownRefs, unknownHistogramRefs, mmapMarkerUnknownRefs atomic.Uint64
lastSeq, lastOff := lastMmapRef.Unpack()
// Start workers that each process samples for a partition of the series ID space.
@ -820,18 +784,6 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
return
}
decodedCh <- hists
case record.CustomValues:
customVals := customValuesPool.Get().([]record.RefCustomValues)[:0]
customVals, err := dec.CustomValues(rec, customVals)
if err != nil {
decodeErr = &wlog.CorruptionErr{
Err: fmt.Errorf("decode custom values: %w", err),
Segment: r.Segment(),
Offset: r.Offset(),
}
return
}
decodedCh <- customVals
default:
// Noop.
}
@ -916,18 +868,6 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
if r, ok := multiRef[sam.Ref]; ok {
sam.Ref = r
}
if histogram.IsCustomBucketsSchema(sam.H.Schema) {
ms := h.series.getByID(sam.Ref)
if ms == nil {
unknownHistogramRefs.Inc()
continue
}
if ms.lastFloatHistogramValue != nil {
sam.H.CustomValues = ms.lastFloatHistogramValue.CustomValues
} else {
sam.H.CustomValues = ms.customValues
}
}
mod := uint64(sam.Ref) % uint64(concurrency)
histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, h: sam.H})
}
@ -960,14 +900,6 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
if r, ok := multiRef[sam.Ref]; ok {
sam.Ref = r
}
if histogram.IsCustomBucketsSchema(sam.FH.Schema) {
ms := h.series.getByID(sam.Ref)
if ms == nil {
unknownHistogramRefs.Inc()
continue
}
sam.FH.CustomValues = ms.customValues
}
mod := uint64(sam.Ref) % uint64(concurrency)
histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, fh: sam.FH})
}

View file

@ -513,6 +513,18 @@ func DecodeHistogram(buf *encoding.Decbuf, h *histogram.Histogram) {
for i := range h.NegativeBuckets {
h.NegativeBuckets[i] = buf.Varint64()
}
if histogram.IsCustomBucketsSchema(h.Schema) {
l = buf.Uvarint()
if l > 0 {
if l > 0 {
h.CustomValues = make([]float64, l)
}
for i := range h.CustomValues {
h.CustomValues[i] = buf.Be64Float64()
}
}
}
}
func (d *Decoder) FloatHistogramSamples(rec []byte, histograms []RefFloatHistogramSample) ([]RefFloatHistogramSample, error) {
@ -595,39 +607,18 @@ func DecodeFloatHistogram(buf *encoding.Decbuf, fh *histogram.FloatHistogram) {
for i := range fh.NegativeBuckets {
fh.NegativeBuckets[i] = buf.Be64Float64()
}
}
// TODO: optimize
func (d *Decoder) CustomValues(rec []byte, customValues []RefCustomValues) ([]RefCustomValues, error) {
dec := encoding.Decbuf{B: rec}
if Type(dec.Byte()) != CustomValues {
return nil, errors.New("invalid record type")
}
if dec.Len() == 0 {
return customValues, nil
}
for len(dec.B) > 0 && dec.Err() == nil {
ref := storage.SeriesRef(dec.Be64())
l := dec.Uvarint()
if histogram.IsCustomBucketsSchema(fh.Schema) {
l = buf.Uvarint()
if l > 0 {
vals := make([]float64, l)
for i := range vals {
vals[i] = dec.Be64Float64()
if l > 0 {
fh.CustomValues = make([]float64, l)
}
for i := range fh.CustomValues {
fh.CustomValues[i] = buf.Be64Float64()
}
customValues = append(customValues, RefCustomValues{
Ref: chunks.HeadSeriesRef(ref),
CustomValues: vals,
})
}
}
if dec.Err() != nil {
return nil, dec.Err()
}
if len(dec.B) > 0 {
return nil, fmt.Errorf("unexpected %d bytes left in entry", len(dec.B))
}
return customValues, nil
}
// Encoder encodes series, sample, and tombstones records.
@ -813,6 +804,13 @@ func EncodeHistogram(buf *encoding.Encbuf, h *histogram.Histogram) {
for _, b := range h.NegativeBuckets {
buf.PutVarint64(b)
}
if histogram.IsCustomBucketsSchema(h.Schema) {
buf.PutUvarint(len(h.CustomValues))
for _, v := range h.CustomValues {
buf.PutBEFloat64(v)
}
}
}
func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) []byte {
@ -871,28 +869,11 @@ func EncodeFloatHistogram(buf *encoding.Encbuf, h *histogram.FloatHistogram) {
for _, b := range h.NegativeBuckets {
buf.PutBEFloat64(b)
}
}
func (e *Encoder) CustomValues(customValues []RefCustomValues, b []byte) []byte {
buf := encoding.Encbuf{B: b}
buf.PutByte(byte(CustomValues))
if len(customValues) == 0 {
return buf.Get()
}
for _, v := range customValues {
buf.PutBE64(uint64(v.Ref))
EncodeCustomValues(&buf, v.CustomValues)
}
return buf.Get()
}
// TODO: optimize
func EncodeCustomValues(buf *encoding.Encbuf, values []float64) {
buf.PutUvarint(len(values))
for _, v := range values {
buf.PutBEFloat64(v)
if histogram.IsCustomBucketsSchema(h.Schema) {
buf.PutUvarint(len(h.CustomValues))
for _, v := range h.CustomValues {
buf.PutBEFloat64(v)
}
}
}

View file

@ -181,22 +181,6 @@ func TestRecord_EncodeDecode(t *testing.T) {
decFloatHistograms, err = dec.FloatHistogramSamples(enc.FloatHistogramSamples(floatHistograms, nil), nil)
require.NoError(t, err)
require.Equal(t, floatHistograms, decFloatHistograms)
// Custom values for histograms
customValues := []RefCustomValues{
{
Ref: 56,
CustomValues: []float64{0, 1, 2, 3, 4},
},
{
Ref: 42,
CustomValues: []float64{5, 10, 15, 20, 25},
},
}
decCustomValues, err := dec.CustomValues(enc.CustomValues(customValues, nil), nil)
require.NoError(t, err)
require.Equal(t, customValues, decCustomValues)
}
// TestRecord_Corrupted ensures that corrupted records return the correct error.
@ -285,15 +269,6 @@ func TestRecord_Corrupted(t *testing.T) {
_, err := dec.HistogramSamples(corrupted, nil)
require.ErrorIs(t, err, encoding.ErrInvalidSize)
})
t.Run("Test corrupted customValues record", func(t *testing.T) {
customValues := []RefCustomValues{
{Ref: 56, CustomValues: []float64{0, 1, 2, 3, 4}},
}
corrupted := enc.CustomValues(customValues, nil)[:8]
_, err := dec.CustomValues(corrupted, nil)
require.ErrorIs(t, err, encoding.ErrInvalidSize)
})
}
func TestRecord_Type(t *testing.T) {
@ -337,10 +312,6 @@ func TestRecord_Type(t *testing.T) {
recordType = dec.Type(enc.HistogramSamples(histograms, nil))
require.Equal(t, HistogramSamples, recordType)
customValues := []RefCustomValues{{Ref: 56, CustomValues: []float64{0, 1, 2, 3, 4}}}
recordType = dec.Type(enc.CustomValues(customValues, nil))
require.Equal(t, CustomValues, recordType)
recordType = dec.Type(nil)
require.Equal(t, Unknown, recordType)

View file

@ -151,7 +151,6 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
samples []record.RefSample
histogramSamples []record.RefHistogramSample
floatHistogramSamples []record.RefFloatHistogramSample
customValues []record.RefCustomValues
tstones []tombstones.Stone
exemplars []record.RefExemplar
metadata []record.RefMetadata
@ -300,18 +299,6 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
}
stats.TotalMetadata += len(metadata)
stats.DroppedMetadata += len(metadata) - repl
case record.CustomValues:
customValues, err = dec.CustomValues(rec, customValues)
if err != nil {
return nil, fmt.Errorf("decode custom values: %w", err)
}
repl := customValues[:0]
for _, v := range customValues {
repl = append(repl, v)
}
if len(repl) > 0 {
buf = enc.CustomValues(repl, buf)
}
default:
// Unknown record type, probably from a future Prometheus version.
continue