From 59f1e722df7242f665cdb28d337c8874b1e3f80e Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 2 Feb 2016 14:01:44 +0100 Subject: [PATCH 1/3] Return error on sample appending --- retrieval/target.go | 36 +++++++++++++++++++++++---------- storage/local/interface.go | 2 +- storage/local/storage.go | 10 ++++++--- storage/remote/queue_manager.go | 4 +++- storage/remote/remote.go | 5 +++-- storage/storage.go | 12 ++++++++--- 6 files changed, 48 insertions(+), 21 deletions(-) diff --git a/retrieval/target.go b/retrieval/target.go index 133009f4c..f45287dd2 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -32,6 +32,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/local" "github.com/prometheus/prometheus/util/httputil" ) @@ -449,15 +450,29 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) { }, } - var samples model.Vector + var ( + samples model.Vector + numOutOfOrder int + ) for { if err = sdec.Decode(&samples); err != nil { break } for _, s := range samples { - appender.Append(s) + err := appender.Append(s) + if err != nil { + if err == local.ErrOutOfOrderSample { + numOutOfOrder++ + } else { + log.Warnf("Error inserting sample %v: %s", s, err) + } + } + } } + if numOutOfOrder > 0 { + log.Warnf("Error on ingesting %d out-of-order samples") + } if err == io.EOF { return nil @@ -472,7 +487,7 @@ type ruleLabelsAppender struct { labels model.LabelSet } -func (app ruleLabelsAppender) Append(s *model.Sample) { +func (app ruleLabelsAppender) Append(s *model.Sample) error { for ln, lv := range app.labels { if v, ok := s.Metric[ln]; ok && v != "" { s.Metric[model.ExportedLabelPrefix+ln] = v @@ -480,7 +495,7 @@ func (app ruleLabelsAppender) Append(s *model.Sample) { s.Metric[ln] = lv } - app.SampleAppender.Append(s) + return app.SampleAppender.Append(s) } type honorLabelsAppender struct { @@ -491,14 +506,14 @@ type honorLabelsAppender struct { // Merges the sample's metric with the given labels if the label is not // already present in the metric. // This also considers labels explicitly set to the empty string. -func (app honorLabelsAppender) Append(s *model.Sample) { +func (app honorLabelsAppender) Append(s *model.Sample) error { for ln, lv := range app.labels { if _, ok := s.Metric[ln]; !ok { s.Metric[ln] = lv } } - app.SampleAppender.Append(s) + return app.SampleAppender.Append(s) } // Applies a set of relabel configurations to the sample's metric @@ -508,19 +523,18 @@ type relabelAppender struct { relabelings []*config.RelabelConfig } -func (app relabelAppender) Append(s *model.Sample) { +func (app relabelAppender) Append(s *model.Sample) error { labels, err := Relabel(model.LabelSet(s.Metric), app.relabelings...) if err != nil { - log.Errorf("Error while relabeling metric %s: %s", s.Metric, err) - return + return fmt.Errorf("metric relabeling error %s: %s", s.Metric, err) } // Check if the timeseries was dropped. if labels == nil { - return + return nil } s.Metric = model.Metric(labels) - app.SampleAppender.Append(s) + return app.SampleAppender.Append(s) } // URL returns a copy of the target's URL. diff --git a/storage/local/interface.go b/storage/local/interface.go index e260cc762..454c2d9d5 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -33,7 +33,7 @@ type Storage interface { // processing.) The implementation might remove labels with empty value // from the provided Sample as those labels are considered equivalent to // a label not present at all. - Append(*model.Sample) + Append(*model.Sample) error // NeedsThrottling returns true if the Storage has too many chunks in memory // already or has too many chunks waiting for persistence. NeedsThrottling() bool diff --git a/storage/local/storage.go b/storage/local/storage.go index 29700163a..e7a4c292a 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -567,8 +567,10 @@ func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprin } } +var ErrOutOfOrderSample = fmt.Errorf("sample timestamp out of order") + // Append implements Storage. -func (s *memorySeriesStorage) Append(sample *model.Sample) { +func (s *memorySeriesStorage) Append(sample *model.Sample) error { for ln, lv := range sample.Metric { if len(lv) == 0 { delete(sample.Metric, ln) @@ -594,11 +596,11 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) { // It would be even better to also compare the sample values here, but // we don't have efficient access to a series's last value. if sample.Timestamp != series.lastTime { - log.Warnf("Ignoring sample with out-of-order timestamp for fingerprint %v (%v): %v is not after %v", fp, series.metric, sample.Timestamp, series.lastTime) s.outOfOrderSamplesCount.Inc() + return ErrOutOfOrderSample } s.fpLocker.Unlock(fp) - return + return nil } completedChunksCount := series.add(&model.SamplePair{ Value: sample.Value, @@ -607,6 +609,8 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) { s.fpLocker.Unlock(fp) s.ingestedSamplesCount.Inc() s.incNumChunksToPersist(completedChunksCount) + + return nil } // NeedsThrottling implements Storage. diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index ae3528c7d..4ed739707 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -133,13 +133,15 @@ func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueue // Append queues a sample to be sent to the remote storage. It drops the // sample on the floor if the queue is full. -func (t *StorageQueueManager) Append(s *model.Sample) { +// Always returns nil. +func (t *StorageQueueManager) Append(s *model.Sample) error { select { case t.queue <- s: default: t.samplesCount.WithLabelValues(dropped).Inc() log.Warn("Remote storage queue full, discarding sample.") } + return nil } // Stop stops sending samples to the remote storage and waits for pending diff --git a/storage/remote/remote.go b/storage/remote/remote.go index d295f8f37..6c0ddba9d 100644 --- a/storage/remote/remote.go +++ b/storage/remote/remote.go @@ -104,8 +104,8 @@ func (s *Storage) Stop() { } } -// Append implements storage.SampleAppender. -func (s *Storage) Append(smpl *model.Sample) { +// Append implements storage.SampleAppender. Always returns nil. +func (s *Storage) Append(smpl *model.Sample) error { s.mtx.RLock() var snew model.Sample @@ -122,6 +122,7 @@ func (s *Storage) Append(smpl *model.Sample) { for _, q := range s.queues { q.Append(&snew) } + return nil } // NeedsThrottling implements storage.SampleAppender. It will always return diff --git a/storage/storage.go b/storage/storage.go index 71b6bcfa8..86730d643 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -27,7 +27,7 @@ type SampleAppender interface { // sending samples. Local storage implementations will only drop metrics // upon unrecoverable errors. Reporting any errors is done via metrics // and logs and not the concern of the caller. - Append(*model.Sample) + Append(*model.Sample) error // NeedsThrottling returns true if the underlying storage wishes to not // receive any more samples. Append will still work but might lead to // undue resource usage. It is recommended to call NeedsThrottling once @@ -53,10 +53,16 @@ type Fanout []SampleAppender // Append implements SampleAppender. It appends the provided sample to all // SampleAppenders in the Fanout slice and waits for each append to complete // before proceeding with the next. -func (f Fanout) Append(s *model.Sample) { +// If any of the SampleAppenders returns an error, the first one is returned +// at the end. +func (f Fanout) Append(s *model.Sample) error { + var err error for _, a := range f { - a.Append(s) + if e := a.Append(s); e != nil && err == nil { + err = e + } } + return err } // NeedsThrottling returns true if at least one of the SampleAppenders in the From d0d2c38c68be373d6c6e3726083fb262175e152f Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Wed, 3 Feb 2016 10:17:08 +0100 Subject: [PATCH 2/3] Fix tests for append API changes --- retrieval/helpers_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/retrieval/helpers_test.go b/retrieval/helpers_test.go index 1b74b2ea4..d19f6b41f 100644 --- a/retrieval/helpers_test.go +++ b/retrieval/helpers_test.go @@ -21,7 +21,8 @@ import ( type nopAppender struct{} -func (a nopAppender) Append(*model.Sample) { +func (a nopAppender) Append(*model.Sample) error { + return nil } func (a nopAppender) NeedsThrottling() bool { @@ -33,13 +34,14 @@ type collectResultAppender struct { throttled bool } -func (a *collectResultAppender) Append(s *model.Sample) { +func (a *collectResultAppender) Append(s *model.Sample) error { for ln, lv := range s.Metric { if len(lv) == 0 { delete(s.Metric, ln) } } a.result = append(a.result, s) + return nil } func (a *collectResultAppender) NeedsThrottling() bool { From 1f877f3d2abc7d87e7cbfadeb7ca8c24ce8dc029 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Wed, 3 Feb 2016 10:39:34 +0100 Subject: [PATCH 3/3] Fix deadlock, structure target logging --- retrieval/target.go | 5 +++-- storage/local/storage.go | 2 +- storage/storage.go | 3 +-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/retrieval/target.go b/retrieval/target.go index f45287dd2..bf9adb0c6 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -453,6 +453,7 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) { var ( samples model.Vector numOutOfOrder int + logger = log.With("target", t.InstanceIdentifier()) ) for { if err = sdec.Decode(&samples); err != nil { @@ -464,14 +465,14 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) { if err == local.ErrOutOfOrderSample { numOutOfOrder++ } else { - log.Warnf("Error inserting sample %v: %s", s, err) + logger.With("sample", s).Warnf("Error inserting sample: %s", err) } } } } if numOutOfOrder > 0 { - log.Warnf("Error on ingesting %d out-of-order samples") + logger.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples") } if err == io.EOF { diff --git a/storage/local/storage.go b/storage/local/storage.go index e7a4c292a..62453d89d 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -591,6 +591,7 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error { series := s.getOrCreateSeries(fp, sample.Metric) if sample.Timestamp <= series.lastTime { + s.fpLocker.Unlock(fp) // Don't log and track equal timestamps, as they are a common occurrence // when using client-side timestamps (e.g. Pushgateway or federation). // It would be even better to also compare the sample values here, but @@ -599,7 +600,6 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error { s.outOfOrderSamplesCount.Inc() return ErrOutOfOrderSample } - s.fpLocker.Unlock(fp) return nil } completedChunksCount := series.add(&model.SamplePair{ diff --git a/storage/storage.go b/storage/storage.go index 86730d643..5acae673e 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -25,8 +25,7 @@ type SampleAppender interface { // of the sample after Append has returned. Remote storage // implementation will simply drop samples if they cannot keep up with // sending samples. Local storage implementations will only drop metrics - // upon unrecoverable errors. Reporting any errors is done via metrics - // and logs and not the concern of the caller. + // upon unrecoverable errors. Append(*model.Sample) error // NeedsThrottling returns true if the underlying storage wishes to not // receive any more samples. Append will still work but might lead to