Merge pull request #313 from grafana/load-ooo-time-window-once-per-appender

Load OutOfOrderTimeWindow only once per appender
This commit is contained in:
Oleg Zaytsev 2022-08-11 11:09:32 +02:00 committed by GitHub
commit c40ecf4bae
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -128,6 +128,7 @@ func (h *Head) appender() *headAppender {
mint: math.MaxInt64, mint: math.MaxInt64,
maxt: math.MinInt64, maxt: math.MinInt64,
headMaxt: h.MaxTime(), headMaxt: h.MaxTime(),
oooTimeWindow: h.opts.OutOfOrderTimeWindow.Load(),
samples: h.getAppendBuffer(), samples: h.getAppendBuffer(),
sampleSeries: h.getSeriesBuffer(), sampleSeries: h.getSeriesBuffer(),
exemplars: exemplarsBuf, exemplars: exemplarsBuf,
@ -236,10 +237,11 @@ type exemplarWithSeriesRef struct {
} }
type headAppender struct { type headAppender struct {
head *Head head *Head
minValidTime int64 // No samples below this timestamp are allowed. minValidTime int64 // No samples below this timestamp are allowed.
mint, maxt int64 mint, maxt int64
headMaxt int64 // We track it here to not take the lock for every sample appended. headMaxt int64 // We track it here to not take the lock for every sample appended.
oooTimeWindow int64 // Use the same for the entire append, and don't load the atomic for each sample.
series []record.RefSeries // New series held by this appender. series []record.RefSeries // New series held by this appender.
samples []record.RefSample // New samples held by this appender. samples []record.RefSample // New samples held by this appender.
@ -253,8 +255,7 @@ type headAppender struct {
func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
// For OOO inserts, this restriction is irrelevant and will be checked later once we confirm the sample is an in-order append. // For OOO inserts, this restriction is irrelevant and will be checked later once we confirm the sample is an in-order append.
// If OOO inserts are disabled, we may as well as check this as early as we can and avoid more work. // If OOO inserts are disabled, we may as well as check this as early as we can and avoid more work.
oooTimeWindow := a.head.opts.OutOfOrderTimeWindow.Load() if a.oooTimeWindow == 0 && t < a.minValidTime {
if oooTimeWindow == 0 && t < a.minValidTime {
a.head.metrics.outOfBoundSamples.Inc() a.head.metrics.outOfBoundSamples.Inc()
return 0, storage.ErrOutOfBounds return 0, storage.ErrOutOfBounds
} }
@ -288,7 +289,7 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
s.Lock() s.Lock()
// TODO: if we definitely know at this point that the sample is ooo, then optimise // TODO: if we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL. // to skip that sample from the WAL and write only in the WBL.
_, delta, err := s.appendable(t, v, a.headMaxt, a.minValidTime, oooTimeWindow) _, delta, err := s.appendable(t, v, a.headMaxt, a.minValidTime, a.oooTimeWindow)
if err == nil { if err == nil {
s.pendingCommit = true s.pendingCommit = true
} }
@ -547,12 +548,11 @@ func (a *headAppender) Commit() (err error) {
wblSamples = nil wblSamples = nil
oooMmapMarkers = nil oooMmapMarkers = nil
} }
oooTimeWindow := a.head.opts.OutOfOrderTimeWindow.Load()
for i, s := range a.samples { for i, s := range a.samples {
series = a.sampleSeries[i] series = a.sampleSeries[i]
series.Lock() series.Lock()
oooSample, _, err := series.appendable(s.T, s.V, a.headMaxt, a.minValidTime, oooTimeWindow) oooSample, _, err := series.appendable(s.T, s.V, a.headMaxt, a.minValidTime, a.oooTimeWindow)
switch err { switch err {
case storage.ErrOutOfOrderSample: case storage.ErrOutOfOrderSample:
samplesAppended-- samplesAppended--