Drop old inmemory samples (#13002)

* Drop old inmemory samples

Co-authored-by: Paschalis Tsilias <paschalis.tsilias@grafana.com>
Signed-off-by: Paschalis Tsilias <paschalis.tsilias@grafana.com>
Signed-off-by: Marc Tuduri <marctc@protonmail.com>

* Avoid copying timeseries when the feature is disabled

Signed-off-by: Paschalis Tsilias <paschalis.tsilias@grafana.com>
Signed-off-by: Marc Tuduri <marctc@protonmail.com>

* Run gofmt

Signed-off-by: Paschalis Tsilias <paschalis.tsilias@grafana.com>
Signed-off-by: Marc Tuduri <marctc@protonmail.com>

* Clarify docs

Signed-off-by: Marc Tuduri <marctc@protonmail.com>

* Add more logging info

Signed-off-by: Marc Tuduri <marctc@protonmail.com>

* Remove loggers

Signed-off-by: Marc Tuduri <marctc@protonmail.com>

* optimize function and add tests

Signed-off-by: Marc Tuduri <marctc@protonmail.com>

* Simplify filter

Signed-off-by: Marc Tuduri <marctc@protonmail.com>

* rename var

Signed-off-by: Marc Tuduri <marctc@protonmail.com>

* Update help info from metrics

Signed-off-by: Marc Tuduri <marctc@protonmail.com>

* use metrics to keep track of drop elements during buildWriteRequest

Signed-off-by: Marc Tuduri <marctc@protonmail.com>

* rename var in tests

Signed-off-by: Marc Tuduri <marctc@protonmail.com>

* pass time.Now as parameter

Signed-off-by: Marc Tuduri <marctc@protonmail.com>

* Change buildwriterequest during retries

Signed-off-by: Marc Tuduri <marctc@protonmail.com>

* Revert "Remove loggers"

This reverts commit 54f91dfcae20488944162335ab4ad8be459df1ab.

Signed-off-by: Marc Tuduri <marctc@protonmail.com>

* use log level debug for loggers

Signed-off-by: Marc Tuduri <marctc@protonmail.com>

* Fix linter

Signed-off-by: Paschalis Tsilias <paschalis.tsilias@grafana.com>

* Remove noisy debug-level logs; add 'reason' label to drop metrics

Signed-off-by: Paschalis Tsilias <paschalis.tsilias@grafana.com>

* Remove accidentally committed files

Signed-off-by: Paschalis Tsilias <paschalis.tsilias@grafana.com>

* Propagate logger to buildWriteRequest to log dropped data

Signed-off-by: Paschalis Tsilias <paschalis.tsilias@grafana.com>

* Fix docs comment

Signed-off-by: Paschalis Tsilias <paschalis.tsilias@grafana.com>

* Make drop reason more specific

Signed-off-by: Paschalis Tsilias <paschalis.tsilias@grafana.com>

* Remove unnecessary pass of logger

Signed-off-by: Paschalis Tsilias <paschalis.tsilias@grafana.com>

* Use snake_case for reason label

Signed-off-by: Paschalis Tsilias <paschalis.tsilias@grafana.com>

* Fix dropped samples metric

Signed-off-by: Paschalis Tsilias <paschalis.tsilias@grafana.com>

---------

Signed-off-by: Paschalis Tsilias <paschalis.tsilias@grafana.com>
Signed-off-by: Marc Tuduri <marctc@protonmail.com>
Signed-off-by: Paschalis Tsilias <tpaschalis@users.noreply.github.com>
Co-authored-by: Paschalis Tsilias <paschalis.tsilias@grafana.com>
Co-authored-by: Paschalis Tsilias <tpaschalis@users.noreply.github.com>
This commit is contained in:
Marc Tudurí 2024-01-05 19:40:30 +01:00 committed by GitHub
parent 6d2e0a7424
commit 78c5ce3196
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 449 additions and 46 deletions

View file

@ -1124,6 +1124,9 @@ type QueueConfig struct {
MinBackoff model.Duration `yaml:"min_backoff,omitempty"` MinBackoff model.Duration `yaml:"min_backoff,omitempty"`
MaxBackoff model.Duration `yaml:"max_backoff,omitempty"` MaxBackoff model.Duration `yaml:"max_backoff,omitempty"`
RetryOnRateLimit bool `yaml:"retry_on_http_429,omitempty"` RetryOnRateLimit bool `yaml:"retry_on_http_429,omitempty"`
// Samples older than the limit will be dropped.
SampleAgeLimit model.Duration `yaml:"sample_age_limit,omitempty"`
} }
// MetadataConfig is the configuration for sending metadata to remote // MetadataConfig is the configuration for sending metadata to remote

View file

@ -3619,6 +3619,10 @@ queue_config:
# Retry upon receiving a 429 status code from the remote-write storage. # Retry upon receiving a 429 status code from the remote-write storage.
# This is experimental and might change in the future. # This is experimental and might change in the future.
[ retry_on_http_429: <boolean> | default = false ] [ retry_on_http_429: <boolean> | default = false ]
# If set, any sample that is older than sample_age_limit
# will not be sent to the remote storage. The default value is 0s,
# which means that all samples are sent.
[ sample_age_limit: <duration> | default = 0s ]
# Configures the sending of series metadata to remote storage. # Configures the sending of series metadata to remote storage.
# Metadata configuration is subject to change at any point # Metadata configuration is subject to change at any point

View file

@ -517,7 +517,7 @@ func TestMetricTypeToMetricTypeProto(t *testing.T) {
} }
func TestDecodeWriteRequest(t *testing.T) { func TestDecodeWriteRequest(t *testing.T) {
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil) buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil)
require.NoError(t, err) require.NoError(t, err)
actual, err := DecodeWriteRequest(bytes.NewReader(buf)) actual, err := DecodeWriteRequest(bytes.NewReader(buf))

View file

@ -36,6 +36,7 @@ import (
"github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/chunks"
@ -51,6 +52,10 @@ const (
// Allow 30% too many shards before scaling down. // Allow 30% too many shards before scaling down.
shardToleranceFraction = 0.3 shardToleranceFraction = 0.3
reasonTooOld = "too_old"
reasonDroppedSeries = "dropped_series"
reasonUnintentionalDroppedSeries = "unintentionally_dropped_series"
) )
type queueManagerMetrics struct { type queueManagerMetrics struct {
@ -68,9 +73,9 @@ type queueManagerMetrics struct {
retriedExemplarsTotal prometheus.Counter retriedExemplarsTotal prometheus.Counter
retriedHistogramsTotal prometheus.Counter retriedHistogramsTotal prometheus.Counter
retriedMetadataTotal prometheus.Counter retriedMetadataTotal prometheus.Counter
droppedSamplesTotal prometheus.Counter droppedSamplesTotal *prometheus.CounterVec
droppedExemplarsTotal prometheus.Counter droppedExemplarsTotal *prometheus.CounterVec
droppedHistogramsTotal prometheus.Counter droppedHistogramsTotal *prometheus.CounterVec
enqueueRetriesTotal prometheus.Counter enqueueRetriesTotal prometheus.Counter
sentBatchDuration prometheus.Histogram sentBatchDuration prometheus.Histogram
highestSentTimestamp *maxTimestamp highestSentTimestamp *maxTimestamp
@ -180,27 +185,27 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
Help: "Total number of metadata entries which failed on send to remote storage but were retried because the send error was recoverable.", Help: "Total number of metadata entries which failed on send to remote storage but were retried because the send error was recoverable.",
ConstLabels: constLabels, ConstLabels: constLabels,
}) })
m.droppedSamplesTotal = prometheus.NewCounter(prometheus.CounterOpts{ m.droppedSamplesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "samples_dropped_total", Name: "samples_dropped_total",
Help: "Total number of samples which were dropped after being read from the WAL before being sent via remote write, either via relabelling or unintentionally because of an unknown reference ID.", Help: "Total number of samples which were dropped after being read from the WAL before being sent via remote write, either via relabelling, due to being too old or unintentionally because of an unknown reference ID.",
ConstLabels: constLabels, ConstLabels: constLabels,
}) }, []string{"reason"})
m.droppedExemplarsTotal = prometheus.NewCounter(prometheus.CounterOpts{ m.droppedExemplarsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "exemplars_dropped_total", Name: "exemplars_dropped_total",
Help: "Total number of exemplars which were dropped after being read from the WAL before being sent via remote write, either via relabelling or unintentionally because of an unknown reference ID.", Help: "Total number of exemplars which were dropped after being read from the WAL before being sent via remote write, either via relabelling, due to being too old or unintentionally because of an unknown reference ID.",
ConstLabels: constLabels, ConstLabels: constLabels,
}) }, []string{"reason"})
m.droppedHistogramsTotal = prometheus.NewCounter(prometheus.CounterOpts{ m.droppedHistogramsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "histograms_dropped_total", Name: "histograms_dropped_total",
Help: "Total number of histograms which were dropped after being read from the WAL before being sent via remote write, either via relabelling or unintentionally because of an unknown reference ID.", Help: "Total number of histograms which were dropped after being read from the WAL before being sent via remote write, either via relabelling, due to being too old or unintentionally because of an unknown reference ID.",
ConstLabels: constLabels, ConstLabels: constLabels,
}) }, []string{"reason"})
m.enqueueRetriesTotal = prometheus.NewCounter(prometheus.CounterOpts{ m.enqueueRetriesTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem,
@ -391,7 +396,8 @@ type WriteClient interface {
// indicated by the provided WriteClient. Implements writeTo interface // indicated by the provided WriteClient. Implements writeTo interface
// used by WAL Watcher. // used by WAL Watcher.
type QueueManager struct { type QueueManager struct {
lastSendTimestamp atomic.Int64 lastSendTimestamp atomic.Int64
buildRequestLimitTimestamp atomic.Int64
logger log.Logger logger log.Logger
flushDeadline time.Duration flushDeadline time.Duration
@ -529,7 +535,7 @@ func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.Met
func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer) error { func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer) error {
// Build the WriteRequest with no samples. // Build the WriteRequest with no samples.
req, _, err := buildWriteRequest(nil, metadata, pBuf, nil) req, _, _, err := buildWriteRequest(t.logger, nil, metadata, pBuf, nil, nil)
if err != nil { if err != nil {
return err return err
} }
@ -575,18 +581,65 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p
return nil return nil
} }
func isSampleOld(baseTime time.Time, sampleAgeLimit time.Duration, ts int64) bool {
if sampleAgeLimit == 0 {
// If sampleAgeLimit is unset, then we never skip samples due to their age.
return false
}
limitTs := baseTime.Add(-sampleAgeLimit)
sampleTs := timestamp.Time(ts)
return sampleTs.Before(limitTs)
}
func isTimeSeriesOldFilter(metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) func(ts prompb.TimeSeries) bool {
return func(ts prompb.TimeSeries) bool {
if sampleAgeLimit == 0 {
// If sampleAgeLimit is unset, then we never skip samples due to their age.
return false
}
switch {
// Only the first element should be set in the series, therefore we only check the first element.
case len(ts.Samples) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Samples[0].Timestamp) {
metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
case len(ts.Histograms) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Histograms[0].Timestamp) {
metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
case len(ts.Exemplars) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Exemplars[0].Timestamp) {
metrics.droppedExemplarsTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
default:
return false
}
return false
}
}
// Append queues a sample to be sent to the remote storage. Blocks until all samples are // Append queues a sample to be sent to the remote storage. Blocks until all samples are
// enqueued on their shards or a shutdown signal is received. // enqueued on their shards or a shutdown signal is received.
func (t *QueueManager) Append(samples []record.RefSample) bool { func (t *QueueManager) Append(samples []record.RefSample) bool {
currentTime := time.Now()
outer: outer:
for _, s := range samples { for _, s := range samples {
if isSampleOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), s.T) {
t.metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc()
continue
}
t.seriesMtx.Lock() t.seriesMtx.Lock()
lbls, ok := t.seriesLabels[s.Ref] lbls, ok := t.seriesLabels[s.Ref]
if !ok { if !ok {
t.metrics.droppedSamplesTotal.Inc()
t.dataDropped.incr(1) t.dataDropped.incr(1)
if _, ok := t.droppedSeries[s.Ref]; !ok { if _, ok := t.droppedSeries[s.Ref]; !ok {
level.Info(t.logger).Log("msg", "Dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref) level.Info(t.logger).Log("msg", "Dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref)
t.metrics.droppedSamplesTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc()
} else {
t.metrics.droppedSamplesTotal.WithLabelValues(reasonDroppedSeries).Inc()
} }
t.seriesMtx.Unlock() t.seriesMtx.Unlock()
continue continue
@ -629,17 +682,23 @@ func (t *QueueManager) AppendExemplars(exemplars []record.RefExemplar) bool {
if !t.sendExemplars { if !t.sendExemplars {
return true return true
} }
currentTime := time.Now()
outer: outer:
for _, e := range exemplars { for _, e := range exemplars {
if isSampleOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), e.T) {
t.metrics.droppedExemplarsTotal.WithLabelValues(reasonTooOld).Inc()
continue
}
t.seriesMtx.Lock() t.seriesMtx.Lock()
lbls, ok := t.seriesLabels[e.Ref] lbls, ok := t.seriesLabels[e.Ref]
if !ok { if !ok {
t.metrics.droppedExemplarsTotal.Inc()
// Track dropped exemplars in the same EWMA for sharding calc. // Track dropped exemplars in the same EWMA for sharding calc.
t.dataDropped.incr(1) t.dataDropped.incr(1)
if _, ok := t.droppedSeries[e.Ref]; !ok { if _, ok := t.droppedSeries[e.Ref]; !ok {
level.Info(t.logger).Log("msg", "Dropped exemplar for series that was not explicitly dropped via relabelling", "ref", e.Ref) level.Info(t.logger).Log("msg", "Dropped exemplar for series that was not explicitly dropped via relabelling", "ref", e.Ref)
t.metrics.droppedExemplarsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc()
} else {
t.metrics.droppedExemplarsTotal.WithLabelValues(reasonDroppedSeries).Inc()
} }
t.seriesMtx.Unlock() t.seriesMtx.Unlock()
continue continue
@ -678,16 +737,22 @@ func (t *QueueManager) AppendHistograms(histograms []record.RefHistogramSample)
if !t.sendNativeHistograms { if !t.sendNativeHistograms {
return true return true
} }
currentTime := time.Now()
outer: outer:
for _, h := range histograms { for _, h := range histograms {
if isSampleOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), h.T) {
t.metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc()
continue
}
t.seriesMtx.Lock() t.seriesMtx.Lock()
lbls, ok := t.seriesLabels[h.Ref] lbls, ok := t.seriesLabels[h.Ref]
if !ok { if !ok {
t.metrics.droppedHistogramsTotal.Inc()
t.dataDropped.incr(1) t.dataDropped.incr(1)
if _, ok := t.droppedSeries[h.Ref]; !ok { if _, ok := t.droppedSeries[h.Ref]; !ok {
level.Info(t.logger).Log("msg", "Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref) level.Info(t.logger).Log("msg", "Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref)
t.metrics.droppedHistogramsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc()
} else {
t.metrics.droppedHistogramsTotal.WithLabelValues(reasonDroppedSeries).Inc()
} }
t.seriesMtx.Unlock() t.seriesMtx.Unlock()
continue continue
@ -725,16 +790,22 @@ func (t *QueueManager) AppendFloatHistograms(floatHistograms []record.RefFloatHi
if !t.sendNativeHistograms { if !t.sendNativeHistograms {
return true return true
} }
currentTime := time.Now()
outer: outer:
for _, h := range floatHistograms { for _, h := range floatHistograms {
if isSampleOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), h.T) {
t.metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc()
continue
}
t.seriesMtx.Lock() t.seriesMtx.Lock()
lbls, ok := t.seriesLabels[h.Ref] lbls, ok := t.seriesLabels[h.Ref]
if !ok { if !ok {
t.metrics.droppedHistogramsTotal.Inc()
t.dataDropped.incr(1) t.dataDropped.incr(1)
if _, ok := t.droppedSeries[h.Ref]; !ok { if _, ok := t.droppedSeries[h.Ref]; !ok {
level.Info(t.logger).Log("msg", "Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref) level.Info(t.logger).Log("msg", "Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref)
t.metrics.droppedHistogramsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc()
} else {
t.metrics.droppedHistogramsTotal.WithLabelValues(reasonDroppedSeries).Inc()
} }
t.seriesMtx.Unlock() t.seriesMtx.Unlock()
continue continue
@ -1490,7 +1561,8 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s
// sendSamples to the remote storage with backoff for recoverable errors. // sendSamples to the remote storage with backoff for recoverable errors.
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error { func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error {
// Build the WriteRequest with no metadata. // Build the WriteRequest with no metadata.
req, highest, err := buildWriteRequest(samples, nil, pBuf, *buf) req, highest, lowest, err := buildWriteRequest(s.qm.logger, samples, nil, pBuf, *buf, nil)
s.qm.buildRequestLimitTimestamp.Store(lowest)
if err != nil { if err != nil {
// Failing to build the write request is non-recoverable, since it will // Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails. // only error if marshaling the proto to bytes fails.
@ -1504,6 +1576,25 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
// without causing a memory leak, and it has the nice effect of not propagating any // without causing a memory leak, and it has the nice effect of not propagating any
// parameters for sendSamplesWithBackoff/3. // parameters for sendSamplesWithBackoff/3.
attemptStore := func(try int) error { attemptStore := func(try int) error {
currentTime := time.Now()
lowest := s.qm.buildRequestLimitTimestamp.Load()
if isSampleOld(currentTime, time.Duration(s.qm.cfg.SampleAgeLimit), lowest) {
// This will filter out old samples during retries.
req, _, lowest, err := buildWriteRequest(
s.qm.logger,
samples,
nil,
pBuf,
*buf,
isTimeSeriesOldFilter(s.qm.metrics, currentTime, time.Duration(s.qm.cfg.SampleAgeLimit)),
)
s.qm.buildRequestLimitTimestamp.Store(lowest)
if err != nil {
return err
}
*buf = req
}
ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch") ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch")
defer span.End() defer span.End()
@ -1608,9 +1699,27 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l
} }
} }
func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf []byte) ([]byte, int64, error) { func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeries) bool) (int64, int64, []prompb.TimeSeries, int, int, int) {
var highest int64 var highest int64
for _, ts := range samples { var lowest int64
var droppedSamples, droppedExemplars, droppedHistograms int
keepIdx := 0
lowest = math.MaxInt64
for i, ts := range timeSeries {
if filter != nil && filter(ts) {
if len(ts.Samples) > 0 {
droppedSamples++
}
if len(ts.Exemplars) > 0 {
droppedExemplars++
}
if len(ts.Histograms) > 0 {
droppedHistograms++
}
continue
}
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it. // At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest { if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
highest = ts.Samples[0].Timestamp highest = ts.Samples[0].Timestamp
@ -1621,10 +1730,37 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest { if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
highest = ts.Histograms[0].Timestamp highest = ts.Histograms[0].Timestamp
} }
// Get lowest timestamp
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp < lowest {
lowest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp < lowest {
lowest = ts.Exemplars[0].Timestamp
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp < lowest {
lowest = ts.Histograms[0].Timestamp
}
// Move the current element to the write position and increment the write pointer
timeSeries[keepIdx] = timeSeries[i]
keepIdx++
}
timeSeries = timeSeries[:keepIdx]
return highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms
}
func buildWriteRequest(logger log.Logger, timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf []byte, filter func(prompb.TimeSeries) bool) ([]byte, int64, int64, error) {
highest, lowest, timeSeries,
droppedSamples, droppedExemplars, droppedHistograms := buildTimeSeries(timeSeries, filter)
if droppedSamples > 0 || droppedExemplars > 0 || droppedHistograms > 0 {
level.Debug(logger).Log("msg", "dropped data due to their age", "droppedSamples", droppedSamples, "droppedExemplars", droppedExemplars, "droppedHistograms", droppedHistograms)
} }
req := &prompb.WriteRequest{ req := &prompb.WriteRequest{
Timeseries: samples, Timeseries: timeSeries,
Metadata: metadata, Metadata: metadata,
} }
@ -1635,7 +1771,7 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
} }
err := pBuf.Marshal(req) err := pBuf.Marshal(req)
if err != nil { if err != nil {
return nil, highest, err return nil, highest, lowest, err
} }
// snappy uses len() to see if it needs to allocate a new slice. Make the // snappy uses len() to see if it needs to allocate a new slice. Make the
@ -1644,5 +1780,5 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
buf = buf[0:cap(buf)] buf = buf[0:cap(buf)]
} }
compressed := snappy.Encode(buf, pBuf.Bytes()) compressed := snappy.Encode(buf, pBuf.Bytes())
return compressed, highest, nil return compressed, highest, lowest, nil
} }

View file

@ -548,7 +548,7 @@ func TestShouldReshard(t *testing.T) {
func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSeries) { func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSeries) {
samples := make([]record.RefSample, 0, numSamples) samples := make([]record.RefSample, 0, numSamples)
series := make([]record.RefSeries, 0, numSeries) series := make([]record.RefSeries, 0, numSeries)
b := labels.ScratchBuilder{} lb := labels.ScratchBuilder{}
for i := 0; i < numSeries; i++ { for i := 0; i < numSeries; i++ {
name := fmt.Sprintf("test_metric_%d", i) name := fmt.Sprintf("test_metric_%d", i)
for j := 0; j < numSamples; j++ { for j := 0; j < numSamples; j++ {
@ -559,15 +559,15 @@ func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([
}) })
} }
// Create Labels that is name of series plus any extra labels supplied. // Create Labels that is name of series plus any extra labels supplied.
b.Reset() lb.Reset()
b.Add(labels.MetricName, name) lb.Add(labels.MetricName, name)
for _, l := range extraLabels { for _, l := range extraLabels {
b.Add(l.Name, l.Value) lb.Add(l.Name, l.Value)
} }
b.Sort() lb.Sort()
series = append(series, record.RefSeries{ series = append(series, record.RefSeries{
Ref: chunks.HeadSeriesRef(i), Ref: chunks.HeadSeriesRef(i),
Labels: b.Labels(), Labels: lb.Labels(),
}) })
} }
return samples, series return samples, series
@ -1321,3 +1321,263 @@ func TestQueue_FlushAndShutdownDoesNotDeadlock(t *testing.T) {
t.FailNow() t.FailNow()
} }
} }
func TestDropOldTimeSeries(t *testing.T) {
size := 10
nSeries := 6
nSamples := config.DefaultQueueConfig.Capacity * size
samples, newSamples, series := createTimeseriesWithOldSamples(nSamples, nSeries)
c := NewTestWriteClient()
c.expectSamples(newSamples, series)
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
cfg.SampleAgeLimit = model.Duration(60 * time.Second)
dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
m.Append(samples)
c.waitForExpectedData(t)
}
func TestIsSampleOld(t *testing.T) {
currentTime := time.Now()
require.True(t, isSampleOld(currentTime, 60*time.Second, timestamp.FromTime(currentTime.Add(-61*time.Second))))
require.False(t, isSampleOld(currentTime, 60*time.Second, timestamp.FromTime(currentTime.Add(-59*time.Second))))
}
func createTimeseriesWithOldSamples(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSample, []record.RefSeries) {
newSamples := make([]record.RefSample, 0, numSamples)
samples := make([]record.RefSample, 0, numSamples)
series := make([]record.RefSeries, 0, numSeries)
lb := labels.ScratchBuilder{}
for i := 0; i < numSeries; i++ {
name := fmt.Sprintf("test_metric_%d", i)
// We create half of the samples in the past.
past := timestamp.FromTime(time.Now().Add(-5 * time.Minute))
for j := 0; j < numSamples/2; j++ {
samples = append(samples, record.RefSample{
Ref: chunks.HeadSeriesRef(i),
T: past + int64(j),
V: float64(i),
})
}
for j := 0; j < numSamples/2; j++ {
sample := record.RefSample{
Ref: chunks.HeadSeriesRef(i),
T: int64(int(time.Now().UnixMilli()) + j),
V: float64(i),
}
samples = append(samples, sample)
newSamples = append(newSamples, sample)
}
// Create Labels that is name of series plus any extra labels supplied.
lb.Reset()
lb.Add(labels.MetricName, name)
for _, l := range extraLabels {
lb.Add(l.Name, l.Value)
}
lb.Sort()
series = append(series, record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: lb.Labels(),
})
}
return samples, newSamples, series
}
func filterTsLimit(limit int64, ts prompb.TimeSeries) bool {
return limit > ts.Samples[0].Timestamp
}
func TestBuildTimeSeries(t *testing.T) {
testCases := []struct {
name string
ts []prompb.TimeSeries
filter func(ts prompb.TimeSeries) bool
lowestTs int64
highestTs int64
droppedSamples int
responseLen int
}{
{
name: "No filter applied",
ts: []prompb.TimeSeries{
{
Samples: []prompb.Sample{
{
Timestamp: 1234567890,
Value: 1.23,
},
},
},
{
Samples: []prompb.Sample{
{
Timestamp: 1234567891,
Value: 2.34,
},
},
},
{
Samples: []prompb.Sample{
{
Timestamp: 1234567892,
Value: 3.34,
},
},
},
},
filter: nil,
responseLen: 3,
lowestTs: 1234567890,
highestTs: 1234567892,
},
{
name: "Filter applied, samples in order",
ts: []prompb.TimeSeries{
{
Samples: []prompb.Sample{
{
Timestamp: 1234567890,
Value: 1.23,
},
},
},
{
Samples: []prompb.Sample{
{
Timestamp: 1234567891,
Value: 2.34,
},
},
},
{
Samples: []prompb.Sample{
{
Timestamp: 1234567892,
Value: 3.45,
},
},
},
{
Samples: []prompb.Sample{
{
Timestamp: 1234567893,
Value: 3.45,
},
},
},
},
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567892, ts) },
responseLen: 2,
lowestTs: 1234567892,
highestTs: 1234567893,
droppedSamples: 2,
},
{
name: "Filter applied, samples out of order",
ts: []prompb.TimeSeries{
{
Samples: []prompb.Sample{
{
Timestamp: 1234567892,
Value: 3.45,
},
},
},
{
Samples: []prompb.Sample{
{
Timestamp: 1234567890,
Value: 1.23,
},
},
},
{
Samples: []prompb.Sample{
{
Timestamp: 1234567893,
Value: 3.45,
},
},
},
{
Samples: []prompb.Sample{
{
Timestamp: 1234567891,
Value: 2.34,
},
},
},
},
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567892, ts) },
responseLen: 2,
lowestTs: 1234567892,
highestTs: 1234567893,
droppedSamples: 2,
},
{
name: "Filter applied, samples not consecutive",
ts: []prompb.TimeSeries{
{
Samples: []prompb.Sample{
{
Timestamp: 1234567890,
Value: 1.23,
},
},
},
{
Samples: []prompb.Sample{
{
Timestamp: 1234567892,
Value: 3.45,
},
},
},
{
Samples: []prompb.Sample{
{
Timestamp: 1234567895,
Value: 6.78,
},
},
},
{
Samples: []prompb.Sample{
{
Timestamp: 1234567897,
Value: 6.78,
},
},
},
},
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567895, ts) },
responseLen: 2,
lowestTs: 1234567895,
highestTs: 1234567897,
droppedSamples: 2,
},
}
// Run the test cases
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
highest, lowest, result, droppedSamples, _, _ := buildTimeSeries(tc.ts, tc.filter)
require.NotNil(t, result)
require.Len(t, result, tc.responseLen)
require.Equal(t, tc.highestTs, highest)
require.Equal(t, tc.lowestTs, lowest)
require.Equal(t, tc.droppedSamples, droppedSamples)
})
}
}

View file

@ -38,7 +38,7 @@ import (
) )
func TestRemoteWriteHandler(t *testing.T) { func TestRemoteWriteHandler(t *testing.T) {
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil) buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil)
require.NoError(t, err) require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf)) req, err := http.NewRequest("", "", bytes.NewReader(buf))
@ -84,10 +84,10 @@ func TestRemoteWriteHandler(t *testing.T) {
} }
func TestOutOfOrderSample(t *testing.T) { func TestOutOfOrderSample(t *testing.T) {
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{ buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
}}, nil, nil, nil) }}, nil, nil, nil, nil)
require.NoError(t, err) require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf)) req, err := http.NewRequest("", "", bytes.NewReader(buf))
@ -109,10 +109,10 @@ func TestOutOfOrderSample(t *testing.T) {
// don't fail on ingestion errors since the exemplar storage is // don't fail on ingestion errors since the exemplar storage is
// still experimental. // still experimental.
func TestOutOfOrderExemplar(t *testing.T) { func TestOutOfOrderExemplar(t *testing.T) {
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{ buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 0}}, Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 0}},
}}, nil, nil, nil) }}, nil, nil, nil, nil)
require.NoError(t, err) require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf)) req, err := http.NewRequest("", "", bytes.NewReader(buf))
@ -132,10 +132,10 @@ func TestOutOfOrderExemplar(t *testing.T) {
} }
func TestOutOfOrderHistogram(t *testing.T) { func TestOutOfOrderHistogram(t *testing.T) {
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{ buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat(nil))}, Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat(nil))},
}}, nil, nil, nil) }}, nil, nil, nil, nil)
require.NoError(t, err) require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf)) req, err := http.NewRequest("", "", bytes.NewReader(buf))
@ -158,13 +158,13 @@ func BenchmarkRemoteWritehandler(b *testing.B) {
reqs := []*http.Request{} reqs := []*http.Request{}
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
num := strings.Repeat(strconv.Itoa(i), 16) num := strings.Repeat(strconv.Itoa(i), 16)
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{ buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
Labels: []prompb.Label{ Labels: []prompb.Label{
{Name: "__name__", Value: "test_metric"}, {Name: "__name__", Value: "test_metric"},
{Name: "test_label_name_" + num, Value: labelValue + num}, {Name: "test_label_name_" + num, Value: labelValue + num},
}, },
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)}, Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)},
}}, nil, nil, nil) }}, nil, nil, nil, nil)
require.NoError(b, err) require.NoError(b, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf)) req, err := http.NewRequest("", "", bytes.NewReader(buf))
require.NoError(b, err) require.NoError(b, err)
@ -182,7 +182,7 @@ func BenchmarkRemoteWritehandler(b *testing.B) {
} }
func TestCommitErr(t *testing.T) { func TestCommitErr(t *testing.T) {
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil) buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil)
require.NoError(t, err) require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf)) req, err := http.NewRequest("", "", bytes.NewReader(buf))
@ -219,7 +219,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) {
handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head()) handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head())
buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil) buf, _, _, err := buildWriteRequest(nil, genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil, nil)
require.NoError(b, err) require.NoError(b, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf)) req, err := http.NewRequest("", "", bytes.NewReader(buf))
@ -232,7 +232,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) {
var bufRequests [][]byte var bufRequests [][]byte
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
buf, _, err = buildWriteRequest(genSeriesWithSample(1000, int64(80+i)*time.Minute.Milliseconds()), nil, nil, nil) buf, _, _, err = buildWriteRequest(nil, genSeriesWithSample(1000, int64(80+i)*time.Minute.Milliseconds()), nil, nil, nil, nil)
require.NoError(b, err) require.NoError(b, err)
bufRequests = append(bufRequests, buf) bufRequests = append(bufRequests, buf)
} }