mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-24 21:24:05 -08:00
remote: Added test for classic histogram grouping when sending rw; Fixed queue manager test delay. (#13421)
Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
parent
f9dc544691
commit
242158e7fc
|
@ -381,7 +381,7 @@ func (m *queueManagerMetrics) unregister() {
|
||||||
// external timeseries database.
|
// external timeseries database.
|
||||||
type WriteClient interface {
|
type WriteClient interface {
|
||||||
// Store stores the given samples in the remote storage.
|
// Store stores the given samples in the remote storage.
|
||||||
Store(context.Context, []byte, int) error
|
Store(ctx context.Context, req []byte, retryAttempt int) error
|
||||||
// Name uniquely identifies the remote storage.
|
// Name uniquely identifies the remote storage.
|
||||||
Name() string
|
Name() string
|
||||||
// Endpoint is the remote read or write endpoint for the storage client.
|
// Endpoint is the remote read or write endpoint for the storage client.
|
||||||
|
|
|
@ -45,6 +45,7 @@ import (
|
||||||
"github.com/prometheus/prometheus/scrape"
|
"github.com/prometheus/prometheus/scrape"
|
||||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||||
"github.com/prometheus/prometheus/tsdb/record"
|
"github.com/prometheus/prometheus/tsdb/record"
|
||||||
|
"github.com/prometheus/prometheus/util/runutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
const defaultFlushDeadline = 1 * time.Minute
|
const defaultFlushDeadline = 1 * time.Minute
|
||||||
|
@ -153,7 +154,7 @@ func TestSampleDelivery(t *testing.T) {
|
||||||
qm.AppendExemplars(exemplars[:len(exemplars)/2])
|
qm.AppendExemplars(exemplars[:len(exemplars)/2])
|
||||||
qm.AppendHistograms(histograms[:len(histograms)/2])
|
qm.AppendHistograms(histograms[:len(histograms)/2])
|
||||||
qm.AppendFloatHistograms(floatHistograms[:len(floatHistograms)/2])
|
qm.AppendFloatHistograms(floatHistograms[:len(floatHistograms)/2])
|
||||||
c.waitForExpectedData(t)
|
c.waitForExpectedData(t, 30*time.Second)
|
||||||
|
|
||||||
// Send second half of data.
|
// Send second half of data.
|
||||||
c.expectSamples(samples[len(samples)/2:], series)
|
c.expectSamples(samples[len(samples)/2:], series)
|
||||||
|
@ -164,7 +165,182 @@ func TestSampleDelivery(t *testing.T) {
|
||||||
qm.AppendExemplars(exemplars[len(exemplars)/2:])
|
qm.AppendExemplars(exemplars[len(exemplars)/2:])
|
||||||
qm.AppendHistograms(histograms[len(histograms)/2:])
|
qm.AppendHistograms(histograms[len(histograms)/2:])
|
||||||
qm.AppendFloatHistograms(floatHistograms[len(floatHistograms)/2:])
|
qm.AppendFloatHistograms(floatHistograms[len(floatHistograms)/2:])
|
||||||
c.waitForExpectedData(t)
|
c.waitForExpectedData(t, 30*time.Second)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type perRequestWriteClient struct {
|
||||||
|
*TestWriteClient
|
||||||
|
|
||||||
|
expectUnorderedRequests bool
|
||||||
|
|
||||||
|
mtx sync.Mutex
|
||||||
|
|
||||||
|
i int
|
||||||
|
requests []*TestWriteClient
|
||||||
|
expectedSeries []record.RefSeries
|
||||||
|
expectedRequestSamples [][]record.RefSample
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPerRequestWriteClient(expectUnorderedRequests bool) *perRequestWriteClient {
|
||||||
|
return &perRequestWriteClient{
|
||||||
|
expectUnorderedRequests: expectUnorderedRequests,
|
||||||
|
TestWriteClient: NewTestWriteClient(MinStrings),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *perRequestWriteClient) expectRequestSamples(ss []record.RefSample, series []record.RefSeries) {
|
||||||
|
tc := NewTestWriteClient(MinStrings)
|
||||||
|
c.requests = append(c.requests, tc)
|
||||||
|
|
||||||
|
c.expectedSeries = series
|
||||||
|
c.expectedRequestSamples = append(c.expectedRequestSamples, ss)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *perRequestWriteClient) expectedData(t testing.TB) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
c.mtx.Lock()
|
||||||
|
defer c.mtx.Unlock()
|
||||||
|
|
||||||
|
c.TestWriteClient.mtx.Lock()
|
||||||
|
exp := 0
|
||||||
|
for _, ss := range c.expectedRequestSamples {
|
||||||
|
exp += len(ss)
|
||||||
|
}
|
||||||
|
got := deepLen(c.TestWriteClient.receivedSamples)
|
||||||
|
c.TestWriteClient.mtx.Unlock()
|
||||||
|
|
||||||
|
if got < exp {
|
||||||
|
t.Errorf("totally expected %v samples, got %v", exp, got)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, cl := range c.requests {
|
||||||
|
cl.waitForExpectedData(t, 0*time.Second) // We already waited.
|
||||||
|
t.Log("client", i, "checked")
|
||||||
|
}
|
||||||
|
if c.i != len(c.requests) {
|
||||||
|
t.Errorf("expected %v calls, got %v", len(c.requests), c.i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *perRequestWriteClient) Store(ctx context.Context, req []byte, r int) error {
|
||||||
|
c.mtx.Lock()
|
||||||
|
defer c.mtx.Unlock()
|
||||||
|
defer func() { c.i++ }()
|
||||||
|
if c.i >= len(c.requests) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.TestWriteClient.Store(ctx, req, r); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
expReqSampleToUse := 0
|
||||||
|
if c.expectUnorderedRequests {
|
||||||
|
// expectUnorderedRequests tells us that multiple shards were used by queue manager,
|
||||||
|
// so we can't trust that incoming requests will match order of c.expectedRequestSamples
|
||||||
|
// slice. However, for successful test case we can assume that first sample value will
|
||||||
|
// match, so find such expected request if any.
|
||||||
|
// NOTE: This assumes sample values have unique values in our tests.
|
||||||
|
for i, es := range c.expectedRequestSamples {
|
||||||
|
if len(es) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, rs := range c.TestWriteClient.receivedSamples {
|
||||||
|
if len(rs) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if es[0].V != rs[0].GetValue() {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
expReqSampleToUse = i
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// We tried our best, use normal flow otherwise.
|
||||||
|
}
|
||||||
|
c.requests[c.i].expectSamples(c.expectedRequestSamples[expReqSampleToUse], c.expectedSeries)
|
||||||
|
c.expectedRequestSamples = append(c.expectedRequestSamples[:expReqSampleToUse], c.expectedRequestSamples[expReqSampleToUse+1:]...)
|
||||||
|
return c.requests[c.i].Store(ctx, req, r)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testDefaultQueueConfig() config.QueueConfig {
|
||||||
|
cfg := config.DefaultQueueConfig
|
||||||
|
// For faster unit tests we don't wait default 5 seconds.
|
||||||
|
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
|
||||||
|
return cfg
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestHistogramSampleBatching tests current way of how classic histogram series
|
||||||
|
// are grouped in queue manager.
|
||||||
|
// This is a first step of exploring PRW 2.0 self-contained classic histograms.
|
||||||
|
func TestHistogramSampleBatching(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
series, samples := createTestClassicHistogram(10)
|
||||||
|
|
||||||
|
for _, tc := range []struct {
|
||||||
|
name string
|
||||||
|
queueConfig config.QueueConfig
|
||||||
|
expRequestSamples [][]record.RefSample
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "OneShardDefaultBatch",
|
||||||
|
queueConfig: func() config.QueueConfig {
|
||||||
|
cfg := testDefaultQueueConfig()
|
||||||
|
cfg.MaxShards = 1
|
||||||
|
cfg.MinShards = 1
|
||||||
|
return cfg
|
||||||
|
}(),
|
||||||
|
expRequestSamples: [][]record.RefSample{samples},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "OneShardLimitedBatch",
|
||||||
|
queueConfig: func() config.QueueConfig {
|
||||||
|
cfg := testDefaultQueueConfig()
|
||||||
|
cfg.MaxShards = 1
|
||||||
|
cfg.MinShards = 1
|
||||||
|
cfg.MaxSamplesPerSend = 5
|
||||||
|
return cfg
|
||||||
|
}(),
|
||||||
|
expRequestSamples: [][]record.RefSample{
|
||||||
|
samples[:5], samples[5:10], samples[10:],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "TwoShards",
|
||||||
|
queueConfig: func() config.QueueConfig {
|
||||||
|
cfg := testDefaultQueueConfig()
|
||||||
|
cfg.MaxShards = 2
|
||||||
|
cfg.MinShards = 2
|
||||||
|
return cfg
|
||||||
|
}(),
|
||||||
|
expRequestSamples: [][]record.RefSample{
|
||||||
|
{samples[0], samples[2], samples[4], samples[6], samples[8], samples[10]},
|
||||||
|
{samples[1], samples[3], samples[5], samples[7], samples[9], samples[11]},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
c := newPerRequestWriteClient(tc.queueConfig.MaxShards > 1)
|
||||||
|
|
||||||
|
for _, s := range tc.expRequestSamples {
|
||||||
|
c.expectRequestSamples(s, series)
|
||||||
|
}
|
||||||
|
|
||||||
|
dir := t.TempDir()
|
||||||
|
mcfg := config.DefaultMetadataConfig
|
||||||
|
|
||||||
|
metrics := newQueueManagerMetrics(nil, "", "")
|
||||||
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), tc.queueConfig, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, MinStrings)
|
||||||
|
m.StoreSeries(series, 0)
|
||||||
|
|
||||||
|
m.Start()
|
||||||
|
m.Append(samples)
|
||||||
|
m.Stop()
|
||||||
|
c.expectedData(t)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -174,7 +350,7 @@ func TestMetadataDelivery(t *testing.T) {
|
||||||
|
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
|
|
||||||
cfg := config.DefaultQueueConfig
|
cfg := testDefaultQueueConfig()
|
||||||
mcfg := config.DefaultMetadataConfig
|
mcfg := config.DefaultMetadataConfig
|
||||||
|
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
metrics := newQueueManagerMetrics(nil, "", "")
|
||||||
|
@ -211,10 +387,9 @@ func TestSampleDeliveryTimeout(t *testing.T) {
|
||||||
samples, series := createTimeseries(n, n)
|
samples, series := createTimeseries(n, n)
|
||||||
c := NewTestWriteClient(rwFormat)
|
c := NewTestWriteClient(rwFormat)
|
||||||
|
|
||||||
cfg := config.DefaultQueueConfig
|
cfg := testDefaultQueueConfig()
|
||||||
mcfg := config.DefaultMetadataConfig
|
mcfg := config.DefaultMetadataConfig
|
||||||
cfg.MaxShards = 1
|
cfg.MaxShards = 1
|
||||||
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
|
|
||||||
|
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
|
|
||||||
|
@ -227,11 +402,11 @@ func TestSampleDeliveryTimeout(t *testing.T) {
|
||||||
// Send the samples twice, waiting for the samples in the meantime.
|
// Send the samples twice, waiting for the samples in the meantime.
|
||||||
c.expectSamples(samples, series)
|
c.expectSamples(samples, series)
|
||||||
m.Append(samples)
|
m.Append(samples)
|
||||||
c.waitForExpectedData(t)
|
c.waitForExpectedData(t, 30*time.Second)
|
||||||
|
|
||||||
c.expectSamples(samples, series)
|
c.expectSamples(samples, series)
|
||||||
m.Append(samples)
|
m.Append(samples)
|
||||||
c.waitForExpectedData(t)
|
c.waitForExpectedData(t, 30*time.Second)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -261,7 +436,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
|
||||||
|
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
|
|
||||||
cfg := config.DefaultQueueConfig
|
cfg := testDefaultQueueConfig()
|
||||||
mcfg := config.DefaultMetadataConfig
|
mcfg := config.DefaultMetadataConfig
|
||||||
|
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
metrics := newQueueManagerMetrics(nil, "", "")
|
||||||
|
@ -272,7 +447,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
|
||||||
defer m.Stop()
|
defer m.Stop()
|
||||||
// These should be received by the client.
|
// These should be received by the client.
|
||||||
m.Append(samples)
|
m.Append(samples)
|
||||||
c.waitForExpectedData(t)
|
c.waitForExpectedData(t, 30*time.Second)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -283,7 +458,7 @@ func TestShutdown(t *testing.T) {
|
||||||
|
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
|
|
||||||
cfg := config.DefaultQueueConfig
|
cfg := testDefaultQueueConfig()
|
||||||
mcfg := config.DefaultMetadataConfig
|
mcfg := config.DefaultMetadataConfig
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
metrics := newQueueManagerMetrics(nil, "", "")
|
||||||
|
|
||||||
|
@ -322,7 +497,7 @@ func TestSeriesReset(t *testing.T) {
|
||||||
|
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
|
|
||||||
cfg := config.DefaultQueueConfig
|
cfg := testDefaultQueueConfig()
|
||||||
mcfg := config.DefaultMetadataConfig
|
mcfg := config.DefaultMetadataConfig
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
metrics := newQueueManagerMetrics(nil, "", "")
|
||||||
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1)
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1)
|
||||||
|
@ -349,7 +524,7 @@ func TestReshard(t *testing.T) {
|
||||||
c := NewTestWriteClient(rwFormat)
|
c := NewTestWriteClient(rwFormat)
|
||||||
c.expectSamples(samples, series)
|
c.expectSamples(samples, series)
|
||||||
|
|
||||||
cfg := config.DefaultQueueConfig
|
cfg := testDefaultQueueConfig()
|
||||||
mcfg := config.DefaultMetadataConfig
|
mcfg := config.DefaultMetadataConfig
|
||||||
cfg.MaxShards = 1
|
cfg.MaxShards = 1
|
||||||
|
|
||||||
|
@ -376,7 +551,7 @@ func TestReshard(t *testing.T) {
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.waitForExpectedData(t)
|
c.waitForExpectedData(t, 30*time.Second)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -390,7 +565,7 @@ func TestReshardRaceWithStop(t *testing.T) {
|
||||||
|
|
||||||
h.Lock()
|
h.Lock()
|
||||||
|
|
||||||
cfg := config.DefaultQueueConfig
|
cfg := testDefaultQueueConfig()
|
||||||
mcfg := config.DefaultMetadataConfig
|
mcfg := config.DefaultMetadataConfig
|
||||||
exitCh := make(chan struct{})
|
exitCh := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -427,7 +602,7 @@ func TestReshardPartialBatch(t *testing.T) {
|
||||||
|
|
||||||
c := NewTestBlockedWriteClient()
|
c := NewTestBlockedWriteClient()
|
||||||
|
|
||||||
cfg := config.DefaultQueueConfig
|
cfg := testDefaultQueueConfig()
|
||||||
mcfg := config.DefaultMetadataConfig
|
mcfg := config.DefaultMetadataConfig
|
||||||
cfg.MaxShards = 1
|
cfg.MaxShards = 1
|
||||||
batchSendDeadline := time.Millisecond
|
batchSendDeadline := time.Millisecond
|
||||||
|
@ -473,7 +648,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
|
||||||
|
|
||||||
c := NewNopWriteClient()
|
c := NewNopWriteClient()
|
||||||
|
|
||||||
cfg := config.DefaultQueueConfig
|
cfg := testDefaultQueueConfig()
|
||||||
mcfg := config.DefaultMetadataConfig
|
mcfg := config.DefaultMetadataConfig
|
||||||
cfg.MaxShards = 1
|
cfg.MaxShards = 1
|
||||||
cfg.MaxSamplesPerSend = 10
|
cfg.MaxSamplesPerSend = 10
|
||||||
|
@ -511,7 +686,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
|
||||||
func TestReleaseNoninternedString(t *testing.T) {
|
func TestReleaseNoninternedString(t *testing.T) {
|
||||||
for _, rwFormat := range []RemoteWriteFormat{Base1, MinStrings} {
|
for _, rwFormat := range []RemoteWriteFormat{Base1, MinStrings} {
|
||||||
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) {
|
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) {
|
||||||
cfg := config.DefaultQueueConfig
|
cfg := testDefaultQueueConfig()
|
||||||
mcfg := config.DefaultMetadataConfig
|
mcfg := config.DefaultMetadataConfig
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
metrics := newQueueManagerMetrics(nil, "", "")
|
||||||
c := NewTestWriteClient(rwFormat)
|
c := NewTestWriteClient(rwFormat)
|
||||||
|
@ -559,7 +734,7 @@ func TestShouldReshard(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
cfg := config.DefaultQueueConfig
|
cfg := testDefaultQueueConfig()
|
||||||
mcfg := config.DefaultMetadataConfig
|
mcfg := config.DefaultMetadataConfig
|
||||||
for _, c := range cases {
|
for _, c := range cases {
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
metrics := newQueueManagerMetrics(nil, "", "")
|
||||||
|
@ -681,6 +856,41 @@ func createHistograms(numSamples, numSeries int, floatHistogram bool) ([]record.
|
||||||
return histograms, nil, series
|
return histograms, nil, series
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func createTestClassicHistogram(buckets int) ([]record.RefSeries, []record.RefSample) {
|
||||||
|
samples := make([]record.RefSample, buckets+2)
|
||||||
|
series := make([]record.RefSeries, buckets+2)
|
||||||
|
|
||||||
|
for i := range samples {
|
||||||
|
samples[i] = record.RefSample{
|
||||||
|
Ref: chunks.HeadSeriesRef(i), T: int64(i), V: float64(i),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < buckets; i++ {
|
||||||
|
le := fmt.Sprintf("%v", i)
|
||||||
|
if i == 0 {
|
||||||
|
le = "+Inf"
|
||||||
|
}
|
||||||
|
series[i] = record.RefSeries{
|
||||||
|
Ref: chunks.HeadSeriesRef(i),
|
||||||
|
Labels: labels.FromStrings(
|
||||||
|
"__name__", "http_request_duration_seconds_bucket",
|
||||||
|
"le", le,
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
series[buckets] = record.RefSeries{
|
||||||
|
Ref: chunks.HeadSeriesRef(buckets),
|
||||||
|
Labels: labels.FromStrings("__name__", "http_request_duration_seconds_sum"),
|
||||||
|
}
|
||||||
|
series[buckets+1] = record.RefSeries{
|
||||||
|
Ref: chunks.HeadSeriesRef(buckets + 1),
|
||||||
|
Labels: labels.FromStrings("__name__", "http_request_duration_seconds_count"),
|
||||||
|
}
|
||||||
|
return series, samples
|
||||||
|
}
|
||||||
|
|
||||||
func getSeriesNameFromRef(r record.RefSeries) string {
|
func getSeriesNameFromRef(r record.RefSeries) string {
|
||||||
return r.Labels.Get("__name__")
|
return r.Labels.Get("__name__")
|
||||||
}
|
}
|
||||||
|
@ -696,8 +906,6 @@ type TestWriteClient struct {
|
||||||
expectedFloatHistograms map[string][]prompb.Histogram
|
expectedFloatHistograms map[string][]prompb.Histogram
|
||||||
receivedMetadata map[string][]prompb.MetricMetadata
|
receivedMetadata map[string][]prompb.MetricMetadata
|
||||||
writesReceived int
|
writesReceived int
|
||||||
withWaitGroup bool
|
|
||||||
wg sync.WaitGroup
|
|
||||||
mtx sync.Mutex
|
mtx sync.Mutex
|
||||||
buf []byte
|
buf []byte
|
||||||
rwFormat RemoteWriteFormat
|
rwFormat RemoteWriteFormat
|
||||||
|
@ -705,7 +913,6 @@ type TestWriteClient struct {
|
||||||
|
|
||||||
func NewTestWriteClient(rwFormat RemoteWriteFormat) *TestWriteClient {
|
func NewTestWriteClient(rwFormat RemoteWriteFormat) *TestWriteClient {
|
||||||
return &TestWriteClient{
|
return &TestWriteClient{
|
||||||
withWaitGroup: true,
|
|
||||||
receivedSamples: map[string][]prompb.Sample{},
|
receivedSamples: map[string][]prompb.Sample{},
|
||||||
expectedSamples: map[string][]prompb.Sample{},
|
expectedSamples: map[string][]prompb.Sample{},
|
||||||
receivedMetadata: map[string][]prompb.MetricMetadata{},
|
receivedMetadata: map[string][]prompb.MetricMetadata{},
|
||||||
|
@ -714,9 +921,6 @@ func NewTestWriteClient(rwFormat RemoteWriteFormat) *TestWriteClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.RefSeries) {
|
func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.RefSeries) {
|
||||||
if !c.withWaitGroup {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
c.mtx.Lock()
|
c.mtx.Lock()
|
||||||
defer c.mtx.Unlock()
|
defer c.mtx.Unlock()
|
||||||
|
|
||||||
|
@ -730,13 +934,9 @@ func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.R
|
||||||
Value: s.V,
|
Value: s.V,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
c.wg.Add(len(ss))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []record.RefSeries) {
|
func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []record.RefSeries) {
|
||||||
if !c.withWaitGroup {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
c.mtx.Lock()
|
c.mtx.Lock()
|
||||||
defer c.mtx.Unlock()
|
defer c.mtx.Unlock()
|
||||||
|
|
||||||
|
@ -752,13 +952,9 @@ func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []reco
|
||||||
}
|
}
|
||||||
c.expectedExemplars[seriesName] = append(c.expectedExemplars[seriesName], e)
|
c.expectedExemplars[seriesName] = append(c.expectedExemplars[seriesName], e)
|
||||||
}
|
}
|
||||||
c.wg.Add(len(ss))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TestWriteClient) expectHistograms(hh []record.RefHistogramSample, series []record.RefSeries) {
|
func (c *TestWriteClient) expectHistograms(hh []record.RefHistogramSample, series []record.RefSeries) {
|
||||||
if !c.withWaitGroup {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
c.mtx.Lock()
|
c.mtx.Lock()
|
||||||
defer c.mtx.Unlock()
|
defer c.mtx.Unlock()
|
||||||
|
|
||||||
|
@ -769,13 +965,9 @@ func (c *TestWriteClient) expectHistograms(hh []record.RefHistogramSample, serie
|
||||||
seriesName := getSeriesNameFromRef(series[h.Ref])
|
seriesName := getSeriesNameFromRef(series[h.Ref])
|
||||||
c.expectedHistograms[seriesName] = append(c.expectedHistograms[seriesName], HistogramToHistogramProto(h.T, h.H))
|
c.expectedHistograms[seriesName] = append(c.expectedHistograms[seriesName], HistogramToHistogramProto(h.T, h.H))
|
||||||
}
|
}
|
||||||
c.wg.Add(len(hh))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TestWriteClient) expectFloatHistograms(fhs []record.RefFloatHistogramSample, series []record.RefSeries) {
|
func (c *TestWriteClient) expectFloatHistograms(fhs []record.RefFloatHistogramSample, series []record.RefSeries) {
|
||||||
if !c.withWaitGroup {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
c.mtx.Lock()
|
c.mtx.Lock()
|
||||||
defer c.mtx.Unlock()
|
defer c.mtx.Unlock()
|
||||||
|
|
||||||
|
@ -786,14 +978,36 @@ func (c *TestWriteClient) expectFloatHistograms(fhs []record.RefFloatHistogramSa
|
||||||
seriesName := getSeriesNameFromRef(series[fh.Ref])
|
seriesName := getSeriesNameFromRef(series[fh.Ref])
|
||||||
c.expectedFloatHistograms[seriesName] = append(c.expectedFloatHistograms[seriesName], FloatHistogramToHistogramProto(fh.T, fh.FH))
|
c.expectedFloatHistograms[seriesName] = append(c.expectedFloatHistograms[seriesName], FloatHistogramToHistogramProto(fh.T, fh.FH))
|
||||||
}
|
}
|
||||||
c.wg.Add(len(fhs))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TestWriteClient) waitForExpectedData(tb testing.TB) {
|
func deepLen[M any](ms ...map[string][]M) int {
|
||||||
if !c.withWaitGroup {
|
l := 0
|
||||||
return
|
for _, m := range ms {
|
||||||
|
for _, v := range m {
|
||||||
|
l += len(v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return l
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *TestWriteClient) waitForExpectedData(tb testing.TB, timeout time.Duration) {
|
||||||
|
tb.Helper()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||||
|
defer cancel()
|
||||||
|
if err := runutil.Retry(500*time.Millisecond, ctx.Done(), func() error {
|
||||||
|
c.mtx.Lock()
|
||||||
|
exp := deepLen(c.expectedSamples) + deepLen(c.expectedExemplars) + deepLen(c.expectedHistograms, c.expectedFloatHistograms)
|
||||||
|
got := deepLen(c.receivedSamples) + deepLen(c.receivedExemplars) + deepLen(c.receivedHistograms, c.receivedFloatHistograms)
|
||||||
|
c.mtx.Unlock()
|
||||||
|
|
||||||
|
if got < exp {
|
||||||
|
return fmt.Errorf("expected %v samples/exemplars/histograms/floathistograms, got %v", exp, got)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
tb.Error(err)
|
||||||
}
|
}
|
||||||
c.wg.Wait()
|
|
||||||
|
|
||||||
c.mtx.Lock()
|
c.mtx.Lock()
|
||||||
defer c.mtx.Unlock()
|
defer c.mtx.Unlock()
|
||||||
|
@ -839,44 +1053,31 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("error: ", err)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
count := 0
|
|
||||||
for _, ts := range reqProto.Timeseries {
|
for _, ts := range reqProto.Timeseries {
|
||||||
ls := labelProtosToLabels(ts.Labels)
|
ls := labelProtosToLabels(ts.Labels)
|
||||||
seriesName := ls.Get("__name__")
|
seriesName := ls.Get("__name__")
|
||||||
for _, sample := range ts.Samples {
|
if len(ts.Samples) > 0 {
|
||||||
count++
|
c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], ts.Samples...)
|
||||||
c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample)
|
|
||||||
}
|
}
|
||||||
|
if len(ts.Exemplars) > 0 {
|
||||||
for _, ex := range ts.Exemplars {
|
c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], ts.Exemplars...)
|
||||||
count++
|
|
||||||
c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], ex)
|
|
||||||
}
|
}
|
||||||
|
for _, h := range ts.Histograms {
|
||||||
for _, hist := range ts.Histograms {
|
if h.IsFloatHistogram() {
|
||||||
count++
|
c.receivedFloatHistograms[seriesName] = append(c.receivedFloatHistograms[seriesName], h)
|
||||||
if hist.IsFloatHistogram() {
|
|
||||||
c.receivedFloatHistograms[seriesName] = append(c.receivedFloatHistograms[seriesName], hist)
|
|
||||||
} else {
|
} else {
|
||||||
c.receivedHistograms[seriesName] = append(c.receivedHistograms[seriesName], hist)
|
c.receivedHistograms[seriesName] = append(c.receivedHistograms[seriesName], h)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if c.withWaitGroup {
|
|
||||||
c.wg.Add(-count)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, m := range reqProto.Metadata {
|
for _, m := range reqProto.Metadata {
|
||||||
c.receivedMetadata[m.MetricFamilyName] = append(c.receivedMetadata[m.MetricFamilyName], m)
|
c.receivedMetadata[m.MetricFamilyName] = append(c.receivedMetadata[m.MetricFamilyName], m)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.writesReceived++
|
c.writesReceived++
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -953,7 +1154,7 @@ func BenchmarkSampleSend(b *testing.B) {
|
||||||
|
|
||||||
c := NewNopWriteClient()
|
c := NewNopWriteClient()
|
||||||
|
|
||||||
cfg := config.DefaultQueueConfig
|
cfg := testDefaultQueueConfig()
|
||||||
mcfg := config.DefaultMetadataConfig
|
mcfg := config.DefaultMetadataConfig
|
||||||
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
|
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
|
||||||
cfg.MinShards = 20
|
cfg.MinShards = 20
|
||||||
|
@ -1002,7 +1203,7 @@ func BenchmarkStartup(b *testing.B) {
|
||||||
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout))
|
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout))
|
||||||
logger = log.With(logger, "caller", log.DefaultCaller)
|
logger = log.With(logger, "caller", log.DefaultCaller)
|
||||||
|
|
||||||
cfg := config.DefaultQueueConfig
|
cfg := testDefaultQueueConfig()
|
||||||
mcfg := config.DefaultMetadataConfig
|
mcfg := config.DefaultMetadataConfig
|
||||||
for n := 0; n < b.N; n++ {
|
for n := 0; n < b.N; n++ {
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
metrics := newQueueManagerMetrics(nil, "", "")
|
||||||
|
@ -1086,7 +1287,7 @@ func TestProcessExternalLabels(t *testing.T) {
|
||||||
|
|
||||||
func TestCalculateDesiredShards(t *testing.T) {
|
func TestCalculateDesiredShards(t *testing.T) {
|
||||||
c := NewNopWriteClient()
|
c := NewNopWriteClient()
|
||||||
cfg := config.DefaultQueueConfig
|
cfg := testDefaultQueueConfig()
|
||||||
mcfg := config.DefaultMetadataConfig
|
mcfg := config.DefaultMetadataConfig
|
||||||
|
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
|
@ -1164,7 +1365,7 @@ func TestCalculateDesiredShards(t *testing.T) {
|
||||||
|
|
||||||
func TestCalculateDesiredShardsDetail(t *testing.T) {
|
func TestCalculateDesiredShardsDetail(t *testing.T) {
|
||||||
c := NewTestWriteClient(Base1)
|
c := NewTestWriteClient(Base1)
|
||||||
cfg := config.DefaultQueueConfig
|
cfg := testDefaultQueueConfig()
|
||||||
mcfg := config.DefaultMetadataConfig
|
mcfg := config.DefaultMetadataConfig
|
||||||
|
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
|
|
Loading…
Reference in a new issue