From 8fd73b1d281d3227c337b291e8c797a85251d3d5 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Thu, 6 May 2021 13:53:52 -0700 Subject: [PATCH] Add Exemplar Remote Write support (#8296) * Write exemplars to the WAL and send them over remote write. Signed-off-by: Callum Styan * Update example for exemplars, print data in a more obvious format. Signed-off-by: Callum Styan * Add metrics for remote write of exemplars. Signed-off-by: Callum Styan * Fix incorrect slices passed to send in remote write. Signed-off-by: Callum Styan * We need to unregister the new metrics. Signed-off-by: Callum Styan * Address review comments Signed-off-by: Callum Styan * Order of exemplar append vs write exemplar to WAL needs to change. Signed-off-by: Callum Styan * Several fixes to prevent sending uninitialized or incorrect samples with an exemplar. Fix dropping exemplar for missing series. Add tests for queue_manager sending exemplars Signed-off-by: Martin Disibio * Store both samples and exemplars in the same timeseries buffer to remove the alloc when building final request, keep sub-slices in separate buffers for re-use Signed-off-by: Martin Disibio * Condense sample/exemplar delivery tests to parameterized sub-tests Signed-off-by: Martin Disibio * Rename test methods for clarity now that they also handle exemplars Signed-off-by: Martin Disibio * Rename counter variable. Fix instances where metrics were not updated correctly Signed-off-by: Martin Disibio * Add exemplars to LoadWAL benchmark Signed-off-by: Callum Styan * last exemplars timestamp metric needs to convert value to seconds with ms precision Signed-off-by: Callum Styan * Process exemplar records in a separate go routine when loading the WAL. Signed-off-by: Callum Styan * Address review comments related to clarifying comments and variable names. Also refactor sample/exemplar to enqueue prompb types. Signed-off-by: Callum Styan * Regenerate types proto with comments, update protoc version again. Signed-off-by: Callum Styan * Put remote write of exemplars behind a feature flag. Signed-off-by: Callum Styan * Address some of Ganesh's review comments. Signed-off-by: Callum Styan * Move exemplar remote write feature flag to a config file field. Signed-off-by: Callum Styan * Address Bartek's review comments. Signed-off-by: Callum Styan * Don't allocate exemplar buffers in queue_manager if we're not going to send exemplars over remote write. Signed-off-by: Callum Styan * Add ValidateExemplar function, validate exemplars when appending to head and log them all to WAL before adding them to exemplar storage. Signed-off-by: Callum Styan * Address more reivew comments from Ganesh. Signed-off-by: Callum Styan * Add exemplar total label length check. Signed-off-by: Callum Styan * Address a few last review comments Signed-off-by: Callum Styan Co-authored-by: Martin Disibio --- .circleci/config.yml | 2 +- config/config.go | 1 + docs/configuration/configuration.md | 3 + docs/disabled_features.md | 3 +- .../example_write_adapter/server.go | 10 +- pkg/exemplar/exemplar.go | 10 +- prompb/types.pb.go | 444 +++++++++++++++--- prompb/types.proto | 14 + promql/test.go | 15 +- scrape/scrape.go | 1 - scripts/genproto.sh | 4 +- storage/interface.go | 3 + storage/remote/queue_manager.go | 398 +++++++++++----- storage/remote/queue_manager_test.go | 217 ++++++--- storage/remote/write.go | 12 +- tsdb/docs/format/wal.md | 32 ++ tsdb/exemplar.go | 78 ++- tsdb/exemplar_test.go | 88 +++- tsdb/head.go | 97 +++- tsdb/head_test.go | 148 +++--- tsdb/record/record.go | 84 +++- tsdb/record/record_test.go | 19 + tsdb/wal/checkpoint.go | 36 +- tsdb/wal/checkpoint_test.go | 11 + tsdb/wal/watcher.go | 36 +- tsdb/wal/watcher_test.go | 38 +- 26 files changed, 1443 insertions(+), 361 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index d3afc6ec50..2f1f260c8e 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -41,7 +41,7 @@ jobs: GOMAXPROCS: "2" GO111MODULE: "on" - prometheus/check_proto: - version: "3.12.3" + version: "3.15.8" - prometheus/store_artifact: file: prometheus - prometheus/store_artifact: diff --git a/config/config.go b/config/config.go index e4045eb05f..d71f6b72ec 100644 --- a/config/config.go +++ b/config/config.go @@ -632,6 +632,7 @@ type RemoteWriteConfig struct { Headers map[string]string `yaml:"headers,omitempty"` WriteRelabelConfigs []*relabel.Config `yaml:"write_relabel_configs,omitempty"` Name string `yaml:"name,omitempty"` + SendExemplars bool `yaml:"send_exemplars,omitempty"` // We cannot do proper Go type embedding below as the parser will then parse // values arbitrarily into the overflow maps of further-down types. diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index ac45fa82f2..62208ab771 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -2141,6 +2141,9 @@ write_relabel_configs: # remote write configs. [ name: ] +# Enables sending of exemplars over remote write. Note that exemplar storage itself must be enabled for exemplars to be scraped in the first place. +[ send_exemplars: | default = false ] + # Sets the `Authorization` header on every remote write request with the # configured username and password. # password and password_file are mutually exclusive. diff --git a/docs/disabled_features.md b/docs/disabled_features.md index 65abd296d2..4066327a3e 100644 --- a/docs/disabled_features.md +++ b/docs/disabled_features.md @@ -52,5 +52,4 @@ The remote write receiver allows Prometheus to accept remote write requests from [OpenMetrics](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars) introduces the ability for scrape targets to add exemplars to certain metrics. Exemplars are references to data outside of the MetricSet. A common use case are IDs of program traces. -Exemplar storage is implemented as a fixed size circular buffer that stores exemplars in memory for all series. Enabling this feature will enable the storage of exemplars scraped by Prometheus. The flag `storage.exemplars.exemplars-limit` can be used to control the size of circular buffer by # of exemplars. An exemplar with just a `traceID=` uses roughly 100 bytes of memory via the in-memory exemplar storage. - +Exemplar storage is implemented as a fixed size circular buffer that stores exemplars in memory for all series. Enabling this feature will enable the storage of exemplars scraped by Prometheus. The flag `storage.exemplars.exemplars-limit` can be used to control the size of circular buffer by # of exemplars. An exemplar with just a `traceID=` uses roughly 100 bytes of memory via the in-memory exemplar storage. If the exemplar storage is enabled, we will also append the exemplars to WAL for local persistence (for WAL duration). \ No newline at end of file diff --git a/documentation/examples/remote_storage/example_write_adapter/server.go b/documentation/examples/remote_storage/example_write_adapter/server.go index c61ed7b410..dc2bd0e70a 100644 --- a/documentation/examples/remote_storage/example_write_adapter/server.go +++ b/documentation/examples/remote_storage/example_write_adapter/server.go @@ -39,7 +39,15 @@ func main() { fmt.Println(m) for _, s := range ts.Samples { - fmt.Printf(" %f %d\n", s.Value, s.Timestamp) + fmt.Printf("\tSample: %f %d\n", s.Value, s.Timestamp) + } + + for _, e := range ts.Exemplars { + m := make(model.Metric, len(e.Labels)) + for _, l := range e.Labels { + m[model.LabelName(l.Name)] = model.LabelValue(l.Value) + } + fmt.Printf("\tExemplar: %+v %f %d\n", m, e.Value, e.Timestamp) } } }) diff --git a/pkg/exemplar/exemplar.go b/pkg/exemplar/exemplar.go index 8e3e01b5c9..27ba64d4b8 100644 --- a/pkg/exemplar/exemplar.go +++ b/pkg/exemplar/exemplar.go @@ -15,12 +15,16 @@ package exemplar import "github.com/prometheus/prometheus/pkg/labels" +// The combined length of the label names and values of an Exemplar's LabelSet MUST NOT exceed 128 UTF-8 characters +// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars +const ExemplarMaxLabelSetLength = 128 + // Exemplar is additional information associated with a time series. type Exemplar struct { - Labels labels.Labels - Value float64 + Labels labels.Labels `json:"labels"` + Value float64 `json:"value"` + Ts int64 `json:"timestamp"` HasTs bool - Ts int64 } type QueryResult struct { diff --git a/prompb/types.pb.go b/prompb/types.pb.go index e09215adae..b29170e535 100644 --- a/prompb/types.pb.go +++ b/prompb/types.pb.go @@ -96,7 +96,7 @@ func (x LabelMatcher_Type) String() string { } func (LabelMatcher_Type) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_d938547f84707355, []int{5, 0} + return fileDescriptor_d938547f84707355, []int{6, 0} } // We require this to match chunkenc.Encoding. @@ -122,7 +122,7 @@ func (x Chunk_Encoding) String() string { } func (Chunk_Encoding) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_d938547f84707355, []int{7, 0} + return fileDescriptor_d938547f84707355, []int{8, 0} } type MetricMetadata struct { @@ -199,7 +199,9 @@ func (m *MetricMetadata) GetUnit() string { } type Sample struct { - Value float64 `protobuf:"fixed64,1,opt,name=value,proto3" json:"value,omitempty"` + Value float64 `protobuf:"fixed64,1,opt,name=value,proto3" json:"value,omitempty"` + // timestamp is in ms format, see pkg/timestamp/timestamp.go for + // conversion from time.Time to Prometheus timestamp. Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` @@ -253,20 +255,89 @@ func (m *Sample) GetTimestamp() int64 { return 0 } -// TimeSeries represents samples and labels for a single time series. -type TimeSeries struct { - Labels []Label `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels"` - Samples []Sample `protobuf:"bytes,2,rep,name=samples,proto3" json:"samples"` +type Exemplar struct { + // Optional, can be empty. + Labels []Label `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels"` + Value float64 `protobuf:"fixed64,2,opt,name=value,proto3" json:"value,omitempty"` + // timestamp is in ms format, see pkg/timestamp/timestamp.go for + // conversion from time.Time to Prometheus timestamp. + Timestamp int64 `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } +func (m *Exemplar) Reset() { *m = Exemplar{} } +func (m *Exemplar) String() string { return proto.CompactTextString(m) } +func (*Exemplar) ProtoMessage() {} +func (*Exemplar) Descriptor() ([]byte, []int) { + return fileDescriptor_d938547f84707355, []int{2} +} +func (m *Exemplar) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Exemplar) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Exemplar.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Exemplar) XXX_Merge(src proto.Message) { + xxx_messageInfo_Exemplar.Merge(m, src) +} +func (m *Exemplar) XXX_Size() int { + return m.Size() +} +func (m *Exemplar) XXX_DiscardUnknown() { + xxx_messageInfo_Exemplar.DiscardUnknown(m) +} + +var xxx_messageInfo_Exemplar proto.InternalMessageInfo + +func (m *Exemplar) GetLabels() []Label { + if m != nil { + return m.Labels + } + return nil +} + +func (m *Exemplar) GetValue() float64 { + if m != nil { + return m.Value + } + return 0 +} + +func (m *Exemplar) GetTimestamp() int64 { + if m != nil { + return m.Timestamp + } + return 0 +} + +// TimeSeries represents samples and labels for a single time series. +type TimeSeries struct { + // For a timeseries to be valid, and for the samples and exemplars + // to be ingested by the remote system properly, the labels field is required. + Labels []Label `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels"` + Samples []Sample `protobuf:"bytes,2,rep,name=samples,proto3" json:"samples"` + Exemplars []Exemplar `protobuf:"bytes,3,rep,name=exemplars,proto3" json:"exemplars"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + func (m *TimeSeries) Reset() { *m = TimeSeries{} } func (m *TimeSeries) String() string { return proto.CompactTextString(m) } func (*TimeSeries) ProtoMessage() {} func (*TimeSeries) Descriptor() ([]byte, []int) { - return fileDescriptor_d938547f84707355, []int{2} + return fileDescriptor_d938547f84707355, []int{3} } func (m *TimeSeries) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -309,6 +380,13 @@ func (m *TimeSeries) GetSamples() []Sample { return nil } +func (m *TimeSeries) GetExemplars() []Exemplar { + if m != nil { + return m.Exemplars + } + return nil +} + type Label struct { Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` @@ -321,7 +399,7 @@ func (m *Label) Reset() { *m = Label{} } func (m *Label) String() string { return proto.CompactTextString(m) } func (*Label) ProtoMessage() {} func (*Label) Descriptor() ([]byte, []int) { - return fileDescriptor_d938547f84707355, []int{3} + return fileDescriptor_d938547f84707355, []int{4} } func (m *Label) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -375,7 +453,7 @@ func (m *Labels) Reset() { *m = Labels{} } func (m *Labels) String() string { return proto.CompactTextString(m) } func (*Labels) ProtoMessage() {} func (*Labels) Descriptor() ([]byte, []int) { - return fileDescriptor_d938547f84707355, []int{4} + return fileDescriptor_d938547f84707355, []int{5} } func (m *Labels) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -425,7 +503,7 @@ func (m *LabelMatcher) Reset() { *m = LabelMatcher{} } func (m *LabelMatcher) String() string { return proto.CompactTextString(m) } func (*LabelMatcher) ProtoMessage() {} func (*LabelMatcher) Descriptor() ([]byte, []int) { - return fileDescriptor_d938547f84707355, []int{5} + return fileDescriptor_d938547f84707355, []int{6} } func (m *LabelMatcher) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -492,7 +570,7 @@ func (m *ReadHints) Reset() { *m = ReadHints{} } func (m *ReadHints) String() string { return proto.CompactTextString(m) } func (*ReadHints) ProtoMessage() {} func (*ReadHints) Descriptor() ([]byte, []int) { - return fileDescriptor_d938547f84707355, []int{6} + return fileDescriptor_d938547f84707355, []int{7} } func (m *ReadHints) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -586,7 +664,7 @@ func (m *Chunk) Reset() { *m = Chunk{} } func (m *Chunk) String() string { return proto.CompactTextString(m) } func (*Chunk) ProtoMessage() {} func (*Chunk) Descriptor() ([]byte, []int) { - return fileDescriptor_d938547f84707355, []int{7} + return fileDescriptor_d938547f84707355, []int{8} } func (m *Chunk) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -658,7 +736,7 @@ func (m *ChunkedSeries) Reset() { *m = ChunkedSeries{} } func (m *ChunkedSeries) String() string { return proto.CompactTextString(m) } func (*ChunkedSeries) ProtoMessage() {} func (*ChunkedSeries) Descriptor() ([]byte, []int) { - return fileDescriptor_d938547f84707355, []int{8} + return fileDescriptor_d938547f84707355, []int{9} } func (m *ChunkedSeries) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -707,6 +785,7 @@ func init() { proto.RegisterEnum("prometheus.Chunk_Encoding", Chunk_Encoding_name, Chunk_Encoding_value) proto.RegisterType((*MetricMetadata)(nil), "prometheus.MetricMetadata") proto.RegisterType((*Sample)(nil), "prometheus.Sample") + proto.RegisterType((*Exemplar)(nil), "prometheus.Exemplar") proto.RegisterType((*TimeSeries)(nil), "prometheus.TimeSeries") proto.RegisterType((*Label)(nil), "prometheus.Label") proto.RegisterType((*Labels)(nil), "prometheus.Labels") @@ -719,51 +798,53 @@ func init() { func init() { proto.RegisterFile("types.proto", fileDescriptor_d938547f84707355) } var fileDescriptor_d938547f84707355 = []byte{ - // 690 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0xcd, 0x6e, 0xda, 0x40, - 0x10, 0xce, 0xfa, 0x17, 0x86, 0x04, 0x39, 0xab, 0x54, 0x75, 0xa3, 0x96, 0x22, 0x4b, 0x95, 0x38, - 0x54, 0x44, 0x49, 0x4f, 0x91, 0x7a, 0x21, 0x91, 0xf3, 0xa3, 0xc6, 0xa0, 0x2c, 0xa0, 0xfe, 0x5c, - 0xd0, 0x02, 0x1b, 0xb0, 0x8a, 0x8d, 0xe3, 0x5d, 0xaa, 0xf0, 0x20, 0xbd, 0xf5, 0x15, 0x7a, 0xe8, - 0x5b, 0xe4, 0xd8, 0x27, 0xa8, 0xaa, 0x3c, 0x49, 0xb5, 0x6b, 0x13, 0x13, 0xa5, 0x97, 0xf6, 0x36, - 0xf3, 0x7d, 0xdf, 0xfc, 0xec, 0xcc, 0xd8, 0x50, 0x11, 0xcb, 0x84, 0xf1, 0x66, 0x92, 0xce, 0xc5, - 0x1c, 0x43, 0x92, 0xce, 0x23, 0x26, 0xa6, 0x6c, 0xc1, 0x77, 0x77, 0x26, 0xf3, 0xc9, 0x5c, 0xc1, - 0x7b, 0xd2, 0xca, 0x14, 0xde, 0x37, 0x0d, 0xaa, 0x01, 0x13, 0x69, 0x38, 0x0a, 0x98, 0xa0, 0x63, - 0x2a, 0x28, 0x3e, 0x04, 0x43, 0xe6, 0x70, 0x51, 0x1d, 0x35, 0xaa, 0x07, 0xaf, 0x9a, 0x45, 0x8e, - 0xe6, 0x43, 0x65, 0xee, 0xf6, 0x96, 0x09, 0x23, 0x2a, 0x04, 0xbf, 0x06, 0x1c, 0x29, 0x6c, 0x70, - 0x45, 0xa3, 0x70, 0xb6, 0x1c, 0xc4, 0x34, 0x62, 0xae, 0x56, 0x47, 0x8d, 0x32, 0x71, 0x32, 0xe6, - 0x44, 0x11, 0x6d, 0x1a, 0x31, 0x8c, 0xc1, 0x98, 0xb2, 0x59, 0xe2, 0x1a, 0x8a, 0x57, 0xb6, 0xc4, - 0x16, 0x71, 0x28, 0x5c, 0x33, 0xc3, 0xa4, 0xed, 0x2d, 0x01, 0x8a, 0x4a, 0xb8, 0x02, 0x76, 0xbf, - 0xfd, 0xae, 0xdd, 0x79, 0xdf, 0x76, 0x36, 0xa4, 0x73, 0xdc, 0xe9, 0xb7, 0x7b, 0x3e, 0x71, 0x10, - 0x2e, 0x83, 0x79, 0xda, 0xea, 0x9f, 0xfa, 0x8e, 0x86, 0xb7, 0xa0, 0x7c, 0x76, 0xde, 0xed, 0x75, - 0x4e, 0x49, 0x2b, 0x70, 0x74, 0x8c, 0xa1, 0xaa, 0x98, 0x02, 0x33, 0x64, 0x68, 0xb7, 0x1f, 0x04, - 0x2d, 0xf2, 0xd1, 0x31, 0x71, 0x09, 0x8c, 0xf3, 0xf6, 0x49, 0xc7, 0xb1, 0xf0, 0x26, 0x94, 0xba, - 0xbd, 0x56, 0xcf, 0xef, 0xfa, 0x3d, 0xc7, 0xf6, 0xde, 0x82, 0xd5, 0xa5, 0x51, 0x32, 0x63, 0x78, - 0x07, 0xcc, 0x2f, 0x74, 0xb6, 0xc8, 0xc6, 0x82, 0x48, 0xe6, 0xe0, 0xe7, 0x50, 0x16, 0x61, 0xc4, - 0xb8, 0xa0, 0x51, 0xa2, 0xde, 0xa9, 0x93, 0x02, 0xf0, 0xae, 0x01, 0x7a, 0x61, 0xc4, 0xba, 0x2c, - 0x0d, 0x19, 0xc7, 0x7b, 0x60, 0xcd, 0xe8, 0x90, 0xcd, 0xb8, 0x8b, 0xea, 0x7a, 0xa3, 0x72, 0xb0, - 0xbd, 0x3e, 0xd9, 0x0b, 0xc9, 0x1c, 0x19, 0xb7, 0xbf, 0x5e, 0x6e, 0x90, 0x5c, 0x86, 0x0f, 0xc0, - 0xe6, 0xaa, 0x38, 0x77, 0x35, 0x15, 0x81, 0xd7, 0x23, 0xb2, 0xbe, 0xf2, 0x90, 0x95, 0xd0, 0xdb, - 0x07, 0x53, 0xa5, 0x92, 0x83, 0x54, 0xc3, 0x47, 0xd9, 0x20, 0xa5, 0x5d, 0xbc, 0x21, 0xdb, 0x48, - 0xe6, 0x78, 0x87, 0x60, 0x5d, 0x64, 0x05, 0xff, 0xb5, 0x43, 0xef, 0x2b, 0x82, 0x4d, 0x85, 0x07, - 0x54, 0x8c, 0xa6, 0x2c, 0xc5, 0xfb, 0x0f, 0x6e, 0xe7, 0xc5, 0xa3, 0xf8, 0x5c, 0xd7, 0x5c, 0xbb, - 0x99, 0x55, 0xa3, 0xda, 0xdf, 0x1a, 0xd5, 0xd7, 0x1b, 0x6d, 0x80, 0xa1, 0x2e, 0xc0, 0x02, 0xcd, - 0xbf, 0x74, 0x36, 0xb0, 0x0d, 0x7a, 0xdb, 0xbf, 0x74, 0x90, 0x04, 0x88, 0xdc, 0xba, 0x04, 0x88, - 0xef, 0xe8, 0xde, 0x0f, 0x04, 0x65, 0xc2, 0xe8, 0xf8, 0x2c, 0x8c, 0x05, 0xc7, 0x4f, 0xc1, 0xe6, - 0x82, 0x25, 0x83, 0x88, 0xab, 0xbe, 0x74, 0x62, 0x49, 0x37, 0xe0, 0xb2, 0xf4, 0xd5, 0x22, 0x1e, - 0xad, 0x4a, 0x4b, 0x1b, 0x3f, 0x83, 0x12, 0x17, 0x34, 0x15, 0x52, 0xad, 0x2b, 0xb5, 0xad, 0xfc, - 0x80, 0xe3, 0x27, 0x60, 0xb1, 0x78, 0x2c, 0x09, 0x43, 0x11, 0x26, 0x8b, 0xc7, 0x01, 0xc7, 0xbb, - 0x50, 0x9a, 0xa4, 0xf3, 0x45, 0x12, 0xc6, 0x13, 0xd7, 0xac, 0xeb, 0x8d, 0x32, 0xb9, 0xf7, 0x71, - 0x15, 0xb4, 0xe1, 0xd2, 0xb5, 0xea, 0xa8, 0x51, 0x22, 0xda, 0x70, 0x29, 0xb3, 0xa7, 0x34, 0x9e, - 0x30, 0x99, 0xc4, 0xce, 0xb2, 0x2b, 0x3f, 0xe0, 0xde, 0x77, 0x04, 0xe6, 0xf1, 0x74, 0x11, 0x7f, - 0xc6, 0x35, 0xa8, 0x44, 0x61, 0x3c, 0x90, 0x77, 0x54, 0xf4, 0x5c, 0x8e, 0xc2, 0x58, 0x1e, 0x53, - 0xc0, 0x15, 0x4f, 0x6f, 0xee, 0xf9, 0xfc, 0xec, 0x22, 0x7a, 0x93, 0xf3, 0xcd, 0x7c, 0x09, 0xba, - 0x5a, 0xc2, 0xee, 0xfa, 0x12, 0x54, 0x81, 0xa6, 0x1f, 0x8f, 0xe6, 0xe3, 0x30, 0x9e, 0x14, 0x1b, - 0x90, 0x9f, 0xb3, 0x7a, 0xd5, 0x26, 0x51, 0xb6, 0x57, 0x87, 0xd2, 0x4a, 0xf5, 0xf0, 0x8b, 0xb3, - 0x41, 0xff, 0xd0, 0x21, 0x0e, 0xf2, 0xae, 0x61, 0x4b, 0x65, 0x63, 0xe3, 0xff, 0xbd, 0xef, 0x3d, - 0xb0, 0x46, 0x32, 0xc3, 0xea, 0xbc, 0xb7, 0x1f, 0x75, 0xba, 0x0a, 0xc8, 0x64, 0x47, 0x3b, 0xb7, - 0x77, 0x35, 0xf4, 0xf3, 0xae, 0x86, 0x7e, 0xdf, 0xd5, 0xd0, 0x27, 0x4b, 0xaa, 0x93, 0xe1, 0xd0, - 0x52, 0x7f, 0xb2, 0x37, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0xf3, 0xb7, 0x12, 0x44, 0xfa, 0x04, - 0x00, 0x00, + // 734 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0xcb, 0x6e, 0xdb, 0x46, + 0x14, 0xf5, 0xf0, 0x29, 0x5e, 0xd9, 0x02, 0x3d, 0x50, 0x51, 0xd6, 0x68, 0x55, 0x81, 0x40, 0x01, + 0x2d, 0x0a, 0x19, 0x76, 0x37, 0x35, 0xd0, 0x8d, 0x6c, 0xd0, 0x0f, 0xd4, 0x94, 0xe0, 0x91, 0x84, + 0x3e, 0x36, 0xc2, 0x48, 0x1a, 0x4b, 0x44, 0xc4, 0x47, 0x38, 0x54, 0x60, 0x7d, 0x48, 0x76, 0xf9, + 0x83, 0x20, 0x8b, 0xfc, 0x85, 0x97, 0xf9, 0x82, 0x20, 0xf0, 0x97, 0x04, 0x33, 0xa4, 0x4c, 0x29, + 0x4e, 0x16, 0xce, 0xee, 0xde, 0x7b, 0xce, 0xb9, 0x8f, 0xb9, 0x97, 0x84, 0x6a, 0xb6, 0x4a, 0x18, + 0x6f, 0x27, 0x69, 0x9c, 0xc5, 0x18, 0x92, 0x34, 0x0e, 0x59, 0x36, 0x67, 0x4b, 0x7e, 0x50, 0x9f, + 0xc5, 0xb3, 0x58, 0x86, 0x0f, 0x85, 0x95, 0x33, 0xdc, 0x37, 0x0a, 0xd4, 0x7c, 0x96, 0xa5, 0xc1, + 0xc4, 0x67, 0x19, 0x9d, 0xd2, 0x8c, 0xe2, 0x13, 0xd0, 0x44, 0x0e, 0x07, 0x35, 0x51, 0xab, 0x76, + 0xfc, 0x5b, 0xbb, 0xcc, 0xd1, 0xde, 0x66, 0x16, 0xee, 0x60, 0x95, 0x30, 0x22, 0x25, 0xf8, 0x77, + 0xc0, 0xa1, 0x8c, 0x8d, 0x6e, 0x69, 0x18, 0x2c, 0x56, 0xa3, 0x88, 0x86, 0xcc, 0x51, 0x9a, 0xa8, + 0x65, 0x11, 0x3b, 0x47, 0xce, 0x25, 0xd0, 0xa5, 0x21, 0xc3, 0x18, 0xb4, 0x39, 0x5b, 0x24, 0x8e, + 0x26, 0x71, 0x69, 0x8b, 0xd8, 0x32, 0x0a, 0x32, 0x47, 0xcf, 0x63, 0xc2, 0x76, 0x57, 0x00, 0x65, + 0x25, 0x5c, 0x05, 0x73, 0xd8, 0xfd, 0xbb, 0xdb, 0xfb, 0xa7, 0x6b, 0xef, 0x08, 0xe7, 0xac, 0x37, + 0xec, 0x0e, 0x3c, 0x62, 0x23, 0x6c, 0x81, 0x7e, 0xd1, 0x19, 0x5e, 0x78, 0xb6, 0x82, 0xf7, 0xc0, + 0xba, 0xbc, 0xea, 0x0f, 0x7a, 0x17, 0xa4, 0xe3, 0xdb, 0x2a, 0xc6, 0x50, 0x93, 0x48, 0x19, 0xd3, + 0x84, 0xb4, 0x3f, 0xf4, 0xfd, 0x0e, 0xf9, 0xcf, 0xd6, 0x71, 0x05, 0xb4, 0xab, 0xee, 0x79, 0xcf, + 0x36, 0xf0, 0x2e, 0x54, 0xfa, 0x83, 0xce, 0xc0, 0xeb, 0x7b, 0x03, 0xdb, 0x74, 0xff, 0x02, 0xa3, + 0x4f, 0xc3, 0x64, 0xc1, 0x70, 0x1d, 0xf4, 0x57, 0x74, 0xb1, 0xcc, 0x9f, 0x05, 0x91, 0xdc, 0xc1, + 0x3f, 0x83, 0x95, 0x05, 0x21, 0xe3, 0x19, 0x0d, 0x13, 0x39, 0xa7, 0x4a, 0xca, 0x80, 0x1b, 0x43, + 0xc5, 0xbb, 0x63, 0x61, 0xb2, 0xa0, 0x29, 0x3e, 0x04, 0x63, 0x41, 0xc7, 0x6c, 0xc1, 0x1d, 0xd4, + 0x54, 0x5b, 0xd5, 0xe3, 0xfd, 0xcd, 0x77, 0xbd, 0x16, 0xc8, 0xa9, 0x76, 0xff, 0xf1, 0xd7, 0x1d, + 0x52, 0xd0, 0xca, 0x82, 0xca, 0x37, 0x0b, 0xaa, 0x5f, 0x16, 0x7c, 0x8b, 0x00, 0x06, 0x41, 0xc8, + 0xfa, 0x2c, 0x0d, 0x18, 0x7f, 0x7e, 0xcd, 0x63, 0x30, 0xb9, 0x1c, 0x97, 0x3b, 0x8a, 0x54, 0xe0, + 0x4d, 0x45, 0xfe, 0x12, 0x85, 0x64, 0x4d, 0xc4, 0x7f, 0x82, 0xc5, 0x8a, 0x21, 0xb9, 0xa3, 0x4a, + 0x55, 0x7d, 0x53, 0xb5, 0x7e, 0x81, 0x42, 0x57, 0x92, 0xdd, 0x23, 0xd0, 0x65, 0x13, 0x62, 0xe9, + 0xf2, 0x50, 0x50, 0xbe, 0x74, 0x61, 0x6f, 0x8f, 0x6f, 0x15, 0xe3, 0xbb, 0x27, 0x60, 0x5c, 0xe7, + 0xad, 0x3e, 0x77, 0x36, 0xf7, 0x35, 0x82, 0x5d, 0x19, 0xf7, 0x69, 0x36, 0x99, 0xb3, 0x14, 0x1f, + 0x6d, 0xdd, 0xf9, 0x2f, 0x4f, 0xf4, 0x05, 0xaf, 0xbd, 0x71, 0xdf, 0xeb, 0x46, 0x95, 0xaf, 0x35, + 0xaa, 0x6e, 0x36, 0xda, 0x02, 0x4d, 0x5e, 0xab, 0x01, 0x8a, 0x77, 0x63, 0xef, 0x60, 0x13, 0xd4, + 0xae, 0x77, 0x63, 0x23, 0x11, 0x20, 0xe2, 0x42, 0x45, 0x80, 0x78, 0xb6, 0xea, 0xbe, 0x47, 0x60, + 0x11, 0x46, 0xa7, 0x97, 0x41, 0x94, 0x71, 0xfc, 0x23, 0x98, 0x3c, 0x63, 0xc9, 0x28, 0xe4, 0xb2, + 0x2f, 0x95, 0x18, 0xc2, 0xf5, 0xb9, 0x28, 0x7d, 0xbb, 0x8c, 0x26, 0xeb, 0xd2, 0xc2, 0xc6, 0x3f, + 0x41, 0x85, 0x67, 0x34, 0xcd, 0x04, 0x3b, 0xbf, 0x05, 0x53, 0xfa, 0x3e, 0xc7, 0x3f, 0x80, 0xc1, + 0xa2, 0xa9, 0x00, 0x34, 0x09, 0xe8, 0x2c, 0x9a, 0xfa, 0x1c, 0x1f, 0x40, 0x65, 0x96, 0xc6, 0xcb, + 0x24, 0x88, 0x66, 0x8e, 0xde, 0x54, 0x5b, 0x16, 0x79, 0xf4, 0x71, 0x0d, 0x94, 0xf1, 0xca, 0x31, + 0x9a, 0xa8, 0x55, 0x21, 0xca, 0x78, 0x25, 0xb2, 0xa7, 0x34, 0x9a, 0x31, 0x91, 0xc4, 0xcc, 0xb3, + 0x4b, 0xdf, 0xe7, 0xee, 0x3b, 0x04, 0xfa, 0xd9, 0x7c, 0x19, 0xbd, 0xc0, 0x0d, 0xa8, 0x86, 0x41, + 0x34, 0x12, 0x27, 0x58, 0xf6, 0x6c, 0x85, 0x41, 0x24, 0xce, 0xd0, 0xe7, 0x12, 0xa7, 0x77, 0x8f, + 0x78, 0xf1, 0x89, 0x84, 0xf4, 0xae, 0xc0, 0xdb, 0xc5, 0x12, 0x54, 0xb9, 0x84, 0x83, 0xcd, 0x25, + 0xc8, 0x02, 0x6d, 0x2f, 0x9a, 0xc4, 0xd3, 0x20, 0x9a, 0x95, 0x1b, 0x10, 0xbf, 0x1e, 0x39, 0xd5, + 0x2e, 0x91, 0xb6, 0xdb, 0x84, 0xca, 0x9a, 0xb5, 0xfd, 0x77, 0x30, 0x41, 0xfd, 0xb7, 0x47, 0x6c, + 0xe4, 0xbe, 0x84, 0x3d, 0x99, 0x8d, 0x4d, 0xbf, 0xf7, 0xcb, 0x38, 0x04, 0x63, 0x22, 0x32, 0xac, + 0x3f, 0x8c, 0xfd, 0x27, 0x9d, 0xae, 0x05, 0x39, 0xed, 0xb4, 0x7e, 0xff, 0xd0, 0x40, 0x1f, 0x1e, + 0x1a, 0xe8, 0xd3, 0x43, 0x03, 0xfd, 0x6f, 0x08, 0x76, 0x32, 0x1e, 0x1b, 0xf2, 0xaf, 0xfb, 0xc7, + 0xe7, 0x00, 0x00, 0x00, 0xff, 0xff, 0xa9, 0xa3, 0x6c, 0x23, 0xa6, 0x05, 0x00, 0x00, } func (m *MetricMetadata) Marshal() (dAtA []byte, err error) { @@ -857,6 +938,58 @@ func (m *Sample) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *Exemplar) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Exemplar) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Exemplar) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if m.Timestamp != 0 { + i = encodeVarintTypes(dAtA, i, uint64(m.Timestamp)) + i-- + dAtA[i] = 0x18 + } + if m.Value != 0 { + i -= 8 + encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.Value)))) + i-- + dAtA[i] = 0x11 + } + if len(m.Labels) > 0 { + for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Labels[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintTypes(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + func (m *TimeSeries) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -881,6 +1014,20 @@ func (m *TimeSeries) MarshalToSizedBuffer(dAtA []byte) (int, error) { i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } + if len(m.Exemplars) > 0 { + for iNdEx := len(m.Exemplars) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Exemplars[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintTypes(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } + } if len(m.Samples) > 0 { for iNdEx := len(m.Samples) - 1; iNdEx >= 0; iNdEx-- { { @@ -1273,6 +1420,30 @@ func (m *Sample) Size() (n int) { return n } +func (m *Exemplar) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Labels) > 0 { + for _, e := range m.Labels { + l = e.Size() + n += 1 + l + sovTypes(uint64(l)) + } + } + if m.Value != 0 { + n += 9 + } + if m.Timestamp != 0 { + n += 1 + sovTypes(uint64(m.Timestamp)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + func (m *TimeSeries) Size() (n int) { if m == nil { return 0 @@ -1291,6 +1462,12 @@ func (m *TimeSeries) Size() (n int) { n += 1 + l + sovTypes(uint64(l)) } } + if len(m.Exemplars) > 0 { + for _, e := range m.Exemplars { + l = e.Size() + n += 1 + l + sovTypes(uint64(l)) + } + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -1697,6 +1874,121 @@ func (m *Sample) Unmarshal(dAtA []byte) error { } return nil } +func (m *Exemplar) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Exemplar: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Exemplar: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTypes + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthTypes + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Labels = append(m.Labels, Label{}) + if err := m.Labels[len(m.Labels)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 1 { + return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) + } + var v uint64 + if (iNdEx + 8) > l { + return io.ErrUnexpectedEOF + } + v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:])) + iNdEx += 8 + m.Value = float64(math.Float64frombits(v)) + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType) + } + m.Timestamp = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Timestamp |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipTypes(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthTypes + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *TimeSeries) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -1794,6 +2086,40 @@ func (m *TimeSeries) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Exemplars", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTypes + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthTypes + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Exemplars = append(m.Exemplars, Exemplar{}) + if err := m.Exemplars[len(m.Exemplars)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipTypes(dAtA[iNdEx:]) diff --git a/prompb/types.proto b/prompb/types.proto index 259a0d40d3..ee11f3a001 100644 --- a/prompb/types.proto +++ b/prompb/types.proto @@ -40,13 +40,27 @@ message MetricMetadata { message Sample { double value = 1; + // timestamp is in ms format, see pkg/timestamp/timestamp.go for + // conversion from time.Time to Prometheus timestamp. int64 timestamp = 2; } +message Exemplar { + // Optional, can be empty. + repeated Label labels = 1 [(gogoproto.nullable) = false]; + double value = 2; + // timestamp is in ms format, see pkg/timestamp/timestamp.go for + // conversion from time.Time to Prometheus timestamp. + int64 timestamp = 3; +} + // TimeSeries represents samples and labels for a single time series. message TimeSeries { + // For a timeseries to be valid, and for the samples and exemplars + // to be ingested by the remote system properly, the labels field is required. repeated Label labels = 1 [(gogoproto.nullable) = false]; repeated Sample samples = 2 [(gogoproto.nullable) = false]; + repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false]; } message Label { diff --git a/promql/test.go b/promql/test.go index a60b408907..acb1eb5ca6 100644 --- a/promql/test.go +++ b/promql/test.go @@ -26,6 +26,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/promql/parser" @@ -274,16 +275,18 @@ func (*evalCmd) testCmd() {} // loadCmd is a command that loads sequences of sample values for specific // metrics into the storage. type loadCmd struct { - gap time.Duration - metrics map[uint64]labels.Labels - defs map[uint64][]Point + gap time.Duration + metrics map[uint64]labels.Labels + defs map[uint64][]Point + exemplars map[uint64][]exemplar.Exemplar } func newLoadCmd(gap time.Duration) *loadCmd { return &loadCmd{ - gap: gap, - metrics: map[uint64]labels.Labels{}, - defs: map[uint64][]Point{}, + gap: gap, + metrics: map[uint64]labels.Labels{}, + defs: map[uint64][]Point{}, + exemplars: map[uint64][]exemplar.Exemplar{}, } } diff --git a/scrape/scrape.go b/scrape/scrape.go index 0985d2f465..20600a1e05 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -1497,7 +1497,6 @@ func yoloString(b []byte) string { // Adds samples to the appender, checking the error, and then returns the # of samples added, // whether the caller should continue to process more samples, and any sample limit errors. - func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err error, sampleLimitErr *error, appErrs *appendErrors) (bool, error) { switch errors.Cause(err) { case nil: diff --git a/scripts/genproto.sh b/scripts/genproto.sh index 9683b759bf..1c152d0208 100755 --- a/scripts/genproto.sh +++ b/scripts/genproto.sh @@ -10,8 +10,8 @@ if ! [[ "$0" =~ "scripts/genproto.sh" ]]; then exit 255 fi -if ! [[ $(protoc --version) =~ "3.12.3" ]]; then - echo "could not find protoc 3.12.3, is it installed + in PATH?" +if ! [[ $(protoc --version) =~ "3.15.8" ]]; then + echo "could not find protoc 3.15.8, is it installed + in PATH?" exit 255 fi diff --git a/storage/interface.go b/storage/interface.go index 5ddc300b34..6f65cf069e 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -16,6 +16,7 @@ package storage import ( "context" "errors" + "fmt" "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/labels" @@ -30,6 +31,8 @@ var ( ErrDuplicateSampleForTimestamp = errors.New("duplicate sample for timestamp") ErrOutOfBounds = errors.New("out of bounds") ErrOutOfOrderExemplar = errors.New("out of order exemplar") + ErrDuplicateExemplar = errors.New("duplicate exemplar") + ErrExemplarLabelLength = fmt.Errorf("label length for exemplar exceeds maximum of %d UTF-8 characters", exemplar.ExemplarMaxLabelSetLength) ) // Appendable allows creating appenders. diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index a3eb35e463..1dea309c34 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -52,25 +52,30 @@ const ( type queueManagerMetrics struct { reg prometheus.Registerer - samplesTotal prometheus.Counter - metadataTotal prometheus.Counter - failedSamplesTotal prometheus.Counter - failedMetadataTotal prometheus.Counter - retriedSamplesTotal prometheus.Counter - retriedMetadataTotal prometheus.Counter - droppedSamplesTotal prometheus.Counter - enqueueRetriesTotal prometheus.Counter - sentBatchDuration prometheus.Histogram - highestSentTimestamp *maxTimestamp - pendingSamples prometheus.Gauge - shardCapacity prometheus.Gauge - numShards prometheus.Gauge - maxNumShards prometheus.Gauge - minNumShards prometheus.Gauge - desiredNumShards prometheus.Gauge - samplesBytesTotal prometheus.Counter - metadataBytesTotal prometheus.Counter - maxSamplesPerSend prometheus.Gauge + samplesTotal prometheus.Counter + exemplarsTotal prometheus.Counter + metadataTotal prometheus.Counter + failedSamplesTotal prometheus.Counter + failedExemplarsTotal prometheus.Counter + failedMetadataTotal prometheus.Counter + retriedSamplesTotal prometheus.Counter + retriedExemplarsTotal prometheus.Counter + retriedMetadataTotal prometheus.Counter + droppedSamplesTotal prometheus.Counter + droppedExemplarsTotal prometheus.Counter + enqueueRetriesTotal prometheus.Counter + sentBatchDuration prometheus.Histogram + highestSentTimestamp *maxTimestamp + pendingSamples prometheus.Gauge + pendingExemplars prometheus.Gauge + shardCapacity prometheus.Gauge + numShards prometheus.Gauge + maxNumShards prometheus.Gauge + minNumShards prometheus.Gauge + desiredNumShards prometheus.Gauge + sentBytesTotal prometheus.Counter + metadataBytesTotal prometheus.Counter + maxSamplesPerSend prometheus.Gauge } func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManagerMetrics { @@ -89,6 +94,13 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager Help: "Total number of samples sent to remote storage.", ConstLabels: constLabels, }) + m.exemplarsTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "exemplars_total", + Help: "Total number of exemplars sent to remote storage.", + ConstLabels: constLabels, + }) m.metadataTotal = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, @@ -103,6 +115,13 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager Help: "Total number of samples which failed on send to remote storage, non-recoverable errors.", ConstLabels: constLabels, }) + m.failedExemplarsTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "exemplars_failed_total", + Help: "Total number of exemplars which failed on send to remote storage, non-recoverable errors.", + ConstLabels: constLabels, + }) m.failedMetadataTotal = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, @@ -117,6 +136,13 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager Help: "Total number of samples which failed on send to remote storage but were retried because the send error was recoverable.", ConstLabels: constLabels, }) + m.retriedExemplarsTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "exemplars_retried_total", + Help: "Total number of exemplars which failed on send to remote storage but were retried because the send error was recoverable.", + ConstLabels: constLabels, + }) m.retriedMetadataTotal = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, @@ -128,7 +154,14 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager Namespace: namespace, Subsystem: subsystem, Name: "samples_dropped_total", - Help: "Total number of samples which were dropped after being read from the WAL before being sent via remote write.", + Help: "Total number of samples which were dropped after being read from the WAL before being sent via remote write, either via relabelling or unintentionally because of an unknown reference ID.", + ConstLabels: constLabels, + }) + m.droppedExemplarsTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "exemplars_dropped_total", + Help: "Total number of exemplars which were dropped after being read from the WAL before being sent via remote write, either via relabelling or unintentionally because of an unknown reference ID.", ConstLabels: constLabels, }) m.enqueueRetriesTotal = prometheus.NewCounter(prometheus.CounterOpts{ @@ -162,6 +195,13 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager Help: "The number of samples pending in the queues shards to be sent to the remote storage.", ConstLabels: constLabels, }) + m.pendingExemplars = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "exemplars_pending", + Help: "The number of exemplars pending in the queues shards to be sent to the remote storage.", + ConstLabels: constLabels, + }) m.shardCapacity = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, @@ -197,11 +237,11 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager Help: "The number of shards that the queues shard calculation wants to run based on the rate of samples in vs. samples out.", ConstLabels: constLabels, }) - m.samplesBytesTotal = prometheus.NewCounter(prometheus.CounterOpts{ + m.sentBytesTotal = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "samples_bytes_total", - Help: "The total number of bytes of samples sent by the queue after compression.", + Name: "bytes_total", + Help: "The total number of bytes of data (not metadata) sent by the queue after compression. Note that when exemplars over remote write is enabled the exemplars included in a remote write request count towards this metric.", ConstLabels: constLabels, }) m.metadataBytesTotal = prometheus.NewCounter(prometheus.CounterOpts{ @@ -215,7 +255,7 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager Namespace: namespace, Subsystem: subsystem, Name: "max_samples_per_send", - Help: "The maximum number of samples to be sent, in a single request, to the remote storage.", + Help: "The maximum number of samples to be sent, in a single request, to the remote storage. Note that, when sending of exemplars over remote write is enabled, exemplars count towards this limt.", ConstLabels: constLabels, }) @@ -226,22 +266,27 @@ func (m *queueManagerMetrics) register() { if m.reg != nil { m.reg.MustRegister( m.samplesTotal, + m.exemplarsTotal, m.metadataTotal, m.failedSamplesTotal, + m.failedExemplarsTotal, m.failedMetadataTotal, m.retriedSamplesTotal, + m.retriedExemplarsTotal, m.retriedMetadataTotal, m.droppedSamplesTotal, + m.droppedExemplarsTotal, m.enqueueRetriesTotal, m.sentBatchDuration, m.highestSentTimestamp, m.pendingSamples, + m.pendingExemplars, m.shardCapacity, m.numShards, m.maxNumShards, m.minNumShards, m.desiredNumShards, - m.samplesBytesTotal, + m.sentBytesTotal, m.metadataBytesTotal, m.maxSamplesPerSend, ) @@ -251,22 +296,27 @@ func (m *queueManagerMetrics) register() { func (m *queueManagerMetrics) unregister() { if m.reg != nil { m.reg.Unregister(m.samplesTotal) + m.reg.Unregister(m.exemplarsTotal) m.reg.Unregister(m.metadataTotal) m.reg.Unregister(m.failedSamplesTotal) + m.reg.Unregister(m.failedExemplarsTotal) m.reg.Unregister(m.failedMetadataTotal) m.reg.Unregister(m.retriedSamplesTotal) + m.reg.Unregister(m.retriedExemplarsTotal) m.reg.Unregister(m.retriedMetadataTotal) m.reg.Unregister(m.droppedSamplesTotal) + m.reg.Unregister(m.droppedExemplarsTotal) m.reg.Unregister(m.enqueueRetriesTotal) m.reg.Unregister(m.sentBatchDuration) m.reg.Unregister(m.highestSentTimestamp) m.reg.Unregister(m.pendingSamples) + m.reg.Unregister(m.pendingExemplars) m.reg.Unregister(m.shardCapacity) m.reg.Unregister(m.numShards) m.reg.Unregister(m.maxNumShards) m.reg.Unregister(m.minNumShards) m.reg.Unregister(m.desiredNumShards) - m.reg.Unregister(m.samplesBytesTotal) + m.reg.Unregister(m.sentBytesTotal) m.reg.Unregister(m.metadataBytesTotal) m.reg.Unregister(m.maxSamplesPerSend) } @@ -295,6 +345,7 @@ type QueueManager struct { mcfg config.MetadataConfig externalLabels labels.Labels relabelConfigs []*relabel.Config + sendExemplars bool watcher *wal.Watcher metadataWatcher *MetadataWatcher @@ -312,7 +363,7 @@ type QueueManager struct { quit chan struct{} wg sync.WaitGroup - samplesIn, samplesDropped, samplesOut, samplesOutDuration *ewmaRate + dataIn, dataDropped, dataOut, dataOutDuration *ewmaRate metrics *queueManagerMetrics interner *pool @@ -336,6 +387,7 @@ func NewQueueManager( interner *pool, highestRecvTimestamp *maxTimestamp, sm ReadyScrapeManager, + enableExemplarRemoteWrite bool, ) *QueueManager { if logger == nil { logger = log.NewNopLogger() @@ -350,6 +402,7 @@ func NewQueueManager( externalLabels: externalLabels, relabelConfigs: relabelConfigs, storeClient: client, + sendExemplars: enableExemplarRemoteWrite, seriesLabels: make(map[uint64]labels.Labels), seriesSegmentIndexes: make(map[uint64]int), @@ -359,17 +412,17 @@ func NewQueueManager( reshardChan: make(chan int), quit: make(chan struct{}), - samplesIn: samplesIn, - samplesDropped: newEWMARate(ewmaWeight, shardUpdateDuration), - samplesOut: newEWMARate(ewmaWeight, shardUpdateDuration), - samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration), + dataIn: samplesIn, + dataDropped: newEWMARate(ewmaWeight, shardUpdateDuration), + dataOut: newEWMARate(ewmaWeight, shardUpdateDuration), + dataOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration), metrics: metrics, interner: interner, highestRecvTimestamp: highestRecvTimestamp, } - t.watcher = wal.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, walDir) + t.watcher = wal.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, walDir, enableExemplarRemoteWrite) if t.mcfg.Send { t.metadataWatcher = NewMetadataWatcher(logger, sm, client.Name(), t, t.mcfg.SendInterval, flushDeadline) } @@ -444,13 +497,14 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p // Append queues a sample to be sent to the remote storage. Blocks until all samples are // enqueued on their shards or a shutdown signal is received. func (t *QueueManager) Append(samples []record.RefSample) bool { + var appendSample prompb.Sample outer: for _, s := range samples { t.seriesMtx.Lock() lbls, ok := t.seriesLabels[s.Ref] if !ok { t.metrics.droppedSamplesTotal.Inc() - t.samplesDropped.incr(1) + t.dataDropped.incr(1) if _, ok := t.droppedSeries[s.Ref]; !ok { level.Info(t.logger).Log("msg", "Dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref) } @@ -466,12 +520,56 @@ outer: return false default: } + appendSample.Value = s.V + appendSample.Timestamp = s.T + if t.shards.enqueue(s.Ref, writeSample{lbls, appendSample}) { + continue outer + } - if t.shards.enqueue(s.Ref, sample{ - labels: lbls, - t: s.T, - v: s.V, - }) { + t.metrics.enqueueRetriesTotal.Inc() + time.Sleep(time.Duration(backoff)) + backoff = backoff * 2 + if backoff > t.cfg.MaxBackoff { + backoff = t.cfg.MaxBackoff + } + } + } + return true +} + +func (t *QueueManager) AppendExemplars(exemplars []record.RefExemplar) bool { + if !t.sendExemplars { + return true + } + + var appendExemplar prompb.Exemplar +outer: + for _, e := range exemplars { + t.seriesMtx.Lock() + lbls, ok := t.seriesLabels[e.Ref] + if !ok { + t.metrics.droppedExemplarsTotal.Inc() + // Track dropped exemplars in the same EWMA for sharding calc. + t.dataDropped.incr(1) + if _, ok := t.droppedSeries[e.Ref]; !ok { + level.Info(t.logger).Log("msg", "Dropped exemplar for series that was not explicitly dropped via relabelling", "ref", e.Ref) + } + t.seriesMtx.Unlock() + continue + } + t.seriesMtx.Unlock() + // This will only loop if the queues are being resharded. + backoff := t.cfg.MinBackoff + for { + select { + case <-t.quit: + return false + default: + } + appendExemplar.Labels = labelsToLabelsProto(e.Labels, nil) + appendExemplar.Timestamp = e.T + appendExemplar.Value = e.V + if t.shards.enqueue(e.Ref, writeExemplar{lbls, appendExemplar}) { continue outer } @@ -687,27 +785,27 @@ func (t *QueueManager) shouldReshard(desiredShards int) bool { // outlined in this functions implementation. It is up to the caller to reshard, or not, // based on the return value. func (t *QueueManager) calculateDesiredShards() int { - t.samplesOut.tick() - t.samplesDropped.tick() - t.samplesOutDuration.tick() + t.dataOut.tick() + t.dataDropped.tick() + t.dataOutDuration.tick() // We use the number of incoming samples as a prediction of how much work we // will need to do next iteration. We add to this any pending samples // (received - send) so we can catch up with any backlog. We use the average // outgoing batch latency to work out how many shards we need. var ( - samplesInRate = t.samplesIn.rate() - samplesOutRate = t.samplesOut.rate() - samplesKeptRatio = samplesOutRate / (t.samplesDropped.rate() + samplesOutRate) - samplesOutDuration = t.samplesOutDuration.rate() / float64(time.Second) - samplesPendingRate = samplesInRate*samplesKeptRatio - samplesOutRate - highestSent = t.metrics.highestSentTimestamp.Get() - highestRecv = t.highestRecvTimestamp.Get() - delay = highestRecv - highestSent - samplesPending = delay * samplesInRate * samplesKeptRatio + dataInRate = t.dataIn.rate() + dataOutRate = t.dataOut.rate() + dataKeptRatio = dataOutRate / (t.dataDropped.rate() + dataOutRate) + dataOutDuration = t.dataOutDuration.rate() / float64(time.Second) + dataPendingRate = dataInRate*dataKeptRatio - dataOutRate + highestSent = t.metrics.highestSentTimestamp.Get() + highestRecv = t.highestRecvTimestamp.Get() + delay = highestRecv - highestSent + dataPending = delay * dataInRate * dataKeptRatio ) - if samplesOutRate <= 0 { + if dataOutRate <= 0 { return t.numShards } @@ -717,17 +815,17 @@ func (t *QueueManager) calculateDesiredShards() int { const integralGain = 0.1 / float64(shardUpdateDuration/time.Second) var ( - timePerSample = samplesOutDuration / samplesOutRate - desiredShards = timePerSample * (samplesInRate*samplesKeptRatio + integralGain*samplesPending) + timePerSample = dataOutDuration / dataOutRate + desiredShards = timePerSample * (dataInRate*dataKeptRatio + integralGain*dataPending) ) t.metrics.desiredNumShards.Set(desiredShards) level.Debug(t.logger).Log("msg", "QueueManager.calculateDesiredShards", - "samplesInRate", samplesInRate, - "samplesOutRate", samplesOutRate, - "samplesKeptRatio", samplesKeptRatio, - "samplesPendingRate", samplesPendingRate, - "samplesPending", samplesPending, - "samplesOutDuration", samplesOutDuration, + "dataInRate", dataInRate, + "dataOutRate", dataOutRate, + "dataKeptRatio", dataKeptRatio, + "dataPendingRate", dataPendingRate, + "dataPending", dataPending, + "dataOutDuration", dataOutDuration, "timePerSample", timePerSample, "desiredShards", desiredShards, "highestSent", highestSent, @@ -785,17 +883,24 @@ func (t *QueueManager) newShards() *shards { return s } -type sample struct { - labels labels.Labels - t int64 - v float64 +type writeSample struct { + seriesLabels labels.Labels + sample prompb.Sample +} + +type writeExemplar struct { + seriesLabels labels.Labels + exemplar prompb.Exemplar } type shards struct { mtx sync.RWMutex // With the WAL, this is never actually contended. qm *QueueManager - queues []chan sample + queues []chan interface{} + // So we can accurately track how many of each are lost during shard shutdowns. + enqueuedSamples atomic.Int64 + enqueuedExemplars atomic.Int64 // Emulate a wait group with a channel and an atomic int, as you // cannot select on a wait group. @@ -807,8 +912,9 @@ type shards struct { // Hard shutdown context is used to terminate outgoing HTTP connections // after giving them a chance to terminate. - hardShutdown context.CancelFunc - droppedOnHardShutdown atomic.Uint32 + hardShutdown context.CancelFunc + samplesDroppedOnHardShutdown atomic.Uint32 + exemplarsDroppedOnHardShutdown atomic.Uint32 } // start the shards; must be called before any call to enqueue. @@ -819,9 +925,9 @@ func (s *shards) start(n int) { s.qm.metrics.pendingSamples.Set(0) s.qm.metrics.numShards.Set(float64(n)) - newQueues := make([]chan sample, n) + newQueues := make([]chan interface{}, n) for i := 0; i < n; i++ { - newQueues[i] = make(chan sample, s.qm.cfg.Capacity) + newQueues[i] = make(chan interface{}, s.qm.cfg.Capacity) } s.queues = newQueues @@ -831,7 +937,8 @@ func (s *shards) start(n int) { s.softShutdown = make(chan struct{}) s.running.Store(int32(n)) s.done = make(chan struct{}) - s.droppedOnHardShutdown.Store(0) + s.samplesDroppedOnHardShutdown.Store(0) + s.exemplarsDroppedOnHardShutdown.Store(0) for i := 0; i < n; i++ { go s.runShard(hardShutdownCtx, i, newQueues[i]) } @@ -864,14 +971,17 @@ func (s *shards) stop() { // Force an unclean shutdown. s.hardShutdown() <-s.done - if dropped := s.droppedOnHardShutdown.Load(); dropped > 0 { + if dropped := s.samplesDroppedOnHardShutdown.Load(); dropped > 0 { level.Error(s.qm.logger).Log("msg", "Failed to flush all samples on shutdown", "count", dropped) } + if dropped := s.exemplarsDroppedOnHardShutdown.Load(); dropped > 0 { + level.Error(s.qm.logger).Log("msg", "Failed to flush all exemplars on shutdown", "count", dropped) + } } -// enqueue a sample. If we are currently in the process of shutting down or resharding, +// enqueue data (sample or exemplar). If we are currently in the process of shutting down or resharding, // will return false; in this case, you should back off and retry. -func (s *shards) enqueue(ref uint64, sample sample) bool { +func (s *shards) enqueue(ref uint64, data interface{}) bool { s.mtx.RLock() defer s.mtx.RUnlock() @@ -885,13 +995,22 @@ func (s *shards) enqueue(ref uint64, sample sample) bool { select { case <-s.softShutdown: return false - case s.queues[shard] <- sample: - s.qm.metrics.pendingSamples.Inc() + case s.queues[shard] <- data: + switch data.(type) { + case writeSample: + s.qm.metrics.pendingSamples.Inc() + s.enqueuedSamples.Inc() + case writeExemplar: + s.qm.metrics.pendingExemplars.Inc() + s.enqueuedExemplars.Inc() + default: + level.Warn(s.qm.logger).Log("msg", "Invalid object type in shards enqueue") + } return true } } -func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) { +func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface{}) { defer func() { if s.running.Dec() == 0 { close(s.done) @@ -901,14 +1020,26 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) { shardNum := strconv.Itoa(shardID) // Send batches of at most MaxSamplesPerSend samples to the remote storage. - // If we have fewer samples than that, flush them out after a deadline - // anyways. + // If we have fewer samples than that, flush them out after a deadline anyways. var ( - max = s.qm.cfg.MaxSamplesPerSend - nPending = 0 - pendingSamples = allocateTimeSeries(max) + max = s.qm.cfg.MaxSamplesPerSend + // Rough estimate, 1% of active series will contain an exemplar on each scrape. + // TODO(cstyan): Casting this many times smells, also we could get index out of bounds issues here. + maxExemplars = int(math.Max(1, float64(max/10))) + nPending, nPendingSamples, nPendingExemplars = 0, 0, 0 + sampleBuffer = allocateSampleBuffer(max) + buf []byte + pendingData []prompb.TimeSeries + exemplarBuffer [][]prompb.Exemplar ) + totalPending := max + if s.qm.sendExemplars { + exemplarBuffer = allocateExemplarBuffer(maxExemplars) + totalPending += maxExemplars + } + + pendingData = make([]prompb.TimeSeries, totalPending) timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline)) stop := func() { @@ -926,18 +1057,23 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) { case <-ctx.Done(): // In this case we drop all samples in the buffer and the queue. // Remove them from pending and mark them as failed. - droppedSamples := nPending + len(queue) + droppedSamples := nPendingSamples + int(s.enqueuedSamples.Load()) + droppedExemplars := nPendingExemplars + int(s.enqueuedExemplars.Load()) s.qm.metrics.pendingSamples.Sub(float64(droppedSamples)) + s.qm.metrics.pendingExemplars.Sub(float64(droppedExemplars)) s.qm.metrics.failedSamplesTotal.Add(float64(droppedSamples)) - s.droppedOnHardShutdown.Add(uint32(droppedSamples)) + s.qm.metrics.failedExemplarsTotal.Add(float64(droppedExemplars)) + s.samplesDroppedOnHardShutdown.Add(uint32(droppedSamples)) + s.exemplarsDroppedOnHardShutdown.Add(uint32(droppedExemplars)) return case sample, ok := <-queue: if !ok { - if nPending > 0 { - level.Debug(s.qm.logger).Log("msg", "Flushing samples to remote storage...", "count", nPending) - s.sendSamples(ctx, pendingSamples[:nPending], &buf) - s.qm.metrics.pendingSamples.Sub(float64(nPending)) + if nPendingSamples > 0 || nPendingExemplars > 0 { + level.Debug(s.qm.logger).Log("msg", "Flushing data to remote storage...", "samples", nPendingSamples, "exemplars", nPendingExemplars) + s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, &buf) + s.qm.metrics.pendingSamples.Sub(float64(nPendingSamples)) + s.qm.metrics.pendingExemplars.Sub(float64(nPendingExemplars)) level.Debug(s.qm.logger).Log("msg", "Done flushing.") } return @@ -946,25 +1082,44 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) { // Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff) // retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll // stop reading from the queue. This makes it safe to reference pendingSamples by index. - pendingSamples[nPending].Labels = labelsToLabelsProto(sample.labels, pendingSamples[nPending].Labels) - pendingSamples[nPending].Samples[0].Timestamp = sample.t - pendingSamples[nPending].Samples[0].Value = sample.v - nPending++ + switch d := sample.(type) { + case writeSample: + sampleBuffer[nPendingSamples][0] = d.sample + pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels) + pendingData[nPending].Samples = sampleBuffer[nPendingSamples] + pendingData[nPending].Exemplars = nil + nPendingSamples++ + nPending++ - if nPending >= max { - s.sendSamples(ctx, pendingSamples, &buf) + case writeExemplar: + exemplarBuffer[nPendingExemplars][0] = d.exemplar + pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels) + pendingData[nPending].Samples = nil + pendingData[nPending].Exemplars = exemplarBuffer[nPendingExemplars] + nPendingExemplars++ + nPending++ + } + + if nPendingSamples >= max || nPendingExemplars >= maxExemplars { + s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, &buf) + s.qm.metrics.pendingSamples.Sub(float64(nPendingSamples)) + s.qm.metrics.pendingExemplars.Sub(float64(nPendingExemplars)) + nPendingSamples = 0 + nPendingExemplars = 0 nPending = 0 - s.qm.metrics.pendingSamples.Sub(float64(max)) stop() timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) } case <-timer.C: - if nPending > 0 { - level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending samples", "samples", nPending, "shard", shardNum) - s.sendSamples(ctx, pendingSamples[:nPending], &buf) - s.qm.metrics.pendingSamples.Sub(float64(nPending)) + if nPendingSamples > 0 || nPendingExemplars > 0 { + level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars", nPendingExemplars, "shard", shardNum) + s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, &buf) + s.qm.metrics.pendingSamples.Sub(float64(nPendingSamples)) + s.qm.metrics.pendingExemplars.Sub(float64(nPendingExemplars)) + nPendingSamples = 0 + nPendingExemplars = 0 nPending = 0 } timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) @@ -972,23 +1127,24 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) { } } -func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, buf *[]byte) { +func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount int, exemplarCount int, buf *[]byte) { begin := time.Now() - err := s.sendSamplesWithBackoff(ctx, samples, buf) + err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, buf) if err != nil { - level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", len(samples), "err", err) - s.qm.metrics.failedSamplesTotal.Add(float64(len(samples))) + level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err) + s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount)) + s.qm.metrics.failedExemplarsTotal.Add(float64(exemplarCount)) } // These counters are used to calculate the dynamic sharding, and as such // should be maintained irrespective of success or failure. - s.qm.samplesOut.incr(int64(len(samples))) - s.qm.samplesOutDuration.incr(int64(time.Since(begin))) + s.qm.dataOut.incr(int64(len(samples))) + s.qm.dataOutDuration.incr(int64(time.Since(begin))) s.qm.lastSendTimestamp.Store(time.Now().Unix()) } // sendSamples to the remote storage with backoff for recoverable errors. -func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, buf *[]byte) error { +func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount int, exemplarCount int, buf *[]byte) error { // Build the WriteRequest with no metadata. req, highest, err := buildWriteRequest(samples, nil, *buf) if err != nil { @@ -998,7 +1154,6 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti } reqSize := len(*buf) - sampleCount := len(samples) *buf = req // An anonymous function allows us to defer the completion of our per-try spans @@ -1009,6 +1164,9 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti defer span.Finish() span.SetTag("samples", sampleCount) + if exemplarCount > 0 { + span.SetTag("exemplars", exemplarCount) + } span.SetTag("request_size", reqSize) span.SetTag("try", try) span.SetTag("remote_name", s.qm.storeClient.Name()) @@ -1016,6 +1174,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti begin := time.Now() s.qm.metrics.samplesTotal.Add(float64(sampleCount)) + s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount)) err := s.qm.client().Store(ctx, *buf) s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds()) @@ -1030,13 +1189,14 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti onRetry := func() { s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount)) + s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount)) } err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.logger, attemptStore, onRetry) if err != nil { return err } - s.qm.metrics.samplesBytesTotal.Add(float64(reqSize)) + s.qm.metrics.sentBytesTotal.Add(float64(reqSize)) s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000)) return nil } @@ -1096,10 +1256,13 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, buf []byte) ([]byte, int64, error) { var highest int64 for _, ts := range samples { - // At the moment we only ever append a TimeSeries with a single sample in it. - if ts.Samples[0].Timestamp > highest { + // At the moment we only ever append a TimeSeries with a single sample or exemplar in it. + if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest { highest = ts.Samples[0].Timestamp } + if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest { + highest = ts.Exemplars[0].Timestamp + } } req := &prompb.WriteRequest{ @@ -1121,11 +1284,18 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta return compressed, highest, nil } -func allocateTimeSeries(capacity int) []prompb.TimeSeries { - timeseries := make([]prompb.TimeSeries, capacity) - // We only ever send one sample per timeseries, so preallocate with length one. - for i := range timeseries { - timeseries[i].Samples = []prompb.Sample{{}} +func allocateSampleBuffer(capacity int) [][]prompb.Sample { + buf := make([][]prompb.Sample, capacity) + for i := range buf { + buf[i] = []prompb.Sample{{}} } - return timeseries + return buf +} + +func allocateExemplarBuffer(capacity int) [][]prompb.Exemplar { + buf := make([][]prompb.Exemplar, capacity) + for i := range buf { + buf[i] = []prompb.Exemplar{{}} + } + return buf } diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 0ac594ffab..4d8b40e98d 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -60,21 +60,22 @@ func newHighestTimestampMetric() *maxTimestamp { } func TestSampleDelivery(t *testing.T) { + + testcases := []struct { + name string + samples bool + exemplars bool + }{ + {samples: true, exemplars: false, name: "samples only"}, + {samples: true, exemplars: true, name: "both samples and exemplars"}, + {samples: false, exemplars: true, name: "exemplars only"}, + } + // Let's create an even number of send batches so we don't run into the // batch timeout case. - n := config.DefaultQueueConfig.MaxSamplesPerSend * 2 - samples, series := createTimeseries(n, n) + n := 3 - c := NewTestWriteClient() - c.expectSamples(samples[:len(samples)/2], series) - - queueConfig := config.DefaultQueueConfig - queueConfig.BatchSendDeadline = model.Duration(100 * time.Millisecond) - queueConfig.MaxShards = 1 - queueConfig.Capacity = len(samples) - queueConfig.MaxSamplesPerSend = len(samples) / 2 - - dir, err := ioutil.TempDir("", "TestSampleDeliver") + dir, err := ioutil.TempDir("", "TestSampleDelivery") require.NoError(t, err) defer func() { require.NoError(t, os.RemoveAll(dir)) @@ -83,13 +84,11 @@ func TestSampleDelivery(t *testing.T) { s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil) defer s.Close() + queueConfig := config.DefaultQueueConfig + queueConfig.BatchSendDeadline = model.Duration(100 * time.Millisecond) + queueConfig.MaxShards = 1 + writeConfig := config.DefaultRemoteWriteConfig - conf := &config.Config{ - GlobalConfig: config.DefaultGlobalConfig, - RemoteWriteConfigs: []*config.RemoteWriteConfig{ - &writeConfig, - }, - } // We need to set URL's so that metric creation doesn't panic. writeConfig.URL = &common_config.URL{ URL: &url.URL{ @@ -97,19 +96,60 @@ func TestSampleDelivery(t *testing.T) { }, } writeConfig.QueueConfig = queueConfig - require.NoError(t, s.ApplyConfig(conf)) - hash, err := toHash(writeConfig) - require.NoError(t, err) - qm := s.rws.queues[hash] - qm.SetClient(c) + writeConfig.SendExemplars = true - qm.StoreSeries(series, 0) + conf := &config.Config{ + GlobalConfig: config.DefaultGlobalConfig, + RemoteWriteConfigs: []*config.RemoteWriteConfig{ + &writeConfig, + }, + } - qm.Append(samples[:len(samples)/2]) - c.waitForExpectedSamples(t) - c.expectSamples(samples[len(samples)/2:], series) - qm.Append(samples[len(samples)/2:]) - c.waitForExpectedSamples(t) + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + + var ( + series []record.RefSeries + samples []record.RefSample + exemplars []record.RefExemplar + ) + + // Generates same series in both cases. + if tc.samples { + samples, series = createTimeseries(n, n) + } + if tc.exemplars { + exemplars, series = createExemplars(n, n) + } + + // Apply new config. + queueConfig.Capacity = len(samples) + queueConfig.MaxSamplesPerSend = len(samples) / 2 + require.NoError(t, s.ApplyConfig(conf)) + hash, err := toHash(writeConfig) + require.NoError(t, err) + qm := s.rws.queues[hash] + + c := NewTestWriteClient() + qm.SetClient(c) + + qm.StoreSeries(series, 0) + + // Send first half of data. + c.expectSamples(samples[:len(samples)/2], series) + c.expectExemplars(exemplars[:len(exemplars)/2], series) + qm.Append(samples[:len(samples)/2]) + qm.AppendExemplars(exemplars[:len(exemplars)/2]) + c.waitForExpectedData(t) + + // Send second half of data. + c.expectSamples(samples[len(samples)/2:], series) + c.expectExemplars(exemplars[len(exemplars)/2:], series) + qm.Append(samples[len(samples)/2:]) + qm.AppendExemplars(exemplars[len(exemplars)/2:]) + c.waitForExpectedData(t) + }) + } } func TestMetadataDelivery(t *testing.T) { @@ -123,7 +163,7 @@ func TestMetadataDelivery(t *testing.T) { mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m.Start() defer m.Stop() @@ -157,7 +197,7 @@ func TestSampleDeliveryTimeout(t *testing.T) { }() metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m.StoreSeries(series, 0) m.Start() defer m.Stop() @@ -165,11 +205,11 @@ func TestSampleDeliveryTimeout(t *testing.T) { // Send the samples twice, waiting for the samples in the meantime. c.expectSamples(samples, series) m.Append(samples) - c.waitForExpectedSamples(t) + c.waitForExpectedData(t) c.expectSamples(samples, series) m.Append(samples) - c.waitForExpectedSamples(t) + c.waitForExpectedData(t) } func TestSampleDeliveryOrder(t *testing.T) { @@ -203,14 +243,14 @@ func TestSampleDeliveryOrder(t *testing.T) { mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m.StoreSeries(series, 0) m.Start() defer m.Stop() // These should be received by the client. m.Append(samples) - c.waitForExpectedSamples(t) + c.waitForExpectedData(t) } func TestShutdown(t *testing.T) { @@ -227,7 +267,7 @@ func TestShutdown(t *testing.T) { mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, deadline, newPool(), newHighestTimestampMetric(), nil) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false) n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend samples, series := createTimeseries(n, n) m.StoreSeries(series, 0) @@ -269,7 +309,7 @@ func TestSeriesReset(t *testing.T) { cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, deadline, newPool(), newHighestTimestampMetric(), nil) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false) for i := 0; i < numSegments; i++ { series := []record.RefSeries{} for j := 0; j < numSeries; j++ { @@ -302,7 +342,7 @@ func TestReshard(t *testing.T) { }() metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m.StoreSeries(series, 0) m.Start() @@ -322,7 +362,7 @@ func TestReshard(t *testing.T) { time.Sleep(100 * time.Millisecond) } - c.waitForExpectedSamples(t) + c.waitForExpectedData(t) } func TestReshardRaceWithStop(t *testing.T) { @@ -337,7 +377,7 @@ func TestReshardRaceWithStop(t *testing.T) { go func() { for { metrics := newQueueManagerMetrics(nil, "", "") - m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil) + m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m.Start() h.Unlock() h.Lock() @@ -357,7 +397,7 @@ func TestReleaseNoninternedString(t *testing.T) { mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") c := NewTestWriteClient() - m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil) + m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m.Start() for i := 1; i < 1000; i++ { @@ -408,10 +448,10 @@ func TestShouldReshard(t *testing.T) { for _, c := range cases { metrics := newQueueManagerMetrics(nil, "", "") client := NewTestWriteClient() - m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil) + m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m.numShards = c.startingShards - m.samplesIn.incr(c.samplesIn) - m.samplesOut.incr(c.samplesOut) + m.dataIn.incr(c.samplesIn) + m.dataOut.incr(c.samplesOut) m.lastSendTimestamp.Store(c.lastSendTimestamp) m.Start() @@ -436,7 +476,6 @@ func createTimeseries(numSamples, numSeries int) ([]record.RefSample, []record.R T: int64(j), V: float64(i), }) - } series = append(series, record.RefSeries{ Ref: uint64(i), @@ -446,6 +485,28 @@ func createTimeseries(numSamples, numSeries int) ([]record.RefSample, []record.R return samples, series } +func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []record.RefSeries) { + exemplars := make([]record.RefExemplar, 0, numExemplars) + series := make([]record.RefSeries, 0, numSeries) + for i := 0; i < numSeries; i++ { + name := fmt.Sprintf("test_metric_%d", i) + for j := 0; j < numExemplars; j++ { + e := record.RefExemplar{ + Ref: uint64(i), + T: int64(j), + V: float64(i), + Labels: labels.FromStrings("traceID", fmt.Sprintf("trace-%d", i)), + } + exemplars = append(exemplars, e) + } + series = append(series, record.RefSeries{ + Ref: uint64(i), + Labels: labels.Labels{{Name: "__name__", Value: name}}, + }) + } + return exemplars, series +} + func getSeriesNameFromRef(r record.RefSeries) string { for _, l := range r.Labels { if l.Name == "__name__" { @@ -456,13 +517,15 @@ func getSeriesNameFromRef(r record.RefSeries) string { } type TestWriteClient struct { - receivedSamples map[string][]prompb.Sample - expectedSamples map[string][]prompb.Sample - receivedMetadata map[string][]prompb.MetricMetadata - withWaitGroup bool - wg sync.WaitGroup - mtx sync.Mutex - buf []byte + receivedSamples map[string][]prompb.Sample + expectedSamples map[string][]prompb.Sample + receivedExemplars map[string][]prompb.Exemplar + expectedExemplars map[string][]prompb.Exemplar + receivedMetadata map[string][]prompb.MetricMetadata + withWaitGroup bool + wg sync.WaitGroup + mtx sync.Mutex + buf []byte } func NewTestWriteClient() *TestWriteClient { @@ -494,7 +557,29 @@ func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.R c.wg.Add(len(ss)) } -func (c *TestWriteClient) waitForExpectedSamples(tb testing.TB) { +func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []record.RefSeries) { + if !c.withWaitGroup { + return + } + c.mtx.Lock() + defer c.mtx.Unlock() + + c.expectedExemplars = map[string][]prompb.Exemplar{} + c.receivedExemplars = map[string][]prompb.Exemplar{} + + for _, s := range ss { + seriesName := getSeriesNameFromRef(series[s.Ref]) + e := prompb.Exemplar{ + Labels: labelsToLabelsProto(s.Labels, nil), + Timestamp: s.T, + Value: s.V, + } + c.expectedExemplars[seriesName] = append(c.expectedExemplars[seriesName], e) + } + c.wg.Add(len(ss)) +} + +func (c *TestWriteClient) waitForExpectedData(tb testing.TB) { if !c.withWaitGroup { return } @@ -504,9 +589,12 @@ func (c *TestWriteClient) waitForExpectedSamples(tb testing.TB) { for ts, expectedSamples := range c.expectedSamples { require.Equal(tb, expectedSamples, c.receivedSamples[ts], ts) } + for ts, expectedExemplar := range c.expectedExemplars { + require.Equal(tb, expectedExemplar, c.receivedExemplars[ts], ts) + } } -func (c *TestWriteClient) expectSampleCount(numSamples int) { +func (c *TestWriteClient) expectDataCount(numSamples int) { if !c.withWaitGroup { return } @@ -515,7 +603,7 @@ func (c *TestWriteClient) expectSampleCount(numSamples int) { c.wg.Add(numSamples) } -func (c *TestWriteClient) waitForExpectedSampleCount() { +func (c *TestWriteClient) waitForExpectedDataCount() { if !c.withWaitGroup { return } @@ -553,6 +641,11 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte) error { count++ c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample) } + + for _, ex := range ts.Exemplars { + count++ + c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], ex) + } } if c.withWaitGroup { c.wg.Add(-count) @@ -621,7 +714,7 @@ func BenchmarkSampleDelivery(b *testing.B) { defer os.RemoveAll(dir) metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m.StoreSeries(series, 0) // These should be received by the client. @@ -630,9 +723,9 @@ func BenchmarkSampleDelivery(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - c.expectSampleCount(len(samples)) + c.expectDataCount(len(samples)) m.Append(samples) - c.waitForExpectedSampleCount() + c.waitForExpectedDataCount() } // Do not include shutdown b.StopTimer() @@ -667,7 +760,7 @@ func BenchmarkStartup(b *testing.B) { c := NewTestBlockedWriteClient() m := NewQueueManager(metrics, nil, nil, logger, dir, newEWMARate(ewmaWeight, shardUpdateDuration), - cfg, mcfg, nil, nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil) + cfg, mcfg, nil, nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false) m.watcher.SetStartTime(timestamp.Time(math.MaxInt64)) m.watcher.MaxSegment = segments[len(segments)-2] err := m.watcher.Run() @@ -719,7 +812,7 @@ func TestCalculateDesiredShards(t *testing.T) { metrics := newQueueManagerMetrics(nil, "", "") samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) - m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil) + m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) // Need to start the queue manager so the proper metrics are initialized. // However we can stop it right away since we don't need to do any actual @@ -745,8 +838,8 @@ func TestCalculateDesiredShards(t *testing.T) { // helper function for sending samples. sendSamples := func(s int64, ts time.Duration) { pendingSamples -= s - m.samplesOut.incr(s) - m.samplesOutDuration.incr(int64(m.numShards) * int64(shardUpdateDuration)) + m.dataOut.incr(s) + m.dataOutDuration.incr(int64(m.numShards) * int64(shardUpdateDuration)) // highest sent is how far back pending samples would be at our input rate. highestSent := startedAt.Add(ts - time.Duration(pendingSamples/inputRate)*time.Second) diff --git a/storage/remote/write.go b/storage/remote/write.go index a9270630fb..94d96295ee 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -37,6 +37,12 @@ var ( Name: "samples_in_total", Help: "Samples in to remote storage, compare to samples out for queue managers.", }) + exemplarsIn = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "exemplars_in_total", + Help: "Exemplars in to remote storage, compare to exemplars out for queue managers.", + }) ) // WriteStorage represents all the remote write storage. @@ -169,6 +175,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { rws.interner, rws.highestTimestamp, rws.scraper, + rwConf.SendExemplars, ) // Keep track of which queues are new so we know which to start. newHashes = append(newHashes, hash) @@ -210,6 +217,7 @@ func (rws *WriteStorage) Close() error { type timestampTracker struct { writeStorage *WriteStorage samples int64 + exemplars int64 highestTimestamp int64 highestRecvTimestamp *maxTimestamp } @@ -224,14 +232,16 @@ func (t *timestampTracker) Append(_ uint64, _ labels.Labels, ts int64, _ float64 } func (t *timestampTracker) AppendExemplar(_ uint64, _ labels.Labels, _ exemplar.Exemplar) (uint64, error) { + t.exemplars++ return 0, nil } // Commit implements storage.Appender. func (t *timestampTracker) Commit() error { - t.writeStorage.samplesIn.incr(t.samples) + t.writeStorage.samplesIn.incr(t.samples + t.exemplars) samplesIn.Add(float64(t.samples)) + exemplarsIn.Add(float64(t.exemplars)) t.highestRecvTimestamp.Set(float64(t.highestTimestamp / 1000)) return nil } diff --git a/tsdb/docs/format/wal.md b/tsdb/docs/format/wal.md index af2ec61426..2d55c441b0 100644 --- a/tsdb/docs/format/wal.md +++ b/tsdb/docs/format/wal.md @@ -86,3 +86,35 @@ and specify an interval for which samples of a series got deleted. │ . . . │ └─────────────────────────────────────────────────────┘ ``` + +### Exemplar records + +Exemplar records encode exemplars as a list of triples `(series_id, timestamp, value)` +plus the length of the labels list, and all the labels. +The first row stores the starting id and the starting timestamp. +Series reference and timestamp are encoded as deltas w.r.t the first exemplar. +The first exemplar record begins at the second row. + +See: https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars + +``` +┌──────────────────────────────────────────────────────────────────┐ +│ type = 5 <1b> │ +├──────────────────────────────────────────────────────────────────┤ +│ ┌────────────────────┬───────────────────────────┐ │ +│ │ id <8b> │ timestamp <8b> │ │ +│ └────────────────────┴───────────────────────────┘ │ +│ ┌────────────────────┬───────────────────────────┬─────────────┐ │ +│ │ id_delta │ timestamp_delta │ value <8b> │ │ +│ ├────────────────────┴───────────────────────────┴─────────────┤ │ +│ │ n = len(labels) │ │ +│ ├──────────────────────┬───────────────────────────────────────┤ │ +│ │ len(str_1) │ str_1 │ │ +│ ├──────────────────────┴───────────────────────────────────────┤ │ +│ │ ... │ │ +│ ├───────────────────────┬──────────────────────────────────────┤ │ +│ │ len(str_2n) │ str_2n │ │ │ +│ └───────────────────────┴────────────────┴─────────────────────┘ │ +│ . . . │ +└──────────────────────────────────────────────────────────────────┘ +``` diff --git a/tsdb/exemplar.go b/tsdb/exemplar.go index a7d3182acd..6332c1fe3e 100644 --- a/tsdb/exemplar.go +++ b/tsdb/exemplar.go @@ -17,6 +17,7 @@ import ( "context" "sort" "sync" + "unicode/utf8" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/exemplar" @@ -83,7 +84,7 @@ func NewCircularExemplarStorage(len int, reg prometheus.Registerer) (ExemplarSto }), outOfOrderExemplars: prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_exemplar_out_of_order_exemplars_total", - Help: "Total number of out of order exemplar ingestion failed attempts", + Help: "Total number of out of order exemplar ingestion failed attempts.", }), } if reg != nil { @@ -165,6 +166,51 @@ Outer: return false } +func (ce *CircularExemplarStorage) ValidateExemplar(l labels.Labels, e exemplar.Exemplar) error { + seriesLabels := l.String() + + // TODO(bwplotka): This lock can lock all scrapers, there might high contention on this on scale. + // Optimize by moving the lock to be per series (& benchmark it). + ce.lock.RLock() + defer ce.lock.RUnlock() + return ce.validateExemplar(seriesLabels, e, false) +} + +// Not thread safe. The append parameters tells us whether this is an external validation, or interal +// as a reuslt of an AddExemplar call, in which case we should update any relevant metrics. +func (ce *CircularExemplarStorage) validateExemplar(l string, e exemplar.Exemplar, append bool) error { + idx, ok := ce.index[l] + if !ok { + return nil + } + + // Exemplar label length does not include chars involved in text rendering such as quotes + // equals sign, or commas. See definiton of const ExemplarMaxLabelLength. + labelSetLen := 0 + for _, l := range e.Labels { + labelSetLen += utf8.RuneCountInString(l.Name) + labelSetLen += utf8.RuneCountInString(l.Value) + + if labelSetLen > exemplar.ExemplarMaxLabelSetLength { + return storage.ErrExemplarLabelLength + } + } + + // Check for duplicate vs last stored exemplar for this series. + // NB these are expected, and appending them is a no-op. + if ce.exemplars[idx.newest].exemplar.Equals(e) { + return storage.ErrDuplicateExemplar + } + + if e.Ts <= ce.exemplars[idx.newest].exemplar.Ts { + if append { + ce.outOfOrderExemplars.Inc() + } + return storage.ErrOutOfOrderExemplar + } + return nil +} + func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemplar) error { seriesLabels := l.String() @@ -173,21 +219,19 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp ce.lock.Lock() defer ce.lock.Unlock() - idx, ok := ce.index[seriesLabels] + err := ce.validateExemplar(seriesLabels, e, true) + if err != nil { + if err == storage.ErrDuplicateExemplar { + // Duplicate exemplar, noop. + return nil + } + return err + } + + _, ok := ce.index[seriesLabels] if !ok { ce.index[seriesLabels] = &indexEntry{oldest: ce.nextIndex, seriesLabels: l} } else { - // Check for duplicate vs last stored exemplar for this series. - // NB these are expected, add appending them is a no-op. - if ce.exemplars[idx.newest].exemplar.Equals(e) { - return nil - } - - if e.Ts <= ce.exemplars[idx.newest].exemplar.Ts { - ce.outOfOrderExemplars.Inc() - return storage.ErrOutOfOrderExemplar - } - ce.exemplars[ce.index[seriesLabels].newest].next = ce.nextIndex } @@ -218,13 +262,13 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp ce.seriesWithExemplarsInStorage.Set(float64(len(ce.index))) if next := ce.exemplars[ce.nextIndex]; next != nil { ce.exemplarsInStorage.Set(float64(len(ce.exemplars))) - ce.lastExemplarsTs.Set(float64(next.exemplar.Ts)) + ce.lastExemplarsTs.Set(float64(next.exemplar.Ts) / 1000) return nil } // We did not yet fill the buffer. ce.exemplarsInStorage.Set(float64(ce.nextIndex)) - ce.lastExemplarsTs.Set(float64(ce.exemplars[0].exemplar.Ts)) + ce.lastExemplarsTs.Set(float64(ce.exemplars[0].exemplar.Ts) / 1000) return nil } @@ -234,6 +278,10 @@ func (noopExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemplar) err return nil } +func (noopExemplarStorage) ValidateExemplar(l labels.Labels, e exemplar.Exemplar) error { + return nil +} + func (noopExemplarStorage) ExemplarQuerier(context.Context) (storage.ExemplarQuerier, error) { return &noopExemplarQuerier{}, nil } diff --git a/tsdb/exemplar_test.go b/tsdb/exemplar_test.go index 1c1780d22a..cd6983ffb4 100644 --- a/tsdb/exemplar_test.go +++ b/tsdb/exemplar_test.go @@ -16,6 +16,7 @@ package tsdb import ( "reflect" "strconv" + "strings" "testing" "github.com/stretchr/testify/require" @@ -25,6 +26,66 @@ import ( "github.com/prometheus/prometheus/storage" ) +// Tests the same exemplar cases as AddExemplar, but specfically the ValidateExemplar function so it can be relied on externally. +func TestValidateExemplar(t *testing.T) { + exs, err := NewCircularExemplarStorage(2, nil) + require.NoError(t, err) + es := exs.(*CircularExemplarStorage) + + l := labels.Labels{ + {Name: "service", Value: "asdf"}, + } + e := exemplar.Exemplar{ + Labels: labels.Labels{ + labels.Label{ + Name: "traceID", + Value: "qwerty", + }, + }, + Value: 0.1, + Ts: 1, + } + + require.NoError(t, es.ValidateExemplar(l, e)) + require.NoError(t, es.AddExemplar(l, e)) + + e2 := exemplar.Exemplar{ + Labels: labels.Labels{ + labels.Label{ + Name: "traceID", + Value: "zxcvb", + }, + }, + Value: 0.1, + Ts: 2, + } + + require.NoError(t, es.ValidateExemplar(l, e2)) + require.NoError(t, es.AddExemplar(l, e2)) + + require.Equal(t, es.ValidateExemplar(l, e2), storage.ErrDuplicateExemplar, "error is expected attempting to validate duplicate exemplar") + + e3 := e2 + e3.Ts = 3 + require.Equal(t, es.ValidateExemplar(l, e3), storage.ErrDuplicateExemplar, "error is expected when attempting to add duplicate exemplar, even with different timestamp") + + e3.Ts = 1 + e3.Value = 0.3 + require.Equal(t, es.ValidateExemplar(l, e3), storage.ErrOutOfOrderExemplar) + + e4 := exemplar.Exemplar{ + Labels: labels.Labels{ + labels.Label{ + Name: "a", + Value: strings.Repeat("b", exemplar.ExemplarMaxLabelSetLength), + }, + }, + Value: 0.1, + Ts: 2, + } + require.Equal(t, storage.ErrExemplarLabelLength, es.ValidateExemplar(l, e4)) +} + func TestAddExemplar(t *testing.T) { exs, err := NewCircularExemplarStorage(2, nil) require.NoError(t, err) @@ -44,8 +105,7 @@ func TestAddExemplar(t *testing.T) { Ts: 1, } - err = es.AddExemplar(l, e) - require.NoError(t, err) + require.NoError(t, es.AddExemplar(l, e)) require.Equal(t, es.index[l.String()].newest, 0, "exemplar was not stored correctly") e2 := exemplar.Exemplar{ @@ -59,23 +119,31 @@ func TestAddExemplar(t *testing.T) { Ts: 2, } - err = es.AddExemplar(l, e2) - require.NoError(t, err) + require.NoError(t, es.AddExemplar(l, e2)) require.Equal(t, es.index[l.String()].newest, 1, "exemplar was not stored correctly, location of newest exemplar for series in index did not update") require.True(t, es.exemplars[es.index[l.String()].newest].exemplar.Equals(e2), "exemplar was not stored correctly, expected %+v got: %+v", e2, es.exemplars[es.index[l.String()].newest].exemplar) - err = es.AddExemplar(l, e2) - require.NoError(t, err, "no error is expected attempting to add duplicate exemplar") + require.NoError(t, es.AddExemplar(l, e2), "no error is expected attempting to add duplicate exemplar") e3 := e2 e3.Ts = 3 - err = es.AddExemplar(l, e3) - require.NoError(t, err, "no error is expected when attempting to add duplicate exemplar, even with different timestamp") + require.NoError(t, es.AddExemplar(l, e3), "no error is expected when attempting to add duplicate exemplar, even with different timestamp") e3.Ts = 1 e3.Value = 0.3 - err = es.AddExemplar(l, e3) - require.Equal(t, err, storage.ErrOutOfOrderExemplar) + require.Equal(t, storage.ErrOutOfOrderExemplar, es.AddExemplar(l, e3)) + + e4 := exemplar.Exemplar{ + Labels: labels.Labels{ + labels.Label{ + Name: "a", + Value: strings.Repeat("b", exemplar.ExemplarMaxLabelSetLength), + }, + }, + Value: 0.1, + Ts: 2, + } + require.Equal(t, storage.ErrExemplarLabelLength, es.AddExemplar(l, e4)) } func TestStorageOverflow(t *testing.T) { diff --git a/tsdb/head.go b/tsdb/head.go index 595cd48d74..f98e21ebe9 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -58,6 +58,7 @@ var ( type ExemplarStorage interface { storage.ExemplarQueryable AddExemplar(labels.Labels, exemplar.Exemplar) error + ValidateExemplar(labels.Labels, exemplar.Exemplar) error } // Head handles reads and writes of time series data within a time window. @@ -459,15 +460,17 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks // Track number of samples that referenced a series we don't know about // for error reporting. var unknownRefs atomic.Uint64 + var unknownExemplarRefs atomic.Uint64 // Start workers that each process samples for a partition of the series ID space. // They are connected through a ring of channels which ensures that all sample batches // read from the WAL are processed in order. var ( - wg sync.WaitGroup - n = runtime.GOMAXPROCS(0) - inputs = make([]chan []record.RefSample, n) - outputs = make([]chan []record.RefSample, n) + wg sync.WaitGroup + n = runtime.GOMAXPROCS(0) + inputs = make([]chan []record.RefSample, n) + outputs = make([]chan []record.RefSample, n) + exemplarsInput chan record.RefExemplar dec record.Decoder shards = make([][]record.RefSample, n) @@ -489,6 +492,11 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks return []tombstones.Stone{} }, } + exemplarsPool = sync.Pool{ + New: func() interface{} { + return []record.RefExemplar{} + }, + } ) defer func() { @@ -500,6 +508,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks for range outputs[i] { } } + close(exemplarsInput) wg.Wait() } }() @@ -516,6 +525,29 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks }(inputs[i], outputs[i]) } + wg.Add(1) + exemplarsInput = make(chan record.RefExemplar, 300) + go func(input <-chan record.RefExemplar) { + defer wg.Done() + for e := range input { + ms := h.series.getByID(e.Ref) + if ms == nil { + unknownExemplarRefs.Inc() + continue + } + + if e.T < h.minValidTime.Load() { + continue + } + // At the moment the only possible error here is out of order exemplars, which we shouldn't see when + // replaying the WAL, so lets just log the error if it's not that type. + err = h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{Ts: e.T, Value: e.V, Labels: e.Labels}) + if err != nil && err == storage.ErrOutOfOrderExemplar { + level.Warn(h.logger).Log("msg", "Unexpected error when replaying WAL on exemplar record", "err", err) + } + } + }(exemplarsInput) + go func() { defer close(decoded) for r.Next() { @@ -557,6 +589,18 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks return } decoded <- tstones + case record.Exemplars: + exemplars := exemplarsPool.Get().([]record.RefExemplar)[:0] + exemplars, err = dec.Exemplars(rec, exemplars) + if err != nil { + decodeErr = &wal.CorruptionErr{ + Err: errors.Wrap(err, "decode exemplars"), + Segment: r.Segment(), + Offset: r.Offset(), + } + return + } + decoded <- exemplars default: // Noop. } @@ -646,6 +690,12 @@ Outer: } //nolint:staticcheck // Ignore SA6002 relax staticcheck verification. tstonesPool.Put(v) + case []record.RefExemplar: + for _, e := range v { + exemplarsInput <- e + } + //nolint:staticcheck // Ignore SA6002 relax staticcheck verification. + exemplarsPool.Put(v) default: panic(fmt.Errorf("unexpected decoded type: %T", d)) } @@ -667,14 +717,15 @@ Outer: for range outputs[i] { } } + close(exemplarsInput) wg.Wait() if r.Err() != nil { return errors.Wrap(r.Err(), "read records") } - if unknownRefs.Load() > 0 { - level.Warn(h.logger).Log("msg", "Unknown series references", "count", unknownRefs.Load()) + if unknownRefs.Load() > 0 || unknownExemplarRefs.Load() > 0 { + level.Warn(h.logger).Log("msg", "Unknown series references", "samples", unknownRefs.Load(), "exemplars", unknownExemplarRefs.Load()) } return nil } @@ -1339,6 +1390,15 @@ func (a *headAppender) AppendExemplar(ref uint64, _ labels.Labels, e exemplar.Ex // Ensure no empty labels have gotten through. e.Labels = e.Labels.WithoutEmpty() + err := a.exemplarAppender.ValidateExemplar(s.lset, e) + if err != nil { + if err == storage.ErrDuplicateExemplar { + // Duplicate, don't return an error but don't accept the exemplar. + return 0, nil + } + return 0, err + } + a.exemplars = append(a.exemplars, exemplarWithSeriesRef{ref, e}) return s.ref, nil @@ -1382,14 +1442,36 @@ func (a *headAppender) log() error { return errors.Wrap(err, "log samples") } } + if len(a.exemplars) > 0 { + rec = enc.Exemplars(exemplarsForEncoding(a.exemplars), buf) + buf = rec[:0] + + if err := a.head.wal.Log(rec); err != nil { + return errors.Wrap(err, "log exemplars") + } + } return nil } +func exemplarsForEncoding(es []exemplarWithSeriesRef) []record.RefExemplar { + ret := make([]record.RefExemplar, 0, len(es)) + for _, e := range es { + ret = append(ret, record.RefExemplar{ + Ref: e.ref, + T: e.exemplar.Ts, + V: e.exemplar.Value, + Labels: e.exemplar.Labels, + }) + } + return ret +} + func (a *headAppender) Commit() (err error) { if a.closed { return ErrAppenderClosed } defer func() { a.closed = true }() + if err := a.log(); err != nil { _ = a.Rollback() // Most likely the same error will happen again. return errors.Wrap(err, "write to WAL") @@ -1404,7 +1486,6 @@ func (a *headAppender) Commit() (err error) { continue } level.Debug(a.head.logger).Log("msg", "Unknown error while adding exemplar", "err", err) - continue } } @@ -1458,7 +1539,9 @@ func (a *headAppender) Rollback() (err error) { series.Unlock() } a.head.putAppendBuffer(a.samples) + a.head.putExemplarBuffer(a.exemplars) a.samples = nil + a.exemplars = nil // Series are created in the head memory regardless of rollback. Thus we have // to log them to the WAL in any case. diff --git a/tsdb/head_test.go b/tsdb/head_test.go index dfd916b761..2afbc20811 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -51,6 +51,7 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal. opts := DefaultHeadOptions() opts.ChunkRange = chunkRange opts.ChunkDirRoot = dir + opts.NumExemplars = 10 h, err := NewHead(nil, nil, wlog, opts) require.NoError(t, err) @@ -87,6 +88,8 @@ func populateTestWAL(t testing.TB, w *wal.WAL, recs []interface{}) { require.NoError(t, w.Log(enc.Samples(v, nil))) case []tombstones.Stone: require.NoError(t, w.Log(enc.Tombstones(v, nil))) + case []record.RefExemplar: + require.NoError(t, w.Log(enc.Exemplars(v, nil))) } } } @@ -148,61 +151,91 @@ func BenchmarkLoadWAL(b *testing.B) { } labelsPerSeries := 5 + // Rough estimates of most common % of samples that have an exemplar for each scrape. + exemplarsPercentages := []float64{0, 0.5, 1, 5} + lastExemplarsPerSeries := -1 for _, c := range cases { - b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d", c.batches, c.seriesPerBatch, c.samplesPerSeries), - func(b *testing.B) { - dir, err := ioutil.TempDir("", "test_load_wal") - require.NoError(b, err) - defer func() { - require.NoError(b, os.RemoveAll(dir)) - }() - - w, err := wal.New(nil, nil, dir, false) - require.NoError(b, err) - - // Write series. - refSeries := make([]record.RefSeries, 0, c.seriesPerBatch) - for k := 0; k < c.batches; k++ { - refSeries = refSeries[:0] - for i := k * c.seriesPerBatch; i < (k+1)*c.seriesPerBatch; i++ { - lbls := make(map[string]string, labelsPerSeries) - lbls[defaultLabelName] = strconv.Itoa(i) - for j := 1; len(lbls) < labelsPerSeries; j++ { - lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j) - } - refSeries = append(refSeries, record.RefSeries{Ref: uint64(i) * 100, Labels: labels.FromMap(lbls)}) - } - populateTestWAL(b, w, []interface{}{refSeries}) - } - - // Write samples. - refSamples := make([]record.RefSample, 0, c.seriesPerBatch) - for i := 0; i < c.samplesPerSeries; i++ { - for j := 0; j < c.batches; j++ { - refSamples = refSamples[:0] - for k := j * c.seriesPerBatch; k < (j+1)*c.seriesPerBatch; k++ { - refSamples = append(refSamples, record.RefSample{ - Ref: uint64(k) * 100, - T: int64(i) * 10, - V: float64(i) * 100, - }) - } - populateTestWAL(b, w, []interface{}{refSamples}) - } - } - - b.ResetTimer() - - // Load the WAL. - for i := 0; i < b.N; i++ { - opts := DefaultHeadOptions() - opts.ChunkRange = 1000 - opts.ChunkDirRoot = w.Dir() - h, err := NewHead(nil, nil, w, opts) + for _, p := range exemplarsPercentages { + exemplarsPerSeries := int(math.RoundToEven(float64(c.samplesPerSeries) * p / 100)) + // For tests with low samplesPerSeries we could end up testing with 0 exemplarsPerSeries + // multiple times without this check. + if exemplarsPerSeries == lastExemplarsPerSeries { + continue + } + lastExemplarsPerSeries = exemplarsPerSeries + // fmt.Println("exemplars per series: ", exemplarsPerSeries) + b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d,exemplarsPerSeries=%d", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries), + func(b *testing.B) { + dir, err := ioutil.TempDir("", "test_load_wal") require.NoError(b, err) - h.Init(0) - } - }) + defer func() { + require.NoError(b, os.RemoveAll(dir)) + }() + + w, err := wal.New(nil, nil, dir, false) + require.NoError(b, err) + + // Write series. + refSeries := make([]record.RefSeries, 0, c.seriesPerBatch) + for k := 0; k < c.batches; k++ { + refSeries = refSeries[:0] + for i := k * c.seriesPerBatch; i < (k+1)*c.seriesPerBatch; i++ { + lbls := make(map[string]string, labelsPerSeries) + lbls[defaultLabelName] = strconv.Itoa(i) + for j := 1; len(lbls) < labelsPerSeries; j++ { + lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j) + } + refSeries = append(refSeries, record.RefSeries{Ref: uint64(i) * 100, Labels: labels.FromMap(lbls)}) + } + populateTestWAL(b, w, []interface{}{refSeries}) + } + + // Write samples. + refSamples := make([]record.RefSample, 0, c.seriesPerBatch) + for i := 0; i < c.samplesPerSeries; i++ { + for j := 0; j < c.batches; j++ { + refSamples = refSamples[:0] + for k := j * c.seriesPerBatch; k < (j+1)*c.seriesPerBatch; k++ { + refSamples = append(refSamples, record.RefSample{ + Ref: uint64(k) * 100, + T: int64(i) * 10, + V: float64(i) * 100, + }) + } + populateTestWAL(b, w, []interface{}{refSamples}) + } + } + + // Write samples. + refExemplars := make([]record.RefExemplar, 0, c.seriesPerBatch) + for i := 0; i < exemplarsPerSeries; i++ { + for j := 0; j < c.batches; j++ { + refExemplars = refExemplars[:0] + for k := j * c.seriesPerBatch; k < (j+1)*c.seriesPerBatch; k++ { + refExemplars = append(refExemplars, record.RefExemplar{ + Ref: uint64(k) * 100, + T: int64(i) * 10, + V: float64(i) * 100, + Labels: labels.FromStrings("traceID", fmt.Sprintf("trace-%d", i)), + }) + } + populateTestWAL(b, w, []interface{}{refExemplars}) + } + } + + b.ResetTimer() + + // Load the WAL. + for i := 0; i < b.N; i++ { + opts := DefaultHeadOptions() + opts.ChunkRange = 1000 + opts.ChunkDirRoot = w.Dir() + h, err := NewHead(nil, nil, w, opts) + require.NoError(b, err) + h.Init(0) + } + }) + } } } @@ -233,6 +266,9 @@ func TestHead_ReadWAL(t *testing.T) { []tombstones.Stone{ {Ref: 0, Intervals: []tombstones.Interval{{Mint: 99, Maxt: 101}}}, }, + []record.RefExemplar{ + {Ref: 10, T: 100, V: 1, Labels: labels.FromStrings("traceID", "asdf")}, + }, } head, w := newTestHead(t, 1000, compress) @@ -266,6 +302,12 @@ func TestHead_ReadWAL(t *testing.T) { require.Equal(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil, head.chunkDiskMapper, nil))) require.Equal(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil, head.chunkDiskMapper, nil))) require.Equal(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0, nil, head.chunkDiskMapper, nil))) + + q, err := head.ExemplarQuerier(context.Background()) + require.NoError(t, err) + e, err := q.Select(0, 1000, []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "1")}) + require.NoError(t, err) + require.Equal(t, e[0].Exemplars[0], exemplar.Exemplar{Ts: 100, Value: 1, Labels: labels.FromStrings("traceID", "asdf")}) }) } } diff --git a/tsdb/record/record.go b/tsdb/record/record.go index 408882e832..b4ee77f0f9 100644 --- a/tsdb/record/record.go +++ b/tsdb/record/record.go @@ -37,6 +37,8 @@ const ( Samples Type = 2 // Tombstones is used to match WAL records of type Tombstones. Tombstones Type = 3 + // Exemplars is used to match WAL records of type Exemplars. + Exemplars Type = 4 ) var ( @@ -57,6 +59,14 @@ type RefSample struct { V float64 } +// RefExemplar is an exemplar with it's labels, timestamp, value the exemplar was collected/observed with, and a reference to a series. +type RefExemplar struct { + Ref uint64 + T int64 + V float64 + Labels labels.Labels +} + // Decoder decodes series, sample, and tombstone records. // The zero value is ready to use. type Decoder struct { @@ -69,7 +79,7 @@ func (d *Decoder) Type(rec []byte) Type { return Unknown } switch t := Type(rec[0]); t { - case Series, Samples, Tombstones: + case Series, Samples, Tombstones, Exemplars: return t } return Unknown @@ -166,6 +176,48 @@ func (d *Decoder) Tombstones(rec []byte, tstones []tombstones.Stone) ([]tombston return tstones, nil } +func (d *Decoder) Exemplars(rec []byte, exemplars []RefExemplar) ([]RefExemplar, error) { + dec := encoding.Decbuf{B: rec} + t := Type(dec.Byte()) + if t != Exemplars { + return nil, errors.New("invalid record type") + } + if dec.Len() == 0 { + return exemplars, nil + } + var ( + baseRef = dec.Be64() + baseTime = dec.Be64int64() + ) + for len(dec.B) > 0 && dec.Err() == nil { + dref := dec.Varint64() + dtime := dec.Varint64() + val := dec.Be64() + + lset := make(labels.Labels, dec.Uvarint()) + for i := range lset { + lset[i].Name = dec.UvarintStr() + lset[i].Value = dec.UvarintStr() + } + sort.Sort(lset) + + exemplars = append(exemplars, RefExemplar{ + Ref: baseRef + uint64(dref), + T: baseTime + dtime, + V: math.Float64frombits(val), + Labels: lset, + }) + } + + if dec.Err() != nil { + return nil, errors.Wrapf(dec.Err(), "decode error after %d exemplars", len(exemplars)) + } + if len(dec.B) > 0 { + return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B)) + } + return exemplars, nil +} + // Encoder encodes series, sample, and tombstones records. // The zero value is ready to use. type Encoder struct { @@ -226,3 +278,33 @@ func (e *Encoder) Tombstones(tstones []tombstones.Stone, b []byte) []byte { } return buf.Get() } + +func (e *Encoder) Exemplars(exemplars []RefExemplar, b []byte) []byte { + buf := encoding.Encbuf{B: b} + buf.PutByte(byte(Exemplars)) + + if len(exemplars) == 0 { + return buf.Get() + } + + // Store base timestamp and base reference number of first sample. + // All samples encode their timestamp and ref as delta to those. + first := exemplars[0] + + buf.PutBE64(first.Ref) + buf.PutBE64int64(first.T) + + for _, ex := range exemplars { + buf.PutVarint64(int64(ex.Ref) - int64(first.Ref)) + buf.PutVarint64(ex.T - first.T) + buf.PutBE64(math.Float64bits(ex.V)) + + buf.PutUvarint(len(ex.Labels)) + for _, l := range ex.Labels { + buf.PutUvarintStr(l.Name) + buf.PutUvarintStr(l.Value) + } + } + + return buf.Get() +} diff --git a/tsdb/record/record_test.go b/tsdb/record/record_test.go index 73f35478ec..f69989d502 100644 --- a/tsdb/record/record_test.go +++ b/tsdb/record/record_test.go @@ -74,6 +74,15 @@ func TestRecord_EncodeDecode(t *testing.T) { {Ref: 13, Intervals: tombstones.Intervals{{Mint: -1000, Maxt: -11}}}, {Ref: 13, Intervals: tombstones.Intervals{{Mint: 5000, Maxt: 1000}}}, }, decTstones) + + exemplars := []RefExemplar{ + {Ref: 0, T: 12423423, V: 1.2345, Labels: labels.FromStrings("traceID", "qwerty")}, + {Ref: 123, T: -1231, V: -123, Labels: labels.FromStrings("traceID", "asdf")}, + {Ref: 2, T: 0, V: 99999, Labels: labels.FromStrings("traceID", "zxcv")}, + } + decExemplars, err := dec.Exemplars(enc.Exemplars(exemplars, nil), nil) + require.NoError(t, err) + require.Equal(t, exemplars, decExemplars) } // TestRecord_Corrupted ensures that corrupted records return the correct error. @@ -117,6 +126,16 @@ func TestRecord_Corrupted(t *testing.T) { _, err := dec.Tombstones(corrupted, nil) require.Equal(t, err, encoding.ErrInvalidSize) }) + + t.Run("Test corrupted exemplar record", func(t *testing.T) { + exemplars := []RefExemplar{ + {Ref: 0, T: 12423423, V: 1.2345, Labels: labels.FromStrings("traceID", "asdf")}, + } + + corrupted := enc.Exemplars(exemplars, nil)[:8] + _, err := dec.Exemplars(corrupted, nil) + require.Equal(t, errors.Cause(err), encoding.ErrInvalidSize) + }) } func TestRecord_Type(t *testing.T) { diff --git a/tsdb/wal/checkpoint.go b/tsdb/wal/checkpoint.go index a264e1e958..1f76816709 100644 --- a/tsdb/wal/checkpoint.go +++ b/tsdb/wal/checkpoint.go @@ -40,9 +40,11 @@ type CheckpointStats struct { DroppedSeries int DroppedSamples int DroppedTombstones int + DroppedExemplars int TotalSeries int // Processed series including dropped ones. TotalSamples int // Processed samples including dropped ones. TotalTombstones int // Processed tombstones including dropped ones. + TotalExemplars int // Processed exemplars including dropped ones. } // LastCheckpoint returns the directory name and index of the most recent checkpoint. @@ -144,16 +146,17 @@ func Checkpoint(logger log.Logger, w *WAL, from, to int, keep func(id uint64) bo r := NewReader(sgmReader) var ( - series []record.RefSeries - samples []record.RefSample - tstones []tombstones.Stone - dec record.Decoder - enc record.Encoder - buf []byte - recs [][]byte + series []record.RefSeries + samples []record.RefSample + tstones []tombstones.Stone + exemplars []record.RefExemplar + dec record.Decoder + enc record.Encoder + buf []byte + recs [][]byte ) for r.Next() { - series, samples, tstones = series[:0], samples[:0], tstones[:0] + series, samples, tstones, exemplars = series[:0], samples[:0], tstones[:0], exemplars[:0] // We don't reset the buffer since we batch up multiple records // before writing them to the checkpoint. @@ -219,6 +222,23 @@ func Checkpoint(logger log.Logger, w *WAL, from, to int, keep func(id uint64) bo stats.TotalTombstones += len(tstones) stats.DroppedTombstones += len(tstones) - len(repl) + case record.Exemplars: + exemplars, err = dec.Exemplars(rec, exemplars) + if err != nil { + return nil, errors.Wrap(err, "decode exemplars") + } + // Drop irrelevant exemplars in place. + repl := exemplars[:0] + for _, e := range exemplars { + if e.T >= mint { + repl = append(repl, e) + } + } + if len(repl) > 0 { + buf = enc.Exemplars(repl, buf) + } + stats.TotalExemplars += len(exemplars) + stats.DroppedExemplars += len(exemplars) - len(repl) default: // Unknown record type, probably from a future Prometheus version. continue diff --git a/tsdb/wal/checkpoint_test.go b/tsdb/wal/checkpoint_test.go index 034d95917a..c843aa12cb 100644 --- a/tsdb/wal/checkpoint_test.go +++ b/tsdb/wal/checkpoint_test.go @@ -177,6 +177,11 @@ func TestCheckpoint(t *testing.T) { }, nil) require.NoError(t, w.Log(b)) + b = enc.Exemplars([]record.RefExemplar{ + {Ref: 1, T: last, V: float64(i), Labels: labels.FromStrings("traceID", fmt.Sprintf("trace-%d", i))}, + }, nil) + require.NoError(t, w.Log(b)) + last += 100 } require.NoError(t, w.Close()) @@ -215,6 +220,12 @@ func TestCheckpoint(t *testing.T) { for _, s := range samples { require.GreaterOrEqual(t, s.T, last/2, "sample with wrong timestamp") } + case record.Exemplars: + exemplars, err := dec.Exemplars(rec, nil) + require.NoError(t, err) + for _, e := range exemplars { + require.GreaterOrEqual(t, e.T, last/2, "exemplar with wrong timestamp") + } } } require.NoError(t, r.Err()) diff --git a/tsdb/wal/watcher.go b/tsdb/wal/watcher.go index 8670567f13..5deb608c35 100644 --- a/tsdb/wal/watcher.go +++ b/tsdb/wal/watcher.go @@ -46,6 +46,7 @@ const ( // and it is left to the implementer to make sure they are safe. type WriteTo interface { Append([]record.RefSample) bool + AppendExemplars([]record.RefExemplar) bool StoreSeries([]record.RefSeries, int) // SeriesReset is called after reading a checkpoint to allow the deletion // of all series created in a segment lower than the argument. @@ -66,6 +67,7 @@ type Watcher struct { logger log.Logger walDir string lastCheckpoint string + sendExemplars bool metrics *WatcherMetrics readerMetrics *LiveReaderMetrics @@ -136,7 +138,7 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics { } // NewWatcher creates a new WAL watcher for a given WriteTo. -func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, walDir string) *Watcher { +func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, walDir string, sendExemplars bool) *Watcher { if logger == nil { logger = log.NewNopLogger() } @@ -147,8 +149,10 @@ func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logge readerMetrics: readerMetrics, walDir: path.Join(walDir, "wal"), name: name, - quit: make(chan struct{}), - done: make(chan struct{}), + sendExemplars: sendExemplars, + + quit: make(chan struct{}), + done: make(chan struct{}), MaxSegment: -1, } @@ -462,10 +466,11 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error { func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { var ( - dec record.Decoder - series []record.RefSeries - samples []record.RefSample - send []record.RefSample + dec record.Decoder + series []record.RefSeries + samples []record.RefSample + send []record.RefSample + exemplars []record.RefExemplar ) for r.Next() && !isClosed(w.quit) { rec := r.Record() @@ -507,6 +512,23 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { send = send[:0] } + case record.Exemplars: + // Skip if experimental "exemplars over remote write" is not enabled. + if !w.sendExemplars { + break + } + // If we're not tailing a segment we can ignore any exemplars records we see. + // This speeds up replay of the WAL significantly. + if !tail { + break + } + exemplars, err := dec.Exemplars(rec, exemplars[:0]) + if err != nil { + w.recordDecodeFailsMetric.Inc() + return err + } + w.writer.AppendExemplars(exemplars) + case record.Tombstones: default: diff --git a/tsdb/wal/watcher_test.go b/tsdb/wal/watcher_test.go index 1627520803..03234c5dbe 100644 --- a/tsdb/wal/watcher_test.go +++ b/tsdb/wal/watcher_test.go @@ -50,6 +50,7 @@ func retry(t *testing.T, interval time.Duration, n int, f func() bool) { type writeToMock struct { samplesAppended int + exemplarsAppended int seriesLock sync.Mutex seriesSegmentIndexes map[uint64]int } @@ -59,6 +60,11 @@ func (wtm *writeToMock) Append(s []record.RefSample) bool { return true } +func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar) bool { + wtm.exemplarsAppended += len(e) + return true +} + func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) { wtm.seriesLock.Lock() defer wtm.seriesLock.Unlock() @@ -95,6 +101,7 @@ func TestTailSamples(t *testing.T) { pageSize := 32 * 1024 const seriesCount = 10 const samplesCount = 250 + const exemplarsCount = 25 for _, compress := range []bool{false, true} { t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { now := time.Now() @@ -138,6 +145,19 @@ func TestTailSamples(t *testing.T) { }, nil) require.NoError(t, w.Log(sample)) } + + for j := 0; j < exemplarsCount; j++ { + inner := rand.Intn(ref + 1) + exemplar := enc.Exemplars([]record.RefExemplar{ + { + Ref: uint64(inner), + T: now.UnixNano() + 1, + V: float64(i), + Labels: labels.FromStrings("traceID", fmt.Sprintf("trace-%d", inner)), + }, + }, nil) + require.NoError(t, w.Log(exemplar)) + } } // Start read after checkpoint, no more data written. @@ -145,7 +165,7 @@ func TestTailSamples(t *testing.T) { require.NoError(t, err) wt := newWriteToMock() - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, true) watcher.SetStartTime(now) // Set the Watcher's metrics so they're not nil pointers. @@ -162,11 +182,13 @@ func TestTailSamples(t *testing.T) { expectedSeries := seriesCount expectedSamples := seriesCount * samplesCount + expectedExemplars := seriesCount * exemplarsCount retry(t, defaultRetryInterval, defaultRetries, func() bool { return wt.checkNumLabels() >= expectedSeries }) - require.Equal(t, expectedSeries, wt.checkNumLabels()) - require.Equal(t, expectedSamples, wt.samplesAppended) + require.Equal(t, expectedSeries, wt.checkNumLabels(), "did not receive the expected number of series") + require.Equal(t, expectedSamples, wt.samplesAppended, "did not receive the expected number of samples") + require.Equal(t, expectedExemplars, wt.exemplarsAppended, "did not receive the expected number of exemplars") }) } } @@ -229,7 +251,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) { require.NoError(t, err) wt := newWriteToMock() - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false) go watcher.Start() expected := seriesCount @@ -322,7 +344,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) { _, _, err = Segments(w.Dir()) require.NoError(t, err) wt := newWriteToMock() - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false) go watcher.Start() expected := seriesCount * 2 @@ -392,7 +414,7 @@ func TestReadCheckpoint(t *testing.T) { require.NoError(t, err) wt := newWriteToMock() - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false) go watcher.Start() expectedSeries := seriesCount @@ -465,7 +487,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) { } wt := newWriteToMock() - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false) watcher.MaxSegment = -1 // Set the Watcher's metrics so they're not nil pointers. @@ -541,7 +563,7 @@ func TestCheckpointSeriesReset(t *testing.T) { require.NoError(t, err) wt := newWriteToMock() - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false) watcher.MaxSegment = -1 go watcher.Start()