Refactor WAL marker handlers: construct them inside NewQueueManager and launch them via explicit Start() methods

Signed-off-by: Callum Styan <callumstyan@gmail.com>
This commit is contained in:
Callum Styan 2023-07-12 16:46:26 -07:00
parent 3450572840
commit 1cf79e70da
5 changed files with 300 additions and 80 deletions

View file

@ -1,8 +1,6 @@
package remote
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strconv"
@ -22,40 +20,37 @@ type MarkerFileHandler interface {
// markerFileHandler is the concrete MarkerFileHandler built by
// NewMarkerFileHandler. It records where the segment marker file lives on
// disk and holds the channels created for it by the constructor.
type markerFileHandler struct {
// segmentToMark is created with a buffer of one by the constructor.
// NOTE(review): presumably carries segment IDs to markSegmentAsync — confirm.
segmentToMark chan int
// quit is an unbuffered channel created by the constructor.
// NOTE(review): presumably tells the background goroutine to stop — confirm.
quit chan struct{}
// dir is the marker directory: <walDir>/remote/<markerId>.
dir string
// logger receives diagnostics such as the missing-marker-file warning.
logger log.Logger
// lastMarkedSegmentFilePath is the file named "segment" inside the marker
// directory; LastMarkedSegment reads it.
lastMarkedSegmentFilePath string
}
// Compile-time check that *markerFileHandler implements MarkerFileHandler.
var (
_ MarkerFileHandler = (*markerFileHandler)(nil)
)
func NewMarkerFileHandler(logger log.Logger, walDir, markerId string) (MarkerFileHandler, error) {
markerDir := filepath.Join(walDir, "remote", markerId)
func NewMarkerFileHandler(logger log.Logger, walDir, markerId string) MarkerFileHandler {
dir := filepath.Join(walDir, "remote", markerId)
if err := os.MkdirAll(dir, 0o777); err != nil {
return nil, fmt.Errorf("error creating segment marker folder %q: %w", dir, err)
}
mfh := &markerFileHandler{
segmentToMark: make(chan int, 1),
quit: make(chan struct{}),
logger: logger,
lastMarkedSegmentFilePath: filepath.Join(markerDir, "segment"),
dir: dir,
lastMarkedSegmentFilePath: filepath.Join(dir, "segment"),
}
//TODO: Should this be in a separate Start() function?
go mfh.markSegmentAsync()
return mfh, nil
return mfh
}
// Start launches markSegmentAsync on a new goroutine and returns immediately.
// Each call starts another goroutine; nothing here guards against repeated calls.
func (mfh *markerFileHandler) Start() {
go mfh.markSegmentAsync()
}
// LastMarkedSegment implements wlog.Marker.
func (mfh *markerFileHandler) LastMarkedSegment() int {
bb, err := ioutil.ReadFile(mfh.lastMarkedSegmentFilePath)
bb, err := os.ReadFile(mfh.lastMarkedSegmentFilePath)
if os.IsNotExist(err) {
level.Warn(mfh.logger).Log("msg", "marker segment file does not exist", "file", mfh.lastMarkedSegmentFilePath)
return -1

View file

@ -1,6 +1,9 @@
package remote
import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/tsdb/wlog"
)
@ -10,10 +13,13 @@ type MarkerHandler interface {
UpdateReceivedData(segmentId, dataCount int) // Data queued for sending
UpdateSentData(segmentId, dataCount int) // Data which was sent or given up on sending
Start()
Stop()
}
type markerHandler struct {
logger log.Logger
clientName string
markerFileHandler MarkerFileHandler
lastMarkedSegment int
dataIOUpdate chan data
@ -26,12 +32,10 @@ type data struct {
dataCount int
}
// Compile-time check that *markerHandler implements MarkerHandler.
var (
_ MarkerHandler = (*markerHandler)(nil)
)
func NewMarkerHandler(mfh MarkerFileHandler) MarkerHandler {
func NewMarkerHandler(logger log.Logger, clientName string, mfh MarkerFileHandler) MarkerHandler {
mh := &markerHandler{
logger: logger,
clientName: clientName,
lastMarkedSegment: -1, // Segment ID last marked on disk.
markerFileHandler: mfh,
//TODO: What is a good size for the channel?
@ -39,15 +43,16 @@ func NewMarkerHandler(mfh MarkerFileHandler) MarkerHandler {
quit: make(chan struct{}),
}
return mh
}
func (mh *markerHandler) Start() {
// Load the last marked segment from disk (if it exists).
if lastSegment := mh.markerFileHandler.LastMarkedSegment(); lastSegment >= 0 {
mh.lastMarkedSegment = lastSegment
}
//TODO: Should this be in a separate Start() function?
level.Info(mh.logger).Log("msg", "Starting WAL marker handler", "queue", mh.clientName)
go mh.updatePendingData()
return mh
}
func (mh *markerHandler) LastMarkedSegment() int {

View file

@ -428,10 +428,6 @@ type QueueManager struct {
highestRecvTimestamp *maxTimestamp
}
// Compile-time check that *QueueManager implements wlog.WriteTo.
var (
_ wlog.WriteTo = (*QueueManager)(nil)
)
// NewQueueManager builds a new QueueManager and starts a new
// WAL watcher with queue manager as the WriteTo destination.
// The WAL watcher takes the dir parameter as the base directory
@ -455,7 +451,7 @@ func NewQueueManager(
sm ReadyScrapeManager,
enableExemplarRemoteWrite bool,
enableNativeHistogramRemoteWrite bool,
markerHandler MarkerHandler,
markerDir string,
) *QueueManager {
if logger == nil {
logger = log.NewNopLogger()
@ -484,8 +480,6 @@ func NewQueueManager(
seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
droppedSeries: make(map[chunks.HeadSeriesRef]struct{}),
markerHandler: markerHandler,
numShards: cfg.MinShards,
reshardChan: make(chan int),
quit: make(chan struct{}),
@ -505,6 +499,8 @@ func NewQueueManager(
t.metadataWatcher = NewMetadataWatcher(logger, sm, client.Name(), t, t.mcfg.SendInterval, flushDeadline)
}
t.shards = t.newShards()
markerFileHandler := NewMarkerFileHandler(logger, markerDir, client.Name())
t.markerHandler = NewMarkerHandler(logger, client.Name(), markerFileHandler)
return t
}
@ -796,6 +792,7 @@ func (t *QueueManager) Start() {
t.shards.start(t.numShards)
t.watcher.Start()
t.markerHandler.Start()
if t.mcfg.Send {
t.metadataWatcher.Start()
}
@ -821,6 +818,7 @@ func (t *QueueManager) Stop() {
if t.mcfg.Send {
t.metadataWatcher.Stop()
}
// we should stop the marker last so that it can update the marker based on any last batches the shards sent
t.markerHandler.Stop()
// On shutdown, release the strings in the labels from the intern pool.

View file

@ -16,6 +16,7 @@ package remote
import (
"context"
"fmt"
"github.com/prometheus/prometheus/model/timestamp"
"math"
"os"
"runtime/pprof"
@ -39,7 +40,6 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/textparse"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/tsdb/chunks"
@ -167,13 +167,28 @@ func TestMetadataDelivery(t *testing.T) {
dir := t.TempDir()
markerHandler := NewTestMarkerHandler(t, dir, "rw1")
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, markerHandler)
m := NewQueueManager(metrics,
nil,
nil,
nil,
dir,
newEWMARate(ewmaWeight, shardUpdateDuration),
cfg,
mcfg,
labels.EmptyLabels(),
nil,
c,
defaultFlushDeadline,
newPool(),
newHighestTimestampMetric(),
nil,
false,
false,
c.Name())
m.Start()
defer m.Stop()
@ -211,10 +226,25 @@ func TestSampleDeliveryTimeout(t *testing.T) {
dir := t.TempDir()
markerHandler := NewTestMarkerHandler(t, dir, "rw1")
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, markerHandler)
m := NewQueueManager(metrics,
nil,
nil,
nil,
dir,
newEWMARate(ewmaWeight, shardUpdateDuration),
cfg,
mcfg,
labels.EmptyLabels(),
nil,
c,
defaultFlushDeadline,
newPool(),
newHighestTimestampMetric(),
nil,
false,
false,
c.Name())
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
@ -254,13 +284,28 @@ func TestSampleDeliveryOrder(t *testing.T) {
dir := t.TempDir()
markerHandler := NewTestMarkerHandler(t, dir, "rw1")
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, markerHandler)
m := NewQueueManager(metrics,
nil,
nil,
nil,
dir,
newEWMARate(ewmaWeight, shardUpdateDuration),
cfg,
mcfg,
labels.EmptyLabels(),
nil,
c,
defaultFlushDeadline,
newPool(),
newHighestTimestampMetric(),
nil,
false,
false,
c.Name())
m.StoreSeries(series, 0)
m.Start()
@ -276,13 +321,28 @@ func TestShutdown(t *testing.T) {
dir := t.TempDir()
markerHandler := NewTestMarkerHandler(t, dir, "rw1")
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, markerHandler)
m := NewQueueManager(metrics,
nil,
nil,
nil,
dir,
newEWMARate(ewmaWeight, shardUpdateDuration),
cfg,
mcfg,
labels.EmptyLabels(),
nil,
c,
deadline,
newPool(),
newHighestTimestampMetric(),
nil,
false,
false,
c.Name())
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
samples, series := createTimeseries(n, n)
m.StoreSeries(series, 0)
@ -317,12 +377,27 @@ func TestSeriesReset(t *testing.T) {
dir := t.TempDir()
markerHandler := NewTestMarkerHandler(t, dir, "rw1")
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, markerHandler)
m := NewQueueManager(metrics,
nil,
nil,
nil,
dir,
newEWMARate(ewmaWeight, shardUpdateDuration),
cfg,
mcfg,
labels.EmptyLabels(),
nil,
c,
deadline,
newPool(),
newHighestTimestampMetric(),
nil,
false,
false,
c.Name())
for i := 0; i < numSegments; i++ {
series := []record.RefSeries{}
for j := 0; j < numSeries; j++ {
@ -350,10 +425,25 @@ func TestReshard(t *testing.T) {
dir := t.TempDir()
markerHandler := NewTestMarkerHandler(t, dir, "rw1")
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, markerHandler)
m := NewQueueManager(metrics,
nil,
nil,
nil,
dir,
newEWMARate(ewmaWeight, shardUpdateDuration),
cfg,
mcfg,
labels.EmptyLabels(),
nil,
c,
defaultFlushDeadline,
newPool(),
newHighestTimestampMetric(),
nil,
false,
false,
c.Name())
m.StoreSeries(series, 0)
m.Start()
@ -389,7 +479,24 @@ func TestReshardRaceWithStop(*testing.T) {
go func() {
for {
metrics := newQueueManagerMetrics(nil, "", "")
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m = NewQueueManager(metrics,
nil,
nil,
nil,
"",
newEWMARate(ewmaWeight, shardUpdateDuration),
cfg,
mcfg,
labels.EmptyLabels(),
nil,
c,
defaultFlushDeadline,
newPool(),
newHighestTimestampMetric(),
nil,
false,
false,
c.Name())
m.Start()
h.Unlock()
h.Lock()
@ -424,7 +531,24 @@ func TestReshardPartialBatch(t *testing.T) {
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m := NewQueueManager(metrics,
nil,
nil,
nil,
t.TempDir(),
newEWMARate(ewmaWeight, shardUpdateDuration),
cfg,
mcfg,
labels.EmptyLabels(),
nil,
c,
flushDeadline,
newPool(),
newHighestTimestampMetric(),
nil,
false,
false,
c.Name())
m.StoreSeries(series, 0)
m.Start()
@ -469,7 +593,24 @@ func TestQueueFilledDeadlock(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m := NewQueueManager(metrics,
nil,
nil,
nil,
t.TempDir(),
newEWMARate(ewmaWeight, shardUpdateDuration),
cfg,
mcfg,
labels.EmptyLabels(),
nil,
c,
flushDeadline,
newPool(),
newHighestTimestampMetric(),
nil,
false,
false,
c.Name())
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
@ -496,7 +637,24 @@ func TestReleaseNoninternedString(t *testing.T) {
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
c := NewTestWriteClient()
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m := NewQueueManager(metrics,
nil,
nil,
nil,
"",
newEWMARate(ewmaWeight, shardUpdateDuration),
cfg,
mcfg,
labels.EmptyLabels(),
nil,
c,
defaultFlushDeadline,
newPool(),
newHighestTimestampMetric(),
nil,
false,
false,
c.Name())
m.Start()
defer m.Stop()
@ -543,7 +701,24 @@ func TestShouldReshard(t *testing.T) {
for _, c := range cases {
metrics := newQueueManagerMetrics(nil, "", "")
client := NewTestWriteClient()
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m := NewQueueManager(metrics,
nil,
nil,
nil,
"",
newEWMARate(ewmaWeight, shardUpdateDuration),
cfg,
mcfg,
labels.EmptyLabels(),
nil,
client,
defaultFlushDeadline,
newPool(),
newHighestTimestampMetric(),
nil,
false,
false,
client.Name())
m.numShards = c.startingShards
m.dataIn.incr(c.samplesIn)
m.dataOut.incr(c.samplesOut)
@ -686,14 +861,6 @@ func NewTestWriteClient() *TestWriteClient {
}
}
// NewTestMarkerHandler builds a MarkerHandler for tests. It creates a
// MarkerFileHandler for markerId under walDir using a no-op logger, fails the
// test immediately if that constructor returns an error, and wraps the result
// with NewMarkerHandler.
func NewTestMarkerHandler(t *testing.T, walDir, markerId string) MarkerHandler {
//TODO: Mock the filesystem?
markerFileHandler, err := NewMarkerFileHandler(log.NewNopLogger(), walDir, markerId)
require.NoError(t, err)
return NewMarkerHandler(markerFileHandler)
}
func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.RefSeries) {
if !c.withWaitGroup {
return
@ -927,7 +1094,24 @@ func BenchmarkSampleSend(b *testing.B) {
dir := b.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m := NewQueueManager(metrics,
nil,
nil,
nil,
dir,
newEWMARate(ewmaWeight, shardUpdateDuration),
cfg,
mcfg,
labels.EmptyLabels(),
nil,
c,
defaultFlushDeadline,
newPool(),
newHighestTimestampMetric(),
nil,
false,
false,
c.Name())
m.StoreSeries(series, 0)
// These should be received by the client.
@ -971,9 +1155,24 @@ func BenchmarkStartup(b *testing.B) {
for n := 0; n < b.N; n++ {
metrics := newQueueManagerMetrics(nil, "", "")
c := NewTestBlockedWriteClient()
m := NewQueueManager(metrics, nil, nil, logger, dir,
m := NewQueueManager(metrics,
nil,
nil,
nil,
"",
newEWMARate(ewmaWeight, shardUpdateDuration),
cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false)
cfg,
mcfg,
labels.EmptyLabels(),
nil,
c,
defaultFlushDeadline,
newPool(),
newHighestTimestampMetric(),
nil,
false,
false,
c.Name())
m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
m.watcher.MaxSegment = segments[len(segments)-2]
err := m.watcher.Run()
@ -1054,11 +1253,26 @@ func TestCalculateDesiredShards(t *testing.T) {
dir := t.TempDir()
markerHandler := NewTestMarkerHandler(t, dir, "rw1")
metrics := newQueueManagerMetrics(nil, "", "")
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, markerHandler)
m := NewQueueManager(metrics,
nil,
nil,
nil,
dir,
samplesIn,
cfg,
mcfg,
labels.EmptyLabels(),
nil,
c,
defaultFlushDeadline,
newPool(),
newHighestTimestampMetric(),
nil,
false,
false,
c.Name())
// Need to start the queue manager so the proper metrics are initialized.
// However we can stop it right away since we don't need to do any actual
@ -1133,11 +1347,26 @@ func TestCalculateDesiredShardsDetail(t *testing.T) {
dir := t.TempDir()
markerHandler := NewTestMarkerHandler(t, dir, "rw1")
metrics := newQueueManagerMetrics(nil, "", "")
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, markerHandler)
m := NewQueueManager(metrics,
nil,
nil,
nil,
dir,
samplesIn,
cfg,
mcfg,
labels.EmptyLabels(),
nil,
c,
defaultFlushDeadline,
newPool(),
newHighestTimestampMetric(),
nil,
false,
false,
c.Name())
for _, tc := range []struct {
name string

View file

@ -180,13 +180,6 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
logger = log.NewNopLogger()
}
markerFileHandler, err := NewMarkerFileHandler(logger, rws.dir, hash)
if err != nil {
return err
}
markerHandler := NewMarkerHandler(markerFileHandler)
// Redacted to remove any passwords in the URL (that are
// technically accepted but not recommended) since this is
// only used for metric labels.
@ -209,7 +202,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
rws.scraper,
rwConf.SendExemplars,
rwConf.SendNativeHistograms,
markerHandler,
rws.dir,
)
// Keep track of which queues are new so we know which to start.
newHashes = append(newHashes, hash)