mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-13 14:57:40 -08:00
a09465baee
* storage/remote: disable resharding during active retry backoffs Today, remote_write reshards based on pure throughput. This is problematic if throughput has been diminished because of HTTP 429s; increasing the number of shards due to backpressure will only exacerbate the problem. This commit disables resharding for twice the retry backoff, ensuring that resharding will never occur during an active backoff, and that resharding does not become enabled again until enough time has elapsed to allow any pending requests to be retried. Signed-off-by: Robert Fratto <robertfratto@gmail.com> * storage/remote: test that resharding is disabled on retry Signed-off-by: Robert Fratto <robertfratto@gmail.com> * storage/remote: address review feedback Signed-off-by: Robert Fratto <robertfratto@gmail.com> * storage/remote: track time where resharding initially got disabled This change introduces a second atomic int64 to roughly track when resharding got disabled. This int64 is only updated after updating the disabled timestamp if resharding was previously enabled. Signed-off-by: Robert Fratto <robertfratto@gmail.com> --------- Signed-off-by: Robert Fratto <robertfratto@gmail.com>
1672 lines
48 KiB
Go
1672 lines
48 KiB
Go
// Copyright 2013 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package remote
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"os"
|
|
"runtime/pprof"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/go-kit/log"
|
|
"github.com/gogo/protobuf/proto"
|
|
"github.com/golang/snappy"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
client_testutil "github.com/prometheus/client_golang/prometheus/testutil"
|
|
"github.com/prometheus/common/model"
|
|
"github.com/stretchr/testify/require"
|
|
"go.uber.org/atomic"
|
|
|
|
"github.com/prometheus/prometheus/config"
|
|
"github.com/prometheus/prometheus/model/histogram"
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
"github.com/prometheus/prometheus/model/relabel"
|
|
"github.com/prometheus/prometheus/model/timestamp"
|
|
"github.com/prometheus/prometheus/prompb"
|
|
"github.com/prometheus/prometheus/scrape"
|
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
|
"github.com/prometheus/prometheus/tsdb/record"
|
|
"github.com/prometheus/prometheus/util/testutil"
|
|
)
|
|
|
|
const defaultFlushDeadline = 1 * time.Minute
|
|
|
|
func newHighestTimestampMetric() *maxTimestamp {
|
|
return &maxTimestamp{
|
|
Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
|
|
Namespace: namespace,
|
|
Subsystem: subsystem,
|
|
Name: "highest_timestamp_in_seconds",
|
|
Help: "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch.",
|
|
}),
|
|
}
|
|
}
|
|
|
|
func TestSampleDelivery(t *testing.T) {
|
|
testcases := []struct {
|
|
name string
|
|
samples bool
|
|
exemplars bool
|
|
histograms bool
|
|
floatHistograms bool
|
|
}{
|
|
{samples: true, exemplars: false, histograms: false, floatHistograms: false, name: "samples only"},
|
|
{samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "samples, exemplars, and histograms"},
|
|
{samples: false, exemplars: true, histograms: false, floatHistograms: false, name: "exemplars only"},
|
|
{samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only"},
|
|
{samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only"},
|
|
}
|
|
|
|
// Let's create an even number of send batches so we don't run into the
|
|
// batch timeout case.
|
|
n := 3
|
|
|
|
dir := t.TempDir()
|
|
|
|
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
|
|
defer s.Close()
|
|
|
|
queueConfig := config.DefaultQueueConfig
|
|
queueConfig.BatchSendDeadline = model.Duration(100 * time.Millisecond)
|
|
queueConfig.MaxShards = 1
|
|
|
|
// We need to set URL's so that metric creation doesn't panic.
|
|
writeConfig := baseRemoteWriteConfig("http://test-storage.com")
|
|
writeConfig.QueueConfig = queueConfig
|
|
writeConfig.SendExemplars = true
|
|
writeConfig.SendNativeHistograms = true
|
|
|
|
conf := &config.Config{
|
|
GlobalConfig: config.DefaultGlobalConfig,
|
|
RemoteWriteConfigs: []*config.RemoteWriteConfig{
|
|
writeConfig,
|
|
},
|
|
}
|
|
|
|
for _, tc := range testcases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
var (
|
|
series []record.RefSeries
|
|
samples []record.RefSample
|
|
exemplars []record.RefExemplar
|
|
histograms []record.RefHistogramSample
|
|
floatHistograms []record.RefFloatHistogramSample
|
|
)
|
|
|
|
// Generates same series in both cases.
|
|
if tc.samples {
|
|
samples, series = createTimeseries(n, n)
|
|
}
|
|
if tc.exemplars {
|
|
exemplars, series = createExemplars(n, n)
|
|
}
|
|
if tc.histograms {
|
|
histograms, _, series = createHistograms(n, n, false)
|
|
}
|
|
if tc.floatHistograms {
|
|
_, floatHistograms, series = createHistograms(n, n, true)
|
|
}
|
|
|
|
// Apply new config.
|
|
queueConfig.Capacity = len(samples)
|
|
queueConfig.MaxSamplesPerSend = len(samples) / 2
|
|
require.NoError(t, s.ApplyConfig(conf))
|
|
hash, err := toHash(writeConfig)
|
|
require.NoError(t, err)
|
|
qm := s.rws.queues[hash]
|
|
|
|
c := NewTestWriteClient()
|
|
qm.SetClient(c)
|
|
|
|
qm.StoreSeries(series, 0)
|
|
|
|
// Send first half of data.
|
|
c.expectSamples(samples[:len(samples)/2], series)
|
|
c.expectExemplars(exemplars[:len(exemplars)/2], series)
|
|
c.expectHistograms(histograms[:len(histograms)/2], series)
|
|
c.expectFloatHistograms(floatHistograms[:len(floatHistograms)/2], series)
|
|
qm.Append(samples[:len(samples)/2])
|
|
qm.AppendExemplars(exemplars[:len(exemplars)/2])
|
|
qm.AppendHistograms(histograms[:len(histograms)/2])
|
|
qm.AppendFloatHistograms(floatHistograms[:len(floatHistograms)/2])
|
|
c.waitForExpectedData(t)
|
|
|
|
// Send second half of data.
|
|
c.expectSamples(samples[len(samples)/2:], series)
|
|
c.expectExemplars(exemplars[len(exemplars)/2:], series)
|
|
c.expectHistograms(histograms[len(histograms)/2:], series)
|
|
c.expectFloatHistograms(floatHistograms[len(floatHistograms)/2:], series)
|
|
qm.Append(samples[len(samples)/2:])
|
|
qm.AppendExemplars(exemplars[len(exemplars)/2:])
|
|
qm.AppendHistograms(histograms[len(histograms)/2:])
|
|
qm.AppendFloatHistograms(floatHistograms[len(floatHistograms)/2:])
|
|
c.waitForExpectedData(t)
|
|
})
|
|
}
|
|
}
|
|
|
|
func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration) (*TestWriteClient, *QueueManager) {
|
|
c := NewTestWriteClient()
|
|
cfg := config.DefaultQueueConfig
|
|
mcfg := config.DefaultMetadataConfig
|
|
return c, newTestQueueManager(t, cfg, mcfg, flushDeadline, c)
|
|
}
|
|
|
|
func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient) *QueueManager {
|
|
dir := t.TempDir()
|
|
metrics := newQueueManagerMetrics(nil, "", "")
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false)
|
|
|
|
return m
|
|
}
|
|
|
|
func TestMetadataDelivery(t *testing.T) {
|
|
c, m := newTestClientAndQueueManager(t, defaultFlushDeadline)
|
|
m.Start()
|
|
defer m.Stop()
|
|
|
|
metadata := []scrape.MetricMetadata{}
|
|
numMetadata := 1532
|
|
for i := 0; i < numMetadata; i++ {
|
|
metadata = append(metadata, scrape.MetricMetadata{
|
|
Metric: "prometheus_remote_storage_sent_metadata_bytes_total_" + strconv.Itoa(i),
|
|
Type: model.MetricTypeCounter,
|
|
Help: "a nice help text",
|
|
Unit: "",
|
|
})
|
|
}
|
|
|
|
m.AppendMetadata(context.Background(), metadata)
|
|
|
|
require.Len(t, c.receivedMetadata, numMetadata)
|
|
// One more write than the rounded qoutient should be performed in order to get samples that didn't
|
|
// fit into MaxSamplesPerSend.
|
|
require.Equal(t, numMetadata/config.DefaultMetadataConfig.MaxSamplesPerSend+1, c.writesReceived)
|
|
// Make sure the last samples were sent.
|
|
require.Equal(t, c.receivedMetadata[metadata[len(metadata)-1].Metric][0].MetricFamilyName, metadata[len(metadata)-1].Metric)
|
|
}
|
|
|
|
func TestSampleDeliveryTimeout(t *testing.T) {
|
|
// Let's send one less sample than batch size, and wait the timeout duration
|
|
n := 9
|
|
samples, series := createTimeseries(n, n)
|
|
|
|
cfg := config.DefaultQueueConfig
|
|
cfg.MaxShards = 1
|
|
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
|
|
|
|
c := NewTestWriteClient()
|
|
m := newTestQueueManager(t, cfg, config.DefaultMetadataConfig, defaultFlushDeadline, c)
|
|
m.StoreSeries(series, 0)
|
|
m.Start()
|
|
defer m.Stop()
|
|
|
|
// Send the samples twice, waiting for the samples in the meantime.
|
|
c.expectSamples(samples, series)
|
|
m.Append(samples)
|
|
c.waitForExpectedData(t)
|
|
|
|
c.expectSamples(samples, series)
|
|
m.Append(samples)
|
|
c.waitForExpectedData(t)
|
|
}
|
|
|
|
func TestSampleDeliveryOrder(t *testing.T) {
|
|
ts := 10
|
|
n := config.DefaultQueueConfig.MaxSamplesPerSend * ts
|
|
samples := make([]record.RefSample, 0, n)
|
|
series := make([]record.RefSeries, 0, n)
|
|
for i := 0; i < n; i++ {
|
|
name := fmt.Sprintf("test_metric_%d", i%ts)
|
|
samples = append(samples, record.RefSample{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
T: int64(i),
|
|
V: float64(i),
|
|
})
|
|
series = append(series, record.RefSeries{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
Labels: labels.FromStrings("__name__", name),
|
|
})
|
|
}
|
|
|
|
c, m := newTestClientAndQueueManager(t, defaultFlushDeadline)
|
|
c.expectSamples(samples, series)
|
|
m.StoreSeries(series, 0)
|
|
|
|
m.Start()
|
|
defer m.Stop()
|
|
// These should be received by the client.
|
|
m.Append(samples)
|
|
c.waitForExpectedData(t)
|
|
}
|
|
|
|
func TestShutdown(t *testing.T) {
|
|
deadline := 1 * time.Second
|
|
c := NewTestBlockedWriteClient()
|
|
|
|
cfg := config.DefaultQueueConfig
|
|
mcfg := config.DefaultMetadataConfig
|
|
|
|
m := newTestQueueManager(t, cfg, mcfg, deadline, c)
|
|
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
|
|
samples, series := createTimeseries(n, n)
|
|
m.StoreSeries(series, 0)
|
|
m.Start()
|
|
|
|
// Append blocks to guarantee delivery, so we do it in the background.
|
|
go func() {
|
|
m.Append(samples)
|
|
}()
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// Test to ensure that Stop doesn't block.
|
|
start := time.Now()
|
|
m.Stop()
|
|
// The samples will never be delivered, so duration should
|
|
// be at least equal to deadline, otherwise the flush deadline
|
|
// was not respected.
|
|
duration := time.Since(start)
|
|
if duration > deadline+(deadline/10) {
|
|
t.Errorf("Took too long to shutdown: %s > %s", duration, deadline)
|
|
}
|
|
if duration < deadline {
|
|
t.Errorf("Shutdown occurred before flush deadline: %s < %s", duration, deadline)
|
|
}
|
|
}
|
|
|
|
func TestSeriesReset(t *testing.T) {
|
|
c := NewTestBlockedWriteClient()
|
|
deadline := 5 * time.Second
|
|
numSegments := 4
|
|
numSeries := 25
|
|
|
|
cfg := config.DefaultQueueConfig
|
|
mcfg := config.DefaultMetadataConfig
|
|
m := newTestQueueManager(t, cfg, mcfg, deadline, c)
|
|
|
|
for i := 0; i < numSegments; i++ {
|
|
series := []record.RefSeries{}
|
|
for j := 0; j < numSeries; j++ {
|
|
series = append(series, record.RefSeries{Ref: chunks.HeadSeriesRef((i * 100) + j), Labels: labels.FromStrings("a", "a")})
|
|
}
|
|
m.StoreSeries(series, i)
|
|
}
|
|
require.Len(t, m.seriesLabels, numSegments*numSeries)
|
|
m.SeriesReset(2)
|
|
require.Len(t, m.seriesLabels, numSegments*numSeries/2)
|
|
}
|
|
|
|
func TestReshard(t *testing.T) {
|
|
size := 10 // Make bigger to find more races.
|
|
nSeries := 6
|
|
nSamples := config.DefaultQueueConfig.Capacity * size
|
|
samples, series := createTimeseries(nSamples, nSeries)
|
|
|
|
cfg := config.DefaultQueueConfig
|
|
cfg.MaxShards = 1
|
|
|
|
c := NewTestWriteClient()
|
|
m := newTestQueueManager(t, cfg, config.DefaultMetadataConfig, defaultFlushDeadline, c)
|
|
c.expectSamples(samples, series)
|
|
m.StoreSeries(series, 0)
|
|
|
|
m.Start()
|
|
defer m.Stop()
|
|
|
|
go func() {
|
|
for i := 0; i < len(samples); i += config.DefaultQueueConfig.Capacity {
|
|
sent := m.Append(samples[i : i+config.DefaultQueueConfig.Capacity])
|
|
require.True(t, sent, "samples not sent")
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
}()
|
|
|
|
for i := 1; i < len(samples)/config.DefaultQueueConfig.Capacity; i++ {
|
|
m.shards.stop()
|
|
m.shards.start(i)
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
|
|
c.waitForExpectedData(t)
|
|
}
|
|
|
|
func TestReshardRaceWithStop(t *testing.T) {
|
|
c := NewTestWriteClient()
|
|
var m *QueueManager
|
|
h := sync.Mutex{}
|
|
|
|
h.Lock()
|
|
|
|
cfg := config.DefaultQueueConfig
|
|
mcfg := config.DefaultMetadataConfig
|
|
exitCh := make(chan struct{})
|
|
go func() {
|
|
for {
|
|
m = newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c)
|
|
m.Start()
|
|
h.Unlock()
|
|
h.Lock()
|
|
m.Stop()
|
|
|
|
select {
|
|
case exitCh <- struct{}{}:
|
|
return
|
|
default:
|
|
}
|
|
}
|
|
}()
|
|
|
|
for i := 1; i < 100; i++ {
|
|
h.Lock()
|
|
m.reshardChan <- i
|
|
h.Unlock()
|
|
}
|
|
<-exitCh
|
|
}
|
|
|
|
func TestReshardPartialBatch(t *testing.T) {
|
|
samples, series := createTimeseries(1, 10)
|
|
|
|
c := NewTestBlockedWriteClient()
|
|
|
|
cfg := config.DefaultQueueConfig
|
|
mcfg := config.DefaultMetadataConfig
|
|
cfg.MaxShards = 1
|
|
batchSendDeadline := time.Millisecond
|
|
flushDeadline := 10 * time.Millisecond
|
|
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
|
|
|
|
m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c)
|
|
m.StoreSeries(series, 0)
|
|
|
|
m.Start()
|
|
|
|
for i := 0; i < 100; i++ {
|
|
done := make(chan struct{})
|
|
go func() {
|
|
m.Append(samples)
|
|
time.Sleep(batchSendDeadline)
|
|
m.shards.stop()
|
|
m.shards.start(1)
|
|
done <- struct{}{}
|
|
}()
|
|
select {
|
|
case <-done:
|
|
case <-time.After(2 * time.Second):
|
|
t.Error("Deadlock between sending and stopping detected")
|
|
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
|
t.FailNow()
|
|
}
|
|
}
|
|
// We can only call stop if there was not a deadlock.
|
|
m.Stop()
|
|
}
|
|
|
|
// TestQueueFilledDeadlock makes sure the code does not deadlock in the case
|
|
// where a large scrape (> capacity + max samples per send) is appended at the
|
|
// same time as a batch times out according to the batch send deadline.
|
|
func TestQueueFilledDeadlock(t *testing.T) {
|
|
samples, series := createTimeseries(50, 1)
|
|
|
|
c := NewNopWriteClient()
|
|
|
|
cfg := config.DefaultQueueConfig
|
|
mcfg := config.DefaultMetadataConfig
|
|
cfg.MaxShards = 1
|
|
cfg.MaxSamplesPerSend = 10
|
|
cfg.Capacity = 20
|
|
flushDeadline := time.Second
|
|
batchSendDeadline := time.Millisecond
|
|
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
|
|
|
|
m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c)
|
|
m.StoreSeries(series, 0)
|
|
m.Start()
|
|
defer m.Stop()
|
|
|
|
for i := 0; i < 100; i++ {
|
|
done := make(chan struct{})
|
|
go func() {
|
|
time.Sleep(batchSendDeadline)
|
|
m.Append(samples)
|
|
done <- struct{}{}
|
|
}()
|
|
select {
|
|
case <-done:
|
|
case <-time.After(2 * time.Second):
|
|
t.Error("Deadlock between sending and appending detected")
|
|
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
|
t.FailNow()
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestReleaseNoninternedString(t *testing.T) {
|
|
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline)
|
|
m.Start()
|
|
defer m.Stop()
|
|
|
|
for i := 1; i < 1000; i++ {
|
|
m.StoreSeries([]record.RefSeries{
|
|
{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
Labels: labels.FromStrings("asdf", fmt.Sprintf("%d", i)),
|
|
},
|
|
}, 0)
|
|
m.SeriesReset(1)
|
|
}
|
|
|
|
metric := client_testutil.ToFloat64(noReferenceReleases)
|
|
require.Equal(t, 0.0, metric, "expected there to be no calls to release for strings that were not already interned: %d", int(metric))
|
|
}
|
|
|
|
func TestShouldReshard(t *testing.T) {
|
|
type testcase struct {
|
|
startingShards int
|
|
samplesIn, samplesOut, lastSendTimestamp int64
|
|
expectedToReshard bool
|
|
}
|
|
cases := []testcase{
|
|
{
|
|
// Resharding shouldn't take place if the last successful send was > batch send deadline*2 seconds ago.
|
|
startingShards: 10,
|
|
samplesIn: 1000,
|
|
samplesOut: 10,
|
|
lastSendTimestamp: time.Now().Unix() - int64(3*time.Duration(config.DefaultQueueConfig.BatchSendDeadline)/time.Second),
|
|
expectedToReshard: false,
|
|
},
|
|
{
|
|
startingShards: 5,
|
|
samplesIn: 1000,
|
|
samplesOut: 10,
|
|
lastSendTimestamp: time.Now().Unix(),
|
|
expectedToReshard: true,
|
|
},
|
|
}
|
|
|
|
for _, c := range cases {
|
|
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline)
|
|
m.numShards = c.startingShards
|
|
m.dataIn.incr(c.samplesIn)
|
|
m.dataOut.incr(c.samplesOut)
|
|
m.lastSendTimestamp.Store(c.lastSendTimestamp)
|
|
|
|
m.Start()
|
|
|
|
desiredShards := m.calculateDesiredShards()
|
|
shouldReshard := m.shouldReshard(desiredShards)
|
|
|
|
m.Stop()
|
|
|
|
require.Equal(t, c.expectedToReshard, shouldReshard)
|
|
}
|
|
}
|
|
|
|
// TestDisableReshardOnRetry asserts that resharding should be disabled when a
|
|
// recoverable error is returned from remote_write.
|
|
func TestDisableReshardOnRetry(t *testing.T) {
|
|
onStoredContext, onStoreCalled := context.WithCancel(context.Background())
|
|
defer onStoreCalled()
|
|
|
|
var (
|
|
fakeSamples, fakeSeries = createTimeseries(100, 100)
|
|
|
|
cfg = config.DefaultQueueConfig
|
|
mcfg = config.DefaultMetadataConfig
|
|
retryAfter = time.Second
|
|
|
|
metrics = newQueueManagerMetrics(nil, "", "")
|
|
|
|
client = &MockWriteClient{
|
|
StoreFunc: func(ctx context.Context, b []byte, i int) error {
|
|
onStoreCalled()
|
|
|
|
return RecoverableError{
|
|
error: fmt.Errorf("fake error"),
|
|
retryAfter: model.Duration(retryAfter),
|
|
}
|
|
},
|
|
NameFunc: func() string { return "mock" },
|
|
EndpointFunc: func() string { return "http://fake:9090/api/v1/write" },
|
|
}
|
|
)
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false)
|
|
m.StoreSeries(fakeSeries, 0)
|
|
|
|
// Attempt to samples while the manager is running. We immediately stop the
|
|
// manager after the recoverable error is generated to prevent the manager
|
|
// from resharding itself.
|
|
m.Start()
|
|
{
|
|
m.Append(fakeSamples)
|
|
|
|
select {
|
|
case <-onStoredContext.Done():
|
|
case <-time.After(time.Minute):
|
|
require.FailNow(t, "timed out waiting for client to be sent metrics")
|
|
}
|
|
}
|
|
m.Stop()
|
|
|
|
require.Eventually(t, func() bool {
|
|
// Force m.lastSendTimestamp to be current so the last send timestamp isn't
|
|
// the reason resharding is disabled.
|
|
m.lastSendTimestamp.Store(time.Now().Unix())
|
|
return m.shouldReshard(m.numShards+1) == false
|
|
}, time.Minute, 10*time.Millisecond, "shouldReshard was never disabled")
|
|
|
|
// After 2x retryAfter, resharding should be enabled again.
|
|
require.Eventually(t, func() bool {
|
|
// Force m.lastSendTimestamp to be current so the last send timestamp isn't
|
|
// the reason resharding is disabled.
|
|
m.lastSendTimestamp.Store(time.Now().Unix())
|
|
return m.shouldReshard(m.numShards+1) == true
|
|
}, time.Minute, retryAfter, "shouldReshard should have been re-enabled")
|
|
}
|
|
|
|
func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSeries) {
|
|
samples := make([]record.RefSample, 0, numSamples)
|
|
series := make([]record.RefSeries, 0, numSeries)
|
|
lb := labels.NewScratchBuilder(1 + len(extraLabels))
|
|
for i := 0; i < numSeries; i++ {
|
|
name := fmt.Sprintf("test_metric_%d", i)
|
|
for j := 0; j < numSamples; j++ {
|
|
samples = append(samples, record.RefSample{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
T: int64(j),
|
|
V: float64(i),
|
|
})
|
|
}
|
|
// Create Labels that is name of series plus any extra labels supplied.
|
|
lb.Reset()
|
|
lb.Add(labels.MetricName, name)
|
|
for _, l := range extraLabels {
|
|
lb.Add(l.Name, l.Value)
|
|
}
|
|
lb.Sort()
|
|
series = append(series, record.RefSeries{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
Labels: lb.Labels(),
|
|
})
|
|
}
|
|
return samples, series
|
|
}
|
|
|
|
func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []record.RefSeries) {
|
|
exemplars := make([]record.RefExemplar, 0, numExemplars)
|
|
series := make([]record.RefSeries, 0, numSeries)
|
|
for i := 0; i < numSeries; i++ {
|
|
name := fmt.Sprintf("test_metric_%d", i)
|
|
for j := 0; j < numExemplars; j++ {
|
|
e := record.RefExemplar{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
T: int64(j),
|
|
V: float64(i),
|
|
Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", i)),
|
|
}
|
|
exemplars = append(exemplars, e)
|
|
}
|
|
series = append(series, record.RefSeries{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
Labels: labels.FromStrings("__name__", name),
|
|
})
|
|
}
|
|
return exemplars, series
|
|
}
|
|
|
|
func createHistograms(numSamples, numSeries int, floatHistogram bool) ([]record.RefHistogramSample, []record.RefFloatHistogramSample, []record.RefSeries) {
|
|
histograms := make([]record.RefHistogramSample, 0, numSamples)
|
|
floatHistograms := make([]record.RefFloatHistogramSample, 0, numSamples)
|
|
series := make([]record.RefSeries, 0, numSeries)
|
|
for i := 0; i < numSeries; i++ {
|
|
name := fmt.Sprintf("test_metric_%d", i)
|
|
for j := 0; j < numSamples; j++ {
|
|
hist := &histogram.Histogram{
|
|
Schema: 2,
|
|
ZeroThreshold: 1e-128,
|
|
ZeroCount: 0,
|
|
Count: 2,
|
|
Sum: 0,
|
|
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}},
|
|
PositiveBuckets: []int64{int64(i) + 1},
|
|
NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}},
|
|
NegativeBuckets: []int64{int64(-i) - 1},
|
|
}
|
|
|
|
if floatHistogram {
|
|
fh := record.RefFloatHistogramSample{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
T: int64(j),
|
|
FH: hist.ToFloat(nil),
|
|
}
|
|
floatHistograms = append(floatHistograms, fh)
|
|
} else {
|
|
h := record.RefHistogramSample{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
T: int64(j),
|
|
H: hist,
|
|
}
|
|
histograms = append(histograms, h)
|
|
}
|
|
}
|
|
series = append(series, record.RefSeries{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
Labels: labels.FromStrings("__name__", name),
|
|
})
|
|
}
|
|
if floatHistogram {
|
|
return nil, floatHistograms, series
|
|
}
|
|
return histograms, nil, series
|
|
}
|
|
|
|
func getSeriesNameFromRef(r record.RefSeries) string {
|
|
return r.Labels.Get("__name__")
|
|
}
|
|
|
|
type TestWriteClient struct {
|
|
receivedSamples map[string][]prompb.Sample
|
|
expectedSamples map[string][]prompb.Sample
|
|
receivedExemplars map[string][]prompb.Exemplar
|
|
expectedExemplars map[string][]prompb.Exemplar
|
|
receivedHistograms map[string][]prompb.Histogram
|
|
receivedFloatHistograms map[string][]prompb.Histogram
|
|
expectedHistograms map[string][]prompb.Histogram
|
|
expectedFloatHistograms map[string][]prompb.Histogram
|
|
receivedMetadata map[string][]prompb.MetricMetadata
|
|
writesReceived int
|
|
withWaitGroup bool
|
|
wg sync.WaitGroup
|
|
mtx sync.Mutex
|
|
buf []byte
|
|
}
|
|
|
|
func NewTestWriteClient() *TestWriteClient {
|
|
return &TestWriteClient{
|
|
withWaitGroup: true,
|
|
receivedSamples: map[string][]prompb.Sample{},
|
|
expectedSamples: map[string][]prompb.Sample{},
|
|
receivedMetadata: map[string][]prompb.MetricMetadata{},
|
|
}
|
|
}
|
|
|
|
func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.RefSeries) {
|
|
if !c.withWaitGroup {
|
|
return
|
|
}
|
|
c.mtx.Lock()
|
|
defer c.mtx.Unlock()
|
|
|
|
c.expectedSamples = map[string][]prompb.Sample{}
|
|
c.receivedSamples = map[string][]prompb.Sample{}
|
|
|
|
for _, s := range ss {
|
|
seriesName := getSeriesNameFromRef(series[s.Ref])
|
|
c.expectedSamples[seriesName] = append(c.expectedSamples[seriesName], prompb.Sample{
|
|
Timestamp: s.T,
|
|
Value: s.V,
|
|
})
|
|
}
|
|
c.wg.Add(len(ss))
|
|
}
|
|
|
|
func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []record.RefSeries) {
|
|
if !c.withWaitGroup {
|
|
return
|
|
}
|
|
c.mtx.Lock()
|
|
defer c.mtx.Unlock()
|
|
|
|
c.expectedExemplars = map[string][]prompb.Exemplar{}
|
|
c.receivedExemplars = map[string][]prompb.Exemplar{}
|
|
|
|
for _, s := range ss {
|
|
seriesName := getSeriesNameFromRef(series[s.Ref])
|
|
e := prompb.Exemplar{
|
|
Labels: labelsToLabelsProto(s.Labels, nil),
|
|
Timestamp: s.T,
|
|
Value: s.V,
|
|
}
|
|
c.expectedExemplars[seriesName] = append(c.expectedExemplars[seriesName], e)
|
|
}
|
|
c.wg.Add(len(ss))
|
|
}
|
|
|
|
func (c *TestWriteClient) expectHistograms(hh []record.RefHistogramSample, series []record.RefSeries) {
|
|
if !c.withWaitGroup {
|
|
return
|
|
}
|
|
c.mtx.Lock()
|
|
defer c.mtx.Unlock()
|
|
|
|
c.expectedHistograms = map[string][]prompb.Histogram{}
|
|
c.receivedHistograms = map[string][]prompb.Histogram{}
|
|
|
|
for _, h := range hh {
|
|
seriesName := getSeriesNameFromRef(series[h.Ref])
|
|
c.expectedHistograms[seriesName] = append(c.expectedHistograms[seriesName], HistogramToHistogramProto(h.T, h.H))
|
|
}
|
|
c.wg.Add(len(hh))
|
|
}
|
|
|
|
func (c *TestWriteClient) expectFloatHistograms(fhs []record.RefFloatHistogramSample, series []record.RefSeries) {
|
|
if !c.withWaitGroup {
|
|
return
|
|
}
|
|
c.mtx.Lock()
|
|
defer c.mtx.Unlock()
|
|
|
|
c.expectedFloatHistograms = map[string][]prompb.Histogram{}
|
|
c.receivedFloatHistograms = map[string][]prompb.Histogram{}
|
|
|
|
for _, fh := range fhs {
|
|
seriesName := getSeriesNameFromRef(series[fh.Ref])
|
|
c.expectedFloatHistograms[seriesName] = append(c.expectedFloatHistograms[seriesName], FloatHistogramToHistogramProto(fh.T, fh.FH))
|
|
}
|
|
c.wg.Add(len(fhs))
|
|
}
|
|
|
|
func (c *TestWriteClient) waitForExpectedData(tb testing.TB) {
|
|
if !c.withWaitGroup {
|
|
return
|
|
}
|
|
c.wg.Wait()
|
|
c.mtx.Lock()
|
|
defer c.mtx.Unlock()
|
|
for ts, expectedSamples := range c.expectedSamples {
|
|
require.Equal(tb, expectedSamples, c.receivedSamples[ts], ts)
|
|
}
|
|
for ts, expectedExemplar := range c.expectedExemplars {
|
|
require.Equal(tb, expectedExemplar, c.receivedExemplars[ts], ts)
|
|
}
|
|
for ts, expectedHistogram := range c.expectedHistograms {
|
|
require.Equal(tb, expectedHistogram, c.receivedHistograms[ts], ts)
|
|
}
|
|
for ts, expectedFloatHistogram := range c.expectedFloatHistograms {
|
|
require.Equal(tb, expectedFloatHistogram, c.receivedFloatHistograms[ts], ts)
|
|
}
|
|
}
|
|
|
|
func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
|
|
c.mtx.Lock()
|
|
defer c.mtx.Unlock()
|
|
// nil buffers are ok for snappy, ignore cast error.
|
|
if c.buf != nil {
|
|
c.buf = c.buf[:cap(c.buf)]
|
|
}
|
|
reqBuf, err := snappy.Decode(c.buf, req)
|
|
c.buf = reqBuf
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var reqProto prompb.WriteRequest
|
|
if err := proto.Unmarshal(reqBuf, &reqProto); err != nil {
|
|
return err
|
|
}
|
|
builder := labels.NewScratchBuilder(0)
|
|
count := 0
|
|
for _, ts := range reqProto.Timeseries {
|
|
labels := labelProtosToLabels(&builder, ts.Labels)
|
|
seriesName := labels.Get("__name__")
|
|
for _, sample := range ts.Samples {
|
|
count++
|
|
c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample)
|
|
}
|
|
|
|
for _, ex := range ts.Exemplars {
|
|
count++
|
|
c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], ex)
|
|
}
|
|
|
|
for _, histogram := range ts.Histograms {
|
|
count++
|
|
if histogram.IsFloatHistogram() {
|
|
c.receivedFloatHistograms[seriesName] = append(c.receivedFloatHistograms[seriesName], histogram)
|
|
} else {
|
|
c.receivedHistograms[seriesName] = append(c.receivedHistograms[seriesName], histogram)
|
|
}
|
|
|
|
}
|
|
}
|
|
if c.withWaitGroup {
|
|
c.wg.Add(-count)
|
|
}
|
|
|
|
for _, m := range reqProto.Metadata {
|
|
c.receivedMetadata[m.MetricFamilyName] = append(c.receivedMetadata[m.MetricFamilyName], m)
|
|
}
|
|
|
|
c.writesReceived++
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *TestWriteClient) Name() string {
|
|
return "testwriteclient"
|
|
}
|
|
|
|
func (c *TestWriteClient) Endpoint() string {
|
|
return "http://test-remote.com/1234"
|
|
}
|
|
|
|
// TestBlockingWriteClient is a queue_manager WriteClient which will block
|
|
// on any calls to Store(), until the request's Context is cancelled, at which
|
|
// point the `numCalls` property will contain a count of how many times Store()
|
|
// was called.
|
|
type TestBlockingWriteClient struct {
|
|
numCalls atomic.Uint64
|
|
}
|
|
|
|
func NewTestBlockedWriteClient() *TestBlockingWriteClient {
|
|
return &TestBlockingWriteClient{}
|
|
}
|
|
|
|
func (c *TestBlockingWriteClient) Store(ctx context.Context, _ []byte, _ int) error {
|
|
c.numCalls.Inc()
|
|
<-ctx.Done()
|
|
return nil
|
|
}
|
|
|
|
func (c *TestBlockingWriteClient) NumCalls() uint64 {
|
|
return c.numCalls.Load()
|
|
}
|
|
|
|
func (c *TestBlockingWriteClient) Name() string {
|
|
return "testblockingwriteclient"
|
|
}
|
|
|
|
func (c *TestBlockingWriteClient) Endpoint() string {
|
|
return "http://test-remote-blocking.com/1234"
|
|
}
|
|
|
|
// For benchmarking the send and not the receive side.
|
|
type NopWriteClient struct{}
|
|
|
|
func NewNopWriteClient() *NopWriteClient { return &NopWriteClient{} }
|
|
func (c *NopWriteClient) Store(context.Context, []byte, int) error { return nil }
|
|
func (c *NopWriteClient) Name() string { return "nopwriteclient" }
|
|
func (c *NopWriteClient) Endpoint() string { return "http://test-remote.com/1234" }
|
|
|
|
type MockWriteClient struct {
|
|
StoreFunc func(context.Context, []byte, int) error
|
|
NameFunc func() string
|
|
EndpointFunc func() string
|
|
}
|
|
|
|
func (c *MockWriteClient) Store(ctx context.Context, bb []byte, n int) error {
|
|
return c.StoreFunc(ctx, bb, n)
|
|
}
|
|
func (c *MockWriteClient) Name() string { return c.NameFunc() }
|
|
func (c *MockWriteClient) Endpoint() string { return c.EndpointFunc() }
|
|
|
|
// Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics.
|
|
var extraLabels []labels.Label = []labels.Label{
|
|
{Name: "kubernetes_io_arch", Value: "amd64"},
|
|
{Name: "kubernetes_io_instance_type", Value: "c3.somesize"},
|
|
{Name: "kubernetes_io_os", Value: "linux"},
|
|
{Name: "container_name", Value: "some-name"},
|
|
{Name: "failure_domain_kubernetes_io_region", Value: "somewhere-1"},
|
|
{Name: "failure_domain_kubernetes_io_zone", Value: "somewhere-1b"},
|
|
{Name: "id", Value: "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28"},
|
|
{Name: "image", Value: "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506"},
|
|
{Name: "instance", Value: "ip-111-11-1-11.ec2.internal"},
|
|
{Name: "job", Value: "kubernetes-cadvisor"},
|
|
{Name: "kubernetes_io_hostname", Value: "ip-111-11-1-11"},
|
|
{Name: "monitor", Value: "prod"},
|
|
{Name: "name", Value: "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0"},
|
|
{Name: "namespace", Value: "kube-system"},
|
|
{Name: "pod_name", Value: "some-other-name-5j8s8"},
|
|
}
|
|
|
|
func BenchmarkSampleSend(b *testing.B) {
|
|
// Send one sample per series, which is the typical remote_write case
|
|
const numSamples = 1
|
|
const numSeries = 10000
|
|
|
|
samples, series := createTimeseries(numSamples, numSeries, extraLabels...)
|
|
|
|
c := NewNopWriteClient()
|
|
|
|
cfg := config.DefaultQueueConfig
|
|
mcfg := config.DefaultMetadataConfig
|
|
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
|
|
cfg.MinShards = 20
|
|
cfg.MaxShards = 20
|
|
|
|
m := newTestQueueManager(b, cfg, mcfg, defaultFlushDeadline, c)
|
|
m.StoreSeries(series, 0)
|
|
|
|
// These should be received by the client.
|
|
m.Start()
|
|
defer m.Stop()
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
m.Append(samples)
|
|
m.UpdateSeriesSegment(series, i+1) // simulate what wlog.Watcher.garbageCollectSeries does
|
|
m.SeriesReset(i + 1)
|
|
}
|
|
// Do not include shutdown
|
|
b.StopTimer()
|
|
}
|
|
|
|
// Check how long it takes to add N series, including external labels processing.
|
|
func BenchmarkStoreSeries(b *testing.B) {
|
|
externalLabels := []labels.Label{
|
|
{Name: "cluster", Value: "mycluster"},
|
|
{Name: "replica", Value: "1"},
|
|
}
|
|
relabelConfigs := []*relabel.Config{{
|
|
SourceLabels: model.LabelNames{"namespace"},
|
|
Separator: ";",
|
|
Regex: relabel.MustNewRegexp("kube.*"),
|
|
TargetLabel: "job",
|
|
Replacement: "$1",
|
|
Action: relabel.Replace,
|
|
}}
|
|
testCases := []struct {
|
|
name string
|
|
externalLabels []labels.Label
|
|
ts []prompb.TimeSeries
|
|
relabelConfigs []*relabel.Config
|
|
}{
|
|
{name: "plain"},
|
|
{name: "externalLabels", externalLabels: externalLabels},
|
|
{name: "relabel", relabelConfigs: relabelConfigs},
|
|
{
|
|
name: "externalLabels+relabel",
|
|
externalLabels: externalLabels,
|
|
relabelConfigs: relabelConfigs,
|
|
},
|
|
}
|
|
|
|
// numSeries chosen to be big enough that StoreSeries dominates creating a new queue manager.
|
|
const numSeries = 1000
|
|
_, series := createTimeseries(0, numSeries, extraLabels...)
|
|
|
|
for _, tc := range testCases {
|
|
b.Run(tc.name, func(b *testing.B) {
|
|
for i := 0; i < b.N; i++ {
|
|
c := NewTestWriteClient()
|
|
dir := b.TempDir()
|
|
cfg := config.DefaultQueueConfig
|
|
mcfg := config.DefaultMetadataConfig
|
|
metrics := newQueueManagerMetrics(nil, "", "")
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
|
|
m.externalLabels = tc.externalLabels
|
|
m.relabelConfigs = tc.relabelConfigs
|
|
|
|
m.StoreSeries(series, 0)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func BenchmarkStartup(b *testing.B) {
|
|
dir := os.Getenv("WALDIR")
|
|
if dir == "" {
|
|
b.Skip("WALDIR env var not set")
|
|
}
|
|
|
|
// Find the second largest segment; we will replay up to this.
|
|
// (Second largest as WALWatcher will start tailing the largest).
|
|
dirents, err := os.ReadDir(dir)
|
|
require.NoError(b, err)
|
|
|
|
var segments []int
|
|
for _, dirent := range dirents {
|
|
if i, err := strconv.Atoi(dirent.Name()); err != nil {
|
|
segments = append(segments, i)
|
|
}
|
|
}
|
|
sort.Ints(segments)
|
|
|
|
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout))
|
|
logger = log.With(logger, "caller", log.DefaultCaller)
|
|
|
|
cfg := config.DefaultQueueConfig
|
|
mcfg := config.DefaultMetadataConfig
|
|
for n := 0; n < b.N; n++ {
|
|
metrics := newQueueManagerMetrics(nil, "", "")
|
|
c := NewTestBlockedWriteClient()
|
|
m := NewQueueManager(metrics, nil, nil, logger, dir,
|
|
newEWMARate(ewmaWeight, shardUpdateDuration),
|
|
cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false)
|
|
m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
|
|
m.watcher.MaxSegment = segments[len(segments)-2]
|
|
err := m.watcher.Run()
|
|
require.NoError(b, err)
|
|
}
|
|
}
|
|
|
|
func TestProcessExternalLabels(t *testing.T) {
|
|
b := labels.NewBuilder(labels.EmptyLabels())
|
|
for i, tc := range []struct {
|
|
labels labels.Labels
|
|
externalLabels []labels.Label
|
|
expected labels.Labels
|
|
}{
|
|
// Test adding labels at the end.
|
|
{
|
|
labels: labels.FromStrings("a", "b"),
|
|
externalLabels: []labels.Label{{Name: "c", Value: "d"}},
|
|
expected: labels.FromStrings("a", "b", "c", "d"),
|
|
},
|
|
|
|
// Test adding labels at the beginning.
|
|
{
|
|
labels: labels.FromStrings("c", "d"),
|
|
externalLabels: []labels.Label{{Name: "a", Value: "b"}},
|
|
expected: labels.FromStrings("a", "b", "c", "d"),
|
|
},
|
|
|
|
// Test we don't override existing labels.
|
|
{
|
|
labels: labels.FromStrings("a", "b"),
|
|
externalLabels: []labels.Label{{Name: "a", Value: "c"}},
|
|
expected: labels.FromStrings("a", "b"),
|
|
},
|
|
|
|
// Test empty externalLabels.
|
|
{
|
|
labels: labels.FromStrings("a", "b"),
|
|
externalLabels: []labels.Label{},
|
|
expected: labels.FromStrings("a", "b"),
|
|
},
|
|
|
|
// Test empty labels.
|
|
{
|
|
labels: labels.EmptyLabels(),
|
|
externalLabels: []labels.Label{{Name: "a", Value: "b"}},
|
|
expected: labels.FromStrings("a", "b"),
|
|
},
|
|
|
|
// Test labels is longer than externalLabels.
|
|
{
|
|
labels: labels.FromStrings("a", "b", "c", "d"),
|
|
externalLabels: []labels.Label{{Name: "e", Value: "f"}},
|
|
expected: labels.FromStrings("a", "b", "c", "d", "e", "f"),
|
|
},
|
|
|
|
// Test externalLabels is longer than labels.
|
|
{
|
|
labels: labels.FromStrings("c", "d"),
|
|
externalLabels: []labels.Label{{Name: "a", Value: "b"}, {Name: "e", Value: "f"}},
|
|
expected: labels.FromStrings("a", "b", "c", "d", "e", "f"),
|
|
},
|
|
|
|
// Adding with and without clashing labels.
|
|
{
|
|
labels: labels.FromStrings("a", "b", "c", "d"),
|
|
externalLabels: []labels.Label{{Name: "a", Value: "xxx"}, {Name: "c", Value: "yyy"}, {Name: "e", Value: "f"}},
|
|
expected: labels.FromStrings("a", "b", "c", "d", "e", "f"),
|
|
},
|
|
} {
|
|
b.Reset(tc.labels)
|
|
processExternalLabels(b, tc.externalLabels)
|
|
testutil.RequireEqual(t, tc.expected, b.Labels(), "test %d", i)
|
|
}
|
|
}
|
|
|
|
func TestCalculateDesiredShards(t *testing.T) {
|
|
cfg := config.DefaultQueueConfig
|
|
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline)
|
|
samplesIn := m.dataIn
|
|
|
|
// Need to start the queue manager so the proper metrics are initialized.
|
|
// However we can stop it right away since we don't need to do any actual
|
|
// processing.
|
|
m.Start()
|
|
m.Stop()
|
|
|
|
inputRate := int64(50000)
|
|
var pendingSamples int64
|
|
|
|
// Two minute startup, no samples are sent.
|
|
startedAt := time.Now().Add(-2 * time.Minute)
|
|
|
|
// helper function for adding samples.
|
|
addSamples := func(s int64, ts time.Duration) {
|
|
pendingSamples += s
|
|
samplesIn.incr(s)
|
|
samplesIn.tick()
|
|
|
|
m.highestRecvTimestamp.Set(float64(startedAt.Add(ts).Unix()))
|
|
}
|
|
|
|
// helper function for sending samples.
|
|
sendSamples := func(s int64, ts time.Duration) {
|
|
pendingSamples -= s
|
|
m.dataOut.incr(s)
|
|
m.dataOutDuration.incr(int64(m.numShards) * int64(shardUpdateDuration))
|
|
|
|
// highest sent is how far back pending samples would be at our input rate.
|
|
highestSent := startedAt.Add(ts - time.Duration(pendingSamples/inputRate)*time.Second)
|
|
m.metrics.highestSentTimestamp.Set(float64(highestSent.Unix()))
|
|
|
|
m.lastSendTimestamp.Store(time.Now().Unix())
|
|
}
|
|
|
|
ts := time.Duration(0)
|
|
for ; ts < 120*time.Second; ts += shardUpdateDuration {
|
|
addSamples(inputRate*int64(shardUpdateDuration/time.Second), ts)
|
|
m.numShards = m.calculateDesiredShards()
|
|
require.Equal(t, 1, m.numShards)
|
|
}
|
|
|
|
// Assume 100ms per request, or 10 requests per second per shard.
|
|
// Shard calculation should never drop below barely keeping up.
|
|
minShards := int(inputRate) / cfg.MaxSamplesPerSend / 10
|
|
// This test should never go above 200 shards, that would be more resources than needed.
|
|
maxShards := 200
|
|
|
|
for ; ts < 15*time.Minute; ts += shardUpdateDuration {
|
|
sin := inputRate * int64(shardUpdateDuration/time.Second)
|
|
addSamples(sin, ts)
|
|
|
|
sout := int64(m.numShards*cfg.MaxSamplesPerSend) * int64(shardUpdateDuration/(100*time.Millisecond))
|
|
// You can't send samples that don't exist so cap at the number of pending samples.
|
|
if sout > pendingSamples {
|
|
sout = pendingSamples
|
|
}
|
|
sendSamples(sout, ts)
|
|
|
|
t.Log("desiredShards", m.numShards, "pendingSamples", pendingSamples)
|
|
m.numShards = m.calculateDesiredShards()
|
|
require.GreaterOrEqual(t, m.numShards, minShards, "Shards are too low. desiredShards=%d, minShards=%d, t_seconds=%d", m.numShards, minShards, ts/time.Second)
|
|
require.LessOrEqual(t, m.numShards, maxShards, "Shards are too high. desiredShards=%d, maxShards=%d, t_seconds=%d", m.numShards, maxShards, ts/time.Second)
|
|
}
|
|
require.Equal(t, int64(0), pendingSamples, "Remote write never caught up, there are still %d pending samples.", pendingSamples)
|
|
}
|
|
|
|
func TestCalculateDesiredShardsDetail(t *testing.T) {
|
|
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline)
|
|
samplesIn := m.dataIn
|
|
|
|
for _, tc := range []struct {
|
|
name string
|
|
prevShards int
|
|
dataIn int64 // Quantities normalised to seconds.
|
|
dataOut int64
|
|
dataDropped int64
|
|
dataOutDuration float64
|
|
backlog float64
|
|
expectedShards int
|
|
}{
|
|
{
|
|
name: "nothing in or out 1",
|
|
prevShards: 1,
|
|
expectedShards: 1, // Shards stays the same.
|
|
},
|
|
{
|
|
name: "nothing in or out 10",
|
|
prevShards: 10,
|
|
expectedShards: 10, // Shards stays the same.
|
|
},
|
|
{
|
|
name: "steady throughput",
|
|
prevShards: 1,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 1,
|
|
expectedShards: 1,
|
|
},
|
|
{
|
|
name: "scale down",
|
|
prevShards: 10,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 5,
|
|
expectedShards: 5,
|
|
},
|
|
{
|
|
name: "scale down constrained",
|
|
prevShards: 7,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 5,
|
|
expectedShards: 7,
|
|
},
|
|
{
|
|
name: "scale up",
|
|
prevShards: 1,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 10,
|
|
expectedShards: 10,
|
|
},
|
|
{
|
|
name: "scale up constrained",
|
|
prevShards: 8,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 10,
|
|
expectedShards: 8,
|
|
},
|
|
{
|
|
name: "backlogged 20s",
|
|
prevShards: 2,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 2,
|
|
backlog: 20,
|
|
expectedShards: 4,
|
|
},
|
|
{
|
|
name: "backlogged 90s",
|
|
prevShards: 4,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 4,
|
|
backlog: 90,
|
|
expectedShards: 22,
|
|
},
|
|
{
|
|
name: "backlog reduced",
|
|
prevShards: 22,
|
|
dataIn: 10,
|
|
dataOut: 20,
|
|
dataOutDuration: 4,
|
|
backlog: 10,
|
|
expectedShards: 3,
|
|
},
|
|
{
|
|
name: "backlog eliminated",
|
|
prevShards: 3,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 2,
|
|
backlog: 0,
|
|
expectedShards: 2, // Shard back down.
|
|
},
|
|
{
|
|
name: "slight slowdown",
|
|
prevShards: 1,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 1.2,
|
|
expectedShards: 2, // 1.2 is rounded up to 2.
|
|
},
|
|
{
|
|
name: "bigger slowdown",
|
|
prevShards: 1,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 1.4,
|
|
expectedShards: 2,
|
|
},
|
|
{
|
|
name: "speed up",
|
|
prevShards: 2,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 1.2,
|
|
backlog: 0,
|
|
expectedShards: 2, // No reaction - 1.2 is rounded up to 2.
|
|
},
|
|
{
|
|
name: "speed up more",
|
|
prevShards: 2,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 0.9,
|
|
backlog: 0,
|
|
expectedShards: 1,
|
|
},
|
|
{
|
|
name: "marginal decision A",
|
|
prevShards: 3,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 2.01,
|
|
backlog: 0,
|
|
expectedShards: 3, // 2.01 rounds up to 3.
|
|
},
|
|
{
|
|
name: "marginal decision B",
|
|
prevShards: 3,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 1.99,
|
|
backlog: 0,
|
|
expectedShards: 2, // 1.99 rounds up to 2.
|
|
},
|
|
} {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
m.numShards = tc.prevShards
|
|
forceEMWA(samplesIn, tc.dataIn*int64(shardUpdateDuration/time.Second))
|
|
samplesIn.tick()
|
|
forceEMWA(m.dataOut, tc.dataOut*int64(shardUpdateDuration/time.Second))
|
|
forceEMWA(m.dataDropped, tc.dataDropped*int64(shardUpdateDuration/time.Second))
|
|
forceEMWA(m.dataOutDuration, int64(tc.dataOutDuration*float64(shardUpdateDuration)))
|
|
m.highestRecvTimestamp.value = tc.backlog // Not Set() because it can only increase value.
|
|
|
|
require.Equal(t, tc.expectedShards, m.calculateDesiredShards())
|
|
})
|
|
}
|
|
}
|
|
|
|
func forceEMWA(r *ewmaRate, rate int64) {
|
|
r.init = false
|
|
r.newEvents.Store(rate)
|
|
}
|
|
|
|
func TestQueueManagerMetrics(t *testing.T) {
|
|
reg := prometheus.NewPedanticRegistry()
|
|
metrics := newQueueManagerMetrics(reg, "name", "http://localhost:1234")
|
|
|
|
// Make sure metrics pass linting.
|
|
problems, err := client_testutil.GatherAndLint(reg)
|
|
require.NoError(t, err)
|
|
require.Empty(t, problems, "Metric linting problems detected: %v", problems)
|
|
|
|
// Make sure all metrics were unregistered. A failure here means you need
|
|
// unregister a metric in `queueManagerMetrics.unregister()`.
|
|
metrics.unregister()
|
|
err = client_testutil.GatherAndCompare(reg, strings.NewReader(""))
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
func TestQueue_FlushAndShutdownDoesNotDeadlock(t *testing.T) {
|
|
capacity := 100
|
|
batchSize := 10
|
|
queue := newQueue(batchSize, capacity)
|
|
for i := 0; i < capacity+batchSize; i++ {
|
|
queue.Append(timeSeries{})
|
|
}
|
|
|
|
done := make(chan struct{})
|
|
go queue.FlushAndShutdown(done)
|
|
go func() {
|
|
// Give enough time for FlushAndShutdown to acquire the lock. queue.Batch()
|
|
// should not block forever even if the lock is acquired.
|
|
time.Sleep(10 * time.Millisecond)
|
|
queue.Batch()
|
|
close(done)
|
|
}()
|
|
select {
|
|
case <-done:
|
|
case <-time.After(2 * time.Second):
|
|
t.Error("Deadlock in FlushAndShutdown detected")
|
|
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
|
t.FailNow()
|
|
}
|
|
}
|
|
|
|
func TestDropOldTimeSeries(t *testing.T) {
|
|
size := 10
|
|
nSeries := 6
|
|
nSamples := config.DefaultQueueConfig.Capacity * size
|
|
samples, newSamples, series := createTimeseriesWithOldSamples(nSamples, nSeries)
|
|
|
|
c := NewTestWriteClient()
|
|
c.expectSamples(newSamples, series)
|
|
|
|
cfg := config.DefaultQueueConfig
|
|
mcfg := config.DefaultMetadataConfig
|
|
cfg.MaxShards = 1
|
|
cfg.SampleAgeLimit = model.Duration(60 * time.Second)
|
|
m := newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c)
|
|
m.StoreSeries(series, 0)
|
|
|
|
m.Start()
|
|
defer m.Stop()
|
|
|
|
m.Append(samples)
|
|
c.waitForExpectedData(t)
|
|
}
|
|
|
|
func TestIsSampleOld(t *testing.T) {
|
|
currentTime := time.Now()
|
|
require.True(t, isSampleOld(currentTime, 60*time.Second, timestamp.FromTime(currentTime.Add(-61*time.Second))))
|
|
require.False(t, isSampleOld(currentTime, 60*time.Second, timestamp.FromTime(currentTime.Add(-59*time.Second))))
|
|
}
|
|
|
|
func createTimeseriesWithOldSamples(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSample, []record.RefSeries) {
|
|
newSamples := make([]record.RefSample, 0, numSamples)
|
|
samples := make([]record.RefSample, 0, numSamples)
|
|
series := make([]record.RefSeries, 0, numSeries)
|
|
lb := labels.NewScratchBuilder(1 + len(extraLabels))
|
|
for i := 0; i < numSeries; i++ {
|
|
name := fmt.Sprintf("test_metric_%d", i)
|
|
// We create half of the samples in the past.
|
|
past := timestamp.FromTime(time.Now().Add(-5 * time.Minute))
|
|
for j := 0; j < numSamples/2; j++ {
|
|
samples = append(samples, record.RefSample{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
T: past + int64(j),
|
|
V: float64(i),
|
|
})
|
|
}
|
|
for j := 0; j < numSamples/2; j++ {
|
|
sample := record.RefSample{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
T: int64(int(time.Now().UnixMilli()) + j),
|
|
V: float64(i),
|
|
}
|
|
samples = append(samples, sample)
|
|
newSamples = append(newSamples, sample)
|
|
}
|
|
// Create Labels that is name of series plus any extra labels supplied.
|
|
lb.Reset()
|
|
lb.Add(labels.MetricName, name)
|
|
for _, l := range extraLabels {
|
|
lb.Add(l.Name, l.Value)
|
|
}
|
|
lb.Sort()
|
|
series = append(series, record.RefSeries{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
Labels: lb.Labels(),
|
|
})
|
|
}
|
|
return samples, newSamples, series
|
|
}
|
|
|
|
func filterTsLimit(limit int64, ts prompb.TimeSeries) bool {
|
|
return limit > ts.Samples[0].Timestamp
|
|
}
|
|
|
|
func TestBuildTimeSeries(t *testing.T) {
|
|
testCases := []struct {
|
|
name string
|
|
ts []prompb.TimeSeries
|
|
filter func(ts prompb.TimeSeries) bool
|
|
lowestTs int64
|
|
highestTs int64
|
|
droppedSamples int
|
|
responseLen int
|
|
}{
|
|
{
|
|
name: "No filter applied",
|
|
ts: []prompb.TimeSeries{
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567890,
|
|
Value: 1.23,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567891,
|
|
Value: 2.34,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567892,
|
|
Value: 3.34,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
filter: nil,
|
|
responseLen: 3,
|
|
lowestTs: 1234567890,
|
|
highestTs: 1234567892,
|
|
},
|
|
{
|
|
name: "Filter applied, samples in order",
|
|
ts: []prompb.TimeSeries{
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567890,
|
|
Value: 1.23,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567891,
|
|
Value: 2.34,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567892,
|
|
Value: 3.45,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567893,
|
|
Value: 3.45,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567892, ts) },
|
|
responseLen: 2,
|
|
lowestTs: 1234567892,
|
|
highestTs: 1234567893,
|
|
droppedSamples: 2,
|
|
},
|
|
{
|
|
name: "Filter applied, samples out of order",
|
|
ts: []prompb.TimeSeries{
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567892,
|
|
Value: 3.45,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567890,
|
|
Value: 1.23,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567893,
|
|
Value: 3.45,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567891,
|
|
Value: 2.34,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567892, ts) },
|
|
responseLen: 2,
|
|
lowestTs: 1234567892,
|
|
highestTs: 1234567893,
|
|
droppedSamples: 2,
|
|
},
|
|
{
|
|
name: "Filter applied, samples not consecutive",
|
|
ts: []prompb.TimeSeries{
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567890,
|
|
Value: 1.23,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567892,
|
|
Value: 3.45,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567895,
|
|
Value: 6.78,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567897,
|
|
Value: 6.78,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567895, ts) },
|
|
responseLen: 2,
|
|
lowestTs: 1234567895,
|
|
highestTs: 1234567897,
|
|
droppedSamples: 2,
|
|
},
|
|
}
|
|
|
|
// Run the test cases
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
highest, lowest, result, droppedSamples, _, _ := buildTimeSeries(tc.ts, tc.filter)
|
|
require.NotNil(t, result)
|
|
require.Len(t, result, tc.responseLen)
|
|
require.Equal(t, tc.highestTs, highest)
|
|
require.Equal(t, tc.lowestTs, lowest)
|
|
require.Equal(t, tc.droppedSamples, droppedSamples)
|
|
})
|
|
}
|
|
}
|