Implement code paths for new proto format

Signed-off-by: Callum Styan <callumstyan@gmail.com>
This commit is contained in:
Callum Styan 2023-02-20 14:10:13 -08:00 committed by Nicolás Pazos
parent 4910a33d78
commit f1992591b2
10 changed files with 367 additions and 60 deletions

View file

@ -146,6 +146,7 @@ type flagConfig struct {
queryConcurrency int queryConcurrency int
queryMaxSamples int queryMaxSamples int
RemoteFlushDeadline model.Duration RemoteFlushDeadline model.Duration
rwProto bool
featureList []string featureList []string
// These options are extracted from featureList // These options are extracted from featureList
@ -210,6 +211,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
continue continue
case "promql-at-modifier", "promql-negative-offset": case "promql-at-modifier", "promql-negative-offset":
level.Warn(logger).Log("msg", "This option for --enable-feature is now permanently enabled and therefore a no-op.", "option", o) level.Warn(logger).Log("msg", "This option for --enable-feature is now permanently enabled and therefore a no-op.", "option", o)
case "reduced-rw-proto":
c.rwProto = true
level.Info(logger).Log("msg", "Reduced remote write proto format will be used, remote write receiver must be able to parse this new protobuf format.")
default: default:
level.Warn(logger).Log("msg", "Unknown option for --enable-feature", "option", o) level.Warn(logger).Log("msg", "Unknown option for --enable-feature", "option", o)
} }
@ -595,7 +599,7 @@ func main() {
var ( var (
localStorage = &readyStorage{stats: tsdb.NewDBStats()} localStorage = &readyStorage{stats: tsdb.NewDBStats()}
scraper = &readyScrapeManager{} scraper = &readyScrapeManager{}
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper) remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.rwProto)
fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage) fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
) )

View file

@ -403,6 +403,8 @@ type QueueManager struct {
sendNativeHistograms bool sendNativeHistograms bool
watcher *wlog.Watcher watcher *wlog.Watcher
metadataWatcher *MetadataWatcher metadataWatcher *MetadataWatcher
// experimental feature, new remote write proto format
internFormat bool
clientMtx sync.RWMutex clientMtx sync.RWMutex
storeClient WriteClient storeClient WriteClient
@ -450,6 +452,7 @@ func NewQueueManager(
sm ReadyScrapeManager, sm ReadyScrapeManager,
enableExemplarRemoteWrite bool, enableExemplarRemoteWrite bool,
enableNativeHistogramRemoteWrite bool, enableNativeHistogramRemoteWrite bool,
internFormat bool,
) *QueueManager { ) *QueueManager {
if logger == nil { if logger == nil {
logger = log.NewNopLogger() logger = log.NewNopLogger()
@ -472,6 +475,7 @@ func NewQueueManager(
storeClient: client, storeClient: client,
sendExemplars: enableExemplarRemoteWrite, sendExemplars: enableExemplarRemoteWrite,
sendNativeHistograms: enableNativeHistogramRemoteWrite, sendNativeHistograms: enableNativeHistogramRemoteWrite,
internFormat: internFormat,
seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels), seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels),
seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int), seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
@ -771,6 +775,7 @@ outer:
// Start the queue manager sending samples to the remote storage. // Start the queue manager sending samples to the remote storage.
// Does not block. // Does not block.
func (t *QueueManager) Start() { func (t *QueueManager) Start() {
// panic(1)
// Register and initialise some metrics. // Register and initialise some metrics.
t.metrics.register() t.metrics.register()
t.metrics.shardCapacity.Set(float64(t.cfg.Capacity)) t.metrics.shardCapacity.Set(float64(t.cfg.Capacity))
@ -1169,7 +1174,6 @@ func (s *shards) stop() {
func (s *shards) enqueue(ref chunks.HeadSeriesRef, data timeSeries) bool { func (s *shards) enqueue(ref chunks.HeadSeriesRef, data timeSeries) bool {
s.mtx.RLock() s.mtx.RLock()
defer s.mtx.RUnlock() defer s.mtx.RUnlock()
shard := uint64(ref) % uint64(len(s.queues)) shard := uint64(ref) % uint64(len(s.queues))
select { select {
case <-s.softShutdown: case <-s.softShutdown:
@ -1343,6 +1347,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
}() }()
shardNum := strconv.Itoa(shardID) shardNum := strconv.Itoa(shardID)
pool := newLookupPool()
// Send batches of at most MaxSamplesPerSend samples to the remote storage. // Send batches of at most MaxSamplesPerSend samples to the remote storage.
// If we have fewer samples than that, flush them out after a deadline anyways. // If we have fewer samples than that, flush them out after a deadline anyways.
@ -1365,6 +1370,14 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
} }
} }
pendingReducedData := make([]prompb.ReducedTimeSeries, max)
for i := range pendingReducedData {
pendingReducedData[i].Samples = []prompb.Sample{{}}
if s.qm.sendExemplars {
pendingReducedData[i].Exemplars = []prompb.ExemplarRef{{}}
}
}
timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline)) timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
stop := func() { stop := func() {
if !timer.Stop() { if !timer.Stop() {
@ -1399,10 +1412,17 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
if !ok { if !ok {
return return
} }
if s.qm.internFormat {
nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateReducedTimeSeries(pool, batch, pendingReducedData)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendReducedSamples(ctx, pendingReducedData[:n], pool.table, nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
pool.clear()
} else {
nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData) nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
queue.ReturnForReuse(batch)
n := nPendingSamples + nPendingExemplars + nPendingHistograms n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
}
queue.ReturnForReuse(batch)
stop() stop()
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
@ -1410,11 +1430,21 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
case <-timer.C: case <-timer.C:
batch := queue.Batch() batch := queue.Batch()
if len(batch) > 0 { if len(batch) > 0 {
nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData) if s.qm.internFormat {
nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateReducedTimeSeries(pool, batch, pendingReducedData)
n := nPendingSamples + nPendingExemplars + nPendingHistograms n := nPendingSamples + nPendingExemplars + nPendingHistograms
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms) "exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
s.sendReducedSamples(ctx, pendingReducedData[:n], pool.table, nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
pool.clear()
} else {
nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
}
} }
queue.ReturnForReuse(batch) queue.ReturnForReuse(batch)
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
@ -1556,6 +1586,149 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
return err return err
} }
func (s *shards) populateReducedTimeSeries(pool *lookupPool, batch []timeSeries, pendingData []prompb.ReducedTimeSeries) (int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms int
for nPending, d := range batch {
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
if s.qm.sendExemplars {
pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
}
if s.qm.sendNativeHistograms {
pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
}
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
// stop reading from the queue. This makes it safe to reference pendingSamples by index.
// pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
pendingData[nPending].Labels = make([]prompb.LabelRef, len(d.seriesLabels))
for i, sl := range d.seriesLabels {
nRef := pool.intern(sl.Name)
vRef := pool.intern(sl.Value)
pendingData[nPending].Labels[i] = prompb.LabelRef{NameRef: nRef, ValueRef: vRef}
}
switch d.sType {
case tSample:
pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{
Value: d.value,
Timestamp: d.timestamp,
})
nPendingSamples++
case tExemplar:
l := make([]prompb.LabelRef, len(d.exemplarLabels))
for i, el := range d.exemplarLabels {
nRef := pool.intern(el.Name)
vRef := pool.intern(el.Value)
l[i] = prompb.LabelRef{NameRef: nRef, ValueRef: vRef}
}
pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.ExemplarRef{
Labels: l,
Value: d.value,
Timestamp: d.timestamp,
})
nPendingExemplars++
case tHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToHistogramProto(d.timestamp, d.histogram))
nPendingHistograms++
}
}
return nPendingSamples, nPendingExemplars, nPendingHistograms
}
func (s *shards) sendReducedSamples(ctx context.Context, samples []prompb.ReducedTimeSeries, labels map[uint64]string, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
begin := time.Now()
err := s.sendReducedSamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, pBuf, buf)
if err != nil {
level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err)
s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount))
s.qm.metrics.failedExemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.failedHistogramsTotal.Add(float64(histogramCount))
}
// These counters are used to calculate the dynamic sharding, and as such
// should be maintained irrespective of success or failure.
s.qm.dataOut.incr(int64(len(samples)))
s.qm.dataOutDuration.incr(int64(time.Since(begin)))
s.qm.lastSendTimestamp.Store(time.Now().Unix())
// Pending samples/exemplars/histograms also should be subtracted as an error means
// they will not be retried.
s.qm.metrics.pendingSamples.Sub(float64(sampleCount))
s.qm.metrics.pendingExemplars.Sub(float64(exemplarCount))
s.qm.metrics.pendingHistograms.Sub(float64(histogramCount))
s.enqueuedSamples.Sub(int64(sampleCount))
s.enqueuedExemplars.Sub(int64(exemplarCount))
s.enqueuedHistograms.Sub(int64(histogramCount))
}
// sendSamples to the remote storage with backoff for recoverable errors.
func (s *shards) sendReducedSamplesWithBackoff(ctx context.Context, samples []prompb.ReducedTimeSeries, labels map[uint64]string, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error {
// Build the WriteRequest with no metadata.
req, highest, err := buildReducedWriteRequest(samples, labels, pBuf, *buf)
if err != nil {
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
return err
}
reqSize := len(req)
*buf = req
// An anonymous function allows us to defer the completion of our per-try spans
// without causing a memory leak, and it has the nice effect of not propagating any
// parameters for sendSamplesWithBackoff/3.
attemptStore := func(try int) error {
ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch")
defer span.End()
span.SetAttributes(
attribute.Int("request_size", reqSize),
attribute.Int("samples", sampleCount),
attribute.Int("try", try),
attribute.String("remote_name", s.qm.storeClient.Name()),
attribute.String("remote_url", s.qm.storeClient.Endpoint()),
)
if exemplarCount > 0 {
span.SetAttributes(attribute.Int("exemplars", exemplarCount))
}
if histogramCount > 0 {
span.SetAttributes(attribute.Int("histograms", histogramCount))
}
begin := time.Now()
s.qm.metrics.samplesTotal.Add(float64(sampleCount))
s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
err := s.qm.client().Store(ctx, *buf, try)
s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
if err != nil {
span.RecordError(err)
return err
}
return nil
}
onRetry := func() {
s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount))
}
err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.logger, attemptStore, onRetry)
if errors.Is(err, context.Canceled) {
// When there is resharding, we cancel the context for this queue, which means the data is not sent.
// So we exit early to not update the metrics.
return err
}
s.qm.metrics.sentBytesTotal.Add(float64(reqSize))
s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000))
return err
}
func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l log.Logger, attempt func(int) error, onRetry func()) error { func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l log.Logger, attempt func(int) error, onRetry func()) error {
backoff := cfg.MinBackoff backoff := cfg.MinBackoff
sleepDuration := model.Duration(0) sleepDuration := model.Duration(0)
@ -1646,3 +1819,44 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
compressed := reSnappy.Encode(buf, pBuf.Bytes()) compressed := reSnappy.Encode(buf, pBuf.Bytes())
return compressed, highest, nil return compressed, highest, nil
} }
func buildReducedWriteRequest(samples []prompb.ReducedTimeSeries, labels map[uint64]string, pBuf *proto.Buffer, buf []byte) ([]byte, int64, error) {
var highest int64
for _, ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
highest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
highest = ts.Exemplars[0].Timestamp
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
highest = ts.Histograms[0].Timestamp
}
}
req := &prompb.WriteRequestWithRefs{
StringSymbolTable: labels,
Timeseries: samples,
}
if pBuf == nil {
pBuf = proto.NewBuffer(nil) // For convenience in tests. Not efficient.
} else {
pBuf.Reset()
}
err := pBuf.Marshal(req)
if err != nil {
return nil, 0, err
}
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
if buf != nil {
buf = buf[0:cap(buf)]
}
compressed := reSnappy.Encode(buf, pBuf.Bytes())
return compressed, highest, nil
}

View file

@ -81,7 +81,7 @@ func TestSampleDelivery(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil) s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
defer s.Close() defer s.Close()
queueConfig := config.DefaultQueueConfig queueConfig := config.DefaultQueueConfig
@ -132,6 +132,7 @@ func TestSampleDelivery(t *testing.T) {
hash, err := toHash(writeConfig) hash, err := toHash(writeConfig)
require.NoError(t, err) require.NoError(t, err)
qm := s.rws.queues[hash] qm := s.rws.queues[hash]
// time.Sleep(1 * time.Second)
c := NewTestWriteClient() c := NewTestWriteClient()
qm.SetClient(c) qm.SetClient(c)
@ -172,7 +173,7 @@ func TestMetadataDelivery(t *testing.T) {
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.Start() m.Start()
defer m.Stop() defer m.Stop()
@ -211,7 +212,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
m.Start() m.Start()
defer m.Stop() defer m.Stop()
@ -253,7 +254,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
m.Start() m.Start()
@ -273,7 +274,7 @@ func TestShutdown(t *testing.T) {
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
samples, series := createTimeseries(n, n) samples, series := createTimeseries(n, n)
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
@ -311,7 +312,7 @@ func TestSeriesReset(t *testing.T) {
cfg := config.DefaultQueueConfig cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
for i := 0; i < numSegments; i++ { for i := 0; i < numSegments; i++ {
series := []record.RefSeries{} series := []record.RefSeries{}
for j := 0; j < numSeries; j++ { for j := 0; j < numSeries; j++ {
@ -340,7 +341,7 @@ func TestReshard(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
m.Start() m.Start()
@ -376,7 +377,7 @@ func TestReshardRaceWithStop(*testing.T) {
go func() { go func() {
for { for {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.Start() m.Start()
h.Unlock() h.Unlock()
h.Lock() h.Lock()
@ -411,7 +412,7 @@ func TestReshardPartialBatch(t *testing.T) {
cfg.BatchSendDeadline = model.Duration(batchSendDeadline) cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
m.Start() m.Start()
@ -456,7 +457,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
m.Start() m.Start()
defer m.Stop() defer m.Stop()
@ -483,7 +484,7 @@ func TestReleaseNoninternedString(t *testing.T) {
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
c := NewTestWriteClient() c := NewTestWriteClient()
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.Start() m.Start()
defer m.Stop() defer m.Stop()
@ -530,7 +531,7 @@ func TestShouldReshard(t *testing.T) {
for _, c := range cases { for _, c := range cases {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
client := NewTestWriteClient() client := NewTestWriteClient()
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.numShards = c.startingShards m.numShards = c.startingShards
m.dataIn.incr(c.samplesIn) m.dataIn.incr(c.samplesIn)
m.dataOut.incr(c.samplesOut) m.dataOut.incr(c.samplesOut)
@ -563,6 +564,9 @@ func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([
// Create Labels that is name of series plus any extra labels supplied. // Create Labels that is name of series plus any extra labels supplied.
b.Reset() b.Reset()
b.Add(labels.MetricName, name) b.Add(labels.MetricName, name)
rand.Shuffle(len(extraLabels), func(i, j int) {
extraLabels[i], extraLabels[j] = extraLabels[j], extraLabels[i]
})
for _, l := range extraLabels { for _, l := range extraLabels {
b.Add(l.Name, l.Value) b.Add(l.Name, l.Value)
} }
@ -600,6 +604,37 @@ func createTimeseriesProto(numSamples, numSeries int, extraLabels ...labels.Labe
return series return series
} }
func createReducedTimeseriesProto(numSamples, numSeries int, extraLabels ...labels.Label) ([]prompb.ReducedTimeSeries, *lookupPool) {
pool := newLookupPool()
series := make([]prompb.ReducedTimeSeries, 0, numSeries)
for i := 0; i < numSeries; i++ {
name := fmt.Sprintf("test_metric_%d", i)
sample := prompb.Sample{
Value: float64(i),
Timestamp: int64(i),
}
nRef := pool.intern("__name__")
vRef := pool.intern(name)
l := []prompb.LabelRef{{NameRef: nRef, ValueRef: vRef}}
rand.Shuffle(len(extraLabels), func(i, j int) {
extraLabels[i], extraLabels[j] = extraLabels[j], extraLabels[i]
})
for i, v := range extraLabels {
if i > 2 {
break
}
nRef := pool.intern(v.Name)
vRef := pool.intern(v.Value)
l = append(l, prompb.LabelRef{NameRef: nRef, ValueRef: vRef})
}
series = append(series, prompb.ReducedTimeSeries{
Labels: l,
Samples: []prompb.Sample{sample},
})
}
return series, pool
}
func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []record.RefSeries) { func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []record.RefSeries) {
exemplars := make([]record.RefExemplar, 0, numExemplars) exemplars := make([]record.RefExemplar, 0, numExemplars)
series := make([]record.RefSeries, 0, numSeries) series := make([]record.RefSeries, 0, numSeries)
@ -779,6 +814,7 @@ func (c *TestWriteClient) waitForExpectedData(tb testing.TB) {
return return
} }
c.wg.Wait() c.wg.Wait()
c.mtx.Lock() c.mtx.Lock()
defer c.mtx.Unlock() defer c.mtx.Unlock()
for ts, expectedSamples := range c.expectedSamples { for ts, expectedSamples := range c.expectedSamples {
@ -808,14 +844,17 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
return err return err
} }
var reqProto prompb.WriteRequest var reqProto prompb.WriteRequestWithRefs
if err := proto.Unmarshal(reqBuf, &reqProto); err != nil { if err := proto.Unmarshal(reqBuf, &reqProto); err != nil {
return err return err
} }
count := 0 count := 0
for _, ts := range reqProto.Timeseries { for _, ts := range reqProto.Timeseries {
labels := labelProtosToLabels(ts.Labels) tsLabels := labels.Labels{}
seriesName := labels.Get("__name__") for _, l := range ts.Labels {
tsLabels = append(tsLabels, labels.Label{Name: reqProto.StringSymbolTable[l.NameRef], Value: reqProto.StringSymbolTable[l.ValueRef]})
}
seriesName := tsLabels.Get("__name__")
for _, sample := range ts.Samples { for _, sample := range ts.Samples {
count++ count++
c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample) c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample)
@ -823,7 +862,15 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
for _, ex := range ts.Exemplars { for _, ex := range ts.Exemplars {
count++ count++
c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], ex) e := prompb.Exemplar{}
e.Timestamp = ex.Timestamp
e.Value = ex.Value
eLabels := make([]prompb.Label, len(ex.Labels))
for i, l := range ex.Labels {
eLabels[i] = prompb.Label{Name: reqProto.StringSymbolTable[l.NameRef], Value: reqProto.StringSymbolTable[l.ValueRef]}
}
e.Labels = eLabels
c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], e)
} }
for _, histogram := range ts.Histograms { for _, histogram := range ts.Histograms {
@ -931,7 +978,7 @@ func BenchmarkSampleSend(b *testing.B) {
dir := b.TempDir() dir := b.TempDir()
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
// These should be received by the client. // These should be received by the client.
@ -977,7 +1024,7 @@ func BenchmarkStartup(b *testing.B) {
c := NewTestBlockedWriteClient() c := NewTestBlockedWriteClient()
m := NewQueueManager(metrics, nil, nil, logger, dir, m := NewQueueManager(metrics, nil, nil, logger, dir,
newEWMARate(ewmaWeight, shardUpdateDuration), newEWMARate(ewmaWeight, shardUpdateDuration),
cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false) cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.watcher.SetStartTime(timestamp.Time(math.MaxInt64)) m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
m.watcher.MaxSegment = segments[len(segments)-2] m.watcher.MaxSegment = segments[len(segments)-2]
err := m.watcher.Run() err := m.watcher.Run()
@ -1060,7 +1107,7 @@ func TestCalculateDesiredShards(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
// Need to start the queue manager so the proper metrics are initialized. // Need to start the queue manager so the proper metrics are initialized.
// However we can stop it right away since we don't need to do any actual // However we can stop it right away since we don't need to do any actual
@ -1137,7 +1184,7 @@ func TestCalculateDesiredShardsDetail(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
for _, tc := range []struct { for _, tc := range []struct {
name string name string
@ -1383,3 +1430,36 @@ func BenchmarkBuildWriteRequest(b *testing.B) {
b.StopTimer() b.StopTimer()
} }
func BenchmarkBuildReducedWriteRequest(b *testing.B) {
// Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics.
extraLabels := labels.Labels{
{Name: "kubernetes_io_arch", Value: "amd64"},
{Name: "kubernetes_io_instance_type", Value: "c3.somesize"},
{Name: "kubernetes_io_os", Value: "linux"},
{Name: "container_name", Value: "some-name"},
{Name: "failure_domain_kubernetes_io_region", Value: "somewhere-1"},
{Name: "failure_domain_kubernetes_io_zone", Value: "somewhere-1b"},
{Name: "id", Value: "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28"},
{Name: "image", Value: "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506"},
{Name: "instance", Value: "ip-111-11-1-11.ec2.internal"},
{Name: "job", Value: "kubernetes-cadvisor"},
{Name: "kubernetes_io_hostname", Value: "ip-111-11-1-11"},
{Name: "monitor", Value: "prod"},
{Name: "name", Value: "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0"},
{Name: "namespace", Value: "kube-system"},
{Name: "pod_name", Value: "some-other-name-5j8s8"},
}
series, pool := createReducedTimeseriesProto(1, 10000, extraLabels...)
b.ResetTimer()
totalSize := 0
for i := 0; i < b.N; i++ {
buf, _, _ := buildReducedWriteRequest(series, pool.getTable(), nil, nil)
totalSize += len(buf)
b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
}
// Do not include shutdown
b.StopTimer()
}

View file

@ -91,7 +91,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) {
for _, tc := range cases { for _, tc := range cases {
t.Run("", func(t *testing.T) { t.Run("", func(t *testing.T) {
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil) s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
conf := &config.Config{ conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig, GlobalConfig: config.DefaultGlobalConfig,
RemoteReadConfigs: tc.cfgs, RemoteReadConfigs: tc.cfgs,

View file

@ -62,7 +62,7 @@ type Storage struct {
} }
// NewStorage returns a remote.Storage. // NewStorage returns a remote.Storage.
func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager) *Storage { func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, writeReducedProto bool) *Storage {
if l == nil { if l == nil {
l = log.NewNopLogger() l = log.NewNopLogger()
} }
@ -72,7 +72,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
logger: logger, logger: logger,
localStartTimeCallback: stCallback, localStartTimeCallback: stCallback,
} }
s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm) s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, writeReducedProto)
return s return s
} }

View file

@ -27,7 +27,7 @@ import (
func TestStorageLifecycle(t *testing.T) { func TestStorageLifecycle(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil) s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
conf := &config.Config{ conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig, GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{ RemoteWriteConfigs: []*config.RemoteWriteConfig{
@ -54,7 +54,7 @@ func TestStorageLifecycle(t *testing.T) {
func TestUpdateRemoteReadConfigs(t *testing.T) { func TestUpdateRemoteReadConfigs(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil) s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
conf := &config.Config{ conf := &config.Config{
GlobalConfig: config.GlobalConfig{}, GlobalConfig: config.GlobalConfig{},
@ -75,7 +75,7 @@ func TestUpdateRemoteReadConfigs(t *testing.T) {
func TestFilterExternalLabels(t *testing.T) { func TestFilterExternalLabels(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil) s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
conf := &config.Config{ conf := &config.Config{
GlobalConfig: config.GlobalConfig{ GlobalConfig: config.GlobalConfig{
@ -100,7 +100,7 @@ func TestFilterExternalLabels(t *testing.T) {
func TestIgnoreExternalLabels(t *testing.T) { func TestIgnoreExternalLabels(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil) s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
conf := &config.Config{ conf := &config.Config{
GlobalConfig: config.GlobalConfig{ GlobalConfig: config.GlobalConfig{

View file

@ -65,6 +65,7 @@ type WriteStorage struct {
externalLabels labels.Labels externalLabels labels.Labels
dir string dir string
queues map[string]*QueueManager queues map[string]*QueueManager
writeReducedProto bool
samplesIn *ewmaRate samplesIn *ewmaRate
flushDeadline time.Duration flushDeadline time.Duration
interner *pool interner *pool
@ -76,12 +77,13 @@ type WriteStorage struct {
} }
// NewWriteStorage creates and runs a WriteStorage. // NewWriteStorage creates and runs a WriteStorage.
func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager) *WriteStorage { func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, writeReducedProto bool) *WriteStorage {
if logger == nil { if logger == nil {
logger = log.NewNopLogger() logger = log.NewNopLogger()
} }
rws := &WriteStorage{ rws := &WriteStorage{
queues: make(map[string]*QueueManager), queues: make(map[string]*QueueManager),
writeReducedProto: writeReducedProto,
watcherMetrics: wlog.NewWatcherMetrics(reg), watcherMetrics: wlog.NewWatcherMetrics(reg),
liveReaderMetrics: wlog.NewLiveReaderMetrics(reg), liveReaderMetrics: wlog.NewLiveReaderMetrics(reg),
logger: logger, logger: logger,
@ -153,7 +155,12 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
name = rwConf.Name name = rwConf.Name
} }
c, err := NewWriteClient(name, &ClientConfig{ var c WriteClient
if rwConf.URL.String() == "fake" {
// f := "fake" + strconv.Itoa(rand.Intn(100))
// c = NewTestClient(f, f)
} else {
c, err = NewWriteClient(name, &ClientConfig{
URL: rwConf.URL, URL: rwConf.URL,
Timeout: rwConf.RemoteTimeout, Timeout: rwConf.RemoteTimeout,
HTTPClientConfig: rwConf.HTTPClientConfig, HTTPClientConfig: rwConf.HTTPClientConfig,
@ -165,6 +172,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
if err != nil { if err != nil {
return err return err
} }
}
queue, ok := rws.queues[hash] queue, ok := rws.queues[hash]
if externalLabelUnchanged && ok { if externalLabelUnchanged && ok {
@ -197,6 +205,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
rws.scraper, rws.scraper,
rwConf.SendExemplars, rwConf.SendExemplars,
rwConf.SendNativeHistograms, rwConf.SendNativeHistograms,
rws.writeReducedProto,
) )
// Keep track of which queues are new so we know which to start. // Keep track of which queues are new so we know which to start.
newHashes = append(newHashes, hash) newHashes = append(newHashes, hash)

View file

@ -117,7 +117,7 @@ func TestNoDuplicateWriteConfigs(t *testing.T) {
} }
for _, tc := range cases { for _, tc := range cases {
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil) s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false)
conf := &config.Config{ conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig, GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: tc.cfgs, RemoteWriteConfigs: tc.cfgs,
@ -139,7 +139,7 @@ func TestRestartOnNameChange(t *testing.T) {
hash, err := toHash(cfg) hash, err := toHash(cfg)
require.NoError(t, err) require.NoError(t, err)
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil) s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false)
conf := &config.Config{ conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig, GlobalConfig: config.DefaultGlobalConfig,
@ -164,7 +164,7 @@ func TestRestartOnNameChange(t *testing.T) {
func TestUpdateWithRegisterer(t *testing.T) { func TestUpdateWithRegisterer(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil) s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil, false)
c1 := &config.RemoteWriteConfig{ c1 := &config.RemoteWriteConfig{
Name: "named", Name: "named",
URL: &common_config.URL{ URL: &common_config.URL{
@ -204,7 +204,7 @@ func TestUpdateWithRegisterer(t *testing.T) {
func TestWriteStorageLifecycle(t *testing.T) { func TestWriteStorageLifecycle(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil) s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
conf := &config.Config{ conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig, GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{ RemoteWriteConfigs: []*config.RemoteWriteConfig{
@ -221,7 +221,7 @@ func TestWriteStorageLifecycle(t *testing.T) {
func TestUpdateExternalLabels(t *testing.T) { func TestUpdateExternalLabels(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil) s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil, false)
externalLabels := labels.FromStrings("external", "true") externalLabels := labels.FromStrings("external", "true")
conf := &config.Config{ conf := &config.Config{
@ -250,7 +250,7 @@ func TestUpdateExternalLabels(t *testing.T) {
func TestWriteStorageApplyConfigsIdempotent(t *testing.T) { func TestWriteStorageApplyConfigsIdempotent(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil) s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
conf := &config.Config{ conf := &config.Config{
GlobalConfig: config.GlobalConfig{}, GlobalConfig: config.GlobalConfig{},
@ -276,7 +276,7 @@ func TestWriteStorageApplyConfigsIdempotent(t *testing.T) {
func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) { func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil) s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
c0 := &config.RemoteWriteConfig{ c0 := &config.RemoteWriteConfig{
RemoteTimeout: model.Duration(10 * time.Second), RemoteTimeout: model.Duration(10 * time.Second),

View file

@ -88,7 +88,7 @@ func createTestAgentDB(t *testing.T, reg prometheus.Registerer, opts *Options) *
t.Helper() t.Helper()
dbDir := t.TempDir() dbDir := t.TempDir()
rs := remote.NewStorage(log.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil) rs := remote.NewStorage(log.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil, false)
t.Cleanup(func() { t.Cleanup(func() {
require.NoError(t, rs.Close()) require.NoError(t, rs.Close())
}) })
@ -584,7 +584,7 @@ func TestLockfile(t *testing.T) {
tsdbutil.TestDirLockerUsage(t, func(t *testing.T, data string, createLock bool) (*tsdbutil.DirLocker, testutil.Closer) { tsdbutil.TestDirLockerUsage(t, func(t *testing.T, data string, createLock bool) (*tsdbutil.DirLocker, testutil.Closer) {
logger := log.NewNopLogger() logger := log.NewNopLogger()
reg := prometheus.NewRegistry() reg := prometheus.NewRegistry()
rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil) rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil, false)
t.Cleanup(func() { t.Cleanup(func() {
require.NoError(t, rs.Close()) require.NoError(t, rs.Close())
}) })
@ -604,7 +604,7 @@ func TestLockfile(t *testing.T) {
func Test_ExistingWAL_NextRef(t *testing.T) { func Test_ExistingWAL_NextRef(t *testing.T) {
dbDir := t.TempDir() dbDir := t.TempDir()
rs := remote.NewStorage(log.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil) rs := remote.NewStorage(log.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil, false)
defer func() { defer func() {
require.NoError(t, rs.Close()) require.NoError(t, rs.Close())
}() }()

View file

@ -461,7 +461,7 @@ func TestEndpoints(t *testing.T) {
remote := remote.NewStorage(promlog.New(&promlogConfig), prometheus.DefaultRegisterer, func() (int64, error) { remote := remote.NewStorage(promlog.New(&promlogConfig), prometheus.DefaultRegisterer, func() (int64, error) {
return 0, nil return 0, nil
}, dbDir, 1*time.Second, nil) }, dbDir, 1*time.Second, nil, false)
err = remote.ApplyConfig(&config.Config{ err = remote.ApplyConfig(&config.Config{
RemoteReadConfigs: []*config.RemoteReadConfig{ RemoteReadConfigs: []*config.RemoteReadConfig{