Remote Storage: string interner should not panic in release (#5487)

* Don't panic if we try to release a string that is not in the interner.

* Move seriesMtx locking in QueueManager's StoreSeries function.

This stops us from calling release for strings that aren't interned if
there's a race between reading a checkpoint and storing new series
labels, which could happen during checkpointing or reloading config.

Signed-off-by: Callum Styan <callumstyan@gmail.com>
This commit is contained in:
Callum Styan 2019-04-24 02:46:31 -07:00 committed by Brian Brazil
parent 24efe92593
commit 3639d51eb6
5 changed files with 237 additions and 3 deletions

View file

@ -21,9 +21,18 @@ package remote
import (
"sync"
"sync/atomic"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var interner = newPool()
var noReferenceReleases = promauto.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "string_interner_zero_reference_releases_total",
Help: "The number of times release has been called for strings that are not interned.",
})
type pool struct {
mtx sync.RWMutex
@ -73,7 +82,8 @@ func (p *pool) release(s string) {
p.mtx.RUnlock()
if !ok {
panic("released unknown string")
noReferenceReleases.Inc()
return
}
refs := atomic.AddInt64(&interned.refs, -1)

View file

@ -357,6 +357,10 @@ func (t *QueueManager) Stop() {
// StoreSeries keeps track of which series we know about for lookups when sending samples to remote.
func (t *QueueManager) StoreSeries(series []tsdb.RefSeries, index int) {
// Lock before any calls to labelsToLabels proto, as that's where string interning is done.
t.seriesMtx.Lock()
defer t.seriesMtx.Unlock()
temp := make(map[uint64][]prompb.Label, len(series))
for _, s := range series {
ls := processExternalLabels(s.Labels, t.externalLabels)
@ -368,8 +372,6 @@ func (t *QueueManager) StoreSeries(series []tsdb.RefSeries, index int) {
temp[s.Ref] = labelsToLabelsProto(rl)
}
t.seriesMtx.Lock()
defer t.seriesMtx.Unlock()
for ref, labels := range temp {
t.seriesSegmentIndexes[ref] = index

View file

@ -32,6 +32,7 @@ import (
"github.com/golang/snappy"
"github.com/stretchr/testify/require"
client_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
@ -257,6 +258,39 @@ func TestReshardRaceWithStop(t *testing.T) {
}
}
func TestReleaseNoninternedString(t *testing.T) {
c := NewTestStorageClient()
var m *QueueManager
h := sync.Mutex{}
h.Lock()
m = NewQueueManager(nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
m.Start()
go func() {
for {
m.SeriesReset(1)
}
}()
for i := 1; i < 1000; i++ {
m.StoreSeries([]tsdb.RefSeries{
tsdb.RefSeries{
Ref: uint64(i),
Labels: tsdbLabels.Labels{
tsdbLabels.Label{
Name: "asdf",
Value: fmt.Sprintf("%d", i),
},
},
},
}, 0)
}
metric := client_testutil.ToFloat64(noReferenceReleases)
testutil.Assert(t, metric == 0, "expected there to be no calls to release for strings that were not already interned: %d", int(metric))
}
func createTimeseries(n int) ([]tsdb.RefSample, []tsdb.RefSeries) {
samples := make([]tsdb.RefSample, 0, n)
series := make([]tsdb.RefSeries, 0, n)

View file

@ -0,0 +1,187 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package testutil provides helpers to test code using the prometheus package
// of client_golang.
//
// While writing unit tests to verify correct instrumentation of your code, it's
// a common mistake to mostly test the instrumentation library instead of your
// own code. Rather than verifying that a prometheus.Counter's value has changed
// as expected or that it shows up in the exposition after registration, it is
// in general more robust and more faithful to the concept of unit tests to use
// mock implementations of the prometheus.Counter and prometheus.Registerer
// interfaces that simply assert that the Add or Register methods have been
// called with the expected arguments. However, this might be overkill in simple
// scenarios. The ToFloat64 function is provided for simple inspection of a
// single-value metric, but it has to be used with caution.
//
// End-to-end tests to verify all or larger parts of the metrics exposition can
// be implemented with the CollectAndCompare or GatherAndCompare functions. The
// most appropriate use is not so much testing instrumentation of your code, but
// testing custom prometheus.Collector implementations and in particular whole
// exporters, i.e. programs that retrieve telemetry data from a 3rd party source
// and convert it into Prometheus metrics.
package testutil
import (
"bytes"
"fmt"
"io"
"github.com/prometheus/common/expfmt"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/internal"
)
// ToFloat64 collects all Metrics from the provided Collector. It expects that
// this results in exactly one Metric being collected, which must be a Gauge,
// Counter, or Untyped. In all other cases, ToFloat64 panics. ToFloat64 returns
// the value of the collected Metric.
//
// The Collector provided is typically a simple instance of Gauge or Counter, or
// less commonly a GaugeVec or CounterVec with exactly one element. But any
// Collector fulfilling the prerequisites described above will do.
//
// Use this function with caution. It is computationally very expensive and thus
// not suited at all to read values from Metrics in regular code. This is really
// only for testing purposes, and even for testing, other approaches are often
// more appropriate (see this package's documentation).
//
// A clear anti-pattern would be to use a metric type from the prometheus
// package to track values that are also needed for something else than the
// exposition of Prometheus metrics. For example, you would like to track the
// number of items in a queue because your code should reject queuing further
// items if a certain limit is reached. It is tempting to track the number of
// items in a prometheus.Gauge, as it is then easily available as a metric for
// exposition, too. However, then you would need to call ToFloat64 in your
// regular code, potentially quite often. The recommended way is to track the
// number of items conventionally (in the way you would have done it without
// considering Prometheus metrics) and then expose the number with a
// prometheus.GaugeFunc.
func ToFloat64(c prometheus.Collector) float64 {
var (
m prometheus.Metric
mCount int
mChan = make(chan prometheus.Metric)
done = make(chan struct{})
)
go func() {
for m = range mChan {
mCount++
}
close(done)
}()
c.Collect(mChan)
close(mChan)
<-done
if mCount != 1 {
panic(fmt.Errorf("collected %d metrics instead of exactly 1", mCount))
}
pb := &dto.Metric{}
m.Write(pb)
if pb.Gauge != nil {
return pb.Gauge.GetValue()
}
if pb.Counter != nil {
return pb.Counter.GetValue()
}
if pb.Untyped != nil {
return pb.Untyped.GetValue()
}
panic(fmt.Errorf("collected a non-gauge/counter/untyped metric: %s", pb))
}
// CollectAndCompare registers the provided Collector with a newly created
// pedantic Registry. It then does the same as GatherAndCompare, gathering the
// metrics from the pedantic Registry.
func CollectAndCompare(c prometheus.Collector, expected io.Reader, metricNames ...string) error {
reg := prometheus.NewPedanticRegistry()
if err := reg.Register(c); err != nil {
return fmt.Errorf("registering collector failed: %s", err)
}
return GatherAndCompare(reg, expected, metricNames...)
}
// GatherAndCompare gathers all metrics from the provided Gatherer and compares
// it to an expected output read from the provided Reader in the Prometheus text
// exposition format. If any metricNames are provided, only metrics with those
// names are compared.
func GatherAndCompare(g prometheus.Gatherer, expected io.Reader, metricNames ...string) error {
got, err := g.Gather()
if err != nil {
return fmt.Errorf("gathering metrics failed: %s", err)
}
if metricNames != nil {
got = filterMetrics(got, metricNames)
}
var tp expfmt.TextParser
wantRaw, err := tp.TextToMetricFamilies(expected)
if err != nil {
return fmt.Errorf("parsing expected metrics failed: %s", err)
}
want := internal.NormalizeMetricFamilies(wantRaw)
return compare(got, want)
}
// compare encodes both provided slices of metric families into the text format,
// compares their string message, and returns an error if they do not match.
// The error contains the encoded text of both the desired and the actual
// result.
func compare(got, want []*dto.MetricFamily) error {
var gotBuf, wantBuf bytes.Buffer
enc := expfmt.NewEncoder(&gotBuf, expfmt.FmtText)
for _, mf := range got {
if err := enc.Encode(mf); err != nil {
return fmt.Errorf("encoding gathered metrics failed: %s", err)
}
}
enc = expfmt.NewEncoder(&wantBuf, expfmt.FmtText)
for _, mf := range want {
if err := enc.Encode(mf); err != nil {
return fmt.Errorf("encoding expected metrics failed: %s", err)
}
}
if wantBuf.String() != gotBuf.String() {
return fmt.Errorf(`
metric output does not match expectation; want:
%s
got:
%s`, wantBuf.String(), gotBuf.String())
}
return nil
}
func filterMetrics(metrics []*dto.MetricFamily, names []string) []*dto.MetricFamily {
var filtered []*dto.MetricFamily
for _, m := range metrics {
for _, name := range names {
if m.GetName() == name {
filtered = append(filtered, m)
break
}
}
}
return filtered
}

1
vendor/modules.txt vendored
View file

@ -231,6 +231,7 @@ github.com/prometheus/client_golang/api/prometheus/v1
github.com/prometheus/client_golang/prometheus/promhttp
github.com/prometheus/client_golang/prometheus/promauto
github.com/prometheus/client_golang/prometheus/internal
github.com/prometheus/client_golang/prometheus/testutil
# github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f
github.com/prometheus/client_model/go
# github.com/prometheus/common v0.3.0