mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-25 12:42:47 -08:00
Include out of order samples in BenchmarkQueries (#286)
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
parent
ff0dc75758
commit
00b379c3a5
|
@ -27,6 +27,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/go-kit/log"
|
"github.com/go-kit/log"
|
||||||
|
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/model/labels"
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
|
@ -511,6 +512,66 @@ func createHead(tb testing.TB, w *wal.WAL, series []storage.Series, chunkDir str
|
||||||
return head
|
return head
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func createHeadWithOOOSamples(tb testing.TB, w *wal.WAL, series []storage.Series, chunkDir string, oooSampleFrequency int) *Head {
|
||||||
|
opts := DefaultHeadOptions()
|
||||||
|
opts.ChunkDirRoot = chunkDir
|
||||||
|
opts.OutOfOrderTimeWindow.Store(10000000000)
|
||||||
|
head, err := NewHead(nil, nil, w, nil, opts, nil)
|
||||||
|
require.NoError(tb, err)
|
||||||
|
|
||||||
|
oooSampleLabels := make([]labels.Labels, 0, len(series))
|
||||||
|
oooSamples := make([]tsdbutil.SampleSlice, 0, len(series))
|
||||||
|
|
||||||
|
totalSamples := 0
|
||||||
|
app := head.Appender(context.Background())
|
||||||
|
for _, s := range series {
|
||||||
|
ref := storage.SeriesRef(0)
|
||||||
|
it := s.Iterator()
|
||||||
|
lset := s.Labels()
|
||||||
|
os := tsdbutil.SampleSlice{}
|
||||||
|
count := 0
|
||||||
|
for it.Next() {
|
||||||
|
totalSamples++
|
||||||
|
count++
|
||||||
|
t, v := it.At()
|
||||||
|
if count%oooSampleFrequency == 0 {
|
||||||
|
os = append(os, sample{t: t, v: v})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
ref, err = app.Append(ref, lset, t, v)
|
||||||
|
require.NoError(tb, err)
|
||||||
|
}
|
||||||
|
require.NoError(tb, it.Err())
|
||||||
|
if len(os) > 0 {
|
||||||
|
oooSampleLabels = append(oooSampleLabels, lset)
|
||||||
|
oooSamples = append(oooSamples, os)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
require.NoError(tb, app.Commit())
|
||||||
|
|
||||||
|
oooSamplesAppended := 0
|
||||||
|
require.Equal(tb, float64(0), prom_testutil.ToFloat64(head.metrics.outOfOrderSamplesAppended))
|
||||||
|
|
||||||
|
app = head.Appender(context.Background())
|
||||||
|
for i, lset := range oooSampleLabels {
|
||||||
|
ref := storage.SeriesRef(0)
|
||||||
|
for _, sample := range oooSamples[i] {
|
||||||
|
ref, err = app.Append(ref, lset, sample.T(), sample.V())
|
||||||
|
require.NoError(tb, err)
|
||||||
|
oooSamplesAppended++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
require.NoError(tb, app.Commit())
|
||||||
|
|
||||||
|
actOOOAppended := prom_testutil.ToFloat64(head.metrics.outOfOrderSamplesAppended)
|
||||||
|
require.GreaterOrEqual(tb, actOOOAppended, float64(oooSamplesAppended-len(series)))
|
||||||
|
require.LessOrEqual(tb, actOOOAppended, float64(oooSamplesAppended))
|
||||||
|
|
||||||
|
require.Equal(tb, float64(totalSamples), prom_testutil.ToFloat64(head.metrics.samplesAppended))
|
||||||
|
|
||||||
|
return head
|
||||||
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
defaultLabelName = "labelName"
|
defaultLabelName = "labelName"
|
||||||
defaultLabelValue = "labelValue"
|
defaultLabelValue = "labelValue"
|
||||||
|
|
|
@ -1923,13 +1923,17 @@ func BenchmarkQueries(b *testing.B) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
queryTypes := make(map[string]storage.Querier)
|
type qt struct {
|
||||||
|
typ string
|
||||||
|
querier storage.Querier
|
||||||
|
}
|
||||||
|
var queryTypes []qt // We use a slice instead of map to keep the order of test cases consistent.
|
||||||
defer func() {
|
defer func() {
|
||||||
for _, q := range queryTypes {
|
for _, q := range queryTypes {
|
||||||
// Can't run a check for error here as some of these will fail as
|
// Can't run a check for error here as some of these will fail as
|
||||||
// queryTypes is using the same slice for the different block queriers
|
// queryTypes is using the same slice for the different block queriers
|
||||||
// and would have been closed in the previous iteration.
|
// and would have been closed in the previous iteration.
|
||||||
q.Close()
|
q.querier.Close()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -1970,21 +1974,38 @@ func BenchmarkQueries(b *testing.B) {
|
||||||
qs = append(qs, q)
|
qs = append(qs, q)
|
||||||
}
|
}
|
||||||
|
|
||||||
queryTypes["_1-Block"] = storage.NewMergeQuerier(qs[:1], nil, storage.ChainedSeriesMerge)
|
queryTypes = append(queryTypes, qt{"_1-Block", storage.NewMergeQuerier(qs[:1], nil, storage.ChainedSeriesMerge)})
|
||||||
queryTypes["_3-Blocks"] = storage.NewMergeQuerier(qs[0:3], nil, storage.ChainedSeriesMerge)
|
queryTypes = append(queryTypes, qt{"_3-Blocks", storage.NewMergeQuerier(qs[0:3], nil, storage.ChainedSeriesMerge)})
|
||||||
queryTypes["_10-Blocks"] = storage.NewMergeQuerier(qs, nil, storage.ChainedSeriesMerge)
|
queryTypes = append(queryTypes, qt{"_10-Blocks", storage.NewMergeQuerier(qs, nil, storage.ChainedSeriesMerge)})
|
||||||
|
|
||||||
chunkDir := b.TempDir()
|
chunkDir := b.TempDir()
|
||||||
head := createHead(b, nil, series, chunkDir)
|
head := createHead(b, nil, series, chunkDir)
|
||||||
qHead, err := NewBlockQuerier(head, 1, nSamples)
|
qHead, err := NewBlockQuerier(NewRangeHead(head, 1, nSamples), 1, nSamples)
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
queryTypes["_Head"] = qHead
|
queryTypes = append(queryTypes, qt{"_Head", qHead})
|
||||||
|
|
||||||
for qtype, querier := range queryTypes {
|
for _, oooPercentage := range []int{1, 3, 5, 10} {
|
||||||
b.Run(title+qtype+"_nSeries:"+strconv.Itoa(nSeries)+"_nSamples:"+strconv.Itoa(int(nSamples)), func(b *testing.B) {
|
chunkDir := b.TempDir()
|
||||||
|
totalOOOSamples := oooPercentage * int(nSamples) / 100
|
||||||
|
oooSampleFrequency := int(nSamples) / totalOOOSamples
|
||||||
|
head := createHeadWithOOOSamples(b, nil, series, chunkDir, oooSampleFrequency)
|
||||||
|
|
||||||
|
qHead, err := NewBlockQuerier(NewRangeHead(head, 1, nSamples), 1, nSamples)
|
||||||
|
require.NoError(b, err)
|
||||||
|
qOOOHead, err := NewBlockQuerier(NewOOORangeHead(head, 1, nSamples), 1, nSamples)
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
queryTypes = append(queryTypes, qt{
|
||||||
|
fmt.Sprintf("_Head_oooPercent:%d", oooPercentage),
|
||||||
|
storage.NewMergeQuerier([]storage.Querier{qHead, qOOOHead}, nil, storage.ChainedSeriesMerge),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, q := range queryTypes {
|
||||||
|
b.Run(title+q.typ+"_nSeries:"+strconv.Itoa(nSeries)+"_nSamples:"+strconv.Itoa(int(nSamples)), func(b *testing.B) {
|
||||||
expExpansions, err := strconv.Atoi(string(title[len(title)-1]))
|
expExpansions, err := strconv.Atoi(string(title[len(title)-1]))
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
benchQuery(b, expExpansions, querier, selectors)
|
benchQuery(b, expExpansions, q.querier, selectors)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
require.NoError(b, head.Close())
|
require.NoError(b, head.Close())
|
||||||
|
@ -2004,6 +2025,7 @@ func benchQuery(b *testing.B, expExpansions int, q storage.Querier, selectors la
|
||||||
s.Labels()
|
s.Labels()
|
||||||
it := s.Iterator()
|
it := s.Iterator()
|
||||||
for it.Next() {
|
for it.Next() {
|
||||||
|
_, _ = it.At()
|
||||||
}
|
}
|
||||||
actualExpansions++
|
actualExpansions++
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue