more cleanup, address review comments

Signed-off-by: Callum Styan <callumstyan@gmail.com>
Signed-off-by: Nicolás Pazos <npazosmendez@gmail.com>
This commit is contained in:
Callum Styan 2023-11-23 11:31:10 -08:00 committed by Nicolás Pazos
parent 58b1a34d89
commit 7630577850
3 changed files with 17 additions and 18 deletions

View file

@ -8,7 +8,7 @@ declare -a INSTANCES
# (sender,receiver) pairs to run: (sender_name; sender_flags; receiver_name; receiver_flags) # (sender,receiver) pairs to run: (sender_name; sender_flags; receiver_name; receiver_flags)
INSTANCES+=('sender-v1;;receiver-v1;') INSTANCES+=('sender-v1;;receiver-v1;')
INSTANCES+=('sender-v11-min32-optimized-varint;--remote-write-format 1;receiver-v11-min32-optimized-varint;--remote-write-format 1') INSTANCES+=('sender-v11-min32-optimized-varint;--remote-write-format 1;receiver-v11-min32-optimized-varint;--remote-write-format 1')
INSTANCES+=('sender-v11-min-len;--remote-write-format 5;receiver-v11-min-len;--remote-write-format 5') INSTANCES+=('sender-v11-min-len;--remote-write-format 2;receiver-v11-min-len;--remote-write-format 2')
# ~~~~~~~~~~~~~ # ~~~~~~~~~~~~~

View file

@ -486,6 +486,8 @@ func NewQueueManager(
storeClient: client, storeClient: client,
sendExemplars: enableExemplarRemoteWrite, sendExemplars: enableExemplarRemoteWrite,
sendNativeHistograms: enableNativeHistogramRemoteWrite, sendNativeHistograms: enableNativeHistogramRemoteWrite,
// TODO: we should eventually set the format via content negotiation,
// so this field would be the desired format, maybe with a fallback?
rwFormat: rwFormat, rwFormat: rwFormat,
seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels), seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels),
@ -1464,7 +1466,15 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms) "exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
s.sendMinSamples(ctx, pendingMinimizedData[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf) s.sendMinSamples(ctx, pendingMinimizedData[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf)
symbolTable.clear() symbolTable.clear()
case MinLen:
nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesLen(&symbolTable, batch, pendingMinLenData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
s.sendMinLenSamples(ctx, pendingMinLenData[:n], symbolTable.LabelsData(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
symbolTable.clear()
} }
} }
queue.ReturnForReuse(batch) queue.ReturnForReuse(batch)
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
@ -1814,16 +1824,12 @@ type offLenPair struct {
type rwSymbolTable struct { type rwSymbolTable struct {
symbols []byte symbols []byte
symbolsMap map[string]offLenPair symbolsMap map[string]offLenPair
symbolsMap64Packed map[string]uint64
symbolsMap32Packed map[string]uint32
symbolsMapBytes map[string]uint32 symbolsMapBytes map[string]uint32
} }
func newRwSymbolTable() rwSymbolTable { func newRwSymbolTable() rwSymbolTable {
return rwSymbolTable{ return rwSymbolTable{
symbolsMap: make(map[string]offLenPair), symbolsMap: make(map[string]offLenPair),
symbolsMap64Packed: make(map[string]uint64),
symbolsMap32Packed: make(map[string]uint32),
symbolsMapBytes: make(map[string]uint32), symbolsMapBytes: make(map[string]uint32),
} }
} }
@ -1867,12 +1873,6 @@ func (r *rwSymbolTable) clear() {
for k := range r.symbolsMap { for k := range r.symbolsMap {
delete(r.symbolsMap, k) delete(r.symbolsMap, k)
} }
for k := range r.symbolsMap64Packed {
delete(r.symbolsMap64Packed, k)
}
for k := range r.symbolsMap32Packed {
delete(r.symbolsMap32Packed, k)
}
for k := range r.symbolsMapBytes { for k := range r.symbolsMapBytes {
delete(r.symbolsMapBytes, k) delete(r.symbolsMapBytes, k)
} }

View file

@ -323,7 +323,6 @@ func New(logger log.Logger, o *Options) *Handler {
app = h.storage app = h.storage
} }
fmt.Println("rw format for handler is: ", o.RemoteWriteFormat)
h.apiV1 = api_v1.NewAPI(h.queryEngine, h.storage, app, h.exemplarStorage, factorySPr, factoryTr, factoryAr, h.apiV1 = api_v1.NewAPI(h.queryEngine, h.storage, app, h.exemplarStorage, factorySPr, factoryTr, factoryAr,
func() config.Config { func() config.Config {
h.mtx.RLock() h.mtx.RLock()