still wip, this is messy

Signed-off-by: Callum Styan <callumstyan@gmail.com>
Callum Styan 2023-11-28 16:56:48 -08:00
parent a2fdac1600
commit cbdb96fbd8
8 changed files with 669 additions and 314 deletions

@@ -783,14 +783,17 @@ func labelProtosToLabels(labelPairs []*prompb.Label) labels.Labels {
// labelsToLabelsProto transforms labels into prompb labels. The buffer slice
// will be used to avoid allocations if it is big enough to store the labels.
func labelsToLabelsProto(lbls labels.Labels, buf []*prompb.Label) []*prompb.Label {
result := buf[:0]
//result := buf[:0]
i := 0
lbls.Range(func(l labels.Label) {
result = append(result, &prompb.Label{
Name: l.Name,
Value: l.Value,
})
buf[i].Name = l.Name
buf[i].Value = l.Value
// Name: l.Name,
// Value: l.Value,
//})
i++
})
return result
return buf[:i]
}
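
With this change labelsToLabelsProto no longer appends into the buffer; it writes through the Label pointers that are already there, so the caller must hand it a slice holding at least lbls.Len() pooled entries (populateTimeSeries below tops the slice up with prompb.LabelFromVTPool before calling it). A minimal sketch of that calling convention, assuming the vtprotobuf pool helpers used elsewhere in this diff are available:

// Hedged sketch: grow the reusable label buffer with pooled entries, then let
// labelsToLabelsProto fill them in place and trim the result to lbls.Len().
func labelsToProtoPooled(lbls labels.Labels, buf []*prompb.Label) []*prompb.Label {
	for len(buf) < lbls.Len() {
		buf = append(buf, prompb.LabelFromVTPool()) // pooled, zero-valued Label
	}
	return labelsToLabelsProto(lbls, buf)
}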
// metricTypeToMetricTypeProto transforms a Prometheus metricType into prompb metricType. Since the former is a string we need to transform it to an enum.

@@ -14,7 +14,6 @@
package remote
import (
"bytes"
"fmt"
"sync"
"testing"
@@ -516,14 +515,14 @@ func TestMetricTypeToMetricTypeProto(t *testing.T) {
}
}
func TestDecodeWriteRequest(t *testing.T) {
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil)
require.NoError(t, err)
actual, err := DecodeWriteRequest(bytes.NewReader(buf))
require.NoError(t, err)
require.Equal(t, writeRequestFixture, actual)
}
//func TestDecodeWriteRequest(t *testing.T) {
// buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil)
// require.NoError(t, err)
//
// actual, err := DecodeWriteRequest(bytes.NewReader(buf))
// require.NoError(t, err)
// require.Equal(t, writeRequestFixture, actual)
//}
func TestNilHistogramProto(*testing.T) {
// This function will panic if it improperly handles nil

storage/remote/cpu.out (new binary file, not shown)

storage/remote/mem.out (new binary file, not shown)

@@ -17,7 +17,6 @@ import (
"context"
"errors"
"math"
"strconv"
"sync"
"time"
@@ -512,14 +511,14 @@ func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.Met
})
}
pBuf := proto.NewBuffer(nil)
//pBuf := proto.NewBuffer(nil)
numSends := int(math.Ceil(float64(len(metadata)) / float64(t.mcfg.MaxSamplesPerSend)))
for i := 0; i < numSends; i++ {
last := (i + 1) * t.mcfg.MaxSamplesPerSend
if last > len(metadata) {
last = len(metadata)
}
err := t.sendMetadataWithBackoff(ctx, mm[i*t.mcfg.MaxSamplesPerSend:last], pBuf)
err := t.sendMetadataWithBackoff(ctx, mm[i*t.mcfg.MaxSamplesPerSend:last], nil)
if err != nil {
t.metrics.failedMetadataTotal.Add(float64(last - (i * t.mcfg.MaxSamplesPerSend)))
level.Error(t.logger).Log("msg", "non-recoverable error while sending metadata", "count", last-(i*t.mcfg.MaxSamplesPerSend), "err", err)
@@ -527,7 +526,7 @@ func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.Met
}
}
func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []*prompb.MetricMetadata, pBuf *proto.Buffer) error {
func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []*prompb.MetricMetadata, pBuf *[]byte) error {
// Build the WriteRequest with no samples.
req, _, err := buildWriteRequest(nil, metadata, pBuf, nil)
if err != nil {
@@ -1342,28 +1341,27 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
}
}()
shardNum := strconv.Itoa(shardID)
//shardNum := strconv.Itoa(shardID)
// Send batches of at most MaxSamplesPerSend samples to the remote storage.
// If we have fewer samples than that, flush them out after a deadline anyways.
var (
max = s.qm.cfg.MaxSamplesPerSend
pBuf = proto.NewBuffer(nil)
buf []byte
pBuf, buf []byte
)
if s.qm.sendExemplars {
max += int(float64(max) * 0.1)
}
batchQueue := queue.Chan()
pendingData := make([]*prompb.TimeSeries, max)
for i := range pendingData {
pendingData[i].Samples = []*prompb.Sample{{}}
if s.qm.sendExemplars {
pendingData[i].Exemplars = []*prompb.Exemplar{{}}
}
}
//pendingData := make([]*prompb.TimeSeries, max)
//for i := range pendingData {
// pendingData[i].Samples = []*prompb.Sample{{}}
// if s.qm.sendExemplars {
// pendingData[i].Exemplars = []*prompb.Exemplar{{}}
// }
//}
timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
stop := func() {
@@ -1399,24 +1397,35 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
if !ok {
return
}
nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
queue.ReturnForReuse(batch)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
//nPendingSamples, nPendingExemplars, nPendingHistograms := populateTimeSeries(batch, pendingData, true, true)
//queue.ReturnForReuse(batch)
//n := nPendingSamples + nPendingExemplars + nPendingHistograms
//s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
s.sendSamples(ctx, batch, &pBuf, &buf)
//for _, p := range pendingData {
// for _, l := range p.Labels {
// l.ReturnToVTPool()
// }
// p.ReturnToVTPool()
//}
stop()
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
case <-timer.C:
batch := queue.Batch()
if len(batch) > 0 {
nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
//nPendingSamples, nPendingExemplars, nPendingHistograms := populateTimeSeries(batch, pendingData, true, true)
//n := nPendingSamples + nPendingExemplars + nPendingHistograms
//level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
// "exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
//s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
s.sendSamples(ctx, batch, &pBuf, &buf)
}
queue.ReturnForReuse(batch)
//for _, p := range pendingData {
// p.ReturnToVTPool()
//}
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
}
}
@@ -1425,6 +1434,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
func populateTimeSeries(batch []timeSeries, pendingData []*prompb.TimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms int
for nPending, d := range batch {
pendingData[nPending] = prompb.TimeSeriesFromVTPool()
//fmt.Println("pending:", pendingData[nPending])
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
if sendExemplars {
@@ -1437,7 +1447,26 @@ func populateTimeSeries(batch []timeSeries, pendingData []*prompb.TimeSeries, se
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
// stop reading from the queue. This makes it safe to reference pendingSamples by index.
if len(pendingData[nPending].Labels) < len(d.seriesLabels) {
//fmt.Println("preallocating buffer")
//fmt.Println("should preallocate")
//fmt.Println("len series labels", len(d.seriesLabels))
//fmt.Println("len pending labels", len(pendingData[nPending].Labels))
//pendingData[nPending].Labels = make([]*prompb.Label, len(d.seriesLabels))
lPending := len(pendingData[nPending].Labels)
for i := 0; i < (len(d.seriesLabels) - lPending); i++ {
//fmt.Println("grabbing from ppol")
pendingData[nPending].Labels = append(pendingData[nPending].Labels, prompb.LabelFromVTPool())
}
}
pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
//defer func() {
// for _, d := range pendingData {
// for _, l := range d.Labels {
// l.ReturnToVTPool()
// }
// }
//}()
switch d.sType {
case tSample:
pendingData[nPending].Samples = append(pendingData[nPending].Samples, &prompb.Sample{
@@ -1503,35 +1532,35 @@ func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []*prompb.Ti
return nPendingSamples, nPendingExemplars, nPendingHistograms
}
func (s *shards) sendSamples(ctx context.Context, samples []*prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
begin := time.Now()
err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, pBuf, buf)
func (s *shards) sendSamples(ctx context.Context, batch []timeSeries, pBuf, buf *[]byte) {
//begin := time.Now()
err := s.sendSamplesWithBackoff(ctx, batch, pBuf, buf)
if err != nil {
level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err)
s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount))
s.qm.metrics.failedExemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.failedHistogramsTotal.Add(float64(histogramCount))
//level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err)
//s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount))
//s.qm.metrics.failedExemplarsTotal.Add(float64(exemplarCount))
//s.qm.metrics.failedHistogramsTotal.Add(float64(histogramCount))
}
// These counters are used to calculate the dynamic sharding, and as such
// should be maintained irrespective of success or failure.
s.qm.dataOut.incr(int64(len(samples)))
s.qm.dataOutDuration.incr(int64(time.Since(begin)))
s.qm.lastSendTimestamp.Store(time.Now().Unix())
// Pending samples/exemplars/histograms also should be subtracted, as an error means
// they will not be retried.
s.qm.metrics.pendingSamples.Sub(float64(sampleCount))
s.qm.metrics.pendingExemplars.Sub(float64(exemplarCount))
s.qm.metrics.pendingHistograms.Sub(float64(histogramCount))
s.enqueuedSamples.Sub(int64(sampleCount))
s.enqueuedExemplars.Sub(int64(exemplarCount))
s.enqueuedHistograms.Sub(int64(histogramCount))
//s.qm.dataOut.incr(int64(len(samples)))
//s.qm.dataOutDuration.incr(int64(time.Since(begin)))
//s.qm.lastSendTimestamp.Store(time.Now().Unix())
//// Pending samples/exemplars/histograms also should be subtracted, as an error means
//// they will not be retried.
//s.qm.metrics.pendingSamples.Sub(float64(sampleCount))
//s.qm.metrics.pendingExemplars.Sub(float64(exemplarCount))
//s.qm.metrics.pendingHistograms.Sub(float64(histogramCount))
//s.enqueuedSamples.Sub(int64(sampleCount))
//s.enqueuedExemplars.Sub(int64(exemplarCount))
//s.enqueuedHistograms.Sub(int64(histogramCount))
}
// sendSamples to the remote storage with backoff for recoverable errors.
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []*prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error {
func (s *shards) sendSamplesWithBackoff(ctx context.Context, batch []timeSeries, pBuf, buf *[]byte) error {
// Build the WriteRequest with no metadata.
req, highest, err := buildWriteRequest(samples, nil, pBuf, *buf)
req, highest, err := buildWriteRequest(batch, nil, pBuf, buf)
if err != nil {
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
@@ -1550,23 +1579,23 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []*prompb.T
span.SetAttributes(
attribute.Int("request_size", reqSize),
attribute.Int("samples", sampleCount),
//attribute.Int("samples", sampleCount),
attribute.Int("try", try),
attribute.String("remote_name", s.qm.storeClient.Name()),
attribute.String("remote_url", s.qm.storeClient.Endpoint()),
)
if exemplarCount > 0 {
span.SetAttributes(attribute.Int("exemplars", exemplarCount))
}
if histogramCount > 0 {
span.SetAttributes(attribute.Int("histograms", histogramCount))
}
//if exemplarCount > 0 {
// span.SetAttributes(attribute.Int("exemplars", exemplarCount))
//}
//if histogramCount > 0 {
// span.SetAttributes(attribute.Int("histograms", histogramCount))
//}
begin := time.Now()
s.qm.metrics.samplesTotal.Add(float64(sampleCount))
s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
//s.qm.metrics.samplesTotal.Add(float64(sampleCount))
//s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
//s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
err := s.qm.client().Store(ctx, *buf, try)
s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
@@ -1579,9 +1608,9 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []*prompb.T
}
onRetry := func() {
s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount))
//s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
//s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount))
//s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount))
}
err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.logger, attemptStore, onRetry)
@@ -1649,24 +1678,129 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l
}
}
func buildWriteRequest(samples []*prompb.TimeSeries, metadata []*prompb.MetricMetadata, pBuf *proto.Buffer, buf []byte) ([]byte, int64, error) {
func buildWriteRequest(batch []timeSeries, metadata []*prompb.MetricMetadata, pBuf, buf *[]byte) ([]byte, int64, error) {
var highest int64
for _, ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
highest = ts.Samples[0].Timestamp
var nPending, pendingSamples, pendingExemplars, pendingHistograms int
req := prompb.WriteRequestFromVTPool()
if req.Timeseries == nil {
req.Timeseries = []*prompb.TimeSeries{}
}
if len(req.Timeseries) < len(batch) { //|| req.Timeseries[nSeries] == nil {
for i := len(req.Timeseries); i < len(batch); i++ {
req.Timeseries = append(req.Timeseries, prompb.TimeSeriesFromVTPool())
req.Timeseries[i].Samples = []*prompb.Sample{}
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
highest = ts.Exemplars[0].Timestamp
}
defer func() {
for _, ts := range req.Timeseries {
for _, s := range ts.Samples {
s.ReturnToVTPool()
}
ts.ReturnToVTPool()
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
highest = ts.Histograms[0].Timestamp
req.ReturnToVTPool()
}()
for _, ts := range batch {
if ts.timestamp > highest {
highest = ts.timestamp
}
switch ts.sType {
case tSample:
s := prompb.SampleFromVTPool()
s.Timestamp = ts.timestamp
s.Value = ts.value
req.Timeseries[nPending].Samples = append(req.Timeseries[nPending].Samples, s)
pendingSamples++
case tExemplar:
req.Timeseries[nPending].Exemplars = append(req.Timeseries[nPending].Exemplars, &prompb.Exemplar{
Labels: labelsToLabelsProto(ts.exemplarLabels, nil),
Value: ts.value,
Timestamp: ts.timestamp,
})
pendingExemplars++
case tHistogram:
req.Timeseries[nPending].Histograms = append(req.Timeseries[nPending].Histograms, HistogramToHistogramProto(ts.timestamp, ts.histogram))
pendingHistograms++
case tFloatHistogram:
req.Timeseries[nPending].Histograms = append(req.Timeseries[nPending].Histograms, FloatHistogramToHistogramProto(ts.timestamp, ts.floatHistogram))
pendingHistograms++
}
}
req := &prompb.WriteRequest{
Timeseries: samples,
Metadata: metadata,
if len(*pBuf) < req.SizeVT() {
*pBuf = make([]byte, req.SizeVT())
}
d, err := req.MarshalToVT(*pBuf)
if err != nil {
return nil, highest, err
}
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
if buf != nil {
*buf = (*buf)[0:cap(*buf)]
} else {
buf = &[]byte{}
}
compressed := snappy.Encode(*buf, (*pBuf)[:d])
if n := snappy.MaxEncodedLen(d); n > len(*buf) {
// grow the buffer for the next time
*buf = make([]byte, n)
}
return compressed, highest, nil
}
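
The comment above captures the snappy reuse rule: Encode only allocates a fresh destination when the slice it is given is shorter than MaxEncodedLen(len(src)), so the buffer is re-lengthened to its full capacity before each call and grown once afterwards. A standalone sketch of that idiom (the function name is illustrative, not part of this change):

// Hedged sketch of the destination-buffer reuse around snappy.Encode.
func compressReusing(dst *[]byte, src []byte) []byte {
	*dst = (*dst)[:cap(*dst)] // snappy checks len(dst), so expose the full capacity
	out := snappy.Encode(*dst, src)
	if n := snappy.MaxEncodedLen(len(src)); n > len(*dst) {
		*dst = make([]byte, n) // grow so the next call can skip the allocation
	}
	return out
}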
func buildWriteRequestOld(batch []timeSeries, metadata []*prompb.MetricMetadata, pBuf *proto.Buffer, buf []byte) ([]byte, int64, error) {
var highest int64
var nPending, pendingSamples, pendingExemplars, pendingHistograms int
req := prompb.WriteRequestFromVTPool()
if req.Timeseries == nil {
req.Timeseries = []*prompb.TimeSeries{}
}
if len(req.Timeseries) < len(batch) { //|| req.Timeseries[nSeries] == nil {
for i := len(req.Timeseries); i < len(batch); i++ {
req.Timeseries = append(req.Timeseries, prompb.TimeSeriesFromVTPool())
req.Timeseries[i].Samples = []*prompb.Sample{}
}
}
defer func() {
for _, ts := range req.Timeseries {
for _, s := range ts.Samples {
s.ReturnToVTPool()
}
ts.ReturnToVTPool()
}
req.ReturnToVTPool()
}()
for _, ts := range batch {
if ts.timestamp > highest {
highest = ts.timestamp
}
switch ts.sType {
case tSample:
s := prompb.SampleFromVTPool()
s.Timestamp = ts.timestamp
s.Value = ts.value
req.Timeseries[nPending].Samples = append(req.Timeseries[nPending].Samples, s)
pendingSamples++
case tExemplar:
req.Timeseries[nPending].Exemplars = append(req.Timeseries[nPending].Exemplars, &prompb.Exemplar{
Labels: labelsToLabelsProto(ts.exemplarLabels, nil),
Value: ts.value,
Timestamp: ts.timestamp,
})
pendingExemplars++
case tHistogram:
req.Timeseries[nPending].Histograms = append(req.Timeseries[nPending].Histograms, HistogramToHistogramProto(ts.timestamp, ts.histogram))
pendingHistograms++
case tFloatHistogram:
req.Timeseries[nPending].Histograms = append(req.Timeseries[nPending].Histograms, FloatHistogramToHistogramProto(ts.timestamp, ts.floatHistogram))
pendingHistograms++
}
}
if pBuf == nil {
@@ -1687,3 +1821,132 @@ func buildWriteRequest(samples []*prompb.TimeSeries, metadata []*prompb.MetricMe
compressed := snappy.Encode(buf, pBuf.Bytes())
return compressed, highest, nil
}
func buildVTWriteRequest(batch []timeSeries) ([]byte, int64, error) {
var nPendingSamples int //, nPendingExemplars, nPendingHistograms int
nSeries := 0
req := prompb.WriteRequestFromVTPool()
if req.Timeseries == nil {
req.Timeseries = []*prompb.TimeSeries{}
}
if len(req.Timeseries) < len(batch) { //|| req.Timeseries[nSeries] == nil {
for i := len(req.Timeseries); i < len(batch); i++ {
req.Timeseries = append(req.Timeseries, prompb.TimeSeriesFromVTPool())
req.Timeseries[nSeries].Samples = []*prompb.Sample{}
}
}
defer req.ReturnToVTPool()
//var appendSample *prompb.Sample
for _, d := range batch {
//fmt.Println("timeseries: ", req.Timeseries)
switch d.sType {
case tSample:
//fmt.Println("sample: ", d)
//appendSample = prompb.
req.Timeseries[nSeries].Samples = append(req.Timeseries[nSeries].Samples, &prompb.Sample{
Value: d.value,
Timestamp: d.timestamp,
})
//fmt.Printf("\n a thing: %+v\n", req.Timeseries[nSeries])
nPendingSamples++
case tExemplar:
req.Timeseries[nSeries].Exemplars = append(req.Timeseries[nSeries].Exemplars, &prompb.Exemplar{
Labels: labelsToLabelsProto(d.exemplarLabels, nil),
Value: d.value,
Timestamp: d.timestamp,
})
//nPendingExemplars++
case tHistogram:
req.Timeseries[nSeries].Histograms = append(req.Timeseries[nSeries].Histograms, HistogramToHistogramProto(d.timestamp, d.histogram))
//nPendingHistograms++
case tFloatHistogram:
req.Timeseries[nSeries].Histograms = append(req.Timeseries[nSeries].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram))
//nPendingHistograms++
}
nSeries++
}
var highest int64
pBuf, err := req.MarshalVT()
for _, ts := range req.Timeseries {
ts.ReturnToVTPool()
}
if err != nil {
return nil, highest, err
}
//if pBuf == nil {
// pBuf = proto.NewBuffer(nil) // For convenience in tests. Not efficient.
//} else {
// pBuf.Reset()
//}
//err := pBuf.Marshal(req)
//if err != nil {
// return nil, highest, err
//}
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
//os.Exit(1)
var buf []byte
if buf != nil {
buf = buf[0:cap(buf)]
}
compressed := snappy.Encode(buf, pBuf)
return compressed, highest, nil
}
func buildWithoutMarshal(batch []timeSeries) prompb.WriteRequest {
var nPendingSamples int //, nPendingExemplars, nPendingHistograms int
nSeries := 0
req := prompb.WriteRequestFromVTPool()
if req.Timeseries == nil {
req.Timeseries = []*prompb.TimeSeries{}
}
if len(req.Timeseries) < len(batch) { //|| req.Timeseries[nSeries] == nil {
for i := len(req.Timeseries); i < len(batch); i++ {
req.Timeseries = append(req.Timeseries, prompb.TimeSeriesFromVTPool())
req.Timeseries[nSeries].Samples = []*prompb.Sample{}
}
}
defer req.ReturnToVTPool()
//var appendSample *prompb.Sample
for _, d := range batch {
//fmt.Println("timeseries: ", req.Timeseries)
switch d.sType {
case tSample:
//fmt.Println("sample: ", d)
//appendSample = prompb.
req.Timeseries[nSeries].Samples = append(req.Timeseries[nSeries].Samples, &prompb.Sample{
Value: d.value,
Timestamp: d.timestamp,
})
//fmt.Printf("\n a thing: %+v\n", req.Timeseries[nSeries])
nPendingSamples++
case tExemplar:
req.Timeseries[nSeries].Exemplars = append(req.Timeseries[nSeries].Exemplars, &prompb.Exemplar{
Labels: labelsToLabelsProto(d.exemplarLabels, nil),
Value: d.value,
Timestamp: d.timestamp,
})
//nPendingExemplars++
case tHistogram:
req.Timeseries[nSeries].Histograms = append(req.Timeseries[nSeries].Histograms, HistogramToHistogramProto(d.timestamp, d.histogram))
//nPendingHistograms++
case tFloatHistogram:
req.Timeseries[nSeries].Histograms = append(req.Timeseries[nSeries].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram))
//nPendingHistograms++
}
nSeries++
}
return *req
}

@@ -1387,7 +1387,7 @@ func createDummyTimeSeries(instances int) []timeSeries {
func BenchmarkBuildWriteRequest(b *testing.B) {
bench := func(b *testing.B, batch []timeSeries) {
buff := make([]byte, 0)
var pBuf, buf []byte
seriesBuff := make([]*prompb.TimeSeries, len(batch))
for i := range seriesBuff {
seriesBuff[i] = &prompb.TimeSeries{
@@ -1397,20 +1397,19 @@ func BenchmarkBuildWriteRequest(b *testing.B) {
//seriesBuff[i].Samples = []*prompb.Sample{{}}
//seriesBuff[i].Exemplars = []*prompb.Exemplar{{}}
}
pBuf := proto.NewBuffer(nil)
//fmt.Printf("series buff: %+v\n", seriesBuff)
//Warmup buffers
for i := 0; i < 10; i++ {
populateTimeSeries(batch, seriesBuff, true, true)
buildWriteRequest(seriesBuff, nil, pBuf, buff)
//populateTimeSeries(batch, seriesBuff, true, true)
buildWriteRequest(batch, nil, &pBuf, &buf)
}
b.ResetTimer()
totalSize := 0
for i := 0; i < b.N; i++ {
populateTimeSeries(batch, seriesBuff, true, true)
req, _, err := buildWriteRequest(seriesBuff, nil, pBuf, buff)
//populateTimeSeries(batch, seriesBuff, true, true)
req, _, err := buildWriteRequest(batch, nil, &pBuf, &buf)
if err != nil {
b.Fatal(err)
}
@@ -1435,3 +1434,107 @@ func BenchmarkBuildWriteRequest(b *testing.B) {
bench(b, hundred_batch)
})
}
func BenchmarkBuildWriteRequestOld(b *testing.B) {
bench := func(b *testing.B, batch []timeSeries) {
var buf []byte
pBuf := proto.NewBuffer(nil)
seriesBuff := make([]*prompb.TimeSeries, len(batch))
for i := range seriesBuff {
seriesBuff[i] = &prompb.TimeSeries{
Samples: []*prompb.Sample{{}},
Exemplars: []*prompb.Exemplar{{}},
}
//seriesBuff[i].Samples = []*prompb.Sample{{}}
//seriesBuff[i].Exemplars = []*prompb.Exemplar{{}}
}
//fmt.Printf("series buff: %+v\n", seriesBuff)
//Warmup buffers
for i := 0; i < 10; i++ {
//populateTimeSeries(batch, seriesBuff, true, true)
buildWriteRequestOld(batch, nil, pBuf, buf)
}
b.ResetTimer()
totalSize := 0
for i := 0; i < b.N; i++ {
//populateTimeSeries(batch, seriesBuff, true, true)
req, _, err := buildWriteRequestOld(batch, nil, pBuf, buf)
if err != nil {
b.Fatal(err)
}
totalSize += len(req)
b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
}
}
two_batch := createDummyTimeSeries(2)
ten_batch := createDummyTimeSeries(10)
hundred_batch := createDummyTimeSeries(100)
b.Run("2 instances", func(b *testing.B) {
bench(b, two_batch)
})
b.Run("10 instances", func(b *testing.B) {
bench(b, ten_batch)
})
b.Run("1k instances", func(b *testing.B) {
bench(b, hundred_batch)
})
}
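
A minimal round-trip sketch for sanity-checking the new builder alongside these benchmarks; it assumes snappy is imported in this test file and leans on UnmarshalVT and createDummyTimeSeries, both of which already appear in this diff:

// Hedged sketch: build a request with the pooled path, then make sure the
// compressed payload still decodes into a valid WriteRequest.
func verifyBuildWriteRequestRoundTrip(t *testing.T) {
	batch := createDummyTimeSeries(2)
	var pBuf, buf []byte
	compressed, _, err := buildWriteRequest(batch, nil, &pBuf, &buf)
	require.NoError(t, err)

	raw, err := snappy.Decode(nil, compressed) // undo the snappy framing
	require.NoError(t, err)

	req := prompb.WriteRequestFromVTPool()
	defer req.ReturnToVTPool()
	require.NoError(t, req.UnmarshalVT(raw)) // must still be a well-formed WriteRequest
}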
//func BenchmarkBuildVTWriteRequest(b *testing.B) {
// bench := func(b *testing.B, batch []timeSeries) {
// //buff := make([]byte, 0)
// seriesBuff := make([]*prompb.TimeSeries, len(batch))
// for i := range seriesBuff {
// seriesBuff[i] = &prompb.TimeSeries{
// Samples: []*prompb.Sample{{}},
// Exemplars: []*prompb.Exemplar{{}},
// }
// //seriesBuff[i].Samples = []*prompb.Sample{{}}
// //seriesBuff[i].Exemplars = []*prompb.Exemplar{{}}
// }
// //pBuf := []byte{}
//
// //fmt.Printf("series buff: %+v\n", seriesBuff)
// //Warmup buffers
// for i := 0; i < 10; i++ {
// //populateTimeSeries(batch, seriesBuff, true, true)
// buildVTWriteRequest(batch)
// }
//
// b.ResetTimer()
// totalSize := 0
// for i := 0; i < b.N; i++ {
// //populateTimeSeries(batch, seriesBuff, true, true)
// //req, _, err := buildWriteRequest(seriesBuff, nil, pBuf, buff)
// req, _, err := buildVTWriteRequest(batch)
// if err != nil {
// b.Fatal(err)
// }
// totalSize += len(req)
// b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
// require.Equal(b, prompb.WriteRequestFromVTPool().UnmarshalVT(req), buildWithoutMarshal(batch))
// }
// }
//
// two_batch := createDummyTimeSeries(2)
// ten_batch := createDummyTimeSeries(10)
// hundred_batch := createDummyTimeSeries(100)
//
// b.Run("2 instances", func(b *testing.B) {
// bench(b, two_batch)
// })
//
// b.Run("10 instances", func(b *testing.B) {
// bench(b, ten_batch)
// })
//
// b.Run("1k instances", func(b *testing.B) {
// bench(b, hundred_batch)
// })
//}

storage/remote/remote.test (new executable binary file, not shown)

@@ -14,240 +14,227 @@
package remote
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
"time"
"github.com/go-kit/log"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
)
func TestRemoteWriteHandler(t *testing.T) {
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil)
require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
require.NoError(t, err)
appendable := &mockAppendable{}
handler := NewWriteHandler(nil, nil, appendable)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
resp := recorder.Result()
require.Equal(t, http.StatusNoContent, resp.StatusCode)
i := 0
j := 0
k := 0
for _, ts := range writeRequestFixture.Timeseries {
labels := labelProtosToLabels(ts.Labels)
for _, s := range ts.Samples {
require.Equal(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i])
i++
}
for _, e := range ts.Exemplars {
exemplarLabels := labelProtosToLabels(e.Labels)
require.Equal(t, mockExemplar{labels, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
j++
}
for _, hp := range ts.Histograms {
if hp.IsFloatHistogram() {
fh := FloatHistogramProtoToFloatHistogram(hp)
require.Equal(t, mockHistogram{labels, hp.Timestamp, nil, fh}, appendable.histograms[k])
} else {
h := HistogramProtoToHistogram(hp)
require.Equal(t, mockHistogram{labels, hp.Timestamp, h, nil}, appendable.histograms[k])
}
k++
}
}
}
func TestOutOfOrderSample(t *testing.T) {
buf, _, err := buildWriteRequest([]*prompb.TimeSeries{{
Labels: []*prompb.Label{{Name: "__name__", Value: "test_metric"}},
Samples: []*prompb.Sample{{Value: 1, Timestamp: 0}},
}}, nil, nil, nil)
require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
require.NoError(t, err)
appendable := &mockAppendable{
latestSample: 100,
}
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
resp := recorder.Result()
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
}
// This test case currently aims to verify that the WriteHandler endpoint
// don't fail on ingestion errors since the exemplar storage is
// still experimental.
func TestOutOfOrderExemplar(t *testing.T) {
buf, _, err := buildWriteRequest([]*prompb.TimeSeries{{
Labels: []*prompb.Label{{Name: "__name__", Value: "test_metric"}},
Exemplars: []*prompb.Exemplar{{Labels: []*prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 0}},
}}, nil, nil, nil)
require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
require.NoError(t, err)
appendable := &mockAppendable{
latestExemplar: 100,
}
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
resp := recorder.Result()
// TODO: update to require.Equal(t, http.StatusConflict, resp.StatusCode) once exemplar storage is not experimental.
require.Equal(t, http.StatusNoContent, resp.StatusCode)
}
func TestOutOfOrderHistogram(t *testing.T) {
buf, _, err := buildWriteRequest([]*prompb.TimeSeries{{
Labels: []*prompb.Label{{Name: "__name__", Value: "test_metric"}},
Histograms: []*prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat())},
}}, nil, nil, nil)
require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
require.NoError(t, err)
appendable := &mockAppendable{
latestHistogram: 100,
}
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
resp := recorder.Result()
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
}
func BenchmarkRemoteWritehandler(b *testing.B) {
const labelValue = "abcdefg'hijlmn234!@#$%^&*()_+~`\"{}[],./<>?hello0123hiOlá你好Dzieńdobry9Zd8ra765v4stvuyte"
reqs := []*http.Request{}
for i := 0; i < b.N; i++ {
num := strings.Repeat(strconv.Itoa(i), 16)
buf, _, err := buildWriteRequest([]*prompb.TimeSeries{{
Labels: []*prompb.Label{
{Name: "__name__", Value: "test_metric"},
{Name: "test_label_name_" + num, Value: labelValue + num},
},
Histograms: []*prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)},
}}, nil, nil, nil)
require.NoError(b, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
require.NoError(b, err)
reqs = append(reqs, req)
}
appendable := &mockAppendable{}
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable)
recorder := httptest.NewRecorder()
b.ResetTimer()
for _, req := range reqs {
handler.ServeHTTP(recorder, req)
}
}
func TestCommitErr(t *testing.T) {
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil)
require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
require.NoError(t, err)
appendable := &mockAppendable{
commitErr: fmt.Errorf("commit error"),
}
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
resp := recorder.Result()
body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, http.StatusInternalServerError, resp.StatusCode)
require.Equal(t, "commit error\n", string(body))
}
func BenchmarkRemoteWriteOOOSamples(b *testing.B) {
dir := b.TempDir()
opts := tsdb.DefaultOptions()
opts.OutOfOrderCapMax = 30
opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds()
db, err := tsdb.Open(dir, nil, nil, opts, nil)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, db.Close())
})
handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head())
buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil)
require.NoError(b, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
require.NoError(b, err)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
require.Equal(b, http.StatusNoContent, recorder.Code)
require.Equal(b, db.Head().NumSeries(), uint64(1000))
var bufRequests [][]byte
for i := 0; i < 100; i++ {
buf, _, err = buildWriteRequest(genSeriesWithSample(1000, int64(80+i)*time.Minute.Milliseconds()), nil, nil, nil)
require.NoError(b, err)
bufRequests = append(bufRequests, buf)
}
b.ResetTimer()
for i := 0; i < 100; i++ {
req, err = http.NewRequest("", "", bytes.NewReader(bufRequests[i]))
require.NoError(b, err)
recorder = httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
require.Equal(b, http.StatusNoContent, recorder.Code)
require.Equal(b, db.Head().NumSeries(), uint64(1000))
}
}
//func TestRemoteWriteHandler(t *testing.T) {
// buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil)
// require.NoError(t, err)
//
// req, err := http.NewRequest("", "", bytes.NewReader(buf))
// require.NoError(t, err)
//
// appendable := &mockAppendable{}
// handler := NewWriteHandler(nil, nil, appendable)
//
// recorder := httptest.NewRecorder()
// handler.ServeHTTP(recorder, req)
//
// resp := recorder.Result()
// require.Equal(t, http.StatusNoContent, resp.StatusCode)
//
// i := 0
// j := 0
// k := 0
// for _, ts := range writeRequestFixture.Timeseries {
// labels := labelProtosToLabels(ts.Labels)
// for _, s := range ts.Samples {
// require.Equal(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i])
// i++
// }
//
// for _, e := range ts.Exemplars {
// exemplarLabels := labelProtosToLabels(e.Labels)
// require.Equal(t, mockExemplar{labels, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
// j++
// }
//
// for _, hp := range ts.Histograms {
// if hp.IsFloatHistogram() {
// fh := FloatHistogramProtoToFloatHistogram(hp)
// require.Equal(t, mockHistogram{labels, hp.Timestamp, nil, fh}, appendable.histograms[k])
// } else {
// h := HistogramProtoToHistogram(hp)
// require.Equal(t, mockHistogram{labels, hp.Timestamp, h, nil}, appendable.histograms[k])
// }
//
// k++
// }
// }
//}
//
//func TestOutOfOrderSample(t *testing.T) {
// buf, _, err := buildWriteRequest([]*prompb.TimeSeries{{
// Labels: []*prompb.Label{{Name: "__name__", Value: "test_metric"}},
// Samples: []*prompb.Sample{{Value: 1, Timestamp: 0}},
// }}, nil, nil, nil)
// require.NoError(t, err)
//
// req, err := http.NewRequest("", "", bytes.NewReader(buf))
// require.NoError(t, err)
//
// appendable := &mockAppendable{
// latestSample: 100,
// }
// handler := NewWriteHandler(log.NewNopLogger(), nil, appendable)
//
// recorder := httptest.NewRecorder()
// handler.ServeHTTP(recorder, req)
//
// resp := recorder.Result()
// require.Equal(t, http.StatusBadRequest, resp.StatusCode)
//}
//
//// This test case currently aims to verify that the WriteHandler endpoint
//// don't fail on ingestion errors since the exemplar storage is
//// still experimental.
//func TestOutOfOrderExemplar(t *testing.T) {
// buf, _, err := buildWriteRequest([]*prompb.TimeSeries{{
// Labels: []*prompb.Label{{Name: "__name__", Value: "test_metric"}},
// Exemplars: []*prompb.Exemplar{{Labels: []*prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 0}},
// }}, nil, nil, nil)
// require.NoError(t, err)
//
// req, err := http.NewRequest("", "", bytes.NewReader(buf))
// require.NoError(t, err)
//
// appendable := &mockAppendable{
// latestExemplar: 100,
// }
// handler := NewWriteHandler(log.NewNopLogger(), nil, appendable)
//
// recorder := httptest.NewRecorder()
// handler.ServeHTTP(recorder, req)
//
// resp := recorder.Result()
// // TODO: update to require.Equal(t, http.StatusConflict, resp.StatusCode) once exemplar storage is not experimental.
// require.Equal(t, http.StatusNoContent, resp.StatusCode)
//}
//
//func TestOutOfOrderHistogram(t *testing.T) {
// buf, _, err := buildWriteRequest([]*prompb.TimeSeries{{
// Labels: []*prompb.Label{{Name: "__name__", Value: "test_metric"}},
// Histograms: []*prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat())},
// }}, nil, nil, nil)
// require.NoError(t, err)
//
// req, err := http.NewRequest("", "", bytes.NewReader(buf))
// require.NoError(t, err)
//
// appendable := &mockAppendable{
// latestHistogram: 100,
// }
// handler := NewWriteHandler(log.NewNopLogger(), nil, appendable)
//
// recorder := httptest.NewRecorder()
// handler.ServeHTTP(recorder, req)
//
// resp := recorder.Result()
// require.Equal(t, http.StatusBadRequest, resp.StatusCode)
//}
//
//func BenchmarkRemoteWritehandler(b *testing.B) {
// const labelValue = "abcdefg'hijlmn234!@#$%^&*()_+~`\"{}[],./<>?hello0123hiOlá你好Dzieńdobry9Zd8ra765v4stvuyte"
// reqs := []*http.Request{}
// for i := 0; i < b.N; i++ {
// num := strings.Repeat(strconv.Itoa(i), 16)
// buf, _, err := buildWriteRequest([]*prompb.TimeSeries{{
// Labels: []*prompb.Label{
// {Name: "__name__", Value: "test_metric"},
// {Name: "test_label_name_" + num, Value: labelValue + num},
// },
// Histograms: []*prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)},
// }}, nil, nil, nil)
// require.NoError(b, err)
// req, err := http.NewRequest("", "", bytes.NewReader(buf))
// require.NoError(b, err)
// reqs = append(reqs, req)
// }
//
// appendable := &mockAppendable{}
// handler := NewWriteHandler(log.NewNopLogger(), nil, appendable)
// recorder := httptest.NewRecorder()
//
// b.ResetTimer()
// for _, req := range reqs {
// handler.ServeHTTP(recorder, req)
// }
//}
//
//func TestCommitErr(t *testing.T) {
// buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil)
// require.NoError(t, err)
//
// req, err := http.NewRequest("", "", bytes.NewReader(buf))
// require.NoError(t, err)
//
// appendable := &mockAppendable{
// commitErr: fmt.Errorf("commit error"),
// }
// handler := NewWriteHandler(log.NewNopLogger(), nil, appendable)
//
// recorder := httptest.NewRecorder()
// handler.ServeHTTP(recorder, req)
//
// resp := recorder.Result()
// body, err := io.ReadAll(resp.Body)
// require.NoError(t, err)
// require.Equal(t, http.StatusInternalServerError, resp.StatusCode)
// require.Equal(t, "commit error\n", string(body))
//}
//
//func BenchmarkRemoteWriteOOOSamples(b *testing.B) {
// dir := b.TempDir()
//
// opts := tsdb.DefaultOptions()
// opts.OutOfOrderCapMax = 30
// opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds()
//
// db, err := tsdb.Open(dir, nil, nil, opts, nil)
// require.NoError(b, err)
//
// b.Cleanup(func() {
// require.NoError(b, db.Close())
// })
//
// handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head())
//
// buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil)
// require.NoError(b, err)
//
// req, err := http.NewRequest("", "", bytes.NewReader(buf))
// require.NoError(b, err)
//
// recorder := httptest.NewRecorder()
// handler.ServeHTTP(recorder, req)
// require.Equal(b, http.StatusNoContent, recorder.Code)
// require.Equal(b, db.Head().NumSeries(), uint64(1000))
//
// var bufRequests [][]byte
// for i := 0; i < 100; i++ {
// buf, _, err = buildWriteRequest(genSeriesWithSample(1000, int64(80+i)*time.Minute.Milliseconds()), nil, nil, nil)
// require.NoError(b, err)
// bufRequests = append(bufRequests, buf)
// }
//
// b.ResetTimer()
// for i := 0; i < 100; i++ {
// req, err = http.NewRequest("", "", bytes.NewReader(bufRequests[i]))
// require.NoError(b, err)
//
// recorder = httptest.NewRecorder()
// handler.ServeHTTP(recorder, req)
// require.Equal(b, http.StatusNoContent, recorder.Code)
// require.Equal(b, db.Head().NumSeries(), uint64(1000))
// }
//}
func genSeriesWithSample(numSeries int, ts int64) []*prompb.TimeSeries {
var series []*prompb.TimeSeries