mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Copy tombstone intervals to avoid race (#12245)
Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>
This commit is contained in:
parent
c66fdc19f9
commit
80b7f73d26
|
@ -501,6 +501,46 @@ func TestBlockQuerier_AgainstHeadWithOpenChunks(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestBlockQuerier_TrimmingDoesNotModifyOriginalTombstoneIntervals(t *testing.T) {
|
||||||
|
c := blockQuerierTestCase{
|
||||||
|
mint: 2,
|
||||||
|
maxt: 6,
|
||||||
|
ms: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "a", "a")},
|
||||||
|
exp: newMockSeriesSet([]storage.Series{
|
||||||
|
storage.NewListSeries(labels.FromStrings("a", "a"),
|
||||||
|
[]tsdbutil.Sample{sample{3, 4, nil, nil}, sample{5, 2, nil, nil}, sample{6, 3, nil, nil}},
|
||||||
|
),
|
||||||
|
storage.NewListSeries(labels.FromStrings("a", "a", "b", "b"),
|
||||||
|
[]tsdbutil.Sample{sample{3, 3, nil, nil}, sample{5, 3, nil, nil}, sample{6, 6, nil, nil}},
|
||||||
|
),
|
||||||
|
}),
|
||||||
|
expChks: newMockChunkSeriesSet([]storage.ChunkSeries{
|
||||||
|
storage.NewListChunkSeriesFromSamples(labels.FromStrings("a", "a"),
|
||||||
|
[]tsdbutil.Sample{sample{3, 4, nil, nil}}, []tsdbutil.Sample{sample{5, 2, nil, nil}, sample{6, 3, nil, nil}},
|
||||||
|
),
|
||||||
|
storage.NewListChunkSeriesFromSamples(labels.FromStrings("a", "a", "b", "b"),
|
||||||
|
[]tsdbutil.Sample{sample{3, 3, nil, nil}}, []tsdbutil.Sample{sample{5, 3, nil, nil}, sample{6, 6, nil, nil}},
|
||||||
|
),
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
ir, cr, _, _ := createIdxChkReaders(t, testData)
|
||||||
|
stones := tombstones.NewMemTombstones()
|
||||||
|
p, err := ir.Postings("a", "a")
|
||||||
|
require.NoError(t, err)
|
||||||
|
refs, err := index.ExpandPostings(p)
|
||||||
|
require.NoError(t, err)
|
||||||
|
for _, ref := range refs {
|
||||||
|
stones.AddInterval(ref, tombstones.Interval{Mint: 1, Maxt: 2})
|
||||||
|
}
|
||||||
|
testBlockQuerier(t, c, ir, cr, stones)
|
||||||
|
for _, ref := range refs {
|
||||||
|
intervals, err := stones.Get(ref)
|
||||||
|
require.NoError(t, err)
|
||||||
|
// Without copy, the intervals could be [math.MinInt64, 2].
|
||||||
|
require.Equal(t, tombstones.Intervals{{Mint: 1, Maxt: 2}}, intervals)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var testData = []seriesSamples{
|
var testData = []seriesSamples{
|
||||||
{
|
{
|
||||||
lset: map[string]string{"a": "a"},
|
lset: map[string]string{"a": "a"},
|
||||||
|
|
|
@ -18,6 +18,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"hash"
|
"hash"
|
||||||
"hash/crc32"
|
"hash/crc32"
|
||||||
|
"math"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
|
@ -252,7 +253,14 @@ func NewTestMemTombstones(intervals []Intervals) *MemTombstones {
|
||||||
func (t *MemTombstones) Get(ref storage.SeriesRef) (Intervals, error) {
|
func (t *MemTombstones) Get(ref storage.SeriesRef) (Intervals, error) {
|
||||||
t.mtx.RLock()
|
t.mtx.RLock()
|
||||||
defer t.mtx.RUnlock()
|
defer t.mtx.RUnlock()
|
||||||
return t.intvlGroups[ref], nil
|
intervals, ok := t.intvlGroups[ref]
|
||||||
|
if !ok {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
// Make a copy to avoid race.
|
||||||
|
res := make(Intervals, len(intervals))
|
||||||
|
copy(res, intervals)
|
||||||
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *MemTombstones) DeleteTombstones(refs map[storage.SeriesRef]struct{}) {
|
func (t *MemTombstones) DeleteTombstones(refs map[storage.SeriesRef]struct{}) {
|
||||||
|
@ -349,17 +357,23 @@ func (in Intervals) Add(n Interval) Intervals {
|
||||||
// Find min and max indexes of intervals that overlap with the new interval.
|
// Find min and max indexes of intervals that overlap with the new interval.
|
||||||
// Intervals are closed [t1, t2] and t is discreet, so if neighbour intervals are 1 step difference
|
// Intervals are closed [t1, t2] and t is discreet, so if neighbour intervals are 1 step difference
|
||||||
// to the new one, we can merge those together.
|
// to the new one, we can merge those together.
|
||||||
mini := sort.Search(len(in), func(i int) bool { return in[i].Maxt >= n.Mint-1 })
|
mini := 0
|
||||||
if mini == len(in) {
|
if n.Mint != math.MinInt64 { // Avoid overflow.
|
||||||
return append(in, n)
|
mini = sort.Search(len(in), func(i int) bool { return in[i].Maxt >= n.Mint-1 })
|
||||||
|
if mini == len(in) {
|
||||||
|
return append(in, n)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
maxi := sort.Search(len(in)-mini, func(i int) bool { return in[mini+i].Mint > n.Maxt+1 })
|
maxi := len(in)
|
||||||
if maxi == 0 {
|
if n.Maxt != math.MaxInt64 { // Avoid overflow.
|
||||||
if mini == 0 {
|
maxi = sort.Search(len(in)-mini, func(i int) bool { return in[mini+i].Mint > n.Maxt+1 })
|
||||||
return append(Intervals{n}, in...)
|
if maxi == 0 {
|
||||||
|
if mini == 0 {
|
||||||
|
return append(Intervals{n}, in...)
|
||||||
|
}
|
||||||
|
return append(in[:mini], append(Intervals{n}, in[mini:]...)...)
|
||||||
}
|
}
|
||||||
return append(in[:mini], append(Intervals{n}, in[mini:]...)...)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if n.Mint < in[mini].Mint {
|
if n.Mint < in[mini].Mint {
|
||||||
|
|
|
@ -81,6 +81,22 @@ func TestDeletingTombstones(t *testing.T) {
|
||||||
require.Empty(t, intervals)
|
require.Empty(t, intervals)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTombstonesGetWithCopy(t *testing.T) {
|
||||||
|
stones := NewMemTombstones()
|
||||||
|
stones.AddInterval(1, Intervals{{Mint: 1, Maxt: 2}, {Mint: 7, Maxt: 8}, {Mint: 11, Maxt: 12}}...)
|
||||||
|
|
||||||
|
intervals0, err := stones.Get(1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, Intervals{{Mint: 1, Maxt: 2}, {Mint: 7, Maxt: 8}, {Mint: 11, Maxt: 12}}, intervals0)
|
||||||
|
intervals1 := intervals0.Add(Interval{Mint: 4, Maxt: 6})
|
||||||
|
require.Equal(t, Intervals{{Mint: 1, Maxt: 2}, {Mint: 4, Maxt: 8}, {Mint: 11, Maxt: 12}}, intervals0) // Original slice changed.
|
||||||
|
require.Equal(t, Intervals{{Mint: 1, Maxt: 2}, {Mint: 4, Maxt: 8}, {Mint: 11, Maxt: 12}}, intervals1)
|
||||||
|
|
||||||
|
intervals2, err := stones.Get(1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, Intervals{{Mint: 1, Maxt: 2}, {Mint: 7, Maxt: 8}, {Mint: 11, Maxt: 12}}, intervals2)
|
||||||
|
}
|
||||||
|
|
||||||
func TestTruncateBefore(t *testing.T) {
|
func TestTruncateBefore(t *testing.T) {
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
before Intervals
|
before Intervals
|
||||||
|
@ -210,6 +226,26 @@ func TestAddingNewIntervals(t *testing.T) {
|
||||||
new: Interval{math.MinInt64, 10},
|
new: Interval{math.MinInt64, 10},
|
||||||
exp: Intervals{{math.MinInt64, math.MaxInt64}},
|
exp: Intervals{{math.MinInt64, math.MaxInt64}},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
exist: Intervals{{9, 10}},
|
||||||
|
new: Interval{math.MinInt64, 7},
|
||||||
|
exp: Intervals{{math.MinInt64, 7}, {9, 10}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
exist: Intervals{{9, 10}},
|
||||||
|
new: Interval{12, math.MaxInt64},
|
||||||
|
exp: Intervals{{9, 10}, {12, math.MaxInt64}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
exist: Intervals{{9, 10}},
|
||||||
|
new: Interval{math.MinInt64, 8},
|
||||||
|
exp: Intervals{{math.MinInt64, 10}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
exist: Intervals{{9, 10}},
|
||||||
|
new: Interval{11, math.MaxInt64},
|
||||||
|
exp: Intervals{{9, math.MaxInt64}},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, c := range cases {
|
for _, c := range cases {
|
||||||
|
|
Loading…
Reference in a new issue