Merge branch 'dev-2.0' into go-kit/log

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
This commit is contained in:
Goutham Veeramachaneni 2017-09-15 23:15:27 +05:30
commit 3f0267c548
26 changed files with 1027 additions and 602 deletions

View file

@ -4,6 +4,7 @@ language: go
go:
- 1.8.x
- 1.x
go_import_path: github.com/prometheus/prometheus

View file

@ -1,13 +1,17 @@
## v2.0.0-beta.2 / 2017-08-18
## v2.0.0-beta.4 / 2017-09-14
This release includes numerous changes to the new storage layer. The main changes are:
* [CHANGES] Deterministic block boundaries
* [ENHANCEMENTS] Avoid memory usage spikes during compactions
* [CHANGES] Single, compacted write ahead log
* [CHANGES] Single in-memory block with garbage collection
* [ENHANCEMENTS] Cache series dropped via `metric_relabel_configs`
* [ENHANCEMENTS] Pool byte buffers for scraping
It's generally advised to start with a clean storage directory. As a best effort,
running `sed -i .bkp 's/generation/level/g' */meta.json` from within the directory
should be sufficient to migrate data written by v2.0.0-beta.0.
Overall the changes achieve a baseline reduction in memory consumption and reduce
peak memory usage by 30-40% compared to the 2.0.0-beta.2 release.
This release requires a clean storage directory and is not compatible with files
created by previous beta releases.
## 1.7.1 / 2017-06-12

View file

@ -1 +1 @@
2.0.0-beta.2
2.0.0-beta.4

View file

@ -1010,10 +1010,11 @@ type KubernetesRole string
// The valid options for KubernetesRole.
const (
KubernetesRoleNode = "node"
KubernetesRolePod = "pod"
KubernetesRoleService = "service"
KubernetesRoleEndpoint = "endpoints"
KubernetesRoleNode KubernetesRole = "node"
KubernetesRolePod KubernetesRole = "pod"
KubernetesRoleService KubernetesRole = "service"
KubernetesRoleEndpoint KubernetesRole = "endpoints"
KubernetesRoleIngress KubernetesRole = "ingress"
)
// UnmarshalYAML implements the yaml.Unmarshaler interface.
@ -1022,7 +1023,7 @@ func (c *KubernetesRole) UnmarshalYAML(unmarshal func(interface{}) error) error
return err
}
switch *c {
case KubernetesRoleNode, KubernetesRolePod, KubernetesRoleService, KubernetesRoleEndpoint:
case KubernetesRoleNode, KubernetesRolePod, KubernetesRoleService, KubernetesRoleEndpoint, KubernetesRoleIngress:
return nil
default:
return fmt.Errorf("Unknown Kubernetes SD role %q", *c)
@ -1141,6 +1142,7 @@ type EC2SDConfig struct {
AccessKey string `yaml:"access_key,omitempty"`
SecretKey Secret `yaml:"secret_key,omitempty"`
Profile string `yaml:"profile,omitempty"`
RoleARN string `yaml:"role_arn,omitempty"`
RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"`
Port int `yaml:"port"`

View file

@ -21,6 +21,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
@ -72,6 +73,7 @@ type Discovery struct {
aws *aws.Config
interval time.Duration
profile string
roleARN string
port int
logger log.Logger
}
@ -91,6 +93,7 @@ func NewDiscovery(conf *config.EC2SDConfig, logger log.Logger) *Discovery {
Credentials: creds,
},
profile: conf.Profile,
roleARN: conf.RoleARN,
interval: time.Duration(conf.RefreshInterval),
port: conf.Port,
logger: logger,
@ -151,7 +154,13 @@ func (d *Discovery) refresh() (tg *config.TargetGroup, err error) {
return nil, fmt.Errorf("could not create aws session: %s", err)
}
ec2s := ec2.New(sess)
var ec2s *ec2.EC2
if d.roleARN != "" {
creds := stscreds.NewCredentials(sess, d.roleARN)
ec2s = ec2.New(sess, &aws.Config{Credentials: creds})
} else {
ec2s = ec2.New(sess)
}
tg = &config.TargetGroup{
Source: *d.aws.Region,
}

View file

@ -158,18 +158,19 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
}
func convertToEndpoints(o interface{}) (*apiv1.Endpoints, error) {
endpoints, isEndpoints := o.(*apiv1.Endpoints)
if !isEndpoints {
deletedState, ok := o.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, fmt.Errorf("Received unexpected object: %v", o)
}
endpoints, ok = deletedState.Obj.(*apiv1.Endpoints)
if !ok {
return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Endpoints object: %v", deletedState.Obj)
}
endpoints, ok := o.(*apiv1.Endpoints)
if ok {
return endpoints, nil
}
deletedState, ok := o.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, fmt.Errorf("Received unexpected object: %v", o)
}
endpoints, ok = deletedState.Obj.(*apiv1.Endpoints)
if !ok {
return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Endpoints object: %v", deletedState.Obj)
}
return endpoints, nil
}

View file

@ -0,0 +1,185 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes
import (
"fmt"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/strutil"
"golang.org/x/net/context"
"k8s.io/client-go/pkg/apis/extensions/v1beta1"
"k8s.io/client-go/tools/cache"
)
// Ingress implements discovery of Kubernetes ingresss.
type Ingress struct {
logger log.Logger
informer cache.SharedInformer
store cache.Store
}
// NewIngress returns a new ingress discovery.
func NewIngress(l log.Logger, inf cache.SharedInformer) *Ingress {
return &Ingress{logger: l, informer: inf, store: inf.GetStore()}
}
// Run implements the TargetProvider interface.
func (s *Ingress) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// Send full initial set of pod targets.
var initial []*config.TargetGroup
for _, o := range s.store.List() {
tg := s.buildIngress(o.(*v1beta1.Ingress))
initial = append(initial, tg)
}
select {
case <-ctx.Done():
return
case ch <- initial:
}
// Send target groups for ingress updates.
send := func(tg *config.TargetGroup) {
select {
case <-ctx.Done():
case ch <- []*config.TargetGroup{tg}:
}
}
s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
eventCount.WithLabelValues("ingress", "add").Inc()
ingress, err := convertToIngress(o)
if err != nil {
level.Error(s.logger).Log("msg", "converting to Ingress object failed", "err", err.Error())
return
}
send(s.buildIngress(ingress))
},
DeleteFunc: func(o interface{}) {
eventCount.WithLabelValues("ingress", "delete").Inc()
ingress, err := convertToIngress(o)
if err != nil {
level.Error(s.logger).Log("msg", "converting to Ingress object failed", "err", err.Error())
return
}
send(&config.TargetGroup{Source: ingressSource(ingress)})
},
UpdateFunc: func(_, o interface{}) {
eventCount.WithLabelValues("ingress", "update").Inc()
ingress, err := convertToIngress(o)
if err != nil {
level.Error(s.logger).Log("msg", "converting to Ingress object failed", "err", err.Error())
return
}
send(s.buildIngress(ingress))
},
})
// Block until the target provider is explicitly canceled.
<-ctx.Done()
}
func convertToIngress(o interface{}) (*v1beta1.Ingress, error) {
ingress, ok := o.(*v1beta1.Ingress)
if ok {
return ingress, nil
}
deletedState, ok := o.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, fmt.Errorf("Received unexpected object: %v", o)
}
ingress, ok = deletedState.Obj.(*v1beta1.Ingress)
if !ok {
return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Ingress object: %v", deletedState.Obj)
}
return ingress, nil
}
func ingressSource(s *v1beta1.Ingress) string {
return "ingress/" + s.Namespace + "/" + s.Name
}
const (
ingressNameLabel = metaLabelPrefix + "ingress_name"
ingressLabelPrefix = metaLabelPrefix + "ingress_label_"
ingressAnnotationPrefix = metaLabelPrefix + "ingress_annotation_"
ingressSchemeLabel = metaLabelPrefix + "ingress_scheme"
ingressHostLabel = metaLabelPrefix + "ingress_host"
ingressPathLabel = metaLabelPrefix + "ingress_path"
)
func ingressLabels(ingress *v1beta1.Ingress) model.LabelSet {
ls := make(model.LabelSet, len(ingress.Labels)+len(ingress.Annotations)+2)
ls[ingressNameLabel] = lv(ingress.Name)
ls[namespaceLabel] = lv(ingress.Namespace)
for k, v := range ingress.Labels {
ln := strutil.SanitizeLabelName(ingressLabelPrefix + k)
ls[model.LabelName(ln)] = lv(v)
}
for k, v := range ingress.Annotations {
ln := strutil.SanitizeLabelName(ingressAnnotationPrefix + k)
ls[model.LabelName(ln)] = lv(v)
}
return ls
}
func pathsFromIngressRule(rv *v1beta1.IngressRuleValue) []string {
if rv.HTTP == nil {
return []string{"/"}
}
paths := make([]string, len(rv.HTTP.Paths))
for n, p := range rv.HTTP.Paths {
path := p.Path
if path == "" {
path = "/"
}
paths[n] = path
}
return paths
}
func (s *Ingress) buildIngress(ingress *v1beta1.Ingress) *config.TargetGroup {
tg := &config.TargetGroup{
Source: ingressSource(ingress),
}
tg.Labels = ingressLabels(ingress)
schema := "http"
if ingress.Spec.TLS != nil {
schema = "https"
}
for _, rule := range ingress.Spec.Rules {
paths := pathsFromIngressRule(&rule.IngressRuleValue)
for _, path := range paths {
tg.Targets = append(tg.Targets, model.LabelSet{
model.AddressLabel: lv(rule.Host),
ingressSchemeLabel: lv(schema),
ingressHostLabel: lv(rule.Host),
ingressPathLabel: lv(path),
})
}
}
return tg
}

View file

@ -0,0 +1,136 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes
import (
"testing"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/pkg/apis/extensions/v1beta1"
)
func ingressStoreKeyFunc(obj interface{}) (string, error) {
return obj.(*v1beta1.Ingress).ObjectMeta.Name, nil
}
func newFakeIngressInformer() *fakeInformer {
return newFakeInformer(ingressStoreKeyFunc)
}
func makeTestIngressDiscovery() (*Ingress, *fakeInformer) {
i := newFakeIngressInformer()
return NewIngress(nil, i), i
}
func makeIngress(tls []v1beta1.IngressTLS) *v1beta1.Ingress {
return &v1beta1.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: "testingress",
Namespace: "default",
Labels: map[string]string{"testlabel": "testvalue"},
Annotations: map[string]string{"testannotation": "testannotationvalue"},
},
Spec: v1beta1.IngressSpec{
TLS: tls,
Rules: []v1beta1.IngressRule{
{
Host: "example.com",
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{
{Path: "/"},
{Path: "/foo"},
},
},
},
},
{
// No backend config, ignored
Host: "nobackend.example.com",
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{},
},
},
{
Host: "test.example.com",
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{{}},
},
},
},
},
},
}
}
func expectedTargetGroups(tls bool) []*config.TargetGroup {
scheme := "http"
if tls {
scheme = "https"
}
return []*config.TargetGroup{
{
Targets: []model.LabelSet{
{
"__meta_kubernetes_ingress_scheme": lv(scheme),
"__meta_kubernetes_ingress_host": "example.com",
"__meta_kubernetes_ingress_path": "/",
"__address__": "example.com",
},
{
"__meta_kubernetes_ingress_scheme": lv(scheme),
"__meta_kubernetes_ingress_host": "example.com",
"__meta_kubernetes_ingress_path": "/foo",
"__address__": "example.com",
},
{
"__meta_kubernetes_ingress_scheme": lv(scheme),
"__meta_kubernetes_ingress_host": "test.example.com",
"__address__": "test.example.com",
"__meta_kubernetes_ingress_path": "/",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_ingress_name": "testingress",
"__meta_kubernetes_namespace": "default",
"__meta_kubernetes_ingress_label_testlabel": "testvalue",
"__meta_kubernetes_ingress_annotation_testannotation": "testannotationvalue",
},
Source: "ingress/default/testingress",
},
}
}
func TestIngressDiscoveryInitial(t *testing.T) {
n, i := makeTestIngressDiscovery()
i.GetStore().Add(makeIngress(nil))
k8sDiscoveryTest{
discovery: n,
expectedInitial: expectedTargetGroups(false),
}.Run(t)
}
func TestIngressDiscoveryInitialTLS(t *testing.T) {
n, i := makeTestIngressDiscovery()
i.GetStore().Add(makeIngress([]v1beta1.IngressTLS{{}}))
k8sDiscoveryTest{
discovery: n,
expectedInitial: expectedTargetGroups(true),
}.Run(t)
}

View file

@ -28,6 +28,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api"
apiv1 "k8s.io/client-go/pkg/api/v1"
extensionsv1beta1 "k8s.io/client-go/pkg/apis/extensions/v1beta1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
@ -155,6 +156,7 @@ const resyncPeriod = 10 * time.Minute
// Run implements the TargetProvider interface.
func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
rclient := d.client.Core().RESTClient()
reclient := d.client.Extensions().RESTClient()
namespaces := d.getNamespaces()
@ -197,7 +199,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
for _, namespace := range namespaces {
plw := cache.NewListWatchFromClient(rclient, "pods", namespace, nil)
pod := NewPod(
log.With(d.logger, "k8s_sd", "pod"),
log.With(d.logger, "role", "pod"),
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
)
go pod.informer.Run(ctx.Done())
@ -217,7 +219,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
for _, namespace := range namespaces {
slw := cache.NewListWatchFromClient(rclient, "services", namespace, nil)
svc := NewService(
log.With(d.logger, "k8s_sd", "service"),
log.With(d.logger, "role", "service"),
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod),
)
go svc.informer.Run(ctx.Done())
@ -232,10 +234,30 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
}()
}
wg.Wait()
case "ingress":
var wg sync.WaitGroup
for _, namespace := range namespaces {
ilw := cache.NewListWatchFromClient(reclient, "ingresses", namespace, nil)
ingress := NewIngress(
log.With(d.logger, "role", "ingress"),
cache.NewSharedInformer(ilw, &extensionsv1beta1.Ingress{}, resyncPeriod),
)
go ingress.informer.Run(ctx.Done())
for !ingress.informer.HasSynced() {
time.Sleep(100 * time.Millisecond)
}
wg.Add(1)
go func() {
defer wg.Done()
ingress.Run(ctx, ch)
}()
}
wg.Wait()
case "node":
nlw := cache.NewListWatchFromClient(rclient, "nodes", api.NamespaceAll, nil)
node := NewNode(
log.With(d.logger, "k8s_sd", "node"),
log.With(d.logger, "role", "node"),
cache.NewSharedInformer(nlw, &apiv1.Node{}, resyncPeriod),
)
go node.informer.Run(ctx.Done())

View file

@ -103,18 +103,19 @@ func (n *Node) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
}
func convertToNode(o interface{}) (*apiv1.Node, error) {
node, isNode := o.(*apiv1.Node)
if !isNode {
deletedState, ok := o.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, fmt.Errorf("Received unexpected object: %v", o)
}
node, ok = deletedState.Obj.(*apiv1.Node)
if !ok {
return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj)
}
node, ok := o.(*apiv1.Node)
if ok {
return node, nil
}
deletedState, ok := o.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, fmt.Errorf("Received unexpected object: %v", o)
}
node, ok = deletedState.Obj.(*apiv1.Node)
if !ok {
return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj)
}
return node, nil
}
@ -130,7 +131,7 @@ const (
)
func nodeLabels(n *apiv1.Node) model.LabelSet {
ls := make(model.LabelSet, len(n.Labels)+len(n.Annotations)+2)
ls := make(model.LabelSet, len(n.Labels)+len(n.Annotations)+1)
ls[nodeNameLabel] = lv(n.Name)

View file

@ -111,18 +111,19 @@ func (p *Pod) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
}
func convertToPod(o interface{}) (*apiv1.Pod, error) {
pod, isPod := o.(*apiv1.Pod)
if !isPod {
deletedState, ok := o.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, fmt.Errorf("Received unexpected object: %v", o)
}
pod, ok = deletedState.Obj.(*apiv1.Pod)
if !ok {
return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Pod object: %v", deletedState.Obj)
}
pod, ok := o.(*apiv1.Pod)
if ok {
return pod, nil
}
deletedState, ok := o.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, fmt.Errorf("Received unexpected object: %v", o)
}
pod, ok = deletedState.Obj.(*apiv1.Pod)
if !ok {
return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Pod object: %v", deletedState.Obj)
}
return pod, nil
}

View file

@ -102,18 +102,18 @@ func (s *Service) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
}
func convertToService(o interface{}) (*apiv1.Service, error) {
service, isService := o.(*apiv1.Service)
if !isService {
deletedState, ok := o.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, fmt.Errorf("Received unexpected object: %v", o)
}
service, ok = deletedState.Obj.(*apiv1.Service)
if !ok {
return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Service object: %v", deletedState.Obj)
}
service, ok := o.(*apiv1.Service)
if ok {
return service, nil
}
deletedState, ok := o.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, fmt.Errorf("Received unexpected object: %v", o)
}
service, ok = deletedState.Obj.(*apiv1.Service)
if !ok {
return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Service object: %v", deletedState.Obj)
}
return service, nil
}
@ -133,6 +133,7 @@ func serviceLabels(svc *apiv1.Service) model.LabelSet {
ls := make(model.LabelSet, len(svc.Labels)+len(svc.Annotations)+2)
ls[serviceNameLabel] = lv(svc.Name)
ls[namespaceLabel] = lv(svc.Namespace)
for k, v := range svc.Labels {
ln := strutil.SanitizeLabelName(serviceLabelPrefix + k)
@ -151,7 +152,6 @@ func (s *Service) buildService(svc *apiv1.Service) *config.TargetGroup {
Source: serviceSource(svc),
}
tg.Labels = serviceLabels(svc)
tg.Labels[namespaceLabel] = lv(svc.Namespace)
for _, port := range svc.Spec.Ports {
addr := net.JoinHostPort(svc.Name+"."+svc.Namespace+".svc", strconv.FormatInt(int64(port.Port), 10))

View file

@ -192,7 +192,7 @@ scrape_configs:
- source_labels: [__address__]
target_label: __param_target
- target_label: __address__
replacement: blackbox
replacement: blackbox-exporter.example.com:9115
- source_labels: [__param_target]
target_label: instance
- action: labelmap
@ -202,6 +202,40 @@ scrape_configs:
- source_labels: [__meta_kubernetes_service_name]
target_label: kubernetes_name
# Example scrape config for probing ingresses via the Blackbox Exporter.
#
# The relabeling allows the actual ingress scrape endpoint to be configured
# via the following annotations:
#
# * `prometheus.io/probe`: Only probe services that have a value of `true`
- job_name: 'kubernetes-ingresses'
metrics_path: /probe
params:
module: [http_2xx]
kubernetes_sd_configs:
- role: ingress
relabel_configs:
- source_labels: [__meta_kubernetes_ingress_annotation_prometheus_io_probe]
action: keep
regex: true
- source_labels: [__meta_kubernetes_ingress_scheme,__address__,__meta_kubernetes_ingress_path]
regex: (.+);(.+);(.+)
replacement: ${1}://${2}${3}
target_label: __param_target
- target_label: __address__
replacement: blackbox-exporter.example.com:9115
- source_labels: [__param_target]
target_label: instance
- action: labelmap
regex: __meta_kubernetes_ingress_label_(.+)
- source_labels: [__meta_kubernetes_namespace]
target_label: kubernetes_namespace
- source_labels: [__meta_kubernetes_ingress_name]
target_label: kubernetes_name
# Example scrape config for pods
#
# The relabeling allows the actual pod scrape endpoint to be configured via the

View file

@ -48,6 +48,10 @@ func NewClient(logger log.Logger, conf influx.HTTPConfig, db string, rp string)
os.Exit(1)
}
if logger == nil {
logger = log.NewNopLogger()
}
return &Client{
logger: logger,
client: c,

75
pkg/pool/pool.go Normal file
View file

@ -0,0 +1,75 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pool
import "sync"
// BytesPool is a bucketed pool for variably sized byte slices.
type BytesPool struct {
buckets []sync.Pool
sizes []int
}
// NewBytesPool returns a new BytesPool with size buckets for minSize to maxSize
// increasing by the given factor.
func NewBytesPool(minSize, maxSize int, factor float64) *BytesPool {
if minSize < 1 {
panic("invalid minimum pool size")
}
if maxSize < 1 {
panic("invalid maximum pool size")
}
if factor < 1 {
panic("invalid factor")
}
var sizes []int
for s := minSize; s <= maxSize; s = int(float64(s) * factor) {
sizes = append(sizes, s)
}
p := &BytesPool{
buckets: make([]sync.Pool, len(sizes)),
sizes: sizes,
}
return p
}
// Get returns a new byte slices that fits the given size.
func (p *BytesPool) Get(sz int) []byte {
for i, bktSize := range p.sizes {
if sz > bktSize {
continue
}
b, ok := p.buckets[i].Get().([]byte)
if !ok {
b = make([]byte, 0, bktSize)
}
return b
}
return make([]byte, 0, sz)
}
// Put returns a byte slice to the right bucket in the pool.
func (p *BytesPool) Put(b []byte) {
for i, bktSize := range p.sizes {
if cap(b) > bktSize {
continue
}
p.buckets[i].Put(b[:0])
return
}
}

View file

@ -28,6 +28,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/version"
"golang.org/x/net/context"
@ -35,6 +36,8 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/pool"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/pkg/value"
@ -121,8 +124,8 @@ func init() {
// scrapePool manages scrapes for sets of targets.
type scrapePool struct {
appendable Appendable
ctx context.Context
logger log.Logger
ctx context.Context
mtx sync.RWMutex
config *config.ScrapeConfig
@ -133,39 +136,46 @@ type scrapePool struct {
loops map[uint64]loop
// Constructor for new scrape loops. This is settable for testing convenience.
newLoop func(context.Context, scraper, func() storage.Appender, func() storage.Appender, log.Logger) loop
logger log.Logger
maxAheadTime time.Duration
newLoop func(*Target, scraper) loop
}
const maxAheadTime = 10 * time.Minute
type labelsMutator func(labels.Labels) labels.Labels
func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable, logger log.Logger) *scrapePool {
if logger == nil {
logger = log.NewNopLogger()
}
client, err := httputil.NewClientFromConfig(cfg.HTTPClientConfig)
if err != nil {
// Any errors that could occur here should be caught during config validation.
level.Error(logger).Log("msg", "Error creating HTTP client", "err", err)
}
newLoop := func(
ctx context.Context,
s scraper,
app, reportApp func() storage.Appender,
l log.Logger,
) loop {
return newScrapeLoop(ctx, s, app, reportApp, l)
buffers := pool.NewBytesPool(163, 100e6, 3)
sp := &scrapePool{
appendable: app,
config: cfg,
ctx: ctx,
client: client,
targets: map[uint64]*Target{},
loops: map[uint64]loop{},
logger: logger,
}
sp.newLoop = func(t *Target, s scraper) loop {
return newScrapeLoop(sp.ctx, s,
log.With(logger, "target", t),
buffers,
func(l labels.Labels) labels.Labels { return sp.mutateSampleLabels(l, t) },
func(l labels.Labels) labels.Labels { return sp.mutateReportSampleLabels(l, t) },
sp.appender,
)
}
return &scrapePool{
appendable: app,
config: cfg,
ctx: ctx,
client: client,
targets: map[uint64]*Target{},
loops: map[uint64]loop{},
newLoop: newLoop,
logger: logger,
maxAheadTime: 10 * time.Minute,
}
return sp
}
// stop terminates all scrape loops and returns after they all terminated.
@ -217,15 +227,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
var (
t = sp.targets[fp]
s = &targetScraper{Target: t, client: sp.client, timeout: timeout}
newLoop = sp.newLoop(sp.ctx, s,
func() storage.Appender {
return sp.sampleAppender(t)
},
func() storage.Appender {
return sp.reportAppender(t)
},
log.With(sp.logger, "target", t.labels),
)
newLoop = sp.newLoop(t, s)
)
wg.Add(1)
@ -287,15 +289,7 @@ func (sp *scrapePool) sync(targets []*Target) {
if _, ok := sp.targets[hash]; !ok {
s := &targetScraper{Target: t, client: sp.client, timeout: timeout}
l := sp.newLoop(sp.ctx, s,
func() storage.Appender {
return sp.sampleAppender(t)
},
func() storage.Appender {
return sp.reportAppender(t)
},
log.With(sp.logger, "target", t.labels),
)
l := sp.newLoop(t, s)
sp.targets[hash] = t
sp.loops[hash] = l
@ -327,18 +321,58 @@ func (sp *scrapePool) sync(targets []*Target) {
wg.Wait()
}
// sampleAppender returns an appender for ingested samples from the target.
func (sp *scrapePool) sampleAppender(target *Target) storage.Appender {
func (sp *scrapePool) mutateSampleLabels(lset labels.Labels, target *Target) labels.Labels {
lb := labels.NewBuilder(lset)
if sp.config.HonorLabels {
for _, l := range target.Labels() {
if lv := lset.Get(l.Name); lv == "" {
lb.Set(l.Name, l.Value)
}
}
} else {
for _, l := range target.Labels() {
lv := lset.Get(l.Name)
if lv != "" {
lb.Set(model.ExportedLabelPrefix+l.Name, lv)
}
lb.Set(l.Name, l.Value)
}
}
res := lb.Labels()
if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 {
res = relabel.Process(res, mrc...)
}
return res
}
func (sp *scrapePool) mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels {
lb := labels.NewBuilder(lset)
for _, l := range target.Labels() {
lv := lset.Get(l.Name)
if lv != "" {
lb.Set(model.ExportedLabelPrefix+l.Name, lv)
}
lb.Set(l.Name, l.Value)
}
return lb.Labels()
}
// appender returns an appender for ingested samples from the target.
func (sp *scrapePool) appender() storage.Appender {
app, err := sp.appendable.Appender()
if err != nil {
panic(err)
}
if sp.maxAheadTime > 0 {
app = &timeLimitAppender{
Appender: app,
maxTime: timestamp.FromTime(time.Now().Add(sp.maxAheadTime)),
}
app = &timeLimitAppender{
Appender: app,
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
}
// The limit is applied after metrics are potentially dropped via relabeling.
@ -348,42 +382,9 @@ func (sp *scrapePool) sampleAppender(target *Target) storage.Appender {
limit: int(sp.config.SampleLimit),
}
}
// The relabelAppender has to be inside the label-modifying appenders
// so the relabeling rules are applied to the correct label set.
if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 {
app = relabelAppender{
Appender: app,
relabelings: mrc,
}
}
if sp.config.HonorLabels {
app = honorLabelsAppender{
Appender: app,
labels: target.Labels(),
}
} else {
app = ruleLabelsAppender{
Appender: app,
labels: target.Labels(),
}
}
return app
}
// reportAppender returns an appender for reporting samples for the target.
func (sp *scrapePool) reportAppender(target *Target) storage.Appender {
app, err := sp.appendable.Appender()
if err != nil {
panic(err)
}
return ruleLabelsAppender{
Appender: app,
labels: target.Labels(),
}
}
// A scraper retrieves samples and accepts a status report at the end.
type scraper interface {
scrape(ctx context.Context, w io.Writer) error
@ -471,12 +472,15 @@ type refEntry struct {
}
type scrapeLoop struct {
scraper scraper
l log.Logger
cache *scrapeCache
scraper scraper
l log.Logger
cache *scrapeCache
lastScrapeSize int
buffers *pool.BytesPool
appender func() storage.Appender
reportAppender func() storage.Appender
appender func() storage.Appender
sampleMutator labelsMutator
reportSampleMutator labelsMutator
ctx context.Context
scrapeCtx context.Context
@ -493,6 +497,11 @@ type scrapeCache struct {
refs map[string]*refEntry // Parsed string to ref.
lsets map[uint64]*lsetCacheEntry // Ref to labelset and string.
// Cache of dropped metric strings and their iteration. The iteration must
// be a pointer so we can update it without setting a new entry with an unsafe
// string in addDropped().
dropped map[string]*uint64
// seriesCur and seriesPrev store the labels of series that were seen
// in the current and previous scrape.
// We hold two maps and swap them out to save allocations.
@ -504,6 +513,7 @@ func newScrapeCache() *scrapeCache {
return &scrapeCache{
refs: map[string]*refEntry{},
lsets: map[uint64]*lsetCacheEntry{},
dropped: map[string]*uint64{},
seriesCur: map[uint64]labels.Labels{},
seriesPrev: map[uint64]labels.Labels{},
}
@ -519,6 +529,11 @@ func (c *scrapeCache) iterDone() {
delete(c.lsets, e.ref)
}
}
for s, iter := range c.dropped {
if *iter < c.iter {
delete(c.dropped, s)
}
}
// Swap current and previous series.
c.seriesPrev, c.seriesCur = c.seriesCur, c.seriesPrev
@ -556,6 +571,19 @@ func (c *scrapeCache) addRef(met string, ref uint64, lset labels.Labels, hash ui
c.lsets[ref] = &lsetCacheEntry{metric: met, lset: lset, hash: hash}
}
func (c *scrapeCache) addDropped(met string) {
iter := c.iter
c.dropped[met] = &iter
}
func (c *scrapeCache) getDropped(met string) bool {
iterp, ok := c.dropped[met]
if ok {
*iterp = c.iter
}
return ok
}
func (c *scrapeCache) trackStaleness(hash uint64, lset labels.Labels) {
c.seriesCur[hash] = lset
}
@ -573,20 +601,28 @@ func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) {
func newScrapeLoop(
ctx context.Context,
sc scraper,
app, reportApp func() storage.Appender,
l log.Logger,
buffers *pool.BytesPool,
sampleMutator labelsMutator,
reportSampleMutator labelsMutator,
appender func() storage.Appender,
) *scrapeLoop {
if l == nil {
l = log.NewNopLogger()
}
if buffers == nil {
buffers = pool.NewBytesPool(1e3, 1e6, 3)
}
sl := &scrapeLoop{
scraper: sc,
appender: app,
cache: newScrapeCache(),
reportAppender: reportApp,
stopped: make(chan struct{}),
ctx: ctx,
l: l,
scraper: sc,
buffers: buffers,
cache: newScrapeCache(),
appender: appender,
sampleMutator: sampleMutator,
reportSampleMutator: reportSampleMutator,
stopped: make(chan struct{}),
ctx: ctx,
l: l,
}
sl.scrapeCtx, sl.cancel = context.WithCancel(ctx)
@ -632,12 +668,20 @@ mainLoop:
time.Since(last).Seconds(),
)
}
b := sl.buffers.Get(sl.lastScrapeSize)
buf := bytes.NewBuffer(b)
scrapeErr := sl.scraper.scrape(scrapeCtx, buf)
cancel()
var b []byte
if scrapeErr == nil {
b = buf.Bytes()
// NOTE: There were issues with misbehaving clients in the past
// that occasionally returned empty results. We don't want those
// to falsely reset our buffer size.
if len(b) > 0 {
sl.lastScrapeSize = len(b)
}
} else {
level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr.Error())
if errc != nil {
@ -657,6 +701,8 @@ mainLoop:
}
}
sl.buffers.Put(b)
if scrapeErr == nil {
scrapeErr = appErr
}
@ -776,6 +822,9 @@ loop:
t = *tp
}
if sl.cache.getDropped(yoloString(met)) {
continue
}
ref, ok := sl.cache.getRef(yoloString(met))
if ok {
lset := sl.cache.lsets[ref].lset
@ -787,9 +836,6 @@ loop:
}
case storage.ErrNotFound:
ok = false
case errSeriesDropped:
err = nil
continue
case storage.ErrOutOfOrderSample:
numOutOfOrder++
level.Debug(sl.l).Log("msg", "Out of order sample", "series", string(met))
@ -828,6 +874,16 @@ loop:
} else {
mets = p.Metric(&lset)
hash = lset.Hash()
// Hash label set as it is seen local to the target. Then add target labels
// and relabeling and store the final label set.
lset = sl.sampleMutator(lset)
// The label set may be set to nil to indicate dropping.
if lset == nil {
sl.cache.addDropped(mets)
continue
}
}
var ref uint64
@ -835,9 +891,6 @@ loop:
// TODO(fabxc): also add a dropped-cache?
switch err {
case nil:
case errSeriesDropped:
err = nil
continue
case storage.ErrOutOfOrderSample:
err = nil
numOutOfOrder++
@ -892,8 +945,6 @@ loop:
// Series no longer exposed, mark it stale.
_, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN))
switch err {
case errSeriesDropped:
err = nil
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
// Do not count these in logging, as this is expected if a target
// goes away and comes back again with a new scrape loop.
@ -928,8 +979,7 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, a
if err == nil {
health = 1
}
app := sl.reportAppender()
app := sl.appender()
if err := sl.addReportSample(app, scrapeHealthMetricName, ts, health); err != nil {
app.Rollback()
@ -952,7 +1002,8 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, a
func (sl *scrapeLoop) reportStale(start time.Time) error {
ts := timestamp.FromTime(start)
app := sl.reportAppender()
app := sl.appender()
stale := math.Float64frombits(value.StaleNaN)
if err := sl.addReportSample(app, scrapeHealthMetricName, ts, stale); err != nil {
@ -999,10 +1050,14 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v
lset := labels.Labels{
labels.Label{Name: labels.MetricName, Value: s},
}
hash := lset.Hash()
lset = sl.reportSampleMutator(lset)
ref, err := app.Add(lset, t, v)
switch err {
case nil:
sl.cache.addRef(s2, ref, lset, lset.Hash())
sl.cache.addRef(s2, ref, lset, hash)
return nil
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
return nil

View file

@ -28,7 +28,6 @@ import (
"testing"
"time"
"github.com/go-kit/kit/log"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
@ -145,7 +144,7 @@ func TestScrapePoolReload(t *testing.T) {
}
// On starting to run, new loops created on reload check whether their preceding
// equivalents have been stopped.
newLoop := func(ctx context.Context, s scraper, app, reportApp func() storage.Appender, _ log.Logger) loop {
newLoop := func(_ *Target, s scraper) loop {
l := &testLoop{}
l.startFunc = func(interval, timeout time.Duration, errc chan<- error) {
if interval != 3*time.Second {
@ -167,7 +166,7 @@ func TestScrapePoolReload(t *testing.T) {
targets: map[uint64]*Target{},
loops: map[uint64]loop{},
newLoop: newLoop,
logger: log.NewNopLogger(),
logger: nil,
}
// Reloading a scrape pool with a new scrape configuration must stop all scrape
@ -228,92 +227,48 @@ func TestScrapePoolReload(t *testing.T) {
}
}
func TestScrapePoolReportAppender(t *testing.T) {
cfg := &config.ScrapeConfig{
MetricRelabelConfigs: []*config.RelabelConfig{
{}, {}, {},
},
}
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
func TestScrapePoolAppender(t *testing.T) {
cfg := &config.ScrapeConfig{}
app := &nopAppendable{}
sp := newScrapePool(context.Background(), cfg, app, nil)
cfg.HonorLabels = false
wrapped := sp.reportAppender(target)
wrapped := sp.appender()
rl, ok := wrapped.(ruleLabelsAppender)
tl, ok := wrapped.(*timeLimitAppender)
if !ok {
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
t.Fatalf("Expected timeLimitAppender but got %T", wrapped)
}
if _, ok := rl.Appender.(nopAppender); !ok {
t.Fatalf("Expected base appender but got %T", rl.Appender)
if _, ok := tl.Appender.(nopAppender); !ok {
t.Fatalf("Expected base appender but got %T", tl.Appender)
}
cfg.HonorLabels = true
wrapped = sp.reportAppender(target)
hl, ok := wrapped.(ruleLabelsAppender)
if !ok {
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
}
if _, ok := rl.Appender.(nopAppender); !ok {
t.Fatalf("Expected base appender but got %T", hl.Appender)
}
}
func TestScrapePoolSampleAppender(t *testing.T) {
cfg := &config.ScrapeConfig{
MetricRelabelConfigs: []*config.RelabelConfig{
{}, {}, {},
},
}
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
app := &nopAppendable{}
sp := newScrapePool(context.Background(), cfg, app, nil)
sp.maxAheadTime = 0
cfg.HonorLabels = false
wrapped := sp.sampleAppender(target)
rl, ok := wrapped.(ruleLabelsAppender)
if !ok {
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
}
re, ok := rl.Appender.(relabelAppender)
if !ok {
t.Fatalf("Expected relabelAppender but got %T", rl.Appender)
}
if _, ok := re.Appender.(nopAppender); !ok {
t.Fatalf("Expected base appender but got %T", re.Appender)
}
cfg.HonorLabels = true
cfg.SampleLimit = 100
wrapped = sp.sampleAppender(target)
hl, ok := wrapped.(honorLabelsAppender)
wrapped = sp.appender()
sl, ok := wrapped.(*limitAppender)
if !ok {
t.Fatalf("Expected honorLabelsAppender but got %T", wrapped)
t.Fatalf("Expected limitAppender but got %T", wrapped)
}
re, ok = hl.Appender.(relabelAppender)
tl, ok = sl.Appender.(*timeLimitAppender)
if !ok {
t.Fatalf("Expected relabelAppender but got %T", hl.Appender)
t.Fatalf("Expected limitAppender but got %T", sl.Appender)
}
lm, ok := re.Appender.(*limitAppender)
if !ok {
t.Fatalf("Expected limitAppender but got %T", lm.Appender)
}
if _, ok := lm.Appender.(nopAppender); !ok {
t.Fatalf("Expected base appender but got %T", re.Appender)
if _, ok := tl.Appender.(nopAppender); !ok {
t.Fatalf("Expected base appender but got %T", tl.Appender)
}
}
func TestScrapeLoopStopBeforeRun(t *testing.T) {
scraper := &testScraper{}
sl := newScrapeLoop(context.Background(), scraper, nil, nil, nil)
sl := newScrapeLoop(context.Background(),
scraper,
nil, nil,
nopMutator,
nopMutator,
nil,
)
// The scrape pool synchronizes on stopping scrape loops. However, new scrape
// loops are started asynchronously. Thus it's possible, that a loop is stopped
@ -358,22 +313,28 @@ func TestScrapeLoopStopBeforeRun(t *testing.T) {
}
}
func TestScrapeLoopStop(t *testing.T) {
appender := &collectResultAppender{}
reportAppender := &collectResultAppender{}
var (
signal = make(chan struct{})
func nopMutator(l labels.Labels) labels.Labels { return l }
scraper = &testScraper{}
app = func() storage.Appender { return appender }
reportApp = func() storage.Appender { return reportAppender }
numScrapes = 0
func TestScrapeLoopStop(t *testing.T) {
var (
signal = make(chan struct{})
appender = &collectResultAppender{}
scraper = &testScraper{}
app = func() storage.Appender { return appender }
)
defer close(signal)
sl := newScrapeLoop(context.Background(), scraper, app, reportApp, nil)
sl := newScrapeLoop(context.Background(),
scraper,
nil, nil,
nopMutator,
nopMutator,
app,
)
// Terminate loop after 2 scrapes.
numScrapes := 0
// Succeed once, several failures, then stop.
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
numScrapes++
if numScrapes == 2 {
@ -394,25 +355,25 @@ func TestScrapeLoopStop(t *testing.T) {
t.Fatalf("Scrape wasn't stopped.")
}
if len(appender.result) < 2 {
t.Fatalf("Appended samples not as expected. Wanted: at least %d samples Got: %d", 2, len(appender.result))
// We expected 1 actual sample for each scrape plus 4 for report samples.
// At least 2 scrapes were made, plus the final stale markers.
if len(appender.result) < 5*3 || len(appender.result)%5 != 0 {
t.Fatalf("Expected at least 3 scrapes with 4 samples each, got %d samples", len(appender.result))
}
if !value.IsStaleNaN(appender.result[len(appender.result)-1].v) {
t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[len(appender.result)-1].v))
// All samples in a scrape must have the same timestmap.
var ts int64
for i, s := range appender.result {
if i%5 == 0 {
ts = s.t
} else if s.t != ts {
t.Fatalf("Unexpected multiple timestamps within single scrape")
}
}
if len(reportAppender.result) < 8 {
t.Fatalf("Appended samples not as expected. Wanted: at least %d samples Got: %d", 8, len(reportAppender.result))
}
if len(reportAppender.result)%4 != 0 {
t.Fatalf("Appended samples not as expected. Wanted: samples mod 4 == 0 Got: %d samples", len(reportAppender.result))
}
if !value.IsStaleNaN(reportAppender.result[len(reportAppender.result)-1].v) {
t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(reportAppender.result[len(reportAppender.result)].v))
}
if reportAppender.result[len(reportAppender.result)-1].t != appender.result[len(appender.result)-1].t {
t.Fatalf("Expected last append and report sample to have same timestamp. Append: stale NaN Report: %x", appender.result[len(appender.result)-1].t, reportAppender.result[len(reportAppender.result)-1].t)
// All samples from the last scrape must be stale markers.
for _, s := range appender.result[len(appender.result)-5:] {
if !value.IsStaleNaN(s.v) {
t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(s.v))
}
}
}
@ -421,14 +382,19 @@ func TestScrapeLoopRun(t *testing.T) {
signal = make(chan struct{})
errc = make(chan error)
scraper = &testScraper{}
app = func() storage.Appender { return &nopAppender{} }
reportApp = func() storage.Appender { return &nopAppender{} }
scraper = &testScraper{}
app = func() storage.Appender { return &nopAppender{} }
)
defer close(signal)
ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx, scraper, app, reportApp, nil)
sl := newScrapeLoop(ctx,
scraper,
nil, nil,
nopMutator,
nopMutator,
app,
)
// The loop must terminate during the initial offset if the context
// is canceled.
@ -466,7 +432,13 @@ func TestScrapeLoopRun(t *testing.T) {
}
ctx, cancel = context.WithCancel(context.Background())
sl = newScrapeLoop(ctx, scraper, app, reportApp, nil)
sl = newScrapeLoop(ctx,
scraper,
nil, nil,
nopMutator,
nopMutator,
app,
)
go func() {
sl.run(time.Second, 100*time.Millisecond, errc)
@ -501,19 +473,23 @@ func TestScrapeLoopRun(t *testing.T) {
func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
appender := &collectResultAppender{}
var (
signal = make(chan struct{})
scraper = &testScraper{}
app = func() storage.Appender { return appender }
reportApp = func() storage.Appender { return &nopAppender{} }
numScrapes = 0
signal = make(chan struct{})
scraper = &testScraper{}
app = func() storage.Appender { return appender }
)
defer close(signal)
ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx, scraper, app, reportApp, nil)
sl := newScrapeLoop(ctx,
scraper,
nil, nil,
nopMutator,
nopMutator,
app,
)
// Succeed once, several failures, then stop.
numScrapes := 0
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
numScrapes++
@ -523,7 +499,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
} else if numScrapes == 5 {
cancel()
}
return fmt.Errorf("Scrape failed.")
return fmt.Errorf("scrape failed")
}
go func() {
@ -537,31 +513,37 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
t.Fatalf("Scrape wasn't stopped.")
}
if len(appender.result) != 2 {
t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 2, len(appender.result))
// 1 successfully scraped sample, 1 stale marker after first fail, 4 report samples for
// each scrape successful or not.
if len(appender.result) != 22 {
t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 22, len(appender.result))
}
if appender.result[0].v != 42.0 {
t.Fatalf("Appended first sample not as expected. Wanted: %f Got: %f", appender.result[0], 42)
t.Fatalf("Appended first sample not as expected. Wanted: %f Got: %f", appender.result[0].v, 42.0)
}
if !value.IsStaleNaN(appender.result[1].v) {
t.Fatalf("Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[1].v))
if !value.IsStaleNaN(appender.result[5].v) {
t.Fatalf("Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[5].v))
}
}
func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
appender := &collectResultAppender{}
var (
signal = make(chan struct{})
signal = make(chan struct{})
scraper = &testScraper{}
app = func() storage.Appender { return appender }
reportApp = func() storage.Appender { return &nopAppender{} }
numScrapes = 0
)
defer close(signal)
ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx, scraper, app, reportApp, nil)
sl := newScrapeLoop(ctx,
scraper,
nil, nil,
nopMutator,
nopMutator,
app,
)
// Succeed once, several failures, then stop.
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
@ -576,7 +558,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
} else if numScrapes == 3 {
cancel()
}
return fmt.Errorf("Scrape failed.")
return fmt.Errorf("scrape failed")
}
go func() {
@ -590,25 +572,29 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
t.Fatalf("Scrape wasn't stopped.")
}
if len(appender.result) != 2 {
t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 2, len(appender.result))
// 1 successfully scraped sample, 1 stale marker after first fail, 4 report samples for
// each scrape successful or not.
if len(appender.result) != 14 {
t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 22, len(appender.result))
}
if appender.result[0].v != 42.0 {
t.Fatalf("Appended first sample not as expected. Wanted: %f Got: %f", appender.result[0], 42)
t.Fatalf("Appended first sample not as expected. Wanted: %f Got: %f", appender.result[0].v, 42.0)
}
if !value.IsStaleNaN(appender.result[1].v) {
t.Fatalf("Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[1].v))
if !value.IsStaleNaN(appender.result[5].v) {
t.Fatalf("Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[5].v))
}
}
func TestScrapeLoopAppend(t *testing.T) {
app := &collectResultAppender{}
sl := newScrapeLoop(context.Background(), nil,
sl := newScrapeLoop(context.Background(),
nil, nil, nil,
nopMutator,
nopMutator,
func() storage.Appender { return app },
func() storage.Appender { return nopAppender{} },
nil,
)
now := time.Now()
_, _, err := sl.append([]byte("metric_a 1\nmetric_b NaN\n"), now)
if err != nil {
@ -641,10 +627,12 @@ func TestScrapeLoopAppend(t *testing.T) {
func TestScrapeLoopAppendStaleness(t *testing.T) {
app := &collectResultAppender{}
sl := newScrapeLoop(context.Background(), nil,
sl := newScrapeLoop(context.Background(),
nil, nil, nil,
nopMutator,
nopMutator,
func() storage.Appender { return app },
func() storage.Appender { return nopAppender{} },
nil,
)
now := time.Now()
@ -684,10 +672,11 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
app := &collectResultAppender{}
sl := newScrapeLoop(context.Background(), nil,
sl := newScrapeLoop(context.Background(),
nil, nil, nil,
nopMutator,
nopMutator,
func() storage.Appender { return app },
func() storage.Appender { return nopAppender{} },
nil,
)
now := time.Now()
@ -710,128 +699,23 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
if !reflect.DeepEqual(want, app.result) {
t.Fatalf("Appended samples not as expected. Wanted: %+v Got: %+v", want, app.result)
}
}
func TestScrapeLoopRunAppliesScrapeLimit(t *testing.T) {
cases := []struct {
appender func() storage.Appender
up float64
scrapeSamplesScraped float64
scrapeSamplesScrapedPostMetricRelabelling float64
}{
{
appender: func() storage.Appender { return nopAppender{} },
up: 1,
scrapeSamplesScraped: 3,
scrapeSamplesScrapedPostMetricRelabelling: 3,
},
{
appender: func() storage.Appender {
return &limitAppender{Appender: nopAppender{}, limit: 3}
},
up: 1,
scrapeSamplesScraped: 3,
scrapeSamplesScrapedPostMetricRelabelling: 3,
},
{
appender: func() storage.Appender {
return &limitAppender{Appender: nopAppender{}, limit: 2}
},
up: 0,
scrapeSamplesScraped: 3,
scrapeSamplesScrapedPostMetricRelabelling: 3,
},
{
appender: func() storage.Appender {
return &relabelAppender{
Appender: &limitAppender{Appender: nopAppender{}, limit: 2},
relabelings: []*config.RelabelConfig{
&config.RelabelConfig{
SourceLabels: model.LabelNames{"__name__"},
Regex: config.MustNewRegexp("a"),
Action: config.RelabelDrop,
},
},
}
},
up: 1,
scrapeSamplesScraped: 3,
scrapeSamplesScrapedPostMetricRelabelling: 2,
},
}
for i, c := range cases {
reportAppender := &collectResultAppender{}
var (
signal = make(chan struct{})
scraper = &testScraper{}
numScrapes = 0
reportApp = func() storage.Appender {
// Get result of the 2nd scrape.
if numScrapes == 2 {
return reportAppender
} else {
return nopAppender{}
}
}
)
defer close(signal)
ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx, scraper, c.appender, reportApp, nil)
// Setup a series to be stale, then 3 samples, then stop.
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
numScrapes += 1
if numScrapes == 1 {
w.Write([]byte("stale 0\n"))
return nil
} else if numScrapes == 2 {
w.Write([]byte("a 0\nb 0\nc 0 \n"))
return nil
} else if numScrapes == 3 {
cancel()
}
return fmt.Errorf("Scrape failed.")
}
go func() {
sl.run(10*time.Millisecond, time.Hour, nil)
signal <- struct{}{}
}()
select {
case <-signal:
case <-time.After(5 * time.Second):
t.Fatalf("Scrape wasn't stopped.")
}
if len(reportAppender.result) != 4 {
t.Fatalf("Case %d appended report samples not as expected. Wanted: %d samples Got: %d", i, 4, len(reportAppender.result))
}
if reportAppender.result[0].v != c.up {
t.Fatalf("Case %d appended up sample not as expected. Wanted: %f Got: %+v", i, c.up, reportAppender.result[0])
}
if reportAppender.result[2].v != c.scrapeSamplesScraped {
t.Fatalf("Case %d appended scrape_samples_scraped sample not as expected. Wanted: %f Got: %+v", i, c.scrapeSamplesScraped, reportAppender.result[2])
}
if reportAppender.result[3].v != c.scrapeSamplesScrapedPostMetricRelabelling {
t.Fatalf("Case %d appended scrape_samples_scraped_post_metric_relabeling sample not as expected. Wanted: %f Got: %+v", i, c.scrapeSamplesScrapedPostMetricRelabelling, reportAppender.result[3])
}
}
}
func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) {
var (
scraper = &testScraper{}
reportAppender = &collectResultAppender{}
reportApp = func() storage.Appender { return reportAppender }
scraper = &testScraper{}
appender = &collectResultAppender{}
app = func() storage.Appender { return appender }
)
ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx, scraper, func() storage.Appender { return nopAppender{} }, reportApp, nil)
sl := newScrapeLoop(ctx,
scraper,
nil, nil,
nopMutator,
nopMutator,
app,
)
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
cancel()
@ -840,31 +724,37 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) {
sl.run(10*time.Millisecond, time.Hour, nil)
if reportAppender.result[0].v != 0 {
t.Fatalf("bad 'up' value; want 0, got %v", reportAppender.result[0].v)
if appender.result[0].v != 0 {
t.Fatalf("bad 'up' value; want 0, got %v", appender.result[0].v)
}
}
func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) {
var (
scraper = &testScraper{}
reportAppender = &collectResultAppender{}
reportApp = func() storage.Appender { return reportAppender }
scraper = &testScraper{}
appender = &collectResultAppender{}
app = func() storage.Appender { return appender }
)
ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx, scraper, func() storage.Appender { return nopAppender{} }, reportApp, nil)
sl := newScrapeLoop(ctx,
scraper,
nil, nil,
nopMutator,
nopMutator,
app,
)
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
cancel()
w.Write([]byte("a{l=\"\xff\"} 0\n"))
w.Write([]byte("a{l=\"\xff\"} 1\n"))
return nil
}
sl.run(10*time.Millisecond, time.Hour, nil)
if reportAppender.result[0].v != 0 {
t.Fatalf("bad 'up' value; want 0, got %v", reportAppender.result[0].v)
if appender.result[0].v != 0 {
t.Fatalf("bad 'up' value; want 0, got %v", appender.result[0].v)
}
}
@ -891,10 +781,13 @@ func (app *errorAppender) AddFast(lset labels.Labels, ref uint64, t int64, v flo
func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T) {
app := &errorAppender{}
sl := newScrapeLoop(context.Background(), nil,
func() storage.Appender { return app },
func() storage.Appender { return nopAppender{} },
sl := newScrapeLoop(context.Background(),
nil,
nil, nil,
nopMutator,
nopMutator,
func() storage.Appender { return app },
)
now := time.Unix(1, 0)
@ -916,15 +809,17 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T
func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
app := &collectResultAppender{}
sl := newScrapeLoop(context.Background(), nil,
sl := newScrapeLoop(context.Background(),
nil,
nil, nil,
nopMutator,
nopMutator,
func() storage.Appender {
return &timeLimitAppender{
Appender: app,
maxTime: timestamp.FromTime(time.Now().Add(10 * time.Minute)),
}
},
func() storage.Appender { return nopAppender{} },
nil,
)
now := time.Now().Add(20 * time.Minute)

View file

@ -253,63 +253,6 @@ func (app *timeLimitAppender) AddFast(lset labels.Labels, ref uint64, t int64, v
return nil
}
// Merges the ingested sample's metric with the label set. On a collision the
// value of the ingested label is stored in a label prefixed with 'exported_'.
type ruleLabelsAppender struct {
storage.Appender
labels labels.Labels
}
func (app ruleLabelsAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
lb := labels.NewBuilder(lset)
for _, l := range app.labels {
lv := lset.Get(l.Name)
if lv != "" {
lb.Set(model.ExportedLabelPrefix+l.Name, lv)
}
lb.Set(l.Name, l.Value)
}
return app.Appender.Add(lb.Labels(), t, v)
}
type honorLabelsAppender struct {
storage.Appender
labels labels.Labels
}
// Merges the sample's metric with the given labels if the label is not
// already present in the metric.
// This also considers labels explicitly set to the empty string.
func (app honorLabelsAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
lb := labels.NewBuilder(lset)
for _, l := range app.labels {
if lv := lset.Get(l.Name); lv == "" {
lb.Set(l.Name, l.Value)
}
}
return app.Appender.Add(lb.Labels(), t, v)
}
// Applies a set of relabel configurations to the sample's metric
// before actually appending it.
type relabelAppender struct {
storage.Appender
relabelings []*config.RelabelConfig
}
var errSeriesDropped = errors.New("series dropped")
func (app relabelAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
lset = relabel.Process(lset, app.relabelings...)
if lset == nil {
return 0, errSeriesDropped
}
return app.Appender.Add(lset, t, v)
}
// populateLabels builds a label set from the given label set and scrape configuration.
// It returns a label set before relabeling was applied as the second return value.
// Returns a nil label set if the target is dropped during relabeling.

View file

@ -17,7 +17,6 @@ import (
"math/rand"
"os"
"path/filepath"
"runtime"
"sort"
"time"
@ -365,10 +364,6 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
}
c.metrics.ran.Inc()
c.metrics.duration.Observe(time.Since(t).Seconds())
// We might have done quite a few allocs. Enforce a GC so they do not accumulate
// with subsequent compactions or head GCs.
runtime.GC()
}(time.Now())
dir := filepath.Join(dest, meta.ULID.String())
@ -570,14 +565,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
return errors.Wrap(err, "write postings")
}
}
// Write a postings list containing all series.
all := make([]uint64, i)
for i := range all {
all[i] = uint64(i)
}
if err := indexw.WritePostings("", "", newListPostings(all)); err != nil {
return errors.Wrap(err, "write 'all' postings")
}
return nil
}

View file

@ -21,6 +21,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"runtime"
"sort"
"strconv"
"sync"
@ -349,9 +350,12 @@ func (db *DB) compact() (changes bool, err error) {
}
changes = true
runtime.GC()
if err := db.reload(); err != nil {
return changes, errors.Wrap(err, "reload blocks")
}
runtime.GC()
}
// Check for compactions of multiple blocks.
@ -380,10 +384,12 @@ func (db *DB) compact() (changes bool, err error) {
return changes, errors.Wrap(err, "delete compacted block")
}
}
runtime.GC()
if err := db.reload(); err != nil {
return changes, errors.Wrap(err, "reload blocks")
}
runtime.GC()
}
return changes, nil

View file

@ -77,6 +77,22 @@ func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) }
func (d *decbuf) be32int() int { return int(d.be32()) }
func (d *decbuf) be64int64() int64 { return int64(d.be64()) }
// uvarintTempStr decodes like uvarintStr but the returned string is
// not safe to use if the underyling buffer changes.
func (d *decbuf) uvarintTempStr() string {
l := d.uvarint64()
if d.e != nil {
return ""
}
if len(d.b) < int(l) {
d.e = errInvalidSize
return ""
}
s := yoloString(d.b[:l])
d.b = d.b[l:]
return s
}
func (d *decbuf) uvarintStr() string {
l := d.uvarint64()
if d.e != nil {

View file

@ -15,7 +15,6 @@ package tsdb
import (
"math"
"runtime"
"sort"
"sync"
"sync/atomic"
@ -402,10 +401,13 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
if s == nil {
return errors.Wrap(ErrNotFound, "unknown series")
}
if err := s.appendable(t, v); err != nil {
s.Lock()
err := s.appendable(t, v)
s.Unlock()
if err != nil {
return err
}
if t < a.mint {
return ErrOutOfBounds
}
@ -435,7 +437,10 @@ func (a *headAppender) Commit() error {
total := len(a.samples)
for _, s := range a.samples {
s.series.Lock()
ok, chunkCreated := s.series.append(s.T, s.V)
s.series.Unlock()
if !ok {
total--
}
@ -509,8 +514,6 @@ Outer:
// gc removes data before the minimum timestmap from the head.
func (h *Head) gc() {
defer runtime.GC()
// Only data strictly lower than this timestamp must be deleted.
mint := h.MinTime()
@ -672,9 +675,9 @@ func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
s := h.head.series.getByID(sid)
s.mtx.RLock()
s.Lock()
c := s.chunk(int(cid))
s.mtx.RUnlock()
s.Unlock()
// Do not expose chunks that are outside of the specified range.
if c == nil || !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) {
@ -694,9 +697,10 @@ type safeChunk struct {
}
func (c *safeChunk) Iterator() chunks.Iterator {
c.s.mtx.RLock()
defer c.s.mtx.RUnlock()
return c.s.iterator(c.cid)
c.s.Lock()
it := c.s.iterator(c.cid)
c.s.Unlock()
return it
}
// func (c *safeChunk) Appender() (chunks.Appender, error) { panic("illegal") }
@ -803,8 +807,8 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkM
}
*lbls = append((*lbls)[:0], s.lset...)
s.mtx.RLock()
defer s.mtx.RUnlock()
s.Lock()
defer s.Unlock()
*chks = (*chks)[:0]
@ -956,11 +960,11 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
for hash, all := range s.hashes[i] {
for _, series := range all {
series.mtx.Lock()
series.Lock()
rmChunks += series.truncateChunksBefore(mint)
if len(series.chunks) > 0 {
series.mtx.Unlock()
series.Unlock()
continue
}
@ -983,7 +987,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
s.locks[j].Unlock()
}
series.mtx.Unlock()
series.Unlock()
}
}
@ -1040,8 +1044,10 @@ type sample struct {
v float64
}
// memSeries is the in-memory representation of a series. None of its methods
// are goroutine safe and its the callers responsibility to lock it.
type memSeries struct {
mtx sync.RWMutex
sync.Mutex
ref uint64
lset labels.Labels
@ -1143,8 +1149,6 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) {
func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
const samplesPerChunk = 120
s.mtx.Lock()
c := s.head()
if c == nil {
@ -1152,7 +1156,6 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
chunkCreated = true
}
if c.maxTime >= t {
s.mtx.Unlock()
return false, chunkCreated
}
if c.chunk.NumSamples() > samplesPerChunk/4 && t >= s.nextAt {
@ -1175,8 +1178,6 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
s.sampleBuf[2] = s.sampleBuf[3]
s.sampleBuf[3] = sample{t: t, v: v}
s.mtx.Unlock()
return true, chunkCreated
}

View file

@ -292,10 +292,22 @@ func (w *indexWriter) AddSeries(ref uint64, lset labels.Labels, chunks ...ChunkM
w.buf2.putUvarint(len(chunks))
for _, c := range chunks {
if len(chunks) > 0 {
c := chunks[0]
w.buf2.putVarint64(c.MinTime)
w.buf2.putVarint64(c.MaxTime)
w.buf2.putUvarint64(uint64(c.MaxTime - c.MinTime))
w.buf2.putUvarint64(c.Ref)
t0 := c.MaxTime
ref0 := int64(c.Ref)
for _, c := range chunks[1:] {
w.buf2.putUvarint64(uint64(c.MinTime - t0))
w.buf2.putUvarint64(uint64(c.MaxTime - c.MinTime))
t0 = c.MaxTime
w.buf2.putVarint64(int64(c.Ref) - ref0)
ref0 = int64(c.Ref)
}
}
w.buf1.reset()
@ -335,10 +347,6 @@ func (w *indexWriter) AddSymbols(sym map[string]struct{}) error {
for _, s := range symbols {
w.symbols[s] = uint32(w.pos) + headerSize + uint32(w.buf2.len())
// NOTE: len(s) gives the number of runes, not the number of bytes.
// Therefore the read-back length for strings with unicode characters will
// be off when not using putUvarintStr.
w.buf2.putUvarintStr(s)
}
@ -636,7 +644,7 @@ func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
keys := make([]string, 0, keyCount)
for i := 0; i < keyCount; i++ {
keys = append(keys, d2.uvarintStr())
keys = append(keys, d2.uvarintTempStr())
}
res[strings.Join(keys, sep)] = uint32(d2.uvarint())
@ -673,7 +681,7 @@ func (r *indexReader) section(o uint32) (byte, []byte, error) {
func (r *indexReader) lookupSymbol(o uint32) (string, error) {
d := r.decbufAt(int(o))
s := d.uvarintStr()
s := d.uvarintTempStr()
if d.err() != nil {
return "", errors.Wrapf(d.err(), "read symbol at %d", o)
}
@ -688,7 +696,7 @@ func (r *indexReader) Symbols() (map[string]struct{}, error) {
sym := make(map[string]struct{}, count)
for ; count > 0; count-- {
s := d2.uvarintStr()
s := d2.uvarintTempStr()
sym[s] = struct{}{}
}
@ -775,17 +783,34 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta)
// Read the chunks meta data.
k = int(d2.uvarint())
for i := 0; i < k; i++ {
mint := d2.varint64()
maxt := d2.varint64()
off := d2.uvarint64()
if k == 0 {
return nil
}
t0 := d2.varint64()
maxt := int64(d2.uvarint64()) + t0
ref0 := int64(d2.uvarint64())
*chks = append(*chks, ChunkMeta{
Ref: uint64(ref0),
MinTime: t0,
MaxTime: maxt,
})
t0 = maxt
for i := 1; i < k; i++ {
mint := int64(d2.uvarint64()) + t0
maxt := int64(d2.uvarint64()) + mint
ref0 += d2.varint64()
t0 = maxt
if d2.err() != nil {
return errors.Wrapf(d2.err(), "read meta for chunk %d", i)
}
*chks = append(*chks, ChunkMeta{
Ref: off,
Ref: uint64(ref0),
MinTime: mint,
MaxTime: maxt,
})

View file

@ -52,13 +52,16 @@ const (
WALEntryDeletes WALEntryType = 4
)
// SamplesCB is the callback after reading samples.
// SamplesCB is the callback after reading samples. The passed slice
// is only valid until the call returns.
type SamplesCB func([]RefSample) error
// SeriesCB is the callback after reading series.
// SeriesCB is the callback after reading series. The passed slice
// is only valid until the call returns.
type SeriesCB func([]RefSeries) error
// DeletesCB is the callback after reading deletes.
// DeletesCB is the callback after reading deletes. The passed slice
// is only valid until the call returns.
type DeletesCB func([]Stone) error
// WAL is a write ahead log that can log new series labels and samples.
@ -395,6 +398,10 @@ func (w *SegmentWAL) LogSeries(series []RefSeries) error {
buf := w.getBuffer()
flag := w.encodeSeries(buf, series)
w.mtx.Lock()
defer w.mtx.Unlock()
err := w.write(WALEntrySeries, flag, buf.get())
w.putBuffer(buf)
@ -410,10 +417,6 @@ func (w *SegmentWAL) LogSeries(series []RefSeries) error {
tf.minSeries = s.Ref
}
}
if w.flushInterval <= 0 {
return errors.Wrap(w.Sync(), "sync")
}
return nil
}
@ -422,6 +425,10 @@ func (w *SegmentWAL) LogSamples(samples []RefSample) error {
buf := w.getBuffer()
flag := w.encodeSamples(buf, samples)
w.mtx.Lock()
defer w.mtx.Unlock()
err := w.write(WALEntrySamples, flag, buf.get())
w.putBuffer(buf)
@ -436,10 +443,6 @@ func (w *SegmentWAL) LogSamples(samples []RefSample) error {
tf.maxTime = s.T
}
}
if w.flushInterval <= 0 {
return errors.Wrap(w.Sync(), "sync")
}
return nil
}
@ -448,6 +451,10 @@ func (w *SegmentWAL) LogDeletes(stones []Stone) error {
buf := w.getBuffer()
flag := w.encodeDeletes(buf, stones)
w.mtx.Lock()
defer w.mtx.Unlock()
err := w.write(WALEntryDeletes, flag, buf.get())
w.putBuffer(buf)
@ -464,10 +471,6 @@ func (w *SegmentWAL) LogDeletes(stones []Stone) error {
}
}
}
if w.flushInterval <= 0 {
return errors.Wrap(w.Sync(), "sync")
}
return nil
}
@ -522,19 +525,26 @@ func (w *SegmentWAL) createSegmentFile(name string) (*os.File, error) {
func (w *SegmentWAL) cut() error {
// Sync current head to disk and close.
if hf := w.head(); hf != nil {
if err := w.sync(); err != nil {
return err
}
off, err := hf.Seek(0, os.SEEK_CUR)
if err != nil {
return err
}
if err := hf.Truncate(off); err != nil {
return err
}
if err := hf.Close(); err != nil {
if err := w.flush(); err != nil {
return err
}
// Finish last segment asynchronously to not block the WAL moving along
// in the new segment.
go func() {
off, err := hf.Seek(0, os.SEEK_CUR)
if err != nil {
w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
}
if err := hf.Truncate(off); err != nil {
w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
}
if err := hf.Sync(); err != nil {
w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
}
if err := hf.Close(); err != nil {
w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
}
}()
}
p, _, err := nextSequenceFile(w.dirFile.Name())
@ -546,9 +556,11 @@ func (w *SegmentWAL) cut() error {
return err
}
if err = w.dirFile.Sync(); err != nil {
return err
}
go func() {
if err = w.dirFile.Sync(); err != nil {
w.logger.Log("msg", "sync WAL directory", "err", err)
}
}()
w.files = append(w.files, newSegmentFile(f))
@ -594,6 +606,9 @@ func (w *SegmentWAL) sync() error {
if err := w.flush(); err != nil {
return err
}
if w.head() == nil {
return nil
}
return fileutil.Fdatasync(w.head().File)
}
@ -655,8 +670,6 @@ const (
)
func (w *SegmentWAL) write(t WALEntryType, flag uint8, buf []byte) error {
w.mtx.Lock()
defer w.mtx.Unlock()
// Cut to the next segment if the entry exceeds the file size unless it would also
// exceed the size of a new segment.
// TODO(gouthamve): Add a test for this case where the commit is greater than segmentSize.
@ -769,6 +782,10 @@ type walReader struct {
curBuf []byte
lastOffset int64 // offset after last successfully read entry
seriesBuf []RefSeries
sampleBuf []RefSample
tombstoneBuf []Stone
err error
}
@ -996,7 +1013,8 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
}
func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) {
series := []RefSeries{}
r.seriesBuf = r.seriesBuf[:0]
dec := decbuf{b: b}
for len(dec.b) > 0 && dec.err() == nil {
@ -1010,7 +1028,7 @@ func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) {
}
sort.Sort(lset)
series = append(series, RefSeries{
r.seriesBuf = append(r.seriesBuf, RefSeries{
Ref: ref,
Labels: lset,
})
@ -1019,16 +1037,16 @@ func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) {
return nil, dec.err()
}
if len(dec.b) > 0 {
return series, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
return r.seriesBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
}
return series, nil
return r.seriesBuf, nil
}
func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) {
if len(b) == 0 {
return nil, nil
}
samples := []RefSample{}
r.sampleBuf = r.sampleBuf[:0]
dec := decbuf{b: b}
var (
@ -1041,7 +1059,7 @@ func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) {
dtime := dec.varint64()
val := dec.be64()
samples = append(samples, RefSample{
r.sampleBuf = append(r.sampleBuf, RefSample{
Ref: uint64(int64(baseRef) + dref),
T: baseTime + dtime,
V: math.Float64frombits(val),
@ -1049,20 +1067,20 @@ func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) {
}
if dec.err() != nil {
return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(samples))
return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(r.sampleBuf))
}
if len(dec.b) > 0 {
return samples, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
return r.sampleBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
}
return samples, nil
return r.sampleBuf, nil
}
func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) {
dec := &decbuf{b: b}
var stones []Stone
r.tombstoneBuf = r.tombstoneBuf[:0]
for dec.len() > 0 && dec.err() == nil {
stones = append(stones, Stone{
r.tombstoneBuf = append(r.tombstoneBuf, Stone{
ref: dec.be64(),
intervals: Intervals{
{Mint: dec.varint64(), Maxt: dec.varint64()},
@ -1073,7 +1091,7 @@ func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) {
return nil, dec.err()
}
if len(dec.b) > 0 {
return stones, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
return r.tombstoneBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
}
return stones, nil
return r.tombstoneBuf, nil
}

14
vendor/vendor.json vendored
View file

@ -871,22 +871,22 @@
"revisionTime": "2016-04-11T19:08:41Z"
},
{
"checksumSHA1": "AoNkGFKIyLNi4a/QcO8p5D7xIXs=",
"checksumSHA1": "mDKxPAubVLTWW/Gar13m7YDHSek=",
"path": "github.com/prometheus/tsdb",
"revision": "0db4c227b72145418ad4c1fbda8fdb87bfe77a02",
"revisionTime": "2017-09-07T11:04:02Z"
"revision": "3870ec285c4640d462a0ad80e7acbcdf1e939563",
"revisionTime": "2017-09-11T08:41:33Z"
},
{
"checksumSHA1": "Gua979gmISm4cJP/fR2hL8m5To8=",
"path": "github.com/prometheus/tsdb/chunks",
"revision": "0db4c227b72145418ad4c1fbda8fdb87bfe77a02",
"revisionTime": "2017-09-07T11:04:02Z"
"revision": "3870ec285c4640d462a0ad80e7acbcdf1e939563",
"revisionTime": "2017-09-11T08:41:33Z"
},
{
"checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=",
"path": "github.com/prometheus/tsdb/labels",
"revision": "0db4c227b72145418ad4c1fbda8fdb87bfe77a02",
"revisionTime": "2017-09-07T11:04:02Z"
"revision": "3870ec285c4640d462a0ad80e7acbcdf1e939563",
"revisionTime": "2017-09-11T08:41:33Z"
},
{
"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",

View file

@ -284,19 +284,23 @@ func serveDebug(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
subpath := route.Param(ctx, "subpath")
// Based off paths from init() in golang.org/src/net/http/pprof/pprof.go
if subpath == "/pprof/" {
pprof.Index(w, req)
} else if subpath == "/pprof/cmdline" {
pprof.Cmdline(w, req)
} else if subpath == "/pprof/profile" {
pprof.Profile(w, req)
} else if subpath == "/pprof/symbol" {
pprof.Symbol(w, req)
} else if subpath == "/pprof/trace" {
pprof.Trace(w, req)
} else {
if !strings.HasPrefix(subpath, "/pprof/") {
http.NotFound(w, req)
return
}
subpath = strings.TrimPrefix(subpath, "/pprof/")
switch subpath {
case "cmdline":
pprof.Cmdline(w, req)
case "profile":
pprof.Profile(w, req)
case "symbol":
pprof.Symbol(w, req)
case "trace":
pprof.Trace(w, req)
default:
pprof.Index(w, req)
}
}