From 763057785042a725fc8c8368580398bbcd9cc632 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Thu, 23 Nov 2023 11:31:10 -0800 Subject: [PATCH] more cleanup, address review comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Callum Styan Signed-off-by: Nicolás Pazos --- scripts/remotewrite11-bench/run.sh | 2 +- storage/remote/queue_manager.go | 32 +++++++++++++++--------------- web/web.go | 1 - 3 files changed, 17 insertions(+), 18 deletions(-) diff --git a/scripts/remotewrite11-bench/run.sh b/scripts/remotewrite11-bench/run.sh index 174421c069..431be4c658 100755 --- a/scripts/remotewrite11-bench/run.sh +++ b/scripts/remotewrite11-bench/run.sh @@ -8,7 +8,7 @@ declare -a INSTANCES # (sender,receiver) pairs to run: (sender_name; sender_flags; receiver_name; receiver_flags) INSTANCES+=('sender-v1;;receiver-v1;') INSTANCES+=('sender-v11-min32-optimized-varint;--remote-write-format 1;receiver-v11-min32-optimized-varint;--remote-write-format 1') -INSTANCES+=('sender-v11-min-len;--remote-write-format 5;receiver-v11-min-len;--remote-write-format 5') +INSTANCES+=('sender-v11-min-len;--remote-write-format 2;receiver-v11-min-len;--remote-write-format 2') # ~~~~~~~~~~~~~ diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 850344cc93..560a0dc540 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -486,7 +486,9 @@ func NewQueueManager( storeClient: client, sendExemplars: enableExemplarRemoteWrite, sendNativeHistograms: enableNativeHistogramRemoteWrite, - rwFormat: rwFormat, + // TODO: we should eventually set the format via content negotiation, + // so this field would be the desired format, maybe with a fallback? + rwFormat: rwFormat, seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels), seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int), @@ -1464,7 +1466,15 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { "exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms) s.sendMinSamples(ctx, pendingMinimizedData[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf) symbolTable.clear() + case MinLen: + nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesLen(&symbolTable, batch, pendingMinLenData, s.qm.sendExemplars, s.qm.sendNativeHistograms) + n := nPendingSamples + nPendingExemplars + nPendingHistograms + level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, + "exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms) + s.sendMinLenSamples(ctx, pendingMinLenData[:n], symbolTable.LabelsData(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) + symbolTable.clear() } + } queue.ReturnForReuse(batch) timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) @@ -1812,19 +1822,15 @@ type offLenPair struct { } type rwSymbolTable struct { - symbols []byte - symbolsMap map[string]offLenPair - symbolsMap64Packed map[string]uint64 - symbolsMap32Packed map[string]uint32 - symbolsMapBytes map[string]uint32 + symbols []byte + symbolsMap map[string]offLenPair + symbolsMapBytes map[string]uint32 } func newRwSymbolTable() rwSymbolTable { return rwSymbolTable{ - symbolsMap: make(map[string]offLenPair), - symbolsMap64Packed: make(map[string]uint64), - symbolsMap32Packed: make(map[string]uint32), - symbolsMapBytes: make(map[string]uint32), + symbolsMap: make(map[string]offLenPair), + symbolsMapBytes: make(map[string]uint32), } } @@ -1867,12 +1873,6 @@ func (r *rwSymbolTable) clear() { for k := range r.symbolsMap { delete(r.symbolsMap, k) } - for k := range r.symbolsMap64Packed { - delete(r.symbolsMap64Packed, k) - } - for k := range r.symbolsMap32Packed { - delete(r.symbolsMap32Packed, k) - } for k := range r.symbolsMapBytes { delete(r.symbolsMapBytes, k) } diff --git a/web/web.go b/web/web.go index b997a17546..69cd67d38d 100644 --- a/web/web.go +++ b/web/web.go @@ -323,7 +323,6 @@ func New(logger log.Logger, o *Options) *Handler { app = h.storage } - fmt.Println("rw format for handler is: ", o.RemoteWriteFormat) h.apiV1 = api_v1.NewAPI(h.queryEngine, h.storage, app, h.exemplarStorage, factorySPr, factoryTr, factoryAr, func() config.Config { h.mtx.RLock()