Alternative approach: bundle metadata in TimeSeries protobuf

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>
This commit is contained in:
Paschalis Tsilias 2023-04-24 16:43:38 +03:00
parent 49cccc6c2e
commit 0093563165
9 changed files with 606 additions and 161 deletions

View file

@ -857,6 +857,7 @@ type RemoteWriteConfig struct {
Name string `yaml:"name,omitempty"` Name string `yaml:"name,omitempty"`
SendExemplars bool `yaml:"send_exemplars,omitempty"` SendExemplars bool `yaml:"send_exemplars,omitempty"`
SendNativeHistograms bool `yaml:"send_native_histograms,omitempty"` SendNativeHistograms bool `yaml:"send_native_histograms,omitempty"`
SendWALMetadata bool `yaml:"send_metadata,omitempty"` // TODO(@tpaschalis) Adding an extra field to enable us to remove the `metadata_config` struct in the future.
// We cannot do proper Go type embedding below as the parser will then parse // We cannot do proper Go type embedding below as the parser will then parse
// values arbitrarily into the overflow maps of further-down types. // values arbitrarily into the overflow maps of further-down types.
@ -965,10 +966,6 @@ type MetadataConfig struct {
SendInterval model.Duration `yaml:"send_interval"` SendInterval model.Duration `yaml:"send_interval"`
// Maximum number of samples per send. // Maximum number of samples per send.
MaxSamplesPerSend int `yaml:"max_samples_per_send,omitempty"` MaxSamplesPerSend int `yaml:"max_samples_per_send,omitempty"`
// SendFromWAL controls whether we send metadata from the WAL
// TODO (@tpaschalis) Maybe this should also be the feature flag that
// disables the current MetadataWatcher?
SendFromWAL bool `yaml:"send_from_wal,omitempty"`
} }
// RemoteReadConfig is the configuration for reading from remote storage. // RemoteReadConfig is the configuration for reading from remote storage.

View file

@ -68,6 +68,53 @@ func (MetricMetadata_MetricType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{0, 0} return fileDescriptor_d938547f84707355, []int{0, 0}
} }
// The protobuf style guide recommends setting enum values as uppercase.
// Having them lowercase would save us an instruction every time we convert
// from textparse.MetricType.
// https://protobuf.dev/programming-guides/style/#enums
type Metadata_MetricType int32
const (
Metadata_UNKNOWN Metadata_MetricType = 0
Metadata_COUNTER Metadata_MetricType = 1
Metadata_GAUGE Metadata_MetricType = 2
Metadata_HISTOGRAM Metadata_MetricType = 3
Metadata_GAUGEHISTOGRAM Metadata_MetricType = 4
Metadata_SUMMARY Metadata_MetricType = 5
Metadata_INFO Metadata_MetricType = 6
Metadata_STATESET Metadata_MetricType = 7
)
var Metadata_MetricType_name = map[int32]string{
0: "UNKNOWN",
1: "COUNTER",
2: "GAUGE",
3: "HISTOGRAM",
4: "GAUGEHISTOGRAM",
5: "SUMMARY",
6: "INFO",
7: "STATESET",
}
var Metadata_MetricType_value = map[string]int32{
"UNKNOWN": 0,
"COUNTER": 1,
"GAUGE": 2,
"HISTOGRAM": 3,
"GAUGEHISTOGRAM": 4,
"SUMMARY": 5,
"INFO": 6,
"STATESET": 7,
}
func (x Metadata_MetricType) String() string {
return proto.EnumName(Metadata_MetricType_name, int32(x))
}
func (Metadata_MetricType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{1, 0}
}
type Histogram_ResetHint int32 type Histogram_ResetHint int32
const ( const (
@ -96,7 +143,7 @@ func (x Histogram_ResetHint) String() string {
} }
func (Histogram_ResetHint) EnumDescriptor() ([]byte, []int) { func (Histogram_ResetHint) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{3, 0} return fileDescriptor_d938547f84707355, []int{4, 0}
} }
type LabelMatcher_Type int32 type LabelMatcher_Type int32
@ -127,7 +174,7 @@ func (x LabelMatcher_Type) String() string {
} }
func (LabelMatcher_Type) EnumDescriptor() ([]byte, []int) { func (LabelMatcher_Type) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{8, 0} return fileDescriptor_d938547f84707355, []int{9, 0}
} }
// We require this to match chunkenc.Encoding. // We require this to match chunkenc.Encoding.
@ -159,7 +206,7 @@ func (x Chunk_Encoding) String() string {
} }
func (Chunk_Encoding) EnumDescriptor() ([]byte, []int) { func (Chunk_Encoding) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{10, 0} return fileDescriptor_d938547f84707355, []int{11, 0}
} }
type MetricMetadata struct { type MetricMetadata struct {
@ -235,6 +282,69 @@ func (m *MetricMetadata) GetUnit() string {
return "" return ""
} }
type Metadata struct {
Type Metadata_MetricType `protobuf:"varint,1,opt,name=type,proto3,enum=prometheus.Metadata_MetricType" json:"type,omitempty"`
Help string `protobuf:"bytes,2,opt,name=help,proto3" json:"help,omitempty"`
Unit string `protobuf:"bytes,3,opt,name=unit,proto3" json:"unit,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Metadata) Reset() { *m = Metadata{} }
func (m *Metadata) String() string { return proto.CompactTextString(m) }
func (*Metadata) ProtoMessage() {}
func (*Metadata) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{1}
}
func (m *Metadata) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *Metadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_Metadata.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *Metadata) XXX_Merge(src proto.Message) {
xxx_messageInfo_Metadata.Merge(m, src)
}
func (m *Metadata) XXX_Size() int {
return m.Size()
}
func (m *Metadata) XXX_DiscardUnknown() {
xxx_messageInfo_Metadata.DiscardUnknown(m)
}
var xxx_messageInfo_Metadata proto.InternalMessageInfo
func (m *Metadata) GetType() Metadata_MetricType {
if m != nil {
return m.Type
}
return Metadata_UNKNOWN
}
func (m *Metadata) GetHelp() string {
if m != nil {
return m.Help
}
return ""
}
func (m *Metadata) GetUnit() string {
if m != nil {
return m.Unit
}
return ""
}
type Sample struct { type Sample struct {
Value float64 `protobuf:"fixed64,1,opt,name=value,proto3" json:"value,omitempty"` Value float64 `protobuf:"fixed64,1,opt,name=value,proto3" json:"value,omitempty"`
// timestamp is in ms format, see model/timestamp/timestamp.go for // timestamp is in ms format, see model/timestamp/timestamp.go for
@ -249,7 +359,7 @@ func (m *Sample) Reset() { *m = Sample{} }
func (m *Sample) String() string { return proto.CompactTextString(m) } func (m *Sample) String() string { return proto.CompactTextString(m) }
func (*Sample) ProtoMessage() {} func (*Sample) ProtoMessage() {}
func (*Sample) Descriptor() ([]byte, []int) { func (*Sample) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{1} return fileDescriptor_d938547f84707355, []int{2}
} }
func (m *Sample) XXX_Unmarshal(b []byte) error { func (m *Sample) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -308,7 +418,7 @@ func (m *Exemplar) Reset() { *m = Exemplar{} }
func (m *Exemplar) String() string { return proto.CompactTextString(m) } func (m *Exemplar) String() string { return proto.CompactTextString(m) }
func (*Exemplar) ProtoMessage() {} func (*Exemplar) ProtoMessage() {}
func (*Exemplar) Descriptor() ([]byte, []int) { func (*Exemplar) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{2} return fileDescriptor_d938547f84707355, []int{3}
} }
func (m *Exemplar) XXX_Unmarshal(b []byte) error { func (m *Exemplar) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -412,7 +522,7 @@ func (m *Histogram) Reset() { *m = Histogram{} }
func (m *Histogram) String() string { return proto.CompactTextString(m) } func (m *Histogram) String() string { return proto.CompactTextString(m) }
func (*Histogram) ProtoMessage() {} func (*Histogram) ProtoMessage() {}
func (*Histogram) Descriptor() ([]byte, []int) { func (*Histogram) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{3} return fileDescriptor_d938547f84707355, []int{4}
} }
func (m *Histogram) XXX_Unmarshal(b []byte) error { func (m *Histogram) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -615,7 +725,7 @@ func (m *BucketSpan) Reset() { *m = BucketSpan{} }
func (m *BucketSpan) String() string { return proto.CompactTextString(m) } func (m *BucketSpan) String() string { return proto.CompactTextString(m) }
func (*BucketSpan) ProtoMessage() {} func (*BucketSpan) ProtoMessage() {}
func (*BucketSpan) Descriptor() ([]byte, []int) { func (*BucketSpan) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{4} return fileDescriptor_d938547f84707355, []int{5}
} }
func (m *BucketSpan) XXX_Unmarshal(b []byte) error { func (m *BucketSpan) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -666,6 +776,7 @@ type TimeSeries struct {
Samples []Sample `protobuf:"bytes,2,rep,name=samples,proto3" json:"samples"` Samples []Sample `protobuf:"bytes,2,rep,name=samples,proto3" json:"samples"`
Exemplars []Exemplar `protobuf:"bytes,3,rep,name=exemplars,proto3" json:"exemplars"` Exemplars []Exemplar `protobuf:"bytes,3,rep,name=exemplars,proto3" json:"exemplars"`
Histograms []Histogram `protobuf:"bytes,4,rep,name=histograms,proto3" json:"histograms"` Histograms []Histogram `protobuf:"bytes,4,rep,name=histograms,proto3" json:"histograms"`
Metadatas []Metadata `protobuf:"bytes,5,rep,name=metadatas,proto3" json:"metadatas"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@ -675,7 +786,7 @@ func (m *TimeSeries) Reset() { *m = TimeSeries{} }
func (m *TimeSeries) String() string { return proto.CompactTextString(m) } func (m *TimeSeries) String() string { return proto.CompactTextString(m) }
func (*TimeSeries) ProtoMessage() {} func (*TimeSeries) ProtoMessage() {}
func (*TimeSeries) Descriptor() ([]byte, []int) { func (*TimeSeries) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{5} return fileDescriptor_d938547f84707355, []int{6}
} }
func (m *TimeSeries) XXX_Unmarshal(b []byte) error { func (m *TimeSeries) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -732,6 +843,13 @@ func (m *TimeSeries) GetHistograms() []Histogram {
return nil return nil
} }
func (m *TimeSeries) GetMetadatas() []Metadata {
if m != nil {
return m.Metadatas
}
return nil
}
type Label struct { type Label struct {
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
@ -744,7 +862,7 @@ func (m *Label) Reset() { *m = Label{} }
func (m *Label) String() string { return proto.CompactTextString(m) } func (m *Label) String() string { return proto.CompactTextString(m) }
func (*Label) ProtoMessage() {} func (*Label) ProtoMessage() {}
func (*Label) Descriptor() ([]byte, []int) { func (*Label) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{6} return fileDescriptor_d938547f84707355, []int{7}
} }
func (m *Label) XXX_Unmarshal(b []byte) error { func (m *Label) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -798,7 +916,7 @@ func (m *Labels) Reset() { *m = Labels{} }
func (m *Labels) String() string { return proto.CompactTextString(m) } func (m *Labels) String() string { return proto.CompactTextString(m) }
func (*Labels) ProtoMessage() {} func (*Labels) ProtoMessage() {}
func (*Labels) Descriptor() ([]byte, []int) { func (*Labels) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{7} return fileDescriptor_d938547f84707355, []int{8}
} }
func (m *Labels) XXX_Unmarshal(b []byte) error { func (m *Labels) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -848,7 +966,7 @@ func (m *LabelMatcher) Reset() { *m = LabelMatcher{} }
func (m *LabelMatcher) String() string { return proto.CompactTextString(m) } func (m *LabelMatcher) String() string { return proto.CompactTextString(m) }
func (*LabelMatcher) ProtoMessage() {} func (*LabelMatcher) ProtoMessage() {}
func (*LabelMatcher) Descriptor() ([]byte, []int) { func (*LabelMatcher) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{8} return fileDescriptor_d938547f84707355, []int{9}
} }
func (m *LabelMatcher) XXX_Unmarshal(b []byte) error { func (m *LabelMatcher) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -915,7 +1033,7 @@ func (m *ReadHints) Reset() { *m = ReadHints{} }
func (m *ReadHints) String() string { return proto.CompactTextString(m) } func (m *ReadHints) String() string { return proto.CompactTextString(m) }
func (*ReadHints) ProtoMessage() {} func (*ReadHints) ProtoMessage() {}
func (*ReadHints) Descriptor() ([]byte, []int) { func (*ReadHints) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{9} return fileDescriptor_d938547f84707355, []int{10}
} }
func (m *ReadHints) XXX_Unmarshal(b []byte) error { func (m *ReadHints) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -1009,7 +1127,7 @@ func (m *Chunk) Reset() { *m = Chunk{} }
func (m *Chunk) String() string { return proto.CompactTextString(m) } func (m *Chunk) String() string { return proto.CompactTextString(m) }
func (*Chunk) ProtoMessage() {} func (*Chunk) ProtoMessage() {}
func (*Chunk) Descriptor() ([]byte, []int) { func (*Chunk) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{10} return fileDescriptor_d938547f84707355, []int{11}
} }
func (m *Chunk) XXX_Unmarshal(b []byte) error { func (m *Chunk) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -1081,7 +1199,7 @@ func (m *ChunkedSeries) Reset() { *m = ChunkedSeries{} }
func (m *ChunkedSeries) String() string { return proto.CompactTextString(m) } func (m *ChunkedSeries) String() string { return proto.CompactTextString(m) }
func (*ChunkedSeries) ProtoMessage() {} func (*ChunkedSeries) ProtoMessage() {}
func (*ChunkedSeries) Descriptor() ([]byte, []int) { func (*ChunkedSeries) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{11} return fileDescriptor_d938547f84707355, []int{12}
} }
func (m *ChunkedSeries) XXX_Unmarshal(b []byte) error { func (m *ChunkedSeries) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -1126,10 +1244,12 @@ func (m *ChunkedSeries) GetChunks() []Chunk {
func init() { func init() {
proto.RegisterEnum("prometheus.MetricMetadata_MetricType", MetricMetadata_MetricType_name, MetricMetadata_MetricType_value) proto.RegisterEnum("prometheus.MetricMetadata_MetricType", MetricMetadata_MetricType_name, MetricMetadata_MetricType_value)
proto.RegisterEnum("prometheus.Metadata_MetricType", Metadata_MetricType_name, Metadata_MetricType_value)
proto.RegisterEnum("prometheus.Histogram_ResetHint", Histogram_ResetHint_name, Histogram_ResetHint_value) proto.RegisterEnum("prometheus.Histogram_ResetHint", Histogram_ResetHint_name, Histogram_ResetHint_value)
proto.RegisterEnum("prometheus.LabelMatcher_Type", LabelMatcher_Type_name, LabelMatcher_Type_value) proto.RegisterEnum("prometheus.LabelMatcher_Type", LabelMatcher_Type_name, LabelMatcher_Type_value)
proto.RegisterEnum("prometheus.Chunk_Encoding", Chunk_Encoding_name, Chunk_Encoding_value) proto.RegisterEnum("prometheus.Chunk_Encoding", Chunk_Encoding_name, Chunk_Encoding_value)
proto.RegisterType((*MetricMetadata)(nil), "prometheus.MetricMetadata") proto.RegisterType((*MetricMetadata)(nil), "prometheus.MetricMetadata")
proto.RegisterType((*Metadata)(nil), "prometheus.Metadata")
proto.RegisterType((*Sample)(nil), "prometheus.Sample") proto.RegisterType((*Sample)(nil), "prometheus.Sample")
proto.RegisterType((*Exemplar)(nil), "prometheus.Exemplar") proto.RegisterType((*Exemplar)(nil), "prometheus.Exemplar")
proto.RegisterType((*Histogram)(nil), "prometheus.Histogram") proto.RegisterType((*Histogram)(nil), "prometheus.Histogram")
@ -1146,76 +1266,78 @@ func init() {
func init() { proto.RegisterFile("types.proto", fileDescriptor_d938547f84707355) } func init() { proto.RegisterFile("types.proto", fileDescriptor_d938547f84707355) }
var fileDescriptor_d938547f84707355 = []byte{ var fileDescriptor_d938547f84707355 = []byte{
// 1092 bytes of a gzipped FileDescriptorProto // 1126 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0xdb, 0x6e, 0xdb, 0x46, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x56, 0xdd, 0x6e, 0x1b, 0x45,
0x13, 0x36, 0x49, 0x89, 0x12, 0x47, 0x87, 0xd0, 0xfb, 0x3b, 0xf9, 0x59, 0xa3, 0x71, 0x54, 0x02, 0x14, 0xce, 0xee, 0xda, 0x6b, 0xef, 0xf1, 0x4f, 0x36, 0x43, 0x5a, 0x96, 0x88, 0xa6, 0x61, 0xa5,
0x69, 0x85, 0xa2, 0x90, 0x11, 0xb7, 0x17, 0x0d, 0x1a, 0x14, 0xb0, 0x1d, 0xf9, 0x80, 0x5a, 0x12, 0x82, 0x85, 0x90, 0xa3, 0xa6, 0x5c, 0x50, 0x51, 0x21, 0x25, 0xa9, 0xf3, 0x23, 0xb2, 0xb6, 0x3a,
0xb2, 0x92, 0xd1, 0xa6, 0x37, 0xc2, 0x5a, 0x5a, 0x4b, 0x44, 0xc4, 0x43, 0xb9, 0xab, 0xc0, 0xea, 0x76, 0x04, 0xe5, 0xc6, 0x9a, 0xd8, 0x13, 0x7b, 0xd5, 0xfd, 0x63, 0x67, 0x5c, 0xc5, 0xbc, 0x07,
0x7b, 0xf4, 0xae, 0x2f, 0xd1, 0xb7, 0x08, 0xd0, 0x9b, 0xf6, 0x05, 0x8a, 0xc2, 0x57, 0x7d, 0x8c, 0x77, 0x5c, 0xf1, 0x06, 0xbc, 0x45, 0x25, 0x6e, 0xe0, 0x05, 0x10, 0xca, 0x93, 0xa0, 0x99, 0xfd,
0x62, 0x87, 0xa4, 0x48, 0xc5, 0x29, 0xd0, 0xf4, 0x6e, 0xe7, 0x9b, 0x6f, 0x76, 0x3e, 0xee, 0xce, 0x75, 0xdc, 0x4a, 0x94, 0x2b, 0xee, 0xf6, 0x9c, 0xf3, 0x9d, 0x39, 0xdf, 0x9c, 0x39, 0x3f, 0x0b,
0xcc, 0x12, 0x6a, 0x72, 0x15, 0x71, 0xd1, 0x89, 0xe2, 0x50, 0x86, 0x04, 0xa2, 0x38, 0xf4, 0xb9, 0x0d, 0xbe, 0x8c, 0x28, 0xeb, 0x46, 0x71, 0xc8, 0x43, 0x04, 0x51, 0x1c, 0xfa, 0x94, 0xcf, 0xe9,
0x9c, 0xf3, 0xa5, 0xd8, 0xdd, 0x99, 0x85, 0xb3, 0x10, 0xe1, 0x7d, 0xb5, 0x4a, 0x18, 0xee, 0xcf, 0x82, 0xed, 0x6c, 0xcf, 0xc2, 0x59, 0x28, 0xd5, 0xfb, 0xe2, 0x2b, 0x41, 0xd8, 0xbf, 0xa8, 0xd0,
0x3a, 0x34, 0x7b, 0x5c, 0xc6, 0xde, 0xa4, 0xc7, 0x25, 0x9b, 0x32, 0xc9, 0xc8, 0x53, 0x28, 0xa9, 0x76, 0x28, 0x8f, 0xdd, 0x89, 0x43, 0x39, 0x99, 0x12, 0x4e, 0xd0, 0x53, 0xa8, 0x88, 0x33, 0x2c,
0x3d, 0x1c, 0xad, 0xa5, 0xb5, 0x9b, 0x07, 0x8f, 0x3b, 0xf9, 0x1e, 0x9d, 0x4d, 0x66, 0x6a, 0x8e, 0x65, 0x4f, 0xe9, 0xb4, 0x0f, 0x1e, 0x75, 0x8b, 0x33, 0xba, 0xab, 0xc8, 0x54, 0x1c, 0x2d, 0x23,
0x56, 0x11, 0xa7, 0x18, 0x42, 0x3e, 0x03, 0xe2, 0x23, 0x36, 0xbe, 0x66, 0xbe, 0xb7, 0x58, 0x8d, 0x8a, 0xa5, 0x0b, 0xfa, 0x02, 0x90, 0x2f, 0x75, 0xe3, 0x6b, 0xe2, 0xbb, 0xde, 0x72, 0x1c, 0x10,
0x03, 0xe6, 0x73, 0x47, 0x6f, 0x69, 0x6d, 0x8b, 0xda, 0x89, 0xe7, 0x04, 0x1d, 0x7d, 0xe6, 0x73, 0x9f, 0x5a, 0xea, 0x9e, 0xd2, 0x31, 0xb0, 0x99, 0x58, 0x4e, 0xa4, 0xa1, 0x4f, 0x7c, 0x8a, 0x10,
0x42, 0xa0, 0x34, 0xe7, 0x8b, 0xc8, 0x29, 0xa1, 0x1f, 0xd7, 0x0a, 0x5b, 0x06, 0x9e, 0x74, 0xca, 0x54, 0xe6, 0xd4, 0x8b, 0xac, 0x8a, 0xb4, 0xcb, 0x6f, 0xa1, 0x5b, 0x04, 0x2e, 0xb7, 0xaa, 0x89,
0x09, 0xa6, 0xd6, 0xee, 0x0a, 0x20, 0xcf, 0x44, 0x6a, 0x50, 0xb9, 0xec, 0x7f, 0xd3, 0x1f, 0x7c, 0x4e, 0x7c, 0xdb, 0x4b, 0x80, 0x22, 0x12, 0x6a, 0x40, 0xed, 0xb2, 0xff, 0x6d, 0x7f, 0xf0, 0x5d,
0xdb, 0xb7, 0xb7, 0x94, 0x71, 0x3c, 0xb8, 0xec, 0x8f, 0xba, 0xd4, 0xd6, 0x88, 0x05, 0xe5, 0xd3, 0xdf, 0xdc, 0x10, 0xc2, 0xf1, 0xe0, 0xb2, 0x3f, 0xea, 0x61, 0x53, 0x41, 0x06, 0x54, 0x4f, 0x0f,
0xc3, 0xcb, 0xd3, 0xae, 0xad, 0x93, 0x06, 0x58, 0x67, 0xe7, 0xc3, 0xd1, 0xe0, 0x94, 0x1e, 0xf6, 0x2f, 0x4f, 0x7b, 0xa6, 0x8a, 0x5a, 0x60, 0x9c, 0x9d, 0x0f, 0x47, 0x83, 0x53, 0x7c, 0xe8, 0x98,
0x6c, 0x83, 0x10, 0x68, 0xa2, 0x27, 0xc7, 0x4a, 0x2a, 0x74, 0x78, 0xd9, 0xeb, 0x1d, 0xd2, 0x97, 0x1a, 0x42, 0xd0, 0x96, 0x96, 0x42, 0x57, 0x11, 0xae, 0xc3, 0x4b, 0xc7, 0x39, 0xc4, 0x2f, 0xcd,
0x76, 0x99, 0x54, 0xa1, 0x74, 0xde, 0x3f, 0x19, 0xd8, 0x26, 0xa9, 0x43, 0x75, 0x38, 0x3a, 0x1c, 0x2a, 0xaa, 0x43, 0xe5, 0xbc, 0x7f, 0x32, 0x30, 0x75, 0xd4, 0x84, 0xfa, 0x70, 0x74, 0x38, 0xea,
0x75, 0x87, 0xdd, 0x91, 0x5d, 0x71, 0x9f, 0x81, 0x39, 0x64, 0x7e, 0xb4, 0xe0, 0x64, 0x07, 0xca, 0x0d, 0x7b, 0x23, 0xb3, 0x66, 0xdf, 0x2a, 0x50, 0xcf, 0x13, 0xf3, 0x64, 0x25, 0x31, 0x0f, 0xef,
0xaf, 0xd9, 0x62, 0x99, 0x1c, 0x8b, 0x46, 0x13, 0x83, 0x7c, 0x08, 0x96, 0xf4, 0x7c, 0x2e, 0x24, 0x24, 0xe6, 0x1d, 0x29, 0xc9, 0x2e, 0xa9, 0xbe, 0xe5, 0x92, 0xda, 0xff, 0xe3, 0x92, 0xcf, 0x40,
0xf3, 0x23, 0xfc, 0x4e, 0x83, 0xe6, 0x80, 0x1b, 0x42, 0xb5, 0x7b, 0xc3, 0xfd, 0x68, 0xc1, 0x62, 0x1f, 0x12, 0x3f, 0xf2, 0x28, 0xda, 0x86, 0xea, 0x6b, 0xe2, 0x2d, 0x92, 0x2b, 0x2a, 0x38, 0x11,
0xb2, 0x0f, 0xe6, 0x82, 0x5d, 0xf1, 0x85, 0x70, 0xb4, 0x96, 0xd1, 0xae, 0x1d, 0x6c, 0x17, 0xcf, 0xd0, 0xc7, 0x60, 0x70, 0xd7, 0xa7, 0x8c, 0x13, 0x3f, 0xb9, 0x87, 0x86, 0x0b, 0x85, 0x1d, 0x42,
0xf5, 0x42, 0x79, 0x8e, 0x4a, 0x6f, 0xfe, 0x78, 0xb4, 0x45, 0x53, 0x5a, 0x9e, 0x50, 0xff, 0xc7, 0xbd, 0x77, 0x43, 0xfd, 0xc8, 0x23, 0x31, 0xda, 0x07, 0xdd, 0x23, 0x57, 0xd4, 0x63, 0x96, 0xb2,
0x84, 0xc6, 0xdb, 0x09, 0x7f, 0x2d, 0x83, 0x75, 0xe6, 0x09, 0x19, 0xce, 0x62, 0xe6, 0x93, 0x87, 0xa7, 0x75, 0x1a, 0x07, 0x5b, 0xe5, 0x1c, 0x5d, 0x08, 0xcb, 0x51, 0xe5, 0xcd, 0x5f, 0x0f, 0x37,
0x60, 0x4d, 0xc2, 0x65, 0x20, 0xc7, 0x5e, 0x20, 0x51, 0x76, 0xe9, 0x6c, 0x8b, 0x56, 0x11, 0x3a, 0x70, 0x0a, 0x2b, 0x02, 0xaa, 0xef, 0x0c, 0xa8, 0xdd, 0x0d, 0xf8, 0x7b, 0x15, 0x8c, 0x33, 0x97,
0x0f, 0x24, 0xf9, 0x08, 0x6a, 0x89, 0xfb, 0x7a, 0x11, 0x32, 0x99, 0xa4, 0x39, 0xdb, 0xa2, 0x80, 0xf1, 0x70, 0x16, 0x13, 0x1f, 0x3d, 0x00, 0x63, 0x12, 0x2e, 0x02, 0x3e, 0x76, 0x03, 0x2e, 0x69,
0xe0, 0x89, 0xc2, 0x88, 0x0d, 0x86, 0x58, 0xfa, 0x98, 0x47, 0xa3, 0x6a, 0x49, 0x1e, 0x80, 0x29, 0x57, 0xce, 0x36, 0x70, 0x5d, 0xaa, 0xce, 0x03, 0x8e, 0x3e, 0x81, 0x46, 0x62, 0xbe, 0xf6, 0x42,
0x26, 0x73, 0xee, 0x33, 0xbc, 0xb5, 0x6d, 0x9a, 0x5a, 0xe4, 0x31, 0x34, 0x7f, 0xe4, 0x71, 0x38, 0xc2, 0x93, 0x30, 0x67, 0x1b, 0x18, 0xa4, 0xf2, 0x44, 0xe8, 0x90, 0x09, 0x1a, 0x5b, 0xf8, 0x32,
0x96, 0xf3, 0x98, 0x8b, 0x79, 0xb8, 0x98, 0xe2, 0x0d, 0x6a, 0xb4, 0xa1, 0xd0, 0x51, 0x06, 0x92, 0x8e, 0x82, 0xc5, 0x27, 0xba, 0x0f, 0x3a, 0x9b, 0xcc, 0xa9, 0x4f, 0x64, 0x69, 0x6e, 0xe1, 0x54,
0x8f, 0x53, 0x5a, 0xae, 0xcb, 0x44, 0x5d, 0x1a, 0xad, 0x2b, 0xfc, 0x38, 0xd3, 0xf6, 0x29, 0xd8, 0x42, 0x8f, 0xa0, 0xfd, 0x13, 0x8d, 0xc3, 0x31, 0x9f, 0xc7, 0x94, 0xcd, 0x43, 0x6f, 0x2a, 0xcb,
0x05, 0x5e, 0x22, 0xb0, 0x82, 0x02, 0x35, 0xda, 0x5c, 0x33, 0x13, 0x91, 0xc7, 0xd0, 0x0c, 0xf8, 0x54, 0xc1, 0x2d, 0xa1, 0x1d, 0x65, 0x4a, 0xf4, 0x69, 0x0a, 0x2b, 0x78, 0xe9, 0x92, 0x97, 0x82,
0x8c, 0x49, 0xef, 0x35, 0x1f, 0x8b, 0x88, 0x05, 0xc2, 0xa9, 0xe2, 0x09, 0x3f, 0x28, 0x9e, 0xf0, 0x9b, 0x42, 0x7f, 0x9c, 0x71, 0xfb, 0x1c, 0xcc, 0x12, 0x2e, 0x21, 0x58, 0x93, 0x04, 0x15, 0xdc,
0xd1, 0x72, 0xf2, 0x8a, 0xcb, 0x61, 0xc4, 0x82, 0xf4, 0x98, 0x1b, 0x59, 0x8c, 0xc2, 0x04, 0xf9, 0xce, 0x91, 0x09, 0xc9, 0x63, 0x68, 0x07, 0x74, 0x46, 0xb8, 0xfb, 0x9a, 0x8e, 0x59, 0x44, 0x02,
0x04, 0xee, 0xad, 0x37, 0x99, 0xf2, 0x85, 0x64, 0xc2, 0xb1, 0x5a, 0x46, 0x9b, 0xd0, 0xf5, 0xde, 0x66, 0xd5, 0x65, 0x86, 0xef, 0x97, 0x33, 0x7c, 0xb4, 0x98, 0xbc, 0xa2, 0x7c, 0x18, 0x91, 0x20,
0xcf, 0x11, 0xdd, 0x20, 0xa2, 0x3a, 0xe1, 0x40, 0xcb, 0x68, 0x6b, 0x39, 0x11, 0xa5, 0x09, 0x25, 0x4d, 0x73, 0x2b, 0xf3, 0x11, 0x3a, 0x86, 0x3e, 0x83, 0xcd, 0xfc, 0x90, 0x29, 0xf5, 0x38, 0x61,
0x2b, 0x0a, 0x85, 0x57, 0x90, 0x55, 0xfb, 0x37, 0xb2, 0xb2, 0x98, 0xb5, 0xac, 0xf5, 0x26, 0xa9, 0x96, 0xb1, 0xa7, 0x75, 0x10, 0xce, 0xcf, 0x7e, 0x2e, 0xb5, 0x2b, 0x40, 0xc9, 0x8e, 0x59, 0xb0,
0xac, 0x7a, 0x22, 0x2b, 0x83, 0x73, 0x59, 0x6b, 0x62, 0x2a, 0xab, 0x91, 0xc8, 0xca, 0xe0, 0x54, 0xa7, 0x75, 0x94, 0x02, 0x28, 0xa9, 0x31, 0x41, 0x2b, 0x0a, 0x99, 0x5b, 0xa2, 0xd5, 0xf8, 0x37,
0xd6, 0xd7, 0x00, 0x31, 0x17, 0x5c, 0x8e, 0xe7, 0xea, 0xf4, 0x9b, 0xd8, 0xe3, 0x8f, 0x8a, 0x92, 0xb4, 0x32, 0x9f, 0x9c, 0x56, 0x7e, 0x48, 0x4a, 0xab, 0x99, 0xd0, 0xca, 0xd4, 0x05, 0xad, 0x1c,
0xd6, 0xf5, 0xd3, 0xa1, 0x8a, 0x77, 0xe6, 0x05, 0x92, 0x5a, 0x71, 0xb6, 0xdc, 0x2c, 0xc0, 0x7b, 0x98, 0xd2, 0x6a, 0x25, 0xb4, 0x32, 0x75, 0x4a, 0xeb, 0x1b, 0x80, 0x98, 0x32, 0xca, 0xc7, 0x73,
0x6f, 0x17, 0xe0, 0x17, 0x60, 0xad, 0xa3, 0x36, 0x3b, 0xb5, 0x02, 0xc6, 0xcb, 0xee, 0xd0, 0xd6, 0x91, 0xfd, 0xf6, 0x7a, 0xbf, 0xe6, 0xf5, 0xd3, 0xc5, 0x02, 0x77, 0xe6, 0x06, 0x1c, 0x1b, 0x71,
0x88, 0x09, 0x7a, 0x7f, 0x60, 0xeb, 0x79, 0xb7, 0x1a, 0x47, 0x15, 0x28, 0xa3, 0xe6, 0xa3, 0x3a, 0xf6, 0xb9, 0x5a, 0x80, 0x9b, 0x77, 0x0b, 0xf0, 0x4b, 0x30, 0x72, 0xaf, 0xd5, 0x4e, 0xad, 0x81,
0x40, 0x7e, 0xed, 0xee, 0x33, 0x80, 0xfc, 0x7c, 0x54, 0xe5, 0x85, 0xd7, 0xd7, 0x82, 0x27, 0xa5, 0xf6, 0xb2, 0x37, 0x34, 0x15, 0xa4, 0x83, 0xda, 0x1f, 0x98, 0x6a, 0xd1, 0xad, 0xda, 0x51, 0x0d,
0xbc, 0x4d, 0x53, 0x4b, 0xe1, 0x0b, 0x1e, 0xcc, 0xe4, 0x1c, 0x2b, 0xb8, 0x41, 0x53, 0xcb, 0xfd, 0xaa, 0x92, 0xf3, 0x51, 0x13, 0xa0, 0x78, 0x76, 0xfb, 0x19, 0x40, 0x91, 0x1f, 0x51, 0x79, 0xe1,
0x4b, 0x03, 0x18, 0x79, 0x3e, 0x1f, 0xf2, 0xd8, 0xe3, 0xe2, 0xfd, 0xfb, 0xef, 0x00, 0x2a, 0x02, 0xf5, 0x35, 0xa3, 0x49, 0x29, 0x6f, 0xe1, 0x54, 0x12, 0x7a, 0x8f, 0x06, 0x33, 0x3e, 0x97, 0x15,
0x5b, 0x5f, 0x38, 0x3a, 0x46, 0x90, 0x62, 0x44, 0x32, 0x15, 0xd2, 0x90, 0x8c, 0x48, 0xbe, 0x04, 0xdc, 0xc2, 0xa9, 0x64, 0xff, 0xaa, 0x02, 0x8c, 0x5c, 0x9f, 0x0e, 0x69, 0xec, 0x52, 0xf6, 0xfe,
0x8b, 0xa7, 0x0d, 0x2f, 0x1c, 0x03, 0xa3, 0x76, 0x8a, 0x51, 0xd9, 0x34, 0x48, 0xe3, 0x72, 0x32, 0xfd, 0x77, 0x00, 0x35, 0x26, 0x5b, 0x9f, 0x59, 0xaa, 0xf4, 0x40, 0x65, 0x8f, 0x64, 0x2a, 0xa4,
0xf9, 0x0a, 0x60, 0x9e, 0x1d, 0xbc, 0x70, 0x4a, 0x18, 0x7a, 0xff, 0x9d, 0xd7, 0x92, 0xc6, 0x16, 0x2e, 0x19, 0x10, 0x7d, 0x05, 0x06, 0x4d, 0x1b, 0x9e, 0x59, 0x9a, 0xf4, 0xda, 0x2e, 0x7b, 0x65,
0xe8, 0xee, 0x13, 0x28, 0xe3, 0x17, 0xa8, 0xe9, 0x89, 0x13, 0x57, 0x4b, 0xa6, 0xa7, 0x5a, 0x6f, 0xd3, 0x20, 0xf5, 0x2b, 0xc0, 0xe8, 0x6b, 0x80, 0x79, 0x96, 0x78, 0x66, 0x55, 0xa4, 0xeb, 0xbd,
0xce, 0x11, 0x2b, 0x9d, 0x23, 0xee, 0x53, 0x30, 0x2f, 0x92, 0xef, 0x7c, 0xdf, 0x83, 0x71, 0x7f, 0xb7, 0x3e, 0x4b, 0xea, 0x5b, 0x82, 0x8b, 0xb0, 0x7e, 0x3a, 0x65, 0x99, 0x55, 0x5d, 0x0f, 0x9b,
0xd2, 0xa0, 0x8e, 0x78, 0x8f, 0xc9, 0xc9, 0x9c, 0xc7, 0xe4, 0xc9, 0xc6, 0x83, 0xf1, 0xf0, 0x4e, 0x8d, 0xe0, 0x2c, 0x6c, 0x0e, 0xb6, 0x1f, 0x43, 0x55, 0xde, 0x5d, 0xcc, 0x5d, 0xb9, 0x90, 0x94,
0x7c, 0xca, 0xeb, 0x14, 0x1e, 0x8a, 0x4c, 0xa8, 0xfe, 0x2e, 0xa1, 0x46, 0x51, 0x68, 0x1b, 0x4a, 0x64, 0xee, 0x8a, 0xef, 0xd5, 0x09, 0x64, 0xa4, 0x13, 0xc8, 0x7e, 0x0a, 0xfa, 0x45, 0x92, 0xa1,
0x38, 0xf6, 0x4d, 0xd0, 0xbb, 0x2f, 0x92, 0x3a, 0xea, 0x77, 0x5f, 0x24, 0x75, 0x44, 0xd5, 0xa8, 0xf7, 0x4d, 0xa9, 0xfd, 0xb3, 0x02, 0x4d, 0xa9, 0x77, 0x08, 0x9f, 0xcc, 0x69, 0x8c, 0x1e, 0xaf,
0x57, 0x00, 0xed, 0xda, 0x86, 0xfb, 0x8b, 0xa6, 0x8a, 0x8f, 0x4d, 0x55, 0xed, 0x09, 0xf2, 0x7f, 0xac, 0x8d, 0x07, 0x6b, 0xfe, 0x29, 0xae, 0xbb, 0xba, 0x34, 0x4a, 0x9b, 0xf3, 0x0e, 0x51, 0xad,
0xa8, 0x08, 0xc9, 0xa3, 0xb1, 0x2f, 0x50, 0x97, 0x41, 0x4d, 0x65, 0xf6, 0x84, 0x4a, 0x7d, 0xbd, 0x4c, 0xb4, 0x03, 0x15, 0xb9, 0x30, 0x74, 0x50, 0x7b, 0x2f, 0x92, 0x0a, 0xec, 0xf7, 0x5e, 0x24,
0x0c, 0x26, 0x59, 0x6a, 0xb5, 0x26, 0x1f, 0x40, 0x55, 0x48, 0x16, 0x4b, 0xc5, 0x4e, 0x86, 0x6a, 0x15, 0x88, 0xc5, 0x92, 0x10, 0x0a, 0xdc, 0x33, 0x35, 0xfb, 0x37, 0x45, 0x94, 0x2d, 0x99, 0x8a,
0x05, 0xed, 0x9e, 0x20, 0xf7, 0xc1, 0xe4, 0xc1, 0x74, 0x8c, 0x97, 0xa2, 0x1c, 0x65, 0x1e, 0x4c, 0xaa, 0x65, 0xe8, 0x43, 0xa8, 0x31, 0x4e, 0xa3, 0xb1, 0xcf, 0x24, 0x2f, 0x0d, 0xeb, 0x42, 0x74,
0x7b, 0x82, 0xec, 0x42, 0x75, 0x16, 0x87, 0xcb, 0xc8, 0x0b, 0x66, 0x4e, 0xb9, 0x65, 0xb4, 0x2d, 0x98, 0x08, 0x7d, 0xbd, 0x08, 0x26, 0x59, 0x68, 0xf1, 0x8d, 0x3e, 0x82, 0x3a, 0xe3, 0x24, 0xe6,
0xba, 0xb6, 0x49, 0x13, 0xf4, 0xab, 0x15, 0x0e, 0xb6, 0x2a, 0xd5, 0xaf, 0x56, 0x6a, 0xf7, 0x98, 0x02, 0x9d, 0x8c, 0xe3, 0x9a, 0x94, 0x1d, 0x86, 0xee, 0x81, 0x4e, 0x83, 0xe9, 0x58, 0x3e, 0xa7,
0x05, 0x33, 0xae, 0x36, 0xa9, 0x24, 0xbb, 0xa3, 0xdd, 0x13, 0xee, 0xef, 0x1a, 0x94, 0x8f, 0xe7, 0x30, 0x54, 0x69, 0x30, 0x75, 0x18, 0xda, 0x81, 0xfa, 0x2c, 0x0e, 0x17, 0x91, 0x1b, 0xcc, 0xe4,
0xcb, 0xe0, 0x15, 0xd9, 0x83, 0x9a, 0xef, 0x05, 0x63, 0xd5, 0x4a, 0xb9, 0x66, 0xcb, 0xf7, 0x02, 0x5b, 0x19, 0x38, 0x97, 0x51, 0x1b, 0xd4, 0xab, 0xa5, 0x1c, 0x89, 0x75, 0xac, 0x5e, 0x2d, 0xc5,
0x55, 0xc3, 0x3d, 0x81, 0x7e, 0x76, 0xb3, 0xf6, 0xa7, 0x6f, 0x8d, 0xcf, 0x6e, 0x52, 0x7f, 0x27, 0xe9, 0x31, 0x09, 0x66, 0x54, 0x1c, 0x52, 0x4b, 0x4e, 0x97, 0xb2, 0xc3, 0xec, 0x3f, 0x15, 0xa8,
0xbd, 0x04, 0x03, 0x2f, 0x61, 0xb7, 0x78, 0x09, 0x98, 0xa0, 0xd3, 0x0d, 0x26, 0xe1, 0xd4, 0x0b, 0x1e, 0xcf, 0x17, 0xc1, 0x2b, 0xb4, 0x0b, 0x0d, 0xdf, 0x0d, 0xc6, 0xa2, 0x09, 0x0b, 0xce, 0x86,
0x66, 0xf9, 0x0d, 0xa8, 0x37, 0x1c, 0xbf, 0xaa, 0x4e, 0x71, 0xed, 0x3e, 0x87, 0x6a, 0xc6, 0xba, 0xef, 0x06, 0xa2, 0xfa, 0x1d, 0x26, 0xed, 0xe4, 0x26, 0xb7, 0xa7, 0x5b, 0xca, 0x27, 0x37, 0xa9,
0xd3, 0xbc, 0xdf, 0x0d, 0xd4, 0x13, 0xbb, 0xf1, 0xae, 0xea, 0xe4, 0x7f, 0x70, 0xef, 0xe4, 0x62, 0xbd, 0x9b, 0x3e, 0x82, 0x26, 0x1f, 0x61, 0xa7, 0xfc, 0x08, 0x32, 0x40, 0xb7, 0x17, 0x4c, 0xc2,
0x70, 0x38, 0x1a, 0x17, 0x1e, 0x5b, 0xf7, 0x07, 0x68, 0x60, 0x46, 0x3e, 0xfd, 0xaf, 0xad, 0xb7, 0xa9, 0x1b, 0xcc, 0x8a, 0x17, 0x10, 0xc5, 0x23, 0x6f, 0xd5, 0xc4, 0xf2, 0xdb, 0x7e, 0x0e, 0xf5,
0x0f, 0xe6, 0x44, 0xed, 0x90, 0x75, 0xde, 0xf6, 0x9d, 0xaf, 0xc9, 0x02, 0x12, 0xda, 0xd1, 0xce, 0x0c, 0xb5, 0xd6, 0xf6, 0xdf, 0x0f, 0xc4, 0x72, 0x5e, 0xd9, 0xc8, 0x2a, 0xfa, 0x00, 0x36, 0x4f,
0x9b, 0xdb, 0x3d, 0xed, 0xb7, 0xdb, 0x3d, 0xed, 0xcf, 0xdb, 0x3d, 0xed, 0x7b, 0x53, 0xb1, 0xa3, 0x2e, 0x06, 0x87, 0xa3, 0x71, 0x69, 0x4d, 0xdb, 0x3f, 0x42, 0x4b, 0x46, 0xa4, 0xd3, 0xff, 0xda,
0xab, 0x2b, 0x13, 0x7f, 0x71, 0x3e, 0xff, 0x3b, 0x00, 0x00, 0xff, 0xff, 0xfb, 0x5f, 0xf2, 0x4d, 0xb4, 0xfb, 0xa0, 0x4f, 0xc4, 0x09, 0x59, 0xcf, 0x6e, 0xad, 0xdd, 0x26, 0x73, 0x48, 0x60, 0x47,
0x13, 0x09, 0x00, 0x00, 0xdb, 0x6f, 0x6e, 0x77, 0x95, 0x3f, 0x6e, 0x77, 0x95, 0xbf, 0x6f, 0x77, 0x95, 0x1f, 0x74, 0x81,
0x8e, 0xae, 0xae, 0x74, 0xf9, 0x07, 0xf8, 0xe4, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xc6, 0xb0,
0xf6, 0x08, 0x32, 0x0a, 0x00, 0x00,
} }
func (m *MetricMetadata) Marshal() (dAtA []byte, err error) { func (m *MetricMetadata) Marshal() (dAtA []byte, err error) {
@ -1271,6 +1393,52 @@ func (m *MetricMetadata) MarshalToSizedBuffer(dAtA []byte) (int, error) {
return len(dAtA) - i, nil return len(dAtA) - i, nil
} }
func (m *Metadata) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Metadata) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Metadata) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Unit) > 0 {
i -= len(m.Unit)
copy(dAtA[i:], m.Unit)
i = encodeVarintTypes(dAtA, i, uint64(len(m.Unit)))
i--
dAtA[i] = 0x1a
}
if len(m.Help) > 0 {
i -= len(m.Help)
copy(dAtA[i:], m.Help)
i = encodeVarintTypes(dAtA, i, uint64(len(m.Help)))
i--
dAtA[i] = 0x12
}
if m.Type != 0 {
i = encodeVarintTypes(dAtA, i, uint64(m.Type))
i--
dAtA[i] = 0x8
}
return len(dAtA) - i, nil
}
func (m *Sample) Marshal() (dAtA []byte, err error) { func (m *Sample) Marshal() (dAtA []byte, err error) {
size := m.Size() size := m.Size()
dAtA = make([]byte, size) dAtA = make([]byte, size)
@ -1630,6 +1798,20 @@ func (m *TimeSeries) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i -= len(m.XXX_unrecognized) i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized)
} }
if len(m.Metadatas) > 0 {
for iNdEx := len(m.Metadatas) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Metadatas[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x2a
}
}
if len(m.Histograms) > 0 { if len(m.Histograms) > 0 {
for iNdEx := len(m.Histograms) - 1; iNdEx >= 0; iNdEx-- { for iNdEx := len(m.Histograms) - 1; iNdEx >= 0; iNdEx-- {
{ {
@ -2032,6 +2214,29 @@ func (m *MetricMetadata) Size() (n int) {
return n return n
} }
func (m *Metadata) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Type != 0 {
n += 1 + sovTypes(uint64(m.Type))
}
l = len(m.Help)
if l > 0 {
n += 1 + l + sovTypes(uint64(l))
}
l = len(m.Unit)
if l > 0 {
n += 1 + l + sovTypes(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *Sample) Size() (n int) { func (m *Sample) Size() (n int) {
if m == nil { if m == nil {
return 0 return 0
@ -2223,6 +2428,12 @@ func (m *TimeSeries) Size() (n int) {
n += 1 + l + sovTypes(uint64(l)) n += 1 + l + sovTypes(uint64(l))
} }
} }
if len(m.Metadatas) > 0 {
for _, e := range m.Metadatas {
l = e.Size()
n += 1 + l + sovTypes(uint64(l))
}
}
if m.XXX_unrecognized != nil { if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized) n += len(m.XXX_unrecognized)
} }
@ -2548,6 +2759,140 @@ func (m *MetricMetadata) Unmarshal(dAtA []byte) error {
} }
return nil return nil
} }
func (m *Metadata) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Metadata: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Metadata: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType)
}
m.Type = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Type |= Metadata_MetricType(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Help", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthTypes
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthTypes
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Help = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Unit", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthTypes
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthTypes
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Unit = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipTypes(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthTypes
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *Sample) Unmarshal(dAtA []byte) error { func (m *Sample) Unmarshal(dAtA []byte) error {
l := len(dAtA) l := len(dAtA)
iNdEx := 0 iNdEx := 0
@ -3526,6 +3871,40 @@ func (m *TimeSeries) Unmarshal(dAtA []byte) error {
return err return err
} }
iNdEx = postIndex iNdEx = postIndex
case 5:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Metadatas", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthTypes
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthTypes
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Metadatas = append(m.Metadatas, Metadata{})
if err := m.Metadatas[len(m.Metadatas)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default: default:
iNdEx = preIndex iNdEx = preIndex
skippy, err := skipTypes(dAtA[iNdEx:]) skippy, err := skipTypes(dAtA[iNdEx:])

View file

@ -38,6 +38,26 @@ message MetricMetadata {
string unit = 5; string unit = 5;
} }
message Metadata {
// The protobuf style guide recommends setting enum values as uppercase.
// Having them lowercase would save us an instruction every time we convert
// from textparse.MetricType.
// https://protobuf.dev/programming-guides/style/#enums
enum MetricType {
UNKNOWN = 0;
COUNTER = 1;
GAUGE = 2;
HISTOGRAM = 3;
GAUGEHISTOGRAM = 4;
SUMMARY = 5;
INFO = 6;
STATESET = 7;
}
MetricType type = 1;
string help = 2;
string unit = 3;
}
message Sample { message Sample {
double value = 1; double value = 1;
// timestamp is in ms format, see model/timestamp/timestamp.go for // timestamp is in ms format, see model/timestamp/timestamp.go for
@ -127,6 +147,7 @@ message TimeSeries {
repeated Sample samples = 2 [(gogoproto.nullable) = false]; repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false]; repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false];
repeated Histogram histograms = 4 [(gogoproto.nullable) = false]; repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
repeated Metadata metadatas = 5 [(gogoproto.nullable) = false];
} }
message Label { message Label {

View file

@ -30,6 +30,7 @@ import (
"github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/model/textparse" "github.com/prometheus/prometheus/model/textparse"
"github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
@ -624,6 +625,14 @@ func exemplarProtoToExemplar(ep prompb.Exemplar) exemplar.Exemplar {
} }
} }
func metadataProtoToMetadata(mp prompb.Metadata) metadata.Metadata {
return metadata.Metadata{
Type: metricTypeFromProtoEquivalent(mp.Type),
Unit: mp.Unit,
Help: mp.Help,
}
}
// HistogramProtoToHistogram extracts a (normal integer) Histogram from the // HistogramProtoToHistogram extracts a (normal integer) Histogram from the
// provided proto message. The caller has to make sure that the proto message // provided proto message. The caller has to make sure that the proto message
// represents an integer histogram and not a float histogram, or it panics. // represents an integer histogram and not a float histogram, or it panics.
@ -799,6 +808,21 @@ func metricTypeToMetricTypeProto(t textparse.MetricType) prompb.MetricMetadata_M
return prompb.MetricMetadata_MetricType(v) return prompb.MetricMetadata_MetricType(v)
} }
func metricTypeToProtoEquivalent(t textparse.MetricType) prompb.Metadata_MetricType {
mt := strings.ToUpper(string(t))
v, ok := prompb.Metadata_MetricType_value[mt]
if !ok {
return prompb.Metadata_UNKNOWN
}
return prompb.Metadata_MetricType(v)
}
func metricTypeFromProtoEquivalent(t prompb.Metadata_MetricType) textparse.MetricType {
mt := strings.ToLower(t.String())
return textparse.MetricType(mt) // TODO(@tpaschalis) a better way for this?
}
// DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling // DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling
// snappy decompression. // snappy decompression.
func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) { func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) {

View file

@ -410,6 +410,7 @@ type QueueManager struct {
relabelConfigs []*relabel.Config relabelConfigs []*relabel.Config
sendExemplars bool sendExemplars bool
sendNativeHistograms bool sendNativeHistograms bool
sendMetadata bool
watcher *wlog.Watcher watcher *wlog.Watcher
metadataWatcher *MetadataWatcher metadataWatcher *MetadataWatcher
@ -459,6 +460,7 @@ func NewQueueManager(
sm ReadyScrapeManager, sm ReadyScrapeManager,
enableExemplarRemoteWrite bool, enableExemplarRemoteWrite bool,
enableNativeHistogramRemoteWrite bool, enableNativeHistogramRemoteWrite bool,
enableMetadataRemoteWrite bool,
) *QueueManager { ) *QueueManager {
if logger == nil { if logger == nil {
logger = log.NewNopLogger() logger = log.NewNopLogger()
@ -481,6 +483,7 @@ func NewQueueManager(
storeClient: client, storeClient: client,
sendExemplars: enableExemplarRemoteWrite, sendExemplars: enableExemplarRemoteWrite,
sendNativeHistograms: enableNativeHistogramRemoteWrite, sendNativeHistograms: enableNativeHistogramRemoteWrite,
sendMetadata: enableMetadataRemoteWrite,
seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels), seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels),
seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int), seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
@ -500,7 +503,7 @@ func NewQueueManager(
highestRecvTimestamp: highestRecvTimestamp, highestRecvTimestamp: highestRecvTimestamp,
} }
t.watcher = wlog.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite) t.watcher = wlog.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite, enableMetadataRemoteWrite)
if t.mcfg.Send { if t.mcfg.Send {
t.metadataWatcher = NewMetadataWatcher(logger, sm, client.Name(), t, t.mcfg.SendInterval, flushDeadline) t.metadataWatcher = NewMetadataWatcher(logger, sm, client.Name(), t, t.mcfg.SendInterval, flushDeadline)
} }
@ -774,7 +777,7 @@ outer:
} }
func (t *QueueManager) AppendWALMetadata(ms []record.RefMetadata) bool { func (t *QueueManager) AppendWALMetadata(ms []record.RefMetadata) bool {
if !t.mcfg.SendFromWAL { if !t.sendMetadata {
return true return true
} }
@ -784,16 +787,16 @@ outer:
lbls, ok := t.seriesLabels[m.Ref] lbls, ok := t.seriesLabels[m.Ref]
if !ok { if !ok {
t.metrics.droppedMetadataTotal.Inc() t.metrics.droppedMetadataTotal.Inc()
// Track dropped exemplars in the same EWMA for sharding calc. // Track dropped metadata in the same EWMA for sharding calc.
t.dataDropped.incr(1) t.dataDropped.incr(1)
if _, ok := t.droppedSeries[m.Ref]; !ok { if _, ok := t.droppedSeries[m.Ref]; !ok {
level.Info(t.logger).Log("msg", "Dropped exemplar for series that was not explicitly dropped via relabelling", "ref", m.Ref) level.Info(t.logger).Log("msg", "Dropped metadata for series that was not explicitly dropped via relabelling", "ref", m.Ref)
} }
t.seriesMtx.Unlock() t.seriesMtx.Unlock()
continue continue
} }
t.seriesMtx.Unlock() t.seriesMtx.Unlock()
// This will only loop if the queues are being resharded.
backoff := t.cfg.MinBackoff backoff := t.cfg.MinBackoff
for { for {
select { select {
@ -802,12 +805,12 @@ outer:
default: default:
} }
if t.shards.enqueue(m.Ref, timeSeries{ if t.shards.enqueue(m.Ref, timeSeries{
seriesLabels: lbls,
sType: tMetadata, sType: tMetadata,
seriesLabels: lbls, // TODO (@tpaschalis) We take the labels here so we can refer to the metric's name on populateTimeSeries. There's probably a better way to do that.
metadata: &metadata.Metadata{ metadata: &metadata.Metadata{
Type: record.ToTextparseMetricType(m.Type),
Help: m.Help, Help: m.Help,
Unit: m.Unit, Unit: m.Unit,
Type: record.ToTextparseMetricType(m.Type),
}, },
}) { }) {
continue outer continue outer
@ -1180,6 +1183,7 @@ func (s *shards) start(n int) {
s.samplesDroppedOnHardShutdown.Store(0) s.samplesDroppedOnHardShutdown.Store(0)
s.exemplarsDroppedOnHardShutdown.Store(0) s.exemplarsDroppedOnHardShutdown.Store(0)
s.histogramsDroppedOnHardShutdown.Store(0) s.histogramsDroppedOnHardShutdown.Store(0)
s.metadataDroppedOnHardShutdown.Store(0)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
go s.runShard(hardShutdownCtx, i, newQueues[i]) go s.runShard(hardShutdownCtx, i, newQueues[i])
} }
@ -1416,6 +1420,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
pBuf = proto.NewBuffer(nil) pBuf = proto.NewBuffer(nil)
buf []byte buf []byte
) )
// TODO(@tpaschalis) Should we also raise the max if we have WAL metadata?
if s.qm.sendExemplars { if s.qm.sendExemplars {
max += int(float64(max) * 0.1) max += int(float64(max) * 0.1)
} }
@ -1429,11 +1434,6 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
} }
} }
// TODO(@tpaschalis) Since metadata might appear infrequently, I'm not sure
// whether it's cheaper to pre-allocate, or initialize the slice as empty
// and append whenever metadata is encountered.
pendingMetadata := make([]prompb.MetricMetadata, s.qm.mcfg.MaxSamplesPerSend)
timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline)) timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
stop := func() { stop := func() {
if !timer.Stop() { if !timer.Stop() {
@ -1471,10 +1471,10 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
if !ok { if !ok {
return return
} }
nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata := s.populateTimeSeries(batch, pendingData, pendingMetadata) nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata := s.populateTimeSeries(batch, pendingData)
queue.ReturnForReuse(batch) queue.ReturnForReuse(batch)
n := nPendingSamples + nPendingExemplars + nPendingHistograms n := nPendingSamples + nPendingExemplars + nPendingHistograms + nPendingMetadata
s.sendSamples(ctx, pendingData[:n], pendingMetadata[:nPendingMetadata], nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, pBuf, &buf) s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, pBuf, &buf)
stop() stop()
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
@ -1482,12 +1482,11 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
case <-timer.C: case <-timer.C:
batch := queue.Batch() batch := queue.Batch()
if len(batch) > 0 { if len(batch) > 0 {
nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata := s.populateTimeSeries(batch, pendingData, pendingMetadata) nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata := s.populateTimeSeries(batch, pendingData)
n := nPendingSamples + nPendingExemplars + nPendingHistograms n := nPendingSamples + nPendingExemplars + nPendingHistograms + nPendingMetadata
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms) "exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms, "metadata", nPendingMetadata)
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars", nPendingExemplars, "shard", shardNum) s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, pBuf, &buf)
s.sendSamples(ctx, pendingData[:n], pendingMetadata[:nPendingMetadata], nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, pBuf, &buf)
} }
queue.ReturnForReuse(batch) queue.ReturnForReuse(batch)
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
@ -1495,7 +1494,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
} }
} }
func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, pendingMetadata []prompb.MetricMetadata) (int, int, int, int) { func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries) (int, int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata int var nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata int
for nPending, d := range batch { for nPending, d := range batch {
pendingData[nPending].Samples = pendingData[nPending].Samples[:0] pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
@ -1505,6 +1504,9 @@ func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.Tim
if s.qm.sendNativeHistograms { if s.qm.sendNativeHistograms {
pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0] pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
} }
if s.qm.sendMetadata {
pendingData[nPending].Metadatas = pendingData[nPending].Metadatas[:0]
}
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff) // Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll // retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
@ -1531,19 +1533,20 @@ func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.Tim
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram)) pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram))
nPendingHistograms++ nPendingHistograms++
case tMetadata: case tMetadata:
pendingMetadata[nPendingMetadata].MetricFamilyName = d.seriesLabels.Get(labels.MetricName) pendingData[nPending].Metadatas = append(pendingData[nPending].Metadatas, prompb.Metadata{
pendingMetadata[nPendingMetadata].Type = metricTypeToMetricTypeProto(d.metadata.Type) Type: metricTypeToProtoEquivalent(d.metadata.Type),
pendingMetadata[nPendingMetadata].Help = d.metadata.Help Help: d.metadata.Help,
pendingMetadata[nPendingMetadata].Unit = d.metadata.Unit Unit: d.metadata.Unit,
})
nPendingMetadata++ nPendingMetadata++
} }
} }
return nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata return nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata
} }
func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte) { func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte) {
begin := time.Now() begin := time.Now()
err := s.sendSamplesWithBackoff(ctx, samples, metadata, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf) err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf)
if err != nil { if err != nil {
level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err) level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err)
s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount)) s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount))
@ -1570,12 +1573,9 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, m
} }
// sendSamples to the remote storage with backoff for recoverable errors. // sendSamples to the remote storage with backoff for recoverable errors.
// TODO(@tpaschalis) If we're going to reuse this method for metadata as well, func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte) error {
// we need a better name.
// TODO(@tpaschalis) Add metadata-specific metrics and attributes.
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte) error {
// Build the WriteRequest with no metadata. // Build the WriteRequest with no metadata.
req, highest, err := buildWriteRequest(samples, metadata, pBuf, *buf) req, highest, err := buildWriteRequest(samples, nil, pBuf, *buf)
if err != nil { if err != nil {
// Failing to build the write request is non-recoverable, since it will // Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails. // only error if marshaling the proto to bytes fails.
@ -1606,6 +1606,9 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
if histogramCount > 0 { if histogramCount > 0 {
span.SetAttributes(attribute.Int("histograms", histogramCount)) span.SetAttributes(attribute.Int("histograms", histogramCount))
} }
if metadataCount > 0 {
span.SetAttributes(attribute.Int("metadata", metadataCount))
}
begin := time.Now() begin := time.Now()
s.qm.metrics.samplesTotal.Add(float64(sampleCount)) s.qm.metrics.samplesTotal.Add(float64(sampleCount))

View file

@ -66,12 +66,14 @@ func TestSampleDelivery(t *testing.T) {
exemplars bool exemplars bool
histograms bool histograms bool
floatHistograms bool floatHistograms bool
metadata bool
}{ }{
{samples: true, exemplars: false, histograms: false, floatHistograms: false, name: "samples only"}, {samples: true, exemplars: false, histograms: false, floatHistograms: false, metadata: false, name: "samples only"},
{samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "samples, exemplars, and histograms"}, {samples: true, exemplars: true, histograms: true, floatHistograms: true, metadata: false, name: "samples, exemplars, and histograms"},
{samples: false, exemplars: true, histograms: false, floatHistograms: false, name: "exemplars only"}, {samples: false, exemplars: true, histograms: false, floatHistograms: false, metadata: false, name: "exemplars only"},
{samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only"}, {samples: false, exemplars: false, histograms: true, floatHistograms: false, metadata: false, name: "histograms only"},
{samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only"}, {samples: false, exemplars: false, histograms: false, floatHistograms: true, metadata: false, name: "float histograms only"},
{samples: false, exemplars: false, histograms: false, floatHistograms: false, metadata: true, name: "metadata only"},
} }
// Let's create an even number of send batches so we don't run into the // Let's create an even number of send batches so we don't run into the
@ -92,6 +94,7 @@ func TestSampleDelivery(t *testing.T) {
writeConfig.QueueConfig = queueConfig writeConfig.QueueConfig = queueConfig
writeConfig.SendExemplars = true writeConfig.SendExemplars = true
writeConfig.SendNativeHistograms = true writeConfig.SendNativeHistograms = true
writeConfig.SendWALMetadata = true
conf := &config.Config{ conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig, GlobalConfig: config.DefaultGlobalConfig,
@ -108,6 +111,7 @@ func TestSampleDelivery(t *testing.T) {
exemplars []record.RefExemplar exemplars []record.RefExemplar
histograms []record.RefHistogramSample histograms []record.RefHistogramSample
floatHistograms []record.RefFloatHistogramSample floatHistograms []record.RefFloatHistogramSample
metadata []record.RefMetadata
) )
// Generates same series in both cases. // Generates same series in both cases.
@ -123,6 +127,9 @@ func TestSampleDelivery(t *testing.T) {
if tc.floatHistograms { if tc.floatHistograms {
_, floatHistograms, series = createHistograms(n, n, true) _, floatHistograms, series = createHistograms(n, n, true)
} }
if tc.metadata {
metadata, series = createMetadata(n)
}
// Apply new config. // Apply new config.
queueConfig.Capacity = len(samples) queueConfig.Capacity = len(samples)
@ -142,10 +149,12 @@ func TestSampleDelivery(t *testing.T) {
c.expectExemplars(exemplars[:len(exemplars)/2], series) c.expectExemplars(exemplars[:len(exemplars)/2], series)
c.expectHistograms(histograms[:len(histograms)/2], series) c.expectHistograms(histograms[:len(histograms)/2], series)
c.expectFloatHistograms(floatHistograms[:len(floatHistograms)/2], series) c.expectFloatHistograms(floatHistograms[:len(floatHistograms)/2], series)
c.expectTsMetadata(metadata[:len(metadata)/2], series)
qm.Append(samples[:len(samples)/2]) qm.Append(samples[:len(samples)/2])
qm.AppendExemplars(exemplars[:len(exemplars)/2]) qm.AppendExemplars(exemplars[:len(exemplars)/2])
qm.AppendHistograms(histograms[:len(histograms)/2]) qm.AppendHistograms(histograms[:len(histograms)/2])
qm.AppendFloatHistograms(floatHistograms[:len(floatHistograms)/2]) qm.AppendFloatHistograms(floatHistograms[:len(floatHistograms)/2])
qm.AppendWALMetadata(metadata[:len(metadata)/2])
c.waitForExpectedData(t) c.waitForExpectedData(t)
// Send second half of data. // Send second half of data.
@ -153,10 +162,12 @@ func TestSampleDelivery(t *testing.T) {
c.expectExemplars(exemplars[len(exemplars)/2:], series) c.expectExemplars(exemplars[len(exemplars)/2:], series)
c.expectHistograms(histograms[len(histograms)/2:], series) c.expectHistograms(histograms[len(histograms)/2:], series)
c.expectFloatHistograms(floatHistograms[len(floatHistograms)/2:], series) c.expectFloatHistograms(floatHistograms[len(floatHistograms)/2:], series)
c.expectTsMetadata(metadata[len(metadata)/2:], series)
qm.Append(samples[len(samples)/2:]) qm.Append(samples[len(samples)/2:])
qm.AppendExemplars(exemplars[len(exemplars)/2:]) qm.AppendExemplars(exemplars[len(exemplars)/2:])
qm.AppendHistograms(histograms[len(histograms)/2:]) qm.AppendHistograms(histograms[len(histograms)/2:])
qm.AppendFloatHistograms(floatHistograms[len(floatHistograms)/2:]) qm.AppendFloatHistograms(floatHistograms[len(floatHistograms)/2:])
qm.AppendWALMetadata(metadata[len(metadata)/2:])
c.waitForExpectedData(t) c.waitForExpectedData(t)
}) })
} }
@ -171,7 +182,7 @@ func TestMetadataDelivery(t *testing.T) {
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.Start() m.Start()
defer m.Stop() defer m.Stop()
@ -204,8 +215,6 @@ func TestWALMetadataDelivery(t *testing.T) {
cfg := config.DefaultQueueConfig cfg := config.DefaultQueueConfig
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond) cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
cfg.MaxShards = 1 cfg.MaxShards = 1
mcfg := config.DefaultMetadataConfig
mcfg.SendFromWAL = true
writeConfig := baseRemoteWriteConfig("http://test-storage.com") writeConfig := baseRemoteWriteConfig("http://test-storage.com")
writeConfig.QueueConfig = cfg writeConfig.QueueConfig = cfg
@ -223,14 +232,13 @@ func TestWALMetadataDelivery(t *testing.T) {
hash, err := toHash(writeConfig) hash, err := toHash(writeConfig)
require.NoError(t, err) require.NoError(t, err)
qm := s.rws.queues[hash] qm := s.rws.queues[hash]
qm.mcfg.SendFromWAL = true qm.sendMetadata = true
qm.mcfg.MaxSamplesPerSend = 10
c := NewTestWriteClient() c := NewTestWriteClient()
qm.SetClient(c) qm.SetClient(c)
qm.StoreSeries(series, 0) qm.StoreSeries(series, 0)
c.expectMetadata(metadata, series) c.expectTsMetadata(metadata, series)
qm.AppendWALMetadata(metadata) qm.AppendWALMetadata(metadata)
c.waitForExpectedData(t) c.waitForExpectedData(t)
@ -250,7 +258,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
m.Start() m.Start()
defer m.Stop() defer m.Stop()
@ -292,7 +300,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
m.Start() m.Start()
@ -312,7 +320,7 @@ func TestShutdown(t *testing.T) {
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
samples, series := createTimeseries(n, n) samples, series := createTimeseries(n, n)
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
@ -350,7 +358,7 @@ func TestSeriesReset(t *testing.T) {
cfg := config.DefaultQueueConfig cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
for i := 0; i < numSegments; i++ { for i := 0; i < numSegments; i++ {
series := []record.RefSeries{} series := []record.RefSeries{}
for j := 0; j < numSeries; j++ { for j := 0; j < numSeries; j++ {
@ -379,7 +387,7 @@ func TestReshard(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
m.Start() m.Start()
@ -415,7 +423,7 @@ func TestReshardRaceWithStop(*testing.T) {
go func() { go func() {
for { for {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.Start() m.Start()
h.Unlock() h.Unlock()
h.Lock() h.Lock()
@ -450,7 +458,7 @@ func TestReshardPartialBatch(t *testing.T) {
cfg.BatchSendDeadline = model.Duration(batchSendDeadline) cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
m.Start() m.Start()
@ -495,7 +503,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
m.Start() m.Start()
defer m.Stop() defer m.Stop()
@ -522,7 +530,7 @@ func TestReleaseNoninternedString(t *testing.T) {
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
c := NewTestWriteClient() c := NewTestWriteClient()
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.Start() m.Start()
defer m.Stop() defer m.Stop()
@ -569,7 +577,7 @@ func TestShouldReshard(t *testing.T) {
for _, c := range cases { for _, c := range cases {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
client := NewTestWriteClient() client := NewTestWriteClient()
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.numShards = c.startingShards m.numShards = c.startingShards
m.dataIn.incr(c.samplesIn) m.dataIn.incr(c.samplesIn)
m.dataOut.incr(c.samplesOut) m.dataOut.incr(c.samplesOut)
@ -697,7 +705,7 @@ func createMetadata(numMetadata int) ([]record.RefMetadata, []record.RefSeries)
}) })
series = append(series, record.RefSeries{ series = append(series, record.RefSeries{
Ref: chunks.HeadSeriesRef(i), Ref: chunks.HeadSeriesRef(i),
Labels: labels.Labels{{Name: "__name__", Value: name}}, Labels: labels.Labels{{Name: "__name__", Value: name}, {Name: "foo", Value: "bar"}},
}) })
} }
@ -717,8 +725,9 @@ type TestWriteClient struct {
receivedFloatHistograms map[string][]prompb.Histogram receivedFloatHistograms map[string][]prompb.Histogram
expectedHistograms map[string][]prompb.Histogram expectedHistograms map[string][]prompb.Histogram
expectedFloatHistograms map[string][]prompb.Histogram expectedFloatHistograms map[string][]prompb.Histogram
receivedTsMetadata map[string][]prompb.Metadata
expectedTsMetadata map[string][]prompb.Metadata
receivedMetadata map[string][]prompb.MetricMetadata receivedMetadata map[string][]prompb.MetricMetadata
expectedMetadata map[string][]prompb.MetricMetadata
writesReceived int writesReceived int
withWaitGroup bool withWaitGroup bool
wg sync.WaitGroup wg sync.WaitGroup
@ -811,19 +820,19 @@ func (c *TestWriteClient) expectFloatHistograms(fhs []record.RefFloatHistogramSa
c.wg.Add(len(fhs)) c.wg.Add(len(fhs))
} }
func (c *TestWriteClient) expectMetadata(ms []record.RefMetadata, series []record.RefSeries) { func (c *TestWriteClient) expectTsMetadata(ms []record.RefMetadata, series []record.RefSeries) {
if !c.withWaitGroup { if !c.withWaitGroup {
return return
} }
c.mtx.Lock() c.mtx.Lock()
defer c.mtx.Unlock() defer c.mtx.Unlock()
c.expectedMetadata = map[string][]prompb.MetricMetadata{} c.expectedTsMetadata = map[string][]prompb.Metadata{}
c.receivedMetadata = map[string][]prompb.MetricMetadata{} c.receivedTsMetadata = map[string][]prompb.Metadata{}
for _, m := range ms { for _, m := range ms {
seriesName := getSeriesNameFromRef(series[m.Ref]) seriesName := getSeriesNameFromRef(series[m.Ref])
c.expectedMetadata[seriesName] = append(c.expectedMetadata[seriesName], MetadataToMetadataProto(seriesName, m)) c.expectedTsMetadata[seriesName] = append(c.expectedTsMetadata[seriesName], prompb.Metadata{Type: metricTypeToProtoEquivalent(record.ToTextparseMetricType(m.Type)), Unit: m.Unit, Help: m.Help})
} }
c.wg.Add(len(ms)) c.wg.Add(len(ms))
} }
@ -847,8 +856,8 @@ func (c *TestWriteClient) waitForExpectedData(tb testing.TB) {
for ts, expectedFloatHistogram := range c.expectedFloatHistograms { for ts, expectedFloatHistogram := range c.expectedFloatHistograms {
require.Equal(tb, expectedFloatHistogram, c.receivedFloatHistograms[ts], ts) require.Equal(tb, expectedFloatHistogram, c.receivedFloatHistograms[ts], ts)
} }
for ts, expectedMetadata := range c.expectedMetadata { for ts, expectedMetadata := range c.expectedTsMetadata {
require.Equal(tb, expectedMetadata, c.receivedMetadata[ts], ts) require.Equal(tb, expectedMetadata, c.receivedTsMetadata[ts], ts)
} }
} }
@ -890,18 +899,20 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte) error {
} else { } else {
c.receivedHistograms[seriesName] = append(c.receivedHistograms[seriesName], histogram) c.receivedHistograms[seriesName] = append(c.receivedHistograms[seriesName], histogram)
} }
}
for _, metadata := range ts.Metadatas {
count++
c.receivedTsMetadata[seriesName] = append(c.receivedTsMetadata[seriesName], metadata)
} }
} }
for _, m := range reqProto.Metadata {
count++
c.receivedMetadata[m.MetricFamilyName] = append(c.receivedMetadata[m.MetricFamilyName], m)
}
if c.withWaitGroup { if c.withWaitGroup {
c.wg.Add(-count) c.wg.Add(-count)
} }
for _, m := range reqProto.Metadata {
c.receivedMetadata[m.MetricFamilyName] = append(c.receivedMetadata[m.MetricFamilyName], m)
}
c.writesReceived++ c.writesReceived++
return nil return nil
@ -989,7 +1000,7 @@ func BenchmarkSampleSend(b *testing.B) {
dir := b.TempDir() dir := b.TempDir()
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
// These should be received by the client. // These should be received by the client.
@ -1035,7 +1046,7 @@ func BenchmarkStartup(b *testing.B) {
c := NewTestBlockedWriteClient() c := NewTestBlockedWriteClient()
m := NewQueueManager(metrics, nil, nil, logger, dir, m := NewQueueManager(metrics, nil, nil, logger, dir,
newEWMARate(ewmaWeight, shardUpdateDuration), newEWMARate(ewmaWeight, shardUpdateDuration),
cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false) cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.watcher.SetStartTime(timestamp.Time(math.MaxInt64)) m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
m.watcher.MaxSegment = segments[len(segments)-2] m.watcher.MaxSegment = segments[len(segments)-2]
err := m.watcher.Run() err := m.watcher.Run()
@ -1118,7 +1129,7 @@ func TestCalculateDesiredShards(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
// Need to start the queue manager so the proper metrics are initialized. // Need to start the queue manager so the proper metrics are initialized.
// However we can stop it right away since we don't need to do any actual // However we can stop it right away since we don't need to do any actual
@ -1195,7 +1206,7 @@ func TestCalculateDesiredShardsDetail(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
for _, tc := range []struct { for _, tc := range []struct {
name string name string

View file

@ -196,6 +196,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
rws.scraper, rws.scraper,
rwConf.SendExemplars, rwConf.SendExemplars,
rwConf.SendNativeHistograms, rwConf.SendNativeHistograms,
rwConf.SendWALMetadata,
) )
// Keep track of which queues are new so we know which to start. // Keep track of which queues are new so we know which to start.
newHashes = append(newHashes, hash) newHashes = append(newHashes, hash)

View file

@ -145,6 +145,11 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
return err return err
} }
} }
for _, mp := range ts.Metadatas {
m := metadataProtoToMetadata(mp)
app.UpdateMetadata(0, labels, m)
}
} }
if outOfOrderExemplarErrs > 0 { if outOfOrderExemplarErrs > 0 {

View file

@ -78,6 +78,7 @@ type Watcher struct {
lastCheckpoint string lastCheckpoint string
sendExemplars bool sendExemplars bool
sendHistograms bool sendHistograms bool
sendMetadata bool
metrics *WatcherMetrics metrics *WatcherMetrics
readerMetrics *LiveReaderMetrics readerMetrics *LiveReaderMetrics
@ -148,7 +149,7 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
} }
// NewWatcher creates a new WAL watcher for a given WriteTo. // NewWatcher creates a new WAL watcher for a given WriteTo.
func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, dir string, sendExemplars, sendHistograms bool) *Watcher { func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, dir string, sendExemplars, sendHistograms, sendMetadata bool) *Watcher {
if logger == nil { if logger == nil {
logger = log.NewNopLogger() logger = log.NewNopLogger()
} }
@ -603,6 +604,9 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
} }
case record.Metadata: case record.Metadata:
if !w.sendHistograms || !tail {
break
}
meta, err := dec.Metadata(rec, metadata[:0]) meta, err := dec.Metadata(rec, metadata[:0])
if err != nil { if err != nil {
w.recordDecodeFailsMetric.Inc() w.recordDecodeFailsMetric.Inc()