Combine NewHead() args into a HeadOptions struct (#8452)

* Combine NewHead() args into a HeadOptions struct

Signed-off-by: Dustin Hooten <dustinhooten@gmail.com>

* remove overrides params

Signed-off-by: Dustin Hooten <dustinhooten@gmail.com>

* address pr feedback

Signed-off-by: Dustin Hooten <dustinhooten@gmail.com>
Dustin Hooten authored on 2021-02-09 07:12:48 -07:00 (committed by GitHub)
parent f2c5c230f1
commit b9f0baf6ff
10 changed files with 124 additions and 45 deletions
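In short: NewHead's six trailing configuration arguments (chunk range, chunk directory, chunk pool, write-buffer size, stripe size, series callback) become fields on a single *HeadOptions, and DefaultHeadOptions() supplies the defaults that most call sites had been spelling out by hand. A minimal sketch of the new call-site pattern, assuming the tsdb package as of this commit; the temp-dir plumbing and the override values are illustrative, not taken from the diff:

	package main

	import (
		"io/ioutil"
		"log"
		"os"
		"time"

		"github.com/prometheus/prometheus/tsdb"
	)

	func main() {
		// Scratch directory for the example only.
		dir, err := ioutil.TempDir("", "head_example")
		if err != nil {
			log.Fatal(err)
		}
		defer os.RemoveAll(dir)

		// Start from defaults and override only what differs, instead of
		// passing six positional arguments to NewHead.
		opts := tsdb.DefaultHeadOptions()
		opts.ChunkRange = 2 * time.Hour.Milliseconds()
		opts.ChunkDirRoot = dir

		head, err := tsdb.NewHead(nil, nil, nil, opts) // nil registerer, logger, and WAL
		if err != nil {
			log.Fatal(err)
		}
		defer head.Close()
	}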

tsdb/block_test.go

@@ -322,7 +322,9 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
 }
 
 func createHead(tb testing.TB, w *wal.WAL, series []storage.Series, chunkDir string) *Head {
-	head, err := NewHead(nil, nil, w, DefaultBlockDuration, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
+	opts := DefaultHeadOptions()
+	opts.ChunkDirRoot = chunkDir
+	head, err := NewHead(nil, nil, w, opts)
 	require.NoError(tb, err)
 	app := head.Appender(context.Background())

tsdb/blockwriter.go

@@ -27,7 +27,6 @@ import (
 	"github.com/prometheus/prometheus/pkg/timestamp"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
-	"github.com/prometheus/prometheus/tsdb/chunks"
 )
 
 // BlockWriter is a block writer that allows appending and flushing series to disk.
@@ -70,8 +69,10 @@ func (w *BlockWriter) initHead() error {
 		return errors.Wrap(err, "create temp dir")
 	}
 	w.chunkDir = chunkDir
-	h, err := NewHead(nil, w.logger, nil, w.blockSize, w.chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
+	opts := DefaultHeadOptions()
+	opts.ChunkRange = w.blockSize
+	opts.ChunkDirRoot = w.chunkDir
+	h, err := NewHead(nil, w.logger, nil, opts)
 	if err != nil {
 		return errors.Wrap(err, "tsdb.NewHead")
 	}

tsdb/compact_test.go

@@ -1093,7 +1093,10 @@ func BenchmarkCompactionFromHead(b *testing.B) {
 	defer func() {
 		require.NoError(b, os.RemoveAll(chunkDir))
 	}()
-	h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
+	opts := DefaultHeadOptions()
+	opts.ChunkRange = 1000
+	opts.ChunkDirRoot = chunkDir
+	h, err := NewHead(nil, nil, nil, opts)
 	require.NoError(b, err)
 	for ln := 0; ln < labelNames; ln++ {
 		app := h.Appender(context.Background())

tsdb/db.go

@@ -334,7 +334,9 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
 	if err != nil {
 		return err
 	}
-	head, err := NewHead(nil, db.logger, w, DefaultBlockDuration, db.dir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
+	opts := DefaultHeadOptions()
+	opts.ChunkDirRoot = db.dir
+	head, err := NewHead(nil, db.logger, w, opts)
 	if err != nil {
 		return err
 	}
@@ -387,7 +389,9 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQuerier, error) {
 		blocks[i] = b
 	}
 
-	head, err := NewHead(nil, db.logger, nil, DefaultBlockDuration, db.dir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
+	opts := DefaultHeadOptions()
+	opts.ChunkDirRoot = db.dir
+	head, err := NewHead(nil, db.logger, nil, opts)
 	if err != nil {
 		return nil, err
 	}
@@ -405,7 +409,9 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQuerier, error) {
 	if err != nil {
 		return nil, err
 	}
-	head, err = NewHead(nil, db.logger, w, DefaultBlockDuration, db.dir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
+	opts := DefaultHeadOptions()
+	opts.ChunkDirRoot = db.dir
+	head, err = NewHead(nil, db.logger, w, opts)
 	if err != nil {
 		return nil, err
 	}
@@ -650,7 +656,14 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs []int64) (_ *DB, returnedErr error) {
 		}
 	}
 
-	db.head, err = NewHead(r, l, wlog, rngs[0], dir, db.chunkPool, opts.HeadChunksWriteBufferSize, opts.StripeSize, opts.SeriesLifecycleCallback)
+	headOpts := DefaultHeadOptions()
+	headOpts.ChunkRange = rngs[0]
+	headOpts.ChunkDirRoot = dir
+	headOpts.ChunkPool = db.chunkPool
+	headOpts.ChunkWriteBufferSize = opts.HeadChunksWriteBufferSize
+	headOpts.StripeSize = opts.StripeSize
+	headOpts.SeriesCallback = opts.SeriesLifecycleCallback
+	db.head, err = NewHead(r, l, wlog, headOpts)
 	if err != nil {
 		return nil, err
 	}

tsdb/head.go

@@ -61,6 +61,7 @@ type Head struct {
 	lastSeriesID atomic.Uint64
 
 	metrics *headMetrics
+	opts    *HeadOptions
 	wal     *wal.WAL
 	logger  log.Logger
 	appendPool sync.Pool
@@ -69,8 +70,7 @@ type Head struct {
 	memChunkPool sync.Pool
 
 	// All series addressable by their ID or hash.
-	series         *stripeSeries
-	seriesCallback SeriesLifecycleCallback
+	series *stripeSeries
 
 	symMtx  sync.RWMutex
 	symbols map[string]struct{}
@@ -90,13 +90,36 @@ type Head struct {
 	// chunkDiskMapper is used to write and read Head chunks to/from disk.
 	chunkDiskMapper *chunks.ChunkDiskMapper
 
-	// chunkDirRoot is the parent directory of the chunks directory.
-	chunkDirRoot string
 
 	closedMtx sync.Mutex
 	closed    bool
 }
 
+// HeadOptions are parameters for the Head block.
+type HeadOptions struct {
+	ChunkRange int64
+	// ChunkDirRoot is the parent directory of the chunks directory.
+	ChunkDirRoot         string
+	ChunkPool            chunkenc.Pool
+	ChunkWriteBufferSize int
+	// StripeSize sets the number of entries in the hash map, it must be a power of 2.
+	// A larger StripeSize will allocate more memory up-front, but will increase performance when handling a large number of series.
+	// A smaller StripeSize reduces the memory allocated, but can decrease performance with large number of series.
+	StripeSize     int
+	SeriesCallback SeriesLifecycleCallback
+}
+
+func DefaultHeadOptions() *HeadOptions {
+	return &HeadOptions{
+		ChunkRange:           DefaultBlockDuration,
+		ChunkDirRoot:         "",
+		ChunkPool:            chunkenc.NewPool(),
+		ChunkWriteBufferSize: chunks.DefaultWriteBufferSize,
+		StripeSize:           DefaultStripeSize,
+		SeriesCallback:       &noopSeriesLifecycleCallback{},
+	}
+}
+
 type headMetrics struct {
 	activeAppenders prometheus.Gauge
 	series          prometheus.GaugeFunc
@@ -292,23 +315,21 @@ func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.PostingsStats {
 }
 
 // NewHead opens the head block in dir.
-// stripeSize sets the number of entries in the hash map, it must be a power of 2.
-// A larger stripeSize will allocate more memory up-front, but will increase performance when handling a large number of series.
-// A smaller stripeSize reduces the memory allocated, but can decrease performance with large number of series.
-func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64, chkDirRoot string, chkPool chunkenc.Pool, chkWriteBufferSize, stripeSize int, seriesCallback SeriesLifecycleCallback) (*Head, error) {
+func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOptions) (*Head, error) {
 	if l == nil {
 		l = log.NewNopLogger()
 	}
-	if chunkRange < 1 {
-		return nil, errors.Errorf("invalid chunk range %d", chunkRange)
+	if opts.ChunkRange < 1 {
+		return nil, errors.Errorf("invalid chunk range %d", opts.ChunkRange)
 	}
-	if seriesCallback == nil {
-		seriesCallback = &noopSeriesLifecycleCallback{}
+	if opts.SeriesCallback == nil {
+		opts.SeriesCallback = &noopSeriesLifecycleCallback{}
 	}
 	h := &Head{
 		wal:    wal,
 		logger: l,
-		series: newStripeSeries(stripeSize, seriesCallback),
+		opts:   opts,
+		series: newStripeSeries(opts.StripeSize, opts.SeriesCallback),
 		symbols:    map[string]struct{}{},
 		postings:   index.NewUnorderedMemPostings(),
 		tombstones: tombstones.NewMemTombstones(),
@@ -319,21 +340,23 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64, chkDirRoot string, chkPool chunkenc.Pool, chkWriteBufferSize, stripeSize int, seriesCallback SeriesLifecycleCallback) (*Head, error) {
 				return &memChunk{}
 			},
 		},
-		chunkDirRoot:   chkDirRoot,
-		seriesCallback: seriesCallback,
 	}
-	h.chunkRange.Store(chunkRange)
+	h.chunkRange.Store(opts.ChunkRange)
 	h.minTime.Store(math.MaxInt64)
 	h.maxTime.Store(math.MinInt64)
 	h.lastWALTruncationTime.Store(math.MinInt64)
 	h.metrics = newHeadMetrics(h, r)
 
-	if chkPool == nil {
-		chkPool = chunkenc.NewPool()
+	if opts.ChunkPool == nil {
+		opts.ChunkPool = chunkenc.NewPool()
 	}
 
 	var err error
-	h.chunkDiskMapper, err = chunks.NewChunkDiskMapper(mmappedChunksDir(chkDirRoot), chkPool, chkWriteBufferSize)
+	h.chunkDiskMapper, err = chunks.NewChunkDiskMapper(
+		mmappedChunksDir(opts.ChunkDirRoot),
+		opts.ChunkPool,
+		opts.ChunkWriteBufferSize,
+	)
 	if err != nil {
 		return nil, err
 	}
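One behavioral detail from the NewHead hunks above: the validation moved onto the options struct, so a caller that hand-builds a HeadOptions instead of starting from DefaultHeadOptions() still gets the invalid-chunk-range error rather than a silently misconfigured Head, and a nil SeriesCallback is still swapped for the no-op implementation. A small sketch of that edge case, assuming only the checks visible in the diff:

	package main

	import (
		"fmt"

		"github.com/prometheus/prometheus/tsdb"
	)

	func main() {
		// The zero value of ChunkRange trips the `opts.ChunkRange < 1`
		// check shown above, before the other option fields are used.
		opts := &tsdb.HeadOptions{}
		if _, err := tsdb.NewHead(nil, nil, nil, opts); err != nil {
			fmt.Println(err) // invalid chunk range 0
		}
	}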

tsdb/head_bench_test.go

@@ -23,7 +23,6 @@ import (
 	"go.uber.org/atomic"
 
 	"github.com/prometheus/prometheus/pkg/labels"
-	"github.com/prometheus/prometheus/tsdb/chunks"
 )
 
 func BenchmarkHeadStripeSeriesCreate(b *testing.B) {
@@ -33,7 +32,10 @@ func BenchmarkHeadStripeSeriesCreate(b *testing.B) {
 		require.NoError(b, os.RemoveAll(chunkDir))
 	}()
 	// Put a series, select it. GC it and then access it.
-	h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
+	opts := DefaultHeadOptions()
+	opts.ChunkRange = 1000
+	opts.ChunkDirRoot = chunkDir
+	h, err := NewHead(nil, nil, nil, opts)
 	require.NoError(b, err)
 	defer h.Close()
@@ -49,7 +51,10 @@ func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) {
 		require.NoError(b, os.RemoveAll(chunkDir))
 	}()
 	// Put a series, select it. GC it and then access it.
-	h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
+	opts := DefaultHeadOptions()
+	opts.ChunkRange = 1000
+	opts.ChunkDirRoot = chunkDir
+	h, err := NewHead(nil, nil, nil, opts)
 	require.NoError(b, err)
 	defer h.Close()

tsdb/head_test.go

@@ -47,7 +47,10 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal.WAL) {
 	wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL)
 	require.NoError(t, err)
 
-	h, err := NewHead(nil, nil, wlog, chunkRange, dir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
+	opts := DefaultHeadOptions()
+	opts.ChunkRange = chunkRange
+	opts.ChunkDirRoot = dir
+	h, err := NewHead(nil, nil, wlog, opts)
 	require.NoError(t, err)
 
 	require.NoError(t, h.chunkDiskMapper.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
@@ -191,7 +194,10 @@ func BenchmarkLoadWAL(b *testing.B) {
 			// Load the WAL.
 			for i := 0; i < b.N; i++ {
-				h, err := NewHead(nil, nil, w, 1000, w.Dir(), nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
+				opts := DefaultHeadOptions()
+				opts.ChunkRange = 1000
+				opts.ChunkDirRoot = w.Dir()
+				h, err := NewHead(nil, nil, w, opts)
 				require.NoError(b, err)
 				h.Init(0)
 			}
@@ -302,7 +308,10 @@ func TestHead_WALMultiRef(t *testing.T) {
 	w, err = wal.New(nil, nil, w.Dir(), false)
 	require.NoError(t, err)
 
-	head, err = NewHead(nil, nil, w, 1000, w.Dir(), nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
+	opts := DefaultHeadOptions()
+	opts.ChunkRange = 1000
+	opts.ChunkDirRoot = w.Dir()
+	head, err = NewHead(nil, nil, w, opts)
 	require.NoError(t, err)
 	require.NoError(t, head.Init(0))
 	defer func() {
@@ -583,7 +592,10 @@ func TestHeadDeleteSimple(t *testing.T) {
 			// Compare the samples for both heads - before and after the reloadBlocks.
 			reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reloadBlocks.
 			require.NoError(t, err)
-			reloadedHead, err := NewHead(nil, nil, reloadedW, 1000, reloadedW.Dir(), nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
+			opts := DefaultHeadOptions()
+			opts.ChunkRange = 1000
+			opts.ChunkDirRoot = reloadedW.Dir()
+			reloadedHead, err := NewHead(nil, nil, reloadedW, opts)
 			require.NoError(t, err)
 			require.NoError(t, reloadedHead.Init(0))
@@ -1265,7 +1277,10 @@ func TestWalRepair_DecodingError(t *testing.T) {
 				require.NoError(t, w.Log(test.rec))
 			}
 
-			h, err := NewHead(nil, nil, w, 1, w.Dir(), nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
+			opts := DefaultHeadOptions()
+			opts.ChunkRange = 1
+			opts.ChunkDirRoot = w.Dir()
+			h, err := NewHead(nil, nil, w, opts)
 			require.NoError(t, err)
 			require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal))
 			initErr := h.Init(math.MinInt64)
@@ -1320,7 +1335,10 @@ func TestHeadReadWriterRepair(t *testing.T) {
 	w, err := wal.New(nil, nil, walDir, false)
 	require.NoError(t, err)
 
-	h, err := NewHead(nil, nil, w, chunkRange, dir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
+	opts := DefaultHeadOptions()
+	opts.ChunkRange = chunkRange
+	opts.ChunkDirRoot = dir
+	h, err := NewHead(nil, nil, w, opts)
 	require.NoError(t, err)
 	require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.mmapChunkCorruptionTotal))
 	require.NoError(t, h.Init(math.MinInt64))
@@ -1550,7 +1568,10 @@ func TestMemSeriesIsolation(t *testing.T) {
 	wlog, err := wal.NewSize(nil, nil, w.Dir(), 32768, false)
 	require.NoError(t, err)
 
-	hb, err = NewHead(nil, nil, wlog, 1000, wlog.Dir(), nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
+	opts := DefaultHeadOptions()
+	opts.ChunkRange = 1000
+	opts.ChunkDirRoot = wlog.Dir()
+	hb, err = NewHead(nil, nil, wlog, opts)
 	defer func() { require.NoError(t, hb.Close()) }()
 	require.NoError(t, err)
 	require.NoError(t, hb.Init(0))

tsdb/querier_bench_test.go

@@ -24,7 +24,6 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"github.com/prometheus/prometheus/pkg/labels"
-	"github.com/prometheus/prometheus/tsdb/chunks"
 )
 
 // Make entries ~50B in size, to emulate real-world high cardinality.
@@ -38,7 +37,10 @@ func BenchmarkPostingsForMatchers(b *testing.B) {
 	defer func() {
 		require.NoError(b, os.RemoveAll(chunkDir))
 	}()
-	h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
+	opts := DefaultHeadOptions()
+	opts.ChunkRange = 1000
+	opts.ChunkDirRoot = chunkDir
+	h, err := NewHead(nil, nil, nil, opts)
 	require.NoError(b, err)
 	defer func() {
 		require.NoError(b, h.Close())
@@ -147,7 +149,10 @@ func BenchmarkQuerierSelect(b *testing.B) {
 	defer func() {
 		require.NoError(b, os.RemoveAll(chunkDir))
 	}()
-	h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
+	opts := DefaultHeadOptions()
+	opts.ChunkRange = 1000
+	opts.ChunkDirRoot = chunkDir
+	h, err := NewHead(nil, nil, nil, opts)
 	require.NoError(b, err)
 	defer h.Close()
 	app := h.Appender(context.Background())

tsdb/querier_test.go

@@ -407,7 +407,9 @@ func TestBlockQuerier_AgainstHeadWithOpenChunks(t *testing.T) {
 		},
 	} {
 		t.Run("", func(t *testing.T) {
-			h, err := NewHead(nil, nil, nil, 2*time.Hour.Milliseconds(), "", nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
+			opts := DefaultHeadOptions()
+			opts.ChunkRange = 2 * time.Hour.Milliseconds()
+			h, err := NewHead(nil, nil, nil, opts)
 			require.NoError(t, err)
 			defer h.Close()
@@ -1550,7 +1552,10 @@ func TestPostingsForMatchers(t *testing.T) {
 	defer func() {
 		require.NoError(t, os.RemoveAll(chunkDir))
 	}()
-	h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
+	opts := DefaultHeadOptions()
+	opts.ChunkRange = 1000
+	opts.ChunkDirRoot = chunkDir
+	h, err := NewHead(nil, nil, nil, opts)
 	require.NoError(t, err)
 	defer func() {
 		require.NoError(t, h.Close())

web/api/v1/api_test.go

@@ -56,7 +56,6 @@ import (
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/storage/remote"
 	"github.com/prometheus/prometheus/tsdb"
-	"github.com/prometheus/prometheus/tsdb/chunks"
 	"github.com/prometheus/prometheus/util/teststorage"
 )
@@ -2267,7 +2266,9 @@ func (f *fakeDB) Stats(statsByLabelName string) (_ *tsdb.Stats, retErr error) {
 			retErr = err
 		}
 	}()
-	h, _ := tsdb.NewHead(nil, nil, nil, 1000, "", nil, chunks.DefaultWriteBufferSize, tsdb.DefaultStripeSize, nil)
+	opts := tsdb.DefaultHeadOptions()
+	opts.ChunkRange = 1000
+	h, _ := tsdb.NewHead(nil, nil, nil, opts)
 	return h.Stats(statsByLabelName), nil
 }