Sparse histogram remote-write support (#11001)

This commit is contained in:
Levi Harrison 2022-07-14 09:13:12 -04:00 committed by GitHub
parent 53982c3562
commit 08f3ddb864
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 600 additions and 167 deletions

View file

@ -748,12 +748,13 @@ func CheckTargetAddress(address model.LabelValue) error {
// RemoteWriteConfig is the configuration for writing to remote storage. // RemoteWriteConfig is the configuration for writing to remote storage.
type RemoteWriteConfig struct { type RemoteWriteConfig struct {
URL *config.URL `yaml:"url"` URL *config.URL `yaml:"url"`
RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"` RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"`
Headers map[string]string `yaml:"headers,omitempty"` Headers map[string]string `yaml:"headers,omitempty"`
WriteRelabelConfigs []*relabel.Config `yaml:"write_relabel_configs,omitempty"` WriteRelabelConfigs []*relabel.Config `yaml:"write_relabel_configs,omitempty"`
Name string `yaml:"name,omitempty"` Name string `yaml:"name,omitempty"`
SendExemplars bool `yaml:"send_exemplars,omitempty"` SendExemplars bool `yaml:"send_exemplars,omitempty"`
SendNativeHistograms bool `yaml:"send_native_histograms,omitempty"`
// We cannot do proper Go type embedding below as the parser will then parse // We cannot do proper Go type embedding below as the parser will then parse
// values arbitrarily into the overflow maps of further-down types. // values arbitrarily into the overflow maps of further-down types.

View file

@ -3045,6 +3045,9 @@ write_relabel_configs:
# Enables sending of exemplars over remote write. Note that exemplar storage itself must be enabled for exemplars to be scraped in the first place. # Enables sending of exemplars over remote write. Note that exemplar storage itself must be enabled for exemplars to be scraped in the first place.
[ send_exemplars: <boolean> | default = false ] [ send_exemplars: <boolean> | default = false ]
# Enables sending of native histograms, also known as sparse histograms, over remote write.
[ send_native_histograms: <boolean> | default = false ]
# Sets the `Authorization` header on every remote write request with the # Sets the `Authorization` header on every remote write request with the
# configured username and password. # configured username and password.
# password and password_file are mutually exclusive. # password and password_file are mutually exclusive.

View file

@ -49,6 +49,11 @@ func main() {
} }
fmt.Printf("\tExemplar: %+v %f %d\n", m, e.Value, e.Timestamp) fmt.Printf("\tExemplar: %+v %f %d\n", m, e.Value, e.Timestamp)
} }
for _, hp := range ts.Histograms {
h := remote.HistogramProtoToHistogram(hp)
fmt.Printf("\tHistogram: %s\n", (&h).String())
}
} }
}) })

View file

@ -16,7 +16,7 @@ require (
require ( require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/aws/aws-sdk-go v1.44.20 // indirect github.com/aws/aws-sdk-go v1.44.47 // indirect
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
@ -46,9 +46,9 @@ require (
go.uber.org/goleak v1.1.12 // indirect go.uber.org/goleak v1.1.12 // indirect
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect
golang.org/x/sys v0.0.0-20220624220833-87e55d714810 // indirect golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b // indirect
golang.org/x/text v0.3.7 // indirect golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
google.golang.org/appengine v1.6.7 // indirect google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220628213854-d9e0b6570c03 // indirect google.golang.org/genproto v0.0.0-20220628213854-d9e0b6570c03 // indirect
@ -58,7 +58,7 @@ require (
) )
require ( require (
github.com/prometheus/prometheus v0.36.2 github.com/prometheus/prometheus v0.37.0-rc.0.0.20220713192720-53982c3562b0
golang.org/x/oauth2 v0.0.0-20220630143837-2104d58473e0 // indirect golang.org/x/oauth2 v0.0.0-20220630143837-2104d58473e0 // indirect
) )

View file

@ -174,6 +174,7 @@ github.com/aws/aws-sdk-go v1.38.35/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2z
github.com/aws/aws-sdk-go v1.43.11/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= github.com/aws/aws-sdk-go v1.43.11/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/aws/aws-sdk-go v1.44.20 h1:nllTRN24EfhDSeKsNbIc6HoC8Ogd2NCJTRB8l84kDlM= github.com/aws/aws-sdk-go v1.44.20 h1:nllTRN24EfhDSeKsNbIc6HoC8Ogd2NCJTRB8l84kDlM=
github.com/aws/aws-sdk-go v1.44.20/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= github.com/aws/aws-sdk-go v1.44.20/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/aws/aws-sdk-go v1.44.47/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/aws/aws-sdk-go-v2 v1.11.0/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ= github.com/aws/aws-sdk-go-v2 v1.11.0/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0/go.mod h1:Xn6sxgRuIDflLRJFj5Ev7UxABIkNbccFPV/p8itDReM= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0/go.mod h1:Xn6sxgRuIDflLRJFj5Ev7UxABIkNbccFPV/p8itDReM=
@ -1022,6 +1023,8 @@ github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
github.com/prometheus/prometheus v0.0.0-20200609090129-a6600f564e3c/go.mod h1:S5n0C6tSgdnwWshBUceRx5G1OsjLv/EeZ9t3wIfEtsY= github.com/prometheus/prometheus v0.0.0-20200609090129-a6600f564e3c/go.mod h1:S5n0C6tSgdnwWshBUceRx5G1OsjLv/EeZ9t3wIfEtsY=
github.com/prometheus/prometheus v0.36.2 h1:ZMqiEKdamv/YgI/7V5WtQGWbwEerCsXJ26CZgeXDUXM= github.com/prometheus/prometheus v0.36.2 h1:ZMqiEKdamv/YgI/7V5WtQGWbwEerCsXJ26CZgeXDUXM=
github.com/prometheus/prometheus v0.36.2/go.mod h1:GBcYMr17Nr2/iDIrWmiy9wC5GKl0NOQ5R9XynB1HAG8= github.com/prometheus/prometheus v0.36.2/go.mod h1:GBcYMr17Nr2/iDIrWmiy9wC5GKl0NOQ5R9XynB1HAG8=
github.com/prometheus/prometheus v0.37.0-rc.0.0.20220713192720-53982c3562b0 h1:bdQvFWZ0EM5n6ditW8wtXWy3RMB19vhNiI5TWlgBYdM=
github.com/prometheus/prometheus v0.37.0-rc.0.0.20220713192720-53982c3562b0/go.mod h1:y+uCk/SdO73g9bMtjCZbejjmcjY4X+xLuKN7cBor5UE=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc= github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc=
@ -1519,6 +1522,7 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220624220833-87e55d714810 h1:rHZQSjJdAI4Xf5Qzeh2bBc5YJIkPFVM6oDtMFYmgws0= golang.org/x/sys v0.0.0-20220624220833-87e55d714810 h1:rHZQSjJdAI4Xf5Qzeh2bBc5YJIkPFVM6oDtMFYmgws0=
golang.org/x/sys v0.0.0-20220624220833-87e55d714810/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220624220833-87e55d714810/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=
@ -1544,6 +1548,7 @@ golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 h1:M73Iuj3xbbb9Uk1DYhzydthsj6oOd6l9bpuFcNoUvTs= golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 h1:M73Iuj3xbbb9Uk1DYhzydthsj6oOd6l9bpuFcNoUvTs=
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20220224211638-0e9765cccd65/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20220609170525-579cf78fd858/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View file

@ -502,6 +502,64 @@ func exemplarProtoToExemplar(ep prompb.Exemplar) exemplar.Exemplar {
} }
} }
// HistogramProtoToHistogram converts a remote-write protobuf histogram into
// the internal histogram.Histogram representation.
//
// Nil bucket messages are tolerated: the generated GetXxx accessors are
// nil-safe and return zero values, and the span slices are only populated
// when the corresponding bucket message is actually present (keeping them
// nil otherwise, exactly as the wire message had them).
func HistogramProtoToHistogram(hp prompb.Histogram) histogram.Histogram {
	h := histogram.Histogram{
		Schema:          hp.Schema,
		ZeroThreshold:   hp.ZeroThreshold,
		ZeroCount:       hp.GetZeroCountInt(),
		Count:           hp.GetCountInt(),
		Sum:             hp.Sum,
		PositiveBuckets: hp.PositiveBuckets.GetDelta(),
		NegativeBuckets: hp.NegativeBuckets.GetDelta(),
	}
	// Only convert spans when the bucket message exists, so that an absent
	// message yields nil spans rather than an empty non-nil slice.
	if pb := hp.PositiveBuckets; pb != nil {
		h.PositiveSpans = spansProtoToSpans(pb.Span)
	}
	if nb := hp.NegativeBuckets; nb != nil {
		h.NegativeSpans = spansProtoToSpans(nb.Span)
	}
	return h
}
// spansProtoToSpans converts a slice of protobuf bucket spans into the
// internal histogram.Span representation, preserving order.
func spansProtoToSpans(s []*prompb.Buckets_Span) []histogram.Span {
	spans := make([]histogram.Span, len(s))
	for i, sp := range s {
		spans[i] = histogram.Span{Offset: sp.Offset, Length: sp.Length}
	}
	return spans
}
// histogramToHistogramProto converts an internal histogram.Histogram into its
// remote-write protobuf form, stamping it with the given timestamp
// (milliseconds, per the remote-write convention used elsewhere in this file —
// NOTE(review): unit not visible here, confirm against callers).
//
// Count and ZeroCount are encoded via the integer oneof variants
// (Histogram_CountInt / Histogram_ZeroCountInt); bucket counts are carried as
// deltas, matching the internal representation.
func histogramToHistogramProto(timestamp int64, h *histogram.Histogram) prompb.Histogram {
	return prompb.Histogram{
		Count:         &prompb.Histogram_CountInt{CountInt: h.Count},
		Sum:           h.Sum,
		Schema:        h.Schema,
		ZeroThreshold: h.ZeroThreshold,
		ZeroCount:     &prompb.Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount},
		NegativeBuckets: &prompb.Buckets{
			Span:  spansToSpansProto(h.NegativeSpans),
			Delta: h.NegativeBuckets,
		},
		PositiveBuckets: &prompb.Buckets{
			Span:  spansToSpansProto(h.PositiveSpans),
			Delta: h.PositiveBuckets,
		},
		Timestamp: timestamp,
	}
}
// spansToSpansProto converts internal histogram spans into their protobuf
// form, preserving order.
func spansToSpansProto(s []histogram.Span) []*prompb.Buckets_Span {
	spans := make([]*prompb.Buckets_Span, len(s))
	for i, sp := range s {
		spans[i] = &prompb.Buckets_Span{Offset: sp.Offset, Length: sp.Length}
	}
	return spans
}
// LabelProtosToMetric unpack a []*prompb.Label to a model.Metric // LabelProtosToMetric unpack a []*prompb.Label to a model.Metric
func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric { func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric {
metric := make(model.Metric, len(labelPairs)) metric := make(model.Metric, len(labelPairs))

View file

@ -20,6 +20,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/textparse" "github.com/prometheus/prometheus/model/textparse"
"github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/prompb"
@ -27,6 +28,18 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunkenc"
) )
// testHistogram is a minimal native-histogram fixture used by the write
// request fixtures and round-trip tests below: one positive and one negative
// bucket (delta-encoded counts), a tiny zero-bucket threshold, and zero
// observed counts so only Sum carries a value.
var testHistogram = histogram.Histogram{
	Schema:          2,
	ZeroThreshold:   1e-128,
	ZeroCount:       0,
	Count:           0,
	Sum:             20,
	PositiveSpans:   []histogram.Span{{Offset: 0, Length: 1}},
	PositiveBuckets: []int64{1},
	NegativeSpans:   []histogram.Span{{Offset: 0, Length: 1}},
	NegativeBuckets: []int64{-1},
}
var writeRequestFixture = &prompb.WriteRequest{ var writeRequestFixture = &prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{ Timeseries: []prompb.TimeSeries{
{ {
@ -37,8 +50,9 @@ var writeRequestFixture = &prompb.WriteRequest{
{Name: "d", Value: "e"}, {Name: "d", Value: "e"},
{Name: "foo", Value: "bar"}, {Name: "foo", Value: "bar"},
}, },
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 0}}, Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 0}},
Histograms: []prompb.Histogram{histogramToHistogramProto(0, &testHistogram)},
}, },
{ {
Labels: []prompb.Label{ Labels: []prompb.Label{
@ -48,8 +62,9 @@ var writeRequestFixture = &prompb.WriteRequest{
{Name: "d", Value: "e"}, {Name: "d", Value: "e"},
{Name: "foo", Value: "bar"}, {Name: "foo", Value: "bar"},
}, },
Samples: []prompb.Sample{{Value: 2, Timestamp: 1}}, Samples: []prompb.Sample{{Value: 2, Timestamp: 1}},
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 1}}, Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 1}},
Histograms: []prompb.Histogram{histogramToHistogramProto(1, &testHistogram)},
}, },
}, },
} }
@ -349,3 +364,9 @@ func TestDecodeWriteRequest(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, writeRequestFixture, actual) require.Equal(t, writeRequestFixture, actual)
} }
// TestNilHistogramProto ensures that converting an empty protobuf histogram
// (all pointer fields nil) does not panic. The conversion will panic if it
// improperly dereferences nil values, causing the test to fail.
func TestNilHistogramProto(t *testing.T) {
	HistogramProtoToHistogram(prompb.Histogram{})
}

View file

@ -32,6 +32,7 @@ import (
"go.uber.org/atomic" "go.uber.org/atomic"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/prompb"
@ -54,30 +55,35 @@ const (
type queueManagerMetrics struct { type queueManagerMetrics struct {
reg prometheus.Registerer reg prometheus.Registerer
samplesTotal prometheus.Counter samplesTotal prometheus.Counter
exemplarsTotal prometheus.Counter exemplarsTotal prometheus.Counter
metadataTotal prometheus.Counter histogramsTotal prometheus.Counter
failedSamplesTotal prometheus.Counter metadataTotal prometheus.Counter
failedExemplarsTotal prometheus.Counter failedSamplesTotal prometheus.Counter
failedMetadataTotal prometheus.Counter failedExemplarsTotal prometheus.Counter
retriedSamplesTotal prometheus.Counter failedHistogramsTotal prometheus.Counter
retriedExemplarsTotal prometheus.Counter failedMetadataTotal prometheus.Counter
retriedMetadataTotal prometheus.Counter retriedSamplesTotal prometheus.Counter
droppedSamplesTotal prometheus.Counter retriedExemplarsTotal prometheus.Counter
droppedExemplarsTotal prometheus.Counter retriedHistogramsTotal prometheus.Counter
enqueueRetriesTotal prometheus.Counter retriedMetadataTotal prometheus.Counter
sentBatchDuration prometheus.Histogram droppedSamplesTotal prometheus.Counter
highestSentTimestamp *maxTimestamp droppedExemplarsTotal prometheus.Counter
pendingSamples prometheus.Gauge droppedHistogramsTotal prometheus.Counter
pendingExemplars prometheus.Gauge enqueueRetriesTotal prometheus.Counter
shardCapacity prometheus.Gauge sentBatchDuration prometheus.Histogram
numShards prometheus.Gauge highestSentTimestamp *maxTimestamp
maxNumShards prometheus.Gauge pendingSamples prometheus.Gauge
minNumShards prometheus.Gauge pendingExemplars prometheus.Gauge
desiredNumShards prometheus.Gauge pendingHistograms prometheus.Gauge
sentBytesTotal prometheus.Counter shardCapacity prometheus.Gauge
metadataBytesTotal prometheus.Counter numShards prometheus.Gauge
maxSamplesPerSend prometheus.Gauge maxNumShards prometheus.Gauge
minNumShards prometheus.Gauge
desiredNumShards prometheus.Gauge
sentBytesTotal prometheus.Counter
metadataBytesTotal prometheus.Counter
maxSamplesPerSend prometheus.Gauge
} }
func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManagerMetrics { func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManagerMetrics {
@ -103,6 +109,13 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
Help: "Total number of exemplars sent to remote storage.", Help: "Total number of exemplars sent to remote storage.",
ConstLabels: constLabels, ConstLabels: constLabels,
}) })
m.histogramsTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "histograms_total",
Help: "Total number of histograms sent to remote storage.",
ConstLabels: constLabels,
})
m.metadataTotal = prometheus.NewCounter(prometheus.CounterOpts{ m.metadataTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem,
@ -124,6 +137,13 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
Help: "Total number of exemplars which failed on send to remote storage, non-recoverable errors.", Help: "Total number of exemplars which failed on send to remote storage, non-recoverable errors.",
ConstLabels: constLabels, ConstLabels: constLabels,
}) })
m.failedHistogramsTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "histograms_failed_total",
Help: "Total number of histograms which failed on send to remote storage, non-recoverable errors.",
ConstLabels: constLabels,
})
m.failedMetadataTotal = prometheus.NewCounter(prometheus.CounterOpts{ m.failedMetadataTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem,
@ -145,6 +165,13 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
Help: "Total number of exemplars which failed on send to remote storage but were retried because the send error was recoverable.", Help: "Total number of exemplars which failed on send to remote storage but were retried because the send error was recoverable.",
ConstLabels: constLabels, ConstLabels: constLabels,
}) })
m.retriedHistogramsTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "histograms_retried_total",
Help: "Total number of histograms which failed on send to remote storage but were retried because the send error was recoverable.",
ConstLabels: constLabels,
})
m.retriedMetadataTotal = prometheus.NewCounter(prometheus.CounterOpts{ m.retriedMetadataTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem,
@ -166,6 +193,13 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
Help: "Total number of exemplars which were dropped after being read from the WAL before being sent via remote write, either via relabelling or unintentionally because of an unknown reference ID.", Help: "Total number of exemplars which were dropped after being read from the WAL before being sent via remote write, either via relabelling or unintentionally because of an unknown reference ID.",
ConstLabels: constLabels, ConstLabels: constLabels,
}) })
m.droppedHistogramsTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "histograms_dropped_total",
Help: "Total number of histograms which were dropped after being read from the WAL before being sent via remote write, either via relabelling or unintentionally because of an unknown reference ID.",
ConstLabels: constLabels,
})
m.enqueueRetriesTotal = prometheus.NewCounter(prometheus.CounterOpts{ m.enqueueRetriesTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem,
@ -204,6 +238,13 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
Help: "The number of exemplars pending in the queues shards to be sent to the remote storage.", Help: "The number of exemplars pending in the queues shards to be sent to the remote storage.",
ConstLabels: constLabels, ConstLabels: constLabels,
}) })
m.pendingHistograms = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "histograms_pending",
Help: "The number of histograms pending in the queues shards to be sent to the remote storage.",
ConstLabels: constLabels,
})
m.shardCapacity = prometheus.NewGauge(prometheus.GaugeOpts{ m.shardCapacity = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem,
@ -269,20 +310,25 @@ func (m *queueManagerMetrics) register() {
m.reg.MustRegister( m.reg.MustRegister(
m.samplesTotal, m.samplesTotal,
m.exemplarsTotal, m.exemplarsTotal,
m.histogramsTotal,
m.metadataTotal, m.metadataTotal,
m.failedSamplesTotal, m.failedSamplesTotal,
m.failedExemplarsTotal, m.failedExemplarsTotal,
m.failedHistogramsTotal,
m.failedMetadataTotal, m.failedMetadataTotal,
m.retriedSamplesTotal, m.retriedSamplesTotal,
m.retriedExemplarsTotal, m.retriedExemplarsTotal,
m.retriedHistogramsTotal,
m.retriedMetadataTotal, m.retriedMetadataTotal,
m.droppedSamplesTotal, m.droppedSamplesTotal,
m.droppedExemplarsTotal, m.droppedExemplarsTotal,
m.droppedHistogramsTotal,
m.enqueueRetriesTotal, m.enqueueRetriesTotal,
m.sentBatchDuration, m.sentBatchDuration,
m.highestSentTimestamp, m.highestSentTimestamp,
m.pendingSamples, m.pendingSamples,
m.pendingExemplars, m.pendingExemplars,
m.pendingHistograms,
m.shardCapacity, m.shardCapacity,
m.numShards, m.numShards,
m.maxNumShards, m.maxNumShards,
@ -299,20 +345,25 @@ func (m *queueManagerMetrics) unregister() {
if m.reg != nil { if m.reg != nil {
m.reg.Unregister(m.samplesTotal) m.reg.Unregister(m.samplesTotal)
m.reg.Unregister(m.exemplarsTotal) m.reg.Unregister(m.exemplarsTotal)
m.reg.Unregister(m.histogramsTotal)
m.reg.Unregister(m.metadataTotal) m.reg.Unregister(m.metadataTotal)
m.reg.Unregister(m.failedSamplesTotal) m.reg.Unregister(m.failedSamplesTotal)
m.reg.Unregister(m.failedExemplarsTotal) m.reg.Unregister(m.failedExemplarsTotal)
m.reg.Unregister(m.failedHistogramsTotal)
m.reg.Unregister(m.failedMetadataTotal) m.reg.Unregister(m.failedMetadataTotal)
m.reg.Unregister(m.retriedSamplesTotal) m.reg.Unregister(m.retriedSamplesTotal)
m.reg.Unregister(m.retriedExemplarsTotal) m.reg.Unregister(m.retriedExemplarsTotal)
m.reg.Unregister(m.retriedHistogramsTotal)
m.reg.Unregister(m.retriedMetadataTotal) m.reg.Unregister(m.retriedMetadataTotal)
m.reg.Unregister(m.droppedSamplesTotal) m.reg.Unregister(m.droppedSamplesTotal)
m.reg.Unregister(m.droppedExemplarsTotal) m.reg.Unregister(m.droppedExemplarsTotal)
m.reg.Unregister(m.droppedHistogramsTotal)
m.reg.Unregister(m.enqueueRetriesTotal) m.reg.Unregister(m.enqueueRetriesTotal)
m.reg.Unregister(m.sentBatchDuration) m.reg.Unregister(m.sentBatchDuration)
m.reg.Unregister(m.highestSentTimestamp) m.reg.Unregister(m.highestSentTimestamp)
m.reg.Unregister(m.pendingSamples) m.reg.Unregister(m.pendingSamples)
m.reg.Unregister(m.pendingExemplars) m.reg.Unregister(m.pendingExemplars)
m.reg.Unregister(m.pendingHistograms)
m.reg.Unregister(m.shardCapacity) m.reg.Unregister(m.shardCapacity)
m.reg.Unregister(m.numShards) m.reg.Unregister(m.numShards)
m.reg.Unregister(m.maxNumShards) m.reg.Unregister(m.maxNumShards)
@ -341,15 +392,16 @@ type WriteClient interface {
type QueueManager struct { type QueueManager struct {
lastSendTimestamp atomic.Int64 lastSendTimestamp atomic.Int64
logger log.Logger logger log.Logger
flushDeadline time.Duration flushDeadline time.Duration
cfg config.QueueConfig cfg config.QueueConfig
mcfg config.MetadataConfig mcfg config.MetadataConfig
externalLabels labels.Labels externalLabels labels.Labels
relabelConfigs []*relabel.Config relabelConfigs []*relabel.Config
sendExemplars bool sendExemplars bool
watcher *wal.Watcher sendNativeHistograms bool
metadataWatcher *MetadataWatcher watcher *wal.Watcher
metadataWatcher *MetadataWatcher
clientMtx sync.RWMutex clientMtx sync.RWMutex
storeClient WriteClient storeClient WriteClient
@ -396,6 +448,7 @@ func NewQueueManager(
highestRecvTimestamp *maxTimestamp, highestRecvTimestamp *maxTimestamp,
sm ReadyScrapeManager, sm ReadyScrapeManager,
enableExemplarRemoteWrite bool, enableExemplarRemoteWrite bool,
enableNativeHistogramRemoteWrite bool,
) *QueueManager { ) *QueueManager {
if logger == nil { if logger == nil {
logger = log.NewNopLogger() logger = log.NewNopLogger()
@ -403,14 +456,15 @@ func NewQueueManager(
logger = log.With(logger, remoteName, client.Name(), endpoint, client.Endpoint()) logger = log.With(logger, remoteName, client.Name(), endpoint, client.Endpoint())
t := &QueueManager{ t := &QueueManager{
logger: logger, logger: logger,
flushDeadline: flushDeadline, flushDeadline: flushDeadline,
cfg: cfg, cfg: cfg,
mcfg: mCfg, mcfg: mCfg,
externalLabels: externalLabels, externalLabels: externalLabels,
relabelConfigs: relabelConfigs, relabelConfigs: relabelConfigs,
storeClient: client, storeClient: client,
sendExemplars: enableExemplarRemoteWrite, sendExemplars: enableExemplarRemoteWrite,
sendNativeHistograms: enableNativeHistogramRemoteWrite,
seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels), seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels),
seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int), seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
@ -430,7 +484,7 @@ func NewQueueManager(
highestRecvTimestamp: highestRecvTimestamp, highestRecvTimestamp: highestRecvTimestamp,
} }
t.watcher = wal.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite) t.watcher = wal.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite)
if t.mcfg.Send { if t.mcfg.Send {
t.metadataWatcher = NewMetadataWatcher(logger, sm, client.Name(), t, t.mcfg.SendInterval, flushDeadline) t.metadataWatcher = NewMetadataWatcher(logger, sm, client.Name(), t, t.mcfg.SendInterval, flushDeadline)
} }
@ -538,11 +592,11 @@ outer:
return false return false
default: default:
} }
if t.shards.enqueue(s.Ref, sampleOrExemplar{ if t.shards.enqueue(s.Ref, timeSeries{
seriesLabels: lbls, seriesLabels: lbls,
timestamp: s.T, timestamp: s.T,
value: s.V, value: s.V,
isSample: true, sType: tSample,
}) { }) {
continue outer continue outer
} }
@ -588,11 +642,59 @@ outer:
return false return false
default: default:
} }
if t.shards.enqueue(e.Ref, sampleOrExemplar{ if t.shards.enqueue(e.Ref, timeSeries{
seriesLabels: lbls, seriesLabels: lbls,
timestamp: e.T, timestamp: e.T,
value: e.V, value: e.V,
exemplarLabels: e.Labels, exemplarLabels: e.Labels,
sType: tExemplar,
}) {
continue outer
}
t.metrics.enqueueRetriesTotal.Inc()
time.Sleep(time.Duration(backoff))
backoff = backoff * 2
if backoff > t.cfg.MaxBackoff {
backoff = t.cfg.MaxBackoff
}
}
}
return true
}
func (t *QueueManager) AppendHistograms(histograms []record.RefHistogram) bool {
if !t.sendNativeHistograms {
return true
}
outer:
for _, h := range histograms {
t.seriesMtx.Lock()
lbls, ok := t.seriesLabels[h.Ref]
if !ok {
t.metrics.droppedHistogramsTotal.Inc()
t.dataDropped.incr(1)
if _, ok := t.droppedSeries[h.Ref]; !ok {
level.Info(t.logger).Log("msg", "Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref)
}
t.seriesMtx.Unlock()
continue
}
t.seriesMtx.Unlock()
backoff := model.Duration(5 * time.Millisecond)
for {
select {
case <-t.quit:
return false
default:
}
if t.shards.enqueue(h.Ref, timeSeries{
seriesLabels: lbls,
timestamp: h.T,
histogram: h.H,
sType: tHistogram,
}) { }) {
continue outer continue outer
} }
@ -921,8 +1023,9 @@ type shards struct {
qm *QueueManager qm *QueueManager
queues []*queue queues []*queue
// So we can accurately track how many of each are lost during shard shutdowns. // So we can accurately track how many of each are lost during shard shutdowns.
enqueuedSamples atomic.Int64 enqueuedSamples atomic.Int64
enqueuedExemplars atomic.Int64 enqueuedExemplars atomic.Int64
enqueuedHistograms atomic.Int64
// Emulate a wait group with a channel and an atomic int, as you // Emulate a wait group with a channel and an atomic int, as you
// cannot select on a wait group. // cannot select on a wait group.
@ -934,9 +1037,10 @@ type shards struct {
// Hard shutdown context is used to terminate outgoing HTTP connections // Hard shutdown context is used to terminate outgoing HTTP connections
// after giving them a chance to terminate. // after giving them a chance to terminate.
hardShutdown context.CancelFunc hardShutdown context.CancelFunc
samplesDroppedOnHardShutdown atomic.Uint32 samplesDroppedOnHardShutdown atomic.Uint32
exemplarsDroppedOnHardShutdown atomic.Uint32 exemplarsDroppedOnHardShutdown atomic.Uint32
histogramsDroppedOnHardShutdown atomic.Uint32
} }
// start the shards; must be called before any call to enqueue. // start the shards; must be called before any call to enqueue.
@ -961,8 +1065,10 @@ func (s *shards) start(n int) {
s.done = make(chan struct{}) s.done = make(chan struct{})
s.enqueuedSamples.Store(0) s.enqueuedSamples.Store(0)
s.enqueuedExemplars.Store(0) s.enqueuedExemplars.Store(0)
s.enqueuedHistograms.Store(0)
s.samplesDroppedOnHardShutdown.Store(0) s.samplesDroppedOnHardShutdown.Store(0)
s.exemplarsDroppedOnHardShutdown.Store(0) s.exemplarsDroppedOnHardShutdown.Store(0)
s.histogramsDroppedOnHardShutdown.Store(0)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
go s.runShard(hardShutdownCtx, i, newQueues[i]) go s.runShard(hardShutdownCtx, i, newQueues[i])
} }
@ -1008,7 +1114,7 @@ func (s *shards) stop() {
// retry. A shard is full when its configured capacity has been reached, // retry. A shard is full when its configured capacity has been reached,
// specifically, when s.queues[shard] has filled its batchQueue channel and the // specifically, when s.queues[shard] has filled its batchQueue channel and the
// partial batch has also been filled. // partial batch has also been filled.
func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool { func (s *shards) enqueue(ref chunks.HeadSeriesRef, data timeSeries) bool {
s.mtx.RLock() s.mtx.RLock()
defer s.mtx.RUnlock() defer s.mtx.RUnlock()
@ -1021,12 +1127,16 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool {
if !appended { if !appended {
return false return false
} }
if data.isSample { switch data.sType {
case tSample:
s.qm.metrics.pendingSamples.Inc() s.qm.metrics.pendingSamples.Inc()
s.enqueuedSamples.Inc() s.enqueuedSamples.Inc()
} else { case tExemplar:
s.qm.metrics.pendingExemplars.Inc() s.qm.metrics.pendingExemplars.Inc()
s.enqueuedExemplars.Inc() s.enqueuedExemplars.Inc()
case tHistogram:
s.qm.metrics.pendingHistograms.Inc()
s.enqueuedHistograms.Inc()
} }
return true return true
} }
@ -1035,24 +1145,34 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool {
type queue struct { type queue struct {
// batchMtx covers operations appending to or publishing the partial batch. // batchMtx covers operations appending to or publishing the partial batch.
batchMtx sync.Mutex batchMtx sync.Mutex
batch []sampleOrExemplar batch []timeSeries
batchQueue chan []sampleOrExemplar batchQueue chan []timeSeries
// Since we know there are a limited number of batches out, using a stack // Since we know there are a limited number of batches out, using a stack
// is easy and safe so a sync.Pool is not necessary. // is easy and safe so a sync.Pool is not necessary.
// poolMtx covers adding and removing batches from the batchPool. // poolMtx covers adding and removing batches from the batchPool.
poolMtx sync.Mutex poolMtx sync.Mutex
batchPool [][]sampleOrExemplar batchPool [][]timeSeries
} }
type sampleOrExemplar struct { type timeSeries struct {
seriesLabels labels.Labels seriesLabels labels.Labels
value float64 value float64
histogram *histogram.Histogram
timestamp int64 timestamp int64
exemplarLabels labels.Labels exemplarLabels labels.Labels
isSample bool // The type of series: sample, exemplar, or histogram.
sType seriesType
} }
type seriesType int
const (
tSample seriesType = iota
tExemplar
tHistogram
)
func newQueue(batchSize, capacity int) *queue { func newQueue(batchSize, capacity int) *queue {
batches := capacity / batchSize batches := capacity / batchSize
// Always create an unbuffered channel even if capacity is configured to be // Always create an unbuffered channel even if capacity is configured to be
@ -1061,17 +1181,17 @@ func newQueue(batchSize, capacity int) *queue {
batches = 1 batches = 1
} }
return &queue{ return &queue{
batch: make([]sampleOrExemplar, 0, batchSize), batch: make([]timeSeries, 0, batchSize),
batchQueue: make(chan []sampleOrExemplar, batches), batchQueue: make(chan []timeSeries, batches),
// batchPool should have capacity for everything in the channel + 1 for // batchPool should have capacity for everything in the channel + 1 for
// the batch being processed. // the batch being processed.
batchPool: make([][]sampleOrExemplar, 0, batches+1), batchPool: make([][]timeSeries, 0, batches+1),
} }
} }
// Append the sampleOrExemplar to the buffered batch. Returns false if it // Append the timeSeries to the buffered batch. Returns false if it
// cannot be added and must be retried. // cannot be added and must be retried.
func (q *queue) Append(datum sampleOrExemplar) bool { func (q *queue) Append(datum timeSeries) bool {
q.batchMtx.Lock() q.batchMtx.Lock()
defer q.batchMtx.Unlock() defer q.batchMtx.Unlock()
q.batch = append(q.batch, datum) q.batch = append(q.batch, datum)
@ -1089,12 +1209,12 @@ func (q *queue) Append(datum sampleOrExemplar) bool {
return true return true
} }
func (q *queue) Chan() <-chan []sampleOrExemplar { func (q *queue) Chan() <-chan []timeSeries {
return q.batchQueue return q.batchQueue
} }
// Batch returns the current batch and allocates a new batch. // Batch returns the current batch and allocates a new batch.
func (q *queue) Batch() []sampleOrExemplar { func (q *queue) Batch() []timeSeries {
q.batchMtx.Lock() q.batchMtx.Lock()
defer q.batchMtx.Unlock() defer q.batchMtx.Unlock()
@ -1109,7 +1229,7 @@ func (q *queue) Batch() []sampleOrExemplar {
} }
// ReturnForReuse adds the batch buffer back to the internal pool. // ReturnForReuse adds the batch buffer back to the internal pool.
func (q *queue) ReturnForReuse(batch []sampleOrExemplar) { func (q *queue) ReturnForReuse(batch []timeSeries) {
q.poolMtx.Lock() q.poolMtx.Lock()
defer q.poolMtx.Unlock() defer q.poolMtx.Unlock()
if len(q.batchPool) < cap(q.batchPool) { if len(q.batchPool) < cap(q.batchPool) {
@ -1149,7 +1269,7 @@ func (q *queue) tryEnqueueingBatch(done <-chan struct{}) bool {
} }
} }
func (q *queue) newBatch(capacity int) []sampleOrExemplar { func (q *queue) newBatch(capacity int) []timeSeries {
q.poolMtx.Lock() q.poolMtx.Lock()
defer q.poolMtx.Unlock() defer q.poolMtx.Unlock()
batches := len(q.batchPool) batches := len(q.batchPool)
@ -1158,7 +1278,7 @@ func (q *queue) newBatch(capacity int) []sampleOrExemplar {
q.batchPool = q.batchPool[:batches-1] q.batchPool = q.batchPool[:batches-1]
return batch return batch
} }
return make([]sampleOrExemplar, 0, capacity) return make([]timeSeries, 0, capacity)
} }
func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
@ -1209,22 +1329,26 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
// Remove them from pending and mark them as failed. // Remove them from pending and mark them as failed.
droppedSamples := int(s.enqueuedSamples.Load()) droppedSamples := int(s.enqueuedSamples.Load())
droppedExemplars := int(s.enqueuedExemplars.Load()) droppedExemplars := int(s.enqueuedExemplars.Load())
droppedHistograms := int(s.enqueuedHistograms.Load())
s.qm.metrics.pendingSamples.Sub(float64(droppedSamples)) s.qm.metrics.pendingSamples.Sub(float64(droppedSamples))
s.qm.metrics.pendingExemplars.Sub(float64(droppedExemplars)) s.qm.metrics.pendingExemplars.Sub(float64(droppedExemplars))
s.qm.metrics.pendingHistograms.Sub(float64(droppedHistograms))
s.qm.metrics.failedSamplesTotal.Add(float64(droppedSamples)) s.qm.metrics.failedSamplesTotal.Add(float64(droppedSamples))
s.qm.metrics.failedExemplarsTotal.Add(float64(droppedExemplars)) s.qm.metrics.failedExemplarsTotal.Add(float64(droppedExemplars))
s.qm.metrics.failedHistogramsTotal.Add(float64(droppedHistograms))
s.samplesDroppedOnHardShutdown.Add(uint32(droppedSamples)) s.samplesDroppedOnHardShutdown.Add(uint32(droppedSamples))
s.exemplarsDroppedOnHardShutdown.Add(uint32(droppedExemplars)) s.exemplarsDroppedOnHardShutdown.Add(uint32(droppedExemplars))
s.histogramsDroppedOnHardShutdown.Add(uint32(droppedHistograms))
return return
case batch, ok := <-batchQueue: case batch, ok := <-batchQueue:
if !ok { if !ok {
return return
} }
nPendingSamples, nPendingExemplars := s.populateTimeSeries(batch, pendingData) nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
queue.ReturnForReuse(batch) queue.ReturnForReuse(batch)
n := nPendingSamples + nPendingExemplars n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, pBuf, &buf) s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
stop() stop()
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
@ -1232,10 +1356,10 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
case <-timer.C: case <-timer.C:
batch := queue.Batch() batch := queue.Batch()
if len(batch) > 0 { if len(batch) > 0 {
nPendingSamples, nPendingExemplars := s.populateTimeSeries(batch, pendingData) nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
n := nPendingSamples + nPendingExemplars n := nPendingSamples + nPendingExemplars + nPendingHistograms
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars", nPendingExemplars, "shard", shardNum) level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars", nPendingExemplars, "shard", shardNum)
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, pBuf, &buf) s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
} }
queue.ReturnForReuse(batch) queue.ReturnForReuse(batch)
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
@ -1243,43 +1367,51 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
} }
} }
func (s *shards) populateTimeSeries(batch []sampleOrExemplar, pendingData []prompb.TimeSeries) (int, int) { func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries) (int, int, int) {
var nPendingSamples, nPendingExemplars int var nPendingSamples, nPendingExemplars, nPendingHistograms int
for nPending, d := range batch { for nPending, d := range batch {
pendingData[nPending].Samples = pendingData[nPending].Samples[:0] pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
if s.qm.sendExemplars { if s.qm.sendExemplars {
pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0] pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
} }
if s.qm.sendNativeHistograms {
pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
}
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff) // Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll // retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
// stop reading from the queue. This makes it safe to reference pendingSamples by index. // stop reading from the queue. This makes it safe to reference pendingSamples by index.
if d.isSample { pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels) switch d.sType {
case tSample:
pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{ pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{
Value: d.value, Value: d.value,
Timestamp: d.timestamp, Timestamp: d.timestamp,
}) })
nPendingSamples++ nPendingSamples++
} else { case tExemplar:
pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.Exemplar{ pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.Exemplar{
Labels: labelsToLabelsProto(d.exemplarLabels, nil), Labels: labelsToLabelsProto(d.exemplarLabels, nil),
Value: d.value, Value: d.value,
Timestamp: d.timestamp, Timestamp: d.timestamp,
}) })
nPendingExemplars++ nPendingExemplars++
case tHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, histogramToHistogramProto(d.timestamp, d.histogram))
nPendingHistograms++
} }
} }
return nPendingSamples, nPendingExemplars return nPendingSamples, nPendingExemplars, nPendingHistograms
} }
func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount int, pBuf *proto.Buffer, buf *[]byte) { func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
begin := time.Now() begin := time.Now()
err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, pBuf, buf) err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, pBuf, buf)
if err != nil { if err != nil {
level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err) level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err)
s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount)) s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount))
s.qm.metrics.failedExemplarsTotal.Add(float64(exemplarCount)) s.qm.metrics.failedExemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.failedHistogramsTotal.Add(float64(histogramCount))
} }
// These counters are used to calculate the dynamic sharding, and as such // These counters are used to calculate the dynamic sharding, and as such
@ -1287,16 +1419,18 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s
s.qm.dataOut.incr(int64(len(samples))) s.qm.dataOut.incr(int64(len(samples)))
s.qm.dataOutDuration.incr(int64(time.Since(begin))) s.qm.dataOutDuration.incr(int64(time.Since(begin)))
s.qm.lastSendTimestamp.Store(time.Now().Unix()) s.qm.lastSendTimestamp.Store(time.Now().Unix())
// Pending samples/exemplars also should be subtracted as an error means // Pending samples/exemplars/histograms also should be subtracted as an error means
// they will not be retried. // they will not be retried.
s.qm.metrics.pendingSamples.Sub(float64(sampleCount)) s.qm.metrics.pendingSamples.Sub(float64(sampleCount))
s.qm.metrics.pendingExemplars.Sub(float64(exemplarCount)) s.qm.metrics.pendingExemplars.Sub(float64(exemplarCount))
s.qm.metrics.pendingHistograms.Sub(float64(histogramCount))
s.enqueuedSamples.Sub(int64(sampleCount)) s.enqueuedSamples.Sub(int64(sampleCount))
s.enqueuedExemplars.Sub(int64(exemplarCount)) s.enqueuedExemplars.Sub(int64(exemplarCount))
s.enqueuedHistograms.Sub(int64(histogramCount))
} }
// sendSamples to the remote storage with backoff for recoverable errors. // sendSamples to the remote storage with backoff for recoverable errors.
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount int, pBuf *proto.Buffer, buf *[]byte) error { func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error {
// Build the WriteRequest with no metadata. // Build the WriteRequest with no metadata.
req, highest, err := buildWriteRequest(samples, nil, pBuf, *buf) req, highest, err := buildWriteRequest(samples, nil, pBuf, *buf)
if err != nil { if err != nil {
@ -1326,10 +1460,14 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
if exemplarCount > 0 { if exemplarCount > 0 {
span.SetAttributes(attribute.Int("exemplars", exemplarCount)) span.SetAttributes(attribute.Int("exemplars", exemplarCount))
} }
if histogramCount > 0 {
span.SetAttributes(attribute.Int("histograms", histogramCount))
}
begin := time.Now() begin := time.Now()
s.qm.metrics.samplesTotal.Add(float64(sampleCount)) s.qm.metrics.samplesTotal.Add(float64(sampleCount))
s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount)) s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
err := s.qm.client().Store(ctx, *buf) err := s.qm.client().Store(ctx, *buf)
s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds()) s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
@ -1344,6 +1482,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
onRetry := func() { onRetry := func() {
s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount)) s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount)) s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount))
} }
err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.logger, attemptStore, onRetry) err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.logger, attemptStore, onRetry)
@ -1420,6 +1559,9 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest { if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
highest = ts.Exemplars[0].Timestamp highest = ts.Exemplars[0].Timestamp
} }
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
highest = ts.Histograms[0].Timestamp
}
} }
req := &prompb.WriteRequest{ req := &prompb.WriteRequest{

View file

@ -36,6 +36,7 @@ import (
"go.uber.org/atomic" "go.uber.org/atomic"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/textparse" "github.com/prometheus/prometheus/model/textparse"
"github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/model/timestamp"
@ -60,13 +61,15 @@ func newHighestTimestampMetric() *maxTimestamp {
func TestSampleDelivery(t *testing.T) { func TestSampleDelivery(t *testing.T) {
testcases := []struct { testcases := []struct {
name string name string
samples bool samples bool
exemplars bool exemplars bool
histograms bool
}{ }{
{samples: true, exemplars: false, name: "samples only"}, {samples: true, exemplars: false, histograms: false, name: "samples only"},
{samples: true, exemplars: true, name: "both samples and exemplars"}, {samples: true, exemplars: true, histograms: true, name: "samples, exemplars, and histograms"},
{samples: false, exemplars: true, name: "exemplars only"}, {samples: false, exemplars: true, histograms: false, name: "exemplars only"},
{samples: false, exemplars: false, histograms: true, name: "histograms only"},
} }
// Let's create an even number of send batches so we don't run into the // Let's create an even number of send batches so we don't run into the
@ -86,6 +89,7 @@ func TestSampleDelivery(t *testing.T) {
writeConfig := baseRemoteWriteConfig("http://test-storage.com") writeConfig := baseRemoteWriteConfig("http://test-storage.com")
writeConfig.QueueConfig = queueConfig writeConfig.QueueConfig = queueConfig
writeConfig.SendExemplars = true writeConfig.SendExemplars = true
writeConfig.SendNativeHistograms = true
conf := &config.Config{ conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig, GlobalConfig: config.DefaultGlobalConfig,
@ -97,9 +101,10 @@ func TestSampleDelivery(t *testing.T) {
for _, tc := range testcases { for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
var ( var (
series []record.RefSeries series []record.RefSeries
samples []record.RefSample samples []record.RefSample
exemplars []record.RefExemplar exemplars []record.RefExemplar
histograms []record.RefHistogram
) )
// Generates same series in both cases. // Generates same series in both cases.
@ -109,6 +114,9 @@ func TestSampleDelivery(t *testing.T) {
if tc.exemplars { if tc.exemplars {
exemplars, series = createExemplars(n, n) exemplars, series = createExemplars(n, n)
} }
if tc.histograms {
histograms, series = createHistograms(n, n)
}
// Apply new config. // Apply new config.
queueConfig.Capacity = len(samples) queueConfig.Capacity = len(samples)
@ -126,15 +134,19 @@ func TestSampleDelivery(t *testing.T) {
// Send first half of data. // Send first half of data.
c.expectSamples(samples[:len(samples)/2], series) c.expectSamples(samples[:len(samples)/2], series)
c.expectExemplars(exemplars[:len(exemplars)/2], series) c.expectExemplars(exemplars[:len(exemplars)/2], series)
c.expectHistograms(histograms[:len(histograms)/2], series)
qm.Append(samples[:len(samples)/2]) qm.Append(samples[:len(samples)/2])
qm.AppendExemplars(exemplars[:len(exemplars)/2]) qm.AppendExemplars(exemplars[:len(exemplars)/2])
qm.AppendHistograms(histograms[:len(histograms)/2])
c.waitForExpectedData(t) c.waitForExpectedData(t)
// Send second half of data. // Send second half of data.
c.expectSamples(samples[len(samples)/2:], series) c.expectSamples(samples[len(samples)/2:], series)
c.expectExemplars(exemplars[len(exemplars)/2:], series) c.expectExemplars(exemplars[len(exemplars)/2:], series)
c.expectHistograms(histograms[len(histograms)/2:], series)
qm.Append(samples[len(samples)/2:]) qm.Append(samples[len(samples)/2:])
qm.AppendExemplars(exemplars[len(exemplars)/2:]) qm.AppendExemplars(exemplars[len(exemplars)/2:])
qm.AppendHistograms(histograms[len(histograms)/2:])
c.waitForExpectedData(t) c.waitForExpectedData(t)
}) })
} }
@ -149,7 +161,7 @@ func TestMetadataDelivery(t *testing.T) {
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m.Start() m.Start()
defer m.Stop() defer m.Stop()
@ -188,7 +200,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
m.Start() m.Start()
defer m.Stop() defer m.Stop()
@ -230,7 +242,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
m.Start() m.Start()
@ -250,7 +262,7 @@ func TestShutdown(t *testing.T) {
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false)
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
samples, series := createTimeseries(n, n) samples, series := createTimeseries(n, n)
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
@ -288,7 +300,7 @@ func TestSeriesReset(t *testing.T) {
cfg := config.DefaultQueueConfig cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false)
for i := 0; i < numSegments; i++ { for i := 0; i < numSegments; i++ {
series := []record.RefSeries{} series := []record.RefSeries{}
for j := 0; j < numSeries; j++ { for j := 0; j < numSeries; j++ {
@ -317,7 +329,7 @@ func TestReshard(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
m.Start() m.Start()
@ -353,7 +365,7 @@ func TestReshardRaceWithStop(t *testing.T) {
go func() { go func() {
for { for {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m.Start() m.Start()
h.Unlock() h.Unlock()
h.Lock() h.Lock()
@ -388,7 +400,7 @@ func TestReshardPartialBatch(t *testing.T) {
cfg.BatchSendDeadline = model.Duration(batchSendDeadline) cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
m.Start() m.Start()
@ -433,7 +445,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
m.Start() m.Start()
defer m.Stop() defer m.Stop()
@ -460,7 +472,7 @@ func TestReleaseNoninternedString(t *testing.T) {
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
c := NewTestWriteClient() c := NewTestWriteClient()
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m.Start() m.Start()
defer m.Stop() defer m.Stop()
@ -512,7 +524,7 @@ func TestShouldReshard(t *testing.T) {
for _, c := range cases { for _, c := range cases {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
client := NewTestWriteClient() client := NewTestWriteClient()
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m.numShards = c.startingShards m.numShards = c.startingShards
m.dataIn.incr(c.samplesIn) m.dataIn.incr(c.samplesIn)
m.dataOut.incr(c.samplesOut) m.dataOut.incr(c.samplesOut)
@ -571,6 +583,37 @@ func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []recor
return exemplars, series return exemplars, series
} }
func createHistograms(numSamples, numSeries int) ([]record.RefHistogram, []record.RefSeries) {
histograms := make([]record.RefHistogram, 0, numSamples)
series := make([]record.RefSeries, 0, numSeries)
for i := 0; i < numSeries; i++ {
name := fmt.Sprintf("test_metric_%d", i)
for j := 0; j < numSamples; j++ {
h := record.RefHistogram{
Ref: chunks.HeadSeriesRef(i),
T: int64(j),
H: &histogram.Histogram{
Schema: 2,
ZeroThreshold: 1e-128,
ZeroCount: 0,
Count: 2,
Sum: 0,
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}},
PositiveBuckets: []int64{int64(i) + 1},
NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}},
NegativeBuckets: []int64{int64(-i) - 1},
},
}
histograms = append(histograms, h)
}
series = append(series, record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: labels.Labels{{Name: "__name__", Value: name}},
})
}
return histograms, series
}
func getSeriesNameFromRef(r record.RefSeries) string { func getSeriesNameFromRef(r record.RefSeries) string {
for _, l := range r.Labels { for _, l := range r.Labels {
if l.Name == "__name__" { if l.Name == "__name__" {
@ -581,16 +624,18 @@ func getSeriesNameFromRef(r record.RefSeries) string {
} }
type TestWriteClient struct { type TestWriteClient struct {
receivedSamples map[string][]prompb.Sample receivedSamples map[string][]prompb.Sample
expectedSamples map[string][]prompb.Sample expectedSamples map[string][]prompb.Sample
receivedExemplars map[string][]prompb.Exemplar receivedExemplars map[string][]prompb.Exemplar
expectedExemplars map[string][]prompb.Exemplar expectedExemplars map[string][]prompb.Exemplar
receivedMetadata map[string][]prompb.MetricMetadata receivedHistograms map[string][]prompb.Histogram
writesReceived int expectedHistograms map[string][]prompb.Histogram
withWaitGroup bool receivedMetadata map[string][]prompb.MetricMetadata
wg sync.WaitGroup writesReceived int
mtx sync.Mutex withWaitGroup bool
buf []byte wg sync.WaitGroup
mtx sync.Mutex
buf []byte
} }
func NewTestWriteClient() *TestWriteClient { func NewTestWriteClient() *TestWriteClient {
@ -644,6 +689,23 @@ func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []reco
c.wg.Add(len(ss)) c.wg.Add(len(ss))
} }
func (c *TestWriteClient) expectHistograms(hh []record.RefHistogram, series []record.RefSeries) {
if !c.withWaitGroup {
return
}
c.mtx.Lock()
defer c.mtx.Unlock()
c.expectedHistograms = map[string][]prompb.Histogram{}
c.receivedHistograms = map[string][]prompb.Histogram{}
for _, h := range hh {
seriesName := getSeriesNameFromRef(series[h.Ref])
c.expectedHistograms[seriesName] = append(c.expectedHistograms[seriesName], histogramToHistogramProto(h.T, h.H))
}
c.wg.Add(len(hh))
}
func (c *TestWriteClient) waitForExpectedData(tb testing.TB) { func (c *TestWriteClient) waitForExpectedData(tb testing.TB) {
if !c.withWaitGroup { if !c.withWaitGroup {
return return
@ -657,6 +719,9 @@ func (c *TestWriteClient) waitForExpectedData(tb testing.TB) {
for ts, expectedExemplar := range c.expectedExemplars { for ts, expectedExemplar := range c.expectedExemplars {
require.Equal(tb, expectedExemplar, c.receivedExemplars[ts], ts) require.Equal(tb, expectedExemplar, c.receivedExemplars[ts], ts)
} }
for ts, expectedHistogram := range c.expectedHistograms {
require.Equal(tb, expectedHistogram, c.receivedHistograms[ts], ts)
}
} }
func (c *TestWriteClient) Store(_ context.Context, req []byte) error { func (c *TestWriteClient) Store(_ context.Context, req []byte) error {
@ -676,7 +741,6 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte) error {
if err := proto.Unmarshal(reqBuf, &reqProto); err != nil { if err := proto.Unmarshal(reqBuf, &reqProto); err != nil {
return err return err
} }
count := 0 count := 0
for _, ts := range reqProto.Timeseries { for _, ts := range reqProto.Timeseries {
var seriesName string var seriesName string
@ -695,6 +759,11 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte) error {
count++ count++
c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], ex) c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], ex)
} }
for _, histogram := range ts.Histograms {
count++
c.receivedHistograms[seriesName] = append(c.receivedHistograms[seriesName], histogram)
}
} }
if c.withWaitGroup { if c.withWaitGroup {
c.wg.Add(-count) c.wg.Add(-count)
@ -791,7 +860,7 @@ func BenchmarkSampleSend(b *testing.B) {
dir := b.TempDir() dir := b.TempDir()
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
// These should be received by the client. // These should be received by the client.
@ -837,7 +906,7 @@ func BenchmarkStartup(b *testing.B) {
c := NewTestBlockedWriteClient() c := NewTestBlockedWriteClient()
m := NewQueueManager(metrics, nil, nil, logger, dir, m := NewQueueManager(metrics, nil, nil, logger, dir,
newEWMARate(ewmaWeight, shardUpdateDuration), newEWMARate(ewmaWeight, shardUpdateDuration),
cfg, mcfg, nil, nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false) cfg, mcfg, nil, nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false)
m.watcher.SetStartTime(timestamp.Time(math.MaxInt64)) m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
m.watcher.MaxSegment = segments[len(segments)-2] m.watcher.MaxSegment = segments[len(segments)-2]
err := m.watcher.Run() err := m.watcher.Run()
@ -913,7 +982,7 @@ func TestCalculateDesiredShards(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
// Need to start the queue manager so the proper metrics are initialized. // Need to start the queue manager so the proper metrics are initialized.
// However we can stop it right away since we don't need to do any actual // However we can stop it right away since we don't need to do any actual
@ -990,7 +1059,7 @@ func TestCalculateDesiredShardsDetail(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
for _, tc := range []struct { for _, tc := range []struct {
name string name string
@ -1181,7 +1250,7 @@ func TestQueue_FlushAndShutdownDoesNotDeadlock(t *testing.T) {
batchSize := 10 batchSize := 10
queue := newQueue(batchSize, capacity) queue := newQueue(batchSize, capacity)
for i := 0; i < capacity+batchSize; i++ { for i := 0; i < capacity+batchSize; i++ {
queue.Append(sampleOrExemplar{}) queue.Append(timeSeries{})
} }
done := make(chan struct{}) done := make(chan struct{})

View file

@ -45,6 +45,12 @@ var (
Name: "exemplars_in_total", Name: "exemplars_in_total",
Help: "Exemplars in to remote storage, compare to exemplars out for queue managers.", Help: "Exemplars in to remote storage, compare to exemplars out for queue managers.",
}) })
histogramsIn = promauto.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "histograms_in_total",
Help: "Histograms in to remote storage, compare to histograms out for queue managers.",
})
) )
// WriteStorage represents all the remote write storage. // WriteStorage represents all the remote write storage.
@ -188,6 +194,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
rws.highestTimestamp, rws.highestTimestamp,
rws.scraper, rws.scraper,
rwConf.SendExemplars, rwConf.SendExemplars,
rwConf.SendNativeHistograms,
) )
// Keep track of which queues are new so we know which to start. // Keep track of which queues are new so we know which to start.
newHashes = append(newHashes, hash) newHashes = append(newHashes, hash)
@ -270,7 +277,7 @@ func (t *timestampTracker) AppendExemplar(_ storage.SeriesRef, _ labels.Labels,
return 0, nil return 0, nil
} }
func (t *timestampTracker) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, ts int64, _ *histogram.Histogram) (storage.SeriesRef, error) { func (t *timestampTracker) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, ts int64, h *histogram.Histogram) (storage.SeriesRef, error) {
t.histograms++ t.histograms++
if ts > t.highestTimestamp { if ts > t.highestTimestamp {
t.highestTimestamp = ts t.highestTimestamp = ts
@ -280,10 +287,11 @@ func (t *timestampTracker) AppendHistogram(_ storage.SeriesRef, _ labels.Labels,
// Commit implements storage.Appender. // Commit implements storage.Appender.
func (t *timestampTracker) Commit() error { func (t *timestampTracker) Commit() error {
t.writeStorage.samplesIn.incr(t.samples + t.exemplars) t.writeStorage.samplesIn.incr(t.samples + t.exemplars + t.histograms)
samplesIn.Add(float64(t.samples)) samplesIn.Add(float64(t.samples))
exemplarsIn.Add(float64(t.exemplars)) exemplarsIn.Add(float64(t.exemplars))
histogramsIn.Add(float64(t.histograms))
t.highestRecvTimestamp.Set(float64(t.highestTimestamp / 1000)) t.highestRecvTimestamp.Set(float64(t.highestTimestamp / 1000))
return nil return nil
} }

View file

@ -117,6 +117,20 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
level.Debug(h.logger).Log("msg", "Error while adding exemplar in AddExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", exemplarErr) level.Debug(h.logger).Log("msg", "Error while adding exemplar in AddExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", exemplarErr)
} }
} }
for _, hp := range ts.Histograms {
hs := HistogramProtoToHistogram(hp)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, &hs)
if err != nil {
unwrappedErr := errors.Unwrap(err)
// Althogh AppendHistogram does not currently return ErrDuplicateSampleForTimestamp there is
// a note indicating its inclusion in the future.
if errors.Is(unwrappedErr, storage.ErrOutOfOrderSample) || errors.Is(unwrappedErr, storage.ErrOutOfBounds) || errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp) {
level.Error(h.logger).Log("msg", "Out of order histogram from remote write", "err", err.Error(), "series", labels.String(), "timestamp", hp.Timestamp)
}
return err
}
}
} }
if outOfOrderExemplarErrs > 0 { if outOfOrderExemplarErrs > 0 {

View file

@ -50,6 +50,7 @@ func TestRemoteWriteHandler(t *testing.T) {
i := 0 i := 0
j := 0 j := 0
k := 0
for _, ts := range writeRequestFixture.Timeseries { for _, ts := range writeRequestFixture.Timeseries {
labels := labelProtosToLabels(ts.Labels) labels := labelProtosToLabels(ts.Labels)
for _, s := range ts.Samples { for _, s := range ts.Samples {
@ -62,6 +63,12 @@ func TestRemoteWriteHandler(t *testing.T) {
require.Equal(t, mockExemplar{labels, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j]) require.Equal(t, mockExemplar{labels, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
j++ j++
} }
for _, hp := range ts.Histograms {
h := HistogramProtoToHistogram(hp)
require.Equal(t, mockHistogram{labels, hp.Timestamp, &h}, appendable.histograms[k])
k++
}
} }
} }
@ -113,6 +120,28 @@ func TestOutOfOrderExemplar(t *testing.T) {
require.Equal(t, http.StatusNoContent, resp.StatusCode) require.Equal(t, http.StatusNoContent, resp.StatusCode)
} }
func TestOutOfOrderHistogram(t *testing.T) {
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Histograms: []prompb.Histogram{histogramToHistogramProto(0, &testHistogram)},
}}, nil, nil, nil)
require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
require.NoError(t, err)
appendable := &mockAppendable{
latestHistogram: 100,
}
handler := NewWriteHandler(log.NewNopLogger(), appendable)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
resp := recorder.Result()
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
}
func TestCommitErr(t *testing.T) { func TestCommitErr(t *testing.T) {
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil) buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil)
require.NoError(t, err) require.NoError(t, err)
@ -136,11 +165,13 @@ func TestCommitErr(t *testing.T) {
} }
type mockAppendable struct { type mockAppendable struct {
latestSample int64 latestSample int64
samples []mockSample samples []mockSample
latestExemplar int64 latestExemplar int64
exemplars []mockExemplar exemplars []mockExemplar
commitErr error latestHistogram int64
histograms []mockHistogram
commitErr error
} }
type mockSample struct { type mockSample struct {
@ -156,6 +187,12 @@ type mockExemplar struct {
v float64 v float64
} }
type mockHistogram struct {
l labels.Labels
t int64
h *histogram.Histogram
}
func (m *mockAppendable) Appender(_ context.Context) storage.Appender { func (m *mockAppendable) Appender(_ context.Context) storage.Appender {
return m return m
} }
@ -188,7 +225,12 @@ func (m *mockAppendable) AppendExemplar(_ storage.SeriesRef, l labels.Labels, e
return 0, nil return 0, nil
} }
func (*mockAppendable) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram) (storage.SeriesRef, error) { func (m *mockAppendable) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram) (storage.SeriesRef, error) {
// TODO(beorn7): Noop until we implement sparse histograms over remote write. if t < m.latestHistogram {
return 0, storage.ErrOutOfOrderSample
}
m.latestHistogram = t
m.histograms = append(m.histograms, mockHistogram{l, t, h})
return 0, nil return 0, nil
} }

View file

@ -49,6 +49,7 @@ type WriteTo interface {
// Once returned, the WAL Watcher will not attempt to pass that data again. // Once returned, the WAL Watcher will not attempt to pass that data again.
Append([]record.RefSample) bool Append([]record.RefSample) bool
AppendExemplars([]record.RefExemplar) bool AppendExemplars([]record.RefExemplar) bool
AppendHistograms([]record.RefHistogram) bool
StoreSeries([]record.RefSeries, int) StoreSeries([]record.RefSeries, int)
// Next two methods are intended for garbage-collection: first we call // Next two methods are intended for garbage-collection: first we call
@ -74,6 +75,7 @@ type Watcher struct {
walDir string walDir string
lastCheckpoint string lastCheckpoint string
sendExemplars bool sendExemplars bool
sendHistograms bool
metrics *WatcherMetrics metrics *WatcherMetrics
readerMetrics *LiveReaderMetrics readerMetrics *LiveReaderMetrics
@ -144,18 +146,19 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
} }
// NewWatcher creates a new WAL watcher for a given WriteTo. // NewWatcher creates a new WAL watcher for a given WriteTo.
func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, dir string, sendExemplars bool) *Watcher { func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, dir string, sendExemplars, sendHistograms bool) *Watcher {
if logger == nil { if logger == nil {
logger = log.NewNopLogger() logger = log.NewNopLogger()
} }
return &Watcher{ return &Watcher{
logger: logger, logger: logger,
writer: writer, writer: writer,
metrics: metrics, metrics: metrics,
readerMetrics: readerMetrics, readerMetrics: readerMetrics,
walDir: path.Join(dir, "wal"), walDir: path.Join(dir, "wal"),
name: name, name: name,
sendExemplars: sendExemplars, sendExemplars: sendExemplars,
sendHistograms: sendHistograms,
quit: make(chan struct{}), quit: make(chan struct{}),
done: make(chan struct{}), done: make(chan struct{}),
@ -473,11 +476,13 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error {
// Also used with readCheckpoint - implements segmentReadFn. // Also used with readCheckpoint - implements segmentReadFn.
func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
var ( var (
dec record.Decoder dec record.Decoder
series []record.RefSeries series []record.RefSeries
samples []record.RefSample samples []record.RefSample
send []record.RefSample samplesToSend []record.RefSample
exemplars []record.RefExemplar exemplars []record.RefExemplar
histograms []record.RefHistogram
histogramsToSend []record.RefHistogram
) )
for r.Next() && !isClosed(w.quit) { for r.Next() && !isClosed(w.quit) {
rec := r.Record() rec := r.Record()
@ -510,12 +515,12 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
duration := time.Since(w.startTime) duration := time.Since(w.startTime)
level.Info(w.logger).Log("msg", "Done replaying WAL", "duration", duration) level.Info(w.logger).Log("msg", "Done replaying WAL", "duration", duration)
} }
send = append(send, s) samplesToSend = append(samplesToSend, s)
} }
} }
if len(send) > 0 { if len(samplesToSend) > 0 {
w.writer.Append(send) w.writer.Append(samplesToSend)
send = send[:0] samplesToSend = samplesToSend[:0]
} }
case record.Exemplars: case record.Exemplars:
@ -535,6 +540,34 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
} }
w.writer.AppendExemplars(exemplars) w.writer.AppendExemplars(exemplars)
case record.Histograms:
// Skip if experimental "histograms over remote write" is not enabled.
if !w.sendHistograms {
break
}
if !tail {
break
}
histograms, err := dec.Histograms(rec, histograms[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
}
for _, h := range histograms {
if h.T > w.startTimestamp {
if !w.sendSamples {
w.sendSamples = true
duration := time.Since(w.startTime)
level.Info(w.logger).Log("msg", "Done replaying WAL", "duration", duration)
}
histogramsToSend = append(histogramsToSend, h)
}
}
if len(histogramsToSend) > 0 {
w.writer.AppendHistograms(histogramsToSend)
histogramsToSend = histogramsToSend[:0]
}
case record.Tombstones: case record.Tombstones:
default: default:
@ -589,6 +622,8 @@ func recordType(rt record.Type) string {
return "series" return "series"
case record.Samples: case record.Samples:
return "samples" return "samples"
case record.Histograms:
return "histograms"
case record.Tombstones: case record.Tombstones:
return "tombstones" return "tombstones"
default: default:

View file

@ -25,6 +25,7 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/record"
@ -53,6 +54,7 @@ func retry(t *testing.T, interval time.Duration, n int, f func() bool) {
type writeToMock struct { type writeToMock struct {
samplesAppended int samplesAppended int
exemplarsAppended int exemplarsAppended int
histogramsAppended int
seriesLock sync.Mutex seriesLock sync.Mutex
seriesSegmentIndexes map[chunks.HeadSeriesRef]int seriesSegmentIndexes map[chunks.HeadSeriesRef]int
} }
@ -67,6 +69,11 @@ func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar) bool {
return true return true
} }
func (wtm *writeToMock) AppendHistograms(h []record.RefHistogram) bool {
wtm.histogramsAppended += len(h)
return true
}
func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) { func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) {
wtm.UpdateSeriesSegment(series, index) wtm.UpdateSeriesSegment(series, index)
} }
@ -108,6 +115,7 @@ func TestTailSamples(t *testing.T) {
const seriesCount = 10 const seriesCount = 10
const samplesCount = 250 const samplesCount = 250
const exemplarsCount = 25 const exemplarsCount = 25
const histogramsCount = 50
for _, compress := range []bool{false, true} { for _, compress := range []bool{false, true} {
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
now := time.Now() now := time.Now()
@ -160,6 +168,26 @@ func TestTailSamples(t *testing.T) {
}, nil) }, nil)
require.NoError(t, w.Log(exemplar)) require.NoError(t, w.Log(exemplar))
} }
for j := 0; j < histogramsCount; j++ {
inner := rand.Intn(ref + 1)
histogram := enc.Histograms([]record.RefHistogram{{
Ref: chunks.HeadSeriesRef(inner),
T: now.UnixNano() + 1,
H: &histogram.Histogram{
Schema: 2,
ZeroThreshold: 1e-128,
ZeroCount: 0,
Count: 2,
Sum: 0,
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}},
PositiveBuckets: []int64{int64(i) + 1},
NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}},
NegativeBuckets: []int64{int64(-i) - 1},
},
}}, nil)
require.NoError(t, w.Log(histogram))
}
} }
// Start read after checkpoint, no more data written. // Start read after checkpoint, no more data written.
@ -167,7 +195,7 @@ func TestTailSamples(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
wt := newWriteToMock() wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, true) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, true, true)
watcher.SetStartTime(now) watcher.SetStartTime(now)
// Set the Watcher's metrics so they're not nil pointers. // Set the Watcher's metrics so they're not nil pointers.
@ -185,12 +213,14 @@ func TestTailSamples(t *testing.T) {
expectedSeries := seriesCount expectedSeries := seriesCount
expectedSamples := seriesCount * samplesCount expectedSamples := seriesCount * samplesCount
expectedExemplars := seriesCount * exemplarsCount expectedExemplars := seriesCount * exemplarsCount
expectedHistograms := seriesCount * histogramsCount
retry(t, defaultRetryInterval, defaultRetries, func() bool { retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumLabels() >= expectedSeries return wt.checkNumLabels() >= expectedSeries
}) })
require.Equal(t, expectedSeries, wt.checkNumLabels(), "did not receive the expected number of series") require.Equal(t, expectedSeries, wt.checkNumLabels(), "did not receive the expected number of series")
require.Equal(t, expectedSamples, wt.samplesAppended, "did not receive the expected number of samples") require.Equal(t, expectedSamples, wt.samplesAppended, "did not receive the expected number of samples")
require.Equal(t, expectedExemplars, wt.exemplarsAppended, "did not receive the expected number of exemplars") require.Equal(t, expectedExemplars, wt.exemplarsAppended, "did not receive the expected number of exemplars")
require.Equal(t, expectedHistograms, wt.histogramsAppended, "did not receive the expected number of histograms")
}) })
} }
} }
@ -249,7 +279,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
wt := newWriteToMock() wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
go watcher.Start() go watcher.Start()
expected := seriesCount expected := seriesCount
@ -338,7 +368,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
_, _, err = Segments(w.Dir()) _, _, err = Segments(w.Dir())
require.NoError(t, err) require.NoError(t, err)
wt := newWriteToMock() wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
go watcher.Start() go watcher.Start()
expected := seriesCount * 2 expected := seriesCount * 2
@ -404,7 +434,7 @@ func TestReadCheckpoint(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
wt := newWriteToMock() wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
go watcher.Start() go watcher.Start()
expectedSeries := seriesCount expectedSeries := seriesCount
@ -473,7 +503,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
} }
wt := newWriteToMock() wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
watcher.MaxSegment = -1 watcher.MaxSegment = -1
// Set the Watcher's metrics so they're not nil pointers. // Set the Watcher's metrics so they're not nil pointers.
@ -545,7 +575,7 @@ func TestCheckpointSeriesReset(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
wt := newWriteToMock() wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
watcher.MaxSegment = -1 watcher.MaxSegment = -1
go watcher.Start() go watcher.Start()