diff --git a/tsdb/encoding/encoding.go b/tsdb/encoding/encoding.go index cc7d0990f6..c339a9a5bb 100644 --- a/tsdb/encoding/encoding.go +++ b/tsdb/encoding/encoding.go @@ -104,6 +104,14 @@ func (e *Encbuf) PutHashSum(h hash.Hash) { e.B = h.Sum(e.B) } +// IsWholeWhenMultiplied checks to see if the number when multiplied by 1000 can +// be converted into an integer without losing precision. +func IsWholeWhenMultiplied(in float64) bool { + i := uint(math.Round(in * 1000)) + out := float64(i) / 1000 + return in == out +} + // Decbuf provides safe methods to extract data from a byte slice. It does all // necessary bounds checking and advancing of the byte slice. // Several datums can be extracted without checking for errors. However, before using diff --git a/tsdb/head.go b/tsdb/head.go index c67c438e52..b7a358a6a6 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -87,6 +87,7 @@ type Head struct { logger *slog.Logger appendPool zeropool.Pool[[]record.RefSample] exemplarsPool zeropool.Pool[[]exemplarWithSeriesRef] + customValuesPool zeropool.Pool[[]record.RefCustomValues] histogramsPool zeropool.Pool[[]record.RefHistogramSample] floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample] metadataPool zeropool.Pool[[]record.RefMetadata] @@ -2134,6 +2135,7 @@ type memSeries struct { // We keep the last histogram value here (in addition to appending it to the chunk) so we can check for duplicates. lastHistogramValue *histogram.Histogram lastFloatHistogramValue *histogram.FloatHistogram + customValues []float64 // Current appender for the head chunk. Set when a new head chunk is cut. // It is nil only if headChunks is nil. E.g. if there was an appender that created a new series, but rolled back the commit diff --git a/tsdb/head_append.go b/tsdb/head_append.go index ea2a163f26..fbed0ee7eb 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -181,6 +181,7 @@ func (h *Head) appender() *headAppender { samples: h.getAppendBuffer(), sampleSeries: h.getSeriesBuffer(), exemplars: exemplarsBuf, + customValues: h.getCustomValuesBuffer(), histograms: h.getHistogramBuffer(), floatHistograms: h.getFloatHistogramBuffer(), metadata: h.getMetadataBuffer(), @@ -244,6 +245,18 @@ func (h *Head) putExemplarBuffer(b []exemplarWithSeriesRef) { h.exemplarsPool.Put(b[:0]) } +func (h *Head) getCustomValuesBuffer() []record.RefCustomValues { + b := h.customValuesPool.Get() + if b == nil { + return make([]record.RefCustomValues, 0, 512) + } + return b +} + +func (h *Head) putCustomValuesBuffer(b []record.RefCustomValues) { + h.customValuesPool.Put(b[:0]) +} + func (h *Head) getHistogramBuffer() []record.RefHistogramSample { b := h.histogramsPool.Get() if b == nil { @@ -326,6 +339,7 @@ type headAppender struct { histogramSeries []*memSeries // HistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). floatHistograms []record.RefFloatHistogramSample // New float histogram samples held by this appender. floatHistogramSeries []*memSeries // FloatHistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). + customValues []record.RefCustomValues // Custom values for histograms that use custom buckets held by this appender. metadata []record.RefMetadata // New metadata held by this appender. metadataSeries []*memSeries // Series corresponding to the metadata held by this appender. exemplars []exemplarWithSeriesRef // New exemplars held by this appender. @@ -690,7 +704,12 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels // This whole "if" should be removed. if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { s.lastHistogramValue = &histogram.Histogram{} - } + if histogram.IsCustomBucketsSchema(h.Schema) { + a.customValues = append(a.customValues, record.RefCustomValues{ + Ref: s.ref, + CustomValues: h.CustomValues, + }) + } // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // to skip that sample from the WAL and write only in the WBL. @@ -727,6 +746,12 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels // This whole "if" should be removed. if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { s.lastFloatHistogramValue = &histogram.FloatHistogram{} + if histogram.IsCustomBucketsSchema(fh.Schema) { + a.customValues = append(a.customValues, record.RefCustomValues{ + Ref: s.ref, + CustomValues: fh.CustomValues, + }) + } } // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise @@ -942,6 +967,13 @@ func (a *headAppender) log() error { return fmt.Errorf("log samples: %w", err) } } + if len(a.customValues) > 0 { + rec = enc.CustomValues(a.customValues, buf) + buf = rec[:0] + if err := a.head.wal.Log(rec); err != nil { + return fmt.Errorf("log custom values: %w", err) + } + } if len(a.histograms) > 0 { rec = enc.HistogramSamples(a.histograms, buf) buf = rec[:0] @@ -1428,6 +1460,7 @@ func (a *headAppender) Commit() (err error) { defer a.head.putAppendBuffer(a.samples) defer a.head.putSeriesBuffer(a.sampleSeries) defer a.head.putExemplarBuffer(a.exemplars) + defer a.head.putCustomValuesBuffer(a.customValues) defer a.head.putHistogramBuffer(a.histograms) defer a.head.putFloatHistogramBuffer(a.floatHistograms) defer a.head.putMetadataBuffer(a.metadata) @@ -1949,6 +1982,7 @@ func (a *headAppender) Rollback() (err error) { } a.head.putAppendBuffer(a.samples) a.head.putExemplarBuffer(a.exemplars) + a.head.putCustomValuesBuffer(a.customValues) a.head.putHistogramBuffer(a.histograms) a.head.putFloatHistogramBuffer(a.floatHistograms) a.head.putMetadataBuffer(a.metadata) diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 6744d582ae..637929428d 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -58,6 +58,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch var unknownExemplarRefs atomic.Uint64 var unknownHistogramRefs atomic.Uint64 var unknownMetadataRefs atomic.Uint64 + var unknownCustomValuesRefs atomic.Uint64 // Track number of series records that had overlapping m-map chunks. var mmapOverlappingChunks atomic.Uint64 @@ -81,6 +82,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch histogramsPool zeropool.Pool[[]record.RefHistogramSample] floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample] metadataPool zeropool.Pool[[]record.RefMetadata] + customValuesPool zeropool.Pool[[]record.RefCustomValues] ) defer func() { @@ -223,6 +225,18 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch return } decoded <- meta + case record.CustomValues: + customVals := customValuesPool.Get()[:0] + customVals, err := dec.CustomValues(rec, customVals) + if err != nil { + decodeErr = &wlog.CorruptionErr{ + Err: fmt.Errorf("decode custom values: %w", err), + Segment: r.Segment(), + Offset: r.Offset(), + } + return + } + decoded <- customVals default: // Noop. } @@ -331,6 +345,19 @@ Outer: if r, ok := multiRef[sam.Ref]; ok { sam.Ref = r } + if histogram.IsCustomBucketsSchema(sam.H.Schema) { + ms := h.series.getByID(sam.Ref) + if ms == nil { + unknownHistogramRefs.Inc() + continue + } + + if ms.lastFloatHistogramValue != nil { + sam.H.CustomValues = ms.lastFloatHistogramValue.CustomValues + } else { + sam.H.CustomValues = ms.customValues + } + } mod := uint64(sam.Ref) % uint64(concurrency) histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, h: sam.H}) } @@ -367,6 +394,14 @@ Outer: if r, ok := multiRef[sam.Ref]; ok { sam.Ref = r } + if histogram.IsCustomBucketsSchema(sam.FH.Schema) { + ms := h.series.getByID(sam.Ref) + if ms == nil { + unknownHistogramRefs.Inc() + continue + } + sam.FH.CustomValues = ms.customValues + } mod := uint64(sam.Ref) % uint64(concurrency) histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, fh: sam.FH}) } @@ -393,6 +428,29 @@ Outer: } } metadataPool.Put(v) + case []record.RefCustomValues: + for _, cv := range v { + s := h.series.getByID(cv.Ref) + if s == nil { + unknownCustomValuesRefs.Inc() + continue + } + //TODO: do we actually want to check lastFloatHistogramValue? + if s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { + s.customValues = cv.CustomValues + } + } + customValuesPool.Put(v) + // iterate over custom value records and do same things as for series/samples - put them in correct processor + // processor depends on series ref + // something like this: + // idx := uint64(mSeries.ref) % uint64(concurrency) + // processors[idx].input <- walSubsetProcessorInputItem{customValues: } + //for _, cv := range v { + // idx := uint64(cv.Ref) % uint64(concurrency) + // processors[idx].input <- walSubsetProcessorInputItem{customValues: cv} + //} + //customValuesPool.Put(v) default: panic(fmt.Errorf("unexpected decoded type: %T", d)) } @@ -616,10 +674,25 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp if s.t <= ms.mmMaxTime { continue } + var chunkCreated bool if s.h != nil { + //if histogram.IsCustomBucketsSchema(s.h.Schema) { + // if ms.lastHistogramValue != nil { + // + // } + //} _, chunkCreated = ms.appendHistogram(s.t, s.h, 0, appendChunkOpts) } else { + //if histogram.IsCustomBucketsSchema(s.fh.Schema) { + // if ms.lastFloatHistogramValue != nil { + // s.h.CustomValues = ms.lastFloatHistogramValue.CustomValues + // } else { + // s.h.CustomValues = ms.customValues + // } + // //customVals := h. + // //s.h.CustomValues = + //} _, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, appendChunkOpts) } if chunkCreated { @@ -647,7 +720,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) { // Track number of samples, histogram samples, m-map markers, that referenced a series we don't know about // for error reporting. - var unknownRefs, unknownHistogramRefs, mmapMarkerUnknownRefs atomic.Uint64 + var unknownRefs, unknownHistogramRefs, unknownCustomValuesRefs, mmapMarkerUnknownRefs atomic.Uint64 lastSeq, lastOff := lastMmapRef.Unpack() // Start workers that each process samples for a partition of the series ID space. @@ -747,6 +820,18 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch return } decodedCh <- hists + case record.CustomValues: + customVals := customValuesPool.Get().([]record.RefCustomValues)[:0] + customVals, err := dec.CustomValues(rec, customVals) + if err != nil { + decodeErr = &wlog.CorruptionErr{ + Err: fmt.Errorf("decode custom values: %w", err), + Segment: r.Segment(), + Offset: r.Offset(), + } + return + } + decodedCh <- customVals default: // Noop. } @@ -831,6 +916,18 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch if r, ok := multiRef[sam.Ref]; ok { sam.Ref = r } + if histogram.IsCustomBucketsSchema(sam.H.Schema) { + ms := h.series.getByID(sam.Ref) + if ms == nil { + unknownHistogramRefs.Inc() + continue + } + if ms.lastFloatHistogramValue != nil { + sam.H.CustomValues = ms.lastFloatHistogramValue.CustomValues + } else { + sam.H.CustomValues = ms.customValues + } + } mod := uint64(sam.Ref) % uint64(concurrency) histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, h: sam.H}) } @@ -863,6 +960,14 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch if r, ok := multiRef[sam.Ref]; ok { sam.Ref = r } + if histogram.IsCustomBucketsSchema(sam.FH.Schema) { + ms := h.series.getByID(sam.Ref) + if ms == nil { + unknownHistogramRefs.Inc() + continue + } + sam.FH.CustomValues = ms.customValues + } mod := uint64(sam.Ref) % uint64(concurrency) histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, fh: sam.FH}) } diff --git a/tsdb/record/record.go b/tsdb/record/record.go index 784d0b23d7..6f5fb24384 100644 --- a/tsdb/record/record.go +++ b/tsdb/record/record.go @@ -52,6 +52,7 @@ const ( HistogramSamples Type = 7 // FloatHistogramSamples is used to match WAL records of type Float Histograms. FloatHistogramSamples Type = 8 + CustomValues Type = 9 ) func (rt Type) String() string { @@ -72,6 +73,8 @@ func (rt Type) String() string { return "mmapmarkers" case Metadata: return "metadata" + case CustomValues: + return "custom_values" default: return "unknown" } @@ -147,6 +150,11 @@ type RefSeries struct { Labels labels.Labels } +type RefCustomValues struct { + Ref chunks.HeadSeriesRef + CustomValues []float64 +} + // RefSample is a timestamp/value pair associated with a reference to a series. // TODO(beorn7): Perhaps make this "polymorphic", including histogram and float-histogram pointers? Then get rid of RefHistogramSample. type RefSample struct { @@ -207,7 +215,7 @@ func (d *Decoder) Type(rec []byte) Type { return Unknown } switch t := Type(rec[0]); t { - case Series, Samples, Tombstones, Exemplars, MmapMarkers, Metadata, HistogramSamples, FloatHistogramSamples: + case Series, Samples, Tombstones, Exemplars, MmapMarkers, Metadata, HistogramSamples, FloatHistogramSamples, CustomValues: return t } return Unknown @@ -589,6 +597,39 @@ func DecodeFloatHistogram(buf *encoding.Decbuf, fh *histogram.FloatHistogram) { } } +// TODO: optimize +func (d *Decoder) CustomValues(rec []byte, customValues []RefCustomValues) ([]RefCustomValues, error) { + dec := encoding.Decbuf{B: rec} + + if Type(dec.Byte()) != CustomValues { + return nil, errors.New("invalid record type") + } + if dec.Len() == 0 { + return customValues, nil + } + for len(dec.B) > 0 && dec.Err() == nil { + ref := storage.SeriesRef(dec.Be64()) + l := dec.Uvarint() + if l > 0 { + vals := make([]float64, l) + for i := range vals { + vals[i] = dec.Be64Float64() + } + customValues = append(customValues, RefCustomValues{ + Ref: chunks.HeadSeriesRef(ref), + CustomValues: vals, + }) + } + } + if dec.Err() != nil { + return nil, dec.Err() + } + if len(dec.B) > 0 { + return nil, fmt.Errorf("unexpected %d bytes left in entry", len(dec.B)) + } + return customValues, nil +} + // Encoder encodes series, sample, and tombstones records. // The zero value is ready to use. type Encoder struct{} @@ -831,3 +872,27 @@ func EncodeFloatHistogram(buf *encoding.Encbuf, h *histogram.FloatHistogram) { buf.PutBEFloat64(b) } } + +func (e *Encoder) CustomValues(customValues []RefCustomValues, b []byte) []byte { + buf := encoding.Encbuf{B: b} + buf.PutByte(byte(CustomValues)) + + if len(customValues) == 0 { + return buf.Get() + } + + for _, v := range customValues { + buf.PutBE64(uint64(v.Ref)) + EncodeCustomValues(&buf, v.CustomValues) + } + + return buf.Get() +} + +// TODO: optimize +func EncodeCustomValues(buf *encoding.Encbuf, values []float64) { + buf.PutUvarint(len(values)) + for _, v := range values { + buf.PutBEFloat64(v) + } +} diff --git a/tsdb/record/record_test.go b/tsdb/record/record_test.go index f3a657aecb..c5f9f09f26 100644 --- a/tsdb/record/record_test.go +++ b/tsdb/record/record_test.go @@ -181,6 +181,22 @@ func TestRecord_EncodeDecode(t *testing.T) { decFloatHistograms, err = dec.FloatHistogramSamples(enc.FloatHistogramSamples(floatHistograms, nil), nil) require.NoError(t, err) require.Equal(t, floatHistograms, decFloatHistograms) + + // Custom values for histograms + customValues := []RefCustomValues{ + { + Ref: 56, + CustomValues: []float64{0, 1, 2, 3, 4}, + }, + { + Ref: 42, + CustomValues: []float64{5, 10, 15, 20, 25}, + }, + } + + decCustomValues, err := dec.CustomValues(enc.CustomValues(customValues, nil), nil) + require.NoError(t, err) + require.Equal(t, customValues, decCustomValues) } // TestRecord_Corrupted ensures that corrupted records return the correct error. @@ -269,6 +285,15 @@ func TestRecord_Corrupted(t *testing.T) { _, err := dec.HistogramSamples(corrupted, nil) require.ErrorIs(t, err, encoding.ErrInvalidSize) }) + + t.Run("Test corrupted customValues record", func(t *testing.T) { + customValues := []RefCustomValues{ + {Ref: 56, CustomValues: []float64{0, 1, 2, 3, 4}}, + } + corrupted := enc.CustomValues(customValues, nil)[:8] + _, err := dec.CustomValues(corrupted, nil) + require.ErrorIs(t, err, encoding.ErrInvalidSize) + }) } func TestRecord_Type(t *testing.T) { @@ -312,6 +337,10 @@ func TestRecord_Type(t *testing.T) { recordType = dec.Type(enc.HistogramSamples(histograms, nil)) require.Equal(t, HistogramSamples, recordType) + customValues := []RefCustomValues{{Ref: 56, CustomValues: []float64{0, 1, 2, 3, 4}}} + recordType = dec.Type(enc.CustomValues(customValues, nil)) + require.Equal(t, CustomValues, recordType) + recordType = dec.Type(nil) require.Equal(t, Unknown, recordType) diff --git a/tsdb/wlog/checkpoint.go b/tsdb/wlog/checkpoint.go index 58e11c770e..a96d142495 100644 --- a/tsdb/wlog/checkpoint.go +++ b/tsdb/wlog/checkpoint.go @@ -151,6 +151,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He samples []record.RefSample histogramSamples []record.RefHistogramSample floatHistogramSamples []record.RefFloatHistogramSample + customValues []record.RefCustomValues tstones []tombstones.Stone exemplars []record.RefExemplar metadata []record.RefMetadata @@ -299,6 +300,18 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He } stats.TotalMetadata += len(metadata) stats.DroppedMetadata += len(metadata) - repl + case record.CustomValues: + customValues, err = dec.CustomValues(rec, customValues) + if err != nil { + return nil, fmt.Errorf("decode custom values: %w", err) + } + repl := customValues[:0] + for _, v := range customValues { + repl = append(repl, v) + } + if len(repl) > 0 { + buf = enc.CustomValues(repl, buf) + } default: // Unknown record type, probably from a future Prometheus version. continue