remote_write: reduce blocking from garbage-collect of series (#9109)

* Refactor: pass segment-reading function as param

To allow a different implementation to be used when garbage-collecting.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
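
For background on the technique: in Go, a method can be passed around as a "method expression", a plain function value whose first parameter is the receiver. That is what lets readCheckpoint accept either the normal read path or a GC-specific one. A minimal standalone sketch (names below are illustrative, not from this commit):

package main

import "fmt"

type watcher struct{ dir string }

func (w *watcher) readSegment(n int) error { fmt.Println("read", w.dir, n); return nil }
func (w *watcher) readForGC(n int) error   { fmt.Println("gc", w.dir, n); return nil }

// segmentReadFn mirrors the pattern in this commit: the receiver
// becomes an explicit first argument, so the caller picks the behaviour.
type segmentReadFn func(w *watcher, n int) error

func readCheckpoint(w *watcher, readFn segmentReadFn) error {
    return readFn(w, 1)
}

func main() {
    w := &watcher{dir: "wal"}
    readCheckpoint(w, (*watcher).readSegment) // normal tailing path
    readCheckpoint(w, (*watcher).readForGC)   // garbage-collection path
}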

* remote_write: reduce blocking from GC of series

Add a method `UpdateSeriesSegment()` which is used together with
`SeriesReset()` to garbage-collect old series. This allows us to
split the lock around queueManager series data and avoid blocking
`Append()` while reading series from the last checkpoint.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
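
To make the locking split concrete, here is a stripped-down sketch of the scheme (a simplified illustration, not the real QueueManager, which carries more state and metrics): labels stay behind seriesMtx so Append() keeps working, segment indexes move behind their own mutex, and only SeriesReset takes both, in a fixed order.

package main

import "sync"

type manager struct {
    seriesMtx sync.Mutex // covers labels
    labels    map[uint64]string

    segmentMtx sync.Mutex // covers segments; when taking both locks, take seriesMtx first
    segments   map[uint64]int
}

// Append needs only seriesMtx, so it is not blocked while the
// watcher walks a long checkpoint via UpdateSeriesSegment.
func (m *manager) Append(ref uint64) (string, bool) {
    m.seriesMtx.Lock()
    defer m.seriesMtx.Unlock()
    lbls, ok := m.labels[ref]
    return lbls, ok
}

// UpdateSeriesSegment records the latest segment each series was seen in.
func (m *manager) UpdateSeriesSegment(refs []uint64, segment int) {
    m.segmentMtx.Lock()
    defer m.segmentMtx.Unlock()
    for _, ref := range refs {
        m.segments[ref] = segment
    }
}

// SeriesReset drops series last seen in a segment older than index.
// It takes both locks, in the documented order, to avoid deadlock.
func (m *manager) SeriesReset(index int) {
    m.seriesMtx.Lock()
    defer m.seriesMtx.Unlock()
    m.segmentMtx.Lock()
    defer m.segmentMtx.Unlock()
    for ref, seg := range m.segments {
        if seg < index {
            delete(m.segments, ref)
            delete(m.labels, ref)
        }
    }
}

func main() {
    m := &manager{labels: map[uint64]string{1: "up"}, segments: map[uint64]int{1: 0}}
    m.UpdateSeriesSegment([]uint64{1}, 2) // series 1 seen again in segment 2
    m.SeriesReset(2)                      // would have deleted it had it stayed on segment 0
}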

* Cosmetic: review feedback on comments

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* remote-write benchmark: include GC of series

Reduce the total number of samples per iteration from 5000*5000
(25 million), which is too big for my laptop, to 1*10000.

Extend `createTimeseries()` to add additional labels, so that the
queue manager is doing more realistic work.

Move the Append() call to a background goroutine - this works because
TestWriteClient uses a WaitGroup to signal completion.

Call `StoreSeries()` and `SeriesReset()` while adding samples, to
simulate the garbage-collection that wal.Watcher does.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
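
The reason the Append() move works: TestWriteClient counts expected samples with a WaitGroup, so the benchmark can fire Append on a goroutine and block on the count instead. A toy version of that signalling pattern (field and method names here approximate the test helper, they are not its real API):

package main

import (
    "fmt"
    "sync"
)

// testClient mimics the expect/receive/wait pattern: expectDataCount
// arms the WaitGroup, each received sample releases one count.
type testClient struct {
    wg sync.WaitGroup
}

func (c *testClient) expectDataCount(n int) { c.wg.Add(n) }

func (c *testClient) receive(samples []int) {
    for range samples {
        c.wg.Done()
    }
}

func (c *testClient) waitForExpectedDataCount() { c.wg.Wait() }

func main() {
    c := &testClient{}
    samples := []int{1, 2, 3}
    c.expectDataCount(len(samples))
    go c.receive(samples) // Append runs in the background, as in the benchmark
    c.waitForExpectedDataCount()
    fmt.Println("all samples delivered")
}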

* Change BenchmarkSampleDelivery to call UpdateSeriesSegment

This matches what Watcher.garbageCollectSeries() is doing now.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
Author: Bryan Boreham <bjboreham@gmail.com>
Date: 2021-07-27 21:21:48 +01:00 (committed by GitHub)
Parent: 5cc30b716e
Commit: 60804c5a09
4 changed files with 96 additions and 16 deletions

--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go

@@ -352,10 +352,12 @@ type QueueManager struct {
 	clientMtx   sync.RWMutex
 	storeClient WriteClient
 
-	seriesMtx            sync.Mutex
+	seriesMtx            sync.Mutex // Covers seriesLabels and droppedSeries.
 	seriesLabels         map[uint64]labels.Labels
+	droppedSeries        map[uint64]struct{}
+
+	seriesSegmentMtx     sync.Mutex // Covers seriesSegmentIndexes - if you also lock seriesMtx, take seriesMtx first.
 	seriesSegmentIndexes map[uint64]int
-	droppedSeries        map[uint64]struct{}
 
 	shards    *shards
 	numShards int
@@ -642,6 +644,8 @@ func (t *QueueManager) Stop() {
 func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) {
 	t.seriesMtx.Lock()
 	defer t.seriesMtx.Unlock()
+	t.seriesSegmentMtx.Lock()
+	defer t.seriesSegmentMtx.Unlock()
 	for _, s := range series {
 		// Just make sure all the Refs of Series will insert into seriesSegmentIndexes map for tracking.
 		t.seriesSegmentIndexes[s.Ref] = index
@@ -664,12 +668,23 @@ func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) {
 	}
 }
 
+// Update the segment number held against the series, so we can trim older ones in SeriesReset.
+func (t *QueueManager) UpdateSeriesSegment(series []record.RefSeries, index int) {
+	t.seriesSegmentMtx.Lock()
+	defer t.seriesSegmentMtx.Unlock()
+	for _, s := range series {
+		t.seriesSegmentIndexes[s.Ref] = index
+	}
+}
+
 // SeriesReset is used when reading a checkpoint. WAL Watcher should have
 // stored series records with the checkpoints index number, so we can now
 // delete any ref ID's lower than that # from the two maps.
 func (t *QueueManager) SeriesReset(index int) {
 	t.seriesMtx.Lock()
 	defer t.seriesMtx.Unlock()
+	t.seriesSegmentMtx.Lock()
+	defer t.seriesSegmentMtx.Unlock()
 	// Check for series that are in segments older than the checkpoint
 	// that were not also present in the checkpoint.
 	for k, v := range t.seriesSegmentIndexes {

--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go

@@ -474,7 +474,7 @@ func TestShouldReshard(t *testing.T) {
 	}
 }
 
-func createTimeseries(numSamples, numSeries int) ([]record.RefSample, []record.RefSeries) {
+func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSeries) {
 	samples := make([]record.RefSample, 0, numSamples)
 	series := make([]record.RefSeries, 0, numSeries)
 	for i := 0; i < numSeries; i++ {
@@ -488,7 +488,7 @@ func createTimeseries(numSamples, numSeries int) ([]record.RefSample, []record.R
 		}
 		series = append(series, record.RefSeries{
 			Ref:    uint64(i),
-			Labels: labels.Labels{{Name: "__name__", Value: name}},
+			Labels: append(labels.Labels{{Name: "__name__", Value: name}}, extraLabels...),
 		})
 	}
 	return samples, series
@@ -709,10 +709,29 @@ func (c *TestBlockingWriteClient) Endpoint() string {
 }
 
 func BenchmarkSampleDelivery(b *testing.B) {
-	// Let's create an even number of send batches so we don't run into the
-	// batch timeout case.
-	n := config.DefaultQueueConfig.MaxSamplesPerSend * 10
-	samples, series := createTimeseries(n, n)
+	// Send one sample per series, which is the typical remote_write case
+	const numSamples = 1
+	const numSeries = 10000
+
+	// Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics.
+	var extraLabels = labels.Labels{
+		{Name: "kubernetes_io_arch", Value: "amd64"},
+		{Name: "kubernetes_io_instance_type", Value: "c3.somesize"},
+		{Name: "kubernetes_io_os", Value: "linux"},
+		{Name: "container_name", Value: "some-name"},
+		{Name: "failure_domain_kubernetes_io_region", Value: "somewhere-1"},
+		{Name: "failure_domain_kubernetes_io_zone", Value: "somewhere-1b"},
+		{Name: "id", Value: "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28"},
+		{Name: "image", Value: "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506"},
+		{Name: "instance", Value: "ip-111-11-1-11.ec2.internal"},
+		{Name: "job", Value: "kubernetes-cadvisor"},
+		{Name: "kubernetes_io_hostname", Value: "ip-111-11-1-11"},
+		{Name: "monitor", Value: "prod"},
+		{Name: "name", Value: "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0"},
+		{Name: "namespace", Value: "kube-system"},
+		{Name: "pod_name", Value: "some-other-name-5j8s8"},
+	}
+	samples, series := createTimeseries(numSamples, numSeries, extraLabels...)
 
 	c := NewTestWriteClient()
@@ -736,7 +755,9 @@ func BenchmarkSampleDelivery(b *testing.B) {
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
 		c.expectDataCount(len(samples))
-		m.Append(samples)
+		go m.Append(samples)
+		m.UpdateSeriesSegment(series, i+1) // simulate what wal.Watcher.garbageCollectSeries does
+		m.SeriesReset(i + 1)
 		c.waitForExpectedDataCount()
 	}
 	// Do not include shutdown

--- a/tsdb/wal/watcher.go
+++ b/tsdb/wal/watcher.go

@@ -48,7 +48,10 @@ type WriteTo interface {
 	Append([]record.RefSample) bool
 	AppendExemplars([]record.RefExemplar) bool
 	StoreSeries([]record.RefSeries, int)
-	// SeriesReset is called after reading a checkpoint to allow the deletion
+	// Next two methods are intended for garbage-collection: first we call
+	// UpdateSeriesSegment on all current series
+	UpdateSeriesSegment([]record.RefSeries, int)
+	// Then SeriesReset is called to allow the deletion
 	// of all series created in a segment lower than the argument.
 	SeriesReset(int)
 }
@@ -234,7 +237,7 @@ func (w *Watcher) Run() error {
 	}
 
 	if err == nil {
-		if err = w.readCheckpoint(lastCheckpoint); err != nil {
+		if err = w.readCheckpoint(lastCheckpoint, (*Watcher).readSegment); err != nil {
 			return errors.Wrap(err, "readCheckpoint")
 		}
 	}
@@ -454,7 +457,7 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error {
 	level.Debug(w.logger).Log("msg", "New checkpoint detected", "new", dir, "currentSegment", segmentNum)
 
-	if err = w.readCheckpoint(dir); err != nil {
+	if err = w.readCheckpoint(dir, (*Watcher).readSegmentForGC); err != nil {
 		return errors.Wrap(err, "readCheckpoint")
 	}
@@ -463,6 +466,8 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error {
 	return nil
 }
 
+// Read from a segment and pass the details to w.writer.
+// Also used with readCheckpoint - implements segmentReadFn.
 func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
 	var (
 		dec record.Decoder
@@ -538,6 +543,39 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
 	return errors.Wrapf(r.Err(), "segment %d: %v", segmentNum, r.Err())
 }
 
+// Go through all series in a segment updating the segmentNum, so we can delete older series.
+// Used with readCheckpoint - implements segmentReadFn.
+func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error {
+	var (
+		dec    record.Decoder
+		series []record.RefSeries
+	)
+	for r.Next() && !isClosed(w.quit) {
+		rec := r.Record()
+		w.recordsReadMetric.WithLabelValues(recordType(dec.Type(rec))).Inc()
+
+		switch dec.Type(rec) {
+		case record.Series:
+			series, err := dec.Series(rec, series[:0])
+			if err != nil {
+				w.recordDecodeFailsMetric.Inc()
+				return err
+			}
+			w.writer.UpdateSeriesSegment(series, segmentNum)
+
+		// Ignore these; we're only interested in series.
+		case record.Samples:
+		case record.Exemplars:
+		case record.Tombstones:
+
+		default:
+			// Could be corruption, or reading from a WAL from a newer Prometheus.
+			w.recordDecodeFailsMetric.Inc()
+		}
+	}
+	return errors.Wrapf(r.Err(), "segment %d: %v", segmentNum, r.Err())
+}
+
 func (w *Watcher) SetStartTime(t time.Time) {
 	w.startTime = t
 	w.startTimestamp = timestamp.FromTime(t)
@@ -556,8 +594,10 @@ func recordType(rt record.Type) string {
 	}
 }
 
+type segmentReadFn func(w *Watcher, r *LiveReader, segmentNum int, tail bool) error
+
 // Read all the series records from a Checkpoint directory.
-func (w *Watcher) readCheckpoint(checkpointDir string) error {
+func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) error {
 	level.Debug(w.logger).Log("msg", "Reading checkpoint", "dir", checkpointDir)
 	index, err := checkpointNum(checkpointDir)
 	if err != nil {
@@ -582,7 +622,7 @@ func (w *Watcher) readCheckpoint(checkpointDir string) error {
 		defer sr.Close()
 
 		r := NewLiveReader(w.logger, w.readerMetrics, sr)
-		if err := w.readSegment(r, index, false); errors.Cause(err) != io.EOF && err != nil {
+		if err := readFn(w, r, index, false); errors.Cause(err) != io.EOF && err != nil {
 			return errors.Wrap(err, "readSegment")
 		}

--- a/tsdb/wal/watcher_test.go
+++ b/tsdb/wal/watcher_test.go

@@ -66,6 +66,10 @@ func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar) bool {
 }
 
 func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) {
+	wtm.UpdateSeriesSegment(series, index)
+}
+
+func (wtm *writeToMock) UpdateSeriesSegment(series []record.RefSeries, index int) {
 	wtm.seriesLock.Lock()
 	defer wtm.seriesLock.Unlock()
 	for _, s := range series {
@@ -496,7 +500,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
 		lastCheckpoint, _, err := LastCheckpoint(watcher.walDir)
 		require.NoError(t, err)
 
-		err = watcher.readCheckpoint(lastCheckpoint)
+		err = watcher.readCheckpoint(lastCheckpoint, (*Watcher).readSegment)
 		require.NoError(t, err)
 	})
 }