Approach bundling metadata along with samples and exemplars

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>
Paschalis Tsilias 2022-11-28 15:02:52 +02:00
parent 7cd9f8a340
commit ac83b3aaa9
4 changed files with 115 additions and 10 deletions

View file

@@ -965,6 +965,10 @@ type MetadataConfig struct {
	SendInterval model.Duration `yaml:"send_interval"`
	// Maximum number of samples per send.
	MaxSamplesPerSend int `yaml:"max_samples_per_send,omitempty"`
	// SendFromWAL controls whether we send metadata from the WAL.
	// TODO (@tpaschalis) Maybe this should also be the feature flag that
	// disables the current MetadataWatcher?
	SendFromWAL bool `yaml:"send_from_wal,omitempty"`
}

// RemoteReadConfig is the configuration for reading from remote storage.
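For illustration only, here is a minimal sketch of how the new field could be read from YAML. This snippet is not part of the commit: the struct is a simplified stand-in (SendInterval is really a model.Duration), and it assumes the gopkg.in/yaml.v2 package that Prometheus uses for configuration parsing.

package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// metadataConfig is a simplified stand-in for the MetadataConfig struct
// above, with the same yaml tags but plain types.
type metadataConfig struct {
	Send              bool   `yaml:"send"`
	SendInterval      string `yaml:"send_interval"`
	MaxSamplesPerSend int    `yaml:"max_samples_per_send,omitempty"`
	SendFromWAL       bool   `yaml:"send_from_wal,omitempty"`
}

func main() {
	raw := []byte("send: true\nsend_interval: 1m\nmax_samples_per_send: 500\nsend_from_wal: true\n")
	var cfg metadataConfig
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("send metadata from WAL: %v\n", cfg.SendFromWAL)
}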

View file

@@ -34,6 +34,7 @@ import (
	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/model/histogram"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/metadata"
	"github.com/prometheus/prometheus/model/relabel"
	"github.com/prometheus/prometheus/prompb"
	"github.com/prometheus/prometheus/scrape"
@@ -70,12 +71,14 @@ type queueManagerMetrics struct {
	droppedSamplesTotal prometheus.Counter
	droppedExemplarsTotal prometheus.Counter
	droppedHistogramsTotal prometheus.Counter
	droppedMetadataTotal prometheus.Counter
	enqueueRetriesTotal prometheus.Counter
	sentBatchDuration prometheus.Histogram
	highestSentTimestamp *maxTimestamp
	pendingSamples prometheus.Gauge
	pendingExemplars prometheus.Gauge
	pendingHistograms prometheus.Gauge
	pendingMetadata prometheus.Gauge
	shardCapacity prometheus.Gauge
	numShards prometheus.Gauge
	maxNumShards prometheus.Gauge
@@ -763,6 +766,58 @@ outer:
	return true
}
func (t *QueueManager) AppendWALMetadata(ms []record.RefMetadata) bool {
	if !t.mcfg.SendFromWAL {
		return true
	}

outer:
	for _, m := range ms {
		t.seriesMtx.Lock()
		lbls, ok := t.seriesLabels[m.Ref]
		if !ok {
			t.metrics.droppedMetadataTotal.Inc()
			// Track dropped metadata in the same EWMA for sharding calc.
			t.dataDropped.incr(1)
			if _, ok := t.droppedSeries[m.Ref]; !ok {
				level.Info(t.logger).Log("msg", "Dropped metadata for series that was not explicitly dropped via relabelling", "ref", m.Ref)
			}
			t.seriesMtx.Unlock()
			continue
		}
		t.seriesMtx.Unlock()

		// This will only loop if the queues are being resharded.
		backoff := t.cfg.MinBackoff
		for {
			select {
			case <-t.quit:
				return false
			default:
			}

			if t.shards.enqueue(m.Ref, timeSeries{
				sType:        tMetadata,
				seriesLabels: lbls, // TODO (@tpaschalis) We take the labels here so we can refer to the metric's name in populateTimeSeries. There's probably a better way to do that.
				metadata: &metadata.Metadata{
					Help: m.Help,
					Unit: m.Unit,
					Type: record.ToTextparseMetricType(m.Type),
				},
			}) {
				continue outer
			}

			t.metrics.enqueueRetriesTotal.Inc()
			time.Sleep(time.Duration(backoff))
			backoff = backoff * 2
			if backoff > t.cfg.MaxBackoff {
				backoff = t.cfg.MaxBackoff
			}
		}
	}
	return true
}
// Start the queue manager sending samples to the remote storage.
// Does not block.
func (t *QueueManager) Start() {
@@ -1073,6 +1128,7 @@ type shards struct {
	enqueuedSamples atomic.Int64
	enqueuedExemplars atomic.Int64
	enqueuedHistograms atomic.Int64
	enqueuedMetadata atomic.Int64

	// Emulate a wait group with a channel and an atomic int, as you
	// cannot select on a wait group.

@@ -1088,6 +1144,7 @@ type shards struct {
	samplesDroppedOnHardShutdown atomic.Uint32
	exemplarsDroppedOnHardShutdown atomic.Uint32
	histogramsDroppedOnHardShutdown atomic.Uint32
	metadataDroppedOnHardShutdown atomic.Uint32
}
// start the shards; must be called before any call to enqueue.

@@ -1184,6 +1241,9 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data timeSeries) bool {
	case tHistogram, tFloatHistogram:
		s.qm.metrics.pendingHistograms.Inc()
		s.enqueuedHistograms.Inc()
	case tMetadata:
		s.qm.metrics.pendingMetadata.Inc()
		s.enqueuedMetadata.Inc()
	}
	return true
}
@@ -1207,6 +1267,7 @@ type timeSeries struct {
	value float64
	histogram *histogram.Histogram
	floatHistogram *histogram.FloatHistogram
	metadata *metadata.Metadata
	timestamp int64
	exemplarLabels labels.Labels
	// The type of series: sample, exemplar, histogram, or metadata.

@@ -1220,6 +1281,7 @@ const (
	tExemplar
	tHistogram
	tFloatHistogram
	tMetadata
)
func newQueue(batchSize, capacity int) *queue {

@@ -1360,6 +1422,8 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
		}
	}

	pendingMetadata := make([]prompb.MetricMetadata, s.qm.mcfg.MaxSamplesPerSend)

	timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
	stop := func() {
		if !timer.Stop() {

@@ -1379,25 +1443,28 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
			droppedSamples := int(s.enqueuedSamples.Load())
			droppedExemplars := int(s.enqueuedExemplars.Load())
			droppedHistograms := int(s.enqueuedHistograms.Load())
			droppedMetadata := int(s.enqueuedMetadata.Load())
			s.qm.metrics.pendingSamples.Sub(float64(droppedSamples))
			s.qm.metrics.pendingExemplars.Sub(float64(droppedExemplars))
			s.qm.metrics.pendingHistograms.Sub(float64(droppedHistograms))
			s.qm.metrics.failedSamplesTotal.Add(float64(droppedSamples))
			s.qm.metrics.failedExemplarsTotal.Add(float64(droppedExemplars))
			s.qm.metrics.failedHistogramsTotal.Add(float64(droppedHistograms))
			s.qm.metrics.failedMetadataTotal.Add(float64(droppedMetadata))
			s.samplesDroppedOnHardShutdown.Add(uint32(droppedSamples))
			s.exemplarsDroppedOnHardShutdown.Add(uint32(droppedExemplars))
			s.histogramsDroppedOnHardShutdown.Add(uint32(droppedHistograms))
			s.metadataDroppedOnHardShutdown.Add(uint32(droppedMetadata))
			return

		case batch, ok := <-batchQueue:
			if !ok {
				return
			}
			nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata := s.populateTimeSeries(batch, pendingData, pendingMetadata)
			queue.ReturnForReuse(batch)
			n := nPendingSamples + nPendingExemplars + nPendingHistograms
			s.sendSamples(ctx, pendingData[:n], pendingMetadata[:nPendingMetadata], nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, pBuf, &buf)

			stop()
			timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))

@@ -1405,11 +1472,15 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
		case <-timer.C:
			batch := queue.Batch()
			if len(batch) > 0 {
				nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata := s.populateTimeSeries(batch, pendingData, pendingMetadata)
				n := nPendingSamples + nPendingExemplars + nPendingHistograms
				level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
					"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
				s.sendSamples(ctx, pendingData[:n], pendingMetadata[:nPendingMetadata], nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, pBuf, &buf)
			}
			queue.ReturnForReuse(batch)
			timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))

@@ -1417,8 +1488,8 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
		}
	}
func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, pendingMetadata []prompb.MetricMetadata) (int, int, int, int) {
	var nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata int
	for nPending, d := range batch {
		pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
		if s.qm.sendExemplars {

@@ -1452,19 +1523,26 @@ func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.Tim
		case tFloatHistogram:
			pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram))
			nPendingHistograms++
		case tMetadata:
			pendingMetadata[nPendingMetadata].MetricFamilyName = d.seriesLabels.Get(labels.MetricName)
			pendingMetadata[nPendingMetadata].Type = metricTypeToMetricTypeProto(d.metadata.Type)
			pendingMetadata[nPendingMetadata].Help = d.metadata.Help
			pendingMetadata[nPendingMetadata].Unit = d.metadata.Unit
			nPendingMetadata++
		}
	}
	return nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata
}
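The tMetadata branch above copies four fields into the pending slot, keyed by the series' __name__ label. As a hypothetical refactor (not part of the commit), the same mapping could live in a small helper:

// fillMetadata copies a series' WAL metadata into a prompb.MetricMetadata
// slot, using the __name__ label as the metric family name.
func fillMetadata(dst *prompb.MetricMetadata, seriesLabels labels.Labels, md *metadata.Metadata) {
	dst.MetricFamilyName = seriesLabels.Get(labels.MetricName)
	dst.Type = metricTypeToMetricTypeProto(md.Type)
	dst.Help = md.Help
	dst.Unit = md.Unit
}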
func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte) {
	begin := time.Now()
	err := s.sendSamplesWithBackoff(ctx, samples, metadata, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf)
	if err != nil {
		level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err)
		s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount))
		s.qm.metrics.failedExemplarsTotal.Add(float64(exemplarCount))
		s.qm.metrics.failedHistogramsTotal.Add(float64(histogramCount))
		s.qm.metrics.failedMetadataTotal.Add(float64(metadataCount))
	}

	// These counters are used to calculate the dynamic sharding, and as such

@@ -1477,15 +1555,20 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s
	s.qm.metrics.pendingSamples.Sub(float64(sampleCount))
	s.qm.metrics.pendingExemplars.Sub(float64(exemplarCount))
	s.qm.metrics.pendingHistograms.Sub(float64(histogramCount))
	s.qm.metrics.pendingMetadata.Sub(float64(metadataCount))
	s.enqueuedSamples.Sub(int64(sampleCount))
	s.enqueuedExemplars.Sub(int64(exemplarCount))
	s.enqueuedHistograms.Sub(int64(histogramCount))
	s.enqueuedMetadata.Sub(int64(metadataCount))
}
// sendSamples to the remote storage with backoff for recoverable errors.
// TODO(@tpaschalis) If we're going to reuse this method for metadata as well,
// we need a better name.
// TODO(@tpaschalis) Add metadata-specific metrics and attributes.
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte) error {
	// Build the WriteRequest, bundling the metadata along with the samples.
	req, highest, err := buildWriteRequest(samples, metadata, pBuf, *buf)
	if err != nil {
		// Failing to build the write request is non-recoverable, since it will
		// only error if marshaling the proto to bytes fails.
@@ -1521,6 +1604,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
		s.qm.metrics.samplesTotal.Add(float64(sampleCount))
		s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
		s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
		s.qm.metrics.metadataTotal.Add(float64(metadataCount))
		err := s.qm.client().Store(ctx, *buf)
		s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
@@ -1536,6 +1620,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
		s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
		s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount))
		s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount))
		s.qm.metrics.retriedMetadataTotal.Add(float64(metadataCount))
	}

	err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.logger, attemptStore, onRetry)
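Since buildWriteRequest now receives the metadata slice instead of nil, the request assembly amounts to bundling both slices into a single prompb.WriteRequest before marshaling and compressing. A rough sketch of that step, assuming gogo/protobuf's proto.Buffer and golang/snappy as used elsewhere in remote write (the sketch's function name is hypothetical):

// buildWriteRequestSketch bundles samples and metadata into one write
// request, marshals it into pBuf, and snappy-compresses it into buf.
func buildWriteRequestSketch(samples []prompb.TimeSeries, meta []prompb.MetricMetadata, pBuf *proto.Buffer, buf []byte) ([]byte, error) {
	req := &prompb.WriteRequest{
		Timeseries: samples,
		Metadata:   meta,
	}
	pBuf.Reset()
	if err := pBuf.Marshal(req); err != nil {
		return nil, err
	}
	return snappy.Encode(buf, pBuf.Bytes()), nil
}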

View file

@@ -51,6 +51,7 @@ type WriteTo interface {
	AppendExemplars([]record.RefExemplar) bool
	AppendHistograms([]record.RefHistogramSample) bool
	AppendFloatHistograms([]record.RefFloatHistogramSample) bool
	AppendWALMetadata([]record.RefMetadata) bool
	StoreSeries([]record.RefSeries, int)

	// Next two methods are intended for garbage-collection: first we call

@@ -488,6 +489,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
		histogramsToSend []record.RefHistogramSample
		floatHistograms []record.RefFloatHistogramSample
		floatHistogramsToSend []record.RefFloatHistogramSample
		metadata []record.RefMetadata
	)

	for r.Next() && !isClosed(w.quit) {
		rec := r.Record()

@@ -599,6 +601,14 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
				w.writer.AppendFloatHistograms(floatHistogramsToSend)
				floatHistogramsToSend = floatHistogramsToSend[:0]
			}
		case record.Metadata:
			meta, err := dec.Metadata(rec, metadata[:0])
			if err != nil {
				w.recordDecodeFailsMetric.Inc()
				return err
			}
			w.writer.AppendWALMetadata(meta)
		case record.Tombstones:
		default:
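For reference, the metadata records the watcher decodes here are the ones the head writes through the tsdb/record package. A small round-trip sketch of that encoding, with error handling elided; note that zero-value Encoder/Decoder construction is an assumption and may differ between Prometheus versions:

// Encode a metadata record as the WAL would store it, then decode it back
// the same way the watcher does above.
var enc record.Encoder
rec := enc.Metadata([]record.RefMetadata{
	{Ref: 1, Unit: "bytes", Help: "Bytes sent."},
}, nil)

var dec record.Decoder
metas, err := dec.Metadata(rec, nil)
if err == nil {
	fmt.Println(metas[0].Unit, metas[0].Help)
}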

View file

@@ -56,6 +56,7 @@ type writeToMock struct {
	exemplarsAppended int
	histogramsAppended int
	floatHistogramsAppended int
	metadataAppended int
	seriesLock sync.Mutex
	seriesSegmentIndexes map[chunks.HeadSeriesRef]int
}

@@ -80,6 +81,11 @@ func (wtm *writeToMock) AppendFloatHistograms(fh []record.RefFloatHistogramSampl
	return true
}

func (wtm *writeToMock) AppendWALMetadata(m []record.RefMetadata) bool {
	wtm.metadataAppended += len(m)
	return true
}

func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) {
	wtm.UpdateSeriesSegment(series, index)
}
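A hypothetical test fragment showing how the new mock method could be exercised directly. The assertion style follows the existing watcher tests (testify's require), but this exact test is not part of the commit:

func TestAppendWALMetadataCounts(t *testing.T) {
	// Construct the mock directly; only the map field needs initializing.
	wtm := &writeToMock{seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int)}
	ok := wtm.AppendWALMetadata([]record.RefMetadata{
		{Ref: 1, Unit: "seconds", Help: "Request duration."},
		{Ref: 2, Unit: "bytes", Help: "Response size."},
	})
	require.True(t, ok)
	require.Equal(t, 2, wtm.metadataAppended)
}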