diff --git a/config/config.go b/config/config.go
index 5c51d5a0d8..7d21ba004e 100644
--- a/config/config.go
+++ b/config/config.go
@@ -965,6 +965,10 @@ type MetadataConfig struct {
 	SendInterval model.Duration `yaml:"send_interval"`
 	// Maximum number of samples per send.
 	MaxSamplesPerSend int `yaml:"max_samples_per_send,omitempty"`
+	// SendFromWAL controls whether we send metadata from the WAL.
+	// TODO (@tpaschalis) Maybe this should also be the feature flag that
+	// disables the current MetadataWatcher?
+	SendFromWAL bool `yaml:"send_from_wal,omitempty"`
 }
 
 // RemoteReadConfig is the configuration for reading from remote storage.
diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 3edd31b918..db4a1f5902 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -34,6 +34,7 @@ import (
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/model/metadata"
 	"github.com/prometheus/prometheus/model/relabel"
 	"github.com/prometheus/prometheus/prompb"
 	"github.com/prometheus/prometheus/scrape"
@@ -70,12 +71,14 @@ type queueManagerMetrics struct {
 	droppedSamplesTotal    prometheus.Counter
 	droppedExemplarsTotal  prometheus.Counter
 	droppedHistogramsTotal prometheus.Counter
+	droppedMetadataTotal   prometheus.Counter
 	enqueueRetriesTotal    prometheus.Counter
 	sentBatchDuration      prometheus.Histogram
 	highestSentTimestamp   *maxTimestamp
 	pendingSamples         prometheus.Gauge
 	pendingExemplars       prometheus.Gauge
 	pendingHistograms      prometheus.Gauge
+	pendingMetadata        prometheus.Gauge
 	shardCapacity          prometheus.Gauge
 	numShards              prometheus.Gauge
 	maxNumShards           prometheus.Gauge
@@ -763,6 +766,58 @@ outer:
 	return true
 }
 
+func (t *QueueManager) AppendWALMetadata(ms []record.RefMetadata) bool {
+	if !t.mcfg.SendFromWAL {
+		return true
+	}
+
+outer:
+	for _, m := range ms {
+		t.seriesMtx.Lock()
+		lbls, ok := t.seriesLabels[m.Ref]
+		if !ok {
+			t.metrics.droppedMetadataTotal.Inc()
+			// Track dropped metadata in the same EWMA for sharding calc.
+			t.dataDropped.incr(1)
+			if _, ok := t.droppedSeries[m.Ref]; !ok {
+				level.Info(t.logger).Log("msg", "Dropped metadata for series that was not explicitly dropped via relabelling", "ref", m.Ref)
+			}
+			t.seriesMtx.Unlock()
+			continue
+		}
+		t.seriesMtx.Unlock()
+		// This will only loop if the queues are being resharded.
+		backoff := t.cfg.MinBackoff
+		for {
+			select {
+			case <-t.quit:
+				return false
+			default:
+			}
+			if t.shards.enqueue(m.Ref, timeSeries{
+				sType:        tMetadata,
+				seriesLabels: lbls, // TODO (@tpaschalis) We take the labels here so we can refer to the metric's name on populateTimeSeries. There's probably a better way to do that.
+				metadata: &metadata.Metadata{
+					Help: m.Help,
+					Unit: m.Unit,
+					Type: record.ToTextparseMetricType(m.Type),
+				},
+			}) {
+				continue outer
+			}
+
+			t.metrics.enqueueRetriesTotal.Inc()
+			time.Sleep(time.Duration(backoff))
+			backoff = backoff * 2
+			if backoff > t.cfg.MaxBackoff {
+				backoff = t.cfg.MaxBackoff
+			}
+		}
+	}
+
+	return true
+}
+
 // Start the queue manager sending samples to the remote storage.
 // Does not block.
 func (t *QueueManager) Start() {
@@ -1073,6 +1128,7 @@ type shards struct {
 	enqueuedSamples    atomic.Int64
 	enqueuedExemplars  atomic.Int64
 	enqueuedHistograms atomic.Int64
+	enqueuedMetadata   atomic.Int64
 
 	// Emulate a wait group with a channel and an atomic int, as you
 	// cannot select on a wait group.
@@ -1088,6 +1144,7 @@ type shards struct {
 	samplesDroppedOnHardShutdown    atomic.Uint32
 	exemplarsDroppedOnHardShutdown  atomic.Uint32
 	histogramsDroppedOnHardShutdown atomic.Uint32
+	metadataDroppedOnHardShutdown   atomic.Uint32
 }
 
 // start the shards; must be called before any call to enqueue.
@@ -1184,6 +1241,9 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data timeSeries) bool {
 		case tHistogram, tFloatHistogram:
 			s.qm.metrics.pendingHistograms.Inc()
 			s.enqueuedHistograms.Inc()
+		case tMetadata:
+			s.qm.metrics.pendingMetadata.Inc()
+			s.enqueuedMetadata.Inc()
 		}
 		return true
 	}
@@ -1207,6 +1267,7 @@ type timeSeries struct {
 	value          float64
 	histogram      *histogram.Histogram
 	floatHistogram *histogram.FloatHistogram
+	metadata       *metadata.Metadata
 	timestamp      int64
 	exemplarLabels labels.Labels
 	// The type of series: sample, exemplar, or histogram.
@@ -1220,6 +1281,7 @@ const (
 	tExemplar
 	tHistogram
 	tFloatHistogram
+	tMetadata
 )
 
 func newQueue(batchSize, capacity int) *queue {
@@ -1360,6 +1422,8 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 		}
 	}
 
+	pendingMetadata := make([]prompb.MetricMetadata, s.qm.mcfg.MaxSamplesPerSend)
+
 	timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
 	stop := func() {
 		if !timer.Stop() {
@@ -1379,25 +1443,28 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 			droppedSamples := int(s.enqueuedSamples.Load())
 			droppedExemplars := int(s.enqueuedExemplars.Load())
 			droppedHistograms := int(s.enqueuedHistograms.Load())
+			droppedMetadata := int(s.enqueuedMetadata.Load())
 			s.qm.metrics.pendingSamples.Sub(float64(droppedSamples))
 			s.qm.metrics.pendingExemplars.Sub(float64(droppedExemplars))
 			s.qm.metrics.pendingHistograms.Sub(float64(droppedHistograms))
 			s.qm.metrics.failedSamplesTotal.Add(float64(droppedSamples))
 			s.qm.metrics.failedExemplarsTotal.Add(float64(droppedExemplars))
 			s.qm.metrics.failedHistogramsTotal.Add(float64(droppedHistograms))
+			s.qm.metrics.failedMetadataTotal.Add(float64(droppedMetadata))
 			s.samplesDroppedOnHardShutdown.Add(uint32(droppedSamples))
 			s.exemplarsDroppedOnHardShutdown.Add(uint32(droppedExemplars))
 			s.histogramsDroppedOnHardShutdown.Add(uint32(droppedHistograms))
+			s.metadataDroppedOnHardShutdown.Add(uint32(droppedMetadata))
 			return
 
 		case batch, ok := <-batchQueue:
 			if !ok {
 				return
 			}
-			nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
+			nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata := s.populateTimeSeries(batch, pendingData, pendingMetadata)
 			queue.ReturnForReuse(batch)
 			n := nPendingSamples + nPendingExemplars + nPendingHistograms
-			s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
+			s.sendSamples(ctx, pendingData[:n], pendingMetadata[:nPendingMetadata], nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, pBuf, &buf)
 
 			stop()
 			timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
@@ -1405,11 +1472,15 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 		case <-timer.C:
 			batch := queue.Batch()
 			if len(batch) > 0 {
-				nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
+				nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata := s.populateTimeSeries(batch, pendingData, pendingMetadata)
 				n := nPendingSamples + nPendingExemplars + nPendingHistograms
-				level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars",
nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms) s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) + level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars", nPendingExemplars, "shard", shardNum) + s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) + level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars", nPendingExemplars, "shard", shardNum) + s.sendSamples(ctx, pendingData[:n], pendingMetadata[:nPendingMetadata], nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, pBuf, &buf) } queue.ReturnForReuse(batch) timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) @@ -1417,8 +1488,8 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { } } -func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries) (int, int, int) { - var nPendingSamples, nPendingExemplars, nPendingHistograms int +func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, pendingMetadata []prompb.MetricMetadata) (int, int, int, int) { + var nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata int for nPending, d := range batch { pendingData[nPending].Samples = pendingData[nPending].Samples[:0] if s.qm.sendExemplars { @@ -1452,19 +1523,26 @@ func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.Tim case tFloatHistogram: pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram)) nPendingHistograms++ + case tMetadata: + pendingMetadata[nPendingMetadata].MetricFamilyName = d.seriesLabels.Get(labels.MetricName) + pendingMetadata[nPendingMetadata].Type = metricTypeToMetricTypeProto(d.metadata.Type) + pendingMetadata[nPendingMetadata].Help = d.metadata.Help + pendingMetadata[nPendingMetadata].Unit = d.metadata.Unit + nPendingMetadata++ } } - return nPendingSamples, nPendingExemplars, nPendingHistograms + return nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata } -func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) { +func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte) { begin := time.Now() - err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, pBuf, buf) + err := s.sendSamplesWithBackoff(ctx, samples, metadata, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf) if err != nil { level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err) s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount)) s.qm.metrics.failedExemplarsTotal.Add(float64(exemplarCount)) s.qm.metrics.failedHistogramsTotal.Add(float64(histogramCount)) + s.qm.metrics.failedMetadataTotal.Add(float64(metadataCount)) } // These counters are used to calculate the dynamic sharding, and as such @@ -1477,15 +1555,20 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s s.qm.metrics.pendingSamples.Sub(float64(sampleCount)) 
 	s.qm.metrics.pendingExemplars.Sub(float64(exemplarCount))
 	s.qm.metrics.pendingHistograms.Sub(float64(histogramCount))
+	s.qm.metrics.pendingMetadata.Sub(float64(metadataCount))
 	s.enqueuedSamples.Sub(int64(sampleCount))
 	s.enqueuedExemplars.Sub(int64(exemplarCount))
 	s.enqueuedHistograms.Sub(int64(histogramCount))
+	s.enqueuedMetadata.Sub(int64(metadataCount))
 }
 
 // sendSamples to the remote storage with backoff for recoverable errors.
-func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error {
+// TODO(@tpaschalis) If we're going to reuse this method for metadata as well,
+// we need a better name.
+// TODO(@tpaschalis) Add metadata-specific metrics and attributes.
+func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte) error {
 	// Build the WriteRequest with no metadata.
-	req, highest, err := buildWriteRequest(samples, nil, pBuf, *buf)
+	req, highest, err := buildWriteRequest(samples, metadata, pBuf, *buf)
 	if err != nil {
 		// Failing to build the write request is non-recoverable, since it will
 		// only error if marshaling the proto to bytes fails.
@@ -1521,6 +1604,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
 		s.qm.metrics.samplesTotal.Add(float64(sampleCount))
 		s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
 		s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
+		s.qm.metrics.metadataTotal.Add(float64(metadataCount))
 
 		err := s.qm.client().Store(ctx, *buf)
 		s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
@@ -1536,6 +1620,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
 		s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
 		s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount))
 		s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount))
+		s.qm.metrics.retriedMetadataTotal.Add(float64(metadataCount))
 	}
 
 	err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.logger, attemptStore, onRetry)
diff --git a/tsdb/wlog/watcher.go b/tsdb/wlog/watcher.go
index b0c17dcbac..aa27eb8f01 100644
--- a/tsdb/wlog/watcher.go
+++ b/tsdb/wlog/watcher.go
@@ -51,6 +51,7 @@ type WriteTo interface {
 	AppendExemplars([]record.RefExemplar) bool
 	AppendHistograms([]record.RefHistogramSample) bool
 	AppendFloatHistograms([]record.RefFloatHistogramSample) bool
+	AppendWALMetadata([]record.RefMetadata) bool
 	StoreSeries([]record.RefSeries, int)
 
 	// Next two methods are intended for garbage-collection: first we call
@@ -488,6 +489,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
 		histogramsToSend      []record.RefHistogramSample
 		floatHistograms       []record.RefFloatHistogramSample
 		floatHistogramsToSend []record.RefFloatHistogramSample
+		metadata              []record.RefMetadata
 	)
 	for r.Next() && !isClosed(w.quit) {
 		rec := r.Record()
@@ -599,6 +601,14 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
 				w.writer.AppendFloatHistograms(floatHistogramsToSend)
 				floatHistogramsToSend = floatHistogramsToSend[:0]
 			}
+
+		case record.Metadata:
+			meta, err := dec.Metadata(rec, metadata[:0])
+			if err != nil {
+				w.recordDecodeFailsMetric.Inc()
+				return err
+			}
+			w.writer.AppendWALMetadata(meta)
 		case record.Tombstones:
 
 		default:
diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go
index 530d0ffb4a..7e1fb1cf7a 100644
--- a/tsdb/wlog/watcher_test.go
+++ b/tsdb/wlog/watcher_test.go
@@ -56,6 +56,7 @@ type writeToMock struct {
 	exemplarsAppended       int
 	histogramsAppended      int
 	floatHistogramsAppended int
+	metadataAppended        int
 	seriesLock              sync.Mutex
 	seriesSegmentIndexes    map[chunks.HeadSeriesRef]int
 }
@@ -80,6 +81,11 @@ func (wtm *writeToMock) AppendFloatHistograms(fh []record.RefFloatHistogramSampl
 	return true
 }
 
+func (wtm *writeToMock) AppendWALMetadata(m []record.RefMetadata) bool {
+	wtm.metadataAppended += len(m)
+	return true
+}
+
 func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) {
 	wtm.UpdateSeriesSegment(series, index)
 }
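
For reference, here is an illustrative remote_write snippet showing how the new flag could be enabled, assuming send_from_wal ends up under the existing metadata_config block as the YAML struct tag above suggests. The endpoint and the other values are placeholders for this sketch, not defaults introduced by this change:

remote_write:
  - url: "https://remote-storage.example.org/api/v1/write"   # hypothetical endpoint
    metadata_config:
      # Existing metadata settings used by the MetadataWatcher-based path.
      send: true
      send_interval: 1m
      max_samples_per_send: 500
      # New flag from this change: also ship metadata records read from the WAL.
      send_from_wal: true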