more minor refactoring plus add the start of a test for the marker

Signed-off-by: Callum Styan <callumstyan@gmail.com>
This commit is contained in:
Callum Styan 2023-07-14 18:51:08 -07:00
parent 1cf79e70da
commit 908c0d093c
4 changed files with 175 additions and 60 deletions

View file

@ -1,6 +1,7 @@
package remote package remote
import ( import (
"fmt"
"os" "os"
"path/filepath" "path/filepath"
"strconv" "strconv"
@ -13,7 +14,7 @@ import (
type MarkerFileHandler interface { type MarkerFileHandler interface {
wlog.Marker wlog.Marker
MarkSegment(segment int) MarkSegment(segment int) error
Stop() Stop()
} }
@ -27,8 +28,8 @@ type markerFileHandler struct {
lastMarkedSegmentFilePath string lastMarkedSegmentFilePath string
} }
func NewMarkerFileHandler(logger log.Logger, walDir, markerId string) MarkerFileHandler { func NewMarkerFileHandler(logger log.Logger, dir, markerId string) MarkerFileHandler {
dir := filepath.Join(walDir, "remote", markerId) //dir := filepath.Join(walDir, "remote", markerId)
mfh := &markerFileHandler{ mfh := &markerFileHandler{
segmentToMark: make(chan int, 1), segmentToMark: make(chan int, 1),
@ -39,14 +40,14 @@ func NewMarkerFileHandler(logger log.Logger, walDir, markerId string) MarkerFile
} }
//TODO: Should this be in a separate Start() function? //TODO: Should this be in a separate Start() function?
go mfh.markSegmentAsync() //go mfh.markSegmentAsync()
return mfh return mfh
} }
func (mfh *markerFileHandler) Start() { //func (mfh *markerFileHandler) Start() {
go mfh.markSegmentAsync() // go mfh.markSegmentAsync()
} //}
// LastMarkedSegment implements wlog.Marker. // LastMarkedSegment implements wlog.Marker.
func (mfh *markerFileHandler) LastMarkedSegment() int { func (mfh *markerFileHandler) LastMarkedSegment() int {
@ -74,7 +75,7 @@ func (mfh *markerFileHandler) LastMarkedSegment() int {
} }
// MarkSegment implements MarkerHandler. // MarkSegment implements MarkerHandler.
func (mfh *markerFileHandler) MarkSegment(segment int) { func (mfh *markerFileHandler) MarkSegment(segment int) error {
var ( var (
segmentText = strconv.Itoa(segment) segmentText = strconv.Itoa(segment)
tmp = mfh.lastMarkedSegmentFilePath + ".tmp" tmp = mfh.lastMarkedSegmentFilePath + ".tmp"
@ -82,46 +83,50 @@ func (mfh *markerFileHandler) MarkSegment(segment int) {
if err := os.WriteFile(tmp, []byte(segmentText), 0o666); err != nil { if err := os.WriteFile(tmp, []byte(segmentText), 0o666); err != nil {
level.Error(mfh.logger).Log("msg", "could not create segment marker file", "file", tmp, "err", err) level.Error(mfh.logger).Log("msg", "could not create segment marker file", "file", tmp, "err", err)
return return err
} }
if err := fileutil.Replace(tmp, mfh.lastMarkedSegmentFilePath); err != nil { if err := fileutil.Replace(tmp, mfh.lastMarkedSegmentFilePath); err != nil {
level.Error(mfh.logger).Log("msg", "could not replace segment marker file", "file", mfh.lastMarkedSegmentFilePath, "err", err) level.Error(mfh.logger).Log("msg", "could not replace segment marker file", "file", mfh.lastMarkedSegmentFilePath, "err", err)
return return err
} }
level.Debug(mfh.logger).Log("msg", "updated segment marker file", "file", mfh.lastMarkedSegmentFilePath, "segment", segment) level.Debug(mfh.logger).Log("msg", "updated segment marker file", "file", mfh.lastMarkedSegmentFilePath, "segment", segment)
return fmt.Errorf("hello")
} }
// Stop implements MarkerHandler. // Stop implements MarkerHandler.
func (mfh *markerFileHandler) Stop() { func (mfh *markerFileHandler) Stop() {
level.Debug(mfh.logger).Log("msg", "waiting for marker file handler to shut down...") level.Debug(mfh.logger).Log("msg", "waiting for marker file handler to shut down...")
mfh.quit <- struct{}{} //mfh.quit <- struct{}{}
} }
func (mfh *markerFileHandler) markSegmentAsync() { //
for { //func (mfh *markerFileHandler) markSegmentAsync() {
select { // for {
case segmentToMark := <-mfh.segmentToMark: // select {
if segmentToMark >= 0 { // case segmentToMark := <-mfh.segmentToMark:
var ( // fmt.Println("got message to mark a file: ", segmentToMark)
segmentText = strconv.Itoa(segmentToMark) // if segmentToMark >= 0 {
tmp = mfh.lastMarkedSegmentFilePath + ".tmp" // var (
) // segmentText = strconv.Itoa(segmentToMark)
// tmp = mfh.lastMarkedSegmentFilePath + ".tmp"
if err := os.WriteFile(tmp, []byte(segmentText), 0o666); err != nil { // )
level.Error(mfh.logger).Log("msg", "could not create segment marker file", "file", tmp, "err", err) //
return // if err := os.WriteFile(tmp, []byte(segmentText), 0o666); err != nil {
} // fmt.Println("error: ", err)
if err := fileutil.Replace(tmp, mfh.lastMarkedSegmentFilePath); err != nil { // level.Error(mfh.logger).Log("msg", "could not create segment marker file", "file", tmp, "err", err)
level.Error(mfh.logger).Log("msg", "could not replace segment marker file", "file", mfh.lastMarkedSegmentFilePath, "err", err) // return
return // }
} // if err := fileutil.Replace(tmp, mfh.lastMarkedSegmentFilePath); err != nil {
// level.Error(mfh.logger).Log("msg", "could not replace segment marker file", "file", mfh.lastMarkedSegmentFilePath, "err", err)
level.Debug(mfh.logger).Log("msg", "updated segment marker file", "file", mfh.lastMarkedSegmentFilePath, "segment", segmentToMark) // return
} // }
case <-mfh.quit: //
level.Debug(mfh.logger).Log("msg", "quitting marker handler") // level.Debug(mfh.logger).Log("msg", "updated segment marker file", "file", mfh.lastMarkedSegmentFilePath, "segment", segmentToMark)
return // }
} // case <-mfh.quit:
} // level.Debug(mfh.logger).Log("msg", "quitting marker handler")
} // return
// }
// }
//}

View file

@ -55,6 +55,14 @@ func (mh *markerHandler) Start() {
go mh.updatePendingData() go mh.updatePendingData()
} }
func (mh *markerHandler) Stop() {
// Firstly stop the Marker Handler, because it might want to use the Marker File Handler.
mh.quit <- struct{}{}
// Finally, stop the File Handler.
mh.markerFileHandler.Stop()
}
func (mh *markerHandler) LastMarkedSegment() int { func (mh *markerHandler) LastMarkedSegment() int {
return mh.markerFileHandler.LastMarkedSegment() return mh.markerFileHandler.LastMarkedSegment()
} }
@ -73,14 +81,6 @@ func (mh *markerHandler) UpdateSentData(segmentId, dataCount int) {
} }
} }
func (mh *markerHandler) Stop() {
// Firstly stop the Marker Handler, because it might want to use the Marker File Handler.
mh.quit <- struct{}{}
// Finally, stop the File Handler.
mh.markerFileHandler.Stop()
}
// updatePendingData updates a counter for how much data is yet to be sent from each segment. // updatePendingData updates a counter for how much data is yet to be sent from each segment.
// "dataCount" will be added to the segment with ID "dataSegment". // "dataCount" will be added to the segment with ID "dataSegment".
func (mh *markerHandler) updatePendingData() { func (mh *markerHandler) updatePendingData() {
@ -91,30 +91,34 @@ func (mh *markerHandler) updatePendingData() {
case <-mh.quit: case <-mh.quit:
return return
case dataUpdate := <-mh.dataIOUpdate: case dataUpdate := <-mh.dataIOUpdate:
//TODO: If count is less than 0, then log an error and remove the entry from the map?
batchSegmentCount[dataUpdate.segmentId] += dataUpdate.dataCount batchSegmentCount[dataUpdate.segmentId] += dataUpdate.dataCount
} }
markableSegment := -1 markableSegment := -1
for segment, count := range batchSegmentCount { for segment, count := range batchSegmentCount {
//TODO: If count is less than 0, then log an error and remove the entry from the map? if count > 0 {
if count != 0 {
continue continue
} }
//TODO: Is it save to assume that just because a segment is 0 inside the map, // TODO: should we just track the lowest segment ID with samples and the highest segment ID with samples?
// all samples from it have been processed? // then we can just check if the current segment was not the highest segment ID, ie is there any higher segment id with samples currently
if segment > markableSegment { // in reality the % of the time that there will be samples for more than 1 segment is almost 0
if segment > markableSegment && batchSegmentCount[segment+1] > 0 {
markableSegment = segment markableSegment = segment
}
// Clean up the pending map: the current segment has been completely // Clean up the pending map: the current segment has been completely
// consumed and doesn't need to be considered for marking again. // consumed and doesn't need to be considered for marking again.
// we probably need to have a smarter cleanup either on a time ticker
// or here we delete all keys that are a segment ID lower than us
delete(batchSegmentCount, segment) delete(batchSegmentCount, segment)
} }
}
if markableSegment > mh.lastMarkedSegment { if markableSegment > mh.lastMarkedSegment {
mh.markerFileHandler.MarkSegment(markableSegment) // how to handle error here?
if err := mh.markerFileHandler.MarkSegment(markableSegment); err == nil {
mh.lastMarkedSegment = markableSegment mh.lastMarkedSegment = markableSegment
} }
} }
}
} }

View file

@ -16,7 +16,9 @@ package remote
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"math" "math"
"os"
"strconv" "strconv"
"sync" "sync"
"time" "time"
@ -493,12 +495,16 @@ func NewQueueManager(
interner: interner, interner: interner,
highestRecvTimestamp: highestRecvTimestamp, highestRecvTimestamp: highestRecvTimestamp,
} }
t.watcher = wlog.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, t.markerHandler, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite) t.watcher = wlog.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, t.markerHandler, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite)
if t.mcfg.Send { if t.mcfg.Send {
t.metadataWatcher = NewMetadataWatcher(logger, sm, client.Name(), t, t.mcfg.SendInterval, flushDeadline) t.metadataWatcher = NewMetadataWatcher(logger, sm, client.Name(), t, t.mcfg.SendInterval, flushDeadline)
} }
t.shards = t.newShards() t.shards = t.newShards()
// IMO we need to ensure the dir exists in another piece of the code, and handle the error there
err := os.MkdirAll(markerDir+client.Name(), 0o777)
if err != nil {
fmt.Println("error mkdir: ", err)
}
markerFileHandler := NewMarkerFileHandler(logger, markerDir, client.Name()) markerFileHandler := NewMarkerFileHandler(logger, markerDir, client.Name())
t.markerHandler = NewMarkerHandler(logger, client.Name(), markerFileHandler) t.markerHandler = NewMarkerHandler(logger, client.Name(), markerFileHandler)
@ -612,7 +618,7 @@ outer:
sType: tSample, sType: tSample,
segment: segment, segment: segment,
}) { }) {
t.markerHandler.UpdateReceivedData(segment, len(samples)) t.markerHandler.UpdateReceivedData(segment, 1)
continue outer continue outer
} }

View file

@ -79,7 +79,7 @@ func TestSampleDelivery(t *testing.T) {
n := 3 n := 3
dir := t.TempDir() dir := t.TempDir()
fmt.Println("creating storage, dir is: ", dir)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil) s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
defer s.Close() defer s.Close()
@ -315,6 +315,106 @@ func TestSampleDeliveryOrder(t *testing.T) {
c.waitForExpectedData(t) c.waitForExpectedData(t)
} }
func TestSegmentMarker(t *testing.T) {
testcases := []struct {
name string
samples bool
exemplars bool
histograms bool
floatHistograms bool
}{
// todo: test with multiple data types
{samples: true, exemplars: false, histograms: false, floatHistograms: false, name: "samples only"},
//{samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "samples, exemplars, and histograms"},
//{samples: false, exemplars: true, histograms: false, floatHistograms: false, name: "exemplars only"},
//{samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only"},
//{samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only"},
}
// Let's create an even number of send batches so we don't run into the
// batch timeout case.
n := 3
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
defer s.Close()
queueConfig := config.DefaultQueueConfig
queueConfig.BatchSendDeadline = model.Duration(100 * time.Millisecond)
queueConfig.MaxShards = 1
// We need to set URL's so that metric creation doesn't panic.
writeConfig := baseRemoteWriteConfig("http://test-storage.com")
writeConfig.QueueConfig = queueConfig
writeConfig.SendExemplars = true
writeConfig.SendNativeHistograms = true
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{
writeConfig,
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
var (
series []record.RefSeries
samples []record.RefSample
//exemplars []record.RefExemplar
//histograms []record.RefHistogramSample
//floatHistograms []record.RefFloatHistogramSample
)
// Generates same series in both cases.
if tc.samples {
samples, series = createTimeseries(n, n)
}
//if tc.exemplars {
// exemplars, series = createExemplars(n, n)
//}
//if tc.histograms {
// histograms, _, series = createHistograms(n, n, false)
//}
//if tc.floatHistograms {
// _, floatHistograms, series = createHistograms(n, n, true)
//}
// Apply new config.
queueConfig.Capacity = len(samples)
queueConfig.MaxSamplesPerSend = len(samples) / 2
require.NoError(t, s.ApplyConfig(conf))
hash, err := toHash(writeConfig)
require.NoError(t, err)
qm := s.rws.queues[hash]
c := NewTestWriteClient()
qm.SetClient(c)
qm.StoreSeries(series, 0)
// Send first half of data.
c.expectSamples(samples[:len(samples)/2], series)
qm.Append(samples[:len(samples)/4], 0)
qm.Append(samples[len(samples)/4:len(samples)/2], 1)
c.waitForExpectedData(t)
// last marked segment should be 0, since we can't know for sure that we won't get more data in segment 1
require.Eventually(t, func() bool { return qm.markerHandler.LastMarkedSegment() == 0 }, 2*time.Second, 100*time.Millisecond, "LastMarkedSegment was never updated to the expected value: %d", 0)
// Send second half of data.
c.expectSamples(samples[len(samples)/2:], series)
//c.expectExemplars(exemplars[len(exemplars)/2:], series)
//c.expectHistograms(histograms[len(histograms)/2:], series)
//c.expectFloatHistograms(floatHistograms[len(floatHistograms)/2:], series)
//qm.Append(samples[len(samples)/2:], 0)
//qm.AppendExemplars(exemplars[len(exemplars)/2:], 0)
//qm.AppendHistograms(histograms[len(histograms)/2:], 0)
//qm.AppendFloatHistograms(floatHistograms[len(floatHistograms)/2:], 0)
//c.waitForExpectedData(t)
})
}
}
func TestShutdown(t *testing.T) { func TestShutdown(t *testing.T) {
deadline := 1 * time.Second deadline := 1 * time.Second
c := NewTestBlockedWriteClient() c := NewTestBlockedWriteClient()