mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
tsdb.CircularExemplarStorage: Avoid racing (#15231)
* tsdb.CircularExemplarStorage: Avoid racing --------- Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
This commit is contained in:
parent
559722dc68
commit
706dcfeecf
|
@ -152,13 +152,13 @@ func (ce *CircularExemplarStorage) Querier(_ context.Context) (storage.ExemplarQ
|
||||||
func (ce *CircularExemplarStorage) Select(start, end int64, matchers ...[]*labels.Matcher) ([]exemplar.QueryResult, error) {
|
func (ce *CircularExemplarStorage) Select(start, end int64, matchers ...[]*labels.Matcher) ([]exemplar.QueryResult, error) {
|
||||||
ret := make([]exemplar.QueryResult, 0)
|
ret := make([]exemplar.QueryResult, 0)
|
||||||
|
|
||||||
|
ce.lock.RLock()
|
||||||
|
defer ce.lock.RUnlock()
|
||||||
|
|
||||||
if len(ce.exemplars) == 0 {
|
if len(ce.exemplars) == 0 {
|
||||||
return ret, nil
|
return ret, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
ce.lock.RLock()
|
|
||||||
defer ce.lock.RUnlock()
|
|
||||||
|
|
||||||
// Loop through each index entry, which will point us to first/last exemplar for each series.
|
// Loop through each index entry, which will point us to first/last exemplar for each series.
|
||||||
for _, idx := range ce.index {
|
for _, idx := range ce.index {
|
||||||
var se exemplar.QueryResult
|
var se exemplar.QueryResult
|
||||||
|
@ -281,13 +281,13 @@ func (ce *CircularExemplarStorage) Resize(l int64) int {
|
||||||
l = 0
|
l = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ce.lock.Lock()
|
||||||
|
defer ce.lock.Unlock()
|
||||||
|
|
||||||
if l == int64(len(ce.exemplars)) {
|
if l == int64(len(ce.exemplars)) {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
ce.lock.Lock()
|
|
||||||
defer ce.lock.Unlock()
|
|
||||||
|
|
||||||
oldBuffer := ce.exemplars
|
oldBuffer := ce.exemplars
|
||||||
oldNextIndex := int64(ce.nextIndex)
|
oldNextIndex := int64(ce.nextIndex)
|
||||||
|
|
||||||
|
@ -349,6 +349,11 @@ func (ce *CircularExemplarStorage) migrate(entry *circularBufferEntry, buf []byt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemplar) error {
|
func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemplar) error {
|
||||||
|
// TODO(bwplotka): This lock can lock all scrapers, there might high contention on this on scale.
|
||||||
|
// Optimize by moving the lock to be per series (& benchmark it).
|
||||||
|
ce.lock.Lock()
|
||||||
|
defer ce.lock.Unlock()
|
||||||
|
|
||||||
if len(ce.exemplars) == 0 {
|
if len(ce.exemplars) == 0 {
|
||||||
return storage.ErrExemplarsDisabled
|
return storage.ErrExemplarsDisabled
|
||||||
}
|
}
|
||||||
|
@ -356,11 +361,6 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp
|
||||||
var buf [1024]byte
|
var buf [1024]byte
|
||||||
seriesLabels := l.Bytes(buf[:])
|
seriesLabels := l.Bytes(buf[:])
|
||||||
|
|
||||||
// TODO(bwplotka): This lock can lock all scrapers, there might high contention on this on scale.
|
|
||||||
// Optimize by moving the lock to be per series (& benchmark it).
|
|
||||||
ce.lock.Lock()
|
|
||||||
defer ce.lock.Unlock()
|
|
||||||
|
|
||||||
idx, ok := ce.index[string(seriesLabels)]
|
idx, ok := ce.index[string(seriesLabels)]
|
||||||
err := ce.validateExemplar(idx, e, true)
|
err := ce.validateExemplar(idx, e, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"reflect"
|
"reflect"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
@ -499,3 +500,40 @@ func BenchmarkResizeExemplars(b *testing.B) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestCircularExemplarStorage_Concurrent_AddExemplar_Resize tries to provoke a data race between AddExemplar and Resize.
|
||||||
|
// Run with race detection enabled.
|
||||||
|
func TestCircularExemplarStorage_Concurrent_AddExemplar_Resize(t *testing.T) {
|
||||||
|
exs, err := NewCircularExemplarStorage(0, eMetrics)
|
||||||
|
require.NoError(t, err)
|
||||||
|
es := exs.(*CircularExemplarStorage)
|
||||||
|
|
||||||
|
l := labels.FromStrings("service", "asdf")
|
||||||
|
e := exemplar.Exemplar{
|
||||||
|
Labels: labels.FromStrings("trace_id", "qwerty"),
|
||||||
|
Value: 0.1,
|
||||||
|
Ts: 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(1)
|
||||||
|
t.Cleanup(wg.Wait)
|
||||||
|
|
||||||
|
started := make(chan struct{})
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
<-started
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
require.NoError(t, es.AddExemplar(l, e))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
es.Resize(int64(i + 1))
|
||||||
|
if i == 0 {
|
||||||
|
close(started)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue