made stripe size configurable (#6644)

Signed-off-by: Thor <thansen@digitalocean.com>
This commit is contained in:
Thor 2020-01-30 07:12:43 +00:00 committed by GitHub
parent 9c67fce6e0
commit 17d8c49919
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 69 additions and 52 deletions

View file

@ -320,7 +320,7 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
}
func createHead(tb testing.TB, series []Series) *Head {
head, err := NewHead(nil, nil, nil, 2*60*60*1000)
head, err := NewHead(nil, nil, nil, 2*60*60*1000, DefaultStripeSize)
testutil.Ok(tb, err)
defer head.Close()

View file

@ -870,7 +870,7 @@ func BenchmarkCompactionFromHead(b *testing.B) {
for labelNames := 1; labelNames < totalSeries; labelNames *= 10 {
labelValues := totalSeries / labelNames
b.Run(fmt.Sprintf("labelnames=%d,labelvalues=%d", labelNames, labelValues), func(b *testing.B) {
h, err := NewHead(nil, nil, nil, 1000)
h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize)
testutil.Ok(b, err)
for ln := 0; ln < labelNames; ln++ {
app := h.Appender()

View file

@ -57,6 +57,7 @@ var DefaultOptions = &Options{
NoLockfile: false,
AllowOverlappingBlocks: false,
WALCompression: false,
StripeSize: DefaultStripeSize,
}
// Options of the DB storage.
@ -89,6 +90,9 @@ type Options struct {
// WALCompression will turn on Snappy compression for records on the WAL.
WALCompression bool
// StripeSize is the size in entries of the series hash map. Reducing the size will save memory but impact performance.
StripeSize int
}
// Appender allows appending a batch of data. It must be completed with a
@ -309,7 +313,7 @@ func (db *DBReadOnly) FlushWAL(dir string) error {
if err != nil {
return err
}
head, err := NewHead(nil, db.logger, w, 1)
head, err := NewHead(nil, db.logger, w, 1, DefaultStripeSize)
if err != nil {
return err
}
@ -356,7 +360,7 @@ func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) {
blocks[i] = b
}
head, err := NewHead(nil, db.logger, nil, 1)
head, err := NewHead(nil, db.logger, nil, 1, DefaultStripeSize)
if err != nil {
return nil, err
}
@ -371,7 +375,7 @@ func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) {
if err != nil {
return nil, err
}
head, err = NewHead(nil, db.logger, w, 1)
head, err = NewHead(nil, db.logger, w, 1, DefaultStripeSize)
if err != nil {
return nil, err
}
@ -488,6 +492,9 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
if opts == nil {
opts = DefaultOptions
}
if opts.StripeSize <= 0 {
opts.StripeSize = DefaultStripeSize
}
// Fixup bad format written by Prometheus 2.1.
if err := repairBadIndexVersion(l, dir); err != nil {
return nil, err
@ -549,7 +556,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
}
}
db.head, err = NewHead(r, l, wlog, opts.BlockRanges[0])
db.head, err = NewHead(r, l, wlog, opts.BlockRanges[0], opts.StripeSize)
if err != nil {
return nil, err
}

View file

@ -257,7 +257,10 @@ func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.Postings
}
// NewHead opens the head block in dir.
func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64) (*Head, error) {
// stripeSize sets the number of entries in the hash map, it must be a power of 2.
// A larger stripeSize will allocate more memory up-front, but will increase performance when handling a large number of series.
// A smaller stripeSize reduces the memory allocated, but can decrease performance with a large number of series.
func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64, stripeSize int) (*Head, error) {
if l == nil {
l = log.NewNopLogger()
}
@ -270,7 +273,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int
chunkRange: chunkRange,
minTime: math.MaxInt64,
maxTime: math.MinInt64,
series: newStripeSeries(),
series: newStripeSeries(stripeSize),
values: map[string]stringset{},
symbols: map[string]struct{}{},
postings: index.NewUnorderedMemPostings(),
@ -1491,29 +1494,35 @@ func (m seriesHashmap) del(hash uint64, lset labels.Labels) {
}
}
const (
// DefaultStripeSize is the default number of entries to allocate in the stripeSeries hash map.
DefaultStripeSize = 1 << 14
)
// stripeSeries locks modulo ranges of IDs and hashes to reduce lock contention.
// The locks are padded to not be on the same cache line. Filling the padded space
// with the maps was profiled to be slower likely due to the additional pointer
// dereferences.
type stripeSeries struct {
series [stripeSize]map[uint64]*memSeries
hashes [stripeSize]seriesHashmap
locks [stripeSize]stripeLock
size int
series []map[uint64]*memSeries
hashes []seriesHashmap
locks []stripeLock
}
const (
stripeSize = 1 << 14
stripeMask = stripeSize - 1
)
type stripeLock struct {
sync.RWMutex
// Padding to avoid multiple locks being on the same cache line.
_ [40]byte
}
func newStripeSeries() *stripeSeries {
s := &stripeSeries{}
func newStripeSeries(stripeSize int) *stripeSeries {
s := &stripeSeries{
size: stripeSize,
series: make([]map[uint64]*memSeries, stripeSize),
hashes: make([]seriesHashmap, stripeSize),
locks: make([]stripeLock, stripeSize),
}
for i := range s.series {
s.series[i] = map[uint64]*memSeries{}
@ -1533,7 +1542,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
)
// Run through all series and truncate old chunks. Mark those with no
// chunks left as deleted and store their ID.
for i := 0; i < stripeSize; i++ {
for i := 0; i < s.size; i++ {
s.locks[i].Lock()
for hash, all := range s.hashes[i] {
@ -1551,7 +1560,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
// series alike.
// If we don't hold them all, there's a very small chance that a series receives
// samples again while we are half-way into deleting it.
j := int(series.ref & stripeMask)
j := int(series.ref) & (s.size - 1)
if i != j {
s.locks[j].Lock()
@ -1576,7 +1585,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
}
func (s *stripeSeries) getByID(id uint64) *memSeries {
i := id & stripeMask
i := id & uint64(s.size-1)
s.locks[i].RLock()
series := s.series[i][id]
@ -1586,7 +1595,7 @@ func (s *stripeSeries) getByID(id uint64) *memSeries {
}
func (s *stripeSeries) getByHash(hash uint64, lset labels.Labels) *memSeries {
i := hash & stripeMask
i := hash & uint64(s.size-1)
s.locks[i].RLock()
series := s.hashes[i].get(hash, lset)
@ -1596,7 +1605,7 @@ func (s *stripeSeries) getByHash(hash uint64, lset labels.Labels) *memSeries {
}
func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, bool) {
i := hash & stripeMask
i := hash & uint64(s.size-1)
s.locks[i].Lock()
@ -1607,7 +1616,7 @@ func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, boo
s.hashes[i].set(hash, series)
s.locks[i].Unlock()
i = series.ref & stripeMask
i = series.ref & uint64(s.size-1)
s.locks[i].Lock()
s.series[i][series.ref] = series

View file

@ -24,7 +24,7 @@ import (
func BenchmarkHeadStripeSeriesCreate(b *testing.B) {
// Put a series, select it. GC it and then access it.
h, err := NewHead(nil, nil, nil, 1000)
h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize)
testutil.Ok(b, err)
defer h.Close()
@ -35,7 +35,7 @@ func BenchmarkHeadStripeSeriesCreate(b *testing.B) {
func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) {
// Put a series, select it. GC it and then access it.
h, err := NewHead(nil, nil, nil, 1000)
h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize)
testutil.Ok(b, err)
defer h.Close()

View file

@ -41,7 +41,7 @@ import (
func BenchmarkCreateSeries(b *testing.B) {
series := genSeries(b.N, 10, 0, 0)
h, err := NewHead(nil, nil, nil, 10000)
h, err := NewHead(nil, nil, nil, 10000, DefaultStripeSize)
testutil.Ok(b, err)
defer h.Close()
@ -171,7 +171,7 @@ func BenchmarkLoadWAL(b *testing.B) {
// Load the WAL.
for i := 0; i < b.N; i++ {
h, err := NewHead(nil, nil, w, 1000)
h, err := NewHead(nil, nil, w, 10000, DefaultStripeSize)
testutil.Ok(b, err)
h.Init(0)
}
@ -218,7 +218,7 @@ func TestHead_ReadWAL(t *testing.T) {
defer w.Close()
populateTestWAL(t, w, entries)
head, err := NewHead(nil, nil, w, 1000)
head, err := NewHead(nil, nil, w, 10000, DefaultStripeSize)
testutil.Ok(t, err)
testutil.Ok(t, head.Init(math.MinInt64))
@ -259,7 +259,7 @@ func TestHead_WALMultiRef(t *testing.T) {
w, err := wal.New(nil, nil, dir, false)
testutil.Ok(t, err)
head, err := NewHead(nil, nil, w, 1000)
head, err := NewHead(nil, nil, w, 10000, DefaultStripeSize)
testutil.Ok(t, err)
testutil.Ok(t, head.Init(0))
@ -283,7 +283,7 @@ func TestHead_WALMultiRef(t *testing.T) {
w, err = wal.New(nil, nil, dir, false)
testutil.Ok(t, err)
head, err = NewHead(nil, nil, w, 1000)
head, err = NewHead(nil, nil, w, 10000, DefaultStripeSize)
testutil.Ok(t, err)
testutil.Ok(t, head.Init(0))
defer head.Close()
@ -295,7 +295,7 @@ func TestHead_WALMultiRef(t *testing.T) {
}
func TestHead_Truncate(t *testing.T) {
h, err := NewHead(nil, nil, nil, 1000)
h, err := NewHead(nil, nil, nil, 10000, DefaultStripeSize)
testutil.Ok(t, err)
defer h.Close()
@ -432,7 +432,7 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
defer w.Close()
populateTestWAL(t, w, entries)
head, err := NewHead(nil, nil, w, 1000)
head, err := NewHead(nil, nil, w, 1000, DefaultStripeSize)
testutil.Ok(t, err)
testutil.Ok(t, head.Init(math.MinInt64))
@ -506,7 +506,7 @@ func TestHeadDeleteSimple(t *testing.T) {
testutil.Ok(t, err)
defer w.Close()
head, err := NewHead(nil, nil, w, 1000)
head, err := NewHead(nil, nil, w, 1000, DefaultStripeSize)
testutil.Ok(t, err)
defer head.Close()
@ -536,7 +536,7 @@ func TestHeadDeleteSimple(t *testing.T) {
reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reload.
testutil.Ok(t, err)
defer reloadedW.Close()
reloadedHead, err := NewHead(nil, nil, reloadedW, 1000)
reloadedHead, err := NewHead(nil, nil, reloadedW, 1000, DefaultStripeSize)
testutil.Ok(t, err)
defer reloadedHead.Close()
testutil.Ok(t, reloadedHead.Init(0))
@ -585,7 +585,7 @@ func TestHeadDeleteSimple(t *testing.T) {
func TestDeleteUntilCurMax(t *testing.T) {
numSamples := int64(10)
hb, err := NewHead(nil, nil, nil, 1000000)
hb, err := NewHead(nil, nil, nil, 1000000, DefaultStripeSize)
testutil.Ok(t, err)
defer hb.Close()
app := hb.Appender()
@ -636,7 +636,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
// Enough samples to cause a checkpoint.
numSamples := 10000
hb, err := NewHead(nil, nil, wlog, int64(numSamples)*10)
hb, err := NewHead(nil, nil, wlog, int64(numSamples)*10, DefaultStripeSize)
testutil.Ok(t, err)
defer hb.Close()
for i := 0; i < numSamples; i++ {
@ -730,7 +730,7 @@ func TestDelete_e2e(t *testing.T) {
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
hb, err := NewHead(nil, nil, nil, 100000)
hb, err := NewHead(nil, nil, nil, 100000, DefaultStripeSize)
testutil.Ok(t, err)
defer hb.Close()
app := hb.Appender()
@ -952,7 +952,7 @@ func TestMemSeries_append(t *testing.T) {
func TestGCChunkAccess(t *testing.T) {
// Put a chunk, select it. GC it and then access it.
h, err := NewHead(nil, nil, nil, 1000)
h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize)
testutil.Ok(t, err)
defer h.Close()
@ -992,7 +992,7 @@ func TestGCChunkAccess(t *testing.T) {
func TestGCSeriesAccess(t *testing.T) {
// Put a series, select it. GC it and then access it.
h, err := NewHead(nil, nil, nil, 1000)
h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize)
testutil.Ok(t, err)
defer h.Close()
@ -1033,7 +1033,7 @@ func TestGCSeriesAccess(t *testing.T) {
}
func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) {
h, err := NewHead(nil, nil, nil, 1000)
h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize)
testutil.Ok(t, err)
defer h.Close()
@ -1060,7 +1060,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) {
}
func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
h, err := NewHead(nil, nil, nil, 1000)
h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize)
testutil.Ok(t, err)
defer h.Close()
@ -1102,7 +1102,7 @@ func TestHead_LogRollback(t *testing.T) {
w, err := wal.New(nil, nil, dir, compress)
testutil.Ok(t, err)
defer w.Close()
h, err := NewHead(nil, nil, w, 1000)
h, err := NewHead(nil, nil, w, 1000, DefaultStripeSize)
testutil.Ok(t, err)
app := h.Appender()
@ -1190,7 +1190,7 @@ func TestWalRepair_DecodingError(t *testing.T) {
testutil.Ok(t, w.Log(test.rec))
}
h, err := NewHead(nil, nil, w, 1)
h, err := NewHead(nil, nil, w, 1, DefaultStripeSize)
testutil.Ok(t, err)
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal))
initErr := h.Init(math.MinInt64)
@ -1239,7 +1239,7 @@ func TestNewWalSegmentOnTruncate(t *testing.T) {
wlog, err := wal.NewSize(nil, nil, dir, 32768, false)
testutil.Ok(t, err)
h, err := NewHead(nil, nil, wlog, 1000)
h, err := NewHead(nil, nil, wlog, 1000, DefaultStripeSize)
testutil.Ok(t, err)
defer h.Close()
add := func(ts int64) {

View file

@ -30,7 +30,7 @@ const (
)
func BenchmarkPostingsForMatchers(b *testing.B) {
h, err := NewHead(nil, nil, nil, 1000)
h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize)
testutil.Ok(b, err)
defer func() {
testutil.Ok(b, h.Close())
@ -126,7 +126,7 @@ func benchmarkPostingsForMatchers(b *testing.B, ir IndexReader) {
}
func BenchmarkQuerierSelect(b *testing.B) {
h, err := NewHead(nil, nil, nil, 1000)
h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize)
testutil.Ok(b, err)
defer h.Close()
app := h.Appender()

View file

@ -1812,7 +1812,7 @@ func TestFindSetMatches(t *testing.T) {
}
func TestPostingsForMatchers(t *testing.T) {
h, err := NewHead(nil, nil, nil, 1000)
h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize)
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, h.Close())

View file

@ -16,10 +16,11 @@ package tsdb
import (
"context"
"fmt"
"github.com/go-kit/kit/log"
"github.com/prometheus/prometheus/pkg/labels"
"os"
"path/filepath"
"github.com/go-kit/kit/log"
"github.com/prometheus/prometheus/pkg/labels"
)
var InvalidTimesError = fmt.Errorf("max time is lesser than min time")
@ -32,7 +33,7 @@ type MetricSample struct {
// CreateHead creates a TSDB writer head to write the sample data to.
func CreateHead(samples []*MetricSample, chunkRange int64, logger log.Logger) (*Head, error) {
head, err := NewHead(nil, logger, nil, chunkRange)
head, err := NewHead(nil, logger, nil, chunkRange, DefaultStripeSize)
if err != nil {
return nil, err
}

View file

@ -1877,7 +1877,7 @@ func (f *fakeDB) Dir() string {
}
func (f *fakeDB) Snapshot(dir string, withHead bool) error { return f.err }
func (f *fakeDB) Head() *tsdb.Head {
h, _ := tsdb.NewHead(nil, nil, nil, 1000)
h, _ := tsdb.NewHead(nil, nil, nil, 1000, tsdb.DefaultStripeSize)
return h
}