mirror of
https://github.com/prometheus/prometheus.git
synced 2025-02-02 08:31:11 -08:00
TSDB: Only pay for hash collisions when they happen
Instead of a map of slices of `*memSeries`, ready for any of them to hold series where hash values collide, split into a map of `*memSeries` and a map of slices which is usually empty, since hash collisions are a one-in-a-billion thing. The `del` method gets more complicated, to maintain the invariant that a series is only in one of the two maps. Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
parent
ce4e757704
commit
e6c0f69f98
75
tsdb/head.go
75
tsdb/head.go
|
@ -1667,26 +1667,34 @@ func (h *Head) mmapHeadChunks() {
|
||||||
var count int
|
var count int
|
||||||
for i := 0; i < h.series.size; i++ {
|
for i := 0; i < h.series.size; i++ {
|
||||||
h.series.locks[i].RLock()
|
h.series.locks[i].RLock()
|
||||||
for _, all := range h.series.hashes[i] {
|
for _, series := range h.series.series[i] {
|
||||||
for _, series := range all {
|
series.Lock()
|
||||||
series.Lock()
|
count += series.mmapChunks(h.chunkDiskMapper)
|
||||||
count += series.mmapChunks(h.chunkDiskMapper)
|
series.Unlock()
|
||||||
series.Unlock()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
h.series.locks[i].RUnlock()
|
h.series.locks[i].RUnlock()
|
||||||
}
|
}
|
||||||
h.metrics.mmapChunksTotal.Add(float64(count))
|
h.metrics.mmapChunksTotal.Add(float64(count))
|
||||||
}
|
}
|
||||||
|
|
||||||
// seriesHashmap is a simple hashmap for memSeries by their label set. It is built
|
// seriesHashmap lets TSDB find a memSeries by its label set, via a 64-bit hash.
|
||||||
// on top of a regular hashmap and holds a slice of series to resolve hash collisions.
|
// There is one map for the common case where the hash value is unique, and a
|
||||||
|
// second map for the case that two series have the same hash value.
|
||||||
|
// Each series is in only one of the maps.
|
||||||
// Its methods require the hash to be submitted with it to avoid re-computations throughout
|
// Its methods require the hash to be submitted with it to avoid re-computations throughout
|
||||||
// the code.
|
// the code.
|
||||||
type seriesHashmap map[uint64][]*memSeries
|
type seriesHashmap struct {
|
||||||
|
unique map[uint64]*memSeries
|
||||||
|
conflicts map[uint64][]*memSeries
|
||||||
|
}
|
||||||
|
|
||||||
func (m seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries {
|
func (m *seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries {
|
||||||
for _, s := range m[hash] {
|
if s, found := m.unique[hash]; found {
|
||||||
|
if labels.Equal(s.lset, lset) {
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, s := range m.conflicts[hash] {
|
||||||
if labels.Equal(s.lset, lset) {
|
if labels.Equal(s.lset, lset) {
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
@ -1695,27 +1703,46 @@ func (m seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m seriesHashmap) set(hash uint64, s *memSeries) {
|
func (m seriesHashmap) set(hash uint64, s *memSeries) {
|
||||||
l := m[hash]
|
if existing, found := m.unique[hash]; !found || labels.Equal(existing.lset, s.lset) {
|
||||||
|
m.unique[hash] = s
|
||||||
|
return
|
||||||
|
}
|
||||||
|
l := m.conflicts[hash]
|
||||||
for i, prev := range l {
|
for i, prev := range l {
|
||||||
if labels.Equal(prev.lset, s.lset) {
|
if labels.Equal(prev.lset, s.lset) {
|
||||||
l[i] = s
|
l[i] = s
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
m[hash] = append(l, s)
|
m.conflicts[hash] = append(l, s)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m seriesHashmap) del(hash uint64, lset labels.Labels) {
|
func (m seriesHashmap) del(hash uint64, lset labels.Labels) {
|
||||||
var rem []*memSeries
|
var rem []*memSeries
|
||||||
for _, s := range m[hash] {
|
unique, found := m.unique[hash]
|
||||||
if !labels.Equal(s.lset, lset) {
|
switch {
|
||||||
rem = append(rem, s)
|
case !found:
|
||||||
|
return
|
||||||
|
case labels.Equal(unique.lset, lset):
|
||||||
|
conflicts := m.conflicts[hash]
|
||||||
|
if len(conflicts) == 0 {
|
||||||
|
delete(m.unique, hash)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
rem = conflicts
|
||||||
|
default:
|
||||||
|
rem = append(rem, unique)
|
||||||
|
for _, s := range m.conflicts[hash] {
|
||||||
|
if !labels.Equal(s.lset, lset) {
|
||||||
|
rem = append(rem, s)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(rem) == 0 {
|
m.unique[hash] = rem[0]
|
||||||
delete(m, hash)
|
if len(rem) == 1 {
|
||||||
|
delete(m.conflicts, hash)
|
||||||
} else {
|
} else {
|
||||||
m[hash] = rem
|
m.conflicts[hash] = rem[1:]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1757,7 +1784,10 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st
|
||||||
s.series[i] = map[chunks.HeadSeriesRef]*memSeries{}
|
s.series[i] = map[chunks.HeadSeriesRef]*memSeries{}
|
||||||
}
|
}
|
||||||
for i := range s.hashes {
|
for i := range s.hashes {
|
||||||
s.hashes[i] = seriesHashmap{}
|
s.hashes[i] = seriesHashmap{
|
||||||
|
unique: map[uint64]*memSeries{},
|
||||||
|
conflicts: map[uint64][]*memSeries{},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
@ -1837,7 +1867,10 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
|
||||||
deletedForCallback := make(map[chunks.HeadSeriesRef]labels.Labels, deletedFromPrevStripe)
|
deletedForCallback := make(map[chunks.HeadSeriesRef]labels.Labels, deletedFromPrevStripe)
|
||||||
s.locks[i].Lock()
|
s.locks[i].Lock()
|
||||||
|
|
||||||
for hash, all := range s.hashes[i] {
|
for hash, series := range s.hashes[i].unique {
|
||||||
|
check(i, hash, series, deletedForCallback)
|
||||||
|
}
|
||||||
|
for hash, all := range s.hashes[i].conflicts {
|
||||||
for _, series := range all {
|
for _, series := range all {
|
||||||
check(i, hash, series, deletedForCallback)
|
check(i, hash, series, deletedForCallback)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue