mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 22:37:27 -08:00
Merge pull request #6993 from roidelapluie/addfast
Fix tsdb to use errors from the storage package
This commit is contained in:
commit
ca7375fcff
|
@ -599,7 +599,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
|
|||
}()
|
||||
for _, s := range vector {
|
||||
if _, err := app.Add(s.Metric, s.T, s.V); err != nil {
|
||||
switch err {
|
||||
switch errors.Cause(err) {
|
||||
case storage.ErrOutOfOrderSample:
|
||||
numOutOfOrder++
|
||||
level.Debug(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
|
||||
|
@ -624,7 +624,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
|
|||
if _, ok := seriesReturned[metric]; !ok {
|
||||
// Series no longer exposed, mark it stale.
|
||||
_, err = app.Add(lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
|
||||
switch err {
|
||||
switch errors.Cause(err) {
|
||||
case nil:
|
||||
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
|
||||
// Do not count these in logging, as this is expected if series
|
||||
|
@ -647,7 +647,7 @@ func (g *Group) cleanupStaleSeries(ts time.Time) {
|
|||
for _, s := range g.staleSeries {
|
||||
// Rule that produced series no longer configured, mark it stale.
|
||||
_, err := app.Add(s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
|
||||
switch err {
|
||||
switch errors.Cause(err) {
|
||||
case nil:
|
||||
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
|
||||
// Do not count these in logging, as this is expected if series
|
||||
|
|
|
@ -1121,7 +1121,7 @@ loop:
|
|||
}
|
||||
ce, ok := sl.cache.get(yoloString(met))
|
||||
if ok {
|
||||
switch err = app.AddFast(ce.ref, t, v); err {
|
||||
switch err = app.AddFast(ce.ref, t, v); errors.Cause(err) {
|
||||
case nil:
|
||||
if tp == nil {
|
||||
sl.cache.trackStaleness(ce.hash, ce.lset)
|
||||
|
@ -1176,7 +1176,7 @@ loop:
|
|||
|
||||
var ref uint64
|
||||
ref, err = app.Add(lset, t, v)
|
||||
switch err {
|
||||
switch errors.Cause(err) {
|
||||
case nil:
|
||||
case storage.ErrOutOfOrderSample:
|
||||
err = nil
|
||||
|
@ -1233,7 +1233,7 @@ loop:
|
|||
sl.cache.forEachStale(func(lset labels.Labels) bool {
|
||||
// Series no longer exposed, mark it stale.
|
||||
_, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN))
|
||||
switch err {
|
||||
switch errors.Cause(err) {
|
||||
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
|
||||
// Do not count these in logging, as this is expected if a target
|
||||
// goes away and comes back again with a new scrape loop.
|
||||
|
@ -1330,7 +1330,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v
|
|||
ce, ok := sl.cache.get(s)
|
||||
if ok {
|
||||
err := app.AddFast(ce.ref, t, v)
|
||||
switch err {
|
||||
switch errors.Cause(err) {
|
||||
case nil:
|
||||
return nil
|
||||
case storage.ErrNotFound:
|
||||
|
@ -1354,7 +1354,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v
|
|||
lset = sl.reportSampleMutator(lset)
|
||||
|
||||
ref, err := app.Add(lset, t, v)
|
||||
switch err {
|
||||
switch errors.Cause(err) {
|
||||
case nil:
|
||||
sl.cache.addRef(s, ref, lset, hash)
|
||||
return nil
|
||||
|
|
|
@ -1807,3 +1807,35 @@ func TestReuseScrapeCache(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestScrapeAddFast(t *testing.T) {
|
||||
s := teststorage.New(t)
|
||||
defer s.Close()
|
||||
|
||||
app := s.Appender()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
sl := newScrapeLoop(ctx,
|
||||
&testScraper{},
|
||||
nil, nil,
|
||||
nopMutator,
|
||||
nopMutator,
|
||||
func() storage.Appender { return app },
|
||||
nil,
|
||||
0,
|
||||
true,
|
||||
)
|
||||
defer cancel()
|
||||
|
||||
_, _, _, err := sl.append([]byte("up 1\n"), "", time.Time{})
|
||||
testutil.Ok(t, err)
|
||||
|
||||
// Poison the cache. There is just one entry, and one series in the
|
||||
// storage. Changing the ref will create a 'not found' error.
|
||||
for _, v := range sl.getCache().series {
|
||||
v.ref++
|
||||
}
|
||||
|
||||
_, _, _, err = sl.append([]byte("up 1\n"), "", time.Time{}.Add(time.Second))
|
||||
testutil.Ok(t, err)
|
||||
}
|
||||
|
|
|
@ -77,7 +77,7 @@ type IndexReader interface {
|
|||
|
||||
// Series populates the given labels and chunk metas for the series identified
|
||||
// by the reference.
|
||||
// Returns ErrNotFound if the ref does not resolve to a known series.
|
||||
// Returns storage.ErrNotFound if the ref does not resolve to a known series.
|
||||
Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error
|
||||
|
||||
// LabelNames returns all the unique label names present in the index in sorted order.
|
||||
|
|
|
@ -34,6 +34,7 @@ import (
|
|||
"github.com/go-kit/kit/log"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb"
|
||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
||||
|
@ -309,7 +310,7 @@ func (b *writeBenchmark) ingestScrapesShard(lbls []labels.Labels, scrapeCount in
|
|||
s.ref = &ref
|
||||
} else if err := app.AddFast(*s.ref, ts, float64(s.value)); err != nil {
|
||||
|
||||
if errors.Cause(err) != tsdb.ErrNotFound {
|
||||
if errors.Cause(err) != storage.ErrNotFound {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
|
|
|
@ -215,7 +215,7 @@ func TestDBAppenderAddRef(t *testing.T) {
|
|||
testutil.Ok(t, err)
|
||||
|
||||
err = app2.AddFast(9999999, 1, 1)
|
||||
testutil.Equals(t, ErrNotFound, errors.Cause(err))
|
||||
testutil.Equals(t, storage.ErrNotFound, errors.Cause(err))
|
||||
|
||||
testutil.Ok(t, app2.Commit())
|
||||
|
||||
|
@ -363,7 +363,7 @@ func TestAmendDatapointCausesError(t *testing.T) {
|
|||
|
||||
app = db.Appender()
|
||||
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 1)
|
||||
testutil.Equals(t, ErrAmendSample, err)
|
||||
testutil.Equals(t, storage.ErrDuplicateSampleForTimestamp, err)
|
||||
testutil.Ok(t, app.Rollback())
|
||||
}
|
||||
|
||||
|
@ -398,7 +398,7 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) {
|
|||
|
||||
app = db.Appender()
|
||||
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.Float64frombits(0x7ff0000000000002))
|
||||
testutil.Equals(t, ErrAmendSample, err)
|
||||
testutil.Equals(t, storage.ErrDuplicateSampleForTimestamp, err)
|
||||
}
|
||||
|
||||
func TestEmptyLabelsetCausesError(t *testing.T) {
|
||||
|
@ -1660,7 +1660,7 @@ func TestNoEmptyBlocks(t *testing.T) {
|
|||
|
||||
app = db.Appender()
|
||||
_, err = app.Add(defaultLabel, 1, 0)
|
||||
testutil.Assert(t, err == ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed")
|
||||
testutil.Assert(t, err == storage.ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed")
|
||||
|
||||
// Adding new blocks.
|
||||
currentTime := db.Head().MaxTime()
|
||||
|
|
33
tsdb/head.go
33
tsdb/head.go
|
@ -39,21 +39,6 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
// ErrNotFound is returned if a looked up resource was not found.
|
||||
ErrNotFound = errors.Errorf("not found")
|
||||
|
||||
// ErrOutOfOrderSample is returned if an appended sample has a
|
||||
// timestamp smaller than the most recent sample.
|
||||
ErrOutOfOrderSample = errors.New("out of order sample")
|
||||
|
||||
// ErrAmendSample is returned if an appended sample has the same timestamp
|
||||
// as the most recent sample but a different value.
|
||||
ErrAmendSample = errors.New("amending sample")
|
||||
|
||||
// ErrOutOfBounds is returned if an appended sample is out of the
|
||||
// writable time range.
|
||||
ErrOutOfBounds = errors.New("out of bounds")
|
||||
|
||||
// ErrInvalidSample is returned if an appended sample is not valid and can't
|
||||
// be ingested.
|
||||
ErrInvalidSample = errors.New("invalid sample")
|
||||
|
@ -841,7 +826,7 @@ func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro
|
|||
|
||||
func (a *initAppender) AddFast(ref uint64, t int64, v float64) error {
|
||||
if a.app == nil {
|
||||
return ErrNotFound
|
||||
return storage.ErrNotFound
|
||||
}
|
||||
return a.app.AddFast(ref, t, v)
|
||||
}
|
||||
|
@ -954,7 +939,7 @@ type headAppender struct {
|
|||
|
||||
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
|
||||
if t < a.minValidTime {
|
||||
return 0, ErrOutOfBounds
|
||||
return 0, storage.ErrOutOfBounds
|
||||
}
|
||||
|
||||
// Ensure no empty labels have gotten through.
|
||||
|
@ -980,12 +965,12 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro
|
|||
|
||||
func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
|
||||
if t < a.minValidTime {
|
||||
return ErrOutOfBounds
|
||||
return storage.ErrOutOfBounds
|
||||
}
|
||||
|
||||
s := a.head.series.getByID(ref)
|
||||
if s == nil {
|
||||
return errors.Wrap(ErrNotFound, "unknown series")
|
||||
return errors.Wrap(storage.ErrNotFound, "unknown series")
|
||||
}
|
||||
s.Lock()
|
||||
if err := s.appendable(t, v); err != nil {
|
||||
|
@ -1318,7 +1303,7 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) {
|
|||
s := h.head.series.getByID(sid)
|
||||
// This means that the series has been garbage collected.
|
||||
if s == nil {
|
||||
return nil, ErrNotFound
|
||||
return nil, storage.ErrNotFound
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
|
@ -1328,7 +1313,7 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) {
|
|||
// the specified range.
|
||||
if c == nil || !c.OverlapsClosedInterval(h.mint, h.maxt) {
|
||||
s.Unlock()
|
||||
return nil, ErrNotFound
|
||||
return nil, storage.ErrNotFound
|
||||
}
|
||||
s.Unlock()
|
||||
|
||||
|
@ -1474,7 +1459,7 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks
|
|||
|
||||
if s == nil {
|
||||
h.head.metrics.seriesNotFound.Inc()
|
||||
return ErrNotFound
|
||||
return storage.ErrNotFound
|
||||
}
|
||||
*lbls = append((*lbls)[:0], s.lset...)
|
||||
|
||||
|
@ -1818,12 +1803,12 @@ func (s *memSeries) appendable(t int64, v float64) error {
|
|||
return nil
|
||||
}
|
||||
if t < c.maxTime {
|
||||
return ErrOutOfOrderSample
|
||||
return storage.ErrOutOfOrderSample
|
||||
}
|
||||
// We are allowing exact duplicates as we can encounter them in valid cases
|
||||
// like federation and erroring out at that time would be extremely noisy.
|
||||
if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) {
|
||||
return ErrAmendSample
|
||||
return storage.ErrDuplicateSampleForTimestamp
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -1012,7 +1012,7 @@ func TestGCChunkAccess(t *testing.T) {
|
|||
testutil.Ok(t, h.Truncate(1500)) // Remove a chunk.
|
||||
|
||||
_, err = cr.Chunk(chunks[0].Ref)
|
||||
testutil.Equals(t, ErrNotFound, err)
|
||||
testutil.Equals(t, storage.ErrNotFound, err)
|
||||
_, err = cr.Chunk(chunks[1].Ref)
|
||||
testutil.Ok(t, err)
|
||||
}
|
||||
|
@ -1066,9 +1066,9 @@ func TestGCSeriesAccess(t *testing.T) {
|
|||
testutil.Equals(t, (*memSeries)(nil), h.series.getByID(1))
|
||||
|
||||
_, err = cr.Chunk(chunks[0].Ref)
|
||||
testutil.Equals(t, ErrNotFound, err)
|
||||
testutil.Equals(t, storage.ErrNotFound, err)
|
||||
_, err = cr.Chunk(chunks[1].Ref)
|
||||
testutil.Equals(t, ErrNotFound, err)
|
||||
testutil.Equals(t, storage.ErrNotFound, err)
|
||||
}
|
||||
|
||||
func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) {
|
||||
|
|
|
@ -739,7 +739,7 @@ func (s *baseChunkSeries) Next() bool {
|
|||
ref := s.p.At()
|
||||
if err := s.index.Series(ref, &lset, &chkMetas); err != nil {
|
||||
// Postings may be stale. Skip if no underlying series exists.
|
||||
if errors.Cause(err) == ErrNotFound {
|
||||
if errors.Cause(err) == storage.ErrNotFound {
|
||||
continue
|
||||
}
|
||||
s.err = err
|
||||
|
@ -819,7 +819,7 @@ func (s *populatedChunkSeries) Next() bool {
|
|||
c.Chunk, s.err = s.chunks.Chunk(c.Ref)
|
||||
if s.err != nil {
|
||||
// This means that the chunk has be garbage collected. Remove it from the list.
|
||||
if s.err == ErrNotFound {
|
||||
if s.err == storage.ErrNotFound {
|
||||
s.err = nil
|
||||
// Delete in-place.
|
||||
s.chks = append(chks[:j], chks[j+1:]...)
|
||||
|
|
|
@ -1491,7 +1491,7 @@ func (m mockIndex) SortedPostings(p index.Postings) index.Postings {
|
|||
func (m mockIndex) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
|
||||
s, ok := m.series[ref]
|
||||
if !ok {
|
||||
return ErrNotFound
|
||||
return storage.ErrNotFound
|
||||
}
|
||||
*lset = append((*lset)[:0], s.l...)
|
||||
*chks = append((*chks)[:0], s.chunks...)
|
||||
|
|
Loading…
Reference in a new issue