From 74f6aeeed9174c12bee774db7f017593c9161354 Mon Sep 17 00:00:00 2001
From: Julien Levesy
Date: Fri, 1 Dec 2023 23:00:26 +0100
Subject: [PATCH 1/2] feat(tsdb/agent): notify remote storage when commit
 happens (#13223)

Signed-off-by: Julien Levesy
Signed-off-by: Callum Styan
Co-authored-by: Callum Styan
---
 CHANGELOG.md           |  4 ++++
 cmd/prometheus/main.go |  1 +
 tsdb/agent/db.go       | 12 ++++++++++++
 3 files changed, 17 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7bab5107cf..c541dd7050 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,9 @@
 # Changelog
 
+## unreleased
+
+* [BUGFIX] Agent: Participate in notify calls. #13223
+
 ## 2.48.0 / 2023-11-16
 
 * [CHANGE] Remote-write: respect Retry-After header on 5xx errors. #12677
diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index 43b781f62c..b2f0d150ba 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -1112,6 +1112,7 @@ func main() {
 				)
 
 				localStorage.Set(db, 0)
+				db.SetWriteNotified(remoteStorage)
 				close(dbOpen)
 				<-cancel
 				return nil
diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go
index 3912b9d52f..d44757bd74 100644
--- a/tsdb/agent/db.go
+++ b/tsdb/agent/db.go
@@ -241,6 +241,8 @@ type DB struct {
 	donec chan struct{}
 	stopc chan struct{}
 
+	writeNotified wlog.WriteNotified
+
 	metrics *dbMetrics
 }
 
@@ -311,6 +313,12 @@ func Open(l log.Logger, reg prometheus.Registerer, rs *remote.Storage, dir strin
 	return db, nil
 }
 
+// SetWriteNotified allows to set an instance to notify when a write happens.
+// It must be used during initialization. It is not safe to use it during execution.
+func (db *DB) SetWriteNotified(wn wlog.WriteNotified) {
+	db.writeNotified = wn
+}
+
 func validateOptions(opts *Options) *Options {
 	if opts == nil {
 		opts = DefaultOptions()
@@ -961,6 +969,10 @@ func (a *appender) Commit() error {
 
 	a.clearData()
 	a.appenderPool.Put(a)
+
+	if a.writeNotified != nil {
+		a.writeNotified.Notify()
+	}
 	return nil
 }
 

From 0758ff1d1276166f7037646b93cc98a67488aa0a Mon Sep 17 00:00:00 2001
From: Julien Levesy
Date: Fri, 1 Dec 2023 23:26:38 +0100
Subject: [PATCH 2/2] fix(wlog/watcher): read segment synchronously when not
 tailing (#13224)

Signed-off-by: Julien Levesy
Signed-off-by: Callum Styan
Co-authored-by: Callum Styan
---
 CHANGELOG.md              |  1 +
 tsdb/wlog/watcher.go      | 28 +++++++++----------
 tsdb/wlog/watcher_test.go | 58 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 72 insertions(+), 15 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index c541dd7050..71b8c97fe4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,7 @@ # Changelog
 
 ## unreleased
 
+* [ENHANCEMENT] TSDB: Make the wlog watcher read segments synchronously when not tailing. #13224
 * [BUGFIX] Agent: Participate in notify calls. #13223
 
 ## 2.48.0 / 2023-11-16
diff --git a/tsdb/wlog/watcher.go b/tsdb/wlog/watcher.go
index 221e9607ca..3315d19221 100644
--- a/tsdb/wlog/watcher.go
+++ b/tsdb/wlog/watcher.go
@@ -65,7 +65,7 @@ type WriteTo interface {
 	SeriesReset(int)
 }
 
-// Used to notifier the watcher that data has been written so that it can read.
+// Used to notify the watcher that data has been written so that it can read.
 type WriteNotified interface {
 	Notify()
 }
@@ -398,8 +398,16 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
 
 	reader := NewLiveReader(w.logger, w.readerMetrics, segment)
 
-	readTicker := time.NewTicker(readTimeout)
-	defer readTicker.Stop()
+	size := int64(math.MaxInt64)
+	if !tail {
+		var err error
+		size, err = getSegmentSize(w.walDir, segmentNum)
+		if err != nil {
+			return errors.Wrap(err, "getSegmentSize")
+		}
+
+		return w.readAndHandleError(reader, segmentNum, tail, size)
+	}
 
 	checkpointTicker := time.NewTicker(checkpointPeriod)
 	defer checkpointTicker.Stop()
@@ -407,18 +415,8 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
 	segmentTicker := time.NewTicker(segmentCheckPeriod)
 	defer segmentTicker.Stop()
 
-	// If we're replaying the segment we need to know the size of the file to know
-	// when to return from watch and move on to the next segment.
-	size := int64(math.MaxInt64)
-	if !tail {
-		segmentTicker.Stop()
-		checkpointTicker.Stop()
-		var err error
-		size, err = getSegmentSize(w.walDir, segmentNum)
-		if err != nil {
-			return errors.Wrap(err, "getSegmentSize")
-		}
-	}
+	readTicker := time.NewTicker(readTimeout)
+	defer readTicker.Stop()
 
 	gcSem := make(chan struct{}, 1)
 	for {
diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go
index bc6a10126e..fc665b57d6 100644
--- a/tsdb/wlog/watcher_test.go
+++ b/tsdb/wlog/watcher_test.go
@@ -630,3 +630,61 @@ func TestCheckpointSeriesReset(t *testing.T) {
 		})
 	}
 }
+
+func TestRun_StartupTime(t *testing.T) {
+	const pageSize = 32 * 1024
+	const segments = 10
+	const seriesCount = 20
+	const samplesCount = 300
+
+	for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
+		t.Run(string(compress), func(t *testing.T) {
+			dir := t.TempDir()
+
+			wdir := path.Join(dir, "wal")
+			err := os.Mkdir(wdir, 0o777)
+			require.NoError(t, err)
+
+			enc := record.Encoder{}
+			w, err := NewSize(nil, nil, wdir, pageSize, compress)
+			require.NoError(t, err)
+
+			for i := 0; i < segments; i++ {
+				for j := 0; j < seriesCount; j++ {
+					ref := j + (i * 100)
+					series := enc.Series([]record.RefSeries{
+						{
+							Ref:    chunks.HeadSeriesRef(ref),
+							Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
+						},
+					}, nil)
+					require.NoError(t, w.Log(series))
+
+					for k := 0; k < samplesCount; k++ {
+						inner := rand.Intn(ref + 1)
+						sample := enc.Samples([]record.RefSample{
+							{
+								Ref: chunks.HeadSeriesRef(inner),
+								T:   int64(i),
+								V:   float64(i),
+							},
+						}, nil)
+						require.NoError(t, w.Log(sample))
+					}
+				}
+			}
+			require.NoError(t, w.Close())
+
+			wt := newWriteToMock()
+			watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
+			watcher.MaxSegment = segments
+
+			watcher.setMetrics()
+			startTime := time.Now()
+
+			err = watcher.Run()
+			require.Less(t, time.Since(startTime), readTimeout)
+			require.NoError(t, err)
+		})
+	}
+}
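
For illustration, here is a small standalone Go sketch (not part of either patch, and using toy types rather than the Prometheus packages) of the wiring the first patch introduces: the agent DB is handed a WriteNotified implementation once at startup via SetWriteNotified, and every successful Commit calls Notify. Only WriteNotified, SetWriteNotified, and Notify correspond to names in the patch; every other identifier below is illustrative.

package main

import "fmt"

// WriteNotified mirrors the wlog interface used in the patch: anything that
// wants to be told when new data has been written.
type WriteNotified interface {
	Notify()
}

// fakeRemoteStorage stands in for the remote storage passed to SetWriteNotified
// in cmd/prometheus/main.go; it just counts notifications.
type fakeRemoteStorage struct{ notifications int }

func (r *fakeRemoteStorage) Notify() { r.notifications++ }

// agentDB is a toy stand-in for tsdb/agent.DB.
type agentDB struct {
	// Set once during initialization, read on every commit; the patch documents
	// that SetWriteNotified is not safe to call while the DB is running.
	writeNotified WriteNotified
}

func (db *agentDB) SetWriteNotified(wn WriteNotified) { db.writeNotified = wn }

// Commit mimics the appender's Commit: once the write is done, notify the
// registered target if there is one.
func (db *agentDB) Commit() error {
	// ... append samples and write them to the WAL ...
	if db.writeNotified != nil {
		db.writeNotified.Notify()
	}
	return nil
}

func main() {
	remote := &fakeRemoteStorage{}
	db := &agentDB{}
	db.SetWriteNotified(remote) // mirrors db.SetWriteNotified(remoteStorage)

	_ = db.Commit()
	_ = db.Commit()
	fmt.Println("notifications:", remote.notifications) // prints: notifications: 2
}

In the patch itself the concrete WriteNotified is the remote-write storage (db.SetWriteNotified(remoteStorage)), and the interface's own comment states the intent: notify the watcher that data has been written so that it can read.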
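The second patch changes control flow: when watch is called for a segment that is being replayed (tail == false), the segment's size is already final, so it is read synchronously and the function returns before any tickers are created; only the tailing path sets up the read ticker. Below is a hedged sketch of that shape with simplified stand-ins for the reader and segment; none of these names are the wlog API.

package main

import (
	"fmt"
	"time"
)

// readTimeout plays the role of the watcher's read ticker interval.
const readTimeout = 15 * time.Second

// segmentReader is a simplified stand-in for a live reader over one WAL segment.
type segmentReader struct{ size, pos int64 }

func (r *segmentReader) readUpTo(limit int64) {
	if limit > r.size {
		limit = r.size
	}
	r.pos = limit
}

// watchSegment mirrors the patched shape of watch(): the replay path reads the
// whole segment up to its known size and returns immediately; the ticker exists
// only on the tailing path.
func watchSegment(r *segmentReader, tail bool) error {
	if !tail {
		// Replaying: the segment is complete, read it all and move on.
		r.readUpTo(r.size)
		return nil
	}

	// Tailing: wait for the ticker (or, in the real watcher, a write
	// notification) before reading, and keep doing so until told to stop.
	readTicker := time.NewTicker(readTimeout)
	defer readTicker.Stop()
	<-readTicker.C
	r.readUpTo(r.size)
	return nil
}

func main() {
	r := &segmentReader{size: 4096}
	start := time.Now()
	_ = watchSegment(r, false) // replay path: no ticker, returns right away
	fmt.Printf("replayed %d bytes in %s\n", r.pos, time.Since(start))
}

The new TestRun_StartupTime test in the patch checks this property directly: it writes ten segments and requires watcher.Run() to finish in less than readTimeout.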