Revert "Change logic around AppendWALMetadata; introduce StoreMetadata similar to StoreSeries"

This reverts commit bd41c24e42.
This commit is contained in:
Paschalis Tsilias 2023-04-28 10:23:21 +03:00
parent bd41c24e42
commit 3acfa1bbfc
9 changed files with 323 additions and 164 deletions

View file

@ -776,7 +776,7 @@ type TimeSeries struct {
Samples []Sample `protobuf:"bytes,2,rep,name=samples,proto3" json:"samples"`
Exemplars []Exemplar `protobuf:"bytes,3,rep,name=exemplars,proto3" json:"exemplars"`
Histograms []Histogram `protobuf:"bytes,4,rep,name=histograms,proto3" json:"histograms"`
Metadata Metadata `protobuf:"bytes,5,opt,name=metadata,proto3" json:"metadata"`
Metadatas []Metadata `protobuf:"bytes,5,rep,name=metadatas,proto3" json:"metadatas"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -843,11 +843,11 @@ func (m *TimeSeries) GetHistograms() []Histogram {
return nil
}
func (m *TimeSeries) GetMetadata() Metadata {
func (m *TimeSeries) GetMetadatas() []Metadata {
if m != nil {
return m.Metadata
return m.Metadatas
}
return Metadata{}
return nil
}
type Label struct {
@ -1266,78 +1266,78 @@ func init() {
func init() { proto.RegisterFile("types.proto", fileDescriptor_d938547f84707355) }
var fileDescriptor_d938547f84707355 = []byte{
// 1130 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x56, 0xdb, 0x6e, 0xdb, 0x46,
0x13, 0x36, 0x49, 0x89, 0x12, 0x47, 0x07, 0xd3, 0xfb, 0x3b, 0xf9, 0x59, 0xa3, 0x71, 0x54, 0x02,
0x69, 0x85, 0xa2, 0x90, 0x11, 0xa7, 0x28, 0x1a, 0x34, 0x28, 0x60, 0x3b, 0xf2, 0x01, 0x35, 0x25,
0x64, 0x25, 0xa3, 0x4d, 0x6f, 0x84, 0xb5, 0xb4, 0x96, 0x88, 0xf0, 0x54, 0xee, 0x2a, 0xb0, 0xfa,
0x1e, 0xbd, 0xeb, 0x45, 0x5f, 0xa1, 0x6f, 0x11, 0xa0, 0x37, 0xed, 0x0b, 0x14, 0x85, 0x9f, 0xa4,
0xd8, 0xe5, 0x51, 0x76, 0x02, 0x34, 0xbd, 0xea, 0xdd, 0xce, 0xcc, 0x37, 0x3b, 0xdf, 0x0e, 0xe7,
0x40, 0x68, 0xf0, 0x55, 0x44, 0x59, 0x2f, 0x8a, 0x43, 0x1e, 0x22, 0x88, 0xe2, 0xd0, 0xa7, 0x7c,
0x41, 0x97, 0x6c, 0x67, 0x7b, 0x1e, 0xce, 0x43, 0xa9, 0xde, 0x13, 0xa7, 0x04, 0x61, 0xff, 0xac,
0x42, 0xdb, 0xa1, 0x3c, 0x76, 0xa7, 0x0e, 0xe5, 0x64, 0x46, 0x38, 0x41, 0x4f, 0xa1, 0x22, 0xee,
0xb0, 0x94, 0x8e, 0xd2, 0x6d, 0xef, 0x3f, 0xea, 0x15, 0x77, 0xf4, 0xd6, 0x91, 0xa9, 0x38, 0x5e,
0x45, 0x14, 0x4b, 0x17, 0xf4, 0x19, 0x20, 0x5f, 0xea, 0x26, 0x57, 0xc4, 0x77, 0xbd, 0xd5, 0x24,
0x20, 0x3e, 0xb5, 0xd4, 0x8e, 0xd2, 0x35, 0xb0, 0x99, 0x58, 0x8e, 0xa5, 0x61, 0x40, 0x7c, 0x8a,
0x10, 0x54, 0x16, 0xd4, 0x8b, 0xac, 0x8a, 0xb4, 0xcb, 0xb3, 0xd0, 0x2d, 0x03, 0x97, 0x5b, 0xd5,
0x44, 0x27, 0xce, 0xf6, 0x0a, 0xa0, 0x88, 0x84, 0x1a, 0x50, 0xbb, 0x18, 0x7c, 0x33, 0x18, 0x7e,
0x3b, 0x30, 0x37, 0x84, 0x70, 0x34, 0xbc, 0x18, 0x8c, 0xfb, 0xd8, 0x54, 0x90, 0x01, 0xd5, 0x93,
0x83, 0x8b, 0x93, 0xbe, 0xa9, 0xa2, 0x16, 0x18, 0xa7, 0x67, 0xa3, 0xf1, 0xf0, 0x04, 0x1f, 0x38,
0xa6, 0x86, 0x10, 0xb4, 0xa5, 0xa5, 0xd0, 0x55, 0x84, 0xeb, 0xe8, 0xc2, 0x71, 0x0e, 0xf0, 0x4b,
0xb3, 0x8a, 0xea, 0x50, 0x39, 0x1b, 0x1c, 0x0f, 0x4d, 0x1d, 0x35, 0xa1, 0x3e, 0x1a, 0x1f, 0x8c,
0xfb, 0xa3, 0xfe, 0xd8, 0xac, 0xd9, 0x37, 0x0a, 0xd4, 0xf3, 0xc4, 0x3c, 0x59, 0x4b, 0xcc, 0xc3,
0x5b, 0x89, 0x79, 0x47, 0x4a, 0xb2, 0x47, 0xaa, 0x6f, 0x79, 0xa4, 0xf6, 0xdf, 0x78, 0xe4, 0x33,
0xd0, 0x47, 0xc4, 0x8f, 0x3c, 0x8a, 0xb6, 0xa1, 0xfa, 0x9a, 0x78, 0xcb, 0xe4, 0x89, 0x0a, 0x4e,
0x04, 0xf4, 0x21, 0x18, 0xdc, 0xf5, 0x29, 0xe3, 0xc4, 0x4f, 0xde, 0xa1, 0xe1, 0x42, 0x61, 0x87,
0x50, 0xef, 0x5f, 0x53, 0x3f, 0xf2, 0x48, 0x8c, 0xf6, 0x40, 0xf7, 0xc8, 0x25, 0xf5, 0x98, 0xa5,
0x74, 0xb4, 0x6e, 0x63, 0x7f, 0xab, 0x9c, 0xa3, 0x73, 0x61, 0x39, 0xac, 0xbc, 0xf9, 0xf3, 0xe1,
0x06, 0x4e, 0x61, 0x45, 0x40, 0xf5, 0x9d, 0x01, 0xb5, 0xdb, 0x01, 0x7f, 0xab, 0x82, 0x71, 0xea,
0x32, 0x1e, 0xce, 0x63, 0xe2, 0xa3, 0x07, 0x60, 0x4c, 0xc3, 0x65, 0xc0, 0x27, 0x6e, 0xc0, 0x25,
0xed, 0xca, 0xe9, 0x06, 0xae, 0x4b, 0xd5, 0x59, 0xc0, 0xd1, 0x47, 0xd0, 0x48, 0xcc, 0x57, 0x5e,
0x48, 0x78, 0x12, 0xe6, 0x74, 0x03, 0x83, 0x54, 0x1e, 0x0b, 0x1d, 0x32, 0x41, 0x63, 0x4b, 0x5f,
0xc6, 0x51, 0xb0, 0x38, 0xa2, 0xfb, 0xa0, 0xb3, 0xe9, 0x82, 0xfa, 0x44, 0x96, 0xe6, 0x16, 0x4e,
0x25, 0xf4, 0x08, 0xda, 0x3f, 0xd2, 0x38, 0x9c, 0xf0, 0x45, 0x4c, 0xd9, 0x22, 0xf4, 0x66, 0xb2,
0x4c, 0x15, 0xdc, 0x12, 0xda, 0x71, 0xa6, 0x44, 0x1f, 0xa7, 0xb0, 0x82, 0x97, 0x2e, 0x79, 0x29,
0xb8, 0x29, 0xf4, 0x47, 0x19, 0xb7, 0x4f, 0xc1, 0x2c, 0xe1, 0x12, 0x82, 0x35, 0x49, 0x50, 0xc1,
0xed, 0x1c, 0x99, 0x90, 0x3c, 0x82, 0x76, 0x40, 0xe7, 0x84, 0xbb, 0xaf, 0xe9, 0x84, 0x45, 0x24,
0x60, 0x56, 0x5d, 0x66, 0xf8, 0x7e, 0x39, 0xc3, 0x87, 0xcb, 0xe9, 0x2b, 0xca, 0x47, 0x11, 0x09,
0xd2, 0x34, 0xb7, 0x32, 0x1f, 0xa1, 0x63, 0xe8, 0x13, 0xd8, 0xcc, 0x2f, 0x99, 0x51, 0x8f, 0x13,
0x66, 0x19, 0x1d, 0xad, 0x8b, 0x70, 0x7e, 0xf7, 0x73, 0xa9, 0x5d, 0x03, 0x4a, 0x76, 0xcc, 0x82,
0x8e, 0xd6, 0x55, 0x0a, 0xa0, 0xa4, 0xc6, 0x04, 0xad, 0x28, 0x64, 0x6e, 0x89, 0x56, 0xe3, 0x9f,
0xd0, 0xca, 0x7c, 0x72, 0x5a, 0xf9, 0x25, 0x29, 0xad, 0x66, 0x42, 0x2b, 0x53, 0x17, 0xb4, 0x72,
0x60, 0x4a, 0xab, 0x95, 0xd0, 0xca, 0xd4, 0x29, 0xad, 0xaf, 0x01, 0x62, 0xca, 0x28, 0x9f, 0x2c,
0x44, 0xf6, 0xdb, 0x77, 0xfb, 0x35, 0xaf, 0x9f, 0x1e, 0x16, 0xb8, 0x53, 0x37, 0xe0, 0xd8, 0x88,
0xb3, 0xe3, 0x7a, 0x01, 0x6e, 0xde, 0x2e, 0xc0, 0xcf, 0xc1, 0xc8, 0xbd, 0xd6, 0x3b, 0xb5, 0x06,
0xda, 0xcb, 0xfe, 0xc8, 0x54, 0x90, 0x0e, 0xea, 0x60, 0x68, 0xaa, 0x45, 0xb7, 0x6a, 0x87, 0x35,
0xa8, 0x4a, 0xce, 0x87, 0x4d, 0x80, 0xe2, 0xb3, 0xdb, 0xcf, 0x00, 0x8a, 0xfc, 0x88, 0xca, 0x0b,
0xaf, 0xae, 0x18, 0x4d, 0x4a, 0x79, 0x0b, 0xa7, 0x92, 0xd0, 0x7b, 0x34, 0x98, 0xf3, 0x85, 0xac,
0xe0, 0x16, 0x4e, 0x25, 0xfb, 0x17, 0x15, 0x60, 0xec, 0xfa, 0x74, 0x44, 0x63, 0x97, 0xb2, 0xf7,
0xef, 0xbf, 0x7d, 0xa8, 0x31, 0xd9, 0xfa, 0xcc, 0x52, 0xa5, 0x07, 0x2a, 0x7b, 0x24, 0x53, 0x21,
0x75, 0xc9, 0x80, 0xe8, 0x4b, 0x30, 0x68, 0xda, 0xf0, 0xcc, 0xd2, 0xa4, 0xd7, 0x76, 0xd9, 0x2b,
0x9b, 0x06, 0xa9, 0x5f, 0x01, 0x46, 0x5f, 0x01, 0x2c, 0xb2, 0xc4, 0x33, 0xab, 0x22, 0x5d, 0xef,
0xbd, 0xf5, 0xb3, 0xa4, 0xbe, 0x25, 0x38, 0xfa, 0x02, 0xea, 0x7e, 0x3a, 0x65, 0x65, 0xdb, 0xdd,
0x8a, 0x9a, 0x4d, 0xe0, 0xd4, 0x33, 0xc7, 0xda, 0x8f, 0xa1, 0x2a, 0x5f, 0x2e, 0xa6, 0xae, 0x5c,
0x47, 0x4a, 0x32, 0x75, 0xc5, 0x79, 0x7d, 0xfe, 0x18, 0xe9, 0xfc, 0xb1, 0x9f, 0x82, 0x7e, 0x9e,
0xe4, 0xe7, 0x7d, 0x13, 0x6a, 0xff, 0xa4, 0x40, 0x53, 0xea, 0x1d, 0xc2, 0xa7, 0x0b, 0x1a, 0xa3,
0xc7, 0x6b, 0x4b, 0xe3, 0xc1, 0x1d, 0xff, 0x14, 0xd7, 0x5b, 0x5f, 0x19, 0xa5, 0xbd, 0x79, 0x8b,
0xa8, 0x56, 0x26, 0xda, 0x85, 0x8a, 0x5c, 0x17, 0x3a, 0xa8, 0xfd, 0x17, 0x49, 0xfd, 0x0d, 0xfa,
0x2f, 0x92, 0xfa, 0xc3, 0x62, 0x45, 0x08, 0x05, 0xee, 0x9b, 0x9a, 0xfd, 0xab, 0x22, 0x8a, 0x96,
0xcc, 0x44, 0xcd, 0x32, 0xf4, 0x7f, 0xa8, 0x31, 0x4e, 0xa3, 0x89, 0xcf, 0x24, 0x2f, 0x0d, 0xeb,
0x42, 0x74, 0x98, 0x08, 0x7d, 0xb5, 0x0c, 0xa6, 0x59, 0x68, 0x71, 0x46, 0x1f, 0x40, 0x9d, 0x71,
0x12, 0x73, 0x81, 0x4e, 0x86, 0x71, 0x4d, 0xca, 0x0e, 0x43, 0xf7, 0x40, 0xa7, 0xc1, 0x6c, 0x22,
0x3f, 0xa6, 0x30, 0x54, 0x69, 0x30, 0x73, 0x18, 0xda, 0x81, 0xfa, 0x3c, 0x0e, 0x97, 0x91, 0x1b,
0xcc, 0xad, 0x6a, 0x47, 0xeb, 0x1a, 0x38, 0x97, 0x51, 0x1b, 0xd4, 0xcb, 0x95, 0x1c, 0x88, 0x75,
0xac, 0x5e, 0xae, 0xc4, 0xed, 0x31, 0x09, 0xe6, 0x54, 0x5c, 0x52, 0x4b, 0x6e, 0x97, 0xb2, 0xc3,
0xec, 0x3f, 0x14, 0xa8, 0x1e, 0x2d, 0x96, 0xc1, 0x2b, 0xb4, 0x0b, 0x0d, 0xdf, 0x0d, 0x26, 0xa2,
0x05, 0x0b, 0xce, 0x86, 0xef, 0x06, 0xa2, 0xf6, 0x1d, 0x26, 0xed, 0xe4, 0x3a, 0xb7, 0xa7, 0x3b,
0xca, 0x27, 0xd7, 0xa9, 0xbd, 0x97, 0x7e, 0x04, 0x4d, 0x7e, 0x84, 0x9d, 0xf2, 0x47, 0x90, 0x01,
0x7a, 0xfd, 0x60, 0x1a, 0xce, 0xdc, 0x60, 0x5e, 0x7c, 0x01, 0x59, 0x67, 0xe2, 0x55, 0x4d, 0x2c,
0xcf, 0xf6, 0x73, 0xa8, 0x67, 0xa8, 0x3b, 0x4d, 0xff, 0xdd, 0x50, 0xac, 0xe6, 0xb5, 0x7d, 0xac,
0xa2, 0xff, 0xc1, 0xe6, 0xf1, 0xf9, 0xf0, 0x60, 0x3c, 0x29, 0x2d, 0x69, 0xfb, 0x07, 0x68, 0xc9,
0x88, 0x74, 0xf6, 0x6f, 0x5b, 0x76, 0x0f, 0xf4, 0xa9, 0xb8, 0x21, 0xeb, 0xd8, 0xad, 0x3b, 0xaf,
0xc9, 0x1c, 0x12, 0xd8, 0xe1, 0xf6, 0x9b, 0x9b, 0x5d, 0xe5, 0xf7, 0x9b, 0x5d, 0xe5, 0xaf, 0x9b,
0x5d, 0xe5, 0x7b, 0x5d, 0xa0, 0xa3, 0xcb, 0x4b, 0x5d, 0xfe, 0xff, 0x3d, 0xf9, 0x3b, 0x00, 0x00,
0xff, 0xff, 0xbd, 0xa4, 0x41, 0x7d, 0x30, 0x0a, 0x00, 0x00,
// 1126 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x56, 0xdd, 0x6e, 0x1b, 0x45,
0x14, 0xce, 0xee, 0xda, 0x6b, 0xef, 0xf1, 0x4f, 0x36, 0x43, 0x5a, 0x96, 0x88, 0xa6, 0x61, 0xa5,
0x82, 0x85, 0x90, 0xa3, 0xa6, 0x5c, 0x50, 0x51, 0x21, 0x25, 0xa9, 0xf3, 0x23, 0xb2, 0xb6, 0x3a,
0x76, 0x04, 0xe5, 0xc6, 0x9a, 0xd8, 0x13, 0x7b, 0xd5, 0xfd, 0x63, 0x67, 0x5c, 0xc5, 0xbc, 0x07,
0x77, 0x5c, 0xf1, 0x06, 0xbc, 0x45, 0x25, 0x6e, 0xe0, 0x05, 0x10, 0xca, 0x93, 0xa0, 0x99, 0xfd,
0x75, 0xdc, 0x4a, 0x94, 0x2b, 0xee, 0xf6, 0x9c, 0xf3, 0x9d, 0x39, 0xdf, 0x9c, 0x39, 0x3f, 0x0b,
0x0d, 0xbe, 0x8c, 0x28, 0xeb, 0x46, 0x71, 0xc8, 0x43, 0x04, 0x51, 0x1c, 0xfa, 0x94, 0xcf, 0xe9,
0x82, 0xed, 0x6c, 0xcf, 0xc2, 0x59, 0x28, 0xd5, 0xfb, 0xe2, 0x2b, 0x41, 0xd8, 0xbf, 0xa8, 0xd0,
0x76, 0x28, 0x8f, 0xdd, 0x89, 0x43, 0x39, 0x99, 0x12, 0x4e, 0xd0, 0x53, 0xa8, 0x88, 0x33, 0x2c,
0x65, 0x4f, 0xe9, 0xb4, 0x0f, 0x1e, 0x75, 0x8b, 0x33, 0xba, 0xab, 0xc8, 0x54, 0x1c, 0x2d, 0x23,
0x8a, 0xa5, 0x0b, 0xfa, 0x02, 0x90, 0x2f, 0x75, 0xe3, 0x6b, 0xe2, 0xbb, 0xde, 0x72, 0x1c, 0x10,
0x9f, 0x5a, 0xea, 0x9e, 0xd2, 0x31, 0xb0, 0x99, 0x58, 0x4e, 0xa4, 0xa1, 0x4f, 0x7c, 0x8a, 0x10,
0x54, 0xe6, 0xd4, 0x8b, 0xac, 0x8a, 0xb4, 0xcb, 0x6f, 0xa1, 0x5b, 0x04, 0x2e, 0xb7, 0xaa, 0x89,
0x4e, 0x7c, 0xdb, 0x4b, 0x80, 0x22, 0x12, 0x6a, 0x40, 0xed, 0xb2, 0xff, 0x6d, 0x7f, 0xf0, 0x5d,
0xdf, 0xdc, 0x10, 0xc2, 0xf1, 0xe0, 0xb2, 0x3f, 0xea, 0x61, 0x53, 0x41, 0x06, 0x54, 0x4f, 0x0f,
0x2f, 0x4f, 0x7b, 0xa6, 0x8a, 0x5a, 0x60, 0x9c, 0x9d, 0x0f, 0x47, 0x83, 0x53, 0x7c, 0xe8, 0x98,
0x1a, 0x42, 0xd0, 0x96, 0x96, 0x42, 0x57, 0x11, 0xae, 0xc3, 0x4b, 0xc7, 0x39, 0xc4, 0x2f, 0xcd,
0x2a, 0xaa, 0x43, 0xe5, 0xbc, 0x7f, 0x32, 0x30, 0x75, 0xd4, 0x84, 0xfa, 0x70, 0x74, 0x38, 0xea,
0x0d, 0x7b, 0x23, 0xb3, 0x66, 0xdf, 0x2a, 0x50, 0xcf, 0x13, 0xf3, 0x64, 0x25, 0x31, 0x0f, 0xef,
0x24, 0xe6, 0x1d, 0x29, 0xc9, 0x2e, 0xa9, 0xbe, 0xe5, 0x92, 0xda, 0xff, 0xe3, 0x92, 0xcf, 0x40,
0x1f, 0x12, 0x3f, 0xf2, 0x28, 0xda, 0x86, 0xea, 0x6b, 0xe2, 0x2d, 0x92, 0x2b, 0x2a, 0x38, 0x11,
0xd0, 0xc7, 0x60, 0x70, 0xd7, 0xa7, 0x8c, 0x13, 0x3f, 0xb9, 0x87, 0x86, 0x0b, 0x85, 0x1d, 0x42,
0xbd, 0x77, 0x43, 0xfd, 0xc8, 0x23, 0x31, 0xda, 0x07, 0xdd, 0x23, 0x57, 0xd4, 0x63, 0x96, 0xb2,
0xa7, 0x75, 0x1a, 0x07, 0x5b, 0xe5, 0x1c, 0x5d, 0x08, 0xcb, 0x51, 0xe5, 0xcd, 0x5f, 0x0f, 0x37,
0x70, 0x0a, 0x2b, 0x02, 0xaa, 0xef, 0x0c, 0xa8, 0xdd, 0x0d, 0xf8, 0x7b, 0x15, 0x8c, 0x33, 0x97,
0xf1, 0x70, 0x16, 0x13, 0x1f, 0x3d, 0x00, 0x63, 0x12, 0x2e, 0x02, 0x3e, 0x76, 0x03, 0x2e, 0x69,
0x57, 0xce, 0x36, 0x70, 0x5d, 0xaa, 0xce, 0x03, 0x8e, 0x3e, 0x81, 0x46, 0x62, 0xbe, 0xf6, 0x42,
0xc2, 0x93, 0x30, 0x67, 0x1b, 0x18, 0xa4, 0xf2, 0x44, 0xe8, 0x90, 0x09, 0x1a, 0x5b, 0xf8, 0x32,
0x8e, 0x82, 0xc5, 0x27, 0xba, 0x0f, 0x3a, 0x9b, 0xcc, 0xa9, 0x4f, 0x64, 0x69, 0x6e, 0xe1, 0x54,
0x42, 0x8f, 0xa0, 0xfd, 0x13, 0x8d, 0xc3, 0x31, 0x9f, 0xc7, 0x94, 0xcd, 0x43, 0x6f, 0x2a, 0xcb,
0x54, 0xc1, 0x2d, 0xa1, 0x1d, 0x65, 0x4a, 0xf4, 0x69, 0x0a, 0x2b, 0x78, 0xe9, 0x92, 0x97, 0x82,
0x9b, 0x42, 0x7f, 0x9c, 0x71, 0xfb, 0x1c, 0xcc, 0x12, 0x2e, 0x21, 0x58, 0x93, 0x04, 0x15, 0xdc,
0xce, 0x91, 0x09, 0xc9, 0x63, 0x68, 0x07, 0x74, 0x46, 0xb8, 0xfb, 0x9a, 0x8e, 0x59, 0x44, 0x02,
0x66, 0xd5, 0x65, 0x86, 0xef, 0x97, 0x33, 0x7c, 0xb4, 0x98, 0xbc, 0xa2, 0x7c, 0x18, 0x91, 0x20,
0x4d, 0x73, 0x2b, 0xf3, 0x11, 0x3a, 0x86, 0x3e, 0x83, 0xcd, 0xfc, 0x90, 0x29, 0xf5, 0x38, 0x61,
0x96, 0xb1, 0xa7, 0x75, 0x10, 0xce, 0xcf, 0x7e, 0x2e, 0xb5, 0x2b, 0x40, 0xc9, 0x8e, 0x59, 0xb0,
0xa7, 0x75, 0x94, 0x02, 0x28, 0xa9, 0x31, 0x41, 0x2b, 0x0a, 0x99, 0x5b, 0xa2, 0xd5, 0xf8, 0x37,
0xb4, 0x32, 0x9f, 0x9c, 0x56, 0x7e, 0x48, 0x4a, 0xab, 0x99, 0xd0, 0xca, 0xd4, 0x05, 0xad, 0x1c,
0x98, 0xd2, 0x6a, 0x25, 0xb4, 0x32, 0x75, 0x4a, 0xeb, 0x1b, 0x80, 0x98, 0x32, 0xca, 0xc7, 0x73,
0x91, 0xfd, 0xf6, 0x7a, 0xbf, 0xe6, 0xf5, 0xd3, 0xc5, 0x02, 0x77, 0xe6, 0x06, 0x1c, 0x1b, 0x71,
0xf6, 0xb9, 0x5a, 0x80, 0x9b, 0x77, 0x0b, 0xf0, 0x4b, 0x30, 0x72, 0xaf, 0xd5, 0x4e, 0xad, 0x81,
0xf6, 0xb2, 0x37, 0x34, 0x15, 0xa4, 0x83, 0xda, 0x1f, 0x98, 0x6a, 0xd1, 0xad, 0xda, 0x51, 0x0d,
0xaa, 0x92, 0xf3, 0x51, 0x13, 0xa0, 0x78, 0x76, 0xfb, 0x19, 0x40, 0x91, 0x1f, 0x51, 0x79, 0xe1,
0xf5, 0x35, 0xa3, 0x49, 0x29, 0x6f, 0xe1, 0x54, 0x12, 0x7a, 0x8f, 0x06, 0x33, 0x3e, 0x97, 0x15,
0xdc, 0xc2, 0xa9, 0x64, 0xff, 0xaa, 0x02, 0x8c, 0x5c, 0x9f, 0x0e, 0x69, 0xec, 0x52, 0xf6, 0xfe,
0xfd, 0x77, 0x00, 0x35, 0x26, 0x5b, 0x9f, 0x59, 0xaa, 0xf4, 0x40, 0x65, 0x8f, 0x64, 0x2a, 0xa4,
0x2e, 0x19, 0x10, 0x7d, 0x05, 0x06, 0x4d, 0x1b, 0x9e, 0x59, 0x9a, 0xf4, 0xda, 0x2e, 0x7b, 0x65,
0xd3, 0x20, 0xf5, 0x2b, 0xc0, 0xe8, 0x6b, 0x80, 0x79, 0x96, 0x78, 0x66, 0x55, 0xa4, 0xeb, 0xbd,
0xb7, 0x3e, 0x4b, 0xea, 0x5b, 0x82, 0x8b, 0xb0, 0x7e, 0x3a, 0x65, 0x99, 0x55, 0x5d, 0x0f, 0x9b,
0x8d, 0xe0, 0x2c, 0x6c, 0x0e, 0xb6, 0x1f, 0x43, 0x55, 0xde, 0x5d, 0xcc, 0x5d, 0xb9, 0x90, 0x94,
0x64, 0xee, 0x8a, 0xef, 0xd5, 0x09, 0x64, 0xa4, 0x13, 0xc8, 0x7e, 0x0a, 0xfa, 0x45, 0x92, 0xa1,
0xf7, 0x4d, 0xa9, 0xfd, 0xb3, 0x02, 0x4d, 0xa9, 0x77, 0x08, 0x9f, 0xcc, 0x69, 0x8c, 0x1e, 0xaf,
0xac, 0x8d, 0x07, 0x6b, 0xfe, 0x29, 0xae, 0xbb, 0xba, 0x34, 0x4a, 0x9b, 0xf3, 0x0e, 0x51, 0xad,
0x4c, 0xb4, 0x03, 0x15, 0xb9, 0x30, 0x74, 0x50, 0x7b, 0x2f, 0x92, 0x0a, 0xec, 0xf7, 0x5e, 0x24,
0x15, 0x88, 0xc5, 0x92, 0x10, 0x0a, 0xdc, 0x33, 0x35, 0xfb, 0x37, 0x45, 0x94, 0x2d, 0x99, 0x8a,
0xaa, 0x65, 0xe8, 0x43, 0xa8, 0x31, 0x4e, 0xa3, 0xb1, 0xcf, 0x24, 0x2f, 0x0d, 0xeb, 0x42, 0x74,
0x98, 0x08, 0x7d, 0xbd, 0x08, 0x26, 0x59, 0x68, 0xf1, 0x8d, 0x3e, 0x82, 0x3a, 0xe3, 0x24, 0xe6,
0x02, 0x9d, 0x8c, 0xe3, 0x9a, 0x94, 0x1d, 0x86, 0xee, 0x81, 0x4e, 0x83, 0xe9, 0x58, 0x3e, 0xa7,
0x30, 0x54, 0x69, 0x30, 0x75, 0x18, 0xda, 0x81, 0xfa, 0x2c, 0x0e, 0x17, 0x91, 0x1b, 0xcc, 0xe4,
0x5b, 0x19, 0x38, 0x97, 0x51, 0x1b, 0xd4, 0xab, 0xa5, 0x1c, 0x89, 0x75, 0xac, 0x5e, 0x2d, 0xc5,
0xe9, 0x31, 0x09, 0x66, 0x54, 0x1c, 0x52, 0x4b, 0x4e, 0x97, 0xb2, 0xc3, 0xec, 0x3f, 0x15, 0xa8,
0x1e, 0xcf, 0x17, 0xc1, 0x2b, 0xb4, 0x0b, 0x0d, 0xdf, 0x0d, 0xc6, 0xa2, 0x09, 0x0b, 0xce, 0x86,
0xef, 0x06, 0xa2, 0xfa, 0x1d, 0x26, 0xed, 0xe4, 0x26, 0xb7, 0xa7, 0x5b, 0xca, 0x27, 0x37, 0xa9,
0xbd, 0x9b, 0x3e, 0x82, 0x26, 0x1f, 0x61, 0xa7, 0xfc, 0x08, 0x32, 0x40, 0xb7, 0x17, 0x4c, 0xc2,
0xa9, 0x1b, 0xcc, 0x8a, 0x17, 0x10, 0xc5, 0x23, 0x6f, 0xd5, 0xc4, 0xf2, 0xdb, 0x7e, 0x0e, 0xf5,
0x0c, 0xb5, 0xd6, 0xf6, 0xdf, 0x0f, 0xc4, 0x72, 0x5e, 0xd9, 0xc8, 0x2a, 0xfa, 0x00, 0x36, 0x4f,
0x2e, 0x06, 0x87, 0xa3, 0x71, 0x69, 0x4d, 0xdb, 0x3f, 0x42, 0x4b, 0x46, 0xa4, 0xd3, 0xff, 0xda,
0xb4, 0xfb, 0xa0, 0x4f, 0xc4, 0x09, 0x59, 0xcf, 0x6e, 0xad, 0xdd, 0x26, 0x73, 0x48, 0x60, 0x47,
0xdb, 0x6f, 0x6e, 0x77, 0x95, 0x3f, 0x6e, 0x77, 0x95, 0xbf, 0x6f, 0x77, 0x95, 0x1f, 0x74, 0x81,
0x8e, 0xae, 0xae, 0x74, 0xf9, 0x07, 0xf8, 0xe4, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xc6, 0xb0,
0xf6, 0x08, 0x32, 0x0a, 0x00, 0x00,
}
func (m *MetricMetadata) Marshal() (dAtA []byte, err error) {
@ -1798,16 +1798,20 @@ func (m *TimeSeries) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
{
size, err := m.Metadata.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
if len(m.Metadatas) > 0 {
for iNdEx := len(m.Metadatas) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Metadatas[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x2a
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x2a
if len(m.Histograms) > 0 {
for iNdEx := len(m.Histograms) - 1; iNdEx >= 0; iNdEx-- {
{
@ -2424,8 +2428,12 @@ func (m *TimeSeries) Size() (n int) {
n += 1 + l + sovTypes(uint64(l))
}
}
l = m.Metadata.Size()
n += 1 + l + sovTypes(uint64(l))
if len(m.Metadatas) > 0 {
for _, e := range m.Metadatas {
l = e.Size()
n += 1 + l + sovTypes(uint64(l))
}
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
@ -3865,7 +3873,7 @@ func (m *TimeSeries) Unmarshal(dAtA []byte) error {
iNdEx = postIndex
case 5:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Metadata", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field Metadatas", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
@ -3892,7 +3900,8 @@ func (m *TimeSeries) Unmarshal(dAtA []byte) error {
if postIndex > l {
return io.ErrUnexpectedEOF
}
if err := m.Metadata.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
m.Metadatas = append(m.Metadatas, Metadata{})
if err := m.Metadatas[len(m.Metadatas)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex

View file

@ -147,7 +147,7 @@ message TimeSeries {
repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false];
repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
Metadata metadata = 5 [(gogoproto.nullable) = false];
repeated Metadata metadatas = 5 [(gogoproto.nullable) = false];
}
message Label {

View file

@ -57,7 +57,7 @@ var writeRequestFixture = &prompb.WriteRequest{
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 0}},
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat())},
Metadata: prompb.Metadata{Type: prompb.Metadata_COUNTER, Help: "help text 1", Unit: "unit text 1"},
Metadatas: []prompb.Metadata{{Type: prompb.Metadata_COUNTER, Help: "help text 1", Unit: "unit text 1"}},
},
{
Labels: []prompb.Label{
@ -70,7 +70,7 @@ var writeRequestFixture = &prompb.WriteRequest{
Samples: []prompb.Sample{{Value: 2, Timestamp: 1}},
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 1}},
Histograms: []prompb.Histogram{HistogramToHistogramProto(2, &testHistogram), FloatHistogramToHistogramProto(3, testHistogram.ToFloat())},
Metadata: prompb.Metadata{Type: prompb.Metadata_GAUGE, Help: "help text 2", Unit: "unit text 2"},
Metadatas: []prompb.Metadata{{Type: prompb.Metadata_GAUGE, Help: "help text 2", Unit: "unit text 2"}},
},
},
}

View file

@ -71,12 +71,14 @@ type queueManagerMetrics struct {
droppedSamplesTotal prometheus.Counter
droppedExemplarsTotal prometheus.Counter
droppedHistogramsTotal prometheus.Counter
droppedMetadataTotal prometheus.Counter
enqueueRetriesTotal prometheus.Counter
sentBatchDuration prometheus.Histogram
highestSentTimestamp *maxTimestamp
pendingSamples prometheus.Gauge
pendingExemplars prometheus.Gauge
pendingHistograms prometheus.Gauge
pendingMetadata prometheus.Gauge
shardCapacity prometheus.Gauge
numShards prometheus.Gauge
maxNumShards prometheus.Gauge
@ -246,6 +248,13 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
Help: "The number of histograms pending in the queues shards to be sent to the remote storage.",
ConstLabels: constLabels,
})
m.pendingMetadata = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "metadata_pending",
Help: "The number of metadata pending in the queues shards to be sent to the remote storage.",
ConstLabels: constLabels,
})
m.shardCapacity = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -408,10 +417,9 @@ type QueueManager struct {
clientMtx sync.RWMutex
storeClient WriteClient
seriesMtx sync.Mutex // Covers seriesLabels and droppedSeries.
seriesLabels map[chunks.HeadSeriesRef]labels.Labels
seriesMetadata map[chunks.HeadSeriesRef]metadata.Metadata
droppedSeries map[chunks.HeadSeriesRef]struct{}
seriesMtx sync.Mutex // Covers seriesLabels and droppedSeries.
seriesLabels map[chunks.HeadSeriesRef]labels.Labels
droppedSeries map[chunks.HeadSeriesRef]struct{}
seriesSegmentMtx sync.Mutex // Covers seriesSegmentIndexes - if you also lock seriesMtx, take seriesMtx first.
seriesSegmentIndexes map[chunks.HeadSeriesRef]int
@ -478,7 +486,6 @@ func NewQueueManager(
sendMetadata: enableMetadataRemoteWrite,
seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels),
seriesMetadata: make(map[chunks.HeadSeriesRef]metadata.Metadata),
seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
droppedSeries: make(map[chunks.HeadSeriesRef]struct{}),
@ -592,7 +599,6 @@ outer:
t.seriesMtx.Unlock()
continue
}
meta := t.seriesMetadata[s.Ref]
t.seriesMtx.Unlock()
// Start with a very small backoff. This should not be t.cfg.MinBackoff
// as it can happen without errors, and we want to pickup work after
@ -607,7 +613,6 @@ outer:
}
if t.shards.enqueue(s.Ref, timeSeries{
seriesLabels: lbls,
metadata: meta,
timestamp: s.T,
value: s.V,
sType: tSample,
@ -647,7 +652,6 @@ outer:
t.seriesMtx.Unlock()
continue
}
meta := t.seriesMetadata[e.Ref]
t.seriesMtx.Unlock()
// This will only loop if the queues are being resharded.
backoff := t.cfg.MinBackoff
@ -659,7 +663,6 @@ outer:
}
if t.shards.enqueue(e.Ref, timeSeries{
seriesLabels: lbls,
metadata: meta,
timestamp: e.T,
value: e.V,
exemplarLabels: e.Labels,
@ -697,7 +700,6 @@ outer:
t.seriesMtx.Unlock()
continue
}
meta := t.seriesMetadata[h.Ref]
t.seriesMtx.Unlock()
backoff := model.Duration(5 * time.Millisecond)
@ -709,7 +711,6 @@ outer:
}
if t.shards.enqueue(h.Ref, timeSeries{
seriesLabels: lbls,
metadata: meta,
timestamp: h.T,
histogram: h.H,
sType: tHistogram,
@ -746,7 +747,6 @@ outer:
t.seriesMtx.Unlock()
continue
}
meta := t.seriesMetadata[h.Ref]
t.seriesMtx.Unlock()
backoff := model.Duration(5 * time.Millisecond)
@ -758,7 +758,6 @@ outer:
}
if t.shards.enqueue(h.Ref, timeSeries{
seriesLabels: lbls,
metadata: meta,
timestamp: h.T,
floatHistogram: h.FH,
sType: tFloatHistogram,
@ -777,6 +776,58 @@ outer:
return true
}
func (t *QueueManager) AppendWALMetadata(ms []record.RefMetadata) bool {
if !t.sendMetadata {
return true
}
outer:
for _, m := range ms {
t.seriesMtx.Lock()
lbls, ok := t.seriesLabels[m.Ref]
if !ok {
t.metrics.droppedMetadataTotal.Inc()
// Track dropped metadata in the same EWMA for sharding calc.
t.dataDropped.incr(1)
if _, ok := t.droppedSeries[m.Ref]; !ok {
level.Info(t.logger).Log("msg", "Dropped metadata for series that was not explicitly dropped via relabelling", "ref", m.Ref)
}
t.seriesMtx.Unlock()
continue
}
t.seriesMtx.Unlock()
backoff := t.cfg.MinBackoff
for {
select {
case <-t.quit:
return false
default:
}
if t.shards.enqueue(m.Ref, timeSeries{
seriesLabels: lbls,
sType: tMetadata,
metadata: &metadata.Metadata{
Type: record.ToTextparseMetricType(m.Type),
Help: m.Help,
Unit: m.Unit,
},
}) {
continue outer
}
t.metrics.enqueueRetriesTotal.Inc()
time.Sleep(time.Duration(backoff))
backoff *= 2
if backoff > t.cfg.MaxBackoff {
backoff = t.cfg.MaxBackoff
}
}
}
return true
}
// Start the queue manager sending samples to the remote storage.
// Does not block.
func (t *QueueManager) Start() {
@ -853,23 +904,6 @@ func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) {
}
}
// StoreMetadata keeps track of known series' metadata for lookups when sending samples to remote.
func (t *QueueManager) StoreMetadata(meta []record.RefMetadata) {
if !t.sendMetadata {
return
}
t.seriesMtx.Lock()
defer t.seriesMtx.Unlock()
for _, m := range meta {
t.seriesMetadata[m.Ref] = metadata.Metadata{
Type: record.ToTextparseMetricType(m.Type),
Unit: m.Unit,
Help: m.Help,
}
}
}
// UpdateSeriesSegment updates the segment number held against the series,
// so we can trim older ones in SeriesReset.
func (t *QueueManager) UpdateSeriesSegment(series []record.RefSeries, index int) {
@ -895,7 +929,6 @@ func (t *QueueManager) SeriesReset(index int) {
delete(t.seriesSegmentIndexes, k)
t.releaseLabels(t.seriesLabels[k])
delete(t.seriesLabels, k)
delete(t.seriesMetadata, k)
delete(t.droppedSeries, k)
}
}
@ -1105,6 +1138,7 @@ type shards struct {
enqueuedSamples atomic.Int64
enqueuedExemplars atomic.Int64
enqueuedHistograms atomic.Int64
enqueuedMetadata atomic.Int64
// Emulate a wait group with a channel and an atomic int, as you
// cannot select on a wait group.
@ -1120,6 +1154,7 @@ type shards struct {
samplesDroppedOnHardShutdown atomic.Uint32
exemplarsDroppedOnHardShutdown atomic.Uint32
histogramsDroppedOnHardShutdown atomic.Uint32
metadataDroppedOnHardShutdown atomic.Uint32
}
// start the shards; must be called before any call to enqueue.
@ -1148,6 +1183,7 @@ func (s *shards) start(n int) {
s.samplesDroppedOnHardShutdown.Store(0)
s.exemplarsDroppedOnHardShutdown.Store(0)
s.histogramsDroppedOnHardShutdown.Store(0)
s.metadataDroppedOnHardShutdown.Store(0)
for i := 0; i < n; i++ {
go s.runShard(hardShutdownCtx, i, newQueues[i])
}
@ -1216,6 +1252,9 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data timeSeries) bool {
case tHistogram, tFloatHistogram:
s.qm.metrics.pendingHistograms.Inc()
s.enqueuedHistograms.Inc()
case tMetadata:
s.qm.metrics.pendingMetadata.Inc()
s.enqueuedMetadata.Inc()
}
return true
}
@ -1236,10 +1275,10 @@ type queue struct {
type timeSeries struct {
seriesLabels labels.Labels
metadata metadata.Metadata
value float64
histogram *histogram.Histogram
floatHistogram *histogram.FloatHistogram
metadata *metadata.Metadata
timestamp int64
exemplarLabels labels.Labels
// The type of series: sample, exemplar, or histogram.
@ -1253,6 +1292,7 @@ const (
tExemplar
tHistogram
tFloatHistogram
tMetadata
)
func newQueue(batchSize, capacity int) *queue {
@ -1413,25 +1453,28 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
droppedSamples := int(s.enqueuedSamples.Load())
droppedExemplars := int(s.enqueuedExemplars.Load())
droppedHistograms := int(s.enqueuedHistograms.Load())
droppedMetadata := int(s.enqueuedMetadata.Load())
s.qm.metrics.pendingSamples.Sub(float64(droppedSamples))
s.qm.metrics.pendingExemplars.Sub(float64(droppedExemplars))
s.qm.metrics.pendingHistograms.Sub(float64(droppedHistograms))
s.qm.metrics.failedSamplesTotal.Add(float64(droppedSamples))
s.qm.metrics.failedExemplarsTotal.Add(float64(droppedExemplars))
s.qm.metrics.failedHistogramsTotal.Add(float64(droppedHistograms))
s.qm.metrics.failedMetadataTotal.Add(float64(droppedMetadata))
s.samplesDroppedOnHardShutdown.Add(uint32(droppedSamples))
s.exemplarsDroppedOnHardShutdown.Add(uint32(droppedExemplars))
s.histogramsDroppedOnHardShutdown.Add(uint32(droppedHistograms))
s.metadataDroppedOnHardShutdown.Add(uint32(droppedMetadata))
return
case batch, ok := <-batchQueue:
if !ok {
return
}
nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata := s.populateTimeSeries(batch, pendingData)
queue.ReturnForReuse(batch)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
n := nPendingSamples + nPendingExemplars + nPendingHistograms + nPendingMetadata
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, pBuf, &buf)
stop()
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
@ -1439,11 +1482,11 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
case <-timer.C:
batch := queue.Batch()
if len(batch) > 0 {
nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata := s.populateTimeSeries(batch, pendingData)
n := nPendingSamples + nPendingExemplars + nPendingHistograms + nPendingMetadata
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms, "metadata", nPendingMetadata)
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, pBuf, &buf)
}
queue.ReturnForReuse(batch)
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
@ -1451,8 +1494,8 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
}
}
func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries) (int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms int
func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries) (int, int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata int
for nPending, d := range batch {
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
if s.qm.sendExemplars {
@ -1462,11 +1505,7 @@ func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.Tim
pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
}
if s.qm.sendMetadata {
pendingData[nPending].Metadata = prompb.Metadata{
Type: metricTypeToProtoEquivalent(d.metadata.Type),
Help: d.metadata.Help,
Unit: d.metadata.Unit,
}
pendingData[nPending].Metadatas = pendingData[nPending].Metadatas[:0]
}
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
@ -1493,19 +1532,27 @@ func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.Tim
case tFloatHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram))
nPendingHistograms++
case tMetadata:
pendingData[nPending].Metadatas = append(pendingData[nPending].Metadatas, prompb.Metadata{
Type: metricTypeToProtoEquivalent(d.metadata.Type),
Help: d.metadata.Help,
Unit: d.metadata.Unit,
})
nPendingMetadata++
}
}
return nPendingSamples, nPendingExemplars, nPendingHistograms
return nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata
}
func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte) {
begin := time.Now()
err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, pBuf, buf)
err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf)
if err != nil {
level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err)
s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount))
s.qm.metrics.failedExemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.failedHistogramsTotal.Add(float64(histogramCount))
s.qm.metrics.failedMetadataTotal.Add(float64(metadataCount))
}
// These counters are used to calculate the dynamic sharding, and as such
@ -1518,13 +1565,15 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s
s.qm.metrics.pendingSamples.Sub(float64(sampleCount))
s.qm.metrics.pendingExemplars.Sub(float64(exemplarCount))
s.qm.metrics.pendingHistograms.Sub(float64(histogramCount))
s.qm.metrics.pendingMetadata.Sub(float64(metadataCount))
s.enqueuedSamples.Sub(int64(sampleCount))
s.enqueuedExemplars.Sub(int64(exemplarCount))
s.enqueuedHistograms.Sub(int64(histogramCount))
s.enqueuedMetadata.Sub(int64(metadataCount))
}
// sendSamples to the remote storage with backoff for recoverable errors.
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error {
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte) error {
// Build the WriteRequest with no metadata.
req, highest, err := buildWriteRequest(samples, nil, pBuf, *buf)
if err != nil {
@ -1557,11 +1606,15 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
if histogramCount > 0 {
span.SetAttributes(attribute.Int("histograms", histogramCount))
}
if metadataCount > 0 {
span.SetAttributes(attribute.Int("metadata", metadataCount))
}
begin := time.Now()
s.qm.metrics.samplesTotal.Add(float64(sampleCount))
s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
s.qm.metrics.metadataTotal.Add(float64(metadataCount))
err := s.qm.client().Store(ctx, *buf)
s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
@ -1577,6 +1630,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount))
s.qm.metrics.retriedMetadataTotal.Add(float64(metadataCount))
}
err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.logger, attemptStore, onRetry)

View file

@ -66,13 +66,14 @@ func TestSampleDelivery(t *testing.T) {
exemplars bool
histograms bool
floatHistograms bool
metadata bool
}{
{samples: true, exemplars: false, histograms: false, floatHistograms: false, name: "samples only"},
{samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "samples, exemplars, and histograms"},
{samples: false, exemplars: true, histograms: false, floatHistograms: false, name: "exemplars only"},
{samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only"},
{samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only"},
{samples: false, exemplars: false, histograms: false, floatHistograms: false, name: "metadata only"},
{samples: true, exemplars: false, histograms: false, floatHistograms: false, metadata: false, name: "samples only"},
{samples: true, exemplars: true, histograms: true, floatHistograms: true, metadata: false, name: "samples, exemplars, and histograms"},
{samples: false, exemplars: true, histograms: false, floatHistograms: false, metadata: false, name: "exemplars only"},
{samples: false, exemplars: false, histograms: true, floatHistograms: false, metadata: false, name: "histograms only"},
{samples: false, exemplars: false, histograms: false, floatHistograms: true, metadata: false, name: "float histograms only"},
{samples: false, exemplars: false, histograms: false, floatHistograms: false, metadata: true, name: "metadata only"},
}
// Let's create an even number of send batches so we don't run into the
@ -110,6 +111,7 @@ func TestSampleDelivery(t *testing.T) {
exemplars []record.RefExemplar
histograms []record.RefHistogramSample
floatHistograms []record.RefFloatHistogramSample
metadata []record.RefMetadata
)
// Generates same series in both cases.
@ -125,6 +127,9 @@ func TestSampleDelivery(t *testing.T) {
if tc.floatHistograms {
_, floatHistograms, series = createHistograms(n, n, true)
}
if tc.metadata {
metadata, series = createMetadata(n)
}
// Apply new config.
queueConfig.Capacity = len(samples)
@ -144,10 +149,12 @@ func TestSampleDelivery(t *testing.T) {
c.expectExemplars(exemplars[:len(exemplars)/2], series)
c.expectHistograms(histograms[:len(histograms)/2], series)
c.expectFloatHistograms(floatHistograms[:len(floatHistograms)/2], series)
c.expectTsMetadata(metadata[:len(metadata)/2], series)
qm.Append(samples[:len(samples)/2])
qm.AppendExemplars(exemplars[:len(exemplars)/2])
qm.AppendHistograms(histograms[:len(histograms)/2])
qm.AppendFloatHistograms(floatHistograms[:len(floatHistograms)/2])
qm.AppendWALMetadata(metadata[:len(metadata)/2])
c.waitForExpectedData(t)
// Send second half of data.
@ -155,10 +162,12 @@ func TestSampleDelivery(t *testing.T) {
c.expectExemplars(exemplars[len(exemplars)/2:], series)
c.expectHistograms(histograms[len(histograms)/2:], series)
c.expectFloatHistograms(floatHistograms[len(floatHistograms)/2:], series)
c.expectTsMetadata(metadata[len(metadata)/2:], series)
qm.Append(samples[len(samples)/2:])
qm.AppendExemplars(exemplars[len(exemplars)/2:])
qm.AppendHistograms(histograms[len(histograms)/2:])
qm.AppendFloatHistograms(floatHistograms[len(floatHistograms)/2:])
qm.AppendWALMetadata(metadata[len(metadata)/2:])
c.waitForExpectedData(t)
})
}
@ -198,6 +207,43 @@ func TestMetadataDelivery(t *testing.T) {
require.Equal(t, c.receivedMetadata[metadata[len(metadata)-1].Metric][0].MetricFamilyName, metadata[len(metadata)-1].Metric)
}
func TestWALMetadataDelivery(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
defer s.Close()
cfg := config.DefaultQueueConfig
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
cfg.MaxShards = 1
writeConfig := baseRemoteWriteConfig("http://test-storage.com")
writeConfig.QueueConfig = cfg
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{
writeConfig,
},
}
metadata, series := createMetadata(3)
require.NoError(t, s.ApplyConfig(conf))
hash, err := toHash(writeConfig)
require.NoError(t, err)
qm := s.rws.queues[hash]
qm.sendMetadata = true
c := NewTestWriteClient()
qm.SetClient(c)
qm.StoreSeries(series, 0)
c.expectTsMetadata(metadata, series)
qm.AppendWALMetadata(metadata)
c.waitForExpectedData(t)
}
func TestSampleDeliveryTimeout(t *testing.T) {
// Let's send one less sample than batch size, and wait the timeout duration
n := 9
@ -312,22 +358,17 @@ func TestSeriesReset(t *testing.T) {
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, true)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
for i := 0; i < numSegments; i++ {
series := []record.RefSeries{}
metadata := []record.RefMetadata{}
for j := 0; j < numSeries; j++ {
series = append(series, record.RefSeries{Ref: chunks.HeadSeriesRef((i * 100) + j), Labels: labels.FromStrings("a", "a")})
metadata = append(metadata, record.RefMetadata{Ref: chunks.HeadSeriesRef((i * 100) + j), Type: 0, Unit: "unit text", Help: "help text"})
}
m.StoreMetadata(metadata)
m.StoreSeries(series, i)
}
require.Equal(t, numSegments*numSeries, len(m.seriesLabels))
require.Equal(t, numSegments*numSeries, len(m.seriesMetadata))
m.SeriesReset(2)
require.Equal(t, numSegments*numSeries/2, len(m.seriesLabels))
require.Equal(t, numSegments*numSeries/2, len(m.seriesMetadata))
}
func TestReshard(t *testing.T) {
@ -649,6 +690,28 @@ func createHistograms(numSamples, numSeries int, floatHistogram bool) ([]record.
return histograms, nil, series
}
func createMetadata(numMetadata int) ([]record.RefMetadata, []record.RefSeries) {
series := make([]record.RefSeries, 0, numMetadata)
metas := make([]record.RefMetadata, 0, numMetadata)
for i := 0; i < numMetadata; i++ {
name := fmt.Sprintf("test_metric_%d", i)
metas = append(metas, record.RefMetadata{
Ref: chunks.HeadSeriesRef(i),
Type: uint8(record.Counter),
Unit: "unit text",
Help: "help text",
})
series = append(series, record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: labels.FromStrings("__name__", name, "foo", "bar"),
})
}
return metas, series
}
func getSeriesNameFromRef(r record.RefSeries) string {
return r.Labels.Get("__name__")
}
@ -662,6 +725,8 @@ type TestWriteClient struct {
receivedFloatHistograms map[string][]prompb.Histogram
expectedHistograms map[string][]prompb.Histogram
expectedFloatHistograms map[string][]prompb.Histogram
receivedTsMetadata map[string][]prompb.Metadata
expectedTsMetadata map[string][]prompb.Metadata
receivedMetadata map[string][]prompb.MetricMetadata
writesReceived int
withWaitGroup bool
@ -755,6 +820,23 @@ func (c *TestWriteClient) expectFloatHistograms(fhs []record.RefFloatHistogramSa
c.wg.Add(len(fhs))
}
func (c *TestWriteClient) expectTsMetadata(ms []record.RefMetadata, series []record.RefSeries) {
if !c.withWaitGroup {
return
}
c.mtx.Lock()
defer c.mtx.Unlock()
c.expectedTsMetadata = map[string][]prompb.Metadata{}
c.receivedTsMetadata = map[string][]prompb.Metadata{}
for _, m := range ms {
seriesName := getSeriesNameFromRef(series[m.Ref])
c.expectedTsMetadata[seriesName] = append(c.expectedTsMetadata[seriesName], prompb.Metadata{Type: metricTypeToProtoEquivalent(record.ToTextparseMetricType(m.Type)), Unit: m.Unit, Help: m.Help})
}
c.wg.Add(len(ms))
}
func (c *TestWriteClient) waitForExpectedData(tb testing.TB) {
if !c.withWaitGroup {
return
@ -774,6 +856,9 @@ func (c *TestWriteClient) waitForExpectedData(tb testing.TB) {
for ts, expectedFloatHistogram := range c.expectedFloatHistograms {
require.Equal(tb, expectedFloatHistogram, c.receivedFloatHistograms[ts], ts)
}
for ts, expectedMetadata := range c.expectedTsMetadata {
require.Equal(tb, expectedMetadata, c.receivedTsMetadata[ts], ts)
}
}
func (c *TestWriteClient) Store(_ context.Context, req []byte) error {
@ -815,6 +900,10 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte) error {
c.receivedHistograms[seriesName] = append(c.receivedHistograms[seriesName], histogram)
}
}
for _, metadata := range ts.Metadatas {
count++
c.receivedTsMetadata[seriesName] = append(c.receivedTsMetadata[seriesName], metadata)
}
}
if c.withWaitGroup {
c.wg.Add(-count)

View file

@ -146,10 +146,12 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
}
}
m := metadataProtoToMetadata(ts.Metadata)
_, err := app.UpdateMetadata(0, labels, m)
if err != nil {
level.Debug(h.logger).Log("msg", "error while updating metadata from remote write", "err", err)
for _, mp := range ts.Metadatas {
m := metadataProtoToMetadata(mp)
_, err := app.UpdateMetadata(0, labels, m)
if err != nil {
level.Debug(h.logger).Log("msg", "error while updating metadata from remote write", "err", err)
}
}
}

View file

@ -80,9 +80,10 @@ func TestRemoteWriteHandler(t *testing.T) {
k++
}
m := ts.Metadata
require.Equal(t, mockMetadata{labels, int32(m.Type), m.Unit, m.Help}, appendable.metadata[l])
l++
for _, m := range ts.Metadatas {
require.Equal(t, mockMetadata{labels, int32(m.Type), m.Unit, m.Help}, appendable.metadata[l])
l++
}
}
}

View file

@ -51,8 +51,8 @@ type WriteTo interface {
AppendExemplars([]record.RefExemplar) bool
AppendHistograms([]record.RefHistogramSample) bool
AppendFloatHistograms([]record.RefFloatHistogramSample) bool
AppendWALMetadata([]record.RefMetadata) bool
StoreSeries([]record.RefSeries, int)
StoreMetadata([]record.RefMetadata)
// Next two methods are intended for garbage-collection: first we call
// UpdateSeriesSegment on all current series
@ -613,7 +613,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
w.recordDecodeFailsMetric.Inc()
return err
}
w.writer.StoreMetadata(meta)
w.writer.AppendWALMetadata(meta)
case record.Tombstones:
default:

View file

@ -56,6 +56,7 @@ type writeToMock struct {
exemplarsAppended int
histogramsAppended int
floatHistogramsAppended int
metadataAppended int
seriesLock sync.Mutex
seriesSegmentIndexes map[chunks.HeadSeriesRef]int
}
@ -80,12 +81,15 @@ func (wtm *writeToMock) AppendFloatHistograms(fh []record.RefFloatHistogramSampl
return true
}
func (wtm *writeToMock) AppendWALMetadata(m []record.RefMetadata) bool {
wtm.metadataAppended += len(m)
return true
}
func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) {
wtm.UpdateSeriesSegment(series, index)
}
func (wtm *writeToMock) StoreMetadata(_ []record.RefMetadata) { /* no-op */ }
func (wtm *writeToMock) UpdateSeriesSegment(series []record.RefSeries, index int) {
wtm.seriesLock.Lock()
defer wtm.seriesLock.Unlock()