mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Review feedback.
- Update read path to use labels.Labels. - Fix the tests. - Remove pack. - Remove unused function. - Fix race in tests. Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
This commit is contained in:
parent
1a7923dde3
commit
807fd33ecc
|
@ -26,9 +26,9 @@ import (
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
"github.com/go-kit/kit/log/level"
|
"github.com/go-kit/kit/log/level"
|
||||||
|
|
||||||
"github.com/prometheus/common/model"
|
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
"github.com/prometheus/prometheus/discovery/targetgroup"
|
"github.com/prometheus/prometheus/discovery/targetgroup"
|
||||||
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -138,7 +138,7 @@ func (m *Manager) reload() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// setJitterSeed calculates a global jitterSeed per server relying on extra label set.
|
// setJitterSeed calculates a global jitterSeed per server relying on extra label set.
|
||||||
func (m *Manager) setJitterSeed(labels model.LabelSet) error {
|
func (m *Manager) setJitterSeed(labels labels.Labels) error {
|
||||||
h := fnv.New64a()
|
h := fnv.New64a()
|
||||||
hostname, err := getFqdn()
|
hostname, err := getFqdn()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -60,7 +60,6 @@ func (p *pool) intern(s string) string {
|
||||||
return interned.s
|
return interned.s
|
||||||
}
|
}
|
||||||
|
|
||||||
s = pack(s)
|
|
||||||
p.pool[s] = &entry{
|
p.pool[s] = &entry{
|
||||||
s: s,
|
s: s,
|
||||||
refs: 1,
|
refs: 1,
|
||||||
|
@ -89,11 +88,3 @@ func (p *pool) release(s string) {
|
||||||
}
|
}
|
||||||
delete(p.pool, s)
|
delete(p.pool, s)
|
||||||
}
|
}
|
||||||
|
|
||||||
// StrPack returns a new instance of s which is tightly packed in memory.
|
|
||||||
// It is intended for avoiding the situation where having a live reference
|
|
||||||
// to a string slice over an unreferenced biger underlying string keeps the biger one
|
|
||||||
// in memory anyway - it can't be GCed.
|
|
||||||
func pack(s string) string {
|
|
||||||
return string([]byte(s))
|
|
||||||
}
|
|
||||||
|
|
|
@ -20,6 +20,7 @@ package remote
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -79,7 +80,9 @@ func TestIntern_MultiRef_Concurrent(t *testing.T) {
|
||||||
|
|
||||||
time.Sleep(time.Millisecond)
|
time.Sleep(time.Millisecond)
|
||||||
|
|
||||||
|
interner.mtx.RLock()
|
||||||
interned, ok = interner.pool[testString]
|
interned, ok = interner.pool[testString]
|
||||||
|
interner.mtx.RUnlock()
|
||||||
testutil.Equals(t, ok, true)
|
testutil.Equals(t, ok, true)
|
||||||
testutil.Assert(t, interned.refs == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
|
testutil.Assert(t, atomic.LoadInt64(&interned.refs) == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
|
||||||
}
|
}
|
||||||
|
|
|
@ -325,6 +325,13 @@ func (t *QueueManager) Stop() {
|
||||||
t.shards.stop()
|
t.shards.stop()
|
||||||
t.watcher.Stop()
|
t.watcher.Stop()
|
||||||
t.wg.Wait()
|
t.wg.Wait()
|
||||||
|
|
||||||
|
// On shutdown, release the strings in the labels from the intern pool.
|
||||||
|
t.seriesMtx.Lock()
|
||||||
|
defer t.seriesMtx.Unlock()
|
||||||
|
for _, labels := range t.seriesLabels {
|
||||||
|
release(labels)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// StoreSeries keeps track of which series we know about for lookups when sending samples to remote.
|
// StoreSeries keeps track of which series we know about for lookups when sending samples to remote.
|
||||||
|
@ -345,6 +352,9 @@ func (t *QueueManager) StoreSeries(series []tsdb.RefSeries, index int) {
|
||||||
for ref, labels := range temp {
|
for ref, labels := range temp {
|
||||||
t.seriesSegmentIndexes[ref] = index
|
t.seriesSegmentIndexes[ref] = index
|
||||||
|
|
||||||
|
// We should not ever be replacing a series labels in the map, but just
|
||||||
|
// in case we do we need to ensure we do not leak the replaced interned
|
||||||
|
// strings.
|
||||||
if orig, ok := t.seriesLabels[ref]; ok {
|
if orig, ok := t.seriesLabels[ref]; ok {
|
||||||
release(orig)
|
release(orig)
|
||||||
}
|
}
|
||||||
|
@ -377,7 +387,7 @@ func release(ls []prompb.Label) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// processExternalLabels merges externalLabels into ls. If ls contains
|
// processExternalLabels merges externalLabels into ls. If ls contains
|
||||||
// a label in externalLabels, the value in ls wins.
|
// a label in externalLabels, the value in ls wins.
|
||||||
func processExternalLabels(ls tsdbLabels.Labels, externalLabels labels.Labels) labels.Labels {
|
func processExternalLabels(ls tsdbLabels.Labels, externalLabels labels.Labels) labels.Labels {
|
||||||
i, j, result := 0, 0, make(labels.Labels, 0, len(ls)+len(externalLabels))
|
i, j, result := 0, 0, make(labels.Labels, 0, len(ls)+len(externalLabels))
|
||||||
|
|
|
@ -61,7 +61,7 @@ func TestSampleDelivery(t *testing.T) {
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
|
m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
|
||||||
m.seriesLabels = refSeriesToLabelsProto(series)
|
m.StoreSeries(series, 0)
|
||||||
|
|
||||||
// These should be received by the client.
|
// These should be received by the client.
|
||||||
m.Start()
|
m.Start()
|
||||||
|
@ -89,7 +89,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
|
m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
|
||||||
m.seriesLabels = refSeriesToLabelsProto(series)
|
m.StoreSeries(series, 0)
|
||||||
m.Start()
|
m.Start()
|
||||||
defer m.Stop()
|
defer m.Stop()
|
||||||
|
|
||||||
|
@ -129,7 +129,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
|
m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
|
||||||
m.seriesLabels = refSeriesToLabelsProto(series)
|
m.StoreSeries(series, 0)
|
||||||
|
|
||||||
m.Start()
|
m.Start()
|
||||||
defer m.Stop()
|
defer m.Stop()
|
||||||
|
@ -148,7 +148,7 @@ func TestShutdown(t *testing.T) {
|
||||||
|
|
||||||
m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline)
|
m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline)
|
||||||
samples, series := createTimeseries(2 * config.DefaultQueueConfig.MaxSamplesPerSend)
|
samples, series := createTimeseries(2 * config.DefaultQueueConfig.MaxSamplesPerSend)
|
||||||
m.seriesLabels = refSeriesToLabelsProto(series)
|
m.StoreSeries(series, 0)
|
||||||
m.Start()
|
m.Start()
|
||||||
|
|
||||||
// Append blocks to guarantee delivery, so we do it in the background.
|
// Append blocks to guarantee delivery, so we do it in the background.
|
||||||
|
@ -211,7 +211,7 @@ func TestReshard(t *testing.T) {
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
|
m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
|
||||||
m.seriesLabels = refSeriesToLabelsProto(series)
|
m.StoreSeries(series, 0)
|
||||||
|
|
||||||
m.Start()
|
m.Start()
|
||||||
defer m.Stop()
|
defer m.Stop()
|
||||||
|
@ -260,19 +260,6 @@ func getSeriesNameFromRef(r tsdb.RefSeries) string {
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
func refSeriesToLabelsProto(series []tsdb.RefSeries) map[uint64][]prompb.Label {
|
|
||||||
result := make(map[uint64][]prompb.Label)
|
|
||||||
for _, s := range series {
|
|
||||||
for _, l := range s.Labels {
|
|
||||||
result[s.Ref] = append(result[s.Ref], prompb.Label{
|
|
||||||
Name: l.Name,
|
|
||||||
Value: l.Value,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return result
|
|
||||||
}
|
|
||||||
|
|
||||||
type TestStorageClient struct {
|
type TestStorageClient struct {
|
||||||
receivedSamples map[string][]prompb.Sample
|
receivedSamples map[string][]prompb.Sample
|
||||||
expectedSamples map[string][]prompb.Sample
|
expectedSamples map[string][]prompb.Sample
|
||||||
|
|
|
@ -17,7 +17,6 @@ import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/common/model"
|
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
)
|
)
|
||||||
|
@ -195,18 +194,23 @@ func (q requiredMatchersQuerier) Select(p *storage.SelectParams, matchers ...*la
|
||||||
// We return the new set of matchers, along with a map of labels for which
|
// We return the new set of matchers, along with a map of labels for which
|
||||||
// matchers were added, so that these can later be removed from the result
|
// matchers were added, so that these can later be removed from the result
|
||||||
// time series again.
|
// time series again.
|
||||||
func (q externalLabelsQuerier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, model.LabelSet) {
|
func (q externalLabelsQuerier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, labels.Labels) {
|
||||||
el := make(model.LabelSet, len(q.externalLabels))
|
el := make(labels.Labels, len(q.externalLabels))
|
||||||
for _, l := range q.externalLabels {
|
copy(el, q.externalLabels)
|
||||||
el[model.LabelName(l.Name)] = model.LabelValue(l.Value)
|
|
||||||
}
|
// ms won't be sorted, so have to O(n^2) the search.
|
||||||
for _, m := range ms {
|
for _, m := range ms {
|
||||||
if _, ok := el[model.LabelName(m.Name)]; ok {
|
for i := 0; i < len(el); {
|
||||||
delete(el, model.LabelName(m.Name))
|
if el[i].Name == m.Name {
|
||||||
|
el = el[:i+copy(el[i:], el[i+1:])]
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
i++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for k, v := range el {
|
|
||||||
m, err := labels.NewMatcher(labels.MatchEqual, string(k), string(v))
|
for _, l := range el {
|
||||||
|
m, err := labels.NewMatcher(labels.MatchEqual, l.Name, l.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
@ -215,7 +219,7 @@ func (q externalLabelsQuerier) addExternalLabels(ms []*labels.Matcher) ([]*label
|
||||||
return ms, el
|
return ms, el
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSeriesSetFilter(ss storage.SeriesSet, toFilter model.LabelSet) storage.SeriesSet {
|
func newSeriesSetFilter(ss storage.SeriesSet, toFilter labels.Labels) storage.SeriesSet {
|
||||||
return &seriesSetFilter{
|
return &seriesSetFilter{
|
||||||
SeriesSet: ss,
|
SeriesSet: ss,
|
||||||
toFilter: toFilter,
|
toFilter: toFilter,
|
||||||
|
@ -224,7 +228,7 @@ func newSeriesSetFilter(ss storage.SeriesSet, toFilter model.LabelSet) storage.S
|
||||||
|
|
||||||
type seriesSetFilter struct {
|
type seriesSetFilter struct {
|
||||||
storage.SeriesSet
|
storage.SeriesSet
|
||||||
toFilter model.LabelSet
|
toFilter labels.Labels
|
||||||
querier storage.Querier
|
querier storage.Querier
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -245,17 +249,20 @@ func (ssf seriesSetFilter) At() storage.Series {
|
||||||
|
|
||||||
type seriesFilter struct {
|
type seriesFilter struct {
|
||||||
storage.Series
|
storage.Series
|
||||||
toFilter model.LabelSet
|
toFilter labels.Labels
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sf seriesFilter) Labels() labels.Labels {
|
func (sf seriesFilter) Labels() labels.Labels {
|
||||||
labels := sf.Series.Labels()
|
labels := sf.Series.Labels()
|
||||||
for i := 0; i < len(labels); {
|
for i, j := 0, 0; i < len(labels) && j < len(sf.toFilter); {
|
||||||
if _, ok := sf.toFilter[model.LabelName(labels[i].Name)]; ok {
|
if labels[i].Name < sf.toFilter[j].Name {
|
||||||
|
i++
|
||||||
|
} else if labels[i].Name > sf.toFilter[j].Name {
|
||||||
|
j++
|
||||||
|
} else {
|
||||||
labels = labels[:i+copy(labels[i:], labels[i+1:])]
|
labels = labels[:i+copy(labels[i:], labels[i+1:])]
|
||||||
continue
|
j++
|
||||||
}
|
}
|
||||||
i++
|
|
||||||
}
|
}
|
||||||
return labels
|
return labels
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,6 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/prometheus/common/model"
|
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
"github.com/prometheus/prometheus/prompb"
|
"github.com/prometheus/prometheus/prompb"
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
|
@ -33,7 +32,6 @@ func mustNewLabelMatcher(mt labels.MatchType, name, val string) *labels.Matcher
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
func TestExternalLabelsQuerierSelect(t *testing.T) {
|
func TestExternalLabelsQuerierSelect(t *testing.T) {
|
||||||
matchers := []*labels.Matcher{
|
matchers := []*labels.Matcher{
|
||||||
mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"),
|
mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"),
|
||||||
|
@ -52,14 +50,14 @@ func TestExternalLabelsQuerierSelect(t *testing.T) {
|
||||||
if !reflect.DeepEqual(want, have) {
|
if !reflect.DeepEqual(want, have) {
|
||||||
t.Errorf("expected series set %+v, got %+v", want, have)
|
t.Errorf("expected series set %+v, got %+v", want, have)
|
||||||
}
|
}
|
||||||
}*/
|
}
|
||||||
|
|
||||||
func TestExternalLabelsQuerierAddExternalLabels(t *testing.T) {
|
func TestExternalLabelsQuerierAddExternalLabels(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
el labels.Labels
|
el labels.Labels
|
||||||
inMatchers []*labels.Matcher
|
inMatchers []*labels.Matcher
|
||||||
outMatchers []*labels.Matcher
|
outMatchers []*labels.Matcher
|
||||||
added model.LabelSet
|
added labels.Labels
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
inMatchers: []*labels.Matcher{
|
inMatchers: []*labels.Matcher{
|
||||||
|
@ -68,12 +66,12 @@ func TestExternalLabelsQuerierAddExternalLabels(t *testing.T) {
|
||||||
outMatchers: []*labels.Matcher{
|
outMatchers: []*labels.Matcher{
|
||||||
mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"),
|
mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"),
|
||||||
},
|
},
|
||||||
added: model.LabelSet{},
|
added: labels.Labels{},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
el: labels.Labels{
|
el: labels.Labels{
|
||||||
{Name: "region", Value: "europe"},
|
|
||||||
{Name: "dc", Value: "berlin-01"},
|
{Name: "dc", Value: "berlin-01"},
|
||||||
|
{Name: "region", Value: "europe"},
|
||||||
},
|
},
|
||||||
inMatchers: []*labels.Matcher{
|
inMatchers: []*labels.Matcher{
|
||||||
mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"),
|
mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"),
|
||||||
|
@ -83,7 +81,10 @@ func TestExternalLabelsQuerierAddExternalLabels(t *testing.T) {
|
||||||
mustNewLabelMatcher(labels.MatchEqual, "region", "europe"),
|
mustNewLabelMatcher(labels.MatchEqual, "region", "europe"),
|
||||||
mustNewLabelMatcher(labels.MatchEqual, "dc", "berlin-01"),
|
mustNewLabelMatcher(labels.MatchEqual, "dc", "berlin-01"),
|
||||||
},
|
},
|
||||||
added: model.LabelSet{"region": "europe", "dc": "berlin-01"},
|
added: labels.Labels{
|
||||||
|
{Name: "dc", Value: "berlin-01"},
|
||||||
|
{Name: "region", Value: "europe"},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
el: labels.Labels{
|
el: labels.Labels{
|
||||||
|
@ -99,7 +100,9 @@ func TestExternalLabelsQuerierAddExternalLabels(t *testing.T) {
|
||||||
mustNewLabelMatcher(labels.MatchEqual, "region", "europe"),
|
mustNewLabelMatcher(labels.MatchEqual, "region", "europe"),
|
||||||
mustNewLabelMatcher(labels.MatchEqual, "dc", "munich-02"),
|
mustNewLabelMatcher(labels.MatchEqual, "dc", "munich-02"),
|
||||||
},
|
},
|
||||||
added: model.LabelSet{"region": "europe"},
|
added: labels.Labels{
|
||||||
|
{Name: "region", Value: "europe"},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -122,12 +125,12 @@ func TestExternalLabelsQuerierAddExternalLabels(t *testing.T) {
|
||||||
func TestSeriesSetFilter(t *testing.T) {
|
func TestSeriesSetFilter(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
in *prompb.QueryResult
|
in *prompb.QueryResult
|
||||||
toRemove model.LabelSet
|
toRemove labels.Labels
|
||||||
|
|
||||||
expected *prompb.QueryResult
|
expected *prompb.QueryResult
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
toRemove: model.LabelSet{"foo": "bar"},
|
toRemove: labels.Labels{{Name: "foo", Value: "bar"}},
|
||||||
in: &prompb.QueryResult{
|
in: &prompb.QueryResult{
|
||||||
Timeseries: []*prompb.TimeSeries{
|
Timeseries: []*prompb.TimeSeries{
|
||||||
{Labels: labelsToLabelsProto(labels.FromStrings("foo", "bar", "a", "b")), Samples: []prompb.Sample{}},
|
{Labels: labelsToLabelsProto(labels.FromStrings("foo", "bar", "a", "b")), Samples: []prompb.Sample{}},
|
||||||
|
|
Loading…
Reference in a new issue