diff --git a/tsdb/head.go b/tsdb/head.go index d5f7144fd..5972a9c5d 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1552,7 +1552,7 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) { // Drop old chunks and remember series IDs and hashes if they can be // deleted entirely. - deleted, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef) + deleted, affected, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef) seriesRemoved := len(deleted) h.metrics.seriesRemoved.Add(float64(seriesRemoved)) @@ -1561,7 +1561,7 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) { h.numSeries.Sub(uint64(seriesRemoved)) // Remove deleted series IDs from the postings lists. - h.postings.Delete(deleted) + h.postings.Delete(deleted, affected) // Remove tombstones referring to the deleted series. h.tombstones.DeleteTombstones(deleted) @@ -1869,9 +1869,10 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st // but the returned map goes into postings.Delete() which expects a map[storage.SeriesRef]struct // and there's no easy way to cast maps. // minMmapFile is the min mmap file number seen in the series (in-order and out-of-order) after gc'ing the series. -func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ int, _, _ int64, minMmapFile int) { +func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int, _, _ int64, minMmapFile int) { var ( deleted = map[storage.SeriesRef]struct{}{} + affected = map[labels.Label]struct{}{} rmChunks = 0 actualMint int64 = math.MaxInt64 minOOOTime int64 = math.MaxInt64 @@ -1927,6 +1928,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) ( } deleted[storage.SeriesRef(series.ref)] = struct{}{} + series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} }) s.hashes[hashShard].del(hash, series.ref) delete(s.series[refShard], series.ref) deletedForCallback[series.ref] = series.lset @@ -1938,7 +1940,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) ( actualMint = mint } - return deleted, rmChunks, actualMint, minOOOTime, minMmapFile + return deleted, affected, rmChunks, actualMint, minOOOTime, minMmapFile } // The iterForDeletion function iterates through all series, invoking the checkDeletedFunc for each. diff --git a/tsdb/head_test.go b/tsdb/head_test.go index bb437ab59..93f046e5b 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -814,6 +814,80 @@ func TestHead_UnknownWALRecord(t *testing.T) { require.NoError(t, head.Close()) } +// BenchmarkHead_Truncate is quite heavy, so consider running it with +// -benchtime=10x or similar to get more stable and comparable results. +func BenchmarkHead_Truncate(b *testing.B) { + const total = 1e6 + + prepare := func(b *testing.B, churn int) *Head { + h, _ := newTestHead(b, 1000, wlog.CompressionNone, false) + b.Cleanup(func() { + require.NoError(b, h.Close()) + }) + + h.initTime(0) + + internedItoa := map[int]string{} + var mtx sync.RWMutex + itoa := func(i int) string { + mtx.RLock() + s, ok := internedItoa[i] + mtx.RUnlock() + if ok { + return s + } + mtx.Lock() + s = strconv.Itoa(i) + internedItoa[i] = s + mtx.Unlock() + return s + } + + allSeries := [total]labels.Labels{} + nameValues := make([]string, 0, 100) + for i := 0; i < total; i++ { + nameValues = nameValues[:0] + + // A thousand labels like lbl_x_of_1000, each with total/1000 values + thousand := "lbl_" + itoa(i%1000) + "_of_1000" + nameValues = append(nameValues, thousand, itoa(i/1000)) + // A hundred labels like lbl_x_of_100, each with total/100 values. + hundred := "lbl_" + itoa(i%100) + "_of_100" + nameValues = append(nameValues, hundred, itoa(i/100)) + + if i%13 == 0 { + ten := "lbl_" + itoa(i%10) + "_of_10" + nameValues = append(nameValues, ten, itoa(i%10)) + } + + allSeries[i] = labels.FromStrings(append(nameValues, "first", "a", "second", "a", "third", "a")...) + s, _, _ := h.getOrCreate(allSeries[i].Hash(), allSeries[i]) + s.mmappedChunks = []*mmappedChunk{ + {minTime: 1000 * int64(i/churn), maxTime: 999 + 1000*int64(i/churn)}, + } + } + + return h + } + + for _, churn := range []int{10, 100, 1000} { + b.Run(fmt.Sprintf("churn=%d", churn), func(b *testing.B) { + if b.N > total/churn { + // Just to make sure that benchmark still makes sense. + panic("benchmark not prepared") + } + h := prepare(b, churn) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + require.NoError(b, h.Truncate(1000*int64(i))) + // Make sure the benchmark is meaningful and it's actually truncating the expected amount of series. + require.Equal(b, total-churn*i, int(h.NumSeries())) + } + }) + } +} + func TestHead_Truncate(t *testing.T) { h, _ := newTestHead(t, 1000, wlog.CompressionNone, false) defer func() { diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index 6b654f6b5..d9b5b69de 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -288,89 +288,34 @@ func (p *MemPostings) EnsureOrder(numberOfConcurrentProcesses int) { } // Delete removes all ids in the given map from the postings lists. -func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}) { - // We will take an optimistic read lock for the entire method, - // and only lock for writing when we actually find something to delete. - // - // Each SeriesRef can appear in several Postings. - // To change each one, we need to know the label name and value that it is indexed under. - // We iterate over all label names, then for each name all values, - // and look for individual series to be deleted. - p.mtx.RLock() - defer p.mtx.RUnlock() +// affectedLabels contains all the labels that are affected by the deletion, there's no need to check other labels. +func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected map[labels.Label]struct{}) { + p.mtx.Lock() + defer p.mtx.Unlock() - // Collect all keys relevant for deletion once. New keys added afterwards - // can by definition not be affected by any of the given deletes. - keys := make([]string, 0, len(p.m)) - maxVals := 0 - for n := range p.m { - keys = append(keys, n) - if len(p.m[n]) > maxVals { - maxVals = len(p.m[n]) + process := func(l labels.Label) { + orig := p.m[l.Name][l.Value] + repl := make([]storage.SeriesRef, 0, len(orig)) + for _, id := range orig { + if _, ok := deleted[id]; !ok { + repl = append(repl, id) + } + } + if len(repl) > 0 { + p.m[l.Name][l.Value] = repl + } else { + delete(p.m[l.Name], l.Value) + // Delete the key if we removed all values. + if len(p.m[l.Name]) == 0 { + delete(p.m, l.Name) + } } } - vals := make([]string, 0, maxVals) - for _, n := range keys { - // Copy the values and iterate the copy: if we unlock in the loop below, - // another goroutine might modify the map while we are part-way through it. - vals = vals[:0] - for v := range p.m[n] { - vals = append(vals, v) - } - - // For each posting we first analyse whether the postings list is affected by the deletes. - // If no, we remove the label value from the vals list. - // This way we only need to Lock once later. - for i := 0; i < len(vals); { - found := false - refs := p.m[n][vals[i]] - for _, id := range refs { - if _, ok := deleted[id]; ok { - i++ - found = true - break - } - } - - if !found { - // Didn't match, bring the last value to this position, make the slice shorter and check again. - // The order of the slice doesn't matter as it comes from a map iteration. - vals[i], vals = vals[len(vals)-1], vals[:len(vals)-1] - } - } - - // If no label values have deleted ids, just continue. - if len(vals) == 0 { - continue - } - - // The only vals left here are the ones that contain deleted ids. - // Now we take the write lock and remove the ids. - p.mtx.RUnlock() - p.mtx.Lock() - for _, l := range vals { - repl := make([]storage.SeriesRef, 0, len(p.m[n][l])) - - for _, id := range p.m[n][l] { - if _, ok := deleted[id]; !ok { - repl = append(repl, id) - } - } - if len(repl) > 0 { - p.m[n][l] = repl - } else { - delete(p.m[n], l) - } - } - - // Delete the key if we removed all values. - if len(p.m[n]) == 0 { - delete(p.m, n) - } - p.mtx.Unlock() - p.mtx.RLock() + for l := range affected { + process(l) } + process(allPostingsKey) } // Iter calls f for each postings list. It aborts if f returns an error and returns it. diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go index 4f34cc47e..96c9ed124 100644 --- a/tsdb/index/postings_test.go +++ b/tsdb/index/postings_test.go @@ -979,9 +979,13 @@ func TestMemPostings_Delete(t *testing.T) { p.Add(3, labels.FromStrings("lbl2", "a")) before := p.Get(allPostingsKey.Name, allPostingsKey.Value) - p.Delete(map[storage.SeriesRef]struct{}{ + deletedRefs := map[storage.SeriesRef]struct{}{ 2: {}, - }) + } + affectedLabels := map[labels.Label]struct{}{ + {Name: "lbl1", Value: "b"}: {}, + } + p.Delete(deletedRefs, affectedLabels) after := p.Get(allPostingsKey.Name, allPostingsKey.Value) // Make sure postings gotten before the delete have the old data when @@ -1022,33 +1026,23 @@ func BenchmarkMemPostings_Delete(b *testing.B) { } const total = 1e6 - prepare := func() *MemPostings { - var ref storage.SeriesRef - next := func() storage.SeriesRef { - ref++ - return ref + allSeries := [total]labels.Labels{} + nameValues := make([]string, 0, 100) + for i := 0; i < total; i++ { + nameValues = nameValues[:0] + + // A thousand labels like lbl_x_of_1000, each with total/1000 values + thousand := "lbl_" + itoa(i%1000) + "_of_1000" + nameValues = append(nameValues, thousand, itoa(i/1000)) + // A hundred labels like lbl_x_of_100, each with total/100 values. + hundred := "lbl_" + itoa(i%100) + "_of_100" + nameValues = append(nameValues, hundred, itoa(i/100)) + + if i < 100 { + ten := "lbl_" + itoa(i%10) + "_of_10" + nameValues = append(nameValues, ten, itoa(i%10)) } - - p := NewMemPostings() - nameValues := make([]string, 0, 100) - for i := 0; i < total; i++ { - nameValues = nameValues[:0] - - // A thousand labels like lbl_x_of_1000, each with total/1000 values - thousand := "lbl_" + itoa(i%1000) + "_of_1000" - nameValues = append(nameValues, thousand, itoa(i/1000)) - // A hundred labels like lbl_x_of_100, each with total/100 values. - hundred := "lbl_" + itoa(i%100) + "_of_100" - nameValues = append(nameValues, hundred, itoa(i/100)) - - if i < 100 { - ten := "lbl_" + itoa(i%10) + "_of_10" - nameValues = append(nameValues, ten, itoa(i%10)) - } - - p.Add(next(), labels.FromStrings(append(nameValues, "first", "a", "second", "a", "third", "a")...)) - } - return p + allSeries[i] = labels.FromStrings(append(nameValues, "first", "a", "second", "a", "third", "a")...) } for _, refs := range []int{1, 100, 10_000} { @@ -1060,7 +1054,11 @@ func BenchmarkMemPostings_Delete(b *testing.B) { panic("benchmark not prepared") } - p := prepare() + p := NewMemPostings() + for i := range allSeries { + p.Add(storage.SeriesRef(i), allSeries[i]) + } + stop := make(chan struct{}) wg := sync.WaitGroup{} for i := 0; i < reads; i++ { @@ -1086,11 +1084,16 @@ func BenchmarkMemPostings_Delete(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - deleted := map[storage.SeriesRef]struct{}{} + deleted := make(map[storage.SeriesRef]struct{}, refs) + affected := make(map[labels.Label]struct{}, refs) for i := 0; i < refs; i++ { - deleted[storage.SeriesRef(n*refs+i)] = struct{}{} + ref := storage.SeriesRef(n*refs + i) + deleted[ref] = struct{}{} + allSeries[ref].Range(func(l labels.Label) { + affected[l] = struct{}{} + }) } - p.Delete(deleted) + p.Delete(deleted, affected) } }) }