Extend the check in TestRun_AvoidNotifyWhenBehind to ensure we are

reading the full contents of each segment when reading segments that are
not the segment which is currently being written to.

Signed-off-by: Callum Styan <callumstyan@gmail.com>
This commit is contained in:
Callum Styan 2024-07-08 16:53:44 -07:00
parent 7d5b5b0b4a
commit 615ee7b4dc

View file

@ -703,11 +703,11 @@ func TestRun_StartupTime(t *testing.T) {
func TestRun_AvoidNotifyWhenBehind(t *testing.T) { func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
const pageSize = 32 * 1024 const pageSize = 32 * 1024
const segments = 10 const segments = 10
const seriesCount = 20 const seriesCount = 10
const samplesCount = 300 const samplesCount = 85
// This test can take longer than intended to finish in cloud CI. // This test can take longer than intended to finish in cloud CI.
readTimeout := 10 * time.Second readTimeout := 15 * time.Second
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
t.Run(string(compress), func(t *testing.T) { t.Run(string(compress), func(t *testing.T) {
@ -745,11 +745,16 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
require.NoError(t, w.Log(sample)) require.NoError(t, w.Log(sample))
} }
} }
// Force creation of the next segment
w.NextSegment()
} }
wg.Add(1) wg.Add(1)
var samplesAfterStart int
go func() { go func() {
defer wg.Done() defer wg.Done()
for i := 1; i < segments; i++ { // add one more than max segment so that we can exit nicely
// when we get to the max segment value in the watcher
for i := 1; i < segments+1; i++ {
for j := 0; j < seriesCount; j++ { for j := 0; j < seriesCount; j++ {
ref := j + (i * 100) ref := j + (i * 100)
series := enc.Series([]record.RefSeries{ series := enc.Series([]record.RefSeries{
@ -769,9 +774,14 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
V: float64(i), V: float64(i),
}, },
}, nil) }, nil)
if w.segment.i <= segments {
samplesAfterStart++
}
require.NoError(t, w.Log(sample)) require.NoError(t, w.Log(sample))
} }
} }
// Force creation of the next segment
w.NextSegment()
} }
}() }()
@ -781,16 +791,12 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
watcher.setMetrics() watcher.setMetrics()
startTime := time.Now() startTime := time.Now()
err = watcher.Run() err = watcher.Run()
wg.Wait() wg.Wait()
require.Less(t, time.Since(startTime), readTimeout) require.Less(t, time.Since(startTime), readTimeout)
// But samples records shouldn't get dropped require.Eventually(t, func() bool { return samplesAfterStart == wt.samplesAppended }, readTimeout, time.Second, fmt.Sprintf("expected %d samples but saw %d", samplesAfterStart, wt.samplesAppended))
retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumSeries() > 0
})
require.Greater(t, wt.samplesAppended, 0)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, w.Close()) require.NoError(t, w.Close())
}) })