mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
storage/remote: improve symbol-table handling
On the incoming path, `writeHandler.write()` creates a new table for each request. `labelProtosToLabels` takes a `ScratchBuilder` now. Call `NewScratchBuilder` as required in tests. Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
parent
8f525b4ba4
commit
2ac1632eec
|
@ -176,12 +176,13 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult,
|
||||||
|
|
||||||
// FromQueryResult unpacks and sorts a QueryResult proto.
|
// FromQueryResult unpacks and sorts a QueryResult proto.
|
||||||
func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet {
|
func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet {
|
||||||
|
b := labels.NewScratchBuilder(0)
|
||||||
series := make([]storage.Series, 0, len(res.Timeseries))
|
series := make([]storage.Series, 0, len(res.Timeseries))
|
||||||
for _, ts := range res.Timeseries {
|
for _, ts := range res.Timeseries {
|
||||||
if err := validateLabelsAndMetricName(ts.Labels); err != nil {
|
if err := validateLabelsAndMetricName(ts.Labels); err != nil {
|
||||||
return errSeriesSet{err: err}
|
return errSeriesSet{err: err}
|
||||||
}
|
}
|
||||||
lbls := labelProtosToLabels(ts.Labels)
|
lbls := labelProtosToLabels(&b, ts.Labels)
|
||||||
series = append(series, &concreteSeries{labels: lbls, floats: ts.Samples, histograms: ts.Histograms})
|
series = append(series, &concreteSeries{labels: lbls, floats: ts.Samples, histograms: ts.Histograms})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -616,11 +617,11 @@ func FromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, erro
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func exemplarProtoToExemplar(ep prompb.Exemplar) exemplar.Exemplar {
|
func exemplarProtoToExemplar(b *labels.ScratchBuilder, ep prompb.Exemplar) exemplar.Exemplar {
|
||||||
timestamp := ep.Timestamp
|
timestamp := ep.Timestamp
|
||||||
|
|
||||||
return exemplar.Exemplar{
|
return exemplar.Exemplar{
|
||||||
Labels: labelProtosToLabels(ep.Labels),
|
Labels: labelProtosToLabels(b, ep.Labels),
|
||||||
Value: ep.Value,
|
Value: ep.Value,
|
||||||
Ts: timestamp,
|
Ts: timestamp,
|
||||||
HasTs: timestamp != 0,
|
HasTs: timestamp != 0,
|
||||||
|
@ -760,8 +761,8 @@ func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric {
|
||||||
return metric
|
return metric
|
||||||
}
|
}
|
||||||
|
|
||||||
func labelProtosToLabels(labelPairs []prompb.Label) labels.Labels {
|
func labelProtosToLabels(b *labels.ScratchBuilder, labelPairs []prompb.Label) labels.Labels {
|
||||||
b := labels.ScratchBuilder{}
|
b.Reset()
|
||||||
for _, l := range labelPairs {
|
for _, l := range labelPairs {
|
||||||
b.Add(l.Name, l.Value)
|
b.Add(l.Name, l.Value)
|
||||||
}
|
}
|
||||||
|
|
|
@ -788,10 +788,11 @@ func (m *mockWriter) Write(p []byte) (n int, err error) {
|
||||||
type mockChunkSeriesSet struct {
|
type mockChunkSeriesSet struct {
|
||||||
chunkedSeries []*prompb.ChunkedSeries
|
chunkedSeries []*prompb.ChunkedSeries
|
||||||
index int
|
index int
|
||||||
|
builder labels.ScratchBuilder
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMockChunkSeriesSet(ss []*prompb.ChunkedSeries) storage.ChunkSeriesSet {
|
func newMockChunkSeriesSet(ss []*prompb.ChunkedSeries) storage.ChunkSeriesSet {
|
||||||
return &mockChunkSeriesSet{chunkedSeries: ss, index: -1}
|
return &mockChunkSeriesSet{chunkedSeries: ss, index: -1, builder: labels.NewScratchBuilder(0)}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *mockChunkSeriesSet) Next() bool {
|
func (c *mockChunkSeriesSet) Next() bool {
|
||||||
|
@ -801,7 +802,7 @@ func (c *mockChunkSeriesSet) Next() bool {
|
||||||
|
|
||||||
func (c *mockChunkSeriesSet) At() storage.ChunkSeries {
|
func (c *mockChunkSeriesSet) At() storage.ChunkSeries {
|
||||||
return &storage.ChunkSeriesEntry{
|
return &storage.ChunkSeriesEntry{
|
||||||
Lset: labelProtosToLabels(c.chunkedSeries[c.index].Labels),
|
Lset: labelProtosToLabels(&c.builder, c.chunkedSeries[c.index].Labels),
|
||||||
ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
|
ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
|
||||||
return &mockChunkIterator{
|
return &mockChunkIterator{
|
||||||
chunks: c.chunkedSeries[c.index].Chunks,
|
chunks: c.chunkedSeries[c.index].Chunks,
|
||||||
|
|
|
@ -523,7 +523,7 @@ func TestShouldReshard(t *testing.T) {
|
||||||
func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSeries) {
|
func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSeries) {
|
||||||
samples := make([]record.RefSample, 0, numSamples)
|
samples := make([]record.RefSample, 0, numSamples)
|
||||||
series := make([]record.RefSeries, 0, numSeries)
|
series := make([]record.RefSeries, 0, numSeries)
|
||||||
lb := labels.ScratchBuilder{}
|
lb := labels.NewScratchBuilder(1 + len(extraLabels))
|
||||||
for i := 0; i < numSeries; i++ {
|
for i := 0; i < numSeries; i++ {
|
||||||
name := fmt.Sprintf("test_metric_%d", i)
|
name := fmt.Sprintf("test_metric_%d", i)
|
||||||
for j := 0; j < numSamples; j++ {
|
for j := 0; j < numSamples; j++ {
|
||||||
|
@ -760,9 +760,10 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
|
||||||
if err := proto.Unmarshal(reqBuf, &reqProto); err != nil {
|
if err := proto.Unmarshal(reqBuf, &reqProto); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
builder := labels.NewScratchBuilder(0)
|
||||||
count := 0
|
count := 0
|
||||||
for _, ts := range reqProto.Timeseries {
|
for _, ts := range reqProto.Timeseries {
|
||||||
labels := labelProtosToLabels(ts.Labels)
|
labels := labelProtosToLabels(&builder, ts.Labels)
|
||||||
seriesName := labels.Get("__name__")
|
seriesName := labels.Get("__name__")
|
||||||
for _, sample := range ts.Samples {
|
for _, sample := range ts.Samples {
|
||||||
count++
|
count++
|
||||||
|
@ -1370,7 +1371,7 @@ func createTimeseriesWithOldSamples(numSamples, numSeries int, extraLabels ...la
|
||||||
newSamples := make([]record.RefSample, 0, numSamples)
|
newSamples := make([]record.RefSample, 0, numSamples)
|
||||||
samples := make([]record.RefSample, 0, numSamples)
|
samples := make([]record.RefSample, 0, numSamples)
|
||||||
series := make([]record.RefSeries, 0, numSeries)
|
series := make([]record.RefSeries, 0, numSeries)
|
||||||
lb := labels.ScratchBuilder{}
|
lb := labels.NewScratchBuilder(1 + len(extraLabels))
|
||||||
for i := 0; i < numSeries; i++ {
|
for i := 0; i < numSeries; i++ {
|
||||||
name := fmt.Sprintf("test_metric_%d", i)
|
name := fmt.Sprintf("test_metric_%d", i)
|
||||||
// We create half of the samples in the past.
|
// We create half of the samples in the past.
|
||||||
|
|
|
@ -195,6 +195,7 @@ func TestSeriesSetFilter(t *testing.T) {
|
||||||
type mockedRemoteClient struct {
|
type mockedRemoteClient struct {
|
||||||
got *prompb.Query
|
got *prompb.Query
|
||||||
store []*prompb.TimeSeries
|
store []*prompb.TimeSeries
|
||||||
|
b labels.ScratchBuilder
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *mockedRemoteClient) Read(_ context.Context, query *prompb.Query) (*prompb.QueryResult, error) {
|
func (c *mockedRemoteClient) Read(_ context.Context, query *prompb.Query) (*prompb.QueryResult, error) {
|
||||||
|
@ -210,7 +211,7 @@ func (c *mockedRemoteClient) Read(_ context.Context, query *prompb.Query) (*prom
|
||||||
|
|
||||||
q := &prompb.QueryResult{}
|
q := &prompb.QueryResult{}
|
||||||
for _, s := range c.store {
|
for _, s := range c.store {
|
||||||
l := labelProtosToLabels(s.Labels)
|
l := labelProtosToLabels(&c.b, s.Labels)
|
||||||
var notMatch bool
|
var notMatch bool
|
||||||
|
|
||||||
for _, m := range matchers {
|
for _, m := range matchers {
|
||||||
|
@ -242,6 +243,7 @@ func TestSampleAndChunkQueryableClient(t *testing.T) {
|
||||||
{Labels: []prompb.Label{{Name: "a", Value: "b3"}, {Name: "region", Value: "us"}}},
|
{Labels: []prompb.Label{{Name: "a", Value: "b3"}, {Name: "region", Value: "us"}}},
|
||||||
{Labels: []prompb.Label{{Name: "a", Value: "b2"}, {Name: "region", Value: "europe"}}},
|
{Labels: []prompb.Label{{Name: "a", Value: "b2"}, {Name: "region", Value: "europe"}}},
|
||||||
},
|
},
|
||||||
|
b: labels.NewScratchBuilder(0),
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range []struct {
|
for _, tc := range []struct {
|
||||||
|
|
|
@ -25,6 +25,7 @@ import (
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/model/exemplar"
|
"github.com/prometheus/prometheus/model/exemplar"
|
||||||
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
"github.com/prometheus/prometheus/prompb"
|
"github.com/prometheus/prometheus/prompb"
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
otlptranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
|
otlptranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
|
||||||
|
@ -112,9 +113,10 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
|
||||||
err = app.Commit()
|
err = app.Commit()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
b := labels.NewScratchBuilder(0)
|
||||||
var exemplarErr error
|
var exemplarErr error
|
||||||
for _, ts := range req.Timeseries {
|
for _, ts := range req.Timeseries {
|
||||||
labels := labelProtosToLabels(ts.Labels)
|
labels := labelProtosToLabels(&b, ts.Labels)
|
||||||
if !labels.IsValid() {
|
if !labels.IsValid() {
|
||||||
level.Warn(h.logger).Log("msg", "Invalid metric names or labels", "got", labels.String())
|
level.Warn(h.logger).Log("msg", "Invalid metric names or labels", "got", labels.String())
|
||||||
samplesWithInvalidLabels++
|
samplesWithInvalidLabels++
|
||||||
|
@ -137,7 +139,7 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, ep := range ts.Exemplars {
|
for _, ep := range ts.Exemplars {
|
||||||
e := exemplarProtoToExemplar(ep)
|
e := exemplarProtoToExemplar(&b, ep)
|
||||||
|
|
||||||
_, exemplarErr = app.AppendExemplar(0, labels, e)
|
_, exemplarErr = app.AppendExemplar(0, labels, e)
|
||||||
exemplarErr = h.checkAppendExemplarError(exemplarErr, e, &outOfOrderExemplarErrs)
|
exemplarErr = h.checkAppendExemplarError(exemplarErr, e, &outOfOrderExemplarErrs)
|
||||||
|
|
|
@ -55,18 +55,19 @@ func TestRemoteWriteHandler(t *testing.T) {
|
||||||
resp := recorder.Result()
|
resp := recorder.Result()
|
||||||
require.Equal(t, http.StatusNoContent, resp.StatusCode)
|
require.Equal(t, http.StatusNoContent, resp.StatusCode)
|
||||||
|
|
||||||
|
b := labels.NewScratchBuilder(0)
|
||||||
i := 0
|
i := 0
|
||||||
j := 0
|
j := 0
|
||||||
k := 0
|
k := 0
|
||||||
for _, ts := range writeRequestFixture.Timeseries {
|
for _, ts := range writeRequestFixture.Timeseries {
|
||||||
labels := labelProtosToLabels(ts.Labels)
|
labels := labelProtosToLabels(&b, ts.Labels)
|
||||||
for _, s := range ts.Samples {
|
for _, s := range ts.Samples {
|
||||||
requireEqual(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i])
|
requireEqual(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i])
|
||||||
i++
|
i++
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, e := range ts.Exemplars {
|
for _, e := range ts.Exemplars {
|
||||||
exemplarLabels := labelProtosToLabels(e.Labels)
|
exemplarLabels := labelProtosToLabels(&b, e.Labels)
|
||||||
requireEqual(t, mockExemplar{labels, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
|
requireEqual(t, mockExemplar{labels, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
|
||||||
j++
|
j++
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue