mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-13 17:14:05 -08:00
3a82cd5a7e
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (0) (push) Waiting to run
CI / Build Prometheus for common architectures (1) (push) Waiting to run
CI / Build Prometheus for common architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (0) (push) Waiting to run
CI / Build Prometheus for all architectures (1) (push) Waiting to run
CI / Build Prometheus for all architectures (10) (push) Waiting to run
CI / Build Prometheus for all architectures (11) (push) Waiting to run
CI / Build Prometheus for all architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (3) (push) Waiting to run
CI / Build Prometheus for all architectures (4) (push) Waiting to run
CI / Build Prometheus for all architectures (5) (push) Waiting to run
CI / Build Prometheus for all architectures (6) (push) Waiting to run
CI / Build Prometheus for all architectures (7) (push) Waiting to run
CI / Build Prometheus for all architectures (8) (push) Waiting to run
CI / Build Prometheus for all architectures (9) (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run
* Add streaming remote read to ReadClient Signed-off-by: Justin Lei <justin.lei@grafana.com> * Apply suggestions from code review Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com> Signed-off-by: Justin Lei <justin.lei@grafana.com> * Remote read instrumentation tweaks Signed-off-by: Justin Lei <lei.justin@gmail.com> * Minor cleanups Signed-off-by: Justin Lei <lei.justin@gmail.com> * In-line handleChunkedResponse Signed-off-by: Justin Lei <lei.justin@gmail.com> * Fix lints Signed-off-by: Justin Lei <lei.justin@gmail.com> * Explicitly call cancel() when needed Signed-off-by: Justin Lei <lei.justin@gmail.com> * Update chunkedSeries, chunkedSeriesIterator for new interfaces Signed-off-by: Justin Lei <lei.justin@gmail.com> * Adapt remote.chunkedSeries to use prompb.ChunkedSeries Signed-off-by: Justin Lei <lei.justin@gmail.com> * Fix lint Signed-off-by: Justin Lei <lei.justin@gmail.com> --------- Signed-off-by: Justin Lei <justin.lei@grafana.com> Signed-off-by: Justin Lei <lei.justin@gmail.com> Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
977 lines
28 KiB
Go
977 lines
28 KiB
Go
// Copyright 2017 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package remote
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
"testing"
|
|
|
|
"github.com/go-kit/log"
|
|
"github.com/gogo/protobuf/proto"
|
|
"github.com/prometheus/common/model"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/prometheus/prometheus/config"
|
|
"github.com/prometheus/prometheus/model/histogram"
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
"github.com/prometheus/prometheus/model/metadata"
|
|
"github.com/prometheus/prometheus/prompb"
|
|
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
|
|
"github.com/prometheus/prometheus/storage"
|
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
|
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
|
"github.com/prometheus/prometheus/util/annotations"
|
|
)
|
|
|
|
var (
|
|
testHistogram = histogram.Histogram{
|
|
Schema: 2,
|
|
ZeroThreshold: 1e-128,
|
|
ZeroCount: 0,
|
|
Count: 0,
|
|
Sum: 20,
|
|
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}},
|
|
PositiveBuckets: []int64{1},
|
|
NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}},
|
|
NegativeBuckets: []int64{-1},
|
|
}
|
|
|
|
writeRequestFixture = &prompb.WriteRequest{
|
|
Timeseries: []prompb.TimeSeries{
|
|
{
|
|
Labels: []prompb.Label{
|
|
{Name: "__name__", Value: "test_metric1"},
|
|
{Name: "b", Value: "c"},
|
|
{Name: "baz", Value: "qux"},
|
|
{Name: "d", Value: "e"},
|
|
{Name: "foo", Value: "bar"},
|
|
},
|
|
Samples: []prompb.Sample{{Value: 1, Timestamp: 1}},
|
|
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 1}},
|
|
Histograms: []prompb.Histogram{prompb.FromIntHistogram(1, &testHistogram), prompb.FromFloatHistogram(2, testHistogram.ToFloat(nil))},
|
|
},
|
|
{
|
|
Labels: []prompb.Label{
|
|
{Name: "__name__", Value: "test_metric1"},
|
|
{Name: "b", Value: "c"},
|
|
{Name: "baz", Value: "qux"},
|
|
{Name: "d", Value: "e"},
|
|
{Name: "foo", Value: "bar"},
|
|
},
|
|
Samples: []prompb.Sample{{Value: 2, Timestamp: 2}},
|
|
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 2}},
|
|
Histograms: []prompb.Histogram{prompb.FromIntHistogram(3, &testHistogram), prompb.FromFloatHistogram(4, testHistogram.ToFloat(nil))},
|
|
},
|
|
},
|
|
}
|
|
|
|
writeV2RequestSeries1Metadata = metadata.Metadata{
|
|
Type: model.MetricTypeGauge,
|
|
Help: "Test gauge for test purposes",
|
|
Unit: "Maybe op/sec who knows (:",
|
|
}
|
|
writeV2RequestSeries2Metadata = metadata.Metadata{
|
|
Type: model.MetricTypeCounter,
|
|
Help: "Test counter for test purposes",
|
|
}
|
|
|
|
// writeV2RequestFixture represents the same request as writeRequestFixture,
|
|
// but using the v2 representation, plus includes writeV2RequestSeries1Metadata and writeV2RequestSeries2Metadata.
|
|
// NOTE: Use TestWriteV2RequestFixture and copy the diff to regenerate if needed.
|
|
writeV2RequestFixture = &writev2.Request{
|
|
Symbols: []string{"", "__name__", "test_metric1", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"},
|
|
Timeseries: []writev2.TimeSeries{
|
|
{
|
|
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Symbolized writeRequestFixture.Timeseries[0].Labels
|
|
Metadata: writev2.Metadata{
|
|
Type: writev2.Metadata_METRIC_TYPE_GAUGE, // writeV2RequestSeries1Metadata.Type.
|
|
|
|
HelpRef: 15, // Symbolized writeV2RequestSeries1Metadata.Help.
|
|
UnitRef: 16, // Symbolized writeV2RequestSeries1Metadata.Unit.
|
|
},
|
|
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
|
|
Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 1}},
|
|
Histograms: []writev2.Histogram{writev2.FromIntHistogram(1, &testHistogram), writev2.FromFloatHistogram(2, testHistogram.ToFloat(nil))},
|
|
},
|
|
{
|
|
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Same series as first.
|
|
Metadata: writev2.Metadata{
|
|
Type: writev2.Metadata_METRIC_TYPE_COUNTER, // writeV2RequestSeries2Metadata.Type.
|
|
|
|
HelpRef: 17, // Symbolized writeV2RequestSeries2Metadata.Help.
|
|
// No unit.
|
|
},
|
|
Samples: []writev2.Sample{{Value: 2, Timestamp: 2}},
|
|
Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{13, 14}, Value: 2, Timestamp: 2}},
|
|
Histograms: []writev2.Histogram{writev2.FromIntHistogram(3, &testHistogram), writev2.FromFloatHistogram(4, testHistogram.ToFloat(nil))},
|
|
},
|
|
},
|
|
}
|
|
)
|
|
|
|
func TestWriteV2RequestFixture(t *testing.T) {
|
|
// Generate dynamically writeV2RequestFixture, reusing v1 fixture elements.
|
|
st := writev2.NewSymbolTable()
|
|
b := labels.NewScratchBuilder(0)
|
|
labelRefs := st.SymbolizeLabels(writeRequestFixture.Timeseries[0].ToLabels(&b, nil), nil)
|
|
exemplar1LabelRefs := st.SymbolizeLabels(writeRequestFixture.Timeseries[0].Exemplars[0].ToExemplar(&b, nil).Labels, nil)
|
|
exemplar2LabelRefs := st.SymbolizeLabels(writeRequestFixture.Timeseries[1].Exemplars[0].ToExemplar(&b, nil).Labels, nil)
|
|
expected := &writev2.Request{
|
|
Timeseries: []writev2.TimeSeries{
|
|
{
|
|
LabelsRefs: labelRefs,
|
|
Metadata: writev2.Metadata{
|
|
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
|
|
HelpRef: st.Symbolize(writeV2RequestSeries1Metadata.Help),
|
|
UnitRef: st.Symbolize(writeV2RequestSeries1Metadata.Unit),
|
|
},
|
|
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
|
|
Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar1LabelRefs, Value: 1, Timestamp: 1}},
|
|
Histograms: []writev2.Histogram{writev2.FromIntHistogram(1, &testHistogram), writev2.FromFloatHistogram(2, testHistogram.ToFloat(nil))},
|
|
},
|
|
{
|
|
LabelsRefs: labelRefs,
|
|
Metadata: writev2.Metadata{
|
|
Type: writev2.Metadata_METRIC_TYPE_COUNTER,
|
|
HelpRef: st.Symbolize(writeV2RequestSeries2Metadata.Help),
|
|
// No unit.
|
|
},
|
|
Samples: []writev2.Sample{{Value: 2, Timestamp: 2}},
|
|
Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar2LabelRefs, Value: 2, Timestamp: 2}},
|
|
Histograms: []writev2.Histogram{writev2.FromIntHistogram(3, &testHistogram), writev2.FromFloatHistogram(4, testHistogram.ToFloat(nil))},
|
|
},
|
|
},
|
|
Symbols: st.Symbols(),
|
|
}
|
|
// Check if it matches static writeV2RequestFixture.
|
|
require.Equal(t, expected, writeV2RequestFixture)
|
|
}
|
|
|
|
func TestValidateLabelsAndMetricName(t *testing.T) {
|
|
tests := []struct {
|
|
input []prompb.Label
|
|
expectedErr string
|
|
description string
|
|
}{
|
|
{
|
|
input: []prompb.Label{
|
|
{Name: "__name__", Value: "name"},
|
|
{Name: "labelName", Value: "labelValue"},
|
|
},
|
|
expectedErr: "",
|
|
description: "regular labels",
|
|
},
|
|
{
|
|
input: []prompb.Label{
|
|
{Name: "__name__", Value: "name"},
|
|
{Name: "_labelName", Value: "labelValue"},
|
|
},
|
|
expectedErr: "",
|
|
description: "label name with _",
|
|
},
|
|
{
|
|
input: []prompb.Label{
|
|
{Name: "__name__", Value: "name"},
|
|
{Name: "@labelName", Value: "labelValue"},
|
|
},
|
|
expectedErr: "invalid label name: @labelName",
|
|
description: "label name with @",
|
|
},
|
|
{
|
|
input: []prompb.Label{
|
|
{Name: "__name__", Value: "name"},
|
|
{Name: "123labelName", Value: "labelValue"},
|
|
},
|
|
expectedErr: "invalid label name: 123labelName",
|
|
description: "label name starts with numbers",
|
|
},
|
|
{
|
|
input: []prompb.Label{
|
|
{Name: "__name__", Value: "name"},
|
|
{Name: "", Value: "labelValue"},
|
|
},
|
|
expectedErr: "invalid label name: ",
|
|
description: "label name is empty string",
|
|
},
|
|
{
|
|
input: []prompb.Label{
|
|
{Name: "__name__", Value: "name"},
|
|
{Name: "labelName", Value: string([]byte{0xff})},
|
|
},
|
|
expectedErr: "invalid label value: " + string([]byte{0xff}),
|
|
description: "label value is an invalid UTF-8 value",
|
|
},
|
|
{
|
|
input: []prompb.Label{
|
|
{Name: "__name__", Value: "@invalid_name"},
|
|
},
|
|
expectedErr: "invalid metric name: @invalid_name",
|
|
description: "metric name starts with @",
|
|
},
|
|
{
|
|
input: []prompb.Label{
|
|
{Name: "__name__", Value: "name1"},
|
|
{Name: "__name__", Value: "name2"},
|
|
},
|
|
expectedErr: "duplicate label with name: __name__",
|
|
description: "duplicate label names",
|
|
},
|
|
{
|
|
input: []prompb.Label{
|
|
{Name: "label1", Value: "name"},
|
|
{Name: "label2", Value: "name"},
|
|
},
|
|
expectedErr: "",
|
|
description: "duplicate label values",
|
|
},
|
|
{
|
|
input: []prompb.Label{
|
|
{Name: "", Value: "name"},
|
|
{Name: "label2", Value: "name"},
|
|
},
|
|
expectedErr: "invalid label name: ",
|
|
description: "don't report as duplicate label name",
|
|
},
|
|
}
|
|
|
|
for _, test := range tests {
|
|
t.Run(test.description, func(t *testing.T) {
|
|
err := validateLabelsAndMetricName(test.input)
|
|
if test.expectedErr != "" {
|
|
require.Error(t, err)
|
|
require.Equal(t, test.expectedErr, err.Error())
|
|
} else {
|
|
require.NoError(t, err)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestConcreteSeriesSet(t *testing.T) {
|
|
series1 := &concreteSeries{
|
|
labels: labels.FromStrings("foo", "bar"),
|
|
floats: []prompb.Sample{{Value: 1, Timestamp: 2}},
|
|
}
|
|
series2 := &concreteSeries{
|
|
labels: labels.FromStrings("foo", "baz"),
|
|
floats: []prompb.Sample{{Value: 3, Timestamp: 4}},
|
|
}
|
|
c := &concreteSeriesSet{
|
|
series: []storage.Series{series1, series2},
|
|
}
|
|
require.True(t, c.Next(), "Expected Next() to be true.")
|
|
require.Equal(t, series1, c.At(), "Unexpected series returned.")
|
|
require.True(t, c.Next(), "Expected Next() to be true.")
|
|
require.Equal(t, series2, c.At(), "Unexpected series returned.")
|
|
require.False(t, c.Next(), "Expected Next() to be false.")
|
|
}
|
|
|
|
func TestConcreteSeriesClonesLabels(t *testing.T) {
|
|
lbls := labels.FromStrings("a", "b", "c", "d")
|
|
cs := concreteSeries{
|
|
labels: lbls,
|
|
}
|
|
|
|
gotLabels := cs.Labels()
|
|
require.Equal(t, lbls, gotLabels)
|
|
|
|
gotLabels.CopyFrom(labels.FromStrings("a", "foo", "c", "foo"))
|
|
|
|
gotLabels = cs.Labels()
|
|
require.Equal(t, lbls, gotLabels)
|
|
}
|
|
|
|
func TestConcreteSeriesIterator_FloatSamples(t *testing.T) {
|
|
series := &concreteSeries{
|
|
labels: labels.FromStrings("foo", "bar"),
|
|
floats: []prompb.Sample{
|
|
{Value: 1, Timestamp: 1},
|
|
{Value: 1.5, Timestamp: 1},
|
|
{Value: 2, Timestamp: 2},
|
|
{Value: 3, Timestamp: 3},
|
|
{Value: 4, Timestamp: 4},
|
|
},
|
|
}
|
|
it := series.Iterator(nil)
|
|
|
|
// Seek to the first sample with ts=1.
|
|
require.Equal(t, chunkenc.ValFloat, it.Seek(1))
|
|
ts, v := it.At()
|
|
require.Equal(t, int64(1), ts)
|
|
require.Equal(t, 1., v)
|
|
|
|
// Seek one further, next sample still has ts=1.
|
|
require.Equal(t, chunkenc.ValFloat, it.Next())
|
|
ts, v = it.At()
|
|
require.Equal(t, int64(1), ts)
|
|
require.Equal(t, 1.5, v)
|
|
|
|
// Seek again to 1 and make sure we stay where we are.
|
|
require.Equal(t, chunkenc.ValFloat, it.Seek(1))
|
|
ts, v = it.At()
|
|
require.Equal(t, int64(1), ts)
|
|
require.Equal(t, 1.5, v)
|
|
|
|
// Another seek.
|
|
require.Equal(t, chunkenc.ValFloat, it.Seek(3))
|
|
ts, v = it.At()
|
|
require.Equal(t, int64(3), ts)
|
|
require.Equal(t, 3., v)
|
|
|
|
// And we don't go back.
|
|
require.Equal(t, chunkenc.ValFloat, it.Seek(2))
|
|
ts, v = it.At()
|
|
require.Equal(t, int64(3), ts)
|
|
require.Equal(t, 3., v)
|
|
|
|
// Seek beyond the end.
|
|
require.Equal(t, chunkenc.ValNone, it.Seek(5))
|
|
// And we don't go back. (This exposes issue #10027.)
|
|
require.Equal(t, chunkenc.ValNone, it.Seek(2))
|
|
}
|
|
|
|
func TestConcreteSeriesIterator_HistogramSamples(t *testing.T) {
|
|
histograms := tsdbutil.GenerateTestHistograms(5)
|
|
histProtos := make([]prompb.Histogram, len(histograms))
|
|
for i, h := range histograms {
|
|
// Results in ts sequence of 1, 1, 2, 3, 4.
|
|
var ts int64
|
|
if i == 0 {
|
|
ts = 1
|
|
} else {
|
|
ts = int64(i)
|
|
}
|
|
histProtos[i] = prompb.FromIntHistogram(ts, h)
|
|
}
|
|
series := &concreteSeries{
|
|
labels: labels.FromStrings("foo", "bar"),
|
|
histograms: histProtos,
|
|
}
|
|
it := series.Iterator(nil)
|
|
|
|
// Seek to the first sample with ts=1.
|
|
require.Equal(t, chunkenc.ValHistogram, it.Seek(1))
|
|
ts, v := it.AtHistogram(nil)
|
|
require.Equal(t, int64(1), ts)
|
|
require.Equal(t, histograms[0], v)
|
|
|
|
// Seek one further, next sample still has ts=1.
|
|
require.Equal(t, chunkenc.ValHistogram, it.Next())
|
|
ts, v = it.AtHistogram(nil)
|
|
require.Equal(t, int64(1), ts)
|
|
require.Equal(t, histograms[1], v)
|
|
|
|
// Seek again to 1 and make sure we stay where we are.
|
|
require.Equal(t, chunkenc.ValHistogram, it.Seek(1))
|
|
ts, v = it.AtHistogram(nil)
|
|
require.Equal(t, int64(1), ts)
|
|
require.Equal(t, histograms[1], v)
|
|
|
|
// Another seek.
|
|
require.Equal(t, chunkenc.ValHistogram, it.Seek(3))
|
|
ts, v = it.AtHistogram(nil)
|
|
require.Equal(t, int64(3), ts)
|
|
require.Equal(t, histograms[3], v)
|
|
|
|
// And we don't go back.
|
|
require.Equal(t, chunkenc.ValHistogram, it.Seek(2))
|
|
ts, v = it.AtHistogram(nil)
|
|
require.Equal(t, int64(3), ts)
|
|
require.Equal(t, histograms[3], v)
|
|
|
|
// Seek beyond the end.
|
|
require.Equal(t, chunkenc.ValNone, it.Seek(5))
|
|
// And we don't go back. (This exposes issue #10027.)
|
|
require.Equal(t, chunkenc.ValNone, it.Seek(2))
|
|
}
|
|
|
|
func TestConcreteSeriesIterator_FloatAndHistogramSamples(t *testing.T) {
|
|
// Series starts as histograms, then transitions to floats at ts=8 (with an overlap from ts=8 to ts=10), then
|
|
// transitions back to histograms at ts=16.
|
|
histograms := tsdbutil.GenerateTestHistograms(15)
|
|
histProtos := make([]prompb.Histogram, len(histograms))
|
|
for i, h := range histograms {
|
|
if i < 10 {
|
|
histProtos[i] = prompb.FromIntHistogram(int64(i+1), h)
|
|
} else {
|
|
histProtos[i] = prompb.FromIntHistogram(int64(i+6), h)
|
|
}
|
|
}
|
|
series := &concreteSeries{
|
|
labels: labels.FromStrings("foo", "bar"),
|
|
floats: []prompb.Sample{
|
|
{Value: 1, Timestamp: 8},
|
|
{Value: 2, Timestamp: 9},
|
|
{Value: 3, Timestamp: 10},
|
|
{Value: 4, Timestamp: 11},
|
|
{Value: 5, Timestamp: 12},
|
|
{Value: 6, Timestamp: 13},
|
|
{Value: 7, Timestamp: 14},
|
|
{Value: 8, Timestamp: 15},
|
|
},
|
|
histograms: histProtos,
|
|
}
|
|
it := series.Iterator(nil)
|
|
|
|
var (
|
|
ts int64
|
|
v float64
|
|
h *histogram.Histogram
|
|
fh *histogram.FloatHistogram
|
|
)
|
|
require.Equal(t, chunkenc.ValHistogram, it.Next())
|
|
ts, h = it.AtHistogram(nil)
|
|
require.Equal(t, int64(1), ts)
|
|
require.Equal(t, histograms[0], h)
|
|
|
|
require.Equal(t, chunkenc.ValHistogram, it.Next())
|
|
ts, h = it.AtHistogram(nil)
|
|
require.Equal(t, int64(2), ts)
|
|
require.Equal(t, histograms[1], h)
|
|
|
|
// Seek to the first float/histogram sample overlap at ts=8 (should prefer the float sample).
|
|
require.Equal(t, chunkenc.ValFloat, it.Seek(8))
|
|
ts, v = it.At()
|
|
require.Equal(t, int64(8), ts)
|
|
require.Equal(t, 1., v)
|
|
|
|
// Attempting to seek backwards should do nothing.
|
|
require.Equal(t, chunkenc.ValFloat, it.Seek(1))
|
|
ts, v = it.At()
|
|
require.Equal(t, int64(8), ts)
|
|
require.Equal(t, 1., v)
|
|
|
|
// Seeking to 8 again should also do nothing.
|
|
require.Equal(t, chunkenc.ValFloat, it.Seek(8))
|
|
ts, v = it.At()
|
|
require.Equal(t, int64(8), ts)
|
|
require.Equal(t, 1., v)
|
|
|
|
// Again, should prefer the float sample.
|
|
require.Equal(t, chunkenc.ValFloat, it.Next())
|
|
ts, v = it.At()
|
|
require.Equal(t, int64(9), ts)
|
|
require.Equal(t, 2., v)
|
|
|
|
// Seek to ts=11 where there are only float samples.
|
|
require.Equal(t, chunkenc.ValFloat, it.Seek(11))
|
|
ts, v = it.At()
|
|
require.Equal(t, int64(11), ts)
|
|
require.Equal(t, 4., v)
|
|
|
|
// Seek to ts=15 right before the transition back to histogram samples.
|
|
require.Equal(t, chunkenc.ValFloat, it.Seek(15))
|
|
ts, v = it.At()
|
|
require.Equal(t, int64(15), ts)
|
|
require.Equal(t, 8., v)
|
|
|
|
require.Equal(t, chunkenc.ValHistogram, it.Next())
|
|
ts, h = it.AtHistogram(nil)
|
|
require.Equal(t, int64(16), ts)
|
|
require.Equal(t, histograms[10], h)
|
|
|
|
// Getting a float histogram from an int histogram works.
|
|
require.Equal(t, chunkenc.ValHistogram, it.Next())
|
|
ts, fh = it.AtFloatHistogram(nil)
|
|
require.Equal(t, int64(17), ts)
|
|
expected := prompb.FromIntHistogram(int64(17), histograms[11]).ToFloatHistogram()
|
|
require.Equal(t, expected, fh)
|
|
|
|
// Keep calling Next() until the end.
|
|
for i := 0; i < 3; i++ {
|
|
require.Equal(t, chunkenc.ValHistogram, it.Next())
|
|
}
|
|
|
|
// The iterator is exhausted.
|
|
require.Equal(t, chunkenc.ValNone, it.Next())
|
|
require.Equal(t, chunkenc.ValNone, it.Next())
|
|
// Should also not be able to seek backwards again.
|
|
require.Equal(t, chunkenc.ValNone, it.Seek(1))
|
|
}
|
|
|
|
func TestFromQueryResultWithDuplicates(t *testing.T) {
|
|
ts1 := prompb.TimeSeries{
|
|
Labels: []prompb.Label{
|
|
{Name: "foo", Value: "bar"},
|
|
{Name: "foo", Value: "def"},
|
|
},
|
|
Samples: []prompb.Sample{
|
|
{Value: 0.0, Timestamp: 0},
|
|
},
|
|
}
|
|
|
|
res := prompb.QueryResult{
|
|
Timeseries: []*prompb.TimeSeries{
|
|
&ts1,
|
|
},
|
|
}
|
|
|
|
series := FromQueryResult(false, &res)
|
|
|
|
errSeries, isErrSeriesSet := series.(errSeriesSet)
|
|
|
|
require.True(t, isErrSeriesSet, "Expected resulting series to be an errSeriesSet")
|
|
errMessage := errSeries.Err().Error()
|
|
require.Equal(t, "duplicate label with name: foo", errMessage, fmt.Sprintf("Expected error to be from duplicate label, but got: %s", errMessage))
|
|
}
|
|
|
|
func TestNegotiateResponseType(t *testing.T) {
|
|
r, err := NegotiateResponseType([]prompb.ReadRequest_ResponseType{
|
|
prompb.ReadRequest_STREAMED_XOR_CHUNKS,
|
|
prompb.ReadRequest_SAMPLES,
|
|
})
|
|
require.NoError(t, err)
|
|
require.Equal(t, prompb.ReadRequest_STREAMED_XOR_CHUNKS, r)
|
|
|
|
r2, err := NegotiateResponseType([]prompb.ReadRequest_ResponseType{
|
|
prompb.ReadRequest_SAMPLES,
|
|
prompb.ReadRequest_STREAMED_XOR_CHUNKS,
|
|
})
|
|
require.NoError(t, err)
|
|
require.Equal(t, prompb.ReadRequest_SAMPLES, r2)
|
|
|
|
r3, err := NegotiateResponseType([]prompb.ReadRequest_ResponseType{})
|
|
require.NoError(t, err)
|
|
require.Equal(t, prompb.ReadRequest_SAMPLES, r3)
|
|
|
|
_, err = NegotiateResponseType([]prompb.ReadRequest_ResponseType{20})
|
|
require.Error(t, err, "expected error due to not supported requested response types")
|
|
require.Equal(t, "server does not support any of the requested response types: [20]; supported: map[SAMPLES:{} STREAMED_XOR_CHUNKS:{}]", err.Error())
|
|
}
|
|
|
|
func TestMergeLabels(t *testing.T) {
|
|
for _, tc := range []struct {
|
|
primary, secondary, expected []prompb.Label
|
|
}{
|
|
{
|
|
primary: []prompb.Label{{Name: "aaa", Value: "foo"}, {Name: "bbb", Value: "foo"}, {Name: "ddd", Value: "foo"}},
|
|
secondary: []prompb.Label{{Name: "bbb", Value: "bar"}, {Name: "ccc", Value: "bar"}},
|
|
expected: []prompb.Label{{Name: "aaa", Value: "foo"}, {Name: "bbb", Value: "foo"}, {Name: "ccc", Value: "bar"}, {Name: "ddd", Value: "foo"}},
|
|
},
|
|
{
|
|
primary: []prompb.Label{{Name: "bbb", Value: "bar"}, {Name: "ccc", Value: "bar"}},
|
|
secondary: []prompb.Label{{Name: "aaa", Value: "foo"}, {Name: "bbb", Value: "foo"}, {Name: "ddd", Value: "foo"}},
|
|
expected: []prompb.Label{{Name: "aaa", Value: "foo"}, {Name: "bbb", Value: "bar"}, {Name: "ccc", Value: "bar"}, {Name: "ddd", Value: "foo"}},
|
|
},
|
|
} {
|
|
require.Equal(t, tc.expected, MergeLabels(tc.primary, tc.secondary))
|
|
}
|
|
}
|
|
|
|
func TestDecodeWriteRequest(t *testing.T) {
|
|
buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil, "snappy")
|
|
require.NoError(t, err)
|
|
|
|
actual, err := DecodeWriteRequest(bytes.NewReader(buf))
|
|
require.NoError(t, err)
|
|
require.Equal(t, writeRequestFixture, actual)
|
|
}
|
|
|
|
func TestDecodeWriteV2Request(t *testing.T) {
|
|
buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeV2RequestFixture.Timeseries, writeV2RequestFixture.Symbols, nil, nil, nil, "snappy")
|
|
require.NoError(t, err)
|
|
|
|
actual, err := DecodeWriteV2Request(bytes.NewReader(buf))
|
|
require.NoError(t, err)
|
|
require.Equal(t, writeV2RequestFixture, actual)
|
|
}
|
|
|
|
func TestStreamResponse(t *testing.T) {
|
|
lbs1 := prompb.FromLabels(labels.FromStrings("instance", "localhost1", "job", "demo1"), nil)
|
|
lbs2 := prompb.FromLabels(labels.FromStrings("instance", "localhost2", "job", "demo2"), nil)
|
|
chunk := prompb.Chunk{
|
|
Type: prompb.Chunk_XOR,
|
|
Data: make([]byte, 100),
|
|
}
|
|
lbSize, chunkSize := 0, chunk.Size()
|
|
for _, lb := range lbs1 {
|
|
lbSize += lb.Size()
|
|
}
|
|
maxBytesInFrame := lbSize + chunkSize*2
|
|
testData := []*prompb.ChunkedSeries{{
|
|
Labels: lbs1,
|
|
Chunks: []prompb.Chunk{chunk, chunk, chunk, chunk},
|
|
}, {
|
|
Labels: lbs2,
|
|
Chunks: []prompb.Chunk{chunk, chunk, chunk, chunk},
|
|
}}
|
|
css := newMockChunkSeriesSet(testData)
|
|
writer := mockWriter{}
|
|
warning, err := StreamChunkedReadResponses(&writer, 0,
|
|
css,
|
|
nil,
|
|
maxBytesInFrame,
|
|
&sync.Pool{})
|
|
require.Nil(t, warning)
|
|
require.NoError(t, err)
|
|
expectData := []*prompb.ChunkedSeries{{
|
|
Labels: lbs1,
|
|
Chunks: []prompb.Chunk{chunk, chunk},
|
|
}, {
|
|
Labels: lbs1,
|
|
Chunks: []prompb.Chunk{chunk, chunk},
|
|
}, {
|
|
Labels: lbs2,
|
|
Chunks: []prompb.Chunk{chunk, chunk},
|
|
}, {
|
|
Labels: lbs2,
|
|
Chunks: []prompb.Chunk{chunk, chunk},
|
|
}}
|
|
require.Equal(t, expectData, writer.actual)
|
|
}
|
|
|
|
type mockWriter struct {
|
|
actual []*prompb.ChunkedSeries
|
|
}
|
|
|
|
func (m *mockWriter) Write(p []byte) (n int, err error) {
|
|
cr := &prompb.ChunkedReadResponse{}
|
|
if err := proto.Unmarshal(p, cr); err != nil {
|
|
return 0, fmt.Errorf("unmarshaling: %w", err)
|
|
}
|
|
m.actual = append(m.actual, cr.ChunkedSeries...)
|
|
return len(p), nil
|
|
}
|
|
|
|
type mockChunkSeriesSet struct {
|
|
chunkedSeries []*prompb.ChunkedSeries
|
|
index int
|
|
builder labels.ScratchBuilder
|
|
}
|
|
|
|
func newMockChunkSeriesSet(ss []*prompb.ChunkedSeries) storage.ChunkSeriesSet {
|
|
return &mockChunkSeriesSet{chunkedSeries: ss, index: -1, builder: labels.NewScratchBuilder(0)}
|
|
}
|
|
|
|
func (c *mockChunkSeriesSet) Next() bool {
|
|
c.index++
|
|
return c.index < len(c.chunkedSeries)
|
|
}
|
|
|
|
func (c *mockChunkSeriesSet) At() storage.ChunkSeries {
|
|
return &storage.ChunkSeriesEntry{
|
|
Lset: c.chunkedSeries[c.index].ToLabels(&c.builder, nil),
|
|
ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
|
|
return &mockChunkIterator{
|
|
chunks: c.chunkedSeries[c.index].Chunks,
|
|
index: -1,
|
|
}
|
|
},
|
|
}
|
|
}
|
|
|
|
func (c *mockChunkSeriesSet) Warnings() annotations.Annotations { return nil }
|
|
|
|
func (c *mockChunkSeriesSet) Err() error {
|
|
return nil
|
|
}
|
|
|
|
type mockChunkIterator struct {
|
|
chunks []prompb.Chunk
|
|
index int
|
|
}
|
|
|
|
func (c *mockChunkIterator) At() chunks.Meta {
|
|
one := c.chunks[c.index]
|
|
chunk, err := chunkenc.FromData(chunkenc.Encoding(one.Type), one.Data)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return chunks.Meta{
|
|
Chunk: chunk,
|
|
MinTime: one.MinTimeMs,
|
|
MaxTime: one.MaxTimeMs,
|
|
}
|
|
}
|
|
|
|
func (c *mockChunkIterator) Next() bool {
|
|
c.index++
|
|
return c.index < len(c.chunks)
|
|
}
|
|
|
|
func (c *mockChunkIterator) Err() error {
|
|
return nil
|
|
}
|
|
|
|
func TestChunkedSeriesIterator(t *testing.T) {
|
|
t.Run("happy path", func(t *testing.T) {
|
|
chks := buildTestChunks(t)
|
|
|
|
it := newChunkedSeriesIterator(chks, 2000, 12000)
|
|
|
|
require.NoError(t, it.err)
|
|
require.NotNil(t, it.cur)
|
|
|
|
// Initial next; advance to first valid sample of first chunk.
|
|
res := it.Next()
|
|
require.Equal(t, chunkenc.ValFloat, res)
|
|
require.NoError(t, it.Err())
|
|
|
|
ts, v := it.At()
|
|
require.Equal(t, int64(2000), ts)
|
|
require.Equal(t, float64(2), v)
|
|
|
|
// Next to the second sample of the first chunk.
|
|
res = it.Next()
|
|
require.Equal(t, chunkenc.ValFloat, res)
|
|
require.NoError(t, it.Err())
|
|
|
|
ts, v = it.At()
|
|
require.Equal(t, int64(3000), ts)
|
|
require.Equal(t, float64(3), v)
|
|
|
|
// Attempt to seek to the first sample of the first chunk (should return current sample).
|
|
res = it.Seek(0)
|
|
require.Equal(t, chunkenc.ValFloat, res)
|
|
|
|
ts, v = it.At()
|
|
require.Equal(t, int64(3000), ts)
|
|
require.Equal(t, float64(3), v)
|
|
|
|
// Seek to the end of the first chunk.
|
|
res = it.Seek(4000)
|
|
require.Equal(t, chunkenc.ValFloat, res)
|
|
|
|
ts, v = it.At()
|
|
require.Equal(t, int64(4000), ts)
|
|
require.Equal(t, float64(4), v)
|
|
|
|
// Next to the first sample of the second chunk.
|
|
res = it.Next()
|
|
require.Equal(t, chunkenc.ValFloat, res)
|
|
require.NoError(t, it.Err())
|
|
|
|
ts, v = it.At()
|
|
require.Equal(t, int64(5000), ts)
|
|
require.Equal(t, float64(1), v)
|
|
|
|
// Seek to the second sample of the third chunk.
|
|
res = it.Seek(10999)
|
|
require.Equal(t, chunkenc.ValFloat, res)
|
|
require.NoError(t, it.Err())
|
|
|
|
ts, v = it.At()
|
|
require.Equal(t, int64(11000), ts)
|
|
require.Equal(t, float64(3), v)
|
|
|
|
// Attempt to seek to something past the last sample (should return false and exhaust the iterator).
|
|
res = it.Seek(99999)
|
|
require.Equal(t, chunkenc.ValNone, res)
|
|
require.NoError(t, it.Err())
|
|
|
|
// Attempt to next past the last sample (should return false as the iterator is exhausted).
|
|
res = it.Next()
|
|
require.Equal(t, chunkenc.ValNone, res)
|
|
require.NoError(t, it.Err())
|
|
})
|
|
|
|
t.Run("invalid chunk encoding error", func(t *testing.T) {
|
|
chks := buildTestChunks(t)
|
|
|
|
// Set chunk type to an invalid value.
|
|
chks[0].Type = 8
|
|
|
|
it := newChunkedSeriesIterator(chks, 0, 14000)
|
|
|
|
res := it.Next()
|
|
require.Equal(t, chunkenc.ValNone, res)
|
|
|
|
res = it.Seek(1000)
|
|
require.Equal(t, chunkenc.ValNone, res)
|
|
|
|
require.ErrorContains(t, it.err, "invalid chunk encoding")
|
|
require.Nil(t, it.cur)
|
|
})
|
|
|
|
t.Run("empty chunks", func(t *testing.T) {
|
|
emptyChunks := make([]prompb.Chunk, 0)
|
|
|
|
it1 := newChunkedSeriesIterator(emptyChunks, 0, 1000)
|
|
require.Equal(t, chunkenc.ValNone, it1.Next())
|
|
require.Equal(t, chunkenc.ValNone, it1.Seek(1000))
|
|
require.NoError(t, it1.Err())
|
|
|
|
var nilChunks []prompb.Chunk
|
|
|
|
it2 := newChunkedSeriesIterator(nilChunks, 0, 1000)
|
|
require.Equal(t, chunkenc.ValNone, it2.Next())
|
|
require.Equal(t, chunkenc.ValNone, it2.Seek(1000))
|
|
require.NoError(t, it2.Err())
|
|
})
|
|
}
|
|
|
|
func TestChunkedSeries(t *testing.T) {
|
|
t.Run("happy path", func(t *testing.T) {
|
|
chks := buildTestChunks(t)
|
|
|
|
s := chunkedSeries{
|
|
ChunkedSeries: prompb.ChunkedSeries{
|
|
Labels: []prompb.Label{
|
|
{Name: "foo", Value: "bar"},
|
|
{Name: "asdf", Value: "zxcv"},
|
|
},
|
|
Chunks: chks,
|
|
},
|
|
}
|
|
|
|
require.Equal(t, labels.FromStrings("asdf", "zxcv", "foo", "bar"), s.Labels())
|
|
|
|
it := s.Iterator(nil)
|
|
res := it.Next() // Behavior is undefined w/o the initial call to Next.
|
|
|
|
require.Equal(t, chunkenc.ValFloat, res)
|
|
require.NoError(t, it.Err())
|
|
|
|
ts, v := it.At()
|
|
require.Equal(t, int64(0), ts)
|
|
require.Equal(t, float64(0), v)
|
|
})
|
|
}
|
|
|
|
func TestChunkedSeriesSet(t *testing.T) {
|
|
t.Run("happy path", func(t *testing.T) {
|
|
buf := &bytes.Buffer{}
|
|
flusher := &mockFlusher{}
|
|
|
|
w := NewChunkedWriter(buf, flusher)
|
|
r := NewChunkedReader(buf, config.DefaultChunkedReadLimit, nil)
|
|
|
|
chks := buildTestChunks(t)
|
|
l := []prompb.Label{
|
|
{Name: "foo", Value: "bar"},
|
|
}
|
|
|
|
for i, c := range chks {
|
|
cSeries := prompb.ChunkedSeries{Labels: l, Chunks: []prompb.Chunk{c}}
|
|
readResp := prompb.ChunkedReadResponse{
|
|
ChunkedSeries: []*prompb.ChunkedSeries{&cSeries},
|
|
QueryIndex: int64(i),
|
|
}
|
|
|
|
b, err := proto.Marshal(&readResp)
|
|
require.NoError(t, err)
|
|
|
|
_, err = w.Write(b)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
ss := NewChunkedSeriesSet(r, io.NopCloser(buf), 0, 14000, func(error) {})
|
|
require.NoError(t, ss.Err())
|
|
require.Nil(t, ss.Warnings())
|
|
|
|
res := ss.Next()
|
|
require.True(t, res)
|
|
require.NoError(t, ss.Err())
|
|
|
|
s := ss.At()
|
|
require.Equal(t, 1, s.Labels().Len())
|
|
require.True(t, s.Labels().Has("foo"))
|
|
require.Equal(t, "bar", s.Labels().Get("foo"))
|
|
|
|
it := s.Iterator(nil)
|
|
it.Next()
|
|
ts, v := it.At()
|
|
require.Equal(t, int64(0), ts)
|
|
require.Equal(t, float64(0), v)
|
|
|
|
numResponses := 1
|
|
for ss.Next() {
|
|
numResponses++
|
|
}
|
|
require.Equal(t, numTestChunks, numResponses)
|
|
require.NoError(t, ss.Err())
|
|
})
|
|
|
|
t.Run("chunked reader error", func(t *testing.T) {
|
|
buf := &bytes.Buffer{}
|
|
flusher := &mockFlusher{}
|
|
|
|
w := NewChunkedWriter(buf, flusher)
|
|
r := NewChunkedReader(buf, config.DefaultChunkedReadLimit, nil)
|
|
|
|
chks := buildTestChunks(t)
|
|
l := []prompb.Label{
|
|
{Name: "foo", Value: "bar"},
|
|
}
|
|
|
|
for i, c := range chks {
|
|
cSeries := prompb.ChunkedSeries{Labels: l, Chunks: []prompb.Chunk{c}}
|
|
readResp := prompb.ChunkedReadResponse{
|
|
ChunkedSeries: []*prompb.ChunkedSeries{&cSeries},
|
|
QueryIndex: int64(i),
|
|
}
|
|
|
|
b, err := proto.Marshal(&readResp)
|
|
require.NoError(t, err)
|
|
|
|
b[0] = 0xFF // Corruption!
|
|
|
|
_, err = w.Write(b)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
ss := NewChunkedSeriesSet(r, io.NopCloser(buf), 0, 14000, func(error) {})
|
|
require.NoError(t, ss.Err())
|
|
require.Nil(t, ss.Warnings())
|
|
|
|
res := ss.Next()
|
|
require.False(t, res)
|
|
require.ErrorContains(t, ss.Err(), "proto: illegal wireType 7")
|
|
})
|
|
}
|
|
|
|
// mockFlusher implements http.Flusher.
|
|
type mockFlusher struct{}
|
|
|
|
func (f *mockFlusher) Flush() {}
|
|
|
|
const (
|
|
numTestChunks = 3
|
|
numSamplesPerTestChunk = 5
|
|
)
|
|
|
|
func buildTestChunks(t *testing.T) []prompb.Chunk {
|
|
startTime := int64(0)
|
|
chks := make([]prompb.Chunk, 0, numTestChunks)
|
|
|
|
time := startTime
|
|
|
|
for i := 0; i < numTestChunks; i++ {
|
|
c := chunkenc.NewXORChunk()
|
|
|
|
a, err := c.Appender()
|
|
require.NoError(t, err)
|
|
|
|
minTimeMs := time
|
|
|
|
for j := 0; j < numSamplesPerTestChunk; j++ {
|
|
a.Append(time, float64(i+j))
|
|
time += int64(1000)
|
|
}
|
|
|
|
chks = append(chks, prompb.Chunk{
|
|
MinTimeMs: minTimeMs,
|
|
MaxTimeMs: time,
|
|
Type: prompb.Chunk_XOR,
|
|
Data: c.Bytes(),
|
|
})
|
|
}
|
|
|
|
return chks
|
|
}
|