diff --git a/tsdb/block_test.go b/tsdb/block_test.go index ef19468a34..3a2fe3af91 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -320,7 +320,7 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string { } func createHead(tb testing.TB, series []Series) *Head { - head, err := NewHead(nil, nil, nil, 2*60*60*1000) + head, err := NewHead(nil, nil, nil, 2*60*60*1000, DefaultStripeSize) testutil.Ok(tb, err) defer head.Close() diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index d4ebd65b0c..515c3f4ef5 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -870,7 +870,7 @@ func BenchmarkCompactionFromHead(b *testing.B) { for labelNames := 1; labelNames < totalSeries; labelNames *= 10 { labelValues := totalSeries / labelNames b.Run(fmt.Sprintf("labelnames=%d,labelvalues=%d", labelNames, labelValues), func(b *testing.B) { - h, err := NewHead(nil, nil, nil, 1000) + h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) testutil.Ok(b, err) for ln := 0; ln < labelNames; ln++ { app := h.Appender() diff --git a/tsdb/db.go b/tsdb/db.go index c42bda493d..d1cd35cf65 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -57,6 +57,7 @@ var DefaultOptions = &Options{ NoLockfile: false, AllowOverlappingBlocks: false, WALCompression: false, + StripeSize: DefaultStripeSize, } // Options of the DB storage. @@ -89,6 +90,9 @@ type Options struct { // WALCompression will turn on Snappy compression for records on the WAL. WALCompression bool + + // StripeSize is the size in entries of the series hash map. Reducing the size will save memory but impact perfomance. + StripeSize int } // Appender allows appending a batch of data. It must be completed with a @@ -309,7 +313,7 @@ func (db *DBReadOnly) FlushWAL(dir string) error { if err != nil { return err } - head, err := NewHead(nil, db.logger, w, 1) + head, err := NewHead(nil, db.logger, w, 1, DefaultStripeSize) if err != nil { return err } @@ -356,7 +360,7 @@ func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { blocks[i] = b } - head, err := NewHead(nil, db.logger, nil, 1) + head, err := NewHead(nil, db.logger, nil, 1, DefaultStripeSize) if err != nil { return nil, err } @@ -371,7 +375,7 @@ func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { if err != nil { return nil, err } - head, err = NewHead(nil, db.logger, w, 1) + head, err = NewHead(nil, db.logger, w, 1, DefaultStripeSize) if err != nil { return nil, err } @@ -488,6 +492,9 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db if opts == nil { opts = DefaultOptions } + if opts.StripeSize <= 0 { + opts.StripeSize = DefaultStripeSize + } // Fixup bad format written by Prometheus 2.1. if err := repairBadIndexVersion(l, dir); err != nil { return nil, err @@ -549,7 +556,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db } } - db.head, err = NewHead(r, l, wlog, opts.BlockRanges[0]) + db.head, err = NewHead(r, l, wlog, opts.BlockRanges[0], opts.StripeSize) if err != nil { return nil, err } diff --git a/tsdb/head.go b/tsdb/head.go index 2bf250894b..ce4a367192 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -257,7 +257,10 @@ func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.Postings } // NewHead opens the head block in dir. -func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64) (*Head, error) { +// stripeSize sets the number of entries in the hash map, it must be a power of 2. +// A larger stripeSize will allocate more memory up-front, but will increase performance when handling a large number of series. +// A smaller stripeSize reduces the memory allocated, but can decrease performance with large number of series. +func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64, stripeSize int) (*Head, error) { if l == nil { l = log.NewNopLogger() } @@ -270,7 +273,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int chunkRange: chunkRange, minTime: math.MaxInt64, maxTime: math.MinInt64, - series: newStripeSeries(), + series: newStripeSeries(stripeSize), values: map[string]stringset{}, symbols: map[string]struct{}{}, postings: index.NewUnorderedMemPostings(), @@ -1491,29 +1494,35 @@ func (m seriesHashmap) del(hash uint64, lset labels.Labels) { } } +const ( + // DefaultStripeSize is the default number of entries to allocate in the stripeSeries hash map. + DefaultStripeSize = 1 << 14 +) + // stripeSeries locks modulo ranges of IDs and hashes to reduce lock contention. // The locks are padded to not be on the same cache line. Filling the padded space // with the maps was profiled to be slower – likely due to the additional pointer // dereferences. type stripeSeries struct { - series [stripeSize]map[uint64]*memSeries - hashes [stripeSize]seriesHashmap - locks [stripeSize]stripeLock + size int + series []map[uint64]*memSeries + hashes []seriesHashmap + locks []stripeLock } -const ( - stripeSize = 1 << 14 - stripeMask = stripeSize - 1 -) - type stripeLock struct { sync.RWMutex // Padding to avoid multiple locks being on the same cache line. _ [40]byte } -func newStripeSeries() *stripeSeries { - s := &stripeSeries{} +func newStripeSeries(stripeSize int) *stripeSeries { + s := &stripeSeries{ + size: stripeSize, + series: make([]map[uint64]*memSeries, stripeSize), + hashes: make([]seriesHashmap, stripeSize), + locks: make([]stripeLock, stripeSize), + } for i := range s.series { s.series[i] = map[uint64]*memSeries{} @@ -1533,7 +1542,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) { ) // Run through all series and truncate old chunks. Mark those with no // chunks left as deleted and store their ID. - for i := 0; i < stripeSize; i++ { + for i := 0; i < s.size; i++ { s.locks[i].Lock() for hash, all := range s.hashes[i] { @@ -1551,7 +1560,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) { // series alike. // If we don't hold them all, there's a very small chance that a series receives // samples again while we are half-way into deleting it. - j := int(series.ref & stripeMask) + j := int(series.ref) & (s.size - 1) if i != j { s.locks[j].Lock() @@ -1576,7 +1585,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) { } func (s *stripeSeries) getByID(id uint64) *memSeries { - i := id & stripeMask + i := id & uint64(s.size-1) s.locks[i].RLock() series := s.series[i][id] @@ -1586,7 +1595,7 @@ func (s *stripeSeries) getByID(id uint64) *memSeries { } func (s *stripeSeries) getByHash(hash uint64, lset labels.Labels) *memSeries { - i := hash & stripeMask + i := hash & uint64(s.size-1) s.locks[i].RLock() series := s.hashes[i].get(hash, lset) @@ -1596,7 +1605,7 @@ func (s *stripeSeries) getByHash(hash uint64, lset labels.Labels) *memSeries { } func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, bool) { - i := hash & stripeMask + i := hash & uint64(s.size-1) s.locks[i].Lock() @@ -1607,7 +1616,7 @@ func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, boo s.hashes[i].set(hash, series) s.locks[i].Unlock() - i = series.ref & stripeMask + i = series.ref & uint64(s.size-1) s.locks[i].Lock() s.series[i][series.ref] = series diff --git a/tsdb/head_bench_test.go b/tsdb/head_bench_test.go index 0f807814ed..c1ada34171 100644 --- a/tsdb/head_bench_test.go +++ b/tsdb/head_bench_test.go @@ -24,7 +24,7 @@ import ( func BenchmarkHeadStripeSeriesCreate(b *testing.B) { // Put a series, select it. GC it and then access it. - h, err := NewHead(nil, nil, nil, 1000) + h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) testutil.Ok(b, err) defer h.Close() @@ -35,7 +35,7 @@ func BenchmarkHeadStripeSeriesCreate(b *testing.B) { func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) { // Put a series, select it. GC it and then access it. - h, err := NewHead(nil, nil, nil, 1000) + h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) testutil.Ok(b, err) defer h.Close() diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 70a0f56e85..164666db5e 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -41,7 +41,7 @@ import ( func BenchmarkCreateSeries(b *testing.B) { series := genSeries(b.N, 10, 0, 0) - h, err := NewHead(nil, nil, nil, 10000) + h, err := NewHead(nil, nil, nil, 10000, DefaultStripeSize) testutil.Ok(b, err) defer h.Close() @@ -171,7 +171,7 @@ func BenchmarkLoadWAL(b *testing.B) { // Load the WAL. for i := 0; i < b.N; i++ { - h, err := NewHead(nil, nil, w, 1000) + h, err := NewHead(nil, nil, w, 10000, DefaultStripeSize) testutil.Ok(b, err) h.Init(0) } @@ -218,7 +218,7 @@ func TestHead_ReadWAL(t *testing.T) { defer w.Close() populateTestWAL(t, w, entries) - head, err := NewHead(nil, nil, w, 1000) + head, err := NewHead(nil, nil, w, 10000, DefaultStripeSize) testutil.Ok(t, err) testutil.Ok(t, head.Init(math.MinInt64)) @@ -259,7 +259,7 @@ func TestHead_WALMultiRef(t *testing.T) { w, err := wal.New(nil, nil, dir, false) testutil.Ok(t, err) - head, err := NewHead(nil, nil, w, 1000) + head, err := NewHead(nil, nil, w, 10000, DefaultStripeSize) testutil.Ok(t, err) testutil.Ok(t, head.Init(0)) @@ -283,7 +283,7 @@ func TestHead_WALMultiRef(t *testing.T) { w, err = wal.New(nil, nil, dir, false) testutil.Ok(t, err) - head, err = NewHead(nil, nil, w, 1000) + head, err = NewHead(nil, nil, w, 10000, DefaultStripeSize) testutil.Ok(t, err) testutil.Ok(t, head.Init(0)) defer head.Close() @@ -295,7 +295,7 @@ func TestHead_WALMultiRef(t *testing.T) { } func TestHead_Truncate(t *testing.T) { - h, err := NewHead(nil, nil, nil, 1000) + h, err := NewHead(nil, nil, nil, 10000, DefaultStripeSize) testutil.Ok(t, err) defer h.Close() @@ -432,7 +432,7 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { defer w.Close() populateTestWAL(t, w, entries) - head, err := NewHead(nil, nil, w, 1000) + head, err := NewHead(nil, nil, w, 1000, DefaultStripeSize) testutil.Ok(t, err) testutil.Ok(t, head.Init(math.MinInt64)) @@ -506,7 +506,7 @@ func TestHeadDeleteSimple(t *testing.T) { testutil.Ok(t, err) defer w.Close() - head, err := NewHead(nil, nil, w, 1000) + head, err := NewHead(nil, nil, w, 1000, DefaultStripeSize) testutil.Ok(t, err) defer head.Close() @@ -536,7 +536,7 @@ func TestHeadDeleteSimple(t *testing.T) { reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reload. testutil.Ok(t, err) defer reloadedW.Close() - reloadedHead, err := NewHead(nil, nil, reloadedW, 1000) + reloadedHead, err := NewHead(nil, nil, reloadedW, 1000, DefaultStripeSize) testutil.Ok(t, err) defer reloadedHead.Close() testutil.Ok(t, reloadedHead.Init(0)) @@ -585,7 +585,7 @@ func TestHeadDeleteSimple(t *testing.T) { func TestDeleteUntilCurMax(t *testing.T) { numSamples := int64(10) - hb, err := NewHead(nil, nil, nil, 1000000) + hb, err := NewHead(nil, nil, nil, 1000000, DefaultStripeSize) testutil.Ok(t, err) defer hb.Close() app := hb.Appender() @@ -636,7 +636,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) { // Enough samples to cause a checkpoint. numSamples := 10000 - hb, err := NewHead(nil, nil, wlog, int64(numSamples)*10) + hb, err := NewHead(nil, nil, wlog, int64(numSamples)*10, DefaultStripeSize) testutil.Ok(t, err) defer hb.Close() for i := 0; i < numSamples; i++ { @@ -730,7 +730,7 @@ func TestDelete_e2e(t *testing.T) { defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - hb, err := NewHead(nil, nil, nil, 100000) + hb, err := NewHead(nil, nil, nil, 100000, DefaultStripeSize) testutil.Ok(t, err) defer hb.Close() app := hb.Appender() @@ -952,7 +952,7 @@ func TestMemSeries_append(t *testing.T) { func TestGCChunkAccess(t *testing.T) { // Put a chunk, select it. GC it and then access it. - h, err := NewHead(nil, nil, nil, 1000) + h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) testutil.Ok(t, err) defer h.Close() @@ -992,7 +992,7 @@ func TestGCChunkAccess(t *testing.T) { func TestGCSeriesAccess(t *testing.T) { // Put a series, select it. GC it and then access it. - h, err := NewHead(nil, nil, nil, 1000) + h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) testutil.Ok(t, err) defer h.Close() @@ -1033,7 +1033,7 @@ func TestGCSeriesAccess(t *testing.T) { } func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { - h, err := NewHead(nil, nil, nil, 1000) + h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) testutil.Ok(t, err) defer h.Close() @@ -1060,7 +1060,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { } func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { - h, err := NewHead(nil, nil, nil, 1000) + h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) testutil.Ok(t, err) defer h.Close() @@ -1102,7 +1102,7 @@ func TestHead_LogRollback(t *testing.T) { w, err := wal.New(nil, nil, dir, compress) testutil.Ok(t, err) defer w.Close() - h, err := NewHead(nil, nil, w, 1000) + h, err := NewHead(nil, nil, w, 1000, DefaultStripeSize) testutil.Ok(t, err) app := h.Appender() @@ -1190,7 +1190,7 @@ func TestWalRepair_DecodingError(t *testing.T) { testutil.Ok(t, w.Log(test.rec)) } - h, err := NewHead(nil, nil, w, 1) + h, err := NewHead(nil, nil, w, 1, DefaultStripeSize) testutil.Ok(t, err) testutil.Equals(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal)) initErr := h.Init(math.MinInt64) @@ -1239,7 +1239,7 @@ func TestNewWalSegmentOnTruncate(t *testing.T) { wlog, err := wal.NewSize(nil, nil, dir, 32768, false) testutil.Ok(t, err) - h, err := NewHead(nil, nil, wlog, 1000) + h, err := NewHead(nil, nil, wlog, 1000, DefaultStripeSize) testutil.Ok(t, err) defer h.Close() add := func(ts int64) { diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go index 39f3b7c901..bd7364b451 100644 --- a/tsdb/querier_bench_test.go +++ b/tsdb/querier_bench_test.go @@ -30,7 +30,7 @@ const ( ) func BenchmarkPostingsForMatchers(b *testing.B) { - h, err := NewHead(nil, nil, nil, 1000) + h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) testutil.Ok(b, err) defer func() { testutil.Ok(b, h.Close()) @@ -126,7 +126,7 @@ func benchmarkPostingsForMatchers(b *testing.B, ir IndexReader) { } func BenchmarkQuerierSelect(b *testing.B) { - h, err := NewHead(nil, nil, nil, 1000) + h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) testutil.Ok(b, err) defer h.Close() app := h.Appender() diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 44782ce86e..f391dc7917 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -1812,7 +1812,7 @@ func TestFindSetMatches(t *testing.T) { } func TestPostingsForMatchers(t *testing.T) { - h, err := NewHead(nil, nil, nil, 1000) + h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) testutil.Ok(t, err) defer func() { testutil.Ok(t, h.Close()) diff --git a/tsdb/tsdbblockutil.go b/tsdb/tsdbblockutil.go index eb17560195..891876bcaa 100644 --- a/tsdb/tsdbblockutil.go +++ b/tsdb/tsdbblockutil.go @@ -16,10 +16,11 @@ package tsdb import ( "context" "fmt" - "github.com/go-kit/kit/log" - "github.com/prometheus/prometheus/pkg/labels" "os" "path/filepath" + + "github.com/go-kit/kit/log" + "github.com/prometheus/prometheus/pkg/labels" ) var InvalidTimesError = fmt.Errorf("max time is lesser than min time") @@ -32,7 +33,7 @@ type MetricSample struct { // CreateHead creates a TSDB writer head to write the sample data to. func CreateHead(samples []*MetricSample, chunkRange int64, logger log.Logger) (*Head, error) { - head, err := NewHead(nil, logger, nil, chunkRange) + head, err := NewHead(nil, logger, nil, chunkRange, DefaultStripeSize) if err != nil { return nil, err } diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index fd6d490003..bc4d710584 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -1877,7 +1877,7 @@ func (f *fakeDB) Dir() string { } func (f *fakeDB) Snapshot(dir string, withHead bool) error { return f.err } func (f *fakeDB) Head() *tsdb.Head { - h, _ := tsdb.NewHead(nil, nil, nil, 1000) + h, _ := tsdb.NewHead(nil, nil, nil, 1000, tsdb.DefaultStripeSize) return h }