Merge branch 'master' of https://github.com/prometheus/prometheus into update_k8s

This commit is contained in:
tariqibrahim 2018-12-03 19:16:17 -08:00
commit 08d6c83657
46 changed files with 899 additions and 256 deletions

View file

@ -12,11 +12,11 @@ build:
path: ./cmd/promtool path: ./cmd/promtool
flags: -mod=vendor -a -tags netgo flags: -mod=vendor -a -tags netgo
ldflags: | ldflags: |
-X {{repoPath}}/vendor/github.com/prometheus/common/version.Version={{.Version}} -X github.com/prometheus/common/version.Version={{.Version}}
-X {{repoPath}}/vendor/github.com/prometheus/common/version.Revision={{.Revision}} -X github.com/prometheus/common/version.Revision={{.Revision}}
-X {{repoPath}}/vendor/github.com/prometheus/common/version.Branch={{.Branch}} -X github.com/prometheus/common/version.Branch={{.Branch}}
-X {{repoPath}}/vendor/github.com/prometheus/common/version.BuildUser={{user}}@{{host}} -X github.com/prometheus/common/version.BuildUser={{user}}@{{host}}
-X {{repoPath}}/vendor/github.com/prometheus/common/version.BuildDate={{date "20060102-15:04:05"}} -X github.com/prometheus/common/version.BuildDate={{date "20060102-15:04:05"}}
tarball: tarball:
files: files:
- consoles - consoles

View file

@ -1,6 +1,6 @@
Maintainers of this repository with their focus areas: Maintainers of this repository with their focus areas:
* Brian Brazil <brian.brazil@robustperception.io> @brian-brazil: Console templates; semantics of PromQL, service discovery, and relabeling. * Brian Brazil <brian.brazil@robustperception.io> @brian-brazil: Console templates; semantics of PromQL, service discovery, and relabeling.
* Fabian Reinartz <fabian.reinartz@coreos.com> @fabxc: PromQL parsing and evaluation; implementation of retrieval, alert notification, and service discovery. * Fabian Reinartz <freinartz@google.com> @fabxc: PromQL parsing and evaluation; implementation of retrieval, alert notification, and service discovery.
* Julius Volz <julius.volz@gmail.com> @juliusv: Remote storage integrations; web UI. * Julius Volz <julius.volz@gmail.com> @juliusv: Remote storage integrations; web UI.

View file

@ -175,9 +175,11 @@ common-docker-tag-latest:
promu: $(PROMU) promu: $(PROMU)
$(PROMU): $(PROMU):
curl -s -L $(PROMU_URL) | tar -xvz -C /tmp $(eval PROMU_TMP := $(shell mktemp -d))
mkdir -v -p $(FIRST_GOPATH)/bin curl -s -L $(PROMU_URL) | tar -xvzf - -C $(PROMU_TMP)
cp -v /tmp/promu-$(PROMU_VERSION).$(GO_BUILD_PLATFORM)/promu $(PROMU) mkdir -p $(FIRST_GOPATH)/bin
cp $(PROMU_TMP)/promu-$(PROMU_VERSION).$(GO_BUILD_PLATFORM)/promu $(FIRST_GOPATH)/bin/promu
rm -r $(PROMU_TMP)
.PHONY: proto .PHONY: proto
proto: proto:

View file

@ -233,6 +233,9 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
// Do global overrides and validate unique names. // Do global overrides and validate unique names.
jobNames := map[string]struct{}{} jobNames := map[string]struct{}{}
for _, scfg := range c.ScrapeConfigs { for _, scfg := range c.ScrapeConfigs {
if scfg == nil {
return fmt.Errorf("empty or null scrape config section")
}
// First set the correct scrape interval, then check that the timeout // First set the correct scrape interval, then check that the timeout
// (inferred or explicit) is not greater than that. // (inferred or explicit) is not greater than that.
if scfg.ScrapeInterval == 0 { if scfg.ScrapeInterval == 0 {
@ -254,6 +257,16 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
} }
jobNames[scfg.JobName] = struct{}{} jobNames[scfg.JobName] = struct{}{}
} }
for _, rwcfg := range c.RemoteWriteConfigs {
if rwcfg == nil {
return fmt.Errorf("empty or null remote write config section")
}
}
for _, rrcfg := range c.RemoteReadConfigs {
if rrcfg == nil {
return fmt.Errorf("empty or null remote read config section")
}
}
return nil return nil
} }
@ -360,6 +373,13 @@ func (c *ScrapeConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
return err return err
} }
// The UnmarshalYAML method of ServiceDiscoveryConfig is not being called because it's not a pointer.
// We cannot make it a pointer as the parser panics for inlined pointer structs.
// Thus we just do its validation here.
if err := c.ServiceDiscoveryConfig.Validate(); err != nil {
return err
}
// Check for users putting URLs in target groups. // Check for users putting URLs in target groups.
if len(c.RelabelConfigs) == 0 { if len(c.RelabelConfigs) == 0 {
for _, tg := range c.ServiceDiscoveryConfig.StaticConfigs { for _, tg := range c.ServiceDiscoveryConfig.StaticConfigs {
@ -371,6 +391,17 @@ func (c *ScrapeConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
} }
} }
for _, rlcfg := range c.RelabelConfigs {
if rlcfg == nil {
return fmt.Errorf("empty or null target relabeling rule in scrape config")
}
}
for _, rlcfg := range c.MetricRelabelConfigs {
if rlcfg == nil {
return fmt.Errorf("empty or null metric relabeling rule in scrape config")
}
}
// Add index to the static config target groups for unique identification // Add index to the static config target groups for unique identification
// within scrape pool. // within scrape pool.
for i, tg := range c.ServiceDiscoveryConfig.StaticConfigs { for i, tg := range c.ServiceDiscoveryConfig.StaticConfigs {
@ -392,7 +423,16 @@ func (c *AlertingConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
// by the default due to the YAML parser behavior for empty blocks. // by the default due to the YAML parser behavior for empty blocks.
*c = AlertingConfig{} *c = AlertingConfig{}
type plain AlertingConfig type plain AlertingConfig
return unmarshal((*plain)(c)) if err := unmarshal((*plain)(c)); err != nil {
return err
}
for _, rlcfg := range c.AlertRelabelConfigs {
if rlcfg == nil {
return fmt.Errorf("empty or null alert relabeling rule")
}
}
return nil
} }
// AlertmanagerConfig configures how Alertmanagers can be discovered and communicated with. // AlertmanagerConfig configures how Alertmanagers can be discovered and communicated with.
@ -429,6 +469,13 @@ func (c *AlertmanagerConfig) UnmarshalYAML(unmarshal func(interface{}) error) er
return err return err
} }
// The UnmarshalYAML method of ServiceDiscoveryConfig is not being called because it's not a pointer.
// We cannot make it a pointer as the parser panics for inlined pointer structs.
// Thus we just do its validation here.
if err := c.ServiceDiscoveryConfig.Validate(); err != nil {
return err
}
// Check for users putting URLs in target groups. // Check for users putting URLs in target groups.
if len(c.RelabelConfigs) == 0 { if len(c.RelabelConfigs) == 0 {
for _, tg := range c.ServiceDiscoveryConfig.StaticConfigs { for _, tg := range c.ServiceDiscoveryConfig.StaticConfigs {
@ -440,6 +487,12 @@ func (c *AlertmanagerConfig) UnmarshalYAML(unmarshal func(interface{}) error) er
} }
} }
for _, rlcfg := range c.RelabelConfigs {
if rlcfg == nil {
return fmt.Errorf("empty or null Alertmanager target relabeling rule")
}
}
// Add index to the static config target groups for unique identification // Add index to the static config target groups for unique identification
// within scrape pool. // within scrape pool.
for i, tg := range c.ServiceDiscoveryConfig.StaticConfigs { for i, tg := range c.ServiceDiscoveryConfig.StaticConfigs {
@ -632,6 +685,11 @@ func (c *RemoteWriteConfig) UnmarshalYAML(unmarshal func(interface{}) error) err
if c.URL == nil { if c.URL == nil {
return fmt.Errorf("url for remote_write is empty") return fmt.Errorf("url for remote_write is empty")
} }
for _, rlcfg := range c.WriteRelabelConfigs {
if rlcfg == nil {
return fmt.Errorf("empty or null relabeling rule in remote write config")
}
}
// The UnmarshalYAML method of HTTPClientConfig is not being called because it's not a pointer. // The UnmarshalYAML method of HTTPClientConfig is not being called because it's not a pointer.
// We cannot make it a pointer as the parser panics for inlined pointer structs. // We cannot make it a pointer as the parser panics for inlined pointer structs.

View file

@ -751,6 +751,58 @@ var expectedErrors = []struct {
filename: "section_key_dup.bad.yml", filename: "section_key_dup.bad.yml",
errMsg: "field scrape_configs already set in type config.plain", errMsg: "field scrape_configs already set in type config.plain",
}, },
{
filename: "azure_client_id_missing.bad.yml",
errMsg: "Azure SD configuration requires a client_id",
},
{
filename: "azure_client_secret_missing.bad.yml",
errMsg: "Azure SD configuration requires a client_secret",
},
{
filename: "azure_subscription_id_missing.bad.yml",
errMsg: "Azure SD configuration requires a subscription_id",
},
{
filename: "azure_tenant_id_missing.bad.yml",
errMsg: "Azure SD configuration requires a tenant_id",
},
{
filename: "empty_scrape_config.bad.yml",
errMsg: "empty or null scrape config section",
},
{
filename: "empty_rw_config.bad.yml",
errMsg: "empty or null remote write config section",
},
{
filename: "empty_rr_config.bad.yml",
errMsg: "empty or null remote read config section",
},
{
filename: "empty_target_relabel_config.bad.yml",
errMsg: "empty or null target relabeling rule",
},
{
filename: "empty_metric_relabel_config.bad.yml",
errMsg: "empty or null metric relabeling rule",
},
{
filename: "empty_alert_relabel_config.bad.yml",
errMsg: "empty or null alert relabeling rule",
},
{
filename: "empty_alertmanager_relabel_config.bad.yml",
errMsg: "empty or null Alertmanager target relabeling rule",
},
{
filename: "empty_rw_relabel_config.bad.yml",
errMsg: "empty or null relabeling rule in remote write config",
},
{
filename: "empty_static_config.bad.yml",
errMsg: "empty or null section in static_configs",
},
} }
func TestBadConfigs(t *testing.T) { func TestBadConfigs(t *testing.T) {

View file

@ -0,0 +1,7 @@
scrape_configs:
- job_name: azure
azure_sd_configs:
- subscription_id: 11AAAA11-A11A-111A-A111-1111A1111A11
tenant_id: BBBB222B-B2B2-2B22-B222-2BB2222BB2B2
client_id:
client_secret: mysecret

View file

@ -0,0 +1,7 @@
scrape_configs:
- job_name: azure
azure_sd_configs:
- subscription_id: 11AAAA11-A11A-111A-A111-1111A1111A11
tenant_id: BBBB222B-B2B2-2B22-B222-2BB2222BB2B2
client_id: 333333CC-3C33-3333-CCC3-33C3CCCCC33C
client_secret:

View file

@ -0,0 +1,7 @@
scrape_configs:
- job_name: azure
azure_sd_configs:
- subscription_id:
tenant_id: BBBB222B-B2B2-2B22-B222-2BB2222BB2B2
client_id: 333333CC-3C33-3333-CCC3-33C3CCCCC33C
client_secret: mysecret

View file

@ -0,0 +1,7 @@
scrape_configs:
- job_name: azure
azure_sd_configs:
- subscription_id: 11AAAA11-A11A-111A-A111-1111A1111A11
tenant_id:
client_id: 333333CC-3C33-3333-CCC3-33C3CCCCC33C
client_secret: mysecret

View file

@ -0,0 +1,3 @@
alerting:
alert_relabel_configs:
-

View file

@ -0,0 +1,4 @@
alerting:
alertmanagers:
- relabel_configs:
-

View file

@ -0,0 +1,4 @@
scrape_configs:
- job_name: "test"
metric_relabel_configs:
-

View file

@ -0,0 +1,2 @@
remote_read:
-

View file

@ -0,0 +1,2 @@
remote_write:
-

View file

@ -0,0 +1,4 @@
remote_write:
- url: "foo"
write_relabel_configs:
-

View file

@ -0,0 +1,2 @@
scrape_configs:
-

View file

@ -0,0 +1,4 @@
scrape_configs:
- job_name: "test"
static_configs:
-

View file

@ -0,0 +1,4 @@
scrape_configs:
- job_name: "test"
relabel_configs:
-

View file

@ -47,6 +47,7 @@ const (
azureLabelMachinePrivateIP = azureLabel + "machine_private_ip" azureLabelMachinePrivateIP = azureLabel + "machine_private_ip"
azureLabelMachineTag = azureLabel + "machine_tag_" azureLabelMachineTag = azureLabel + "machine_tag_"
azureLabelMachineScaleSet = azureLabel + "machine_scale_set" azureLabelMachineScaleSet = azureLabel + "machine_scale_set"
azureLabelPowerState = azureLabel + "machine_power_state"
) )
var ( var (
@ -80,6 +81,13 @@ type SDConfig struct {
RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"` RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"`
} }
func validateAuthParam(param, name string) error {
if len(param) == 0 {
return fmt.Errorf("Azure SD configuration requires a %s", name)
}
return nil
}
// UnmarshalYAML implements the yaml.Unmarshaler interface. // UnmarshalYAML implements the yaml.Unmarshaler interface.
func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
*c = DefaultSDConfig *c = DefaultSDConfig
@ -88,8 +96,17 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
if err != nil { if err != nil {
return err return err
} }
if c.SubscriptionID == "" { if err = validateAuthParam(c.SubscriptionID, "subscription_id"); err != nil {
return fmt.Errorf("Azure SD configuration requires a subscription_id") return err
}
if err = validateAuthParam(c.TenantID, "tenant_id"); err != nil {
return err
}
if err = validateAuthParam(c.ClientID, "client_id"); err != nil {
return err
}
if err = validateAuthParam(string(c.ClientSecret), "client_secret"); err != nil {
return err
} }
return nil return nil
} }
@ -212,6 +229,7 @@ type virtualMachine struct {
ScaleSet string ScaleSet string
Tags map[string]*string Tags map[string]*string
NetworkProfile compute.NetworkProfile NetworkProfile compute.NetworkProfile
PowerStateCode string
} }
// Create a new azureResource object from an ID string. // Create a new azureResource object from an ID string.
@ -286,12 +304,21 @@ func (d *Discovery) refresh() (tg *targetgroup.Group, err error) {
return return
} }
// We check if the virtual machine has been deallocated.
// If so, we skip them in service discovery.
if strings.EqualFold(vm.PowerStateCode, "PowerState/deallocated") {
level.Debug(d.logger).Log("msg", "Skipping virtual machine", "machine", vm.Name, "power_state", vm.PowerStateCode)
ch <- target{}
return
}
labels := model.LabelSet{ labels := model.LabelSet{
azureLabelMachineID: model.LabelValue(vm.ID), azureLabelMachineID: model.LabelValue(vm.ID),
azureLabelMachineName: model.LabelValue(vm.Name), azureLabelMachineName: model.LabelValue(vm.Name),
azureLabelMachineOSType: model.LabelValue(vm.OsType), azureLabelMachineOSType: model.LabelValue(vm.OsType),
azureLabelMachineLocation: model.LabelValue(vm.Location), azureLabelMachineLocation: model.LabelValue(vm.Location),
azureLabelMachineResourceGroup: model.LabelValue(r.ResourceGroup), azureLabelMachineResourceGroup: model.LabelValue(r.ResourceGroup),
azureLabelPowerState: model.LabelValue(vm.PowerStateCode),
} }
if vm.ScaleSet != "" { if vm.ScaleSet != "" {
@ -319,16 +346,6 @@ func (d *Discovery) refresh() (tg *targetgroup.Group, err error) {
continue continue
} }
// Unfortunately Azure does not return information on whether a VM is deallocated.
// This information is available via another API call however the Go SDK does not
// yet support this. On deallocated machines, this value happens to be nil so it
// is a cheap and easy way to determine if a machine is allocated or not.
if networkInterface.Properties.Primary == nil {
level.Debug(d.logger).Log("msg", "Skipping deallocated virtual machine", "machine", vm.Name)
ch <- target{}
return
}
if *networkInterface.Properties.Primary { if *networkInterface.Properties.Primary {
for _, ip := range *networkInterface.Properties.IPConfigurations { for _, ip := range *networkInterface.Properties.IPConfigurations {
if ip.Properties.PrivateIPAddress != nil { if ip.Properties.PrivateIPAddress != nil {
@ -456,6 +473,7 @@ func mapFromVM(vm compute.VirtualMachine) virtualMachine {
ScaleSet: "", ScaleSet: "",
Tags: tags, Tags: tags,
NetworkProfile: *(vm.Properties.NetworkProfile), NetworkProfile: *(vm.Properties.NetworkProfile),
PowerStateCode: getPowerStateFromVMInstanceView(vm.Properties.InstanceView),
} }
} }
@ -476,6 +494,7 @@ func mapFromVMScaleSetVM(vm compute.VirtualMachineScaleSetVM, scaleSetName strin
ScaleSet: scaleSetName, ScaleSet: scaleSetName,
Tags: tags, Tags: tags,
NetworkProfile: *(vm.Properties.NetworkProfile), NetworkProfile: *(vm.Properties.NetworkProfile),
PowerStateCode: getPowerStateFromVMInstanceView(vm.Properties.InstanceView),
} }
} }
@ -508,3 +527,16 @@ func (client *azureClient) getNetworkInterfaceByID(networkInterfaceID string) (n
return result, nil return result, nil
} }
func getPowerStateFromVMInstanceView(instanceView *compute.VirtualMachineInstanceView) (powerState string) {
if instanceView.Statuses == nil {
return
}
for _, ivs := range *instanceView.Statuses {
code := *(ivs.Code)
if strings.HasPrefix(code, "PowerState") {
powerState = code
}
}
return
}

View file

@ -26,6 +26,10 @@ func TestMapFromVMWithEmptyTags(t *testing.T) {
vmType := "type" vmType := "type"
location := "westeurope" location := "westeurope"
networkProfile := compute.NetworkProfile{} networkProfile := compute.NetworkProfile{}
provisioningStatusCode := "ProvisioningState/succeeded"
provisionDisplayStatus := "Provisioning succeeded"
powerStatusCode := "PowerState/running"
powerDisplayStatus := "VM running"
properties := &compute.VirtualMachineProperties{ properties := &compute.VirtualMachineProperties{
StorageProfile: &compute.StorageProfile{ StorageProfile: &compute.StorageProfile{
OsDisk: &compute.OSDisk{ OsDisk: &compute.OSDisk{
@ -33,6 +37,20 @@ func TestMapFromVMWithEmptyTags(t *testing.T) {
}, },
}, },
NetworkProfile: &networkProfile, NetworkProfile: &networkProfile,
InstanceView: &compute.VirtualMachineInstanceView{
Statuses: &[]compute.InstanceViewStatus{
{
Code: &provisioningStatusCode,
Level: "Info",
DisplayStatus: &provisionDisplayStatus,
},
{
Code: &powerStatusCode,
Level: "Info",
DisplayStatus: &powerDisplayStatus,
},
},
},
} }
testVM := compute.VirtualMachine{ testVM := compute.VirtualMachine{
@ -52,6 +70,7 @@ func TestMapFromVMWithEmptyTags(t *testing.T) {
OsType: "Linux", OsType: "Linux",
Tags: map[string]*string{}, Tags: map[string]*string{},
NetworkProfile: networkProfile, NetworkProfile: networkProfile,
PowerStateCode: "PowerState/running",
} }
actualVM := mapFromVM(testVM) actualVM := mapFromVM(testVM)
@ -69,6 +88,10 @@ func TestMapFromVMWithTags(t *testing.T) {
tags := map[string]*string{ tags := map[string]*string{
"prometheus": new(string), "prometheus": new(string),
} }
provisioningStatusCode := "ProvisioningState/succeeded"
provisionDisplayStatus := "Provisioning succeeded"
powerStatusCode := "PowerState/running"
powerDisplayStatus := "VM running"
networkProfile := compute.NetworkProfile{} networkProfile := compute.NetworkProfile{}
properties := &compute.VirtualMachineProperties{ properties := &compute.VirtualMachineProperties{
StorageProfile: &compute.StorageProfile{ StorageProfile: &compute.StorageProfile{
@ -77,6 +100,20 @@ func TestMapFromVMWithTags(t *testing.T) {
}, },
}, },
NetworkProfile: &networkProfile, NetworkProfile: &networkProfile,
InstanceView: &compute.VirtualMachineInstanceView{
Statuses: &[]compute.InstanceViewStatus{
{
Code: &provisioningStatusCode,
Level: "Info",
DisplayStatus: &provisionDisplayStatus,
},
{
Code: &powerStatusCode,
Level: "Info",
DisplayStatus: &powerDisplayStatus,
},
},
},
} }
testVM := compute.VirtualMachine{ testVM := compute.VirtualMachine{
@ -96,6 +133,7 @@ func TestMapFromVMWithTags(t *testing.T) {
OsType: "Linux", OsType: "Linux",
Tags: tags, Tags: tags,
NetworkProfile: networkProfile, NetworkProfile: networkProfile,
PowerStateCode: "PowerState/running",
} }
actualVM := mapFromVM(testVM) actualVM := mapFromVM(testVM)
@ -111,6 +149,10 @@ func TestMapFromVMScaleSetVMWithEmptyTags(t *testing.T) {
vmType := "type" vmType := "type"
location := "westeurope" location := "westeurope"
networkProfile := compute.NetworkProfile{} networkProfile := compute.NetworkProfile{}
provisioningStatusCode := "ProvisioningState/succeeded"
provisionDisplayStatus := "Provisioning succeeded"
powerStatusCode := "PowerState/running"
powerDisplayStatus := "VM running"
properties := &compute.VirtualMachineScaleSetVMProperties{ properties := &compute.VirtualMachineScaleSetVMProperties{
StorageProfile: &compute.StorageProfile{ StorageProfile: &compute.StorageProfile{
OsDisk: &compute.OSDisk{ OsDisk: &compute.OSDisk{
@ -118,6 +160,20 @@ func TestMapFromVMScaleSetVMWithEmptyTags(t *testing.T) {
}, },
}, },
NetworkProfile: &networkProfile, NetworkProfile: &networkProfile,
InstanceView: &compute.VirtualMachineInstanceView{
Statuses: &[]compute.InstanceViewStatus{
{
Code: &provisioningStatusCode,
Level: "Info",
DisplayStatus: &provisionDisplayStatus,
},
{
Code: &powerStatusCode,
Level: "Info",
DisplayStatus: &powerDisplayStatus,
},
},
},
} }
testVM := compute.VirtualMachineScaleSetVM{ testVM := compute.VirtualMachineScaleSetVM{
@ -139,6 +195,7 @@ func TestMapFromVMScaleSetVMWithEmptyTags(t *testing.T) {
Tags: map[string]*string{}, Tags: map[string]*string{},
NetworkProfile: networkProfile, NetworkProfile: networkProfile,
ScaleSet: scaleSet, ScaleSet: scaleSet,
PowerStateCode: "PowerState/running",
} }
actualVM := mapFromVMScaleSetVM(testVM, scaleSet) actualVM := mapFromVMScaleSetVM(testVM, scaleSet)
@ -157,6 +214,10 @@ func TestMapFromVMScaleSetVMWithTags(t *testing.T) {
"prometheus": new(string), "prometheus": new(string),
} }
networkProfile := compute.NetworkProfile{} networkProfile := compute.NetworkProfile{}
provisioningStatusCode := "ProvisioningState/succeeded"
provisionDisplayStatus := "Provisioning succeeded"
powerStatusCode := "PowerState/running"
powerDisplayStatus := "VM running"
properties := &compute.VirtualMachineScaleSetVMProperties{ properties := &compute.VirtualMachineScaleSetVMProperties{
StorageProfile: &compute.StorageProfile{ StorageProfile: &compute.StorageProfile{
OsDisk: &compute.OSDisk{ OsDisk: &compute.OSDisk{
@ -164,6 +225,20 @@ func TestMapFromVMScaleSetVMWithTags(t *testing.T) {
}, },
}, },
NetworkProfile: &networkProfile, NetworkProfile: &networkProfile,
InstanceView: &compute.VirtualMachineInstanceView{
Statuses: &[]compute.InstanceViewStatus{
{
Code: &provisioningStatusCode,
Level: "Info",
DisplayStatus: &provisionDisplayStatus,
},
{
Code: &powerStatusCode,
Level: "Info",
DisplayStatus: &powerDisplayStatus,
},
},
},
} }
testVM := compute.VirtualMachineScaleSetVM{ testVM := compute.VirtualMachineScaleSetVM{
@ -185,6 +260,7 @@ func TestMapFromVMScaleSetVMWithTags(t *testing.T) {
Tags: tags, Tags: tags,
NetworkProfile: networkProfile, NetworkProfile: networkProfile,
ScaleSet: scaleSet, ScaleSet: scaleSet,
PowerStateCode: "PowerState/running",
} }
actualVM := mapFromVMScaleSetVM(testVM, scaleSet) actualVM := mapFromVMScaleSetVM(testVM, scaleSet)
@ -193,3 +269,52 @@ func TestMapFromVMScaleSetVMWithTags(t *testing.T) {
t.Errorf("Expected %v got %v", expectedVM, actualVM) t.Errorf("Expected %v got %v", expectedVM, actualVM)
} }
} }
func TestGetPowerStatusFromVM(t *testing.T) {
provisioningStatusCode := "ProvisioningState/succeeded"
provisionDisplayStatus := "Provisioning succeeded"
powerStatusCode := "PowerState/running"
powerDisplayStatus := "VM running"
properties := &compute.VirtualMachineScaleSetVMProperties{
StorageProfile: &compute.StorageProfile{
OsDisk: &compute.OSDisk{
OsType: "Linux",
},
},
InstanceView: &compute.VirtualMachineInstanceView{
Statuses: &[]compute.InstanceViewStatus{
{
Code: &provisioningStatusCode,
Level: "Info",
DisplayStatus: &provisionDisplayStatus,
},
{
Code: &powerStatusCode,
Level: "Info",
DisplayStatus: &powerDisplayStatus,
},
},
},
}
testVM := compute.VirtualMachineScaleSetVM{
Properties: properties,
}
actual := getPowerStateFromVMInstanceView(testVM.Properties.InstanceView)
expected := "PowerState/running"
if actual != expected {
t.Errorf("expected powerStatus %s, but got %s instead", expected, actual)
}
// Noq we test a virtualMachine with an empty InstanceView struct.
testVM.Properties.InstanceView = &compute.VirtualMachineInstanceView{}
actual = getPowerStateFromVMInstanceView(testVM.Properties.InstanceView)
if actual != "" {
t.Errorf("expected powerStatus %s, but got %s instead", expected, actual)
}
}

View file

@ -14,6 +14,8 @@
package config package config
import ( import (
"fmt"
"github.com/prometheus/prometheus/discovery/azure" "github.com/prometheus/prometheus/discovery/azure"
"github.com/prometheus/prometheus/discovery/consul" "github.com/prometheus/prometheus/discovery/consul"
"github.com/prometheus/prometheus/discovery/dns" "github.com/prometheus/prometheus/discovery/dns"
@ -58,8 +60,67 @@ type ServiceDiscoveryConfig struct {
TritonSDConfigs []*triton.SDConfig `yaml:"triton_sd_configs,omitempty"` TritonSDConfigs []*triton.SDConfig `yaml:"triton_sd_configs,omitempty"`
} }
// UnmarshalYAML implements the yaml.Unmarshaler interface. // Validate validates the ServiceDiscoveryConfig.
func (c *ServiceDiscoveryConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { func (c *ServiceDiscoveryConfig) Validate() error {
type plain ServiceDiscoveryConfig for _, cfg := range c.AzureSDConfigs {
return unmarshal((*plain)(c)) if cfg == nil {
return fmt.Errorf("empty or null section in azure_sd_configs")
}
}
for _, cfg := range c.ConsulSDConfigs {
if cfg == nil {
return fmt.Errorf("empty or null section in consul_sd_configs")
}
}
for _, cfg := range c.DNSSDConfigs {
if cfg == nil {
return fmt.Errorf("empty or null section in dns_sd_configs")
}
}
for _, cfg := range c.EC2SDConfigs {
if cfg == nil {
return fmt.Errorf("empty or null section in ec2_sd_configs")
}
}
for _, cfg := range c.FileSDConfigs {
if cfg == nil {
return fmt.Errorf("empty or null section in file_sd_configs")
}
}
for _, cfg := range c.GCESDConfigs {
if cfg == nil {
return fmt.Errorf("empty or null section in gce_sd_configs")
}
}
for _, cfg := range c.KubernetesSDConfigs {
if cfg == nil {
return fmt.Errorf("empty or null section in kubernetes_sd_configs")
}
}
for _, cfg := range c.MarathonSDConfigs {
if cfg == nil {
return fmt.Errorf("empty or null section in marathon_sd_configs")
}
}
for _, cfg := range c.NerveSDConfigs {
if cfg == nil {
return fmt.Errorf("empty or null section in nerve_sd_configs")
}
}
for _, cfg := range c.OpenstackSDConfigs {
if cfg == nil {
return fmt.Errorf("empty or null section in openstack_sd_configs")
}
}
for _, cfg := range c.ServersetSDConfigs {
if cfg == nil {
return fmt.Errorf("empty or null section in serverset_sd_configs")
}
}
for _, cfg := range c.StaticConfigs {
if cfg == nil {
return fmt.Errorf("empty or null section in static_configs")
}
}
return nil
} }

View file

@ -46,6 +46,7 @@ const (
ec2LabelPlatform = ec2Label + "platform" ec2LabelPlatform = ec2Label + "platform"
ec2LabelPublicDNS = ec2Label + "public_dns_name" ec2LabelPublicDNS = ec2Label + "public_dns_name"
ec2LabelPublicIP = ec2Label + "public_ip" ec2LabelPublicIP = ec2Label + "public_ip"
ec2LabelPrivateDNS = ec2Label + "private_dns_name"
ec2LabelPrivateIP = ec2Label + "private_ip" ec2LabelPrivateIP = ec2Label + "private_ip"
ec2LabelPrimarySubnetID = ec2Label + "primary_subnet_id" ec2LabelPrimarySubnetID = ec2Label + "primary_subnet_id"
ec2LabelSubnetID = ec2Label + "subnet_id" ec2LabelSubnetID = ec2Label + "subnet_id"
@ -250,6 +251,9 @@ func (d *Discovery) refresh() (tg *targetgroup.Group, err error) {
} }
labels[ec2LabelPrivateIP] = model.LabelValue(*inst.PrivateIpAddress) labels[ec2LabelPrivateIP] = model.LabelValue(*inst.PrivateIpAddress)
if inst.PrivateDnsName != nil {
labels[ec2LabelPrivateDNS] = model.LabelValue(*inst.PrivateDnsName)
}
addr := net.JoinHostPort(*inst.PrivateIpAddress, fmt.Sprintf("%d", d.port)) addr := net.JoinHostPort(*inst.PrivateIpAddress, fmt.Sprintf("%d", d.port))
labels[model.AddressLabel] = model.LabelValue(addr) labels[model.AddressLabel] = model.LabelValue(addr)

View file

@ -314,11 +314,13 @@ func (m *Manager) allGroups() map[string][]*targetgroup.Group {
} }
func (m *Manager) registerProviders(cfg sd_config.ServiceDiscoveryConfig, setName string) { func (m *Manager) registerProviders(cfg sd_config.ServiceDiscoveryConfig, setName string) {
var added bool
add := func(cfg interface{}, newDiscoverer func() (Discoverer, error)) { add := func(cfg interface{}, newDiscoverer func() (Discoverer, error)) {
t := reflect.TypeOf(cfg).String() t := reflect.TypeOf(cfg).String()
for _, p := range m.providers { for _, p := range m.providers {
if reflect.DeepEqual(cfg, p.config) { if reflect.DeepEqual(cfg, p.config) {
p.subs = append(p.subs, setName) p.subs = append(p.subs, setName)
added = true
return return
} }
} }
@ -337,6 +339,7 @@ func (m *Manager) registerProviders(cfg sd_config.ServiceDiscoveryConfig, setNam
subs: []string{setName}, subs: []string{setName},
} }
m.providers = append(m.providers, &provider) m.providers = append(m.providers, &provider)
added = true
} }
for _, c := range cfg.DNSSDConfigs { for _, c := range cfg.DNSSDConfigs {
@ -401,7 +404,17 @@ func (m *Manager) registerProviders(cfg sd_config.ServiceDiscoveryConfig, setNam
} }
if len(cfg.StaticConfigs) > 0 { if len(cfg.StaticConfigs) > 0 {
add(setName, func() (Discoverer, error) { add(setName, func() (Discoverer, error) {
return &StaticProvider{cfg.StaticConfigs}, nil return &StaticProvider{TargetGroups: cfg.StaticConfigs}, nil
})
}
if !added {
// Add an empty target group to force the refresh of the corresponding
// scrape pool and to notify the receiver that this target set has no
// current targets.
// It can happen because the combined set of SD configurations is empty
// or because we fail to instantiate all the SD configurations.
add(setName, func() (Discoverer, error) {
return &StaticProvider{TargetGroups: []*targetgroup.Group{&targetgroup.Group{}}}, nil
}) })
} }
} }

View file

@ -719,6 +719,7 @@ func assertEqualGroups(t *testing.T, got, expected []*targetgroup.Group, msg fun
} }
func verifyPresence(t *testing.T, tSets map[poolKey]map[string]*targetgroup.Group, poolKey poolKey, label string, present bool) { func verifyPresence(t *testing.T, tSets map[poolKey]map[string]*targetgroup.Group, poolKey poolKey, label string, present bool) {
t.Helper()
if _, ok := tSets[poolKey]; !ok { if _, ok := tSets[poolKey]; !ok {
t.Fatalf("'%s' should be present in Pool keys: %v", poolKey, tSets) t.Fatalf("'%s' should be present in Pool keys: %v", poolKey, tSets)
return return
@ -741,7 +742,7 @@ func verifyPresence(t *testing.T, tSets map[poolKey]map[string]*targetgroup.Grou
if !present { if !present {
msg = "not" msg = "not"
} }
t.Fatalf("'%s' should %s be present in Targets labels: %v", label, msg, mergedTargets) t.Fatalf("%q should %s be present in Targets labels: %q", label, msg, mergedTargets)
} }
} }
@ -781,7 +782,7 @@ scrape_configs:
- targets: ["foo:9090"] - targets: ["foo:9090"]
` `
if err := yaml.UnmarshalStrict([]byte(sTwo), cfg); err != nil { if err := yaml.UnmarshalStrict([]byte(sTwo), cfg); err != nil {
t.Fatalf("Unable to load YAML config sOne: %s", err) t.Fatalf("Unable to load YAML config sTwo: %s", err)
} }
c = make(map[string]sd_config.ServiceDiscoveryConfig) c = make(map[string]sd_config.ServiceDiscoveryConfig)
for _, v := range cfg.ScrapeConfigs { for _, v := range cfg.ScrapeConfigs {
@ -794,6 +795,67 @@ scrape_configs:
verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "string/0"}, "{__address__=\"bar:9090\"}", false) verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "string/0"}, "{__address__=\"bar:9090\"}", false)
} }
// TestTargetSetRecreatesEmptyStaticConfigs ensures that reloading a config file after
// removing all targets from the static_configs sends an update with empty targetGroups.
// This is required to signal the receiver that this target set has no current targets.
func TestTargetSetRecreatesEmptyStaticConfigs(t *testing.T) {
cfg := &config.Config{}
sOne := `
scrape_configs:
- job_name: 'prometheus'
static_configs:
- targets: ["foo:9090"]
`
if err := yaml.UnmarshalStrict([]byte(sOne), cfg); err != nil {
t.Fatalf("Unable to load YAML config sOne: %s", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
discoveryManager := NewManager(ctx, log.NewNopLogger())
discoveryManager.updatert = 100 * time.Millisecond
go discoveryManager.Run()
c := make(map[string]sd_config.ServiceDiscoveryConfig)
for _, v := range cfg.ScrapeConfigs {
c[v.JobName] = v.ServiceDiscoveryConfig
}
discoveryManager.ApplyConfig(c)
<-discoveryManager.SyncCh()
verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "string/0"}, "{__address__=\"foo:9090\"}", true)
sTwo := `
scrape_configs:
- job_name: 'prometheus'
static_configs:
`
if err := yaml.UnmarshalStrict([]byte(sTwo), cfg); err != nil {
t.Fatalf("Unable to load YAML config sTwo: %s", err)
}
c = make(map[string]sd_config.ServiceDiscoveryConfig)
for _, v := range cfg.ScrapeConfigs {
c[v.JobName] = v.ServiceDiscoveryConfig
}
discoveryManager.ApplyConfig(c)
<-discoveryManager.SyncCh()
pkey := poolKey{setName: "prometheus", provider: "string/0"}
targetGroups, ok := discoveryManager.targets[pkey]
if !ok {
t.Fatalf("'%v' should be present in target groups", pkey)
}
group, ok := targetGroups[""]
if !ok {
t.Fatalf("missing '' key in target groups %v", targetGroups)
}
if len(group.Targets) != 0 {
t.Fatalf("Invalid number of targets: expected 0, got %d", len(group.Targets))
}
}
func TestIdenticalConfigurationsAreCoalesced(t *testing.T) { func TestIdenticalConfigurationsAreCoalesced(t *testing.T) {
tmpFile, err := ioutil.TempFile("", "sd") tmpFile, err := ioutil.TempFile("", "sd")
if err != nil { if err != nil {

View file

@ -263,10 +263,11 @@ The following meta labels are available on targets during relabeling:
* `__meta_azure_machine_location`: the location the machine runs in * `__meta_azure_machine_location`: the location the machine runs in
* `__meta_azure_machine_name`: the machine name * `__meta_azure_machine_name`: the machine name
* `__meta_azure_machine_os_type`: the machine operating system * `__meta_azure_machine_os_type`: the machine operating system
* `__meta_azure_machine_power_state`: the current power state of the machine
* `__meta_azure_machine_private_ip`: the machine's private IP * `__meta_azure_machine_private_ip`: the machine's private IP
* `__meta_azure_machine_resource_group`: the machine's resource group * `__meta_azure_machine_resource_group`: the machine's resource group
* `__meta_azure_machine_tag_<tagname>`: each tag value of the machine
* `__meta_azure_machine_scale_set`: the name of the scale set which the vm is part of (this value is only set if you are using a [scale set](https://docs.microsoft.com/en-us/azure/virtual-machine-scale-sets/)) * `__meta_azure_machine_scale_set`: the name of the scale set which the vm is part of (this value is only set if you are using a [scale set](https://docs.microsoft.com/en-us/azure/virtual-machine-scale-sets/))
* `__meta_azure_machine_tag_<tagname>`: each tag value of the machine
See below for the configuration options for Azure discovery: See below for the configuration options for Azure discovery:
@ -407,6 +408,7 @@ The following meta labels are available on targets during [relabeling](#relabel_
* `__meta_ec2_owner_id`: the ID of the AWS account that owns the EC2 instance * `__meta_ec2_owner_id`: the ID of the AWS account that owns the EC2 instance
* `__meta_ec2_platform`: the Operating System platform, set to 'windows' on Windows servers, absent otherwise * `__meta_ec2_platform`: the Operating System platform, set to 'windows' on Windows servers, absent otherwise
* `__meta_ec2_primary_subnet_id`: the subnet ID of the primary network interface, if available * `__meta_ec2_primary_subnet_id`: the subnet ID of the primary network interface, if available
* `__meta_ec2_private_dns_name`: the private DNS name of the instance, if available
* `__meta_ec2_private_ip`: the private IP address of the instance, if present * `__meta_ec2_private_ip`: the private IP address of the instance, if present
* `__meta_ec2_public_dns_name`: the public DNS name of the instance, if available * `__meta_ec2_public_dns_name`: the public DNS name of the instance, if available
* `__meta_ec2_public_ip`: the public IP address of the instance, if available * `__meta_ec2_public_ip`: the public IP address of the instance, if available

View file

@ -24,6 +24,10 @@ and one of the following HTTP response codes:
Other non-`2xx` codes may be returned for errors occurring before the API Other non-`2xx` codes may be returned for errors occurring before the API
endpoint is reached. endpoint is reached.
An array of warnings may be returned if there are errors that do
not inhibit the request execution. All of the data that was successfully
collected will be returned in the data field.
The JSON response envelope format is as follows: The JSON response envelope format is as follows:
``` ```
@ -34,7 +38,11 @@ The JSON response envelope format is as follows:
// Only set if status is "error". The data field may still hold // Only set if status is "error". The data field may still hold
// additional data. // additional data.
"errorType": "<string>", "errorType": "<string>",
"error": "<string>" "error": "<string>",
// Only if there were warnings while executing the request.
// There will still be data in the data field.
"warnings": ["<string>"]
} }
``` ```

View file

@ -25,6 +25,7 @@ import (
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level" "github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/discovery/targetgroup"
) )
@ -34,6 +35,15 @@ type customSD struct {
Labels map[string]string `json:"labels"` Labels map[string]string `json:"labels"`
} }
func fingerprint(group *targetgroup.Group) model.Fingerprint {
groupFingerprint := model.LabelSet{}.Fingerprint()
for _, targets := range group.Targets {
groupFingerprint ^= targets.Fingerprint()
}
groupFingerprint ^= group.Labels.Fingerprint()
return groupFingerprint
}
// Adapter runs an unknown service discovery implementation and converts its target groups // Adapter runs an unknown service discovery implementation and converts its target groups
// to JSON and writes to a file for file_sd. // to JSON and writes to a file for file_sd.
type Adapter struct { type Adapter struct {
@ -57,13 +67,7 @@ func mapToArray(m map[string]*customSD) []customSD {
func generateTargetGroups(allTargetGroups map[string][]*targetgroup.Group) map[string]*customSD { func generateTargetGroups(allTargetGroups map[string][]*targetgroup.Group) map[string]*customSD {
groups := make(map[string]*customSD) groups := make(map[string]*customSD)
for k, sdTargetGroups := range allTargetGroups { for k, sdTargetGroups := range allTargetGroups {
for i, group := range sdTargetGroups { for _, group := range sdTargetGroups {
// There is no target, so no need to keep it.
if len(group.Targets) <= 0 {
continue
}
newTargets := make([]string, 0) newTargets := make([]string, 0)
newLabels := make(map[string]string) newLabels := make(map[string]string)
@ -76,12 +80,16 @@ func generateTargetGroups(allTargetGroups map[string][]*targetgroup.Group) map[s
for name, value := range group.Labels { for name, value := range group.Labels {
newLabels[string(name)] = string(value) newLabels[string(name)] = string(value)
} }
// Make a unique key, including the current index, in case the sd_type (map key) and group.Source is not unique.
key := fmt.Sprintf("%s:%s:%d", k, group.Source, i) sdGroup := customSD{
groups[key] = &customSD{
Targets: newTargets, Targets: newTargets,
Labels: newLabels, Labels: newLabels,
} }
// Make a unique key, including group's fingerprint, in case the sd_type (map key) and group.Source is not unique.
groupFingerprint := fingerprint(group)
key := fmt.Sprintf("%s:%s:%s", k, group.Source, groupFingerprint.String())
groups[key] = &sdGroup
} }
} }

View file

@ -41,7 +41,16 @@ func TestGenerateTargetGroups(t *testing.T) {
}, },
}, },
}, },
expectedCustomSD: map[string]*customSD{}, expectedCustomSD: map[string]*customSD{
"customSD:Consul:0000000000000000": {
Targets: []string{},
Labels: map[string]string{},
},
"customSD:Kubernetes:0000000000000000": {
Targets: []string{},
Labels: map[string]string{},
},
},
}, },
{ {
title: "targetGroup filled", title: "targetGroup filled",
@ -78,7 +87,7 @@ func TestGenerateTargetGroups(t *testing.T) {
}, },
}, },
expectedCustomSD: map[string]*customSD{ expectedCustomSD: map[string]*customSD{
"customSD:Azure:0": { "customSD:Azure:282a007a18fadbbb": {
Targets: []string{ Targets: []string{
"host1", "host1",
"host2", "host2",
@ -87,7 +96,7 @@ func TestGenerateTargetGroups(t *testing.T) {
"__meta_test_label": "label_test_1", "__meta_test_label": "label_test_1",
}, },
}, },
"customSD:Openshift:1": { "customSD:Openshift:281c007a18ea2ad0": {
Targets: []string{ Targets: []string{
"host3", "host3",
"host4", "host4",
@ -125,7 +134,7 @@ func TestGenerateTargetGroups(t *testing.T) {
}, },
}, },
expectedCustomSD: map[string]*customSD{ expectedCustomSD: map[string]*customSD{
"customSD:GCE:0": { "customSD:GCE:282a007a18fadbbb": {
Targets: []string{ Targets: []string{
"host1", "host1",
"host2", "host2",
@ -134,6 +143,12 @@ func TestGenerateTargetGroups(t *testing.T) {
"__meta_test_label": "label_test_1", "__meta_test_label": "label_test_1",
}, },
}, },
"customSD:Kubernetes:282e007a18fad483": {
Targets: []string{},
Labels: map[string]string{
"__meta_test_label": "label_test_2",
},
},
}, },
}, },
} }

View file

@ -111,8 +111,9 @@ type MatrixSelector struct {
Offset time.Duration Offset time.Duration
LabelMatchers []*labels.Matcher LabelMatchers []*labels.Matcher
// The series are populated at query preparation time. // The unexpanded seriesSet populated at query preparation time.
series []storage.Series unexpandedSeriesSet storage.SeriesSet
series []storage.Series
} }
// NumberLiteral represents a number. // NumberLiteral represents a number.
@ -144,8 +145,9 @@ type VectorSelector struct {
Offset time.Duration Offset time.Duration
LabelMatchers []*labels.Matcher LabelMatchers []*labels.Matcher
// The series are populated at query preparation time. // The unexpanded seriesSet populated at query preparation time.
series []storage.Series unexpandedSeriesSet storage.SeriesSet
series []storage.Series
} }
func (e *AggregateExpr) Type() ValueType { return ValueTypeVector } func (e *AggregateExpr) Type() ValueType { return ValueTypeVector }

View file

@ -154,8 +154,8 @@ func (q *query) Exec(ctx context.Context) *Result {
span.SetTag(queryTag, q.stmt.String()) span.SetTag(queryTag, q.stmt.String())
} }
res, err := q.ng.exec(ctx, q) res, err, warnings := q.ng.exec(ctx, q)
return &Result{Err: err, Value: res} return &Result{Err: err, Value: res, Warnings: warnings}
} }
// contextDone returns an error if the context was canceled or timed out. // contextDone returns an error if the context was canceled or timed out.
@ -332,7 +332,7 @@ func (ng *Engine) newTestQuery(f func(context.Context) error) Query {
// //
// At this point per query only one EvalStmt is evaluated. Alert and record // At this point per query only one EvalStmt is evaluated. Alert and record
// statements are not handled by the Engine. // statements are not handled by the Engine.
func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) { func (ng *Engine) exec(ctx context.Context, q *query) (Value, error, storage.Warnings) {
ng.metrics.currentQueries.Inc() ng.metrics.currentQueries.Inc()
defer ng.metrics.currentQueries.Dec() defer ng.metrics.currentQueries.Dec()
@ -345,7 +345,7 @@ func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) {
queueSpanTimer, _ := q.stats.GetSpanTimer(ctx, stats.ExecQueueTime, ng.metrics.queryQueueTime) queueSpanTimer, _ := q.stats.GetSpanTimer(ctx, stats.ExecQueueTime, ng.metrics.queryQueueTime)
if err := ng.gate.Start(ctx); err != nil { if err := ng.gate.Start(ctx); err != nil {
return nil, contextErr(err, "query queue") return nil, contextErr(err, "query queue"), nil
} }
defer ng.gate.Done() defer ng.gate.Done()
@ -361,14 +361,14 @@ func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) {
// The base context might already be canceled on the first iteration (e.g. during shutdown). // The base context might already be canceled on the first iteration (e.g. during shutdown).
if err := contextDone(ctx, env); err != nil { if err := contextDone(ctx, env); err != nil {
return nil, err return nil, err, nil
} }
switch s := q.Statement().(type) { switch s := q.Statement().(type) {
case *EvalStmt: case *EvalStmt:
return ng.execEvalStmt(ctx, q, s) return ng.execEvalStmt(ctx, q, s)
case testStmt: case testStmt:
return nil, s(ctx) return nil, s(ctx), nil
} }
panic(fmt.Errorf("promql.Engine.exec: unhandled statement of type %T", q.Statement())) panic(fmt.Errorf("promql.Engine.exec: unhandled statement of type %T", q.Statement()))
@ -383,9 +383,9 @@ func durationMilliseconds(d time.Duration) int64 {
} }
// execEvalStmt evaluates the expression of an evaluation statement for the given time range. // execEvalStmt evaluates the expression of an evaluation statement for the given time range.
func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (Value, error) { func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (Value, error, storage.Warnings) {
prepareSpanTimer, ctxPrepare := query.stats.GetSpanTimer(ctx, stats.QueryPreparationTime, ng.metrics.queryPrepareTime) prepareSpanTimer, ctxPrepare := query.stats.GetSpanTimer(ctx, stats.QueryPreparationTime, ng.metrics.queryPrepareTime)
querier, err := ng.populateSeries(ctxPrepare, query.queryable, s) querier, err, warnings := ng.populateSeries(ctxPrepare, query.queryable, s)
prepareSpanTimer.Finish() prepareSpanTimer.Finish()
// XXX(fabxc): the querier returned by populateSeries might be instantiated // XXX(fabxc): the querier returned by populateSeries might be instantiated
@ -396,7 +396,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
} }
if err != nil { if err != nil {
return nil, err return nil, err, warnings
} }
evalSpanTimer, _ := query.stats.GetSpanTimer(ctx, stats.InnerEvalTime, ng.metrics.queryInnerEval) evalSpanTimer, _ := query.stats.GetSpanTimer(ctx, stats.InnerEvalTime, ng.metrics.queryInnerEval)
@ -413,7 +413,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
} }
val, err := evaluator.Eval(s.Expr) val, err := evaluator.Eval(s.Expr)
if err != nil { if err != nil {
return nil, err return nil, err, warnings
} }
evalSpanTimer.Finish() evalSpanTimer.Finish()
@ -432,11 +432,11 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
// timestamp as that is when we ran the evaluation. // timestamp as that is when we ran the evaluation.
vector[i] = Sample{Metric: s.Metric, Point: Point{V: s.Points[0].V, T: start}} vector[i] = Sample{Metric: s.Metric, Point: Point{V: s.Points[0].V, T: start}}
} }
return vector, nil return vector, nil, warnings
case ValueTypeScalar: case ValueTypeScalar:
return Scalar{V: mat[0].Points[0].V, T: start}, nil return Scalar{V: mat[0].Points[0].V, T: start}, nil, warnings
case ValueTypeMatrix: case ValueTypeMatrix:
return mat, nil return mat, nil, warnings
default: default:
panic(fmt.Errorf("promql.Engine.exec: unexpected expression type %q", s.Expr.Type())) panic(fmt.Errorf("promql.Engine.exec: unexpected expression type %q", s.Expr.Type()))
} }
@ -454,7 +454,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
} }
val, err := evaluator.Eval(s.Expr) val, err := evaluator.Eval(s.Expr)
if err != nil { if err != nil {
return nil, err return nil, err, warnings
} }
evalSpanTimer.Finish() evalSpanTimer.Finish()
@ -465,7 +465,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
query.matrix = mat query.matrix = mat
if err := contextDone(ctx, "expression evaluation"); err != nil { if err := contextDone(ctx, "expression evaluation"); err != nil {
return nil, err return nil, err, warnings
} }
// TODO(fabxc): order ensured by storage? // TODO(fabxc): order ensured by storage?
@ -474,10 +474,10 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
sort.Sort(mat) sort.Sort(mat)
sortSpanTimer.Finish() sortSpanTimer.Finish()
return mat, nil return mat, nil, warnings
} }
func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *EvalStmt) (storage.Querier, error) { func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *EvalStmt) (storage.Querier, error, storage.Warnings) {
var maxOffset time.Duration var maxOffset time.Duration
Inspect(s.Expr, func(node Node, _ []Node) error { Inspect(s.Expr, func(node Node, _ []Node) error {
switch n := node.(type) { switch n := node.(type) {
@ -503,11 +503,14 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
querier, err := q.Querier(ctx, timestamp.FromTime(mint), timestamp.FromTime(s.End)) querier, err := q.Querier(ctx, timestamp.FromTime(mint), timestamp.FromTime(s.End))
if err != nil { if err != nil {
return nil, err return nil, err, nil
} }
var warnings storage.Warnings
Inspect(s.Expr, func(node Node, path []Node) error { Inspect(s.Expr, func(node Node, path []Node) error {
var set storage.SeriesSet var set storage.SeriesSet
var wrn storage.Warnings
params := &storage.SelectParams{ params := &storage.SelectParams{
Start: timestamp.FromTime(s.Start), Start: timestamp.FromTime(s.Start),
End: timestamp.FromTime(s.End), End: timestamp.FromTime(s.End),
@ -524,17 +527,13 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
params.End = params.End - offsetMilliseconds params.End = params.End - offsetMilliseconds
} }
set, err = querier.Select(params, n.LabelMatchers...) set, err, wrn = querier.Select(params, n.LabelMatchers...)
warnings = append(warnings, wrn...)
if err != nil { if err != nil {
level.Error(ng.logger).Log("msg", "error selecting series set", "err", err) level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)
return err return err
} }
n.series, err = expandSeriesSet(ctx, set) n.unexpandedSeriesSet = set
if err != nil {
// TODO(fabxc): use multi-error.
level.Error(ng.logger).Log("msg", "error expanding series set", "err", err)
return err
}
case *MatrixSelector: case *MatrixSelector:
params.Func = extractFuncFromPath(path) params.Func = extractFuncFromPath(path)
@ -547,20 +546,17 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
params.End = params.End - offsetMilliseconds params.End = params.End - offsetMilliseconds
} }
set, err = querier.Select(params, n.LabelMatchers...) set, err, wrn = querier.Select(params, n.LabelMatchers...)
warnings = append(warnings, wrn...)
if err != nil { if err != nil {
level.Error(ng.logger).Log("msg", "error selecting series set", "err", err) level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)
return err return err
} }
n.series, err = expandSeriesSet(ctx, set) n.unexpandedSeriesSet = set
if err != nil {
level.Error(ng.logger).Log("msg", "error expanding series set", "err", err)
return err
}
} }
return nil return nil
}) })
return querier, err return querier, err, warnings
} }
// extractFuncFromPath walks up the path and searches for the first instance of // extractFuncFromPath walks up the path and searches for the first instance of
@ -582,6 +578,30 @@ func extractFuncFromPath(p []Node) string {
return extractFuncFromPath(p[:len(p)-1]) return extractFuncFromPath(p[:len(p)-1])
} }
func checkForSeriesSetExpansion(expr Expr, ctx context.Context) error {
switch e := expr.(type) {
case *MatrixSelector:
if e.series == nil {
series, err := expandSeriesSet(ctx, e.unexpandedSeriesSet)
if err != nil {
panic(err)
} else {
e.series = series
}
}
case *VectorSelector:
if e.series == nil {
series, err := expandSeriesSet(ctx, e.unexpandedSeriesSet)
if err != nil {
panic(err)
} else {
e.series = series
}
}
}
return nil
}
func expandSeriesSet(ctx context.Context, it storage.SeriesSet) (res []storage.Series, err error) { func expandSeriesSet(ctx context.Context, it storage.SeriesSet) (res []storage.Series, err error) {
for it.Next() { for it.Next() {
select { select {
@ -887,6 +907,9 @@ func (ev *evaluator) eval(expr Expr) Value {
} }
sel := e.Args[matrixArgIndex].(*MatrixSelector) sel := e.Args[matrixArgIndex].(*MatrixSelector)
if err := checkForSeriesSetExpansion(sel, ev.ctx); err != nil {
ev.error(err)
}
mat := make(Matrix, 0, len(sel.series)) // Output matrix. mat := make(Matrix, 0, len(sel.series)) // Output matrix.
offset := durationMilliseconds(sel.Offset) offset := durationMilliseconds(sel.Offset)
selRange := durationMilliseconds(sel.Range) selRange := durationMilliseconds(sel.Range)
@ -1018,6 +1041,9 @@ func (ev *evaluator) eval(expr Expr) Value {
}) })
case *VectorSelector: case *VectorSelector:
if err := checkForSeriesSetExpansion(e, ev.ctx); err != nil {
ev.error(err)
}
mat := make(Matrix, 0, len(e.series)) mat := make(Matrix, 0, len(e.series))
it := storage.NewBuffer(durationMilliseconds(LookbackDelta)) it := storage.NewBuffer(durationMilliseconds(LookbackDelta))
for i, s := range e.series { for i, s := range e.series {
@ -1058,6 +1084,10 @@ func (ev *evaluator) eval(expr Expr) Value {
// vectorSelector evaluates a *VectorSelector expression. // vectorSelector evaluates a *VectorSelector expression.
func (ev *evaluator) vectorSelector(node *VectorSelector, ts int64) Vector { func (ev *evaluator) vectorSelector(node *VectorSelector, ts int64) Vector {
if err := checkForSeriesSetExpansion(node, ev.ctx); err != nil {
ev.error(err)
}
var ( var (
vec = make(Vector, 0, len(node.series)) vec = make(Vector, 0, len(node.series))
) )
@ -1127,17 +1157,20 @@ func putPointSlice(p []Point) {
// matrixSelector evaluates a *MatrixSelector expression. // matrixSelector evaluates a *MatrixSelector expression.
func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix { func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix {
if err := checkForSeriesSetExpansion(node, ev.ctx); err != nil {
ev.error(err)
}
var ( var (
offset = durationMilliseconds(node.Offset) offset = durationMilliseconds(node.Offset)
maxt = ev.startTimestamp - offset maxt = ev.startTimestamp - offset
mint = maxt - durationMilliseconds(node.Range) mint = maxt - durationMilliseconds(node.Range)
matrix = make(Matrix, 0, len(node.series)) matrix = make(Matrix, 0, len(node.series))
err error
) )
it := storage.NewBuffer(durationMilliseconds(node.Range)) it := storage.NewBuffer(durationMilliseconds(node.Range))
for i, s := range node.series { for i, s := range node.series {
if err = contextDone(ev.ctx, "expression evaluation"); err != nil { if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
ev.error(err) ev.error(err)
} }
it.Reset(s.Iterator()) it.Reset(s.Iterator())

View file

@ -169,8 +169,8 @@ type errQuerier struct {
err error err error
} }
func (q *errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, error) { func (q *errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, error, storage.Warnings) {
return errSeriesSet{err: q.err}, q.err return errSeriesSet{err: q.err}, q.err, nil
} }
func (*errQuerier) LabelValues(name string) ([]string, error) { return nil, nil } func (*errQuerier) LabelValues(name string) ([]string, error) { return nil, nil }
func (*errQuerier) LabelNames() ([]string, error) { return nil, nil } func (*errQuerier) LabelNames() ([]string, error) { return nil, nil }
@ -425,7 +425,8 @@ load 10s
MaxSamples: 1, MaxSamples: 1,
Result: Result{ Result: Result{
nil, nil,
Scalar{V: 1, T: 1000}}, Scalar{V: 1, T: 1000},
nil},
Start: time.Unix(1, 0), Start: time.Unix(1, 0),
}, },
{ {
@ -434,6 +435,7 @@ load 10s
Result: Result{ Result: Result{
ErrTooManySamples(env), ErrTooManySamples(env),
nil, nil,
nil,
}, },
Start: time.Unix(1, 0), Start: time.Unix(1, 0),
}, },
@ -443,6 +445,7 @@ load 10s
Result: Result{ Result: Result{
ErrTooManySamples(env), ErrTooManySamples(env),
nil, nil,
nil,
}, },
Start: time.Unix(1, 0), Start: time.Unix(1, 0),
}, },
@ -455,6 +458,7 @@ load 10s
Sample{Point: Point{V: 1, T: 1000}, Sample{Point: Point{V: 1, T: 1000},
Metric: labels.FromStrings("__name__", "metric")}, Metric: labels.FromStrings("__name__", "metric")},
}, },
nil,
}, },
Start: time.Unix(1, 0), Start: time.Unix(1, 0),
}, },
@ -467,6 +471,7 @@ load 10s
Points: []Point{{V: 1, T: 0}, {V: 2, T: 10000}}, Points: []Point{{V: 1, T: 0}, {V: 2, T: 10000}},
Metric: labels.FromStrings("__name__", "metric")}, Metric: labels.FromStrings("__name__", "metric")},
}, },
nil,
}, },
Start: time.Unix(10, 0), Start: time.Unix(10, 0),
}, },
@ -476,6 +481,7 @@ load 10s
Result: Result{ Result: Result{
ErrTooManySamples(env), ErrTooManySamples(env),
nil, nil,
nil,
}, },
Start: time.Unix(10, 0), Start: time.Unix(10, 0),
}, },
@ -489,6 +495,7 @@ load 10s
Points: []Point{{V: 1, T: 0}, {V: 1, T: 1000}, {V: 1, T: 2000}}, Points: []Point{{V: 1, T: 0}, {V: 1, T: 1000}, {V: 1, T: 2000}},
Metric: labels.FromStrings()}, Metric: labels.FromStrings()},
}, },
nil,
}, },
Start: time.Unix(0, 0), Start: time.Unix(0, 0),
End: time.Unix(2, 0), End: time.Unix(2, 0),
@ -500,6 +507,7 @@ load 10s
Result: Result{ Result: Result{
ErrTooManySamples(env), ErrTooManySamples(env),
nil, nil,
nil,
}, },
Start: time.Unix(0, 0), Start: time.Unix(0, 0),
End: time.Unix(2, 0), End: time.Unix(2, 0),
@ -514,6 +522,7 @@ load 10s
Points: []Point{{V: 1, T: 0}, {V: 1, T: 1000}, {V: 1, T: 2000}}, Points: []Point{{V: 1, T: 0}, {V: 1, T: 1000}, {V: 1, T: 2000}},
Metric: labels.FromStrings("__name__", "metric")}, Metric: labels.FromStrings("__name__", "metric")},
}, },
nil,
}, },
Start: time.Unix(0, 0), Start: time.Unix(0, 0),
End: time.Unix(2, 0), End: time.Unix(2, 0),
@ -525,6 +534,7 @@ load 10s
Result: Result{ Result: Result{
ErrTooManySamples(env), ErrTooManySamples(env),
nil, nil,
nil,
}, },
Start: time.Unix(0, 0), Start: time.Unix(0, 0),
End: time.Unix(2, 0), End: time.Unix(2, 0),
@ -539,6 +549,7 @@ load 10s
Points: []Point{{V: 1, T: 0}, {V: 1, T: 5000}, {V: 2, T: 10000}}, Points: []Point{{V: 1, T: 0}, {V: 1, T: 5000}, {V: 2, T: 10000}},
Metric: labels.FromStrings("__name__", "metric")}, Metric: labels.FromStrings("__name__", "metric")},
}, },
nil,
}, },
Start: time.Unix(0, 0), Start: time.Unix(0, 0),
End: time.Unix(10, 0), End: time.Unix(10, 0),
@ -550,6 +561,7 @@ load 10s
Result: Result{ Result: Result{
ErrTooManySamples(env), ErrTooManySamples(env),
nil, nil,
nil,
}, },
Start: time.Unix(0, 0), Start: time.Unix(0, 0),
End: time.Unix(10, 0), End: time.Unix(10, 0),

View file

@ -20,6 +20,7 @@ import (
"strings" "strings"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
) )
// Value is a generic interface for values resulting from a query evaluation. // Value is a generic interface for values resulting from a query evaluation.
@ -201,8 +202,9 @@ func (m Matrix) ContainsSameLabelset() bool {
// Result holds the resulting value of an execution or an error // Result holds the resulting value of an execution or an error
// if any occurred. // if any occurred.
type Result struct { type Result struct {
Err error Err error
Value Value Value Value
Warnings storage.Warnings
} }
// Vector returns a Vector if the result value is one. An error is returned if // Vector returns a Vector if the result value is one. An error is returned if

View file

@ -490,6 +490,7 @@ func (g *Group) RestoreForState(ts time.Time) {
level.Error(g.logger).Log("msg", "Failed to get Querier", "err", err) level.Error(g.logger).Log("msg", "Failed to get Querier", "err", err)
return return
} }
defer q.Close()
for _, rule := range g.Rules() { for _, rule := range g.Rules() {
alertRule, ok := rule.(*AlertingRule) alertRule, ok := rule.(*AlertingRule)
@ -517,7 +518,7 @@ func (g *Group) RestoreForState(ts time.Time) {
matchers = append(matchers, mt) matchers = append(matchers, mt)
} }
sset, err := q.Select(nil, matchers...) sset, err, _ := q.Select(nil, matchers...)
if err != nil { if err != nil {
level.Error(g.logger).Log("msg", "Failed to restore 'for' state", level.Error(g.logger).Log("msg", "Failed to restore 'for' state",
labels.AlertName, alertRule.Name(), "stage", "Select", "err", err) labels.AlertName, alertRule.Name(), "stage", "Select", "err", err)

View file

@ -538,7 +538,7 @@ func TestStaleness(t *testing.T) {
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one") matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one")
testutil.Ok(t, err) testutil.Ok(t, err)
set, err := querier.Select(nil, matcher) set, err, _ := querier.Select(nil, matcher)
testutil.Ok(t, err) testutil.Ok(t, err)
samples, err := readSeriesSet(set) samples, err := readSeriesSet(set)

View file

@ -68,23 +68,23 @@ func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error)
queriers := make([]Querier, 0, 1+len(f.secondaries)) queriers := make([]Querier, 0, 1+len(f.secondaries))
// Add primary querier // Add primary querier
querier, err := f.primary.Querier(ctx, mint, maxt) primaryQuerier, err := f.primary.Querier(ctx, mint, maxt)
if err != nil { if err != nil {
return nil, err return nil, err
} }
queriers = append(queriers, querier) queriers = append(queriers, primaryQuerier)
// Add secondary queriers // Add secondary queriers
for _, storage := range f.secondaries { for _, storage := range f.secondaries {
querier, err := storage.Querier(ctx, mint, maxt) querier, err := storage.Querier(ctx, mint, maxt)
if err != nil { if err != nil {
NewMergeQuerier(queriers).Close() NewMergeQuerier(primaryQuerier, queriers).Close()
return nil, err return nil, err
} }
queriers = append(queriers, querier) queriers = append(queriers, querier)
} }
return NewMergeQuerier(queriers), nil return NewMergeQuerier(primaryQuerier, queriers), nil
} }
func (f *fanout) Appender() (Appender, error) { func (f *fanout) Appender() (Appender, error) {
@ -190,14 +190,18 @@ func (f *fanoutAppender) Rollback() (err error) {
// mergeQuerier implements Querier. // mergeQuerier implements Querier.
type mergeQuerier struct { type mergeQuerier struct {
queriers []Querier primaryQuerier Querier
queriers []Querier
failedQueriers map[Querier]struct{}
setQuerierMap map[SeriesSet]Querier
} }
// NewMergeQuerier returns a new Querier that merges results of input queriers. // NewMergeQuerier returns a new Querier that merges results of input queriers.
// NB NewMergeQuerier will return NoopQuerier if no queriers are passed to it, // NB NewMergeQuerier will return NoopQuerier if no queriers are passed to it,
// and will filter NoopQueriers from its arguments, in order to reduce overhead // and will filter NoopQueriers from its arguments, in order to reduce overhead
// when only one querier is passed. // when only one querier is passed.
func NewMergeQuerier(queriers []Querier) Querier { func NewMergeQuerier(primaryQuerier Querier, queriers []Querier) Querier {
filtered := make([]Querier, 0, len(queriers)) filtered := make([]Querier, 0, len(queriers))
for _, querier := range queriers { for _, querier := range queriers {
if querier != NoopQuerier() { if querier != NoopQuerier() {
@ -205,6 +209,9 @@ func NewMergeQuerier(queriers []Querier) Querier {
} }
} }
setQuerierMap := make(map[SeriesSet]Querier)
failedQueriers := make(map[Querier]struct{})
switch len(filtered) { switch len(filtered) {
case 0: case 0:
return NoopQuerier() return NoopQuerier()
@ -212,22 +219,37 @@ func NewMergeQuerier(queriers []Querier) Querier {
return filtered[0] return filtered[0]
default: default:
return &mergeQuerier{ return &mergeQuerier{
queriers: filtered, primaryQuerier: primaryQuerier,
queriers: filtered,
failedQueriers: failedQueriers,
setQuerierMap: setQuerierMap,
} }
} }
} }
// Select returns a set of series that matches the given label matchers. // Select returns a set of series that matches the given label matchers.
func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, error) { func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, error, Warnings) {
seriesSets := make([]SeriesSet, 0, len(q.queriers)) seriesSets := make([]SeriesSet, 0, len(q.queriers))
var warnings Warnings
for _, querier := range q.queriers { for _, querier := range q.queriers {
set, err := querier.Select(params, matchers...) set, err, wrn := querier.Select(params, matchers...)
q.setQuerierMap[set] = querier
if wrn != nil {
warnings = append(warnings, wrn...)
}
if err != nil { if err != nil {
return nil, err q.failedQueriers[querier] = struct{}{}
// If the error source isn't the primary querier, return the error as a warning and continue.
if querier != q.primaryQuerier {
warnings = append(warnings, err)
continue
} else {
return nil, err, nil
}
} }
seriesSets = append(seriesSets, set) seriesSets = append(seriesSets, set)
} }
return NewMergeSeriesSet(seriesSets), nil return NewMergeSeriesSet(seriesSets, q), nil, warnings
} }
// LabelValues returns all potential values for a label name. // LabelValues returns all potential values for a label name.
@ -243,6 +265,11 @@ func (q *mergeQuerier) LabelValues(name string) ([]string, error) {
return mergeStringSlices(results), nil return mergeStringSlices(results), nil
} }
func (q *mergeQuerier) IsFailedSet(set SeriesSet) bool {
_, isFailedQuerier := q.failedQueriers[q.setQuerierMap[set]]
return isFailedQuerier
}
func mergeStringSlices(ss [][]string) []string { func mergeStringSlices(ss [][]string) []string {
switch len(ss) { switch len(ss) {
case 0: case 0:
@ -322,11 +349,13 @@ type mergeSeriesSet struct {
currentSets []SeriesSet currentSets []SeriesSet
heap seriesSetHeap heap seriesSetHeap
sets []SeriesSet sets []SeriesSet
querier *mergeQuerier
} }
// NewMergeSeriesSet returns a new series set that merges (deduplicates) // NewMergeSeriesSet returns a new series set that merges (deduplicates)
// series returned by the input series sets when iterating. // series returned by the input series sets when iterating.
func NewMergeSeriesSet(sets []SeriesSet) SeriesSet { func NewMergeSeriesSet(sets []SeriesSet, querier *mergeQuerier) SeriesSet {
if len(sets) == 1 { if len(sets) == 1 {
return sets[0] return sets[0]
} }
@ -335,34 +364,53 @@ func NewMergeSeriesSet(sets []SeriesSet) SeriesSet {
// series under the cursor. // series under the cursor.
var h seriesSetHeap var h seriesSetHeap
for _, set := range sets { for _, set := range sets {
if set == nil {
continue
}
if set.Next() { if set.Next() {
heap.Push(&h, set) heap.Push(&h, set)
} }
} }
return &mergeSeriesSet{ return &mergeSeriesSet{
heap: h, heap: h,
sets: sets, sets: sets,
querier: querier,
} }
} }
func (c *mergeSeriesSet) Next() bool { func (c *mergeSeriesSet) Next() bool {
// Firstly advance all the current series sets. If any of them have run out // Run in a loop because the "next" series sets may not be valid anymore.
// we can drop them, otherwise they should be inserted back into the heap. // If a remote querier fails, we discard all series sets from that querier.
for _, set := range c.currentSets { // If, for the current label set, all the next series sets come from
if set.Next() { // failed remote storage sources, we want to keep trying with the next label set.
heap.Push(&c.heap, set) for {
// Firstly advance all the current series sets. If any of them have run out
// we can drop them, otherwise they should be inserted back into the heap.
for _, set := range c.currentSets {
if set.Next() {
heap.Push(&c.heap, set)
}
}
if len(c.heap) == 0 {
return false
} }
}
if len(c.heap) == 0 {
return false
}
// Now, pop items of the heap that have equal label sets. // Now, pop items of the heap that have equal label sets.
c.currentSets = nil c.currentSets = nil
c.currentLabels = c.heap[0].At().Labels() c.currentLabels = c.heap[0].At().Labels()
for len(c.heap) > 0 && labels.Equal(c.currentLabels, c.heap[0].At().Labels()) { for len(c.heap) > 0 && labels.Equal(c.currentLabels, c.heap[0].At().Labels()) {
set := heap.Pop(&c.heap).(SeriesSet) set := heap.Pop(&c.heap).(SeriesSet)
c.currentSets = append(c.currentSets, set) if c.querier != nil && c.querier.IsFailedSet(set) {
continue
}
c.currentSets = append(c.currentSets, set)
}
// As long as the current set contains at least 1 set,
// then it should return true.
if len(c.currentSets) != 0 {
break
}
} }
return true return true
} }

View file

@ -109,7 +109,7 @@ func TestMergeSeriesSet(t *testing.T) {
), ),
}, },
} { } {
merged := NewMergeSeriesSet(tc.input) merged := NewMergeSeriesSet(tc.input, nil)
for merged.Next() { for merged.Next() {
require.True(t, tc.expected.Next()) require.True(t, tc.expected.Next())
actualSeries := merged.At() actualSeries := merged.At()
@ -262,7 +262,7 @@ func makeMergeSeriesSet(numSeriesSets, numSeries, numSamples int) SeriesSet {
for i := 0; i < numSeriesSets; i++ { for i := 0; i < numSeriesSets; i++ {
seriesSets = append(seriesSets, makeSeriesSet(numSeries, numSamples)) seriesSets = append(seriesSets, makeSeriesSet(numSeries, numSamples))
} }
return NewMergeSeriesSet(seriesSets) return NewMergeSeriesSet(seriesSets, nil)
} }
func benchmarkDrain(seriesSet SeriesSet, b *testing.B) { func benchmarkDrain(seriesSet SeriesSet, b *testing.B) {

View file

@ -52,7 +52,7 @@ type Queryable interface {
// Querier provides reading access to time series data. // Querier provides reading access to time series data.
type Querier interface { type Querier interface {
// Select returns a set of series that matches the given label matchers. // Select returns a set of series that matches the given label matchers.
Select(*SelectParams, ...*labels.Matcher) (SeriesSet, error) Select(*SelectParams, ...*labels.Matcher) (SeriesSet, error, Warnings)
// LabelValues returns all potential values for a label name. // LabelValues returns all potential values for a label name.
LabelValues(name string) ([]string, error) LabelValues(name string) ([]string, error)
@ -122,3 +122,5 @@ type SeriesIterator interface {
// Err returns the current error. // Err returns the current error.
Err() error Err() error
} }
type Warnings []error

View file

@ -26,8 +26,8 @@ func NoopQuerier() Querier {
return noopQuerier{} return noopQuerier{}
} }
func (noopQuerier) Select(*SelectParams, ...*labels.Matcher) (SeriesSet, error) { func (noopQuerier) Select(*SelectParams, ...*labels.Matcher) (SeriesSet, error, Warnings) {
return NoopSeriesSet(), nil return NoopSeriesSet(), nil, nil
} }
func (noopQuerier) LabelValues(name string) ([]string, error) { func (noopQuerier) LabelValues(name string) ([]string, error) {

View file

@ -59,10 +59,10 @@ type querier struct {
// Select implements storage.Querier and uses the given matchers to read series // Select implements storage.Querier and uses the given matchers to read series
// sets from the Client. // sets from the Client.
func (q *querier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error) { func (q *querier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error, storage.Warnings) {
query, err := ToQuery(q.mint, q.maxt, matchers, p) query, err := ToQuery(q.mint, q.maxt, matchers, p)
if err != nil { if err != nil {
return nil, err return nil, err, nil
} }
remoteReadGauge := remoteReadQueries.WithLabelValues(q.client.Name()) remoteReadGauge := remoteReadQueries.WithLabelValues(q.client.Name())
@ -71,10 +71,10 @@ func (q *querier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (
res, err := q.client.Read(q.ctx, query) res, err := q.client.Read(q.ctx, query)
if err != nil { if err != nil {
return nil, err return nil, err, nil
} }
return FromQueryResult(res), nil return FromQueryResult(res), nil, nil
} }
// LabelValues implements storage.Querier and is a noop. // LabelValues implements storage.Querier and is a noop.
@ -117,13 +117,13 @@ type externalLabelsQuerier struct {
// Select adds equality matchers for all external labels to the list of matchers // Select adds equality matchers for all external labels to the list of matchers
// before calling the wrapped storage.Queryable. The added external labels are // before calling the wrapped storage.Queryable. The added external labels are
// removed from the returned series sets. // removed from the returned series sets.
func (q externalLabelsQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error) { func (q externalLabelsQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error, storage.Warnings) {
m, added := q.addExternalLabels(matchers) m, added := q.addExternalLabels(matchers)
s, err := q.Querier.Select(p, m...) s, err, warnings := q.Querier.Select(p, m...)
if err != nil { if err != nil {
return nil, err return nil, err, warnings
} }
return newSeriesSetFilter(s, added), nil return newSeriesSetFilter(s, added), nil, warnings
} }
// PreferLocalStorageFilter returns a QueryableFunc which creates a NoopQuerier // PreferLocalStorageFilter returns a QueryableFunc which creates a NoopQuerier
@ -170,7 +170,7 @@ type requiredMatchersQuerier struct {
// Select returns a NoopSeriesSet if the given matchers don't match the label // Select returns a NoopSeriesSet if the given matchers don't match the label
// set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier. // set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier.
func (q requiredMatchersQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error) { func (q requiredMatchersQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error, storage.Warnings) {
ms := q.requiredMatchers ms := q.requiredMatchers
for _, m := range matchers { for _, m := range matchers {
for i, r := range ms { for i, r := range ms {
@ -184,7 +184,7 @@ func (q requiredMatchersQuerier) Select(p *storage.SelectParams, matchers ...*la
} }
} }
if len(ms) > 0 { if len(ms) > 0 {
return storage.NoopSeriesSet(), nil return storage.NoopSeriesSet(), nil, nil
} }
return q.Querier.Select(p, matchers...) return q.Querier.Select(p, matchers...)
} }
@ -225,6 +225,15 @@ func newSeriesSetFilter(ss storage.SeriesSet, toFilter model.LabelSet) storage.S
type seriesSetFilter struct { type seriesSetFilter struct {
storage.SeriesSet storage.SeriesSet
toFilter model.LabelSet toFilter model.LabelSet
querier storage.Querier
}
func (ssf *seriesSetFilter) GetQuerier() storage.Querier {
return ssf.querier
}
func (ssf *seriesSetFilter) SetQuerier(querier storage.Querier) {
ssf.querier = querier
} }
func (ssf seriesSetFilter) At() storage.Series { func (ssf seriesSetFilter) At() storage.Series {

View file

@ -42,7 +42,7 @@ func TestExternalLabelsQuerierSelect(t *testing.T) {
externalLabels: model.LabelSet{"region": "europe"}, externalLabels: model.LabelSet{"region": "europe"},
} }
want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels) want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels)
have, err := q.Select(nil, matchers...) have, err, _ := q.Select(nil, matchers...)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
@ -157,8 +157,8 @@ type mockSeriesSet struct {
storage.SeriesSet storage.SeriesSet
} }
func (mockQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, error) { func (mockQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, error, storage.Warnings) {
return mockSeriesSet{}, nil return mockSeriesSet{}, nil, nil
} }
func TestPreferLocalStorageFilter(t *testing.T) { func TestPreferLocalStorageFilter(t *testing.T) {
@ -313,7 +313,7 @@ func TestRequiredLabelsQuerierSelect(t *testing.T) {
requiredMatchers: test.requiredMatchers, requiredMatchers: test.requiredMatchers,
} }
have, err := q.Select(nil, test.matchers...) have, err, _ := q.Select(nil, test.matchers...)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }

View file

@ -140,7 +140,7 @@ func (s *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querie
} }
queriers = append(queriers, q) queriers = append(queriers, q)
} }
return storage.NewMergeQuerier(queriers), nil return storage.NewMergeQuerier(nil, queriers), nil
} }
// Close the background processing of the storage queues. // Close the background processing of the storage queues.

View file

@ -230,7 +230,7 @@ type querier struct {
q tsdb.Querier q tsdb.Querier
} }
func (q querier) Select(_ *storage.SelectParams, oms ...*labels.Matcher) (storage.SeriesSet, error) { func (q querier) Select(_ *storage.SelectParams, oms ...*labels.Matcher) (storage.SeriesSet, error, storage.Warnings) {
ms := make([]tsdbLabels.Matcher, 0, len(oms)) ms := make([]tsdbLabels.Matcher, 0, len(oms))
for _, om := range oms { for _, om := range oms {
@ -238,9 +238,9 @@ func (q querier) Select(_ *storage.SelectParams, oms ...*labels.Matcher) (storag
} }
set, err := q.q.Select(ms...) set, err := q.q.Select(ms...)
if err != nil { if err != nil {
return nil, err return nil, err, nil
} }
return seriesSet{set: set}, nil return seriesSet{set: set}, nil, nil
} }
func (q querier) LabelValues(name string) ([]string, error) { return q.q.LabelValues(name) } func (q querier) LabelValues(name string) ([]string, error) { return q.q.LabelValues(name) }

View file

@ -119,6 +119,14 @@ type response struct {
Data interface{} `json:"data,omitempty"` Data interface{} `json:"data,omitempty"`
ErrorType errorType `json:"errorType,omitempty"` ErrorType errorType `json:"errorType,omitempty"`
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`
Warnings []string `json:"warnings,omitempty"`
}
type apiFuncResult struct {
data interface{}
err *apiError
warnings storage.Warnings
finalizer func()
} }
// Enables cross-site script calls. // Enables cross-site script calls.
@ -128,7 +136,7 @@ func setCORS(w http.ResponseWriter) {
} }
} }
type apiFunc func(r *http.Request) (interface{}, *apiError, func()) type apiFunc func(r *http.Request) apiFuncResult
// TSDBAdmin defines the tsdb interfaces used by the v1 API for admin operations. // TSDBAdmin defines the tsdb interfaces used by the v1 API for admin operations.
type TSDBAdmin interface { type TSDBAdmin interface {
@ -204,16 +212,16 @@ func (api *API) Register(r *route.Router) {
wrap := func(f apiFunc) http.HandlerFunc { wrap := func(f apiFunc) http.HandlerFunc {
hf := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { hf := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
setCORS(w) setCORS(w)
data, err, finalizer := f(r) result := f(r)
if err != nil { if result.err != nil {
api.respondError(w, err, data) api.respondError(w, result.err, result.data)
} else if data != nil { } else if result.data != nil {
api.respond(w, data) api.respond(w, result.data, result.warnings)
} else { } else {
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
} }
if finalizer != nil { if result.finalizer != nil {
finalizer() result.finalizer()
} }
}) })
return api.ready(httputil.CompressionHandler{ return api.ready(httputil.CompressionHandler{
@ -258,17 +266,17 @@ type queryData struct {
Stats *stats.QueryStats `json:"stats,omitempty"` Stats *stats.QueryStats `json:"stats,omitempty"`
} }
func (api *API) options(r *http.Request) (interface{}, *apiError, func()) { func (api *API) options(r *http.Request) apiFuncResult {
return nil, nil, nil return apiFuncResult{nil, nil, nil, nil}
} }
func (api *API) query(r *http.Request) (interface{}, *apiError, func()) { func (api *API) query(r *http.Request) apiFuncResult {
var ts time.Time var ts time.Time
if t := r.FormValue("time"); t != "" { if t := r.FormValue("time"); t != "" {
var err error var err error
ts, err = parseTime(t) ts, err = parseTime(t)
if err != nil { if err != nil {
return nil, &apiError{errorBadData, err}, nil return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
} }
} else { } else {
ts = api.now() ts = api.now()
@ -279,7 +287,7 @@ func (api *API) query(r *http.Request) (interface{}, *apiError, func()) {
var cancel context.CancelFunc var cancel context.CancelFunc
timeout, err := parseDuration(to) timeout, err := parseDuration(to)
if err != nil { if err != nil {
return nil, &apiError{errorBadData, err}, nil return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
} }
ctx, cancel = context.WithTimeout(ctx, timeout) ctx, cancel = context.WithTimeout(ctx, timeout)
@ -288,12 +296,12 @@ func (api *API) query(r *http.Request) (interface{}, *apiError, func()) {
qry, err := api.QueryEngine.NewInstantQuery(api.Queryable, r.FormValue("query"), ts) qry, err := api.QueryEngine.NewInstantQuery(api.Queryable, r.FormValue("query"), ts)
if err != nil { if err != nil {
return nil, &apiError{errorBadData, err}, nil return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
} }
res := qry.Exec(ctx) res := qry.Exec(ctx)
if res.Err != nil { if res.Err != nil {
return nil, returnAPIError(res.Err), qry.Close return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
} }
// Optional stats field in response if parameter "stats" is not empty. // Optional stats field in response if parameter "stats" is not empty.
@ -302,42 +310,42 @@ func (api *API) query(r *http.Request) (interface{}, *apiError, func()) {
qs = stats.NewQueryStats(qry.Stats()) qs = stats.NewQueryStats(qry.Stats())
} }
return &queryData{ return apiFuncResult{&queryData{
ResultType: res.Value.Type(), ResultType: res.Value.Type(),
Result: res.Value, Result: res.Value,
Stats: qs, Stats: qs,
}, nil, qry.Close }, nil, res.Warnings, qry.Close}
} }
func (api *API) queryRange(r *http.Request) (interface{}, *apiError, func()) { func (api *API) queryRange(r *http.Request) apiFuncResult {
start, err := parseTime(r.FormValue("start")) start, err := parseTime(r.FormValue("start"))
if err != nil { if err != nil {
return nil, &apiError{errorBadData, err}, nil return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
} }
end, err := parseTime(r.FormValue("end")) end, err := parseTime(r.FormValue("end"))
if err != nil { if err != nil {
return nil, &apiError{errorBadData, err}, nil return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
} }
if end.Before(start) { if end.Before(start) {
err := errors.New("end timestamp must not be before start time") err := errors.New("end timestamp must not be before start time")
return nil, &apiError{errorBadData, err}, nil return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
} }
step, err := parseDuration(r.FormValue("step")) step, err := parseDuration(r.FormValue("step"))
if err != nil { if err != nil {
return nil, &apiError{errorBadData, err}, nil return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
} }
if step <= 0 { if step <= 0 {
err := errors.New("zero or negative query resolution step widths are not accepted. Try a positive integer") err := errors.New("zero or negative query resolution step widths are not accepted. Try a positive integer")
return nil, &apiError{errorBadData, err}, nil return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
} }
// For safety, limit the number of returned points per timeseries. // For safety, limit the number of returned points per timeseries.
// This is sufficient for 60s resolution for a week or 1h resolution for a year. // This is sufficient for 60s resolution for a week or 1h resolution for a year.
if end.Sub(start)/step > 11000 { if end.Sub(start)/step > 11000 {
err := errors.New("exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)") err := errors.New("exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)")
return nil, &apiError{errorBadData, err}, nil return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
} }
ctx := r.Context() ctx := r.Context()
@ -345,7 +353,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError, func()) {
var cancel context.CancelFunc var cancel context.CancelFunc
timeout, err := parseDuration(to) timeout, err := parseDuration(to)
if err != nil { if err != nil {
return nil, &apiError{errorBadData, err}, nil return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
} }
ctx, cancel = context.WithTimeout(ctx, timeout) ctx, cancel = context.WithTimeout(ctx, timeout)
@ -354,12 +362,12 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError, func()) {
qry, err := api.QueryEngine.NewRangeQuery(api.Queryable, r.FormValue("query"), start, end, step) qry, err := api.QueryEngine.NewRangeQuery(api.Queryable, r.FormValue("query"), start, end, step)
if err != nil { if err != nil {
return nil, &apiError{errorBadData, err}, nil return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
} }
res := qry.Exec(ctx) res := qry.Exec(ctx)
if res.Err != nil { if res.Err != nil {
return nil, returnAPIError(res.Err), qry.Close return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
} }
// Optional stats field in response if parameter "stats" is not empty. // Optional stats field in response if parameter "stats" is not empty.
@ -368,11 +376,11 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError, func()) {
qs = stats.NewQueryStats(qry.Stats()) qs = stats.NewQueryStats(qry.Stats())
} }
return &queryData{ return apiFuncResult{&queryData{
ResultType: res.Value.Type(), ResultType: res.Value.Type(),
Result: res.Value, Result: res.Value,
Stats: qs, Stats: qs,
}, nil, qry.Close }, nil, res.Warnings, qry.Close}
} }
func returnAPIError(err error) *apiError { func returnAPIError(err error) *apiError {
@ -392,39 +400,39 @@ func returnAPIError(err error) *apiError {
return &apiError{errorExec, err} return &apiError{errorExec, err}
} }
func (api *API) labelNames(r *http.Request) (interface{}, *apiError, func()) { func (api *API) labelNames(r *http.Request) apiFuncResult {
q, err := api.Queryable.Querier(r.Context(), math.MinInt64, math.MaxInt64) q, err := api.Queryable.Querier(r.Context(), math.MinInt64, math.MaxInt64)
if err != nil { if err != nil {
return nil, &apiError{errorExec, err}, nil return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil}
} }
defer q.Close() defer q.Close()
names, err := q.LabelNames() names, err := q.LabelNames()
if err != nil { if err != nil {
return nil, &apiError{errorExec, err}, nil return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil}
} }
return names, nil, nil return apiFuncResult{names, nil, nil, nil}
} }
func (api *API) labelValues(r *http.Request) (interface{}, *apiError, func()) { func (api *API) labelValues(r *http.Request) apiFuncResult {
ctx := r.Context() ctx := r.Context()
name := route.Param(ctx, "name") name := route.Param(ctx, "name")
if !model.LabelNameRE.MatchString(name) { if !model.LabelNameRE.MatchString(name) {
return nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)}, nil return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)}, nil, nil}
} }
q, err := api.Queryable.Querier(ctx, math.MinInt64, math.MaxInt64) q, err := api.Queryable.Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil { if err != nil {
return nil, &apiError{errorExec, err}, nil return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil}
} }
defer q.Close() defer q.Close()
vals, err := q.LabelValues(name) vals, err := q.LabelValues(name)
if err != nil { if err != nil {
return nil, &apiError{errorExec, err}, nil return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil}
} }
return vals, nil, nil return apiFuncResult{vals, nil, nil, nil}
} }
var ( var (
@ -432,12 +440,12 @@ var (
maxTime = time.Unix(math.MaxInt64/1000-62135596801, 999999999) maxTime = time.Unix(math.MaxInt64/1000-62135596801, 999999999)
) )
func (api *API) series(r *http.Request) (interface{}, *apiError, func()) { func (api *API) series(r *http.Request) apiFuncResult {
if err := r.ParseForm(); err != nil { if err := r.ParseForm(); err != nil {
return nil, &apiError{errorBadData, fmt.Errorf("error parsing form values: %v", err)}, nil return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("error parsing form values: %v", err)}, nil, nil}
} }
if len(r.Form["match[]"]) == 0 { if len(r.Form["match[]"]) == 0 {
return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")}, nil return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")}, nil, nil}
} }
var start time.Time var start time.Time
@ -445,7 +453,7 @@ func (api *API) series(r *http.Request) (interface{}, *apiError, func()) {
var err error var err error
start, err = parseTime(t) start, err = parseTime(t)
if err != nil { if err != nil {
return nil, &apiError{errorBadData, err}, nil return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
} }
} else { } else {
start = minTime start = minTime
@ -456,7 +464,7 @@ func (api *API) series(r *http.Request) (interface{}, *apiError, func()) {
var err error var err error
end, err = parseTime(t) end, err = parseTime(t)
if err != nil { if err != nil {
return nil, &apiError{errorBadData, err}, nil return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
} }
} else { } else {
end = maxTime end = maxTime
@ -466,40 +474,42 @@ func (api *API) series(r *http.Request) (interface{}, *apiError, func()) {
for _, s := range r.Form["match[]"] { for _, s := range r.Form["match[]"] {
matchers, err := promql.ParseMetricSelector(s) matchers, err := promql.ParseMetricSelector(s)
if err != nil { if err != nil {
return nil, &apiError{errorBadData, err}, nil return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
} }
matcherSets = append(matcherSets, matchers) matcherSets = append(matcherSets, matchers)
} }
q, err := api.Queryable.Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) q, err := api.Queryable.Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil { if err != nil {
return nil, &apiError{errorExec, err}, nil return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil}
} }
defer q.Close() defer q.Close()
var sets []storage.SeriesSet var sets []storage.SeriesSet
var warnings storage.Warnings
for _, mset := range matcherSets { for _, mset := range matcherSets {
s, err := q.Select(nil, mset...) s, err, wrn := q.Select(nil, mset...) //TODO
warnings = append(warnings, wrn...)
if err != nil { if err != nil {
return nil, &apiError{errorExec, err}, nil return apiFuncResult{nil, &apiError{errorExec, err}, warnings, nil}
} }
sets = append(sets, s) sets = append(sets, s)
} }
set := storage.NewMergeSeriesSet(sets) set := storage.NewMergeSeriesSet(sets, nil)
metrics := []labels.Labels{} metrics := []labels.Labels{}
for set.Next() { for set.Next() {
metrics = append(metrics, set.At().Labels()) metrics = append(metrics, set.At().Labels())
} }
if set.Err() != nil { if set.Err() != nil {
return nil, &apiError{errorExec, set.Err()}, nil return apiFuncResult{nil, &apiError{errorExec, set.Err()}, warnings, nil}
} }
return metrics, nil, nil return apiFuncResult{metrics, nil, warnings, nil}
} }
func (api *API) dropSeries(r *http.Request) (interface{}, *apiError, func()) { func (api *API) dropSeries(r *http.Request) apiFuncResult {
return nil, &apiError{errorInternal, fmt.Errorf("not implemented")}, nil return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("not implemented")}, nil, nil}
} }
// Target has the information for one target. // Target has the information for one target.
@ -528,7 +538,7 @@ type TargetDiscovery struct {
DroppedTargets []*DroppedTarget `json:"droppedTargets"` DroppedTargets []*DroppedTarget `json:"droppedTargets"`
} }
func (api *API) targets(r *http.Request) (interface{}, *apiError, func()) { func (api *API) targets(r *http.Request) apiFuncResult {
flatten := func(targets map[string][]*scrape.Target) []*scrape.Target { flatten := func(targets map[string][]*scrape.Target) []*scrape.Target {
var n int var n int
keys := make([]string, 0, len(targets)) keys := make([]string, 0, len(targets))
@ -570,7 +580,7 @@ func (api *API) targets(r *http.Request) (interface{}, *apiError, func()) {
DiscoveredLabels: t.DiscoveredLabels().Map(), DiscoveredLabels: t.DiscoveredLabels().Map(),
}) })
} }
return res, nil, nil return apiFuncResult{res, nil, nil, nil}
} }
func matchLabels(lset labels.Labels, matchers []*labels.Matcher) bool { func matchLabels(lset labels.Labels, matchers []*labels.Matcher) bool {
@ -582,18 +592,18 @@ func matchLabels(lset labels.Labels, matchers []*labels.Matcher) bool {
return true return true
} }
func (api *API) targetMetadata(r *http.Request) (interface{}, *apiError, func()) { func (api *API) targetMetadata(r *http.Request) apiFuncResult {
limit := -1 limit := -1
if s := r.FormValue("limit"); s != "" { if s := r.FormValue("limit"); s != "" {
var err error var err error
if limit, err = strconv.Atoi(s); err != nil { if limit, err = strconv.Atoi(s); err != nil {
return nil, &apiError{errorBadData, fmt.Errorf("limit must be a number")}, nil return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("limit must be a number")}, nil, nil}
} }
} }
matchers, err := promql.ParseMetricSelector(r.FormValue("match_target")) matchers, err := promql.ParseMetricSelector(r.FormValue("match_target"))
if err != nil { if err != nil {
return nil, &apiError{errorBadData, err}, nil return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
} }
metric := r.FormValue("metric") metric := r.FormValue("metric")
@ -633,9 +643,9 @@ func (api *API) targetMetadata(r *http.Request) (interface{}, *apiError, func())
} }
} }
if len(res) == 0 { if len(res) == 0 {
return nil, &apiError{errorNotFound, errors.New("specified metadata not found")}, nil return apiFuncResult{nil, &apiError{errorNotFound, errors.New("specified metadata not found")}, nil, nil}
} }
return res, nil, nil return apiFuncResult{res, nil, nil, nil}
} }
type metricMetadata struct { type metricMetadata struct {
@ -657,7 +667,7 @@ type AlertmanagerTarget struct {
URL string `json:"url"` URL string `json:"url"`
} }
func (api *API) alertmanagers(r *http.Request) (interface{}, *apiError, func()) { func (api *API) alertmanagers(r *http.Request) apiFuncResult {
urls := api.alertmanagerRetriever.Alertmanagers() urls := api.alertmanagerRetriever.Alertmanagers()
droppedURLS := api.alertmanagerRetriever.DroppedAlertmanagers() droppedURLS := api.alertmanagerRetriever.DroppedAlertmanagers()
ams := &AlertmanagerDiscovery{ActiveAlertmanagers: make([]*AlertmanagerTarget, len(urls)), DroppedAlertmanagers: make([]*AlertmanagerTarget, len(droppedURLS))} ams := &AlertmanagerDiscovery{ActiveAlertmanagers: make([]*AlertmanagerTarget, len(urls)), DroppedAlertmanagers: make([]*AlertmanagerTarget, len(droppedURLS))}
@ -667,7 +677,7 @@ func (api *API) alertmanagers(r *http.Request) (interface{}, *apiError, func())
for i, url := range droppedURLS { for i, url := range droppedURLS {
ams.DroppedAlertmanagers[i] = &AlertmanagerTarget{URL: url.String()} ams.DroppedAlertmanagers[i] = &AlertmanagerTarget{URL: url.String()}
} }
return ams, nil, nil return apiFuncResult{ams, nil, nil, nil}
} }
// AlertDiscovery has info for all active alerts. // AlertDiscovery has info for all active alerts.
@ -684,7 +694,7 @@ type Alert struct {
Value float64 `json:"value"` Value float64 `json:"value"`
} }
func (api *API) alerts(r *http.Request) (interface{}, *apiError, func()) { func (api *API) alerts(r *http.Request) apiFuncResult {
alertingRules := api.rulesRetriever.AlertingRules() alertingRules := api.rulesRetriever.AlertingRules()
alerts := []*Alert{} alerts := []*Alert{}
@ -697,7 +707,7 @@ func (api *API) alerts(r *http.Request) (interface{}, *apiError, func()) {
res := &AlertDiscovery{Alerts: alerts} res := &AlertDiscovery{Alerts: alerts}
return res, nil, nil return apiFuncResult{res, nil, nil, nil}
} }
func rulesAlertsToAPIAlerts(rulesAlerts []*rules.Alert) []*Alert { func rulesAlertsToAPIAlerts(rulesAlerts []*rules.Alert) []*Alert {
@ -756,7 +766,7 @@ type recordingRule struct {
Type string `json:"type"` Type string `json:"type"`
} }
func (api *API) rules(r *http.Request) (interface{}, *apiError, func()) { func (api *API) rules(r *http.Request) apiFuncResult {
ruleGroups := api.rulesRetriever.RuleGroups() ruleGroups := api.rulesRetriever.RuleGroups()
res := &RuleDiscovery{RuleGroups: make([]*RuleGroup, len(ruleGroups))} res := &RuleDiscovery{RuleGroups: make([]*RuleGroup, len(ruleGroups))}
for i, grp := range ruleGroups { for i, grp := range ruleGroups {
@ -799,29 +809,29 @@ func (api *API) rules(r *http.Request) (interface{}, *apiError, func()) {
} }
default: default:
err := fmt.Errorf("failed to assert type of rule '%v'", rule.Name()) err := fmt.Errorf("failed to assert type of rule '%v'", rule.Name())
return nil, &apiError{errorInternal, err}, nil return apiFuncResult{nil, &apiError{errorInternal, err}, nil, nil}
} }
apiRuleGroup.Rules = append(apiRuleGroup.Rules, enrichedRule) apiRuleGroup.Rules = append(apiRuleGroup.Rules, enrichedRule)
} }
res.RuleGroups[i] = apiRuleGroup res.RuleGroups[i] = apiRuleGroup
} }
return res, nil, nil return apiFuncResult{res, nil, nil, nil}
} }
type prometheusConfig struct { type prometheusConfig struct {
YAML string `json:"yaml"` YAML string `json:"yaml"`
} }
func (api *API) serveConfig(r *http.Request) (interface{}, *apiError, func()) { func (api *API) serveConfig(r *http.Request) apiFuncResult {
cfg := &prometheusConfig{ cfg := &prometheusConfig{
YAML: api.config().String(), YAML: api.config().String(),
} }
return cfg, nil, nil return apiFuncResult{cfg, nil, nil, nil}
} }
func (api *API) serveFlags(r *http.Request) (interface{}, *apiError, func()) { func (api *API) serveFlags(r *http.Request) apiFuncResult {
return api.flagsMap, nil, nil return apiFuncResult{api.flagsMap, nil, nil, nil}
} }
func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
@ -873,7 +883,7 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
} }
} }
set, err := querier.Select(selectParams, filteredMatchers...) set, err, _ := querier.Select(selectParams, filteredMatchers...)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
@ -911,20 +921,20 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
} }
} }
func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError, func()) { func (api *API) deleteSeries(r *http.Request) apiFuncResult {
if !api.enableAdmin { if !api.enableAdmin {
return nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")}, nil return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")}, nil, nil}
} }
db := api.db() db := api.db()
if db == nil { if db == nil {
return nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil, nil}
} }
if err := r.ParseForm(); err != nil { if err := r.ParseForm(); err != nil {
return nil, &apiError{errorBadData, fmt.Errorf("error parsing form values: %v", err)}, nil return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("error parsing form values: %v", err)}, nil, nil}
} }
if len(r.Form["match[]"]) == 0 { if len(r.Form["match[]"]) == 0 {
return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")}, nil return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")}, nil, nil}
} }
var start time.Time var start time.Time
@ -932,7 +942,7 @@ func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError, func()) {
var err error var err error
start, err = parseTime(t) start, err = parseTime(t)
if err != nil { if err != nil {
return nil, &apiError{errorBadData, err}, nil return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
} }
} else { } else {
start = minTime start = minTime
@ -943,7 +953,7 @@ func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError, func()) {
var err error var err error
end, err = parseTime(t) end, err = parseTime(t)
if err != nil { if err != nil {
return nil, &apiError{errorBadData, err}, nil return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
} }
} else { } else {
end = maxTime end = maxTime
@ -952,7 +962,7 @@ func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError, func()) {
for _, s := range r.Form["match[]"] { for _, s := range r.Form["match[]"] {
matchers, err := promql.ParseMetricSelector(s) matchers, err := promql.ParseMetricSelector(s)
if err != nil { if err != nil {
return nil, &apiError{errorBadData, err}, nil return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
} }
var selector tsdbLabels.Selector var selector tsdbLabels.Selector
@ -961,16 +971,16 @@ func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError, func()) {
} }
if err := db.Delete(timestamp.FromTime(start), timestamp.FromTime(end), selector...); err != nil { if err := db.Delete(timestamp.FromTime(start), timestamp.FromTime(end), selector...); err != nil {
return nil, &apiError{errorInternal, err}, nil return apiFuncResult{nil, &apiError{errorInternal, err}, nil, nil}
} }
} }
return nil, nil, nil return apiFuncResult{nil, nil, nil, nil}
} }
func (api *API) snapshot(r *http.Request) (interface{}, *apiError, func()) { func (api *API) snapshot(r *http.Request) apiFuncResult {
if !api.enableAdmin { if !api.enableAdmin {
return nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")}, nil return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")}, nil, nil}
} }
var ( var (
skipHead bool skipHead bool
@ -979,13 +989,13 @@ func (api *API) snapshot(r *http.Request) (interface{}, *apiError, func()) {
if r.FormValue("skip_head") != "" { if r.FormValue("skip_head") != "" {
skipHead, err = strconv.ParseBool(r.FormValue("skip_head")) skipHead, err = strconv.ParseBool(r.FormValue("skip_head"))
if err != nil { if err != nil {
return nil, &apiError{errorBadData, fmt.Errorf("unable to parse boolean 'skip_head' argument: %v", err)}, nil return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("unable to parse boolean 'skip_head' argument: %v", err)}, nil, nil}
} }
} }
db := api.db() db := api.db()
if db == nil { if db == nil {
return nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil, nil}
} }
var ( var (
@ -996,31 +1006,31 @@ func (api *API) snapshot(r *http.Request) (interface{}, *apiError, func()) {
dir = filepath.Join(snapdir, name) dir = filepath.Join(snapdir, name)
) )
if err := os.MkdirAll(dir, 0777); err != nil { if err := os.MkdirAll(dir, 0777); err != nil {
return nil, &apiError{errorInternal, fmt.Errorf("create snapshot directory: %s", err)}, nil return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("create snapshot directory: %s", err)}, nil, nil}
} }
if err := db.Snapshot(dir, !skipHead); err != nil { if err := db.Snapshot(dir, !skipHead); err != nil {
return nil, &apiError{errorInternal, fmt.Errorf("create snapshot: %s", err)}, nil return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("create snapshot: %s", err)}, nil, nil}
} }
return struct { return apiFuncResult{struct {
Name string `json:"name"` Name string `json:"name"`
}{name}, nil, nil }{name}, nil, nil, nil}
} }
func (api *API) cleanTombstones(r *http.Request) (interface{}, *apiError, func()) { func (api *API) cleanTombstones(r *http.Request) apiFuncResult {
if !api.enableAdmin { if !api.enableAdmin {
return nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")}, nil return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")}, nil, nil}
} }
db := api.db() db := api.db()
if db == nil { if db == nil {
return nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil, nil}
} }
if err := db.CleanTombstones(); err != nil { if err := db.CleanTombstones(); err != nil {
return nil, &apiError{errorInternal, err}, nil return apiFuncResult{nil, &apiError{errorInternal, err}, nil, nil}
} }
return nil, nil, nil return apiFuncResult{nil, nil, nil, nil}
} }
func convertMatcher(m *labels.Matcher) tsdbLabels.Matcher { func convertMatcher(m *labels.Matcher) tsdbLabels.Matcher {
@ -1075,11 +1085,17 @@ func mergeLabels(primary, secondary []*prompb.Label) []*prompb.Label {
return result return result
} }
func (api *API) respond(w http.ResponseWriter, data interface{}) { func (api *API) respond(w http.ResponseWriter, data interface{}, warnings storage.Warnings) {
statusMessage := statusSuccess
var warningStrings []string
for _, warning := range warnings {
warningStrings = append(warningStrings, warning.Error())
}
json := jsoniter.ConfigCompatibleWithStandardLibrary json := jsoniter.ConfigCompatibleWithStandardLibrary
b, err := json.Marshal(&response{ b, err := json.Marshal(&response{
Status: statusSuccess, Status: statusMessage,
Data: data, Data: data,
Warnings: warningStrings,
}) })
if err != nil { if err != nil {
level.Error(api.logger).Log("msg", "error marshaling json response", "err", err) level.Error(api.logger).Log("msg", "error marshaling json response", "err", err)
@ -1134,6 +1150,7 @@ func (api *API) respondError(w http.ResponseWriter, apiErr *apiError, data inter
func parseTime(s string) (time.Time, error) { func parseTime(s string) (time.Time, error) {
if t, err := strconv.ParseFloat(s, 64); err == nil { if t, err := strconv.ParseFloat(s, 64); err == nil {
s, ns := math.Modf(t) s, ns := math.Modf(t)
ns = math.Round(ns*1000) / 1000
return time.Unix(int64(s), int64(ns*float64(time.Second))), nil return time.Unix(int64(s), int64(ns*float64(time.Second))), nil
} }
if t, err := time.Parse(time.RFC3339Nano, s); err == nil { if t, err := time.Parse(time.RFC3339Nano, s); err == nil {

View file

@ -349,9 +349,9 @@ func TestLabelNames(t *testing.T) {
ctx := context.Background() ctx := context.Background()
req, err := request(method) req, err := request(method)
testutil.Ok(t, err) testutil.Ok(t, err)
resp, apiErr, _ := api.labelNames(req.WithContext(ctx)) res := api.labelNames(req.WithContext(ctx))
assertAPIError(t, apiErr, "") assertAPIError(t, res.err, "")
assertAPIResponse(t, resp, []string{"__name__", "baz", "foo", "foo1", "foo2", "xyz"}) assertAPIResponse(t, res.data, []string{"__name__", "baz", "foo", "foo1", "foo2", "xyz"})
} }
} }
@ -379,7 +379,7 @@ func setupRemote(s storage.Storage) *httptest.Server {
} }
defer querier.Close() defer querier.Close()
set, err := querier.Select(selectParams, matchers...) set, err, _ := querier.Select(selectParams, matchers...)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
@ -857,9 +857,9 @@ func testEndpoints(t *testing.T, api *API, testLabelAPI bool) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
resp, apiErr, _ := test.endpoint(req.WithContext(ctx)) res := test.endpoint(req.WithContext(ctx))
assertAPIError(t, apiErr, test.errType) assertAPIError(t, res.err, test.errType)
assertAPIResponse(t, resp, test.response) assertAPIResponse(t, res.data, test.response)
} }
} }
} }
@ -1202,8 +1202,8 @@ func TestAdminEndpoints(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("Error when creating test request: %s", err) t.Fatalf("Error when creating test request: %s", err)
} }
_, apiErr, _ := endpoint(req) res := endpoint(req)
assertAPIError(t, apiErr, tc.errType) assertAPIError(t, res.err, tc.errType)
}) })
} }
} }
@ -1211,7 +1211,7 @@ func TestAdminEndpoints(t *testing.T) {
func TestRespondSuccess(t *testing.T) { func TestRespondSuccess(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
api := API{} api := API{}
api.respond(w, "test") api.respond(w, "test", nil)
})) }))
defer s.Close() defer s.Close()
@ -1318,6 +1318,10 @@ func TestParseTime(t *testing.T) {
}, { }, {
input: "2015-06-03T14:21:58.555+01:00", input: "2015-06-03T14:21:58.555+01:00",
result: ts, result: ts,
}, {
// Test float rounding.
input: "1543578564.705",
result: time.Unix(1543578564, 705*1e6),
}, },
} }
@ -1502,7 +1506,7 @@ func TestRespond(t *testing.T) {
for _, c := range cases { for _, c := range cases {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
api := API{} api := API{}
api.respond(w, c.response) api.respond(w, c.response, nil)
})) }))
defer s.Close() defer s.Close()
@ -1543,6 +1547,6 @@ func BenchmarkRespond(b *testing.B) {
b.ResetTimer() b.ResetTimer()
api := API{} api := API{}
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
api.respond(&testResponseWriter, response) api.respond(&testResponseWriter, response, nil)
} }
} }

View file

@ -37,6 +37,10 @@ var (
Name: "prometheus_web_federation_errors_total", Name: "prometheus_web_federation_errors_total",
Help: "Total number of errors that occurred while sending federation responses.", Help: "Total number of errors that occurred while sending federation responses.",
}) })
federationWarnings = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_web_federation_warnings_total",
Help: "Total number of warnings that occurred while sending federation responses.",
})
) )
func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
@ -83,7 +87,11 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
var sets []storage.SeriesSet var sets []storage.SeriesSet
for _, mset := range matcherSets { for _, mset := range matcherSets {
s, err := q.Select(params, mset...) s, err, wrns := q.Select(params, mset...)
if wrns != nil {
level.Debug(h.logger).Log("msg", "federation select returned warnings", "warnings", wrns)
federationErrors.Add(float64(len(wrns)))
}
if err != nil { if err != nil {
federationErrors.Inc() federationErrors.Inc()
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
@ -92,7 +100,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
sets = append(sets, s) sets = append(sets, s)
} }
set := storage.NewMergeSeriesSet(sets) set := storage.NewMergeSeriesSet(sets, nil)
it := storage.NewBuffer(int64(promql.LookbackDelta / 1e6)) it := storage.NewBuffer(int64(promql.LookbackDelta / 1e6))
for set.Next() { for set.Next() {
s := set.At() s := set.At()

View file

@ -55,7 +55,6 @@ import (
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/notifier" "github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/scrape"
@ -191,6 +190,12 @@ type Options struct {
RemoteReadConcurrencyLimit int RemoteReadConcurrencyLimit int
} }
func instrumentHandlerWithPrefix(prefix string) func(handlerName string, handler http.HandlerFunc) http.HandlerFunc {
return func(handlerName string, handler http.HandlerFunc) http.HandlerFunc {
return instrumentHandler(prefix+handlerName, handler)
}
}
func instrumentHandler(handlerName string, handler http.HandlerFunc) http.HandlerFunc { func instrumentHandler(handlerName string, handler http.HandlerFunc) http.HandlerFunc {
return promhttp.InstrumentHandlerDuration( return promhttp.InstrumentHandlerDuration(
requestDuration.MustCurryWith(prometheus.Labels{"handler": handlerName}), requestDuration.MustCurryWith(prometheus.Labels{"handler": handlerName}),
@ -459,7 +464,7 @@ func (h *Handler) Run(ctx context.Context) error {
mux := http.NewServeMux() mux := http.NewServeMux()
mux.Handle("/", h.router) mux.Handle("/", h.router)
av1 := route.New().WithInstrumentation(instrumentHandler) av1 := route.New().WithInstrumentation(instrumentHandlerWithPrefix("/api/v1"))
h.apiV1.Register(av1) h.apiV1.Register(av1)
apiPath := "/api" apiPath := "/api"
if h.options.RoutePrefix != "/" { if h.options.RoutePrefix != "/" {
@ -703,7 +708,8 @@ func (h *Handler) targets(w http.ResponseWriter, r *http.Request) {
tps := h.scrapeManager.TargetsActive() tps := h.scrapeManager.TargetsActive()
for _, targets := range tps { for _, targets := range tps {
sort.Slice(targets, func(i, j int) bool { sort.Slice(targets, func(i, j int) bool {
return targets[i].Labels().Get(labels.InstanceName) < targets[j].Labels().Get(labels.InstanceName) return targets[i].Labels().Get(model.JobLabel) < targets[j].Labels().Get(model.JobLabel) ||
targets[i].Labels().Get(model.InstanceLabel) < targets[j].Labels().Get(model.InstanceLabel)
}) })
} }