Junang Li 2024-07-22 01:46:55 -04:00
parent 2f919193fe
commit ba8397df61
29 changed files with 258 additions and 8925 deletions


@ -28,7 +28,6 @@ import (
"github.com/alecthomas/kingpin/v2"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
influx "github.com/influxdata/influxdb/client/v2"
"github.com/prometheus/client_golang/prometheus"
@ -36,7 +35,6 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/common/promlog"
"github.com/prometheus/common/promlog/flag"
"github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/graphite"
"github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/influxdb"
"github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/opentsdb"
@ -251,7 +249,7 @@ func serve(logger log.Logger, addr string, writers []writer, readers []reader) e
}
var req prompb.ReadRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
if err := req.Unmarshal(reqBuf); err != nil {
level.Error(logger).Log("msg", "Unmarshal error", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
@ -272,7 +270,7 @@ func serve(logger log.Logger, addr string, writers []writer, readers []reader) e
return
}
data, err := proto.Marshal(resp)
data, err := resp.Marshal()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
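The hunk above drops gogo's package-level proto.Unmarshal and proto.Marshal in favour of the methods generated on the messages themselves. A minimal sketch of the resulting round trip, assuming the regenerated prompb.ReadRequest keeps the Marshal and Unmarshal methods this hunk calls:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/prompb"
)

func main() {
	req := &prompb.ReadRequest{}
	buf, err := req.Marshal() // generated method, stands in for gogo's proto.Marshal(req)
	if err != nil {
		panic(err)
	}
	var decoded prompb.ReadRequest
	if err := decoded.Unmarshal(buf); err != nil { // stands in for proto.Unmarshal(buf, &decoded)
		panic(err)
	}
	fmt.Printf("round-tripped %d bytes\n", len(buf))
}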


@ -31,7 +31,7 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
dto "github.com/prometheus/prometheus/prompb/io/prometheus/client"
dto "github.com/prometheus/client_model/go"
)
// ProtobufParser is a very inefficient way of unmarshaling the old Prometheus
@ -595,7 +595,7 @@ func readDelimited(b []byte, mf *dto.MetricFamily) (n int, err error) {
return 0, fmt.Errorf("protobufparse: insufficient length of buffer, expected at least %d bytes, got %d bytes", totalLength, len(b))
}
mf.Reset()
return totalLength, mf.UnmarshalVT(b[varIntLength:totalLength])
return totalLength, proto.Unmarshal(b[varIntLength:totalLength], mf)
}
// formatOpenMetricsFloat works like the usual Go string formatting of a float
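readDelimited above keeps its varint length prefix handling but now hands the payload and the upstream client_model MetricFamily to proto.Unmarshal from google.golang.org/protobuf. A standalone sketch of the same length-prefixed pattern, with the prefix parsed via protowire and the error handling simplified:

package main

import (
	"fmt"

	dto "github.com/prometheus/client_model/go"
	"google.golang.org/protobuf/encoding/protowire"
	"google.golang.org/protobuf/proto"
)

// readDelimited decodes one length-prefixed MetricFamily from b and reports
// how many bytes it consumed.
func readDelimited(b []byte, mf *dto.MetricFamily) (int, error) {
	msgLen, prefixLen := protowire.ConsumeVarint(b)
	if prefixLen < 0 {
		return 0, protowire.ParseError(prefixLen)
	}
	total := prefixLen + int(msgLen)
	if len(b) < total {
		return 0, fmt.Errorf("need %d bytes, got %d", total, len(b))
	}
	mf.Reset()
	return total, proto.Unmarshal(b[prefixLen:total], mf)
}

func main() {
	fam := &dto.MetricFamily{Name: proto.String("example_total")}
	payload, err := proto.Marshal(fam)
	if err != nil {
		panic(err)
	}
	buf := append(protowire.AppendVarint(nil, uint64(len(payload))), payload...)

	var out dto.MetricFamily
	n, err := readDelimited(buf, &out)
	fmt.Println(n, out.GetName(), err)
}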


@ -20,16 +20,16 @@ import (
"io"
"testing"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/util/testutil"
dto "github.com/prometheus/prometheus/prompb/io/prometheus/client"
dto "github.com/prometheus/client_model/go"
)
func createTestProtoBuf(t *testing.T) *bytes.Buffer {

File diff suppressed because it is too large


@ -1,159 +0,0 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This is copied and lightly edited from
// github.com/prometheus/client_model/io/prometheus/client/metrics.proto
// and finally converted to proto3 syntax to make it usable for the
// gogo-protobuf approach taken within prometheus/prometheus.
syntax = "proto3";
package io.prometheus.client;
option go_package = "io_prometheus_client";
import "google/protobuf/timestamp.proto";
message LabelPair {
string name = 1;
string value = 2;
}
enum MetricType {
// COUNTER must use the Metric field "counter".
COUNTER = 0;
// GAUGE must use the Metric field "gauge".
GAUGE = 1;
// SUMMARY must use the Metric field "summary".
SUMMARY = 2;
// UNTYPED must use the Metric field "untyped".
UNTYPED = 3;
// HISTOGRAM must use the Metric field "histogram".
HISTOGRAM = 4;
// GAUGE_HISTOGRAM must use the Metric field "histogram".
GAUGE_HISTOGRAM = 5;
}
message Gauge {
double value = 1;
}
message Counter {
double value = 1;
Exemplar exemplar = 2;
google.protobuf.Timestamp created_timestamp = 3;
}
message Quantile {
double quantile = 1;
double value = 2;
}
message Summary {
uint64 sample_count = 1;
double sample_sum = 2;
repeated Quantile quantile = 3;
google.protobuf.Timestamp created_timestamp = 4;
}
message Untyped {
double value = 1;
}
message Histogram {
uint64 sample_count = 1;
double sample_count_float = 4; // Overrides sample_count if > 0.
double sample_sum = 2;
// Buckets for the classic histogram.
repeated Bucket bucket = 3; // Ordered in increasing order of upper_bound, +Inf bucket is optional.
google.protobuf.Timestamp created_timestamp = 15;
// Everything below here is for native histograms (also known as sparse histograms).
// Native histograms are an experimental feature without stability guarantees.
// schema defines the bucket schema. Currently, valid numbers are -4 <= n <= 8.
// They are all for base-2 bucket schemas, where 1 is a bucket boundary in each case, and
// then each power of two is divided into 2^n logarithmic buckets.
// Or in other words, each bucket boundary is the previous boundary times 2^(2^-n).
// In the future, more bucket schemas may be added using numbers < -4 or > 8.
sint32 schema = 5;
double zero_threshold = 6; // Breadth of the zero bucket.
uint64 zero_count = 7; // Count in zero bucket.
double zero_count_float = 8; // Overrides sb_zero_count if > 0.
// Negative buckets for the native histogram.
repeated BucketSpan negative_span = 9;
// Use either "negative_delta" or "negative_count", the former for
// regular histograms with integer counts, the latter for float
// histograms.
repeated sint64 negative_delta = 10; // Count delta of each bucket compared to previous one (or to zero for 1st bucket).
repeated double negative_count = 11; // Absolute count of each bucket.
// Positive buckets for the native histogram.
// Use a no-op span (offset 0, length 0) for a native histogram without any
// observations yet and with a zero_threshold of 0. Otherwise, it would be
// indistinguishable from a classic histogram.
repeated BucketSpan positive_span = 12;
// Use either "positive_delta" or "positive_count", the former for
// regular histograms with integer counts, the latter for float
// histograms.
repeated sint64 positive_delta = 13; // Count delta of each bucket compared to previous one (or to zero for 1st bucket).
repeated double positive_count = 14; // Absolute count of each bucket.
// Only used for native histograms. These exemplars MUST have a timestamp.
repeated Exemplar exemplars = 16;
}
message Bucket {
uint64 cumulative_count = 1; // Cumulative in increasing order.
double cumulative_count_float = 4; // Overrides cumulative_count if > 0.
double upper_bound = 2; // Inclusive.
Exemplar exemplar = 3;
}
// A BucketSpan defines a number of consecutive buckets in a native
// histogram with their offset. Logically, it would be more
// straightforward to include the bucket counts in the Span. However,
// the protobuf representation is more compact in the way the data is
// structured here (with all the buckets in a single array separate
// from the Spans).
message BucketSpan {
sint32 offset = 1; // Gap to previous span, or starting point for 1st span (which can be negative).
uint32 length = 2; // Length of consecutive buckets.
}
message Exemplar {
repeated LabelPair label = 1;
double value = 2;
google.protobuf.Timestamp timestamp = 3; // OpenMetrics-style.
}
message Metric {
repeated LabelPair label = 1;
Gauge gauge = 2;
Counter counter = 3;
Summary summary = 4;
Untyped untyped = 5;
Histogram histogram = 7;
int64 timestamp_ms = 6;
}
message MetricFamily {
string name = 1;
string help = 2;
MetricType type = 3;
repeated Metric metric = 4;
string unit = 5;
}
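The schema comment inside the Histogram message above compresses a fair amount of arithmetic: for schema n, every power of two is split into 2^n buckets whose boundaries each grow by a factor of 2^(2^-n). A small worked example for schema 3:

package main

import (
	"fmt"
	"math"
)

func main() {
	const schema = 3                                     // valid values are -4..8 per the comment above
	factor := math.Pow(2, math.Pow(2, -float64(schema))) // 2^(2^-3) ≈ 1.0905
	bound := 1.0                                         // 1 is a bucket boundary for every schema
	for i := 0; i < 1<<schema; i++ {                     // 2^3 = 8 buckets per power of two
		next := bound * factor
		fmt.Printf("bucket %d: (%.4f, %.4f]\n", i+1, bound, next)
		bound = next
	}
	// After 2^schema buckets the boundary lands exactly on 2.0, the next power of two.
}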

File diff suppressed because it is too large


@ -1,168 +0,0 @@
// Copyright 2024 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package writev2
import (
math_bits "math/bits"
"slices"
)
func (m Sample) T() int64 { return m.Timestamp }
func (m Sample) V() float64 { return m.Value }
func (m *Request) OptimizedMarshal(dst []byte) ([]byte, error) {
siz := m.SizeVT()
if cap(dst) < siz {
dst = make([]byte, siz)
}
n, err := m.OptimizedMarshalToSizedBuffer(dst[:siz])
if err != nil {
return nil, err
}
return dst[:n], nil
}
// OptimizedMarshalToSizedBuffer is mostly a copy of the generated MarshalToSizedBuffer,
// but calls OptimizedMarshalToSizedBuffer on the timeseries.
func (m *Request) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
// Removed XXX_unrecognized handling
if len(m.Timeseries) > 0 {
for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Timeseries[iNdEx].OptimizedMarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x2a
}
}
if len(m.Symbols) > 0 {
for iNdEx := len(m.Symbols) - 1; iNdEx >= 0; iNdEx-- {
i -= len(m.Symbols[iNdEx])
copy(dAtA[i:], m.Symbols[iNdEx])
i = encodeVarintTypes(dAtA, i, uint64(len(m.Symbols[iNdEx])))
i--
dAtA[i] = 0x22
}
}
return len(dAtA) - i, nil
}
// OptimizedMarshalToSizedBuffer is mostly a copy of the generated MarshalToSizedBuffer,
// but marshals m.LabelsRefs in place without extra allocations.
func (m *TimeSeries) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
if m.CreatedTimestamp != 0 {
i = encodeVarintTypes(dAtA, i, uint64(m.CreatedTimestamp))
i--
dAtA[i] = 0x30
}
{
size, err := m.Metadata.MarshalToSizedBufferVT(dAtA[:i]) // Adjusted for vtproto
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x2a
if len(m.Histograms) > 0 {
for iNdEx := len(m.Histograms) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Histograms[iNdEx].MarshalToSizedBufferVT(dAtA[:i]) // Adjusted for vtproto
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x1a
}
}
if len(m.Exemplars) > 0 {
for iNdEx := len(m.Exemplars) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Exemplars[iNdEx].MarshalToSizedBufferVT(dAtA[:i]) // Adjusted for vtproto
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x22
}
}
if len(m.Samples) > 0 {
for iNdEx := len(m.Samples) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Samples[iNdEx].MarshalToSizedBufferVT(dAtA[:i]) // Adjusted for vtproto
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x12
}
}
if len(m.LabelsRefs) > 0 {
var j10 int
start := i
for _, num := range m.LabelsRefs {
for num >= 1<<7 {
dAtA[i-1] = uint8(uint64(num)&0x7f | 0x80)
num >>= 7
i--
j10++
}
dAtA[i-1] = uint8(num)
i--
j10++
}
slices.Reverse(dAtA[i:start])
i = encodeVarintTypes(dAtA, i, uint64(j10))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func encodeVarintTypes(dAtA []byte, offset int, v uint64) int {
offset -= sovTypes(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func sovTypes(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
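sovTypes above is the varint size formula shared by the generated marshalers: a uint64 takes one byte per started group of 7 bits, hence (bits.Len64(x|1)+6)/7. A quick standalone check of that formula against the standard library's varint encoder:

package main

import (
	"encoding/binary"
	"fmt"
	"math/bits"
)

func sovTypes(x uint64) int { return (bits.Len64(x|1) + 6) / 7 }

func main() {
	buf := make([]byte, binary.MaxVarintLen64)
	for _, x := range []uint64{0, 1, 127, 128, 300, 1 << 21, 1<<63 - 1} {
		n := binary.PutUvarint(buf, x) // bytes the stdlib encoder actually emits
		fmt.Printf("x=%d  sovTypes=%d  PutUvarint=%d\n", x, sovTypes(x), n)
	}
}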


@ -1,97 +0,0 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package writev2
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestOptimizedMarshal(t *testing.T) {
for _, tt := range []struct {
name string
m *Request
}{
{
name: "empty",
m: &Request{},
},
{
name: "simple",
m: &Request{
Timeseries: []TimeSeries{
{
LabelsRefs: []uint32{
0, 1,
2, 3,
4, 5,
6, 7,
8, 9,
10, 11,
12, 13,
14, 15,
},
Samples: []Sample{{Value: 1, Timestamp: 0}},
Exemplars: []Exemplar{{LabelsRefs: []uint32{0, 1}, Value: 1, Timestamp: 0}},
Histograms: nil,
},
{
LabelsRefs: []uint32{
0, 1,
2, 3,
4, 5,
6, 7,
8, 9,
10, 11,
12, 13,
14, 15,
},
Samples: []Sample{{Value: 2, Timestamp: 1}},
Exemplars: []Exemplar{{LabelsRefs: []uint32{0, 1}, Value: 2, Timestamp: 1}},
Histograms: nil,
},
},
Symbols: []string{
"a", "b",
"c", "d",
"e", "f",
"g", "h",
"i", "j",
"k", "l",
"m", "n",
"o", "p",
},
},
},
} {
t.Run(tt.name, func(t *testing.T) {
// Keep the slice allocated to mimic what std Marshal
// would give to sized Marshal.
got := make([]byte, 0)
// Should be the same as the standard marshal.
expected, err := tt.m.Marshal()
require.NoError(t, err)
got, err = tt.m.OptimizedMarshal(got)
require.NoError(t, err)
require.Equal(t, expected, got)
// Unmarshal should work too.
m := &Request{}
require.NoError(t, m.Unmarshal(got))
require.Equal(t, tt.m, m)
})
}
}
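The fixture above only makes sense together with the writev2 symbol-table layout used throughout this diff: LabelsRefs holds name/value index pairs into Symbols, and the remote-write fixtures reserve index 0 for the empty string. A tiny sketch resolving such references by hand (the symbol values here are made up):

package main

import "fmt"

func main() {
	symbols := []string{"", "__name__", "metric1", "job", "demo"} // index 0 is the empty string
	labelsRefs := []uint32{1, 2, 3, 4}                            // pairs: __name__="metric1", job="demo"
	for i := 0; i+1 < len(labelsRefs); i += 2 {
		fmt.Printf("%s=%q\n", symbols[labelsRefs[i]], symbols[labelsRefs[i+1]])
	}
}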


@ -17,8 +17,8 @@ import (
"testing"
"time"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"github.com/prometheus/prometheus/prompb"
)
@ -37,9 +37,9 @@ func TestInteropV2UnmarshalWithV1_DeterministicEmpty(t *testing.T) {
{
incoming: &Request{
Symbols: []string{"", "__name__", "metric1"},
Timeseries: []TimeSeries{
Timeseries: []*TimeSeries{
{LabelsRefs: []uint32{1, 2}},
{Samples: []Sample{{Value: 21.4, Timestamp: time.Now().UnixMilli()}}},
{Samples: []*Sample{{Value: 21.4, Timestamp: time.Now().UnixMilli()}}},
}, // NOTE: Without reserved fields, proto: illegal wireType 7
},
},
@ -52,10 +52,6 @@ func TestInteropV2UnmarshalWithV1_DeterministicEmpty(t *testing.T) {
out := &prompb.WriteRequest{}
require.NoError(t, proto.Unmarshal(in, out))
// Drop unknowns, we expect them when incoming payload had some fields.
// This field & method will be likely gone after gogo removal.
out.XXX_unrecognized = nil // NOTE: out.XXX_DiscardUnknown() does not work with nullables.
require.Equal(t, expectedV1Empty, out)
})
}
@ -69,10 +65,10 @@ func TestInteropV1UnmarshalWithV2_DeterministicEmpty(t *testing.T) {
},
{
incoming: &prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
Timeseries: []*prompb.TimeSeries{
{
Labels: []prompb.Label{{Name: "__name__", Value: "metric1"}},
Samples: []prompb.Sample{{Value: 21.4, Timestamp: time.Now().UnixMilli()}},
Labels: []*prompb.Label{{Name: "__name__", Value: "metric1"}},
Samples: []*prompb.Sample{{Value: 21.4, Timestamp: time.Now().UnixMilli()}},
},
},
},
@ -87,10 +83,6 @@ func TestInteropV1UnmarshalWithV2_DeterministicEmpty(t *testing.T) {
out := &Request{}
require.NoError(t, proto.Unmarshal(in, out))
// Drop unknowns, we expect them when incoming payload had some fields.
// This field & method will be likely gone after gogo removal.
out.XXX_unrecognized = nil // NOTE: out.XXX_DiscardUnknown() does not work with nullables.
require.Equal(t, expectedV2Empty, out)
})
}
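The deleted XXX_unrecognized lines have no direct equivalent after the migration: google.golang.org/protobuf keeps unrecognized fields behind the reflection API instead of an exported struct field. A sketch of the replacement access pattern, assuming a regenerated prompb.WriteRequest; the input bytes are a placeholder:

package main

import (
	"fmt"

	"google.golang.org/protobuf/proto"

	"github.com/prometheus/prometheus/prompb"
)

func main() {
	in := []byte{} // placeholder for a payload that may carry fields this schema does not know
	out := &prompb.WriteRequest{}
	if err := proto.Unmarshal(in, out); err != nil {
		panic(err)
	}
	unknown := out.ProtoReflect().GetUnknown() // raw wire bytes of unrecognized fields
	out.ProtoReflect().SetUnknown(nil)         // drop them, much like the removed XXX_unrecognized = nil
	fmt.Printf("dropped %d unknown bytes\n", len(unknown))
}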


@ -30,7 +30,7 @@ func TestToLabels(t *testing.T) {
expected := labels.FromStrings("__name__", "metric1", "foo", "bar")
t.Run("v1", func(t *testing.T) {
ts := prompb.TimeSeries{Labels: []prompb.Label{{Name: "__name__", Value: "metric1"}, {Name: "foo", Value: "bar"}}}
ts := prompb.TimeSeries{Labels: []*prompb.Label{{Name: "__name__", Value: "metric1"}, {Name: "foo", Value: "bar"}}}
b := labels.NewScratchBuilder(2)
require.Equal(t, expected, ts.ToLabels(&b, nil))
require.Equal(t, ts.Labels, prompb.FromLabels(expected, nil))
@ -86,41 +86,41 @@ func TestToMetadata(t *testing.T) {
sym := writev2.NewSymbolTable()
for _, tc := range []struct {
input writev2.Metadata
expected metadata.Metadata
input *writev2.Metadata
expected *metadata.Metadata
}{
{
input: writev2.Metadata{},
expected: metadata.Metadata{
input: &writev2.Metadata{},
expected: &metadata.Metadata{
Type: model.MetricTypeUnknown,
},
},
{
input: writev2.Metadata{
input: &writev2.Metadata{
Type: 12414, // Unknown.
},
expected: metadata.Metadata{
expected: &metadata.Metadata{
Type: model.MetricTypeUnknown,
},
},
{
input: writev2.Metadata{
input: &writev2.Metadata{
Type: writev2.Metadata_METRIC_TYPE_COUNTER,
HelpRef: sym.Symbolize("help1"),
UnitRef: sym.Symbolize("unit1"),
},
expected: metadata.Metadata{
expected: &metadata.Metadata{
Type: model.MetricTypeCounter,
Help: "help1",
Unit: "unit1",
},
},
{
input: writev2.Metadata{
input: &writev2.Metadata{
Type: writev2.Metadata_METRIC_TYPE_STATESET,
HelpRef: sym.Symbolize("help2"),
},
expected: metadata.Metadata{
expected: &metadata.Metadata{
Type: model.MetricTypeStateset,
Help: "help2",
},


@ -17,7 +17,7 @@ import (
"bytes"
"encoding/binary"
"github.com/gogo/protobuf/proto"
"google.golang.org/protobuf/proto"
// Intentionally using client model to simulate client in tests.
dto "github.com/prometheus/client_model/go"


@ -24,9 +24,9 @@ import (
"sync"
"testing"
"github.com/gogo/protobuf/proto"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"


@ -26,11 +26,11 @@ import (
"time"
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"gopkg.in/yaml.v2"


@ -32,13 +32,13 @@ import (
"time"
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/google/go-cmp/cmp"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"


@ -18,7 +18,6 @@
package tools
import (
_ "github.com/gogo/protobuf/protoc-gen-gogofast"
_ "github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway"
_ "github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger"
_ "golang.org/x/tools/cmd/goimports"


@ -23,7 +23,7 @@ import (
"io"
"net/http"
"github.com/gogo/protobuf/proto"
"google.golang.org/protobuf/proto"
)
// DefaultChunkedReadLimit is the default value for the maximum size of the protobuf frame client allows.


@ -23,7 +23,6 @@ import (
"strings"
"time"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/client_golang/prometheus"
config_util "github.com/prometheus/common/config"
@ -33,6 +32,7 @@ import (
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/proto"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/prompb"


@ -24,10 +24,10 @@ import (
"sort"
"sync"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/common/model"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"google.golang.org/protobuf/proto"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"


@ -20,9 +20,9 @@ import (
"testing"
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
@ -50,30 +50,30 @@ var (
}
writeRequestFixture = &prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
Timeseries: []*prompb.TimeSeries{
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: "__name__", Value: "test_metric1"},
{Name: "b", Value: "c"},
{Name: "baz", Value: "qux"},
{Name: "d", Value: "e"},
{Name: "foo", Value: "bar"},
},
Samples: []prompb.Sample{{Value: 1, Timestamp: 1}},
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 1}},
Histograms: []prompb.Histogram{prompb.FromIntHistogram(1, &testHistogram), prompb.FromFloatHistogram(2, testHistogram.ToFloat(nil))},
Samples: []*prompb.Sample{{Value: 1, Timestamp: 1}},
Exemplars: []*prompb.Exemplar{{Labels: []*prompb.Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 1}},
Histograms: []*prompb.Histogram{prompb.FromIntHistogram(1, &testHistogram), prompb.FromFloatHistogram(2, testHistogram.ToFloat(nil))},
},
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: "__name__", Value: "test_metric1"},
{Name: "b", Value: "c"},
{Name: "baz", Value: "qux"},
{Name: "d", Value: "e"},
{Name: "foo", Value: "bar"},
},
Samples: []prompb.Sample{{Value: 2, Timestamp: 2}},
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 2}},
Histograms: []prompb.Histogram{prompb.FromIntHistogram(3, &testHistogram), prompb.FromFloatHistogram(4, testHistogram.ToFloat(nil))},
Samples: []*prompb.Sample{{Value: 2, Timestamp: 2}},
Exemplars: []*prompb.Exemplar{{Labels: []*prompb.Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 2}},
Histograms: []*prompb.Histogram{prompb.FromIntHistogram(3, &testHistogram), prompb.FromFloatHistogram(4, testHistogram.ToFloat(nil))},
},
},
}
@ -93,30 +93,30 @@ var (
// NOTE: Use TestWriteV2RequestFixture and copy the diff to regenerate if needed.
writeV2RequestFixture = &writev2.Request{
Symbols: []string{"", "__name__", "test_metric1", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"},
Timeseries: []writev2.TimeSeries{
Timeseries: []*writev2.TimeSeries{
{
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Symbolized writeRequestFixture.Timeseries[0].Labels
Metadata: writev2.Metadata{
Metadata: &writev2.Metadata{
Type: writev2.Metadata_METRIC_TYPE_GAUGE, // writeV2RequestSeries1Metadata.Type.
HelpRef: 15, // Symbolized writeV2RequestSeries1Metadata.Help.
UnitRef: 16, // Symbolized writeV2RequestSeries1Metadata.Unit.
},
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 1}},
Histograms: []writev2.Histogram{writev2.FromIntHistogram(1, &testHistogram), writev2.FromFloatHistogram(2, testHistogram.ToFloat(nil))},
Samples: []*writev2.Sample{{Value: 1, Timestamp: 1}},
Exemplars: []*writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 1}},
Histograms: []*writev2.Histogram{writev2.FromIntHistogram(1, &testHistogram), writev2.FromFloatHistogram(2, testHistogram.ToFloat(nil))},
},
{
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Same series as first.
Metadata: writev2.Metadata{
Metadata: &writev2.Metadata{
Type: writev2.Metadata_METRIC_TYPE_COUNTER, // writeV2RequestSeries2Metadata.Type.
HelpRef: 17, // Symbolized writeV2RequestSeries2Metadata.Help.
// No unit.
},
Samples: []writev2.Sample{{Value: 2, Timestamp: 2}},
Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{13, 14}, Value: 2, Timestamp: 2}},
Histograms: []writev2.Histogram{writev2.FromIntHistogram(3, &testHistogram), writev2.FromFloatHistogram(4, testHistogram.ToFloat(nil))},
Samples: []*writev2.Sample{{Value: 2, Timestamp: 2}},
Exemplars: []*writev2.Exemplar{{LabelsRefs: []uint32{13, 14}, Value: 2, Timestamp: 2}},
Histograms: []*writev2.Histogram{writev2.FromIntHistogram(3, &testHistogram), writev2.FromFloatHistogram(4, testHistogram.ToFloat(nil))},
},
},
}
@ -130,28 +130,28 @@ func TestWriteV2RequestFixture(t *testing.T) {
exemplar1LabelRefs := st.SymbolizeLabels(writeRequestFixture.Timeseries[0].Exemplars[0].ToExemplar(&b, nil).Labels, nil)
exemplar2LabelRefs := st.SymbolizeLabels(writeRequestFixture.Timeseries[1].Exemplars[0].ToExemplar(&b, nil).Labels, nil)
expected := &writev2.Request{
Timeseries: []writev2.TimeSeries{
Timeseries: []*writev2.TimeSeries{
{
LabelsRefs: labelRefs,
Metadata: writev2.Metadata{
Metadata: &writev2.Metadata{
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
HelpRef: st.Symbolize(writeV2RequestSeries1Metadata.Help),
UnitRef: st.Symbolize(writeV2RequestSeries1Metadata.Unit),
},
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar1LabelRefs, Value: 1, Timestamp: 1}},
Histograms: []writev2.Histogram{writev2.FromIntHistogram(1, &testHistogram), writev2.FromFloatHistogram(2, testHistogram.ToFloat(nil))},
Samples: []*writev2.Sample{{Value: 1, Timestamp: 1}},
Exemplars: []*writev2.Exemplar{{LabelsRefs: exemplar1LabelRefs, Value: 1, Timestamp: 1}},
Histograms: []*writev2.Histogram{writev2.FromIntHistogram(1, &testHistogram), writev2.FromFloatHistogram(2, testHistogram.ToFloat(nil))},
},
{
LabelsRefs: labelRefs,
Metadata: writev2.Metadata{
Metadata: &writev2.Metadata{
Type: writev2.Metadata_METRIC_TYPE_COUNTER,
HelpRef: st.Symbolize(writeV2RequestSeries2Metadata.Help),
// No unit.
},
Samples: []writev2.Sample{{Value: 2, Timestamp: 2}},
Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar2LabelRefs, Value: 2, Timestamp: 2}},
Histograms: []writev2.Histogram{writev2.FromIntHistogram(3, &testHistogram), writev2.FromFloatHistogram(4, testHistogram.ToFloat(nil))},
Samples: []*writev2.Sample{{Value: 2, Timestamp: 2}},
Exemplars: []*writev2.Exemplar{{LabelsRefs: exemplar2LabelRefs, Value: 2, Timestamp: 2}},
Histograms: []*writev2.Histogram{writev2.FromIntHistogram(3, &testHistogram), writev2.FromFloatHistogram(4, testHistogram.ToFloat(nil))},
},
},
Symbols: st.Symbols(),
@ -162,12 +162,12 @@ func TestWriteV2RequestFixture(t *testing.T) {
func TestValidateLabelsAndMetricName(t *testing.T) {
tests := []struct {
input []prompb.Label
input []*prompb.Label
expectedErr string
description string
}{
{
input: []prompb.Label{
input: []*prompb.Label{
{Name: "__name__", Value: "name"},
{Name: "labelName", Value: "labelValue"},
},
@ -175,7 +175,7 @@ func TestValidateLabelsAndMetricName(t *testing.T) {
description: "regular labels",
},
{
input: []prompb.Label{
input: []*prompb.Label{
{Name: "__name__", Value: "name"},
{Name: "_labelName", Value: "labelValue"},
},
@ -183,7 +183,7 @@ func TestValidateLabelsAndMetricName(t *testing.T) {
description: "label name with _",
},
{
input: []prompb.Label{
input: []*prompb.Label{
{Name: "__name__", Value: "name"},
{Name: "@labelName", Value: "labelValue"},
},
@ -191,7 +191,7 @@ func TestValidateLabelsAndMetricName(t *testing.T) {
description: "label name with @",
},
{
input: []prompb.Label{
input: []*prompb.Label{
{Name: "__name__", Value: "name"},
{Name: "123labelName", Value: "labelValue"},
},
@ -199,7 +199,7 @@ func TestValidateLabelsAndMetricName(t *testing.T) {
description: "label name starts with numbers",
},
{
input: []prompb.Label{
input: []*prompb.Label{
{Name: "__name__", Value: "name"},
{Name: "", Value: "labelValue"},
},
@ -207,7 +207,7 @@ func TestValidateLabelsAndMetricName(t *testing.T) {
description: "label name is empty string",
},
{
input: []prompb.Label{
input: []*prompb.Label{
{Name: "__name__", Value: "name"},
{Name: "labelName", Value: string([]byte{0xff})},
},
@ -215,14 +215,14 @@ func TestValidateLabelsAndMetricName(t *testing.T) {
description: "label value is an invalid UTF-8 value",
},
{
input: []prompb.Label{
input: []*prompb.Label{
{Name: "__name__", Value: "@invalid_name"},
},
expectedErr: "invalid metric name: @invalid_name",
description: "metric name starts with @",
},
{
input: []prompb.Label{
input: []*prompb.Label{
{Name: "__name__", Value: "name1"},
{Name: "__name__", Value: "name2"},
},
@ -230,7 +230,7 @@ func TestValidateLabelsAndMetricName(t *testing.T) {
description: "duplicate label names",
},
{
input: []prompb.Label{
input: []*prompb.Label{
{Name: "label1", Value: "name"},
{Name: "label2", Value: "name"},
},
@ -238,7 +238,7 @@ func TestValidateLabelsAndMetricName(t *testing.T) {
description: "duplicate label values",
},
{
input: []prompb.Label{
input: []*prompb.Label{
{Name: "", Value: "name"},
{Name: "label2", Value: "name"},
},
@ -263,11 +263,11 @@ func TestValidateLabelsAndMetricName(t *testing.T) {
func TestConcreteSeriesSet(t *testing.T) {
series1 := &concreteSeries{
labels: labels.FromStrings("foo", "bar"),
floats: []prompb.Sample{{Value: 1, Timestamp: 2}},
floats: []*prompb.Sample{{Value: 1, Timestamp: 2}},
}
series2 := &concreteSeries{
labels: labels.FromStrings("foo", "baz"),
floats: []prompb.Sample{{Value: 3, Timestamp: 4}},
floats: []*prompb.Sample{{Value: 3, Timestamp: 4}},
}
c := &concreteSeriesSet{
series: []storage.Series{series1, series2},
@ -297,7 +297,7 @@ func TestConcreteSeriesClonesLabels(t *testing.T) {
func TestConcreteSeriesIterator_FloatSamples(t *testing.T) {
series := &concreteSeries{
labels: labels.FromStrings("foo", "bar"),
floats: []prompb.Sample{
floats: []*prompb.Sample{
{Value: 1, Timestamp: 1},
{Value: 1.5, Timestamp: 1},
{Value: 2, Timestamp: 2},
@ -345,7 +345,7 @@ func TestConcreteSeriesIterator_FloatSamples(t *testing.T) {
func TestConcreteSeriesIterator_HistogramSamples(t *testing.T) {
histograms := tsdbutil.GenerateTestHistograms(5)
histProtos := make([]prompb.Histogram, len(histograms))
histProtos := make([]*prompb.Histogram, len(histograms))
for i, h := range histograms {
// Results in ts sequence of 1, 1, 2, 3, 4.
var ts int64
@ -402,7 +402,7 @@ func TestConcreteSeriesIterator_FloatAndHistogramSamples(t *testing.T) {
// Series starts as histograms, then transitions to floats at ts=8 (with an overlap from ts=8 to ts=10), then
// transitions back to histograms at ts=16.
histograms := tsdbutil.GenerateTestHistograms(15)
histProtos := make([]prompb.Histogram, len(histograms))
histProtos := make([]*prompb.Histogram, len(histograms))
for i, h := range histograms {
if i < 10 {
histProtos[i] = prompb.FromIntHistogram(int64(i+1), h)
@ -412,7 +412,7 @@ func TestConcreteSeriesIterator_FloatAndHistogramSamples(t *testing.T) {
}
series := &concreteSeries{
labels: labels.FromStrings("foo", "bar"),
floats: []prompb.Sample{
floats: []*prompb.Sample{
{Value: 1, Timestamp: 8},
{Value: 2, Timestamp: 9},
{Value: 3, Timestamp: 10},
@ -504,11 +504,11 @@ func TestConcreteSeriesIterator_FloatAndHistogramSamples(t *testing.T) {
func TestFromQueryResultWithDuplicates(t *testing.T) {
ts1 := prompb.TimeSeries{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: "foo", Value: "bar"},
{Name: "foo", Value: "def"},
},
Samples: []prompb.Sample{
Samples: []*prompb.Sample{
{Value: 0.0, Timestamp: 0},
},
}
@ -554,17 +554,17 @@ func TestNegotiateResponseType(t *testing.T) {
func TestMergeLabels(t *testing.T) {
for _, tc := range []struct {
primary, secondary, expected []prompb.Label
primary, secondary, expected []*prompb.Label
}{
{
primary: []prompb.Label{{Name: "aaa", Value: "foo"}, {Name: "bbb", Value: "foo"}, {Name: "ddd", Value: "foo"}},
secondary: []prompb.Label{{Name: "bbb", Value: "bar"}, {Name: "ccc", Value: "bar"}},
expected: []prompb.Label{{Name: "aaa", Value: "foo"}, {Name: "bbb", Value: "foo"}, {Name: "ccc", Value: "bar"}, {Name: "ddd", Value: "foo"}},
primary: []*prompb.Label{{Name: "aaa", Value: "foo"}, {Name: "bbb", Value: "foo"}, {Name: "ddd", Value: "foo"}},
secondary: []*prompb.Label{{Name: "bbb", Value: "bar"}, {Name: "ccc", Value: "bar"}},
expected: []*prompb.Label{{Name: "aaa", Value: "foo"}, {Name: "bbb", Value: "foo"}, {Name: "ccc", Value: "bar"}, {Name: "ddd", Value: "foo"}},
},
{
primary: []prompb.Label{{Name: "bbb", Value: "bar"}, {Name: "ccc", Value: "bar"}},
secondary: []prompb.Label{{Name: "aaa", Value: "foo"}, {Name: "bbb", Value: "foo"}, {Name: "ddd", Value: "foo"}},
expected: []prompb.Label{{Name: "aaa", Value: "foo"}, {Name: "bbb", Value: "bar"}, {Name: "ccc", Value: "bar"}, {Name: "ddd", Value: "foo"}},
primary: []*prompb.Label{{Name: "bbb", Value: "bar"}, {Name: "ccc", Value: "bar"}},
secondary: []*prompb.Label{{Name: "aaa", Value: "foo"}, {Name: "bbb", Value: "foo"}, {Name: "ddd", Value: "foo"}},
expected: []*prompb.Label{{Name: "aaa", Value: "foo"}, {Name: "bbb", Value: "bar"}, {Name: "ccc", Value: "bar"}, {Name: "ddd", Value: "foo"}},
},
} {
require.Equal(t, tc.expected, MergeLabels(tc.primary, tc.secondary))
@ -592,21 +592,21 @@ func TestDecodeWriteV2Request(t *testing.T) {
func TestStreamResponse(t *testing.T) {
lbs1 := prompb.FromLabels(labels.FromStrings("instance", "localhost1", "job", "demo1"), nil)
lbs2 := prompb.FromLabels(labels.FromStrings("instance", "localhost2", "job", "demo2"), nil)
chunk := prompb.Chunk{
chunk := &prompb.Chunk{
Type: prompb.Chunk_XOR,
Data: make([]byte, 100),
}
lbSize, chunkSize := 0, chunk.Size()
lbSize, chunkSize := 0, chunk.SizeVT()
for _, lb := range lbs1 {
lbSize += lb.Size()
lbSize += lb.SizeVT()
}
maxBytesInFrame := lbSize + chunkSize*2
testData := []*prompb.ChunkedSeries{{
Labels: lbs1,
Chunks: []prompb.Chunk{chunk, chunk, chunk, chunk},
Chunks: []*prompb.Chunk{chunk, chunk, chunk, chunk},
}, {
Labels: lbs2,
Chunks: []prompb.Chunk{chunk, chunk, chunk, chunk},
Chunks: []*prompb.Chunk{chunk, chunk, chunk, chunk},
}}
css := newMockChunkSeriesSet(testData)
writer := mockWriter{}
@ -619,16 +619,16 @@ func TestStreamResponse(t *testing.T) {
require.NoError(t, err)
expectData := []*prompb.ChunkedSeries{{
Labels: lbs1,
Chunks: []prompb.Chunk{chunk, chunk},
Chunks: []*prompb.Chunk{chunk, chunk},
}, {
Labels: lbs1,
Chunks: []prompb.Chunk{chunk, chunk},
Chunks: []*prompb.Chunk{chunk, chunk},
}, {
Labels: lbs2,
Chunks: []prompb.Chunk{chunk, chunk},
Chunks: []*prompb.Chunk{chunk, chunk},
}, {
Labels: lbs2,
Chunks: []prompb.Chunk{chunk, chunk},
Chunks: []*prompb.Chunk{chunk, chunk},
}}
require.Equal(t, expectData, writer.actual)
}
@ -680,7 +680,7 @@ func (c *mockChunkSeriesSet) Err() error {
}
type mockChunkIterator struct {
chunks []prompb.Chunk
chunks []*prompb.Chunk
index int
}
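The Size() call above becomes SizeVT(), matching the vtprotobuf naming used elsewhere in this commit; both report the encoded length so the streaming code can budget frame sizes up front. A minimal sketch, assuming the regenerated prompb.Chunk exposes SizeVT as this hunk uses it:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/prompb"
)

func main() {
	chunk := &prompb.Chunk{Type: prompb.Chunk_XOR, Data: make([]byte, 100)}
	n := chunk.SizeVT() // encoded size of this message in bytes
	fmt.Println("two such chunks need roughly", 2*n, "bytes per frame, plus labels")
}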


@ -13,9 +13,10 @@
package prometheusremotewrite
import (
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"testing"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
"github.com/prometheus/prometheus/prompb"
@ -155,7 +156,9 @@ func TestCreateAttributes(t *testing.T) {
}
lbls := createAttributes(resource, attrs, settings, nil, false)
assert.ElementsMatch(t, lbls, tc.expectedLabels)
for i := range lbls {
require.True(t, proto.Equal(lbls[i], &tc.expectedLabels[i]))
}
})
}
}
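The loop above replaces assert.ElementsMatch with proto.Equal, which compares messages field by field; messages generated for google.golang.org/protobuf carry internal bookkeeping (size caches, unknown-field storage), so plain deep equality is not a reliable comparison. A minimal illustration, assuming the regenerated prompb.Label:

package main

import (
	"fmt"

	"google.golang.org/protobuf/proto"

	"github.com/prometheus/prometheus/prompb"
)

func main() {
	a := &prompb.Label{Name: "__name__", Value: "metric1"}
	b := &prompb.Label{Name: "__name__", Value: "metric1"}
	fmt.Println(proto.Equal(a, b)) // true: same field values, internal state ignored
}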


@ -24,7 +24,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/golang/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
@ -1537,6 +1537,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
batchQueue := queue.Chan()
pendingData := make([]*prompb.TimeSeries, max)
for i := range pendingData {
pendingData[i] = &prompb.TimeSeries{}
pendingData[i].Samples = []*prompb.Sample{{}}
if s.qm.sendExemplars {
pendingData[i].Exemplars = []*prompb.Exemplar{{}}
@ -1544,6 +1545,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
}
pendingDataV2 := make([]*writev2.TimeSeries, max)
for i := range pendingDataV2 {
pendingDataV2[i] = &writev2.TimeSeries{}
pendingDataV2[i].Samples = []*writev2.Sample{{}}
}
@ -1945,6 +1947,7 @@ func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries,
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
// todo: should we also safeguard against empty metadata here?
if d.metadata != nil {
pendingData[nPending].Metadata = &writev2.Metadata{}
pendingData[nPending].Metadata.Type = writev2.FromMetadataType(d.metadata.Type)
pendingData[nPending].Metadata.HelpRef = symbolTable.Symbolize(d.metadata.Help)
pendingData[nPending].Metadata.UnitRef = symbolTable.Symbolize(d.metadata.Unit)
@ -2201,7 +2204,7 @@ func buildV2WriteRequest(logger log.Logger, samples []*writev2.TimeSeries, label
pBuf = &[]byte{} // For convenience in tests. Not efficient.
}
data, err := req.OptimizedMarshal(*pBuf)
data, err := req.MarshalVT()
if err != nil {
return nil, highest, lowest, err
}
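The extra allocations added above (pendingData[i] = &prompb.TimeSeries{}, pendingDataV2[i] = &writev2.TimeSeries{}, and the new Metadata value) are needed because the buffers now hold pointers: make on a pointer slice yields nil elements, whereas the previous value slices gave usable zero values straight away. A small stand-alone illustration; the TimeSeries type here is a stand-in, not the prompb one:

package main

import "fmt"

// TimeSeries stands in for the prompb/writev2 message types.
type TimeSeries struct{ Samples []float64 }

func main() {
	byValue := make([]TimeSeries, 2)    // elements are usable zero values
	byPointer := make([]*TimeSeries, 2) // elements start out nil
	byValue[0].Samples = append(byValue[0].Samples, 1) // fine as-is
	fmt.Println(byPointer[0] == nil)                   // true: writing through it would panic
	for i := range byPointer {
		byPointer[i] = &TimeSeries{} // what the new loops do before reuse
	}
	byPointer[0].Samples = append(byPointer[0].Samples, 1)
	fmt.Println(len(byValue[0].Samples), len(byPointer[0].Samples))
}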


@ -29,7 +29,7 @@ import (
"time"
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/golang/protobuf/proto"
"github.com/golang/snappy"
"github.com/google/go-cmp/cmp"
"github.com/prometheus/client_golang/prometheus"
@ -803,7 +803,7 @@ func TestDisableReshardOnRetry(t *testing.T) {
}, time.Minute, retryAfter, "shouldReshard should have been re-enabled")
}
func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSeries) {
func createTimeseries(numSamples, numSeries int, extraLabels ...*labels.Label) ([]record.RefSample, []record.RefSeries) {
samples := make([]record.RefSample, 0, numSamples)
series := make([]record.RefSeries, 0, numSeries)
lb := labels.NewScratchBuilder(1 + len(extraLabels))
@ -834,16 +834,16 @@ func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([
return samples, series
}
func createProtoTimeseriesWithOld(numSamples, baseTs int64, extraLabels ...labels.Label) []prompb.TimeSeries {
samples := make([]prompb.TimeSeries, numSamples)
func createProtoTimeseriesWithOld(numSamples, baseTs int64, extraLabels ...*labels.Label) []*prompb.TimeSeries {
samples := make([]*prompb.TimeSeries, numSamples)
// use a fixed rand source so tests are consistent
r := rand.New(rand.NewSource(99))
for j := int64(0); j < numSamples; j++ {
name := fmt.Sprintf("test_metric_%d", j)
samples[j] = prompb.TimeSeries{
Labels: []prompb.Label{{Name: "__name__", Value: name}},
Samples: []prompb.Sample{
samples[j] = &prompb.TimeSeries{
Labels: []*prompb.Label{{Name: "__name__", Value: name}},
Samples: []*prompb.Sample{
{
Timestamp: baseTs + j,
Value: float64(j),
@ -947,15 +947,15 @@ func getSeriesIDFromRef(r record.RefSeries) string {
// TestWriteClient represents write client which does not call remote storage,
// but instead re-implements fake WriteHandler for test purposes.
type TestWriteClient struct {
receivedSamples map[string][]prompb.Sample
expectedSamples map[string][]prompb.Sample
receivedExemplars map[string][]prompb.Exemplar
expectedExemplars map[string][]prompb.Exemplar
receivedHistograms map[string][]prompb.Histogram
receivedFloatHistograms map[string][]prompb.Histogram
expectedHistograms map[string][]prompb.Histogram
expectedFloatHistograms map[string][]prompb.Histogram
receivedMetadata map[string][]prompb.MetricMetadata
receivedSamples map[string][]*prompb.Sample
expectedSamples map[string][]*prompb.Sample
receivedExemplars map[string][]*prompb.Exemplar
expectedExemplars map[string][]*prompb.Exemplar
receivedHistograms map[string][]*prompb.Histogram
receivedFloatHistograms map[string][]*prompb.Histogram
expectedHistograms map[string][]*prompb.Histogram
expectedFloatHistograms map[string][]*prompb.Histogram
receivedMetadata map[string][]*prompb.MetricMetadata
writesReceived int
mtx sync.Mutex
buf []byte
@ -972,9 +972,9 @@ type TestWriteClient struct {
// NewTestWriteClient creates a new testing write client.
func NewTestWriteClient(protoMsg config.RemoteWriteProtoMsg) *TestWriteClient {
return &TestWriteClient{
receivedSamples: map[string][]prompb.Sample{},
expectedSamples: map[string][]prompb.Sample{},
receivedMetadata: map[string][]prompb.MetricMetadata{},
receivedSamples: map[string][]*prompb.Sample{},
expectedSamples: map[string][]*prompb.Sample{},
receivedMetadata: map[string][]*prompb.MetricMetadata{},
protoMsg: protoMsg,
storeWait: 0,
returnError: nil,
@ -991,12 +991,12 @@ func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.R
c.mtx.Lock()
defer c.mtx.Unlock()
c.expectedSamples = map[string][]prompb.Sample{}
c.receivedSamples = map[string][]prompb.Sample{}
c.expectedSamples = map[string][]*prompb.Sample{}
c.receivedSamples = map[string][]*prompb.Sample{}
for _, s := range ss {
tsID := getSeriesIDFromRef(series[s.Ref])
c.expectedSamples[tsID] = append(c.expectedSamples[tsID], prompb.Sample{
c.expectedSamples[tsID] = append(c.expectedSamples[tsID], &prompb.Sample{
Timestamp: s.T,
Value: s.V,
})
@ -1007,12 +1007,12 @@ func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []reco
c.mtx.Lock()
defer c.mtx.Unlock()
c.expectedExemplars = map[string][]prompb.Exemplar{}
c.receivedExemplars = map[string][]prompb.Exemplar{}
c.expectedExemplars = map[string][]*prompb.Exemplar{}
c.receivedExemplars = map[string][]*prompb.Exemplar{}
for _, s := range ss {
tsID := getSeriesIDFromRef(series[s.Ref])
e := prompb.Exemplar{
e := &prompb.Exemplar{
Labels: prompb.FromLabels(s.Labels, nil),
Timestamp: s.T,
Value: s.V,
@ -1025,8 +1025,8 @@ func (c *TestWriteClient) expectHistograms(hh []record.RefHistogramSample, serie
c.mtx.Lock()
defer c.mtx.Unlock()
c.expectedHistograms = map[string][]prompb.Histogram{}
c.receivedHistograms = map[string][]prompb.Histogram{}
c.expectedHistograms = map[string][]*prompb.Histogram{}
c.receivedHistograms = map[string][]*prompb.Histogram{}
for _, h := range hh {
tsID := getSeriesIDFromRef(series[h.Ref])
@ -1038,8 +1038,8 @@ func (c *TestWriteClient) expectFloatHistograms(fhs []record.RefFloatHistogramSa
c.mtx.Lock()
defer c.mtx.Unlock()
c.expectedFloatHistograms = map[string][]prompb.Histogram{}
c.receivedFloatHistograms = map[string][]prompb.Histogram{}
c.expectedFloatHistograms = map[string][]*prompb.Histogram{}
c.receivedFloatHistograms = map[string][]*prompb.Histogram{}
for _, fh := range fhs {
tsID := getSeriesIDFromRef(series[fh.Ref])
@ -1193,24 +1193,26 @@ func (c *TestWriteClient) Endpoint() string {
func v2RequestToWriteRequest(v2Req *writev2.Request) (*prompb.WriteRequest, error) {
req := &prompb.WriteRequest{
Timeseries: make([]prompb.TimeSeries, len(v2Req.Timeseries)),
Timeseries: make([]*prompb.TimeSeries, len(v2Req.Timeseries)),
// TODO handle metadata?
}
b := labels.NewScratchBuilder(0)
for i, rts := range v2Req.Timeseries {
rts.ToLabels(&b, v2Req.Symbols).Range(func(l labels.Label) {
req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{
req.Timeseries[i] = &prompb.TimeSeries{}
req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, &prompb.Label{
Name: l.Name,
Value: l.Value,
})
})
exemplars := make([]prompb.Exemplar, len(rts.Exemplars))
exemplars := make([]*prompb.Exemplar, len(rts.Exemplars))
for j, e := range rts.Exemplars {
exemplars[j] = &prompb.Exemplar{}
exemplars[j].Value = e.Value
exemplars[j].Timestamp = e.Timestamp
e.ToExemplar(&b, v2Req.Symbols).Labels.Range(func(l labels.Label) {
exemplars[j].Labels = append(exemplars[j].Labels, prompb.Label{
exemplars[j].Labels = append(exemplars[j].Labels, &prompb.Label{
Name: l.Name,
Value: l.Value,
})
@ -1218,13 +1220,13 @@ func v2RequestToWriteRequest(v2Req *writev2.Request) (*prompb.WriteRequest, erro
}
req.Timeseries[i].Exemplars = exemplars
req.Timeseries[i].Samples = make([]prompb.Sample, len(rts.Samples))
req.Timeseries[i].Samples = make([]*prompb.Sample, len(rts.Samples))
for j, s := range rts.Samples {
req.Timeseries[i].Samples[j].Timestamp = s.Timestamp
req.Timeseries[i].Samples[j].Value = s.Value
}
req.Timeseries[i].Histograms = make([]prompb.Histogram, len(rts.Histograms))
req.Timeseries[i].Histograms = make([]*prompb.Histogram, len(rts.Histograms))
for j, h := range rts.Histograms {
if h.IsFloatHistogram() {
req.Timeseries[i].Histograms[j] = prompb.FromFloatHistogram(h.Timestamp, h.ToFloatHistogram())
@ -1289,7 +1291,7 @@ func (c *MockWriteClient) Name() string { return c.NameFunc() }
func (c *MockWriteClient) Endpoint() string { return c.EndpointFunc() }
// Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics.
var extraLabels []labels.Label = []labels.Label{
var extraLabels []*labels.Label = []*labels.Label{
{Name: "kubernetes_io_arch", Value: "amd64"},
{Name: "kubernetes_io_instance_type", Value: "c3.somesize"},
{Name: "kubernetes_io_os", Value: "linux"},
@ -1849,10 +1851,11 @@ func BenchmarkBuildWriteRequest(b *testing.B) {
noopLogger := log.NewNopLogger()
bench := func(b *testing.B, batch []timeSeries) {
buff := make([]byte, 0)
seriesBuff := make([]prompb.TimeSeries, len(batch))
seriesBuff := make([]*prompb.TimeSeries, len(batch))
for i := range seriesBuff {
seriesBuff[i].Samples = []prompb.Sample{{}}
seriesBuff[i].Exemplars = []prompb.Exemplar{{}}
seriesBuff[i] = &prompb.TimeSeries{}
seriesBuff[i].Samples = []*prompb.Sample{{}}
seriesBuff[i].Exemplars = []*prompb.Exemplar{{}}
}
pBuf := proto.NewBuffer(nil)
@ -1905,10 +1908,10 @@ func BenchmarkBuildV2WriteRequest(b *testing.B) {
for _, tc := range testCases {
symbolTable := writev2.NewSymbolTable()
buff := make([]byte, 0)
seriesBuff := make([]writev2.TimeSeries, len(tc.batch))
seriesBuff := make([]*writev2.TimeSeries, len(tc.batch))
for i := range seriesBuff {
seriesBuff[i].Samples = []writev2.Sample{{}}
seriesBuff[i].Exemplars = []writev2.Exemplar{{}}
seriesBuff[i].Samples = []*writev2.Sample{{}}
seriesBuff[i].Exemplars = []*writev2.Exemplar{{}}
}
pBuf := []byte{}
@ -1989,7 +1992,7 @@ func TestSendSamplesWithBackoffWithSampleAgeLimit(t *testing.T) {
m.Start()
batchID := 0
expectedSamples := map[string][]prompb.Sample{}
expectedSamples := map[string][]*prompb.Sample{}
appendData := func(numberOfSeries int, timeAdd time.Duration, shouldBeDropped bool) {
t.Log(">>>> Appending series ", numberOfSeries, " as batch ID ", batchID, " with timeAdd ", timeAdd, " and should be dropped ", shouldBeDropped)
@ -2000,7 +2003,7 @@ func TestSendSamplesWithBackoffWithSampleAgeLimit(t *testing.T) {
if !shouldBeDropped {
for _, s := range samples {
tsID := getSeriesIDFromRef(series[s.Ref])
expectedSamples[tsID] = append(c.expectedSamples[tsID], prompb.Sample{
expectedSamples[tsID] = append(c.expectedSamples[tsID], &prompb.Sample{
Timestamp: s.T,
Value: s.V,
})
@ -2097,15 +2100,15 @@ func createTimeseriesWithOldSamples(numSamples, numSeries int, extraLabels ...la
return samples, newSamples, series
}
func filterTsLimit(limit int64, ts prompb.TimeSeries) bool {
func filterTsLimit(limit int64, ts *prompb.TimeSeries) bool {
return limit > ts.Samples[0].Timestamp
}
func TestBuildTimeSeries(t *testing.T) {
testCases := []struct {
name string
ts []prompb.TimeSeries
filter func(ts prompb.TimeSeries) bool
ts []*prompb.TimeSeries
filter func(ts *prompb.TimeSeries) bool
lowestTs int64
highestTs int64
droppedSamples int
@ -2113,9 +2116,9 @@ func TestBuildTimeSeries(t *testing.T) {
}{
{
name: "No filter applied",
ts: []prompb.TimeSeries{
ts: []*prompb.TimeSeries{
{
Samples: []prompb.Sample{
Samples: []*prompb.Sample{
{
Timestamp: 1234567890,
Value: 1.23,
@ -2123,7 +2126,7 @@ func TestBuildTimeSeries(t *testing.T) {
},
},
{
Samples: []prompb.Sample{
Samples: []*prompb.Sample{
{
Timestamp: 1234567891,
Value: 2.34,
@ -2131,7 +2134,7 @@ func TestBuildTimeSeries(t *testing.T) {
},
},
{
Samples: []prompb.Sample{
Samples: []*prompb.Sample{
{
Timestamp: 1234567892,
Value: 3.34,
@ -2146,9 +2149,9 @@ func TestBuildTimeSeries(t *testing.T) {
},
{
name: "Filter applied, samples in order",
ts: []prompb.TimeSeries{
ts: []*prompb.TimeSeries{
{
Samples: []prompb.Sample{
Samples: []*prompb.Sample{
{
Timestamp: 1234567890,
Value: 1.23,
@ -2156,7 +2159,7 @@ func TestBuildTimeSeries(t *testing.T) {
},
},
{
Samples: []prompb.Sample{
Samples: []*prompb.Sample{
{
Timestamp: 1234567891,
Value: 2.34,
@ -2164,7 +2167,7 @@ func TestBuildTimeSeries(t *testing.T) {
},
},
{
Samples: []prompb.Sample{
Samples: []*prompb.Sample{
{
Timestamp: 1234567892,
Value: 3.45,
@ -2172,7 +2175,7 @@ func TestBuildTimeSeries(t *testing.T) {
},
},
{
Samples: []prompb.Sample{
Samples: []*prompb.Sample{
{
Timestamp: 1234567893,
Value: 3.45,
@ -2180,7 +2183,7 @@ func TestBuildTimeSeries(t *testing.T) {
},
},
},
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567892, ts) },
filter: func(ts *prompb.TimeSeries) bool { return filterTsLimit(1234567892, ts) },
responseLen: 2,
lowestTs: 1234567892,
highestTs: 1234567893,
@ -2188,9 +2191,9 @@ func TestBuildTimeSeries(t *testing.T) {
},
{
name: "Filter applied, samples out of order",
ts: []prompb.TimeSeries{
ts: []*prompb.TimeSeries{
{
Samples: []prompb.Sample{
Samples: []*prompb.Sample{
{
Timestamp: 1234567892,
Value: 3.45,
@ -2198,7 +2201,7 @@ func TestBuildTimeSeries(t *testing.T) {
},
},
{
Samples: []prompb.Sample{
Samples: []*prompb.Sample{
{
Timestamp: 1234567890,
Value: 1.23,
@ -2206,7 +2209,7 @@ func TestBuildTimeSeries(t *testing.T) {
},
},
{
Samples: []prompb.Sample{
Samples: []*prompb.Sample{
{
Timestamp: 1234567893,
Value: 3.45,
@ -2214,7 +2217,7 @@ func TestBuildTimeSeries(t *testing.T) {
},
},
{
Samples: []prompb.Sample{
Samples: []*prompb.Sample{
{
Timestamp: 1234567891,
Value: 2.34,
@ -2222,7 +2225,7 @@ func TestBuildTimeSeries(t *testing.T) {
},
},
},
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567892, ts) },
filter: func(ts *prompb.TimeSeries) bool { return filterTsLimit(1234567892, ts) },
responseLen: 2,
lowestTs: 1234567892,
highestTs: 1234567893,
@ -2230,9 +2233,9 @@ func TestBuildTimeSeries(t *testing.T) {
},
{
name: "Filter applied, samples not consecutive",
ts: []prompb.TimeSeries{
ts: []*prompb.TimeSeries{
{
Samples: []prompb.Sample{
Samples: []*prompb.Sample{
{
Timestamp: 1234567890,
Value: 1.23,
@ -2240,7 +2243,7 @@ func TestBuildTimeSeries(t *testing.T) {
},
},
{
Samples: []prompb.Sample{
Samples: []*prompb.Sample{
{
Timestamp: 1234567892,
Value: 3.45,
@ -2248,7 +2251,7 @@ func TestBuildTimeSeries(t *testing.T) {
},
},
{
Samples: []prompb.Sample{
Samples: []*prompb.Sample{
{
Timestamp: 1234567895,
Value: 6.78,
@ -2256,7 +2259,7 @@ func TestBuildTimeSeries(t *testing.T) {
},
},
{
Samples: []prompb.Sample{
Samples: []*prompb.Sample{
{
Timestamp: 1234567897,
Value: 6.78,
@ -2264,7 +2267,7 @@ func TestBuildTimeSeries(t *testing.T) {
},
},
},
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567895, ts) },
filter: func(ts *prompb.TimeSeries) bool { return filterTsLimit(1234567895, ts) },
responseLen: 2,
lowestTs: 1234567895,
highestTs: 1234567897,
@ -2288,7 +2291,7 @@ func TestBuildTimeSeries(t *testing.T) {
func BenchmarkBuildTimeSeries(b *testing.B) {
// Send one sample per series, which is the typical remote_write case
const numSamples = 10000
filter := func(ts prompb.TimeSeries) bool { return filterTsLimit(99, ts) }
filter := func(ts *prompb.TimeSeries) bool { return filterTsLimit(99, ts) }
for i := 0; i < b.N; i++ {
samples := createProtoTimeseriesWithOld(numSamples, 100, extraLabels...)
_, _, result, _, _, _ := buildTimeSeries(samples, filter)


@ -23,9 +23,9 @@ import (
"testing"
"time"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/labels"
@ -102,14 +102,14 @@ func TestSampledReadEndpoint(t *testing.T) {
require.Equal(t, &prompb.QueryResult{
Timeseries: []*prompb.TimeSeries{
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: "__name__", Value: "test_metric1"},
{Name: "b", Value: "c"},
{Name: "baz", Value: "qux"},
{Name: "d", Value: "e"},
{Name: "foo", Value: "bar"},
},
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
Samples: []*prompb.Sample{{Value: 1, Timestamp: 0}},
},
},
}, resp.Results[0])
@ -117,13 +117,13 @@ func TestSampledReadEndpoint(t *testing.T) {
require.Equal(t, &prompb.QueryResult{
Timeseries: []*prompb.TimeSeries{
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: "__name__", Value: "test_histogram_metric1"},
{Name: "b", Value: "c"},
{Name: "baz", Value: "qux"},
{Name: "d", Value: "e"},
},
Histograms: []prompb.Histogram{
Histograms: []*prompb.Histogram{
prompb.FromFloatHistogram(0, tsdbutil.GenerateTestFloatHistogram(0)),
},
},
@ -297,14 +297,14 @@ func TestStreamReadEndpoint(t *testing.T) {
{
ChunkedSeries: []*prompb.ChunkedSeries{
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: "__name__", Value: "test_metric1"},
{Name: "b", Value: "c"},
{Name: "baz", Value: "qux"},
{Name: "d", Value: "e"},
{Name: "foo", Value: "bar1"},
},
Chunks: []prompb.Chunk{
Chunks: []*prompb.Chunk{
{
Type: prompb.Chunk_XOR,
MaxTimeMs: 7140000,
@ -317,14 +317,14 @@ func TestStreamReadEndpoint(t *testing.T) {
{
ChunkedSeries: []*prompb.ChunkedSeries{
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: "__name__", Value: "test_metric1"},
{Name: "b", Value: "c"},
{Name: "baz", Value: "qux"},
{Name: "d", Value: "e"},
{Name: "foo", Value: "bar2"},
},
Chunks: []prompb.Chunk{
Chunks: []*prompb.Chunk{
{
Type: prompb.Chunk_XOR,
MaxTimeMs: 7140000,
@ -343,14 +343,14 @@ func TestStreamReadEndpoint(t *testing.T) {
{
ChunkedSeries: []*prompb.ChunkedSeries{
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: "__name__", Value: "test_metric1"},
{Name: "b", Value: "c"},
{Name: "baz", Value: "qux"},
{Name: "d", Value: "e"},
{Name: "foo", Value: "bar3"},
},
Chunks: []prompb.Chunk{
Chunks: []*prompb.Chunk{
{
Type: prompb.Chunk_XOR,
MaxTimeMs: 7140000,
@ -369,14 +369,14 @@ func TestStreamReadEndpoint(t *testing.T) {
{
ChunkedSeries: []*prompb.ChunkedSeries{
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: "__name__", Value: "test_metric1"},
{Name: "b", Value: "c"},
{Name: "baz", Value: "qux"},
{Name: "d", Value: "e"},
{Name: "foo", Value: "bar3"},
},
Chunks: []prompb.Chunk{
Chunks: []*prompb.Chunk{
{
Type: prompb.Chunk_XOR,
MinTimeMs: 14400000,
@ -390,14 +390,14 @@ func TestStreamReadEndpoint(t *testing.T) {
{
ChunkedSeries: []*prompb.ChunkedSeries{
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: "__name__", Value: "test_metric1"},
{Name: "b", Value: "c"},
{Name: "baz", Value: "qux"},
{Name: "d", Value: "e"},
{Name: "foo", Value: "bar1"},
},
Chunks: []prompb.Chunk{
Chunks: []*prompb.Chunk{
{
Type: prompb.Chunk_XOR,
MaxTimeMs: 7140000,
@ -411,13 +411,13 @@ func TestStreamReadEndpoint(t *testing.T) {
{
ChunkedSeries: []*prompb.ChunkedSeries{
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: "__name__", Value: "test_histogram_metric1"},
{Name: "b", Value: "c"},
{Name: "baz", Value: "qux"},
{Name: "d", Value: "e"},
},
Chunks: []prompb.Chunk{
Chunks: []*prompb.Chunk{
{
Type: prompb.Chunk_FLOAT_HISTOGRAM,
MaxTimeMs: 1440000,


@ -239,9 +239,9 @@ func TestSampleAndChunkQueryableClient(t *testing.T) {
m := &mockedRemoteClient{
// Samples does not matter for below tests.
store: []*prompb.TimeSeries{
{Labels: []prompb.Label{{Name: "a", Value: "b"}}},
{Labels: []prompb.Label{{Name: "a", Value: "b3"}, {Name: "region", Value: "us"}}},
{Labels: []prompb.Label{{Name: "a", Value: "b2"}, {Name: "region", Value: "europe"}}},
{Labels: []*prompb.Label{{Name: "a", Value: "b"}}},
{Labels: []*prompb.Label{{Name: "a", Value: "b3"}, {Name: "region", Value: "us"}}},
{Labels: []*prompb.Label{{Name: "a", Value: "b2"}, {Name: "region", Value: "europe"}}},
},
b: labels.NewScratchBuilder(0),
}


@ -24,10 +24,10 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/protobuf/proto"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/exemplar"


@ -28,9 +28,9 @@ import (
"time"
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/exemplar"
@ -305,7 +305,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
// V2 supports partial writes for non-retriable errors, so test them.
for _, tc := range []struct {
desc string
input []writev2.TimeSeries
input []*writev2.TimeSeries
expectedCode int
expectedRespBody string
@ -324,7 +324,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
desc: "Partial write; first series with invalid labels (no metric name)",
input: append(
// Series with test_metric1="test_metric1" labels.
[]writev2.TimeSeries{{LabelsRefs: []uint32{2, 2}, Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}}},
[]*writev2.TimeSeries{{LabelsRefs: []uint32{2, 2}, Samples: []*writev2.Sample{{Value: 1, Timestamp: 1}}}},
writeV2RequestFixture.Timeseries...),
expectedCode: http.StatusBadRequest,
expectedRespBody: "invalid metric name or labels, got {test_metric1=\"test_metric1\"}\n",
@@ -333,16 +333,16 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
desc: "Partial write; first series with invalid labels (empty metric name)",
input: append(
// Series with __name__="" labels.
[]writev2.TimeSeries{{LabelsRefs: []uint32{1, 0}, Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}}},
[]*writev2.TimeSeries{{LabelsRefs: []uint32{1, 0}, Samples: []*writev2.Sample{{Value: 1, Timestamp: 1}}}},
writeV2RequestFixture.Timeseries...),
expectedCode: http.StatusBadRequest,
expectedRespBody: "invalid metric name or labels, got {__name__=\"\"}\n",
},
{
desc: "Partial write; first series with one OOO sample",
input: func() []writev2.TimeSeries {
input: func() []*writev2.TimeSeries {
f := proto.Clone(writeV2RequestFixture).(*writev2.Request)
f.Timeseries[0].Samples = append(f.Timeseries[0].Samples, writev2.Sample{Value: 2, Timestamp: 0})
f.Timeseries[0].Samples = append(f.Timeseries[0].Samples, &writev2.Sample{Value: 2, Timestamp: 0})
return f.Timeseries
}(),
expectedCode: http.StatusBadRequest,
@@ -350,7 +350,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
},
{
desc: "Partial write; first series with one dup sample",
input: func() []writev2.TimeSeries {
input: func() []*writev2.TimeSeries {
f := proto.Clone(writeV2RequestFixture).(*writev2.Request)
f.Timeseries[0].Samples = append(f.Timeseries[0].Samples, f.Timeseries[0].Samples[0])
return f.Timeseries
@@ -360,7 +360,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
},
{
desc: "Partial write; first series with one OOO histogram sample",
input: func() []writev2.TimeSeries {
input: func() []*writev2.TimeSeries {
f := proto.Clone(writeV2RequestFixture).(*writev2.Request)
f.Timeseries[0].Histograms = append(f.Timeseries[0].Histograms, writev2.FromFloatHistogram(1, testHistogram.ToFloat(nil)))
return f.Timeseries
@@ -370,7 +370,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
},
{
desc: "Partial write; first series with one dup histogram sample",
input: func() []writev2.TimeSeries {
input: func() []*writev2.TimeSeries {
f := proto.Clone(writeV2RequestFixture).(*writev2.Request)
f.Timeseries[0].Histograms = append(f.Timeseries[0].Histograms, f.Timeseries[0].Histograms[1])
return f.Timeseries
@@ -525,9 +525,9 @@ func TestOutOfOrderSample_V1Message(t *testing.T) {
},
} {
t.Run(tc.Name, func(t *testing.T) {
payload, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Samples: []prompb.Sample{{Value: 1, Timestamp: tc.Timestamp}},
payload, _, _, err := buildWriteRequest(nil, []*prompb.TimeSeries{{
Labels: []*prompb.Label{{Name: "__name__", Value: "test_metric"}},
Samples: []*prompb.Sample{{Value: 1, Timestamp: tc.Timestamp}},
}}, nil, nil, nil, nil, "snappy")
require.NoError(t, err)
@@ -567,9 +567,9 @@ func TestOutOfOrderExemplar_V1Message(t *testing.T) {
for _, tc := range tests {
t.Run(tc.Name, func(t *testing.T) {
payload, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: tc.Timestamp}},
payload, _, _, err := buildWriteRequest(nil, []*prompb.TimeSeries{{
Labels: []*prompb.Label{{Name: "__name__", Value: "test_metric"}},
Exemplars: []*prompb.Exemplar{{Labels: []*prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: tc.Timestamp}},
}}, nil, nil, nil, nil, "snappy")
require.NoError(t, err)
@@ -605,9 +605,9 @@ func TestOutOfOrderHistogram_V1Message(t *testing.T) {
},
} {
t.Run(tc.Name, func(t *testing.T) {
payload, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Histograms: []prompb.Histogram{prompb.FromIntHistogram(tc.Timestamp, &testHistogram), prompb.FromFloatHistogram(1, testHistogram.ToFloat(nil))},
payload, _, _, err := buildWriteRequest(nil, []*prompb.TimeSeries{{
Labels: []*prompb.Label{{Name: "__name__", Value: "test_metric"}},
Histograms: []*prompb.Histogram{prompb.FromIntHistogram(tc.Timestamp, &testHistogram), prompb.FromFloatHistogram(1, testHistogram.ToFloat(nil))},
}}, nil, nil, nil, nil, "snappy")
require.NoError(t, err)
@@ -631,12 +631,12 @@ func BenchmarkRemoteWriteHandler(b *testing.B) {
var reqs []*http.Request
for i := 0; i < b.N; i++ {
num := strings.Repeat(strconv.Itoa(i), 16)
buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
Labels: []prompb.Label{
buf, _, _, err := buildWriteRequest(nil, []*prompb.TimeSeries{{
Labels: []*prompb.Label{
{Name: "__name__", Value: "test_metric"},
{Name: "test_label_name_" + num, Value: labelValue + num},
},
Histograms: []prompb.Histogram{prompb.FromIntHistogram(0, &testHistogram)},
Histograms: []*prompb.Histogram{prompb.FromIntHistogram(0, &testHistogram)},
}}, nil, nil, nil, nil, "snappy")
require.NoError(b, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
@@ -746,12 +746,12 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) {
}
}
func genSeriesWithSample(numSeries int, ts int64) []prompb.TimeSeries {
var series []prompb.TimeSeries
func genSeriesWithSample(numSeries int, ts int64) []*prompb.TimeSeries {
var series []*prompb.TimeSeries
for i := 0; i < numSeries; i++ {
s := prompb.TimeSeries{
Labels: []prompb.Label{{Name: "__name__", Value: fmt.Sprintf("test_metric_%d", i)}},
Samples: []prompb.Sample{{Value: float64(i), Timestamp: ts}},
s := &prompb.TimeSeries{
Labels: []*prompb.Label{{Name: "__name__", Value: fmt.Sprintf("test_metric_%d", i)}},
Samples: []*prompb.Sample{{Value: float64(i), Timestamp: ts}},
}
series = append(series, s)
}

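As a side note outside this commit: the partial-write cases above deep-copy the shared fixture before mutating it. With google.golang.org/protobuf, proto.Clone returns a proto.Message, so the concrete type has to be asserted back. A sketch with an assumed writev2 import path and a made-up helper name:

package remote_test // hypothetical package name

import (
    "google.golang.org/protobuf/proto"

    writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
)

// withExtraSample clones a request and appends one extra sample to its first series,
// mirroring the out-of-order cases above. Assumes the request has at least one series.
func withExtraSample(in *writev2.Request) []*writev2.TimeSeries {
    out := proto.Clone(in).(*writev2.Request) // Clone returns proto.Message; assert the concrete type
    out.Timeseries[0].Samples = append(out.Timeseries[0].Samples, &writev2.Sample{Value: 2, Timestamp: 0})
    return out.Timeseries
}
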
View file

@@ -87,7 +87,7 @@ func MetricFamiliesToWriteRequest(mf map[string]*dto.MetricFamily, extraLabels m
}
func toTimeseries(wr *prompb.WriteRequest, labels map[string]string, timestamp int64, value float64) {
var ts *prompb.TimeSeries
ts := &prompb.TimeSeries{}
ts.Labels = makeLabels(labels)
ts.Samples = []*prompb.Sample{
{

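For illustration only, not from this commit: as displayed, the removed form declares a nil pointer, and in Go assigning to fields of a nil struct pointer panics at run time, so the new form allocates the struct up front. A minimal sketch with a hypothetical helper:

package fmtutil // hypothetical package name

import "github.com/prometheus/prometheus/prompb"

// newSeries shows the allocation pattern: fields can only be set on a non-nil struct.
func newSeries(name string, value float64, timestamp int64) *prompb.TimeSeries {
    // var ts *prompb.TimeSeries   // nil pointer: ts.Labels = ... would panic
    ts := &prompb.TimeSeries{}     // allocated: safe to populate
    ts.Labels = []*prompb.Label{{Name: "__name__", Value: name}}
    ts.Samples = []*prompb.Sample{{Value: value, Timestamp: timestamp}}
    return ts
}
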
View file

@@ -23,7 +23,7 @@ import (
)
var writeRequestFixture = &prompb.WriteRequest{
Metadata: []prompb.MetricMetadata{
Metadata: []*prompb.MetricMetadata{
{
MetricFamilyName: "http_request_duration_seconds",
Type: 3,
@@ -45,111 +45,111 @@ var writeRequestFixture = &prompb.WriteRequest{
Help: "This is a test metric.",
},
},
Timeseries: []prompb.TimeSeries{
Timeseries: []*prompb.TimeSeries{
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: "__name__", Value: "http_request_duration_seconds_bucket"},
{Name: "job", Value: "promtool"},
{Name: "le", Value: "0.1"},
},
Samples: []prompb.Sample{{Value: 33444, Timestamp: 1}},
Samples: []*prompb.Sample{{Value: 33444, Timestamp: 1}},
},
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: "__name__", Value: "http_request_duration_seconds_bucket"},
{Name: "job", Value: "promtool"},
{Name: "le", Value: "0.5"},
},
Samples: []prompb.Sample{{Value: 129389, Timestamp: 1}},
Samples: []*prompb.Sample{{Value: 129389, Timestamp: 1}},
},
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: "__name__", Value: "http_request_duration_seconds_bucket"},
{Name: "job", Value: "promtool"},
{Name: "le", Value: "1"},
},
Samples: []prompb.Sample{{Value: 133988, Timestamp: 1}},
Samples: []*prompb.Sample{{Value: 133988, Timestamp: 1}},
},
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: "__name__", Value: "http_request_duration_seconds_bucket"},
{Name: "job", Value: "promtool"},
{Name: "le", Value: "+Inf"},
},
Samples: []prompb.Sample{{Value: 144320, Timestamp: 1}},
Samples: []*prompb.Sample{{Value: 144320, Timestamp: 1}},
},
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: "__name__", Value: "http_request_duration_seconds_sum"},
{Name: "job", Value: "promtool"},
},
Samples: []prompb.Sample{{Value: 53423, Timestamp: 1}},
Samples: []*prompb.Sample{{Value: 53423, Timestamp: 1}},
},
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: "__name__", Value: "http_request_duration_seconds_count"},
{Name: "job", Value: "promtool"},
},
Samples: []prompb.Sample{{Value: 144320, Timestamp: 1}},
Samples: []*prompb.Sample{{Value: 144320, Timestamp: 1}},
},
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: "__name__", Value: "http_requests_total"},
{Name: "code", Value: "200"},
{Name: "job", Value: "promtool"},
{Name: "method", Value: "post"},
},
Samples: []prompb.Sample{{Value: 1027, Timestamp: 1395066363000}},
Samples: []*prompb.Sample{{Value: 1027, Timestamp: 1395066363000}},
},
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: "__name__", Value: "http_requests_total"},
{Name: "code", Value: "400"},
{Name: "job", Value: "promtool"},
{Name: "method", Value: "post"},
},
Samples: []prompb.Sample{{Value: 3, Timestamp: 1395066363000}},
Samples: []*prompb.Sample{{Value: 3, Timestamp: 1395066363000}},
},
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: "__name__", Value: "rpc_duration_seconds"},
{Name: "job", Value: "promtool"},
{Name: "quantile", Value: "0.01"},
},
Samples: []prompb.Sample{{Value: 3102, Timestamp: 1}},
Samples: []*prompb.Sample{{Value: 3102, Timestamp: 1}},
},
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: "__name__", Value: "rpc_duration_seconds"},
{Name: "job", Value: "promtool"},
{Name: "quantile", Value: "0.5"},
},
Samples: []prompb.Sample{{Value: 4773, Timestamp: 1}},
Samples: []*prompb.Sample{{Value: 4773, Timestamp: 1}},
},
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: "__name__", Value: "rpc_duration_seconds"},
{Name: "job", Value: "promtool"},
{Name: "quantile", Value: "0.99"},
},
Samples: []prompb.Sample{{Value: 76656, Timestamp: 1}},
Samples: []*prompb.Sample{{Value: 76656, Timestamp: 1}},
},
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: "__name__", Value: "rpc_duration_seconds_sum"},
{Name: "job", Value: "promtool"},
},
Samples: []prompb.Sample{{Value: 1.7560473e+07, Timestamp: 1}},
Samples: []*prompb.Sample{{Value: 1.7560473e+07, Timestamp: 1}},
},
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: "__name__", Value: "rpc_duration_seconds_count"},
{Name: "job", Value: "promtool"},
},
Samples: []prompb.Sample{{Value: 2693, Timestamp: 1}},
Samples: []*prompb.Sample{{Value: 2693, Timestamp: 1}},
},
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: "__name__", Value: "test_metric1"},
{Name: "b", Value: "c"},
{Name: "baz", Value: "qux"},
@@ -157,10 +157,10 @@ var writeRequestFixture = &prompb.WriteRequest{
{Name: "foo", Value: "bar"},
{Name: "job", Value: "promtool"},
},
Samples: []prompb.Sample{{Value: 1, Timestamp: 1}},
Samples: []*prompb.Sample{{Value: 1, Timestamp: 1}},
},
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: "__name__", Value: "test_metric1"},
{Name: "b", Value: "c"},
{Name: "baz", Value: "qux"},
@@ -168,7 +168,7 @@ var writeRequestFixture = &prompb.WriteRequest{
{Name: "foo", Value: "bar"},
{Name: "job", Value: "promtool"},
},
Samples: []prompb.Sample{{Value: 2, Timestamp: 1}},
Samples: []*prompb.Sample{{Value: 2, Timestamp: 1}},
},
},
}

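Aside, not from this commit: a rough sketch of turning a fixture like writeRequestFixture above into a snappy-compressed remote-write payload. encodeWriteRequest is a made-up helper, and the prompb types are assumed to satisfy the new proto.Message interface.

package remote_test // hypothetical package name

import (
    "github.com/golang/snappy"
    "google.golang.org/protobuf/proto"

    "github.com/prometheus/prometheus/prompb"
)

// encodeWriteRequest proto-marshals a request and snappy-compresses the result.
func encodeWriteRequest(req *prompb.WriteRequest) ([]byte, error) {
    raw, err := proto.Marshal(req)
    if err != nil {
        return nil, err
    }
    return snappy.Encode(nil, raw), nil
}
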
View file

@@ -22,11 +22,11 @@ import (
"strings"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"google.golang.org/protobuf/proto"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"