Make TSDB use storage errors

This fixes #6992, which was introduced by #6777. There used to be an
intermediate component that translated TSDB errors into storage errors,
but that component was deleted, and the bug went unnoticed until we
looked at the Prombench results. Without this fix, a scrape fails
outright instead of dropping samples or falling back to "Add" when
series have been garbage collected.

Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>
Julien Pivotto, 2020-03-16 22:52:02 +01:00
commit 8907ba6235 (parent ccc511456a)
10 changed files with 62 additions and 44 deletions
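For context on the fix: errors.Cause from github.com/pkg/errors unwraps an error chain down to its root cause, so a switch can still match a sentinel error after TSDB has wrapped it. A minimal, self-contained sketch of the failure mode and the fix (the sentinel below is illustrative, standing in for storage.ErrNotFound; this is not code from the commit):

package main

import (
    "fmt"

    "github.com/pkg/errors"
)

// errNotFound stands in for a sentinel such as storage.ErrNotFound.
var errNotFound = errors.New("not found")

func main() {
    // TSDB returns wrapped sentinels, e.g.
    // errors.Wrap(storage.ErrNotFound, "unknown series").
    err := errors.Wrap(errNotFound, "unknown series")

    // Comparing the wrapper directly never matches the sentinel,
    // which is exactly the bug: the caller treats it as fatal.
    switch err {
    case errNotFound:
        fmt.Println("matched (never reached)")
    default:
        fmt.Println("unmatched:", err)
    }

    // Unwrapping first restores the intended behavior, letting
    // callers drop the sample or retry with Add.
    switch errors.Cause(err) {
    case errNotFound:
        fmt.Println("matched:", err)
    }
}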

rules/manager.go

@@ -599,7 +599,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
             }()
             for _, s := range vector {
                 if _, err := app.Add(s.Metric, s.T, s.V); err != nil {
-                    switch err {
+                    switch errors.Cause(err) {
                     case storage.ErrOutOfOrderSample:
                         numOutOfOrder++
                         level.Debug(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
@@ -624,7 +624,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
             if _, ok := seriesReturned[metric]; !ok {
                 // Series no longer exposed, mark it stale.
                 _, err = app.Add(lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
-                switch err {
+                switch errors.Cause(err) {
                 case nil:
                 case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
                     // Do not count these in logging, as this is expected if series
@@ -647,7 +647,7 @@ func (g *Group) cleanupStaleSeries(ts time.Time) {
     for _, s := range g.staleSeries {
         // Rule that produced series no longer configured, mark it stale.
         _, err := app.Add(s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
-        switch err {
+        switch errors.Cause(err) {
         case nil:
         case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
             // Do not count these in logging, as this is expected if series

scrape/scrape.go

@@ -1121,7 +1121,7 @@ loop:
         }
         ce, ok := sl.cache.get(yoloString(met))
         if ok {
-            switch err = app.AddFast(ce.ref, t, v); err {
+            switch err = app.AddFast(ce.ref, t, v); errors.Cause(err) {
             case nil:
                 if tp == nil {
                     sl.cache.trackStaleness(ce.hash, ce.lset)
@@ -1176,7 +1176,7 @@ loop:
             var ref uint64
             ref, err = app.Add(lset, t, v)
-            switch err {
+            switch errors.Cause(err) {
            case nil:
            case storage.ErrOutOfOrderSample:
                err = nil
@@ -1233,7 +1233,7 @@ loop:
        sl.cache.forEachStale(func(lset labels.Labels) bool {
            // Series no longer exposed, mark it stale.
            _, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN))
-            switch err {
+            switch errors.Cause(err) {
            case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
                // Do not count these in logging, as this is expected if a target
                // goes away and comes back again with a new scrape loop.
@@ -1330,7 +1330,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v
     ce, ok := sl.cache.get(s)
     if ok {
         err := app.AddFast(ce.ref, t, v)
-        switch err {
+        switch errors.Cause(err) {
         case nil:
             return nil
         case storage.ErrNotFound:
@@ -1354,7 +1354,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v
     lset = sl.reportSampleMutator(lset)
     ref, err := app.Add(lset, t, v)
-    switch err {
+    switch errors.Cause(err) {
     case nil:
         sl.cache.addRef(s, ref, lset, hash)
         return nil
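What these scrape-loop hunks preserve is the ref-cache fast path: try AddFast with a cached series reference, and only when the unwrapped error is storage.ErrNotFound (the series was garbage collected) fall back to Add, which re-creates the series. A condensed sketch of that pattern, with a plain map standing in for the scrape cache (illustrative, not the actual scrape code; assumes github.com/pkg/errors and the Prometheus storage and labels packages):

// appendWithCache tries the cached ref first and falls back to Add when
// the ref no longer resolves to a live series.
func appendWithCache(app storage.Appender, cache map[string]uint64, lset labels.Labels, t int64, v float64) error {
    if ref, ok := cache[lset.String()]; ok {
        err := app.AddFast(ref, t, v)
        if errors.Cause(err) != storage.ErrNotFound {
            return err // nil, or a genuine failure to surface
        }
        // The cached ref is stale: the series was garbage collected.
    }
    ref, err := app.Add(lset, t, v) // re-creates the series, returns a fresh ref
    if err != nil {
        return err
    }
    cache[lset.String()] = ref
    return nil
}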

scrape/scrape_test.go

@@ -1807,3 +1807,35 @@ func TestReuseScrapeCache(t *testing.T) {
         }
     }
 }
+
+func TestScrapeAddFast(t *testing.T) {
+    s := teststorage.New(t)
+    defer s.Close()
+
+    app := s.Appender()
+
+    ctx, cancel := context.WithCancel(context.Background())
+    sl := newScrapeLoop(ctx,
+        &testScraper{},
+        nil, nil,
+        nopMutator,
+        nopMutator,
+        func() storage.Appender { return app },
+        nil,
+        0,
+        true,
+    )
+    defer cancel()
+
+    _, _, _, err := sl.append([]byte("up 1\n"), "", time.Time{})
+    testutil.Ok(t, err)
+
+    // Poison the cache. There is just one entry, and one series in the
+    // storage. Changing the ref will create a 'not found' error.
+    for _, v := range sl.getCache().series {
+        v.ref++
+    }
+
+    _, _, _, err = sl.append([]byte("up 1\n"), "", time.Time{}.Add(time.Second))
+    testutil.Ok(t, err)
+}

tsdb/block.go

@@ -77,7 +77,7 @@ type IndexReader interface {
 
     // Series populates the given labels and chunk metas for the series identified
     // by the reference.
-    // Returns ErrNotFound if the ref does not resolve to a known series.
+    // Returns storage.ErrNotFound if the ref does not resolve to a known series.
     Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error
 
     // LabelNames returns all the unique label names present in the index in sorted order.

tsdb/cmd/tsdb/main.go

@@ -34,6 +34,7 @@ import (
     "github.com/go-kit/kit/log"
     "github.com/pkg/errors"
     "github.com/prometheus/prometheus/pkg/labels"
+    "github.com/prometheus/prometheus/storage"
     "github.com/prometheus/prometheus/tsdb"
     "github.com/prometheus/prometheus/tsdb/chunks"
     tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
@@ -309,7 +310,7 @@ func (b *writeBenchmark) ingestScrapesShard(lbls []labels.Labels, scrapeCount in
                 s.ref = &ref
             } else if err := app.AddFast(*s.ref, ts, float64(s.value)); err != nil {
-                if errors.Cause(err) != tsdb.ErrNotFound {
+                if errors.Cause(err) != storage.ErrNotFound {
                     panic(err)
                 }

tsdb/db_test.go

@@ -215,7 +215,7 @@ func TestDBAppenderAddRef(t *testing.T) {
     testutil.Ok(t, err)
 
     err = app2.AddFast(9999999, 1, 1)
-    testutil.Equals(t, ErrNotFound, errors.Cause(err))
+    testutil.Equals(t, storage.ErrNotFound, errors.Cause(err))
 
     testutil.Ok(t, app2.Commit())
@@ -363,7 +363,7 @@ func TestAmendDatapointCausesError(t *testing.T) {
     app = db.Appender()
     _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 1)
-    testutil.Equals(t, ErrAmendSample, err)
+    testutil.Equals(t, storage.ErrDuplicateSampleForTimestamp, err)
     testutil.Ok(t, app.Rollback())
 }
@@ -398,7 +398,7 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) {
     app = db.Appender()
     _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.Float64frombits(0x7ff0000000000002))
-    testutil.Equals(t, ErrAmendSample, err)
+    testutil.Equals(t, storage.ErrDuplicateSampleForTimestamp, err)
 }
 
 func TestEmptyLabelsetCausesError(t *testing.T) {
@@ -1660,7 +1660,7 @@ func TestNoEmptyBlocks(t *testing.T) {
     app = db.Appender()
     _, err = app.Add(defaultLabel, 1, 0)
-    testutil.Assert(t, err == ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed")
+    testutil.Assert(t, err == storage.ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed")
 
     // Adding new blocks.
     currentTime := db.Head().MaxTime()

tsdb/head.go

@@ -39,21 +39,6 @@ import (
 )
 
 var (
-    // ErrNotFound is returned if a looked up resource was not found.
-    ErrNotFound = errors.Errorf("not found")
-    // ErrOutOfOrderSample is returned if an appended sample has a
-    // timestamp smaller than the most recent sample.
-    ErrOutOfOrderSample = errors.New("out of order sample")
-    // ErrAmendSample is returned if an appended sample has the same timestamp
-    // as the most recent sample but a different value.
-    ErrAmendSample = errors.New("amending sample")
-    // ErrOutOfBounds is returned if an appended sample is out of the
-    // writable time range.
-    ErrOutOfBounds = errors.New("out of bounds")
-    // ErrInvalidSample is returned if an appended sample is not valid and can't
-    // be ingested.
-    ErrInvalidSample = errors.New("invalid sample")
@@ -841,7 +826,7 @@ func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro
 
 func (a *initAppender) AddFast(ref uint64, t int64, v float64) error {
     if a.app == nil {
-        return ErrNotFound
+        return storage.ErrNotFound
     }
     return a.app.AddFast(ref, t, v)
 }
@@ -954,7 +939,7 @@ type headAppender struct {
 
 func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
     if t < a.minValidTime {
-        return 0, ErrOutOfBounds
+        return 0, storage.ErrOutOfBounds
     }
 
     // Ensure no empty labels have gotten through.
@@ -980,12 +965,12 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro
 
 func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
     if t < a.minValidTime {
-        return ErrOutOfBounds
+        return storage.ErrOutOfBounds
     }
 
     s := a.head.series.getByID(ref)
     if s == nil {
-        return errors.Wrap(ErrNotFound, "unknown series")
+        return errors.Wrap(storage.ErrNotFound, "unknown series")
     }
     s.Lock()
     if err := s.appendable(t, v); err != nil {
@@ -1318,7 +1303,7 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) {
     s := h.head.series.getByID(sid)
     // This means that the series has been garbage collected.
     if s == nil {
-        return nil, ErrNotFound
+        return nil, storage.ErrNotFound
     }
 
     s.Lock()
@@ -1328,7 +1313,7 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) {
     // the specified range.
     if c == nil || !c.OverlapsClosedInterval(h.mint, h.maxt) {
         s.Unlock()
-        return nil, ErrNotFound
+        return nil, storage.ErrNotFound
     }
     s.Unlock()
@@ -1474,7 +1459,7 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks
     if s == nil {
         h.head.metrics.seriesNotFound.Inc()
-        return ErrNotFound
+        return storage.ErrNotFound
     }
 
     *lbls = append((*lbls)[:0], s.lset...)
@@ -1818,12 +1803,12 @@ func (s *memSeries) appendable(t int64, v float64) error {
         return nil
     }
     if t < c.maxTime {
-        return ErrOutOfOrderSample
+        return storage.ErrOutOfOrderSample
     }
 
     // We are allowing exact duplicates as we can encounter them in valid cases
     // like federation and erroring out at that time would be extremely noisy.
     if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) {
-        return ErrAmendSample
+        return storage.ErrDuplicateSampleForTimestamp
     }
     return nil
 }
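Every sentinel deleted above has a counterpart that this diff references from the storage package: ErrNotFound, ErrOutOfOrderSample, ErrOutOfBounds, and ErrDuplicateSampleForTimestamp (which takes over the role of ErrAmendSample). A sketch of what those declarations look like on the storage side; the exact message strings are recalled, not taken from this commit:

var (
    // ErrNotFound is returned when a looked-up resource was not found.
    ErrNotFound = errors.New("not found")
    // ErrOutOfOrderSample: the appended sample has a timestamp older than
    // the most recent sample of the series.
    ErrOutOfOrderSample = errors.New("out of order sample")
    // ErrDuplicateSampleForTimestamp: same timestamp as the most recent
    // sample, but a different value (TSDB's former ErrAmendSample).
    ErrDuplicateSampleForTimestamp = errors.New("duplicate sample for timestamp")
    // ErrOutOfBounds: the appended sample is outside the writable time range.
    ErrOutOfBounds = errors.New("out of bounds")
)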

tsdb/head_test.go

@@ -1012,7 +1012,7 @@ func TestGCChunkAccess(t *testing.T) {
     testutil.Ok(t, h.Truncate(1500)) // Remove a chunk.
 
     _, err = cr.Chunk(chunks[0].Ref)
-    testutil.Equals(t, ErrNotFound, err)
+    testutil.Equals(t, storage.ErrNotFound, err)
     _, err = cr.Chunk(chunks[1].Ref)
     testutil.Ok(t, err)
 }
@@ -1066,9 +1066,9 @@ func TestGCSeriesAccess(t *testing.T) {
     testutil.Equals(t, (*memSeries)(nil), h.series.getByID(1))
 
     _, err = cr.Chunk(chunks[0].Ref)
-    testutil.Equals(t, ErrNotFound, err)
+    testutil.Equals(t, storage.ErrNotFound, err)
     _, err = cr.Chunk(chunks[1].Ref)
-    testutil.Equals(t, ErrNotFound, err)
+    testutil.Equals(t, storage.ErrNotFound, err)
 }
 
 func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) {

tsdb/querier.go

@@ -739,7 +739,7 @@ func (s *baseChunkSeries) Next() bool {
         ref := s.p.At()
         if err := s.index.Series(ref, &lset, &chkMetas); err != nil {
             // Postings may be stale. Skip if no underlying series exists.
-            if errors.Cause(err) == ErrNotFound {
+            if errors.Cause(err) == storage.ErrNotFound {
                 continue
             }
             s.err = err
@@ -819,7 +819,7 @@ func (s *populatedChunkSeries) Next() bool {
             c.Chunk, s.err = s.chunks.Chunk(c.Ref)
             if s.err != nil {
                 // This means that the chunk has be garbage collected. Remove it from the list.
-                if s.err == ErrNotFound {
+                if s.err == storage.ErrNotFound {
                     s.err = nil
                     // Delete in-place.
                     s.chks = append(chks[:j], chks[j+1:]...)

tsdb/querier_test.go

@@ -1491,7 +1491,7 @@ func (m mockIndex) SortedPostings(p index.Postings) index.Postings {
 func (m mockIndex) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
     s, ok := m.series[ref]
     if !ok {
-        return ErrNotFound
+        return storage.ErrNotFound
     }
     *lset = append((*lset)[:0], s.l...)
     *chks = append((*chks)[:0], s.chunks...)