diff --git a/docs/storage.md b/docs/storage.md index 947960fe12..55d4309d37 100644 --- a/docs/storage.md +++ b/docs/storage.md @@ -137,6 +137,18 @@ will be used. Expired block cleanup happens in the background. It may take up to two hours to remove expired blocks. Blocks must be fully expired before they are removed. +## Right-Sizing Retention Size + +If you are utilizing `storage.tsdb.retention.size` to set a size limit, you +will want to consider the right size for this value relative to the storage you +have allocated for Prometheus. It is wise to reduce the retention size to provide +a buffer, ensuring that older entries will be removed before the allocated storage +for Prometheus becomes full. + +At present, we recommend setting the retention size to, at most, 80-85% of your +allocated Prometheus disk space. This increases the likelihood that older entires +will be removed prior to hitting any disk limitations. + ## Remote storage integrations Prometheus's local storage is limited to a single node's scalability and durability. diff --git a/promql/promqltest/testdata/histograms.test b/promql/promqltest/testdata/histograms.test index e1fb1d85ac..349a1e79c0 100644 --- a/promql/promqltest/testdata/histograms.test +++ b/promql/promqltest/testdata/histograms.test @@ -73,22 +73,32 @@ eval instant at 50m histogram_count(testhistogram3) {start="positive"} 110 {start="negative"} 20 +# Classic way of accessing the count still works. +eval instant at 50m testhistogram3_count + testhistogram3_count{start="positive"} 110 + testhistogram3_count{start="negative"} 20 + # Test histogram_sum. eval instant at 50m histogram_sum(testhistogram3) {start="positive"} 330 {start="negative"} 80 -# Test histogram_avg. +# Classic way of accessing the sum still works. +eval instant at 50m testhistogram3_sum + testhistogram3_sum{start="positive"} 330 + testhistogram3_sum{start="negative"} 80 + +# Test histogram_avg. This has no classic equivalent. eval instant at 50m histogram_avg(testhistogram3) {start="positive"} 3 {start="negative"} 4 -# Test histogram_stddev. +# Test histogram_stddev. This has no classic equivalent. eval instant at 50m histogram_stddev(testhistogram3) {start="positive"} 2.8189265757336734 {start="negative"} 4.182715937754936 -# Test histogram_stdvar. +# Test histogram_stdvar. This has no classic equivalent. eval instant at 50m histogram_stdvar(testhistogram3) {start="positive"} 7.946347039377573 {start="negative"} 17.495112615949154 @@ -103,137 +113,282 @@ eval instant at 50m histogram_fraction(0, 0.2, rate(testhistogram3[5m])) {start="positive"} 0.6363636363636364 {start="negative"} 0 -# Test histogram_quantile. +# In the classic histogram, we can access the corresponding bucket (if +# it exists) and divide by the count to get the same result. + +eval instant at 50m testhistogram3_bucket{le=".2"} / ignoring(le) testhistogram3_count + {start="positive"} 0.6363636363636364 + +eval instant at 50m rate(testhistogram3_bucket{le=".2"}[5m]) / ignoring(le) rate(testhistogram3_count[5m]) + {start="positive"} 0.6363636363636364 + +# Test histogram_quantile, native and classic. + +eval instant at 50m histogram_quantile(0, testhistogram3) + {start="positive"} 0 + {start="negative"} -0.25 eval instant at 50m histogram_quantile(0, testhistogram3_bucket) {start="positive"} 0 {start="negative"} -0.25 +eval instant at 50m histogram_quantile(0.25, testhistogram3) + {start="positive"} 0.055 + {start="negative"} -0.225 + eval instant at 50m histogram_quantile(0.25, testhistogram3_bucket) {start="positive"} 0.055 {start="negative"} -0.225 +eval instant at 50m histogram_quantile(0.5, testhistogram3) + {start="positive"} 0.125 + {start="negative"} -0.2 + eval instant at 50m histogram_quantile(0.5, testhistogram3_bucket) {start="positive"} 0.125 {start="negative"} -0.2 +eval instant at 50m histogram_quantile(0.75, testhistogram3) + {start="positive"} 0.45 + {start="negative"} -0.15 + eval instant at 50m histogram_quantile(0.75, testhistogram3_bucket) {start="positive"} 0.45 {start="negative"} -0.15 +eval instant at 50m histogram_quantile(1, testhistogram3) + {start="positive"} 1 + {start="negative"} -0.1 + eval instant at 50m histogram_quantile(1, testhistogram3_bucket) {start="positive"} 1 {start="negative"} -0.1 # Quantile too low. + +eval_warn instant at 50m histogram_quantile(-0.1, testhistogram) + {start="positive"} -Inf + {start="negative"} -Inf + eval_warn instant at 50m histogram_quantile(-0.1, testhistogram_bucket) {start="positive"} -Inf {start="negative"} -Inf # Quantile too high. + +eval_warn instant at 50m histogram_quantile(1.01, testhistogram) + {start="positive"} +Inf + {start="negative"} +Inf + eval_warn instant at 50m histogram_quantile(1.01, testhistogram_bucket) {start="positive"} +Inf {start="negative"} +Inf # Quantile invalid. + +eval_warn instant at 50m histogram_quantile(NaN, testhistogram) + {start="positive"} NaN + {start="negative"} NaN + eval_warn instant at 50m histogram_quantile(NaN, testhistogram_bucket) {start="positive"} NaN {start="negative"} NaN # Quantile value in lowest bucket. + +eval instant at 50m histogram_quantile(0, testhistogram) + {start="positive"} 0 + {start="negative"} -0.2 + eval instant at 50m histogram_quantile(0, testhistogram_bucket) {start="positive"} 0 {start="negative"} -0.2 # Quantile value in highest bucket. + +eval instant at 50m histogram_quantile(1, testhistogram) + {start="positive"} 1 + {start="negative"} 0.3 + eval instant at 50m histogram_quantile(1, testhistogram_bucket) {start="positive"} 1 {start="negative"} 0.3 # Finally some useful quantiles. + +eval instant at 50m histogram_quantile(0.2, testhistogram) + {start="positive"} 0.048 + {start="negative"} -0.2 + eval instant at 50m histogram_quantile(0.2, testhistogram_bucket) {start="positive"} 0.048 {start="negative"} -0.2 +eval instant at 50m histogram_quantile(0.5, testhistogram) + {start="positive"} 0.15 + {start="negative"} -0.15 + eval instant at 50m histogram_quantile(0.5, testhistogram_bucket) {start="positive"} 0.15 {start="negative"} -0.15 +eval instant at 50m histogram_quantile(0.8, testhistogram) + {start="positive"} 0.72 + {start="negative"} 0.3 + eval instant at 50m histogram_quantile(0.8, testhistogram_bucket) {start="positive"} 0.72 {start="negative"} 0.3 # More realistic with rates. + +eval instant at 50m histogram_quantile(0.2, rate(testhistogram[5m])) + {start="positive"} 0.048 + {start="negative"} -0.2 + eval instant at 50m histogram_quantile(0.2, rate(testhistogram_bucket[5m])) {start="positive"} 0.048 {start="negative"} -0.2 +eval instant at 50m histogram_quantile(0.5, rate(testhistogram[5m])) + {start="positive"} 0.15 + {start="negative"} -0.15 + eval instant at 50m histogram_quantile(0.5, rate(testhistogram_bucket[5m])) {start="positive"} 0.15 {start="negative"} -0.15 +eval instant at 50m histogram_quantile(0.8, rate(testhistogram[5m])) + {start="positive"} 0.72 + {start="negative"} 0.3 + eval instant at 50m histogram_quantile(0.8, rate(testhistogram_bucket[5m])) {start="positive"} 0.72 {start="negative"} 0.3 # Want results exactly in the middle of the bucket. + +eval instant at 7m histogram_quantile(1./6., testhistogram2) + {} 1 + eval instant at 7m histogram_quantile(1./6., testhistogram2_bucket) {} 1 +eval instant at 7m histogram_quantile(0.5, testhistogram2) + {} 3 + eval instant at 7m histogram_quantile(0.5, testhistogram2_bucket) {} 3 +eval instant at 7m histogram_quantile(5./6., testhistogram2) + {} 5 + eval instant at 7m histogram_quantile(5./6., testhistogram2_bucket) {} 5 +eval instant at 47m histogram_quantile(1./6., rate(testhistogram2[15m])) + {} 1 + eval instant at 47m histogram_quantile(1./6., rate(testhistogram2_bucket[15m])) {} 1 +eval instant at 47m histogram_quantile(0.5, rate(testhistogram2[15m])) + {} 3 + eval instant at 47m histogram_quantile(0.5, rate(testhistogram2_bucket[15m])) {} 3 +eval instant at 47m histogram_quantile(5./6., rate(testhistogram2[15m])) + {} 5 + eval instant at 47m histogram_quantile(5./6., rate(testhistogram2_bucket[15m])) {} 5 -# Aggregated histogram: Everything in one. +# Aggregated histogram: Everything in one. Note how native histograms +# don't require aggregation by le. + +eval instant at 50m histogram_quantile(0.3, sum(rate(request_duration_seconds[5m]))) + {} 0.075 + eval instant at 50m histogram_quantile(0.3, sum(rate(request_duration_seconds_bucket[5m])) by (le)) {} 0.075 +eval instant at 50m histogram_quantile(0.5, sum(rate(request_duration_seconds[5m]))) + {} 0.1277777777777778 + eval instant at 50m histogram_quantile(0.5, sum(rate(request_duration_seconds_bucket[5m])) by (le)) {} 0.1277777777777778 # Aggregated histogram: Everything in one. Now with avg, which does not change anything. + +eval instant at 50m histogram_quantile(0.3, avg(rate(request_duration_seconds[5m]))) + {} 0.075 + eval instant at 50m histogram_quantile(0.3, avg(rate(request_duration_seconds_bucket[5m])) by (le)) {} 0.075 +eval instant at 50m histogram_quantile(0.5, avg(rate(request_duration_seconds[5m]))) + {} 0.12777777777777778 + eval instant at 50m histogram_quantile(0.5, avg(rate(request_duration_seconds_bucket[5m])) by (le)) {} 0.12777777777777778 # Aggregated histogram: By instance. + +eval instant at 50m histogram_quantile(0.3, sum(rate(request_duration_seconds[5m])) by (instance)) + {instance="ins1"} 0.075 + {instance="ins2"} 0.075 + eval instant at 50m histogram_quantile(0.3, sum(rate(request_duration_seconds_bucket[5m])) by (le, instance)) {instance="ins1"} 0.075 {instance="ins2"} 0.075 +eval instant at 50m histogram_quantile(0.5, sum(rate(request_duration_seconds[5m])) by (instance)) + {instance="ins1"} 0.1333333333 + {instance="ins2"} 0.125 + eval instant at 50m histogram_quantile(0.5, sum(rate(request_duration_seconds_bucket[5m])) by (le, instance)) {instance="ins1"} 0.1333333333 {instance="ins2"} 0.125 # Aggregated histogram: By job. + +eval instant at 50m histogram_quantile(0.3, sum(rate(request_duration_seconds[5m])) by (job)) + {job="job1"} 0.1 + {job="job2"} 0.0642857142857143 + eval instant at 50m histogram_quantile(0.3, sum(rate(request_duration_seconds_bucket[5m])) by (le, job)) {job="job1"} 0.1 {job="job2"} 0.0642857142857143 +eval instant at 50m histogram_quantile(0.5, sum(rate(request_duration_seconds[5m])) by (job)) + {job="job1"} 0.14 + {job="job2"} 0.1125 + eval instant at 50m histogram_quantile(0.5, sum(rate(request_duration_seconds_bucket[5m])) by (le, job)) {job="job1"} 0.14 {job="job2"} 0.1125 # Aggregated histogram: By job and instance. + +eval instant at 50m histogram_quantile(0.3, sum(rate(request_duration_seconds[5m])) by (job, instance)) + {instance="ins1", job="job1"} 0.11 + {instance="ins2", job="job1"} 0.09 + {instance="ins1", job="job2"} 0.06 + {instance="ins2", job="job2"} 0.0675 + eval instant at 50m histogram_quantile(0.3, sum(rate(request_duration_seconds_bucket[5m])) by (le, job, instance)) {instance="ins1", job="job1"} 0.11 {instance="ins2", job="job1"} 0.09 {instance="ins1", job="job2"} 0.06 {instance="ins2", job="job2"} 0.0675 +eval instant at 50m histogram_quantile(0.5, sum(rate(request_duration_seconds[5m])) by (job, instance)) + {instance="ins1", job="job1"} 0.15 + {instance="ins2", job="job1"} 0.1333333333333333 + {instance="ins1", job="job2"} 0.1 + {instance="ins2", job="job2"} 0.1166666666666667 + eval instant at 50m histogram_quantile(0.5, sum(rate(request_duration_seconds_bucket[5m])) by (le, job, instance)) {instance="ins1", job="job1"} 0.15 {instance="ins2", job="job1"} 0.1333333333333333 @@ -241,18 +396,32 @@ eval instant at 50m histogram_quantile(0.5, sum(rate(request_duration_seconds_bu {instance="ins2", job="job2"} 0.1166666666666667 # The unaggregated histogram for comparison. Same result as the previous one. + +eval instant at 50m histogram_quantile(0.3, rate(request_duration_seconds[5m])) + {instance="ins1", job="job1"} 0.11 + {instance="ins2", job="job1"} 0.09 + {instance="ins1", job="job2"} 0.06 + {instance="ins2", job="job2"} 0.0675 + eval instant at 50m histogram_quantile(0.3, rate(request_duration_seconds_bucket[5m])) {instance="ins1", job="job1"} 0.11 {instance="ins2", job="job1"} 0.09 {instance="ins1", job="job2"} 0.06 {instance="ins2", job="job2"} 0.0675 +eval instant at 50m histogram_quantile(0.5, rate(request_duration_seconds[5m])) + {instance="ins1", job="job1"} 0.15 + {instance="ins2", job="job1"} 0.13333333333333333 + {instance="ins1", job="job2"} 0.1 + {instance="ins2", job="job2"} 0.11666666666666667 + eval instant at 50m histogram_quantile(0.5, rate(request_duration_seconds_bucket[5m])) {instance="ins1", job="job1"} 0.15 {instance="ins2", job="job1"} 0.13333333333333333 {instance="ins1", job="job2"} 0.1 {instance="ins2", job="job2"} 0.11666666666666667 +# All NHCBs summed into one. eval instant at 50m sum(request_duration_seconds) {} {{schema:-53 count:250 custom_values:[0.1 0.2] buckets:[100 90 60]}} @@ -303,11 +472,13 @@ load_with_nhcb 5m eval instant at 50m histogram_quantile(0.2, rate(empty_bucket[5m])) {instance="ins1", job="job1"} NaN -# Load a duplicate histogram with a different name to test failure scenario on multiple histograms with the same label set +# Load a duplicate histogram with a different name to test failure scenario on multiple histograms with the same label set. # https://github.com/prometheus/prometheus/issues/9910 load_with_nhcb 5m - request_duration_seconds2_bucket{job="job1", instance="ins1", le="0.1"} 0+1x10 - request_duration_seconds2_bucket{job="job1", instance="ins1", le="0.2"} 0+3x10 - request_duration_seconds2_bucket{job="job1", instance="ins1", le="+Inf"} 0+4x10 + request_duration_seconds2_bucket{job="job1", instance="ins1", le="0.1"} 0+1x10 + request_duration_seconds2_bucket{job="job1", instance="ins1", le="0.2"} 0+3x10 + request_duration_seconds2_bucket{job="job1", instance="ins1", le="+Inf"} 0+4x10 -eval_fail instant at 50m histogram_quantile(0.99, {__name__=~"request_duration_seconds\\d*_bucket$"}) +eval_fail instant at 50m histogram_quantile(0.99, {__name__=~"request_duration_seconds\\d*_bucket"}) + +eval_fail instant at 50m histogram_quantile(0.99, {__name__=~"request_duration_seconds\\d*"}) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index f45ab606ba..8d66d1e818 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -846,16 +846,17 @@ func (a *headAppender) Commit() (err error) { // number of samples rejected due to: out of bounds: with t < minValidTime (OOO support disabled) floatOOBRejected int - inOrderMint int64 = math.MaxInt64 - inOrderMaxt int64 = math.MinInt64 - ooomint int64 = math.MaxInt64 - ooomaxt int64 = math.MinInt64 - wblSamples []record.RefSample - oooMmapMarkers map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef - oooRecords [][]byte - oooCapMax = a.head.opts.OutOfOrderCapMax.Load() - series *memSeries - appendChunkOpts = chunkOpts{ + inOrderMint int64 = math.MaxInt64 + inOrderMaxt int64 = math.MinInt64 + oooMinT int64 = math.MaxInt64 + oooMaxT int64 = math.MinInt64 + wblSamples []record.RefSample + oooMmapMarkers map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef + oooMmapMarkersCount int + oooRecords [][]byte + oooCapMax = a.head.opts.OutOfOrderCapMax.Load() + series *memSeries + appendChunkOpts = chunkOpts{ chunkDiskMapper: a.head.chunkDiskMapper, chunkRange: a.head.chunkRange.Load(), samplesPerChunk: a.head.opts.SamplesPerChunk, @@ -872,6 +873,7 @@ func (a *headAppender) Commit() (err error) { // WBL is not enabled. So no need to collect. wblSamples = nil oooMmapMarkers = nil + oooMmapMarkersCount = 0 return } // The m-map happens before adding a new sample. So we collect @@ -880,12 +882,14 @@ func (a *headAppender) Commit() (err error) { // WBL Before this Commit(): [old samples before this commit for chunk 1] // WBL After this Commit(): [old samples before this commit for chunk 1][new samples in this commit for chunk 1]mmapmarker1[samples for chunk 2]mmapmarker2[samples for chunk 3] if oooMmapMarkers != nil { - markers := make([]record.RefMmapMarker, 0, len(oooMmapMarkers)) - for ref, mmapRef := range oooMmapMarkers { - markers = append(markers, record.RefMmapMarker{ - Ref: ref, - MmapRef: mmapRef, - }) + markers := make([]record.RefMmapMarker, 0, oooMmapMarkersCount) + for ref, mmapRefs := range oooMmapMarkers { + for _, mmapRef := range mmapRefs { + markers = append(markers, record.RefMmapMarker{ + Ref: ref, + MmapRef: mmapRef, + }) + } } r := enc.MmapMarkers(markers, a.head.getBytesBuffer()) oooRecords = append(oooRecords, r) @@ -928,32 +932,39 @@ func (a *headAppender) Commit() (err error) { case oooSample: // Sample is OOO and OOO handling is enabled // and the delta is within the OOO tolerance. - var mmapRef chunks.ChunkDiskMapperRef - ok, chunkCreated, mmapRef = series.insert(s.T, s.V, a.head.chunkDiskMapper, oooCapMax) + var mmapRefs []chunks.ChunkDiskMapperRef + ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, a.head.chunkDiskMapper, oooCapMax) if chunkCreated { r, ok := oooMmapMarkers[series.ref] - if !ok || r != 0 { + if !ok || r != nil { // !ok means there are no markers collected for these samples yet. So we first flush the samples // before setting this m-map marker. - // r != 0 means we have already m-mapped a chunk for this series in the same Commit(). + // r != nil means we have already m-mapped a chunk for this series in the same Commit(). // Hence, before we m-map again, we should add the samples and m-map markers // seen till now to the WBL records. collectOOORecords() } if oooMmapMarkers == nil { - oooMmapMarkers = make(map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef) + oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef) + } + if len(mmapRefs) > 0 { + oooMmapMarkers[series.ref] = mmapRefs + oooMmapMarkersCount += len(mmapRefs) + } else { + // No chunk was written to disk, so we need to set an initial marker for this series. + oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0} + oooMmapMarkersCount++ } - oooMmapMarkers[series.ref] = mmapRef } if ok { wblSamples = append(wblSamples, s) - if s.T < ooomint { - ooomint = s.T + if s.T < oooMinT { + oooMinT = s.T } - if s.T > ooomaxt { - ooomaxt = s.T + if s.T > oooMaxT { + oooMaxT = s.T } floatOOOAccepted++ } else { @@ -1053,7 +1064,7 @@ func (a *headAppender) Commit() (err error) { a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(histogramsAppended)) a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatOOOAccepted)) a.head.updateMinMaxTime(inOrderMint, inOrderMaxt) - a.head.updateMinOOOMaxOOOTime(ooomint, ooomaxt) + a.head.updateMinOOOMaxOOOTime(oooMinT, oooMaxT) collectOOORecords() if a.head.wbl != nil { @@ -1069,14 +1080,14 @@ func (a *headAppender) Commit() (err error) { } // insert is like append, except it inserts. Used for OOO samples. -func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRef chunks.ChunkDiskMapperRef) { +func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) { if s.ooo == nil { s.ooo = &memSeriesOOOFields{} } c := s.ooo.oooHeadChunk if c == nil || c.chunk.NumSamples() == int(oooCapMax) { // Note: If no new samples come in then we rely on compaction to clean up stale in-memory OOO chunks. - c, mmapRef = s.cutNewOOOHeadChunk(t, chunkDiskMapper) + c, mmapRefs = s.cutNewOOOHeadChunk(t, chunkDiskMapper) chunkCreated = true } @@ -1089,7 +1100,7 @@ func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDisk c.maxTime = t } } - return ok, chunkCreated, mmapRef + return ok, chunkCreated, mmapRefs } // chunkOpts are chunk-level options that are passed when appending to a memSeries. @@ -1431,7 +1442,7 @@ func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange // cutNewOOOHeadChunk cuts a new OOO chunk and m-maps the old chunk. // The caller must ensure that s.ooo is not nil. -func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) (*oooHeadChunk, chunks.ChunkDiskMapperRef) { +func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) (*oooHeadChunk, []chunks.ChunkDiskMapperRef) { ref := s.mmapCurrentOOOHeadChunk(chunkDiskMapper) s.ooo.oooHeadChunk = &oooHeadChunk{ @@ -1443,21 +1454,29 @@ func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.Chunk return s.ooo.oooHeadChunk, ref } -func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) chunks.ChunkDiskMapperRef { +func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) []chunks.ChunkDiskMapperRef { if s.ooo == nil || s.ooo.oooHeadChunk == nil { - // There is no head chunk, so nothing to m-map here. - return 0 + // OOO is not enabled or there is no head chunk, so nothing to m-map here. + return nil + } + chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64) + if err != nil { + handleChunkWriteError(err) + return nil + } + chunkRefs := make([]chunks.ChunkDiskMapperRef, 0, 1) + for _, memchunk := range chks { + chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.ooo.oooHeadChunk.minTime, s.ooo.oooHeadChunk.maxTime, memchunk.chunk, true, handleChunkWriteError) + chunkRefs = append(chunkRefs, chunkRef) + s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks, &mmappedChunk{ + ref: chunkRef, + numSamples: uint16(memchunk.chunk.NumSamples()), + minTime: memchunk.minTime, + maxTime: memchunk.maxTime, + }) } - xor, _ := s.ooo.oooHeadChunk.chunk.ToXOR() // Encode to XorChunk which is more compact and implements all of the needed functionality. - chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.ooo.oooHeadChunk.minTime, s.ooo.oooHeadChunk.maxTime, xor, true, handleChunkWriteError) - s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks, &mmappedChunk{ - ref: chunkRef, - numSamples: uint16(xor.NumSamples()), - minTime: s.ooo.oooHeadChunk.minTime, - maxTime: s.ooo.oooHeadChunk.maxTime, - }) s.ooo.oooHeadChunk = nil - return chunkRef + return chunkRefs } // mmapChunks will m-map all but first chunk on s.headChunks list. diff --git a/tsdb/head_test.go b/tsdb/head_test.go index fa48345165..c192c8a078 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -4730,6 +4730,14 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) { // TestWBLReplay checks the replay at a low level. func TestWBLReplay(t *testing.T) { + for name, scenario := range sampleTypeScenarios { + t.Run(name, func(t *testing.T) { + testWBLReplay(t, scenario) + }) + } +} + +func testWBLReplay(t *testing.T, scenario sampleTypeScenario) { dir := t.TempDir() wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) require.NoError(t, err) @@ -4745,11 +4753,11 @@ func TestWBLReplay(t *testing.T) { require.NoError(t, err) require.NoError(t, h.Init(0)) - var expOOOSamples []sample + var expOOOSamples []chunks.Sample l := labels.FromStrings("foo", "bar") - appendSample := func(mins int64, isOOO bool) { + appendSample := func(mins int64, val float64, isOOO bool) { app := h.Appender(context.Background()) - ts, v := mins*time.Minute.Milliseconds(), float64(mins) + ts, v := mins*time.Minute.Milliseconds(), val _, err := app.Append(0, l, ts, v) require.NoError(t, err) require.NoError(t, app.Commit()) @@ -4760,15 +4768,15 @@ func TestWBLReplay(t *testing.T) { } // In-order sample. - appendSample(60, false) + appendSample(60, 60, false) // Out of order samples. - appendSample(40, true) - appendSample(35, true) - appendSample(50, true) - appendSample(55, true) - appendSample(59, true) - appendSample(31, true) + appendSample(40, 40, true) + appendSample(35, 35, true) + appendSample(50, 50, true) + appendSample(55, 55, true) + appendSample(59, 59, true) + appendSample(31, 31, true) // Check that Head's time ranges are set properly. require.Equal(t, 60*time.Minute.Milliseconds(), h.MinTime()) @@ -4792,22 +4800,23 @@ func TestWBLReplay(t *testing.T) { require.False(t, ok) require.NotNil(t, ms) - xor, err := ms.ooo.oooHeadChunk.chunk.ToXOR() + chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64) require.NoError(t, err) + require.Len(t, chks, 1) - it := xor.Iterator(nil) - actOOOSamples := make([]sample, 0, len(expOOOSamples)) - for it.Next() == chunkenc.ValFloat { - ts, v := it.At() - actOOOSamples = append(actOOOSamples, sample{t: ts, f: v}) - } + it := chks[0].chunk.Iterator(nil) + actOOOSamples, err := storage.ExpandSamples(it, nil) + require.NoError(t, err) // OOO chunk will be sorted. Hence sort the expected samples. sort.Slice(expOOOSamples, func(i, j int) bool { - return expOOOSamples[i].t < expOOOSamples[j].t + return expOOOSamples[i].T() < expOOOSamples[j].T() }) - require.Equal(t, expOOOSamples, actOOOSamples) + // Passing in true for the 'ignoreCounterResets' parameter prevents differences in counter reset headers + // from being factored in to the sample comparison + // TODO(fionaliao): understand counter reset behaviour, might want to modify this later + requireEqualSamples(t, l.String(), expOOOSamples, actOOOSamples, true) require.NoError(t, h.Close()) } diff --git a/tsdb/ooo_head.go b/tsdb/ooo_head.go index 7f2110fa65..b2556d62e9 100644 --- a/tsdb/ooo_head.go +++ b/tsdb/ooo_head.go @@ -17,9 +17,10 @@ import ( "fmt" "sort" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/oklog/ulid" - "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/tombstones" ) @@ -74,24 +75,22 @@ func (o *OOOChunk) NumSamples() int { return len(o.samples) } -func (o *OOOChunk) ToXOR() (*chunkenc.XORChunk, error) { - x := chunkenc.NewXORChunk() - app, err := x.Appender() - if err != nil { - return nil, err - } - for _, s := range o.samples { - app.Append(s.t, s.f) - } - return x, nil -} - -func (o *OOOChunk) ToXORBetweenTimestamps(mint, maxt int64) (*chunkenc.XORChunk, error) { - x := chunkenc.NewXORChunk() - app, err := x.Appender() - if err != nil { - return nil, err +// ToEncodedChunks returns chunks with the samples in the OOOChunk. +// +//nolint:revive // unexported-return. +func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error) { + if len(o.samples) == 0 { + return nil, nil } + // The most common case is that there will be a single chunk, with the same type of samples in it - this is always true for float samples. + chks = make([]memChunk, 0, 1) + var ( + cmint int64 + cmaxt int64 + chunk chunkenc.Chunk + app chunkenc.Appender + ) + prevEncoding := chunkenc.EncNone // Yes we could call the chunk for this, but this is more efficient. for _, s := range o.samples { if s.t < mint { continue @@ -99,9 +98,77 @@ func (o *OOOChunk) ToXORBetweenTimestamps(mint, maxt int64) (*chunkenc.XORChunk, if s.t > maxt { break } - app.Append(s.t, s.f) + encoding := chunkenc.EncXOR + if s.h != nil { + encoding = chunkenc.EncHistogram + } else if s.fh != nil { + encoding = chunkenc.EncFloatHistogram + } + + // prevApp is the appender for the previous sample. + prevApp := app + + if encoding != prevEncoding { // For the first sample, this will always be true as EncNone != EncXOR | EncHistogram | EncFloatHistogram + if prevEncoding != chunkenc.EncNone { + chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) + } + cmint = s.t + switch encoding { + case chunkenc.EncXOR: + chunk = chunkenc.NewXORChunk() + case chunkenc.EncHistogram: + chunk = chunkenc.NewHistogramChunk() + case chunkenc.EncFloatHistogram: + chunk = chunkenc.NewFloatHistogramChunk() + default: + chunk = chunkenc.NewXORChunk() + } + app, err = chunk.Appender() + if err != nil { + return + } + } + switch encoding { + case chunkenc.EncXOR: + app.Append(s.t, s.f) + case chunkenc.EncHistogram: + // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. + prevHApp, _ := prevApp.(*chunkenc.HistogramAppender) + var ( + newChunk chunkenc.Chunk + recoded bool + ) + newChunk, recoded, app, _ = app.AppendHistogram(prevHApp, s.t, s.h, false) + if newChunk != nil { // A new chunk was allocated. + if !recoded { + chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) + } + chunk = newChunk + cmint = s.t + } + case chunkenc.EncFloatHistogram: + // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. + prevHApp, _ := prevApp.(*chunkenc.FloatHistogramAppender) + var ( + newChunk chunkenc.Chunk + recoded bool + ) + newChunk, recoded, app, _ = app.AppendFloatHistogram(prevHApp, s.t, s.fh, false) + if newChunk != nil { // A new chunk was allocated. + if !recoded { + chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) + } + chunk = newChunk + cmint = s.t + } + } + cmaxt = s.t + prevEncoding = encoding } - return x, nil + if prevEncoding != chunkenc.EncNone { + chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) + } + return chks, nil } var _ BlockReader = &OOORangeHead{} diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 4e8329c99b..a35276af50 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -108,11 +108,19 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra c := s.ooo.oooHeadChunk if c.OverlapsClosedInterval(oh.mint, oh.maxt) && maxMmapRef == 0 { ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks)))) - var xor chunkenc.Chunk if len(c.chunk.samples) > 0 { // Empty samples happens in tests, at least. - xor, _ = c.chunk.ToXOR() // Ignoring error because it can't fail. + chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(c.minTime, c.maxTime) + if err != nil { + handleChunkWriteError(err) + return nil + } + for _, chk := range chks { + addChunk(c.minTime, c.maxTime, ref, chk.chunk) + } + } else { + var emptyChunk chunkenc.Chunk + addChunk(c.minTime, c.maxTime, ref, emptyChunk) } - addChunk(c.minTime, c.maxTime, ref, xor) } } for i := len(s.ooo.oooMmappedChunks) - 1; i >= 0; i-- { @@ -341,14 +349,20 @@ func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead, continue } - mmapRef := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper) - if mmapRef == 0 && len(ms.ooo.oooMmappedChunks) > 0 { + var lastMmapRef chunks.ChunkDiskMapperRef + mmapRefs := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper) + if len(mmapRefs) == 0 && len(ms.ooo.oooMmappedChunks) > 0 { // Nothing was m-mapped. So take the mmapRef from the existing slice if it exists. - mmapRef = ms.ooo.oooMmappedChunks[len(ms.ooo.oooMmappedChunks)-1].ref + mmapRefs = []chunks.ChunkDiskMapperRef{ms.ooo.oooMmappedChunks[len(ms.ooo.oooMmappedChunks)-1].ref} } - seq, off := mmapRef.Unpack() + if len(mmapRefs) == 0 { + lastMmapRef = 0 + } else { + lastMmapRef = mmapRefs[len(mmapRefs)-1] + } + seq, off := lastMmapRef.Unpack() if seq > lastSeq || (seq == lastSeq && off > lastOff) { - ch.lastMmapRef, lastSeq, lastOff = mmapRef, seq, off + ch.lastMmapRef, lastSeq, lastOff = lastMmapRef, seq, off } if len(ms.ooo.oooMmappedChunks) > 0 { ch.postings = append(ch.postings, seriesRef)