diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index bdcde240fe..cb19ace723 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -150,11 +150,12 @@ type flagConfig struct { featureList []string // These options are extracted from featureList // for ease of use. - enableExpandExternalLabels bool - enableNewSDManager bool - enablePerStepStats bool - enableAutoGOMAXPROCS bool - enableSenderRemoteWrite11 bool + enableExpandExternalLabels bool + enableNewSDManager bool + enablePerStepStats bool + enableAutoGOMAXPROCS bool + enableSenderRemoteWrite11 bool + enableSenderRemoteWrite11Minimized bool prometheusURL string corsRegexString string @@ -223,8 +224,13 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { case "rw-1-1-sender": c.enableSenderRemoteWrite11 = true level.Info(logger).Log("msg", "Experimental remote write 1.1 will be used on the sender end, receiver must be able to parse this new protobuf format.") + case "rw-1-1-sender-min": + c.enableSenderRemoteWrite11Minimized = true + level.Info(logger).Log("msg", "Experimental remote write 1.1 will be used on the sender end, receiver must be able to parse this new protobuf format.") case "rw-1-1-receiver": c.web.EnableReceiverRemoteWrite11 = true + case "rw-1-1-receiver-min": + c.web.EnableReceiverRemoteWrite11Min = true level.Info(logger).Log("msg", "Experimental remote write 1.1 will be supported on the receiver end, sender can send this new protobuf format.") default: level.Warn(logger).Log("msg", "Unknown option for --enable-feature", "option", o) @@ -611,7 +617,7 @@ func main() { var ( localStorage = &readyStorage{stats: tsdb.NewDBStats()} scraper = &readyScrapeManager{} - remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.enableSenderRemoteWrite11) + remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.enableSenderRemoteWrite11, cfg.enableSenderRemoteWrite11Minimized) fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage) ) diff --git a/documentation/examples/remote_storage/example_write_adapter/server.go b/documentation/examples/remote_storage/example_write_adapter/server.go index a502505fa7..90e9f743b1 100644 --- a/documentation/examples/remote_storage/example_write_adapter/server.go +++ b/documentation/examples/remote_storage/example_write_adapter/server.go @@ -89,5 +89,22 @@ func main() { } }) + http.HandleFunc("/receiveMinimized", func(w http.ResponseWriter, r *http.Request) { + req, err := remote.DecodeMinimizedWriteRequest(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + for _, ts := range req.Timeseries { + ls := remote.Uint32RefToLabels(req.Symbols, ts.LabelSymbols) + fmt.Println(ls) + + for _, s := range ts.Samples { + fmt.Printf("\tSample: %f %d\n", s.Value, s.Timestamp) + } + } + }) + log.Fatal(http.ListenAndServe(":1234", nil)) } diff --git a/scripts/remotewrite11-bench/run.sh b/scripts/remotewrite11-bench/run.sh index 28185194db..60dd1fadf2 100755 --- a/scripts/remotewrite11-bench/run.sh +++ b/scripts/remotewrite11-bench/run.sh @@ -8,7 +8,7 @@ declare -a INSTANCES # (sender,receiver) pairs to run: (sender_name; sender_flags; receiver_name; receiver_flags) INSTANCES+=('sender-v1;;receiver-v1;') INSTANCES+=('sender-v11;--enable-feature rw-1-1-sender;receiver-v11;--enable-feature rw-1-1-receiver') - +INSTANCES+=('sender-v11-min;--enable-feature rw-1-1-sender-min;receiver-v11-min;--enable-feature rw-1-1-receiver-min') # ~~~~~~~~~~~~~ diff --git a/storage/remote/codec.go b/storage/remote/codec.go index c4a4a319a5..1a0956f952 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -957,6 +957,27 @@ func DecodeReducedWriteRequest(r io.Reader) (*prompb.WriteRequestWithRefs, error return &req, nil } +// DecodeMinimizedWriteRequest from an io.Reader into a prompb.WriteRequest, handling +// snappy decompression. +func DecodeMinimizedWriteRequest(r io.Reader) (*prompb.MinimizedWriteRequest, error) { + compressed, err := io.ReadAll(r) + if err != nil { + return nil, err + } + + reqBuf, err := snappy.Decode(nil, compressed) + if err != nil { + return nil, err + } + + var req prompb.MinimizedWriteRequest + if err := proto.Unmarshal(reqBuf, &req); err != nil { + return nil, err + } + + return &req, nil +} + func ReducedWriteRequestToWriteRequest(redReq *prompb.WriteRequestWithRefs) (*prompb.WriteRequest, error) { req := &prompb.WriteRequest{ Timeseries: make([]prompb.TimeSeries, len(redReq.Timeseries)), diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 497b6986bb..d67261f266 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -17,7 +17,7 @@ import ( "context" "errors" "math" - "strconv" + "strings" "sync" "time" @@ -405,7 +405,8 @@ type QueueManager struct { watcher *wlog.Watcher metadataWatcher *MetadataWatcher // experimental feature, new remote write proto format - internFormat bool + internFormat bool + secondInternFormat bool clientMtx sync.RWMutex storeClient WriteClient @@ -454,6 +455,7 @@ func NewQueueManager( enableExemplarRemoteWrite bool, enableNativeHistogramRemoteWrite bool, internFormat bool, + secondInternFormat bool, ) *QueueManager { if logger == nil { logger = log.NewNopLogger() @@ -477,6 +479,7 @@ func NewQueueManager( sendExemplars: enableExemplarRemoteWrite, sendNativeHistograms: enableNativeHistogramRemoteWrite, internFormat: internFormat, + secondInternFormat: secondInternFormat, seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels), seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int), @@ -1348,6 +1351,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { shardNum := strconv.Itoa(shardID) pool := newLookupPool() + symbolTable := newRwSymbolTable() // Send batches of at most MaxSamplesPerSend samples to the remote storage. // If we have fewer samples than that, flush them out after a deadline anyways. @@ -1378,6 +1382,11 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { } } + pendingMinimizedData := make([]prompb.MinimizedTimeSeries, max) + for i := range pendingMinimizedData { + pendingMinimizedData[i].Samples = []prompb.Sample{{}} + } + timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline)) stop := func() { if !timer.Stop() { @@ -1412,7 +1421,13 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { if !ok { return } - if s.qm.internFormat { + if s.qm.secondInternFormat { + nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeries(&symbolTable, batch, pendingMinimizedData, s.qm.sendExemplars, s.qm.sendNativeHistograms) + + n := nPendingSamples + nPendingExemplars + nPendingHistograms + s.sendMinSamples(ctx, pendingMinimizedData[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) + symbolTable.clear() + } else if s.qm.internFormat && !s.qm.secondInternFormat { // the new internFormat feature flag is be set nPendingSamples, nPendingExemplars, nPendingHistograms := populateReducedTimeSeries(pool, batch, pendingReducedData, s.qm.sendExemplars, s.qm.sendNativeHistograms) n := nPendingSamples + nPendingExemplars + nPendingHistograms @@ -1518,6 +1533,18 @@ func (s *shards) sendReducedSamples(ctx context.Context, samples []prompb.Reduce s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin)) } +func (s *shards) sendMinSamples(ctx context.Context, samples []prompb.MinimizedTimeSeries, labels string, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) { + begin := time.Now() + // Build the ReducedWriteRequest with no metadata. + // Failing to build the write request is non-recoverable, since it will + // only error if marshaling the proto to bytes fails. + req, highest, err := buildMinimizedWriteRequest(samples, labels, pBuf, buf) + if err == nil { + err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest) + } + s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin)) +} + func (s *shards) updateMetrics(ctx context.Context, err error, sampleCount, exemplarCount, histogramCount int, duration time.Duration) { if err != nil { level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err) @@ -1648,6 +1675,56 @@ func populateReducedTimeSeries(pool *lookupPool, batch []timeSeries, pendingData return nPendingSamples, nPendingExemplars, nPendingHistograms } +func populateMinimizedTimeSeries(symbolTable *rwSymbolTable, batch []timeSeries, pendingData []prompb.MinimizedTimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int) { + var nPendingSamples, nPendingExemplars, nPendingHistograms int + for nPending, d := range batch { + pendingData[nPending].Samples = pendingData[nPending].Samples[:0] + if sendExemplars { + pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0] + } + if sendNativeHistograms { + pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0] + } + + // Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff) + // retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll + // stop reading from the queue. This makes it safe to reference pendingSamples by index. + // pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels) + + pendingData[nPending].LabelSymbols = labelsToUint32Slice(d.seriesLabels, symbolTable, pendingData[nPending].LabelSymbols) + switch d.sType { + case tSample: + pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{ + Value: d.value, + Timestamp: d.timestamp, + }) + nPendingSamples++ + // TODO: handle all types + //case tExemplar: + // // TODO(npazosmendez) optimize? + // l := make([]prompb.LabelRef, 0, d.exemplarLabels.Len()) + // d.exemplarLabels.Range(func(el labels.Label) { + // nRef := pool.intern(el.Name) + // vRef := pool.intern(el.Value) + // l = append(l, prompb.LabelRef{NameRef: nRef, ValueRef: vRef}) + // }) + // pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.ExemplarRef{ + // Labels: l, + // Value: d.value, + // Timestamp: d.timestamp, + // }) + // nPendingExemplars++ + case tHistogram: + pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToHistogramProto(d.timestamp, d.histogram)) + nPendingHistograms++ + case tFloatHistogram: + pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram)) + nPendingHistograms++ + } + } + return nPendingSamples, nPendingExemplars, nPendingHistograms +} + func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l log.Logger, attempt func(int) error, onRetry func()) error { backoff := cfg.MinBackoff sleepDuration := model.Duration(0) @@ -1791,3 +1868,82 @@ func buildReducedWriteRequest(samples []prompb.ReducedTimeSeries, labels map[uin } return compressed, highest, nil } + +type rwSymbolTable struct { + symbols strings.Builder + symbolsMap map[string]uint32 +} + +func newRwSymbolTable() rwSymbolTable { + return rwSymbolTable{ + symbolsMap: make(map[string]uint32), + } +} + +func (r *rwSymbolTable) Ref(str string) uint32 { + // todo, check for overflowing the uint32 based on documented format? + if ref, ok := r.symbolsMap[str]; ok { + return ref + } + r.symbolsMap[str] = packRef(r.symbols.Len(), len(str)) + r.symbols.WriteString(str) + + return r.symbolsMap[str] +} + +func (r *rwSymbolTable) LabelsString() string { + return r.symbols.String() +} + +func (r *rwSymbolTable) clear() { + for k := range r.symbolsMap { + delete(r.symbolsMap, k) + } + r.symbols.Reset() +} + +func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels string, pBuf *proto.Buffer, buf *[]byte) ([]byte, int64, error) { + var highest int64 + for _, ts := range samples { + // At the moment we only ever append a TimeSeries with a single sample or exemplar in it. + if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest { + highest = ts.Samples[0].Timestamp + } + if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest { + highest = ts.Exemplars[0].Timestamp + } + if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest { + highest = ts.Histograms[0].Timestamp + } + } + + req := &prompb.MinimizedWriteRequest{ + Symbols: labels, + Timeseries: samples, + } + + if pBuf == nil { + pBuf = proto.NewBuffer(nil) // For convenience in tests. Not efficient. + } else { + pBuf.Reset() + } + err := pBuf.Marshal(req) + if err != nil { + return nil, 0, err + } + + // snappy uses len() to see if it needs to allocate a new slice. Make the + // buffer as long as possible. + if buf != nil { + *buf = (*buf)[0:cap(*buf)] + } else { + buf = &[]byte{} + } + + compressed := snappy.Encode(*buf, pBuf.Bytes()) + if n := snappy.MaxEncodedLen(len(pBuf.Bytes())); buf != nil && n > len(*buf) { + // grow the buffer for the next time + *buf = make([]byte, n) + } + return compressed, highest, nil +} diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index a6662eb20f..459e071dac 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -106,7 +106,7 @@ func TestSampleDelivery(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { dir := t.TempDir() - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, tc.remoteWrite11) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, tc.remoteWrite11, false) defer s.Close() var ( @@ -178,7 +178,7 @@ func TestMetadataDelivery(t *testing.T) { mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, false) m.Start() defer m.Stop() @@ -220,7 +220,7 @@ func TestSampleDeliveryTimeout(t *testing.T) { dir := t.TempDir() metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11, false) m.StoreSeries(series, 0) m.Start() defer m.Stop() @@ -267,7 +267,7 @@ func TestSampleDeliveryOrder(t *testing.T) { mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11, false) m.StoreSeries(series, 0) m.Start() @@ -289,7 +289,7 @@ func TestShutdown(t *testing.T) { mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, false) n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend samples, series := createTimeseries(n, n) m.StoreSeries(series, 0) @@ -327,7 +327,7 @@ func TestSeriesReset(t *testing.T) { cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, false) for i := 0; i < numSegments; i++ { series := []record.RefSeries{} for j := 0; j < numSeries; j++ { @@ -359,7 +359,7 @@ func TestReshard(t *testing.T) { dir := t.TempDir() metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11, false) m.StoreSeries(series, 0) m.Start() @@ -400,7 +400,7 @@ func TestReshardRaceWithStop(t *testing.T) { go func() { for { metrics := newQueueManagerMetrics(nil, "", "") - m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11) + m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11, false) m.Start() h.Unlock() h.Lock() @@ -440,7 +440,7 @@ func TestReshardPartialBatch(t *testing.T) { cfg.BatchSendDeadline = model.Duration(batchSendDeadline) metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11) + m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11, false) m.StoreSeries(series, 0) m.Start() @@ -490,7 +490,7 @@ func TestQueueFilledDeadlock(t *testing.T) { metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11) + m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11, false) m.StoreSeries(series, 0) m.Start() defer m.Stop() @@ -522,7 +522,7 @@ func TestReleaseNoninternedString(t *testing.T) { mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") c := NewTestWriteClient(remoteWrite11) - m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11) + m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11, false) m.Start() defer m.Stop() @@ -571,7 +571,7 @@ func TestShouldReshard(t *testing.T) { for _, c := range cases { metrics := newQueueManagerMetrics(nil, "", "") client := NewTestWriteClient(false) - m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false) + m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, false) m.numShards = c.startingShards m.dataIn.incr(c.samplesIn) m.dataOut.incr(c.samplesOut) @@ -965,7 +965,7 @@ func BenchmarkSampleSend(b *testing.B) { dir := b.TempDir() metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, false) m.StoreSeries(series, 0) // These should be received by the client. @@ -1011,7 +1011,7 @@ func BenchmarkStartup(b *testing.B) { c := NewTestBlockedWriteClient() m := NewQueueManager(metrics, nil, nil, logger, dir, newEWMARate(ewmaWeight, shardUpdateDuration), - cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, false) + cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, false, false) m.watcher.SetStartTime(timestamp.Time(math.MaxInt64)) m.watcher.MaxSegment = segments[len(segments)-2] err := m.watcher.Run() @@ -1094,7 +1094,7 @@ func TestCalculateDesiredShards(t *testing.T) { metrics := newQueueManagerMetrics(nil, "", "") samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) - m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false) + m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, false) // Need to start the queue manager so the proper metrics are initialized. // However we can stop it right away since we don't need to do any actual @@ -1171,7 +1171,7 @@ func TestCalculateDesiredShardsDetail(t *testing.T) { metrics := newQueueManagerMetrics(nil, "", "") samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) - m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false) + m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, false) for _, tc := range []struct { name string @@ -1473,19 +1473,20 @@ func BenchmarkBuildWriteRequest(b *testing.B) { } } + two_batch := createDummyTimeSeries(2) + ten_batch := createDummyTimeSeries(10) + hundred_batch := createDummyTimeSeries(100) + b.Run("2 instances", func(b *testing.B) { - batch := createDummyTimeSeries(2) - bench(b, batch) + bench(b, two_batch) }) b.Run("10 instances", func(b *testing.B) { - batch := createDummyTimeSeries(10) - bench(b, batch) + bench(b, ten_batch) }) b.Run("1k instances", func(b *testing.B) { - batch := createDummyTimeSeries(1000) - bench(b, batch) + bench(b, hundred_batch) }) } @@ -1520,18 +1521,63 @@ func BenchmarkBuildReducedWriteRequest(b *testing.B) { } } + two_batch := createDummyTimeSeries(2) + ten_batch := createDummyTimeSeries(10) + hundred_batch := createDummyTimeSeries(100) + b.Run("2 instances", func(b *testing.B) { - batch := createDummyTimeSeries(2) - bench(b, batch) + bench(b, two_batch) }) b.Run("10 instances", func(b *testing.B) { - batch := createDummyTimeSeries(10) - bench(b, batch) + bench(b, ten_batch) }) b.Run("1k instances", func(b *testing.B) { - batch := createDummyTimeSeries(1000) - bench(b, batch) + bench(b, hundred_batch) }) } + +func BenchmarkBuildMinimizedWriteRequest(b *testing.B) { + type testcase struct { + batch []timeSeries + } + testCases := []testcase{ + testcase{createDummyTimeSeries(2)}, + testcase{createDummyTimeSeries(10)}, + testcase{createDummyTimeSeries(100)}, + } + for _, tc := range testCases { + symbolTable := newRwSymbolTable() + buff := make([]byte, 0) + seriesBuff := make([]prompb.MinimizedTimeSeries, len(tc.batch)) + //total := 0 + for i := range seriesBuff { + seriesBuff[i].Samples = []prompb.Sample{{}} + // todo: add other types + //seriesBuff[i].Exemplars = []prompb.Exemplar{{}} + } + pBuf := proto.NewBuffer(nil) + + // Warmup buffers + for i := 0; i < 10; i++ { + populateMinimizedTimeSeries(&symbolTable, tc.batch, seriesBuff, true, true) + buildMinimizedWriteRequest(seriesBuff, symbolTable.symbols.String(), pBuf, &buff) + } + + b.Run(fmt.Sprintf("%d-instances", len(tc.batch)), func(b *testing.B) { + totalSize := 0 + for j := 0; j < b.N; j++ { + populateMinimizedTimeSeries(&symbolTable, tc.batch, seriesBuff, true, true) + b.ResetTimer() + req, _, err := buildMinimizedWriteRequest(seriesBuff, symbolTable.symbols.String(), pBuf, &buff) + if err != nil { + b.Fatal(err) + } + symbolTable.clear() + totalSize += len(req) + b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op") + } + }) + } +} diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index 75321bd9cc..200b047ccd 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -91,7 +91,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) { for _, tc := range cases { t.Run("", func(t *testing.T) { - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false, false) conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, RemoteReadConfigs: tc.cfgs, diff --git a/storage/remote/storage.go b/storage/remote/storage.go index b5dc607db4..b1b51e2494 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -62,7 +62,7 @@ type Storage struct { } // NewStorage returns a remote.Storage. -func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, remoteWrite11 bool) *Storage { +func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, remoteWrite11 bool, remoteWrite11Minimized bool) *Storage { if l == nil { l = log.NewNopLogger() } @@ -72,7 +72,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal logger: logger, localStartTimeCallback: stCallback, } - s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, remoteWrite11) + s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, remoteWrite11, remoteWrite11Minimized) return s } diff --git a/storage/remote/storage_test.go b/storage/remote/storage_test.go index d0f9322c79..9a395455cb 100644 --- a/storage/remote/storage_test.go +++ b/storage/remote/storage_test.go @@ -29,7 +29,7 @@ import ( func TestStorageLifecycle(t *testing.T) { dir := t.TempDir() - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false, false) conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, RemoteWriteConfigs: []*config.RemoteWriteConfig{ @@ -56,7 +56,7 @@ func TestStorageLifecycle(t *testing.T) { func TestUpdateRemoteReadConfigs(t *testing.T) { dir := t.TempDir() - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false, false) conf := &config.Config{ GlobalConfig: config.GlobalConfig{}, @@ -77,7 +77,7 @@ func TestUpdateRemoteReadConfigs(t *testing.T) { func TestFilterExternalLabels(t *testing.T) { dir := t.TempDir() - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false, false) conf := &config.Config{ GlobalConfig: config.GlobalConfig{ @@ -102,7 +102,7 @@ func TestFilterExternalLabels(t *testing.T) { func TestIgnoreExternalLabels(t *testing.T) { dir := t.TempDir() - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false, false) conf := &config.Config{ GlobalConfig: config.GlobalConfig{ diff --git a/storage/remote/write.go b/storage/remote/write.go index d6ac9d23c7..196cbc59fe 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -60,40 +60,42 @@ type WriteStorage struct { reg prometheus.Registerer mtx sync.Mutex - watcherMetrics *wlog.WatcherMetrics - liveReaderMetrics *wlog.LiveReaderMetrics - externalLabels labels.Labels - dir string - queues map[string]*QueueManager - remoteWrite11 bool - samplesIn *ewmaRate - flushDeadline time.Duration - interner *pool - scraper ReadyScrapeManager - quit chan struct{} + watcherMetrics *wlog.WatcherMetrics + liveReaderMetrics *wlog.LiveReaderMetrics + externalLabels labels.Labels + dir string + queues map[string]*QueueManager + remoteWrite11 bool + remoteWrite11Minimized bool + samplesIn *ewmaRate + flushDeadline time.Duration + interner *pool + scraper ReadyScrapeManager + quit chan struct{} // For timestampTracker. highestTimestamp *maxTimestamp } // NewWriteStorage creates and runs a WriteStorage. -func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, remoteWrite11 bool) *WriteStorage { +func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, remoteWrite11 bool, remoteWrite11Minimized bool) *WriteStorage { if logger == nil { logger = log.NewNopLogger() } rws := &WriteStorage{ - queues: make(map[string]*QueueManager), - remoteWrite11: remoteWrite11, - watcherMetrics: wlog.NewWatcherMetrics(reg), - liveReaderMetrics: wlog.NewLiveReaderMetrics(reg), - logger: logger, - reg: reg, - flushDeadline: flushDeadline, - samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration), - dir: dir, - interner: newPool(), - scraper: sm, - quit: make(chan struct{}), + queues: make(map[string]*QueueManager), + remoteWrite11: remoteWrite11, + remoteWrite11Minimized: remoteWrite11Minimized, + watcherMetrics: wlog.NewWatcherMetrics(reg), + liveReaderMetrics: wlog.NewLiveReaderMetrics(reg), + logger: logger, + reg: reg, + flushDeadline: flushDeadline, + samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration), + dir: dir, + interner: newPool(), + scraper: sm, + quit: make(chan struct{}), highestTimestamp: &maxTimestamp{ Gauge: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, @@ -211,6 +213,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { rwConf.SendExemplars, rwConf.SendNativeHistograms, rws.remoteWrite11, + rws.remoteWrite11Minimized, ) // Keep track of which queues are new so we know which to start. newHashes = append(newHashes, hash) diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 79f5bd4e37..ce7048803b 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -47,15 +47,18 @@ type writeHandler struct { // Experimental feature, new remote write proto format // The handler will accept the new format, but it can still accept the old one enableRemoteWrite11 bool + + enableRemoteWrite11Minimized bool } // NewWriteHandler creates a http.Handler that accepts remote write requests and // writes them to the provided appendable. -func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable, enableRemoteWrite11 bool) http.Handler { +func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable, enableRemoteWrite11 bool, enableRemoteWrite11Minimized bool) http.Handler { h := &writeHandler{ - logger: logger, - appendable: appendable, - enableRemoteWrite11: enableRemoteWrite11, + logger: logger, + appendable: appendable, + enableRemoteWrite11: enableRemoteWrite11, + enableRemoteWrite11Minimized: enableRemoteWrite11Minimized, samplesWithInvalidLabelsTotal: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "prometheus", @@ -74,7 +77,11 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { var err error var req *prompb.WriteRequest var reqWithRefs *prompb.WriteRequestWithRefs - if h.enableRemoteWrite11 && r.Header.Get(RemoteWriteVersionHeader) == RemoteWriteVersion11HeaderValue { + var reqMin *prompb.MinimizedWriteRequest + + if h.enableRemoteWrite11Minimized { + reqMin, err = DecodeMinimizedWriteRequest(r.Body) + } else if !h.enableRemoteWrite11Minimized && h.enableRemoteWrite11 && r.Header.Get(RemoteWriteVersionHeader) == RemoteWriteVersion11HeaderValue { reqWithRefs, err = DecodeReducedWriteRequest(r.Body) } else { req, err = DecodeWriteRequest(r.Body) @@ -86,7 +93,9 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - if h.enableRemoteWrite11 { + if h.enableRemoteWrite11Minimized { + err = h.writeMin(r.Context(), reqMin) + } else if h.enableRemoteWrite11 { err = h.writeReduced(r.Context(), reqWithRefs) } else { err = h.write(r.Context(), req) @@ -139,23 +148,23 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err }() for _, ts := range req.Timeseries { - labels := labelProtosToLabels(ts.Labels) - if !labels.IsValid() { - level.Warn(h.logger).Log("msg", "Invalid metric names or labels", "got", labels.String()) + ls := labelProtosToLabels(ts.Labels) + if !ls.IsValid() { + level.Warn(h.logger).Log("msg", "Invalid metric names or labels", "got", ls.String()) samplesWithInvalidLabels++ continue } - err := h.appendSamples(app, ts.Samples, labels) + err := h.appendSamples(app, ts.Samples, ls) if err != nil { return err } for _, ep := range ts.Exemplars { e := exemplarProtoToExemplar(ep) - h.appendExemplar(app, e, labels, &outOfOrderExemplarErrs) + h.appendExemplar(app, e, ls, &outOfOrderExemplarErrs) } - err = h.appendHistograms(app, ts.Histograms, labels) + err = h.appendHistograms(app, ts.Histograms, ls) if err != nil { return err } @@ -328,3 +337,42 @@ func (h *writeHandler) writeReduced(ctx context.Context, req *prompb.WriteReques return nil } + +func (h *writeHandler) writeMin(ctx context.Context, req *prompb.MinimizedWriteRequest) (err error) { + outOfOrderExemplarErrs := 0 + + app := h.appendable.Appender(ctx) + defer func() { + if err != nil { + _ = app.Rollback() + return + } + err = app.Commit() + }() + + for _, ts := range req.Timeseries { + ls := Uint32RefToLabels(req.Symbols, ts.LabelSymbols) + + err := h.appendSamples(app, ts.Samples, ls) + if err != nil { + return err + } + + for _, ep := range ts.Exemplars { + e := exemplarProtoToExemplar(ep) + //e := exemplarRefProtoToExemplar(req.StringSymbolTable, ep) + h.appendExemplar(app, e, ls, &outOfOrderExemplarErrs) + } + + err = h.appendHistograms(app, ts.Histograms, ls) + if err != nil { + return err + } + } + + if outOfOrderExemplarErrs > 0 { + _ = level.Warn(h.logger).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", outOfOrderExemplarErrs) + } + + return nil +} diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 98fedfee07..f1d35e7738 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -45,7 +45,7 @@ func TestRemoteWriteHandler(t *testing.T) { require.NoError(t, err) appendable := &mockAppendable{} - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -96,7 +96,7 @@ func TestOutOfOrderSample(t *testing.T) { appendable := &mockAppendable{ latestSample: 100, } - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -121,7 +121,7 @@ func TestOutOfOrderExemplar(t *testing.T) { appendable := &mockAppendable{ latestExemplar: 100, } - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -145,7 +145,7 @@ func TestOutOfOrderHistogram(t *testing.T) { latestHistogram: 100, } - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -173,7 +173,7 @@ func BenchmarkRemoteWritehandler(b *testing.B) { } appendable := &mockAppendable{} - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false, false) recorder := httptest.NewRecorder() b.ResetTimer() @@ -204,7 +204,7 @@ func BenchmarkReducedRemoteWriteHandler(b *testing.B) { } appendable := &mockAppendable{} - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, true) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, true, false) recorder := httptest.NewRecorder() b.ResetTimer() @@ -223,7 +223,7 @@ func TestCommitErr(t *testing.T) { appendable := &mockAppendable{ commitErr: fmt.Errorf("commit error"), } - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -249,7 +249,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) { require.NoError(b, db.Close()) }) - handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head(), false) + handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head(), false, false) buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil) require.NoError(b, err) @@ -303,7 +303,7 @@ func TestRemoteWriteHandlerReducedProtocol(t *testing.T) { require.NoError(t, err) appendable := &mockAppendable{} - handler := NewWriteHandler(nil, nil, appendable, true) + handler := NewWriteHandler(nil, nil, appendable, true, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) diff --git a/storage/remote/write_test.go b/storage/remote/write_test.go index 14ff466a9e..19470001f4 100644 --- a/storage/remote/write_test.go +++ b/storage/remote/write_test.go @@ -117,7 +117,7 @@ func TestNoDuplicateWriteConfigs(t *testing.T) { } for _, tc := range cases { - s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false) + s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false, false) conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, RemoteWriteConfigs: tc.cfgs, @@ -139,7 +139,7 @@ func TestRestartOnNameChange(t *testing.T) { hash, err := toHash(cfg) require.NoError(t, err) - s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false) + s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false, false) conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, @@ -164,7 +164,7 @@ func TestRestartOnNameChange(t *testing.T) { func TestUpdateWithRegisterer(t *testing.T) { dir := t.TempDir() - s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil, false) + s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil, false, false) c1 := &config.RemoteWriteConfig{ Name: "named", URL: &common_config.URL{ @@ -204,7 +204,7 @@ func TestUpdateWithRegisterer(t *testing.T) { func TestWriteStorageLifecycle(t *testing.T) { dir := t.TempDir() - s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false) + s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false, false) conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, RemoteWriteConfigs: []*config.RemoteWriteConfig{ @@ -221,7 +221,7 @@ func TestWriteStorageLifecycle(t *testing.T) { func TestUpdateExternalLabels(t *testing.T) { dir := t.TempDir() - s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil, false) + s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil, false, false) externalLabels := labels.FromStrings("external", "true") conf := &config.Config{ @@ -250,7 +250,7 @@ func TestUpdateExternalLabels(t *testing.T) { func TestWriteStorageApplyConfigsIdempotent(t *testing.T) { dir := t.TempDir() - s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false) + s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false, false) conf := &config.Config{ GlobalConfig: config.GlobalConfig{}, @@ -276,7 +276,7 @@ func TestWriteStorageApplyConfigsIdempotent(t *testing.T) { func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) { dir := t.TempDir() - s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false) + s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false, false) c0 := &config.RemoteWriteConfig{ RemoteTimeout: model.Duration(10 * time.Second), diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 9e3873aef6..b180612637 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -255,6 +255,7 @@ func NewAPI( rwEnabled bool, otlpEnabled bool, enableRemoteWrite11 bool, + enableRemoteWrite11Min bool, ) *API { a := &API{ QueryEngine: qe, @@ -296,7 +297,7 @@ func NewAPI( } if rwEnabled { - a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, enableRemoteWrite11) + a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, enableRemoteWrite11, enableRemoteWrite11Min) } if otlpEnabled { a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap) diff --git a/web/web.go b/web/web.go index 0e067f2db2..2317c9d0da 100644 --- a/web/web.go +++ b/web/web.go @@ -241,27 +241,28 @@ type Options struct { Version *PrometheusVersion Flags map[string]string - ListenAddress string - CORSOrigin *regexp.Regexp - ReadTimeout time.Duration - MaxConnections int - ExternalURL *url.URL - RoutePrefix string - UseLocalAssets bool - UserAssetsPath string - ConsoleTemplatesPath string - ConsoleLibrariesPath string - EnableLifecycle bool - EnableAdminAPI bool - PageTitle string - RemoteReadSampleLimit int - RemoteReadConcurrencyLimit int - RemoteReadBytesInFrame int - EnableRemoteWriteReceiver bool - EnableOTLPWriteReceiver bool - IsAgent bool - AppName string - EnableReceiverRemoteWrite11 bool + ListenAddress string + CORSOrigin *regexp.Regexp + ReadTimeout time.Duration + MaxConnections int + ExternalURL *url.URL + RoutePrefix string + UseLocalAssets bool + UserAssetsPath string + ConsoleTemplatesPath string + ConsoleLibrariesPath string + EnableLifecycle bool + EnableAdminAPI bool + PageTitle string + RemoteReadSampleLimit int + RemoteReadConcurrencyLimit int + RemoteReadBytesInFrame int + EnableRemoteWriteReceiver bool + EnableOTLPWriteReceiver bool + IsAgent bool + AppName string + EnableReceiverRemoteWrite11 bool + EnableReceiverRemoteWrite11Min bool Gatherer prometheus.Gatherer Registerer prometheus.Registerer @@ -353,6 +354,7 @@ func New(logger log.Logger, o *Options) *Handler { o.EnableRemoteWriteReceiver, o.EnableOTLPWriteReceiver, o.EnableReceiverRemoteWrite11, + o.EnableReceiverRemoteWrite11Min, ) if o.RoutePrefix != "/" {