From a04be0bc1c85eb2d6774198c37418020115c60d4 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 19 Sep 2017 14:24:26 +0200 Subject: [PATCH 1/2] vendor: update prometheus/tsdb --- vendor/github.com/prometheus/tsdb/block.go | 5 --- vendor/github.com/prometheus/tsdb/db.go | 3 +- vendor/github.com/prometheus/tsdb/head.go | 46 ++++++++++++++-------- vendor/github.com/prometheus/tsdb/index.go | 3 ++ vendor/github.com/prometheus/tsdb/wal.go | 15 ++++--- vendor/vendor.json | 14 +++---- 6 files changed, 49 insertions(+), 37 deletions(-) diff --git a/vendor/github.com/prometheus/tsdb/block.go b/vendor/github.com/prometheus/tsdb/block.go index 67cd57491..232e64e67 100644 --- a/vendor/github.com/prometheus/tsdb/block.go +++ b/vendor/github.com/prometheus/tsdb/block.go @@ -64,11 +64,6 @@ type Appendable interface { Appender() Appender } -// Queryable defines an entity which provides a Querier. -type Queryable interface { - Querier(mint, maxt int64) Querier -} - // BlockMeta provides meta information about a block. type BlockMeta struct { // Unique identifier for the block and its contents. Changes on compaction. diff --git a/vendor/github.com/prometheus/tsdb/db.go b/vendor/github.com/prometheus/tsdb/db.go index c9745cfc6..87b5ed253 100644 --- a/vendor/github.com/prometheus/tsdb/db.go +++ b/vendor/github.com/prometheus/tsdb/db.go @@ -165,8 +165,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db return nil, err } if l == nil { - l = log.NewLogfmtLogger(os.Stdout) - l = log.With(l, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) + l = log.NewNopLogger() } if opts == nil { opts = DefaultOptions diff --git a/vendor/github.com/prometheus/tsdb/head.go b/vendor/github.com/prometheus/tsdb/head.go index ea7b63f8a..a5ce94e45 100644 --- a/vendor/github.com/prometheus/tsdb/head.go +++ b/vendor/github.com/prometheus/tsdb/head.go @@ -185,13 +185,18 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( return h, nil } +// ReadWAL initializes the head by consuming the write ahead log. func (h *Head) ReadWAL() error { r := h.wal.Reader() mint := h.MinTime() seriesFunc := func(series []RefSeries) error { for _, s := range series { - h.create(s.Labels.Hash(), s.Labels) + h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) + + if h.lastSeriesID < s.Ref { + h.lastSeriesID = s.Ref + } } return nil } @@ -202,7 +207,8 @@ func (h *Head) ReadWAL() error { } ms := h.series.getByID(s.Ref) if ms == nil { - return errors.Errorf("unknown series reference %d; abort WAL restore", s.Ref) + h.logger.Log("msg", "unknown series reference in WAL", "ref", s.Ref) + continue } _, chunkCreated := ms.append(s.T, s.V) if chunkCreated { @@ -210,7 +216,6 @@ func (h *Head) ReadWAL() error { h.metrics.chunks.Inc() } } - return nil } deletesFunc := func(stones []Stone) error { @@ -222,7 +227,6 @@ func (h *Head) ReadWAL() error { h.tombstones.add(s.ref, itv) } } - return nil } @@ -379,17 +383,12 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro if t < a.mint { return 0, ErrOutOfBounds } - hash := lset.Hash() - - s := a.head.series.getByHash(hash, lset) - - if s == nil { - s = a.head.create(hash, lset) + s, created := a.head.getOrCreate(lset.Hash(), lset) + if created { a.series = append(a.series, RefSeries{ Ref: s.ref, Labels: lset, - hash: hash, }) } return s.ref, a.AddFast(s.ref, t, v) @@ -839,20 +838,32 @@ func (h *headIndexReader) LabelIndices() ([][]string, error) { return res, nil } -func (h *Head) create(hash uint64, lset labels.Labels) *memSeries { - h.metrics.series.Inc() - h.metrics.seriesCreated.Inc() +func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool) { + // Just using `getOrSet` below would be semantically sufficient, but we'd create + // a new series on every sample inserted via Add(), which causes allocations + // and makes our series IDs rather random and harder to compress in postings. + s := h.series.getByHash(hash, lset) + if s != nil { + return s, false + } // Optimistically assume that we are the first one to create the series. id := atomic.AddUint64(&h.lastSeriesID, 1) + + return h.getOrCreateWithID(id, hash, lset) +} + +func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool) { s := newMemSeries(lset, id, h.chunkRange) s, created := h.series.getOrSet(hash, s) - // Skip indexing if we didn't actually create the series. if !created { - return s + return s, false } + h.metrics.series.Inc() + h.metrics.seriesCreated.Inc() + h.postings.add(id, lset) h.symMtx.Lock() @@ -870,7 +881,7 @@ func (h *Head) create(hash uint64, lset labels.Labels) *memSeries { h.symbols[l.Value] = struct{}{} } - return s + return s, true } // seriesHashmap is a simple hashmap for memSeries by their label set. It is built @@ -1023,6 +1034,7 @@ func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, boo s.locks[i].Lock() if prev := s.hashes[i].get(hash, series.lset); prev != nil { + s.locks[i].Unlock() return prev, false } s.hashes[i].set(hash, series) diff --git a/vendor/github.com/prometheus/tsdb/index.go b/vendor/github.com/prometheus/tsdb/index.go index fd9b25162..3cdaad74d 100644 --- a/vendor/github.com/prometheus/tsdb/index.go +++ b/vendor/github.com/prometheus/tsdb/index.go @@ -570,6 +570,9 @@ var ( errInvalidFlag = fmt.Errorf("invalid flag") ) +// NewIndexReader returns a new IndexReader on the given directory. +func NewIndexReader(dir string) (IndexReader, error) { return newIndexReader(dir) } + // newIndexReader returns a new indexReader on the given directory. func newIndexReader(dir string) (*indexReader, error) { f, err := openMmapFile(filepath.Join(dir, "index")) diff --git a/vendor/github.com/prometheus/tsdb/wal.go b/vendor/github.com/prometheus/tsdb/wal.go index 9af9a1853..27984ea0c 100644 --- a/vendor/github.com/prometheus/tsdb/wal.go +++ b/vendor/github.com/prometheus/tsdb/wal.go @@ -99,9 +99,6 @@ type WALReader interface { type RefSeries struct { Ref uint64 Labels labels.Labels - - // hash for the label set. This field is not generally populated. - hash uint64 } // RefSample is a timestamp/value pair associated with a reference to a series. @@ -827,7 +824,9 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC if err != nil { return errors.Wrap(err, "decode series entry") } - seriesf(series) + if err := seriesf(series); err != nil { + return err + } cf := r.current() @@ -842,7 +841,9 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC if err != nil { return errors.Wrap(err, "decode samples entry") } - samplesf(samples) + if err := samplesf(samples); err != nil { + return err + } // Update the times for the WAL segment file. cf := r.current() @@ -858,7 +859,9 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC if err != nil { return errors.Wrap(err, "decode delete entry") } - deletesf(stones) + if err := deletesf(stones); err != nil { + return err + } // Update the times for the WAL segment file. cf := r.current() diff --git a/vendor/vendor.json b/vendor/vendor.json index 907fb7e06..2b639724e 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -871,22 +871,22 @@ "revisionTime": "2016-04-11T19:08:41Z" }, { - "checksumSHA1": "mDKxPAubVLTWW/Gar13m7YDHSek=", + "checksumSHA1": "B5ndMoK8lqgFJ8xUZ/0V4zCpUw0=", "path": "github.com/prometheus/tsdb", - "revision": "3870ec285c4640d462a0ad80e7acbcdf1e939563", - "revisionTime": "2017-09-11T08:41:33Z" + "revision": "162a48e4f2c6e486a0ebf61cf9cea73a8023ef0a", + "revisionTime": "2017-09-19T08:20:19Z" }, { "checksumSHA1": "Gua979gmISm4cJP/fR2hL8m5To8=", "path": "github.com/prometheus/tsdb/chunks", - "revision": "3870ec285c4640d462a0ad80e7acbcdf1e939563", - "revisionTime": "2017-09-11T08:41:33Z" + "revision": "162a48e4f2c6e486a0ebf61cf9cea73a8023ef0a", + "revisionTime": "2017-09-19T08:20:19Z" }, { "checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=", "path": "github.com/prometheus/tsdb/labels", - "revision": "3870ec285c4640d462a0ad80e7acbcdf1e939563", - "revisionTime": "2017-09-11T08:41:33Z" + "revision": "162a48e4f2c6e486a0ebf61cf9cea73a8023ef0a", + "revisionTime": "2017-09-19T08:20:19Z" }, { "checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=", From 437f51a85f0199501b6dea8fbac1a9b573958721 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 15 Sep 2017 11:08:51 +0200 Subject: [PATCH 2/2] Fix cache maintenance on changing metric representations We were not properly maintaining the scrape cache when the same metric was exposed with a different string representation. This overall reduces the scraping cache's complexity, which fixes the issue and saves about 10% of memory in a scraping-only Prometheus instance. --- retrieval/helpers_test.go | 23 +++++++- retrieval/scrape.go | 117 +++++++++++++++----------------------- retrieval/scrape_test.go | 49 ++++++++++++++++ 3 files changed, 115 insertions(+), 74 deletions(-) diff --git a/retrieval/helpers_test.go b/retrieval/helpers_test.go index c8ba8be34..85e63cc4e 100644 --- a/retrieval/helpers_test.go +++ b/retrieval/helpers_test.go @@ -32,13 +32,27 @@ func (a nopAppender) AddFast(labels.Labels, uint64, int64, float64) error { retu func (a nopAppender) Commit() error { return nil } func (a nopAppender) Rollback() error { return nil } +// collectResultAppender records all samples that were added through the appender. +// It can be used as its zero value or be backed by another appender it writes samples through. type collectResultAppender struct { + next storage.Appender result []sample } func (a *collectResultAppender) AddFast(m labels.Labels, ref uint64, t int64, v float64) error { - // Not implemented. - return storage.ErrNotFound + if a.next == nil { + return storage.ErrNotFound + } + err := a.next.AddFast(m, ref, t, v) + if err != nil { + return err + } + a.result = append(a.result, sample{ + metric: m, + t: t, + v: v, + }) + return err } func (a *collectResultAppender) Add(m labels.Labels, t int64, v float64) (uint64, error) { @@ -47,7 +61,10 @@ func (a *collectResultAppender) Add(m labels.Labels, t int64, v float64) (uint64 t: t, v: v, }) - return 0, nil + if a.next == nil { + return 0, nil + } + return a.next.Add(m, t, v) } func (a *collectResultAppender) Commit() error { return nil } diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 947701da9..3ccfffa5f 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -45,13 +45,6 @@ import ( "github.com/prometheus/prometheus/util/httputil" ) -const ( - scrapeHealthMetricName = "up" - scrapeDurationMetricName = "scrape_duration_seconds" - scrapeSamplesMetricName = "scrape_samples_scraped" - samplesPostRelabelMetricName = "scrape_samples_post_metric_relabeling" -) - var ( targetIntervalLength = prometheus.NewSummaryVec( prometheus.SummaryOpts{ @@ -460,15 +453,11 @@ type loop interface { stop() } -type lsetCacheEntry struct { - metric string - lset labels.Labels - hash uint64 -} - -type refEntry struct { +type cacheEntry struct { ref uint64 lastIter uint64 + hash uint64 + lset labels.Labels } type scrapeLoop struct { @@ -494,8 +483,9 @@ type scrapeLoop struct { type scrapeCache struct { iter uint64 // Current scrape iteration. - refs map[string]*refEntry // Parsed string to ref. - lsets map[uint64]*lsetCacheEntry // Ref to labelset and string. + // Parsed string to an entry with information about the actual label set + // and its storage reference. + entries map[string]*cacheEntry // Cache of dropped metric strings and their iteration. The iteration must // be a pointer so we can update it without setting a new entry with an unsafe @@ -511,8 +501,7 @@ type scrapeCache struct { func newScrapeCache() *scrapeCache { return &scrapeCache{ - refs: map[string]*refEntry{}, - lsets: map[uint64]*lsetCacheEntry{}, + entries: map[string]*cacheEntry{}, dropped: map[string]*uint64{}, seriesCur: map[uint64]labels.Labels{}, seriesPrev: map[uint64]labels.Labels{}, @@ -523,14 +512,13 @@ func (c *scrapeCache) iterDone() { // refCache and lsetCache may grow over time through series churn // or multiple string representations of the same metric. Clean up entries // that haven't appeared in the last scrape. - for s, e := range c.refs { - if e.lastIter < c.iter { - delete(c.refs, s) - delete(c.lsets, e.ref) + for s, e := range c.entries { + if c.iter-e.lastIter > 2 { + delete(c.entries, s) } } for s, iter := range c.dropped { - if *iter < c.iter { + if c.iter-*iter > 2 { delete(c.dropped, s) } } @@ -546,29 +534,20 @@ func (c *scrapeCache) iterDone() { c.iter++ } -func (c *scrapeCache) getRef(met string) (uint64, bool) { - e, ok := c.refs[met] +func (c *scrapeCache) get(met string) (*cacheEntry, bool) { + e, ok := c.entries[met] if !ok { - return 0, false + return nil, false } e.lastIter = c.iter - return e.ref, true + return e, true } func (c *scrapeCache) addRef(met string, ref uint64, lset labels.Labels, hash uint64) { if ref == 0 { return } - // Clean up the label set cache before overwriting the ref for a previously seen - // metric representation. It won't be caught by the cleanup in iterDone otherwise. - if e, ok := c.refs[met]; ok { - delete(c.lsets, e.ref) - } - c.refs[met] = &refEntry{ref: ref, lastIter: c.iter} - // met is the raw string the metric was ingested as. The label set is not ordered - // and thus it's not suitable to uniquely identify cache entries. - // We store a hash over the label set instead. - c.lsets[ref] = &lsetCacheEntry{metric: met, lset: lset, hash: hash} + c.entries[met] = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash} } func (c *scrapeCache) addDropped(met string) { @@ -825,14 +804,12 @@ loop: if sl.cache.getDropped(yoloString(met)) { continue } - ref, ok := sl.cache.getRef(yoloString(met)) + ce, ok := sl.cache.get(yoloString(met)) if ok { - lset := sl.cache.lsets[ref].lset - switch err = app.AddFast(lset, ref, t, v); err { + switch err = app.AddFast(ce.lset, ce.ref, t, v); err { case nil: if tp == nil { - e := sl.cache.lsets[ref] - sl.cache.trackStaleness(e.hash, e.lset) + sl.cache.trackStaleness(ce.hash, ce.lset) } case storage.ErrNotFound: ok = false @@ -862,28 +839,19 @@ loop: } } if !ok { - var ( - lset labels.Labels - mets string - hash uint64 - ) - if e, ok := sl.cache.lsets[ref]; ok { - mets = e.metric - lset = e.lset - hash = e.hash - } else { - mets = p.Metric(&lset) - hash = lset.Hash() + var lset labels.Labels - // Hash label set as it is seen local to the target. Then add target labels - // and relabeling and store the final label set. - lset = sl.sampleMutator(lset) + mets := p.Metric(&lset) + hash := lset.Hash() - // The label set may be set to nil to indicate dropping. - if lset == nil { - sl.cache.addDropped(mets) - continue - } + // Hash label set as it is seen local to the target. Then add target labels + // and relabeling and store the final label set. + lset = sl.sampleMutator(lset) + + // The label set may be set to nil to indicate dropping. + if lset == nil { + sl.cache.addDropped(mets) + continue } var ref uint64 @@ -970,6 +938,15 @@ func yoloString(b []byte) string { return *((*string)(unsafe.Pointer(&b))) } +// The constants are suffixed with the invalid \xff unicode rune to avoid collisions +// with scraped metrics in the cache. +const ( + scrapeHealthMetricName = "up" + "\xff" + scrapeDurationMetricName = "scrape_duration_seconds" + "\xff" + scrapeSamplesMetricName = "scrape_samples_scraped" + "\xff" + samplesPostRelabelMetricName = "scrape_samples_post_metric_relabeling" + "\xff" +) + func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, appended int, err error) error { sl.scraper.report(start, duration, err) @@ -1026,14 +1003,9 @@ func (sl *scrapeLoop) reportStale(start time.Time) error { } func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error { - // Suffix s with the invalid \xff unicode rune to avoid collisions - // with scraped metrics. - s2 := s + "\xff" - - ref, ok := sl.cache.getRef(s2) + ce, ok := sl.cache.get(s) if ok { - lset := sl.cache.lsets[ref].lset - err := app.AddFast(lset, ref, t, v) + err := app.AddFast(ce.lset, ce.ref, t, v) switch err { case nil: return nil @@ -1048,7 +1020,10 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v } } lset := labels.Labels{ - labels.Label{Name: labels.MetricName, Value: s}, + // The constants are suffixed with the invalid \xff unicode rune to avoid collisions + // with scraped metrics in the cache. + // We have to drop it when building the actual metric. + labels.Label{Name: labels.MetricName, Value: s[:len(s)-1]}, } hash := lset.Hash() @@ -1057,7 +1032,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v ref, err := app.Add(lset, t, v) switch err { case nil: - sl.cache.addRef(s2, ref, lset, hash) + sl.cache.addRef(s, ref, lset, hash) return nil case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: return nil diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index 12e366ed3..a9ca87cd2 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -37,6 +37,7 @@ import ( "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/testutil" ) func TestNewScrapePool(t *testing.T) { @@ -625,6 +626,54 @@ func TestScrapeLoopAppend(t *testing.T) { } } +func TestScrapeLoop_ChangingMetricString(t *testing.T) { + // This is a regression test for the scrape loop cache not properly maintaining + // IDs when the string representation of a metric changes across a scrape. Thus + // we use a real storage appender here. + s := testutil.NewStorage(t) + defer s.Close() + + app, err := s.Appender() + if err != nil { + t.Error(err) + } + capp := &collectResultAppender{next: app} + + sl := newScrapeLoop(context.Background(), + nil, nil, nil, + nopMutator, + nopMutator, + func() storage.Appender { return capp }, + ) + + now := time.Now() + _, _, err = sl.append([]byte(`metric_a{a="1",b="1"} 1`), now) + if err != nil { + t.Fatalf("Unexpected append error: %s", err) + } + _, _, err = sl.append([]byte(`metric_a{b="1",a="1"} 2`), now.Add(time.Minute)) + if err != nil { + t.Fatalf("Unexpected append error: %s", err) + } + + // DeepEqual will report NaNs as being different, so replace with a different value. + want := []sample{ + { + metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"), + t: timestamp.FromTime(now), + v: 1, + }, + { + metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"), + t: timestamp.FromTime(now.Add(time.Minute)), + v: 2, + }, + } + if !reflect.DeepEqual(want, capp.result) { + t.Fatalf("Appended samples not as expected. Wanted: %+v Got: %+v", want, capp.result) + } +} + func TestScrapeLoopAppendStaleness(t *testing.T) { app := &collectResultAppender{}