diff --git a/promql/engine_test.go b/promql/engine_test.go
index b3e7d4094..105cdc10d 100644
--- a/promql/engine_test.go
+++ b/promql/engine_test.go
@@ -18,6 +18,7 @@ import (
 	"errors"
 	"fmt"
 	"math"
+	"os"
 	"sort"
 	"testing"
 	"time"
@@ -46,7 +47,9 @@ func TestMain(m *testing.M) {
 func TestQueryConcurrency(t *testing.T) {
 	maxConcurrency := 10
 
-	dir := t.TempDir()
+	dir, err := os.MkdirTemp("", "test_concurrency")
+	require.NoError(t, err)
+	defer os.RemoveAll(dir)
 	queryTracker := NewActiveQueryTracker(dir, maxConcurrency, nil)
 	t.Cleanup(queryTracker.Close)
 
diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go
index a4571d59a..1726d5db7 100644
--- a/tsdb/agent/db_test.go
+++ b/tsdb/agent/db_test.go
@@ -84,7 +84,7 @@ func TestDB_InvalidSeries(t *testing.T) {
 	})
 }
 
-func createTestAgentDB(t *testing.T, reg prometheus.Registerer, opts *Options) *DB {
+func createTestAgentDB(t testing.TB, reg prometheus.Registerer, opts *Options) *DB {
 	t.Helper()
 
 	dbDir := t.TempDir()
@@ -878,3 +878,21 @@ func TestDBAllowOOOSamples(t *testing.T) {
 	require.Equal(t, float64(80), m.Metric[1].Counter.GetValue(), "agent wal mismatch of total appended histograms")
 	require.NoError(t, db.Close())
 }
+
+func BenchmarkCreateSeries(b *testing.B) {
+	s := createTestAgentDB(b, nil, DefaultOptions())
+	defer s.Close()
+
+	app := s.Appender(context.Background()).(*appender)
+	lbls := make([]labels.Labels, b.N)
+
+	for i, l := range labelsForTest("benchmark", b.N) {
+		lbls[i] = labels.New(l...)
+	}
+
+	b.ResetTimer()
+
+	for _, l := range lbls {
+		app.getOrCreate(l)
+	}
+}
diff --git a/tsdb/agent/series.go b/tsdb/agent/series.go
index d38518f71..76e734217 100644
--- a/tsdb/agent/series.go
+++ b/tsdb/agent/series.go
@@ -45,14 +45,23 @@ func (m *memSeries) updateTimestamp(newTs int64) bool {
 	return false
 }
 
-// seriesHashmap is a simple hashmap for memSeries by their label set.
-// It is built on top of a regular hashmap and holds a slice of series to
-// resolve hash collisions. Its methods require the hash to be submitted
+// seriesHashmap lets agent find a memSeries by its label set, via a 64-bit hash.
+// There is one map for the common case where the hash value is unique, and a
+// second map for the case that two series have the same hash value.
+// Each series is in only one of the maps. Its methods require the hash to be submitted
 // with the label set to avoid re-computing hash throughout the code.
-type seriesHashmap map[uint64][]*memSeries
+type seriesHashmap struct {
+	unique    map[uint64]*memSeries
+	conflicts map[uint64][]*memSeries
+}
 
-func (m seriesHashmap) Get(hash uint64, lset labels.Labels) *memSeries {
-	for _, s := range m[hash] {
+func (m *seriesHashmap) Get(hash uint64, lset labels.Labels) *memSeries {
+	if s, found := m.unique[hash]; found {
+		if labels.Equal(s.lset, lset) {
+			return s
+		}
+	}
+	for _, s := range m.conflicts[hash] {
 		if labels.Equal(s.lset, lset) {
 			return s
 		}
@@ -60,28 +69,49 @@ func (m seriesHashmap) Get(hash uint64, lset labels.Labels) *memSeries {
 	return nil
 }
 
-func (m seriesHashmap) Set(hash uint64, s *memSeries) {
-	seriesSet := m[hash]
+func (m *seriesHashmap) Set(hash uint64, s *memSeries) {
+	if existing, found := m.unique[hash]; !found || labels.Equal(existing.lset, s.lset) {
+		m.unique[hash] = s
+		return
+	}
+	if m.conflicts == nil {
+		m.conflicts = make(map[uint64][]*memSeries)
+	}
+	seriesSet := m.conflicts[hash]
 	for i, prev := range seriesSet {
 		if labels.Equal(prev.lset, s.lset) {
 			seriesSet[i] = s
 			return
 		}
 	}
-	m[hash] = append(seriesSet, s)
+	m.conflicts[hash] = append(seriesSet, s)
 }
 
-func (m seriesHashmap) Delete(hash uint64, ref chunks.HeadSeriesRef) {
+func (m *seriesHashmap) Delete(hash uint64, ref chunks.HeadSeriesRef) {
 	var rem []*memSeries
-	for _, s := range m[hash] {
-		if s.ref != ref {
-			rem = append(rem, s)
+	unique, found := m.unique[hash]
+	switch {
+	case !found: // Supplied hash is not stored.
+		return
+	case unique.ref == ref:
+		conflicts := m.conflicts[hash]
+		if len(conflicts) == 0 { // Exactly one series with this hash was stored
+			delete(m.unique, hash)
+			return
+		}
+		m.unique[hash] = conflicts[0] // First remaining series goes in 'unique'.
+		rem = conflicts[1:]           // Keep the rest.
+	default: // The series to delete is somewhere in 'conflicts'. Keep all the ones that don't match.
+		for _, s := range m.conflicts[hash] {
+			if s.ref != ref {
+				rem = append(rem, s)
+			}
 		}
 	}
 	if len(rem) == 0 {
-		delete(m, hash)
+		delete(m.conflicts, hash)
 	} else {
-		m[hash] = rem
+		m.conflicts[hash] = rem
 	}
 }
 
@@ -117,7 +147,10 @@ func newStripeSeries(stripeSize int) *stripeSeries {
 		s.series[i] = map[chunks.HeadSeriesRef]*memSeries{}
 	}
 	for i := range s.hashes {
-		s.hashes[i] = seriesHashmap{}
+		s.hashes[i] = seriesHashmap{
+			unique:    map[uint64]*memSeries{},
+			conflicts: nil, // Initialized on demand in set().
+		}
 	}
 	for i := range s.exemplars {
 		s.exemplars[i] = map[chunks.HeadSeriesRef]*exemplar.Exemplar{}
@@ -136,40 +169,49 @@ func (s *stripeSeries) GC(mint int64) map[chunks.HeadSeriesRef]struct{} {
 	defer s.gcMut.Unlock()
 
 	deleted := map[chunks.HeadSeriesRef]struct{}{}
+
+	// For one series, truncate old chunks and check if any chunks left. If not, mark as deleted and collect the ID.
+	check := func(hashLock int, hash uint64, series *memSeries) {
+		series.Lock()
+
+		// Any series that has received a write since mint is still alive.
+		if series.lastTs >= mint {
+			series.Unlock()
+			return
+		}
+
+		// The series is stale. We need to obtain a second lock for the
+		// ref if it's different than the hash lock.
+		refLock := int(series.ref) & (s.size - 1)
+		if hashLock != refLock {
+			s.locks[refLock].Lock()
+		}
+
+		deleted[series.ref] = struct{}{}
+		delete(s.series[refLock], series.ref)
+		s.hashes[hashLock].Delete(hash, series.ref)
+
+		// Since the series is gone, we'll also delete
+		// the latest stored exemplar.
+		delete(s.exemplars[refLock], series.ref)
+
+		if hashLock != refLock {
+			s.locks[refLock].Unlock()
+		}
+		series.Unlock()
+	}
+
 	for hashLock := 0; hashLock < s.size; hashLock++ {
 		s.locks[hashLock].Lock()
 
-		for hash, all := range s.hashes[hashLock] {
+		for hash, all := range s.hashes[hashLock].conflicts {
 			for _, series := range all {
-				series.Lock()
-
-				// Any series that has received a write since mint is still alive.
-				if series.lastTs >= mint {
-					series.Unlock()
-					continue
-				}
-
-				// The series is stale. We need to obtain a second lock for the
-				// ref if it's different than the hash lock.
-				refLock := int(series.ref) & (s.size - 1)
-				if hashLock != refLock {
-					s.locks[refLock].Lock()
-				}
-
-				deleted[series.ref] = struct{}{}
-				delete(s.series[refLock], series.ref)
-				s.hashes[hashLock].Delete(hash, series.ref)
-
-				// Since the series is gone, we'll also delete
-				// the latest stored exemplar.
-				delete(s.exemplars[refLock], series.ref)
-
-				if hashLock != refLock {
-					s.locks[refLock].Unlock()
-				}
-				series.Unlock()
+				check(hashLock, hash, series)
 			}
 		}
+		for hash, series := range s.hashes[hashLock].unique {
+			check(hashLock, hash, series)
+		}
 
 		s.locks[hashLock].Unlock()
 	}
diff --git a/tsdb/agent/series_test.go b/tsdb/agent/series_test.go
index 480f6c026..ae327d858 100644
--- a/tsdb/agent/series_test.go
+++ b/tsdb/agent/series_test.go
@@ -74,3 +74,63 @@ func TestNoDeadlock(t *testing.T) {
 		require.FailNow(t, "deadlock detected")
 	}
 }
+
+func labelsWithHashCollision() (labels.Labels, labels.Labels) {
+	// These two series have the same XXHash; thanks to https://github.com/pstibrany/labels_hash_collisions
+	ls1 := labels.FromStrings("__name__", "metric", "lbl1", "value", "lbl2", "l6CQ5y")
+	ls2 := labels.FromStrings("__name__", "metric", "lbl1", "value", "lbl2", "v7uDlF")
+
+	if ls1.Hash() != ls2.Hash() {
+		// These ones are the same when using -tags stringlabels
+		ls1 = labels.FromStrings("__name__", "metric", "lbl", "HFnEaGl")
+		ls2 = labels.FromStrings("__name__", "metric", "lbl", "RqcXatm")
+	}
+
+	if ls1.Hash() != ls2.Hash() {
+		panic("This code needs to be updated: find new labels with colliding hash values.")
+	}
+
+	return ls1, ls2
+}
+
+// stripeSeriesWithCollidingSeries returns a stripeSeries with two memSeries having the same, colliding, hash.
+func stripeSeriesWithCollidingSeries(_ *testing.T) (*stripeSeries, *memSeries, *memSeries) {
+	lbls1, lbls2 := labelsWithHashCollision()
+	ms1 := memSeries{
+		lset: lbls1,
+	}
+	ms2 := memSeries{
+		lset: lbls2,
+	}
+	hash := lbls1.Hash()
+	s := newStripeSeries(1)
+
+	s.Set(hash, &ms1)
+	s.Set(hash, &ms2)
+
+	return s, &ms1, &ms2
+}
+
+func TestStripeSeries_Get(t *testing.T) {
+	s, ms1, ms2 := stripeSeriesWithCollidingSeries(t)
+	hash := ms1.lset.Hash()
+
+	// Verify that we can get both of the series despite the hash collision
+	got := s.GetByHash(hash, ms1.lset)
+	require.Same(t, ms1, got)
+	got = s.GetByHash(hash, ms2.lset)
+	require.Same(t, ms2, got)
+}
+
+func TestStripeSeries_gc(t *testing.T) {
+	s, ms1, ms2 := stripeSeriesWithCollidingSeries(t)
+	hash := ms1.lset.Hash()
+
+	s.GC(1)
+
+	// Verify that we can get neither ms1 nor ms2 after gc-ing corresponding series
+	got := s.GetByHash(hash, ms1.lset)
+	require.Nil(t, got)
+	got = s.GetByHash(hash, ms2.lset)
+	require.Nil(t, got)
+}
diff --git a/tsdb/tsdbutil/dir_locker_testutil.go b/tsdb/tsdbutil/dir_locker_testutil.go
index a7f43d8ee..a4cf5abd6 100644
--- a/tsdb/tsdbutil/dir_locker_testutil.go
+++ b/tsdb/tsdbutil/dir_locker_testutil.go
@@ -60,8 +60,12 @@ func TestDirLockerUsage(t *testing.T, open func(t *testing.T, data string, creat
 
 	for _, c := range cases {
 		t.Run(fmt.Sprintf("%+v", c), func(t *testing.T) {
-			tmpdir := t.TempDir()
-
+			tmpdir, err := os.MkdirTemp("", "test")
+			require.NoError(t, err)
+			t.Cleanup(func() {
+				require.NoError(t, os.RemoveAll(tmpdir))
+			})
+
 			// Test preconditions (file already exists + lockfile option)
 			if c.fileAlreadyExists {
 				tmpLocker, err := NewDirLocker(tmpdir, "tsdb", log.NewNopLogger(), nil)
@@ -78,7 +82,7 @@ func TestDirLockerUsage(t *testing.T, open func(t *testing.T, data string, creat
 
 			// Check that the lockfile is always deleted
 			if !c.lockFileDisabled {
-				_, err := os.Stat(locker.path)
+				_, err = os.Stat(locker.path)
 				require.True(t, os.IsNotExist(err), "lockfile was not deleted")
 			}
 		})