Async marker handler

This commit is contained in:
Paulin Todev 2023-06-28 13:57:21 +01:00
parent b9ae36c3c3
commit 1ea38884bf
3 changed files with 81 additions and 74 deletions

View file

@ -47,6 +47,7 @@ func NewMarkerFileHandler(logger log.Logger, walDir, markerId string) (MarkerFil
lastMarkedSegmentFilePath: filepath.Join(markerDir, "segment"), lastMarkedSegmentFilePath: filepath.Join(markerDir, "segment"),
} }
//TODO: Should this be in a separate Start() function?
go mfh.markSegmentAsync() go mfh.markSegmentAsync()
return mfh, nil return mfh, nil

View file

@ -1,24 +1,29 @@
package remote package remote
import ( import (
"sync"
"github.com/prometheus/prometheus/tsdb/wlog" "github.com/prometheus/prometheus/tsdb/wlog"
) )
type MarkerHandler interface { type MarkerHandler interface {
wlog.Marker wlog.Marker
UpdatePendingData(dataCount, dataSegment int)
ProcessConsumedData(data map[int]int) UpdateReceivedData(segmentId, dataCount int) // Data queued for sending
UpdateSentData(segmentId, dataCount int) // Data which was sent or given up on sending
Stop() Stop()
} }
type markerHandler struct { type markerHandler struct {
markerFileHandler MarkerFileHandler markerFileHandler MarkerFileHandler
pendingDataMut sync.Mutex lastMarkedSegment int
latestDataSegment int dataIOUpdate chan data
lastMarkedSegment int quit chan struct{}
pendingDataSegments map[int]int // Map of segment index to pending count }
// TODO: Rename this struct
type data struct {
segmentId int
dataCount int
} }
var ( var (
@ -27,10 +32,11 @@ var (
func NewMarkerHandler(mfh MarkerFileHandler) MarkerHandler { func NewMarkerHandler(mfh MarkerFileHandler) MarkerHandler {
mh := &markerHandler{ mh := &markerHandler{
latestDataSegment: -1, lastMarkedSegment: -1, // Segment ID last marked on disk.
lastMarkedSegment: -1, markerFileHandler: mfh,
pendingDataSegments: make(map[int]int), //TODO: What is a good size for the channel?
markerFileHandler: mfh, dataIOUpdate: make(chan data, 100),
quit: make(chan struct{}),
} }
// Load the last marked segment from disk (if it exists). // Load the last marked segment from disk (if it exists).
@ -38,6 +44,9 @@ func NewMarkerHandler(mfh MarkerFileHandler) MarkerHandler {
mh.lastMarkedSegment = lastSegment mh.lastMarkedSegment = lastSegment
} }
//TODO: Should this be in a separate Start() function?
go mh.updatePendingData()
return mh return mh
} }
@ -45,68 +54,62 @@ func (mh *markerHandler) LastMarkedSegment() int {
return mh.markerFileHandler.LastMarkedSegment() return mh.markerFileHandler.LastMarkedSegment()
} }
// updatePendingData updates a counter for how much data is yet to be sent from each segment. func (mh *markerHandler) UpdateReceivedData(segmentId, dataCount int) {
// "dataCount" will be added to the segment with ID "dataSegment". mh.dataIOUpdate <- data{
func (mh *markerHandler) UpdatePendingData(dataCount, dataSegment int) { segmentId: segmentId,
mh.pendingDataMut.Lock() dataCount: dataCount,
defer mh.pendingDataMut.Unlock()
mh.pendingDataSegments[dataSegment] += dataCount
// We want to save segments whose data has been fully processed by the
// QueueManager. To avoid too much overhead when appending data, we'll only
// examine pending data per segment whenever a new segment is detected.
//
// This could cause our saved segment to lag behind multiple segments
// depending on the rate that new segments are created and how long
// data in other segments takes to be processed.
if dataSegment <= mh.latestDataSegment {
return
}
// We've received data for a new segment. We'll use this as a signal to see
// if older segments have been fully processed.
//
// We want to mark the highest segment which has no more pending data and is
// newer than our last mark.
mh.latestDataSegment = dataSegment
var markableSegment int
for segment, count := range mh.pendingDataSegments {
if segment >= dataSegment || count > 0 {
continue
}
if segment > markableSegment {
markableSegment = segment
}
// Clean up the pending map: the current segment has been completely
// consumed and doesn't need to be considered for marking again.
delete(mh.pendingDataSegments, segment)
}
if markableSegment > mh.lastMarkedSegment {
mh.markerFileHandler.MarkSegment(markableSegment)
mh.lastMarkedSegment = markableSegment
} }
} }
// processConsumedData updates a counter for how many samples have been sent for each segment. func (mh *markerHandler) UpdateSentData(segmentId, dataCount int) {
// "data" is a map of segment index to consumed data count (e.g. number of samples). mh.dataIOUpdate <- data{
func (mh *markerHandler) ProcessConsumedData(data map[int]int) { segmentId: segmentId,
mh.pendingDataMut.Lock() dataCount: -1 * dataCount,
defer mh.pendingDataMut.Unlock()
for segment, count := range data {
if _, tracked := mh.pendingDataSegments[segment]; !tracked {
//TODO: How is it possible to not track it? This should log an error?
continue
}
mh.pendingDataSegments[segment] -= count
} }
} }
func (mh *markerHandler) Stop() { func (mh *markerHandler) Stop() {
// Firstly stop the Marker Handler, because it might want to use the Marker File Handler.
mh.quit <- struct{}{}
// Finally, stop the File Handler.
mh.markerFileHandler.Stop() mh.markerFileHandler.Stop()
} }
// updatePendingData updates a counter for how much data is yet to be sent from each segment.
// "dataCount" will be added to the segment with ID "dataSegment".
func (mh *markerHandler) updatePendingData() {
batchSegmentCount := make(map[int]int)
for {
select {
case <-mh.quit:
return
case dataUpdate := <-mh.dataIOUpdate:
batchSegmentCount[dataUpdate.segmentId] += dataUpdate.dataCount
}
markableSegment := -1
for segment, count := range batchSegmentCount {
//TODO: If count is less than 0, then log an error and remove the entry from the map?
if count != 0 {
continue
}
//TODO: Is it safe to assume that just because a segment is 0 inside the map,
// all samples from it have been processed?
if segment > markableSegment {
markableSegment = segment
}
// Clean up the pending map: the current segment has been completely
// consumed and doesn't need to be considered for marking again.
delete(batchSegmentCount, segment)
}
if markableSegment > mh.lastMarkedSegment {
mh.markerFileHandler.MarkSegment(markableSegment)
mh.lastMarkedSegment = markableSegment
}
}
}

View file

@ -616,7 +616,7 @@ outer:
sType: tSample, sType: tSample,
segment: segment, segment: segment,
}) { }) {
t.markerHandler.UpdatePendingData(len(samples), segment) t.markerHandler.UpdateReceivedData(segment, len(samples))
continue outer continue outer
} }
@ -670,7 +670,7 @@ outer:
sType: tExemplar, sType: tExemplar,
segment: segment, segment: segment,
}) { }) {
t.markerHandler.UpdatePendingData(len(exemplars), segment) t.markerHandler.UpdateReceivedData(segment, len(exemplars))
continue outer continue outer
} }
@ -719,7 +719,7 @@ outer:
sType: tHistogram, sType: tHistogram,
segment: segment, segment: segment,
}) { }) {
t.markerHandler.UpdatePendingData(len(histograms), segment) t.markerHandler.UpdateReceivedData(segment, len(histograms))
continue outer continue outer
} }
@ -768,7 +768,7 @@ outer:
sType: tFloatHistogram, sType: tFloatHistogram,
segment: segment, segment: segment,
}) { }) {
t.markerHandler.UpdatePendingData(len(floatHistograms), segment) t.markerHandler.UpdateReceivedData(segment, len(floatHistograms))
continue outer continue outer
} }
@ -1500,7 +1500,10 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, b
// Inform our queue manager about the data that got processed and clear out // Inform our queue manager about the data that got processed and clear out
// our map to prepare for the next batch. // our map to prepare for the next batch.
s.qm.markerHandler.ProcessConsumedData(batchSegmentCount) // Even if the sending failed, we still have to move on with the WAL marker
for segment, count := range batchSegmentCount {
s.qm.markerHandler.UpdateSentData(segment, count)
}
for segment := range batchSegmentCount { for segment := range batchSegmentCount {
delete(batchSegmentCount, segment) delete(batchSegmentCount, segment)
} }