diff --git a/.promu.yml b/.promu.yml index 68016677b5..5937513b56 100644 --- a/.promu.yml +++ b/.promu.yml @@ -12,11 +12,11 @@ build: path: ./cmd/promtool flags: -mod=vendor -a -tags netgo ldflags: | - -X {{repoPath}}/vendor/github.com/prometheus/common/version.Version={{.Version}} - -X {{repoPath}}/vendor/github.com/prometheus/common/version.Revision={{.Revision}} - -X {{repoPath}}/vendor/github.com/prometheus/common/version.Branch={{.Branch}} - -X {{repoPath}}/vendor/github.com/prometheus/common/version.BuildUser={{user}}@{{host}} - -X {{repoPath}}/vendor/github.com/prometheus/common/version.BuildDate={{date "20060102-15:04:05"}} + -X github.com/prometheus/common/version.Version={{.Version}} + -X github.com/prometheus/common/version.Revision={{.Revision}} + -X github.com/prometheus/common/version.Branch={{.Branch}} + -X github.com/prometheus/common/version.BuildUser={{user}}@{{host}} + -X github.com/prometheus/common/version.BuildDate={{date "20060102-15:04:05"}} tarball: files: - consoles diff --git a/MAINTAINERS.md b/MAINTAINERS.md index 48330b13e9..e00da702c0 100644 --- a/MAINTAINERS.md +++ b/MAINTAINERS.md @@ -1,6 +1,6 @@ Maintainers of this repository with their focus areas: * Brian Brazil @brian-brazil: Console templates; semantics of PromQL, service discovery, and relabeling. -* Fabian Reinartz @fabxc: PromQL parsing and evaluation; implementation of retrieval, alert notification, and service discovery. +* Fabian Reinartz @fabxc: PromQL parsing and evaluation; implementation of retrieval, alert notification, and service discovery. * Julius Volz @juliusv: Remote storage integrations; web UI. diff --git a/Makefile.common b/Makefile.common index 741579e60f..8e135c5545 100644 --- a/Makefile.common +++ b/Makefile.common @@ -175,9 +175,11 @@ common-docker-tag-latest: promu: $(PROMU) $(PROMU): - curl -s -L $(PROMU_URL) | tar -xvz -C /tmp - mkdir -v -p $(FIRST_GOPATH)/bin - cp -v /tmp/promu-$(PROMU_VERSION).$(GO_BUILD_PLATFORM)/promu $(PROMU) + $(eval PROMU_TMP := $(shell mktemp -d)) + curl -s -L $(PROMU_URL) | tar -xvzf - -C $(PROMU_TMP) + mkdir -p $(FIRST_GOPATH)/bin + cp $(PROMU_TMP)/promu-$(PROMU_VERSION).$(GO_BUILD_PLATFORM)/promu $(FIRST_GOPATH)/bin/promu + rm -r $(PROMU_TMP) .PHONY: proto proto: diff --git a/config/config.go b/config/config.go index 8ac49209fc..253fb2c38b 100644 --- a/config/config.go +++ b/config/config.go @@ -233,6 +233,9 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { // Do global overrides and validate unique names. jobNames := map[string]struct{}{} for _, scfg := range c.ScrapeConfigs { + if scfg == nil { + return fmt.Errorf("empty or null scrape config section") + } // First set the correct scrape interval, then check that the timeout // (inferred or explicit) is not greater than that. if scfg.ScrapeInterval == 0 { @@ -254,6 +257,16 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { } jobNames[scfg.JobName] = struct{}{} } + for _, rwcfg := range c.RemoteWriteConfigs { + if rwcfg == nil { + return fmt.Errorf("empty or null remote write config section") + } + } + for _, rrcfg := range c.RemoteReadConfigs { + if rrcfg == nil { + return fmt.Errorf("empty or null remote read config section") + } + } return nil } @@ -360,6 +373,13 @@ func (c *ScrapeConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { return err } + // The UnmarshalYAML method of ServiceDiscoveryConfig is not being called because it's not a pointer. + // We cannot make it a pointer as the parser panics for inlined pointer structs. 
+ // Thus we just do its validation here. + if err := c.ServiceDiscoveryConfig.Validate(); err != nil { + return err + } + // Check for users putting URLs in target groups. if len(c.RelabelConfigs) == 0 { for _, tg := range c.ServiceDiscoveryConfig.StaticConfigs { @@ -371,6 +391,17 @@ func (c *ScrapeConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { } } + for _, rlcfg := range c.RelabelConfigs { + if rlcfg == nil { + return fmt.Errorf("empty or null target relabeling rule in scrape config") + } + } + for _, rlcfg := range c.MetricRelabelConfigs { + if rlcfg == nil { + return fmt.Errorf("empty or null metric relabeling rule in scrape config") + } + } + // Add index to the static config target groups for unique identification // within scrape pool. for i, tg := range c.ServiceDiscoveryConfig.StaticConfigs { @@ -392,7 +423,16 @@ func (c *AlertingConfig) UnmarshalYAML(unmarshal func(interface{}) error) error // by the default due to the YAML parser behavior for empty blocks. *c = AlertingConfig{} type plain AlertingConfig - return unmarshal((*plain)(c)) + if err := unmarshal((*plain)(c)); err != nil { + return err + } + + for _, rlcfg := range c.AlertRelabelConfigs { + if rlcfg == nil { + return fmt.Errorf("empty or null alert relabeling rule") + } + } + return nil } // AlertmanagerConfig configures how Alertmanagers can be discovered and communicated with. @@ -429,6 +469,13 @@ func (c *AlertmanagerConfig) UnmarshalYAML(unmarshal func(interface{}) error) er return err } + // The UnmarshalYAML method of ServiceDiscoveryConfig is not being called because it's not a pointer. + // We cannot make it a pointer as the parser panics for inlined pointer structs. + // Thus we just do its validation here. + if err := c.ServiceDiscoveryConfig.Validate(); err != nil { + return err + } + // Check for users putting URLs in target groups. if len(c.RelabelConfigs) == 0 { for _, tg := range c.ServiceDiscoveryConfig.StaticConfigs { @@ -440,6 +487,12 @@ func (c *AlertmanagerConfig) UnmarshalYAML(unmarshal func(interface{}) error) er } } + for _, rlcfg := range c.RelabelConfigs { + if rlcfg == nil { + return fmt.Errorf("empty or null Alertmanager target relabeling rule") + } + } + // Add index to the static config target groups for unique identification // within scrape pool. for i, tg := range c.ServiceDiscoveryConfig.StaticConfigs { @@ -632,6 +685,11 @@ func (c *RemoteWriteConfig) UnmarshalYAML(unmarshal func(interface{}) error) err if c.URL == nil { return fmt.Errorf("url for remote_write is empty") } + for _, rlcfg := range c.WriteRelabelConfigs { + if rlcfg == nil { + return fmt.Errorf("empty or null relabeling rule in remote write config") + } + } // The UnmarshalYAML method of HTTPClientConfig is not being called because it's not a pointer. // We cannot make it a pointer as the parser panics for inlined pointer structs. 
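The nil checks added in config.go above all guard the same failure mode: a dangling "-" in a YAML list unmarshals to a null entry that previously slipped through and could panic later. A minimal sketch of how such a config now fails at load time, assuming the config.Load(string) helper present in this vintage of the codebase:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/config"
)

func main() {
	// A bare "-" under scrape_configs unmarshals to a nil *ScrapeConfig,
	// which the new validation rejects instead of passing it downstream.
	_, err := config.Load(`
scrape_configs:
-
`)
	fmt.Println(err) // expected: "empty or null scrape config section"
}
```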
diff --git a/config/config_test.go b/config/config_test.go index 63ed951259..e04fd56d25 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -751,6 +751,58 @@ var expectedErrors = []struct { filename: "section_key_dup.bad.yml", errMsg: "field scrape_configs already set in type config.plain", }, + { + filename: "azure_client_id_missing.bad.yml", + errMsg: "Azure SD configuration requires a client_id", + }, + { + filename: "azure_client_secret_missing.bad.yml", + errMsg: "Azure SD configuration requires a client_secret", + }, + { + filename: "azure_subscription_id_missing.bad.yml", + errMsg: "Azure SD configuration requires a subscription_id", + }, + { + filename: "azure_tenant_id_missing.bad.yml", + errMsg: "Azure SD configuration requires a tenant_id", + }, + { + filename: "empty_scrape_config.bad.yml", + errMsg: "empty or null scrape config section", + }, + { + filename: "empty_rw_config.bad.yml", + errMsg: "empty or null remote write config section", + }, + { + filename: "empty_rr_config.bad.yml", + errMsg: "empty or null remote read config section", + }, + { + filename: "empty_target_relabel_config.bad.yml", + errMsg: "empty or null target relabeling rule", + }, + { + filename: "empty_metric_relabel_config.bad.yml", + errMsg: "empty or null metric relabeling rule", + }, + { + filename: "empty_alert_relabel_config.bad.yml", + errMsg: "empty or null alert relabeling rule", + }, + { + filename: "empty_alertmanager_relabel_config.bad.yml", + errMsg: "empty or null Alertmanager target relabeling rule", + }, + { + filename: "empty_rw_relabel_config.bad.yml", + errMsg: "empty or null relabeling rule in remote write config", + }, + { + filename: "empty_static_config.bad.yml", + errMsg: "empty or null section in static_configs", + }, } func TestBadConfigs(t *testing.T) { diff --git a/config/testdata/azure_client_id_missing.bad.yml b/config/testdata/azure_client_id_missing.bad.yml new file mode 100644 index 0000000000..f8da2ff9c9 --- /dev/null +++ b/config/testdata/azure_client_id_missing.bad.yml @@ -0,0 +1,7 @@ +scrape_configs: + - job_name: azure + azure_sd_configs: + - subscription_id: 11AAAA11-A11A-111A-A111-1111A1111A11 + tenant_id: BBBB222B-B2B2-2B22-B222-2BB2222BB2B2 + client_id: + client_secret: mysecret \ No newline at end of file diff --git a/config/testdata/azure_client_secret_missing.bad.yml b/config/testdata/azure_client_secret_missing.bad.yml new file mode 100644 index 0000000000..1295c8ad57 --- /dev/null +++ b/config/testdata/azure_client_secret_missing.bad.yml @@ -0,0 +1,7 @@ +scrape_configs: + - job_name: azure + azure_sd_configs: + - subscription_id: 11AAAA11-A11A-111A-A111-1111A1111A11 + tenant_id: BBBB222B-B2B2-2B22-B222-2BB2222BB2B2 + client_id: 333333CC-3C33-3333-CCC3-33C3CCCCC33C + client_secret: \ No newline at end of file diff --git a/config/testdata/azure_subscription_id_missing.bad.yml b/config/testdata/azure_subscription_id_missing.bad.yml new file mode 100644 index 0000000000..9976138823 --- /dev/null +++ b/config/testdata/azure_subscription_id_missing.bad.yml @@ -0,0 +1,7 @@ +scrape_configs: + - job_name: azure + azure_sd_configs: + - subscription_id: + tenant_id: BBBB222B-B2B2-2B22-B222-2BB2222BB2B2 + client_id: 333333CC-3C33-3333-CCC3-33C3CCCCC33C + client_secret: mysecret \ No newline at end of file diff --git a/config/testdata/azure_tenant_id_missing.bad.yml b/config/testdata/azure_tenant_id_missing.bad.yml new file mode 100644 index 0000000000..ac714d9b52 --- /dev/null +++ b/config/testdata/azure_tenant_id_missing.bad.yml @@ -0,0 +1,7 @@ 
+scrape_configs: + - job_name: azure + azure_sd_configs: + - subscription_id: 11AAAA11-A11A-111A-A111-1111A1111A11 + tenant_id: + client_id: 333333CC-3C33-3333-CCC3-33C3CCCCC33C + client_secret: mysecret \ No newline at end of file diff --git a/config/testdata/empty_alert_relabel_config.bad.yml b/config/testdata/empty_alert_relabel_config.bad.yml new file mode 100644 index 0000000000..b863bf23a0 --- /dev/null +++ b/config/testdata/empty_alert_relabel_config.bad.yml @@ -0,0 +1,3 @@ +alerting: + alert_relabel_configs: + - diff --git a/config/testdata/empty_alertmanager_relabel_config.bad.yml b/config/testdata/empty_alertmanager_relabel_config.bad.yml new file mode 100644 index 0000000000..6d99ac4dc6 --- /dev/null +++ b/config/testdata/empty_alertmanager_relabel_config.bad.yml @@ -0,0 +1,4 @@ +alerting: + alertmanagers: + - relabel_configs: + - diff --git a/config/testdata/empty_metric_relabel_config.bad.yml b/config/testdata/empty_metric_relabel_config.bad.yml new file mode 100644 index 0000000000..d2485e3527 --- /dev/null +++ b/config/testdata/empty_metric_relabel_config.bad.yml @@ -0,0 +1,4 @@ +scrape_configs: +- job_name: "test" + metric_relabel_configs: + - diff --git a/config/testdata/empty_rr_config.bad.yml b/config/testdata/empty_rr_config.bad.yml new file mode 100644 index 0000000000..e3bcca598c --- /dev/null +++ b/config/testdata/empty_rr_config.bad.yml @@ -0,0 +1,2 @@ +remote_read: +- diff --git a/config/testdata/empty_rw_config.bad.yml b/config/testdata/empty_rw_config.bad.yml new file mode 100644 index 0000000000..6f16030e65 --- /dev/null +++ b/config/testdata/empty_rw_config.bad.yml @@ -0,0 +1,2 @@ +remote_write: +- diff --git a/config/testdata/empty_rw_relabel_config.bad.yml b/config/testdata/empty_rw_relabel_config.bad.yml new file mode 100644 index 0000000000..6d5418290c --- /dev/null +++ b/config/testdata/empty_rw_relabel_config.bad.yml @@ -0,0 +1,4 @@ +remote_write: + - url: "foo" + write_relabel_configs: + - \ No newline at end of file diff --git a/config/testdata/empty_scrape_config.bad.yml b/config/testdata/empty_scrape_config.bad.yml new file mode 100644 index 0000000000..8c300deaab --- /dev/null +++ b/config/testdata/empty_scrape_config.bad.yml @@ -0,0 +1,2 @@ +scrape_configs: +- \ No newline at end of file diff --git a/config/testdata/empty_static_config.bad.yml b/config/testdata/empty_static_config.bad.yml new file mode 100644 index 0000000000..464a0a6fbe --- /dev/null +++ b/config/testdata/empty_static_config.bad.yml @@ -0,0 +1,4 @@ +scrape_configs: +- job_name: "test" + static_configs: + - diff --git a/config/testdata/empty_target_relabel_config.bad.yml b/config/testdata/empty_target_relabel_config.bad.yml new file mode 100644 index 0000000000..7324b10411 --- /dev/null +++ b/config/testdata/empty_target_relabel_config.bad.yml @@ -0,0 +1,4 @@ +scrape_configs: +- job_name: "test" + relabel_configs: + - diff --git a/discovery/azure/azure.go b/discovery/azure/azure.go index 6a553af960..58a7ca8e00 100644 --- a/discovery/azure/azure.go +++ b/discovery/azure/azure.go @@ -47,6 +47,7 @@ const ( azureLabelMachinePrivateIP = azureLabel + "machine_private_ip" azureLabelMachineTag = azureLabel + "machine_tag_" azureLabelMachineScaleSet = azureLabel + "machine_scale_set" + azureLabelPowerState = azureLabel + "machine_power_state" ) var ( @@ -80,6 +81,13 @@ type SDConfig struct { RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"` } +func validateAuthParam(param, name string) error { + if len(param) == 0 { + return fmt.Errorf("Azure SD configuration requires a %s", 
name) + } + return nil +} + // UnmarshalYAML implements the yaml.Unmarshaler interface. func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { *c = DefaultSDConfig @@ -88,8 +96,17 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { if err != nil { return err } - if c.SubscriptionID == "" { - return fmt.Errorf("Azure SD configuration requires a subscription_id") + if err = validateAuthParam(c.SubscriptionID, "subscription_id"); err != nil { + return err + } + if err = validateAuthParam(c.TenantID, "tenant_id"); err != nil { + return err + } + if err = validateAuthParam(c.ClientID, "client_id"); err != nil { + return err + } + if err = validateAuthParam(string(c.ClientSecret), "client_secret"); err != nil { + return err } return nil } @@ -212,6 +229,7 @@ type virtualMachine struct { ScaleSet string Tags map[string]*string NetworkProfile compute.NetworkProfile + PowerStateCode string } // Create a new azureResource object from an ID string. @@ -286,12 +304,21 @@ func (d *Discovery) refresh() (tg *targetgroup.Group, err error) { return } + // We check if the virtual machine has been deallocated. + // If so, we skip them in service discovery. + if strings.EqualFold(vm.PowerStateCode, "PowerState/deallocated") { + level.Debug(d.logger).Log("msg", "Skipping virtual machine", "machine", vm.Name, "power_state", vm.PowerStateCode) + ch <- target{} + return + } + labels := model.LabelSet{ azureLabelMachineID: model.LabelValue(vm.ID), azureLabelMachineName: model.LabelValue(vm.Name), azureLabelMachineOSType: model.LabelValue(vm.OsType), azureLabelMachineLocation: model.LabelValue(vm.Location), azureLabelMachineResourceGroup: model.LabelValue(r.ResourceGroup), + azureLabelPowerState: model.LabelValue(vm.PowerStateCode), } if vm.ScaleSet != "" { @@ -319,16 +346,6 @@ func (d *Discovery) refresh() (tg *targetgroup.Group, err error) { continue } - // Unfortunately Azure does not return information on whether a VM is deallocated. - // This information is available via another API call however the Go SDK does not - // yet support this. On deallocated machines, this value happens to be nil so it - // is a cheap and easy way to determine if a machine is allocated or not. 
- if networkInterface.Properties.Primary == nil { - level.Debug(d.logger).Log("msg", "Skipping deallocated virtual machine", "machine", vm.Name) - ch <- target{} - return - } - if *networkInterface.Properties.Primary { for _, ip := range *networkInterface.Properties.IPConfigurations { if ip.Properties.PrivateIPAddress != nil { @@ -456,6 +473,7 @@ func mapFromVM(vm compute.VirtualMachine) virtualMachine { ScaleSet: "", Tags: tags, NetworkProfile: *(vm.Properties.NetworkProfile), + PowerStateCode: getPowerStateFromVMInstanceView(vm.Properties.InstanceView), } } @@ -476,6 +494,7 @@ func mapFromVMScaleSetVM(vm compute.VirtualMachineScaleSetVM, scaleSetName strin ScaleSet: scaleSetName, Tags: tags, NetworkProfile: *(vm.Properties.NetworkProfile), + PowerStateCode: getPowerStateFromVMInstanceView(vm.Properties.InstanceView), } } @@ -508,3 +527,16 @@ func (client *azureClient) getNetworkInterfaceByID(networkInterfaceID string) (n return result, nil } + +func getPowerStateFromVMInstanceView(instanceView *compute.VirtualMachineInstanceView) (powerState string) { + if instanceView.Statuses == nil { + return + } + for _, ivs := range *instanceView.Statuses { + code := *(ivs.Code) + if strings.HasPrefix(code, "PowerState") { + powerState = code + } + } + return +} diff --git a/discovery/azure/azure_test.go b/discovery/azure/azure_test.go index daed79ea6f..7dab352430 100644 --- a/discovery/azure/azure_test.go +++ b/discovery/azure/azure_test.go @@ -26,6 +26,10 @@ func TestMapFromVMWithEmptyTags(t *testing.T) { vmType := "type" location := "westeurope" networkProfile := compute.NetworkProfile{} + provisioningStatusCode := "ProvisioningState/succeeded" + provisionDisplayStatus := "Provisioning succeeded" + powerStatusCode := "PowerState/running" + powerDisplayStatus := "VM running" properties := &compute.VirtualMachineProperties{ StorageProfile: &compute.StorageProfile{ OsDisk: &compute.OSDisk{ @@ -33,6 +37,20 @@ func TestMapFromVMWithEmptyTags(t *testing.T) { }, }, NetworkProfile: &networkProfile, + InstanceView: &compute.VirtualMachineInstanceView{ + Statuses: &[]compute.InstanceViewStatus{ + { + Code: &provisioningStatusCode, + Level: "Info", + DisplayStatus: &provisionDisplayStatus, + }, + { + Code: &powerStatusCode, + Level: "Info", + DisplayStatus: &powerDisplayStatus, + }, + }, + }, } testVM := compute.VirtualMachine{ @@ -52,6 +70,7 @@ func TestMapFromVMWithEmptyTags(t *testing.T) { OsType: "Linux", Tags: map[string]*string{}, NetworkProfile: networkProfile, + PowerStateCode: "PowerState/running", } actualVM := mapFromVM(testVM) @@ -69,6 +88,10 @@ func TestMapFromVMWithTags(t *testing.T) { tags := map[string]*string{ "prometheus": new(string), } + provisioningStatusCode := "ProvisioningState/succeeded" + provisionDisplayStatus := "Provisioning succeeded" + powerStatusCode := "PowerState/running" + powerDisplayStatus := "VM running" networkProfile := compute.NetworkProfile{} properties := &compute.VirtualMachineProperties{ StorageProfile: &compute.StorageProfile{ @@ -77,6 +100,20 @@ func TestMapFromVMWithTags(t *testing.T) { }, }, NetworkProfile: &networkProfile, + InstanceView: &compute.VirtualMachineInstanceView{ + Statuses: &[]compute.InstanceViewStatus{ + { + Code: &provisioningStatusCode, + Level: "Info", + DisplayStatus: &provisionDisplayStatus, + }, + { + Code: &powerStatusCode, + Level: "Info", + DisplayStatus: &powerDisplayStatus, + }, + }, + }, } testVM := compute.VirtualMachine{ @@ -96,6 +133,7 @@ func TestMapFromVMWithTags(t *testing.T) { OsType: "Linux", Tags: tags, NetworkProfile: 
networkProfile, + PowerStateCode: "PowerState/running", } actualVM := mapFromVM(testVM) @@ -111,6 +149,10 @@ func TestMapFromVMScaleSetVMWithEmptyTags(t *testing.T) { vmType := "type" location := "westeurope" networkProfile := compute.NetworkProfile{} + provisioningStatusCode := "ProvisioningState/succeeded" + provisionDisplayStatus := "Provisioning succeeded" + powerStatusCode := "PowerState/running" + powerDisplayStatus := "VM running" properties := &compute.VirtualMachineScaleSetVMProperties{ StorageProfile: &compute.StorageProfile{ OsDisk: &compute.OSDisk{ @@ -118,6 +160,20 @@ func TestMapFromVMScaleSetVMWithEmptyTags(t *testing.T) { }, }, NetworkProfile: &networkProfile, + InstanceView: &compute.VirtualMachineInstanceView{ + Statuses: &[]compute.InstanceViewStatus{ + { + Code: &provisioningStatusCode, + Level: "Info", + DisplayStatus: &provisionDisplayStatus, + }, + { + Code: &powerStatusCode, + Level: "Info", + DisplayStatus: &powerDisplayStatus, + }, + }, + }, } testVM := compute.VirtualMachineScaleSetVM{ @@ -139,6 +195,7 @@ func TestMapFromVMScaleSetVMWithEmptyTags(t *testing.T) { Tags: map[string]*string{}, NetworkProfile: networkProfile, ScaleSet: scaleSet, + PowerStateCode: "PowerState/running", } actualVM := mapFromVMScaleSetVM(testVM, scaleSet) @@ -157,6 +214,10 @@ func TestMapFromVMScaleSetVMWithTags(t *testing.T) { "prometheus": new(string), } networkProfile := compute.NetworkProfile{} + provisioningStatusCode := "ProvisioningState/succeeded" + provisionDisplayStatus := "Provisioning succeeded" + powerStatusCode := "PowerState/running" + powerDisplayStatus := "VM running" properties := &compute.VirtualMachineScaleSetVMProperties{ StorageProfile: &compute.StorageProfile{ OsDisk: &compute.OSDisk{ @@ -164,6 +225,20 @@ func TestMapFromVMScaleSetVMWithTags(t *testing.T) { }, }, NetworkProfile: &networkProfile, + InstanceView: &compute.VirtualMachineInstanceView{ + Statuses: &[]compute.InstanceViewStatus{ + { + Code: &provisioningStatusCode, + Level: "Info", + DisplayStatus: &provisionDisplayStatus, + }, + { + Code: &powerStatusCode, + Level: "Info", + DisplayStatus: &powerDisplayStatus, + }, + }, + }, } testVM := compute.VirtualMachineScaleSetVM{ @@ -185,6 +260,7 @@ func TestMapFromVMScaleSetVMWithTags(t *testing.T) { Tags: tags, NetworkProfile: networkProfile, ScaleSet: scaleSet, + PowerStateCode: "PowerState/running", } actualVM := mapFromVMScaleSetVM(testVM, scaleSet) @@ -193,3 +269,52 @@ func TestMapFromVMScaleSetVMWithTags(t *testing.T) { t.Errorf("Expected %v got %v", expectedVM, actualVM) } } + +func TestGetPowerStatusFromVM(t *testing.T) { + provisioningStatusCode := "ProvisioningState/succeeded" + provisionDisplayStatus := "Provisioning succeeded" + powerStatusCode := "PowerState/running" + powerDisplayStatus := "VM running" + properties := &compute.VirtualMachineScaleSetVMProperties{ + StorageProfile: &compute.StorageProfile{ + OsDisk: &compute.OSDisk{ + OsType: "Linux", + }, + }, + InstanceView: &compute.VirtualMachineInstanceView{ + Statuses: &[]compute.InstanceViewStatus{ + { + Code: &provisioningStatusCode, + Level: "Info", + DisplayStatus: &provisionDisplayStatus, + }, + { + Code: &powerStatusCode, + Level: "Info", + DisplayStatus: &powerDisplayStatus, + }, + }, + }, + } + + testVM := compute.VirtualMachineScaleSetVM{ + Properties: properties, + } + + actual := getPowerStateFromVMInstanceView(testVM.Properties.InstanceView) + + expected := "PowerState/running" + + if actual != expected { + t.Errorf("expected powerStatus %s, but got %s instead", expected, actual) + 
} + + // Now we test a virtualMachine with an empty InstanceView struct. + testVM.Properties.InstanceView = &compute.VirtualMachineInstanceView{} + + actual = getPowerStateFromVMInstanceView(testVM.Properties.InstanceView) + + if actual != "" { + t.Errorf("expected an empty powerStatus, but got %s instead", actual) + } +} diff --git a/discovery/config/config.go b/discovery/config/config.go index 68a9bc1c5e..80cab513b0 100644 --- a/discovery/config/config.go +++ b/discovery/config/config.go @@ -14,6 +14,8 @@ package config import ( + "fmt" + "github.com/prometheus/prometheus/discovery/azure" "github.com/prometheus/prometheus/discovery/consul" "github.com/prometheus/prometheus/discovery/dns" @@ -58,8 +60,67 @@ type ServiceDiscoveryConfig struct { TritonSDConfigs []*triton.SDConfig `yaml:"triton_sd_configs,omitempty"` } -// UnmarshalYAML implements the yaml.Unmarshaler interface. -func (c *ServiceDiscoveryConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { - type plain ServiceDiscoveryConfig - return unmarshal((*plain)(c)) +// Validate validates the ServiceDiscoveryConfig. +func (c *ServiceDiscoveryConfig) Validate() error { + for _, cfg := range c.AzureSDConfigs { + if cfg == nil { + return fmt.Errorf("empty or null section in azure_sd_configs") + } + } + for _, cfg := range c.ConsulSDConfigs { + if cfg == nil { + return fmt.Errorf("empty or null section in consul_sd_configs") + } + } + for _, cfg := range c.DNSSDConfigs { + if cfg == nil { + return fmt.Errorf("empty or null section in dns_sd_configs") + } + } + for _, cfg := range c.EC2SDConfigs { + if cfg == nil { + return fmt.Errorf("empty or null section in ec2_sd_configs") + } + } + for _, cfg := range c.FileSDConfigs { + if cfg == nil { + return fmt.Errorf("empty or null section in file_sd_configs") + } + } + for _, cfg := range c.GCESDConfigs { + if cfg == nil { + return fmt.Errorf("empty or null section in gce_sd_configs") + } + } + for _, cfg := range c.KubernetesSDConfigs { + if cfg == nil { + return fmt.Errorf("empty or null section in kubernetes_sd_configs") + } + } + for _, cfg := range c.MarathonSDConfigs { + if cfg == nil { + return fmt.Errorf("empty or null section in marathon_sd_configs") + } + } + for _, cfg := range c.NerveSDConfigs { + if cfg == nil { + return fmt.Errorf("empty or null section in nerve_sd_configs") + } + } + for _, cfg := range c.OpenstackSDConfigs { + if cfg == nil { + return fmt.Errorf("empty or null section in openstack_sd_configs") + } + } + for _, cfg := range c.ServersetSDConfigs { + if cfg == nil { + return fmt.Errorf("empty or null section in serverset_sd_configs") + } + } + for _, cfg := range c.StaticConfigs { + if cfg == nil { + return fmt.Errorf("empty or null section in static_configs") + } + } + return nil } diff --git a/discovery/ec2/ec2.go b/discovery/ec2/ec2.go index ca3f767eb3..c4bc7f6748 100644 --- a/discovery/ec2/ec2.go +++ b/discovery/ec2/ec2.go @@ -46,6 +46,7 @@ const ( ec2LabelPlatform = ec2Label + "platform" ec2LabelPublicDNS = ec2Label + "public_dns_name" ec2LabelPublicIP = ec2Label + "public_ip" + ec2LabelPrivateDNS = ec2Label + "private_dns_name" ec2LabelPrivateIP = ec2Label + "private_ip" ec2LabelPrimarySubnetID = ec2Label + "primary_subnet_id" ec2LabelSubnetID = ec2Label + "subnet_id" @@ -250,6 +251,9 @@ func (d *Discovery) refresh() (tg *targetgroup.Group, err error) { } labels[ec2LabelPrivateIP] = model.LabelValue(*inst.PrivateIpAddress) + if inst.PrivateDnsName != nil { + labels[ec2LabelPrivateDNS] = model.LabelValue(*inst.PrivateDnsName) + } addr :=
net.JoinHostPort(*inst.PrivateIpAddress, fmt.Sprintf("%d", d.port)) labels[model.AddressLabel] = model.LabelValue(addr) diff --git a/discovery/manager.go b/discovery/manager.go index 6f5ad2d03d..00a6258282 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -314,11 +314,13 @@ func (m *Manager) allGroups() map[string][]*targetgroup.Group { } func (m *Manager) registerProviders(cfg sd_config.ServiceDiscoveryConfig, setName string) { + var added bool add := func(cfg interface{}, newDiscoverer func() (Discoverer, error)) { t := reflect.TypeOf(cfg).String() for _, p := range m.providers { if reflect.DeepEqual(cfg, p.config) { p.subs = append(p.subs, setName) + added = true return } } @@ -337,6 +339,7 @@ func (m *Manager) registerProviders(cfg sd_config.ServiceDiscoveryConfig, setNam subs: []string{setName}, } m.providers = append(m.providers, &provider) + added = true } for _, c := range cfg.DNSSDConfigs { @@ -401,7 +404,17 @@ func (m *Manager) registerProviders(cfg sd_config.ServiceDiscoveryConfig, setNam } if len(cfg.StaticConfigs) > 0 { add(setName, func() (Discoverer, error) { - return &StaticProvider{cfg.StaticConfigs}, nil + return &StaticProvider{TargetGroups: cfg.StaticConfigs}, nil + }) + } + if !added { + // Add an empty target group to force the refresh of the corresponding + // scrape pool and to notify the receiver that this target set has no + // current targets. + // It can happen because the combined set of SD configurations is empty + // or because we fail to instantiate all the SD configurations. + add(setName, func() (Discoverer, error) { + return &StaticProvider{TargetGroups: []*targetgroup.Group{&targetgroup.Group{}}}, nil }) } } diff --git a/discovery/manager_test.go b/discovery/manager_test.go index d58f1643fc..220c99b8ae 100644 --- a/discovery/manager_test.go +++ b/discovery/manager_test.go @@ -719,6 +719,7 @@ func assertEqualGroups(t *testing.T, got, expected []*targetgroup.Group, msg fun } func verifyPresence(t *testing.T, tSets map[poolKey]map[string]*targetgroup.Group, poolKey poolKey, label string, present bool) { + t.Helper() if _, ok := tSets[poolKey]; !ok { t.Fatalf("'%s' should be present in Pool keys: %v", poolKey, tSets) return @@ -741,7 +742,7 @@ func verifyPresence(t *testing.T, tSets map[poolKey]map[string]*targetgroup.Grou if !present { msg = "not" } - t.Fatalf("'%s' should %s be present in Targets labels: %v", label, msg, mergedTargets) + t.Fatalf("%q should %s be present in Targets labels: %q", label, msg, mergedTargets) } } @@ -781,7 +782,7 @@ scrape_configs: - targets: ["foo:9090"] ` if err := yaml.UnmarshalStrict([]byte(sTwo), cfg); err != nil { - t.Fatalf("Unable to load YAML config sOne: %s", err) + t.Fatalf("Unable to load YAML config sTwo: %s", err) } c = make(map[string]sd_config.ServiceDiscoveryConfig) for _, v := range cfg.ScrapeConfigs { @@ -794,6 +795,67 @@ scrape_configs: verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "string/0"}, "{__address__=\"bar:9090\"}", false) } +// TestTargetSetRecreatesEmptyStaticConfigs ensures that reloading a config file after +// removing all targets from the static_configs sends an update with empty targetGroups. +// This is required to signal the receiver that this target set has no current targets. 
+func TestTargetSetRecreatesEmptyStaticConfigs(t *testing.T) { + cfg := &config.Config{} + + sOne := ` +scrape_configs: + - job_name: 'prometheus' + static_configs: + - targets: ["foo:9090"] +` + if err := yaml.UnmarshalStrict([]byte(sOne), cfg); err != nil { + t.Fatalf("Unable to load YAML config sOne: %s", err) + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager.updatert = 100 * time.Millisecond + go discoveryManager.Run() + + c := make(map[string]sd_config.ServiceDiscoveryConfig) + for _, v := range cfg.ScrapeConfigs { + c[v.JobName] = v.ServiceDiscoveryConfig + } + discoveryManager.ApplyConfig(c) + + <-discoveryManager.SyncCh() + verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "string/0"}, "{__address__=\"foo:9090\"}", true) + + sTwo := ` +scrape_configs: + - job_name: 'prometheus' + static_configs: +` + if err := yaml.UnmarshalStrict([]byte(sTwo), cfg); err != nil { + t.Fatalf("Unable to load YAML config sTwo: %s", err) + } + c = make(map[string]sd_config.ServiceDiscoveryConfig) + for _, v := range cfg.ScrapeConfigs { + c[v.JobName] = v.ServiceDiscoveryConfig + } + discoveryManager.ApplyConfig(c) + + <-discoveryManager.SyncCh() + + pkey := poolKey{setName: "prometheus", provider: "string/0"} + targetGroups, ok := discoveryManager.targets[pkey] + if !ok { + t.Fatalf("'%v' should be present in target groups", pkey) + } + group, ok := targetGroups[""] + if !ok { + t.Fatalf("missing '' key in target groups %v", targetGroups) + } + + if len(group.Targets) != 0 { + t.Fatalf("Invalid number of targets: expected 0, got %d", len(group.Targets)) + } +} + func TestIdenticalConfigurationsAreCoalesced(t *testing.T) { tmpFile, err := ioutil.TempFile("", "sd") if err != nil { diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index 83f4002dda..c7b1eb5c41 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -263,10 +263,11 @@ The following meta labels are available on targets during relabeling: * `__meta_azure_machine_location`: the location the machine runs in * `__meta_azure_machine_name`: the machine name * `__meta_azure_machine_os_type`: the machine operating system +* `__meta_azure_machine_power_state`: the current power state of the machine * `__meta_azure_machine_private_ip`: the machine's private IP * `__meta_azure_machine_resource_group`: the machine's resource group -* `__meta_azure_machine_tag_`: each tag value of the machine * `__meta_azure_machine_scale_set`: the name of the scale set which the vm is part of (this value is only set if you are using a [scale set](https://docs.microsoft.com/en-us/azure/virtual-machine-scale-sets/)) +* `__meta_azure_machine_tag_`: each tag value of the machine See below for the configuration options for Azure discovery: @@ -407,6 +408,7 @@ The following meta labels are available on targets during [relabeling](#relabel_ * `__meta_ec2_owner_id`: the ID of the AWS account that owns the EC2 instance * `__meta_ec2_platform`: the Operating System platform, set to 'windows' on Windows servers, absent otherwise * `__meta_ec2_primary_subnet_id`: the subnet ID of the primary network interface, if available +* `__meta_ec2_private_dns_name`: the private DNS name of the instance, if available * `__meta_ec2_private_ip`: the private IP address of the instance, if present * `__meta_ec2_public_dns_name`: the public DNS name of the instance, if available * 
`__meta_ec2_public_ip`: the public IP address of the instance, if available diff --git a/docs/querying/api.md b/docs/querying/api.md index 82ce78699b..fdf93b6d0a 100644 --- a/docs/querying/api.md +++ b/docs/querying/api.md @@ -24,6 +24,10 @@ and one of the following HTTP response codes: Other non-`2xx` codes may be returned for errors occurring before the API endpoint is reached. +An array of warnings may be returned if there are errors that do +not inhibit the request execution. All of the data that was successfully +collected will be returned in the data field. + The JSON response envelope format is as follows: ``` @@ -34,7 +38,11 @@ The JSON response envelope format is as follows: // Only set if status is "error". The data field may still hold // additional data. "errorType": "", - "error": "" + "error": "", + + // Only if there were warnings while executing the request. + // There will still be data in the data field. + "warnings": [""] } ``` diff --git a/documentation/examples/custom-sd/adapter/adapter.go b/documentation/examples/custom-sd/adapter/adapter.go index 3d9e7afe13..4fd0ea9db4 100644 --- a/documentation/examples/custom-sd/adapter/adapter.go +++ b/documentation/examples/custom-sd/adapter/adapter.go @@ -25,6 +25,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/discovery/targetgroup" ) @@ -34,6 +35,15 @@ type customSD struct { Labels map[string]string `json:"labels"` } +func fingerprint(group *targetgroup.Group) model.Fingerprint { + groupFingerprint := model.LabelSet{}.Fingerprint() + for _, targets := range group.Targets { + groupFingerprint ^= targets.Fingerprint() + } + groupFingerprint ^= group.Labels.Fingerprint() + return groupFingerprint +} + // Adapter runs an unknown service discovery implementation and converts its target groups // to JSON and writes to a file for file_sd. type Adapter struct { @@ -57,13 +67,7 @@ func mapToArray(m map[string]*customSD) []customSD { func generateTargetGroups(allTargetGroups map[string][]*targetgroup.Group) map[string]*customSD { groups := make(map[string]*customSD) for k, sdTargetGroups := range allTargetGroups { - for i, group := range sdTargetGroups { - - // There is no target, so no need to keep it. - if len(group.Targets) <= 0 { - continue - } - + for _, group := range sdTargetGroups { newTargets := make([]string, 0) newLabels := make(map[string]string) @@ -76,12 +80,16 @@ func generateTargetGroups(allTargetGroups map[string][]*targetgroup.Group) map[s for name, value := range group.Labels { newLabels[string(name)] = string(value) } - // Make a unique key, including the current index, in case the sd_type (map key) and group.Source is not unique. - key := fmt.Sprintf("%s:%s:%d", k, group.Source, i) - groups[key] = &customSD{ + + sdGroup := customSD{ + Targets: newTargets, Labels: newLabels, } + // Make a unique key, including group's fingerprint, in case the sd_type (map key) and group.Source is not unique. 
+ groupFingerprint := fingerprint(group) + key := fmt.Sprintf("%s:%s:%s", k, group.Source, groupFingerprint.String()) + groups[key] = &sdGroup } } diff --git a/documentation/examples/custom-sd/adapter/adapter_test.go b/documentation/examples/custom-sd/adapter/adapter_test.go index 812da09d06..7a97ef1cf6 100644 --- a/documentation/examples/custom-sd/adapter/adapter_test.go +++ b/documentation/examples/custom-sd/adapter/adapter_test.go @@ -41,7 +41,16 @@ func TestGenerateTargetGroups(t *testing.T) { }, }, }, - expectedCustomSD: map[string]*customSD{}, + expectedCustomSD: map[string]*customSD{ + "customSD:Consul:0000000000000000": { + Targets: []string{}, + Labels: map[string]string{}, + }, + "customSD:Kubernetes:0000000000000000": { + Targets: []string{}, + Labels: map[string]string{}, + }, + }, }, { title: "targetGroup filled", @@ -78,7 +87,7 @@ func TestGenerateTargetGroups(t *testing.T) { }, }, expectedCustomSD: map[string]*customSD{ - "customSD:Azure:0": { + "customSD:Azure:282a007a18fadbbb": { Targets: []string{ "host1", "host2", @@ -87,7 +96,7 @@ func TestGenerateTargetGroups(t *testing.T) { "__meta_test_label": "label_test_1", }, }, - "customSD:Openshift:1": { + "customSD:Openshift:281c007a18ea2ad0": { Targets: []string{ "host3", "host4", @@ -125,7 +134,7 @@ func TestGenerateTargetGroups(t *testing.T) { }, }, expectedCustomSD: map[string]*customSD{ - "customSD:GCE:0": { + "customSD:GCE:282a007a18fadbbb": { Targets: []string{ "host1", "host2", @@ -134,6 +143,12 @@ func TestGenerateTargetGroups(t *testing.T) { "__meta_test_label": "label_test_1", }, }, + "customSD:Kubernetes:282e007a18fad483": { + Targets: []string{}, + Labels: map[string]string{ + "__meta_test_label": "label_test_2", + }, + }, }, }, } diff --git a/promql/ast.go b/promql/ast.go index e55989bbb9..3ec7c5e9bf 100644 --- a/promql/ast.go +++ b/promql/ast.go @@ -111,8 +111,9 @@ type MatrixSelector struct { Offset time.Duration LabelMatchers []*labels.Matcher - // The series are populated at query preparation time. - series []storage.Series + // The unexpanded seriesSet populated at query preparation time. + unexpandedSeriesSet storage.SeriesSet + series []storage.Series } // NumberLiteral represents a number. @@ -144,8 +145,9 @@ type VectorSelector struct { Offset time.Duration LabelMatchers []*labels.Matcher - // The series are populated at query preparation time. - series []storage.Series + // The unexpanded seriesSet populated at query preparation time. + unexpandedSeriesSet storage.SeriesSet + series []storage.Series } func (e *AggregateExpr) Type() ValueType { return ValueTypeVector } diff --git a/promql/engine.go b/promql/engine.go index 1d6620500b..7dc1cc4a6a 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -154,8 +154,8 @@ func (q *query) Exec(ctx context.Context) *Result { span.SetTag(queryTag, q.stmt.String()) } - res, err := q.ng.exec(ctx, q) - return &Result{Err: err, Value: res} + res, err, warnings := q.ng.exec(ctx, q) + return &Result{Err: err, Value: res, Warnings: warnings} } // contextDone returns an error if the context was canceled or timed out. @@ -332,7 +332,7 @@ func (ng *Engine) newTestQuery(f func(context.Context) error) Query { // // At this point per query only one EvalStmt is evaluated. Alert and record // statements are not handled by the Engine. 
-func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) { +func (ng *Engine) exec(ctx context.Context, q *query) (Value, error, storage.Warnings) { ng.metrics.currentQueries.Inc() defer ng.metrics.currentQueries.Dec() @@ -345,7 +345,7 @@ func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) { queueSpanTimer, _ := q.stats.GetSpanTimer(ctx, stats.ExecQueueTime, ng.metrics.queryQueueTime) if err := ng.gate.Start(ctx); err != nil { - return nil, contextErr(err, "query queue") + return nil, contextErr(err, "query queue"), nil } defer ng.gate.Done() @@ -361,14 +361,14 @@ func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) { // The base context might already be canceled on the first iteration (e.g. during shutdown). if err := contextDone(ctx, env); err != nil { - return nil, err + return nil, err, nil } switch s := q.Statement().(type) { case *EvalStmt: return ng.execEvalStmt(ctx, q, s) case testStmt: - return nil, s(ctx) + return nil, s(ctx), nil } panic(fmt.Errorf("promql.Engine.exec: unhandled statement of type %T", q.Statement())) @@ -383,9 +383,9 @@ func durationMilliseconds(d time.Duration) int64 { } // execEvalStmt evaluates the expression of an evaluation statement for the given time range. -func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (Value, error) { +func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (Value, error, storage.Warnings) { prepareSpanTimer, ctxPrepare := query.stats.GetSpanTimer(ctx, stats.QueryPreparationTime, ng.metrics.queryPrepareTime) - querier, err := ng.populateSeries(ctxPrepare, query.queryable, s) + querier, err, warnings := ng.populateSeries(ctxPrepare, query.queryable, s) prepareSpanTimer.Finish() // XXX(fabxc): the querier returned by populateSeries might be instantiated @@ -396,7 +396,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( } if err != nil { - return nil, err + return nil, err, warnings } evalSpanTimer, _ := query.stats.GetSpanTimer(ctx, stats.InnerEvalTime, ng.metrics.queryInnerEval) @@ -413,7 +413,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( } val, err := evaluator.Eval(s.Expr) if err != nil { - return nil, err + return nil, err, warnings } evalSpanTimer.Finish() @@ -432,11 +432,11 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( // timestamp as that is when we ran the evaluation. vector[i] = Sample{Metric: s.Metric, Point: Point{V: s.Points[0].V, T: start}} } - return vector, nil + return vector, nil, warnings case ValueTypeScalar: - return Scalar{V: mat[0].Points[0].V, T: start}, nil + return Scalar{V: mat[0].Points[0].V, T: start}, nil, warnings case ValueTypeMatrix: - return mat, nil + return mat, nil, warnings default: panic(fmt.Errorf("promql.Engine.exec: unexpected expression type %q", s.Expr.Type())) } @@ -454,7 +454,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( } val, err := evaluator.Eval(s.Expr) if err != nil { - return nil, err + return nil, err, warnings } evalSpanTimer.Finish() @@ -465,7 +465,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( query.matrix = mat if err := contextDone(ctx, "expression evaluation"); err != nil { - return nil, err + return nil, err, warnings } // TODO(fabxc): order ensured by storage? 
@@ -474,10 +474,10 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( sort.Sort(mat) sortSpanTimer.Finish() - return mat, nil + return mat, nil, warnings } -func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *EvalStmt) (storage.Querier, error) { +func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *EvalStmt) (storage.Querier, error, storage.Warnings) { var maxOffset time.Duration Inspect(s.Expr, func(node Node, _ []Node) error { switch n := node.(type) { @@ -503,11 +503,14 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev querier, err := q.Querier(ctx, timestamp.FromTime(mint), timestamp.FromTime(s.End)) if err != nil { - return nil, err + return nil, err, nil } + var warnings storage.Warnings + Inspect(s.Expr, func(node Node, path []Node) error { var set storage.SeriesSet + var wrn storage.Warnings params := &storage.SelectParams{ Start: timestamp.FromTime(s.Start), End: timestamp.FromTime(s.End), @@ -524,17 +527,13 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev params.End = params.End - offsetMilliseconds } - set, err = querier.Select(params, n.LabelMatchers...) + set, err, wrn = querier.Select(params, n.LabelMatchers...) + warnings = append(warnings, wrn...) if err != nil { level.Error(ng.logger).Log("msg", "error selecting series set", "err", err) return err } - n.series, err = expandSeriesSet(ctx, set) - if err != nil { - // TODO(fabxc): use multi-error. - level.Error(ng.logger).Log("msg", "error expanding series set", "err", err) - return err - } + n.unexpandedSeriesSet = set case *MatrixSelector: params.Func = extractFuncFromPath(path) @@ -547,20 +546,17 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev params.End = params.End - offsetMilliseconds } - set, err = querier.Select(params, n.LabelMatchers...) + set, err, wrn = querier.Select(params, n.LabelMatchers...) + warnings = append(warnings, wrn...) if err != nil { level.Error(ng.logger).Log("msg", "error selecting series set", "err", err) return err } - n.series, err = expandSeriesSet(ctx, set) - if err != nil { - level.Error(ng.logger).Log("msg", "error expanding series set", "err", err) - return err - } + n.unexpandedSeriesSet = set } return nil }) - return querier, err + return querier, err, warnings } // extractFuncFromPath walks up the path and searches for the first instance of @@ -582,6 +578,30 @@ func extractFuncFromPath(p []Node) string { return extractFuncFromPath(p[:len(p)-1]) } +func checkForSeriesSetExpansion(expr Expr, ctx context.Context) error { + switch e := expr.(type) { + case *MatrixSelector: + if e.series == nil { + series, err := expandSeriesSet(ctx, e.unexpandedSeriesSet) + if err != nil { + panic(err) + } else { + e.series = series + } + } + case *VectorSelector: + if e.series == nil { + series, err := expandSeriesSet(ctx, e.unexpandedSeriesSet) + if err != nil { + panic(err) + } else { + e.series = series + } + } + } + return nil +} + func expandSeriesSet(ctx context.Context, it storage.SeriesSet) (res []storage.Series, err error) { for it.Next() { select { @@ -887,6 +907,9 @@ func (ev *evaluator) eval(expr Expr) Value { } sel := e.Args[matrixArgIndex].(*MatrixSelector) + if err := checkForSeriesSetExpansion(sel, ev.ctx); err != nil { + ev.error(err) + } mat := make(Matrix, 0, len(sel.series)) // Output matrix. 
offset := durationMilliseconds(sel.Offset) selRange := durationMilliseconds(sel.Range) @@ -1018,6 +1041,9 @@ func (ev *evaluator) eval(expr Expr) Value { }) case *VectorSelector: + if err := checkForSeriesSetExpansion(e, ev.ctx); err != nil { + ev.error(err) + } mat := make(Matrix, 0, len(e.series)) it := storage.NewBuffer(durationMilliseconds(LookbackDelta)) for i, s := range e.series { @@ -1058,6 +1084,10 @@ func (ev *evaluator) eval(expr Expr) Value { // vectorSelector evaluates a *VectorSelector expression. func (ev *evaluator) vectorSelector(node *VectorSelector, ts int64) Vector { + if err := checkForSeriesSetExpansion(node, ev.ctx); err != nil { + ev.error(err) + } + var ( vec = make(Vector, 0, len(node.series)) ) @@ -1127,17 +1157,20 @@ func putPointSlice(p []Point) { // matrixSelector evaluates a *MatrixSelector expression. func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix { + if err := checkForSeriesSetExpansion(node, ev.ctx); err != nil { + ev.error(err) + } + var ( offset = durationMilliseconds(node.Offset) maxt = ev.startTimestamp - offset mint = maxt - durationMilliseconds(node.Range) matrix = make(Matrix, 0, len(node.series)) - err error ) it := storage.NewBuffer(durationMilliseconds(node.Range)) for i, s := range node.series { - if err = contextDone(ev.ctx, "expression evaluation"); err != nil { + if err := contextDone(ev.ctx, "expression evaluation"); err != nil { ev.error(err) } it.Reset(s.Iterator()) diff --git a/promql/engine_test.go b/promql/engine_test.go index 61cdba430b..da7fcb41b9 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -169,8 +169,8 @@ type errQuerier struct { err error } -func (q *errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, error) { - return errSeriesSet{err: q.err}, q.err +func (q *errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, error, storage.Warnings) { + return errSeriesSet{err: q.err}, q.err, nil } func (*errQuerier) LabelValues(name string) ([]string, error) { return nil, nil } func (*errQuerier) LabelNames() ([]string, error) { return nil, nil } @@ -425,7 +425,8 @@ load 10s MaxSamples: 1, Result: Result{ nil, - Scalar{V: 1, T: 1000}}, + Scalar{V: 1, T: 1000}, + nil}, Start: time.Unix(1, 0), }, { @@ -434,6 +435,7 @@ load 10s Result: Result{ ErrTooManySamples(env), nil, + nil, }, Start: time.Unix(1, 0), }, @@ -443,6 +445,7 @@ load 10s Result: Result{ ErrTooManySamples(env), nil, + nil, }, Start: time.Unix(1, 0), }, @@ -455,6 +458,7 @@ load 10s Sample{Point: Point{V: 1, T: 1000}, Metric: labels.FromStrings("__name__", "metric")}, }, + nil, }, Start: time.Unix(1, 0), }, @@ -467,6 +471,7 @@ load 10s Points: []Point{{V: 1, T: 0}, {V: 2, T: 10000}}, Metric: labels.FromStrings("__name__", "metric")}, }, + nil, }, Start: time.Unix(10, 0), }, @@ -476,6 +481,7 @@ load 10s Result: Result{ ErrTooManySamples(env), nil, + nil, }, Start: time.Unix(10, 0), }, @@ -489,6 +495,7 @@ load 10s Points: []Point{{V: 1, T: 0}, {V: 1, T: 1000}, {V: 1, T: 2000}}, Metric: labels.FromStrings()}, }, + nil, }, Start: time.Unix(0, 0), End: time.Unix(2, 0), @@ -500,6 +507,7 @@ load 10s Result: Result{ ErrTooManySamples(env), nil, + nil, }, Start: time.Unix(0, 0), End: time.Unix(2, 0), @@ -514,6 +522,7 @@ load 10s Points: []Point{{V: 1, T: 0}, {V: 1, T: 1000}, {V: 1, T: 2000}}, Metric: labels.FromStrings("__name__", "metric")}, }, + nil, }, Start: time.Unix(0, 0), End: time.Unix(2, 0), @@ -525,6 +534,7 @@ load 10s Result: Result{ ErrTooManySamples(env), nil, + nil, }, Start: 
time.Unix(0, 0), End: time.Unix(2, 0), @@ -539,6 +549,7 @@ load 10s Points: []Point{{V: 1, T: 0}, {V: 1, T: 5000}, {V: 2, T: 10000}}, Metric: labels.FromStrings("__name__", "metric")}, }, + nil, }, Start: time.Unix(0, 0), End: time.Unix(10, 0), @@ -550,6 +561,7 @@ load 10s Result: Result{ ErrTooManySamples(env), nil, + nil, }, Start: time.Unix(0, 0), End: time.Unix(10, 0), diff --git a/promql/value.go b/promql/value.go index e63c8ce37f..548673e903 100644 --- a/promql/value.go +++ b/promql/value.go @@ -20,6 +20,7 @@ import ( "strings" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" ) // Value is a generic interface for values resulting from a query evaluation. @@ -201,8 +202,9 @@ func (m Matrix) ContainsSameLabelset() bool { // Result holds the resulting value of an execution or an error // if any occurred. type Result struct { - Err error - Value Value + Err error + Value Value + Warnings storage.Warnings } // Vector returns a Vector if the result value is one. An error is returned if diff --git a/rules/manager.go b/rules/manager.go index d654b6d57e..8aee9c3573 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -490,6 +490,7 @@ func (g *Group) RestoreForState(ts time.Time) { level.Error(g.logger).Log("msg", "Failed to get Querier", "err", err) return } + defer q.Close() for _, rule := range g.Rules() { alertRule, ok := rule.(*AlertingRule) @@ -517,7 +518,7 @@ func (g *Group) RestoreForState(ts time.Time) { matchers = append(matchers, mt) } - sset, err := q.Select(nil, matchers...) + sset, err, _ := q.Select(nil, matchers...) if err != nil { level.Error(g.logger).Log("msg", "Failed to restore 'for' state", labels.AlertName, alertRule.Name(), "stage", "Select", "err", err) diff --git a/rules/manager_test.go b/rules/manager_test.go index b52cd8ff23..365a4f51fa 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -538,7 +538,7 @@ func TestStaleness(t *testing.T) { matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one") testutil.Ok(t, err) - set, err := querier.Select(nil, matcher) + set, err, _ := querier.Select(nil, matcher) testutil.Ok(t, err) samples, err := readSeriesSet(set) diff --git a/storage/fanout.go b/storage/fanout.go index af4fe1b327..1b464c4d5d 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -68,23 +68,23 @@ func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error) queriers := make([]Querier, 0, 1+len(f.secondaries)) // Add primary querier - querier, err := f.primary.Querier(ctx, mint, maxt) + primaryQuerier, err := f.primary.Querier(ctx, mint, maxt) if err != nil { return nil, err } - queriers = append(queriers, querier) + queriers = append(queriers, primaryQuerier) // Add secondary queriers for _, storage := range f.secondaries { querier, err := storage.Querier(ctx, mint, maxt) if err != nil { - NewMergeQuerier(queriers).Close() + NewMergeQuerier(primaryQuerier, queriers).Close() return nil, err } queriers = append(queriers, querier) } - return NewMergeQuerier(queriers), nil + return NewMergeQuerier(primaryQuerier, queriers), nil } func (f *fanout) Appender() (Appender, error) { @@ -190,14 +190,18 @@ func (f *fanoutAppender) Rollback() (err error) { // mergeQuerier implements Querier. type mergeQuerier struct { - queriers []Querier + primaryQuerier Querier + queriers []Querier + + failedQueriers map[Querier]struct{} + setQuerierMap map[SeriesSet]Querier } // NewMergeQuerier returns a new Querier that merges results of input queriers. 
// NB NewMergeQuerier will return NoopQuerier if no queriers are passed to it, // and will filter NoopQueriers from its arguments, in order to reduce overhead // when only one querier is passed. -func NewMergeQuerier(queriers []Querier) Querier { +func NewMergeQuerier(primaryQuerier Querier, queriers []Querier) Querier { filtered := make([]Querier, 0, len(queriers)) for _, querier := range queriers { if querier != NoopQuerier() { @@ -205,6 +209,9 @@ func NewMergeQuerier(queriers []Querier) Querier { } } + setQuerierMap := make(map[SeriesSet]Querier) + failedQueriers := make(map[Querier]struct{}) + switch len(filtered) { case 0: return NoopQuerier() @@ -212,22 +219,37 @@ func NewMergeQuerier(queriers []Querier) Querier { return filtered[0] default: return &mergeQuerier{ - queriers: filtered, + primaryQuerier: primaryQuerier, + queriers: filtered, + failedQueriers: failedQueriers, + setQuerierMap: setQuerierMap, } } } // Select returns a set of series that matches the given label matchers. -func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, error) { +func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, error, Warnings) { seriesSets := make([]SeriesSet, 0, len(q.queriers)) + var warnings Warnings for _, querier := range q.queriers { - set, err := querier.Select(params, matchers...) + set, err, wrn := querier.Select(params, matchers...) + q.setQuerierMap[set] = querier + if wrn != nil { + warnings = append(warnings, wrn...) + } if err != nil { - return nil, err + q.failedQueriers[querier] = struct{}{} + // If the error source isn't the primary querier, return the error as a warning and continue. + if querier != q.primaryQuerier { + warnings = append(warnings, err) + continue + } else { + return nil, err, nil + } } seriesSets = append(seriesSets, set) } - return NewMergeSeriesSet(seriesSets), nil + return NewMergeSeriesSet(seriesSets, q), nil, warnings } // LabelValues returns all potential values for a label name. @@ -243,6 +265,11 @@ func (q *mergeQuerier) LabelValues(name string) ([]string, error) { return mergeStringSlices(results), nil } +func (q *mergeQuerier) IsFailedSet(set SeriesSet) bool { + _, isFailedQuerier := q.failedQueriers[q.setQuerierMap[set]] + return isFailedQuerier +} + func mergeStringSlices(ss [][]string) []string { switch len(ss) { case 0: @@ -322,11 +349,13 @@ type mergeSeriesSet struct { currentSets []SeriesSet heap seriesSetHeap sets []SeriesSet + + querier *mergeQuerier } // NewMergeSeriesSet returns a new series set that merges (deduplicates) // series returned by the input series sets when iterating. -func NewMergeSeriesSet(sets []SeriesSet) SeriesSet { +func NewMergeSeriesSet(sets []SeriesSet, querier *mergeQuerier) SeriesSet { if len(sets) == 1 { return sets[0] } @@ -335,34 +364,53 @@ func NewMergeSeriesSet(sets []SeriesSet) SeriesSet { // series under the cursor. var h seriesSetHeap for _, set := range sets { + if set == nil { + continue + } if set.Next() { heap.Push(&h, set) } } return &mergeSeriesSet{ - heap: h, - sets: sets, + heap: h, + sets: sets, + querier: querier, } } func (c *mergeSeriesSet) Next() bool { - // Firstly advance all the current series sets. If any of them have run out - // we can drop them, otherwise they should be inserted back into the heap. - for _, set := range c.currentSets { - if set.Next() { - heap.Push(&c.heap, set) + // Run in a loop because the "next" series sets may not be valid anymore. 
+ // If a remote querier fails, we discard all series sets from that querier. + // If, for the current label set, all the next series sets come from + // failed remote storage sources, we want to keep trying with the next label set. + for { + // Firstly advance all the current series sets. If any of them have run out + // we can drop them, otherwise they should be inserted back into the heap. + for _, set := range c.currentSets { + if set.Next() { + heap.Push(&c.heap, set) + } + } + if len(c.heap) == 0 { + return false } - } - if len(c.heap) == 0 { - return false - } - // Now, pop items of the heap that have equal label sets. - c.currentSets = nil - c.currentLabels = c.heap[0].At().Labels() - for len(c.heap) > 0 && labels.Equal(c.currentLabels, c.heap[0].At().Labels()) { - set := heap.Pop(&c.heap).(SeriesSet) - c.currentSets = append(c.currentSets, set) + // Now, pop items of the heap that have equal label sets. + c.currentSets = nil + c.currentLabels = c.heap[0].At().Labels() + for len(c.heap) > 0 && labels.Equal(c.currentLabels, c.heap[0].At().Labels()) { + set := heap.Pop(&c.heap).(SeriesSet) + if c.querier != nil && c.querier.IsFailedSet(set) { + continue + } + c.currentSets = append(c.currentSets, set) + } + + // As long as the current set contains at least 1 set, + // then it should return true. + if len(c.currentSets) != 0 { + break + } } return true } diff --git a/storage/fanout_test.go b/storage/fanout_test.go index 3328673efb..cf637f5f18 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -109,7 +109,7 @@ func TestMergeSeriesSet(t *testing.T) { ), }, } { - merged := NewMergeSeriesSet(tc.input) + merged := NewMergeSeriesSet(tc.input, nil) for merged.Next() { require.True(t, tc.expected.Next()) actualSeries := merged.At() @@ -262,7 +262,7 @@ func makeMergeSeriesSet(numSeriesSets, numSeries, numSamples int) SeriesSet { for i := 0; i < numSeriesSets; i++ { seriesSets = append(seriesSets, makeSeriesSet(numSeries, numSamples)) } - return NewMergeSeriesSet(seriesSets) + return NewMergeSeriesSet(seriesSets, nil) } func benchmarkDrain(seriesSet SeriesSet, b *testing.B) { diff --git a/storage/interface.go b/storage/interface.go index 45f97d8ed7..94ced86890 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -52,7 +52,7 @@ type Queryable interface { // Querier provides reading access to time series data. type Querier interface { // Select returns a set of series that matches the given label matchers. - Select(*SelectParams, ...*labels.Matcher) (SeriesSet, error) + Select(*SelectParams, ...*labels.Matcher) (SeriesSet, error, Warnings) // LabelValues returns all potential values for a label name. LabelValues(name string) ([]string, error) @@ -122,3 +122,5 @@ type SeriesIterator interface { // Err returns the current error. 
Err() error } + +type Warnings []error diff --git a/storage/noop.go b/storage/noop.go index a0ead036e6..8ff69acc2b 100644 --- a/storage/noop.go +++ b/storage/noop.go @@ -26,8 +26,8 @@ func NoopQuerier() Querier { return noopQuerier{} } -func (noopQuerier) Select(*SelectParams, ...*labels.Matcher) (SeriesSet, error) { - return NoopSeriesSet(), nil +func (noopQuerier) Select(*SelectParams, ...*labels.Matcher) (SeriesSet, error, Warnings) { + return NoopSeriesSet(), nil, nil } func (noopQuerier) LabelValues(name string) ([]string, error) { diff --git a/storage/remote/read.go b/storage/remote/read.go index 7d9652f721..ead43def4a 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -59,10 +59,10 @@ type querier struct { // Select implements storage.Querier and uses the given matchers to read series // sets from the Client. -func (q *querier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error) { +func (q *querier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error, storage.Warnings) { query, err := ToQuery(q.mint, q.maxt, matchers, p) if err != nil { - return nil, err + return nil, err, nil } remoteReadGauge := remoteReadQueries.WithLabelValues(q.client.Name()) @@ -71,10 +71,10 @@ func (q *querier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) ( res, err := q.client.Read(q.ctx, query) if err != nil { - return nil, err + return nil, err, nil } - return FromQueryResult(res), nil + return FromQueryResult(res), nil, nil } // LabelValues implements storage.Querier and is a noop. @@ -117,13 +117,13 @@ type externalLabelsQuerier struct { // Select adds equality matchers for all external labels to the list of matchers // before calling the wrapped storage.Queryable. The added external labels are // removed from the returned series sets. -func (q externalLabelsQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error) { +func (q externalLabelsQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error, storage.Warnings) { m, added := q.addExternalLabels(matchers) - s, err := q.Querier.Select(p, m...) + s, err, warnings := q.Querier.Select(p, m...) if err != nil { - return nil, err + return nil, err, warnings } - return newSeriesSetFilter(s, added), nil + return newSeriesSetFilter(s, added), nil, warnings } // PreferLocalStorageFilter returns a QueryableFunc which creates a NoopQuerier @@ -170,7 +170,7 @@ type requiredMatchersQuerier struct { // Select returns a NoopSeriesSet if the given matchers don't match the label // set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier. -func (q requiredMatchersQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error) { +func (q requiredMatchersQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error, storage.Warnings) { ms := q.requiredMatchers for _, m := range matchers { for i, r := range ms { @@ -184,7 +184,7 @@ func (q requiredMatchersQuerier) Select(p *storage.SelectParams, matchers ...*la } } if len(ms) > 0 { - return storage.NoopSeriesSet(), nil + return storage.NoopSeriesSet(), nil, nil } return q.Querier.Select(p, matchers...) 
} @@ -225,6 +225,15 @@ func newSeriesSetFilter(ss storage.SeriesSet, toFilter model.LabelSet) storage.S type seriesSetFilter struct { storage.SeriesSet toFilter model.LabelSet + querier storage.Querier +} + +func (ssf *seriesSetFilter) GetQuerier() storage.Querier { + return ssf.querier +} + +func (ssf *seriesSetFilter) SetQuerier(querier storage.Querier) { + ssf.querier = querier } func (ssf seriesSetFilter) At() storage.Series { diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index 326cd8016c..00e2c7f09c 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -42,7 +42,7 @@ func TestExternalLabelsQuerierSelect(t *testing.T) { externalLabels: model.LabelSet{"region": "europe"}, } want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels) - have, err := q.Select(nil, matchers...) + have, err, _ := q.Select(nil, matchers...) if err != nil { t.Error(err) } @@ -157,8 +157,8 @@ type mockSeriesSet struct { storage.SeriesSet } -func (mockQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, error) { - return mockSeriesSet{}, nil +func (mockQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, error, storage.Warnings) { + return mockSeriesSet{}, nil, nil } func TestPreferLocalStorageFilter(t *testing.T) { @@ -313,7 +313,7 @@ func TestRequiredLabelsQuerierSelect(t *testing.T) { requiredMatchers: test.requiredMatchers, } - have, err := q.Select(nil, test.matchers...) + have, err, _ := q.Select(nil, test.matchers...) if err != nil { t.Error(err) } diff --git a/storage/remote/storage.go b/storage/remote/storage.go index 92d96c2840..a8a2a386cb 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -140,7 +140,7 @@ func (s *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querie } queriers = append(queriers, q) } - return storage.NewMergeQuerier(queriers), nil + return storage.NewMergeQuerier(nil, queriers), nil } // Close the background processing of the storage queues. diff --git a/storage/tsdb/tsdb.go b/storage/tsdb/tsdb.go index c0973342d8..4dccf3d89e 100644 --- a/storage/tsdb/tsdb.go +++ b/storage/tsdb/tsdb.go @@ -230,7 +230,7 @@ type querier struct { q tsdb.Querier } -func (q querier) Select(_ *storage.SelectParams, oms ...*labels.Matcher) (storage.SeriesSet, error) { +func (q querier) Select(_ *storage.SelectParams, oms ...*labels.Matcher) (storage.SeriesSet, error, storage.Warnings) { ms := make([]tsdbLabels.Matcher, 0, len(oms)) for _, om := range oms { @@ -238,9 +238,9 @@ func (q querier) Select(_ *storage.SelectParams, oms ...*labels.Matcher) (storag } set, err := q.q.Select(ms...) if err != nil { - return nil, err + return nil, err, nil } - return seriesSet{set: set}, nil + return seriesSet{set: set}, nil, nil } func (q querier) LabelValues(name string) ([]string, error) { return q.q.LabelValues(name) } diff --git a/web/api/v1/api.go b/web/api/v1/api.go index e02b62a7d7..0bd5096ff0 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -119,6 +119,14 @@ type response struct { Data interface{} `json:"data,omitempty"` ErrorType errorType `json:"errorType,omitempty"` Error string `json:"error,omitempty"` + Warnings []string `json:"warnings,omitempty"` +} + +type apiFuncResult struct { + data interface{} + err *apiError + warnings storage.Warnings + finalizer func() } // Enables cross-site script calls. 
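The storage-layer hunks above widen Querier.Select to return storage.Warnings alongside the series set and error. As a rough, hedged sketch (not part of this patch) of how a wrapper querier could satisfy the widened signature and attach a warning without failing the whole query; the wrapper type and its warning text are made up for illustration:

package example

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage"
)

// warnOnEmptyQuerier delegates to an underlying storage.Querier and adds a
// non-fatal warning when Select is called without any matchers.
// Purely illustrative; not part of this change.
type warnOnEmptyQuerier struct {
	storage.Querier
}

func (q warnOnEmptyQuerier) Select(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, error, storage.Warnings) {
	set, err, warnings := q.Querier.Select(p, ms...)
	if err != nil {
		// Hard failures still surface as errors, exactly as before.
		return nil, err, warnings
	}
	if len(ms) == 0 {
		// Soft issues travel back as warnings instead of aborting the query.
		warnings = append(warnings, fmt.Errorf("select called without matchers"))
	}
	return set, nil, warnings
}

This is the same pattern mergeQuerier relies on above: failures in secondary queriers are appended as warnings, while a failure of the primary querier is still returned as an error.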
@@ -128,7 +136,7 @@ func setCORS(w http.ResponseWriter) { } } -type apiFunc func(r *http.Request) (interface{}, *apiError, func()) +type apiFunc func(r *http.Request) apiFuncResult // TSDBAdmin defines the tsdb interfaces used by the v1 API for admin operations. type TSDBAdmin interface { @@ -204,16 +212,16 @@ func (api *API) Register(r *route.Router) { wrap := func(f apiFunc) http.HandlerFunc { hf := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { setCORS(w) - data, err, finalizer := f(r) - if err != nil { - api.respondError(w, err, data) - } else if data != nil { - api.respond(w, data) + result := f(r) + if result.err != nil { + api.respondError(w, result.err, result.data) + } else if result.data != nil { + api.respond(w, result.data, result.warnings) } else { w.WriteHeader(http.StatusNoContent) } - if finalizer != nil { - finalizer() + if result.finalizer != nil { + result.finalizer() } }) return api.ready(httputil.CompressionHandler{ @@ -258,17 +266,17 @@ type queryData struct { Stats *stats.QueryStats `json:"stats,omitempty"` } -func (api *API) options(r *http.Request) (interface{}, *apiError, func()) { - return nil, nil, nil +func (api *API) options(r *http.Request) apiFuncResult { + return apiFuncResult{nil, nil, nil, nil} } -func (api *API) query(r *http.Request) (interface{}, *apiError, func()) { +func (api *API) query(r *http.Request) apiFuncResult { var ts time.Time if t := r.FormValue("time"); t != "" { var err error ts, err = parseTime(t) if err != nil { - return nil, &apiError{errorBadData, err}, nil + return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } } else { ts = api.now() @@ -279,7 +287,7 @@ func (api *API) query(r *http.Request) (interface{}, *apiError, func()) { var cancel context.CancelFunc timeout, err := parseDuration(to) if err != nil { - return nil, &apiError{errorBadData, err}, nil + return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } ctx, cancel = context.WithTimeout(ctx, timeout) @@ -288,12 +296,12 @@ func (api *API) query(r *http.Request) (interface{}, *apiError, func()) { qry, err := api.QueryEngine.NewInstantQuery(api.Queryable, r.FormValue("query"), ts) if err != nil { - return nil, &apiError{errorBadData, err}, nil + return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } res := qry.Exec(ctx) if res.Err != nil { - return nil, returnAPIError(res.Err), qry.Close + return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close} } // Optional stats field in response if parameter "stats" is not empty. 
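With warnings threaded through apiFuncResult, a query that succeeds against local storage but hits a degraded remote read now reports the problem in the JSON body instead of failing outright. A minimal illustration of the payload shape (field names follow the response struct above; the status, data, and warning values are invented):

package main

import (
	"encoding/json"
	"fmt"
)

// apiResponse mirrors the subset of the v1 response struct relevant here.
type apiResponse struct {
	Status   string          `json:"status"`
	Data     json.RawMessage `json:"data,omitempty"`
	Warnings []string        `json:"warnings,omitempty"`
}

func main() {
	payload := []byte(`{
		"status": "success",
		"data": {"resultType": "vector", "result": []},
		"warnings": ["remote read: context deadline exceeded"]
	}`)

	var r apiResponse
	if err := json.Unmarshal(payload, &r); err != nil {
		panic(err)
	}
	fmt.Println(r.Status, r.Warnings)
	// Output: success [remote read: context deadline exceeded]
}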
@@ -302,42 +310,42 @@ func (api *API) query(r *http.Request) (interface{}, *apiError, func()) { qs = stats.NewQueryStats(qry.Stats()) } - return &queryData{ + return apiFuncResult{&queryData{ ResultType: res.Value.Type(), Result: res.Value, Stats: qs, - }, nil, qry.Close + }, nil, res.Warnings, qry.Close} } -func (api *API) queryRange(r *http.Request) (interface{}, *apiError, func()) { +func (api *API) queryRange(r *http.Request) apiFuncResult { start, err := parseTime(r.FormValue("start")) if err != nil { - return nil, &apiError{errorBadData, err}, nil + return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } end, err := parseTime(r.FormValue("end")) if err != nil { - return nil, &apiError{errorBadData, err}, nil + return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } if end.Before(start) { err := errors.New("end timestamp must not be before start time") - return nil, &apiError{errorBadData, err}, nil + return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } step, err := parseDuration(r.FormValue("step")) if err != nil { - return nil, &apiError{errorBadData, err}, nil + return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } if step <= 0 { err := errors.New("zero or negative query resolution step widths are not accepted. Try a positive integer") - return nil, &apiError{errorBadData, err}, nil + return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } // For safety, limit the number of returned points per timeseries. // This is sufficient for 60s resolution for a week or 1h resolution for a year. if end.Sub(start)/step > 11000 { err := errors.New("exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)") - return nil, &apiError{errorBadData, err}, nil + return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } ctx := r.Context() @@ -345,7 +353,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError, func()) { var cancel context.CancelFunc timeout, err := parseDuration(to) if err != nil { - return nil, &apiError{errorBadData, err}, nil + return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } ctx, cancel = context.WithTimeout(ctx, timeout) @@ -354,12 +362,12 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError, func()) { qry, err := api.QueryEngine.NewRangeQuery(api.Queryable, r.FormValue("query"), start, end, step) if err != nil { - return nil, &apiError{errorBadData, err}, nil + return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } res := qry.Exec(ctx) if res.Err != nil { - return nil, returnAPIError(res.Err), qry.Close + return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close} } // Optional stats field in response if parameter "stats" is not empty. 
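As a quick arithmetic check of the 11,000-point cap retained in this handler (illustrative numbers only): 11,000 samples at a 60s step cover a bit more than a week, and roughly 458 days at a 1h step, which matches the existing comment about 60s resolution for a week and 1h resolution for a year.

package main

import (
	"fmt"
	"time"
)

func main() {
	const maxPoints = 11000
	for _, step := range []time.Duration{60 * time.Second, time.Hour} {
		// Maximum queryable range before the handler rejects the request.
		fmt.Printf("step %v -> max range %v\n", step, maxPoints*step)
	}
}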
@@ -368,11 +376,11 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError, func()) { qs = stats.NewQueryStats(qry.Stats()) } - return &queryData{ + return apiFuncResult{&queryData{ ResultType: res.Value.Type(), Result: res.Value, Stats: qs, - }, nil, qry.Close + }, nil, res.Warnings, qry.Close} } func returnAPIError(err error) *apiError { @@ -392,39 +400,39 @@ func returnAPIError(err error) *apiError { return &apiError{errorExec, err} } -func (api *API) labelNames(r *http.Request) (interface{}, *apiError, func()) { +func (api *API) labelNames(r *http.Request) apiFuncResult { q, err := api.Queryable.Querier(r.Context(), math.MinInt64, math.MaxInt64) if err != nil { - return nil, &apiError{errorExec, err}, nil + return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil} } defer q.Close() names, err := q.LabelNames() if err != nil { - return nil, &apiError{errorExec, err}, nil + return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil} } - return names, nil, nil + return apiFuncResult{names, nil, nil, nil} } -func (api *API) labelValues(r *http.Request) (interface{}, *apiError, func()) { +func (api *API) labelValues(r *http.Request) apiFuncResult { ctx := r.Context() name := route.Param(ctx, "name") if !model.LabelNameRE.MatchString(name) { - return nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)}, nil + return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)}, nil, nil} } q, err := api.Queryable.Querier(ctx, math.MinInt64, math.MaxInt64) if err != nil { - return nil, &apiError{errorExec, err}, nil + return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil} } defer q.Close() vals, err := q.LabelValues(name) if err != nil { - return nil, &apiError{errorExec, err}, nil + return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil} } - return vals, nil, nil + return apiFuncResult{vals, nil, nil, nil} } var ( @@ -432,12 +440,12 @@ var ( maxTime = time.Unix(math.MaxInt64/1000-62135596801, 999999999) ) -func (api *API) series(r *http.Request) (interface{}, *apiError, func()) { +func (api *API) series(r *http.Request) apiFuncResult { if err := r.ParseForm(); err != nil { - return nil, &apiError{errorBadData, fmt.Errorf("error parsing form values: %v", err)}, nil + return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("error parsing form values: %v", err)}, nil, nil} } if len(r.Form["match[]"]) == 0 { - return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")}, nil + return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")}, nil, nil} } var start time.Time @@ -445,7 +453,7 @@ func (api *API) series(r *http.Request) (interface{}, *apiError, func()) { var err error start, err = parseTime(t) if err != nil { - return nil, &apiError{errorBadData, err}, nil + return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } } else { start = minTime @@ -456,7 +464,7 @@ func (api *API) series(r *http.Request) (interface{}, *apiError, func()) { var err error end, err = parseTime(t) if err != nil { - return nil, &apiError{errorBadData, err}, nil + return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } } else { end = maxTime @@ -466,40 +474,42 @@ func (api *API) series(r *http.Request) (interface{}, *apiError, func()) { for _, s := range r.Form["match[]"] { matchers, err := promql.ParseMetricSelector(s) if err != nil { - return nil, &apiError{errorBadData, err}, nil + return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } 
matcherSets = append(matcherSets, matchers) } q, err := api.Queryable.Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { - return nil, &apiError{errorExec, err}, nil + return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil} } defer q.Close() var sets []storage.SeriesSet + var warnings storage.Warnings for _, mset := range matcherSets { - s, err := q.Select(nil, mset...) + s, err, wrn := q.Select(nil, mset...) //TODO + warnings = append(warnings, wrn...) if err != nil { - return nil, &apiError{errorExec, err}, nil + return apiFuncResult{nil, &apiError{errorExec, err}, warnings, nil} } sets = append(sets, s) } - set := storage.NewMergeSeriesSet(sets) + set := storage.NewMergeSeriesSet(sets, nil) metrics := []labels.Labels{} for set.Next() { metrics = append(metrics, set.At().Labels()) } if set.Err() != nil { - return nil, &apiError{errorExec, set.Err()}, nil + return apiFuncResult{nil, &apiError{errorExec, set.Err()}, warnings, nil} } - return metrics, nil, nil + return apiFuncResult{metrics, nil, warnings, nil} } -func (api *API) dropSeries(r *http.Request) (interface{}, *apiError, func()) { - return nil, &apiError{errorInternal, fmt.Errorf("not implemented")}, nil +func (api *API) dropSeries(r *http.Request) apiFuncResult { + return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("not implemented")}, nil, nil} } // Target has the information for one target. @@ -528,7 +538,7 @@ type TargetDiscovery struct { DroppedTargets []*DroppedTarget `json:"droppedTargets"` } -func (api *API) targets(r *http.Request) (interface{}, *apiError, func()) { +func (api *API) targets(r *http.Request) apiFuncResult { flatten := func(targets map[string][]*scrape.Target) []*scrape.Target { var n int keys := make([]string, 0, len(targets)) @@ -570,7 +580,7 @@ func (api *API) targets(r *http.Request) (interface{}, *apiError, func()) { DiscoveredLabels: t.DiscoveredLabels().Map(), }) } - return res, nil, nil + return apiFuncResult{res, nil, nil, nil} } func matchLabels(lset labels.Labels, matchers []*labels.Matcher) bool { @@ -582,18 +592,18 @@ func matchLabels(lset labels.Labels, matchers []*labels.Matcher) bool { return true } -func (api *API) targetMetadata(r *http.Request) (interface{}, *apiError, func()) { +func (api *API) targetMetadata(r *http.Request) apiFuncResult { limit := -1 if s := r.FormValue("limit"); s != "" { var err error if limit, err = strconv.Atoi(s); err != nil { - return nil, &apiError{errorBadData, fmt.Errorf("limit must be a number")}, nil + return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("limit must be a number")}, nil, nil} } } matchers, err := promql.ParseMetricSelector(r.FormValue("match_target")) if err != nil { - return nil, &apiError{errorBadData, err}, nil + return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } metric := r.FormValue("metric") @@ -633,9 +643,9 @@ func (api *API) targetMetadata(r *http.Request) (interface{}, *apiError, func()) } } if len(res) == 0 { - return nil, &apiError{errorNotFound, errors.New("specified metadata not found")}, nil + return apiFuncResult{nil, &apiError{errorNotFound, errors.New("specified metadata not found")}, nil, nil} } - return res, nil, nil + return apiFuncResult{res, nil, nil, nil} } type metricMetadata struct { @@ -657,7 +667,7 @@ type AlertmanagerTarget struct { URL string `json:"url"` } -func (api *API) alertmanagers(r *http.Request) (interface{}, *apiError, func()) { +func (api *API) alertmanagers(r *http.Request) apiFuncResult { urls := 
api.alertmanagerRetriever.Alertmanagers() droppedURLS := api.alertmanagerRetriever.DroppedAlertmanagers() ams := &AlertmanagerDiscovery{ActiveAlertmanagers: make([]*AlertmanagerTarget, len(urls)), DroppedAlertmanagers: make([]*AlertmanagerTarget, len(droppedURLS))} @@ -667,7 +677,7 @@ func (api *API) alertmanagers(r *http.Request) (interface{}, *apiError, func()) for i, url := range droppedURLS { ams.DroppedAlertmanagers[i] = &AlertmanagerTarget{URL: url.String()} } - return ams, nil, nil + return apiFuncResult{ams, nil, nil, nil} } // AlertDiscovery has info for all active alerts. @@ -684,7 +694,7 @@ type Alert struct { Value float64 `json:"value"` } -func (api *API) alerts(r *http.Request) (interface{}, *apiError, func()) { +func (api *API) alerts(r *http.Request) apiFuncResult { alertingRules := api.rulesRetriever.AlertingRules() alerts := []*Alert{} @@ -697,7 +707,7 @@ func (api *API) alerts(r *http.Request) (interface{}, *apiError, func()) { res := &AlertDiscovery{Alerts: alerts} - return res, nil, nil + return apiFuncResult{res, nil, nil, nil} } func rulesAlertsToAPIAlerts(rulesAlerts []*rules.Alert) []*Alert { @@ -756,7 +766,7 @@ type recordingRule struct { Type string `json:"type"` } -func (api *API) rules(r *http.Request) (interface{}, *apiError, func()) { +func (api *API) rules(r *http.Request) apiFuncResult { ruleGroups := api.rulesRetriever.RuleGroups() res := &RuleDiscovery{RuleGroups: make([]*RuleGroup, len(ruleGroups))} for i, grp := range ruleGroups { @@ -799,29 +809,29 @@ func (api *API) rules(r *http.Request) (interface{}, *apiError, func()) { } default: err := fmt.Errorf("failed to assert type of rule '%v'", rule.Name()) - return nil, &apiError{errorInternal, err}, nil + return apiFuncResult{nil, &apiError{errorInternal, err}, nil, nil} } apiRuleGroup.Rules = append(apiRuleGroup.Rules, enrichedRule) } res.RuleGroups[i] = apiRuleGroup } - return res, nil, nil + return apiFuncResult{res, nil, nil, nil} } type prometheusConfig struct { YAML string `json:"yaml"` } -func (api *API) serveConfig(r *http.Request) (interface{}, *apiError, func()) { +func (api *API) serveConfig(r *http.Request) apiFuncResult { cfg := &prometheusConfig{ YAML: api.config().String(), } - return cfg, nil, nil + return apiFuncResult{cfg, nil, nil, nil} } -func (api *API) serveFlags(r *http.Request) (interface{}, *apiError, func()) { - return api.flagsMap, nil, nil +func (api *API) serveFlags(r *http.Request) apiFuncResult { + return apiFuncResult{api.flagsMap, nil, nil, nil} } func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { @@ -873,7 +883,7 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { } } - set, err := querier.Select(selectParams, filteredMatchers...) + set, err, _ := querier.Select(selectParams, filteredMatchers...) 
if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -911,20 +921,20 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { } } -func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError, func()) { +func (api *API) deleteSeries(r *http.Request) apiFuncResult { if !api.enableAdmin { - return nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")}, nil + return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")}, nil, nil} } db := api.db() if db == nil { - return nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil + return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil, nil} } if err := r.ParseForm(); err != nil { - return nil, &apiError{errorBadData, fmt.Errorf("error parsing form values: %v", err)}, nil + return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("error parsing form values: %v", err)}, nil, nil} } if len(r.Form["match[]"]) == 0 { - return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")}, nil + return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")}, nil, nil} } var start time.Time @@ -932,7 +942,7 @@ func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError, func()) { var err error start, err = parseTime(t) if err != nil { - return nil, &apiError{errorBadData, err}, nil + return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } } else { start = minTime @@ -943,7 +953,7 @@ func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError, func()) { var err error end, err = parseTime(t) if err != nil { - return nil, &apiError{errorBadData, err}, nil + return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } } else { end = maxTime @@ -952,7 +962,7 @@ func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError, func()) { for _, s := range r.Form["match[]"] { matchers, err := promql.ParseMetricSelector(s) if err != nil { - return nil, &apiError{errorBadData, err}, nil + return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } var selector tsdbLabels.Selector @@ -961,16 +971,16 @@ func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError, func()) { } if err := db.Delete(timestamp.FromTime(start), timestamp.FromTime(end), selector...); err != nil { - return nil, &apiError{errorInternal, err}, nil + return apiFuncResult{nil, &apiError{errorInternal, err}, nil, nil} } } - return nil, nil, nil + return apiFuncResult{nil, nil, nil, nil} } -func (api *API) snapshot(r *http.Request) (interface{}, *apiError, func()) { +func (api *API) snapshot(r *http.Request) apiFuncResult { if !api.enableAdmin { - return nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")}, nil + return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")}, nil, nil} } var ( skipHead bool @@ -979,13 +989,13 @@ func (api *API) snapshot(r *http.Request) (interface{}, *apiError, func()) { if r.FormValue("skip_head") != "" { skipHead, err = strconv.ParseBool(r.FormValue("skip_head")) if err != nil { - return nil, &apiError{errorBadData, fmt.Errorf("unable to parse boolean 'skip_head' argument: %v", err)}, nil + return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("unable to parse boolean 'skip_head' argument: %v", err)}, nil, nil} } } db := api.db() if db == nil { - return nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil + return apiFuncResult{nil, 
&apiError{errorUnavailable, errors.New("TSDB not ready")}, nil, nil} } var ( @@ -996,31 +1006,31 @@ func (api *API) snapshot(r *http.Request) (interface{}, *apiError, func()) { dir = filepath.Join(snapdir, name) ) if err := os.MkdirAll(dir, 0777); err != nil { - return nil, &apiError{errorInternal, fmt.Errorf("create snapshot directory: %s", err)}, nil + return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("create snapshot directory: %s", err)}, nil, nil} } if err := db.Snapshot(dir, !skipHead); err != nil { - return nil, &apiError{errorInternal, fmt.Errorf("create snapshot: %s", err)}, nil + return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("create snapshot: %s", err)}, nil, nil} } - return struct { + return apiFuncResult{struct { Name string `json:"name"` - }{name}, nil, nil + }{name}, nil, nil, nil} } -func (api *API) cleanTombstones(r *http.Request) (interface{}, *apiError, func()) { +func (api *API) cleanTombstones(r *http.Request) apiFuncResult { if !api.enableAdmin { - return nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")}, nil + return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")}, nil, nil} } db := api.db() if db == nil { - return nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil + return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil, nil} } if err := db.CleanTombstones(); err != nil { - return nil, &apiError{errorInternal, err}, nil + return apiFuncResult{nil, &apiError{errorInternal, err}, nil, nil} } - return nil, nil, nil + return apiFuncResult{nil, nil, nil, nil} } func convertMatcher(m *labels.Matcher) tsdbLabels.Matcher { @@ -1075,11 +1085,17 @@ func mergeLabels(primary, secondary []*prompb.Label) []*prompb.Label { return result } -func (api *API) respond(w http.ResponseWriter, data interface{}) { +func (api *API) respond(w http.ResponseWriter, data interface{}, warnings storage.Warnings) { + statusMessage := statusSuccess + var warningStrings []string + for _, warning := range warnings { + warningStrings = append(warningStrings, warning.Error()) + } json := jsoniter.ConfigCompatibleWithStandardLibrary b, err := json.Marshal(&response{ - Status: statusSuccess, - Data: data, + Status: statusMessage, + Data: data, + Warnings: warningStrings, }) if err != nil { level.Error(api.logger).Log("msg", "error marshaling json response", "err", err) @@ -1134,6 +1150,7 @@ func (api *API) respondError(w http.ResponseWriter, apiErr *apiError, data inter func parseTime(s string) (time.Time, error) { if t, err := strconv.ParseFloat(s, 64); err == nil { s, ns := math.Modf(t) + ns = math.Round(ns*1000) / 1000 return time.Unix(int64(s), int64(ns*float64(time.Second))), nil } if t, err := time.Parse(time.RFC3339Nano, s); err == nil { diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index f0b35db9ff..bdc6c2d64f 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -349,9 +349,9 @@ func TestLabelNames(t *testing.T) { ctx := context.Background() req, err := request(method) testutil.Ok(t, err) - resp, apiErr, _ := api.labelNames(req.WithContext(ctx)) - assertAPIError(t, apiErr, "") - assertAPIResponse(t, resp, []string{"__name__", "baz", "foo", "foo1", "foo2", "xyz"}) + res := api.labelNames(req.WithContext(ctx)) + assertAPIError(t, res.err, "") + assertAPIResponse(t, res.data, []string{"__name__", "baz", "foo", "foo1", "foo2", "xyz"}) } } @@ -379,7 +379,7 @@ func setupRemote(s storage.Storage) *httptest.Server { } defer querier.Close() - set, 
err := querier.Select(selectParams, matchers...) + set, err, _ := querier.Select(selectParams, matchers...) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -857,9 +857,9 @@ func testEndpoints(t *testing.T, api *API, testLabelAPI bool) { if err != nil { t.Fatal(err) } - resp, apiErr, _ := test.endpoint(req.WithContext(ctx)) - assertAPIError(t, apiErr, test.errType) - assertAPIResponse(t, resp, test.response) + res := test.endpoint(req.WithContext(ctx)) + assertAPIError(t, res.err, test.errType) + assertAPIResponse(t, res.data, test.response) } } } @@ -1202,8 +1202,8 @@ func TestAdminEndpoints(t *testing.T) { if err != nil { t.Fatalf("Error when creating test request: %s", err) } - _, apiErr, _ := endpoint(req) - assertAPIError(t, apiErr, tc.errType) + res := endpoint(req) + assertAPIError(t, res.err, tc.errType) }) } } @@ -1211,7 +1211,7 @@ func TestAdminEndpoints(t *testing.T) { func TestRespondSuccess(t *testing.T) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { api := API{} - api.respond(w, "test") + api.respond(w, "test", nil) })) defer s.Close() @@ -1318,6 +1318,10 @@ func TestParseTime(t *testing.T) { }, { input: "2015-06-03T14:21:58.555+01:00", result: ts, + }, { + // Test float rounding. + input: "1543578564.705", + result: time.Unix(1543578564, 705*1e6), }, } @@ -1502,7 +1506,7 @@ func TestRespond(t *testing.T) { for _, c := range cases { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { api := API{} - api.respond(w, c.response) + api.respond(w, c.response, nil) })) defer s.Close() @@ -1543,6 +1547,6 @@ func BenchmarkRespond(b *testing.B) { b.ResetTimer() api := API{} for n := 0; n < b.N; n++ { - api.respond(&testResponseWriter, response) + api.respond(&testResponseWriter, response, nil) } } diff --git a/web/federate.go b/web/federate.go index e34136317a..5dad7d4142 100644 --- a/web/federate.go +++ b/web/federate.go @@ -37,6 +37,10 @@ var ( Name: "prometheus_web_federation_errors_total", Help: "Total number of errors that occurred while sending federation responses.", }) + federationWarnings = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_web_federation_warnings_total", + Help: "Total number of warnings that occurred while sending federation responses.", + }) ) func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { @@ -83,7 +87,11 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { var sets []storage.SeriesSet for _, mset := range matcherSets { - s, err := q.Select(params, mset...) + s, err, wrns := q.Select(params, mset...) 
+ if wrns != nil { + level.Debug(h.logger).Log("msg", "federation select returned warnings", "warnings", wrns) + federationWarnings.Add(float64(len(wrns))) + } if err != nil { federationErrors.Inc() http.Error(w, err.Error(), http.StatusInternalServerError) @@ -92,7 +100,7 @@ sets = append(sets, s) } - set := storage.NewMergeSeriesSet(sets) + set := storage.NewMergeSeriesSet(sets, nil) it := storage.NewBuffer(int64(promql.LookbackDelta / 1e6)) for set.Next() { s := set.At() diff --git a/web/web.go b/web/web.go index f4336cfb22..8cc63a7f4e 100644 --- a/web/web.go +++ b/web/web.go @@ -55,7 +55,6 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/notifier" - "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/scrape" @@ -191,6 +190,12 @@ type Options struct { RemoteReadConcurrencyLimit int } +func instrumentHandlerWithPrefix(prefix string) func(handlerName string, handler http.HandlerFunc) http.HandlerFunc { + return func(handlerName string, handler http.HandlerFunc) http.HandlerFunc { + return instrumentHandler(prefix+handlerName, handler) + } +} + func instrumentHandler(handlerName string, handler http.HandlerFunc) http.HandlerFunc { return promhttp.InstrumentHandlerDuration( requestDuration.MustCurryWith(prometheus.Labels{"handler": handlerName}), @@ -459,7 +464,7 @@ func (h *Handler) Run(ctx context.Context) error { mux := http.NewServeMux() mux.Handle("/", h.router) - av1 := route.New().WithInstrumentation(instrumentHandler) + av1 := route.New().WithInstrumentation(instrumentHandlerWithPrefix("/api/v1")) h.apiV1.Register(av1) apiPath := "/api" if h.options.RoutePrefix != "/" { @@ -703,7 +708,8 @@ func (h *Handler) targets(w http.ResponseWriter, r *http.Request) { tps := h.scrapeManager.TargetsActive() for _, targets := range tps { sort.Slice(targets, func(i, j int) bool { - return targets[i].Labels().Get(labels.InstanceName) < targets[j].Labels().Get(labels.InstanceName) + return targets[i].Labels().Get(model.JobLabel) < targets[j].Labels().Get(model.JobLabel) || + (targets[i].Labels().Get(model.JobLabel) == targets[j].Labels().Get(model.JobLabel) && targets[i].Labels().Get(model.InstanceLabel) < targets[j].Labels().Get(model.InstanceLabel)) }) }
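For clarity on the targets-page ordering above: targets are meant to sort by job label first and by instance within the same job. A standalone sketch of that two-key comparison with made-up label values:

package main

import (
	"fmt"
	"sort"
)

type target struct{ job, instance string }

func main() {
	targets := []target{
		{job: "node", instance: "db-1:9100"},
		{job: "api", instance: "web-2:8080"},
		{job: "api", instance: "web-1:8080"},
	}
	// Order by job, then by instance within a job.
	sort.Slice(targets, func(i, j int) bool {
		if targets[i].job != targets[j].job {
			return targets[i].job < targets[j].job
		}
		return targets[i].instance < targets[j].instance
	})
	fmt.Println(targets)
	// [{api web-1:8080} {api web-2:8080} {node db-1:9100}]
}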