Merge pull request #6021 from prometheus/reload-checkpoint-async

Garbage collect asynchronously in the WAL Watcher
Björn Rabenstein 2019-10-08 11:01:13 +02:00 committed by GitHub
commit fc945c4e8a
2 changed files with 31 additions and 4 deletions


@@ -194,6 +194,7 @@ type QueueManager struct {
 	client  StorageClient
 	watcher *wal.Watcher
+	seriesMtx            sync.Mutex
 	seriesLabels         map[uint64]labels.Labels
 	seriesSegmentIndexes map[uint64]int
 	droppedSeries        map[uint64]struct{}
@@ -264,6 +265,7 @@ func NewQueueManager(reg prometheus.Registerer, logger log.Logger, walDir string
 func (t *QueueManager) Append(samples []record.RefSample) bool {
 outer:
 	for _, s := range samples {
+		t.seriesMtx.Lock()
 		lbls, ok := t.seriesLabels[s.Ref]
 		if !ok {
 			t.droppedSamplesTotal.Inc()
@@ -271,8 +273,10 @@ outer:
 			if _, ok := t.droppedSeries[s.Ref]; !ok {
 				level.Info(t.logger).Log("msg", "dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref)
 			}
+			t.seriesMtx.Unlock()
 			continue
 		}
+		t.seriesMtx.Unlock()
 		// This will only loop if the queues are being resharded.
 		backoff := t.cfg.MinBackoff
 		for {
@@ -356,9 +360,11 @@ func (t *QueueManager) Stop() {
 	t.watcher.Stop()
 	// On shutdown, release the strings in the labels from the intern pool.
+	t.seriesMtx.Lock()
 	for _, labels := range t.seriesLabels {
 		releaseLabels(labels)
 	}
+	t.seriesMtx.Unlock()
 	// Delete metrics so we don't have alerts for queues that are gone.
 	name := t.client.Name()
 	queueHighestSentTimestamp.DeleteLabelValues(name)
@@ -378,6 +384,8 @@ func (t *QueueManager) Stop() {
 // StoreSeries keeps track of which series we know about for lookups when sending samples to remote.
 func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) {
+	t.seriesMtx.Lock()
+	defer t.seriesMtx.Unlock()
 	for _, s := range series {
 		ls := processExternalLabels(s.Labels, t.externalLabels)
 		lbls := relabel.Process(ls, t.relabelConfigs...)
@@ -402,6 +410,8 @@ func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) {
 // stored series records with the checkpoints index number, so we can now
 // delete any ref ID's lower than that # from the two maps.
 func (t *QueueManager) SeriesReset(index int) {
+	t.seriesMtx.Lock()
+	defer t.seriesMtx.Unlock()
 	// Check for series that are in segments older than the checkpoint
 	// that were not also present in the checkpoint.
 	for k, v := range t.seriesSegmentIndexes {
@@ -409,6 +419,7 @@ func (t *QueueManager) SeriesReset(index int) {
 			delete(t.seriesSegmentIndexes, k)
 			releaseLabels(t.seriesLabels[k])
 			delete(t.seriesLabels, k)
+			delete(t.droppedSeries, k)
 		}
 	}
 }
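
The queue manager's side of the change is one mutex, seriesMtx, guarding the three series maps: StoreSeries and SeriesReset hold it for the whole update, while Append locks only around the map lookup and releases explicitly (rather than with defer) so the lock is never held across the slow send path. Below is a minimal, self-contained sketch of that locking pattern, assuming nothing from Prometheus itself; seriesStore and its fields are made-up stand-ins for QueueManager's maps.

// Minimal, self-contained sketch (not the Prometheus code) of the locking
// pattern the diff above introduces. seriesStore and its fields are
// hypothetical stand-ins for QueueManager's series maps.
package main

import (
	"fmt"
	"sync"
)

type seriesStore struct {
	mtx            sync.Mutex
	labels         map[uint64]string // stands in for map[uint64]labels.Labels
	segmentIndexes map[uint64]int
}

// store mirrors StoreSeries: hold the lock for the whole update.
func (s *seriesStore) store(ref uint64, lbls string, segment int) {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	s.labels[ref] = lbls
	s.segmentIndexes[ref] = segment
}

// lookup mirrors the Append hot path: lock only around the map read and
// release before any slow work (in Append, the actual send), which is
// why the diff uses explicit Unlock calls instead of defer.
func (s *seriesStore) lookup(ref uint64) (string, bool) {
	s.mtx.Lock()
	lbls, ok := s.labels[ref]
	s.mtx.Unlock()
	return lbls, ok
}

// reset mirrors SeriesReset: drop every series whose record came from a
// segment older than the checkpoint index.
func (s *seriesStore) reset(index int) {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	for ref, seg := range s.segmentIndexes {
		if seg < index {
			delete(s.segmentIndexes, ref)
			delete(s.labels, ref)
		}
	}
}

func main() {
	s := &seriesStore{labels: map[uint64]string{}, segmentIndexes: map[uint64]int{}}
	s.store(1, `up{job="node"}`, 3)
	s.reset(4) // checkpoint covers segments < 4, so ref 1 is dropped
	_, ok := s.lookup(1)
	fmt.Println("series still present:", ok) // series still present: false
}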


@@ -41,10 +41,13 @@ const (
 )
 // WriteTo is an interface used by the Watcher to send the samples it's read
-// from the WAL on to somewhere else.
+// from the WAL on to somewhere else. Functions will be called concurrently
+// and it is left to the implementer to make sure they are safe.
 type WriteTo interface {
 	Append([]record.RefSample) bool
 	StoreSeries([]record.RefSeries, int)
+	// SeriesReset is called after reading a checkpoint to allow the deletion
+	// of all series created in a segment lower than the argument.
 	SeriesReset(int)
 }
@@ -337,6 +340,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
 		}
 	}
+	gcSem := make(chan struct{}, 1)
 	for {
 		select {
 		case <-w.quit:
@@ -345,9 +349,21 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
 		case <-checkpointTicker.C:
 			// Periodically check if there is a new checkpoint so we can garbage
 			// collect labels. As this is considered an optimisation, we ignore
-			// errors during checkpoint processing.
-			if err := w.garbageCollectSeries(segmentNum); err != nil {
-				level.Warn(w.logger).Log("msg", "error process checkpoint", "err", err)
+			// errors during checkpoint processing. Doing the process asynchronously
+			// allows the current WAL segment to be processed while reading the
+			// checkpoint.
+			select {
+			case gcSem <- struct{}{}:
+				go func() {
+					defer func() {
+						<-gcSem
+					}()
+					if err := w.garbageCollectSeries(segmentNum); err != nil {
+						level.Warn(w.logger).Log("msg", "error process checkpoint", "err", err)
+					}
+				}()
+			default:
+				// Currently doing a garbage collect, try again later.
 			}
 		case <-segmentTicker.C:
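
The asynchronous collection itself is the standard non-blocking semaphore idiom: a buffered channel of capacity one is acquired via a select with a default branch, so a checkpoint tick that fires while a collection is still in flight is skipped instead of blocking the watch loop. A standalone sketch of the idiom follows; doGC is a hypothetical stand-in for garbageCollectSeries.

// Standalone sketch of the gcSem idiom above: a buffered channel of
// capacity one acts as a semaphore, so at most one garbage collection
// runs at a time, and ticks arriving mid-collection are dropped.
package main

import (
	"fmt"
	"time"
)

func doGC(tick int) {
	time.Sleep(250 * time.Millisecond) // pretend reading a checkpoint is slow
	fmt.Println("gc finished for tick", tick)
}

func main() {
	gcSem := make(chan struct{}, 1)
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	for tick := 1; tick <= 5; tick++ {
		<-ticker.C
		select {
		case gcSem <- struct{}{}: // acquired: no collection running, start one
			go func(tick int) {
				defer func() { <-gcSem }() // release when done
				doGC(tick)
			}(tick)
		default:
			// A collection is already in flight; skip this tick. The loop
			// keeps running, which is the point of the change: WAL segments
			// are still processed while the checkpoint is being read.
			fmt.Println("tick", tick, "skipped")
		}
	}
	time.Sleep(300 * time.Millisecond) // let the last collection finish
}

Skipping rather than queueing ticks is safe here because, as the diff's comment notes, garbage collection is an optimisation: a missed tick is simply retried on the next one.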