From d6f911b5114d0942b234acc8ca6b97b54d685979 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 12 Feb 2019 14:58:25 +0000 Subject: [PATCH] Factor out logging ratelimit & dedupe middleware. Signed-off-by: Tom Wilkie --- go.mod | 1 + pkg/logging/dedupe.go | 126 ++++++++++++++++++++++++++++++++ pkg/logging/dedupe_test.go | 46 ++++++++++++ pkg/logging/ratelimit.go | 38 ++++++++++ storage/remote/queue_manager.go | 17 +---- storage/remote/storage.go | 3 +- 6 files changed, 216 insertions(+), 15 deletions(-) create mode 100644 pkg/logging/dedupe.go create mode 100644 pkg/logging/dedupe_test.go create mode 100644 pkg/logging/ratelimit.go diff --git a/go.mod b/go.mod index e983dff38..04c036a62 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/getsentry/raven-go v0.1.0 // indirect github.com/go-ini/ini v1.21.1 // indirect github.com/go-kit/kit v0.8.0 + github.com/go-logfmt/logfmt v0.4.0 github.com/go-ole/go-ole v1.2.1 // indirect github.com/go-sql-driver/mysql v1.4.0 // indirect github.com/gogo/protobuf v1.2.0 diff --git a/pkg/logging/dedupe.go b/pkg/logging/dedupe.go new file mode 100644 index 000000000..2b0c95c63 --- /dev/null +++ b/pkg/logging/dedupe.go @@ -0,0 +1,126 @@ +// Copyright 2019 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package logging + +import ( + "bytes" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-logfmt/logfmt" +) + +const ( + garbageCollectEvery = 10 * time.Second + expireEntriesAfter = 1 * time.Minute +) + +type logfmtEncoder struct { + *logfmt.Encoder + buf bytes.Buffer +} + +var logfmtEncoderPool = sync.Pool{ + New: func() interface{} { + var enc logfmtEncoder + enc.Encoder = logfmt.NewEncoder(&enc.buf) + return &enc + }, +} + +// Deduper implements log.Logger and dedupes log lines. +type Deduper struct { + next log.Logger + repeat time.Duration + quit chan struct{} + mtx sync.RWMutex + seen map[string]time.Time +} + +// Dedupe log lines to next, only repeating every repeat duration. +func Dedupe(next log.Logger, repeat time.Duration) *Deduper { + d := &Deduper{ + next: next, + repeat: repeat, + quit: make(chan struct{}), + seen: map[string]time.Time{}, + } + go d.run() + return d +} + +// Stop the Deduper. +func (d *Deduper) Stop() { + close(d.quit) +} + +func (d *Deduper) run() { + ticker := time.NewTicker(garbageCollectEvery) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + d.mtx.Lock() + now := time.Now() + for line, seen := range d.seen { + if now.Sub(seen) > expireEntriesAfter { + delete(d.seen, line) + } + } + d.mtx.Unlock() + case <-d.quit: + return + } + } +} + +// Log implements log.Logger. +func (d *Deduper) Log(keyvals ...interface{}) error { + line, err := encode(keyvals...) + if err != nil { + return err + } + + d.mtx.RLock() + last, ok := d.seen[line] + d.mtx.RUnlock() + + if ok && time.Since(last) < d.repeat { + return nil + } + + d.mtx.Lock() + d.seen[line] = time.Now() + d.mtx.Unlock() + + return d.next.Log(keyvals...) 
+} + +func encode(keyvals ...interface{}) (string, error) { + enc := logfmtEncoderPool.Get().(*logfmtEncoder) + enc.buf.Reset() + defer logfmtEncoderPool.Put(enc) + + if err := enc.EncodeKeyvals(keyvals...); err != nil { + return "", err + } + + // Add newline to the end of the buffer + if err := enc.EndRecord(); err != nil { + return "", err + } + + return enc.buf.String(), nil +} diff --git a/pkg/logging/dedupe_test.go b/pkg/logging/dedupe_test.go new file mode 100644 index 000000000..37520e739 --- /dev/null +++ b/pkg/logging/dedupe_test.go @@ -0,0 +1,46 @@ +// Copyright 2019 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package logging + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type counter int + +func (c *counter) Log(keyvals ...interface{}) error { + (*c)++ + return nil +} + +func TestDedupe(t *testing.T) { + var c counter + d := Dedupe(&c, 100*time.Millisecond) + defer d.Stop() + + // Log 10 times quickly, ensure they are deduped. + for i := 0; i < 10; i++ { + err := d.Log("msg", "hello") + require.NoError(t, err) + } + require.Equal(t, 1, int(c)) + + // Wait, then log again, make sure it is logged. 
+ time.Sleep(200 * time.Millisecond) + err := d.Log("msg", "hello") + require.NoError(t, err) + require.Equal(t, 2, int(c)) +} diff --git a/pkg/logging/ratelimit.go b/pkg/logging/ratelimit.go new file mode 100644 index 000000000..86d315561 --- /dev/null +++ b/pkg/logging/ratelimit.go @@ -0,0 +1,38 @@ +// Copyright 2019 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package logging + +import ( + "github.com/go-kit/kit/log" + "golang.org/x/time/rate" +) + +type ratelimiter struct { + limiter *rate.Limiter + next log.Logger +} + +// RateLimit returns a logger that forwards to next only while the limiter allows it and silently drops the rest; the burst is int(limit), so a limit below 1 lets nothing through. +func RateLimit(next log.Logger, limit rate.Limit) log.Logger { + return &ratelimiter{ + limiter: rate.NewLimiter(limit, int(limit)), + next: next, + } +} + +func (r *ratelimiter) Log(keyvals ...interface{}) error { + if r.limiter.Allow() { + return r.next.Log(keyvals...) + } + return nil +} diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 33f8c4e9a..bc295cb93 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -21,8 +21,6 @@ import ( "sync/atomic" "time" - "golang.org/x/time/rate" - "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/gogo/protobuf/proto" @@ -50,10 +48,6 @@ const ( // Allow 30% too many shards before scaling down. 
shardToleranceFraction = 0.3 - - // Limit to 1 log event every 10s - logRateLimit = 0.1 - logBurst = 10 ) var ( @@ -194,7 +188,6 @@ type QueueManager struct { relabelConfigs []*pkgrelabel.Config client StorageClient queueName string - logLimiter *rate.Limiter watcher *WALWatcher lastSendTimestampMetric prometheus.Gauge highestSentTimestampMetric prometheus.Gauge @@ -244,7 +237,6 @@ func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, high seriesSegmentIndexes: make(map[uint64]int), droppedSeries: make(map[uint64]struct{}), - logLimiter: rate.NewLimiter(logRateLimit, logBurst), numShards: cfg.MinShards, reshardChan: make(chan int), quit: make(chan struct{}), @@ -290,7 +282,7 @@ func (t *QueueManager) Append(s []tsdb.RefSample) bool { // If we have no labels for the series, due to relabelling or otherwise, don't send the sample. if _, ok := t.seriesLabels[sample.Ref]; !ok { droppedSamplesTotal.WithLabelValues(t.queueName).Inc() - if _, ok := t.droppedSeries[sample.Ref]; !ok && t.logLimiter.Allow() { + if _, ok := t.droppedSeries[sample.Ref]; !ok { level.Info(t.logger).Log("msg", "dropped sample for series that was not explicitly dropped via relabelling", "ref", sample.Ref) } continue @@ -700,7 +692,7 @@ func (s *shards) runShard(ctx context.Context, i int, queue chan prompb.TimeSeri func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries) { begin := time.Now() err := s.sendSamplesWithBackoff(ctx, samples) - if err != nil && s.qm.logLimiter.Allow() { + if err != nil { level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", len(samples), "err", err) failedSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples))) } @@ -743,10 +735,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti return err } retriedSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples))) - - if s.qm.logLimiter.Allow() { - level.Error(s.qm.logger).Log("err", err) - } + 
level.Error(s.qm.logger).Log("err", err) time.Sleep(time.Duration(backoff)) backoff = backoff * 2 diff --git a/storage/remote/storage.go b/storage/remote/storage.go index 062a5c949..3b71cd11c 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -24,6 +24,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/logging" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/storage" ) @@ -59,7 +60,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal } shardUpdateDuration := 10 * time.Second s := &Storage{ - logger: l, + logger: logging.RateLimit(logging.Dedupe(l, 1*time.Minute), 1), localStartTimeCallback: stCallback, flushDeadline: flushDeadline, walDir: walDir,