From 807fd33ecc97297cde8c0e9d8127368fcf6aa461 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 13 Mar 2019 10:02:36 +0000 Subject: [PATCH] Review feedback. - Update read path to use labels.Labels. - Fix the tests. - Remove pack. - Remove unused function. - Fix race in tests. Signed-off-by: Tom Wilkie --- scrape/manager.go | 4 +-- storage/remote/intern.go | 9 ------ storage/remote/intern_test.go | 5 +++- storage/remote/queue_manager.go | 12 +++++++- storage/remote/queue_manager_test.go | 23 ++++------------ storage/remote/read.go | 41 ++++++++++++++++------------ storage/remote/read_test.go | 23 +++++++++------- 7 files changed, 59 insertions(+), 58 deletions(-) diff --git a/scrape/manager.go b/scrape/manager.go index 163d7ed14d..cf8154aa60 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -26,9 +26,9 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery/targetgroup" + "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" ) @@ -138,7 +138,7 @@ func (m *Manager) reload() { } // setJitterSeed calculates a global jitterSeed per server relying on extra label set. -func (m *Manager) setJitterSeed(labels model.LabelSet) error { +func (m *Manager) setJitterSeed(labels labels.Labels) error { h := fnv.New64a() hostname, err := getFqdn() if err != nil { diff --git a/storage/remote/intern.go b/storage/remote/intern.go index 785741f858..ab2741be5e 100644 --- a/storage/remote/intern.go +++ b/storage/remote/intern.go @@ -60,7 +60,6 @@ func (p *pool) intern(s string) string { return interned.s } - s = pack(s) p.pool[s] = &entry{ s: s, refs: 1, @@ -89,11 +88,3 @@ func (p *pool) release(s string) { } delete(p.pool, s) } - -// StrPack returns a new instance of s which is tightly packed in memory. -// It is intended for avoiding the situation where having a live reference -// to a string slice over an unreferenced biger underlying string keeps the biger one -// in memory anyway - it can't be GCed. -func pack(s string) string { - return string([]byte(s)) -} diff --git a/storage/remote/intern_test.go b/storage/remote/intern_test.go index c6a915356c..ec17f96ad7 100644 --- a/storage/remote/intern_test.go +++ b/storage/remote/intern_test.go @@ -20,6 +20,7 @@ package remote import ( "fmt" + "sync/atomic" "testing" "time" @@ -79,7 +80,9 @@ func TestIntern_MultiRef_Concurrent(t *testing.T) { time.Sleep(time.Millisecond) + interner.mtx.RLock() interned, ok = interner.pool[testString] + interner.mtx.RUnlock() testutil.Equals(t, ok, true) - testutil.Assert(t, interned.refs == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs)) + testutil.Assert(t, atomic.LoadInt64(&interned.refs) == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs)) } diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index dd56102379..e9a5dba87e 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -325,6 +325,13 @@ func (t *QueueManager) Stop() { t.shards.stop() t.watcher.Stop() t.wg.Wait() + + // On shutdown, release the strings in the labels from the intern pool. + t.seriesMtx.Lock() + defer t.seriesMtx.Unlock() + for _, labels := range t.seriesLabels { + release(labels) + } } // StoreSeries keeps track of which series we know about for lookups when sending samples to remote. @@ -345,6 +352,9 @@ func (t *QueueManager) StoreSeries(series []tsdb.RefSeries, index int) { for ref, labels := range temp { t.seriesSegmentIndexes[ref] = index + // We should not ever be replacing a series labels in the map, but just + // in case we do we need to ensure we do not leak the replaced interned + // strings. if orig, ok := t.seriesLabels[ref]; ok { release(orig) } @@ -377,7 +387,7 @@ func release(ls []prompb.Label) { } } -// processExternalLabels merges externalLabels into ls. If ls contains +// processExternalLabels merges externalLabels into ls. If ls contains // a label in externalLabels, the value in ls wins. func processExternalLabels(ls tsdbLabels.Labels, externalLabels labels.Labels) labels.Labels { i, j, result := 0, 0, make(labels.Labels, 0, len(ls)+len(externalLabels)) diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 01f9de32a3..48ecc50523 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -61,7 +61,7 @@ func TestSampleDelivery(t *testing.T) { defer os.RemoveAll(dir) m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) - m.seriesLabels = refSeriesToLabelsProto(series) + m.StoreSeries(series, 0) // These should be received by the client. m.Start() @@ -89,7 +89,7 @@ func TestSampleDeliveryTimeout(t *testing.T) { defer os.RemoveAll(dir) m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) - m.seriesLabels = refSeriesToLabelsProto(series) + m.StoreSeries(series, 0) m.Start() defer m.Stop() @@ -129,7 +129,7 @@ func TestSampleDeliveryOrder(t *testing.T) { defer os.RemoveAll(dir) m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) - m.seriesLabels = refSeriesToLabelsProto(series) + m.StoreSeries(series, 0) m.Start() defer m.Stop() @@ -148,7 +148,7 @@ func TestShutdown(t *testing.T) { m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline) samples, series := createTimeseries(2 * config.DefaultQueueConfig.MaxSamplesPerSend) - m.seriesLabels = refSeriesToLabelsProto(series) + m.StoreSeries(series, 0) m.Start() // Append blocks to guarantee delivery, so we do it in the background. @@ -211,7 +211,7 @@ func TestReshard(t *testing.T) { defer os.RemoveAll(dir) m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) - m.seriesLabels = refSeriesToLabelsProto(series) + m.StoreSeries(series, 0) m.Start() defer m.Stop() @@ -260,19 +260,6 @@ func getSeriesNameFromRef(r tsdb.RefSeries) string { return "" } -func refSeriesToLabelsProto(series []tsdb.RefSeries) map[uint64][]prompb.Label { - result := make(map[uint64][]prompb.Label) - for _, s := range series { - for _, l := range s.Labels { - result[s.Ref] = append(result[s.Ref], prompb.Label{ - Name: l.Name, - Value: l.Value, - }) - } - } - return result -} - type TestStorageClient struct { receivedSamples map[string][]prompb.Sample expectedSamples map[string][]prompb.Sample diff --git a/storage/remote/read.go b/storage/remote/read.go index 5d77c26fe7..eefb826b52 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -17,7 +17,6 @@ import ( "context" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" ) @@ -195,18 +194,23 @@ func (q requiredMatchersQuerier) Select(p *storage.SelectParams, matchers ...*la // We return the new set of matchers, along with a map of labels for which // matchers were added, so that these can later be removed from the result // time series again. -func (q externalLabelsQuerier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, model.LabelSet) { - el := make(model.LabelSet, len(q.externalLabels)) - for _, l := range q.externalLabels { - el[model.LabelName(l.Name)] = model.LabelValue(l.Value) - } +func (q externalLabelsQuerier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, labels.Labels) { + el := make(labels.Labels, len(q.externalLabels)) + copy(el, q.externalLabels) + + // ms won't be sorted, so have to O(n^2) the search. for _, m := range ms { - if _, ok := el[model.LabelName(m.Name)]; ok { - delete(el, model.LabelName(m.Name)) + for i := 0; i < len(el); { + if el[i].Name == m.Name { + el = el[:i+copy(el[i:], el[i+1:])] + continue + } + i++ } } - for k, v := range el { - m, err := labels.NewMatcher(labels.MatchEqual, string(k), string(v)) + + for _, l := range el { + m, err := labels.NewMatcher(labels.MatchEqual, l.Name, l.Value) if err != nil { panic(err) } @@ -215,7 +219,7 @@ func (q externalLabelsQuerier) addExternalLabels(ms []*labels.Matcher) ([]*label return ms, el } -func newSeriesSetFilter(ss storage.SeriesSet, toFilter model.LabelSet) storage.SeriesSet { +func newSeriesSetFilter(ss storage.SeriesSet, toFilter labels.Labels) storage.SeriesSet { return &seriesSetFilter{ SeriesSet: ss, toFilter: toFilter, @@ -224,7 +228,7 @@ func newSeriesSetFilter(ss storage.SeriesSet, toFilter model.LabelSet) storage.S type seriesSetFilter struct { storage.SeriesSet - toFilter model.LabelSet + toFilter labels.Labels querier storage.Querier } @@ -245,17 +249,20 @@ func (ssf seriesSetFilter) At() storage.Series { type seriesFilter struct { storage.Series - toFilter model.LabelSet + toFilter labels.Labels } func (sf seriesFilter) Labels() labels.Labels { labels := sf.Series.Labels() - for i := 0; i < len(labels); { - if _, ok := sf.toFilter[model.LabelName(labels[i].Name)]; ok { + for i, j := 0, 0; i < len(labels) && j < len(sf.toFilter); { + if labels[i].Name < sf.toFilter[j].Name { + i++ + } else if labels[i].Name > sf.toFilter[j].Name { + j++ + } else { labels = labels[:i+copy(labels[i:], labels[i+1:])] - continue + j++ } - i++ } return labels } diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index efeb76db63..650f80df3d 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -19,7 +19,6 @@ import ( "sort" "testing" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" @@ -33,7 +32,6 @@ func mustNewLabelMatcher(mt labels.MatchType, name, val string) *labels.Matcher return m } -/* func TestExternalLabelsQuerierSelect(t *testing.T) { matchers := []*labels.Matcher{ mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"), @@ -52,14 +50,14 @@ func TestExternalLabelsQuerierSelect(t *testing.T) { if !reflect.DeepEqual(want, have) { t.Errorf("expected series set %+v, got %+v", want, have) } -}*/ +} func TestExternalLabelsQuerierAddExternalLabels(t *testing.T) { tests := []struct { el labels.Labels inMatchers []*labels.Matcher outMatchers []*labels.Matcher - added model.LabelSet + added labels.Labels }{ { inMatchers: []*labels.Matcher{ @@ -68,12 +66,12 @@ func TestExternalLabelsQuerierAddExternalLabels(t *testing.T) { outMatchers: []*labels.Matcher{ mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"), }, - added: model.LabelSet{}, + added: labels.Labels{}, }, { el: labels.Labels{ - {Name: "region", Value: "europe"}, {Name: "dc", Value: "berlin-01"}, + {Name: "region", Value: "europe"}, }, inMatchers: []*labels.Matcher{ mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"), @@ -83,7 +81,10 @@ func TestExternalLabelsQuerierAddExternalLabels(t *testing.T) { mustNewLabelMatcher(labels.MatchEqual, "region", "europe"), mustNewLabelMatcher(labels.MatchEqual, "dc", "berlin-01"), }, - added: model.LabelSet{"region": "europe", "dc": "berlin-01"}, + added: labels.Labels{ + {Name: "dc", Value: "berlin-01"}, + {Name: "region", Value: "europe"}, + }, }, { el: labels.Labels{ @@ -99,7 +100,9 @@ func TestExternalLabelsQuerierAddExternalLabels(t *testing.T) { mustNewLabelMatcher(labels.MatchEqual, "region", "europe"), mustNewLabelMatcher(labels.MatchEqual, "dc", "munich-02"), }, - added: model.LabelSet{"region": "europe"}, + added: labels.Labels{ + {Name: "region", Value: "europe"}, + }, }, } @@ -122,12 +125,12 @@ func TestExternalLabelsQuerierAddExternalLabels(t *testing.T) { func TestSeriesSetFilter(t *testing.T) { tests := []struct { in *prompb.QueryResult - toRemove model.LabelSet + toRemove labels.Labels expected *prompb.QueryResult }{ { - toRemove: model.LabelSet{"foo": "bar"}, + toRemove: labels.Labels{{Name: "foo", Value: "bar"}}, in: &prompb.QueryResult{ Timeseries: []*prompb.TimeSeries{ {Labels: labelsToLabelsProto(labels.FromStrings("foo", "bar", "a", "b")), Samples: []prompb.Sample{}},