From b9ae36c3c3e7b69dffed2b79ccc3f56d81a7590b Mon Sep 17 00:00:00 2001
From: Paulin Todev <paulin.todev@gmail.com>
Date: Wed, 28 Jun 2023 12:32:05 +0100
Subject: [PATCH] Robert's changes + async file write

---
 storage/remote/marker_file_handler.go | 131 ++++++++++++++++++++++++++
 storage/remote/marker_handler.go      | 112 ++++++++++++++++++++++
 storage/remote/queue_manager.go       |  60 +++++++++---
 storage/remote/queue_manager_test.go  |  34 +++----
 storage/remote/write.go               |  15 ++-
 tsdb/wlog/watcher.go                  |  63 ++++++++++---
 tsdb/wlog/watcher_test.go             |  20 ++--
 7 files changed, 383 insertions(+), 52 deletions(-)
 create mode 100644 storage/remote/marker_file_handler.go
 create mode 100644 storage/remote/marker_handler.go

diff --git a/storage/remote/marker_file_handler.go b/storage/remote/marker_file_handler.go
new file mode 100644
index 0000000000..f7b62cf5f3
--- /dev/null
+++ b/storage/remote/marker_file_handler.go
@@ -0,0 +1,131 @@
+package remote
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+	"strconv"
+
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"github.com/prometheus/prometheus/tsdb/fileutil"
+	"github.com/prometheus/prometheus/tsdb/wlog"
+)
+
+type MarkerFileHandler interface {
+	wlog.Marker
+	MarkSegment(segment int)
+	Stop()
+}
+
+type markerFileHandler struct {
+	segmentToMark chan int
+	quit          chan struct{}
+
+	logger log.Logger
+
+	lastMarkedSegmentFilePath string
+}
+
+var (
+	_ MarkerFileHandler = (*markerFileHandler)(nil)
+)
+
+func NewMarkerFileHandler(logger log.Logger, walDir, markerId string) (MarkerFileHandler, error) {
+	markerDir := filepath.Join(walDir, "remote", markerId)
+	if err := os.MkdirAll(markerDir, 0o777); err != nil {
+		return nil, fmt.Errorf("error creating segment marker folder %q: %w", markerDir, err)
+	}
+
+	mfh := &markerFileHandler{
+		segmentToMark:             make(chan int, 1),
+		quit:                      make(chan struct{}),
+		logger:                    logger,
+		lastMarkedSegmentFilePath: filepath.Join(markerDir, "segment"),
+	}
+
+	go mfh.markSegmentAsync()
+
+	return mfh, nil
+}
+
+// LastMarkedSegment implements wlog.Marker.
+func (mfh *markerFileHandler) LastMarkedSegment() int {
+	bb, err := os.ReadFile(mfh.lastMarkedSegmentFilePath)
+	if os.IsNotExist(err) {
+		level.Warn(mfh.logger).Log("msg", "marker segment file does not exist", "file", mfh.lastMarkedSegmentFilePath)
+		return -1
+	} else if err != nil {
+		level.Error(mfh.logger).Log("msg", "could not access segment marker file", "file", mfh.lastMarkedSegmentFilePath, "err", err)
+		return -1
+	}
+
+	savedSegment, err := strconv.Atoi(string(bb))
+	if err != nil {
+		level.Error(mfh.logger).Log("msg", "could not read segment marker file", "file", mfh.lastMarkedSegmentFilePath, "err", err)
+		return -1
+	}
+
+	if savedSegment < 0 {
+		level.Error(mfh.logger).Log("msg", "invalid segment number inside marker file", "file", mfh.lastMarkedSegmentFilePath, "segment", savedSegment)
+		return -1
+	}
+
+	return savedSegment
+}
+
+// MarkSegment implements MarkerFileHandler.
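+// The file write itself happens asynchronously: MarkSegment only hands the
+// segment number over to the markSegmentAsync goroutine, which persists it to
+// the marker file. Only the most recent segment matters, so a mark that has
+// not been written yet may be replaced by a newer one.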
+func (mfh *markerFileHandler) MarkSegment(segment int) {
+	// Replace any mark that has not been picked up yet; only the most recent
+	// segment needs to be persisted.
+	select {
+	case <-mfh.segmentToMark:
+	default:
+	}
+
+	select {
+	case mfh.segmentToMark <- segment:
+	default:
+		level.Debug(mfh.logger).Log("msg", "skipping segment mark, an earlier mark is still being written", "segment", segment)
+	}
+}
+
+// Stop implements MarkerFileHandler.
+func (mfh *markerFileHandler) Stop() {
+	level.Debug(mfh.logger).Log("msg", "waiting for marker file handler to shut down...")
+	mfh.quit <- struct{}{}
+}
+
+func (mfh *markerFileHandler) markSegmentAsync() {
+	for {
+		select {
+		case segmentToMark := <-mfh.segmentToMark:
+			if segmentToMark >= 0 {
+				var (
+					segmentText = strconv.Itoa(segmentToMark)
+					tmp         = mfh.lastMarkedSegmentFilePath + ".tmp"
+				)
+
+				if err := os.WriteFile(tmp, []byte(segmentText), 0o666); err != nil {
+					level.Error(mfh.logger).Log("msg", "could not create segment marker file", "file", tmp, "err", err)
+					return
+				}
+				if err := fileutil.Replace(tmp, mfh.lastMarkedSegmentFilePath); err != nil {
+					level.Error(mfh.logger).Log("msg", "could not replace segment marker file", "file", mfh.lastMarkedSegmentFilePath, "err", err)
+					return
+				}
+
+				level.Debug(mfh.logger).Log("msg", "updated segment marker file", "file", mfh.lastMarkedSegmentFilePath, "segment", segmentToMark)
+			}
+		case <-mfh.quit:
+			level.Debug(mfh.logger).Log("msg", "quitting marker handler")
+			return
+		}
+	}
+}
diff --git a/storage/remote/marker_handler.go b/storage/remote/marker_handler.go
new file mode 100644
index 0000000000..59fe09b983
--- /dev/null
+++ b/storage/remote/marker_handler.go
@@ -0,0 +1,112 @@
+package remote
+
+import (
+	"sync"
+
+	"github.com/prometheus/prometheus/tsdb/wlog"
+)
+
+type MarkerHandler interface {
+	wlog.Marker
+	UpdatePendingData(dataCount, dataSegment int)
+	ProcessConsumedData(data map[int]int)
+	Stop()
+}
+
+type markerHandler struct {
+	markerFileHandler   MarkerFileHandler
+	pendingDataMut      sync.Mutex
+	latestDataSegment   int
+	lastMarkedSegment   int
+	pendingDataSegments map[int]int // Map of segment index to pending count
+}
+
+var (
+	_ MarkerHandler = (*markerHandler)(nil)
+)
+
+func NewMarkerHandler(mfh MarkerFileHandler) MarkerHandler {
+	mh := &markerHandler{
+		latestDataSegment:   -1,
+		lastMarkedSegment:   -1,
+		pendingDataSegments: make(map[int]int),
+		markerFileHandler:   mfh,
+	}
+
+	// Load the last marked segment from disk (if it exists).
+	if lastSegment := mh.markerFileHandler.LastMarkedSegment(); lastSegment >= 0 {
+		mh.lastMarkedSegment = lastSegment
+	}
+
+	return mh
+}
+
+func (mh *markerHandler) LastMarkedSegment() int {
+	return mh.markerFileHandler.LastMarkedSegment()
+}
+
+// updatePendingData updates a counter for how much data is yet to be sent from each segment.
+// "dataCount" will be added to the segment with ID "dataSegment".
+func (mh *markerHandler) UpdatePendingData(dataCount, dataSegment int) {
+	mh.pendingDataMut.Lock()
+	defer mh.pendingDataMut.Unlock()
+
+	mh.pendingDataSegments[dataSegment] += dataCount
+
+	// We want to save segments whose data has been fully processed by the
+	// QueueManager. 
To avoid too much overhead when appending data, we'll only + // examine pending data per segment whenever a new segment is detected. + // + // This could cause our saved segment to lag behind multiple segments + // depending on the rate that new segments are created and how long + // data in other segments takes to be processed. + if dataSegment <= mh.latestDataSegment { + return + } + + // We've received data for a new segment. We'll use this as a signal to see + // if older segments have been fully processed. + // + // We want to mark the highest segment which has no more pending data and is + // newer than our last mark. + mh.latestDataSegment = dataSegment + + var markableSegment int + for segment, count := range mh.pendingDataSegments { + if segment >= dataSegment || count > 0 { + continue + } + + if segment > markableSegment { + markableSegment = segment + } + + // Clean up the pending map: the current segment has been completely + // consumed and doesn't need to be considered for marking again. + delete(mh.pendingDataSegments, segment) + } + + if markableSegment > mh.lastMarkedSegment { + mh.markerFileHandler.MarkSegment(markableSegment) + mh.lastMarkedSegment = markableSegment + } +} + +// processConsumedData updates a counter for how many samples have been sent for each segment. +// "data" is a map of segment index to consumed data count (e.g. number of samples). +func (mh *markerHandler) ProcessConsumedData(data map[int]int) { + mh.pendingDataMut.Lock() + defer mh.pendingDataMut.Unlock() + + for segment, count := range data { + if _, tracked := mh.pendingDataSegments[segment]; !tracked { + //TODO: How is it possible to not track it? This should log an error? + continue + } + mh.pendingDataSegments[segment] -= count + } +} + +func (mh *markerHandler) Stop() { + mh.markerFileHandler.Stop() +} diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 3edd31b918..95ec8fd51c 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -413,6 +413,8 @@ type QueueManager struct { seriesSegmentMtx sync.Mutex // Covers seriesSegmentIndexes - if you also lock seriesMtx, take seriesMtx first. seriesSegmentIndexes map[chunks.HeadSeriesRef]int + markerHandler MarkerHandler + shards *shards numShards int reshardChan chan int @@ -426,6 +428,10 @@ type QueueManager struct { highestRecvTimestamp *maxTimestamp } +var ( + _ wlog.WriteTo = (*QueueManager)(nil) +) + // NewQueueManager builds a new QueueManager and starts a new // WAL watcher with queue manager as the WriteTo destination. 
 // The WAL watcher takes the dir parameter as the base directory
@@ -449,6 +455,7 @@ func NewQueueManager(
 	sm ReadyScrapeManager,
 	enableExemplarRemoteWrite bool,
 	enableNativeHistogramRemoteWrite bool,
+	markerHandler MarkerHandler,
 ) *QueueManager {
 	if logger == nil {
 		logger = log.NewNopLogger()
@@ -461,6 +468,7 @@ func NewQueueManager(
 	})
 
 	logger = log.With(logger, remoteName, client.Name(), endpoint, client.Endpoint())
+
 	t := &QueueManager{
 		logger:               logger,
 		flushDeadline:        flushDeadline,
@@ -476,6 +484,8 @@ func NewQueueManager(
 		seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
 		droppedSeries:        make(map[chunks.HeadSeriesRef]struct{}),
 
+		markerHandler: markerHandler,
+
 		numShards:   cfg.MinShards,
 		reshardChan: make(chan int),
 		quit:        make(chan struct{}),
@@ -490,7 +500,7 @@ func NewQueueManager(
 		highestRecvTimestamp: highestRecvTimestamp,
 	}
 
-	t.watcher = wlog.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite)
+	t.watcher = wlog.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, t.markerHandler, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite)
 	if t.mcfg.Send {
 		t.metadataWatcher = NewMetadataWatcher(logger, sm, client.Name(), t, t.mcfg.SendInterval, flushDeadline)
 	}
@@ -547,6 +557,7 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p
 	)
 
 	begin := time.Now()
+	//TODO: Should metadata be part of the marker?
 	err := t.storeClient.Store(ctx, req)
 	t.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
 
@@ -572,7 +583,7 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p
 
 // Append queues a sample to be sent to the remote storage. Blocks until all samples are
 // enqueued on their shards or a shutdown signal is received.
-func (t *QueueManager) Append(samples []record.RefSample) bool {
+func (t *QueueManager) Append(samples []record.RefSample, segment int) bool {
 outer:
 	for _, s := range samples {
 		t.seriesMtx.Lock()
@@ -603,7 +614,9 @@ outer:
 				timestamp:    s.T,
 				value:        s.V,
 				sType:        tSample,
+				segment:      segment,
 			}) {
+				t.markerHandler.UpdatePendingData(1, segment)
 				continue outer
 			}
 
@@ -620,7 +633,7 @@ outer:
 	return true
 }
 
-func (t *QueueManager) AppendExemplars(exemplars []record.RefExemplar) bool {
+func (t *QueueManager) AppendExemplars(exemplars []record.RefExemplar, segment int) bool {
 	if !t.sendExemplars {
 		return true
 	}
@@ -648,13 +661,16 @@ outer:
 				return false
 			default:
 			}
+			//TODO: Write a unit test where the enqueue is retried?
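+			// The enqueue below is retried by this loop until it succeeds or
+			// the queue manager shuts down; pending data for the segment is
+			// only recorded once the exemplar has actually been enqueued.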
			if t.shards.enqueue(e.Ref, timeSeries{
 				seriesLabels:   lbls,
 				timestamp:      e.T,
 				value:          e.V,
 				exemplarLabels: e.Labels,
 				sType:          tExemplar,
+				segment:        segment,
 			}) {
+				t.markerHandler.UpdatePendingData(1, segment)
 				continue outer
 			}
 
@@ -669,7 +685,7 @@ outer:
 	return true
 }
 
-func (t *QueueManager) AppendHistograms(histograms []record.RefHistogramSample) bool {
+func (t *QueueManager) AppendHistograms(histograms []record.RefHistogramSample, segment int) bool {
 	if !t.sendNativeHistograms {
 		return true
 	}
@@ -701,7 +717,9 @@ outer:
 				timestamp:    h.T,
 				histogram:    h.H,
 				sType:        tHistogram,
+				segment:      segment,
 			}) {
+				t.markerHandler.UpdatePendingData(1, segment)
 				continue outer
 			}
 
@@ -716,7 +734,7 @@ outer:
 	return true
 }
 
-func (t *QueueManager) AppendFloatHistograms(floatHistograms []record.RefFloatHistogramSample) bool {
+func (t *QueueManager) AppendFloatHistograms(floatHistograms []record.RefFloatHistogramSample, segment int) bool {
 	if !t.sendNativeHistograms {
 		return true
 	}
@@ -748,7 +766,9 @@ outer:
 				timestamp:      h.T,
 				floatHistogram: h.FH,
 				sType:          tFloatHistogram,
+				segment:        segment,
 			}) {
+				t.markerHandler.UpdatePendingData(1, segment)
 				continue outer
 			}
 
@@ -801,6 +821,7 @@ func (t *QueueManager) Stop() {
 	if t.mcfg.Send {
 		t.metadataWatcher.Stop()
 	}
+	t.markerHandler.Stop()
 
 	// On shutdown, release the strings in the labels from the intern pool.
 	t.seriesMtx.Lock()
@@ -1211,6 +1232,8 @@ type timeSeries struct {
 	exemplarLabels labels.Labels
 	// The type of series: sample, exemplar, or histogram.
 	sType seriesType
+	// WAL segment number this timeSeries came from.
+	segment int
 }
 
 type seriesType int
@@ -1360,6 +1383,12 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 		}
 	}
 
+	// batchSegmentCount tracks how many entries in the current batch came from
+	// each WAL segment. After a batch is sent (regardless of whether the send
+	// succeeded), it is handed to the markerHandler's ProcessConsumedData to
+	// subtract the batch from the per-segment pending totals.
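+	// For example, a batchSegmentCount of map[int]int{3: 98, 4: 2} (numbers
+	// purely illustrative) would mean the batch holds 98 entries read from WAL
+	// segment 3 and 2 entries read from segment 4.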
+ batchSegmentCount := make(map[int]int) + timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline)) stop := func() { if !timer.Stop() { @@ -1394,10 +1423,10 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { if !ok { return } - nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData) + nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData, batchSegmentCount) queue.ReturnForReuse(batch) n := nPendingSamples + nPendingExemplars + nPendingHistograms - s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) + s.sendSamples(ctx, pendingData[:n], batchSegmentCount, nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) stop() timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) @@ -1405,11 +1434,11 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { case <-timer.C: batch := queue.Batch() if len(batch) > 0 { - nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData) + nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData, batchSegmentCount) n := nPendingSamples + nPendingExemplars + nPendingHistograms level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms) - s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) + s.sendSamples(ctx, pendingData[:n], batchSegmentCount, nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) } queue.ReturnForReuse(batch) timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) @@ -1417,7 +1446,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { } } -func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries) (int, int, int) { +func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, batchSegmentCount map[int]int) (int, int, int) { var nPendingSamples, nPendingExemplars, nPendingHistograms int for nPending, d := range batch { pendingData[nPending].Samples = pendingData[nPending].Samples[:0] @@ -1453,11 +1482,13 @@ func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.Tim pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram)) nPendingHistograms++ } + + batchSegmentCount[d.segment]++ } return nPendingSamples, nPendingExemplars, nPendingHistograms } -func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) { +func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, batchSegmentCount map[int]int, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) { begin := time.Now() err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, pBuf, buf) if err != nil { @@ -1467,6 +1498,13 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s s.qm.metrics.failedHistogramsTotal.Add(float64(histogramCount)) } + // Inform our queue manager about the data that got processed and clear out + // our map to prepare for the next batch. 
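+	// Note that this happens whether or not the send succeeded: once
+	// sendSamplesWithBackoff has returned, the batch is not retried, so its
+	// entries no longer count as pending for marking purposes.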
+ s.qm.markerHandler.ProcessConsumedData(batchSegmentCount) + for segment := range batchSegmentCount { + delete(batchSegmentCount, segment) + } + // These counters are used to calculate the dynamic sharding, and as such // should be maintained irrespective of success or failure. s.qm.dataOut.incr(int64(len(samples))) diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index b43258ff06..7361a59df7 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -142,10 +142,10 @@ func TestSampleDelivery(t *testing.T) { c.expectExemplars(exemplars[:len(exemplars)/2], series) c.expectHistograms(histograms[:len(histograms)/2], series) c.expectFloatHistograms(floatHistograms[:len(floatHistograms)/2], series) - qm.Append(samples[:len(samples)/2]) - qm.AppendExemplars(exemplars[:len(exemplars)/2]) - qm.AppendHistograms(histograms[:len(histograms)/2]) - qm.AppendFloatHistograms(floatHistograms[:len(floatHistograms)/2]) + qm.Append(samples[:len(samples)/2], 0) + qm.AppendExemplars(exemplars[:len(exemplars)/2], 0) + qm.AppendHistograms(histograms[:len(histograms)/2], 0) + qm.AppendFloatHistograms(floatHistograms[:len(floatHistograms)/2], 0) c.waitForExpectedData(t) // Send second half of data. @@ -153,10 +153,10 @@ func TestSampleDelivery(t *testing.T) { c.expectExemplars(exemplars[len(exemplars)/2:], series) c.expectHistograms(histograms[len(histograms)/2:], series) c.expectFloatHistograms(floatHistograms[len(floatHistograms)/2:], series) - qm.Append(samples[len(samples)/2:]) - qm.AppendExemplars(exemplars[len(exemplars)/2:]) - qm.AppendHistograms(histograms[len(histograms)/2:]) - qm.AppendFloatHistograms(floatHistograms[len(floatHistograms)/2:]) + qm.Append(samples[len(samples)/2:], 0) + qm.AppendExemplars(exemplars[len(exemplars)/2:], 0) + qm.AppendHistograms(histograms[len(histograms)/2:], 0) + qm.AppendFloatHistograms(floatHistograms[len(floatHistograms)/2:], 0) c.waitForExpectedData(t) }) } @@ -217,11 +217,13 @@ func TestSampleDeliveryTimeout(t *testing.T) { // Send the samples twice, waiting for the samples in the meantime. c.expectSamples(samples, series) - m.Append(samples) + m.Append(samples, 0) + //TODO: Check that the current marker is at -1? c.waitForExpectedData(t) c.expectSamples(samples, series) - m.Append(samples) + m.Append(samples, 1) + //TODO: Check that the current marker is at 0? c.waitForExpectedData(t) } @@ -258,7 +260,7 @@ func TestSampleDeliveryOrder(t *testing.T) { m.Start() defer m.Stop() // These should be received by the client. - m.Append(samples) + m.Append(samples, 0) c.waitForExpectedData(t) } @@ -280,7 +282,7 @@ func TestShutdown(t *testing.T) { // Append blocks to guarantee delivery, so we do it in the background. 
go func() { - m.Append(samples) + m.Append(samples, 0) }() time.Sleep(100 * time.Millisecond) @@ -347,7 +349,7 @@ func TestReshard(t *testing.T) { go func() { for i := 0; i < len(samples); i += config.DefaultQueueConfig.Capacity { - sent := m.Append(samples[i : i+config.DefaultQueueConfig.Capacity]) + sent := m.Append(samples[i:i+config.DefaultQueueConfig.Capacity], 0) require.True(t, sent, "samples not sent") time.Sleep(100 * time.Millisecond) } @@ -418,7 +420,7 @@ func TestReshardPartialBatch(t *testing.T) { for i := 0; i < 100; i++ { done := make(chan struct{}) go func() { - m.Append(samples) + m.Append(samples, 0) time.Sleep(batchSendDeadline) m.shards.stop() m.shards.start(1) @@ -464,7 +466,7 @@ func TestQueueFilledDeadlock(t *testing.T) { done := make(chan struct{}) go func() { time.Sleep(batchSendDeadline) - m.Append(samples) + m.Append(samples, 0) done <- struct{}{} }() select { @@ -914,7 +916,7 @@ func BenchmarkSampleSend(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - m.Append(samples) + m.Append(samples, 0) m.UpdateSeriesSegment(series, i+1) // simulate what wlog.Watcher.garbageCollectSeries does m.SeriesReset(i + 1) } diff --git a/storage/remote/write.go b/storage/remote/write.go index 4b0a249014..95fe61335f 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -175,6 +175,18 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { continue } + logger := rws.logger + if logger == nil { + logger = log.NewNopLogger() + } + + markerFileHandler, err := NewMarkerFileHandler(logger, rws.dir, hash) + if err != nil { + return err + } + + markerHandler := NewMarkerHandler(markerFileHandler) + // Redacted to remove any passwords in the URL (that are // technically accepted but not recommended) since this is // only used for metric labels. @@ -183,7 +195,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { newQueueManagerMetrics(rws.reg, name, endpoint), rws.watcherMetrics, rws.liveReaderMetrics, - rws.logger, + logger, rws.dir, rws.samplesIn, rwConf.QueueConfig, @@ -197,6 +209,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { rws.scraper, rwConf.SendExemplars, rwConf.SendNativeHistograms, + markerHandler, ) // Keep track of which queues are new so we know which to start. newHashes = append(newHashes, hash) diff --git a/tsdb/wlog/watcher.go b/tsdb/wlog/watcher.go index 221e9607ca..23e975d8a7 100644 --- a/tsdb/wlog/watcher.go +++ b/tsdb/wlog/watcher.go @@ -51,18 +51,18 @@ type WriteTo interface { // Append and AppendExemplar should block until the samples are fully accepted, // whether enqueued in memory or successfully written to it's final destination. // Once returned, the WAL Watcher will not attempt to pass that data again. 
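+	//
+	// The segment argument passed to the Append methods reports which WAL
+	// segment the records were read from, so that implementations can track
+	// how much data from each segment is still waiting to be sent.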
-	Append([]record.RefSample) bool
-	AppendExemplars([]record.RefExemplar) bool
-	AppendHistograms([]record.RefHistogramSample) bool
-	AppendFloatHistograms([]record.RefFloatHistogramSample) bool
-	StoreSeries([]record.RefSeries, int)
+	Append(samples []record.RefSample, segment int) bool
+	AppendExemplars(exemplars []record.RefExemplar, segment int) bool
+	AppendHistograms(histogramSamples []record.RefHistogramSample, segment int) bool
+	AppendFloatHistograms(floatHistogramSamples []record.RefFloatHistogramSample, segment int) bool
+	StoreSeries(series []record.RefSeries, segment int)
 
 	// Next two methods are intended for garbage-collection: first we call
 	// UpdateSeriesSegment on all current series
-	UpdateSeriesSegment([]record.RefSeries, int)
+	UpdateSeriesSegment(series []record.RefSeries, segment int)
 	// Then SeriesReset is called to allow the deletion
 	// of all series created in a segment lower than the argument.
-	SeriesReset(int)
+	SeriesReset(segment int)
 }
 
 // Used to notifier the watcher that data has been written so that it can read.
@@ -70,6 +70,19 @@ type WriteNotified interface {
 	Notify()
 }
 
+// Marker allows the Watcher to start from a specific segment in the WAL.
+// Implementers can use this interface to save and restore save points.
+type Marker interface {
+	// LastMarkedSegment should return the last segment stored in the marker.
+	// Must return -1 if there is no mark.
+	//
+	// The Watcher will start reading the first segment whose value is greater
+	// than the return value.
+	LastMarkedSegment() int
+	//TODO: If LastMarkedSegment returns -1 (no mark), should the Watcher just read from segment 0?
+	//TODO: Also, we anyway have to check for negative integers because this is not unsigned?
+}
+
 type WatcherMetrics struct {
 	recordsRead       *prometheus.CounterVec
 	recordDecodeFails *prometheus.CounterVec
@@ -82,6 +95,7 @@ type WatcherMetrics struct {
 type Watcher struct {
 	name           string
 	writer         WriteTo
+	marker         Marker
 	logger         log.Logger
 	walDir         string
 	lastCheckpoint string
@@ -92,6 +106,7 @@ type Watcher struct {
 
 	startTime      time.Time
 	startTimestamp int64 // the start time as a Prometheus timestamp
+	savedSegment   int   // Last marked segment read from the Marker; overrides startTimestamp when set.
 	sendSamples    bool
 
 	recordsReadMetric       *prometheus.CounterVec
@@ -169,13 +184,15 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
 }
 
 // NewWatcher creates a new WAL watcher for a given WriteTo.
-func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, dir string, sendExemplars, sendHistograms bool) *Watcher {
+// Reading will start at the segment returned by marker if marker is non-nil.
+func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, marker Marker, dir string, sendExemplars, sendHistograms bool) *Watcher {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
 	return &Watcher{
 		logger:        logger,
 		writer:        writer,
+		marker:        marker,
 		metrics:       metrics,
 		readerMetrics: readerMetrics,
 		walDir:        filepath.Join(dir, "wal"),
@@ -189,6 +206,7 @@ func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logge
 		MaxSegment: -1,
+		savedSegment: -1,
 	}
 }
 
 func (w *Watcher) Notify() {
@@ -246,7 +264,14 @@ func (w *Watcher) loop() {
 	// We may encounter failures processing the WAL; we should wait and retry.
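+	// Each attempt re-reads the last marked segment (when a marker is
+	// configured) so that tailing can skip data from segments that have
+	// already been marked as delivered.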
for !isClosed(w.quit) { + if w.marker != nil { + w.savedSegment = w.marker.LastMarkedSegment() + //TODO: Does this print the int or the address of the int? + level.Debug(w.logger).Log("msg", "last saved segment", "segment", w.savedSegment) + } + w.SetStartTime(time.Now()) + if err := w.Run(); err != nil { level.Error(w.logger).Log("msg", "error tailing WAL", "err", err) } @@ -568,17 +593,17 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { return err } for _, s := range samples { - if s.T > w.startTimestamp { + if w.canSendSamples(segmentNum, s.T) { if !w.sendSamples { w.sendSamples = true duration := time.Since(w.startTime) - level.Info(w.logger).Log("msg", "Done replaying WAL", "duration", duration) + level.Info(w.logger).Log("msg", "Done replaying WAL", "duration", duration, "segment", segmentNum) } samplesToSend = append(samplesToSend, s) } } if len(samplesToSend) > 0 { - w.writer.Append(samplesToSend) + w.writer.Append(samplesToSend, segmentNum) samplesToSend = samplesToSend[:0] } @@ -597,7 +622,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { w.recordDecodeFailsMetric.Inc() return err } - w.writer.AppendExemplars(exemplars) + w.writer.AppendExemplars(exemplars, segmentNum) case record.HistogramSamples: // Skip if experimental "histograms over remote write" is not enabled. @@ -623,7 +648,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { } } if len(histogramsToSend) > 0 { - w.writer.AppendHistograms(histogramsToSend) + w.writer.AppendHistograms(histogramsToSend, segmentNum) histogramsToSend = histogramsToSend[:0] } case record.FloatHistogramSamples: @@ -650,7 +675,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { } } if len(floatHistogramsToSend) > 0 { - w.writer.AppendFloatHistograms(floatHistogramsToSend) + w.writer.AppendFloatHistograms(floatHistogramsToSend, segmentNum) floatHistogramsToSend = floatHistogramsToSend[:0] } case record.Tombstones: @@ -663,6 +688,16 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { return errors.Wrapf(r.Err(), "segment %d: %v", segmentNum, r.Err()) } +// TODO: Add a function description +func (w *Watcher) canSendSamples(segmentNum int, ts int64) bool { + //TODO: What if we have already replayed the WAL? Then segmentNum > *w.savedSegment + // will always return true, and we will never get to ts > w.startTimestamp? + if w.savedSegment >= 0 && segmentNum > w.savedSegment { + return true + } + return ts > w.startTimestamp +} + // Go through all series in a segment updating the segmentNum, so we can delete older series. // Used with readCheckpoint - implements segmentReadFn. 
func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error { diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go index 94b6a92d12..555e9a83fe 100644 --- a/tsdb/wlog/watcher_test.go +++ b/tsdb/wlog/watcher_test.go @@ -60,22 +60,22 @@ type writeToMock struct { seriesSegmentIndexes map[chunks.HeadSeriesRef]int } -func (wtm *writeToMock) Append(s []record.RefSample) bool { +func (wtm *writeToMock) Append(s []record.RefSample, segment int) bool { wtm.samplesAppended += len(s) return true } -func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar) bool { +func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar, segment int) bool { wtm.exemplarsAppended += len(e) return true } -func (wtm *writeToMock) AppendHistograms(h []record.RefHistogramSample) bool { +func (wtm *writeToMock) AppendHistograms(h []record.RefHistogramSample, segment int) bool { wtm.histogramsAppended += len(h) return true } -func (wtm *writeToMock) AppendFloatHistograms(fh []record.RefFloatHistogramSample) bool { +func (wtm *writeToMock) AppendFloatHistograms(fh []record.RefFloatHistogramSample, segment int) bool { wtm.floatHistogramsAppended += len(fh) return true } @@ -210,7 +210,7 @@ func TestTailSamples(t *testing.T) { require.NoError(t, err) wt := newWriteToMock() - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, true, true) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, nil, dir, true, true) watcher.SetStartTime(now) // Set the Watcher's metrics so they're not nil pointers. @@ -295,7 +295,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) { require.NoError(t, err) wt := newWriteToMock() - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, nil, dir, false, false) go watcher.Start() expected := seriesCount @@ -384,7 +384,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) { require.NoError(t, err) readTimeout = time.Second wt := newWriteToMock() - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, nil, dir, false, false) go watcher.Start() expected := seriesCount * 2 @@ -455,7 +455,7 @@ func TestReadCheckpoint(t *testing.T) { require.NoError(t, err) wt := newWriteToMock() - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, nil, dir, false, false) go watcher.Start() expectedSeries := seriesCount @@ -524,7 +524,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) { } wt := newWriteToMock() - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, nil, dir, false, false) watcher.MaxSegment = -1 // Set the Watcher's metrics so they're not nil pointers. @@ -597,7 +597,7 @@ func TestCheckpointSeriesReset(t *testing.T) { readTimeout = time.Second wt := newWriteToMock() - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, nil, dir, false, false) watcher.MaxSegment = -1 go watcher.Start()