Merge pull request #13251 from prometheus/backport-agent-notify-fixes

Backport agent notify fixes
This commit is contained in:
Bartlomiej Plotka 2023-12-07 12:20:35 +00:00 committed by GitHub
commit 6034dcbbac
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 89 additions and 15 deletions

View file

@@ -1,5 +1,10 @@
# Changelog # Changelog
## unreleased
* [ENHANCEMENT] TSDB: Make the wlog watcher read segments synchronously when not tailing. #13224
* [BUGFIX] Agent: Participate in notify calls. #13223
## 2.48.0 / 2023-11-16 ## 2.48.0 / 2023-11-16
* [CHANGE] Remote-write: respect Retry-After header on 5xx errors. #12677 * [CHANGE] Remote-write: respect Retry-After header on 5xx errors. #12677

View file

@@ -1112,6 +1112,7 @@ func main() {
) )
localStorage.Set(db, 0) localStorage.Set(db, 0)
db.SetWriteNotified(remoteStorage)
close(dbOpen) close(dbOpen)
<-cancel <-cancel
return nil return nil

View file

@@ -241,6 +241,8 @@ type DB struct {
donec chan struct{} donec chan struct{}
stopc chan struct{} stopc chan struct{}
writeNotified wlog.WriteNotified
metrics *dbMetrics metrics *dbMetrics
} }
@@ -311,6 +313,12 @@ func Open(l log.Logger, reg prometheus.Registerer, rs *remote.Storage, dir strin
return db, nil return db, nil
} }
// SetWriteNotified allows to set an instance to notify when a write happens.
// It must be used during initialization. It is not safe to use it during execution.
func (db *DB) SetWriteNotified(wn wlog.WriteNotified) {
db.writeNotified = wn
}
func validateOptions(opts *Options) *Options { func validateOptions(opts *Options) *Options {
if opts == nil { if opts == nil {
opts = DefaultOptions() opts = DefaultOptions()
@@ -961,6 +969,10 @@ func (a *appender) Commit() error {
a.clearData() a.clearData()
a.appenderPool.Put(a) a.appenderPool.Put(a)
if a.writeNotified != nil {
a.writeNotified.Notify()
}
return nil return nil
} }

View file

@@ -65,7 +65,7 @@ type WriteTo interface {
SeriesReset(int) SeriesReset(int)
} }
// Used to notifier the watcher that data has been written so that it can read. // Used to notify the watcher that data has been written so that it can read.
type WriteNotified interface { type WriteNotified interface {
Notify() Notify()
} }
@@ -398,8 +398,16 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
reader := NewLiveReader(w.logger, w.readerMetrics, segment) reader := NewLiveReader(w.logger, w.readerMetrics, segment)
readTicker := time.NewTicker(readTimeout) size := int64(math.MaxInt64)
defer readTicker.Stop() if !tail {
var err error
size, err = getSegmentSize(w.walDir, segmentNum)
if err != nil {
return errors.Wrap(err, "getSegmentSize")
}
return w.readAndHandleError(reader, segmentNum, tail, size)
}
checkpointTicker := time.NewTicker(checkpointPeriod) checkpointTicker := time.NewTicker(checkpointPeriod)
defer checkpointTicker.Stop() defer checkpointTicker.Stop()
@@ -407,18 +415,8 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
segmentTicker := time.NewTicker(segmentCheckPeriod) segmentTicker := time.NewTicker(segmentCheckPeriod)
defer segmentTicker.Stop() defer segmentTicker.Stop()
// If we're replaying the segment we need to know the size of the file to know readTicker := time.NewTicker(readTimeout)
// when to return from watch and move on to the next segment. defer readTicker.Stop()
size := int64(math.MaxInt64)
if !tail {
segmentTicker.Stop()
checkpointTicker.Stop()
var err error
size, err = getSegmentSize(w.walDir, segmentNum)
if err != nil {
return errors.Wrap(err, "getSegmentSize")
}
}
gcSem := make(chan struct{}, 1) gcSem := make(chan struct{}, 1)
for { for {

View file

@@ -630,3 +630,61 @@ func TestCheckpointSeriesReset(t *testing.T) {
}) })
} }
} }
func TestRun_StartupTime(t *testing.T) {
const pageSize = 32 * 1024
const segments = 10
const seriesCount = 20
const samplesCount = 300
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
t.Run(string(compress), func(t *testing.T) {
dir := t.TempDir()
wdir := path.Join(dir, "wal")
err := os.Mkdir(wdir, 0o777)
require.NoError(t, err)
enc := record.Encoder{}
w, err := NewSize(nil, nil, wdir, pageSize, compress)
require.NoError(t, err)
for i := 0; i < segments; i++ {
for j := 0; j < seriesCount; j++ {
ref := j + (i * 100)
series := enc.Series([]record.RefSeries{
{
Ref: chunks.HeadSeriesRef(ref),
Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
},
}, nil)
require.NoError(t, w.Log(series))
for k := 0; k < samplesCount; k++ {
inner := rand.Intn(ref + 1)
sample := enc.Samples([]record.RefSample{
{
Ref: chunks.HeadSeriesRef(inner),
T: int64(i),
V: float64(i),
},
}, nil)
require.NoError(t, w.Log(sample))
}
}
}
require.NoError(t, w.Close())
wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
watcher.MaxSegment = segments
watcher.setMetrics()
startTime := time.Now()
err = watcher.Run()
require.Less(t, time.Since(startTime), readTimeout)
require.NoError(t, err)
})
}
}