remote: Added test for classic histogram grouping when sending rw; Fixed queue manager test delay.

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2023-12-21 12:42:46 +00:00
parent 4d44da2deb
commit 341064ff64
2 changed files with 269 additions and 57 deletions

View file

@ -380,7 +380,7 @@ func (m *queueManagerMetrics) unregister() {
// external timeseries database.
type WriteClient interface {
// Store stores the given samples in the remote storage.
Store(context.Context, []byte, int) error
Store(ctx context.Context, req []byte, retryAttempt int) error
// Name uniquely identifies the remote storage.
Name() string
// Endpoint is the remote read or write endpoint for the storage client.

View file

@ -43,6 +43,7 @@ import (
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/util/runutil"
)
const defaultFlushDeadline = 1 * time.Minute
@ -145,7 +146,7 @@ func TestSampleDelivery(t *testing.T) {
qm.AppendExemplars(exemplars[:len(exemplars)/2])
qm.AppendHistograms(histograms[:len(histograms)/2])
qm.AppendFloatHistograms(floatHistograms[:len(floatHistograms)/2])
c.waitForExpectedData(t)
c.waitForExpectedData(t, 30*time.Second)
// Send second half of data.
c.expectSamples(samples[len(samples)/2:], series)
@ -156,6 +157,186 @@ func TestSampleDelivery(t *testing.T) {
qm.AppendExemplars(exemplars[len(exemplars)/2:])
qm.AppendHistograms(histograms[len(histograms)/2:])
qm.AppendFloatHistograms(floatHistograms[len(floatHistograms)/2:])
c.waitForExpectedData(t, 30*time.Second)
})
}
}
type perRequestWriteClient struct {
*TestWriteClient
allowUnOrderedRequests bool
mtx sync.Mutex
i int
requests []*TestWriteClient
expectedSeries []record.RefSeries
expectedRequestSamples [][]record.RefSample
}
func newPerRequestWriteClient(allowUnOrderedRequests bool) *perRequestWriteClient {
return &perRequestWriteClient{
allowUnOrderedRequests: allowUnOrderedRequests,
TestWriteClient: NewTestWriteClient(),
}
}
func (c *perRequestWriteClient) expectRequestSamples(ss []record.RefSample, series []record.RefSeries) {
tc := NewTestWriteClient()
c.requests = append(c.requests, tc)
c.expectedSeries = series
c.expectedRequestSamples = append(c.expectedRequestSamples, ss)
}
func (c *perRequestWriteClient) waitForExpectedData(t testing.TB) {
t.Helper()
// Wait for all expected samples.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := runutil.Retry(500*time.Millisecond, ctx.Done(), func() error {
c.TestWriteClient.mtx.Lock()
exp := 0
for _, ss := range c.expectedRequestSamples {
exp += len(ss)
}
got := deepLen(c.TestWriteClient.receivedSamples)
c.TestWriteClient.mtx.Unlock()
if got < exp {
return fmt.Errorf("totally expected %v samples, got %v", exp, got)
}
return nil
}); err != nil {
t.Error(err)
}
for i, cl := range c.requests {
cl.waitForExpectedData(t, 0*time.Second) // We already waited.
t.Log("client", i, "checked")
}
if c.i != len(c.requests) {
t.Errorf("expected %v calls, got %v", len(c.requests), c.i)
}
}
func (c *perRequestWriteClient) Store(ctx context.Context, req []byte, r int) error {
c.mtx.Lock()
defer c.mtx.Unlock()
defer func() { c.i++ }()
if c.i >= len(c.requests) {
return nil
}
if err := c.TestWriteClient.Store(ctx, req, r); err != nil {
return err
}
expReqSampleToUse := 0
if c.allowUnOrderedRequests {
// Best effort "sorting" of incoming requests for concurrent sharding if requested.
// This assumes sample values have unique values in our tests.
for i, es := range c.expectedRequestSamples {
if len(es) == 0 {
continue
}
for _, rs := range c.TestWriteClient.receivedSamples {
if len(rs) == 0 {
continue
}
if es[0].V != rs[0].GetValue() {
break
}
expReqSampleToUse = i
break
}
}
// We tried our best, use normal flow otherwise.
}
c.requests[c.i].expectSamples(c.expectedRequestSamples[expReqSampleToUse], c.expectedSeries)
c.expectedRequestSamples = append(c.expectedRequestSamples[:expReqSampleToUse], c.expectedRequestSamples[expReqSampleToUse+1:]...)
return c.requests[c.i].Store(ctx, req, r)
}
func testDefaultQueueConfig() config.QueueConfig {
cfg := config.DefaultQueueConfig
// For faster unit tests we don't wait default 5 seconds.
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
return cfg
}
// TestHistogramSampleBatching tests current way of how classic histogram series
// are grouped in queue manager.
// This is a first step of exploring PRW 2.0 self-contained classic histograms.
func TestHistogramSampleBatching(t *testing.T) {
t.Parallel()
series, samples := createTestClassicHistogram(10)
for _, tc := range []struct {
name string
queueConfig config.QueueConfig
expRequestSamples [][]record.RefSample
allowUnOrderedRequests bool
}{
{
name: "OneShardDefaultBatch",
queueConfig: func() config.QueueConfig {
cfg := testDefaultQueueConfig()
cfg.MaxShards = 1
cfg.MinShards = 1
return cfg
}(),
expRequestSamples: [][]record.RefSample{samples},
},
{
name: "OneShardLimitedBatch",
queueConfig: func() config.QueueConfig {
cfg := testDefaultQueueConfig()
cfg.MaxShards = 1
cfg.MinShards = 1
cfg.MaxSamplesPerSend = 5
return cfg
}(),
expRequestSamples: [][]record.RefSample{
samples[:5], samples[5:10], samples[10:],
},
},
{
name: "TwoShardsLimitedBatch",
queueConfig: func() config.QueueConfig {
cfg := testDefaultQueueConfig()
cfg.MaxShards = 2
cfg.MinShards = 2
return cfg
}(),
expRequestSamples: [][]record.RefSample{
{samples[0], samples[2], samples[4], samples[6], samples[8], samples[10]},
{samples[1], samples[3], samples[5], samples[7], samples[9], samples[11]},
},
allowUnOrderedRequests: true, // No order guarantee for requests from shards.
},
} {
t.Run(tc.name, func(t *testing.T) {
c := newPerRequestWriteClient(tc.allowUnOrderedRequests)
for _, s := range tc.expRequestSamples {
c.expectRequestSamples(s, series)
}
dir := t.TempDir()
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), tc.queueConfig, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
m.Append(samples)
c.waitForExpectedData(t)
})
}
@ -166,7 +347,7 @@ func TestMetadataDelivery(t *testing.T) {
dir := t.TempDir()
cfg := config.DefaultQueueConfig
cfg := testDefaultQueueConfig()
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
@ -201,10 +382,9 @@ func TestSampleDeliveryTimeout(t *testing.T) {
samples, series := createTimeseries(n, n)
c := NewTestWriteClient()
cfg := config.DefaultQueueConfig
cfg := testDefaultQueueConfig()
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
dir := t.TempDir()
@ -217,11 +397,11 @@ func TestSampleDeliveryTimeout(t *testing.T) {
// Send the samples twice, waiting for the samples in the meantime.
c.expectSamples(samples, series)
m.Append(samples)
c.waitForExpectedData(t)
c.waitForExpectedData(t, 30*time.Second)
c.expectSamples(samples, series)
m.Append(samples)
c.waitForExpectedData(t)
c.waitForExpectedData(t, 30*time.Second)
}
func TestSampleDeliveryOrder(t *testing.T) {
@ -247,7 +427,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
dir := t.TempDir()
cfg := config.DefaultQueueConfig
cfg := testDefaultQueueConfig()
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
@ -258,7 +438,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
defer m.Stop()
// These should be received by the client.
m.Append(samples)
c.waitForExpectedData(t)
c.waitForExpectedData(t, 30*time.Second)
}
func TestShutdown(t *testing.T) {
@ -267,7 +447,7 @@ func TestShutdown(t *testing.T) {
dir := t.TempDir()
cfg := config.DefaultQueueConfig
cfg := testDefaultQueueConfig()
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
@ -306,7 +486,7 @@ func TestSeriesReset(t *testing.T) {
dir := t.TempDir()
cfg := config.DefaultQueueConfig
cfg := testDefaultQueueConfig()
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false)
@ -331,7 +511,7 @@ func TestReshard(t *testing.T) {
c := NewTestWriteClient()
c.expectSamples(samples, series)
cfg := config.DefaultQueueConfig
cfg := testDefaultQueueConfig()
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
@ -358,7 +538,7 @@ func TestReshard(t *testing.T) {
time.Sleep(100 * time.Millisecond)
}
c.waitForExpectedData(t)
c.waitForExpectedData(t, 30*time.Second)
}
func TestReshardRaceWithStop(*testing.T) {
@ -368,7 +548,7 @@ func TestReshardRaceWithStop(*testing.T) {
h.Lock()
cfg := config.DefaultQueueConfig
cfg := testDefaultQueueConfig()
mcfg := config.DefaultMetadataConfig
exitCh := make(chan struct{})
go func() {
@ -401,7 +581,7 @@ func TestReshardPartialBatch(t *testing.T) {
c := NewTestBlockedWriteClient()
cfg := config.DefaultQueueConfig
cfg := testDefaultQueueConfig()
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
batchSendDeadline := time.Millisecond
@ -443,7 +623,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
c := NewNopWriteClient()
cfg := config.DefaultQueueConfig
cfg := testDefaultQueueConfig()
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
cfg.MaxSamplesPerSend = 10
@ -476,8 +656,8 @@ func TestQueueFilledDeadlock(t *testing.T) {
}
}
func TestReleaseNoninternedString(t *testing.T) {
cfg := config.DefaultQueueConfig
func TestReleaseNonInternedString(t *testing.T) {
cfg := testDefaultQueueConfig()
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
c := NewTestWriteClient()
@ -523,7 +703,7 @@ func TestShouldReshard(t *testing.T) {
},
}
cfg := config.DefaultQueueConfig
cfg := testDefaultQueueConfig()
mcfg := config.DefaultMetadataConfig
for _, c := range cases {
metrics := newQueueManagerMetrics(nil, "", "")
@ -641,6 +821,41 @@ func createHistograms(numSamples, numSeries int, floatHistogram bool) ([]record.
return histograms, nil, series
}
func createTestClassicHistogram(buckets int) ([]record.RefSeries, []record.RefSample) {
samples := make([]record.RefSample, buckets+2)
series := make([]record.RefSeries, buckets+2)
for i := range samples {
samples[i] = record.RefSample{
Ref: chunks.HeadSeriesRef(i), T: int64(i), V: float64(i),
}
}
for i := 0; i < buckets; i++ {
le := fmt.Sprintf("%v", i)
if i == 0 {
le = "+Inf"
}
series[i] = record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: labels.FromStrings(
"__name__", "http_request_duration_seconds_bucket",
"le", le,
),
}
}
series[buckets] = record.RefSeries{
Ref: chunks.HeadSeriesRef(buckets),
Labels: labels.FromStrings("__name__", "http_request_duration_seconds_sum"),
}
series[buckets+1] = record.RefSeries{
Ref: chunks.HeadSeriesRef(buckets + 1),
Labels: labels.FromStrings("__name__", "http_request_duration_seconds_count"),
}
return series, samples
}
func getSeriesNameFromRef(r record.RefSeries) string {
return r.Labels.Get("__name__")
}
@ -656,15 +871,13 @@ type TestWriteClient struct {
expectedFloatHistograms map[string][]prompb.Histogram
receivedMetadata map[string][]prompb.MetricMetadata
writesReceived int
withWaitGroup bool
wg sync.WaitGroup
mtx sync.Mutex
buf []byte
mtx sync.Mutex
buf []byte
}
func NewTestWriteClient() *TestWriteClient {
return &TestWriteClient{
withWaitGroup: true,
receivedSamples: map[string][]prompb.Sample{},
expectedSamples: map[string][]prompb.Sample{},
receivedMetadata: map[string][]prompb.MetricMetadata{},
@ -672,9 +885,6 @@ func NewTestWriteClient() *TestWriteClient {
}
func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.RefSeries) {
if !c.withWaitGroup {
return
}
c.mtx.Lock()
defer c.mtx.Unlock()
@ -688,13 +898,9 @@ func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.R
Value: s.V,
})
}
c.wg.Add(len(ss))
}
func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []record.RefSeries) {
if !c.withWaitGroup {
return
}
c.mtx.Lock()
defer c.mtx.Unlock()
@ -710,13 +916,9 @@ func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []reco
}
c.expectedExemplars[seriesName] = append(c.expectedExemplars[seriesName], e)
}
c.wg.Add(len(ss))
}
func (c *TestWriteClient) expectHistograms(hh []record.RefHistogramSample, series []record.RefSeries) {
if !c.withWaitGroup {
return
}
c.mtx.Lock()
defer c.mtx.Unlock()
@ -727,13 +929,9 @@ func (c *TestWriteClient) expectHistograms(hh []record.RefHistogramSample, serie
seriesName := getSeriesNameFromRef(series[h.Ref])
c.expectedHistograms[seriesName] = append(c.expectedHistograms[seriesName], HistogramToHistogramProto(h.T, h.H))
}
c.wg.Add(len(hh))
}
func (c *TestWriteClient) expectFloatHistograms(fhs []record.RefFloatHistogramSample, series []record.RefSeries) {
if !c.withWaitGroup {
return
}
c.mtx.Lock()
defer c.mtx.Unlock()
@ -744,14 +942,37 @@ func (c *TestWriteClient) expectFloatHistograms(fhs []record.RefFloatHistogramSa
seriesName := getSeriesNameFromRef(series[fh.Ref])
c.expectedFloatHistograms[seriesName] = append(c.expectedFloatHistograms[seriesName], FloatHistogramToHistogramProto(fh.T, fh.FH))
}
c.wg.Add(len(fhs))
}
func (c *TestWriteClient) waitForExpectedData(tb testing.TB) {
if !c.withWaitGroup {
return
func deepLen[M any](ms ...map[string][]M) int {
l := 0
for _, m := range ms {
for _, v := range m {
l += len(v)
}
}
c.wg.Wait()
return l
}
func (c *TestWriteClient) waitForExpectedData(tb testing.TB, timeout time.Duration) {
tb.Helper()
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if err := runutil.Retry(500*time.Millisecond, ctx.Done(), func() error {
c.mtx.Lock()
exp := deepLen(c.expectedSamples) + deepLen(c.expectedExemplars) + deepLen(c.expectedHistograms, c.expectedFloatHistograms)
got := deepLen(c.receivedSamples) + deepLen(c.receivedExemplars) + deepLen(c.receivedHistograms, c.receivedFloatHistograms)
c.mtx.Unlock()
if got < exp {
return fmt.Errorf("expected %v samples/exemplars/histograms/floathistograms, got %v", exp, got)
}
return nil
}); err != nil {
tb.Error(err)
}
c.mtx.Lock()
defer c.mtx.Unlock()
for ts, expectedSamples := range c.expectedSamples {
@ -785,22 +1006,18 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
if err := proto.Unmarshal(reqBuf, &reqProto); err != nil {
return err
}
count := 0
for _, ts := range reqProto.Timeseries {
labels := labelProtosToLabels(ts.Labels)
seriesName := labels.Get("__name__")
for _, sample := range ts.Samples {
count++
c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample)
}
for _, ex := range ts.Exemplars {
count++
c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], ex)
}
for _, histogram := range ts.Histograms {
count++
if histogram.IsFloatHistogram() {
c.receivedFloatHistograms[seriesName] = append(c.receivedFloatHistograms[seriesName], histogram)
} else {
@ -809,16 +1026,11 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
}
}
if c.withWaitGroup {
c.wg.Add(-count)
}
for _, m := range reqProto.Metadata {
c.receivedMetadata[m.MetricFamilyName] = append(c.receivedMetadata[m.MetricFamilyName], m)
}
c.writesReceived++
return nil
}
@ -895,7 +1107,7 @@ func BenchmarkSampleSend(b *testing.B) {
c := NewNopWriteClient()
cfg := config.DefaultQueueConfig
cfg := testDefaultQueueConfig()
mcfg := config.DefaultMetadataConfig
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
cfg.MinShards = 20
@ -943,7 +1155,7 @@ func BenchmarkStartup(b *testing.B) {
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout))
logger = log.With(logger, "caller", log.DefaultCaller)
cfg := config.DefaultQueueConfig
cfg := testDefaultQueueConfig()
mcfg := config.DefaultMetadataConfig
for n := 0; n < b.N; n++ {
metrics := newQueueManagerMetrics(nil, "", "")
@ -1026,7 +1238,7 @@ func TestProcessExternalLabels(t *testing.T) {
func TestCalculateDesiredShards(t *testing.T) {
c := NewTestWriteClient()
cfg := config.DefaultQueueConfig
cfg := testDefaultQueueConfig()
mcfg := config.DefaultMetadataConfig
dir := t.TempDir()
@ -1103,7 +1315,7 @@ func TestCalculateDesiredShards(t *testing.T) {
func TestCalculateDesiredShardsDetail(t *testing.T) {
c := NewTestWriteClient()
cfg := config.DefaultQueueConfig
cfg := testDefaultQueueConfig()
mcfg := config.DefaultMetadataConfig
dir := t.TempDir()