Implement code paths for new proto format

Signed-off-by: Callum Styan <callumstyan@gmail.com>

parent 441a080e8a
commit 4923ff95b3
@@ -146,6 +146,7 @@ type flagConfig struct {
    queryConcurrency int
    queryMaxSamples int
    RemoteFlushDeadline model.Duration
+   rwProto bool

    featureList []string
    // These options are extracted from featureList
@@ -210,6 +211,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
            continue
        case "promql-at-modifier", "promql-negative-offset":
            level.Warn(logger).Log("msg", "This option for --enable-feature is now permanently enabled and therefore a no-op.", "option", o)
+       case "reduced-rw-proto":
+           c.rwProto = true
+           level.Info(logger).Log("msg", "Reduced remote write proto format will be used, remote write receiver must be able to parse this new protobuf format.")
        default:
            level.Warn(logger).Log("msg", "Unknown option for --enable-feature", "option", o)
        }
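The new code path is opt-in: it only activates when Prometheus is started with --enable-feature=reduced-rw-proto, and, as the log line above notes, the remote write receiver must understand the new protobuf format before the flag is turned on.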
@@ -595,7 +599,7 @@ func main() {
    var (
        localStorage  = &readyStorage{stats: tsdb.NewDBStats()}
        scraper       = &readyScrapeManager{}
-       remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper)
+       remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.rwProto)
        fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
    )

@@ -403,6 +403,8 @@ type QueueManager struct {
    sendNativeHistograms bool
    watcher *wlog.Watcher
    metadataWatcher *MetadataWatcher
+   // experimental feature, new remote write proto format
+   internFormat bool

    clientMtx sync.RWMutex
    storeClient WriteClient
@@ -450,6 +452,7 @@ func NewQueueManager(
    sm ReadyScrapeManager,
    enableExemplarRemoteWrite bool,
    enableNativeHistogramRemoteWrite bool,
+   internFormat bool,
) *QueueManager {
    if logger == nil {
        logger = log.NewNopLogger()
@@ -472,6 +475,7 @@ func NewQueueManager(
        storeClient: client,
        sendExemplars: enableExemplarRemoteWrite,
        sendNativeHistograms: enableNativeHistogramRemoteWrite,
+       internFormat: internFormat,

        seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels),
        seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
@@ -771,6 +775,7 @@ outer:
// Start the queue manager sending samples to the remote storage.
// Does not block.
func (t *QueueManager) Start() {
+   // panic(1)
    // Register and initialise some metrics.
    t.metrics.register()
    t.metrics.shardCapacity.Set(float64(t.cfg.Capacity))
@@ -1169,7 +1174,6 @@ func (s *shards) stop() {
func (s *shards) enqueue(ref chunks.HeadSeriesRef, data timeSeries) bool {
    s.mtx.RLock()
    defer s.mtx.RUnlock()

    shard := uint64(ref) % uint64(len(s.queues))
    select {
    case <-s.softShutdown:
@@ -1343,6 +1347,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
    }()

    shardNum := strconv.Itoa(shardID)
+   pool := newLookupPool()

    // Send batches of at most MaxSamplesPerSend samples to the remote storage.
    // If we have fewer samples than that, flush them out after a deadline anyways.
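The lookupPool used here is not defined in this diff. Going only by the calls that appear below (intern, table, getTable, clear) and the map[uint64]string symbol table passed to buildReducedWriteRequest, a minimal sketch of the assumed shape is a per-shard string interner that hands out numeric refs and keeps the reverse table for the request:

    // Hypothetical sketch only; the real newLookupPool in this branch may differ.
    type lookupPool struct {
        nextRef uint64
        refs    map[string]uint64 // string -> ref, used while building a batch
        table   map[uint64]string // ref -> string, shipped as the symbol table
    }

    func newLookupPool() *lookupPool {
        return &lookupPool{refs: map[string]uint64{}, table: map[uint64]string{}}
    }

    // intern returns a stable ref for s within the current batch.
    func (p *lookupPool) intern(s string) uint64 {
        if ref, ok := p.refs[s]; ok {
            return ref
        }
        ref := p.nextRef
        p.nextRef++
        p.refs[s] = ref
        p.table[ref] = s
        return ref
    }

    func (p *lookupPool) getTable() map[uint64]string { return p.table }

    // clear resets the pool so refs do not leak across write requests.
    func (p *lookupPool) clear() {
        p.nextRef = 0
        p.refs = map[string]uint64{}
        p.table = map[uint64]string{}
    }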
@@ -1365,6 +1370,14 @@
        }
    }

+   pendingReducedData := make([]prompb.ReducedTimeSeries, max)
+   for i := range pendingReducedData {
+       pendingReducedData[i].Samples = []prompb.Sample{{}}
+       if s.qm.sendExemplars {
+           pendingReducedData[i].Exemplars = []prompb.ExemplarRef{{}}
+       }
+   }
+
    timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
    stop := func() {
        if !timer.Stop() {
@@ -1399,10 +1412,17 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
            if !ok {
                return
            }
-           nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
+           if s.qm.internFormat {
+               nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateReducedTimeSeries(pool, batch, pendingReducedData)
+               n := nPendingSamples + nPendingExemplars + nPendingHistograms
+               s.sendReducedSamples(ctx, pendingReducedData[:n], pool.table, nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
+               pool.clear()
+           } else {
+               nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
+               n := nPendingSamples + nPendingExemplars + nPendingHistograms
+               s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
+           }
            queue.ReturnForReuse(batch)
-           n := nPendingSamples + nPendingExemplars + nPendingHistograms
-           s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)

            stop()
            timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
@@ -1410,11 +1430,21 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
        case <-timer.C:
            batch := queue.Batch()
            if len(batch) > 0 {
-               nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
-               n := nPendingSamples + nPendingExemplars + nPendingHistograms
-               level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
-                   "exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
-               s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
+               if s.qm.internFormat {
+                   nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateReducedTimeSeries(pool, batch, pendingReducedData)
+                   n := nPendingSamples + nPendingExemplars + nPendingHistograms
+                   level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
+                       "exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
+                   s.sendReducedSamples(ctx, pendingReducedData[:n], pool.table, nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
+                   pool.clear()
+
+               } else {
+                   nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
+                   n := nPendingSamples + nPendingExemplars + nPendingHistograms
+                   s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
+                   level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
+                       "exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
+               }
            }
            queue.ReturnForReuse(batch)
            timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
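Both send sites in runShard now branch on s.qm.internFormat: with the reduced format enabled, a batch is filled into pendingReducedData via populateReducedTimeSeries, sent with sendReducedSamples along with the pool's ref-to-string table, and the interning pool is cleared before the next batch; otherwise the existing populateTimeSeries and sendSamples path runs unchanged.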
@@ -1556,6 +1586,149 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
    return err
}

+func (s *shards) populateReducedTimeSeries(pool *lookupPool, batch []timeSeries, pendingData []prompb.ReducedTimeSeries) (int, int, int) {
+   var nPendingSamples, nPendingExemplars, nPendingHistograms int
+   for nPending, d := range batch {
+       pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
+       if s.qm.sendExemplars {
+           pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
+       }
+       if s.qm.sendNativeHistograms {
+           pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
+       }
+
+       // Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
+       // retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
+       // stop reading from the queue. This makes it safe to reference pendingSamples by index.
+       // pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
+       pendingData[nPending].Labels = make([]prompb.LabelRef, len(d.seriesLabels))
+       for i, sl := range d.seriesLabels {
+           nRef := pool.intern(sl.Name)
+           vRef := pool.intern(sl.Value)
+           pendingData[nPending].Labels[i] = prompb.LabelRef{NameRef: nRef, ValueRef: vRef}
+       }
+       switch d.sType {
+       case tSample:
+           pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{
+               Value: d.value,
+               Timestamp: d.timestamp,
+           })
+           nPendingSamples++
+       case tExemplar:
+           l := make([]prompb.LabelRef, len(d.exemplarLabels))
+           for i, el := range d.exemplarLabels {
+               nRef := pool.intern(el.Name)
+               vRef := pool.intern(el.Value)
+               l[i] = prompb.LabelRef{NameRef: nRef, ValueRef: vRef}
+           }
+           pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.ExemplarRef{
+               Labels: l,
+               Value: d.value,
+               Timestamp: d.timestamp,
+           })
+           nPendingExemplars++
+       case tHistogram:
+           pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToHistogramProto(d.timestamp, d.histogram))
+           nPendingHistograms++
+       }
+   }
+   return nPendingSamples, nPendingExemplars, nPendingHistograms
+}
+
+func (s *shards) sendReducedSamples(ctx context.Context, samples []prompb.ReducedTimeSeries, labels map[uint64]string, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
+   begin := time.Now()
+   err := s.sendReducedSamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, pBuf, buf)
+   if err != nil {
+       level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err)
+       s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount))
+       s.qm.metrics.failedExemplarsTotal.Add(float64(exemplarCount))
+       s.qm.metrics.failedHistogramsTotal.Add(float64(histogramCount))
+   }
+
+   // These counters are used to calculate the dynamic sharding, and as such
+   // should be maintained irrespective of success or failure.
+   s.qm.dataOut.incr(int64(len(samples)))
+   s.qm.dataOutDuration.incr(int64(time.Since(begin)))
+   s.qm.lastSendTimestamp.Store(time.Now().Unix())
+   // Pending samples/exemplars/histograms also should be subtracted as an error means
+   // they will not be retried.
+   s.qm.metrics.pendingSamples.Sub(float64(sampleCount))
+   s.qm.metrics.pendingExemplars.Sub(float64(exemplarCount))
+   s.qm.metrics.pendingHistograms.Sub(float64(histogramCount))
+   s.enqueuedSamples.Sub(int64(sampleCount))
+   s.enqueuedExemplars.Sub(int64(exemplarCount))
+   s.enqueuedHistograms.Sub(int64(histogramCount))
+}
+
+// sendSamples to the remote storage with backoff for recoverable errors.
+func (s *shards) sendReducedSamplesWithBackoff(ctx context.Context, samples []prompb.ReducedTimeSeries, labels map[uint64]string, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error {
+   // Build the WriteRequest with no metadata.
+   req, highest, err := buildReducedWriteRequest(samples, labels, pBuf, *buf)
+   if err != nil {
+       // Failing to build the write request is non-recoverable, since it will
+       // only error if marshaling the proto to bytes fails.
+       return err
+   }
+
+   reqSize := len(req)
+   *buf = req
+
+   // An anonymous function allows us to defer the completion of our per-try spans
+   // without causing a memory leak, and it has the nice effect of not propagating any
+   // parameters for sendSamplesWithBackoff/3.
+   attemptStore := func(try int) error {
+       ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch")
+       defer span.End()
+
+       span.SetAttributes(
+           attribute.Int("request_size", reqSize),
+           attribute.Int("samples", sampleCount),
+           attribute.Int("try", try),
+           attribute.String("remote_name", s.qm.storeClient.Name()),
+           attribute.String("remote_url", s.qm.storeClient.Endpoint()),
+       )
+
+       if exemplarCount > 0 {
+           span.SetAttributes(attribute.Int("exemplars", exemplarCount))
+       }
+       if histogramCount > 0 {
+           span.SetAttributes(attribute.Int("histograms", histogramCount))
+       }
+
+       begin := time.Now()
+       s.qm.metrics.samplesTotal.Add(float64(sampleCount))
+       s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
+       s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
+       err := s.qm.client().Store(ctx, *buf, try)
+       s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
+
+       if err != nil {
+           span.RecordError(err)
+           return err
+       }
+
+       return nil
+   }
+
+   onRetry := func() {
+       s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
+       s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount))
+       s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount))
+   }
+
+   err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.logger, attemptStore, onRetry)
+   if errors.Is(err, context.Canceled) {
+       // When there is resharding, we cancel the context for this queue, which means the data is not sent.
+       // So we exit early to not update the metrics.
+       return err
+   }
+
+   s.qm.metrics.sentBytesTotal.Add(float64(reqSize))
+   s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000))
+
+   return err
+}
+
func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l log.Logger, attempt func(int) error, onRetry func()) error {
    backoff := cfg.MinBackoff
    sleepDuration := model.Duration(0)
@@ -1646,3 +1819,44 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
    compressed := reSnappy.Encode(buf, pBuf.Bytes())
    return compressed, highest, nil
}
+
+func buildReducedWriteRequest(samples []prompb.ReducedTimeSeries, labels map[uint64]string, pBuf *proto.Buffer, buf []byte) ([]byte, int64, error) {
+   var highest int64
+   for _, ts := range samples {
+       // At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
+       if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
+           highest = ts.Samples[0].Timestamp
+       }
+       if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
+           highest = ts.Exemplars[0].Timestamp
+       }
+       if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
+           highest = ts.Histograms[0].Timestamp
+       }
+   }
+
+   req := &prompb.WriteRequestWithRefs{
+       StringSymbolTable: labels,
+       Timeseries: samples,
+   }
+
+   if pBuf == nil {
+       pBuf = proto.NewBuffer(nil) // For convenience in tests. Not efficient.
+   } else {
+       pBuf.Reset()
+   }
+   err := pBuf.Marshal(req)
+   if err != nil {
+       return nil, 0, err
+   }
+
+   // snappy uses len() to see if it needs to allocate a new slice. Make the
+   // buffer as long as possible.
+   if buf != nil {
+       buf = buf[0:cap(buf)]
+   }
+
+   compressed := reSnappy.Encode(buf, pBuf.Bytes())
+
+   return compressed, highest, nil
+}

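For illustration only (this snippet is not part of the commit; it reuses the hypothetical lookupPool sketch from earlier, and the metric and label values are made up), a reduced series carries LabelRef pairs that index into one shared string table, and buildReducedWriteRequest wraps everything in a prompb.WriteRequestWithRefs before snappy-compressing it:

    pool := newLookupPool()
    ts := prompb.ReducedTimeSeries{
        Labels: []prompb.LabelRef{
            {NameRef: pool.intern("__name__"), ValueRef: pool.intern("test_metric_0")},
            {NameRef: pool.intern("job"), ValueRef: pool.intern("node")},
        },
        Samples: []prompb.Sample{{Value: 1, Timestamp: 1000}},
    }
    // nil pBuf and nil buf are accepted; the function allocates for convenience in tests.
    compressed, highest, err := buildReducedWriteRequest([]prompb.ReducedTimeSeries{ts}, pool.getTable(), nil, nil)
    _ = compressed
    _ = highest
    _ = err

Repeated label names and values are encoded once in the symbol table, which is the intended source of the size reduction relative to the existing per-series string labels.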
@@ -81,7 +81,7 @@ func TestSampleDelivery(t *testing.T) {

    dir := t.TempDir()

-   s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
+   s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
    defer s.Close()

    queueConfig := config.DefaultQueueConfig
@@ -132,6 +132,7 @@ func TestSampleDelivery(t *testing.T) {
    hash, err := toHash(writeConfig)
    require.NoError(t, err)
    qm := s.rws.queues[hash]
+   // time.Sleep(1 * time.Second)

    c := NewTestWriteClient()
    qm.SetClient(c)
@@ -172,7 +173,7 @@ func TestMetadataDelivery(t *testing.T) {
    mcfg := config.DefaultMetadataConfig

    metrics := newQueueManagerMetrics(nil, "", "")
-   m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+   m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
    m.Start()
    defer m.Stop()

@@ -211,7 +212,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
    dir := t.TempDir()

    metrics := newQueueManagerMetrics(nil, "", "")
-   m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+   m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
    m.StoreSeries(series, 0)
    m.Start()
    defer m.Stop()
@@ -253,7 +254,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
    mcfg := config.DefaultMetadataConfig

    metrics := newQueueManagerMetrics(nil, "", "")
-   m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+   m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
    m.StoreSeries(series, 0)

    m.Start()
@@ -273,7 +274,7 @@ func TestShutdown(t *testing.T) {
    mcfg := config.DefaultMetadataConfig
    metrics := newQueueManagerMetrics(nil, "", "")

-   m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+   m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
    n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
    samples, series := createTimeseries(n, n)
    m.StoreSeries(series, 0)
@@ -311,7 +312,7 @@ func TestSeriesReset(t *testing.T) {
    cfg := config.DefaultQueueConfig
    mcfg := config.DefaultMetadataConfig
    metrics := newQueueManagerMetrics(nil, "", "")
-   m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+   m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
    for i := 0; i < numSegments; i++ {
        series := []record.RefSeries{}
        for j := 0; j < numSeries; j++ {
@@ -340,7 +341,7 @@ func TestReshard(t *testing.T) {
    dir := t.TempDir()

    metrics := newQueueManagerMetrics(nil, "", "")
-   m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+   m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
    m.StoreSeries(series, 0)

    m.Start()
@@ -376,7 +377,7 @@ func TestReshardRaceWithStop(*testing.T) {
    go func() {
        for {
            metrics := newQueueManagerMetrics(nil, "", "")
-           m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+           m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
            m.Start()
            h.Unlock()
            h.Lock()
@@ -411,7 +412,7 @@ func TestReshardPartialBatch(t *testing.T) {
    cfg.BatchSendDeadline = model.Duration(batchSendDeadline)

    metrics := newQueueManagerMetrics(nil, "", "")
-   m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+   m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
    m.StoreSeries(series, 0)

    m.Start()
@@ -456,7 +457,7 @@ func TestQueueFilledDeadlock(t *testing.T) {

    metrics := newQueueManagerMetrics(nil, "", "")

-   m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+   m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
    m.StoreSeries(series, 0)
    m.Start()
    defer m.Stop()
@@ -483,7 +484,7 @@ func TestReleaseNoninternedString(t *testing.T) {
    mcfg := config.DefaultMetadataConfig
    metrics := newQueueManagerMetrics(nil, "", "")
    c := NewTestWriteClient()
-   m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+   m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
    m.Start()
    defer m.Stop()

@@ -530,7 +531,7 @@ func TestShouldReshard(t *testing.T) {
    for _, c := range cases {
        metrics := newQueueManagerMetrics(nil, "", "")
        client := NewTestWriteClient()
-       m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+       m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
        m.numShards = c.startingShards
        m.dataIn.incr(c.samplesIn)
        m.dataOut.incr(c.samplesOut)
@@ -563,6 +564,9 @@ func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([
        // Create Labels that is name of series plus any extra labels supplied.
        b.Reset()
        b.Add(labels.MetricName, name)
+       rand.Shuffle(len(extraLabels), func(i, j int) {
+           extraLabels[i], extraLabels[j] = extraLabels[j], extraLabels[i]
+       })
        for _, l := range extraLabels {
            b.Add(l.Name, l.Value)
        }
@@ -600,6 +604,37 @@ func createTimeseriesProto(numSamples, numSeries int, extraLabels ...labels.Labe
    return series
}

+func createReducedTimeseriesProto(numSamples, numSeries int, extraLabels ...labels.Label) ([]prompb.ReducedTimeSeries, *lookupPool) {
+   pool := newLookupPool()
+   series := make([]prompb.ReducedTimeSeries, 0, numSeries)
+   for i := 0; i < numSeries; i++ {
+       name := fmt.Sprintf("test_metric_%d", i)
+       sample := prompb.Sample{
+           Value: float64(i),
+           Timestamp: int64(i),
+       }
+       nRef := pool.intern("__name__")
+       vRef := pool.intern(name)
+       l := []prompb.LabelRef{{NameRef: nRef, ValueRef: vRef}}
+       rand.Shuffle(len(extraLabels), func(i, j int) {
+           extraLabels[i], extraLabels[j] = extraLabels[j], extraLabels[i]
+       })
+       for i, v := range extraLabels {
+           if i > 2 {
+               break
+           }
+           nRef := pool.intern(v.Name)
+           vRef := pool.intern(v.Value)
+           l = append(l, prompb.LabelRef{NameRef: nRef, ValueRef: vRef})
+       }
+       series = append(series, prompb.ReducedTimeSeries{
+           Labels: l,
+           Samples: []prompb.Sample{sample},
+       })
+   }
+   return series, pool
+}
+
func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []record.RefSeries) {
    exemplars := make([]record.RefExemplar, 0, numExemplars)
    series := make([]record.RefSeries, 0, numSeries)
@@ -779,6 +814,7 @@ func (c *TestWriteClient) waitForExpectedData(tb testing.TB) {
        return
    }
+   c.wg.Wait()

    c.mtx.Lock()
    defer c.mtx.Unlock()
    for ts, expectedSamples := range c.expectedSamples {
@@ -808,14 +844,17 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
        return err
    }

-   var reqProto prompb.WriteRequest
+   var reqProto prompb.WriteRequestWithRefs
    if err := proto.Unmarshal(reqBuf, &reqProto); err != nil {
        return err
    }
    count := 0
    for _, ts := range reqProto.Timeseries {
-       labels := labelProtosToLabels(ts.Labels)
-       seriesName := labels.Get("__name__")
+       tsLabels := labels.Labels{}
+       for _, l := range ts.Labels {
+           tsLabels = append(tsLabels, labels.Label{Name: reqProto.StringSymbolTable[l.NameRef], Value: reqProto.StringSymbolTable[l.ValueRef]})
+       }
+       seriesName := tsLabels.Get("__name__")
        for _, sample := range ts.Samples {
            count++
            c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample)
@@ -823,7 +862,15 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {

        for _, ex := range ts.Exemplars {
            count++
-           c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], ex)
+           e := prompb.Exemplar{}
+           e.Timestamp = ex.Timestamp
+           e.Value = ex.Value
+           eLabels := make([]prompb.Label, len(ex.Labels))
+           for i, l := range ex.Labels {
+               eLabels[i] = prompb.Label{Name: reqProto.StringSymbolTable[l.NameRef], Value: reqProto.StringSymbolTable[l.ValueRef]}
+           }
+           e.Labels = eLabels
+           c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], e)
        }

        for _, histogram := range ts.Histograms {
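The test client now exercises the receiving side of the reduced format: it unmarshals a prompb.WriteRequestWithRefs and resolves each NameRef and ValueRef through reqProto.StringSymbolTable to rebuild concrete series and exemplar labels, the same lookup a real remote write receiver would have to perform.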
@@ -931,7 +978,7 @@ func BenchmarkSampleSend(b *testing.B) {
    dir := b.TempDir()

    metrics := newQueueManagerMetrics(nil, "", "")
-   m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+   m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
    m.StoreSeries(series, 0)

    // These should be received by the client.
@@ -977,7 +1024,7 @@ func BenchmarkStartup(b *testing.B) {
    c := NewTestBlockedWriteClient()
    m := NewQueueManager(metrics, nil, nil, logger, dir,
        newEWMARate(ewmaWeight, shardUpdateDuration),
-       cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false)
+       cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, false)
    m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
    m.watcher.MaxSegment = segments[len(segments)-2]
    err := m.watcher.Run()
@@ -1060,7 +1107,7 @@ func TestCalculateDesiredShards(t *testing.T) {

    metrics := newQueueManagerMetrics(nil, "", "")
    samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
-   m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+   m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)

    // Need to start the queue manager so the proper metrics are initialized.
    // However we can stop it right away since we don't need to do any actual
@@ -1137,7 +1184,7 @@ func TestCalculateDesiredShardsDetail(t *testing.T) {

    metrics := newQueueManagerMetrics(nil, "", "")
    samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
-   m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
+   m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)

    for _, tc := range []struct {
        name string
@@ -1383,3 +1430,36 @@ func BenchmarkBuildWriteRequest(b *testing.B) {
    b.StopTimer()
}

+func BenchmarkBuildReducedWriteRequest(b *testing.B) {
+   // Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics.
+   extraLabels := labels.Labels{
+       {Name: "kubernetes_io_arch", Value: "amd64"},
+       {Name: "kubernetes_io_instance_type", Value: "c3.somesize"},
+       {Name: "kubernetes_io_os", Value: "linux"},
+       {Name: "container_name", Value: "some-name"},
+       {Name: "failure_domain_kubernetes_io_region", Value: "somewhere-1"},
+       {Name: "failure_domain_kubernetes_io_zone", Value: "somewhere-1b"},
+       {Name: "id", Value: "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28"},
+       {Name: "image", Value: "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506"},
+       {Name: "instance", Value: "ip-111-11-1-11.ec2.internal"},
+       {Name: "job", Value: "kubernetes-cadvisor"},
+       {Name: "kubernetes_io_hostname", Value: "ip-111-11-1-11"},
+       {Name: "monitor", Value: "prod"},
+       {Name: "name", Value: "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0"},
+       {Name: "namespace", Value: "kube-system"},
+       {Name: "pod_name", Value: "some-other-name-5j8s8"},
+   }
+   series, pool := createReducedTimeseriesProto(1, 10000, extraLabels...)
+
+   b.ResetTimer()
+   totalSize := 0
+   for i := 0; i < b.N; i++ {
+       buf, _, _ := buildReducedWriteRequest(series, pool.getTable(), nil, nil)
+       totalSize += len(buf)
+       b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
+
+   }
+
+   // Do not include shutdown
+   b.StopTimer()
+}

@@ -91,7 +91,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) {

    for _, tc := range cases {
        t.Run("", func(t *testing.T) {
-           s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
+           s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
            conf := &config.Config{
                GlobalConfig: config.DefaultGlobalConfig,
                RemoteReadConfigs: tc.cfgs,

@@ -62,7 +62,7 @@ type Storage struct {
}

// NewStorage returns a remote.Storage.
-func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager) *Storage {
+func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, writeReducedProto bool) *Storage {
    if l == nil {
        l = log.NewNopLogger()
    }
@@ -72,7 +72,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
        logger: logger,
        localStartTimeCallback: stCallback,
    }
-   s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm)
+   s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, writeReducedProto)
    return s
}

@@ -27,7 +27,7 @@ import (
func TestStorageLifecycle(t *testing.T) {
    dir := t.TempDir()

-   s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
+   s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
    conf := &config.Config{
        GlobalConfig: config.DefaultGlobalConfig,
        RemoteWriteConfigs: []*config.RemoteWriteConfig{
@@ -54,7 +54,7 @@ func TestStorageLifecycle(t *testing.T) {
func TestUpdateRemoteReadConfigs(t *testing.T) {
    dir := t.TempDir()

-   s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
+   s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)

    conf := &config.Config{
        GlobalConfig: config.GlobalConfig{},
@@ -75,7 +75,7 @@ func TestUpdateRemoteReadConfigs(t *testing.T) {
func TestFilterExternalLabels(t *testing.T) {
    dir := t.TempDir()

-   s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
+   s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)

    conf := &config.Config{
        GlobalConfig: config.GlobalConfig{
@@ -100,7 +100,7 @@ func TestFilterExternalLabels(t *testing.T) {
func TestIgnoreExternalLabels(t *testing.T) {
    dir := t.TempDir()

-   s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
+   s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)

    conf := &config.Config{
        GlobalConfig: config.GlobalConfig{

@@ -65,6 +65,7 @@ type WriteStorage struct {
    externalLabels labels.Labels
    dir string
    queues map[string]*QueueManager
+   writeReducedProto bool
    samplesIn *ewmaRate
    flushDeadline time.Duration
    interner *pool
@@ -76,12 +77,13 @@ type WriteStorage struct {
}

// NewWriteStorage creates and runs a WriteStorage.
-func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager) *WriteStorage {
+func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, writeReducedProto bool) *WriteStorage {
    if logger == nil {
        logger = log.NewNopLogger()
    }
    rws := &WriteStorage{
        queues: make(map[string]*QueueManager),
+       writeReducedProto: writeReducedProto,
        watcherMetrics: wlog.NewWatcherMetrics(reg),
        liveReaderMetrics: wlog.NewLiveReaderMetrics(reg),
        logger: logger,
@@ -153,17 +155,23 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
            name = rwConf.Name
        }

-       c, err := NewWriteClient(name, &ClientConfig{
-           URL: rwConf.URL,
-           Timeout: rwConf.RemoteTimeout,
-           HTTPClientConfig: rwConf.HTTPClientConfig,
-           SigV4Config: rwConf.SigV4Config,
-           AzureADConfig: rwConf.AzureADConfig,
-           Headers: rwConf.Headers,
-           RetryOnRateLimit: rwConf.QueueConfig.RetryOnRateLimit,
-       })
-       if err != nil {
-           return err
+       var c WriteClient
+       if rwConf.URL.String() == "fake" {
+           // f := "fake" + strconv.Itoa(rand.Intn(100))
+           // c = NewTestClient(f, f)
+       } else {
+           c, err = NewWriteClient(name, &ClientConfig{
+               URL: rwConf.URL,
+               Timeout: rwConf.RemoteTimeout,
+               HTTPClientConfig: rwConf.HTTPClientConfig,
+               SigV4Config: rwConf.SigV4Config,
+               AzureADConfig: rwConf.AzureADConfig,
+               Headers: rwConf.Headers,
+               RetryOnRateLimit: rwConf.QueueConfig.RetryOnRateLimit,
+           })
+           if err != nil {
+               return err
+           }
        }

        queue, ok := rws.queues[hash]
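Note that the "fake" URL branch is a stub in this work-in-progress commit: both test-client lines inside it are commented out, so a remote write config whose URL is literally "fake" leaves c nil and the queue manager would be handed a nil WriteClient.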
@@ -197,6 +205,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
            rws.scraper,
            rwConf.SendExemplars,
            rwConf.SendNativeHistograms,
+           rws.writeReducedProto,
        )
        // Keep track of which queues are new so we know which to start.
        newHashes = append(newHashes, hash)

@@ -117,7 +117,7 @@ func TestNoDuplicateWriteConfigs(t *testing.T) {
    }

    for _, tc := range cases {
-       s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil)
+       s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false)
        conf := &config.Config{
            GlobalConfig: config.DefaultGlobalConfig,
            RemoteWriteConfigs: tc.cfgs,
@@ -139,7 +139,7 @@ func TestRestartOnNameChange(t *testing.T) {
    hash, err := toHash(cfg)
    require.NoError(t, err)

-   s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil)
+   s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false)

    conf := &config.Config{
        GlobalConfig: config.DefaultGlobalConfig,
@@ -164,7 +164,7 @@ func TestRestartOnNameChange(t *testing.T) {
func TestUpdateWithRegisterer(t *testing.T) {
    dir := t.TempDir()

-   s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil)
+   s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil, false)
    c1 := &config.RemoteWriteConfig{
        Name: "named",
        URL: &common_config.URL{
@@ -204,7 +204,7 @@ func TestUpdateWithRegisterer(t *testing.T) {
func TestWriteStorageLifecycle(t *testing.T) {
    dir := t.TempDir()

-   s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil)
+   s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
    conf := &config.Config{
        GlobalConfig: config.DefaultGlobalConfig,
        RemoteWriteConfigs: []*config.RemoteWriteConfig{
@@ -221,7 +221,7 @@ func TestWriteStorageLifecycle(t *testing.T) {
func TestUpdateExternalLabels(t *testing.T) {
    dir := t.TempDir()

-   s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil)
+   s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil, false)

    externalLabels := labels.FromStrings("external", "true")
    conf := &config.Config{
@@ -250,7 +250,7 @@ func TestUpdateExternalLabels(t *testing.T) {
func TestWriteStorageApplyConfigsIdempotent(t *testing.T) {
    dir := t.TempDir()

-   s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil)
+   s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)

    conf := &config.Config{
        GlobalConfig: config.GlobalConfig{},
@@ -276,7 +276,7 @@ func TestWriteStorageApplyConfigsIdempotent(t *testing.T) {
func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) {
    dir := t.TempDir()

-   s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil)
+   s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)

    c0 := &config.RemoteWriteConfig{
        RemoteTimeout: model.Duration(10 * time.Second),

@@ -88,7 +88,7 @@ func createTestAgentDB(t *testing.T, reg prometheus.Registerer, opts *Options) *
    t.Helper()

    dbDir := t.TempDir()
-   rs := remote.NewStorage(log.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil)
+   rs := remote.NewStorage(log.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil, false)
    t.Cleanup(func() {
        require.NoError(t, rs.Close())
    })
@@ -584,7 +584,7 @@ func TestLockfile(t *testing.T) {
    tsdbutil.TestDirLockerUsage(t, func(t *testing.T, data string, createLock bool) (*tsdbutil.DirLocker, testutil.Closer) {
        logger := log.NewNopLogger()
        reg := prometheus.NewRegistry()
-       rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil)
+       rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil, false)
        t.Cleanup(func() {
            require.NoError(t, rs.Close())
        })
@@ -604,7 +604,7 @@ func TestLockfile(t *testing.T) {

func Test_ExistingWAL_NextRef(t *testing.T) {
    dbDir := t.TempDir()
-   rs := remote.NewStorage(log.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil)
+   rs := remote.NewStorage(log.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil, false)
    defer func() {
        require.NoError(t, rs.Close())
    }()

@@ -461,7 +461,7 @@ func TestEndpoints(t *testing.T) {

    remote := remote.NewStorage(promlog.New(&promlogConfig), prometheus.DefaultRegisterer, func() (int64, error) {
        return 0, nil
-   }, dbDir, 1*time.Second, nil)
+   }, dbDir, 1*time.Second, nil, false)

    err = remote.ApplyConfig(&config.Config{
        RemoteReadConfigs: []*config.RemoteReadConfig{