mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Simplify series create logic in head
This commit is contained in:
parent
ab8d9b9706
commit
7ada9cd805
33
head.go
33
head.go
|
@ -185,13 +185,14 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
|
||||||
return h, nil
|
return h, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReadWAL initializes the head by consuming the write ahead log.
|
||||||
func (h *Head) ReadWAL() error {
|
func (h *Head) ReadWAL() error {
|
||||||
r := h.wal.Reader()
|
r := h.wal.Reader()
|
||||||
mint := h.MinTime()
|
mint := h.MinTime()
|
||||||
|
|
||||||
seriesFunc := func(series []RefSeries) error {
|
seriesFunc := func(series []RefSeries) error {
|
||||||
for _, s := range series {
|
for _, s := range series {
|
||||||
h.create(s.Labels.Hash(), s.Labels)
|
h.getOrCreate(s.Labels.Hash(), s.Labels)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -379,17 +380,12 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro
|
||||||
if t < a.mint {
|
if t < a.mint {
|
||||||
return 0, ErrOutOfBounds
|
return 0, ErrOutOfBounds
|
||||||
}
|
}
|
||||||
hash := lset.Hash()
|
|
||||||
|
|
||||||
s := a.head.series.getByHash(hash, lset)
|
|
||||||
|
|
||||||
if s == nil {
|
|
||||||
s = a.head.create(hash, lset)
|
|
||||||
|
|
||||||
|
s, created := a.head.getOrCreate(lset.Hash(), lset)
|
||||||
|
if created {
|
||||||
a.series = append(a.series, RefSeries{
|
a.series = append(a.series, RefSeries{
|
||||||
Ref: s.ref,
|
Ref: s.ref,
|
||||||
Labels: lset,
|
Labels: lset,
|
||||||
hash: hash,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
return s.ref, a.AddFast(s.ref, t, v)
|
return s.ref, a.AddFast(s.ref, t, v)
|
||||||
|
@ -839,20 +835,27 @@ func (h *headIndexReader) LabelIndices() ([][]string, error) {
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Head) create(hash uint64, lset labels.Labels) *memSeries {
|
func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool) {
|
||||||
h.metrics.series.Inc()
|
// Just using `getOrSet` below would be semantically sufficient, but we'd create
|
||||||
h.metrics.seriesCreated.Inc()
|
// a new series on every sample inserted via Add(), which causes allocations
|
||||||
|
// and makes our series IDs rather random and harder to compress in postings.
|
||||||
|
s := h.series.getByHash(hash, lset)
|
||||||
|
if s != nil {
|
||||||
|
return s, false
|
||||||
|
}
|
||||||
|
|
||||||
// Optimistically assume that we are the first one to create the series.
|
// Optimistically assume that we are the first one to create the series.
|
||||||
id := atomic.AddUint64(&h.lastSeriesID, 1)
|
id := atomic.AddUint64(&h.lastSeriesID, 1)
|
||||||
s := newMemSeries(lset, id, h.chunkRange)
|
s = newMemSeries(lset, id, h.chunkRange)
|
||||||
|
|
||||||
s, created := h.series.getOrSet(hash, s)
|
s, created := h.series.getOrSet(hash, s)
|
||||||
// Skip indexing if we didn't actually create the series.
|
|
||||||
if !created {
|
if !created {
|
||||||
return s
|
return s, false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
h.metrics.series.Inc()
|
||||||
|
h.metrics.seriesCreated.Inc()
|
||||||
|
|
||||||
h.postings.add(id, lset)
|
h.postings.add(id, lset)
|
||||||
|
|
||||||
h.symMtx.Lock()
|
h.symMtx.Lock()
|
||||||
|
@ -870,7 +873,7 @@ func (h *Head) create(hash uint64, lset labels.Labels) *memSeries {
|
||||||
h.symbols[l.Value] = struct{}{}
|
h.symbols[l.Value] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
return s
|
return s, true
|
||||||
}
|
}
|
||||||
|
|
||||||
// seriesHashmap is a simple hashmap for memSeries by their label set. It is built
|
// seriesHashmap is a simple hashmap for memSeries by their label set. It is built
|
||||||
|
|
10
head_test.go
10
head_test.go
|
@ -41,7 +41,7 @@ func BenchmarkCreateSeries(b *testing.B) {
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
|
||||||
for _, l := range lbls {
|
for _, l := range lbls {
|
||||||
h.create(l.Hash(), l)
|
h.getOrCreate(l.Hash(), l)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -89,10 +89,10 @@ func TestHead_Truncate(t *testing.T) {
|
||||||
|
|
||||||
h.initTime(0)
|
h.initTime(0)
|
||||||
|
|
||||||
s1 := h.create(1, labels.FromStrings("a", "1", "b", "1"))
|
s1, _ := h.getOrCreate(1, labels.FromStrings("a", "1", "b", "1"))
|
||||||
s2 := h.create(2, labels.FromStrings("a", "2", "b", "1"))
|
s2, _ := h.getOrCreate(2, labels.FromStrings("a", "2", "b", "1"))
|
||||||
s3 := h.create(3, labels.FromStrings("a", "1", "b", "2"))
|
s3, _ := h.getOrCreate(3, labels.FromStrings("a", "1", "b", "2"))
|
||||||
s4 := h.create(4, labels.FromStrings("a", "2", "b", "2", "c", "1"))
|
s4, _ := h.getOrCreate(4, labels.FromStrings("a", "2", "b", "2", "c", "1"))
|
||||||
|
|
||||||
s1.chunks = []*memChunk{
|
s1.chunks = []*memChunk{
|
||||||
{minTime: 0, maxTime: 999},
|
{minTime: 0, maxTime: 999},
|
||||||
|
|
3
wal.go
3
wal.go
|
@ -99,9 +99,6 @@ type WALReader interface {
|
||||||
type RefSeries struct {
|
type RefSeries struct {
|
||||||
Ref uint64
|
Ref uint64
|
||||||
Labels labels.Labels
|
Labels labels.Labels
|
||||||
|
|
||||||
// hash for the label set. This field is not generally populated.
|
|
||||||
hash uint64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// RefSample is a timestamp/value pair associated with a reference to a series.
|
// RefSample is a timestamp/value pair associated with a reference to a series.
|
||||||
|
|
Loading…
Reference in a new issue