From 1ea38884bf8bb617cdcd57d0c2d90f6109910b32 Mon Sep 17 00:00:00 2001
From: Paulin Todev
Date: Wed, 28 Jun 2023 13:57:21 +0100
Subject: [PATCH] Async marker handler

---
 storage/remote/marker_file_handler.go |   1 +
 storage/remote/marker_handler.go      | 141 +++++++++++++-------------
 storage/remote/queue_manager.go       |  13 ++-
 3 files changed, 81 insertions(+), 74 deletions(-)

diff --git a/storage/remote/marker_file_handler.go b/storage/remote/marker_file_handler.go
index f7b62cf5f3..37e0106750 100644
--- a/storage/remote/marker_file_handler.go
+++ b/storage/remote/marker_file_handler.go
@@ -47,6 +47,7 @@ func NewMarkerFileHandler(logger log.Logger, walDir, markerId string) (MarkerFil
 		lastMarkedSegmentFilePath: filepath.Join(markerDir, "segment"),
 	}
 
+	// TODO: Should this be in a separate Start() function?
 	go mfh.markSegmentAsync()
 
 	return mfh, nil
diff --git a/storage/remote/marker_handler.go b/storage/remote/marker_handler.go
index 59fe09b983..998acff16b 100644
--- a/storage/remote/marker_handler.go
+++ b/storage/remote/marker_handler.go
@@ -1,24 +1,29 @@
 package remote
 
 import (
-	"sync"
-
 	"github.com/prometheus/prometheus/tsdb/wlog"
 )
 
 type MarkerHandler interface {
 	wlog.Marker
-	UpdatePendingData(dataCount, dataSegment int)
-	ProcessConsumedData(data map[int]int)
+
+	UpdateReceivedData(segmentId, dataCount int) // Data queued for sending.
+	UpdateSentData(segmentId, dataCount int)     // Data that was sent, or that we gave up on sending.
+
 	Stop()
 }
 
 type markerHandler struct {
-	markerFileHandler   MarkerFileHandler
-	pendingDataMut      sync.Mutex
-	latestDataSegment   int
-	lastMarkedSegment   int
-	pendingDataSegments map[int]int // Map of segment index to pending count
+	markerFileHandler MarkerFileHandler
+	lastMarkedSegment int
+	dataIOUpdate      chan data
+	quit              chan struct{}
+}
+
+// TODO: Rename this struct
+type data struct {
+	segmentId int
+	dataCount int
 }
 
 var (
@@ -27,10 +32,11 @@
 
 func NewMarkerHandler(mfh MarkerFileHandler) MarkerHandler {
 	mh := &markerHandler{
-		latestDataSegment:   -1,
-		lastMarkedSegment:   -1,
-		pendingDataSegments: make(map[int]int),
-		markerFileHandler:   mfh,
+		lastMarkedSegment: -1, // Segment ID last marked on disk.
+		markerFileHandler: mfh,
+		// TODO: What is a good size for the channel?
+		dataIOUpdate: make(chan data, 100),
+		quit:         make(chan struct{}),
 	}
 
 	// Load the last marked segment from disk (if it exists).
@@ -38,6 +44,9 @@ func NewMarkerHandler(mfh MarkerFileHandler) MarkerHandler {
 		mh.lastMarkedSegment = lastSegment
 	}
 
+	// TODO: Should this be in a separate Start() function?
+	go mh.updatePendingData()
+
 	return mh
 }
 
@@ -45,68 +54,62 @@
 func (mh *markerHandler) LastMarkedSegment() int {
 	return mh.markerFileHandler.LastMarkedSegment()
 }
 
-// updatePendingData updates a counter for how much data is yet to be sent from each segment.
-// "dataCount" will be added to the segment with ID "dataSegment".
-func (mh *markerHandler) UpdatePendingData(dataCount, dataSegment int) {
-	mh.pendingDataMut.Lock()
-	defer mh.pendingDataMut.Unlock()
-
-	mh.pendingDataSegments[dataSegment] += dataCount
-
-	// We want to save segments whose data has been fully processed by the
-	// QueueManager. To avoid too much overhead when appending data, we'll only
-	// examine pending data per segment whenever a new segment is detected.
-	//
-	// This could cause our saved segment to lag behind multiple segments
-	// depending on the rate that new segments are created and how long
-	// data in other segments takes to be processed.
-	if dataSegment <= mh.latestDataSegment {
-		return
-	}
-
-	// We've received data for a new segment. We'll use this as a signal to see
-	// if older segments have been fully processed.
-	//
-	// We want to mark the highest segment which has no more pending data and is
-	// newer than our last mark.
-	mh.latestDataSegment = dataSegment
-
-	var markableSegment int
-	for segment, count := range mh.pendingDataSegments {
-		if segment >= dataSegment || count > 0 {
-			continue
-		}
-
-		if segment > markableSegment {
-			markableSegment = segment
-		}
-
-		// Clean up the pending map: the current segment has been completely
-		// consumed and doesn't need to be considered for marking again.
-		delete(mh.pendingDataSegments, segment)
-	}
-
-	if markableSegment > mh.lastMarkedSegment {
-		mh.markerFileHandler.MarkSegment(markableSegment)
-		mh.lastMarkedSegment = markableSegment
+func (mh *markerHandler) UpdateReceivedData(segmentId, dataCount int) {
+	mh.dataIOUpdate <- data{
+		segmentId: segmentId,
+		dataCount: dataCount,
 	}
 }
 
-// processConsumedData updates a counter for how many samples have been sent for each segment.
-// "data" is a map of segment index to consumed data count (e.g. number of samples).
-func (mh *markerHandler) ProcessConsumedData(data map[int]int) {
-	mh.pendingDataMut.Lock()
-	defer mh.pendingDataMut.Unlock()
-
-	for segment, count := range data {
-		if _, tracked := mh.pendingDataSegments[segment]; !tracked {
-			//TODO: How is it possible to not track it? This should log an error?
-			continue
-		}
-		mh.pendingDataSegments[segment] -= count
+func (mh *markerHandler) UpdateSentData(segmentId, dataCount int) {
+	mh.dataIOUpdate <- data{
+		segmentId: segmentId,
+		dataCount: -dataCount,
 	}
 }
 
 func (mh *markerHandler) Stop() {
+	// First, stop the marker handler's goroutine, since it might still want to use the marker file handler.
+	mh.quit <- struct{}{}
+
+	// Then stop the marker file handler itself.
 	mh.markerFileHandler.Stop()
 }
+
+// updatePendingData runs in its own goroutine and tracks how much data is yet to be sent
+// from each segment: positive counts come from UpdateReceivedData, negative ones from UpdateSentData.
+func (mh *markerHandler) updatePendingData() {
+	batchSegmentCount := make(map[int]int)
+
+	for {
+		select {
+		case <-mh.quit:
+			return
+		case dataUpdate := <-mh.dataIOUpdate:
+			batchSegmentCount[dataUpdate.segmentId] += dataUpdate.dataCount
+		}
+
+		markableSegment := -1
+		for segment, count := range batchSegmentCount {
+			// TODO: If count is less than 0, should we log an error and remove the entry from the map?
+			if count != 0 {
+				continue
+			}
+
+			// TODO: Is it safe to assume that just because a segment is 0 inside the map,
+			// all samples from it have been processed?
+			if segment > markableSegment {
+				markableSegment = segment
+			}
+
+			// Clean up the pending map: the current segment has been completely
+			// consumed and doesn't need to be considered for marking again.
+			delete(batchSegmentCount, segment)
+		}
+
+		if markableSegment > mh.lastMarkedSegment {
+			mh.markerFileHandler.MarkSegment(markableSegment)
+			mh.lastMarkedSegment = markableSegment
+		}
+	}
+}
diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 95ec8fd51c..fb65cac017 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -616,7 +616,7 @@ outer:
 				sType:   tSample,
 				segment: segment,
 			}) {
-				t.markerHandler.UpdatePendingData(len(samples), segment)
+				t.markerHandler.UpdateReceivedData(segment, len(samples))
 				continue outer
 			}
 
@@ -670,7 +670,7 @@ outer:
 				sType:   tExemplar,
 				segment: segment,
 			}) {
-				t.markerHandler.UpdatePendingData(len(exemplars), segment)
+				t.markerHandler.UpdateReceivedData(segment, len(exemplars))
 				continue outer
 			}
 
@@ -719,7 +719,7 @@ outer:
 				sType:   tHistogram,
 				segment: segment,
 			}) {
-				t.markerHandler.UpdatePendingData(len(histograms), segment)
+				t.markerHandler.UpdateReceivedData(segment, len(histograms))
 				continue outer
 			}
 
@@ -768,7 +768,7 @@ outer:
 				sType:   tFloatHistogram,
 				segment: segment,
 			}) {
-				t.markerHandler.UpdatePendingData(len(floatHistograms), segment)
+				t.markerHandler.UpdateReceivedData(segment, len(floatHistograms))
 				continue outer
 			}
 
@@ -1500,7 +1500,10 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, b
 
 	// Inform our queue manager about the data that got processed and clear out
 	// our map to prepare for the next batch.
-	s.qm.markerHandler.ProcessConsumedData(batchSegmentCount)
+	// Even if sending failed, we still need to move the WAL marker forward.
+	for segment, count := range batchSegmentCount {
+		s.qm.markerHandler.UpdateSentData(segment, count)
+	}
 	for segment := range batchSegmentCount {
 		delete(batchSegmentCount, segment)
 	}
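
Note for reviewers: the sketch below is not part of the patch. It strips away the channel plumbing and shows just the counting scheme that updatePendingData applies on every update: received data adds to a per-segment counter, sent data subtracts, and a segment whose counter returns to zero becomes a candidate for marking. The names pendingCounter, newPendingCounter, and update are hypothetical, chosen for illustration only.

package main

import "fmt"

// pendingCounter mimics the bookkeeping in updatePendingData: each segment
// accumulates queued data (positive counts) and sent data (negative counts).
// A segment whose count returns to zero is assumed fully processed.
type pendingCounter struct {
	pending           map[int]int
	lastMarkedSegment int
}

func newPendingCounter() *pendingCounter {
	return &pendingCounter{pending: make(map[int]int), lastMarkedSegment: -1}
}

// update applies one received (positive) or sent (negative) data count,
// then marks the highest fully-processed segment, as the patch does.
func (pc *pendingCounter) update(segmentId, dataCount int) {
	pc.pending[segmentId] += dataCount

	markableSegment := -1
	for segment, count := range pc.pending {
		if count != 0 {
			continue
		}
		if segment > markableSegment {
			markableSegment = segment
		}
		// Drop fully-consumed segments so they aren't reconsidered.
		delete(pc.pending, segment)
	}

	if markableSegment > pc.lastMarkedSegment {
		pc.lastMarkedSegment = markableSegment
		fmt.Printf("would mark segment %d\n", markableSegment)
	}
}

func main() {
	pc := newPendingCounter()
	pc.update(0, 50)  // 50 samples read from segment 0
	pc.update(0, -30) // 30 of them sent
	pc.update(0, -20) // remaining 20 sent; segment 0 is now markable
	pc.update(1, 10)  // segment 1 still has pending data; nothing is marked
}

As the TODOs in the patch point out, a count of zero only means that everything received so far for that segment has also been sent; it does not by itself prove that no more data from that segment is still to come, which is why the deletion-and-mark step is flagged as an open question.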