Factor out logging ratelimit & dedupe middleware.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
This commit is contained in:
Tom Wilkie 2019-02-12 14:58:25 +00:00 committed by Tom Wilkie
parent a5c20642b3
commit d6f911b511
6 changed files with 216 additions and 15 deletions

1
go.mod
View file

@ -29,6 +29,7 @@ require (
github.com/getsentry/raven-go v0.1.0 // indirect
github.com/go-ini/ini v1.21.1 // indirect
github.com/go-kit/kit v0.8.0
github.com/go-logfmt/logfmt v0.4.0
github.com/go-ole/go-ole v1.2.1 // indirect
github.com/go-sql-driver/mysql v1.4.0 // indirect
github.com/gogo/protobuf v1.2.0

126
pkg/logging/dedupe.go Normal file
View file

@ -0,0 +1,126 @@
// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logging
import (
"bytes"
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/go-logfmt/logfmt"
)
const (
garbageCollectEvery = 10 * time.Second
expireEntriesAfter = 1 * time.Minute
)
type logfmtEncoder struct {
*logfmt.Encoder
buf bytes.Buffer
}
var logfmtEncoderPool = sync.Pool{
New: func() interface{} {
var enc logfmtEncoder
enc.Encoder = logfmt.NewEncoder(&enc.buf)
return &enc
},
}
// Deduper implement log.Logger, dedupes log lines.
type Deduper struct {
next log.Logger
repeat time.Duration
quit chan struct{}
mtx sync.RWMutex
seen map[string]time.Time
}
// Dedupe log lines to next, only repeating every repeat duration.
func Dedupe(next log.Logger, repeat time.Duration) *Deduper {
d := &Deduper{
next: next,
repeat: repeat,
quit: make(chan struct{}),
seen: map[string]time.Time{},
}
go d.run()
return d
}
// Stop the Deduper.
func (d *Deduper) Stop() {
close(d.quit)
}
func (d *Deduper) run() {
ticker := time.NewTicker(garbageCollectEvery)
defer ticker.Stop()
for {
select {
case <-ticker.C:
d.mtx.Lock()
now := time.Now()
for line, seen := range d.seen {
if now.Sub(seen) > expireEntriesAfter {
delete(d.seen, line)
}
}
d.mtx.Unlock()
case <-d.quit:
return
}
}
}
// Log implements log.Logger.
func (d *Deduper) Log(keyvals ...interface{}) error {
line, err := encode(keyvals...)
if err != nil {
return err
}
d.mtx.RLock()
last, ok := d.seen[line]
d.mtx.RUnlock()
if ok && time.Since(last) < d.repeat {
return nil
}
d.mtx.Lock()
d.seen[line] = time.Now()
d.mtx.Unlock()
return d.next.Log(keyvals...)
}
func encode(keyvals ...interface{}) (string, error) {
enc := logfmtEncoderPool.Get().(*logfmtEncoder)
enc.buf.Reset()
defer logfmtEncoderPool.Put(enc)
if err := enc.EncodeKeyvals(keyvals...); err != nil {
return "", err
}
// Add newline to the end of the buffer
if err := enc.EndRecord(); err != nil {
return "", err
}
return enc.buf.String(), nil
}

View file

@ -0,0 +1,46 @@
// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logging
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
type counter int
func (c *counter) Log(keyvals ...interface{}) error {
(*c)++
return nil
}
func TestDedupe(t *testing.T) {
var c counter
d := Dedupe(&c, 100*time.Millisecond)
defer d.Stop()
// Log 10 times quickly, ensure they are deduped.
for i := 0; i < 10; i++ {
err := d.Log("msg", "hello")
require.NoError(t, err)
}
require.Equal(t, 1, int(c))
// Wait, then log again, make sure it is logged.
time.Sleep(200 * time.Millisecond)
err := d.Log("msg", "hello")
require.NoError(t, err)
require.Equal(t, 2, int(c))
}

38
pkg/logging/ratelimit.go Normal file
View file

@ -0,0 +1,38 @@
// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logging
import (
"github.com/go-kit/kit/log"
"golang.org/x/time/rate"
)
type ratelimiter struct {
limiter *rate.Limiter
next log.Logger
}
// RateLimit write to a loger.
func RateLimit(next log.Logger, limit rate.Limit) log.Logger {
return &ratelimiter{
limiter: rate.NewLimiter(limit, int(limit)),
next: next,
}
}
func (r *ratelimiter) Log(keyvals ...interface{}) error {
if r.limiter.Allow() {
return r.next.Log(keyvals...)
}
return nil
}

View file

@ -21,8 +21,6 @@ import (
"sync/atomic"
"time"
"golang.org/x/time/rate"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
@ -50,10 +48,6 @@ const (
// Allow 30% too many shards before scaling down.
shardToleranceFraction = 0.3
// Limit to 1 log event every 10s
logRateLimit = 0.1
logBurst = 10
)
var (
@ -194,7 +188,6 @@ type QueueManager struct {
relabelConfigs []*pkgrelabel.Config
client StorageClient
queueName string
logLimiter *rate.Limiter
watcher *WALWatcher
lastSendTimestampMetric prometheus.Gauge
highestSentTimestampMetric prometheus.Gauge
@ -244,7 +237,6 @@ func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, high
seriesSegmentIndexes: make(map[uint64]int),
droppedSeries: make(map[uint64]struct{}),
logLimiter: rate.NewLimiter(logRateLimit, logBurst),
numShards: cfg.MinShards,
reshardChan: make(chan int),
quit: make(chan struct{}),
@ -290,7 +282,7 @@ func (t *QueueManager) Append(s []tsdb.RefSample) bool {
// If we have no labels for the series, due to relabelling or otherwise, don't send the sample.
if _, ok := t.seriesLabels[sample.Ref]; !ok {
droppedSamplesTotal.WithLabelValues(t.queueName).Inc()
if _, ok := t.droppedSeries[sample.Ref]; !ok && t.logLimiter.Allow() {
if _, ok := t.droppedSeries[sample.Ref]; !ok {
level.Info(t.logger).Log("msg", "dropped sample for series that was not explicitly dropped via relabelling", "ref", sample.Ref)
}
continue
@ -700,7 +692,7 @@ func (s *shards) runShard(ctx context.Context, i int, queue chan prompb.TimeSeri
func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries) {
begin := time.Now()
err := s.sendSamplesWithBackoff(ctx, samples)
if err != nil && s.qm.logLimiter.Allow() {
if err != nil {
level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", len(samples), "err", err)
failedSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples)))
}
@ -743,10 +735,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
return err
}
retriedSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples)))
if s.qm.logLimiter.Allow() {
level.Error(s.qm.logger).Log("err", err)
}
level.Error(s.qm.logger).Log("err", err)
time.Sleep(time.Duration(backoff))
backoff = backoff * 2

View file

@ -24,6 +24,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/logging"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/storage"
)
@ -59,7 +60,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
}
shardUpdateDuration := 10 * time.Second
s := &Storage{
logger: l,
logger: logging.RateLimit(logging.Dedupe(l, 1*time.Minute), 1),
localStartTimeCallback: stCallback,
flushDeadline: flushDeadline,
walDir: walDir,