prometheus/storage/remote/codec_test.go
Tom Wilkie d479151f1f Various enhancements and refactorings for remote write receiver:
- Remove unrelated changes
- Refactor code out of the API module - that is already getting pretty crowded.
- Don't track reference for AddFast in remote write.  This has the potential to consume unlimited server-side memory if a malicious client pushes a different label set for every series.  For now, it's easier and safer to always use the 'slow' path.
- Return 400 on out of order samples.
- Use remote.DecodeWriteRequest in the remote write adapters.
- Put this behind the 'remote-write-server' feature flag
- Add some (very) basic docs.
- Use named return & add test for commit error propagation

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
2021-02-08 20:41:23 +00:00

300 lines
8.2 KiB
Go

// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"bytes"
"fmt"
"testing"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
)
// writeRequestFixture is a shared write request made of two time series that
// carry the same five labels and one sample each. TestDecodeWriteRequest
// encodes its Timeseries and expects the decoded request to equal it.
var writeRequestFixture = &prompb.WriteRequest{
	Timeseries: []prompb.TimeSeries{
		// First series: value 1 at timestamp 0.
		{
			Samples: []prompb.Sample{
				{Timestamp: 0, Value: 1},
			},
			Labels: []prompb.Label{
				{Name: "__name__", Value: "test_metric1"},
				{Name: "b", Value: "c"},
				{Name: "baz", Value: "qux"},
				{Name: "d", Value: "e"},
				{Name: "foo", Value: "bar"},
			},
		},
		// Second series: same labels, value 2 at timestamp 1.
		{
			Samples: []prompb.Sample{
				{Timestamp: 1, Value: 2},
			},
			Labels: []prompb.Label{
				{Name: "__name__", Value: "test_metric1"},
				{Name: "b", Value: "c"},
				{Name: "baz", Value: "qux"},
				{Name: "d", Value: "e"},
				{Name: "foo", Value: "bar"},
			},
		},
	},
}
// TestValidateLabelsAndMetricName feeds a table of label sets to
// validateLabelsAndMetricName. A case with an empty expectedErr must validate
// without error; any other case must fail with exactly that error message.
func TestValidateLabelsAndMetricName(t *testing.T) {
	// A single 0xff byte is not valid UTF-8; it is used both as the label
	// value and as the tail of the expected error message.
	invalidUTF8 := string([]byte{0xff})

	cases := []struct {
		description string
		input       labels.Labels
		expectedErr string // left empty when no error is expected
	}{
		{
			description: "regular labels",
			input:       labels.FromStrings("__name__", "name", "labelName", "labelValue"),
		},
		{
			description: "label name with _",
			input:       labels.FromStrings("__name__", "name", "_labelName", "labelValue"),
		},
		{
			description: "label name with @",
			input:       labels.FromStrings("__name__", "name", "@labelName", "labelValue"),
			expectedErr: "invalid label name: @labelName",
		},
		{
			description: "label name starts with numbers",
			input:       labels.FromStrings("__name__", "name", "123labelName", "labelValue"),
			expectedErr: "invalid label name: 123labelName",
		},
		{
			description: "label name is empty string",
			input:       labels.FromStrings("__name__", "name", "", "labelValue"),
			expectedErr: "invalid label name: ",
		},
		{
			description: "label value is an invalid UTF-8 value",
			input:       labels.FromStrings("__name__", "name", "labelName", invalidUTF8),
			expectedErr: "invalid label value: " + invalidUTF8,
		},
		{
			description: "metric name starts with @",
			input:       labels.FromStrings("__name__", "@invalid_name"),
			expectedErr: "invalid metric name: @invalid_name",
		},
		{
			description: "duplicate label names",
			input:       labels.FromStrings("__name__", "name1", "__name__", "name2"),
			expectedErr: "duplicate label with name: __name__",
		},
		{
			description: "duplicate label values",
			input:       labels.FromStrings("label1", "name", "label2", "name"),
		},
		{
			description: "don't report as duplicate label name",
			input:       labels.FromStrings("", "name", "label2", "name"),
			expectedErr: "invalid label name: ",
		},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.description, func(t *testing.T) {
			err := validateLabelsAndMetricName(tc.input)
			if tc.expectedErr == "" {
				require.NoError(t, err)
				return
			}
			require.Error(t, err)
			require.Equal(t, tc.expectedErr, err.Error())
		})
	}
}
// TestConcreteSeriesSet walks a concreteSeriesSet built from two series and
// checks that Next/At yield them in insertion order, after which Next
// reports false.
func TestConcreteSeriesSet(t *testing.T) {
	first := &concreteSeries{
		labels:  labels.FromStrings("foo", "bar"),
		samples: []prompb.Sample{{Value: 1, Timestamp: 2}},
	}
	second := &concreteSeries{
		labels:  labels.FromStrings("foo", "baz"),
		samples: []prompb.Sample{{Value: 3, Timestamp: 4}},
	}
	set := &concreteSeriesSet{
		series: []storage.Series{first, second},
	}

	// The expected series are kept in a separate slice so the assertions do
	// not share storage with the set under test.
	for _, want := range []*concreteSeries{first, second} {
		require.True(t, set.Next(), "Expected Next() to be true.")
		require.Equal(t, want, set.At(), "Unexpected series returned.")
	}
	require.False(t, set.Next(), "Expected Next() to be false.")
}
// TestConcreteSeriesClonesLabels checks that modifying the labels returned by
// concreteSeries.Labels() does not change what a subsequent Labels() call
// returns.
func TestConcreteSeriesClonesLabels(t *testing.T) {
	want := labels.Labels{
		{Name: "a", Value: "b"},
		{Name: "c", Value: "d"},
	}
	series := concreteSeries{
		labels: labels.New(want...),
	}

	firstCall := series.Labels()
	require.Equal(t, want, firstCall)

	// Scribble over the returned labels; the series must be unaffected.
	firstCall[0].Value = "foo"
	firstCall[1].Value = "bar"

	secondCall := series.Labels()
	require.Equal(t, want, secondCall)
}
// TestFromQueryResultWithDuplicates passes FromQueryResult a result whose only
// series has the label name "foo" twice, and expects an errSeriesSet carrying
// the duplicate-label error.
func TestFromQueryResultWithDuplicates(t *testing.T) {
	result := &prompb.QueryResult{
		Timeseries: []*prompb.TimeSeries{
			{
				Labels: []prompb.Label{
					{Name: "foo", Value: "bar"},
					{Name: "foo", Value: "def"},
				},
				Samples: []prompb.Sample{
					{Value: 0.0, Timestamp: 0},
				},
			},
		},
	}

	got := FromQueryResult(false, result)

	errSet, ok := got.(errSeriesSet)
	require.True(t, ok, "Expected resulting series to be an errSeriesSet")

	msg := errSet.Err().Error()
	require.Equal(t, "duplicate label with name: foo", msg, fmt.Sprintf("Expected error to be from duplicate label, but got: %s", msg))
}
func TestNegotiateResponseType(t *testing.T) {
r, err := NegotiateResponseType([]prompb.ReadRequest_ResponseType{
prompb.ReadRequest_STREAMED_XOR_CHUNKS,
prompb.ReadRequest_SAMPLES,
})
require.NoError(t, err)
require.Equal(t, prompb.ReadRequest_STREAMED_XOR_CHUNKS, r)
r2, err := NegotiateResponseType([]prompb.ReadRequest_ResponseType{
prompb.ReadRequest_SAMPLES,
prompb.ReadRequest_STREAMED_XOR_CHUNKS,
})
require.NoError(t, err)
require.Equal(t, prompb.ReadRequest_SAMPLES, r2)
r3, err := NegotiateResponseType([]prompb.ReadRequest_ResponseType{})
require.NoError(t, err)
require.Equal(t, prompb.ReadRequest_SAMPLES, r3)
_, err = NegotiateResponseType([]prompb.ReadRequest_ResponseType{20})
require.Error(t, err, "expected error due to not supported requested response types")
require.Equal(t, "server does not support any of the requested response types: [20]; supported: map[SAMPLES:{} STREAMED_XOR_CHUNKS:{}]", err.Error())
}
// TestMergeLabels checks MergeLabels on two cases that swap the roles of the
// same two label sets. In both, the expected output is ordered by name and,
// for the name present in both inputs ("bbb"), keeps the primary's value.
func TestMergeLabels(t *testing.T) {
	cases := []struct {
		primary, secondary, expected []prompb.Label
	}{
		{
			primary: []prompb.Label{
				{Name: "aaa", Value: "foo"},
				{Name: "bbb", Value: "foo"},
				{Name: "ddd", Value: "foo"},
			},
			secondary: []prompb.Label{
				{Name: "bbb", Value: "bar"},
				{Name: "ccc", Value: "bar"},
			},
			expected: []prompb.Label{
				{Name: "aaa", Value: "foo"},
				{Name: "bbb", Value: "foo"},
				{Name: "ccc", Value: "bar"},
				{Name: "ddd", Value: "foo"},
			},
		},
		{
			primary: []prompb.Label{
				{Name: "bbb", Value: "bar"},
				{Name: "ccc", Value: "bar"},
			},
			secondary: []prompb.Label{
				{Name: "aaa", Value: "foo"},
				{Name: "bbb", Value: "foo"},
				{Name: "ddd", Value: "foo"},
			},
			expected: []prompb.Label{
				{Name: "aaa", Value: "foo"},
				{Name: "bbb", Value: "bar"},
				{Name: "ccc", Value: "bar"},
				{Name: "ddd", Value: "foo"},
			},
		},
	}

	for i := range cases {
		tc := cases[i]
		merged := MergeLabels(tc.primary, tc.secondary)
		require.Equal(t, tc.expected, merged)
	}
}
// TestMetricTypeToMetricTypeProto checks that metricTypeToMetricTypeProto maps
// textparse metric types to the expected prompb enum values, and that an
// unrecognised type string maps to MetricMetadata_UNKNOWN.
func TestMetricTypeToMetricTypeProto(t *testing.T) {
	cases := []struct {
		desc     string
		input    textparse.MetricType
		expected prompb.MetricMetadata_MetricType
	}{
		{desc: "with a single-word metric", input: textparse.MetricTypeCounter, expected: prompb.MetricMetadata_COUNTER},
		{desc: "with a two-word metric", input: textparse.MetricTypeStateset, expected: prompb.MetricMetadata_STATESET},
		{desc: "with an unknown metric", input: "not-known", expected: prompb.MetricMetadata_UNKNOWN},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.desc, func(t *testing.T) {
			got := metricTypeToMetricTypeProto(tc.input)
			require.Equal(t, tc.expected, got)
		})
	}
}
// TestDecodeWriteRequest encodes the fixture's time series with
// buildWriteRequest, decodes the resulting bytes with DecodeWriteRequest, and
// expects to get a request equal to the fixture.
func TestDecodeWriteRequest(t *testing.T) {
	encoded, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil)
	require.NoError(t, err)

	decoded, err := DecodeWriteRequest(bytes.NewReader(encoded))
	require.NoError(t, err)

	require.Equal(t, writeRequestFixture, decoded)
}