Merge pull request #11011 from prometheus/beorn7/protobuf

prompb: Modify layout of histograms
This commit is contained in:
Björn Rabenstein 2022-07-14 18:20:54 +02:00 committed by GitHub
commit 5937b4f5d4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 587 additions and 585 deletions

View file

@ -52,7 +52,7 @@ func main() {
for _, hp := range ts.Histograms { for _, hp := range ts.Histograms {
h := remote.HistogramProtoToHistogram(hp) h := remote.HistogramProtoToHistogram(hp)
fmt.Printf("\tHistogram: %s\n", (&h).String()) fmt.Printf("\tHistogram: %s\n", h.String())
} }
} }
}) })

File diff suppressed because it is too large Load diff

View file

@ -86,40 +86,46 @@ message Histogram {
uint64 zero_count_int = 6; uint64 zero_count_int = 6;
double zero_count_float = 7; double zero_count_float = 7;
} }
Buckets negative_buckets = 8;
Buckets positive_buckets = 9; // Negative Buckets.
ResetHint reset_hint = 10; repeated BucketSpan negative_spans = 8;
// Use either "negative_deltas" or "negative_counts", the former for
// regular histograms with integer counts, the latter for float
// histograms.
repeated sint64 negative_deltas = 9; // Count delta of each bucket compared to previous one (or to zero for 1st bucket).
repeated double negative_counts = 10; // Absolute count of each bucket.
// Positive Buckets.
repeated BucketSpan positive_spans = 11;
// Use either "positive_deltas" or "positive_counts", the former for
// regular histograms with integer counts, the latter for float
// histograms.
repeated sint64 positive_deltas = 12; // Count delta of each bucket compared to previous one (or to zero for 1st bucket).
repeated double positive_counts = 13; // Absolute count of each bucket.
ResetHint reset_hint = 14;
// timestamp is in ms format, see model/timestamp/timestamp.go for // timestamp is in ms format, see model/timestamp/timestamp.go for
// conversion from time.Time to Prometheus timestamp. // conversion from time.Time to Prometheus timestamp.
int64 timestamp = 11; int64 timestamp = 15;
} }
// Sparse buckets. // A BucketSpan defines a number of consecutive buckets with their
message Buckets { // offset. Logically, it would be more straightforward to include the
// A Span is a given number of consecutive buckets at a given // bucket counts in the Span. However, the protobuf representation is
// offset. Logically, it would be more straightforward to include // more compact in the way the data is structured here (with all the
// the bucket counts in the Span. However, the protobuf // buckets in a single array separate from the Spans).
// representation is more compact in the way the data is structured message BucketSpan {
// here (with all the buckets in a single array separate from the sint32 offset = 1; // Gap to previous span, or starting point for 1st span (which can be negative).
// Spans). uint32 length = 2; // Length of consecutive buckets.
message Span {
sint32 offset = 1; // Gap to previous span, or starting point for 1st span (which can be negative).
uint32 length = 2; // Length of consecutive buckets.
}
repeated Span span = 1;
// Only one of "delta" or "count" may be used, the former for regular
// histograms with integer counts, the latter for float histograms.
repeated sint64 delta = 2; // Count delta of each bucket compared to previous one (or to zero for 1st bucket).
repeated double count = 3; // Absolute count of each bucket.
} }
// TimeSeries represents samples and labels for a single time series. // TimeSeries represents samples and labels for a single time series.
message TimeSeries { message TimeSeries {
// For a timeseries to be valid, and for the samples and exemplars // For a timeseries to be valid, and for the samples and exemplars
// to be ingested by the remote system properly, the labels field is required. // to be ingested by the remote system properly, the labels field is required.
repeated Label labels = 1 [(gogoproto.nullable) = false]; repeated Label labels = 1 [(gogoproto.nullable) = false];
repeated Sample samples = 2 [(gogoproto.nullable) = false]; repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false]; repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false];
repeated Histogram histograms = 4 [(gogoproto.nullable) = false]; repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
} }

View file

@ -502,28 +502,24 @@ func exemplarProtoToExemplar(ep prompb.Exemplar) exemplar.Exemplar {
} }
} }
func HistogramProtoToHistogram(hp prompb.Histogram) histogram.Histogram { // HistogramProtoToHistogram extracts a (normal integer) Histogram from the
h := histogram.Histogram{ // provided proto message. The caller has to make sure that the proto message
// represents an interger histogram and not a float histogram.
func HistogramProtoToHistogram(hp prompb.Histogram) *histogram.Histogram {
return &histogram.Histogram{
Schema: hp.Schema, Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold, ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountInt(), ZeroCount: hp.GetZeroCountInt(),
Count: hp.GetCountInt(), Count: hp.GetCountInt(),
Sum: hp.Sum, Sum: hp.Sum,
PositiveBuckets: hp.PositiveBuckets.GetDelta(), PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
NegativeBuckets: hp.NegativeBuckets.GetDelta(), PositiveBuckets: hp.GetPositiveDeltas(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeDeltas(),
} }
if hp.PositiveBuckets != nil {
h.PositiveSpans = spansProtoToSpans(hp.PositiveBuckets.Span)
}
if hp.NegativeBuckets != nil {
h.NegativeSpans = spansProtoToSpans(hp.NegativeBuckets.Span)
}
return h
} }
func spansProtoToSpans(s []*prompb.Buckets_Span) []histogram.Span { func spansProtoToSpans(s []*prompb.BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s)) spans := make([]histogram.Span, len(s))
for i := 0; i < len(s); i++ { for i := 0; i < len(s); i++ {
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length} spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
@ -534,27 +530,23 @@ func spansProtoToSpans(s []*prompb.Buckets_Span) []histogram.Span {
func histogramToHistogramProto(timestamp int64, h *histogram.Histogram) prompb.Histogram { func histogramToHistogramProto(timestamp int64, h *histogram.Histogram) prompb.Histogram {
return prompb.Histogram{ return prompb.Histogram{
Count: &prompb.Histogram_CountInt{CountInt: h.Count}, Count: &prompb.Histogram_CountInt{CountInt: h.Count},
Sum: h.Sum, Sum: h.Sum,
Schema: h.Schema, Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold, ZeroThreshold: h.ZeroThreshold,
ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount}, ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount},
NegativeBuckets: &prompb.Buckets{ NegativeSpans: spansToSpansProto(h.NegativeSpans),
Span: spansToSpansProto(h.NegativeSpans), NegativeDeltas: h.NegativeBuckets,
Delta: h.NegativeBuckets, PositiveSpans: spansToSpansProto(h.PositiveSpans),
}, PositiveDeltas: h.PositiveBuckets,
PositiveBuckets: &prompb.Buckets{ Timestamp: timestamp,
Span: spansToSpansProto(h.PositiveSpans),
Delta: h.PositiveBuckets,
},
Timestamp: timestamp,
} }
} }
func spansToSpansProto(s []histogram.Span) []*prompb.Buckets_Span { func spansToSpansProto(s []histogram.Span) []*prompb.BucketSpan {
spans := make([]*prompb.Buckets_Span, len(s)) spans := make([]*prompb.BucketSpan, len(s))
for i := 0; i < len(s); i++ { for i := 0; i < len(s); i++ {
spans[i] = &prompb.Buckets_Span{Offset: s[i].Offset, Length: s[i].Length} spans[i] = &prompb.BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
} }
return spans return spans

View file

@ -120,7 +120,7 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
for _, hp := range ts.Histograms { for _, hp := range ts.Histograms {
hs := HistogramProtoToHistogram(hp) hs := HistogramProtoToHistogram(hp)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, &hs) _, err = app.AppendHistogram(0, labels, hp.Timestamp, hs)
if err != nil { if err != nil {
unwrappedErr := errors.Unwrap(err) unwrappedErr := errors.Unwrap(err)
// Althogh AppendHistogram does not currently return ErrDuplicateSampleForTimestamp there is // Althogh AppendHistogram does not currently return ErrDuplicateSampleForTimestamp there is

View file

@ -66,7 +66,7 @@ func TestRemoteWriteHandler(t *testing.T) {
for _, hp := range ts.Histograms { for _, hp := range ts.Histograms {
h := HistogramProtoToHistogram(hp) h := HistogramProtoToHistogram(hp)
require.Equal(t, mockHistogram{labels, hp.Timestamp, &h}, appendable.histograms[k]) require.Equal(t, mockHistogram{labels, hp.Timestamp, h}, appendable.histograms[k])
k++ k++
} }
} }