diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 9c732990b..603b96cfc 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -984,23 +984,38 @@ func exemplarsForEncoding(es []exemplarWithSeriesRef) []record.RefExemplar { return ret } -// Commit writes to the WAL and adds the data to the Head. -// TODO(codesome): Refactor this method to reduce indentation and make it more readable. -func (a *headAppender) Commit() (err error) { - if a.closed { - return ErrAppenderClosed - } - defer func() { a.closed = true }() - - if err := a.log(); err != nil { - _ = a.Rollback() // Most likely the same error will happen again. - return fmt.Errorf("write to WAL: %w", err) - } - - if a.head.writeNotified != nil { - a.head.writeNotified.Notify() - } +type appenderCommitContext struct { + floatsAppended int + histogramsAppended int + // Number of samples out of order but accepted: with ooo enabled and within time window. + oooFloatsAccepted int + oooHistogramAccepted int + // Number of samples rejected due to: out of order but OOO support disabled. + floatOOORejected int + histoOOORejected int + // Number of samples rejected due to: out of order but too old (OOO support enabled, but outside time window). + floatTooOldRejected int + histoTooOldRejected int + // Number of samples rejected due to: out of bounds: with t < minValidTime (OOO support disabled). + floatOOBRejected int + histoOOBRejected int + inOrderMint int64 + inOrderMaxt int64 + oooMinT int64 + oooMaxT int64 + wblSamples []record.RefSample + wblHistograms []record.RefHistogramSample + wblFloatHistograms []record.RefFloatHistogramSample + oooMmapMarkers map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef + oooMmapMarkersCount int + oooRecords [][]byte + oooCapMax int64 + appendChunkOpts chunkOpts + enc record.Encoder +} +// commitExemplars adds all exemplars from headAppender to the head's exemplar storage. +func (a *headAppender) commitExemplars() { // No errors logging to WAL, so pass the exemplars along to the in memory storage. for _, e := range a.exemplars { s := a.head.series.getByID(chunks.HeadSeriesRef(e.ref)) @@ -1018,6 +1033,396 @@ func (a *headAppender) Commit() (err error) { a.head.logger.Debug("Unknown error while adding exemplar", "err", err) } } +} + +func (acc *appenderCommitContext) collectOOORecords(a *headAppender) { + if a.head.wbl == nil { + // WBL is not enabled. So no need to collect. + acc.wblSamples = nil + acc.wblHistograms = nil + acc.wblFloatHistograms = nil + acc.oooMmapMarkers = nil + acc.oooMmapMarkersCount = 0 + return + } + + // The m-map happens before adding a new sample. So we collect + // the m-map markers first, and then samples. + // WBL Graphically: + // WBL Before this Commit(): [old samples before this commit for chunk 1] + // WBL After this Commit(): [old samples before this commit for chunk 1][new samples in this commit for chunk 1]mmapmarker1[samples for chunk 2]mmapmarker2[samples for chunk 3] + if acc.oooMmapMarkers != nil { + markers := make([]record.RefMmapMarker, 0, acc.oooMmapMarkersCount) + for ref, mmapRefs := range acc.oooMmapMarkers { + for _, mmapRef := range mmapRefs { + markers = append(markers, record.RefMmapMarker{ + Ref: ref, + MmapRef: mmapRef, + }) + } + } + r := acc.enc.MmapMarkers(markers, a.head.getBytesBuffer()) + acc.oooRecords = append(acc.oooRecords, r) + } + + if len(acc.wblSamples) > 0 { + r := acc.enc.Samples(acc.wblSamples, a.head.getBytesBuffer()) + acc.oooRecords = append(acc.oooRecords, r) + } + if len(acc.wblHistograms) > 0 { + r := acc.enc.HistogramSamples(acc.wblHistograms, a.head.getBytesBuffer()) + acc.oooRecords = append(acc.oooRecords, r) + } + if len(acc.wblFloatHistograms) > 0 { + r := acc.enc.FloatHistogramSamples(acc.wblFloatHistograms, a.head.getBytesBuffer()) + acc.oooRecords = append(acc.oooRecords, r) + } + + acc.wblSamples = nil + acc.wblHistograms = nil + acc.wblFloatHistograms = nil + acc.oooMmapMarkers = nil +} + +// handleAppendableError processes errors encountered during sample appending and updates +// the provided counters accordingly. +// +// Parameters: +// - err: The error encountered during appending. +// - appended: Pointer to the counter tracking the number of successfully appended samples. +// - oooRejected: Pointer to the counter tracking the number of out-of-order samples rejected. +// - oobRejected: Pointer to the counter tracking the number of out-of-bounds samples rejected. +// - tooOldRejected: Pointer to the counter tracking the number of too-old samples rejected. +func handleAppendableError(err error, appended, oooRejected, oobRejected, tooOldRejected *int) { + switch { + case errors.Is(err, storage.ErrOutOfOrderSample): + *appended-- + *oooRejected++ + case errors.Is(err, storage.ErrOutOfBounds): + *appended-- + *oobRejected++ + case errors.Is(err, storage.ErrTooOldSample): + *appended-- + *tooOldRejected++ + default: + *appended-- + } +} + +// commitSamples processes and commits the samples in the headAppender to the series. +// It handles both in-order and out-of-order samples, updating the appenderCommitContext +// with the results of the append operations. +// +// The function iterates over the samples in the headAppender and attempts to append each sample +// to its corresponding series. It handles various error cases such as out-of-order samples, +// out-of-bounds samples, and too-old samples, updating the appenderCommitContext accordingly. +// +// For out-of-order samples, it checks if the sample can be inserted into the series and updates +// the out-of-order mmap markers if necessary. It also updates the write-ahead log (WBL) samples +// and the minimum and maximum timestamps for out-of-order samples. +// +// For in-order samples, it attempts to append the sample to the series and updates the minimum +// and maximum timestamps for in-order samples. +// +// The function also increments the chunk metrics if a new chunk is created and performs cleanup +// operations on the series after appending the samples. +// +// There are also specific functions to commit histograms and float histograms. +func (a *headAppender) commitSamples(acc *appenderCommitContext) { + var ok, chunkCreated bool + var series *memSeries + + for i, s := range a.samples { + series = a.sampleSeries[i] + series.Lock() + + oooSample, _, err := series.appendable(s.T, s.V, a.headMaxt, a.minValidTime, a.oooTimeWindow) + if err != nil { + handleAppendableError(err, &acc.floatsAppended, &acc.floatOOORejected, &acc.floatOOBRejected, &acc.floatTooOldRejected) + } + + switch { + case err != nil: + // Do nothing here. + case oooSample: + // Sample is OOO and OOO handling is enabled + // and the delta is within the OOO tolerance. + var mmapRefs []chunks.ChunkDiskMapperRef + ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, nil, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) + if chunkCreated { + r, ok := acc.oooMmapMarkers[series.ref] + if !ok || r != nil { + // !ok means there are no markers collected for these samples yet. So we first flush the samples + // before setting this m-map marker. + + // r != nil means we have already m-mapped a chunk for this series in the same Commit(). + // Hence, before we m-map again, we should add the samples and m-map markers + // seen till now to the WBL records. + acc.collectOOORecords(a) + } + + if acc.oooMmapMarkers == nil { + acc.oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef) + } + if len(mmapRefs) > 0 { + acc.oooMmapMarkers[series.ref] = mmapRefs + acc.oooMmapMarkersCount += len(mmapRefs) + } else { + // No chunk was written to disk, so we need to set an initial marker for this series. + acc.oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0} + acc.oooMmapMarkersCount++ + } + } + if ok { + acc.wblSamples = append(acc.wblSamples, s) + if s.T < acc.oooMinT { + acc.oooMinT = s.T + } + if s.T > acc.oooMaxT { + acc.oooMaxT = s.T + } + acc.oooFloatsAccepted++ + } else { + // Sample is an exact duplicate of the last sample. + // NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk, + // not with samples in already flushed OOO chunks. + // TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305. + acc.floatsAppended-- + } + default: + ok, chunkCreated = series.append(s.T, s.V, a.appendID, acc.appendChunkOpts) + if ok { + if s.T < acc.inOrderMint { + acc.inOrderMint = s.T + } + if s.T > acc.inOrderMaxt { + acc.inOrderMaxt = s.T + } + } else { + // The sample is an exact duplicate, and should be silently dropped. + acc.floatsAppended-- + } + } + + if chunkCreated { + a.head.metrics.chunks.Inc() + a.head.metrics.chunksCreated.Inc() + } + + series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) + series.pendingCommit = false + series.Unlock() + } +} + +// For details on the commitHistograms function, see the commitSamples docs. +func (a *headAppender) commitHistograms(acc *appenderCommitContext) { + var ok, chunkCreated bool + var series *memSeries + + for i, s := range a.histograms { + series = a.histogramSeries[i] + series.Lock() + + oooSample, _, err := series.appendableHistogram(s.T, s.H, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load()) + if err != nil { + handleAppendableError(err, &acc.histogramsAppended, &acc.histoOOORejected, &acc.histoOOBRejected, &acc.histoTooOldRejected) + } + + switch { + case err != nil: + // Do nothing here. + case oooSample: + // Sample is OOO and OOO handling is enabled + // and the delta is within the OOO tolerance. + var mmapRefs []chunks.ChunkDiskMapperRef + ok, chunkCreated, mmapRefs = series.insert(s.T, 0, s.H, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) + if chunkCreated { + r, ok := acc.oooMmapMarkers[series.ref] + if !ok || r != nil { + // !ok means there are no markers collected for these samples yet. So we first flush the samples + // before setting this m-map marker. + + // r != 0 means we have already m-mapped a chunk for this series in the same Commit(). + // Hence, before we m-map again, we should add the samples and m-map markers + // seen till now to the WBL records. + acc.collectOOORecords(a) + } + + if acc.oooMmapMarkers == nil { + acc.oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef) + } + if len(mmapRefs) > 0 { + acc.oooMmapMarkers[series.ref] = mmapRefs + acc.oooMmapMarkersCount += len(mmapRefs) + } else { + // No chunk was written to disk, so we need to set an initial marker for this series. + acc.oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0} + acc.oooMmapMarkersCount++ + } + } + if ok { + acc.wblHistograms = append(acc.wblHistograms, s) + if s.T < acc.oooMinT { + acc.oooMinT = s.T + } + if s.T > acc.oooMaxT { + acc.oooMaxT = s.T + } + acc.oooHistogramAccepted++ + } else { + // Sample is an exact duplicate of the last sample. + // NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk, + // not with samples in already flushed OOO chunks. + // TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305. + acc.histogramsAppended-- + } + default: + ok, chunkCreated = series.appendHistogram(s.T, s.H, a.appendID, acc.appendChunkOpts) + if ok { + if s.T < acc.inOrderMint { + acc.inOrderMint = s.T + } + if s.T > acc.inOrderMaxt { + acc.inOrderMaxt = s.T + } + } else { + acc.histogramsAppended-- + acc.histoOOORejected++ + } + } + + if chunkCreated { + a.head.metrics.chunks.Inc() + a.head.metrics.chunksCreated.Inc() + } + + series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) + series.pendingCommit = false + series.Unlock() + } +} + +// For details on the commitFloatHistograms function, see the commitSamples docs. +func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) { + var ok, chunkCreated bool + var series *memSeries + + for i, s := range a.floatHistograms { + series = a.floatHistogramSeries[i] + series.Lock() + + oooSample, _, err := series.appendableFloatHistogram(s.T, s.FH, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load()) + if err != nil { + handleAppendableError(err, &acc.histogramsAppended, &acc.histoOOORejected, &acc.histoOOBRejected, &acc.histoTooOldRejected) + } + + switch { + case err != nil: + // Do nothing here. + case oooSample: + // Sample is OOO and OOO handling is enabled + // and the delta is within the OOO tolerance. + var mmapRefs []chunks.ChunkDiskMapperRef + ok, chunkCreated, mmapRefs = series.insert(s.T, 0, nil, s.FH, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) + if chunkCreated { + r, ok := acc.oooMmapMarkers[series.ref] + if !ok || r != nil { + // !ok means there are no markers collected for these samples yet. So we first flush the samples + // before setting this m-map marker. + + // r != 0 means we have already m-mapped a chunk for this series in the same Commit(). + // Hence, before we m-map again, we should add the samples and m-map markers + // seen till now to the WBL records. + acc.collectOOORecords(a) + } + + if acc.oooMmapMarkers == nil { + acc.oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef) + } + if len(mmapRefs) > 0 { + acc.oooMmapMarkers[series.ref] = mmapRefs + acc.oooMmapMarkersCount += len(mmapRefs) + } else { + // No chunk was written to disk, so we need to set an initial marker for this series. + acc.oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0} + acc.oooMmapMarkersCount++ + } + } + if ok { + acc.wblFloatHistograms = append(acc.wblFloatHistograms, s) + if s.T < acc.oooMinT { + acc.oooMinT = s.T + } + if s.T > acc.oooMaxT { + acc.oooMaxT = s.T + } + acc.oooHistogramAccepted++ + } else { + // Sample is an exact duplicate of the last sample. + // NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk, + // not with samples in already flushed OOO chunks. + // TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305. + acc.histogramsAppended-- + } + default: + ok, chunkCreated = series.appendFloatHistogram(s.T, s.FH, a.appendID, acc.appendChunkOpts) + if ok { + if s.T < acc.inOrderMint { + acc.inOrderMint = s.T + } + if s.T > acc.inOrderMaxt { + acc.inOrderMaxt = s.T + } + } else { + acc.histogramsAppended-- + acc.histoOOORejected++ + } + } + + if chunkCreated { + a.head.metrics.chunks.Inc() + a.head.metrics.chunksCreated.Inc() + } + + series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) + series.pendingCommit = false + series.Unlock() + } +} + +// commitMetadata commits the metadata for each series in the headAppender. +// It iterates over the metadata slice and updates the corresponding series +// with the new metadata information. The series is locked during the update +// to ensure thread safety. +func (a *headAppender) commitMetadata() { + var series *memSeries + for i, m := range a.metadata { + series = a.metadataSeries[i] + series.Lock() + series.meta = &metadata.Metadata{Type: record.ToMetricType(m.Type), Unit: m.Unit, Help: m.Help} + series.Unlock() + } +} + +// Commit writes to the WAL and adds the data to the Head. +// TODO(codesome): Refactor this method to reduce indentation and make it more readable. +func (a *headAppender) Commit() (err error) { + if a.closed { + return ErrAppenderClosed + } + defer func() { a.closed = true }() + + if err := a.log(); err != nil { + _ = a.Rollback() // Most likely the same error will happen again. + return fmt.Errorf("write to WAL: %w", err) + } + + if a.head.writeNotified != nil { + a.head.writeNotified.Notify() + } + + a.commitExemplars() defer a.head.metrics.activeAppenders.Dec() defer a.head.putAppendBuffer(a.samples) @@ -1028,401 +1433,46 @@ func (a *headAppender) Commit() (err error) { defer a.head.putMetadataBuffer(a.metadata) defer a.head.iso.closeAppend(a.appendID) - var ( - floatsAppended = len(a.samples) - histogramsAppended = len(a.histograms) + len(a.floatHistograms) - // number of samples out of order but accepted: with ooo enabled and within time window - oooFloatsAccepted int - oooHistogramAccepted int - // number of samples rejected due to: out of order but OOO support disabled. - floatOOORejected int - histoOOORejected int - // number of samples rejected due to: that are out of order but too old (OOO support enabled, but outside time window) - floatTooOldRejected int - histoTooOldRejected int - // number of samples rejected due to: out of bounds: with t < minValidTime (OOO support disabled) - floatOOBRejected int - histoOOBRejected int - inOrderMint int64 = math.MaxInt64 - inOrderMaxt int64 = math.MinInt64 - oooMinT int64 = math.MaxInt64 - oooMaxT int64 = math.MinInt64 - wblSamples []record.RefSample - wblHistograms []record.RefHistogramSample - wblFloatHistograms []record.RefFloatHistogramSample - oooMmapMarkers map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef - oooMmapMarkersCount int - oooRecords [][]byte - oooCapMax = a.head.opts.OutOfOrderCapMax.Load() - series *memSeries - appendChunkOpts = chunkOpts{ + acc := &appenderCommitContext{ + floatsAppended: len(a.samples), + histogramsAppended: len(a.histograms) + len(a.floatHistograms), + inOrderMint: math.MaxInt64, + inOrderMaxt: math.MinInt64, + oooMinT: math.MaxInt64, + oooMaxT: math.MinInt64, + oooCapMax: a.head.opts.OutOfOrderCapMax.Load(), + appendChunkOpts: chunkOpts{ chunkDiskMapper: a.head.chunkDiskMapper, chunkRange: a.head.chunkRange.Load(), samplesPerChunk: a.head.opts.SamplesPerChunk, - } - enc record.Encoder - ) + }, + } + defer func() { - for i := range oooRecords { - a.head.putBytesBuffer(oooRecords[i][:0]) + for i := range acc.oooRecords { + a.head.putBytesBuffer(acc.oooRecords[i][:0]) } }() - collectOOORecords := func() { - if a.head.wbl == nil { - // WBL is not enabled. So no need to collect. - wblSamples = nil - wblHistograms = nil - wblFloatHistograms = nil - oooMmapMarkers = nil - oooMmapMarkersCount = 0 - return - } - // The m-map happens before adding a new sample. So we collect - // the m-map markers first, and then samples. - // WBL Graphically: - // WBL Before this Commit(): [old samples before this commit for chunk 1] - // WBL After this Commit(): [old samples before this commit for chunk 1][new samples in this commit for chunk 1]mmapmarker1[samples for chunk 2]mmapmarker2[samples for chunk 3] - if oooMmapMarkers != nil { - markers := make([]record.RefMmapMarker, 0, oooMmapMarkersCount) - for ref, mmapRefs := range oooMmapMarkers { - for _, mmapRef := range mmapRefs { - markers = append(markers, record.RefMmapMarker{ - Ref: ref, - MmapRef: mmapRef, - }) - } - } - r := enc.MmapMarkers(markers, a.head.getBytesBuffer()) - oooRecords = append(oooRecords, r) - } - if len(wblSamples) > 0 { - r := enc.Samples(wblSamples, a.head.getBytesBuffer()) - oooRecords = append(oooRecords, r) - } - if len(wblHistograms) > 0 { - r := enc.HistogramSamples(wblHistograms, a.head.getBytesBuffer()) - oooRecords = append(oooRecords, r) - } - if len(wblFloatHistograms) > 0 { - r := enc.FloatHistogramSamples(wblFloatHistograms, a.head.getBytesBuffer()) - oooRecords = append(oooRecords, r) - } + a.commitSamples(acc) + a.commitHistograms(acc) + a.commitFloatHistograms(acc) + a.commitMetadata() - wblSamples = nil - wblHistograms = nil - wblFloatHistograms = nil - oooMmapMarkers = nil - } - for i, s := range a.samples { - series = a.sampleSeries[i] - series.Lock() + a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOORejected)) + a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histoOOORejected)) + a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOBRejected)) + a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatTooOldRejected)) + a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatsAppended)) + a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histogramsAppended)) + a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.oooFloatsAccepted)) + a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.oooHistogramAccepted)) + a.head.updateMinMaxTime(acc.inOrderMint, acc.inOrderMaxt) + a.head.updateMinOOOMaxOOOTime(acc.oooMinT, acc.oooMaxT) - oooSample, _, err := series.appendable(s.T, s.V, a.headMaxt, a.minValidTime, a.oooTimeWindow) - switch { - case err == nil: - // Do nothing. - case errors.Is(err, storage.ErrOutOfOrderSample): - floatsAppended-- - floatOOORejected++ - case errors.Is(err, storage.ErrOutOfBounds): - floatsAppended-- - floatOOBRejected++ - case errors.Is(err, storage.ErrTooOldSample): - floatsAppended-- - floatTooOldRejected++ - default: - floatsAppended-- - } - - var ok, chunkCreated bool - - switch { - case err != nil: - // Do nothing here. - case oooSample: - // Sample is OOO and OOO handling is enabled - // and the delta is within the OOO tolerance. - var mmapRefs []chunks.ChunkDiskMapperRef - ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, nil, nil, a.head.chunkDiskMapper, oooCapMax, a.head.logger) - if chunkCreated { - r, ok := oooMmapMarkers[series.ref] - if !ok || r != nil { - // !ok means there are no markers collected for these samples yet. So we first flush the samples - // before setting this m-map marker. - - // r != nil means we have already m-mapped a chunk for this series in the same Commit(). - // Hence, before we m-map again, we should add the samples and m-map markers - // seen till now to the WBL records. - collectOOORecords() - } - - if oooMmapMarkers == nil { - oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef) - } - if len(mmapRefs) > 0 { - oooMmapMarkers[series.ref] = mmapRefs - oooMmapMarkersCount += len(mmapRefs) - } else { - // No chunk was written to disk, so we need to set an initial marker for this series. - oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0} - oooMmapMarkersCount++ - } - } - if ok { - wblSamples = append(wblSamples, s) - if s.T < oooMinT { - oooMinT = s.T - } - if s.T > oooMaxT { - oooMaxT = s.T - } - oooFloatsAccepted++ - } else { - // Sample is an exact duplicate of the last sample. - // NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk, - // not with samples in already flushed OOO chunks. - // TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305. - floatsAppended-- - } - default: - ok, chunkCreated = series.append(s.T, s.V, a.appendID, appendChunkOpts) - if ok { - if s.T < inOrderMint { - inOrderMint = s.T - } - if s.T > inOrderMaxt { - inOrderMaxt = s.T - } - } else { - // The sample is an exact duplicate, and should be silently dropped. - floatsAppended-- - } - } - - if chunkCreated { - a.head.metrics.chunks.Inc() - a.head.metrics.chunksCreated.Inc() - } - - series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) - series.pendingCommit = false - series.Unlock() - } - - for i, s := range a.histograms { - series = a.histogramSeries[i] - series.Lock() - - oooSample, _, err := series.appendableHistogram(s.T, s.H, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load()) - switch { - case err == nil: - // Do nothing. - case errors.Is(err, storage.ErrOutOfOrderSample): - histogramsAppended-- - histoOOORejected++ - case errors.Is(err, storage.ErrOutOfBounds): - histogramsAppended-- - histoOOBRejected++ - case errors.Is(err, storage.ErrTooOldSample): - histogramsAppended-- - histoTooOldRejected++ - default: - histogramsAppended-- - } - - var ok, chunkCreated bool - - switch { - case err != nil: - // Do nothing here. - case oooSample: - // Sample is OOO and OOO handling is enabled - // and the delta is within the OOO tolerance. - var mmapRefs []chunks.ChunkDiskMapperRef - ok, chunkCreated, mmapRefs = series.insert(s.T, 0, s.H, nil, a.head.chunkDiskMapper, oooCapMax, a.head.logger) - if chunkCreated { - r, ok := oooMmapMarkers[series.ref] - if !ok || r != nil { - // !ok means there are no markers collected for these samples yet. So we first flush the samples - // before setting this m-map marker. - - // r != 0 means we have already m-mapped a chunk for this series in the same Commit(). - // Hence, before we m-map again, we should add the samples and m-map markers - // seen till now to the WBL records. - collectOOORecords() - } - - if oooMmapMarkers == nil { - oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef) - } - if len(mmapRefs) > 0 { - oooMmapMarkers[series.ref] = mmapRefs - oooMmapMarkersCount += len(mmapRefs) - } else { - // No chunk was written to disk, so we need to set an initial marker for this series. - oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0} - oooMmapMarkersCount++ - } - } - if ok { - wblHistograms = append(wblHistograms, s) - if s.T < oooMinT { - oooMinT = s.T - } - if s.T > oooMaxT { - oooMaxT = s.T - } - oooHistogramAccepted++ - } else { - // Sample is an exact duplicate of the last sample. - // NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk, - // not with samples in already flushed OOO chunks. - // TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305. - histogramsAppended-- - } - default: - ok, chunkCreated = series.appendHistogram(s.T, s.H, a.appendID, appendChunkOpts) - if ok { - if s.T < inOrderMint { - inOrderMint = s.T - } - if s.T > inOrderMaxt { - inOrderMaxt = s.T - } - } else { - histogramsAppended-- - histoOOORejected++ - } - } - - if chunkCreated { - a.head.metrics.chunks.Inc() - a.head.metrics.chunksCreated.Inc() - } - - series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) - series.pendingCommit = false - series.Unlock() - } - - for i, s := range a.floatHistograms { - series = a.floatHistogramSeries[i] - series.Lock() - - oooSample, _, err := series.appendableFloatHistogram(s.T, s.FH, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load()) - switch { - case err == nil: - // Do nothing. - case errors.Is(err, storage.ErrOutOfOrderSample): - histogramsAppended-- - histoOOORejected++ - case errors.Is(err, storage.ErrOutOfBounds): - histogramsAppended-- - histoOOBRejected++ - case errors.Is(err, storage.ErrTooOldSample): - histogramsAppended-- - histoTooOldRejected++ - default: - histogramsAppended-- - } - - var ok, chunkCreated bool - - switch { - case err != nil: - // Do nothing here. - case oooSample: - // Sample is OOO and OOO handling is enabled - // and the delta is within the OOO tolerance. - var mmapRefs []chunks.ChunkDiskMapperRef - ok, chunkCreated, mmapRefs = series.insert(s.T, 0, nil, s.FH, a.head.chunkDiskMapper, oooCapMax, a.head.logger) - if chunkCreated { - r, ok := oooMmapMarkers[series.ref] - if !ok || r != nil { - // !ok means there are no markers collected for these samples yet. So we first flush the samples - // before setting this m-map marker. - - // r != 0 means we have already m-mapped a chunk for this series in the same Commit(). - // Hence, before we m-map again, we should add the samples and m-map markers - // seen till now to the WBL records. - collectOOORecords() - } - - if oooMmapMarkers == nil { - oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef) - } - if len(mmapRefs) > 0 { - oooMmapMarkers[series.ref] = mmapRefs - oooMmapMarkersCount += len(mmapRefs) - } else { - // No chunk was written to disk, so we need to set an initial marker for this series. - oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0} - oooMmapMarkersCount++ - } - } - if ok { - wblFloatHistograms = append(wblFloatHistograms, s) - if s.T < oooMinT { - oooMinT = s.T - } - if s.T > oooMaxT { - oooMaxT = s.T - } - oooHistogramAccepted++ - } else { - // Sample is an exact duplicate of the last sample. - // NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk, - // not with samples in already flushed OOO chunks. - // TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305. - histogramsAppended-- - } - default: - ok, chunkCreated = series.appendFloatHistogram(s.T, s.FH, a.appendID, appendChunkOpts) - if ok { - if s.T < inOrderMint { - inOrderMint = s.T - } - if s.T > inOrderMaxt { - inOrderMaxt = s.T - } - } else { - histogramsAppended-- - histoOOORejected++ - } - } - - if chunkCreated { - a.head.metrics.chunks.Inc() - a.head.metrics.chunksCreated.Inc() - } - - series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) - series.pendingCommit = false - series.Unlock() - } - - for i, m := range a.metadata { - series = a.metadataSeries[i] - series.Lock() - series.meta = &metadata.Metadata{Type: record.ToMetricType(m.Type), Unit: m.Unit, Help: m.Help} - series.Unlock() - } - - a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatOOORejected)) - a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Add(float64(histoOOORejected)) - a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatOOBRejected)) - a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatTooOldRejected)) - a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatsAppended)) - a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(histogramsAppended)) - a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(oooFloatsAccepted)) - a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(oooHistogramAccepted)) - a.head.updateMinMaxTime(inOrderMint, inOrderMaxt) - a.head.updateMinOOOMaxOOOTime(oooMinT, oooMaxT) - - collectOOORecords() + acc.collectOOORecords(a) if a.head.wbl != nil { - if err := a.head.wbl.Log(oooRecords...); err != nil { + if err := a.head.wbl.Log(acc.oooRecords...); err != nil { // TODO(codesome): Currently WBL logging of ooo samples is best effort here since we cannot try logging // until we have found what samples become OOO. We can try having a metric for this failure. // Returning the error here is not correct because we have already put the samples into the memory, diff --git a/tsdb/head_bench_test.go b/tsdb/head_bench_test.go index 51de50ec2..aa2cf2214 100644 --- a/tsdb/head_bench_test.go +++ b/tsdb/head_bench_test.go @@ -14,15 +14,22 @@ package tsdb import ( + "context" "errors" + "fmt" + "math/rand" "strconv" "testing" "github.com/stretchr/testify/require" "go.uber.org/atomic" + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/wlog" ) func BenchmarkHeadStripeSeriesCreate(b *testing.B) { @@ -79,6 +86,86 @@ func BenchmarkHeadStripeSeriesCreate_PreCreationFailure(b *testing.B) { } } +func BenchmarkHead_WalCommit(b *testing.B) { + seriesCounts := []int{100, 1000, 10000} + series := genSeries(10000, 10, 0, 0) // Only using the generated labels. + + appendSamples := func(b *testing.B, app storage.Appender, seriesCount int, ts int64) { + var err error + for i, s := range series[:seriesCount] { + var ref storage.SeriesRef + // if i is even, append a sample, else append a histogram. + if i%2 == 0 { + ref, err = app.Append(ref, s.Labels(), ts, float64(ts)) + } else { + h := &histogram.Histogram{ + Count: 7 + uint64(ts*5), + ZeroCount: 2 + uint64(ts), + ZeroThreshold: 0.001, + Sum: 18.4 * rand.Float64(), + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []int64{ts + 1, 1, -1, 0}, + } + ref, err = app.AppendHistogram(ref, s.Labels(), ts, h, nil) + } + require.NoError(b, err) + + _, err = app.AppendExemplar(ref, s.Labels(), exemplar.Exemplar{ + Labels: labels.FromStrings("trace_id", strconv.Itoa(rand.Int())), + Value: rand.Float64(), + Ts: ts, + }) + require.NoError(b, err) + } + } + + for _, seriesCount := range seriesCounts { + b.Run(fmt.Sprintf("%d series", seriesCount), func(b *testing.B) { + for _, commits := range []int64{1, 2} { // To test commits that create new series and when the series already exists. + b.Run(fmt.Sprintf("%d commits", commits), func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + b.StopTimer() + h, w := newTestHead(b, 10000, wlog.CompressionNone, false) + b.Cleanup(func() { + if h != nil { + h.Close() + } + if w != nil { + w.Close() + } + }) + app := h.Appender(context.Background()) + + appendSamples(b, app, seriesCount, 0) + + b.StartTimer() + require.NoError(b, app.Commit()) + if commits == 2 { + b.StopTimer() + app = h.Appender(context.Background()) + appendSamples(b, app, seriesCount, 1) + b.StartTimer() + require.NoError(b, app.Commit()) + } + b.StopTimer() + h.Close() + h = nil + w.Close() + w = nil + } + }) + } + }) + } +} + type failingSeriesLifecycleCallback struct{} func (failingSeriesLifecycleCallback) PreCreation(labels.Labels) error { return errors.New("failed") }