diff --git a/retrieval/helpers_test.go b/retrieval/helpers_test.go index 1b74b2ea4..d19f6b41f 100644 --- a/retrieval/helpers_test.go +++ b/retrieval/helpers_test.go @@ -21,7 +21,8 @@ import ( type nopAppender struct{} -func (a nopAppender) Append(*model.Sample) { +func (a nopAppender) Append(*model.Sample) error { + return nil } func (a nopAppender) NeedsThrottling() bool { @@ -33,13 +34,14 @@ type collectResultAppender struct { throttled bool } -func (a *collectResultAppender) Append(s *model.Sample) { +func (a *collectResultAppender) Append(s *model.Sample) error { for ln, lv := range s.Metric { if len(lv) == 0 { delete(s.Metric, ln) } } a.result = append(a.result, s) + return nil } func (a *collectResultAppender) NeedsThrottling() bool { diff --git a/retrieval/target.go b/retrieval/target.go index 133009f4c..bf9adb0c6 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -32,6 +32,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/local" "github.com/prometheus/prometheus/util/httputil" ) @@ -449,15 +450,30 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) { }, } - var samples model.Vector + var ( + samples model.Vector + numOutOfOrder int + logger = log.With("target", t.InstanceIdentifier()) + ) for { if err = sdec.Decode(&samples); err != nil { break } for _, s := range samples { - appender.Append(s) + err := appender.Append(s) + if err != nil { + if err == local.ErrOutOfOrderSample { + numOutOfOrder++ + } else { + logger.With("sample", s).Warnf("Error inserting sample: %s", err) + } + } + } } + if numOutOfOrder > 0 { + logger.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples") + } if err == io.EOF { return nil @@ -472,7 +488,7 @@ type ruleLabelsAppender struct { labels model.LabelSet } -func (app ruleLabelsAppender) Append(s *model.Sample) { +func (app ruleLabelsAppender) Append(s *model.Sample) error { for ln, lv := range app.labels { if v, ok := s.Metric[ln]; ok && v != "" { s.Metric[model.ExportedLabelPrefix+ln] = v @@ -480,7 +496,7 @@ func (app ruleLabelsAppender) Append(s *model.Sample) { s.Metric[ln] = lv } - app.SampleAppender.Append(s) + return app.SampleAppender.Append(s) } type honorLabelsAppender struct { @@ -491,14 +507,14 @@ type honorLabelsAppender struct { // Merges the sample's metric with the given labels if the label is not // already present in the metric. // This also considers labels explicitly set to the empty string. -func (app honorLabelsAppender) Append(s *model.Sample) { +func (app honorLabelsAppender) Append(s *model.Sample) error { for ln, lv := range app.labels { if _, ok := s.Metric[ln]; !ok { s.Metric[ln] = lv } } - app.SampleAppender.Append(s) + return app.SampleAppender.Append(s) } // Applies a set of relabel configurations to the sample's metric @@ -508,19 +524,18 @@ type relabelAppender struct { relabelings []*config.RelabelConfig } -func (app relabelAppender) Append(s *model.Sample) { +func (app relabelAppender) Append(s *model.Sample) error { labels, err := Relabel(model.LabelSet(s.Metric), app.relabelings...) if err != nil { - log.Errorf("Error while relabeling metric %s: %s", s.Metric, err) - return + return fmt.Errorf("metric relabeling error %s: %s", s.Metric, err) } // Check if the timeseries was dropped. if labels == nil { - return + return nil } s.Metric = model.Metric(labels) - app.SampleAppender.Append(s) + return app.SampleAppender.Append(s) } // URL returns a copy of the target's URL. diff --git a/storage/local/interface.go b/storage/local/interface.go index e260cc762..454c2d9d5 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -33,7 +33,7 @@ type Storage interface { // processing.) The implementation might remove labels with empty value // from the provided Sample as those labels are considered equivalent to // a label not present at all. - Append(*model.Sample) + Append(*model.Sample) error // NeedsThrottling returns true if the Storage has too many chunks in memory // already or has too many chunks waiting for persistence. NeedsThrottling() bool diff --git a/storage/local/storage.go b/storage/local/storage.go index 29700163a..62453d89d 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -567,8 +567,10 @@ func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprin } } +var ErrOutOfOrderSample = fmt.Errorf("sample timestamp out of order") + // Append implements Storage. -func (s *memorySeriesStorage) Append(sample *model.Sample) { +func (s *memorySeriesStorage) Append(sample *model.Sample) error { for ln, lv := range sample.Metric { if len(lv) == 0 { delete(sample.Metric, ln) @@ -589,16 +591,16 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) { series := s.getOrCreateSeries(fp, sample.Metric) if sample.Timestamp <= series.lastTime { + s.fpLocker.Unlock(fp) // Don't log and track equal timestamps, as they are a common occurrence // when using client-side timestamps (e.g. Pushgateway or federation). // It would be even better to also compare the sample values here, but // we don't have efficient access to a series's last value. if sample.Timestamp != series.lastTime { - log.Warnf("Ignoring sample with out-of-order timestamp for fingerprint %v (%v): %v is not after %v", fp, series.metric, sample.Timestamp, series.lastTime) s.outOfOrderSamplesCount.Inc() + return ErrOutOfOrderSample } - s.fpLocker.Unlock(fp) - return + return nil } completedChunksCount := series.add(&model.SamplePair{ Value: sample.Value, @@ -607,6 +609,8 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) { s.fpLocker.Unlock(fp) s.ingestedSamplesCount.Inc() s.incNumChunksToPersist(completedChunksCount) + + return nil } // NeedsThrottling implements Storage. diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index ae3528c7d..4ed739707 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -133,13 +133,15 @@ func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueue // Append queues a sample to be sent to the remote storage. It drops the // sample on the floor if the queue is full. -func (t *StorageQueueManager) Append(s *model.Sample) { +// Always returns nil. +func (t *StorageQueueManager) Append(s *model.Sample) error { select { case t.queue <- s: default: t.samplesCount.WithLabelValues(dropped).Inc() log.Warn("Remote storage queue full, discarding sample.") } + return nil } // Stop stops sending samples to the remote storage and waits for pending diff --git a/storage/remote/remote.go b/storage/remote/remote.go index d295f8f37..6c0ddba9d 100644 --- a/storage/remote/remote.go +++ b/storage/remote/remote.go @@ -104,8 +104,8 @@ func (s *Storage) Stop() { } } -// Append implements storage.SampleAppender. -func (s *Storage) Append(smpl *model.Sample) { +// Append implements storage.SampleAppender. Always returns nil. +func (s *Storage) Append(smpl *model.Sample) error { s.mtx.RLock() var snew model.Sample @@ -122,6 +122,7 @@ func (s *Storage) Append(smpl *model.Sample) { for _, q := range s.queues { q.Append(&snew) } + return nil } // NeedsThrottling implements storage.SampleAppender. It will always return diff --git a/storage/storage.go b/storage/storage.go index 71b6bcfa8..5acae673e 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -25,9 +25,8 @@ type SampleAppender interface { // of the sample after Append has returned. Remote storage // implementation will simply drop samples if they cannot keep up with // sending samples. Local storage implementations will only drop metrics - // upon unrecoverable errors. Reporting any errors is done via metrics - // and logs and not the concern of the caller. - Append(*model.Sample) + // upon unrecoverable errors. + Append(*model.Sample) error // NeedsThrottling returns true if the underlying storage wishes to not // receive any more samples. Append will still work but might lead to // undue resource usage. It is recommended to call NeedsThrottling once @@ -53,10 +52,16 @@ type Fanout []SampleAppender // Append implements SampleAppender. It appends the provided sample to all // SampleAppenders in the Fanout slice and waits for each append to complete // before proceeding with the next. -func (f Fanout) Append(s *model.Sample) { +// If any of the SampleAppenders returns an error, the first one is returned +// at the end. +func (f Fanout) Append(s *model.Sample) error { + var err error for _, a := range f { - a.Append(s) + if e := a.Append(s); e != nil && err == nil { + err = e + } } + return err } // NeedsThrottling returns true if at least one of the SampleAppenders in the