mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-25 05:34:05 -08:00
TSDB: optimize series creation on PreCreation() failure (#8620)
Signed-off-by: Marco Pracucci <marco@pracucci.com>
This commit is contained in:
parent
4b5ab80ca6
commit
6248e685b2
27
tsdb/head.go
27
tsdb/head.go
|
@ -1871,7 +1871,7 @@ func (h *headIndexReader) LabelValueFor(id uint64, label string) (string, error)
|
|||
}
|
||||
|
||||
func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, error) {
|
||||
// Just using `getOrSet` below would be semantically sufficient, but we'd create
|
||||
// Just using `getOrCreateWithID` below would be semantically sufficient, but we'd create
|
||||
// a new series on every sample inserted via Add(), which causes allocations
|
||||
// and makes our series IDs rather random and harder to compress in postings.
|
||||
s := h.series.getByHash(hash, lset)
|
||||
|
@ -1886,9 +1886,9 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e
|
|||
}
|
||||
|
||||
func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool, error) {
|
||||
s := newMemSeries(lset, id, h.chunkRange.Load(), &h.memChunkPool)
|
||||
|
||||
s, created, err := h.series.getOrSet(hash, s)
|
||||
s, created, err := h.series.getOrSet(hash, lset, func() *memSeries {
|
||||
return newMemSeries(lset, id, h.chunkRange.Load(), &h.memChunkPool)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
@ -2077,27 +2077,34 @@ func (s *stripeSeries) getByHash(hash uint64, lset labels.Labels) *memSeries {
|
|||
return series
|
||||
}
|
||||
|
||||
func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, bool, error) {
|
||||
func (s *stripeSeries) getOrSet(hash uint64, lset labels.Labels, createSeries func() *memSeries) (*memSeries, bool, error) {
|
||||
// PreCreation is called here to avoid calling it inside the lock.
|
||||
// It is not necessary to call it just before creating a series,
|
||||
// rather it gives a 'hint' whether to create a series or not.
|
||||
createSeriesErr := s.seriesLifecycleCallback.PreCreation(series.lset)
|
||||
preCreationErr := s.seriesLifecycleCallback.PreCreation(lset)
|
||||
|
||||
// Create the series, unless the PreCreation() callback as failed.
|
||||
// If failed, we'll not allow to create a new series anyway.
|
||||
var series *memSeries
|
||||
if preCreationErr == nil {
|
||||
series = createSeries()
|
||||
}
|
||||
|
||||
i := hash & uint64(s.size-1)
|
||||
s.locks[i].Lock()
|
||||
|
||||
if prev := s.hashes[i].get(hash, series.lset); prev != nil {
|
||||
if prev := s.hashes[i].get(hash, lset); prev != nil {
|
||||
s.locks[i].Unlock()
|
||||
return prev, false, nil
|
||||
}
|
||||
if createSeriesErr == nil {
|
||||
if preCreationErr == nil {
|
||||
s.hashes[i].set(hash, series)
|
||||
}
|
||||
s.locks[i].Unlock()
|
||||
|
||||
if createSeriesErr != nil {
|
||||
if preCreationErr != nil {
|
||||
// The callback prevented creation of series.
|
||||
return nil, false, createSeriesErr
|
||||
return nil, false, preCreationErr
|
||||
}
|
||||
// Setting the series in the s.hashes marks the creation of series
|
||||
// as any further calls to this methods would return that series.
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/atomic"
|
||||
|
||||
|
@ -67,3 +68,32 @@ func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) {
|
|||
}
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkHeadStripeSeriesCreate_PreCreationFailure(b *testing.B) {
|
||||
chunkDir, err := ioutil.TempDir("", "chunk_dir")
|
||||
require.NoError(b, err)
|
||||
defer func() {
|
||||
require.NoError(b, os.RemoveAll(chunkDir))
|
||||
}()
|
||||
// Put a series, select it. GC it and then access it.
|
||||
opts := DefaultHeadOptions()
|
||||
opts.ChunkRange = 1000
|
||||
opts.ChunkDirRoot = chunkDir
|
||||
|
||||
// Mock the PreCreation() callback to fail on each series.
|
||||
opts.SeriesCallback = failingSeriesLifecycleCallback{}
|
||||
|
||||
h, err := NewHead(nil, nil, nil, opts)
|
||||
require.NoError(b, err)
|
||||
defer h.Close()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(i)))
|
||||
}
|
||||
}
|
||||
|
||||
type failingSeriesLifecycleCallback struct{}
|
||||
|
||||
func (failingSeriesLifecycleCallback) PreCreation(labels.Labels) error { return errors.New("failed") }
|
||||
func (failingSeriesLifecycleCallback) PostCreation(labels.Labels) {}
|
||||
func (failingSeriesLifecycleCallback) PostDeletion(...labels.Labels) {}
|
||||
|
|
Loading…
Reference in a new issue