prometheus/prometheus (https://github.com/prometheus/prometheus.git)

Move to in-histogram WAL record for the first iteration

Rationales:
Signed-off-by: bwplotka <bwplotka@gmail.com>

parent 1e85911492
commit aebe4ae446
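
The diff below drops the standalone record.CustomValues plumbing (the separate WAL record type, its encoder/decoder, and the buffer pools) and instead serializes the custom bucket boundaries inline with every encoded histogram sample: EncodeHistogram/EncodeFloatHistogram now append a uvarint count followed by big-endian float64 values, and DecodeHistogram/DecodeFloatHistogram read them back. A minimal, self-contained sketch of that inline layout, using only the standard library (the helper names here are illustrative, not the actual Prometheus internals):

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// encodeCustomValues appends a uvarint count followed by big-endian float64
// bucket boundaries, mirroring what EncodeHistogram does for h.CustomValues
// in the diff below.
func encodeCustomValues(buf []byte, values []float64) []byte {
	buf = binary.AppendUvarint(buf, uint64(len(values)))
	for _, v := range values {
		buf = binary.BigEndian.AppendUint64(buf, math.Float64bits(v))
	}
	return buf
}

// decodeCustomValues reads the same layout back and returns the remaining bytes.
func decodeCustomValues(buf []byte) ([]float64, []byte, error) {
	l, n := binary.Uvarint(buf)
	if n <= 0 {
		return nil, nil, fmt.Errorf("invalid length prefix")
	}
	buf = buf[n:]
	vals := make([]float64, 0, l)
	for i := uint64(0); i < l; i++ {
		if len(buf) < 8 {
			return nil, nil, fmt.Errorf("unexpected end of record")
		}
		vals = append(vals, math.Float64frombits(binary.BigEndian.Uint64(buf[:8])))
		buf = buf[8:]
	}
	return vals, buf, nil
}

func main() {
	// Boundaries as used in the NHCB test case further down.
	rec := encodeCustomValues(nil, []float64{0, 100, 1000, 10000})
	vals, rest, err := decodeCustomValues(rec)
	fmt.Println(vals, len(rest), err) // [0 100 1000 10000] 0 <nil>
}
```

As the TODO(bwplotka) comments in the diff note, appending these extra bytes changes the on-disk histogram record layout, so older Prometheus versions could not read the new records without some form of record versioning.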
@@ -104,14 +104,6 @@ func (e *Encbuf) PutHashSum(h hash.Hash) {
    e.B = h.Sum(e.B)
}

// IsWholeWhenMultiplied checks to see if the number when multiplied by 1000 can
// be converted into an integer without losing precision.
func IsWholeWhenMultiplied(in float64) bool {
    i := uint(math.Round(in * 1000))
    out := float64(i) / 1000
    return in == out
}

// Decbuf provides safe methods to extract data from a byte slice. It does all
// necessary bounds checking and advancing of the byte slice.
// Several datums can be extracted without checking for errors. However, before using
@@ -87,7 +87,6 @@ type Head struct {
    logger *slog.Logger
    appendPool zeropool.Pool[[]record.RefSample]
    exemplarsPool zeropool.Pool[[]exemplarWithSeriesRef]
    customValuesPool zeropool.Pool[[]record.RefCustomValues]
    histogramsPool zeropool.Pool[[]record.RefHistogramSample]
    floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
    metadataPool zeropool.Pool[[]record.RefMetadata]
@@ -2139,7 +2138,6 @@ type memSeries struct {
    // We keep the last histogram value here (in addition to appending it to the chunk) so we can check for duplicates.
    lastHistogramValue *histogram.Histogram
    lastFloatHistogramValue *histogram.FloatHistogram
    customValues []float64

    // Current appender for the head chunk. Set when a new head chunk is cut.
    // It is nil only if headChunks is nil. E.g. if there was an appender that created a new series, but rolled back the commit
@@ -175,7 +175,6 @@ func (h *Head) appender() *headAppender {
        samples: h.getAppendBuffer(),
        sampleSeries: h.getSeriesBuffer(),
        exemplars: exemplarsBuf,
        customValues: h.getCustomValuesBuffer(),
        histograms: h.getHistogramBuffer(),
        floatHistograms: h.getFloatHistogramBuffer(),
        metadata: h.getMetadataBuffer(),
@@ -239,18 +238,6 @@ func (h *Head) putExemplarBuffer(b []exemplarWithSeriesRef) {
    h.exemplarsPool.Put(b[:0])
}

func (h *Head) getCustomValuesBuffer() []record.RefCustomValues {
    b := h.customValuesPool.Get()
    if b == nil {
        return make([]record.RefCustomValues, 0, 512)
    }
    return b
}

func (h *Head) putCustomValuesBuffer(b []record.RefCustomValues) {
    h.customValuesPool.Put(b[:0])
}

func (h *Head) getHistogramBuffer() []record.RefHistogramSample {
    b := h.histogramsPool.Get()
    if b == nil {
@@ -333,7 +320,6 @@ type headAppender struct {
    histogramSeries []*memSeries // HistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
    floatHistograms []record.RefFloatHistogramSample // New float histogram samples held by this appender.
    floatHistogramSeries []*memSeries // FloatHistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
    customValues []record.RefCustomValues // Custom values for histograms that use custom buckets held by this appender.
    metadata []record.RefMetadata // New metadata held by this appender.
    metadataSeries []*memSeries // Series corresponding to the metadata held by this appender.
    exemplars []exemplarWithSeriesRef // New exemplars held by this appender.
@@ -687,12 +673,7 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
    // This whole "if" should be removed.
    if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
        s.lastHistogramValue = &histogram.Histogram{}
        if histogram.IsCustomBucketsSchema(h.Schema) {
            a.customValues = append(a.customValues, record.RefCustomValues{
                Ref: s.ref,
                CustomValues: h.CustomValues,
            })
        }
    }

    // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
    // to skip that sample from the WAL and write only in the WBL.
@@ -729,12 +710,6 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
    // This whole "if" should be removed.
    if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
        s.lastFloatHistogramValue = &histogram.FloatHistogram{}
        if histogram.IsCustomBucketsSchema(fh.Schema) {
            a.customValues = append(a.customValues, record.RefCustomValues{
                Ref: s.ref,
                CustomValues: fh.CustomValues,
            })
        }
    }

    // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
@@ -950,13 +925,6 @@ func (a *headAppender) log() error {
            return fmt.Errorf("log samples: %w", err)
        }
    }
    if len(a.customValues) > 0 {
        rec = enc.CustomValues(a.customValues, buf)
        buf = rec[:0]
        if err := a.head.wal.Log(rec); err != nil {
            return fmt.Errorf("log custom values: %w", err)
        }
    }
    if len(a.histograms) > 0 {
        rec = enc.HistogramSamples(a.histograms, buf)
        buf = rec[:0]
@@ -1038,7 +1006,6 @@ func (a *headAppender) Commit() (err error) {
    defer a.head.putAppendBuffer(a.samples)
    defer a.head.putSeriesBuffer(a.sampleSeries)
    defer a.head.putExemplarBuffer(a.exemplars)
    defer a.head.putCustomValuesBuffer(a.customValues)
    defer a.head.putHistogramBuffer(a.histograms)
    defer a.head.putFloatHistogramBuffer(a.floatHistograms)
    defer a.head.putMetadataBuffer(a.metadata)
@@ -1066,7 +1033,6 @@ func (a *headAppender) Commit() (err error) {
        wblSamples []record.RefSample
        wblHistograms []record.RefHistogramSample
        wblFloatHistograms []record.RefFloatHistogramSample
        wblCustomValues []record.RefCustomValues
        oooMmapMarkers map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef
        oooMmapMarkersCount int
        oooRecords [][]byte
@@ -1090,7 +1056,6 @@ func (a *headAppender) Commit() (err error) {
        wblSamples = nil
        wblHistograms = nil
        wblFloatHistograms = nil
        wblCustomValues = nil
        oooMmapMarkers = nil
        oooMmapMarkersCount = 0
        return
@@ -1118,10 +1083,6 @@ func (a *headAppender) Commit() (err error) {
        r := enc.Samples(wblSamples, a.head.getBytesBuffer())
        oooRecords = append(oooRecords, r)
    }
    if len(wblCustomValues) > 0 {
        r := enc.CustomValues(wblCustomValues, a.head.getBytesBuffer())
        oooRecords = append(oooRecords, r)
    }
    if len(wblHistograms) > 0 {
        r := enc.HistogramSamples(wblHistograms, a.head.getBytesBuffer())
        oooRecords = append(oooRecords, r)
@@ -1134,7 +1095,6 @@ func (a *headAppender) Commit() (err error) {
        wblSamples = nil
        wblHistograms = nil
        wblFloatHistograms = nil
        wblCustomValues = nil
        oooMmapMarkers = nil
    }
    for i, s := range a.samples {
@@ -1290,12 +1250,6 @@ func (a *headAppender) Commit() (err error) {
        }
        if ok {
            wblHistograms = append(wblHistograms, s)
            if histogram.IsCustomBucketsSchema(s.H.Schema) {
                wblCustomValues = append(wblCustomValues, record.RefCustomValues{
                    Ref: s.Ref,
                    CustomValues: s.H.CustomValues,
                })
            }
            if s.T < oooMinT {
                oooMinT = s.T
            }
@@ -1392,12 +1346,6 @@ func (a *headAppender) Commit() (err error) {
        }
        if ok {
            wblFloatHistograms = append(wblFloatHistograms, s)
            if histogram.IsCustomBucketsSchema(s.FH.Schema) {
                wblCustomValues = append(wblCustomValues, record.RefCustomValues{
                    Ref: s.Ref,
                    CustomValues: s.FH.CustomValues,
                })
            }
            if s.T < oooMinT {
                oooMinT = s.T
            }
@@ -1934,7 +1882,6 @@ func (a *headAppender) Rollback() (err error) {
    }
    a.head.putAppendBuffer(a.samples)
    a.head.putExemplarBuffer(a.exemplars)
    a.head.putCustomValuesBuffer(a.customValues)
    a.head.putHistogramBuffer(a.histograms)
    a.head.putFloatHistogramBuffer(a.floatHistograms)
    a.head.putMetadataBuffer(a.metadata)

tsdb/head_wal.go
@@ -58,7 +58,6 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
    var unknownExemplarRefs atomic.Uint64
    var unknownHistogramRefs atomic.Uint64
    var unknownMetadataRefs atomic.Uint64
    var unknownCustomValuesRefs atomic.Uint64
    // Track number of series records that had overlapping m-map chunks.
    var mmapOverlappingChunks atomic.Uint64
@@ -82,7 +81,6 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
        histogramsPool zeropool.Pool[[]record.RefHistogramSample]
        floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
        metadataPool zeropool.Pool[[]record.RefMetadata]
        customValuesPool zeropool.Pool[[]record.RefCustomValues]
    )

    defer func() {
@@ -225,18 +223,18 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
                    return
                }
                decoded <- meta
            case record.CustomValues:
                customVals := customValuesPool.Get()[:0]
                customVals, err := dec.CustomValues(rec, customVals)
                if err != nil {
                    decodeErr = &wlog.CorruptionErr{
                        Err: fmt.Errorf("decode custom values: %w", err),
                        Segment: r.Segment(),
                        Offset: r.Offset(),
                    }
                    return
                }
                decoded <- customVals
            //case record.CustomValues:
            // customVals := customValuesPool.Get()[:0]
            // customVals, err := dec.CustomValues(rec, customVals)
            // if err != nil {
            // decodeErr = &wlog.CorruptionErr{
            // Err: fmt.Errorf("decode custom values: %w", err),
            // Segment: r.Segment(),
            // Offset: r.Offset(),
            // }
            // return
            // }
            // decoded <- customVals
            default:
                // Noop.
            }
@@ -345,19 +343,19 @@ Outer:
            if r, ok := multiRef[sam.Ref]; ok {
                sam.Ref = r
            }
            if histogram.IsCustomBucketsSchema(sam.H.Schema) {
                ms := h.series.getByID(sam.Ref)
                if ms == nil {
                    unknownHistogramRefs.Inc()
                    continue
                }

                if ms.lastFloatHistogramValue != nil {
                    sam.H.CustomValues = ms.lastFloatHistogramValue.CustomValues
                } else {
                    sam.H.CustomValues = ms.customValues
                }
            }
            //if histogram.IsCustomBucketsSchema(sam.H.Schema) {
            // ms := h.series.getByID(sam.Ref)
            // if ms == nil {
            // unknownHistogramRefs.Inc()
            // continue
            // }
            //
            // if ms.lastFloatHistogramValue != nil {
            // sam.H.CustomValues = ms.lastFloatHistogramValue.CustomValues
            // } else {
            // sam.H.CustomValues = ms.customValues
            // }
            //}
            mod := uint64(sam.Ref) % uint64(concurrency)
            histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, h: sam.H})
        }
@@ -394,14 +392,14 @@ Outer:
            if r, ok := multiRef[sam.Ref]; ok {
                sam.Ref = r
            }
            if histogram.IsCustomBucketsSchema(sam.FH.Schema) {
                ms := h.series.getByID(sam.Ref)
                if ms == nil {
                    unknownHistogramRefs.Inc()
                    continue
                }
                sam.FH.CustomValues = ms.customValues
            }
            //if histogram.IsCustomBucketsSchema(sam.FH.Schema) {
            // ms := h.series.getByID(sam.Ref)
            // if ms == nil {
            // unknownHistogramRefs.Inc()
            // continue
            // }
            // sam.FH.CustomValues = ms.customValues
            //}
            mod := uint64(sam.Ref) % uint64(concurrency)
            histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, fh: sam.FH})
        }
@@ -428,29 +426,11 @@ Outer:
                }
            }
            metadataPool.Put(v)
        case []record.RefCustomValues:
            for _, cv := range v {
                s := h.series.getByID(cv.Ref)
                if s == nil {
                    unknownCustomValuesRefs.Inc()
                    continue
                }
                //TODO: do we actually want to check lastFloatHistogramValue?
                if s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
                    s.customValues = cv.CustomValues
                }
            }
            customValuesPool.Put(v)
            // iterate over custom value records and do same things as for series/samples - put them in correct processor
            // processor depends on series ref
            // something like this:
            // idx := uint64(mSeries.ref) % uint64(concurrency)
            // processors[idx].input <- walSubsetProcessorInputItem{customValues: }
            //for _, cv := range v {
            // idx := uint64(cv.Ref) % uint64(concurrency)
            // processors[idx].input <- walSubsetProcessorInputItem{customValues: cv}
            //}
            //customValuesPool.Put(v)
            ///if histogram.IsCustomBucketsSchema(s.h.Schema) {
            // // if ms.lastHistogramValue != nil {
            // //
            // // }
            // //}
        default:
            panic(fmt.Errorf("unexpected decoded type: %T", d))
        }
@@ -755,11 +735,6 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
                return []record.RefFloatHistogramSample{}
            },
        }
        customValuesPool = sync.Pool{
            New: func() interface{} {
                return []record.RefCustomValues{}
            },
        }
    )

    defer func() {
@@ -840,18 +815,18 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
                    return
                }
                decodedCh <- hists
            case record.CustomValues:
                customVals := customValuesPool.Get().([]record.RefCustomValues)[:0]
                customVals, err := dec.CustomValues(rec, customVals)
                if err != nil {
                    decodeErr = &wlog.CorruptionErr{
                        Err: fmt.Errorf("decode custom values: %w", err),
                        Segment: r.Segment(),
                        Offset: r.Offset(),
                    }
                    return
                }
                decodedCh <- customVals
            //case record.CustomValues:
            // customVals := customValuesPool.Get().([]record.RefCustomValues)[:0]
            // customVals, err := dec.CustomValues(rec, customVals)
            // if err != nil {
            // decodeErr = &wlog.CorruptionErr{
            // Err: fmt.Errorf("decode custom values: %w", err),
            // Segment: r.Segment(),
            // Offset: r.Offset(),
            // }
            // return
            // }
            // decodedCh <- customVals
            default:
                // Noop.
            }
@@ -936,18 +911,18 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
            if r, ok := multiRef[sam.Ref]; ok {
                sam.Ref = r
            }
            if histogram.IsCustomBucketsSchema(sam.H.Schema) {
                ms := h.series.getByID(sam.Ref)
                if ms == nil {
                    unknownHistogramRefs.Inc()
                    continue
                }
                if ms.lastFloatHistogramValue != nil {
                    sam.H.CustomValues = ms.lastFloatHistogramValue.CustomValues
                } else {
                    sam.H.CustomValues = ms.customValues
                }
            }
            //if histogram.IsCustomBucketsSchema(sam.H.Schema) {
            // ms := h.series.getByID(sam.Ref)
            // if ms == nil {
            // unknownHistogramRefs.Inc()
            // continue
            // }
            // if ms.lastFloatHistogramValue != nil {
            // sam.H.CustomValues = ms.lastFloatHistogramValue.CustomValues
            // } else {
            // sam.H.CustomValues = ms.customValues
            // }
            //}
            mod := uint64(sam.Ref) % uint64(concurrency)
            histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, h: sam.H})
        }
@@ -980,14 +955,14 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
            if r, ok := multiRef[sam.Ref]; ok {
                sam.Ref = r
            }
            if histogram.IsCustomBucketsSchema(sam.FH.Schema) {
                ms := h.series.getByID(sam.Ref)
                if ms == nil {
                    unknownHistogramRefs.Inc()
                    continue
                }
                sam.FH.CustomValues = ms.customValues
            }
            //if histogram.IsCustomBucketsSchema(sam.FH.Schema) {
            // ms := h.series.getByID(sam.Ref)
            // if ms == nil {
            // unknownHistogramRefs.Inc()
            // continue
            // }
            // sam.FH.CustomValues = ms.customValues
            //}
            mod := uint64(sam.Ref) % uint64(concurrency)
            histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, fh: sam.FH})
        }
@@ -1001,18 +976,18 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
            }
            floatHistogramSamplesPool.Put(v) //nolint:staticcheck

        case []record.RefCustomValues:
            for _, cv := range v {
                s := h.series.getByID(cv.Ref)
                if s == nil {
                    unknownCustomValuesRefs.Inc()
                    continue
                }
                if s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
                    s.customValues = cv.CustomValues
                }
            }
            customValuesPool.Put(v)
        //case []record.RefCustomValues:
        // for _, cv := range v {
        // s := h.series.getByID(cv.Ref)
        // if s == nil {
        // unknownCustomValuesRefs.Inc()
        // continue
        // }
        // if s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
        // s.customValues = cv.CustomValues
        // }
        // }
        // customValuesPool.Put(v)
        default:
            panic(fmt.Errorf("unexpected decodedCh type: %T", d))
        }
@@ -52,7 +52,6 @@ const (
    HistogramSamples Type = 7
    // FloatHistogramSamples is used to match WAL records of type Float Histograms.
    FloatHistogramSamples Type = 8
    CustomValues Type = 9
)

func (rt Type) String() string {
@@ -73,8 +72,6 @@ func (rt Type) String() string {
        return "mmapmarkers"
    case Metadata:
        return "metadata"
    case CustomValues:
        return "custom_values"
    default:
        return "unknown"
    }
@@ -150,11 +147,6 @@ type RefSeries struct {
    Labels labels.Labels
}

type RefCustomValues struct {
    Ref chunks.HeadSeriesRef
    CustomValues []float64
}

// RefSample is a timestamp/value pair associated with a reference to a series.
// TODO(beorn7): Perhaps make this "polymorphic", including histogram and float-histogram pointers? Then get rid of RefHistogramSample.
type RefSample struct {
@@ -209,13 +201,13 @@ func NewDecoder(t *labels.SymbolTable) Decoder { // FIXME remove t
}

// Type returns the type of the record.
// Returns RecordUnknown if no valid record type is found.
// Returns Unknown if no valid record type is found.
func (d *Decoder) Type(rec []byte) Type {
    if len(rec) < 1 {
        return Unknown
    }
    switch t := Type(rec[0]); t {
    case Series, Samples, Tombstones, Exemplars, MmapMarkers, Metadata, HistogramSamples, FloatHistogramSamples, CustomValues:
    case Series, Samples, Tombstones, Exemplars, MmapMarkers, Metadata, HistogramSamples, FloatHistogramSamples:
        return t
    }
    return Unknown
@@ -513,6 +505,15 @@ func DecodeHistogram(buf *encoding.Decbuf, h *histogram.Histogram) {
    for i := range h.NegativeBuckets {
        h.NegativeBuckets[i] = buf.Varint64()
    }

    // TODO(bwplotka): This breaks compatibility (think: rollback to older version).
    l = buf.Uvarint()
    if l > 0 {
        h.CustomValues = make([]float64, l)
    }
    for i := range h.CustomValues {
        h.CustomValues[i] = buf.Be64Float64()
    }
}

func (d *Decoder) FloatHistogramSamples(rec []byte, histograms []RefFloatHistogramSample) ([]RefFloatHistogramSample, error) {
@@ -595,39 +596,15 @@ func DecodeFloatHistogram(buf *encoding.Decbuf, fh *histogram.FloatHistogram) {
    for i := range fh.NegativeBuckets {
        fh.NegativeBuckets[i] = buf.Be64Float64()
    }
}

// TODO: optimize
func (d *Decoder) CustomValues(rec []byte, customValues []RefCustomValues) ([]RefCustomValues, error) {
    dec := encoding.Decbuf{B: rec}

    if Type(dec.Byte()) != CustomValues {
        return nil, errors.New("invalid record type")
    // TODO(bwplotka): This breaks compatibility (think: rollback to older version).
    l = buf.Uvarint()
    if l > 0 {
        fh.CustomValues = make([]float64, l)
    }
    if dec.Len() == 0 {
        return customValues, nil
    for i := range fh.CustomValues {
        fh.CustomValues[i] = buf.Be64Float64()
    }
    for len(dec.B) > 0 && dec.Err() == nil {
        ref := storage.SeriesRef(dec.Be64())
        l := dec.Uvarint()
        if l > 0 {
            vals := make([]float64, l)
            for i := range vals {
                vals[i] = dec.Be64Float64()
            }
            customValues = append(customValues, RefCustomValues{
                Ref: chunks.HeadSeriesRef(ref),
                CustomValues: vals,
            })
        }
    }
    if dec.Err() != nil {
        return nil, dec.Err()
    }
    if len(dec.B) > 0 {
        return nil, fmt.Errorf("unexpected %d bytes left in entry", len(dec.B))
    }
    return customValues, nil
}

// Encoder encodes series, sample, and tombstones records.
@@ -813,6 +790,13 @@ func EncodeHistogram(buf *encoding.Encbuf, h *histogram.Histogram) {
    for _, b := range h.NegativeBuckets {
        buf.PutVarint64(b)
    }

    // TODO(bwplotka): This breaks compatibility (think: rollback to older version).
    // Should we version records (e.g. using type?)
    buf.PutUvarint(len(h.CustomValues))
    for _, v := range h.CustomValues {
        buf.PutBEFloat64(v)
    }
}

func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) []byte {
@@ -871,28 +855,11 @@ func EncodeFloatHistogram(buf *encoding.Encbuf, h *histogram.FloatHistogram) {
    for _, b := range h.NegativeBuckets {
        buf.PutBEFloat64(b)
    }
}

func (e *Encoder) CustomValues(customValues []RefCustomValues, b []byte) []byte {
    buf := encoding.Encbuf{B: b}
    buf.PutByte(byte(CustomValues))

    if len(customValues) == 0 {
        return buf.Get()
    }

    for _, v := range customValues {
        buf.PutBE64(uint64(v.Ref))
        EncodeCustomValues(&buf, v.CustomValues)
    }

    return buf.Get()
}

// TODO: optimize
func EncodeCustomValues(buf *encoding.Encbuf, values []float64) {
    buf.PutUvarint(len(values))
    for _, v := range values {
    // TODO(bwplotka): This breaks compatibility (think: rollback to older version).
    // Should we version records (e.g. using type?)
    buf.PutUvarint(len(h.CustomValues))
    for _, v := range h.CustomValues {
        buf.PutBEFloat64(v)
    }
}
@@ -182,21 +182,59 @@ func TestRecord_EncodeDecode(t *testing.T) {
    require.NoError(t, err)
    require.Equal(t, floatHistograms, decFloatHistograms)

    // Custom values for histograms
    customValues := []RefCustomValues{
    // NHCB.
    histograms = []RefHistogramSample{
        {
            Ref: 56,
            CustomValues: []float64{0, 1, 2, 3, 4},
            Ref: 56,
            T: 1234,
            H: &histogram.Histogram{
                Count: 5,
                Sum: 18.4 * rand.Float64(),
                Schema: histogram.CustomBucketsSchema,
                PositiveSpans: []histogram.Span{
                    {Offset: 0, Length: 2},
                    {Offset: 1, Length: 2},
                },
                PositiveBuckets: []int64{1, 1, -1, 0},
                CustomValues: []float64{
                    0, 100, 1000, 10000,
                },
            },
        },
        {
            Ref: 42,
            CustomValues: []float64{5, 10, 15, 20, 25},
            Ref: 42,
            T: 5678,
            H: &histogram.Histogram{
                Count: 11,
                Sum: 35.5,
                Schema: histogram.CustomBucketsSchema,
                PositiveSpans: []histogram.Span{
                    {Offset: 0, Length: 2},
                    {Offset: 2, Length: 2},
                },
                PositiveBuckets: []int64{1, 1, -1, 0},
                CustomValues: []float64{
                    0, 100, 1000, 10000,
                },
            },
        },
    }

    decCustomValues, err := dec.CustomValues(enc.CustomValues(customValues, nil), nil)
    decHistograms, err = dec.HistogramSamples(enc.HistogramSamples(histograms, nil), nil)
    require.NoError(t, err)
    require.Equal(t, customValues, decCustomValues)
    require.Equal(t, histograms, decHistograms)

    floatHistograms = make([]RefFloatHistogramSample, len(histograms))
    for i, h := range histograms {
        floatHistograms[i] = RefFloatHistogramSample{
            Ref: h.Ref,
            T: h.T,
            FH: h.H.ToFloat(nil),
        }
    }
    decFloatHistograms, err = dec.FloatHistogramSamples(enc.FloatHistogramSamples(floatHistograms, nil), nil)
    require.NoError(t, err)
    require.Equal(t, floatHistograms, decFloatHistograms)
}

// TestRecord_Corrupted ensures that corrupted records return the correct error.
@@ -285,15 +323,6 @@ func TestRecord_Corrupted(t *testing.T) {
        _, err := dec.HistogramSamples(corrupted, nil)
        require.ErrorIs(t, err, encoding.ErrInvalidSize)
    })

    t.Run("Test corrupted customValues record", func(t *testing.T) {
        customValues := []RefCustomValues{
            {Ref: 56, CustomValues: []float64{0, 1, 2, 3, 4}},
        }
        corrupted := enc.CustomValues(customValues, nil)[:8]
        _, err := dec.CustomValues(corrupted, nil)
        require.ErrorIs(t, err, encoding.ErrInvalidSize)
    })
}

func TestRecord_Type(t *testing.T) {
@@ -337,10 +366,6 @@ func TestRecord_Type(t *testing.T) {
    recordType = dec.Type(enc.HistogramSamples(histograms, nil))
    require.Equal(t, HistogramSamples, recordType)

    customValues := []RefCustomValues{{Ref: 56, CustomValues: []float64{0, 1, 2, 3, 4}}}
    recordType = dec.Type(enc.CustomValues(customValues, nil))
    require.Equal(t, CustomValues, recordType)

    recordType = dec.Type(nil)
    require.Equal(t, Unknown, recordType)
@@ -151,7 +151,6 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
        samples []record.RefSample
        histogramSamples []record.RefHistogramSample
        floatHistogramSamples []record.RefFloatHistogramSample
        customValues []record.RefCustomValues
        tstones []tombstones.Stone
        exemplars []record.RefExemplar
        metadata []record.RefMetadata
@@ -300,18 +299,18 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
            }
            stats.TotalMetadata += len(metadata)
            stats.DroppedMetadata += len(metadata) - repl
        case record.CustomValues:
            customValues, err = dec.CustomValues(rec, customValues)
            if err != nil {
                return nil, fmt.Errorf("decode custom values: %w", err)
            }
            repl := customValues[:0]
            for _, v := range customValues {
                repl = append(repl, v)
            }
            if len(repl) > 0 {
                buf = enc.CustomValues(repl, buf)
            }
        //case record.CustomValues:
        // customValues, err = dec.CustomValues(rec, customValues)
        // if err != nil {
        // return nil, fmt.Errorf("decode custom values: %w", err)
        // }
        // repl := customValues[:0]
        // for _, v := range customValues {
        // repl = append(repl, v)
        // }
        // if len(repl) > 0 {
        // buf = enc.CustomValues(repl, buf)
        // }
        default:
            // Unknown record type, probably from a future Prometheus version.
            continue