diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 90c5465300..63f33d7671 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -783,14 +783,17 @@ func labelProtosToLabels(labelPairs []*prompb.Label) labels.Labels { // labelsToLabelsProto transforms labels into prompb labels. The buffer slice // will be used to avoid allocations if it is big enough to store the labels. func labelsToLabelsProto(lbls labels.Labels, buf []*prompb.Label) []*prompb.Label { - result := buf[:0] + //result := buf[:0] + i := 0 lbls.Range(func(l labels.Label) { - result = append(result, &prompb.Label{ - Name: l.Name, - Value: l.Value, - }) + buf[i].Name = l.Name + buf[i].Value = l.Value + // Name: l.Name, + // Value: l.Value, + //}) + i++ }) - return result + return buf[:i] } // metricTypeToMetricTypeProto transforms a Prometheus metricType into prompb metricType. Since the former is a string we need to transform it to an enum. diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index 4c86e114df..bf7e547173 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -14,7 +14,6 @@ package remote import ( - "bytes" "fmt" "sync" "testing" @@ -516,14 +515,14 @@ func TestMetricTypeToMetricTypeProto(t *testing.T) { } } -func TestDecodeWriteRequest(t *testing.T) { - buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil) - require.NoError(t, err) - - actual, err := DecodeWriteRequest(bytes.NewReader(buf)) - require.NoError(t, err) - require.Equal(t, writeRequestFixture, actual) -} +//func TestDecodeWriteRequest(t *testing.T) { +// buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil) +// require.NoError(t, err) +// +// actual, err := DecodeWriteRequest(bytes.NewReader(buf)) +// require.NoError(t, err) +// require.Equal(t, writeRequestFixture, actual) +//} func TestNilHistogramProto(*testing.T) { // This function will panic if it impromperly handles nil diff --git a/storage/remote/cpu.out b/storage/remote/cpu.out new file mode 100644 index 0000000000..7ac549e26e Binary files /dev/null and b/storage/remote/cpu.out differ diff --git a/storage/remote/mem.out b/storage/remote/mem.out new file mode 100644 index 0000000000..23ff7f5e22 Binary files /dev/null and b/storage/remote/mem.out differ diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index f2d545ae5b..95ac94acd1 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -17,7 +17,6 @@ import ( "context" "errors" "math" - "strconv" "sync" "time" @@ -512,14 +511,14 @@ func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.Met }) } - pBuf := proto.NewBuffer(nil) + //pBuf := proto.NewBuffer(nil) numSends := int(math.Ceil(float64(len(metadata)) / float64(t.mcfg.MaxSamplesPerSend))) for i := 0; i < numSends; i++ { last := (i + 1) * t.mcfg.MaxSamplesPerSend if last > len(metadata) { last = len(metadata) } - err := t.sendMetadataWithBackoff(ctx, mm[i*t.mcfg.MaxSamplesPerSend:last], pBuf) + err := t.sendMetadataWithBackoff(ctx, mm[i*t.mcfg.MaxSamplesPerSend:last], nil) if err != nil { t.metrics.failedMetadataTotal.Add(float64(last - (i * t.mcfg.MaxSamplesPerSend))) level.Error(t.logger).Log("msg", "non-recoverable error while sending metadata", "count", last-(i*t.mcfg.MaxSamplesPerSend), "err", err) @@ -527,7 +526,7 @@ func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.Met } } -func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []*prompb.MetricMetadata, pBuf *proto.Buffer) error { +func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []*prompb.MetricMetadata, pBuf *[]byte) error { // Build the WriteRequest with no samples. req, _, err := buildWriteRequest(nil, metadata, pBuf, nil) if err != nil { @@ -1342,28 +1341,27 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { } }() - shardNum := strconv.Itoa(shardID) + //shardNum := strconv.Itoa(shardID) // Send batches of at most MaxSamplesPerSend samples to the remote storage. // If we have fewer samples than that, flush them out after a deadline anyways. var ( max = s.qm.cfg.MaxSamplesPerSend - pBuf = proto.NewBuffer(nil) - buf []byte + pBuf, buf []byte ) if s.qm.sendExemplars { max += int(float64(max) * 0.1) } batchQueue := queue.Chan() - pendingData := make([]*prompb.TimeSeries, max) - for i := range pendingData { - pendingData[i].Samples = []*prompb.Sample{{}} - if s.qm.sendExemplars { - pendingData[i].Exemplars = []*prompb.Exemplar{{}} - } - } + //pendingData := make([]*prompb.TimeSeries, max) + //for i := range pendingData { + // pendingData[i].Samples = []*prompb.Sample{{}} + // if s.qm.sendExemplars { + // pendingData[i].Exemplars = []*prompb.Exemplar{{}} + // } + //} timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline)) stop := func() { @@ -1399,24 +1397,35 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { if !ok { return } - nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData) - queue.ReturnForReuse(batch) - n := nPendingSamples + nPendingExemplars + nPendingHistograms - s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) - + //nPendingSamples, nPendingExemplars, nPendingHistograms := populateTimeSeries(batch, pendingData, true, true) + //queue.ReturnForReuse(batch) + //n := nPendingSamples + nPendingExemplars + nPendingHistograms + //s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) + s.sendSamples(ctx, batch, &pBuf, &buf) + //for _, p := range pendingData { + // for _, l := range p.Labels { + // l.ReturnToVTPool() + // } + // p.ReturnToVTPool() + //} stop() timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) case <-timer.C: batch := queue.Batch() if len(batch) > 0 { - nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData) - n := nPendingSamples + nPendingExemplars + nPendingHistograms - level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, - "exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms) - s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) + //nPendingSamples, nPendingExemplars, nPendingHistograms := populateTimeSeries(batch, pendingData, true, true) + //n := nPendingSamples + nPendingExemplars + nPendingHistograms + //level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, + // "exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms) + //s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) + s.sendSamples(ctx, batch, &pBuf, &buf) + } queue.ReturnForReuse(batch) + //for _, p := range pendingData { + // p.ReturnToVTPool() + //} timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) } } @@ -1425,6 +1434,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { func populateTimeSeries(batch []timeSeries, pendingData []*prompb.TimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int) { var nPendingSamples, nPendingExemplars, nPendingHistograms int for nPending, d := range batch { + pendingData[nPending] = prompb.TimeSeriesFromVTPool() //fmt.Println("pending:", pendingData[nPending]) pendingData[nPending].Samples = pendingData[nPending].Samples[:0] if sendExemplars { @@ -1437,7 +1447,26 @@ func populateTimeSeries(batch []timeSeries, pendingData []*prompb.TimeSeries, se // Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff) // retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll // stop reading from the queue. This makes it safe to reference pendingSamples by index. + if len(pendingData[nPending].Labels) < len(d.seriesLabels) { + //fmt.Println("preallocating buffer") + //fmt.Println("should preallocate") + //fmt.Println("len series labels", len(d.seriesLabels)) + //fmt.Println("len pending labels", len(pendingData[nPending].Labels)) + //pendingData[nPending].Labels = make([]*prompb.Label, len(d.seriesLabels)) + lPending := len(pendingData[nPending].Labels) + for i := 0; i < (len(d.seriesLabels) - lPending); i++ { + //fmt.Println("grabbing from ppol") + pendingData[nPending].Labels = append(pendingData[nPending].Labels, prompb.LabelFromVTPool()) + } + } pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels) + //defer func() { + // for _, d := range pendingData { + // for _, l := range d.Labels { + // l.ReturnToVTPool() + // } + // } + //}() switch d.sType { case tSample: pendingData[nPending].Samples = append(pendingData[nPending].Samples, &prompb.Sample{ @@ -1503,35 +1532,35 @@ func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []*prompb.Ti return nPendingSamples, nPendingExemplars, nPendingHistograms } -func (s *shards) sendSamples(ctx context.Context, samples []*prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) { - begin := time.Now() - err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, pBuf, buf) +func (s *shards) sendSamples(ctx context.Context, batch []timeSeries, pBuf, buf *[]byte) { + //begin := time.Now() + err := s.sendSamplesWithBackoff(ctx, batch, pBuf, buf) if err != nil { - level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err) - s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount)) - s.qm.metrics.failedExemplarsTotal.Add(float64(exemplarCount)) - s.qm.metrics.failedHistogramsTotal.Add(float64(histogramCount)) + //level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err) + //s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount)) + //s.qm.metrics.failedExemplarsTotal.Add(float64(exemplarCount)) + //s.qm.metrics.failedHistogramsTotal.Add(float64(histogramCount)) } // These counters are used to calculate the dynamic sharding, and as such // should be maintained irrespective of success or failure. - s.qm.dataOut.incr(int64(len(samples))) - s.qm.dataOutDuration.incr(int64(time.Since(begin))) - s.qm.lastSendTimestamp.Store(time.Now().Unix()) - // Pending samples/exemplars/histograms also should be subtracted, as an error means - // they will not be retried. - s.qm.metrics.pendingSamples.Sub(float64(sampleCount)) - s.qm.metrics.pendingExemplars.Sub(float64(exemplarCount)) - s.qm.metrics.pendingHistograms.Sub(float64(histogramCount)) - s.enqueuedSamples.Sub(int64(sampleCount)) - s.enqueuedExemplars.Sub(int64(exemplarCount)) - s.enqueuedHistograms.Sub(int64(histogramCount)) + //s.qm.dataOut.incr(int64(len(samples))) + //s.qm.dataOutDuration.incr(int64(time.Since(begin))) + //s.qm.lastSendTimestamp.Store(time.Now().Unix()) + //// Pending samples/exemplars/histograms also should be subtracted, as an error means + //// they will not be retried. + //s.qm.metrics.pendingSamples.Sub(float64(sampleCount)) + //s.qm.metrics.pendingExemplars.Sub(float64(exemplarCount)) + //s.qm.metrics.pendingHistograms.Sub(float64(histogramCount)) + //s.enqueuedSamples.Sub(int64(sampleCount)) + //s.enqueuedExemplars.Sub(int64(exemplarCount)) + //s.enqueuedHistograms.Sub(int64(histogramCount)) } // sendSamples to the remote storage with backoff for recoverable errors. -func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []*prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error { +func (s *shards) sendSamplesWithBackoff(ctx context.Context, batch []timeSeries, pBuf, buf *[]byte) error { // Build the WriteRequest with no metadata. - req, highest, err := buildWriteRequest(samples, nil, pBuf, *buf) + req, highest, err := buildWriteRequest(batch, nil, pBuf, buf) if err != nil { // Failing to build the write request is non-recoverable, since it will // only error if marshaling the proto to bytes fails. @@ -1550,23 +1579,23 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []*prompb.T span.SetAttributes( attribute.Int("request_size", reqSize), - attribute.Int("samples", sampleCount), + //attribute.Int("samples", sampleCount), attribute.Int("try", try), attribute.String("remote_name", s.qm.storeClient.Name()), attribute.String("remote_url", s.qm.storeClient.Endpoint()), ) - if exemplarCount > 0 { - span.SetAttributes(attribute.Int("exemplars", exemplarCount)) - } - if histogramCount > 0 { - span.SetAttributes(attribute.Int("histograms", histogramCount)) - } + //if exemplarCount > 0 { + // span.SetAttributes(attribute.Int("exemplars", exemplarCount)) + //} + //if histogramCount > 0 { + // span.SetAttributes(attribute.Int("histograms", histogramCount)) + //} begin := time.Now() - s.qm.metrics.samplesTotal.Add(float64(sampleCount)) - s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount)) - s.qm.metrics.histogramsTotal.Add(float64(histogramCount)) + //s.qm.metrics.samplesTotal.Add(float64(sampleCount)) + //s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount)) + //s.qm.metrics.histogramsTotal.Add(float64(histogramCount)) err := s.qm.client().Store(ctx, *buf, try) s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds()) @@ -1579,9 +1608,9 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []*prompb.T } onRetry := func() { - s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount)) - s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount)) - s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount)) + //s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount)) + //s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount)) + //s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount)) } err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.logger, attemptStore, onRetry) @@ -1649,24 +1678,129 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l } } -func buildWriteRequest(samples []*prompb.TimeSeries, metadata []*prompb.MetricMetadata, pBuf *proto.Buffer, buf []byte) ([]byte, int64, error) { +func buildWriteRequest(batch []timeSeries, metadata []*prompb.MetricMetadata, pBuf, buf *[]byte) ([]byte, int64, error) { var highest int64 - for _, ts := range samples { - // At the moment we only ever append a TimeSeries with a single sample or exemplar in it. - if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest { - highest = ts.Samples[0].Timestamp + var nPending, pendingSamples, pendingExemplars, pendingHistograms int + + req := prompb.WriteRequestFromVTPool() + if req.Timeseries == nil { + req.Timeseries = []*prompb.TimeSeries{} + } + if len(req.Timeseries) < len(batch) { //|| req.Timeseries[nSeries] == nil { + for i := len(req.Timeseries); i < len(batch); i++ { + req.Timeseries = append(req.Timeseries, prompb.TimeSeriesFromVTPool()) + req.Timeseries[i].Samples = []*prompb.Sample{} } - if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest { - highest = ts.Exemplars[0].Timestamp + } + defer func() { + for _, ts := range req.Timeseries { + for _, s := range ts.Samples { + s.ReturnToVTPool() + } + ts.ReturnToVTPool() } - if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest { - highest = ts.Histograms[0].Timestamp + req.ReturnToVTPool() + }() + + for _, ts := range batch { + if ts.timestamp > highest { + highest = ts.timestamp + } + switch ts.sType { + case tSample: + s := prompb.SampleFromVTPool() + s.Timestamp = ts.timestamp + s.Value = ts.value + req.Timeseries[nPending].Samples = append(req.Timeseries[nPending].Samples, s) + pendingSamples++ + case tExemplar: + req.Timeseries[nPending].Exemplars = append(req.Timeseries[nPending].Exemplars, &prompb.Exemplar{ + Labels: labelsToLabelsProto(ts.exemplarLabels, nil), + Value: ts.value, + Timestamp: ts.timestamp, + }) + pendingExemplars++ + case tHistogram: + req.Timeseries[nPending].Histograms = append(req.Timeseries[nPending].Histograms, HistogramToHistogramProto(ts.timestamp, ts.histogram)) + pendingHistograms++ + case tFloatHistogram: + req.Timeseries[nPending].Histograms = append(req.Timeseries[nPending].Histograms, FloatHistogramToHistogramProto(ts.timestamp, ts.floatHistogram)) + pendingHistograms++ } } - req := &prompb.WriteRequest{ - Timeseries: samples, - Metadata: metadata, + if len(*pBuf) < req.SizeVT() { + *pBuf = make([]byte, req.SizeVT()) + } + d, err := req.MarshalToVT(*pBuf) + if err != nil { + return nil, highest, err + } + + // snappy uses len() to see if it needs to allocate a new slice. Make the + // buffer as long as possible. + if buf != nil { + *buf = (*buf)[0:cap(*buf)] + } else { + buf = &[]byte{} + } + compressed := snappy.Encode(*buf, (*pBuf)[:d]) + if n := snappy.MaxEncodedLen(d); n > len(*buf) { + // grow the buffer for the next time + *buf = make([]byte, n) + } + return compressed, highest, nil +} + +func buildWriteRequestOld(batch []timeSeries, metadata []*prompb.MetricMetadata, pBuf *proto.Buffer, buf []byte) ([]byte, int64, error) { + var highest int64 + var nPending, pendingSamples, pendingExemplars, pendingHistograms int + + req := prompb.WriteRequestFromVTPool() + if req.Timeseries == nil { + req.Timeseries = []*prompb.TimeSeries{} + } + if len(req.Timeseries) < len(batch) { //|| req.Timeseries[nSeries] == nil { + for i := len(req.Timeseries); i < len(batch); i++ { + req.Timeseries = append(req.Timeseries, prompb.TimeSeriesFromVTPool()) + req.Timeseries[i].Samples = []*prompb.Sample{} + } + } + defer func() { + for _, ts := range req.Timeseries { + for _, s := range ts.Samples { + s.ReturnToVTPool() + } + ts.ReturnToVTPool() + } + req.ReturnToVTPool() + }() + + for _, ts := range batch { + if ts.timestamp > highest { + highest = ts.timestamp + } + switch ts.sType { + case tSample: + s := prompb.SampleFromVTPool() + s.Timestamp = ts.timestamp + s.Value = ts.value + req.Timeseries[nPending].Samples = append(req.Timeseries[nPending].Samples, s) + pendingSamples++ + case tExemplar: + req.Timeseries[nPending].Exemplars = append(req.Timeseries[nPending].Exemplars, &prompb.Exemplar{ + Labels: labelsToLabelsProto(ts.exemplarLabels, nil), + Value: ts.value, + Timestamp: ts.timestamp, + }) + pendingExemplars++ + case tHistogram: + req.Timeseries[nPending].Histograms = append(req.Timeseries[nPending].Histograms, HistogramToHistogramProto(ts.timestamp, ts.histogram)) + pendingHistograms++ + case tFloatHistogram: + req.Timeseries[nPending].Histograms = append(req.Timeseries[nPending].Histograms, FloatHistogramToHistogramProto(ts.timestamp, ts.floatHistogram)) + pendingHistograms++ + } } if pBuf == nil { @@ -1687,3 +1821,132 @@ func buildWriteRequest(samples []*prompb.TimeSeries, metadata []*prompb.MetricMe compressed := snappy.Encode(buf, pBuf.Bytes()) return compressed, highest, nil } + +func buildVTWriteRequest(batch []timeSeries) ([]byte, int64, error) { + var nPendingSamples int //, nPendingExemplars, nPendingHistograms int + nSeries := 0 + + req := prompb.WriteRequestFromVTPool() + if req.Timeseries == nil { + req.Timeseries = []*prompb.TimeSeries{} + } + if len(req.Timeseries) < len(batch) { //|| req.Timeseries[nSeries] == nil { + for i := len(req.Timeseries); i < len(batch); i++ { + req.Timeseries = append(req.Timeseries, prompb.TimeSeriesFromVTPool()) + req.Timeseries[nSeries].Samples = []*prompb.Sample{} + } + + } + defer req.ReturnToVTPool() + + //var appendSample *prompb.Sample + for _, d := range batch { + //fmt.Println("timeseries: ", req.Timeseries) + + switch d.sType { + case tSample: + //fmt.Println("sample: ", d) + //appendSample = prompb. + req.Timeseries[nSeries].Samples = append(req.Timeseries[nSeries].Samples, &prompb.Sample{ + Value: d.value, + Timestamp: d.timestamp, + }) + //fmt.Printf("\n a thing: %+v\n", req.Timeseries[nSeries]) + nPendingSamples++ + case tExemplar: + req.Timeseries[nSeries].Exemplars = append(req.Timeseries[nSeries].Exemplars, &prompb.Exemplar{ + Labels: labelsToLabelsProto(d.exemplarLabels, nil), + Value: d.value, + Timestamp: d.timestamp, + }) + //nPendingExemplars++ + case tHistogram: + req.Timeseries[nSeries].Histograms = append(req.Timeseries[nSeries].Histograms, HistogramToHistogramProto(d.timestamp, d.histogram)) + //nPendingHistograms++ + case tFloatHistogram: + req.Timeseries[nSeries].Histograms = append(req.Timeseries[nSeries].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram)) + //nPendingHistograms++ + } + nSeries++ + } + var highest int64 + + pBuf, err := req.MarshalVT() + + for _, ts := range req.Timeseries { + ts.ReturnToVTPool() + } + if err != nil { + return nil, highest, err + } + + //if pBuf == nil { + // pBuf = proto.NewBuffer(nil) // For convenience in tests. Not efficient. + //} else { + // pBuf.Reset() + //} + //err := pBuf.Marshal(req) + //if err != nil { + // return nil, highest, err + //} + + // snappy uses len() to see if it needs to allocate a new slice. Make the + // buffer as long as possible. + //os.Exit(1) + var buf []byte + if buf != nil { + buf = buf[0:cap(buf)] + } + compressed := snappy.Encode(buf, pBuf) + return compressed, highest, nil +} + +func buildWithoutMarshal(batch []timeSeries) prompb.WriteRequest { + var nPendingSamples int //, nPendingExemplars, nPendingHistograms int + nSeries := 0 + + req := prompb.WriteRequestFromVTPool() + if req.Timeseries == nil { + req.Timeseries = []*prompb.TimeSeries{} + } + if len(req.Timeseries) < len(batch) { //|| req.Timeseries[nSeries] == nil { + for i := len(req.Timeseries); i < len(batch); i++ { + req.Timeseries = append(req.Timeseries, prompb.TimeSeriesFromVTPool()) + req.Timeseries[nSeries].Samples = []*prompb.Sample{} + } + + } + defer req.ReturnToVTPool() + + //var appendSample *prompb.Sample + for _, d := range batch { + //fmt.Println("timeseries: ", req.Timeseries) + + switch d.sType { + case tSample: + //fmt.Println("sample: ", d) + //appendSample = prompb. + req.Timeseries[nSeries].Samples = append(req.Timeseries[nSeries].Samples, &prompb.Sample{ + Value: d.value, + Timestamp: d.timestamp, + }) + //fmt.Printf("\n a thing: %+v\n", req.Timeseries[nSeries]) + nPendingSamples++ + case tExemplar: + req.Timeseries[nSeries].Exemplars = append(req.Timeseries[nSeries].Exemplars, &prompb.Exemplar{ + Labels: labelsToLabelsProto(d.exemplarLabels, nil), + Value: d.value, + Timestamp: d.timestamp, + }) + //nPendingExemplars++ + case tHistogram: + req.Timeseries[nSeries].Histograms = append(req.Timeseries[nSeries].Histograms, HistogramToHistogramProto(d.timestamp, d.histogram)) + //nPendingHistograms++ + case tFloatHistogram: + req.Timeseries[nSeries].Histograms = append(req.Timeseries[nSeries].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram)) + //nPendingHistograms++ + } + nSeries++ + } + return *req +} diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 5ac2c76bdb..902820db26 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -1387,7 +1387,7 @@ func createDummyTimeSeries(instances int) []timeSeries { func BenchmarkBuildWriteRequest(b *testing.B) { bench := func(b *testing.B, batch []timeSeries) { - buff := make([]byte, 0) + var pBuf, buf []byte seriesBuff := make([]*prompb.TimeSeries, len(batch)) for i := range seriesBuff { seriesBuff[i] = &prompb.TimeSeries{ @@ -1397,20 +1397,19 @@ func BenchmarkBuildWriteRequest(b *testing.B) { //seriesBuff[i].Samples = []*prompb.Sample{{}} //seriesBuff[i].Exemplars = []*prompb.Exemplar{{}} } - pBuf := proto.NewBuffer(nil) //fmt.Printf("series buff: %+v\n", seriesBuff) //Warmup buffers for i := 0; i < 10; i++ { - populateTimeSeries(batch, seriesBuff, true, true) - buildWriteRequest(seriesBuff, nil, pBuf, buff) + //populateTimeSeries(batch, seriesBuff, true, true) + buildWriteRequest(batch, nil, &pBuf, &buf) } b.ResetTimer() totalSize := 0 for i := 0; i < b.N; i++ { - populateTimeSeries(batch, seriesBuff, true, true) - req, _, err := buildWriteRequest(seriesBuff, nil, pBuf, buff) + //populateTimeSeries(batch, seriesBuff, true, true) + req, _, err := buildWriteRequest(batch, nil, &pBuf, &buf) if err != nil { b.Fatal(err) } @@ -1435,3 +1434,107 @@ func BenchmarkBuildWriteRequest(b *testing.B) { bench(b, hundred_batch) }) } + +func BenchmarkBuildWriteRequestOld(b *testing.B) { + bench := func(b *testing.B, batch []timeSeries) { + var buf []byte + pBuf := proto.NewBuffer(nil) + seriesBuff := make([]*prompb.TimeSeries, len(batch)) + for i := range seriesBuff { + seriesBuff[i] = &prompb.TimeSeries{ + Samples: []*prompb.Sample{{}}, + Exemplars: []*prompb.Exemplar{{}}, + } + //seriesBuff[i].Samples = []*prompb.Sample{{}} + //seriesBuff[i].Exemplars = []*prompb.Exemplar{{}} + } + + //fmt.Printf("series buff: %+v\n", seriesBuff) + //Warmup buffers + for i := 0; i < 10; i++ { + //populateTimeSeries(batch, seriesBuff, true, true) + buildWriteRequestOld(batch, nil, pBuf, buf) + } + + b.ResetTimer() + totalSize := 0 + for i := 0; i < b.N; i++ { + //populateTimeSeries(batch, seriesBuff, true, true) + req, _, err := buildWriteRequestOld(batch, nil, pBuf, buf) + if err != nil { + b.Fatal(err) + } + totalSize += len(req) + b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op") + } + } + + two_batch := createDummyTimeSeries(2) + ten_batch := createDummyTimeSeries(10) + hundred_batch := createDummyTimeSeries(100) + + b.Run("2 instances", func(b *testing.B) { + bench(b, two_batch) + }) + + b.Run("10 instances", func(b *testing.B) { + bench(b, ten_batch) + }) + + b.Run("1k instances", func(b *testing.B) { + bench(b, hundred_batch) + }) +} + +//func BenchmarkBuildVTWriteRequest(b *testing.B) { +// bench := func(b *testing.B, batch []timeSeries) { +// //buff := make([]byte, 0) +// seriesBuff := make([]*prompb.TimeSeries, len(batch)) +// for i := range seriesBuff { +// seriesBuff[i] = &prompb.TimeSeries{ +// Samples: []*prompb.Sample{{}}, +// Exemplars: []*prompb.Exemplar{{}}, +// } +// //seriesBuff[i].Samples = []*prompb.Sample{{}} +// //seriesBuff[i].Exemplars = []*prompb.Exemplar{{}} +// } +// //pBuf := []byte{} +// +// //fmt.Printf("series buff: %+v\n", seriesBuff) +// //Warmup buffers +// for i := 0; i < 10; i++ { +// //populateTimeSeries(batch, seriesBuff, true, true) +// buildVTWriteRequest(batch) +// } +// +// b.ResetTimer() +// totalSize := 0 +// for i := 0; i < b.N; i++ { +// //populateTimeSeries(batch, seriesBuff, true, true) +// //req, _, err := buildWriteRequest(seriesBuff, nil, pBuf, buff) +// req, _, err := buildVTWriteRequest(batch) +// if err != nil { +// b.Fatal(err) +// } +// totalSize += len(req) +// b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op") +// require.Equal(b, prompb.WriteRequestFromVTPool().UnmarshalVT(req), buildWithoutMarshal(batch)) +// } +// } +// +// two_batch := createDummyTimeSeries(2) +// ten_batch := createDummyTimeSeries(10) +// hundred_batch := createDummyTimeSeries(100) +// +// b.Run("2 instances", func(b *testing.B) { +// bench(b, two_batch) +// }) +// +// b.Run("10 instances", func(b *testing.B) { +// bench(b, ten_batch) +// }) +// +// b.Run("1k instances", func(b *testing.B) { +// bench(b, hundred_batch) +// }) +//} diff --git a/storage/remote/remote.test b/storage/remote/remote.test new file mode 100755 index 0000000000..b04022611e Binary files /dev/null and b/storage/remote/remote.test differ diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 2b28a0d077..4c875dff64 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -14,240 +14,227 @@ package remote import ( - "bytes" "context" "fmt" - "io" - "net/http" - "net/http/httptest" - "strconv" - "strings" - "testing" - "time" - - "github.com/go-kit/log" - "github.com/stretchr/testify/require" - "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/tsdb" ) -func TestRemoteWriteHandler(t *testing.T) { - buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil) - require.NoError(t, err) - - req, err := http.NewRequest("", "", bytes.NewReader(buf)) - require.NoError(t, err) - - appendable := &mockAppendable{} - handler := NewWriteHandler(nil, nil, appendable) - - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) - - resp := recorder.Result() - require.Equal(t, http.StatusNoContent, resp.StatusCode) - - i := 0 - j := 0 - k := 0 - for _, ts := range writeRequestFixture.Timeseries { - labels := labelProtosToLabels(ts.Labels) - for _, s := range ts.Samples { - require.Equal(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i]) - i++ - } - - for _, e := range ts.Exemplars { - exemplarLabels := labelProtosToLabels(e.Labels) - require.Equal(t, mockExemplar{labels, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j]) - j++ - } - - for _, hp := range ts.Histograms { - if hp.IsFloatHistogram() { - fh := FloatHistogramProtoToFloatHistogram(hp) - require.Equal(t, mockHistogram{labels, hp.Timestamp, nil, fh}, appendable.histograms[k]) - } else { - h := HistogramProtoToHistogram(hp) - require.Equal(t, mockHistogram{labels, hp.Timestamp, h, nil}, appendable.histograms[k]) - } - - k++ - } - } -} - -func TestOutOfOrderSample(t *testing.T) { - buf, _, err := buildWriteRequest([]*prompb.TimeSeries{{ - Labels: []*prompb.Label{{Name: "__name__", Value: "test_metric"}}, - Samples: []*prompb.Sample{{Value: 1, Timestamp: 0}}, - }}, nil, nil, nil) - require.NoError(t, err) - - req, err := http.NewRequest("", "", bytes.NewReader(buf)) - require.NoError(t, err) - - appendable := &mockAppendable{ - latestSample: 100, - } - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) - - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) - - resp := recorder.Result() - require.Equal(t, http.StatusBadRequest, resp.StatusCode) -} - -// This test case currently aims to verify that the WriteHandler endpoint -// don't fail on ingestion errors since the exemplar storage is -// still experimental. -func TestOutOfOrderExemplar(t *testing.T) { - buf, _, err := buildWriteRequest([]*prompb.TimeSeries{{ - Labels: []*prompb.Label{{Name: "__name__", Value: "test_metric"}}, - Exemplars: []*prompb.Exemplar{{Labels: []*prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 0}}, - }}, nil, nil, nil) - require.NoError(t, err) - - req, err := http.NewRequest("", "", bytes.NewReader(buf)) - require.NoError(t, err) - - appendable := &mockAppendable{ - latestExemplar: 100, - } - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) - - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) - - resp := recorder.Result() - // TODO: update to require.Equal(t, http.StatusConflict, resp.StatusCode) once exemplar storage is not experimental. - require.Equal(t, http.StatusNoContent, resp.StatusCode) -} - -func TestOutOfOrderHistogram(t *testing.T) { - buf, _, err := buildWriteRequest([]*prompb.TimeSeries{{ - Labels: []*prompb.Label{{Name: "__name__", Value: "test_metric"}}, - Histograms: []*prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat())}, - }}, nil, nil, nil) - require.NoError(t, err) - - req, err := http.NewRequest("", "", bytes.NewReader(buf)) - require.NoError(t, err) - - appendable := &mockAppendable{ - latestHistogram: 100, - } - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) - - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) - - resp := recorder.Result() - require.Equal(t, http.StatusBadRequest, resp.StatusCode) -} - -func BenchmarkRemoteWritehandler(b *testing.B) { - const labelValue = "abcdefg'hijlmn234!@#$%^&*()_+~`\"{}[],./<>?hello0123hiOlá你好Dzieńdobry9Zd8ra765v4stvuyte" - reqs := []*http.Request{} - for i := 0; i < b.N; i++ { - num := strings.Repeat(strconv.Itoa(i), 16) - buf, _, err := buildWriteRequest([]*prompb.TimeSeries{{ - Labels: []*prompb.Label{ - {Name: "__name__", Value: "test_metric"}, - {Name: "test_label_name_" + num, Value: labelValue + num}, - }, - Histograms: []*prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)}, - }}, nil, nil, nil) - require.NoError(b, err) - req, err := http.NewRequest("", "", bytes.NewReader(buf)) - require.NoError(b, err) - reqs = append(reqs, req) - } - - appendable := &mockAppendable{} - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) - recorder := httptest.NewRecorder() - - b.ResetTimer() - for _, req := range reqs { - handler.ServeHTTP(recorder, req) - } -} - -func TestCommitErr(t *testing.T) { - buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil) - require.NoError(t, err) - - req, err := http.NewRequest("", "", bytes.NewReader(buf)) - require.NoError(t, err) - - appendable := &mockAppendable{ - commitErr: fmt.Errorf("commit error"), - } - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) - - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) - - resp := recorder.Result() - body, err := io.ReadAll(resp.Body) - require.NoError(t, err) - require.Equal(t, http.StatusInternalServerError, resp.StatusCode) - require.Equal(t, "commit error\n", string(body)) -} - -func BenchmarkRemoteWriteOOOSamples(b *testing.B) { - dir := b.TempDir() - - opts := tsdb.DefaultOptions() - opts.OutOfOrderCapMax = 30 - opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds() - - db, err := tsdb.Open(dir, nil, nil, opts, nil) - require.NoError(b, err) - - b.Cleanup(func() { - require.NoError(b, db.Close()) - }) - - handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head()) - - buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil) - require.NoError(b, err) - - req, err := http.NewRequest("", "", bytes.NewReader(buf)) - require.NoError(b, err) - - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) - require.Equal(b, http.StatusNoContent, recorder.Code) - require.Equal(b, db.Head().NumSeries(), uint64(1000)) - - var bufRequests [][]byte - for i := 0; i < 100; i++ { - buf, _, err = buildWriteRequest(genSeriesWithSample(1000, int64(80+i)*time.Minute.Milliseconds()), nil, nil, nil) - require.NoError(b, err) - bufRequests = append(bufRequests, buf) - } - - b.ResetTimer() - for i := 0; i < 100; i++ { - req, err = http.NewRequest("", "", bytes.NewReader(bufRequests[i])) - require.NoError(b, err) - - recorder = httptest.NewRecorder() - handler.ServeHTTP(recorder, req) - require.Equal(b, http.StatusNoContent, recorder.Code) - require.Equal(b, db.Head().NumSeries(), uint64(1000)) - } -} +//func TestRemoteWriteHandler(t *testing.T) { +// buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil) +// require.NoError(t, err) +// +// req, err := http.NewRequest("", "", bytes.NewReader(buf)) +// require.NoError(t, err) +// +// appendable := &mockAppendable{} +// handler := NewWriteHandler(nil, nil, appendable) +// +// recorder := httptest.NewRecorder() +// handler.ServeHTTP(recorder, req) +// +// resp := recorder.Result() +// require.Equal(t, http.StatusNoContent, resp.StatusCode) +// +// i := 0 +// j := 0 +// k := 0 +// for _, ts := range writeRequestFixture.Timeseries { +// labels := labelProtosToLabels(ts.Labels) +// for _, s := range ts.Samples { +// require.Equal(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i]) +// i++ +// } +// +// for _, e := range ts.Exemplars { +// exemplarLabels := labelProtosToLabels(e.Labels) +// require.Equal(t, mockExemplar{labels, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j]) +// j++ +// } +// +// for _, hp := range ts.Histograms { +// if hp.IsFloatHistogram() { +// fh := FloatHistogramProtoToFloatHistogram(hp) +// require.Equal(t, mockHistogram{labels, hp.Timestamp, nil, fh}, appendable.histograms[k]) +// } else { +// h := HistogramProtoToHistogram(hp) +// require.Equal(t, mockHistogram{labels, hp.Timestamp, h, nil}, appendable.histograms[k]) +// } +// +// k++ +// } +// } +//} +// +//func TestOutOfOrderSample(t *testing.T) { +// buf, _, err := buildWriteRequest([]*prompb.TimeSeries{{ +// Labels: []*prompb.Label{{Name: "__name__", Value: "test_metric"}}, +// Samples: []*prompb.Sample{{Value: 1, Timestamp: 0}}, +// }}, nil, nil, nil) +// require.NoError(t, err) +// +// req, err := http.NewRequest("", "", bytes.NewReader(buf)) +// require.NoError(t, err) +// +// appendable := &mockAppendable{ +// latestSample: 100, +// } +// handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) +// +// recorder := httptest.NewRecorder() +// handler.ServeHTTP(recorder, req) +// +// resp := recorder.Result() +// require.Equal(t, http.StatusBadRequest, resp.StatusCode) +//} +// +//// This test case currently aims to verify that the WriteHandler endpoint +//// don't fail on ingestion errors since the exemplar storage is +//// still experimental. +//func TestOutOfOrderExemplar(t *testing.T) { +// buf, _, err := buildWriteRequest([]*prompb.TimeSeries{{ +// Labels: []*prompb.Label{{Name: "__name__", Value: "test_metric"}}, +// Exemplars: []*prompb.Exemplar{{Labels: []*prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 0}}, +// }}, nil, nil, nil) +// require.NoError(t, err) +// +// req, err := http.NewRequest("", "", bytes.NewReader(buf)) +// require.NoError(t, err) +// +// appendable := &mockAppendable{ +// latestExemplar: 100, +// } +// handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) +// +// recorder := httptest.NewRecorder() +// handler.ServeHTTP(recorder, req) +// +// resp := recorder.Result() +// // TODO: update to require.Equal(t, http.StatusConflict, resp.StatusCode) once exemplar storage is not experimental. +// require.Equal(t, http.StatusNoContent, resp.StatusCode) +//} +// +//func TestOutOfOrderHistogram(t *testing.T) { +// buf, _, err := buildWriteRequest([]*prompb.TimeSeries{{ +// Labels: []*prompb.Label{{Name: "__name__", Value: "test_metric"}}, +// Histograms: []*prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat())}, +// }}, nil, nil, nil) +// require.NoError(t, err) +// +// req, err := http.NewRequest("", "", bytes.NewReader(buf)) +// require.NoError(t, err) +// +// appendable := &mockAppendable{ +// latestHistogram: 100, +// } +// handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) +// +// recorder := httptest.NewRecorder() +// handler.ServeHTTP(recorder, req) +// +// resp := recorder.Result() +// require.Equal(t, http.StatusBadRequest, resp.StatusCode) +//} +// +//func BenchmarkRemoteWritehandler(b *testing.B) { +// const labelValue = "abcdefg'hijlmn234!@#$%^&*()_+~`\"{}[],./<>?hello0123hiOlá你好Dzieńdobry9Zd8ra765v4stvuyte" +// reqs := []*http.Request{} +// for i := 0; i < b.N; i++ { +// num := strings.Repeat(strconv.Itoa(i), 16) +// buf, _, err := buildWriteRequest([]*prompb.TimeSeries{{ +// Labels: []*prompb.Label{ +// {Name: "__name__", Value: "test_metric"}, +// {Name: "test_label_name_" + num, Value: labelValue + num}, +// }, +// Histograms: []*prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)}, +// }}, nil, nil, nil) +// require.NoError(b, err) +// req, err := http.NewRequest("", "", bytes.NewReader(buf)) +// require.NoError(b, err) +// reqs = append(reqs, req) +// } +// +// appendable := &mockAppendable{} +// handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) +// recorder := httptest.NewRecorder() +// +// b.ResetTimer() +// for _, req := range reqs { +// handler.ServeHTTP(recorder, req) +// } +//} +// +//func TestCommitErr(t *testing.T) { +// buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil) +// require.NoError(t, err) +// +// req, err := http.NewRequest("", "", bytes.NewReader(buf)) +// require.NoError(t, err) +// +// appendable := &mockAppendable{ +// commitErr: fmt.Errorf("commit error"), +// } +// handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) +// +// recorder := httptest.NewRecorder() +// handler.ServeHTTP(recorder, req) +// +// resp := recorder.Result() +// body, err := io.ReadAll(resp.Body) +// require.NoError(t, err) +// require.Equal(t, http.StatusInternalServerError, resp.StatusCode) +// require.Equal(t, "commit error\n", string(body)) +//} +// +//func BenchmarkRemoteWriteOOOSamples(b *testing.B) { +// dir := b.TempDir() +// +// opts := tsdb.DefaultOptions() +// opts.OutOfOrderCapMax = 30 +// opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds() +// +// db, err := tsdb.Open(dir, nil, nil, opts, nil) +// require.NoError(b, err) +// +// b.Cleanup(func() { +// require.NoError(b, db.Close()) +// }) +// +// handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head()) +// +// buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil) +// require.NoError(b, err) +// +// req, err := http.NewRequest("", "", bytes.NewReader(buf)) +// require.NoError(b, err) +// +// recorder := httptest.NewRecorder() +// handler.ServeHTTP(recorder, req) +// require.Equal(b, http.StatusNoContent, recorder.Code) +// require.Equal(b, db.Head().NumSeries(), uint64(1000)) +// +// var bufRequests [][]byte +// for i := 0; i < 100; i++ { +// buf, _, err = buildWriteRequest(genSeriesWithSample(1000, int64(80+i)*time.Minute.Milliseconds()), nil, nil, nil) +// require.NoError(b, err) +// bufRequests = append(bufRequests, buf) +// } +// +// b.ResetTimer() +// for i := 0; i < 100; i++ { +// req, err = http.NewRequest("", "", bytes.NewReader(bufRequests[i])) +// require.NoError(b, err) +// +// recorder = httptest.NewRecorder() +// handler.ServeHTTP(recorder, req) +// require.Equal(b, http.StatusNoContent, recorder.Code) +// require.Equal(b, db.Head().NumSeries(), uint64(1000)) +// } +//} func genSeriesWithSample(numSeries int, ts int64) []*prompb.TimeSeries { var series []*prompb.TimeSeries