diff --git a/rules/manager.go b/rules/manager.go index 33b10d9cb..425fb01a8 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -599,7 +599,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { }() for _, s := range vector { if _, err := app.Add(s.Metric, s.T, s.V); err != nil { - switch err { + switch errors.Cause(err) { case storage.ErrOutOfOrderSample: numOutOfOrder++ level.Debug(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s) @@ -624,7 +624,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { if _, ok := seriesReturned[metric]; !ok { // Series no longer exposed, mark it stale. _, err = app.Add(lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN)) - switch err { + switch errors.Cause(err) { case nil: case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: // Do not count these in logging, as this is expected if series @@ -647,7 +647,7 @@ func (g *Group) cleanupStaleSeries(ts time.Time) { for _, s := range g.staleSeries { // Rule that produced series no longer configured, mark it stale. _, err := app.Add(s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN)) - switch err { + switch errors.Cause(err) { case nil: case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: // Do not count these in logging, as this is expected if series diff --git a/scrape/scrape.go b/scrape/scrape.go index 601fca553..fbabba247 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -1121,7 +1121,7 @@ loop: } ce, ok := sl.cache.get(yoloString(met)) if ok { - switch err = app.AddFast(ce.ref, t, v); err { + switch err = app.AddFast(ce.ref, t, v); errors.Cause(err) { case nil: if tp == nil { sl.cache.trackStaleness(ce.hash, ce.lset) @@ -1176,7 +1176,7 @@ loop: var ref uint64 ref, err = app.Add(lset, t, v) - switch err { + switch errors.Cause(err) { case nil: case storage.ErrOutOfOrderSample: err = nil @@ -1233,7 +1233,7 @@ loop: sl.cache.forEachStale(func(lset labels.Labels) bool { // Series no longer exposed, mark it stale. _, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN)) - switch err { + switch errors.Cause(err) { case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: // Do not count these in logging, as this is expected if a target // goes away and comes back again with a new scrape loop. @@ -1330,7 +1330,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v ce, ok := sl.cache.get(s) if ok { err := app.AddFast(ce.ref, t, v) - switch err { + switch errors.Cause(err) { case nil: return nil case storage.ErrNotFound: @@ -1354,7 +1354,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v lset = sl.reportSampleMutator(lset) ref, err := app.Add(lset, t, v) - switch err { + switch errors.Cause(err) { case nil: sl.cache.addRef(s, ref, lset, hash) return nil diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index d05b3c400..608c956b8 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -1807,3 +1807,35 @@ func TestReuseScrapeCache(t *testing.T) { } } } + +func TestScrapeAddFast(t *testing.T) { + s := teststorage.New(t) + defer s.Close() + + app := s.Appender() + + ctx, cancel := context.WithCancel(context.Background()) + sl := newScrapeLoop(ctx, + &testScraper{}, + nil, nil, + nopMutator, + nopMutator, + func() storage.Appender { return app }, + nil, + 0, + true, + ) + defer cancel() + + _, _, _, err := sl.append([]byte("up 1\n"), "", time.Time{}) + testutil.Ok(t, err) + + // Poison the cache. There is just one entry, and one series in the + // storage. Changing the ref will create a 'not found' error. + for _, v := range sl.getCache().series { + v.ref++ + } + + _, _, _, err = sl.append([]byte("up 1\n"), "", time.Time{}.Add(time.Second)) + testutil.Ok(t, err) +} diff --git a/tsdb/block.go b/tsdb/block.go index 536940ec9..bcdec5c88 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -77,7 +77,7 @@ type IndexReader interface { // Series populates the given labels and chunk metas for the series identified // by the reference. - // Returns ErrNotFound if the ref does not resolve to a known series. + // Returns storage.ErrNotFound if the ref does not resolve to a known series. Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error // LabelNames returns all the unique label names present in the index in sorted order. diff --git a/tsdb/cmd/tsdb/main.go b/tsdb/cmd/tsdb/main.go index 1febb4004..458ed5351 100644 --- a/tsdb/cmd/tsdb/main.go +++ b/tsdb/cmd/tsdb/main.go @@ -34,6 +34,7 @@ import ( "github.com/go-kit/kit/log" "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunks" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" @@ -309,7 +310,7 @@ func (b *writeBenchmark) ingestScrapesShard(lbls []labels.Labels, scrapeCount in s.ref = &ref } else if err := app.AddFast(*s.ref, ts, float64(s.value)); err != nil { - if errors.Cause(err) != tsdb.ErrNotFound { + if errors.Cause(err) != storage.ErrNotFound { panic(err) } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 2493ffea6..e824b0c40 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -215,7 +215,7 @@ func TestDBAppenderAddRef(t *testing.T) { testutil.Ok(t, err) err = app2.AddFast(9999999, 1, 1) - testutil.Equals(t, ErrNotFound, errors.Cause(err)) + testutil.Equals(t, storage.ErrNotFound, errors.Cause(err)) testutil.Ok(t, app2.Commit()) @@ -363,7 +363,7 @@ func TestAmendDatapointCausesError(t *testing.T) { app = db.Appender() _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 1) - testutil.Equals(t, ErrAmendSample, err) + testutil.Equals(t, storage.ErrDuplicateSampleForTimestamp, err) testutil.Ok(t, app.Rollback()) } @@ -398,7 +398,7 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) { app = db.Appender() _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.Float64frombits(0x7ff0000000000002)) - testutil.Equals(t, ErrAmendSample, err) + testutil.Equals(t, storage.ErrDuplicateSampleForTimestamp, err) } func TestEmptyLabelsetCausesError(t *testing.T) { @@ -1660,7 +1660,7 @@ func TestNoEmptyBlocks(t *testing.T) { app = db.Appender() _, err = app.Add(defaultLabel, 1, 0) - testutil.Assert(t, err == ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed") + testutil.Assert(t, err == storage.ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed") // Adding new blocks. currentTime := db.Head().MaxTime() diff --git a/tsdb/head.go b/tsdb/head.go index d4b8a7997..bbdfead9d 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -39,21 +39,6 @@ import ( ) var ( - // ErrNotFound is returned if a looked up resource was not found. - ErrNotFound = errors.Errorf("not found") - - // ErrOutOfOrderSample is returned if an appended sample has a - // timestamp smaller than the most recent sample. - ErrOutOfOrderSample = errors.New("out of order sample") - - // ErrAmendSample is returned if an appended sample has the same timestamp - // as the most recent sample but a different value. - ErrAmendSample = errors.New("amending sample") - - // ErrOutOfBounds is returned if an appended sample is out of the - // writable time range. - ErrOutOfBounds = errors.New("out of bounds") - // ErrInvalidSample is returned if an appended sample is not valid and can't // be ingested. ErrInvalidSample = errors.New("invalid sample") @@ -841,7 +826,7 @@ func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro func (a *initAppender) AddFast(ref uint64, t int64, v float64) error { if a.app == nil { - return ErrNotFound + return storage.ErrNotFound } return a.app.AddFast(ref, t, v) } @@ -954,7 +939,7 @@ type headAppender struct { func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { if t < a.minValidTime { - return 0, ErrOutOfBounds + return 0, storage.ErrOutOfBounds } // Ensure no empty labels have gotten through. @@ -980,12 +965,12 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { if t < a.minValidTime { - return ErrOutOfBounds + return storage.ErrOutOfBounds } s := a.head.series.getByID(ref) if s == nil { - return errors.Wrap(ErrNotFound, "unknown series") + return errors.Wrap(storage.ErrNotFound, "unknown series") } s.Lock() if err := s.appendable(t, v); err != nil { @@ -1318,7 +1303,7 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { s := h.head.series.getByID(sid) // This means that the series has been garbage collected. if s == nil { - return nil, ErrNotFound + return nil, storage.ErrNotFound } s.Lock() @@ -1328,7 +1313,7 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { // the specified range. if c == nil || !c.OverlapsClosedInterval(h.mint, h.maxt) { s.Unlock() - return nil, ErrNotFound + return nil, storage.ErrNotFound } s.Unlock() @@ -1474,7 +1459,7 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks if s == nil { h.head.metrics.seriesNotFound.Inc() - return ErrNotFound + return storage.ErrNotFound } *lbls = append((*lbls)[:0], s.lset...) @@ -1818,12 +1803,12 @@ func (s *memSeries) appendable(t int64, v float64) error { return nil } if t < c.maxTime { - return ErrOutOfOrderSample + return storage.ErrOutOfOrderSample } // We are allowing exact duplicates as we can encounter them in valid cases // like federation and erroring out at that time would be extremely noisy. if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) { - return ErrAmendSample + return storage.ErrDuplicateSampleForTimestamp } return nil } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index def09fe2c..f7aa9975c 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1012,7 +1012,7 @@ func TestGCChunkAccess(t *testing.T) { testutil.Ok(t, h.Truncate(1500)) // Remove a chunk. _, err = cr.Chunk(chunks[0].Ref) - testutil.Equals(t, ErrNotFound, err) + testutil.Equals(t, storage.ErrNotFound, err) _, err = cr.Chunk(chunks[1].Ref) testutil.Ok(t, err) } @@ -1066,9 +1066,9 @@ func TestGCSeriesAccess(t *testing.T) { testutil.Equals(t, (*memSeries)(nil), h.series.getByID(1)) _, err = cr.Chunk(chunks[0].Ref) - testutil.Equals(t, ErrNotFound, err) + testutil.Equals(t, storage.ErrNotFound, err) _, err = cr.Chunk(chunks[1].Ref) - testutil.Equals(t, ErrNotFound, err) + testutil.Equals(t, storage.ErrNotFound, err) } func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { diff --git a/tsdb/querier.go b/tsdb/querier.go index a5181c3f6..83f7b6ad6 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -739,7 +739,7 @@ func (s *baseChunkSeries) Next() bool { ref := s.p.At() if err := s.index.Series(ref, &lset, &chkMetas); err != nil { // Postings may be stale. Skip if no underlying series exists. - if errors.Cause(err) == ErrNotFound { + if errors.Cause(err) == storage.ErrNotFound { continue } s.err = err @@ -819,7 +819,7 @@ func (s *populatedChunkSeries) Next() bool { c.Chunk, s.err = s.chunks.Chunk(c.Ref) if s.err != nil { // This means that the chunk has be garbage collected. Remove it from the list. - if s.err == ErrNotFound { + if s.err == storage.ErrNotFound { s.err = nil // Delete in-place. s.chks = append(chks[:j], chks[j+1:]...) diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 778081737..52e316051 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -1491,7 +1491,7 @@ func (m mockIndex) SortedPostings(p index.Postings) index.Postings { func (m mockIndex) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error { s, ok := m.series[ref] if !ok { - return ErrNotFound + return storage.ErrNotFound } *lset = append((*lset)[:0], s.l...) *chks = append((*chks)[:0], s.chunks...)