add functionality for new minimized remote write request format

Signed-off-by: Callum Styan <callumstyan@gmail.com>
Signed-off-by: Nicolás Pazos <npazosmendez@gmail.com>
This commit is contained in:
Callum Styan 2023-11-06 16:49:49 -08:00 committed by Nicolás Pazos
parent a396fc65ea
commit 622a723626
15 changed files with 419 additions and 119 deletions

View file

@ -155,6 +155,7 @@ type flagConfig struct {
enablePerStepStats bool
enableAutoGOMAXPROCS bool
enableSenderRemoteWrite11 bool
enableSenderRemoteWrite11Minimized bool
prometheusURL string
corsRegexString string
@ -223,8 +224,13 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
case "rw-1-1-sender":
c.enableSenderRemoteWrite11 = true
level.Info(logger).Log("msg", "Experimental remote write 1.1 will be used on the sender end, receiver must be able to parse this new protobuf format.")
case "rw-1-1-sender-min":
c.enableSenderRemoteWrite11Minimized = true
level.Info(logger).Log("msg", "Experimental remote write 1.1 will be used on the sender end, receiver must be able to parse this new protobuf format.")
case "rw-1-1-receiver":
c.web.EnableReceiverRemoteWrite11 = true
case "rw-1-1-receiver-min":
c.web.EnableReceiverRemoteWrite11Min = true
level.Info(logger).Log("msg", "Experimental remote write 1.1 will be supported on the receiver end, sender can send this new protobuf format.")
default:
level.Warn(logger).Log("msg", "Unknown option for --enable-feature", "option", o)
@ -611,7 +617,7 @@ func main() {
var (
localStorage = &readyStorage{stats: tsdb.NewDBStats()}
scraper = &readyScrapeManager{}
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.enableSenderRemoteWrite11)
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.enableSenderRemoteWrite11, cfg.enableSenderRemoteWrite11Minimized)
fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
)

View file

@ -89,5 +89,22 @@ func main() {
}
})
http.HandleFunc("/receiveMinimized", func(w http.ResponseWriter, r *http.Request) {
req, err := remote.DecodeMinimizedWriteRequest(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
for _, ts := range req.Timeseries {
ls := remote.Uint32RefToLabels(req.Symbols, ts.LabelSymbols)
fmt.Println(ls)
for _, s := range ts.Samples {
fmt.Printf("\tSample: %f %d\n", s.Value, s.Timestamp)
}
}
})
log.Fatal(http.ListenAndServe(":1234", nil))
}

View file

@ -8,7 +8,7 @@ declare -a INSTANCES
# (sender,receiver) pairs to run: (sender_name; sender_flags; receiver_name; receiver_flags)
INSTANCES+=('sender-v1;;receiver-v1;')
INSTANCES+=('sender-v11;--enable-feature rw-1-1-sender;receiver-v11;--enable-feature rw-1-1-receiver')
INSTANCES+=('sender-v11-min;--enable-feature rw-1-1-sender-min;receiver-v11-min;--enable-feature rw-1-1-receiver-min')
# ~~~~~~~~~~~~~

View file

@ -957,6 +957,27 @@ func DecodeReducedWriteRequest(r io.Reader) (*prompb.WriteRequestWithRefs, error
return &req, nil
}
// DecodeMinimizedWriteRequest from an io.Reader into a prompb.WriteRequest, handling
// snappy decompression.
func DecodeMinimizedWriteRequest(r io.Reader) (*prompb.MinimizedWriteRequest, error) {
compressed, err := io.ReadAll(r)
if err != nil {
return nil, err
}
reqBuf, err := snappy.Decode(nil, compressed)
if err != nil {
return nil, err
}
var req prompb.MinimizedWriteRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
return nil, err
}
return &req, nil
}
func ReducedWriteRequestToWriteRequest(redReq *prompb.WriteRequestWithRefs) (*prompb.WriteRequest, error) {
req := &prompb.WriteRequest{
Timeseries: make([]prompb.TimeSeries, len(redReq.Timeseries)),

View file

@ -17,7 +17,7 @@ import (
"context"
"errors"
"math"
"strconv"
"strings"
"sync"
"time"
@ -406,6 +406,7 @@ type QueueManager struct {
metadataWatcher *MetadataWatcher
// experimental feature, new remote write proto format
internFormat bool
secondInternFormat bool
clientMtx sync.RWMutex
storeClient WriteClient
@ -454,6 +455,7 @@ func NewQueueManager(
enableExemplarRemoteWrite bool,
enableNativeHistogramRemoteWrite bool,
internFormat bool,
secondInternFormat bool,
) *QueueManager {
if logger == nil {
logger = log.NewNopLogger()
@ -477,6 +479,7 @@ func NewQueueManager(
sendExemplars: enableExemplarRemoteWrite,
sendNativeHistograms: enableNativeHistogramRemoteWrite,
internFormat: internFormat,
secondInternFormat: secondInternFormat,
seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels),
seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
@ -1348,6 +1351,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
shardNum := strconv.Itoa(shardID)
pool := newLookupPool()
symbolTable := newRwSymbolTable()
// Send batches of at most MaxSamplesPerSend samples to the remote storage.
// If we have fewer samples than that, flush them out after a deadline anyways.
@ -1378,6 +1382,11 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
}
}
pendingMinimizedData := make([]prompb.MinimizedTimeSeries, max)
for i := range pendingMinimizedData {
pendingMinimizedData[i].Samples = []prompb.Sample{{}}
}
timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
stop := func() {
if !timer.Stop() {
@ -1412,7 +1421,13 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
if !ok {
return
}
if s.qm.internFormat {
if s.qm.secondInternFormat {
nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeries(&symbolTable, batch, pendingMinimizedData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendMinSamples(ctx, pendingMinimizedData[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
symbolTable.clear()
} else if s.qm.internFormat && !s.qm.secondInternFormat {
// the new internFormat feature flag is be set
nPendingSamples, nPendingExemplars, nPendingHistograms := populateReducedTimeSeries(pool, batch, pendingReducedData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
@ -1518,6 +1533,18 @@ func (s *shards) sendReducedSamples(ctx context.Context, samples []prompb.Reduce
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin))
}
func (s *shards) sendMinSamples(ctx context.Context, samples []prompb.MinimizedTimeSeries, labels string, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
begin := time.Now()
// Build the ReducedWriteRequest with no metadata.
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
req, highest, err := buildMinimizedWriteRequest(samples, labels, pBuf, buf)
if err == nil {
err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest)
}
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin))
}
func (s *shards) updateMetrics(ctx context.Context, err error, sampleCount, exemplarCount, histogramCount int, duration time.Duration) {
if err != nil {
level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err)
@ -1648,6 +1675,56 @@ func populateReducedTimeSeries(pool *lookupPool, batch []timeSeries, pendingData
return nPendingSamples, nPendingExemplars, nPendingHistograms
}
func populateMinimizedTimeSeries(symbolTable *rwSymbolTable, batch []timeSeries, pendingData []prompb.MinimizedTimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms int
for nPending, d := range batch {
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
if sendExemplars {
pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
}
if sendNativeHistograms {
pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
}
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
// stop reading from the queue. This makes it safe to reference pendingSamples by index.
// pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
pendingData[nPending].LabelSymbols = labelsToUint32Slice(d.seriesLabels, symbolTable, pendingData[nPending].LabelSymbols)
switch d.sType {
case tSample:
pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{
Value: d.value,
Timestamp: d.timestamp,
})
nPendingSamples++
// TODO: handle all types
//case tExemplar:
// // TODO(npazosmendez) optimize?
// l := make([]prompb.LabelRef, 0, d.exemplarLabels.Len())
// d.exemplarLabels.Range(func(el labels.Label) {
// nRef := pool.intern(el.Name)
// vRef := pool.intern(el.Value)
// l = append(l, prompb.LabelRef{NameRef: nRef, ValueRef: vRef})
// })
// pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.ExemplarRef{
// Labels: l,
// Value: d.value,
// Timestamp: d.timestamp,
// })
// nPendingExemplars++
case tHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToHistogramProto(d.timestamp, d.histogram))
nPendingHistograms++
case tFloatHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram))
nPendingHistograms++
}
}
return nPendingSamples, nPendingExemplars, nPendingHistograms
}
func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l log.Logger, attempt func(int) error, onRetry func()) error {
backoff := cfg.MinBackoff
sleepDuration := model.Duration(0)
@ -1791,3 +1868,82 @@ func buildReducedWriteRequest(samples []prompb.ReducedTimeSeries, labels map[uin
}
return compressed, highest, nil
}
type rwSymbolTable struct {
symbols strings.Builder
symbolsMap map[string]uint32
}
func newRwSymbolTable() rwSymbolTable {
return rwSymbolTable{
symbolsMap: make(map[string]uint32),
}
}
func (r *rwSymbolTable) Ref(str string) uint32 {
// todo, check for overflowing the uint32 based on documented format?
if ref, ok := r.symbolsMap[str]; ok {
return ref
}
r.symbolsMap[str] = packRef(r.symbols.Len(), len(str))
r.symbols.WriteString(str)
return r.symbolsMap[str]
}
func (r *rwSymbolTable) LabelsString() string {
return r.symbols.String()
}
func (r *rwSymbolTable) clear() {
for k := range r.symbolsMap {
delete(r.symbolsMap, k)
}
r.symbols.Reset()
}
func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels string, pBuf *proto.Buffer, buf *[]byte) ([]byte, int64, error) {
var highest int64
for _, ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
highest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
highest = ts.Exemplars[0].Timestamp
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
highest = ts.Histograms[0].Timestamp
}
}
req := &prompb.MinimizedWriteRequest{
Symbols: labels,
Timeseries: samples,
}
if pBuf == nil {
pBuf = proto.NewBuffer(nil) // For convenience in tests. Not efficient.
} else {
pBuf.Reset()
}
err := pBuf.Marshal(req)
if err != nil {
return nil, 0, err
}
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
if buf != nil {
*buf = (*buf)[0:cap(*buf)]
} else {
buf = &[]byte{}
}
compressed := snappy.Encode(*buf, pBuf.Bytes())
if n := snappy.MaxEncodedLen(len(pBuf.Bytes())); buf != nil && n > len(*buf) {
// grow the buffer for the next time
*buf = make([]byte, n)
}
return compressed, highest, nil
}

View file

@ -106,7 +106,7 @@ func TestSampleDelivery(t *testing.T) {
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, tc.remoteWrite11)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, tc.remoteWrite11, false)
defer s.Close()
var (
@ -178,7 +178,7 @@ func TestMetadataDelivery(t *testing.T) {
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, false)
m.Start()
defer m.Stop()
@ -220,7 +220,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11, false)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
@ -267,7 +267,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11, false)
m.StoreSeries(series, 0)
m.Start()
@ -289,7 +289,7 @@ func TestShutdown(t *testing.T) {
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, false)
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
samples, series := createTimeseries(n, n)
m.StoreSeries(series, 0)
@ -327,7 +327,7 @@ func TestSeriesReset(t *testing.T) {
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, false)
for i := 0; i < numSegments; i++ {
series := []record.RefSeries{}
for j := 0; j < numSeries; j++ {
@ -359,7 +359,7 @@ func TestReshard(t *testing.T) {
dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11, false)
m.StoreSeries(series, 0)
m.Start()
@ -400,7 +400,7 @@ func TestReshardRaceWithStop(t *testing.T) {
go func() {
for {
metrics := newQueueManagerMetrics(nil, "", "")
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11)
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11, false)
m.Start()
h.Unlock()
h.Lock()
@ -440,7 +440,7 @@ func TestReshardPartialBatch(t *testing.T) {
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11)
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11, false)
m.StoreSeries(series, 0)
m.Start()
@ -490,7 +490,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11)
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11, false)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
@ -522,7 +522,7 @@ func TestReleaseNoninternedString(t *testing.T) {
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
c := NewTestWriteClient(remoteWrite11)
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11)
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11, false)
m.Start()
defer m.Stop()
@ -571,7 +571,7 @@ func TestShouldReshard(t *testing.T) {
for _, c := range cases {
metrics := newQueueManagerMetrics(nil, "", "")
client := NewTestWriteClient(false)
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, false)
m.numShards = c.startingShards
m.dataIn.incr(c.samplesIn)
m.dataOut.incr(c.samplesOut)
@ -965,7 +965,7 @@ func BenchmarkSampleSend(b *testing.B) {
dir := b.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, false)
m.StoreSeries(series, 0)
// These should be received by the client.
@ -1011,7 +1011,7 @@ func BenchmarkStartup(b *testing.B) {
c := NewTestBlockedWriteClient()
m := NewQueueManager(metrics, nil, nil, logger, dir,
newEWMARate(ewmaWeight, shardUpdateDuration),
cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, false)
cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, false, false)
m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
m.watcher.MaxSegment = segments[len(segments)-2]
err := m.watcher.Run()
@ -1094,7 +1094,7 @@ func TestCalculateDesiredShards(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "")
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, false)
// Need to start the queue manager so the proper metrics are initialized.
// However we can stop it right away since we don't need to do any actual
@ -1171,7 +1171,7 @@ func TestCalculateDesiredShardsDetail(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "")
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, false)
for _, tc := range []struct {
name string
@ -1473,19 +1473,20 @@ func BenchmarkBuildWriteRequest(b *testing.B) {
}
}
two_batch := createDummyTimeSeries(2)
ten_batch := createDummyTimeSeries(10)
hundred_batch := createDummyTimeSeries(100)
b.Run("2 instances", func(b *testing.B) {
batch := createDummyTimeSeries(2)
bench(b, batch)
bench(b, two_batch)
})
b.Run("10 instances", func(b *testing.B) {
batch := createDummyTimeSeries(10)
bench(b, batch)
bench(b, ten_batch)
})
b.Run("1k instances", func(b *testing.B) {
batch := createDummyTimeSeries(1000)
bench(b, batch)
bench(b, hundred_batch)
})
}
@ -1520,18 +1521,63 @@ func BenchmarkBuildReducedWriteRequest(b *testing.B) {
}
}
two_batch := createDummyTimeSeries(2)
ten_batch := createDummyTimeSeries(10)
hundred_batch := createDummyTimeSeries(100)
b.Run("2 instances", func(b *testing.B) {
batch := createDummyTimeSeries(2)
bench(b, batch)
bench(b, two_batch)
})
b.Run("10 instances", func(b *testing.B) {
batch := createDummyTimeSeries(10)
bench(b, batch)
bench(b, ten_batch)
})
b.Run("1k instances", func(b *testing.B) {
batch := createDummyTimeSeries(1000)
bench(b, batch)
bench(b, hundred_batch)
})
}
func BenchmarkBuildMinimizedWriteRequest(b *testing.B) {
type testcase struct {
batch []timeSeries
}
testCases := []testcase{
testcase{createDummyTimeSeries(2)},
testcase{createDummyTimeSeries(10)},
testcase{createDummyTimeSeries(100)},
}
for _, tc := range testCases {
symbolTable := newRwSymbolTable()
buff := make([]byte, 0)
seriesBuff := make([]prompb.MinimizedTimeSeries, len(tc.batch))
//total := 0
for i := range seriesBuff {
seriesBuff[i].Samples = []prompb.Sample{{}}
// todo: add other types
//seriesBuff[i].Exemplars = []prompb.Exemplar{{}}
}
pBuf := proto.NewBuffer(nil)
// Warmup buffers
for i := 0; i < 10; i++ {
populateMinimizedTimeSeries(&symbolTable, tc.batch, seriesBuff, true, true)
buildMinimizedWriteRequest(seriesBuff, symbolTable.symbols.String(), pBuf, &buff)
}
b.Run(fmt.Sprintf("%d-instances", len(tc.batch)), func(b *testing.B) {
totalSize := 0
for j := 0; j < b.N; j++ {
populateMinimizedTimeSeries(&symbolTable, tc.batch, seriesBuff, true, true)
b.ResetTimer()
req, _, err := buildMinimizedWriteRequest(seriesBuff, symbolTable.symbols.String(), pBuf, &buff)
if err != nil {
b.Fatal(err)
}
symbolTable.clear()
totalSize += len(req)
b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
}
})
}
}

View file

@ -91,7 +91,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) {
for _, tc := range cases {
t.Run("", func(t *testing.T) {
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false, false)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteReadConfigs: tc.cfgs,

View file

@ -62,7 +62,7 @@ type Storage struct {
}
// NewStorage returns a remote.Storage.
func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, remoteWrite11 bool) *Storage {
func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, remoteWrite11 bool, remoteWrite11Minimized bool) *Storage {
if l == nil {
l = log.NewNopLogger()
}
@ -72,7 +72,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
logger: logger,
localStartTimeCallback: stCallback,
}
s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, remoteWrite11)
s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, remoteWrite11, remoteWrite11Minimized)
return s
}

View file

@ -29,7 +29,7 @@ import (
func TestStorageLifecycle(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false, false)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{
@ -56,7 +56,7 @@ func TestStorageLifecycle(t *testing.T) {
func TestUpdateRemoteReadConfigs(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false, false)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{},
@ -77,7 +77,7 @@ func TestUpdateRemoteReadConfigs(t *testing.T) {
func TestFilterExternalLabels(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false, false)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{
@ -102,7 +102,7 @@ func TestFilterExternalLabels(t *testing.T) {
func TestIgnoreExternalLabels(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false, false)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{

View file

@ -66,6 +66,7 @@ type WriteStorage struct {
dir string
queues map[string]*QueueManager
remoteWrite11 bool
remoteWrite11Minimized bool
samplesIn *ewmaRate
flushDeadline time.Duration
interner *pool
@ -77,13 +78,14 @@ type WriteStorage struct {
}
// NewWriteStorage creates and runs a WriteStorage.
func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, remoteWrite11 bool) *WriteStorage {
func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, remoteWrite11 bool, remoteWrite11Minimized bool) *WriteStorage {
if logger == nil {
logger = log.NewNopLogger()
}
rws := &WriteStorage{
queues: make(map[string]*QueueManager),
remoteWrite11: remoteWrite11,
remoteWrite11Minimized: remoteWrite11Minimized,
watcherMetrics: wlog.NewWatcherMetrics(reg),
liveReaderMetrics: wlog.NewLiveReaderMetrics(reg),
logger: logger,
@ -211,6 +213,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
rwConf.SendExemplars,
rwConf.SendNativeHistograms,
rws.remoteWrite11,
rws.remoteWrite11Minimized,
)
// Keep track of which queues are new so we know which to start.
newHashes = append(newHashes, hash)

View file

@ -47,15 +47,18 @@ type writeHandler struct {
// Experimental feature, new remote write proto format
// The handler will accept the new format, but it can still accept the old one
enableRemoteWrite11 bool
enableRemoteWrite11Minimized bool
}
// NewWriteHandler creates a http.Handler that accepts remote write requests and
// writes them to the provided appendable.
func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable, enableRemoteWrite11 bool) http.Handler {
func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable, enableRemoteWrite11 bool, enableRemoteWrite11Minimized bool) http.Handler {
h := &writeHandler{
logger: logger,
appendable: appendable,
enableRemoteWrite11: enableRemoteWrite11,
enableRemoteWrite11Minimized: enableRemoteWrite11Minimized,
samplesWithInvalidLabelsTotal: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "prometheus",
@ -74,7 +77,11 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var err error
var req *prompb.WriteRequest
var reqWithRefs *prompb.WriteRequestWithRefs
if h.enableRemoteWrite11 && r.Header.Get(RemoteWriteVersionHeader) == RemoteWriteVersion11HeaderValue {
var reqMin *prompb.MinimizedWriteRequest
if h.enableRemoteWrite11Minimized {
reqMin, err = DecodeMinimizedWriteRequest(r.Body)
} else if !h.enableRemoteWrite11Minimized && h.enableRemoteWrite11 && r.Header.Get(RemoteWriteVersionHeader) == RemoteWriteVersion11HeaderValue {
reqWithRefs, err = DecodeReducedWriteRequest(r.Body)
} else {
req, err = DecodeWriteRequest(r.Body)
@ -86,7 +93,9 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
if h.enableRemoteWrite11 {
if h.enableRemoteWrite11Minimized {
err = h.writeMin(r.Context(), reqMin)
} else if h.enableRemoteWrite11 {
err = h.writeReduced(r.Context(), reqWithRefs)
} else {
err = h.write(r.Context(), req)
@ -139,23 +148,23 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
}()
for _, ts := range req.Timeseries {
labels := labelProtosToLabels(ts.Labels)
if !labels.IsValid() {
level.Warn(h.logger).Log("msg", "Invalid metric names or labels", "got", labels.String())
ls := labelProtosToLabels(ts.Labels)
if !ls.IsValid() {
level.Warn(h.logger).Log("msg", "Invalid metric names or labels", "got", ls.String())
samplesWithInvalidLabels++
continue
}
err := h.appendSamples(app, ts.Samples, labels)
err := h.appendSamples(app, ts.Samples, ls)
if err != nil {
return err
}
for _, ep := range ts.Exemplars {
e := exemplarProtoToExemplar(ep)
h.appendExemplar(app, e, labels, &outOfOrderExemplarErrs)
h.appendExemplar(app, e, ls, &outOfOrderExemplarErrs)
}
err = h.appendHistograms(app, ts.Histograms, labels)
err = h.appendHistograms(app, ts.Histograms, ls)
if err != nil {
return err
}
@ -328,3 +337,42 @@ func (h *writeHandler) writeReduced(ctx context.Context, req *prompb.WriteReques
return nil
}
func (h *writeHandler) writeMin(ctx context.Context, req *prompb.MinimizedWriteRequest) (err error) {
outOfOrderExemplarErrs := 0
app := h.appendable.Appender(ctx)
defer func() {
if err != nil {
_ = app.Rollback()
return
}
err = app.Commit()
}()
for _, ts := range req.Timeseries {
ls := Uint32RefToLabels(req.Symbols, ts.LabelSymbols)
err := h.appendSamples(app, ts.Samples, ls)
if err != nil {
return err
}
for _, ep := range ts.Exemplars {
e := exemplarProtoToExemplar(ep)
//e := exemplarRefProtoToExemplar(req.StringSymbolTable, ep)
h.appendExemplar(app, e, ls, &outOfOrderExemplarErrs)
}
err = h.appendHistograms(app, ts.Histograms, ls)
if err != nil {
return err
}
}
if outOfOrderExemplarErrs > 0 {
_ = level.Warn(h.logger).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", outOfOrderExemplarErrs)
}
return nil
}

View file

@ -45,7 +45,7 @@ func TestRemoteWriteHandler(t *testing.T) {
require.NoError(t, err)
appendable := &mockAppendable{}
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false, false)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -96,7 +96,7 @@ func TestOutOfOrderSample(t *testing.T) {
appendable := &mockAppendable{
latestSample: 100,
}
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false, false)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -121,7 +121,7 @@ func TestOutOfOrderExemplar(t *testing.T) {
appendable := &mockAppendable{
latestExemplar: 100,
}
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false, false)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -145,7 +145,7 @@ func TestOutOfOrderHistogram(t *testing.T) {
latestHistogram: 100,
}
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false, false)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -173,7 +173,7 @@ func BenchmarkRemoteWritehandler(b *testing.B) {
}
appendable := &mockAppendable{}
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false, false)
recorder := httptest.NewRecorder()
b.ResetTimer()
@ -204,7 +204,7 @@ func BenchmarkReducedRemoteWriteHandler(b *testing.B) {
}
appendable := &mockAppendable{}
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, true)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, true, false)
recorder := httptest.NewRecorder()
b.ResetTimer()
@ -223,7 +223,7 @@ func TestCommitErr(t *testing.T) {
appendable := &mockAppendable{
commitErr: fmt.Errorf("commit error"),
}
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false, false)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -249,7 +249,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) {
require.NoError(b, db.Close())
})
handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head(), false)
handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head(), false, false)
buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil)
require.NoError(b, err)
@ -303,7 +303,7 @@ func TestRemoteWriteHandlerReducedProtocol(t *testing.T) {
require.NoError(t, err)
appendable := &mockAppendable{}
handler := NewWriteHandler(nil, nil, appendable, true)
handler := NewWriteHandler(nil, nil, appendable, true, false)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)

View file

@ -117,7 +117,7 @@ func TestNoDuplicateWriteConfigs(t *testing.T) {
}
for _, tc := range cases {
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false)
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false, false)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: tc.cfgs,
@ -139,7 +139,7 @@ func TestRestartOnNameChange(t *testing.T) {
hash, err := toHash(cfg)
require.NoError(t, err)
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false)
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false, false)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
@ -164,7 +164,7 @@ func TestRestartOnNameChange(t *testing.T) {
func TestUpdateWithRegisterer(t *testing.T) {
dir := t.TempDir()
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil, false)
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil, false, false)
c1 := &config.RemoteWriteConfig{
Name: "named",
URL: &common_config.URL{
@ -204,7 +204,7 @@ func TestUpdateWithRegisterer(t *testing.T) {
func TestWriteStorageLifecycle(t *testing.T) {
dir := t.TempDir()
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false, false)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{
@ -221,7 +221,7 @@ func TestWriteStorageLifecycle(t *testing.T) {
func TestUpdateExternalLabels(t *testing.T) {
dir := t.TempDir()
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil, false)
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil, false, false)
externalLabels := labels.FromStrings("external", "true")
conf := &config.Config{
@ -250,7 +250,7 @@ func TestUpdateExternalLabels(t *testing.T) {
func TestWriteStorageApplyConfigsIdempotent(t *testing.T) {
dir := t.TempDir()
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false, false)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{},
@ -276,7 +276,7 @@ func TestWriteStorageApplyConfigsIdempotent(t *testing.T) {
func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) {
dir := t.TempDir()
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false, false)
c0 := &config.RemoteWriteConfig{
RemoteTimeout: model.Duration(10 * time.Second),

View file

@ -255,6 +255,7 @@ func NewAPI(
rwEnabled bool,
otlpEnabled bool,
enableRemoteWrite11 bool,
enableRemoteWrite11Min bool,
) *API {
a := &API{
QueryEngine: qe,
@ -296,7 +297,7 @@ func NewAPI(
}
if rwEnabled {
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, enableRemoteWrite11)
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, enableRemoteWrite11, enableRemoteWrite11Min)
}
if otlpEnabled {
a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap)

View file

@ -262,6 +262,7 @@ type Options struct {
IsAgent bool
AppName string
EnableReceiverRemoteWrite11 bool
EnableReceiverRemoteWrite11Min bool
Gatherer prometheus.Gatherer
Registerer prometheus.Registerer
@ -353,6 +354,7 @@ func New(logger log.Logger, o *Options) *Handler {
o.EnableRemoteWriteReceiver,
o.EnableOTLPWriteReceiver,
o.EnableReceiverRemoteWrite11,
o.EnableReceiverRemoteWrite11Min,
)
if o.RoutePrefix != "/" {