From b9f0baf6ff2bebfd71b604fb218a37edd2be780f Mon Sep 17 00:00:00 2001 From: Dustin Hooten Date: Tue, 9 Feb 2021 07:12:48 -0700 Subject: [PATCH] Combine NewHead() args into a HeadOptions struct (#8452) * Combine NewHead() args into a HeadOptions struct Signed-off-by: Dustin Hooten * remove overrides params Signed-off-by: Dustin Hooten * address pr feedback Signed-off-by: Dustin Hooten --- tsdb/block_test.go | 4 ++- tsdb/blockwriter.go | 7 +++-- tsdb/compact_test.go | 5 +++- tsdb/db.go | 21 ++++++++++--- tsdb/head.go | 61 ++++++++++++++++++++++++++------------ tsdb/head_bench_test.go | 11 +++++-- tsdb/head_test.go | 35 +++++++++++++++++----- tsdb/querier_bench_test.go | 11 +++++-- tsdb/querier_test.go | 9 ++++-- web/api/v1/api_test.go | 5 ++-- 10 files changed, 124 insertions(+), 45 deletions(-) diff --git a/tsdb/block_test.go b/tsdb/block_test.go index 1da34b16ac..ffaa6b7c53 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -322,7 +322,9 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string { } func createHead(tb testing.TB, w *wal.WAL, series []storage.Series, chunkDir string) *Head { - head, err := NewHead(nil, nil, w, DefaultBlockDuration, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) + opts := DefaultHeadOptions() + opts.ChunkDirRoot = chunkDir + head, err := NewHead(nil, nil, w, opts) require.NoError(tb, err) app := head.Appender(context.Background()) diff --git a/tsdb/blockwriter.go b/tsdb/blockwriter.go index baf87f1400..7ede6c6304 100644 --- a/tsdb/blockwriter.go +++ b/tsdb/blockwriter.go @@ -27,7 +27,6 @@ import ( "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" - "github.com/prometheus/prometheus/tsdb/chunks" ) // BlockWriter is a block writer that allows appending and flushing series to disk. @@ -70,8 +69,10 @@ func (w *BlockWriter) initHead() error { return errors.Wrap(err, "create temp dir") } w.chunkDir = chunkDir - - h, err := NewHead(nil, w.logger, nil, w.blockSize, w.chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) + opts := DefaultHeadOptions() + opts.ChunkRange = w.blockSize + opts.ChunkDirRoot = w.chunkDir + h, err := NewHead(nil, w.logger, nil, opts) if err != nil { return errors.Wrap(err, "tsdb.NewHead") } diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 377ccb5531..35785f5050 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -1093,7 +1093,10 @@ func BenchmarkCompactionFromHead(b *testing.B) { defer func() { require.NoError(b, os.RemoveAll(chunkDir)) }() - h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) + opts := DefaultHeadOptions() + opts.ChunkRange = 1000 + opts.ChunkDirRoot = chunkDir + h, err := NewHead(nil, nil, nil, opts) require.NoError(b, err) for ln := 0; ln < labelNames; ln++ { app := h.Appender(context.Background()) diff --git a/tsdb/db.go b/tsdb/db.go index 3ddf73d21a..5c95e02c02 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -334,7 +334,9 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) { if err != nil { return err } - head, err := NewHead(nil, db.logger, w, DefaultBlockDuration, db.dir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) + opts := DefaultHeadOptions() + opts.ChunkDirRoot = db.dir + head, err := NewHead(nil, db.logger, w, opts) if err != nil { return err } @@ -387,7 +389,9 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue blocks[i] = b } - head, err := NewHead(nil, db.logger, nil, DefaultBlockDuration, db.dir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) + opts := DefaultHeadOptions() + opts.ChunkDirRoot = db.dir + head, err := NewHead(nil, db.logger, nil, opts) if err != nil { return nil, err } @@ -405,7 +409,9 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue if err != nil { return nil, err } - head, err = NewHead(nil, db.logger, w, DefaultBlockDuration, db.dir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) + opts := DefaultHeadOptions() + opts.ChunkDirRoot = db.dir + head, err = NewHead(nil, db.logger, w, opts) if err != nil { return nil, err } @@ -650,7 +656,14 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs } } - db.head, err = NewHead(r, l, wlog, rngs[0], dir, db.chunkPool, opts.HeadChunksWriteBufferSize, opts.StripeSize, opts.SeriesLifecycleCallback) + headOpts := DefaultHeadOptions() + headOpts.ChunkRange = rngs[0] + headOpts.ChunkDirRoot = dir + headOpts.ChunkPool = db.chunkPool + headOpts.ChunkWriteBufferSize = opts.HeadChunksWriteBufferSize + headOpts.StripeSize = opts.StripeSize + headOpts.SeriesCallback = opts.SeriesLifecycleCallback + db.head, err = NewHead(r, l, wlog, headOpts) if err != nil { return nil, err } diff --git a/tsdb/head.go b/tsdb/head.go index d968caa281..138f8adfe6 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -61,6 +61,7 @@ type Head struct { lastSeriesID atomic.Uint64 metrics *headMetrics + opts *HeadOptions wal *wal.WAL logger log.Logger appendPool sync.Pool @@ -69,8 +70,7 @@ type Head struct { memChunkPool sync.Pool // All series addressable by their ID or hash. - series *stripeSeries - seriesCallback SeriesLifecycleCallback + series *stripeSeries symMtx sync.RWMutex symbols map[string]struct{} @@ -90,13 +90,36 @@ type Head struct { // chunkDiskMapper is used to write and read Head chunks to/from disk. chunkDiskMapper *chunks.ChunkDiskMapper - // chunkDirRoot is the parent directory of the chunks directory. - chunkDirRoot string closedMtx sync.Mutex closed bool } +// HeadOptions are parameters for the Head block. +type HeadOptions struct { + ChunkRange int64 + // ChunkDirRoot is the parent directory of the chunks directory. + ChunkDirRoot string + ChunkPool chunkenc.Pool + ChunkWriteBufferSize int + // StripeSize sets the number of entries in the hash map, it must be a power of 2. + // A larger StripeSize will allocate more memory up-front, but will increase performance when handling a large number of series. + // A smaller StripeSize reduces the memory allocated, but can decrease performance with large number of series. + StripeSize int + SeriesCallback SeriesLifecycleCallback +} + +func DefaultHeadOptions() *HeadOptions { + return &HeadOptions{ + ChunkRange: DefaultBlockDuration, + ChunkDirRoot: "", + ChunkPool: chunkenc.NewPool(), + ChunkWriteBufferSize: chunks.DefaultWriteBufferSize, + StripeSize: DefaultStripeSize, + SeriesCallback: &noopSeriesLifecycleCallback{}, + } +} + type headMetrics struct { activeAppenders prometheus.Gauge series prometheus.GaugeFunc @@ -292,23 +315,21 @@ func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.Postings } // NewHead opens the head block in dir. -// stripeSize sets the number of entries in the hash map, it must be a power of 2. -// A larger stripeSize will allocate more memory up-front, but will increase performance when handling a large number of series. -// A smaller stripeSize reduces the memory allocated, but can decrease performance with large number of series. -func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64, chkDirRoot string, chkPool chunkenc.Pool, chkWriteBufferSize, stripeSize int, seriesCallback SeriesLifecycleCallback) (*Head, error) { +func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOptions) (*Head, error) { if l == nil { l = log.NewNopLogger() } - if chunkRange < 1 { - return nil, errors.Errorf("invalid chunk range %d", chunkRange) + if opts.ChunkRange < 1 { + return nil, errors.Errorf("invalid chunk range %d", opts.ChunkRange) } - if seriesCallback == nil { - seriesCallback = &noopSeriesLifecycleCallback{} + if opts.SeriesCallback == nil { + opts.SeriesCallback = &noopSeriesLifecycleCallback{} } h := &Head{ wal: wal, logger: l, - series: newStripeSeries(stripeSize, seriesCallback), + opts: opts, + series: newStripeSeries(opts.StripeSize, opts.SeriesCallback), symbols: map[string]struct{}{}, postings: index.NewUnorderedMemPostings(), tombstones: tombstones.NewMemTombstones(), @@ -319,21 +340,23 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int return &memChunk{} }, }, - chunkDirRoot: chkDirRoot, - seriesCallback: seriesCallback, } - h.chunkRange.Store(chunkRange) + h.chunkRange.Store(opts.ChunkRange) h.minTime.Store(math.MaxInt64) h.maxTime.Store(math.MinInt64) h.lastWALTruncationTime.Store(math.MinInt64) h.metrics = newHeadMetrics(h, r) - if chkPool == nil { - chkPool = chunkenc.NewPool() + if opts.ChunkPool == nil { + opts.ChunkPool = chunkenc.NewPool() } var err error - h.chunkDiskMapper, err = chunks.NewChunkDiskMapper(mmappedChunksDir(chkDirRoot), chkPool, chkWriteBufferSize) + h.chunkDiskMapper, err = chunks.NewChunkDiskMapper( + mmappedChunksDir(opts.ChunkDirRoot), + opts.ChunkPool, + opts.ChunkWriteBufferSize, + ) if err != nil { return nil, err } diff --git a/tsdb/head_bench_test.go b/tsdb/head_bench_test.go index ea7afc63cb..43cc4d5f49 100644 --- a/tsdb/head_bench_test.go +++ b/tsdb/head_bench_test.go @@ -23,7 +23,6 @@ import ( "go.uber.org/atomic" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/tsdb/chunks" ) func BenchmarkHeadStripeSeriesCreate(b *testing.B) { @@ -33,7 +32,10 @@ func BenchmarkHeadStripeSeriesCreate(b *testing.B) { require.NoError(b, os.RemoveAll(chunkDir)) }() // Put a series, select it. GC it and then access it. - h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) + opts := DefaultHeadOptions() + opts.ChunkRange = 1000 + opts.ChunkDirRoot = chunkDir + h, err := NewHead(nil, nil, nil, opts) require.NoError(b, err) defer h.Close() @@ -49,7 +51,10 @@ func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) { require.NoError(b, os.RemoveAll(chunkDir)) }() // Put a series, select it. GC it and then access it. - h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) + opts := DefaultHeadOptions() + opts.ChunkRange = 1000 + opts.ChunkDirRoot = chunkDir + h, err := NewHead(nil, nil, nil, opts) require.NoError(b, err) defer h.Close() diff --git a/tsdb/head_test.go b/tsdb/head_test.go index ec15d2cf01..7f254e6c7b 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -47,7 +47,10 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal. wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL) require.NoError(t, err) - h, err := NewHead(nil, nil, wlog, chunkRange, dir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) + opts := DefaultHeadOptions() + opts.ChunkRange = chunkRange + opts.ChunkDirRoot = dir + h, err := NewHead(nil, nil, wlog, opts) require.NoError(t, err) require.NoError(t, h.chunkDiskMapper.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil })) @@ -191,7 +194,10 @@ func BenchmarkLoadWAL(b *testing.B) { // Load the WAL. for i := 0; i < b.N; i++ { - h, err := NewHead(nil, nil, w, 1000, w.Dir(), nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) + opts := DefaultHeadOptions() + opts.ChunkRange = 1000 + opts.ChunkDirRoot = w.Dir() + h, err := NewHead(nil, nil, w, opts) require.NoError(b, err) h.Init(0) } @@ -302,7 +308,10 @@ func TestHead_WALMultiRef(t *testing.T) { w, err = wal.New(nil, nil, w.Dir(), false) require.NoError(t, err) - head, err = NewHead(nil, nil, w, 1000, w.Dir(), nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) + opts := DefaultHeadOptions() + opts.ChunkRange = 1000 + opts.ChunkDirRoot = w.Dir() + head, err = NewHead(nil, nil, w, opts) require.NoError(t, err) require.NoError(t, head.Init(0)) defer func() { @@ -583,7 +592,10 @@ func TestHeadDeleteSimple(t *testing.T) { // Compare the samples for both heads - before and after the reloadBlocks. reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reloadBlocks. require.NoError(t, err) - reloadedHead, err := NewHead(nil, nil, reloadedW, 1000, reloadedW.Dir(), nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) + opts := DefaultHeadOptions() + opts.ChunkRange = 1000 + opts.ChunkDirRoot = reloadedW.Dir() + reloadedHead, err := NewHead(nil, nil, reloadedW, opts) require.NoError(t, err) require.NoError(t, reloadedHead.Init(0)) @@ -1265,7 +1277,10 @@ func TestWalRepair_DecodingError(t *testing.T) { require.NoError(t, w.Log(test.rec)) } - h, err := NewHead(nil, nil, w, 1, w.Dir(), nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) + opts := DefaultHeadOptions() + opts.ChunkRange = 1 + opts.ChunkDirRoot = w.Dir() + h, err := NewHead(nil, nil, w, opts) require.NoError(t, err) require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal)) initErr := h.Init(math.MinInt64) @@ -1320,7 +1335,10 @@ func TestHeadReadWriterRepair(t *testing.T) { w, err := wal.New(nil, nil, walDir, false) require.NoError(t, err) - h, err := NewHead(nil, nil, w, chunkRange, dir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) + opts := DefaultHeadOptions() + opts.ChunkRange = chunkRange + opts.ChunkDirRoot = dir + h, err := NewHead(nil, nil, w, opts) require.NoError(t, err) require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.mmapChunkCorruptionTotal)) require.NoError(t, h.Init(math.MinInt64)) @@ -1550,7 +1568,10 @@ func TestMemSeriesIsolation(t *testing.T) { wlog, err := wal.NewSize(nil, nil, w.Dir(), 32768, false) require.NoError(t, err) - hb, err = NewHead(nil, nil, wlog, 1000, wlog.Dir(), nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) + opts := DefaultHeadOptions() + opts.ChunkRange = 1000 + opts.ChunkDirRoot = wlog.Dir() + hb, err = NewHead(nil, nil, wlog, opts) defer func() { require.NoError(t, hb.Close()) }() require.NoError(t, err) require.NoError(t, hb.Init(0)) diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go index dc4df74c58..6002c6c7ad 100644 --- a/tsdb/querier_bench_test.go +++ b/tsdb/querier_bench_test.go @@ -24,7 +24,6 @@ import ( "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/tsdb/chunks" ) // Make entries ~50B in size, to emulate real-world high cardinality. @@ -38,7 +37,10 @@ func BenchmarkPostingsForMatchers(b *testing.B) { defer func() { require.NoError(b, os.RemoveAll(chunkDir)) }() - h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) + opts := DefaultHeadOptions() + opts.ChunkRange = 1000 + opts.ChunkDirRoot = chunkDir + h, err := NewHead(nil, nil, nil, opts) require.NoError(b, err) defer func() { require.NoError(b, h.Close()) @@ -147,7 +149,10 @@ func BenchmarkQuerierSelect(b *testing.B) { defer func() { require.NoError(b, os.RemoveAll(chunkDir)) }() - h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) + opts := DefaultHeadOptions() + opts.ChunkRange = 1000 + opts.ChunkDirRoot = chunkDir + h, err := NewHead(nil, nil, nil, opts) require.NoError(b, err) defer h.Close() app := h.Appender(context.Background()) diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index dc027f8213..c0c26dbd4b 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -407,7 +407,9 @@ func TestBlockQuerier_AgainstHeadWithOpenChunks(t *testing.T) { }, } { t.Run("", func(t *testing.T) { - h, err := NewHead(nil, nil, nil, 2*time.Hour.Milliseconds(), "", nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) + opts := DefaultHeadOptions() + opts.ChunkRange = 2 * time.Hour.Milliseconds() + h, err := NewHead(nil, nil, nil, opts) require.NoError(t, err) defer h.Close() @@ -1550,7 +1552,10 @@ func TestPostingsForMatchers(t *testing.T) { defer func() { require.NoError(t, os.RemoveAll(chunkDir)) }() - h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) + opts := DefaultHeadOptions() + opts.ChunkRange = 1000 + opts.ChunkDirRoot = chunkDir + h, err := NewHead(nil, nil, nil, opts) require.NoError(t, err) defer func() { require.NoError(t, h.Close()) diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 1702fa5860..8240ff2894 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -56,7 +56,6 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/tsdb" - "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/util/teststorage" ) @@ -2267,7 +2266,9 @@ func (f *fakeDB) Stats(statsByLabelName string) (_ *tsdb.Stats, retErr error) { retErr = err } }() - h, _ := tsdb.NewHead(nil, nil, nil, 1000, "", nil, chunks.DefaultWriteBufferSize, tsdb.DefaultStripeSize, nil) + opts := tsdb.DefaultHeadOptions() + opts.ChunkRange = 1000 + h, _ := tsdb.NewHead(nil, nil, nil, opts) return h.Stats(statsByLabelName), nil }