more cleanup, mostly linting fixes

Signed-off-by: Callum Styan <callumstyan@gmail.com>
Signed-off-by: Nicolás Pazos <npazosmendez@gmail.com>
This commit is contained in:
Callum Styan 2023-11-20 17:28:05 -08:00 committed by Nicolás Pazos
parent 766a12fb76
commit a8639ddb36
9 changed files with 1856 additions and 19091 deletions

View file

@ -985,7 +985,7 @@ func DecodeMinimizedWriteRequestLen(r io.Reader) (*prompb.MinimizedWriteRequestL
func MinimizedWriteRequestToWriteRequest(redReq *prompb.MinimizedWriteRequest) (*prompb.WriteRequest, error) { func MinimizedWriteRequestToWriteRequest(redReq *prompb.MinimizedWriteRequest) (*prompb.WriteRequest, error) {
req := &prompb.WriteRequest{ req := &prompb.WriteRequest{
Timeseries: make([]prompb.TimeSeries, len(redReq.Timeseries)), Timeseries: make([]prompb.TimeSeries, len(redReq.Timeseries)),
//Metadata: redReq.Metadata, // TODO handle metadata?
} }
for i, rts := range redReq.Timeseries { for i, rts := range redReq.Timeseries {

View file

@ -77,7 +77,7 @@ var writeRequestFixture = &prompb.WriteRequest{
// writeRequestMinimizedFixture represents the same request as writeRequestFixture, but using the minimized representation. // writeRequestMinimizedFixture represents the same request as writeRequestFixture, but using the minimized representation.
var writeRequestMinimizedFixture = func() *prompb.MinimizedWriteRequest { var writeRequestMinimizedFixture = func() *prompb.MinimizedWriteRequest {
st := newRwSymbolTable() st := newRwSymbolTable()
labels := []uint32{} var labels []uint32
for _, s := range []string{ for _, s := range []string{
"__name__", "test_metric1", "__name__", "test_metric1",
"b", "c", "b", "c",
@ -85,8 +85,8 @@ var writeRequestMinimizedFixture = func() *prompb.MinimizedWriteRequest {
"d", "e", "d", "e",
"foo", "bar", "foo", "bar",
} { } {
off, len := st.Ref(s) off, length := st.Ref(s)
labels = append(labels, off, len) labels = append(labels, off, length)
} }
return &prompb.MinimizedWriteRequest{ return &prompb.MinimizedWriteRequest{
Timeseries: []prompb.MinimizedTimeSeries{ Timeseries: []prompb.MinimizedTimeSeries{

View file

@ -1524,7 +1524,7 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin)) s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin))
} }
func (s *shards) sendMinSamples(ctx context.Context, samples []prompb.MinimizedTimeSeries, labels string, sampleCount, exemplarCount, histogramCount int, pBuf *[]byte, buf *[]byte) { func (s *shards) sendMinSamples(ctx context.Context, samples []prompb.MinimizedTimeSeries, labels string, sampleCount, exemplarCount, histogramCount int, pBuf, buf *[]byte) {
begin := time.Now() begin := time.Now()
// Build the ReducedWriteRequest with no metadata. // Build the ReducedWriteRequest with no metadata.
// Failing to build the write request is non-recoverable, since it will // Failing to build the write request is non-recoverable, since it will
@ -1696,20 +1696,7 @@ func populateMinimizedTimeSeriesLen(symbolTable *rwSymbolTable, batch []timeSeri
Timestamp: d.timestamp, Timestamp: d.timestamp,
}) })
nPendingSamples++ nPendingSamples++
// TODO: handle all types // TODO: handle all exemplars
//case tExemplar:
// l := make([]prompb.LabelRef, 0, d.exemplarLabels.Len())
// d.exemplarLabels.Range(func(el labels.Label) {
// nRef := pool.intern(el.Name)
// vRef := pool.intern(el.Value)
// l = append(l, prompb.LabelRef{NameRef: nRef, ValueRef: vRef})
// })
// pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.ExemplarRef{
// Labels: l,
// Value: d.value,
// Timestamp: d.timestamp,
// })
// nPendingExemplars++
case tHistogram: case tHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToHistogramProto(d.timestamp, d.histogram)) pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToHistogramProto(d.timestamp, d.histogram))
nPendingHistograms++ nPendingHistograms++
@ -1811,7 +1798,7 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
buf = &[]byte{} buf = &[]byte{}
} }
compressed := snappy.Encode(*buf, pBuf.Bytes()) compressed := snappy.Encode(*buf, pBuf.Bytes())
if n := snappy.MaxEncodedLen(len(pBuf.Bytes())); buf != nil && n > len(*buf) { if n := snappy.MaxEncodedLen(len(pBuf.Bytes())); n > len(*buf) {
// grow the buffer for the next time // grow the buffer for the next time
*buf = make([]byte, n) *buf = make([]byte, n)
} }
@ -1841,20 +1828,20 @@ func newRwSymbolTable() rwSymbolTable {
} }
} }
func (r *rwSymbolTable) Ref(str string) (off uint32, leng uint32) { func (r *rwSymbolTable) Ref(str string) (uint32, uint32) {
if offlen, ok := r.symbolsMap[str]; ok { if offlen, ok := r.symbolsMap[str]; ok {
return offlen.Off, offlen.Len return offlen.Off, offlen.Len
} }
off, leng = uint32(len(r.symbols)), uint32(len(str)) off, length := uint32(len(r.symbols)), uint32(len(str))
if int(off) > len(r.symbols) { if int(off) > len(r.symbols) {
panic(1) panic(1)
} }
r.symbols = append(r.symbols, str...) r.symbols = append(r.symbols, str...)
if len(r.symbols) < int(off+leng) { if len(r.symbols) < int(off+length) {
panic(2) panic(2)
} }
r.symbolsMap[str] = offLenPair{off, leng} r.symbolsMap[str] = offLenPair{off, length}
return return off, length
} }
func (r *rwSymbolTable) RefLen(str string) uint32 { func (r *rwSymbolTable) RefLen(str string) uint32 {
@ -1892,7 +1879,7 @@ func (r *rwSymbolTable) clear() {
r.symbols = r.symbols[:0] r.symbols = r.symbols[:0]
} }
func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels string, pBuf *[]byte, buf *[]byte) ([]byte, int64, error) { func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels string, pBuf, buf *[]byte) ([]byte, int64, error) {
var highest int64 var highest int64
for _, ts := range samples { for _, ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it. // At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
@ -1930,7 +1917,7 @@ func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels str
} }
compressed := snappy.Encode(*buf, data) compressed := snappy.Encode(*buf, data)
if n := snappy.MaxEncodedLen(len(data)); buf != nil && n > len(*buf) { if n := snappy.MaxEncodedLen(len(data)); n > len(*buf) {
// grow the buffer for the next time // grow the buffer for the next time
*buf = make([]byte, n) *buf = make([]byte, n)
} }
@ -1976,7 +1963,7 @@ func buildMinimizedWriteRequestLen(samples []prompb.MinimizedTimeSeriesLen, labe
} }
compressed := snappy.Encode(*buf, pBuf.Bytes()) compressed := snappy.Encode(*buf, pBuf.Bytes())
if n := snappy.MaxEncodedLen(len(pBuf.Bytes())); buf != nil && n > len(*buf) { if n := snappy.MaxEncodedLen(len(pBuf.Bytes())); n > len(*buf) {
// grow the buffer for the next time // grow the buffer for the next time
*buf = make([]byte, n) *buf = make([]byte, n)
} }

View file

@ -206,7 +206,6 @@ func TestMetadataDelivery(t *testing.T) {
func TestSampleDeliveryTimeout(t *testing.T) { func TestSampleDeliveryTimeout(t *testing.T) {
for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} { for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} {
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) {
//remoteWrite11 := proto == "1.1"
// Let's send one less sample than batch size, and wait the timeout duration // Let's send one less sample than batch size, and wait the timeout duration
n := 9 n := 9
samples, series := createTimeseries(n, n) samples, series := createTimeseries(n, n)
@ -511,7 +510,6 @@ func TestQueueFilledDeadlock(t *testing.T) {
func TestReleaseNoninternedString(t *testing.T) { func TestReleaseNoninternedString(t *testing.T) {
for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} { for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} {
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) {
cfg := config.DefaultQueueConfig cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
@ -1498,9 +1496,9 @@ func BenchmarkBuildMinimizedWriteRequest(b *testing.B) {
batch []timeSeries batch []timeSeries
} }
testCases := []testcase{ testCases := []testcase{
testcase{createDummyTimeSeries(2)}, {createDummyTimeSeries(2)},
testcase{createDummyTimeSeries(10)}, {createDummyTimeSeries(10)},
testcase{createDummyTimeSeries(100)}, {createDummyTimeSeries(100)},
} }
for _, tc := range testCases { for _, tc := range testCases {
symbolTable := newRwSymbolTable() symbolTable := newRwSymbolTable()

View file

@ -355,7 +355,6 @@ func (h *writeHandler) writeMinLen(ctx context.Context, req *prompb.MinimizedWri
for _, ep := range ts.Exemplars { for _, ep := range ts.Exemplars {
e := exemplarProtoToExemplar(ep) e := exemplarProtoToExemplar(ep)
//e := exemplarRefProtoToExemplar(req.StringSymbolTable, ep)
h.appendExemplar(app, e, ls, &outOfOrderExemplarErrs) h.appendExemplar(app, e, ls, &outOfOrderExemplarErrs)
} }

View file

@ -236,53 +236,6 @@ func BenchmarkRemoteWritehandler(b *testing.B) {
} }
} }
// TODO(npazosmendez): adapt to minimized version
// func BenchmarkReducedRemoteWriteHandler(b *testing.B) {
// const labelValue = "abcdefg'hijlmn234!@#$%^&*()_+~`\"{}[],./<>?hello0123hiOlá你好Dzieńdobry9Zd8ra765v4stvuyte"
// reqs := []*http.Request{}
// for i := 0; i < b.N; i++ {
// pool := newLookupPool()
// num := strings.Repeat(strconv.Itoa(i), 16)
// buf, _, err := buildReducedWriteRequest([]prompb.ReducedTimeSeries{{
// Labels: []prompb.LabelRef{
// {NameRef: pool.intern("__name__"), ValueRef: pool.intern("test_metric")},
// {NameRef: pool.intern("test_label_name_" + num), ValueRef: pool.intern(labelValue + num)},
// },
// Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)},
// }}, pool.getTable(), nil, nil)
// require.NoError(b, err)
// req, err := http.NewRequest("", "", bytes.NewReader(buf))
// require.NoError(b, err)
// req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion11HeaderValue)
// reqs = append(reqs, req)
// }
// TODO(npazosmendez): add benchmarks with realistic scenarios
//func BenchmarkMin64RemoteWriteHandler(b *testing.B) {
// const labelValue = "abcdefg'hijlmn234!@#$%^&*()_+~`\"{}[],./<>?hello0123hiOlá你好Dzieńdobry9Zd8ra765v4stvuyte"
// reqs := []*http.Request{}
// for i := 0; i < b.N; i++ {
// rw := newRwSymbolTable()
// num := strings.Repeat(strconv.Itoa(i), 16)
// buf, _, err := buildMinimizedWriteRequestFixed64([]prompb.MinimizedTimeSeriesFixed64{{
// LabelSymbols: []uint64{
// rw.Ref64Packed("__name__"), rw.Ref64Packed("test_metric"),
// rw.Ref64Packed("test_label_name_" + num), rw.Ref64Packed(labelValue + num),
// },
// Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)},
// }}, rw.LabelsString(), nil, nil)
// require.NoError(b, err)
// req, err := http.NewRequest("", "", bytes.NewReader(buf))
// require.NoError(b, err)
// req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion11HeaderValue)
// reqs = append(reqs, req)
// }
//
// appendable := &mockAppendable{}
// // TODO: test with other proto format(s)
// handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1)
// recorder := httptest.NewRecorder()
func TestCommitErr(t *testing.T) { func TestCommitErr(t *testing.T) {
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil) buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil)
require.NoError(t, err) require.NoError(t, err)

View file

@ -17,7 +17,6 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/prometheus/prometheus/storage/remote"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url" "net/url"
@ -36,6 +35,7 @@ import (
"github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/util/annotations" "github.com/prometheus/prometheus/util/annotations"
) )

20844
web/ui/package-lock.json generated

File diff suppressed because it is too large Load diff

View file

@ -18,7 +18,6 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/prometheus/prometheus/storage/remote"
"io" "io"
stdlog "log" stdlog "log"
"math" "math"
@ -58,6 +57,7 @@ import (
"github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/template" "github.com/prometheus/prometheus/template"
"github.com/prometheus/prometheus/util/httputil" "github.com/prometheus/prometheus/util/httputil"
api_v1 "github.com/prometheus/prometheus/web/api/v1" api_v1 "github.com/prometheus/prometheus/web/api/v1"