Implement code paths for new proto format

Author: Callum Styan <callumstyan@gmail.com>
Date: 2023-02-20 14:10:13 -08:00
Parent: 80ddf091d7
Commit: 30bf3b9f2b
Signed-off-by: Callum Styan <callumstyan@gmail.com>

10 changed files with 373 additions and 62 deletions
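
Behind a new reduced-rw-proto feature flag, the remote write queue manager gains a second send path that interns label names and values into a per-request string symbol table and sends ref-based series (ReducedTimeSeries inside a WriteRequestWithRefs) instead of repeating full label strings. The constructors along the chain (NewStorage, NewWriteStorage, NewQueueManager) grow a corresponding boolean parameter, and tests and benchmarks are updated to match.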

cmd/prometheus/main.go

@ -143,6 +143,7 @@ type flagConfig struct {
queryConcurrency int
queryMaxSamples int
RemoteFlushDeadline model.Duration
rwProto bool
featureList []string
// These options are extracted from featureList
@ -202,6 +203,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
continue
case "promql-at-modifier", "promql-negative-offset":
level.Warn(logger).Log("msg", "This option for --enable-feature is now permanently enabled and therefore a no-op.", "option", o)
case "reduced-rw-proto":
c.rwProto = true
level.Info(logger).Log("msg", "Reduced remote write proto format will be used; the remote write receiver must be able to parse this new protobuf format.")
default:
level.Warn(logger).Log("msg", "Unknown option for --enable-feature", "option", o)
}
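
The new path is opt-in: the sender enables it with --enable-feature=reduced-rw-proto, and, as the log line above warns, the remote write receiver must be able to decode the new format before the flag is turned on.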
@ -566,7 +570,7 @@ func main() {
var (
localStorage = &readyStorage{stats: tsdb.NewDBStats()}
scraper = &readyScrapeManager{}
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper)
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.rwProto)
fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
)

storage/remote/queue_manager.go

@ -402,6 +402,8 @@ type QueueManager struct {
sendNativeHistograms bool
watcher *wlog.Watcher
metadataWatcher *MetadataWatcher
// Experimental feature: use the new, reduced remote write proto format.
internFormat bool
clientMtx sync.RWMutex
storeClient WriteClient
@ -449,6 +451,7 @@ func NewQueueManager(
sm ReadyScrapeManager,
enableExemplarRemoteWrite bool,
enableNativeHistogramRemoteWrite bool,
internFormat bool,
) *QueueManager {
if logger == nil {
logger = log.NewNopLogger()
@ -465,6 +468,7 @@ func NewQueueManager(
storeClient: client,
sendExemplars: enableExemplarRemoteWrite,
sendNativeHistograms: enableNativeHistogramRemoteWrite,
internFormat: internFormat,
seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels),
seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
@ -713,6 +717,7 @@ outer:
// Start the queue manager sending samples to the remote storage.
// Does not block.
func (t *QueueManager) Start() {
// Register and initialise some metrics.
t.metrics.register()
t.metrics.shardCapacity.Set(float64(t.cfg.Capacity))
@ -1117,7 +1122,6 @@ func (s *shards) stop() {
func (s *shards) enqueue(ref chunks.HeadSeriesRef, data timeSeries) bool {
s.mtx.RLock()
defer s.mtx.RUnlock()
shard := uint64(ref) % uint64(len(s.queues))
select {
case <-s.softShutdown:
@ -1289,6 +1293,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
}()
shardNum := strconv.Itoa(shardID)
pool := newLookupPool()
// Send batches of at most MaxSamplesPerSend samples to the remote storage.
// If we have fewer samples than that, flush them out after a deadline anyway.
@ -1311,6 +1316,14 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
}
}
pendingReducedData := make([]prompb.ReducedTimeSeries, max)
for i := range pendingReducedData {
pendingReducedData[i].Samples = []prompb.Sample{{}}
if s.qm.sendExemplars {
pendingReducedData[i].Exemplars = []prompb.ExemplarRef{{}}
}
}
timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
stop := func() {
if !timer.Stop() {
@ -1345,10 +1358,17 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
if !ok {
return
}
nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
if s.qm.internFormat {
nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateReducedTimeSeries(pool, batch, pendingReducedData)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendReducedSamples(ctx, pendingReducedData[:n], pool.table, nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
pool.clear()
} else {
nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
}
queue.ReturnForReuse(batch)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
stop()
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
@ -1356,10 +1376,19 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
case <-timer.C:
batch := queue.Batch()
if len(batch) > 0 {
nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars", nPendingExemplars, "shard", shardNum)
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
if s.qm.internFormat {
nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateReducedTimeSeries(pool, batch, pendingReducedData)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars", nPendingExemplars, "shard", shardNum)
s.sendReducedSamples(ctx, pendingReducedData[:n], pool.table, nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
pool.clear()
} else {
nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars", nPendingExemplars, "shard", shardNum)
}
}
queue.ReturnForReuse(batch)
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
@ -1498,6 +1527,149 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
return err
}
func (s *shards) populateReducedTimeSeries(pool *lookupPool, batch []timeSeries, pendingData []prompb.ReducedTimeSeries) (int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms int
for nPending, d := range batch {
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
if s.qm.sendExemplars {
pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
}
if s.qm.sendNativeHistograms {
pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
}
// Number of pending samples is limited by the fact that sendReducedSamples
// (via sendReducedSamplesWithBackoff) retries endlessly, so once we reach max
// samples, if we can never send to the endpoint we'll stop reading from the
// queue. This makes it safe to reference pendingData by index.
pendingData[nPending].Labels = make([]prompb.LabelRef, len(d.seriesLabels))
for i, sl := range d.seriesLabels {
nRef := pool.intern(sl.Name)
vRef := pool.intern(sl.Value)
pendingData[nPending].Labels[i] = prompb.LabelRef{NameRef: nRef, ValueRef: vRef}
}
switch d.sType {
case tSample:
pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{
Value: d.value,
Timestamp: d.timestamp,
})
nPendingSamples++
case tExemplar:
l := make([]prompb.LabelRef, len(d.exemplarLabels))
for i, el := range d.exemplarLabels {
nRef := pool.intern(el.Name)
vRef := pool.intern(el.Value)
l[i] = prompb.LabelRef{NameRef: nRef, ValueRef: vRef}
}
pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.ExemplarRef{
Labels: l,
Value: d.value,
Timestamp: d.timestamp,
})
nPendingExemplars++
case tHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToHistogramProto(d.timestamp, d.histogram))
nPendingHistograms++
}
}
return nPendingSamples, nPendingExemplars, nPendingHistograms
}
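
The interning pool used above (newLookupPool, intern, getTable, clear) is defined outside this hunk. A minimal sketch of the shape its call sites assume, with per-batch refs and a ref-to-string table that becomes the request's StringSymbolTable; the actual implementation in this change may differ:

type lookupPool struct {
	table map[uint64]string // ref -> string; shipped as the StringSymbolTable
	refs  map[string]uint64 // string -> ref; deduplicates interned strings
	next  uint64
}

func newLookupPool() *lookupPool {
	return &lookupPool{table: map[uint64]string{}, refs: map[string]uint64{}}
}

// intern returns a stable ref for s within the current batch, allocating a
// new ref the first time s is seen.
func (p *lookupPool) intern(s string) uint64 {
	if ref, ok := p.refs[s]; ok {
		return ref
	}
	ref := p.next
	p.next++
	p.refs[s] = ref
	p.table[ref] = s
	return ref
}

func (p *lookupPool) getTable() map[uint64]string { return p.table }

// clear resets the pool so refs from one request do not leak into the next.
func (p *lookupPool) clear() {
	p.table = map[uint64]string{}
	p.refs = map[string]uint64{}
	p.next = 0
}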
func (s *shards) sendReducedSamples(ctx context.Context, samples []prompb.ReducedTimeSeries, labels map[uint64]string, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
begin := time.Now()
err := s.sendReducedSamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, pBuf, buf)
if err != nil {
level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err)
s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount))
s.qm.metrics.failedExemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.failedHistogramsTotal.Add(float64(histogramCount))
}
// These counters are used to calculate the dynamic sharding, and as such
// should be maintained irrespective of success or failure.
s.qm.dataOut.incr(int64(len(samples)))
s.qm.dataOutDuration.incr(int64(time.Since(begin)))
s.qm.lastSendTimestamp.Store(time.Now().Unix())
// Pending samples/exemplars/histograms should also be subtracted, as an error
// means they will not be retried.
s.qm.metrics.pendingSamples.Sub(float64(sampleCount))
s.qm.metrics.pendingExemplars.Sub(float64(exemplarCount))
s.qm.metrics.pendingHistograms.Sub(float64(histogramCount))
s.enqueuedSamples.Sub(int64(sampleCount))
s.enqueuedExemplars.Sub(int64(exemplarCount))
s.enqueuedHistograms.Sub(int64(histogramCount))
}
// sendReducedSamplesWithBackoff sends the reduced-format samples to the remote storage, with backoff for recoverable errors.
func (s *shards) sendReducedSamplesWithBackoff(ctx context.Context, samples []prompb.ReducedTimeSeries, labels map[uint64]string, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error {
// Build the WriteRequest with no metadata.
req, highest, err := buildReducedWriteRequest(samples, labels, pBuf, *buf)
if err != nil {
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
return err
}
reqSize := len(req)
*buf = req
// An anonymous function allows us to defer the completion of our per-try spans
// without causing a memory leak, and it has the nice effect of not propagating any
// parameters for sendReducedSamplesWithBackoff.
attemptStore := func(try int) error {
ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch")
defer span.End()
span.SetAttributes(
attribute.Int("request_size", reqSize),
attribute.Int("samples", sampleCount),
attribute.Int("try", try),
attribute.String("remote_name", s.qm.storeClient.Name()),
attribute.String("remote_url", s.qm.storeClient.Endpoint()),
)
if exemplarCount > 0 {
span.SetAttributes(attribute.Int("exemplars", exemplarCount))
}
if histogramCount > 0 {
span.SetAttributes(attribute.Int("histograms", histogramCount))
}
begin := time.Now()
s.qm.metrics.samplesTotal.Add(float64(sampleCount))
s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
err := s.qm.client().Store(ctx, *buf)
s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
if err != nil {
span.RecordError(err)
return err
}
return nil
}
onRetry := func() {
s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount))
}
err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.logger, attemptStore, onRetry)
if errors.Is(err, context.Canceled) {
// When there is resharding, we cancel the context for this queue, which means the data is not sent.
// So we exit early to not update the metrics.
return err
}
s.qm.metrics.sentBytesTotal.Add(float64(reqSize))
s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000))
return err
}
func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l log.Logger, attempt func(int) error, onRetry func()) error {
backoff := cfg.MinBackoff
sleepDuration := model.Duration(0)
@ -1587,3 +1759,44 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
compressed := reSnappy.Encode(buf, pBuf.Bytes())
return compressed, highest, nil
}
func buildReducedWriteRequest(samples []prompb.ReducedTimeSeries, labels map[uint64]string, pBuf *proto.Buffer, buf []byte) ([]byte, int64, error) {
var highest int64
for _, ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
highest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
highest = ts.Exemplars[0].Timestamp
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
highest = ts.Histograms[0].Timestamp
}
}
req := &prompb.WriteRequestWithRefs{
StringSymbolTable: labels,
Timeseries: samples,
}
if pBuf == nil {
pBuf = proto.NewBuffer(nil) // For convenience in tests. Not efficient.
} else {
pBuf.Reset()
}
err := pBuf.Marshal(req)
if err != nil {
return nil, 0, err
}
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
if buf != nil {
buf = buf[0:cap(buf)]
}
compressed := reSnappy.Encode(buf, pBuf.Bytes())
return compressed, highest, nil
}
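
For orientation, these are the shapes of the reduced-format types as implied by their call sites in this diff. The generated prompb code is not part of this hunk, so the exact field sets here are assumptions:

// Sketch only; stands in for the generated prompb types.
type Sample struct {
	Value     float64
	Timestamp int64
}

type LabelRef struct {
	NameRef  uint64 // key into the request's StringSymbolTable
	ValueRef uint64
}

type ExemplarRef struct {
	Labels    []LabelRef
	Value     float64
	Timestamp int64
}

type Histogram struct {
	Timestamp int64
	// remaining fields as in the existing prompb.Histogram
}

type ReducedTimeSeries struct {
	Labels     []LabelRef
	Samples    []Sample
	Exemplars  []ExemplarRef
	Histograms []Histogram
}

type WriteRequestWithRefs struct {
	StringSymbolTable map[uint64]string // each label string is sent once per request
	Timeseries        []ReducedTimeSeries
}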

storage/remote/queue_manager_test.go

@ -79,7 +79,7 @@ func TestSampleDelivery(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
defer s.Close()
queueConfig := config.DefaultQueueConfig
@ -126,6 +126,7 @@ func TestSampleDelivery(t *testing.T) {
hash, err := toHash(writeConfig)
require.NoError(t, err)
qm := s.rws.queues[hash]
c := NewTestWriteClient()
qm.SetClient(c)
@ -162,7 +163,7 @@ func TestMetadataDelivery(t *testing.T) {
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.Start()
defer m.Stop()
@ -201,7 +202,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
@ -243,7 +244,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.StoreSeries(series, 0)
m.Start()
@ -263,7 +264,7 @@ func TestShutdown(t *testing.T) {
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
samples, series := createTimeseries(n, n)
m.StoreSeries(series, 0)
@ -301,7 +302,7 @@ func TestSeriesReset(t *testing.T) {
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
for i := 0; i < numSegments; i++ {
series := []record.RefSeries{}
for j := 0; j < numSeries; j++ {
@ -330,7 +331,7 @@ func TestReshard(t *testing.T) {
dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.StoreSeries(series, 0)
m.Start()
@ -366,7 +367,7 @@ func TestReshardRaceWithStop(t *testing.T) {
go func() {
for {
metrics := newQueueManagerMetrics(nil, "", "")
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.Start()
h.Unlock()
h.Lock()
@ -401,7 +402,7 @@ func TestReshardPartialBatch(t *testing.T) {
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.StoreSeries(series, 0)
m.Start()
@ -446,7 +447,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
@ -473,7 +474,7 @@ func TestReleaseNoninternedString(t *testing.T) {
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
c := NewTestWriteClient()
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.Start()
defer m.Stop()
@ -520,7 +521,7 @@ func TestShouldReshard(t *testing.T) {
for _, c := range cases {
metrics := newQueueManagerMetrics(nil, "", "")
client := NewTestWriteClient()
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.numShards = c.startingShards
m.dataIn.incr(c.samplesIn)
m.dataOut.incr(c.samplesOut)
@ -549,10 +550,17 @@ func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([
V: float64(i),
})
}
series = append(series, record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: append(labels.Labels{{Name: "__name__", Value: name}}, extraLabels...),
rand.Shuffle(len(extraLabels), func(i, j int) {
extraLabels[i], extraLabels[j] = extraLabels[j], extraLabels[i]
})
sr := record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: labels.Labels{{Name: "__name__", Value: name}},
}
// Attach up to three of the shuffled extra labels; bound the slice so
// shorter label sets don't panic.
if n := len(extraLabels); n > 0 {
if n > 3 {
n = 3
}
sr.Labels = append(sr.Labels, extraLabels[:n]...)
}
series = append(series, sr)
}
return samples, series
}
@ -582,6 +590,37 @@ func createTimeseriesProto(numSamples, numSeries int, extraLabels ...labels.Labe
return series
}
func createReducedTimeseriesProto(numSamples, numSeries int, extraLabels ...labels.Label) ([]prompb.ReducedTimeSeries, *lookupPool) {
pool := newLookupPool()
series := make([]prompb.ReducedTimeSeries, 0, numSeries)
for i := 0; i < numSeries; i++ {
name := fmt.Sprintf("test_metric_%d", i)
sample := prompb.Sample{
Value: float64(i),
Timestamp: int64(i),
}
nRef := pool.intern("__name__")
vRef := pool.intern(name)
l := []prompb.LabelRef{{NameRef: nRef, ValueRef: vRef}}
rand.Shuffle(len(extraLabels), func(i, j int) {
extraLabels[i], extraLabels[j] = extraLabels[j], extraLabels[i]
})
for i, v := range extraLabels {
if i > 2 {
break
}
nRef := pool.intern(v.Name)
vRef := pool.intern(v.Value)
l = append(l, prompb.LabelRef{NameRef: nRef, ValueRef: vRef})
}
series = append(series, prompb.ReducedTimeSeries{
Labels: l,
Samples: []prompb.Sample{sample},
})
}
return series, pool
}
func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []record.RefSeries) {
exemplars := make([]record.RefExemplar, 0, numExemplars)
series := make([]record.RefSeries, 0, numSeries)
@ -727,6 +766,7 @@ func (c *TestWriteClient) waitForExpectedData(tb testing.TB) {
return
}
c.wg.Wait()
c.mtx.Lock()
defer c.mtx.Unlock()
for ts, expectedSamples := range c.expectedSamples {
@ -753,14 +793,17 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte) error {
return err
}
var reqProto prompb.WriteRequest
var reqProto prompb.WriteRequestWithRefs
if err := proto.Unmarshal(reqBuf, &reqProto); err != nil {
return err
}
count := 0
for _, ts := range reqProto.Timeseries {
labels := labelProtosToLabels(ts.Labels)
seriesName := labels.Get("__name__")
tsLabels := labels.Labels{}
for _, l := range ts.Labels {
tsLabels = append(tsLabels, labels.Label{Name: reqProto.StringSymbolTable[l.NameRef], Value: reqProto.StringSymbolTable[l.ValueRef]})
}
seriesName := tsLabels.Get("__name__")
for _, sample := range ts.Samples {
count++
c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample)
@ -768,7 +811,15 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte) error {
for _, ex := range ts.Exemplars {
count++
c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], ex)
e := prompb.Exemplar{}
e.Timestamp = ex.Timestamp
e.Value = ex.Value
eLabels := make([]prompb.Label, len(ex.Labels))
for i, l := range ex.Labels {
eLabels[i] = prompb.Label{Name: reqProto.StringSymbolTable[l.NameRef], Value: reqProto.StringSymbolTable[l.ValueRef]}
}
e.Labels = eLabels
c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], e)
}
for _, histogram := range ts.Histograms {
@ -871,7 +922,7 @@ func BenchmarkSampleSend(b *testing.B) {
dir := b.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.StoreSeries(series, 0)
// These should be received by the client.
@ -917,7 +968,7 @@ func BenchmarkStartup(b *testing.B) {
c := NewTestBlockedWriteClient()
m := NewQueueManager(metrics, nil, nil, logger, dir,
newEWMARate(ewmaWeight, shardUpdateDuration),
cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false)
cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, false)
m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
m.watcher.MaxSegment = segments[len(segments)-2]
err := m.watcher.Run()
@ -993,7 +1044,7 @@ func TestCalculateDesiredShards(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "")
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
// Need to start the queue manager so the proper metrics are initialized.
// However we can stop it right away since we don't need to do any actual
@ -1070,7 +1121,7 @@ func TestCalculateDesiredShardsDetail(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "")
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false)
for _, tc := range []struct {
name string
@ -1316,3 +1367,36 @@ func BenchmarkBuildWriteRequest(b *testing.B) {
b.StopTimer()
}
func BenchmarkBuildReducedWriteRequest(b *testing.B) {
// Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics.
extraLabels := labels.Labels{
{Name: "kubernetes_io_arch", Value: "amd64"},
{Name: "kubernetes_io_instance_type", Value: "c3.somesize"},
{Name: "kubernetes_io_os", Value: "linux"},
{Name: "container_name", Value: "some-name"},
{Name: "failure_domain_kubernetes_io_region", Value: "somewhere-1"},
{Name: "failure_domain_kubernetes_io_zone", Value: "somewhere-1b"},
{Name: "id", Value: "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28"},
{Name: "image", Value: "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506"},
{Name: "instance", Value: "ip-111-11-1-11.ec2.internal"},
{Name: "job", Value: "kubernetes-cadvisor"},
{Name: "kubernetes_io_hostname", Value: "ip-111-11-1-11"},
{Name: "monitor", Value: "prod"},
{Name: "name", Value: "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0"},
{Name: "namespace", Value: "kube-system"},
{Name: "pod_name", Value: "some-other-name-5j8s8"},
}
series, pool := createReducedTimeseriesProto(1, 10000, extraLabels...)
b.ResetTimer()
totalSize := 0
for i := 0; i < b.N; i++ {
buf, _, _ := buildReducedWriteRequest(series, pool.getTable(), nil, nil)
totalSize += len(buf)
}
// Report the average compressed payload size once, after the loop.
b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
// Do not include shutdown
b.StopTimer()
}
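
Run alongside the existing BenchmarkBuildWriteRequest, the compressedSize/op metric gives a rough measure of how much the interned format shrinks the compressed payload for label-heavy series like the cAdvisor set above.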

storage/remote/read_test.go

@ -91,7 +91,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) {
for _, tc := range cases {
t.Run("", func(t *testing.T) {
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteReadConfigs: tc.cfgs,

storage/remote/storage.go

@ -62,7 +62,7 @@ type Storage struct {
}
// NewStorage returns a remote.Storage.
func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager) *Storage {
func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, writeReducedProto bool) *Storage {
if l == nil {
l = log.NewNopLogger()
}
@ -72,7 +72,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
logger: logger,
localStartTimeCallback: stCallback,
}
s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm)
s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, writeReducedProto)
return s
}

storage/remote/storage_test.go

@ -27,7 +27,7 @@ import (
func TestStorageLifecycle(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{
@ -54,7 +54,7 @@ func TestStorageLifecycle(t *testing.T) {
func TestUpdateRemoteReadConfigs(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{},
@ -75,7 +75,7 @@ func TestUpdateRemoteReadConfigs(t *testing.T) {
func TestFilterExternalLabels(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{
@ -100,7 +100,7 @@ func TestFilterExternalLabels(t *testing.T) {
func TestIgnoreExternalLabels(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{

storage/remote/write.go

@ -17,6 +17,8 @@ import (
"context"
"fmt"
"math"
"math/rand"
"strconv"
"sync"
"time"
@ -65,6 +67,7 @@ type WriteStorage struct {
externalLabels labels.Labels
dir string
queues map[string]*QueueManager
writeReducedProto bool
samplesIn *ewmaRate
flushDeadline time.Duration
interner *pool
@ -76,12 +79,13 @@ type WriteStorage struct {
}
// NewWriteStorage creates and runs a WriteStorage.
func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager) *WriteStorage {
func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, writeReducedProto bool) *WriteStorage {
if logger == nil {
logger = log.NewNopLogger()
}
rws := &WriteStorage{
queues: make(map[string]*QueueManager),
writeReducedProto: writeReducedProto,
watcherMetrics: wlog.NewWatcherMetrics(reg),
liveReaderMetrics: wlog.NewLiveReaderMetrics(reg),
logger: logger,
@ -152,17 +156,22 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
if rwConf.Name != "" {
name = rwConf.Name
}
c, err := NewWriteClient(name, &ClientConfig{
URL: rwConf.URL,
Timeout: rwConf.RemoteTimeout,
HTTPClientConfig: rwConf.HTTPClientConfig,
SigV4Config: rwConf.SigV4Config,
Headers: rwConf.Headers,
RetryOnRateLimit: rwConf.QueueConfig.RetryOnRateLimit,
})
if err != nil {
return err
var c WriteClient
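// Test hook: a remote write URL of "fake" swaps in an in-memory test client
// so the new proto path can be exercised without a real receiver.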
if rwConf.URL.String() == "fake" {
f := "fake" + strconv.Itoa(rand.Intn(100))
c = NewTestClient(f, f)
} else {
c, err = NewWriteClient(name, &ClientConfig{
URL: rwConf.URL,
Timeout: rwConf.RemoteTimeout,
HTTPClientConfig: rwConf.HTTPClientConfig,
SigV4Config: rwConf.SigV4Config,
Headers: rwConf.Headers,
RetryOnRateLimit: rwConf.QueueConfig.RetryOnRateLimit,
})
if err != nil {
return err
}
}
queue, ok := rws.queues[hash]
@ -196,6 +205,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
rws.scraper,
rwConf.SendExemplars,
rwConf.SendNativeHistograms,
rws.writeReducedProto,
)
// Keep track of which queues are new so we know which to start.
newHashes = append(newHashes, hash)

storage/remote/write_test.go

@ -111,7 +111,7 @@ func TestNoDuplicateWriteConfigs(t *testing.T) {
}
for _, tc := range cases {
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil)
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: tc.cfgs,
@ -133,7 +133,7 @@ func TestRestartOnNameChange(t *testing.T) {
hash, err := toHash(cfg)
require.NoError(t, err)
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil)
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
@ -158,7 +158,7 @@ func TestRestartOnNameChange(t *testing.T) {
func TestUpdateWithRegisterer(t *testing.T) {
dir := t.TempDir()
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil)
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil, false)
c1 := &config.RemoteWriteConfig{
Name: "named",
URL: &common_config.URL{
@ -198,7 +198,7 @@ func TestUpdateWithRegisterer(t *testing.T) {
func TestWriteStorageLifecycle(t *testing.T) {
dir := t.TempDir()
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil)
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{
@ -215,7 +215,7 @@ func TestWriteStorageLifecycle(t *testing.T) {
func TestUpdateExternalLabels(t *testing.T) {
dir := t.TempDir()
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil)
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil, false)
externalLabels := labels.FromStrings("external", "true")
conf := &config.Config{
@ -244,7 +244,7 @@ func TestUpdateExternalLabels(t *testing.T) {
func TestWriteStorageApplyConfigsIdempotent(t *testing.T) {
dir := t.TempDir()
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil)
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{},
@ -270,7 +270,7 @@ func TestWriteStorageApplyConfigsIdempotent(t *testing.T) {
func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) {
dir := t.TempDir()
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil)
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
c0 := &config.RemoteWriteConfig{
RemoteTimeout: model.Duration(10 * time.Second),

tsdb/agent/db_test.go

@ -79,7 +79,7 @@ func createTestAgentDB(t *testing.T, reg prometheus.Registerer, opts *Options) *
t.Helper()
dbDir := t.TempDir()
rs := remote.NewStorage(log.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil)
rs := remote.NewStorage(log.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil, false)
t.Cleanup(func() {
require.NoError(t, rs.Close())
})
@ -393,7 +393,7 @@ func TestLockfile(t *testing.T) {
tsdbutil.TestDirLockerUsage(t, func(t *testing.T, data string, createLock bool) (*tsdbutil.DirLocker, testutil.Closer) {
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil)
rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil, false)
t.Cleanup(func() {
require.NoError(t, rs.Close())
})
@ -413,7 +413,7 @@ func TestLockfile(t *testing.T) {
func Test_ExistingWAL_NextRef(t *testing.T) {
dbDir := t.TempDir()
rs := remote.NewStorage(log.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil)
rs := remote.NewStorage(log.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil, false)
defer func() {
require.NoError(t, rs.Close())
}()

web/api/v1/api_test.go

@ -414,7 +414,7 @@ func TestEndpoints(t *testing.T) {
remote := remote.NewStorage(promlog.New(&promlogConfig), prometheus.DefaultRegisterer, func() (int64, error) {
return 0, nil
}, dbDir, 1*time.Second, nil)
}, dbDir, 1*time.Second, nil, false)
err = remote.ApplyConfig(&config.Config{
RemoteReadConfigs: []*config.RemoteReadConfig{