mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-27 21:52:25 -08:00
Merge branch 'prometheus:main' into chore/bump-to-go1.21
This commit is contained in:
commit
d55fe5064d
|
@ -262,10 +262,8 @@ func (w *Watcher) loop() {
|
||||||
// Run the watcher, which will tail the WAL until the quit channel is closed
|
// Run the watcher, which will tail the WAL until the quit channel is closed
|
||||||
// or an error case is hit.
|
// or an error case is hit.
|
||||||
func (w *Watcher) Run() error {
|
func (w *Watcher) Run() error {
|
||||||
_, lastSegment, err := w.firstAndLast()
|
var lastSegment int
|
||||||
if err != nil {
|
var err error
|
||||||
return fmt.Errorf("wal.Segments: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// We want to ensure this is false across iterations since
|
// We want to ensure this is false across iterations since
|
||||||
// Run will be called again if there was a failure to read the WAL.
|
// Run will be called again if there was a failure to read the WAL.
|
||||||
|
@ -296,9 +294,17 @@ func (w *Watcher) Run() error {
|
||||||
w.currentSegmentMetric.Set(float64(currentSegment))
|
w.currentSegmentMetric.Set(float64(currentSegment))
|
||||||
level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", currentSegment)
|
level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", currentSegment)
|
||||||
|
|
||||||
|
// Reset the value of lastSegment each iteration, this is to avoid having to wait too long for
|
||||||
|
// between reads if we're reading a segment that is not the most recent segment after startup.
|
||||||
|
_, lastSegment, err = w.firstAndLast()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("wal.Segments: %w", err)
|
||||||
|
}
|
||||||
|
tail := currentSegment >= lastSegment
|
||||||
|
|
||||||
// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
|
// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
|
||||||
// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
|
// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
|
||||||
if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil && !errors.Is(err, ErrIgnorable) {
|
if err := w.watch(currentSegment, tail); err != nil && !errors.Is(err, ErrIgnorable) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -58,29 +58,47 @@ type writeToMock struct {
|
||||||
floatHistogramsAppended int
|
floatHistogramsAppended int
|
||||||
seriesLock sync.Mutex
|
seriesLock sync.Mutex
|
||||||
seriesSegmentIndexes map[chunks.HeadSeriesRef]int
|
seriesSegmentIndexes map[chunks.HeadSeriesRef]int
|
||||||
|
|
||||||
|
// delay reads with a short sleep
|
||||||
|
delay time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wtm *writeToMock) Append(s []record.RefSample) bool {
|
func (wtm *writeToMock) Append(s []record.RefSample) bool {
|
||||||
|
if wtm.delay > 0 {
|
||||||
|
time.Sleep(wtm.delay)
|
||||||
|
}
|
||||||
wtm.samplesAppended += len(s)
|
wtm.samplesAppended += len(s)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar) bool {
|
func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar) bool {
|
||||||
|
if wtm.delay > 0 {
|
||||||
|
time.Sleep(wtm.delay)
|
||||||
|
}
|
||||||
wtm.exemplarsAppended += len(e)
|
wtm.exemplarsAppended += len(e)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wtm *writeToMock) AppendHistograms(h []record.RefHistogramSample) bool {
|
func (wtm *writeToMock) AppendHistograms(h []record.RefHistogramSample) bool {
|
||||||
|
if wtm.delay > 0 {
|
||||||
|
time.Sleep(wtm.delay)
|
||||||
|
}
|
||||||
wtm.histogramsAppended += len(h)
|
wtm.histogramsAppended += len(h)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wtm *writeToMock) AppendFloatHistograms(fh []record.RefFloatHistogramSample) bool {
|
func (wtm *writeToMock) AppendFloatHistograms(fh []record.RefFloatHistogramSample) bool {
|
||||||
|
if wtm.delay > 0 {
|
||||||
|
time.Sleep(wtm.delay)
|
||||||
|
}
|
||||||
wtm.floatHistogramsAppended += len(fh)
|
wtm.floatHistogramsAppended += len(fh)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) {
|
func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) {
|
||||||
|
if wtm.delay > 0 {
|
||||||
|
time.Sleep(wtm.delay)
|
||||||
|
}
|
||||||
wtm.UpdateSeriesSegment(series, index)
|
wtm.UpdateSeriesSegment(series, index)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -110,9 +128,10 @@ func (wtm *writeToMock) checkNumSeries() int {
|
||||||
return len(wtm.seriesSegmentIndexes)
|
return len(wtm.seriesSegmentIndexes)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newWriteToMock() *writeToMock {
|
func newWriteToMock(delay time.Duration) *writeToMock {
|
||||||
return &writeToMock{
|
return &writeToMock{
|
||||||
seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
|
seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
|
||||||
|
delay: delay,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -209,7 +228,7 @@ func TestTailSamples(t *testing.T) {
|
||||||
first, last, err := Segments(w.Dir())
|
first, last, err := Segments(w.Dir())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
wt := newWriteToMock()
|
wt := newWriteToMock(0)
|
||||||
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, true, true)
|
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, true, true)
|
||||||
watcher.SetStartTime(now)
|
watcher.SetStartTime(now)
|
||||||
|
|
||||||
|
@ -294,7 +313,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
|
||||||
_, _, err = Segments(w.Dir())
|
_, _, err = Segments(w.Dir())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
wt := newWriteToMock()
|
wt := newWriteToMock(0)
|
||||||
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
|
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
|
||||||
go watcher.Start()
|
go watcher.Start()
|
||||||
|
|
||||||
|
@ -383,7 +402,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
|
||||||
_, _, err = Segments(w.Dir())
|
_, _, err = Segments(w.Dir())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
readTimeout = time.Second
|
readTimeout = time.Second
|
||||||
wt := newWriteToMock()
|
wt := newWriteToMock(0)
|
||||||
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
|
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
|
||||||
go watcher.Start()
|
go watcher.Start()
|
||||||
|
|
||||||
|
@ -454,7 +473,7 @@ func TestReadCheckpoint(t *testing.T) {
|
||||||
_, _, err = Segments(w.Dir())
|
_, _, err = Segments(w.Dir())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
wt := newWriteToMock()
|
wt := newWriteToMock(0)
|
||||||
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
|
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
|
||||||
go watcher.Start()
|
go watcher.Start()
|
||||||
|
|
||||||
|
@ -523,7 +542,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
wt := newWriteToMock()
|
wt := newWriteToMock(0)
|
||||||
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
|
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
|
||||||
watcher.MaxSegment = -1
|
watcher.MaxSegment = -1
|
||||||
|
|
||||||
|
@ -596,7 +615,7 @@ func TestCheckpointSeriesReset(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
readTimeout = time.Second
|
readTimeout = time.Second
|
||||||
wt := newWriteToMock()
|
wt := newWriteToMock(0)
|
||||||
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
|
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
|
||||||
watcher.MaxSegment = -1
|
watcher.MaxSegment = -1
|
||||||
go watcher.Start()
|
go watcher.Start()
|
||||||
|
@ -675,7 +694,7 @@ func TestRun_StartupTime(t *testing.T) {
|
||||||
}
|
}
|
||||||
require.NoError(t, w.Close())
|
require.NoError(t, w.Close())
|
||||||
|
|
||||||
wt := newWriteToMock()
|
wt := newWriteToMock(0)
|
||||||
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
|
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
|
||||||
watcher.MaxSegment = segments
|
watcher.MaxSegment = segments
|
||||||
|
|
||||||
|
@ -688,3 +707,93 @@ func TestRun_StartupTime(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
|
||||||
|
const pageSize = 32 * 1024
|
||||||
|
const segments = 10
|
||||||
|
const seriesCount = 20
|
||||||
|
const samplesCount = 300
|
||||||
|
|
||||||
|
// This test can take longer than intended to finish in cloud CI.
|
||||||
|
readTimeout := 10 * time.Second
|
||||||
|
|
||||||
|
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
|
||||||
|
t.Run(string(compress), func(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
|
||||||
|
wdir := path.Join(dir, "wal")
|
||||||
|
err := os.Mkdir(wdir, 0o777)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
enc := record.Encoder{}
|
||||||
|
w, err := NewSize(nil, nil, wdir, pageSize, compress)
|
||||||
|
require.NoError(t, err)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
// add one segment initially to ensure there's a value > 0 for the last segment id
|
||||||
|
for i := 0; i < 1; i++ {
|
||||||
|
for j := 0; j < seriesCount; j++ {
|
||||||
|
ref := j + (i * 100)
|
||||||
|
series := enc.Series([]record.RefSeries{
|
||||||
|
{
|
||||||
|
Ref: chunks.HeadSeriesRef(ref),
|
||||||
|
Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
|
||||||
|
},
|
||||||
|
}, nil)
|
||||||
|
require.NoError(t, w.Log(series))
|
||||||
|
|
||||||
|
for k := 0; k < samplesCount; k++ {
|
||||||
|
inner := rand.Intn(ref + 1)
|
||||||
|
sample := enc.Samples([]record.RefSample{
|
||||||
|
{
|
||||||
|
Ref: chunks.HeadSeriesRef(inner),
|
||||||
|
T: int64(i),
|
||||||
|
V: float64(i),
|
||||||
|
},
|
||||||
|
}, nil)
|
||||||
|
require.NoError(t, w.Log(sample))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for i := 1; i < segments; i++ {
|
||||||
|
for j := 0; j < seriesCount; j++ {
|
||||||
|
ref := j + (i * 100)
|
||||||
|
series := enc.Series([]record.RefSeries{
|
||||||
|
{
|
||||||
|
Ref: chunks.HeadSeriesRef(ref),
|
||||||
|
Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
|
||||||
|
},
|
||||||
|
}, nil)
|
||||||
|
require.NoError(t, w.Log(series))
|
||||||
|
|
||||||
|
for k := 0; k < samplesCount; k++ {
|
||||||
|
inner := rand.Intn(ref + 1)
|
||||||
|
sample := enc.Samples([]record.RefSample{
|
||||||
|
{
|
||||||
|
Ref: chunks.HeadSeriesRef(inner),
|
||||||
|
T: int64(i),
|
||||||
|
V: float64(i),
|
||||||
|
},
|
||||||
|
}, nil)
|
||||||
|
require.NoError(t, w.Log(sample))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
wt := newWriteToMock(time.Millisecond)
|
||||||
|
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
|
||||||
|
watcher.MaxSegment = segments
|
||||||
|
|
||||||
|
watcher.setMetrics()
|
||||||
|
startTime := time.Now()
|
||||||
|
err = watcher.Run()
|
||||||
|
wg.Wait()
|
||||||
|
require.Less(t, time.Since(startTime), readTimeout)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, w.Close())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue