add sender-side tests and fix failing ones

Signed-off-by: Nicolás Pazos <npazosmendez@gmail.com>
This commit is contained in:
Nicolás Pazos 2023-10-03 14:58:16 -03:00
parent e909eaea31
commit c7a77af3e2

View file

@ -67,23 +67,25 @@ func TestSampleDelivery(t *testing.T) {
exemplars bool
histograms bool
floatHistograms bool
internProto bool
}{
{samples: true, exemplars: false, histograms: false, floatHistograms: false, name: "samples only"},
{samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "samples, exemplars, and histograms"},
{samples: false, exemplars: true, histograms: false, floatHistograms: false, name: "exemplars only"},
{samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only"},
{samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only"},
{internProto: true, samples: true, exemplars: false, histograms: false, name: "interned samples only"},
{internProto: true, samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "interned samples, exemplars, and histograms"},
{internProto: true, samples: false, exemplars: true, histograms: false, name: "interned exemplars only"},
{internProto: true, samples: false, exemplars: false, histograms: true, name: "interned histograms only"},
{internProto: true, samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "interned float histograms only"},
}
// Let's create an even number of send batches so we don't run into the
// batch timeout case.
n := 3
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
defer s.Close()
queueConfig := config.DefaultQueueConfig
queueConfig.BatchSendDeadline = model.Duration(100 * time.Millisecond)
queueConfig.MaxShards = 1
@ -103,6 +105,10 @@ func TestSampleDelivery(t *testing.T) {
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, tc.internProto)
defer s.Close()
var (
series []record.RefSeries
samples []record.RefSample
@ -133,7 +139,7 @@ func TestSampleDelivery(t *testing.T) {
require.NoError(t, err)
qm := s.rws.queues[hash]
c := NewTestWriteClient()
c := NewTestWriteClient(tc.internProto)
qm.SetClient(c)
qm.StoreSeries(series, 0)
@ -164,7 +170,7 @@ func TestSampleDelivery(t *testing.T) {
}
func TestMetadataDelivery(t *testing.T) {
c := NewTestWriteClient()
c := NewTestWriteClient(false)
dir := t.TempDir()
@ -198,69 +204,79 @@ func TestMetadataDelivery(t *testing.T) {
}
func TestSampleDeliveryTimeout(t *testing.T) {
// Let's send one less sample than batch size, and wait the timeout duration
n := 9
samples, series := createTimeseries(n, n)
c := NewTestWriteClient()
for _, proto := range []string{"interned", "non-interned"} {
t.Run(proto, func(t *testing.T) {
internProto := proto == "interned"
// Let's send one less sample than batch size, and wait the timeout duration
n := 9
samples, series := createTimeseries(n, n)
c := NewTestWriteClient(internProto)
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
dir := t.TempDir()
dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, internProto)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
// Send the samples twice, waiting for the samples in the meantime.
c.expectSamples(samples, series)
m.Append(samples)
c.waitForExpectedData(t)
// Send the samples twice, waiting for the samples in the meantime.
c.expectSamples(samples, series)
m.Append(samples)
c.waitForExpectedData(t)
c.expectSamples(samples, series)
m.Append(samples)
c.waitForExpectedData(t)
c.expectSamples(samples, series)
m.Append(samples)
c.waitForExpectedData(t)
})
}
}
func TestSampleDeliveryOrder(t *testing.T) {
ts := 10
n := config.DefaultQueueConfig.MaxSamplesPerSend * ts
samples := make([]record.RefSample, 0, n)
series := make([]record.RefSeries, 0, n)
for i := 0; i < n; i++ {
name := fmt.Sprintf("test_metric_%d", i%ts)
samples = append(samples, record.RefSample{
Ref: chunks.HeadSeriesRef(i),
T: int64(i),
V: float64(i),
})
series = append(series, record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: labels.FromStrings("__name__", name),
for _, proto := range []string{"interned", "non-interned"} {
t.Run(proto, func(t *testing.T) {
internProto := proto == "interned"
ts := 10
n := config.DefaultQueueConfig.MaxSamplesPerSend * ts
samples := make([]record.RefSample, 0, n)
series := make([]record.RefSeries, 0, n)
for i := 0; i < n; i++ {
name := fmt.Sprintf("test_metric_%d", i%ts)
samples = append(samples, record.RefSample{
Ref: chunks.HeadSeriesRef(i),
T: int64(i),
V: float64(i),
})
series = append(series, record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: labels.FromStrings("__name__", name),
})
}
c := NewTestWriteClient(internProto)
c.expectSamples(samples, series)
dir := t.TempDir()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, internProto)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
// These should be received by the client.
m.Append(samples)
c.waitForExpectedData(t)
})
}
c := NewTestWriteClient()
c.expectSamples(samples, series)
dir := t.TempDir()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
// These should be received by the client.
m.Append(samples)
c.waitForExpectedData(t)
}
func TestShutdown(t *testing.T) {
@ -325,180 +341,205 @@ func TestSeriesReset(t *testing.T) {
}
func TestReshard(t *testing.T) {
size := 10 // Make bigger to find more races.
nSeries := 6
nSamples := config.DefaultQueueConfig.Capacity * size
samples, series := createTimeseries(nSamples, nSeries)
for _, proto := range []string{"interned", "non-interned"} {
t.Run(proto, func(t *testing.T) {
internProto := proto == "interned"
size := 10 // Make bigger to find more races.
nSeries := 6
nSamples := config.DefaultQueueConfig.Capacity * size
samples, series := createTimeseries(nSamples, nSeries)
c := NewTestWriteClient()
c.expectSamples(samples, series)
c := NewTestWriteClient(internProto)
c.expectSamples(samples, series)
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
dir := t.TempDir()
dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.StoreSeries(series, 0)
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, internProto)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
m.Start()
defer m.Stop()
go func() {
for i := 0; i < len(samples); i += config.DefaultQueueConfig.Capacity {
sent := m.Append(samples[i : i+config.DefaultQueueConfig.Capacity])
require.True(t, sent, "samples not sent")
time.Sleep(100 * time.Millisecond)
}
}()
go func() {
for i := 0; i < len(samples); i += config.DefaultQueueConfig.Capacity {
sent := m.Append(samples[i : i+config.DefaultQueueConfig.Capacity])
require.True(t, sent, "samples not sent")
time.Sleep(100 * time.Millisecond)
}
}()
for i := 1; i < len(samples)/config.DefaultQueueConfig.Capacity; i++ {
m.shards.stop()
m.shards.start(i)
time.Sleep(100 * time.Millisecond)
for i := 1; i < len(samples)/config.DefaultQueueConfig.Capacity; i++ {
m.shards.stop()
m.shards.start(i)
time.Sleep(100 * time.Millisecond)
}
c.waitForExpectedData(t)
})
}
c.waitForExpectedData(t)
}
func TestReshardRaceWithStop(*testing.T) {
c := NewTestWriteClient()
var m *QueueManager
h := sync.Mutex{}
func TestReshardRaceWithStop(t *testing.T) {
for _, proto := range []string{"interned", "non-interned"} {
t.Run(proto, func(t *testing.T) {
internProto := proto == "interned"
c := NewTestWriteClient(internProto)
var m *QueueManager
h := sync.Mutex{}
h.Lock()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
exitCh := make(chan struct{})
go func() {
for {
metrics := newQueueManagerMetrics(nil, "", "")
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.Start()
h.Unlock()
h.Lock()
m.Stop()
select {
case exitCh <- struct{}{}:
return
default:
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
exitCh := make(chan struct{})
go func() {
for {
metrics := newQueueManagerMetrics(nil, "", "")
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, internProto)
m.Start()
h.Unlock()
h.Lock()
m.Stop()
select {
case exitCh <- struct{}{}:
return
default:
}
}
}()
for i := 1; i < 100; i++ {
h.Lock()
m.reshardChan <- i
h.Unlock()
}
}
}()
for i := 1; i < 100; i++ {
h.Lock()
m.reshardChan <- i
h.Unlock()
<-exitCh
})
}
<-exitCh
}
func TestReshardPartialBatch(t *testing.T) {
samples, series := createTimeseries(1, 10)
for _, proto := range []string{"interned", "non-interned"} {
t.Run(proto, func(t *testing.T) {
internProto := proto == "interned"
samples, series := createTimeseries(1, 10)
c := NewTestBlockedWriteClient()
c := NewTestBlockedWriteClient()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
batchSendDeadline := time.Millisecond
flushDeadline := 10 * time.Millisecond
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
batchSendDeadline := time.Millisecond
flushDeadline := 10 * time.Millisecond
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.StoreSeries(series, 0)
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, internProto)
m.StoreSeries(series, 0)
m.Start()
m.Start()
for i := 0; i < 100; i++ {
done := make(chan struct{})
go func() {
m.Append(samples)
time.Sleep(batchSendDeadline)
m.shards.stop()
m.shards.start(1)
done <- struct{}{}
}()
select {
case <-done:
case <-time.After(2 * time.Second):
t.Error("Deadlock between sending and stopping detected")
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
t.FailNow()
}
for i := 0; i < 100; i++ {
done := make(chan struct{})
go func() {
m.Append(samples)
time.Sleep(batchSendDeadline)
m.shards.stop()
m.shards.start(1)
done <- struct{}{}
}()
select {
case <-done:
case <-time.After(2 * time.Second):
t.Error("Deadlock between sending and stopping detected")
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
t.FailNow()
}
}
// We can only call stop if there was not a deadlock.
m.Stop()
})
}
// We can only call stop if there was not a deadlock.
m.Stop()
}
// TestQueueFilledDeadlock makes sure the code does not deadlock in the case
// where a large scrape (> capacity + max samples per send) is appended at the
// same time as a batch times out according to the batch send deadline.
func TestQueueFilledDeadlock(t *testing.T) {
samples, series := createTimeseries(50, 1)
for _, proto := range []string{"interned", "non-interned"} {
t.Run(proto, func(t *testing.T) {
internProto := proto == "interned"
samples, series := createTimeseries(50, 1)
c := NewNopWriteClient()
c := NewNopWriteClient()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
cfg.MaxSamplesPerSend = 10
cfg.Capacity = 20
flushDeadline := time.Second
batchSendDeadline := time.Millisecond
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
cfg.MaxSamplesPerSend = 10
cfg.Capacity = 20
flushDeadline := time.Second
batchSendDeadline := time.Millisecond
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
metrics := newQueueManagerMetrics(nil, "", "")
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, internProto)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
for i := 0; i < 100; i++ {
done := make(chan struct{})
go func() {
time.Sleep(batchSendDeadline)
m.Append(samples)
done <- struct{}{}
}()
select {
case <-done:
case <-time.After(2 * time.Second):
t.Error("Deadlock between sending and appending detected")
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
t.FailNow()
}
for i := 0; i < 100; i++ {
done := make(chan struct{})
go func() {
time.Sleep(batchSendDeadline)
m.Append(samples)
done <- struct{}{}
}()
select {
case <-done:
case <-time.After(2 * time.Second):
t.Error("Deadlock between sending and appending detected")
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
t.FailNow()
}
}
})
}
}
func TestReleaseNoninternedString(t *testing.T) {
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
c := NewTestWriteClient()
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.Start()
defer m.Stop()
for _, proto := range []string{"interned", "non-interned"} {
t.Run(proto, func(t *testing.T) {
internProto := proto == "interned"
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
c := NewTestWriteClient(internProto)
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, internProto)
m.Start()
defer m.Stop()
for i := 1; i < 1000; i++ {
m.StoreSeries([]record.RefSeries{
{
Ref: chunks.HeadSeriesRef(i),
Labels: labels.FromStrings("asdf", fmt.Sprintf("%d", i)),
},
}, 0)
m.SeriesReset(1)
for i := 1; i < 1000; i++ {
m.StoreSeries([]record.RefSeries{
{
Ref: chunks.HeadSeriesRef(i),
Labels: labels.FromStrings("asdf", fmt.Sprintf("%d", i)),
},
}, 0)
m.SeriesReset(1)
}
metric := client_testutil.ToFloat64(noReferenceReleases)
require.Equal(t, 0.0, metric, "expected there to be no calls to release for strings that were not already interned: %d", int(metric))
})
}
metric := client_testutil.ToFloat64(noReferenceReleases)
require.Equal(t, 0.0, metric, "expected there to be no calls to release for strings that were not already interned: %d", int(metric))
}
func TestShouldReshard(t *testing.T) {
@ -529,7 +570,7 @@ func TestShouldReshard(t *testing.T) {
mcfg := config.DefaultMetadataConfig
for _, c := range cases {
metrics := newQueueManagerMetrics(nil, "", "")
client := NewTestWriteClient()
client := NewTestWriteClient(false)
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.numShards = c.startingShards
m.dataIn.incr(c.samplesIn)
@ -721,14 +762,16 @@ type TestWriteClient struct {
wg sync.WaitGroup
mtx sync.Mutex
buf []byte
expectInternProto bool
}
func NewTestWriteClient() *TestWriteClient {
func NewTestWriteClient(expectIntern bool) *TestWriteClient {
return &TestWriteClient{
withWaitGroup: true,
receivedSamples: map[string][]prompb.Sample{},
expectedSamples: map[string][]prompb.Sample{},
receivedMetadata: map[string][]prompb.MetricMetadata{},
withWaitGroup: true,
receivedSamples: map[string][]prompb.Sample{},
expectedSamples: map[string][]prompb.Sample{},
receivedMetadata: map[string][]prompb.MetricMetadata{},
expectInternProto: expectIntern,
}
}
@ -843,17 +886,26 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
return err
}
var reqProto prompb.WriteRequestWithRefs
if err := proto.Unmarshal(reqBuf, &reqProto); err != nil {
var reqProto *prompb.WriteRequest
if c.expectInternProto {
var reqReduced prompb.WriteRequestWithRefs
err = proto.Unmarshal(reqBuf, &reqReduced)
if err == nil {
reqProto, err = ReducedWriteRequestToWriteRequest(&reqReduced)
}
} else {
reqProto = &prompb.WriteRequest{}
err = proto.Unmarshal(reqBuf, reqProto)
}
if err != nil {
return err
}
count := 0
for _, ts := range reqProto.Timeseries {
tsLabels := labels.Labels{}
for _, l := range ts.Labels {
tsLabels = append(tsLabels, labels.Label{Name: reqProto.StringSymbolTable[l.NameRef], Value: reqProto.StringSymbolTable[l.ValueRef]})
}
seriesName := tsLabels.Get("__name__")
labels := labelProtosToLabels(ts.Labels)
seriesName := labels.Get("__name__")
for _, sample := range ts.Samples {
count++
c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample)
@ -861,15 +913,7 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
for _, ex := range ts.Exemplars {
count++
e := prompb.Exemplar{}
e.Timestamp = ex.Timestamp
e.Value = ex.Value
eLabels := make([]prompb.Label, len(ex.Labels))
for i, l := range ex.Labels {
eLabels[i] = prompb.Label{Name: reqProto.StringSymbolTable[l.NameRef], Value: reqProto.StringSymbolTable[l.ValueRef]}
}
e.Labels = eLabels
c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], e)
c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], ex)
}
for _, histogram := range ts.Histograms {
@ -1098,7 +1142,7 @@ func TestProcessExternalLabels(t *testing.T) {
}
func TestCalculateDesiredShards(t *testing.T) {
c := NewTestWriteClient()
c := NewNopWriteClient()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
@ -1175,7 +1219,7 @@ func TestCalculateDesiredShards(t *testing.T) {
}
func TestCalculateDesiredShardsDetail(t *testing.T) {
c := NewTestWriteClient()
c := NewTestWriteClient(false)
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig