From cdc4e7fd4bb868e221fb1eb16a39362c6e1a1e77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Tue, 21 Nov 2023 16:26:30 +0100 Subject: [PATCH] Support for computing and keeping secondary hash per series (#568) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add secondary hash function to DB options, and use it calculate hashed for memSeries * Add function to count secondary hashes that fall within a set of ranges * Simplify SecondaryHash function changes to Head. Signed-off-by: Peter Štibraný * Fix TODOs. Signed-off-by: Peter Štibraný * Added test for Secondary hash function and ForEachSecondaryHash. Signed-off-by: Peter Štibraný * Make sure we don't hold any lock when calling function. Signed-off-by: Peter Štibraný --------- Signed-off-by: Peter Štibraný Co-authored-by: Patryk Prus --- tsdb/db.go | 5 +++ tsdb/head.go | 61 +++++++++++++++++++++++++++++++----- tsdb/head_read_test.go | 2 +- tsdb/head_test.go | 71 +++++++++++++++++++++++++++++++++++++----- 4 files changed, 123 insertions(+), 16 deletions(-) diff --git a/tsdb/db.go b/tsdb/db.go index b3290c24ec..94df0e760d 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -236,6 +236,10 @@ type Options struct { // BlockPostingsForMatchersCacheForce forces the usage of postings for matchers cache for all calls on compacted blocks // regardless of the `concurrent` param. BlockPostingsForMatchersCacheForce bool + + // SecondaryHashFunction is an optional function that is applied to each series in the Head. + // Values returned from this function are preserved and available by calling ForEachSecondaryHash function on the Head. + SecondaryHashFunction func(labels.Labels) uint32 } type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{} @@ -922,6 +926,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs headOpts.PostingsForMatchersCacheMaxItems = opts.HeadPostingsForMatchersCacheMaxItems headOpts.PostingsForMatchersCacheMaxBytes = opts.HeadPostingsForMatchersCacheMaxBytes headOpts.PostingsForMatchersCacheForce = opts.HeadPostingsForMatchersCacheForce + headOpts.SecondaryHashFunction = opts.SecondaryHashFunction if opts.WALReplayConcurrency > 0 { headOpts.WALReplayConcurrency = opts.WALReplayConcurrency } diff --git a/tsdb/head.go b/tsdb/head.go index 7533ac44a2..a75fcc2856 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -27,9 +27,8 @@ import ( "github.com/go-kit/log/level" "github.com/oklog/ulid" "github.com/pkg/errors" - "go.uber.org/atomic" - "github.com/prometheus/client_golang/prometheus" + "go.uber.org/atomic" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/exemplar" @@ -139,6 +138,8 @@ type Head struct { writeNotified wlog.WriteNotified memTruncationInProcess atomic.Bool + + secondaryHashFunc func(labels.Labels) uint32 } type ExemplarStorage interface { @@ -189,6 +190,10 @@ type HeadOptions struct { // The default value is GOMAXPROCS. // If it is set to a negative value or zero, the default value is used. WALReplayConcurrency int + + // Optional hash function applied to each new series. Computed hash value is preserved for each series in the head, + // and values can be iterated by using Head.ForEachSecondaryHash method. + SecondaryHashFunction func(labels.Labels) uint32 } const ( @@ -268,6 +273,13 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal, wbl *wlog.WL, opts *Hea opts.MaxExemplars.Store(0) } + shf := opts.SecondaryHashFunction + if shf == nil { + shf = func(labels.Labels) uint32 { + return 0 + } + } + h := &Head{ wal: wal, wbl: wbl, @@ -278,10 +290,10 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal, wbl *wlog.WL, opts *Hea return &memChunk{} }, }, - stats: stats, - reg: r, - - pfmc: NewPostingsForMatchersCache(opts.PostingsForMatchersCacheTTL, opts.PostingsForMatchersCacheMaxItems, opts.PostingsForMatchersCacheMaxBytes, opts.PostingsForMatchersCacheForce), + stats: stats, + reg: r, + secondaryHashFunc: shf, + pfmc: NewPostingsForMatchersCache(opts.PostingsForMatchersCacheTTL, opts.PostingsForMatchersCacheMaxItems, opts.PostingsForMatchersCacheMaxBytes, opts.PostingsForMatchersCacheForce), } if err := h.resetInMemoryState(); err != nil { return nil, err @@ -1661,7 +1673,7 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) { s, created, err := h.series.getOrSet(hash, lset, func() *memSeries { - return newMemSeries(lset, id, labels.StableHash(lset), h.opts.ChunkEndTimeVariance, h.opts.IsolationDisabled) + return newMemSeries(lset, id, labels.StableHash(lset), h.secondaryHashFunc(lset), h.opts.ChunkEndTimeVariance, h.opts.IsolationDisabled) }) if err != nil { return nil, false, err @@ -1987,6 +1999,9 @@ type memSeries struct { // Series labels hash to use for sharding purposes. shardHash uint64 + // Value returned by secondary hash function. + secondaryHash uint32 + // Immutable chunks on disk that have not yet gone into a block, in order of ascending time stamps. // When compaction runs, chunks get moved into a block and all pointers are shifted like so: // @@ -2040,13 +2055,14 @@ type memSeriesOOOFields struct { firstOOOChunkID chunks.HeadChunkID // HeadOOOChunkID for oooMmappedChunks[0]. } -func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64, chunkEndTimeVariance float64, isolationDisabled bool) *memSeries { +func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64, secondaryHash uint32, chunkEndTimeVariance float64, isolationDisabled bool) *memSeries { s := &memSeries{ lset: lset, ref: id, nextAt: math.MinInt64, chunkEndTimeVariance: chunkEndTimeVariance, shardHash: shardHash, + secondaryHash: secondaryHash, } if !isolationDisabled { s.txs = newTxRing(4) @@ -2258,3 +2274,32 @@ func (h *Head) updateWALReplayStatusRead(current int) { h.stats.WALReplayStatus.Current = current } + +// ForEachSecondaryHash iterates over all series in the Head, and passes secondary hashes of the series +// to the function. Function is called with batch of hashes, in no specific order. Hash for each series +// in the head is included exactly once. Series for corresponding hash may be deleted while the function +// is running, and series inserted while this function runs may be reported or ignored. +// +// No locks are held when function is called. +// +// Slice of hashes passed to the function is reused between calls. +func (h *Head) ForEachSecondaryHash(fn func(secondaryHash []uint32)) { + buf := make([]uint32, 512) + + for i := 0; i < h.series.size; i++ { + buf = buf[:0] + + h.series.locks[i].RLock() + for _, all := range h.series.hashes[i] { + for _, s := range all { + // No need to lock series lock, as we're only accessing its immutable secondary hash. + buf = append(buf, s.secondaryHash) + } + } + h.series.locks[i].RUnlock() + + if len(buf) > 0 { + fn(buf) + } + } +} diff --git a/tsdb/head_read_test.go b/tsdb/head_read_test.go index 499bf79570..3d504f359d 100644 --- a/tsdb/head_read_test.go +++ b/tsdb/head_read_test.go @@ -537,7 +537,7 @@ func TestMemSeries_chunk(t *testing.T) { require.NoError(t, chunkDiskMapper.Close()) }() - series := newMemSeries(labels.EmptyLabels(), 1, labels.StableHash(labels.EmptyLabels()), 0, true) + series := newMemSeries(labels.EmptyLabels(), 1, labels.StableHash(labels.EmptyLabels()), 0, 0, true) if tc.setup != nil { tc.setup(t, series, chunkDiskMapper) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 5964c7667b..d22773d919 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -338,7 +338,7 @@ func BenchmarkLoadWLs(b *testing.B) { for k := 0; k < c.batches*c.seriesPerBatch; k++ { // Create one mmapped chunk per series, with one sample at the given time. lbls := labels.Labels{} - s := newMemSeries(lbls, chunks.HeadSeriesRef(k)*101, labels.StableHash(lbls), 0, defaultIsolationDisabled) + s := newMemSeries(lbls, chunks.HeadSeriesRef(k)*101, labels.StableHash(lbls), 0, 0, defaultIsolationDisabled) s.append(c.mmappedChunkT, 42, 0, cOpts) // There's only one head chunk because only a single sample is appended. mmapChunks() // ignores the latest chunk, so we need to cut a new head chunk to guarantee the chunk with @@ -909,7 +909,7 @@ func TestMemSeries_truncateChunks(t *testing.T) { } lbls := labels.FromStrings("a", "b") - s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled) + s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, 0, defaultIsolationDisabled) for i := 0; i < 4000; i += 5 { ok, _ := s.append(int64(i), float64(i), 0, cOpts) @@ -1050,7 +1050,7 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) { require.NoError(t, chunkDiskMapper.Close()) }() - series := newMemSeries(labels.EmptyLabels(), 1, labels.StableHash(labels.EmptyLabels()), 0, true) + series := newMemSeries(labels.EmptyLabels(), 1, labels.StableHash(labels.EmptyLabels()), 0, 0, true) cOpts := chunkOpts{ chunkDiskMapper: chunkDiskMapper, @@ -1629,7 +1629,7 @@ func TestMemSeries_append(t *testing.T) { } lbls := labels.Labels{} - s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled) + s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, 0, defaultIsolationDisabled) // Add first two samples at the very end of a chunk range and the next two // on and after it. @@ -1691,7 +1691,7 @@ func TestMemSeries_appendHistogram(t *testing.T) { } lbls := labels.Labels{} - s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled) + s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, 0, defaultIsolationDisabled) histograms := tsdbutil.GenerateTestHistograms(4) histogramWithOneMoreBucket := histograms[3].Copy() @@ -1754,7 +1754,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { } lbls := labels.Labels{} - s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled) + s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, 0, defaultIsolationDisabled) // At this slow rate, we will fill the chunk in two block durations. slowRate := (DefaultBlockDuration * 2) / samplesPerChunk @@ -3089,7 +3089,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) { } lbls := labels.Labels{} - s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled) + s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, 0, defaultIsolationDisabled) for i := 0; i < 7; i++ { ok, _ := s.append(int64(i), float64(i), 0, cOpts) @@ -5695,3 +5695,60 @@ func TestHeadDetectsDuplicateSampleAtSizeLimit(t *testing.T) { require.Equal(t, numSamples/2, storedSampleCount) } + +func TestSecondaryHashFunction(t *testing.T) { + dir := t.TempDir() + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionNone) + require.NoError(t, err) + + opts := DefaultHeadOptions() + opts.ChunkRange = 1000 + opts.ChunkDirRoot = dir + opts.EnableExemplarStorage = true + opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars) + opts.EnableNativeHistograms.Store(true) + opts.SecondaryHashFunction = func(l labels.Labels) uint32 { + return uint32(l.Len()) + } + + h, err := NewHead(nil, nil, wal, nil, opts, nil) + require.NoError(t, err) + + t.Cleanup(func() { + require.NoError(t, h.Close()) + }) + + const seriesCount = 100 + const labelsCount = 10 + + app := h.Appender(context.Background()) + for ix, s := range genSeries(seriesCount, labelsCount, 0, 0) { + _, err := app.Append(0, s.Labels(), int64(100*ix), float64(ix)) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + checkSecondaryHashes := func(expected int) { + reportedHashes := 0 + h.ForEachSecondaryHash(func(secondaryHashes []uint32) { + reportedHashes += len(secondaryHashes) + + for _, h := range secondaryHashes { + require.Equal(t, uint32(labelsCount), h) + } + }) + require.Equal(t, expected, reportedHashes) + } + + checkSecondaryHashes(seriesCount) + + // Truncate head, remove half of the series (because their timestamp is before supplied truncation MinT) + require.NoError(t, h.Truncate(100*(seriesCount/2))) + + // There should be 50 reported series now. + checkSecondaryHashes(50) + + // Truncate head again, remove all series, remove half of the series (because their timestamp is before supplied truncation MinT) + require.NoError(t, h.Truncate(100*seriesCount)) + checkSecondaryHashes(0) +}