Mirror of https://github.com/prometheus/prometheus.git
Pass affected labels to MemPostings.Delete() (#14307)
* Pass affected labels to MemPostings.Delete

As suggested by @bboreham, we can track the labels of the deleted series and avoid iterating through all the label/value combinations. This makes the MemPostings.Delete call much faster. We don't yet have a benchmark for stripeSeries.gc(), where we pay the price of iterating the labels of each deleted series.

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
parent 4f78cc809c
commit fd1a89b7c8
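The core idea is easier to see outside the TSDB internals. Below is a standalone sketch (simplified types, not the real MemPostings or stripeSeries API) of the two halves of the change: garbage collection records which labels the deleted series carried, and the postings deletion then prunes only those lists instead of scanning every label name/value pair.

package main

import "fmt"

type label struct{ name, value string }

func main() {
    // Postings index: label -> series refs, mirroring MemPostings' name/value -> refs layout.
    postings := map[label][]int{
        {"job", "api"}:  {1, 2},
        {"job", "db"}:   {3},
        {"env", "prod"}: {1, 2, 3},
    }
    seriesLabels := map[int][]label{
        1: {{"job", "api"}, {"env", "prod"}},
        2: {{"job", "api"}, {"env", "prod"}},
        3: {{"job", "db"}, {"env", "prod"}},
    }

    // What stripeSeries.gc now does: remember the deleted refs and the labels
    // they were indexed under.
    deleted := map[int]struct{}{3: {}}
    affected := map[label]struct{}{}
    for ref := range deleted {
        for _, l := range seriesLabels[ref] {
            affected[l] = struct{}{}
        }
    }

    // What MemPostings.Delete now does: visit only the affected lists.
    for l := range affected {
        repl := postings[l][:0:0]
        for _, ref := range postings[l] {
            if _, ok := deleted[ref]; !ok {
                repl = append(repl, ref)
            }
        }
        if len(repl) > 0 {
            postings[l] = repl
        } else {
            delete(postings, l)
        }
    }

    fmt.Println(postings) // map[{env prod}:[1 2] {job api}:[1 2]]
}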
tsdb/head.go (10 lines changed)
@@ -1552,7 +1552,7 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
     // Drop old chunks and remember series IDs and hashes if they can be
     // deleted entirely.
-    deleted, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef)
+    deleted, affected, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef)
     seriesRemoved := len(deleted)

     h.metrics.seriesRemoved.Add(float64(seriesRemoved))
@@ -1561,7 +1561,7 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
     h.numSeries.Sub(uint64(seriesRemoved))

     // Remove deleted series IDs from the postings lists.
-    h.postings.Delete(deleted)
+    h.postings.Delete(deleted, affected)

     // Remove tombstones referring to the deleted series.
     h.tombstones.DeleteTombstones(deleted)
@@ -1869,9 +1869,10 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st
 // but the returned map goes into postings.Delete() which expects a map[storage.SeriesRef]struct
 // and there's no easy way to cast maps.
 // minMmapFile is the min mmap file number seen in the series (in-order and out-of-order) after gc'ing the series.
-func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ int, _, _ int64, minMmapFile int) {
+func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int, _, _ int64, minMmapFile int) {
     var (
         deleted           = map[storage.SeriesRef]struct{}{}
+        affected          = map[labels.Label]struct{}{}
         rmChunks          = 0
         actualMint  int64 = math.MaxInt64
         minOOOTime  int64 = math.MaxInt64
@@ -1927,6 +1928,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
         }

         deleted[storage.SeriesRef(series.ref)] = struct{}{}
+        series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} })
         s.hashes[hashShard].del(hash, series.ref)
         delete(s.series[refShard], series.ref)
         deletedForCallback[series.ref] = series.lset
@@ -1938,7 +1940,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
         actualMint = mint
     }

-    return deleted, rmChunks, actualMint, minOOOTime, minMmapFile
+    return deleted, affected, rmChunks, actualMint, minOOOTime, minMmapFile
 }

 // The iterForDeletion function iterates through all series, invoking the checkDeletedFunc for each.
tsdb/head_test.go

@@ -814,6 +814,80 @@ func TestHead_UnknownWALRecord(t *testing.T) {
     require.NoError(t, head.Close())
 }

+// BenchmarkHead_Truncate is quite heavy, so consider running it with
+// -benchtime=10x or similar to get more stable and comparable results.
+func BenchmarkHead_Truncate(b *testing.B) {
+    const total = 1e6
+
+    prepare := func(b *testing.B, churn int) *Head {
+        h, _ := newTestHead(b, 1000, wlog.CompressionNone, false)
+        b.Cleanup(func() {
+            require.NoError(b, h.Close())
+        })
+
+        h.initTime(0)
+
+        internedItoa := map[int]string{}
+        var mtx sync.RWMutex
+        itoa := func(i int) string {
+            mtx.RLock()
+            s, ok := internedItoa[i]
+            mtx.RUnlock()
+            if ok {
+                return s
+            }
+            mtx.Lock()
+            s = strconv.Itoa(i)
+            internedItoa[i] = s
+            mtx.Unlock()
+            return s
+        }
+
+        allSeries := [total]labels.Labels{}
+        nameValues := make([]string, 0, 100)
+        for i := 0; i < total; i++ {
+            nameValues = nameValues[:0]
+
+            // A thousand labels like lbl_x_of_1000, each with total/1000 values
+            thousand := "lbl_" + itoa(i%1000) + "_of_1000"
+            nameValues = append(nameValues, thousand, itoa(i/1000))
+            // A hundred labels like lbl_x_of_100, each with total/100 values.
+            hundred := "lbl_" + itoa(i%100) + "_of_100"
+            nameValues = append(nameValues, hundred, itoa(i/100))
+
+            if i%13 == 0 {
+                ten := "lbl_" + itoa(i%10) + "_of_10"
+                nameValues = append(nameValues, ten, itoa(i%10))
+            }
+
+            allSeries[i] = labels.FromStrings(append(nameValues, "first", "a", "second", "a", "third", "a")...)
+            s, _, _ := h.getOrCreate(allSeries[i].Hash(), allSeries[i])
+            s.mmappedChunks = []*mmappedChunk{
+                {minTime: 1000 * int64(i/churn), maxTime: 999 + 1000*int64(i/churn)},
+            }
+        }
+
+        return h
+    }
+
+    for _, churn := range []int{10, 100, 1000} {
+        b.Run(fmt.Sprintf("churn=%d", churn), func(b *testing.B) {
+            if b.N > total/churn {
+                // Just to make sure that benchmark still makes sense.
+                panic("benchmark not prepared")
+            }
+            h := prepare(b, churn)
+            b.ResetTimer()
+
+            for i := 0; i < b.N; i++ {
+                require.NoError(b, h.Truncate(1000*int64(i)))
+                // Make sure the benchmark is meaningful and it's actually truncating the expected amount of series.
+                require.Equal(b, total-churn*i, int(h.NumSeries()))
+            }
+        })
+    }
+}
+
 func TestHead_Truncate(t *testing.T) {
     h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
     defer func() {
tsdb/index/postings.go

@@ -288,89 +288,34 @@ func (p *MemPostings) EnsureOrder(numberOfConcurrentProcesses int) {
 }

 // Delete removes all ids in the given map from the postings lists.
-func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}) {
-    // We will take an optimistic read lock for the entire method,
-    // and only lock for writing when we actually find something to delete.
-    //
-    // Each SeriesRef can appear in several Postings.
-    // To change each one, we need to know the label name and value that it is indexed under.
-    // We iterate over all label names, then for each name all values,
-    // and look for individual series to be deleted.
-    p.mtx.RLock()
-    defer p.mtx.RUnlock()
+// affected contains all the labels that are affected by the deletion, there's no need to check other labels.
+func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected map[labels.Label]struct{}) {
+    p.mtx.Lock()
+    defer p.mtx.Unlock()

-    // Collect all keys relevant for deletion once. New keys added afterwards
-    // can by definition not be affected by any of the given deletes.
-    keys := make([]string, 0, len(p.m))
-    maxVals := 0
-    for n := range p.m {
-        keys = append(keys, n)
-        if len(p.m[n]) > maxVals {
-            maxVals = len(p.m[n])
-        }
-    }
-
-    vals := make([]string, 0, maxVals)
-    for _, n := range keys {
-        // Copy the values and iterate the copy: if we unlock in the loop below,
-        // another goroutine might modify the map while we are part-way through it.
-        vals = vals[:0]
-        for v := range p.m[n] {
-            vals = append(vals, v)
-        }
-
-        // For each posting we first analyse whether the postings list is affected by the deletes.
-        // If no, we remove the label value from the vals list.
-        // This way we only need to Lock once later.
-        for i := 0; i < len(vals); {
-            found := false
-            refs := p.m[n][vals[i]]
-            for _, id := range refs {
-                if _, ok := deleted[id]; ok {
-                    i++
-                    found = true
-                    break
-                }
-            }
-
-            if !found {
-                // Didn't match, bring the last value to this position, make the slice shorter and check again.
-                // The order of the slice doesn't matter as it comes from a map iteration.
-                vals[i], vals = vals[len(vals)-1], vals[:len(vals)-1]
-            }
-        }
-
-        // If no label values have deleted ids, just continue.
-        if len(vals) == 0 {
-            continue
-        }
-
-        // The only vals left here are the ones that contain deleted ids.
-        // Now we take the write lock and remove the ids.
-        p.mtx.RUnlock()
-        p.mtx.Lock()
-        for _, l := range vals {
-            repl := make([]storage.SeriesRef, 0, len(p.m[n][l]))
-
-            for _, id := range p.m[n][l] {
-                if _, ok := deleted[id]; !ok {
-                    repl = append(repl, id)
-                }
-            }
-            if len(repl) > 0 {
-                p.m[n][l] = repl
-            } else {
-                delete(p.m[n], l)
-            }
-        }
-
-        // Delete the key if we removed all values.
-        if len(p.m[n]) == 0 {
-            delete(p.m, n)
-        }
-        p.mtx.Unlock()
-        p.mtx.RLock()
-    }
+    process := func(l labels.Label) {
+        orig := p.m[l.Name][l.Value]
+        repl := make([]storage.SeriesRef, 0, len(orig))
+        for _, id := range orig {
+            if _, ok := deleted[id]; !ok {
+                repl = append(repl, id)
+            }
+        }
+        if len(repl) > 0 {
+            p.m[l.Name][l.Value] = repl
+        } else {
+            delete(p.m[l.Name], l.Value)
+            // Delete the key if we removed all values.
+            if len(p.m[l.Name]) == 0 {
+                delete(p.m, l.Name)
+            }
+        }
+    }
+
+    for l := range affected {
+        process(l)
+    }
+    process(allPostingsKey)
 }

 // Iter calls f for each postings list. It aborts if f returns an error and returns it.
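Note the locking trade-off visible in the hunk above: the old implementation held an optimistic read lock and upgraded to a write lock per label name, while the new one takes the write lock once and walks only the affected labels (plus the all-postings list via process(allPostingsKey)). Calling code now follows this shape (a short sketch mirroring the updated TestMemPostings_Delete below; it assumes an already-populated *MemPostings p):

    deleted := map[storage.SeriesRef]struct{}{2: {}}
    affected := map[labels.Label]struct{}{
        {Name: "lbl1", Value: "b"}: {}, // every label the deleted series carried
    }
    p.Delete(deleted, affected)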
tsdb/index/postings_test.go

@@ -979,9 +979,13 @@ func TestMemPostings_Delete(t *testing.T) {
     p.Add(3, labels.FromStrings("lbl2", "a"))

     before := p.Get(allPostingsKey.Name, allPostingsKey.Value)
-    p.Delete(map[storage.SeriesRef]struct{}{
+    deletedRefs := map[storage.SeriesRef]struct{}{
         2: {},
-    })
+    }
+    affectedLabels := map[labels.Label]struct{}{
+        {Name: "lbl1", Value: "b"}: {},
+    }
+    p.Delete(deletedRefs, affectedLabels)
     after := p.Get(allPostingsKey.Name, allPostingsKey.Value)

     // Make sure postings gotten before the delete have the old data when
@@ -1022,33 +1026,23 @@ func BenchmarkMemPostings_Delete(b *testing.B) {
     }

     const total = 1e6
-    prepare := func() *MemPostings {
-        var ref storage.SeriesRef
-        next := func() storage.SeriesRef {
-            ref++
-            return ref
-        }
-
-        p := NewMemPostings()
-        nameValues := make([]string, 0, 100)
-        for i := 0; i < total; i++ {
-            nameValues = nameValues[:0]
-
-            // A thousand labels like lbl_x_of_1000, each with total/1000 values
-            thousand := "lbl_" + itoa(i%1000) + "_of_1000"
-            nameValues = append(nameValues, thousand, itoa(i/1000))
-            // A hundred labels like lbl_x_of_100, each with total/100 values.
-            hundred := "lbl_" + itoa(i%100) + "_of_100"
-            nameValues = append(nameValues, hundred, itoa(i/100))
-
-            if i < 100 {
-                ten := "lbl_" + itoa(i%10) + "_of_10"
-                nameValues = append(nameValues, ten, itoa(i%10))
-            }
-
-            p.Add(next(), labels.FromStrings(append(nameValues, "first", "a", "second", "a", "third", "a")...))
-        }
-        return p
+    allSeries := [total]labels.Labels{}
+    nameValues := make([]string, 0, 100)
+    for i := 0; i < total; i++ {
+        nameValues = nameValues[:0]
+
+        // A thousand labels like lbl_x_of_1000, each with total/1000 values
+        thousand := "lbl_" + itoa(i%1000) + "_of_1000"
+        nameValues = append(nameValues, thousand, itoa(i/1000))
+        // A hundred labels like lbl_x_of_100, each with total/100 values.
+        hundred := "lbl_" + itoa(i%100) + "_of_100"
+        nameValues = append(nameValues, hundred, itoa(i/100))
+
+        if i < 100 {
+            ten := "lbl_" + itoa(i%10) + "_of_10"
+            nameValues = append(nameValues, ten, itoa(i%10))
+        }
+        allSeries[i] = labels.FromStrings(append(nameValues, "first", "a", "second", "a", "third", "a")...)
     }

     for _, refs := range []int{1, 100, 10_000} {
@@ -1060,7 +1054,11 @@ func BenchmarkMemPostings_Delete(b *testing.B) {
                 panic("benchmark not prepared")
             }

-            p := prepare()
+            p := NewMemPostings()
+            for i := range allSeries {
+                p.Add(storage.SeriesRef(i), allSeries[i])
+            }
+
             stop := make(chan struct{})
             wg := sync.WaitGroup{}
             for i := 0; i < reads; i++ {
@@ -1086,11 +1084,16 @@ func BenchmarkMemPostings_Delete(b *testing.B) {

             b.ResetTimer()
             for n := 0; n < b.N; n++ {
-                deleted := map[storage.SeriesRef]struct{}{}
+                deleted := make(map[storage.SeriesRef]struct{}, refs)
+                affected := make(map[labels.Label]struct{}, refs)
                 for i := 0; i < refs; i++ {
-                    deleted[storage.SeriesRef(n*refs+i)] = struct{}{}
+                    ref := storage.SeriesRef(n*refs + i)
+                    deleted[ref] = struct{}{}
+                    allSeries[ref].Range(func(l labels.Label) {
+                        affected[l] = struct{}{}
+                    })
                 }
-                p.Delete(deleted)
+                p.Delete(deleted, affected)
             }
         })
     }