mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Add separate handling for histograms and custom bucket histograms
This commit is contained in:
parent
37df50adb9
commit
454f6d39ca
|
@ -1154,40 +1154,35 @@ func (a *appender) log() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(a.pendingHistograms) > 0 {
|
if len(a.pendingHistograms) > 0 {
|
||||||
buf1, buf2 := encoder.HistogramSamples(a.pendingHistograms, buf)
|
var customBucketsExist bool
|
||||||
//buf = append(buf1, buf2...)
|
buf, customBucketsExist = encoder.HistogramSamples(a.pendingHistograms, buf)
|
||||||
//if err := a.wal.Log(buf); err != nil {
|
if err := a.wal.Log(buf); err != nil {
|
||||||
// return err
|
return err
|
||||||
//}
|
|
||||||
if len(buf1) > 0 {
|
|
||||||
buf = buf1[:0]
|
|
||||||
if err := a.wal.Log(buf1); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if len(buf2) > 0 {
|
buf = buf[:0]
|
||||||
buf = buf2[:0]
|
if customBucketsExist {
|
||||||
if err := a.wal.Log(buf2); err != nil {
|
buf = encoder.CustomBucketHistogramSamples(a.pendingHistograms, buf)
|
||||||
|
if err := a.wal.Log(buf); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
buf = buf[:0]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(a.pendingFloatHistograms) > 0 {
|
if len(a.pendingFloatHistograms) > 0 {
|
||||||
buf1, buf2 := encoder.FloatHistogramSamples(a.pendingFloatHistograms, buf)
|
var customBucketsExist bool
|
||||||
if len(buf1) > 0 {
|
buf, customBucketsExist = encoder.FloatHistogramSamples(a.pendingFloatHistograms, buf)
|
||||||
buf = buf1[:0]
|
if err := a.wal.Log(buf); err != nil {
|
||||||
if err := a.wal.Log(buf1); err != nil {
|
return err
|
||||||
|
}
|
||||||
|
buf = buf[:0]
|
||||||
|
if customBucketsExist {
|
||||||
|
buf = encoder.CustomBucketFloatHistogramSamples(a.pendingFloatHistograms, buf)
|
||||||
|
if err := a.wal.Log(buf); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
buf = buf[:0]
|
||||||
}
|
}
|
||||||
if len(buf2) > 0 {
|
|
||||||
buf = buf2[:0]
|
|
||||||
if err := a.wal.Log(buf2); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//buf = buf[:0]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(a.pendingExamplars) > 0 {
|
if len(a.pendingExamplars) > 0 {
|
||||||
|
|
|
@ -163,6 +163,18 @@ func TestCommit(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
lbls = labelsForTest(t.Name()+"_custom_bucket_histogram", numSeries)
|
||||||
|
for _, l := range lbls {
|
||||||
|
lset := labels.New(l...)
|
||||||
|
|
||||||
|
customBucketHistograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms)
|
||||||
|
|
||||||
|
for i := 0; i < numHistograms; i++ {
|
||||||
|
_, err := app.AppendHistogram(0, lset, int64(i), customBucketHistograms[i], nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
lbls = labelsForTest(t.Name()+"_float_histogram", numSeries)
|
lbls = labelsForTest(t.Name()+"_float_histogram", numSeries)
|
||||||
for _, l := range lbls {
|
for _, l := range lbls {
|
||||||
lset := labels.New(l...)
|
lset := labels.New(l...)
|
||||||
|
@ -175,6 +187,18 @@ func TestCommit(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
lbls = labelsForTest(t.Name()+"custom_bucket_float_histogram", numSeries)
|
||||||
|
for _, l := range lbls {
|
||||||
|
lset := labels.New(l...)
|
||||||
|
|
||||||
|
customBucketFloatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms)
|
||||||
|
|
||||||
|
for i := 0; i < numHistograms; i++ {
|
||||||
|
_, err := app.AppendHistogram(0, lset, int64(i), nil, customBucketFloatHistograms[i])
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
require.NoError(t, app.Commit())
|
require.NoError(t, app.Commit())
|
||||||
require.NoError(t, s.Close())
|
require.NoError(t, s.Close())
|
||||||
|
|
||||||
|
@ -230,11 +254,11 @@ func TestCommit(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check that the WAL contained the same number of committed series/samples/exemplars.
|
// Check that the WAL contained the same number of committed series/samples/exemplars.
|
||||||
require.Equal(t, numSeries*3, walSeriesCount, "unexpected number of series")
|
require.Equal(t, numSeries*5, walSeriesCount, "unexpected number of series")
|
||||||
require.Equal(t, numSeries*numDatapoints, walSamplesCount, "unexpected number of samples")
|
require.Equal(t, numSeries*numDatapoints, walSamplesCount, "unexpected number of samples")
|
||||||
require.Equal(t, numSeries*numDatapoints, walExemplarsCount, "unexpected number of exemplars")
|
require.Equal(t, numSeries*numDatapoints, walExemplarsCount, "unexpected number of exemplars")
|
||||||
require.Equal(t, numSeries*numHistograms, walHistogramCount, "unexpected number of histograms")
|
require.Equal(t, numSeries*numHistograms*2, walHistogramCount, "unexpected number of histograms")
|
||||||
require.Equal(t, numSeries*numHistograms, walFloatHistogramCount, "unexpected number of float histograms")
|
require.Equal(t, numSeries*numHistograms*2, walFloatHistogramCount, "unexpected number of float histograms")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRollback(t *testing.T) {
|
func TestRollback(t *testing.T) {
|
||||||
|
|
|
@ -942,45 +942,30 @@ func (a *headAppender) log() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(a.histograms) > 0 {
|
if len(a.histograms) > 0 {
|
||||||
rec1, rec2 := enc.HistogramSamples(a.histograms, buf)
|
rec, customBucketsExist := enc.HistogramSamples(a.histograms, buf)
|
||||||
//rec = append(rec1, rec2...)
|
buf = rec[:0]
|
||||||
//
|
if err := a.head.wal.Log(rec); err != nil {
|
||||||
//buf = rec[:0]
|
return fmt.Errorf("log histograms: %w", err)
|
||||||
//
|
|
||||||
//if err := a.head.wal.Log(rec); err != nil {
|
|
||||||
// return fmt.Errorf("log samples: %w", err)
|
|
||||||
//}
|
|
||||||
if len(rec1) != 0 {
|
|
||||||
buf = rec1[:0]
|
|
||||||
if err := a.head.wal.Log(rec1); err != nil {
|
|
||||||
return fmt.Errorf("log histograms: %w", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if len(rec2) != 0 {
|
|
||||||
buf = rec2[:0]
|
if customBucketsExist {
|
||||||
if err := a.head.wal.Log(rec2); err != nil {
|
enc.CustomBucketHistogramSamples(a.histograms, buf)
|
||||||
|
buf = rec[:0]
|
||||||
|
if err := a.head.wal.Log(rec); err != nil {
|
||||||
return fmt.Errorf("log custom bucket histograms: %w", err)
|
return fmt.Errorf("log custom bucket histograms: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(a.floatHistograms) > 0 {
|
if len(a.floatHistograms) > 0 {
|
||||||
rec1, rec2 := enc.FloatHistogramSamples(a.floatHistograms, buf)
|
rec, customBucketsExist := enc.FloatHistogramSamples(a.floatHistograms, buf)
|
||||||
//rec = append(rec1, rec2...)
|
buf = rec[:0]
|
||||||
//
|
if err := a.head.wal.Log(rec); err != nil {
|
||||||
//buf = rec[:0]
|
return fmt.Errorf("log float histograms: %w", err)
|
||||||
//
|
|
||||||
//if err := a.head.wal.Log(rec); err != nil {
|
|
||||||
// return fmt.Errorf("log samples: %w", err)
|
|
||||||
//}
|
|
||||||
if len(rec1) != 0 {
|
|
||||||
buf = rec1[:0]
|
|
||||||
if err := a.head.wal.Log(rec1); err != nil {
|
|
||||||
return fmt.Errorf("log float histograms: %w", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if len(rec2) != 0 {
|
|
||||||
buf = rec2[:0]
|
if customBucketsExist {
|
||||||
if err := a.head.wal.Log(rec2); err != nil {
|
buf = rec[:0]
|
||||||
|
if err := a.head.wal.Log(rec); err != nil {
|
||||||
return fmt.Errorf("log custom bucket float histograms: %w", err)
|
return fmt.Errorf("log custom bucket float histograms: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -193,10 +193,6 @@ type RefFloatHistogramSample struct {
|
||||||
FH *histogram.FloatHistogram
|
FH *histogram.FloatHistogram
|
||||||
}
|
}
|
||||||
|
|
||||||
type RefCustomBucketHistogramSample struct {
|
|
||||||
RefHistogramSample
|
|
||||||
}
|
|
||||||
|
|
||||||
// RefMmapMarker marks that the all the samples of the given series until now have been m-mapped to disk.
|
// RefMmapMarker marks that the all the samples of the given series until now have been m-mapped to disk.
|
||||||
type RefMmapMarker struct {
|
type RefMmapMarker struct {
|
||||||
Ref chunks.HeadSeriesRef
|
Ref chunks.HeadSeriesRef
|
||||||
|
@ -748,15 +744,12 @@ func (e *Encoder) MmapMarkers(markers []RefMmapMarker, b []byte) []byte {
|
||||||
return buf.Get()
|
return buf.Get()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) ([]byte, []byte) {
|
func (e *Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) ([]byte, bool) {
|
||||||
buf := encoding.Encbuf{B: b}
|
buf := encoding.Encbuf{B: b}
|
||||||
buf.PutByte(byte(HistogramSamples))
|
buf.PutByte(byte(HistogramSamples))
|
||||||
|
|
||||||
customBucketHistBuf := encoding.Encbuf{B: b}
|
|
||||||
customBucketHistBuf.PutByte(byte(CustomBucketHistogramSamples))
|
|
||||||
|
|
||||||
if len(histograms) == 0 {
|
if len(histograms) == 0 {
|
||||||
return buf.Get(), customBucketHistBuf.Get()
|
return buf.Get(), false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store base timestamp and base reference number of first histogram.
|
// Store base timestamp and base reference number of first histogram.
|
||||||
|
@ -765,34 +758,46 @@ func (e *Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) ([
|
||||||
buf.PutBE64(uint64(first.Ref))
|
buf.PutBE64(uint64(first.Ref))
|
||||||
buf.PutBE64int64(first.T)
|
buf.PutBE64int64(first.T)
|
||||||
|
|
||||||
customBucketHistBuf.PutBE64(uint64(first.Ref))
|
customBucketSamplesExist := false
|
||||||
customBucketHistBuf.PutBE64int64(first.T)
|
|
||||||
|
|
||||||
histsAdded := 0
|
|
||||||
customBucketHistsAdded := 0
|
|
||||||
for _, h := range histograms {
|
for _, h := range histograms {
|
||||||
if h.H.UsesCustomBuckets() {
|
if h.H.UsesCustomBuckets() {
|
||||||
customBucketHistBuf.PutVarint64(int64(h.Ref) - int64(first.Ref))
|
customBucketSamplesExist = true
|
||||||
customBucketHistBuf.PutVarint64(h.T - first.T)
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
EncodeHistogram(&customBucketHistBuf, h.H)
|
buf.PutVarint64(int64(h.Ref) - int64(first.Ref))
|
||||||
customBucketHistsAdded++
|
buf.PutVarint64(h.T - first.T)
|
||||||
} else {
|
|
||||||
|
EncodeHistogram(&buf, h.H)
|
||||||
|
}
|
||||||
|
|
||||||
|
return buf.Get(), customBucketSamplesExist
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Encoder) CustomBucketHistogramSamples(histograms []RefHistogramSample, b []byte) []byte {
|
||||||
|
buf := encoding.Encbuf{B: b}
|
||||||
|
buf.PutByte(byte(CustomBucketHistogramSamples))
|
||||||
|
|
||||||
|
if len(histograms) == 0 {
|
||||||
|
return buf.Get()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store base timestamp and base reference number of first histogram.
|
||||||
|
// All histograms encode their timestamp and ref as delta to those.
|
||||||
|
first := histograms[0]
|
||||||
|
buf.PutBE64(uint64(first.Ref))
|
||||||
|
buf.PutBE64int64(first.T)
|
||||||
|
|
||||||
|
for _, h := range histograms {
|
||||||
|
if h.H.UsesCustomBuckets() {
|
||||||
buf.PutVarint64(int64(h.Ref) - int64(first.Ref))
|
buf.PutVarint64(int64(h.Ref) - int64(first.Ref))
|
||||||
buf.PutVarint64(h.T - first.T)
|
buf.PutVarint64(h.T - first.T)
|
||||||
|
|
||||||
EncodeHistogram(&buf, h.H)
|
EncodeHistogram(&buf, h.H)
|
||||||
histsAdded++
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if customBucketHistsAdded == 0 {
|
return buf.Get()
|
||||||
customBucketHistBuf.Reset()
|
|
||||||
} else if histsAdded == 0 {
|
|
||||||
buf.Reset()
|
|
||||||
}
|
|
||||||
|
|
||||||
return buf.Get(), customBucketHistBuf.Get()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// EncodeHistogram encodes a Histogram into a byte slice.
|
// EncodeHistogram encodes a Histogram into a byte slice.
|
||||||
|
@ -836,15 +841,12 @@ func EncodeHistogram(buf *encoding.Encbuf, h *histogram.Histogram) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) ([]byte, []byte) {
|
func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) ([]byte, bool) {
|
||||||
buf := encoding.Encbuf{B: b}
|
buf := encoding.Encbuf{B: b}
|
||||||
buf.PutByte(byte(FloatHistogramSamples))
|
buf.PutByte(byte(FloatHistogramSamples))
|
||||||
|
|
||||||
customBucketHistBuf := encoding.Encbuf{B: b}
|
|
||||||
customBucketHistBuf.PutByte(byte(CustomBucketFloatHistogramSamples))
|
|
||||||
|
|
||||||
if len(histograms) == 0 {
|
if len(histograms) == 0 {
|
||||||
return buf.Get(), customBucketHistBuf.Get()
|
return buf.Get(), false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store base timestamp and base reference number of first histogram.
|
// Store base timestamp and base reference number of first histogram.
|
||||||
|
@ -853,34 +855,46 @@ func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b
|
||||||
buf.PutBE64(uint64(first.Ref))
|
buf.PutBE64(uint64(first.Ref))
|
||||||
buf.PutBE64int64(first.T)
|
buf.PutBE64int64(first.T)
|
||||||
|
|
||||||
customBucketHistBuf.PutBE64(uint64(first.Ref))
|
customBucketsExist := false
|
||||||
customBucketHistBuf.PutBE64int64(first.T)
|
|
||||||
|
|
||||||
histsAdded := 0
|
|
||||||
customBucketHistsAdded := 0
|
|
||||||
for _, h := range histograms {
|
for _, h := range histograms {
|
||||||
if h.FH.UsesCustomBuckets() {
|
if h.FH.UsesCustomBuckets() {
|
||||||
customBucketHistBuf.PutVarint64(int64(h.Ref) - int64(first.Ref))
|
customBucketsExist = true
|
||||||
customBucketHistBuf.PutVarint64(h.T - first.T)
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
EncodeFloatHistogram(&customBucketHistBuf, h.FH)
|
buf.PutVarint64(int64(h.Ref) - int64(first.Ref))
|
||||||
customBucketHistsAdded++
|
buf.PutVarint64(h.T - first.T)
|
||||||
} else {
|
|
||||||
|
EncodeFloatHistogram(&buf, h.FH)
|
||||||
|
}
|
||||||
|
|
||||||
|
return buf.Get(), customBucketsExist
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Encoder) CustomBucketFloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) []byte {
|
||||||
|
buf := encoding.Encbuf{B: b}
|
||||||
|
buf.PutByte(byte(CustomBucketFloatHistogramSamples))
|
||||||
|
|
||||||
|
if len(histograms) == 0 {
|
||||||
|
return buf.Get()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store base timestamp and base reference number of first histogram.
|
||||||
|
// All histograms encode their timestamp and ref as delta to those.
|
||||||
|
first := histograms[0]
|
||||||
|
buf.PutBE64(uint64(first.Ref))
|
||||||
|
buf.PutBE64int64(first.T)
|
||||||
|
|
||||||
|
for _, h := range histograms {
|
||||||
|
if h.FH.UsesCustomBuckets() {
|
||||||
buf.PutVarint64(int64(h.Ref) - int64(first.Ref))
|
buf.PutVarint64(int64(h.Ref) - int64(first.Ref))
|
||||||
buf.PutVarint64(h.T - first.T)
|
buf.PutVarint64(h.T - first.T)
|
||||||
|
|
||||||
EncodeFloatHistogram(&buf, h.FH)
|
EncodeFloatHistogram(&buf, h.FH)
|
||||||
histsAdded++
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if customBucketHistsAdded == 0 {
|
return buf.Get()
|
||||||
customBucketHistBuf.Reset()
|
|
||||||
} else if histsAdded == 0 {
|
|
||||||
buf.Reset()
|
|
||||||
}
|
|
||||||
|
|
||||||
return buf.Get(), customBucketHistBuf.Get()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// EncodeFloatHistogram encodes the Float Histogram into a byte slice.
|
// EncodeFloatHistogram encodes the Float Histogram into a byte slice.
|
||||||
|
|
|
@ -166,12 +166,12 @@ func TestRecord_EncodeDecode(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
histSamples, customBucketHistSamples := enc.HistogramSamples(histograms, nil)
|
histSamples, _ := enc.HistogramSamples(histograms, nil)
|
||||||
|
customBucketHistSamples := enc.CustomBucketHistogramSamples(histograms, nil)
|
||||||
decHistograms, err := dec.HistogramSamples(histSamples, nil)
|
decHistograms, err := dec.HistogramSamples(histSamples, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
decCustomBucketHistograms, err := dec.HistogramSamples(customBucketHistSamples, nil)
|
decCustomBucketHistSamples, err := dec.HistogramSamples(customBucketHistSamples, nil)
|
||||||
require.NoError(t, err)
|
decHistograms = append(decHistograms, decCustomBucketHistSamples...)
|
||||||
decHistograms = append(decHistograms, decCustomBucketHistograms...)
|
|
||||||
require.Equal(t, histograms, decHistograms)
|
require.Equal(t, histograms, decHistograms)
|
||||||
|
|
||||||
floatHistograms := make([]RefFloatHistogramSample, len(histograms))
|
floatHistograms := make([]RefFloatHistogramSample, len(histograms))
|
||||||
|
@ -182,10 +182,12 @@ func TestRecord_EncodeDecode(t *testing.T) {
|
||||||
FH: h.H.ToFloat(nil),
|
FH: h.H.ToFloat(nil),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
histSamples, customBucketFloatHistSamples := enc.FloatHistogramSamples(floatHistograms, nil)
|
floatHistSamples, _ := enc.FloatHistogramSamples(floatHistograms, nil)
|
||||||
decFloatHistograms, err := dec.FloatHistogramSamples(histSamples, nil)
|
customBucketFloatHistSamples := enc.CustomBucketFloatHistogramSamples(floatHistograms, nil)
|
||||||
|
decFloatHistograms, err := dec.FloatHistogramSamples(floatHistSamples, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
decCustomBucketFloatHistograms, err := dec.FloatHistogramSamples(customBucketFloatHistSamples, nil)
|
decCustomBucketFloatHistograms, err := dec.FloatHistogramSamples(customBucketFloatHistSamples, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
decFloatHistograms = append(decFloatHistograms, decCustomBucketFloatHistograms...)
|
decFloatHistograms = append(decFloatHistograms, decCustomBucketFloatHistograms...)
|
||||||
require.Equal(t, floatHistograms, decFloatHistograms)
|
require.Equal(t, floatHistograms, decFloatHistograms)
|
||||||
|
|
||||||
|
@ -194,7 +196,8 @@ func TestRecord_EncodeDecode(t *testing.T) {
|
||||||
histograms[i].H.CounterResetHint = histogram.GaugeType
|
histograms[i].H.CounterResetHint = histogram.GaugeType
|
||||||
}
|
}
|
||||||
|
|
||||||
gaugeHistSamples, customBucketGaugeHistSamples := enc.HistogramSamples(histograms, nil)
|
gaugeHistSamples, _ := enc.HistogramSamples(histograms, nil)
|
||||||
|
customBucketGaugeHistSamples := enc.CustomBucketHistogramSamples(histograms, nil)
|
||||||
decGaugeHistograms, err := dec.HistogramSamples(gaugeHistSamples, nil)
|
decGaugeHistograms, err := dec.HistogramSamples(gaugeHistSamples, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
decCustomBucketGaugeHistograms, err := dec.HistogramSamples(customBucketGaugeHistSamples, nil)
|
decCustomBucketGaugeHistograms, err := dec.HistogramSamples(customBucketGaugeHistSamples, nil)
|
||||||
|
@ -207,10 +210,12 @@ func TestRecord_EncodeDecode(t *testing.T) {
|
||||||
floatHistograms[i].FH.CounterResetHint = histogram.GaugeType
|
floatHistograms[i].FH.CounterResetHint = histogram.GaugeType
|
||||||
}
|
}
|
||||||
|
|
||||||
gaugeHistSamples, customBucketGaugeFloatHistSamples := enc.FloatHistogramSamples(floatHistograms, nil)
|
gaugeFloatHistSamples, _ := enc.FloatHistogramSamples(floatHistograms, nil)
|
||||||
decGaugeFloatHistograms, err := dec.FloatHistogramSamples(gaugeHistSamples, nil)
|
customBucketGaugeFloatHistSamples := enc.CustomBucketFloatHistogramSamples(floatHistograms, nil)
|
||||||
|
decGaugeFloatHistograms, err := dec.FloatHistogramSamples(gaugeFloatHistSamples, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
decCustomBucketGaugeFloatHistograms, err := dec.FloatHistogramSamples(customBucketGaugeFloatHistSamples, nil)
|
decCustomBucketGaugeFloatHistograms, err := dec.FloatHistogramSamples(customBucketGaugeFloatHistSamples, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
decFloatHistograms = append(decGaugeFloatHistograms, decCustomBucketGaugeFloatHistograms...)
|
decFloatHistograms = append(decGaugeFloatHistograms, decCustomBucketGaugeFloatHistograms...)
|
||||||
require.Equal(t, floatHistograms, decFloatHistograms)
|
require.Equal(t, floatHistograms, decFloatHistograms)
|
||||||
}
|
}
|
||||||
|
@ -295,10 +300,27 @@ func TestRecord_Corrupted(t *testing.T) {
|
||||||
PositiveBuckets: []int64{1, 1, -1, 0},
|
PositiveBuckets: []int64{1, 1, -1, 0},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Ref: 67,
|
||||||
|
T: 5678,
|
||||||
|
H: &histogram.Histogram{
|
||||||
|
Count: 8,
|
||||||
|
ZeroThreshold: 0.001,
|
||||||
|
Sum: 35.5,
|
||||||
|
Schema: -53,
|
||||||
|
PositiveSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 2},
|
||||||
|
{Offset: 2, Length: 2},
|
||||||
|
},
|
||||||
|
PositiveBuckets: []int64{2, -1, 2, 0},
|
||||||
|
CustomValues: []float64{0, 2, 4, 6, 8},
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
corruptedHists, corruptedCustomBucketHists := enc.HistogramSamples(histograms, nil)
|
corruptedHists, _ := enc.HistogramSamples(histograms, nil)
|
||||||
corruptedHists = corruptedHists[:8]
|
corruptedHists = corruptedHists[:8]
|
||||||
|
corruptedCustomBucketHists := enc.CustomBucketHistogramSamples(histograms, nil)
|
||||||
corruptedCustomBucketHists = corruptedCustomBucketHists[:8]
|
corruptedCustomBucketHists = corruptedCustomBucketHists[:8]
|
||||||
_, err := dec.HistogramSamples(corruptedHists, nil)
|
_, err := dec.HistogramSamples(corruptedHists, nil)
|
||||||
require.ErrorIs(t, err, encoding.ErrInvalidSize)
|
require.ErrorIs(t, err, encoding.ErrInvalidSize)
|
||||||
|
@ -361,9 +383,10 @@ func TestRecord_Type(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
hists, customBucketHists := enc.HistogramSamples(histograms, nil)
|
hists, _ := enc.HistogramSamples(histograms, nil)
|
||||||
recordType = dec.Type(hists)
|
recordType = dec.Type(hists)
|
||||||
require.Equal(t, HistogramSamples, recordType)
|
require.Equal(t, HistogramSamples, recordType)
|
||||||
|
customBucketHists := enc.CustomBucketHistogramSamples(histograms, nil)
|
||||||
recordType = dec.Type(customBucketHists)
|
recordType = dec.Type(customBucketHists)
|
||||||
require.Equal(t, CustomBucketHistogramSamples, recordType)
|
require.Equal(t, CustomBucketHistogramSamples, recordType)
|
||||||
|
|
||||||
|
|
|
@ -238,7 +238,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(repl) > 0 {
|
if len(repl) > 0 {
|
||||||
_, buf = enc.HistogramSamples(repl, buf)
|
buf = enc.CustomBucketHistogramSamples(repl, buf)
|
||||||
}
|
}
|
||||||
stats.TotalSamples += len(histogramSamples)
|
stats.TotalSamples += len(histogramSamples)
|
||||||
stats.DroppedSamples += len(histogramSamples) - len(repl)
|
stats.DroppedSamples += len(histogramSamples) - len(repl)
|
||||||
|
@ -272,7 +272,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(repl) > 0 {
|
if len(repl) > 0 {
|
||||||
_, buf = enc.FloatHistogramSamples(repl, buf)
|
buf = enc.CustomBucketFloatHistogramSamples(repl, buf)
|
||||||
}
|
}
|
||||||
stats.TotalSamples += len(floatHistogramSamples)
|
stats.TotalSamples += len(floatHistogramSamples)
|
||||||
stats.DroppedSamples += len(floatHistogramSamples) - len(repl)
|
stats.DroppedSamples += len(floatHistogramSamples) - len(repl)
|
||||||
|
|
|
@ -127,6 +127,20 @@ func TestCheckpoint(t *testing.T) {
|
||||||
PositiveBuckets: []int64{int64(i + 1), 1, -1, 0},
|
PositiveBuckets: []int64{int64(i + 1), 1, -1, 0},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
makeCustomBucketHistogram := func(i int) *histogram.Histogram {
|
||||||
|
return &histogram.Histogram{
|
||||||
|
Count: 5 + uint64(i*4),
|
||||||
|
ZeroCount: 2 + uint64(i),
|
||||||
|
ZeroThreshold: 0.001,
|
||||||
|
Sum: 18.4 * float64(i+1),
|
||||||
|
Schema: -53,
|
||||||
|
PositiveSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 2},
|
||||||
|
{Offset: 1, Length: 2},
|
||||||
|
},
|
||||||
|
CustomValues: []float64{0, 1, 2, 3, 4},
|
||||||
|
}
|
||||||
|
}
|
||||||
makeFloatHistogram := func(i int) *histogram.FloatHistogram {
|
makeFloatHistogram := func(i int) *histogram.FloatHistogram {
|
||||||
return &histogram.FloatHistogram{
|
return &histogram.FloatHistogram{
|
||||||
Count: 5 + float64(i*4),
|
Count: 5 + float64(i*4),
|
||||||
|
@ -141,6 +155,20 @@ func TestCheckpoint(t *testing.T) {
|
||||||
PositiveBuckets: []float64{float64(i + 1), 1, -1, 0},
|
PositiveBuckets: []float64{float64(i + 1), 1, -1, 0},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
makeCustomBucketFloatHistogram := func(i int) *histogram.FloatHistogram {
|
||||||
|
return &histogram.FloatHistogram{
|
||||||
|
Count: 5 + float64(i*4),
|
||||||
|
ZeroCount: 2 + float64(i),
|
||||||
|
ZeroThreshold: 0.001,
|
||||||
|
Sum: 18.4 * float64(i+1),
|
||||||
|
Schema: -53,
|
||||||
|
PositiveSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 2},
|
||||||
|
{Offset: 1, Length: 2},
|
||||||
|
},
|
||||||
|
CustomValues: []float64{0, 1, 2, 3, 4},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
|
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
|
||||||
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
|
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
|
||||||
|
@ -208,24 +236,40 @@ func TestCheckpoint(t *testing.T) {
|
||||||
require.NoError(t, w.Log(b))
|
require.NoError(t, w.Log(b))
|
||||||
samplesInWAL += 4
|
samplesInWAL += 4
|
||||||
h := makeHistogram(i)
|
h := makeHistogram(i)
|
||||||
b1, b2 := enc.HistogramSamples([]record.RefHistogramSample{
|
b, _ = enc.HistogramSamples([]record.RefHistogramSample{
|
||||||
{Ref: 0, T: last, H: h},
|
{Ref: 0, T: last, H: h},
|
||||||
{Ref: 1, T: last + 10000, H: h},
|
{Ref: 1, T: last + 10000, H: h},
|
||||||
{Ref: 2, T: last + 20000, H: h},
|
{Ref: 2, T: last + 20000, H: h},
|
||||||
{Ref: 3, T: last + 30000, H: h},
|
{Ref: 3, T: last + 30000, H: h},
|
||||||
}, nil)
|
}, nil)
|
||||||
require.NoError(t, w.Log(b1))
|
require.NoError(t, w.Log(b))
|
||||||
require.NoError(t, w.Log(b2))
|
histogramsInWAL += 4
|
||||||
|
cbh := makeCustomBucketHistogram(i)
|
||||||
|
b = enc.CustomBucketHistogramSamples([]record.RefHistogramSample{
|
||||||
|
{Ref: 0, T: last, H: cbh},
|
||||||
|
{Ref: 1, T: last + 10000, H: cbh},
|
||||||
|
{Ref: 2, T: last + 20000, H: cbh},
|
||||||
|
{Ref: 3, T: last + 30000, H: cbh},
|
||||||
|
}, nil)
|
||||||
|
require.NoError(t, w.Log(b))
|
||||||
histogramsInWAL += 4
|
histogramsInWAL += 4
|
||||||
fh := makeFloatHistogram(i)
|
fh := makeFloatHistogram(i)
|
||||||
b1, b2 = enc.FloatHistogramSamples([]record.RefFloatHistogramSample{
|
b, _ = enc.FloatHistogramSamples([]record.RefFloatHistogramSample{
|
||||||
{Ref: 0, T: last, FH: fh},
|
{Ref: 0, T: last, FH: fh},
|
||||||
{Ref: 1, T: last + 10000, FH: fh},
|
{Ref: 1, T: last + 10000, FH: fh},
|
||||||
{Ref: 2, T: last + 20000, FH: fh},
|
{Ref: 2, T: last + 20000, FH: fh},
|
||||||
{Ref: 3, T: last + 30000, FH: fh},
|
{Ref: 3, T: last + 30000, FH: fh},
|
||||||
}, nil)
|
}, nil)
|
||||||
require.NoError(t, w.Log(b1))
|
require.NoError(t, w.Log(b))
|
||||||
require.NoError(t, w.Log(b2))
|
floatHistogramsInWAL += 4
|
||||||
|
cbfh := makeCustomBucketFloatHistogram(i)
|
||||||
|
b = enc.CustomBucketFloatHistogramSamples([]record.RefFloatHistogramSample{
|
||||||
|
{Ref: 0, T: last, FH: cbfh},
|
||||||
|
{Ref: 1, T: last + 10000, FH: cbfh},
|
||||||
|
{Ref: 2, T: last + 20000, FH: cbfh},
|
||||||
|
{Ref: 3, T: last + 30000, FH: cbfh},
|
||||||
|
}, nil)
|
||||||
|
require.NoError(t, w.Log(b))
|
||||||
floatHistogramsInWAL += 4
|
floatHistogramsInWAL += 4
|
||||||
|
|
||||||
b = enc.Exemplars([]record.RefExemplar{
|
b = enc.Exemplars([]record.RefExemplar{
|
||||||
|
@ -286,14 +330,14 @@ func TestCheckpoint(t *testing.T) {
|
||||||
require.GreaterOrEqual(t, s.T, last/2, "sample with wrong timestamp")
|
require.GreaterOrEqual(t, s.T, last/2, "sample with wrong timestamp")
|
||||||
}
|
}
|
||||||
samplesInCheckpoint += len(samples)
|
samplesInCheckpoint += len(samples)
|
||||||
case record.HistogramSamples:
|
case record.HistogramSamples, record.CustomBucketHistogramSamples:
|
||||||
histograms, err := dec.HistogramSamples(rec, nil)
|
histograms, err := dec.HistogramSamples(rec, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
for _, h := range histograms {
|
for _, h := range histograms {
|
||||||
require.GreaterOrEqual(t, h.T, last/2, "histogram with wrong timestamp")
|
require.GreaterOrEqual(t, h.T, last/2, "histogram with wrong timestamp")
|
||||||
}
|
}
|
||||||
histogramsInCheckpoint += len(histograms)
|
histogramsInCheckpoint += len(histograms)
|
||||||
case record.FloatHistogramSamples:
|
case record.FloatHistogramSamples, record.CustomBucketFloatHistogramSamples:
|
||||||
floatHistograms, err := dec.FloatHistogramSamples(rec, nil)
|
floatHistograms, err := dec.FloatHistogramSamples(rec, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
for _, h := range floatHistograms {
|
for _, h := range floatHistograms {
|
||||||
|
|
|
@ -546,7 +546,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
|
||||||
}
|
}
|
||||||
w.writer.AppendExemplars(exemplars)
|
w.writer.AppendExemplars(exemplars)
|
||||||
|
|
||||||
case record.HistogramSamples:
|
case record.HistogramSamples, record.CustomBucketHistogramSamples:
|
||||||
// Skip if experimental "histograms over remote write" is not enabled.
|
// Skip if experimental "histograms over remote write" is not enabled.
|
||||||
if !w.sendHistograms {
|
if !w.sendHistograms {
|
||||||
break
|
break
|
||||||
|
@ -574,7 +574,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
|
||||||
histogramsToSend = histogramsToSend[:0]
|
histogramsToSend = histogramsToSend[:0]
|
||||||
}
|
}
|
||||||
|
|
||||||
case record.FloatHistogramSamples:
|
case record.FloatHistogramSamples, record.CustomBucketFloatHistogramSamples:
|
||||||
// Skip if experimental "histograms over remote write" is not enabled.
|
// Skip if experimental "histograms over remote write" is not enabled.
|
||||||
if !w.sendHistograms {
|
if !w.sendHistograms {
|
||||||
break
|
break
|
||||||
|
|
|
@ -209,21 +209,43 @@ func TestTailSamples(t *testing.T) {
|
||||||
NegativeBuckets: []int64{int64(-i) - 1},
|
NegativeBuckets: []int64{int64(-i) - 1},
|
||||||
}
|
}
|
||||||
|
|
||||||
histogram, customBucketHistogram := enc.HistogramSamples([]record.RefHistogramSample{{
|
histograms, _ := enc.HistogramSamples([]record.RefHistogramSample{{
|
||||||
Ref: chunks.HeadSeriesRef(inner),
|
Ref: chunks.HeadSeriesRef(inner),
|
||||||
T: now.UnixNano() + 1,
|
T: now.UnixNano() + 1,
|
||||||
H: hist,
|
H: hist,
|
||||||
}}, nil)
|
}}, nil)
|
||||||
require.NoError(t, w.Log(histogram))
|
require.NoError(t, w.Log(histograms))
|
||||||
require.NoError(t, w.Log(customBucketHistogram))
|
|
||||||
|
|
||||||
floatHistogram, floatCustomBucketHistogram := enc.FloatHistogramSamples([]record.RefFloatHistogramSample{{
|
customBucketHist := &histogram.Histogram{
|
||||||
|
Schema: -53,
|
||||||
|
ZeroThreshold: 1e-128,
|
||||||
|
ZeroCount: 0,
|
||||||
|
Count: 2,
|
||||||
|
Sum: 0,
|
||||||
|
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}},
|
||||||
|
CustomValues: []float64{float64(i) + 2},
|
||||||
|
}
|
||||||
|
|
||||||
|
customBucketHistograms := enc.CustomBucketHistogramSamples([]record.RefHistogramSample{{
|
||||||
|
Ref: chunks.HeadSeriesRef(inner),
|
||||||
|
T: now.UnixNano() + 1,
|
||||||
|
H: customBucketHist,
|
||||||
|
}}, nil)
|
||||||
|
require.NoError(t, w.Log(customBucketHistograms))
|
||||||
|
|
||||||
|
floatHistograms, _ := enc.FloatHistogramSamples([]record.RefFloatHistogramSample{{
|
||||||
Ref: chunks.HeadSeriesRef(inner),
|
Ref: chunks.HeadSeriesRef(inner),
|
||||||
T: now.UnixNano() + 1,
|
T: now.UnixNano() + 1,
|
||||||
FH: hist.ToFloat(nil),
|
FH: hist.ToFloat(nil),
|
||||||
}}, nil)
|
}}, nil)
|
||||||
require.NoError(t, w.Log(floatHistogram))
|
require.NoError(t, w.Log(floatHistograms))
|
||||||
require.NoError(t, w.Log(floatCustomBucketHistogram))
|
|
||||||
|
customBucketFloatHistograms := enc.CustomBucketFloatHistogramSamples([]record.RefFloatHistogramSample{{
|
||||||
|
Ref: chunks.HeadSeriesRef(inner),
|
||||||
|
T: now.UnixNano() + 1,
|
||||||
|
FH: customBucketHist.ToFloat(nil),
|
||||||
|
}}, nil)
|
||||||
|
require.NoError(t, w.Log(customBucketFloatHistograms))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -250,7 +272,7 @@ func TestTailSamples(t *testing.T) {
|
||||||
expectedSeries := seriesCount
|
expectedSeries := seriesCount
|
||||||
expectedSamples := seriesCount * samplesCount
|
expectedSamples := seriesCount * samplesCount
|
||||||
expectedExemplars := seriesCount * exemplarsCount
|
expectedExemplars := seriesCount * exemplarsCount
|
||||||
expectedHistograms := seriesCount * histogramsCount
|
expectedHistograms := seriesCount * histogramsCount * 2
|
||||||
retry(t, defaultRetryInterval, defaultRetries, func() bool {
|
retry(t, defaultRetryInterval, defaultRetries, func() bool {
|
||||||
return wt.checkNumSeries() >= expectedSeries
|
return wt.checkNumSeries() >= expectedSeries
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in a new issue