define separate proto types for remote write 2.0

Signed-off-by: Nicolás Pazos <npazosmendez@gmail.com>
This commit is contained in:
Nicolás Pazos 2023-12-20 18:31:21 -03:00
parent 8df1d63885
commit 48f9285b11
9 changed files with 2539 additions and 169 deletions

View file

@ -26,6 +26,10 @@ func (h Histogram) IsFloatHistogram() bool {
_, ok := h.GetCount().(*Histogram_CountFloat)
return ok
}
func (h MinHistogram) IsFloatHistogram() bool {
_, ok := h.GetCount().(*MinHistogram_CountFloat)
return ok
}
func (r *ChunkedReadResponse) PooledMarshal(p *sync.Pool) ([]byte, error) {
size := r.Size()

View file

@ -45,8 +45,8 @@ func TestOptimizedMarshal(t *testing.T) {
14, 15,
},
Samples: []Sample{{Value: 1, Timestamp: 0}},
Exemplars: []Exemplar{{Labels: []Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 0}},
Samples: []MinSample{{Value: 1, Timestamp: 0}},
Exemplars: []MinExemplar{{LabelsRefs: []uint32{0, 1}, Value: 1, Timestamp: 0}},
Histograms: nil,
},
{
@ -60,8 +60,8 @@ func TestOptimizedMarshal(t *testing.T) {
12, 13,
14, 15,
},
Samples: []Sample{{Value: 2, Timestamp: 1}},
Exemplars: []Exemplar{{Labels: []Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 1}},
Samples: []MinSample{{Value: 2, Timestamp: 1}},
Exemplars: []MinExemplar{{LabelsRefs: []uint32{0, 1}, Value: 2, Timestamp: 1}},
Histograms: nil,
},
},

File diff suppressed because it is too large Load diff

View file

@ -38,6 +38,23 @@ message MetricMetadata {
string unit = 5;
}
message MinMetricMetadata {
enum MetricType {
UNKNOWN = 0;
COUNTER = 1;
GAUGE = 2;
HISTOGRAM = 3;
GAUGEHISTOGRAM = 4;
SUMMARY = 5;
INFO = 6;
STATESET = 7;
}
MetricType type = 1;
uint32 help_ref = 3;
uint32 unit_ref = 4;
}
message Sample {
double value = 1;
// timestamp is in ms format, see model/timestamp/timestamp.go for
@ -45,6 +62,14 @@ message Sample {
int64 timestamp = 2;
}
message MinSample {
double value = 1;
// timestamp is in ms format, see model/timestamp/timestamp.go for
// conversion from time.Time to Prometheus timestamp.
int64 timestamp = 2;
}
message Exemplar {
// Optional, can be empty.
repeated Label labels = 1 [(gogoproto.nullable) = false];
@ -54,6 +79,13 @@ message Exemplar {
int64 timestamp = 3;
}
message MinExemplar {
// TODO: same as TimeSeries.labels_refs
repeated uint32 labels_refs = 1;
double value = 2;
// timestamp is in ms.
int64 timestamp = 3;
}
// A native histogram, also known as a sparse histogram.
// Original design doc:
@ -120,6 +152,73 @@ message BucketSpan {
uint32 length = 2; // Length of consecutive buckets.
}
// A native histogram, also known as a sparse histogram.
// Original design doc:
// https://docs.google.com/document/d/1cLNv3aufPZb3fNfaJgdaRBZsInZKKIHo9E6HinJVbpM/edit
// The appendix of this design doc also explains the concept of float
// histograms. This Histogram message can represent both, the usual
// integer histogram as well as a float histogram.
message MinHistogram {
enum ResetHint {
UNKNOWN = 0; // Need to test for a counter reset explicitly.
YES = 1; // This is the 1st histogram after a counter reset.
NO = 2; // There was no counter reset between this and the previous Histogram.
GAUGE = 3; // This is a gauge histogram where counter resets don't happen.
}
oneof count { // Count of observations in the histogram.
uint64 count_int = 1;
double count_float = 2;
}
double sum = 3; // Sum of observations in the histogram.
// The schema defines the bucket schema. Currently, valid numbers
// are -4 <= n <= 8. They are all for base-2 bucket schemas, where 1
// is a bucket boundary in each case, and then each power of two is
// divided into 2^n logarithmic buckets. Or in other words, each
// bucket boundary is the previous boundary times 2^(2^-n). In the
// future, more bucket schemas may be added using numbers < -4 or >
// 8.
sint32 schema = 4;
double zero_threshold = 5; // Breadth of the zero bucket.
oneof zero_count { // Count in zero bucket.
uint64 zero_count_int = 6;
double zero_count_float = 7;
}
// Negative Buckets.
repeated BucketSpan negative_spans = 8 [(gogoproto.nullable) = false];
// Use either "negative_deltas" or "negative_counts", the former for
// regular histograms with integer counts, the latter for float
// histograms.
repeated sint64 negative_deltas = 9; // Count delta of each bucket compared to previous one (or to zero for 1st bucket).
repeated double negative_counts = 10; // Absolute count of each bucket.
// Positive Buckets.
repeated BucketSpan positive_spans = 11 [(gogoproto.nullable) = false];
// Use either "positive_deltas" or "positive_counts", the former for
// regular histograms with integer counts, the latter for float
// histograms.
repeated sint64 positive_deltas = 12; // Count delta of each bucket compared to previous one (or to zero for 1st bucket).
repeated double positive_counts = 13; // Absolute count of each bucket.
ResetHint reset_hint = 14;
// timestamp is in ms format, see model/timestamp/timestamp.go for
// conversion from time.Time to Prometheus timestamp.
int64 timestamp = 15;
}
// A BucketSpan defines a number of consecutive buckets with their
// offset. Logically, it would be more straightforward to include the
// bucket counts in the Span. However, the protobuf representation is
// more compact in the way the data is structured here (with all the
// buckets in a single array separate from the Spans).
message MinBucketSpan {
sint32 offset = 1; // Gap to previous span, or starting point for 1st span (which can be negative).
uint32 length = 2; // Length of consecutive buckets.
}
// TimeSeries represents samples and labels for a single time series.
message TimeSeries {
// For a timeseries to be valid, and for the samples and exemplars
@ -134,12 +233,12 @@ message TimeSeries {
message MinimizedTimeSeriesStr {
// Sorted list of label name-value pair references, encoded as indices to a strings array.
// This list's len is always multiple of 2.
repeated uint32 label_symbols = 1 [(gogoproto.nullable) = false];
repeated uint32 label_symbols = 1;
// Sorted by time, oldest sample first.
repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false];
repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
repeated MinSample samples = 2 [(gogoproto.nullable) = false];
repeated MinExemplar exemplars = 3 [(gogoproto.nullable) = false];
repeated MinHistogram histograms = 4 [(gogoproto.nullable) = false];
// TODO: add metadata
}

View file

@ -628,6 +628,17 @@ func exemplarProtoToExemplar(ep prompb.Exemplar) exemplar.Exemplar {
}
}
func minExemplarProtoToExemplar(ep prompb.MinExemplar, symbols []string) exemplar.Exemplar {
timestamp := ep.Timestamp
return exemplar.Exemplar{
Labels: Uint32StrRefToLabels(symbols, ep.LabelsRefs),
Value: ep.Value,
Ts: timestamp,
HasTs: timestamp != 0,
}
}
// HistogramProtoToHistogram extracts a (normal integer) Histogram from the
// provided proto message. The caller has to make sure that the proto message
// represents an integer histogram and not a float histogram, or it panics.
@ -692,6 +703,45 @@ func HistogramProtoToFloatHistogram(hp prompb.Histogram) *histogram.FloatHistogr
}
}
func FloatMinHistogramProtoToFloatHistogram(hp prompb.MinHistogram) *histogram.FloatHistogram {
if !hp.IsFloatHistogram() {
panic("FloatHistogramProtoToFloatHistogram called with an integer histogram")
}
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountFloat(),
Count: hp.GetCountFloat(),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveCounts(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeCounts(),
}
}
// HistogramProtoToHistogram extracts a (normal integer) Histogram from the
// provided proto message. The caller has to make sure that the proto message
// represents an integer histogram and not a float histogram, or it panics.
func MinHistogramProtoToHistogram(hp prompb.MinHistogram) *histogram.Histogram {
if hp.IsFloatHistogram() {
panic("HistogramProtoToHistogram called with a float histogram")
}
return &histogram.Histogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountInt(),
Count: hp.GetCountInt(),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveDeltas(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeDeltas(),
}
}
func spansProtoToSpans(s []prompb.BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s))
for i := 0; i < len(s); i++ {
@ -727,6 +777,22 @@ func HistogramToHistogramProto(timestamp int64, h *histogram.Histogram) prompb.H
}
}
func HistogramToMinHistogramProto(timestamp int64, h *histogram.Histogram) prompb.MinHistogram {
return prompb.MinHistogram{
Count: &prompb.MinHistogram_CountInt{CountInt: h.Count},
Sum: h.Sum,
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: &prompb.MinHistogram_ZeroCountInt{ZeroCountInt: h.ZeroCount},
NegativeSpans: spansToSpansProto(h.NegativeSpans),
NegativeDeltas: h.NegativeBuckets,
PositiveSpans: spansToSpansProto(h.PositiveSpans),
PositiveDeltas: h.PositiveBuckets,
ResetHint: prompb.MinHistogram_ResetHint(h.CounterResetHint),
Timestamp: timestamp,
}
}
func FloatHistogramToHistogramProto(timestamp int64, fh *histogram.FloatHistogram) prompb.Histogram {
return prompb.Histogram{
Count: &prompb.Histogram_CountFloat{CountFloat: fh.Count},
@ -743,6 +809,22 @@ func FloatHistogramToHistogramProto(timestamp int64, fh *histogram.FloatHistogra
}
}
func FloatHistogramToMinHistogramProto(timestamp int64, fh *histogram.FloatHistogram) prompb.MinHistogram {
return prompb.MinHistogram{
Count: &prompb.MinHistogram_CountFloat{CountFloat: fh.Count},
Sum: fh.Sum,
Schema: fh.Schema,
ZeroThreshold: fh.ZeroThreshold,
ZeroCount: &prompb.MinHistogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount},
NegativeSpans: spansToSpansProto(fh.NegativeSpans),
NegativeCounts: fh.NegativeBuckets,
PositiveSpans: spansToSpansProto(fh.PositiveSpans),
PositiveCounts: fh.PositiveBuckets,
ResetHint: prompb.MinHistogram_ResetHint(fh.CounterResetHint),
Timestamp: timestamp,
}
}
func spansToSpansProto(s []histogram.Span) []prompb.BucketSpan {
spans := make([]prompb.BucketSpan, len(s))
for i := 0; i < len(s); i++ {
@ -935,12 +1017,43 @@ func MinimizedWriteRequestToWriteRequest(redReq *prompb.MinimizedWriteRequestStr
for j, e := range rts.Exemplars {
exemplars[j].Value = e.Value
exemplars[j].Timestamp = e.Timestamp
exemplars[j].Labels = e.Labels
Uint32StrRefToLabels(redReq.Symbols, e.LabelsRefs).Range(func(l labels.Label) {
exemplars[j].Labels = append(exemplars[j].Labels, prompb.Label{
Name: l.Name,
Value: l.Value,
})
})
}
req.Timeseries[i].Exemplars = exemplars
req.Timeseries[i].Samples = make([]prompb.Sample, len(rts.Samples))
for j, s := range rts.Samples {
req.Timeseries[i].Samples[j].Timestamp = s.Timestamp
req.Timeseries[i].Samples[j].Value = s.Value
}
req.Timeseries[i].Samples = rts.Samples
req.Timeseries[i].Exemplars = exemplars
req.Timeseries[i].Histograms = rts.Histograms
req.Timeseries[i].Histograms = make([]prompb.Histogram, len(rts.Histograms))
for j, h := range rts.Histograms {
// TODO: double check
if h.IsFloatHistogram() {
req.Timeseries[i].Histograms[j].Count = &prompb.Histogram_CountFloat{CountFloat: h.GetCountFloat()}
req.Timeseries[i].Histograms[j].ZeroCount = &prompb.Histogram_ZeroCountFloat{ZeroCountFloat: h.GetZeroCountFloat()}
} else {
req.Timeseries[i].Histograms[j].Count = &prompb.Histogram_CountInt{CountInt: h.GetCountInt()}
req.Timeseries[i].Histograms[j].ZeroCount = &prompb.Histogram_ZeroCountInt{ZeroCountInt: h.GetZeroCountInt()}
}
req.Timeseries[i].Histograms[j].Sum = h.Sum
req.Timeseries[i].Histograms[j].Schema = h.Schema
req.Timeseries[i].Histograms[j].ZeroThreshold = h.ZeroThreshold
req.Timeseries[i].Histograms[j].NegativeSpans = h.NegativeSpans
req.Timeseries[i].Histograms[j].NegativeDeltas = h.NegativeDeltas
req.Timeseries[i].Histograms[j].NegativeCounts = h.NegativeCounts
req.Timeseries[i].Histograms[j].PositiveSpans = h.PositiveSpans
req.Timeseries[i].Histograms[j].PositiveDeltas = h.PositiveDeltas
req.Timeseries[i].Histograms[j].PositiveCounts = h.PositiveCounts
req.Timeseries[i].Histograms[j].ResetHint = prompb.Histogram_ResetHint(h.ResetHint)
req.Timeseries[i].Histograms[j].Timestamp = h.Timestamp
}
}
return req, nil

View file

@ -88,19 +88,26 @@ var writeRequestMinimizedFixture = func() *prompb.MinimizedWriteRequestStr {
ref := st.RefStr(s)
labels = append(labels, ref)
}
for _, s := range []string{
"f", "g", // 10, 11
"h", "i", // 12, 13
} {
st.RefStr(s)
}
return &prompb.MinimizedWriteRequestStr{
Timeseries: []prompb.MinimizedTimeSeriesStr{
{
LabelSymbols: labels,
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 0}},
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat(nil))},
Samples: []prompb.MinSample{{Value: 1, Timestamp: 0}},
Exemplars: []prompb.MinExemplar{{LabelsRefs: []uint32{10, 11}, Value: 1, Timestamp: 0}},
Histograms: []prompb.MinHistogram{HistogramToMinHistogramProto(0, &testHistogram), FloatHistogramToMinHistogramProto(1, testHistogram.ToFloat(nil))},
},
{
LabelSymbols: labels,
Samples: []prompb.Sample{{Value: 2, Timestamp: 1}},
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 1}},
Histograms: []prompb.Histogram{HistogramToHistogramProto(2, &testHistogram), FloatHistogramToHistogramProto(3, testHistogram.ToFloat(nil))},
Samples: []prompb.MinSample{{Value: 2, Timestamp: 1}},
Exemplars: []prompb.MinExemplar{{LabelsRefs: []uint32{12, 13}, Value: 2, Timestamp: 1}},
Histograms: []prompb.MinHistogram{HistogramToMinHistogramProto(2, &testHistogram), FloatHistogramToMinHistogramProto(3, testHistogram.ToFloat(nil))},
},
},
Symbols: st.LabelsStrings(),

View file

@ -1381,7 +1381,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
pendingMinStrData := make([]prompb.MinimizedTimeSeriesStr, max)
for i := range pendingMinStrData {
pendingMinStrData[i].Samples = []prompb.Sample{{}}
pendingMinStrData[i].Samples = []prompb.MinSample{{}}
}
timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
@ -1623,23 +1623,23 @@ func populateMinimizedTimeSeriesStr(symbolTable *rwSymbolTable, batch []timeSeri
pendingData[nPending].LabelSymbols = labelsToUint32SliceStr(d.seriesLabels, symbolTable, pendingData[nPending].LabelSymbols)
switch d.sType {
case tSample:
pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{
pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.MinSample{
Value: d.value,
Timestamp: d.timestamp,
})
nPendingSamples++
case tExemplar:
pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.Exemplar{
Labels: labelsToLabelsProto(d.exemplarLabels, nil),
pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.MinExemplar{
LabelsRefs: labelsToUint32SliceStr(d.exemplarLabels, symbolTable, nil), // TODO: optimize, reuse slice
Value: d.value,
Timestamp: d.timestamp,
})
nPendingExemplars++
case tHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToHistogramProto(d.timestamp, d.histogram))
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToMinHistogramProto(d.timestamp, d.histogram))
nPendingHistograms++
case tFloatHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram))
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToMinHistogramProto(d.timestamp, d.floatHistogram))
nPendingHistograms++
}
}

View file

@ -1505,8 +1505,8 @@ func BenchmarkBuildMinimizedWriteRequest(b *testing.B) {
buff := make([]byte, 0)
seriesBuff := make([]prompb.MinimizedTimeSeriesStr, len(tc.batch))
for i := range seriesBuff {
seriesBuff[i].Samples = []prompb.Sample{{}}
seriesBuff[i].Exemplars = []prompb.Exemplar{{}}
seriesBuff[i].Samples = []prompb.MinSample{{}}
seriesBuff[i].Exemplars = []prompb.MinExemplar{{}}
}
pBuf := []byte{}

View file

@ -189,8 +189,26 @@ func (h *writeHandler) appendSamples(app storage.Appender, ss []prompb.Sample, l
var ref storage.SeriesRef
var err error
for _, s := range ss {
ref, err = app.Append(ref, labels, s.Timestamp, s.
Value)
ref, err = app.Append(ref, labels, s.GetTimestamp(), s.GetValue())
if err != nil {
unwrappedErr := errors.Unwrap(err)
if unwrappedErr == nil {
unwrappedErr = err
}
if errors.Is(err, storage.ErrOutOfOrderSample) || errors.Is(unwrappedErr, storage.ErrOutOfBounds) || errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp) {
level.Error(h.logger).Log("msg", "Out of order sample from remote write", "err", err.Error(), "series", labels.String(), "timestamp", s.Timestamp)
}
return err
}
}
return nil
}
func (h *writeHandler) appendMinSamples(app storage.Appender, ss []prompb.MinSample, labels labels.Labels) error {
var ref storage.SeriesRef
var err error
for _, s := range ss {
ref, err = app.Append(ref, labels, s.GetTimestamp(), s.GetValue())
if err != nil {
unwrappedErr := errors.Unwrap(err)
if unwrappedErr == nil {
@ -231,6 +249,32 @@ func (h *writeHandler) appendHistograms(app storage.Appender, hh []prompb.Histog
return nil
}
func (h *writeHandler) appendMinHistograms(app storage.Appender, hh []prompb.MinHistogram, labels labels.Labels) error {
var err error
for _, hp := range hh {
if hp.IsFloatHistogram() {
fhs := FloatMinHistogramProtoToFloatHistogram(hp)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, fhs)
} else {
hs := MinHistogramProtoToHistogram(hp)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, hs, nil)
}
if err != nil {
unwrappedErr := errors.Unwrap(err)
if unwrappedErr == nil {
unwrappedErr = err
}
// Although AppendHistogram does not currently return ErrDuplicateSampleForTimestamp there is
// a note indicating its inclusion in the future.
if errors.Is(unwrappedErr, storage.ErrOutOfOrderSample) || errors.Is(unwrappedErr, storage.ErrOutOfBounds) || errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp) {
level.Error(h.logger).Log("msg", "Out of order histogram from remote write", "err", err.Error(), "series", labels.String(), "timestamp", hp.Timestamp)
}
return err
}
}
return nil
}
// NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and
// writes them to the provided appendable.
func NewOTLPWriteHandler(logger log.Logger, appendable storage.Appendable) http.Handler {
@ -305,17 +349,17 @@ func (h *writeHandler) writeMinStr(ctx context.Context, req *prompb.MinimizedWri
for _, ts := range req.Timeseries {
ls := Uint32StrRefToLabels(req.Symbols, ts.LabelSymbols)
err := h.appendSamples(app, ts.Samples, ls)
err := h.appendMinSamples(app, ts.Samples, ls)
if err != nil {
return err
}
for _, ep := range ts.Exemplars {
e := exemplarProtoToExemplar(ep)
e := minExemplarProtoToExemplar(ep, req.Symbols)
h.appendExemplar(app, e, ls, &outOfOrderExemplarErrs)
}
err = h.appendHistograms(app, ts.Histograms, ls)
err = h.appendMinHistograms(app, ts.Histograms, ls)
if err != nil {
return err
}