Merge pull request #12304 from bboreham/labels-symboltable

Labels: reduce memory by de-duplicating strings in SymbolTables
This commit is contained in:
Bryan Boreham 2024-02-26 14:06:27 +00:00 committed by GitHub
commit d2817e9c91
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
48 changed files with 1208 additions and 234 deletions

View file

@ -19,6 +19,7 @@ jobs:
- run: make GO_ONLY=1 SKIP_GOLANGCI_LINT=1
- run: go test ./tsdb/ -test.tsdb-isolation=false
- run: go test --tags=stringlabels ./...
- run: go test --tags=dedupelabels ./...
- run: GOARCH=386 go test ./cmd/prometheus
- run: make -C documentation/examples/remote_storage
- run: make -C documentation/examples

View file

@ -127,7 +127,8 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn
ctx := context.Background()
app := w.Appender(ctx)
p := textparse.NewOpenMetricsParser(input)
symbolTable := labels.NewSymbolTable() // One table per block means it won't grow too large.
p := textparse.NewOpenMetricsParser(input, symbolTable)
samplesCount := 0
for {
e, err := p.Next()
@ -216,7 +217,7 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn
}
func backfill(maxSamplesInAppender int, input []byte, outputDir string, humanReadable, quiet bool, maxBlockDuration time.Duration) (err error) {
p := textparse.NewOpenMetricsParser(input)
p := textparse.NewOpenMetricsParser(input, nil) // Don't need a SymbolTable to get max and min timestamps.
maxt, mint, err := getMinAndMaxTimestamps(p)
if err != nil {
return fmt.Errorf("getting min and max timestamp: %w", err)

View file

@ -82,7 +82,7 @@ func Load(s string, expandExternalLabels bool, logger log.Logger) (*Config, erro
return cfg, nil
}
b := labels.ScratchBuilder{}
b := labels.NewScratchBuilder(0)
cfg.GlobalConfig.ExternalLabels.Range(func(v labels.Label) {
newV := os.Expand(v.Value, func(s string) string {
if s == "$" {
@ -97,6 +97,7 @@ func Load(s string, expandExternalLabels bool, logger log.Logger) (*Config, erro
if newV != v.Value {
level.Debug(logger).Log("msg", "External label replaced", "label", v.Name, "input", v.Value, "output", newV)
}
// Note newV can be blank. https://github.com/prometheus/prometheus/issues/11024
b.Add(v.Name, newV)
})
cfg.GlobalConfig.ExternalLabels = b.Labels()

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !stringlabels
//go:build !stringlabels && !dedupelabels
package labels
@ -371,6 +371,25 @@ func (ls Labels) ReleaseStrings(release func(string)) {
}
}
// Builder allows modifying Labels.
// Labels in base with an empty value are recorded as deletions on Reset.
type Builder struct {
base Labels
del []string
add []Label
}
// Reset clears all current state for the builder.
func (b *Builder) Reset(base Labels) {
b.base = base
b.del = b.del[:0]
b.add = b.add[:0]
// Empty-valued labels in the base set are treated as pending deletions.
b.base.Range(func(l Label) {
if l.Value == "" {
b.del = append(b.del, l.Name)
}
})
}
// Labels returns the labels from the builder.
// If no modifications were made, the original labels are returned.
func (b *Builder) Labels() Labels {
@ -401,11 +420,32 @@ type ScratchBuilder struct {
add Labels
}
// Symbol-table is no-op, just for api parity with dedupelabels.
type SymbolTable struct{}
// NewSymbolTable returns nil; this build has no symbol table.
func NewSymbolTable() *SymbolTable { return nil }
// Len always reports zero; this build interns no symbols.
func (t *SymbolTable) Len() int { return 0 }
// NewScratchBuilder creates a ScratchBuilder initialized for Labels with n entries.
func NewScratchBuilder(n int) ScratchBuilder {
return ScratchBuilder{add: make([]Label, 0, n)}
}
// NewBuilderWithSymbolTable creates a Builder, for api parity with dedupelabels.
func NewBuilderWithSymbolTable(_ *SymbolTable) *Builder {
return NewBuilder(EmptyLabels())
}
// NewScratchBuilderWithSymbolTable creates a ScratchBuilder, for api parity with dedupelabels.
func NewScratchBuilderWithSymbolTable(_ *SymbolTable, n int) ScratchBuilder {
return NewScratchBuilder(n)
}
// SetSymbolTable is a no-op in this build; present for api parity with dedupelabels.
func (b *ScratchBuilder) SetSymbolTable(_ *SymbolTable) {
// no-op
}
// Reset clears the builder for reuse, keeping the allocated slice capacity.
func (b *ScratchBuilder) Reset() {
b.add = b.add[:0]
}

View file

@ -123,13 +123,6 @@ func FromMap(m map[string]string) Labels {
return New(l...)
}
// Builder allows modifying Labels.
type Builder struct {
base Labels
del []string
add []Label
}
// NewBuilder returns a new LabelsBuilder.
func NewBuilder(base Labels) *Builder {
b := &Builder{
@ -140,18 +133,6 @@ func NewBuilder(base Labels) *Builder {
return b
}
// Reset clears all current state for the builder.
func (b *Builder) Reset(base Labels) {
b.base = base
b.del = b.del[:0]
b.add = b.add[:0]
b.base.Range(func(l Label) {
if l.Value == "" {
b.del = append(b.del, l.Name)
}
})
}
// Del deletes the label of the given name.
func (b *Builder) Del(ns ...string) *Builder {
for _, n := range ns {

View file

@ -0,0 +1,807 @@
// Copyright 2024 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build dedupelabels
package labels
import (
"bytes"
"strings"
"sync"
"unsafe"
"github.com/cespare/xxhash/v2"
"golang.org/x/exp/slices"
)
// Labels is implemented by a SymbolTable and string holding name/value
// pairs encoded as indexes into the table in varint encoding.
// Names are in alphabetical order.
type Labels struct {
syms *nameTable
data string
}
// Split SymbolTable into the part used by Labels and the part used by Builder. Only the latter needs the map.
// This part is used by Labels. All fields are immutable after construction.
type nameTable struct {
byNum []string // This slice header is never changed, even while we are building the symbol table.
symbolTable *SymbolTable // If we need to use it in a Builder.
}
// SymbolTable is used to map strings into numbers so they can be packed together.
type SymbolTable struct {
mx sync.Mutex // Guards nextNum, byName and replacement of nameTable.
*nameTable
nextNum int
byName map[string]int
}
// Initial capacity of a new SymbolTable; doubled each time it fills up.
const defaultSymbolTableSize = 1024
// NewSymbolTable returns an empty SymbolTable ready for use.
func NewSymbolTable() *SymbolTable {
t := &SymbolTable{
nameTable: &nameTable{byNum: make([]string, defaultSymbolTableSize)},
byName: make(map[string]int, defaultSymbolTableSize),
}
// Back-pointer so Labels built from this table can reach it via their nameTable.
t.nameTable.symbolTable = t
return t
}
// Len returns the number of unique strings currently interned in the table.
func (t *SymbolTable) Len() int {
	t.mx.Lock()
	n := len(t.byName)
	t.mx.Unlock()
	return n
}
// ToNum maps a string to an integer, adding the string to the table if it is not already there.
// Note: copies the string before adding, in case the caller passed part of
// a buffer that should not be kept alive by this SymbolTable.
func (t *SymbolTable) ToNum(name string) int {
t.mx.Lock()
defer t.mx.Unlock()
return t.toNumUnlocked(name)
}
// toNumUnlocked does the work of ToNum; the caller must hold t.mx.
func (t *SymbolTable) toNumUnlocked(name string) int {
if i, found := t.byName[name]; found {
return i
}
i := t.nextNum
if t.nextNum == cap(t.byNum) {
// Name table is full; copy to a new one. Don't touch the existing slice, as nameTable is immutable after construction.
// Existing Labels keep reading the old nameTable; new ones see the replacement.
newSlice := make([]string, cap(t.byNum)*2)
copy(newSlice, t.byNum)
t.nameTable = &nameTable{byNum: newSlice, symbolTable: t}
}
// Clone so we never retain a substring of some larger caller-owned buffer.
name = strings.Clone(name)
t.byNum[i] = name
t.byName[name] = i
t.nextNum++
return i
}
// checkNum returns the number for name if it is already interned, without
// adding it. The boolean reports whether name was found in the table.
func (t *SymbolTable) checkNum(name string) (int, bool) {
	t.mx.Lock()
	defer t.mx.Unlock()
	// Fix: the original named this result `bool`, shadowing the predeclared
	// boolean type inside this function.
	num, found := t.byName[name]
	return num, found
}
// ToName maps an integer to a string.
// No locking: per the nameTable comment, byNum is immutable after construction.
func (t *nameTable) ToName(num int) string {
return t.byNum[num]
}
// decodeVarint decodes a little-endian base-128 varint from data starting at
// index, returning the decoded value and the index of the next unread byte.
func decodeVarint(data string, index int) (int, int) {
	// Single-byte fast path: values 0..127 fit in one byte.
	if c := data[index]; c < 0x80 {
		return int(c), index + 1
	}
	value := int(data[index] & 0x7F)
	index++
	// Accumulate 7 bits per continuation byte. Running off the end of data
	// panics, which is acceptable: all Labels strings are constructed
	// internally, so malformed data indicates a bug or memory corruption.
	for shift := uint(7); ; shift += 7 {
		c := data[index]
		index++
		value |= int(c&0x7F) << shift
		if c < 0x80 {
			return value, index
		}
	}
}
// decodeString decodes one varint symbol number from data at index and
// resolves it through t, returning the string and the next unread index.
func decodeString(t *nameTable, data string, index int) (string, int) {
var num int
num, index = decodeVarint(data, index)
return t.ToName(num), index
}
// Bytes returns ls as a byte slice.
// It uses non-printing characters and so should not be used for printing.
func (ls Labels) Bytes(buf []byte) []byte {
b := bytes.NewBuffer(buf[:0])
for i := 0; i < len(ls.data); {
// Separator byte between entries; seps is declared elsewhere in the package.
if i > 0 {
b.WriteByte(seps[0])
}
var name, value string
name, i = decodeString(ls.syms, ls.data, i)
value, i = decodeString(ls.syms, ls.data, i)
b.WriteString(name)
b.WriteByte(seps[0])
b.WriteString(value)
}
return b.Bytes()
}
// IsZero implements yaml.IsZeroer - if we don't have this then 'omitempty' fields are always omitted.
func (ls Labels) IsZero() bool {
return len(ls.data) == 0
}
// MatchLabels returns a subset of Labels that matches/does not match with the provided label names based on the 'on' boolean.
// If on is set to true, it returns the subset of labels that match with the provided label names and its inverse when 'on' is set to false.
// TODO: This is only used in printing an error message
func (ls Labels) MatchLabels(on bool, names ...string) Labels {
b := NewBuilder(ls)
if on {
b.Keep(names...)
} else {
// The metric name is always dropped in the "without" case.
b.Del(MetricName)
b.Del(names...)
}
return b.Labels()
}
// Hash returns a hash value for the label set.
// Note: the result is not guaranteed to be consistent across different runs of Prometheus.
func (ls Labels) Hash() uint64 {
// Use xxhash.Sum64(b) for fast path as it's faster.
b := make([]byte, 0, 1024)
for pos := 0; pos < len(ls.data); {
name, newPos := decodeString(ls.syms, ls.data, pos)
value, newPos := decodeString(ls.syms, ls.data, newPos)
if len(b)+len(name)+len(value)+2 >= cap(b) {
// If labels entry is 1KB+, hash the rest of them via Write().
// pos has not advanced yet, so this inner loop re-decodes the
// current entry and streams it (and all remaining entries) into h.
h := xxhash.New()
_, _ = h.Write(b)
for pos < len(ls.data) {
name, pos = decodeString(ls.syms, ls.data, pos)
value, pos = decodeString(ls.syms, ls.data, pos)
_, _ = h.WriteString(name)
_, _ = h.Write(seps)
_, _ = h.WriteString(value)
_, _ = h.Write(seps)
}
return h.Sum64()
}
b = append(b, name...)
b = append(b, seps[0])
b = append(b, value...)
b = append(b, seps[0])
pos = newPos
}
return xxhash.Sum64(b)
}
// HashForLabels returns a hash value for the labels matching the provided names.
// 'names' have to be sorted in ascending order.
func (ls Labels) HashForLabels(b []byte, names ...string) (uint64, []byte) {
b = b[:0]
// j walks the sorted names in step with the (sorted) decoded label names.
j := 0
for i := 0; i < len(ls.data); {
var name, value string
name, i = decodeString(ls.syms, ls.data, i)
value, i = decodeString(ls.syms, ls.data, i)
for j < len(names) && names[j] < name {
j++
}
if j == len(names) {
break
}
if name == names[j] {
b = append(b, name...)
b = append(b, seps[0])
b = append(b, value...)
b = append(b, seps[0])
}
}
return xxhash.Sum64(b), b
}
// HashWithoutLabels returns a hash value for all labels except those matching
// the provided names.
// 'names' have to be sorted in ascending order.
func (ls Labels) HashWithoutLabels(b []byte, names ...string) (uint64, []byte) {
b = b[:0]
j := 0
for i := 0; i < len(ls.data); {
var name, value string
name, i = decodeString(ls.syms, ls.data, i)
value, i = decodeString(ls.syms, ls.data, i)
for j < len(names) && names[j] < name {
j++
}
// The metric name is always excluded, in addition to the given names.
if name == MetricName || (j < len(names) && name == names[j]) {
continue
}
b = append(b, name...)
b = append(b, seps[0])
b = append(b, value...)
b = append(b, seps[0])
}
return xxhash.Sum64(b), b
}
// BytesWithLabels is just as Bytes(), but only for labels matching names.
// 'names' have to be sorted in ascending order.
func (ls Labels) BytesWithLabels(buf []byte, names ...string) []byte {
b := bytes.NewBuffer(buf[:0])
// j walks the sorted names in step with the (sorted) decoded label names.
j := 0
for pos := 0; pos < len(ls.data); {
lName, newPos := decodeString(ls.syms, ls.data, pos)
lValue, newPos := decodeString(ls.syms, ls.data, newPos)
for j < len(names) && names[j] < lName {
j++
}
if j == len(names) {
break
}
if lName == names[j] {
// Write a separator before every pair except the first.
if b.Len() > 1 {
b.WriteByte(seps[0])
}
b.WriteString(lName)
b.WriteByte(seps[0])
b.WriteString(lValue)
}
pos = newPos
}
return b.Bytes()
}
// BytesWithoutLabels is just as Bytes(), but only for labels not matching names.
// 'names' have to be sorted in ascending order.
func (ls Labels) BytesWithoutLabels(buf []byte, names ...string) []byte {
b := bytes.NewBuffer(buf[:0])
j := 0
for pos := 0; pos < len(ls.data); {
lName, newPos := decodeString(ls.syms, ls.data, pos)
lValue, newPos := decodeString(ls.syms, ls.data, newPos)
for j < len(names) && names[j] < lName {
j++
}
// Keep the pair when it is not one of the excluded names.
if j == len(names) || lName != names[j] {
if b.Len() > 1 {
b.WriteByte(seps[0])
}
b.WriteString(lName)
b.WriteByte(seps[0])
b.WriteString(lValue)
}
pos = newPos
}
return b.Bytes()
}
// Copy returns a copy of the labels.
// The data string is cloned so the copy does not alias ls's backing bytes;
// the nameTable is shared, which is safe since it is immutable.
func (ls Labels) Copy() Labels {
return Labels{syms: ls.syms, data: strings.Clone(ls.data)}
}
// Get returns the value for the label with the given name.
// Returns an empty string if the label doesn't exist.
func (ls Labels) Get(name string) string {
if name == "" { // Avoid crash in loop if someone asks for "".
return "" // Prometheus does not store blank label names.
}
for i := 0; i < len(ls.data); {
var lName, lValue string
lName, i = decodeString(ls.syms, ls.data, i)
if lName == name {
lValue, _ = decodeString(ls.syms, ls.data, i)
return lValue
} else if lName[0] > name[0] { // Stop looking if we've gone past.
// First-byte comparison only: conservative early exit, valid
// because names are stored in alphabetical order.
break
}
// Not a match: skip the value's varint without resolving the string.
_, i = decodeVarint(ls.data, i)
}
return ""
}
// Has returns true if the label with the given name is present.
func (ls Labels) Has(name string) bool {
if name == "" { // Avoid crash in loop if someone asks for "".
return false // Prometheus does not store blank label names.
}
for i := 0; i < len(ls.data); {
var lName string
lName, i = decodeString(ls.syms, ls.data, i)
if lName == name {
return true
} else if lName[0] > name[0] { // Stop looking if we've gone past.
break
}
// Skip the value's varint without resolving the string.
_, i = decodeVarint(ls.data, i)
}
return false
}
// HasDuplicateLabelNames returns whether ls has duplicate label names.
// It assumes that the labelset is sorted.
func (ls Labels) HasDuplicateLabelNames() (string, bool) {
// Equal names map to the same symbol number, so comparing numbers suffices
// and avoids resolving strings. -1 is safe as a sentinel: numbers are >= 0.
prevNum := -1
for i := 0; i < len(ls.data); {
var lNum int
lNum, i = decodeVarint(ls.data, i)
_, i = decodeVarint(ls.data, i)
if lNum == prevNum {
return ls.syms.ToName(lNum), true
}
prevNum = lNum
}
return "", false
}
// WithoutEmpty returns the labelset without empty labels.
// May return the same labelset.
func (ls Labels) WithoutEmpty() Labels {
if ls.IsEmpty() {
return ls
}
// Idea: have a constant symbol for blank, then we don't have to look it up.
blank, ok := ls.syms.symbolTable.checkNum("")
if !ok { // Symbol table has no entry for blank - none of the values can be blank.
return ls
}
// First pass: scan until we find a blank value (comparing symbol numbers,
// no string resolution needed).
for pos := 0; pos < len(ls.data); {
_, newPos := decodeVarint(ls.data, pos)
lValue, newPos := decodeVarint(ls.data, newPos)
if lValue != blank {
pos = newPos
continue
}
// Do not copy the slice until it's necessary.
// TODO: could optimise the case where all blanks are at the end.
// Note: we size the new buffer on the assumption there is exactly one blank value.
buf := make([]byte, pos, pos+(len(ls.data)-newPos))
copy(buf, ls.data[:pos]) // copy the initial non-blank labels
pos = newPos // move past the first blank value
// Second pass: append only the remaining non-blank entries.
for pos < len(ls.data) {
var newPos int
_, newPos = decodeVarint(ls.data, pos)
lValue, newPos = decodeVarint(ls.data, newPos)
if lValue != blank {
buf = append(buf, ls.data[pos:newPos]...)
}
pos = newPos
}
return Labels{syms: ls.syms, data: yoloString(buf)}
}
return ls
}
// Equal returns whether the two label sets are equal.
func Equal(a, b Labels) bool {
// Fast path: same symbol table means identical data encodes identical labels.
if a.syms == b.syms {
return a.data == b.data
}
// Slow path: decode each symbol through its own table and compare strings.
// Names and values alternate; both pass through aValue/bValue here.
la, lb := len(a.data), len(b.data)
ia, ib := 0, 0
for ia < la && ib < lb {
var aValue, bValue string
aValue, ia = decodeString(a.syms, a.data, ia)
bValue, ib = decodeString(b.syms, b.data, ib)
if aValue != bValue {
return false
}
}
// Unequal lengths remaining means one set has more labels than the other.
if ia != la || ib != lb {
return false
}
return true
}
// EmptyLabels returns an empty Labels value, for convenience.
func EmptyLabels() Labels {
return Labels{}
}
// yoloString converts a byte slice to a string with no copy, via unsafe.
// The caller must not mutate b afterwards.
func yoloString(b []byte) string {
return *((*string)(unsafe.Pointer(&b)))
}
// New returns a sorted Labels from the given labels.
// The caller has to guarantee that all label names are unique.
// Note this function is not efficient; should not be used in performance-critical places.
// (It allocates a fresh SymbolTable on every call.)
func New(ls ...Label) Labels {
slices.SortFunc(ls, func(a, b Label) int { return strings.Compare(a.Name, b.Name) })
syms := NewSymbolTable()
// Stack-allocated scratch space for the symbol numbers of up to 8 labels.
var stackSpace [16]int
size, nums := mapLabelsToNumbers(syms, ls, stackSpace[:])
buf := make([]byte, size)
marshalNumbersToSizedBuffer(nums, buf)
return Labels{syms: syms.nameTable, data: yoloString(buf)}
}
// FromStrings creates new labels from pairs of strings.
// The arguments are interpreted as name1, value1, name2, value2, ...;
// an odd number of arguments is a programming error and panics.
func FromStrings(ss ...string) Labels {
	if len(ss)%2 != 0 {
		panic("invalid number of strings")
	}
	ls := make([]Label, len(ss)/2)
	for i := range ls {
		ls[i] = Label{Name: ss[2*i], Value: ss[2*i+1]}
	}
	return New(ls...)
}
// Compare compares the two label sets.
// The result will be 0 if a==b, <0 if a < b, and >0 if a > b.
func Compare(a, b Labels) int {
la, lb := len(a.data), len(b.data)
ia, ib := 0, 0
// Each set is decoded through its own symbol table, so the comparison is
// on the actual strings, name first then value.
for ia < la && ib < lb {
var aName, bName string
aName, ia = decodeString(a.syms, a.data, ia)
bName, ib = decodeString(b.syms, b.data, ib)
if aName != bName {
if aName < bName {
return -1
}
return 1
}
var aValue, bValue string
aValue, ia = decodeString(a.syms, a.data, ia)
bValue, ib = decodeString(b.syms, b.data, ib)
if aValue != bValue {
if aValue < bValue {
return -1
}
return 1
}
}
// If all labels so far were in common, the set with fewer labels comes first.
return (la - ia) - (lb - ib)
}
// Copy labels from b on top of whatever was in ls previously, reusing memory or expanding if needed.
func (ls *Labels) CopyFrom(b Labels) {
*ls = b // Straightforward memberwise copy is all we need.
}
// IsEmpty returns true if ls represents an empty set of labels.
func (ls Labels) IsEmpty() bool {
return len(ls.data) == 0
}
// Len returns the number of labels; it is relatively slow.
// It must decode every varint pair, though it never resolves the strings.
func (ls Labels) Len() int {
count := 0
for i := 0; i < len(ls.data); {
_, i = decodeVarint(ls.data, i)
_, i = decodeVarint(ls.data, i)
count++
}
return count
}
// Range calls f on each label, in order.
func (ls Labels) Range(f func(l Label)) {
for i := 0; i < len(ls.data); {
var lName, lValue string
lName, i = decodeString(ls.syms, ls.data, i)
lValue, i = decodeString(ls.syms, ls.data, i)
f(Label{Name: lName, Value: lValue})
}
}
// Validate calls f on each label. If f returns a non-nil error, then it returns that error cancelling the iteration.
func (ls Labels) Validate(f func(l Label) error) error {
for i := 0; i < len(ls.data); {
var lName, lValue string
lName, i = decodeString(ls.syms, ls.data, i)
lValue, i = decodeString(ls.syms, ls.data, i)
err := f(Label{Name: lName, Value: lValue})
if err != nil {
return err
}
}
return nil
}
// InternStrings calls intern on every string value inside ls, replacing them with what it returns.
// No-op in this build: strings are already de-duplicated via the symbol table.
func (ls *Labels) InternStrings(intern func(string) string) {
// TODO: remove these calls as there is nothing to do.
}
// ReleaseStrings calls release on every string value inside ls.
// No-op in this build, see InternStrings.
func (ls Labels) ReleaseStrings(release func(string)) {
// TODO: remove these calls as there is nothing to do.
}
// DropMetricName returns Labels with "__name__" removed.
// The entry is removed by re-slicing (and, if not first, concatenating) the
// encoded data string; the symbol table is untouched.
func (ls Labels) DropMetricName() Labels {
for i := 0; i < len(ls.data); {
// i2 ends up pointing just past the current name/value pair.
lName, i2 := decodeString(ls.syms, ls.data, i)
_, i2 = decodeVarint(ls.data, i2)
if lName == MetricName {
if i == 0 { // Make common case fast with no allocations.
ls.data = ls.data[i2:]
} else {
ls.data = ls.data[:i] + ls.data[i2:]
}
break
} else if lName[0] > MetricName[0] { // Stop looking if we've gone past.
break
}
i = i2
}
return ls
}
// Builder allows modifying Labels.
type Builder struct {
syms *SymbolTable // Table used to encode the output; see Reset for how it is chosen.
nums []int // Scratch space for symbol numbers, reused across calls to Labels().
base Labels
del []string
add []Label
}
// NewBuilderWithSymbolTable returns a new LabelsBuilder not based on any labels, but with the SymbolTable.
func NewBuilderWithSymbolTable(s *SymbolTable) *Builder {
return &Builder{
syms: s,
}
}
// Reset clears all current state for the builder.
func (b *Builder) Reset(base Labels) {
if base.syms != nil { // If base has a symbol table, use that.
b.syms = base.syms.symbolTable
} else if b.syms == nil { // Or continue using previous symbol table in builder.
b.syms = NewSymbolTable() // Don't do this in performance-sensitive code.
}
b.base = base
b.del = b.del[:0]
b.add = b.add[:0]
// Empty-valued labels in the base set are treated as pending deletions.
base.Range(func(l Label) {
if l.Value == "" {
b.del = append(b.del, l.Name)
}
})
}
// Labels returns the labels from the builder.
// If no modifications were made, the original labels are returned.
// Otherwise it performs a three-way merge of base, del and add, all sorted
// by name, walking them with cursors (implicit pos, d, a respectively).
func (b *Builder) Labels() Labels {
if len(b.del) == 0 && len(b.add) == 0 {
return b.base
}
slices.SortFunc(b.add, func(a, b Label) int { return strings.Compare(a.Name, b.Name) })
slices.Sort(b.del)
a, d, newSize := 0, 0, 0
// Intern all added strings up front; b.nums holds name/value number pairs.
newSize, b.nums = mapLabelsToNumbers(b.syms, b.add, b.nums)
bufSize := len(b.base.data) + newSize
buf := make([]byte, 0, bufSize)
for pos := 0; pos < len(b.base.data); {
oldPos := pos
var lName string
lName, pos = decodeString(b.base.syms, b.base.data, pos)
_, pos = decodeVarint(b.base.data, pos)
for d < len(b.del) && b.del[d] < lName {
d++
}
if d < len(b.del) && b.del[d] == lName {
continue // This label has been deleted.
}
for ; a < len(b.add) && b.add[a].Name < lName; a++ {
buf = appendLabelTo(b.nums[a*2], b.nums[a*2+1], buf) // Insert label that was not in the base set.
}
if a < len(b.add) && b.add[a].Name == lName {
buf = appendLabelTo(b.nums[a*2], b.nums[a*2+1], buf)
a++
continue // This label has been replaced.
}
buf = append(buf, b.base.data[oldPos:pos]...) // If base had a symbol-table we are using it, so we don't need to look up these symbols.
}
// We have come to the end of the base set; add any remaining labels.
for ; a < len(b.add); a++ {
buf = appendLabelTo(b.nums[a*2], b.nums[a*2+1], buf)
}
return Labels{syms: b.syms.nameTable, data: yoloString(buf)}
}
// marshalNumbersToSizedBuffer encodes nums as varints back-to-front into the
// tail of data (which must be sized to fit exactly), and returns the number
// of bytes written.
func marshalNumbersToSizedBuffer(nums []int, data []byte) int {
i := len(data)
for index := len(nums) - 1; index >= 0; index-- {
i = encodeVarint(data, i, nums[index])
}
return len(data) - i
}
// sizeVarint returns the number of bytes required to encode x as a
// little-endian base-128 varint.
func sizeVarint(x uint64) (n int) {
	// Values of 56 or more bits are clamped to 9 bytes, matching the original
	// table-driven computation; callers only pass symbol-table indexes, which
	// never get that large.
	if x >= 1<<56 {
		return 9
	}
	// One byte per 7 bits of payload.
	n = 1
	for x >= 1<<7 {
		x >>= 7
		n++
	}
	return n
}
// encodeVarintSlow writes v as a varint immediately before data[offset],
// i.e. filling data[offset-sizeVarint(v) : offset], and returns the new
// (lower) offset. Continuation bytes (high bit set) come first.
func encodeVarintSlow(data []byte, offset int, v uint64) int {
offset -= sizeVarint(v)
base := offset
for v >= 1<<7 {
data[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
data[offset] = uint8(v)
return base
}
// Special code for the common case that a value is less than 128
// Same contract as encodeVarintSlow: writes before data[offset], returns new offset.
func encodeVarint(data []byte, offset, v int) int {
if v < 1<<7 {
offset--
data[offset] = uint8(v)
return offset
}
return encodeVarintSlow(data, offset, uint64(v))
}
// Map all the strings in lbls to the symbol table; return the total size required to hold them and all the individual mappings.
// The returned nums slice holds name/value number pairs in lbls order;
// buf is reused as its backing storage when it has capacity.
func mapLabelsToNumbers(t *SymbolTable, lbls []Label, buf []int) (totalSize int, nums []int) {
nums = buf[:0]
// Take the lock once for the whole batch rather than per string.
t.mx.Lock()
defer t.mx.Unlock()
// we just encode name/value/name/value, without any extra tags or length bytes
for _, m := range lbls {
// strings are encoded as a single varint, the index into the symbol table.
i := t.toNumUnlocked(m.Name)
nums = append(nums, i)
totalSize += sizeVarint(uint64(i))
i = t.toNumUnlocked(m.Value)
nums = append(nums, i)
totalSize += sizeVarint(uint64(i))
}
return totalSize, nums
}
// appendLabelTo appends the varint encodings of nameNum then valueNum to buf,
// growing the buffer as needed, and returns the (possibly reallocated) buffer.
func appendLabelTo(nameNum, valueNum int, buf []byte) []byte {
size := sizeVarint(uint64(nameNum)) + sizeVarint(uint64(valueNum))
sizeRequired := len(buf) + size
if cap(buf) >= sizeRequired {
buf = buf[:sizeRequired]
} else {
bufSize := cap(buf)
// Double size of buffer each time it needs to grow, to amortise copying cost.
for bufSize < sizeRequired {
bufSize = bufSize*2 + 1
}
newBuf := make([]byte, sizeRequired, bufSize)
copy(newBuf, buf)
buf = newBuf
}
// encodeVarint writes backwards from the end, so emit value first, then name.
i := sizeRequired
i = encodeVarint(buf, i, valueNum)
i = encodeVarint(buf, i, nameNum)
return buf
}
// ScratchBuilder allows efficient construction of a Labels from scratch.
type ScratchBuilder struct {
syms *SymbolTable // Table the output Labels will reference.
nums []int // Scratch space for symbol numbers, reused by Labels() and Overwrite().
add []Label
output Labels // Memoized result; set by Labels() or Assign().
overwriteBuffer []byte // Reused encoding buffer for Overwrite().
}
// NewScratchBuilder creates a ScratchBuilder initialized for Labels with n entries.
// Warning: expensive; don't call in tight loops.
// (It allocates a fresh SymbolTable.)
func NewScratchBuilder(n int) ScratchBuilder {
return ScratchBuilder{syms: NewSymbolTable(), add: make([]Label, 0, n)}
}
// NewScratchBuilderWithSymbolTable creates a ScratchBuilder initialized for Labels with n entries.
// Unlike NewScratchBuilder it shares the given SymbolTable.
func NewScratchBuilderWithSymbolTable(s *SymbolTable, n int) ScratchBuilder {
return ScratchBuilder{syms: s, add: make([]Label, 0, n)}
}
// SetSymbolTable replaces the table used to encode subsequently-built Labels.
func (b *ScratchBuilder) SetSymbolTable(s *SymbolTable) {
b.syms = s
}
// Reset clears the builder for reuse, keeping allocated capacity.
func (b *ScratchBuilder) Reset() {
b.add = b.add[:0]
b.output = EmptyLabels()
}
// Add a name/value pair.
// Note if you Add the same name twice you will get a duplicate label, which is invalid.
func (b *ScratchBuilder) Add(name, value string) {
b.add = append(b.add, Label{Name: name, Value: value})
}
// Add a name/value pair, using []byte instead of string to reduce memory allocations.
// The values must remain live until Labels() is called.
// (mapLabelsToNumbers clones the strings when interning, so nothing beyond
// that point aliases the caller's bytes.)
func (b *ScratchBuilder) UnsafeAddBytes(name, value []byte) {
b.add = append(b.add, Label{Name: yoloString(name), Value: yoloString(value)})
}
// Sort the labels added so far by name.
func (b *ScratchBuilder) Sort() {
slices.SortFunc(b.add, func(a, b Label) int { return strings.Compare(a.Name, b.Name) })
}
// Assign is for when you already have a Labels which you want this ScratchBuilder to return.
func (b *ScratchBuilder) Assign(l Labels) {
b.output = l
}
// Labels returns the name/value pairs added as a Labels object. Calling Add() after Labels() has no effect.
// Note: if you want them sorted, call Sort() first.
func (b *ScratchBuilder) Labels() Labels {
// Only encode once; subsequent calls return the memoized result.
if b.output.IsEmpty() {
var size int
size, b.nums = mapLabelsToNumbers(b.syms, b.add, b.nums)
buf := make([]byte, size)
marshalNumbersToSizedBuffer(b.nums, buf)
b.output = Labels{syms: b.syms.nameTable, data: yoloString(buf)}
}
return b.output
}
// Write the newly-built Labels out to ls, reusing an internal buffer.
// Callers must ensure that there are no other references to ls, or any strings fetched from it.
// (The buffer is shared across calls, so a previous ls written by this builder
// would be silently corrupted.)
func (b *ScratchBuilder) Overwrite(ls *Labels) {
var size int
size, b.nums = mapLabelsToNumbers(b.syms, b.add, b.nums)
if size <= cap(b.overwriteBuffer) {
b.overwriteBuffer = b.overwriteBuffer[:size]
} else {
b.overwriteBuffer = make([]byte, size)
}
marshalNumbersToSizedBuffer(b.nums, b.overwriteBuffer)
ls.syms = b.syms.nameTable
ls.data = yoloString(b.overwriteBuffer)
}

View file

@ -188,8 +188,7 @@ func (ls Labels) BytesWithoutLabels(buf []byte, names ...string) []byte {
// Copy returns a copy of the labels.
func (ls Labels) Copy() Labels {
buf := append([]byte{}, ls.data...)
return Labels{data: yoloString(buf)}
return Labels{data: strings.Clone(ls.data)}
}
// Get returns the value for the label with the given name.
@ -458,6 +457,25 @@ func (ls *Labels) InternStrings(intern func(string) string) {
func (ls Labels) ReleaseStrings(release func(string)) {
}
// Builder allows modifying Labels.
// Labels in base with an empty value are recorded as deletions on Reset.
type Builder struct {
base Labels
del []string
add []Label
}
// Reset clears all current state for the builder.
func (b *Builder) Reset(base Labels) {
b.base = base
b.del = b.del[:0]
b.add = b.add[:0]
// Empty-valued labels in the base set are treated as pending deletions.
b.base.Range(func(l Label) {
if l.Value == "" {
b.del = append(b.del, l.Name)
}
})
}
// Labels returns the labels from the builder.
// If no modifications were made, the original labels are returned.
func (b *Builder) Labels() Labels {
@ -662,3 +680,24 @@ func (b *ScratchBuilder) Overwrite(ls *Labels) {
marshalLabelsToSizedBuffer(b.add, b.overwriteBuffer)
ls.data = yoloString(b.overwriteBuffer)
}
// Symbol-table is no-op, just for api parity with dedupelabels.
type SymbolTable struct{}
// NewSymbolTable returns nil; this build has no symbol table.
func NewSymbolTable() *SymbolTable { return nil }
// Len always reports zero; this build interns no symbols.
func (t *SymbolTable) Len() int { return 0 }
// NewBuilderWithSymbolTable creates a Builder, for api parity with dedupelabels.
func NewBuilderWithSymbolTable(_ *SymbolTable) *Builder {
return NewBuilder(EmptyLabels())
}
// NewScratchBuilderWithSymbolTable creates a ScratchBuilder, for api parity with dedupelabels.
func NewScratchBuilderWithSymbolTable(_ *SymbolTable, n int) ScratchBuilder {
return NewScratchBuilder(n)
}
// SetSymbolTable is a no-op in this build; present for api parity with dedupelabels.
func (b *ScratchBuilder) SetSymbolTable(_ *SymbolTable) {
// no-op
}

View file

@ -114,7 +114,7 @@ func TestLabels_MatchLabels(t *testing.T) {
for i, test := range tests {
got := labels.MatchLabels(test.on, test.providedNames...)
require.Equal(t, test.expected, got, "unexpected labelset for test case %d", i)
require.True(t, Equal(test.expected, got), "unexpected labelset for test case %d", i)
}
}
@ -207,7 +207,7 @@ func TestLabels_WithoutEmpty(t *testing.T) {
},
} {
t.Run("", func(t *testing.T) {
require.Equal(t, test.expected, test.input.WithoutEmpty())
require.True(t, Equal(test.expected, test.input.WithoutEmpty()))
})
}
}
@ -569,6 +569,7 @@ func TestLabels_BytesWithoutLabels(t *testing.T) {
}
func TestBuilder(t *testing.T) {
reuseBuilder := NewBuilderWithSymbolTable(NewSymbolTable())
for i, tcase := range []struct {
base Labels
del []string
@ -580,6 +581,11 @@ func TestBuilder(t *testing.T) {
base: FromStrings("aaa", "111"),
want: FromStrings("aaa", "111"),
},
{
base: EmptyLabels(),
set: []Label{{"aaa", "444"}, {"bbb", "555"}, {"ccc", "666"}},
want: FromStrings("aaa", "444", "bbb", "555", "ccc", "666"),
},
{
base: FromStrings("aaa", "111", "bbb", "222", "ccc", "333"),
set: []Label{{"aaa", "444"}, {"bbb", "555"}, {"ccc", "666"}},
@ -638,8 +644,7 @@ func TestBuilder(t *testing.T) {
want: FromStrings("aaa", "111", "ddd", "444"),
},
} {
t.Run(fmt.Sprint(i), func(t *testing.T) {
b := NewBuilder(tcase.base)
test := func(t *testing.T, b *Builder) {
for _, lbl := range tcase.set {
b.Set(lbl.Name, lbl.Value)
}
@ -647,7 +652,7 @@ func TestBuilder(t *testing.T) {
b.Keep(tcase.keep...)
}
b.Del(tcase.del...)
require.Equal(t, tcase.want, b.Labels())
require.True(t, Equal(tcase.want, b.Labels()))
// Check what happens when we call Range and mutate the builder.
b.Range(func(l Label) {
@ -656,6 +661,18 @@ func TestBuilder(t *testing.T) {
}
})
require.Equal(t, tcase.want.BytesWithoutLabels(nil, "aaa", "bbb"), b.Labels().Bytes(nil))
}
t.Run(fmt.Sprintf("NewBuilder %d", i), func(t *testing.T) {
test(t, NewBuilder(tcase.base))
})
t.Run(fmt.Sprintf("NewSymbolTable %d", i), func(t *testing.T) {
b := NewBuilderWithSymbolTable(NewSymbolTable())
b.Reset(tcase.base)
test(t, b)
})
t.Run(fmt.Sprintf("reuseBuilder %d", i), func(t *testing.T) {
reuseBuilder.Reset(tcase.base)
test(t, reuseBuilder)
})
}
t.Run("set_after_del", func(t *testing.T) {
@ -694,14 +711,14 @@ func TestScratchBuilder(t *testing.T) {
},
} {
t.Run(fmt.Sprint(i), func(t *testing.T) {
b := ScratchBuilder{}
b := NewScratchBuilder(len(tcase.add))
for _, lbl := range tcase.add {
b.Add(lbl.Name, lbl.Value)
}
b.Sort()
require.Equal(t, tcase.want, b.Labels())
require.True(t, Equal(tcase.want, b.Labels()))
b.Assign(tcase.want)
require.Equal(t, tcase.want, b.Labels())
require.True(t, Equal(tcase.want, b.Labels()))
})
}
}

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !stringlabels
//go:build !stringlabels && !dedupelabels
package labels

View file

@ -0,0 +1,52 @@
// Copyright 2024 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build dedupelabels
package labels
import (
"github.com/cespare/xxhash/v2"
)
// StableHash is a labels hashing implementation which is guaranteed to not change over time.
// This function should be used whenever labels hashing backward compatibility must be guaranteed.
func StableHash(ls Labels) uint64 {
// Use xxhash.Sum64(b) for fast path as it's faster.
b := make([]byte, 0, 1024)
for pos := 0; pos < len(ls.data); {
name, newPos := decodeString(ls.syms, ls.data, pos)
value, newPos := decodeString(ls.syms, ls.data, newPos)
if len(b)+len(name)+len(value)+2 >= cap(b) {
// If labels entry is 1KB+, hash the rest of them via Write().
h := xxhash.New()
_, _ = h.Write(b)
for pos < len(ls.data) {
name, pos = decodeString(ls.syms, ls.data, pos)
value, pos = decodeString(ls.syms, ls.data, pos)
_, _ = h.WriteString(name)
_, _ = h.Write(seps)
_, _ = h.WriteString(value)
_, _ = h.Write(seps)
}
return h.Sum64()
}
b = append(b, name...)
b = append(b, seps[0])
b = append(b, value...)
b = append(b, seps[0])
pos = newPos
}
return xxhash.Sum64(b)
}

View file

@ -50,7 +50,7 @@ func ReadLabels(fn string, n int) ([]Labels, error) {
defer f.Close()
scanner := bufio.NewScanner(f)
b := ScratchBuilder{}
b := NewScratchBuilder(0)
var mets []Labels
hashes := map[uint64]struct{}{}

View file

@ -80,22 +80,22 @@ type Parser interface {
//
// This function always returns a valid parser, but might additionally
// return an error if the content type cannot be parsed.
func New(b []byte, contentType string, parseClassicHistograms bool) (Parser, error) {
func New(b []byte, contentType string, parseClassicHistograms bool, st *labels.SymbolTable) (Parser, error) {
if contentType == "" {
return NewPromParser(b), nil
return NewPromParser(b, st), nil
}
mediaType, _, err := mime.ParseMediaType(contentType)
if err != nil {
return NewPromParser(b), err
return NewPromParser(b, st), err
}
switch mediaType {
case "application/openmetrics-text":
return NewOpenMetricsParser(b), nil
return NewOpenMetricsParser(b, st), nil
case "application/vnd.google.protobuf":
return NewProtobufParser(b, parseClassicHistograms), nil
return NewProtobufParser(b, parseClassicHistograms, st), nil
default:
return NewPromParser(b), nil
return NewPromParser(b, st), nil
}
}

View file

@ -17,6 +17,8 @@ import (
"testing"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/labels"
)
func TestNewParser(t *testing.T) {
@ -91,7 +93,7 @@ func TestNewParser(t *testing.T) {
tt := tt // Copy to local variable before going parallel.
t.Parallel()
p, err := New([]byte{}, tt.contentType, false)
p, err := New([]byte{}, tt.contentType, false, labels.NewSymbolTable())
tt.validateParser(t, p)
if tt.err == "" {
require.NoError(t, err)

View file

@ -97,8 +97,11 @@ type OpenMetricsParser struct {
}
// NewOpenMetricsParser returns a new parser of the byte slice.
func NewOpenMetricsParser(b []byte) Parser {
return &OpenMetricsParser{l: &openMetricsLexer{b: b}}
func NewOpenMetricsParser(b []byte, st *labels.SymbolTable) Parser {
return &OpenMetricsParser{
l: &openMetricsLexer{b: b},
builder: labels.NewScratchBuilderWithSymbolTable(st, 16),
}
}
// Series returns the bytes of the series, the timestamp if set, and the value

View file

@ -247,7 +247,7 @@ foo_total 17.0 1520879607.789 # {id="counter-test"} 5`
},
}
p := NewOpenMetricsParser([]byte(input))
p := NewOpenMetricsParser([]byte(input), labels.NewSymbolTable())
i := 0
var res labels.Labels
@ -378,7 +378,7 @@ choices}`, "strange©™\n'quoted' \"name\"", "6"),
},
}
p := NewOpenMetricsParser([]byte(input))
p := NewOpenMetricsParser([]byte(input), labels.NewSymbolTable())
i := 0
var res labels.Labels
@ -400,7 +400,7 @@ choices}`, "strange©™\n'quoted' \"name\"", "6"),
require.Equal(t, exp[i].m, string(m))
require.Equal(t, exp[i].t, ts)
require.Equal(t, exp[i].v, v)
require.Equal(t, exp[i].lset, res)
testutil.RequireEqual(t, exp[i].lset, res)
if exp[i].e == nil {
require.False(t, found)
} else {
@ -727,7 +727,7 @@ func TestOpenMetricsParseErrors(t *testing.T) {
}
for i, c := range cases {
p := NewOpenMetricsParser([]byte(c.input))
p := NewOpenMetricsParser([]byte(c.input), labels.NewSymbolTable())
var err error
for err == nil {
_, err = p.Next()
@ -792,7 +792,7 @@ func TestOMNullByteHandling(t *testing.T) {
}
for i, c := range cases {
p := NewOpenMetricsParser([]byte(c.input))
p := NewOpenMetricsParser([]byte(c.input), labels.NewSymbolTable())
var err error
for err == nil {
_, err = p.Next()

View file

@ -166,8 +166,11 @@ type PromParser struct {
}
// NewPromParser returns a new parser of the byte slice.
func NewPromParser(b []byte) Parser {
return &PromParser{l: &promlexer{b: append(b, '\n')}}
func NewPromParser(b []byte, st *labels.SymbolTable) Parser {
return &PromParser{
l: &promlexer{b: append(b, '\n')},
builder: labels.NewScratchBuilderWithSymbolTable(st, 16),
}
}
// Series returns the bytes of the series, the timestamp if set, and the value

View file

@ -178,7 +178,7 @@ testmetric{label="\"bar\""} 1`
},
}
p := NewPromParser([]byte(input))
p := NewPromParser([]byte(input), labels.NewSymbolTable())
i := 0
var res labels.Labels
@ -304,7 +304,7 @@ choices}`, "strange©™\n'quoted' \"name\"", "6"),
},
}
p := NewPromParser([]byte(input))
p := NewPromParser([]byte(input), labels.NewSymbolTable())
i := 0
var res labels.Labels
@ -325,7 +325,7 @@ choices}`, "strange©™\n'quoted' \"name\"", "6"),
require.Equal(t, exp[i].m, string(m))
require.Equal(t, exp[i].t, ts)
require.Equal(t, exp[i].v, v)
require.Equal(t, exp[i].lset, res)
testutil.RequireEqual(t, exp[i].lset, res)
case EntryType:
m, typ := p.Type()
@ -422,7 +422,7 @@ func TestPromParseErrors(t *testing.T) {
}
for i, c := range cases {
p := NewPromParser([]byte(c.input))
p := NewPromParser([]byte(c.input), labels.NewSymbolTable())
var err error
for err == nil {
_, err = p.Next()
@ -476,7 +476,7 @@ func TestPromNullByteHandling(t *testing.T) {
}
for i, c := range cases {
p := NewPromParser([]byte(c.input))
p := NewPromParser([]byte(c.input), labels.NewSymbolTable())
var err error
for err == nil {
_, err = p.Next()
@ -497,7 +497,7 @@ const (
)
func BenchmarkParse(b *testing.B) {
for parserName, parser := range map[string]func([]byte) Parser{
for parserName, parser := range map[string]func([]byte, *labels.SymbolTable) Parser{
"prometheus": NewPromParser,
"openmetrics": NewOpenMetricsParser,
} {
@ -516,8 +516,9 @@ func BenchmarkParse(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
st := labels.NewSymbolTable()
for i := 0; i < b.N; i += promtestdataSampleCount {
p := parser(buf)
p := parser(buf, st)
Outer:
for i < b.N {
@ -544,8 +545,9 @@ func BenchmarkParse(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
st := labels.NewSymbolTable()
for i := 0; i < b.N; i += promtestdataSampleCount {
p := parser(buf)
p := parser(buf, st)
Outer:
for i < b.N {
@ -577,8 +579,9 @@ func BenchmarkParse(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
st := labels.NewSymbolTable()
for i := 0; i < b.N; i += promtestdataSampleCount {
p := parser(buf)
p := parser(buf, st)
Outer:
for i < b.N {

View file

@ -80,13 +80,14 @@ type ProtobufParser struct {
}
// NewProtobufParser returns a parser for the payload in the byte slice.
func NewProtobufParser(b []byte, parseClassicHistograms bool) Parser {
func NewProtobufParser(b []byte, parseClassicHistograms bool, st *labels.SymbolTable) Parser {
return &ProtobufParser{
in: b,
state: EntryInvalid,
mf: &dto.MetricFamily{},
metricBytes: &bytes.Buffer{},
parseClassicHistograms: parseClassicHistograms,
builder: labels.NewScratchBuilderWithSymbolTable(st, 16),
}
}

View file

@ -743,7 +743,7 @@ func TestProtobufParse(t *testing.T) {
}{
{
name: "ignore classic buckets of native histograms",
parser: NewProtobufParser(inputBuf.Bytes(), false),
parser: NewProtobufParser(inputBuf.Bytes(), false, labels.NewSymbolTable()),
expected: []parseResult{
{
m: "go_build_info",
@ -1280,7 +1280,7 @@ func TestProtobufParse(t *testing.T) {
},
{
name: "parse classic and native buckets",
parser: NewProtobufParser(inputBuf.Bytes(), true),
parser: NewProtobufParser(inputBuf.Bytes(), true, labels.NewSymbolTable()),
expected: []parseResult{
{ // 0
m: "go_build_info",

View file

@ -20,6 +20,7 @@ import (
"errors"
"io"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/textparse"
"github.com/prometheus/prometheus/promql/parser"
)
@ -56,8 +57,11 @@ const (
maxInputSize = 10240
)
// Use package-scope symbol table to avoid memory allocation on every fuzzing operation.
var symbolTable = labels.NewSymbolTable()
func fuzzParseMetricWithContentType(in []byte, contentType string) int {
p, warning := textparse.New(in, contentType, false)
p, warning := textparse.New(in, contentType, false, symbolTable)
if warning != nil {
// An invalid content type is being passed, which should not happen
// in this context.

View file

@ -356,6 +356,8 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc,
// or update the expression value for existing elements.
resultFPs := map[uint64]struct{}{}
lb := labels.NewBuilder(labels.EmptyLabels())
sb := labels.NewScratchBuilder(0)
var vec promql.Vector
alerts := make(map[uint64]*Alert, len(res))
for _, smpl := range res {
@ -391,14 +393,14 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc,
return result
}
lb := labels.NewBuilder(smpl.Metric).Del(labels.MetricName)
lb.Reset(smpl.Metric)
lb.Del(labels.MetricName)
r.labels.Range(func(l labels.Label) {
lb.Set(l.Name, expand(l.Value))
})
lb.Set(labels.AlertName, r.Name())
sb := labels.ScratchBuilder{}
sb.Reset()
r.annotations.Range(func(a labels.Label) {
sb.Add(a.Name, expand(a.Value))
})

View file

@ -523,11 +523,12 @@ scrape_configs:
loops: map[uint64]loop{
1: noopLoop(),
},
newLoop: newLoop,
logger: nil,
config: cfg1.ScrapeConfigs[0],
client: http.DefaultClient,
metrics: scrapeManager.metrics,
newLoop: newLoop,
logger: nil,
config: cfg1.ScrapeConfigs[0],
client: http.DefaultClient,
metrics: scrapeManager.metrics,
symbolTable: labels.NewSymbolTable(),
}
scrapeManager.scrapePools = map[string]*scrapePool{
"job1": sp,

View file

@ -73,6 +73,10 @@ type scrapePool struct {
client *http.Client
loops map[uint64]loop
symbolTable *labels.SymbolTable
lastSymbolTableCheck time.Time
initialSymbolTableLen int
targetMtx sync.Mutex
// activeTargets and loops must always be synchronized to have the same
// set of hashes.
@ -130,16 +134,18 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
ctx, cancel := context.WithCancel(context.Background())
sp := &scrapePool{
cancel: cancel,
appendable: app,
config: cfg,
client: client,
activeTargets: map[uint64]*Target{},
loops: map[uint64]loop{},
logger: logger,
metrics: metrics,
httpOpts: options.HTTPClientOptions,
noDefaultPort: options.NoDefaultPort,
cancel: cancel,
appendable: app,
config: cfg,
client: client,
activeTargets: map[uint64]*Target{},
loops: map[uint64]loop{},
symbolTable: labels.NewSymbolTable(),
lastSymbolTableCheck: time.Now(),
logger: logger,
metrics: metrics,
httpOpts: options.HTTPClientOptions,
noDefaultPort: options.NoDefaultPort,
}
sp.newLoop = func(opts scrapeLoopOptions) loop {
// Update the targets retrieval function for metadata to a new scrape cache.
@ -160,6 +166,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) },
func(ctx context.Context) storage.Appender { return app.Appender(ctx) },
cache,
sp.symbolTable,
offsetSeed,
opts.honorTimestamps,
opts.trackTimestampsStaleness,
@ -348,6 +355,21 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
sp.metrics.targetReloadIntervalLength.WithLabelValues(interval.String()).Observe(
time.Since(start).Seconds(),
)
// Here we take steps to clear out the symbol table if it has grown a lot.
// After waiting some time for things to settle, we take the size of the symbol-table.
// If, after some more time, the table has grown to twice that size, we start a new one.
const minTimeToCleanSymbolTable = 5 * time.Minute
if time.Since(sp.lastSymbolTableCheck) > minTimeToCleanSymbolTable {
if sp.initialSymbolTableLen == 0 {
sp.initialSymbolTableLen = sp.symbolTable.Len()
} else if sp.symbolTable.Len() > 2*sp.initialSymbolTableLen {
sp.symbolTable = labels.NewSymbolTable()
sp.initialSymbolTableLen = 0
}
sp.lastSymbolTableCheck = time.Now()
}
return nil
}
@ -361,7 +383,7 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
sp.targetMtx.Lock()
var all []*Target
var targets []*Target
lb := labels.NewBuilder(labels.EmptyLabels())
lb := labels.NewBuilderWithSymbolTable(sp.symbolTable)
sp.droppedTargets = []*Target{}
sp.droppedTargetsCount = 0
for _, tg := range tgs {
@ -806,6 +828,7 @@ type scrapeLoop struct {
enableCTZeroIngestion bool
appender func(ctx context.Context) storage.Appender
symbolTable *labels.SymbolTable
sampleMutator labelsMutator
reportSampleMutator labelsMutator
@ -1085,6 +1108,7 @@ func newScrapeLoop(ctx context.Context,
reportSampleMutator labelsMutator,
appender func(ctx context.Context) storage.Appender,
cache *scrapeCache,
symbolTable *labels.SymbolTable,
offsetSeed uint64,
honorTimestamps bool,
trackTimestampsStaleness bool,
@ -1130,6 +1154,7 @@ func newScrapeLoop(ctx context.Context,
buffers: buffers,
cache: cache,
appender: appender,
symbolTable: symbolTable,
sampleMutator: sampleMutator,
reportSampleMutator: reportSampleMutator,
stopped: make(chan struct{}),
@ -1428,7 +1453,7 @@ type appendErrors struct {
}
func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
p, err := textparse.New(b, contentType, sl.scrapeClassicHistograms)
p, err := textparse.New(b, contentType, sl.scrapeClassicHistograms, sl.symbolTable)
if err != nil {
level.Debug(sl.l).Log(
"msg", "Invalid content type on scrape, using prometheus parser as fallback.",
@ -1778,30 +1803,31 @@ func (sl *scrapeLoop) report(app storage.Appender, start time.Time, duration tim
if scrapeErr == nil {
health = 1
}
b := labels.NewBuilderWithSymbolTable(sl.symbolTable)
if err = sl.addReportSample(app, scrapeHealthMetricName, ts, health); err != nil {
if err = sl.addReportSample(app, scrapeHealthMetricName, ts, health, b); err != nil {
return
}
if err = sl.addReportSample(app, scrapeDurationMetricName, ts, duration.Seconds()); err != nil {
if err = sl.addReportSample(app, scrapeDurationMetricName, ts, duration.Seconds(), b); err != nil {
return
}
if err = sl.addReportSample(app, scrapeSamplesMetricName, ts, float64(scraped)); err != nil {
if err = sl.addReportSample(app, scrapeSamplesMetricName, ts, float64(scraped), b); err != nil {
return
}
if err = sl.addReportSample(app, samplesPostRelabelMetricName, ts, float64(added)); err != nil {
if err = sl.addReportSample(app, samplesPostRelabelMetricName, ts, float64(added), b); err != nil {
return
}
if err = sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, float64(seriesAdded)); err != nil {
if err = sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, float64(seriesAdded), b); err != nil {
return
}
if sl.reportExtraMetrics {
if err = sl.addReportSample(app, scrapeTimeoutMetricName, ts, sl.timeout.Seconds()); err != nil {
if err = sl.addReportSample(app, scrapeTimeoutMetricName, ts, sl.timeout.Seconds(), b); err != nil {
return
}
if err = sl.addReportSample(app, scrapeSampleLimitMetricName, ts, float64(sl.sampleLimit)); err != nil {
if err = sl.addReportSample(app, scrapeSampleLimitMetricName, ts, float64(sl.sampleLimit), b); err != nil {
return
}
if err = sl.addReportSample(app, scrapeBodySizeBytesMetricName, ts, float64(bytes)); err != nil {
if err = sl.addReportSample(app, scrapeBodySizeBytesMetricName, ts, float64(bytes), b); err != nil {
return
}
}
@ -1812,37 +1838,38 @@ func (sl *scrapeLoop) reportStale(app storage.Appender, start time.Time) (err er
ts := timestamp.FromTime(start)
stale := math.Float64frombits(value.StaleNaN)
b := labels.NewBuilder(labels.EmptyLabels())
if err = sl.addReportSample(app, scrapeHealthMetricName, ts, stale); err != nil {
if err = sl.addReportSample(app, scrapeHealthMetricName, ts, stale, b); err != nil {
return
}
if err = sl.addReportSample(app, scrapeDurationMetricName, ts, stale); err != nil {
if err = sl.addReportSample(app, scrapeDurationMetricName, ts, stale, b); err != nil {
return
}
if err = sl.addReportSample(app, scrapeSamplesMetricName, ts, stale); err != nil {
if err = sl.addReportSample(app, scrapeSamplesMetricName, ts, stale, b); err != nil {
return
}
if err = sl.addReportSample(app, samplesPostRelabelMetricName, ts, stale); err != nil {
if err = sl.addReportSample(app, samplesPostRelabelMetricName, ts, stale, b); err != nil {
return
}
if err = sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, stale); err != nil {
if err = sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, stale, b); err != nil {
return
}
if sl.reportExtraMetrics {
if err = sl.addReportSample(app, scrapeTimeoutMetricName, ts, stale); err != nil {
if err = sl.addReportSample(app, scrapeTimeoutMetricName, ts, stale, b); err != nil {
return
}
if err = sl.addReportSample(app, scrapeSampleLimitMetricName, ts, stale); err != nil {
if err = sl.addReportSample(app, scrapeSampleLimitMetricName, ts, stale, b); err != nil {
return
}
if err = sl.addReportSample(app, scrapeBodySizeBytesMetricName, ts, stale); err != nil {
if err = sl.addReportSample(app, scrapeBodySizeBytesMetricName, ts, stale, b); err != nil {
return
}
}
return
}
func (sl *scrapeLoop) addReportSample(app storage.Appender, s []byte, t int64, v float64) error {
func (sl *scrapeLoop) addReportSample(app storage.Appender, s []byte, t int64, v float64, b *labels.Builder) error {
ce, ok := sl.cache.get(s)
var ref storage.SeriesRef
var lset labels.Labels
@ -1853,8 +1880,9 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s []byte, t int64, v
// The constants are suffixed with the invalid \xff unicode rune to avoid collisions
// with scraped metrics in the cache.
// We have to drop it when building the actual metric.
lset = labels.FromStrings(labels.MetricName, string(s[:len(s)-1]))
lset = sl.reportSampleMutator(lset)
b.Reset(labels.EmptyLabels())
b.Set(labels.MetricName, string(s[:len(s)-1]))
lset = sl.reportSampleMutator(b.Labels())
}
ref, err := app.Append(ref, lset, t, v)

View file

@ -279,6 +279,7 @@ func TestScrapePoolReload(t *testing.T) {
logger: nil,
client: http.DefaultClient,
metrics: newTestScrapeMetrics(t),
symbolTable: labels.NewSymbolTable(),
}
// Reloading a scrape pool with a new scrape configuration must stop all scrape
@ -357,10 +358,11 @@ func TestScrapePoolReloadPreserveRelabeledIntervalTimeout(t *testing.T) {
loops: map[uint64]loop{
1: noopLoop(),
},
newLoop: newLoop,
logger: nil,
client: http.DefaultClient,
metrics: newTestScrapeMetrics(t),
newLoop: newLoop,
logger: nil,
client: http.DefaultClient,
metrics: newTestScrapeMetrics(t),
symbolTable: labels.NewSymbolTable(),
}
err := sp.reload(reloadCfg)
@ -391,6 +393,7 @@ func TestScrapePoolTargetLimit(t *testing.T) {
logger: log.NewNopLogger(),
client: http.DefaultClient,
metrics: newTestScrapeMetrics(t),
symbolTable: labels.NewSymbolTable(),
}
tgs := []*targetgroup.Group{}
@ -623,6 +626,7 @@ func TestScrapePoolScrapeLoopsStarted(t *testing.T) {
logger: nil,
client: http.DefaultClient,
metrics: newTestScrapeMetrics(t),
symbolTable: labels.NewSymbolTable(),
}
tgs := []*targetgroup.Group{
@ -660,6 +664,7 @@ func newBasicScrapeLoop(t testing.TB, ctx context.Context, scraper scraper, app
nopMutator,
app,
nil,
labels.NewSymbolTable(),
0,
true,
false,
@ -800,6 +805,7 @@ func TestScrapeLoopRun(t *testing.T) {
nopMutator,
app,
nil,
nil,
0,
true,
false,
@ -942,6 +948,7 @@ func TestScrapeLoopMetadata(t *testing.T) {
nopMutator,
func(ctx context.Context) storage.Appender { return nopAppender{} },
cache,
labels.NewSymbolTable(),
0,
true,
false,
@ -1470,7 +1477,7 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
fakeRef := storage.SeriesRef(1)
expValue := float64(1)
metric := []byte(`metric{n="1"} 1`)
p, warning := textparse.New(metric, "", false)
p, warning := textparse.New(metric, "", false, labels.NewSymbolTable())
require.NoError(t, warning)
var lset labels.Labels

View file

@ -169,8 +169,8 @@ func (t *Target) offset(interval time.Duration, offsetSeed uint64) time.Duration
}
// Labels returns a copy of the set of all public labels of the target.
func (t *Target) Labels() labels.Labels {
b := labels.NewScratchBuilder(t.labels.Len())
func (t *Target) Labels(b *labels.ScratchBuilder) labels.Labels {
b.Reset()
t.labels.Range(func(l labels.Label) {
if !strings.HasPrefix(l.Name, model.ReservedLabelPrefix) {
b.Add(l.Name, l.Value)

View file

@ -42,7 +42,8 @@ const (
func TestTargetLabels(t *testing.T) {
target := newTestTarget("example.com:80", 0, labels.FromStrings("job", "some_job", "foo", "bar"))
want := labels.FromStrings(model.JobLabel, "some_job", "foo", "bar")
got := target.Labels()
b := labels.NewScratchBuilder(0)
got := target.Labels(&b)
require.Equal(t, want, got)
i := 0
target.LabelsRange(func(l labels.Label) {

View file

@ -176,12 +176,13 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult,
// FromQueryResult unpacks and sorts a QueryResult proto.
func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet {
b := labels.NewScratchBuilder(0)
series := make([]storage.Series, 0, len(res.Timeseries))
for _, ts := range res.Timeseries {
if err := validateLabelsAndMetricName(ts.Labels); err != nil {
return errSeriesSet{err: err}
}
lbls := labelProtosToLabels(ts.Labels)
lbls := labelProtosToLabels(&b, ts.Labels)
series = append(series, &concreteSeries{labels: lbls, floats: ts.Samples, histograms: ts.Histograms})
}
@ -616,11 +617,11 @@ func FromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, erro
return result, nil
}
func exemplarProtoToExemplar(ep prompb.Exemplar) exemplar.Exemplar {
func exemplarProtoToExemplar(b *labels.ScratchBuilder, ep prompb.Exemplar) exemplar.Exemplar {
timestamp := ep.Timestamp
return exemplar.Exemplar{
Labels: labelProtosToLabels(ep.Labels),
Labels: labelProtosToLabels(b, ep.Labels),
Value: ep.Value,
Ts: timestamp,
HasTs: timestamp != 0,
@ -760,8 +761,8 @@ func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric {
return metric
}
func labelProtosToLabels(labelPairs []prompb.Label) labels.Labels {
b := labels.ScratchBuilder{}
func labelProtosToLabels(b *labels.ScratchBuilder, labelPairs []prompb.Label) labels.Labels {
b.Reset()
for _, l := range labelPairs {
b.Add(l.Name, l.Value)
}

View file

@ -788,10 +788,11 @@ func (m *mockWriter) Write(p []byte) (n int, err error) {
type mockChunkSeriesSet struct {
chunkedSeries []*prompb.ChunkedSeries
index int
builder labels.ScratchBuilder
}
func newMockChunkSeriesSet(ss []*prompb.ChunkedSeries) storage.ChunkSeriesSet {
return &mockChunkSeriesSet{chunkedSeries: ss, index: -1}
return &mockChunkSeriesSet{chunkedSeries: ss, index: -1, builder: labels.NewScratchBuilder(0)}
}
func (c *mockChunkSeriesSet) Next() bool {
@ -801,7 +802,7 @@ func (c *mockChunkSeriesSet) Next() bool {
func (c *mockChunkSeriesSet) At() storage.ChunkSeries {
return &storage.ChunkSeriesEntry{
Lset: labelProtosToLabels(c.chunkedSeries[c.index].Labels),
Lset: labelProtosToLabels(&c.builder, c.chunkedSeries[c.index].Labels),
ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
return &mockChunkIterator{
chunks: c.chunkedSeries[c.index].Chunks,

View file

@ -163,16 +163,23 @@ func TestSampleDelivery(t *testing.T) {
}
}
func TestMetadataDelivery(t *testing.T) {
func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration) (*TestWriteClient, *QueueManager) {
c := NewTestWriteClient()
dir := t.TempDir()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
return c, newTestQueueManager(t, cfg, mcfg, flushDeadline, c)
}
func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient) *QueueManager {
dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false)
return m
}
func TestMetadataDelivery(t *testing.T) {
c, m := newTestClientAndQueueManager(t, defaultFlushDeadline)
m.Start()
defer m.Stop()
@ -192,7 +199,7 @@ func TestMetadataDelivery(t *testing.T) {
require.Len(t, c.receivedMetadata, numMetadata)
// One more write than the rounded qoutient should be performed in order to get samples that didn't
// fit into MaxSamplesPerSend.
require.Equal(t, numMetadata/mcfg.MaxSamplesPerSend+1, c.writesReceived)
require.Equal(t, numMetadata/config.DefaultMetadataConfig.MaxSamplesPerSend+1, c.writesReceived)
// Make sure the last samples were sent.
require.Equal(t, c.receivedMetadata[metadata[len(metadata)-1].Metric][0].MetricFamilyName, metadata[len(metadata)-1].Metric)
}
@ -201,17 +208,13 @@ func TestSampleDeliveryTimeout(t *testing.T) {
// Let's send one less sample than batch size, and wait the timeout duration
n := 9
samples, series := createTimeseries(n, n)
c := NewTestWriteClient()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
c := NewTestWriteClient()
m := newTestQueueManager(t, cfg, config.DefaultMetadataConfig, defaultFlushDeadline, c)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
@ -244,16 +247,8 @@ func TestSampleDeliveryOrder(t *testing.T) {
})
}
c := NewTestWriteClient()
c, m := newTestClientAndQueueManager(t, defaultFlushDeadline)
c.expectSamples(samples, series)
dir := t.TempDir()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m.StoreSeries(series, 0)
m.Start()
@ -267,13 +262,10 @@ func TestShutdown(t *testing.T) {
deadline := 1 * time.Second
c := NewTestBlockedWriteClient()
dir := t.TempDir()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m := newTestQueueManager(t, cfg, mcfg, deadline, c)
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
samples, series := createTimeseries(n, n)
m.StoreSeries(series, 0)
@ -306,12 +298,10 @@ func TestSeriesReset(t *testing.T) {
numSegments := 4
numSeries := 25
dir := t.TempDir()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m := newTestQueueManager(t, cfg, mcfg, deadline, c)
for i := 0; i < numSegments; i++ {
series := []record.RefSeries{}
for j := 0; j < numSeries; j++ {
@ -330,17 +320,12 @@ func TestReshard(t *testing.T) {
nSamples := config.DefaultQueueConfig.Capacity * size
samples, series := createTimeseries(nSamples, nSeries)
c := NewTestWriteClient()
c.expectSamples(samples, series)
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
c := NewTestWriteClient()
m := newTestQueueManager(t, cfg, config.DefaultMetadataConfig, defaultFlushDeadline, c)
c.expectSamples(samples, series)
m.StoreSeries(series, 0)
m.Start()
@ -363,7 +348,7 @@ func TestReshard(t *testing.T) {
c.waitForExpectedData(t)
}
func TestReshardRaceWithStop(*testing.T) {
func TestReshardRaceWithStop(t *testing.T) {
c := NewTestWriteClient()
var m *QueueManager
h := sync.Mutex{}
@ -375,8 +360,7 @@ func TestReshardRaceWithStop(*testing.T) {
exitCh := make(chan struct{})
go func() {
for {
metrics := newQueueManagerMetrics(nil, "", "")
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m = newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c)
m.Start()
h.Unlock()
h.Lock()
@ -410,8 +394,7 @@ func TestReshardPartialBatch(t *testing.T) {
flushDeadline := 10 * time.Millisecond
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c)
m.StoreSeries(series, 0)
m.Start()
@ -454,9 +437,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
batchSendDeadline := time.Millisecond
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
@ -479,11 +460,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
}
func TestReleaseNoninternedString(t *testing.T) {
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
c := NewTestWriteClient()
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline)
m.Start()
defer m.Stop()
@ -525,12 +502,8 @@ func TestShouldReshard(t *testing.T) {
},
}
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
for _, c := range cases {
metrics := newQueueManagerMetrics(nil, "", "")
client := NewTestWriteClient()
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline)
m.numShards = c.startingShards
m.dataIn.incr(c.samplesIn)
m.dataOut.incr(c.samplesOut)
@ -550,7 +523,7 @@ func TestShouldReshard(t *testing.T) {
func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSeries) {
samples := make([]record.RefSample, 0, numSamples)
series := make([]record.RefSeries, 0, numSeries)
lb := labels.ScratchBuilder{}
lb := labels.NewScratchBuilder(1 + len(extraLabels))
for i := 0; i < numSeries; i++ {
name := fmt.Sprintf("test_metric_%d", i)
for j := 0; j < numSamples; j++ {
@ -787,9 +760,10 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
if err := proto.Unmarshal(reqBuf, &reqProto); err != nil {
return err
}
builder := labels.NewScratchBuilder(0)
count := 0
for _, ts := range reqProto.Timeseries {
labels := labelProtosToLabels(ts.Labels)
labels := labelProtosToLabels(&builder, ts.Labels)
seriesName := labels.Get("__name__")
for _, sample := range ts.Samples {
count++
@ -904,10 +878,7 @@ func BenchmarkSampleSend(b *testing.B) {
cfg.MinShards = 20
cfg.MaxShards = 20
dir := b.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m := newTestQueueManager(b, cfg, mcfg, defaultFlushDeadline, c)
m.StoreSeries(series, 0)
// These should be received by the client.
@ -1083,15 +1054,9 @@ func TestProcessExternalLabels(t *testing.T) {
}
func TestCalculateDesiredShards(t *testing.T) {
c := NewTestWriteClient()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline)
samplesIn := m.dataIn
// Need to start the queue manager so the proper metrics are initialized.
// However we can stop it right away since we don't need to do any actual
@ -1160,15 +1125,8 @@ func TestCalculateDesiredShards(t *testing.T) {
}
func TestCalculateDesiredShardsDetail(t *testing.T) {
c := NewTestWriteClient()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline)
samplesIn := m.dataIn
for _, tc := range []struct {
name string
@ -1393,10 +1351,7 @@ func TestDropOldTimeSeries(t *testing.T) {
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
cfg.SampleAgeLimit = model.Duration(60 * time.Second)
dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m := newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c)
m.StoreSeries(series, 0)
m.Start()
@ -1416,7 +1371,7 @@ func createTimeseriesWithOldSamples(numSamples, numSeries int, extraLabels ...la
newSamples := make([]record.RefSample, 0, numSamples)
samples := make([]record.RefSample, 0, numSamples)
series := make([]record.RefSeries, 0, numSeries)
lb := labels.ScratchBuilder{}
lb := labels.NewScratchBuilder(1 + len(extraLabels))
for i := 0; i < numSeries; i++ {
name := fmt.Sprintf("test_metric_%d", i)
// We create half of the samples in the past.

View file

@ -195,6 +195,7 @@ func TestSeriesSetFilter(t *testing.T) {
type mockedRemoteClient struct {
got *prompb.Query
store []*prompb.TimeSeries
b labels.ScratchBuilder
}
func (c *mockedRemoteClient) Read(_ context.Context, query *prompb.Query) (*prompb.QueryResult, error) {
@ -210,7 +211,7 @@ func (c *mockedRemoteClient) Read(_ context.Context, query *prompb.Query) (*prom
q := &prompb.QueryResult{}
for _, s := range c.store {
l := labelProtosToLabels(s.Labels)
l := labelProtosToLabels(&c.b, s.Labels)
var notMatch bool
for _, m := range matchers {
@ -242,6 +243,7 @@ func TestSampleAndChunkQueryableClient(t *testing.T) {
{Labels: []prompb.Label{{Name: "a", Value: "b3"}, {Name: "region", Value: "us"}}},
{Labels: []prompb.Label{{Name: "a", Value: "b2"}, {Name: "region", Value: "europe"}}},
},
b: labels.NewScratchBuilder(0),
}
for _, tc := range []struct {

View file

@ -25,6 +25,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
otlptranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
@ -112,9 +113,10 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
err = app.Commit()
}()
b := labels.NewScratchBuilder(0)
var exemplarErr error
for _, ts := range req.Timeseries {
labels := labelProtosToLabels(ts.Labels)
labels := labelProtosToLabels(&b, ts.Labels)
if !labels.IsValid() {
level.Warn(h.logger).Log("msg", "Invalid metric names or labels", "got", labels.String())
samplesWithInvalidLabels++
@ -137,7 +139,7 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
}
for _, ep := range ts.Exemplars {
e := exemplarProtoToExemplar(ep)
e := exemplarProtoToExemplar(&b, ep)
_, exemplarErr = app.AppendExemplar(0, labels, e)
exemplarErr = h.checkAppendExemplarError(exemplarErr, e, &outOfOrderExemplarErrs)

View file

@ -55,18 +55,19 @@ func TestRemoteWriteHandler(t *testing.T) {
resp := recorder.Result()
require.Equal(t, http.StatusNoContent, resp.StatusCode)
b := labels.NewScratchBuilder(0)
i := 0
j := 0
k := 0
for _, ts := range writeRequestFixture.Timeseries {
labels := labelProtosToLabels(ts.Labels)
labels := labelProtosToLabels(&b, ts.Labels)
for _, s := range ts.Samples {
requireEqual(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i])
i++
}
for _, e := range ts.Exemplars {
exemplarLabels := labelProtosToLabels(e.Labels)
exemplarLabels := labelProtosToLabels(&b, e.Labels)
requireEqual(t, mockExemplar{labels, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
j++
}

View file

@ -417,7 +417,8 @@ func (db *DB) replayWAL() error {
func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef) (err error) {
var (
dec record.Decoder
syms = labels.NewSymbolTable() // One table for the whole WAL.
dec = record.NewDecoder(syms)
lastRef = chunks.HeadSeriesRef(db.nextRef.Load())
decoded = make(chan interface{}, 10)

View file

@ -184,7 +184,7 @@ func TestCommit(t *testing.T) {
// Read records from WAL and check for expected count of series, samples, and exemplars.
var (
r = wlog.NewReader(sr)
dec record.Decoder
dec = record.NewDecoder(labels.NewSymbolTable())
walSeriesCount, walSamplesCount, walExemplarsCount, walHistogramCount, walFloatHistogramCount int
)
@ -293,7 +293,7 @@ func TestRollback(t *testing.T) {
// Read records from WAL and check for expected count of series and samples.
var (
r = wlog.NewReader(sr)
dec record.Decoder
dec = record.NewDecoder(labels.NewSymbolTable())
walSeriesCount, walSamplesCount, walHistogramCount, walFloatHistogramCount, walExemplarsCount int
)
@ -737,7 +737,7 @@ func TestStorage_DuplicateExemplarsIgnored(t *testing.T) {
defer sr.Close()
r := wlog.NewReader(sr)
var dec record.Decoder
dec := record.NewDecoder(labels.NewSymbolTable())
for r.Next() {
rec := r.Record()
if dec.Type(rec) == record.Exemplars {

View file

@ -4031,7 +4031,7 @@ func TestOOOWALWrite(t *testing.T) {
var (
records []interface{}
dec record.Decoder
dec record.Decoder = record.NewDecoder(labels.NewSymbolTable())
)
for r.Next() {
rec := r.Record()
@ -5390,7 +5390,7 @@ func TestWBLAndMmapReplay(t *testing.T) {
require.NoError(t, err)
sr, err := wlog.NewSegmentsReader(originalWblDir)
require.NoError(t, err)
var dec record.Decoder
dec := record.NewDecoder(labels.NewSymbolTable())
r, markers, addedRecs := wlog.NewReader(sr), 0, 0
for r.Next() {
rec := r.Record()

View file

@ -717,6 +717,7 @@ func (h *Head) Init(minValidTime int64) error {
h.startWALReplayStatus(startFrom, endAt)
syms := labels.NewSymbolTable() // One table for the whole WAL.
multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{}
if err == nil && startFrom >= snapIdx {
sr, err := wlog.NewSegmentsReader(dir)
@ -731,7 +732,7 @@ func (h *Head) Init(minValidTime int64) error {
// A corrupted checkpoint is a hard error for now and requires user
// intervention. There's likely little data that can be recovered anyway.
if err := h.loadWAL(wlog.NewReader(sr), multiRef, mmappedChunks, oooMmappedChunks); err != nil {
if err := h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks); err != nil {
return fmt.Errorf("backfill checkpoint: %w", err)
}
h.updateWALReplayStatusRead(startFrom)
@ -764,7 +765,7 @@ func (h *Head) Init(minValidTime int64) error {
if err != nil {
return fmt.Errorf("segment reader (offset=%d): %w", offset, err)
}
err = h.loadWAL(wlog.NewReader(sr), multiRef, mmappedChunks, oooMmappedChunks)
err = h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks)
if err := sr.Close(); err != nil {
level.Warn(h.logger).Log("msg", "Error while closing the wal segments reader", "err", err)
}
@ -792,7 +793,7 @@ func (h *Head) Init(minValidTime int64) error {
}
sr := wlog.NewSegmentBufReader(s)
err = h.loadWBL(wlog.NewReader(sr), multiRef, lastMmapRef)
err = h.loadWBL(wlog.NewReader(sr), syms, multiRef, lastMmapRef)
if err := sr.Close(); err != nil {
level.Warn(h.logger).Log("msg", "Error while closing the wbl segments reader", "err", err)
}

View file

@ -172,7 +172,7 @@ func readTestWAL(t testing.TB, dir string) (recs []interface{}) {
require.NoError(t, sr.Close())
}()
var dec record.Decoder
dec := record.NewDecoder(labels.NewSymbolTable())
r := wlog.NewReader(sr)
for r.Next() {

View file

@ -52,7 +52,7 @@ type histogramRecord struct {
fh *histogram.FloatHistogram
}
func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) {
func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) {
// Track number of samples that referenced a series we don't know about
// for error reporting.
var unknownRefs atomic.Uint64
@ -69,7 +69,6 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
processors = make([]walSubsetProcessor, concurrency)
exemplarsInput chan record.RefExemplar
dec record.Decoder
shards = make([][]record.RefSample, concurrency)
histogramShards = make([][]histogramRecord, concurrency)
@ -137,6 +136,7 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
go func() {
defer close(decoded)
var err error
dec := record.NewDecoder(syms)
for r.Next() {
rec := r.Record()
switch dec.Type(rec) {
@ -645,7 +645,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
return unknownRefs, unknownHistogramRefs, mmapOverlappingChunks
}
func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) {
func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) {
// Track number of samples, m-map markers, that referenced a series we don't know about
// for error reporting.
var unknownRefs, mmapMarkerUnknownRefs atomic.Uint64
@ -657,7 +657,7 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
concurrency = h.opts.WALReplayConcurrency
processors = make([]wblSubsetProcessor, concurrency)
dec record.Decoder
dec = record.NewDecoder(syms)
shards = make([][]record.RefSample, concurrency)
decodedCh = make(chan interface{}, 10)
@ -1360,7 +1360,8 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie
errChan = make(chan error, concurrency)
refSeries map[chunks.HeadSeriesRef]*memSeries
exemplarBuf []record.RefExemplar
dec record.Decoder
syms = labels.NewSymbolTable() // New table for the whole snapshot.
dec = record.NewDecoder(syms)
)
wg.Add(concurrency)

View file

@ -1118,6 +1118,7 @@ type Reader struct {
symbols *Symbols
nameSymbols map[uint32]string // Cache of the label name symbol lookups,
// as there are not many and they are half of all lookups.
st *labels.SymbolTable // TODO: see if we can merge this with nameSymbols.
dec *Decoder
@ -1177,6 +1178,7 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
b: b,
c: c,
postings: map[string][]postingOffset{},
st: labels.NewSymbolTable(),
}
// Verify header.
@ -1653,6 +1655,8 @@ func (r *Reader) Series(id storage.SeriesRef, builder *labels.ScratchBuilder, ch
if d.Err() != nil {
return d.Err()
}
builder.SetSymbolTable(r.st)
builder.Reset()
err := r.dec.Series(d.Get(), builder, chks)
if err != nil {
return fmt.Errorf("read series: %w", err)

View file

@ -192,11 +192,14 @@ type RefMmapMarker struct {
}
// Decoder decodes series, sample, metadata and tombstone records.
// The zero value is ready to use.
type Decoder struct {
builder labels.ScratchBuilder
}
func NewDecoder(t *labels.SymbolTable) Decoder { // FIXME remove t
return Decoder{builder: labels.NewScratchBuilder(0)}
}
// Type returns the type of the record.
// Returns RecordUnknown if no valid record type is found.
func (d *Decoder) Type(rec []byte) Type {

View file

@ -29,7 +29,7 @@ import (
func TestRecord_EncodeDecode(t *testing.T) {
var enc Encoder
var dec Decoder
dec := NewDecoder(labels.NewSymbolTable())
series := []RefSeries{
{
@ -187,7 +187,7 @@ func TestRecord_EncodeDecode(t *testing.T) {
// Bugfix check for pull/521 and pull/523.
func TestRecord_Corrupted(t *testing.T) {
var enc Encoder
var dec Decoder
dec := NewDecoder(labels.NewSymbolTable())
t.Run("Test corrupted series record", func(t *testing.T) {
series := []RefSeries{

View file

@ -31,6 +31,7 @@ import (
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/encoding"
@ -859,6 +860,7 @@ func newWALReader(files []*segmentFile, l log.Logger) *walReader {
files: files,
buf: make([]byte, 0, 128*4096),
crc32: newCRC32(),
dec: record.NewDecoder(labels.NewSymbolTable()),
}
}

View file

@ -510,7 +510,7 @@ func TestMigrateWAL_Fuzz(t *testing.T) {
r := wlog.NewReader(sr)
var res []interface{}
var dec record.Decoder
dec := record.NewDecoder(labels.NewSymbolTable())
for r.Next() {
rec := r.Record()

View file

@ -28,6 +28,7 @@ import (
"github.com/go-kit/log/level"
"golang.org/x/exp/slices"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"
@ -154,7 +155,8 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head
tstones []tombstones.Stone
exemplars []record.RefExemplar
metadata []record.RefMetadata
dec record.Decoder
st = labels.NewSymbolTable() // Needed for decoding; labels do not outlive this function.
dec = record.NewDecoder(st)
enc record.Encoder
buf []byte
recs [][]byte

View file

@ -237,7 +237,7 @@ func TestCheckpoint(t *testing.T) {
require.NoError(t, err)
defer sr.Close()
var dec record.Decoder
dec := record.NewDecoder(labels.NewSymbolTable())
var series []record.RefSeries
var metadata []record.RefMetadata
r := NewReader(sr)

View file

@ -29,6 +29,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/exp/slices"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/tsdb/record"
)
@ -532,7 +533,7 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error {
// Also used with readCheckpoint - implements segmentReadFn.
func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
var (
dec record.Decoder
dec = record.NewDecoder(labels.NewSymbolTable()) // One table per WAL segment means it won't grow indefinitely.
series []record.RefSeries
samples []record.RefSample
samplesToSend []record.RefSample
@ -669,7 +670,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
// Used with readCheckpoint - implements segmentReadFn.
func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error {
var (
dec record.Decoder
dec = record.NewDecoder(labels.NewSymbolTable()) // Needed for decoding; labels do not outlive this function.
series []record.RefSeries
)
for r.Next() && !isClosed(w.quit) {

View file

@ -1009,6 +1009,7 @@ func (api *API) targets(r *http.Request) apiFuncResult {
targetsActive := api.targetRetriever(r.Context()).TargetsActive()
activeKeys, numTargets := sortKeys(targetsActive)
res.ActiveTargets = make([]*Target, 0, numTargets)
builder := labels.NewScratchBuilder(0)
for _, key := range activeKeys {
if scrapePool != "" && key != scrapePool {
@ -1025,7 +1026,7 @@ func (api *API) targets(r *http.Request) apiFuncResult {
res.ActiveTargets = append(res.ActiveTargets, &Target{
DiscoveredLabels: target.DiscoveredLabels(),
Labels: target.Labels(),
Labels: target.Labels(&builder),
ScrapePool: key,
ScrapeURL: target.URL().String(),
GlobalURL: globalURL.String(),
@ -1101,6 +1102,7 @@ func (api *API) targetMetadata(r *http.Request) apiFuncResult {
}
}
builder := labels.NewScratchBuilder(0)
metric := r.FormValue("metric")
res := []metricMetadata{}
for _, tt := range api.targetRetriever(r.Context()).TargetsActive() {
@ -1108,15 +1110,16 @@ func (api *API) targetMetadata(r *http.Request) apiFuncResult {
if limit >= 0 && len(res) >= limit {
break
}
targetLabels := t.Labels(&builder)
// Filter targets that don't satisfy the label matchers.
if matchTarget != "" && !matchLabels(t.Labels(), matchers) {
if matchTarget != "" && !matchLabels(targetLabels, matchers) {
continue
}
// If no metric is specified, get the full list for the target.
if metric == "" {
for _, md := range t.ListMetadata() {
res = append(res, metricMetadata{
Target: t.Labels(),
Target: targetLabels,
Metric: md.Metric,
Type: md.Type,
Help: md.Help,
@ -1128,7 +1131,7 @@ func (api *API) targetMetadata(r *http.Request) apiFuncResult {
// Get metadata for the specified metric.
if md, ok := t.GetMetadata(metric); ok {
res = append(res, metricMetadata{
Target: t.Labels(),
Target: targetLabels,
Type: md.Type,
Help: md.Help,
Unit: md.Unit,

View file

@ -391,7 +391,7 @@ func TestFederationWithNativeHistograms(t *testing.T) {
body, err := io.ReadAll(res.Body)
require.NoError(t, err)
p := textparse.NewProtobufParser(body, false)
p := textparse.NewProtobufParser(body, false, labels.NewSymbolTable())
var actVec promql.Vector
metricFamilies := 0
l := labels.Labels{}