From 48e39068bdf699501d304d26305a767e87a16c34 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 25 Jan 2018 11:11:55 +0000 Subject: [PATCH 1/4] Don't allocate a mergeSeries if there is only one series to merge. --- storage/fanout.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/storage/fanout.go b/storage/fanout.go index ae74ad2ca..dd33999eb 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -340,6 +340,9 @@ func (c *mergeSeriesSet) Next() bool { } func (c *mergeSeriesSet) At() Series { + if len(c.currentSets) == 1 { + return c.currentSets[0].At() + } series := []Series{} for _, seriesSet := range c.currentSets { series = append(series, seriesSet.At()) From 749781edf364da5245e71df95e4e878b0220fbb3 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 25 Jan 2018 11:17:16 +0000 Subject: [PATCH 2/4] Also, don't make a mergeSeriesSet if there is only one SeriesSet. --- storage/fanout.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/storage/fanout.go b/storage/fanout.go index dd33999eb..40be2536d 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -303,6 +303,10 @@ type mergeSeriesSet struct { // NewMergeSeriesSet returns a new series set that merges (deduplicates) // series returned by the input series sets when iterating. func NewMergeSeriesSet(sets []SeriesSet) SeriesSet { + if len(sets) == 1 { + return sets[0] + } + // Sets need to be pre-advanced, so we can introspect the label of the // series under the cursor. var h seriesSetHeap From da29c09dca55bb255e3df6027b958e4bbe386dfc Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 26 Jan 2018 11:01:59 +0000 Subject: [PATCH 3/4] Some benchmarks for the mergeSeries set. --- storage/buffer_test.go | 11 ++++++++ storage/fanout_test.go | 62 ++++++++++++++++++++++++++++++++++++------ 2 files changed, 65 insertions(+), 8 deletions(-) diff --git a/storage/buffer_test.go b/storage/buffer_test.go index a03081ab6..2a1b87248 100644 --- a/storage/buffer_test.go +++ b/storage/buffer_test.go @@ -201,6 +201,17 @@ type mockSeries struct { iterator func() SeriesIterator } +func newMockSeries(lset labels.Labels, samples []sample) Series { + return &mockSeries{ + labels: func() labels.Labels { + return lset + }, + iterator: func() SeriesIterator { + return newListSeriesIterator(samples) + }, + } +} + func (m *mockSeries) Labels() labels.Labels { return m.labels() } func (m *mockSeries) Iterator() SeriesIterator { return m.iterator() } diff --git a/storage/fanout_test.go b/storage/fanout_test.go index c8094ea33..4e3f5b329 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -14,6 +14,7 @@ package storage import ( + "fmt" "testing" "github.com/stretchr/testify/require" @@ -218,13 +219,58 @@ func (m *mockSeriesSet) Err() error { return nil } -func newMockSeries(lset labels.Labels, samples []sample) Series { - return &mockSeries{ - labels: func() labels.Labels { - return lset - }, - iterator: func() SeriesIterator { - return newListSeriesIterator(samples) - }, +var result []sample + +func makeSeriesSet(numSeries, numSamples int) SeriesSet { + series := []Series{} + for j := 0; j < numSeries; j++ { + labels := labels.Labels{{Name: "foo", Value: fmt.Sprintf("bar%d", j)}} + samples := []sample{} + for k := 0; k < numSamples; k++ { + samples = append(samples, sample{t: int64(k), v: float64(k)}) + } + series = append(series, newMockSeries(labels, samples)) + } + return newMockSeriesSet(series...) +} + +func makeMergeSeriesSet(numSeriesSets, numSeries, numSamples int) SeriesSet { + seriesSets := []SeriesSet{} + for i := 0; i < numSeriesSets; i++ { + seriesSets = append(seriesSets, makeSeriesSet(numSeries, numSamples)) + } + return NewMergeSeriesSet(seriesSets) +} + +func benchmarkDrain(seriesSet SeriesSet, b *testing.B) { + for n := 0; n < b.N; n++ { + for seriesSet.Next() { + result = drainSamples(seriesSet.At().Iterator()) + } } } + +func BenchmarkNoMergeSeriesSet_100_100(b *testing.B) { + seriesSet := makeSeriesSet(100, 100) + benchmarkDrain(seriesSet, b) +} + +func BenchmarkMergeSeriesSet_1_100_100(b *testing.B) { + seriesSet := makeMergeSeriesSet(1, 100, 100) + benchmarkDrain(seriesSet, b) +} + +func BenchmarkMergeSeriesSet_10_100_100(b *testing.B) { + seriesSet := makeMergeSeriesSet(10, 100, 100) + benchmarkDrain(seriesSet, b) +} + +func BenchmarkMergeSeriesSet_100_100_100(b *testing.B) { + seriesSet := makeMergeSeriesSet(100, 100, 100) + benchmarkDrain(seriesSet, b) +} + +func BenchmarkMergeSeriesSet_1000_100_100(b *testing.B) { + seriesSet := makeMergeSeriesSet(100, 100, 100) + benchmarkDrain(seriesSet, b) +} From 3dc5b8eef542a066f78e37ebaf3416ba6778926d Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 29 Jan 2018 11:37:48 +0000 Subject: [PATCH 4/4] Use sub benchmarks. --- storage/fanout_test.go | 31 +++++++++++++------------------ 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/storage/fanout_test.go b/storage/fanout_test.go index 4e3f5b329..66d86fc93 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -255,22 +255,17 @@ func BenchmarkNoMergeSeriesSet_100_100(b *testing.B) { benchmarkDrain(seriesSet, b) } -func BenchmarkMergeSeriesSet_1_100_100(b *testing.B) { - seriesSet := makeMergeSeriesSet(1, 100, 100) - benchmarkDrain(seriesSet, b) -} - -func BenchmarkMergeSeriesSet_10_100_100(b *testing.B) { - seriesSet := makeMergeSeriesSet(10, 100, 100) - benchmarkDrain(seriesSet, b) -} - -func BenchmarkMergeSeriesSet_100_100_100(b *testing.B) { - seriesSet := makeMergeSeriesSet(100, 100, 100) - benchmarkDrain(seriesSet, b) -} - -func BenchmarkMergeSeriesSet_1000_100_100(b *testing.B) { - seriesSet := makeMergeSeriesSet(100, 100, 100) - benchmarkDrain(seriesSet, b) +func BenchmarkMergeSeriesSet(b *testing.B) { + for _, bm := range []struct { + numSeriesSets, numSeries, numSamples int + }{ + {1, 100, 100}, + {10, 100, 100}, + {100, 100, 100}, + } { + seriesSet := makeMergeSeriesSet(bm.numSeriesSets, bm.numSeries, bm.numSamples) + b.Run(fmt.Sprintf("%d_%d_%d", bm.numSeriesSets, bm.numSeries, bm.numSamples), func(b *testing.B) { + benchmarkDrain(seriesSet, b) + }) + } }