From 1cf79e70da4437152e405cf4aded841312022310 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Wed, 12 Jul 2023 16:46:26 -0700 Subject: [PATCH] some refactoring Signed-off-by: Callum Styan --- storage/remote/marker_file_handler.go | 25 +-- storage/remote/marker_handler.go | 23 +- storage/remote/queue_manager.go | 12 +- storage/remote/queue_manager_test.go | 311 ++++++++++++++++++++++---- storage/remote/write.go | 9 +- 5 files changed, 300 insertions(+), 80 deletions(-) diff --git a/storage/remote/marker_file_handler.go b/storage/remote/marker_file_handler.go index 37e010675..0a268a636 100644 --- a/storage/remote/marker_file_handler.go +++ b/storage/remote/marker_file_handler.go @@ -1,8 +1,6 @@ package remote import ( - "fmt" - "io/ioutil" "os" "path/filepath" "strconv" @@ -22,40 +20,37 @@ type MarkerFileHandler interface { type markerFileHandler struct { segmentToMark chan int quit chan struct{} + dir string logger log.Logger lastMarkedSegmentFilePath string } -var ( - _ MarkerFileHandler = (*markerFileHandler)(nil) -) - -func NewMarkerFileHandler(logger log.Logger, walDir, markerId string) (MarkerFileHandler, error) { - markerDir := filepath.Join(walDir, "remote", markerId) - +func NewMarkerFileHandler(logger log.Logger, walDir, markerId string) MarkerFileHandler { dir := filepath.Join(walDir, "remote", markerId) - if err := os.MkdirAll(dir, 0o777); err != nil { - return nil, fmt.Errorf("error creating segment marker folder %q: %w", dir, err) - } mfh := &markerFileHandler{ segmentToMark: make(chan int, 1), quit: make(chan struct{}), logger: logger, - lastMarkedSegmentFilePath: filepath.Join(markerDir, "segment"), + dir: dir, + lastMarkedSegmentFilePath: filepath.Join(dir, "segment"), } //TODO: Should this be in a separate Start() function? go mfh.markSegmentAsync() - return mfh, nil + return mfh +} + +func (mfh *markerFileHandler) Start() { + go mfh.markSegmentAsync() } // LastMarkedSegment implements wlog.Marker. func (mfh *markerFileHandler) LastMarkedSegment() int { - bb, err := ioutil.ReadFile(mfh.lastMarkedSegmentFilePath) + bb, err := os.ReadFile(mfh.lastMarkedSegmentFilePath) if os.IsNotExist(err) { level.Warn(mfh.logger).Log("msg", "marker segment file does not exist", "file", mfh.lastMarkedSegmentFilePath) return -1 diff --git a/storage/remote/marker_handler.go b/storage/remote/marker_handler.go index 998acff16..04ced058d 100644 --- a/storage/remote/marker_handler.go +++ b/storage/remote/marker_handler.go @@ -1,6 +1,9 @@ package remote import ( + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/prometheus/tsdb/wlog" ) @@ -10,10 +13,13 @@ type MarkerHandler interface { UpdateReceivedData(segmentId, dataCount int) // Data queued for sending UpdateSentData(segmentId, dataCount int) // Data which was sent or given up on sending + Start() Stop() } type markerHandler struct { + logger log.Logger + clientName string markerFileHandler MarkerFileHandler lastMarkedSegment int dataIOUpdate chan data @@ -26,12 +32,10 @@ type data struct { dataCount int } -var ( - _ MarkerHandler = (*markerHandler)(nil) -) - -func NewMarkerHandler(mfh MarkerFileHandler) MarkerHandler { +func NewMarkerHandler(logger log.Logger, clientName string, mfh MarkerFileHandler) MarkerHandler { mh := &markerHandler{ + logger: logger, + clientName: clientName, lastMarkedSegment: -1, // Segment ID last marked on disk. markerFileHandler: mfh, //TODO: What is a good size for the channel? @@ -39,15 +43,16 @@ func NewMarkerHandler(mfh MarkerFileHandler) MarkerHandler { quit: make(chan struct{}), } + return mh +} + +func (mh *markerHandler) Start() { // Load the last marked segment from disk (if it exists). if lastSegment := mh.markerFileHandler.LastMarkedSegment(); lastSegment >= 0 { mh.lastMarkedSegment = lastSegment } - - //TODO: Should this be in a separate Start() function? + level.Info(mh.logger).Log("msg", "Starting WAL marker handler", "queue", mh.clientName) go mh.updatePendingData() - - return mh } func (mh *markerHandler) LastMarkedSegment() int { diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index fb65cac01..7f01a025a 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -428,10 +428,6 @@ type QueueManager struct { highestRecvTimestamp *maxTimestamp } -var ( - _ wlog.WriteTo = (*QueueManager)(nil) -) - // NewQueueManager builds a new QueueManager and starts a new // WAL watcher with queue manager as the WriteTo destination. // The WAL watcher takes the dir parameter as the base directory @@ -455,7 +451,7 @@ func NewQueueManager( sm ReadyScrapeManager, enableExemplarRemoteWrite bool, enableNativeHistogramRemoteWrite bool, - markerHandler MarkerHandler, + markerDir string, ) *QueueManager { if logger == nil { logger = log.NewNopLogger() @@ -484,8 +480,6 @@ func NewQueueManager( seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int), droppedSeries: make(map[chunks.HeadSeriesRef]struct{}), - markerHandler: markerHandler, - numShards: cfg.MinShards, reshardChan: make(chan int), quit: make(chan struct{}), @@ -505,6 +499,8 @@ func NewQueueManager( t.metadataWatcher = NewMetadataWatcher(logger, sm, client.Name(), t, t.mcfg.SendInterval, flushDeadline) } t.shards = t.newShards() + markerFileHandler := NewMarkerFileHandler(logger, markerDir, client.Name()) + t.markerHandler = NewMarkerHandler(logger, client.Name(), markerFileHandler) return t } @@ -796,6 +792,7 @@ func (t *QueueManager) Start() { t.shards.start(t.numShards) t.watcher.Start() + t.markerHandler.Start() if t.mcfg.Send { t.metadataWatcher.Start() } @@ -821,6 +818,7 @@ func (t *QueueManager) Stop() { if t.mcfg.Send { t.metadataWatcher.Stop() } + // we should stop the marker last so that it can update the marker based on any last batches the shards sent t.markerHandler.Stop() // On shutdown, release the strings in the labels from the intern pool. diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 75269441f..ace19dc4b 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -16,6 +16,7 @@ package remote import ( "context" "fmt" + "github.com/prometheus/prometheus/model/timestamp" "math" "os" "runtime/pprof" @@ -39,7 +40,6 @@ import ( "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/textparse" - "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/tsdb/chunks" @@ -167,13 +167,28 @@ func TestMetadataDelivery(t *testing.T) { dir := t.TempDir() - markerHandler := NewTestMarkerHandler(t, dir, "rw1") - cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, markerHandler) + m := NewQueueManager(metrics, + nil, + nil, + nil, + dir, + newEWMARate(ewmaWeight, shardUpdateDuration), + cfg, + mcfg, + labels.EmptyLabels(), + nil, + c, + defaultFlushDeadline, + newPool(), + newHighestTimestampMetric(), + nil, + false, + false, + c.Name()) m.Start() defer m.Stop() @@ -211,10 +226,25 @@ func TestSampleDeliveryTimeout(t *testing.T) { dir := t.TempDir() - markerHandler := NewTestMarkerHandler(t, dir, "rw1") - metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, markerHandler) + m := NewQueueManager(metrics, + nil, + nil, + nil, + dir, + newEWMARate(ewmaWeight, shardUpdateDuration), + cfg, + mcfg, + labels.EmptyLabels(), + nil, + c, + defaultFlushDeadline, + newPool(), + newHighestTimestampMetric(), + nil, + false, + false, + c.Name()) m.StoreSeries(series, 0) m.Start() defer m.Stop() @@ -254,13 +284,28 @@ func TestSampleDeliveryOrder(t *testing.T) { dir := t.TempDir() - markerHandler := NewTestMarkerHandler(t, dir, "rw1") - cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, markerHandler) + m := NewQueueManager(metrics, + nil, + nil, + nil, + dir, + newEWMARate(ewmaWeight, shardUpdateDuration), + cfg, + mcfg, + labels.EmptyLabels(), + nil, + c, + defaultFlushDeadline, + newPool(), + newHighestTimestampMetric(), + nil, + false, + false, + c.Name()) m.StoreSeries(series, 0) m.Start() @@ -276,13 +321,28 @@ func TestShutdown(t *testing.T) { dir := t.TempDir() - markerHandler := NewTestMarkerHandler(t, dir, "rw1") - cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, markerHandler) + m := NewQueueManager(metrics, + nil, + nil, + nil, + dir, + newEWMARate(ewmaWeight, shardUpdateDuration), + cfg, + mcfg, + labels.EmptyLabels(), + nil, + c, + deadline, + newPool(), + newHighestTimestampMetric(), + nil, + false, + false, + c.Name()) n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend samples, series := createTimeseries(n, n) m.StoreSeries(series, 0) @@ -317,12 +377,27 @@ func TestSeriesReset(t *testing.T) { dir := t.TempDir() - markerHandler := NewTestMarkerHandler(t, dir, "rw1") - cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, markerHandler) + m := NewQueueManager(metrics, + nil, + nil, + nil, + dir, + newEWMARate(ewmaWeight, shardUpdateDuration), + cfg, + mcfg, + labels.EmptyLabels(), + nil, + c, + deadline, + newPool(), + newHighestTimestampMetric(), + nil, + false, + false, + c.Name()) for i := 0; i < numSegments; i++ { series := []record.RefSeries{} for j := 0; j < numSeries; j++ { @@ -350,10 +425,25 @@ func TestReshard(t *testing.T) { dir := t.TempDir() - markerHandler := NewTestMarkerHandler(t, dir, "rw1") - metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, markerHandler) + m := NewQueueManager(metrics, + nil, + nil, + nil, + dir, + newEWMARate(ewmaWeight, shardUpdateDuration), + cfg, + mcfg, + labels.EmptyLabels(), + nil, + c, + defaultFlushDeadline, + newPool(), + newHighestTimestampMetric(), + nil, + false, + false, + c.Name()) m.StoreSeries(series, 0) m.Start() @@ -389,7 +479,24 @@ func TestReshardRaceWithStop(*testing.T) { go func() { for { metrics := newQueueManagerMetrics(nil, "", "") - m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) + m = NewQueueManager(metrics, + nil, + nil, + nil, + "", + newEWMARate(ewmaWeight, shardUpdateDuration), + cfg, + mcfg, + labels.EmptyLabels(), + nil, + c, + defaultFlushDeadline, + newPool(), + newHighestTimestampMetric(), + nil, + false, + false, + c.Name()) m.Start() h.Unlock() h.Lock() @@ -424,7 +531,24 @@ func TestReshardPartialBatch(t *testing.T) { cfg.BatchSendDeadline = model.Duration(batchSendDeadline) metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) + m := NewQueueManager(metrics, + nil, + nil, + nil, + t.TempDir(), + newEWMARate(ewmaWeight, shardUpdateDuration), + cfg, + mcfg, + labels.EmptyLabels(), + nil, + c, + flushDeadline, + newPool(), + newHighestTimestampMetric(), + nil, + false, + false, + c.Name()) m.StoreSeries(series, 0) m.Start() @@ -469,7 +593,24 @@ func TestQueueFilledDeadlock(t *testing.T) { metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) + m := NewQueueManager(metrics, + nil, + nil, + nil, + t.TempDir(), + newEWMARate(ewmaWeight, shardUpdateDuration), + cfg, + mcfg, + labels.EmptyLabels(), + nil, + c, + flushDeadline, + newPool(), + newHighestTimestampMetric(), + nil, + false, + false, + c.Name()) m.StoreSeries(series, 0) m.Start() defer m.Stop() @@ -496,7 +637,24 @@ func TestReleaseNoninternedString(t *testing.T) { mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") c := NewTestWriteClient() - m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) + m := NewQueueManager(metrics, + nil, + nil, + nil, + "", + newEWMARate(ewmaWeight, shardUpdateDuration), + cfg, + mcfg, + labels.EmptyLabels(), + nil, + c, + defaultFlushDeadline, + newPool(), + newHighestTimestampMetric(), + nil, + false, + false, + c.Name()) m.Start() defer m.Stop() @@ -543,7 +701,24 @@ func TestShouldReshard(t *testing.T) { for _, c := range cases { metrics := newQueueManagerMetrics(nil, "", "") client := NewTestWriteClient() - m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) + m := NewQueueManager(metrics, + nil, + nil, + nil, + "", + newEWMARate(ewmaWeight, shardUpdateDuration), + cfg, + mcfg, + labels.EmptyLabels(), + nil, + client, + defaultFlushDeadline, + newPool(), + newHighestTimestampMetric(), + nil, + false, + false, + client.Name()) m.numShards = c.startingShards m.dataIn.incr(c.samplesIn) m.dataOut.incr(c.samplesOut) @@ -686,14 +861,6 @@ func NewTestWriteClient() *TestWriteClient { } } -func NewTestMarkerHandler(t *testing.T, walDir, markerId string) MarkerHandler { - //TODO: Mock the filesystem? - markerFileHandler, err := NewMarkerFileHandler(log.NewNopLogger(), walDir, markerId) - require.NoError(t, err) - - return NewMarkerHandler(markerFileHandler) -} - func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.RefSeries) { if !c.withWaitGroup { return @@ -927,7 +1094,24 @@ func BenchmarkSampleSend(b *testing.B) { dir := b.TempDir() metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) + m := NewQueueManager(metrics, + nil, + nil, + nil, + dir, + newEWMARate(ewmaWeight, shardUpdateDuration), + cfg, + mcfg, + labels.EmptyLabels(), + nil, + c, + defaultFlushDeadline, + newPool(), + newHighestTimestampMetric(), + nil, + false, + false, + c.Name()) m.StoreSeries(series, 0) // These should be received by the client. @@ -971,9 +1155,24 @@ func BenchmarkStartup(b *testing.B) { for n := 0; n < b.N; n++ { metrics := newQueueManagerMetrics(nil, "", "") c := NewTestBlockedWriteClient() - m := NewQueueManager(metrics, nil, nil, logger, dir, + m := NewQueueManager(metrics, + nil, + nil, + nil, + "", newEWMARate(ewmaWeight, shardUpdateDuration), - cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false) + cfg, + mcfg, + labels.EmptyLabels(), + nil, + c, + defaultFlushDeadline, + newPool(), + newHighestTimestampMetric(), + nil, + false, + false, + c.Name()) m.watcher.SetStartTime(timestamp.Time(math.MaxInt64)) m.watcher.MaxSegment = segments[len(segments)-2] err := m.watcher.Run() @@ -1054,11 +1253,26 @@ func TestCalculateDesiredShards(t *testing.T) { dir := t.TempDir() - markerHandler := NewTestMarkerHandler(t, dir, "rw1") - metrics := newQueueManagerMetrics(nil, "", "") samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) - m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, markerHandler) + m := NewQueueManager(metrics, + nil, + nil, + nil, + dir, + samplesIn, + cfg, + mcfg, + labels.EmptyLabels(), + nil, + c, + defaultFlushDeadline, + newPool(), + newHighestTimestampMetric(), + nil, + false, + false, + c.Name()) // Need to start the queue manager so the proper metrics are initialized. // However we can stop it right away since we don't need to do any actual @@ -1133,11 +1347,26 @@ func TestCalculateDesiredShardsDetail(t *testing.T) { dir := t.TempDir() - markerHandler := NewTestMarkerHandler(t, dir, "rw1") - metrics := newQueueManagerMetrics(nil, "", "") samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) - m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, markerHandler) + m := NewQueueManager(metrics, + nil, + nil, + nil, + dir, + samplesIn, + cfg, + mcfg, + labels.EmptyLabels(), + nil, + c, + defaultFlushDeadline, + newPool(), + newHighestTimestampMetric(), + nil, + false, + false, + c.Name()) for _, tc := range []struct { name string diff --git a/storage/remote/write.go b/storage/remote/write.go index 95fe61335..f4a2c0869 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -180,13 +180,6 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { logger = log.NewNopLogger() } - markerFileHandler, err := NewMarkerFileHandler(logger, rws.dir, hash) - if err != nil { - return err - } - - markerHandler := NewMarkerHandler(markerFileHandler) - // Redacted to remove any passwords in the URL (that are // technically accepted but not recommended) since this is // only used for metric labels. @@ -209,7 +202,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { rws.scraper, rwConf.SendExemplars, rwConf.SendNativeHistograms, - markerHandler, + rws.dir, ) // Keep track of which queues are new so we know which to start. newHashes = append(newHashes, hash)