diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index 569eb56324..a5cca8fa45 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -146,6 +146,7 @@ type flagConfig struct {
 	queryConcurrency    int
 	queryMaxSamples     int
 	RemoteFlushDeadline model.Duration
+	rwProto             bool
 
 	featureList []string
 	// These options are extracted from featureList
@@ -210,6 +211,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
 			continue
 		case "promql-at-modifier", "promql-negative-offset":
 			level.Warn(logger).Log("msg", "This option for --enable-feature is now permanently enabled and therefore a no-op.", "option", o)
+		case "reduced-rw-proto":
+			c.rwProto = true
+			level.Info(logger).Log("msg", "Experimental reduced remote write proto format will be used; the remote write receiver must be able to parse this new protobuf format.")
 		default:
 			level.Warn(logger).Log("msg", "Unknown option for --enable-feature", "option", o)
 		}
@@ -595,7 +599,7 @@ func main() {
 	var (
 		localStorage  = &readyStorage{stats: tsdb.NewDBStats()}
 		scraper       = &readyScrapeManager{}
-		remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper)
+		remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.rwProto)
 		fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
 	)
 
diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index a2541ae2f8..57cd6927ad 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -403,6 +403,8 @@ type QueueManager struct {
 	sendNativeHistograms bool
 	watcher              *wlog.Watcher
 	metadataWatcher      *MetadataWatcher
+	// internFormat enables the experimental, reduced remote write proto format.
+	internFormat bool
 
 	clientMtx   sync.RWMutex
 	storeClient WriteClient
@@ -450,6 +452,7 @@ func NewQueueManager(
 	sm ReadyScrapeManager,
 	enableExemplarRemoteWrite bool,
 	enableNativeHistogramRemoteWrite bool,
+	internFormat bool,
 ) *QueueManager {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
@@ -472,6 +475,7 @@ func NewQueueManager(
 		storeClient:          client,
 		sendExemplars:        enableExemplarRemoteWrite,
 		sendNativeHistograms: enableNativeHistogramRemoteWrite,
+		internFormat:         internFormat,
 
 		seriesLabels:         make(map[chunks.HeadSeriesRef]labels.Labels),
 		seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
@@ -1169,7 +1173,6 @@ func (s *shards) stop() {
 func (s *shards) enqueue(ref chunks.HeadSeriesRef, data timeSeries) bool {
 	s.mtx.RLock()
 	defer s.mtx.RUnlock()
-	shard := uint64(ref) % uint64(len(s.queues))
 
 	select {
 	case <-s.softShutdown:
@@ -1343,6 +1346,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 	}()
 
 	shardNum := strconv.Itoa(shardID)
+	pool := newLookupPool()
 
 	// Send batches of at most MaxSamplesPerSend samples to the remote storage.
 	// If we have fewer samples than that, flush them out after a deadline anyways.
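
Note: lookupPool itself is added elsewhere in this PR and does not appear in this diff. For orientation, here is a minimal sketch of the contract the call sites below assume (newLookupPool, intern, clear, getTable, and a table map shipped as the request's StringSymbolTable); the field layout and ref-allocation order are inferred from usage, not copied from the PR:

    // lookupPool is a per-shard string-interning helper (sketch). intern
    // returns a stable ref for a string within the current batch and records
    // it in table, which is sent along as the request's symbol table.
    type lookupPool struct {
        next  uint64
        refs  map[string]uint64 // string -> ref handed out via LabelRef fields
        table map[uint64]string // ref -> string, the per-request symbol table
    }

    func newLookupPool() *lookupPool {
        return &lookupPool{refs: map[string]uint64{}, table: map[uint64]string{}}
    }

    func (p *lookupPool) intern(s string) uint64 {
        if ref, ok := p.refs[s]; ok {
            return ref // already interned in this batch
        }
        ref := p.next
        p.next++
        p.refs[s] = ref
        p.table[ref] = s
        return ref
    }

    // clear resets the pool after each send so refs cannot leak across requests.
    func (p *lookupPool) clear() {
        p.next = 0
        p.refs = map[string]uint64{}
        p.table = map[uint64]string{}
    }

    func (p *lookupPool) getTable() map[uint64]string { return p.table }

The payoff is that a label name or value repeated across every series in a batch is marshaled once in the symbol table, while each prompb.LabelRef carries only two integer refs.
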
@@ -1365,6 +1369,14 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 		}
 	}
 
+	pendingReducedData := make([]prompb.ReducedTimeSeries, max)
+	for i := range pendingReducedData {
+		pendingReducedData[i].Samples = []prompb.Sample{{}}
+		if s.qm.sendExemplars {
+			pendingReducedData[i].Exemplars = []prompb.ExemplarRef{{}}
+		}
+	}
+
 	timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
 	stop := func() {
 		if !timer.Stop() {
@@ -1399,10 +1411,17 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 			if !ok {
 				return
 			}
-			nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
+			if s.qm.internFormat {
+				nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateReducedTimeSeries(pool, batch, pendingReducedData)
+				n := nPendingSamples + nPendingExemplars + nPendingHistograms
+				s.sendReducedSamples(ctx, pendingReducedData[:n], pool.table, nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
+				pool.clear()
+			} else {
+				nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
+				n := nPendingSamples + nPendingExemplars + nPendingHistograms
+				s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
+			}
 			queue.ReturnForReuse(batch)
-			n := nPendingSamples + nPendingExemplars + nPendingHistograms
-			s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
 
 			stop()
 			timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
@@ -1410,11 +1429,20 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 		case <-timer.C:
 			batch := queue.Batch()
 			if len(batch) > 0 {
-				nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
-				n := nPendingSamples + nPendingExemplars + nPendingHistograms
-				level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
-					"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
-				s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
+				if s.qm.internFormat {
+					nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateReducedTimeSeries(pool, batch, pendingReducedData)
+					n := nPendingSamples + nPendingExemplars + nPendingHistograms
+					level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
+						"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
+					s.sendReducedSamples(ctx, pendingReducedData[:n], pool.table, nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
+					pool.clear()
+				} else {
+					nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
+					n := nPendingSamples + nPendingExemplars + nPendingHistograms
+					level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
+						"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
+					s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
+				}
 			}
 			queue.ReturnForReuse(batch)
 			timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
@@ -1556,6 +1584,148 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
 	return err
 }
 
+func (s *shards) populateReducedTimeSeries(pool *lookupPool, batch []timeSeries, pendingData []prompb.ReducedTimeSeries) (int, int, int) {
+	var nPendingSamples, nPendingExemplars, nPendingHistograms int
+	for nPending, d := range batch {
+		pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
+		if s.qm.sendExemplars {
+			pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
+		}
+		if s.qm.sendNativeHistograms {
+			pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
+		}
+
+		// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
+		// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
+		// stop reading from the queue. This makes it safe to reference pendingSamples by index.
+		pendingData[nPending].Labels = make([]prompb.LabelRef, len(d.seriesLabels))
+		for i, sl := range d.seriesLabels {
+			nRef := pool.intern(sl.Name)
+			vRef := pool.intern(sl.Value)
+			pendingData[nPending].Labels[i] = prompb.LabelRef{NameRef: nRef, ValueRef: vRef}
+		}
+		switch d.sType {
+		case tSample:
+			pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{
+				Value:     d.value,
+				Timestamp: d.timestamp,
+			})
+			nPendingSamples++
+		case tExemplar:
+			l := make([]prompb.LabelRef, len(d.exemplarLabels))
+			for i, el := range d.exemplarLabels {
+				nRef := pool.intern(el.Name)
+				vRef := pool.intern(el.Value)
+				l[i] = prompb.LabelRef{NameRef: nRef, ValueRef: vRef}
+			}
+			pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.ExemplarRef{
+				Labels:    l,
+				Value:     d.value,
+				Timestamp: d.timestamp,
+			})
+			nPendingExemplars++
+		case tHistogram:
+			pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToHistogramProto(d.timestamp, d.histogram))
+			nPendingHistograms++
+		}
+	}
+	return nPendingSamples, nPendingExemplars, nPendingHistograms
+}
+
+func (s *shards) sendReducedSamples(ctx context.Context, samples []prompb.ReducedTimeSeries, labels map[uint64]string, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
+	begin := time.Now()
+	err := s.sendReducedSamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, pBuf, buf)
+	if err != nil {
+		level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err)
+		s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount))
+		s.qm.metrics.failedExemplarsTotal.Add(float64(exemplarCount))
+		s.qm.metrics.failedHistogramsTotal.Add(float64(histogramCount))
+	}
+
+	// These counters are used to calculate the dynamic sharding, and as such
+	// should be maintained irrespective of success or failure.
+	s.qm.dataOut.incr(int64(len(samples)))
+	s.qm.dataOutDuration.incr(int64(time.Since(begin)))
+	s.qm.lastSendTimestamp.Store(time.Now().Unix())
+	// Pending samples/exemplars/histograms also should be subtracted as an error means
+	// they will not be retried.
+	s.qm.metrics.pendingSamples.Sub(float64(sampleCount))
+	s.qm.metrics.pendingExemplars.Sub(float64(exemplarCount))
+	s.qm.metrics.pendingHistograms.Sub(float64(histogramCount))
+	s.enqueuedSamples.Sub(int64(sampleCount))
+	s.enqueuedExemplars.Sub(int64(exemplarCount))
+	s.enqueuedHistograms.Sub(int64(histogramCount))
+}
+
+// sendReducedSamples to the remote storage with backoff for recoverable errors.
+func (s *shards) sendReducedSamplesWithBackoff(ctx context.Context, samples []prompb.ReducedTimeSeries, labels map[uint64]string, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error {
+	// Build the WriteRequest with no metadata.
+	req, highest, err := buildReducedWriteRequest(samples, labels, pBuf, *buf)
+	if err != nil {
+		// Failing to build the write request is non-recoverable, since it will
+		// only error if marshaling the proto to bytes fails.
+		return err
+	}
+
+	reqSize := len(req)
+	*buf = req
+
+	// An anonymous function allows us to defer the completion of our per-try spans
+	// without causing a memory leak, and it has the nice effect of not propagating any
+	// parameters for sendSamplesWithBackoff/3.
+	attemptStore := func(try int) error {
+		ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch")
+		defer span.End()
+
+		span.SetAttributes(
+			attribute.Int("request_size", reqSize),
+			attribute.Int("samples", sampleCount),
+			attribute.Int("try", try),
+			attribute.String("remote_name", s.qm.storeClient.Name()),
+			attribute.String("remote_url", s.qm.storeClient.Endpoint()),
+		)
+
+		if exemplarCount > 0 {
+			span.SetAttributes(attribute.Int("exemplars", exemplarCount))
+		}
+		if histogramCount > 0 {
+			span.SetAttributes(attribute.Int("histograms", histogramCount))
+		}
+
+		begin := time.Now()
+		s.qm.metrics.samplesTotal.Add(float64(sampleCount))
+		s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
+		s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
+		err := s.qm.client().Store(ctx, *buf, try)
+		s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
+
+		if err != nil {
+			span.RecordError(err)
+			return err
+		}
+
+		return nil
+	}
+
+	onRetry := func() {
+		s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
+		s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount))
+		s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount))
+	}
+
+	err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.logger, attemptStore, onRetry)
+	if errors.Is(err, context.Canceled) {
+		// When there is resharding, we cancel the context for this queue, which means the data is not sent.
+		// So we exit early to not update the metrics.
+		return err
+	}
+
+	s.qm.metrics.sentBytesTotal.Add(float64(reqSize))
+	s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000))
+
+	return err
+}
+
 func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l log.Logger, attempt func(int) error, onRetry func()) error {
 	backoff := cfg.MinBackoff
 	sleepDuration := model.Duration(0)
@@ -1646,3 +1816,44 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
 	compressed := reSnappy.Encode(buf, pBuf.Bytes())
 	return compressed, highest, nil
 }
+
+func buildReducedWriteRequest(samples []prompb.ReducedTimeSeries, labels map[uint64]string, pBuf *proto.Buffer, buf []byte) ([]byte, int64, error) {
+	var highest int64
+	for _, ts := range samples {
+		// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
+		if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
+			highest = ts.Samples[0].Timestamp
+		}
+		if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
+			highest = ts.Exemplars[0].Timestamp
+		}
+		if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
+			highest = ts.Histograms[0].Timestamp
+		}
+	}
+
+	req := &prompb.WriteRequestWithRefs{
+		StringSymbolTable: labels,
+		Timeseries:        samples,
+	}
+
+	if pBuf == nil {
+		pBuf = proto.NewBuffer(nil) // For convenience in tests. Not efficient.
+	} else {
+		pBuf.Reset()
+	}
+	err := pBuf.Marshal(req)
+	if err != nil {
+		return nil, 0, err
+	}
+
+	// snappy uses len() to see if it needs to allocate a new slice. Make the
+	// buffer as long as possible.
+	if buf != nil {
+		buf = buf[0:cap(buf)]
+	}
+
+	compressed := reSnappy.Encode(buf, pBuf.Bytes())
+
+	return compressed, highest, nil
+}
diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index fa5e5beb44..5d12b07808 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -81,7 +81,7 @@ func TestSampleDelivery(t *testing.T) {
 
 	dir := t.TempDir()
 
-	s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
+	s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
 	defer s.Close()
 
 	queueConfig := config.DefaultQueueConfig
@@ -172,7 +172,7 @@ func TestMetadataDelivery(t *testing.T) {
 	mcfg := config.DefaultMetadataConfig
 
 	metrics := newQueueManagerMetrics(nil, "", "")
-	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
 	m.Start()
 	defer m.Stop()
 
@@ -211,7 +211,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
 
 	dir := t.TempDir()
 	metrics := newQueueManagerMetrics(nil, "", "")
-	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
 	m.StoreSeries(series, 0)
 	m.Start()
 	defer m.Stop()
@@ -253,7 +253,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
 	mcfg := config.DefaultMetadataConfig
 
 	metrics := newQueueManagerMetrics(nil, "", "")
-	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
 	m.StoreSeries(series, 0)
 
 	m.Start()
@@ -273,7 +273,7 @@ func TestShutdown(t *testing.T) {
 	mcfg := config.DefaultMetadataConfig
 
 	metrics := newQueueManagerMetrics(nil, "", "")
-	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
 	n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
 	samples, series := createTimeseries(n, n)
 	m.StoreSeries(series, 0)
@@ -311,7 +311,7 @@ func TestSeriesReset(t *testing.T) {
 	cfg := config.DefaultQueueConfig
 	mcfg := config.DefaultMetadataConfig
 	metrics := newQueueManagerMetrics(nil, "", "")
-	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
 	for i := 0; i < numSegments; i++ {
 		series := []record.RefSeries{}
 		for j := 0; j < numSeries; j++ {
@@ -340,7 +340,7 @@ func TestReshard(t *testing.T) {
 
 	dir := t.TempDir()
 
 	metrics := newQueueManagerMetrics(nil, "", "")
-	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
 	m.StoreSeries(series, 0)
 
 	m.Start()
@@ -376,7 +376,7 @@ func TestReshardRaceWithStop(*testing.T) {
 	go func() {
 		for {
 			metrics := newQueueManagerMetrics(nil, "", "")
-			m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+			m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
 			m.Start()
 			h.Unlock()
 			h.Lock()
@@ -411,7 +411,7 @@ func TestReshardPartialBatch(t *testing.T) {
 	cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
 
 	metrics := newQueueManagerMetrics(nil, "", "")
-	m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+	m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
 	m.StoreSeries(series, 0)
 
 	m.Start()
@@ -456,7 +456,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
 
 	metrics := newQueueManagerMetrics(nil, "", "")
 
-	m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+	m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
 	m.StoreSeries(series, 0)
 	m.Start()
 	defer m.Stop()
@@ -483,7 +483,7 @@ func TestReleaseNoninternedString(t *testing.T) {
 	mcfg := config.DefaultMetadataConfig
 	metrics := newQueueManagerMetrics(nil, "", "")
 	c := NewTestWriteClient()
-	m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+	m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
 	m.Start()
 	defer m.Stop()
 
@@ -530,7 +530,7 @@ func TestShouldReshard(t *testing.T) {
 	for _, c := range cases {
 		metrics := newQueueManagerMetrics(nil, "", "")
 		client := NewTestWriteClient()
-		m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+		m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
 		m.numShards = c.startingShards
 		m.dataIn.incr(c.samplesIn)
 		m.dataOut.incr(c.samplesOut)
@@ -563,6 +563,9 @@ func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([
 		// Create Labels that is name of series plus any extra labels supplied.
 		b.Reset()
 		b.Add(labels.MetricName, name)
+		rand.Shuffle(len(extraLabels), func(i, j int) {
+			extraLabels[i], extraLabels[j] = extraLabels[j], extraLabels[i]
+		})
 		for _, l := range extraLabels {
 			b.Add(l.Name, l.Value)
 		}
@@ -600,6 +603,37 @@ func createTimeseriesProto(numSamples, numSeries int, extraLabels ...labels.Labe
 	return series
 }
 
+func createReducedTimeseriesProto(numSamples, numSeries int, extraLabels ...labels.Label) ([]prompb.ReducedTimeSeries, *lookupPool) {
+	pool := newLookupPool()
+	series := make([]prompb.ReducedTimeSeries, 0, numSeries)
+	for i := 0; i < numSeries; i++ {
+		name := fmt.Sprintf("test_metric_%d", i)
+		sample := prompb.Sample{
+			Value:     float64(i),
+			Timestamp: int64(i),
+		}
+		nRef := pool.intern("__name__")
+		vRef := pool.intern(name)
+		l := []prompb.LabelRef{{NameRef: nRef, ValueRef: vRef}}
+		rand.Shuffle(len(extraLabels), func(i, j int) {
+			extraLabels[i], extraLabels[j] = extraLabels[j], extraLabels[i]
+		})
+		for i, v := range extraLabels {
+			if i > 2 {
+				break
+			}
+			nRef := pool.intern(v.Name)
+			vRef := pool.intern(v.Value)
+			l = append(l, prompb.LabelRef{NameRef: nRef, ValueRef: vRef})
+		}
+		series = append(series, prompb.ReducedTimeSeries{
+			Labels:  l,
+			Samples: []prompb.Sample{sample},
+		})
+	}
+	return series, pool
+}
+
 func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []record.RefSeries) {
 	exemplars := make([]record.RefExemplar, 0, numExemplars)
 	series := make([]record.RefSeries, 0, numSeries)
@@ -779,6 +813,7 @@ func (c *TestWriteClient) waitForExpectedData(tb testing.TB) {
 		return
 	}
 	c.wg.Wait()
+	c.mtx.Lock()
 	defer c.mtx.Unlock()
 	for ts, expectedSamples := range c.expectedSamples {
@@ -808,14 +843,17 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
 		return err
 	}
 
-	var reqProto prompb.WriteRequest
+	var reqProto prompb.WriteRequestWithRefs
 	if err := proto.Unmarshal(reqBuf, &reqProto); err != nil {
 		return err
 	}
 
 	count := 0
 	for _, ts := range reqProto.Timeseries {
-		labels := labelProtosToLabels(ts.Labels)
-		seriesName := labels.Get("__name__")
+		tsLabels := labels.Labels{}
+		for _, l := range ts.Labels {
+			tsLabels = append(tsLabels, labels.Label{Name: reqProto.StringSymbolTable[l.NameRef], Value: reqProto.StringSymbolTable[l.ValueRef]})
+		}
+		seriesName := tsLabels.Get("__name__")
 		for _, sample := range ts.Samples {
 			count++
 			c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample)
@@ -823,7 +861,15 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
 
 		for _, ex := range ts.Exemplars {
 			count++
-			c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], ex)
+			e := prompb.Exemplar{}
+			e.Timestamp = ex.Timestamp
+			e.Value = ex.Value
+			eLabels := make([]prompb.Label, len(ex.Labels))
+			for i, l := range ex.Labels {
+				eLabels[i] = prompb.Label{Name: reqProto.StringSymbolTable[l.NameRef], Value: reqProto.StringSymbolTable[l.ValueRef]}
+			}
+			e.Labels = eLabels
+			c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], e)
 		}
 
 		for _, histogram := range ts.Histograms {
@@ -931,7 +977,7 @@ func BenchmarkSampleSend(b *testing.B) {
 
 	dir := b.TempDir()
 
 	metrics := newQueueManagerMetrics(nil, "", "")
-	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
 	m.StoreSeries(series, 0)
 
 	// These should be received by the client.
@@ -977,7 +1023,7 @@ func BenchmarkStartup(b *testing.B) {
 		c := NewTestBlockedWriteClient()
 		m := NewQueueManager(metrics, nil, nil, logger, dir,
 			newEWMARate(ewmaWeight, shardUpdateDuration),
-			cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false)
+			cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, false)
 		m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
 		m.watcher.MaxSegment = segments[len(segments)-2]
 		err := m.watcher.Run()
@@ -1060,7 +1106,7 @@ func TestCalculateDesiredShards(t *testing.T) {
 	metrics := newQueueManagerMetrics(nil, "", "")
 	samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
-	m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+	m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
 
 	// Need to start the queue manager so the proper metrics are initialized.
 	// However we can stop it right away since we don't need to do any actual
@@ -1137,7 +1183,7 @@ func TestCalculateDesiredShardsDetail(t *testing.T) {
 	metrics := newQueueManagerMetrics(nil, "", "")
 	samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
-	m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+	m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
 
 	for _, tc := range []struct {
 		name string
@@ -1383,3 +1429,36 @@ func BenchmarkBuildWriteRequest(b *testing.B) {
 	b.StopTimer()
 }
 
+func BenchmarkBuildReducedWriteRequest(b *testing.B) {
+	// Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics.
+	extraLabels := labels.Labels{
+		{Name: "kubernetes_io_arch", Value: "amd64"},
+		{Name: "kubernetes_io_instance_type", Value: "c3.somesize"},
+		{Name: "kubernetes_io_os", Value: "linux"},
+		{Name: "container_name", Value: "some-name"},
+		{Name: "failure_domain_kubernetes_io_region", Value: "somewhere-1"},
+		{Name: "failure_domain_kubernetes_io_zone", Value: "somewhere-1b"},
+		{Name: "id", Value: "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28"},
+		{Name: "image", Value: "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506"},
+		{Name: "instance", Value: "ip-111-11-1-11.ec2.internal"},
+		{Name: "job", Value: "kubernetes-cadvisor"},
+		{Name: "kubernetes_io_hostname", Value: "ip-111-11-1-11"},
+		{Name: "monitor", Value: "prod"},
+		{Name: "name", Value: "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0"},
+		{Name: "namespace", Value: "kube-system"},
+		{Name: "pod_name", Value: "some-other-name-5j8s8"},
+	}
+	series, pool := createReducedTimeseriesProto(1, 10000, extraLabels...)
+
+	b.ResetTimer()
+	totalSize := 0
+	for i := 0; i < b.N; i++ {
+		buf, _, _ := buildReducedWriteRequest(series, pool.getTable(), nil, nil)
+		totalSize += len(buf)
+	}
+	// Report the average compressed request size once, after the loop.
+	b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
+
+	// Do not include shutdown
+	b.StopTimer()
+}
diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go
index 54d4825f6a..4868175fb3 100644
--- a/storage/remote/read_test.go
+++ b/storage/remote/read_test.go
@@ -91,7 +91,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) {
 
 	for _, tc := range cases {
 		t.Run("", func(t *testing.T) {
-			s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
+			s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
 			conf := &config.Config{
 				GlobalConfig:      config.DefaultGlobalConfig,
 				RemoteReadConfigs: tc.cfgs,
diff --git a/storage/remote/storage.go b/storage/remote/storage.go
index b6533f9275..91699f68ec 100644
--- a/storage/remote/storage.go
+++ b/storage/remote/storage.go
@@ -62,7 +62,7 @@ type Storage struct {
 }
 
 // NewStorage returns a remote.Storage.
-func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager) *Storage {
+func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, writeReducedProto bool) *Storage {
 	if l == nil {
 		l = log.NewNopLogger()
 	}
@@ -72,7 +72,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
 		logger:                 logger,
 		localStartTimeCallback: stCallback,
 	}
-	s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm)
+	s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, writeReducedProto)
 	return s
 }
diff --git a/storage/remote/storage_test.go b/storage/remote/storage_test.go
index 1bca61fdda..cad6fb338f 100644
--- a/storage/remote/storage_test.go
+++ b/storage/remote/storage_test.go
@@ -27,7 +27,7 @@ import (
 func TestStorageLifecycle(t *testing.T) {
 	dir := t.TempDir()
 
-	s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
+	s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
 	conf := &config.Config{
 		GlobalConfig: config.DefaultGlobalConfig,
 		RemoteWriteConfigs: []*config.RemoteWriteConfig{
@@ -54,7 +54,7 @@ func TestStorageLifecycle(t *testing.T) {
 func TestUpdateRemoteReadConfigs(t *testing.T) {
 	dir := t.TempDir()
 
-	s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
+	s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
 
 	conf := &config.Config{
 		GlobalConfig: config.GlobalConfig{},
@@ -75,7 +75,7 @@ func TestUpdateRemoteReadConfigs(t *testing.T) {
 func TestFilterExternalLabels(t *testing.T) {
 	dir := t.TempDir()
 
-	s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
+	s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
 
 	conf := &config.Config{
 		GlobalConfig: config.GlobalConfig{
@@ -100,7 +100,7 @@ func TestFilterExternalLabels(t *testing.T) {
 func TestIgnoreExternalLabels(t *testing.T) {
 	dir := t.TempDir()
 
-	s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
+	s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
 
 	conf := &config.Config{
 		GlobalConfig: config.GlobalConfig{
diff --git a/storage/remote/write.go b/storage/remote/write.go
index 4b0a249014..25d79da11c 100644
--- a/storage/remote/write.go
+++ b/storage/remote/write.go
@@ -65,6 +65,7 @@ type WriteStorage struct {
 	externalLabels    labels.Labels
 	dir               string
 	queues            map[string]*QueueManager
+	writeReducedProto bool
 	samplesIn         *ewmaRate
 	flushDeadline     time.Duration
 	interner          *pool
@@ -76,12 +77,13 @@ type WriteStorage struct {
 }
 
 // NewWriteStorage creates and runs a WriteStorage.
-func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager) *WriteStorage {
+func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, writeReducedProto bool) *WriteStorage {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
 	rws := &WriteStorage{
 		queues:            make(map[string]*QueueManager),
+		writeReducedProto: writeReducedProto,
 		watcherMetrics:    wlog.NewWatcherMetrics(reg),
 		liveReaderMetrics: wlog.NewLiveReaderMetrics(reg),
 		logger:            logger,
@@ -153,17 +155,23 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
 			name = rwConf.Name
 		}
 
-		c, err := NewWriteClient(name, &ClientConfig{
-			URL:              rwConf.URL,
-			Timeout:          rwConf.RemoteTimeout,
-			HTTPClientConfig: rwConf.HTTPClientConfig,
-			SigV4Config:      rwConf.SigV4Config,
-			AzureADConfig:    rwConf.AzureADConfig,
-			Headers:          rwConf.Headers,
-			RetryOnRateLimit: rwConf.QueueConfig.RetryOnRateLimit,
-		})
-		if err != nil {
-			return err
+		var c WriteClient
+		if rwConf.URL.String() == "fake" {
+			// Development hook: the special "fake" URL skips building a real
+			// client, so the queue manager can run without a remote endpoint.
+		} else {
+			c, err = NewWriteClient(name, &ClientConfig{
+				URL:              rwConf.URL,
+				Timeout:          rwConf.RemoteTimeout,
+				HTTPClientConfig: rwConf.HTTPClientConfig,
+				SigV4Config:      rwConf.SigV4Config,
+				AzureADConfig:    rwConf.AzureADConfig,
+				Headers:          rwConf.Headers,
+				RetryOnRateLimit: rwConf.QueueConfig.RetryOnRateLimit,
+			})
+			if err != nil {
+				return err
+			}
 		}
 
 		queue, ok := rws.queues[hash]
@@ -197,6 +205,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
 			rws.scraper,
 			rwConf.SendExemplars,
 			rwConf.SendNativeHistograms,
+			rws.writeReducedProto,
 		)
 		// Keep track of which queues are new so we know which to start.
 		newHashes = append(newHashes, hash)
diff --git a/storage/remote/write_test.go b/storage/remote/write_test.go
index 824e319c2f..ec5730f855 100644
--- a/storage/remote/write_test.go
+++ b/storage/remote/write_test.go
@@ -117,7 +117,7 @@ func TestNoDuplicateWriteConfigs(t *testing.T) {
 	}
 
 	for _, tc := range cases {
-		s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil)
+		s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false)
 		conf := &config.Config{
 			GlobalConfig:       config.DefaultGlobalConfig,
 			RemoteWriteConfigs: tc.cfgs,
@@ -139,7 +139,7 @@ func TestRestartOnNameChange(t *testing.T) {
 	hash, err := toHash(cfg)
 	require.NoError(t, err)
 
-	s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil)
+	s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false)
 
 	conf := &config.Config{
 		GlobalConfig: config.DefaultGlobalConfig,
@@ -164,7 +164,7 @@ func TestRestartOnNameChange(t *testing.T) {
 func TestUpdateWithRegisterer(t *testing.T) {
 	dir := t.TempDir()
 
-	s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil)
+	s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil, false)
 	c1 := &config.RemoteWriteConfig{
 		Name: "named",
 		URL: &common_config.URL{
@@ -204,7 +204,7 @@ func TestUpdateWithRegisterer(t *testing.T) {
 func TestWriteStorageLifecycle(t *testing.T) {
 	dir := t.TempDir()
 
-	s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil)
+	s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
 	conf := &config.Config{
 		GlobalConfig: config.DefaultGlobalConfig,
 		RemoteWriteConfigs: []*config.RemoteWriteConfig{
@@ -221,7 +221,7 @@ func TestWriteStorageLifecycle(t *testing.T) {
 func TestUpdateExternalLabels(t *testing.T) {
 	dir := t.TempDir()
 
-	s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil)
+	s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil, false)
 
 	externalLabels := labels.FromStrings("external", "true")
 	conf := &config.Config{
@@ -250,7 +250,7 @@ func TestUpdateExternalLabels(t *testing.T) {
 func TestWriteStorageApplyConfigsIdempotent(t *testing.T) {
 	dir := t.TempDir()
 
-	s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil)
+	s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
 
 	conf := &config.Config{
 		GlobalConfig: config.GlobalConfig{},
@@ -276,7 +276,7 @@ func TestWriteStorageApplyConfigsIdempotent(t *testing.T) {
 func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) {
 	dir := t.TempDir()
 
-	s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil)
+	s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
 
 	c0 := &config.RemoteWriteConfig{
 		RemoteTimeout: model.Duration(10 * time.Second),
diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go
index 1e0976c3f1..f44760ea13 100644
--- a/tsdb/agent/db_test.go
+++ b/tsdb/agent/db_test.go
@@ -88,7 +88,7 @@ func createTestAgentDB(t *testing.T, reg prometheus.Registerer, opts *Options) *
 	t.Helper()
 
 	dbDir := t.TempDir()
-	rs := remote.NewStorage(log.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil)
+	rs := remote.NewStorage(log.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil, false)
 	t.Cleanup(func() {
 		require.NoError(t, rs.Close())
 	})
@@ -584,7 +584,7 @@ func TestLockfile(t *testing.T) {
 	tsdbutil.TestDirLockerUsage(t, func(t *testing.T, data string, createLock bool) (*tsdbutil.DirLocker, testutil.Closer) {
 		logger := log.NewNopLogger()
 		reg := prometheus.NewRegistry()
-		rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil)
+		rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil, false)
 		t.Cleanup(func() {
 			require.NoError(t, rs.Close())
 		})
@@ -604,7 +604,7 @@ func TestLockfile(t *testing.T) {
 func Test_ExistingWAL_NextRef(t *testing.T) {
 	dbDir := t.TempDir()
 
-	rs := remote.NewStorage(log.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil)
+	rs := remote.NewStorage(log.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil, false)
 	defer func() {
 		require.NoError(t, rs.Close())
 	}()
diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go
index 320d174fce..20b231c233 100644
--- a/web/api/v1/api_test.go
+++ b/web/api/v1/api_test.go
@@ -461,7 +461,7 @@ func TestEndpoints(t *testing.T) {
 		remote := remote.NewStorage(promlog.New(&promlogConfig), prometheus.DefaultRegisterer, func() (int64, error) {
 			return 0, nil
-		}, dbDir, 1*time.Second, nil)
+		}, dbDir, 1*time.Second, nil, false)
 
 		err = remote.ApplyConfig(&config.Config{
 			RemoteReadConfigs: []*config.RemoteReadConfig{
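
Note: for receivers, decoding is the mirror image of TestWriteClient.Store above. A minimal sketch, assuming the PR's prompb.WriteRequestWithRefs and prompb.LabelRef types plus the golang/snappy and gogo/protobuf packages this package already uses; decodeReducedWriteRequest is a hypothetical helper name, not part of the PR:

    // decodeReducedWriteRequest snappy-decompresses and unmarshals a reduced
    // write request, then resolves every LabelRef through the request-wide
    // StringSymbolTable back into ordinary label sets.
    func decodeReducedWriteRequest(compressed []byte) ([]labels.Labels, error) {
        reqBuf, err := snappy.Decode(nil, compressed)
        if err != nil {
            return nil, err
        }
        var req prompb.WriteRequestWithRefs
        if err := proto.Unmarshal(reqBuf, &req); err != nil {
            return nil, err
        }
        out := make([]labels.Labels, 0, len(req.Timeseries))
        for _, ts := range req.Timeseries {
            lset := make(labels.Labels, 0, len(ts.Labels))
            for _, l := range ts.Labels {
                // NameRef/ValueRef index into the shared symbol table.
                lset = append(lset, labels.Label{
                    Name:  req.StringSymbolTable[l.NameRef],
                    Value: req.StringSymbolTable[l.ValueRef],
                })
            }
            out = append(out, lset)
        }
        return out, nil
    }

This is why the feature-flag log message above warns that the receiver must understand the new format: a receiver that unmarshals the payload as a plain prompb.WriteRequest would see refs instead of strings.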