fix(wlog/watcher_test.go): make TestRun_AvoidNotifyWhenBehind more resilient

Signed-off-by: machine424 <ayoubmrini424@gmail.com>
machine424 authored on 2024-08-23 14:39:26 +02:00; committed by Ayoub Mrini
parent 00d23c9689
commit d1b4312f0a


@@ -25,6 +25,7 @@ import (
 	"github.com/go-kit/log"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
 
 	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/model/labels"
@@ -52,6 +53,13 @@ func retry(t *testing.T, interval time.Duration, n int, f func() bool) {
 	t.Logf("function returned false")
 }
 
+// Overwrite readTimeout defined in watcher.go.
+func overwriteReadTimeout(t *testing.T, val time.Duration) {
+	initialVal := readTimeout
+	readTimeout = val
+	t.Cleanup(func() { readTimeout = initialVal })
+}
+
 type writeToMock struct {
 	samplesAppended   int
 	exemplarsAppended int
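
The helper introduced in this hunk is the usual t.Cleanup idiom for overriding a package-level variable for the duration of a single test. A minimal, self-contained sketch of the same idiom, with illustrative names that are not part of this commit:

    package example

    import (
    	"testing"
    	"time"
    )

    // readTimeout stands in for the package-level variable defined in watcher.go.
    var readTimeout = 15 * time.Second

    // overrideForTest swaps the value for one test; t.Cleanup restores the
    // original even if the test fails or is skipped.
    func overrideForTest(t *testing.T, val time.Duration) {
    	t.Helper()
    	initial := readTimeout
    	readTimeout = val
    	t.Cleanup(func() { readTimeout = initial })
    }

    func TestWithShortTimeout(t *testing.T) {
    	overrideForTest(t, time.Second)
    	// The test body observes readTimeout == 1s; the original value is restored afterwards.
    }

Because the override is registered with t.Cleanup, the original readTimeout is restored automatically, which is what keeps the shortened timeout from leaking into tests that run afterwards.
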
@@ -302,7 +310,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
 				}
 			}
 			require.NoError(t, w.Log(recs...))
-			readTimeout = time.Second
+			overwriteReadTimeout(t, time.Second)
 
 			_, _, err = Segments(w.Dir())
 			require.NoError(t, err)
@@ -394,7 +402,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
 			_, _, err = Segments(w.Dir())
 			require.NoError(t, err)
 
-			readTimeout = time.Second
+			overwriteReadTimeout(t, time.Second)
 			wt := newWriteToMock(0)
 			watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
 			go watcher.Start()
@@ -607,7 +615,7 @@ func TestCheckpointSeriesReset(t *testing.T) {
 			_, _, err = Segments(w.Dir())
 			require.NoError(t, err)
 
-			readTimeout = time.Second
+			overwriteReadTimeout(t, time.Second)
 			wt := newWriteToMock(0)
 			watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
 			watcher.MaxSegment = -1
@@ -742,9 +750,6 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
 	const seriesCount = 10
 	const samplesCount = 50
 
-	// This test can take longer than intended to finish in cloud CI.
-	readTimeout := 10 * time.Second
-
 	for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
 		t.Run(string(compress), func(t *testing.T) {
 			dir := t.TempDir()
@@ -755,36 +760,50 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
 			w, err := NewSize(nil, nil, wdir, segmentSize, compress)
 			require.NoError(t, err)
 
-			var wg sync.WaitGroup
-			// Generate one segment initially to ensure that watcher.Run() finds at least one segment on disk.
+			// Write to 00000000, the watcher will read series from it.
 			require.NoError(t, generateWALRecords(w, 0, seriesCount, samplesCount))
-			w.NextSegment() // Force creation of the next segment
-			wg.Add(1)
-			go func() {
-				defer wg.Done()
-				for i := 1; i < segmentsToWrite; i++ {
-					require.NoError(t, generateWALRecords(w, i, seriesCount, samplesCount))
-					w.NextSegment()
-				}
-			}()
-
+			// Create 00000001, the watcher will tail it once started.
+			w.NextSegment()
+
+			// Set up the watcher and run it in the background.
 			wt := newWriteToMock(time.Millisecond)
 			watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
+			watcher.setMetrics()
 			watcher.MaxSegment = segmentsToRead
 
-			watcher.setMetrics()
-			startTime := time.Now()
-			err = watcher.Run()
-			wg.Wait()
-			require.Less(t, time.Since(startTime), readTimeout)
-
-			// But samples records shouldn't get dropped
-			retry(t, defaultRetryInterval, defaultRetries, func() bool {
-				return wt.checkNumSeries() > 0
-			})
-			require.Equal(t, segmentsToRead*seriesCount*samplesCount, wt.samplesAppended)
-			require.NoError(t, err)
-
+			var g errgroup.Group
+			g.Go(func() error {
+				startTime := time.Now()
+				err = watcher.Run()
+				if err != nil {
+					return err
+				}
+				// If the watcher was to wait for readTicker to read every new segment, it would need readTimeout * segmentsToRead.
+				d := time.Since(startTime)
+				if d > readTimeout {
+					return fmt.Errorf("watcher ran for %s, it shouldn't rely on readTicker=%s to read the new segments", d, readTimeout)
+				}
+				return nil
+			})
+
+			// The watcher went through 00000000 and is tailing the next one.
+			retry(t, defaultRetryInterval, defaultRetries, func() bool {
+				return wt.checkNumSeries() == seriesCount
+			})
+
+			// In the meantime, add some new segments in bulk.
+			// We should end up with segmentsToWrite + 1 segments now.
+			for i := 1; i < segmentsToWrite; i++ {
+				require.NoError(t, generateWALRecords(w, i, seriesCount, samplesCount))
+				w.NextSegment()
+			}
+
+			// Wait for the watcher.
+			require.NoError(t, g.Wait())
+
+			// All series and samples were read.
+			require.Equal(t, (segmentsToRead+1)*seriesCount, wt.checkNumSeries()) // Series from 00000000 are also read.
+			require.Equal(t, segmentsToRead*seriesCount*samplesCount, wt.samplesAppended)
 			require.NoError(t, w.Close())
 		})
 	}
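
The restructured test above hands watcher.Run to an errgroup.Group so that both its return value and the readTicker timing check surface as an error from g.Wait() on the test goroutine. A stripped-down, standalone sketch of that coordination pattern (the sleep and messages are placeholders, not taken from the test file):

    package main

    import (
    	"fmt"
    	"time"

    	"golang.org/x/sync/errgroup"
    )

    func main() {
    	var g errgroup.Group

    	// Run the slow operation in the background; the closure's error is
    	// captured by the group instead of being checked inside the goroutine.
    	g.Go(func() error {
    		start := time.Now()
    		time.Sleep(100 * time.Millisecond) // stands in for watcher.Run()
    		if d := time.Since(start); d > time.Second {
    			return fmt.Errorf("operation took %s, expected under 1s", d)
    		}
    		return nil
    	})

    	// The caller is free to do other work here, like writing more WAL segments.

    	// Wait blocks until the goroutine returns and surfaces its error, if any.
    	if err := g.Wait(); err != nil {
    		fmt.Println("unexpected:", err)
    		return
    	}
    	fmt.Println("finished within the time budget")
    }

Returning the timing violation as an error, rather than asserting inside the goroutine, keeps all require calls on the main test goroutine, which is why the diff checks require.NoError(t, g.Wait()) after the extra segments have been written.
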