[PRW 2.0] (chain3) generalize remote write logic for DRY/maintainability.

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2024-06-24 09:14:02 +01:00
parent 786e3042af
commit 68bf54297f
8 changed files with 1008 additions and 1134 deletions

View file

@ -14,17 +14,132 @@
package prompb
import (
"math"
"sync"
"github.com/prometheus/prometheus/model/histogram"
)
func (m Sample) T() int64 { return m.Timestamp }
func (m Sample) V() float64 { return m.Value }
// IsFloatHistogram returns true if the histogram is float.
func (h Histogram) IsFloatHistogram() bool {
_, ok := h.GetCount().(*Histogram_CountFloat)
return ok
}
// ToIntHistogram returns Integer Prometheus histogram from remote implementation
// of integer. It's a caller responsibility to check if it's not Float histogram.
func (h Histogram) ToIntHistogram() *histogram.Histogram {
return &histogram.Histogram{
CounterResetHint: histogram.CounterResetHint(h.ResetHint),
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: h.GetZeroCountInt(),
Count: h.GetCountInt(),
Sum: h.Sum,
PositiveSpans: spansProtoToSpans(h.GetPositiveSpans()),
PositiveBuckets: h.GetPositiveDeltas(),
NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()),
NegativeBuckets: h.GetNegativeDeltas(),
}
}
// ToFloatHistogram returns Float Prometheus histogram from remote implementation
// of float (or integer).
func (h Histogram) ToFloatHistogram() *histogram.FloatHistogram {
if h.IsFloatHistogram() {
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(h.ResetHint),
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: h.GetZeroCountFloat(),
Count: h.GetCountFloat(),
Sum: h.Sum,
PositiveSpans: spansProtoToSpans(h.GetPositiveSpans()),
PositiveBuckets: h.GetPositiveCounts(),
NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()),
NegativeBuckets: h.GetNegativeCounts(),
}
}
// Conversion.
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(h.ResetHint),
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: float64(h.GetZeroCountInt()),
Count: float64(h.GetCountInt()),
Sum: h.Sum,
PositiveSpans: spansProtoToSpans(h.GetPositiveSpans()),
PositiveBuckets: deltasToCounts(h.GetPositiveDeltas()),
NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()),
NegativeBuckets: deltasToCounts(h.GetNegativeDeltas()),
}
}
func spansProtoToSpans(s []BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s))
for i := 0; i < len(s); i++ {
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
}
return spans
}
func deltasToCounts(deltas []int64) []float64 {
counts := make([]float64, len(deltas))
var cur float64
for i, d := range deltas {
cur += float64(d)
counts[i] = cur
}
return counts
}
// FromIntHistogram returns remote Histogram from the Integer Histogram.
func FromIntHistogram(timestamp int64, h *histogram.Histogram) Histogram {
return Histogram{
Count: &Histogram_CountInt{CountInt: h.Count},
Sum: h.Sum,
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: &Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount},
NegativeSpans: spansToSpansProto(h.NegativeSpans),
NegativeDeltas: h.NegativeBuckets,
PositiveSpans: spansToSpansProto(h.PositiveSpans),
PositiveDeltas: h.PositiveBuckets,
ResetHint: Histogram_ResetHint(h.CounterResetHint),
Timestamp: timestamp,
}
}
// FromFloatHistogram returns remote Histogram from the Float Histogram.
func FromFloatHistogram(timestamp int64, fh *histogram.FloatHistogram) Histogram {
return Histogram{
Count: &Histogram_CountFloat{CountFloat: fh.Count},
Sum: fh.Sum,
Schema: fh.Schema,
ZeroThreshold: fh.ZeroThreshold,
ZeroCount: &Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount},
NegativeSpans: spansToSpansProto(fh.NegativeSpans),
NegativeCounts: fh.NegativeBuckets,
PositiveSpans: spansToSpansProto(fh.PositiveSpans),
PositiveCounts: fh.PositiveBuckets,
ResetHint: Histogram_ResetHint(fh.CounterResetHint),
Timestamp: timestamp,
}
}
func spansToSpansProto(s []histogram.Span) []BucketSpan {
spans := make([]BucketSpan, len(s))
for i := 0; i < len(s); i++ {
spans[i] = BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
}
return spans
}
func (r *ChunkedReadResponse) PooledMarshal(p *sync.Pool) ([]byte, error) {
size := r.Size()
data, ok := p.Get().(*[]byte)
@ -37,3 +152,54 @@ func (r *ChunkedReadResponse) PooledMarshal(p *sync.Pool) ([]byte, error) {
}
return r.Marshal()
}
// FilterTimeSeries returns filtered times series with filtering and timestamp statistics.
func FilterTimeSeries(timeSeries []TimeSeries, filter func(TimeSeries) bool) (highest, lowest int64, filtered []TimeSeries, droppedSeries, droppedSamples, droppedExemplars, droppedHistograms int) {
keepIdx := 0
lowest = math.MaxInt64
for i, ts := range timeSeries {
if filter != nil && filter(ts) {
droppedSeries++
if len(ts.Samples) > 0 {
droppedSamples = +len(ts.Samples)
}
if len(ts.Histograms) > 0 {
droppedHistograms = +len(ts.Histograms)
}
if len(ts.Exemplars) > 0 {
droppedExemplars = +len(ts.Exemplars)
}
continue
}
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
// TODO(bwplotka): Still true?
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
highest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
highest = ts.Exemplars[0].Timestamp
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
highest = ts.Histograms[0].Timestamp
}
// Get the lowest timestamp.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp < lowest {
lowest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp < lowest {
lowest = ts.Exemplars[0].Timestamp
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp < lowest {
lowest = ts.Histograms[0].Timestamp
}
// Move the current element to the write position and increment the write pointer
timeSeries[keepIdx] = timeSeries[i]
keepIdx++
}
timeSeries = timeSeries[:keepIdx]
return highest, lowest, timeSeries, droppedSeries, droppedSamples, droppedHistograms, droppedExemplars
}

View file

@ -14,17 +14,132 @@
package writev2
import (
"math"
"slices"
"github.com/prometheus/prometheus/model/histogram"
)
func (m Sample) T() int64 { return m.Timestamp }
func (m Sample) V() float64 { return m.Value }
// IsFloatHistogram returns true if the histogram is float.
func (h Histogram) IsFloatHistogram() bool {
_, ok := h.GetCount().(*Histogram_CountFloat)
return ok
}
// ToIntHistogram returns Integer Prometheus histogram from remote implementation
// of integer. It's a caller responsibility to check if it's not Float histogram.
func (h Histogram) ToIntHistogram() *histogram.Histogram {
return &histogram.Histogram{
CounterResetHint: histogram.CounterResetHint(h.ResetHint),
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: h.GetZeroCountInt(),
Count: h.GetCountInt(),
Sum: h.Sum,
PositiveSpans: spansProtoToSpans(h.GetPositiveSpans()),
PositiveBuckets: h.GetPositiveDeltas(),
NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()),
NegativeBuckets: h.GetNegativeDeltas(),
}
}
// ToFloatHistogram returns Float Prometheus histogram from remote implementation
// of float (or integer).
func (h Histogram) ToFloatHistogram() *histogram.FloatHistogram {
if h.IsFloatHistogram() {
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(h.ResetHint),
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: h.GetZeroCountFloat(),
Count: h.GetCountFloat(),
Sum: h.Sum,
PositiveSpans: spansProtoToSpans(h.GetPositiveSpans()),
PositiveBuckets: h.GetPositiveCounts(),
NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()),
NegativeBuckets: h.GetNegativeCounts(),
}
}
// Conversion.
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(h.ResetHint),
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: float64(h.GetZeroCountInt()),
Count: float64(h.GetCountInt()),
Sum: h.Sum,
PositiveSpans: spansProtoToSpans(h.GetPositiveSpans()),
PositiveBuckets: deltasToCounts(h.GetPositiveDeltas()),
NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()),
NegativeBuckets: deltasToCounts(h.GetNegativeDeltas()),
}
}
func spansProtoToSpans(s []BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s))
for i := 0; i < len(s); i++ {
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
}
return spans
}
func deltasToCounts(deltas []int64) []float64 {
counts := make([]float64, len(deltas))
var cur float64
for i, d := range deltas {
cur += float64(d)
counts[i] = cur
}
return counts
}
// FromIntHistogram returns remote Histogram from the Integer Histogram.
func FromIntHistogram(timestamp int64, h *histogram.Histogram) Histogram {
return Histogram{
Count: &Histogram_CountInt{CountInt: h.Count},
Sum: h.Sum,
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: &Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount},
NegativeSpans: spansToSpansProto(h.NegativeSpans),
NegativeDeltas: h.NegativeBuckets,
PositiveSpans: spansToSpansProto(h.PositiveSpans),
PositiveDeltas: h.PositiveBuckets,
ResetHint: Histogram_ResetHint(h.CounterResetHint),
Timestamp: timestamp,
}
}
// FromFloatHistogram returns remote Histogram from the Float Histogram.
func FromFloatHistogram(timestamp int64, fh *histogram.FloatHistogram) Histogram {
return Histogram{
Count: &Histogram_CountFloat{CountFloat: fh.Count},
Sum: fh.Sum,
Schema: fh.Schema,
ZeroThreshold: fh.ZeroThreshold,
ZeroCount: &Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount},
NegativeSpans: spansToSpansProto(fh.NegativeSpans),
NegativeCounts: fh.NegativeBuckets,
PositiveSpans: spansToSpansProto(fh.PositiveSpans),
PositiveCounts: fh.PositiveBuckets,
ResetHint: Histogram_ResetHint(fh.CounterResetHint),
Timestamp: timestamp,
}
}
func spansToSpansProto(s []histogram.Span) []BucketSpan {
spans := make([]BucketSpan, len(s))
for i := 0; i < len(s); i++ {
spans[i] = BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
}
return spans
}
func (m *Request) OptimizedMarshal(dst []byte) ([]byte, error) {
siz := m.Size()
if cap(dst) < siz {
@ -170,3 +285,54 @@ func (m *TimeSeries) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
}
return len(dAtA) - i, nil
}
// FilterTimeSeries returns filtered times series with filtering and timestamp statistics.
func FilterTimeSeries(timeSeries []TimeSeries, filter func(TimeSeries) bool) (highest, lowest int64, filtered []TimeSeries, droppedSeries, droppedSamples, droppedExemplars, droppedHistograms int) {
keepIdx := 0
lowest = math.MaxInt64
for i, ts := range timeSeries {
if filter != nil && filter(ts) {
droppedSeries++
if len(ts.Samples) > 0 {
droppedSamples = +len(ts.Samples)
}
if len(ts.Histograms) > 0 {
droppedHistograms = +len(ts.Histograms)
}
if len(ts.Exemplars) > 0 {
droppedExemplars = +len(ts.Exemplars)
}
continue
}
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
// TODO(bwplotka): Still true?
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
highest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
highest = ts.Exemplars[0].Timestamp
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
highest = ts.Histograms[0].Timestamp
}
// Get the lowest timestamp.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp < lowest {
lowest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp < lowest {
lowest = ts.Exemplars[0].Timestamp
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp < lowest {
lowest = ts.Histograms[0].Timestamp
}
// Move the current element to the write position and increment the write pointer
timeSeries[keepIdx] = timeSeries[i]
keepIdx++
}
timeSeries = timeSeries[:keepIdx]
return highest, lowest, timeSeries, droppedSeries, droppedSamples, droppedHistograms, droppedExemplars
}

View file

@ -22,8 +22,8 @@ type SymbolsTable struct {
}
// NewSymbolTable returns a symbol table.
func NewSymbolTable() SymbolsTable {
return SymbolsTable{
func NewSymbolTable() *SymbolsTable {
return &SymbolsTable{
// Empty string is required as a first element.
symbolsMap: map[string]uint32{"": 0},
strings: []string{""},
@ -73,8 +73,8 @@ func (t *SymbolsTable) Reset() {
}
// DesymbolizeLabels decodes label references, with given symbols to labels.
func DesymbolizeLabels(labelRefs []uint32, symbols []string) labels.Labels {
b := labels.NewScratchBuilder(len(labelRefs))
func DesymbolizeLabels(b *labels.ScratchBuilder, labelRefs []uint32, symbols []string) labels.Labels {
b.Reset()
for i := 0; i < len(labelRefs); i += 2 {
b.Add(symbols[labelRefs[i]], symbols[labelRefs[i+1]])
}

View file

@ -49,7 +49,8 @@ func TestSymbolsTable(t *testing.T) {
ls := labels.FromStrings("__name__", "qwer", "zxcv", "1234")
encoded := s.SymbolizeLabels(ls, nil)
require.Equal(t, []uint32{1, 3, 4, 5}, encoded)
decoded := DesymbolizeLabels(encoded, s.Symbols())
sb := labels.NewScratchBuilder(len(encoded))
decoded := DesymbolizeLabels(&sb, encoded, s.Symbols())
require.Equal(t, ls, decoded)
// Different buf.

View file

@ -155,10 +155,10 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult,
})
case chunkenc.ValHistogram:
ts, h := iter.AtHistogram(nil)
histograms = append(histograms, HistogramToHistogramProto(ts, h))
histograms = append(histograms, prompb.FromIntHistogram(ts, h))
case chunkenc.ValFloatHistogram:
ts, fh := iter.AtFloatHistogram(nil)
histograms = append(histograms, FloatHistogramToHistogramProto(ts, fh))
histograms = append(histograms, prompb.FromFloatHistogram(ts, fh))
default:
return nil, ss.Warnings(), fmt.Errorf("unrecognized value type: %s", valType)
}
@ -483,18 +483,15 @@ func (c *concreteSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *hist
panic("iterator is not on an integer histogram sample")
}
h := c.series.histograms[c.histogramsCur]
return h.Timestamp, HistogramProtoToHistogram(h)
return h.Timestamp, h.ToIntHistogram()
}
// AtFloatHistogram implements chunkenc.Iterator.
func (c *concreteSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
switch c.curValType {
case chunkenc.ValHistogram:
case chunkenc.ValHistogram, chunkenc.ValFloatHistogram:
fh := c.series.histograms[c.histogramsCur]
return fh.Timestamp, HistogramProtoToFloatHistogram(fh)
case chunkenc.ValFloatHistogram:
fh := c.series.histograms[c.histogramsCur]
return fh.Timestamp, FloatHistogramProtoToFloatHistogram(fh)
return fh.Timestamp, fh.ToFloatHistogram()
default:
panic("iterator is not on a histogram sample")
}
@ -619,290 +616,143 @@ func FromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, erro
return result, nil
}
func exemplarProtoToExemplar(b *labels.ScratchBuilder, ep prompb.Exemplar) exemplar.Exemplar {
timestamp := ep.Timestamp
// writeRequest allows semi-generic, efficient and consistent implementation for
// writing & reading write request.
// Interfaces alone will not help us as we have a lot or slices with different types.
// Generics will not help us given many limitations like
// * inability to access structs fields https://stackoverflow.com/a/76924195
// * inability to have generic methods.
// * do not work with return params only.
// * out structs used unfortunate nullable with gogo, so even methods are not accessible as
// all of them are pointer receivers. Removal of gogo would make generics more viable.
type writeRequest struct {
v1 *prompb.WriteRequest
v2 *writev2.Request
}
return exemplar.Exemplar{
Labels: labelProtosToLabels(b, ep.Labels),
Value: ep.Value,
Ts: timestamp,
HasTs: timestamp != 0,
func (r *writeRequest) ForeachTimeseries(f func(ts *writeTimeSeries) error) error {
ts := &writeTimeSeries{}
if r.v1 != nil {
for _, v1 := range r.v1.Timeseries {
ts.v1 = &v1
if err := f(ts); err != nil {
return err
}
}
return nil
}
ts.v2Symbols = r.v2.Symbols
for _, v2 := range r.v2.Timeseries {
ts.v2 = &v2
if err := f(ts); err != nil {
return err
}
}
return nil
}
type writeTimeSeries struct {
v1 *prompb.TimeSeries
v2 *writev2.TimeSeries
v2Symbols []string
}
func (s *writeTimeSeries) Labels(b *labels.ScratchBuilder) labels.Labels {
if s.v1 != nil {
return labelProtosToLabels(b, s.v1.Labels)
}
return writev2.DesymbolizeLabels(b, s.v2.LabelsRefs, s.v2Symbols)
}
type writeSample interface {
GetTimestamp() int64
GetValue() float64
}
func (s *writeTimeSeries) ForeachSample(f func(s writeSample) error) error {
if s.v1 != nil {
for _, v1 := range s.v1.Samples {
if err := f(&v1); err != nil {
return err
}
}
return nil
}
for _, v2 := range s.v2.Samples {
if err := f(&v2); err != nil {
return err
}
}
return nil
}
func (s *writeTimeSeries) ForeachHistogram(f func(ts int64, h *histogram.Histogram, fh *histogram.FloatHistogram) error) error {
if s.v1 != nil {
for _, v1 := range s.v1.Histograms {
var (
h *histogram.Histogram
fh *histogram.FloatHistogram
)
if v1.IsFloatHistogram() {
fh = v1.ToFloatHistogram()
} else {
h = v1.ToIntHistogram()
}
if err := f(v1.GetTimestamp(), h, fh); err != nil {
return err
}
}
return nil
}
for _, v2 := range s.v2.Histograms {
var (
h *histogram.Histogram
fh *histogram.FloatHistogram
)
if v2.IsFloatHistogram() {
fh = v2.ToFloatHistogram()
} else {
h = v2.ToIntHistogram()
}
if err := f(v2.GetTimestamp(), h, fh); err != nil {
return err
}
}
return nil
}
func (s *writeTimeSeries) ForeachExemplar(b *labels.ScratchBuilder, f func(s exemplar.Exemplar)) {
if s.v1 != nil {
for _, v1 := range s.v1.Exemplars {
timestamp := v1.GetTimestamp()
f(exemplar.Exemplar{
Labels: labelProtosToLabels(b, v1.Labels),
Value: v1.Value,
Ts: timestamp,
HasTs: timestamp != 0,
})
}
}
for _, v2 := range s.v2.Exemplars {
timestamp := v2.GetTimestamp()
f(exemplar.Exemplar{
Labels: writev2.DesymbolizeLabels(b, v2.LabelsRefs, s.v2Symbols),
Value: v2.Value,
Ts: timestamp,
HasTs: timestamp != 0,
})
}
}
func exemplarProtoV2ToExemplar(ep writev2.Exemplar, symbols []string) exemplar.Exemplar {
timestamp := ep.Timestamp
return exemplar.Exemplar{
Labels: writev2.DesymbolizeLabels(ep.LabelsRefs, symbols),
Value: ep.Value,
Ts: timestamp,
HasTs: timestamp != 0,
func (s *writeTimeSeries) Metadata() (met metadata.Metadata, isV2 bool) {
if s.v1 != nil {
return met, false
}
}
func metadataProtoV2ToMetadata(mp writev2.Metadata, symbols []string) metadata.Metadata {
return metadata.Metadata{
Type: metricTypeFromProtoV2Equivalent(mp.Type),
Unit: symbols[mp.UnitRef],
Help: symbols[mp.HelpRef],
}
}
// HistogramProtoToHistogram extracts a (normal integer) Histogram from the
// provided proto message. The caller has to make sure that the proto message
// represents an integer histogram and not a float histogram, or it panics.
func HistogramProtoToHistogram(hp prompb.Histogram) *histogram.Histogram {
if hp.IsFloatHistogram() {
panic("HistogramProtoToHistogram called with a float histogram")
}
return &histogram.Histogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountInt(),
Count: hp.GetCountInt(),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveDeltas(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeDeltas(),
}
}
// HistogramProtoV2ToHistogram extracts a (normal integer) Histogram from the
// provided proto message. The caller has to make sure that the proto message
// represents an integer histogram and not a float histogram, or it panics.
func HistogramProtoV2ToHistogram(hp writev2.Histogram) *histogram.Histogram {
if hp.IsFloatHistogram() {
panic("HistogramProtoToHistogram called with a float histogram")
}
return &histogram.Histogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountInt(),
Count: hp.GetCountInt(),
Sum: hp.Sum,
PositiveSpans: spansProtoV2ToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveDeltas(),
NegativeSpans: spansProtoV2ToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeDeltas(),
}
}
// FloatHistogramProtoToFloatHistogram extracts a float Histogram from the
// provided proto message to a Float Histogram. The caller has to make sure that
// the proto message represents a float histogram and not an integer histogram,
// or it panics.
func FloatHistogramProtoToFloatHistogram(hp prompb.Histogram) *histogram.FloatHistogram {
if !hp.IsFloatHistogram() {
panic("FloatHistogramProtoToFloatHistogram called with an integer histogram")
}
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountFloat(),
Count: hp.GetCountFloat(),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveCounts(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeCounts(),
}
}
// FloatHistogramProtoV2ToFloatHistogram extracts a float Histogram from the
// provided proto message to a Float Histogram. The caller has to make sure that
// the proto message represents a float histogram and not an integer histogram,
// or it panics.
func FloatHistogramProtoV2ToFloatHistogram(hp writev2.Histogram) *histogram.FloatHistogram {
if !hp.IsFloatHistogram() {
panic("FloatHistogramProtoToFloatHistogram called with an integer histogram")
}
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountFloat(),
Count: hp.GetCountFloat(),
Sum: hp.Sum,
PositiveSpans: spansProtoV2ToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveCounts(),
NegativeSpans: spansProtoV2ToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeCounts(),
}
}
// HistogramProtoToFloatHistogram extracts and converts a (normal integer) histogram from the provided proto message
// to a float histogram. The caller has to make sure that the proto message represents an integer histogram and not a
// float histogram, or it panics.
func HistogramProtoToFloatHistogram(hp prompb.Histogram) *histogram.FloatHistogram {
if hp.IsFloatHistogram() {
panic("HistogramProtoToFloatHistogram called with a float histogram")
}
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: float64(hp.GetZeroCountInt()),
Count: float64(hp.GetCountInt()),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: deltasToCounts(hp.GetPositiveDeltas()),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: deltasToCounts(hp.GetNegativeDeltas()),
}
}
func FloatV2HistogramProtoToFloatHistogram(hp writev2.Histogram) *histogram.FloatHistogram {
if !hp.IsFloatHistogram() {
panic("FloatHistogramProtoToFloatHistogram called with an integer histogram")
}
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountFloat(),
Count: hp.GetCountFloat(),
Sum: hp.Sum,
PositiveSpans: spansProtoV2ToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveCounts(),
NegativeSpans: spansProtoV2ToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeCounts(),
}
}
// V2HistogramProtoToHistogram extracts a (normal integer) Histogram from the
// provided proto message. The caller has to make sure that the proto message
// represents an integer histogram and not a float histogram, or it panics.
func V2HistogramProtoToHistogram(hp writev2.Histogram) *histogram.Histogram {
if hp.IsFloatHistogram() {
panic("HistogramProtoToHistogram called with a float histogram")
}
return &histogram.Histogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountInt(),
Count: hp.GetCountInt(),
Sum: hp.Sum,
PositiveSpans: spansProtoV2ToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveDeltas(),
NegativeSpans: spansProtoV2ToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeDeltas(),
}
}
func spansProtoToSpans(s []prompb.BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s))
for i := 0; i < len(s); i++ {
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
}
return spans
}
func spansProtoV2ToSpans(s []writev2.BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s))
for i := 0; i < len(s); i++ {
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
}
return spans
}
func deltasToCounts(deltas []int64) []float64 {
counts := make([]float64, len(deltas))
var cur float64
for i, d := range deltas {
cur += float64(d)
counts[i] = cur
}
return counts
}
func HistogramToHistogramProto(timestamp int64, h *histogram.Histogram) prompb.Histogram {
return prompb.Histogram{
Count: &prompb.Histogram_CountInt{CountInt: h.Count},
Sum: h.Sum,
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount},
NegativeSpans: spansToSpansProto(h.NegativeSpans),
NegativeDeltas: h.NegativeBuckets,
PositiveSpans: spansToSpansProto(h.PositiveSpans),
PositiveDeltas: h.PositiveBuckets,
ResetHint: prompb.Histogram_ResetHint(h.CounterResetHint),
Timestamp: timestamp,
}
}
func HistogramToV2HistogramProto(timestamp int64, h *histogram.Histogram) writev2.Histogram {
return writev2.Histogram{
Count: &writev2.Histogram_CountInt{CountInt: h.Count},
Sum: h.Sum,
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: &writev2.Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount},
NegativeSpans: spansToV2SpansProto(h.NegativeSpans),
NegativeDeltas: h.NegativeBuckets,
PositiveSpans: spansToV2SpansProto(h.PositiveSpans),
PositiveDeltas: h.PositiveBuckets,
ResetHint: writev2.Histogram_ResetHint(h.CounterResetHint),
Timestamp: timestamp,
}
}
func FloatHistogramToHistogramProto(timestamp int64, fh *histogram.FloatHistogram) prompb.Histogram {
return prompb.Histogram{
Count: &prompb.Histogram_CountFloat{CountFloat: fh.Count},
Sum: fh.Sum,
Schema: fh.Schema,
ZeroThreshold: fh.ZeroThreshold,
ZeroCount: &prompb.Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount},
NegativeSpans: spansToSpansProto(fh.NegativeSpans),
NegativeCounts: fh.NegativeBuckets,
PositiveSpans: spansToSpansProto(fh.PositiveSpans),
PositiveCounts: fh.PositiveBuckets,
ResetHint: prompb.Histogram_ResetHint(fh.CounterResetHint),
Timestamp: timestamp,
}
}
func FloatHistogramToV2HistogramProto(timestamp int64, fh *histogram.FloatHistogram) writev2.Histogram {
return writev2.Histogram{
Count: &writev2.Histogram_CountFloat{CountFloat: fh.Count},
Sum: fh.Sum,
Schema: fh.Schema,
ZeroThreshold: fh.ZeroThreshold,
ZeroCount: &writev2.Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount},
NegativeSpans: spansToV2SpansProto(fh.NegativeSpans),
NegativeCounts: fh.NegativeBuckets,
PositiveSpans: spansToV2SpansProto(fh.PositiveSpans),
PositiveCounts: fh.PositiveBuckets,
ResetHint: writev2.Histogram_ResetHint(fh.CounterResetHint),
Timestamp: timestamp,
}
}
func spansToSpansProto(s []histogram.Span) []prompb.BucketSpan {
spans := make([]prompb.BucketSpan, len(s))
for i := 0; i < len(s); i++ {
spans[i] = prompb.BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
}
return spans
}
func spansToV2SpansProto(s []histogram.Span) []writev2.BucketSpan {
spans := make([]writev2.BucketSpan, len(s))
for i := 0; i < len(s); i++ {
spans[i] = writev2.BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
}
return spans
Type: metricTypeFromProtoV2Equivalent(s.v2.Metadata.Type),
Unit: s.v2Symbols[s.v2.Metadata.UnitRef],
Help: s.v2Symbols[s.v2.Metadata.HelpRef],
}, true
}
// LabelProtosToMetric unpack a []*prompb.Label to a model.Metric.
@ -1055,74 +905,3 @@ func DecodeV2WriteRequestStr(r io.Reader) (*writev2.Request, error) {
return &req, nil
}
func V2WriteRequestToWriteRequest(redReq *writev2.Request) (*prompb.WriteRequest, error) {
req := &prompb.WriteRequest{
Timeseries: make([]prompb.TimeSeries, len(redReq.Timeseries)),
// TODO handle metadata?
}
for i, rts := range redReq.Timeseries {
writev2.DesymbolizeLabels(rts.LabelsRefs, redReq.Symbols).Range(func(l labels.Label) {
req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{
Name: l.Name,
Value: l.Value,
})
})
exemplars := make([]prompb.Exemplar, len(rts.Exemplars))
for j, e := range rts.Exemplars {
exemplars[j].Value = e.Value
exemplars[j].Timestamp = e.Timestamp
writev2.DesymbolizeLabels(e.LabelsRefs, redReq.Symbols).Range(func(l labels.Label) {
exemplars[j].Labels = append(exemplars[j].Labels, prompb.Label{
Name: l.Name,
Value: l.Value,
})
})
}
req.Timeseries[i].Exemplars = exemplars
req.Timeseries[i].Samples = make([]prompb.Sample, len(rts.Samples))
for j, s := range rts.Samples {
req.Timeseries[i].Samples[j].Timestamp = s.Timestamp
req.Timeseries[i].Samples[j].Value = s.Value
}
req.Timeseries[i].Histograms = make([]prompb.Histogram, len(rts.Histograms))
for j, h := range rts.Histograms {
// TODO: double check
if h.IsFloatHistogram() {
req.Timeseries[i].Histograms[j].Count = &prompb.Histogram_CountFloat{CountFloat: h.GetCountFloat()}
req.Timeseries[i].Histograms[j].ZeroCount = &prompb.Histogram_ZeroCountFloat{ZeroCountFloat: h.GetZeroCountFloat()}
} else {
req.Timeseries[i].Histograms[j].Count = &prompb.Histogram_CountInt{CountInt: h.GetCountInt()}
req.Timeseries[i].Histograms[j].ZeroCount = &prompb.Histogram_ZeroCountInt{ZeroCountInt: h.GetZeroCountInt()}
}
for _, span := range h.NegativeSpans {
req.Timeseries[i].Histograms[j].NegativeSpans = append(req.Timeseries[i].Histograms[j].NegativeSpans, prompb.BucketSpan{
Offset: span.Offset,
Length: span.Length,
})
}
for _, span := range h.PositiveSpans {
req.Timeseries[i].Histograms[j].PositiveSpans = append(req.Timeseries[i].Histograms[j].PositiveSpans, prompb.BucketSpan{
Offset: span.Offset,
Length: span.Length,
})
}
req.Timeseries[i].Histograms[j].Sum = h.Sum
req.Timeseries[i].Histograms[j].Schema = h.Schema
req.Timeseries[i].Histograms[j].ZeroThreshold = h.ZeroThreshold
req.Timeseries[i].Histograms[j].NegativeDeltas = h.NegativeDeltas
req.Timeseries[i].Histograms[j].NegativeCounts = h.NegativeCounts
req.Timeseries[i].Histograms[j].PositiveDeltas = h.PositiveDeltas
req.Timeseries[i].Histograms[j].PositiveCounts = h.PositiveCounts
req.Timeseries[i].Histograms[j].ResetHint = prompb.Histogram_ResetHint(h.ResetHint)
req.Timeseries[i].Histograms[j].Timestamp = h.Timestamp
}
}
return req, nil
}

View file

@ -559,29 +559,38 @@ func (t *QueueManager) AppendWatcherMetadata(ctx context.Context, metadata []scr
}
pBuf := proto.NewBuffer(nil)
var buf []byte
numSends := int(math.Ceil(float64(len(metadata)) / float64(t.mcfg.MaxSamplesPerSend)))
for i := 0; i < numSends; i++ {
last := (i + 1) * t.mcfg.MaxSamplesPerSend
if last > len(metadata) {
last = len(metadata)
}
err := t.sendMetadataWithBackoff(ctx, mm[i*t.mcfg.MaxSamplesPerSend:last], pBuf)
if err != nil {
if err := t.sendMetadataWithBackoff(ctx, mm[i*t.mcfg.MaxSamplesPerSend:last], pBuf, &buf); err != nil {
t.metrics.failedMetadataTotal.Add(float64(last - (i * t.mcfg.MaxSamplesPerSend)))
level.Error(t.logger).Log("msg", "non-recoverable error while sending metadata", "count", last-(i*t.mcfg.MaxSamplesPerSend), "err", err)
}
}
}
func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer) error {
func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf *[]byte) error {
pBuf.Reset()
// Build the WriteRequest with no samples (v1 flow).
req, _, _, err := buildWriteRequest(t.logger, nil, metadata, pBuf, nil, nil, t.enc)
pMsg := &prompb.WriteRequest{
Metadata: metadata,
}
if err := pBuf.Marshal(pMsg); err != nil {
return err
}
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
*buf = (*buf)[0:cap(*buf)]
req, err := compressPayload(buf, pBuf.Bytes(), SnappyBlockCompression)
if err != nil {
return err
}
metadataCount := len(metadata)
attemptStore := func(try int) error {
ctx, span := otel.Tracer("").Start(ctx, "Remote Metadata Send Batch")
defer span.End()
@ -621,83 +630,13 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p
return nil
}
func isSampleOld(baseTime time.Time, sampleAgeLimit time.Duration, ts int64) bool {
if sampleAgeLimit == 0 {
// If sampleAgeLimit is unset, then we never skip samples due to their age.
return false
}
limitTs := baseTime.Add(-sampleAgeLimit)
sampleTs := timestamp.Time(ts)
return sampleTs.Before(limitTs)
}
func isTimeSeriesOldFilter(metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) func(ts prompb.TimeSeries) bool {
return func(ts prompb.TimeSeries) bool {
if sampleAgeLimit == 0 {
// If sampleAgeLimit is unset, then we never skip samples due to their age.
return false
}
switch {
// Only the first element should be set in the series, therefore we only check the first element.
case len(ts.Samples) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Samples[0].Timestamp) {
metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
case len(ts.Histograms) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Histograms[0].Timestamp) {
metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
case len(ts.Exemplars) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Exemplars[0].Timestamp) {
metrics.droppedExemplarsTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
default:
return false
}
return false
}
}
func isV2TimeSeriesOldFilter(metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) func(ts writev2.TimeSeries) bool {
return func(ts writev2.TimeSeries) bool {
if sampleAgeLimit == 0 {
// If sampleAgeLimit is unset, then we never skip samples due to their age.
return false
}
switch {
// Only the first element should be set in the series, therefore we only check the first element.
case len(ts.Samples) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Samples[0].Timestamp) {
metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
case len(ts.Histograms) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Histograms[0].Timestamp) {
metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
case len(ts.Exemplars) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Exemplars[0].Timestamp) {
metrics.droppedExemplarsTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
default:
return false
}
return false
}
}
// Append queues a sample to be sent to the remote storage. Blocks until all samples are
// enqueued on their shards or a shutdown signal is received.
func (t *QueueManager) Append(samples []record.RefSample) bool {
currentTime := time.Now()
outer:
for _, s := range samples {
if isSampleOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), s.T) {
if isTimestampTooOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), s.T) {
t.metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc()
continue
}
@ -758,7 +697,7 @@ func (t *QueueManager) AppendExemplars(exemplars []record.RefExemplar) bool {
currentTime := time.Now()
outer:
for _, e := range exemplars {
if isSampleOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), e.T) {
if isTimestampTooOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), e.T) {
t.metrics.droppedExemplarsTotal.WithLabelValues(reasonTooOld).Inc()
continue
}
@ -815,7 +754,7 @@ func (t *QueueManager) AppendHistograms(histograms []record.RefHistogramSample)
currentTime := time.Now()
outer:
for _, h := range histograms {
if isSampleOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), h.T) {
if isTimestampTooOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), h.T) {
t.metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc()
continue
}
@ -870,7 +809,7 @@ func (t *QueueManager) AppendFloatHistograms(floatHistograms []record.RefFloatHi
currentTime := time.Now()
outer:
for _, h := range floatHistograms {
if isSampleOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), h.T) {
if isTimestampTooOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), h.T) {
t.metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc()
continue
}
@ -1378,6 +1317,8 @@ type timeSeries struct {
timestamp int64
exemplarLabels labels.Labels
// The type of series: sample, exemplar, or histogram.
// TODO(bwplotka): Does not make sense in PRW2.0 world? Exemplar can be attached
// to the same TimeSeries. Adjust.
sType seriesType
}
@ -1509,35 +1450,17 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
}()
shardNum := strconv.Itoa(shardID)
symbolTable := writev2.NewSymbolTable()
// Send batches of at most MaxSamplesPerSend samples to the remote storage.
// If we have fewer samples than that, flush them out after a deadline anyways.
var (
max = s.qm.cfg.MaxSamplesPerSend
pBuf = proto.NewBuffer(nil)
pBufRaw []byte
buf []byte
)
max := s.qm.cfg.MaxSamplesPerSend
// TODO(@tpaschalis) Should we also raise the max if we have WAL metadata?
if s.qm.sendExemplars {
max += int(float64(max) * 0.1)
}
// TODO: Dry all of this, we should make an interface/generic for the timeseries type.
batchQueue := queue.Chan()
pendingData := make([]prompb.TimeSeries, max)
for i := range pendingData {
pendingData[i].Samples = []prompb.Sample{{}}
if s.qm.sendExemplars {
pendingData[i].Exemplars = []prompb.Exemplar{{}}
}
}
pendingDataV2 := make([]writev2.TimeSeries, max)
for i := range pendingDataV2 {
pendingDataV2[i].Samples = []writev2.Sample{{}}
}
pendingSeries := newProtoTimeSeriesBuffer(s.qm.protoMsg, max, s.qm.sendExemplars, s.qm.sendNativeHistograms)
timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
stop := func() {
@ -1550,22 +1473,16 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
}
defer stop()
sendBatch := func(batch []timeSeries, protoMsg config.RemoteWriteProtoMsg, enc Compression, timer bool) {
switch protoMsg {
case config.RemoteWriteProtoMsgV1:
nPendingSamples, nPendingExemplars, nPendingHistograms := populateTimeSeries(batch, pendingData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
if timer {
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
}
_ = s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf, enc)
case config.RemoteWriteProtoMsgV2:
nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata := populateV2TimeSeries(&symbolTable, batch, pendingDataV2, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
_ = s.sendV2Samples(ctx, pendingDataV2[:n], symbolTable.Symbols(), nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, &pBufRaw, &buf, enc)
symbolTable.Reset()
sendBatch := func(batch []timeSeries, timerTick bool) {
pendingSeries.Put(batch)
if timerTick {
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", pendingSeries.nPendingSamples,
"exemplars", pendingSeries.nPendingExemplars, "shard", shardNum, "histograms", pendingSeries.nPendingHistograms)
}
queue.ReturnForReuse(batch)
begin := time.Now()
err := s.sendSamplesWithBackoff(ctx, pendingSeries, SnappyBlockCompression)
s.updateMetrics(err, pendingSeries.nPendingSamples, pendingSeries.nPendingExemplars, pendingSeries.nPendingHistograms, 0, time.Since(begin))
}
for {
@ -1592,80 +1509,267 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
return
}
sendBatch(batch, s.qm.protoMsg, s.qm.enc, false)
// TODO(bwplotka): Previously the return was between popular and send, double check.
queue.ReturnForReuse(batch)
sendBatch(batch, false)
stop()
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
case <-timer.C:
batch := queue.Batch()
if len(batch) > 0 {
sendBatch(batch, s.qm.protoMsg, s.qm.enc, true)
sendBatch(batch, true)
}
queue.ReturnForReuse(batch)
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
}
}
}
func populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms int
for nPending, d := range batch {
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
if sendExemplars {
pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
// protoTimeSeriesQueue is a generic queue for both v1 and v2 Remote Write
// messages. Similar to writeRequest it does not use generics/interfaces for
// reasons stated in writeRequest commentary.
// TODO(bwplotka): This might make little sense without gogo, remove it with gogo
// removal.
type protoTimeSeriesBuffer struct {
v1 []prompb.TimeSeries
v2 []writev2.TimeSeries
v2Symbols *writev2.SymbolsTable
sendExemplars, sendNativeHistograms bool
nPendingSamples, nPendingExemplars, nPendingHistograms int
pBuf *proto.Buffer
buf []byte
}
func newProtoTimeSeriesBuffer(protoMsg config.RemoteWriteProtoMsg, max int, sendExemplars, sendNativeHistograms bool) *protoTimeSeriesBuffer {
ret := &protoTimeSeriesBuffer{
sendExemplars: sendExemplars,
sendNativeHistograms: sendNativeHistograms,
pBuf: proto.NewBuffer(nil),
}
if protoMsg == config.RemoteWriteProtoMsgV1 {
ret.v1 = make([]prompb.TimeSeries, max)
for i := range ret.v1 {
// NOTO(bwplotka): Why empty one-elem samples and exemplar?
ret.v1[i].Samples = []prompb.Sample{{}}
if sendExemplars {
ret.v1[i].Exemplars = []prompb.Exemplar{{}}
}
}
if sendNativeHistograms {
pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
return ret
}
ret.v2 = make([]writev2.TimeSeries, max)
for i := range ret.v2 {
// NOTO(bwplotka): Why empty one-elem samples and exemplar?
ret.v2[i].Samples = []writev2.Sample{{}}
ret.v2[i].Exemplars = []writev2.Exemplar{{}}
}
ret.sendNativeHistograms = true // Always send native histograms as documented.
ret.v2Symbols = writev2.NewSymbolTable()
return ret
}
func (p *protoTimeSeriesBuffer) Put(batch []timeSeries) {
p.reset(len(batch))
if p.v1 != nil {
p.putV1(batch)
return
}
p.putV2(batch)
}
func (p *protoTimeSeriesBuffer) reset(batchLen int) {
p.nPendingSamples, p.nPendingExemplars, p.nPendingHistograms = 0, 0, 0
if p.v1 != nil {
for i := 0; i < batchLen; i++ {
p.v1[i].Samples = p.v1[i].Samples[:0]
if p.sendExemplars {
p.v1[i].Exemplars = p.v1[i].Exemplars[:0]
}
if p.sendNativeHistograms {
p.v1[i].Histograms = p.v1[i].Histograms[:0]
}
}
return
}
p.v2Symbols.Reset()
for i := 0; i < batchLen; i++ {
p.v2[i].Samples = p.v2[i].Samples[:0]
if p.sendExemplars {
p.v2[i].Exemplars = p.v2[i].Exemplars[:0]
}
if p.sendNativeHistograms {
p.v2[i].Histograms = p.v2[i].Histograms[:0]
}
}
}
func (p *protoTimeSeriesBuffer) putV1(batch []timeSeries) {
for nPending, d := range batch {
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
// stop reading from the queue. This makes it safe to reference pendingSamples by index.
p.v1[nPending].Labels = labelsToLabelsProto(d.seriesLabels, p.v1[nPending].Labels)
switch d.sType {
case tSample:
p.v1[nPending].Samples = append(p.v1[nPending].Samples, prompb.Sample{
Value: d.value,
Timestamp: d.timestamp,
})
p.nPendingSamples++
case tExemplar:
p.v1[nPending].Exemplars = append(p.v1[nPending].Exemplars, prompb.Exemplar{
Labels: labelsToLabelsProto(d.exemplarLabels, nil),
Value: d.value,
Timestamp: d.timestamp,
})
p.nPendingExemplars++
case tHistogram:
p.v1[nPending].Histograms = append(p.v1[nPending].Histograms, prompb.FromIntHistogram(d.timestamp, d.histogram))
p.nPendingHistograms++
case tFloatHistogram:
p.v1[nPending].Histograms = append(p.v1[nPending].Histograms, prompb.FromFloatHistogram(d.timestamp, d.floatHistogram))
p.nPendingHistograms++
}
}
}
func (p *protoTimeSeriesBuffer) putV2(batch []timeSeries) {
for nPending, d := range batch {
// TODO(bwplotka): should we also safeguard against empty metadata here?
if d.metadata != nil {
p.v2[nPending].Metadata.Type = metricTypeToMetricTypeProtoV2(d.metadata.Type)
p.v2[nPending].Metadata.HelpRef = p.v2Symbols.Symbolize(d.metadata.Help)
p.v2[nPending].Metadata.HelpRef = p.v2Symbols.Symbolize(d.metadata.Unit)
} else {
// log?
}
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
// stop reading from the queue. This makes it safe to reference pendingSamples by index.
pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
p.v2[nPending].LabelsRefs = p.v2Symbols.SymbolizeLabels(d.seriesLabels, p.v2[nPending].LabelsRefs)
switch d.sType {
case tSample:
pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{
p.v2[nPending].Samples = append(p.v2[nPending].Samples, writev2.Sample{
Value: d.value,
Timestamp: d.timestamp,
})
nPendingSamples++
p.nPendingSamples++
case tExemplar:
pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.Exemplar{
Labels: labelsToLabelsProto(d.exemplarLabels, nil),
Value: d.value,
Timestamp: d.timestamp,
p.v2[nPending].Exemplars = append(p.v2[nPending].Exemplars, writev2.Exemplar{
LabelsRefs: p.v2Symbols.SymbolizeLabels(d.exemplarLabels, nil), // TODO: optimize, reuse slice
Value: d.value,
Timestamp: d.timestamp,
})
nPendingExemplars++
p.nPendingExemplars++
case tHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToHistogramProto(d.timestamp, d.histogram))
nPendingHistograms++
p.v2[nPending].Histograms = append(p.v2[nPending].Histograms, writev2.FromIntHistogram(d.timestamp, d.histogram))
p.nPendingHistograms++
case tFloatHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram))
nPendingHistograms++
p.v2[nPending].Histograms = append(p.v2[nPending].Histograms, writev2.FromFloatHistogram(d.timestamp, d.floatHistogram))
p.nPendingHistograms++
case tMetadata:
panic("We shouldn't receive metadata type data in queue manager for v2, it should already be inserted into the timeSeries")
}
}
return nPendingSamples, nPendingExemplars, nPendingHistograms
}
func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte, enc Compression) error {
begin := time.Now()
err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, 0, pBuf, buf, enc)
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, 0, time.Since(begin))
return err
func isTimestampTooOld(baseTime time.Time, sampleAgeLimit time.Duration, ts int64) bool {
limitTs := baseTime.Add(-sampleAgeLimit)
sampleTs := timestamp.Time(ts)
return sampleTs.Before(limitTs)
}
func (s *shards) sendV2Samples(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte, enc Compression) error {
begin := time.Now()
err := s.sendV2SamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf, enc)
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, metadataCount, time.Since(begin))
return err
func (p *protoTimeSeriesBuffer) FilterOutTooOldSamples(logger log.Logger, metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) (highest, lowest int64) {
var droppedSeries, droppedSamples, droppedExemplars, droppedHistograms int
if p.v1 != nil {
// Bit duplicated code, but any interface/generic would make it only more code.
highest, lowest, p.v1, droppedSeries, droppedSamples, droppedExemplars, droppedHistograms = prompb.FilterTimeSeries(p.v1, func(ts prompb.TimeSeries) bool {
if sampleAgeLimit == 0 {
// If sampleAgeLimit is unset, then we never skip samples due to their age.
return false
}
switch {
// The first element should be the olderst.
case len(ts.Samples) > 0:
if isTimestampTooOld(baseTime, sampleAgeLimit, ts.Samples[0].Timestamp) {
return true
}
case len(ts.Histograms) > 0:
if isTimestampTooOld(baseTime, sampleAgeLimit, ts.Histograms[0].Timestamp) {
return true
}
case len(ts.Exemplars) > 0:
if isTimestampTooOld(baseTime, sampleAgeLimit, ts.Exemplars[0].Timestamp) {
return true
}
}
return false
})
} else {
highest, lowest, p.v2, droppedSeries, droppedSamples, droppedExemplars, droppedHistograms = writev2.FilterTimeSeries(p.v2, func(ts writev2.TimeSeries) bool {
if sampleAgeLimit == 0 {
// If sampleAgeLimit is unset, then we never skip samples due to their age.
return false
}
switch {
// Only the first element should be set in the series, therefore we only check the first element.
case len(ts.Samples) > 0:
if isTimestampTooOld(baseTime, sampleAgeLimit, ts.Samples[0].Timestamp) {
return true
}
case len(ts.Histograms) > 0:
if isTimestampTooOld(baseTime, sampleAgeLimit, ts.Histograms[0].Timestamp) {
return true
}
case len(ts.Exemplars) > 0:
if isTimestampTooOld(baseTime, sampleAgeLimit, ts.Exemplars[0].Timestamp) {
return true
}
}
return false
})
}
metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Add(-1 * float64(droppedSamples))
metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Add(-1 * float64(droppedHistograms))
metrics.droppedExemplarsTotal.WithLabelValues(reasonTooOld).Add(-1 * float64(droppedExemplars))
if droppedSamples > 0 || droppedExemplars > 0 || droppedHistograms > 0 {
level.Debug(logger).Log("msg", "dropped data due to their age", "droppedSeries", droppedSeries, "droppedSamples", droppedSamples, "droppedExemplars", droppedExemplars, "droppedHistograms", droppedHistograms)
}
return highest, lowest
}
func (s *shards) updateMetrics(_ context.Context, err error, sampleCount, exemplarCount, histogramCount, metadataCount int, duration time.Duration) {
func (p *protoTimeSeriesBuffer) BuildCompressedRequest(enc Compression) ([]byte, error) {
p.pBuf.Reset()
if p.v1 != nil {
pMsg := &prompb.WriteRequest{
Timeseries: p.v1,
}
if err := p.pBuf.Marshal(pMsg); err != nil {
return nil, err
}
} else {
pMsg := &writev2.Request{
Symbols: p.v2Symbols.Symbols(),
Timeseries: p.v2,
}
tmpbuf, err := pMsg.OptimizedMarshal(p.pBuf.Bytes())
if err != nil {
return nil, err
}
p.pBuf.SetBuf(tmpbuf)
}
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
p.buf = (p.buf)[0:cap(p.buf)]
compressed, err := compressPayload(&p.buf, p.pBuf.Bytes(), enc)
return compressed, err
}
func (s *shards) updateMetrics(err error, sampleCount, exemplarCount, histogramCount, metadataCount int, duration time.Duration) {
if err != nil {
level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err)
s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount))
@ -1689,18 +1793,20 @@ func (s *shards) updateMetrics(_ context.Context, err error, sampleCount, exempl
}
// sendSamples to the remote storage with backoff for recoverable errors.
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte, enc Compression) error {
// Build the WriteRequest with no metadata.
req, highest, lowest, err := buildWriteRequest(s.qm.logger, samples, nil, pBuf, buf, nil, enc)
func (s *shards) sendSamplesWithBackoff(ctx context.Context, series *protoTimeSeriesBuffer, enc Compression) error {
// Don't filter, only obtain highest & lowest timestamps.
highest, lowest := series.FilterOutTooOldSamples(s.qm.logger, s.qm.metrics, time.Time{}, 0)
s.qm.buildRequestLimitTimestamp.Store(lowest)
// Build the request with no metadata.
reqBytes, err := series.BuildCompressedRequest(enc)
if err != nil {
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
return err
}
reqSize := len(req)
*buf = req
reqSize := len(reqBytes)
series.buf = reqBytes
// An anonymous function allows us to defer the completion of our per-try spans
// without causing a memory leak, and it has the nice effect of not propagating any
@ -1708,22 +1814,18 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
attemptStore := func(try int) error {
currentTime := time.Now()
lowest := s.qm.buildRequestLimitTimestamp.Load()
if isSampleOld(currentTime, time.Duration(s.qm.cfg.SampleAgeLimit), lowest) {
// This will filter out old samples during retries.
req, _, lowest, err := buildWriteRequest(
s.qm.logger,
samples,
nil,
pBuf,
buf,
isTimeSeriesOldFilter(s.qm.metrics, currentTime, time.Duration(s.qm.cfg.SampleAgeLimit)),
enc,
)
if isTimestampTooOld(currentTime, time.Duration(s.qm.cfg.SampleAgeLimit), lowest) {
// This will filter out too old samples during retries.
highest, lowest = series.FilterOutTooOldSamples(s.qm.logger, s.qm.metrics, currentTime, time.Duration(s.qm.cfg.SampleAgeLimit))
s.qm.buildRequestLimitTimestamp.Store(lowest)
reqBytes, err := series.BuildCompressedRequest(enc)
if err != nil {
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
return err
}
*buf = req
reqSize = len(reqBytes)
series.buf = reqBytes
}
ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch")
@ -1731,39 +1833,38 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
span.SetAttributes(
attribute.Int("request_size", reqSize),
attribute.Int("samples", sampleCount),
// TODO(bwplotka): This does not count dropped samples in the filter above.
// Is this on purpose? Given drop samples metric?
attribute.Int("samples", series.nPendingSamples),
attribute.Int("try", try),
attribute.String("remote_name", s.qm.storeClient.Name()),
attribute.String("remote_url", s.qm.storeClient.Endpoint()),
)
if exemplarCount > 0 {
span.SetAttributes(attribute.Int("exemplars", exemplarCount))
if series.nPendingExemplars > 0 {
span.SetAttributes(attribute.Int("exemplars", series.nPendingExemplars))
}
if histogramCount > 0 {
span.SetAttributes(attribute.Int("histograms", histogramCount))
if series.nPendingHistograms > 0 {
span.SetAttributes(attribute.Int("histograms", series.nPendingHistograms))
}
begin := time.Now()
s.qm.metrics.samplesTotal.Add(float64(sampleCount))
s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
s.qm.metrics.metadataTotal.Add(float64(metadataCount))
err := s.qm.client().Store(ctx, *buf, try)
s.qm.metrics.samplesTotal.Add(float64(series.nPendingSamples))
s.qm.metrics.exemplarsTotal.Add(float64(series.nPendingExemplars))
s.qm.metrics.histogramsTotal.Add(float64(series.nPendingHistograms))
err := s.qm.client().Store(ctx, series.buf, try)
s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
if err != nil {
span.RecordError(err)
return err
}
return nil
}
onRetry := func() {
s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount))
s.qm.metrics.retriedSamplesTotal.Add(float64(series.nPendingSamples))
s.qm.metrics.retriedExemplarsTotal.Add(float64(series.nPendingExemplars))
s.qm.metrics.retriedHistogramsTotal.Add(float64(series.nPendingHistograms))
}
err = s.qm.sendWriteRequestWithBackoff(ctx, attemptStore, onRetry)
@ -1779,148 +1880,6 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
return err
}
// sendV2Samples to the remote storage with backoff for recoverable errors.
func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte, enc Compression) error {
// Build the WriteRequest with no metadata.
req, highest, lowest, err := buildV2WriteRequest(s.qm.logger, samples, labels, pBuf, buf, nil, enc)
s.qm.buildRequestLimitTimestamp.Store(lowest)
if err != nil {
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
return err
}
reqSize := len(req)
*buf = req
// An anonymous function allows us to defer the completion of our per-try spans
// without causing a memory leak, and it has the nice effect of not propagating any
// parameters for sendSamplesWithBackoff/3.
attemptStore := func(try int) error {
currentTime := time.Now()
lowest := s.qm.buildRequestLimitTimestamp.Load()
if isSampleOld(currentTime, time.Duration(s.qm.cfg.SampleAgeLimit), lowest) {
// This will filter out old samples during retries.
req, _, lowest, err := buildV2WriteRequest(
s.qm.logger,
samples,
labels,
pBuf,
buf,
isV2TimeSeriesOldFilter(s.qm.metrics, currentTime, time.Duration(s.qm.cfg.SampleAgeLimit)),
enc,
)
s.qm.buildRequestLimitTimestamp.Store(lowest)
if err != nil {
return err
}
*buf = req
}
ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch")
defer span.End()
span.SetAttributes(
attribute.Int("request_size", reqSize),
attribute.Int("samples", sampleCount),
attribute.Int("try", try),
attribute.String("remote_name", s.qm.storeClient.Name()),
attribute.String("remote_url", s.qm.storeClient.Endpoint()),
)
if exemplarCount > 0 {
span.SetAttributes(attribute.Int("exemplars", exemplarCount))
}
if histogramCount > 0 {
span.SetAttributes(attribute.Int("histograms", histogramCount))
}
begin := time.Now()
s.qm.metrics.samplesTotal.Add(float64(sampleCount))
s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
s.qm.metrics.metadataTotal.Add(float64(metadataCount))
err := s.qm.client().Store(ctx, *buf, try)
s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
if err != nil {
span.RecordError(err)
return err
}
return nil
}
onRetry := func() {
s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount))
}
err = s.qm.sendWriteRequestWithBackoff(ctx, attemptStore, onRetry)
if errors.Is(err, context.Canceled) {
// When there is resharding, we cancel the context for this queue, which means the data is not sent.
// So we exit early to not update the metrics.
return err
}
s.qm.metrics.sentBytesTotal.Add(float64(reqSize))
s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000))
return err
}
func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries, pendingData []writev2.TimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata int
for nPending, d := range batch {
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
// todo: should we also safeguard against empty metadata here?
if d.metadata != nil {
pendingData[nPending].Metadata.Type = metricTypeToMetricTypeProtoV2(d.metadata.Type)
pendingData[nPending].Metadata.HelpRef = symbolTable.Symbolize(d.metadata.Help)
pendingData[nPending].Metadata.HelpRef = symbolTable.Symbolize(d.metadata.Unit)
nPendingMetadata++
}
if sendExemplars {
pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
}
if sendNativeHistograms {
pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
}
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
// stop reading from the queue. This makes it safe to reference pendingSamples by index.
pendingData[nPending].LabelsRefs = symbolTable.SymbolizeLabels(d.seriesLabels, pendingData[nPending].LabelsRefs)
switch d.sType {
case tSample:
pendingData[nPending].Samples = append(pendingData[nPending].Samples, writev2.Sample{
Value: d.value,
Timestamp: d.timestamp,
})
nPendingSamples++
case tExemplar:
pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, writev2.Exemplar{
LabelsRefs: symbolTable.SymbolizeLabels(d.exemplarLabels, nil), // TODO: optimize, reuse slice
Value: d.value,
Timestamp: d.timestamp,
})
nPendingExemplars++
case tHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToV2HistogramProto(d.timestamp, d.histogram))
nPendingHistograms++
case tFloatHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToV2HistogramProto(d.timestamp, d.floatHistogram))
nPendingHistograms++
case tMetadata:
// TODO: log or return an error?
// we shouldn't receive metadata type data here, it should already be inserted into the timeSeries
}
}
return nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata
}
func (t *QueueManager) sendWriteRequestWithBackoff(ctx context.Context, attempt func(int) error, onRetry func()) error {
backoff := t.cfg.MinBackoff
sleepDuration := model.Duration(0)
@ -2010,58 +1969,6 @@ func setAtomicToNewer(value *atomic.Int64, newValue int64) (previous int64, upda
}
}
func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeries) bool) (int64, int64, []prompb.TimeSeries, int, int, int) {
var highest int64
var lowest int64
var droppedSamples, droppedExemplars, droppedHistograms int
keepIdx := 0
lowest = math.MaxInt64
for i, ts := range timeSeries {
if filter != nil && filter(ts) {
if len(ts.Samples) > 0 {
droppedSamples++
}
if len(ts.Exemplars) > 0 {
droppedExemplars++
}
if len(ts.Histograms) > 0 {
droppedHistograms++
}
continue
}
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
highest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
highest = ts.Exemplars[0].Timestamp
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
highest = ts.Histograms[0].Timestamp
}
// Get lowest timestamp
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp < lowest {
lowest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp < lowest {
lowest = ts.Exemplars[0].Timestamp
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp < lowest {
lowest = ts.Histograms[0].Timestamp
}
// Move the current element to the write position and increment the write pointer
timeSeries[keepIdx] = timeSeries[i]
keepIdx++
}
timeSeries = timeSeries[:keepIdx]
return highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms
}
func compressPayload(tmpbuf *[]byte, inp []byte, enc Compression) (compressed []byte, _ error) {
switch enc {
case SnappyBlockCompression:
@ -2075,130 +1982,3 @@ func compressPayload(tmpbuf *[]byte, inp []byte, enc Compression) (compressed []
return compressed, fmt.Errorf("Unknown compression scheme [%v]", enc)
}
}
func buildWriteRequest(logger log.Logger, timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf *[]byte, filter func(prompb.TimeSeries) bool, enc Compression) (compressed []byte, highest, lowest int64, _ error) {
highest, lowest, timeSeries,
droppedSamples, droppedExemplars, droppedHistograms := buildTimeSeries(timeSeries, filter)
if droppedSamples > 0 || droppedExemplars > 0 || droppedHistograms > 0 {
level.Debug(logger).Log("msg", "dropped data due to their age", "droppedSamples", droppedSamples, "droppedExemplars", droppedExemplars, "droppedHistograms", droppedHistograms)
}
req := &prompb.WriteRequest{
Timeseries: timeSeries,
Metadata: metadata,
}
if pBuf == nil {
pBuf = proto.NewBuffer(nil) // For convenience in tests. Not efficient.
} else {
pBuf.Reset()
}
err := pBuf.Marshal(req)
if err != nil {
return nil, highest, lowest, err
}
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
if buf != nil {
*buf = (*buf)[0:cap(*buf)]
} else {
buf = &[]byte{}
}
compressed, err = compressPayload(buf, pBuf.Bytes(), enc)
if err != nil {
return nil, highest, lowest, err
}
return compressed, highest, lowest, nil
}
func buildV2WriteRequest(logger log.Logger, samples []writev2.TimeSeries, labels []string, pBuf, buf *[]byte, filter func(writev2.TimeSeries) bool, enc Compression) (compressed []byte, highest, lowest int64, _ error) {
highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms := buildV2TimeSeries(samples, filter)
if droppedSamples > 0 || droppedExemplars > 0 || droppedHistograms > 0 {
level.Debug(logger).Log("msg", "dropped data due to their age", "droppedSamples", droppedSamples, "droppedExemplars", droppedExemplars, "droppedHistograms", droppedHistograms)
}
req := &writev2.Request{
Symbols: labels,
Timeseries: timeSeries,
}
if pBuf == nil {
pBuf = &[]byte{} // For convenience in tests. Not efficient.
}
data, err := req.OptimizedMarshal(*pBuf)
if err != nil {
return nil, highest, lowest, err
}
*pBuf = data
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
if buf != nil {
*buf = (*buf)[0:cap(*buf)]
} else {
buf = &[]byte{}
}
compressed, err = compressPayload(buf, data, enc)
if err != nil {
return nil, highest, lowest, err
}
return compressed, highest, lowest, nil
}
func buildV2TimeSeries(timeSeries []writev2.TimeSeries, filter func(writev2.TimeSeries) bool) (int64, int64, []writev2.TimeSeries, int, int, int) {
var highest int64
var lowest int64
var droppedSamples, droppedExemplars, droppedHistograms int
keepIdx := 0
lowest = math.MaxInt64
for i, ts := range timeSeries {
if filter != nil && filter(ts) {
if len(ts.Samples) > 0 {
droppedSamples++
}
if len(ts.Exemplars) > 0 {
droppedExemplars++
}
if len(ts.Histograms) > 0 {
droppedHistograms++
}
continue
}
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
highest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
highest = ts.Exemplars[0].Timestamp
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
highest = ts.Histograms[0].Timestamp
}
// Get the lowest timestamp.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp < lowest {
lowest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp < lowest {
lowest = ts.Exemplars[0].Timestamp
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp < lowest {
lowest = ts.Histograms[0].Timestamp
}
// Move the current element to the write position and increment the write pointer
timeSeries[keepIdx] = timeSeries[i]
keepIdx++
}
timeSeries = timeSeries[:keepIdx]
return highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms
}

View file

@ -1034,7 +1034,7 @@ func (c *TestWriteClient) expectHistograms(hh []record.RefHistogramSample, serie
for _, h := range hh {
seriesName := getSeriesNameFromRef(series[h.Ref])
c.expectedHistograms[seriesName] = append(c.expectedHistograms[seriesName], HistogramToHistogramProto(h.T, h.H))
c.expectedHistograms[seriesName] = append(c.expectedHistograms[seriesName], prompb.FromIntHistogram(h.T, h.H))
}
}
@ -1047,7 +1047,7 @@ func (c *TestWriteClient) expectFloatHistograms(fhs []record.RefFloatHistogramSa
for _, fh := range fhs {
seriesName := getSeriesNameFromRef(series[fh.Ref])
c.expectedFloatHistograms[seriesName] = append(c.expectedFloatHistograms[seriesName], FloatHistogramToHistogramProto(fh.T, fh.FH))
c.expectedFloatHistograms[seriesName] = append(c.expectedFloatHistograms[seriesName], prompb.FromFloatHistogram(fh.T, fh.FH))
}
}
@ -1134,7 +1134,7 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, attemptNos int) e
var reqProtoV2 writev2.Request
err = proto.Unmarshal(reqBuf, &reqProtoV2)
if err == nil {
reqProto, err = V2WriteRequestToWriteRequest(&reqProtoV2)
reqProto, err = v2WriteRequestToWriteRequest(&reqProtoV2)
}
}
if err != nil {
@ -1170,6 +1170,78 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, attemptNos int) e
return nil
}
func v2WriteRequestToWriteRequest(reqV2 *writev2.Request) (*prompb.WriteRequest, error) {
req := &prompb.WriteRequest{
Timeseries: make([]prompb.TimeSeries, len(reqV2.Timeseries)),
// TODO handle metadata?
}
for i, rts := range reqV2.Timeseries {
b := labels.NewScratchBuilder(len(rts.LabelsRefs))
writev2.DesymbolizeLabels(&b, rts.LabelsRefs, reqV2.Symbols).Range(func(l labels.Label) {
req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{
Name: l.Name,
Value: l.Value,
})
})
exemplars := make([]prompb.Exemplar, len(rts.Exemplars))
for j, e := range rts.Exemplars {
exemplars[j].Value = e.Value
exemplars[j].Timestamp = e.Timestamp
b := labels.NewScratchBuilder(len(rts.LabelsRefs))
writev2.DesymbolizeLabels(&b, e.LabelsRefs, reqV2.Symbols).Range(func(l labels.Label) {
exemplars[j].Labels = append(exemplars[j].Labels, prompb.Label{
Name: l.Name,
Value: l.Value,
})
})
}
req.Timeseries[i].Exemplars = exemplars
req.Timeseries[i].Samples = make([]prompb.Sample, len(rts.Samples))
for j, s := range rts.Samples {
req.Timeseries[i].Samples[j].Timestamp = s.Timestamp
req.Timeseries[i].Samples[j].Value = s.Value
}
req.Timeseries[i].Histograms = make([]prompb.Histogram, len(rts.Histograms))
for j, h := range rts.Histograms {
if h.IsFloatHistogram() {
req.Timeseries[i].Histograms[j].Count = &prompb.Histogram_CountFloat{CountFloat: h.GetCountFloat()}
req.Timeseries[i].Histograms[j].ZeroCount = &prompb.Histogram_ZeroCountFloat{ZeroCountFloat: h.GetZeroCountFloat()}
} else {
req.Timeseries[i].Histograms[j].Count = &prompb.Histogram_CountInt{CountInt: h.GetCountInt()}
req.Timeseries[i].Histograms[j].ZeroCount = &prompb.Histogram_ZeroCountInt{ZeroCountInt: h.GetZeroCountInt()}
}
for _, span := range h.NegativeSpans {
req.Timeseries[i].Histograms[j].NegativeSpans = append(req.Timeseries[i].Histograms[j].NegativeSpans, prompb.BucketSpan{
Offset: span.Offset,
Length: span.Length,
})
}
for _, span := range h.PositiveSpans {
req.Timeseries[i].Histograms[j].PositiveSpans = append(req.Timeseries[i].Histograms[j].PositiveSpans, prompb.BucketSpan{
Offset: span.Offset,
Length: span.Length,
})
}
req.Timeseries[i].Histograms[j].Sum = h.Sum
req.Timeseries[i].Histograms[j].Schema = h.Schema
req.Timeseries[i].Histograms[j].ZeroThreshold = h.ZeroThreshold
req.Timeseries[i].Histograms[j].NegativeDeltas = h.NegativeDeltas
req.Timeseries[i].Histograms[j].NegativeCounts = h.NegativeCounts
req.Timeseries[i].Histograms[j].PositiveDeltas = h.PositiveDeltas
req.Timeseries[i].Histograms[j].PositiveCounts = h.PositiveCounts
req.Timeseries[i].Histograms[j].ResetHint = prompb.Histogram_ResetHint(h.ResetHint)
req.Timeseries[i].Histograms[j].Timestamp = h.Timestamp
}
}
return req, nil
}
func (c *TestWriteClient) Name() string {
return "testwriteclient"
}
@ -1805,95 +1877,95 @@ func createDummyTimeSeries(instances int) []timeSeries {
return result
}
func BenchmarkBuildWriteRequest(b *testing.B) {
noopLogger := log.NewNopLogger()
bench := func(b *testing.B, batch []timeSeries) {
buff := make([]byte, 0)
seriesBuff := make([]prompb.TimeSeries, len(batch))
for i := range seriesBuff {
seriesBuff[i].Samples = []prompb.Sample{{}}
seriesBuff[i].Exemplars = []prompb.Exemplar{{}}
}
pBuf := proto.NewBuffer(nil)
// Warmup buffers
for i := 0; i < 10; i++ {
populateTimeSeries(batch, seriesBuff, true, true)
buildWriteRequest(noopLogger, seriesBuff, nil, pBuf, &buff, nil, "snappy")
}
b.ResetTimer()
totalSize := 0
for i := 0; i < b.N; i++ {
populateTimeSeries(batch, seriesBuff, true, true)
req, _, _, err := buildWriteRequest(noopLogger, seriesBuff, nil, pBuf, &buff, nil, "snappy")
if err != nil {
b.Fatal(err)
}
totalSize += len(req)
b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
}
}
twoBatch := createDummyTimeSeries(2)
tenBatch := createDummyTimeSeries(10)
hundredBatch := createDummyTimeSeries(100)
b.Run("2 instances", func(b *testing.B) {
bench(b, twoBatch)
})
b.Run("10 instances", func(b *testing.B) {
bench(b, tenBatch)
})
b.Run("1k instances", func(b *testing.B) {
bench(b, hundredBatch)
})
}
func BenchmarkBuildV2WriteRequest(b *testing.B) {
noopLogger := log.NewNopLogger()
type testcase struct {
batch []timeSeries
}
testCases := []testcase{
{createDummyTimeSeries(2)},
{createDummyTimeSeries(10)},
{createDummyTimeSeries(100)},
}
for _, tc := range testCases {
symbolTable := writev2.NewSymbolTable()
buff := make([]byte, 0)
seriesBuff := make([]writev2.TimeSeries, len(tc.batch))
for i := range seriesBuff {
seriesBuff[i].Samples = []writev2.Sample{{}}
seriesBuff[i].Exemplars = []writev2.Exemplar{{}}
}
pBuf := []byte{}
// Warmup buffers
for i := 0; i < 10; i++ {
populateV2TimeSeries(&symbolTable, tc.batch, seriesBuff, true, true)
buildV2WriteRequest(noopLogger, seriesBuff, symbolTable.Symbols(), &pBuf, &buff, nil, "snappy")
}
b.Run(fmt.Sprintf("%d-instances", len(tc.batch)), func(b *testing.B) {
totalSize := 0
for j := 0; j < b.N; j++ {
populateV2TimeSeries(&symbolTable, tc.batch, seriesBuff, true, true)
b.ResetTimer()
req, _, _, err := buildV2WriteRequest(noopLogger, seriesBuff, symbolTable.Symbols(), &pBuf, &buff, nil, "snappy")
if err != nil {
b.Fatal(err)
}
symbolTable.Reset()
totalSize += len(req)
b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
}
})
}
}
//func BenchmarkBuildWriteRequest(b *testing.B) {
// noopLogger := log.NewNopLogger()
// bench := func(b *testing.B, batch []timeSeries) {
// buff := make([]byte, 0)
// seriesBuff := make([]prompb.TimeSeries, len(batch))
// for i := range seriesBuff {
// seriesBuff[i].Samples = []prompb.Sample{{}}
// seriesBuff[i].Exemplars = []prompb.Exemplar{{}}
// }
// pBuf := proto.NewBuffer(nil)
//
// // Warmup buffers
// for i := 0; i < 10; i++ {
// populateTimeSeries(batch, seriesBuff, true, true)
// buildWriteRequest(noopLogger, seriesBuff, nil, pBuf, &buff, nil, "snappy")
// }
//
// b.ResetTimer()
// totalSize := 0
// for i := 0; i < b.N; i++ {
// populateTimeSeries(batch, seriesBuff, true, true)
// req, _, _, err := buildWriteRequest(noopLogger, seriesBuff, nil, pBuf, &buff, nil, "snappy")
// if err != nil {
// b.Fatal(err)
// }
// totalSize += len(req)
// b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
// }
// }
//
// twoBatch := createDummyTimeSeries(2)
// tenBatch := createDummyTimeSeries(10)
// hundredBatch := createDummyTimeSeries(100)
//
// b.Run("2 instances", func(b *testing.B) {
// bench(b, twoBatch)
// })
//
// b.Run("10 instances", func(b *testing.B) {
// bench(b, tenBatch)
// })
//
// b.Run("1k instances", func(b *testing.B) {
// bench(b, hundredBatch)
// })
//}
//
//func BenchmarkBuildV2WriteRequest(b *testing.B) {
// noopLogger := log.NewNopLogger()
// type testcase struct {
// batch []timeSeries
// }
// testCases := []testcase{
// {createDummyTimeSeries(2)},
// {createDummyTimeSeries(10)},
// {createDummyTimeSeries(100)},
// }
// for _, tc := range testCases {
// symbolTable := writev2.NewSymbolTable()
// buff := make([]byte, 0)
// seriesBuff := make([]writev2.TimeSeries, len(tc.batch))
// for i := range seriesBuff {
// seriesBuff[i].Samples = []writev2.Sample{{}}
// seriesBuff[i].Exemplars = []writev2.Exemplar{{}}
// }
// pBuf := []byte{}
//
// // Warmup buffers
// for i := 0; i < 10; i++ {
// populateV2TimeSeries(&symbolTable, tc.batch, seriesBuff, true, true)
// buildV2WriteRequest(noopLogger, seriesBuff, symbolTable.Symbols(), &pBuf, &buff, nil, "snappy")
// }
//
// b.Run(fmt.Sprintf("%d-instances", len(tc.batch)), func(b *testing.B) {
// totalSize := 0
// for j := 0; j < b.N; j++ {
// populateV2TimeSeries(&symbolTable, tc.batch, seriesBuff, true, true)
// b.ResetTimer()
// req, _, _, err := buildV2WriteRequest(noopLogger, seriesBuff, symbolTable.Symbols(), &pBuf, &buff, nil, "snappy")
// if err != nil {
// b.Fatal(err)
// }
// symbolTable.Reset()
// totalSize += len(req)
// b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
// }
// })
// }
//}
func TestDropOldTimeSeries(t *testing.T) {
size := 10
@ -1924,8 +1996,8 @@ func TestDropOldTimeSeries(t *testing.T) {
func TestIsSampleOld(t *testing.T) {
currentTime := time.Now()
require.True(t, isSampleOld(currentTime, 60*time.Second, timestamp.FromTime(currentTime.Add(-61*time.Second))))
require.False(t, isSampleOld(currentTime, 60*time.Second, timestamp.FromTime(currentTime.Add(-59*time.Second))))
require.True(t, isTimestampTooOld(currentTime, 60*time.Second, timestamp.FromTime(currentTime.Add(-61*time.Second))))
require.False(t, isTimestampTooOld(currentTime, 60*time.Second, timestamp.FromTime(currentTime.Add(-59*time.Second))))
}
func createTimeseriesWithOldSamples(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSample, []record.RefSeries) {
@ -1976,7 +2048,7 @@ func TestBuildTimeSeries(t *testing.T) {
testCases := []struct {
name string
ts []prompb.TimeSeries
filter func(ts prompb.TimeSeries) bool
tooOldTs int64
lowestTs int64
highestTs int64
droppedSamples int
@ -2010,7 +2082,6 @@ func TestBuildTimeSeries(t *testing.T) {
},
},
},
filter: nil,
responseLen: 3,
lowestTs: 1234567890,
highestTs: 1234567892,
@ -2051,7 +2122,7 @@ func TestBuildTimeSeries(t *testing.T) {
},
},
},
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567892, ts) },
tooOldTs: 1234567892,
responseLen: 2,
lowestTs: 1234567892,
highestTs: 1234567893,
@ -2093,7 +2164,7 @@ func TestBuildTimeSeries(t *testing.T) {
},
},
},
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567892, ts) },
tooOldTs: 1234567892,
responseLen: 2,
lowestTs: 1234567892,
highestTs: 1234567893,
@ -2135,7 +2206,7 @@ func TestBuildTimeSeries(t *testing.T) {
},
},
},
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567895, ts) },
tooOldTs: 1234567895,
responseLen: 2,
lowestTs: 1234567895,
highestTs: 1234567897,
@ -2146,12 +2217,25 @@ func TestBuildTimeSeries(t *testing.T) {
// Run the test cases
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
highest, lowest, result, droppedSamples, _, _ := buildTimeSeries(tc.ts, tc.filter)
require.NotNil(t, result)
require.Len(t, result, tc.responseLen)
series := newProtoTimeSeriesBuffer(config.RemoteWriteProtoMsgV1, 0, true, true)
series.v1 = tc.ts
metrics := &queueManagerMetrics{}
baseTime := time.Time{}
sampleAgeLimit := time.Duration(0)
if tc.tooOldTs != 0 {
baseTime = timestamp.Time(tc.tooOldTs + 1)
sampleAgeLimit = 1 * time.Millisecond
}
highest, lowest := series.FilterOutTooOldSamples(log.NewNopLogger(), metrics, baseTime, sampleAgeLimit)
require.Equal(t, tc.highestTs, highest)
require.Equal(t, tc.lowestTs, lowest)
require.Equal(t, tc.droppedSamples, droppedSamples)
require.NotNil(t, series.v1)
require.Len(t, series.v1, tc.responseLen)
require.Equal(t, tc.droppedSamples, client_testutil.ToFloat64(metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld)))
})
}
}

View file

@ -27,11 +27,12 @@ import (
"github.com/golang/snappy"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/storage"
otlptranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
)
@ -148,28 +149,27 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
req := &writeRequest{}
// Now we have a decompressed buffer we can unmarshal it.
switch msg {
case config.RemoteWriteProtoMsgV1:
var req prompb.WriteRequest
if err := proto.Unmarshal(decompressed, &req); err != nil {
if err := proto.Unmarshal(decompressed, req.v1); err != nil {
// TODO(bwplotka): Add more context to responded error?
level.Error(h.logger).Log("msg", "Error decoding remote write request", "protobuf_message", msg, "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
err = h.write(r.Context(), &req)
case config.RemoteWriteProtoMsgV2:
var req writev2.Request
if err := proto.Unmarshal(decompressed, &req); err != nil {
if err := proto.Unmarshal(decompressed, req.v2); err != nil {
// TODO(bwplotka): Add more context to responded error?
level.Error(h.logger).Log("msg", "Error decoding remote write request", "protobuf_message", msg, "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
err = h.writeV2(r.Context(), &req)
}
err = h.write(r.Context(), req)
switch {
case err == nil:
case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrOutOfBounds), errors.Is(err, storage.ErrDuplicateSampleForTimestamp), errors.Is(err, storage.ErrTooOldSample):
@ -203,7 +203,7 @@ func (h *writeHandler) checkAppendExemplarError(err error, e exemplar.Exemplar,
}
}
func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err error) {
func (h *writeHandler) write(ctx context.Context, req *writeRequest) (err error) {
outOfOrderExemplarErrs := 0
samplesWithInvalidLabels := 0
@ -217,28 +217,68 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
}()
b := labels.NewScratchBuilder(0)
for _, ts := range req.Timeseries {
ls := labelProtosToLabels(&b, ts.Labels)
if err := req.ForeachTimeseries(func(ts *writeTimeSeries) error {
ls := ts.Labels(&b)
if !ls.IsValid() {
level.Warn(h.logger).Log("msg", "Invalid metric names or labels", "got", ls.String())
samplesWithInvalidLabels++
continue
return nil
}
err := h.appendSamples(app, ts.Samples, ls)
if err != nil {
var ref storage.SeriesRef
if err := ts.ForeachSample(func(s writeSample) error {
ref, err = app.Append(ref, ls, s.GetTimestamp(), s.GetValue())
if err != nil {
unwrappedErr := errors.Unwrap(err)
if unwrappedErr == nil {
unwrappedErr = err
}
if errors.Is(err, storage.ErrOutOfOrderSample) || errors.Is(unwrappedErr, storage.ErrOutOfBounds) || errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp) {
level.Error(h.logger).Log("msg", "Out of order sample from remote write", "err", err.Error(), "series", ls.String(), "timestamp", s.GetTimestamp())
}
return err
}
return nil
}); err != nil {
return err
}
for _, ep := range ts.Exemplars {
e := exemplarProtoToExemplar(&b, ep)
h.appendExemplar(app, e, ls, &outOfOrderExemplarErrs)
}
ts.ForeachExemplar(&b, func(e exemplar.Exemplar) {
_, err := app.AppendExemplar(0, ls, e)
err = h.checkAppendExemplarError(err, e, &outOfOrderExemplarErrs)
if err != nil {
// Since exemplar storage is still experimental, we don't fail the request on ingestion errors
level.Debug(h.logger).Log("msg", "Error while adding exemplar in AddExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", err)
}
})
err = h.appendHistograms(app, ts.Histograms, ls)
if err != nil {
if err := ts.ForeachHistogram(func(ts int64, hs *histogram.Histogram, fhs *histogram.FloatHistogram) error {
if _, err = app.AppendHistogram(0, ls, ts, hs, fhs); err != nil {
unwrappedErr := errors.Unwrap(err)
if unwrappedErr == nil {
unwrappedErr = err
}
// Although AppendHistogram does not currently return ErrDuplicateSampleForTimestamp there is
// a note indicating its inclusion in the future.
if errors.Is(unwrappedErr, storage.ErrOutOfOrderSample) || errors.Is(unwrappedErr, storage.ErrOutOfBounds) || errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp) {
level.Error(h.logger).Log("msg", "Out of order histogram from remote write", "err", err.Error(), "series", ls.String(), "timestamp", ts)
}
return err
}
return nil
}); err != nil {
return err
}
// Update metadata, but only for PRW 2.0.
if m, isV2 := ts.Metadata(); isV2 {
if _, err = app.UpdateMetadata(0, ls, m); err != nil {
level.Debug(h.logger).Log("msg", "error while updating metadata from remote write", "err", err)
}
}
return nil
}); err != nil {
return err
}
if outOfOrderExemplarErrs > 0 {
@ -251,148 +291,6 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
return nil
}
func (h *writeHandler) writeV2(ctx context.Context, req *writev2.Request) (err error) {
outOfOrderExemplarErrs := 0
app := h.appendable.Appender(ctx)
defer func() {
if err != nil {
_ = app.Rollback()
return
}
err = app.Commit()
}()
for _, ts := range req.Timeseries {
ls := writev2.DesymbolizeLabels(ts.LabelsRefs, req.Symbols)
err := h.appendSamplesV2(app, ts.Samples, ls)
if err != nil {
return err
}
for _, ep := range ts.Exemplars {
e := exemplarProtoV2ToExemplar(ep, req.Symbols)
h.appendExemplar(app, e, ls, &outOfOrderExemplarErrs)
}
err = h.appendHistogramsV2(app, ts.Histograms, ls)
if err != nil {
return err
}
m := metadataProtoV2ToMetadata(ts.Metadata, req.Symbols)
if _, err = app.UpdateMetadata(0, ls, m); err != nil {
level.Debug(h.logger).Log("msg", "error while updating metadata from remote write", "err", err)
}
}
if outOfOrderExemplarErrs > 0 {
_ = level.Warn(h.logger).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", outOfOrderExemplarErrs)
}
return nil
}
func (h *writeHandler) appendExemplar(app storage.Appender, e exemplar.Exemplar, labels labels.Labels, outOfOrderExemplarErrs *int) {
_, err := app.AppendExemplar(0, labels, e)
err = h.checkAppendExemplarError(err, e, outOfOrderExemplarErrs)
if err != nil {
// Since exemplar storage is still experimental, we don't fail the request on ingestion errors
level.Debug(h.logger).Log("msg", "Error while adding exemplar in AddExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", err)
}
}
func (h *writeHandler) appendSamples(app storage.Appender, ss []prompb.Sample, labels labels.Labels) error {
var ref storage.SeriesRef
var err error
for _, s := range ss {
ref, err = app.Append(ref, labels, s.GetTimestamp(), s.GetValue())
if err != nil {
unwrappedErr := errors.Unwrap(err)
if unwrappedErr == nil {
unwrappedErr = err
}
if errors.Is(err, storage.ErrOutOfOrderSample) || errors.Is(unwrappedErr, storage.ErrOutOfBounds) || errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp) {
level.Error(h.logger).Log("msg", "Out of order sample from remote write", "err", err.Error(), "series", labels.String(), "timestamp", s.Timestamp)
}
return err
}
}
return nil
}
func (h *writeHandler) appendSamplesV2(app storage.Appender, ss []writev2.Sample, labels labels.Labels) error {
var ref storage.SeriesRef
var err error
for _, s := range ss {
ref, err = app.Append(ref, labels, s.GetTimestamp(), s.GetValue())
if err != nil {
unwrappedErr := errors.Unwrap(err)
if unwrappedErr == nil {
unwrappedErr = err
}
if errors.Is(err, storage.ErrOutOfOrderSample) || errors.Is(unwrappedErr, storage.ErrOutOfBounds) || errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp) {
level.Error(h.logger).Log("msg", "Out of order sample from remote write", "err", err.Error(), "series", labels.String(), "timestamp", s.Timestamp)
}
return err
}
}
return nil
}
func (h *writeHandler) appendHistograms(app storage.Appender, hh []prompb.Histogram, labels labels.Labels) error {
var err error
for _, hp := range hh {
if hp.IsFloatHistogram() {
fhs := FloatHistogramProtoToFloatHistogram(hp)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, fhs)
} else {
hs := HistogramProtoToHistogram(hp)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, hs, nil)
}
if err != nil {
unwrappedErr := errors.Unwrap(err)
if unwrappedErr == nil {
unwrappedErr = err
}
// Although AppendHistogram does not currently return ErrDuplicateSampleForTimestamp there is
// a note indicating its inclusion in the future.
if errors.Is(unwrappedErr, storage.ErrOutOfOrderSample) || errors.Is(unwrappedErr, storage.ErrOutOfBounds) || errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp) {
level.Error(h.logger).Log("msg", "Out of order histogram from remote write", "err", err.Error(), "series", labels.String(), "timestamp", hp.Timestamp)
}
return err
}
}
return nil
}
func (h *writeHandler) appendHistogramsV2(app storage.Appender, hh []writev2.Histogram, labels labels.Labels) error {
var err error
for _, hp := range hh {
if hp.IsFloatHistogram() {
fhs := FloatV2HistogramProtoToFloatHistogram(hp)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, fhs)
} else {
hs := V2HistogramProtoToHistogram(hp)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, hs, nil)
}
if err != nil {
unwrappedErr := errors.Unwrap(err)
if unwrappedErr == nil {
unwrappedErr = err
}
// Although AppendHistogram does not currently return ErrDuplicateSampleForTimestamp there is
// a note indicating its inclusion in the future.
if errors.Is(unwrappedErr, storage.ErrOutOfOrderSample) || errors.Is(unwrappedErr, storage.ErrOutOfBounds) || errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp) {
level.Error(h.logger).Log("msg", "Out of order histogram from remote write", "err", err.Error(), "series", labels.String(), "timestamp", hp.Timestamp)
}
return err
}
}
return nil
}
// NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and
// writes them to the provided appendable.
func NewOTLPWriteHandler(logger log.Logger, appendable storage.Appendable) http.Handler {
@ -433,9 +331,9 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
prwMetrics = append(prwMetrics, *ts)
}
err = h.rwHandler.write(r.Context(), &prompb.WriteRequest{
err = h.rwHandler.write(r.Context(), &writeRequest{v1: &prompb.WriteRequest{
Timeseries: prwMetrics,
})
}})
switch {
case err == nil: