Support for computing and keeping secondary hash per series (#568)

* Add secondary hash function to DB options, and use it to calculate hashes for memSeries

* Add function to count secondary hashes that fall within a set of ranges

* Simplify SecondaryHash function changes to Head.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Fix TODOs.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Added test for Secondary hash function and ForEachSecondaryHash.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Make sure we don't hold any lock when calling function.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

---------

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>
Co-authored-by: Patryk Prus <patryk.prus@grafana.com>
Peter Štibraný 2023-11-21 16:26:30 +01:00 committed by GitHub
parent 152aea3bc6
commit cdc4e7fd4b
4 changed files with 123 additions and 16 deletions
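
For orientation before the diff: the new hook is just a deterministic mapping from a series' label set to a `uint32`, chosen entirely by the caller. A minimal sketch of one possible function, reusing the `labels.StableHash` helper that already appears in this diff and truncating it to 32 bits (the name `secondaryHash` is illustrative, not part of this change):

```go
package main

import (
	"github.com/prometheus/prometheus/model/labels"
)

// secondaryHash is one possible secondary hash function: it reuses the
// existing StableHash over the label set and keeps the low 32 bits.
// Any deterministic labels -> uint32 mapping would work here.
func secondaryHash(lset labels.Labels) uint32 {
	return uint32(labels.StableHash(lset))
}
```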

View file

@@ -236,6 +236,10 @@ type Options struct {
 	// BlockPostingsForMatchersCacheForce forces the usage of postings for matchers cache for all calls on compacted blocks
 	// regardless of the `concurrent` param.
 	BlockPostingsForMatchersCacheForce bool
+
+	// SecondaryHashFunction is an optional function that is applied to each series in the Head.
+	// Values returned from this function are preserved and available by calling the ForEachSecondaryHash method on the Head.
+	SecondaryHashFunction func(labels.Labels) uint32
 }
 
 type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{}
@@ -922,6 +926,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
 	headOpts.PostingsForMatchersCacheMaxItems = opts.HeadPostingsForMatchersCacheMaxItems
 	headOpts.PostingsForMatchersCacheMaxBytes = opts.HeadPostingsForMatchersCacheMaxBytes
 	headOpts.PostingsForMatchersCacheForce = opts.HeadPostingsForMatchersCacheForce
+	headOpts.SecondaryHashFunction = opts.SecondaryHashFunction
 	if opts.WALReplayConcurrency > 0 {
 		headOpts.WALReplayConcurrency = opts.WALReplayConcurrency
 	}
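
A minimal sketch of how a caller might wire the new option when opening a TSDB; the directory, logger, and registerer are placeholders, and `openWithSecondaryHash` is a hypothetical helper, not part of this commit:

```go
package main

import (
	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/tsdb"
)

// openWithSecondaryHash opens a TSDB with a secondary hash function set.
// When the option is left nil, NewHead falls back to a function that
// returns 0 for every series (see the NewHead hunk below).
func openWithSecondaryHash(dir string) (*tsdb.DB, error) {
	opts := tsdb.DefaultOptions()
	opts.SecondaryHashFunction = func(lset labels.Labels) uint32 {
		return uint32(labels.StableHash(lset))
	}
	return tsdb.Open(dir, log.NewNopLogger(), prometheus.NewRegistry(), opts, tsdb.NewDBStats())
}
```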

View file

@@ -27,9 +27,8 @@ import (
 	"github.com/go-kit/log/level"
 	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
-	"go.uber.org/atomic"
-
 	"github.com/prometheus/client_golang/prometheus"
+	"go.uber.org/atomic"
 
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/model/exemplar"
@@ -139,6 +138,8 @@ type Head struct {
 	writeNotified          wlog.WriteNotified
 	memTruncationInProcess atomic.Bool
+
+	secondaryHashFunc func(labels.Labels) uint32
 }
 
 type ExemplarStorage interface {
@@ -189,6 +190,10 @@ type HeadOptions struct {
 	// The default value is GOMAXPROCS.
 	// If it is set to a negative value or zero, the default value is used.
 	WALReplayConcurrency int
+
+	// Optional hash function applied to each new series. The computed hash value is preserved for each series in the Head,
+	// and the values can be iterated over by using the Head.ForEachSecondaryHash method.
+	SecondaryHashFunction func(labels.Labels) uint32
 }
 
 const (
@@ -268,6 +273,13 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal, wbl *wlog.WL, opts *Hea
 		opts.MaxExemplars.Store(0)
 	}
 
+	shf := opts.SecondaryHashFunction
+	if shf == nil {
+		shf = func(labels.Labels) uint32 {
+			return 0
+		}
+	}
+
 	h := &Head{
 		wal: wal,
 		wbl: wbl,
@@ -278,10 +290,10 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal, wbl *wlog.WL, opts *Hea
 				return &memChunk{}
 			},
 		},
-		stats: stats,
-		reg:   r,
+		stats:             stats,
+		reg:               r,
+		secondaryHashFunc: shf,
 		pfmc: NewPostingsForMatchersCache(opts.PostingsForMatchersCacheTTL, opts.PostingsForMatchersCacheMaxItems, opts.PostingsForMatchersCacheMaxBytes, opts.PostingsForMatchersCacheForce),
 	}
 	if err := h.resetInMemoryState(); err != nil {
 		return nil, err
@@ -1661,7 +1673,7 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e
 func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) {
 	s, created, err := h.series.getOrSet(hash, lset, func() *memSeries {
-		return newMemSeries(lset, id, labels.StableHash(lset), h.opts.ChunkEndTimeVariance, h.opts.IsolationDisabled)
+		return newMemSeries(lset, id, labels.StableHash(lset), h.secondaryHashFunc(lset), h.opts.ChunkEndTimeVariance, h.opts.IsolationDisabled)
 	})
 	if err != nil {
 		return nil, false, err
@@ -1987,6 +1999,9 @@ type memSeries struct {
 	// Series labels hash to use for sharding purposes.
 	shardHash uint64
 
+	// Value returned by the secondary hash function.
+	secondaryHash uint32
+
 	// Immutable chunks on disk that have not yet gone into a block, in order of ascending time stamps.
 	// When compaction runs, chunks get moved into a block and all pointers are shifted like so:
 	//
@@ -2040,13 +2055,14 @@ type memSeriesOOOFields struct {
 	firstOOOChunkID chunks.HeadChunkID // HeadOOOChunkID for oooMmappedChunks[0].
 }
 
-func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64, chunkEndTimeVariance float64, isolationDisabled bool) *memSeries {
+func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64, secondaryHash uint32, chunkEndTimeVariance float64, isolationDisabled bool) *memSeries {
 	s := &memSeries{
 		lset:                 lset,
 		ref:                  id,
 		nextAt:               math.MinInt64,
 		chunkEndTimeVariance: chunkEndTimeVariance,
 		shardHash:            shardHash,
+		secondaryHash:        secondaryHash,
 	}
 	if !isolationDisabled {
 		s.txs = newTxRing(4)
@@ -2258,3 +2274,32 @@ func (h *Head) updateWALReplayStatusRead(current int) {
 	h.stats.WALReplayStatus.Current = current
 }
+
+// ForEachSecondaryHash iterates over all series in the Head and passes their secondary hashes
+// to the function. The function is called with batches of hashes, in no specific order. The hash for
+// each series in the Head is included exactly once. The series corresponding to a hash may be deleted
+// while the function is running, and series inserted while this function runs may be reported or ignored.
+//
+// No locks are held when the function is called.
+//
+// The slice of hashes passed to the function is reused between calls.
+func (h *Head) ForEachSecondaryHash(fn func(secondaryHash []uint32)) {
+	buf := make([]uint32, 512)
+
+	for i := 0; i < h.series.size; i++ {
+		buf = buf[:0]
+
+		h.series.locks[i].RLock()
+		for _, all := range h.series.hashes[i] {
+			for _, s := range all {
+				// No need to lock the series, as we're only accessing its immutable secondary hash.
+				buf = append(buf, s.secondaryHash)
+			}
+		}
+		h.series.locks[i].RUnlock()
+
+		if len(buf) > 0 {
+			fn(buf)
+		}
+	}
+}
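
The commit message mentions counting secondary hashes that fall within a set of ranges; a sketch of how such a consumer could be built on top of ForEachSecondaryHash (the `hashRange` type and `countSeriesInRanges` helper are illustrative, not part of this commit):

```go
package main

import (
	"github.com/prometheus/prometheus/tsdb"
)

// hashRange is an illustrative half-open range [start, end) of secondary
// hash values; it is not a type introduced by this commit.
type hashRange struct {
	start, end uint32
}

// countSeriesInRanges tallies how many series in the Head have a secondary
// hash inside each of the given ranges, using the batched iteration that
// ForEachSecondaryHash provides.
func countSeriesInRanges(h *tsdb.Head, ranges []hashRange) []int {
	counts := make([]int, len(ranges))
	h.ForEachSecondaryHash(func(hashes []uint32) {
		// The buffer is reused between calls, so only read from it here.
		for _, v := range hashes {
			for i, r := range ranges {
				if v >= r.start && v < r.end {
					counts[i]++
				}
			}
		}
	})
	return counts
}
```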

View file

@@ -537,7 +537,7 @@ func TestMemSeries_chunk(t *testing.T) {
 				require.NoError(t, chunkDiskMapper.Close())
 			}()
 
-			series := newMemSeries(labels.EmptyLabels(), 1, labels.StableHash(labels.EmptyLabels()), 0, true)
+			series := newMemSeries(labels.EmptyLabels(), 1, labels.StableHash(labels.EmptyLabels()), 0, 0, true)
 
 			if tc.setup != nil {
 				tc.setup(t, series, chunkDiskMapper)

View file

@@ -338,7 +338,7 @@ func BenchmarkLoadWLs(b *testing.B) {
 			for k := 0; k < c.batches*c.seriesPerBatch; k++ {
 				// Create one mmapped chunk per series, with one sample at the given time.
 				lbls := labels.Labels{}
-				s := newMemSeries(lbls, chunks.HeadSeriesRef(k)*101, labels.StableHash(lbls), 0, defaultIsolationDisabled)
+				s := newMemSeries(lbls, chunks.HeadSeriesRef(k)*101, labels.StableHash(lbls), 0, 0, defaultIsolationDisabled)
 				s.append(c.mmappedChunkT, 42, 0, cOpts)
 				// There's only one head chunk because only a single sample is appended. mmapChunks()
 				// ignores the latest chunk, so we need to cut a new head chunk to guarantee the chunk with
@@ -909,7 +909,7 @@ func TestMemSeries_truncateChunks(t *testing.T) {
 	}
 
 	lbls := labels.FromStrings("a", "b")
-	s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled)
+	s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, 0, defaultIsolationDisabled)
 
 	for i := 0; i < 4000; i += 5 {
 		ok, _ := s.append(int64(i), float64(i), 0, cOpts)
@@ -1050,7 +1050,7 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) {
 			require.NoError(t, chunkDiskMapper.Close())
 		}()
 
-		series := newMemSeries(labels.EmptyLabels(), 1, labels.StableHash(labels.EmptyLabels()), 0, true)
+		series := newMemSeries(labels.EmptyLabels(), 1, labels.StableHash(labels.EmptyLabels()), 0, 0, true)
 
 		cOpts := chunkOpts{
 			chunkDiskMapper: chunkDiskMapper,
@@ -1629,7 +1629,7 @@ func TestMemSeries_append(t *testing.T) {
 	}
 
 	lbls := labels.Labels{}
-	s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled)
+	s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, 0, defaultIsolationDisabled)
 
 	// Add first two samples at the very end of a chunk range and the next two
 	// on and after it.
@@ -1691,7 +1691,7 @@ func TestMemSeries_appendHistogram(t *testing.T) {
 	}
 
 	lbls := labels.Labels{}
-	s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled)
+	s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, 0, defaultIsolationDisabled)
 
 	histograms := tsdbutil.GenerateTestHistograms(4)
 	histogramWithOneMoreBucket := histograms[3].Copy()
@@ -1754,7 +1754,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
 	}
 
 	lbls := labels.Labels{}
-	s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled)
+	s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, 0, defaultIsolationDisabled)
 
 	// At this slow rate, we will fill the chunk in two block durations.
 	slowRate := (DefaultBlockDuration * 2) / samplesPerChunk
@@ -3089,7 +3089,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) {
 	}
 
 	lbls := labels.Labels{}
-	s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled)
+	s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, 0, defaultIsolationDisabled)
 
 	for i := 0; i < 7; i++ {
 		ok, _ := s.append(int64(i), float64(i), 0, cOpts)
@@ -5695,3 +5695,60 @@ func TestHeadDetectsDuplicateSampleAtSizeLimit(t *testing.T) {
 
 	require.Equal(t, numSamples/2, storedSampleCount)
 }
+
+func TestSecondaryHashFunction(t *testing.T) {
+	dir := t.TempDir()
+	wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionNone)
+	require.NoError(t, err)
+
+	opts := DefaultHeadOptions()
+	opts.ChunkRange = 1000
+	opts.ChunkDirRoot = dir
+	opts.EnableExemplarStorage = true
+	opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars)
+	opts.EnableNativeHistograms.Store(true)
+	opts.SecondaryHashFunction = func(l labels.Labels) uint32 {
+		return uint32(l.Len())
+	}
+
+	h, err := NewHead(nil, nil, wal, nil, opts, nil)
+	require.NoError(t, err)
+	t.Cleanup(func() {
+		require.NoError(t, h.Close())
+	})
+
+	const seriesCount = 100
+	const labelsCount = 10
+
+	app := h.Appender(context.Background())
+	for ix, s := range genSeries(seriesCount, labelsCount, 0, 0) {
+		_, err := app.Append(0, s.Labels(), int64(100*ix), float64(ix))
+		require.NoError(t, err)
+	}
+	require.NoError(t, app.Commit())
+
+	checkSecondaryHashes := func(expected int) {
+		reportedHashes := 0
+		h.ForEachSecondaryHash(func(secondaryHashes []uint32) {
+			reportedHashes += len(secondaryHashes)
+
+			for _, h := range secondaryHashes {
+				require.Equal(t, uint32(labelsCount), h)
+			}
+		})
+		require.Equal(t, expected, reportedHashes)
+	}
+
+	checkSecondaryHashes(seriesCount)
+
+	// Truncate the head, removing half of the series (their timestamps are before the supplied truncation MinT).
+	require.NoError(t, h.Truncate(100*(seriesCount/2)))
+
+	// There should be 50 reported series now.
+	checkSecondaryHashes(50)
+
+	// Truncate the head again, removing the remaining series (their timestamps are before the supplied truncation MinT).
+	require.NoError(t, h.Truncate(100*seriesCount))
+
+	checkSecondaryHashes(0)
+}