Merge pull request #12048 from bboreham/faster-targets

Scraping targets are synced by creating the full set, then adding/removing any which have changed.
This PR speeds up the process of creating the full set.

I added a benchmark for `TargetsFromGroup`; it uses configuration from a typical Kubernetes SD.

The crux of the change is to do relabeling inside labels.Builder instead of converting to labels.Labels and back again for every rule. The change is broken into several commits for easier review.

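This is a breaking change to `scrape.PopulateLabels()`, which now takes a `*labels.Builder` instead of `labels.Labels`. `relabel.Process` keeps its existing signature; a new `relabel.ProcessBuilder` function is added for callers that already hold a `labels.Builder`.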
This commit is contained in:
Bryan Boreham 2023-03-09 11:10:01 +00:00 committed by GitHub
commit b96b89ef8b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 224 additions and 62 deletions

View file

@ -1261,8 +1261,11 @@ func checkTargetGroupsForAlertmanager(targetGroups []*targetgroup.Group, amcfg *
}
func checkTargetGroupsForScrapeConfig(targetGroups []*targetgroup.Group, scfg *config.ScrapeConfig) error {
var targets []*scrape.Target
lb := labels.NewBuilder(labels.EmptyLabels())
for _, tg := range targetGroups {
_, failures := scrape.TargetsFromGroup(tg, scfg, false)
var failures []error
targets, failures = scrape.TargetsFromGroup(tg, scfg, false, targets, lb)
if len(failures) > 0 {
first := failures[0]
return first

View file

@ -115,22 +115,22 @@ outerLoop:
func getSDCheckResult(targetGroups []*targetgroup.Group, scrapeConfig *config.ScrapeConfig, noDefaultScrapePort bool) []sdCheckResult {
sdCheckResults := []sdCheckResult{}
lb := labels.NewBuilder(labels.EmptyLabels())
for _, targetGroup := range targetGroups {
for _, target := range targetGroup.Targets {
labelSlice := make([]labels.Label, 0, len(target)+len(targetGroup.Labels))
lb.Reset(labels.EmptyLabels())
for name, value := range target {
labelSlice = append(labelSlice, labels.Label{Name: string(name), Value: string(value)})
lb.Set(string(name), string(value))
}
for name, value := range targetGroup.Labels {
if _, ok := target[name]; !ok {
labelSlice = append(labelSlice, labels.Label{Name: string(name), Value: string(value)})
lb.Set(string(name), string(value))
}
}
targetLabels := labels.New(labelSlice...)
res, orig, err := scrape.PopulateLabels(targetLabels, scrapeConfig, noDefaultScrapePort)
res, orig, err := scrape.PopulateLabels(lb, scrapeConfig, noDefaultScrapePort)
result := sdCheckResult{
DiscoveredLabels: orig,
Labels: res,

View file

@ -530,6 +530,43 @@ func (b *Builder) Set(n, v string) *Builder {
return b
}
// Get returns the value the Builder currently holds for the label named n.
//
// Pending additions (b.add) are consulted first, then pending deletions
// (b.del, for which "" is returned), and finally the base label set.
// Additions take precedence so that Get agrees with Range and Labels, both of
// which report every entry in b.add without checking b.del.
// NOTE(review): the order only matters when a name is present in both lists,
// e.g. Del followed by Set if Set does not prune b.del — confirm against
// Set/Del.
func (b *Builder) Get(n string) string {
	for _, a := range b.add {
		if a.Name == n {
			return a.Value
		}
	}
	if slices.Contains(b.del, n) {
		return ""
	}
	return b.base.Get(n)
}
// Range invokes f once for every label the Builder currently holds: first the
// base labels that are neither deleted nor overridden, then each pending
// addition.
// The add and del slices are captured before iterating, so Set or Del calls
// made on b from inside f may change which callbacks happen afterwards.
func (b *Builder) Range(f func(l Label)) {
	added, deleted := b.add, b.del
	b.base.Range(func(l Label) {
		// Skip base labels that were removed or that an addition supersedes.
		if slices.Contains(deleted, l.Name) || contains(added, l.Name) {
			return
		}
		f(l)
	})
	for i := range added {
		f(added[i])
	}
}
// contains reports whether any label in s has the name n.
// Only names are compared; label values are ignored.
func contains(s []Label, n string) bool {
	for i := range s {
		if s[i].Name == n {
			return true
		}
	}
	return false
}
// Labels returns the labels from the builder, adding them to res if non-nil.
// Argument res can be the same as b.base, if caller wants to overwrite that slice.
// If no modifications were made, the original labels are returned.
@ -545,20 +582,12 @@ func (b *Builder) Labels(res Labels) Labels {
} else {
res = res[:0]
}
Outer:
// Justification that res can be the same slice as base: in this loop
// we move forward through base, and either skip an element or assign
// it to res at its current position or an earlier position.
for _, l := range b.base {
for _, n := range b.del {
if l.Name == n {
continue Outer
}
}
for _, la := range b.add {
if l.Name == la.Name {
continue Outer
}
if slices.Contains(b.del, l.Name) || contains(b.add, l.Name) {
continue
}
res = append(res, l)
}

View file

@ -587,6 +587,41 @@ func (b *Builder) Set(n, v string) *Builder {
return b
}
// Get returns the value the Builder currently holds for the label named n.
//
// Pending additions (b.add) are consulted first, then pending deletions
// (b.del, for which "" is returned), and finally the base label set.
// Additions take precedence so that Get agrees with Range, which reports every
// entry in b.add without checking b.del.
// NOTE(review): the order only matters when a name is present in both lists,
// e.g. Del followed by Set if Set does not prune b.del — confirm against
// Set/Del.
func (b *Builder) Get(n string) string {
	for _, a := range b.add {
		if a.Name == n {
			return a.Value
		}
	}
	if slices.Contains(b.del, n) {
		return ""
	}
	return b.base.Get(n)
}
// Range invokes f once for every label the Builder currently holds: first the
// base labels that are neither deleted nor overridden, then each pending
// addition.
// The add and del slices are captured before iterating, so Set or Del calls
// made on b from inside f may change which callbacks happen afterwards.
func (b *Builder) Range(f func(l Label)) {
	added, deleted := b.add, b.del
	b.base.Range(func(l Label) {
		// Skip base labels that were removed or that an addition supersedes.
		if slices.Contains(deleted, l.Name) || contains(added, l.Name) {
			return
		}
		f(l)
	})
	for i := range added {
		f(added[i])
	}
}
// contains reports whether any label in s has the name n.
// Only names are compared; label values are ignored.
func contains(s []Label, n string) bool {
	for i := range s {
		if s[i].Name == n {
			return true
		}
	}
	return false
}
// Labels returns the labels from the builder, adding them to res if non-nil.
// Argument res can be the same as b.base, if caller wants to overwrite that slice.
// If no modifications were made, the original labels are returned.

View file

@ -207,45 +207,52 @@ func (re Regexp) String() string {
// If a label set is dropped, EmptyLabels and false is returned.
// May return the input labelSet modified.
func Process(lbls labels.Labels, cfgs ...*Config) (ret labels.Labels, keep bool) {
lb := labels.NewBuilder(labels.EmptyLabels())
for _, cfg := range cfgs {
lbls, keep = relabel(lbls, cfg, lb)
if !keep {
return labels.EmptyLabels(), false
}
lb := labels.NewBuilder(lbls)
if !ProcessBuilder(lb, cfgs...) {
return labels.EmptyLabels(), false
}
return lbls, true
return lb.Labels(lbls), true
}
func relabel(lset labels.Labels, cfg *Config, lb *labels.Builder) (ret labels.Labels, keep bool) {
// ProcessBuilder applies the given relabel configs, in order, to the labels
// held in lb. The builder is mutated in place by each rule.
// It returns false as soon as one rule reports that the label set should not
// be kept; the remaining rules are then not run and lb is left as the rules so
// far modified it. It returns true when every rule keeps the label set,
// including when no configs are given.
func ProcessBuilder(lb *labels.Builder, cfgs ...*Config) (keep bool) {
	for i := range cfgs {
		if !relabel(cfgs[i], lb) {
			return false
		}
	}
	return true
}
func relabel(cfg *Config, lb *labels.Builder) (keep bool) {
var va [16]string
values := va[:0]
if len(cfg.SourceLabels) > cap(values) {
values = make([]string, 0, len(cfg.SourceLabels))
}
for _, ln := range cfg.SourceLabels {
values = append(values, lset.Get(string(ln)))
values = append(values, lb.Get(string(ln)))
}
val := strings.Join(values, cfg.Separator)
lb.Reset(lset)
switch cfg.Action {
case Drop:
if cfg.Regex.MatchString(val) {
return labels.EmptyLabels(), false
return false
}
case Keep:
if !cfg.Regex.MatchString(val) {
return labels.EmptyLabels(), false
return false
}
case DropEqual:
if lset.Get(cfg.TargetLabel) == val {
return labels.EmptyLabels(), false
if lb.Get(cfg.TargetLabel) == val {
return false
}
case KeepEqual:
if lset.Get(cfg.TargetLabel) != val {
return labels.EmptyLabels(), false
if lb.Get(cfg.TargetLabel) != val {
return false
}
case Replace:
indexes := cfg.Regex.FindStringSubmatchIndex(val)
@ -274,20 +281,20 @@ func relabel(lset labels.Labels, cfg *Config, lb *labels.Builder) (ret labels.La
mod := binary.BigEndian.Uint64(hash[8:]) % cfg.Modulus
lb.Set(cfg.TargetLabel, fmt.Sprintf("%d", mod))
case LabelMap:
lset.Range(func(l labels.Label) {
lb.Range(func(l labels.Label) {
if cfg.Regex.MatchString(l.Name) {
res := cfg.Regex.ReplaceAllString(l.Name, cfg.Replacement)
lb.Set(res, l.Value)
}
})
case LabelDrop:
lset.Range(func(l labels.Label) {
lb.Range(func(l labels.Label) {
if cfg.Regex.MatchString(l.Name) {
lb.Del(l.Name)
}
})
case LabelKeep:
lset.Range(func(l labels.Label) {
lb.Range(func(l labels.Label) {
if !cfg.Regex.MatchString(l.Name) {
lb.Del(l.Name)
}
@ -296,5 +303,5 @@ func relabel(lset labels.Labels, cfg *Config, lb *labels.Builder) (ret labels.La
panic(fmt.Errorf("relabel: unknown relabel action type %q", cfg.Action))
}
return lb.Labels(lset), true
return true
}

View file

@ -431,7 +431,7 @@ func TestPopulateLabels(t *testing.T) {
for _, c := range cases {
in := c.in.Copy()
res, orig, err := PopulateLabels(c.in, c.cfg, c.noDefaultPort)
res, orig, err := PopulateLabels(labels.NewBuilder(c.in), c.cfg, c.noDefaultPort)
if c.err != "" {
require.EqualError(t, err, c.err)
} else {
@ -443,7 +443,7 @@ func TestPopulateLabels(t *testing.T) {
}
}
func loadConfiguration(t *testing.T, c string) *config.Config {
func loadConfiguration(t testing.TB, c string) *config.Config {
t.Helper()
cfg := &config.Config{}

View file

@ -490,9 +490,11 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
sp.targetMtx.Lock()
var all []*Target
var targets []*Target
lb := labels.NewBuilder(labels.EmptyLabels())
sp.droppedTargets = []*Target{}
for _, tg := range tgs {
targets, failures := TargetsFromGroup(tg, sp.config, sp.noDefaultPort)
targets, failures := TargetsFromGroup(tg, sp.config, sp.noDefaultPort, targets, lb)
for _, err := range failures {
level.Error(sp.logger).Log("msg", "Creating target failed", "err", err)
}

View file

@ -349,7 +349,7 @@ func (app *timeLimitAppender) Append(ref storage.SeriesRef, lset labels.Labels,
// PopulateLabels builds a label set from the given label set and scrape configuration.
// It returns a label set before relabeling was applied as the second return value.
// Returns the original discovered label set found before relabelling was applied if the target is dropped during relabeling.
func PopulateLabels(lset labels.Labels, cfg *config.ScrapeConfig, noDefaultPort bool) (res, orig labels.Labels, err error) {
func PopulateLabels(lb *labels.Builder, cfg *config.ScrapeConfig, noDefaultPort bool) (res, orig labels.Labels, err error) {
// Copy labels into the labelset for the target if they are not set already.
scrapeLabels := []labels.Label{
{Name: model.JobLabel, Value: cfg.JobName},
@ -358,10 +358,9 @@ func PopulateLabels(lset labels.Labels, cfg *config.ScrapeConfig, noDefaultPort
{Name: model.MetricsPathLabel, Value: cfg.MetricsPath},
{Name: model.SchemeLabel, Value: cfg.Scheme},
}
lb := labels.NewBuilder(lset)
for _, l := range scrapeLabels {
if lv := lset.Get(l.Name); lv == "" {
if lb.Get(l.Name) == "" {
lb.Set(l.Name, l.Value)
}
}
@ -373,18 +372,16 @@ func PopulateLabels(lset labels.Labels, cfg *config.ScrapeConfig, noDefaultPort
}
preRelabelLabels := lb.Labels(labels.EmptyLabels())
lset, keep := relabel.Process(preRelabelLabels, cfg.RelabelConfigs...)
keep := relabel.ProcessBuilder(lb, cfg.RelabelConfigs...)
// Check if the target was dropped.
if !keep {
return labels.EmptyLabels(), preRelabelLabels, nil
}
if v := lset.Get(model.AddressLabel); v == "" {
if v := lb.Get(model.AddressLabel); v == "" {
return labels.EmptyLabels(), labels.EmptyLabels(), errors.New("no address")
}
lb = labels.NewBuilder(lset)
// addPort checks whether we should add a default port to the address.
// If the address is not valid, we don't append a port either.
addPort := func(s string) (string, string, bool) {
@ -398,8 +395,8 @@ func PopulateLabels(lset labels.Labels, cfg *config.ScrapeConfig, noDefaultPort
return "", "", err == nil
}
addr := lset.Get(model.AddressLabel)
scheme := lset.Get(model.SchemeLabel)
addr := lb.Get(model.AddressLabel)
scheme := lb.Get(model.SchemeLabel)
host, port, add := addPort(addr)
// If it's an address with no trailing port, infer it based on the used scheme
// unless the no-default-scrape-port feature flag is present.
@ -435,7 +432,7 @@ func PopulateLabels(lset labels.Labels, cfg *config.ScrapeConfig, noDefaultPort
return labels.EmptyLabels(), labels.EmptyLabels(), err
}
interval := lset.Get(model.ScrapeIntervalLabel)
interval := lb.Get(model.ScrapeIntervalLabel)
intervalDuration, err := model.ParseDuration(interval)
if err != nil {
return labels.EmptyLabels(), labels.EmptyLabels(), errors.Errorf("error parsing scrape interval: %v", err)
@ -444,7 +441,7 @@ func PopulateLabels(lset labels.Labels, cfg *config.ScrapeConfig, noDefaultPort
return labels.EmptyLabels(), labels.EmptyLabels(), errors.New("scrape interval cannot be 0")
}
timeout := lset.Get(model.ScrapeTimeoutLabel)
timeout := lb.Get(model.ScrapeTimeoutLabel)
timeoutDuration, err := model.ParseDuration(timeout)
if err != nil {
return labels.EmptyLabels(), labels.EmptyLabels(), errors.Errorf("error parsing scrape timeout: %v", err)
@ -459,14 +456,14 @@ func PopulateLabels(lset labels.Labels, cfg *config.ScrapeConfig, noDefaultPort
// Meta labels are deleted after relabelling. Other internal labels propagate to
// the target which decides whether they will be part of their label set.
lset.Range(func(l labels.Label) {
lb.Range(func(l labels.Label) {
if strings.HasPrefix(l.Name, model.MetaLabelPrefix) {
lb.Del(l.Name)
}
})
// Default the instance label to the target address.
if v := lset.Get(model.InstanceLabel); v == "" {
if v := lb.Get(model.InstanceLabel); v == "" {
lb.Set(model.InstanceLabel, addr)
}
@ -485,25 +482,23 @@ func PopulateLabels(lset labels.Labels, cfg *config.ScrapeConfig, noDefaultPort
}
// TargetsFromGroup builds targets based on the given TargetGroup and config.
func TargetsFromGroup(tg *targetgroup.Group, cfg *config.ScrapeConfig, noDefaultPort bool) ([]*Target, []error) {
targets := make([]*Target, 0, len(tg.Targets))
func TargetsFromGroup(tg *targetgroup.Group, cfg *config.ScrapeConfig, noDefaultPort bool, targets []*Target, lb *labels.Builder) ([]*Target, []error) {
targets = targets[:0]
failures := []error{}
for i, tlset := range tg.Targets {
lbls := make([]labels.Label, 0, len(tlset)+len(tg.Labels))
lb.Reset(labels.EmptyLabels())
for ln, lv := range tlset {
lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
lb.Set(string(ln), string(lv))
}
for ln, lv := range tg.Labels {
if _, ok := tlset[ln]; !ok {
lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
lb.Set(string(ln), string(lv))
}
}
lset := labels.New(lbls...)
lset, origLabels, err := PopulateLabels(lset, cfg, noDefaultPort)
lset, origLabels, err := PopulateLabels(lb, cfg, noDefaultPort)
if err != nil {
failures = append(failures, errors.Wrapf(err, "instance %d in group %s", i, tg))
}

View file

@ -375,7 +375,8 @@ func TestTargetsFromGroup(t *testing.T) {
ScrapeTimeout: model.Duration(10 * time.Second),
ScrapeInterval: model.Duration(1 * time.Minute),
}
targets, failures := TargetsFromGroup(&targetgroup.Group{Targets: []model.LabelSet{{}, {model.AddressLabel: "localhost:9090"}}}, &cfg, false)
lb := labels.NewBuilder(labels.EmptyLabels())
targets, failures := TargetsFromGroup(&targetgroup.Group{Targets: []model.LabelSet{{}, {model.AddressLabel: "localhost:9090"}}}, &cfg, false, nil, lb)
if len(targets) != 1 {
t.Fatalf("Expected 1 target, got %v", len(targets))
}
@ -386,3 +387,93 @@ func TestTargetsFromGroup(t *testing.T) {
t.Fatalf("Expected error %s, got %s", expectedError, failures[0])
}
}
// BenchmarkTargetsFromGroup measures TargetsFromGroup on groups of 1, 10 and
// 100 targets. Each target carries label sets shaped like Kubernetes
// service-discovery output, and the scrape config uses a subset of relabel
// rules from a typical Prometheus configuration.
func BenchmarkTargetsFromGroup(b *testing.B) {
	// Simulate Kubernetes service-discovery and use subset of rules from typical Prometheus config.
	cfgText := `
scrape_configs:
  - job_name: job1
    scrape_interval: 15s
    scrape_timeout: 10s
    relabel_configs:
    - source_labels: [__meta_kubernetes_pod_container_port_name]
      separator: ;
      regex: .*-metrics
      replacement: $1
      action: keep
    - source_labels: [__meta_kubernetes_pod_phase]
      separator: ;
      regex: Succeeded|Failed
      replacement: $1
      action: drop
    - source_labels: [__meta_kubernetes_namespace, __meta_kubernetes_pod_label_name]
      separator: /
      regex: (.*)
      target_label: job
      replacement: $1
      action: replace
    - source_labels: [__meta_kubernetes_namespace]
      separator: ;
      regex: (.*)
      target_label: namespace
      replacement: $1
      action: replace
    - source_labels: [__meta_kubernetes_pod_name]
      separator: ;
      regex: (.*)
      target_label: pod
      replacement: $1
      action: replace
    - source_labels: [__meta_kubernetes_pod_container_name]
      separator: ;
      regex: (.*)
      target_label: container
      replacement: $1
      action: replace
    - source_labels: [__meta_kubernetes_pod_name, __meta_kubernetes_pod_container_name,
        __meta_kubernetes_pod_container_port_name]
      separator: ':'
      regex: (.*)
      target_label: instance
      replacement: $1
      action: replace
    - separator: ;
      regex: (.*)
      target_label: cluster
      replacement: dev-us-central-0
      action: replace
`
	// Named cfg rather than config so the imported config package is not shadowed.
	cfg := loadConfiguration(b, cfgText)
	for _, nTargets := range []int{1, 10, 100} {
		b.Run(fmt.Sprintf("%d_targets", nTargets), func(b *testing.B) {
			targets := make([]model.LabelSet, 0, nTargets)
			for i := 0; i < nTargets; i++ {
				// Named lset rather than labels so the labels package stays usable below.
				lset := model.LabelSet{
					model.AddressLabel:                            model.LabelValue(fmt.Sprintf("localhost:%d", i)),
					"__meta_kubernetes_namespace":                 "some_namespace",
					"__meta_kubernetes_pod_container_name":        "some_container",
					"__meta_kubernetes_pod_container_port_name":   "http-metrics",
					"__meta_kubernetes_pod_container_port_number": "80",
					"__meta_kubernetes_pod_label_name":            "some_name",
					"__meta_kubernetes_pod_name":                  "some_pod",
					"__meta_kubernetes_pod_phase":                 "Running",
				}
				// Add some more labels, because Kubernetes SD generates a lot
				for j := 0; j < 10; j++ {
					lset[model.LabelName(fmt.Sprintf("__meta_kubernetes_pod_label_extra%d", j))] = "a_label_abcdefgh"
					lset[model.LabelName(fmt.Sprintf("__meta_kubernetes_pod_labelpresent_extra%d", j))] = "true"
				}
				targets = append(targets, lset)
			}
			var tgets []*Target
			lb := labels.NewBuilder(labels.EmptyLabels())
			group := &targetgroup.Group{Targets: targets}
			// Keep fixture construction out of the measured region.
			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				// The failures slice is deliberately ignored; the length check
				// below validates the returned targets instead.
				tgets, _ = TargetsFromGroup(group, cfg.ScrapeConfigs[0], false, tgets, lb)
				// Check the returned targets, not the input fixture, so the
				// benchmark fails if targets are unexpectedly dropped.
				if len(tgets) != nTargets {
					b.Fatalf("Expected %d targets, got %d", nTargets, len(tgets))
				}
			}
		})
	}
}