Robert's changes + async file write

Paulin Todev 2023-06-28 12:32:05 +01:00
parent fe639b545f
commit b9ae36c3c3
7 changed files with 383 additions and 52 deletions


@ -0,0 +1,131 @@
package remote
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/wlog"
)
type MarkerFileHandler interface {
wlog.Marker
MarkSegment(segment int)
Stop()
}
type markerFileHandler struct {
segmentToMark chan int
quit chan struct{}
logger log.Logger
lastMarkedSegmentFilePath string
}
var (
_ MarkerFileHandler = (*markerFileHandler)(nil)
)
func NewMarkerFileHandler(logger log.Logger, walDir, markerId string) (MarkerFileHandler, error) {
markerDir := filepath.Join(walDir, "remote", markerId)
if err := os.MkdirAll(markerDir, 0o777); err != nil {
return nil, fmt.Errorf("error creating segment marker folder %q: %w", markerDir, err)
}
mfh := &markerFileHandler{
segmentToMark: make(chan int, 1),
quit: make(chan struct{}),
logger: logger,
lastMarkedSegmentFilePath: filepath.Join(markerDir, "segment"),
}
go mfh.markSegmentAsync()
return mfh, nil
}
// LastMarkedSegment implements wlog.Marker.
func (mfh *markerFileHandler) LastMarkedSegment() int {
bb, err := os.ReadFile(mfh.lastMarkedSegmentFilePath)
if os.IsNotExist(err) {
level.Warn(mfh.logger).Log("msg", "marker segment file does not exist", "file", mfh.lastMarkedSegmentFilePath)
return -1
} else if err != nil {
level.Error(mfh.logger).Log("msg", "could not access segment marker file", "file", mfh.lastMarkedSegmentFilePath, "err", err)
return -1
}
savedSegment, err := strconv.Atoi(string(bb))
if err != nil {
level.Error(mfh.logger).Log("msg", "could not read segment marker file", "file", mfh.lastMarkedSegmentFilePath, "err", err)
return -1
}
if savedSegment < 0 {
level.Error(mfh.logger).Log("msg", "invalid segment number inside marker file", "file", mfh.lastMarkedSegmentFilePath, "segment number", savedSegment)
return -1
}
return savedSegment
}
// MarkSegment implements MarkerFileHandler. The marker file is written
// asynchronously by markSegmentAsync.
func (mfh *markerFileHandler) MarkSegment(segment int) {
mfh.segmentToMark <- segment
}
// Stop implements MarkerFileHandler.
func (mfh *markerFileHandler) Stop() {
level.Debug(mfh.logger).Log("msg", "stopping marker file handler")
mfh.quit <- struct{}{}
}
func (mfh *markerFileHandler) markSegmentAsync() {
for {
select {
case segmentToMark := <-mfh.segmentToMark:
if segmentToMark >= 0 {
var (
segmentText = strconv.Itoa(segmentToMark)
tmp = mfh.lastMarkedSegmentFilePath + ".tmp"
)
if err := os.WriteFile(tmp, []byte(segmentText), 0o666); err != nil {
level.Error(mfh.logger).Log("msg", "could not create segment marker file", "file", tmp, "err", err)
continue
}
if err := fileutil.Replace(tmp, mfh.lastMarkedSegmentFilePath); err != nil {
level.Error(mfh.logger).Log("msg", "could not replace segment marker file", "file", mfh.lastMarkedSegmentFilePath, "err", err)
continue
}
level.Debug(mfh.logger).Log("msg", "updated segment marker file", "file", mfh.lastMarkedSegmentFilePath, "segment", segmentToMark)
}
case <-mfh.quit:
level.Debug(mfh.logger).Log("msg", "quitting marker handler")
return
}
}
}
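
To make the round trip concrete, here is a minimal test sketch. It is not part of the commit: it assumes it compiles in the same remote package, uses testify's require (already used by the surrounding tests), and the marker id "example" and test name TestMarkerFileHandlerRoundTrip are arbitrary choices for this sketch.

package remote

import (
    "testing"
    "time"

    "github.com/go-kit/log"
    "github.com/stretchr/testify/require"
)

func TestMarkerFileHandlerRoundTrip(t *testing.T) {
    // t.TempDir() stands in for a real WAL directory.
    mfh, err := NewMarkerFileHandler(log.NewNopLogger(), t.TempDir(), "example")
    require.NoError(t, err)
    defer mfh.Stop()

    // No marker file exists yet, so the handler reports -1.
    require.Equal(t, -1, mfh.LastMarkedSegment())

    // The file write happens asynchronously, so poll until it lands on disk.
    mfh.MarkSegment(3)
    require.Eventually(t, func() bool {
        return mfh.LastMarkedSegment() == 3
    }, time.Second, 10*time.Millisecond)
}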


@ -0,0 +1,112 @@
package remote
import (
"sync"
"github.com/prometheus/prometheus/tsdb/wlog"
)
type MarkerHandler interface {
wlog.Marker
UpdatePendingData(dataCount, dataSegment int)
ProcessConsumedData(data map[int]int)
Stop()
}
type markerHandler struct {
markerFileHandler MarkerFileHandler
pendingDataMut sync.Mutex
latestDataSegment int
lastMarkedSegment int
pendingDataSegments map[int]int // Map of segment index to pending count
}
var (
_ MarkerHandler = (*markerHandler)(nil)
)
func NewMarkerHandler(mfh MarkerFileHandler) MarkerHandler {
mh := &markerHandler{
latestDataSegment: -1,
lastMarkedSegment: -1,
pendingDataSegments: make(map[int]int),
markerFileHandler: mfh,
}
// Load the last marked segment from disk (if it exists).
if lastSegment := mh.markerFileHandler.LastMarkedSegment(); lastSegment >= 0 {
mh.lastMarkedSegment = lastSegment
}
return mh
}
func (mh *markerHandler) LastMarkedSegment() int {
return mh.markerFileHandler.LastMarkedSegment()
}
// UpdatePendingData updates a counter for how much data is yet to be sent from each segment.
// "dataCount" is added to the pending total of the segment with index "dataSegment".
func (mh *markerHandler) UpdatePendingData(dataCount, dataSegment int) {
mh.pendingDataMut.Lock()
defer mh.pendingDataMut.Unlock()
mh.pendingDataSegments[dataSegment] += dataCount
// We want to save segments whose data has been fully processed by the
// QueueManager. To avoid too much overhead when appending data, we'll only
// examine pending data per segment whenever a new segment is detected.
//
// This could cause our saved segment to lag behind multiple segments
// depending on the rate that new segments are created and how long
// data in other segments takes to be processed.
if dataSegment <= mh.latestDataSegment {
return
}
// We've received data for a new segment. We'll use this as a signal to see
// if older segments have been fully processed.
//
// We want to mark the highest segment which has no more pending data and is
// newer than our last mark.
mh.latestDataSegment = dataSegment
markableSegment := -1
for segment, count := range mh.pendingDataSegments {
if segment >= dataSegment || count > 0 {
continue
}
if segment > markableSegment {
markableSegment = segment
}
// Clean up the pending map: the current segment has been completely
// consumed and doesn't need to be considered for marking again.
delete(mh.pendingDataSegments, segment)
}
if markableSegment > mh.lastMarkedSegment {
mh.markerFileHandler.MarkSegment(markableSegment)
mh.lastMarkedSegment = markableSegment
}
}
// ProcessConsumedData decrements the pending counters for data that has been sent.
// "data" is a map of segment index to consumed data count (e.g. number of samples).
func (mh *markerHandler) ProcessConsumedData(data map[int]int) {
mh.pendingDataMut.Lock()
defer mh.pendingDataMut.Unlock()
for segment, count := range data {
if _, tracked := mh.pendingDataSegments[segment]; !tracked {
//TODO: How is it possible to not track it? This should log an error?
continue
}
mh.pendingDataSegments[segment] -= count
}
}
func (mh *markerHandler) Stop() {
mh.markerFileHandler.Stop()
}
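
As a sanity check on the accounting described in the comments above, a small sketch follows. Again this is not part of the commit and is assumed to compile in the remote package; fakeMarkerFileHandler and TestMarkerAdvancesWhenSegmentDrains are hypothetical names introduced only for this illustration. It shows that a segment is marked only after it has fully drained and data for a newer segment has been seen.

package remote

import "testing"

// fakeMarkerFileHandler is a hypothetical in-memory stand-in for the
// file-backed handler; -1 means nothing has been marked yet.
type fakeMarkerFileHandler struct{ marked int }

func (f *fakeMarkerFileHandler) LastMarkedSegment() int  { return f.marked }
func (f *fakeMarkerFileHandler) MarkSegment(segment int) { f.marked = segment }
func (f *fakeMarkerFileHandler) Stop()                   {}

func TestMarkerAdvancesWhenSegmentDrains(t *testing.T) {
    fake := &fakeMarkerFileHandler{marked: -1}
    mh := NewMarkerHandler(fake)
    defer mh.Stop()

    // Two items read from segment 0: nothing can be marked yet.
    mh.UpdatePendingData(2, 0)
    // Both items are sent, so segment 0 has fully drained.
    mh.ProcessConsumedData(map[int]int{0: 2})
    // Data arriving for segment 1 triggers the scan that marks segment 0.
    mh.UpdatePendingData(1, 1)

    if fake.marked != 0 {
        t.Fatalf("expected segment 0 to be marked, got %d", fake.marked)
    }
}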


@ -413,6 +413,8 @@ type QueueManager struct {
seriesSegmentMtx sync.Mutex // Covers seriesSegmentIndexes - if you also lock seriesMtx, take seriesMtx first.
seriesSegmentIndexes map[chunks.HeadSeriesRef]int
markerHandler MarkerHandler
shards *shards
numShards int
reshardChan chan int
@ -426,6 +428,10 @@ type QueueManager struct {
highestRecvTimestamp *maxTimestamp
}
var (
_ wlog.WriteTo = (*QueueManager)(nil)
)
// NewQueueManager builds a new QueueManager and starts a new
// WAL watcher with queue manager as the WriteTo destination.
// The WAL watcher takes the dir parameter as the base directory
@ -449,6 +455,7 @@ func NewQueueManager(
sm ReadyScrapeManager,
enableExemplarRemoteWrite bool,
enableNativeHistogramRemoteWrite bool,
markerHandler MarkerHandler,
) *QueueManager {
if logger == nil {
logger = log.NewNopLogger()
@ -461,6 +468,7 @@ func NewQueueManager(
})
logger = log.With(logger, remoteName, client.Name(), endpoint, client.Endpoint())
t := &QueueManager{
logger: logger,
flushDeadline: flushDeadline,
@ -476,6 +484,8 @@ func NewQueueManager(
seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
droppedSeries: make(map[chunks.HeadSeriesRef]struct{}),
markerHandler: markerHandler,
numShards: cfg.MinShards,
reshardChan: make(chan int),
quit: make(chan struct{}),
@ -490,7 +500,7 @@ func NewQueueManager(
highestRecvTimestamp: highestRecvTimestamp,
}
t.watcher = wlog.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite)
t.watcher = wlog.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, t.markerHandler, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite)
if t.mcfg.Send {
t.metadataWatcher = NewMetadataWatcher(logger, sm, client.Name(), t, t.mcfg.SendInterval, flushDeadline)
}
@ -547,6 +557,7 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p
)
begin := time.Now()
//TODO: Should metadata be part of the marker?
err := t.storeClient.Store(ctx, req)
t.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
@ -572,7 +583,7 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p
// Append queues a sample to be sent to the remote storage. Blocks until all samples are
// enqueued on their shards or a shutdown signal is received.
func (t *QueueManager) Append(samples []record.RefSample) bool {
func (t *QueueManager) Append(samples []record.RefSample, segment int) bool {
outer:
for _, s := range samples {
t.seriesMtx.Lock()
@ -603,7 +614,9 @@ outer:
timestamp: s.T,
value: s.V,
sType: tSample,
segment: segment,
}) {
t.markerHandler.UpdatePendingData(1, segment)
continue outer
}
@ -620,7 +633,7 @@ outer:
return true
}
func (t *QueueManager) AppendExemplars(exemplars []record.RefExemplar) bool {
func (t *QueueManager) AppendExemplars(exemplars []record.RefExemplar, segment int) bool {
if !t.sendExemplars {
return true
}
@ -648,13 +661,16 @@ outer:
return false
default:
}
//TODO: Write a unit test where the enqueue is retried?
if t.shards.enqueue(e.Ref, timeSeries{
seriesLabels: lbls,
timestamp: e.T,
value: e.V,
exemplarLabels: e.Labels,
sType: tExemplar,
segment: segment,
}) {
t.markerHandler.UpdatePendingData(1, segment)
continue outer
}
@ -669,7 +685,7 @@ outer:
return true
}
func (t *QueueManager) AppendHistograms(histograms []record.RefHistogramSample) bool {
func (t *QueueManager) AppendHistograms(histograms []record.RefHistogramSample, segment int) bool {
if !t.sendNativeHistograms {
return true
}
@ -701,7 +717,9 @@ outer:
timestamp: h.T,
histogram: h.H,
sType: tHistogram,
segment: segment,
}) {
t.markerHandler.UpdatePendingData(1, segment)
continue outer
}
@ -716,7 +734,7 @@ outer:
return true
}
func (t *QueueManager) AppendFloatHistograms(floatHistograms []record.RefFloatHistogramSample) bool {
func (t *QueueManager) AppendFloatHistograms(floatHistograms []record.RefFloatHistogramSample, segment int) bool {
if !t.sendNativeHistograms {
return true
}
@ -748,7 +766,9 @@ outer:
timestamp: h.T,
floatHistogram: h.FH,
sType: tFloatHistogram,
segment: segment,
}) {
t.markerHandler.UpdatePendingData(1, segment)
continue outer
}
@ -801,6 +821,7 @@ func (t *QueueManager) Stop() {
if t.mcfg.Send {
t.metadataWatcher.Stop()
}
t.markerHandler.Stop()
// On shutdown, release the strings in the labels from the intern pool.
t.seriesMtx.Lock()
@ -1211,6 +1232,8 @@ type timeSeries struct {
exemplarLabels labels.Labels
// The type of series: sample, exemplar, or histogram.
sType seriesType
// WAL segment number this timeSeries came from
segment int
}
type seriesType int
@ -1360,6 +1383,12 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
}
}
// batchSegmentCount tracks how much data from each WAL segment is in the
// current batch. After the batch is sent (regardless of success), it is
// handed to the marker handler's ProcessConsumedData to decrement the
// per-segment pending totals.
batchSegmentCount := make(map[int]int)
timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
stop := func() {
if !timer.Stop() {
@ -1394,10 +1423,10 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
if !ok {
return
}
nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData, batchSegmentCount)
queue.ReturnForReuse(batch)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
s.sendSamples(ctx, pendingData[:n], batchSegmentCount, nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
stop()
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
@ -1405,11 +1434,11 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
case <-timer.C:
batch := queue.Batch()
if len(batch) > 0 {
nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData, batchSegmentCount)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
s.sendSamples(ctx, pendingData[:n], batchSegmentCount, nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
}
queue.ReturnForReuse(batch)
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
@ -1417,7 +1446,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
}
}
func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries) (int, int, int) {
func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, batchSegmentCount map[int]int) (int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms int
for nPending, d := range batch {
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
@ -1453,11 +1482,13 @@ func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.Tim
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram))
nPendingHistograms++
}
batchSegmentCount[d.segment]++
}
return nPendingSamples, nPendingExemplars, nPendingHistograms
}
func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, batchSegmentCount map[int]int, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
begin := time.Now()
err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, pBuf, buf)
if err != nil {
@ -1467,6 +1498,13 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s
s.qm.metrics.failedHistogramsTotal.Add(float64(histogramCount))
}
// Inform the marker handler about the data that was processed and clear out
// the map to prepare for the next batch.
s.qm.markerHandler.ProcessConsumedData(batchSegmentCount)
for segment := range batchSegmentCount {
delete(batchSegmentCount, segment)
}
// These counters are used to calculate the dynamic sharding, and as such
// should be maintained irrespective of success or failure.
s.qm.dataOut.incr(int64(len(samples)))


@ -142,10 +142,10 @@ func TestSampleDelivery(t *testing.T) {
c.expectExemplars(exemplars[:len(exemplars)/2], series)
c.expectHistograms(histograms[:len(histograms)/2], series)
c.expectFloatHistograms(floatHistograms[:len(floatHistograms)/2], series)
qm.Append(samples[:len(samples)/2])
qm.AppendExemplars(exemplars[:len(exemplars)/2])
qm.AppendHistograms(histograms[:len(histograms)/2])
qm.AppendFloatHistograms(floatHistograms[:len(floatHistograms)/2])
qm.Append(samples[:len(samples)/2], 0)
qm.AppendExemplars(exemplars[:len(exemplars)/2], 0)
qm.AppendHistograms(histograms[:len(histograms)/2], 0)
qm.AppendFloatHistograms(floatHistograms[:len(floatHistograms)/2], 0)
c.waitForExpectedData(t)
// Send second half of data.
@ -153,10 +153,10 @@ func TestSampleDelivery(t *testing.T) {
c.expectExemplars(exemplars[len(exemplars)/2:], series)
c.expectHistograms(histograms[len(histograms)/2:], series)
c.expectFloatHistograms(floatHistograms[len(floatHistograms)/2:], series)
qm.Append(samples[len(samples)/2:])
qm.AppendExemplars(exemplars[len(exemplars)/2:])
qm.AppendHistograms(histograms[len(histograms)/2:])
qm.AppendFloatHistograms(floatHistograms[len(floatHistograms)/2:])
qm.Append(samples[len(samples)/2:], 0)
qm.AppendExemplars(exemplars[len(exemplars)/2:], 0)
qm.AppendHistograms(histograms[len(histograms)/2:], 0)
qm.AppendFloatHistograms(floatHistograms[len(floatHistograms)/2:], 0)
c.waitForExpectedData(t)
})
}
@ -217,11 +217,13 @@ func TestSampleDeliveryTimeout(t *testing.T) {
// Send the samples twice, waiting for the samples in the meantime.
c.expectSamples(samples, series)
m.Append(samples)
m.Append(samples, 0)
//TODO: Check that the current marker is at -1?
c.waitForExpectedData(t)
c.expectSamples(samples, series)
m.Append(samples)
m.Append(samples, 1)
//TODO: Check that the current marker is at 0?
c.waitForExpectedData(t)
}
@ -258,7 +260,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
m.Start()
defer m.Stop()
// These should be received by the client.
m.Append(samples)
m.Append(samples, 0)
c.waitForExpectedData(t)
}
@ -280,7 +282,7 @@ func TestShutdown(t *testing.T) {
// Append blocks to guarantee delivery, so we do it in the background.
go func() {
m.Append(samples)
m.Append(samples, 0)
}()
time.Sleep(100 * time.Millisecond)
@ -347,7 +349,7 @@ func TestReshard(t *testing.T) {
go func() {
for i := 0; i < len(samples); i += config.DefaultQueueConfig.Capacity {
sent := m.Append(samples[i : i+config.DefaultQueueConfig.Capacity])
sent := m.Append(samples[i:i+config.DefaultQueueConfig.Capacity], 0)
require.True(t, sent, "samples not sent")
time.Sleep(100 * time.Millisecond)
}
@ -418,7 +420,7 @@ func TestReshardPartialBatch(t *testing.T) {
for i := 0; i < 100; i++ {
done := make(chan struct{})
go func() {
m.Append(samples)
m.Append(samples, 0)
time.Sleep(batchSendDeadline)
m.shards.stop()
m.shards.start(1)
@ -464,7 +466,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
done := make(chan struct{})
go func() {
time.Sleep(batchSendDeadline)
m.Append(samples)
m.Append(samples, 0)
done <- struct{}{}
}()
select {
@ -914,7 +916,7 @@ func BenchmarkSampleSend(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.Append(samples)
m.Append(samples, 0)
m.UpdateSeriesSegment(series, i+1) // simulate what wlog.Watcher.garbageCollectSeries does
m.SeriesReset(i + 1)
}


@ -175,6 +175,18 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
continue
}
logger := rws.logger
if logger == nil {
logger = log.NewNopLogger()
}
markerFileHandler, err := NewMarkerFileHandler(logger, rws.dir, hash)
if err != nil {
return err
}
markerHandler := NewMarkerHandler(markerFileHandler)
// Redacted to remove any passwords in the URL (that are
// technically accepted but not recommended) since this is
// only used for metric labels.
@ -183,7 +195,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
newQueueManagerMetrics(rws.reg, name, endpoint),
rws.watcherMetrics,
rws.liveReaderMetrics,
rws.logger,
logger,
rws.dir,
rws.samplesIn,
rwConf.QueueConfig,
@ -197,6 +209,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
rws.scraper,
rwConf.SendExemplars,
rwConf.SendNativeHistograms,
markerHandler,
)
// Keep track of which queues are new so we know which to start.
newHashes = append(newHashes, hash)


@ -51,18 +51,18 @@ type WriteTo interface {
// Append and AppendExemplar should block until the samples are fully accepted,
// whether enqueued in memory or successfully written to its final destination.
// Once returned, the WAL Watcher will not attempt to pass that data again.
Append([]record.RefSample) bool
AppendExemplars([]record.RefExemplar) bool
AppendHistograms([]record.RefHistogramSample) bool
AppendFloatHistograms([]record.RefFloatHistogramSample) bool
StoreSeries([]record.RefSeries, int)
Append(samples []record.RefSample, segment int) bool
AppendExemplars(exemplars []record.RefExemplar, segment int) bool
AppendHistograms(histogramSamples []record.RefHistogramSample, segment int) bool
AppendFloatHistograms(floatHistogramSamples []record.RefFloatHistogramSample, segment int) bool
StoreSeries(series []record.RefSeries, segment int)
// Next two methods are intended for garbage-collection: first we call
// UpdateSeriesSegment on all current series
UpdateSeriesSegment([]record.RefSeries, int)
UpdateSeriesSegment(series []record.RefSeries, segment int)
// Then SeriesReset is called to allow the deletion
// of all series created in a segment lower than the argument.
SeriesReset(int)
SeriesReset(segment int)
}
// Used to notify the watcher that data has been written so that it can read.
@ -70,6 +70,19 @@ type WriteNotified interface {
Notify()
}
// Marker allows the Watcher to start from a specific segment in the WAL.
// Implementers can use this interface to save and restore save points.
type Marker interface {
// LastMarkedSegment should return the last segment stored in the marker,
// or -1 if nothing has been marked yet. Callers treat any negative value
// as "no mark".
//
// Samples from segments newer than the returned value are always sent;
// for the marked segment and older ones the Watcher falls back to its
// start timestamp.
LastMarkedSegment() int
}
type WatcherMetrics struct {
recordsRead *prometheus.CounterVec
recordDecodeFails *prometheus.CounterVec
@ -82,6 +95,7 @@ type WatcherMetrics struct {
type Watcher struct {
name string
writer WriteTo
marker Marker
logger log.Logger
walDir string
lastCheckpoint string
@ -92,6 +106,7 @@ type Watcher struct {
startTime time.Time
startTimestamp int64 // the start time as a Prometheus timestamp
savedSegment int // Last segment stored in the Marker; takes precedence over startTimestamp for newer segments.
sendSamples bool
recordsReadMetric *prometheus.CounterVec
@ -169,13 +184,15 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
}
// NewWatcher creates a new WAL watcher for a given WriteTo.
func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, dir string, sendExemplars, sendHistograms bool) *Watcher {
// If marker is non-nil, only data from segments newer than the last marked segment (or newer than the Watcher's start time) is forwarded to the WriteTo.
func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, marker Marker, dir string, sendExemplars, sendHistograms bool) *Watcher {
if logger == nil {
logger = log.NewNopLogger()
}
return &Watcher{
logger: logger,
writer: writer,
marker: marker,
metrics: metrics,
readerMetrics: readerMetrics,
walDir: filepath.Join(dir, "wal"),
@ -189,6 +206,7 @@ func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logge
MaxSegment: -1,
savedSegment: -1,
}
}
func (w *Watcher) Notify() {
@ -246,7 +264,14 @@ func (w *Watcher) loop() {
// We may encounter failures processing the WAL; we should wait and retry.
for !isClosed(w.quit) {
if w.marker != nil {
w.savedSegment = w.marker.LastMarkedSegment()
level.Debug(w.logger).Log("msg", "last saved segment from marker", "segment", w.savedSegment)
}
w.SetStartTime(time.Now())
if err := w.Run(); err != nil {
level.Error(w.logger).Log("msg", "error tailing WAL", "err", err)
}
@ -568,17 +593,17 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
return err
}
for _, s := range samples {
if s.T > w.startTimestamp {
if w.canSendSamples(segmentNum, s.T) {
if !w.sendSamples {
w.sendSamples = true
duration := time.Since(w.startTime)
level.Info(w.logger).Log("msg", "Done replaying WAL", "duration", duration)
level.Info(w.logger).Log("msg", "Done replaying WAL", "duration", duration, "segment", segmentNum)
}
samplesToSend = append(samplesToSend, s)
}
}
if len(samplesToSend) > 0 {
w.writer.Append(samplesToSend)
w.writer.Append(samplesToSend, segmentNum)
samplesToSend = samplesToSend[:0]
}
@ -597,7 +622,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
w.recordDecodeFailsMetric.Inc()
return err
}
w.writer.AppendExemplars(exemplars)
w.writer.AppendExemplars(exemplars, segmentNum)
case record.HistogramSamples:
// Skip if experimental "histograms over remote write" is not enabled.
@ -623,7 +648,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
}
}
if len(histogramsToSend) > 0 {
w.writer.AppendHistograms(histogramsToSend)
w.writer.AppendHistograms(histogramsToSend, segmentNum)
histogramsToSend = histogramsToSend[:0]
}
case record.FloatHistogramSamples:
@ -650,7 +675,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
}
}
if len(floatHistogramsToSend) > 0 {
w.writer.AppendFloatHistograms(floatHistogramsToSend)
w.writer.AppendFloatHistograms(floatHistogramsToSend, segmentNum)
floatHistogramsToSend = floatHistogramsToSend[:0]
}
case record.Tombstones:
@ -663,6 +688,16 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
return errors.Wrapf(r.Err(), "segment %d: %v", segmentNum, r.Err())
}
// canSendSamples reports whether a sample with timestamp ts, read from
// segment segmentNum, should be forwarded to the WriteTo. Samples from
// segments newer than the last marked segment are always sent; for older
// segments only samples newer than the Watcher's start time are sent.
func (w *Watcher) canSendSamples(segmentNum int, ts int64) bool {
//TODO: After a full WAL replay, segments newer than the mark are re-sent
// regardless of timestamp; confirm that re-sending unmarked data is intended.
if w.savedSegment >= 0 && segmentNum > w.savedSegment {
return true
}
return ts > w.startTimestamp
}
// Go through all series in a segment updating the segmentNum, so we can delete older series.
// Used with readCheckpoint - implements segmentReadFn.
func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error {
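
To illustrate the send/skip decision made by the canSendSamples helper above, here is a minimal in-package sketch. It is hypothetical and not part of the commit: it constructs a bare Watcher directly with only the two fields the helper reads, which is only possible from inside the wlog package, and the test name TestCanSendSamplesHonoursMarker is invented for this example.

package wlog

import "testing"

func TestCanSendSamplesHonoursMarker(t *testing.T) {
    // Pretend the marker says segment 2 was fully delivered on a previous run,
    // and the Watcher started at timestamp 100.
    w := &Watcher{savedSegment: 2, startTimestamp: 100}

    // Segments after the mark are always sent, even with old timestamps.
    if !w.canSendSamples(3, 50) {
        t.Fatal("expected samples from segment 3 to be sent")
    }
    // At or before the mark, only samples newer than the start time are sent.
    if w.canSendSamples(2, 50) {
        t.Fatal("expected old samples from segment 2 to be skipped")
    }
    if !w.canSendSamples(2, 150) {
        t.Fatal("expected newer samples from segment 2 to be sent")
    }
}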


@ -60,22 +60,22 @@ type writeToMock struct {
seriesSegmentIndexes map[chunks.HeadSeriesRef]int
}
func (wtm *writeToMock) Append(s []record.RefSample) bool {
func (wtm *writeToMock) Append(s []record.RefSample, segment int) bool {
wtm.samplesAppended += len(s)
return true
}
func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar) bool {
func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar, segment int) bool {
wtm.exemplarsAppended += len(e)
return true
}
func (wtm *writeToMock) AppendHistograms(h []record.RefHistogramSample) bool {
func (wtm *writeToMock) AppendHistograms(h []record.RefHistogramSample, segment int) bool {
wtm.histogramsAppended += len(h)
return true
}
func (wtm *writeToMock) AppendFloatHistograms(fh []record.RefFloatHistogramSample) bool {
func (wtm *writeToMock) AppendFloatHistograms(fh []record.RefFloatHistogramSample, segment int) bool {
wtm.floatHistogramsAppended += len(fh)
return true
}
@ -210,7 +210,7 @@ func TestTailSamples(t *testing.T) {
require.NoError(t, err)
wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, true, true)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, nil, dir, true, true)
watcher.SetStartTime(now)
// Set the Watcher's metrics so they're not nil pointers.
@ -295,7 +295,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
require.NoError(t, err)
wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, nil, dir, false, false)
go watcher.Start()
expected := seriesCount
@ -384,7 +384,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
require.NoError(t, err)
readTimeout = time.Second
wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, nil, dir, false, false)
go watcher.Start()
expected := seriesCount * 2
@ -455,7 +455,7 @@ func TestReadCheckpoint(t *testing.T) {
require.NoError(t, err)
wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, nil, dir, false, false)
go watcher.Start()
expectedSeries := seriesCount
@ -524,7 +524,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
}
wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, nil, dir, false, false)
watcher.MaxSegment = -1
// Set the Watcher's metrics so they're not nil pointers.
@ -597,7 +597,7 @@ func TestCheckpointSeriesReset(t *testing.T) {
readTimeout = time.Second
wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, nil, dir, false, false)
watcher.MaxSegment = -1
go watcher.Start()