[REFACTOR] simplify appender commit (#15112)

* [REFACTOR] simplify appender commit

Signed-off-by: Nicolas Takashi <nicolas.tcs@hotmail.com>
Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com>
Co-authored-by: George Krajcsovits <krajorama@users.noreply.github.com>
Co-authored-by: Arthur Silva Sens <arthursens2005@gmail.com>
Nicolas Takashi 2024-10-29 12:34:02 +00:00 committed by GitHub
parent 350e0d5bc1
commit b6c538972c
2 changed files with 538 additions and 401 deletions
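
The refactor extracts Commit()'s per-commit state into an appenderCommitContext and splits the large per-sample-type loops into helper methods. A condensed sketch of the resulting control flow, with metrics updates and WBL logging elided (see the full diff below for the real implementation):

    func (a *headAppender) Commit() (err error) {
        if a.closed {
            return ErrAppenderClosed
        }
        defer func() { a.closed = true }()
        if err := a.log(); err != nil {
            _ = a.Rollback() // Most likely the same error will happen again.
            return fmt.Errorf("write to WAL: %w", err)
        }
        a.commitExemplars()

        acc := &appenderCommitContext{ /* counters, min/max timestamps, WBL buffers */ }
        a.commitSamples(acc)         // in-order and out-of-order float samples
        a.commitHistograms(acc)      // integer histograms
        a.commitFloatHistograms(acc) // float histograms
        a.commitMetadata()

        // Update head metrics and min/max times from acc, then flush any
        // remaining out-of-order data and log acc.oooRecords to the WBL.
        acc.collectOOORecords(a)
        // ...
        return nil
    }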


@@ -984,23 +984,38 @@ func exemplarsForEncoding(es []exemplarWithSeriesRef) []record.RefExemplar {
return ret
}
// Commit writes to the WAL and adds the data to the Head.
// TODO(codesome): Refactor this method to reduce indentation and make it more readable.
func (a *headAppender) Commit() (err error) {
if a.closed {
return ErrAppenderClosed
}
defer func() { a.closed = true }()
if err := a.log(); err != nil {
_ = a.Rollback() // Most likely the same error will happen again.
return fmt.Errorf("write to WAL: %w", err)
}
if a.head.writeNotified != nil {
a.head.writeNotified.Notify()
}
type appenderCommitContext struct {
floatsAppended int
histogramsAppended int
// Number of samples out of order but accepted: with ooo enabled and within time window.
oooFloatsAccepted int
oooHistogramAccepted int
// Number of samples rejected due to: out of order but OOO support disabled.
floatOOORejected int
histoOOORejected int
// Number of samples rejected due to: out of order but too old (OOO support enabled, but outside time window).
floatTooOldRejected int
histoTooOldRejected int
// Number of samples rejected due to: out of bounds: with t < minValidTime (OOO support disabled).
floatOOBRejected int
histoOOBRejected int
inOrderMint int64
inOrderMaxt int64
oooMinT int64
oooMaxT int64
wblSamples []record.RefSample
wblHistograms []record.RefHistogramSample
wblFloatHistograms []record.RefFloatHistogramSample
oooMmapMarkers map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef
oooMmapMarkersCount int
oooRecords [][]byte
oooCapMax int64
appendChunkOpts chunkOpts
enc record.Encoder
}
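The timestamp fields are initialized to the opposite extremes (math.MaxInt64 / math.MinInt64, see the Commit() changes further down) so that the first accepted sample always narrows them. A standalone illustration of that pattern, not part of the diff:

    acc := &appenderCommitContext{
        inOrderMint: math.MaxInt64, // any accepted in-order sample lowers this
        inOrderMaxt: math.MinInt64, // ... and raises this
        oooMinT:     math.MaxInt64,
        oooMaxT:     math.MinInt64,
    }
    t := int64(1000)
    if t < acc.inOrderMint {
        acc.inOrderMint = t
    }
    if t > acc.inOrderMaxt {
        acc.inOrderMaxt = t
    }
    // After the first sample: inOrderMint == inOrderMaxt == 1000.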
// commitExemplars adds all exemplars from headAppender to the head's exemplar storage.
func (a *headAppender) commitExemplars() {
// No errors logging to WAL, so pass the exemplars along to the in memory storage.
for _, e := range a.exemplars {
s := a.head.series.getByID(chunks.HeadSeriesRef(e.ref))
@@ -1018,6 +1033,396 @@ func (a *headAppender) Commit() (err error) {
a.head.logger.Debug("Unknown error while adding exemplar", "err", err)
}
}
}
func (acc *appenderCommitContext) collectOOORecords(a *headAppender) {
if a.head.wbl == nil {
// WBL is not enabled. So no need to collect.
acc.wblSamples = nil
acc.wblHistograms = nil
acc.wblFloatHistograms = nil
acc.oooMmapMarkers = nil
acc.oooMmapMarkersCount = 0
return
}
// The m-map happens before adding a new sample. So we collect
// the m-map markers first, and then samples.
// WBL Graphically:
// WBL Before this Commit(): [old samples before this commit for chunk 1]
// WBL After this Commit(): [old samples before this commit for chunk 1][new samples in this commit for chunk 1]mmapmarker1[samples for chunk 2]mmapmarker2[samples for chunk 3]
if acc.oooMmapMarkers != nil {
markers := make([]record.RefMmapMarker, 0, acc.oooMmapMarkersCount)
for ref, mmapRefs := range acc.oooMmapMarkers {
for _, mmapRef := range mmapRefs {
markers = append(markers, record.RefMmapMarker{
Ref: ref,
MmapRef: mmapRef,
})
}
}
r := acc.enc.MmapMarkers(markers, a.head.getBytesBuffer())
acc.oooRecords = append(acc.oooRecords, r)
}
if len(acc.wblSamples) > 0 {
r := acc.enc.Samples(acc.wblSamples, a.head.getBytesBuffer())
acc.oooRecords = append(acc.oooRecords, r)
}
if len(acc.wblHistograms) > 0 {
r := acc.enc.HistogramSamples(acc.wblHistograms, a.head.getBytesBuffer())
acc.oooRecords = append(acc.oooRecords, r)
}
if len(acc.wblFloatHistograms) > 0 {
r := acc.enc.FloatHistogramSamples(acc.wblFloatHistograms, a.head.getBytesBuffer())
acc.oooRecords = append(acc.oooRecords, r)
}
acc.wblSamples = nil
acc.wblHistograms = nil
acc.wblFloatHistograms = nil
acc.oooMmapMarkers = nil
}
// handleAppendableError processes errors encountered during sample appending and updates
// the provided counters accordingly.
//
// Parameters:
// - err: The error encountered during appending.
// - appended: Pointer to the counter tracking the number of successfully appended samples.
// - oooRejected: Pointer to the counter tracking the number of out-of-order samples rejected.
// - oobRejected: Pointer to the counter tracking the number of out-of-bounds samples rejected.
// - tooOldRejected: Pointer to the counter tracking the number of too-old samples rejected.
func handleAppendableError(err error, appended, oooRejected, oobRejected, tooOldRejected *int) {
switch {
case errors.Is(err, storage.ErrOutOfOrderSample):
*appended--
*oooRejected++
case errors.Is(err, storage.ErrOutOfBounds):
*appended--
*oobRejected++
case errors.Is(err, storage.ErrTooOldSample):
*appended--
*tooOldRejected++
default:
*appended--
}
}
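For illustration, the helper moves one provisionally counted sample from the appended counter into the matching rejection counter; a standalone sketch using the parameter meanings from the doc comment above:

    appended, oooRejected, oobRejected, tooOldRejected := 10, 0, 0, 0
    handleAppendableError(storage.ErrOutOfOrderSample, &appended, &oooRejected, &oobRejected, &tooOldRejected)
    // appended == 9, oooRejected == 1; oobRejected and tooOldRejected are untouched.
    // Unknown errors only decrement appended (the default case above).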
// commitSamples processes and commits the samples in the headAppender to the series.
// It handles both in-order and out-of-order samples, updating the appenderCommitContext
// with the results of the append operations.
//
// The function iterates over the samples in the headAppender and attempts to append each sample
// to its corresponding series. It handles various error cases such as out-of-order samples,
// out-of-bounds samples, and too-old samples, updating the appenderCommitContext accordingly.
//
// For out-of-order samples, it checks if the sample can be inserted into the series and updates
// the out-of-order mmap markers if necessary. It also buffers the samples for the out-of-order write-behind log (WBL)
// and updates the minimum and maximum timestamps for out-of-order samples.
//
// For in-order samples, it attempts to append the sample to the series and updates the minimum
// and maximum timestamps for in-order samples.
//
// The function also increments the chunk metrics if a new chunk is created and performs cleanup
// operations on the series after appending the samples.
//
// There are also specific functions to commit histograms and float histograms.
func (a *headAppender) commitSamples(acc *appenderCommitContext) {
var ok, chunkCreated bool
var series *memSeries
for i, s := range a.samples {
series = a.sampleSeries[i]
series.Lock()
oooSample, _, err := series.appendable(s.T, s.V, a.headMaxt, a.minValidTime, a.oooTimeWindow)
if err != nil {
handleAppendableError(err, &acc.floatsAppended, &acc.floatOOORejected, &acc.floatOOBRejected, &acc.floatTooOldRejected)
}
switch {
case err != nil:
// Do nothing here.
case oooSample:
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
var mmapRefs []chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, nil, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger)
if chunkCreated {
r, ok := acc.oooMmapMarkers[series.ref]
if !ok || r != nil {
// !ok means there are no markers collected for these samples yet. So we first flush the samples
// before setting this m-map marker.
// r != nil means we have already m-mapped a chunk for this series in the same Commit().
// Hence, before we m-map again, we should add the samples and m-map markers
// seen till now to the WBL records.
acc.collectOOORecords(a)
}
if acc.oooMmapMarkers == nil {
acc.oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef)
}
if len(mmapRefs) > 0 {
acc.oooMmapMarkers[series.ref] = mmapRefs
acc.oooMmapMarkersCount += len(mmapRefs)
} else {
// No chunk was written to disk, so we need to set an initial marker for this series.
acc.oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0}
acc.oooMmapMarkersCount++
}
}
if ok {
acc.wblSamples = append(acc.wblSamples, s)
if s.T < acc.oooMinT {
acc.oooMinT = s.T
}
if s.T > acc.oooMaxT {
acc.oooMaxT = s.T
}
acc.oooFloatsAccepted++
} else {
// Sample is an exact duplicate of the last sample.
// NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk,
// not with samples in already flushed OOO chunks.
// TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305.
acc.floatsAppended--
}
default:
ok, chunkCreated = series.append(s.T, s.V, a.appendID, acc.appendChunkOpts)
if ok {
if s.T < acc.inOrderMint {
acc.inOrderMint = s.T
}
if s.T > acc.inOrderMaxt {
acc.inOrderMaxt = s.T
}
} else {
// The sample is an exact duplicate, and should be silently dropped.
acc.floatsAppended--
}
}
if chunkCreated {
a.head.metrics.chunks.Inc()
a.head.metrics.chunksCreated.Inc()
}
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
}
}
// For details on the commitHistograms function, see the commitSamples docs.
func (a *headAppender) commitHistograms(acc *appenderCommitContext) {
var ok, chunkCreated bool
var series *memSeries
for i, s := range a.histograms {
series = a.histogramSeries[i]
series.Lock()
oooSample, _, err := series.appendableHistogram(s.T, s.H, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load())
if err != nil {
handleAppendableError(err, &acc.histogramsAppended, &acc.histoOOORejected, &acc.histoOOBRejected, &acc.histoTooOldRejected)
}
switch {
case err != nil:
// Do nothing here.
case oooSample:
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
var mmapRefs []chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRefs = series.insert(s.T, 0, s.H, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger)
if chunkCreated {
r, ok := acc.oooMmapMarkers[series.ref]
if !ok || r != nil {
// !ok means there are no markers collected for these samples yet. So we first flush the samples
// before setting this m-map marker.
// r != nil means we have already m-mapped a chunk for this series in the same Commit().
// Hence, before we m-map again, we should add the samples and m-map markers
// seen till now to the WBL records.
acc.collectOOORecords(a)
}
if acc.oooMmapMarkers == nil {
acc.oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef)
}
if len(mmapRefs) > 0 {
acc.oooMmapMarkers[series.ref] = mmapRefs
acc.oooMmapMarkersCount += len(mmapRefs)
} else {
// No chunk was written to disk, so we need to set an initial marker for this series.
acc.oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0}
acc.oooMmapMarkersCount++
}
}
if ok {
acc.wblHistograms = append(acc.wblHistograms, s)
if s.T < acc.oooMinT {
acc.oooMinT = s.T
}
if s.T > acc.oooMaxT {
acc.oooMaxT = s.T
}
acc.oooHistogramAccepted++
} else {
// Sample is an exact duplicate of the last sample.
// NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk,
// not with samples in already flushed OOO chunks.
// TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305.
acc.histogramsAppended--
}
default:
ok, chunkCreated = series.appendHistogram(s.T, s.H, a.appendID, acc.appendChunkOpts)
if ok {
if s.T < acc.inOrderMint {
acc.inOrderMint = s.T
}
if s.T > acc.inOrderMaxt {
acc.inOrderMaxt = s.T
}
} else {
acc.histogramsAppended--
acc.histoOOORejected++
}
}
if chunkCreated {
a.head.metrics.chunks.Inc()
a.head.metrics.chunksCreated.Inc()
}
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
}
}
// For details on the commitFloatHistograms function, see the commitSamples docs.
func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) {
var ok, chunkCreated bool
var series *memSeries
for i, s := range a.floatHistograms {
series = a.floatHistogramSeries[i]
series.Lock()
oooSample, _, err := series.appendableFloatHistogram(s.T, s.FH, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load())
if err != nil {
handleAppendableError(err, &acc.histogramsAppended, &acc.histoOOORejected, &acc.histoOOBRejected, &acc.histoTooOldRejected)
}
switch {
case err != nil:
// Do nothing here.
case oooSample:
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
var mmapRefs []chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRefs = series.insert(s.T, 0, nil, s.FH, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger)
if chunkCreated {
r, ok := acc.oooMmapMarkers[series.ref]
if !ok || r != nil {
// !ok means there are no markers collected for these samples yet. So we first flush the samples
// before setting this m-map marker.
// r != nil means we have already m-mapped a chunk for this series in the same Commit().
// Hence, before we m-map again, we should add the samples and m-map markers
// seen till now to the WBL records.
acc.collectOOORecords(a)
}
if acc.oooMmapMarkers == nil {
acc.oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef)
}
if len(mmapRefs) > 0 {
acc.oooMmapMarkers[series.ref] = mmapRefs
acc.oooMmapMarkersCount += len(mmapRefs)
} else {
// No chunk was written to disk, so we need to set an initial marker for this series.
acc.oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0}
acc.oooMmapMarkersCount++
}
}
if ok {
acc.wblFloatHistograms = append(acc.wblFloatHistograms, s)
if s.T < acc.oooMinT {
acc.oooMinT = s.T
}
if s.T > acc.oooMaxT {
acc.oooMaxT = s.T
}
acc.oooHistogramAccepted++
} else {
// Sample is an exact duplicate of the last sample.
// NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk,
// not with samples in already flushed OOO chunks.
// TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305.
acc.histogramsAppended--
}
default:
ok, chunkCreated = series.appendFloatHistogram(s.T, s.FH, a.appendID, acc.appendChunkOpts)
if ok {
if s.T < acc.inOrderMint {
acc.inOrderMint = s.T
}
if s.T > acc.inOrderMaxt {
acc.inOrderMaxt = s.T
}
} else {
acc.histogramsAppended--
acc.histoOOORejected++
}
}
if chunkCreated {
a.head.metrics.chunks.Inc()
a.head.metrics.chunksCreated.Inc()
}
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
}
}
// commitMetadata commits the metadata for each series in the headAppender.
// It iterates over the metadata slice and updates the corresponding series
// with the new metadata information. The series is locked during the update
// to ensure thread safety.
func (a *headAppender) commitMetadata() {
var series *memSeries
for i, m := range a.metadata {
series = a.metadataSeries[i]
series.Lock()
series.meta = &metadata.Metadata{Type: record.ToMetricType(m.Type), Unit: m.Unit, Help: m.Help}
series.Unlock()
}
}
// Commit writes to the WAL and adds the data to the Head.
// TODO(codesome): Refactor this method to reduce indentation and make it more readable.
func (a *headAppender) Commit() (err error) {
if a.closed {
return ErrAppenderClosed
}
defer func() { a.closed = true }()
if err := a.log(); err != nil {
_ = a.Rollback() // Most likely the same error will happen again.
return fmt.Errorf("write to WAL: %w", err)
}
if a.head.writeNotified != nil {
a.head.writeNotified.Notify()
}
a.commitExemplars()
defer a.head.metrics.activeAppenders.Dec()
defer a.head.putAppendBuffer(a.samples)
@@ -1028,401 +1433,46 @@ func (a *headAppender) Commit() (err error) {
defer a.head.putMetadataBuffer(a.metadata)
defer a.head.iso.closeAppend(a.appendID)
var (
floatsAppended = len(a.samples)
histogramsAppended = len(a.histograms) + len(a.floatHistograms)
// number of samples out of order but accepted: with ooo enabled and within time window
oooFloatsAccepted int
oooHistogramAccepted int
// number of samples rejected due to: out of order but OOO support disabled.
floatOOORejected int
histoOOORejected int
// number of samples rejected due to: that are out of order but too old (OOO support enabled, but outside time window)
floatTooOldRejected int
histoTooOldRejected int
// number of samples rejected due to: out of bounds: with t < minValidTime (OOO support disabled)
floatOOBRejected int
histoOOBRejected int
inOrderMint int64 = math.MaxInt64
inOrderMaxt int64 = math.MinInt64
oooMinT int64 = math.MaxInt64
oooMaxT int64 = math.MinInt64
wblSamples []record.RefSample
wblHistograms []record.RefHistogramSample
wblFloatHistograms []record.RefFloatHistogramSample
oooMmapMarkers map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef
oooMmapMarkersCount int
oooRecords [][]byte
oooCapMax = a.head.opts.OutOfOrderCapMax.Load()
series *memSeries
appendChunkOpts = chunkOpts{
acc := &appenderCommitContext{
floatsAppended: len(a.samples),
histogramsAppended: len(a.histograms) + len(a.floatHistograms),
inOrderMint: math.MaxInt64,
inOrderMaxt: math.MinInt64,
oooMinT: math.MaxInt64,
oooMaxT: math.MinInt64,
oooCapMax: a.head.opts.OutOfOrderCapMax.Load(),
appendChunkOpts: chunkOpts{
chunkDiskMapper: a.head.chunkDiskMapper,
chunkRange: a.head.chunkRange.Load(),
samplesPerChunk: a.head.opts.SamplesPerChunk,
}
enc record.Encoder
)
},
}
defer func() {
for i := range oooRecords {
a.head.putBytesBuffer(oooRecords[i][:0])
for i := range acc.oooRecords {
a.head.putBytesBuffer(acc.oooRecords[i][:0])
}
}()
collectOOORecords := func() {
if a.head.wbl == nil {
// WBL is not enabled. So no need to collect.
wblSamples = nil
wblHistograms = nil
wblFloatHistograms = nil
oooMmapMarkers = nil
oooMmapMarkersCount = 0
return
}
// The m-map happens before adding a new sample. So we collect
// the m-map markers first, and then samples.
// WBL Graphically:
// WBL Before this Commit(): [old samples before this commit for chunk 1]
// WBL After this Commit(): [old samples before this commit for chunk 1][new samples in this commit for chunk 1]mmapmarker1[samples for chunk 2]mmapmarker2[samples for chunk 3]
if oooMmapMarkers != nil {
markers := make([]record.RefMmapMarker, 0, oooMmapMarkersCount)
for ref, mmapRefs := range oooMmapMarkers {
for _, mmapRef := range mmapRefs {
markers = append(markers, record.RefMmapMarker{
Ref: ref,
MmapRef: mmapRef,
})
}
}
r := enc.MmapMarkers(markers, a.head.getBytesBuffer())
oooRecords = append(oooRecords, r)
}
if len(wblSamples) > 0 {
r := enc.Samples(wblSamples, a.head.getBytesBuffer())
oooRecords = append(oooRecords, r)
}
if len(wblHistograms) > 0 {
r := enc.HistogramSamples(wblHistograms, a.head.getBytesBuffer())
oooRecords = append(oooRecords, r)
}
if len(wblFloatHistograms) > 0 {
r := enc.FloatHistogramSamples(wblFloatHistograms, a.head.getBytesBuffer())
oooRecords = append(oooRecords, r)
}
a.commitSamples(acc)
a.commitHistograms(acc)
a.commitFloatHistograms(acc)
a.commitMetadata()
wblSamples = nil
wblHistograms = nil
wblFloatHistograms = nil
oooMmapMarkers = nil
}
for i, s := range a.samples {
series = a.sampleSeries[i]
series.Lock()
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOORejected))
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histoOOORejected))
a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOBRejected))
a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatTooOldRejected))
a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatsAppended))
a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histogramsAppended))
a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.oooFloatsAccepted))
a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.oooHistogramAccepted))
a.head.updateMinMaxTime(acc.inOrderMint, acc.inOrderMaxt)
a.head.updateMinOOOMaxOOOTime(acc.oooMinT, acc.oooMaxT)
oooSample, _, err := series.appendable(s.T, s.V, a.headMaxt, a.minValidTime, a.oooTimeWindow)
switch {
case err == nil:
// Do nothing.
case errors.Is(err, storage.ErrOutOfOrderSample):
floatsAppended--
floatOOORejected++
case errors.Is(err, storage.ErrOutOfBounds):
floatsAppended--
floatOOBRejected++
case errors.Is(err, storage.ErrTooOldSample):
floatsAppended--
floatTooOldRejected++
default:
floatsAppended--
}
var ok, chunkCreated bool
switch {
case err != nil:
// Do nothing here.
case oooSample:
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
var mmapRefs []chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, nil, nil, a.head.chunkDiskMapper, oooCapMax, a.head.logger)
if chunkCreated {
r, ok := oooMmapMarkers[series.ref]
if !ok || r != nil {
// !ok means there are no markers collected for these samples yet. So we first flush the samples
// before setting this m-map marker.
// r != nil means we have already m-mapped a chunk for this series in the same Commit().
// Hence, before we m-map again, we should add the samples and m-map markers
// seen till now to the WBL records.
collectOOORecords()
}
if oooMmapMarkers == nil {
oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef)
}
if len(mmapRefs) > 0 {
oooMmapMarkers[series.ref] = mmapRefs
oooMmapMarkersCount += len(mmapRefs)
} else {
// No chunk was written to disk, so we need to set an initial marker for this series.
oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0}
oooMmapMarkersCount++
}
}
if ok {
wblSamples = append(wblSamples, s)
if s.T < oooMinT {
oooMinT = s.T
}
if s.T > oooMaxT {
oooMaxT = s.T
}
oooFloatsAccepted++
} else {
// Sample is an exact duplicate of the last sample.
// NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk,
// not with samples in already flushed OOO chunks.
// TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305.
floatsAppended--
}
default:
ok, chunkCreated = series.append(s.T, s.V, a.appendID, appendChunkOpts)
if ok {
if s.T < inOrderMint {
inOrderMint = s.T
}
if s.T > inOrderMaxt {
inOrderMaxt = s.T
}
} else {
// The sample is an exact duplicate, and should be silently dropped.
floatsAppended--
}
}
if chunkCreated {
a.head.metrics.chunks.Inc()
a.head.metrics.chunksCreated.Inc()
}
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
}
for i, s := range a.histograms {
series = a.histogramSeries[i]
series.Lock()
oooSample, _, err := series.appendableHistogram(s.T, s.H, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load())
switch {
case err == nil:
// Do nothing.
case errors.Is(err, storage.ErrOutOfOrderSample):
histogramsAppended--
histoOOORejected++
case errors.Is(err, storage.ErrOutOfBounds):
histogramsAppended--
histoOOBRejected++
case errors.Is(err, storage.ErrTooOldSample):
histogramsAppended--
histoTooOldRejected++
default:
histogramsAppended--
}
var ok, chunkCreated bool
switch {
case err != nil:
// Do nothing here.
case oooSample:
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
var mmapRefs []chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRefs = series.insert(s.T, 0, s.H, nil, a.head.chunkDiskMapper, oooCapMax, a.head.logger)
if chunkCreated {
r, ok := oooMmapMarkers[series.ref]
if !ok || r != nil {
// !ok means there are no markers collected for these samples yet. So we first flush the samples
// before setting this m-map marker.
// r != 0 means we have already m-mapped a chunk for this series in the same Commit().
// Hence, before we m-map again, we should add the samples and m-map markers
// seen till now to the WBL records.
collectOOORecords()
}
if oooMmapMarkers == nil {
oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef)
}
if len(mmapRefs) > 0 {
oooMmapMarkers[series.ref] = mmapRefs
oooMmapMarkersCount += len(mmapRefs)
} else {
// No chunk was written to disk, so we need to set an initial marker for this series.
oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0}
oooMmapMarkersCount++
}
}
if ok {
wblHistograms = append(wblHistograms, s)
if s.T < oooMinT {
oooMinT = s.T
}
if s.T > oooMaxT {
oooMaxT = s.T
}
oooHistogramAccepted++
} else {
// Sample is an exact duplicate of the last sample.
// NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk,
// not with samples in already flushed OOO chunks.
// TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305.
histogramsAppended--
}
default:
ok, chunkCreated = series.appendHistogram(s.T, s.H, a.appendID, appendChunkOpts)
if ok {
if s.T < inOrderMint {
inOrderMint = s.T
}
if s.T > inOrderMaxt {
inOrderMaxt = s.T
}
} else {
histogramsAppended--
histoOOORejected++
}
}
if chunkCreated {
a.head.metrics.chunks.Inc()
a.head.metrics.chunksCreated.Inc()
}
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
}
for i, s := range a.floatHistograms {
series = a.floatHistogramSeries[i]
series.Lock()
oooSample, _, err := series.appendableFloatHistogram(s.T, s.FH, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load())
switch {
case err == nil:
// Do nothing.
case errors.Is(err, storage.ErrOutOfOrderSample):
histogramsAppended--
histoOOORejected++
case errors.Is(err, storage.ErrOutOfBounds):
histogramsAppended--
histoOOBRejected++
case errors.Is(err, storage.ErrTooOldSample):
histogramsAppended--
histoTooOldRejected++
default:
histogramsAppended--
}
var ok, chunkCreated bool
switch {
case err != nil:
// Do nothing here.
case oooSample:
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
var mmapRefs []chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRefs = series.insert(s.T, 0, nil, s.FH, a.head.chunkDiskMapper, oooCapMax, a.head.logger)
if chunkCreated {
r, ok := oooMmapMarkers[series.ref]
if !ok || r != nil {
// !ok means there are no markers collected for these samples yet. So we first flush the samples
// before setting this m-map marker.
// r != 0 means we have already m-mapped a chunk for this series in the same Commit().
// Hence, before we m-map again, we should add the samples and m-map markers
// seen till now to the WBL records.
collectOOORecords()
}
if oooMmapMarkers == nil {
oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef)
}
if len(mmapRefs) > 0 {
oooMmapMarkers[series.ref] = mmapRefs
oooMmapMarkersCount += len(mmapRefs)
} else {
// No chunk was written to disk, so we need to set an initial marker for this series.
oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0}
oooMmapMarkersCount++
}
}
if ok {
wblFloatHistograms = append(wblFloatHistograms, s)
if s.T < oooMinT {
oooMinT = s.T
}
if s.T > oooMaxT {
oooMaxT = s.T
}
oooHistogramAccepted++
} else {
// Sample is an exact duplicate of the last sample.
// NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk,
// not with samples in already flushed OOO chunks.
// TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305.
histogramsAppended--
}
default:
ok, chunkCreated = series.appendFloatHistogram(s.T, s.FH, a.appendID, appendChunkOpts)
if ok {
if s.T < inOrderMint {
inOrderMint = s.T
}
if s.T > inOrderMaxt {
inOrderMaxt = s.T
}
} else {
histogramsAppended--
histoOOORejected++
}
}
if chunkCreated {
a.head.metrics.chunks.Inc()
a.head.metrics.chunksCreated.Inc()
}
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
}
for i, m := range a.metadata {
series = a.metadataSeries[i]
series.Lock()
series.meta = &metadata.Metadata{Type: record.ToMetricType(m.Type), Unit: m.Unit, Help: m.Help}
series.Unlock()
}
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatOOORejected))
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Add(float64(histoOOORejected))
a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatOOBRejected))
a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatTooOldRejected))
a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatsAppended))
a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(histogramsAppended))
a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(oooFloatsAccepted))
a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(oooHistogramAccepted))
a.head.updateMinMaxTime(inOrderMint, inOrderMaxt)
a.head.updateMinOOOMaxOOOTime(oooMinT, oooMaxT)
collectOOORecords()
acc.collectOOORecords(a)
if a.head.wbl != nil {
if err := a.head.wbl.Log(oooRecords...); err != nil {
if err := a.head.wbl.Log(acc.oooRecords...); err != nil {
// TODO(codesome): Currently WBL logging of ooo samples is best effort here since we cannot try logging
// until we have found what samples become OOO. We can try having a metric for this failure.
// Returning the error here is not correct because we have already put the samples into the memory,


@@ -14,15 +14,22 @@
package tsdb
import (
"context"
"errors"
"fmt"
"math/rand"
"strconv"
"testing"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/wlog"
)
func BenchmarkHeadStripeSeriesCreate(b *testing.B) {
@@ -79,6 +86,86 @@ func BenchmarkHeadStripeSeriesCreate_PreCreationFailure(b *testing.B) {
}
}
func BenchmarkHead_WalCommit(b *testing.B) {
seriesCounts := []int{100, 1000, 10000}
series := genSeries(10000, 10, 0, 0) // Only using the generated labels.
appendSamples := func(b *testing.B, app storage.Appender, seriesCount int, ts int64) {
var err error
for i, s := range series[:seriesCount] {
var ref storage.SeriesRef
// If i is even, append a float sample; otherwise append a histogram.
if i%2 == 0 {
ref, err = app.Append(ref, s.Labels(), ts, float64(ts))
} else {
h := &histogram.Histogram{
Count: 7 + uint64(ts*5),
ZeroCount: 2 + uint64(ts),
ZeroThreshold: 0.001,
Sum: 18.4 * rand.Float64(),
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{ts + 1, 1, -1, 0},
}
ref, err = app.AppendHistogram(ref, s.Labels(), ts, h, nil)
}
require.NoError(b, err)
_, err = app.AppendExemplar(ref, s.Labels(), exemplar.Exemplar{
Labels: labels.FromStrings("trace_id", strconv.Itoa(rand.Int())),
Value: rand.Float64(),
Ts: ts,
})
require.NoError(b, err)
}
}
for _, seriesCount := range seriesCounts {
b.Run(fmt.Sprintf("%d series", seriesCount), func(b *testing.B) {
for _, commits := range []int64{1, 2} { // Test both the commit that creates new series and a second commit where the series already exist.
b.Run(fmt.Sprintf("%d commits", commits), func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
h, w := newTestHead(b, 10000, wlog.CompressionNone, false)
b.Cleanup(func() {
if h != nil {
h.Close()
}
if w != nil {
w.Close()
}
})
app := h.Appender(context.Background())
appendSamples(b, app, seriesCount, 0)
b.StartTimer()
require.NoError(b, app.Commit())
if commits == 2 {
b.StopTimer()
app = h.Appender(context.Background())
appendSamples(b, app, seriesCount, 1)
b.StartTimer()
require.NoError(b, app.Commit())
}
b.StopTimer()
h.Close()
h = nil
w.Close()
w = nil
}
})
}
})
}
}
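To compare the old and new Commit() paths, the benchmark can be run on both revisions with the standard Go test tooling, for example (the ./tsdb package path is assumed from the package tsdb declaration above):

    go test -run '^$' -bench 'BenchmarkHead_WalCommit' -benchmem ./tsdb/

The -run '^$' filter skips regular tests so only the benchmark executes; b.ReportAllocs() already enables allocation reporting for this benchmark, and -benchmem extends it to any others.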
type failingSeriesLifecycleCallback struct{}
func (failingSeriesLifecycleCallback) PreCreation(labels.Labels) error { return errors.New("failed") }