From bd85da00311d18cdb626035016d62f0451d08553 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Fri, 24 Nov 2023 18:28:05 +0100 Subject: [PATCH] TestSecondaryHashFunction: Add sub-test for conflicts Signed-off-by: Arve Knudsen --- tsdb/head.go | 10 ++-- tsdb/head_test.go | 116 ++++++++++++++++++++++++++++++---------------- 2 files changed, 80 insertions(+), 46 deletions(-) diff --git a/tsdb/head.go b/tsdb/head.go index 35eed6310f..9a7abd0ba9 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1759,7 +1759,7 @@ func (m *seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries { return nil } -func (m seriesHashmap) set(hash uint64, s *memSeries) { +func (m *seriesHashmap) set(hash uint64, s *memSeries) { if existing, found := m.unique[hash]; !found || labels.Equal(existing.lset, s.lset) { m.unique[hash] = s return @@ -1777,7 +1777,7 @@ func (m seriesHashmap) set(hash uint64, s *memSeries) { m.conflicts[hash] = append(l, s) } -func (m seriesHashmap) del(hash uint64, lset labels.Labels) { +func (m *seriesHashmap) del(hash uint64, lset labels.Labels) { var rem []*memSeries unique, found := m.unique[hash] switch { @@ -1927,14 +1927,14 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) ( deletedForCallback := make(map[chunks.HeadSeriesRef]labels.Labels, deletedFromPrevStripe) s.locks[i].Lock() - for hash, series := range s.hashes[i].unique { - check(i, hash, series, deletedForCallback) - } for hash, all := range s.hashes[i].conflicts { for _, series := range all { check(i, hash, series, deletedForCallback) } } + for hash, series := range s.hashes[i].unique { + check(i, hash, series, deletedForCallback) + } s.locks[i].Unlock() diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 7fe7254727..ca904cecf0 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -5613,58 +5613,92 @@ func TestHeadCompactionWhileAppendAndCommitExemplar(t *testing.T) { } func TestSecondaryHashFunction(t *testing.T) { - dir := t.TempDir() - wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionNone) - require.NoError(t, err) - - opts := DefaultHeadOptions() - opts.ChunkRange = 1000 - opts.ChunkDirRoot = dir - opts.EnableExemplarStorage = true - opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars) - opts.EnableNativeHistograms.Store(true) - opts.SecondaryHashFunction = func(l labels.Labels) uint32 { - return uint32(l.Len()) - } - - h, err := NewHead(nil, nil, wal, nil, opts, nil) - require.NoError(t, err) - - t.Cleanup(func() { - require.NoError(t, h.Close()) - }) - - const seriesCount = 100 - const labelsCount = 10 - - app := h.Appender(context.Background()) - for ix, s := range genSeries(seriesCount, labelsCount, 0, 0) { - _, err := app.Append(0, s.Labels(), int64(100*ix), float64(ix)) - require.NoError(t, err) - } - require.NoError(t, app.Commit()) - - checkSecondaryHashes := func(expected int) { + checkSecondaryHashes := func(t *testing.T, h *Head, labelsCount, expected int) { reportedHashes := 0 h.ForEachSecondaryHash(func(secondaryHashes []uint32) { reportedHashes += len(secondaryHashes) for _, h := range secondaryHashes { - require.Equal(t, uint32(labelsCount), h) + require.Equal(t, labelsCount, int(h)) } }) require.Equal(t, expected, reportedHashes) } - checkSecondaryHashes(seriesCount) + testCases := []struct { + name string + series func(*testing.T) []storage.Series + }{ + { + name: "without collisions", + series: func(_ *testing.T) []storage.Series { + return genSeries(100, 10, 0, 0) + }, + }, + { + name: "with collisions", + series: func(t *testing.T) []storage.Series { + // Make a couple of series with colliding label sets + collidingSet := map[string]string{"__name__": "metric"} + collidingSet["lbl"] = "qeYKm3" + series := []storage.Series{ + storage.NewListSeries( + labels.FromMap(collidingSet), []chunks.Sample{sample{t: 0, f: rand.Float64()}}, + ), + } + collidingSet["lbl"] = "2fUczT" + series = append(series, storage.NewListSeries( + labels.FromMap(collidingSet), []chunks.Sample{sample{t: 0, f: rand.Float64()}}, + )) + require.Equal(t, series[len(series)-2].Labels().Hash(), series[len(series)-1].Labels().Hash(), + "The two series should have the same label set hash") + return series + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + dir := t.TempDir() + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionNone) + require.NoError(t, err) - // Truncate head, remove half of the series (because their timestamp is before supplied truncation MinT) - require.NoError(t, h.Truncate(100*(seriesCount/2))) + opts := DefaultHeadOptions() + opts.ChunkRange = 1000 + opts.ChunkDirRoot = dir + opts.EnableExemplarStorage = true + opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars) + opts.EnableNativeHistograms.Store(true) + opts.SecondaryHashFunction = func(l labels.Labels) uint32 { + return uint32(l.Len()) + } - // There should be 50 reported series now. - checkSecondaryHashes(50) + h, err := NewHead(nil, nil, wal, nil, opts, nil) + require.NoError(t, err) - // Truncate head again, remove all series, remove half of the series (because their timestamp is before supplied truncation MinT) - require.NoError(t, h.Truncate(100*seriesCount)) - checkSecondaryHashes(0) + t.Cleanup(func() { + require.NoError(t, h.Close()) + }) + + app := h.Appender(context.Background()) + series := tc.series(t) + for ix, s := range series { + _, err := app.Append(0, s.Labels(), int64(100*ix), float64(ix)) + require.NoError(t, err) + } + + require.NoError(t, app.Commit()) + + labelsCount := series[0].Labels().Len() + checkSecondaryHashes(t, h, labelsCount, len(series)) + + // Truncate head, remove half of the series (because their timestamp is before supplied truncation MinT) + require.NoError(t, h.Truncate(100*int64(len(series)/2))) + + checkSecondaryHashes(t, h, labelsCount, len(series)/2) + + // Truncate head again, remove all series (because their timestamp is before supplied truncation MinT) + require.NoError(t, h.Truncate(100*int64(len(series)))) + checkSecondaryHashes(t, h, labelsCount, 0) + }) + } }