diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 655886b46..9631de1c9 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -352,10 +352,12 @@ type QueueManager struct {
 	clientMtx   sync.RWMutex
 	storeClient WriteClient
 
-	seriesMtx            sync.Mutex
-	seriesLabels         map[uint64]labels.Labels
+	seriesMtx     sync.Mutex // Covers seriesLabels and droppedSeries.
+	seriesLabels  map[uint64]labels.Labels
+	droppedSeries map[uint64]struct{}
+
+	seriesSegmentMtx     sync.Mutex // Covers seriesSegmentIndexes - if you also lock seriesMtx, take seriesMtx first.
 	seriesSegmentIndexes map[uint64]int
-	droppedSeries        map[uint64]struct{}
 
 	shards      *shards
 	numShards   int
@@ -642,6 +644,8 @@ func (t *QueueManager) Stop() {
 func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) {
 	t.seriesMtx.Lock()
 	defer t.seriesMtx.Unlock()
+	t.seriesSegmentMtx.Lock()
+	defer t.seriesSegmentMtx.Unlock()
 	for _, s := range series {
 		// Just make sure all the Refs of Series will insert into seriesSegmentIndexes map for tracking.
 		t.seriesSegmentIndexes[s.Ref] = index
@@ -664,12 +668,23 @@ func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) {
 	}
 }
 
+// Update the segment number held against the series, so we can trim older ones in SeriesReset.
+func (t *QueueManager) UpdateSeriesSegment(series []record.RefSeries, index int) {
+	t.seriesSegmentMtx.Lock()
+	defer t.seriesSegmentMtx.Unlock()
+	for _, s := range series {
+		t.seriesSegmentIndexes[s.Ref] = index
+	}
+}
+
 // SeriesReset is used when reading a checkpoint. WAL Watcher should have
 // stored series records with the checkpoints index number, so we can now
 // delete any ref ID's lower than that # from the two maps.
 func (t *QueueManager) SeriesReset(index int) {
 	t.seriesMtx.Lock()
 	defer t.seriesMtx.Unlock()
+	t.seriesSegmentMtx.Lock()
+	defer t.seriesSegmentMtx.Unlock()
 	// Check for series that are in segments older than the checkpoint
 	// that were not also present in the checkpoint.
 	for k, v := range t.seriesSegmentIndexes {
diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index 94546fb1d..fe976fc1b 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -474,7 +474,7 @@ func TestShouldReshard(t *testing.T) {
 	}
 }
 
-func createTimeseries(numSamples, numSeries int) ([]record.RefSample, []record.RefSeries) {
+func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSeries) {
 	samples := make([]record.RefSample, 0, numSamples)
 	series := make([]record.RefSeries, 0, numSeries)
 	for i := 0; i < numSeries; i++ {
@@ -488,7 +488,7 @@ func createTimeseries(numSamples, numSeries int) ([]record.RefSample, []record.R
 		}
 		series = append(series, record.RefSeries{
 			Ref:    uint64(i),
-			Labels: labels.Labels{{Name: "__name__", Value: name}},
+			Labels: append(labels.Labels{{Name: "__name__", Value: name}}, extraLabels...),
 		})
 	}
 	return samples, series
@@ -709,10 +709,29 @@ func (c *TestBlockingWriteClient) Endpoint() string {
 }
 
 func BenchmarkSampleDelivery(b *testing.B) {
-	// Let's create an even number of send batches so we don't run into the
-	// batch timeout case.
-	n := config.DefaultQueueConfig.MaxSamplesPerSend * 10
-	samples, series := createTimeseries(n, n)
+	// Send one sample per series, which is the typical remote_write case.
+	const numSamples = 1
+	const numSeries = 10000
+
+	// Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics.
+	var extraLabels = labels.Labels{
+		{Name: "kubernetes_io_arch", Value: "amd64"},
+		{Name: "kubernetes_io_instance_type", Value: "c3.somesize"},
+		{Name: "kubernetes_io_os", Value: "linux"},
+		{Name: "container_name", Value: "some-name"},
+		{Name: "failure_domain_kubernetes_io_region", Value: "somewhere-1"},
+		{Name: "failure_domain_kubernetes_io_zone", Value: "somewhere-1b"},
+		{Name: "id", Value: "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28"},
+		{Name: "image", Value: "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506"},
+		{Name: "instance", Value: "ip-111-11-1-11.ec2.internal"},
+		{Name: "job", Value: "kubernetes-cadvisor"},
+		{Name: "kubernetes_io_hostname", Value: "ip-111-11-1-11"},
+		{Name: "monitor", Value: "prod"},
+		{Name: "name", Value: "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0"},
+		{Name: "namespace", Value: "kube-system"},
+		{Name: "pod_name", Value: "some-other-name-5j8s8"},
+	}
+	samples, series := createTimeseries(numSamples, numSeries, extraLabels...)
 
 	c := NewTestWriteClient()
 
@@ -736,7 +755,9 @@ func BenchmarkSampleDelivery(b *testing.B) {
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
 		c.expectDataCount(len(samples))
-		m.Append(samples)
+		go m.Append(samples)
+		m.UpdateSeriesSegment(series, i+1) // simulate what wal.Watcher.garbageCollectSeries does
+		m.SeriesReset(i + 1)
 		c.waitForExpectedDataCount()
 	}
 	// Do not include shutdown
diff --git a/tsdb/wal/watcher.go b/tsdb/wal/watcher.go
index fda5ad1e6..accefa7aa 100644
--- a/tsdb/wal/watcher.go
+++ b/tsdb/wal/watcher.go
@@ -48,7 +48,10 @@ type WriteTo interface {
 	Append([]record.RefSample) bool
 	AppendExemplars([]record.RefExemplar) bool
 	StoreSeries([]record.RefSeries, int)
-	// SeriesReset is called after reading a checkpoint to allow the deletion
+	// Next two methods are intended for garbage-collection: first we call
+	// UpdateSeriesSegment on all current series.
+	UpdateSeriesSegment([]record.RefSeries, int)
+	// Then SeriesReset is called to allow the deletion
 	// of all series created in a segment lower than the argument.
 	SeriesReset(int)
 }
@@ -234,7 +237,7 @@ func (w *Watcher) Run() error {
 	}
 
 	if err == nil {
-		if err = w.readCheckpoint(lastCheckpoint); err != nil {
+		if err = w.readCheckpoint(lastCheckpoint, (*Watcher).readSegment); err != nil {
 			return errors.Wrap(err, "readCheckpoint")
 		}
 	}
@@ -454,7 +457,7 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error {
 
 	level.Debug(w.logger).Log("msg", "New checkpoint detected", "new", dir, "currentSegment", segmentNum)
 
-	if err = w.readCheckpoint(dir); err != nil {
+	if err = w.readCheckpoint(dir, (*Watcher).readSegmentForGC); err != nil {
 		return errors.Wrap(err, "readCheckpoint")
 	}
 
@@ -463,6 +466,8 @@
 	return nil
 }
 
+// Read from a segment and pass the details to w.writer.
+// Also used with readCheckpoint - implements segmentReadFn.
 func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
 	var (
 		dec record.Decoder
@@ -538,6 +543,39 @@
 	return errors.Wrapf(r.Err(), "segment %d: %v", segmentNum, r.Err())
 }
 
+// Go through all series in a segment updating the segmentNum, so we can delete older series.
+// Used with readCheckpoint - implements segmentReadFn.
+func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error {
+	var (
+		dec    record.Decoder
+		series []record.RefSeries
+	)
+	for r.Next() && !isClosed(w.quit) {
+		rec := r.Record()
+		w.recordsReadMetric.WithLabelValues(recordType(dec.Type(rec))).Inc()
+
+		switch dec.Type(rec) {
+		case record.Series:
+			series, err := dec.Series(rec, series[:0])
+			if err != nil {
+				w.recordDecodeFailsMetric.Inc()
+				return err
+			}
+			w.writer.UpdateSeriesSegment(series, segmentNum)
+
+		// Ignore these; we're only interested in series.
+		case record.Samples:
+		case record.Exemplars:
+		case record.Tombstones:
+
+		default:
+			// Could be corruption, or reading from a WAL from a newer Prometheus.
+			w.recordDecodeFailsMetric.Inc()
+		}
+	}
+	return errors.Wrapf(r.Err(), "segment %d: %v", segmentNum, r.Err())
+}
+
 func (w *Watcher) SetStartTime(t time.Time) {
 	w.startTime = t
 	w.startTimestamp = timestamp.FromTime(t)
@@ -556,8 +594,10 @@ func recordType(rt record.Type) string {
 	}
 }
 
+type segmentReadFn func(w *Watcher, r *LiveReader, segmentNum int, tail bool) error
+
 // Read all the series records from a Checkpoint directory.
-func (w *Watcher) readCheckpoint(checkpointDir string) error {
+func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) error {
 	level.Debug(w.logger).Log("msg", "Reading checkpoint", "dir", checkpointDir)
 	index, err := checkpointNum(checkpointDir)
 	if err != nil {
@@ -582,7 +622,7 @@ func (w *Watcher) readCheckpoint(checkpointDir string) error {
 		defer sr.Close()
 		r := NewLiveReader(w.logger, w.readerMetrics, sr)
 
-		if err := w.readSegment(r, index, false); errors.Cause(err) != io.EOF && err != nil {
+		if err := readFn(w, r, index, false); errors.Cause(err) != io.EOF && err != nil {
 			return errors.Wrap(err, "readSegment")
 		}
 
diff --git a/tsdb/wal/watcher_test.go b/tsdb/wal/watcher_test.go
index dcde251a2..3ff7d1ad2 100644
--- a/tsdb/wal/watcher_test.go
+++ b/tsdb/wal/watcher_test.go
@@ -66,6 +66,10 @@ func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar) bool {
 }
 
 func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) {
+	wtm.UpdateSeriesSegment(series, index)
+}
+
+func (wtm *writeToMock) UpdateSeriesSegment(series []record.RefSeries, index int) {
 	wtm.seriesLock.Lock()
 	defer wtm.seriesLock.Unlock()
 	for _, s := range series {
@@ -496,7 +500,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
 			lastCheckpoint, _, err := LastCheckpoint(watcher.walDir)
 			require.NoError(t, err)
 
-			err = watcher.readCheckpoint(lastCheckpoint)
+			err = watcher.readCheckpoint(lastCheckpoint, (*Watcher).readSegment)
 			require.NoError(t, err)
 		})
 	}
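The queue_manager.go change above hinges on a split-lock pattern: seriesLabels (needed on the hot send path) and seriesSegmentIndexes (only needed for WAL garbage collection) are guarded by separate mutexes, so UpdateSeriesSegment no longer blocks sample delivery, while the documented lock order (seriesMtx before seriesSegmentMtx) keeps StoreSeries and SeriesReset deadlock-free when they take both. The sketch below shows that pattern in isolation; it is a minimal illustration, not the Prometheus code, and the manager type, its fields, and the toy values are hypothetical stand-ins:

```go
package main

import (
	"fmt"
	"sync"
)

// manager is a toy stand-in for QueueManager: labels feed the send path,
// segments only feed garbage collection, so each map gets its own mutex.
type manager struct {
	seriesMtx sync.Mutex // covers labels
	labels    map[uint64]string

	segmentMtx sync.Mutex // covers segments; if you also lock seriesMtx, take seriesMtx first
	segments   map[uint64]int
}

// updateSeriesSegment mirrors the role of UpdateSeriesSegment: it touches
// only the segment map, so concurrent users of labels are never blocked.
func (m *manager) updateSeriesSegment(refs []uint64, segment int) {
	m.segmentMtx.Lock()
	defer m.segmentMtx.Unlock()
	for _, ref := range refs {
		m.segments[ref] = segment
	}
}

// seriesReset mirrors the role of SeriesReset: it mutates both maps, so it
// takes both locks, in the seriesMtx-first order to avoid deadlock.
func (m *manager) seriesReset(checkpoint int) {
	m.seriesMtx.Lock()
	defer m.seriesMtx.Unlock()
	m.segmentMtx.Lock()
	defer m.segmentMtx.Unlock()
	for ref, seg := range m.segments {
		if seg < checkpoint { // not seen since before the checkpoint
			delete(m.segments, ref)
			delete(m.labels, ref)
		}
	}
}

func main() {
	m := &manager{
		labels:   map[uint64]string{1: "up", 2: "go_goroutines"},
		segments: map[uint64]int{1: 0, 2: 0},
	}
	m.updateSeriesSegment([]uint64{1}, 2) // series 1 seen again in segment 2
	m.seriesReset(2)                      // trims series 2, still recorded at segment 0
	fmt.Println(len(m.labels), len(m.segments)) // prints: 1 1
}
```

On the watcher.go side, the `(*Watcher).readSegment` and `(*Watcher).readSegmentForGC` arguments are Go method expressions: `(*Watcher).readSegment` is an ordinary value of type `func(*Watcher, *LiveReader, int, bool) error`, which is why segmentReadFn declares the receiver as its first parameter and readCheckpoint invokes it as `readFn(w, r, index, false)`.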