mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-24 13:14:05 -08:00
fix(wlog/watcher): read segment synchronously when not tailing (#13224)
Signed-off-by: Julien Levesy <jlevesy@gmail.com> Signed-off-by: Callum Styan <callumstyan@gmail.com> Co-authored-by: Callum Styan <callumstyan@gmail.com>
This commit is contained in:
parent
501f514389
commit
e4ec263bcc
|
@ -2,6 +2,7 @@
|
|||
|
||||
## unreleased
|
||||
|
||||
* [ENHANCEMENT] TSDB: Make the wlog watcher read segments synchronously when not tailing. #13224
|
||||
* [BUGFIX] Agent: Participate in notify calls. #13223
|
||||
|
||||
## 2.48.0 / 2023-11-16
|
||||
|
|
|
@ -65,7 +65,7 @@ type WriteTo interface {
|
|||
SeriesReset(int)
|
||||
}
|
||||
|
||||
// Used to notifier the watcher that data has been written so that it can read.
|
||||
// Used to notify the watcher that data has been written so that it can read.
|
||||
type WriteNotified interface {
|
||||
Notify()
|
||||
}
|
||||
|
@ -398,8 +398,16 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
|
|||
|
||||
reader := NewLiveReader(w.logger, w.readerMetrics, segment)
|
||||
|
||||
readTicker := time.NewTicker(readTimeout)
|
||||
defer readTicker.Stop()
|
||||
size := int64(math.MaxInt64)
|
||||
if !tail {
|
||||
var err error
|
||||
size, err = getSegmentSize(w.walDir, segmentNum)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "getSegmentSize")
|
||||
}
|
||||
|
||||
return w.readAndHandleError(reader, segmentNum, tail, size)
|
||||
}
|
||||
|
||||
checkpointTicker := time.NewTicker(checkpointPeriod)
|
||||
defer checkpointTicker.Stop()
|
||||
|
@ -407,18 +415,8 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
|
|||
segmentTicker := time.NewTicker(segmentCheckPeriod)
|
||||
defer segmentTicker.Stop()
|
||||
|
||||
// If we're replaying the segment we need to know the size of the file to know
|
||||
// when to return from watch and move on to the next segment.
|
||||
size := int64(math.MaxInt64)
|
||||
if !tail {
|
||||
segmentTicker.Stop()
|
||||
checkpointTicker.Stop()
|
||||
var err error
|
||||
size, err = getSegmentSize(w.walDir, segmentNum)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "getSegmentSize")
|
||||
}
|
||||
}
|
||||
readTicker := time.NewTicker(readTimeout)
|
||||
defer readTicker.Stop()
|
||||
|
||||
gcSem := make(chan struct{}, 1)
|
||||
for {
|
||||
|
|
|
@ -630,3 +630,61 @@ func TestCheckpointSeriesReset(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRun_StartupTime(t *testing.T) {
|
||||
const pageSize = 32 * 1024
|
||||
const segments = 10
|
||||
const seriesCount = 20
|
||||
const samplesCount = 300
|
||||
|
||||
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
|
||||
t.Run(string(compress), func(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
wdir := path.Join(dir, "wal")
|
||||
err := os.Mkdir(wdir, 0o777)
|
||||
require.NoError(t, err)
|
||||
|
||||
enc := record.Encoder{}
|
||||
w, err := NewSize(nil, nil, wdir, pageSize, compress)
|
||||
require.NoError(t, err)
|
||||
|
||||
for i := 0; i < segments; i++ {
|
||||
for j := 0; j < seriesCount; j++ {
|
||||
ref := j + (i * 100)
|
||||
series := enc.Series([]record.RefSeries{
|
||||
{
|
||||
Ref: chunks.HeadSeriesRef(ref),
|
||||
Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
|
||||
},
|
||||
}, nil)
|
||||
require.NoError(t, w.Log(series))
|
||||
|
||||
for k := 0; k < samplesCount; k++ {
|
||||
inner := rand.Intn(ref + 1)
|
||||
sample := enc.Samples([]record.RefSample{
|
||||
{
|
||||
Ref: chunks.HeadSeriesRef(inner),
|
||||
T: int64(i),
|
||||
V: float64(i),
|
||||
},
|
||||
}, nil)
|
||||
require.NoError(t, w.Log(sample))
|
||||
}
|
||||
}
|
||||
}
|
||||
require.NoError(t, w.Close())
|
||||
|
||||
wt := newWriteToMock()
|
||||
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
|
||||
watcher.MaxSegment = segments
|
||||
|
||||
watcher.setMetrics()
|
||||
startTime := time.Now()
|
||||
|
||||
err = watcher.Run()
|
||||
require.Less(t, time.Since(startTime), readTimeout)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue