mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
add sender-side tests and fix failing ones
This commit is contained in:
parent
eebf7ac1fc
commit
3be59f0ca6
|
@ -67,23 +67,25 @@ func TestSampleDelivery(t *testing.T) {
|
||||||
exemplars bool
|
exemplars bool
|
||||||
histograms bool
|
histograms bool
|
||||||
floatHistograms bool
|
floatHistograms bool
|
||||||
|
internProto bool
|
||||||
}{
|
}{
|
||||||
{samples: true, exemplars: false, histograms: false, floatHistograms: false, name: "samples only"},
|
{samples: true, exemplars: false, histograms: false, floatHistograms: false, name: "samples only"},
|
||||||
{samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "samples, exemplars, and histograms"},
|
{samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "samples, exemplars, and histograms"},
|
||||||
{samples: false, exemplars: true, histograms: false, floatHistograms: false, name: "exemplars only"},
|
{samples: false, exemplars: true, histograms: false, floatHistograms: false, name: "exemplars only"},
|
||||||
{samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only"},
|
{samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only"},
|
||||||
{samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only"},
|
{samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only"},
|
||||||
|
|
||||||
|
{internProto: true, samples: true, exemplars: false, histograms: false, name: "interned samples only"},
|
||||||
|
{internProto: true, samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "interned samples, exemplars, and histograms"},
|
||||||
|
{internProto: true, samples: false, exemplars: true, histograms: false, name: "interned exemplars only"},
|
||||||
|
{internProto: true, samples: false, exemplars: false, histograms: true, name: "interned histograms only"},
|
||||||
|
{internProto: true, samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "interned float histograms only"},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Let's create an even number of send batches so we don't run into the
|
// Let's create an even number of send batches so we don't run into the
|
||||||
// batch timeout case.
|
// batch timeout case.
|
||||||
n := 3
|
n := 3
|
||||||
|
|
||||||
dir := t.TempDir()
|
|
||||||
|
|
||||||
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
|
|
||||||
defer s.Close()
|
|
||||||
|
|
||||||
queueConfig := config.DefaultQueueConfig
|
queueConfig := config.DefaultQueueConfig
|
||||||
queueConfig.BatchSendDeadline = model.Duration(100 * time.Millisecond)
|
queueConfig.BatchSendDeadline = model.Duration(100 * time.Millisecond)
|
||||||
queueConfig.MaxShards = 1
|
queueConfig.MaxShards = 1
|
||||||
|
@ -103,6 +105,10 @@ func TestSampleDelivery(t *testing.T) {
|
||||||
|
|
||||||
for _, tc := range testcases {
|
for _, tc := range testcases {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, tc.internProto)
|
||||||
|
defer s.Close()
|
||||||
|
|
||||||
var (
|
var (
|
||||||
series []record.RefSeries
|
series []record.RefSeries
|
||||||
samples []record.RefSample
|
samples []record.RefSample
|
||||||
|
@ -133,7 +139,7 @@ func TestSampleDelivery(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
qm := s.rws.queues[hash]
|
qm := s.rws.queues[hash]
|
||||||
|
|
||||||
c := NewTestWriteClient()
|
c := NewTestWriteClient(tc.internProto)
|
||||||
qm.SetClient(c)
|
qm.SetClient(c)
|
||||||
|
|
||||||
qm.StoreSeries(series, 0)
|
qm.StoreSeries(series, 0)
|
||||||
|
@ -164,7 +170,7 @@ func TestSampleDelivery(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMetadataDelivery(t *testing.T) {
|
func TestMetadataDelivery(t *testing.T) {
|
||||||
c := NewTestWriteClient()
|
c := NewTestWriteClient(false)
|
||||||
|
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
|
|
||||||
|
@ -198,10 +204,13 @@ func TestMetadataDelivery(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSampleDeliveryTimeout(t *testing.T) {
|
func TestSampleDeliveryTimeout(t *testing.T) {
|
||||||
|
for _, proto := range []string{"interned", "non-interned"} {
|
||||||
|
t.Run(proto, func(t *testing.T) {
|
||||||
|
internProto := proto == "interned"
|
||||||
// Let's send one less sample than batch size, and wait the timeout duration
|
// Let's send one less sample than batch size, and wait the timeout duration
|
||||||
n := 9
|
n := 9
|
||||||
samples, series := createTimeseries(n, n)
|
samples, series := createTimeseries(n, n)
|
||||||
c := NewTestWriteClient()
|
c := NewTestWriteClient(internProto)
|
||||||
|
|
||||||
cfg := config.DefaultQueueConfig
|
cfg := config.DefaultQueueConfig
|
||||||
mcfg := config.DefaultMetadataConfig
|
mcfg := config.DefaultMetadataConfig
|
||||||
|
@ -211,7 +220,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
|
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
metrics := newQueueManagerMetrics(nil, "", "")
|
||||||
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, internProto)
|
||||||
m.StoreSeries(series, 0)
|
m.StoreSeries(series, 0)
|
||||||
m.Start()
|
m.Start()
|
||||||
defer m.Stop()
|
defer m.Stop()
|
||||||
|
@ -224,9 +233,14 @@ func TestSampleDeliveryTimeout(t *testing.T) {
|
||||||
c.expectSamples(samples, series)
|
c.expectSamples(samples, series)
|
||||||
m.Append(samples)
|
m.Append(samples)
|
||||||
c.waitForExpectedData(t)
|
c.waitForExpectedData(t)
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSampleDeliveryOrder(t *testing.T) {
|
func TestSampleDeliveryOrder(t *testing.T) {
|
||||||
|
for _, proto := range []string{"interned", "non-interned"} {
|
||||||
|
t.Run(proto, func(t *testing.T) {
|
||||||
|
internProto := proto == "interned"
|
||||||
ts := 10
|
ts := 10
|
||||||
n := config.DefaultQueueConfig.MaxSamplesPerSend * ts
|
n := config.DefaultQueueConfig.MaxSamplesPerSend * ts
|
||||||
samples := make([]record.RefSample, 0, n)
|
samples := make([]record.RefSample, 0, n)
|
||||||
|
@ -244,7 +258,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
c := NewTestWriteClient()
|
c := NewTestWriteClient(internProto)
|
||||||
c.expectSamples(samples, series)
|
c.expectSamples(samples, series)
|
||||||
|
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
|
@ -253,7 +267,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
|
||||||
mcfg := config.DefaultMetadataConfig
|
mcfg := config.DefaultMetadataConfig
|
||||||
|
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
metrics := newQueueManagerMetrics(nil, "", "")
|
||||||
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, internProto)
|
||||||
m.StoreSeries(series, 0)
|
m.StoreSeries(series, 0)
|
||||||
|
|
||||||
m.Start()
|
m.Start()
|
||||||
|
@ -261,6 +275,8 @@ func TestSampleDeliveryOrder(t *testing.T) {
|
||||||
// These should be received by the client.
|
// These should be received by the client.
|
||||||
m.Append(samples)
|
m.Append(samples)
|
||||||
c.waitForExpectedData(t)
|
c.waitForExpectedData(t)
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestShutdown(t *testing.T) {
|
func TestShutdown(t *testing.T) {
|
||||||
|
@ -325,12 +341,15 @@ func TestSeriesReset(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestReshard(t *testing.T) {
|
func TestReshard(t *testing.T) {
|
||||||
|
for _, proto := range []string{"interned", "non-interned"} {
|
||||||
|
t.Run(proto, func(t *testing.T) {
|
||||||
|
internProto := proto == "interned"
|
||||||
size := 10 // Make bigger to find more races.
|
size := 10 // Make bigger to find more races.
|
||||||
nSeries := 6
|
nSeries := 6
|
||||||
nSamples := config.DefaultQueueConfig.Capacity * size
|
nSamples := config.DefaultQueueConfig.Capacity * size
|
||||||
samples, series := createTimeseries(nSamples, nSeries)
|
samples, series := createTimeseries(nSamples, nSeries)
|
||||||
|
|
||||||
c := NewTestWriteClient()
|
c := NewTestWriteClient(internProto)
|
||||||
c.expectSamples(samples, series)
|
c.expectSamples(samples, series)
|
||||||
|
|
||||||
cfg := config.DefaultQueueConfig
|
cfg := config.DefaultQueueConfig
|
||||||
|
@ -340,7 +359,7 @@ func TestReshard(t *testing.T) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
|
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
metrics := newQueueManagerMetrics(nil, "", "")
|
||||||
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, internProto)
|
||||||
m.StoreSeries(series, 0)
|
m.StoreSeries(series, 0)
|
||||||
|
|
||||||
m.Start()
|
m.Start()
|
||||||
|
@ -361,10 +380,15 @@ func TestReshard(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
c.waitForExpectedData(t)
|
c.waitForExpectedData(t)
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestReshardRaceWithStop(*testing.T) {
|
func TestReshardRaceWithStop(t *testing.T) {
|
||||||
c := NewTestWriteClient()
|
for _, proto := range []string{"interned", "non-interned"} {
|
||||||
|
t.Run(proto, func(t *testing.T) {
|
||||||
|
internProto := proto == "interned"
|
||||||
|
c := NewTestWriteClient(internProto)
|
||||||
var m *QueueManager
|
var m *QueueManager
|
||||||
h := sync.Mutex{}
|
h := sync.Mutex{}
|
||||||
|
|
||||||
|
@ -376,7 +400,7 @@ func TestReshardRaceWithStop(*testing.T) {
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
metrics := newQueueManagerMetrics(nil, "", "")
|
||||||
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
|
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, internProto)
|
||||||
m.Start()
|
m.Start()
|
||||||
h.Unlock()
|
h.Unlock()
|
||||||
h.Lock()
|
h.Lock()
|
||||||
|
@ -396,9 +420,14 @@ func TestReshardRaceWithStop(*testing.T) {
|
||||||
h.Unlock()
|
h.Unlock()
|
||||||
}
|
}
|
||||||
<-exitCh
|
<-exitCh
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestReshardPartialBatch(t *testing.T) {
|
func TestReshardPartialBatch(t *testing.T) {
|
||||||
|
for _, proto := range []string{"interned", "non-interned"} {
|
||||||
|
t.Run(proto, func(t *testing.T) {
|
||||||
|
internProto := proto == "interned"
|
||||||
samples, series := createTimeseries(1, 10)
|
samples, series := createTimeseries(1, 10)
|
||||||
|
|
||||||
c := NewTestBlockedWriteClient()
|
c := NewTestBlockedWriteClient()
|
||||||
|
@ -411,7 +440,7 @@ func TestReshardPartialBatch(t *testing.T) {
|
||||||
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
|
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
|
||||||
|
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
metrics := newQueueManagerMetrics(nil, "", "")
|
||||||
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
|
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, internProto)
|
||||||
m.StoreSeries(series, 0)
|
m.StoreSeries(series, 0)
|
||||||
|
|
||||||
m.Start()
|
m.Start()
|
||||||
|
@ -435,12 +464,17 @@ func TestReshardPartialBatch(t *testing.T) {
|
||||||
}
|
}
|
||||||
// We can only call stop if there was not a deadlock.
|
// We can only call stop if there was not a deadlock.
|
||||||
m.Stop()
|
m.Stop()
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestQueueFilledDeadlock makes sure the code does not deadlock in the case
|
// TestQueueFilledDeadlock makes sure the code does not deadlock in the case
|
||||||
// where a large scrape (> capacity + max samples per send) is appended at the
|
// where a large scrape (> capacity + max samples per send) is appended at the
|
||||||
// same time as a batch times out according to the batch send deadline.
|
// same time as a batch times out according to the batch send deadline.
|
||||||
func TestQueueFilledDeadlock(t *testing.T) {
|
func TestQueueFilledDeadlock(t *testing.T) {
|
||||||
|
for _, proto := range []string{"interned", "non-interned"} {
|
||||||
|
t.Run(proto, func(t *testing.T) {
|
||||||
|
internProto := proto == "interned"
|
||||||
samples, series := createTimeseries(50, 1)
|
samples, series := createTimeseries(50, 1)
|
||||||
|
|
||||||
c := NewNopWriteClient()
|
c := NewNopWriteClient()
|
||||||
|
@ -456,7 +490,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
|
||||||
|
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
metrics := newQueueManagerMetrics(nil, "", "")
|
||||||
|
|
||||||
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
|
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, internProto)
|
||||||
m.StoreSeries(series, 0)
|
m.StoreSeries(series, 0)
|
||||||
m.Start()
|
m.Start()
|
||||||
defer m.Stop()
|
defer m.Stop()
|
||||||
|
@ -476,14 +510,19 @@ func TestQueueFilledDeadlock(t *testing.T) {
|
||||||
t.FailNow()
|
t.FailNow()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestReleaseNoninternedString(t *testing.T) {
|
func TestReleaseNoninternedString(t *testing.T) {
|
||||||
|
for _, proto := range []string{"interned", "non-interned"} {
|
||||||
|
t.Run(proto, func(t *testing.T) {
|
||||||
|
internProto := proto == "interned"
|
||||||
cfg := config.DefaultQueueConfig
|
cfg := config.DefaultQueueConfig
|
||||||
mcfg := config.DefaultMetadataConfig
|
mcfg := config.DefaultMetadataConfig
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
metrics := newQueueManagerMetrics(nil, "", "")
|
||||||
c := NewTestWriteClient()
|
c := NewTestWriteClient(internProto)
|
||||||
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
|
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, internProto)
|
||||||
m.Start()
|
m.Start()
|
||||||
defer m.Stop()
|
defer m.Stop()
|
||||||
|
|
||||||
|
@ -499,6 +538,8 @@ func TestReleaseNoninternedString(t *testing.T) {
|
||||||
|
|
||||||
metric := client_testutil.ToFloat64(noReferenceReleases)
|
metric := client_testutil.ToFloat64(noReferenceReleases)
|
||||||
require.Equal(t, 0.0, metric, "expected there to be no calls to release for strings that were not already interned: %d", int(metric))
|
require.Equal(t, 0.0, metric, "expected there to be no calls to release for strings that were not already interned: %d", int(metric))
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestShouldReshard(t *testing.T) {
|
func TestShouldReshard(t *testing.T) {
|
||||||
|
@ -529,7 +570,7 @@ func TestShouldReshard(t *testing.T) {
|
||||||
mcfg := config.DefaultMetadataConfig
|
mcfg := config.DefaultMetadataConfig
|
||||||
for _, c := range cases {
|
for _, c := range cases {
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
metrics := newQueueManagerMetrics(nil, "", "")
|
||||||
client := NewTestWriteClient()
|
client := NewTestWriteClient(false)
|
||||||
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
|
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
|
||||||
m.numShards = c.startingShards
|
m.numShards = c.startingShards
|
||||||
m.dataIn.incr(c.samplesIn)
|
m.dataIn.incr(c.samplesIn)
|
||||||
|
@ -721,14 +762,16 @@ type TestWriteClient struct {
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
mtx sync.Mutex
|
mtx sync.Mutex
|
||||||
buf []byte
|
buf []byte
|
||||||
|
expectInternProto bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTestWriteClient() *TestWriteClient {
|
func NewTestWriteClient(expectIntern bool) *TestWriteClient {
|
||||||
return &TestWriteClient{
|
return &TestWriteClient{
|
||||||
withWaitGroup: true,
|
withWaitGroup: true,
|
||||||
receivedSamples: map[string][]prompb.Sample{},
|
receivedSamples: map[string][]prompb.Sample{},
|
||||||
expectedSamples: map[string][]prompb.Sample{},
|
expectedSamples: map[string][]prompb.Sample{},
|
||||||
receivedMetadata: map[string][]prompb.MetricMetadata{},
|
receivedMetadata: map[string][]prompb.MetricMetadata{},
|
||||||
|
expectInternProto: expectIntern,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -843,17 +886,26 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var reqProto prompb.WriteRequestWithRefs
|
var reqProto *prompb.WriteRequest
|
||||||
if err := proto.Unmarshal(reqBuf, &reqProto); err != nil {
|
if c.expectInternProto {
|
||||||
|
var reqReduced prompb.WriteRequestWithRefs
|
||||||
|
err = proto.Unmarshal(reqBuf, &reqReduced)
|
||||||
|
if err == nil {
|
||||||
|
reqProto, err = ReducedWriteRequestToWriteRequest(&reqReduced)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
reqProto = &prompb.WriteRequest{}
|
||||||
|
err = proto.Unmarshal(reqBuf, reqProto)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
count := 0
|
count := 0
|
||||||
for _, ts := range reqProto.Timeseries {
|
for _, ts := range reqProto.Timeseries {
|
||||||
tsLabels := labels.Labels{}
|
labels := labelProtosToLabels(ts.Labels)
|
||||||
for _, l := range ts.Labels {
|
seriesName := labels.Get("__name__")
|
||||||
tsLabels = append(tsLabels, labels.Label{Name: reqProto.StringSymbolTable[l.NameRef], Value: reqProto.StringSymbolTable[l.ValueRef]})
|
|
||||||
}
|
|
||||||
seriesName := tsLabels.Get("__name__")
|
|
||||||
for _, sample := range ts.Samples {
|
for _, sample := range ts.Samples {
|
||||||
count++
|
count++
|
||||||
c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample)
|
c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample)
|
||||||
|
@ -861,15 +913,7 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
|
||||||
|
|
||||||
for _, ex := range ts.Exemplars {
|
for _, ex := range ts.Exemplars {
|
||||||
count++
|
count++
|
||||||
e := prompb.Exemplar{}
|
c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], ex)
|
||||||
e.Timestamp = ex.Timestamp
|
|
||||||
e.Value = ex.Value
|
|
||||||
eLabels := make([]prompb.Label, len(ex.Labels))
|
|
||||||
for i, l := range ex.Labels {
|
|
||||||
eLabels[i] = prompb.Label{Name: reqProto.StringSymbolTable[l.NameRef], Value: reqProto.StringSymbolTable[l.ValueRef]}
|
|
||||||
}
|
|
||||||
e.Labels = eLabels
|
|
||||||
c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], e)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, histogram := range ts.Histograms {
|
for _, histogram := range ts.Histograms {
|
||||||
|
@ -1098,7 +1142,7 @@ func TestProcessExternalLabels(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCalculateDesiredShards(t *testing.T) {
|
func TestCalculateDesiredShards(t *testing.T) {
|
||||||
c := NewTestWriteClient()
|
c := NewNopWriteClient()
|
||||||
cfg := config.DefaultQueueConfig
|
cfg := config.DefaultQueueConfig
|
||||||
mcfg := config.DefaultMetadataConfig
|
mcfg := config.DefaultMetadataConfig
|
||||||
|
|
||||||
|
@ -1175,7 +1219,7 @@ func TestCalculateDesiredShards(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCalculateDesiredShardsDetail(t *testing.T) {
|
func TestCalculateDesiredShardsDetail(t *testing.T) {
|
||||||
c := NewTestWriteClient()
|
c := NewTestWriteClient(false)
|
||||||
cfg := config.DefaultQueueConfig
|
cfg := config.DefaultQueueConfig
|
||||||
mcfg := config.DefaultMetadataConfig
|
mcfg := config.DefaultMetadataConfig
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue