Add Exemplar Remote Write support (#8296)

* Write exemplars to the WAL and send them over remote write.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Update example for exemplars, print data in a more obvious format.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Add metrics for remote write of exemplars.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Fix incorrect slices passed to send in remote write.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* We need to unregister the new metrics.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Address review comments

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Order of exemplar append vs write exemplar to WAL needs to change.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Several fixes to prevent sending uninitialized or incorrect samples with an exemplar. Fix dropping exemplar for missing series. Add tests for queue_manager sending exemplars

Signed-off-by: Martin Disibio <mdisibio@gmail.com>

* Store both samples and exemplars in the same timeseries buffer to remove the alloc when building final request, keep sub-slices in separate buffers for re-use

Signed-off-by: Martin Disibio <mdisibio@gmail.com>

* Condense sample/exemplar delivery tests to parameterized sub-tests

Signed-off-by: Martin Disibio <mdisibio@gmail.com>

* Rename test methods for clarity now that they also handle exemplars

Signed-off-by: Martin Disibio <mdisibio@gmail.com>

* Rename counter variable. Fix instances where metrics were not updated correctly

Signed-off-by: Martin Disibio <mdisibio@gmail.com>

* Add exemplars to LoadWAL benchmark

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Last exemplars timestamp metric needs to convert value to seconds with
ms precision

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Process exemplar records in a separate go routine when loading the WAL.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Address review comments related to clarifying comments and variable
names. Also refactor sample/exemplar to enqueue prompb types.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Regenerate types proto with comments, update protoc version again.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Put remote write of exemplars behind a feature flag.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Address some of Ganesh's review comments.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Move exemplar remote write feature flag to a config file field.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Address Bartek's review comments.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Don't allocate exemplar buffers in queue_manager if we're not going to
send exemplars over remote write.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Add ValidateExemplar function, validate exemplars when appending to head
and log them all to WAL before adding them to exemplar storage.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Address more review comments from Ganesh.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Add exemplar total label length check.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Address a few last review comments

Signed-off-by: Callum Styan <callumstyan@gmail.com>

Co-authored-by: Martin Disibio <mdisibio@gmail.com>
Callum Styan 2021-05-06 13:53:52 -07:00 committed by GitHub
parent 4b49ffbad5
commit 8fd73b1d28
26 changed files with 1443 additions and 361 deletions

View file

@ -41,7 +41,7 @@ jobs:
GOMAXPROCS: "2"
GO111MODULE: "on"
- prometheus/check_proto:
version: "3.12.3"
version: "3.15.8"
- prometheus/store_artifact:
file: prometheus
- prometheus/store_artifact:

View file

@ -632,6 +632,7 @@ type RemoteWriteConfig struct {
Headers map[string]string `yaml:"headers,omitempty"`
WriteRelabelConfigs []*relabel.Config `yaml:"write_relabel_configs,omitempty"`
Name string `yaml:"name,omitempty"`
SendExemplars bool `yaml:"send_exemplars,omitempty"`
// We cannot do proper Go type embedding below as the parser will then parse
// values arbitrarily into the overflow maps of further-down types.

View file

@ -2141,6 +2141,9 @@ write_relabel_configs:
# remote write configs.
[ name: <string> ]
# Enables sending of exemplars over remote write. Note that exemplar storage itself must be enabled for exemplars to be scraped in the first place.
[ send_exemplars: <boolean> | default = false ]
# Sets the `Authorization` header on every remote write request with the
# configured username and password.
# password and password_file are mutually exclusive.
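The commit message notes that exemplar buffers should not be allocated in the queue manager when exemplars are not going to be sent. A minimal sketch of that idea follows, assuming a pared-down config struct; `remoteWriteConfig`, `pendingBuffers` and `newPendingBuffers` are hypothetical names for illustration and are not taken from the Prometheus code base.

```go
package main

import "fmt"

// remoteWriteConfig is a pared-down, hypothetical stand-in for the real
// RemoteWriteConfig; only the fields relevant to this sketch are kept.
type remoteWriteConfig struct {
	Name          string
	SendExemplars bool // corresponds to send_exemplars, default false
}

type sample struct {
	Value     float64
	Timestamp int64
}

type exemplar struct {
	Value     float64
	Timestamp int64
}

// pendingBuffers holds per-shard scratch space (hypothetical type).
type pendingBuffers struct {
	samples   []sample
	exemplars []exemplar
}

// newPendingBuffers allocates the exemplar buffer only when the config asks
// for exemplars to be sent; otherwise the slice stays nil.
func newPendingBuffers(cfg remoteWriteConfig, capacity int) pendingBuffers {
	b := pendingBuffers{samples: make([]sample, 0, capacity)}
	if cfg.SendExemplars {
		b.exemplars = make([]exemplar, 0, capacity)
	}
	return b
}

func main() {
	off := newPendingBuffers(remoteWriteConfig{Name: "default"}, 100)
	on := newPendingBuffers(remoteWriteConfig{Name: "tracing", SendExemplars: true}, 100)
	fmt.Println("exemplar buffer allocated when disabled:", off.exemplars != nil)
	fmt.Println("exemplar buffer capacity when enabled:", cap(on.exemplars))
}
```

Because the zero value of a Go `bool` is `false`, a config that omits `send_exemplars` takes the no-allocation path, which matches the documented default.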

View file

@ -52,5 +52,4 @@ The remote write receiver allows Prometheus to accept remote write requests from
[OpenMetrics](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars) introduces the ability for scrape targets to add exemplars to certain metrics. Exemplars are references to data outside of the MetricSet. A common use case are IDs of program traces.
Exemplar storage is implemented as a fixed size circular buffer that stores exemplars in memory for all series. Enabling this feature will enable the storage of exemplars scraped by Prometheus. The flag `storage.exemplars.exemplars-limit` can be used to control the size of circular buffer by # of exemplars. An exemplar with just a `traceID=<jaeger-trace-id>` uses roughly 100 bytes of memory via the in-memory exemplar storage.
Exemplar storage is implemented as a fixed size circular buffer that stores exemplars in memory for all series. Enabling this feature will enable the storage of exemplars scraped by Prometheus. The flag `storage.exemplars.exemplars-limit` can be used to control the size of circular buffer by # of exemplars. An exemplar with just a `traceID=<jaeger-trace-id>` uses roughly 100 bytes of memory via the in-memory exemplar storage. If the exemplar storage is enabled, we will also append the exemplars to WAL for local persistence (for WAL duration).
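To illustrate what "fixed size circular buffer" means in the paragraph above, here is a small self-contained sketch. It is an assumption-laden toy, not the actual exemplar storage: the `ring` type, its methods and the simplified `exemplar` struct are all hypothetical, and the real storage also has to index exemplars by series.

```go
package main

import "fmt"

// exemplar is a simplified, hypothetical stand-in for a stored exemplar.
type exemplar struct {
	traceID string
	value   float64
	ts      int64 // milliseconds since the Unix epoch
}

// ring is a fixed-size circular buffer: once it is full, each new entry
// overwrites the oldest one, so memory use is bounded by the limit.
type ring struct {
	buf  []exemplar
	next int // slot the next add will write to
	size int // number of valid entries, at most len(buf)
}

func newRing(limit int) *ring {
	return &ring{buf: make([]exemplar, limit)}
}

func (r *ring) add(e exemplar) {
	if len(r.buf) == 0 {
		return // a limit of zero stores nothing
	}
	r.buf[r.next] = e
	r.next = (r.next + 1) % len(r.buf)
	if r.size < len(r.buf) {
		r.size++
	}
}

// snapshot returns the stored exemplars, oldest first.
func (r *ring) snapshot() []exemplar {
	if r.size == 0 {
		return nil
	}
	out := make([]exemplar, 0, r.size)
	start := (r.next - r.size + len(r.buf)) % len(r.buf)
	for i := 0; i < r.size; i++ {
		out = append(out, r.buf[(start+i)%len(r.buf)])
	}
	return out
}

func main() {
	r := newRing(3)
	for i, id := range []string{"a", "b", "c", "d"} {
		r.add(exemplar{traceID: id, value: float64(i), ts: int64(1000 * i)})
	}
	// The buffer holds three entries, so the oldest one has been overwritten.
	for _, e := range r.snapshot() {
		fmt.Println(e.traceID, e.value, e.ts)
	}
}
```

The trade-off of this design is that old exemplars are dropped silently once the limit is reached, in exchange for a memory footprint that is known up front.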

View file

@ -39,7 +39,15 @@ func main() {
fmt.Println(m)
for _, s := range ts.Samples {
fmt.Printf(" %f %d\n", s.Value, s.Timestamp)
fmt.Printf("\tSample: %f %d\n", s.Value, s.Timestamp)
}
for _, e := range ts.Exemplars {
m := make(model.Metric, len(e.Labels))
for _, l := range e.Labels {
m[model.LabelName(l.Name)] = model.LabelValue(l.Value)
}
fmt.Printf("\tExemplar: %+v %f %d\n", m, e.Value, e.Timestamp)
}
}
})

View file

@ -15,12 +15,16 @@ package exemplar
import "github.com/prometheus/prometheus/pkg/labels"
// The combined length of the label names and values of an Exemplar's LabelSet MUST NOT exceed 128 UTF-8 characters
// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars
const ExemplarMaxLabelSetLength = 128
// Exemplar is additional information associated with a time series.
type Exemplar struct {
Labels labels.Labels
Value float64
Labels labels.Labels `json:"labels"`
Value float64 `json:"value"`
Ts int64 `json:"timestamp"`
HasTs bool
Ts int64
}
type QueryResult struct {
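The 128-character limit introduced above can be checked by counting UTF-8 characters (runes) rather than bytes. The following is a hedged sketch of such a check; `label`, `checkLabelSetLength` and `errLabelLength` are hypothetical names, and the commit's own validation function may be structured differently.

```go
package main

import (
	"errors"
	"fmt"
	"unicode/utf8"
)

// maxLabelSetLength mirrors the ExemplarMaxLabelSetLength constant above.
const maxLabelSetLength = 128

// label is a simplified, hypothetical stand-in for a label pair.
type label struct {
	Name, Value string
}

var errLabelLength = errors.New("exemplar label set exceeds 128 UTF-8 characters")

// checkLabelSetLength sums the rune counts of all label names and values and
// rejects the set as soon as the running total exceeds the limit.
func checkLabelSetLength(ls []label) error {
	total := 0
	for _, l := range ls {
		total += utf8.RuneCountInString(l.Name) + utf8.RuneCountInString(l.Value)
		if total > maxLabelSetLength {
			return errLabelLength
		}
	}
	return nil
}

func main() {
	ok := []label{{Name: "traceID", Value: "4bf92f3577b34da6a3ce929d0e0e4736"}}
	fmt.Println(checkLabelSetLength(ok))

	// "é" is two bytes but one character, so rune counting matters here.
	accented := []label{{Name: "note", Value: "é"}}
	fmt.Println(checkLabelSetLength(accented))
}
```

Counting with `len` instead of `utf8.RuneCountInString` would measure bytes and could reject label sets with multi-byte characters that are within the character limit.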

View file

@ -96,7 +96,7 @@ func (x LabelMatcher_Type) String() string {
}
func (LabelMatcher_Type) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{5, 0}
return fileDescriptor_d938547f84707355, []int{6, 0}
}
// We require this to match chunkenc.Encoding.
@ -122,7 +122,7 @@ func (x Chunk_Encoding) String() string {
}
func (Chunk_Encoding) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{7, 0}
return fileDescriptor_d938547f84707355, []int{8, 0}
}
type MetricMetadata struct {
@ -199,7 +199,9 @@ func (m *MetricMetadata) GetUnit() string {
}
type Sample struct {
Value float64 `protobuf:"fixed64,1,opt,name=value,proto3" json:"value,omitempty"`
Value float64 `protobuf:"fixed64,1,opt,name=value,proto3" json:"value,omitempty"`
// timestamp is in ms format, see pkg/timestamp/timestamp.go for
// conversion from time.Time to Prometheus timestamp.
Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
@ -253,20 +255,89 @@ func (m *Sample) GetTimestamp() int64 {
return 0
}
// TimeSeries represents samples and labels for a single time series.
type TimeSeries struct {
Labels []Label `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels"`
Samples []Sample `protobuf:"bytes,2,rep,name=samples,proto3" json:"samples"`
type Exemplar struct {
// Optional, can be empty.
Labels []Label `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels"`
Value float64 `protobuf:"fixed64,2,opt,name=value,proto3" json:"value,omitempty"`
// timestamp is in ms format, see pkg/timestamp/timestamp.go for
// conversion from time.Time to Prometheus timestamp.
Timestamp int64 `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Exemplar) Reset() { *m = Exemplar{} }
func (m *Exemplar) String() string { return proto.CompactTextString(m) }
func (*Exemplar) ProtoMessage() {}
func (*Exemplar) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{2}
}
func (m *Exemplar) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *Exemplar) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_Exemplar.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *Exemplar) XXX_Merge(src proto.Message) {
xxx_messageInfo_Exemplar.Merge(m, src)
}
func (m *Exemplar) XXX_Size() int {
return m.Size()
}
func (m *Exemplar) XXX_DiscardUnknown() {
xxx_messageInfo_Exemplar.DiscardUnknown(m)
}
var xxx_messageInfo_Exemplar proto.InternalMessageInfo
func (m *Exemplar) GetLabels() []Label {
if m != nil {
return m.Labels
}
return nil
}
func (m *Exemplar) GetValue() float64 {
if m != nil {
return m.Value
}
return 0
}
func (m *Exemplar) GetTimestamp() int64 {
if m != nil {
return m.Timestamp
}
return 0
}
// TimeSeries represents samples and labels for a single time series.
type TimeSeries struct {
// For a timeseries to be valid, and for the samples and exemplars
// to be ingested by the remote system properly, the labels field is required.
Labels []Label `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels"`
Samples []Sample `protobuf:"bytes,2,rep,name=samples,proto3" json:"samples"`
Exemplars []Exemplar `protobuf:"bytes,3,rep,name=exemplars,proto3" json:"exemplars"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *TimeSeries) Reset() { *m = TimeSeries{} }
func (m *TimeSeries) String() string { return proto.CompactTextString(m) }
func (*TimeSeries) ProtoMessage() {}
func (*TimeSeries) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{2}
return fileDescriptor_d938547f84707355, []int{3}
}
func (m *TimeSeries) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -309,6 +380,13 @@ func (m *TimeSeries) GetSamples() []Sample {
return nil
}
func (m *TimeSeries) GetExemplars() []Exemplar {
if m != nil {
return m.Exemplars
}
return nil
}
type Label struct {
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
@ -321,7 +399,7 @@ func (m *Label) Reset() { *m = Label{} }
func (m *Label) String() string { return proto.CompactTextString(m) }
func (*Label) ProtoMessage() {}
func (*Label) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{3}
return fileDescriptor_d938547f84707355, []int{4}
}
func (m *Label) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -375,7 +453,7 @@ func (m *Labels) Reset() { *m = Labels{} }
func (m *Labels) String() string { return proto.CompactTextString(m) }
func (*Labels) ProtoMessage() {}
func (*Labels) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{4}
return fileDescriptor_d938547f84707355, []int{5}
}
func (m *Labels) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -425,7 +503,7 @@ func (m *LabelMatcher) Reset() { *m = LabelMatcher{} }
func (m *LabelMatcher) String() string { return proto.CompactTextString(m) }
func (*LabelMatcher) ProtoMessage() {}
func (*LabelMatcher) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{5}
return fileDescriptor_d938547f84707355, []int{6}
}
func (m *LabelMatcher) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -492,7 +570,7 @@ func (m *ReadHints) Reset() { *m = ReadHints{} }
func (m *ReadHints) String() string { return proto.CompactTextString(m) }
func (*ReadHints) ProtoMessage() {}
func (*ReadHints) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{6}
return fileDescriptor_d938547f84707355, []int{7}
}
func (m *ReadHints) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -586,7 +664,7 @@ func (m *Chunk) Reset() { *m = Chunk{} }
func (m *Chunk) String() string { return proto.CompactTextString(m) }
func (*Chunk) ProtoMessage() {}
func (*Chunk) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{7}
return fileDescriptor_d938547f84707355, []int{8}
}
func (m *Chunk) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -658,7 +736,7 @@ func (m *ChunkedSeries) Reset() { *m = ChunkedSeries{} }
func (m *ChunkedSeries) String() string { return proto.CompactTextString(m) }
func (*ChunkedSeries) ProtoMessage() {}
func (*ChunkedSeries) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{8}
return fileDescriptor_d938547f84707355, []int{9}
}
func (m *ChunkedSeries) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -707,6 +785,7 @@ func init() {
proto.RegisterEnum("prometheus.Chunk_Encoding", Chunk_Encoding_name, Chunk_Encoding_value)
proto.RegisterType((*MetricMetadata)(nil), "prometheus.MetricMetadata")
proto.RegisterType((*Sample)(nil), "prometheus.Sample")
proto.RegisterType((*Exemplar)(nil), "prometheus.Exemplar")
proto.RegisterType((*TimeSeries)(nil), "prometheus.TimeSeries")
proto.RegisterType((*Label)(nil), "prometheus.Label")
proto.RegisterType((*Labels)(nil), "prometheus.Labels")
@ -719,51 +798,53 @@ func init() {
func init() { proto.RegisterFile("types.proto", fileDescriptor_d938547f84707355) }
var fileDescriptor_d938547f84707355 = []byte{
// 690 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0xcd, 0x6e, 0xda, 0x40,
0x10, 0xce, 0xfa, 0x17, 0x86, 0x04, 0x39, 0xab, 0x54, 0x75, 0xa3, 0x96, 0x22, 0x4b, 0x95, 0x38,
0x54, 0x44, 0x49, 0x4f, 0x91, 0x7a, 0x21, 0x91, 0xf3, 0xa3, 0xc6, 0xa0, 0x2c, 0xa0, 0xfe, 0x5c,
0xd0, 0x02, 0x1b, 0xb0, 0x8a, 0x8d, 0xe3, 0x5d, 0xaa, 0xf0, 0x20, 0xbd, 0xf5, 0x15, 0x7a, 0xe8,
0x5b, 0xe4, 0xd8, 0x27, 0xa8, 0xaa, 0x3c, 0x49, 0xb5, 0x6b, 0x13, 0x13, 0xa5, 0x97, 0xf6, 0x36,
0xf3, 0x7d, 0xdf, 0xfc, 0xec, 0xcc, 0xd8, 0x50, 0x11, 0xcb, 0x84, 0xf1, 0x66, 0x92, 0xce, 0xc5,
0x1c, 0x43, 0x92, 0xce, 0x23, 0x26, 0xa6, 0x6c, 0xc1, 0x77, 0x77, 0x26, 0xf3, 0xc9, 0x5c, 0xc1,
0x7b, 0xd2, 0xca, 0x14, 0xde, 0x37, 0x0d, 0xaa, 0x01, 0x13, 0x69, 0x38, 0x0a, 0x98, 0xa0, 0x63,
0x2a, 0x28, 0x3e, 0x04, 0x43, 0xe6, 0x70, 0x51, 0x1d, 0x35, 0xaa, 0x07, 0xaf, 0x9a, 0x45, 0x8e,
0xe6, 0x43, 0x65, 0xee, 0xf6, 0x96, 0x09, 0x23, 0x2a, 0x04, 0xbf, 0x06, 0x1c, 0x29, 0x6c, 0x70,
0x45, 0xa3, 0x70, 0xb6, 0x1c, 0xc4, 0x34, 0x62, 0xae, 0x56, 0x47, 0x8d, 0x32, 0x71, 0x32, 0xe6,
0x44, 0x11, 0x6d, 0x1a, 0x31, 0x8c, 0xc1, 0x98, 0xb2, 0x59, 0xe2, 0x1a, 0x8a, 0x57, 0xb6, 0xc4,
0x16, 0x71, 0x28, 0x5c, 0x33, 0xc3, 0xa4, 0xed, 0x2d, 0x01, 0x8a, 0x4a, 0xb8, 0x02, 0x76, 0xbf,
0xfd, 0xae, 0xdd, 0x79, 0xdf, 0x76, 0x36, 0xa4, 0x73, 0xdc, 0xe9, 0xb7, 0x7b, 0x3e, 0x71, 0x10,
0x2e, 0x83, 0x79, 0xda, 0xea, 0x9f, 0xfa, 0x8e, 0x86, 0xb7, 0xa0, 0x7c, 0x76, 0xde, 0xed, 0x75,
0x4e, 0x49, 0x2b, 0x70, 0x74, 0x8c, 0xa1, 0xaa, 0x98, 0x02, 0x33, 0x64, 0x68, 0xb7, 0x1f, 0x04,
0x2d, 0xf2, 0xd1, 0x31, 0x71, 0x09, 0x8c, 0xf3, 0xf6, 0x49, 0xc7, 0xb1, 0xf0, 0x26, 0x94, 0xba,
0xbd, 0x56, 0xcf, 0xef, 0xfa, 0x3d, 0xc7, 0xf6, 0xde, 0x82, 0xd5, 0xa5, 0x51, 0x32, 0x63, 0x78,
0x07, 0xcc, 0x2f, 0x74, 0xb6, 0xc8, 0xc6, 0x82, 0x48, 0xe6, 0xe0, 0xe7, 0x50, 0x16, 0x61, 0xc4,
0xb8, 0xa0, 0x51, 0xa2, 0xde, 0xa9, 0x93, 0x02, 0xf0, 0xae, 0x01, 0x7a, 0x61, 0xc4, 0xba, 0x2c,
0x0d, 0x19, 0xc7, 0x7b, 0x60, 0xcd, 0xe8, 0x90, 0xcd, 0xb8, 0x8b, 0xea, 0x7a, 0xa3, 0x72, 0xb0,
0xbd, 0x3e, 0xd9, 0x0b, 0xc9, 0x1c, 0x19, 0xb7, 0xbf, 0x5e, 0x6e, 0x90, 0x5c, 0x86, 0x0f, 0xc0,
0xe6, 0xaa, 0x38, 0x77, 0x35, 0x15, 0x81, 0xd7, 0x23, 0xb2, 0xbe, 0xf2, 0x90, 0x95, 0xd0, 0xdb,
0x07, 0x53, 0xa5, 0x92, 0x83, 0x54, 0xc3, 0x47, 0xd9, 0x20, 0xa5, 0x5d, 0xbc, 0x21, 0xdb, 0x48,
0xe6, 0x78, 0x87, 0x60, 0x5d, 0x64, 0x05, 0xff, 0xb5, 0x43, 0xef, 0x2b, 0x82, 0x4d, 0x85, 0x07,
0x54, 0x8c, 0xa6, 0x2c, 0xc5, 0xfb, 0x0f, 0x6e, 0xe7, 0xc5, 0xa3, 0xf8, 0x5c, 0xd7, 0x5c, 0xbb,
0x99, 0x55, 0xa3, 0xda, 0xdf, 0x1a, 0xd5, 0xd7, 0x1b, 0x6d, 0x80, 0xa1, 0x2e, 0xc0, 0x02, 0xcd,
0xbf, 0x74, 0x36, 0xb0, 0x0d, 0x7a, 0xdb, 0xbf, 0x74, 0x90, 0x04, 0x88, 0xdc, 0xba, 0x04, 0x88,
0xef, 0xe8, 0xde, 0x0f, 0x04, 0x65, 0xc2, 0xe8, 0xf8, 0x2c, 0x8c, 0x05, 0xc7, 0x4f, 0xc1, 0xe6,
0x82, 0x25, 0x83, 0x88, 0xab, 0xbe, 0x74, 0x62, 0x49, 0x37, 0xe0, 0xb2, 0xf4, 0xd5, 0x22, 0x1e,
0xad, 0x4a, 0x4b, 0x1b, 0x3f, 0x83, 0x12, 0x17, 0x34, 0x15, 0x52, 0xad, 0x2b, 0xb5, 0xad, 0xfc,
0x80, 0xe3, 0x27, 0x60, 0xb1, 0x78, 0x2c, 0x09, 0x43, 0x11, 0x26, 0x8b, 0xc7, 0x01, 0xc7, 0xbb,
0x50, 0x9a, 0xa4, 0xf3, 0x45, 0x12, 0xc6, 0x13, 0xd7, 0xac, 0xeb, 0x8d, 0x32, 0xb9, 0xf7, 0x71,
0x15, 0xb4, 0xe1, 0xd2, 0xb5, 0xea, 0xa8, 0x51, 0x22, 0xda, 0x70, 0x29, 0xb3, 0xa7, 0x34, 0x9e,
0x30, 0x99, 0xc4, 0xce, 0xb2, 0x2b, 0x3f, 0xe0, 0xde, 0x77, 0x04, 0xe6, 0xf1, 0x74, 0x11, 0x7f,
0xc6, 0x35, 0xa8, 0x44, 0x61, 0x3c, 0x90, 0x77, 0x54, 0xf4, 0x5c, 0x8e, 0xc2, 0x58, 0x1e, 0x53,
0xc0, 0x15, 0x4f, 0x6f, 0xee, 0xf9, 0xfc, 0xec, 0x22, 0x7a, 0x93, 0xf3, 0xcd, 0x7c, 0x09, 0xba,
0x5a, 0xc2, 0xee, 0xfa, 0x12, 0x54, 0x81, 0xa6, 0x1f, 0x8f, 0xe6, 0xe3, 0x30, 0x9e, 0x14, 0x1b,
0x90, 0x9f, 0xb3, 0x7a, 0xd5, 0x26, 0x51, 0xb6, 0x57, 0x87, 0xd2, 0x4a, 0xf5, 0xf0, 0x8b, 0xb3,
0x41, 0xff, 0xd0, 0x21, 0x0e, 0xf2, 0xae, 0x61, 0x4b, 0x65, 0x63, 0xe3, 0xff, 0xbd, 0xef, 0x3d,
0xb0, 0x46, 0x32, 0xc3, 0xea, 0xbc, 0xb7, 0x1f, 0x75, 0xba, 0x0a, 0xc8, 0x64, 0x47, 0x3b, 0xb7,
0x77, 0x35, 0xf4, 0xf3, 0xae, 0x86, 0x7e, 0xdf, 0xd5, 0xd0, 0x27, 0x4b, 0xaa, 0x93, 0xe1, 0xd0,
0x52, 0x7f, 0xb2, 0x37, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0xf3, 0xb7, 0x12, 0x44, 0xfa, 0x04,
0x00, 0x00,
// 734 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0xcb, 0x6e, 0xdb, 0x46,
0x14, 0xf5, 0xf0, 0x29, 0x5e, 0xd9, 0x02, 0x3d, 0x50, 0x51, 0xd6, 0x68, 0x55, 0x81, 0x40, 0x01,
0x2d, 0x0a, 0x19, 0x76, 0x37, 0x35, 0xd0, 0x8d, 0x6c, 0xd0, 0x0f, 0xd4, 0x94, 0xe0, 0x91, 0x84,
0x3e, 0x36, 0xc2, 0x48, 0x1a, 0x4b, 0x44, 0xc4, 0x47, 0x38, 0x54, 0x60, 0x7d, 0x48, 0x76, 0xf9,
0x83, 0x20, 0x8b, 0xfc, 0x85, 0x97, 0xf9, 0x82, 0x20, 0xf0, 0x97, 0x04, 0x33, 0xa4, 0x4c, 0x29,
0x4e, 0x16, 0xce, 0xee, 0xde, 0x7b, 0xce, 0xb9, 0x8f, 0xb9, 0x97, 0x84, 0x6a, 0xb6, 0x4a, 0x18,
0x6f, 0x27, 0x69, 0x9c, 0xc5, 0x18, 0x92, 0x34, 0x0e, 0x59, 0x36, 0x67, 0x4b, 0x7e, 0x50, 0x9f,
0xc5, 0xb3, 0x58, 0x86, 0x0f, 0x85, 0x95, 0x33, 0xdc, 0x37, 0x0a, 0xd4, 0x7c, 0x96, 0xa5, 0xc1,
0xc4, 0x67, 0x19, 0x9d, 0xd2, 0x8c, 0xe2, 0x13, 0xd0, 0x44, 0x0e, 0x07, 0x35, 0x51, 0xab, 0x76,
0xfc, 0x5b, 0xbb, 0xcc, 0xd1, 0xde, 0x66, 0x16, 0xee, 0x60, 0x95, 0x30, 0x22, 0x25, 0xf8, 0x77,
0xc0, 0xa1, 0x8c, 0x8d, 0x6e, 0x69, 0x18, 0x2c, 0x56, 0xa3, 0x88, 0x86, 0xcc, 0x51, 0x9a, 0xa8,
0x65, 0x11, 0x3b, 0x47, 0xce, 0x25, 0xd0, 0xa5, 0x21, 0xc3, 0x18, 0xb4, 0x39, 0x5b, 0x24, 0x8e,
0x26, 0x71, 0x69, 0x8b, 0xd8, 0x32, 0x0a, 0x32, 0x47, 0xcf, 0x63, 0xc2, 0x76, 0x57, 0x00, 0x65,
0x25, 0x5c, 0x05, 0x73, 0xd8, 0xfd, 0xbb, 0xdb, 0xfb, 0xa7, 0x6b, 0xef, 0x08, 0xe7, 0xac, 0x37,
0xec, 0x0e, 0x3c, 0x62, 0x23, 0x6c, 0x81, 0x7e, 0xd1, 0x19, 0x5e, 0x78, 0xb6, 0x82, 0xf7, 0xc0,
0xba, 0xbc, 0xea, 0x0f, 0x7a, 0x17, 0xa4, 0xe3, 0xdb, 0x2a, 0xc6, 0x50, 0x93, 0x48, 0x19, 0xd3,
0x84, 0xb4, 0x3f, 0xf4, 0xfd, 0x0e, 0xf9, 0xcf, 0xd6, 0x71, 0x05, 0xb4, 0xab, 0xee, 0x79, 0xcf,
0x36, 0xf0, 0x2e, 0x54, 0xfa, 0x83, 0xce, 0xc0, 0xeb, 0x7b, 0x03, 0xdb, 0x74, 0xff, 0x02, 0xa3,
0x4f, 0xc3, 0x64, 0xc1, 0x70, 0x1d, 0xf4, 0x57, 0x74, 0xb1, 0xcc, 0x9f, 0x05, 0x91, 0xdc, 0xc1,
0x3f, 0x83, 0x95, 0x05, 0x21, 0xe3, 0x19, 0x0d, 0x13, 0x39, 0xa7, 0x4a, 0xca, 0x80, 0x1b, 0x43,
0xc5, 0xbb, 0x63, 0x61, 0xb2, 0xa0, 0x29, 0x3e, 0x04, 0x63, 0x41, 0xc7, 0x6c, 0xc1, 0x1d, 0xd4,
0x54, 0x5b, 0xd5, 0xe3, 0xfd, 0xcd, 0x77, 0xbd, 0x16, 0xc8, 0xa9, 0x76, 0xff, 0xf1, 0xd7, 0x1d,
0x52, 0xd0, 0xca, 0x82, 0xca, 0x37, 0x0b, 0xaa, 0x5f, 0x16, 0x7c, 0x8b, 0x00, 0x06, 0x41, 0xc8,
0xfa, 0x2c, 0x0d, 0x18, 0x7f, 0x7e, 0xcd, 0x63, 0x30, 0xb9, 0x1c, 0x97, 0x3b, 0x8a, 0x54, 0xe0,
0x4d, 0x45, 0xfe, 0x12, 0x85, 0x64, 0x4d, 0xc4, 0x7f, 0x82, 0xc5, 0x8a, 0x21, 0xb9, 0xa3, 0x4a,
0x55, 0x7d, 0x53, 0xb5, 0x7e, 0x81, 0x42, 0x57, 0x92, 0xdd, 0x23, 0xd0, 0x65, 0x13, 0x62, 0xe9,
0xf2, 0x50, 0x50, 0xbe, 0x74, 0x61, 0x6f, 0x8f, 0x6f, 0x15, 0xe3, 0xbb, 0x27, 0x60, 0x5c, 0xe7,
0xad, 0x3e, 0x77, 0x36, 0xf7, 0x35, 0x82, 0x5d, 0x19, 0xf7, 0x69, 0x36, 0x99, 0xb3, 0x14, 0x1f,
0x6d, 0xdd, 0xf9, 0x2f, 0x4f, 0xf4, 0x05, 0xaf, 0xbd, 0x71, 0xdf, 0xeb, 0x46, 0x95, 0xaf, 0x35,
0xaa, 0x6e, 0x36, 0xda, 0x02, 0x4d, 0x5e, 0xab, 0x01, 0x8a, 0x77, 0x63, 0xef, 0x60, 0x13, 0xd4,
0xae, 0x77, 0x63, 0x23, 0x11, 0x20, 0xe2, 0x42, 0x45, 0x80, 0x78, 0xb6, 0xea, 0xbe, 0x47, 0x60,
0x11, 0x46, 0xa7, 0x97, 0x41, 0x94, 0x71, 0xfc, 0x23, 0x98, 0x3c, 0x63, 0xc9, 0x28, 0xe4, 0xb2,
0x2f, 0x95, 0x18, 0xc2, 0xf5, 0xb9, 0x28, 0x7d, 0xbb, 0x8c, 0x26, 0xeb, 0xd2, 0xc2, 0xc6, 0x3f,
0x41, 0x85, 0x67, 0x34, 0xcd, 0x04, 0x3b, 0xbf, 0x05, 0x53, 0xfa, 0x3e, 0xc7, 0x3f, 0x80, 0xc1,
0xa2, 0xa9, 0x00, 0x34, 0x09, 0xe8, 0x2c, 0x9a, 0xfa, 0x1c, 0x1f, 0x40, 0x65, 0x96, 0xc6, 0xcb,
0x24, 0x88, 0x66, 0x8e, 0xde, 0x54, 0x5b, 0x16, 0x79, 0xf4, 0x71, 0x0d, 0x94, 0xf1, 0xca, 0x31,
0x9a, 0xa8, 0x55, 0x21, 0xca, 0x78, 0x25, 0xb2, 0xa7, 0x34, 0x9a, 0x31, 0x91, 0xc4, 0xcc, 0xb3,
0x4b, 0xdf, 0xe7, 0xee, 0x3b, 0x04, 0xfa, 0xd9, 0x7c, 0x19, 0xbd, 0xc0, 0x0d, 0xa8, 0x86, 0x41,
0x34, 0x12, 0x27, 0x58, 0xf6, 0x6c, 0x85, 0x41, 0x24, 0xce, 0xd0, 0xe7, 0x12, 0xa7, 0x77, 0x8f,
0x78, 0xf1, 0x89, 0x84, 0xf4, 0xae, 0xc0, 0xdb, 0xc5, 0x12, 0x54, 0xb9, 0x84, 0x83, 0xcd, 0x25,
0xc8, 0x02, 0x6d, 0x2f, 0x9a, 0xc4, 0xd3, 0x20, 0x9a, 0x95, 0x1b, 0x10, 0xbf, 0x1e, 0x39, 0xd5,
0x2e, 0x91, 0xb6, 0xdb, 0x84, 0xca, 0x9a, 0xb5, 0xfd, 0x77, 0x30, 0x41, 0xfd, 0xb7, 0x47, 0x6c,
0xe4, 0xbe, 0x84, 0x3d, 0x99, 0x8d, 0x4d, 0xbf, 0xf7, 0xcb, 0x38, 0x04, 0x63, 0x22, 0x32, 0xac,
0x3f, 0x8c, 0xfd, 0x27, 0x9d, 0xae, 0x05, 0x39, 0xed, 0xb4, 0x7e, 0xff, 0xd0, 0x40, 0x1f, 0x1e,
0x1a, 0xe8, 0xd3, 0x43, 0x03, 0xfd, 0x6f, 0x08, 0x76, 0x32, 0x1e, 0x1b, 0xf2, 0xaf, 0xfb, 0xc7,
0xe7, 0x00, 0x00, 0x00, 0xff, 0xff, 0xa9, 0xa3, 0x6c, 0x23, 0xa6, 0x05, 0x00, 0x00,
}
func (m *MetricMetadata) Marshal() (dAtA []byte, err error) {
@ -857,6 +938,58 @@ func (m *Sample) MarshalToSizedBuffer(dAtA []byte) (int, error) {
return len(dAtA) - i, nil
}
func (m *Exemplar) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Exemplar) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Exemplar) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.Timestamp != 0 {
i = encodeVarintTypes(dAtA, i, uint64(m.Timestamp))
i--
dAtA[i] = 0x18
}
if m.Value != 0 {
i -= 8
encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.Value))))
i--
dAtA[i] = 0x11
}
if len(m.Labels) > 0 {
for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Labels[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
func (m *TimeSeries) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
@ -881,6 +1014,20 @@ func (m *TimeSeries) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Exemplars) > 0 {
for iNdEx := len(m.Exemplars) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Exemplars[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x1a
}
}
if len(m.Samples) > 0 {
for iNdEx := len(m.Samples) - 1; iNdEx >= 0; iNdEx-- {
{
@ -1273,6 +1420,30 @@ func (m *Sample) Size() (n int) {
return n
}
func (m *Exemplar) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if len(m.Labels) > 0 {
for _, e := range m.Labels {
l = e.Size()
n += 1 + l + sovTypes(uint64(l))
}
}
if m.Value != 0 {
n += 9
}
if m.Timestamp != 0 {
n += 1 + sovTypes(uint64(m.Timestamp))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *TimeSeries) Size() (n int) {
if m == nil {
return 0
@ -1291,6 +1462,12 @@ func (m *TimeSeries) Size() (n int) {
n += 1 + l + sovTypes(uint64(l))
}
}
if len(m.Exemplars) > 0 {
for _, e := range m.Exemplars {
l = e.Size()
n += 1 + l + sovTypes(uint64(l))
}
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
@ -1697,6 +1874,121 @@ func (m *Sample) Unmarshal(dAtA []byte) error {
}
return nil
}
func (m *Exemplar) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Exemplar: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Exemplar: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthTypes
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthTypes
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Labels = append(m.Labels, Label{})
if err := m.Labels[len(m.Labels)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 2:
if wireType != 1 {
return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType)
}
var v uint64
if (iNdEx + 8) > l {
return io.ErrUnexpectedEOF
}
v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:]))
iNdEx += 8
m.Value = float64(math.Float64frombits(v))
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType)
}
m.Timestamp = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Timestamp |= int64(b&0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipTypes(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthTypes
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *TimeSeries) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
@ -1794,6 +2086,40 @@ func (m *TimeSeries) Unmarshal(dAtA []byte) error {
return err
}
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Exemplars", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthTypes
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthTypes
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Exemplars = append(m.Exemplars, Exemplar{})
if err := m.Exemplars[len(m.Exemplars)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipTypes(dAtA[iNdEx:])
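The literal bytes in the generated marshalling code above (`0xa`, `0x11`, `0x18`, `0x1a`) are protobuf field tags: the field number shifted left by three bits, combined with the wire type. The sketch below recomputes them and shows the varint length calculation that the `Size` methods rely on; `tagByte` and `varintSize` are illustrative helper names, not functions from the generated file.

```go
package main

import "fmt"

// Protobuf wire types used by the Exemplar and TimeSeries messages.
const (
	wireVarint  = 0 // int64 timestamp
	wireFixed64 = 1 // double value
	wireBytes   = 2 // length-delimited: embedded messages such as Label
)

// tagByte computes the single-byte tag for a field. This only holds for
// field numbers up to 15; larger numbers need a multi-byte varint tag.
func tagByte(fieldNum, wireType int) byte {
	return byte(fieldNum<<3 | wireType)
}

// varintSize returns how many bytes x occupies as a base-128 varint.
func varintSize(x uint64) int {
	n := 1
	for x >= 0x80 {
		x >>= 7
		n++
	}
	return n
}

func main() {
	fmt.Printf("Exemplar.labels       (field 1, bytes):   %#x\n", tagByte(1, wireBytes))
	fmt.Printf("Exemplar.value        (field 2, fixed64): %#x\n", tagByte(2, wireFixed64))
	fmt.Printf("Exemplar.timestamp    (field 3, varint):  %#x\n", tagByte(3, wireVarint))
	fmt.Printf("TimeSeries.exemplars  (field 3, bytes):   %#x\n", tagByte(3, wireBytes))

	// A length-delimited field costs 1 tag byte + the varint length prefix
	// + the payload itself, which is the "1 + l + sovTypes(l)" pattern.
	payload := 300
	fmt.Println("encoded size of a 300-byte embedded message:", 1+varintSize(uint64(payload))+payload)
}
```

The same arithmetic explains `n += 9` for a non-zero `Value`: one tag byte plus eight bytes for the fixed64 payload.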

View file

@ -40,13 +40,27 @@ message MetricMetadata {
message Sample {
double value = 1;
// timestamp is in ms format, see pkg/timestamp/timestamp.go for
// conversion from time.Time to Prometheus timestamp.
int64 timestamp = 2;
}
message Exemplar {
// Optional, can be empty.
repeated Label labels = 1 [(gogoproto.nullable) = false];
double value = 2;
// timestamp is in ms format, see pkg/timestamp/timestamp.go for
// conversion from time.Time to Prometheus timestamp.
int64 timestamp = 3;
}
// TimeSeries represents samples and labels for a single time series.
message TimeSeries {
// For a timeseries to be valid, and for the samples and exemplars
// to be ingested by the remote system properly, the labels field is required.
repeated Label labels = 1 [(gogoproto.nullable) = false];
repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false];
}
message Label {
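The comments in the proto definition above say timestamps are in milliseconds, and the commit message mentions that the last-exemplar timestamp metric has to be converted to seconds with millisecond precision. A minimal sketch of both conversions follows; the function names `fromTime`, `fromUnix` and `millisToSeconds` are assumptions for illustration, not necessarily the names used in `pkg/timestamp/timestamp.go`.

```go
package main

import (
	"fmt"
	"time"
)

// fromTime converts a time.Time to milliseconds since the Unix epoch,
// truncating anything finer than a millisecond.
func fromTime(t time.Time) int64 {
	return t.Unix()*1000 + int64(t.Nanosecond())/int64(time.Millisecond)
}

// fromUnix is a convenience wrapper around fromTime for callers that already
// hold seconds and nanoseconds.
func fromUnix(sec, nsec int64) int64 {
	return fromTime(time.Unix(sec, nsec))
}

// millisToSeconds converts a millisecond timestamp to fractional seconds,
// keeping the millisecond part instead of truncating to whole seconds.
func millisToSeconds(ms int64) float64 {
	return float64(ms) / 1000
}

func main() {
	ms := fromTime(time.Now())
	fmt.Println("milliseconds:", ms)
	fmt.Printf("seconds:      %.3f\n", millisToSeconds(ms))
}
```

Dividing as a float rather than using integer division is what preserves the millisecond precision; `ms / 1000` on an `int64` would drop it.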

View file

@ -26,6 +26,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/promql/parser"
@ -274,16 +275,18 @@ func (*evalCmd) testCmd() {}
// loadCmd is a command that loads sequences of sample values for specific
// metrics into the storage.
type loadCmd struct {
gap time.Duration
metrics map[uint64]labels.Labels
defs map[uint64][]Point
gap time.Duration
metrics map[uint64]labels.Labels
defs map[uint64][]Point
exemplars map[uint64][]exemplar.Exemplar
}
func newLoadCmd(gap time.Duration) *loadCmd {
return &loadCmd{
gap: gap,
metrics: map[uint64]labels.Labels{},
defs: map[uint64][]Point{},
gap: gap,
metrics: map[uint64]labels.Labels{},
defs: map[uint64][]Point{},
exemplars: map[uint64][]exemplar.Exemplar{},
}
}

View file

@ -1497,7 +1497,6 @@ func yoloString(b []byte) string {
// Adds samples to the appender, checking the error, and then returns the # of samples added,
// whether the caller should continue to process more samples, and any sample limit errors.
func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err error, sampleLimitErr *error, appErrs *appendErrors) (bool, error) {
switch errors.Cause(err) {
case nil:

View file

@ -10,8 +10,8 @@ if ! [[ "$0" =~ "scripts/genproto.sh" ]]; then
exit 255
fi
if ! [[ $(protoc --version) =~ "3.12.3" ]]; then
echo "could not find protoc 3.12.3, is it installed + in PATH?"
if ! [[ $(protoc --version) =~ "3.15.8" ]]; then
echo "could not find protoc 3.15.8, is it installed + in PATH?"
exit 255
fi

View file

@ -16,6 +16,7 @@ package storage
import (
"context"
"errors"
"fmt"
"github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/labels"
@ -30,6 +31,8 @@ var (
ErrDuplicateSampleForTimestamp = errors.New("duplicate sample for timestamp")
ErrOutOfBounds = errors.New("out of bounds")
ErrOutOfOrderExemplar = errors.New("out of order exemplar")
ErrDuplicateExemplar = errors.New("duplicate exemplar")
ErrExemplarLabelLength = fmt.Errorf("label length for exemplar exceeds maximum of %d UTF-8 characters", exemplar.ExemplarMaxLabelSetLength)
)
// Appendable allows creating appenders.
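
The new `ErrExemplarLabelLength` error is formatted from `exemplar.ExemplarMaxLabelSetLength`. As a minimal sketch of how such a limit could be enforced, the example below counts UTF-8 characters across all label names and values. The `label` type, the `labelSetLength` and `validate` helpers, and the limit value of 128 are assumptions made for illustration and are not taken from this diff.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"unicode/utf8"
)

// maxLabelSetLength stands in for exemplar.ExemplarMaxLabelSetLength.
// The value 128 is an assumption made for this sketch.
const maxLabelSetLength = 128

// label is a hypothetical, minimal name/value pair.
type label struct{ Name, Value string }

var errLabelLength = fmt.Errorf("label length for exemplar exceeds maximum of %d UTF-8 characters", maxLabelSetLength)

// labelSetLength counts the UTF-8 characters (runes) across all names and values.
func labelSetLength(ls []label) int {
	n := 0
	for _, l := range ls {
		n += utf8.RuneCountInString(l.Name) + utf8.RuneCountInString(l.Value)
	}
	return n
}

// validate returns errLabelLength when the combined length exceeds the limit.
func validate(ls []label) error {
	if labelSetLength(ls) > maxLabelSetLength {
		return errLabelLength
	}
	return nil
}

func main() {
	short := []label{{"trace_id", "abc123"}}
	long := []label{{"trace_id", strings.Repeat("x", 200)}}
	fmt.Println(validate(short) == nil)
	fmt.Println(errors.Is(validate(long), errLabelLength))
}
```

Counting runes rather than bytes matches the "UTF-8 characters" wording of the error message; a byte-based check would reject multi-byte label values earlier.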


@ -52,25 +52,30 @@ const (
type queueManagerMetrics struct {
reg prometheus.Registerer
samplesTotal prometheus.Counter
metadataTotal prometheus.Counter
failedSamplesTotal prometheus.Counter
failedMetadataTotal prometheus.Counter
retriedSamplesTotal prometheus.Counter
retriedMetadataTotal prometheus.Counter
droppedSamplesTotal prometheus.Counter
enqueueRetriesTotal prometheus.Counter
sentBatchDuration prometheus.Histogram
highestSentTimestamp *maxTimestamp
pendingSamples prometheus.Gauge
shardCapacity prometheus.Gauge
numShards prometheus.Gauge
maxNumShards prometheus.Gauge
minNumShards prometheus.Gauge
desiredNumShards prometheus.Gauge
samplesBytesTotal prometheus.Counter
metadataBytesTotal prometheus.Counter
maxSamplesPerSend prometheus.Gauge
samplesTotal prometheus.Counter
exemplarsTotal prometheus.Counter
metadataTotal prometheus.Counter
failedSamplesTotal prometheus.Counter
failedExemplarsTotal prometheus.Counter
failedMetadataTotal prometheus.Counter
retriedSamplesTotal prometheus.Counter
retriedExemplarsTotal prometheus.Counter
retriedMetadataTotal prometheus.Counter
droppedSamplesTotal prometheus.Counter
droppedExemplarsTotal prometheus.Counter
enqueueRetriesTotal prometheus.Counter
sentBatchDuration prometheus.Histogram
highestSentTimestamp *maxTimestamp
pendingSamples prometheus.Gauge
pendingExemplars prometheus.Gauge
shardCapacity prometheus.Gauge
numShards prometheus.Gauge
maxNumShards prometheus.Gauge
minNumShards prometheus.Gauge
desiredNumShards prometheus.Gauge
sentBytesTotal prometheus.Counter
metadataBytesTotal prometheus.Counter
maxSamplesPerSend prometheus.Gauge
}
func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManagerMetrics {
@ -89,6 +94,13 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
Help: "Total number of samples sent to remote storage.",
ConstLabels: constLabels,
})
m.exemplarsTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "exemplars_total",
Help: "Total number of exemplars sent to remote storage.",
ConstLabels: constLabels,
})
m.metadataTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -103,6 +115,13 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
Help: "Total number of samples which failed on send to remote storage, non-recoverable errors.",
ConstLabels: constLabels,
})
m.failedExemplarsTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "exemplars_failed_total",
Help: "Total number of exemplars which failed on send to remote storage, non-recoverable errors.",
ConstLabels: constLabels,
})
m.failedMetadataTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -117,6 +136,13 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
Help: "Total number of samples which failed on send to remote storage but were retried because the send error was recoverable.",
ConstLabels: constLabels,
})
m.retriedExemplarsTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "exemplars_retried_total",
Help: "Total number of exemplars which failed on send to remote storage but were retried because the send error was recoverable.",
ConstLabels: constLabels,
})
m.retriedMetadataTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -128,7 +154,14 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
Namespace: namespace,
Subsystem: subsystem,
Name: "samples_dropped_total",
Help: "Total number of samples which were dropped after being read from the WAL before being sent via remote write.",
Help: "Total number of samples which were dropped after being read from the WAL before being sent via remote write, either via relabelling or unintentionally because of an unknown reference ID.",
ConstLabels: constLabels,
})
m.droppedExemplarsTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "exemplars_dropped_total",
Help: "Total number of exemplars which were dropped after being read from the WAL before being sent via remote write, either via relabelling or unintentionally because of an unknown reference ID.",
ConstLabels: constLabels,
})
m.enqueueRetriesTotal = prometheus.NewCounter(prometheus.CounterOpts{
@ -162,6 +195,13 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
Help: "The number of samples pending in the queues shards to be sent to the remote storage.",
ConstLabels: constLabels,
})
m.pendingExemplars = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "exemplars_pending",
Help: "The number of exemplars pending in the queues shards to be sent to the remote storage.",
ConstLabels: constLabels,
})
m.shardCapacity = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -197,11 +237,11 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
Help: "The number of shards that the queues shard calculation wants to run based on the rate of samples in vs. samples out.",
ConstLabels: constLabels,
})
m.samplesBytesTotal = prometheus.NewCounter(prometheus.CounterOpts{
m.sentBytesTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "samples_bytes_total",
Help: "The total number of bytes of samples sent by the queue after compression.",
Name: "bytes_total",
Help: "The total number of bytes of data (not metadata) sent by the queue after compression. Note that when exemplars over remote write is enabled the exemplars included in a remote write request count towards this metric.",
ConstLabels: constLabels,
})
m.metadataBytesTotal = prometheus.NewCounter(prometheus.CounterOpts{
@ -215,7 +255,7 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
Namespace: namespace,
Subsystem: subsystem,
Name: "max_samples_per_send",
Help: "The maximum number of samples to be sent, in a single request, to the remote storage.",
Help: "The maximum number of samples to be sent, in a single request, to the remote storage. Note that, when sending of exemplars over remote write is enabled, exemplars count towards this limit.",
ConstLabels: constLabels,
})
@ -226,22 +266,27 @@ func (m *queueManagerMetrics) register() {
if m.reg != nil {
m.reg.MustRegister(
m.samplesTotal,
m.exemplarsTotal,
m.metadataTotal,
m.failedSamplesTotal,
m.failedExemplarsTotal,
m.failedMetadataTotal,
m.retriedSamplesTotal,
m.retriedExemplarsTotal,
m.retriedMetadataTotal,
m.droppedSamplesTotal,
m.droppedExemplarsTotal,
m.enqueueRetriesTotal,
m.sentBatchDuration,
m.highestSentTimestamp,
m.pendingSamples,
m.pendingExemplars,
m.shardCapacity,
m.numShards,
m.maxNumShards,
m.minNumShards,
m.desiredNumShards,
m.samplesBytesTotal,
m.sentBytesTotal,
m.metadataBytesTotal,
m.maxSamplesPerSend,
)
@ -251,22 +296,27 @@ func (m *queueManagerMetrics) register() {
func (m *queueManagerMetrics) unregister() {
if m.reg != nil {
m.reg.Unregister(m.samplesTotal)
m.reg.Unregister(m.exemplarsTotal)
m.reg.Unregister(m.metadataTotal)
m.reg.Unregister(m.failedSamplesTotal)
m.reg.Unregister(m.failedExemplarsTotal)
m.reg.Unregister(m.failedMetadataTotal)
m.reg.Unregister(m.retriedSamplesTotal)
m.reg.Unregister(m.retriedExemplarsTotal)
m.reg.Unregister(m.retriedMetadataTotal)
m.reg.Unregister(m.droppedSamplesTotal)
m.reg.Unregister(m.droppedExemplarsTotal)
m.reg.Unregister(m.enqueueRetriesTotal)
m.reg.Unregister(m.sentBatchDuration)
m.reg.Unregister(m.highestSentTimestamp)
m.reg.Unregister(m.pendingSamples)
m.reg.Unregister(m.pendingExemplars)
m.reg.Unregister(m.shardCapacity)
m.reg.Unregister(m.numShards)
m.reg.Unregister(m.maxNumShards)
m.reg.Unregister(m.minNumShards)
m.reg.Unregister(m.desiredNumShards)
m.reg.Unregister(m.samplesBytesTotal)
m.reg.Unregister(m.sentBytesTotal)
m.reg.Unregister(m.metadataBytesTotal)
m.reg.Unregister(m.maxSamplesPerSend)
}
@ -295,6 +345,7 @@ type QueueManager struct {
mcfg config.MetadataConfig
externalLabels labels.Labels
relabelConfigs []*relabel.Config
sendExemplars bool
watcher *wal.Watcher
metadataWatcher *MetadataWatcher
@ -312,7 +363,7 @@ type QueueManager struct {
quit chan struct{}
wg sync.WaitGroup
samplesIn, samplesDropped, samplesOut, samplesOutDuration *ewmaRate
dataIn, dataDropped, dataOut, dataOutDuration *ewmaRate
metrics *queueManagerMetrics
interner *pool
@ -336,6 +387,7 @@ func NewQueueManager(
interner *pool,
highestRecvTimestamp *maxTimestamp,
sm ReadyScrapeManager,
enableExemplarRemoteWrite bool,
) *QueueManager {
if logger == nil {
logger = log.NewNopLogger()
@ -350,6 +402,7 @@ func NewQueueManager(
externalLabels: externalLabels,
relabelConfigs: relabelConfigs,
storeClient: client,
sendExemplars: enableExemplarRemoteWrite,
seriesLabels: make(map[uint64]labels.Labels),
seriesSegmentIndexes: make(map[uint64]int),
@ -359,17 +412,17 @@ func NewQueueManager(
reshardChan: make(chan int),
quit: make(chan struct{}),
samplesIn: samplesIn,
samplesDropped: newEWMARate(ewmaWeight, shardUpdateDuration),
samplesOut: newEWMARate(ewmaWeight, shardUpdateDuration),
samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),
dataIn: samplesIn,
dataDropped: newEWMARate(ewmaWeight, shardUpdateDuration),
dataOut: newEWMARate(ewmaWeight, shardUpdateDuration),
dataOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),
metrics: metrics,
interner: interner,
highestRecvTimestamp: highestRecvTimestamp,
}
t.watcher = wal.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, walDir)
t.watcher = wal.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, walDir, enableExemplarRemoteWrite)
if t.mcfg.Send {
t.metadataWatcher = NewMetadataWatcher(logger, sm, client.Name(), t, t.mcfg.SendInterval, flushDeadline)
}
@ -444,13 +497,14 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p
// Append queues a sample to be sent to the remote storage. Blocks until all samples are
// enqueued on their shards or a shutdown signal is received.
func (t *QueueManager) Append(samples []record.RefSample) bool {
var appendSample prompb.Sample
outer:
for _, s := range samples {
t.seriesMtx.Lock()
lbls, ok := t.seriesLabels[s.Ref]
if !ok {
t.metrics.droppedSamplesTotal.Inc()
t.samplesDropped.incr(1)
t.dataDropped.incr(1)
if _, ok := t.droppedSeries[s.Ref]; !ok {
level.Info(t.logger).Log("msg", "Dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref)
}
@ -466,12 +520,56 @@ outer:
return false
default:
}
appendSample.Value = s.V
appendSample.Timestamp = s.T
if t.shards.enqueue(s.Ref, writeSample{lbls, appendSample}) {
continue outer
}
if t.shards.enqueue(s.Ref, sample{
labels: lbls,
t: s.T,
v: s.V,
}) {
t.metrics.enqueueRetriesTotal.Inc()
time.Sleep(time.Duration(backoff))
backoff = backoff * 2
if backoff > t.cfg.MaxBackoff {
backoff = t.cfg.MaxBackoff
}
}
}
return true
}
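
The retry loop in `Append` (and in `AppendExemplars` below) doubles the backoff after each failed enqueue and clamps it to `MaxBackoff`. The sketch below isolates that step; `nextBackoff` and the example durations are hypothetical and only illustrate the doubling-with-cap behaviour.

```go
package main

import (
	"fmt"
	"time"
)

// nextBackoff doubles the current backoff and clamps the result to max,
// mirroring the "backoff = backoff * 2" step in the retry loop.
func nextBackoff(cur, max time.Duration) time.Duration {
	cur *= 2
	if cur > max {
		cur = max
	}
	return cur
}

func main() {
	// Example values only; the real loop starts at cfg.MinBackoff.
	backoff := 30 * time.Millisecond
	max := 100 * time.Millisecond
	for i := 0; i < 4; i++ {
		fmt.Println(backoff)
		backoff = nextBackoff(backoff, max)
	}
}
```

Clamping keeps the wait bounded while the queues are being resharded, so a long reshard does not turn into an arbitrarily long sleep.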
func (t *QueueManager) AppendExemplars(exemplars []record.RefExemplar) bool {
if !t.sendExemplars {
return true
}
var appendExemplar prompb.Exemplar
outer:
for _, e := range exemplars {
t.seriesMtx.Lock()
lbls, ok := t.seriesLabels[e.Ref]
if !ok {
t.metrics.droppedExemplarsTotal.Inc()
// Track dropped exemplars in the same EWMA for sharding calc.
t.dataDropped.incr(1)
if _, ok := t.droppedSeries[e.Ref]; !ok {
level.Info(t.logger).Log("msg", "Dropped exemplar for series that was not explicitly dropped via relabelling", "ref", e.Ref)
}
t.seriesMtx.Unlock()
continue
}
t.seriesMtx.Unlock()
// This will only loop if the queues are being resharded.
backoff := t.cfg.MinBackoff
for {
select {
case <-t.quit:
return false
default:
}
appendExemplar.Labels = labelsToLabelsProto(e.Labels, nil)
appendExemplar.Timestamp = e.T
appendExemplar.Value = e.V
if t.shards.enqueue(e.Ref, writeExemplar{lbls, appendExemplar}) {
continue outer
}
@ -687,27 +785,27 @@ func (t *QueueManager) shouldReshard(desiredShards int) bool {
// outlined in this function's implementation. It is up to the caller to reshard, or not,
// based on the return value.
func (t *QueueManager) calculateDesiredShards() int {
t.samplesOut.tick()
t.samplesDropped.tick()
t.samplesOutDuration.tick()
t.dataOut.tick()
t.dataDropped.tick()
t.dataOutDuration.tick()
// We use the number of incoming samples as a prediction of how much work we
// will need to do next iteration. We add to this any pending samples
// (received - send) so we can catch up with any backlog. We use the average
// outgoing batch latency to work out how many shards we need.
var (
samplesInRate = t.samplesIn.rate()
samplesOutRate = t.samplesOut.rate()
samplesKeptRatio = samplesOutRate / (t.samplesDropped.rate() + samplesOutRate)
samplesOutDuration = t.samplesOutDuration.rate() / float64(time.Second)
samplesPendingRate = samplesInRate*samplesKeptRatio - samplesOutRate
highestSent = t.metrics.highestSentTimestamp.Get()
highestRecv = t.highestRecvTimestamp.Get()
delay = highestRecv - highestSent
samplesPending = delay * samplesInRate * samplesKeptRatio
dataInRate = t.dataIn.rate()
dataOutRate = t.dataOut.rate()
dataKeptRatio = dataOutRate / (t.dataDropped.rate() + dataOutRate)
dataOutDuration = t.dataOutDuration.rate() / float64(time.Second)
dataPendingRate = dataInRate*dataKeptRatio - dataOutRate
highestSent = t.metrics.highestSentTimestamp.Get()
highestRecv = t.highestRecvTimestamp.Get()
delay = highestRecv - highestSent
dataPending = delay * dataInRate * dataKeptRatio
)
if samplesOutRate <= 0 {
if dataOutRate <= 0 {
return t.numShards
}
@ -717,17 +815,17 @@ func (t *QueueManager) calculateDesiredShards() int {
const integralGain = 0.1 / float64(shardUpdateDuration/time.Second)
var (
timePerSample = samplesOutDuration / samplesOutRate
desiredShards = timePerSample * (samplesInRate*samplesKeptRatio + integralGain*samplesPending)
timePerSample = dataOutDuration / dataOutRate
desiredShards = timePerSample * (dataInRate*dataKeptRatio + integralGain*dataPending)
)
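
The shard calculation above can be read as plain arithmetic on a handful of rates. The following sketch reproduces the formulas with a hypothetical `shardInputs` struct and made-up numbers; the field names and the gain value are assumptions for illustration, not the QueueManager's actual state.

```go
package main

import "fmt"

// shardInputs is a hypothetical bundle of the values read from the EWMA rates.
type shardInputs struct {
	inRate       float64 // incoming items per second
	outRate      float64 // sent items per second
	droppedRate  float64 // dropped items per second
	outDuration  float64 // seconds spent sending per second of wall time
	delay        float64 // seconds between highest received and highest sent timestamps
	integralGain float64 // weight applied to the backlog term
}

// desiredShards applies the same formulas as the diff. It reports false when
// outRate is not positive, where the original keeps the current shard count.
func desiredShards(in shardInputs) (float64, bool) {
	if in.outRate <= 0 {
		return 0, false
	}
	keptRatio := in.outRate / (in.droppedRate + in.outRate)
	pending := in.delay * in.inRate * keptRatio
	timePerItem := in.outDuration / in.outRate
	return timePerItem * (in.inRate*keptRatio + in.integralGain*pending), true
}

func main() {
	in := shardInputs{inRate: 1000, outRate: 1000, outDuration: 0.5, delay: 10, integralGain: 0.01}
	shards, ok := desiredShards(in)
	fmt.Printf("%.2f %v\n", shards, ok)
}
```

With these example inputs nothing is dropped, so the kept ratio is 1 and the backlog term adds 10% on top of the steady-state load.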
t.metrics.desiredNumShards.Set(desiredShards)
level.Debug(t.logger).Log("msg", "QueueManager.calculateDesiredShards",
"samplesInRate", samplesInRate,
"samplesOutRate", samplesOutRate,
"samplesKeptRatio", samplesKeptRatio,
"samplesPendingRate", samplesPendingRate,
"samplesPending", samplesPending,
"samplesOutDuration", samplesOutDuration,
"dataInRate", dataInRate,
"dataOutRate", dataOutRate,
"dataKeptRatio", dataKeptRatio,
"dataPendingRate", dataPendingRate,
"dataPending", dataPending,
"dataOutDuration", dataOutDuration,
"timePerSample", timePerSample,
"desiredShards", desiredShards,
"highestSent", highestSent,
@ -785,17 +883,24 @@ func (t *QueueManager) newShards() *shards {
return s
}
type sample struct {
labels labels.Labels
t int64
v float64
type writeSample struct {
seriesLabels labels.Labels
sample prompb.Sample
}
type writeExemplar struct {
seriesLabels labels.Labels
exemplar prompb.Exemplar
}
type shards struct {
mtx sync.RWMutex // With the WAL, this is never actually contended.
qm *QueueManager
queues []chan sample
queues []chan interface{}
// So we can accurately track how many of each are lost during shard shutdowns.
enqueuedSamples atomic.Int64
enqueuedExemplars atomic.Int64
// Emulate a wait group with a channel and an atomic int, as you
// cannot select on a wait group.
@ -807,8 +912,9 @@ type shards struct {
// Hard shutdown context is used to terminate outgoing HTTP connections
// after giving them a chance to terminate.
hardShutdown context.CancelFunc
droppedOnHardShutdown atomic.Uint32
hardShutdown context.CancelFunc
samplesDroppedOnHardShutdown atomic.Uint32
exemplarsDroppedOnHardShutdown atomic.Uint32
}
// start the shards; must be called before any call to enqueue.
@ -819,9 +925,9 @@ func (s *shards) start(n int) {
s.qm.metrics.pendingSamples.Set(0)
s.qm.metrics.numShards.Set(float64(n))
newQueues := make([]chan sample, n)
newQueues := make([]chan interface{}, n)
for i := 0; i < n; i++ {
newQueues[i] = make(chan sample, s.qm.cfg.Capacity)
newQueues[i] = make(chan interface{}, s.qm.cfg.Capacity)
}
s.queues = newQueues
@ -831,7 +937,8 @@ func (s *shards) start(n int) {
s.softShutdown = make(chan struct{})
s.running.Store(int32(n))
s.done = make(chan struct{})
s.droppedOnHardShutdown.Store(0)
s.samplesDroppedOnHardShutdown.Store(0)
s.exemplarsDroppedOnHardShutdown.Store(0)
for i := 0; i < n; i++ {
go s.runShard(hardShutdownCtx, i, newQueues[i])
}
@ -864,14 +971,17 @@ func (s *shards) stop() {
// Force an unclean shutdown.
s.hardShutdown()
<-s.done
if dropped := s.droppedOnHardShutdown.Load(); dropped > 0 {
if dropped := s.samplesDroppedOnHardShutdown.Load(); dropped > 0 {
level.Error(s.qm.logger).Log("msg", "Failed to flush all samples on shutdown", "count", dropped)
}
if dropped := s.exemplarsDroppedOnHardShutdown.Load(); dropped > 0 {
level.Error(s.qm.logger).Log("msg", "Failed to flush all exemplars on shutdown", "count", dropped)
}
}
// enqueue a sample. If we are currently in the process of shutting down or resharding,
// enqueue data (sample or exemplar). If we are currently in the process of shutting down or resharding,
// will return false; in this case, you should back off and retry.
func (s *shards) enqueue(ref uint64, sample sample) bool {
func (s *shards) enqueue(ref uint64, data interface{}) bool {
s.mtx.RLock()
defer s.mtx.RUnlock()
@ -885,13 +995,22 @@ func (s *shards) enqueue(ref uint64, sample sample) bool {
select {
case <-s.softShutdown:
return false
case s.queues[shard] <- sample:
s.qm.metrics.pendingSamples.Inc()
case s.queues[shard] <- data:
switch data.(type) {
case writeSample:
s.qm.metrics.pendingSamples.Inc()
s.enqueuedSamples.Inc()
case writeExemplar:
s.qm.metrics.pendingExemplars.Inc()
s.enqueuedExemplars.Inc()
default:
level.Warn(s.qm.logger).Log("msg", "Invalid object type in shards enqueue")
}
return true
}
}
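
Because the shard queues now carry `interface{}` values, `enqueue` uses a type switch to decide which pending counter to bump. Below is a minimal sketch of that pattern with simplified stand-in types. Unlike the original, which blocks until the send succeeds or a soft shutdown is signalled, this sketch uses a non-blocking send so that it stays self-contained; the `counters` struct is hypothetical.

```go
package main

import "fmt"

// Simplified stand-ins for the writeSample and writeExemplar types in the diff.
type writeSample struct{ value float64 }
type writeExemplar struct{ value float64 }

// counters is a hypothetical replacement for the pending gauges.
type counters struct{ samples, exemplars, unknown int }

// enqueue sends data on q and bumps the counter that matches its dynamic type.
// It returns false when the queue is full.
func enqueue(q chan interface{}, c *counters, data interface{}) bool {
	select {
	case q <- data:
		switch data.(type) {
		case writeSample:
			c.samples++
		case writeExemplar:
			c.exemplars++
		default:
			c.unknown++
		}
		return true
	default:
		return false
	}
}

func main() {
	q := make(chan interface{}, 2)
	c := &counters{}
	fmt.Println(enqueue(q, c, writeSample{1}), enqueue(q, c, writeExemplar{2}), enqueue(q, c, writeSample{3}))
	fmt.Println(c.samples, c.exemplars, c.unknown)
}
```

The trade-off of a single `chan interface{}` is that ordering between samples and exemplars is preserved per shard, at the cost of losing compile-time type checking on what is enqueued.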
func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface{}) {
defer func() {
if s.running.Dec() == 0 {
close(s.done)
@ -901,14 +1020,26 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
shardNum := strconv.Itoa(shardID)
// Send batches of at most MaxSamplesPerSend samples to the remote storage.
// If we have fewer samples than that, flush them out after a deadline
// anyways.
// If we have fewer samples than that, flush them out after a deadline anyways.
var (
max = s.qm.cfg.MaxSamplesPerSend
nPending = 0
pendingSamples = allocateTimeSeries(max)
max = s.qm.cfg.MaxSamplesPerSend
// Rough estimate of exemplar volume: the buffer is sized at max/10 (10% of MaxSamplesPerSend), with a minimum of 1.
// TODO(cstyan): Casting this many times smells, also we could get index out of bounds issues here.
maxExemplars = int(math.Max(1, float64(max/10)))
nPending, nPendingSamples, nPendingExemplars = 0, 0, 0
sampleBuffer = allocateSampleBuffer(max)
buf []byte
pendingData []prompb.TimeSeries
exemplarBuffer [][]prompb.Exemplar
)
totalPending := max
if s.qm.sendExemplars {
exemplarBuffer = allocateExemplarBuffer(maxExemplars)
totalPending += maxExemplars
}
pendingData = make([]prompb.TimeSeries, totalPending)
timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
stop := func() {
@ -926,18 +1057,23 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
case <-ctx.Done():
// In this case we drop all samples in the buffer and the queue.
// Remove them from pending and mark them as failed.
droppedSamples := nPending + len(queue)
droppedSamples := nPendingSamples + int(s.enqueuedSamples.Load())
droppedExemplars := nPendingExemplars + int(s.enqueuedExemplars.Load())
s.qm.metrics.pendingSamples.Sub(float64(droppedSamples))
s.qm.metrics.pendingExemplars.Sub(float64(droppedExemplars))
s.qm.metrics.failedSamplesTotal.Add(float64(droppedSamples))
s.droppedOnHardShutdown.Add(uint32(droppedSamples))
s.qm.metrics.failedExemplarsTotal.Add(float64(droppedExemplars))
s.samplesDroppedOnHardShutdown.Add(uint32(droppedSamples))
s.exemplarsDroppedOnHardShutdown.Add(uint32(droppedExemplars))
return
case sample, ok := <-queue:
if !ok {
if nPending > 0 {
level.Debug(s.qm.logger).Log("msg", "Flushing samples to remote storage...", "count", nPending)
s.sendSamples(ctx, pendingSamples[:nPending], &buf)
s.qm.metrics.pendingSamples.Sub(float64(nPending))
if nPendingSamples > 0 || nPendingExemplars > 0 {
level.Debug(s.qm.logger).Log("msg", "Flushing data to remote storage...", "samples", nPendingSamples, "exemplars", nPendingExemplars)
s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, &buf)
s.qm.metrics.pendingSamples.Sub(float64(nPendingSamples))
s.qm.metrics.pendingExemplars.Sub(float64(nPendingExemplars))
level.Debug(s.qm.logger).Log("msg", "Done flushing.")
}
return
@ -946,25 +1082,44 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
// stop reading from the queue. This makes it safe to reference pendingSamples by index.
pendingSamples[nPending].Labels = labelsToLabelsProto(sample.labels, pendingSamples[nPending].Labels)
pendingSamples[nPending].Samples[0].Timestamp = sample.t
pendingSamples[nPending].Samples[0].Value = sample.v
nPending++
switch d := sample.(type) {
case writeSample:
sampleBuffer[nPendingSamples][0] = d.sample
pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
pendingData[nPending].Samples = sampleBuffer[nPendingSamples]
pendingData[nPending].Exemplars = nil
nPendingSamples++
nPending++
if nPending >= max {
s.sendSamples(ctx, pendingSamples, &buf)
case writeExemplar:
exemplarBuffer[nPendingExemplars][0] = d.exemplar
pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
pendingData[nPending].Samples = nil
pendingData[nPending].Exemplars = exemplarBuffer[nPendingExemplars]
nPendingExemplars++
nPending++
}
if nPendingSamples >= max || nPendingExemplars >= maxExemplars {
s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, &buf)
s.qm.metrics.pendingSamples.Sub(float64(nPendingSamples))
s.qm.metrics.pendingExemplars.Sub(float64(nPendingExemplars))
nPendingSamples = 0
nPendingExemplars = 0
nPending = 0
s.qm.metrics.pendingSamples.Sub(float64(max))
stop()
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
}
case <-timer.C:
if nPending > 0 {
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending samples", "samples", nPending, "shard", shardNum)
s.sendSamples(ctx, pendingSamples[:nPending], &buf)
s.qm.metrics.pendingSamples.Sub(float64(nPending))
if nPendingSamples > 0 || nPendingExemplars > 0 {
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars", nPendingExemplars, "shard", shardNum)
s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, &buf)
s.qm.metrics.pendingSamples.Sub(float64(nPendingSamples))
s.qm.metrics.pendingExemplars.Sub(float64(nPendingExemplars))
nPendingSamples = 0
nPendingExemplars = 0
nPending = 0
}
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
@ -972,23 +1127,24 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
}
}
func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, buf *[]byte) {
func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount int, exemplarCount int, buf *[]byte) {
begin := time.Now()
err := s.sendSamplesWithBackoff(ctx, samples, buf)
err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, buf)
if err != nil {
level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", len(samples), "err", err)
s.qm.metrics.failedSamplesTotal.Add(float64(len(samples)))
level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err)
s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount))
s.qm.metrics.failedExemplarsTotal.Add(float64(exemplarCount))
}
// These counters are used to calculate the dynamic sharding, and as such
// should be maintained irrespective of success or failure.
s.qm.samplesOut.incr(int64(len(samples)))
s.qm.samplesOutDuration.incr(int64(time.Since(begin)))
s.qm.dataOut.incr(int64(len(samples)))
s.qm.dataOutDuration.incr(int64(time.Since(begin)))
s.qm.lastSendTimestamp.Store(time.Now().Unix())
}
// sendSamples to the remote storage with backoff for recoverable errors.
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, buf *[]byte) error {
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount int, exemplarCount int, buf *[]byte) error {
// Build the WriteRequest with no metadata.
req, highest, err := buildWriteRequest(samples, nil, *buf)
if err != nil {
@ -998,7 +1154,6 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
}
reqSize := len(*buf)
sampleCount := len(samples)
*buf = req
// An anonymous function allows us to defer the completion of our per-try spans
@ -1009,6 +1164,9 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
defer span.Finish()
span.SetTag("samples", sampleCount)
if exemplarCount > 0 {
span.SetTag("exemplars", exemplarCount)
}
span.SetTag("request_size", reqSize)
span.SetTag("try", try)
span.SetTag("remote_name", s.qm.storeClient.Name())
@ -1016,6 +1174,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
begin := time.Now()
s.qm.metrics.samplesTotal.Add(float64(sampleCount))
s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
err := s.qm.client().Store(ctx, *buf)
s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
@ -1030,13 +1189,14 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
onRetry := func() {
s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount))
}
err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.logger, attemptStore, onRetry)
if err != nil {
return err
}
s.qm.metrics.samplesBytesTotal.Add(float64(reqSize))
s.qm.metrics.sentBytesTotal.Add(float64(reqSize))
s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000))
return nil
}
@ -1096,10 +1256,13 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l
func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, buf []byte) ([]byte, int64, error) {
var highest int64
for _, ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample in it.
if ts.Samples[0].Timestamp > highest {
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
highest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
highest = ts.Exemplars[0].Timestamp
}
}
req := &prompb.WriteRequest{
@ -1121,11 +1284,18 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
return compressed, highest, nil
}
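
The length guards added to `buildWriteRequest` matter because a `TimeSeries` entry may now carry only an exemplar, in which case `Samples` is nil and indexing `Samples[0]` would panic. A minimal sketch of the guarded scan, using simplified stand-in types rather than the real `prompb` ones:

```go
package main

import "fmt"

// Simplified stand-ins for the prompb types; only the timestamp is modelled.
type sample struct{ Timestamp int64 }
type exemplar struct{ Timestamp int64 }
type timeSeries struct {
	Samples   []sample
	Exemplars []exemplar
}

// highestTimestamp returns the largest timestamp among the first sample or
// first exemplar of each entry, skipping whichever slice is empty.
func highestTimestamp(series []timeSeries) int64 {
	var highest int64
	for _, ts := range series {
		if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
			highest = ts.Samples[0].Timestamp
		}
		if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
			highest = ts.Exemplars[0].Timestamp
		}
	}
	return highest
}

func main() {
	series := []timeSeries{
		{Samples: []sample{{Timestamp: 1000}}},
		{Exemplars: []exemplar{{Timestamp: 3000}}}, // exemplar-only entry
	}
	fmt.Println(highestTimestamp(series))
}
```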
func allocateTimeSeries(capacity int) []prompb.TimeSeries {
timeseries := make([]prompb.TimeSeries, capacity)
// We only ever send one sample per timeseries, so preallocate with length one.
for i := range timeseries {
timeseries[i].Samples = []prompb.Sample{{}}
func allocateSampleBuffer(capacity int) [][]prompb.Sample {
buf := make([][]prompb.Sample, capacity)
for i := range buf {
buf[i] = []prompb.Sample{{}}
}
return timeseries
return buf
}
func allocateExemplarBuffer(capacity int) [][]prompb.Exemplar {
buf := make([][]prompb.Exemplar, capacity)
for i := range buf {
buf[i] = []prompb.Exemplar{{}}
}
return buf
}
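
The two allocators above back the buffer-reuse scheme in `runShard`: each pending `TimeSeries` entry points at a preallocated one-element sub-slice, so building a batch does not allocate. The sketch below shows the idea with simplified stand-in types; the `batch` helper and its methods are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// Simplified stand-ins for the prompb types; the field sets are assumptions.
type sample struct {
	T int64
	V float64
}
type exemplar struct {
	T int64
	V float64
}
type timeSeries struct {
	Samples   []sample
	Exemplars []exemplar
}

// allocateSampleBuffer preallocates capacity one-element sample slices.
func allocateSampleBuffer(capacity int) [][]sample {
	buf := make([][]sample, capacity)
	for i := range buf {
		buf[i] = []sample{{}}
	}
	return buf
}

// allocateExemplarBuffer preallocates capacity one-element exemplar slices.
func allocateExemplarBuffer(capacity int) [][]exemplar {
	buf := make([][]exemplar, capacity)
	for i := range buf {
		buf[i] = []exemplar{{}}
	}
	return buf
}

// batch is a hypothetical helper bundling the buffers and pending counters.
type batch struct {
	samples                        [][]sample
	exemplars                      [][]exemplar
	pending                        []timeSeries
	nPending, nSamples, nExemplars int
}

func newBatch(maxSamples, maxExemplars int) *batch {
	return &batch{
		samples:   allocateSampleBuffer(maxSamples),
		exemplars: allocateExemplarBuffer(maxExemplars),
		pending:   make([]timeSeries, maxSamples+maxExemplars),
	}
}

// addSample copies s into the next reusable sub-slice and points the next
// pending entry at it. The caller must reset before exceeding maxSamples.
func (b *batch) addSample(s sample) {
	b.samples[b.nSamples][0] = s
	b.pending[b.nPending].Samples = b.samples[b.nSamples]
	b.pending[b.nPending].Exemplars = nil
	b.nSamples++
	b.nPending++
}

// addExemplar is the exemplar counterpart of addSample.
func (b *batch) addExemplar(e exemplar) {
	b.exemplars[b.nExemplars][0] = e
	b.pending[b.nPending].Samples = nil
	b.pending[b.nPending].Exemplars = b.exemplars[b.nExemplars]
	b.nExemplars++
	b.nPending++
}

// reset marks the buffers as free for the next batch without reallocating.
func (b *batch) reset() { b.nPending, b.nSamples, b.nExemplars = 0, 0, 0 }

func main() {
	b := newBatch(2, 1)
	b.addSample(sample{T: 1, V: 0.5})
	b.addExemplar(exemplar{T: 2, V: 1.5})
	fmt.Println(b.nPending, b.pending[0].Samples[0].V, b.pending[1].Exemplars[0].V)
	b.reset()
}
```

Clearing the unused field to nil on every write matters because pending entries are reused across batches and may previously have held the other kind of data.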


@ -60,21 +60,22 @@ func newHighestTimestampMetric() *maxTimestamp {
}
func TestSampleDelivery(t *testing.T) {
testcases := []struct {
name string
samples bool
exemplars bool
}{
{samples: true, exemplars: false, name: "samples only"},
{samples: true, exemplars: true, name: "both samples and exemplars"},
{samples: false, exemplars: true, name: "exemplars only"},
}
// Let's create an even number of send batches so we don't run into the
// batch timeout case.
n := config.DefaultQueueConfig.MaxSamplesPerSend * 2
samples, series := createTimeseries(n, n)
n := 3
c := NewTestWriteClient()
c.expectSamples(samples[:len(samples)/2], series)
queueConfig := config.DefaultQueueConfig
queueConfig.BatchSendDeadline = model.Duration(100 * time.Millisecond)
queueConfig.MaxShards = 1
queueConfig.Capacity = len(samples)
queueConfig.MaxSamplesPerSend = len(samples) / 2
dir, err := ioutil.TempDir("", "TestSampleDeliver")
dir, err := ioutil.TempDir("", "TestSampleDelivery")
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(dir))
@ -83,13 +84,11 @@ func TestSampleDelivery(t *testing.T) {
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
defer s.Close()
queueConfig := config.DefaultQueueConfig
queueConfig.BatchSendDeadline = model.Duration(100 * time.Millisecond)
queueConfig.MaxShards = 1
writeConfig := config.DefaultRemoteWriteConfig
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{
&writeConfig,
},
}
// We need to set URLs so that metric creation doesn't panic.
writeConfig.URL = &common_config.URL{
URL: &url.URL{
@ -97,19 +96,60 @@ func TestSampleDelivery(t *testing.T) {
},
}
writeConfig.QueueConfig = queueConfig
require.NoError(t, s.ApplyConfig(conf))
hash, err := toHash(writeConfig)
require.NoError(t, err)
qm := s.rws.queues[hash]
qm.SetClient(c)
writeConfig.SendExemplars = true
qm.StoreSeries(series, 0)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{
&writeConfig,
},
}
qm.Append(samples[:len(samples)/2])
c.waitForExpectedSamples(t)
c.expectSamples(samples[len(samples)/2:], series)
qm.Append(samples[len(samples)/2:])
c.waitForExpectedSamples(t)
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
var (
series []record.RefSeries
samples []record.RefSample
exemplars []record.RefExemplar
)
// Generates same series in both cases.
if tc.samples {
samples, series = createTimeseries(n, n)
}
if tc.exemplars {
exemplars, series = createExemplars(n, n)
}
// Apply new config.
queueConfig.Capacity = len(samples)
queueConfig.MaxSamplesPerSend = len(samples) / 2
require.NoError(t, s.ApplyConfig(conf))
hash, err := toHash(writeConfig)
require.NoError(t, err)
qm := s.rws.queues[hash]
c := NewTestWriteClient()
qm.SetClient(c)
qm.StoreSeries(series, 0)
// Send first half of data.
c.expectSamples(samples[:len(samples)/2], series)
c.expectExemplars(exemplars[:len(exemplars)/2], series)
qm.Append(samples[:len(samples)/2])
qm.AppendExemplars(exemplars[:len(exemplars)/2])
c.waitForExpectedData(t)
// Send second half of data.
c.expectSamples(samples[len(samples)/2:], series)
c.expectExemplars(exemplars[len(exemplars)/2:], series)
qm.Append(samples[len(samples)/2:])
qm.AppendExemplars(exemplars[len(exemplars)/2:])
c.waitForExpectedData(t)
})
}
}
func TestMetadataDelivery(t *testing.T) {
@ -123,7 +163,7 @@ func TestMetadataDelivery(t *testing.T) {
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
m.Start()
defer m.Stop()
@ -157,7 +197,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
}()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
@ -165,11 +205,11 @@ func TestSampleDeliveryTimeout(t *testing.T) {
// Send the samples twice, waiting for the samples in the meantime.
c.expectSamples(samples, series)
m.Append(samples)
c.waitForExpectedSamples(t)
c.waitForExpectedData(t)
c.expectSamples(samples, series)
m.Append(samples)
c.waitForExpectedSamples(t)
c.waitForExpectedData(t)
}
func TestSampleDeliveryOrder(t *testing.T) {
@ -203,14 +243,14 @@ func TestSampleDeliveryOrder(t *testing.T) {
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
// These should be received by the client.
m.Append(samples)
c.waitForExpectedSamples(t)
c.waitForExpectedData(t)
}
func TestShutdown(t *testing.T) {
@ -227,7 +267,7 @@ func TestShutdown(t *testing.T) {
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, deadline, newPool(), newHighestTimestampMetric(), nil)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false)
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
samples, series := createTimeseries(n, n)
m.StoreSeries(series, 0)
@ -269,7 +309,7 @@ func TestSeriesReset(t *testing.T) {
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, deadline, newPool(), newHighestTimestampMetric(), nil)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false)
for i := 0; i < numSegments; i++ {
series := []record.RefSeries{}
for j := 0; j < numSeries; j++ {
@ -302,7 +342,7 @@ func TestReshard(t *testing.T) {
}()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
m.StoreSeries(series, 0)
m.Start()
@ -322,7 +362,7 @@ func TestReshard(t *testing.T) {
time.Sleep(100 * time.Millisecond)
}
c.waitForExpectedSamples(t)
c.waitForExpectedData(t)
}
func TestReshardRaceWithStop(t *testing.T) {
@ -337,7 +377,7 @@ func TestReshardRaceWithStop(t *testing.T) {
go func() {
for {
metrics := newQueueManagerMetrics(nil, "", "")
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil)
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
m.Start()
h.Unlock()
h.Lock()
@ -357,7 +397,7 @@ func TestReleaseNoninternedString(t *testing.T) {
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
c := NewTestWriteClient()
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil)
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
m.Start()
for i := 1; i < 1000; i++ {
@ -408,10 +448,10 @@ func TestShouldReshard(t *testing.T) {
for _, c := range cases {
metrics := newQueueManagerMetrics(nil, "", "")
client := NewTestWriteClient()
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil)
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
m.numShards = c.startingShards
m.samplesIn.incr(c.samplesIn)
m.samplesOut.incr(c.samplesOut)
m.dataIn.incr(c.samplesIn)
m.dataOut.incr(c.samplesOut)
m.lastSendTimestamp.Store(c.lastSendTimestamp)
m.Start()
@ -436,7 +476,6 @@ func createTimeseries(numSamples, numSeries int) ([]record.RefSample, []record.R
T: int64(j),
V: float64(i),
})
}
series = append(series, record.RefSeries{
Ref: uint64(i),
@ -446,6 +485,28 @@ func createTimeseries(numSamples, numSeries int) ([]record.RefSample, []record.R
return samples, series
}
func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []record.RefSeries) {
exemplars := make([]record.RefExemplar, 0, numExemplars)
series := make([]record.RefSeries, 0, numSeries)
for i := 0; i < numSeries; i++ {
name := fmt.Sprintf("test_metric_%d", i)
for j := 0; j < numExemplars; j++ {
e := record.RefExemplar{
Ref: uint64(i),
T: int64(j),
V: float64(i),
Labels: labels.FromStrings("traceID", fmt.Sprintf("trace-%d", i)),
}
exemplars = append(exemplars, e)
}
series = append(series, record.RefSeries{
Ref: uint64(i),
Labels: labels.Labels{{Name: "__name__", Value: name}},
})
}
return exemplars, series
}
func getSeriesNameFromRef(r record.RefSeries) string {
for _, l := range r.Labels {
if l.Name == "__name__" {
@ -456,13 +517,15 @@ func getSeriesNameFromRef(r record.RefSeries) string {
}
type TestWriteClient struct {
receivedSamples map[string][]prompb.Sample
expectedSamples map[string][]prompb.Sample
receivedMetadata map[string][]prompb.MetricMetadata
withWaitGroup bool
wg sync.WaitGroup
mtx sync.Mutex
buf []byte
receivedSamples map[string][]prompb.Sample
expectedSamples map[string][]prompb.Sample
receivedExemplars map[string][]prompb.Exemplar
expectedExemplars map[string][]prompb.Exemplar
receivedMetadata map[string][]prompb.MetricMetadata
withWaitGroup bool
wg sync.WaitGroup
mtx sync.Mutex
buf []byte
}
func NewTestWriteClient() *TestWriteClient {
@ -494,7 +557,29 @@ func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.R
c.wg.Add(len(ss))
}
func (c *TestWriteClient) waitForExpectedSamples(tb testing.TB) {
func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []record.RefSeries) {
if !c.withWaitGroup {
return
}
c.mtx.Lock()
defer c.mtx.Unlock()
c.expectedExemplars = map[string][]prompb.Exemplar{}
c.receivedExemplars = map[string][]prompb.Exemplar{}
for _, s := range ss {
seriesName := getSeriesNameFromRef(series[s.Ref])
e := prompb.Exemplar{
Labels: labelsToLabelsProto(s.Labels, nil),
Timestamp: s.T,
Value: s.V,
}
c.expectedExemplars[seriesName] = append(c.expectedExemplars[seriesName], e)
}
c.wg.Add(len(ss))
}
func (c *TestWriteClient) waitForExpectedData(tb testing.TB) {
if !c.withWaitGroup {
return
}
@ -504,9 +589,12 @@ func (c *TestWriteClient) waitForExpectedSamples(tb testing.TB) {
for ts, expectedSamples := range c.expectedSamples {
require.Equal(tb, expectedSamples, c.receivedSamples[ts], ts)
}
for ts, expectedExemplar := range c.expectedExemplars {
require.Equal(tb, expectedExemplar, c.receivedExemplars[ts], ts)
}
}
func (c *TestWriteClient) expectSampleCount(numSamples int) {
func (c *TestWriteClient) expectDataCount(numSamples int) {
if !c.withWaitGroup {
return
}
@ -515,7 +603,7 @@ func (c *TestWriteClient) expectSampleCount(numSamples int) {
c.wg.Add(numSamples)
}
func (c *TestWriteClient) waitForExpectedSampleCount() {
func (c *TestWriteClient) waitForExpectedDataCount() {
if !c.withWaitGroup {
return
}
@ -553,6 +641,11 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte) error {
count++
c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample)
}
for _, ex := range ts.Exemplars {
count++
c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], ex)
}
}
if c.withWaitGroup {
c.wg.Add(-count)
@ -621,7 +714,7 @@ func BenchmarkSampleDelivery(b *testing.B) {
defer os.RemoveAll(dir)
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
m.StoreSeries(series, 0)
// These should be received by the client.
@ -630,9 +723,9 @@ func BenchmarkSampleDelivery(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
c.expectSampleCount(len(samples))
c.expectDataCount(len(samples))
m.Append(samples)
c.waitForExpectedSampleCount()
c.waitForExpectedDataCount()
}
// Do not include shutdown
b.StopTimer()
@ -667,7 +760,7 @@ func BenchmarkStartup(b *testing.B) {
c := NewTestBlockedWriteClient()
m := NewQueueManager(metrics, nil, nil, logger, dir,
newEWMARate(ewmaWeight, shardUpdateDuration),
cfg, mcfg, nil, nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil)
cfg, mcfg, nil, nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false)
m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
m.watcher.MaxSegment = segments[len(segments)-2]
err := m.watcher.Run()
@ -719,7 +812,7 @@ func TestCalculateDesiredShards(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "")
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
// Need to start the queue manager so the proper metrics are initialized.
// However we can stop it right away since we don't need to do any actual
@ -745,8 +838,8 @@ func TestCalculateDesiredShards(t *testing.T) {
// helper function for sending samples.
sendSamples := func(s int64, ts time.Duration) {
pendingSamples -= s
m.samplesOut.incr(s)
m.samplesOutDuration.incr(int64(m.numShards) * int64(shardUpdateDuration))
m.dataOut.incr(s)
m.dataOutDuration.incr(int64(m.numShards) * int64(shardUpdateDuration))
// highest sent is how far back pending samples would be at our input rate.
highestSent := startedAt.Add(ts - time.Duration(pendingSamples/inputRate)*time.Second)


@ -37,6 +37,12 @@ var (
Name: "samples_in_total",
Help: "Samples in to remote storage, compare to samples out for queue managers.",
})
exemplarsIn = promauto.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "exemplars_in_total",
Help: "Exemplars in to remote storage, compare to exemplars out for queue managers.",
})
)
// WriteStorage represents all the remote write storage.
@ -169,6 +175,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
rws.interner,
rws.highestTimestamp,
rws.scraper,
rwConf.SendExemplars,
)
// Keep track of which queues are new so we know which to start.
newHashes = append(newHashes, hash)
@ -210,6 +217,7 @@ func (rws *WriteStorage) Close() error {
type timestampTracker struct {
writeStorage *WriteStorage
samples int64
exemplars int64
highestTimestamp int64
highestRecvTimestamp *maxTimestamp
}
@ -224,14 +232,16 @@ func (t *timestampTracker) Append(_ uint64, _ labels.Labels, ts int64, _ float64
}
func (t *timestampTracker) AppendExemplar(_ uint64, _ labels.Labels, _ exemplar.Exemplar) (uint64, error) {
t.exemplars++
return 0, nil
}
// Commit implements storage.Appender.
func (t *timestampTracker) Commit() error {
t.writeStorage.samplesIn.incr(t.samples)
t.writeStorage.samplesIn.incr(t.samples + t.exemplars)
samplesIn.Add(float64(t.samples))
exemplarsIn.Add(float64(t.exemplars))
t.highestRecvTimestamp.Set(float64(t.highestTimestamp / 1000))
return nil
}


@ -86,3 +86,35 @@ and specify an interval for which samples of a series got deleted.
│ . . . │
└─────────────────────────────────────────────────────┘
```
### Exemplar records
Exemplar records encode exemplars as a list of triples `(series_id, timestamp, value)`
plus the length of the labels list, and all the labels.
The first row stores the starting id and the starting timestamp.
Series reference and timestamp are encoded as deltas w.r.t the first exemplar.
The first exemplar record begins at the second row.
See: https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars
```
┌──────────────────────────────────────────────────────────────────┐
│ type = 4 <1b>                                                    │
├──────────────────────────────────────────────────────────────────┤
│ ┌────────────────────┬───────────────────────────┐               │
│ │ id <8b>            │ timestamp <8b>            │               │
│ └────────────────────┴───────────────────────────┘               │
│ ┌────────────────────┬───────────────────────────┬─────────────┐ │
│ │ id_delta <uvarint> │ timestamp_delta <uvarint> │ value <8b>  │ │
│ ├────────────────────┴───────────────────────────┴─────────────┤ │
│ │ n = len(labels) <uvarint>                                    │ │
│ ├──────────────────────┬───────────────────────────────────────┤ │
│ │ len(str_1) <uvarint> │ str_1 <bytes>                         │ │
│ ├──────────────────────┴───────────────────────────────────────┤ │
│ │ ...                                                          │ │
│ ├───────────────────────┬──────────────────────────────────────┤ │
│ │ len(str_2n) <uvarint> │ str_2n <bytes>                       │ │
│ └───────────────────────┴──────────────────────────────────────┘ │
│                  . . .                                           │
└──────────────────────────────────────────────────────────────────┘
```


@ -17,6 +17,7 @@ import (
"context"
"sort"
"sync"
"unicode/utf8"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/exemplar"
@ -83,7 +84,7 @@ func NewCircularExemplarStorage(len int, reg prometheus.Registerer) (ExemplarSto
}),
outOfOrderExemplars: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_exemplar_out_of_order_exemplars_total",
Help: "Total number of out of order exemplar ingestion failed attempts",
Help: "Total number of out of order exemplar ingestion failed attempts.",
}),
}
if reg != nil {
@ -165,6 +166,51 @@ Outer:
return false
}
func (ce *CircularExemplarStorage) ValidateExemplar(l labels.Labels, e exemplar.Exemplar) error {
seriesLabels := l.String()
// TODO(bwplotka): This lock can block all scrapers; there might be high contention on this at scale.
// Optimize by moving the lock to be per series (& benchmark it).
ce.lock.RLock()
defer ce.lock.RUnlock()
return ce.validateExemplar(seriesLabels, e, false)
}
// Not thread safe. The append parameter tells us whether this is an external validation, or internal
// as a result of an AddExemplar call, in which case we should update any relevant metrics.
func (ce *CircularExemplarStorage) validateExemplar(l string, e exemplar.Exemplar, append bool) error {
idx, ok := ce.index[l]
if !ok {
return nil
}
// Exemplar label length does not include chars involved in text rendering such as quotes,
// equals signs, or commas. See definition of const ExemplarMaxLabelSetLength.
labelSetLen := 0
for _, l := range e.Labels {
labelSetLen += utf8.RuneCountInString(l.Name)
labelSetLen += utf8.RuneCountInString(l.Value)
if labelSetLen > exemplar.ExemplarMaxLabelSetLength {
return storage.ErrExemplarLabelLength
}
}
// Check for duplicate vs last stored exemplar for this series.
// NB these are expected, and appending them is a no-op.
if ce.exemplars[idx.newest].exemplar.Equals(e) {
return storage.ErrDuplicateExemplar
}
if e.Ts <= ce.exemplars[idx.newest].exemplar.Ts {
if append {
ce.outOfOrderExemplars.Inc()
}
return storage.ErrOutOfOrderExemplar
}
return nil
}
func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemplar) error {
seriesLabels := l.String()
@ -173,21 +219,19 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp
ce.lock.Lock()
defer ce.lock.Unlock()
idx, ok := ce.index[seriesLabels]
err := ce.validateExemplar(seriesLabels, e, true)
if err != nil {
if err == storage.ErrDuplicateExemplar {
// Duplicate exemplar, noop.
return nil
}
return err
}
_, ok := ce.index[seriesLabels]
if !ok {
ce.index[seriesLabels] = &indexEntry{oldest: ce.nextIndex, seriesLabels: l}
} else {
// Check for duplicate vs last stored exemplar for this series.
// NB these are expected, add appending them is a no-op.
if ce.exemplars[idx.newest].exemplar.Equals(e) {
return nil
}
if e.Ts <= ce.exemplars[idx.newest].exemplar.Ts {
ce.outOfOrderExemplars.Inc()
return storage.ErrOutOfOrderExemplar
}
ce.exemplars[ce.index[seriesLabels].newest].next = ce.nextIndex
}
@ -218,13 +262,13 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp
ce.seriesWithExemplarsInStorage.Set(float64(len(ce.index)))
if next := ce.exemplars[ce.nextIndex]; next != nil {
ce.exemplarsInStorage.Set(float64(len(ce.exemplars)))
ce.lastExemplarsTs.Set(float64(next.exemplar.Ts))
ce.lastExemplarsTs.Set(float64(next.exemplar.Ts) / 1000)
return nil
}
// We did not yet fill the buffer.
ce.exemplarsInStorage.Set(float64(ce.nextIndex))
ce.lastExemplarsTs.Set(float64(ce.exemplars[0].exemplar.Ts))
ce.lastExemplarsTs.Set(float64(ce.exemplars[0].exemplar.Ts) / 1000)
return nil
}
@ -234,6 +278,10 @@ func (noopExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemplar) err
return nil
}
func (noopExemplarStorage) ValidateExemplar(l labels.Labels, e exemplar.Exemplar) error {
return nil
}
func (noopExemplarStorage) ExemplarQuerier(context.Context) (storage.ExemplarQuerier, error) {
return &noopExemplarQuerier{}, nil
}


@ -16,6 +16,7 @@ package tsdb
import (
"reflect"
"strconv"
"strings"
"testing"
"github.com/stretchr/testify/require"
@ -25,6 +26,66 @@ import (
"github.com/prometheus/prometheus/storage"
)
// Tests the same exemplar cases as AddExemplar, but specifically the ValidateExemplar function so it can be relied on externally.
func TestValidateExemplar(t *testing.T) {
exs, err := NewCircularExemplarStorage(2, nil)
require.NoError(t, err)
es := exs.(*CircularExemplarStorage)
l := labels.Labels{
{Name: "service", Value: "asdf"},
}
e := exemplar.Exemplar{
Labels: labels.Labels{
labels.Label{
Name: "traceID",
Value: "qwerty",
},
},
Value: 0.1,
Ts: 1,
}
require.NoError(t, es.ValidateExemplar(l, e))
require.NoError(t, es.AddExemplar(l, e))
e2 := exemplar.Exemplar{
Labels: labels.Labels{
labels.Label{
Name: "traceID",
Value: "zxcvb",
},
},
Value: 0.1,
Ts: 2,
}
require.NoError(t, es.ValidateExemplar(l, e2))
require.NoError(t, es.AddExemplar(l, e2))
require.Equal(t, storage.ErrDuplicateExemplar, es.ValidateExemplar(l, e2), "error is expected when attempting to validate duplicate exemplar")
e3 := e2
e3.Ts = 3
require.Equal(t, storage.ErrDuplicateExemplar, es.ValidateExemplar(l, e3), "error is expected when attempting to validate duplicate exemplar, even with different timestamp")
e3.Ts = 1
e3.Value = 0.3
require.Equal(t, storage.ErrOutOfOrderExemplar, es.ValidateExemplar(l, e3))
e4 := exemplar.Exemplar{
Labels: labels.Labels{
labels.Label{
Name: "a",
Value: strings.Repeat("b", exemplar.ExemplarMaxLabelSetLength),
},
},
Value: 0.1,
Ts: 2,
}
require.Equal(t, storage.ErrExemplarLabelLength, es.ValidateExemplar(l, e4))
}
func TestAddExemplar(t *testing.T) {
exs, err := NewCircularExemplarStorage(2, nil)
require.NoError(t, err)
@ -44,8 +105,7 @@ func TestAddExemplar(t *testing.T) {
Ts: 1,
}
err = es.AddExemplar(l, e)
require.NoError(t, err)
require.NoError(t, es.AddExemplar(l, e))
require.Equal(t, es.index[l.String()].newest, 0, "exemplar was not stored correctly")
e2 := exemplar.Exemplar{
@ -59,23 +119,31 @@ func TestAddExemplar(t *testing.T) {
Ts: 2,
}
err = es.AddExemplar(l, e2)
require.NoError(t, err)
require.NoError(t, es.AddExemplar(l, e2))
require.Equal(t, es.index[l.String()].newest, 1, "exemplar was not stored correctly, location of newest exemplar for series in index did not update")
require.True(t, es.exemplars[es.index[l.String()].newest].exemplar.Equals(e2), "exemplar was not stored correctly, expected %+v got: %+v", e2, es.exemplars[es.index[l.String()].newest].exemplar)
err = es.AddExemplar(l, e2)
require.NoError(t, err, "no error is expected attempting to add duplicate exemplar")
require.NoError(t, es.AddExemplar(l, e2), "no error is expected attempting to add duplicate exemplar")
e3 := e2
e3.Ts = 3
err = es.AddExemplar(l, e3)
require.NoError(t, err, "no error is expected when attempting to add duplicate exemplar, even with different timestamp")
require.NoError(t, es.AddExemplar(l, e3), "no error is expected when attempting to add duplicate exemplar, even with different timestamp")
e3.Ts = 1
e3.Value = 0.3
err = es.AddExemplar(l, e3)
require.Equal(t, err, storage.ErrOutOfOrderExemplar)
require.Equal(t, storage.ErrOutOfOrderExemplar, es.AddExemplar(l, e3))
e4 := exemplar.Exemplar{
Labels: labels.Labels{
labels.Label{
Name: "a",
Value: strings.Repeat("b", exemplar.ExemplarMaxLabelSetLength),
},
},
Value: 0.1,
Ts: 2,
}
require.Equal(t, storage.ErrExemplarLabelLength, es.AddExemplar(l, e4))
}
func TestStorageOverflow(t *testing.T) {


@ -58,6 +58,7 @@ var (
type ExemplarStorage interface {
storage.ExemplarQueryable
AddExemplar(labels.Labels, exemplar.Exemplar) error
ValidateExemplar(labels.Labels, exemplar.Exemplar) error
}
// Head handles reads and writes of time series data within a time window.
@ -459,15 +460,17 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks
// Track number of samples that referenced a series we don't know about
// for error reporting.
var unknownRefs atomic.Uint64
var unknownExemplarRefs atomic.Uint64
// Start workers that each process samples for a partition of the series ID space.
// They are connected through a ring of channels which ensures that all sample batches
// read from the WAL are processed in order.
var (
wg sync.WaitGroup
n = runtime.GOMAXPROCS(0)
inputs = make([]chan []record.RefSample, n)
outputs = make([]chan []record.RefSample, n)
wg sync.WaitGroup
n = runtime.GOMAXPROCS(0)
inputs = make([]chan []record.RefSample, n)
outputs = make([]chan []record.RefSample, n)
exemplarsInput chan record.RefExemplar
dec record.Decoder
shards = make([][]record.RefSample, n)
@ -489,6 +492,11 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks
return []tombstones.Stone{}
},
}
exemplarsPool = sync.Pool{
New: func() interface{} {
return []record.RefExemplar{}
},
}
)
defer func() {
@ -500,6 +508,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks
for range outputs[i] {
}
}
close(exemplarsInput)
wg.Wait()
}
}()
@ -516,6 +525,29 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks
}(inputs[i], outputs[i])
}
wg.Add(1)
exemplarsInput = make(chan record.RefExemplar, 300)
go func(input <-chan record.RefExemplar) {
defer wg.Done()
for e := range input {
ms := h.series.getByID(e.Ref)
if ms == nil {
unknownExemplarRefs.Inc()
continue
}
if e.T < h.minValidTime.Load() {
continue
}
// At the moment the only possible error here is out of order exemplars, which we shouldn't see when
// replaying the WAL, so let's just log the error if it's not that type.
// err is declared locally so this goroutine does not share it with the decoding goroutine.
err := h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{Ts: e.T, Value: e.V, Labels: e.Labels})
if err != nil && err != storage.ErrOutOfOrderExemplar {
level.Warn(h.logger).Log("msg", "Unexpected error when replaying WAL on exemplar record", "err", err)
}
}
}(exemplarsInput)
go func() {
defer close(decoded)
for r.Next() {
@ -557,6 +589,18 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks
return
}
decoded <- tstones
case record.Exemplars:
exemplars := exemplarsPool.Get().([]record.RefExemplar)[:0]
exemplars, err = dec.Exemplars(rec, exemplars)
if err != nil {
decodeErr = &wal.CorruptionErr{
Err: errors.Wrap(err, "decode exemplars"),
Segment: r.Segment(),
Offset: r.Offset(),
}
return
}
decoded <- exemplars
default:
// Noop.
}
@ -646,6 +690,12 @@ Outer:
}
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
tstonesPool.Put(v)
case []record.RefExemplar:
for _, e := range v {
exemplarsInput <- e
}
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
exemplarsPool.Put(v)
default:
panic(fmt.Errorf("unexpected decoded type: %T", d))
}
@ -667,14 +717,15 @@ Outer:
for range outputs[i] {
}
}
close(exemplarsInput)
wg.Wait()
if r.Err() != nil {
return errors.Wrap(r.Err(), "read records")
}
if unknownRefs.Load() > 0 {
level.Warn(h.logger).Log("msg", "Unknown series references", "count", unknownRefs.Load())
if unknownRefs.Load() > 0 || unknownExemplarRefs.Load() > 0 {
level.Warn(h.logger).Log("msg", "Unknown series references", "samples", unknownRefs.Load(), "exemplars", unknownExemplarRefs.Load())
}
return nil
}
@ -1339,6 +1390,15 @@ func (a *headAppender) AppendExemplar(ref uint64, _ labels.Labels, e exemplar.Ex
// Ensure no empty labels have gotten through.
e.Labels = e.Labels.WithoutEmpty()
err := a.exemplarAppender.ValidateExemplar(s.lset, e)
if err != nil {
if err == storage.ErrDuplicateExemplar {
// Duplicate, don't return an error but don't accept the exemplar.
return 0, nil
}
return 0, err
}
a.exemplars = append(a.exemplars, exemplarWithSeriesRef{ref, e})
return s.ref, nil
@ -1382,14 +1442,36 @@ func (a *headAppender) log() error {
return errors.Wrap(err, "log samples")
}
}
if len(a.exemplars) > 0 {
rec = enc.Exemplars(exemplarsForEncoding(a.exemplars), buf)
buf = rec[:0]
if err := a.head.wal.Log(rec); err != nil {
return errors.Wrap(err, "log exemplars")
}
}
return nil
}
func exemplarsForEncoding(es []exemplarWithSeriesRef) []record.RefExemplar {
ret := make([]record.RefExemplar, 0, len(es))
for _, e := range es {
ret = append(ret, record.RefExemplar{
Ref: e.ref,
T: e.exemplar.Ts,
V: e.exemplar.Value,
Labels: e.exemplar.Labels,
})
}
return ret
}
func (a *headAppender) Commit() (err error) {
if a.closed {
return ErrAppenderClosed
}
defer func() { a.closed = true }()
if err := a.log(); err != nil {
_ = a.Rollback() // Most likely the same error will happen again.
return errors.Wrap(err, "write to WAL")
@ -1404,7 +1486,6 @@ func (a *headAppender) Commit() (err error) {
continue
}
level.Debug(a.head.logger).Log("msg", "Unknown error while adding exemplar", "err", err)
continue
}
}
@ -1458,7 +1539,9 @@ func (a *headAppender) Rollback() (err error) {
series.Unlock()
}
a.head.putAppendBuffer(a.samples)
a.head.putExemplarBuffer(a.exemplars)
a.samples = nil
a.exemplars = nil
// Series are created in the head memory regardless of rollback. Thus we have
// to log them to the WAL in any case.


@ -51,6 +51,7 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal.
opts := DefaultHeadOptions()
opts.ChunkRange = chunkRange
opts.ChunkDirRoot = dir
opts.NumExemplars = 10
h, err := NewHead(nil, nil, wlog, opts)
require.NoError(t, err)
@ -87,6 +88,8 @@ func populateTestWAL(t testing.TB, w *wal.WAL, recs []interface{}) {
require.NoError(t, w.Log(enc.Samples(v, nil)))
case []tombstones.Stone:
require.NoError(t, w.Log(enc.Tombstones(v, nil)))
case []record.RefExemplar:
require.NoError(t, w.Log(enc.Exemplars(v, nil)))
}
}
}
@ -148,61 +151,91 @@ func BenchmarkLoadWAL(b *testing.B) {
}
labelsPerSeries := 5
// Rough estimates of most common % of samples that have an exemplar for each scrape.
exemplarsPercentages := []float64{0, 0.5, 1, 5}
lastExemplarsPerSeries := -1
for _, c := range cases {
b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d", c.batches, c.seriesPerBatch, c.samplesPerSeries),
func(b *testing.B) {
dir, err := ioutil.TempDir("", "test_load_wal")
require.NoError(b, err)
defer func() {
require.NoError(b, os.RemoveAll(dir))
}()
w, err := wal.New(nil, nil, dir, false)
require.NoError(b, err)
// Write series.
refSeries := make([]record.RefSeries, 0, c.seriesPerBatch)
for k := 0; k < c.batches; k++ {
refSeries = refSeries[:0]
for i := k * c.seriesPerBatch; i < (k+1)*c.seriesPerBatch; i++ {
lbls := make(map[string]string, labelsPerSeries)
lbls[defaultLabelName] = strconv.Itoa(i)
for j := 1; len(lbls) < labelsPerSeries; j++ {
lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j)
}
refSeries = append(refSeries, record.RefSeries{Ref: uint64(i) * 100, Labels: labels.FromMap(lbls)})
}
populateTestWAL(b, w, []interface{}{refSeries})
}
// Write samples.
refSamples := make([]record.RefSample, 0, c.seriesPerBatch)
for i := 0; i < c.samplesPerSeries; i++ {
for j := 0; j < c.batches; j++ {
refSamples = refSamples[:0]
for k := j * c.seriesPerBatch; k < (j+1)*c.seriesPerBatch; k++ {
refSamples = append(refSamples, record.RefSample{
Ref: uint64(k) * 100,
T: int64(i) * 10,
V: float64(i) * 100,
})
}
populateTestWAL(b, w, []interface{}{refSamples})
}
}
b.ResetTimer()
// Load the WAL.
for i := 0; i < b.N; i++ {
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
opts.ChunkDirRoot = w.Dir()
h, err := NewHead(nil, nil, w, opts)
for _, p := range exemplarsPercentages {
exemplarsPerSeries := int(math.RoundToEven(float64(c.samplesPerSeries) * p / 100))
// For tests with low samplesPerSeries we could end up testing with 0 exemplarsPerSeries
// multiple times without this check.
if exemplarsPerSeries == lastExemplarsPerSeries {
continue
}
lastExemplarsPerSeries = exemplarsPerSeries
// fmt.Println("exemplars per series: ", exemplarsPerSeries)
b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d,exemplarsPerSeries=%d", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries),
func(b *testing.B) {
dir, err := ioutil.TempDir("", "test_load_wal")
require.NoError(b, err)
h.Init(0)
}
})
defer func() {
require.NoError(b, os.RemoveAll(dir))
}()
w, err := wal.New(nil, nil, dir, false)
require.NoError(b, err)
// Write series.
refSeries := make([]record.RefSeries, 0, c.seriesPerBatch)
for k := 0; k < c.batches; k++ {
refSeries = refSeries[:0]
for i := k * c.seriesPerBatch; i < (k+1)*c.seriesPerBatch; i++ {
lbls := make(map[string]string, labelsPerSeries)
lbls[defaultLabelName] = strconv.Itoa(i)
for j := 1; len(lbls) < labelsPerSeries; j++ {
lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j)
}
refSeries = append(refSeries, record.RefSeries{Ref: uint64(i) * 100, Labels: labels.FromMap(lbls)})
}
populateTestWAL(b, w, []interface{}{refSeries})
}
// Write samples.
refSamples := make([]record.RefSample, 0, c.seriesPerBatch)
for i := 0; i < c.samplesPerSeries; i++ {
for j := 0; j < c.batches; j++ {
refSamples = refSamples[:0]
for k := j * c.seriesPerBatch; k < (j+1)*c.seriesPerBatch; k++ {
refSamples = append(refSamples, record.RefSample{
Ref: uint64(k) * 100,
T: int64(i) * 10,
V: float64(i) * 100,
})
}
populateTestWAL(b, w, []interface{}{refSamples})
}
}
// Write exemplars.
refExemplars := make([]record.RefExemplar, 0, c.seriesPerBatch)
for i := 0; i < exemplarsPerSeries; i++ {
for j := 0; j < c.batches; j++ {
refExemplars = refExemplars[:0]
for k := j * c.seriesPerBatch; k < (j+1)*c.seriesPerBatch; k++ {
refExemplars = append(refExemplars, record.RefExemplar{
Ref: uint64(k) * 100,
T: int64(i) * 10,
V: float64(i) * 100,
Labels: labels.FromStrings("traceID", fmt.Sprintf("trace-%d", i)),
})
}
populateTestWAL(b, w, []interface{}{refExemplars})
}
}
b.ResetTimer()
// Load the WAL.
for i := 0; i < b.N; i++ {
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
opts.ChunkDirRoot = w.Dir()
h, err := NewHead(nil, nil, w, opts)
require.NoError(b, err)
h.Init(0)
}
})
}
}
}
@ -233,6 +266,9 @@ func TestHead_ReadWAL(t *testing.T) {
[]tombstones.Stone{
{Ref: 0, Intervals: []tombstones.Interval{{Mint: 99, Maxt: 101}}},
},
[]record.RefExemplar{
{Ref: 10, T: 100, V: 1, Labels: labels.FromStrings("traceID", "asdf")},
},
}
head, w := newTestHead(t, 1000, compress)
@ -266,6 +302,12 @@ func TestHead_ReadWAL(t *testing.T) {
require.Equal(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil, head.chunkDiskMapper, nil)))
require.Equal(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil, head.chunkDiskMapper, nil)))
require.Equal(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0, nil, head.chunkDiskMapper, nil)))
q, err := head.ExemplarQuerier(context.Background())
require.NoError(t, err)
e, err := q.Select(0, 1000, []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "1")})
require.NoError(t, err)
require.Equal(t, e[0].Exemplars[0], exemplar.Exemplar{Ts: 100, Value: 1, Labels: labels.FromStrings("traceID", "asdf")})
})
}
}


@ -37,6 +37,8 @@ const (
Samples Type = 2
// Tombstones is used to match WAL records of type Tombstones.
Tombstones Type = 3
// Exemplars is used to match WAL records of type Exemplars.
Exemplars Type = 4
)
var (
@ -57,6 +59,14 @@ type RefSample struct {
V float64
}
// RefExemplar is an exemplar with its labels, timestamp, the value the exemplar was collected/observed with, and a reference to a series.
type RefExemplar struct {
Ref uint64
T int64
V float64
Labels labels.Labels
}
// Decoder decodes series, sample, tombstone, and exemplar records.
// The zero value is ready to use.
type Decoder struct {
@ -69,7 +79,7 @@ func (d *Decoder) Type(rec []byte) Type {
return Unknown
}
switch t := Type(rec[0]); t {
case Series, Samples, Tombstones:
case Series, Samples, Tombstones, Exemplars:
return t
}
return Unknown
@ -166,6 +176,48 @@ func (d *Decoder) Tombstones(rec []byte, tstones []tombstones.Stone) ([]tombston
return tstones, nil
}
func (d *Decoder) Exemplars(rec []byte, exemplars []RefExemplar) ([]RefExemplar, error) {
dec := encoding.Decbuf{B: rec}
t := Type(dec.Byte())
if t != Exemplars {
return nil, errors.New("invalid record type")
}
if dec.Len() == 0 {
return exemplars, nil
}
var (
baseRef = dec.Be64()
baseTime = dec.Be64int64()
)
for len(dec.B) > 0 && dec.Err() == nil {
dref := dec.Varint64()
dtime := dec.Varint64()
val := dec.Be64()
lset := make(labels.Labels, dec.Uvarint())
for i := range lset {
lset[i].Name = dec.UvarintStr()
lset[i].Value = dec.UvarintStr()
}
sort.Sort(lset)
exemplars = append(exemplars, RefExemplar{
Ref: baseRef + uint64(dref),
T: baseTime + dtime,
V: math.Float64frombits(val),
Labels: lset,
})
}
if dec.Err() != nil {
return nil, errors.Wrapf(dec.Err(), "decode error after %d exemplars", len(exemplars))
}
if len(dec.B) > 0 {
return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
}
return exemplars, nil
}
// Encoder encodes series, sample, tombstone, and exemplar records.
// The zero value is ready to use.
type Encoder struct {
@ -226,3 +278,33 @@ func (e *Encoder) Tombstones(tstones []tombstones.Stone, b []byte) []byte {
}
return buf.Get()
}
func (e *Encoder) Exemplars(exemplars []RefExemplar, b []byte) []byte {
buf := encoding.Encbuf{B: b}
buf.PutByte(byte(Exemplars))
if len(exemplars) == 0 {
return buf.Get()
}
// Store base timestamp and base reference number of first exemplar.
// All exemplars encode their timestamp and ref as delta to those.
first := exemplars[0]
buf.PutBE64(first.Ref)
buf.PutBE64int64(first.T)
for _, ex := range exemplars {
buf.PutVarint64(int64(ex.Ref) - int64(first.Ref))
buf.PutVarint64(ex.T - first.T)
buf.PutBE64(math.Float64bits(ex.V))
buf.PutUvarint(len(ex.Labels))
for _, l := range ex.Labels {
buf.PutUvarintStr(l.Name)
buf.PutUvarintStr(l.Value)
}
}
return buf.Get()
}


@ -74,6 +74,15 @@ func TestRecord_EncodeDecode(t *testing.T) {
{Ref: 13, Intervals: tombstones.Intervals{{Mint: -1000, Maxt: -11}}},
{Ref: 13, Intervals: tombstones.Intervals{{Mint: 5000, Maxt: 1000}}},
}, decTstones)
exemplars := []RefExemplar{
{Ref: 0, T: 12423423, V: 1.2345, Labels: labels.FromStrings("traceID", "qwerty")},
{Ref: 123, T: -1231, V: -123, Labels: labels.FromStrings("traceID", "asdf")},
{Ref: 2, T: 0, V: 99999, Labels: labels.FromStrings("traceID", "zxcv")},
}
decExemplars, err := dec.Exemplars(enc.Exemplars(exemplars, nil), nil)
require.NoError(t, err)
require.Equal(t, exemplars, decExemplars)
}
// TestRecord_Corrupted ensures that corrupted records return the correct error.
@ -117,6 +126,16 @@ func TestRecord_Corrupted(t *testing.T) {
_, err := dec.Tombstones(corrupted, nil)
require.Equal(t, err, encoding.ErrInvalidSize)
})
t.Run("Test corrupted exemplar record", func(t *testing.T) {
exemplars := []RefExemplar{
{Ref: 0, T: 12423423, V: 1.2345, Labels: labels.FromStrings("traceID", "asdf")},
}
corrupted := enc.Exemplars(exemplars, nil)[:8]
_, err := dec.Exemplars(corrupted, nil)
require.Equal(t, errors.Cause(err), encoding.ErrInvalidSize)
})
}
func TestRecord_Type(t *testing.T) {


@ -40,9 +40,11 @@ type CheckpointStats struct {
DroppedSeries int
DroppedSamples int
DroppedTombstones int
DroppedExemplars int
TotalSeries int // Processed series including dropped ones.
TotalSamples int // Processed samples including dropped ones.
TotalTombstones int // Processed tombstones including dropped ones.
TotalExemplars int // Processed exemplars including dropped ones.
}
// LastCheckpoint returns the directory name and index of the most recent checkpoint.
@ -144,16 +146,17 @@ func Checkpoint(logger log.Logger, w *WAL, from, to int, keep func(id uint64) bo
r := NewReader(sgmReader)
var (
series []record.RefSeries
samples []record.RefSample
tstones []tombstones.Stone
dec record.Decoder
enc record.Encoder
buf []byte
recs [][]byte
series []record.RefSeries
samples []record.RefSample
tstones []tombstones.Stone
exemplars []record.RefExemplar
dec record.Decoder
enc record.Encoder
buf []byte
recs [][]byte
)
for r.Next() {
series, samples, tstones = series[:0], samples[:0], tstones[:0]
series, samples, tstones, exemplars = series[:0], samples[:0], tstones[:0], exemplars[:0]
// We don't reset the buffer since we batch up multiple records
// before writing them to the checkpoint.
@ -219,6 +222,23 @@ func Checkpoint(logger log.Logger, w *WAL, from, to int, keep func(id uint64) bo
stats.TotalTombstones += len(tstones)
stats.DroppedTombstones += len(tstones) - len(repl)
case record.Exemplars:
exemplars, err = dec.Exemplars(rec, exemplars)
if err != nil {
return nil, errors.Wrap(err, "decode exemplars")
}
// Drop irrelevant exemplars in place.
repl := exemplars[:0]
for _, e := range exemplars {
if e.T >= mint {
repl = append(repl, e)
}
}
if len(repl) > 0 {
buf = enc.Exemplars(repl, buf)
}
stats.TotalExemplars += len(exemplars)
stats.DroppedExemplars += len(exemplars) - len(repl)
default:
// Unknown record type, probably from a future Prometheus version.
continue


@ -177,6 +177,11 @@ func TestCheckpoint(t *testing.T) {
}, nil)
require.NoError(t, w.Log(b))
b = enc.Exemplars([]record.RefExemplar{
{Ref: 1, T: last, V: float64(i), Labels: labels.FromStrings("traceID", fmt.Sprintf("trace-%d", i))},
}, nil)
require.NoError(t, w.Log(b))
last += 100
}
require.NoError(t, w.Close())
@ -215,6 +220,12 @@ func TestCheckpoint(t *testing.T) {
for _, s := range samples {
require.GreaterOrEqual(t, s.T, last/2, "sample with wrong timestamp")
}
case record.Exemplars:
exemplars, err := dec.Exemplars(rec, nil)
require.NoError(t, err)
for _, e := range exemplars {
require.GreaterOrEqual(t, e.T, last/2, "exemplar with wrong timestamp")
}
}
}
require.NoError(t, r.Err())


@ -46,6 +46,7 @@ const (
// and it is left to the implementer to make sure they are safe.
type WriteTo interface {
Append([]record.RefSample) bool
AppendExemplars([]record.RefExemplar) bool
StoreSeries([]record.RefSeries, int)
// SeriesReset is called after reading a checkpoint to allow the deletion
// of all series created in a segment lower than the argument.
@ -66,6 +67,7 @@ type Watcher struct {
logger log.Logger
walDir string
lastCheckpoint string
sendExemplars bool
metrics *WatcherMetrics
readerMetrics *LiveReaderMetrics
@ -136,7 +138,7 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
}
// NewWatcher creates a new WAL watcher for a given WriteTo.
func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, walDir string) *Watcher {
func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, walDir string, sendExemplars bool) *Watcher {
if logger == nil {
logger = log.NewNopLogger()
}
@ -147,8 +149,10 @@ func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logge
readerMetrics: readerMetrics,
walDir: path.Join(walDir, "wal"),
name: name,
quit: make(chan struct{}),
done: make(chan struct{}),
sendExemplars: sendExemplars,
quit: make(chan struct{}),
done: make(chan struct{}),
MaxSegment: -1,
}
@ -462,10 +466,11 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error {
func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
var (
dec record.Decoder
series []record.RefSeries
samples []record.RefSample
send []record.RefSample
dec record.Decoder
series []record.RefSeries
samples []record.RefSample
send []record.RefSample
exemplars []record.RefExemplar
)
for r.Next() && !isClosed(w.quit) {
rec := r.Record()
@ -507,6 +512,23 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
send = send[:0]
}
case record.Exemplars:
// Skip if experimental "exemplars over remote write" is not enabled.
if !w.sendExemplars {
break
}
// If we're not tailing a segment we can ignore any exemplar records we see.
// This speeds up replay of the WAL significantly.
if !tail {
break
}
exemplars, err := dec.Exemplars(rec, exemplars[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
}
w.writer.AppendExemplars(exemplars)
case record.Tombstones:
default:


@ -50,6 +50,7 @@ func retry(t *testing.T, interval time.Duration, n int, f func() bool) {
type writeToMock struct {
samplesAppended int
exemplarsAppended int
seriesLock sync.Mutex
seriesSegmentIndexes map[uint64]int
}
@ -59,6 +60,11 @@ func (wtm *writeToMock) Append(s []record.RefSample) bool {
return true
}
func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar) bool {
wtm.exemplarsAppended += len(e)
return true
}
func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) {
wtm.seriesLock.Lock()
defer wtm.seriesLock.Unlock()
@ -95,6 +101,7 @@ func TestTailSamples(t *testing.T) {
pageSize := 32 * 1024
const seriesCount = 10
const samplesCount = 250
const exemplarsCount = 25
for _, compress := range []bool{false, true} {
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
now := time.Now()
@ -138,6 +145,19 @@ func TestTailSamples(t *testing.T) {
}, nil)
require.NoError(t, w.Log(sample))
}
for j := 0; j < exemplarsCount; j++ {
inner := rand.Intn(ref + 1)
exemplar := enc.Exemplars([]record.RefExemplar{
{
Ref: uint64(inner),
T: now.UnixNano() + 1,
V: float64(i),
Labels: labels.FromStrings("traceID", fmt.Sprintf("trace-%d", inner)),
},
}, nil)
require.NoError(t, w.Log(exemplar))
}
}
// Start read after checkpoint, no more data written.
@ -145,7 +165,7 @@ func TestTailSamples(t *testing.T) {
require.NoError(t, err)
wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, true)
watcher.SetStartTime(now)
// Set the Watcher's metrics so they're not nil pointers.
@ -162,11 +182,13 @@ func TestTailSamples(t *testing.T) {
expectedSeries := seriesCount
expectedSamples := seriesCount * samplesCount
expectedExemplars := seriesCount * exemplarsCount
retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumLabels() >= expectedSeries
})
require.Equal(t, expectedSeries, wt.checkNumLabels())
require.Equal(t, expectedSamples, wt.samplesAppended)
require.Equal(t, expectedSeries, wt.checkNumLabels(), "did not receive the expected number of series")
require.Equal(t, expectedSamples, wt.samplesAppended, "did not receive the expected number of samples")
require.Equal(t, expectedExemplars, wt.exemplarsAppended, "did not receive the expected number of exemplars")
})
}
}
@ -229,7 +251,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
require.NoError(t, err)
wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false)
go watcher.Start()
expected := seriesCount
@ -322,7 +344,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
_, _, err = Segments(w.Dir())
require.NoError(t, err)
wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false)
go watcher.Start()
expected := seriesCount * 2
@ -392,7 +414,7 @@ func TestReadCheckpoint(t *testing.T) {
require.NoError(t, err)
wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false)
go watcher.Start()
expectedSeries := seriesCount
@ -465,7 +487,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
}
wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false)
watcher.MaxSegment = -1
// Set the Watcher's metrics so they're not nil pointers.
@ -541,7 +563,7 @@ func TestCheckpointSeriesReset(t *testing.T) {
require.NoError(t, err)
wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false)
watcher.MaxSegment = -1
go watcher.Start()