prometheus/storage/remote/queue_manager_test.go
Dieter Plaetinck cda025b5b5
TSDB: demistify SeriesRefs and ChunkRefs (#9536)
* TSDB: demistify seriesRefs and ChunkRefs

The TSDB package contains many types of series and chunk references,
all shrouded in uint types.  Often the same uint value may
actually mean one of different types, in non-obvious ways.

This PR aims to clarify the code and help navigating to relevant docs,
usage, etc much quicker.

Concretely:

* Use appropriately named types and document their semantics and
  relations.
* Make multiplexing and demuxing of types explicit
  (on the boundaries between concrete implementations and generic
  interfaces).
* Casting between different types should be free.  None of the changes
  should have any impact on how the code runs.

TODO: Implement BlockSeriesRef where appropriate (for a future PR)

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

* feedback

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

* agent: demistify seriesRefs and ChunkRefs

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>
2021-11-06 15:40:04 +05:30

959 lines
29 KiB
Go

// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"context"
"fmt"
"io/ioutil"
"math"
"net/url"
"os"
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/client_golang/prometheus"
client_testutil "github.com/prometheus/client_golang/prometheus/testutil"
common_config "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
)
const defaultFlushDeadline = 1 * time.Minute
func newHighestTimestampMetric() *maxTimestamp {
return &maxTimestamp{
Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "highest_timestamp_in_seconds",
Help: "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch.",
}),
}
}
func TestSampleDelivery(t *testing.T) {
testcases := []struct {
name string
samples bool
exemplars bool
}{
{samples: true, exemplars: false, name: "samples only"},
{samples: true, exemplars: true, name: "both samples and exemplars"},
{samples: false, exemplars: true, name: "exemplars only"},
}
// Let's create an even number of send batches so we don't run into the
// batch timeout case.
n := 3
dir, err := ioutil.TempDir("", "TestSampleDelivery")
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(dir))
}()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
defer s.Close()
queueConfig := config.DefaultQueueConfig
queueConfig.BatchSendDeadline = model.Duration(100 * time.Millisecond)
queueConfig.MaxShards = 1
writeConfig := config.DefaultRemoteWriteConfig
// We need to set URL's so that metric creation doesn't panic.
writeConfig.URL = &common_config.URL{
URL: &url.URL{
Host: "http://test-storage.com",
},
}
writeConfig.QueueConfig = queueConfig
writeConfig.SendExemplars = true
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{
&writeConfig,
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
var (
series []record.RefSeries
samples []record.RefSample
exemplars []record.RefExemplar
)
// Generates same series in both cases.
if tc.samples {
samples, series = createTimeseries(n, n)
}
if tc.exemplars {
exemplars, series = createExemplars(n, n)
}
// Apply new config.
queueConfig.Capacity = len(samples)
queueConfig.MaxSamplesPerSend = len(samples) / 2
require.NoError(t, s.ApplyConfig(conf))
hash, err := toHash(writeConfig)
require.NoError(t, err)
qm := s.rws.queues[hash]
c := NewTestWriteClient()
qm.SetClient(c)
qm.StoreSeries(series, 0)
// Send first half of data.
c.expectSamples(samples[:len(samples)/2], series)
c.expectExemplars(exemplars[:len(exemplars)/2], series)
qm.Append(samples[:len(samples)/2])
qm.AppendExemplars(exemplars[:len(exemplars)/2])
c.waitForExpectedData(t)
// Send second half of data.
c.expectSamples(samples[len(samples)/2:], series)
c.expectExemplars(exemplars[len(exemplars)/2:], series)
qm.Append(samples[len(samples)/2:])
qm.AppendExemplars(exemplars[len(exemplars)/2:])
c.waitForExpectedData(t)
})
}
}
func TestMetadataDelivery(t *testing.T) {
c := NewTestWriteClient()
dir, err := ioutil.TempDir("", "TestMetadataDelivery")
require.NoError(t, err)
defer os.RemoveAll(dir)
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
m.Start()
defer m.Stop()
metadata := []scrape.MetricMetadata{}
numMetadata := 1532
for i := 0; i < numMetadata; i++ {
metadata = append(metadata, scrape.MetricMetadata{
Metric: "prometheus_remote_storage_sent_metadata_bytes_total_" + strconv.Itoa(i),
Type: textparse.MetricTypeCounter,
Help: "a nice help text",
Unit: "",
})
}
m.AppendMetadata(context.Background(), metadata)
require.Equal(t, numMetadata, len(c.receivedMetadata))
// One more write than the rounded qoutient should be performed in order to get samples that didn't
// fit into MaxSamplesPerSend.
require.Equal(t, numMetadata/mcfg.MaxSamplesPerSend+1, c.writesReceived)
// Make sure the last samples were sent.
require.Equal(t, c.receivedMetadata[metadata[len(metadata)-1].Metric][0].MetricFamilyName, metadata[len(metadata)-1].Metric)
}
func TestSampleDeliveryTimeout(t *testing.T) {
// Let's send one less sample than batch size, and wait the timeout duration
n := 9
samples, series := createTimeseries(n, n)
c := NewTestWriteClient()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
dir, err := ioutil.TempDir("", "TestSampleDeliveryTimeout")
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(dir))
}()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
// Send the samples twice, waiting for the samples in the meantime.
c.expectSamples(samples, series)
m.Append(samples)
c.waitForExpectedData(t)
c.expectSamples(samples, series)
m.Append(samples)
c.waitForExpectedData(t)
}
func TestSampleDeliveryOrder(t *testing.T) {
ts := 10
n := config.DefaultQueueConfig.MaxSamplesPerSend * ts
samples := make([]record.RefSample, 0, n)
series := make([]record.RefSeries, 0, n)
for i := 0; i < n; i++ {
name := fmt.Sprintf("test_metric_%d", i%ts)
samples = append(samples, record.RefSample{
Ref: chunks.HeadSeriesRef(i),
T: int64(i),
V: float64(i),
})
series = append(series, record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: labels.Labels{labels.Label{Name: "__name__", Value: name}},
})
}
c := NewTestWriteClient()
c.expectSamples(samples, series)
dir, err := ioutil.TempDir("", "TestSampleDeliveryOrder")
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(dir))
}()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
// These should be received by the client.
m.Append(samples)
c.waitForExpectedData(t)
}
func TestShutdown(t *testing.T) {
deadline := 1 * time.Second
c := NewTestBlockedWriteClient()
dir, err := ioutil.TempDir("", "TestShutdown")
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(dir))
}()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false)
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
samples, series := createTimeseries(n, n)
m.StoreSeries(series, 0)
m.Start()
// Append blocks to guarantee delivery, so we do it in the background.
go func() {
m.Append(samples)
}()
time.Sleep(100 * time.Millisecond)
// Test to ensure that Stop doesn't block.
start := time.Now()
m.Stop()
// The samples will never be delivered, so duration should
// be at least equal to deadline, otherwise the flush deadline
// was not respected.
duration := time.Since(start)
if duration > deadline+(deadline/10) {
t.Errorf("Took too long to shutdown: %s > %s", duration, deadline)
}
if duration < deadline {
t.Errorf("Shutdown occurred before flush deadline: %s < %s", duration, deadline)
}
}
func TestSeriesReset(t *testing.T) {
c := NewTestBlockedWriteClient()
deadline := 5 * time.Second
numSegments := 4
numSeries := 25
dir, err := ioutil.TempDir("", "TestSeriesReset")
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(dir))
}()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false)
for i := 0; i < numSegments; i++ {
series := []record.RefSeries{}
for j := 0; j < numSeries; j++ {
series = append(series, record.RefSeries{Ref: chunks.HeadSeriesRef((i * 100) + j), Labels: labels.Labels{{Name: "a", Value: "a"}}})
}
m.StoreSeries(series, i)
}
require.Equal(t, numSegments*numSeries, len(m.seriesLabels))
m.SeriesReset(2)
require.Equal(t, numSegments*numSeries/2, len(m.seriesLabels))
}
func TestReshard(t *testing.T) {
size := 10 // Make bigger to find more races.
nSeries := 6
nSamples := config.DefaultQueueConfig.Capacity * size
samples, series := createTimeseries(nSamples, nSeries)
c := NewTestWriteClient()
c.expectSamples(samples, series)
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
dir, err := ioutil.TempDir("", "TestReshard")
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(dir))
}()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
go func() {
for i := 0; i < len(samples); i += config.DefaultQueueConfig.Capacity {
sent := m.Append(samples[i : i+config.DefaultQueueConfig.Capacity])
require.True(t, sent, "samples not sent")
time.Sleep(100 * time.Millisecond)
}
}()
for i := 1; i < len(samples)/config.DefaultQueueConfig.Capacity; i++ {
m.shards.stop()
m.shards.start(i)
time.Sleep(100 * time.Millisecond)
}
c.waitForExpectedData(t)
}
func TestReshardRaceWithStop(t *testing.T) {
c := NewTestWriteClient()
var m *QueueManager
h := sync.Mutex{}
h.Lock()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
go func() {
for {
metrics := newQueueManagerMetrics(nil, "", "")
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
m.Start()
h.Unlock()
h.Lock()
m.Stop()
}
}()
for i := 1; i < 100; i++ {
h.Lock()
m.reshardChan <- i
h.Unlock()
}
}
func TestReleaseNoninternedString(t *testing.T) {
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
c := NewTestWriteClient()
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
m.Start()
defer m.Stop()
for i := 1; i < 1000; i++ {
m.StoreSeries([]record.RefSeries{
{
Ref: chunks.HeadSeriesRef(i),
Labels: labels.Labels{
labels.Label{
Name: "asdf",
Value: fmt.Sprintf("%d", i),
},
},
},
}, 0)
m.SeriesReset(1)
}
metric := client_testutil.ToFloat64(noReferenceReleases)
require.Equal(t, 0.0, metric, "expected there to be no calls to release for strings that were not already interned: %d", int(metric))
}
func TestShouldReshard(t *testing.T) {
type testcase struct {
startingShards int
samplesIn, samplesOut, lastSendTimestamp int64
expectedToReshard bool
}
cases := []testcase{
{
// Resharding shouldn't take place if the last successful send was > batch send deadline*2 seconds ago.
startingShards: 10,
samplesIn: 1000,
samplesOut: 10,
lastSendTimestamp: time.Now().Unix() - int64(3*time.Duration(config.DefaultQueueConfig.BatchSendDeadline)/time.Second),
expectedToReshard: false,
},
{
startingShards: 5,
samplesIn: 1000,
samplesOut: 10,
lastSendTimestamp: time.Now().Unix(),
expectedToReshard: true,
},
}
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
for _, c := range cases {
metrics := newQueueManagerMetrics(nil, "", "")
client := NewTestWriteClient()
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
m.numShards = c.startingShards
m.dataIn.incr(c.samplesIn)
m.dataOut.incr(c.samplesOut)
m.lastSendTimestamp.Store(c.lastSendTimestamp)
m.Start()
desiredShards := m.calculateDesiredShards()
shouldReshard := m.shouldReshard(desiredShards)
m.Stop()
require.Equal(t, c.expectedToReshard, shouldReshard)
}
}
func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSeries) {
samples := make([]record.RefSample, 0, numSamples)
series := make([]record.RefSeries, 0, numSeries)
for i := 0; i < numSeries; i++ {
name := fmt.Sprintf("test_metric_%d", i)
for j := 0; j < numSamples; j++ {
samples = append(samples, record.RefSample{
Ref: chunks.HeadSeriesRef(i),
T: int64(j),
V: float64(i),
})
}
series = append(series, record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: append(labels.Labels{{Name: "__name__", Value: name}}, extraLabels...),
})
}
return samples, series
}
func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []record.RefSeries) {
exemplars := make([]record.RefExemplar, 0, numExemplars)
series := make([]record.RefSeries, 0, numSeries)
for i := 0; i < numSeries; i++ {
name := fmt.Sprintf("test_metric_%d", i)
for j := 0; j < numExemplars; j++ {
e := record.RefExemplar{
Ref: chunks.HeadSeriesRef(i),
T: int64(j),
V: float64(i),
Labels: labels.FromStrings("traceID", fmt.Sprintf("trace-%d", i)),
}
exemplars = append(exemplars, e)
}
series = append(series, record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: labels.Labels{{Name: "__name__", Value: name}},
})
}
return exemplars, series
}
func getSeriesNameFromRef(r record.RefSeries) string {
for _, l := range r.Labels {
if l.Name == "__name__" {
return l.Value
}
}
return ""
}
type TestWriteClient struct {
receivedSamples map[string][]prompb.Sample
expectedSamples map[string][]prompb.Sample
receivedExemplars map[string][]prompb.Exemplar
expectedExemplars map[string][]prompb.Exemplar
receivedMetadata map[string][]prompb.MetricMetadata
writesReceived int
withWaitGroup bool
wg sync.WaitGroup
mtx sync.Mutex
buf []byte
}
func NewTestWriteClient() *TestWriteClient {
return &TestWriteClient{
withWaitGroup: true,
receivedSamples: map[string][]prompb.Sample{},
expectedSamples: map[string][]prompb.Sample{},
receivedMetadata: map[string][]prompb.MetricMetadata{},
}
}
func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.RefSeries) {
if !c.withWaitGroup {
return
}
c.mtx.Lock()
defer c.mtx.Unlock()
c.expectedSamples = map[string][]prompb.Sample{}
c.receivedSamples = map[string][]prompb.Sample{}
for _, s := range ss {
seriesName := getSeriesNameFromRef(series[s.Ref])
c.expectedSamples[seriesName] = append(c.expectedSamples[seriesName], prompb.Sample{
Timestamp: s.T,
Value: s.V,
})
}
c.wg.Add(len(ss))
}
func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []record.RefSeries) {
if !c.withWaitGroup {
return
}
c.mtx.Lock()
defer c.mtx.Unlock()
c.expectedExemplars = map[string][]prompb.Exemplar{}
c.receivedExemplars = map[string][]prompb.Exemplar{}
for _, s := range ss {
seriesName := getSeriesNameFromRef(series[s.Ref])
e := prompb.Exemplar{
Labels: labelsToLabelsProto(s.Labels, nil),
Timestamp: s.T,
Value: s.V,
}
c.expectedExemplars[seriesName] = append(c.expectedExemplars[seriesName], e)
}
c.wg.Add(len(ss))
}
func (c *TestWriteClient) waitForExpectedData(tb testing.TB) {
if !c.withWaitGroup {
return
}
c.wg.Wait()
c.mtx.Lock()
defer c.mtx.Unlock()
for ts, expectedSamples := range c.expectedSamples {
require.Equal(tb, expectedSamples, c.receivedSamples[ts], ts)
}
for ts, expectedExemplar := range c.expectedExemplars {
require.Equal(tb, expectedExemplar, c.receivedExemplars[ts], ts)
}
}
func (c *TestWriteClient) expectDataCount(numSamples int) {
if !c.withWaitGroup {
return
}
c.mtx.Lock()
defer c.mtx.Unlock()
c.wg.Add(numSamples)
}
func (c *TestWriteClient) waitForExpectedDataCount() {
if !c.withWaitGroup {
return
}
c.wg.Wait()
}
func (c *TestWriteClient) Store(_ context.Context, req []byte) error {
c.mtx.Lock()
defer c.mtx.Unlock()
// nil buffers are ok for snappy, ignore cast error.
if c.buf != nil {
c.buf = c.buf[:cap(c.buf)]
}
reqBuf, err := snappy.Decode(c.buf, req)
c.buf = reqBuf
if err != nil {
return err
}
var reqProto prompb.WriteRequest
if err := proto.Unmarshal(reqBuf, &reqProto); err != nil {
return err
}
count := 0
for _, ts := range reqProto.Timeseries {
var seriesName string
labels := labelProtosToLabels(ts.Labels)
for _, label := range labels {
if label.Name == "__name__" {
seriesName = label.Value
}
}
for _, sample := range ts.Samples {
count++
c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample)
}
for _, ex := range ts.Exemplars {
count++
c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], ex)
}
}
if c.withWaitGroup {
c.wg.Add(-count)
}
for _, m := range reqProto.Metadata {
c.receivedMetadata[m.MetricFamilyName] = append(c.receivedMetadata[m.MetricFamilyName], m)
}
c.writesReceived++
return nil
}
func (c *TestWriteClient) Name() string {
return "testwriteclient"
}
func (c *TestWriteClient) Endpoint() string {
return "http://test-remote.com/1234"
}
// TestBlockingWriteClient is a queue_manager WriteClient which will block
// on any calls to Store(), until the request's Context is cancelled, at which
// point the `numCalls` property will contain a count of how many times Store()
// was called.
type TestBlockingWriteClient struct {
numCalls atomic.Uint64
}
func NewTestBlockedWriteClient() *TestBlockingWriteClient {
return &TestBlockingWriteClient{}
}
func (c *TestBlockingWriteClient) Store(ctx context.Context, _ []byte) error {
c.numCalls.Inc()
<-ctx.Done()
return nil
}
func (c *TestBlockingWriteClient) NumCalls() uint64 {
return c.numCalls.Load()
}
func (c *TestBlockingWriteClient) Name() string {
return "testblockingwriteclient"
}
func (c *TestBlockingWriteClient) Endpoint() string {
return "http://test-remote-blocking.com/1234"
}
func BenchmarkSampleDelivery(b *testing.B) {
// Send one sample per series, which is the typical remote_write case
const numSamples = 1
const numSeries = 10000
// Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics.
extraLabels := labels.Labels{
{Name: "kubernetes_io_arch", Value: "amd64"},
{Name: "kubernetes_io_instance_type", Value: "c3.somesize"},
{Name: "kubernetes_io_os", Value: "linux"},
{Name: "container_name", Value: "some-name"},
{Name: "failure_domain_kubernetes_io_region", Value: "somewhere-1"},
{Name: "failure_domain_kubernetes_io_zone", Value: "somewhere-1b"},
{Name: "id", Value: "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28"},
{Name: "image", Value: "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506"},
{Name: "instance", Value: "ip-111-11-1-11.ec2.internal"},
{Name: "job", Value: "kubernetes-cadvisor"},
{Name: "kubernetes_io_hostname", Value: "ip-111-11-1-11"},
{Name: "monitor", Value: "prod"},
{Name: "name", Value: "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0"},
{Name: "namespace", Value: "kube-system"},
{Name: "pod_name", Value: "some-other-name-5j8s8"},
}
samples, series := createTimeseries(numSamples, numSeries, extraLabels...)
c := NewTestWriteClient()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
cfg.MaxShards = 1
dir, err := ioutil.TempDir("", "BenchmarkSampleDelivery")
require.NoError(b, err)
defer os.RemoveAll(dir)
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
m.StoreSeries(series, 0)
// These should be received by the client.
m.Start()
defer m.Stop()
b.ResetTimer()
for i := 0; i < b.N; i++ {
c.expectDataCount(len(samples))
go m.Append(samples)
m.UpdateSeriesSegment(series, i+1) // simulate what wal.Watcher.garbageCollectSeries does
m.SeriesReset(i + 1)
c.waitForExpectedDataCount()
}
// Do not include shutdown
b.StopTimer()
}
func BenchmarkStartup(b *testing.B) {
dir := os.Getenv("WALDIR")
if dir == "" {
return
}
// Find the second largest segment; we will replay up to this.
// (Second largest as WALWatcher will start tailing the largest).
dirents, err := ioutil.ReadDir(dir)
require.NoError(b, err)
var segments []int
for _, dirent := range dirents {
if i, err := strconv.Atoi(dirent.Name()); err != nil {
segments = append(segments, i)
}
}
sort.Ints(segments)
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout))
logger = log.With(logger, "caller", log.DefaultCaller)
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
for n := 0; n < b.N; n++ {
metrics := newQueueManagerMetrics(nil, "", "")
c := NewTestBlockedWriteClient()
m := NewQueueManager(metrics, nil, nil, logger, dir,
newEWMARate(ewmaWeight, shardUpdateDuration),
cfg, mcfg, nil, nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false)
m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
m.watcher.MaxSegment = segments[len(segments)-2]
err := m.watcher.Run()
require.NoError(b, err)
}
}
func TestProcessExternalLabels(t *testing.T) {
for _, tc := range []struct {
labels labels.Labels
externalLabels labels.Labels
expected labels.Labels
}{
// Test adding labels at the end.
{
labels: labels.Labels{{Name: "a", Value: "b"}},
externalLabels: labels.Labels{{Name: "c", Value: "d"}},
expected: labels.Labels{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}},
},
// Test adding labels at the beginning.
{
labels: labels.Labels{{Name: "c", Value: "d"}},
externalLabels: labels.Labels{{Name: "a", Value: "b"}},
expected: labels.Labels{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}},
},
// Test we don't override existing labels.
{
labels: labels.Labels{{Name: "a", Value: "b"}},
externalLabels: labels.Labels{{Name: "a", Value: "c"}},
expected: labels.Labels{{Name: "a", Value: "b"}},
},
// Test empty externalLabels.
{
labels: labels.Labels{{Name: "a", Value: "b"}},
externalLabels: labels.Labels{},
expected: labels.Labels{{Name: "a", Value: "b"}},
},
// Test empty labels.
{
labels: labels.Labels{},
externalLabels: labels.Labels{{Name: "a", Value: "b"}},
expected: labels.Labels{{Name: "a", Value: "b"}},
},
// Test labels is longer than externalLabels.
{
labels: labels.Labels{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}},
externalLabels: labels.Labels{{Name: "e", Value: "f"}},
expected: labels.Labels{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}, {Name: "e", Value: "f"}},
},
// Test externalLabels is longer than labels.
{
labels: labels.Labels{{Name: "c", Value: "d"}},
externalLabels: labels.Labels{{Name: "a", Value: "b"}, {Name: "e", Value: "f"}},
expected: labels.Labels{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}, {Name: "e", Value: "f"}},
},
} {
require.Equal(t, tc.expected, processExternalLabels(tc.labels, tc.externalLabels))
}
}
func TestCalculateDesiredShards(t *testing.T) {
c := NewTestWriteClient()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
dir, err := ioutil.TempDir("", "TestCalculateDesiredShards")
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(dir))
}()
metrics := newQueueManagerMetrics(nil, "", "")
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
// Need to start the queue manager so the proper metrics are initialized.
// However we can stop it right away since we don't need to do any actual
// processing.
m.Start()
m.Stop()
inputRate := int64(50000)
var pendingSamples int64
// Two minute startup, no samples are sent.
startedAt := time.Now().Add(-2 * time.Minute)
// helper function for adding samples.
addSamples := func(s int64, ts time.Duration) {
pendingSamples += s
samplesIn.incr(s)
samplesIn.tick()
m.highestRecvTimestamp.Set(float64(startedAt.Add(ts).Unix()))
}
// helper function for sending samples.
sendSamples := func(s int64, ts time.Duration) {
pendingSamples -= s
m.dataOut.incr(s)
m.dataOutDuration.incr(int64(m.numShards) * int64(shardUpdateDuration))
// highest sent is how far back pending samples would be at our input rate.
highestSent := startedAt.Add(ts - time.Duration(pendingSamples/inputRate)*time.Second)
m.metrics.highestSentTimestamp.Set(float64(highestSent.Unix()))
m.lastSendTimestamp.Store(time.Now().Unix())
}
ts := time.Duration(0)
for ; ts < 120*time.Second; ts += shardUpdateDuration {
addSamples(inputRate*int64(shardUpdateDuration/time.Second), ts)
m.numShards = m.calculateDesiredShards()
require.Equal(t, 1, m.numShards)
}
// Assume 100ms per request, or 10 requests per second per shard.
// Shard calculation should never drop below barely keeping up.
minShards := int(inputRate) / cfg.MaxSamplesPerSend / 10
// This test should never go above 200 shards, that would be more resources than needed.
maxShards := 200
for ; ts < 15*time.Minute; ts += shardUpdateDuration {
sin := inputRate * int64(shardUpdateDuration/time.Second)
addSamples(sin, ts)
sout := int64(m.numShards*cfg.MaxSamplesPerSend) * int64(shardUpdateDuration/(100*time.Millisecond))
// You can't send samples that don't exist so cap at the number of pending samples.
if sout > pendingSamples {
sout = pendingSamples
}
sendSamples(sout, ts)
t.Log("desiredShards", m.numShards, "pendingSamples", pendingSamples)
m.numShards = m.calculateDesiredShards()
require.GreaterOrEqual(t, m.numShards, minShards, "Shards are too low. desiredShards=%d, minShards=%d, t_seconds=%d", m.numShards, minShards, ts/time.Second)
require.LessOrEqual(t, m.numShards, maxShards, "Shards are too high. desiredShards=%d, maxShards=%d, t_seconds=%d", m.numShards, maxShards, ts/time.Second)
}
require.Equal(t, int64(0), pendingSamples, "Remote write never caught up, there are still %d pending samples.", pendingSamples)
}
func TestQueueManagerMetrics(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
metrics := newQueueManagerMetrics(reg, "name", "http://localhost:1234")
// Make sure metrics pass linting.
problems, err := client_testutil.GatherAndLint(reg)
require.NoError(t, err)
require.Equal(t, 0, len(problems), "Metric linting problems detected: %v", problems)
// Make sure all metrics were unregistered. A failure here means you need
// unregister a metric in `queueManagerMetrics.unregister()`.
metrics.unregister()
err = client_testutil.GatherAndCompare(reg, strings.NewReader(""))
require.NoError(t, err)
}