diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 1e1b5f03c..40a74fe58 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -166,7 +166,6 @@ type QueueManager struct {
 	client         StorageClient
 	watcher        *WALWatcher
 
-	seriesMtx            sync.Mutex
 	seriesLabels         map[uint64][]prompb.Label
 	seriesSegmentIndexes map[uint64]int
 	droppedSeries        map[uint64]struct{}
@@ -231,16 +230,10 @@ func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, cfg
 // Append queues a sample to be sent to the remote storage. Blocks until all samples are
 // enqueued on their shards or a shutdown signal is received.
 func (t *QueueManager) Append(s []tsdb.RefSample) bool {
-	type enqueuable struct {
-		ts  prompb.TimeSeries
-		ref uint64
-	}
-
-	tempSamples := make([]enqueuable, 0, len(s))
-
-	t.seriesMtx.Lock()
+outer:
 	for _, sample := range s {
-		// If we have no labels for the series, due to relabelling or otherwise, don't send the sample.
-		if _, ok := t.seriesLabels[sample.Ref]; !ok {
+		lbls, ok := t.seriesLabels[sample.Ref]
+		if !ok {
 			t.droppedSamplesTotal.Inc()
 			t.samplesDropped.incr(1)
 			if _, ok := t.droppedSeries[sample.Ref]; !ok {
@@ -248,23 +241,6 @@ func (t *QueueManager) Append(s []tsdb.RefSample) bool {
 			}
 			continue
 		}
-		tempSamples = append(tempSamples, enqueuable{
-			ts: prompb.TimeSeries{
-				Labels: t.seriesLabels[sample.Ref],
-				Samples: []prompb.Sample{
-					prompb.Sample{
-						Value:     float64(sample.V),
-						Timestamp: sample.T,
-					},
-				},
-			},
-			ref: sample.Ref,
-		})
-	}
-	t.seriesMtx.Unlock()
-
-outer:
-	for _, sample := range tempSamples {
 		// This will only loop if the queues are being resharded.
 		backoff := t.cfg.MinBackoff
 		for {
@@ -274,7 +250,16 @@ outer:
 			default:
 			}
 
-			if t.shards.enqueue(sample.ref, sample.ts) {
+			ts := prompb.TimeSeries{
+				Labels: lbls,
+				Samples: []prompb.Sample{
+					prompb.Sample{
+						Value:     float64(sample.V),
+						Timestamp: sample.T,
+					},
+				},
+			}
+			if t.shards.enqueue(sample.Ref, ts) {
 				continue outer
 			}
 
@@ -336,8 +321,6 @@ func (t *QueueManager) Stop() {
 	t.watcher.Stop()
 
 	// On shutdown, release the strings in the labels from the intern pool.
-	t.seriesMtx.Lock()
-	defer t.seriesMtx.Unlock()
 	for _, labels := range t.seriesLabels {
 		release(labels)
 	}
@@ -357,11 +340,6 @@ func (t *QueueManager) Stop() {
 
 // StoreSeries keeps track of which series we know about for lookups when sending samples to remote.
 func (t *QueueManager) StoreSeries(series []tsdb.RefSeries, index int) {
-	// Lock before any calls to labelsToLabels proto, as that's where string interning is done.
-	t.seriesMtx.Lock()
-	defer t.seriesMtx.Unlock()
-
-	temp := make(map[uint64][]prompb.Label, len(series))
 	for _, s := range series {
 		ls := processExternalLabels(s.Labels, t.externalLabels)
 		rl := relabel.Process(ls, t.relabelConfigs...)
@@ -369,19 +347,16 @@ func (t *QueueManager) StoreSeries(series []tsdb.RefSeries, index int) {
 			t.droppedSeries[s.Ref] = struct{}{}
 			continue
 		}
-		temp[s.Ref] = labelsToLabelsProto(rl)
-	}
-
-	for ref, labels := range temp {
-		t.seriesSegmentIndexes[ref] = index
+		t.seriesSegmentIndexes[s.Ref] = index
+		labels := labelsToLabelsProto(rl)
 
 		// We should not ever be replacing a series labels in the map, but just
 		// in case we do we need to ensure we do not leak the replaced interned
 		// strings.
-		if orig, ok := t.seriesLabels[ref]; ok {
+		if orig, ok := t.seriesLabels[s.Ref]; ok {
 			release(orig)
 		}
-		t.seriesLabels[ref] = labels
+		t.seriesLabels[s.Ref] = labels
 	}
 }
 
@@ -389,9 +364,6 @@ func (t *QueueManager) StoreSeries(series []tsdb.RefSeries, index int) {
 // stored series records with the checkpoints index number, so we can now
 // delete any ref ID's lower than that # from the two maps.
 func (t *QueueManager) SeriesReset(index int) {
-	t.seriesMtx.Lock()
-	defer t.seriesMtx.Unlock()
-
 	// Check for series that are in segments older than the checkpoint
 	// that were not also present in the checkpoint.
 	for k, v := range t.seriesSegmentIndexes {
@@ -661,9 +633,10 @@ func (s *shards) runShard(ctx context.Context, i int, queue chan prompb.TimeSeri
 	// Send batches of at most MaxSamplesPerSend samples to the remote storage.
 	// If we have fewer samples than that, flush them out after a deadline
 	// anyways.
-	pendingSamples := []prompb.TimeSeries{}
-
 	max := s.qm.cfg.MaxSamplesPerSend
+	pendingSamples := make([]prompb.TimeSeries, 0, max)
+	var buf []byte
+
 	timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
 	stop := func() {
 		if !timer.Stop() {
@@ -684,7 +657,7 @@ func (s *shards) runShard(ctx context.Context, i int, queue chan prompb.TimeSeri
 			if !ok {
 				if len(pendingSamples) > 0 {
 					level.Debug(s.qm.logger).Log("msg", "Flushing samples to remote storage...", "count", len(pendingSamples))
-					s.sendSamples(ctx, pendingSamples)
+					s.sendSamples(ctx, pendingSamples, &buf)
 					s.qm.pendingSamplesMetric.Sub(float64(len(pendingSamples)))
 					level.Debug(s.qm.logger).Log("msg", "Done flushing.")
 				}
@@ -698,8 +671,8 @@ func (s *shards) runShard(ctx context.Context, i int, queue chan prompb.TimeSeri
 			s.qm.pendingSamplesMetric.Inc()
 
 			if len(pendingSamples) >= max {
-				s.sendSamples(ctx, pendingSamples[:max])
-				pendingSamples = pendingSamples[max:]
+				s.sendSamples(ctx, pendingSamples[:max], &buf)
+				pendingSamples = append(pendingSamples[:0], pendingSamples[max:]...)
 				s.qm.pendingSamplesMetric.Sub(float64(max))
 
 				stop()
@@ -710,7 +683,7 @@ func (s *shards) runShard(ctx context.Context, i int, queue chan prompb.TimeSeri
 			if len(pendingSamples) > 0 {
 				level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending samples", "samples", len(pendingSamples), "shard", shardNum)
 				n := len(pendingSamples)
-				s.sendSamples(ctx, pendingSamples)
+				s.sendSamples(ctx, pendingSamples, &buf)
 				pendingSamples = pendingSamples[:0]
 				s.qm.pendingSamplesMetric.Sub(float64(n))
 			}
@@ -719,9 +692,9 @@ func (s *shards) runShard(ctx context.Context, i int, queue chan prompb.TimeSeri
 	}
 }
 
-func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries) {
+func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, buf *[]byte) {
 	begin := time.Now()
-	err := s.sendSamplesWithBackoff(ctx, samples)
+	err := s.sendSamplesWithBackoff(ctx, samples, buf)
 	if err != nil {
 		level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", len(samples), "err", err)
 		s.qm.failedSamplesTotal.Add(float64(len(samples)))
@@ -734,9 +707,10 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries) {
 }
 
 // sendSamples to the remote storage with backoff for recoverable errors.
-func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries) error {
+func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, buf *[]byte) error {
 	backoff := s.qm.cfg.MinBackoff
-	req, highest, err := buildWriteRequest(samples)
+	req, highest, err := buildWriteRequest(samples, *buf)
+	*buf = req
 	if err != nil {
 		// Failing to build the write request is non-recoverable, since it will
 		// only error if marshaling the proto to bytes fails.
@@ -774,7 +748,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
 	}
 }
 
-func buildWriteRequest(samples []prompb.TimeSeries) ([]byte, int64, error) {
+func buildWriteRequest(samples []prompb.TimeSeries, buf []byte) ([]byte, int64, error) {
 	var highest int64
 	for _, ts := range samples {
 		// At the moment we only ever append a TimeSeries with a single sample in it.
@@ -791,6 +765,11 @@ func buildWriteRequest(samples []prompb.TimeSeries) ([]byte, int64, error) {
 		return nil, highest, err
 	}
 
-	compressed := snappy.Encode(nil, data)
+	// snappy uses len() to see if it needs to allocate a new slice. Make the
+	// buffer as long as possible.
+	if buf != nil {
+		buf = buf[0:cap(buf)]
+	}
+	compressed := snappy.Encode(buf, data)
 	return compressed, highest, nil
 }
diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index f79bfb675..ba71084da 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -260,18 +260,8 @@ func TestReshardRaceWithStop(t *testing.T) {
 
 func TestReleaseNoninternedString(t *testing.T) {
 	c := NewTestStorageClient()
-	var m *QueueManager
-	h := sync.Mutex{}
-
-	h.Lock()
-
-	m = NewQueueManager(nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
+	m := NewQueueManager(nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
 	m.Start()
-	go func() {
-		for {
-			m.SeriesReset(1)
-		}
-	}()
 
 	for i := 1; i < 1000; i++ {
 		m.StoreSeries([]tsdb.RefSeries{
@@ -285,6 +275,7 @@ func TestReleaseNoninternedString(t *testing.T) {
 				},
 			},
 		}, 0)
+		m.SeriesReset(1)
 	}
 
 	metric := client_testutil.ToFloat64(noReferenceReleases)
@@ -323,6 +314,7 @@ type TestStorageClient struct {
 	expectedSamples map[string][]prompb.Sample
 	wg              sync.WaitGroup
 	mtx             sync.Mutex
	buf             []byte
 }
 
 func NewTestStorageClient() *TestStorageClient {
@@ -349,21 +341,36 @@ func (c *TestStorageClient) expectSamples(ss []tsdb.RefSample, series []tsdb.Ref
 	c.wg.Add(len(ss))
 }
 
-func (c *TestStorageClient) waitForExpectedSamples(t *testing.T) {
+func (c *TestStorageClient) waitForExpectedSamples(tb testing.TB) {
 	c.wg.Wait()
 	c.mtx.Lock()
 	defer c.mtx.Unlock()
 	for ts, expectedSamples := range c.expectedSamples {
 		if !reflect.DeepEqual(expectedSamples, c.receivedSamples[ts]) {
-			t.Fatalf("%s: Expected %v, got %v", ts, expectedSamples, c.receivedSamples[ts])
+			tb.Fatalf("%s: Expected %v, got %v", ts, expectedSamples, c.receivedSamples[ts])
 		}
 	}
 }
 
+func (c *TestStorageClient) expectSampleCount(ss []tsdb.RefSample) {
+	c.mtx.Lock()
+	defer c.mtx.Unlock()
+	c.wg.Add(len(ss))
+}
+
+func (c *TestStorageClient) waitForExpectedSampleCount() {
+	c.wg.Wait()
+}
+
 func (c *TestStorageClient) Store(_ context.Context, req []byte) error {
 	c.mtx.Lock()
 	defer c.mtx.Unlock()
-	reqBuf, err := snappy.Decode(nil, req)
+	// nil buffers are ok for snappy, ignore cast error.
+	if c.buf != nil {
+		c.buf = c.buf[:cap(c.buf)]
+	}
+	reqBuf, err := snappy.Decode(c.buf, req)
+	c.buf = reqBuf
 	if err != nil {
 		return err
 	}
@@ -421,6 +428,39 @@ func (c *TestBlockingStorageClient) Name() string {
 	return "testblockingstorageclient"
 }
 
+func BenchmarkSampleDelivery(b *testing.B) {
+	// Let's create an even number of send batches so we don't run into the
+	// batch timeout case.
+	n := config.DefaultQueueConfig.MaxSamplesPerSend * 10
+	samples, series := createTimeseries(n)
+
+	c := NewTestStorageClient()
+
+	cfg := config.DefaultQueueConfig
+	cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
+	cfg.MaxShards = 1
+
+	dir, err := ioutil.TempDir("", "BenchmarkSampleDelivery")
+	testutil.Ok(b, err)
+	defer os.RemoveAll(dir)
+
+	m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
+	m.StoreSeries(series, 0)
+
+	// These should be received by the client.
+	m.Start()
+	defer m.Stop()
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		c.expectSampleCount(samples)
+		m.Append(samples)
+		c.waitForExpectedSampleCount()
+	}
+	// Do not include shutdown
+	b.StopTimer()
+}
+
 func BenchmarkStartup(b *testing.B) {
 	dir := os.Getenv("WALDIR")
 	if dir == "" {
diff --git a/storage/remote/wal_watcher.go b/storage/remote/wal_watcher.go
index 3dba373b4..39f73ba76 100644
--- a/storage/remote/wal_watcher.go
+++ b/storage/remote/wal_watcher.go
@@ -418,6 +418,7 @@ func (w *WALWatcher) readSegment(r *wal.LiveReader, segmentNum int, tail bool) e
 		dec     tsdb.RecordDecoder
 		series  []tsdb.RefSeries
 		samples []tsdb.RefSample
+		send    []tsdb.RefSample
 	)
 
 	for r.Next() && !isClosed(w.quit) {
@@ -444,7 +445,6 @@ func (w *WALWatcher) readSegment(r *wal.LiveReader, segmentNum int, tail bool) e
 				w.recordDecodeFailsMetric.Inc()
 				return err
 			}
-			var send []tsdb.RefSample
 			for _, s := range samples {
 				if s.T > w.startTime {
 					send = append(send, s)
@@ -453,6 +453,7 @@ func (w *WALWatcher) readSegment(r *wal.LiveReader, segmentNum int, tail bool) e
 			if len(send) > 0 {
 				// Blocks until the sample is sent to all remote write endpoints or closed (because enqueue blocks).
 				w.writer.Append(send)
+				send = send[:0]
 			}
 
 		case tsdb.RecordTombstones:
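
The buildWriteRequest change above relies on snappy only allocating when the destination it is handed is too short. Below is a minimal standalone sketch of that buffer-reuse pattern; the helper name compress and the dummy payloads are illustrative and not part of the patch, only the snappy.Encode handling mirrors it.

package main

import (
	"fmt"

	"github.com/golang/snappy"
)

// compress reuses buf across calls: snappy.Encode checks len(dst) against the
// maximum encoded length of src and only allocates a fresh slice when dst is
// too short, so growing buf back to its full capacity lets the same backing
// array serve repeated encodes.
func compress(buf, data []byte) []byte {
	if buf != nil {
		buf = buf[0:cap(buf)]
	}
	return snappy.Encode(buf, data)
}

func main() {
	var buf []byte
	for i := 0; i < 3; i++ {
		payload := []byte(fmt.Sprintf("payload %d: some repeated repeated repeated text", i))
		// Keep the returned slice, much like sendSamplesWithBackoff stores the
		// request bytes back into *buf, so the next call can reuse its capacity.
		buf = compress(buf, payload)
		fmt.Printf("iteration %d: %d bytes in, %d bytes compressed, cap %d\n",
			i, len(payload), len(buf), cap(buf))
	}
}

Threading the buffer as *[]byte, as the patch does, matters because snappy may return a newly allocated slice; writing it back ensures the caller holds whichever allocation ends up being used.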
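
The pendingSamples handling in runShard follows a related idea: after the first max entries are sent, the unsent tail is copied to the front of the same backing array instead of re-slicing past it. A small illustrative sketch under assumed names and values, not taken from the patch:

package main

import "fmt"

func main() {
	max := 3
	pending := make([]int, 0, 2*max)
	pending = append(pending, 1, 2, 3, 4, 5)

	// Send the first max entries (stand-in for s.sendSamples in the patch).
	fmt.Println("sent:", pending[:max])

	// pending = pending[max:] would move the slice header forward each batch,
	// leaving the capacity at the front unreachable and eventually forcing a
	// reallocation; copying the tail back to index 0 keeps reusing the
	// original allocation.
	pending = append(pending[:0], pending[max:]...)
	fmt.Println("remaining:", pending, "cap still", cap(pending))
}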