TestSecondaryHashFunction: Add sub-test for conflicts

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
This commit is contained in:
Arve Knudsen 2023-11-24 18:28:05 +01:00
parent f047647bb0
commit bd85da0031
2 changed files with 80 additions and 46 deletions

View file

@ -1759,7 +1759,7 @@ func (m *seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries {
return nil
}
func (m seriesHashmap) set(hash uint64, s *memSeries) {
func (m *seriesHashmap) set(hash uint64, s *memSeries) {
if existing, found := m.unique[hash]; !found || labels.Equal(existing.lset, s.lset) {
m.unique[hash] = s
return
@ -1777,7 +1777,7 @@ func (m seriesHashmap) set(hash uint64, s *memSeries) {
m.conflicts[hash] = append(l, s)
}
func (m seriesHashmap) del(hash uint64, lset labels.Labels) {
func (m *seriesHashmap) del(hash uint64, lset labels.Labels) {
var rem []*memSeries
unique, found := m.unique[hash]
switch {
@ -1927,14 +1927,14 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
deletedForCallback := make(map[chunks.HeadSeriesRef]labels.Labels, deletedFromPrevStripe)
s.locks[i].Lock()
for hash, series := range s.hashes[i].unique {
check(i, hash, series, deletedForCallback)
}
for hash, all := range s.hashes[i].conflicts {
for _, series := range all {
check(i, hash, series, deletedForCallback)
}
}
for hash, series := range s.hashes[i].unique {
check(i, hash, series, deletedForCallback)
}
s.locks[i].Unlock()

View file

@ -5613,58 +5613,92 @@ func TestHeadCompactionWhileAppendAndCommitExemplar(t *testing.T) {
}
func TestSecondaryHashFunction(t *testing.T) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionNone)
require.NoError(t, err)
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
opts.ChunkDirRoot = dir
opts.EnableExemplarStorage = true
opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars)
opts.EnableNativeHistograms.Store(true)
opts.SecondaryHashFunction = func(l labels.Labels) uint32 {
return uint32(l.Len())
}
h, err := NewHead(nil, nil, wal, nil, opts, nil)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, h.Close())
})
const seriesCount = 100
const labelsCount = 10
app := h.Appender(context.Background())
for ix, s := range genSeries(seriesCount, labelsCount, 0, 0) {
_, err := app.Append(0, s.Labels(), int64(100*ix), float64(ix))
require.NoError(t, err)
}
require.NoError(t, app.Commit())
checkSecondaryHashes := func(expected int) {
checkSecondaryHashes := func(t *testing.T, h *Head, labelsCount, expected int) {
reportedHashes := 0
h.ForEachSecondaryHash(func(secondaryHashes []uint32) {
reportedHashes += len(secondaryHashes)
for _, h := range secondaryHashes {
require.Equal(t, uint32(labelsCount), h)
require.Equal(t, labelsCount, int(h))
}
})
require.Equal(t, expected, reportedHashes)
}
checkSecondaryHashes(seriesCount)
testCases := []struct {
name string
series func(*testing.T) []storage.Series
}{
{
name: "without collisions",
series: func(_ *testing.T) []storage.Series {
return genSeries(100, 10, 0, 0)
},
},
{
name: "with collisions",
series: func(t *testing.T) []storage.Series {
// Make a couple of series with colliding label sets
collidingSet := map[string]string{"__name__": "metric"}
collidingSet["lbl"] = "qeYKm3"
series := []storage.Series{
storage.NewListSeries(
labels.FromMap(collidingSet), []chunks.Sample{sample{t: 0, f: rand.Float64()}},
),
}
collidingSet["lbl"] = "2fUczT"
series = append(series, storage.NewListSeries(
labels.FromMap(collidingSet), []chunks.Sample{sample{t: 0, f: rand.Float64()}},
))
require.Equal(t, series[len(series)-2].Labels().Hash(), series[len(series)-1].Labels().Hash(),
"The two series should have the same label set hash")
return series
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionNone)
require.NoError(t, err)
// Truncate head, remove half of the series (because their timestamp is before supplied truncation MinT)
require.NoError(t, h.Truncate(100*(seriesCount/2)))
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
opts.ChunkDirRoot = dir
opts.EnableExemplarStorage = true
opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars)
opts.EnableNativeHistograms.Store(true)
opts.SecondaryHashFunction = func(l labels.Labels) uint32 {
return uint32(l.Len())
}
// There should be 50 reported series now.
checkSecondaryHashes(50)
h, err := NewHead(nil, nil, wal, nil, opts, nil)
require.NoError(t, err)
// Truncate head again, remove all series, remove half of the series (because their timestamp is before supplied truncation MinT)
require.NoError(t, h.Truncate(100*seriesCount))
checkSecondaryHashes(0)
t.Cleanup(func() {
require.NoError(t, h.Close())
})
app := h.Appender(context.Background())
series := tc.series(t)
for ix, s := range series {
_, err := app.Append(0, s.Labels(), int64(100*ix), float64(ix))
require.NoError(t, err)
}
require.NoError(t, app.Commit())
labelsCount := series[0].Labels().Len()
checkSecondaryHashes(t, h, labelsCount, len(series))
// Truncate head, remove half of the series (because their timestamp is before supplied truncation MinT)
require.NoError(t, h.Truncate(100*int64(len(series)/2)))
checkSecondaryHashes(t, h, labelsCount, len(series)/2)
// Truncate head again, remove all series (because their timestamp is before supplied truncation MinT)
require.NoError(t, h.Truncate(100*int64(len(series))))
checkSecondaryHashes(t, h, labelsCount, 0)
})
}
}