mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 22:07:27 -08:00
Handle custom buckets in WAL and WBL
This commit is contained in:
parent
af2a1cb10c
commit
aa144b7263
|
@ -104,6 +104,14 @@ func (e *Encbuf) PutHashSum(h hash.Hash) {
|
||||||
e.B = h.Sum(e.B)
|
e.B = h.Sum(e.B)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsWholeWhenMultiplied checks to see if the number when multiplied by 1000 can
|
||||||
|
// be converted into an integer without losing precision.
|
||||||
|
func IsWholeWhenMultiplied(in float64) bool {
|
||||||
|
i := uint(math.Round(in * 1000))
|
||||||
|
out := float64(i) / 1000
|
||||||
|
return in == out
|
||||||
|
}
|
||||||
|
|
||||||
// Decbuf provides safe methods to extract data from a byte slice. It does all
|
// Decbuf provides safe methods to extract data from a byte slice. It does all
|
||||||
// necessary bounds checking and advancing of the byte slice.
|
// necessary bounds checking and advancing of the byte slice.
|
||||||
// Several datums can be extracted without checking for errors. However, before using
|
// Several datums can be extracted without checking for errors. However, before using
|
||||||
|
|
|
@ -87,6 +87,7 @@ type Head struct {
|
||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
appendPool zeropool.Pool[[]record.RefSample]
|
appendPool zeropool.Pool[[]record.RefSample]
|
||||||
exemplarsPool zeropool.Pool[[]exemplarWithSeriesRef]
|
exemplarsPool zeropool.Pool[[]exemplarWithSeriesRef]
|
||||||
|
customValuesPool zeropool.Pool[[]record.RefCustomValues]
|
||||||
histogramsPool zeropool.Pool[[]record.RefHistogramSample]
|
histogramsPool zeropool.Pool[[]record.RefHistogramSample]
|
||||||
floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
|
floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
|
||||||
metadataPool zeropool.Pool[[]record.RefMetadata]
|
metadataPool zeropool.Pool[[]record.RefMetadata]
|
||||||
|
@ -2134,6 +2135,7 @@ type memSeries struct {
|
||||||
// We keep the last histogram value here (in addition to appending it to the chunk) so we can check for duplicates.
|
// We keep the last histogram value here (in addition to appending it to the chunk) so we can check for duplicates.
|
||||||
lastHistogramValue *histogram.Histogram
|
lastHistogramValue *histogram.Histogram
|
||||||
lastFloatHistogramValue *histogram.FloatHistogram
|
lastFloatHistogramValue *histogram.FloatHistogram
|
||||||
|
customValues []float64
|
||||||
|
|
||||||
// Current appender for the head chunk. Set when a new head chunk is cut.
|
// Current appender for the head chunk. Set when a new head chunk is cut.
|
||||||
// It is nil only if headChunks is nil. E.g. if there was an appender that created a new series, but rolled back the commit
|
// It is nil only if headChunks is nil. E.g. if there was an appender that created a new series, but rolled back the commit
|
||||||
|
|
|
@ -181,6 +181,7 @@ func (h *Head) appender() *headAppender {
|
||||||
samples: h.getAppendBuffer(),
|
samples: h.getAppendBuffer(),
|
||||||
sampleSeries: h.getSeriesBuffer(),
|
sampleSeries: h.getSeriesBuffer(),
|
||||||
exemplars: exemplarsBuf,
|
exemplars: exemplarsBuf,
|
||||||
|
customValues: h.getCustomValuesBuffer(),
|
||||||
histograms: h.getHistogramBuffer(),
|
histograms: h.getHistogramBuffer(),
|
||||||
floatHistograms: h.getFloatHistogramBuffer(),
|
floatHistograms: h.getFloatHistogramBuffer(),
|
||||||
metadata: h.getMetadataBuffer(),
|
metadata: h.getMetadataBuffer(),
|
||||||
|
@ -244,6 +245,18 @@ func (h *Head) putExemplarBuffer(b []exemplarWithSeriesRef) {
|
||||||
h.exemplarsPool.Put(b[:0])
|
h.exemplarsPool.Put(b[:0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *Head) getCustomValuesBuffer() []record.RefCustomValues {
|
||||||
|
b := h.customValuesPool.Get()
|
||||||
|
if b == nil {
|
||||||
|
return make([]record.RefCustomValues, 0, 512)
|
||||||
|
}
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Head) putCustomValuesBuffer(b []record.RefCustomValues) {
|
||||||
|
h.customValuesPool.Put(b[:0])
|
||||||
|
}
|
||||||
|
|
||||||
func (h *Head) getHistogramBuffer() []record.RefHistogramSample {
|
func (h *Head) getHistogramBuffer() []record.RefHistogramSample {
|
||||||
b := h.histogramsPool.Get()
|
b := h.histogramsPool.Get()
|
||||||
if b == nil {
|
if b == nil {
|
||||||
|
@ -326,6 +339,7 @@ type headAppender struct {
|
||||||
histogramSeries []*memSeries // HistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
|
histogramSeries []*memSeries // HistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
|
||||||
floatHistograms []record.RefFloatHistogramSample // New float histogram samples held by this appender.
|
floatHistograms []record.RefFloatHistogramSample // New float histogram samples held by this appender.
|
||||||
floatHistogramSeries []*memSeries // FloatHistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
|
floatHistogramSeries []*memSeries // FloatHistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
|
||||||
|
customValues []record.RefCustomValues // Custom values for histograms that use custom buckets held by this appender.
|
||||||
metadata []record.RefMetadata // New metadata held by this appender.
|
metadata []record.RefMetadata // New metadata held by this appender.
|
||||||
metadataSeries []*memSeries // Series corresponding to the metadata held by this appender.
|
metadataSeries []*memSeries // Series corresponding to the metadata held by this appender.
|
||||||
exemplars []exemplarWithSeriesRef // New exemplars held by this appender.
|
exemplars []exemplarWithSeriesRef // New exemplars held by this appender.
|
||||||
|
@ -690,7 +704,12 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
|
||||||
// This whole "if" should be removed.
|
// This whole "if" should be removed.
|
||||||
if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
|
if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
|
||||||
s.lastHistogramValue = &histogram.Histogram{}
|
s.lastHistogramValue = &histogram.Histogram{}
|
||||||
}
|
if histogram.IsCustomBucketsSchema(h.Schema) {
|
||||||
|
a.customValues = append(a.customValues, record.RefCustomValues{
|
||||||
|
Ref: s.ref,
|
||||||
|
CustomValues: h.CustomValues,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
|
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
|
||||||
// to skip that sample from the WAL and write only in the WBL.
|
// to skip that sample from the WAL and write only in the WBL.
|
||||||
|
@ -727,6 +746,12 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
|
||||||
// This whole "if" should be removed.
|
// This whole "if" should be removed.
|
||||||
if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
|
if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
|
||||||
s.lastFloatHistogramValue = &histogram.FloatHistogram{}
|
s.lastFloatHistogramValue = &histogram.FloatHistogram{}
|
||||||
|
if histogram.IsCustomBucketsSchema(fh.Schema) {
|
||||||
|
a.customValues = append(a.customValues, record.RefCustomValues{
|
||||||
|
Ref: s.ref,
|
||||||
|
CustomValues: fh.CustomValues,
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
|
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
|
||||||
|
@ -942,6 +967,13 @@ func (a *headAppender) log() error {
|
||||||
return fmt.Errorf("log samples: %w", err)
|
return fmt.Errorf("log samples: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if len(a.customValues) > 0 {
|
||||||
|
rec = enc.CustomValues(a.customValues, buf)
|
||||||
|
buf = rec[:0]
|
||||||
|
if err := a.head.wal.Log(rec); err != nil {
|
||||||
|
return fmt.Errorf("log custom values: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
if len(a.histograms) > 0 {
|
if len(a.histograms) > 0 {
|
||||||
rec = enc.HistogramSamples(a.histograms, buf)
|
rec = enc.HistogramSamples(a.histograms, buf)
|
||||||
buf = rec[:0]
|
buf = rec[:0]
|
||||||
|
@ -1428,6 +1460,7 @@ func (a *headAppender) Commit() (err error) {
|
||||||
defer a.head.putAppendBuffer(a.samples)
|
defer a.head.putAppendBuffer(a.samples)
|
||||||
defer a.head.putSeriesBuffer(a.sampleSeries)
|
defer a.head.putSeriesBuffer(a.sampleSeries)
|
||||||
defer a.head.putExemplarBuffer(a.exemplars)
|
defer a.head.putExemplarBuffer(a.exemplars)
|
||||||
|
defer a.head.putCustomValuesBuffer(a.customValues)
|
||||||
defer a.head.putHistogramBuffer(a.histograms)
|
defer a.head.putHistogramBuffer(a.histograms)
|
||||||
defer a.head.putFloatHistogramBuffer(a.floatHistograms)
|
defer a.head.putFloatHistogramBuffer(a.floatHistograms)
|
||||||
defer a.head.putMetadataBuffer(a.metadata)
|
defer a.head.putMetadataBuffer(a.metadata)
|
||||||
|
@ -1949,6 +1982,7 @@ func (a *headAppender) Rollback() (err error) {
|
||||||
}
|
}
|
||||||
a.head.putAppendBuffer(a.samples)
|
a.head.putAppendBuffer(a.samples)
|
||||||
a.head.putExemplarBuffer(a.exemplars)
|
a.head.putExemplarBuffer(a.exemplars)
|
||||||
|
a.head.putCustomValuesBuffer(a.customValues)
|
||||||
a.head.putHistogramBuffer(a.histograms)
|
a.head.putHistogramBuffer(a.histograms)
|
||||||
a.head.putFloatHistogramBuffer(a.floatHistograms)
|
a.head.putFloatHistogramBuffer(a.floatHistograms)
|
||||||
a.head.putMetadataBuffer(a.metadata)
|
a.head.putMetadataBuffer(a.metadata)
|
||||||
|
|
107
tsdb/head_wal.go
107
tsdb/head_wal.go
|
@ -58,6 +58,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
|
||||||
var unknownExemplarRefs atomic.Uint64
|
var unknownExemplarRefs atomic.Uint64
|
||||||
var unknownHistogramRefs atomic.Uint64
|
var unknownHistogramRefs atomic.Uint64
|
||||||
var unknownMetadataRefs atomic.Uint64
|
var unknownMetadataRefs atomic.Uint64
|
||||||
|
var unknownCustomValuesRefs atomic.Uint64
|
||||||
// Track number of series records that had overlapping m-map chunks.
|
// Track number of series records that had overlapping m-map chunks.
|
||||||
var mmapOverlappingChunks atomic.Uint64
|
var mmapOverlappingChunks atomic.Uint64
|
||||||
|
|
||||||
|
@ -81,6 +82,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
|
||||||
histogramsPool zeropool.Pool[[]record.RefHistogramSample]
|
histogramsPool zeropool.Pool[[]record.RefHistogramSample]
|
||||||
floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
|
floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
|
||||||
metadataPool zeropool.Pool[[]record.RefMetadata]
|
metadataPool zeropool.Pool[[]record.RefMetadata]
|
||||||
|
customValuesPool zeropool.Pool[[]record.RefCustomValues]
|
||||||
)
|
)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -223,6 +225,18 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
decoded <- meta
|
decoded <- meta
|
||||||
|
case record.CustomValues:
|
||||||
|
customVals := customValuesPool.Get()[:0]
|
||||||
|
customVals, err := dec.CustomValues(rec, customVals)
|
||||||
|
if err != nil {
|
||||||
|
decodeErr = &wlog.CorruptionErr{
|
||||||
|
Err: fmt.Errorf("decode custom values: %w", err),
|
||||||
|
Segment: r.Segment(),
|
||||||
|
Offset: r.Offset(),
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
decoded <- customVals
|
||||||
default:
|
default:
|
||||||
// Noop.
|
// Noop.
|
||||||
}
|
}
|
||||||
|
@ -331,6 +345,19 @@ Outer:
|
||||||
if r, ok := multiRef[sam.Ref]; ok {
|
if r, ok := multiRef[sam.Ref]; ok {
|
||||||
sam.Ref = r
|
sam.Ref = r
|
||||||
}
|
}
|
||||||
|
if histogram.IsCustomBucketsSchema(sam.H.Schema) {
|
||||||
|
ms := h.series.getByID(sam.Ref)
|
||||||
|
if ms == nil {
|
||||||
|
unknownHistogramRefs.Inc()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if ms.lastFloatHistogramValue != nil {
|
||||||
|
sam.H.CustomValues = ms.lastFloatHistogramValue.CustomValues
|
||||||
|
} else {
|
||||||
|
sam.H.CustomValues = ms.customValues
|
||||||
|
}
|
||||||
|
}
|
||||||
mod := uint64(sam.Ref) % uint64(concurrency)
|
mod := uint64(sam.Ref) % uint64(concurrency)
|
||||||
histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, h: sam.H})
|
histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, h: sam.H})
|
||||||
}
|
}
|
||||||
|
@ -367,6 +394,14 @@ Outer:
|
||||||
if r, ok := multiRef[sam.Ref]; ok {
|
if r, ok := multiRef[sam.Ref]; ok {
|
||||||
sam.Ref = r
|
sam.Ref = r
|
||||||
}
|
}
|
||||||
|
if histogram.IsCustomBucketsSchema(sam.FH.Schema) {
|
||||||
|
ms := h.series.getByID(sam.Ref)
|
||||||
|
if ms == nil {
|
||||||
|
unknownHistogramRefs.Inc()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
sam.FH.CustomValues = ms.customValues
|
||||||
|
}
|
||||||
mod := uint64(sam.Ref) % uint64(concurrency)
|
mod := uint64(sam.Ref) % uint64(concurrency)
|
||||||
histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, fh: sam.FH})
|
histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, fh: sam.FH})
|
||||||
}
|
}
|
||||||
|
@ -393,6 +428,29 @@ Outer:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
metadataPool.Put(v)
|
metadataPool.Put(v)
|
||||||
|
case []record.RefCustomValues:
|
||||||
|
for _, cv := range v {
|
||||||
|
s := h.series.getByID(cv.Ref)
|
||||||
|
if s == nil {
|
||||||
|
unknownCustomValuesRefs.Inc()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
//TODO: do we actually want to check lastFloatHistogramValue?
|
||||||
|
if s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
|
||||||
|
s.customValues = cv.CustomValues
|
||||||
|
}
|
||||||
|
}
|
||||||
|
customValuesPool.Put(v)
|
||||||
|
// iterate over custom value records and do same things as for series/samples - put them in correct processor
|
||||||
|
// processor depends on series ref
|
||||||
|
// something like this:
|
||||||
|
// idx := uint64(mSeries.ref) % uint64(concurrency)
|
||||||
|
// processors[idx].input <- walSubsetProcessorInputItem{customValues: }
|
||||||
|
//for _, cv := range v {
|
||||||
|
// idx := uint64(cv.Ref) % uint64(concurrency)
|
||||||
|
// processors[idx].input <- walSubsetProcessorInputItem{customValues: cv}
|
||||||
|
//}
|
||||||
|
//customValuesPool.Put(v)
|
||||||
default:
|
default:
|
||||||
panic(fmt.Errorf("unexpected decoded type: %T", d))
|
panic(fmt.Errorf("unexpected decoded type: %T", d))
|
||||||
}
|
}
|
||||||
|
@ -616,10 +674,25 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
|
||||||
if s.t <= ms.mmMaxTime {
|
if s.t <= ms.mmMaxTime {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
var chunkCreated bool
|
var chunkCreated bool
|
||||||
if s.h != nil {
|
if s.h != nil {
|
||||||
|
//if histogram.IsCustomBucketsSchema(s.h.Schema) {
|
||||||
|
// if ms.lastHistogramValue != nil {
|
||||||
|
//
|
||||||
|
// }
|
||||||
|
//}
|
||||||
_, chunkCreated = ms.appendHistogram(s.t, s.h, 0, appendChunkOpts)
|
_, chunkCreated = ms.appendHistogram(s.t, s.h, 0, appendChunkOpts)
|
||||||
} else {
|
} else {
|
||||||
|
//if histogram.IsCustomBucketsSchema(s.fh.Schema) {
|
||||||
|
// if ms.lastFloatHistogramValue != nil {
|
||||||
|
// s.h.CustomValues = ms.lastFloatHistogramValue.CustomValues
|
||||||
|
// } else {
|
||||||
|
// s.h.CustomValues = ms.customValues
|
||||||
|
// }
|
||||||
|
// //customVals := h.
|
||||||
|
// //s.h.CustomValues =
|
||||||
|
//}
|
||||||
_, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, appendChunkOpts)
|
_, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, appendChunkOpts)
|
||||||
}
|
}
|
||||||
if chunkCreated {
|
if chunkCreated {
|
||||||
|
@ -647,7 +720,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
|
||||||
func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) {
|
func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) {
|
||||||
// Track number of samples, histogram samples, m-map markers, that referenced a series we don't know about
|
// Track number of samples, histogram samples, m-map markers, that referenced a series we don't know about
|
||||||
// for error reporting.
|
// for error reporting.
|
||||||
var unknownRefs, unknownHistogramRefs, mmapMarkerUnknownRefs atomic.Uint64
|
var unknownRefs, unknownHistogramRefs, unknownCustomValuesRefs, mmapMarkerUnknownRefs atomic.Uint64
|
||||||
|
|
||||||
lastSeq, lastOff := lastMmapRef.Unpack()
|
lastSeq, lastOff := lastMmapRef.Unpack()
|
||||||
// Start workers that each process samples for a partition of the series ID space.
|
// Start workers that each process samples for a partition of the series ID space.
|
||||||
|
@ -747,6 +820,18 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
decodedCh <- hists
|
decodedCh <- hists
|
||||||
|
case record.CustomValues:
|
||||||
|
customVals := customValuesPool.Get().([]record.RefCustomValues)[:0]
|
||||||
|
customVals, err := dec.CustomValues(rec, customVals)
|
||||||
|
if err != nil {
|
||||||
|
decodeErr = &wlog.CorruptionErr{
|
||||||
|
Err: fmt.Errorf("decode custom values: %w", err),
|
||||||
|
Segment: r.Segment(),
|
||||||
|
Offset: r.Offset(),
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
decodedCh <- customVals
|
||||||
default:
|
default:
|
||||||
// Noop.
|
// Noop.
|
||||||
}
|
}
|
||||||
|
@ -831,6 +916,18 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
|
||||||
if r, ok := multiRef[sam.Ref]; ok {
|
if r, ok := multiRef[sam.Ref]; ok {
|
||||||
sam.Ref = r
|
sam.Ref = r
|
||||||
}
|
}
|
||||||
|
if histogram.IsCustomBucketsSchema(sam.H.Schema) {
|
||||||
|
ms := h.series.getByID(sam.Ref)
|
||||||
|
if ms == nil {
|
||||||
|
unknownHistogramRefs.Inc()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if ms.lastFloatHistogramValue != nil {
|
||||||
|
sam.H.CustomValues = ms.lastFloatHistogramValue.CustomValues
|
||||||
|
} else {
|
||||||
|
sam.H.CustomValues = ms.customValues
|
||||||
|
}
|
||||||
|
}
|
||||||
mod := uint64(sam.Ref) % uint64(concurrency)
|
mod := uint64(sam.Ref) % uint64(concurrency)
|
||||||
histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, h: sam.H})
|
histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, h: sam.H})
|
||||||
}
|
}
|
||||||
|
@ -863,6 +960,14 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
|
||||||
if r, ok := multiRef[sam.Ref]; ok {
|
if r, ok := multiRef[sam.Ref]; ok {
|
||||||
sam.Ref = r
|
sam.Ref = r
|
||||||
}
|
}
|
||||||
|
if histogram.IsCustomBucketsSchema(sam.FH.Schema) {
|
||||||
|
ms := h.series.getByID(sam.Ref)
|
||||||
|
if ms == nil {
|
||||||
|
unknownHistogramRefs.Inc()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
sam.FH.CustomValues = ms.customValues
|
||||||
|
}
|
||||||
mod := uint64(sam.Ref) % uint64(concurrency)
|
mod := uint64(sam.Ref) % uint64(concurrency)
|
||||||
histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, fh: sam.FH})
|
histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, fh: sam.FH})
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,6 +52,7 @@ const (
|
||||||
HistogramSamples Type = 7
|
HistogramSamples Type = 7
|
||||||
// FloatHistogramSamples is used to match WAL records of type Float Histograms.
|
// FloatHistogramSamples is used to match WAL records of type Float Histograms.
|
||||||
FloatHistogramSamples Type = 8
|
FloatHistogramSamples Type = 8
|
||||||
|
CustomValues Type = 9
|
||||||
)
|
)
|
||||||
|
|
||||||
func (rt Type) String() string {
|
func (rt Type) String() string {
|
||||||
|
@ -72,6 +73,8 @@ func (rt Type) String() string {
|
||||||
return "mmapmarkers"
|
return "mmapmarkers"
|
||||||
case Metadata:
|
case Metadata:
|
||||||
return "metadata"
|
return "metadata"
|
||||||
|
case CustomValues:
|
||||||
|
return "custom_values"
|
||||||
default:
|
default:
|
||||||
return "unknown"
|
return "unknown"
|
||||||
}
|
}
|
||||||
|
@ -147,6 +150,11 @@ type RefSeries struct {
|
||||||
Labels labels.Labels
|
Labels labels.Labels
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type RefCustomValues struct {
|
||||||
|
Ref chunks.HeadSeriesRef
|
||||||
|
CustomValues []float64
|
||||||
|
}
|
||||||
|
|
||||||
// RefSample is a timestamp/value pair associated with a reference to a series.
|
// RefSample is a timestamp/value pair associated with a reference to a series.
|
||||||
// TODO(beorn7): Perhaps make this "polymorphic", including histogram and float-histogram pointers? Then get rid of RefHistogramSample.
|
// TODO(beorn7): Perhaps make this "polymorphic", including histogram and float-histogram pointers? Then get rid of RefHistogramSample.
|
||||||
type RefSample struct {
|
type RefSample struct {
|
||||||
|
@ -207,7 +215,7 @@ func (d *Decoder) Type(rec []byte) Type {
|
||||||
return Unknown
|
return Unknown
|
||||||
}
|
}
|
||||||
switch t := Type(rec[0]); t {
|
switch t := Type(rec[0]); t {
|
||||||
case Series, Samples, Tombstones, Exemplars, MmapMarkers, Metadata, HistogramSamples, FloatHistogramSamples:
|
case Series, Samples, Tombstones, Exemplars, MmapMarkers, Metadata, HistogramSamples, FloatHistogramSamples, CustomValues:
|
||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
return Unknown
|
return Unknown
|
||||||
|
@ -589,6 +597,39 @@ func DecodeFloatHistogram(buf *encoding.Decbuf, fh *histogram.FloatHistogram) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: optimize
|
||||||
|
func (d *Decoder) CustomValues(rec []byte, customValues []RefCustomValues) ([]RefCustomValues, error) {
|
||||||
|
dec := encoding.Decbuf{B: rec}
|
||||||
|
|
||||||
|
if Type(dec.Byte()) != CustomValues {
|
||||||
|
return nil, errors.New("invalid record type")
|
||||||
|
}
|
||||||
|
if dec.Len() == 0 {
|
||||||
|
return customValues, nil
|
||||||
|
}
|
||||||
|
for len(dec.B) > 0 && dec.Err() == nil {
|
||||||
|
ref := storage.SeriesRef(dec.Be64())
|
||||||
|
l := dec.Uvarint()
|
||||||
|
if l > 0 {
|
||||||
|
vals := make([]float64, l)
|
||||||
|
for i := range vals {
|
||||||
|
vals[i] = dec.Be64Float64()
|
||||||
|
}
|
||||||
|
customValues = append(customValues, RefCustomValues{
|
||||||
|
Ref: chunks.HeadSeriesRef(ref),
|
||||||
|
CustomValues: vals,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if dec.Err() != nil {
|
||||||
|
return nil, dec.Err()
|
||||||
|
}
|
||||||
|
if len(dec.B) > 0 {
|
||||||
|
return nil, fmt.Errorf("unexpected %d bytes left in entry", len(dec.B))
|
||||||
|
}
|
||||||
|
return customValues, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Encoder encodes series, sample, and tombstones records.
|
// Encoder encodes series, sample, and tombstones records.
|
||||||
// The zero value is ready to use.
|
// The zero value is ready to use.
|
||||||
type Encoder struct{}
|
type Encoder struct{}
|
||||||
|
@ -831,3 +872,27 @@ func EncodeFloatHistogram(buf *encoding.Encbuf, h *histogram.FloatHistogram) {
|
||||||
buf.PutBEFloat64(b)
|
buf.PutBEFloat64(b)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *Encoder) CustomValues(customValues []RefCustomValues, b []byte) []byte {
|
||||||
|
buf := encoding.Encbuf{B: b}
|
||||||
|
buf.PutByte(byte(CustomValues))
|
||||||
|
|
||||||
|
if len(customValues) == 0 {
|
||||||
|
return buf.Get()
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, v := range customValues {
|
||||||
|
buf.PutBE64(uint64(v.Ref))
|
||||||
|
EncodeCustomValues(&buf, v.CustomValues)
|
||||||
|
}
|
||||||
|
|
||||||
|
return buf.Get()
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: optimize
|
||||||
|
func EncodeCustomValues(buf *encoding.Encbuf, values []float64) {
|
||||||
|
buf.PutUvarint(len(values))
|
||||||
|
for _, v := range values {
|
||||||
|
buf.PutBEFloat64(v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -181,6 +181,22 @@ func TestRecord_EncodeDecode(t *testing.T) {
|
||||||
decFloatHistograms, err = dec.FloatHistogramSamples(enc.FloatHistogramSamples(floatHistograms, nil), nil)
|
decFloatHistograms, err = dec.FloatHistogramSamples(enc.FloatHistogramSamples(floatHistograms, nil), nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, floatHistograms, decFloatHistograms)
|
require.Equal(t, floatHistograms, decFloatHistograms)
|
||||||
|
|
||||||
|
// Custom values for histograms
|
||||||
|
customValues := []RefCustomValues{
|
||||||
|
{
|
||||||
|
Ref: 56,
|
||||||
|
CustomValues: []float64{0, 1, 2, 3, 4},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Ref: 42,
|
||||||
|
CustomValues: []float64{5, 10, 15, 20, 25},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
decCustomValues, err := dec.CustomValues(enc.CustomValues(customValues, nil), nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, customValues, decCustomValues)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestRecord_Corrupted ensures that corrupted records return the correct error.
|
// TestRecord_Corrupted ensures that corrupted records return the correct error.
|
||||||
|
@ -269,6 +285,15 @@ func TestRecord_Corrupted(t *testing.T) {
|
||||||
_, err := dec.HistogramSamples(corrupted, nil)
|
_, err := dec.HistogramSamples(corrupted, nil)
|
||||||
require.ErrorIs(t, err, encoding.ErrInvalidSize)
|
require.ErrorIs(t, err, encoding.ErrInvalidSize)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("Test corrupted customValues record", func(t *testing.T) {
|
||||||
|
customValues := []RefCustomValues{
|
||||||
|
{Ref: 56, CustomValues: []float64{0, 1, 2, 3, 4}},
|
||||||
|
}
|
||||||
|
corrupted := enc.CustomValues(customValues, nil)[:8]
|
||||||
|
_, err := dec.CustomValues(corrupted, nil)
|
||||||
|
require.ErrorIs(t, err, encoding.ErrInvalidSize)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRecord_Type(t *testing.T) {
|
func TestRecord_Type(t *testing.T) {
|
||||||
|
@ -312,6 +337,10 @@ func TestRecord_Type(t *testing.T) {
|
||||||
recordType = dec.Type(enc.HistogramSamples(histograms, nil))
|
recordType = dec.Type(enc.HistogramSamples(histograms, nil))
|
||||||
require.Equal(t, HistogramSamples, recordType)
|
require.Equal(t, HistogramSamples, recordType)
|
||||||
|
|
||||||
|
customValues := []RefCustomValues{{Ref: 56, CustomValues: []float64{0, 1, 2, 3, 4}}}
|
||||||
|
recordType = dec.Type(enc.CustomValues(customValues, nil))
|
||||||
|
require.Equal(t, CustomValues, recordType)
|
||||||
|
|
||||||
recordType = dec.Type(nil)
|
recordType = dec.Type(nil)
|
||||||
require.Equal(t, Unknown, recordType)
|
require.Equal(t, Unknown, recordType)
|
||||||
|
|
||||||
|
|
|
@ -151,6 +151,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
|
||||||
samples []record.RefSample
|
samples []record.RefSample
|
||||||
histogramSamples []record.RefHistogramSample
|
histogramSamples []record.RefHistogramSample
|
||||||
floatHistogramSamples []record.RefFloatHistogramSample
|
floatHistogramSamples []record.RefFloatHistogramSample
|
||||||
|
customValues []record.RefCustomValues
|
||||||
tstones []tombstones.Stone
|
tstones []tombstones.Stone
|
||||||
exemplars []record.RefExemplar
|
exemplars []record.RefExemplar
|
||||||
metadata []record.RefMetadata
|
metadata []record.RefMetadata
|
||||||
|
@ -299,6 +300,18 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
|
||||||
}
|
}
|
||||||
stats.TotalMetadata += len(metadata)
|
stats.TotalMetadata += len(metadata)
|
||||||
stats.DroppedMetadata += len(metadata) - repl
|
stats.DroppedMetadata += len(metadata) - repl
|
||||||
|
case record.CustomValues:
|
||||||
|
customValues, err = dec.CustomValues(rec, customValues)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("decode custom values: %w", err)
|
||||||
|
}
|
||||||
|
repl := customValues[:0]
|
||||||
|
for _, v := range customValues {
|
||||||
|
repl = append(repl, v)
|
||||||
|
}
|
||||||
|
if len(repl) > 0 {
|
||||||
|
buf = enc.CustomValues(repl, buf)
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
// Unknown record type, probably from a future Prometheus version.
|
// Unknown record type, probably from a future Prometheus version.
|
||||||
continue
|
continue
|
||||||
|
|
Loading…
Reference in a new issue