Merge pull request #3192 from prometheus/scrapecache

Fix cache maintenance on changing metric representations

This commit is contained in: commit 18078e3fcc
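As background for the diff below, here is a minimal standalone Go sketch (illustrative only, not code from this PR) of the bug class being fixed: the same series can be exposed with its labels in a different order, so the raw exposition string changes between scrapes while the series itself does not. A cache keyed only by that string therefore also has to carry a hash over the (sorted) label set to recognize the series again.

package main

import (
	"fmt"
	"sort"
	"strings"
)

type label struct{ name, value string }

// canonicalKey sorts the label pairs so that both representations of the
// same series map to a single key. The helper names here are invented for
// illustration and are not Prometheus APIs.
func canonicalKey(lset []label) string {
	s := append([]label(nil), lset...)
	sort.Slice(s, func(i, j int) bool { return s[i].name < s[j].name })
	var b strings.Builder
	for _, l := range s {
		fmt.Fprintf(&b, "%s=%q,", l.name, l.value)
	}
	return b.String()
}

func main() {
	// Two scrapes exposing the same series with reordered labels.
	rep1 := []label{{"a", "1"}, {"b", "1"}}
	rep2 := []label{{"b", "1"}, {"a", "1"}}

	fmt.Println(fmt.Sprint(rep1) == fmt.Sprint(rep2))     // false: raw strings differ
	fmt.Println(canonicalKey(rep1) == canonicalKey(rep2)) // true: same series
}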
@@ -32,13 +32,27 @@ func (a nopAppender) AddFast(labels.Labels, uint64, int64, float64) error { retu
 func (a nopAppender) Commit() error { return nil }
 func (a nopAppender) Rollback() error { return nil }
 
+// collectResultAppender records all samples that were added through the appender.
+// It can be used as its zero value or be backed by another appender it writes samples through.
 type collectResultAppender struct {
+    next   storage.Appender
     result []sample
 }
 
 func (a *collectResultAppender) AddFast(m labels.Labels, ref uint64, t int64, v float64) error {
-    // Not implemented.
+    if a.next == nil {
         return storage.ErrNotFound
+    }
+    err := a.next.AddFast(m, ref, t, v)
+    if err != nil {
+        return err
+    }
+    a.result = append(a.result, sample{
+        metric: m,
+        t:      t,
+        v:      v,
+    })
+    return err
 }

@@ -47,7 +61,10 @@ func (a *collectResultAppender) Add(m labels.Labels, t int64, v float64) (uint64
         t:      t,
         v:      v,
     })
-    return 0, nil
+    if a.next == nil {
+        return 0, nil
+    }
+    return a.next.Add(m, t, v)
 }
 
 func (a *collectResultAppender) Commit() error { return nil }
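The reworked test appender above is a pass-through recorder. As a generic sketch of that decorator pattern (invented names, not the PR's types): forward each value to an optional backing sink and record only what was accepted, so a test can assert on exactly what reached real storage.

package main

import "fmt"

type sink interface{ add(v float64) error }

type nopSink struct{}

func (nopSink) add(float64) error { return nil }

// recordingSink forwards to next (when set) and keeps everything it accepted.
type recordingSink struct {
	next   sink
	result []float64
}

func (r *recordingSink) add(v float64) error {
	if r.next != nil {
		if err := r.next.add(v); err != nil {
			return err // don't record values the backing sink rejected
		}
	}
	r.result = append(r.result, v)
	return nil
}

func main() {
	r := &recordingSink{next: nopSink{}}
	r.add(1)
	r.add(2)
	fmt.Println(r.result) // [1 2]
}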
@@ -45,13 +45,6 @@ import (
     "github.com/prometheus/prometheus/util/httputil"
 )
 
-const (
-    scrapeHealthMetricName       = "up"
-    scrapeDurationMetricName     = "scrape_duration_seconds"
-    scrapeSamplesMetricName      = "scrape_samples_scraped"
-    samplesPostRelabelMetricName = "scrape_samples_post_metric_relabeling"
-)
-
 var (
     targetIntervalLength = prometheus.NewSummaryVec(
         prometheus.SummaryOpts{

@@ -460,15 +453,11 @@ type loop interface {
     stop()
 }
 
-type lsetCacheEntry struct {
-    metric string
-    lset   labels.Labels
-    hash   uint64
-}
-
-type refEntry struct {
+type cacheEntry struct {
     ref      uint64
     lastIter uint64
+    hash     uint64
+    lset     labels.Labels
 }
 
 type scrapeLoop struct {

@@ -494,8 +483,9 @@ type scrapeLoop struct {
 type scrapeCache struct {
     iter uint64 // Current scrape iteration.
 
-    refs  map[string]*refEntry       // Parsed string to ref.
-    lsets map[uint64]*lsetCacheEntry // Ref to labelset and string.
+    // Parsed string to an entry with information about the actual label set
+    // and its storage reference.
+    entries map[string]*cacheEntry
 
     // Cache of dropped metric strings and their iteration. The iteration must
     // be a pointer so we can update it without setting a new entry with an unsafe

@@ -511,8 +501,7 @@ type scrapeCache struct {
 
 func newScrapeCache() *scrapeCache {
     return &scrapeCache{
-        refs:       map[string]*refEntry{},
-        lsets:      map[uint64]*lsetCacheEntry{},
+        entries:    map[string]*cacheEntry{},
         dropped:    map[string]*uint64{},
         seriesCur:  map[uint64]labels.Labels{},
         seriesPrev: map[uint64]labels.Labels{},

@@ -523,14 +512,13 @@ func (c *scrapeCache) iterDone() {
     // refCache and lsetCache may grow over time through series churn
     // or multiple string representations of the same metric. Clean up entries
     // that haven't appeared in the last scrape.
-    for s, e := range c.refs {
-        if e.lastIter < c.iter {
-            delete(c.refs, s)
-            delete(c.lsets, e.ref)
+    for s, e := range c.entries {
+        if c.iter-e.lastIter > 2 {
+            delete(c.entries, s)
         }
     }
     for s, iter := range c.dropped {
-        if *iter < c.iter {
+        if c.iter-*iter > 2 {
             delete(c.dropped, s)
         }
     }

@@ -546,29 +534,20 @@ func (c *scrapeCache) iterDone() {
     c.iter++
 }
 
-func (c *scrapeCache) getRef(met string) (uint64, bool) {
-    e, ok := c.refs[met]
+func (c *scrapeCache) get(met string) (*cacheEntry, bool) {
+    e, ok := c.entries[met]
     if !ok {
-        return 0, false
+        return nil, false
     }
     e.lastIter = c.iter
-    return e.ref, true
+    return e, true
 }
 
 func (c *scrapeCache) addRef(met string, ref uint64, lset labels.Labels, hash uint64) {
     if ref == 0 {
         return
     }
-    // Clean up the label set cache before overwriting the ref for a previously seen
-    // metric representation. It won't be caught by the cleanup in iterDone otherwise.
-    if e, ok := c.refs[met]; ok {
-        delete(c.lsets, e.ref)
-    }
-    c.refs[met] = &refEntry{ref: ref, lastIter: c.iter}
-    // met is the raw string the metric was ingested as. The label set is not ordered
-    // and thus it's not suitable to uniquely identify cache entries.
-    // We store a hash over the label set instead.
-    c.lsets[ref] = &lsetCacheEntry{metric: met, lset: lset, hash: hash}
+    c.entries[met] = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash}
 }
 
 func (c *scrapeCache) addDropped(met string) {
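Taken together, the hunks above collapse the two maps (refs keyed by string, label sets keyed by ref) into a single entries map whose values carry ref, hash and label set, and switch eviction from "not seen in the last scrape" to a small grace period. A reduced sketch of that eviction scheme (simplified types, not the real scrapeCache): entries refresh lastIter on every hit and are dropped only once they have gone unused for more than two scrape iterations, which tolerates a metric briefly disappearing or changing its string form.

package main

import "fmt"

type entry struct {
	ref      uint64
	lastIter uint64
}

type cache struct {
	iter    uint64
	entries map[string]*entry
}

func (c *cache) get(met string) (*entry, bool) {
	e, ok := c.entries[met]
	if !ok {
		return nil, false
	}
	e.lastIter = c.iter // mark as seen this iteration
	return e, true
}

func (c *cache) iterDone() {
	for s, e := range c.entries {
		if c.iter-e.lastIter > 2 { // unused for more than 2 iterations
			delete(c.entries, s)
		}
	}
	c.iter++
}

func main() {
	c := &cache{entries: map[string]*entry{`m{a="1"}`: {ref: 1}}}
	for i := 0; i < 4; i++ {
		c.iterDone() // the entry is never touched again
	}
	fmt.Println(len(c.entries)) // 0: evicted after its grace period
}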
@@ -825,14 +804,12 @@ loop:
         if sl.cache.getDropped(yoloString(met)) {
             continue
         }
-        ref, ok := sl.cache.getRef(yoloString(met))
+        ce, ok := sl.cache.get(yoloString(met))
         if ok {
-            lset := sl.cache.lsets[ref].lset
-            switch err = app.AddFast(lset, ref, t, v); err {
+            switch err = app.AddFast(ce.lset, ce.ref, t, v); err {
             case nil:
                 if tp == nil {
-                    e := sl.cache.lsets[ref]
-                    sl.cache.trackStaleness(e.hash, e.lset)
+                    sl.cache.trackStaleness(ce.hash, ce.lset)
                 }
             case storage.ErrNotFound:
                 ok = false

@@ -862,28 +839,19 @@ loop:
             }
         }
         if !ok {
-            var (
-                lset labels.Labels
-                mets string
-                hash uint64
-            )
-            if e, ok := sl.cache.lsets[ref]; ok {
-                mets = e.metric
-                lset = e.lset
-                hash = e.hash
-            } else {
-                mets = p.Metric(&lset)
-                hash = lset.Hash()
-
-                // Hash label set as it is seen local to the target. Then add target labels
-                // and relabeling and store the final label set.
-                lset = sl.sampleMutator(lset)
-
-                // The label set may be set to nil to indicate dropping.
-                if lset == nil {
-                    sl.cache.addDropped(mets)
-                    continue
-                }
+            var lset labels.Labels
+
+            mets := p.Metric(&lset)
+            hash := lset.Hash()
+
+            // Hash label set as it is seen local to the target. Then add target labels
+            // and relabeling and store the final label set.
+            lset = sl.sampleMutator(lset)
+
+            // The label set may be set to nil to indicate dropping.
+            if lset == nil {
+                sl.cache.addDropped(mets)
+                continue
             }
 
             var ref uint64

@@ -970,6 +938,15 @@ func yoloString(b []byte) string {
     return *((*string)(unsafe.Pointer(&b)))
 }
 
+// The constants are suffixed with the invalid \xff unicode rune to avoid collisions
+// with scraped metrics in the cache.
+const (
+    scrapeHealthMetricName       = "up" + "\xff"
+    scrapeDurationMetricName     = "scrape_duration_seconds" + "\xff"
+    scrapeSamplesMetricName      = "scrape_samples_scraped" + "\xff"
+    samplesPostRelabelMetricName = "scrape_samples_post_metric_relabeling" + "\xff"
+)
+
 func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, appended int, err error) error {
     sl.scraper.report(start, duration, err)
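The new constants above let the synthetic report series share the scrape cache. A short sketch of why the "\xff" sentinel works: that byte can never occur in a valid UTF-8 exposition string, so a suffixed key cannot collide with a scraped metric that happens to be named "up", and the suffix is sliced off again when the real label value is built (as addReportSample does below).

package main

import (
	"fmt"
	"unicode/utf8"
)

// Suffix the cache key with a byte that is invalid in UTF-8 exposition text.
const scrapeHealthKey = "up" + "\xff"

func main() {
	fmt.Println(utf8.ValidString("up"))            // true: could come from a scrape
	fmt.Println(utf8.ValidString(scrapeHealthKey)) // false: cannot collide with one

	// Drop the sentinel when constructing the actual metric name.
	name := scrapeHealthKey[:len(scrapeHealthKey)-1]
	fmt.Println(name) // "up"
}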
@@ -1026,14 +1003,9 @@ func (sl *scrapeLoop) reportStale(start time.Time) error {
 }
 
 func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error {
-    // Suffix s with the invalid \xff unicode rune to avoid collisions
-    // with scraped metrics.
-    s2 := s + "\xff"
-
-    ref, ok := sl.cache.getRef(s2)
+    ce, ok := sl.cache.get(s)
     if ok {
-        lset := sl.cache.lsets[ref].lset
-        err := app.AddFast(lset, ref, t, v)
+        err := app.AddFast(ce.lset, ce.ref, t, v)
         switch err {
         case nil:
             return nil

@@ -1048,7 +1020,10 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v
         }
     }
     lset := labels.Labels{
-        labels.Label{Name: labels.MetricName, Value: s},
+        // The constants are suffixed with the invalid \xff unicode rune to avoid collisions
+        // with scraped metrics in the cache.
+        // We have to drop it when building the actual metric.
+        labels.Label{Name: labels.MetricName, Value: s[:len(s)-1]},
     }
 
     hash := lset.Hash()

@@ -1057,7 +1032,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v
     ref, err := app.Add(lset, t, v)
     switch err {
     case nil:
-        sl.cache.addRef(s2, ref, lset, hash)
+        sl.cache.addRef(s, ref, lset, hash)
         return nil
     case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
         return nil
@@ -37,6 +37,7 @@ import (
     "github.com/prometheus/prometheus/pkg/timestamp"
     "github.com/prometheus/prometheus/pkg/value"
     "github.com/prometheus/prometheus/storage"
+    "github.com/prometheus/prometheus/util/testutil"
 )
 
 func TestNewScrapePool(t *testing.T) {

@@ -625,6 +626,54 @@ func TestScrapeLoopAppend(t *testing.T) {
     }
 }
 
+func TestScrapeLoop_ChangingMetricString(t *testing.T) {
+    // This is a regression test for the scrape loop cache not properly maintaining
+    // IDs when the string representation of a metric changes across a scrape. Thus
+    // we use a real storage appender here.
+    s := testutil.NewStorage(t)
+    defer s.Close()
+
+    app, err := s.Appender()
+    if err != nil {
+        t.Error(err)
+    }
+    capp := &collectResultAppender{next: app}
+
+    sl := newScrapeLoop(context.Background(),
+        nil, nil, nil,
+        nopMutator,
+        nopMutator,
+        func() storage.Appender { return capp },
+    )
+
+    now := time.Now()
+    _, _, err = sl.append([]byte(`metric_a{a="1",b="1"} 1`), now)
+    if err != nil {
+        t.Fatalf("Unexpected append error: %s", err)
+    }
+    _, _, err = sl.append([]byte(`metric_a{b="1",a="1"} 2`), now.Add(time.Minute))
+    if err != nil {
+        t.Fatalf("Unexpected append error: %s", err)
+    }
+
+    // DeepEqual will report NaNs as being different, so replace with a different value.
+    want := []sample{
+        {
+            metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
+            t:      timestamp.FromTime(now),
+            v:      1,
+        },
+        {
+            metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
+            t:      timestamp.FromTime(now.Add(time.Minute)),
+            v:      2,
+        },
+    }
+    if !reflect.DeepEqual(want, capp.result) {
+        t.Fatalf("Appended samples not as expected. Wanted: %+v Got: %+v", want, capp.result)
+    }
+}
+
 func TestScrapeLoopAppendStaleness(t *testing.T) {
     app := &collectResultAppender{}
vendor/github.com/prometheus/tsdb/block.go (5 changes, generated, vendored)

@@ -64,11 +64,6 @@ type Appendable interface {
     Appender() Appender
 }
 
-// Queryable defines an entity which provides a Querier.
-type Queryable interface {
-    Querier(mint, maxt int64) Querier
-}
-
 // BlockMeta provides meta information about a block.
 type BlockMeta struct {
     // Unique identifier for the block and its contents. Changes on compaction.
vendor/github.com/prometheus/tsdb/db.go (3 changes, generated, vendored)

@@ -165,8 +165,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
         return nil, err
     }
     if l == nil {
-        l = log.NewLogfmtLogger(os.Stdout)
-        l = log.With(l, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
+        l = log.NewNopLogger()
     }
     if opts == nil {
         opts = DefaultOptions
vendor/github.com/prometheus/tsdb/head.go (46 changes, generated, vendored)

@@ -185,13 +185,18 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
     return h, nil
 }
 
+// ReadWAL initializes the head by consuming the write ahead log.
 func (h *Head) ReadWAL() error {
     r := h.wal.Reader()
     mint := h.MinTime()
 
     seriesFunc := func(series []RefSeries) error {
         for _, s := range series {
-            h.create(s.Labels.Hash(), s.Labels)
+            h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
+
+            if h.lastSeriesID < s.Ref {
+                h.lastSeriesID = s.Ref
+            }
         }
         return nil
     }

@@ -202,7 +207,8 @@ func (h *Head) ReadWAL() error {
             }
             ms := h.series.getByID(s.Ref)
             if ms == nil {
-                return errors.Errorf("unknown series reference %d; abort WAL restore", s.Ref)
+                h.logger.Log("msg", "unknown series reference in WAL", "ref", s.Ref)
+                continue
             }
             _, chunkCreated := ms.append(s.T, s.V)
             if chunkCreated {

@@ -210,7 +216,6 @@ func (h *Head) ReadWAL() error {
                 h.metrics.chunks.Inc()
             }
         }
-
         return nil
     }
     deletesFunc := func(stones []Stone) error {

@@ -222,7 +227,6 @@ func (h *Head) ReadWAL() error {
                 h.tombstones.add(s.ref, itv)
             }
         }
-
         return nil
     }
 

@@ -379,17 +383,12 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro
     if t < a.mint {
         return 0, ErrOutOfBounds
     }
-    hash := lset.Hash()
-
-    s := a.head.series.getByHash(hash, lset)
-
-    if s == nil {
-        s = a.head.create(hash, lset)
-
+
+    s, created := a.head.getOrCreate(lset.Hash(), lset)
+    if created {
         a.series = append(a.series, RefSeries{
             Ref:    s.ref,
             Labels: lset,
-            hash:   hash,
         })
     }
     return s.ref, a.AddFast(s.ref, t, v)

@@ -839,20 +838,32 @@ func (h *headIndexReader) LabelIndices() ([][]string, error) {
     return res, nil
 }
 
-func (h *Head) create(hash uint64, lset labels.Labels) *memSeries {
-    h.metrics.series.Inc()
-    h.metrics.seriesCreated.Inc()
+func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool) {
+    // Just using `getOrSet` below would be semantically sufficient, but we'd create
+    // a new series on every sample inserted via Add(), which causes allocations
+    // and makes our series IDs rather random and harder to compress in postings.
+    s := h.series.getByHash(hash, lset)
+    if s != nil {
+        return s, false
+    }
 
     // Optimistically assume that we are the first one to create the series.
     id := atomic.AddUint64(&h.lastSeriesID, 1)
 
+    return h.getOrCreateWithID(id, hash, lset)
+}
+
+func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool) {
     s := newMemSeries(lset, id, h.chunkRange)
 
     s, created := h.series.getOrSet(hash, s)
-    // Skip indexing if we didn't actually create the series.
     if !created {
-        return s
+        return s, false
     }
 
+    h.metrics.series.Inc()
+    h.metrics.seriesCreated.Inc()
+
     h.postings.add(id, lset)
 
     h.symMtx.Lock()

@@ -870,7 +881,7 @@ func (h *Head) create(hash uint64, lset labels.Labels) *memSeries {
         h.symbols[l.Value] = struct{}{}
     }
 
-    return s
+    return s, true
 }
 
 // seriesHashmap is a simple hashmap for memSeries by their label set. It is built

@@ -1023,6 +1034,7 @@ func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, boo
     s.locks[i].Lock()
 
     if prev := s.hashes[i].get(hash, series.lset); prev != nil {
+        s.locks[i].Unlock()
        return prev, false
     }
     s.hashes[i].set(hash, series)
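One of the head.go hunks above also fixes a lock leak: stripeSeries.getOrSet used to return on the "already exists" path while still holding the stripe mutex, so the next access to the same stripe would block forever. A minimal sketch of the pattern (a plain map standing in for the real stripeSeries): every return path must release the lock it took.

package main

import (
	"fmt"
	"sync"
)

type stripeMap struct {
	mu sync.Mutex
	m  map[uint64]string
}

func (s *stripeMap) getOrSet(hash uint64, v string) (string, bool) {
	s.mu.Lock()
	if prev, ok := s.m[hash]; ok {
		s.mu.Unlock() // the fixed line: unlock before the early return
		return prev, false
	}
	s.m[hash] = v
	s.mu.Unlock()
	return v, true
}

func main() {
	s := &stripeMap{m: map[uint64]string{}}
	fmt.Println(s.getOrSet(1, "a")) // a true
	fmt.Println(s.getOrSet(1, "b")) // a false, and no deadlock on reuse
}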
vendor/github.com/prometheus/tsdb/index.go (3 changes, generated, vendored)

@@ -570,6 +570,9 @@ var (
     errInvalidFlag = fmt.Errorf("invalid flag")
 )
 
+// NewIndexReader returns a new IndexReader on the given directory.
+func NewIndexReader(dir string) (IndexReader, error) { return newIndexReader(dir) }
+
 // newIndexReader returns a new indexReader on the given directory.
 func newIndexReader(dir string) (*indexReader, error) {
     f, err := openMmapFile(filepath.Join(dir, "index"))
vendor/github.com/prometheus/tsdb/wal.go (15 changes, generated, vendored)

@@ -99,9 +99,6 @@ type WALReader interface {
 type RefSeries struct {
     Ref    uint64
     Labels labels.Labels
-
-    // hash for the label set. This field is not generally populated.
-    hash uint64
 }
 
 // RefSample is a timestamp/value pair associated with a reference to a series.

@@ -827,7 +824,9 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC
             if err != nil {
                 return errors.Wrap(err, "decode series entry")
             }
-            seriesf(series)
+            if err := seriesf(series); err != nil {
+                return err
+            }
 
             cf := r.current()

@@ -842,7 +841,9 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC
             if err != nil {
                 return errors.Wrap(err, "decode samples entry")
             }
-            samplesf(samples)
+            if err := samplesf(samples); err != nil {
+                return err
+            }
 
             // Update the times for the WAL segment file.
             cf := r.current()

@@ -858,7 +859,9 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC
             if err != nil {
                 return errors.Wrap(err, "decode delete entry")
             }
-            deletesf(stones)
+            if err := deletesf(stones); err != nil {
+                return err
+            }
             // Update the times for the WAL segment file.
 
             cf := r.current()
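The wal.go hunks change the reader to propagate callback errors instead of discarding them, which is what lets ReadWAL decide per entry whether to log-and-continue or abort. A compact sketch of the pattern (illustrative types; the real callbacks are SeriesCB, SamplesCB and DeletesCB):

package main

import (
	"errors"
	"fmt"
)

// readAll feeds each record to the handler and stops at the first error,
// instead of silently dropping the handler's return value.
func readAll(records []string, handle func(string) error) error {
	for _, rec := range records {
		if err := handle(rec); err != nil {
			return err // propagate rather than ignore
		}
	}
	return nil
}

func main() {
	err := readAll([]string{"series", "bogus"}, func(rec string) error {
		if rec == "bogus" {
			return errors.New("unknown series reference")
		}
		return nil
	})
	fmt.Println(err) // unknown series reference
}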
vendor/vendor.json (14 changes, vendored)

@@ -871,22 +871,22 @@
             "revisionTime": "2016-04-11T19:08:41Z"
         },
         {
-            "checksumSHA1": "mDKxPAubVLTWW/Gar13m7YDHSek=",
+            "checksumSHA1": "B5ndMoK8lqgFJ8xUZ/0V4zCpUw0=",
             "path": "github.com/prometheus/tsdb",
-            "revision": "3870ec285c4640d462a0ad80e7acbcdf1e939563",
-            "revisionTime": "2017-09-11T08:41:33Z"
+            "revision": "162a48e4f2c6e486a0ebf61cf9cea73a8023ef0a",
+            "revisionTime": "2017-09-19T08:20:19Z"
         },
         {
             "checksumSHA1": "Gua979gmISm4cJP/fR2hL8m5To8=",
             "path": "github.com/prometheus/tsdb/chunks",
-            "revision": "3870ec285c4640d462a0ad80e7acbcdf1e939563",
-            "revisionTime": "2017-09-11T08:41:33Z"
+            "revision": "162a48e4f2c6e486a0ebf61cf9cea73a8023ef0a",
+            "revisionTime": "2017-09-19T08:20:19Z"
         },
         {
             "checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=",
             "path": "github.com/prometheus/tsdb/labels",
-            "revision": "3870ec285c4640d462a0ad80e7acbcdf1e939563",
-            "revisionTime": "2017-09-11T08:41:33Z"
+            "revision": "162a48e4f2c6e486a0ebf61cf9cea73a8023ef0a",
+            "revisionTime": "2017-09-19T08:20:19Z"
         },
         {
             "checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",