diff --git a/tsdb/agent/series.go b/tsdb/agent/series.go index d38518f71..76e734217 100644 --- a/tsdb/agent/series.go +++ b/tsdb/agent/series.go @@ -45,14 +45,23 @@ func (m *memSeries) updateTimestamp(newTs int64) bool { return false } -// seriesHashmap is a simple hashmap for memSeries by their label set. -// It is built on top of a regular hashmap and holds a slice of series to -// resolve hash collisions. Its methods require the hash to be submitted +// seriesHashmap lets agent find a memSeries by its label set, via a 64-bit hash. +// There is one map for the common case where the hash value is unique, and a +// second map for the case that two series have the same hash value. +// Each series is in only one of the maps. Its methods require the hash to be submitted // with the label set to avoid re-computing hash throughout the code. -type seriesHashmap map[uint64][]*memSeries +type seriesHashmap struct { + unique map[uint64]*memSeries + conflicts map[uint64][]*memSeries +} -func (m seriesHashmap) Get(hash uint64, lset labels.Labels) *memSeries { - for _, s := range m[hash] { +func (m *seriesHashmap) Get(hash uint64, lset labels.Labels) *memSeries { + if s, found := m.unique[hash]; found { + if labels.Equal(s.lset, lset) { + return s + } + } + for _, s := range m.conflicts[hash] { if labels.Equal(s.lset, lset) { return s } @@ -60,28 +69,49 @@ func (m seriesHashmap) Get(hash uint64, lset labels.Labels) *memSeries { return nil } -func (m seriesHashmap) Set(hash uint64, s *memSeries) { - seriesSet := m[hash] +func (m *seriesHashmap) Set(hash uint64, s *memSeries) { + if existing, found := m.unique[hash]; !found || labels.Equal(existing.lset, s.lset) { + m.unique[hash] = s + return + } + if m.conflicts == nil { + m.conflicts = make(map[uint64][]*memSeries) + } + seriesSet := m.conflicts[hash] for i, prev := range seriesSet { if labels.Equal(prev.lset, s.lset) { seriesSet[i] = s return } } - m[hash] = append(seriesSet, s) + m.conflicts[hash] = append(seriesSet, s) } -func (m seriesHashmap) Delete(hash uint64, ref chunks.HeadSeriesRef) { +func (m *seriesHashmap) Delete(hash uint64, ref chunks.HeadSeriesRef) { var rem []*memSeries - for _, s := range m[hash] { - if s.ref != ref { - rem = append(rem, s) + unique, found := m.unique[hash] + switch { + case !found: // Supplied hash is not stored. + return + case unique.ref == ref: + conflicts := m.conflicts[hash] + if len(conflicts) == 0 { // Exactly one series with this hash was stored + delete(m.unique, hash) + return + } + m.unique[hash] = conflicts[0] // First remaining series goes in 'unique'. + rem = conflicts[1:] // Keep the rest. + default: // The series to delete is somewhere in 'conflicts'. Keep all the ones that don't match. + for _, s := range m.conflicts[hash] { + if s.ref != ref { + rem = append(rem, s) + } } } if len(rem) == 0 { - delete(m, hash) + delete(m.conflicts, hash) } else { - m[hash] = rem + m.conflicts[hash] = rem } } @@ -117,7 +147,10 @@ func newStripeSeries(stripeSize int) *stripeSeries { s.series[i] = map[chunks.HeadSeriesRef]*memSeries{} } for i := range s.hashes { - s.hashes[i] = seriesHashmap{} + s.hashes[i] = seriesHashmap{ + unique: map[uint64]*memSeries{}, + conflicts: nil, // Initialized on demand in set(). + } } for i := range s.exemplars { s.exemplars[i] = map[chunks.HeadSeriesRef]*exemplar.Exemplar{} @@ -136,40 +169,49 @@ func (s *stripeSeries) GC(mint int64) map[chunks.HeadSeriesRef]struct{} { defer s.gcMut.Unlock() deleted := map[chunks.HeadSeriesRef]struct{}{} + + // For one series, truncate old chunks and check if any chunks left. If not, mark as deleted and collect the ID. + check := func(hashLock int, hash uint64, series *memSeries) { + series.Lock() + + // Any series that has received a write since mint is still alive. + if series.lastTs >= mint { + series.Unlock() + return + } + + // The series is stale. We need to obtain a second lock for the + // ref if it's different than the hash lock. + refLock := int(series.ref) & (s.size - 1) + if hashLock != refLock { + s.locks[refLock].Lock() + } + + deleted[series.ref] = struct{}{} + delete(s.series[refLock], series.ref) + s.hashes[hashLock].Delete(hash, series.ref) + + // Since the series is gone, we'll also delete + // the latest stored exemplar. + delete(s.exemplars[refLock], series.ref) + + if hashLock != refLock { + s.locks[refLock].Unlock() + } + series.Unlock() + } + for hashLock := 0; hashLock < s.size; hashLock++ { s.locks[hashLock].Lock() - for hash, all := range s.hashes[hashLock] { + for hash, all := range s.hashes[hashLock].conflicts { for _, series := range all { - series.Lock() - - // Any series that has received a write since mint is still alive. - if series.lastTs >= mint { - series.Unlock() - continue - } - - // The series is stale. We need to obtain a second lock for the - // ref if it's different than the hash lock. - refLock := int(series.ref) & (s.size - 1) - if hashLock != refLock { - s.locks[refLock].Lock() - } - - deleted[series.ref] = struct{}{} - delete(s.series[refLock], series.ref) - s.hashes[hashLock].Delete(hash, series.ref) - - // Since the series is gone, we'll also delete - // the latest stored exemplar. - delete(s.exemplars[refLock], series.ref) - - if hashLock != refLock { - s.locks[refLock].Unlock() - } - series.Unlock() + check(hashLock, hash, series) } } + for hash, series := range s.hashes[hashLock].unique { + check(hashLock, hash, series) + } s.locks[hashLock].Unlock() } diff --git a/tsdb/agent/series_test.go b/tsdb/agent/series_test.go index 480f6c026..ae327d858 100644 --- a/tsdb/agent/series_test.go +++ b/tsdb/agent/series_test.go @@ -74,3 +74,63 @@ func TestNoDeadlock(t *testing.T) { require.FailNow(t, "deadlock detected") } } + +func labelsWithHashCollision() (labels.Labels, labels.Labels) { + // These two series have the same XXHash; thanks to https://github.com/pstibrany/labels_hash_collisions + ls1 := labels.FromStrings("__name__", "metric", "lbl1", "value", "lbl2", "l6CQ5y") + ls2 := labels.FromStrings("__name__", "metric", "lbl1", "value", "lbl2", "v7uDlF") + + if ls1.Hash() != ls2.Hash() { + // These ones are the same when using -tags stringlabels + ls1 = labels.FromStrings("__name__", "metric", "lbl", "HFnEaGl") + ls2 = labels.FromStrings("__name__", "metric", "lbl", "RqcXatm") + } + + if ls1.Hash() != ls2.Hash() { + panic("This code needs to be updated: find new labels with colliding hash values.") + } + + return ls1, ls2 +} + +// stripeSeriesWithCollidingSeries returns a stripeSeries with two memSeries having the same, colliding, hash. +func stripeSeriesWithCollidingSeries(_ *testing.T) (*stripeSeries, *memSeries, *memSeries) { + lbls1, lbls2 := labelsWithHashCollision() + ms1 := memSeries{ + lset: lbls1, + } + ms2 := memSeries{ + lset: lbls2, + } + hash := lbls1.Hash() + s := newStripeSeries(1) + + s.Set(hash, &ms1) + s.Set(hash, &ms2) + + return s, &ms1, &ms2 +} + +func TestStripeSeries_Get(t *testing.T) { + s, ms1, ms2 := stripeSeriesWithCollidingSeries(t) + hash := ms1.lset.Hash() + + // Verify that we can get both of the series despite the hash collision + got := s.GetByHash(hash, ms1.lset) + require.Same(t, ms1, got) + got = s.GetByHash(hash, ms2.lset) + require.Same(t, ms2, got) +} + +func TestStripeSeries_gc(t *testing.T) { + s, ms1, ms2 := stripeSeriesWithCollidingSeries(t) + hash := ms1.lset.Hash() + + s.GC(1) + + // Verify that we can get neither ms1 nor ms2 after gc-ing corresponding series + got := s.GetByHash(hash, ms1.lset) + require.Nil(t, got) + got = s.GetByHash(hash, ms2.lset) + require.Nil(t, got) +}