Merge branch 'prometheus:main' into makefile-check-runtime-versions-target

This commit is contained in:
Pratham Jagga 2024-10-13 18:41:21 +05:30 committed by GitHub
commit 0ccbacc303
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 990 additions and 1606 deletions

View file

@ -233,21 +233,21 @@ type azureClient struct {
var _ client = &azureClient{}
// createAzureClient is a helper function for creating an Azure compute client to ARM.
func createAzureClient(cfg SDConfig, logger *slog.Logger) (client, error) {
cloudConfiguration, err := CloudConfigurationFromName(cfg.Environment)
// createAzureClient is a helper method for creating an Azure compute client to ARM.
func (d *Discovery) createAzureClient() (client, error) {
cloudConfiguration, err := CloudConfigurationFromName(d.cfg.Environment)
if err != nil {
return &azureClient{}, err
}
var c azureClient
c.logger = logger
c.logger = d.logger
telemetry := policy.TelemetryOptions{
ApplicationID: userAgent,
}
credential, err := newCredential(cfg, policy.ClientOptions{
credential, err := newCredential(*d.cfg, policy.ClientOptions{
Cloud: cloudConfiguration,
Telemetry: telemetry,
})
@ -255,7 +255,7 @@ func createAzureClient(cfg SDConfig, logger *slog.Logger) (client, error) {
return &azureClient{}, err
}
client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, "azure_sd")
client, err := config_util.NewClientFromConfig(d.cfg.HTTPClientConfig, "azure_sd")
if err != nil {
return &azureClient{}, err
}
@ -267,22 +267,22 @@ func createAzureClient(cfg SDConfig, logger *slog.Logger) (client, error) {
},
}
c.vm, err = armcompute.NewVirtualMachinesClient(cfg.SubscriptionID, credential, options)
c.vm, err = armcompute.NewVirtualMachinesClient(d.cfg.SubscriptionID, credential, options)
if err != nil {
return &azureClient{}, err
}
c.nic, err = armnetwork.NewInterfacesClient(cfg.SubscriptionID, credential, options)
c.nic, err = armnetwork.NewInterfacesClient(d.cfg.SubscriptionID, credential, options)
if err != nil {
return &azureClient{}, err
}
c.vmss, err = armcompute.NewVirtualMachineScaleSetsClient(cfg.SubscriptionID, credential, options)
c.vmss, err = armcompute.NewVirtualMachineScaleSetsClient(d.cfg.SubscriptionID, credential, options)
if err != nil {
return &azureClient{}, err
}
c.vmssvm, err = armcompute.NewVirtualMachineScaleSetVMsClient(cfg.SubscriptionID, credential, options)
c.vmssvm, err = armcompute.NewVirtualMachineScaleSetVMsClient(d.cfg.SubscriptionID, credential, options)
if err != nil {
return &azureClient{}, err
}
@ -350,15 +350,7 @@ func newAzureResourceFromID(id string, logger *slog.Logger) (*arm.ResourceID, er
return resourceID, nil
}
func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
defer d.logger.Debug("Azure discovery completed")
client, err := createAzureClient(*d.cfg, d.logger)
if err != nil {
d.metrics.failuresCount.Inc()
return nil, fmt.Errorf("could not create Azure client: %w", err)
}
func (d *Discovery) refreshAzureClient(ctx context.Context, client client) ([]*targetgroup.Group, error) {
machines, err := client.getVMs(ctx, d.cfg.ResourceGroup)
if err != nil {
d.metrics.failuresCount.Inc()
@ -418,6 +410,18 @@ func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
return []*targetgroup.Group{&tg}, nil
}
func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
defer d.logger.Debug("Azure discovery completed")
client, err := d.createAzureClient()
if err != nil {
d.metrics.failuresCount.Inc()
return nil, fmt.Errorf("could not create Azure client: %w", err)
}
return d.refreshAzureClient(ctx, client)
}
func (d *Discovery) vmToLabelSet(ctx context.Context, client client, vm virtualMachine) (model.LabelSet, error) {
r, err := newAzureResourceFromID(vm.ID, d.logger)
if err != nil {

View file

@ -15,19 +15,34 @@ package azure
import (
"context"
"fmt"
"log/slog"
"net/http"
"slices"
"strings"
"testing"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/arm"
azfake "github.com/Azure/azure-sdk-for-go/sdk/azcore/fake"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v5"
fake "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v5/fake"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v4"
fakenetwork "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v4/fake"
cache "github.com/Code-Hex/go-generics-cache"
"github.com/Code-Hex/go-generics-cache/policy/lru"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/targetgroup"
)
const defaultMockNetworkID string = "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkInterfaces/{networkInterfaceName}"
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m,
goleak.IgnoreTopFunction("github.com/Code-Hex/go-generics-cache.(*janitor).run.func1"),
@ -96,13 +111,12 @@ func TestVMToLabelSet(t *testing.T) {
vmType := "type"
location := "westeurope"
computerName := "computer_name"
networkID := "/subscriptions/00000000-0000-0000-0000-000000000000/network1"
ipAddress := "10.20.30.40"
primary := true
networkProfile := armcompute.NetworkProfile{
NetworkInterfaces: []*armcompute.NetworkInterfaceReference{
{
ID: &networkID,
ID: to.Ptr(defaultMockNetworkID),
Properties: &armcompute.NetworkInterfaceReferenceProperties{Primary: &primary},
},
},
@ -139,7 +153,7 @@ func TestVMToLabelSet(t *testing.T) {
Location: location,
OsType: "Linux",
Tags: map[string]*string{},
NetworkInterfaces: []string{networkID},
NetworkInterfaces: []string{defaultMockNetworkID},
Size: size,
}
@ -154,7 +168,8 @@ func TestVMToLabelSet(t *testing.T) {
cache: cache.New(cache.AsLRU[string, *armnetwork.Interface](lru.WithCapacity(5))),
}
network := armnetwork.Interface{
Name: &networkID,
Name: to.Ptr(defaultMockNetworkID),
ID: to.Ptr(defaultMockNetworkID),
Properties: &armnetwork.InterfacePropertiesFormat{
Primary: &primary,
IPConfigurations: []*armnetwork.InterfaceIPConfiguration{
@ -164,9 +179,9 @@ func TestVMToLabelSet(t *testing.T) {
},
},
}
client := &mockAzureClient{
networkInterface: &network,
}
client := createMockAzureClient(t, nil, nil, nil, network, nil)
labelSet, err := d.vmToLabelSet(context.Background(), client, actualVM)
require.NoError(t, err)
require.Len(t, labelSet, 11)
@ -475,34 +490,372 @@ func TestNewAzureResourceFromID(t *testing.T) {
}
}
func TestAzureRefresh(t *testing.T) {
tests := []struct {
scenario string
vmResp []armcompute.VirtualMachinesClientListAllResponse
vmssResp []armcompute.VirtualMachineScaleSetsClientListAllResponse
vmssvmResp []armcompute.VirtualMachineScaleSetVMsClientListResponse
interfacesResp armnetwork.Interface
expectedTG []*targetgroup.Group
}{
{
scenario: "VMs, VMSS and VMSSVMs in Multiple Responses",
vmResp: []armcompute.VirtualMachinesClientListAllResponse{
{
VirtualMachineListResult: armcompute.VirtualMachineListResult{
Value: []*armcompute.VirtualMachine{
defaultVMWithIDAndName(to.Ptr("/subscriptions/00000000-0000-0000-0000-00000000000/resourceGroups/{resourceGroup}/providers/Microsoft.Compute/virtualMachine/vm1"), to.Ptr("vm1")),
defaultVMWithIDAndName(to.Ptr("/subscriptions/00000000-0000-0000-0000-00000000000/resourceGroups/{resourceGroup}/providers/Microsoft.Compute/virtualMachine/vm2"), to.Ptr("vm2")),
},
},
},
{
VirtualMachineListResult: armcompute.VirtualMachineListResult{
Value: []*armcompute.VirtualMachine{
defaultVMWithIDAndName(to.Ptr("/subscriptions/00000000-0000-0000-0000-00000000000/resourceGroups/{resourceGroup}/providers/Microsoft.Compute/virtualMachine/vm3"), to.Ptr("vm3")),
defaultVMWithIDAndName(to.Ptr("/subscriptions/00000000-0000-0000-0000-00000000000/resourceGroups/{resourceGroup}/providers/Microsoft.Compute/virtualMachine/vm4"), to.Ptr("vm4")),
},
},
},
},
vmssResp: []armcompute.VirtualMachineScaleSetsClientListAllResponse{
{
VirtualMachineScaleSetListWithLinkResult: armcompute.VirtualMachineScaleSetListWithLinkResult{
Value: []*armcompute.VirtualMachineScaleSet{
{
ID: to.Ptr("/subscriptions/00000000-0000-0000-0000-00000000000/resourceGroups/{resourceGroup}/providers/Microsoft.Compute/virtualMachineScaleSets/vmScaleSet1"),
Name: to.Ptr("vmScaleSet1"),
Location: to.Ptr("australiaeast"),
Type: to.Ptr("Microsoft.Compute/virtualMachineScaleSets"),
},
},
},
},
},
vmssvmResp: []armcompute.VirtualMachineScaleSetVMsClientListResponse{
{
VirtualMachineScaleSetVMListResult: armcompute.VirtualMachineScaleSetVMListResult{
Value: []*armcompute.VirtualMachineScaleSetVM{
defaultVMSSVMWithIDAndName(to.Ptr("/subscriptions/00000000-0000-0000-0000-00000000000/resourceGroups/{resourceGroup}/providers/Microsoft.Compute/virtualMachineScaleSets/vmScaleSet1/virtualMachines/vmScaleSet1_vm1"), to.Ptr("vmScaleSet1_vm1")),
defaultVMSSVMWithIDAndName(to.Ptr("/subscriptions/00000000-0000-0000-0000-00000000000/resourceGroups/{resourceGroup}/providers/Microsoft.Compute/virtualMachineScaleSets/vmScaleSet1/virtualMachines/vmScaleSet1_vm2"), to.Ptr("vmScaleSet1_vm2")),
},
},
},
},
interfacesResp: armnetwork.Interface{
ID: to.Ptr(defaultMockNetworkID),
Properties: &armnetwork.InterfacePropertiesFormat{
Primary: to.Ptr(true),
IPConfigurations: []*armnetwork.InterfaceIPConfiguration{
{Properties: &armnetwork.InterfaceIPConfigurationPropertiesFormat{
PrivateIPAddress: to.Ptr("10.0.0.1"),
}},
},
},
},
expectedTG: []*targetgroup.Group{
{
Targets: []model.LabelSet{
{
"__address__": "10.0.0.1:80",
"__meta_azure_machine_computer_name": "computer_name",
"__meta_azure_machine_id": "/subscriptions/00000000-0000-0000-0000-00000000000/resourceGroups/{resourceGroup}/providers/Microsoft.Compute/virtualMachine/vm1",
"__meta_azure_machine_location": "australiaeast",
"__meta_azure_machine_name": "vm1",
"__meta_azure_machine_os_type": "Linux",
"__meta_azure_machine_private_ip": "10.0.0.1",
"__meta_azure_machine_resource_group": "{resourceGroup}",
"__meta_azure_machine_size": "size",
"__meta_azure_machine_tag_prometheus": "",
"__meta_azure_subscription_id": "",
"__meta_azure_tenant_id": "",
},
{
"__address__": "10.0.0.1:80",
"__meta_azure_machine_computer_name": "computer_name",
"__meta_azure_machine_id": "/subscriptions/00000000-0000-0000-0000-00000000000/resourceGroups/{resourceGroup}/providers/Microsoft.Compute/virtualMachine/vm2",
"__meta_azure_machine_location": "australiaeast",
"__meta_azure_machine_name": "vm2",
"__meta_azure_machine_os_type": "Linux",
"__meta_azure_machine_private_ip": "10.0.0.1",
"__meta_azure_machine_resource_group": "{resourceGroup}",
"__meta_azure_machine_size": "size",
"__meta_azure_machine_tag_prometheus": "",
"__meta_azure_subscription_id": "",
"__meta_azure_tenant_id": "",
},
{
"__address__": "10.0.0.1:80",
"__meta_azure_machine_computer_name": "computer_name",
"__meta_azure_machine_id": "/subscriptions/00000000-0000-0000-0000-00000000000/resourceGroups/{resourceGroup}/providers/Microsoft.Compute/virtualMachine/vm3",
"__meta_azure_machine_location": "australiaeast",
"__meta_azure_machine_name": "vm3",
"__meta_azure_machine_os_type": "Linux",
"__meta_azure_machine_private_ip": "10.0.0.1",
"__meta_azure_machine_resource_group": "{resourceGroup}",
"__meta_azure_machine_size": "size",
"__meta_azure_machine_tag_prometheus": "",
"__meta_azure_subscription_id": "",
"__meta_azure_tenant_id": "",
},
{
"__address__": "10.0.0.1:80",
"__meta_azure_machine_computer_name": "computer_name",
"__meta_azure_machine_id": "/subscriptions/00000000-0000-0000-0000-00000000000/resourceGroups/{resourceGroup}/providers/Microsoft.Compute/virtualMachine/vm4",
"__meta_azure_machine_location": "australiaeast",
"__meta_azure_machine_name": "vm4",
"__meta_azure_machine_os_type": "Linux",
"__meta_azure_machine_private_ip": "10.0.0.1",
"__meta_azure_machine_resource_group": "{resourceGroup}",
"__meta_azure_machine_size": "size",
"__meta_azure_machine_tag_prometheus": "",
"__meta_azure_subscription_id": "",
"__meta_azure_tenant_id": "",
},
{
"__address__": "10.0.0.1:80",
"__meta_azure_machine_computer_name": "computer_name",
"__meta_azure_machine_id": "/subscriptions/00000000-0000-0000-0000-00000000000/resourceGroups/{resourceGroup}/providers/Microsoft.Compute/virtualMachineScaleSets/vmScaleSet1/virtualMachines/vmScaleSet1_vm1",
"__meta_azure_machine_location": "australiaeast",
"__meta_azure_machine_name": "vmScaleSet1_vm1",
"__meta_azure_machine_os_type": "Linux",
"__meta_azure_machine_private_ip": "10.0.0.1",
"__meta_azure_machine_resource_group": "{resourceGroup}",
"__meta_azure_machine_scale_set": "vmScaleSet1",
"__meta_azure_machine_size": "size",
"__meta_azure_machine_tag_prometheus": "",
"__meta_azure_subscription_id": "",
"__meta_azure_tenant_id": "",
},
{
"__address__": "10.0.0.1:80",
"__meta_azure_machine_computer_name": "computer_name",
"__meta_azure_machine_id": "/subscriptions/00000000-0000-0000-0000-00000000000/resourceGroups/{resourceGroup}/providers/Microsoft.Compute/virtualMachineScaleSets/vmScaleSet1/virtualMachines/vmScaleSet1_vm2",
"__meta_azure_machine_location": "australiaeast",
"__meta_azure_machine_name": "vmScaleSet1_vm2",
"__meta_azure_machine_os_type": "Linux",
"__meta_azure_machine_private_ip": "10.0.0.1",
"__meta_azure_machine_resource_group": "{resourceGroup}",
"__meta_azure_machine_scale_set": "vmScaleSet1",
"__meta_azure_machine_size": "size",
"__meta_azure_machine_tag_prometheus": "",
"__meta_azure_subscription_id": "",
"__meta_azure_tenant_id": "",
},
},
},
},
},
}
for _, tc := range tests {
t.Run(tc.scenario, func(t *testing.T) {
t.Parallel()
azureSDConfig := &DefaultSDConfig
azureClient := createMockAzureClient(t, tc.vmResp, tc.vmssResp, tc.vmssvmResp, tc.interfacesResp, nil)
reg := prometheus.NewRegistry()
refreshMetrics := discovery.NewRefreshMetrics(reg)
metrics := azureSDConfig.NewDiscovererMetrics(reg, refreshMetrics)
sd, err := NewDiscovery(azureSDConfig, nil, metrics)
require.NoError(t, err)
tg, err := sd.refreshAzureClient(context.Background(), azureClient)
require.NoError(t, err)
sortTargetsByID(tg[0].Targets)
require.Equal(t, tc.expectedTG, tg)
})
}
}
type mockAzureClient struct {
networkInterface *armnetwork.Interface
azureClient
}
var _ client = &mockAzureClient{}
func createMockAzureClient(t *testing.T, vmResp []armcompute.VirtualMachinesClientListAllResponse, vmssResp []armcompute.VirtualMachineScaleSetsClientListAllResponse, vmssvmResp []armcompute.VirtualMachineScaleSetVMsClientListResponse, interfaceResp armnetwork.Interface, logger *slog.Logger) client {
t.Helper()
mockVMServer := defaultMockVMServer(vmResp)
mockVMSSServer := defaultMockVMSSServer(vmssResp)
mockVMScaleSetVMServer := defaultMockVMSSVMServer(vmssvmResp)
mockInterfaceServer := defaultMockInterfaceServer(interfaceResp)
func (*mockAzureClient) getVMs(ctx context.Context, resourceGroup string) ([]virtualMachine, error) {
return nil, nil
}
vmClient, err := armcompute.NewVirtualMachinesClient("fake-subscription-id", &azfake.TokenCredential{}, &arm.ClientOptions{
ClientOptions: azcore.ClientOptions{
Transport: fake.NewVirtualMachinesServerTransport(&mockVMServer),
},
})
require.NoError(t, err)
func (*mockAzureClient) getScaleSets(ctx context.Context, resourceGroup string) ([]armcompute.VirtualMachineScaleSet, error) {
return nil, nil
}
vmssClient, err := armcompute.NewVirtualMachineScaleSetsClient("fake-subscription-id", &azfake.TokenCredential{}, &arm.ClientOptions{
ClientOptions: azcore.ClientOptions{
Transport: fake.NewVirtualMachineScaleSetsServerTransport(&mockVMSSServer),
},
})
require.NoError(t, err)
func (*mockAzureClient) getScaleSetVMs(ctx context.Context, scaleSet armcompute.VirtualMachineScaleSet) ([]virtualMachine, error) {
return nil, nil
}
vmssvmClient, err := armcompute.NewVirtualMachineScaleSetVMsClient("fake-subscription-id", &azfake.TokenCredential{}, &arm.ClientOptions{
ClientOptions: azcore.ClientOptions{
Transport: fake.NewVirtualMachineScaleSetVMsServerTransport(&mockVMScaleSetVMServer),
},
})
require.NoError(t, err)
func (m *mockAzureClient) getVMNetworkInterfaceByID(ctx context.Context, networkInterfaceID string) (*armnetwork.Interface, error) {
if networkInterfaceID == "" {
return nil, fmt.Errorf("parameter networkInterfaceID cannot be empty")
interfacesClient, err := armnetwork.NewInterfacesClient("fake-subscription-id", &azfake.TokenCredential{}, &arm.ClientOptions{
ClientOptions: azcore.ClientOptions{
Transport: fakenetwork.NewInterfacesServerTransport(&mockInterfaceServer),
},
})
require.NoError(t, err)
return &mockAzureClient{
azureClient: azureClient{
vm: vmClient,
vmss: vmssClient,
vmssvm: vmssvmClient,
nic: interfacesClient,
logger: logger,
},
}
return m.networkInterface, nil
}
func (m *mockAzureClient) getVMScaleSetVMNetworkInterfaceByID(ctx context.Context, networkInterfaceID, scaleSetName, instanceID string) (*armnetwork.Interface, error) {
if scaleSetName == "" {
return nil, fmt.Errorf("parameter virtualMachineScaleSetName cannot be empty")
func defaultMockInterfaceServer(interfaceResp armnetwork.Interface) fakenetwork.InterfacesServer {
return fakenetwork.InterfacesServer{
Get: func(ctx context.Context, resourceGroupName, networkInterfaceName string, options *armnetwork.InterfacesClientGetOptions) (resp azfake.Responder[armnetwork.InterfacesClientGetResponse], errResp azfake.ErrorResponder) {
resp.SetResponse(http.StatusOK, armnetwork.InterfacesClientGetResponse{Interface: interfaceResp}, nil)
return
},
GetVirtualMachineScaleSetNetworkInterface: func(ctx context.Context, resourceGroupName, virtualMachineScaleSetName, virtualmachineIndex, networkInterfaceName string, options *armnetwork.InterfacesClientGetVirtualMachineScaleSetNetworkInterfaceOptions) (resp azfake.Responder[armnetwork.InterfacesClientGetVirtualMachineScaleSetNetworkInterfaceResponse], errResp azfake.ErrorResponder) {
resp.SetResponse(http.StatusOK, armnetwork.InterfacesClientGetVirtualMachineScaleSetNetworkInterfaceResponse{Interface: interfaceResp}, nil)
return
},
}
return m.networkInterface, nil
}
func defaultMockVMServer(vmResp []armcompute.VirtualMachinesClientListAllResponse) fake.VirtualMachinesServer {
return fake.VirtualMachinesServer{
NewListAllPager: func(options *armcompute.VirtualMachinesClientListAllOptions) (resp azfake.PagerResponder[armcompute.VirtualMachinesClientListAllResponse]) {
for _, page := range vmResp {
resp.AddPage(http.StatusOK, page, nil)
}
return
},
}
}
func defaultMockVMSSServer(vmssResp []armcompute.VirtualMachineScaleSetsClientListAllResponse) fake.VirtualMachineScaleSetsServer {
return fake.VirtualMachineScaleSetsServer{
NewListAllPager: func(options *armcompute.VirtualMachineScaleSetsClientListAllOptions) (resp azfake.PagerResponder[armcompute.VirtualMachineScaleSetsClientListAllResponse]) {
for _, page := range vmssResp {
resp.AddPage(http.StatusOK, page, nil)
}
return
},
}
}
func defaultMockVMSSVMServer(vmssvmResp []armcompute.VirtualMachineScaleSetVMsClientListResponse) fake.VirtualMachineScaleSetVMsServer {
return fake.VirtualMachineScaleSetVMsServer{
NewListPager: func(resourceGroupName, virtualMachineScaleSetName string, options *armcompute.VirtualMachineScaleSetVMsClientListOptions) (resp azfake.PagerResponder[armcompute.VirtualMachineScaleSetVMsClientListResponse]) {
for _, page := range vmssvmResp {
resp.AddPage(http.StatusOK, page, nil)
}
return
},
}
}
func defaultVMWithIDAndName(id, name *string) *armcompute.VirtualMachine {
vmSize := armcompute.VirtualMachineSizeTypes("size")
osType := armcompute.OperatingSystemTypesLinux
defaultID := "/subscriptions/00000000-0000-0000-0000-00000000000/resourceGroups/{resourceGroup}/providers/Microsoft.Compute/virtualMachine/testVM"
defaultName := "testVM"
if id == nil {
id = &defaultID
}
if name == nil {
name = &defaultName
}
return &armcompute.VirtualMachine{
ID: id,
Name: name,
Type: to.Ptr("Microsoft.Compute/virtualMachines"),
Location: to.Ptr("australiaeast"),
Properties: &armcompute.VirtualMachineProperties{
OSProfile: &armcompute.OSProfile{
ComputerName: to.Ptr("computer_name"),
},
StorageProfile: &armcompute.StorageProfile{
OSDisk: &armcompute.OSDisk{
OSType: &osType,
},
},
NetworkProfile: &armcompute.NetworkProfile{
NetworkInterfaces: []*armcompute.NetworkInterfaceReference{
{
ID: to.Ptr(defaultMockNetworkID),
},
},
},
HardwareProfile: &armcompute.HardwareProfile{
VMSize: &vmSize,
},
},
Tags: map[string]*string{
"prometheus": new(string),
},
}
}
func defaultVMSSVMWithIDAndName(id, name *string) *armcompute.VirtualMachineScaleSetVM {
vmSize := armcompute.VirtualMachineSizeTypes("size")
osType := armcompute.OperatingSystemTypesLinux
defaultID := "/subscriptions/00000000-0000-0000-0000-00000000000/resourceGroups/{resourceGroup}/providers/Microsoft.Compute/virtualMachineScaleSets/testVMScaleSet/virtualMachines/testVM"
defaultName := "testVM"
if id == nil {
id = &defaultID
}
if name == nil {
name = &defaultName
}
return &armcompute.VirtualMachineScaleSetVM{
ID: id,
Name: name,
Type: to.Ptr("Microsoft.Compute/virtualMachines"),
InstanceID: to.Ptr("123"),
Location: to.Ptr("australiaeast"),
Properties: &armcompute.VirtualMachineScaleSetVMProperties{
OSProfile: &armcompute.OSProfile{
ComputerName: to.Ptr("computer_name"),
},
StorageProfile: &armcompute.StorageProfile{
OSDisk: &armcompute.OSDisk{
OSType: &osType,
},
},
NetworkProfile: &armcompute.NetworkProfile{
NetworkInterfaces: []*armcompute.NetworkInterfaceReference{
{ID: to.Ptr(defaultMockNetworkID)},
},
},
HardwareProfile: &armcompute.HardwareProfile{
VMSize: &vmSize,
},
},
Tags: map[string]*string{
"prometheus": new(string),
},
}
}
func sortTargetsByID(targets []model.LabelSet) {
slices.SortFunc(targets, func(a, b model.LabelSet) int {
return strings.Compare(string(a["__meta_azure_machine_id"]), string(b["__meta_azure_machine_id"]))
})
}

File diff suppressed because it is too large Load diff

8
go.mod
View file

@ -18,7 +18,7 @@ require (
github.com/cespare/xxhash/v2 v2.3.0
github.com/dennwc/varint v1.0.0
github.com/digitalocean/godo v1.126.0
github.com/docker/docker v27.2.0+incompatible
github.com/docker/docker v27.3.1+incompatible
github.com/edsrzf/mmap-go v1.1.0
github.com/envoyproxy/go-control-plane v0.13.0
github.com/envoyproxy/protoc-gen-validate v1.1.0
@ -84,9 +84,9 @@ require (
google.golang.org/protobuf v1.34.2
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.31.0
k8s.io/apimachinery v0.31.0
k8s.io/client-go v0.31.0
k8s.io/api v0.31.1
k8s.io/apimachinery v0.31.1
k8s.io/client-go v0.31.1
k8s.io/klog v1.0.0
k8s.io/klog/v2 v2.130.1
)

16
go.sum
View file

@ -127,8 +127,8 @@ github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK
github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI=
github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ=
github.com/docker/docker v27.2.0+incompatible h1:Rk9nIVdfH3+Vz4cyI/uhbINhEZ/oLmc+CBXmH6fbNk4=
github.com/docker/docker v27.2.0+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/docker v27.3.1+incompatible h1:KttF0XoteNTicmUtBO0L2tP+J7FGRFTjaEF4k6WdhfI=
github.com/docker/docker v27.3.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
@ -985,12 +985,12 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.31.0 h1:b9LiSjR2ym/SzTOlfMHm1tr7/21aD7fSkqgD/CVJBCo=
k8s.io/api v0.31.0/go.mod h1:0YiFF+JfFxMM6+1hQei8FY8M7s1Mth+z/q7eF1aJkTE=
k8s.io/apimachinery v0.31.0 h1:m9jOiSr3FoSSL5WO9bjm1n6B9KROYYgNZOb4tyZ1lBc=
k8s.io/apimachinery v0.31.0/go.mod h1:rsPdaZJfTfLsNJSQzNHQvYoTmxhoOEofxtOsF3rtsMo=
k8s.io/client-go v0.31.0 h1:QqEJzNjbN2Yv1H79SsS+SWnXkBgVu4Pj3CJQgbx0gI8=
k8s.io/client-go v0.31.0/go.mod h1:Y9wvC76g4fLjmU0BA+rV+h2cncoadjvjjkkIGoTLcGU=
k8s.io/api v0.31.1 h1:Xe1hX/fPW3PXYYv8BlozYqw63ytA92snr96zMW9gWTU=
k8s.io/api v0.31.1/go.mod h1:sbN1g6eY6XVLeqNsZGLnI5FwVseTrZX7Fv3O26rhAaI=
k8s.io/apimachinery v0.31.1 h1:mhcUBbj7KUjaVhyXILglcVjuS4nYXiwC+KKFBgIVy7U=
k8s.io/apimachinery v0.31.1/go.mod h1:rsPdaZJfTfLsNJSQzNHQvYoTmxhoOEofxtOsF3rtsMo=
k8s.io/client-go v0.31.1 h1:f0ugtWSbWpxHR7sjVpQwuvw9a3ZKLXX0u0itkFXufb0=
k8s.io/client-go v0.31.1/go.mod h1:sKI8871MJN2OyeqRlmA4W4KM9KBdBUpDLu/43eGemCg=
k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=

View file

@ -17,6 +17,7 @@
package textparse
import (
"bytes"
"errors"
"fmt"
"io"
@ -24,6 +25,7 @@ import (
"strings"
"unicode/utf8"
"github.com/cespare/xxhash/v2"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/exemplar"
@ -75,6 +77,7 @@ type OpenMetricsParser struct {
l *openMetricsLexer
builder labels.ScratchBuilder
series []byte
mfNameLen int // length of metric family name to get from series.
text []byte
mtype model.MetricType
val float64
@ -98,9 +101,9 @@ type OpenMetricsParser struct {
// Created timestamp parsing state.
ct int64
ctHashSet uint64
// visitedName is the metric name of the last visited metric when peeking ahead
// visitedMFName is the metric family name of the last visited metric when peeking ahead
// for _created series during the execution of the CreatedTimestamp method.
visitedName string
visitedMFName []byte
skipCTSeries bool
}
@ -260,25 +263,24 @@ func (p *OpenMetricsParser) Exemplar(e *exemplar.Exemplar) bool {
func (p *OpenMetricsParser) CreatedTimestamp() *int64 {
if !typeRequiresCT(p.mtype) {
// Not a CT supported metric type, fast path.
p.ct = 0
p.visitedName = ""
p.ctHashSet = 0
p.ctHashSet = 0 // Use ctHashSet as a single way of telling "empty cache"
return nil
}
var (
currLset labels.Labels
buf []byte
peekWithoutNameLsetHash uint64
currName []byte
)
p.Metric(&currLset)
currFamilyLsetHash, buf := currLset.HashWithoutLabels(buf, labels.MetricName, "le", "quantile")
currName := currLset.Get(model.MetricNameLabel)
currName = findBaseMetricName(currName)
if len(p.series) > 1 && p.series[0] == '{' && p.series[1] == '"' {
// special case for UTF-8 encoded metric family names.
currName = p.series[p.offsets[0]-p.start : p.mfNameLen+2]
} else {
currName = p.series[p.offsets[0]-p.start : p.mfNameLen]
}
// make sure we're on a new metric before returning
if currName == p.visitedName && currFamilyLsetHash == p.ctHashSet && p.visitedName != "" && p.ctHashSet > 0 && p.ct > 0 {
// CT is already known, fast path.
currHash := p.seriesHash(&buf, currName)
// Check cache, perhaps we fetched something already.
if currHash == p.ctHashSet && p.ct > 0 {
return &p.ct
}
@ -309,17 +311,15 @@ func (p *OpenMetricsParser) CreatedTimestamp() *int64 {
return nil
}
var peekedLset labels.Labels
p.Metric(&peekedLset)
peekedName := peekedLset.Get(model.MetricNameLabel)
if !strings.HasSuffix(peekedName, "_created") {
peekedName := p.series[p.offsets[0]-p.start : p.offsets[1]-p.start]
if len(peekedName) < 8 || string(peekedName[len(peekedName)-8:]) != "_created" {
// Not a CT line, search more.
continue
}
// We got a CT line here, but let's search if CT line is actually for our series, edge case.
peekWithoutNameLsetHash, _ = peekedLset.HashWithoutLabels(buf, labels.MetricName, "le", "quantile")
if peekWithoutNameLsetHash != currFamilyLsetHash {
// Remove _created suffix.
peekedHash := p.seriesHash(&buf, peekedName[:len(peekedName)-8])
if peekedHash != currHash {
// Found CT line for a different series, for our series no CT.
p.resetCTParseValues(resetLexer)
return nil
@ -328,44 +328,63 @@ func (p *OpenMetricsParser) CreatedTimestamp() *int64 {
// All timestamps in OpenMetrics are Unix Epoch in seconds. Convert to milliseconds.
// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#timestamps
ct := int64(p.val * 1000.0)
p.setCTParseValues(ct, currFamilyLsetHash, currName, true, resetLexer)
p.setCTParseValues(ct, currHash, currName, true, resetLexer)
return &ct
}
}
var (
leBytes = []byte{108, 101}
quantileBytes = []byte{113, 117, 97, 110, 116, 105, 108, 101}
)
// seriesHash generates a hash based on the metric family name and the offsets
// of label names and values from the parsed OpenMetrics data. It skips quantile
// and le labels for summaries and histograms respectively.
func (p *OpenMetricsParser) seriesHash(offsetsArr *[]byte, metricFamilyName []byte) uint64 {
// Iterate through p.offsets to find the label names and values.
for i := 2; i < len(p.offsets); i += 4 {
lStart := p.offsets[i] - p.start
lEnd := p.offsets[i+1] - p.start
label := p.series[lStart:lEnd]
// Skip quantile and le labels for summaries and histograms.
if p.mtype == model.MetricTypeSummary && bytes.Equal(label, quantileBytes) {
continue
}
if p.mtype == model.MetricTypeHistogram && bytes.Equal(label, leBytes) {
continue
}
*offsetsArr = append(*offsetsArr, p.series[lStart:lEnd]...)
vStart := p.offsets[i+2] - p.start
vEnd := p.offsets[i+3] - p.start
*offsetsArr = append(*offsetsArr, p.series[vStart:vEnd]...)
}
*offsetsArr = append(*offsetsArr, metricFamilyName...)
hashedOffsets := xxhash.Sum64(*offsetsArr)
// Reset the offsets array for later reuse.
*offsetsArr = (*offsetsArr)[:0]
return hashedOffsets
}
// setCTParseValues sets the parser to the state after CreatedTimestamp method was called and CT was found.
// This is useful to prevent re-parsing the same series again and early return the CT value.
func (p *OpenMetricsParser) setCTParseValues(ct int64, ctHashSet uint64, visitedName string, skipCTSeries bool, resetLexer *openMetricsLexer) {
func (p *OpenMetricsParser) setCTParseValues(ct int64, ctHashSet uint64, mfName []byte, skipCTSeries bool, resetLexer *openMetricsLexer) {
p.ct = ct
p.l = resetLexer
p.ctHashSet = ctHashSet
p.visitedName = visitedName
p.skipCTSeries = skipCTSeries
p.visitedMFName = mfName
p.skipCTSeries = skipCTSeries // Do we need to set it?
}
// resetCtParseValues resets the parser to the state before CreatedTimestamp method was called.
func (p *OpenMetricsParser) resetCTParseValues(resetLexer *openMetricsLexer) {
p.l = resetLexer
p.ct = 0
p.ctHashSet = 0
p.visitedName = ""
p.skipCTSeries = true
}
// findBaseMetricName returns the metric name without reserved suffixes such as "_created",
// "_sum", etc. based on the OpenMetrics specification found at
// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md.
// If no suffix is found, the original name is returned.
func findBaseMetricName(name string) string {
suffixes := []string{"_created", "_count", "_sum", "_bucket", "_total", "_gcount", "_gsum", "_info"}
for _, suffix := range suffixes {
if strings.HasSuffix(name, suffix) {
return strings.TrimSuffix(name, suffix)
}
}
return name
}
// typeRequiresCT returns true if the metric type requires a _created timestamp.
func typeRequiresCT(t model.MetricType) bool {
switch t {
@ -419,6 +438,7 @@ func (p *OpenMetricsParser) Next() (Entry, error) {
mStart++
mEnd--
}
p.mfNameLen = mEnd - mStart
p.offsets = append(p.offsets, mStart, mEnd)
default:
return EntryInvalid, p.parseError("expected metric name after "+t.String(), t2)

View file

@ -14,6 +14,7 @@
package textparse
import (
"fmt"
"io"
"testing"
@ -71,6 +72,8 @@ foo_total 17.0 1520879607.789 # {id="counter-test"} 5
foo_created 1520872607.123
foo_total{a="b"} 17.0 1520879607.789 # {id="counter-test"} 5
foo_created{a="b"} 1520872607.123
foo_total{le="c"} 21.0
foo_created{le="c"} 1520872621.123
# HELP bar Summary with CT at the end, making sure we find CT even if it's multiple lines a far
# TYPE bar summary
bar_count 17.0
@ -294,6 +297,11 @@ foobar{quantile="0.99"} 150.1`
{Labels: labels.FromStrings("id", "counter-test"), Value: 5},
},
ct: int64p(1520872607123),
}, {
m: `foo_total{le="c"}`,
v: 21.0,
lset: labels.FromStrings("__name__", "foo_total", "le", "c"),
ct: int64p(1520872621123),
}, {
m: "bar",
help: "Summary with CT at the end, making sure we find CT even if it's multiple lines a far",
@ -820,7 +828,7 @@ func TestOpenMetricsParseErrors(t *testing.T) {
for err == nil {
_, err = p.Next()
}
require.EqualError(t, err, c.err, "test %d: %s", i, c.input)
require.Equal(t, c.err, err.Error(), "test %d: %s", i, c.input)
}
}
@ -899,42 +907,97 @@ func TestOMNullByteHandling(t *testing.T) {
// current OM spec limitations or clients with broken OM format.
// TODO(maniktherana): Make sure OM 1.1/2.0 pass CT via metadata or exemplar-like to avoid this.
func TestCTParseFailures(t *testing.T) {
input := `# HELP thing Histogram with _created as first line
for _, tcase := range []struct {
name string
input string
expected []parsedEntry
}{
{
name: "_created line is a first one",
input: `# HELP thing histogram with _created as first line
# TYPE thing histogram
thing_created 1520872607.123
thing_count 17
thing_sum 324789.3
thing_bucket{le="0.0"} 0
thing_bucket{le="+Inf"} 17`
input += "\n# EOF\n"
exp := []parsedEntry{
thing_bucket{le="+Inf"} 17
# HELP thing_c counter with _created as first line
# TYPE thing_c counter
thing_c_created 1520872607.123
thing_c_total 14123.232
# EOF
`,
expected: []parsedEntry{
{
m: "thing",
help: "Histogram with _created as first line",
}, {
help: "histogram with _created as first line",
},
{
m: "thing",
typ: model.MetricTypeHistogram,
}, {
},
{
m: `thing_count`,
ct: nil, // Should be int64p(1520872607123).
}, {
},
{
m: `thing_sum`,
ct: nil, // Should be int64p(1520872607123).
}, {
},
{
m: `thing_bucket{le="0.0"}`,
ct: nil, // Should be int64p(1520872607123).
}, {
},
{
m: `thing_bucket{le="+Inf"}`,
ct: nil, // Should be int64p(1520872607123),
},
}
p := NewOpenMetricsParser([]byte(input), labels.NewSymbolTable(), WithOMParserCTSeriesSkipped())
{
m: "thing_c",
help: "counter with _created as first line",
},
{
m: "thing_c",
typ: model.MetricTypeCounter,
},
{
m: `thing_c_total`,
ct: nil, // Should be int64p(1520872607123).
},
},
},
{
// TODO(bwplotka): Kind of correct bevaviour? If yes, let's move to the OK tests above.
name: "maybe counter with no meta",
input: `foo_total 17.0
foo_created 1520872607.123
foo_total{a="b"} 17.0
foo_created{a="b"} 1520872608.123
# EOF
`,
expected: []parsedEntry{
{
m: `foo_total`,
},
{
m: `foo_created`,
},
{
m: `foo_total{a="b"}`,
},
{
m: `foo_created{a="b"}`,
},
},
},
} {
t.Run(fmt.Sprintf("case=%v", tcase.name), func(t *testing.T) {
p := NewOpenMetricsParser([]byte(tcase.input), labels.NewSymbolTable(), WithOMParserCTSeriesSkipped())
got := testParse(t, p)
resetValAndLset(got) // Keep this test focused on metric, basic entries and CT only.
requireEntries(t, exp, got)
requireEntries(t, tcase.expected, got)
})
}
}
func resetValAndLset(e []parsedEntry) {

View file

@ -16,6 +16,8 @@ package notifier
import (
"bytes"
"context"
"crypto/md5"
"encoding/hex"
"encoding/json"
"fmt"
"io"
@ -35,6 +37,7 @@ import (
"github.com/prometheus/common/sigv4"
"github.com/prometheus/common/version"
"go.uber.org/atomic"
"gopkg.in/yaml.v2"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
@ -257,6 +260,16 @@ func (n *Manager) ApplyConfig(conf *config.Config) error {
n.opts.RelabelConfigs = conf.AlertingConfig.AlertRelabelConfigs
amSets := make(map[string]*alertmanagerSet)
// configToAlertmanagers maps alertmanager sets for each unique AlertmanagerConfig,
// helping to avoid dropping known alertmanagers and re-use them without waiting for SD updates when applying the config.
configToAlertmanagers := make(map[string]*alertmanagerSet, len(n.alertmanagers))
for _, oldAmSet := range n.alertmanagers {
hash, err := oldAmSet.configHash()
if err != nil {
return err
}
configToAlertmanagers[hash] = oldAmSet
}
for k, cfg := range conf.AlertingConfig.AlertmanagerConfigs.ToMap() {
ams, err := newAlertmanagerSet(cfg, n.logger, n.metrics)
@ -264,6 +277,16 @@ func (n *Manager) ApplyConfig(conf *config.Config) error {
return err
}
hash, err := ams.configHash()
if err != nil {
return err
}
if oldAmSet, ok := configToAlertmanagers[hash]; ok {
ams.ams = oldAmSet.ams
ams.droppedAms = oldAmSet.droppedAms
}
amSets[k] = ams
}
@ -803,6 +826,15 @@ func (s *alertmanagerSet) sync(tgs []*targetgroup.Group) {
}
}
func (s *alertmanagerSet) configHash() (string, error) {
b, err := yaml.Marshal(s.cfg)
if err != nil {
return "", err
}
hash := md5.Sum(b)
return hex.EncodeToString(hash[:]), nil
}
func postPath(pre string, v config.AlertmanagerAPIVersion) string {
alertPushEndpoint := fmt.Sprintf("/api/%v/alerts", string(v))
return path.Join("/", pre, alertPushEndpoint)

View file

@ -38,6 +38,7 @@ import (
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/config"
_ "github.com/prometheus/prometheus/discovery/file"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
@ -1017,3 +1018,107 @@ func TestStop_DrainingEnabled(t *testing.T) {
require.Equal(t, int64(2), alertsReceived.Load())
}
func TestApplyConfig(t *testing.T) {
targetURL := "alertmanager:9093"
targetGroup := &targetgroup.Group{
Targets: []model.LabelSet{
{
"__address__": model.LabelValue(targetURL),
},
},
}
alertmanagerURL := fmt.Sprintf("http://%s/api/v2/alerts", targetURL)
n := NewManager(&Options{}, nil)
cfg := &config.Config{}
s := `
alerting:
alertmanagers:
- file_sd_configs:
- files:
- foo.json
`
// 1. Ensure known alertmanagers are not dropped during ApplyConfig.
require.NoError(t, yaml.UnmarshalStrict([]byte(s), cfg))
require.Len(t, cfg.AlertingConfig.AlertmanagerConfigs, 1)
// First, apply the config and reload.
require.NoError(t, n.ApplyConfig(cfg))
tgs := map[string][]*targetgroup.Group{"config-0": {targetGroup}}
n.reload(tgs)
require.Len(t, n.Alertmanagers(), 1)
require.Equal(t, alertmanagerURL, n.Alertmanagers()[0].String())
// Reapply the config.
require.NoError(t, n.ApplyConfig(cfg))
// Ensure the known alertmanagers are not dropped.
require.Len(t, n.Alertmanagers(), 1)
require.Equal(t, alertmanagerURL, n.Alertmanagers()[0].String())
// 2. Ensure known alertmanagers are not dropped during ApplyConfig even when
// the config order changes.
s = `
alerting:
alertmanagers:
- static_configs:
- file_sd_configs:
- files:
- foo.json
`
require.NoError(t, yaml.UnmarshalStrict([]byte(s), cfg))
require.Len(t, cfg.AlertingConfig.AlertmanagerConfigs, 2)
require.NoError(t, n.ApplyConfig(cfg))
require.Len(t, n.Alertmanagers(), 1)
// Ensure no unnecessary alertmanagers are injected.
require.Empty(t, n.alertmanagers["config-0"].ams)
// Ensure the config order is taken into account.
ams := n.alertmanagers["config-1"].ams
require.Len(t, ams, 1)
require.Equal(t, alertmanagerURL, ams[0].url().String())
// 3. Ensure known alertmanagers are reused for new config with identical AlertmanagerConfig.
s = `
alerting:
alertmanagers:
- file_sd_configs:
- files:
- foo.json
- file_sd_configs:
- files:
- foo.json
`
require.NoError(t, yaml.UnmarshalStrict([]byte(s), cfg))
require.Len(t, cfg.AlertingConfig.AlertmanagerConfigs, 2)
require.NoError(t, n.ApplyConfig(cfg))
require.Len(t, n.Alertmanagers(), 2)
for cfgIdx := range 2 {
ams := n.alertmanagers[fmt.Sprintf("config-%d", cfgIdx)].ams
require.Len(t, ams, 1)
require.Equal(t, alertmanagerURL, ams[0].url().String())
}
// 4. Ensure known alertmanagers are reused only for identical AlertmanagerConfig.
s = `
alerting:
alertmanagers:
- file_sd_configs:
- files:
- foo.json
path_prefix: /bar
- file_sd_configs:
- files:
- foo.json
relabel_configs:
- source_labels: ['__address__']
regex: 'doesntmatter:1234'
action: drop
`
require.NoError(t, yaml.UnmarshalStrict([]byte(s), cfg))
require.Len(t, cfg.AlertingConfig.AlertmanagerConfigs, 2)
require.NoError(t, n.ApplyConfig(cfg))
require.Empty(t, n.Alertmanagers())
}

View file

@ -1557,7 +1557,7 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
metadataChanged bool
)
exemplars := make([]exemplar.Exemplar, 1)
exemplars := make([]exemplar.Exemplar, 0, 1)
// updateMetadata updates the current iteration's metadata object and the
// metadataChanged value if we have metadata in the scrape cache AND the

View file

@ -29,14 +29,15 @@ in-file offset (lower 4 bytes) and segment sequence number (upper 4 bytes).
# Chunk
```
┌───────────────┬───────────────────┬─────────────┬────────────────┐
│ len <uvarint> │ encoding <1 byte> │ data <bytes> │ CRC32 <4 byte>
└───────────────┴───────────────────┴─────────────┴────────────────┘
┌───────────────┬───────────────────┬─────────────┬────────────────┐
│ len <uvarint> │ encoding <1 byte> │ data <data> │ CRC32 <4 byte>
└───────────────┴───────────────────┴─────────────┴────────────────┘
```
Notes:
* `<uvarint>` has 1 to 10 bytes.
* `encoding`: Currently either `XOR`, `histogram`, or `float histogram`.
* `encoding`: Currently either `XOR`, `histogram`, or `floathistogram`, see
[code for numerical values](https://github.com/prometheus/prometheus/blob/02d0de9987ad99dee5de21853715954fadb3239f/tsdb/chunkenc/chunk.go#L28-L47).
* `data`: See below for each encoding.
## XOR chunk data
@ -67,9 +68,9 @@ Notes:
## Histogram chunk data
```
┌──────────────────────┬──────────────────────────┬───────────────────────────────┬─────────────────────┬──────────────────┬──────────────────┬────────────────┬──────────────────┐
│ num_samples <uint16> │ histogram_flags <1 byte> │ zero_threshold <1 or 9 bytes> │ schema <varbit_int> │ pos_spans <data> │ neg_spans <data> │ samples <data> │ padding <x bits>
└──────────────────────┴──────────────────────────┴───────────────────────────────┴─────────────────────┴──────────────────┴──────────────────┴────────────────┴──────────────────┘
┌──────────────────────┬──────────────────────────┬───────────────────────────────┬─────────────────────┬──────────────────┬──────────────────┬──────────────────────┬────────────────┬──────────────────┐
│ num_samples <uint16> │ histogram_flags <1 byte> │ zero_threshold <1 or 9 bytes> │ schema <varbit_int> │ pos_spans <data> │ neg_spans <data>custom_values <data>samples <data> │ padding <x bits>
└──────────────────────┴──────────────────────────┴───────────────────────────────┴─────────────────────┴──────────────────┴──────────────────┴──────────────────────┴────────────────┴──────────────────┘
```
### Positive and negative spans data:
@ -80,6 +81,16 @@ Notes:
└─────────────────────────┴────────────────────────┴───────────────────────┴────────────────────────┴───────────────────────┴─────┴────────────────────────┴───────────────────────┘
```
### Custom values data:
The `custom_values` data is currently only used for schema -53 (custom bucket boundaries). For other schemas, it is empty (length of zero).
```
┌──────────────────────────┬──────────────────┬──────────────────┬─────┬──────────────────┐
│ num_values <varbit_uint> │ value_0 <custom> │ value_1 <custom> │ ... │ value_n <custom>
└──────────────────────────┴─────────────────────────────────────┴─────┴──────────────────┘
```
### Samples data:
```
@ -131,7 +142,9 @@ Notes:
* If 0, it is a single zero byte.
* If a power of two between 2^-243 and 2^10, it is a single byte between 1 and 254.
* Otherwise, it is a byte with all bits set (255), followed by a float64, resulting in 9 bytes length.
* `schema` is a specific value defined by the exposition format. Currently valid values are -4 <= n <= 8.
* `schema` is a specific value defined by the exposition format. Currently
valid values are either -4 <= n <= 8 (standard exponential schemas) or -53
(custom bucket boundaries).
* `<varbit_int>` is a variable bitwidth encoding for signed integers, optimized for “delta of deltas” of bucket deltas. It has between 1 bit and 9 bytes.
See [code for details](https://github.com/prometheus/prometheus/blob/8c1507ebaa4ca552958ffb60c2d1b21afb7150e4/tsdb/chunkenc/varbit.go#L31-L60).
* `<varbit_uint>` is a variable bitwidth encoding for unsigned integers with the same bit-bucketing as `<varbit_int>`.
@ -143,6 +156,28 @@ Notes:
* The chunk can have as few as one sample, i.e. sample 1 and following are optional.
* Similarly, there could be down to zero spans and down to zero buckets.
The `<custom>` encoding within the custom values data depends on the schema.
For schema -53 (custom bucket boundaries, currently the only use case for
custom values), the values to encode are bucket boundaries in the form of
floats. The encoding of a given float value _x_ works as follows:
1. Create an intermediate value _y_ = _x_ * 1000.
2. If 0 ≤ _y_ ≤ 33554430 _and_ if the decimal value of _y_ is integer, store
_y_ + 1 as `<varbit_uint>`.
3. Otherwise, store a 0 bit, followed by the 64 bit of the original _x_
encoded as plain `<float64>`.
Note that values stored as per (2) will always start with a 1 bit, which allow
decoders to recognize this case in contrast to values stores as per (3), which
always start with a 0 bit.
The rational behind this encoding is that most custom bucket boundaries are set
by humans as decimal numbers with not very many decimal places. In most cases,
the encoding will therefore result in a short varbit representation. The upper
bound of 33554430 is picked so that the varbit encoded value will take at most
4 bytes.
## Float histogram chunk data
Float histograms have the same layout as histograms apart from the encoding of samples.

View file

@ -36,7 +36,7 @@ func BenchmarkHeadStripeSeriesCreate(b *testing.B) {
defer h.Close()
for i := 0; i < b.N; i++ {
h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(i)))
h.getOrCreate(uint64(i), labels.FromStrings(labels.MetricName, "test", "a", strconv.Itoa(i), "b", strconv.Itoa(i%10), "c", strconv.Itoa(i%100), "d", strconv.Itoa(i/2), "e", strconv.Itoa(i/4)))
}
}
@ -54,8 +54,8 @@ func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
i := count.Inc()
h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(int(i))))
i := int(count.Inc())
h.getOrCreate(uint64(i), labels.FromStrings(labels.MetricName, "test", "a", strconv.Itoa(i), "b", strconv.Itoa(i%10), "c", strconv.Itoa(i%100), "d", strconv.Itoa(i/2), "e", strconv.Itoa(i/4)))
}
})
}
@ -75,7 +75,7 @@ func BenchmarkHeadStripeSeriesCreate_PreCreationFailure(b *testing.B) {
defer h.Close()
for i := 0; i < b.N; i++ {
h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(i)))
h.getOrCreate(uint64(i), labels.FromStrings(labels.MetricName, "test", "a", strconv.Itoa(i), "b", strconv.Itoa(i%10), "c", strconv.Itoa(i%100), "d", strconv.Itoa(i/2), "e", strconv.Itoa(i/4)))
}
}

View file

@ -392,13 +392,14 @@ func (p *MemPostings) Add(id storage.SeriesRef, lset labels.Labels) {
p.mtx.Unlock()
}
func appendWithExponentialGrowth[T any](a []T, v T) []T {
func appendWithExponentialGrowth[T any](a []T, v T) (_ []T, copied bool) {
if cap(a) < len(a)+1 {
newList := make([]T, len(a), len(a)*2+1)
copy(newList, a)
a = newList
copied = true
}
return append(a, v)
return append(a, v), copied
}
func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
@ -407,16 +408,26 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
nm = map[string][]storage.SeriesRef{}
p.m[l.Name] = nm
}
list := appendWithExponentialGrowth(nm[l.Value], id)
list, copied := appendWithExponentialGrowth(nm[l.Value], id)
nm[l.Value] = list
if !p.ordered {
// Return if it shouldn't be ordered, if it only has one element or if it's already ordered.
// The invariant is that the first n-1 items in the list are already sorted.
if !p.ordered || len(list) == 1 || list[len(list)-1] >= list[len(list)-2] {
return
}
// There is no guarantee that no higher ID was inserted before as they may
// be generated independently before adding them to postings.
// We repair order violations on insert. The invariant is that the first n-1
// items in the list are already sorted.
if !copied {
// We have appended to the existing slice,
// and readers may already have a copy of this postings slice,
// so we need to copy it before sorting.
old := list
list = make([]storage.SeriesRef, len(old), cap(old))
copy(list, old)
nm[l.Value] = list
}
// Repair order violations.
for i := len(list) - 1; i >= 1; i-- {
if list[i] >= list[i-1] {
break

View file

@ -1507,3 +1507,58 @@ func TestMemPostings_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
require.Error(t, p.Err())
require.Equal(t, failAfter+1, ctx.Count()) // Plus one for the Err() call that puts the error in the result.
}
func TestMemPostings_Unordered_Add_Get(t *testing.T) {
mp := NewMemPostings()
for ref := storage.SeriesRef(1); ref < 8; ref += 2 {
// First, add next series.
next := ref + 1
mp.Add(next, labels.FromStrings(labels.MetricName, "test", "series", strconv.Itoa(int(next))))
nextPostings := mp.Get(labels.MetricName, "test")
// Now add current ref.
mp.Add(ref, labels.FromStrings(labels.MetricName, "test", "series", strconv.Itoa(int(ref))))
// Next postings should still reference the next series.
nextExpanded, err := ExpandPostings(nextPostings)
require.NoError(t, err)
require.Len(t, nextExpanded, int(ref))
require.Equal(t, next, nextExpanded[len(nextExpanded)-1])
}
}
func TestMemPostings_Concurrent_Add_Get(t *testing.T) {
refs := make(chan storage.SeriesRef)
wg := sync.WaitGroup{}
wg.Add(1)
t.Cleanup(wg.Wait)
t.Cleanup(func() { close(refs) })
mp := NewMemPostings()
go func() {
defer wg.Done()
for ref := range refs {
mp.Add(ref, labels.FromStrings(labels.MetricName, "test", "series", strconv.Itoa(int(ref))))
p := mp.Get(labels.MetricName, "test")
_, err := ExpandPostings(p)
if err != nil {
t.Errorf("unexpected error: %s", err)
}
}
}()
for ref := storage.SeriesRef(1); ref < 8; ref += 2 {
// Add next ref in another goroutine so they would race.
refs <- ref + 1
// Add current ref here
mp.Add(ref, labels.FromStrings(labels.MetricName, "test", "series", strconv.Itoa(int(ref))))
// We don't read the value of the postings here,
// this is tested in TestMemPostings_Unordered_Add_Get where it's easier to achieve the determinism.
// This test just checks that there's no data race.
p := mp.Get(labels.MetricName, "test")
_, err := ExpandPostings(p)
require.NoError(t, err)
}
}