Attempt to append ooo sample at the end first (#11615)

This is an optimization on the existing append in OOOChunk.

What we've been doing so far is find the place inside the out-of-order
slice where the new sample should go in and then place it there and move
any samples to the right if necessary. This is OK but requires a binary
search every time the slice is bigger than 0.

The optimization is opinionated and suggests that although out-of-order
samples can be out-of-order amongst themselves they'll probably be in
order thus we can probably optimistically append at the end and if not
do the binary search.

OOOChunks are capped to 30 samples by default so this is a small
optimization but everything adds up, specially if you handle many active
timeseries with out-of-order samples.

Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com>
Signed-off-by: Jesus Vazquez <jesusvazquez@users.noreply.github.com>
Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
Jesus Vazquez 2023-01-13 14:30:50 +01:00 committed by GitHub
parent 721f33dbb0
commit 136956cca4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 71 additions and 0 deletions

View file

@ -21,6 +21,7 @@ import (
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"testing" "testing"
"time"
"github.com/go-kit/log" "github.com/go-kit/log"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -31,6 +32,7 @@ import (
"github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
) )
func TestRemoteWriteHandler(t *testing.T) { func TestRemoteWriteHandler(t *testing.T) {
@ -171,6 +173,65 @@ func TestCommitErr(t *testing.T) {
require.Equal(t, "commit error\n", string(body)) require.Equal(t, "commit error\n", string(body))
} }
func BenchmarkRemoteWriteOOOSamples(b *testing.B) {
dir := b.TempDir()
opts := tsdb.DefaultOptions()
opts.OutOfOrderCapMax = 30
opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds()
db, err := tsdb.Open(dir, nil, nil, opts, nil)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, db.Close())
})
handler := NewWriteHandler(log.NewNopLogger(), db.Head())
buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil)
require.NoError(b, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
require.NoError(b, err)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
require.Equal(b, http.StatusNoContent, recorder.Code)
require.Equal(b, db.Head().NumSeries(), uint64(1000))
var bufRequests [][]byte
for i := 0; i < 100; i++ {
buf, _, err = buildWriteRequest(genSeriesWithSample(1000, int64(80+i)*time.Minute.Milliseconds()), nil, nil, nil)
require.NoError(b, err)
bufRequests = append(bufRequests, buf)
}
b.ResetTimer()
for i := 0; i < 100; i++ {
req, err = http.NewRequest("", "", bytes.NewReader(bufRequests[i]))
require.NoError(b, err)
recorder = httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
require.Equal(b, http.StatusNoContent, recorder.Code)
require.Equal(b, db.Head().NumSeries(), uint64(1000))
}
}
func genSeriesWithSample(numSeries int, ts int64) []prompb.TimeSeries {
var series []prompb.TimeSeries
for i := 0; i < numSeries; i++ {
s := prompb.TimeSeries{
Labels: []prompb.Label{{Name: "__name__", Value: fmt.Sprintf("test_metric_%d", i)}},
Samples: []prompb.Sample{{Value: float64(i), Timestamp: ts}},
}
series = append(series, s)
}
return series
}
type mockAppendable struct { type mockAppendable struct {
latestSample int64 latestSample int64
samples []mockSample samples []mockSample

View file

@ -36,6 +36,15 @@ func NewOOOChunk() *OOOChunk {
// Insert inserts the sample such that order is maintained. // Insert inserts the sample such that order is maintained.
// Returns false if insert was not possible due to the same timestamp already existing. // Returns false if insert was not possible due to the same timestamp already existing.
func (o *OOOChunk) Insert(t int64, v float64) bool { func (o *OOOChunk) Insert(t int64, v float64) bool {
// Although out-of-order samples can be out-of-order amongst themselves, we
// are opinionated and expect them to be usually in-order meaning we could
// try to append at the end first if the new timestamp is higher than the
// last known timestamp.
if len(o.samples) == 0 || t > o.samples[len(o.samples)-1].t {
o.samples = append(o.samples, sample{t, v, nil, nil})
return true
}
// Find index of sample we should replace. // Find index of sample we should replace.
i := sort.Search(len(o.samples), func(i int) bool { return o.samples[i].t >= t }) i := sort.Search(len(o.samples), func(i int) bool { return o.samples[i].t >= t })
@ -45,6 +54,7 @@ func (o *OOOChunk) Insert(t int64, v float64) bool {
return true return true
} }
// Duplicate sample for timestamp is not allowed.
if o.samples[i].t == t { if o.samples[i].t == t {
return false return false
} }