histograms: Add remote-write support for Float Histograms (#11817)

* adapt code.go and write_handler.go to support float histograms
* adapt watcher.go to support float histograms
* wip adapt queue_manager.go to support float histograms
* address comments for metrics in queue_manager.go
* set test cases for queue manager
* use same counts for histograms and float histograms
* refactor createHistograms tests
* fix float histograms ref in watcher_test.go
* address PR comments

Signed-off-by: Marc Tuduri <marctc@protonmail.com>
This commit is contained in:
Marc Tudurí 2023-01-13 12:09:20 +01:00 committed by GitHub
parent 72f20d949a
commit 721f33dbb0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 272 additions and 79 deletions

View file

@ -525,7 +525,7 @@ func exemplarProtoToExemplar(ep prompb.Exemplar) exemplar.Exemplar {
// HistogramProtoToHistogram extracts a (normal integer) Histogram from the
// provided proto message. The caller has to make sure that the proto message
// represents an interger histogram and not a float histogram.
// represents an integer histogram and not a float histogram.
func HistogramProtoToHistogram(hp prompb.Histogram) *histogram.Histogram {
return &histogram.Histogram{
Schema: hp.Schema,
@ -540,6 +540,23 @@ func HistogramProtoToHistogram(hp prompb.Histogram) *histogram.Histogram {
}
}
// HistogramProtoToFloatHistogram extracts a (normal integer) Histogram from the
// provided proto message to a Float Histogram. The caller has to make sure that
// the proto message represents an float histogram and not a integer histogram.
func HistogramProtoToFloatHistogram(hp prompb.Histogram) *histogram.FloatHistogram {
return &histogram.FloatHistogram{
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountFloat(),
Count: hp.GetCountFloat(),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveCounts(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeCounts(),
}
}
func spansProtoToSpans(s []*prompb.BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s))
for i := 0; i < len(s); i++ {
@ -564,6 +581,21 @@ func HistogramToHistogramProto(timestamp int64, h *histogram.Histogram) prompb.H
}
}
func FloatHistogramToHistogramProto(timestamp int64, fh *histogram.FloatHistogram) prompb.Histogram {
return prompb.Histogram{
Count: &prompb.Histogram_CountFloat{CountFloat: fh.Count},
Sum: fh.Sum,
Schema: fh.Schema,
ZeroThreshold: fh.ZeroThreshold,
ZeroCount: &prompb.Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount},
NegativeSpans: spansToSpansProto(fh.NegativeSpans),
NegativeCounts: fh.NegativeBuckets,
PositiveSpans: spansToSpansProto(fh.PositiveSpans),
PositiveCounts: fh.PositiveBuckets,
Timestamp: timestamp,
}
}
func spansToSpansProto(s []histogram.Span) []*prompb.BucketSpan {
spans := make([]*prompb.BucketSpan, len(s))
for i := 0; i < len(s); i++ {

View file

@ -55,7 +55,7 @@ var writeRequestFixture = &prompb.WriteRequest{
},
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 0}},
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)},
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat())},
},
{
Labels: []prompb.Label{
@ -67,7 +67,7 @@ var writeRequestFixture = &prompb.WriteRequest{
},
Samples: []prompb.Sample{{Value: 2, Timestamp: 1}},
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 1}},
Histograms: []prompb.Histogram{HistogramToHistogramProto(1, &testHistogram)},
Histograms: []prompb.Histogram{HistogramToHistogramProto(2, &testHistogram), FloatHistogramToHistogramProto(3, testHistogram.ToFloat())},
},
},
}
@ -368,6 +368,7 @@ func TestNilHistogramProto(t *testing.T) {
// This function will panic if it impromperly handles nil
// values, causing the test to fail.
HistogramProtoToHistogram(prompb.Histogram{})
HistogramProtoToFloatHistogram(prompb.Histogram{})
}
func TestStreamResponse(t *testing.T) {

View file

@ -716,6 +716,53 @@ outer:
return true
}
func (t *QueueManager) AppendFloatHistograms(floatHistograms []record.RefFloatHistogramSample) bool {
if !t.sendNativeHistograms {
return true
}
outer:
for _, h := range floatHistograms {
t.seriesMtx.Lock()
lbls, ok := t.seriesLabels[h.Ref]
if !ok {
t.metrics.droppedHistogramsTotal.Inc()
t.dataDropped.incr(1)
if _, ok := t.droppedSeries[h.Ref]; !ok {
level.Info(t.logger).Log("msg", "Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref)
}
t.seriesMtx.Unlock()
continue
}
t.seriesMtx.Unlock()
backoff := model.Duration(5 * time.Millisecond)
for {
select {
case <-t.quit:
return false
default:
}
if t.shards.enqueue(h.Ref, timeSeries{
seriesLabels: lbls,
timestamp: h.T,
floatHistogram: h.FH,
sType: tFloatHistogram,
}) {
continue outer
}
t.metrics.enqueueRetriesTotal.Inc()
time.Sleep(time.Duration(backoff))
backoff = backoff * 2
if backoff > t.cfg.MaxBackoff {
backoff = t.cfg.MaxBackoff
}
}
}
return true
}
// Start the queue manager sending samples to the remote storage.
// Does not block.
func (t *QueueManager) Start() {
@ -1129,7 +1176,7 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data timeSeries) bool {
case tExemplar:
s.qm.metrics.pendingExemplars.Inc()
s.enqueuedExemplars.Inc()
case tHistogram:
case tHistogram, tFloatHistogram:
s.qm.metrics.pendingHistograms.Inc()
s.enqueuedHistograms.Inc()
}
@ -1154,6 +1201,7 @@ type timeSeries struct {
seriesLabels labels.Labels
value float64
histogram *histogram.Histogram
floatHistogram *histogram.FloatHistogram
timestamp int64
exemplarLabels labels.Labels
// The type of series: sample, exemplar, or histogram.
@ -1166,6 +1214,7 @@ const (
tSample seriesType = iota
tExemplar
tHistogram
tFloatHistogram
)
func newQueue(batchSize, capacity int) *queue {
@ -1353,7 +1402,8 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
if len(batch) > 0 {
nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars", nPendingExemplars, "shard", shardNum)
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
}
queue.ReturnForReuse(batch)
@ -1394,6 +1444,9 @@ func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.Tim
case tHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToHistogramProto(d.timestamp, d.histogram))
nPendingHistograms++
case tFloatHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram))
nPendingHistograms++
}
}
return nPendingSamples, nPendingExemplars, nPendingHistograms

View file

@ -65,11 +65,13 @@ func TestSampleDelivery(t *testing.T) {
samples bool
exemplars bool
histograms bool
floatHistograms bool
}{
{samples: true, exemplars: false, histograms: false, name: "samples only"},
{samples: true, exemplars: true, histograms: true, name: "samples, exemplars, and histograms"},
{samples: false, exemplars: true, histograms: false, name: "exemplars only"},
{samples: false, exemplars: false, histograms: true, name: "histograms only"},
{samples: true, exemplars: false, histograms: false, floatHistograms: false, name: "samples only"},
{samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "samples, exemplars, and histograms"},
{samples: false, exemplars: true, histograms: false, floatHistograms: false, name: "exemplars only"},
{samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only"},
{samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only"},
}
// Let's create an even number of send batches so we don't run into the
@ -105,6 +107,7 @@ func TestSampleDelivery(t *testing.T) {
samples []record.RefSample
exemplars []record.RefExemplar
histograms []record.RefHistogramSample
floatHistograms []record.RefFloatHistogramSample
)
// Generates same series in both cases.
@ -115,7 +118,10 @@ func TestSampleDelivery(t *testing.T) {
exemplars, series = createExemplars(n, n)
}
if tc.histograms {
histograms, series = createHistograms(n, n)
histograms, _, series = createHistograms(n, n, false)
}
if tc.floatHistograms {
_, floatHistograms, series = createHistograms(n, n, true)
}
// Apply new config.
@ -135,18 +141,22 @@ func TestSampleDelivery(t *testing.T) {
c.expectSamples(samples[:len(samples)/2], series)
c.expectExemplars(exemplars[:len(exemplars)/2], series)
c.expectHistograms(histograms[:len(histograms)/2], series)
c.expectFloatHistograms(floatHistograms[:len(floatHistograms)/2], series)
qm.Append(samples[:len(samples)/2])
qm.AppendExemplars(exemplars[:len(exemplars)/2])
qm.AppendHistograms(histograms[:len(histograms)/2])
qm.AppendFloatHistograms(floatHistograms[:len(floatHistograms)/2])
c.waitForExpectedData(t)
// Send second half of data.
c.expectSamples(samples[len(samples)/2:], series)
c.expectExemplars(exemplars[len(exemplars)/2:], series)
c.expectHistograms(histograms[len(histograms)/2:], series)
c.expectFloatHistograms(floatHistograms[len(floatHistograms)/2:], series)
qm.Append(samples[len(samples)/2:])
qm.AppendExemplars(exemplars[len(exemplars)/2:])
qm.AppendHistograms(histograms[len(histograms)/2:])
qm.AppendFloatHistograms(floatHistograms[len(floatHistograms)/2:])
c.waitForExpectedData(t)
})
}
@ -586,16 +596,14 @@ func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []recor
return exemplars, series
}
func createHistograms(numSamples, numSeries int) ([]record.RefHistogramSample, []record.RefSeries) {
func createHistograms(numSamples, numSeries int, floatHistogram bool) ([]record.RefHistogramSample, []record.RefFloatHistogramSample, []record.RefSeries) {
histograms := make([]record.RefHistogramSample, 0, numSamples)
floatHistograms := make([]record.RefFloatHistogramSample, 0, numSamples)
series := make([]record.RefSeries, 0, numSeries)
for i := 0; i < numSeries; i++ {
name := fmt.Sprintf("test_metric_%d", i)
for j := 0; j < numSamples; j++ {
h := record.RefHistogramSample{
Ref: chunks.HeadSeriesRef(i),
T: int64(j),
H: &histogram.Histogram{
hist := &histogram.Histogram{
Schema: 2,
ZeroThreshold: 1e-128,
ZeroCount: 0,
@ -605,16 +613,33 @@ func createHistograms(numSamples, numSeries int) ([]record.RefHistogramSample, [
PositiveBuckets: []int64{int64(i) + 1},
NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}},
NegativeBuckets: []int64{int64(-i) - 1},
},
}
if floatHistogram {
fh := record.RefFloatHistogramSample{
Ref: chunks.HeadSeriesRef(i),
T: int64(j),
FH: hist.ToFloat(),
}
floatHistograms = append(floatHistograms, fh)
} else {
h := record.RefHistogramSample{
Ref: chunks.HeadSeriesRef(i),
T: int64(j),
H: hist,
}
histograms = append(histograms, h)
}
}
series = append(series, record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: labels.FromStrings("__name__", name),
})
}
return histograms, series
if floatHistogram {
return nil, floatHistograms, series
}
return histograms, nil, series
}
func getSeriesNameFromRef(r record.RefSeries) string {
@ -627,7 +652,9 @@ type TestWriteClient struct {
receivedExemplars map[string][]prompb.Exemplar
expectedExemplars map[string][]prompb.Exemplar
receivedHistograms map[string][]prompb.Histogram
receivedFloatHistograms map[string][]prompb.Histogram
expectedHistograms map[string][]prompb.Histogram
expectedFloatHistograms map[string][]prompb.Histogram
receivedMetadata map[string][]prompb.MetricMetadata
writesReceived int
withWaitGroup bool
@ -704,6 +731,23 @@ func (c *TestWriteClient) expectHistograms(hh []record.RefHistogramSample, serie
c.wg.Add(len(hh))
}
func (c *TestWriteClient) expectFloatHistograms(fhs []record.RefFloatHistogramSample, series []record.RefSeries) {
if !c.withWaitGroup {
return
}
c.mtx.Lock()
defer c.mtx.Unlock()
c.expectedFloatHistograms = map[string][]prompb.Histogram{}
c.receivedFloatHistograms = map[string][]prompb.Histogram{}
for _, fh := range fhs {
seriesName := getSeriesNameFromRef(series[fh.Ref])
c.expectedFloatHistograms[seriesName] = append(c.expectedFloatHistograms[seriesName], FloatHistogramToHistogramProto(fh.T, fh.FH))
}
c.wg.Add(len(fhs))
}
func (c *TestWriteClient) waitForExpectedData(tb testing.TB) {
if !c.withWaitGroup {
return
@ -720,6 +764,9 @@ func (c *TestWriteClient) waitForExpectedData(tb testing.TB) {
for ts, expectedHistogram := range c.expectedHistograms {
require.Equal(tb, expectedHistogram, c.receivedHistograms[ts], ts)
}
for ts, expectedFloatHistogram := range c.expectedFloatHistograms {
require.Equal(tb, expectedFloatHistogram, c.receivedFloatHistograms[ts], ts)
}
}
func (c *TestWriteClient) Store(_ context.Context, req []byte) error {
@ -755,8 +802,13 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte) error {
for _, histogram := range ts.Histograms {
count++
if histogram.GetCountFloat() > 0 || histogram.GetZeroCountFloat() > 0 {
c.receivedFloatHistograms[seriesName] = append(c.receivedFloatHistograms[seriesName], histogram)
} else {
c.receivedHistograms[seriesName] = append(c.receivedHistograms[seriesName], histogram)
}
}
}
if c.withWaitGroup {
c.wg.Add(-count)

View file

@ -124,16 +124,20 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
}
}
// TODO(codesome): support float histograms.
for _, hp := range ts.Histograms {
if hp.GetCountFloat() > 0 || hp.GetZeroCountFloat() > 0 { // It is a float histogram.
fhs := HistogramProtoToFloatHistogram(hp)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, fhs)
} else {
hs := HistogramProtoToHistogram(hp)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, hs, nil)
}
if err != nil {
unwrappedErr := errors.Unwrap(err)
if unwrappedErr == nil {
unwrappedErr = err
}
// Althogh AppendHistogram does not currently return ErrDuplicateSampleForTimestamp there is
// Although AppendHistogram does not currently return ErrDuplicateSampleForTimestamp there is
// a note indicating its inclusion in the future.
if errors.Is(unwrappedErr, storage.ErrOutOfOrderSample) || errors.Is(unwrappedErr, storage.ErrOutOfBounds) || errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp) {
level.Error(h.logger).Log("msg", "Out of order histogram from remote write", "err", err.Error(), "series", labels.String(), "timestamp", hp.Timestamp)

View file

@ -66,8 +66,14 @@ func TestRemoteWriteHandler(t *testing.T) {
}
for _, hp := range ts.Histograms {
if hp.GetCountFloat() > 0 || hp.GetZeroCountFloat() > 0 { // It is a float histogram.
fh := HistogramProtoToFloatHistogram(hp)
require.Equal(t, mockHistogram{labels, hp.Timestamp, nil, fh}, appendable.histograms[k])
} else {
h := HistogramProtoToHistogram(hp)
require.Equal(t, mockHistogram{labels, hp.Timestamp, h, nil}, appendable.histograms[k])
}
k++
}
}
@ -124,7 +130,7 @@ func TestOutOfOrderExemplar(t *testing.T) {
func TestOutOfOrderHistogram(t *testing.T) {
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)},
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat())},
}}, nil, nil, nil)
require.NoError(t, err)

View file

@ -50,6 +50,7 @@ type WriteTo interface {
Append([]record.RefSample) bool
AppendExemplars([]record.RefExemplar) bool
AppendHistograms([]record.RefHistogramSample) bool
AppendFloatHistograms([]record.RefFloatHistogramSample) bool
StoreSeries([]record.RefSeries, int)
// Next two methods are intended for garbage-collection: first we call
@ -483,6 +484,8 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
exemplars []record.RefExemplar
histograms []record.RefHistogramSample
histogramsToSend []record.RefHistogramSample
floatHistograms []record.RefFloatHistogramSample
floatHistogramsToSend []record.RefFloatHistogramSample
)
for r.Next() && !isClosed(w.quit) {
rec := r.Record()
@ -567,7 +570,33 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
w.writer.AppendHistograms(histogramsToSend)
histogramsToSend = histogramsToSend[:0]
}
case record.FloatHistogramSamples:
// Skip if experimental "histograms over remote write" is not enabled.
if !w.sendHistograms {
break
}
if !tail {
break
}
floatHistograms, err := dec.FloatHistogramSamples(rec, floatHistograms[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
}
for _, fh := range floatHistograms {
if fh.T > w.startTimestamp {
if !w.sendSamples {
w.sendSamples = true
duration := time.Since(w.startTime)
level.Info(w.logger).Log("msg", "Done replaying WAL", "duration", duration)
}
floatHistogramsToSend = append(floatHistogramsToSend, fh)
}
}
if len(floatHistogramsToSend) > 0 {
w.writer.AppendFloatHistograms(floatHistogramsToSend)
floatHistogramsToSend = floatHistogramsToSend[:0]
}
case record.Tombstones:
default:

View file

@ -55,6 +55,7 @@ type writeToMock struct {
samplesAppended int
exemplarsAppended int
histogramsAppended int
floatHistogramsAppended int
seriesLock sync.Mutex
seriesSegmentIndexes map[chunks.HeadSeriesRef]int
}
@ -74,6 +75,11 @@ func (wtm *writeToMock) AppendHistograms(h []record.RefHistogramSample) bool {
return true
}
func (wtm *writeToMock) AppendFloatHistograms(fh []record.RefFloatHistogramSample) bool {
wtm.floatHistogramsAppended += len(fh)
return true
}
func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) {
wtm.UpdateSeriesSegment(series, index)
}
@ -171,10 +177,7 @@ func TestTailSamples(t *testing.T) {
for j := 0; j < histogramsCount; j++ {
inner := rand.Intn(ref + 1)
histogram := enc.HistogramSamples([]record.RefHistogramSample{{
Ref: chunks.HeadSeriesRef(inner),
T: now.UnixNano() + 1,
H: &histogram.Histogram{
hist := &histogram.Histogram{
Schema: 2,
ZeroThreshold: 1e-128,
ZeroCount: 0,
@ -184,9 +187,21 @@ func TestTailSamples(t *testing.T) {
PositiveBuckets: []int64{int64(i) + 1},
NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}},
NegativeBuckets: []int64{int64(-i) - 1},
},
}
histogram := enc.HistogramSamples([]record.RefHistogramSample{{
Ref: chunks.HeadSeriesRef(inner),
T: now.UnixNano() + 1,
H: hist,
}}, nil)
require.NoError(t, w.Log(histogram))
floatHistogram := enc.FloatHistogramSamples([]record.RefFloatHistogramSample{{
Ref: chunks.HeadSeriesRef(inner),
T: now.UnixNano() + 1,
FH: hist.ToFloat(),
}}, nil)
require.NoError(t, w.Log(floatHistogram))
}
}
@ -221,6 +236,7 @@ func TestTailSamples(t *testing.T) {
require.Equal(t, expectedSamples, wt.samplesAppended, "did not receive the expected number of samples")
require.Equal(t, expectedExemplars, wt.exemplarsAppended, "did not receive the expected number of exemplars")
require.Equal(t, expectedHistograms, wt.histogramsAppended, "did not receive the expected number of histograms")
require.Equal(t, expectedHistograms, wt.floatHistogramsAppended, "did not receive the expected number of float histograms")
})
}
}